Coverage for strategies / first_pullback_strategy.py: 91%
288 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
1# strategies/first_pullback_strategy.py
2from __future__ import annotations
4import asyncio
5import logging
6import os
7import json
8from dataclasses import asdict
9from datetime import timedelta
10from typing import List, Optional, Dict, Tuple
12from interfaces.live_strategy import LiveStrategy
13from common.types import TradeSignal
14from services.stock_query_service import StockQueryService
15from core.market_clock import MarketClock
16from strategies.first_pullback_types import FirstPullbackConfig, FPPositionState
17from services.oneil_universe_service import OneilUniverseService
18from core.logger import get_strategy_logger
class FirstPullbackStrategy(LiveStrategy):
    """First-pullback ("Holy Grail") strategy for market-leading stocks.

    Idea: buy a leader that surged, pulled back in an orderly way to its
    moving average on dried-up volume, and is now printing a rebound.

    Entry filter. Every threshold is read from ``FirstPullbackConfig``; the
    figures in parentheses are the ones quoted by the original docstring and
    are not verified against the config defaults here.

        Phase 1 (Setup):    watchlist stock with a surge history (limit-up
                            close, or a rapid multi-day rally of +30%) and a
                            moving average (20MA) rising on consecutive
                            days (5).
        Phase 2 (Pullback): today's low inside a band around the MA
                            (-1% ~ +3%) and recent average volume at or
                            below a fraction of the surge volume (50%).
        Phase 3 (Trigger):  bullish candle or break of the previous day's
                            high, plus execution strength at or above the
                            configured minimum (100%).

    Exit rules:
        1. Stop-loss: price below ``MA * (1 + stop_loss_below_ma_pct / 100)``
           -> sell the whole remaining position.
        2. Take-profit: every time the price gains ``take_profit_lower_pct``
           over the last partial-sell price (or the buy price when nothing
           has been sold yet), sell ``partial_sell_ratio`` of the holding.
    """

    STATE_FILE = os.path.join("data", "fp_position_state.json")

    # Number of candidates whose entry checks are awaited concurrently.
    SCAN_BATCH_SIZE = 10

    def __init__(
        self,
        stock_query_service: StockQueryService,
        universe_service: OneilUniverseService,
        market_clock: MarketClock,
        config: Optional[FirstPullbackConfig] = None,
        logger: Optional[logging.Logger] = None,
    ):
        """Wire up collaborators and restore persisted position state.

        Args:
            stock_query_service: Source of current prices, daily OHLCV and
                conclusion (execution strength) data.
            universe_service: Provides the watchlist and market-timing checks.
            market_clock: Provides current KST time and market open/close.
            config: Strategy thresholds; a default ``FirstPullbackConfig()``
                is used when omitted.
            logger: Logger to use; a strategy logger is created when omitted.
        """
        self._sqs = stock_query_service
        self._universe = universe_service
        self._tm = market_clock
        self._cfg = config or FirstPullbackConfig()
        if logger is not None:
            self._logger = logger
        else:
            self._logger = get_strategy_logger("FirstPullback", sub_dir="oneil")

        self._position_state: Dict[str, FPPositionState] = {}
        self._load_state()

    @property
    def name(self) -> str:
        """Display name of the strategy (also used as the API caller tag)."""
        return "첫눌림목"

    # ── scan ────────────────────────────────────────────────────────

    async def scan(self) -> List[TradeSignal]:
        """Scan the watchlist and return BUY signals for stocks passing all phases.

        Codes already tracked in the position state, and codes whose market
        fails the timing check, are skipped. Entry checks run concurrently
        in batches of ``SCAN_BATCH_SIZE``; a failure in one check is logged
        and does not abort the scan.
        """
        signals: List[TradeSignal] = []
        self._logger.info({"event": "scan_started", "strategy_name": self.name})

        watchlist = await self._universe.get_watchlist(logger=self._logger)
        if not watchlist:
            self._logger.info({"event": "scan_skipped", "reason": "Watchlist is empty"})
            return signals

        self._logger.info({"event": "scan_with_watchlist", "count": len(watchlist)})

        market_progress = self._get_market_progress_ratio()
        if market_progress <= 0:
            self._logger.info({"event": "scan_skipped", "reason": "Market not open or just started"})
            return signals

        # Check market timing once per market up front.
        market_timing = {
            "KOSPI": await self._universe.is_market_timing_ok("KOSPI", logger=self._logger),
            "KOSDAQ": await self._universe.is_market_timing_ok("KOSDAQ", logger=self._logger),
        }
        if not any(market_timing.values()):
            self._logger.info({"event": "scan_skipped", "reason": "Bad market timing for both markets"})
            return signals

        candidates = [
            (code, item) for code, item in watchlist.items()
            if code not in self._position_state
            and market_timing.get(item.market, False)
        ]
        for offset in range(0, len(candidates), self.SCAN_BATCH_SIZE):
            chunk = candidates[offset:offset + self.SCAN_BATCH_SIZE]
            results = await asyncio.gather(
                *[self._check_entry(code, item, market_progress) for code, item in chunk],
                return_exceptions=True,
            )
            for result in results:
                # With return_exceptions=True a cancelled child shows up as a
                # CancelledError, which derives from BaseException rather than
                # Exception; it must never be mistaken for a signal.
                if isinstance(result, BaseException):
                    self._logger.error(f"Scan error: {result}")
                elif result:
                    signals.append(result)

        self._logger.info({"event": "scan_finished", "signals_found": len(signals)})
        return signals

    async def _check_entry(self, code, item, progress) -> Optional[TradeSignal]:
        """Run the Phase 1 -> 2 -> 3 entry filter for a single stock.

        Args:
            code: Stock code.
            item: Watchlist entry; only ``item.name`` is read here.
            progress: Market progress ratio supplied by ``scan`` (accepted
                for interface compatibility; not used by the filter).

        Returns:
            A BUY ``TradeSignal`` when every phase passes, otherwise ``None``.
            On success the position is also recorded and persisted.
        """
        # Fetch the current price first (the original notes this is done so
        # the following OHLCV lookup can use the cache).
        resp = await self._sqs.get_current_price(code, caller=self.name)
        if not resp or resp.rt_cd != "0":
            return None

        out = self._extract_output(resp)
        if not out:
            return None

        current = self._to_int(self._field(out, "stck_prpr"))
        today_open = self._to_int(self._field(out, "stck_oprc"))
        today_low = self._to_int(self._field(out, "stck_lwpr"))
        if current <= 0 or today_low <= 0:
            return None

        # ── Phase 1: Setup ── confirmed daily bars up to yesterday
        now = self._tm.get_current_kst_time()
        yesterday_str = (now - timedelta(days=1)).strftime("%Y%m%d")
        ohlcv_resp = await self._sqs.get_recent_daily_ohlcv(code, limit=30, end_date=yesterday_str)
        ohlcv = ohlcv_resp.data if ohlcv_resp and ohlcv_resp.rt_cd == "0" else []
        if not ohlcv or len(ohlcv) < 25:
            return None

        surge_result = self._check_surge_history(ohlcv)
        if not surge_result:
            self._logger.debug({"event": "entry_rejected", "code": code, "reason": "no_surge_history"})
            return None
        surge_volume, surge_day_high = surge_result

        if not self._check_ma_uptrend(ohlcv):
            self._logger.debug({"event": "entry_rejected", "code": code, "reason": "ma_not_uptrending"})
            return None

        # ── Phase 2: Pullback ── MA from the same confirmed bars
        ma_20d = self._latest_ma(ohlcv)
        if ma_20d <= 0:
            return None
        pullback_pct = (today_low - ma_20d) / ma_20d * 100

        if not self._check_pullback_to_ma(today_low, ma_20d):
            self._logger.debug({
                "event": "entry_rejected", "code": code, "reason": "pullback_out_of_range",
                "pullback_pct": round(pullback_pct, 2),
                "allowed_range": f"{self._cfg.pullback_lower_pct}% ~ {self._cfg.pullback_upper_pct}%"
            })
            return None

        vol_dryup_pct = self._volume_dryup_pct(ohlcv, surge_volume)
        if not self._check_volume_dryup(ohlcv, surge_volume):
            self._logger.debug({
                "event": "entry_rejected", "code": code, "reason": "volume_not_dry",
                "vol_dryup_pct": round(vol_dryup_pct, 2), "threshold_pct": self._cfg.volume_dryup_ratio * 100
            })
            return None

        # ── Phase 3: Trigger ──
        prev_high = ohlcv[-1].get("high", 0)
        if not self._check_bullish_reversal(current, today_open, prev_high):
            self._logger.debug({
                "event": "entry_rejected", "code": code, "reason": "no_bullish_reversal",
                "current": current, "today_open": today_open, "prev_high": prev_high
            })
            return None

        # Execution strength; any failure while fetching or parsing rejects the entry.
        cgld_val = 0.0
        try:
            ccnl_resp = await self._sqs.get_stock_conclusion(code)
            if ccnl_resp and ccnl_resp.rt_cd == "0":
                ccnl_output = self._extract_output(ccnl_resp)
                if ccnl_output and isinstance(ccnl_output, list):
                    val = ccnl_output[0].get("tday_rltv")
                    cgld_val = float(val) if val else 0.0
        except Exception as e:
            self._logger.warning({"event": "cgld_check_failed", "code": code, "error": str(e)})
            return None

        if cgld_val < self._cfg.execution_strength_min:
            self._logger.debug({
                "event": "entry_rejected", "code": code, "reason": "low_execution_strength",
                "cgld": cgld_val, "threshold": self._cfg.execution_strength_min
            })
            return None

        # ── All gates passed: record the position and emit the signal ──
        qty = self._calculate_qty(current)

        self._position_state[code] = FPPositionState(
            entry_price=current,
            entry_date=self._tm.get_current_kst_time().strftime("%Y%m%d"),
            peak_price=current,
            surge_day_high=surge_day_high,
        )
        self._save_state()

        reason_msg = (
            f"첫눌림목(20MA {ma_20d:,.0f} 지지({pullback_pct:+.1f}%), "
            f"거래고갈 {vol_dryup_pct:.0f}%(급등대비), "
            f"반등확인 {today_open:,}->{current:,}, "
            f"체결강도 {cgld_val:.1f}%)"
        )

        self._logger.info({
            "event": "buy_signal_generated",
            "code": code, "name": item.name,
            "price": current,
            "reason": reason_msg,
        })

        return TradeSignal(
            code=code, name=item.name, action="BUY", price=current, qty=qty,
            reason=reason_msg, strategy_name=self.name
        )

    # ── Phase 1 checks ──────────────────────────────────────────────

    def _check_surge_history(self, ohlcv: list) -> Optional[Tuple[int, int]]:
        """Look for a surge within the last ``surge_lookback_days`` bars.

        Condition A: a single-day close-to-close gain of at least
        ``upper_limit_pct`` percent (the first such day wins).
        Condition B: a gain of at least ``rapid_surge_pct`` percent across a
        window of ``rapid_surge_min_days``..``rapid_surge_max_days`` bars.

        Returns:
            ``(surge_volume, surge_day_high)`` or ``None`` when no surge is
            found. For condition B the volume is that of the highest-volume
            bar in the window and the high is the window's highest high.
        """
        lookback = min(self._cfg.surge_lookback_days, len(ohlcv))
        recent = ohlcv[-lookback:]

        # Condition A: limit-up style day, measured close to close.
        for prev_bar, bar in zip(recent, recent[1:]):
            prev_close = prev_bar.get("close", 0)
            curr_close = bar.get("close", 0)
            if prev_close > 0 and curr_close > 0:
                change = (curr_close - prev_close) / prev_close * 100
                if change >= self._cfg.upper_limit_pct:
                    return (bar.get("volume", 0), bar.get("high", curr_close))

        # Condition B: rapid multi-day rally.
        for window in range(self._cfg.rapid_surge_min_days, self._cfg.rapid_surge_max_days + 1):
            for start in range(len(recent) - window):
                end = start + window
                start_close = recent[start].get("close", 0)
                end_close = recent[end].get("close", 0)
                if start_close > 0 and end_close > 0:
                    change = (end_close - start_close) / start_close * 100
                    if change >= self._cfg.rapid_surge_pct:
                        # Reference bar: the highest-volume day in the window.
                        window_slice = recent[start:end + 1]
                        max_vol_day = max(window_slice, key=lambda r: r.get("volume", 0))
                        surge_high = max(r.get("high", 0) for r in window_slice)
                        return (max_vol_day.get("volume", 0), surge_high)

        return None

    def _check_ma_uptrend(self, ohlcv: list) -> bool:
        """Return True when the MA rose strictly on each of the last ``ma_rising_days`` bars.

        Bars without a truthy ``close`` are ignored. ``ma_period +
        ma_rising_days`` closes are required; with fewer the result is False.
        """
        closes = [r.get("close", 0) for r in ohlcv if r.get("close")]
        period = self._cfg.ma_period
        rising_days = self._cfg.ma_rising_days
        if len(closes) < period + rising_days:
            return False

        # MA series in chronological order: oldest (rising_days bars back)
        # first, the most recent bar last.
        total = len(closes)
        ma_values = [
            sum(closes[total - back - period:total - back]) / period
            for back in range(rising_days, -1, -1)
        ]
        return all(later > earlier for earlier, later in zip(ma_values, ma_values[1:]))

    # ── Phase 2 checks ──────────────────────────────────────────────

    def _check_pullback_to_ma(self, today_low: int, ma_20d: float) -> bool:
        """Return True when today's low lies within the configured band around the MA.

        The band is ``[ma * (1 + pullback_lower_pct/100),
        ma * (1 + pullback_upper_pct/100)]``, bounds inclusive.
        """
        lower = ma_20d * (1 + self._cfg.pullback_lower_pct / 100)
        upper = ma_20d * (1 + self._cfg.pullback_upper_pct / 100)
        return lower <= today_low <= upper

    def _check_volume_dryup(self, ohlcv: list, surge_volume: int) -> bool:
        """Return True when recent average volume has dried up versus the surge.

        The average volume of the last ``volume_dryup_days`` bars must be at
        most ``surge_volume * volume_dryup_ratio``. A non-positive
        ``surge_volume`` or an empty bar list yields False.
        """
        if surge_volume <= 0:
            return False

        recent_vols = [r.get("volume", 0) for r in ohlcv[-self._cfg.volume_dryup_days:]]
        if not recent_vols:
            return False

        avg_vol = sum(recent_vols) / len(recent_vols)
        return avg_vol <= surge_volume * self._cfg.volume_dryup_ratio

    # ── Phase 3 checks ──────────────────────────────────────────────

    def _check_bullish_reversal(self, current: int, today_open: int, prev_high: int) -> bool:
        """Return True on a bullish candle (current > open) or a break of the previous high.

        A non-positive ``today_open`` / ``prev_high`` disables the
        corresponding test.
        """
        if today_open > 0 and current > today_open:
            return True
        return prev_high > 0 and current > prev_high

    # ── check_exits ─────────────────────────────────────────────────

    async def check_exits(self, holdings: List[dict]) -> List[TradeSignal]:
        """Evaluate stop-loss and take-profit rules for the given holdings.

        Args:
            holdings: Dicts read via the keys ``code``, ``buy_price``,
                ``qty`` (defaults to 1) and ``name`` (defaults to the code).
                Entries without a code or buy price are skipped.

        Returns:
            SELL signals. A stop-loss sells the full quantity; a take-profit
            sells ``partial_sell_ratio`` of the holding (at least one share)
            and becomes a full sale when that covers the whole holding.
            Position state is persisted once at the end if anything changed.
        """
        signals: List[TradeSignal] = []
        state_dirty = False

        for hold in holdings:
            code = hold.get("code")
            buy_price = hold.get("buy_price")
            if not code or not buy_price:
                continue

            state = self._position_state.get(code)
            if state is None:
                # Holding not opened through this instance: start tracking it
                # from the reported buy price.
                state = FPPositionState(
                    entry_price=buy_price,
                    entry_date="",
                    peak_price=buy_price,
                    surge_day_high=0,
                )
                self._position_state[code] = state

            resp = await self._sqs.get_current_price(code, caller=self.name)
            if not resp or resp.rt_cd != "0":
                continue

            output = self._extract_output(resp)
            if not output:
                continue

            current = self._to_int(self._field(output, "stck_prpr"))
            if current <= 0:
                continue

            # Track the peak price; saving is deferred to after the loop.
            if current > state.peak_price:
                state.peak_price = current
                state_dirty = True

            # The MA is recomputed on every call because it moves daily.
            ohlcv_resp = await self._sqs.get_recent_daily_ohlcv(code, limit=self._cfg.ma_period)
            ohlcv = ohlcv_resp.data if ohlcv_resp and ohlcv_resp.rt_cd == "0" else []
            ma_20d = self._latest_ma(ohlcv)

            pnl = (current - buy_price) / buy_price * 100

            # Stop-loss: price fell through the MA-based threshold. It takes
            # precedence over take-profit and is skipped when too few closes
            # are available to compute the MA.
            if ma_20d > 0:
                threshold = ma_20d * (1 + self._cfg.stop_loss_below_ma_pct / 100)
                if current < threshold:
                    reason = f"손절(20MA {ma_20d:,.0f} 하향이탈 {pnl:.1f}%)"
                    holding_qty = int(hold.get("qty", 1))
                    self._position_state.pop(code, None)
                    state_dirty = True
                    signals.append(TradeSignal(
                        code=code, name=hold.get("name", code), action="SELL",
                        price=current, qty=holding_qty, reason=reason, strategy_name=self.name
                    ))
                    continue

            # Take-profit: measured from the last partial-sell price, or from
            # the buy price when nothing has been sold yet, so it can repeat.
            ref_price = state.last_partial_sell_price if state.last_partial_sell_price > 0 else buy_price
            pnl_from_ref = (current - ref_price) / ref_price * 100
            if pnl_from_ref < self._cfg.take_profit_lower_pct:
                continue

            holding_qty = int(hold.get("qty", 1))
            sell_qty = max(1, int(holding_qty * self._cfg.partial_sell_ratio))
            full_exit = sell_qty >= holding_qty
            if full_exit:
                sell_qty = holding_qty
                sell_reason = f"전량익절({pnl_from_ref:.1f}%, 잔고 {holding_qty}주)"
            else:
                sell_reason = f"부분익절({pnl_from_ref:.1f}%, {sell_qty}주/{holding_qty}주)"

            self._logger.info({
                "event": "partial_profit_signal",
                "code": code, "pnl": round(pnl_from_ref, 2),
                "sell_qty": sell_qty, "holding_qty": holding_qty,
            })

            if full_exit:
                # Nothing remains to track. Dropping the entry (as the
                # stop-loss path does) keeps scan() from excluding this code
                # from future entries.
                self._position_state.pop(code, None)
            else:
                state.last_partial_sell_price = current
            state_dirty = True

            signals.append(TradeSignal(
                code=code, name=hold.get("name", code), action="SELL",
                price=current, qty=sell_qty,
                reason=sell_reason, strategy_name=self.name
            ))

        if state_dirty:
            self._save_state()
        return signals

    # ── helpers ─────────────────────────────────────────────────────

    @staticmethod
    def _extract_output(resp):
        """Return ``resp.data["output"]`` when ``resp.data`` is a dict, else ``None``."""
        data = resp.data
        return data.get("output") if isinstance(data, dict) else None

    @staticmethod
    def _field(output, key):
        """Read ``key`` from an output given either as a dict or as an object."""
        if isinstance(output, dict):
            return output.get(key)
        return getattr(output, key, None)

    @staticmethod
    def _to_int(value) -> int:
        """Convert an API field to ``int``; missing or malformed values become 0."""
        try:
            return int(value or 0)
        except (TypeError, ValueError):
            return 0

    def _latest_ma(self, ohlcv) -> float:
        """Return the ``ma_period`` average of the latest closes, or 0.0 when unavailable.

        Bars without a truthy ``close`` are ignored.
        """
        period = self._cfg.ma_period
        closes = [r.get("close", 0) for r in (ohlcv or []) if r.get("close")]
        if period <= 0 or len(closes) < period:
            return 0.0
        return sum(closes[-period:]) / period

    def _volume_dryup_pct(self, ohlcv: list, surge_volume: int) -> float:
        """Return recent average volume as a percentage of ``surge_volume`` (for logs)."""
        recent_vols = [r.get("volume", 0) for r in ohlcv[-self._cfg.volume_dryup_days:]]
        avg_vol = sum(recent_vols) / len(recent_vols) if recent_vols else 0
        return (avg_vol / surge_volume * 100) if surge_volume > 0 else 0.0

    def _calculate_qty(self, price: int) -> int:
        """Return the order quantity for ``price``.

        The budget is ``total_portfolio_krw * position_size_pct / 100``; the
        result is never below ``min_qty``.
        """
        if price <= 0:
            return self._cfg.min_qty
        budget = self._cfg.total_portfolio_krw * (self._cfg.position_size_pct / 100)
        return max(int(budget / price), self._cfg.min_qty)

    def _get_market_progress_ratio(self) -> float:
        """Return elapsed/total session time, capped at 1.0.

        The value is negative before the open and 0.0 when the session
        length is not positive.
        """
        now = self._tm.get_current_kst_time()
        open_t = self._tm.get_market_open_time()
        close_t = self._tm.get_market_close_time()
        total = (close_t - open_t).total_seconds()
        elapsed = (now - open_t).total_seconds()
        return min(elapsed / total, 1.0) if total > 0 else 0.0

    def _load_state(self):
        """Restore position state from ``STATE_FILE`` (best effort).

        A missing file is not an error. A read or parse failure is logged
        and leaves whatever entries were loaded before the failure.
        """
        if not os.path.exists(self.STATE_FILE):
            return
        try:
            with open(self.STATE_FILE, "r", encoding="utf-8") as f:
                data = json.load(f)
            for code, fields in data.items():
                self._position_state[code] = FPPositionState(**fields)
        except Exception as exc:
            # Best effort: a corrupt state file must not stop the strategy
            # from starting, but the failure should be visible.
            self._logger.warning({"event": "state_load_failed", "path": self.STATE_FILE, "error": str(exc)})

    def _save_state(self):
        """Persist position state to ``STATE_FILE`` as JSON (best effort)."""
        try:
            directory = os.path.dirname(self.STATE_FILE)
            if directory:
                os.makedirs(directory, exist_ok=True)
            data = {code: asdict(state) for code, state in self._position_state.items()}
            with open(self.STATE_FILE, "w", encoding="utf-8") as f:
                json.dump(data, f, indent=2)
        except Exception as exc:
            # Best effort: trading continues when the state cannot be written,
            # but the failure is logged instead of being dropped silently.
            self._logger.warning({"event": "state_save_failed", "path": self.STATE_FILE, "error": str(exc)})