Coverage for strategies / oneil_pocket_pivot_strategy.py: 95%

373 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-04 15:08 +0000

1# strategies/oneil/pocket_pivot_strategy.py 

2from __future__ import annotations 

3 

4import asyncio 

5import logging 

6import os 

7import json 

8from dataclasses import dataclass, asdict 

9from datetime import timedelta 

10from typing import List, Optional, Dict, Tuple 

11 

12from interfaces.live_strategy import LiveStrategy 

13from common.types import TradeSignal, ErrorCode 

14from services.stock_query_service import StockQueryService 

15from core.market_clock import MarketClock 

16from strategies.oneil_common_types import OneilPocketPivotConfig, PPPositionState 

17from services.oneil_universe_service import OneilUniverseService 

18from core.logger import get_strategy_logger 

19 

20 

21class OneilPocketPivotStrategy(LiveStrategy): 

22 """오닐식 포켓 피봇 & BGU 매매 (O'Neil Pocket Pivot & Buyable Gap-Up). 

23 

24 핵심: 시장 주도주 중 이동평균선 근처에서 기관의 숨은 매집(포켓 피봇)을 포착해 

25 선취매하거나, 강력한 호재로 인한 폭발적 갭상승(BGU) 초입에 올라탄다. 

26 

27 진입 조건: 

28 [공통 필터] 스마트머니(PG순매수 비율) + 체결강도(>=120%) 스냅샷 

29 [조건 A] Pocket Pivot: MA 근접(-2%~+4%) + 환산 거래량 > 하락일 최대 거래량 

30 [조건 B] BGU: 갭 >=4% + 환산 거래량 >= 50일 평균 300% + 09:10 이후 시가 지지 

31 

32 청산 조건 (우선순위): 

33 1. 하드 스탑: 마켓타이밍 악화 OR 고점 대비 -10% 

34 2. PP 손절: 지지MA -2% 이탈 / BGU 손절: 갭업 당일 저가 이탈 

35 3. 부분 익절: +15% 시 50% 매도 (잔고 1주면 전량) 

36 4. 7주 룰: +5% 안착 후 35거래일 경과 & 50MA 이탈 시 전량 청산 

37 """ 

38 STATE_FILE = os.path.join("data", "pp_position_state.json") 

39 

def __init__(
    self,
    stock_query_service: StockQueryService,
    universe_service: OneilUniverseService,
    market_clock: MarketClock,
    config: Optional[OneilPocketPivotConfig] = None,
    logger: Optional[logging.Logger] = None,
):
    """Wire up collaborators and restore persisted position state.

    Args:
        stock_query_service: Source of price snapshots, OHLCV and conclusion data.
        universe_service: Provides the watchlist and market-timing checks.
        market_clock: Supplies current KST time and market open/close times.
        config: Strategy parameters; a default ``OneilPocketPivotConfig`` is
            created when omitted (or falsy).
        logger: Logger to use; falls back to the "OneilPocketPivot" strategy
            logger when omitted (or falsy).
    """
    self._sqs, self._universe, self._tm = (
        stock_query_service,
        universe_service,
        market_clock,
    )
    self._cfg = config if config else OneilPocketPivotConfig()
    self._logger = logger if logger else get_strategy_logger("OneilPocketPivot")

    # Per-symbol position bookkeeping, reloaded from STATE_FILE when present.
    self._position_state: Dict[str, PPPositionState] = {}
    self._load_state()

59 

@property
def name(self) -> str:
    """Return the label this strategy attaches to its signals and API calls."""
    strategy_label = "오닐PP/BGU"
    return strategy_label

63 

64 # ── scan ──────────────────────────────────────────────────────── 

65 

