Coverage for strategies / traditional_volume_breakout_strategy.py: 98%
286 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
1# strategies/traditional_volume_breakout_strategy.py
2"""전통적 거래량 돌파매매 전략 (Traditional Volume Breakout).
4핵심: 20일 최고가를 거래량 150% 동반 돌파 시 매수, 트레일링 스탑/손절/가짜돌파/추세종료로 매도.
5"""
6from __future__ import annotations
8import asyncio
9import json
10import logging
11import os
12from dataclasses import dataclass, asdict
13from typing import Dict, List, Optional
15from interfaces.live_strategy import LiveStrategy
16from common.types import TradeSignal, ErrorCode
17from services.stock_query_service import StockQueryService
18from repositories.stock_code_repository import StockCodeRepository
19from core.market_clock import MarketClock
20from strategies.base_strategy_config import BaseStrategyConfig
21from core.logger import get_strategy_logger
@dataclass
class TraditionalVBConfig(BaseStrategyConfig):
    """Tunable parameters for the traditional volume-breakout strategy."""

    # --- Universe filter -------------------------------------------------
    # Minimum 5-day average traded value (KRW); the default is 10 billion.
    min_avg_trading_value_5d: int = 10_000_000_000
    # Moving-average window, in daily bars.
    ma_period: int = 20
    # Look-back window, in daily bars, for the highest high.
    high_period: int = 20
    # Maximum distance (%) of the latest close below the period high.
    near_high_pct: float = 3.0
    # Upper bound on the number of symbols kept in the watchlist.
    max_watchlist: int = 50

    # --- Entry -----------------------------------------------------------
    # Projected daily volume must reach this multiple of the average volume.
    volume_breakout_multiplier: float = 1.5

    # --- Exit ------------------------------------------------------------
    # Stop-loss threshold as a (negative) percent return on the entry price.
    stop_loss_pct: float = -3.0
    # Trailing-stop distance (%) below the post-entry peak price.
    trailing_stop_pct: float = 5.0

    # --- Position sizing -------------------------------------------------
    # Total portfolio size in KRW.
    total_portfolio_krw: int = 10_000_000
    # Share of the portfolio (%) committed to a single entry.
    position_size_pct: float = 5.0
    # Smallest order quantity ever returned by the sizing logic.
    min_qty: int = 1
@dataclass
class WatchlistItem:
    """Pre-computed daily statistics for one symbol on the watchlist."""

    code: str
    name: str
    # Highest high over the look-back window; used as the breakout line.
    high_20d: int
    # Moving average of closes over the look-back window.
    ma_20d: float
    # Average daily volume over the look-back window.
    avg_vol_20d: float
    # Average daily traded value (volume x close) over the last five bars.
    avg_trading_value_5d: float
@dataclass
class PositionState:
    """Per-position tracking data that is persisted between runs."""

    # Breakout line at entry time; a later price below it marks a false breakout.
    breakout_level: int
    # Highest price observed since entry; reference point for the trailing stop.
    peak_price: int
