Coverage for strategies / oneil_squeeze_breakout_strategy.py: 92%

235 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-04 15:08 +0000

1# strategies/oneil/breakout_strategy.py 

2from __future__ import annotations 

3 

4import asyncio 

5import logging 

6import os 

7import json 

8from dataclasses import dataclass, asdict 

9from typing import List, Optional, Dict 

10 

11from interfaces.live_strategy import LiveStrategy 

12from common.types import TradeSignal, ErrorCode 

13from services.stock_query_service import StockQueryService 

14from core.market_clock import MarketClock 

15from strategies.oneil_common_types import OneilBreakoutConfig, OSBPositionState 

16from services.oneil_universe_service import OneilUniverseService 

17from core.logger import get_strategy_logger 

18 

19 

20class OneilSqueezeBreakoutStrategy(LiveStrategy): 

21 """오닐식 스퀴즈 주도주 돌파매매 (O'Neil Squeeze Breakout). 

22 

23 핵심: 시장 주도주 중 볼린저 밴드가 극도로 수축(스퀴즈)된 종목이 

24 거래량을 동반하며 20일 최고가를 돌파할 때 매수. 

25 프로그램 순매수 필터(2중 스마트 머니)로 기관 수급 확인. 

26  

27 특징: 

28 - 유니버스 관리(종목 발굴)는 OneilUniverseService에 위임. 

29 - 이 클래스는 '언제 살까(돌파)'와 '언제 팔까(청산)'에만 집중. 

30  

31 [v1 범위] 

32 - 유니버스: get_top_trading_value_stocks() → 기본 필터(거래대금/52주고가/정배열) 

33 - 매수/매도 뼈대 구축 및 스케줄러 연동 완료 

34 

35 [v2 완료 사항] 

36 - 아키텍처: OneilUniverseService 완전 분리 및 메모리 캐싱 (API 중복 호출 방지) 

37 - 유니버스: Pool A(장 마감 후 배치) / Pool B(장중 3중 그물망 실시간 발굴) 병합 

38 - 스코어링: RS(3개월 상대강도 상위10% → +30점), 영업이익 25% 이상 증가 → +20점 적용 

39 - 마켓타이밍: ETF 프록시(KODEX 200/코스닥150) 20일선 3일 연속 우상향 로직 적용 

40  

41 [v3 예정 (TODO)] 

42 - 스코어링 고도화: 업종 소분류 주도 (테마 대장주) 키워드 매칭 스코어링 (+20점) 추가 

43 - 마켓타이밍 고도화: 코스닥/코스피 지수 직접 조회 API 연동 (ETF 프록시 대체) 

44 - 실시간 호가창(Websocket) 연동을 통한 초단위 고래(>=5000만원) 탐지 (현재는 부하 이슈로 보류) 

45 """ 

46 STATE_FILE = os.path.join("data", "osb_position_state.json") 

47 

48 def __init__( 

49 self, 

50 stock_query_service: StockQueryService, 

51 universe_service: OneilUniverseService, 

52 market_clock: MarketClock, 

53 config: Optional[OneilBreakoutConfig] = None, 

54 logger: Optional[logging.Logger] = None, 

55 ): 

56 self._sqs = stock_query_service 

57 self._universe = universe_service 

58 self._tm = market_clock 

59 self._cfg = config or OneilBreakoutConfig() 

60 if logger: 

61 self._logger = logger 

62 else: 

63 self._logger = get_strategy_logger("OneilSqueezeBreakout") 

64 

65 self._position_state: Dict[str, OSBPositionState] = {} 

66 self._load_state() 

67 

68 @property 

69 def name(self) -> str: 

70 return "오닐스퀴즈돌파" 

71 

72 async def scan(self) -> List[TradeSignal]: 

73 signals: List[TradeSignal] = [] 

74 self._logger.info({"event": "scan_started", "strategy_name": self.name}) 

75 