async def scan(self) -> List[TradeSignal]:
    """Scan the watchlist and return BUY signals for symbols passing every entry gate.

    The scan returns an empty list when the watchlist is empty, when the
    market-progress ratio is not positive, or when market timing is bad for
    both KOSPI and KOSDAQ. Symbols already tracked in ``_position_state`` and
    symbols whose market has bad timing are excluded. The remaining
    candidates are checked concurrently, 10 at a time, via ``_check_entry``.

    Returns:
        The truthy ``_check_entry`` results, in candidate order. Failures of
        individual checks are logged and skipped.
    """
    signals: List[TradeSignal] = []
    self._logger.info({"event": "scan_started", "strategy_name": self.name})

    watchlist = await self._universe.get_watchlist(logger=self._logger)
    if not watchlist:
        self._logger.info({"event": "scan_skipped", "reason": "Watchlist is empty"})
        return signals

    self._logger.info({"event": "scan_with_watchlist", "count": len(watchlist)})

    market_progress = self._get_market_progress_ratio()
    if market_progress <= 0:
        self._logger.info({"event": "scan_skipped", "reason": "Market not open or just started"})
        return signals

    # Market-timing pre-check, once per market instead of once per symbol.
    market_timing = {
        "KOSPI": await self._universe.is_market_timing_ok("KOSPI", logger=self._logger),
        "KOSDAQ": await self._universe.is_market_timing_ok("KOSDAQ", logger=self._logger),
    }
    if not any(market_timing.values()):
        self._logger.info({"event": "scan_skipped", "reason": "Bad market timing for both markets"})
        return signals

    candidates = [
        (code, item) for code, item in watchlist.items()
        if code not in self._position_state
        and market_timing.get(item.market, False)
    ]

    chunk_size = 10  # bounds the number of concurrent entry checks
    for start in range(0, len(candidates), chunk_size):
        chunk = candidates[start:start + chunk_size]
        results = await asyncio.gather(
            *[self._check_entry(code, item, market_progress) for code, item in chunk],
            return_exceptions=True,
        )
        for result in results:
            # With return_exceptions=True, gather also returns CancelledError
            # instances. CancelledError derives from BaseException, so testing
            # against Exception would let it fall through to the truthy
            # branch and be emitted as if it were a trade signal.
            if isinstance(result, BaseException):
                self._logger.error(f"Scan error: {result}")
            elif result:
                signals.append(result)

    self._logger.info({"event": "scan_finished", "signals_found": len(signals)})
    return signals

110 

