Coverage for task / background / after_market / ohlcv_update_task.py: 96%

159 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-04 15:08 +0000

1# task/background/ohlcv_update_task.py 

2""" 

3장 마감 후 전체 종목의 OHLCV를 DB에 저장하는 백그라운드 태스크. 

4- 당일 OHLCV 및 전략에 필요한 최대 600일치 역사 데이터를 유지한다. 

5- DB에 이미 존재하는 날짜는 API를 호출하지 않아 불필요한 중복 요청을 방지한다. 

6""" 

7import asyncio 

8import logging 

9import time 

10from typing import List, Dict, Optional, TYPE_CHECKING 

11 

12from common.types import ErrorCode 

13from core.performance_profiler import PerformanceProfiler 

14from core.market_clock import MarketClock 

15from task.background.after_market.after_market_task_base import AfterMarketTask 

16from interfaces.schedulable_task import TaskState 

17from repositories.stock_repository import StockRepository 

18from repositories.stock_code_repository import StockCodeRepository 

19from services.market_calendar_service import MarketCalendarService 

20from services.notification_service import NotificationService, NotificationCategory, NotificationLevel 

21 

22if TYPE_CHECKING: 22 ↛ 23line 22 didn't jump to line 23 because the condition on line 22 was never true

23 from services.stock_query_service import StockQueryService 

24 

25 

26def _chunked(lst, size): 

27 for i in range(0, len(lst), size): 

28 yield lst[i:i + size] 

29 

30 

31# ETF/ETN 브랜드명 접두사 (DailyPriceCollectorTask와 동일) 

32_ETF_PREFIXES = ( 

33 "KODEX", "TIGER", "KBSTAR", "ARIRANG", "SOL", "ACE", 

34 "HANARO", "KOSEF", "PLUS", "TIMEFOLIO", "WON", "FOCUS", 

35 "VITA", "TREX", "MASTER", "WOORI", "KINDEX", 

36) 

37 

38 

39class OhlcvUpdateTask(AfterMarketTask): 

40 """장 마감 후 전체 종목의 OHLCV를 수집하여 DB에 저장하는 백그라운드 태스크. 

41 

42 - DB에 이미 TARGET_OHLCV_DAYS일치 데이터가 있고 당일 날짜까지 갱신된 종목은 스킵. 

43 - 데이터가 부족하거나 당일 캔들이 없는 종목만 API를 호출하여 저장. 

44 - StockQueryService.get_ohlcv()가 내부적으로 누락 구간만 API 호출 후 DB에 upsert하므로 

45 중복된 날짜는 자동으로 INSERT OR REPLACE 처리된다. 

46 """ 

47 

48 TARGET_OHLCV_DAYS = 600 # 전략에서 최대 600일치를 사용하므로 동일하게 유지 

49 API_CHUNK_SIZE = 4 # 병렬 처리 종목 수 (OHLCV는 현재가보다 API 비용이 높음) 

50 CHUNK_SLEEP_SEC = 1.5 # 청크 간 대기 시간 (API 레이트 리밋 준수) 

51 

52 def __init__( 

53 self, 

54 stock_query_service: "StockQueryService", 

55 stock_code_repository: StockCodeRepository, 

56 stock_repo: StockRepository, 

57 market_calendar_service: Optional[MarketCalendarService] = None, 

58 market_clock: Optional[MarketClock] = None, 

59 performance_profiler: Optional[PerformanceProfiler] = None, 

60 notification_service: Optional[NotificationService] = None, 

61 logger=None, 

62 ): 

63 super().__init__( 

64 mcs=market_calendar_service, 

65 market_clock=market_clock, 

66 logger=logger or logging.getLogger(__name__), 

67 ) 

68 self._stock_query_service = stock_query_service 

69 self.stock_code_repository = stock_code_repository 

70 self._stock_repo = stock_repo 

71 self._pm = performance_profiler or PerformanceProfiler(enabled=False) 

72 self._ns = notification_service 

73 self._suspend_event: asyncio.Event = asyncio.Event() 

74 self._suspend_event.set() # 초기에는 실행 가능 

75 

76 # 수집 상태 

77 self._is_collecting: bool = False 

78 self._last_collected_date: Optional[str] = None 

79 self._progress: Dict = { 

80 "running": False, 

81 "processed": 0, 

82 "total": 0, 

83 "updated": 0, 

84 "skipped": 0, 

85 "elapsed": 0.0, 

86 } 

87 

88 # ── SchedulableTask 인터페이스 구현 ──────────────────────── 

89 

90 @property 

91 def task_name(self) -> str: 

92 return "ohlcv_update" 

93 

94 @property 

95 def _scheduler_label(self) -> str: 

96 return "OhlcvUpdate" 

97 

98 async def start(self) -> None: 

