Coverage for strategies / oneil_pocket_pivot_strategy.py: 95%
373 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
1# strategies/oneil/pocket_pivot_strategy.py
2from __future__ import annotations
4import asyncio
5import logging
6import os
7import json
8from dataclasses import dataclass, asdict
9from datetime import timedelta
10from typing import List, Optional, Dict, Tuple
12from interfaces.live_strategy import LiveStrategy
13from common.types import TradeSignal, ErrorCode
14from services.stock_query_service import StockQueryService
15from core.market_clock import MarketClock
16from strategies.oneil_common_types import OneilPocketPivotConfig, PPPositionState
17from services.oneil_universe_service import OneilUniverseService
18from core.logger import get_strategy_logger
21class OneilPocketPivotStrategy(LiveStrategy):
22 """오닐식 포켓 피봇 & BGU 매매 (O'Neil Pocket Pivot & Buyable Gap-Up).
24 핵심: 시장 주도주 중 이동평균선 근처에서 기관의 숨은 매집(포켓 피봇)을 포착해
25 선취매하거나, 강력한 호재로 인한 폭발적 갭상승(BGU) 초입에 올라탄다.
27 진입 조건:
28 [공통 필터] 스마트머니(PG순매수 비율) + 체결강도(>=120%) 스냅샷
29 [조건 A] Pocket Pivot: MA 근접(-2%~+4%) + 환산 거래량 > 하락일 최대 거래량
30 [조건 B] BGU: 갭 >=4% + 환산 거래량 >= 50일 평균 300% + 09:10 이후 시가 지지
32 청산 조건 (우선순위):
33 1. 하드 스탑: 마켓타이밍 악화 OR 고점 대비 -10%
34 2. PP 손절: 지지MA -2% 이탈 / BGU 손절: 갭업 당일 저가 이탈
35 3. 부분 익절: +15% 시 50% 매도 (잔고 1주면 전량)
36 4. 7주 룰: +5% 안착 후 35거래일 경과 & 50MA 이탈 시 전량 청산
37 """
38 STATE_FILE = os.path.join("data", "pp_position_state.json")
def __init__(
    self,
    stock_query_service: StockQueryService,
    universe_service: OneilUniverseService,
    market_clock: MarketClock,
    config: Optional[OneilPocketPivotConfig] = None,
    logger: Optional[logging.Logger] = None,
):
    """Store collaborators and restore any persisted position state.

    Args:
        stock_query_service: Service used for price, OHLCV and conclusion queries.
        universe_service: Provider of the watchlist and market-timing checks.
        market_clock: Clock used for current time and market open/close times.
        config: Strategy parameters; a default ``OneilPocketPivotConfig`` is
            created when omitted (or falsy).
        logger: Logger to use; when omitted (or falsy) a strategy logger named
            "OneilPocketPivot" is requested from ``get_strategy_logger``.
    """
    self._sqs = stock_query_service
    self._universe = universe_service
    self._tm = market_clock
    self._cfg = config or OneilPocketPivotConfig()
    self._logger = logger if logger else get_strategy_logger("OneilPocketPivot")

    # Per-symbol position bookkeeping, keyed by stock code; filled from disk below.
    self._position_state: Dict[str, PPPositionState] = {}
    self._load_state()
@property
def name(self) -> str:
    """Strategy label attached to log records and emitted trade signals."""
    label = "오닐PP/BGU"
    return label
