Coverage for services / oneil_universe_service.py: 91%

442 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-04 15:08 +0000

1# strategies/oneil/universe_service.py 

2import asyncio 

3import json 

4import logging 

5import os 

6import time 

7from datetime import datetime 

8from dataclasses import asdict 

9from typing import Dict, List, Optional 

10 

11from common.types import ErrorCode 

12from services.stock_query_service import StockQueryService 

13from services.indicator_service import IndicatorService 

14from repositories.stock_code_repository import StockCodeRepository 

15from services.naver_finance_scraper_service import NaverFinanceScraperService 

16from core.market_clock import MarketClock 

17from strategies.oneil_common_types import OneilUniverseConfig, OSBWatchlistItem 

18from core.logger import get_strategy_logger 

19from core.performance_profiler import PerformanceProfiler 

20from services.price_subscription_service import SubscriptionPriority 

21 

22def _chunked(lst, size): 

23 for i in range(0, len(lst), size): 

24 yield lst[i:i + size] 

25 

26 

27class OneilUniverseService: 

28 """오닐 전략 유니버스 관리 서비스. 

29  

30 역할: 

31 1. 전일 기준 우량주 생성 및 로드 (Pool A) 

32 2. 당일 급등주 실시간 발굴 (Pool B) 

33 3. Watchlist (감시 대상 60종목) 병합 및 제공 

34 4. 마켓 타이밍 판단 

35 """ 

36 

37 def __init__( 

38 self, 

39 stock_query_service: StockQueryService, 

40 indicator_service: IndicatorService, 

41 stock_code_repository: StockCodeRepository, 

42 market_clock: MarketClock, 

43 scraper_service: Optional[NaverFinanceScraperService] = None, # 추가됨 

44 config: Optional[OneilUniverseConfig] = None, 

45 logger: Optional[logging.Logger] = None, 

46 performance_profiler: Optional[PerformanceProfiler] = None, 

47 price_subscription_service=None 

48 ): 

49 self._sqs = stock_query_service 

50 self._indicator = indicator_service 

51 self.stock_code_repository = stock_code_repository 

52 self._tm = market_clock 

53 self._scraper = scraper_service 

54 self._cfg = config or OneilUniverseConfig() 

55 self._logger = logger or logging.getLogger(__name__) 

56 self.pm = performance_profiler if performance_profiler else PerformanceProfiler(enabled=False) 

57 self._price_sub_svc = price_subscription_service 

58 

59 # 상태 관리 

60 self._watchlist: Dict[str, OSBWatchlistItem] = {} 

61 self._watchlist_date: str = "" 

62 self._watchlist_refresh_done: set = set() 

63 self._pool_a_loaded: bool = False 

64 self._pool_a_items: Dict[str, OSBWatchlistItem] = {} 

65 

66 # 마켓 타이밍 캐시 

67 self._market_timing_cache: Dict[str, bool] = {} 

68 self._market_timing_date: str = "" 

69 

70 # 전일 기준 우량주 생성 진행률 

71 self._generation_progress: Dict = { 

72 "running": False, 

73 "phase": None, 

74 "processed": 0, 

75 "total": 0, 

76 "passed": 0, 

77 "selected": 0, 

78 "elapsed": 0.0, 

79 } 

80 

81 @property 

82 def generation_progress(self) -> Dict: 

83 """전일 기준 우량주 생성 진행률 스냅샷 반환.""" 

84 return dict(self._generation_progress) 

85 

86 async def get_watchlist(self, logger: Optional[logging.Logger] = None) -> Dict[str, OSBWatchlistItem]: 

87 """현재 유효한 워치리스트를 반환 (캐싱 + 자동 갱신).""" 

88 logger = logger or self._logger 

89 today = self._tm.get_current_kst_time().strftime("%Y%m%d") 

90 

91 # 날짜 변경 시 초기화 

92 if self._watchlist_date != today: 

93 self._watchlist_refresh_done = set() 

94 self._pool_a_loaded = False 

95 self._pool_a_items = {} 

96 await self._build_watchlist(logger=logger) 

97 self._watchlist_date = today 

98 self._market_timing_date = "" # 마켓타이밍도 재확인 필요 

99 

100 # 초기화 시점에도 현재 시간 기준 이미 지난 갱신 주기는 완료 처리하여 중복 갱신 방지 

101 self._should_refresh_watchlist() 

102 

103 # 장중 갱신 주기 체크 

104 elif self._should_refresh_watchlist(): 

105 await self._build_watchlist(logger=logger) 

106 

107 if self._price_sub_svc and self._watchlist: 107 ↛ 108line 107 didn't jump to line 108 because the condition on line 107 was never true

108 asyncio.create_task(self._price_sub_svc.sync_subscriptions( 

109 codes=list(self._watchlist.keys()), 

110 category_key="strategy_oneil", 

111 priority=SubscriptionPriority.MEDIUM, 

112 )) 

113 return self._watchlist 

114 

115 async def is_market_timing_ok(self, market: str, logger: Optional[logging.Logger] = None) -> bool: 

116 """해당 시장(KOSPI/KOSDAQ)의 마켓 타이밍이 매수 적합한지 확인.""" 

117 logger = logger or self._logger 

118 today = self._tm.get_current_kst_time().strftime("%Y%m%d") 

119 if self._market_timing_date != today: 

120 await self._update_market_timing(logger=logger) 

121 self._market_timing_date = today 

