Coverage for services / market_data_service.py: 87%

373 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-04 15:08 +0000

1# services/market_data_service.py 

2import asyncio 

3import logging 

4from datetime import datetime, timedelta 

5from typing import List, Dict, Any, Optional, TYPE_CHECKING 

6from config.DynamicConfig import DynamicConfig 

7 

8from brokers.korea_investment.korea_invest_env import KoreaInvestApiEnv 

9from core.market_clock import MarketClock 

10from brokers.broker_api_wrapper import BrokerAPIWrapper 

11from common.types import ( 

12 ResPriceSummary, ResCommonResponse, ErrorCode, 

13 

14 ResTopMarketCapApiItem, ResBasicStockInfo, ResFluctuation, ResDailyChartApiItem, Exchange, 

15 ResStockFullInfoApiOutput 

16) 

17from core.cache.cache_store import CacheStore 

18from core.performance_profiler import PerformanceProfiler 

19from services.market_calendar_service import MarketCalendarService 

20 

21if TYPE_CHECKING: 21 ↛ 22line 21 didn't jump to line 22 because the condition on line 21 was never true

22 from repositories.stock_repository import StockRepository 

23 

24class MarketDataService: 

25 """ 

26 시장 데이터(현재가, 호가, OHLCV, 랭킹 등) 조회를 전담하는 서비스입니다. 

27 """ 

28 def __init__(self, broker_api_wrapper: BrokerAPIWrapper, env: KoreaInvestApiEnv, logger=None, 

29 market_clock: MarketClock = None, cache_store: Optional[CacheStore] = None, 

30 market_calendar_service: Optional[MarketCalendarService] = None, performance_profiler: Optional[PerformanceProfiler] = None, 

31 stock_repository: Optional['StockRepository'] = None): 

32 self._broker_api_wrapper = broker_api_wrapper 

33 self._env = env 

34 self._logger = logger if logger else logging.getLogger(__name__) 

35 self._market_clock = market_clock 

36 self.cache_store = cache_store 

37 self._mcs = market_calendar_service 

38 self.pm = performance_profiler if performance_profiler else PerformanceProfiler(enabled=False) 

39 self._stock_repo = stock_repository 

40 

41 self._ETF_PREFIXES = ( 

42 "KODEX", "TIGER", "KBSTAR", "ARIRANG", "SOL", "ACE", 

43 "HANARO", "KOSEF", "PLUS", "TIMEFOLIO", "WON", "FOCUS", 

44 "VITA", "TREX", "MASTER", "WOORI", "KINDEX", 

45 ) 

46 

47 async def get_name_by_code(self, code: str) -> str: 

48 """종목코드로 종목명을 반환합니다 (BrokerAPIWrapper 위임).""" 

49 return await self._broker_api_wrapper.get_name_by_code(code) 

50 

51 async def get_price_summary(self, stock_code, exchange: Exchange = Exchange.KRX) -> ResCommonResponse: 

52 """주어진 종목코드에 대해 시가/현재가/등락률(%) 요약 정보를 반환합니다 (KoreaInvestApiQuotations 위임).""" 

53 self._logger.info(f"MarketDataService - {stock_code} 종목 요약 정보 조회 요청") 

54 return await self._broker_api_wrapper.get_price_summary(stock_code, exchange=exchange) 

55 

56 async def get_stock_info_by_code(self, stock_code: str, exchange: Exchange = Exchange.KRX) -> ResCommonResponse: 

57 """종목코드로 종목의 전체 정보를 가져옵니다 (BrokerAPIWrapper 위임).""" 

58 self._logger.info(f"MarketDataService - {stock_code} 종목 상세 정보 조회 요청") 

59 return await self._broker_api_wrapper.get_stock_info_by_code(stock_code, exchange=exchange) 

60 

61 async def get_current_price(self, stock_code, exchange: Exchange = Exchange.KRX, count_stats: bool = True, caller: str = "unknown") -> ResCommonResponse: 

62 # 1. StockRepository 단기 캐시 확인 (웹소켓 틱 갱신 또는 최근 API 조회 결과) 

63 if self._stock_repo: 63 ↛ 69line 63 didn't jump to line 69 because the condition on line 63 was always true

64 cached_data = self._stock_repo.get_current_price(stock_code, max_age_sec=3.0, count_stats=count_stats, caller=caller) 

65 if cached_data: 

66 return ResCommonResponse(rt_cd=ErrorCode.SUCCESS.value, msg1="성공(Cache)", data=cached_data) 

67 

68 # 2. 장 마감 시간대에는 daily_prices DB 스냅샷 확인 (API 호출 절약) 

69 is_market_open = (await self._mcs.is_market_open_now()) if self._mcs else self._market_clock.is_market_operating_hours() 

70 if self._stock_repo and not is_market_open: 

71 db_data = await self._stock_repo.get_latest_daily_snapshot(stock_code) 

72 if db_data: 

