Coverage for task / background / after_market / daily_price_collector_task.py: 71%

315 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-04 15:08 +0000

1# task/background/daily_price_collector_task.py 

2""" 

3장 마감 후 전체 종목 현재가+펀더멘털을 수집하여 StockRepository에 저장하는 백그라운드 태스크. 

4get_current_price API를 사용하여 종목별 50+ 필드(시가/고가/저가/현재가/PER/PBR 등)를 수집한다. 

5""" 

6import asyncio 

7import logging 

8import time 

9import pandas as pd 

10import FinanceDataReader as fdr 

11 

12from typing import Dict, List, Optional, TYPE_CHECKING 

13from common.types import ErrorCode 

14from core.performance_profiler import PerformanceProfiler 

15from core.market_clock import MarketClock 

16from task.background.after_market.after_market_task_base import AfterMarketTask 

17from interfaces.schedulable_task import TaskState 

18from repositories.stock_repository import StockRepository 

19from repositories.stock_code_repository import StockCodeRepository 

20from services.market_calendar_service import MarketCalendarService 

21from services.notification_service import NotificationService, NotificationCategory, NotificationLevel 

22 

23if TYPE_CHECKING: 23 ↛ 24line 23 didn't jump to line 24 because the condition on line 23 was never true

24 from services.stock_query_service import StockQueryService 

25 

26 

27def _chunked(lst, size): 

28 for i in range(0, len(lst), size): 

29 yield lst[i:i + size] 

30 

31 

32# ETF/ETN 브랜드명 접두사 (OhlcvUpdateTask와 동일) 

33_ETF_PREFIXES = ( 

34 "KODEX", "TIGER", "KBSTAR", "ARIRANG", "SOL", "ACE", 

35 "HANARO", "KOSEF", "PLUS", "TIMEFOLIO", "WON", "FOCUS", 

36 "VITA", "TREX", "MASTER", "WOORI", "KINDEX", 

37) 

38 

39 

40class DailyPriceCollectorTask(AfterMarketTask): 

41 """장 마감 후 전체 종목 현재가+펀더멘털을 수집하여 StockRepository에 저장하는 백그라운드 태스크.""" 

42 

43 API_CHUNK_SIZE = 8 

44 CHUNK_SLEEP_SEC = 1.1 

45 DB_UPSERT_BATCH_SIZE = 500 

46 # 검증의 견고성을 위해 시장 대표성을 띄는 다수 종목(삼성전자, SK하이닉스, NAVER, 현대차, 셀트리온) 지정 

47 CANARY_STOCKS = ["005930", "000660", "035420", "005380", "068270"] 

48 

49 def __init__( 

50 self, 

51 stock_query_service: "StockQueryService", 

52 stock_code_repository: StockCodeRepository, 

53 stock_repo: StockRepository, 

54 market_calendar_service: Optional[MarketCalendarService] = None, 

55 market_clock: Optional[MarketClock] = None, 

56 performance_profiler: Optional[PerformanceProfiler] = None, 

57 notification_service: Optional[NotificationService] = None, 

58 logger=None, 

59 ): 

60 super().__init__( 

61 mcs=market_calendar_service, 

62 market_clock=market_clock, 

63 logger=logger or logging.getLogger(__name__), 

64 ) 

65 self._stock_query_service = stock_query_service 

66 self.stock_code_repository = stock_code_repository 

67 self._stock_repo = stock_repo 

68 self._pm = performance_profiler or PerformanceProfiler(enabled=False) 

69 self._ns = notification_service 

70 self._suspend_event: asyncio.Event = asyncio.Event() 

71 self._suspend_event.set() # 초기에는 실행 가능 

72 

73 # 수집 상태 

74 self._is_collecting: bool = False 

75 self._last_collected_date: Optional[str] = None 

76 self._progress: Dict = { 

77 "running": False, 

78 "processed": 0, 

79 "total": 0, 

80 "collected": 0, 

81 "elapsed": 0.0, 

82 "status": "", 

83 } 

84 self._all_stocks_cache = None 

85 

86 # ── SchedulableTask 인터페이스 구현 ──────────────────────── 

87 

88 @property 

89 def task_name(self) -> str: 

90 return "daily_price_collector" 

91 

92 @property 

93 def _scheduler_label(self) -> str: 

94 return "DailyPriceCollector" 

95 

96 async def start(self) -> None: 

