Coverage for services / oneil_universe_service.py: 91%
442 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
1# strategies/oneil/universe_service.py
2import asyncio
3import json
4import logging
5import os
6import time
7from datetime import datetime
8from dataclasses import asdict
9from typing import Dict, List, Optional
11from common.types import ErrorCode
12from services.stock_query_service import StockQueryService
13from services.indicator_service import IndicatorService
14from repositories.stock_code_repository import StockCodeRepository
15from services.naver_finance_scraper_service import NaverFinanceScraperService
16from core.market_clock import MarketClock
17from strategies.oneil_common_types import OneilUniverseConfig, OSBWatchlistItem
18from core.logger import get_strategy_logger
19from core.performance_profiler import PerformanceProfiler
20from services.price_subscription_service import SubscriptionPriority
22def _chunked(lst, size):
23 for i in range(0, len(lst), size):
24 yield lst[i:i + size]
class OneilUniverseService:
    """Universe management service for the O'Neil strategy.

    Responsibilities:
    1. Generate and load the previous-day premium stocks (Pool A)
    2. Discover same-day surging stocks in real time (Pool B)
    3. Merge both pools into the watchlist (60 monitored stocks) and serve it
    4. Judge market timing
    """
def __init__(
    self,
    stock_query_service: StockQueryService,
    indicator_service: IndicatorService,
    stock_code_repository: StockCodeRepository,
    market_clock: MarketClock,
    scraper_service: Optional[NaverFinanceScraperService] = None,  # added later
    config: Optional[OneilUniverseConfig] = None,
    logger: Optional[logging.Logger] = None,
    performance_profiler: Optional[PerformanceProfiler] = None,
    price_subscription_service=None
):
    """Store the injected collaborators and reset every cache.

    Falsy optional arguments are replaced by defaults: a fresh
    ``OneilUniverseConfig``, the module logger and a disabled
    ``PerformanceProfiler``.
    """
    # Injected collaborators
    self._sqs = stock_query_service
    self._indicator = indicator_service
    self.stock_code_repository = stock_code_repository
    self._tm = market_clock
    self._scraper = scraper_service
    self._cfg = config or OneilUniverseConfig()
    self._logger = logger or logging.getLogger(__name__)
    self.pm = performance_profiler or PerformanceProfiler(enabled=False)
    self._price_sub_svc = price_subscription_service

    # Watchlist state
    self._watchlist: Dict[str, OSBWatchlistItem] = {}
    self._watchlist_date: str = ""
    self._watchlist_refresh_done: set = set()
    self._pool_a_loaded: bool = False
    self._pool_a_items: Dict[str, OSBWatchlistItem] = {}

    # Market-timing cache
    self._market_timing_cache: Dict[str, bool] = {}
    self._market_timing_date: str = ""

    # Progress of the premium-stock (Pool A) generation batch
    self._generation_progress: Dict = dict(
        running=False,
        phase=None,
        processed=0,
        total=0,
        passed=0,
        selected=0,
        elapsed=0.0,
    )
81 @property
82 def generation_progress(self) -> Dict:
83 """전일 기준 우량주 생성 진행률 스냅샷 반환."""
84 return dict(self._generation_progress)
async def get_watchlist(self, logger: Optional[logging.Logger] = None) -> Dict[str, OSBWatchlistItem]:
    """Return the currently valid watchlist (cached, refreshed on schedule).

    On the first call of a new KST date the per-day state is reset and the
    watchlist is rebuilt. Later calls rebuild only when a configured
    intraday refresh point has newly elapsed. When a price subscription
    service is configured and the watchlist is non-empty, a background task
    syncs the subscriptions on every call.

    Args:
        logger: Logger to use for this call; defaults to the service logger.

    Returns:
        Mapping of stock code to watchlist item.
    """
    logger = logger or self._logger
    today = self._tm.get_current_kst_time().strftime("%Y%m%d")

    if self._watchlist_date != today:
        # New trading date: drop the per-day caches before rebuilding.
        self._watchlist_refresh_done = set()
        self._pool_a_loaded = False
        self._pool_a_items = {}
        await self._build_watchlist(logger=logger)
        self._watchlist_date = today
        self._market_timing_date = ""  # market timing must be re-checked too

        # Mark refresh points that have already passed as done so the build
        # that just ran is not repeated on the next call.
        self._should_refresh_watchlist()
    elif self._should_refresh_watchlist():
        await self._build_watchlist(logger=logger)

    if self._price_sub_svc and self._watchlist:
        sync_task = asyncio.create_task(self._price_sub_svc.sync_subscriptions(
            codes=list(self._watchlist.keys()),
            category_key="strategy_oneil",
            priority=SubscriptionPriority.MEDIUM,
        ))
        # The event loop only keeps weak references to tasks, so hold a
        # strong reference until the task finishes; otherwise it may be
        # garbage-collected mid-execution.
        pending = getattr(self, "_subscription_sync_tasks", None)
        if pending is None:
            pending = set()
            self._subscription_sync_tasks = pending
        pending.add(sync_task)

        def _on_sync_done(task, _pending=pending, _logger=logger):
            _pending.discard(task)
            if task.cancelled():
                return
            error = task.exception()  # retrieving it avoids "never retrieved" noise
            if error is not None:
                _logger.warning({
                    "event": "watchlist_subscription_sync_failed",
                    "error": str(error),
                })

        sync_task.add_done_callback(_on_sync_done)
    return self._watchlist