73 # DB 데이터가 최근 거래일 기준인지 확인 (오래된 데이터 방지) 

74 latest_trading_date = await self._mcs.get_latest_trading_date() if self._mcs else None 

75 db_trade_date = db_data.get("_trade_date") or (db_data.get("output") or {}).get("stck_bsop_date") 

76 if not latest_trading_date or db_trade_date != latest_trading_date: 

77 self._logger.debug( 

78 f"MarketDataService - {stock_code} DB 스냅샷 날짜({db_trade_date}) != 최근 거래일({latest_trading_date}), API 조회로 폴백" 

79 ) 

80 else: 

81 db_data = self._wrap_snapshot_output(db_data) 

82 self._stock_repo.set_current_price(stock_code, db_data) 

83 return ResCommonResponse(rt_cd=ErrorCode.SUCCESS.value, msg1="성공(DB)", data=db_data) 

84 

85 if count_stats: 85 ↛ 87line 85 didn't jump to line 87 because the condition on line 85 was always true

86 self._logger.info(f"MarketDataService - {stock_code} 현재가 조회 요청") 

87 resp = await self._broker_api_wrapper.get_current_price(stock_code, exchange=exchange) 

88 

89 # 3. 조회 결과를 StockRepository에 갱신 

90 if resp and resp.rt_cd == ErrorCode.SUCCESS.value and self._stock_repo: 

91 self._stock_repo.set_current_price(stock_code, resp.data) 

92 

93 return resp 

94 

95 async def get_stock_conclusion(self, stock_code: str, exchange: Exchange = Exchange.KRX) -> ResCommonResponse: 

96 """종목의 체결(체결강도 등) 정보를 조회합니다.""" 

97 self._logger.info(f"MarketDataService - {stock_code} 체결 정보 조회 요청") 

98 return await self._broker_api_wrapper.get_stock_conclusion(stock_code, exchange=exchange) 

99 

100 async def get_multi_price(self, stock_codes: list[str]) -> ResCommonResponse: 

101 """복수종목 현재가 조회 (최대 30종목)""" 

102 self._logger.info(f"MarketDataService - 복수종목 현재가 조회 요청 ({len(stock_codes)}종목)") 

103 return await self._broker_api_wrapper.get_multi_price(stock_codes) 

104 

105 async def get_top_market_cap_stocks_code(self, market_code: str, limit: int = None) -> ResCommonResponse: 

106 """ 

107 시가총액 상위 종목을 조회하고 결과를 반환합니다 (모의투자 미지원). 

108 ResCommonResponse 형태로 반환하며, data 필드에 List[ResTopMarketCapApiItem] 포함. 

109 """ 

110 if limit is None: 110 ↛ 111line 110 didn't jump to line 111 because the condition on line 110 was never true

111 limit = 30 

112 self._logger.warning(f"[경고] count 파라미터가 명시되지 않아 기본값 30을 사용합니다. market_code={market_code}") 

113 

114 self._logger.info(f"MarketDataService - 시가총액 상위 종목 조회 요청 - 시장: {market_code}, 개수: {limit}") 

115 if self._env.is_paper_trading: 115 ↛ 119line 115 didn't jump to line 119 because the condition on line 115 was always true

116 self._logger.warning("MarketDataService - 시가총액 상위 종목 조회는 모의투자를 지원하지 않습니다.") 

117 return ResCommonResponse(rt_cd=ErrorCode.INVALID_INPUT.value, msg1="모의투자 미지원 API입니다.", data=[]) 

118 

119 return await self._broker_api_wrapper.get_top_market_cap_stocks_code(market_code, limit) 

120 

121 async def get_current_upper_limit_stocks(self, rise_stocks: List) -> ResCommonResponse: 

122 """ 

123 전체 종목 리스트 중 현재 상한가에 도달한 종목을 필터링합니다. 

124 ResCommonResponse 형태로 반환하며, data 필드에 List[Dict] (종목 정보) 포함. 

125 """ 

126 results: List[ResBasicStockInfo] = [] 

127 for stock_info in rise_stocks: 

128 code = "Unknown" 

129 try: 

130 if isinstance(stock_info, dict): 

131 code = stock_info.get("stck_shrn_iscd", "Unknown") 

132 elif hasattr(stock_info, "stck_shrn_iscd"): 132 ↛ 135line 132 didn't jump to line 135 because the condition on line 132 was always true

133 code = stock_info.stck_shrn_iscd 

134 

135 fluctuation_info: ResFluctuation = stock_info 

136 code = fluctuation_info.stck_shrn_iscd 

137 current_price = int(fluctuation_info.stck_prpr) 

138 prdy_ctrt = float(fluctuation_info.prdy_ctrt) 

139 name = fluctuation_info.hts_kor_isnm 

140 change_rate = float(fluctuation_info.prdy_vrss) 

141 

142 if prdy_ctrt > 29.0: 