97 """장마감 후 자동 스케줄러 시작.""" 

98 if self._state == TaskState.RUNNING: 

99 return 

100 self._state = TaskState.RUNNING 

101 self._suspend_event.set() 

102 

103 self._tasks.append( 

104 asyncio.create_task(self._after_market_scheduler()) 

105 ) 

106 self._logger.info(f"DailyPriceCollectorTask 시작: {len(self._tasks)}개 태스크") 

107 

108 async def suspend(self) -> None: 

109 """수집을 일시 중지한다 (chunk 사이에서 대기).""" 

110 if self._state == TaskState.RUNNING: 

111 self._suspend_event.clear() 

112 self._state = TaskState.SUSPENDED 

113 self._logger.info("DailyPriceCollectorTask 일시 중지") 

114 

115 async def resume(self) -> None: 

116 """일시 중지된 수집을 재개한다.""" 

117 if self._state == TaskState.SUSPENDED: 

118 self._suspend_event.set() 

119 self._state = TaskState.RUNNING 

120 self._logger.info("DailyPriceCollectorTask 재개") 

121 

122 async def _on_market_closed(self, latest_trading_date: str) -> None: 

123 """장 마감 후 콜백: 해당 거래일의 수집이 필요하면 실행.""" 

124 if self._last_collected_date != latest_trading_date: 

125 await self._collect_all_prices() 

126 

127 # ── 전체 종목 현재가 수집 ──────────────────────────── 

128 async def _collect_all_prices(self, force: bool = False) -> None: 

129 """전체 종목 현재가+펀더멘털을 3-Tier Fallback 구조로 수집한다.""" 

130 if self._mcs and await self._mcs.is_market_open_now(): 

131 self._logger.info("장 운영 중이므로 현재가 수집을 건너뜁니다.") 

132 return 

133 

134 if self._is_collecting: 134 ↛ 135line 134 didn't jump to line 135 because the condition on line 134 was never true

135 self._logger.info("현재가 수집 이미 진행 중 — 스킵") 

136 return 

137 

138 target_date = await self._mcs.get_latest_trading_date() if self._mcs else None 

139 if not target_date: 139 ↛ 140line 139 didn't jump to line 140 because the condition on line 139 was never true

140 self._logger.error("최근 거래일을 확인할 수 없어 현재가 수집을 중단합니다.") 

141 return 

142 

143 if not force and self._last_collected_date == target_date: 

144 self._logger.info(f"이미 {target_date} 현재가 수집 완료 — 스킵") 

145 return 

146 

147 self._logger.info(f"전체 종목 수집 파이프라인 시작 (기준일: {target_date})") 

148 self._is_collecting = True 

149 start_time = time.time() 

150 

151 # 반복 조회를 피하기 위해 한 번 로드 후 캐싱 

152 self._all_stocks_cache = self._load_all_stocks() 

153 

154 try: 

155 # [Tier 1] pykrx 실패/검증 실패 시 FinanceDataReader 시도 

156 if await self._try_collect_via_fdr(target_date, start_time): 

157 await self._finish_collection(target_date, start_time, "FDR") 

158 return 

159 

160 # [Tier 2] 모두 실패 시 최후의 보루 증권사 API 청크 수집 

161 self._logger.warning("크롤링 모두 실패. 증권사 API(Chunk) 수집으로 Fallback 합니다.") 

162 if self._ns: 162 ↛ 167line 162 didn't jump to line 167 because the condition on line 162 was always true

163 await self._ns.emit( 

164 NotificationCategory.BACKGROUND, NotificationLevel.WARNING, 

165 "수집 모드 전환", "크롤링 라이브러리 오류로 인해 증권사 API 일일이 수집 모드(약 10분 소요)로 동작합니다." 

166 ) 

167 await self._collect_via_broker_api(target_date, start_time) 

168 await self._finish_collection(target_date, start_time, "Broker API") 

169 

170 except Exception as e: 

171 self._logger.error(f"전체 수집 파이프라인 실패: {e}", exc_info=True) 

172 finally: 

173 self._is_collecting = False 

174 self._all_stocks_cache = None 

175 

176 # ── 2. 데이터 검증 (Sanity Check) ───────────────────────────── 

177 

178 async def _verify_crawler_data(self, df_crawled: pd.DataFrame, source_name: str) -> bool: 