async def _check_entry(self, code, item, progress) -> Optional[TradeSignal]:
    """Run every entry gate for one symbol and build a BUY signal if all pass.

    Gate order: price snapshot -> daily candles plus a synthesized candle for
    today -> Pocket Pivot (condition A), else BGU (condition B) -> smart-money
    filter -> execution-strength snapshot. On success the position is
    recorded in ``_position_state`` and persisted before the signal is
    returned.

    Args:
        code: Symbol code.
        item: Watchlist entry; ``name`` and ``market_cap`` are read here and
            the object is forwarded to ``_check_pocket_pivot``.
        progress: Market-progress ratio used to project full-day volume.

    Returns:
        A BUY ``TradeSignal``, or ``None`` when any gate rejects the symbol.
    """
    # 1. Current-price snapshot.
    resp = await self._sqs.get_current_price(code, caller=self.name)
    if not resp or resp.rt_cd != "0":
        return None

    out = resp.data.get("output") if isinstance(resp.data, dict) else None
    if not out:
        return None

    def _field(key, default=0):
        # Missing, None and empty-string values all fall back to the default,
        # for dict payloads and attribute-style payloads alike, so a blank
        # field cannot make int() raise.
        raw = out.get(key) if isinstance(out, dict) else getattr(out, key, None)
        return raw or default

    current = int(_field("stck_prpr"))
    vol = int(_field("acml_vol"))
    pg_buy = int(_field("pgtr_ntby_qty"))
    trade_value = int(_field("acml_tr_pbmn"))
    today_open = int(_field("stck_oprc"))
    today_high = int(_field("stck_hgpr"))
    today_low = int(_field("stck_lwpr"))
    prdy_vrss = int(_field("prdy_vrss"))
    prdy_vrss_sign = str(_field("prdy_vrss_sign", "3"))

    # Back out the previous close from the current price and the day change.
    # The sign code decides the direction; abs() keeps the result correct
    # whether the feed reports the change as a magnitude or already signed.
    change = abs(prdy_vrss)
    if prdy_vrss_sign in ("1", "2"):  # upper limit, up
        prev_close = current - change
    elif prdy_vrss_sign in ("4", "5"):  # lower limit, down
        prev_close = current + change
    else:  # unchanged
        prev_close = current

    if current <= 0 or prev_close <= 0:
        return None

    # 2. OHLCV: settled candles through yesterday + today's candle built
    #    from the live snapshot.
    now = self._tm.get_current_kst_time()
    yesterday_str = (now - timedelta(days=1)).strftime("%Y%m%d")
    ohlcv_resp = await self._sqs.get_recent_daily_ohlcv(code, limit=60, end_date=yesterday_str)
    history = ohlcv_resp.data if ohlcv_resp and ohlcv_resp.rt_cd == "0" else None
    # Work on a copy: the service may hand back a shared/cached list, and
    # today's synthetic candle must not leak into it.
    ohlcv = list(history or [])

    today_str = now.strftime("%Y%m%d")
    today_candle = {
        "date": today_str,
        "open": float(today_open),
        "high": float(today_high),
        "low": float(today_low),
        "close": float(current),
        "volume": vol,
    }
    if ohlcv and ohlcv[-1].get("date") == today_str:
        ohlcv[-1] = today_candle
    else:
        ohlcv.append(today_candle)

    if len(ohlcv) < 10:
        return None

    # 3. Condition A (Pocket Pivot).
    entry_result = self._check_pocket_pivot(
        code, current, vol, progress, ohlcv, item, prev_close
    )

    # 4. Condition B (BGU), only when A did not match.
    if not entry_result:
        entry_result = self._check_bgu(
            code, current, vol, progress, ohlcv, today_open, today_low, prev_close
        )

    if not entry_result:
        return None

    entry_type, supporting_ma, gap_day_low, extra_info = entry_result

    # 5. Shared smart-money filter (evaluated only after a technical match).
    if not self._check_smart_money(code, current, pg_buy, trade_value, item.market_cap):
        self._logger.debug({"event": "entry_rejected_by_smart_money", "code": code, "entry_type": entry_type})
        return None

    # 6. Execution-strength snapshot; a failed lookup rejects the entry.
    cgld_val = 0.0
    try:
        ccnl_resp = await self._sqs.get_stock_conclusion(code)
        if ccnl_resp and ccnl_resp.rt_cd == "0":
            ccnl_output = ccnl_resp.data.get("output") if isinstance(ccnl_resp.data, dict) else None
            if ccnl_output and isinstance(ccnl_output, list) and len(ccnl_output) > 0:
                val = ccnl_output[0].get("tday_rltv")
                cgld_val = float(val) if val else 0.0
    except Exception as e:
        self._logger.warning({"event": "cgld_check_failed", "code": code, "error": str(e)})
        return None

    if cgld_val < self._cfg.execution_strength_min:
        self._logger.debug({"event": "entry_rejected", "code": code, "reason": "low_execution_strength", "cgld": cgld_val})
        return None

    # ========= All gates passed: build the BUY signal =========
    qty = self._calculate_qty(current)
    pg_buy_amount = pg_buy * current
    pg_ratio = (pg_buy_amount / trade_value * 100) if trade_value > 0 else 0.0

    self._position_state[code] = PPPositionState(
        entry_type=entry_type,
        entry_price=current,
        entry_date=self._tm.get_current_kst_time().strftime("%Y%m%d"),
        peak_price=current,
        supporting_ma=supporting_ma,
        gap_day_low=gap_day_low,
    )
    self._save_state()

    if entry_type == "PP":
        proj_vol = extra_info.get("proj_vol", 0)
        max_down_vol = extra_info.get("max_down_vol", 0)
        vol_ratio = (proj_vol / max_down_vol * 100) if max_down_vol > 0 else 0.0
        reason_msg = (
            f"PP진입({supporting_ma}MA지지, "
            f"예상거래 {vol_ratio:.0f}%(하락최대대비), "
            f"PG매수 {pg_buy_amount // 100_000_000:,}억({pg_ratio:.1f}%), "
            f"체결강도 {cgld_val:.1f}%)"
        )
    elif entry_type == "BGU":
        gap_ratio = extra_info.get("gap_ratio", 0.0)
        proj_vol = extra_info.get("proj_vol", 0)
        avg_vol_50d = extra_info.get("avg_vol_50d", 0)
        vol_ratio = (proj_vol / avg_vol_50d * 100) if avg_vol_50d > 0 else 0.0
        reason_msg = (
            f"BGU진입(갭 {gap_ratio:.1f}%, "
            f"예상거래 {vol_ratio:.0f}%(50일평균대비), "
            f"PG매수 {pg_buy_amount // 100_000_000:,}억({pg_ratio:.1f}%), "
            f"체결강도 {cgld_val:.1f}%)"
        )
    else:
        reason_msg = (
            f"{entry_type}진입(체결강도 {cgld_val:.1f}%, "
            f"PG매수 {pg_buy_amount // 100_000_000:,}억({pg_ratio:.1f}%))"
        )

    self._logger.info({
        "event": "buy_signal_generated",
        "code": code, "name": item.name,
        "entry_type": entry_type,
        "price": current,
        "reason": reason_msg,
    })

    return TradeSignal(
        code=code, name=item.name, action="BUY", price=current, qty=qty,
        reason=reason_msg, strategy_name=self.name
    )