143 stock_info = ResBasicStockInfo( 

144 code=code, name=name, current_price=current_price, 

145 change_rate=change_rate, prdy_ctrt=prdy_ctrt 

146 ) 

147 results.append(stock_info) 

148 except Exception as e: 

149 self._logger.warning(f"{code} 현재 상한가 필터링 중 오류: {e}", exc_info=True) 

150 continue 

151 

152 self._market_clock.get_current_kst_time() 

153 return ResCommonResponse(rt_cd=ErrorCode.SUCCESS.value, msg1="현재 상한가 종목 필터링 성공", data=results) 

154 

155 async def get_all_stocks_code(self) -> ResCommonResponse: 

156 """ 

157 전체 종목 코드를 조회합니다. 

158 ResCommonResponse 형태로 반환하며, data 필드에 List[str] (종목 코드 리스트) 포함. 

159 """ 

160 self._logger.info("MarketDataService - 전체 종목 코드 조회 요청") 

161 try: 

162 codes = await self._broker_api_wrapper.get_all_stock_code_list() 

163 if not isinstance(codes, list): 

164 msg = f"비정상 응답 형식: {codes}" 

165 self._logger.warning(msg) 

166 return ResCommonResponse(rt_cd=ErrorCode.PARSING_ERROR.value, msg1=msg, data=[]) 

167 return ResCommonResponse(rt_cd=ErrorCode.SUCCESS.value, msg1="전체 종목 코드 조회 성공", data=codes) 

168 except Exception as e: 

169 error_msg = f"전체 종목 코드 조회 실패: {e}" 

170 self._logger.exception(error_msg) 

171 return ResCommonResponse(rt_cd=ErrorCode.UNKNOWN_ERROR.value, msg1=error_msg, data=[]) 

172 

173 async def get_asking_price(self, stock_code: str, exchange: Exchange = Exchange.KRX) -> ResCommonResponse: 

174 """종목의 실시간 호가 정보를 조회합니다.""" 

175 self._logger.info(f"MarketDataService - {stock_code} 종목 호가 정보 조회 요청") 

176 return await self._broker_api_wrapper.get_asking_price(stock_code, exchange=exchange) 

177 

178 async def get_time_concluded_prices(self, stock_code: str, exchange: Exchange = Exchange.KRX) -> ResCommonResponse: 

179 """종목의 시간대별 체결가 정보를 조회합니다.""" 

180 self._logger.info(f"MarketDataService - {stock_code} 종목 시간대별 체결가 조회 요청") 

181 return await self._broker_api_wrapper.get_time_concluded_prices(stock_code, exchange=exchange) 

182 

183 async def inquire_daily_itemchartprice(self, stock_code: str, start_date: str, end_date: str, fid_period_div_code: str = 'D', exchange: Exchange = Exchange.KRX) -> ResCommonResponse: 

184 """일별/분봉 주식 시세 차트 데이터를 조회합니다 (KoreaInvestApiQuotations 위임).""" 

185 self._logger.info(f"MarketDataService - {stock_code} 종목 차트 데이터 조회 요청") 

186 return await self._broker_api_wrapper.inquire_daily_itemchartprice(stock_code=stock_code, start_date=start_date, end_date=end_date, fid_period_div_code=fid_period_div_code, exchange=exchange) 

187 

188 async def get_top_rise_fall_stocks(self, rise: bool = True) -> ResCommonResponse: 

189 """상승률 또는 하락률 상위 종목을 조회합니다.""" 

190 direction = "상승" if rise else "하락" 

191 self._logger.info(f"MarketDataService - {direction}률 상위 종목 조회 요청") 

192 return await self._broker_api_wrapper.get_top_rise_fall_stocks(rise) 

193 

194 async def get_top_volume_stocks(self) -> ResCommonResponse: 

195 """거래량 상위 종목을 조회합니다.""" 

196 self._logger.info("MarketDataService - 거래량 상위 종목 조회 요청") 

197 return await self._broker_api_wrapper.get_top_volume_stocks() 

198 

199 async def get_top_trading_value_stocks(self) -> ResCommonResponse: 

200 """ 

201 거래대금 상위 종목 조회. 

202 거래량/코스피시가총액30/코스닥시가총액30/상승률/하락률 5개 기존 API 결과를 병합하여 

203 acml_tr_pbmn(거래대금) 기준 상위 30개를 반환한다. 

204 ETF/ETN 종목은 제외한다. 

205 """ 

206 t_start = self.pm.start_timer() 

207 self._logger.info("MarketDataService - 거래대금 상위 종목 조회 요청 (병합)") 

208 

209 vol_resp, mc_kospi_resp, mc_kosdaq_resp, rise_resp, fall_resp = await asyncio.gather( 

210 self._broker_api_wrapper.get_top_volume_stocks(), 

211 self._broker_api_wrapper.get_top_market_cap_stocks_code("0000", 30), 

212 self._broker_api_wrapper.get_top_market_cap_stocks_code("1001", 30), 

213 self._broker_api_wrapper.get_top_rise_fall_stocks(True), 

214 self._broker_api_wrapper.get_top_rise_fall_stocks(False), 

215 ) 