122 

123 return self._market_timing_cache.get(market, False) 

124 

125 # ── 워치리스트 빌드 ──────────────────────────────────────────── 

126 

127 async def _build_watchlist(self, logger: Optional[logging.Logger] = None): 

128 """Pool A + Pool B 병합 -> 스코어링 -> 상위 N개 선정.""" 

129 logger = logger or self._logger 

130 t_start = self.pm.start_timer() 

131 logger.info({"event": "build_watchlist_started"}) 

132 

133 # 1) Pool A 로드 

134 if not self._pool_a_loaded: 

135 raw = self._load_premium_stocks() 

136 self._pool_a_items = {item.code: item for item in raw} 

137 self._pool_a_loaded = True 

138 

139 # 2) 당일 급등주 빌드 (실시간 랭킹) 

140 pool_b_items = await self._build_daily_surge_pool(logger=logger) 

141 

142 # 3) 병합 

143 merged: Dict[str, OSBWatchlistItem] = dict(self._pool_a_items) 

144 for code, item in pool_b_items.items(): 

145 if code not in merged: 

146 merged[code] = item 

147 

148 # 4) 정렬 및 절삭 

149 sorted_items = sorted( 

150 merged.values(), 

151 key=lambda x: (x.total_score, self._calc_turnover_ratio(x)), 

152 reverse=True, 

153 ) 

154 

155 # 스코어링 후 정렬된 상위 종목 로그 

156 top_n_for_log = 10 

157 logger.debug({ 

158 "event": "watchlist_sorted", 

159 "top_n": top_n_for_log, 

160 "items": [ 

161 { 

162 "code": i.code, "name": i.name, "total_score": i.total_score, 

163 "rs_score": i.rs_score, "profit_score": i.profit_growth_score, 

164 "turnover": round(self._calc_turnover_ratio(i), 4) 

165 } 

166 for i in sorted_items[:top_n_for_log] 

167 ] 

168 }) 

169 

170 self._watchlist = { 

171 item.code: item for item in sorted_items[:self._cfg.max_watchlist] 

172 } 

173 logger.info({ 

174 "event": "build_watchlist_finished", 

175 "premium_stocks": len(self._pool_a_items), 

176 "daily_surge_stocks": len(pool_b_items), 

177 "final_count": len(self._watchlist) 

178 }) 

179 self.pm.log_timer("OneilUniverseService._build_watchlist", t_start, threshold=5.0) 

180 

181 async def _build_daily_surge_pool(self, logger: Optional[logging.Logger] = None) -> Dict[str, OSBWatchlistItem]: 

182 """당일 급등주: 실시간 랭킹 기반 종목 발굴.""" 

183 logger = logger or self._logger 

184 t_start = self.pm.start_timer() 

185 # 3가지 랭킹 병합 

186 trading_val_resp, rise_resp, volume_resp = await asyncio.gather( 

187 self._sqs.get_top_trading_value_stocks(), 

188 self._sqs.get_top_rise_fall_stocks(rise=True), 

189 self._sqs.get_top_volume_stocks(), 

190 return_exceptions=True, 

191 ) 

192 

193 candidate_map = {} 

194 for resp in [trading_val_resp, rise_resp, volume_resp]: 

195 if isinstance(resp, Exception) or not resp or resp.rt_cd != ErrorCode.SUCCESS.value: 

196 continue 

197 for stock in (resp.data or []): 

198 if isinstance(stock, dict): 

199 code = stock.get("mksc_shrn_iscd") or stock.get("stck_shrn_iscd") or "" 

200 name = stock.get("hts_kor_isnm", "") 

201 else: 

202 code = getattr(stock, "mksc_shrn_iscd", "") or getattr(stock, "stck_shrn_iscd", "") 

203 name = getattr(stock, "hts_kor_isnm", "") 

204 if code: 204 ↛ 197line 204 didn't jump to line 197 because the condition on line 204 was always true

205 candidate_map[code] = name 

206 

207 # 분석 및 필터링 

208 items = [] 

209 skip_codes = set(self._pool_a_items.keys()) | set(self._watchlist.keys()) 

210 

211 # [성능 개선] 순차 처리 -> 청크 단위 병렬 처리 (asyncio.gather) 

212 candidates = [(c, n) for c, n in candidate_map.items() if c not in skip_codes] 

213 

214 for chunk in _chunked(candidates, self._cfg.api_chunk_size): 

215 tasks = [self._analyze_candidate(code, name, logger=logger) for code, name in chunk] 

216 results = await asyncio.gather(*tasks, return_exceptions=True) 

217 

218 for res in results: 

219 if isinstance(res, Exception) or res is None: 

220 continue 

221 items.append(res) 

222 # 레이트 리밋 고려하여 약간의 대기 (필요 시) 

223 await asyncio.sleep(0.1) 

224 

225 # 스코어링 

226 self._compute_rs_scores(items, logger=logger) 

227 # 2. 실적(스크래핑) 및 과거 3일 수급(API)은 장중 병목 방지 및  

228 # 당일 첫 급등주(Day-1) 포착을 위해 장 중에는 생략! 

229 # await self._compute_profit_growth_scores(items, logger=logger) 

230 # await self._compute_smart_money_scores(items, logger=logger) 

231 self._compute_total_scores(items, logger=logger) 

232 

233 # 상위 N개 