64 # ── scan ────────────────────────────────────────────────────────
async def scan(self) -> List[TradeSignal]:
    """Scan the watchlist for new entries and return the resulting BUY signals.

    The scan returns an empty list early when the watchlist is empty, when the
    market-progress ratio is not positive, or when market timing fails for both
    KOSPI and KOSDAQ. Otherwise every symbol that is not already tracked and
    whose market passed the timing check is evaluated with ``_check_entry``,
    ten symbols at a time.

    Returns:
        The truthy results of ``_check_entry``. Exceptions raised by individual
        checks are logged and skipped rather than propagated.
    """
    log = self._logger
    found: List[TradeSignal] = []
    log.info({"event": "scan_started", "strategy_name": self.name})

    watchlist = await self._universe.get_watchlist(logger=log)
    if not watchlist:
        log.info({"event": "scan_skipped", "reason": "Watchlist is empty"})
        return found

    log.info({"event": "scan_with_watchlist", "count": len(watchlist)})

    progress = self._get_market_progress_ratio()
    if progress <= 0:
        log.info({"event": "scan_skipped", "reason": "Market not open or just started"})
        return found

    # Market-timing pre-check, one call per market (KOSPI first, then KOSDAQ).
    kospi_ok = await self._universe.is_market_timing_ok("KOSPI", logger=log)
    kosdaq_ok = await self._universe.is_market_timing_ok("KOSDAQ", logger=log)
    timing_by_market = {"KOSPI": kospi_ok, "KOSDAQ": kosdaq_ok}
    if not (kospi_ok or kosdaq_ok):
        log.info({"event": "scan_skipped", "reason": "Bad market timing for both markets"})
        return found

    # Skip symbols we already track and symbols whose market failed timing.
    candidates = []
    for code, item in watchlist.items():
        if code in self._position_state:
            continue
        if timing_by_market.get(item.market, False):
            candidates.append((code, item))

    batch_size = 10
    for start in range(0, len(candidates), batch_size):
        batch = candidates[start:start + batch_size]
        outcomes = await asyncio.gather(
            *(self._check_entry(code, item, progress) for code, item in batch),
            return_exceptions=True,
        )
        for outcome in outcomes:
            if isinstance(outcome, Exception):
                log.error(f"Scan error: {outcome}")
                continue
            if outcome:
                found.append(outcome)

    log.info({"event": "scan_finished", "signals_found": len(found)})
    return found