216 

217 volume_stocks = vol_resp.data if vol_resp and vol_resp.rt_cd == ErrorCode.SUCCESS.value else [] 

218 if isinstance(volume_stocks, dict): 

219 volume_stocks = volume_stocks.get("output", []) 

220 mc_kospi = mc_kospi_resp.data if mc_kospi_resp and mc_kospi_resp.rt_cd == ErrorCode.SUCCESS.value else [] 

221 mc_kosdaq = mc_kosdaq_resp.data if mc_kosdaq_resp and mc_kosdaq_resp.rt_cd == ErrorCode.SUCCESS.value else [] 

222 mc_stocks = list(mc_kospi or []) + list(mc_kosdaq or []) 

223 rise_stocks = rise_resp.data if rise_resp and rise_resp.rt_cd == ErrorCode.SUCCESS.value else [] 

224 fall_stocks = fall_resp.data if fall_resp and fall_resp.rt_cd == ErrorCode.SUCCESS.value else [] 

225 

226 def _to_dict(item): 

227 return item.to_dict() if hasattr(item, 'to_dict') else (item if isinstance(item, dict) else {}) 

228 def _get_code(d): 

229 return d.get("mksc_shrn_iscd") or d.get("stck_shrn_iscd") or d.get("iscd") or "" 

230 

231 def _compute_tr_val(d): 

232 if d.get("acml_tr_pbmn"): 

233 try: 

234 d["_tr_val_int"] = int(d["acml_tr_pbmn"] or "0") 

235 except (ValueError, TypeError): 

236 d["_tr_val_int"] = 0 

237 else: 

238 try: 

239 d["_tr_val_int"] = int(d.get("stck_prpr", "0") or "0") * int(d.get("acml_vol", "0") or "0") 

240 except (ValueError, TypeError): 

241 d["_tr_val_int"] = 0 

242 

243 merged = {} 

244 for stock in volume_stocks: 

245 d = _to_dict(stock) 

246 code = _get_code(d) 

247 if code: 247 ↛ 244line 247 didn't jump to line 244 because the condition on line 247 was always true

248 _compute_tr_val(d) 

249 merged[code] = d 

250 

251 for stock in mc_stocks + list(rise_stocks or []) + list(fall_stocks or []): 

252 d = _to_dict(stock) 

253 code = _get_code(d) 

254 if code and code not in merged: 

255 _compute_tr_val(d) 

256 merged[code] = d 

257 

258 merged = {c: d for c, d in merged.items() if not self._is_etf(d)} 

259 result = list(merged.values()) 

260 result.sort(key=lambda x: x.get("_tr_val_int", 0), reverse=True) 

261 result = result[:30] 

262 for i, stock in enumerate(result, 1): 

263 stock["data_rank"] = str(i) 

264 tr_val_int = stock.pop("_tr_val_int", None) 

265 if "acml_tr_pbmn" not in stock and tr_val_int is not None: 

266 stock["acml_tr_pbmn"] = str(tr_val_int) 

267 

268 self.pm.log_timer("MarketDataService.get_top_trading_value_stocks", t_start, threshold=1.0) 

269 return ResCommonResponse(rt_cd=ErrorCode.SUCCESS.value, msg1="거래대금 상위 성공", data=result) 

270 

271 def _wrap_snapshot_output(self, db_data: dict) -> dict: 

272 """get_latest_daily_snapshot의 plain dict output을 ResStockFullInfoApiOutput으로 변환합니다.""" 

273 output = db_data.get("output") 

274 if not isinstance(output, dict): 274 ↛ 275line 274 didn't jump to line 275 because the condition on line 274 was never true

275 return db_data 

276 defaults = {f: "" for f in ResStockFullInfoApiOutput.model_fields} 

277 defaults["new_hgpr_lwpr_cls_code"] = None 

278 merged = {**defaults, **output} 

279 return {**db_data, "output": ResStockFullInfoApiOutput.model_validate(merged)} 

280 

281 def _is_etf(self, stock: dict) -> bool: 

282 """종목명 기반 ETF/ETN 여부 판별.""" 

283 name = stock.get("hts_kor_isnm") or stock.get("kor_shrt_ism") or "" 

284 return any(name.startswith(prefix) for prefix in self._ETF_PREFIXES) 

285 

286 async def get_etf_info(self, etf_code: str) -> ResCommonResponse: 

287 """특정 ETF의 상세 정보를 조회합니다.""" 

288 self._logger.info(f"MarketDataService - {etf_code} ETF 정보 조회") 

289 return await self._broker_api_wrapper.get_etf_info(etf_code) 

290 

291 async def get_financial_ratio(self, stock_code: str) -> ResCommonResponse: 

292 """특정 종목의 재무비율을 조회합니다.""" 

293 self._logger.info(f"MarketDataService - {stock_code} 재무비율 조회") 