234 items.sort(key=lambda x: (x.total_score, self._calc_turnover_ratio(x)), reverse=True) 

235 

236 # Pool B 스코어링 후 정렬된 상위 종목 로그 

237 top_n_for_log = 10 

238 logger.debug({ 

239 "event": "daily_surge_pool_sorted", 

240 "top_n": top_n_for_log, 

241 "items": [ 

242 { 

243 "code": i.code, "name": i.name, "total_score": i.total_score, 

244 "rs_score": i.rs_score, "profit_score": i.profit_growth_score, 

245 "turnover": round(self._calc_turnover_ratio(i), 4) 

246 } 

247 for i in items[:top_n_for_log] 

248 ] 

249 }) 

250 

251 self.pm.log_timer("OneilUniverseService._build_daily_surge_pool", t_start, threshold=3.0) 

252 return {item.code: item for item in items[:self._cfg.daily_surge_size]} 

253 

254 async def _analyze_candidate(self, code: str, name: str, logger: Optional[logging.Logger] = None) -> Optional[OSBWatchlistItem]: 

255 """개별 종목 분석 (OHLCV, BB, RS 등).""" 

256 ohlcv_resp = await self._sqs.get_recent_daily_ohlcv(code, limit=90) 

257 ohlcv = ohlcv_resp.data if ohlcv_resp and ohlcv_resp.rt_cd == ErrorCode.SUCCESS.value else [] 

258 

259 if not ohlcv: 

260 if logger: logger.debug({"event": "drop", "code": code, "reason": "no_ohlcv"}) 

261 return None 

262 

263 period = self._cfg.high_breakout_period 

264 closes = [r.get("close", 0) for r in ohlcv if r.get("close") is not None] 

265 if len(closes) < 50: 

266 if logger: logger.debug({"event": "drop", "code": code, "reason": "insufficient_data_len", "len": len(closes)}) 

267 return None 

268 

269 highs = [r.get("high", 0) for r in ohlcv[-period:] if r.get("high") is not None] 

270 volumes = [r.get("volume", 0) for r in ohlcv[-period:] if r.get("volume") is not None] 

271 

272 if not highs or not volumes: 272 ↛ 273line 272 didn't jump to line 273 because the condition on line 272 was never true

273 if logger: logger.debug({"event": "drop", "code": code, "reason": "missing_high_or_volume"}) 

274 return None 

275 

276 ma_20d = sum(closes[-20:]) / 20 

277 ma_50d = sum(closes[-50:]) / 50 

278 high_20d = int(max(highs)) 

279 avg_vol_20d = sum(volumes) / len(volumes) 

280 prev_close = closes[-1] 

281 

282 # 필터: 거래대금, 정배열 

283 recent_5 = ohlcv[-5:] 

284 tv_5d = sum([(r.get("volume", 0) * r.get("close", 0)) for r in recent_5]) / len(recent_5) 

285 if tv_5d < self._cfg.min_avg_trading_value_5d: 

286 if logger: logger.debug({"event": "drop", "code": code, "reason": "low_trading_value", "value": tv_5d}) 

287 return None 

288 if not (prev_close > ma_20d > ma_50d): 

289 if logger: logger.debug({"event": "drop", "code": code, "reason": "not_uptrend", "close": prev_close, "ma20": ma_20d, "ma50": ma_50d}) 

290 return None 

291 

292 if logger: logger.debug({"event": "pass_trend", "code": code, "reason": "uptrend_and_volume_ok"}) 

293 

294 # 필터: 52주 고가 근접 

295 full_resp = await self._sqs.get_current_price(code, caller="OneilUniverseService") 

296 if not full_resp or full_resp.rt_cd != ErrorCode.SUCCESS.value: 

297 if logger: logger.debug({"event": "drop", "code": code, "reason": "current_price_api_fail"}) 

298 return None 

299 output = full_resp.data.get("output") if full_resp.data else None 

300 if not output: 

301 if logger: logger.debug({"event": "drop", "code": code, "reason": "no_price_output"}) 

302 return None 

303 

304 if isinstance(output, dict): 

305 w52_hgpr = int(output.get("w52_hgpr") or 0) 

306 # hts_avls: 시가총액(억), stck_llam: 상장주식수(주) - 시가총액 우선 사용 및 억 단위 보정 

307 cap_billion = int(output.get("hts_avls") or output.get("stck_llam") or 0) 

308 else: 

309 w52_hgpr = int(getattr(output, "w52_hgpr", 0) or 0) 

310 cap_billion = int(getattr(output, "hts_avls", 0) or getattr(output, "stck_llam", 0) or 0) 

311 stck_llam = cap_billion * 100_000_000 # 억 단위 -> 원 단위 변환 

312 

313 # 필터: 시가총액 (2천억 ~ 2조) 

314 if not (self._cfg.premium_stocks_cap_min <= stck_llam <= self._cfg.premium_stocks_cap_max): 

315 if logger: logger.debug({"event": "drop", "code": code, "reason": "market_cap_out_of_range", "cap": stck_llam}) 

316 return None 

317 

318 dist = 0 

319 if w52_hgpr > 0: 

320 dist = ((w52_hgpr - prev_close) / w52_hgpr) * 100 

321 if dist > self._cfg.near_52w_high_pct: 

322 if logger: logger.debug({"event": "drop", "code": code, "reason": "far_from_52w_high", "dist": dist}) 

323 return None 

324 

