Coverage for brokers / korea_investment / korea_invest_quotations_api.py: 97%

450 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-04 15:08 +0000

1# brokers/korea_investment/korea_invest_quotations_api.py 

2import httpx 

3from datetime import datetime 

4from typing import Dict, List, Union, Optional 

5from pydantic import ValidationError 

6from brokers.korea_investment.korea_invest_api_base import KoreaInvestApiBase 

7from brokers.korea_investment.korea_invest_env import KoreaInvestApiEnv 

8from brokers.korea_investment.korea_invest_params_provider import Params 

9from brokers.korea_investment.korea_invest_header_provider import KoreaInvestHeaderProvider 

10from brokers.korea_investment.korea_invest_url_provider import KoreaInvestUrlProvider 

11from brokers.korea_investment.korea_invest_url_keys import EndpointKey 

12from brokers.korea_investment.korea_invest_trid_provider import KoreaInvestTrIdProvider 

13from brokers.korea_investment.korea_invest_trid_keys import TrIdLeaf 

14# common/types에서 모든 ResTypedDict와 ErrorCode 임포트 

15from common.types import ( 

16 ResPriceSummary, ResMomentumStock, ResCommonResponse, ErrorCode, 

17 ResStockFullInfoApiOutput, ResTopMarketCapApiItem, ResDailyChartApiItem, ResFluctuation, 

18 Exchange, 

19) 

20 

21 

22def _exchange_to_market_code(exchange: Exchange) -> str: 

23 """Exchange enum을 API 마켓코드 문자열로 변환합니다.""" 

24 if exchange == Exchange.NXT: 24 ↛ 25line 24 didn't jump to line 25 because the condition on line 24 was never true

25 return "NX" 

26 elif exchange == Exchange.UN: 26 ↛ 27line 26 didn't jump to line 27 because the condition on line 26 was never true

27 return "UN" 

28 return "J" # KRX 기본값 

29 

30 

31class KoreaInvestApiQuotations(KoreaInvestApiBase): 

32 def __init__(self, 

33 env: KoreaInvestApiEnv, 

34 logger=None, 

35 market_clock=None, 

36 async_client: Optional[httpx.AsyncClient] = None, 

37 header_provider: Optional[KoreaInvestHeaderProvider] = None, 

38 url_provider: Optional[KoreaInvestUrlProvider] = None, 

39 trid_provider: Optional[KoreaInvestTrIdProvider] = None): 

40 super().__init__(env, 

41 logger, 

42 market_clock, 

43 async_client=async_client, 

44 header_provider=header_provider, 

45 url_provider=url_provider, 

46 trid_provider=trid_provider) 

47 

48 async def get_stock_info_by_code(self, stock_code: str, exchange: Exchange = Exchange.KRX) -> ResCommonResponse: 

49 """ 

50 종목코드로 종목의 전체 정보 (이름, 현재가, 시가총액 등)를 가져옵니다. 

51 ResCommonResponse 형태로 반환하며, data 필드에 ResStockFullInfoApiOutput 포함. 

52 """ 

53 full_config = self._env.active_config 

54 tr_id = self._trid_provider.quotations(TrIdLeaf.SEARCH_INFO) # 모드에 따라 자동 

55 

56 self._headers.set_tr_id(tr_id) 

57 self._headers.set_custtype(full_config['custtype']) 

58 

59 params = Params.search_info(stock_code=stock_code, prdt_type_cd=full_config["params"]["fid_div_cls_code"]) 

60 

61 self._logger.info(f"{stock_code} 종목 정보 조회 시도... (exchange={exchange.value})") 

62 response: ResCommonResponse = await self.call_api("GET", EndpointKey.SEARCH_INFO, params=params, retry_count=1) 

63 

64 if response.rt_cd == ErrorCode.SUCCESS.value: 

65 try: 

66 # Pydantic 모델 필수 필드 누락 방지 (API 응답에 없을 경우 기본값 설정) 

67 if "new_hgpr_lwpr_cls_code" not in response.data: 

68 response.data["new_hgpr_lwpr_cls_code"] = "-" 

69 

70 stock_info_data = ResStockFullInfoApiOutput(**response.data) 

71 return ResCommonResponse( 

72 rt_cd=ErrorCode.SUCCESS.value, # Enum 값 사용 

73 msg1="종목 정보 조회 성공", 

74 data=stock_info_data 

75 ) 

76 except (TypeError, ValidationError) as e: 

77 error_msg = f"{stock_code} 종목 정보 응답 형식 오류: {e}, 응답: {response.data}" 

78 self._logger.error(error_msg) 

79 return ResCommonResponse( 

80 rt_cd=ErrorCode.PARSING_ERROR.value, # Enum 값 사용 

81 msg1=error_msg, 

82 data=None 

83 ) 

84 else: 

85 error_msg = f"{stock_code} 종목 정보 조회 실패: {response.msg1 or '알 수 없는 오류'}, 응답: {response}" 

86 self._logger.warning(error_msg) 

87 return ResCommonResponse( 

88 rt_cd=response.get("rt_cd", ErrorCode.API_ERROR.value) if isinstance(response, 

89 dict) else ErrorCode.API_ERROR.value, 

90 msg1=error_msg, 

91 data=None 

92 ) 

93 

94 async def get_current_price(self, stock_code, exchange: Exchange = Exchange.KRX) -> ResCommonResponse: 

95 """ 

96 현재가를 조회합니다. API 원본 응답을 ResCommonResponse의 data 필드에 담아 반환. 

97 """ 

98 full_config = self._env.active_config 

99 tr_id = self._trid_provider.quotations(TrIdLeaf.INQUIRE_PRICE) # 모드에 따라 자동 

100 

101 self._headers.set_tr_id(tr_id) 

102 self._headers.set_custtype(full_config['custtype']) 

103 

104 market_code = _exchange_to_market_code(exchange) 

105 params = Params.inquire_price(stock_code=stock_code, market=market_code) 

106 self._logger.info(f"{stock_code} 현재가 조회 시도...") 

107 

108 response: ResCommonResponse = await self.call_api("GET", EndpointKey.INQUIRE_PRICE, params=params, 

109 retry_count=3) 