294 return await self._broker_api_wrapper.get_financial_ratio(stock_code) 

295 

296 def _normalize_ohlcv_rows(self, items: List[Any]) -> List[dict]: 

297 """ 

298 한국투자 일봉 응답(ResDailyChartApiItem list) 또는 dict list를 

299 표준 OHLCV 스키마로 정규화한다. 

300 반환: [{"date":"YYYYMMDD","open":float,"high":float,"low":float,"close":float,"volume":int}, ...] 

301 """ 

302 def _get(it, attr, default=None): 

303 if it is None: return default 303 ↛ exitline 303 didn't return from function '_get' because the return on line 303 wasn't executed

304 if isinstance(it, dict): return it.get(attr, default) 

305 return getattr(it, attr, default) 

306 def _to_float(x): 

307 try: return float(str(x).replace(",", "")) 

308 except Exception: return None 

309 def _to_int(x): 

310 try: return int(float(str(x).replace(",", ""))) 

311 except Exception: return None 

312 rows = [] 

313 for it in items or []: 

314 date = _get(it, "stck_bsop_date") or _get(it, "date") 

315 if not date: continue 

316 rows.append({ 

317 "date": date, 

318 "open": _to_float(_get(it, "stck_oprc") or _get(it, "open")), 

319 "high": _to_float(_get(it, "stck_hgpr") or _get(it, "high")), 

320 "low": _to_float(_get(it, "stck_lwpr") or _get(it, "low")), 

321 "close": _to_float(_get(it, "stck_clpr") or _get(it, "close")), 

322 "volume": _to_int(_get(it, "acml_vol") or _get(it, "volume")), 

323 }) 

324 rows.sort(key=lambda r: r["date"]) 

325 return rows 

326 

327 def _calc_range_by_period(self, period: str, end_dt: datetime | None, limit: int | None = None) -> tuple[str, str]: 

328 """ 

329 period: 'D'|'W'|'M' 

330 end_dt: 기준일(없으면 now KST) 

331 limit : 원하는 봉 개수(없으면 합리적 기본값) 

332 return: (start_yyyymmdd, end_yyyymmdd) 

333 """ 

334 if end_dt is None: end_dt = self._market_clock.get_current_kst_time() 

335 period = (period or "D").upper() 

336 if period == "D": start_dt = end_dt - timedelta(days=max((limit or 365) * 2, 730)) 

337 elif period == "W": start_dt = end_dt - timedelta(weeks=max((limit or 52) * 2, 104)) 

338 elif period == "M": start_dt = end_dt - timedelta(days=max((limit or 24) * 2, 60) * 31) 

339 else: start_dt = end_dt - timedelta(days=max((limit or 120) * 2, 240)) 

340 return self._market_clock.to_yyyymmdd(start_dt), self._market_clock.to_yyyymmdd(end_dt) 

341 

342 async def _fetch_past_daily_ohlcv(self, stock_code: str, end_yyyymmdd: str, max_loops: int = 8) -> List[dict]: 

343 """ 

344 과거 일봉 데이터를 반복 조회하여 수집합니다. 

345 :param end_yyyymmdd: 조회 종료일 (보통 어제 날짜) 

346 :param max_loops: 반복 횟수 (10회 * 100일 = 약 1000일 ≈ 692 거래일) 

347 """ 

348 t_start = self.pm.start_timer() 

349 tasks = [] 

350 curr_end_dt = datetime.strptime(end_yyyymmdd, "%Y%m%d") 

351 

352 for _ in range(max_loops): 

353 curr_start_dt = curr_end_dt - timedelta(days=DynamicConfig.OHLCV.DAILY_ITEMCHARTPRICE_MAX_RANGE) 

354 s_date = self._market_clock.to_yyyymmdd(curr_start_dt) 

355 e_date = self._market_clock.to_yyyymmdd(curr_end_dt) 

356 

357 tasks.append(self._broker_api_wrapper.inquire_daily_itemchartprice( 

358 stock_code=stock_code, start_date=s_date, end_date=e_date, fid_period_div_code="D" 

359 )) 

360 curr_end_dt = curr_start_dt - timedelta(days=1) 

361 

362 responses = await asyncio.gather(*tasks, return_exceptions=True) 

363 

364 all_rows_map = {} 

365 for raw in responses: 

366 if isinstance(raw, Exception) or not raw or raw.rt_cd != ErrorCode.SUCCESS.value: 

367 continue 

368 rows = self._normalize_ohlcv_rows(raw.data) 

369 for r in rows: 

370 all_rows_map[r['date']] = r 

371 

372 all_rows = sorted(all_rows_map.values(), key=lambda x: x['date']) 

373 

374 self.pm.log_timer(f"MarketData._fetch_past_daily_ohlcv({stock_code})", t_start) 

375 return all_rows 

376 

377 async def _fetch_today_ohlcv(self, stock_code: str, today_str: str, caller: str = "unknown") -> List[dict]: 