325 if logger: logger.debug({"event": "pass_52w", "code": code, "dist": dist}) 

326 

327 # BB 스퀴즈 (동기 계산: async/await 오버헤드 제거) 

328 widths = self._indicator.calc_bb_widths_sync( 

329 ohlcv, period=self._cfg.bb_period, multiplier=self._cfg.multiplier 

330 ) 

331 

332 if len(widths) < period: 

333 if logger: logger.debug({"event": "drop", "code": code, "reason": "insufficient_bb_data"}) 

334 return None 

335 

336 bb_min = min(widths[-period:]) 

337 prev_width = widths[-1] 

338 

339 # 스퀴즈 조건 체크 (전일 BB폭 <= 20일 최소폭 * 1.2) 

340 if prev_width > bb_min * self._cfg.squeeze_tolerance: 

341 if logger: logger.debug({ 

342 "event": "drop", "code": code, "reason": "no_squeeze", 

343 "prev_width": prev_width, "bb_min": bb_min, 

344 "ratio": round(prev_width / bb_min, 2) if bb_min > 0 else 0 

345 }) 

346 return None 

347 

348 if logger: logger.debug({ 

349 "event": "pass_squeeze", "code": code, 

350 "prev_width": prev_width, "bb_min": bb_min 

351 }) 

352 

353 # RS 계산 (동기 계산: async/await 오버헤드 제거) 

354 rs_return = self._indicator.calc_rs_sync( 

355 ohlcv, period_days=self._cfg.rs_period_days 

356 ) 

357 

358 market = "KOSDAQ" if self.stock_code_repository.is_kosdaq(code) else "KOSPI" 

359 

360 if logger: logger.debug({"event": "selected", "code": code, "name": name}) 

361 

362 return OSBWatchlistItem( 

363 code=code, name=name, market=market, 

364 high_20d=high_20d, ma_20d=ma_20d, ma_50d=ma_50d, 

365 avg_vol_20d=avg_vol_20d, bb_width_min_20d=bb_min, prev_bb_width=prev_width, 

366 w52_hgpr=w52_hgpr, avg_trading_value_5d=tv_5d, market_cap=stck_llam, 

367 rs_return_3m=rs_return 

368 ) 

369 

370 # ── 전일 기준 우량주 생성 (배치) ───────────────────────────────────────── 

371 

372 async def generate_premium_watchlist(self, trading_date: Optional[str] = None) -> dict: 

373 """전체 종목 스캔 -> 전일 기준 우량주 생성 및 파일 저장. 

374 

375 Args: 

376 trading_date: 기준 거래일(YYYYMMDD). 지정하면 파일의 generated_date로 저장. 

377 None이면 현재 날짜를 사용 (직접 호출 시 하위 호환). 

378 """ 

379 # 전용 로거 생성 (logs/strategies/oneil/YYYYMMDD_HHMMSS_generate_premium_watchlist.log.json) 

380 pool_a_logger = get_strategy_logger("generate_premium_watchlist", sub_dir="oneil_pool") 

381 pool_a_logger.setLevel(logging.DEBUG) 

382 

383 self._logger.info({"event": "generate_premium_watchlist_started"}) 

384 pool_a_logger.info({"event": "generate_premium_watchlist_started"}) 

385 

386 start_time = time.time() 

387 start_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(start_time)) 

388 

389 # 1. 전체 종목 로드 

390 all_stocks = [] 

391 for _, row in self.stock_code_repository.df.iterrows(): 

392 code = row.get("종목코드", "") 

393 name = row.get("종목명", "") 

394 market = row.get("시장구분", "") 

395 if code and market in ("KOSPI", "KOSDAQ"): 395 ↛ 391line 395 didn't jump to line 391 because the condition on line 395 was always true

396 all_stocks.append((code, name, market)) 

397 

398 total_stocks = len(all_stocks) 

399 print(f"[전일 기준 우량주 생성] 시작시간: {start_time_str} | 전체 종목 수: {total_stocks}개. 1차 필터링(시총) 시작...") 

400 pool_a_logger.info({"event": "1st_filter_start", "total_stocks": total_stocks}) 

401 self._generation_progress = { 

402 "running": True, "phase": "1차_필터(시총)", 

403 "processed": 0, "total": total_stocks, 

404 "passed": 0, "selected": 0, "elapsed": 0.0, 

405 } 

406 

407 # 2. 1차 필터 (시총) 

408 passed_first = [] 

409 processed_count = 0 

410 for chunk in _chunked(all_stocks, self._cfg.api_chunk_size): 

411 tasks = [self._sqs.get_current_price(c, caller="OneilUniverseService") for c, _, _ in chunk] 

412 results = await asyncio.gather(*tasks, return_exceptions=True) 

413 for (code, name, market), resp in zip(chunk, results): 

414 if isinstance(resp, Exception) or not resp or resp.rt_cd != ErrorCode.SUCCESS.value: 

415 error_msg = str(resp) if isinstance(resp, Exception) else (getattr(resp, 'msg1', 'No response') if resp else 'Empty response') 

416 pool_a_logger.warning({ 

417 "event": "api_error_1st_filter", 

418 "code": code, 

419 "name": name, 

420 "error": error_msg 

421 }) 

422 continue 

423 out = resp.data.get("output") if resp.data else None 

424 if not out: continue 424 ↛ 413line 424 didn't jump to line 413 because the continue on line 424 wasn't executed

425 