110 

111 if response.rt_cd != ErrorCode.SUCCESS.value: 

112 self._logger.warning("현재가 조회 실패") 

113 return response 

114 

115 try: 

116 response_data_dict = response.data['output'] 

117 # Pydantic 모델 필수 필드 누락 방지 (API 응답에 없을 경우 기본값 설정) 

118 if "new_hgpr_lwpr_cls_code" not in response_data_dict: 

119 response_data_dict["new_hgpr_lwpr_cls_code"] = "-" 

120 

121 response.data['output'] = ResStockFullInfoApiOutput.from_dict(response_data_dict) 

122 except (KeyError, ValidationError, TypeError) as e: 

123 error_msg = f"현재가 응답 데이터 파싱 실패: {stock_code}, 오류: {e}" 

124 self._logger.error(error_msg) 

125 return ResCommonResponse( 

126 rt_cd=ErrorCode.PARSING_ERROR.value, 

127 msg1=error_msg, 

128 data=None 

129 ) 

130 return response 

131 

132 async def get_stock_conclusion(self, stock_code: str, exchange: Exchange = Exchange.KRX) -> ResCommonResponse: 

133 """주식 체결(체결강도 포함) 정보를 조회합니다.""" 

134 full_config = self._env.active_config 

135 tr_id = self._trid_provider.quotations(TrIdLeaf.INQUIRE_CONCLUSION) 

136 

137 self._headers.set_tr_id(tr_id) 

138 self._headers.set_custtype(full_config['custtype']) 

139 

140 market_code = _exchange_to_market_code(exchange) 

141 params = Params.inquire_conclusion(stock_code=stock_code, market=market_code) 

142 self._logger.info(f"{stock_code} 체결(체결강도) 조회 시도...") 

143 return await self.call_api("GET", EndpointKey.INQUIRE_CONCLUSION, params=params, retry_count=1) 

144 

145 async def get_price_summary(self, stock_code: str, exchange: Exchange = Exchange.KRX) -> ResCommonResponse: 

146 """ 

147 주어진 종목코드에 대해 시가/현재가/등락률(%) 요약 정보 반환 

148 ResCommonResponse 형태로 반환하며, data 필드에 ResPriceSummary 포함. 

149 """ 

150 response_common = await self.get_current_price(stock_code, exchange=exchange) 

151 

152 if response_common.rt_cd != ErrorCode.SUCCESS.value: 

153 self._logger.warning(f"({stock_code}) get_current_price 실패: {response_common.msg1}") 

154 return ResCommonResponse( 

155 rt_cd=ErrorCode.API_ERROR.value, 

156 msg1="get_current_price 실패", 

157 data=None 

158 ) 

159 

160 output: ResStockFullInfoApiOutput = response_common.data['output'] 

161 

162 if not output: 

163 error_msg = f"API 응답 output 데이터 없음: {stock_code}, 응답: {response_common.msg1}" 

164 self._logger.warning(error_msg) 

165 return ResCommonResponse( 

166 rt_cd=ErrorCode.API_ERROR.value, # Enum 값 사용 

167 msg1=error_msg, 

168 data=None 

169 ) 

170 if not isinstance(output, ResStockFullInfoApiOutput): 

171 error_msg = f"Wrong Ret Type ResStockFullInfoApiOutput - Ret: {type(output)}, 응답: {response_common.msg1}" 

172 self._logger.warning(error_msg) 

173 return ResCommonResponse( 

174 rt_cd=ErrorCode.WRONG_RET_TYPE.value, # Enum 값 사용 

175 msg1=error_msg, 

176 data=None 

177 ) 

178 

179 # ✅ 필수 키 누락 체크 

180 required_keys = ["stck_oprc", "stck_prpr", "prdy_ctrt"] 

181 if not all(hasattr(output, k) and getattr(output, k) is not None for k in required_keys): 181 ↛ 182line 181 didn't jump to line 182 because the condition on line 181 was never true

182 error_msg = f"API 응답 output에 필수 가격 데이터 누락: {stock_code}, 응답: {output}" 

183 self._logger.warning(error_msg) 

184 return ResCommonResponse( 

185 rt_cd=ErrorCode.PARSING_ERROR.value, 

186 msg1=error_msg, 

187 data=None 

188 ) 

189 

190 try: 

191 open_price = int(output.stck_oprc) 

192 current_price = int(output.stck_prpr) 

193 prdy_ctrt = float(output.prdy_ctrt) 

194 except (ValueError, TypeError) as e: 

195 error_msg = f"가격 데이터 파싱 실패: {stock_code}, 응답: {output}, 오류: {e}" 

196 self._logger.warning(error_msg) 

197 return ResCommonResponse( 

198 rt_cd=ErrorCode.PARSING_ERROR.value, # Enum 값 사용 

199 msg1=error_msg, 

200 data=None 

201 ) 

202 

203 change_rate = (current_price - open_price) / open_price * 100 if open_price else 0 

204 

205 # 신고/신저가 상태 확인 및 로깅 

206 raw_status_code = str(getattr(output, 'new_hgpr_lwpr_cls_code', '')) 

207 if "vs" in raw_status_code: 

208 self._logger.warning(f"({stock_code}) 신고/신저가 불일치: {raw_status_code}") 

209 # 최종 응답에는 가공된 값을 사용 

210 new_high_low_status = output.new_high_low_status 

211 if "vs" in new_high_low_status: 211 ↛ 212line 211 didn't jump to line 212 because the condition on line 211 was never true

212 self._logger.warning(f"({stock_code}) 신고/신저가 불일치: {new_high_low_status}") 

213 

214 price_summary_data = ResPriceSummary( 

215 symbol=stock_code, 

216 open=open_price, 

217 current=current_price, 

218 change_rate=round(change_rate, 2), 

219 prdy_ctrt=prdy_ctrt, 

220 new_high_low_status=new_high_low_status, 

221 ) 

222 return ResCommonResponse( 

223 rt_cd=ErrorCode.SUCCESS.value, # Enum 값 사용 

224 msg1="정상 처리되었습니다.", 

225 data=price_summary_data 

226 ) 

227 

