Coverage for strategies/volume_breakout_live_strategy.py: 91%

118 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-04 15:08 +0000

1# strategies/volume_breakout_live_strategy.py 

2from __future__ import annotations 

3 

4import logging 

5from typing import List, Optional 

6 

7from interfaces.live_strategy import LiveStrategy 

8from common.types import TradeSignal, ErrorCode 

9from services.stock_query_service import StockQueryService 

10from strategies.volume_breakout_strategy import VolumeBreakoutConfig 

11from core.market_clock import MarketClock 

12from core.logger import get_strategy_logger 

13 

14 

class VolumeBreakoutLiveStrategy(LiveStrategy):
    """Live volume-breakout strategy.

    scan():
        1. Fetch the top stocks by trading value from the stock query service
           (the original note says "top 30"; the count is decided by the
           service, not by this class).
        2. Keep stocks whose gain from the day's open is >= ``trigger_pct``.
        3. NOT APPLIED YET: a volume filter (day volume >= average volume *
           a configured multiplier) is planned but currently skipped.
        4. Return a BUY ``TradeSignal`` for every stock that passes.

    check_exits():
        - Trailing stop: current price is ``trailing_stop_pct`` percent or
          more below the day's high.
        - Stop loss: return versus the buy price is <= ``stop_loss_pct``.
        - Time exit: 15 minutes or less remain until market close.
    """

    def __init__(
        self,
        stock_query_service: StockQueryService,
        market_clock: MarketClock,
        config: Optional[VolumeBreakoutConfig] = None,
        logger: Optional[logging.Logger] = None,
    ):
        """Store collaborators and initialise per-day bookkeeping.

        Args:
            stock_query_service: Service used for ranking and price lookups.
            market_clock: Source of the current KST time and the close time.
            config: Strategy parameters; a default ``VolumeBreakoutConfig``
                is created when omitted.
            logger: Logger to use; a strategy logger named
                ``"VolumeBreakoutLive"`` is created when omitted.
        """
        self._sqs = stock_query_service
        self._tm = market_clock
        # Compare against None explicitly so a supplied-but-falsy object is
        # still honoured.
        self._cfg = config if config is not None else VolumeBreakoutConfig()
        self._logger = (
            logger if logger is not None
            else get_strategy_logger("VolumeBreakoutLive")
        )

        # Codes for which a BUY signal was already emitted on ``_last_date``.
        self._bought_today: set = set()
        # Date (YYYYMMDD) of the most recent scan; "" before the first scan.
        self._last_date = ""

    @property
    def name(self) -> str:
        """Display name of the strategy, also used as the signal/log tag."""
        return "거래량돌파"

    @staticmethod
    def _as_int(value) -> int:
        """Convert a numeric response field to ``int``.

        Missing or empty values (``None``, ``""``, ``0``) become ``0``.
        Anything ``int()`` cannot parse raises, and the caller's per-item
        ``except`` handles it.
        """
        return int(value or "0")

    async def scan(self) -> List[TradeSignal]:
        """Scan the top trading-value stocks and emit BUY signals.

        Returns:
            One BUY ``TradeSignal`` (qty 1, at the current price) per stock
            whose gain from the open reaches ``trigger_pct``. Empty when the
            ranking request fails or nothing qualifies.
        """
        signals: List[TradeSignal] = []
        self._logger.info({"event": "scan_started", "strategy_name": self.name})

        # Reset the bought-today record when the calendar date changes.
        today = self._tm.get_current_kst_time().strftime("%Y%m%d")
        if self._last_date != today:
            self._bought_today.clear()
            self._last_date = today

        # 1) Fetch the top stocks by trading value.
        resp = await self._sqs.get_top_trading_value_stocks()
        if not resp or resp.rt_cd != ErrorCode.SUCCESS.value:
            self._logger.warning({
                "event": "scan_failed",
                "reason": "Failed to get top trading value stocks",
                "response": vars(resp) if resp else None,
            })
            return signals

        candidates = resp.data or []
        self._logger.info({
            "event": "scan_candidates_fetched",
            "count": len(candidates),
        })

        for stock in candidates:
            code = stock.get("mksc_shrn_iscd") or stock.get("stck_shrn_iscd") or ""
            stock_name = stock.get("hts_kor_isnm", code)
            log_data = {"code": code, "name": stock_name}

            if not code:
                continue

            # When re-entry is disabled, skip codes already signalled today.
            if not self._cfg.allow_reentry and code in self._bought_today:
                continue

            try:
                # 2) Fetch the current and opening prices.
                price_resp = await self._sqs.handle_get_current_stock_price(code, caller=self.name)
                if not price_resp or price_resp.rt_cd != ErrorCode.SUCCESS.value:
                    log_data.update({"reason": "Failed to get current price"})
                    self._logger.info({"event": "candidate_rejected", **log_data})
                    continue

                data = price_resp.data or {}
                current = self._as_int(data.get("price"))
                open_price = self._as_int(data.get("open"))
                # Accumulated volume comes from the ranking row, not the quote.
                current_vol = self._as_int(stock.get("acml_vol"))

                log_data.update({
                    "current_price": current,
                    "open_price": open_price,
                    "volume": current_vol,
                })

                if open_price <= 0 or current <= 0:
                    log_data.update({"reason": "Invalid price data (open or current is zero)"})
                    self._logger.info({"event": "candidate_rejected", **log_data})
                    continue

                # 3) Check the percentage change from the open.
                change_from_open = (current / open_price - 1.0) * 100
                log_data["change_from_open_pct"] = round(change_from_open, 2)

                if change_from_open < self._cfg.trigger_pct:
                    log_data.update({"reason": f"Change from open {change_from_open:.2f}% < trigger {self._cfg.trigger_pct}%"})
                    self._logger.info({"event": "candidate_rejected", **log_data})
                    continue

                # 4) Volume check: currently skipped, may be added later.

                # Build the BUY signal.
                reason_msg = (
                    f"시가돌파(시가대비 +{change_from_open:.1f}%, "
                    f"상승확인 {open_price:,}->{current:,}, "
                    f"누적거래량 {current_vol:,})"
                )
                signals.append(TradeSignal(
                    code=code, name=stock_name, action="BUY", price=current, qty=1,
                    reason=reason_msg, strategy_name=self.name,
                ))
                # Recorded when the signal is emitted, not when an order fills.
                self._bought_today.add(code)
                self._logger.info({
                    "event": "buy_signal_generated",
                    "strategy_name": self.name,
                    "code": code,
                    "name": stock_name,
                    "price": current,
                    "reason": reason_msg,
                    "data": log_data,
                })

            except Exception as e:
                # Best effort per candidate: log and move on to the next one.
                self._logger.error({
                    "event": "scan_error",
                    "strategy_name": self.name,
                    "code": code,
                    "error": str(e),
                }, exc_info=True)

        self._logger.info({"event": "scan_finished", "signals_found": len(signals)})
        return signals

    async def check_exits(self, holdings: List[dict]) -> List[TradeSignal]:
        """Evaluate exit conditions for each holding and emit SELL signals.

        Args:
            holdings: Dicts read via the keys ``code``, ``buy_price``,
                ``name`` (optional) and ``qty`` (optional, default 1).
                ``buy_price`` may be a number or a numeric string.

        Returns:
            One SELL ``TradeSignal`` per holding that hits the trailing stop,
            the stop loss, or the time exit (checked in that order).
        """
        signals: List[TradeSignal] = []
        self._logger.info({"event": "check_exits_started", "holdings_count": len(holdings)})

        now = self._tm.get_current_kst_time()
        close_time = self._tm.get_market_close_time()
        minutes_to_close = (close_time - now).total_seconds() / 60

        for hold in holdings:
            code = str(hold.get("code", ""))
            raw_buy_price = hold.get("buy_price", 0)
            stock_name = hold.get("name", code)
            log_data = {"code": code, "name": stock_name, "buy_price": raw_buy_price}

            if not code or not raw_buy_price:
                continue

            try:
                # Coerce before doing arithmetic: a numeric string would
                # otherwise raise TypeError below and silently skip the
                # stop-loss / trailing-stop evaluation for this holding.
                buy_price = float(raw_buy_price)
                if buy_price <= 0:
                    self._logger.warning({
                        "event": "check_exits_failed",
                        "reason": "Invalid buy price for holding",
                        **log_data,
                    })
                    continue

                price_resp = await self._sqs.handle_get_current_stock_price(code, caller=self.name)
                if not price_resp or price_resp.rt_cd != ErrorCode.SUCCESS.value:
                    self._logger.warning({
                        "event": "check_exits_failed",
                        "reason": "Failed to get current price for holding",
                        **log_data,
                    })
                    continue

                data = price_resp.data or {}
                current = self._as_int(data.get("price"))
                high_price = self._as_int(data.get("high"))
                log_data.update({"current_price": current, "day_high": high_price})

                if current <= 0 or high_price <= 0:
                    continue

                reason = ""
                should_sell = False

                # Compute the return and the drop from the day's high first.
                pnl_pct = ((current - buy_price) / buy_price) * 100
                drop_from_high = ((current - high_price) / high_price) * 100

                # Trailing stop: price fell from the day's high by at least
                # the configured percentage. Checked before the stop loss.
                if drop_from_high <= -self._cfg.trailing_stop_pct:
                    reason = f"익절(트레일링): 고가({high_price:,})대비 {drop_from_high:.1f}%"
                    should_sell = True
                elif pnl_pct <= self._cfg.stop_loss_pct:
                    reason = f"손절: 매수가대비 {pnl_pct:.1f}%"
                    should_sell = True

                # Time exit applies only when no price-based exit fired.
                if not should_sell and minutes_to_close <= 15:
                    reason = f"시간청산: 장마감 {minutes_to_close:.0f}분전"
                    should_sell = True

                if should_sell:
                    holding_qty = int(hold.get("qty", 1))
                    signals.append(TradeSignal(
                        code=code, name=stock_name, action="SELL", price=current, qty=holding_qty,
                        reason=reason, strategy_name=self.name,
                    ))
                    self._logger.info({
                        "event": "sell_signal_generated",
                        "strategy_name": self.name,
                        "code": code,
                        "name": stock_name,
                        "price": current,
                        "reason": reason,
                        "data": {**log_data, "pnl_pct": round(pnl_pct, 2), "drop_from_high_pct": round(drop_from_high, 2)},
                    })
                else:
                    self._logger.info({
                        "event": "hold_checked",
                        "code": code,
                        "reason": "No exit condition met",
                        "data": log_data,
                    })

            except Exception as e:
                # Best effort per holding: log and continue with the rest.
                self._logger.error({
                    "event": "check_exits_error",
                    "strategy_name": self.name,
                    "code": code,
                    "error": str(e),
                }, exc_info=True)

        self._logger.info({"event": "check_exits_finished", "signals_found": len(signals)})
        return signals