async def _check_entry(self, code, item, progress) -> Optional[TradeSignal]:
    """Evaluate one symbol and return a BUY signal when every gate passes.

    Gates, in order: current-price snapshot, daily OHLCV (history up to
    yesterday plus a candle synthesised from today's snapshot), Pocket Pivot
    or (failing that) BGU, the smart-money filter, and the execution-strength
    threshold. On success the position is recorded in ``_position_state`` and
    persisted before the signal is returned.

    Args:
        code: Stock code to evaluate.
        item: Watchlist entry; ``market_cap`` and ``name`` are read here and
            the object is forwarded to ``_check_pocket_pivot``.
        progress: Market-progress ratio forwarded to the volume projections.

    Returns:
        A ``TradeSignal`` with action "BUY", or ``None`` when any gate fails.
    """
    # 1. Current-price snapshot.
    resp = await self._sqs.get_current_price(code, caller=self.name)
    if not resp or resp.rt_cd != "0":
        return None

    out = resp.data.get("output") if isinstance(resp.data, dict) else None
    if not out:
        return None

    def _raw(key):
        """Read ``key`` from a dict payload or an attribute-style payload."""
        return out.get(key) if isinstance(out, dict) else getattr(out, key, None)

    def _int(key):
        """Parse an integer field; missing, None and empty values become 0."""
        return int(_raw(key) or 0)

    current = _int("stck_prpr")
    vol = _int("acml_vol")
    pg_buy = _int("pgtr_ntby_qty")
    trade_value = _int("acml_tr_pbmn")
    today_open = _int("stck_oprc")
    today_high = _int("stck_hgpr")
    today_low = _int("stck_lwpr")
    prdy_vrss = _int("prdy_vrss")
    prdy_vrss_sign = str(_raw("prdy_vrss_sign") or "3")

    # Rebuild the previous close from the current price and the day change.
    # The magnitude is used because the direction is carried by the sign
    # code; a change that arrives already signed (e.g. "-500" on a down day)
    # would otherwise be applied in the wrong direction.
    change = abs(prdy_vrss)
    if prdy_vrss_sign in ("1", "2"):  # upper limit, up
        prev_close = current - change
    elif prdy_vrss_sign in ("4", "5"):  # lower limit, down
        prev_close = current + change
    else:  # unchanged
        prev_close = current

    if current <= 0 or prev_close <= 0:
        return None

    # 2. OHLCV: history through yesterday plus today's synthesised candle.
    now = self._tm.get_current_kst_time()
    yesterday_str = (now - timedelta(days=1)).strftime("%Y%m%d")
    ohlcv_resp = await self._sqs.get_recent_daily_ohlcv(code, limit=60, end_date=yesterday_str)
    # Copy the list: today's candle is appended below and must not leak into
    # whatever object the service handed back (it may be shared or cached).
    ohlcv = list(ohlcv_resp.data or []) if ohlcv_resp and ohlcv_resp.rt_cd == "0" else []

    today_str = now.strftime("%Y%m%d")
    today_candle = {
        "date": today_str,
        "open": float(today_open),
        "high": float(today_high),
        "low": float(today_low),
        "close": float(current),
        "volume": vol,
    }
    if ohlcv and ohlcv[-1].get("date") == today_str:
        ohlcv[-1] = today_candle
    else:
        ohlcv.append(today_candle)

    if len(ohlcv) < 10:
        return None

    # 3. Condition A (Pocket Pivot).
    entry_result = self._check_pocket_pivot(
        code, current, vol, progress, ohlcv, item, prev_close
    )

    # 4. Condition B (BGU), only when A did not match.
    if not entry_result:
        entry_result = self._check_bgu(
            code, current, vol, progress, ohlcv, today_open, today_low, prev_close
        )

    if not entry_result:
        return None

    entry_type, supporting_ma, gap_day_low, extra_info = entry_result

    # 5. Common smart-money filter (run only after a technical match).
    if not self._check_smart_money(code, current, pg_buy, trade_value, item.market_cap):
        self._logger.debug({"event": "entry_rejected_by_smart_money", "code": code, "entry_type": entry_type})
        return None

    # 6. Execution-strength snapshot; any failure while fetching rejects the entry.
    cgld_val = 0.0
    try:
        ccnl_resp = await self._sqs.get_stock_conclusion(code)
        if ccnl_resp and ccnl_resp.rt_cd == "0":
            ccnl_output = ccnl_resp.data.get("output") if isinstance(ccnl_resp.data, dict) else None
            if ccnl_output and isinstance(ccnl_output, list) and len(ccnl_output) > 0:
                val = ccnl_output[0].get("tday_rltv")
                cgld_val = float(val) if val else 0.0
    except Exception as e:
        self._logger.warning({"event": "cgld_check_failed", "code": code, "error": str(e)})
        return None

    if cgld_val < self._cfg.execution_strength_min:
        self._logger.debug({"event": "entry_rejected", "code": code, "reason": "low_execution_strength", "cgld": cgld_val})
        return None

    # All gates passed: size the order, record the position, build the signal.
    qty = self._calculate_qty(current)
    pg_buy_amount = pg_buy * current
    pg_ratio = (pg_buy_amount / trade_value * 100) if trade_value > 0 else 0.0

    self._position_state[code] = PPPositionState(
        entry_type=entry_type,
        entry_price=current,
        entry_date=self._tm.get_current_kst_time().strftime("%Y%m%d"),
        peak_price=current,
        supporting_ma=supporting_ma,
        gap_day_low=gap_day_low,
    )
    self._save_state()

    if entry_type == "PP":
        proj_vol = extra_info.get("proj_vol", 0)
        max_down_vol = extra_info.get("max_down_vol", 0)
        vol_ratio = (proj_vol / max_down_vol * 100) if max_down_vol > 0 else 0.0
        reason_msg = (
            f"PP진입({supporting_ma}MA지지, "
            f"예상거래 {vol_ratio:.0f}%(하락최대대비), "
            f"PG매수 {pg_buy_amount // 100_000_000:,}억({pg_ratio:.1f}%), "
            f"체결강도 {cgld_val:.1f}%)"
        )
    elif entry_type == "BGU":
        gap_ratio = extra_info.get("gap_ratio", 0.0)
        proj_vol = extra_info.get("proj_vol", 0)
        avg_vol_50d = extra_info.get("avg_vol_50d", 0)
        vol_ratio = (proj_vol / avg_vol_50d * 100) if avg_vol_50d > 0 else 0.0
        reason_msg = (
            f"BGU진입(갭 {gap_ratio:.1f}%, "
            f"예상거래 {vol_ratio:.0f}%(50일평균대비), "
            f"PG매수 {pg_buy_amount // 100_000_000:,}억({pg_ratio:.1f}%), "
            f"체결강도 {cgld_val:.1f}%)"
        )
    else:
        reason_msg = (
            f"{entry_type}진입(체결강도 {cgld_val:.1f}%, "
            f"PG매수 {pg_buy_amount // 100_000_000:,}억({pg_ratio:.1f}%))"
        )

    self._logger.info({
        "event": "buy_signal_generated",
        "code": code, "name": item.name,
        "entry_type": entry_type,
        "price": current,
        "reason": reason_msg,
    })

    return TradeSignal(
        code=code, name=item.name, action="BUY", price=current, qty=qty,
        reason=reason_msg, strategy_name=self.name
    )