228 async def get_market_cap(self, stock_code: str, exchange: Exchange = Exchange.KRX) -> ResCommonResponse: 

229 """ 

230 종목코드로 시가총액을 반환합니다. (단위: 원) 

231 ResCommonResponse 형태로 반환하며, data 필드에 int 시가총액 값 포함. 

232 """ 

233 response_common = await self.get_stock_info_by_code(stock_code, exchange=exchange) 

234 

235 if response_common.rt_cd != ErrorCode.SUCCESS.value: # Enum 값 사용 

236 return response_common 

237 

238 info: ResStockFullInfoApiOutput = response_common.data 

239 

240 if info is None: 

241 error_msg = f"{stock_code} 시가총액 정보 조회 실패: ResStockFullInfoApiOutput 데이터가 None" 

242 self._logger.warning(error_msg) 

243 return ResCommonResponse( 

244 rt_cd=ErrorCode.PARSING_ERROR.value, # Enum 값 사용 

245 msg1=error_msg, 

246 data=None 

247 ) 

248 

249 market_cap_str = info.stck_llam 

250 if market_cap_str and market_cap_str.isdigit(): 

251 market_cap = int(market_cap_str) 

252 return ResCommonResponse( 

253 rt_cd=ErrorCode.SUCCESS.value, # Enum 값 사용 

254 msg1="시가총액 조회 성공", 

255 data=market_cap 

256 ) 

257 else: 

258 error_msg = f"{stock_code} 시가총액 정보 없음 또는 형식 오류: {market_cap_str}" 

259 self._logger.warning(error_msg) 

260 return ResCommonResponse( 

261 rt_cd=ErrorCode.PARSING_ERROR.value, # Enum 값 사용 

262 msg1=error_msg, 

263 data=0 

264 ) 

265 

266 async def get_top_market_cap_stocks_code(self, market_code: str, count: int = 30) -> ResCommonResponse: 

267 """ 

268 시가총액 상위 종목 목록을 반환합니다. 최대 30개까지만 지원됩니다. 

269 ResCommonResponse 형태로 반환하며, data 필드에 List[ResTopMarketCapApiItem] 포함. 

270 """ 

271 full_config = self._env.active_config 

272 

273 if count <= 0: 

274 error_msg = f"요청된 count가 0 이하입니다. count={count}" 

275 self._logger.warning(error_msg) 

276 return ResCommonResponse( 

277 rt_cd=ErrorCode.INVALID_INPUT.value, # Enum 값 사용 

278 msg1=error_msg, 

279 data=[] 

280 ) 

281 

282 if count > 30: 

283 self._logger.warning(f"요청 수 {count}는 최대 허용값 30을 초과하므로 30개로 제한됩니다.") 

284 count = 30 

285 

286 tr_id = self._trid_provider.quotations(TrIdLeaf.MARKET_CAP) 

287 self._headers.set_tr_id(tr_id) 

288 self._headers.set_custtype(full_config['custtype']) 

289 

290 params = Params.top_market_cap(input_iscd=market_code) 

291 

292 self._logger.info(f"시가총액 상위 종목 조회 시도 (시장코드: {market_code}, 요청개수: {count})") 

293 response: ResCommonResponse = await self.call_api("GET", EndpointKey.MARKET_CAP, params=params, retry_count=3) 

294 

295 if response.rt_cd != ErrorCode.SUCCESS.value: 

296 self._logger.warning(f"시가총액 응답 오류 또는 비어 있음: 시가총액 조회 실패") 

297 return ResCommonResponse( 

298 rt_cd=response.get("rt_cd", ErrorCode.API_ERROR.value) if isinstance(response, 

299 dict) else ErrorCode.API_ERROR.value, 

300 # Enum 값 사용 

301 msg1="시가총액 조회 실패", 

302 data=[] 

303 ) 

304 

305 batch = response.data.get('output', '')[:count] 

306 self._logger.info(f"API로부터 수신한 종목 수: {len(batch)}") 

307 

308 results: list[ResTopMarketCapApiItem] = [] 

309 

310 for raw in batch: 

311 try: 

312 # 1) 필수키 정규화 (별칭 허용) 

313 code = raw.get("mksc_shrn_iscd") or raw.get("iscd") 

314 market_cap = raw.get("stck_avls") # 문자열 숫자 유지 

315 

316 if not code or not market_cap: 

317 # 필수값 없으면 스킵 

318 continue 

319 

320 # 2) 선택/별칭 필드들까지 묶어서 안전 생성 

321 # - dataclass 내부 __post_init__에서 iscd/acc_trdvol ↔ mksc_shrn_iscd/acml_vol 보완 

322 item = ResTopMarketCapApiItem( 

323 mksc_shrn_iscd=code, 

324 stck_avls=str(market_cap), 

325 data_rank=str(raw.get("data_rank", "")), 

326 hts_kor_isnm=raw.get("hts_kor_isnm", ""), 

327 

328 # 시세/등락(옵션) 

329 stck_prpr=str(raw.get("stck_prpr")) if raw.get("stck_prpr") is not None else None, 

330 prdy_vrss=str(raw.get("prdy_vrss")) if raw.get("prdy_vrss") is not None else None, 

331 prdy_vrss_sign=raw.get("prdy_vrss_sign"), 

332 prdy_ctrt=str(raw.get("prdy_ctrt")) if raw.get("prdy_ctrt") is not None else None, 

333 

334 # 거래/상장(옵션) 

335 acml_vol=str(raw.get("acml_vol")) if raw.get("acml_vol") is not None else None, 

336 lstn_stcn=str(raw.get("lstn_stcn")) if raw.get("lstn_stcn") is not None else None, 

337 

338 # 시장 비중(옵션) 

339 mrkt_whol_avls_rlim=str(raw.get("mrkt_whol_avls_rlim")) if raw.get( 

340 "mrkt_whol_avls_rlim") is not None else None, 

341 

342 # 호환 alias (있으면 넣고, 없어도 __post_init__에서 보완) 

343 iscd=raw.get("iscd"), 

344 acc_trdvol=str(raw.get("acc_trdvol")) if raw.get("acc_trdvol") is not None else None, 

345 ) 