179 """ 

180 증권사 API의 확정 데이터(시가/고가/저가/종가)와 크롤링 데이터가 일치하는지 완벽 검증한다. 

181 """ 

182 self._logger.info(f"[{source_name}] 데이터 정합성 검증(OHLC 4종) 시작...") 

183 

184 match_count = 0 

185 mismatch_count = 0 

186 

187 for code in self.CANARY_STOCKS: 

188 # 1. 증권사 API에서 Source of Truth 호출 

189 api_resp = await self._fetch_with_retry(code) 

190 if not api_resp or api_resp.rt_cd != ErrorCode.SUCCESS.value: 

191 self._logger.debug(f"검증용 API 호출 실패({code}) - 스킵") 

192 continue 

193 

194 # API 데이터 추출 (output 뎁스 고려) 

195 data = api_resp.data 

196 output = data.get('output') if isinstance(data, dict) else data 

197 

198 # API 속성 추출 헬퍼 

199 def _get_api_val(key): 

200 val = output.get(key, 0) if isinstance(output, dict) else getattr(output, key, 0) 

201 return int(val) if val else 0 

202 

203 # API: 종가(stck_prpr), 시가(stck_oprc), 고가(stck_hgpr), 저가(stck_lwpr) 

204 api_close = _get_api_val('stck_prpr') 

205 api_open = _get_api_val('stck_oprc') 

206 api_high = _get_api_val('stck_hgpr') 

207 api_low = _get_api_val('stck_lwpr') 

208 

209 # 2. 크롤링 데이터(DataFrame)에서 추출 

210 try: 

211 # 인덱스가 종목코드인지, 컬럼에 종목코드가 있는지 확인 

212 if code in df_crawled.index: 212 ↛ 215line 212 didn't jump to line 215 because the condition on line 212 was always true

213 row = df_crawled.loc[code] 

214 else: 

215 matches = df_crawled[df_crawled['종목코드'] == code] 

216 if matches.empty: 

217 self._logger.debug(f"크롤링 데이터에 검증용 종목({code}) 없음 - 스킵") 

218 continue 

219 row = matches.iloc[0] 

220 

221 # 크롤링 데이터 추출 헬퍼 (pykrx, FDR 호환) 

222 def _get_crawled_val(cols): 

223 for col in cols: 223 ↛ 226line 223 didn't jump to line 226 because the loop on line 223 didn't complete

224 if col in row.index and pd.notna(row[col]): 224 ↛ 223line 224 didn't jump to line 223 because the condition on line 224 was always true

225 return int(row[col]) 

226 return 0 

227 

228 crawled_close = _get_crawled_val(['종가', 'Close']) 

229 crawled_open = _get_crawled_val(['시가', 'Open']) 

230 crawled_high = _get_crawled_val(['고가', 'High']) 

231 crawled_low = _get_crawled_val(['저가', 'Low']) 

232 

233 except Exception as e: 

234 self._logger.debug(f"크롤링 데이터 파싱 예외({code}): {e} - 스킵") 

235 continue 

236 

237 # 3. 시/고/저/종 4개 값 모두 대조 (단 하나라도 다르면 실패) 

238 if (api_close != crawled_close or 

239 api_open != crawled_open or 

240 api_high != crawled_high or 

241 api_low != crawled_low): 

242 

243 self._logger.warning( 

244 f"데이터 불일치 감지! ({code})\n" 

245 f" - API : 시({api_open}) 고({api_high}) 저({api_low}) 종({api_close})\n" 

246 f" - {source_name.ljust(6)}: 시({crawled_open}) 고({crawled_high}) 저({crawled_low}) 종({crawled_close})" 

247 ) 

248 mismatch_count += 1 

249 else: 

250 match_count += 1 

251 

252 if match_count == 0: 

253 self._logger.warning(f"[{source_name}] 검증 가능한 종목이 없어 실패 처리합니다.") 

254 return False 

255 

256 # 2개 이상 불일치 시에만 Fallback (단일 종목 거래 정지 등 예외 대응) 

257 if mismatch_count >= 2: 257 ↛ 258line 257 didn't jump to line 258 because the condition on line 257 was never true

258 self._logger.warning(f"[{source_name}] 데이터 불일치 종목 다수({mismatch_count}개) 발생. 검증 실패.") 

259 return False 

260 

261 self._logger.info(f"[{source_name}] 데이터 정합성 검증 통과 (일치: {match_count}, 불일치: {mismatch_count})") 

262 return True 