115 async def is_market_timing_ok(self, market: str, logger: Optional[logging.Logger] = None) -> bool:
116 """해당 시장(KOSPI/KOSDAQ)의 마켓 타이밍이 매수 적합한지 확인."""
117 logger = logger or self._logger
118 today = self._tm.get_current_kst_time().strftime("%Y%m%d")
119 if self._market_timing_date != today:
120 await self._update_market_timing(logger=logger)
121 self._market_timing_date = today
123 return self._market_timing_cache.get(market, False)
125 # ── 워치리스트 빌드 ────────────────────────────────────────────
127 async def _build_watchlist(self, logger: Optional[logging.Logger] = None):
128 """Pool A + Pool B 병합 -> 스코어링 -> 상위 N개 선정."""
129 logger = logger or self._logger
130 t_start = self.pm.start_timer()
131 logger.info({"event": "build_watchlist_started"})
133 # 1) Pool A 로드
134 if not self._pool_a_loaded:
135 raw = self._load_premium_stocks()
136 self._pool_a_items = {item.code: item for item in raw}
137 self._pool_a_loaded = True
139 # 2) 당일 급등주 빌드 (실시간 랭킹)
140 pool_b_items = await self._build_daily_surge_pool(logger=logger)
142 # 3) 병합
143 merged: Dict[str, OSBWatchlistItem] = dict(self._pool_a_items)
144 for code, item in pool_b_items.items():
145 if code not in merged:
146 merged[code] = item
148 # 4) 정렬 및 절삭
149 sorted_items = sorted(
150 merged.values(),
151 key=lambda x: (x.total_score, self._calc_turnover_ratio(x)),
152 reverse=True,
153 )
155 # 스코어링 후 정렬된 상위 종목 로그
156 top_n_for_log = 10
157 logger.debug({
158 "event": "watchlist_sorted",
159 "top_n": top_n_for_log,
160 "items": [
161 {
162 "code": i.code, "name": i.name, "total_score": i.total_score,
163 "rs_score": i.rs_score, "profit_score": i.profit_growth_score,
164 "turnover": round(self._calc_turnover_ratio(i), 4)
165 }
166 for i in sorted_items[:top_n_for_log]
167 ]
168 })
170 self._watchlist = {
171 item.code: item for item in sorted_items[:self._cfg.max_watchlist]
172 }
173 logger.info({
174 "event": "build_watchlist_finished",
175 "premium_stocks": len(self._pool_a_items),
176 "daily_surge_stocks": len(pool_b_items),
177 "final_count": len(self._watchlist)
178 })
179 self.pm.log_timer("OneilUniverseService._build_watchlist", t_start, threshold=5.0)
async def _build_daily_surge_pool(self, logger: Optional[logging.Logger] = None) -> Dict[str, OSBWatchlistItem]:
    """Discover same-day surging stocks (Pool B) from the live rankings.

    Candidates are the union of the trading-value, rise and volume
    rankings. Codes already in Pool A or in the current watchlist are
    skipped; the rest are analyzed in chunks, scored, ranked and cut to
    ``daily_surge_size`` entries.

    NOTE(review): because current watchlist codes are skipped here and
    ``_build_watchlist`` rebuilds from Pool A plus this result, Pool B codes
    picked in the previous build appear to drop out on the next refresh.
    Confirm this is intended.
    """
    log = logger or self._logger
    timer = self.pm.start_timer()

    # Fetch the three rankings concurrently; failures come back as values
    ranking_responses = await asyncio.gather(
        self._sqs.get_top_trading_value_stocks(),
        self._sqs.get_top_rise_fall_stocks(rise=True),
        self._sqs.get_top_volume_stocks(),
        return_exceptions=True,
    )

    def _field(row, key, default=None):
        # Ranking rows may be plain dicts or attribute-style objects
        if isinstance(row, dict):
            return row.get(key, default)
        return getattr(row, key, default)

    names_by_code = {}
    for ranking in ranking_responses:
        usable = (
            not isinstance(ranking, Exception)
            and ranking
            and ranking.rt_cd == ErrorCode.SUCCESS.value
        )
        if not usable:
            continue
        for row in ranking.data or []:
            code = _field(row, "mksc_shrn_iscd") or _field(row, "stck_shrn_iscd")
            if code:
                names_by_code[code] = _field(row, "hts_kor_isnm", "")

    # Analyze only codes that are not tracked yet
    already_tracked = set(self._pool_a_items) | set(self._watchlist)
    fresh = [pair for pair in names_by_code.items() if pair[0] not in already_tracked]

    analyzed = []
    for batch in _chunked(fresh, self._cfg.api_chunk_size):
        outcomes = await asyncio.gather(
            *[self._analyze_candidate(code, name, logger=log) for code, name in batch],
            return_exceptions=True,
        )
        analyzed.extend(
            outcome for outcome in outcomes
            if outcome is not None and not isinstance(outcome, Exception)
        )
        # Short pause between chunks out of consideration for rate limits
        await asyncio.sleep(0.1)

    # Scoring. Profit-growth scraping and the smart-money lookup are skipped
    # intraday to avoid a bottleneck and to catch first-day surges.
    self._compute_rs_scores(analyzed, logger=log)
    self._compute_total_scores(analyzed, logger=log)

    # Rank, best first
    analyzed.sort(
        key=lambda entry: (entry.total_score, self._calc_turnover_ratio(entry)),
        reverse=True,
    )

    # Log the head of the Pool B ranking for diagnostics
    log_limit = 10
    head = []
    for entry in analyzed[:log_limit]:
        head.append({
            "code": entry.code, "name": entry.name, "total_score": entry.total_score,
            "rs_score": entry.rs_score, "profit_score": entry.profit_growth_score,
            "turnover": round(self._calc_turnover_ratio(entry), 4)
        })
    log.debug({
        "event": "daily_surge_pool_sorted",
        "top_n": log_limit,
        "items": head
    })

    self.pm.log_timer("OneilUniverseService._build_daily_surge_pool", timer, threshold=3.0)
    selected = analyzed[:self._cfg.daily_surge_size]
    return {entry.code: entry for entry in selected}