426 if isinstance(out, dict): 426 ↛ 427line 426 didn't jump to line 427 because the condition on line 426 was never true

427 val_avls = out.get("hts_avls") 

428 val_llam = out.get("stck_llam") 

429 else: 

430 val_avls = getattr(out, "hts_avls", None) 

431 val_llam = getattr(out, "stck_llam", None) 

432 

433 if val_avls: 

434 cap = int(val_avls) * 100_000_000 

435 else: 

436 # Fallback: stck_llam 사용 (테스트 호환성 위해 큰 값은 원 단위로 처리) 

437 val = int(val_llam or 0) 

438 cap = val if val > 100_000_000 else val * 100_000_000 

439 

440 if self._cfg.premium_stocks_cap_min <= cap <= self._cfg.premium_stocks_cap_max: 

441 passed_first.append((code, name, market)) 

442 pool_a_logger.debug({"event": "pass_1st", "code": code, "name": name, "market_cap(억)": cap/100_000_000}) 

443 else: 

444 # pool_a_logger.debug({"event": "drop_1st", "code": code, "reason": "market_cap", "cap": cap}) 

445 pass 

446 

447 processed_count += len(chunk) 

448 if processed_count % 100 == 0 or processed_count >= total_stocks: 448 ↛ 410line 448 didn't jump to line 410 because the condition on line 448 was always true

449 pct = (processed_count / total_stocks * 100) if total_stocks > 0 else 0.0 

450 elapsed = time.time() - start_time 

451 print(f" > [1차 필터] 진행: {processed_count}/{total_stocks} ({pct:.1f}%) | 통과: {len(passed_first)} | 소요: {elapsed:.1f}s") 

452 pool_a_logger.info({"event": "1st_filter_progress", "processed": processed_count, "total": total_stocks, "passed": len(passed_first)}) 

453 self._generation_progress.update({ 

454 "processed": processed_count, "passed": len(passed_first), "elapsed": round(elapsed, 1), 

455 }) 

456 

457 

458 print(f"[전일 기준 우량주 생성] 1차 필터 완료. 통과: {len(passed_first)}개. 2차 상세 분석(OHLCV/지표) 시작...") 

459 pool_a_logger.info({"event": "1st_filter_done", "passed": len(passed_first)}) 

460 pool_a_logger.info({"event": "2nd_filter_start", "total_candidates": len(passed_first)}) 

461 self._generation_progress.update({ 

462 "phase": "2차_필터(지표)", "processed": 0, "total": len(passed_first), "selected": 0, 

463 }) 

464 

465 # 3. 2차 필터 (상세 분석) 

466 items = [] 

467 total_passed = len(passed_first) 

468 processed_count_2 = 0 

469 for chunk in _chunked(passed_first, self._cfg.api_chunk_size): 

470 for code, name, market in chunk: 

471 item = await self._analyze_candidate(code, name, logger=pool_a_logger) 

472 if item: 472 ↛ 470line 472 didn't jump to line 470 because the condition on line 472 was always true

473 items.append(item) 

474 

475 processed_count_2 += len(chunk) 

476 if processed_count_2 % 50 == 0 or processed_count_2 >= total_passed: 476 ↛ 469line 476 didn't jump to line 469 because the condition on line 476 was always true

477 pct2 = (processed_count_2 / total_passed * 100) if total_passed > 0 else 0.0 

478 elapsed = time.time() - start_time 

479 print(f" > [2차 필터] 진행: {processed_count_2}/{total_passed} ({pct2:.1f}%) | 선정: {len(items)} | 소요: {elapsed:.1f}s") 

480 pool_a_logger.info({"event": "2nd_filter_progress", "processed": processed_count_2, "total": total_passed, "selected": len(items)}) 

481 self._generation_progress.update({ 

482 "processed": processed_count_2, "selected": len(items), "elapsed": round(elapsed, 1), 

483 }) 

484 

485 

486 pool_a_logger.info({"event": "2nd_filter_done", "selected": len(items)}) 

487 self._generation_progress.update({"phase": "스코어링"}) 

488 

489 # 4. 스코어링 및 저장 

490 self._compute_rs_scores(items, logger=pool_a_logger) 

491 await self._compute_profit_growth_scores(items, logger=pool_a_logger) 

492 await self._compute_smart_money_scores(items, logger=pool_a_logger, date=trading_date) 

493 self._compute_total_scores(items, logger=pool_a_logger) 

494 pool_a_logger.info({"event": "scoring_done"}) 

495 

496 sort_key = lambda x: (x.total_score, self._calc_turnover_ratio(x)) 

497 kospi = sorted([i for i in items if i.market != "KOSDAQ"], key=sort_key, reverse=True)[:self._cfg.premium_stocks_kospi_size] 

498 kosdaq = sorted([i for i in items if i.market == "KOSDAQ"], key=sort_key, reverse=True)[:self._cfg.premium_stocks_kosdaq_size] 

499 

500 self._save_premium_stocks(kospi, kosdaq, trading_date=trading_date) 

501 pool_a_logger.info({"event": "save_done", "kospi_count": len(kospi), "kosdaq_count": len(kosdaq)}) 

502 

503 total_elapsed = time.time() - start_time 

504 print(f"[전일 기준 우량주 생성] 완료. 총 소요시간: {total_elapsed:.1f}초") 

505 pool_a_logger.info({"event": "generate_premium_watchlist_finished", "elapsed_seconds": total_elapsed}) 