346 

347 results.append(item) 

348 

349 except (ValueError, TypeError, KeyError, ValidationError) as e: 

350 self._logger.warning(f"시가총액 상위 종목 개별 항목 파싱 오류: {e}, 항목: {raw}") 

351 continue 

352 

353 # (선택) 정렬: API가 정렬 보장 안 하면 rank 우선, 없으면 시총 내림차순 

354 def _to_int_safe(s, default=10 ** 12): 

355 try: 

356 return int(str(s)) 

357 except Exception: 

358 return default 

359 

360 if results: 

361 # data_rank가 있으면 그걸로, 없으면 stck_avls(큰값 우선) 

362 if any(r.data_rank for r in results): 

363 results.sort(key=lambda r: _to_int_safe(r.data_rank, default=10 ** 12)) 

364 else: 

365 results.sort(key=lambda r: _to_int_safe(r.stck_avls, default=0), reverse=True) 

366 

367 # 응답: 결과 없으면 "없음"으로 분기(뷰 로직에 맞춰 조정) 

368 if results: 

369 return ResCommonResponse( 

370 rt_cd=ErrorCode.SUCCESS.value, 

371 msg1="시가총액 상위 종목 조회 성공", 

372 data=results, 

373 ) 

374 else: 

375 return ResCommonResponse( 

376 rt_cd=ErrorCode.EMPTY_VALUES.value, # 없으면 EMPTY_VALUES 등 내부 규약 코드 사용 

377 msg1="시가총액 상위 종목 없음", 

378 data=[], 

379 ) 

380 

381 async def inquire_daily_itemchartprice(self, stock_code: str, 

382 start_date: str = '', 

383 end_date: str = '', 

384 fid_period_div_code: str = 'D', 

385 fid_input_iscd: str = '00', 

386 fid_org_adj_prc: str = '0', 

387 exchange: Exchange = Exchange.KRX) -> ResCommonResponse: 

388 """ 

389 일별/주별/월별/분별/틱별 주식 시세 차트 데이터를 조회합니다. 

390 TRID: FHKST03010100 (일별), FHNKF03060000 (분봉) 

391 ResCommonResponse 형태로 반환하며, data 필드에 List[ResDailyChartApiItem] 포함. 

392 """ 

393 valid_period_codes = {"D", "W", "M", "Y"} 

394 if fid_period_div_code not in valid_period_codes: 

395 error_msg = f"지원하지 않는 fid_period_div_code: {fid_period_div_code}" 

396 self._logger.error(error_msg) 

397 return ResCommonResponse( 

398 rt_cd=ErrorCode.INVALID_INPUT.value, 

399 msg1=error_msg, 

400 data=[] 

401 ) 

402 

403 tr_id = self._trid_provider.daily_itemchartprice() 

404 

405 self._logger.debug(f"차트 조회 시도 현재 tr_id: {tr_id}") 

406 

407 market_code = _exchange_to_market_code(exchange) 

408 params = Params.daily_itemchartprice(stock_code=stock_code, start_date=start_date, end_date=end_date, period=fid_period_div_code, market=market_code) 

409 

410 with self._headers.temp(tr_id=tr_id): 

411 response_data: ResCommonResponse = await self.call_api( 

412 "GET", EndpointKey.DAILY_ITEMCHARTPRICE, params=params 

413 ) 

414 

415 if response_data.rt_cd != ErrorCode.SUCCESS.value: 

416 error_msg = f"API 응답 비정상: None, 응답: {response_data.data}" 

417 self._logger.error(error_msg) 

418 return ResCommonResponse( 

419 rt_cd=ErrorCode.API_ERROR.value, 

420 msg1=error_msg, 

421 data=[] 

422 ) 

423 

424 if not response_data.data: # None 또는 빈 리스트 

425 warning_msg = f"일별 시세 차트 데이터가 비어있음 (stock_code: {stock_code})" 

426 self._logger.warning(warning_msg) 

427 return ResCommonResponse( 

428 rt_cd=ErrorCode.MISSING_KEY.value, 

429 msg1=warning_msg, 

430 data=[] 

431 ) 

432 

433 raw = response_data.data 

434 if isinstance(raw, dict): 

435 # KIS 관례: output1 = 요약/메타, output2 = 캔들 리스트 

436 output_list = raw.get("output2") or raw.get("output") or raw.get("output1") or [] 

437 else: 

438 output_list = raw or [] 

439 

440 # 리스트 보장 

441 if not isinstance(output_list, list): 

442 output_list = [output_list] if output_list else [] 

443 

444 chart_data_items: List[ResDailyChartApiItem] = [] 

445 for item in output_list: 

446 try: 

447 if not isinstance(item, dict): 

448 raise TypeError(f"Chart data item is not a dictionary: {type(item)}") 

449 # from_dict가 있으면 쓰고, 없으면 **unpack 

450 if hasattr(ResDailyChartApiItem, "from_dict"): 450 ↛ 453line 450 didn't jump to line 453 because the condition on line 450 was always true

451 chart_data_items.append(ResDailyChartApiItem.from_dict(item)) 

452 else: 

453 chart_data_items.append(ResDailyChartApiItem(**item)) 

454 except (TypeError, ValueError) as e: 

455 self._logger.warning(f"차트 데이터 항목 파싱 오류: {e}, 항목: {item}") 

456 continue 

457 

458 return ResCommonResponse( 

459 rt_cd=ErrorCode.SUCCESS.value, 

460 msg1="일별/분봉 차트 데이터 조회 성공", 

461 data=chart_data_items 

462 ) 

463 

464 async def inquire_time_itemchartprice( 

465 self, 

466 stock_code: str, 

467 input_hour: str, # "YYYYMMDDHH" (len=10) 

468 include_past: str = "Y", # 과거 데이터 포함 여부 

469 etc_cls_code: str = "0", # 기타 구분 

470 ) -> ResCommonResponse: 

471 """ 

472 분봉(시간) 차트 조회. 

473 URL: /uapi/domestic-stock/v1/quotations/inquire-time-itemchartprice 

474 TR: FHKST03010200 (모의/실전 동일; TR 테이블에서 TIME_ITEMCHARTPRICE 로 매핑) 

475 """ 