99 """장마감 후 자동 스케줄러 시작.""" 

100 if self._state == TaskState.RUNNING: 

101 return 

102 self._state = TaskState.RUNNING 

103 self._suspend_event.set() 

104 

105 self._tasks.append( 

106 asyncio.create_task(self._after_market_scheduler()) 

107 ) 

108 self._logger.info(f"OhlcvUpdateTask 시작: {len(self._tasks)}개 태스크") 

109 

110 async def suspend(self) -> None: 

111 """수집을 일시 중지한다 (chunk 사이에서 대기).""" 

112 if self._state == TaskState.RUNNING: 

113 self._suspend_event.clear() 

114 self._state = TaskState.SUSPENDED 

115 self._logger.info("OhlcvUpdateTask 일시 중지") 

116 

117 async def resume(self) -> None: 

118 """일시 중지된 수집을 재개한다.""" 

119 if self._state == TaskState.SUSPENDED: 

120 self._suspend_event.set() 

121 self._state = TaskState.RUNNING 

122 self._logger.info("OhlcvUpdateTask 재개") 

123 

124 async def _on_market_closed(self, latest_trading_date: str) -> None: 

125 """장 마감 후 콜백: 해당 거래일의 수집이 필요하면 실행.""" 

126 if self._last_collected_date != latest_trading_date: 

127 await self._collect_all_ohlcv() 

128 

129 async def force_collect(self) -> None: 

130 """강제 전체 수집: skip 조건을 무시하고 모든 종목을 API 재호출한다. 

131 

132 - 최초 설치(로컬 DB 없음) 또는 다른 머신으로 이전 시 전체 백필 보장 

133 - 중간 날짜 누락 등 데이터 정합성이 의심될 때 사용 

134 """ 

135 self._logger.info("OhlcvUpdateTask 강제 수집 요청") 

136 await self._collect_all_ohlcv(force=True) 

137 

138 # ── 전체 종목 OHLCV 수집 ──────────────────────────────── 

139 

140 async def _collect_all_ohlcv(self, force: bool = False) -> None: 

141 """전체 종목 OHLCV를 수집하여 DB에 저장한다. 

142 

143 Args: 

144 force: True이면 skip 조건(count/latest_date)을 무시하고 전 종목 API 재호출. 

145 """ 

146 if not force and self._mcs and await self._mcs.is_market_open_now(): 

147 self._logger.info("장 운영 중이므로 OHLCV 수집을 건너뜁니다.") 

148 return 

149 

150 if self._is_collecting: 

151 self._logger.info("OHLCV 수집 이미 진행 중 — 스킵") 

152 return 

153 

154 t_start_total = self._pm.start_timer() 

155 self._is_collecting = True 

156 start_time = time.time() 

157 

158 try: 

159 # 기준일 확인 

160 target_date = None 

161 if self._mcs: 161 ↛ 164line 161 didn't jump to line 164 because the condition on line 161 was always true

162 target_date = await self._mcs.get_latest_trading_date() 

163 

164 if not target_date: 

165 self._logger.error("최근 거래일을 확인할 수 없어 OHLCV 수집을 중단합니다.") 

166 return 

167 

168 if not force and self._last_collected_date == target_date: 

169 self._logger.info(f"이미 {target_date} OHLCV 수집 완료 — 스킵") 

170 return 

171 

172 self._logger.info(f"전체 종목 OHLCV 수집 시작 (기준일: {target_date})") 

173 self._progress = { 

174 "running": True, "force": force, "processed": 0, "total": 0, 

175 "updated": 0, "skipped": 0, "elapsed": 0.0, 

176 } 

177 all_stocks = self._load_all_stocks() 

178 total = len(all_stocks) 

179 self._progress["total"] = total 

180 self._logger.info(f"OHLCV 수집: 전체 {total}개 종목 순회 시작") 

181 

182 processed = 0 

183 updated = 0 

184 skipped = 0 

185 

186 for chunk in _chunked(all_stocks, self.API_CHUNK_SIZE): 

187 # suspend 체크포인트 

188 await self._suspend_event.wait() 

189 

190 # 병렬 처리 

191 tasks = [ 

192 self._update_stock_ohlcv(code, target_date, force=force) 

193 for code, _, _ in chunk 

194 ] 

195 results = await asyncio.gather(*tasks, return_exceptions=True) 

196 

197 chunk_had_api_call = False 

198 for result in results: 

199 if result is True: 

200 updated += 1 

201 chunk_had_api_call = True 

202 elif result is False: 

203 skipped += 1 

204 # None은 오류 — 카운트 제외 

205 

206 processed += len(chunk) 

207 elapsed = time.time() - start_time 