254 async def _analyze_candidate(self, code: str, name: str, logger: Optional[logging.Logger] = None) -> Optional[OSBWatchlistItem]:
255 """개별 종목 분석 (OHLCV, BB, RS 등)."""
256 ohlcv_resp = await self._sqs.get_recent_daily_ohlcv(code, limit=90)
257 ohlcv = ohlcv_resp.data if ohlcv_resp and ohlcv_resp.rt_cd == ErrorCode.SUCCESS.value else []
259 if not ohlcv:
260 if logger: logger.debug({"event": "drop", "code": code, "reason": "no_ohlcv"})
261 return None
263 period = self._cfg.high_breakout_period
264 closes = [r.get("close", 0) for r in ohlcv if r.get("close") is not None]
265 if len(closes) < 50:
266 if logger: logger.debug({"event": "drop", "code": code, "reason": "insufficient_data_len", "len": len(closes)})
267 return None
269 highs = [r.get("high", 0) for r in ohlcv[-period:] if r.get("high") is not None]
270 volumes = [r.get("volume", 0) for r in ohlcv[-period:] if r.get("volume") is not None]
272 if not highs or not volumes: 272 ↛ 273line 272 didn't jump to line 273 because the condition on line 272 was never true
273 if logger: logger.debug({"event": "drop", "code": code, "reason": "missing_high_or_volume"})
274 return None
276 ma_20d = sum(closes[-20:]) / 20
277 ma_50d = sum(closes[-50:]) / 50
278 high_20d = int(max(highs))
279 avg_vol_20d = sum(volumes) / len(volumes)
280 prev_close = closes[-1]
282 # 필터: 거래대금, 정배열
283 recent_5 = ohlcv[-5:]
284 tv_5d = sum([(r.get("volume", 0) * r.get("close", 0)) for r in recent_5]) / len(recent_5)
285 if tv_5d < self._cfg.min_avg_trading_value_5d:
286 if logger: logger.debug({"event": "drop", "code": code, "reason": "low_trading_value", "value": tv_5d})
287 return None
288 if not (prev_close > ma_20d > ma_50d):
289 if logger: logger.debug({"event": "drop", "code": code, "reason": "not_uptrend", "close": prev_close, "ma20": ma_20d, "ma50": ma_50d})
290 return None
292 if logger: logger.debug({"event": "pass_trend", "code": code, "reason": "uptrend_and_volume_ok"})
294 # 필터: 52주 고가 근접
295 full_resp = await self._sqs.get_current_price(code, caller="OneilUniverseService")
296 if not full_resp or full_resp.rt_cd != ErrorCode.SUCCESS.value:
297 if logger: logger.debug({"event": "drop", "code": code, "reason": "current_price_api_fail"})
298 return None
299 output = full_resp.data.get("output") if full_resp.data else None
300 if not output:
301 if logger: logger.debug({"event": "drop", "code": code, "reason": "no_price_output"})
302 return None
304 if isinstance(output, dict):
305 w52_hgpr = int(output.get("w52_hgpr") or 0)
306 # hts_avls: 시가총액(억), stck_llam: 상장주식수(주) - 시가총액 우선 사용 및 억 단위 보정
307 cap_billion = int(output.get("hts_avls") or output.get("stck_llam") or 0)
308 else:
309 w52_hgpr = int(getattr(output, "w52_hgpr", 0) or 0)
310 cap_billion = int(getattr(output, "hts_avls", 0) or getattr(output, "stck_llam", 0) or 0)
311 stck_llam = cap_billion * 100_000_000 # 억 단위 -> 원 단위 변환
313 # 필터: 시가총액 (2천억 ~ 2조)
314 if not (self._cfg.premium_stocks_cap_min <= stck_llam <= self._cfg.premium_stocks_cap_max):
315 if logger: logger.debug({"event": "drop", "code": code, "reason": "market_cap_out_of_range", "cap": stck_llam})
316 return None
318 dist = 0
319 if w52_hgpr > 0:
320 dist = ((w52_hgpr - prev_close) / w52_hgpr) * 100
321 if dist > self._cfg.near_52w_high_pct:
322 if logger: logger.debug({"event": "drop", "code": code, "reason": "far_from_52w_high", "dist": dist})
323 return None
325 if logger: logger.debug({"event": "pass_52w", "code": code, "dist": dist})
327 # BB 스퀴즈 (동기 계산: async/await 오버헤드 제거)
328 widths = self._indicator.calc_bb_widths_sync(
329 ohlcv, period=self._cfg.bb_period, multiplier=self._cfg.multiplier
330 )
332 if len(widths) < period:
333 if logger: logger.debug({"event": "drop", "code": code, "reason": "insufficient_bb_data"})
334 return None
336 bb_min = min(widths[-period:])
337 prev_width = widths[-1]
339 # 스퀴즈 조건 체크 (전일 BB폭 <= 20일 최소폭 * 1.2)
340 if prev_width > bb_min * self._cfg.squeeze_tolerance:
341 if logger: logger.debug({
342 "event": "drop", "code": code, "reason": "no_squeeze",
343 "prev_width": prev_width, "bb_min": bb_min,
344 "ratio": round(prev_width / bb_min, 2) if bb_min > 0 else 0
345 })
346 return None
348 if logger: logger.debug({
349 "event": "pass_squeeze", "code": code,
350 "prev_width": prev_width, "bb_min": bb_min
351 })
353 # RS 계산 (동기 계산: async/await 오버헤드 제거)
354 rs_return = self._indicator.calc_rs_sync(
355 ohlcv, period_days=self._cfg.rs_period_days
356 )
358 market = "KOSDAQ" if self.stock_code_repository.is_kosdaq(code) else "KOSPI"
360 if logger: logger.debug({"event": "selected", "code": code, "name": name})
362 return OSBWatchlistItem(
363 code=code, name=name, market=market,
364 high_20d=high_20d, ma_20d=ma_20d, ma_50d=ma_50d,
365 avg_vol_20d=avg_vol_20d, bb_width_min_20d=bb_min, prev_bb_width=prev_width,
366 w52_hgpr=w52_hgpr, avg_trading_value_5d=tv_5d, market_cap=stck_llam,
367 rs_return_3m=rs_return
368 )
370 # ── 전일 기준 우량주 생성 (배치) ─────────────────────────────────────────
async def generate_premium_watchlist(self, trading_date: Optional[str] = None) -> dict:
    """Scan every listed stock, build the premium pool (Pool A) and save it.

    Steps: load all KOSPI/KOSDAQ codes, first filter by market cap, second
    filter via ``_analyze_candidate``, scoring, then saving the top entries
    per market. Progress is mirrored into ``_generation_progress``.

    Args:
        trading_date: Reference trading date (YYYYMMDD). When given it is
            stored as the file's ``generated_date``; when ``None`` the
            current date is used (backward compatible for direct calls).

    Returns:
        Summary dict with the selected counts per market, scan and filter
        counts, the market-cap filter description and the elapsed seconds.

    Raises:
        Exception: Errors outside the per-stock handling propagate to the
            caller; the progress ``running`` flag is cleared first.
    """
    # Dedicated strategy logger for this batch run
    pool_a_logger = get_strategy_logger("generate_premium_watchlist", sub_dir="oneil_pool")
    pool_a_logger.setLevel(logging.DEBUG)

    self._logger.info({"event": "generate_premium_watchlist_started"})
    pool_a_logger.info({"event": "generate_premium_watchlist_started"})

    start_time = time.time()
    start_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(start_time))

    try:
        # 1. Load every listed stock
        all_stocks = []
        for _, row in self.stock_code_repository.df.iterrows():
            code = row.get("종목코드", "")
            name = row.get("종목명", "")
            market = row.get("시장구분", "")
            if code and market in ("KOSPI", "KOSDAQ"):
                all_stocks.append((code, name, market))

        total_stocks = len(all_stocks)
        print(f"[전일 기준 우량주 생성] 시작시간: {start_time_str} | 전체 종목 수: {total_stocks}개. 1차 필터링(시총) 시작...")
        pool_a_logger.info({"event": "1st_filter_start", "total_stocks": total_stocks})
        self._generation_progress = {
            "running": True, "phase": "1차_필터(시총)",
            "processed": 0, "total": total_stocks,
            "passed": 0, "selected": 0, "elapsed": 0.0,
        }

        # 2. First filter: market cap
        passed_first = []
        processed_count = 0
        for chunk in _chunked(all_stocks, self._cfg.api_chunk_size):
            tasks = [self._sqs.get_current_price(c, caller="OneilUniverseService") for c, _, _ in chunk]
            results = await asyncio.gather(*tasks, return_exceptions=True)
            for (code, name, market), resp in zip(chunk, results):
                if isinstance(resp, Exception) or not resp or resp.rt_cd != ErrorCode.SUCCESS.value:
                    error_msg = str(resp) if isinstance(resp, Exception) else (getattr(resp, 'msg1', 'No response') if resp else 'Empty response')
                    pool_a_logger.warning({
                        "event": "api_error_1st_filter",
                        "code": code,
                        "name": name,
                        "error": error_msg
                    })
                    continue
                out = resp.data.get("output") if resp.data else None
                if not out:
                    continue

                if isinstance(out, dict):
                    val_avls = out.get("hts_avls")
                    val_llam = out.get("stck_llam")
                else:
                    val_avls = getattr(out, "hts_avls", None)
                    val_llam = getattr(out, "stck_llam", None)

                if val_avls:
                    cap = int(val_avls) * 100_000_000
                else:
                    # Fallback to stck_llam: large values are taken as KRW
                    # already (kept for test compatibility)
                    val = int(val_llam or 0)
                    cap = val if val > 100_000_000 else val * 100_000_000

                if self._cfg.premium_stocks_cap_min <= cap <= self._cfg.premium_stocks_cap_max:
                    passed_first.append((code, name, market))
                    pool_a_logger.debug({"event": "pass_1st", "code": code, "name": name, "market_cap(억)": cap/100_000_000})

            processed_count += len(chunk)
            if processed_count % 100 == 0 or processed_count >= total_stocks:
                pct = (processed_count / total_stocks * 100) if total_stocks > 0 else 0.0
                elapsed = time.time() - start_time
                print(f" > [1차 필터] 진행: {processed_count}/{total_stocks} ({pct:.1f}%) | 통과: {len(passed_first)} | 소요: {elapsed:.1f}s")
                pool_a_logger.info({"event": "1st_filter_progress", "processed": processed_count, "total": total_stocks, "passed": len(passed_first)})
                self._generation_progress.update({
                    "processed": processed_count, "passed": len(passed_first), "elapsed": round(elapsed, 1),
                })

        print(f"[전일 기준 우량주 생성] 1차 필터 완료. 통과: {len(passed_first)}개. 2차 상세 분석(OHLCV/지표) 시작...")
        pool_a_logger.info({"event": "1st_filter_done", "passed": len(passed_first)})
        pool_a_logger.info({"event": "2nd_filter_start", "total_candidates": len(passed_first)})
        self._generation_progress.update({
            "phase": "2차_필터(지표)", "processed": 0, "total": len(passed_first), "selected": 0,
        })

        # 3. Second filter: detailed analysis, one candidate at a time
        items = []
        total_passed = len(passed_first)
        processed_count_2 = 0
        for chunk in _chunked(passed_first, self._cfg.api_chunk_size):
            for code, name, _market in chunk:
                try:
                    item = await self._analyze_candidate(code, name, logger=pool_a_logger)
                except Exception as exc:
                    # One failing stock must not abort the whole batch; the
                    # first filter tolerates per-stock errors the same way.
                    pool_a_logger.warning({
                        "event": "analyze_error_2nd_filter",
                        "code": code,
                        "name": name,
                        "error": str(exc)
                    })
                    continue
                if item:
                    items.append(item)

            processed_count_2 += len(chunk)
            if processed_count_2 % 50 == 0 or processed_count_2 >= total_passed:
                pct2 = (processed_count_2 / total_passed * 100) if total_passed > 0 else 0.0
                elapsed = time.time() - start_time
                print(f" > [2차 필터] 진행: {processed_count_2}/{total_passed} ({pct2:.1f}%) | 선정: {len(items)} | 소요: {elapsed:.1f}s")
                pool_a_logger.info({"event": "2nd_filter_progress", "processed": processed_count_2, "total": total_passed, "selected": len(items)})
                self._generation_progress.update({
                    "processed": processed_count_2, "selected": len(items), "elapsed": round(elapsed, 1),
                })

        pool_a_logger.info({"event": "2nd_filter_done", "selected": len(items)})
        self._generation_progress.update({"phase": "스코어링"})

        # 4. Scoring and saving
        self._compute_rs_scores(items, logger=pool_a_logger)
        await self._compute_profit_growth_scores(items, logger=pool_a_logger)
        await self._compute_smart_money_scores(items, logger=pool_a_logger, date=trading_date)
        self._compute_total_scores(items, logger=pool_a_logger)
        pool_a_logger.info({"event": "scoring_done"})

        sort_key = lambda x: (x.total_score, self._calc_turnover_ratio(x))
        kospi = sorted([i for i in items if i.market != "KOSDAQ"], key=sort_key, reverse=True)[:self._cfg.premium_stocks_kospi_size]
        kosdaq = sorted([i for i in items if i.market == "KOSDAQ"], key=sort_key, reverse=True)[:self._cfg.premium_stocks_kosdaq_size]

        self._save_premium_stocks(kospi, kosdaq, trading_date=trading_date)
        pool_a_logger.info({"event": "save_done", "kospi_count": len(kospi), "kosdaq_count": len(kosdaq)})

        total_elapsed = time.time() - start_time
        print(f"[전일 기준 우량주 생성] 완료. 총 소요시간: {total_elapsed:.1f}초")
        pool_a_logger.info({"event": "generate_premium_watchlist_finished", "elapsed_seconds": total_elapsed})
        self._generation_progress.update({"running": False, "phase": None, "elapsed": round(total_elapsed, 1)})

        # Human-readable market-cap range; the upper bound switches to the
        # trillion-won unit once it reaches one trillion
        min_cap = self._cfg.premium_stocks_cap_min // 100000000
        max_cap = self._cfg.premium_stocks_cap_max // 100000000
        cap_str = f"{min_cap}억 ~ {max_cap}억"
        if self._cfg.premium_stocks_cap_max >= 1000000000000:
            cap_str = f"{min_cap}억 ~ {self._cfg.premium_stocks_cap_max // 1000000000000}조"

        return {
            "kospi_count": len(kospi), "kosdaq_count": len(kosdaq),
            "total_scanned": len(all_stocks), "scanned": len(all_stocks),
            "passed_first": len(passed_first), "first_filter_passed": len(passed_first),
            "second_filter_passed": len(items),
            "market_cap_filter": cap_str,
            "total_elapsed_seconds": total_elapsed
        }
    finally:
        # If the run aborted, do not leave the progress snapshot reporting a
        # generation that is no longer running. No-op on the success path.
        if self._generation_progress.get("running"):
            self._generation_progress.update({"running": False, "phase": None})