506 self._generation_progress.update({"running": False, "phase": None, "elapsed": round(total_elapsed, 1)}) 

507 

508 # 시총 범위 문자열 생성 (예: 2000억 ~ 2조) 

509 min_cap = self._cfg.premium_stocks_cap_min // 100000000 

510 max_cap = self._cfg.premium_stocks_cap_max // 100000000 

511 cap_str = f"{min_cap}억 ~ {max_cap}억" 

512 if self._cfg.premium_stocks_cap_max >= 1000000000000: 512 ↛ 515line 512 didn't jump to line 515 because the condition on line 512 was always true

513 cap_str = f"{min_cap}억 ~ {self._cfg.premium_stocks_cap_max // 1000000000000}조" 

514 

515 return { 

516 "kospi_count": len(kospi), "kosdaq_count": len(kosdaq), 

517 "total_scanned": len(all_stocks), "scanned": len(all_stocks), 

518 "passed_first": len(passed_first), "first_filter_passed": len(passed_first), 

519 "second_filter_passed": len(items), 

520 "market_cap_filter": cap_str, 

521 "total_elapsed_seconds": total_elapsed 

522 } 

523 

524 # ── 헬퍼 메서드 ─────────────────────────────────────────────── 

525 

526 def _should_refresh_watchlist(self) -> bool: 

527 now = self._tm.get_current_kst_time() 

528 open_time = self._tm.get_market_open_time() 

529 elapsed = (now - open_time).total_seconds() / 60 

530 

531 triggered = False 

532 for t_min in self._cfg.watchlist_refresh_minutes: 

533 if elapsed >= t_min and t_min not in self._watchlist_refresh_done: 

534 self._watchlist_refresh_done.add(t_min) 

535 triggered = True 

536 return triggered 

537 

538 async def _update_market_timing(self, logger: Optional[logging.Logger] = None): 

539 logger = logger or self._logger 

540 for market, code in [("KOSDAQ", self._cfg.kosdaq_etf_code), ("KOSPI", self._cfg.kospi_etf_code)]: 

541 self._market_timing_cache[market] = await self._check_etf_ma_rising(code, logger=logger) 

542 

543 async def _check_etf_ma_rising(self, etf_code: str, logger: Optional[logging.Logger] = None) -> bool: 

544 logger = logger or self._logger 

545 period = self._cfg.market_ma_period 

546 days = self._cfg.market_ma_rising_days 

547 ohlcv_resp = await self._sqs.get_recent_daily_ohlcv(etf_code, limit=period + days + 5) 

548 ohlcv = ohlcv_resp.data if ohlcv_resp and ohlcv_resp.rt_cd == ErrorCode.SUCCESS.value else [] 

549 

550 if not ohlcv or len(ohlcv) < period + days: # This check should be based on the actual 'closes' list length 

551 return False 

552 

553 closes = [r.get("close", 0) for r in ohlcv] # Changed: Do not filter out 0 values 

554 ma_values = [] 

555 for i in range(days + 1): 

556 end = len(closes) - days + i 

557 ma_values.append(sum(closes[end-period:end]) / period) 

558 

559 is_rising = True 

560 fail_detail = "" 

561 for j in range(1, len(ma_values)): 

562 if ma_values[j] <= ma_values[j-1]: 

563 is_rising = False 

564 fail_detail = f"MA decline: {ma_values[j-1]:.2f} -> {ma_values[j]:.2f} (idx {j})" 

565 break 

566 

567 log_data = { 

568 "event": "market_timing_check", 

569 "etf_code": etf_code, 

570 "is_rising": is_rising, 

571 "ma_values": [round(v, 2) for v in ma_values] 

572 } 

573 if not is_rising: 

574 log_data["fail_detail"] = fail_detail 

575 

576 logger.debug(log_data) 

577 

578 return is_rising 

579 

580 def _compute_rs_scores(self, items: List[OSBWatchlistItem], logger: Optional[logging.Logger] = None): 

581 logger = logger or self._logger 

582 if not items: return 

583 logger.debug({"event": "compute_rs_scores_started", "item_count": len(items)}) 

584 rets = sorted([i.rs_return_3m for i in items]) 

585 

586 # 백분위수 계산 (상위 10% -> 90 백분위수) 

587 percentile_index = min(int(len(rets) * (1 - self._cfg.rs_top_percentile / 100)), len(rets) - 1) 

588 cutoff = rets[percentile_index] 

589 

590 logger.debug({ 

591 "event": "rs_score_calculation_details", 

592 "item_count": len(items), 

593 "top_percentile_config": self._cfg.rs_top_percentile, 

594 "cutoff_return": round(cutoff, 2), 

595 "returns_distribution": { 

596 "min": round(rets[0], 2), 

597 "p25": round(rets[int(len(rets) * 0.25)], 2), 

598 "median": round(rets[int(len(rets) * 0.5)], 2), 

599 "p75": round(rets[int(len(rets) * 0.75)], 2), 

600 "max": round(rets[-1], 2) 

601 } 

602 }) 

603 

604 for item in items: 

605 is_top_tier = item.rs_return_3m >= cutoff 

606 item.rs_score = self._cfg.rs_score_points if is_top_tier else 0.0 

607 if is_top_tier: 

608 logger.debug({ 

609 "event": "rs_score_assigned", "code": item.code, "name": item.name, 

610 "return_3m": round(item.rs_return_3m, 2), "score": item.rs_score 

611 }) 

