Coverage for task / background / after_market / premium_watchlist_generator_task.py: 96%
82 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
1# task/background/after_market/premium_watchlist_generator_task.py
2"""
3전일 기준 우량주 생성 백그라운드 태스크.
5장 마감 후 자동으로 OneilUniverseService.generate_premium_watchlist()를 실행하여
6오닐 전략 전일 기준 우량주 풀을 갱신한다.
7"""
8import asyncio
9import logging
10import time
11from typing import Dict, Optional, TYPE_CHECKING
13from task.background.after_market.after_market_task_base import AfterMarketTask
14from interfaces.schedulable_task import TaskState
15from services.notification_service import NotificationService, NotificationCategory, NotificationLevel
17if TYPE_CHECKING: 17 ↛ 18line 17 didn't jump to line 18 because the condition on line 17 was never true
18 from services.oneil_universe_service import OneilUniverseService
19 from services.market_calendar_service import MarketCalendarService
20 from core.market_clock import MarketClock
23class PremiumWatchlistGeneratorTask(AfterMarketTask):
24 """장 마감 후 전일 기준 우량주 풀을 자동 생성하는 백그라운드 태스크."""
26 def __init__(
27 self,
28 universe_service: "OneilUniverseService",
29 market_calendar_service: Optional["MarketCalendarService"] = None,
30 market_clock: Optional["MarketClock"] = None,
31 logger=None,
32 notification_service: Optional["NotificationService"] = None,
33 ):
34 super().__init__(
35 mcs=market_calendar_service,
36 market_clock=market_clock,
37 logger=logger or logging.getLogger(__name__),
38 )
39 self._universe_service = universe_service
40 self._ns = notification_service
42 self._is_generating: bool = False
43 self._last_generated_date: Optional[str] = None
44 self._progress: Dict = {
45 "running": False,
46 "last_generated_date": None,
47 "last_result": None,
48 }
50 # ── SchedulableTask 인터페이스 구현 ────────────────────────
52 @property
53 def task_name(self) -> str:
54 return "전일기준주도주_생성"
56 @property
57 def _scheduler_label(self) -> str:
58 return "전일기준우량주생성"
60 async def start(self) -> None:
61 if self._state == TaskState.RUNNING:
62 return
63 self._state = TaskState.RUNNING
64 self._tasks.append(asyncio.create_task(self._after_market_scheduler()))
65 self._logger.info("PremiumWatchlistGeneratorTask 시작")
67 def get_progress(self) -> Dict:
68 result = dict(self._progress)
69 gen = self._universe_service.generation_progress
70 result.update({
71 "phase": gen.get("phase"),
72 "processed": gen.get("processed", 0),
73 "total": gen.get("total", 0),
74 "passed": gen.get("passed", 0),
75 "selected": gen.get("selected", 0),
76 "elapsed": gen.get("elapsed", 0.0),
77 })
78 return result
80 async def _on_market_closed(self, latest_trading_date: str) -> None:
81 """장 마감 후 콜백: 해당 거래일의 생성이 필요하면 실행.
83 인메모리 기록과 파일 메타데이터를 모두 확인하여
84 이미 생성된 경우 재실행을 생략한다 (서버 재시작 후에도 유효).
85 """
86 if self._last_generated_date == latest_trading_date:
87 if self._ns:
88 await self._ns.emit(
89 NotificationCategory.BACKGROUND, NotificationLevel.INFO, "전일기준우량주 생성 스킵",
90 f"{latest_trading_date} 이미 생성 완료된 상태입니다."
91 )
92 return
94 # 파일에 이미 당일 기준 우량주가 저장되어 있으면 재생성 불필요
95 meta = self._universe_service.get_premium_stocks_meta()
96 if meta and meta.get("generated_date") == latest_trading_date:
97 self._last_generated_date = latest_trading_date
98 self._progress["last_generated_date"] = latest_trading_date
99 self._logger.info(
100 f"전일 기준 우량주 이미 생성됨 (기준일: {latest_trading_date}, "
101 f"생성시각: {meta.get('generated_at', '알 수 없음')}) — 생성 스킵"
102 )
103 if self._ns:
104 await self._ns.emit(
105 NotificationCategory.BACKGROUND, NotificationLevel.INFO, "전일기준우량주 생성 스킵",
106 f"{latest_trading_date} 이미 생성 완료된 상태입니다."
107 )
108 return
110 await self._run_generation(latest_trading_date)
112 async def _run_generation(self, trading_date: str) -> None:
113 if self._is_generating:
114 self._logger.info("전일 기준 우량주 생성 이미 진행 중 — 스킵")
115 return
117 self._is_generating = True
118 self._progress["running"] = True
119 start_time = time.time()
120 self._logger.info(f"전일 기준 우량주 생성 시작 (기준일: {trading_date})")
122 try:
123 result = await self._universe_service.generate_premium_watchlist(trading_date=trading_date)
124 elapsed = time.time() - start_time
125 self._last_generated_date = trading_date
126 self._progress["last_generated_date"] = trading_date
127 self._progress["last_result"] = result
128 self._logger.info(
129 f"전일 기준 우량주 생성 완료: "
130 f"KOSPI {result.get('kospi_count')}종목, "
131 f"KOSDAQ {result.get('kosdaq_count')}종목, "
132 f"소요: {elapsed:.1f}초"
133 )
134 if self._ns:
135 await self._ns.emit(
136 NotificationCategory.BACKGROUND, NotificationLevel.INFO, "전일기준우량주 생성 완료",
137 f"KOSPI {result.get('kospi_count')}개, KOSDAQ {result.get('kosdaq_count')}개 종목 수집 완료 (소요: {elapsed:.1f}초)"
138 )
139 except Exception as e:
140 self._logger.error(f"전일 기준 우량주 생성 실패: {e}", exc_info=True)
141 if self._ns:
142 await self._ns.emit(NotificationCategory.BACKGROUND, NotificationLevel.ERROR, "전일기준우량주 생성 실패", str(e))
143 finally:
144 self._is_generating = False
145 self._progress["running"] = False
147 async def force_generate(self) -> None:
148 """강제 생성: skip 조건을 무시하고 전일 기준 우량주를 재생성한다."""
149 self._logger.info("PremiumWatchlistGeneratorTask 강제 생성 요청")
150 target_date = None
151 if self._mcs:
152 target_date = await self._mcs.get_latest_trading_date()
153 if not target_date:
154 self._logger.error("최근 거래일을 확인할 수 없어 강제 생성을 중단합니다.")
155 return
156 await self._run_generation(target_date)