476 # TR-ID: minute leaf 사용(테이블에서 FHKST03010200 으로 매핑해두기) 

477 tr_id = self._trid_provider.time_itemchartprice() 

478 self._headers.set_tr_id(tr_id) 

479 self._headers.set_custtype(self._env.active_config['custtype']) 

480 

481 params = Params.time_itemchartprice( 

482 stock_code=stock_code, 

483 input_hour=input_hour, 

484 include_past=include_past, 

485 etc_cls_code=etc_cls_code, 

486 ) 

487 

488 self._logger.info(f"[분봉] {stock_code} time-itemchartprice 조회 시도... (input_hour={input_hour})") 

489 resp: ResCommonResponse = await self.call_api( 

490 "GET", EndpointKey.TIME_ITEMCHARTPRICE, params=params, retry_count=1 

491 ) 

492 

493 if not resp or resp.rt_cd != ErrorCode.SUCCESS.value: 

494 original_msg = resp.msg1 if resp else "응답 없음" 

495 error_msg = f"분봉 차트 조회 실패: {original_msg}" 

496 self._logger.warning(f"[분봉] 조회 실패: {original_msg}") 

497 # 원래의 에러 코드를 유지하면서, 컨텍스트를 추가한 메시지를 반환합니다. 

498 return ResCommonResponse(rt_cd=resp.rt_cd if resp else ErrorCode.API_ERROR.value, msg1=error_msg, data=[]) 

499 

500 raw = resp.data 

501 # KIS 관례상 output2(리스트) 우선 

502 rows = (raw.get("output2") or raw.get("output") or raw.get("output1") or []) if isinstance(raw, dict) else raw 

503 if not isinstance(rows, list): 

504 rows = [rows] if rows else [] 

505 

506 # 분봉 스키마는 일봉과 다를 수 있으므로, 우선 원시 rows 를 그대로 반환 

507 return ResCommonResponse(rt_cd=ErrorCode.SUCCESS.value, msg1="분봉 차트 조회 성공", data=rows) 

508 

509 async def inquire_time_dailychartprice(self, 

510 stock_code: str, 

511 input_hour: str, # 길이 10 권장 (당일 기준 시간) 

512 input_date: str, # "YYYYMMDDHH" (len=10) 

513 include_past: str = "Y", # 과거 데이터 포함 여부 

514 fid_pw_data_incu_yn: str = "", # 기타 구분 

515 ): 

516 """ 

517 일변 분봉(특정 일자) 조회 ※ 모의투자 미지원 

518 URL : /uapi/domestic-stock/v1/quotations/inquire-time-dailychartprice 

519 TRID: FHKST03010230 (REAL only) 

520 """ 

521 

522 full_config = self._env.active_config 

523 tr_id = self._trid_provider.quotations(TrIdLeaf.TIME_DAILY_ITEMCHARTPRICE) 

524 self._headers.set_tr_id(tr_id) 

525 self._headers.set_custtype(full_config["custtype"]) 

526 

527 params = Params.time_daily_itemchartprice( 

528 stock_code=stock_code, 

529 input_hour=input_hour, 

530 input_date=input_date, 

531 include_past=include_past, 

532 fid_pw_data_incu_yn=fid_pw_data_incu_yn, 

533 ) 

534 self._logger.info(f"[분봉-일변] {stock_code} 조회 (hour={input_hour}) (date_1={input_date} ") 

535 resp = await self.call_api("GET", EndpointKey.TIME_DAILY_ITEMCHARTPRICE, params=params, retry_count=1) 

536 

537 if resp.rt_cd != ErrorCode.SUCCESS.value: 

538 return resp 

539 raw = resp.data if isinstance(resp.data, dict) else {} 

540 rows = raw.get("output2") or raw.get("output") or raw.get("output1") or [] 

541 rows = rows if isinstance(rows, list) else ([rows] if rows else []) 

542 return ResCommonResponse(rt_cd=ErrorCode.SUCCESS.value, msg1="일변 분봉 조회 성공", data=rows) 

543 

544 async def get_asking_price(self, stock_code: str, exchange: Exchange = Exchange.KRX) -> ResCommonResponse: 

545 """ 

546 종목의 실시간 호가(매도/매수 잔량 포함) 정보를 조회합니다. 

547 ResCommonResponse 형태로 반환되며, data는 원시 output 딕셔너리입니다. 

548 """ 

549 full_config = self._env.active_config 

550 tr_id = self._trid_provider.quotations(TrIdLeaf.ASKING_PRICE) 

551 

552 self._headers.set_tr_id(tr_id) 

553 self._headers.set_custtype(full_config["custtype"]) 

554 

555 market_code = _exchange_to_market_code(exchange) 

556 params = Params.asking_price(stock_code=stock_code, market=market_code) 

557 

558 self._logger.info(f"{stock_code} 종목 호가잔량 조회 시도...") 

559 

560 response: ResCommonResponse = await self.call_api("GET", EndpointKey.ASKING_PRICE, params=params, retry_count=1) 

561 

562 if response.rt_cd != ErrorCode.SUCCESS.value: 

563 self._logger.warning(f"{stock_code} 호가 정보 조회 실패: {response.msg1}") 

564 return response 

565 

566 return response 

567 

568 async def get_time_concluded_prices(self, stock_code: str, exchange: Exchange = Exchange.KRX) -> ResCommonResponse: 

569 """ 

570 종목의 시간대별 체결가/체결량 정보를 조회합니다. 

571 """ 

572 full_config = self._env.active_config 

573 tr_id = self._trid_provider.quotations(TrIdLeaf.TIME_CONCLUDE) 

574 

575 self._headers.set_tr_id(tr_id) 

576 self._headers.set_custtype(full_config["custtype"]) 

577 

578 market_code = _exchange_to_market_code(exchange) 

579 params = Params.time_conclude(stock_code=stock_code, market=market_code) 

580 

581 self._logger.info(f"{stock_code} 종목 체결가 조회 시도...") 

582 response: ResCommonResponse = await self.call_api("GET", EndpointKey.TIME_CONCLUDE, params=params, 

583 retry_count=1) 