612 logger.debug({"event": "compute_rs_scores_finished"}) 

613 

614 async def _compute_profit_growth_scores(self, items: List[OSBWatchlistItem], logger: Optional[logging.Logger] = None): 

615 logger = logger or self._logger 

616 if not items: return 616 ↛ exitline 616 didn't return from function '_compute_profit_growth_scores' because the return on line 616 wasn't executed

617 logger.debug({"event": "compute_profit_growth_scores_started", "item_count": len(items)}) 

618 

619 for chunk in _chunked(items, self._cfg.api_chunk_size): 

620 # API 대신 스크래퍼의 메서드를 호출 

621 tasks = [self._scraper.fetch_yoy_profit_growth(i.code) for i in chunk] 

622 results = await asyncio.gather(*tasks, return_exceptions=True) 

623 

624 for item, growth in zip(chunk, results): 

625 if isinstance(growth, Exception): 

626 logger.warning({"event": "profit_growth_scraping_error", "code": item.code, "error": str(growth)}) 

627 item.profit_growth_score = 0.0 

628 continue 

629 

630 # 턴어라운드(999.0)이거나 설정한 한계치 이상의 성장이면 스코어 부여 

631 if growth >= self._cfg.profit_growth_threshold_pct or growth == 999.0: 

632 item.profit_growth_score = self._cfg.profit_growth_score_points 

633 logger.debug({ 

634 "event": "profit_growth_score_assigned", 

635 "code": item.code, 

636 "name": item.name, 

637 "growth_pct": "Turnaround" if growth == 999.0 else round(growth, 2), 

638 "score": item.profit_growth_score 

639 }) 

640 else: 

641 item.profit_growth_score = 0.0 

642 

643 logger.debug({"event": "compute_profit_growth_scores_finished"}) 

644 

645 async def _compute_smart_money_scores(self, items: List[OSBWatchlistItem], logger: Optional[logging.Logger] = None, date: Optional[str] = None): 

646 """3일 누적 외국인+기관 순매수금액 기반 스마트머니 스코어링. 

647 

648 조건 A: 3일 누적 (외국인 + 기관 순매수금액) >= 시총의 smart_money_to_mcap_pct% 

649 조건 B: 3일 누적 (외국인 + 기관 순매수금액) >= 3일 누적 총거래대금의 smart_money_to_tv_pct% 

650 A 또는 B 만족 시 smart_money_score_points 부여. 

651 

652 단위: frgn/orgn_ntby_tr_pbmn 은 백만원 → *1_000_000 = 원, acml_tr_pbmn 은 원. 

653 """ 

654 logger = logger or self._logger 

655 if not items: return 655 ↛ exitline 655 didn't return from function '_compute_smart_money_scores' because the return on line 655 wasn't executed

656 days = self._cfg.smart_money_lookback_days 

657 logger.debug({"event": "compute_smart_money_scores_started", "item_count": len(items), "lookback_days": days}) 

658 

659 for chunk in _chunked(items, self._cfg.api_chunk_size): 

660 tasks = [self._sqs.get_investor_trade_daily_multi(i.code, date, days) for i in chunk] 

661 results = await asyncio.gather(*tasks, return_exceptions=True) 

662 

663 for item, resp in zip(chunk, results): 

664 item.smart_money_score = 0.0 

665 if isinstance(resp, Exception): 665 ↛ 666line 665 didn't jump to line 666 because the condition on line 665 was never true

666 logger.warning({"event": "smart_money_api_error", "code": item.code, "error": str(resp)}) 

667 continue 

668 if not resp or resp.rt_cd != ErrorCode.SUCCESS.value or not resp.data: 668 ↛ 671line 668 didn't jump to line 671 because the condition on line 668 was always true

669 continue 

670 

671 rows = resp.data # list of dicts, newest first 

672 sum_fi_won = 0.0 # 3일 누적 외국인+기관 순매수금액 (원) 

673 sum_tv_won = 0.0 # 3일 누적 총거래대금 (원) 

674 for row in rows: 

675 frgn = float(row.get("frgn_ntby_tr_pbmn", "0") or "0") 

676 orgn = float(row.get("orgn_ntby_tr_pbmn", "0") or "0") 

677 tv = float(row.get("acml_tr_pbmn", "0") or "0") 

678 sum_fi_won += (frgn + orgn) * 1_000_000 # 백만원 → 원 

679 sum_tv_won += tv # 이미 원 단위 

680 

681 mcap = float(item.market_cap) if item.market_cap else 0.0 

682 cond_a = mcap > 0 and sum_fi_won >= mcap * (self._cfg.smart_money_to_mcap_pct / 100.0) 

683 cond_b = sum_tv_won > 0 and sum_fi_won >= sum_tv_won * (self._cfg.smart_money_to_tv_pct / 100.0) 

684 

685 if cond_a or cond_b: 

686 item.smart_money_score = self._cfg.smart_money_score_points 

687 logger.debug({ 

688 "event": "smart_money_score_assigned", 

689 "code": item.code, "name": item.name, 

690 "sum_fi_억": round(sum_fi_won / 1e8, 2), 

691 "mcap_억": round(mcap / 1e8, 2), 

692 "sum_tv_억": round(sum_tv_won / 1e8, 2), 

693 "cond_a": cond_a, "cond_b": cond_b, 

694 "score": item.smart_money_score, 

695 }) 