378 try: 

379 current_resp = await self.get_current_price(stock_code, caller=caller) 

380 if current_resp.rt_cd == ErrorCode.SUCCESS.value and current_resp.data: 380 ↛ 394line 380 didn't jump to line 394 because the condition on line 380 was always true

381 output = current_resp.data.get('output') 

382 if output: 382 ↛ 394line 382 didn't jump to line 394 because the condition on line 382 was always true

383 def _get_val(obj, attr_name): 

384 return obj.get(attr_name) if isinstance(obj, dict) else getattr(obj, attr_name, None) 

385 opn = _get_val(output, 'stck_oprc') 

386 high = _get_val(output, 'stck_hgpr') 

387 low = _get_val(output, 'stck_lwpr') 

388 close = _get_val(output, 'stck_prpr') 

389 vol = _get_val(output, 'acml_vol') 

390 if opn and high and low and close: 390 ↛ 394line 390 didn't jump to line 394 because the condition on line 390 was always true

391 return [{"date": today_str, "open": float(opn), "high": float(high), "low": float(low), "close": float(close), "volume": int(vol) if vol else 0}] 

392 except Exception as e: 

393 self._logger.warning(f"오늘자 OHLCV 구성을 위한 현재가 조회 실패: {e}") 

394 return [] 

395 

396 async def get_ohlcv(self, stock_code: str, period: str = "D", caller: str = "unknown", exchange: Exchange = Exchange.KRX) -> ResCommonResponse: 

397 """ 

398 일봉(D)의 경우 StockRepository 활용, 주봉/월봉은 기존 API 조회 방식 사용. 

399 """ 

400 t_ohlcv = self.pm.start_timer() 

401 if (period or "D").upper() == "D": 

402 now_dt = self._market_clock.get_current_kst_time() 

403 today_str = now_dt.strftime("%Y%m%d") 

404 yesterday_str = (now_dt - timedelta(days=1)).strftime("%Y%m%d") 

405 past_rows = [] 

406 

407 # 1. 과거 데이터 가져오기 (StockRepository 캐시/DB 활용) 

408 stock_data = None 

409 if self._stock_repo: 409 ↛ 416line 409 didn't jump to line 416 because the condition on line 409 was always true

410 stock_data = await self._stock_repo.get_stock_data(stock_code, ohlcv_limit=600, caller=caller) 

411 if stock_data and "ohlcv" in stock_data: 

412 past_rows = stock_data["ohlcv"] 

413 

414 # 2. 백필 범위 결정: 장 마감 후에는 오늘 최종 캔들이 확정 → today_str까지 조회 

415 # 장 중에는 오늘 캔들이 미완성 → yesterday_str까지만 조회 

416 is_market_open = (await self._mcs.is_market_open_now()) if self._mcs else False 

417 fetch_end_date = yesterday_str if is_market_open else today_str 

418 

419 # historical_complete=True면 DB가 줄 수 있는 전부를 이미 보유 중 → 전체 백필 생략 

420 # 단, DB 최신일 < fetch_end_date이면 누락 구간 보완 백필 수행 

421 # 그렇지 않으면(첫 조회 또는 캐시 없음) API 백필 수행 (약 1000일 ≈ 692 거래일) 

422 latest_in_db = past_rows[-1]['date'] if past_rows else None 

423 if not stock_data or not stock_data.get("historical_complete"): 

424 past_rows = await self._fetch_past_daily_ohlcv(stock_code, fetch_end_date, max_loops=10) 

425 if self._stock_repo and past_rows: 

426 await self._stock_repo.upsert_ohlcv([{**r, "code": stock_code} for r in past_rows]) 

427 elif latest_in_db and latest_in_db < fetch_end_date: 

428 # DB 데이터가 오래됨 (거래일 누락) → 최근 구간만 보완 백필 후 병합 

429 new_rows = await self._fetch_past_daily_ohlcv(stock_code, fetch_end_date, max_loops=1) 

430 if new_rows: 430 ↛ 439line 430 didn't jump to line 439 because the condition on line 430 was always true

431 if self._stock_repo: 431 ↛ 434line 431 didn't jump to line 434 because the condition on line 431 was always true

432 await self._stock_repo.upsert_ohlcv([{**r, "code": stock_code} for r in new_rows]) 

433 # 기존 데이터와 병합 (날짜 기준 dedup → 오름차순 정렬) 

434 merged = {r['date']: r for r in past_rows} 

435 merged.update({r['date']: r for r in new_rows}) 

436 past_rows = sorted(merged.values(), key=lambda r: r['date']) 

437 

438 # 3. 오늘 데이터 처리 (실시간 API 병합) - 장 중에만 수행 (장 마감 후는 backfill에서 포함) 

439 today_rows = [] 

440 if is_market_open: 

441 today_rows = await self._fetch_today_ohlcv(stock_code, today_str, caller=caller) 

