Coverage for strategies / traditional_volume_breakout_strategy.py: 98%

286 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-04 15:08 +0000

1# strategies/traditional_volume_breakout_strategy.py 

2"""전통적 거래량 돌파매매 전략 (Traditional Volume Breakout). 

3 

4핵심: 20일 최고가를 거래량 150% 동반 돌파 시 매수, 트레일링 스탑/손절/가짜돌파/추세종료로 매도. 

5""" 

6from __future__ import annotations 

7 

8import asyncio 

9import json 

10import logging 

11import os 

12from dataclasses import dataclass, asdict 

13from typing import Dict, List, Optional 

14 

15from interfaces.live_strategy import LiveStrategy 

16from common.types import TradeSignal, ErrorCode 

17from services.stock_query_service import StockQueryService 

18from repositories.stock_code_repository import StockCodeRepository 

19from core.market_clock import MarketClock 

20from strategies.base_strategy_config import BaseStrategyConfig 

21from core.logger import get_strategy_logger 

22 

23 

@dataclass
class TraditionalVBConfig(BaseStrategyConfig):
    """Tunable parameters for the traditional volume-breakout strategy."""

    # ── Universe filter ──
    # Minimum 5-day average trading value (default: 10 billion KRW).
    min_avg_trading_value_5d: int = 10_000_000_000
    # Look-back window, in daily bars, for the moving average.
    ma_period: int = 20
    # Look-back window, in daily bars, for the highest high.
    high_period: int = 20
    # Maximum allowed distance (%) between the last close and the period high.
    near_high_pct: float = 3.0
    # Upper bound on the number of codes kept in the watchlist.
    max_watchlist: int = 50

    # ── Entry condition ──
    # Required multiple of the average daily volume (1.5 == 150%).
    volume_breakout_multiplier: float = 1.5

    # ── Exit conditions ──
    # Stop-loss threshold relative to the entry price, in percent (negative).
    stop_loss_pct: float = -3.0
    # Trailing-stop distance below the post-entry peak, in percent (positive).
    trailing_stop_pct: float = 5.0

    # ── Money management ──
    # Total portfolio size in KRW.
    total_portfolio_krw: int = 10_000_000
    # Share of the portfolio committed per entry, in percent (intended max: 5%).
    position_size_pct: float = 5.0
    # Minimum order quantity.
    min_qty: int = 1

45 

46 

@dataclass
class WatchlistItem:
    """Per-stock snapshot stored in the daily watchlist."""

    # Stock code.
    code: str
    # Display name of the stock.
    name: str
    # Highest high of the look-back window; serves as the breakout line.
    high_20d: int
    # Moving average of closes over the look-back window.
    ma_20d: float
    # Average daily volume over the look-back window.
    avg_vol_20d: float
    # Average of (volume * close) over the five most recent daily bars.
    avg_trading_value_5d: float

56 

57 

@dataclass
class PositionState:
    """Mutable tracking data kept for each open position."""

    # Period high captured at entry; used to detect a failed (fake) breakout.
    breakout_level: int
    # Highest price observed since entry; used by the trailing stop.
    peak_price: int

63 

64 

