Coverage for brokers / korea_investment / korea_invest_api_base.py: 95%
183 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
1# brokers/korea_investment/korea_invest_api_base.py
3import requests
4import json
5try:
6 import orjson as _orjson
7 def _dumps(obj) -> str: return _orjson.dumps(obj).decode()
8except ImportError:
9 _orjson = None
10 _dumps = json.dumps
11import certifi
12import logging
13import asyncio # 비동기 처리를 위해 추가
14import httpx # 비동기 처리를 위해 requests 대신 httpx 사용
15import ssl
16from brokers.korea_investment.korea_invest_env import KoreaInvestApiEnv # TokenProvider를 import
17from common.types import ErrorCode, ResCommonResponse
18from typing import Union, Optional
19from brokers.korea_investment.korea_invest_header_provider import build_header_provider_from_env, \
20 KoreaInvestHeaderProvider
21from brokers.korea_investment.korea_invest_url_provider import KoreaInvestUrlProvider
22from brokers.korea_investment.korea_invest_trid_provider import KoreaInvestTrIdProvider
25class ApiRetryError(Exception):
26 """재시도 가능한 API 오류 (예: 일시적 네트워크 오류, Rate Limit, 토큰 만료)"""
27 def __init__(self, message, delay=0.0):
28 super().__init__(message)
29 self.delay = delay
31class ApiFatalError(Exception):
32 """복구 불가능한 API 오류 (예: 파싱 실패, 비즈니스 로직 오류)"""
33 def __init__(self, message, rt_cd=None):
34 super().__init__(message)
35 self.rt_cd = rt_cd
37class KoreaInvestApiBase:
38 """
39 모든 한국투자증권 API 호출 클래스가 공통적으로 사용할 기본 클래스입니다.
40 requests.Session을 사용하여 연결 효율성을 높입니다.
41 """
43 def __init__(self, env: KoreaInvestApiEnv,
44 logger=None,
45 market_clock=None,
46 async_client: Optional[httpx.AsyncClient] = None,
47 header_provider: Optional[KoreaInvestHeaderProvider] = None,
48 url_provider: Optional[KoreaInvestUrlProvider] = None,
49 trid_provider: Optional[KoreaInvestTrIdProvider] = None):
50 self._logger = logger if logger else logging.getLogger(__name__)
51 self.market_clock = market_clock
52 self._env = env
53 self._base_url = None
54 self._headers: KoreaInvestHeaderProvider = header_provider or build_header_provider_from_env(env)
55 self._url_provider: KoreaInvestUrlProvider = url_provider or KoreaInvestUrlProvider.from_env_and_kis_config(env)
56 self._trid_provider = trid_provider or KoreaInvestTrIdProvider.from_config_loader(env)
57 self._use_real_auth: bool = False # True면 항상 실전 인증 사용 (조회 API)
59 if async_client:
60 self._async_session = async_client
61 else:
62 ssl_context = ssl.create_default_context(cafile=certifi.where())
63 self._async_session = httpx.AsyncClient(verify=ssl_context)
65 # urllib3 로거의 DEBUG 레벨을 비활성화하여 call_api의 DEBUG 로그와 분리
66 logging.getLogger('urllib3.connectionpool').setLevel(logging.WARNING)
67 logging.getLogger('httpcore').setLevel(logging.WARNING) # httpx의 하위 로거
69 # ✅ 하위 클래스가 URL 만들 때 쓰는 헬퍼
70 def url(self, key_or_path) -> str:
71 return self._url_provider.url(key_or_path)
73 async def call_api(self,
74 method,
75 key_or_path,
76 params=None,
77 data=None,
78 expect_standard_schema: bool = True,
79 retry_count=10,
80 delay=1):
81 url = self.url(key_or_path)
83 for attempt in range(1, retry_count + 1):
84 try:
85 self._logger.debug(f"API 호출 시도 {attempt}/{retry_count} - {method} {url}")
87 response = await self._execute_request(method, url, params, data)
89 # 네트워크 에러 시 ResCommonResponse가 반환될 수 있음 → 바로 리턴
90 if isinstance(response, ResCommonResponse):
91 return response
93 # 예외 기반 처리: 성공 시 dict 반환, 실패 시 예외 발생
94 result_data = await self._handle_response(response, expect_standard_schema)
96 return ResCommonResponse(
97 rt_cd=ErrorCode.SUCCESS.value,
98 msg1="정상",
99 data=result_data
100 )
102 except ApiRetryError as e:
103 # 지수 백오프 적용: 기본 delay * 2^(attempt-1)
104 if e.delay > 0:
105 wait_time = e.delay
106 else:
107 wait_time = delay * (2 ** (attempt - 1))
108 self._logger.warning(f"재시도 필요: {attempt}/{retry_count}, 사유: {e}, 지연 {wait_time}초")
109 await self.market_clock.async_sleep(wait_time)
110 continue
112 except ApiFatalError as e:
113 self._logger.error(f"복구 불가능한 오류 발생: {url}, 사유: {e}")
114 return ResCommonResponse(
115 rt_cd=str(e.rt_cd) if e.rt_cd else ErrorCode.PARSING_ERROR.value,
116 msg1=f"API 오류: {e}",
117 data=None
118 )
120 except Exception as e:
121 self._log_request_exception(e)
122 if attempt < retry_count:
123 self._logger.info(f"예외 발생, 재시도: {attempt}/{retry_count}, 지연 {delay}초")
124 await self.market_clock.async_sleep(delay) # 이 부분이 호출되어야 함
125 continue
126 else:
127 pass
129 self._logger.error("모든 재시도 실패, API 호출 종료")
130 return ResCommonResponse(
131 rt_cd=ErrorCode.RETRY_LIMIT.value,
132 msg1=f"최대 재시도 횟수 초과",
133 data=None
134 )
136 async def close_session(self):
137 """애플리케이션 종료 시 httpx 세션을 닫습니다."""
138 await self._async_session.aclose()
139 self._logger.info("HTTP 클라이언트 세션이 종료되었습니다.")
141 def _log_headers(self):
142 self._logger.debug("\nDEBUG: Headers being sent:")
143 for key, value in self._headers.build().items():
144 try:
145 encoded_value = str(value).encode('latin-1', errors='ignore')
146 except UnicodeEncodeError:
147 self._logger.debug(f" {key}: *** UnicodeEncodeError ***")
148 except Exception as e:
149 self._logger.debug(f" {key}: *** {type(e).__name__}: {e} ***")
150 else:
151 self._logger.debug(f" {key}: {encoded_value}")
153 def _log_request_exception(self, e):
154 if isinstance(e, httpx.HTTPStatusError):
155 self._logger.error(f"HTTP 오류 발생 (httpx): {e.response.status_code} - {e.response.text}", exc_info=True)
156 elif isinstance(e, requests.exceptions.HTTPError):
157 self._logger.error(f"HTTP 오류 발생 (requests): {e.response.status_code} - {e.response.text}", exc_info=True)
158 elif isinstance(e, requests.exceptions.ConnectionError):
159 self._logger.error(f"연결 오류 발생: {e}", exc_info=True)
160 elif isinstance(e, requests.exceptions.Timeout):
161 self._logger.error(f"타임아웃 오류 발생: {e}", exc_info=True)
162 elif isinstance(e, requests.exceptions.RequestException): # requests 관련 일반 예외
163 self._logger.error(f"요청 예외 발생 (requests): {e}", exc_info=True)
164 elif isinstance(e, httpx.RequestError): # httpx 관련 일반 요청 오류 (연결, 타임아웃 등) 164 ↛ 165line 164 didn't jump to line 165 because the condition on line 164 was never true
165 self._logger.error(f"요청 예외 발생 (httpx): {e}", exc_info=True)
166 elif isinstance(e, json.JSONDecodeError):
167 self._logger.error("JSON 디코딩 오류 발생", exc_info=True)
168 else:
169 self._logger.error(f"예상치 못한 예외 발생: {e}", exc_info=True)
171 async def _execute_request(self, method, url, params, data):
172 loop = asyncio.get_running_loop()
173 response = None
174 token_refreshed = False # ✅ 토큰 재발급 여부 플래그
176 async def make_request():
177 self._headers.sync_from_env(self._env)
179 if self._use_real_auth:
180 # 조회 API: 항상 실전 인증 사용
181 access_token = await self._env.get_real_access_token()
182 real_cfg = self._env.get_real_config()
183 self._headers.set_app_keys(real_cfg['api_key'], real_cfg['api_secret_key'])
184 else:
185 access_token = await self._env.get_access_token()
186 self._headers.set_app_keys(self._env.active_config['api_key'], self._env.active_config['api_secret_key'])
188 if not isinstance(access_token, str) or access_token is None:
189 raise ValueError("접근 토큰이 없습니다. KoreaInvestEnv에서 먼저 토큰을 발급받아야 합니다.")
191 self._headers.set_auth_bearer(access_token)
192 headers = self._headers.build()
193 # self._log_headers()
195 if method.upper() == 'GET':
196 self._logger.debug(f"[GET] 요청 Url: {url}")
197 self._logger.debug(f"[GET] 요청 Headers: {headers}")
198 self._logger.debug(f"[GET] 요청 Data: {params}")
199 return await self._async_session.get(url, headers=headers, params=params)
200 elif method.upper() == 'POST':
201 json_body = _dumps(data) if data else None
203 self._logger.debug(f"[POST] 요청 Url: {url}")
204 self._logger.debug(f"[POST] 요청 Headers: {headers}")
205 self._logger.debug(f"[POST] 요청 Data: {json_body}")
207 return await self._async_session.post(
208 url,
209 headers=headers,
210 data=json_body, # json 넘기면 실패.
211 )
212 else:
213 raise ValueError(f"지원하지 않는 HTTP 메서드: {method}")
215 try:
216 response = await make_request()
217 if response is None:
218 raise ValueError("response is None")
220 res_json = response.json()
222 # ✅ 토큰 만료 응답 감지 시 재발급 + 재시도 (단 1회만)
223 if isinstance(res_json, dict) and res_json.get("msg_cd") == "EGW00123" and not token_refreshed:
224 self._logger.warning("🔁 토큰 만료 감지 (EGW00123). 재발급 후 1회 재시도")
225 await self.market_clock.async_sleep(3)
226 await self._env.refresh_token()
227 token_refreshed = True # ✅ 재시도 플래그 설정
229 # ✅ 강제 delay 삽입
231 # ✅ 반드시 새로 가져온 토큰으로 Authorization 헤더 재세팅
232 new_token = await self._env.get_access_token()
233 self._headers.set_auth_bearer(new_token) # ✅ 메서드 사용
234 self._logger.debug(f"✅ 재발급 후 토큰 적용 확인: {new_token[:40]}...")
236 response = await make_request()
238 except httpx.RequestError as e:
239 if self._logger: 239 ↛ 243line 239 didn't jump to line 243 because the condition on line 239 was always true
240 self._logger.error(f"요청 예외 발생 (httpx): {str(e)}")
241 auth = self._headers.build().get("Authorization", "") # ✅ 안전 조회
242 self._logger.debug(f"[EGW00123 대응] 현재 Authorization 헤더: {auth[:40]}...")
243 return ResCommonResponse(rt_cd=ErrorCode.NETWORK_ERROR.value, msg1=str(e), data=None)
245 return response
247 async def _handle_response(self, response, expect_standard_schema: bool = True) -> dict:
248 """HTTP 응답을 처리하고, 오류 유형에 따라 재시도 여부를 결정합니다."""
249 # 1. 호출 제한 오류 (Rate Limit) - 최상단에서 가장 먼저 검사하고 즉시 반환
250 if response.status_code == 429 or \
251 (response.status_code == 500 and "초당 거래건수를 초과하였습니다" in response.text):
252 raise ApiRetryError("Rate limit exceeded")
254 # 2. 그 외의 HTTP 오류 (HTTP 상태 코드 자체로 인한 오류)
255 try:
256 response.raise_for_status()
257 except httpx.HTTPStatusError as e:
258 self._logger.error(f"HTTP 오류 발생: {e.response.status_code} - {e.response.text}")
259 raise ApiFatalError(f"HTTP Error {e.response.status_code}", rt_cd=ErrorCode.NETWORK_ERROR.value)
261 # 3. 성공적인 응답 처리 (JSON 디코딩)
262 try:
263 response_json = response.json()
264 except (json.JSONDecodeError, ValueError):
265 self._logger.error(f"응답 JSON 디코딩 실패: {response.text}")
266 raise ApiFatalError("JSON Decode Error", rt_cd=ErrorCode.PARSING_ERROR.value)
268 # 3-1. 응답 형식 검증 (dict 여부)
269 if not isinstance(response_json, dict):
270 self._logger.error(f"API 응답 형식이 dict가 아님: {type(response_json)}")
271 raise ApiFatalError("Response is not a dict", rt_cd=ErrorCode.PARSING_ERROR.value)
273 # 4. 토큰 만료 오류 처리 (API 응답 내용 기반)
274 if response_json.get('msg_cd') == 'EGW00123':
275 self._logger.error("최종 토큰 만료 오류(EGW00123) 감지.")
276 self._env.invalidate_token()
277 raise ApiRetryError("Token expired")
279 if not expect_standard_schema:
280 # ✅ 표준 스키마(rt_cd 등) 미적용 엔드포인트: 2xx면 성공으로 간주
281 return response_json
283 # 5. API 비즈니스 로직 오류 (rt_cd가 '0'이 아님)
284 # 이 검사는 429/500 rate limit 케이스에서는 도달하지 않아야 합니다.
285 if response_json.get('rt_cd') is None or response_json.get('rt_cd') != '0':
286 # msg1이 있을 경우에만 로깅, 없을 경우 "None" 로깅 방지
287 error_message = response_json.get('msg1', '알 수 없는 비즈니스 오류')
288 self._logger.error(f"API 비즈니스 오류: {error_message}")
289 raise ApiFatalError(f"Business Error: {error_message}", rt_cd=ErrorCode.API_ERROR.value)
291 # 6. 데이터 존재 여부 검증 (성공 응답인 경우)
292 if not any(key in response_json for key in ["output", "output1", "output2"]):
293 self._logger.error(f"API 응답에 output 데이터가 없습니다: {response.text}")
294 raise ApiFatalError("Missing output data", rt_cd=ErrorCode.PARSING_ERROR.value)
296 # 모든 검사를 통과한 최종 성공적인 응답
297 self._logger.debug(f"API 응답 성공: {response.text}")
298 return response_json