269 

270 # ── 조건 A: Pocket Pivot ────────────────────────────────────── 

271 

def _check_pocket_pivot(
    self, code, current, vol, progress, ohlcv, item, prev_close
) -> Optional[Tuple[str, str, int, dict]]:
    """Check the Pocket Pivot entry condition.

    Requirements, in evaluation order:
      1. at least 10 usable closes;
      2. the current price sits inside the configured proximity band of the
         10-, 20- or 50-day moving average (first match wins);
      3. today is an up day (current price above the previous close);
      4. the lookback window contains at least one down day (close < open);
      5. projected full-day volume exceeds the largest down-day volume.

    Args:
        code: Symbol code (used in log records only).
        current: Current price.
        vol: Volume accumulated so far today.
        progress: Market-progress ratio; floored at 0.05 for the projection.
        ohlcv: Daily candles as dicts with ``open``/``close``/``volume`` keys.
        item: Watchlist entry providing ``ma_20d`` and ``ma_50d``.
        prev_close: Previous session's close.

    Returns:
        ``("PP", supporting_ma, 0, extra_info)`` or ``None``.
    """
    closes = [r.get("close", 0) for r in ohlcv if r.get("close")]
    if len(closes) < 10:
        self._logger.debug({"event": "pp_rejected", "code": code, "reason": "insufficient_data"})
        return None

    # 1. Moving averages: 10-day computed here, 20/50-day supplied by the item.
    ma_10d = sum(closes[-10:]) / 10
    ma_candidates = [
        (ma_10d, "10"),
        (item.ma_20d, "20"),
        (item.ma_50d, "50"),
    ]

    # 2. Proximity band around each moving average.
    supporting_ma = ""
    for ma_val, ma_name in ma_candidates:
        # A missing (None) average is skipped just like a non-positive one;
        # comparing None with 0 would raise TypeError.
        if not ma_val or ma_val <= 0:
            continue
        lower = ma_val * (1 + self._cfg.pp_ma_proximity_lower_pct / 100)
        upper = ma_val * (1 + self._cfg.pp_ma_proximity_upper_pct / 100)
        if lower <= current <= upper:
            supporting_ma = ma_name
            break

    if not supporting_ma:
        self._logger.debug({"event": "pp_rejected", "code": code, "reason": "not_near_ma"})
        return None

    # 3. Up-day check.
    if current <= prev_close:
        self._logger.debug({"event": "pp_rejected", "code": code, "reason": "not_an_up_day"})
        return None

    # 4. Largest volume among down days (close < open) in the lookback window.
    lookback = min(self._cfg.pp_down_day_lookback, len(ohlcv))
    # ohlcv[-0:] would be the WHOLE list, so a zero lookback needs its own case.
    recent = ohlcv[-lookback:] if lookback > 0 else []
    down_day_volumes = []
    for candle in recent:
        c = candle.get("close", 0)
        o = candle.get("open", 0)
        v = candle.get("volume", 0)
        if c and o and c < o and v:
            down_day_volumes.append(v)

    max_down_vol = max(down_day_volumes) if down_day_volumes else 0

    # No down day at all means this is not a normal consolidation: reject.
    if max_down_vol <= 0:
        self._logger.debug({"event": "pp_rejected", "code": code, "reason": "no_down_day_volume"})
        return None

    # 5. Volume dominance: projected volume must exceed the down-day maximum.
    effective_progress = max(progress, 0.05)
    proj_vol = vol / effective_progress

    if proj_vol <= max_down_vol:
        self._logger.debug({"event": "pp_rejected", "code": code, "reason": "insufficient_volume", "proj_vol": int(proj_vol), "max_down_vol": max_down_vol})
        return None

    self._logger.debug({
        "event": "pocket_pivot_matched", "code": code,
        "supporting_ma": supporting_ma,
        "proj_vol": int(proj_vol), "max_down_vol": int(max_down_vol),
    })

    return ("PP", supporting_ma, 0, {"proj_vol": proj_vol, "max_down_vol": max_down_vol})