524 # ── 헬퍼 메서드 ───────────────────────────────────────────────
526 def _should_refresh_watchlist(self) -> bool:
527 now = self._tm.get_current_kst_time()
528 open_time = self._tm.get_market_open_time()
529 elapsed = (now - open_time).total_seconds() / 60
531 triggered = False
532 for t_min in self._cfg.watchlist_refresh_minutes:
533 if elapsed >= t_min and t_min not in self._watchlist_refresh_done:
534 self._watchlist_refresh_done.add(t_min)
535 triggered = True
536 return triggered
538 async def _update_market_timing(self, logger: Optional[logging.Logger] = None):
539 logger = logger or self._logger
540 for market, code in [("KOSDAQ", self._cfg.kosdaq_etf_code), ("KOSPI", self._cfg.kospi_etf_code)]:
541 self._market_timing_cache[market] = await self._check_etf_ma_rising(code, logger=logger)
543 async def _check_etf_ma_rising(self, etf_code: str, logger: Optional[logging.Logger] = None) -> bool:
544 logger = logger or self._logger
545 period = self._cfg.market_ma_period
546 days = self._cfg.market_ma_rising_days
547 ohlcv_resp = await self._sqs.get_recent_daily_ohlcv(etf_code, limit=period + days + 5)
548 ohlcv = ohlcv_resp.data if ohlcv_resp and ohlcv_resp.rt_cd == ErrorCode.SUCCESS.value else []
550 if not ohlcv or len(ohlcv) < period + days: # This check should be based on the actual 'closes' list length
551 return False
553 closes = [r.get("close", 0) for r in ohlcv] # Changed: Do not filter out 0 values
554 ma_values = []
555 for i in range(days + 1):
556 end = len(closes) - days + i
557 ma_values.append(sum(closes[end-period:end]) / period)
559 is_rising = True
560 fail_detail = ""
561 for j in range(1, len(ma_values)):
562 if ma_values[j] <= ma_values[j-1]:
563 is_rising = False
564 fail_detail = f"MA decline: {ma_values[j-1]:.2f} -> {ma_values[j]:.2f} (idx {j})"
565 break
567 log_data = {
568 "event": "market_timing_check",
569 "etf_code": etf_code,
570 "is_rising": is_rising,
571 "ma_values": [round(v, 2) for v in ma_values]
572 }
573 if not is_rising:
574 log_data["fail_detail"] = fail_detail
576 logger.debug(log_data)
578 return is_rising
580 def _compute_rs_scores(self, items: List[OSBWatchlistItem], logger: Optional[logging.Logger] = None):
581 logger = logger or self._logger
582 if not items: return
583 logger.debug({"event": "compute_rs_scores_started", "item_count": len(items)})
584 rets = sorted([i.rs_return_3m for i in items])
586 # 백분위수 계산 (상위 10% -> 90 백분위수)
587 percentile_index = min(int(len(rets) * (1 - self._cfg.rs_top_percentile / 100)), len(rets) - 1)
588 cutoff = rets[percentile_index]
590 logger.debug({
591 "event": "rs_score_calculation_details",
592 "item_count": len(items),
593 "top_percentile_config": self._cfg.rs_top_percentile,
594 "cutoff_return": round(cutoff, 2),
595 "returns_distribution": {
596 "min": round(rets[0], 2),
597 "p25": round(rets[int(len(rets) * 0.25)], 2),
598 "median": round(rets[int(len(rets) * 0.5)], 2),
599 "p75": round(rets[int(len(rets) * 0.75)], 2),
600 "max": round(rets[-1], 2)
601 }
602 })
604 for item in items:
605 is_top_tier = item.rs_return_3m >= cutoff
606 item.rs_score = self._cfg.rs_score_points if is_top_tier else 0.0
607 if is_top_tier:
608 logger.debug({
609 "event": "rs_score_assigned", "code": item.code, "name": item.name,
610 "return_3m": round(item.rs_return_3m, 2), "score": item.rs_score
611 })
612 logger.debug({"event": "compute_rs_scores_finished"})
614 async def _compute_profit_growth_scores(self, items: List[OSBWatchlistItem], logger: Optional[logging.Logger] = None):
615 logger = logger or self._logger
616 if not items: return 616 ↛ exitline 616 didn't return from function '_compute_profit_growth_scores' because the return on line 616 wasn't executed
617 logger.debug({"event": "compute_profit_growth_scores_started", "item_count": len(items)})
619 for chunk in _chunked(items, self._cfg.api_chunk_size):
620 # API 대신 스크래퍼의 메서드를 호출
621 tasks = [self._scraper.fetch_yoy_profit_growth(i.code) for i in chunk]
622 results = await asyncio.gather(*tasks, return_exceptions=True)
624 for item, growth in zip(chunk, results):
625 if isinstance(growth, Exception):
626 logger.warning({"event": "profit_growth_scraping_error", "code": item.code, "error": str(growth)})
627 item.profit_growth_score = 0.0
628 continue
630 # 턴어라운드(999.0)이거나 설정한 한계치 이상의 성장이면 스코어 부여
631 if growth >= self._cfg.profit_growth_threshold_pct or growth == 999.0:
632 item.profit_growth_score = self._cfg.profit_growth_score_points
633 logger.debug({
634 "event": "profit_growth_score_assigned",
635 "code": item.code,
636 "name": item.name,
637 "growth_pct": "Turnaround" if growth == 999.0 else round(growth, 2),
638 "score": item.profit_growth_score
639 })
640 else:
641 item.profit_growth_score = 0.0
643 logger.debug({"event": "compute_profit_growth_scores_finished"})
645 async def _compute_smart_money_scores(self, items: List[OSBWatchlistItem], logger: Optional[logging.Logger] = None, date: Optional[str] = None):
646 """3일 누적 외국인+기관 순매수금액 기반 스마트머니 스코어링.
648 조건 A: 3일 누적 (외국인 + 기관 순매수금액) >= 시총의 smart_money_to_mcap_pct%
649 조건 B: 3일 누적 (외국인 + 기관 순매수금액) >= 3일 누적 총거래대금의 smart_money_to_tv_pct%
650 A 또는 B 만족 시 smart_money_score_points 부여.
652 단위: frgn/orgn_ntby_tr_pbmn 은 백만원 → *1_000_000 = 원, acml_tr_pbmn 은 원.
653 """
654 logger = logger or self._logger
655 if not items: return 655 ↛ exitline 655 didn't return from function '_compute_smart_money_scores' because the return on line 655 wasn't executed
656 days = self._cfg.smart_money_lookback_days
657 logger.debug({"event": "compute_smart_money_scores_started", "item_count": len(items), "lookback_days": days})
659 for chunk in _chunked(items, self._cfg.api_chunk_size):
660 tasks = [self._sqs.get_investor_trade_daily_multi(i.code, date, days) for i in chunk]
661 results = await asyncio.gather(*tasks, return_exceptions=True)
663 for item, resp in zip(chunk, results):
664 item.smart_money_score = 0.0
665 if isinstance(resp, Exception): 665 ↛ 666line 665 didn't jump to line 666 because the condition on line 665 was never true
666 logger.warning({"event": "smart_money_api_error", "code": item.code, "error": str(resp)})
667 continue
668 if not resp or resp.rt_cd != ErrorCode.SUCCESS.value or not resp.data: 668 ↛ 671line 668 didn't jump to line 671 because the condition on line 668 was always true
669 continue
671 rows = resp.data # list of dicts, newest first
672 sum_fi_won = 0.0 # 3일 누적 외국인+기관 순매수금액 (원)
673 sum_tv_won = 0.0 # 3일 누적 총거래대금 (원)
674 for row in rows:
675 frgn = float(row.get("frgn_ntby_tr_pbmn", "0") or "0")
676 orgn = float(row.get("orgn_ntby_tr_pbmn", "0") or "0")
677 tv = float(row.get("acml_tr_pbmn", "0") or "0")
678 sum_fi_won += (frgn + orgn) * 1_000_000 # 백만원 → 원
679 sum_tv_won += tv # 이미 원 단위
681 mcap = float(item.market_cap) if item.market_cap else 0.0
682 cond_a = mcap > 0 and sum_fi_won >= mcap * (self._cfg.smart_money_to_mcap_pct / 100.0)
683 cond_b = sum_tv_won > 0 and sum_fi_won >= sum_tv_won * (self._cfg.smart_money_to_tv_pct / 100.0)
685 if cond_a or cond_b:
686 item.smart_money_score = self._cfg.smart_money_score_points
687 logger.debug({
688 "event": "smart_money_score_assigned",
689 "code": item.code, "name": item.name,
690 "sum_fi_억": round(sum_fi_won / 1e8, 2),
691 "mcap_억": round(mcap / 1e8, 2),
692 "sum_tv_억": round(sum_tv_won / 1e8, 2),
693 "cond_a": cond_a, "cond_b": cond_b,
694 "score": item.smart_money_score,
695 })
697 logger.debug({"event": "compute_smart_money_scores_finished"})
699 def _compute_total_scores(self, items: List[OSBWatchlistItem], logger: Optional[logging.Logger] = None):
700 logger = logger or self._logger
701 if not items: return
702 logger.debug({"event": "compute_total_scores_started", "item_count": len(items)})
703 for item in items:
704 item.total_score = item.rs_score + item.profit_growth_score + item.smart_money_score
705 if item.total_score > 0: 705 ↛ 703line 705 didn't jump to line 703 because the condition on line 705 was always true
706 logger.debug({
707 "event": "total_score_calculated", "code": item.code, "name": item.name,
708 "rs_score": item.rs_score, "profit_score": item.profit_growth_score,
709 "smart_money_score": item.smart_money_score,
710 "total_score": item.total_score
711 })
712 logger.debug({"event": "compute_total_scores_finished"})
714 def _save_premium_stocks(self, kospi, kosdaq, trading_date: Optional[str] = None):
715 """전일 기준 우량주를 파일에 저장한다.
717 Args:
718 trading_date: 기준 거래일(YYYYMMDD). generated_date 필드에 기록.
719 None이면 현재 날짜를 사용.
720 """
721 try:
722 os.makedirs(os.path.dirname(self._cfg.premium_stocks_file), exist_ok=True)
723 now = self._tm.get_current_kst_time()
724 data = {
725 # generated_date: 어떤 거래일 기준으로 생성됐는지 (스킵 로직의 기준)
726 "generated_date": trading_date or now.strftime("%Y%m%d"),
727 # generated_at: 실제 파일을 저장한 시각 (주말/공휴일에 생성 가능)
728 "generated_at": now.strftime("%Y-%m-%dT%H:%M:%S"),
729 "kospi": [asdict(i) for i in kospi],
730 "kosdaq": [asdict(i) for i in kosdaq]
731 }
732 with open(self._cfg.premium_stocks_file, "w", encoding="utf-8") as f:
733 json.dump(data, f, ensure_ascii=False, indent=2)
734 except Exception as e:
735 self._logger.error(f"Failed to save premium stocks: {e}")
737 def _load_premium_stocks(self) -> List[OSBWatchlistItem]:
738 if not os.path.exists(self._cfg.premium_stocks_file):
739 return []
740 try:
741 with open(self._cfg.premium_stocks_file, "r", encoding="utf-8") as f:
742 data = json.load(f)
743 # 날짜 체크 (오늘/어제만 유효)
744 gen_date = data.get("generated_date", "")
746 try:
747 gen_dt = datetime.strptime(gen_date, "%Y%m%d").date()
748 curr_dt = self._tm.get_current_kst_time().date()
749 # 7일 이내만 유효 (한국 최장 연휴 5일 + 여유)
750 # generated_date는 거래일 기준이므로 월요일에 금요일 파일도 유효
751 if (curr_dt - gen_dt).days > 7:
752 return []
753 except ValueError:
754 return []
756 items = []
757 for k in ["kospi", "kosdaq"]:
758 for d in data.get(k, []): 758 ↛ 759line 758 didn't jump to line 759 because the loop on line 758 never started
759 items.append(OSBWatchlistItem(**d))
760 return items
761 except Exception:
762 return []
764 def get_premium_stocks_meta(self) -> Optional[dict]:
765 """저장된 전일 기준 우량주 파일의 메타데이터 반환. 파일 없으면 None."""
766 if not os.path.exists(self._cfg.premium_stocks_file):
767 return None
768 try:
769 with open(self._cfg.premium_stocks_file, "r", encoding="utf-8") as f:
770 data = json.load(f)
771 return {
772 "generated_date": data.get("generated_date"),
773 "generated_at": data.get("generated_at"),
774 }
775 except Exception:
776 return None
778 @staticmethod
779 def _calc_turnover_ratio(item: OSBWatchlistItem) -> float:
780 """회전율 계산: (5일 평균 거래대금 / 시가총액). 동점자 처리용."""
781 return (item.avg_trading_value_5d / item.market_cap) if item.market_cap > 0 else 0
783 @staticmethod
784 def _extract_op_profit_growth(data) -> float:
785 """API 응답에서 영업이익 증가율 추출.
787 resp.data 구조: {"rt_cd": "0", "output": [{"stac_yymm": "...", "bsop_prfi_inrt": "...", ...}]}
788 output 리스트의 첫 번째 항목(최신 분기)에서 영업이익 관련 필드를 탐색.
789 """
790 try:
791 # API 응답 dict에서 output 리스트 추출
792 if isinstance(data, dict):
793 output = data.get("output", data)
794 else:
795 output = data
797 target = output[0] if isinstance(output, list) and output else output
798 if isinstance(target, dict):
799 for k in ["bsop_prti_icdc", "sale_totl_prfi_icdc", "op_profit_growth", "bsop_prfi_inrt", "grs"]:
800 if val := target.get(k): return float(val)
801 except: pass
802 return 0.0