Coverage for brokers / korea_investment / korea_invest_quotations_api.py: 97%
450 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
1# brokers/korea_investment/korea_invest_quotations_api.py
2import httpx
3from datetime import datetime
4from typing import Dict, List, Union, Optional
5from pydantic import ValidationError
6from brokers.korea_investment.korea_invest_api_base import KoreaInvestApiBase
7from brokers.korea_investment.korea_invest_env import KoreaInvestApiEnv
8from brokers.korea_investment.korea_invest_params_provider import Params
9from brokers.korea_investment.korea_invest_header_provider import KoreaInvestHeaderProvider
10from brokers.korea_investment.korea_invest_url_provider import KoreaInvestUrlProvider
11from brokers.korea_investment.korea_invest_url_keys import EndpointKey
12from brokers.korea_investment.korea_invest_trid_provider import KoreaInvestTrIdProvider
13from brokers.korea_investment.korea_invest_trid_keys import TrIdLeaf
14# common/types에서 모든 ResTypedDict와 ErrorCode 임포트
15from common.types import (
16 ResPriceSummary, ResMomentumStock, ResCommonResponse, ErrorCode,
17 ResStockFullInfoApiOutput, ResTopMarketCapApiItem, ResDailyChartApiItem, ResFluctuation,
18 Exchange,
19)
22def _exchange_to_market_code(exchange: Exchange) -> str:
23 """Exchange enum을 API 마켓코드 문자열로 변환합니다."""
24 if exchange == Exchange.NXT: 24 ↛ 25line 24 didn't jump to line 25 because the condition on line 24 was never true
25 return "NX"
26 elif exchange == Exchange.UN: 26 ↛ 27line 26 didn't jump to line 27 because the condition on line 26 was never true
27 return "UN"
28 return "J" # KRX 기본값
31class KoreaInvestApiQuotations(KoreaInvestApiBase):
32 def __init__(self,
33 env: KoreaInvestApiEnv,
34 logger=None,
35 market_clock=None,
36 async_client: Optional[httpx.AsyncClient] = None,
37 header_provider: Optional[KoreaInvestHeaderProvider] = None,
38 url_provider: Optional[KoreaInvestUrlProvider] = None,
39 trid_provider: Optional[KoreaInvestTrIdProvider] = None):
40 super().__init__(env,
41 logger,
42 market_clock,
43 async_client=async_client,
44 header_provider=header_provider,
45 url_provider=url_provider,
46 trid_provider=trid_provider)
48 async def get_stock_info_by_code(self, stock_code: str, exchange: Exchange = Exchange.KRX) -> ResCommonResponse:
49 """
50 종목코드로 종목의 전체 정보 (이름, 현재가, 시가총액 등)를 가져옵니다.
51 ResCommonResponse 형태로 반환하며, data 필드에 ResStockFullInfoApiOutput 포함.
52 """
53 full_config = self._env.active_config
54 tr_id = self._trid_provider.quotations(TrIdLeaf.SEARCH_INFO) # 모드에 따라 자동
56 self._headers.set_tr_id(tr_id)
57 self._headers.set_custtype(full_config['custtype'])
59 params = Params.search_info(stock_code=stock_code, prdt_type_cd=full_config["params"]["fid_div_cls_code"])
61 self._logger.info(f"{stock_code} 종목 정보 조회 시도... (exchange={exchange.value})")
62 response: ResCommonResponse = await self.call_api("GET", EndpointKey.SEARCH_INFO, params=params, retry_count=1)
64 if response.rt_cd == ErrorCode.SUCCESS.value:
65 try:
66 # Pydantic 모델 필수 필드 누락 방지 (API 응답에 없을 경우 기본값 설정)
67 if "new_hgpr_lwpr_cls_code" not in response.data:
68 response.data["new_hgpr_lwpr_cls_code"] = "-"
70 stock_info_data = ResStockFullInfoApiOutput(**response.data)
71 return ResCommonResponse(
72 rt_cd=ErrorCode.SUCCESS.value, # Enum 값 사용
73 msg1="종목 정보 조회 성공",
74 data=stock_info_data
75 )
76 except (TypeError, ValidationError) as e:
77 error_msg = f"{stock_code} 종목 정보 응답 형식 오류: {e}, 응답: {response.data}"
78 self._logger.error(error_msg)
79 return ResCommonResponse(
80 rt_cd=ErrorCode.PARSING_ERROR.value, # Enum 값 사용
81 msg1=error_msg,
82 data=None
83 )
84 else:
85 error_msg = f"{stock_code} 종목 정보 조회 실패: {response.msg1 or '알 수 없는 오류'}, 응답: {response}"
86 self._logger.warning(error_msg)
87 return ResCommonResponse(
88 rt_cd=response.get("rt_cd", ErrorCode.API_ERROR.value) if isinstance(response,
89 dict) else ErrorCode.API_ERROR.value,
90 msg1=error_msg,
91 data=None
92 )
94 async def get_current_price(self, stock_code, exchange: Exchange = Exchange.KRX) -> ResCommonResponse:
95 """
96 현재가를 조회합니다. API 원본 응답을 ResCommonResponse의 data 필드에 담아 반환.
97 """
98 full_config = self._env.active_config
99 tr_id = self._trid_provider.quotations(TrIdLeaf.INQUIRE_PRICE) # 모드에 따라 자동
101 self._headers.set_tr_id(tr_id)
102 self._headers.set_custtype(full_config['custtype'])
104 market_code = _exchange_to_market_code(exchange)
105 params = Params.inquire_price(stock_code=stock_code, market=market_code)
106 self._logger.info(f"{stock_code} 현재가 조회 시도...")
108 response: ResCommonResponse = await self.call_api("GET", EndpointKey.INQUIRE_PRICE, params=params,
109 retry_count=3)
111 if response.rt_cd != ErrorCode.SUCCESS.value:
112 self._logger.warning("현재가 조회 실패")
113 return response
115 try:
116 response_data_dict = response.data['output']
117 # Pydantic 모델 필수 필드 누락 방지 (API 응답에 없을 경우 기본값 설정)
118 if "new_hgpr_lwpr_cls_code" not in response_data_dict:
119 response_data_dict["new_hgpr_lwpr_cls_code"] = "-"
121 response.data['output'] = ResStockFullInfoApiOutput.from_dict(response_data_dict)
122 except (KeyError, ValidationError, TypeError) as e:
123 error_msg = f"현재가 응답 데이터 파싱 실패: {stock_code}, 오류: {e}"
124 self._logger.error(error_msg)
125 return ResCommonResponse(
126 rt_cd=ErrorCode.PARSING_ERROR.value,
127 msg1=error_msg,
128 data=None
129 )
130 return response
132 async def get_stock_conclusion(self, stock_code: str, exchange: Exchange = Exchange.KRX) -> ResCommonResponse:
133 """주식 체결(체결강도 포함) 정보를 조회합니다."""
134 full_config = self._env.active_config
135 tr_id = self._trid_provider.quotations(TrIdLeaf.INQUIRE_CONCLUSION)
137 self._headers.set_tr_id(tr_id)
138 self._headers.set_custtype(full_config['custtype'])
140 market_code = _exchange_to_market_code(exchange)
141 params = Params.inquire_conclusion(stock_code=stock_code, market=market_code)
142 self._logger.info(f"{stock_code} 체결(체결강도) 조회 시도...")
143 return await self.call_api("GET", EndpointKey.INQUIRE_CONCLUSION, params=params, retry_count=1)
145 async def get_price_summary(self, stock_code: str, exchange: Exchange = Exchange.KRX) -> ResCommonResponse:
146 """
147 주어진 종목코드에 대해 시가/현재가/등락률(%) 요약 정보 반환
148 ResCommonResponse 형태로 반환하며, data 필드에 ResPriceSummary 포함.
149 """
150 response_common = await self.get_current_price(stock_code, exchange=exchange)
152 if response_common.rt_cd != ErrorCode.SUCCESS.value:
153 self._logger.warning(f"({stock_code}) get_current_price 실패: {response_common.msg1}")
154 return ResCommonResponse(
155 rt_cd=ErrorCode.API_ERROR.value,
156 msg1="get_current_price 실패",
157 data=None
158 )
160 output: ResStockFullInfoApiOutput = response_common.data['output']
162 if not output:
163 error_msg = f"API 응답 output 데이터 없음: {stock_code}, 응답: {response_common.msg1}"
164 self._logger.warning(error_msg)
165 return ResCommonResponse(
166 rt_cd=ErrorCode.API_ERROR.value, # Enum 값 사용
167 msg1=error_msg,
168 data=None
169 )
170 if not isinstance(output, ResStockFullInfoApiOutput):
171 error_msg = f"Wrong Ret Type ResStockFullInfoApiOutput - Ret: {type(output)}, 응답: {response_common.msg1}"
172 self._logger.warning(error_msg)
173 return ResCommonResponse(
174 rt_cd=ErrorCode.WRONG_RET_TYPE.value, # Enum 값 사용
175 msg1=error_msg,
176 data=None
177 )
179 # ✅ 필수 키 누락 체크
180 required_keys = ["stck_oprc", "stck_prpr", "prdy_ctrt"]
181 if not all(hasattr(output, k) and getattr(output, k) is not None for k in required_keys): 181 ↛ 182line 181 didn't jump to line 182 because the condition on line 181 was never true
182 error_msg = f"API 응답 output에 필수 가격 데이터 누락: {stock_code}, 응답: {output}"
183 self._logger.warning(error_msg)
184 return ResCommonResponse(
185 rt_cd=ErrorCode.PARSING_ERROR.value,
186 msg1=error_msg,
187 data=None
188 )
190 try:
191 open_price = int(output.stck_oprc)
192 current_price = int(output.stck_prpr)
193 prdy_ctrt = float(output.prdy_ctrt)
194 except (ValueError, TypeError) as e:
195 error_msg = f"가격 데이터 파싱 실패: {stock_code}, 응답: {output}, 오류: {e}"
196 self._logger.warning(error_msg)
197 return ResCommonResponse(
198 rt_cd=ErrorCode.PARSING_ERROR.value, # Enum 값 사용
199 msg1=error_msg,
200 data=None
201 )
203 change_rate = (current_price - open_price) / open_price * 100 if open_price else 0
205 # 신고/신저가 상태 확인 및 로깅
206 raw_status_code = str(getattr(output, 'new_hgpr_lwpr_cls_code', ''))
207 if "vs" in raw_status_code:
208 self._logger.warning(f"({stock_code}) 신고/신저가 불일치: {raw_status_code}")
209 # 최종 응답에는 가공된 값을 사용
210 new_high_low_status = output.new_high_low_status
211 if "vs" in new_high_low_status: 211 ↛ 212line 211 didn't jump to line 212 because the condition on line 211 was never true
212 self._logger.warning(f"({stock_code}) 신고/신저가 불일치: {new_high_low_status}")
214 price_summary_data = ResPriceSummary(
215 symbol=stock_code,
216 open=open_price,
217 current=current_price,
218 change_rate=round(change_rate, 2),
219 prdy_ctrt=prdy_ctrt,
220 new_high_low_status=new_high_low_status,
221 )
222 return ResCommonResponse(
223 rt_cd=ErrorCode.SUCCESS.value, # Enum 값 사용
224 msg1="정상 처리되었습니다.",
225 data=price_summary_data
226 )
228 async def get_market_cap(self, stock_code: str, exchange: Exchange = Exchange.KRX) -> ResCommonResponse:
229 """
230 종목코드로 시가총액을 반환합니다. (단위: 원)
231 ResCommonResponse 형태로 반환하며, data 필드에 int 시가총액 값 포함.
232 """
233 response_common = await self.get_stock_info_by_code(stock_code, exchange=exchange)
235 if response_common.rt_cd != ErrorCode.SUCCESS.value: # Enum 값 사용
236 return response_common
238 info: ResStockFullInfoApiOutput = response_common.data
240 if info is None:
241 error_msg = f"{stock_code} 시가총액 정보 조회 실패: ResStockFullInfoApiOutput 데이터가 None"
242 self._logger.warning(error_msg)
243 return ResCommonResponse(
244 rt_cd=ErrorCode.PARSING_ERROR.value, # Enum 값 사용
245 msg1=error_msg,
246 data=None
247 )
249 market_cap_str = info.stck_llam
250 if market_cap_str and market_cap_str.isdigit():
251 market_cap = int(market_cap_str)
252 return ResCommonResponse(
253 rt_cd=ErrorCode.SUCCESS.value, # Enum 값 사용
254 msg1="시가총액 조회 성공",
255 data=market_cap
256 )
257 else:
258 error_msg = f"{stock_code} 시가총액 정보 없음 또는 형식 오류: {market_cap_str}"
259 self._logger.warning(error_msg)
260 return ResCommonResponse(
261 rt_cd=ErrorCode.PARSING_ERROR.value, # Enum 값 사용
262 msg1=error_msg,
263 data=0
264 )
266 async def get_top_market_cap_stocks_code(self, market_code: str, count: int = 30) -> ResCommonResponse:
267 """
268 시가총액 상위 종목 목록을 반환합니다. 최대 30개까지만 지원됩니다.
269 ResCommonResponse 형태로 반환하며, data 필드에 List[ResTopMarketCapApiItem] 포함.
270 """
271 full_config = self._env.active_config
273 if count <= 0:
274 error_msg = f"요청된 count가 0 이하입니다. count={count}"
275 self._logger.warning(error_msg)
276 return ResCommonResponse(
277 rt_cd=ErrorCode.INVALID_INPUT.value, # Enum 값 사용
278 msg1=error_msg,
279 data=[]
280 )
282 if count > 30:
283 self._logger.warning(f"요청 수 {count}는 최대 허용값 30을 초과하므로 30개로 제한됩니다.")
284 count = 30
286 tr_id = self._trid_provider.quotations(TrIdLeaf.MARKET_CAP)
287 self._headers.set_tr_id(tr_id)
288 self._headers.set_custtype(full_config['custtype'])
290 params = Params.top_market_cap(input_iscd=market_code)
292 self._logger.info(f"시가총액 상위 종목 조회 시도 (시장코드: {market_code}, 요청개수: {count})")
293 response: ResCommonResponse = await self.call_api("GET", EndpointKey.MARKET_CAP, params=params, retry_count=3)
295 if response.rt_cd != ErrorCode.SUCCESS.value:
296 self._logger.warning(f"시가총액 응답 오류 또는 비어 있음: 시가총액 조회 실패")
297 return ResCommonResponse(
298 rt_cd=response.get("rt_cd", ErrorCode.API_ERROR.value) if isinstance(response,
299 dict) else ErrorCode.API_ERROR.value,
300 # Enum 값 사용
301 msg1="시가총액 조회 실패",
302 data=[]
303 )
305 batch = response.data.get('output', '')[:count]
306 self._logger.info(f"API로부터 수신한 종목 수: {len(batch)}")
308 results: list[ResTopMarketCapApiItem] = []
310 for raw in batch:
311 try:
312 # 1) 필수키 정규화 (별칭 허용)
313 code = raw.get("mksc_shrn_iscd") or raw.get("iscd")
314 market_cap = raw.get("stck_avls") # 문자열 숫자 유지
316 if not code or not market_cap:
317 # 필수값 없으면 스킵
318 continue
320 # 2) 선택/별칭 필드들까지 묶어서 안전 생성
321 # - dataclass 내부 __post_init__에서 iscd/acc_trdvol ↔ mksc_shrn_iscd/acml_vol 보완
322 item = ResTopMarketCapApiItem(
323 mksc_shrn_iscd=code,
324 stck_avls=str(market_cap),
325 data_rank=str(raw.get("data_rank", "")),
326 hts_kor_isnm=raw.get("hts_kor_isnm", ""),
328 # 시세/등락(옵션)
329 stck_prpr=str(raw.get("stck_prpr")) if raw.get("stck_prpr") is not None else None,
330 prdy_vrss=str(raw.get("prdy_vrss")) if raw.get("prdy_vrss") is not None else None,
331 prdy_vrss_sign=raw.get("prdy_vrss_sign"),
332 prdy_ctrt=str(raw.get("prdy_ctrt")) if raw.get("prdy_ctrt") is not None else None,
334 # 거래/상장(옵션)
335 acml_vol=str(raw.get("acml_vol")) if raw.get("acml_vol") is not None else None,
336 lstn_stcn=str(raw.get("lstn_stcn")) if raw.get("lstn_stcn") is not None else None,
338 # 시장 비중(옵션)
339 mrkt_whol_avls_rlim=str(raw.get("mrkt_whol_avls_rlim")) if raw.get(
340 "mrkt_whol_avls_rlim") is not None else None,
342 # 호환 alias (있으면 넣고, 없어도 __post_init__에서 보완)
343 iscd=raw.get("iscd"),
344 acc_trdvol=str(raw.get("acc_trdvol")) if raw.get("acc_trdvol") is not None else None,
345 )
347 results.append(item)
349 except (ValueError, TypeError, KeyError, ValidationError) as e:
350 self._logger.warning(f"시가총액 상위 종목 개별 항목 파싱 오류: {e}, 항목: {raw}")
351 continue
353 # (선택) 정렬: API가 정렬 보장 안 하면 rank 우선, 없으면 시총 내림차순
354 def _to_int_safe(s, default=10 ** 12):
355 try:
356 return int(str(s))
357 except Exception:
358 return default
360 if results:
361 # data_rank가 있으면 그걸로, 없으면 stck_avls(큰값 우선)
362 if any(r.data_rank for r in results):
363 results.sort(key=lambda r: _to_int_safe(r.data_rank, default=10 ** 12))
364 else:
365 results.sort(key=lambda r: _to_int_safe(r.stck_avls, default=0), reverse=True)
367 # 응답: 결과 없으면 "없음"으로 분기(뷰 로직에 맞춰 조정)
368 if results:
369 return ResCommonResponse(
370 rt_cd=ErrorCode.SUCCESS.value,
371 msg1="시가총액 상위 종목 조회 성공",
372 data=results,
373 )
374 else:
375 return ResCommonResponse(
376 rt_cd=ErrorCode.EMPTY_VALUES.value, # 없으면 EMPTY_VALUES 등 내부 규약 코드 사용
377 msg1="시가총액 상위 종목 없음",
378 data=[],
379 )
381 async def inquire_daily_itemchartprice(self, stock_code: str,
382 start_date: str = '',
383 end_date: str = '',
384 fid_period_div_code: str = 'D',
385 fid_input_iscd: str = '00',
386 fid_org_adj_prc: str = '0',
387 exchange: Exchange = Exchange.KRX) -> ResCommonResponse:
388 """
389 일별/주별/월별/분별/틱별 주식 시세 차트 데이터를 조회합니다.
390 TRID: FHKST03010100 (일별), FHNKF03060000 (분봉)
391 ResCommonResponse 형태로 반환하며, data 필드에 List[ResDailyChartApiItem] 포함.
392 """
393 valid_period_codes = {"D", "W", "M", "Y"}
394 if fid_period_div_code not in valid_period_codes:
395 error_msg = f"지원하지 않는 fid_period_div_code: {fid_period_div_code}"
396 self._logger.error(error_msg)
397 return ResCommonResponse(
398 rt_cd=ErrorCode.INVALID_INPUT.value,
399 msg1=error_msg,
400 data=[]
401 )
403 tr_id = self._trid_provider.daily_itemchartprice()
405 self._logger.debug(f"차트 조회 시도 현재 tr_id: {tr_id}")
407 market_code = _exchange_to_market_code(exchange)
408 params = Params.daily_itemchartprice(stock_code=stock_code, start_date=start_date, end_date=end_date, period=fid_period_div_code, market=market_code)
410 with self._headers.temp(tr_id=tr_id):
411 response_data: ResCommonResponse = await self.call_api(
412 "GET", EndpointKey.DAILY_ITEMCHARTPRICE, params=params
413 )
415 if response_data.rt_cd != ErrorCode.SUCCESS.value:
416 error_msg = f"API 응답 비정상: None, 응답: {response_data.data}"
417 self._logger.error(error_msg)
418 return ResCommonResponse(
419 rt_cd=ErrorCode.API_ERROR.value,
420 msg1=error_msg,
421 data=[]
422 )
424 if not response_data.data: # None 또는 빈 리스트
425 warning_msg = f"일별 시세 차트 데이터가 비어있음 (stock_code: {stock_code})"
426 self._logger.warning(warning_msg)
427 return ResCommonResponse(
428 rt_cd=ErrorCode.MISSING_KEY.value,
429 msg1=warning_msg,
430 data=[]
431 )
433 raw = response_data.data
434 if isinstance(raw, dict):
435 # KIS 관례: output1 = 요약/메타, output2 = 캔들 리스트
436 output_list = raw.get("output2") or raw.get("output") or raw.get("output1") or []
437 else:
438 output_list = raw or []
440 # 리스트 보장
441 if not isinstance(output_list, list):
442 output_list = [output_list] if output_list else []
444 chart_data_items: List[ResDailyChartApiItem] = []
445 for item in output_list:
446 try:
447 if not isinstance(item, dict):
448 raise TypeError(f"Chart data item is not a dictionary: {type(item)}")
449 # from_dict가 있으면 쓰고, 없으면 **unpack
450 if hasattr(ResDailyChartApiItem, "from_dict"): 450 ↛ 453line 450 didn't jump to line 453 because the condition on line 450 was always true
451 chart_data_items.append(ResDailyChartApiItem.from_dict(item))
452 else:
453 chart_data_items.append(ResDailyChartApiItem(**item))
454 except (TypeError, ValueError) as e:
455 self._logger.warning(f"차트 데이터 항목 파싱 오류: {e}, 항목: {item}")
456 continue
458 return ResCommonResponse(
459 rt_cd=ErrorCode.SUCCESS.value,
460 msg1="일별/분봉 차트 데이터 조회 성공",
461 data=chart_data_items
462 )
464 async def inquire_time_itemchartprice(
465 self,
466 stock_code: str,
467 input_hour: str, # "YYYYMMDDHH" (len=10)
468 include_past: str = "Y", # 과거 데이터 포함 여부
469 etc_cls_code: str = "0", # 기타 구분
470 ) -> ResCommonResponse:
471 """
472 분봉(시간) 차트 조회.
473 URL: /uapi/domestic-stock/v1/quotations/inquire-time-itemchartprice
474 TR: FHKST03010200 (모의/실전 동일; TR 테이블에서 TIME_ITEMCHARTPRICE 로 매핑)
475 """
476 # TR-ID: minute leaf 사용(테이블에서 FHKST03010200 으로 매핑해두기)
477 tr_id = self._trid_provider.time_itemchartprice()
478 self._headers.set_tr_id(tr_id)
479 self._headers.set_custtype(self._env.active_config['custtype'])
481 params = Params.time_itemchartprice(
482 stock_code=stock_code,
483 input_hour=input_hour,
484 include_past=include_past,
485 etc_cls_code=etc_cls_code,
486 )
488 self._logger.info(f"[분봉] {stock_code} time-itemchartprice 조회 시도... (input_hour={input_hour})")
489 resp: ResCommonResponse = await self.call_api(
490 "GET", EndpointKey.TIME_ITEMCHARTPRICE, params=params, retry_count=1
491 )
493 if not resp or resp.rt_cd != ErrorCode.SUCCESS.value:
494 original_msg = resp.msg1 if resp else "응답 없음"
495 error_msg = f"분봉 차트 조회 실패: {original_msg}"
496 self._logger.warning(f"[분봉] 조회 실패: {original_msg}")
497 # 원래의 에러 코드를 유지하면서, 컨텍스트를 추가한 메시지를 반환합니다.
498 return ResCommonResponse(rt_cd=resp.rt_cd if resp else ErrorCode.API_ERROR.value, msg1=error_msg, data=[])
500 raw = resp.data
501 # KIS 관례상 output2(리스트) 우선
502 rows = (raw.get("output2") or raw.get("output") or raw.get("output1") or []) if isinstance(raw, dict) else raw
503 if not isinstance(rows, list):
504 rows = [rows] if rows else []
506 # 분봉 스키마는 일봉과 다를 수 있으므로, 우선 원시 rows 를 그대로 반환
507 return ResCommonResponse(rt_cd=ErrorCode.SUCCESS.value, msg1="분봉 차트 조회 성공", data=rows)
509 async def inquire_time_dailychartprice(self,
510 stock_code: str,
511 input_hour: str, # 길이 10 권장 (당일 기준 시간)
512 input_date: str, # "YYYYMMDDHH" (len=10)
513 include_past: str = "Y", # 과거 데이터 포함 여부
514 fid_pw_data_incu_yn: str = "", # 기타 구분
515 ):
516 """
517 일변 분봉(특정 일자) 조회 ※ 모의투자 미지원
518 URL : /uapi/domestic-stock/v1/quotations/inquire-time-dailychartprice
519 TRID: FHKST03010230 (REAL only)
520 """
522 full_config = self._env.active_config
523 tr_id = self._trid_provider.quotations(TrIdLeaf.TIME_DAILY_ITEMCHARTPRICE)
524 self._headers.set_tr_id(tr_id)
525 self._headers.set_custtype(full_config["custtype"])
527 params = Params.time_daily_itemchartprice(
528 stock_code=stock_code,
529 input_hour=input_hour,
530 input_date=input_date,
531 include_past=include_past,
532 fid_pw_data_incu_yn=fid_pw_data_incu_yn,
533 )
534 self._logger.info(f"[분봉-일변] {stock_code} 조회 (hour={input_hour}) (date_1={input_date} ")
535 resp = await self.call_api("GET", EndpointKey.TIME_DAILY_ITEMCHARTPRICE, params=params, retry_count=1)
537 if resp.rt_cd != ErrorCode.SUCCESS.value:
538 return resp
539 raw = resp.data if isinstance(resp.data, dict) else {}
540 rows = raw.get("output2") or raw.get("output") or raw.get("output1") or []
541 rows = rows if isinstance(rows, list) else ([rows] if rows else [])
542 return ResCommonResponse(rt_cd=ErrorCode.SUCCESS.value, msg1="일변 분봉 조회 성공", data=rows)
544 async def get_asking_price(self, stock_code: str, exchange: Exchange = Exchange.KRX) -> ResCommonResponse:
545 """
546 종목의 실시간 호가(매도/매수 잔량 포함) 정보를 조회합니다.
547 ResCommonResponse 형태로 반환되며, data는 원시 output 딕셔너리입니다.
548 """
549 full_config = self._env.active_config
550 tr_id = self._trid_provider.quotations(TrIdLeaf.ASKING_PRICE)
552 self._headers.set_tr_id(tr_id)
553 self._headers.set_custtype(full_config["custtype"])
555 market_code = _exchange_to_market_code(exchange)
556 params = Params.asking_price(stock_code=stock_code, market=market_code)
558 self._logger.info(f"{stock_code} 종목 호가잔량 조회 시도...")
560 response: ResCommonResponse = await self.call_api("GET", EndpointKey.ASKING_PRICE, params=params, retry_count=1)
562 if response.rt_cd != ErrorCode.SUCCESS.value:
563 self._logger.warning(f"{stock_code} 호가 정보 조회 실패: {response.msg1}")
564 return response
566 return response
568 async def get_time_concluded_prices(self, stock_code: str, exchange: Exchange = Exchange.KRX) -> ResCommonResponse:
569 """
570 종목의 시간대별 체결가/체결량 정보를 조회합니다.
571 """
572 full_config = self._env.active_config
573 tr_id = self._trid_provider.quotations(TrIdLeaf.TIME_CONCLUDE)
575 self._headers.set_tr_id(tr_id)
576 self._headers.set_custtype(full_config["custtype"])
578 market_code = _exchange_to_market_code(exchange)
579 params = Params.time_conclude(stock_code=stock_code, market=market_code)
581 self._logger.info(f"{stock_code} 종목 체결가 조회 시도...")
582 response: ResCommonResponse = await self.call_api("GET", EndpointKey.TIME_CONCLUDE, params=params,
583 retry_count=1)
585 if response.rt_cd != ErrorCode.SUCCESS.value:
586 self._logger.warning(f"{stock_code} 체결가 정보 조회 실패: {response.msg1}")
587 return response
589 return response
591 async def get_top_rise_fall_stocks(self, rise: bool = True) -> ResCommonResponse:
592 """
593 상승률/하락률 상위 종목 조회
594 """
595 full_config = self._env.active_config
596 tr_id = self._trid_provider.quotations(TrIdLeaf.RANKING_FLUCTUATION)
598 self._headers.set_tr_id(tr_id)
599 self._headers.set_custtype(full_config["custtype"])
601 params = (
602 Params.fluctuation_rise() # 상승률 상위
603 if rise
604 else Params.fluctuation_fall() # 하락률 상위
605 )
607 direction = "상승" if rise else "하락"
608 self._logger.info(f"{direction}률 상위 종목 조회 시도...")
609 response = await self.call_api("GET", EndpointKey.RANKING_FLUCTUATION, params=params, retry_count=3)
611 if response.rt_cd != ErrorCode.SUCCESS.value:
612 return response
614 try:
615 if not isinstance(response.data, dict):
616 raise TypeError(f"Expected dict for response data, but got {type(response.data)}")
618 output_list = response.data.get("output", [])
619 if not isinstance(output_list, list):
620 raise TypeError(f"Expected 'output' to be a list, but got {type(output_list)}")
622 stocks = []
623 for row in output_list:
624 if not isinstance(row, dict):
625 raise TypeError(f"Expected item in 'output' to be a dict, but got {type(row)}")
626 stocks.append(ResFluctuation.from_dict(row))
628 return ResCommonResponse(
629 rt_cd=ErrorCode.SUCCESS.value, # Enum 값 사용
630 msg1="종목 정보 조회 성공",
631 data=stocks
632 )
633 except (TypeError, AttributeError, ValidationError) as e:
634 error_msg = f"등락률 응답 형식 오류: {e}, 응답: {response.data!r}"
635 self._logger.error(error_msg)
636 return ResCommonResponse(
637 rt_cd=ErrorCode.PARSING_ERROR.value, # Enum 값 사용
638 msg1=error_msg,
639 data=None
640 )
642 async def get_top_volume_stocks(self) -> ResCommonResponse:
643 """
644 거래량 상위 종목 조회
645 """
646 full_config = self._env.active_config
647 tr_id = self._trid_provider.quotations(TrIdLeaf.RANKING_VOLUME)
649 self._headers.set_tr_id(tr_id)
650 self._headers.set_custtype(full_config["custtype"])
652 params = Params.volume_rank()
654 self._logger.info(f"거래량 상위 종목 조회 시도...")
655 response = await self.call_api("GET", EndpointKey.RANKING_VOLUME, params=params, retry_count=3)
657 if response.rt_cd != ErrorCode.SUCCESS.value:
658 self._logger.warning(f"거래량 상위 조회 실패: {response.msg1}")
659 return response
661 try:
662 if not isinstance(response.data, dict):
663 raise TypeError(f"Expected dict for response data, but got {type(response.data)}")
665 stocks = response.data.get("output", [])
666 if not isinstance(stocks, list):
667 raise TypeError(f"Expected 'output' to be a list, but got {type(stocks)}")
669 return ResCommonResponse(
670 rt_cd=ErrorCode.SUCCESS.value,
671 msg1="거래량 상위 종목 조회 성공",
672 data=stocks
673 )
674 except (TypeError, AttributeError, ValidationError) as e:
675 error_msg = f"거래량 상위 응답 형식 오류: {e}, 응답: {response.data!r}"
676 self._logger.error(error_msg)
677 return ResCommonResponse(
678 rt_cd=ErrorCode.PARSING_ERROR.value,
679 msg1=error_msg,
680 data=None
681 )
684 async def get_investor_trade_by_stock_daily(self, stock_code: str, date: str = None) -> ResCommonResponse:
685 """
686 종목별 투자자 매매동향(일별) 조회
687 실전 전용 (모의투자 미지원)
688 한 번의 호출로 외국인/개인/기관 순매수 데이터를 모두 반환.
689 """
690 if date is None: 690 ↛ 691line 690 didn't jump to line 691 because the condition on line 690 was never true
691 date = datetime.now().strftime("%Y%m%d")
693 full_config = self._env.active_config
694 tr_id = self._trid_provider.quotations(TrIdLeaf.INVESTOR_TRADE_BY_STOCK_DAILY)
696 self._headers.set_tr_id(tr_id)
697 self._headers.set_custtype(full_config["custtype"])
699 params = Params.investor_trade_by_stock_daily(stock_code=stock_code, date=date)
701 response: ResCommonResponse = await self.call_api(
702 "GET", EndpointKey.INVESTOR_TRADE_BY_STOCK_DAILY, params=params, retry_count=1
703 )
705 if response.rt_cd != ErrorCode.SUCCESS.value:
706 return response
708 try:
709 if not isinstance(response.data, dict):
710 raise TypeError(f"Expected dict, got {type(response.data)}")
712 output1 = response.data.get("output1", {})
713 output2 = response.data.get("output2", [])
715 if not isinstance(output2, list) or not output2:
716 return ResCommonResponse(
717 rt_cd=ErrorCode.SUCCESS.value,
718 msg1="데이터 없음",
719 data=None
720 )
722 # output1(현재가 정보) + output2[0](최신 일별 투자자 데이터) 병합
723 # output2의 빈 값("")이 output1의 유효한 값을 덮어쓰지 않도록 처리
724 merged = {}
725 if isinstance(output1, dict): 725 ↛ 727line 725 didn't jump to line 727 because the condition on line 725 was always true
726 merged.update(output1)
727 for k, v in output2[0].items():
728 if v or k not in merged: 728 ↛ 727line 728 didn't jump to line 727 because the condition on line 728 was always true
729 merged[k] = v
731 return ResCommonResponse(
732 rt_cd=ErrorCode.SUCCESS.value,
733 msg1="투자자 매매동향 조회 성공",
734 data=merged
735 )
736 except (TypeError, AttributeError) as e:
737 error_msg = f"투자자 매매동향 응답 형식 오류: {e}"
738 self._logger.error(error_msg)
739 return ResCommonResponse(
740 rt_cd=ErrorCode.PARSING_ERROR.value,
741 msg1=error_msg,
742 data=None
743 )
745 async def get_investor_trade_by_stock_daily_multi(self, stock_code: str, date: str = None, days: int = 3) -> ResCommonResponse:
746 """
747 종목별 투자자 매매동향(일별) 다중일 조회
748 실전 전용 (모의투자 미지원)
749 output2[:days] 를 리스트로 반환. 각 항목은 {frgn_ntby_tr_pbmn, orgn_ntby_tr_pbmn, acml_tr_pbmn, stck_bsop_date, ...} 형태.
750 단위: frgn/orgn_ntby_tr_pbmn 은 백만원, acml_tr_pbmn 은 원.
751 """
752 if date is None:
753 date = datetime.now().strftime("%Y%m%d")
755 full_config = self._env.active_config
756 tr_id = self._trid_provider.quotations(TrIdLeaf.INVESTOR_TRADE_BY_STOCK_DAILY)
758 self._headers.set_tr_id(tr_id)
759 self._headers.set_custtype(full_config["custtype"])
761 params = Params.investor_trade_by_stock_daily(stock_code=stock_code, date=date)
763 response: ResCommonResponse = await self.call_api(
764 "GET", EndpointKey.INVESTOR_TRADE_BY_STOCK_DAILY, params=params, retry_count=1
765 )
767 if response.rt_cd != ErrorCode.SUCCESS.value: 767 ↛ 768line 767 didn't jump to line 768 because the condition on line 767 was never true
768 return response
770 try:
771 if not isinstance(response.data, dict):
772 raise TypeError(f"Expected dict, got {type(response.data)}")
774 output2 = response.data.get("output2", [])
776 if not isinstance(output2, list) or not output2:
777 return ResCommonResponse(
778 rt_cd=ErrorCode.SUCCESS.value,
779 msg1="데이터 없음",
780 data=[]
781 )
783 return ResCommonResponse(
784 rt_cd=ErrorCode.SUCCESS.value,
785 msg1="투자자 매매동향 다중일 조회 성공",
786 data=output2[:days]
787 )
788 except (TypeError, AttributeError) as e:
789 error_msg = f"투자자 매매동향 다중일 응답 형식 오류: {e}"
790 self._logger.error(error_msg)
791 return ResCommonResponse(
792 rt_cd=ErrorCode.PARSING_ERROR.value,
793 msg1=error_msg,
794 data=None
795 )
797 async def get_program_trade_by_stock_daily(self, stock_code: str, date: str = None) -> ResCommonResponse:
798 """
799 종목별 프로그램매매추이(일별) 조회
800 실전 전용 (모의투자 미지원)
801 """
802 if date is None:
803 date = datetime.now().strftime("%Y%m%d")
805 full_config = self._env.active_config
806 tr_id = self._trid_provider.quotations(TrIdLeaf.PROGRAM_TRADE_BY_STOCK_DAILY)
808 self._headers.set_tr_id(tr_id)
809 self._headers.set_custtype(full_config["custtype"])
811 params = Params.program_trade_by_stock_daily(stock_code=stock_code, date=date)
813 response: ResCommonResponse = await self.call_api(
814 "GET", EndpointKey.PROGRAM_TRADE_BY_STOCK_DAILY, params=params, retry_count=1
815 )
817 if response.rt_cd != ErrorCode.SUCCESS.value:
818 return response
820 try:
821 if not isinstance(response.data, dict):
822 raise TypeError(f"Expected dict, got {type(response.data)}")
824 output = response.data.get("output", [])
826 if not isinstance(output, list) or not output:
827 return ResCommonResponse(
828 rt_cd=ErrorCode.SUCCESS.value,
829 msg1="데이터 없음",
830 data=None
831 )
833 # output[0] = 최신 일별 프로그램매매 데이터
834 return ResCommonResponse(
835 rt_cd=ErrorCode.SUCCESS.value,
836 msg1="프로그램매매추이 조회 성공",
837 data=output[0]
838 )
839 except (TypeError, AttributeError) as e:
840 error_msg = f"프로그램매매추이 응답 형식 오류: {e}"
841 self._logger.error(error_msg)
842 return ResCommonResponse(
843 rt_cd=ErrorCode.PARSING_ERROR.value,
844 msg1=error_msg,
845 data=None
846 )
848 # async def get_stock_news(self, stock_code: str) -> ResCommonResponse:
849 # """
850 # 종목 뉴스 조회
851 # """
852 # full_config = self._env.active_config
853 #
854 # path = full_config["paths"]["item_news"]
855 # tr_id = full_config["tr_ids"]["quotations"]["item_news"]
856 #
857 # self._headers["tr_id"] = tr_id
858 # self._headers["custtype"] = full_config["custtype"]
859 #
860 # params = {
861 # "fid_input_iscd": stock_code
862 # }
863 #
864 # self._logger.info(f"{stock_code} 종목 뉴스 조회 시도...")
865 # response = await self.call_api("GET", path, params=params, retry_count=1)
866 #
867 # if response.rt_cd != ErrorCode.SUCCESS.value:
868 # self._logger.warning(f"{stock_code} 종목 뉴스 조회 실패: {response.msg1}")
869 # return response
870 #
871 # return response
873 async def get_multi_price(self, stock_codes: list[str]) -> ResCommonResponse:
874 """
875 복수종목 현재가 조회 (최대 30종목)
876 URL: /uapi/domestic-stock/v1/quotations/intstock-multprice
877 TR: FHKST11300006 (실전 전용)
878 """
879 if not stock_codes:
880 return ResCommonResponse(
881 rt_cd=ErrorCode.INVALID_INPUT.value,
882 msg1="종목코드 리스트가 비어 있습니다.",
883 data=[]
884 )
886 if len(stock_codes) > 30:
887 self._logger.warning(f"최대 30종목까지 조회 가능합니다. 30개로 제한합니다. (요청: {len(stock_codes)}개)")
888 stock_codes = stock_codes[:30]
890 full_config = self._env.active_config
891 tr_id = self._trid_provider.quotations(TrIdLeaf.MULTI_PRICE)
893 self._headers.set_tr_id(tr_id)
894 self._headers.set_custtype(full_config["custtype"])
896 params = Params.multi_price(stock_codes=stock_codes)
898 self._logger.info(f"복수종목 현재가 조회 시도 ({len(stock_codes)}종목: {stock_codes[:5]}{'...' if len(stock_codes) > 5 else ''})")
899 response: ResCommonResponse = await self.call_api("GET", EndpointKey.MULTI_PRICE, params=params, retry_count=1)
901 if response.rt_cd != ErrorCode.SUCCESS.value:
902 self._logger.warning(f"복수종목 현재가 조회 실패: {response.msg1}")
903 return response
905 if not response.data:
906 return ResCommonResponse(
907 rt_cd=ErrorCode.EMPTY_VALUES.value,
908 msg1="복수종목 현재가 데이터 없음",
909 data=[]
910 )
912 raw = response.data
913 output_list = raw.get("output", []) if isinstance(raw, dict) else raw
914 if not isinstance(output_list, list): 914 ↛ 915line 914 didn't jump to line 915 because the condition on line 914 was never true
915 output_list = [output_list] if output_list else []
917 # 필드명 정규화: inter_shrn_iscd → stck_shrn_iscd, inter2_prpr → stck_prpr
918 FIELD_MAP = {
919 "inter_shrn_iscd": "stck_shrn_iscd",
920 "inter2_prpr": "stck_prpr",
921 }
922 normalized = []
923 for item in output_list:
924 if not isinstance(item, dict):
925 continue
926 mapped = {}
927 for k, v in item.items():
928 mapped[FIELD_MAP.get(k, k)] = v
929 normalized.append(mapped)
931 return ResCommonResponse(
932 rt_cd=ErrorCode.SUCCESS.value,
933 msg1="복수종목 현재가 조회 성공",
934 data=normalized
935 )
937 async def get_etf_info(self, etf_code: str) -> ResCommonResponse:
938 """
939 ETF 정보 조회
940 """
941 full_config = self._env.active_config
942 tr_id = self._trid_provider.quotations(TrIdLeaf.ETF_INFO)
944 self._headers.set_tr_id(tr_id)
945 self._headers.set_custtype(full_config["custtype"])
947 params = Params.etf_info(etf_code=etf_code)
949 self._logger.info(f"{etf_code} ETF 정보 조회 시도...")
950 response = await self.call_api("GET", EndpointKey.ETF_INFO, params=params, retry_count=1)
952 if response.rt_cd != ErrorCode.SUCCESS.value:
953 self._logger.warning(f"{etf_code} ETF 조회 실패: {response.msg1}")
954 return response
956 return response
958 async def get_financial_ratio(self, stock_code: str) -> ResCommonResponse:
959 """기업 재무비율 조회 (영업이익 증가율 등).
961 TR ID: FHKST66430300 (KIS 문서 확인 필요, 실전 전용 가능성)
962 모의투자 환경에서 미지원 시 API_ERROR 반환 → 호출 측에서 graceful degradation.
963 """
964 full_config = self._env.active_config
965 tr_id = self._trid_provider.quotations(TrIdLeaf.FINANCIAL_RATIO)
967 self._headers.set_tr_id(tr_id)
968 self._headers.set_custtype(full_config['custtype'])
970 params = Params.financial_ratio(stock_code=stock_code)
972 self._logger.info(f"{stock_code} 기업 재무비율 조회 시도...")
973 response: ResCommonResponse = await self.call_api(
974 "GET", EndpointKey.FINANCIAL_RATIO, params=params, retry_count=1,
975 )
977 if response.rt_cd != ErrorCode.SUCCESS.value:
978 self._logger.warning(f"{stock_code} 재무비율 조회 실패: {response.msg1}")
979 return response
981 return response
983 async def check_holiday(self, date: str) -> ResCommonResponse:
984 """국내 휴장일 조회 (CTCA0903R)"""
985 full_config = self._env.active_config
986 tr_id = self._trid_provider.quotations(TrIdLeaf.CHK_HOLIDAY)
988 self._headers.set_tr_id(tr_id)
989 self._headers.set_custtype(full_config['custtype'])
991 params = Params.check_holiday(date)
993 # EndpointKey.CHK_HOLIDAY는 전체 경로로 설정되어 있음
994 return await self.call_api(
995 "GET", EndpointKey.CHK_HOLIDAY, params=params, retry_count=1
996 )