65class TraditionalVolumeBreakoutStrategy(LiveStrategy):
66 """전통적 거래량 돌파매매 전략.
68 scan():
69 1. 당일 첫 호출 시 워치리스트 빌드 (거래대금 상위 → 코스닥 필터 → 20일 OHLCV)
70 2. 워치리스트 종목의 현재가/거래량으로 돌파 조건 검사
71 3. 가격 돌파(20일 최고가) + 거래량 돌파(150%) AND 조건 충족 시 BUY
73 check_exits():
74 - 손절: 진입가 대비 -3%
75 - 가짜돌파: 현재가 < 돌파 시 20일 최고가
76 - 트레일링 스탑: 최고가 대비 -5%
77 - 추세종료: 현재가 < 20일 MA
78 """
80 STATE_FILE = os.path.join("data", "tvb_position_state.json")
82 def __init__(
83 self,
84 stock_query_service: StockQueryService,
85 stock_code_repository: StockCodeRepository,
86 market_clock: MarketClock,
87 config: Optional[TraditionalVBConfig] = None,
88 logger: Optional[logging.Logger] = None,
89 ):
90 self._sqs = stock_query_service
91 self.stock_code_repository = stock_code_repository
92 self._tm = market_clock
93 self._cfg = config or TraditionalVBConfig()
94 if logger:
95 self._logger = logger
96 else:
97 self._logger = get_strategy_logger("TraditionalVolumeBreakout")
99 # 내부 상태
100 self._watchlist: Dict[str, WatchlistItem] = {}
101 self._watchlist_date: str = "" # "YYYYMMDD" 형식
102 self._position_state: Dict[str, PositionState] = {}
104 # 파일에서 포지션 상태 복원
105 self._load_state()
107 @property
108 def name(self) -> str:
109 return "거래량돌파(전통)"
111 # ── 매수 스캔 ──
113 async def scan(self) -> List[TradeSignal]:
114 signals: List[TradeSignal] = []
115 self._logger.info({"event": "scan_started", "strategy_name": self.name})
117 # 1) 워치리스트 빌드 (당일 1회)
118 today = self._tm.get_current_kst_time().strftime("%Y%m%d")
119 if self._watchlist_date != today:
120 await self._build_watchlist()
121 self._watchlist_date = today
123 if not self._watchlist:
124 self._logger.info({"event": "scan_skipped", "reason": "Watchlist is empty"})
125 return signals
127 self._logger.info({"event": "scan_with_watchlist", "count": len(self._watchlist)})
129 # 2) 장중 경과 비율 (거래량 환산용)
130 market_progress = self._get_market_progress_ratio()
131 if market_progress <= 0:
132 return signals
134 # 3) 각 종목 돌파 조건 체크 (청크 기반 병렬 처리, TPS 제한 대응)
135 candidates = [
136 (code, item) for code, item in self._watchlist.items()
137 if code not in self._position_state
138 ]
139 for i in range(0, len(candidates), 10):
140 chunk = candidates[i:i + 10]
141 results = await asyncio.gather(
142 *[self._check_breakout_for_code(code, item, market_progress) for code, item in chunk],
143 return_exceptions=True,
144 )
145 for result in results:
146 if isinstance(result, Exception): 146 ↛ 147line 146 didn't jump to line 147 because the condition on line 146 was never true
147 self._logger.error({"event": "scan_error", "error": str(result)})
148 elif result:
149 signals.append(result)
151 self._logger.info({"event": "scan_finished", "signals_found": len(signals)})
152 return signals
154 async def _check_breakout_for_code(
155 self, code: str, item: WatchlistItem, market_progress: float
156 ) -> Optional[TradeSignal]:
157 """단일 종목의 가격·거래량 돌파 조건을 검사하고 매수 시그널을 반환한다."""
158 log_data = {"code": code, "name": item.name, "watchlist_item": asdict(item)}
159 try:
160 price_resp = await self._sqs.handle_get_current_stock_price(code, caller=self.name)
161 if not price_resp or price_resp.rt_cd != ErrorCode.SUCCESS.value:
162 return None
164 data = price_resp.data or {}
165 current = int(data.get("price", "0") or "0")
166 acml_vol = int(data.get("acml_vol", "0") or "0")
167 log_data.update({"current_price": current, "accumulated_volume": acml_vol})
169 if current <= 0:
170 return None
172 # 가격 돌파: 현재가 > 20일 최고가
173 if current <= item.high_20d:
174 return None
176 # 거래량 돌파: 예상 일 거래량 >= 20일 평균 × 1.5
177 projected_vol = acml_vol / market_progress if market_progress > 0 else acml_vol
178 vol_threshold = item.avg_vol_20d * self._cfg.volume_breakout_multiplier
179 log_data.update({"projected_volume": projected_vol, "volume_threshold": vol_threshold})
181 if projected_vol < vol_threshold:
182 log_data["reason"] = "Projected volume below threshold"
183 self._logger.info({"event": "candidate_rejected", **log_data})
184 return None
186 self._logger.info({"event": "breakout_detected", **log_data})
188 qty = self._calculate_qty(current)
189 self._position_state[code] = PositionState(
190 breakout_level=item.high_20d,
191 peak_price=current,
192 )
193 self._save_state()
195 vol_ratio = (projected_vol / item.avg_vol_20d * 100) if item.avg_vol_20d > 0 else 0.0
196 reason_msg = (
197 f"전통돌파(돌파 {current:,}>{item.high_20d:,}, "
198 f"예상거래 {vol_ratio:.0f}%(20일평균대비), "
199 f"5일평균대금 {item.avg_trading_value_5d / 100_000_000:,.0f}억)"
200 )
201 self._logger.info({
202 "event": "buy_signal_generated",
203 "code": code, "name": item.name, "price": current, "qty": qty,
204 "reason": reason_msg, "data": log_data,
205 })
206 return TradeSignal(
207 code=code, name=item.name, action="BUY", price=current, qty=qty,
208 reason=reason_msg, strategy_name=self.name,
209 )
210 except Exception as e:
211 self._logger.error({"event": "scan_error", "code": code, "error": str(e)}, exc_info=True)
212 return None
214 # ── 매도 체크 ──
216 async def check_exits(self, holdings: List[dict]) -> List[TradeSignal]:
217 signals: List[TradeSignal] = []
218 state_dirty = False
219 self._logger.info({"event": "check_exits_started", "holdings_count": len(holdings)})
221 for hold in holdings:
222 code = str(hold.get("code", ""))
223 buy_price = hold.get("buy_price", 0)
224 stock_name = hold.get("name", code)
225 log_data = {"code": code, "name": stock_name, "buy_price": buy_price}
227 if not code or not buy_price:
228 continue
230 try:
231 price_resp = await self._sqs.handle_get_current_stock_price(code, caller=self.name)
232 if not price_resp or price_resp.rt_cd != ErrorCode.SUCCESS.value:
233 continue
235 data = price_resp.data or {}
236 current = int(data.get("price", "0") or "0")
237 if current <= 0:
238 continue
240 log_data["current_price"] = current
242 # 포지션 상태 가져오기
243 state = self._position_state.get(code)
244 if not state:
245 self._logger.warning({"event": "missing_position_state", **log_data})
246 state = PositionState(breakout_level=buy_price, peak_price=buy_price)
247 self._position_state[code] = state
249 log_data["position_state"] = asdict(state)
251 # 최고가 갱신 (dirty flag — 루프 후 1회 저장)
252 if current > state.peak_price:
253 state.peak_price = current
254 state_dirty = True
255 self._logger.info({"event": "peak_price_updated", "code": code, "new_peak": current})
257 reason = ""
258 should_sell = False
259 pnl_pct = ((current - buy_price) / buy_price) * 100
260 log_data["pnl_pct"] = round(pnl_pct, 2)
262 # 1) 손절
263 if pnl_pct <= self._cfg.stop_loss_pct:
264 reason = f"손절: 매수가({buy_price:,}) 대비 {pnl_pct:.1f}%"
265 should_sell = True
267 # 2) 가짜돌파
268 if not should_sell and current < state.breakout_level:
269 reason = f"가짜돌파: 현재가({current:,}) < 돌파기준({state.breakout_level:,})"
270 should_sell = True
272 # 3) 트레일링 스탑
273 if not should_sell and state.peak_price > 0:
274 drop_from_peak = ((current - state.peak_price) / state.peak_price) * 100
275 log_data["drop_from_peak_pct"] = round(drop_from_peak, 2)
276 if drop_from_peak <= -self._cfg.trailing_stop_pct:
277 reason = (
278 f"트레일링스탑: 최고가({state.peak_price:,}) 대비 "
279 f"{drop_from_peak:.1f}%"
280 )
281 should_sell = True
283 # 4) 추세종료
284 if not should_sell:
285 ma_20d = await self._get_current_ma(code, self._cfg.ma_period)
286 if ma_20d: 286 ↛ 292line 286 didn't jump to line 292 because the condition on line 286 was always true
287 log_data["ma_20d"] = ma_20d
288 if current < ma_20d:
289 reason = f"추세종료: 현재가({current:,}) < 20일MA({ma_20d:,.0f})"
290 should_sell = True
292 if should_sell:
293 self._position_state.pop(code, None)
294 state_dirty = True
295 api_stock_name = data.get("name", "") or self.stock_code_repository.get_name_by_code(code) or code
296 holding_qty = int(hold.get("qty", 1))
297 signals.append(TradeSignal(
298 code=code, name=api_stock_name, action="SELL", price=current, qty=holding_qty,
299 reason=reason, strategy_name=self.name,
300 ))
301 self._logger.info({
302 "event": "sell_signal_generated",
303 "code": code, "name": api_stock_name, "price": current,
304 "reason": reason, "data": log_data,
305 })
306 else:
307 self._logger.info({"event": "hold_checked", "code": code, "reason": "No exit condition met", "data": log_data})
310 except Exception as e:
311 self._logger.error({
312 "event": "check_exits_error", "code": code, "error": str(e),
313 }, exc_info=True)
315 if state_dirty:
316 self._save_state()
317 self._logger.info({"event": "check_exits_finished", "signals_found": len(signals)})
318 return signals
320 # ── 워치리스트 빌드 ──
322 async def _build_watchlist(self):
323 """거래대금 상위 → 코스닥 필터 → 20일 OHLCV → 조건 필터."""
324 self._watchlist.clear()
325 self._logger.info({"event": "build_watchlist_started"})
327 # 1) 거래대금 상위 종목 조회
328 resp = await self._sqs.get_top_trading_value_stocks()
329 if not resp or resp.rt_cd != ErrorCode.SUCCESS.value:
330 self._logger.warning({"event": "build_watchlist_failed", "reason": "Failed to get top trading stocks"})
331 return
333 candidates = resp.data or []
334 self._logger.info({"event": "watchlist_candidates_fetched", "count": len(candidates)})
336 watchlist_items: List[WatchlistItem] = []
338 for stock in candidates:
339 code = stock.get("mksc_shrn_iscd") or stock.get("stck_shrn_iscd") or ""
340 if not code:
341 continue
343 stock_name = stock.get("hts_kor_isnm", "") or self.stock_code_repository.get_name_by_code(code) or code
345 try:
346 ohlcv_resp = await self._sqs.get_recent_daily_ohlcv(code, limit=self._cfg.high_period)
347 ohlcv = ohlcv_resp.data if ohlcv_resp and ohlcv_resp.rt_cd == ErrorCode.SUCCESS.value else []
348 if not ohlcv or len(ohlcv) < self._cfg.ma_period:
349 continue
351 item = self._analyze_ohlcv(code, stock_name, ohlcv)
352 if item:
353 watchlist_items.append(item)
355 except Exception as e:
356 self._logger.error({"event": "build_watchlist_error", "code": code, "error": str(e)}, exc_info=True)
358 self._watchlist = {
359 item.code: item for item in watchlist_items[:self._cfg.max_watchlist]
360 }
362 self._logger.info({
363 "event": "build_watchlist_finished",
364 "initial_candidates": len(candidates),
365 "final_watchlist_count": len(self._watchlist),
366 "watchlist_codes": list(self._watchlist.keys()),
367 })
369 def _analyze_ohlcv(self, code: str, name: str, ohlcv: List[dict]) -> Optional[WatchlistItem]:
370 """20일 OHLCV를 분석하여 조건 충족 시 WatchlistItem 반환."""
371 # ... (이하 로직은 로그 추가할 만한 부분이 적어 생략) ...
372 if not ohlcv:
373 return None
375 period = self._cfg.ma_period
376 closes = [row.get("close", 0) for row in ohlcv[-period:] if row.get("close")]
377 highs = [row.get("high", 0) for row in ohlcv[-period:] if row.get("high")]
378 volumes = [row.get("volume", 0) for row in ohlcv[-period:] if row.get("volume")]
380 if len(closes) < period or len(highs) < period or len(volumes) < period:
381 return None
383 ma_20d = sum(closes) / len(closes)
384 high_20d = int(max(highs))
385 avg_vol_20d = sum(volumes) / len(volumes)
387 recent_5 = ohlcv[-5:]
388 trading_values = [
389 (r.get("volume", 0) or 0) * (r.get("close", 0) or 0) for r in recent_5
390 ]
391 avg_trading_value_5d = sum(trading_values) / len(trading_values) if trading_values else 0
392 prev_close = closes[-1]
394 log_data = {
395 "code": code, "name": name,
396 "ma_20d": ma_20d, "high_20d": high_20d, "avg_vol_20d": avg_vol_20d,
397 "avg_trading_value_5d": avg_trading_value_5d, "prev_close": prev_close
398 }
400 if avg_trading_value_5d < self._cfg.min_avg_trading_value_5d:
401 self._logger.debug({"event": "ohlcv_filter_rejected", **log_data, "reason": "Avg trading value too low"})
402 return None
403 if prev_close <= ma_20d:
404 self._logger.debug({"event": "ohlcv_filter_rejected", **log_data, "reason": "Not in uptrend (close <= MA20)"})
405 return None
406 if high_20d > 0: 406 ↛ 412line 406 didn't jump to line 412 because the condition on line 406 was always true
407 distance_pct = ((high_20d - prev_close) / high_20d) * 100
408 if distance_pct > self._cfg.near_high_pct: 408 ↛ 409line 408 didn't jump to line 409 because the condition on line 408 was never true
409 self._logger.debug({"event": "ohlcv_filter_rejected", **log_data, "reason": f"Not near high ({distance_pct:.1f}% > {self._cfg.near_high_pct}%)"})
410 return None
412 self._logger.debug({"event": "ohlcv_filter_passed", **log_data})
413 return WatchlistItem(
414 code=code, name=name, high_20d=high_20d, ma_20d=ma_20d,
415 avg_vol_20d=avg_vol_20d, avg_trading_value_5d=avg_trading_value_5d,
416 )
418 # ── 상태 저장/복원 ──
420 def _load_state(self):
421 """파일에서 포지션 상태를 복원한다."""
422 if not os.path.exists(self.STATE_FILE):
423 return
424 try:
425 with open(self.STATE_FILE, "r", encoding="utf-8") as f:
426 data = json.load(f)
427 for code, state_dict in data.items():
428 self._position_state[code] = PositionState(**state_dict)
429 if self._position_state:
430 self._logger.info({
431 "event": "position_state_loaded",
432 "count": len(self._position_state),
433 "codes": list(self._position_state.keys()),
434 })
435 except (json.JSONDecodeError, IOError, KeyError, TypeError) as e:
436 self._logger.warning({"event": "load_state_failed", "error": str(e)})
438 def _save_state(self):
439 """포지션 상태를 파일에 저장한다."""
440 try:
441 os.makedirs(os.path.dirname(self.STATE_FILE), exist_ok=True)
442 data = {code: asdict(state) for code, state in self._position_state.items()}
443 with open(self.STATE_FILE, "w", encoding="utf-8") as f:
444 json.dump(data, f, ensure_ascii=False, indent=2)
445 except (IOError, OSError) as e:
446 self._logger.warning({"event": "save_state_failed", "error": str(e)})
    # ── Utilities ──