584 

585 if response.rt_cd != ErrorCode.SUCCESS.value: 

586 self._logger.warning(f"{stock_code} 체결가 정보 조회 실패: {response.msg1}") 

587 return response 

588 

589 return response 

590 

591 async def get_top_rise_fall_stocks(self, rise: bool = True) -> ResCommonResponse: 

592 """ 

593 상승률/하락률 상위 종목 조회 

594 """ 

595 full_config = self._env.active_config 

596 tr_id = self._trid_provider.quotations(TrIdLeaf.RANKING_FLUCTUATION) 

597 

598 self._headers.set_tr_id(tr_id) 

599 self._headers.set_custtype(full_config["custtype"]) 

600 

601 params = ( 

602 Params.fluctuation_rise() # 상승률 상위 

603 if rise 

604 else Params.fluctuation_fall() # 하락률 상위 

605 ) 

606 

607 direction = "상승" if rise else "하락" 

608 self._logger.info(f"{direction}률 상위 종목 조회 시도...") 

609 response = await self.call_api("GET", EndpointKey.RANKING_FLUCTUATION, params=params, retry_count=3) 

610 

611 if response.rt_cd != ErrorCode.SUCCESS.value: 

612 return response 

613 

614 try: 

615 if not isinstance(response.data, dict): 

616 raise TypeError(f"Expected dict for response data, but got {type(response.data)}") 

617 

618 output_list = response.data.get("output", []) 

619 if not isinstance(output_list, list): 

620 raise TypeError(f"Expected 'output' to be a list, but got {type(output_list)}") 

621 

622 stocks = [] 

623 for row in output_list: 

624 if not isinstance(row, dict): 

625 raise TypeError(f"Expected item in 'output' to be a dict, but got {type(row)}") 

626 stocks.append(ResFluctuation.from_dict(row)) 

627 

628 return ResCommonResponse( 

629 rt_cd=ErrorCode.SUCCESS.value, # Enum 값 사용 

630 msg1="종목 정보 조회 성공", 

631 data=stocks 

632 ) 

633 except (TypeError, AttributeError, ValidationError) as e: 

634 error_msg = f"등락률 응답 형식 오류: {e}, 응답: {response.data!r}" 

635 self._logger.error(error_msg) 

636 return ResCommonResponse( 

637 rt_cd=ErrorCode.PARSING_ERROR.value, # Enum 값 사용 

638 msg1=error_msg, 

639 data=None 

640 ) 

641 

642 async def get_top_volume_stocks(self) -> ResCommonResponse: 

643 """ 

644 거래량 상위 종목 조회 

645 """ 

646 full_config = self._env.active_config 

647 tr_id = self._trid_provider.quotations(TrIdLeaf.RANKING_VOLUME) 

648 

649 self._headers.set_tr_id(tr_id) 

650 self._headers.set_custtype(full_config["custtype"]) 

651 

652 params = Params.volume_rank() 

653 

654 self._logger.info(f"거래량 상위 종목 조회 시도...") 

655 response = await self.call_api("GET", EndpointKey.RANKING_VOLUME, params=params, retry_count=3) 

656 

657 if response.rt_cd != ErrorCode.SUCCESS.value: 

658 self._logger.warning(f"거래량 상위 조회 실패: {response.msg1}") 

659 return response 

660 

661 try: 

662 if not isinstance(response.data, dict): 

663 raise TypeError(f"Expected dict for response data, but got {type(response.data)}") 

664 

665 stocks = response.data.get("output", []) 

666 if not isinstance(stocks, list): 

667 raise TypeError(f"Expected 'output' to be a list, but got {type(stocks)}") 

668 

669 return ResCommonResponse( 

670 rt_cd=ErrorCode.SUCCESS.value, 

671 msg1="거래량 상위 종목 조회 성공", 

672 data=stocks 

673 ) 

674 except (TypeError, AttributeError, ValidationError) as e: 

675 error_msg = f"거래량 상위 응답 형식 오류: {e}, 응답: {response.data!r}" 

676 self._logger.error(error_msg) 

677 return ResCommonResponse( 

678 rt_cd=ErrorCode.PARSING_ERROR.value, 

679 msg1=error_msg, 

680 data=None 

681 ) 

682 

683 

684 async def get_investor_trade_by_stock_daily(self, stock_code: str, date: str = None) -> ResCommonResponse: 

685 """ 

686 종목별 투자자 매매동향(일별) 조회 

687 실전 전용 (모의투자 미지원) 

688 한 번의 호출로 외국인/개인/기관 순매수 데이터를 모두 반환. 

689 """ 

690 if date is None: 690 ↛ 691line 690 didn't jump to line 691 because the condition on line 690 was never true

691 date = datetime.now().strftime("%Y%m%d") 

692 

693 full_config = self._env.active_config 

694 tr_id = self._trid_provider.quotations(TrIdLeaf.INVESTOR_TRADE_BY_STOCK_DAILY) 

695 

696 self._headers.set_tr_id(tr_id) 

697 self._headers.set_custtype(full_config["custtype"]) 

698 

699 params = Params.investor_trade_by_stock_daily(stock_code=stock_code, date=date) 

700 

701 response: ResCommonResponse = await self.call_api( 

702 "GET", EndpointKey.INVESTOR_TRADE_BY_STOCK_DAILY, params=params, retry_count=1 

703 ) 

704 

705 if response.rt_cd != ErrorCode.SUCCESS.value: 

706 return response 

707 

708 try: 

709 if not isinstance(response.data, dict): 

710 raise TypeError(f"Expected dict, got {type(response.data)}") 

711 

712 output1 = response.data.get("output1", {}) 

713 output2 = response.data.get("output2", []) 

714 

715 if not isinstance(output2, list) or not output2: 

716 return ResCommonResponse( 

717 rt_cd=ErrorCode.SUCCESS.value, 

718 msg1="데이터 없음", 

719 data=None 

720 ) 

721 

722 # output1(현재가 정보) + output2[0](최신 일별 투자자 데이터) 병합 

723 # output2의 빈 값("")이 output1의 유효한 값을 덮어쓰지 않도록 처리 