442 if today_rows and now_dt.weekday() >= 5: today_rows = [] 

443 elif today_rows and past_rows: 443 ↛ 449line 443 didn't jump to line 449 because the condition on line 443 was always true

444 last_past = past_rows[-1] 

445 today = today_rows[0] 

446 if today['date'] != last_past['date'] and (today['open'] == last_past['open'] and today['close'] == last_past['close']): 446 ↛ 447line 446 didn't jump to line 447 because the condition on line 446 was never true

447 today_rows = [] 

448 

449 if today_rows and not is_market_open: 449 ↛ 450line 449 didn't jump to line 450 because the condition on line 449 was never true

450 if not past_rows or today_rows[0]['date'] > past_rows[-1]['date']: 

451 if self._stock_repo: await self._stock_repo.upsert_ohlcv([{**today_rows[0], "code": stock_code}]) 

452 past_rows = past_rows + today_rows 

453 today_rows = [] 

454 

455 # past_rows는 이미 날짜 오름차순 정렬 상태 → O(1) 업데이트 

456 final_rows = past_rows.copy() 

457 if today_rows: 

458 today = today_rows[0] 

459 if final_rows and final_rows[-1]['date'] == today['date']: 459 ↛ 460line 459 didn't jump to line 460 because the condition on line 459 was never true

460 final_rows[-1] = today 

461 else: 

462 final_rows.append(today) 

463 self.pm.log_timer(f"MarketData.get_ohlcv({stock_code})", t_ohlcv) 

464 return ResCommonResponse(rt_cd=ErrorCode.SUCCESS.value, msg1=f"OHLCV {len(final_rows)}건", data=final_rows) 

465 else: 

466 s, e = self._calc_range_by_period(period, end_dt=self._market_clock.get_current_kst_time()) 

467 raw = await self._broker_api_wrapper.inquire_daily_itemchartprice(stock_code, start_date=s, end_date=e, fid_period_div_code=period.upper(), exchange=exchange) 

468 if not raw or raw.rt_cd != ErrorCode.SUCCESS.value: return raw 468 ↛ exitline 468 didn't return from function 'get_ohlcv' because the return on line 468 wasn't executed

469 return ResCommonResponse(rt_cd=ErrorCode.SUCCESS.value, msg1="OK", data=self._normalize_ohlcv_rows(raw.data)) 

470 

471 async def get_ohlcv_range(self, stock_code: str, period: str = "D", start_date: Optional[str] = None, end_date: Optional[str] = None, exchange: Exchange = Exchange.KRX) -> ResCommonResponse: 

472 """ 

473 시작일~종료일 범위형 차트 API 호출 (일/분 공통). 

474 """ 

475 ed = end_date or self._market_clock.get_current_kst_time() 

476 sd = start_date or (datetime.strptime(ed, "%Y%m%d") - timedelta(days=240)).strftime("%Y%m%d") 

477 raw = await self._broker_api_wrapper.inquire_daily_itemchartprice(stock_code, start_date=sd, end_date=ed, fid_period_div_code=period.upper(), exchange=exchange) 

478 if not raw or raw.rt_cd != ErrorCode.SUCCESS.value: return raw 

479 return ResCommonResponse(rt_cd=ErrorCode.SUCCESS.value, msg1="OK", data=self._normalize_ohlcv_rows(raw.data)) 

480 

481 async def get_recent_daily_ohlcv(self, code: str, limit: int = DynamicConfig.OHLCV.DAILY_ITEMCHARTPRICE_MAX_RANGE, end_date: Optional[str] = None, start_date: Optional[str] = None, exchange: Exchange = Exchange.KRX) -> List[Dict[str, Any]]: 

482 """ 

483 최근 'limit'개 *거래일* 일봉을 반환. 

484 API는 시작/종료일 범위를 요구하므로 넉넉한 범위로 받아서 슬라이스로 limit개 보장. 

485 한국투자증권 API 특성상 긴 기간(1년 이상) 조회 시 데이터가 잘릴 수 있으므로, 

486 1년 단위로 끊어서 반복 호출하여 병합한다. 

487 

488 DB-first 최적화: end_date/start_date 미지정 시(최근 N건 요청) DB에 충분한 데이터가 

489 있으면 API를 호출하지 않고 DB에서 바로 반환한다. ohlcv_update_task가 장 마감 후 

490 전 종목을 갱신하므로 장중에는 API 호출 없이 DB 조회만으로 충분하다. 

491 """ 

492 t_start = self.pm.start_timer() 

493 current_ed_dt = datetime.strptime(end_date, "%Y%m%d") if end_date else self._market_clock.get_current_kst_time() 

494 

495 # DB-first: 날짜 범위가 명시되지 않은 경우에만 시도 

496 if not end_date and not start_date and self._stock_repo: 

497 stock_data = await self._stock_repo.get_stock_data(code, ohlcv_limit=limit, caller="get_recent_daily_ohlcv") 

498 if stock_data: 