345 

346 # ── 조건 B: BGU ─────────────────────────────────────────────── 

347 

def _check_bgu(
    self, code, current, vol, progress, ohlcv, today_open, today_low, prev_close
) -> Optional[Tuple[str, str, int, dict]]:
    """Check the Buyable Gap-Up (BGU) entry condition.

    Gates, in order: opening gap of at least ``bgu_gap_pct`` over the previous
    close; at least ``bgu_whipsaw_after_minutes`` elapsed since the open;
    current price at or above today's open; projected full-day volume at
    least ``bgu_volume_multiplier`` times the average of up to 50 recent
    volumes (20 minimum).

    Returns:
        ``("BGU", "", today_low, extra_info)`` or ``None``.
    """
    if not (today_open > 0 and prev_close > 0):
        return None

    cfg = self._cfg

    # 1. Opening gap relative to the previous close, in percent.
    gap_pct = (today_open - prev_close) / prev_close * 100
    if gap_pct < cfg.bgu_gap_pct:
        return None

    # 2. Whipsaw filter: wait out the first minutes after the open.
    current_time = self._tm.get_current_kst_time()
    session_open = self._tm.get_market_open_time()
    minutes_since_open = (current_time - session_open).total_seconds() / 60
    if minutes_since_open < cfg.bgu_whipsaw_after_minutes:
        return None

    # 3. Price support: the open must be holding.
    if current < today_open:
        return None

    # 4. Relative volume against the recent average.
    traded = [candle.get("volume", 0) for candle in ohlcv if candle.get("volume")]
    window = min(50, len(traded))
    if window < 20:
        return None
    mean_volume = sum(traded[-window:]) / window

    projected = vol / max(progress, 0.05)
    if projected < mean_volume * cfg.bgu_volume_multiplier:
        return None

    self._logger.debug({
        "event": "bgu_matched", "code": code,
        "gap_ratio": round(gap_pct, 2),
        "proj_vol": int(projected), "avg_vol_50d": int(mean_volume),
        "today_low": today_low,
    })

    details = {"gap_ratio": gap_pct, "proj_vol": projected, "avg_vol_50d": mean_volume}
    return ("BGU", "", today_low, details)

395 

396 # ── 스마트 머니 필터 ────────────────────────────────────────── 

397 

def _check_smart_money(self, code: str, current: int, pg_buy: int, trade_value: int, market_cap: int) -> bool:
    """Smart-money (program net-buy) filter.

    Passes only when program trading is a net buyer and its buy amount
    (``pg_buy * current``) reaches the configured share of the traded value
    and of the market cap. A base that is not positive skips its own check.

    Returns:
        ``True`` when the symbol passes, ``False`` otherwise.
    """
    if pg_buy <= 0:
        self._logger.debug({"event": "smart_money_rejected", "code": code, "reason": "not_net_buy", "pg_buy": pg_buy})
        return False

    pg_buy_amount = pg_buy * current

    # (base amount, config attribute holding the threshold, reject reason, log key)
    gates = (
        (trade_value, "program_to_trade_value_pct", "low_pg_to_trade_value", "pg_to_tv_pct"),
        (market_cap, "program_to_market_cap_pct", "low_pg_to_market_cap", "pg_to_mc_pct"),
    )
    for base, cfg_attr, why, ratio_key in gates:
        if base <= 0:
            continue
        share_pct = pg_buy_amount / base * 100
        limit = getattr(self._cfg, cfg_attr)
        if share_pct < limit:
            self._logger.debug({
                "event": "smart_money_rejected", "code": code, "reason": why,
                ratio_key: round(share_pct, 2), "threshold": limit
            })
            return False

    self._logger.debug({"event": "smart_money_passed", "code": code, "pg_buy_amount": pg_buy_amount})
    return True