263 

264 # ── 3. 수집 티어 구현 ───────────────────────────────────────── 

265 async def _try_collect_via_fdr(self, target_date: str, start_time: float) -> bool: 

266 """[Tier 2] FinanceDataReader를 활용한 수집 (차선책)""" 

267 self._progress["status"] = "FinanceDataReader 일괄 수집 중..." 

268 

269 def _fetch_fdr_sync(): 

270 # FDR은 당일 시세(OHLCV) 전체 리스트를 가져오는 기능을 지원 

271 df_fdr = fdr.StockListing('KRX') 

272 if df_fdr.empty: 

273 raise ValueError("FDR 데이터가 비어있습니다.") 

274 

275 # FDR의 경우 'Close' 컬럼을 '종가'로 맞추어 _verify_crawler_data와 호환되게 함 

276 df_fdr.rename(columns={'Code': '종목코드', 'Close': '종가'}, inplace=True) 

277 return df_fdr 

278 

279 try: 

280 df_fdr = await asyncio.to_thread(_fetch_fdr_sync) 

281 

282 if not await self._verify_crawler_data(df_fdr, "FDR"): 282 ↛ 283line 282 didn't jump to line 283 because the condition on line 282 was never true

283 return False 

284 

285 formatted_records = self._format_dataframe_to_records(df_fdr) 

286 await self._save_bulk_to_db_with_progress(target_date, formatted_records, start_time) 

287 return True 

288 except Exception as e: 

289 self._logger.warning(f"FDR 수집 중 예외 발생 (Fallback 시도): {e}") 

290 return False 

291 

292 async def _collect_via_broker_api(self, target_date: str, start_time: float) -> None: 

293 """[Tier 2] 증권사 API 청크 기반 수집 로직 (약 10분 소요)""" 

294 all_stocks = getattr(self, "_all_stocks_cache", None) or self._load_all_stocks() 

295 total = len(all_stocks) 

296 

297 # 메인 오케스트레이터에서 넘겨받은 progress 정보를 API 수집 모드에 맞게 갱신 

298 self._progress["total"] = total 

299 self._progress["status"] = "증권사 API 수집 중 (Fallback)..." 

300 

301 collected_records: List[Dict] = [] 

302 db_upsert_buffer: List[Dict] = [] 

303 processed = 0 

304 

305 for chunk in _chunked(all_stocks, self.API_CHUNK_SIZE): 

306 # 일시정지(suspend) 체크 

307 await self._suspend_event.wait() 

308 

309 # 1. 8개씩 병렬 API 호출 

310 tasks = [self._fetch_with_retry(code) for code, _, _ in chunk] 

311 responses = await asyncio.gather(*tasks, return_exceptions=True) 

312 

313 # 2. 응답 데이터 추출 및 버퍼에 담기 

314 batch_records = [] 

315 for (code, name, market), resp in zip(chunk, responses): 

316 if isinstance(resp, Exception): 316 ↛ 317line 316 didn't jump to line 317 because the condition on line 316 was never true

317 continue 

318 record = self._extract_broker_api_record(code, name, market, resp) 

319 if record: 319 ↛ 315line 319 didn't jump to line 315 because the condition on line 319 was always true

320 batch_records.append(record) 

321 

322 if batch_records: 322 ↛ 327line 322 didn't jump to line 327 because the condition on line 322 was always true

323 db_upsert_buffer.extend(batch_records) 

324 collected_records.extend(batch_records) 

325 

326 # 3. DB 배치 저장 (500개 도달 시) 

327 if len(db_upsert_buffer) >= self.DB_UPSERT_BATCH_SIZE: 327 ↛ 334line 327 didn't jump to line 334 because the condition on line 327 was always true

328 await self._stock_repo.upsert_daily_snapshot(target_date, db_upsert_buffer) 

329 db_upsert_buffer.clear() 

330 

331 # ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 

332 # 4. ★ 핵심: 매 청크(8개)마다 진행률 즉시 업데이트 

333 # ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 

334 processed += len(chunk) 

335 elapsed = time.time() - start_time 

336 self._progress.update({ 

337 "processed": processed, 

338 "collected": len(collected_records), 

339 "elapsed": round(elapsed, 1) 

340 }) 

341 

342 # 서버 로그용 출력 (50개 단위 또는 마지막) 

343 if processed % 50 == 0 or processed >= total: 343 ↛ 351line 343 didn't jump to line 351 because the condition on line 343 was always true

