Coverage for strategies / volume_breakout_live_strategy.py: 91%
118 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
1# strategies/volume_breakout_live_strategy.py
2from __future__ import annotations
4import logging
5from typing import List, Optional
7from interfaces.live_strategy import LiveStrategy
8from common.types import TradeSignal, ErrorCode
9from services.stock_query_service import StockQueryService
10from strategies.volume_breakout_strategy import VolumeBreakoutConfig
11from core.market_clock import MarketClock
12from core.logger import get_strategy_logger
class VolumeBreakoutLiveStrategy(LiveStrategy):
    """Live volume-breakout strategy.

    scan():
        1. Fetch the top trading-value stocks from the stock query service.
        2. Keep stocks whose gain from today's open is >= ``trigger_pct``.
        3. Emit a BUY ``TradeSignal`` for every stock that passes.

        NOTE: the volume filter (today's volume >= average volume *
        ``avg_vol_multiplier``) is NOT implemented yet; the accumulated
        volume is only reported in logs and in the signal reason.

    check_exits():
        - Trailing stop: current price vs. the day's high is
          <= -``trailing_stop_pct``.
        - Stop loss: current price vs. the buy price is <= ``stop_loss_pct``
          (presumably a negative percentage; confirm against
          ``VolumeBreakoutConfig``).
        - Time exit: 15 minutes or less remain until the market close.
    """

    # Remaining minutes before the close at which every position is exited.
    _TIME_EXIT_MINUTES = 15

    def __init__(
        self,
        stock_query_service: StockQueryService,
        market_clock: MarketClock,
        config: Optional[VolumeBreakoutConfig] = None,
        logger: Optional[logging.Logger] = None,
    ):
        """Store collaborators and initialise the per-day bookkeeping.

        Args:
            stock_query_service: Service used for ranking and price queries.
            market_clock: Source of the current KST time and the close time.
            config: Strategy parameters; a default ``VolumeBreakoutConfig``
                is created when omitted.
            logger: Logger to use; a strategy logger named
                ``"VolumeBreakoutLive"`` is created when omitted.
        """
        self._sqs = stock_query_service
        self._tm = market_clock
        self._cfg = config or VolumeBreakoutConfig()
        # Explicit None check: the fallback must only apply when no logger
        # was supplied, regardless of the supplied object's truthiness.
        if logger is not None:
            self._logger = logger
        else:
            self._logger = get_strategy_logger("VolumeBreakoutLive")

        # Codes for which a BUY signal was already emitted today.
        self._bought_today = set()
        # Date (YYYYMMDD) the set above belongs to; "" before the first scan.
        self._last_date = ""

    @property
    def name(self) -> str:
        """Return the display name of the strategy."""
        return "거래량돌파"

    @staticmethod
    def _to_int(value) -> int:
        """Convert an API numeric field to int, treating missing/empty as 0."""
        return int(value or "0")

    async def scan(self) -> List[TradeSignal]:
        """Scan the top trading-value stocks and return BUY signals.

        Returns:
            One BUY ``TradeSignal`` (qty=1) per candidate whose gain from the
            open reaches ``trigger_pct``. An empty list is returned when the
            ranking query fails.
        """
        signals: List[TradeSignal] = []
        self._logger.info({"event": "scan_started", "strategy_name": self.name})

        # Reset the "already bought today" record when the date changes.
        today = self._tm.get_current_kst_time().strftime("%Y%m%d")
        if self._last_date != today:
            self._bought_today.clear()
            self._last_date = today

        # 1) Fetch the top trading-value stocks.
        resp = await self._sqs.get_top_trading_value_stocks()
        if not resp or resp.rt_cd != ErrorCode.SUCCESS.value:
            self._logger.warning({
                "event": "scan_failed",
                "reason": "Failed to get top trading value stocks",
                "response": vars(resp) if resp else None,
            })
            return signals

        candidates = resp.data or []
        self._logger.info({
            "event": "scan_candidates_fetched",
            "count": len(candidates),
        })

        for stock in candidates:
            code = stock.get("mksc_shrn_iscd") or stock.get("stck_shrn_iscd") or ""
            stock_name = stock.get("hts_kor_isnm", code)
            log_data = {"code": code, "name": stock_name}

            if not code:
                continue

            # When re-entry is disallowed, skip codes already signalled today.
            if not self._cfg.allow_reentry and code in self._bought_today:
                continue

            # Per-candidate boundary: one bad record must not abort the scan.
            try:
                # 2) Fetch the current and opening prices.
                price_resp = await self._sqs.handle_get_current_stock_price(code, caller=self.name)
                if not price_resp or price_resp.rt_cd != ErrorCode.SUCCESS.value:
                    log_data.update({"reason": "Failed to get current price"})
                    self._logger.info({"event": "candidate_rejected", **log_data})
                    continue

                data = price_resp.data or {}
                current = self._to_int(data.get("price"))
                open_price = self._to_int(data.get("open"))
                current_vol = self._to_int(stock.get("acml_vol"))

                log_data.update({
                    "current_price": current,
                    "open_price": open_price,
                    "volume": current_vol,
                })

                if open_price <= 0 or current <= 0:
                    log_data.update({"reason": "Invalid price data (open or current is zero)"})
                    self._logger.info({"event": "candidate_rejected", **log_data})
                    continue

                # 3) Check the percentage change from the open.
                change_from_open = (current / open_price - 1.0) * 100
                log_data["change_from_open_pct"] = round(change_from_open, 2)

                if change_from_open < self._cfg.trigger_pct:
                    log_data.update({"reason": f"Change from open {change_from_open:.2f}% < trigger {self._cfg.trigger_pct}%"})
                    self._logger.info({"event": "candidate_rejected", **log_data})
                    continue

                # 4) Volume check: currently skipped, may be added later.

                # Build the BUY signal.
                reason_msg = (
                    f"시가돌파(시가대비 +{change_from_open:.1f}%, "
                    f"상승확인 {open_price:,}->{current:,}, "
                    f"누적거래량 {current_vol:,})"
                )
                signals.append(TradeSignal(
                    code=code, name=stock_name, action="BUY", price=current, qty=1,
                    reason=reason_msg, strategy_name=self.name,
                ))
                # Recorded when the signal is emitted (not when an order fills).
                self._bought_today.add(code)
                self._logger.info({
                    "event": "buy_signal_generated",
                    "strategy_name": self.name,
                    "code": code,
                    "name": stock_name,
                    "price": current,
                    "reason": reason_msg,
                    "data": log_data,
                })

            except Exception as e:
                self._logger.error({
                    "event": "scan_error",
                    "strategy_name": self.name,
                    "code": code,
                    "error": str(e),
                }, exc_info=True)

        self._logger.info({"event": "scan_finished", "signals_found": len(signals)})
        return signals

    async def check_exits(self, holdings: List[dict]) -> List[TradeSignal]:
        """Evaluate exit conditions for each holding and return SELL signals.

        Args:
            holdings: Dicts read via the keys ``code``, ``buy_price``,
                ``name`` and ``qty``. A missing or ``None`` ``qty`` is
                treated as 1; ``buy_price`` may be numeric or a numeric
                string.

        Returns:
            One SELL ``TradeSignal`` per holding that hits the trailing stop,
            the stop loss, or the time-based exit.
        """
        signals: List[TradeSignal] = []
        self._logger.info({"event": "check_exits_started", "holdings_count": len(holdings)})

        now = self._tm.get_current_kst_time()
        close_time = self._tm.get_market_close_time()
        minutes_to_close = (close_time - now).total_seconds() / 60

        for hold in holdings:
            # `or ""` keeps a None code from becoming the truthy string "None".
            code = str(hold.get("code") or "")
            raw_buy_price = hold.get("buy_price", 0)
            stock_name = hold.get("name", code)
            log_data = {"code": code, "name": stock_name, "buy_price": raw_buy_price}

            if not code or not raw_buy_price:
                continue

            # Per-holding boundary: one bad record must not block other exits.
            try:
                # Accept numeric strings so the P&L arithmetic below cannot
                # fail with a TypeError and silently skip this holding.
                buy_price = float(raw_buy_price)
                if buy_price <= 0:
                    self._logger.warning({
                        "event": "check_exits_failed",
                        "reason": "Invalid buy price for holding",
                        **log_data,
                    })
                    continue

                price_resp = await self._sqs.handle_get_current_stock_price(code, caller=self.name)
                if not price_resp or price_resp.rt_cd != ErrorCode.SUCCESS.value:
                    self._logger.warning({
                        "event": "check_exits_failed",
                        "reason": "Failed to get current price for holding",
                        **log_data,
                    })
                    continue

                data = price_resp.data or {}
                current = self._to_int(data.get("price"))
                high_price = self._to_int(data.get("high"))
                log_data.update({"current_price": current, "day_high": high_price})

                if current <= 0 or high_price <= 0:
                    continue

                reason = ""
                should_sell = False

                # Compute P&L and the drop from the day's high up front; both
                # are also reported in the sell log below.
                pnl_pct = ((current - buy_price) / buy_price) * 100
                drop_from_high = ((current - high_price) / high_price) * 100

                # Trailing stop: price fell from the day's high by at least
                # the configured trailing_stop_pct. Takes priority over the
                # stop loss.
                if drop_from_high <= -self._cfg.trailing_stop_pct:
                    reason = f"익절(트레일링): 고가({high_price:,})대비 {drop_from_high:.1f}%"
                    should_sell = True
                elif pnl_pct <= self._cfg.stop_loss_pct:
                    reason = f"손절: 매수가대비 {pnl_pct:.1f}%"
                    should_sell = True

                # Time exit applies only when no price-based exit fired.
                if not should_sell and minutes_to_close <= self._TIME_EXIT_MINUTES:
                    reason = f"시간청산: 장마감 {minutes_to_close:.0f}분전"
                    should_sell = True

                if should_sell:
                    # Treat an explicit None qty like a missing one so the
                    # SELL signal is not lost to a conversion error.
                    raw_qty = hold.get("qty")
                    holding_qty = int(raw_qty) if raw_qty is not None else 1
                    signals.append(TradeSignal(
                        code=code, name=stock_name, action="SELL", price=current, qty=holding_qty,
                        reason=reason, strategy_name=self.name,
                    ))
                    self._logger.info({
                        "event": "sell_signal_generated",
                        "strategy_name": self.name,
                        "code": code,
                        "name": stock_name,
                        "price": current,
                        "reason": reason,
                        "data": {**log_data, "pnl_pct": round(pnl_pct, 2), "drop_from_high_pct": round(drop_from_high, 2)},
                    })
                else:
                    self._logger.info({
                        "event": "hold_checked",
                        "code": code,
                        "reason": "No exit condition met",
                        "data": log_data,
                    })

            except Exception as e:
                self._logger.error({
                    "event": "check_exits_error",
                    "strategy_name": self.name,
                    "code": code,
                    "error": str(e),
                }, exc_info=True)

        self._logger.info({"event": "check_exits_finished", "signals_found": len(signals)})
        return signals