Coverage for strategies / high_tight_flag_strategy.py: 93%
237 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
1# strategies/high_tight_flag_strategy.py
2from __future__ import annotations
4import asyncio
5import logging
6import os
7import json
8from dataclasses import asdict
9from typing import List, Optional, Dict
11from interfaces.live_strategy import LiveStrategy
12from common.types import TradeSignal
13from services.stock_query_service import StockQueryService
14from core.market_clock import MarketClock
15from strategies.oneil_common_types import HTFConfig, HTFPositionState
16from services.oneil_universe_service import OneilUniverseService
17from core.logger import get_strategy_logger
class HighTightFlagStrategy(LiveStrategy):
    """High Tight Flag (HTF) breakout strategy.

    Core idea: buy a stock that surged 90%+ within 40 trading days (the
    "pole") and then drifted sideways within 20% of its high (the "flag"),
    at the moment it breaks out to a new high accompanied by volume and
    execution strength.

    Phase 1 (pole):     min -> max surge of 90%+ confirmed
    Phase 2 (flag):     <= 20% pullback from the high + volume dry-up
    Phase 3 (breakout): current price > 40-day high + projected volume 200%+
                        + execution strength 120%+
    Phase 4 (exit):     hard stop-loss at -5% / 10-day MA trailing stop

    The figures above describe the intended setup; the thresholds actually
    applied at runtime are read from the ``HTFConfig`` instance.
    """

    # File in which open-position state is persisted between runs.
    STATE_FILE = os.path.join("data", "htf_position_state.json")

    # Daily bars requested per candidate (40-day pole + up to 25-day flag).
    _OHLCV_FETCH_DAYS = 65
    # Number of candidates evaluated concurrently per gather() call.
    _SCAN_BATCH_SIZE = 10
    # Window (in bars) used for the average-volume baseline.
    _AVG_VOLUME_WINDOW = 50
    # Minimum number of non-zero volume bars required to trust the baseline.
    _MIN_VOLUME_SAMPLES = 20
    # Floor applied to the session progress ratio before projecting volume,
    # so that a tiny elapsed fraction does not blow up the projection.
    _MIN_PROGRESS_RATIO = 0.05

    def __init__(
        self,
        stock_query_service: StockQueryService,
        universe_service: OneilUniverseService,
        market_clock: MarketClock,
        config: Optional[HTFConfig] = None,
        logger: Optional[logging.Logger] = None,
    ):
        """Wire up collaborators and restore any persisted position state.

        Args:
            stock_query_service: Source of daily OHLCV, current price and
                conclusion (execution strength) data.
            universe_service: Provides the watchlist and market-timing checks.
            market_clock: Provides current KST time and market open/close times.
            config: Strategy thresholds; a default ``HTFConfig()`` is used
                when omitted.
            logger: Logger to use; a strategy logger is created when omitted.
        """
        self._sqs = stock_query_service
        self._universe = universe_service
        self._tm = market_clock
        self._cfg = config or HTFConfig()
        self._logger = logger or get_strategy_logger("HighTightFlag", sub_dir="oneil")

        # code -> state of positions this strategy believes it has opened.
        self._position_state: Dict[str, HTFPositionState] = {}
        self._load_state()

    @property
    def name(self) -> str:
        """Display name of the strategy (also passed as ``caller`` to queries)."""
        return "하이타이트플래그"

    # ── scan ──────────────────────────────────────────────────────────

    async def scan(self) -> List[TradeSignal]:
        """Scan the watchlist and return BUY signals for confirmed breakouts.

        Returns an empty list when the watchlist is empty, the market
        progress ratio is not positive, or market timing is bad for both
        KOSPI and KOSDAQ. Codes that already have position state are skipped.
        Candidates are evaluated in batches; a failure on one candidate is
        logged and does not abort the rest of the scan.
        """
        signals: List[TradeSignal] = []
        self._logger.info({"event": "scan_started", "strategy_name": self.name})

        watchlist = await self._universe.get_watchlist(logger=self._logger)
        if not watchlist:
            self._logger.info({"event": "scan_skipped", "reason": "Watchlist is empty"})
            return signals

        self._logger.info({"event": "scan_with_watchlist", "count": len(watchlist)})

        market_progress = self._get_market_progress_ratio()
        if market_progress <= 0:
            self._logger.info({"event": "scan_skipped", "reason": "Market not open or just started"})
            return signals

        # Pre-check market timing once per market instead of once per stock.
        market_timing = {
            "KOSPI": await self._universe.is_market_timing_ok("KOSPI", logger=self._logger),
            "KOSDAQ": await self._universe.is_market_timing_ok("KOSDAQ", logger=self._logger),
        }
        if not any(market_timing.values()):
            self._logger.info({"event": "scan_skipped", "reason": "Bad market timing for both markets"})
            return signals

        candidates = [
            (code, item) for code, item in watchlist.items()
            if code not in self._position_state
            and market_timing.get(item.market, False)
        ]
        batch_size = self._SCAN_BATCH_SIZE
        for start in range(0, len(candidates), batch_size):
            chunk = candidates[start:start + batch_size]
            results = await asyncio.gather(
                *[self._check_htf_setup(code, item, market_progress) for code, item in chunk],
                return_exceptions=True,
            )
            for result in results:
                # gather(return_exceptions=True) may hand back BaseException
                # subclasses such as CancelledError; checking only for
                # Exception would let those truthy objects through as signals.
                if isinstance(result, BaseException):
                    self._logger.error(f"Scan error: {result}")
                elif result:
                    signals.append(result)

        self._logger.info({"event": "scan_finished", "signals_found": len(signals)})
        return signals

    async def _check_htf_setup(self, code, item, progress) -> Optional[TradeSignal]:
        """Detect the HTF pattern for one stock and confirm a live breakout.

        Args:
            code: Stock code.
            item: Watchlist entry; ``item.name`` is used for logging/signals.
            progress: Fraction of the trading session elapsed.

        Returns:
            A BUY ``TradeSignal`` if every gate passes, otherwise ``None``.
        """
        # 1. Fetch daily OHLCV (40-day pole + up to 25-day flag = 65 bars).
        ohlcv_resp = await self._sqs.get_recent_daily_ohlcv(code, limit=self._OHLCV_FETCH_DAYS)
        ohlcv = ohlcv_resp.data if ohlcv_resp and ohlcv_resp.rt_cd == "0" else []
        if not ohlcv or len(ohlcv) < self._cfg.pole_lookback_days:
            return None

        # 2. Phase 1+2: pole/flag pattern detection (pure computation).
        pattern = self._detect_pole_and_flag(ohlcv)
        if not pattern:
            return None

        self._logger.info({
            "event": "htf_pattern_detected",
            "code": code, "name": item.name,
            "surge_ratio": round(pattern["surge_ratio"], 2),
            "flag_days": pattern["flag_days"],
            "drawdown_pct": round(pattern["drawdown_pct"], 1),
        })

        # 3. Phase 3: live breakout confirmation.
        return await self._check_breakout(code, item, pattern, ohlcv, progress)

    def _detect_pole_and_flag(self, ohlcv: list) -> Optional[dict]:
        """Phase 1+2: detect pole surge + flag consolidation (no API calls).

        Args:
            ohlcv: Daily bars as dicts with ``high``, ``low``, ``close`` and
                ``volume`` keys (missing keys are read as 0). Presumably
                ordered oldest -> newest, since bars after the peak index are
                treated as the flag — confirm against the data source.

        Returns:
            dict with ``pole_high``, ``pole_low``, ``surge_ratio``,
            ``flag_days``, ``drawdown_pct``, ``avg_vol_50d`` and
            ``flag_avg_vol``; or ``None`` when the pattern is not present.
        """
        if not ohlcv:
            return None

        highs = [r.get("high", 0) for r in ohlcv]
        lows = [r.get("low", 0) for r in ohlcv]
        volumes = [r.get("volume", 0) for r in ohlcv]
        n = len(ohlcv)

        # Highest high over the whole window (first occurrence on ties).
        peak_high = max(highs)
        peak_idx = highs.index(peak_high)

        # Phase 1: pole — up to pole_lookback_days bars ending at the peak.
        pole_start = max(0, peak_idx - self._cfg.pole_lookback_days + 1)
        pole_lows = lows[pole_start:peak_idx + 1]
        if not pole_lows:
            return None

        pole_low = min(pole_lows)
        if pole_low <= 0:
            return None

        surge_ratio = peak_high / pole_low
        if surge_ratio < self._cfg.pole_min_surge_ratio:
            return None

        # Phase 2: flag — the bars after the peak.
        flag_days = n - peak_idx - 1
        if flag_days < self._cfg.flag_min_days:
            return None
        if flag_days > self._cfg.flag_max_days:
            return None

        # Flag drawdown is measured on closes — intraday wicks are tolerated.
        closes = [r.get("close", 0) for r in ohlcv]
        flag_closes = closes[peak_idx + 1:]
        if not flag_closes:
            # Only reachable when flag_min_days <= 0; there is no flag to judge.
            return None
        flag_min_close = min(flag_closes)
        drawdown_pct = (peak_high - flag_min_close) / peak_high * 100
        if drawdown_pct > self._cfg.flag_max_drawdown_pct:
            return None

        # Volume dry-up: flag average vs. average over the last 50 bars.
        flag_volumes = volumes[peak_idx + 1:]
        flag_avg_vol = sum(flag_volumes) / len(flag_volumes) if flag_volumes else 0

        vol_count = min(self._AVG_VOLUME_WINDOW, n)
        avg_vol_50d = sum(volumes[-vol_count:]) / vol_count if vol_count > 0 else 0
        if avg_vol_50d <= 0:
            return None
        if flag_avg_vol > avg_vol_50d * self._cfg.flag_volume_shrink_ratio:
            return None

        return {
            "pole_high": peak_high,
            "pole_low": pole_low,
            "surge_ratio": surge_ratio,
            "flag_days": flag_days,
            "drawdown_pct": drawdown_pct,
            "avg_vol_50d": avg_vol_50d,
            "flag_avg_vol": flag_avg_vol,
        }

    async def _check_breakout(self, code, item, pattern, ohlcv, progress) -> Optional[TradeSignal]:
        """Phase 3: confirm a live breakout (price + volume + execution strength).

        On success the position is recorded in ``_position_state`` and
        persisted before the BUY signal is returned.

        Args:
            code: Stock code.
            item: Watchlist entry; ``item.name`` is used for logging/signals.
            pattern: Result of ``_detect_pole_and_flag``.
            ohlcv: Daily bars used to build the average-volume baseline.
            progress: Fraction of the trading session elapsed.

        Returns:
            A BUY ``TradeSignal`` if all gates pass, otherwise ``None``.
        """
        # 1. Current price.
        resp = await self._sqs.get_current_price(code, caller=self.name)
        if not resp or resp.rt_cd != "0":
            return None

        out = resp.data.get("output") if isinstance(resp.data, dict) else None
        if not out:
            return None

        current = self._read_int(out, "stck_prpr")
        vol = self._read_int(out, "acml_vol")

        if current <= 0:
            return None

        # 2. Price breakout: current price must exceed the pole high (option A).
        pole_high = pattern["pole_high"]
        if current <= pole_high:
            # Not logged on purpose: this is the common case and would flood the log.
            return None

        # 3. Volume breakout: projected full-day volume vs. the average baseline.
        effective_progress = max(progress, self._MIN_PROGRESS_RATIO)
        proj_vol = vol / effective_progress

        volumes = [r.get("volume", 0) for r in ohlcv if r.get("volume")]
        vol_count = min(self._AVG_VOLUME_WINDOW, len(volumes))
        if vol_count < self._MIN_VOLUME_SAMPLES:
            self._logger.debug({"event": "breakout_rejected", "code": code, "reason": "insufficient_volume_data"})
            return None
        avg_vol_50d = sum(volumes[-vol_count:]) / vol_count

        vol_threshold = avg_vol_50d * self._cfg.volume_breakout_multiplier
        if proj_vol < vol_threshold:
            self._logger.debug({
                "event": "breakout_rejected", "code": code,
                "reason": "insufficient_projected_volume",
                "proj_vol": int(proj_vol), "threshold": int(vol_threshold)
            })
            return None

        # 4. Execution strength must reach the configured minimum.
        cgld_val = 0.0
        try:
            ccnl_resp = await self._sqs.get_stock_conclusion(code)
            if ccnl_resp and ccnl_resp.rt_cd == "0":
                ccnl_output = ccnl_resp.data.get("output") if isinstance(ccnl_resp.data, dict) else None
                if ccnl_output and isinstance(ccnl_output, list) and len(ccnl_output) > 0:
                    val = ccnl_output[0].get("tday_rltv")
                    cgld_val = float(val) if val else 0.0
        except Exception as e:
            # Without execution-strength data the breakout cannot be confirmed.
            self._logger.warning({"event": "cgld_check_failed", "code": code, "error": str(e)})
            return None

        if cgld_val < self._cfg.execution_strength_min:
            self._logger.debug({
                "event": "breakout_rejected", "code": code,
                "reason": "low_execution_strength",
                "cgld": cgld_val, "threshold": self._cfg.execution_strength_min
            })
            return None

        # ========= All gates passed: build the BUY signal =========
        qty = self._calculate_qty(current)
        self._position_state[code] = HTFPositionState(
            entry_price=current,
            entry_date=self._tm.get_current_kst_time().strftime("%Y%m%d"),
            peak_price=current,
            pole_high=pole_high,
        )
        self._save_state()

        vol_ratio = (proj_vol / avg_vol_50d * 100) if avg_vol_50d > 0 else 0.0

        reason_msg = (
            f"HTF돌파(돌파 {current:,}>{pole_high:,}, "
            f"예상거래 {vol_ratio:.0f}%, "
            f"깃대폭등 {pattern['surge_ratio']:.0%}, "
            f"깃발 {pattern['flag_days']}일(-{pattern['drawdown_pct']:.1f}%), "
            f"체결강도 {cgld_val:.1f}%)"
        )

        self._logger.info({
            "event": "buy_signal_generated",
            "code": code, "name": item.name,
            "price": current, "reason": reason_msg,
        })

        return TradeSignal(
            code=code, name=item.name, action="BUY", price=current, qty=qty,
            reason=reason_msg, strategy_name=self.name,
        )

    # ── check_exits ──────────────────────────────────────────────────

    async def check_exits(self, holdings: List[dict]) -> List[TradeSignal]:
        """Evaluate exit rules for each holding and return SELL signals.

        Rules, in priority order: hard stop-loss (``stop_loss_pct``), then
        the moving-average trailing stop. A SELL always covers the full
        held quantity and removes the code from the position state.

        Args:
            holdings: Dicts with ``code`` and ``buy_price`` (required; entries
                missing either are skipped) plus optional ``qty`` and ``name``.

        Returns:
            List of SELL ``TradeSignal`` objects (possibly empty).
        """
        signals = []
        state_dirty = False
        for hold in holdings:
            code = hold.get("code")
            buy_price = hold.get("buy_price")
            if not code or not buy_price:
                continue

            state = self._position_state.get(code)
            if not state:
                # Holding unknown to this strategy: track it using the buy
                # price for every price field and an empty entry date.
                state = HTFPositionState(buy_price, "", buy_price, buy_price)
                self._position_state[code] = state

            resp = await self._sqs.get_current_price(code, caller=self.name)
            if not resp or resp.rt_cd != "0":
                continue

            output = resp.data.get("output") if isinstance(resp.data, dict) else None
            if not output:
                continue

            current = self._read_int(output, "stck_prpr")
            if current <= 0:
                continue

            # Track the new peak (dirty flag — saved once after the loop).
            if current > state.peak_price:
                state.peak_price = current
                state_dirty = True

            pnl = (current - buy_price) / buy_price * 100
            reason = ""

            # 1. Hard stop-loss.
            if pnl <= self._cfg.stop_loss_pct:
                reason = f"칼손절({pnl:.1f}%)"

            # 2. Moving-average trailing stop.
            if not reason:
                is_break, break_reason = await self._check_trailing_ma_stop(code, current)
                if is_break:
                    reason = break_reason

            # Emit the SELL signal (sell the entire holding).
            if reason:
                holding_qty = int(hold.get("qty", 1))
                self._position_state.pop(code, None)
                state_dirty = True
                signals.append(TradeSignal(
                    code=code, name=hold.get("name", code), action="SELL",
                    price=current, qty=holding_qty, reason=reason, strategy_name=self.name,
                ))

        if state_dirty:
            self._save_state()
        return signals

    async def _check_trailing_ma_stop(self, code: str, current_price: int) -> tuple:
        """Check whether the price has fallen below the trailing moving average.

        The average is taken over the last ``trailing_ma_period`` non-zero
        daily closes. With fewer usable bars than the period, no stop fires.

        Returns:
            ``(True, reason)`` when ``current_price`` is below the MA,
            otherwise ``(False, "")``.
        """
        period = self._cfg.trailing_ma_period
        ohlcv_resp = await self._sqs.get_recent_daily_ohlcv(code, limit=period)
        ohlcv = ohlcv_resp.data if ohlcv_resp and ohlcv_resp.rt_cd == "0" else []
        if not ohlcv or len(ohlcv) < period:
            return False, ""

        closes = [r.get("close", 0) for r in ohlcv if r.get("close")]
        if len(closes) < period:
            return False, ""

        ma = sum(closes[-period:]) / period
        if current_price < ma:
            # Report the configured period rather than a hardcoded "10".
            return True, f"트레일링스탑({period}MA {ma:,.0f} 하향이탈)"
        return False, ""

    # ── helpers ──────────────────────────────────────────────────────

    @staticmethod
    def _read_int(output, field: str) -> int:
        """Read ``field`` from a dict or attribute-style payload as an int.

        Missing, ``None`` and empty values are treated as 0 for both payload
        shapes, so an absent field does not raise.
        """
        if isinstance(output, dict):
            raw = output.get(field, 0)
        else:
            raw = getattr(output, field, 0)
        return int(raw or 0)

    def _calculate_qty(self, price: int) -> int:
        """Return the order quantity for ``price`` from the position budget.

        The budget is ``total_portfolio_krw * position_size_pct / 100``; the
        result is never below ``min_qty``.
        """
        if price <= 0:
            return self._cfg.min_qty
        budget = self._cfg.total_portfolio_krw * (self._cfg.position_size_pct / 100)
        return max(int(budget / price), self._cfg.min_qty)

    def _get_market_progress_ratio(self) -> float:
        """Return the elapsed fraction of the trading session, capped at 1.0.

        The value is negative before the open and 0.0 when the session
        length is not positive.
        """
        now = self._tm.get_current_kst_time()
        open_t = self._tm.get_market_open_time()
        close_t = self._tm.get_market_close_time()
        total = (close_t - open_t).total_seconds()
        elapsed = (now - open_t).total_seconds()
        return min(elapsed / total, 1.0) if total > 0 else 0.0

    def _load_state(self):
        """Load persisted position state from ``STATE_FILE`` (best effort).

        A missing file is not an error. Any failure while reading or parsing
        is logged and otherwise ignored; entries loaded before the failure
        are kept.
        """
        if not os.path.exists(self.STATE_FILE):
            return
        try:
            with open(self.STATE_FILE, "r") as f:
                data = json.load(f)
            for k, v in data.items():
                self._position_state[k] = HTFPositionState(**v)
        except Exception as e:
            # Best effort: start with whatever was loaded, but leave a trace.
            self._logger.warning({
                "event": "state_load_failed",
                "path": self.STATE_FILE, "error": str(e),
            })

    def _save_state(self):
        """Persist position state to ``STATE_FILE`` as JSON (best effort).

        Failures are logged and otherwise ignored so that a persistence
        problem does not interrupt signal generation.
        """
        try:
            os.makedirs(os.path.dirname(self.STATE_FILE), exist_ok=True)
            data = {k: asdict(v) for k, v in self._position_state.items()}
            with open(self.STATE_FILE, "w") as f:
                json.dump(data, f, indent=2)
        except Exception as e:
            self._logger.warning({
                "event": "state_save_failed",
                "path": self.STATE_FILE, "error": str(e),
            })