270 # ── 조건 A: Pocket Pivot ──────────────────────────────────────
def _check_pocket_pivot(
    self, code, current, vol, progress, ohlcv, item, prev_close
) -> Optional[Tuple[str, str, int, dict]]:
    """Check the Pocket Pivot entry condition.

    A match requires, in order: at least ten closes; the current price within
    the configured proximity band of the 10-, 20- or 50-day moving average
    (first match wins); an up day versus the previous close; at least one
    down candle (close < open) in the lookback window; and a projected
    full-day volume above the largest down-candle volume.

    Args:
        code: Stock code, used for logging only.
        current: Current price.
        vol: Accumulated volume so far today.
        progress: Market-progress ratio; floored at 0.05 for the projection.
        ohlcv: Daily candles (dicts with "open", "close", "volume").
        item: Watchlist entry providing ``ma_20d`` and ``ma_50d``.
        prev_close: Previous day's close.

    Returns:
        ``("PP", supporting_ma, 0, extra_info)`` on a match, otherwise ``None``.
    """
    closes = [r.get("close", 0) for r in ohlcv if r.get("close")]
    if len(closes) < 10:
        self._logger.debug({"event": "pp_rejected", "code": code, "reason": "insufficient_data"})
        return None

    # 1. Moving averages: 10-day computed here, 20/50-day taken from the item.
    ma_10d = sum(closes[-10:]) / 10
    ma_candidates = [
        (ma_10d, "10"),
        (item.ma_20d, "20"),
        (item.ma_50d, "50"),
    ]

    # 2. Proximity check against each average, shortest period first.
    supporting_ma = ""
    for ma_val, ma_name in ma_candidates:
        # Skip averages that are missing (None) or non-positive; comparing
        # None with 0 would raise TypeError and abort the whole check.
        if not ma_val or ma_val <= 0:
            continue
        lower = ma_val * (1 + self._cfg.pp_ma_proximity_lower_pct / 100)
        upper = ma_val * (1 + self._cfg.pp_ma_proximity_upper_pct / 100)
        if lower <= current <= upper:
            supporting_ma = ma_name
            break

    if not supporting_ma:
        self._logger.debug({"event": "pp_rejected", "code": code, "reason": "not_near_ma"})
        return None

    # 3. Today must be an up day relative to the previous close.
    if current <= prev_close:
        self._logger.debug({"event": "pp_rejected", "code": code, "reason": "not_an_up_day"})
        return None

    # 4. Largest volume among down candles (close < open) in the lookback window.
    lookback = min(self._cfg.pp_down_day_lookback, len(ohlcv))
    # Guard the slice: ohlcv[-0:] would select the entire history, not nothing.
    recent = ohlcv[-lookback:] if lookback > 0 else []
    down_day_volumes = []
    for candle in recent:
        c = candle.get("close", 0)
        o = candle.get("open", 0)
        v = candle.get("volume", 0)
        if c and o and c < o and v:
            down_day_volumes.append(v)

    max_down_vol = max(down_day_volumes) if down_day_volumes else 0

    # Without a single down candle there is nothing to compare against, so reject.
    if max_down_vol <= 0:
        self._logger.debug({"event": "pp_rejected", "code": code, "reason": "no_down_day_volume"})
        return None

    # 5. Projected full-day volume must exceed the largest down-candle volume.
    effective_progress = max(progress, 0.05)
    proj_vol = vol / effective_progress

    if proj_vol <= max_down_vol:
        self._logger.debug({"event": "pp_rejected", "code": code, "reason": "insufficient_volume", "proj_vol": int(proj_vol), "max_down_vol": max_down_vol})
        return None

    self._logger.debug({
        "event": "pocket_pivot_matched", "code": code,
        "supporting_ma": supporting_ma,
        "proj_vol": int(proj_vol), "max_down_vol": int(max_down_vol),
    })

    return ("PP", supporting_ma, 0, {"proj_vol": proj_vol, "max_down_vol": max_down_vol})
