Coverage for task / background / after_market / cache_warmup_task.py: 94%

162 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-04 15:08 +0000

1# task/background/after_market/cache_warmup_task.py 

2""" 

3Watchlist / 보유종목 / 관심종목(우량주) 위주로 캐시를 사전 구성하는 백그라운드 태스크. 

4 

5장 마감 후 전략 실행에 자주 쓰이는 종목들의 가격 요약 데이터를 미리 캐시에 적재하여 

6다음날 장 시작 전·후 전략이 빠르게 데이터에 접근할 수 있도록 한다. 

7 

8대상 종목 (우선순위 순): 

9 1. OneilUniverseService watchlist — 전략이 직접 참조하는 핵심 관심 풀 

10 2. 보유종목(계좌 잔고 output2.pdno) — 리스크 관리 최우선 

11 3. 우량주 풀(data/premium_stocks.json) — 전일기준 주도주 

12""" 

13from __future__ import annotations 

14 

15import asyncio 

16import json 

17import logging 

18import os 

19import time 

20from typing import Dict, List, Optional, Set, TYPE_CHECKING 

21 

22from common.types import ErrorCode 

23from task.background.after_market.after_market_task_base import AfterMarketTask 

24from interfaces.schedulable_task import TaskState 

25from services.notification_service import NotificationService, NotificationCategory, NotificationLevel 

26 

27if TYPE_CHECKING: 27 ↛ 28line 27 didn't jump to line 28 because the condition on line 27 was never true

28 from services.market_data_service import MarketDataService 

29 from services.stock_query_service import StockQueryService 

30 from services.oneil_universe_service import OneilUniverseService 

31 from services.market_calendar_service import MarketCalendarService 

32 from core.market_clock import MarketClock 

33 

34_PREMIUM_STOCKS_PATH = os.path.join( 

35 os.path.dirname(os.path.abspath(__file__)), 

36 "..", "..", "..", "data", "premium_stocks.json", 

37) 

38 

39# 청크당 병렬 호출 수 — 가격 조회는 가벼우므로 다소 넉넉하게 설정 

40_API_CHUNK_SIZE = 10 

41_CHUNK_SLEEP_SEC = 0.8 

42 

43 

44def _chunked(lst: list, size: int): 

45 for i in range(0, len(lst), size): 

46 yield lst[i:i + size] 

47 

48 

49class CacheWarmupTask(AfterMarketTask): 

50 """장 마감 후 주요 관심 종목의 가격 데이터를 캐시에 사전 적재하는 태스크.""" 

51 

52 def __init__( 

53 self, 

54 market_data_service: "MarketDataService", 

55 stock_query_service: "StockQueryService", 

56 universe_service: Optional["OneilUniverseService"] = None, 

57 market_calendar_service: Optional["MarketCalendarService"] = None, 

58 market_clock: Optional["MarketClock"] = None, 

59 notification_service: Optional["NotificationService"] = None, 

60 logger=None, 

61 ) -> None: 

62 super().__init__( 

63 mcs=market_calendar_service, 

64 market_clock=market_clock, 

65 logger=logger or logging.getLogger(__name__), 

66 ) 

67 self._mds = market_data_service 

68 self._sqs = stock_query_service 

69 self._universe_service = universe_service 

70 self._ns = notification_service 

71 

72 self._suspend_event: asyncio.Event = asyncio.Event() 

73 self._suspend_event.set() 

74 

75 self._is_warming: bool = False 

76 self._last_warmed_date: Optional[str] = None 

77 self._progress: Dict = { 

78 "running": False, 

79 "processed": 0, 

80 "total": 0, 

81 "cached": 0, 

82 "failed": 0, 

83 "elapsed": 0.0, 

84 "last_warmed_date": None, 

85 } 

86 

87 # ── SchedulableTask 인터페이스 ──────────────────────────────── 

88 

89 @property 

90 def task_name(self) -> str: 

91 return "cache_warmup" 

92 

93 @property 

94 def _scheduler_label(self) -> str: 

95 return "CacheWarmup" 

96 

97 async def start(self) -> None: 

