Coverage for task / background / after_market / cache_warmup_task.py: 94%
162 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
1# task/background/after_market/cache_warmup_task.py
2"""
3Watchlist / 보유종목 / 관심종목(우량주) 위주로 캐시를 사전 구성하는 백그라운드 태스크.
5장 마감 후 전략 실행에 자주 쓰이는 종목들의 가격 요약 데이터를 미리 캐시에 적재하여
6다음날 장 시작 전·후 전략이 빠르게 데이터에 접근할 수 있도록 한다.
8대상 종목 (우선순위 순):
9 1. OneilUniverseService watchlist — 전략이 직접 참조하는 핵심 관심 풀
10 2. 보유종목(계좌 잔고 output1.pdno) — 리스크 관리 최우선
11 3. 우량주 풀(data/premium_stocks.json) — 전일기준 주도주
12"""
13from __future__ import annotations
15import asyncio
16import json
17import logging
18import os
19import time
20from typing import Dict, List, Optional, Set, TYPE_CHECKING
22from common.types import ErrorCode
23from task.background.after_market.after_market_task_base import AfterMarketTask
24from interfaces.schedulable_task import TaskState
25from services.notification_service import NotificationService, NotificationCategory, NotificationLevel
27if TYPE_CHECKING: 27 ↛ 28line 27 didn't jump to line 28 because the condition on line 27 was never true
28 from services.market_data_service import MarketDataService
29 from services.stock_query_service import StockQueryService
30 from services.oneil_universe_service import OneilUniverseService
31 from services.market_calendar_service import MarketCalendarService
32 from core.market_clock import MarketClock
34_PREMIUM_STOCKS_PATH = os.path.join(
35 os.path.dirname(os.path.abspath(__file__)),
36 "..", "..", "..", "data", "premium_stocks.json",
37)
39# 청크당 병렬 호출 수 — 가격 조회는 가벼우므로 다소 넉넉하게 설정
40_API_CHUNK_SIZE = 10
41_CHUNK_SLEEP_SEC = 0.8
44def _chunked(lst: list, size: int):
45 for i in range(0, len(lst), size):
46 yield lst[i:i + size]
class CacheWarmupTask(AfterMarketTask):
    """Background task that pre-loads price summaries into the cache after market close.

    Target codes are the union of the universe-service watchlist and the
    account holdings. For every code ``market_data_service.get_price_summary``
    is awaited; a successful response is counted as "cached".

    NOTE(review): ``self._state``, ``self._tasks``, ``self._logger``,
    ``self._mcs`` and ``_after_market_scheduler`` are not defined in this
    class — presumably they are provided by ``AfterMarketTask``; confirm
    against the base class.
    """

    def __init__(
        self,
        market_data_service: "MarketDataService",
        stock_query_service: "StockQueryService",
        universe_service: Optional["OneilUniverseService"] = None,
        market_calendar_service: Optional["MarketCalendarService"] = None,
        market_clock: Optional["MarketClock"] = None,
        notification_service: Optional["NotificationService"] = None,
        logger=None,
    ) -> None:
        """Store collaborators and initialise warm-up bookkeeping.

        Args:
            market_data_service: Source of ``get_price_summary`` calls.
            stock_query_service: Source of the account-balance query.
            universe_service: Optional watchlist provider; when omitted the
                watchlist contributes no codes.
            market_calendar_service: Forwarded to the base class as ``mcs``.
            market_clock: Forwarded to the base class.
            notification_service: Optional sink for completion/failure notices.
            logger: Optional logger; defaults to this module's logger.
        """
        super().__init__(
            mcs=market_calendar_service,
            market_clock=market_clock,
            logger=logger or logging.getLogger(__name__),
        )
        self._mds = market_data_service
        self._sqs = stock_query_service
        self._universe_service = universe_service
        self._ns = notification_service

        # Set  -> warm-up loop may proceed; cleared -> loop blocks before the next chunk.
        self._suspend_event: asyncio.Event = asyncio.Event()
        self._suspend_event.set()

        # Re-entrancy guard for _run_warmup.
        self._is_warming: bool = False
        # Trading date of the most recent completed warm-up (None until the first one).
        self._last_warmed_date: Optional[str] = None
        # Snapshot exposed through get_progress().
        self._progress: Dict = {
            "running": False,
            "processed": 0,
            "total": 0,
            "cached": 0,
            "failed": 0,
            "elapsed": 0.0,
            "last_warmed_date": None,
        }

    # ── SchedulableTask interface ────────────────────────────────

    @property
    def task_name(self) -> str:
        """Identifier of this task."""
        return "cache_warmup"

    @property
    def _scheduler_label(self) -> str:
        """Label of this task for the scheduler."""
        return "CacheWarmup"

    async def start(self) -> None:
        """Start the after-market scheduler loop unless the task is already running."""
        if self._state == TaskState.RUNNING:
            return
        self._state = TaskState.RUNNING
        self._suspend_event.set()
        self._tasks.append(asyncio.create_task(self._after_market_scheduler()))
        self._logger.info("CacheWarmupTask 시작")

    async def suspend(self) -> None:
        """Pause a running task; an in-progress warm-up blocks before its next chunk."""
        if self._state == TaskState.RUNNING:
            self._suspend_event.clear()
            self._state = TaskState.SUSPENDED
            self._logger.info("CacheWarmupTask 일시 중지")

    async def resume(self) -> None:
        """Resume a suspended task and release a warm-up blocked on the suspend event."""
        if self._state == TaskState.SUSPENDED:
            self._suspend_event.set()
            self._state = TaskState.RUNNING
            self._logger.info("CacheWarmupTask 재개")

    def get_progress(self) -> Dict:
        """Return a shallow copy of the current progress snapshot."""
        return dict(self._progress)

    # ── Market-close callback ────────────────────────────────────

    async def _on_market_closed(self, latest_trading_date: str) -> None:
        """Run the warm-up for *latest_trading_date* unless it was already completed."""
        if self._last_warmed_date == latest_trading_date:
            self._logger.info(
                f"CacheWarmupTask: {latest_trading_date} 이미 웜업 완료 — 스킵"
            )
            return
        await self._run_warmup(latest_trading_date)

    # ── Forced execution ─────────────────────────────────────────

    async def force_warmup(self) -> None:
        """Run the warm-up immediately, ignoring the "already warmed" skip check.

        The target date is taken from the market calendar service. When no
        calendar service is configured, or it yields no date, an error is
        logged and nothing is run.
        """
        self._logger.info("CacheWarmupTask 강제 웜업 요청")
        target_date = None
        if self._mcs:
            target_date = await self._mcs.get_latest_trading_date()
        if not target_date:
            self._logger.error("최근 거래일을 확인할 수 없어 강제 웜업을 중단합니다.")
            return
        await self._run_warmup(target_date)

    # ── Core warm-up logic ───────────────────────────────────────

    async def _notify(self, level, title: str, message: str) -> None:
        """Emit a BACKGROUND notification on a best-effort basis.

        A failing notification must neither turn a successful warm-up into a
        reported failure nor escape from the failure handler, so any
        ``Exception`` raised by ``emit`` is logged and swallowed here.
        """
        if not self._ns:
            return
        try:
            await self._ns.emit(
                NotificationCategory.BACKGROUND, level, title, message,
            )
        except asyncio.CancelledError:
            raise
        except Exception as e:
            self._logger.warning(f"CacheWarmupTask: 알림 전송 실패: {e}")

    async def _run_warmup(self, trading_date: str) -> None:
        """Warm the cache for every target code and record *trading_date* as done.

        Codes are processed in chunks of ``_API_CHUNK_SIZE`` concurrent
        lookups. Before each chunk the suspend event is awaited, so a
        suspended task pauses between chunks. Progress counters are refreshed
        after every chunk. Errors are logged (and notified when a notification
        service is configured) rather than raised; the running flags are
        always reset on exit.
        """
        if self._is_warming:
            self._logger.info("CacheWarmupTask: 웜업 이미 진행 중 — 스킵")
            return

        self._is_warming = True
        start_time = time.time()
        self._progress = {
            "running": True,
            "processed": 0,
            "total": 0,
            "cached": 0,
            "failed": 0,
            "elapsed": 0.0,
            "last_warmed_date": self._last_warmed_date,
        }

        try:
            codes = await self._collect_target_codes()
            total = len(codes)
            self._progress["total"] = total
            self._logger.info(
                f"CacheWarmupTask 웜업 시작 (기준일: {trading_date}, 대상 {total}개 종목)"
            )

            cached = 0
            failed = 0
            processed = 0

            for chunk in _chunked(list(codes), _API_CHUNK_SIZE):
                # Blocks here while the task is suspended.
                await self._suspend_event.wait()

                results = await asyncio.gather(
                    *(self._warmup_code(code) for code in chunk),
                    return_exceptions=True,
                )

                # Anything other than an explicit True — False or an exception
                # object returned by gather — counts as a failure.
                chunk_had_api_call = False
                for result in results:
                    if result is True:
                        cached += 1
                        chunk_had_api_call = True
                    else:
                        failed += 1

                processed += len(chunk)
                elapsed = time.time() - start_time
                self._progress.update({
                    "processed": processed,
                    "cached": cached,
                    "failed": failed,
                    "elapsed": round(elapsed, 1),
                })

                # Pause only after a chunk with at least one success;
                # otherwise just yield control to the event loop.
                if chunk_had_api_call:
                    await asyncio.sleep(_CHUNK_SLEEP_SEC)
                else:
                    await asyncio.sleep(0)

            self._last_warmed_date = trading_date
            elapsed = time.time() - start_time
            self._progress["last_warmed_date"] = trading_date
            self._logger.info(
                f"CacheWarmupTask 웜업 완료 (기준일: {trading_date}) "
                f"캐시 적재: {cached}/{total}, 실패: {failed}, 소요: {elapsed:.1f}s"
            )
            await self._notify(
                NotificationLevel.INFO,
                "캐시 웜업 완료",
                f"{total}개 종목 중 {cached}개 캐시 적재 완료 (소요: {elapsed:.1f}초)",
            )

        except Exception as e:
            self._logger.error(f"CacheWarmupTask 웜업 실패: {e}", exc_info=True)
            await self._notify(NotificationLevel.ERROR, "캐시 웜업 실패", str(e))
        finally:
            self._is_warming = False
            self._progress["running"] = False

    async def _warmup_code(self, code: str) -> bool:
        """Fetch the price summary for a single code.

        Returns:
            True when the response exists and its ``rt_cd`` equals the success
            code; False on an error response or on any exception other than
            cancellation, which is re-raised.
        """
        try:
            resp = await self._mds.get_price_summary(code)
            if resp and resp.rt_cd == ErrorCode.SUCCESS.value:
                return True
            return False
        except asyncio.CancelledError:
            raise
        except Exception as e:
            self._logger.debug(f"CacheWarmupTask: {code} 웜업 실패: {e}")
            return False

    # ── Target code collection ───────────────────────────────────

    async def _collect_target_codes(self) -> Set[str]:
        """Collect the de-duplicated union of watchlist and holdings codes.

        The premium-stock pool file is not read by this method.
        """
        codes: Set[str] = set()

        watchlist_codes = await self._get_watchlist_codes()
        holdings_codes = await self._get_holdings_codes()

        codes.update(watchlist_codes)
        codes.update(holdings_codes)

        self._logger.info(
            f"CacheWarmupTask 대상 종목: watchlist {len(watchlist_codes)}개 "
            f"+ 보유 {len(holdings_codes)}개 "
            f"= 합산(중복제거) {len(codes)}개"
        )
        return codes

    async def _get_watchlist_codes(self) -> List[str]:
        """Return the keys of the universe-service watchlist.

        Returns an empty list when no universe service is configured or the
        lookup raises (the failure is logged as a warning).
        """
        if not self._universe_service:
            return []
        try:
            watchlist = await self._universe_service.get_watchlist()
            return list(watchlist.keys())
        except Exception as e:
            self._logger.warning(f"CacheWarmupTask: watchlist 조회 실패: {e}")
            return []

    async def _get_holdings_codes(self) -> List[str]:
        """Return the ``pdno`` codes found in ``output1`` of the account balance.

        Entries that are not dicts, or whose ``pdno`` is missing, blank or not
        a string, are skipped individually so that one malformed row does not
        discard the remaining holdings. Returns an empty list when the
        response is unsuccessful or empty, or when the query raises (the
        failure is logged as a warning).
        """
        try:
            resp = await self._sqs.handle_get_account_balance()
            if not (resp and resp.rt_cd == ErrorCode.SUCCESS.value and resp.data):
                return []
            if not isinstance(resp.data, dict):
                return []

            # `or []` also covers an "output1" key that is present but None.
            holdings = resp.data.get("output1") or []
            codes: List[str] = []
            for item in holdings:
                if not isinstance(item, dict):
                    continue
                raw_code = item.get("pdno")
                code = raw_code.strip() if isinstance(raw_code, str) else ""
                if code:
                    codes.append(code)
            return codes
        except Exception as e:
            self._logger.warning(f"CacheWarmupTask: 보유종목 조회 실패: {e}")
            return []