346 # ── 조건 B: BGU ───────────────────────────────────────────────
def _check_bgu(
    self, code, current, vol, progress, ohlcv, today_open, today_low, prev_close
) -> Optional[Tuple[str, str, int, dict]]:
    """Check the BGU (Buyable Gap-Up) entry condition.

    A match requires, in order: positive open and previous close; an opening
    gap of at least ``bgu_gap_pct``; at least ``bgu_whipsaw_after_minutes``
    elapsed since the market open; the current price at or above today's
    open; at least 20 non-zero volume candles; and a projected full-day
    volume of at least the average of the last (up to) 50 volumes times
    ``bgu_volume_multiplier``.

    Returns:
        ``("BGU", "", today_low, extra_info)`` on a match, otherwise ``None``.
    """
    cfg = self._cfg
    if today_open <= 0 or prev_close <= 0:
        return None

    # 1. Opening gap versus the previous close, in percent.
    gap_ratio = (today_open - prev_close) / prev_close * 100
    if gap_ratio < cfg.bgu_gap_pct:
        return None

    # 2. Whipsaw filter: wait the configured number of minutes after the open.
    now = self._tm.get_current_kst_time()
    open_time = self._tm.get_market_open_time()
    minutes_since_open = (now - open_time).total_seconds() / 60
    if minutes_since_open < cfg.bgu_whipsaw_after_minutes:
        return None

    # 3. Price support: the current price must hold at or above the open.
    if current < today_open:
        return None

    # 4. Relative volume against the recent average.
    volumes = []
    for row in ohlcv:
        if row.get("volume"):
            volumes.append(row.get("volume", 0))
    window = min(50, len(volumes))
    if window < 20:
        return None
    avg_vol_50d = sum(volumes[-window:]) / window

    proj_vol = vol / max(progress, 0.05)
    if proj_vol < avg_vol_50d * cfg.bgu_volume_multiplier:
        return None

    self._logger.debug({
        "event": "bgu_matched", "code": code,
        "gap_ratio": round(gap_ratio, 2),
        "proj_vol": int(proj_vol), "avg_vol_50d": int(avg_vol_50d),
        "today_low": today_low,
    })

    extra_info = {"gap_ratio": gap_ratio, "proj_vol": proj_vol, "avg_vol_50d": avg_vol_50d}
    return ("BGU", "", today_low, extra_info)
396 # ── 스마트 머니 필터 ──────────────────────────────────────────
def _check_smart_money(self, code: str, current: int, pg_buy: int, trade_value: int, market_cap: int) -> bool:
    """Smart-money (program trading flow) filter.

    Passes only when program net-buy quantity is positive and the net-buy
    amount (``pg_buy * current``) reaches the configured share of traded
    value and of market capitalisation. Either ratio check is skipped when
    its denominator is not positive.

    Returns:
        ``True`` when the filter passes, ``False`` otherwise.
    """
    debug = self._logger.debug

    if pg_buy <= 0:
        debug({"event": "smart_money_rejected", "code": code, "reason": "not_net_buy", "pg_buy": pg_buy})
        return False

    pg_buy_amount = pg_buy * current

    # Share of today's traded value.
    if trade_value > 0:
        tv_share = pg_buy_amount / trade_value * 100
        if tv_share < self._cfg.program_to_trade_value_pct:
            debug({
                "event": "smart_money_rejected", "code": code, "reason": "low_pg_to_trade_value",
                "pg_to_tv_pct": round(tv_share, 2), "threshold": self._cfg.program_to_trade_value_pct
            })
            return False

    # Share of market capitalisation.
    if market_cap > 0:
        mc_share = pg_buy_amount / market_cap * 100
        if mc_share < self._cfg.program_to_market_cap_pct:
            debug({
                "event": "smart_money_rejected", "code": code, "reason": "low_pg_to_market_cap",
                "pg_to_mc_pct": round(mc_share, 2), "threshold": self._cfg.program_to_market_cap_pct
            })
            return False

    debug({"event": "smart_money_passed", "code": code, "pg_buy_amount": pg_buy_amount})
    return True