98 if self._state == TaskState.RUNNING: 

99 return 

100 self._state = TaskState.RUNNING 

101 self._suspend_event.set() 

102 self._tasks.append(asyncio.create_task(self._after_market_scheduler())) 

103 self._logger.info("CacheWarmupTask 시작") 

104 

105 async def suspend(self) -> None: 

106 if self._state == TaskState.RUNNING: 

107 self._suspend_event.clear() 

108 self._state = TaskState.SUSPENDED 

109 self._logger.info("CacheWarmupTask 일시 중지") 

110 

111 async def resume(self) -> None: 

112 if self._state == TaskState.SUSPENDED: 

113 self._suspend_event.set() 

114 self._state = TaskState.RUNNING 

115 self._logger.info("CacheWarmupTask 재개") 

116 

117 def get_progress(self) -> Dict: 

118 return dict(self._progress) 

119 

120 # ── 장 마감 콜백 ───────────────────────────────────────────── 

121 

122 async def _on_market_closed(self, latest_trading_date: str) -> None: 

123 """장 마감 후 콜백: 해당 거래일 캐시 웜업이 필요하면 실행.""" 

124 if self._last_warmed_date == latest_trading_date: 

125 self._logger.info( 

126 f"CacheWarmupTask: {latest_trading_date} 이미 웜업 완료 — 스킵" 

127 ) 

128 return 

129 await self._run_warmup(latest_trading_date) 

130 

131 # ── 강제 실행 ───────────────────────────────────────────────── 

132 

133 async def force_warmup(self) -> None: 

134 """skip 조건을 무시하고 즉시 캐시 웜업을 실행한다.""" 

135 self._logger.info("CacheWarmupTask 강제 웜업 요청") 

136 target_date = None 

137 if self._mcs: 

138 target_date = await self._mcs.get_latest_trading_date() 

139 if not target_date: 

140 self._logger.error("최근 거래일을 확인할 수 없어 강제 웜업을 중단합니다.") 

141 return 

142 await self._run_warmup(target_date) 

143 

144 # ── 핵심 웜업 로직 ──────────────────────────────────────────── 

145 

146 async def _run_warmup(self, trading_date: str) -> None: 

147 if self._is_warming: 

148 self._logger.info("CacheWarmupTask: 웜업 이미 진행 중 — 스킵") 

149 return 

150 

151 self._is_warming = True 

152 start_time = time.time() 

153 self._progress = { 

154 "running": True, 

155 "processed": 0, 

156 "total": 0, 

157 "cached": 0, 

158 "failed": 0, 

159 "elapsed": 0.0, 

160 "last_warmed_date": self._last_warmed_date, 

161 } 

162 

163 try: 

164 codes = await self._collect_target_codes() 

165 total = len(codes) 

166 self._progress["total"] = total 

167 self._logger.info( 

168 f"CacheWarmupTask 웜업 시작 (기준일: {trading_date}, 대상 {total}개 종목)" 

169 ) 

170 

171 cached = 0 

172 failed = 0 

173 processed = 0 

174 

175 for chunk in _chunked(list(codes), _API_CHUNK_SIZE): 

176 await self._suspend_event.wait() 

177 

178 tasks = [self._warmup_code(code) for code in chunk] 

179 results = await asyncio.gather(*tasks, return_exceptions=True) 

180 

181 chunk_had_api_call = False 

182 for result in results: 

183 if isinstance(result, Exception): 183 ↛ 184line 183 didn't jump to line 184 because the condition on line 183 was never true

184 failed += 1 

185 elif result is True: 

186 cached += 1 

187 chunk_had_api_call = True 

188 else: 

189 failed += 1 

190 

191 processed += len(chunk) 

192 elapsed = time.time() - start_time 

193 self._progress.update({ 

194 "processed": processed, 

195 "cached": cached, 

196 "failed": failed, 

197 "elapsed": round(elapsed, 1), 

198 }) 

199 

200 if chunk_had_api_call: 

201 await asyncio.sleep(_CHUNK_SLEEP_SEC) 

202 else: 

203 await asyncio.sleep(0) 

204 