428 

429 # ── check_exits ──────────────────────────────────────────────── 

430 

async def check_exits(self, holdings: List[dict]) -> List[TradeSignal]:
    """Evaluate every holding and return SELL signals.

    Exit priority per holding:
      1. hard stop (``_check_hard_stop``);
      2. entry-type stop loss (PP: moving-average break, BGU: gap-day low);
      3. partial profit-taking, measured from the last partial-sell price
         (or the buy price when there has been none);
      4. 7-week rule, once a holding-start date has been recorded.

    Position state is saved at most once, after the loop, when anything
    changed.

    Args:
        holdings: Dicts with ``code`` and ``buy_price`` (required) and
            optionally ``qty``, ``name`` and ``market``.

    Returns:
        SELL signals; a holding produces at most one signal per call.
    """
    signals = []
    state_dirty = False
    for hold in holdings:
        code = hold.get("code")
        buy_price = hold.get("buy_price")
        if not code or not buy_price:
            continue

        state = self._position_state.get(code)
        if not state:
            # Holding not opened by this strategy instance: track it with
            # conservative defaults (PP entry supported by the 20-day MA).
            state = PPPositionState(
                entry_type="PP", entry_price=buy_price,
                entry_date="", peak_price=buy_price,
                supporting_ma="20", gap_day_low=0,
            )
            self._position_state[code] = state
            # Persist the new record even if nothing else changes this pass.
            state_dirty = True

        resp = await self._sqs.get_current_price(code, caller=self.name)
        if not resp or resp.rt_cd != "0":
            continue

        output = resp.data.get("output") if isinstance(resp.data, dict) else None
        if not output:
            continue

        if isinstance(output, dict):
            raw_price = output.get("stck_prpr")
        else:
            raw_price = getattr(output, "stck_prpr", None)
        # None / "" become 0 so one blank field skips this holding instead of
        # raising and aborting the checks for every remaining holding.
        current = int(raw_price or 0)

        if current <= 0:
            continue

        # Track the peak price (dirty flag: saved once after the loop).
        if current > state.peak_price:
            state.peak_price = current
            state_dirty = True

        pnl = (current - buy_price) / buy_price * 100
        today_str = self._tm.get_current_kst_time().strftime("%Y%m%d")

        # Record the first day the profit anchor is reached (only once).
        if pnl >= self._cfg.holding_profit_anchor_pct and state.holding_start_date == "":
            state.holding_start_date = today_str
            state_dirty = True

        # Daily candles for the moving-average based checks.
        ohlcv_resp = await self._sqs.get_recent_daily_ohlcv(code, limit=60)
        ohlcv = ohlcv_resp.data if ohlcv_resp and ohlcv_resp.rt_cd == "0" else []

        reason = ""

        # Priority 1: hard stop.
        market = hold.get("market", "KOSPI")  # type: ignore
        hard_reason = await self._check_hard_stop(state, current, market)
        if hard_reason:
            reason = hard_reason

        # Priority 2: stop loss specific to the entry type.
        if not reason:
            if state.entry_type == "PP":
                pp_reason = self._check_pp_stop_loss(state, current, ohlcv)
                if pp_reason:
                    reason = pp_reason
            elif state.entry_type == "BGU":
                bgu_reason = self._check_bgu_stop_loss(state, current)
                if bgu_reason:
                    reason = bgu_reason

        # Priority 3: partial profit-taking (repeats from the last sell price).
        if not reason:
            ref_price = state.last_partial_sell_price if state.last_partial_sell_price > 0 else buy_price
            partial_signal = self._check_partial_profit(code, state, current, ref_price, hold)
            if partial_signal:
                signals.append(partial_signal)
                if getattr(partial_signal, "qty", 0) >= int(hold.get("qty", 1)):
                    # The "partial" sale liquidates the whole holding, so the
                    # position is closed; keeping its state would block
                    # re-entry in scan().
                    self._position_state.pop(code, None)
                else:
                    state.last_partial_sell_price = current
                state_dirty = True
                continue  # a partial sale never also triggers a full exit

        # Priority 4: 7-week rule.
        if not reason and state.holding_start_date:
            week7_reason = self._check_7week_hold(state, current, ohlcv)
            if week7_reason:
                reason = week7_reason

        # Emit the full-exit signal and drop the position state.
        if reason:
            holding_qty = int(hold.get("qty", 1))
            self._position_state.pop(code, None)
            state_dirty = True
            signals.append(TradeSignal(
                code=code, name=hold.get("name", code), action="SELL",
                price=current, qty=holding_qty, reason=reason, strategy_name=self.name
            ))

    if state_dirty:
        self._save_state()
    return signals

