Coverage for task / background / after_market / daily_price_collector_task.py: 71%
315 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
1# task/background/daily_price_collector_task.py
2"""
3장 마감 후 전체 종목 현재가+펀더멘털을 수집하여 StockRepository에 저장하는 백그라운드 태스크.
4get_current_price API를 사용하여 종목별 50+ 필드(시가/고가/저가/현재가/PER/PBR 등)를 수집한다.
5"""
6import asyncio
7import logging
8import time
9import pandas as pd
10import FinanceDataReader as fdr
12from typing import Dict, List, Optional, TYPE_CHECKING
13from common.types import ErrorCode
14from core.performance_profiler import PerformanceProfiler
15from core.market_clock import MarketClock
16from task.background.after_market.after_market_task_base import AfterMarketTask
17from interfaces.schedulable_task import TaskState
18from repositories.stock_repository import StockRepository
19from repositories.stock_code_repository import StockCodeRepository
20from services.market_calendar_service import MarketCalendarService
21from services.notification_service import NotificationService, NotificationCategory, NotificationLevel
23if TYPE_CHECKING: 23 ↛ 24line 23 didn't jump to line 24 because the condition on line 23 was never true
24 from services.stock_query_service import StockQueryService
27def _chunked(lst, size):
28 for i in range(0, len(lst), size):
29 yield lst[i:i + size]
32# ETF/ETN 브랜드명 접두사 (OhlcvUpdateTask와 동일)
33_ETF_PREFIXES = (
34 "KODEX", "TIGER", "KBSTAR", "ARIRANG", "SOL", "ACE",
35 "HANARO", "KOSEF", "PLUS", "TIMEFOLIO", "WON", "FOCUS",
36 "VITA", "TREX", "MASTER", "WOORI", "KINDEX",
37)
40class DailyPriceCollectorTask(AfterMarketTask):
41 """장 마감 후 전체 종목 현재가+펀더멘털을 수집하여 StockRepository에 저장하는 백그라운드 태스크."""
43 API_CHUNK_SIZE = 8
44 CHUNK_SLEEP_SEC = 1.1
45 DB_UPSERT_BATCH_SIZE = 500
46 # 검증의 견고성을 위해 시장 대표성을 띄는 다수 종목(삼성전자, SK하이닉스, NAVER, 현대차, 셀트리온) 지정
47 CANARY_STOCKS = ["005930", "000660", "035420", "005380", "068270"]
49 def __init__(
50 self,
51 stock_query_service: "StockQueryService",
52 stock_code_repository: StockCodeRepository,
53 stock_repo: StockRepository,
54 market_calendar_service: Optional[MarketCalendarService] = None,
55 market_clock: Optional[MarketClock] = None,
56 performance_profiler: Optional[PerformanceProfiler] = None,
57 notification_service: Optional[NotificationService] = None,
58 logger=None,
59 ):
60 super().__init__(
61 mcs=market_calendar_service,
62 market_clock=market_clock,
63 logger=logger or logging.getLogger(__name__),
64 )
65 self._stock_query_service = stock_query_service
66 self.stock_code_repository = stock_code_repository
67 self._stock_repo = stock_repo
68 self._pm = performance_profiler or PerformanceProfiler(enabled=False)
69 self._ns = notification_service
70 self._suspend_event: asyncio.Event = asyncio.Event()
71 self._suspend_event.set() # 초기에는 실행 가능
73 # 수집 상태
74 self._is_collecting: bool = False
75 self._last_collected_date: Optional[str] = None
76 self._progress: Dict = {
77 "running": False,
78 "processed": 0,
79 "total": 0,
80 "collected": 0,
81 "elapsed": 0.0,
82 "status": "",
83 }
84 self._all_stocks_cache = None
86 # ── SchedulableTask 인터페이스 구현 ────────────────────────
88 @property
89 def task_name(self) -> str:
90 return "daily_price_collector"
92 @property
93 def _scheduler_label(self) -> str:
94 return "DailyPriceCollector"
96 async def start(self) -> None:
97 """장마감 후 자동 스케줄러 시작."""
98 if self._state == TaskState.RUNNING:
99 return
100 self._state = TaskState.RUNNING
101 self._suspend_event.set()
103 self._tasks.append(
104 asyncio.create_task(self._after_market_scheduler())
105 )
106 self._logger.info(f"DailyPriceCollectorTask 시작: {len(self._tasks)}개 태스크")
108 async def suspend(self) -> None:
109 """수집을 일시 중지한다 (chunk 사이에서 대기)."""
110 if self._state == TaskState.RUNNING:
111 self._suspend_event.clear()
112 self._state = TaskState.SUSPENDED
113 self._logger.info("DailyPriceCollectorTask 일시 중지")
115 async def resume(self) -> None:
116 """일시 중지된 수집을 재개한다."""
117 if self._state == TaskState.SUSPENDED:
118 self._suspend_event.set()
119 self._state = TaskState.RUNNING
120 self._logger.info("DailyPriceCollectorTask 재개")
122 async def _on_market_closed(self, latest_trading_date: str) -> None:
123 """장 마감 후 콜백: 해당 거래일의 수집이 필요하면 실행."""
124 if self._last_collected_date != latest_trading_date:
125 await self._collect_all_prices()
127 # ── 전체 종목 현재가 수집 ────────────────────────────
128 async def _collect_all_prices(self, force: bool = False) -> None:
129 """전체 종목 현재가+펀더멘털을 3-Tier Fallback 구조로 수집한다."""
130 if self._mcs and await self._mcs.is_market_open_now():
131 self._logger.info("장 운영 중이므로 현재가 수집을 건너뜁니다.")
132 return
134 if self._is_collecting: 134 ↛ 135line 134 didn't jump to line 135 because the condition on line 134 was never true
135 self._logger.info("현재가 수집 이미 진행 중 — 스킵")
136 return
138 target_date = await self._mcs.get_latest_trading_date() if self._mcs else None
139 if not target_date: 139 ↛ 140line 139 didn't jump to line 140 because the condition on line 139 was never true
140 self._logger.error("최근 거래일을 확인할 수 없어 현재가 수집을 중단합니다.")
141 return
143 if not force and self._last_collected_date == target_date:
144 self._logger.info(f"이미 {target_date} 현재가 수집 완료 — 스킵")
145 return
147 self._logger.info(f"전체 종목 수집 파이프라인 시작 (기준일: {target_date})")
148 self._is_collecting = True
149 start_time = time.time()
151 # 반복 조회를 피하기 위해 한 번 로드 후 캐싱
152 self._all_stocks_cache = self._load_all_stocks()
154 try:
155 # [Tier 1] pykrx 실패/검증 실패 시 FinanceDataReader 시도
156 if await self._try_collect_via_fdr(target_date, start_time):
157 await self._finish_collection(target_date, start_time, "FDR")
158 return
160 # [Tier 2] 모두 실패 시 최후의 보루 증권사 API 청크 수집
161 self._logger.warning("크롤링 모두 실패. 증권사 API(Chunk) 수집으로 Fallback 합니다.")
162 if self._ns: 162 ↛ 167line 162 didn't jump to line 167 because the condition on line 162 was always true
163 await self._ns.emit(
164 NotificationCategory.BACKGROUND, NotificationLevel.WARNING,
165 "수집 모드 전환", "크롤링 라이브러리 오류로 인해 증권사 API 일일이 수집 모드(약 10분 소요)로 동작합니다."
166 )
167 await self._collect_via_broker_api(target_date, start_time)
168 await self._finish_collection(target_date, start_time, "Broker API")
170 except Exception as e:
171 self._logger.error(f"전체 수집 파이프라인 실패: {e}", exc_info=True)
172 finally:
173 self._is_collecting = False
174 self._all_stocks_cache = None
176 # ── 2. 데이터 검증 (Sanity Check) ─────────────────────────────
178 async def _verify_crawler_data(self, df_crawled: pd.DataFrame, source_name: str) -> bool:
179 """
180 증권사 API의 확정 데이터(시가/고가/저가/종가)와 크롤링 데이터가 일치하는지 완벽 검증한다.
181 """
182 self._logger.info(f"[{source_name}] 데이터 정합성 검증(OHLC 4종) 시작...")
184 match_count = 0
185 mismatch_count = 0
187 for code in self.CANARY_STOCKS:
188 # 1. 증권사 API에서 Source of Truth 호출
189 api_resp = await self._fetch_with_retry(code)
190 if not api_resp or api_resp.rt_cd != ErrorCode.SUCCESS.value:
191 self._logger.debug(f"검증용 API 호출 실패({code}) - 스킵")
192 continue
194 # API 데이터 추출 (output 뎁스 고려)
195 data = api_resp.data
196 output = data.get('output') if isinstance(data, dict) else data
198 # API 속성 추출 헬퍼
199 def _get_api_val(key):
200 val = output.get(key, 0) if isinstance(output, dict) else getattr(output, key, 0)
201 return int(val) if val else 0
203 # API: 종가(stck_prpr), 시가(stck_oprc), 고가(stck_hgpr), 저가(stck_lwpr)
204 api_close = _get_api_val('stck_prpr')
205 api_open = _get_api_val('stck_oprc')
206 api_high = _get_api_val('stck_hgpr')
207 api_low = _get_api_val('stck_lwpr')
209 # 2. 크롤링 데이터(DataFrame)에서 추출
210 try:
211 # 인덱스가 종목코드인지, 컬럼에 종목코드가 있는지 확인
212 if code in df_crawled.index: 212 ↛ 215line 212 didn't jump to line 215 because the condition on line 212 was always true
213 row = df_crawled.loc[code]
214 else:
215 matches = df_crawled[df_crawled['종목코드'] == code]
216 if matches.empty:
217 self._logger.debug(f"크롤링 데이터에 검증용 종목({code}) 없음 - 스킵")
218 continue
219 row = matches.iloc[0]
221 # 크롤링 데이터 추출 헬퍼 (pykrx, FDR 호환)
222 def _get_crawled_val(cols):
223 for col in cols: 223 ↛ 226line 223 didn't jump to line 226 because the loop on line 223 didn't complete
224 if col in row.index and pd.notna(row[col]): 224 ↛ 223line 224 didn't jump to line 223 because the condition on line 224 was always true
225 return int(row[col])
226 return 0
228 crawled_close = _get_crawled_val(['종가', 'Close'])
229 crawled_open = _get_crawled_val(['시가', 'Open'])
230 crawled_high = _get_crawled_val(['고가', 'High'])
231 crawled_low = _get_crawled_val(['저가', 'Low'])
233 except Exception as e:
234 self._logger.debug(f"크롤링 데이터 파싱 예외({code}): {e} - 스킵")
235 continue
237 # 3. 시/고/저/종 4개 값 모두 대조 (단 하나라도 다르면 실패)
238 if (api_close != crawled_close or
239 api_open != crawled_open or
240 api_high != crawled_high or
241 api_low != crawled_low):
243 self._logger.warning(
244 f"데이터 불일치 감지! ({code})\n"
245 f" - API : 시({api_open}) 고({api_high}) 저({api_low}) 종({api_close})\n"
246 f" - {source_name.ljust(6)}: 시({crawled_open}) 고({crawled_high}) 저({crawled_low}) 종({crawled_close})"
247 )
248 mismatch_count += 1
249 else:
250 match_count += 1
252 if match_count == 0:
253 self._logger.warning(f"[{source_name}] 검증 가능한 종목이 없어 실패 처리합니다.")
254 return False
256 # 2개 이상 불일치 시에만 Fallback (단일 종목 거래 정지 등 예외 대응)
257 if mismatch_count >= 2: 257 ↛ 258line 257 didn't jump to line 258 because the condition on line 257 was never true
258 self._logger.warning(f"[{source_name}] 데이터 불일치 종목 다수({mismatch_count}개) 발생. 검증 실패.")
259 return False
261 self._logger.info(f"[{source_name}] 데이터 정합성 검증 통과 (일치: {match_count}, 불일치: {mismatch_count})")
262 return True
264 # ── 3. 수집 티어 구현 ─────────────────────────────────────────
265 async def _try_collect_via_fdr(self, target_date: str, start_time: float) -> bool:
266 """[Tier 2] FinanceDataReader를 활용한 수집 (차선책)"""
267 self._progress["status"] = "FinanceDataReader 일괄 수집 중..."
269 def _fetch_fdr_sync():
270 # FDR은 당일 시세(OHLCV) 전체 리스트를 가져오는 기능을 지원
271 df_fdr = fdr.StockListing('KRX')
272 if df_fdr.empty:
273 raise ValueError("FDR 데이터가 비어있습니다.")
275 # FDR의 경우 'Close' 컬럼을 '종가'로 맞추어 _verify_crawler_data와 호환되게 함
276 df_fdr.rename(columns={'Code': '종목코드', 'Close': '종가'}, inplace=True)
277 return df_fdr
279 try:
280 df_fdr = await asyncio.to_thread(_fetch_fdr_sync)
282 if not await self._verify_crawler_data(df_fdr, "FDR"): 282 ↛ 283line 282 didn't jump to line 283 because the condition on line 282 was never true
283 return False
285 formatted_records = self._format_dataframe_to_records(df_fdr)
286 await self._save_bulk_to_db_with_progress(target_date, formatted_records, start_time)
287 return True
288 except Exception as e:
289 self._logger.warning(f"FDR 수집 중 예외 발생 (Fallback 시도): {e}")
290 return False
292 async def _collect_via_broker_api(self, target_date: str, start_time: float) -> None:
293 """[Tier 2] 증권사 API 청크 기반 수집 로직 (약 10분 소요)"""
294 all_stocks = getattr(self, "_all_stocks_cache", None) or self._load_all_stocks()
295 total = len(all_stocks)
297 # 메인 오케스트레이터에서 넘겨받은 progress 정보를 API 수집 모드에 맞게 갱신
298 self._progress["total"] = total
299 self._progress["status"] = "증권사 API 수집 중 (Fallback)..."
301 collected_records: List[Dict] = []
302 db_upsert_buffer: List[Dict] = []
303 processed = 0
305 for chunk in _chunked(all_stocks, self.API_CHUNK_SIZE):
306 # 일시정지(suspend) 체크
307 await self._suspend_event.wait()
309 # 1. 8개씩 병렬 API 호출
310 tasks = [self._fetch_with_retry(code) for code, _, _ in chunk]
311 responses = await asyncio.gather(*tasks, return_exceptions=True)
313 # 2. 응답 데이터 추출 및 버퍼에 담기
314 batch_records = []
315 for (code, name, market), resp in zip(chunk, responses):
316 if isinstance(resp, Exception): 316 ↛ 317line 316 didn't jump to line 317 because the condition on line 316 was never true
317 continue
318 record = self._extract_broker_api_record(code, name, market, resp)
319 if record: 319 ↛ 315line 319 didn't jump to line 315 because the condition on line 319 was always true
320 batch_records.append(record)
322 if batch_records: 322 ↛ 327line 322 didn't jump to line 327 because the condition on line 322 was always true
323 db_upsert_buffer.extend(batch_records)
324 collected_records.extend(batch_records)
326 # 3. DB 배치 저장 (500개 도달 시)
327 if len(db_upsert_buffer) >= self.DB_UPSERT_BATCH_SIZE: 327 ↛ 334line 327 didn't jump to line 334 because the condition on line 327 was always true
328 await self._stock_repo.upsert_daily_snapshot(target_date, db_upsert_buffer)
329 db_upsert_buffer.clear()
331 # ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
332 # 4. ★ 핵심: 매 청크(8개)마다 진행률 즉시 업데이트
333 # ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
334 processed += len(chunk)
335 elapsed = time.time() - start_time
336 self._progress.update({
337 "processed": processed,
338 "collected": len(collected_records),
339 "elapsed": round(elapsed, 1)
340 })
342 # 서버 로그용 출력 (50개 단위 또는 마지막)
343 if processed % 50 == 0 or processed >= total: 343 ↛ 351line 343 didn't jump to line 351 because the condition on line 343 was always true
344 self._logger.info(
345 f"[Broker API] 진행: {processed}/{total} "
346 f"({processed / total * 100:.1f}%) "
347 f"| 수집: {len(collected_records)} | 소요: {elapsed:.1f}s"
348 )
350 # API Rate Limit 회피용 Sleep (1.1초)
351 if not all(getattr(r, '_cache_hit', False) for r in responses if not isinstance(r, Exception)): 351 ↛ 305line 351 didn't jump to line 305 because the condition on line 351 was always true
352 await asyncio.sleep(self.CHUNK_SLEEP_SEC)
354 # 루프 종료 후 남은 버퍼 최종 저장
355 if db_upsert_buffer: 355 ↛ 356line 355 didn't jump to line 356 because the condition on line 355 was never true
356 await self._stock_repo.upsert_daily_snapshot(target_date, db_upsert_buffer)
358 # ── 4. 완료 처리 헬퍼 ─────────────────────────────────────────
360 async def _finish_collection(self, target_date: str, start_time: float, source: str) -> None:
361 """수집 완료 후 공통 후처리 로직"""
362 self._last_collected_date = target_date
363 elapsed = time.time() - start_time
364 self._logger.info(f"전체 종목 수집 완료 (Source: {source}), 소요: {elapsed:.1f}s")
365 if self._ns: 365 ↛ exitline 365 didn't return from function '_finish_collection' because the condition on line 365 was always true
366 await self._ns.emit(
367 NotificationCategory.BACKGROUND, NotificationLevel.INFO,
368 "전체 종목 현재가 수집 완료",
369 f"소스: {source} / 소요: {elapsed:.1f}초"
370 )
372 # # ── 내부 헬퍼 ─────────────────────────────────────────
374 async def _fetch_with_retry(self, code: str):
375 """get_current_price API 호출 + 재시도."""
376 max_retries = 3
377 delay = 1.0
378 for attempt in range(max_retries):
379 try:
380 resp = await self._stock_query_service.get_current_price(code, count_stats=False, caller="DailyPriceCollectorTask")
381 if resp and resp.rt_cd == ErrorCode.SUCCESS.value:
382 return resp
383 error_msg = resp.msg1 if resp else "응답 없음"
384 self._logger.warning(
385 f"현재가 조회 실패 (시도 {attempt + 1}/{max_retries}): "
386 f"{code}, 사유: {error_msg}"
387 )
388 except Exception as e:
389 self._logger.error(
390 f"현재가 조회 예외 (시도 {attempt + 1}/{max_retries}): "
391 f"{code}, 오류: {e}"
392 )
393 if attempt < max_retries - 1:
394 await asyncio.sleep(delay)
395 delay *= 1.5
396 return None
398 @staticmethod
399 def _extract_broker_api_record(
400 code: str, name: str, market: str, resp
401 ) -> Optional[Dict]:
402 """API 응답에서 DB 저장용 레코드를 추출한다."""
403 if not resp: 403 ↛ 404line 403 didn't jump to line 404 because the condition on line 403 was never true
404 return None
406 try:
407 data = resp.data
408 if not data: 408 ↛ 409line 408 didn't jump to line 409 because the condition on line 408 was never true
409 return None
411 # get_current_price의 응답 구조: data = {'output': ResStockFullInfoApiOutput}
412 output = data.get('output') if isinstance(data, dict) else data
413 if not output: 413 ↛ 414line 413 didn't jump to line 414 because the condition on line 413 was never true
414 return None
416 def _safe_int(val, default=0):
417 try:
418 return int(val) if val else default
419 except (ValueError, TypeError):
420 return default
422 def _safe_float(val, default=0.0):
423 try:
424 return float(val) if val else default
425 except (ValueError, TypeError):
426 return default
428 # ResStockFullInfoApiOutput 필드 → DB 레코드 변환
429 # output이 Pydantic 모델이면 getattr, dict면 get
430 _get = (
431 (lambda k, d=None: getattr(output, k, d))
432 if hasattr(output, 'stck_prpr')
433 else (lambda k, d=None: output.get(k, d))
434 )
436 return {
437 "code": code,
438 "name": name,
439 "current_price": _safe_int(_get("stck_prpr")),
440 "open_price": _safe_int(_get("stck_oprc")),
441 "high_price": _safe_int(_get("stck_hgpr")),
442 "low_price": _safe_int(_get("stck_lwpr")),
443 "prev_close": _safe_int(_get("stck_sdpr")),
444 "change_price": _safe_int(_get("prdy_vrss")),
445 "change_sign": _get("prdy_vrss_sign", ""),
446 "change_rate": _get("prdy_ctrt", "0"),
447 "volume": _safe_int(_get("acml_vol")),
448 "trading_value": _safe_int(_get("acml_tr_pbmn")),
449 "market_cap": _safe_int(_get("hts_avls")),
450 "per": _safe_float(_get("per")),
451 "pbr": _safe_float(_get("pbr")),
452 "eps": _safe_float(_get("eps")),
453 "w52_high": _safe_int(_get("w52_hgpr")),
454 "w52_low": _safe_int(_get("w52_lwpr")),
455 "market": market,
456 }
457 except Exception:
458 return None
460 def _load_all_stocks(self) -> List[tuple]:
461 """StockCodeRepository에서 KOSPI/KOSDAQ 전체 종목 로드 (ETF/우선주 제외).
463 iterrows() 대신 벡터화 마스킹을 사용하여 수십 배 빠르게 필터링한다.
464 """
465 df = self.stock_code_repository.df
466 codes = df["종목코드"].astype(str)
467 names = df["종목명"].astype(str)
469 mask = (
470 codes.ne("")
471 & df["시장구분"].isin(("KOSPI", "KOSDAQ"))
472 & (codes.str[-1] == "0")
473 & ~names.str.startswith(_ETF_PREFIXES)
474 & ~names.str.contains("스팩", na=False)
475 )
476 filtered = df[mask]
477 return list(zip(filtered["종목코드"], filtered["종목명"], filtered["시장구분"]))
479 def get_progress(self) -> Dict:
480 """수집 진행률 반환."""
481 return dict(self._progress)
483 async def force_collect(self) -> None:
484 """강제 수집: skip 조건을 무시하고 전 종목 현재가를 API 재호출한다."""
485 self._logger.info("DailyPriceCollectorTask 강제 수집 요청")
486 await self._collect_all_prices(force=True)
488 def _format_dataframe_to_records(self, df: pd.DataFrame) -> List[Dict]:
489 """
490 pykrx 또는 FinanceDataReader에서 수집한 DataFrame을
491 기존 _extract_broker_api_record와 동일한 형태의 DB 레코드 딕셔너리 리스트로 변환한다.
492 """
493 records = []
494 if df is None or df.empty: 494 ↛ 495line 494 didn't jump to line 495 because the condition on line 494 was never true
495 return records
497 # 1. DB 기준 종목 메타데이터 (이름, 시장구분) 맵핑용 테이블 생성
498 # pykrx의 경우 종목명이나 시장구분 컬럼이 누락되어 있을 수 있으므로 자체 DB 기준으로 매핑합니다.
499 # 또한, 이 딕셔너리에 없는 종목(ETF, 스팩, 우선주 등)은 자연스럽게 필터링됩니다.
500 all_stocks = getattr(self, "_all_stocks_cache", None) or self._load_all_stocks()
501 stock_meta = {
502 code: {"name": name, "market": market}
503 for code, name, market in all_stocks
504 }
506 # 2. 컬럼명 유연성 확보를 위한 헬퍼 함수
507 def _get_val(row, possible_cols, default_val):
508 for col in possible_cols:
509 if col in row.index and pd.notna(row[col]):
510 return row[col]
511 return default_val
513 # 3. DataFrame 순회 및 레코드 추출
514 for _, row in df.iterrows():
515 # 종목코드 추출 (문자열 변환 및 6자리 패딩 보장)
516 raw_code = _get_val(row, ['종목코드', 'Code'], "")
517 code = str(raw_code).zfill(6)
519 if not code or code == "000000":
520 continue
522 # 자체 DB 필터링을 통과한 종목만 수집
523 meta = stock_meta.get(code)
524 if not meta:
525 continue
527 try:
528 # 숫자형 데이터 안전 추출 (pykrx와 FDR의 컬럼명 모두 지원)
529 current_price = int(_get_val(row, ['종가', 'Close'], 0))
530 open_price = int(_get_val(row, ['시가', 'Open'], 0))
531 high_price = int(_get_val(row, ['고가', 'High'], 0))
532 low_price = int(_get_val(row, ['저가', 'Low'], 0))
533 volume = int(_get_val(row, ['거래량', 'Volume'], 0))
534 trading_value = int(_get_val(row, ['거래대금', 'Amount'], 0))
535 market_cap = int(_get_val(row, ['시가총액', 'Marcap'], 0))
537 per = float(_get_val(row, ['PER'], 0.0))
538 pbr = float(_get_val(row, ['PBR'], 0.0))
539 eps = float(_get_val(row, ['EPS'], 0.0))
541 # 등락 데이터 추정 및 계산
542 change_price = int(_get_val(row, ['대비', 'Changes'], 0))
543 raw_change_rate = _get_val(row, ['등락률', 'ChagesRatio', 'ChangeRatio'], 0.0)
544 change_rate = str(round(float(raw_change_rate), 2))
546 # 전일 종가 계산 (현재가 - 대비)
547 prev_close = current_price - change_price
549 # 등락 부호 결정 (API 호환성: 2=상승, 3=보합, 5=하락)
550 if change_price > 0:
551 change_sign = "2"
552 elif change_price < 0: 552 ↛ 555line 552 didn't jump to line 555 because the condition on line 552 was always true
553 change_sign = "5"
554 else:
555 change_sign = "3"
557 record = {
558 "code": code,
559 "name": meta["name"],
560 "current_price": current_price,
561 "open_price": open_price,
562 "high_price": high_price,
563 "low_price": low_price,
564 "prev_close": prev_close,
565 "change_price": change_price,
566 "change_sign": change_sign,
567 "change_rate": change_rate,
568 "volume": volume,
569 "trading_value": trading_value,
570 "market_cap": market_cap,
571 "per": per,
572 "pbr": pbr,
573 "eps": eps,
574 # 일괄 수집 데이터에서는 52주 신고/신저가를 즉각 구하기 어려우므로 None 처리
575 # (DB Upsert 로직에서 NULL(None)이면 기존 값을 유지하도록 설계됨)
576 "w52_high": None,
577 "w52_low": None,
578 "market": meta["market"],
579 }
580 records.append(record)
582 except (ValueError, TypeError) as e:
583 self._logger.debug(f"데이터 파싱 오류 (종목: {code}): {e}")
584 continue
586 return records
588 async def _save_bulk_to_db_with_progress(self, target_date: str, records: List[Dict], start_time: float) -> None:
589 """크롤링된 전체 레코드를 DB에 Batch 단위로 저장하며 진행률을 업데이트한다."""
590 total_records = len(records)
591 if total_records == 0: 591 ↛ 592line 591 didn't jump to line 592 because the condition on line 591 was never true
592 return
594 # 필터링 후의 실제 유효 레코드 수로 전체 모수(total) 보정
595 self._progress["total"] = total_records
596 processed = 0
598 for i in range(0, total_records, self.DB_UPSERT_BATCH_SIZE):
599 batch = records[i:i + self.DB_UPSERT_BATCH_SIZE]
601 # DB 저장
602 await self._stock_repo.upsert_daily_snapshot(target_date, batch)
604 # 진행률 업데이트
605 processed += len(batch)
606 elapsed = time.time() - start_time
607 self._progress.update({
608 "processed": processed,
609 "collected": processed,
610 "elapsed": round(elapsed, 1),
611 "status": "DB 저장 중..."
612 })
614 # 다른 비동기 태스크(API 응답 등)가 블로킹되지 않도록 제어권 양보
615 await asyncio.sleep(0.01)