76 # 1. 유니버스 서비스로부터 완성된 워치리스트 획득 (캐싱됨) 

77 watchlist = await self._universe.get_watchlist(logger=self._logger) 

78 if not watchlist: 

79 self._logger.info({"event": "scan_skipped", "reason": "Watchlist is empty"}) 

80 return signals 

81 

82 self._logger.info({"event": "scan_with_watchlist", "count": len(watchlist)}) 

83 

84 # 2. 장중 경과 비율 (거래량 환산용) 

85 market_progress = self._get_market_progress_ratio() 

86 if market_progress <= 0: 

87 self._logger.info({"event": "scan_skipped", "reason": "Market not open or just started"}) 

88 return signals 

89 

90 # 3. 마켓 타이밍 사전 체크 (루프 내 중복 await 방지) 

91 market_timing = { 

92 "KOSPI": await self._universe.is_market_timing_ok("KOSPI", logger=self._logger), 

93 "KOSDAQ": await self._universe.is_market_timing_ok("KOSDAQ", logger=self._logger) 

94 } 

95 if not any(market_timing.values()): 

96 self._logger.info({"event": "scan_skipped", "reason": "Bad market timing for both markets"}) 

97 return signals 

98 

99 # 4. 종목별 돌파 체크 (청크 기반 병렬 처리, TPS 제한 대응) 

100 candidates = [ 

101 (code, item) for code, item in watchlist.items() 

102 if code not in self._position_state 

103 and market_timing.get(item.market, False) 

104 ] 

105 for i in range(0, len(candidates), 10): 

106 chunk = candidates[i:i + 10] 

107 results = await asyncio.gather( 

108 *[self._check_breakout(code, item, market_progress) for code, item in chunk], 

109 return_exceptions=True, 

110 ) 

111 for result in results: 

112 if isinstance(result, Exception): 

113 self._logger.error(f"Scan error: {result}") 

114 elif result: 

115 signals.append(result) 

116 

117 self._logger.info({"event": "scan_finished", "signals_found": len(signals)}) 

118 return signals 

119 

120 async def _check_breakout(self, code, item, progress) -> Optional[TradeSignal]: 

121 # 1. 기본 시세 및 프로그램 수급 조회 

122 resp = await self._sqs.get_current_price(code, caller=self.name) 

123 if not resp or resp.rt_cd != "0": return None 

124 

125 out = resp.data.get("output") if isinstance(resp.data, dict) else None 

126 if not out: return None 126 ↛ exitline 126 didn't return from function '_check_breakout' because the return on line 126 wasn't executed

127 

128 if isinstance(out, dict): 

129 current = int(out.get("stck_prpr", 0)) 

130 vol = int(out.get("acml_vol", 0)) 

131 pg_buy = int(out.get("pgtr_ntby_qty", 0)) 

132 trade_value = int(out.get("acml_tr_pbmn", 0)) 

133 else: 

134 current = int(getattr(out, "stck_prpr", 0) or 0) 

135 vol = int(getattr(out, "acml_vol", 0) or 0) 

136 pg_buy = int(getattr(out, "pgtr_ntby_qty", 0) or 0) 

137 trade_value = int(getattr(out, "acml_tr_pbmn", 0) or 0) 

138 

139 # 🚨 [관문 1] 가격 돌파 

140 if current <= item.high_20d: 

141 # 너무 많은 로그를 피하기 위해 이 단계는 로그 생략 

142 return None 

143 

144 # 🚨 [관문 2] 거래량 돌파 (+ 뻥튀기 방어) 

145 effective_progress = max(progress, 0.05) # 장 초반 최소 5% 진행 보장 

146 proj_vol = vol / effective_progress 

147 

148 if vol < (item.avg_vol_20d * 0.3): # 최소 절대 거래량 (평소 20일 평균의 30%) 미달 시 가짜 돌파 