530 

async def _check_hard_stop(self, state: PPPositionState, current: int, market: str) -> Optional[str]:
    """Hard stop: market timing turned bad, or the price fell too far from its peak.

    Returns:
        The exit reason, or ``None`` when neither condition holds.
    """
    timing_ok = await self._universe.is_market_timing_ok(market, logger=self._logger)
    if not timing_ok:
        return "하드스탑(마켓타이밍 악화)"

    # Drawdown from the recorded peak, in percent (negative when below it).
    peak = state.peak_price
    if peak > 0:
        drop = (current - peak) / peak * 100
        if drop <= self._cfg.hard_stop_from_peak_pct:
            return f"하드스탑(고점대비 {drop:.1f}%)"

    return None

544 

def _check_pp_stop_loss(self, state: PPPositionState, current: int, ohlcv) -> Optional[str]:
    """Pocket Pivot stop loss: price breaks below the supporting moving average.

    The average is computed from the closes in ``ohlcv`` over the period
    named by ``state.supporting_ma``; the trigger level is that average
    shifted by ``pp_stop_loss_below_ma_pct`` percent.

    Returns:
        The exit reason, or ``None`` (also when data is missing or too short).
    """
    if not (ohlcv and state.supporting_ma):
        return None

    period = int(state.supporting_ma)
    usable_closes = [candle.get("close", 0) for candle in ohlcv if candle.get("close")]
    if len(usable_closes) < period:
        return None

    ma_value = sum(usable_closes[-period:]) / period
    trigger = ma_value * (1 + self._cfg.pp_stop_loss_below_ma_pct / 100)

    if current >= trigger:
        return None
    return f"PP손절({state.supporting_ma}MA {ma_value:,.0f} 하향이탈)"

562 

def _check_bgu_stop_loss(self, state: PPPositionState, current: int) -> Optional[str]:
    """BGU stop loss: price falls below the intraday low recorded on the gap-up day.

    Returns:
        The exit reason, or ``None`` when no low is recorded or it still holds.
    """
    floor = state.gap_day_low
    if floor <= 0 or current >= floor:
        return None
    return f"BGU손절(갭업저가 {state.gap_day_low:,} 이탈)"

568 

def _check_partial_profit(
    self, code: str, state: PPPositionState, current: int, buy_price: int, hold: dict
) -> Optional[TradeSignal]:
    """Partial profit-taking once the gain over ``buy_price`` reaches the trigger.

    Sells ``partial_sell_ratio`` of the holding (at least one share). When
    that would cover the whole holding, the entire position is sold instead.

    Returns:
        A SELL ``TradeSignal``, or ``None`` when the gain is below the trigger.
    """
    gain_pct = (current - buy_price) / buy_price * 100
    if gain_pct < self._cfg.partial_profit_trigger_pct:
        return None

    held = int(hold.get("qty", 1))
    to_sell = max(1, int(held * self._cfg.partial_sell_ratio))

    if to_sell < held:
        reason = f"부분익절({gain_pct:.1f}%, {to_sell}주/{held}주)"
    else:
        # Nothing would remain after the sale: treat it as a full exit.
        to_sell = held
        reason = f"전량익절({gain_pct:.1f}%, 잔고 {held}주)"

    self._logger.info({
        "event": "partial_profit_signal",
        "code": code, "pnl": round(gain_pct, 2),
        "sell_qty": to_sell, "holding_qty": held,
    })

    return TradeSignal(
        code=code, name=hold.get("name", code), action="SELL",
        price=current, qty=to_sell,
        reason=reason, strategy_name=self.name
    )

597 

