Coverage for strategies / high_tight_flag_strategy.py: 93%

237 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-04 15:08 +0000

1# strategies/high_tight_flag_strategy.py 

2from __future__ import annotations 

3 

4import asyncio 

5import logging 

6import os 

7import json 

8from dataclasses import asdict 

9from typing import List, Optional, Dict 

10 

11from interfaces.live_strategy import LiveStrategy 

12from common.types import TradeSignal 

13from services.stock_query_service import StockQueryService 

14from core.market_clock import MarketClock 

15from strategies.oneil_common_types import HTFConfig, HTFPositionState 

16from services.oneil_universe_service import OneilUniverseService 

17from core.logger import get_strategy_logger 

18 

19 

class HighTightFlagStrategy(LiveStrategy):
    """High Tight Flag (HTF) breakout strategy.

    Idea: a stock that surged sharply (the "pole") and then drifted sideways
    close to its high on shrinking volume (the "flag") is bought at the moment
    it breaks out to a new high with strong volume and execution strength.

    The figures below are the design targets from the original notes; the
    effective thresholds are always read from ``HTFConfig``.

    * Phase 1 (pole): low-to-high surge of 90%+ inside the pole lookback window.
    * Phase 2 (flag): sideways drift no deeper than 20% below the peak, with
      volume drying up.
    * Phase 3 (breakout): current price above the pole high, projected volume
      200%+ of the average, execution strength 120%+.
    * Phase 4 (exit): hard stop-loss at -5% / 10-day moving-average trailing stop.

    Per-position state is kept in memory and mirrored to ``STATE_FILE`` as JSON.
    """

    STATE_FILE = os.path.join("data", "htf_position_state.json")

    # Number of watchlist candidates evaluated concurrently per gather() call.
    _SCAN_BATCH_SIZE = 10
    # Daily bars requested per candidate: 40 pole days + up to 25 flag days.
    _OHLCV_FETCH_DAYS = 65

    def __init__(
        self,
        stock_query_service: StockQueryService,
        universe_service: OneilUniverseService,
        market_clock: MarketClock,
        config: Optional[HTFConfig] = None,
        logger: Optional[logging.Logger] = None,
    ):
        """Wire up collaborators and restore any persisted position state.

        Args:
            stock_query_service: Source of daily OHLCV, current price and
                conclusion (execution strength) data.
            universe_service: Provides the watchlist and market-timing checks.
            market_clock: Provides current KST time and market open/close times.
            config: Strategy thresholds; a default ``HTFConfig()`` is used when
                omitted.
            logger: Logger to use; a strategy logger is created when omitted.
        """
        self._sqs = stock_query_service
        self._universe = universe_service
        self._tm = market_clock
        # Explicit None checks: a supplied object must never be replaced just
        # because it happens to evaluate as falsy.
        self._cfg = config if config is not None else HTFConfig()
        if logger is not None:
            self._logger = logger
        else:
            self._logger = get_strategy_logger("HighTightFlag", sub_dir="oneil")

        self._position_state: Dict[str, HTFPositionState] = {}
        self._load_state()

    @property
    def name(self) -> str:
        """Display name of the strategy (also sent as ``caller`` to the quote API)."""
        return "하이타이트플래그"

    # ── scan ──────────────────────────────────────────────────────────

    async def scan(self) -> List[TradeSignal]:
        """Scan the watchlist and return BUY signals for confirmed HTF breakouts.

        Returns an empty list when the watchlist is empty, the market progress
        ratio is not positive, or market timing is unfavourable for both KOSPI
        and KOSDAQ. Codes that already have position state are skipped.
        """
        signals: List[TradeSignal] = []
        self._logger.info({"event": "scan_started", "strategy_name": self.name})

        watchlist = await self._universe.get_watchlist(logger=self._logger)
        if not watchlist:
            self._logger.info({"event": "scan_skipped", "reason": "Watchlist is empty"})
            return signals

        self._logger.info({"event": "scan_with_watchlist", "count": len(watchlist)})

        market_progress = self._get_market_progress_ratio()
        if market_progress <= 0:
            self._logger.info({"event": "scan_skipped", "reason": "Market not open or just started"})
            return signals

        # Market-timing pre-check, evaluated once per market rather than per stock.
        market_timing = {
            "KOSPI": await self._universe.is_market_timing_ok("KOSPI", logger=self._logger),
            "KOSDAQ": await self._universe.is_market_timing_ok("KOSDAQ", logger=self._logger),
        }
        if not any(market_timing.values()):
            self._logger.info({"event": "scan_skipped", "reason": "Bad market timing for both markets"})
            return signals

        candidates = [
            (code, item)
            for code, item in watchlist.items()
            if code not in self._position_state
            and market_timing.get(item.market, False)
        ]

        batch_size = self._SCAN_BATCH_SIZE
        for start in range(0, len(candidates), batch_size):
            batch = candidates[start:start + batch_size]
            results = await asyncio.gather(
                *[self._check_htf_setup(code, item, market_progress) for code, item in batch],
                return_exceptions=True,
            )
            for result in results:
                # BaseException, not Exception: with return_exceptions=True a
                # cancelled child is returned as a CancelledError instance,
                # which is not an Exception subclass and would otherwise be
                # appended to the signal list as if it were a TradeSignal.
                if isinstance(result, BaseException):
                    self._logger.error(f"Scan error: {result}")
                elif result:
                    signals.append(result)

        self._logger.info({"event": "scan_finished", "signals_found": len(signals)})
        return signals

    async def _check_htf_setup(self, code, item, progress) -> Optional[TradeSignal]:
        """Detect the HTF pattern for one stock and, if present, test the breakout.

        Args:
            code: Stock code.
            item: Watchlist entry; ``item.name`` is read here.
            progress: Market progress ratio forwarded to the breakout check.

        Returns:
            A BUY ``TradeSignal`` or ``None``.
        """
        # 1. Daily OHLCV lookup.
        ohlcv_resp = await self._sqs.get_recent_daily_ohlcv(code, limit=self._OHLCV_FETCH_DAYS)
        ohlcv = ohlcv_resp.data if ohlcv_resp and ohlcv_resp.rt_cd == "0" else []
        if not ohlcv or len(ohlcv) < self._cfg.pole_lookback_days:
            return None

        # 2. Phase 1+2: pole/flag detection (pure computation).
        pattern = self._detect_pole_and_flag(ohlcv)
        if not pattern:
            return None

        self._logger.info({
            "event": "htf_pattern_detected",
            "code": code, "name": item.name,
            "surge_ratio": round(pattern["surge_ratio"], 2),
            "flag_days": pattern["flag_days"],
            "drawdown_pct": round(pattern["drawdown_pct"], 1),
        })

        # 3. Phase 3: real-time breakout confirmation.
        return await self._check_breakout(code, item, pattern, ohlcv, progress)

    def _detect_pole_and_flag(self, ohlcv: list) -> Optional[dict]:
        """Phase 1+2: detect a pole surge followed by a flag (no API calls).

        Rows after the highest high are treated as the flag, so this assumes
        ``ohlcv`` is ordered oldest to newest — TODO confirm against the
        query service.

        Args:
            ohlcv: Daily rows exposing ``high``, ``low``, ``close`` and
                ``volume`` through ``.get()``.

        Returns:
            ``None`` when any condition fails, otherwise a dict with
            ``pole_high``, ``pole_low``, ``surge_ratio``, ``flag_days``,
            ``drawdown_pct``, ``avg_vol_50d`` and ``flag_avg_vol``.
        """
        if not ohlcv:
            # max() below would raise on an empty sequence.
            return None

        highs = [r.get("high", 0) for r in ohlcv]
        lows = [r.get("low", 0) for r in ohlcv]
        volumes = [r.get("volume", 0) for r in ohlcv]
        n = len(ohlcv)

        # Highest high over the whole window (first occurrence on ties).
        peak_high = max(highs)
        peak_idx = highs.index(peak_high)

        # Phase 1: pole = up to pole_lookback_days bars ending at the peak.
        pole_start = max(0, peak_idx - self._cfg.pole_lookback_days + 1)
        pole_lows = lows[pole_start:peak_idx + 1]
        if not pole_lows:
            return None

        pole_low = min(pole_lows)
        if pole_low <= 0:
            # Missing/zero lows would make the surge ratio meaningless.
            return None

        surge_ratio = peak_high / pole_low
        if surge_ratio < self._cfg.pole_min_surge_ratio:
            return None

        # Phase 2: flag = bars after the peak.
        flag_days = n - peak_idx - 1
        if flag_days < self._cfg.flag_min_days:
            return None
        if flag_days > self._cfg.flag_max_days:
            return None

        # Drawdown is measured on closes, so intraday wicks are tolerated.
        closes = [r.get("close", 0) for r in ohlcv]
        flag_closes = closes[peak_idx + 1:]
        if not flag_closes:
            # Only reachable when flag_min_days allows a zero-length flag;
            # min() would raise on the empty slice.
            return None
        flag_min_close = min(flag_closes)
        drawdown_pct = (peak_high - flag_min_close) / peak_high * 100
        if drawdown_pct > self._cfg.flag_max_drawdown_pct:
            return None

        # Volume dry-up: flag average vs. average of the last (up to) 50 bars.
        flag_volumes = volumes[peak_idx + 1:]
        flag_avg_vol = sum(flag_volumes) / len(flag_volumes) if flag_volumes else 0

        vol_count = min(50, n)
        avg_vol_50d = sum(volumes[-vol_count:]) / vol_count if vol_count > 0 else 0
        if avg_vol_50d <= 0:
            return None
        if flag_avg_vol > avg_vol_50d * self._cfg.flag_volume_shrink_ratio:
            return None

        return {
            "pole_high": peak_high,
            "pole_low": pole_low,
            "surge_ratio": surge_ratio,
            "flag_days": flag_days,
            "drawdown_pct": drawdown_pct,
            "avg_vol_50d": avg_vol_50d,
            "flag_avg_vol": flag_avg_vol,
        }

    @staticmethod
    def _extract_int(output, key: str) -> int:
        """Read ``key`` from a dict-like or attribute-style payload as an int.

        Missing, ``None`` and empty values become 0 for both payload shapes;
        a non-numeric value still raises so the caller can report it.
        """
        if isinstance(output, dict):
            raw = output.get(key, 0)
        else:
            raw = getattr(output, key, 0)
        return int(raw or 0)

    async def _check_breakout(self, code, item, pattern, ohlcv, progress) -> Optional[TradeSignal]:
        """Phase 3: confirm the breakout in real time (price, volume, strength).

        On success the position state for ``code`` is recorded and persisted
        before the BUY signal is returned.

        Args:
            code: Stock code.
            item: Watchlist entry; ``item.name`` is read here.
            pattern: Result of ``_detect_pole_and_flag``.
            ohlcv: The daily rows used for the average-volume baseline.
            progress: Market progress ratio used to project full-day volume.

        Returns:
            A BUY ``TradeSignal`` or ``None`` when any gate rejects.
        """
        # 1. Current quote.
        resp = await self._sqs.get_current_price(code, caller=self.name)
        if not resp or resp.rt_cd != "0":
            return None

        out = resp.data.get("output") if isinstance(resp.data, dict) else None
        if not out:
            return None

        # presumably stck_prpr = current price, acml_vol = accumulated volume
        # (inferred from how the values are used) — verify against the API spec.
        current = self._extract_int(out, "stck_prpr")
        vol = self._extract_int(out, "acml_vol")

        if current <= 0:
            return None

        # 2. Price gate: current price must exceed the pole high.
        pole_high = pattern["pole_high"]
        if current <= pole_high:
            # Intentionally not logged: this is the common case and would
            # flood the log.
            return None

        # 3. Volume gate: projected full-day volume vs. average * multiplier.
        # The progress floor keeps the projection from exploding right after
        # the open.
        effective_progress = max(progress, 0.05)
        proj_vol = vol / effective_progress

        volumes = [r.get("volume", 0) for r in ohlcv if r.get("volume")]
        vol_count = min(50, len(volumes))
        if vol_count < 20:
            self._logger.debug({"event": "breakout_rejected", "code": code, "reason": "insufficient_volume_data"})
            return None
        avg_vol_50d = sum(volumes[-vol_count:]) / vol_count

        vol_threshold = avg_vol_50d * self._cfg.volume_breakout_multiplier
        if proj_vol < vol_threshold:
            self._logger.debug({
                "event": "breakout_rejected", "code": code,
                "reason": "insufficient_projected_volume",
                "proj_vol": int(proj_vol), "threshold": int(vol_threshold)
            })
            return None

        # 4. Execution-strength gate. A failed lookup rejects the breakout; a
        # non-success response leaves the value at 0.0 for the threshold check.
        cgld_val = 0.0
        try:
            ccnl_resp = await self._sqs.get_stock_conclusion(code)
            if ccnl_resp and ccnl_resp.rt_cd == "0":
                ccnl_output = ccnl_resp.data.get("output") if isinstance(ccnl_resp.data, dict) else None
                if isinstance(ccnl_output, list) and ccnl_output:
                    val = ccnl_output[0].get("tday_rltv")
                    cgld_val = float(val) if val else 0.0
        except Exception as e:
            self._logger.warning({"event": "cgld_check_failed", "code": code, "error": str(e)})
            return None

        if cgld_val < self._cfg.execution_strength_min:
            self._logger.debug({
                "event": "breakout_rejected", "code": code,
                "reason": "low_execution_strength",
                "cgld": cgld_val, "threshold": self._cfg.execution_strength_min
            })
            return None

        # ========= All gates passed: build the BUY signal =========
        qty = self._calculate_qty(current)
        self._position_state[code] = HTFPositionState(
            entry_price=current,
            entry_date=self._tm.get_current_kst_time().strftime("%Y%m%d"),
            peak_price=current,
            pole_high=pole_high,
        )
        self._save_state()

        vol_ratio = (proj_vol / avg_vol_50d * 100) if avg_vol_50d > 0 else 0.0

        reason_msg = (
            f"HTF돌파(돌파 {current:,}>{pole_high:,}, "
            f"예상거래 {vol_ratio:.0f}%, "
            f"깃대폭등 {pattern['surge_ratio']:.0%}, "
            f"깃발 {pattern['flag_days']}일(-{pattern['drawdown_pct']:.1f}%), "
            f"체결강도 {cgld_val:.1f}%)"
        )

        self._logger.info({
            "event": "buy_signal_generated",
            "code": code, "name": item.name,
            "price": current, "reason": reason_msg,
        })

        return TradeSignal(
            code=code, name=item.name, action="BUY", price=current, qty=qty,
            reason=reason_msg, strategy_name=self.name,
        )

    # ── check_exits ──────────────────────────────────────────────────

    async def check_exits(self, holdings: List[dict]) -> List[TradeSignal]:
        """Evaluate exit rules for each holding and return SELL signals.

        Rules, in order: hard stop-loss on P&L versus ``buy_price``, then the
        moving-average trailing stop. A triggered rule sells the full quantity
        and drops the position state.

        A failure while processing one holding is logged and that holding is
        skipped, so the remaining holdings are still evaluated and signals
        already produced are still returned.

        Args:
            holdings: Dicts with ``code``, ``buy_price`` and optionally
                ``qty`` and ``name``.

        Returns:
            The SELL signals generated in this pass.
        """
        signals: List[TradeSignal] = []
        state_dirty = False

        for hold in holdings:
            code = hold.get("code")
            buy_price = hold.get("buy_price")
            if not code or not buy_price:
                continue

            try:
                state = self._position_state.get(code)
                if not state:
                    # No recorded entry for this holding: seed the state from
                    # the reported buy price.
                    state = HTFPositionState(buy_price, "", buy_price, buy_price)
                    self._position_state[code] = state

                resp = await self._sqs.get_current_price(code, caller=self.name)
                if not resp or resp.rt_cd != "0":
                    continue

                output = resp.data.get("output") if isinstance(resp.data, dict) else None
                if not output:
                    continue

                current = self._extract_int(output, "stck_prpr")
                if current <= 0:
                    continue

                # Track the running peak; persisted once after the loop.
                if current > state.peak_price:
                    state.peak_price = current
                    state_dirty = True

                pnl = (current - buy_price) / buy_price * 100

                if pnl <= self._cfg.stop_loss_pct:
                    # 1. Hard stop-loss.
                    reason = f"칼손절({pnl:.1f}%)"
                else:
                    # 2. Moving-average trailing stop.
                    is_break, break_reason = await self._check_trailing_ma_stop(code, current)
                    reason = break_reason if is_break else ""

                # Emit a SELL for the full holding.
                if reason:
                    holding_qty = int(hold.get("qty", 1))
                    self._position_state.pop(code, None)
                    state_dirty = True
                    signals.append(TradeSignal(
                        code=code, name=hold.get("name", code), action="SELL",
                        price=current, qty=holding_qty, reason=reason, strategy_name=self.name,
                    ))
            except Exception as e:
                # Isolate the failure to this holding; task cancellation is a
                # BaseException and still propagates.
                self._logger.error({"event": "exit_check_failed", "code": code, "error": str(e)})

        if state_dirty:
            self._save_state()
        return signals

    async def _check_trailing_ma_stop(self, code: str, current_price: int) -> tuple[bool, str]:
        """Check whether the price has fallen below the trailing moving average.

        The average is the mean of the last ``trailing_ma_period`` non-zero
        daily closes. Assumes the rows are ordered oldest to newest — TODO
        confirm against the query service.

        Returns:
            ``(True, reason)`` when ``current_price`` is below the average,
            otherwise ``(False, "")``, including when there is not enough data.
        """
        period = self._cfg.trailing_ma_period
        if period <= 0:
            # A non-positive period would divide by zero below.
            return False, ""

        ohlcv_resp = await self._sqs.get_recent_daily_ohlcv(code, limit=period)
        ohlcv = ohlcv_resp.data if ohlcv_resp and ohlcv_resp.rt_cd == "0" else []
        if not ohlcv or len(ohlcv) < period:
            return False, ""

        closes = [r.get("close", 0) for r in ohlcv if r.get("close")]
        if len(closes) < period:
            return False, ""

        ma = sum(closes[-period:]) / period
        if current_price < ma:
            # Report the configured period rather than a hard-coded "10".
            return True, f"트레일링스탑({period}MA {ma:,.0f} 하향이탈)"
        return False, ""

    # ── helpers ──────────────────────────────────────────────────────

    def _calculate_qty(self, price: int) -> int:
        """Return the order quantity for ``price`` from the configured budget.

        The budget is ``total_portfolio_krw * position_size_pct / 100``; the
        result is never below ``min_qty``.
        """
        if price <= 0:
            return self._cfg.min_qty
        budget = self._cfg.total_portfolio_krw * (self._cfg.position_size_pct / 100)
        return max(int(budget / price), self._cfg.min_qty)

    def _get_market_progress_ratio(self) -> float:
        """Return the elapsed fraction of the trading session, capped at 1.0.

        The value is negative before the open (callers treat ``<= 0`` as
        "not open") and 0.0 when the session length is not positive.
        """
        now = self._tm.get_current_kst_time()
        open_t = self._tm.get_market_open_time()
        close_t = self._tm.get_market_close_time()
        total = (close_t - open_t).total_seconds()
        elapsed = (now - open_t).total_seconds()
        return min(elapsed / total, 1.0) if total > 0 else 0.0

    def _load_state(self):
        """Restore position state from ``STATE_FILE`` (best effort).

        A missing file is not an error. Read/parse failures and malformed
        entries are logged instead of being silently ignored; a malformed
        entry is skipped without discarding the remaining entries.
        """
        if not os.path.exists(self.STATE_FILE):
            return

        try:
            with open(self.STATE_FILE, "r", encoding="utf-8") as f:
                data = json.load(f)
        except (OSError, ValueError) as e:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            self._logger.warning({
                "event": "state_load_failed", "path": self.STATE_FILE, "error": str(e),
            })
            return

        if not isinstance(data, dict):
            self._logger.warning({
                "event": "state_load_failed", "path": self.STATE_FILE,
                "error": "top-level JSON value is not an object",
            })
            return

        for code, raw in data.items():
            try:
                self._position_state[code] = HTFPositionState(**raw)
            except (TypeError, ValueError) as e:
                self._logger.warning({
                    "event": "state_entry_skipped", "code": code, "error": str(e),
                })

    def _save_state(self):
        """Persist position state to ``STATE_FILE`` as JSON (best effort).

        Failures are logged rather than raised so that a persistence problem
        never interrupts signal generation.
        """
        try:
            state_dir = os.path.dirname(self.STATE_FILE)
            if state_dir:
                # os.makedirs("") raises, so skip it for a bare file name.
                os.makedirs(state_dir, exist_ok=True)
            data = {k: asdict(v) for k, v in self._position_state.items()}
            # Serialise before opening the file so a serialisation error
            # cannot leave a truncated state file behind.
            payload = json.dumps(data, indent=2)
            with open(self.STATE_FILE, "w", encoding="utf-8") as f:
                f.write(payload)
        except (OSError, TypeError, ValueError) as e:
            self._logger.error({
                "event": "state_save_failed", "path": self.STATE_FILE, "error": str(e),
            })