205 self._last_warmed_date = trading_date 

206 elapsed = time.time() - start_time 

207 self._progress["last_warmed_date"] = trading_date 

208 self._logger.info( 

209 f"CacheWarmupTask 웜업 완료 (기준일: {trading_date}) " 

210 f"캐시 적재: {cached}/{total}, 실패: {failed}, 소요: {elapsed:.1f}s" 

211 ) 

212 if self._ns: 212 ↛ 213line 212 didn't jump to line 213 because the condition on line 212 was never true

213 await self._ns.emit( 

214 NotificationCategory.BACKGROUND, NotificationLevel.INFO, 

215 "캐시 웜업 완료", 

216 f"{total}개 종목 중 {cached}개 캐시 적재 완료 (소요: {elapsed:.1f}초)", 

217 ) 

218 

219 except Exception as e: 

220 self._logger.error(f"CacheWarmupTask 웜업 실패: {e}", exc_info=True) 

221 if self._ns: 221 ↛ 222line 221 didn't jump to line 222 because the condition on line 221 was never true

222 await self._ns.emit( 

223 NotificationCategory.BACKGROUND, NotificationLevel.ERROR, 

224 "캐시 웜업 실패", str(e), 

225 ) 

226 finally: 

227 self._is_warming = False 

228 self._progress["running"] = False 

229 

230 async def _warmup_code(self, code: str) -> bool: 

231 """단일 종목의 가격 요약을 조회하여 캐시에 적재한다. 

232 

233 Returns: 

234 True — API 호출(캐시 적재) 성공 

235 False — 실패 또는 응답 오류 

236 """ 

237 try: 

238 resp = await self._mds.get_price_summary(code) 

239 if resp and resp.rt_cd == ErrorCode.SUCCESS.value: 

240 return True 

241 return False 

242 except asyncio.CancelledError: 

243 raise 

244 except Exception as e: 

245 self._logger.debug(f"CacheWarmupTask: {code} 웜업 실패: {e}") 

246 return False 

247 

248 # ── 대상 종목 수집 ──────────────────────────────────────────── 

249 

250 async def _collect_target_codes(self) -> Set[str]: 

251 """watchlist + 보유종목 + 우량주(관심종목) 코드를 중복 없이 수집한다.""" 

252 codes: Set[str] = set() 

253 

254 watchlist_codes = await self._get_watchlist_codes() 

255 holdings_codes = await self._get_holdings_codes() 

256 

257 codes.update(watchlist_codes) 

258 codes.update(holdings_codes) 

259 

260 self._logger.info( 

261 f"CacheWarmupTask 대상 종목: watchlist {len(watchlist_codes)}개 " 

262 f"+ 보유 {len(holdings_codes)}개 " 

263 f"= 합산(중복제거) {len(codes)}개" 

264 ) 

265 return codes 

266 

267 async def _get_watchlist_codes(self) -> List[str]: 

268 """OneilUniverseService watchlist 종목 코드를 반환한다.""" 

269 if not self._universe_service: 

270 return [] 

271 try: 

272 watchlist = await self._universe_service.get_watchlist() 

273 return list(watchlist.keys()) 

274 except Exception as e: 

275 self._logger.warning(f"CacheWarmupTask: watchlist 조회 실패: {e}") 

276 return [] 

277 

278 async def _get_holdings_codes(self) -> List[str]: 

279 """계좌 잔고(output2)에서 보유 종목 코드를 반환한다.""" 

280 try: 

281 resp = await self._sqs.handle_get_account_balance() 

282 if not (resp and resp.rt_cd == ErrorCode.SUCCESS.value and resp.data): 

283 return [] 

284 holdings = ( 

285 resp.data.get("output1", []) 

286 if isinstance(resp.data, dict) 

287 else [] 

288 ) 

289 codes = [] 

290 for item in holdings: 

291 code = item.get("pdno", "").strip() if isinstance(item, dict) else "" 

292 if code: 

293 codes.append(code) 

294 return codes 

295 except Exception as e: 

296 self._logger.warning(f"CacheWarmupTask: 보유종목 조회 실패: {e}") 

297 return []