499 rows = stock_data.get("ohlcv", []) 

500 if len(rows) >= limit: 

501 self.pm.log_timer(f"MarketData.get_recent_daily_ohlcv({code})[DB]", t_start) 

502 return rows[-limit:] 

503 

504 if start_date: 

505 

506 resp = await self.get_ohlcv_range(code, "D", start_date, self._market_clock.to_yyyymmdd(current_ed_dt), exchange=exchange) 

507 return resp.data or [] if resp and resp.rt_cd == ErrorCode.SUCCESS.value else [] 

508 

509 # limit을 만족하기 위해 대략 영업일을 고려해 1.5를 곱하고 100일 단위로 끊습니다. 

510 max_loops = int((limit * 1.5) // 100) + 1 

511 max_loops = max(1, min(max_loops, 20)) # 최소 1, 최대 20으로 한정 

512 

513 tasks = [] 

514 curr_end_dt = current_ed_dt 

515 for _ in range(max_loops): 

516 ed_str = self._market_clock.to_yyyymmdd(curr_end_dt) 

517 curr_start_dt = curr_end_dt - timedelta(days=100) 

518 sd_str = self._market_clock.to_yyyymmdd(curr_start_dt) 

519 

520 tasks.append(self.get_ohlcv_range(code, "D", sd_str, ed_str)) 

521 curr_end_dt = curr_start_dt - timedelta(days=1) 

522 

523 responses = await asyncio.gather(*tasks, return_exceptions=True) 

524 

525 all_rows_map = {} 

526 for resp in responses: 

527 if isinstance(resp, Exception) or not resp or resp.rt_cd != ErrorCode.SUCCESS.value: 

528 continue 

529 for r in (resp.data or []): 

530 all_rows_map[r['date']] = r 

531 

532 all_rows = sorted(all_rows_map.values(), key=lambda x: x['date']) 

533 

534 if len(all_rows) > limit: 534 ↛ 535line 534 didn't jump to line 535 because the condition on line 534 was never true

535 all_rows = all_rows[-limit:] 

536 

537 self.pm.log_timer(f"MarketData.get_recent_daily_ohlcv({code})", t_start) 

538 return all_rows 

539 

540 async def get_intraday_minutes_today(self, *, stock_code: str, input_hour_1: str) -> ResCommonResponse: 

541 """ 

542 URL: /quotations/inquire-time-itemchartprice 

543 TR : FHKST03010200 (모의/실전 공통) 

544 """ 

545 return await self._broker_api_wrapper.inquire_time_itemchartprice(stock_code=stock_code, input_hour_1=input_hour_1, pw_data_incu_yn="Y", etc_cls_code="0") 

546 

547 async def get_intraday_minutes_by_date(self, *, stock_code: str, input_date_1: str, input_hour_1: str = "") -> ResCommonResponse: 

548 """ 

549 URL: /quotations/inquire-time-dailychartprice 

550 TR : FHKST03010230 (실전만) 

551 """ 

552 if self._env.is_paper_trading: return ResCommonResponse(rt_cd=ErrorCode.API_ERROR.value, msg1="모의투자 미지원", data=[]) 

553 return await self._broker_api_wrapper.inquire_time_dailychartprice(stock_code=stock_code, input_date_1=input_date_1, input_hour_1=input_hour_1, pw_data_incu_yn="Y", fake_tick_incu_yn="") 

554 

555 async def get_latest_trading_date(self) -> Optional[str]: 

556 """ 

557 대표 종목(삼성전자)의 일봉을 조회하여 데이터가 존재하는 가장 최근 날짜를 반환한다. 

558 """ 

559 if self._mcs: return await self._mcs.get_latest_trading_date() 

560 return None 

561 

562 async def get_next_open_day(self) -> Optional[str]: 

563 """ 

564 현재 날짜 기준으로 가장 빠른 개장일을 찾아 반환합니다 (YYYYMMDD). 

565 국내휴장일조회 API를 사용하여 휴장 여부를 정확히 판단합니다. 

566 """ 

567 current_date = self._market_clock.get_current_kst_time().date() 

568 target_date_str = current_date.strftime("%Y%m%d") 

569 for _ in range(30): 

570 resp = await self._broker_api_wrapper.check_holiday(target_date_str) 

571 if not resp or resp.rt_cd != ErrorCode.SUCCESS.value: 

572 current_date += timedelta(days=1) 

573 target_date_str = current_date.strftime("%Y%m%d") 

574 continue 

575 outputs = resp.data.get("output", []) if isinstance(resp.data, dict) else [] 

576 for item in outputs: 

577 if item.get("bzdy_yn") == "Y": return item.get("bass_dt") 

578 if outputs: 

579 last_dt_str = outputs[-1].get("bass_dt", target_date_str) 

580 current_date = datetime.strptime(last_dt_str, "%Y%m%d").date() + timedelta(days=1) 

581 target_date_str = current_date.strftime("%Y%m%d") 

582 return None