429 # ── check_exits ────────────────────────────────────────────────
async def check_exits(self, holdings: List[dict]) -> List[TradeSignal]:
    """Evaluate every holding and return the SELL signals that apply.

    For each holding the checks run in priority order and the first one that
    fires wins:
        1. Hard stop (``_check_hard_stop``).
        2. Entry-type stop loss (PP or BGU).
        3. Profit taking relative to the last profit-taking price (or the
           buy price when none has happened yet).
        4. 7-week rule, only once a holding start date has been recorded.

    Position state is updated along the way (peak price, holding start date,
    last profit-taking price) and saved once after the loop if anything
    changed. State is removed whenever the emitted signal sells the whole
    balance, including when the profit-taking path does so.

    Args:
        holdings: Dicts with "code" and "buy_price"; "qty", "name" and
            "market" are optional (defaults 1, the code, and "KOSPI").

    Returns:
        SELL signals, at most one per holding.
    """
    signals = []
    state_dirty = False
    for hold in holdings:
        code = hold.get("code")
        buy_price = hold.get("buy_price")
        if not code or not buy_price:
            continue

        state = self._position_state.get(code)
        if not state:
            # Holding is not tracked yet: treat it as a PP entry on the 20MA.
            state = PPPositionState(
                entry_type="PP", entry_price=buy_price,
                entry_date="", peak_price=buy_price,
                supporting_ma="20", gap_day_low=0,
            )
            self._position_state[code] = state

        resp = await self._sqs.get_current_price(code, caller=self.name)
        if not resp or resp.rt_cd != "0":
            continue

        output = resp.data.get("output") if isinstance(resp.data, dict) else None
        if not output:
            continue

        raw_price = (
            output.get("stck_prpr") if isinstance(output, dict)
            else getattr(output, "stck_prpr", None)
        )
        try:
            current = int(raw_price or 0)
        except (TypeError, ValueError):
            # One unparsable quote must not abort exit checks for the
            # remaining holdings; skip this one and carry on.
            self._logger.warning({
                "event": "exit_check_skipped", "code": code,
                "reason": "invalid_current_price", "value": str(raw_price),
            })
            continue

        if current <= 0:
            continue

        # Track the peak price (dirty flag: saved once after the loop).
        if current > state.peak_price:
            state.peak_price = current
            state_dirty = True

        pnl = (current - buy_price) / buy_price * 100
        today_str = self._tm.get_current_kst_time().strftime("%Y%m%d")

        # Record the first day the profit anchor is reached (written once).
        if pnl >= self._cfg.holding_profit_anchor_pct and state.holding_start_date == "":
            state.holding_start_date = today_str
            state_dirty = True

        # OHLCV for the moving-average based checks.
        ohlcv_resp = await self._sqs.get_recent_daily_ohlcv(code, limit=60)
        ohlcv = ohlcv_resp.data if ohlcv_resp and ohlcv_resp.rt_cd == "0" else []

        reason = ""

        # Priority 1: hard stop.
        market = hold.get("market", "KOSPI")  # type: ignore
        hard_reason = await self._check_hard_stop(state, current, market)
        if hard_reason:
            reason = hard_reason

        # Priority 2: entry-type specific stop loss.
        if not reason:
            if state.entry_type == "PP":
                pp_reason = self._check_pp_stop_loss(state, current, ohlcv)
                if pp_reason:
                    reason = pp_reason
            elif state.entry_type == "BGU":
                bgu_reason = self._check_bgu_stop_loss(state, current)
                if bgu_reason:
                    reason = bgu_reason

        # Priority 3: profit taking, repeatable from the last profit-taking price.
        if not reason:
            ref_price = state.last_partial_sell_price if state.last_partial_sell_price > 0 else buy_price
            partial_signal = self._check_partial_profit(code, state, current, ref_price, hold)
            if partial_signal:
                signals.append(partial_signal)
                holding_qty = int(hold.get("qty", 1))
                if getattr(partial_signal, "qty", 0) >= holding_qty:
                    # The whole balance is being sold: drop the state so a
                    # stale entry does not keep the code excluded from scan().
                    self._position_state.pop(code, None)
                else:
                    state.last_partial_sell_price = current
                state_dirty = True
                continue  # a profit-taking signal replaces any further checks

        # Priority 4: 7-week rule.
        if not reason and state.holding_start_date:
            week7_reason = self._check_7week_hold(state, current, ohlcv)
            if week7_reason:
                reason = week7_reason

        # Full liquidation signal.
        if reason:
            holding_qty = int(hold.get("qty", 1))
            self._position_state.pop(code, None)
            state_dirty = True
            signals.append(TradeSignal(
                code=code, name=hold.get("name", code), action="SELL",
                price=current, qty=holding_qty, reason=reason, strategy_name=self.name
            ))

    if state_dirty:
        self._save_state()
    return signals