724 merged = {} 

725 if isinstance(output1, dict): 725 ↛ 727line 725 didn't jump to line 727 because the condition on line 725 was always true

726 merged.update(output1) 

727 for k, v in output2[0].items(): 

728 if v or k not in merged: 728 ↛ 727line 728 didn't jump to line 727 because the condition on line 728 was always true

729 merged[k] = v 

730 

731 return ResCommonResponse( 

732 rt_cd=ErrorCode.SUCCESS.value, 

733 msg1="투자자 매매동향 조회 성공", 

734 data=merged 

735 ) 

736 except (TypeError, AttributeError) as e: 

737 error_msg = f"투자자 매매동향 응답 형식 오류: {e}" 

738 self._logger.error(error_msg) 

739 return ResCommonResponse( 

740 rt_cd=ErrorCode.PARSING_ERROR.value, 

741 msg1=error_msg, 

742 data=None 

743 ) 

744 

745 async def get_investor_trade_by_stock_daily_multi(self, stock_code: str, date: str = None, days: int = 3) -> ResCommonResponse: 

746 """ 

747 종목별 투자자 매매동향(일별) 다중일 조회 

748 실전 전용 (모의투자 미지원) 

749 output2[:days] 를 리스트로 반환. 각 항목은 {frgn_ntby_tr_pbmn, orgn_ntby_tr_pbmn, acml_tr_pbmn, stck_bsop_date, ...} 형태. 

750 단위: frgn/orgn_ntby_tr_pbmn 은 백만원, acml_tr_pbmn 은 원. 

751 """ 

752 if date is None: 

753 date = datetime.now().strftime("%Y%m%d") 

754 

755 full_config = self._env.active_config 

756 tr_id = self._trid_provider.quotations(TrIdLeaf.INVESTOR_TRADE_BY_STOCK_DAILY) 

757 

758 self._headers.set_tr_id(tr_id) 

759 self._headers.set_custtype(full_config["custtype"]) 

760 

761 params = Params.investor_trade_by_stock_daily(stock_code=stock_code, date=date) 

762 

763 response: ResCommonResponse = await self.call_api( 

764 "GET", EndpointKey.INVESTOR_TRADE_BY_STOCK_DAILY, params=params, retry_count=1 

765 ) 

766 

767 if response.rt_cd != ErrorCode.SUCCESS.value: 767 ↛ 768line 767 didn't jump to line 768 because the condition on line 767 was never true

768 return response 

769 

770 try: 

771 if not isinstance(response.data, dict): 

772 raise TypeError(f"Expected dict, got {type(response.data)}") 

773 

774 output2 = response.data.get("output2", []) 

775 

776 if not isinstance(output2, list) or not output2: 

777 return ResCommonResponse( 

778 rt_cd=ErrorCode.SUCCESS.value, 

779 msg1="데이터 없음", 

780 data=[] 

781 ) 

782 

783 return ResCommonResponse( 

784 rt_cd=ErrorCode.SUCCESS.value, 

785 msg1="투자자 매매동향 다중일 조회 성공", 

786 data=output2[:days] 

787 ) 

788 except (TypeError, AttributeError) as e: 

789 error_msg = f"투자자 매매동향 다중일 응답 형식 오류: {e}" 

790 self._logger.error(error_msg) 

791 return ResCommonResponse( 

792 rt_cd=ErrorCode.PARSING_ERROR.value, 

793 msg1=error_msg, 

794 data=None 

795 ) 

796 

797 async def get_program_trade_by_stock_daily(self, stock_code: str, date: str = None) -> ResCommonResponse: 

798 """ 

799 종목별 프로그램매매추이(일별) 조회 

800 실전 전용 (모의투자 미지원) 

801 """ 

802 if date is None: 

803 date = datetime.now().strftime("%Y%m%d") 

804 

805 full_config = self._env.active_config 

806 tr_id = self._trid_provider.quotations(TrIdLeaf.PROGRAM_TRADE_BY_STOCK_DAILY) 

807 

808 self._headers.set_tr_id(tr_id) 

809 self._headers.set_custtype(full_config["custtype"]) 

810 

811 params = Params.program_trade_by_stock_daily(stock_code=stock_code, date=date) 

812 

813 response: ResCommonResponse = await self.call_api( 

814 "GET", EndpointKey.PROGRAM_TRADE_BY_STOCK_DAILY, params=params, retry_count=1 

815 ) 

816 

817 if response.rt_cd != ErrorCode.SUCCESS.value: 

818 return response 

819 

820 try: 

821 if not isinstance(response.data, dict): 

822 raise TypeError(f"Expected dict, got {type(response.data)}") 

823 

824 output = response.data.get("output", []) 

825 

826 if not isinstance(output, list) or not output: 

827 return ResCommonResponse( 

828 rt_cd=ErrorCode.SUCCESS.value, 

829 msg1="데이터 없음", 

830 data=None 

831 ) 

832 

833 # output[0] = 최신 일별 프로그램매매 데이터 

834 return ResCommonResponse( 

835 rt_cd=ErrorCode.SUCCESS.value, 

836 msg1="프로그램매매추이 조회 성공", 

837 data=output[0] 

838 ) 

839 except (TypeError, AttributeError) as e: 

840 error_msg = f"프로그램매매추이 응답 형식 오류: {e}" 

841 self._logger.error(error_msg) 

842 return ResCommonResponse( 

843 rt_cd=ErrorCode.PARSING_ERROR.value, 

844 msg1=error_msg, 

845 data=None 

846 ) 

847 

848 # async def get_stock_news(self, stock_code: str) -> ResCommonResponse: 

849 # """ 

850 # 종목 뉴스 조회 

851 # """ 

852 # full_config = self._env.active_config 

853 # 

854 # path = full_config["paths"]["item_news"] 

855 # tr_id = full_config["tr_ids"]["quotations"]["item_news"] 

856 # 

857 # self._headers["tr_id"] = tr_id 

858 # self._headers["custtype"] = full_config["custtype"] 

859 # 

860 # params = { 

861 # "fid_input_iscd": stock_code 

862 # } 

863 # 

864 # self._logger.info(f"{stock_code} 종목 뉴스 조회 시도...") 

