Coverage for strategies / oneil_squeeze_breakout_strategy.py: 92%
235 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
1# strategies/oneil/breakout_strategy.py
2from __future__ import annotations
4import asyncio
5import logging
6import os
7import json
8from dataclasses import dataclass, asdict
9from typing import List, Optional, Dict
11from interfaces.live_strategy import LiveStrategy
12from common.types import TradeSignal, ErrorCode
13from services.stock_query_service import StockQueryService
14from core.market_clock import MarketClock
15from strategies.oneil_common_types import OneilBreakoutConfig, OSBPositionState
16from services.oneil_universe_service import OneilUniverseService
17from core.logger import get_strategy_logger
20class OneilSqueezeBreakoutStrategy(LiveStrategy):
21 """오닐식 스퀴즈 주도주 돌파매매 (O'Neil Squeeze Breakout).
23 핵심: 시장 주도주 중 볼린저 밴드가 극도로 수축(스퀴즈)된 종목이
24 거래량을 동반하며 20일 최고가를 돌파할 때 매수.
25 프로그램 순매수 필터(2중 스마트 머니)로 기관 수급 확인.
27 특징:
28 - 유니버스 관리(종목 발굴)는 OneilUniverseService에 위임.
29 - 이 클래스는 '언제 살까(돌파)'와 '언제 팔까(청산)'에만 집중.
31 [v1 범위]
32 - 유니버스: get_top_trading_value_stocks() → 기본 필터(거래대금/52주고가/정배열)
33 - 매수/매도 뼈대 구축 및 스케줄러 연동 완료
35 [v2 완료 사항]
36 - 아키텍처: OneilUniverseService 완전 분리 및 메모리 캐싱 (API 중복 호출 방지)
37 - 유니버스: Pool A(장 마감 후 배치) / Pool B(장중 3중 그물망 실시간 발굴) 병합
38 - 스코어링: RS(3개월 상대강도 상위10% → +30점), 영업이익 25% 이상 증가 → +20점 적용
39 - 마켓타이밍: ETF 프록시(KODEX 200/코스닥150) 20일선 3일 연속 우상향 로직 적용
41 [v3 예정 (TODO)]
42 - 스코어링 고도화: 업종 소분류 주도 (테마 대장주) 키워드 매칭 스코어링 (+20점) 추가
43 - 마켓타이밍 고도화: 코스닥/코스피 지수 직접 조회 API 연동 (ETF 프록시 대체)
44 - 실시간 호가창(Websocket) 연동을 통한 초단위 고래(>=5000만원) 탐지 (현재는 부하 이슈로 보류)
45 """
46 STATE_FILE = os.path.join("data", "osb_position_state.json")
48 def __init__(
49 self,
50 stock_query_service: StockQueryService,
51 universe_service: OneilUniverseService,
52 market_clock: MarketClock,
53 config: Optional[OneilBreakoutConfig] = None,
54 logger: Optional[logging.Logger] = None,
55 ):
56 self._sqs = stock_query_service
57 self._universe = universe_service
58 self._tm = market_clock
59 self._cfg = config or OneilBreakoutConfig()
60 if logger:
61 self._logger = logger
62 else:
63 self._logger = get_strategy_logger("OneilSqueezeBreakout")
65 self._position_state: Dict[str, OSBPositionState] = {}
66 self._load_state()
68 @property
69 def name(self) -> str:
70 return "오닐스퀴즈돌파"
72 async def scan(self) -> List[TradeSignal]:
73 signals: List[TradeSignal] = []
74 self._logger.info({"event": "scan_started", "strategy_name": self.name})
76 # 1. 유니버스 서비스로부터 완성된 워치리스트 획득 (캐싱됨)
77 watchlist = await self._universe.get_watchlist(logger=self._logger)
78 if not watchlist:
79 self._logger.info({"event": "scan_skipped", "reason": "Watchlist is empty"})
80 return signals
82 self._logger.info({"event": "scan_with_watchlist", "count": len(watchlist)})
84 # 2. 장중 경과 비율 (거래량 환산용)
85 market_progress = self._get_market_progress_ratio()
86 if market_progress <= 0:
87 self._logger.info({"event": "scan_skipped", "reason": "Market not open or just started"})
88 return signals
90 # 3. 마켓 타이밍 사전 체크 (루프 내 중복 await 방지)
91 market_timing = {
92 "KOSPI": await self._universe.is_market_timing_ok("KOSPI", logger=self._logger),
93 "KOSDAQ": await self._universe.is_market_timing_ok("KOSDAQ", logger=self._logger)
94 }
95 if not any(market_timing.values()):
96 self._logger.info({"event": "scan_skipped", "reason": "Bad market timing for both markets"})
97 return signals
99 # 4. 종목별 돌파 체크 (청크 기반 병렬 처리, TPS 제한 대응)
100 candidates = [
101 (code, item) for code, item in watchlist.items()
102 if code not in self._position_state
103 and market_timing.get(item.market, False)
104 ]
105 for i in range(0, len(candidates), 10):
106 chunk = candidates[i:i + 10]
107 results = await asyncio.gather(
108 *[self._check_breakout(code, item, market_progress) for code, item in chunk],
109 return_exceptions=True,
110 )
111 for result in results:
112 if isinstance(result, Exception):
113 self._logger.error(f"Scan error: {result}")
114 elif result:
115 signals.append(result)
117 self._logger.info({"event": "scan_finished", "signals_found": len(signals)})
118 return signals
120 async def _check_breakout(self, code, item, progress) -> Optional[TradeSignal]:
121 # 1. 기본 시세 및 프로그램 수급 조회
122 resp = await self._sqs.get_current_price(code, caller=self.name)
123 if not resp or resp.rt_cd != "0": return None
125 out = resp.data.get("output") if isinstance(resp.data, dict) else None
126 if not out: return None 126 ↛ exitline 126 didn't return from function '_check_breakout' because the return on line 126 wasn't executed
128 if isinstance(out, dict):
129 current = int(out.get("stck_prpr", 0))
130 vol = int(out.get("acml_vol", 0))
131 pg_buy = int(out.get("pgtr_ntby_qty", 0))
132 trade_value = int(out.get("acml_tr_pbmn", 0))
133 else:
134 current = int(getattr(out, "stck_prpr", 0) or 0)
135 vol = int(getattr(out, "acml_vol", 0) or 0)
136 pg_buy = int(getattr(out, "pgtr_ntby_qty", 0) or 0)
137 trade_value = int(getattr(out, "acml_tr_pbmn", 0) or 0)
139 # 🚨 [관문 1] 가격 돌파
140 if current <= item.high_20d:
141 # 너무 많은 로그를 피하기 위해 이 단계는 로그 생략
142 return None
144 # 🚨 [관문 2] 거래량 돌파 (+ 뻥튀기 방어)
145 effective_progress = max(progress, 0.05) # 장 초반 최소 5% 진행 보장
146 proj_vol = vol / effective_progress
148 if vol < (item.avg_vol_20d * 0.3): # 최소 절대 거래량 (평소 20일 평균의 30%) 미달 시 가짜 돌파
149 self._logger.debug({"event": "breakout_rejected", "code": code, "reason": "low_absolute_volume", "vol": vol, "min_required": item.avg_vol_20d * 0.3})
150 return None
151 if proj_vol < item.avg_vol_20d * self._cfg.volume_breakout_multiplier:
152 self._logger.debug({"event": "breakout_rejected", "code": code, "reason": "insufficient_projected_volume", "proj_vol": int(proj_vol), "threshold": item.avg_vol_20d * self._cfg.volume_breakout_multiplier})
153 return None
155 # 🚨 [관문 3] 스마트 머니(프로그램 수급) 상세 필터
156 if pg_buy <= self._cfg.program_net_buy_min:
157 self._logger.debug({"event": "breakout_rejected", "code": code, "reason": "low_program_net_buy", "pg_buy": pg_buy, "min_required": self._cfg.program_net_buy_min})
158 return None
160 pg_buy_amount = pg_buy * current # 프로그램 순매수 금액 (추정)
162 # 3-1. 거래대금의 10% 이상 개입했는가?
163 if trade_value > 0: 163 ↛ 173line 163 didn't jump to line 173 because the condition on line 163 was always true
164 pg_to_tv_pct = pg_buy_amount / trade_value * 100
165 if pg_to_tv_pct < self._cfg.program_to_trade_value_pct:
166 self._logger.debug({
167 "event": "breakout_rejected", "code": code, "reason": "low_program_to_trade_value",
168 "pg_to_tv_pct": round(pg_to_tv_pct, 2), "threshold": self._cfg.program_to_trade_value_pct
169 })
170 return None
172 # 3-2. 시가총액의 0.5% 이상 개입했는가?
173 if item.market_cap > 0: 173 ↛ 182line 173 didn't jump to line 182 because the condition on line 173 was always true
174 pg_to_mc_pct = pg_buy_amount / item.market_cap * 100
175 if pg_to_mc_pct < self._cfg.program_to_market_cap_pct:
176 self._logger.debug({
177 "event": "breakout_rejected", "code": code, "reason": "low_program_to_market_cap",
178 "pg_to_mc_pct": round(pg_to_mc_pct, 2), "threshold": self._cfg.program_to_market_cap_pct
179 })
180 return None
182 self._logger.debug({"event": "smart_money_passed", "code": code, "pg_buy_amount": pg_buy_amount})
184 # 🌟 [최종 관문] 매수 직전 체결강도 스냅샷 (>=120%) 🌟
185 # 이 관문까지 살아서 내려왔다면 조건이 완벽하게 맞은 상태입니다.
186 # 매수 버튼을 누르기 직전, '주식현재가 체결(inquire-ccnl)' API를 1회 쏴서 체결강도를 확인합니다.
187 cgld_val = 0.0
188 try:
189 ccnl_resp = await self._sqs.get_stock_conclusion(code)
190 if ccnl_resp and ccnl_resp.rt_cd == "0": 190 ↛ 201line 190 didn't jump to line 201 because the condition on line 190 was always true
191 ccnl_output = ccnl_resp.data.get("output") if isinstance(ccnl_resp.data, dict) else None
192 if ccnl_output and isinstance(ccnl_output, list) and len(ccnl_output) > 0: 192 ↛ 201line 192 didn't jump to line 201 because the condition on line 192 was always true
193 # output은 체결 내역 배열 → 첫 번째(최신) 체결의 당일 체결강도 사용
194 val = ccnl_output[0].get("tday_rltv")
195 cgld_val = float(val) if val else 0.0
196 except Exception as e:
197 self._logger.warning({"event": "cgld_check_failed", "code": code, "error": str(e)})
198 # 실패 시 안전을 위해 매수 보류하거나, 정책에 따라 통과시킬 수 있음. 여기서는 보류(None)
199 return None
201 if cgld_val < 120.0:
202 self._logger.debug({"event": "breakout_rejected", "code": code, "reason": "low_execution_strength", "cgld": cgld_val})
203 return None
205 # ========= 모든 관문 통과! 매수 시그널 생성 =========
206 qty = self._calculate_qty(current)
207 self._position_state[code] = OSBPositionState(
208 entry_price=current,
209 entry_date=self._tm.get_current_kst_time().strftime("%Y%m%d"),
210 peak_price=current,
211 breakout_level=item.high_20d
212 )
213 self._save_state()
215 pg_ratio = (pg_buy_amount / trade_value * 100) if trade_value > 0 else 0.0
216 vol_ratio = (proj_vol / item.avg_vol_20d * 100) if item.avg_vol_20d > 0 else 0.0
218 reason_msg = (
219 f"오닐돌파(돌파 {current:,}>{item.high_20d:,}, "
220 f"예상거래 {vol_ratio:.0f}%, "
221 f"PG매수 {pg_buy_amount//100_000_000:,}억({pg_ratio:.1f}%), "
222 f"체결강도 {cgld_val:.1f}%)"
223 )
225 self._logger.info({
226 "event": "buy_signal_generated",
227 "code": code,
228 "name": item.name,
229 "price": current,
230 "reason": reason_msg
231 })
233 return TradeSignal(
234 code=code, name=item.name, action="BUY", price=current, qty=qty,
235 reason=reason_msg, strategy_name=self.name
236 )
238 def _check_trend_break(self, code: str, current_price: int, current_vol: int, ohlcv: list) -> tuple[bool, str]:
239 """추세 이탈 검사 (10일선 붕괴 + 대량 거래량 동반). ohlcv는 호출자가 미리 조회해서 전달."""
240 period = self._cfg.trend_exit_ma_period # 10일
242 if not ohlcv or len(ohlcv) < period: 242 ↛ 243line 242 didn't jump to line 243 because the condition on line 242 was never true
243 return False, ""
245 closes = [r.get("close", 0) for r in ohlcv if r.get("close")]
246 volumes = [r.get("volume", 0) for r in ohlcv if r.get("volume")]
248 # 2. 10일 이동평균선 계산
249 ma_10d = sum(closes[-period:]) / period
251 # 🚨 가격 조건: 현재가가 10일선을 깼는가? (안 깼으면 안전하므로 바로 리턴)
252 if current_price >= ma_10d: 252 ↛ 253line 252 didn't jump to line 253 because the condition on line 252 was never true
253 return False, ""
255 # 3. 거래량 조건 검증 (현재가가 10일선을 깬 상태에서만 계산)
256 avg_vol_20d = sum(volumes[-20:]) / 20 if len(volumes) >= 20 else sum(volumes) / len(volumes)
258 progress = self._get_market_progress_ratio()
259 effective_progress = max(progress, 0.05) # 뻥튀기 방어 (최소 5% 진행 보장)
260 proj_vol = current_vol / effective_progress
262 # 🚨 거래량 조건: 장중 환산(예상) 거래량이 평소 20일 평균보다 많은가? (기관 매도 징후)
263 if proj_vol > avg_vol_20d:
264 reason = f"추세이탈(10MA {ma_10d:,.0f} 붕괴+대량거래)"
265 self._logger.warning({
266 "event": "trend_break_triggered",
267 "code": code,
268 "price": current_price,
269 "ma_10d": round(ma_10d, 0),
270 "proj_vol": int(proj_vol),
271 "avg_vol": int(avg_vol_20d)
272 })
273 return True, reason
275 return False, ""
277 async def check_exits(self, holdings: List[dict]) -> List[TradeSignal]:
278 signals = []
279 state_dirty = False
280 ohlcv_limit = max(self._cfg.time_stop_days + 20, max(self._cfg.trend_exit_ma_period, 20))
282 for hold in holdings:
283 code = hold.get("code")
284 buy_price = hold.get("buy_price")
285 if not code or not buy_price: continue 285 ↛ 282line 285 didn't jump to line 282 because the continue on line 285 wasn't executed
287 state = self._position_state.get(code)
288 if not state: 288 ↛ 289line 288 didn't jump to line 289 because the condition on line 288 was never true
289 state = OSBPositionState(buy_price, "", buy_price, buy_price)
290 self._position_state[code] = state
292 resp = await self._sqs.get_current_price(code, caller=self.name)
293 if not resp or resp.rt_cd != "0": continue
295 output = resp.data.get("output") if isinstance(resp.data, dict) else None
296 if not output: continue 296 ↛ 282line 296 didn't jump to line 282 because the continue on line 296 wasn't executed
298 if isinstance(output, dict): 298 ↛ 302line 298 didn't jump to line 302 because the condition on line 298 was always true
299 current = int(output.get("stck_prpr", 0))
300 current_vol = int(output.get("acml_vol", 0))
301 else:
302 current = int(getattr(output, "stck_prpr", 0) or 0)
303 current_vol = int(getattr(output, "acml_vol", 0) or 0)
305 if current <= 0: continue 305 ↛ 282line 305 didn't jump to line 282 because the continue on line 305 wasn't executed
307 # 최고가 갱신 (dirty flag — 루프 후 1회 저장)
308 if current > state.peak_price:
309 state.peak_price = current
310 state_dirty = True
312 pnl = (current - buy_price) / buy_price * 100
313 reason = ""
315 # 1. 손절
316 if pnl <= self._cfg.stop_loss_pct:
317 reason = f"손절({pnl:.1f}%)"
318 # 2. 트레일링 스탑
319 elif state.peak_price > 0: 319 ↛ 325line 319 didn't jump to line 325 because the condition on line 319 was always true
320 drop = (current - state.peak_price) / state.peak_price * 100
321 if drop <= -self._cfg.trailing_stop_pct:
322 reason = f"트레일링스탑({drop:.1f}%)"
324 # 3·4. 시간손절 + 추세이탈 — OHLCV 1회 조회 후 양쪽에 전달
325 if not reason:
326 ohlcv_resp = await self._sqs.get_recent_daily_ohlcv(code, limit=ohlcv_limit)
327 ohlcv = ohlcv_resp.data if ohlcv_resp and ohlcv_resp.rt_cd == ErrorCode.SUCCESS.value else []
329 if self._check_time_stop(state, current, ohlcv): 329 ↛ 330line 329 didn't jump to line 330 because the condition on line 329 was never true
330 reason = f"시간손절({self._cfg.time_stop_days}일 횡보)"
331 elif ohlcv:
332 is_break, break_reason = self._check_trend_break(code, current, current_vol, ohlcv)
333 if is_break:
334 reason = break_reason
336 # 매도 시그널 생성
337 if reason:
338 holding_qty = int(hold.get("qty", 1))
339 self._position_state.pop(code, None)
340 state_dirty = True
341 signals.append(TradeSignal(
342 code=code, name=hold.get("name", code), action="SELL",
343 price=current, qty=holding_qty, reason=reason, strategy_name=self.name
344 ))
346 if state_dirty:
347 self._save_state()
348 return signals
350 def _check_time_stop(self, state: OSBPositionState, current_price: int, ohlcv: list) -> bool:
351 """시간 손절 조건 체크. ohlcv는 호출자가 미리 조회해서 전달.
353 조건:
354 1. 진입 후 N거래일(time_stop_days) 경과
355 2. 현재가가 진입가 대비 박스권(time_stop_box_range_pct) 이내 횡보
356 3. 진입 후 시세 분출 이력(peak_price 급등)이 없어야 함
357 """
358 if not state.entry_date or state.entry_price <= 0: 358 ↛ 359line 358 didn't jump to line 359 because the condition on line 358 was never true
359 return False
361 today_str = self._tm.get_current_kst_time().strftime("%Y%m%d")
362 if state.entry_date.replace("-", "") == today_str:
363 return False
365 if not ohlcv:
366 return False
368 trading_days = 0
369 safe_entry_date = state.entry_date.replace("-", "")
371 # 🌟 버그 수정: == 대신 >= 를 사용하여 하이픈 제거 및 진입일 이후 데이터 필터링
372 for candle in ohlcv:
373 date_str = str(candle.get('date', '')).replace("-", "")
374 if date_str > safe_entry_date: # 진입일 '다음 날'부터 1일로 카운트
375 trading_days += 1
377 # 설정된 거래일이 안 지났으면 패스
378 if trading_days < self._cfg.time_stop_days:
379 return False
381 # 2. 횡보 또는 하락 조건 확인 (현재가가 박스권 상단 이상으로 치고 나가지 못했는가?)
382 pnl_pct = (current_price - state.entry_price) / state.entry_price * 100
384 # 🌟 버그 수정: abs() 제거. 2% 이상 '상승'한 게 아니라면 다 잘라버림 (하락 포함)
385 if pnl_pct > self._cfg.time_stop_box_range_pct:
386 return False
388 # 3. '찍고 내려온 놈' 제외 (최고가가 진입가 대비 크게 오르지 않았어야 함)
389 peak_pnl_pct = (state.peak_price - state.entry_price) / state.entry_price * 100
390 if peak_pnl_pct > (self._cfg.time_stop_box_range_pct * 2.5):
391 return False
393 self._logger.info({
394 "event": "time_stop_triggered",
395 "entry_date": state.entry_date,
396 "trading_days": trading_days,
397 "pnl_pct": round(pnl_pct, 2)
398 })
399 return True
401 def _calculate_qty(self, price: int) -> int:
402 if price <= 0: return 1
403 budget = self._cfg.total_portfolio_krw * (self._cfg.position_size_pct / 100)
404 return max(int(budget / price), 1)
406 def _get_market_progress_ratio(self) -> float:
407 now = self._tm.get_current_kst_time()
408 open_t = self._tm.get_market_open_time()
409 close_t = self._tm.get_market_close_time()
410 total = (close_t - open_t).total_seconds()
411 elapsed = (now - open_t).total_seconds()
412 return min(elapsed / total, 1.0) if total > 0 else 0.0
414 def _load_state(self):
415 if os.path.exists(self.STATE_FILE):
416 try:
417 with open(self.STATE_FILE, "r") as f:
418 data = json.load(f)
419 for k, v in data.items():
420 self._position_state[k] = OSBPositionState(**v)
421 except: pass
423 def _save_state(self):
424 try:
425 os.makedirs(os.path.dirname(self.STATE_FILE), exist_ok=True)
426 data = {k: asdict(v) for k, v in self._position_state.items()}
427 with open(self.STATE_FILE, "w") as f:
428 json.dump(data, f, indent=2)
429 except: pass