Coverage for task / background / after_market / ohlcv_update_task.py: 96%
159 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
1# task/background/ohlcv_update_task.py
2"""
3장 마감 후 전체 종목의 OHLCV를 DB에 저장하는 백그라운드 태스크.
4- 당일 OHLCV 및 전략에 필요한 최대 600일치 역사 데이터를 유지한다.
5- DB에 이미 존재하는 날짜는 API를 호출하지 않아 불필요한 중복 요청을 방지한다.
6"""
7import asyncio
8import logging
9import time
10from typing import List, Dict, Optional, TYPE_CHECKING
12from common.types import ErrorCode
13from core.performance_profiler import PerformanceProfiler
14from core.market_clock import MarketClock
15from task.background.after_market.after_market_task_base import AfterMarketTask
16from interfaces.schedulable_task import TaskState
17from repositories.stock_repository import StockRepository
18from repositories.stock_code_repository import StockCodeRepository
19from services.market_calendar_service import MarketCalendarService
20from services.notification_service import NotificationService, NotificationCategory, NotificationLevel
22if TYPE_CHECKING: 22 ↛ 23line 22 didn't jump to line 23 because the condition on line 22 was never true
23 from services.stock_query_service import StockQueryService
26def _chunked(lst, size):
27 for i in range(0, len(lst), size):
28 yield lst[i:i + size]
31# ETF/ETN 브랜드명 접두사 (DailyPriceCollectorTask와 동일)
32_ETF_PREFIXES = (
33 "KODEX", "TIGER", "KBSTAR", "ARIRANG", "SOL", "ACE",
34 "HANARO", "KOSEF", "PLUS", "TIMEFOLIO", "WON", "FOCUS",
35 "VITA", "TREX", "MASTER", "WOORI", "KINDEX",
36)
39class OhlcvUpdateTask(AfterMarketTask):
40 """장 마감 후 전체 종목의 OHLCV를 수집하여 DB에 저장하는 백그라운드 태스크.
42 - DB에 이미 TARGET_OHLCV_DAYS일치 데이터가 있고 당일 날짜까지 갱신된 종목은 스킵.
43 - 데이터가 부족하거나 당일 캔들이 없는 종목만 API를 호출하여 저장.
44 - StockQueryService.get_ohlcv()가 내부적으로 누락 구간만 API 호출 후 DB에 upsert하므로
45 중복된 날짜는 자동으로 INSERT OR REPLACE 처리된다.
46 """
48 TARGET_OHLCV_DAYS = 600 # 전략에서 최대 600일치를 사용하므로 동일하게 유지
49 API_CHUNK_SIZE = 4 # 병렬 처리 종목 수 (OHLCV는 현재가보다 API 비용이 높음)
50 CHUNK_SLEEP_SEC = 1.5 # 청크 간 대기 시간 (API 레이트 리밋 준수)
52 def __init__(
53 self,
54 stock_query_service: "StockQueryService",
55 stock_code_repository: StockCodeRepository,
56 stock_repo: StockRepository,
57 market_calendar_service: Optional[MarketCalendarService] = None,
58 market_clock: Optional[MarketClock] = None,
59 performance_profiler: Optional[PerformanceProfiler] = None,
60 notification_service: Optional[NotificationService] = None,
61 logger=None,
62 ):
63 super().__init__(
64 mcs=market_calendar_service,
65 market_clock=market_clock,
66 logger=logger or logging.getLogger(__name__),
67 )
68 self._stock_query_service = stock_query_service
69 self.stock_code_repository = stock_code_repository
70 self._stock_repo = stock_repo
71 self._pm = performance_profiler or PerformanceProfiler(enabled=False)
72 self._ns = notification_service
73 self._suspend_event: asyncio.Event = asyncio.Event()
74 self._suspend_event.set() # 초기에는 실행 가능
76 # 수집 상태
77 self._is_collecting: bool = False
78 self._last_collected_date: Optional[str] = None
79 self._progress: Dict = {
80 "running": False,
81 "processed": 0,
82 "total": 0,
83 "updated": 0,
84 "skipped": 0,
85 "elapsed": 0.0,
86 }
88 # ── SchedulableTask 인터페이스 구현 ────────────────────────
90 @property
91 def task_name(self) -> str:
92 return "ohlcv_update"
94 @property
95 def _scheduler_label(self) -> str:
96 return "OhlcvUpdate"
98 async def start(self) -> None:
99 """장마감 후 자동 스케줄러 시작."""
100 if self._state == TaskState.RUNNING:
101 return
102 self._state = TaskState.RUNNING
103 self._suspend_event.set()
105 self._tasks.append(
106 asyncio.create_task(self._after_market_scheduler())
107 )
108 self._logger.info(f"OhlcvUpdateTask 시작: {len(self._tasks)}개 태스크")
110 async def suspend(self) -> None:
111 """수집을 일시 중지한다 (chunk 사이에서 대기)."""
112 if self._state == TaskState.RUNNING:
113 self._suspend_event.clear()
114 self._state = TaskState.SUSPENDED
115 self._logger.info("OhlcvUpdateTask 일시 중지")
117 async def resume(self) -> None:
118 """일시 중지된 수집을 재개한다."""
119 if self._state == TaskState.SUSPENDED:
120 self._suspend_event.set()
121 self._state = TaskState.RUNNING
122 self._logger.info("OhlcvUpdateTask 재개")
124 async def _on_market_closed(self, latest_trading_date: str) -> None:
125 """장 마감 후 콜백: 해당 거래일의 수집이 필요하면 실행."""
126 if self._last_collected_date != latest_trading_date:
127 await self._collect_all_ohlcv()
129 async def force_collect(self) -> None:
130 """강제 전체 수집: skip 조건을 무시하고 모든 종목을 API 재호출한다.
132 - 최초 설치(로컬 DB 없음) 또는 다른 머신으로 이전 시 전체 백필 보장
133 - 중간 날짜 누락 등 데이터 정합성이 의심될 때 사용
134 """
135 self._logger.info("OhlcvUpdateTask 강제 수집 요청")
136 await self._collect_all_ohlcv(force=True)
138 # ── 전체 종목 OHLCV 수집 ────────────────────────────────
140 async def _collect_all_ohlcv(self, force: bool = False) -> None:
141 """전체 종목 OHLCV를 수집하여 DB에 저장한다.
143 Args:
144 force: True이면 skip 조건(count/latest_date)을 무시하고 전 종목 API 재호출.
145 """
146 if not force and self._mcs and await self._mcs.is_market_open_now():
147 self._logger.info("장 운영 중이므로 OHLCV 수집을 건너뜁니다.")
148 return
150 if self._is_collecting:
151 self._logger.info("OHLCV 수집 이미 진행 중 — 스킵")
152 return
154 t_start_total = self._pm.start_timer()
155 self._is_collecting = True
156 start_time = time.time()
158 try:
159 # 기준일 확인
160 target_date = None
161 if self._mcs: 161 ↛ 164line 161 didn't jump to line 164 because the condition on line 161 was always true
162 target_date = await self._mcs.get_latest_trading_date()
164 if not target_date:
165 self._logger.error("최근 거래일을 확인할 수 없어 OHLCV 수집을 중단합니다.")
166 return
168 if not force and self._last_collected_date == target_date:
169 self._logger.info(f"이미 {target_date} OHLCV 수집 완료 — 스킵")
170 return
172 self._logger.info(f"전체 종목 OHLCV 수집 시작 (기준일: {target_date})")
173 self._progress = {
174 "running": True, "force": force, "processed": 0, "total": 0,
175 "updated": 0, "skipped": 0, "elapsed": 0.0,
176 }
177 all_stocks = self._load_all_stocks()
178 total = len(all_stocks)
179 self._progress["total"] = total
180 self._logger.info(f"OHLCV 수집: 전체 {total}개 종목 순회 시작")
182 processed = 0
183 updated = 0
184 skipped = 0
186 for chunk in _chunked(all_stocks, self.API_CHUNK_SIZE):
187 # suspend 체크포인트
188 await self._suspend_event.wait()
190 # 병렬 처리
191 tasks = [
192 self._update_stock_ohlcv(code, target_date, force=force)
193 for code, _, _ in chunk
194 ]
195 results = await asyncio.gather(*tasks, return_exceptions=True)
197 chunk_had_api_call = False
198 for result in results:
199 if result is True:
200 updated += 1
201 chunk_had_api_call = True
202 elif result is False:
203 skipped += 1
204 # None은 오류 — 카운트 제외
206 processed += len(chunk)
207 elapsed = time.time() - start_time
208 self._progress.update({
209 "processed": processed,
210 "updated": updated,
211 "skipped": skipped,
212 "elapsed": round(elapsed, 1),
213 })
215 if processed % 100 == 0 or processed >= total:
216 self._logger.info(
217 f"OHLCV 수집 진행: {processed}/{total} "
218 f"({processed / total * 100:.1f}%) "
219 f"| 갱신: {updated} | 스킵: {skipped} | 소요: {elapsed:.1f}s"
220 )
222 # API 호출이 있었던 청크만 rate limit 대기
223 if chunk_had_api_call:
224 await asyncio.sleep(self.CHUNK_SLEEP_SEC)
225 else:
226 await asyncio.sleep(0) # 이벤트 루프 양보만
228 # 완료 처리
229 self._last_collected_date = target_date
230 elapsed = time.time() - start_time
231 self._logger.info(
232 f"전체 종목 OHLCV 수집 완료: 갱신 {updated}개 / 스킵 {skipped}개, "
233 f"소요: {elapsed:.1f}s"
234 )
235 self._pm.log_timer(
236 "OhlcvUpdateTask._collect_all_ohlcv",
237 t_start_total, threshold=10.0,
238 )
239 if self._ns:
240 await self._ns.emit(
241 NotificationCategory.BACKGROUND, NotificationLevel.INFO, "전체 종목 OHLCV 수집 완료",
242 f"갱신 {updated}개, 소요: {elapsed:.1f}초",
243 )
245 except Exception as e:
246 self._logger.error(f"OHLCV 수집 실패: {e}", exc_info=True)
247 if self._ns: 247 ↛ 248line 247 didn't jump to line 248 because the condition on line 247 was never true
248 await self._ns.emit(NotificationCategory.BACKGROUND, NotificationLevel.ERROR, "OHLCV 수집 실패", str(e))
249 finally:
250 self._is_collecting = False
251 self._progress["running"] = False
253 # ── 내부 헬퍼 ─────────────────────────────────────────
255 async def _update_stock_ohlcv(
256 self, code: str, target_date: str, force: bool = False,
257 ) -> Optional[bool]:
258 """단일 종목 OHLCV를 필요 시에만 API 호출하여 업데이트한다.
260 DB 상태를 먼저 조회하여:
261 - 당일 데이터가 이미 존재하면 → 스킵 (False)
262 (역사 데이터는 최초 실행 시 full backfill로 채워지며, 이후에는 force collect로 복구)
263 - 당일 데이터 없으면 → get_ohlcv() 호출 후 DB 저장 (True)
265 Args:
266 force: True이면 skip 조건을 무시하고 무조건 API 호출.
268 Returns:
269 True - API 호출 후 갱신 성공
270 False - 이미 최신 상태여서 스킵
271 None - 오류 발생
272 """
273 try:
274 if not force:
275 summary = await self._stock_repo.get_ohlcv_summary(code)
276 latest_date = summary["latest_date"]
278 # 당일 캔들이 이미 존재하면 API 불필요
279 if latest_date == target_date:
280 return False
282 # get_ohlcv: DB에 없는 구간은 자동으로 API 조회 후 StockRepository에 upsert
283 resp = await self._stock_query_service.get_ohlcv(code, caller="OhlcvUpdateTask")
284 if resp and resp.rt_cd == ErrorCode.SUCCESS.value:
285 return True
286 return None
288 except asyncio.CancelledError:
289 raise
290 except Exception as e:
291 self._logger.warning(f"OHLCV 업데이트 실패 ({code}): {e}")
292 return None
294 def _load_all_stocks(self) -> List[tuple]:
295 """StockCodeRepository에서 KOSPI/KOSDAQ 전체 종목 로드 (ETF/우선주 제외)."""
296 all_stocks = []
297 for _, row in self.stock_code_repository.df.iterrows():
298 code = row.get("종목코드", "")
299 name = row.get("종목명", "")
300 market = row.get("시장구분", "")
302 if not code: 302 ↛ 303line 302 didn't jump to line 303 because the condition on line 302 was never true
303 continue
304 if any(name.startswith(p) for p in _ETF_PREFIXES):
305 continue
306 if code[-1] != '0':
307 continue
308 if "스팩" in name:
309 continue
310 if market in ("KOSPI", "KOSDAQ"):
311 all_stocks.append((code, name, market))
312 return all_stocks
314 def get_progress(self) -> Dict:
315 """수집 진행률 반환."""
316 return dict(self._progress)