865 # response = await self.call_api("GET", path, params=params, retry_count=1) 

866 # 

867 # if response.rt_cd != ErrorCode.SUCCESS.value: 

868 # self._logger.warning(f"{stock_code} 종목 뉴스 조회 실패: {response.msg1}") 

869 # return response 

870 # 

871 # return response 

872 

873 async def get_multi_price(self, stock_codes: list[str]) -> ResCommonResponse: 

874 """ 

875 복수종목 현재가 조회 (최대 30종목) 

876 URL: /uapi/domestic-stock/v1/quotations/intstock-multprice 

877 TR: FHKST11300006 (실전 전용) 

878 """ 

879 if not stock_codes: 

880 return ResCommonResponse( 

881 rt_cd=ErrorCode.INVALID_INPUT.value, 

882 msg1="종목코드 리스트가 비어 있습니다.", 

883 data=[] 

884 ) 

885 

886 if len(stock_codes) > 30: 

887 self._logger.warning(f"최대 30종목까지 조회 가능합니다. 30개로 제한합니다. (요청: {len(stock_codes)}개)") 

888 stock_codes = stock_codes[:30] 

889 

890 full_config = self._env.active_config 

891 tr_id = self._trid_provider.quotations(TrIdLeaf.MULTI_PRICE) 

892 

893 self._headers.set_tr_id(tr_id) 

894 self._headers.set_custtype(full_config["custtype"]) 

895 

896 params = Params.multi_price(stock_codes=stock_codes) 

897 

898 self._logger.info(f"복수종목 현재가 조회 시도 ({len(stock_codes)}종목: {stock_codes[:5]}{'...' if len(stock_codes) > 5 else ''})") 

899 response: ResCommonResponse = await self.call_api("GET", EndpointKey.MULTI_PRICE, params=params, retry_count=1) 

900 

901 if response.rt_cd != ErrorCode.SUCCESS.value: 

902 self._logger.warning(f"복수종목 현재가 조회 실패: {response.msg1}") 

903 return response 

904 

905 if not response.data: 

906 return ResCommonResponse( 

907 rt_cd=ErrorCode.EMPTY_VALUES.value, 

908 msg1="복수종목 현재가 데이터 없음", 

909 data=[] 

910 ) 

911 

912 raw = response.data 

913 output_list = raw.get("output", []) if isinstance(raw, dict) else raw 

914 if not isinstance(output_list, list): 914 ↛ 915line 914 didn't jump to line 915 because the condition on line 914 was never true

915 output_list = [output_list] if output_list else [] 

916 

917 # 필드명 정규화: inter_shrn_iscd → stck_shrn_iscd, inter2_prpr → stck_prpr 

918 FIELD_MAP = { 

919 "inter_shrn_iscd": "stck_shrn_iscd", 

920 "inter2_prpr": "stck_prpr", 

921 } 

922 normalized = [] 

923 for item in output_list: 

924 if not isinstance(item, dict): 

925 continue 

926 mapped = {} 

927 for k, v in item.items(): 

928 mapped[FIELD_MAP.get(k, k)] = v 

929 normalized.append(mapped) 

930 

931 return ResCommonResponse( 

932 rt_cd=ErrorCode.SUCCESS.value, 

933 msg1="복수종목 현재가 조회 성공", 

934 data=normalized 

935 ) 

936 

937 async def get_etf_info(self, etf_code: str) -> ResCommonResponse: 

938 """ 

939 ETF 정보 조회 

940 """ 

941 full_config = self._env.active_config 

942 tr_id = self._trid_provider.quotations(TrIdLeaf.ETF_INFO) 

943 

944 self._headers.set_tr_id(tr_id) 

945 self._headers.set_custtype(full_config["custtype"]) 

946 

947 params = Params.etf_info(etf_code=etf_code) 

948 

949 self._logger.info(f"{etf_code} ETF 정보 조회 시도...") 

950 response = await self.call_api("GET", EndpointKey.ETF_INFO, params=params, retry_count=1) 

951 

952 if response.rt_cd != ErrorCode.SUCCESS.value: 

953 self._logger.warning(f"{etf_code} ETF 조회 실패: {response.msg1}") 

954 return response 

955 

956 return response 

957 

958 async def get_financial_ratio(self, stock_code: str) -> ResCommonResponse: 

959 """기업 재무비율 조회 (영업이익 증가율 등). 

960 

961 TR ID: FHKST66430300 (KIS 문서 확인 필요, 실전 전용 가능성) 

962 모의투자 환경에서 미지원 시 API_ERROR 반환 → 호출 측에서 graceful degradation. 

963 """ 

964 full_config = self._env.active_config 

965 tr_id = self._trid_provider.quotations(TrIdLeaf.FINANCIAL_RATIO) 

966 

967 self._headers.set_tr_id(tr_id) 

968 self._headers.set_custtype(full_config['custtype']) 

969 

970 params = Params.financial_ratio(stock_code=stock_code) 

971 

972 self._logger.info(f"{stock_code} 기업 재무비율 조회 시도...") 

973 response: ResCommonResponse = await self.call_api( 

974 "GET", EndpointKey.FINANCIAL_RATIO, params=params, retry_count=1, 

975 ) 

976 

977 if response.rt_cd != ErrorCode.SUCCESS.value: 

978 self._logger.warning(f"{stock_code} 재무비율 조회 실패: {response.msg1}") 

979 return response 

980 

981 return response 

982 

983 async def check_holiday(self, date: str) -> ResCommonResponse: 

984 """국내 휴장일 조회 (CTCA0903R)""" 

985 full_config = self._env.active_config 

986 tr_id = self._trid_provider.quotations(TrIdLeaf.CHK_HOLIDAY) 

987 

988 self._headers.set_tr_id(tr_id) 

989 self._headers.set_custtype(full_config['custtype']) 

990 

991 params = Params.check_holiday(date) 

992 

993 # EndpointKey.CHK_HOLIDAY는 전체 경로로 설정되어 있음 

994 return await self.call_api( 

995 "GET", EndpointKey.CHK_HOLIDAY, params=params, retry_count=1 

996 )