149 self._logger.debug({"event": "breakout_rejected", "code": code, "reason": "low_absolute_volume", "vol": vol, "min_required": item.avg_vol_20d * 0.3}) 

150 return None 

151 if proj_vol < item.avg_vol_20d * self._cfg.volume_breakout_multiplier: 

152 self._logger.debug({"event": "breakout_rejected", "code": code, "reason": "insufficient_projected_volume", "proj_vol": int(proj_vol), "threshold": item.avg_vol_20d * self._cfg.volume_breakout_multiplier}) 

153 return None 

154 

155 # 🚨 [관문 3] 스마트 머니(프로그램 수급) 상세 필터 

156 if pg_buy <= self._cfg.program_net_buy_min: 

157 self._logger.debug({"event": "breakout_rejected", "code": code, "reason": "low_program_net_buy", "pg_buy": pg_buy, "min_required": self._cfg.program_net_buy_min}) 

158 return None 

159 

160 pg_buy_amount = pg_buy * current # 프로그램 순매수 금액 (추정) 

161 

162 # 3-1. 거래대금의 10% 이상 개입했는가? 

163 if trade_value > 0: 163 ↛ 173line 163 didn't jump to line 173 because the condition on line 163 was always true

164 pg_to_tv_pct = pg_buy_amount / trade_value * 100 

165 if pg_to_tv_pct < self._cfg.program_to_trade_value_pct: 

166 self._logger.debug({ 

167 "event": "breakout_rejected", "code": code, "reason": "low_program_to_trade_value", 

168 "pg_to_tv_pct": round(pg_to_tv_pct, 2), "threshold": self._cfg.program_to_trade_value_pct 

169 }) 

170 return None 

171 

172 # 3-2. 시가총액의 0.5% 이상 개입했는가? 

173 if item.market_cap > 0: 173 ↛ 182line 173 didn't jump to line 182 because the condition on line 173 was always true

174 pg_to_mc_pct = pg_buy_amount / item.market_cap * 100 

175 if pg_to_mc_pct < self._cfg.program_to_market_cap_pct: 

176 self._logger.debug({ 

177 "event": "breakout_rejected", "code": code, "reason": "low_program_to_market_cap", 

178 "pg_to_mc_pct": round(pg_to_mc_pct, 2), "threshold": self._cfg.program_to_market_cap_pct 

179 }) 

180 return None 

181 

182 self._logger.debug({"event": "smart_money_passed", "code": code, "pg_buy_amount": pg_buy_amount}) 

183 

184 # 🌟 [최종 관문] 매수 직전 체결강도 스냅샷 (>=120%) 🌟 

185 # 이 관문까지 살아서 내려왔다면 조건이 완벽하게 맞은 상태입니다. 

186 # 매수 버튼을 누르기 직전, '주식현재가 체결(inquire-ccnl)' API를 1회 쏴서 체결강도를 확인합니다. 

187 cgld_val = 0.0 

188 try: 

189 ccnl_resp = await self._sqs.get_stock_conclusion(code) 

190 if ccnl_resp and ccnl_resp.rt_cd == "0": 190 ↛ 201line 190 didn't jump to line 201 because the condition on line 190 was always true

191 ccnl_output = ccnl_resp.data.get("output") if isinstance(ccnl_resp.data, dict) else None 

192 if ccnl_output and isinstance(ccnl_output, list) and len(ccnl_output) > 0: 192 ↛ 201line 192 didn't jump to line 201 because the condition on line 192 was always true

193 # output은 체결 내역 배열 → 첫 번째(최신) 체결의 당일 체결강도 사용 

194 val = ccnl_output[0].get("tday_rltv") 

195 cgld_val = float(val) if val else 0.0 

196 except Exception as e: 

197 self._logger.warning({"event": "cgld_check_failed", "code": code, "error": str(e)}) 

198 # 실패 시 안전을 위해 매수 보류하거나, 정책에 따라 통과시킬 수 있음. 여기서는 보류(None) 