344 self._logger.info( 

345 f"[Broker API] 진행: {processed}/{total} " 

346 f"({processed / total * 100:.1f}%) " 

347 f"| 수집: {len(collected_records)} | 소요: {elapsed:.1f}s" 

348 ) 

349 

350 # API Rate Limit 회피용 Sleep (1.1초) 

351 if not all(getattr(r, '_cache_hit', False) for r in responses if not isinstance(r, Exception)): 351 ↛ 305line 351 didn't jump to line 305 because the condition on line 351 was always true

352 await asyncio.sleep(self.CHUNK_SLEEP_SEC) 

353 

354 # 루프 종료 후 남은 버퍼 최종 저장 

355 if db_upsert_buffer: 355 ↛ 356line 355 didn't jump to line 356 because the condition on line 355 was never true

356 await self._stock_repo.upsert_daily_snapshot(target_date, db_upsert_buffer) 

357 

358 # ── 4. 완료 처리 헬퍼 ───────────────────────────────────────── 

359 

360 async def _finish_collection(self, target_date: str, start_time: float, source: str) -> None: 

361 """수집 완료 후 공통 후처리 로직""" 

362 self._last_collected_date = target_date 

363 elapsed = time.time() - start_time 

364 self._logger.info(f"전체 종목 수집 완료 (Source: {source}), 소요: {elapsed:.1f}s") 

365 if self._ns: 365 ↛ exitline 365 didn't return from function '_finish_collection' because the condition on line 365 was always true

366 await self._ns.emit( 

367 NotificationCategory.BACKGROUND, NotificationLevel.INFO, 

368 "전체 종목 현재가 수집 완료", 

369 f"소스: {source} / 소요: {elapsed:.1f}초" 

370 ) 

371 

372 # # ── 내부 헬퍼 ───────────────────────────────────────── 

373 

374 async def _fetch_with_retry(self, code: str): 

375 """get_current_price API 호출 + 재시도.""" 

376 max_retries = 3 

377 delay = 1.0 

378 for attempt in range(max_retries): 

379 try: 

380 resp = await self._stock_query_service.get_current_price(code, count_stats=False, caller="DailyPriceCollectorTask") 

381 if resp and resp.rt_cd == ErrorCode.SUCCESS.value: 

382 return resp 

383 error_msg = resp.msg1 if resp else "응답 없음" 

384 self._logger.warning( 

385 f"현재가 조회 실패 (시도 {attempt + 1}/{max_retries}): " 

386 f"{code}, 사유: {error_msg}" 

387 ) 

388 except Exception as e: 

389 self._logger.error( 

390 f"현재가 조회 예외 (시도 {attempt + 1}/{max_retries}): " 

391 f"{code}, 오류: {e}" 

392 ) 

393 if attempt < max_retries - 1: 

394 await asyncio.sleep(delay) 

395 delay *= 1.5 

396 return None 

397 

398 @staticmethod 

399 def _extract_broker_api_record( 

400 code: str, name: str, market: str, resp 

401 ) -> Optional[Dict]: 

402 """API 응답에서 DB 저장용 레코드를 추출한다.""" 

403 if not resp: 403 ↛ 404line 403 didn't jump to line 404 because the condition on line 403 was never true

404 return None 

405 

406 try: 

407 data = resp.data 

408 if not data: 408 ↛ 409line 408 didn't jump to line 409 because the condition on line 408 was never true

409 return None 

410 

411 # get_current_price의 응답 구조: data = {'output': ResStockFullInfoApiOutput} 

412 output = data.get('output') if isinstance(data, dict) else data 

413 if not output: 413 ↛ 414line 413 didn't jump to line 414 because the condition on line 413 was never true

414 return None 

415 

416 def _safe_int(val, default=0): 

417 try: 

418 return int(val) if val else default 

419 except (ValueError, TypeError): 

420 return default 

421 

422 def _safe_float(val, default=0.0): 

423 try: 

424 return float(val) if val else default 

425 except (ValueError, TypeError): 

426 return default 

427 

428 # ResStockFullInfoApiOutput 필드 → DB 레코드 변환 

429 # output이 Pydantic 모델이면 getattr, dict면 get 

430 _get = ( 

431 (lambda k, d=None: getattr(output, k, d)) 

432 if hasattr(output, 'stck_prpr') 

433 else (lambda k, d=None: output.get(k, d)) 

434 ) 