async def _check_hard_stop(self, state: PPPositionState, current: int, market: str) -> Optional[str]:
    """Hard stop: market timing has turned bad, or the drop from peak hit the limit.

    Args:
        state: Position state; only ``peak_price`` is read.
        current: Current price.
        market: Market name passed to the market-timing check.

    Returns:
        A reason string when the hard stop fires, otherwise ``None``.
    """
    timing_ok = await self._universe.is_market_timing_ok(market, logger=self._logger)
    if not timing_ok:
        return "하드스탑(마켓타이밍 악화)"

    # Drawdown from the recorded peak, in percent (negative when below the peak).
    peak = state.peak_price
    if peak > 0:
        drop = (current - peak) / peak * 100
        if drop <= self._cfg.hard_stop_from_peak_pct:
            return f"하드스탑(고점대비 {drop:.1f}%)"

    return None
def _check_pp_stop_loss(self, state: PPPositionState, current: int, ohlcv) -> Optional[str]:
    """PP stop loss: price fell below the supporting moving average by the configured margin.

    The moving average is recomputed from the non-zero closes in ``ohlcv``
    over the period named by ``state.supporting_ma``.

    Returns:
        A reason string when the stop fires; ``None`` when it does not, or
        when there is no OHLCV data, no supporting MA, or too few closes.
    """
    if not ohlcv or not state.supporting_ma:
        return None

    period = int(state.supporting_ma)
    closes = [row.get("close", 0) for row in ohlcv if row.get("close")]
    if len(closes) < period:
        return None

    ma_value = sum(closes[-period:]) / period
    stop_level = ma_value * (1 + self._cfg.pp_stop_loss_below_ma_pct / 100)
    if current < stop_level:
        return f"PP손절({state.supporting_ma}MA {ma_value:,.0f} 하향이탈)"
    return None
def _check_bgu_stop_loss(self, state: PPPositionState, current: int) -> Optional[str]:
    """BGU stop loss: price fell below the intraday low recorded on the gap-up day.

    Returns:
        A reason string when the stop fires; ``None`` otherwise, including
        when no gap-day low is recorded (``gap_day_low`` not positive).
    """
    gap_low = state.gap_day_low
    if gap_low > 0 and current < gap_low:
        return f"BGU손절(갭업저가 {state.gap_day_low:,} 이탈)"
    return None
def _check_partial_profit(
    self, code: str, state: PPPositionState, current: int, buy_price: int, hold: dict
) -> Optional[TradeSignal]:
    """Profit taking: sell a configured fraction once the gain reaches the trigger.

    The gain is measured from ``buy_price`` (the caller may pass a different
    reference price). The quantity sold is ``partial_sell_ratio`` of the
    holding, at least one share; when that covers the whole holding the
    signal sells everything.

    Returns:
        A SELL ``TradeSignal``, or ``None`` when the gain is below
        ``partial_profit_trigger_pct``.
    """
    gain_pct = (current - buy_price) / buy_price * 100
    if gain_pct < self._cfg.partial_profit_trigger_pct:
        return None

    held = int(hold.get("qty", 1))
    to_sell = max(1, int(held * self._cfg.partial_sell_ratio))

    if to_sell < held:
        reason = f"부분익절({gain_pct:.1f}%, {to_sell}주/{held}주)"
    else:
        # Nothing would be left after the sale: sell the whole balance.
        to_sell = held
        reason = f"전량익절({gain_pct:.1f}%, 잔고 {held}주)"

    self._logger.info({
        "event": "partial_profit_signal",
        "code": code, "pnl": round(gain_pct, 2),
        "sell_qty": to_sell, "holding_qty": held,
    })

    return TradeSignal(
        code=code, name=hold.get("name", code), action="SELL",
        price=current, qty=to_sell,
        reason=reason, strategy_name=self.name
    )