199 return None 

200 

201 if cgld_val < 120.0: 

202 self._logger.debug({"event": "breakout_rejected", "code": code, "reason": "low_execution_strength", "cgld": cgld_val}) 

203 return None 

204 

205 # ========= 모든 관문 통과! 매수 시그널 생성 ========= 

206 qty = self._calculate_qty(current) 

207 self._position_state[code] = OSBPositionState( 

208 entry_price=current, 

209 entry_date=self._tm.get_current_kst_time().strftime("%Y%m%d"), 

210 peak_price=current, 

211 breakout_level=item.high_20d 

212 ) 

213 self._save_state() 

214 

215 pg_ratio = (pg_buy_amount / trade_value * 100) if trade_value > 0 else 0.0 

216 vol_ratio = (proj_vol / item.avg_vol_20d * 100) if item.avg_vol_20d > 0 else 0.0 

217 

218 reason_msg = ( 

219 f"오닐돌파(돌파 {current:,}>{item.high_20d:,}, " 

220 f"예상거래 {vol_ratio:.0f}%, " 

221 f"PG매수 {pg_buy_amount//100_000_000:,}억({pg_ratio:.1f}%), " 

222 f"체결강도 {cgld_val:.1f}%)" 

223 ) 

224 

225 self._logger.info({ 

226 "event": "buy_signal_generated", 

227 "code": code, 

228 "name": item.name, 

229 "price": current, 

230 "reason": reason_msg 

231 }) 

232 

233 return TradeSignal( 

234 code=code, name=item.name, action="BUY", price=current, qty=qty, 

235 reason=reason_msg, strategy_name=self.name 

236 ) 

237 

238 def _check_trend_break(self, code: str, current_price: int, current_vol: int, ohlcv: list) -> tuple[bool, str]: 

239 """추세 이탈 검사 (10일선 붕괴 + 대량 거래량 동반). ohlcv는 호출자가 미리 조회해서 전달.""" 

240 period = self._cfg.trend_exit_ma_period # 10일 

241 

242 if not ohlcv or len(ohlcv) < period: 242 ↛ 243line 242 didn't jump to line 243 because the condition on line 242 was never true

243 return False, "" 

244 

245 closes = [r.get("close", 0) for r in ohlcv if r.get("close")] 

246 volumes = [r.get("volume", 0) for r in ohlcv if r.get("volume")] 

247 

248 # 2. 10일 이동평균선 계산 

249 ma_10d = sum(closes[-period:]) / period 

250 

251 # 🚨 가격 조건: 현재가가 10일선을 깼는가? (안 깼으면 안전하므로 바로 리턴) 

252 if current_price >= ma_10d: 252 ↛ 253line 252 didn't jump to line 253 because the condition on line 252 was never true

253 return False, "" 

254 

255 # 3. 거래량 조건 검증 (현재가가 10일선을 깬 상태에서만 계산) 

256 avg_vol_20d = sum(volumes[-20:]) / 20 if len(volumes) >= 20 else sum(volumes) / len(volumes) 

257 

258 progress = self._get_market_progress_ratio() 

259 effective_progress = max(progress, 0.05) # 뻥튀기 방어 (최소 5% 진행 보장) 

260 proj_vol = current_vol / effective_progress 

261 

262 # 🚨 거래량 조건: 장중 환산(예상) 거래량이 평소 20일 평균보다 많은가? (기관 매도 징후) 

263 if proj_vol > avg_vol_20d: 

264 reason = f"추세이탈(10MA {ma_10d:,.0f} 붕괴+대량거래)" 

265 self._logger.warning({ 

266 "event": "trend_break_triggered", 

267 "code": code, 

268 "price": current_price, 

269 "ma_10d": round(ma_10d, 0), 

270 "proj_vol": int(proj_vol), 

271 "avg_vol": int(avg_vol_20d) 

272 }) 