class TraditionalVolumeBreakoutStrategy(LiveStrategy):
    """Traditional volume-breakout live strategy.

    scan():
        1. On the first call of each day (KST date), rebuild the watchlist
           from the top-trading-value ranking and each candidate's recent
           daily OHLCV.
        2. Fetch the current price / accumulated volume of every watchlist
           code that has no tracked position.
        3. Emit BUY when the price is above the period high AND the projected
           full-day volume reaches ``volume_breakout_multiplier`` times the
           average daily volume.

    check_exits():
        Rules are evaluated in this order; the first match produces a SELL.
        - Stop loss: P&L versus buy price <= ``stop_loss_pct``.
        - Failed breakout: price < breakout level recorded at entry.
        - Trailing stop: drop from the post-entry peak >= ``trailing_stop_pct``.
        - Trend end: price < current ``ma_period``-day moving average.
    """

    STATE_FILE = os.path.join("data", "tvb_position_state.json")
    # Number of price requests issued concurrently per batch in scan().
    SCAN_CHUNK_SIZE = 10

    def __init__(
        self,
        stock_query_service: StockQueryService,
        stock_code_repository: StockCodeRepository,
        market_clock: MarketClock,
        config: Optional[TraditionalVBConfig] = None,
        logger: Optional[logging.Logger] = None,
    ):
        """Wire collaborators and restore persisted position state.

        Args:
            stock_query_service: Source of prices, rankings and daily OHLCV.
            stock_code_repository: Used as a fallback for stock-name lookup.
            market_clock: Supplies the current KST time and session bounds.
            config: Strategy parameters; defaults to ``TraditionalVBConfig()``.
            logger: Logger to use; defaults to the strategy logger named
                "TraditionalVolumeBreakout".
        """
        self._sqs = stock_query_service
        self.stock_code_repository = stock_code_repository
        self._tm = market_clock
        self._cfg = config or TraditionalVBConfig()
        if logger:
            self._logger = logger
        else:
            self._logger = get_strategy_logger("TraditionalVolumeBreakout")

        # Internal state
        self._watchlist: Dict[str, WatchlistItem] = {}
        self._watchlist_date: str = ""  # "YYYYMMDD"
        self._position_state: Dict[str, PositionState] = {}

        # Restore position state from STATE_FILE, if present.
        self._load_state()

    @property
    def name(self) -> str:
        """Display name of the strategy (also passed as ``caller`` to the query service)."""
        return "거래량돌파(전통)"

    # ── Entry scan ──

    async def scan(self) -> List[TradeSignal]:
        """Check the watchlist for breakouts and return BUY signals.

        Returns:
            BUY signals found in this pass; empty when the watchlist is empty
            or the market-progress ratio is not positive.
        """
        signals: List[TradeSignal] = []
        self._logger.info({"event": "scan_started", "strategy_name": self.name})

        # 1) Build the watchlist once per day.
        today = self._tm.get_current_kst_time().strftime("%Y%m%d")
        if self._watchlist_date != today:
            built = await self._build_watchlist()
            # Stamp the date only when the build did not report failure, so a
            # failed ranking request is retried on the next scan instead of
            # leaving the strategy idle for the rest of the day.
            # ``is not False`` keeps overrides that return None working.
            if built is not False:
                self._watchlist_date = today

        if not self._watchlist:
            self._logger.info({"event": "scan_skipped", "reason": "Watchlist is empty"})
            return signals

        self._logger.info({"event": "scan_with_watchlist", "count": len(self._watchlist)})

        # 2) Elapsed fraction of the session, used to project full-day volume.
        market_progress = self._get_market_progress_ratio()
        if market_progress <= 0:
            return signals

        # 3) Check each untracked code, in concurrent batches of
        #    SCAN_CHUNK_SIZE requests (batching is meant to respect the
        #    upstream request-rate limit).
        candidates = [
            (code, item) for code, item in self._watchlist.items()
            if code not in self._position_state
        ]
        chunk_size = self.SCAN_CHUNK_SIZE
        for start in range(0, len(candidates), chunk_size):
            chunk = candidates[start:start + chunk_size]
            results = await asyncio.gather(
                *[self._check_breakout_for_code(code, item, market_progress) for code, item in chunk],
                return_exceptions=True,
            )
            for result in results:
                if isinstance(result, Exception):
                    self._logger.error({"event": "scan_error", "error": str(result)})
                elif result:
                    signals.append(result)

        self._logger.info({"event": "scan_finished", "signals_found": len(signals)})
        return signals

    async def _check_breakout_for_code(
        self, code: str, item: WatchlistItem, market_progress: float
    ) -> Optional[TradeSignal]:
        """Evaluate the price and volume breakout conditions for one code.

        Args:
            code: Stock code to check.
            item: Watchlist snapshot holding the breakout line and averages.
            market_progress: Elapsed fraction of the session (0 < x <= 1).

        Returns:
            A BUY signal when both conditions hold, otherwise None. Any
            exception is logged and converted to None.
        """
        log_data = {"code": code, "name": item.name, "watchlist_item": asdict(item)}
        try:
            price_resp = await self._sqs.handle_get_current_stock_price(code, caller=self.name)
            if not price_resp or price_resp.rt_cd != ErrorCode.SUCCESS.value:
                return None

            data = price_resp.data or {}
            current = int(data.get("price", "0") or "0")
            acml_vol = int(data.get("acml_vol", "0") or "0")
            log_data.update({"current_price": current, "accumulated_volume": acml_vol})

            if current <= 0:
                return None

            # Price breakout: current price must be strictly above the period high.
            if current <= item.high_20d:
                return None

            # Volume breakout: accumulated volume scaled to a full session must
            # reach the average daily volume times the configured multiplier.
            projected_vol = acml_vol / market_progress if market_progress > 0 else acml_vol
            vol_threshold = item.avg_vol_20d * self._cfg.volume_breakout_multiplier
            log_data.update({"projected_volume": projected_vol, "volume_threshold": vol_threshold})

            if projected_vol < vol_threshold:
                log_data["reason"] = "Projected volume below threshold"
                self._logger.info({"event": "candidate_rejected", **log_data})
                return None

            self._logger.info({"event": "breakout_detected", **log_data})

            qty = self._calculate_qty(current)
            # State is registered at signal time; scan() skips codes that are
            # present in _position_state.
            # NOTE(review): entries are removed only by check_exits(); a signal
            # that never becomes a holding would keep its code excluded from
            # scan() — confirm the caller accounts for this.
            self._position_state[code] = PositionState(
                breakout_level=item.high_20d,
                peak_price=current,
            )
            self._save_state()

            vol_ratio = (projected_vol / item.avg_vol_20d * 100) if item.avg_vol_20d > 0 else 0.0
            reason_msg = (
                f"전통돌파(돌파 {current:,}>{item.high_20d:,}, "
                f"예상거래 {vol_ratio:.0f}%(20일평균대비), "
                f"5일평균대금 {item.avg_trading_value_5d / 100_000_000:,.0f}억)"
            )
            self._logger.info({
                "event": "buy_signal_generated",
                "code": code, "name": item.name, "price": current, "qty": qty,
                "reason": reason_msg, "data": log_data,
            })
            return TradeSignal(
                code=code, name=item.name, action="BUY", price=current, qty=qty,
                reason=reason_msg, strategy_name=self.name,
            )
        except Exception as e:
            self._logger.error({"event": "scan_error", "code": code, "error": str(e)}, exc_info=True)
            return None

    # ── Exit check ──

    async def check_exits(self, holdings: List[dict]) -> List[TradeSignal]:
        """Evaluate exit rules for each holding and return SELL signals.

        Args:
            holdings: Dicts read via the keys "code", "buy_price", "name"
                and "qty". Entries without a code or buy price are skipped.

        Returns:
            SELL signals for holdings that hit an exit rule. An error on one
            holding is logged and does not stop the others.
        """
        signals: List[TradeSignal] = []
        state_dirty = False
        self._logger.info({"event": "check_exits_started", "holdings_count": len(holdings)})

        for hold in holdings:
            code = str(hold.get("code", ""))
            buy_price = hold.get("buy_price", 0)
            stock_name = hold.get("name", code)
            log_data = {"code": code, "name": stock_name, "buy_price": buy_price}

            if not code or not buy_price:
                continue

            try:
                price_resp = await self._sqs.handle_get_current_stock_price(code, caller=self.name)
                if not price_resp or price_resp.rt_cd != ErrorCode.SUCCESS.value:
                    continue

                data = price_resp.data or {}
                current = int(data.get("price", "0") or "0")
                if current <= 0:
                    continue

                log_data["current_price"] = current

                # Fetch tracking state; fall back to the buy price when the
                # holding has no recorded state.
                state = self._position_state.get(code)
                if not state:
                    self._logger.warning({"event": "missing_position_state", **log_data})
                    state = PositionState(breakout_level=buy_price, peak_price=buy_price)
                    self._position_state[code] = state

                log_data["position_state"] = asdict(state)

                # Raise the peak; persistence is deferred to one save after the loop.
                if current > state.peak_price:
                    state.peak_price = current
                    state_dirty = True
                    self._logger.info({"event": "peak_price_updated", "code": code, "new_peak": current})

                reason = ""
                should_sell = False
                pnl_pct = ((current - buy_price) / buy_price) * 100
                log_data["pnl_pct"] = round(pnl_pct, 2)

                # 1) Stop loss
                if pnl_pct <= self._cfg.stop_loss_pct:
                    reason = f"손절: 매수가({buy_price:,}) 대비 {pnl_pct:.1f}%"
                    should_sell = True

                # 2) Failed breakout
                if not should_sell and current < state.breakout_level:
                    reason = f"가짜돌파: 현재가({current:,}) < 돌파기준({state.breakout_level:,})"
                    should_sell = True

                # 3) Trailing stop
                if not should_sell and state.peak_price > 0:
                    drop_from_peak = ((current - state.peak_price) / state.peak_price) * 100
                    log_data["drop_from_peak_pct"] = round(drop_from_peak, 2)
                    if drop_from_peak <= -self._cfg.trailing_stop_pct:
                        reason = (
                            f"트레일링스탑: 최고가({state.peak_price:,}) 대비 "
                            f"{drop_from_peak:.1f}%"
                        )
                        should_sell = True

                # 4) Trend end (only costs an OHLCV request when nothing else fired)
                if not should_sell:
                    ma_20d = await self._get_current_ma(code, self._cfg.ma_period)
                    if ma_20d:
                        log_data["ma_20d"] = ma_20d
                        if current < ma_20d:
                            reason = f"추세종료: 현재가({current:,}) < 20일MA({ma_20d:,.0f})"
                            should_sell = True

                if should_sell:
                    # Build the signal BEFORE dropping the tracked state: if the
                    # name lookup or qty conversion raises, the state must
                    # survive so the exit is re-evaluated on the next call.
                    api_stock_name = data.get("name", "") or self.stock_code_repository.get_name_by_code(code) or code
                    holding_qty = int(hold.get("qty", 1))
                    signal = TradeSignal(
                        code=code, name=api_stock_name, action="SELL", price=current, qty=holding_qty,
                        reason=reason, strategy_name=self.name,
                    )
                    self._position_state.pop(code, None)
                    state_dirty = True
                    signals.append(signal)
                    self._logger.info({
                        "event": "sell_signal_generated",
                        "code": code, "name": api_stock_name, "price": current,
                        "reason": reason, "data": log_data,
                    })
                else:
                    self._logger.info({"event": "hold_checked", "code": code, "reason": "No exit condition met", "data": log_data})

            except Exception as e:
                self._logger.error({
                    "event": "check_exits_error", "code": code, "error": str(e),
                }, exc_info=True)

        if state_dirty:
            self._save_state()
        self._logger.info({"event": "check_exits_finished", "signals_found": len(signals)})
        return signals

    # ── Watchlist build ──

    async def _build_watchlist(self) -> bool:
        """Rebuild the watchlist from the top-trading-value ranking.

        Each ranked stock's recent daily OHLCV is fetched and passed through
        ``_analyze_ohlcv``; the first ``max_watchlist`` passing items, in
        ranking order, become the new watchlist.

        Returns:
            False when the ranking request failed (the watchlist is left
            empty); True otherwise, including when no candidate passed.
        """
        self._watchlist.clear()
        self._logger.info({"event": "build_watchlist_started"})

        # 1) Fetch the top-trading-value ranking.
        resp = await self._sqs.get_top_trading_value_stocks()
        if not resp or resp.rt_cd != ErrorCode.SUCCESS.value:
            self._logger.warning({"event": "build_watchlist_failed", "reason": "Failed to get top trading stocks"})
            return False

        candidates = resp.data or []
        self._logger.info({"event": "watchlist_candidates_fetched", "count": len(candidates)})

        # Fetch enough bars for BOTH windows; using only high_period would
        # reject every candidate whenever ma_period is larger.
        lookback = max(self._cfg.ma_period, self._cfg.high_period)
        watchlist_items: List[WatchlistItem] = []

        for stock in candidates:
            code = stock.get("mksc_shrn_iscd") or stock.get("stck_shrn_iscd") or ""
            if not code:
                continue

            stock_name = stock.get("hts_kor_isnm", "") or self.stock_code_repository.get_name_by_code(code) or code

            try:
                ohlcv_resp = await self._sqs.get_recent_daily_ohlcv(code, limit=lookback)
                ohlcv = ohlcv_resp.data if ohlcv_resp and ohlcv_resp.rt_cd == ErrorCode.SUCCESS.value else []
                if not ohlcv or len(ohlcv) < lookback:
                    continue

                item = self._analyze_ohlcv(code, stock_name, ohlcv)
                if item:
                    watchlist_items.append(item)

            except Exception as e:
                self._logger.error({"event": "build_watchlist_error", "code": code, "error": str(e)}, exc_info=True)

        self._watchlist = {
            item.code: item for item in watchlist_items[:self._cfg.max_watchlist]
        }

        self._logger.info({
            "event": "build_watchlist_finished",
            "initial_candidates": len(candidates),
            "final_watchlist_count": len(self._watchlist),
            "watchlist_codes": list(self._watchlist.keys()),
        })
        return True

    def _analyze_ohlcv(self, code: str, name: str, ohlcv: List[dict]) -> Optional[WatchlistItem]:
        """Turn daily OHLCV rows into a WatchlistItem if the stock qualifies.

        Rows are read via the keys "close", "high" and "volume"; the last
        row is treated as the most recent bar.

        A stock qualifies when:
        - the 5-bar average trading value >= ``min_avg_trading_value_5d``,
        - the latest close is above the ``ma_period`` moving average, and
        - the latest close is within ``near_high_pct`` percent of the
          ``high_period`` high.

        Returns:
            The populated WatchlistItem, or None when data is insufficient
            or any filter rejects the stock.
        """
        if not ohlcv:
            return None

        ma_period = self._cfg.ma_period
        high_period = self._cfg.high_period
        ma_window = ohlcv[-ma_period:]
        # Rows with a missing/zero value are dropped; the length checks below
        # then reject the stock instead of averaging over incomplete data.
        closes = [row.get("close", 0) for row in ma_window if row.get("close")]
        highs = [row.get("high", 0) for row in ohlcv[-high_period:] if row.get("high")]
        volumes = [row.get("volume", 0) for row in ma_window if row.get("volume")]

        if len(closes) < ma_period or len(highs) < high_period or len(volumes) < ma_period:
            return None

        ma_20d = sum(closes) / len(closes)
        high_20d = int(max(highs))
        avg_vol_20d = sum(volumes) / len(volumes)

        recent_5 = ohlcv[-5:]
        trading_values = [
            (r.get("volume", 0) or 0) * (r.get("close", 0) or 0) for r in recent_5
        ]
        avg_trading_value_5d = sum(trading_values) / len(trading_values) if trading_values else 0
        prev_close = closes[-1]

        log_data = {
            "code": code, "name": name,
            "ma_20d": ma_20d, "high_20d": high_20d, "avg_vol_20d": avg_vol_20d,
            "avg_trading_value_5d": avg_trading_value_5d, "prev_close": prev_close
        }

        if avg_trading_value_5d < self._cfg.min_avg_trading_value_5d:
            self._logger.debug({"event": "ohlcv_filter_rejected", **log_data, "reason": "Avg trading value too low"})
            return None
        if prev_close <= ma_20d:
            self._logger.debug({"event": "ohlcv_filter_rejected", **log_data, "reason": "Not in uptrend (close <= MA20)"})
            return None
        if high_20d > 0:
            distance_pct = ((high_20d - prev_close) / high_20d) * 100
            if distance_pct > self._cfg.near_high_pct:
                self._logger.debug({"event": "ohlcv_filter_rejected", **log_data, "reason": f"Not near high ({distance_pct:.1f}% > {self._cfg.near_high_pct}%)"})
                return None

        self._logger.debug({"event": "ohlcv_filter_passed", **log_data})
        return WatchlistItem(
            code=code, name=name, high_20d=high_20d, ma_20d=ma_20d,
            avg_vol_20d=avg_vol_20d, avg_trading_value_5d=avg_trading_value_5d,
        )

    # ── State persistence ──

    def _load_state(self):
        """Restore position state from STATE_FILE (best effort).

        An unreadable, undecodable or non-object file is logged and ignored.
        A malformed entry is logged and skipped without discarding the valid
        entries around it.
        """
        if not os.path.exists(self.STATE_FILE):
            return
        try:
            with open(self.STATE_FILE, "r", encoding="utf-8") as f:
                data = json.load(f)
        except (OSError, ValueError) as e:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            self._logger.warning({"event": "load_state_failed", "error": str(e)})
            return

        if not isinstance(data, dict):
            self._logger.warning({
                "event": "load_state_failed",
                "error": f"Expected a JSON object, got {type(data).__name__}",
            })
            return

        for code, state_dict in data.items():
            try:
                self._position_state[code] = PositionState(**state_dict)
            except TypeError as e:
                # Raised for a non-mapping entry or missing/unexpected fields.
                self._logger.warning({"event": "load_state_failed", "code": code, "error": str(e)})

        if self._position_state:
            self._logger.info({
                "event": "position_state_loaded",
                "count": len(self._position_state),
                "codes": list(self._position_state.keys()),
            })

    def _save_state(self):
        """Persist position state to STATE_FILE (best effort).

        The JSON is written to a sibling temp file and then moved over
        STATE_FILE with ``os.replace``, so an interrupted write cannot leave
        a truncated state file. Failures are logged, never raised.
        """
        tmp_path = f"{self.STATE_FILE}.tmp"
        try:
            state_dir = os.path.dirname(self.STATE_FILE)
            if state_dir:
                os.makedirs(state_dir, exist_ok=True)
            payload = {code: asdict(state) for code, state in self._position_state.items()}
            with open(tmp_path, "w", encoding="utf-8") as f:
                json.dump(payload, f, ensure_ascii=False, indent=2)
            os.replace(tmp_path, self.STATE_FILE)
        except (OSError, TypeError, ValueError) as e:
            # TypeError/ValueError: a state value json cannot serialize.
            self._logger.warning({"event": "save_state_failed", "error": str(e)})

    # ── Utilities ──

    def _calculate_qty(self, price: int) -> int:
        """Return the order quantity for one entry at ``price``.

        Fixed-quantity mode (``use_fixed_qty``) always yields 1. Otherwise
        the budget is ``total_portfolio_krw * position_size_pct / 100``
        divided by the price, floored, and never below ``min_qty``.
        """
        # Fixed-quantity mode (marked as test-only in the original): always 1 share.
        if self._cfg.use_fixed_qty:
            return 1

        if price <= 0:
            return self._cfg.min_qty
        budget = self._cfg.total_portfolio_krw * (self._cfg.position_size_pct / 100)
        qty = int(budget / price)
        return max(qty, self._cfg.min_qty)

    def _get_market_progress_ratio(self) -> float:
        """Return the elapsed fraction of the session, clamped to [0.0, 1.0].

        Returns 0.0 before the open or when the session length is not positive.
        """
        now = self._tm.get_current_kst_time()
        open_time = self._tm.get_market_open_time()
        close_time = self._tm.get_market_close_time()

        total_seconds = (close_time - open_time).total_seconds()
        elapsed_seconds = (now - open_time).total_seconds()

        if total_seconds <= 0 or elapsed_seconds <= 0:
            return 0.0
        return min(elapsed_seconds / total_seconds, 1.0)

    async def _get_current_ma(self, code: str, period: int) -> Optional[float]:
        """Return the ``period``-bar moving average of closes for ``code``.

        Returns:
            The average, or None when fewer than ``period`` usable closes are
            available or the request fails (the failure is logged).
        """
        try:
            ohlcv_resp = await self._sqs.get_recent_daily_ohlcv(code, limit=period)
            ohlcv = ohlcv_resp.data if ohlcv_resp and ohlcv_resp.rt_cd == ErrorCode.SUCCESS.value else []
            if not ohlcv or len(ohlcv) < period:
                return None
            closes = [row.get("close", 0) for row in ohlcv[-period:] if row.get("close")]
            if len(closes) < period:
                return None
            return sum(closes) / len(closes)
        except Exception as e:
            # Best effort: the trend-end rule is skipped for this pass, but the
            # failure is recorded rather than silently dropped.
            self._logger.warning(
                {"event": "get_current_ma_failed", "code": code, "error": str(e)},
                exc_info=True,
            )
            return None