696 

697 logger.debug({"event": "compute_smart_money_scores_finished"}) 

698 

699 def _compute_total_scores(self, items: List[OSBWatchlistItem], logger: Optional[logging.Logger] = None): 

700 logger = logger or self._logger 

701 if not items: return 

702 logger.debug({"event": "compute_total_scores_started", "item_count": len(items)}) 

703 for item in items: 

704 item.total_score = item.rs_score + item.profit_growth_score + item.smart_money_score 

705 if item.total_score > 0: 705 ↛ 703line 705 didn't jump to line 703 because the condition on line 705 was always true

706 logger.debug({ 

707 "event": "total_score_calculated", "code": item.code, "name": item.name, 

708 "rs_score": item.rs_score, "profit_score": item.profit_growth_score, 

709 "smart_money_score": item.smart_money_score, 

710 "total_score": item.total_score 

711 }) 

712 logger.debug({"event": "compute_total_scores_finished"}) 

713 

714 def _save_premium_stocks(self, kospi, kosdaq, trading_date: Optional[str] = None): 

715 """전일 기준 우량주를 파일에 저장한다. 

716 

717 Args: 

718 trading_date: 기준 거래일(YYYYMMDD). generated_date 필드에 기록. 

719 None이면 현재 날짜를 사용. 

720 """ 

721 try: 

722 os.makedirs(os.path.dirname(self._cfg.premium_stocks_file), exist_ok=True) 

723 now = self._tm.get_current_kst_time() 

724 data = { 

725 # generated_date: 어떤 거래일 기준으로 생성됐는지 (스킵 로직의 기준) 

726 "generated_date": trading_date or now.strftime("%Y%m%d"), 

727 # generated_at: 실제 파일을 저장한 시각 (주말/공휴일에 생성 가능) 

728 "generated_at": now.strftime("%Y-%m-%dT%H:%M:%S"), 

729 "kospi": [asdict(i) for i in kospi], 

730 "kosdaq": [asdict(i) for i in kosdaq] 

731 } 

732 with open(self._cfg.premium_stocks_file, "w", encoding="utf-8") as f: 

733 json.dump(data, f, ensure_ascii=False, indent=2) 

734 except Exception as e: 

735 self._logger.error(f"Failed to save premium stocks: {e}") 

736 

737 def _load_premium_stocks(self) -> List[OSBWatchlistItem]: 

738 if not os.path.exists(self._cfg.premium_stocks_file): 

739 return [] 

740 try: 

741 with open(self._cfg.premium_stocks_file, "r", encoding="utf-8") as f: 

742 data = json.load(f) 

743 # 날짜 체크 (오늘/어제만 유효) 

744 gen_date = data.get("generated_date", "") 

745 

746 try: 

747 gen_dt = datetime.strptime(gen_date, "%Y%m%d").date() 

748 curr_dt = self._tm.get_current_kst_time().date() 

749 # 7일 이내만 유효 (한국 최장 연휴 5일 + 여유) 

750 # generated_date는 거래일 기준이므로 월요일에 금요일 파일도 유효 

751 if (curr_dt - gen_dt).days > 7: 

752 return [] 

753 except ValueError: 

754 return [] 

755 

756 items = [] 

757 for k in ["kospi", "kosdaq"]: 

758 for d in data.get(k, []): 758 ↛ 759line 758 didn't jump to line 759 because the loop on line 758 never started

759 items.append(OSBWatchlistItem(**d)) 

760 return items 

761 except Exception: 

762 return [] 

763 

764 def get_premium_stocks_meta(self) -> Optional[dict]: 

765 """저장된 전일 기준 우량주 파일의 메타데이터 반환. 파일 없으면 None.""" 

766 if not os.path.exists(self._cfg.premium_stocks_file): 

767 return None 

768 try: 

769 with open(self._cfg.premium_stocks_file, "r", encoding="utf-8") as f: 

770 data = json.load(f) 

771 return { 

772 "generated_date": data.get("generated_date"), 

773 "generated_at": data.get("generated_at"), 

774 } 

775 except Exception: 

776 return None 

777 

778 @staticmethod 

779 def _calc_turnover_ratio(item: OSBWatchlistItem) -> float: 

780 """회전율 계산: (5일 평균 거래대금 / 시가총액). 동점자 처리용.""" 

781 return (item.avg_trading_value_5d / item.market_cap) if item.market_cap > 0 else 0 

782 

783 @staticmethod 

784 def _extract_op_profit_growth(data) -> float: 

785 """API 응답에서 영업이익 증가율 추출. 

786 

787 resp.data 구조: {"rt_cd": "0", "output": [{"stac_yymm": "...", "bsop_prfi_inrt": "...", ...}]} 

788 output 리스트의 첫 번째 항목(최신 분기)에서 영업이익 관련 필드를 탐색. 

789 """ 

790 try: 

791 # API 응답 dict에서 output 리스트 추출 

792 if isinstance(data, dict): 

793 output = data.get("output", data) 

794 else: 

795 output = data 

796 

797 target = output[0] if isinstance(output, list) and output else output 

798 if isinstance(target, dict): 

799 for k in ["bsop_prti_icdc", "sale_totl_prfi_icdc", "op_profit_growth", "bsop_prfi_inrt", "grs"]: 

800 if val := target.get(k): return float(val) 

801 except: pass 

802 return 0.0