def _check_7week_hold(self, state: PPPositionState, current: int, ohlcv) -> Optional[str]:
    """7-week rule: exit once the holding period has elapsed and price is below the MA.

    Trading days are counted as the candles in ``ohlcv`` dated strictly after
    ``state.holding_start_date`` (dashes in dates are ignored). The rule fires
    when that count reaches ``holding_rule_days`` and the current price is
    below the ``holding_rule_ma_period``-day average of the non-zero closes.

    Returns:
        A reason string when the rule fires, otherwise ``None``.
    """
    if not state.holding_start_date or not ohlcv:
        return None

    anchor = state.holding_start_date.replace("-", "")
    trading_days = 0
    for candle in ohlcv:
        if str(candle.get("date", "")).replace("-", "") > anchor:
            trading_days += 1

    if trading_days < self._cfg.holding_rule_days:
        return None

    # Moving-average break check.
    closes = [row.get("close", 0) for row in ohlcv if row.get("close")]
    period = self._cfg.holding_rule_ma_period
    if len(closes) < period:
        return None

    ma_50 = sum(closes[-period:]) / period
    if current < ma_50:
        return f"7주룰(50MA {ma_50:,.0f} 이탈, {trading_days}일 보유)"
    return None
625 # ── 헬퍼 ──────────────────────────────────────────────────────
def _calculate_qty(self, price: int) -> int:
    """Return the order quantity for one position at ``price``.

    The budget is ``position_size_pct`` percent of ``total_portfolio_krw``;
    the result is never below ``min_qty``, which is also returned for a
    non-positive price.
    """
    cfg = self._cfg
    if price <= 0:
        return cfg.min_qty
    budget = cfg.total_portfolio_krw * (cfg.position_size_pct / 100)
    affordable = int(budget / price)
    return max(affordable, cfg.min_qty)
def _get_market_progress_ratio(self) -> float:
    """Return the fraction of the trading session elapsed, capped at 1.0.

    The value is negative before the open and 0.0 when the session length
    reported by the clock is not positive.
    """
    clock = self._tm
    now = clock.get_current_kst_time()
    opened = clock.get_market_open_time()
    closed = clock.get_market_close_time()

    session_seconds = (closed - opened).total_seconds()
    elapsed_seconds = (now - opened).total_seconds()
    if session_seconds > 0:
        return min(elapsed_seconds / session_seconds, 1.0)
    return 0.0
def _load_state(self):
    """Restore tracked positions from ``STATE_FILE`` into ``_position_state``.

    Loading is best-effort and never raises for a bad file: a missing file is
    a no-op, an unreadable or malformed file is logged and ignored, and a
    record that cannot be turned into a ``PPPositionState`` is logged and
    skipped without discarding the records around it.
    """
    if not os.path.exists(self.STATE_FILE):
        return

    try:
        with open(self.STATE_FILE, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError) as e:  # json.JSONDecodeError is a ValueError
        self._logger.warning({"event": "state_load_failed", "path": self.STATE_FILE, "error": str(e)})
        return

    if not isinstance(data, dict):
        self._logger.warning({
            "event": "state_load_failed", "path": self.STATE_FILE,
            "error": f"expected a JSON object, got {type(data).__name__}",
        })
        return

    for k, v in data.items():
        try:
            self._position_state[k] = PPPositionState(**v)
        except (TypeError, ValueError) as e:
            # Schema drift or a hand-edited record: keep the remaining positions.
            self._logger.warning({"event": "state_entry_skipped", "code": k, "error": str(e)})
def _save_state(self):
    """Persist ``_position_state`` to ``STATE_FILE`` as JSON.

    The data is written to a sibling ``.tmp`` file and then moved over the
    target with ``os.replace``, so a failure part-way through never leaves a
    truncated state file behind. Saving stays best-effort: any failure is
    logged and swallowed instead of interrupting the caller.
    """
    tmp_path = f"{self.STATE_FILE}.tmp"
    try:
        directory = os.path.dirname(self.STATE_FILE)
        if directory:  # a bare filename has no directory; makedirs("") would fail
            os.makedirs(directory, exist_ok=True)
        data = {k: asdict(v) for k, v in self._position_state.items()}
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2)
        os.replace(tmp_path, self.STATE_FILE)
    except Exception as e:
        self._logger.warning({"event": "state_save_failed", "path": self.STATE_FILE, "error": str(e)})
        # Do not leave a half-written temp file behind.
        try:
            if os.path.exists(tmp_path):
                os.remove(tmp_path)
        except OSError:
            pass