435 

436 return { 

437 "code": code, 

438 "name": name, 

439 "current_price": _safe_int(_get("stck_prpr")), 

440 "open_price": _safe_int(_get("stck_oprc")), 

441 "high_price": _safe_int(_get("stck_hgpr")), 

442 "low_price": _safe_int(_get("stck_lwpr")), 

443 "prev_close": _safe_int(_get("stck_sdpr")), 

444 "change_price": _safe_int(_get("prdy_vrss")), 

445 "change_sign": _get("prdy_vrss_sign", ""), 

446 "change_rate": _get("prdy_ctrt", "0"), 

447 "volume": _safe_int(_get("acml_vol")), 

448 "trading_value": _safe_int(_get("acml_tr_pbmn")), 

449 "market_cap": _safe_int(_get("hts_avls")), 

450 "per": _safe_float(_get("per")), 

451 "pbr": _safe_float(_get("pbr")), 

452 "eps": _safe_float(_get("eps")), 

453 "w52_high": _safe_int(_get("w52_hgpr")), 

454 "w52_low": _safe_int(_get("w52_lwpr")), 

455 "market": market, 

456 } 

457 except Exception: 

458 return None 

459 

460 def _load_all_stocks(self) -> List[tuple]: 

461 """StockCodeRepository에서 KOSPI/KOSDAQ 전체 종목 로드 (ETF/우선주 제외). 

462 

463 iterrows() 대신 벡터화 마스킹을 사용하여 수십 배 빠르게 필터링한다. 

464 """ 

465 df = self.stock_code_repository.df 

466 codes = df["종목코드"].astype(str) 

467 names = df["종목명"].astype(str) 

468 

469 mask = ( 

470 codes.ne("") 

471 & df["시장구분"].isin(("KOSPI", "KOSDAQ")) 

472 & (codes.str[-1] == "0") 

473 & ~names.str.startswith(_ETF_PREFIXES) 

474 & ~names.str.contains("스팩", na=False) 

475 ) 

476 filtered = df[mask] 

477 return list(zip(filtered["종목코드"], filtered["종목명"], filtered["시장구분"])) 

478 

479 def get_progress(self) -> Dict: 

480 """수집 진행률 반환.""" 

481 return dict(self._progress) 

482 

483 async def force_collect(self) -> None: 

484 """강제 수집: skip 조건을 무시하고 전 종목 현재가를 API 재호출한다.""" 

485 self._logger.info("DailyPriceCollectorTask 강제 수집 요청") 

486 await self._collect_all_prices(force=True) 

487 

488 def _format_dataframe_to_records(self, df: pd.DataFrame) -> List[Dict]: 

489 """ 

490 pykrx 또는 FinanceDataReader에서 수집한 DataFrame을 

491 기존 _extract_broker_api_record와 동일한 형태의 DB 레코드 딕셔너리 리스트로 변환한다. 

492 """ 

493 records = [] 

494 if df is None or df.empty: 494 ↛ 495line 494 didn't jump to line 495 because the condition on line 494 was never true

495 return records 

496 

497 # 1. DB 기준 종목 메타데이터 (이름, 시장구분) 맵핑용 테이블 생성 

498 # pykrx의 경우 종목명이나 시장구분 컬럼이 누락되어 있을 수 있으므로 자체 DB 기준으로 매핑합니다. 

499 # 또한, 이 딕셔너리에 없는 종목(ETF, 스팩, 우선주 등)은 자연스럽게 필터링됩니다. 

500 all_stocks = getattr(self, "_all_stocks_cache", None) or self._load_all_stocks() 

501 stock_meta = { 

502 code: {"name": name, "market": market} 

503 for code, name, market in all_stocks 

504 } 

505 

506 # 2. 컬럼명 유연성 확보를 위한 헬퍼 함수 

507 def _get_val(row, possible_cols, default_val): 

508 for col in possible_cols: 

509 if col in row.index and pd.notna(row[col]): 

510 return row[col] 

511 return default_val 

512 

513 # 3. DataFrame 순회 및 레코드 추출 

514 for _, row in df.iterrows(): 

515 # 종목코드 추출 (문자열 변환 및 6자리 패딩 보장) 

516 raw_code = _get_val(row, ['종목코드', 'Code'], "") 

517 code = str(raw_code).zfill(6) 

518 