def _check_7week_hold(self, state: PPPositionState, current: int, ohlcv) -> Optional[str]:
    """7-week rule: exit once the holding period has elapsed and the price is below the MA.

    Holding days are counted as the candles in ``ohlcv`` dated strictly after
    ``state.holding_start_date`` (dashes in dates are ignored). The moving
    average uses ``holding_rule_ma_period`` closes.

    Returns:
        The exit reason, or ``None``.
    """
    if not (state.holding_start_date and ohlcv):
        return None

    anchor = state.holding_start_date.replace("-", "")
    candle_dates = (str(candle.get("date", "")).replace("-", "") for candle in ohlcv)
    trading_days = len([stamp for stamp in candle_dates if stamp > anchor])

    if trading_days < self._cfg.holding_rule_days:
        return None

    # Moving-average break check.
    ma_period = self._cfg.holding_rule_ma_period
    usable_closes = [candle.get("close", 0) for candle in ohlcv if candle.get("close")]
    if len(usable_closes) < ma_period:
        return None

    ma_50 = sum(usable_closes[-ma_period:]) / ma_period
    if current >= ma_50:
        return None
    return f"7주룰(50MA {ma_50:,.0f} 이탈, {trading_days}일 보유)"

624 

625 # ── 헬퍼 ────────────────────────────────────────────────────── 

626 

def _calculate_qty(self, price: int) -> int:
    """Size an order from the per-position budget, never below the configured minimum.

    The budget is ``position_size_pct`` percent of ``total_portfolio_krw``.
    A non-positive price yields the minimum quantity.
    """
    floor_qty = self._cfg.min_qty
    if price > 0:
        allocation = self._cfg.total_portfolio_krw * (self._cfg.position_size_pct / 100)
        affordable = int(allocation / price)
        return floor_qty if floor_qty > affordable else affordable
    return floor_qty

632 

def _get_market_progress_ratio(self) -> float:
    """Return the elapsed fraction of the trading session, capped at 1.0.

    The value is negative before the open; it is 0.0 when the session length
    is not positive.
    """
    clock = self._tm
    now = clock.get_current_kst_time()
    session_open = clock.get_market_open_time()
    session_close = clock.get_market_close_time()

    session_seconds = (session_close - session_open).total_seconds()
    elapsed_seconds = (now - session_open).total_seconds()
    if session_seconds > 0:
        return min(elapsed_seconds / session_seconds, 1.0)
    return 0.0

640 

def _load_state(self):
    """Restore ``_position_state`` from ``STATE_FILE``, if the file exists.

    Loading is best-effort and never raises. An unreadable or malformed file
    is reported through the logger and leaves the state untouched. A single
    malformed entry is reported and skipped, so it does not discard the
    remaining positions.
    """
    if not os.path.exists(self.STATE_FILE):
        return

    try:
        with open(self.STATE_FILE, "r", encoding="utf-8") as f:
            data = json.load(f)
    except Exception as e:
        self._logger.warning({"event": "state_load_failed", "path": self.STATE_FILE, "error": str(e)})
        return

    if not isinstance(data, dict):
        self._logger.warning({
            "event": "state_load_failed", "path": self.STATE_FILE,
            "error": f"expected a JSON object, got {type(data).__name__}",
        })
        return

    for code, fields in data.items():
        try:
            self._position_state[code] = PPPositionState(**fields)
        except Exception as e:
            # Keep going: one bad record must not drop the others.
            self._logger.warning({"event": "state_entry_skipped", "code": code, "error": str(e)})

650 

def _save_state(self):
    """Persist ``_position_state`` to ``STATE_FILE`` as JSON.

    Saving is best-effort and never raises; failures are reported through
    the logger. The data is written to a sibling temporary file and then
    moved over the target, so an interrupted write cannot leave a truncated
    state file behind.
    """
    tmp_path = f"{self.STATE_FILE}.tmp"
    try:
        directory = os.path.dirname(self.STATE_FILE)
        # A bare file name has no directory part, and os.makedirs("") raises.
        if directory:
            os.makedirs(directory, exist_ok=True)
        data = {k: asdict(v) for k, v in self._position_state.items()}
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2)
        os.replace(tmp_path, self.STATE_FILE)
    except Exception as e:
        self._logger.warning({"event": "state_save_failed", "path": self.STATE_FILE, "error": str(e)})