273 return True, reason 

274 

275 return False, "" 

276 

277 async def check_exits(self, holdings: List[dict]) -> List[TradeSignal]: 

278 signals = [] 

279 state_dirty = False 

280 ohlcv_limit = max(self._cfg.time_stop_days + 20, max(self._cfg.trend_exit_ma_period, 20)) 

281 

282 for hold in holdings: 

283 code = hold.get("code") 

284 buy_price = hold.get("buy_price") 

285 if not code or not buy_price: continue 285 ↛ 282line 285 didn't jump to line 282 because the continue on line 285 wasn't executed

286 

287 state = self._position_state.get(code) 

288 if not state: 288 ↛ 289line 288 didn't jump to line 289 because the condition on line 288 was never true

289 state = OSBPositionState(buy_price, "", buy_price, buy_price) 

290 self._position_state[code] = state 

291 

292 resp = await self._sqs.get_current_price(code, caller=self.name) 

293 if not resp or resp.rt_cd != "0": continue 

294 

295 output = resp.data.get("output") if isinstance(resp.data, dict) else None 

296 if not output: continue 296 ↛ 282line 296 didn't jump to line 282 because the continue on line 296 wasn't executed

297 

298 if isinstance(output, dict): 298 ↛ 302line 298 didn't jump to line 302 because the condition on line 298 was always true

299 current = int(output.get("stck_prpr", 0)) 

300 current_vol = int(output.get("acml_vol", 0)) 

301 else: 

302 current = int(getattr(output, "stck_prpr", 0) or 0) 

303 current_vol = int(getattr(output, "acml_vol", 0) or 0) 

304 

305 if current <= 0: continue 305 ↛ 282line 305 didn't jump to line 282 because the continue on line 305 wasn't executed

306 

307 # 최고가 갱신 (dirty flag — 루프 후 1회 저장) 

308 if current > state.peak_price: 

309 state.peak_price = current 

310 state_dirty = True 

311 

312 pnl = (current - buy_price) / buy_price * 100 

313 reason = "" 

314 

315 # 1. 손절 

316 if pnl <= self._cfg.stop_loss_pct: 

317 reason = f"손절({pnl:.1f}%)" 

318 # 2. 트레일링 스탑 

319 elif state.peak_price > 0: 319 ↛ 325line 319 didn't jump to line 325 because the condition on line 319 was always true

320 drop = (current - state.peak_price) / state.peak_price * 100 

321 if drop <= -self._cfg.trailing_stop_pct: 

322 reason = f"트레일링스탑({drop:.1f}%)" 

323 

324 # 3·4. 시간손절 + 추세이탈 — OHLCV 1회 조회 후 양쪽에 전달 

325 if not reason: 

326 ohlcv_resp = await self._sqs.get_recent_daily_ohlcv(code, limit=ohlcv_limit) 

327 ohlcv = ohlcv_resp.data if ohlcv_resp and ohlcv_resp.rt_cd == ErrorCode.SUCCESS.value else [] 

328 

329 if self._check_time_stop(state, current, ohlcv): 329 ↛ 330line 329 didn't jump to line 330 because the condition on line 329 was never true

330 reason = f"시간손절({self._cfg.time_stop_days}일 횡보)" 

331 elif ohlcv: 

332 is_break, break_reason = self._check_trend_break(code, current, current_vol, ohlcv) 

333 if is_break: 

334 reason = break_reason 

335 

336 # 매도 시그널 생성 

337 if reason: 

338 holding_qty = int(hold.get("qty", 1)) 

339 self._position_state.pop(code, None) 

340 state_dirty = True 

341 signals.append(TradeSignal( 

342 code=code, name=hold.get("name", code), action="SELL", 

343 price=current, qty=holding_qty, reason=reason, strategy_name=self.name 

344 )) 

345 

346 if state_dirty: 

347 self._save_state() 

348 return signals 

349 