519 if not code or code == "000000": 

520 continue 

521 

522 # 자체 DB 필터링을 통과한 종목만 수집 

523 meta = stock_meta.get(code) 

524 if not meta: 

525 continue 

526 

527 try: 

528 # 숫자형 데이터 안전 추출 (pykrx와 FDR의 컬럼명 모두 지원) 

529 current_price = int(_get_val(row, ['종가', 'Close'], 0)) 

530 open_price = int(_get_val(row, ['시가', 'Open'], 0)) 

531 high_price = int(_get_val(row, ['고가', 'High'], 0)) 

532 low_price = int(_get_val(row, ['저가', 'Low'], 0)) 

533 volume = int(_get_val(row, ['거래량', 'Volume'], 0)) 

534 trading_value = int(_get_val(row, ['거래대금', 'Amount'], 0)) 

535 market_cap = int(_get_val(row, ['시가총액', 'Marcap'], 0)) 

536 

537 per = float(_get_val(row, ['PER'], 0.0)) 

538 pbr = float(_get_val(row, ['PBR'], 0.0)) 

539 eps = float(_get_val(row, ['EPS'], 0.0)) 

540 

541 # 등락 데이터 추정 및 계산 

542 change_price = int(_get_val(row, ['대비', 'Changes'], 0)) 

543 raw_change_rate = _get_val(row, ['등락률', 'ChagesRatio', 'ChangeRatio'], 0.0) 

544 change_rate = str(round(float(raw_change_rate), 2)) 

545 

546 # 전일 종가 계산 (현재가 - 대비) 

547 prev_close = current_price - change_price 

548 

549 # 등락 부호 결정 (API 호환성: 2=상승, 3=보합, 5=하락) 

550 if change_price > 0: 

551 change_sign = "2" 

552 elif change_price < 0: 552 ↛ 555line 552 didn't jump to line 555 because the condition on line 552 was always true

553 change_sign = "5" 

554 else: 

555 change_sign = "3" 

556 

557 record = { 

558 "code": code, 

559 "name": meta["name"], 

560 "current_price": current_price, 

561 "open_price": open_price, 

562 "high_price": high_price, 

563 "low_price": low_price, 

564 "prev_close": prev_close, 

565 "change_price": change_price, 

566 "change_sign": change_sign, 

567 "change_rate": change_rate, 

568 "volume": volume, 

569 "trading_value": trading_value, 

570 "market_cap": market_cap, 

571 "per": per, 

572 "pbr": pbr, 

573 "eps": eps, 

574 # 일괄 수집 데이터에서는 52주 신고/신저가를 즉각 구하기 어려우므로 None 처리 

575 # (DB Upsert 로직에서 NULL(None)이면 기존 값을 유지하도록 설계됨) 

576 "w52_high": None, 

577 "w52_low": None, 

578 "market": meta["market"], 

579 } 

580 records.append(record) 

581 

582 except (ValueError, TypeError) as e: 

583 self._logger.debug(f"데이터 파싱 오류 (종목: {code}): {e}") 

584 continue 

585 

586 return records 

587 

588 async def _save_bulk_to_db_with_progress(self, target_date: str, records: List[Dict], start_time: float) -> None: 

589 """크롤링된 전체 레코드를 DB에 Batch 단위로 저장하며 진행률을 업데이트한다.""" 

590 total_records = len(records) 

591 if total_records == 0: 591 ↛ 592line 591 didn't jump to line 592 because the condition on line 591 was never true

592 return 

593 

594 # 필터링 후의 실제 유효 레코드 수로 전체 모수(total) 보정 

595 self._progress["total"] = total_records 

596 processed = 0 

597 

598 for i in range(0, total_records, self.DB_UPSERT_BATCH_SIZE): 

599 batch = records[i:i + self.DB_UPSERT_BATCH_SIZE] 

600 

601 # DB 저장 

602 await self._stock_repo.upsert_daily_snapshot(target_date, batch) 

603 

604 # 진행률 업데이트 

605 processed += len(batch) 

606 elapsed = time.time() - start_time 

607 self._progress.update({ 

608 "processed": processed, 

609 "collected": processed, 

610 "elapsed": round(elapsed, 1), 

611 "status": "DB 저장 중..." 

612 }) 

613 

614 # 다른 비동기 태스크(API 응답 등)가 블로킹되지 않도록 제어권 양보 

615 await asyncio.sleep(0.01)