449 def _calculate_qty(self, price: int) -> int:
450 """포트폴리오 비중 기반 주문 수량 계산."""
451 # [테스트용] 고정 수량 모드일 경우 무조건 1주 반환
452 if self._cfg.use_fixed_qty:
453 return 1
455 if price <= 0:
456 return self._cfg.min_qty
457 budget = self._cfg.total_portfolio_krw * (self._cfg.position_size_pct / 100)
458 qty = int(budget / price)
459 return max(qty, self._cfg.min_qty)
461 def _get_market_progress_ratio(self) -> float:
462 """장 시작 이후 경과 비율 (0.0 ~ 1.0). 거래량 환산용."""
463 now = self._tm.get_current_kst_time()
464 open_time = self._tm.get_market_open_time()
465 close_time = self._tm.get_market_close_time()
467 total_seconds = (close_time - open_time).total_seconds()
468 elapsed_seconds = (now - open_time).total_seconds()
470 if total_seconds <= 0 or elapsed_seconds <= 0:
471 return 0.0
472 return min(elapsed_seconds / total_seconds, 1.0)
474 async def _get_current_ma(self, code: str, period: int) -> Optional[float]:
475 """종목의 현재 N일 이동평균을 계산."""
476 try:
477 ohlcv_resp = await self._sqs.get_recent_daily_ohlcv(code, limit=period)
478 ohlcv = ohlcv_resp.data if ohlcv_resp and ohlcv_resp.rt_cd == ErrorCode.SUCCESS.value else []
479 if not ohlcv or len(ohlcv) < period:
480 return None
481 closes = [row.get("close", 0) for row in ohlcv[-period:] if row.get("close")]
482 if len(closes) < period:
483 return None
484 return sum(closes) / len(closes)
485 except Exception:
486 return None