Coverage for strategies / first_pullback_strategy.py: 91%

288 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-04 15:08 +0000

1# strategies/first_pullback_strategy.py 

2from __future__ import annotations 

3 

4import asyncio 

5import logging 

6import os 

7import json 

8from dataclasses import asdict 

9from datetime import timedelta 

10from typing import List, Optional, Dict, Tuple 

11 

12from interfaces.live_strategy import LiveStrategy 

13from common.types import TradeSignal 

14from services.stock_query_service import StockQueryService 

15from core.market_clock import MarketClock 

16from strategies.first_pullback_types import FirstPullbackConfig, FPPositionState 

17from services.oneil_universe_service import OneilUniverseService 

18from core.logger import get_strategy_logger 

19 

20 

21class FirstPullbackStrategy(LiveStrategy): 

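"""Leading-stock first-pullback ("Holy Grail") trading strategy.

Core idea: enter a leading stock that has pulled back in an orderly way to its
20-day moving average after a surge, at the moment it prints a rebound bullish
candle while volume has dried up.

Entry conditions (three-phase filter):
  Phase 1 (Setup): PoolA stock with a surge history (limit-up or +30%) and a 20MA rising 5 days in a row
  Phase 2 (Pullback): intraday low within -1% ~ +3% of the 20MA, and volume <= 50% of the surge day
  Phase 3 (Trigger): bullish-candle turn or break of the previous day's high, and execution strength >= 100%

Exit conditions:
  1. Stop-loss: current price < 20MA * (1 - 2%) -> sell the entire remaining position
  2. Partial take-profit: each time the gain over the last partial-sell price (or the
     buy price when no partial sale has happened yet) reaches the configured
     take-profit threshold, sell the configured fraction of the holding; this repeats.
"""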

36 STATE_FILE = os.path.join("data", "fp_position_state.json") 

37 

38 def __init__( 

39 self, 

40 stock_query_service: StockQueryService, 

41 universe_service: OneilUniverseService, 

42 market_clock: MarketClock, 

43 config: Optional[FirstPullbackConfig] = None, 

44 logger: Optional[logging.Logger] = None, 

45 ): 

46 self._sqs = stock_query_service 

47 self._universe = universe_service 

48 self._tm = market_clock 

49 self._cfg = config or FirstPullbackConfig() 

50 if logger: 50 ↛ 53line 50 didn't jump to line 53 because the condition on line 50 was always true

51 self._logger = logger 

52 else: 

53 self._logger = get_strategy_logger("FirstPullback", sub_dir="oneil") 

54 

55 self._position_state: Dict[str, FPPositionState] = {} 

56 self._load_state() 

57 

@property
def name(self) -> str:
    """Display name of this strategy, used as the caller tag and in signals."""
    strategy_label = "첫눌림목"
    return strategy_label

61 

62 # ── scan ──────────────────────────────────────────────────────── 

63 

async def scan(self) -> List[TradeSignal]:
    """Scan the watchlist and return BUY signals for candidates passing every filter.

    The scan is skipped (empty list) when the watchlist is empty, when the
    market progress ratio is not positive, or when market timing is bad for
    both KOSPI and KOSDAQ. Codes already present in ``_position_state`` and
    items whose market has bad timing are not evaluated.

    Returns:
        The truthy results of ``_check_entry``; failed or cancelled checks are
        logged and dropped.
    """
    signals: List[TradeSignal] = []
    self._logger.info({"event": "scan_started", "strategy_name": self.name})

    watchlist = await self._universe.get_watchlist(logger=self._logger)
    if not watchlist:
        self._logger.info({"event": "scan_skipped", "reason": "Watchlist is empty"})
        return signals

    self._logger.info({"event": "scan_with_watchlist", "count": len(watchlist)})

    market_progress = self._get_market_progress_ratio()
    if market_progress <= 0:
        self._logger.info({"event": "scan_skipped", "reason": "Market not open or just started"})
        return signals

    # Market-timing pre-check, once per market rather than once per stock.
    market_timing = {
        "KOSPI": await self._universe.is_market_timing_ok("KOSPI", logger=self._logger),
        "KOSDAQ": await self._universe.is_market_timing_ok("KOSDAQ", logger=self._logger),
    }
    if not any(market_timing.values()):
        self._logger.info({"event": "scan_skipped", "reason": "Bad market timing for both markets"})
        return signals

    candidates = [
        (code, item) for code, item in watchlist.items()
        if code not in self._position_state
        and market_timing.get(item.market, False)
    ]

    # Candidates are evaluated concurrently in fixed-size batches.
    batch_size = 10
    for i in range(0, len(candidates), batch_size):
        chunk = candidates[i:i + batch_size]
        results = await asyncio.gather(
            *[self._check_entry(code, item, market_progress) for code, item in chunk],
            return_exceptions=True,
        )
        for result in results:
            # BaseException rather than Exception: with return_exceptions=True a
            # cancelled child comes back as a CancelledError instance, which is
            # not an Exception subclass and would otherwise be appended below
            # as if it were a signal.
            if isinstance(result, BaseException):
                self._logger.error(f"Scan error: {result}")
            elif result:
                signals.append(result)

    self._logger.info({"event": "scan_finished", "signals_found": len(signals)})
    return signals

108 

109 async def _check_entry(self, code, item, progress) -> Optional[TradeSignal]: 

110 """진입 조건 검사: Phase 1 → 2 → 3 순서로 필터링.""" 

111 # ── 현재가 데이터 선행 조회 (OHLCV 캐시 활용 위해) ── 

112 resp = await self._sqs.get_current_price(code, caller=self.name) 

113 if not resp or resp.rt_cd != "0": 

114 return None 

115 

116 out = resp.data.get("output") if isinstance(resp.data, dict) else None 

117 if not out: 117 ↛ 118line 117 didn't jump to line 118 because the condition on line 117 was never true

118 return None 

119 

120 if isinstance(out, dict): 120 ↛ 127line 120 didn't jump to line 127 because the condition on line 120 was always true

121 current = int(out.get("stck_prpr", 0)) 

122 today_open = int(out.get("stck_oprc", 0)) 

123 today_low = int(out.get("stck_lwpr", 0)) 

124 prdy_vrss = int(out.get("prdy_vrss", 0)) 

125 prdy_vrss_sign = str(out.get("prdy_vrss_sign", "3")) 

126 else: 

127 current = int(getattr(out, "stck_prpr", 0) or 0) 

128 today_open = int(getattr(out, "stck_oprc", 0) or 0) 

129 today_low = int(getattr(out, "stck_lwpr", 0) or 0) 

130 prdy_vrss = int(getattr(out, "prdy_vrss", 0) or 0) 

131 prdy_vrss_sign = str(getattr(out, "prdy_vrss_sign", "3") or "3") 

132 

133 # 전일 종가 계산 (현재가와 전일대비를 이용해 역산) 

134 if prdy_vrss_sign in ("1", "2"): # 상한, 상승 134 ↛ 135line 134 didn't jump to line 135 because the condition on line 134 was never true

135 prev_close = current - prdy_vrss 

136 elif prdy_vrss_sign in ("4", "5"): # 하한, 하락 136 ↛ 137line 136 didn't jump to line 137 because the condition on line 136 was never true

137 prev_close = current + prdy_vrss 

138 else: # 보합 

139 prev_close = current 

140 

141 if current <= 0 or today_low <= 0: 141 ↛ 142line 141 didn't jump to line 142 because the condition on line 141 was never true

142 return None 

143 

144 # ── Phase 1: Setup (로켓 발사) ── 어제까지 확정 OHLCV(캐시) 

145 now = self._tm.get_current_kst_time() 

146 yesterday_str = (now - timedelta(days=1)).strftime("%Y%m%d") 

147 ohlcv_resp = await self._sqs.get_recent_daily_ohlcv(code, limit=30, end_date=yesterday_str) 

148 ohlcv = ohlcv_resp.data if ohlcv_resp and ohlcv_resp.rt_cd == "0" else [] 

149 if not ohlcv or len(ohlcv) < 25: 

150 return None 

151 

152 surge_result = self._check_surge_history(ohlcv) 

153 if not surge_result: 

154 self._logger.debug({"event": "entry_rejected", "code": code, "reason": "no_surge_history"}) 

155 return None 

156 

157 surge_volume, surge_day_high = surge_result 

158 

159 if not self._check_ma_uptrend(ohlcv): 159 ↛ 160line 159 didn't jump to line 160 because the condition on line 159 was never true

160 self._logger.debug({"event": "entry_rejected", "code": code, "reason": "ma_not_uptrending"}) 

161 return None 

162 

163 # ── Phase 2: Pullback (건전한 숨 고르기) ── 

164 # 20MA 계산 (어제까지 확정 OHLCV 기준) 

165 closes = [r.get("close", 0) for r in ohlcv if r.get("close")] 

166 ma_20d = sum(closes[-self._cfg.ma_period:]) / self._cfg.ma_period if len(closes) >= self._cfg.ma_period else 0 

167 if ma_20d <= 0: 167 ↛ 168line 167 didn't jump to line 168 because the condition on line 167 was never true

168 return None 

169 

170 if not self._check_pullback_to_ma(today_low, ma_20d): 

171 pullback_pct = (today_low - ma_20d) / ma_20d * 100 if ma_20d > 0 else 0.0 

172 self._logger.debug({ 

173 "event": "entry_rejected", "code": code, "reason": "pullback_out_of_range", 

174 "pullback_pct": round(pullback_pct, 2), 

175 "allowed_range": f"{self._cfg.pullback_lower_pct}% ~ {self._cfg.pullback_upper_pct}%" 

176 }) 

177 return None 

178 

179 if not self._check_volume_dryup(ohlcv, surge_volume): 

180 days = self._cfg.volume_dryup_days 

181 recent_vols = [r.get("volume", 0) for r in ohlcv[-days:]] 

182 avg_vol = sum(recent_vols) / len(recent_vols) if recent_vols else 0 

183 vol_dryup_pct = (avg_vol / surge_volume * 100) if surge_volume > 0 else 0.0 

184 self._logger.debug({ 

185 "event": "entry_rejected", "code": code, "reason": "volume_not_dry", 

186 "vol_dryup_pct": round(vol_dryup_pct, 2), "threshold_pct": self._cfg.volume_dryup_ratio * 100 

187 }) 

188 return None 

189 

190 # ── Phase 3: Trigger (매수 방아쇠) ── 

191 prev_high = ohlcv[-1].get("high", 0) if ohlcv else 0 

192 if not self._check_bullish_reversal(current, today_open, prev_high): 

193 self._logger.debug({ 

194 "event": "entry_rejected", "code": code, "reason": "no_bullish_reversal", 

195 "current": current, "today_open": today_open, "prev_high": prev_high 

196 }) 

197 return None 

198 

199 # 체결강도 확인 

200 cgld_val = 0.0 

201 try: 

202 ccnl_resp = await self._sqs.get_stock_conclusion(code) 

203 if ccnl_resp and ccnl_resp.rt_cd == "0": 203 ↛ 212line 203 didn't jump to line 212 because the condition on line 203 was always true

204 ccnl_output = ccnl_resp.data.get("output") if isinstance(ccnl_resp.data, dict) else None 

205 if ccnl_output and isinstance(ccnl_output, list) and len(ccnl_output) > 0: 205 ↛ 212line 205 didn't jump to line 212 because the condition on line 205 was always true

206 val = ccnl_output[0].get("tday_rltv") 

207 cgld_val = float(val) if val else 0.0 

208 except Exception as e: 

209 self._logger.warning({"event": "cgld_check_failed", "code": code, "error": str(e)}) 

210 return None 

211 

212 if cgld_val < self._cfg.execution_strength_min: 

213 self._logger.debug({ 

214 "event": "entry_rejected", "code": code, "reason": "low_execution_strength", 

215 "cgld": cgld_val, "threshold": self._cfg.execution_strength_min 

216 }) 

217 return None 

218 

219 # ========= 모든 관문 통과! 매수 시그널 생성 ========= 

220 qty = self._calculate_qty(current) 

221 

222 self._position_state[code] = FPPositionState( 

223 entry_price=current, 

224 entry_date=self._tm.get_current_kst_time().strftime("%Y%m%d"), 

225 peak_price=current, 

226 surge_day_high=surge_day_high, 

227 ) 

228 self._save_state() 

229 

230 # 상세 근거 계산 

231 recent_vols = [r.get("volume", 0) for r in ohlcv[-self._cfg.volume_dryup_days:]] 

232 avg_vol = sum(recent_vols) / len(recent_vols) if recent_vols else 0 

233 vol_dryup_pct = (avg_vol / surge_volume * 100) if surge_volume > 0 else 0.0 

234 pullback_pct = (today_low - ma_20d) / ma_20d * 100 if ma_20d > 0 else 0.0 

235 

236 reason_msg = ( 

237 f"첫눌림목(20MA {ma_20d:,.0f} 지지({pullback_pct:+.1f}%), " 

238 f"거래고갈 {vol_dryup_pct:.0f}%(급등대비), " 

239 f"반등확인 {today_open:,}->{current:,}, " 

240 f"체결강도 {cgld_val:.1f}%)" 

241 ) 

242 

243 self._logger.info({ 

244 "event": "buy_signal_generated", 

245 "code": code, "name": item.name, 

246 "price": current, 

247 "reason": reason_msg, 

248 }) 

249 

250 return TradeSignal( 

251 code=code, name=item.name, action="BUY", price=current, qty=qty, 

252 reason=reason_msg, strategy_name=self.name 

253 ) 

254 

255 # ── Phase 1 검사 메서드 ──────────────────────────────────────── 

256 

257 def _check_surge_history(self, ohlcv: list) -> Optional[Tuple[int, int]]: 

258 """최근 20거래일 내 급등 이력 확인. 

259 

260 Returns: (surge_volume, surge_day_high) 또는 None 

261 """ 

262 lookback = min(self._cfg.surge_lookback_days, len(ohlcv)) 

263 recent = ohlcv[-lookback:] 

264 

265 # 조건 A: 상한가 (종가 기준 전일 대비 +29%) 

266 for i in range(1, len(recent)): 

267 prev_close = recent[i - 1].get("close", 0) 

268 curr_close = recent[i].get("close", 0) 

269 if prev_close > 0 and curr_close > 0: 269 ↛ 266line 269 didn't jump to line 266 because the condition on line 269 was always true

270 change = (curr_close - prev_close) / prev_close * 100 

271 if change >= self._cfg.upper_limit_pct: 

272 return (recent[i].get("volume", 0), recent[i].get("high", curr_close)) 

273 

274 # 조건 B: 단기간(5~10일) +30% 급등 

275 for window in range(self._cfg.rapid_surge_min_days, self._cfg.rapid_surge_max_days + 1): 

276 for start in range(len(recent) - window): 

277 end = start + window 

278 start_close = recent[start].get("close", 0) 

279 end_close = recent[end].get("close", 0) 

280 if start_close > 0 and end_close > 0: 280 ↛ 276line 280 didn't jump to line 276 because the condition on line 280 was always true

281 change = (end_close - start_close) / start_close * 100 

282 if change >= self._cfg.rapid_surge_pct: 

283 # 기준봉: 구간 내 최대 거래량일 

284 window_slice = recent[start:end + 1] 

285 max_vol_day = max(window_slice, key=lambda r: r.get("volume", 0)) 

286 surge_high = max(r.get("high", 0) for r in window_slice) 

287 return (max_vol_day.get("volume", 0), surge_high) 

288 

289 return None 

290 

291 def _check_ma_uptrend(self, ohlcv: list) -> bool: 

292 """20일 이동평균선이 최근 5일 연속 우상향인지 확인.""" 

293 closes = [r.get("close", 0) for r in ohlcv if r.get("close")] 

294 period = self._cfg.ma_period 

295 needed = period + self._cfg.ma_rising_days 

296 if len(closes) < needed: 

297 return False 

298 

299 # 최근 (ma_rising_days + 1)일의 20MA 계산 

300 ma_values = [] 

301 for i in range(self._cfg.ma_rising_days + 1): 

302 end_idx = len(closes) - i 

303 start_idx = end_idx - period 

304 if start_idx < 0: 304 ↛ 305line 304 didn't jump to line 305 because the condition on line 304 was never true

305 return False 

306 ma = sum(closes[start_idx:end_idx]) / period 

307 ma_values.append(ma) 

308 

309 # ma_values: [오늘MA, 어제MA, ..., 5일전MA] (역순) 

310 ma_values.reverse() # [5일전MA, ..., 어제MA, 오늘MA] 

311 

312 # 5일 연속 기울기 양수 확인 

313 for i in range(1, len(ma_values)): 

314 if ma_values[i] <= ma_values[i - 1]: 

315 return False 

316 

317 return True 

318 

319 # ── Phase 2 검사 메서드 ──────────────────────────────────────── 

320 

321 def _check_pullback_to_ma(self, today_low: int, ma_20d: float) -> bool: 

322 """장중 최저가가 20MA의 -1% ~ +3% 범위 안에 있는지 확인.""" 

323 lower = ma_20d * (1 + self._cfg.pullback_lower_pct / 100) 

324 upper = ma_20d * (1 + self._cfg.pullback_upper_pct / 100) 

325 return lower <= today_low <= upper 

326 

327 def _check_volume_dryup(self, ohlcv: list, surge_volume: int) -> bool: 

328 """최근 3일 평균 거래량이 급등일 거래량의 50% 이하인지 확인.""" 

329 if surge_volume <= 0: 

330 return False 

331 

332 days = self._cfg.volume_dryup_days 

333 recent_vols = [r.get("volume", 0) for r in ohlcv[-days:]] 

334 if not recent_vols: 334 ↛ 335line 334 didn't jump to line 335 because the condition on line 334 was never true

335 return False 

336 

337 avg_vol = sum(recent_vols) / len(recent_vols) 

338 return avg_vol <= surge_volume * self._cfg.volume_dryup_ratio 

339 

340 # ── Phase 3 검사 메서드 ──────────────────────────────────────── 

341 

342 def _check_bullish_reversal(self, current: int, today_open: int, prev_high: int) -> bool: 

343 """양봉 전환(current > open) 또는 전일 고가 돌파 확인.""" 

344 if today_open > 0 and current > today_open: 

345 return True 

346 if prev_high > 0 and current > prev_high: 

347 return True 

348 return False 

349 

350 # ── check_exits ──────────────────────────────────────────────── 

351 

async def check_exits(self, holdings: List[dict]) -> List[TradeSignal]:
    """Produce SELL signals (stop-loss or take-profit) for the given holdings.

    Args:
        holdings: Dicts read through the keys "code", "buy_price", "qty"
            (defaults to 1) and "name" (defaults to the code). Entries without
            a code or buy price are skipped.

    Returns:
        One SELL ``TradeSignal`` per holding that triggered an exit rule:
          * stop-loss: current price below the moving average adjusted by
            ``stop_loss_below_ma_pct`` -> sell the whole holding;
          * take-profit: gain of at least ``take_profit_lower_pct`` percent
            over the last partial-sell price (or the buy price) -> sell
            ``partial_sell_ratio`` of the holding, at least one share.

    Position state is dropped whenever the whole holding is sold, so that
    ``scan`` (which skips tracked codes) can consider the stock again. State
    changes are persisted once, after the loop.
    """
    signals = []
    state_dirty = False
    for hold in holdings:
        code = hold.get("code")
        buy_price = hold.get("buy_price")
        if not code or not buy_price:
            continue

        # Holdings with no stored state get a fresh one seeded from the buy price.
        state = self._position_state.get(code)
        if not state:
            state = FPPositionState(
                entry_price=buy_price,
                entry_date="",
                peak_price=buy_price,
                surge_day_high=0,
            )
            self._position_state[code] = state

        resp = await self._sqs.get_current_price(code, caller=self.name)
        if not resp or resp.rt_cd != "0":
            continue

        output = resp.data.get("output") if isinstance(resp.data, dict) else None
        if not output:
            continue

        # A missing, None or empty price counts as 0 and skips this holding,
        # so one malformed quote cannot abort the checks for the others.
        if isinstance(output, dict):
            current = int(output.get("stck_prpr") or 0)
        else:
            current = int(getattr(output, "stck_prpr", 0) or 0)

        if current <= 0:
            continue

        # Track the highest price seen (saved once after the loop).
        if current > state.peak_price:
            state.peak_price = current
            state_dirty = True

        # The moving average is recomputed on every check from recent daily bars.
        ohlcv_resp = await self._sqs.get_recent_daily_ohlcv(code, limit=self._cfg.ma_period)
        ohlcv = ohlcv_resp.data if ohlcv_resp and ohlcv_resp.rt_cd == "0" else []
        closes = [r.get("close", 0) for r in ohlcv if r.get("close")]

        pnl = (current - buy_price) / buy_price * 100
        reason = ""

        # Stop-loss: only evaluated when enough closes are available.
        if len(closes) >= self._cfg.ma_period:
            ma_20d = sum(closes[-self._cfg.ma_period:]) / self._cfg.ma_period
            threshold = ma_20d * (1 + self._cfg.stop_loss_below_ma_pct / 100)
            if current < threshold:
                reason = f"손절(20MA {ma_20d:,.0f} 하향이탈 {pnl:.1f}%)"

        # Take-profit: measured from the last partial-sell price, so it can
        # trigger repeatedly as the price keeps climbing.
        if not reason:
            ref_price = state.last_partial_sell_price if state.last_partial_sell_price > 0 else buy_price
            pnl_from_ref = (current - ref_price) / ref_price * 100
            if pnl_from_ref >= self._cfg.take_profit_lower_pct:
                holding_qty = int(hold.get("qty", 1))
                sell_qty = max(1, int(holding_qty * self._cfg.partial_sell_ratio))

                if sell_qty >= holding_qty:
                    sell_qty = holding_qty
                    sell_reason = f"전량익절({pnl_from_ref:.1f}%, 잔고 {holding_qty}주)"
                    # Nothing is left to manage: forget the position, exactly
                    # as the stop-loss path does.
                    self._position_state.pop(code, None)
                else:
                    sell_reason = f"부분익절({pnl_from_ref:.1f}%, {sell_qty}주/{holding_qty}주)"
                    state.last_partial_sell_price = current

                self._logger.info({
                    "event": "partial_profit_signal",
                    "code": code, "pnl": round(pnl_from_ref, 2),
                    "sell_qty": sell_qty, "holding_qty": holding_qty,
                })

                state_dirty = True

                signals.append(TradeSignal(
                    code=code, name=hold.get("name", code), action="SELL",
                    price=current, qty=sell_qty,
                    reason=sell_reason, strategy_name=self.name
                ))
                continue

        # Stop-loss signal: sell everything and drop the position state.
        if reason:
            holding_qty = int(hold.get("qty", 1))
            self._position_state.pop(code, None)
            state_dirty = True
            signals.append(TradeSignal(
                code=code, name=hold.get("name", code), action="SELL",
                price=current, qty=holding_qty, reason=reason, strategy_name=self.name
            ))

    if state_dirty:
        self._save_state()
    return signals

451 

452 # ── 헬퍼 ────────────────────────────────────────────────────── 

453 

454 def _calculate_qty(self, price: int) -> int: 

455 if price <= 0: 

456 return self._cfg.min_qty 

457 budget = self._cfg.total_portfolio_krw * (self._cfg.position_size_pct / 100) 

458 return max(int(budget / price), self._cfg.min_qty) 

459 

460 def _get_market_progress_ratio(self) -> float: 

461 now = self._tm.get_current_kst_time() 

462 open_t = self._tm.get_market_open_time() 

463 close_t = self._tm.get_market_close_time() 

464 total = (close_t - open_t).total_seconds() 

465 elapsed = (now - open_t).total_seconds() 

466 return min(elapsed / total, 1.0) if total > 0 else 0.0 

467 

468 def _load_state(self): 

469 if os.path.exists(self.STATE_FILE): 

470 try: 

471 with open(self.STATE_FILE, "r") as f: 

472 data = json.load(f) 

473 for k, v in data.items(): 

474 self._position_state[k] = FPPositionState(**v) 

475 except Exception: 

476 pass 

477 

478 def _save_state(self): 

479 try: 

480 os.makedirs(os.path.dirname(self.STATE_FILE), exist_ok=True) 

481 data = {k: asdict(v) for k, v in self._position_state.items()} 

482 with open(self.STATE_FILE, "w") as f: 

483 json.dump(data, f, indent=2) 

484 except Exception: 

485 pass