350 def _check_time_stop(self, state: OSBPositionState, current_price: int, ohlcv: list) -> bool: 

351 """시간 손절 조건 체크. ohlcv는 호출자가 미리 조회해서 전달. 

352 

353 조건: 

354 1. 진입 후 N거래일(time_stop_days) 경과 

355 2. 현재가가 진입가 대비 박스권(time_stop_box_range_pct) 이내 횡보 

356 3. 진입 후 시세 분출 이력(peak_price 급등)이 없어야 함 

357 """ 

358 if not state.entry_date or state.entry_price <= 0: 358 ↛ 359line 358 didn't jump to line 359 because the condition on line 358 was never true

359 return False 

360 

361 today_str = self._tm.get_current_kst_time().strftime("%Y%m%d") 

362 if state.entry_date.replace("-", "") == today_str: 

363 return False 

364 

365 if not ohlcv: 

366 return False 

367 

368 trading_days = 0 

369 safe_entry_date = state.entry_date.replace("-", "") 

370 

371 # 🌟 버그 수정: == 대신 >= 를 사용하여 하이픈 제거 및 진입일 이후 데이터 필터링 

372 for candle in ohlcv: 

373 date_str = str(candle.get('date', '')).replace("-", "") 

374 if date_str > safe_entry_date: # 진입일 '다음 날'부터 1일로 카운트 

375 trading_days += 1 

376 

377 # 설정된 거래일이 안 지났으면 패스 

378 if trading_days < self._cfg.time_stop_days: 

379 return False 

380 

381 # 2. 횡보 또는 하락 조건 확인 (현재가가 박스권 상단 이상으로 치고 나가지 못했는가?) 

382 pnl_pct = (current_price - state.entry_price) / state.entry_price * 100 

383 

384 # 🌟 버그 수정: abs() 제거. 2% 이상 '상승'한 게 아니라면 다 잘라버림 (하락 포함) 

385 if pnl_pct > self._cfg.time_stop_box_range_pct: 

386 return False 

387 

388 # 3. '찍고 내려온 놈' 제외 (최고가가 진입가 대비 크게 오르지 않았어야 함) 

389 peak_pnl_pct = (state.peak_price - state.entry_price) / state.entry_price * 100 

390 if peak_pnl_pct > (self._cfg.time_stop_box_range_pct * 2.5): 

391 return False 

392 

393 self._logger.info({ 

394 "event": "time_stop_triggered", 

395 "entry_date": state.entry_date, 

396 "trading_days": trading_days, 

397 "pnl_pct": round(pnl_pct, 2) 

398 }) 

399 return True 

400 

401 def _calculate_qty(self, price: int) -> int: 

402 if price <= 0: return 1 

403 budget = self._cfg.total_portfolio_krw * (self._cfg.position_size_pct / 100) 

404 return max(int(budget / price), 1) 

405 

406 def _get_market_progress_ratio(self) -> float: 

407 now = self._tm.get_current_kst_time() 

408 open_t = self._tm.get_market_open_time() 

409 close_t = self._tm.get_market_close_time() 

410 total = (close_t - open_t).total_seconds() 

411 elapsed = (now - open_t).total_seconds() 

412 return min(elapsed / total, 1.0) if total > 0 else 0.0 

413 

414 def _load_state(self): 

415 if os.path.exists(self.STATE_FILE): 

416 try: 

417 with open(self.STATE_FILE, "r") as f: 

418 data = json.load(f) 

419 for k, v in data.items(): 

420 self._position_state[k] = OSBPositionState(**v) 

421 except: pass 

422 

423 def _save_state(self): 

424 try: 

425 os.makedirs(os.path.dirname(self.STATE_FILE), exist_ok=True) 

426 data = {k: asdict(v) for k, v in self._position_state.items()} 

427 with open(self.STATE_FILE, "w") as f: 

428 json.dump(data, f, indent=2) 

429 except: pass