Coverage for services / market_data_service.py: 87%
373 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
1# services/market_data_service.py
2import asyncio
3import logging
4from datetime import datetime, timedelta
5from typing import List, Dict, Any, Optional, TYPE_CHECKING
6from config.DynamicConfig import DynamicConfig
8from brokers.korea_investment.korea_invest_env import KoreaInvestApiEnv
9from core.market_clock import MarketClock
10from brokers.broker_api_wrapper import BrokerAPIWrapper
11from common.types import (
12 ResPriceSummary, ResCommonResponse, ErrorCode,
14 ResTopMarketCapApiItem, ResBasicStockInfo, ResFluctuation, ResDailyChartApiItem, Exchange,
15 ResStockFullInfoApiOutput
16)
17from core.cache.cache_store import CacheStore
18from core.performance_profiler import PerformanceProfiler
19from services.market_calendar_service import MarketCalendarService
21if TYPE_CHECKING: 21 ↛ 22line 21 didn't jump to line 22 because the condition on line 21 was never true
22 from repositories.stock_repository import StockRepository
24class MarketDataService:
25 """
26 시장 데이터(현재가, 호가, OHLCV, 랭킹 등) 조회를 전담하는 서비스입니다.
27 """
28 def __init__(self, broker_api_wrapper: BrokerAPIWrapper, env: KoreaInvestApiEnv, logger=None,
29 market_clock: MarketClock = None, cache_store: Optional[CacheStore] = None,
30 market_calendar_service: Optional[MarketCalendarService] = None, performance_profiler: Optional[PerformanceProfiler] = None,
31 stock_repository: Optional['StockRepository'] = None):
32 self._broker_api_wrapper = broker_api_wrapper
33 self._env = env
34 self._logger = logger if logger else logging.getLogger(__name__)
35 self._market_clock = market_clock
36 self.cache_store = cache_store
37 self._mcs = market_calendar_service
38 self.pm = performance_profiler if performance_profiler else PerformanceProfiler(enabled=False)
39 self._stock_repo = stock_repository
41 self._ETF_PREFIXES = (
42 "KODEX", "TIGER", "KBSTAR", "ARIRANG", "SOL", "ACE",
43 "HANARO", "KOSEF", "PLUS", "TIMEFOLIO", "WON", "FOCUS",
44 "VITA", "TREX", "MASTER", "WOORI", "KINDEX",
45 )
47 async def get_name_by_code(self, code: str) -> str:
48 """종목코드로 종목명을 반환합니다 (BrokerAPIWrapper 위임)."""
49 return await self._broker_api_wrapper.get_name_by_code(code)
51 async def get_price_summary(self, stock_code, exchange: Exchange = Exchange.KRX) -> ResCommonResponse:
52 """주어진 종목코드에 대해 시가/현재가/등락률(%) 요약 정보를 반환합니다 (KoreaInvestApiQuotations 위임)."""
53 self._logger.info(f"MarketDataService - {stock_code} 종목 요약 정보 조회 요청")
54 return await self._broker_api_wrapper.get_price_summary(stock_code, exchange=exchange)
56 async def get_stock_info_by_code(self, stock_code: str, exchange: Exchange = Exchange.KRX) -> ResCommonResponse:
57 """종목코드로 종목의 전체 정보를 가져옵니다 (BrokerAPIWrapper 위임)."""
58 self._logger.info(f"MarketDataService - {stock_code} 종목 상세 정보 조회 요청")
59 return await self._broker_api_wrapper.get_stock_info_by_code(stock_code, exchange=exchange)
61 async def get_current_price(self, stock_code, exchange: Exchange = Exchange.KRX, count_stats: bool = True, caller: str = "unknown") -> ResCommonResponse:
62 # 1. StockRepository 단기 캐시 확인 (웹소켓 틱 갱신 또는 최근 API 조회 결과)
63 if self._stock_repo: 63 ↛ 69line 63 didn't jump to line 69 because the condition on line 63 was always true
64 cached_data = self._stock_repo.get_current_price(stock_code, max_age_sec=3.0, count_stats=count_stats, caller=caller)
65 if cached_data:
66 return ResCommonResponse(rt_cd=ErrorCode.SUCCESS.value, msg1="성공(Cache)", data=cached_data)
68 # 2. 장 마감 시간대에는 daily_prices DB 스냅샷 확인 (API 호출 절약)
69 is_market_open = (await self._mcs.is_market_open_now()) if self._mcs else self._market_clock.is_market_operating_hours()
70 if self._stock_repo and not is_market_open:
71 db_data = await self._stock_repo.get_latest_daily_snapshot(stock_code)
72 if db_data:
73 # DB 데이터가 최근 거래일 기준인지 확인 (오래된 데이터 방지)
74 latest_trading_date = await self._mcs.get_latest_trading_date() if self._mcs else None
75 db_trade_date = db_data.get("_trade_date") or (db_data.get("output") or {}).get("stck_bsop_date")
76 if not latest_trading_date or db_trade_date != latest_trading_date:
77 self._logger.debug(
78 f"MarketDataService - {stock_code} DB 스냅샷 날짜({db_trade_date}) != 최근 거래일({latest_trading_date}), API 조회로 폴백"
79 )
80 else:
81 db_data = self._wrap_snapshot_output(db_data)
82 self._stock_repo.set_current_price(stock_code, db_data)
83 return ResCommonResponse(rt_cd=ErrorCode.SUCCESS.value, msg1="성공(DB)", data=db_data)
85 if count_stats: 85 ↛ 87line 85 didn't jump to line 87 because the condition on line 85 was always true
86 self._logger.info(f"MarketDataService - {stock_code} 현재가 조회 요청")
87 resp = await self._broker_api_wrapper.get_current_price(stock_code, exchange=exchange)
89 # 3. 조회 결과를 StockRepository에 갱신
90 if resp and resp.rt_cd == ErrorCode.SUCCESS.value and self._stock_repo:
91 self._stock_repo.set_current_price(stock_code, resp.data)
93 return resp
95 async def get_stock_conclusion(self, stock_code: str, exchange: Exchange = Exchange.KRX) -> ResCommonResponse:
96 """종목의 체결(체결강도 등) 정보를 조회합니다."""
97 self._logger.info(f"MarketDataService - {stock_code} 체결 정보 조회 요청")
98 return await self._broker_api_wrapper.get_stock_conclusion(stock_code, exchange=exchange)
100 async def get_multi_price(self, stock_codes: list[str]) -> ResCommonResponse:
101 """복수종목 현재가 조회 (최대 30종목)"""
102 self._logger.info(f"MarketDataService - 복수종목 현재가 조회 요청 ({len(stock_codes)}종목)")
103 return await self._broker_api_wrapper.get_multi_price(stock_codes)
105 async def get_top_market_cap_stocks_code(self, market_code: str, limit: int = None) -> ResCommonResponse:
106 """
107 시가총액 상위 종목을 조회하고 결과를 반환합니다 (모의투자 미지원).
108 ResCommonResponse 형태로 반환하며, data 필드에 List[ResTopMarketCapApiItem] 포함.
109 """
110 if limit is None: 110 ↛ 111line 110 didn't jump to line 111 because the condition on line 110 was never true
111 limit = 30
112 self._logger.warning(f"[경고] count 파라미터가 명시되지 않아 기본값 30을 사용합니다. market_code={market_code}")
114 self._logger.info(f"MarketDataService - 시가총액 상위 종목 조회 요청 - 시장: {market_code}, 개수: {limit}")
115 if self._env.is_paper_trading: 115 ↛ 119line 115 didn't jump to line 119 because the condition on line 115 was always true
116 self._logger.warning("MarketDataService - 시가총액 상위 종목 조회는 모의투자를 지원하지 않습니다.")
117 return ResCommonResponse(rt_cd=ErrorCode.INVALID_INPUT.value, msg1="모의투자 미지원 API입니다.", data=[])
119 return await self._broker_api_wrapper.get_top_market_cap_stocks_code(market_code, limit)
121 async def get_current_upper_limit_stocks(self, rise_stocks: List) -> ResCommonResponse:
122 """
123 전체 종목 리스트 중 현재 상한가에 도달한 종목을 필터링합니다.
124 ResCommonResponse 형태로 반환하며, data 필드에 List[Dict] (종목 정보) 포함.
125 """
126 results: List[ResBasicStockInfo] = []
127 for stock_info in rise_stocks:
128 code = "Unknown"
129 try:
130 if isinstance(stock_info, dict):
131 code = stock_info.get("stck_shrn_iscd", "Unknown")
132 elif hasattr(stock_info, "stck_shrn_iscd"): 132 ↛ 135line 132 didn't jump to line 135 because the condition on line 132 was always true
133 code = stock_info.stck_shrn_iscd
135 fluctuation_info: ResFluctuation = stock_info
136 code = fluctuation_info.stck_shrn_iscd
137 current_price = int(fluctuation_info.stck_prpr)
138 prdy_ctrt = float(fluctuation_info.prdy_ctrt)
139 name = fluctuation_info.hts_kor_isnm
140 change_rate = float(fluctuation_info.prdy_vrss)
142 if prdy_ctrt > 29.0:
143 stock_info = ResBasicStockInfo(
144 code=code, name=name, current_price=current_price,
145 change_rate=change_rate, prdy_ctrt=prdy_ctrt
146 )
147 results.append(stock_info)
148 except Exception as e:
149 self._logger.warning(f"{code} 현재 상한가 필터링 중 오류: {e}", exc_info=True)
150 continue
152 self._market_clock.get_current_kst_time()
153 return ResCommonResponse(rt_cd=ErrorCode.SUCCESS.value, msg1="현재 상한가 종목 필터링 성공", data=results)
155 async def get_all_stocks_code(self) -> ResCommonResponse:
156 """
157 전체 종목 코드를 조회합니다.
158 ResCommonResponse 형태로 반환하며, data 필드에 List[str] (종목 코드 리스트) 포함.
159 """
160 self._logger.info("MarketDataService - 전체 종목 코드 조회 요청")
161 try:
162 codes = await self._broker_api_wrapper.get_all_stock_code_list()
163 if not isinstance(codes, list):
164 msg = f"비정상 응답 형식: {codes}"
165 self._logger.warning(msg)
166 return ResCommonResponse(rt_cd=ErrorCode.PARSING_ERROR.value, msg1=msg, data=[])
167 return ResCommonResponse(rt_cd=ErrorCode.SUCCESS.value, msg1="전체 종목 코드 조회 성공", data=codes)
168 except Exception as e:
169 error_msg = f"전체 종목 코드 조회 실패: {e}"
170 self._logger.exception(error_msg)
171 return ResCommonResponse(rt_cd=ErrorCode.UNKNOWN_ERROR.value, msg1=error_msg, data=[])
173 async def get_asking_price(self, stock_code: str, exchange: Exchange = Exchange.KRX) -> ResCommonResponse:
174 """종목의 실시간 호가 정보를 조회합니다."""
175 self._logger.info(f"MarketDataService - {stock_code} 종목 호가 정보 조회 요청")
176 return await self._broker_api_wrapper.get_asking_price(stock_code, exchange=exchange)
178 async def get_time_concluded_prices(self, stock_code: str, exchange: Exchange = Exchange.KRX) -> ResCommonResponse:
179 """종목의 시간대별 체결가 정보를 조회합니다."""
180 self._logger.info(f"MarketDataService - {stock_code} 종목 시간대별 체결가 조회 요청")
181 return await self._broker_api_wrapper.get_time_concluded_prices(stock_code, exchange=exchange)
183 async def inquire_daily_itemchartprice(self, stock_code: str, start_date: str, end_date: str, fid_period_div_code: str = 'D', exchange: Exchange = Exchange.KRX) -> ResCommonResponse:
184 """일별/분봉 주식 시세 차트 데이터를 조회합니다 (KoreaInvestApiQuotations 위임)."""
185 self._logger.info(f"MarketDataService - {stock_code} 종목 차트 데이터 조회 요청")
186 return await self._broker_api_wrapper.inquire_daily_itemchartprice(stock_code=stock_code, start_date=start_date, end_date=end_date, fid_period_div_code=fid_period_div_code, exchange=exchange)
188 async def get_top_rise_fall_stocks(self, rise: bool = True) -> ResCommonResponse:
189 """상승률 또는 하락률 상위 종목을 조회합니다."""
190 direction = "상승" if rise else "하락"
191 self._logger.info(f"MarketDataService - {direction}률 상위 종목 조회 요청")
192 return await self._broker_api_wrapper.get_top_rise_fall_stocks(rise)
194 async def get_top_volume_stocks(self) -> ResCommonResponse:
195 """거래량 상위 종목을 조회합니다."""
196 self._logger.info("MarketDataService - 거래량 상위 종목 조회 요청")
197 return await self._broker_api_wrapper.get_top_volume_stocks()
199 async def get_top_trading_value_stocks(self) -> ResCommonResponse:
200 """
201 거래대금 상위 종목 조회.
202 거래량/코스피시가총액30/코스닥시가총액30/상승률/하락률 5개 기존 API 결과를 병합하여
203 acml_tr_pbmn(거래대금) 기준 상위 30개를 반환한다.
204 ETF/ETN 종목은 제외한다.
205 """
206 t_start = self.pm.start_timer()
207 self._logger.info("MarketDataService - 거래대금 상위 종목 조회 요청 (병합)")
209 vol_resp, mc_kospi_resp, mc_kosdaq_resp, rise_resp, fall_resp = await asyncio.gather(
210 self._broker_api_wrapper.get_top_volume_stocks(),
211 self._broker_api_wrapper.get_top_market_cap_stocks_code("0000", 30),
212 self._broker_api_wrapper.get_top_market_cap_stocks_code("1001", 30),
213 self._broker_api_wrapper.get_top_rise_fall_stocks(True),
214 self._broker_api_wrapper.get_top_rise_fall_stocks(False),
215 )
217 volume_stocks = vol_resp.data if vol_resp and vol_resp.rt_cd == ErrorCode.SUCCESS.value else []
218 if isinstance(volume_stocks, dict):
219 volume_stocks = volume_stocks.get("output", [])
220 mc_kospi = mc_kospi_resp.data if mc_kospi_resp and mc_kospi_resp.rt_cd == ErrorCode.SUCCESS.value else []
221 mc_kosdaq = mc_kosdaq_resp.data if mc_kosdaq_resp and mc_kosdaq_resp.rt_cd == ErrorCode.SUCCESS.value else []
222 mc_stocks = list(mc_kospi or []) + list(mc_kosdaq or [])
223 rise_stocks = rise_resp.data if rise_resp and rise_resp.rt_cd == ErrorCode.SUCCESS.value else []
224 fall_stocks = fall_resp.data if fall_resp and fall_resp.rt_cd == ErrorCode.SUCCESS.value else []
226 def _to_dict(item):
227 return item.to_dict() if hasattr(item, 'to_dict') else (item if isinstance(item, dict) else {})
228 def _get_code(d):
229 return d.get("mksc_shrn_iscd") or d.get("stck_shrn_iscd") or d.get("iscd") or ""
231 def _compute_tr_val(d):
232 if d.get("acml_tr_pbmn"):
233 try:
234 d["_tr_val_int"] = int(d["acml_tr_pbmn"] or "0")
235 except (ValueError, TypeError):
236 d["_tr_val_int"] = 0
237 else:
238 try:
239 d["_tr_val_int"] = int(d.get("stck_prpr", "0") or "0") * int(d.get("acml_vol", "0") or "0")
240 except (ValueError, TypeError):
241 d["_tr_val_int"] = 0
243 merged = {}
244 for stock in volume_stocks:
245 d = _to_dict(stock)
246 code = _get_code(d)
247 if code: 247 ↛ 244line 247 didn't jump to line 244 because the condition on line 247 was always true
248 _compute_tr_val(d)
249 merged[code] = d
251 for stock in mc_stocks + list(rise_stocks or []) + list(fall_stocks or []):
252 d = _to_dict(stock)
253 code = _get_code(d)
254 if code and code not in merged:
255 _compute_tr_val(d)
256 merged[code] = d
258 merged = {c: d for c, d in merged.items() if not self._is_etf(d)}
259 result = list(merged.values())
260 result.sort(key=lambda x: x.get("_tr_val_int", 0), reverse=True)
261 result = result[:30]
262 for i, stock in enumerate(result, 1):
263 stock["data_rank"] = str(i)
264 tr_val_int = stock.pop("_tr_val_int", None)
265 if "acml_tr_pbmn" not in stock and tr_val_int is not None:
266 stock["acml_tr_pbmn"] = str(tr_val_int)
268 self.pm.log_timer("MarketDataService.get_top_trading_value_stocks", t_start, threshold=1.0)
269 return ResCommonResponse(rt_cd=ErrorCode.SUCCESS.value, msg1="거래대금 상위 성공", data=result)
271 def _wrap_snapshot_output(self, db_data: dict) -> dict:
272 """get_latest_daily_snapshot의 plain dict output을 ResStockFullInfoApiOutput으로 변환합니다."""
273 output = db_data.get("output")
274 if not isinstance(output, dict): 274 ↛ 275line 274 didn't jump to line 275 because the condition on line 274 was never true
275 return db_data
276 defaults = {f: "" for f in ResStockFullInfoApiOutput.model_fields}
277 defaults["new_hgpr_lwpr_cls_code"] = None
278 merged = {**defaults, **output}
279 return {**db_data, "output": ResStockFullInfoApiOutput.model_validate(merged)}
281 def _is_etf(self, stock: dict) -> bool:
282 """종목명 기반 ETF/ETN 여부 판별."""
283 name = stock.get("hts_kor_isnm") or stock.get("kor_shrt_ism") or ""
284 return any(name.startswith(prefix) for prefix in self._ETF_PREFIXES)
286 async def get_etf_info(self, etf_code: str) -> ResCommonResponse:
287 """특정 ETF의 상세 정보를 조회합니다."""
288 self._logger.info(f"MarketDataService - {etf_code} ETF 정보 조회")
289 return await self._broker_api_wrapper.get_etf_info(etf_code)
291 async def get_financial_ratio(self, stock_code: str) -> ResCommonResponse:
292 """특정 종목의 재무비율을 조회합니다."""
293 self._logger.info(f"MarketDataService - {stock_code} 재무비율 조회")
294 return await self._broker_api_wrapper.get_financial_ratio(stock_code)
296 def _normalize_ohlcv_rows(self, items: List[Any]) -> List[dict]:
297 """
298 한국투자 일봉 응답(ResDailyChartApiItem list) 또는 dict list를
299 표준 OHLCV 스키마로 정규화한다.
300 반환: [{"date":"YYYYMMDD","open":float,"high":float,"low":float,"close":float,"volume":int}, ...]
301 """
302 def _get(it, attr, default=None):
303 if it is None: return default 303 ↛ exitline 303 didn't return from function '_get' because the return on line 303 wasn't executed
304 if isinstance(it, dict): return it.get(attr, default)
305 return getattr(it, attr, default)
306 def _to_float(x):
307 try: return float(str(x).replace(",", ""))
308 except Exception: return None
309 def _to_int(x):
310 try: return int(float(str(x).replace(",", "")))
311 except Exception: return None
312 rows = []
313 for it in items or []:
314 date = _get(it, "stck_bsop_date") or _get(it, "date")
315 if not date: continue
316 rows.append({
317 "date": date,
318 "open": _to_float(_get(it, "stck_oprc") or _get(it, "open")),
319 "high": _to_float(_get(it, "stck_hgpr") or _get(it, "high")),
320 "low": _to_float(_get(it, "stck_lwpr") or _get(it, "low")),
321 "close": _to_float(_get(it, "stck_clpr") or _get(it, "close")),
322 "volume": _to_int(_get(it, "acml_vol") or _get(it, "volume")),
323 })
324 rows.sort(key=lambda r: r["date"])
325 return rows
327 def _calc_range_by_period(self, period: str, end_dt: datetime | None, limit: int | None = None) -> tuple[str, str]:
328 """
329 period: 'D'|'W'|'M'
330 end_dt: 기준일(없으면 now KST)
331 limit : 원하는 봉 개수(없으면 합리적 기본값)
332 return: (start_yyyymmdd, end_yyyymmdd)
333 """
334 if end_dt is None: end_dt = self._market_clock.get_current_kst_time()
335 period = (period or "D").upper()
336 if period == "D": start_dt = end_dt - timedelta(days=max((limit or 365) * 2, 730))
337 elif period == "W": start_dt = end_dt - timedelta(weeks=max((limit or 52) * 2, 104))
338 elif period == "M": start_dt = end_dt - timedelta(days=max((limit or 24) * 2, 60) * 31)
339 else: start_dt = end_dt - timedelta(days=max((limit or 120) * 2, 240))
340 return self._market_clock.to_yyyymmdd(start_dt), self._market_clock.to_yyyymmdd(end_dt)
342 async def _fetch_past_daily_ohlcv(self, stock_code: str, end_yyyymmdd: str, max_loops: int = 8) -> List[dict]:
343 """
344 과거 일봉 데이터를 반복 조회하여 수집합니다.
345 :param end_yyyymmdd: 조회 종료일 (보통 어제 날짜)
346 :param max_loops: 반복 횟수 (10회 * 100일 = 약 1000일 ≈ 692 거래일)
347 """
348 t_start = self.pm.start_timer()
349 tasks = []
350 curr_end_dt = datetime.strptime(end_yyyymmdd, "%Y%m%d")
352 for _ in range(max_loops):
353 curr_start_dt = curr_end_dt - timedelta(days=DynamicConfig.OHLCV.DAILY_ITEMCHARTPRICE_MAX_RANGE)
354 s_date = self._market_clock.to_yyyymmdd(curr_start_dt)
355 e_date = self._market_clock.to_yyyymmdd(curr_end_dt)
357 tasks.append(self._broker_api_wrapper.inquire_daily_itemchartprice(
358 stock_code=stock_code, start_date=s_date, end_date=e_date, fid_period_div_code="D"
359 ))
360 curr_end_dt = curr_start_dt - timedelta(days=1)
362 responses = await asyncio.gather(*tasks, return_exceptions=True)
364 all_rows_map = {}
365 for raw in responses:
366 if isinstance(raw, Exception) or not raw or raw.rt_cd != ErrorCode.SUCCESS.value:
367 continue
368 rows = self._normalize_ohlcv_rows(raw.data)
369 for r in rows:
370 all_rows_map[r['date']] = r
372 all_rows = sorted(all_rows_map.values(), key=lambda x: x['date'])
374 self.pm.log_timer(f"MarketData._fetch_past_daily_ohlcv({stock_code})", t_start)
375 return all_rows
377 async def _fetch_today_ohlcv(self, stock_code: str, today_str: str, caller: str = "unknown") -> List[dict]:
378 try:
379 current_resp = await self.get_current_price(stock_code, caller=caller)
380 if current_resp.rt_cd == ErrorCode.SUCCESS.value and current_resp.data: 380 ↛ 394line 380 didn't jump to line 394 because the condition on line 380 was always true
381 output = current_resp.data.get('output')
382 if output: 382 ↛ 394line 382 didn't jump to line 394 because the condition on line 382 was always true
383 def _get_val(obj, attr_name):
384 return obj.get(attr_name) if isinstance(obj, dict) else getattr(obj, attr_name, None)
385 opn = _get_val(output, 'stck_oprc')
386 high = _get_val(output, 'stck_hgpr')
387 low = _get_val(output, 'stck_lwpr')
388 close = _get_val(output, 'stck_prpr')
389 vol = _get_val(output, 'acml_vol')
390 if opn and high and low and close: 390 ↛ 394line 390 didn't jump to line 394 because the condition on line 390 was always true
391 return [{"date": today_str, "open": float(opn), "high": float(high), "low": float(low), "close": float(close), "volume": int(vol) if vol else 0}]
392 except Exception as e:
393 self._logger.warning(f"오늘자 OHLCV 구성을 위한 현재가 조회 실패: {e}")
394 return []
396 async def get_ohlcv(self, stock_code: str, period: str = "D", caller: str = "unknown", exchange: Exchange = Exchange.KRX) -> ResCommonResponse:
397 """
398 일봉(D)의 경우 StockRepository 활용, 주봉/월봉은 기존 API 조회 방식 사용.
399 """
400 t_ohlcv = self.pm.start_timer()
401 if (period or "D").upper() == "D":
402 now_dt = self._market_clock.get_current_kst_time()
403 today_str = now_dt.strftime("%Y%m%d")
404 yesterday_str = (now_dt - timedelta(days=1)).strftime("%Y%m%d")
405 past_rows = []
407 # 1. 과거 데이터 가져오기 (StockRepository 캐시/DB 활용)
408 stock_data = None
409 if self._stock_repo: 409 ↛ 416line 409 didn't jump to line 416 because the condition on line 409 was always true
410 stock_data = await self._stock_repo.get_stock_data(stock_code, ohlcv_limit=600, caller=caller)
411 if stock_data and "ohlcv" in stock_data:
412 past_rows = stock_data["ohlcv"]
414 # 2. 백필 범위 결정: 장 마감 후에는 오늘 최종 캔들이 확정 → today_str까지 조회
415 # 장 중에는 오늘 캔들이 미완성 → yesterday_str까지만 조회
416 is_market_open = (await self._mcs.is_market_open_now()) if self._mcs else False
417 fetch_end_date = yesterday_str if is_market_open else today_str
419 # historical_complete=True면 DB가 줄 수 있는 전부를 이미 보유 중 → 전체 백필 생략
420 # 단, DB 최신일 < fetch_end_date이면 누락 구간 보완 백필 수행
421 # 그렇지 않으면(첫 조회 또는 캐시 없음) API 백필 수행 (약 1000일 ≈ 692 거래일)
422 latest_in_db = past_rows[-1]['date'] if past_rows else None
423 if not stock_data or not stock_data.get("historical_complete"):
424 past_rows = await self._fetch_past_daily_ohlcv(stock_code, fetch_end_date, max_loops=10)
425 if self._stock_repo and past_rows:
426 await self._stock_repo.upsert_ohlcv([{**r, "code": stock_code} for r in past_rows])
427 elif latest_in_db and latest_in_db < fetch_end_date:
428 # DB 데이터가 오래됨 (거래일 누락) → 최근 구간만 보완 백필 후 병합
429 new_rows = await self._fetch_past_daily_ohlcv(stock_code, fetch_end_date, max_loops=1)
430 if new_rows: 430 ↛ 439line 430 didn't jump to line 439 because the condition on line 430 was always true
431 if self._stock_repo: 431 ↛ 434line 431 didn't jump to line 434 because the condition on line 431 was always true
432 await self._stock_repo.upsert_ohlcv([{**r, "code": stock_code} for r in new_rows])
433 # 기존 데이터와 병합 (날짜 기준 dedup → 오름차순 정렬)
434 merged = {r['date']: r for r in past_rows}
435 merged.update({r['date']: r for r in new_rows})
436 past_rows = sorted(merged.values(), key=lambda r: r['date'])
438 # 3. 오늘 데이터 처리 (실시간 API 병합) - 장 중에만 수행 (장 마감 후는 backfill에서 포함)
439 today_rows = []
440 if is_market_open:
441 today_rows = await self._fetch_today_ohlcv(stock_code, today_str, caller=caller)
442 if today_rows and now_dt.weekday() >= 5: today_rows = []
443 elif today_rows and past_rows: 443 ↛ 449line 443 didn't jump to line 449 because the condition on line 443 was always true
444 last_past = past_rows[-1]
445 today = today_rows[0]
446 if today['date'] != last_past['date'] and (today['open'] == last_past['open'] and today['close'] == last_past['close']): 446 ↛ 447line 446 didn't jump to line 447 because the condition on line 446 was never true
447 today_rows = []
449 if today_rows and not is_market_open: 449 ↛ 450line 449 didn't jump to line 450 because the condition on line 449 was never true
450 if not past_rows or today_rows[0]['date'] > past_rows[-1]['date']:
451 if self._stock_repo: await self._stock_repo.upsert_ohlcv([{**today_rows[0], "code": stock_code}])
452 past_rows = past_rows + today_rows
453 today_rows = []
455 # past_rows는 이미 날짜 오름차순 정렬 상태 → O(1) 업데이트
456 final_rows = past_rows.copy()
457 if today_rows:
458 today = today_rows[0]
459 if final_rows and final_rows[-1]['date'] == today['date']: 459 ↛ 460line 459 didn't jump to line 460 because the condition on line 459 was never true
460 final_rows[-1] = today
461 else:
462 final_rows.append(today)
463 self.pm.log_timer(f"MarketData.get_ohlcv({stock_code})", t_ohlcv)
464 return ResCommonResponse(rt_cd=ErrorCode.SUCCESS.value, msg1=f"OHLCV {len(final_rows)}건", data=final_rows)
465 else:
466 s, e = self._calc_range_by_period(period, end_dt=self._market_clock.get_current_kst_time())
467 raw = await self._broker_api_wrapper.inquire_daily_itemchartprice(stock_code, start_date=s, end_date=e, fid_period_div_code=period.upper(), exchange=exchange)
468 if not raw or raw.rt_cd != ErrorCode.SUCCESS.value: return raw 468 ↛ exitline 468 didn't return from function 'get_ohlcv' because the return on line 468 wasn't executed
469 return ResCommonResponse(rt_cd=ErrorCode.SUCCESS.value, msg1="OK", data=self._normalize_ohlcv_rows(raw.data))
471 async def get_ohlcv_range(self, stock_code: str, period: str = "D", start_date: Optional[str] = None, end_date: Optional[str] = None, exchange: Exchange = Exchange.KRX) -> ResCommonResponse:
472 """
473 시작일~종료일 범위형 차트 API 호출 (일/분 공통).
474 """
475 ed = end_date or self._market_clock.get_current_kst_time()
476 sd = start_date or (datetime.strptime(ed, "%Y%m%d") - timedelta(days=240)).strftime("%Y%m%d")
477 raw = await self._broker_api_wrapper.inquire_daily_itemchartprice(stock_code, start_date=sd, end_date=ed, fid_period_div_code=period.upper(), exchange=exchange)
478 if not raw or raw.rt_cd != ErrorCode.SUCCESS.value: return raw
479 return ResCommonResponse(rt_cd=ErrorCode.SUCCESS.value, msg1="OK", data=self._normalize_ohlcv_rows(raw.data))
481 async def get_recent_daily_ohlcv(self, code: str, limit: int = DynamicConfig.OHLCV.DAILY_ITEMCHARTPRICE_MAX_RANGE, end_date: Optional[str] = None, start_date: Optional[str] = None, exchange: Exchange = Exchange.KRX) -> List[Dict[str, Any]]:
482 """
483 최근 'limit'개 *거래일* 일봉을 반환.
484 API는 시작/종료일 범위를 요구하므로 넉넉한 범위로 받아서 슬라이스로 limit개 보장.
485 한국투자증권 API 특성상 긴 기간(1년 이상) 조회 시 데이터가 잘릴 수 있으므로,
486 1년 단위로 끊어서 반복 호출하여 병합한다.
488 DB-first 최적화: end_date/start_date 미지정 시(최근 N건 요청) DB에 충분한 데이터가
489 있으면 API를 호출하지 않고 DB에서 바로 반환한다. ohlcv_update_task가 장 마감 후
490 전 종목을 갱신하므로 장중에는 API 호출 없이 DB 조회만으로 충분하다.
491 """
492 t_start = self.pm.start_timer()
493 current_ed_dt = datetime.strptime(end_date, "%Y%m%d") if end_date else self._market_clock.get_current_kst_time()
495 # DB-first: 날짜 범위가 명시되지 않은 경우에만 시도
496 if not end_date and not start_date and self._stock_repo:
497 stock_data = await self._stock_repo.get_stock_data(code, ohlcv_limit=limit, caller="get_recent_daily_ohlcv")
498 if stock_data:
499 rows = stock_data.get("ohlcv", [])
500 if len(rows) >= limit:
501 self.pm.log_timer(f"MarketData.get_recent_daily_ohlcv({code})[DB]", t_start)
502 return rows[-limit:]
504 if start_date:
506 resp = await self.get_ohlcv_range(code, "D", start_date, self._market_clock.to_yyyymmdd(current_ed_dt), exchange=exchange)
507 return resp.data or [] if resp and resp.rt_cd == ErrorCode.SUCCESS.value else []
509 # limit을 만족하기 위해 대략 영업일을 고려해 1.5를 곱하고 100일 단위로 끊습니다.
510 max_loops = int((limit * 1.5) // 100) + 1
511 max_loops = max(1, min(max_loops, 20)) # 최소 1, 최대 20으로 한정
513 tasks = []
514 curr_end_dt = current_ed_dt
515 for _ in range(max_loops):
516 ed_str = self._market_clock.to_yyyymmdd(curr_end_dt)
517 curr_start_dt = curr_end_dt - timedelta(days=100)
518 sd_str = self._market_clock.to_yyyymmdd(curr_start_dt)
520 tasks.append(self.get_ohlcv_range(code, "D", sd_str, ed_str))
521 curr_end_dt = curr_start_dt - timedelta(days=1)
523 responses = await asyncio.gather(*tasks, return_exceptions=True)
525 all_rows_map = {}
526 for resp in responses:
527 if isinstance(resp, Exception) or not resp or resp.rt_cd != ErrorCode.SUCCESS.value:
528 continue
529 for r in (resp.data or []):
530 all_rows_map[r['date']] = r
532 all_rows = sorted(all_rows_map.values(), key=lambda x: x['date'])
534 if len(all_rows) > limit: 534 ↛ 535line 534 didn't jump to line 535 because the condition on line 534 was never true
535 all_rows = all_rows[-limit:]
537 self.pm.log_timer(f"MarketData.get_recent_daily_ohlcv({code})", t_start)
538 return all_rows
540 async def get_intraday_minutes_today(self, *, stock_code: str, input_hour_1: str) -> ResCommonResponse:
541 """
542 URL: /quotations/inquire-time-itemchartprice
543 TR : FHKST03010200 (모의/실전 공통)
544 """
545 return await self._broker_api_wrapper.inquire_time_itemchartprice(stock_code=stock_code, input_hour_1=input_hour_1, pw_data_incu_yn="Y", etc_cls_code="0")
547 async def get_intraday_minutes_by_date(self, *, stock_code: str, input_date_1: str, input_hour_1: str = "") -> ResCommonResponse:
548 """
549 URL: /quotations/inquire-time-dailychartprice
550 TR : FHKST03010230 (실전만)
551 """
552 if self._env.is_paper_trading: return ResCommonResponse(rt_cd=ErrorCode.API_ERROR.value, msg1="모의투자 미지원", data=[])
553 return await self._broker_api_wrapper.inquire_time_dailychartprice(stock_code=stock_code, input_date_1=input_date_1, input_hour_1=input_hour_1, pw_data_incu_yn="Y", fake_tick_incu_yn="")
555 async def get_latest_trading_date(self) -> Optional[str]:
556 """
557 대표 종목(삼성전자)의 일봉을 조회하여 데이터가 존재하는 가장 최근 날짜를 반환한다.
558 """
559 if self._mcs: return await self._mcs.get_latest_trading_date()
560 return None
562 async def get_next_open_day(self) -> Optional[str]:
563 """
564 현재 날짜 기준으로 가장 빠른 개장일을 찾아 반환합니다 (YYYYMMDD).
565 국내휴장일조회 API를 사용하여 휴장 여부를 정확히 판단합니다.
566 """
567 current_date = self._market_clock.get_current_kst_time().date()
568 target_date_str = current_date.strftime("%Y%m%d")
569 for _ in range(30):
570 resp = await self._broker_api_wrapper.check_holiday(target_date_str)
571 if not resp or resp.rt_cd != ErrorCode.SUCCESS.value:
572 current_date += timedelta(days=1)
573 target_date_str = current_date.strftime("%Y%m%d")
574 continue
575 outputs = resp.data.get("output", []) if isinstance(resp.data, dict) else []
576 for item in outputs:
577 if item.get("bzdy_yn") == "Y": return item.get("bass_dt")
578 if outputs:
579 last_dt_str = outputs[-1].get("bass_dt", target_date_str)
580 current_date = datetime.strptime(last_dt_str, "%Y%m%d").date() + timedelta(days=1)
581 target_date_str = current_date.strftime("%Y%m%d")
582 return None