208 self._progress.update({ 

209 "processed": processed, 

210 "updated": updated, 

211 "skipped": skipped, 

212 "elapsed": round(elapsed, 1), 

213 }) 

214 

215 if processed % 100 == 0 or processed >= total: 

216 self._logger.info( 

217 f"OHLCV 수집 진행: {processed}/{total} " 

218 f"({processed / total * 100:.1f}%) " 

219 f"| 갱신: {updated} | 스킵: {skipped} | 소요: {elapsed:.1f}s" 

220 ) 

221 

222 # API 호출이 있었던 청크만 rate limit 대기 

223 if chunk_had_api_call: 

224 await asyncio.sleep(self.CHUNK_SLEEP_SEC) 

225 else: 

226 await asyncio.sleep(0) # 이벤트 루프 양보만 

227 

228 # 완료 처리 

229 self._last_collected_date = target_date 

230 elapsed = time.time() - start_time 

231 self._logger.info( 

232 f"전체 종목 OHLCV 수집 완료: 갱신 {updated}개 / 스킵 {skipped}개, " 

233 f"소요: {elapsed:.1f}s" 

234 ) 

235 self._pm.log_timer( 

236 "OhlcvUpdateTask._collect_all_ohlcv", 

237 t_start_total, threshold=10.0, 

238 ) 

239 if self._ns: 

240 await self._ns.emit( 

241 NotificationCategory.BACKGROUND, NotificationLevel.INFO, "전체 종목 OHLCV 수집 완료", 

242 f"갱신 {updated}개, 소요: {elapsed:.1f}초", 

243 ) 

244 

245 except Exception as e: 

246 self._logger.error(f"OHLCV 수집 실패: {e}", exc_info=True) 

247 if self._ns: 247 ↛ 248line 247 didn't jump to line 248 because the condition on line 247 was never true

248 await self._ns.emit(NotificationCategory.BACKGROUND, NotificationLevel.ERROR, "OHLCV 수집 실패", str(e)) 

249 finally: 

250 self._is_collecting = False 

251 self._progress["running"] = False 

252 

253 # ── 내부 헬퍼 ───────────────────────────────────────── 

254 

255 async def _update_stock_ohlcv( 

256 self, code: str, target_date: str, force: bool = False, 

257 ) -> Optional[bool]: 

258 """단일 종목 OHLCV를 필요 시에만 API 호출하여 업데이트한다. 

259 

260 DB 상태를 먼저 조회하여: 

261 - 당일 데이터가 이미 존재하면 → 스킵 (False) 

262 (역사 데이터는 최초 실행 시 full backfill로 채워지며, 이후에는 force collect로 복구) 

263 - 당일 데이터 없으면 → get_ohlcv() 호출 후 DB 저장 (True) 

264 

265 Args: 

266 force: True이면 skip 조건을 무시하고 무조건 API 호출. 

267 

268 Returns: 

269 True - API 호출 후 갱신 성공 

270 False - 이미 최신 상태여서 스킵 

271 None - 오류 발생 

272 """ 

273 try: 

274 if not force: 

275 summary = await self._stock_repo.get_ohlcv_summary(code) 

276 latest_date = summary["latest_date"] 

277 

278 # 당일 캔들이 이미 존재하면 API 불필요 

279 if latest_date == target_date: 

280 return False 

281 

282 # get_ohlcv: DB에 없는 구간은 자동으로 API 조회 후 StockRepository에 upsert 

283 resp = await self._stock_query_service.get_ohlcv(code, caller="OhlcvUpdateTask") 

284 if resp and resp.rt_cd == ErrorCode.SUCCESS.value: 

285 return True 

286 return None 

287 

288 except asyncio.CancelledError: 

289 raise 

290 except Exception as e: 

291 self._logger.warning(f"OHLCV 업데이트 실패 ({code}): {e}") 

292 return None 

293 

294 def _load_all_stocks(self) -> List[tuple]: 

295 """StockCodeRepository에서 KOSPI/KOSDAQ 전체 종목 로드 (ETF/우선주 제외).""" 

296 all_stocks = [] 

297 for _, row in self.stock_code_repository.df.iterrows(): 

298 code = row.get("종목코드", "") 

299 name = row.get("종목명", "") 

300 market = row.get("시장구분", "") 

301 

302 if not code: 302 ↛ 303line 302 didn't jump to line 303 because the condition on line 302 was never true

303 continue 

304 if any(name.startswith(p) for p in _ETF_PREFIXES): 

305 continue 

306 if code[-1] != '0': 

307 continue 

308 if "스팩" in name: 

309 continue 

310 if market in ("KOSPI", "KOSDAQ"): 

311 all_stocks.append((code, name, market)) 

312 return all_stocks 

313 

314 def get_progress(self) -> Dict: 

315 """수집 진행률 반환.""" 

316 return dict(self._progress)