Coverage for brokers / korea_investment / korea_invest_api_base.py: 95%

183 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-04 15:08 +0000

1# brokers/korea_investment/korea_invest_api_base.py 

2 

3import requests 

4import json 

5try: 

6 import orjson as _orjson 

7 def _dumps(obj) -> str: return _orjson.dumps(obj).decode() 

8except ImportError: 

9 _orjson = None 

10 _dumps = json.dumps 

11import certifi 

12import logging 

13import asyncio # 비동기 처리를 위해 추가 

14import httpx # 비동기 처리를 위해 requests 대신 httpx 사용 

15import ssl 

16from brokers.korea_investment.korea_invest_env import KoreaInvestApiEnv # TokenProvider를 import 

17from common.types import ErrorCode, ResCommonResponse 

18from typing import Union, Optional 

19from brokers.korea_investment.korea_invest_header_provider import build_header_provider_from_env, \ 

20 KoreaInvestHeaderProvider 

21from brokers.korea_investment.korea_invest_url_provider import KoreaInvestUrlProvider 

22from brokers.korea_investment.korea_invest_trid_provider import KoreaInvestTrIdProvider 

23 

24 

25class ApiRetryError(Exception): 

26 """재시도 가능한 API 오류 (예: 일시적 네트워크 오류, Rate Limit, 토큰 만료)""" 

27 def __init__(self, message, delay=0.0): 

28 super().__init__(message) 

29 self.delay = delay 

30 

31class ApiFatalError(Exception): 

32 """복구 불가능한 API 오류 (예: 파싱 실패, 비즈니스 로직 오류)""" 

33 def __init__(self, message, rt_cd=None): 

34 super().__init__(message) 

35 self.rt_cd = rt_cd 

36 

37class KoreaInvestApiBase: 

38 """ 

39 모든 한국투자증권 API 호출 클래스가 공통적으로 사용할 기본 클래스입니다. 

40 requests.Session을 사용하여 연결 효율성을 높입니다. 

41 """ 

42 

43 def __init__(self, env: KoreaInvestApiEnv, 

44 logger=None, 

45 market_clock=None, 

46 async_client: Optional[httpx.AsyncClient] = None, 

47 header_provider: Optional[KoreaInvestHeaderProvider] = None, 

48 url_provider: Optional[KoreaInvestUrlProvider] = None, 

49 trid_provider: Optional[KoreaInvestTrIdProvider] = None): 

50 self._logger = logger if logger else logging.getLogger(__name__) 

51 self.market_clock = market_clock 

52 self._env = env 

53 self._base_url = None 

54 self._headers: KoreaInvestHeaderProvider = header_provider or build_header_provider_from_env(env) 

55 self._url_provider: KoreaInvestUrlProvider = url_provider or KoreaInvestUrlProvider.from_env_and_kis_config(env) 

56 self._trid_provider = trid_provider or KoreaInvestTrIdProvider.from_config_loader(env) 

57 self._use_real_auth: bool = False # True면 항상 실전 인증 사용 (조회 API) 

58 

59 if async_client: 

60 self._async_session = async_client 

61 else: 

62 ssl_context = ssl.create_default_context(cafile=certifi.where()) 

63 self._async_session = httpx.AsyncClient(verify=ssl_context) 

64 

65 # urllib3 로거의 DEBUG 레벨을 비활성화하여 call_api의 DEBUG 로그와 분리 

66 logging.getLogger('urllib3.connectionpool').setLevel(logging.WARNING) 

67 logging.getLogger('httpcore').setLevel(logging.WARNING) # httpx의 하위 로거 

68 

69 # ✅ 하위 클래스가 URL 만들 때 쓰는 헬퍼 

70 def url(self, key_or_path) -> str: 

71 return self._url_provider.url(key_or_path) 

72 

73 async def call_api(self, 

74 method, 

75 key_or_path, 

76 params=None, 

77 data=None, 

78 expect_standard_schema: bool = True, 

79 retry_count=10, 

80 delay=1): 

81 url = self.url(key_or_path) 

82 

83 for attempt in range(1, retry_count + 1): 

84 try: 

85 self._logger.debug(f"API 호출 시도 {attempt}/{retry_count} - {method} {url}") 

86 

87 response = await self._execute_request(method, url, params, data) 

88 

89 # 네트워크 에러 시 ResCommonResponse가 반환될 수 있음 → 바로 리턴 

90 if isinstance(response, ResCommonResponse): 

91 return response 

92 

93 # 예외 기반 처리: 성공 시 dict 반환, 실패 시 예외 발생 

94 result_data = await self._handle_response(response, expect_standard_schema) 

95 

96 return ResCommonResponse( 

97 rt_cd=ErrorCode.SUCCESS.value, 

98 msg1="정상", 

99 data=result_data 

100 ) 

101 

102 except ApiRetryError as e: 

103 # 지수 백오프 적용: 기본 delay * 2^(attempt-1) 

104 if e.delay > 0: 

105 wait_time = e.delay 

106 else: 

107 wait_time = delay * (2 ** (attempt - 1)) 

108 self._logger.warning(f"재시도 필요: {attempt}/{retry_count}, 사유: {e}, 지연 {wait_time}초") 

109 await self.market_clock.async_sleep(wait_time) 

110 continue 

111 

112 except ApiFatalError as e: 

113 self._logger.error(f"복구 불가능한 오류 발생: {url}, 사유: {e}") 

114 return ResCommonResponse( 

115 rt_cd=str(e.rt_cd) if e.rt_cd else ErrorCode.PARSING_ERROR.value, 

116 msg1=f"API 오류: {e}", 

117 data=None 

118 ) 

119 

120 except Exception as e: 

121 self._log_request_exception(e) 

122 if attempt < retry_count: 

123 self._logger.info(f"예외 발생, 재시도: {attempt}/{retry_count}, 지연 {delay}초") 

124 await self.market_clock.async_sleep(delay) # 이 부분이 호출되어야 함 

125 continue 

126 else: 

127 pass 

128 

129 self._logger.error("모든 재시도 실패, API 호출 종료") 

130 return ResCommonResponse( 

131 rt_cd=ErrorCode.RETRY_LIMIT.value, 

132 msg1=f"최대 재시도 횟수 초과", 

133 data=None 

134 ) 

135 

136 async def close_session(self): 

137 """애플리케이션 종료 시 httpx 세션을 닫습니다.""" 

138 await self._async_session.aclose() 

139 self._logger.info("HTTP 클라이언트 세션이 종료되었습니다.") 

140 

141 def _log_headers(self): 

142 self._logger.debug("\nDEBUG: Headers being sent:") 

143 for key, value in self._headers.build().items(): 

144 try: 

145 encoded_value = str(value).encode('latin-1', errors='ignore') 

146 except UnicodeEncodeError: 

147 self._logger.debug(f" {key}: *** UnicodeEncodeError ***") 

148 except Exception as e: 

149 self._logger.debug(f" {key}: *** {type(e).__name__}: {e} ***") 

150 else: 

151 self._logger.debug(f" {key}: {encoded_value}") 

152 

153 def _log_request_exception(self, e): 

154 if isinstance(e, httpx.HTTPStatusError): 

155 self._logger.error(f"HTTP 오류 발생 (httpx): {e.response.status_code} - {e.response.text}", exc_info=True) 

156 elif isinstance(e, requests.exceptions.HTTPError): 

157 self._logger.error(f"HTTP 오류 발생 (requests): {e.response.status_code} - {e.response.text}", exc_info=True) 

158 elif isinstance(e, requests.exceptions.ConnectionError): 

159 self._logger.error(f"연결 오류 발생: {e}", exc_info=True) 

160 elif isinstance(e, requests.exceptions.Timeout): 

161 self._logger.error(f"타임아웃 오류 발생: {e}", exc_info=True) 

162 elif isinstance(e, requests.exceptions.RequestException): # requests 관련 일반 예외 

163 self._logger.error(f"요청 예외 발생 (requests): {e}", exc_info=True) 

164 elif isinstance(e, httpx.RequestError): # httpx 관련 일반 요청 오류 (연결, 타임아웃 등) 164 ↛ 165line 164 didn't jump to line 165 because the condition on line 164 was never true

165 self._logger.error(f"요청 예외 발생 (httpx): {e}", exc_info=True) 

166 elif isinstance(e, json.JSONDecodeError): 

167 self._logger.error("JSON 디코딩 오류 발생", exc_info=True) 

168 else: 

169 self._logger.error(f"예상치 못한 예외 발생: {e}", exc_info=True) 

170 

171 async def _execute_request(self, method, url, params, data): 

172 loop = asyncio.get_running_loop() 

173 response = None 

174 token_refreshed = False # ✅ 토큰 재발급 여부 플래그 

175 

176 async def make_request(): 

177 self._headers.sync_from_env(self._env) 

178 

179 if self._use_real_auth: 

180 # 조회 API: 항상 실전 인증 사용 

181 access_token = await self._env.get_real_access_token() 

182 real_cfg = self._env.get_real_config() 

183 self._headers.set_app_keys(real_cfg['api_key'], real_cfg['api_secret_key']) 

184 else: 

185 access_token = await self._env.get_access_token() 

186 self._headers.set_app_keys(self._env.active_config['api_key'], self._env.active_config['api_secret_key']) 

187 

188 if not isinstance(access_token, str) or access_token is None: 

189 raise ValueError("접근 토큰이 없습니다. KoreaInvestEnv에서 먼저 토큰을 발급받아야 합니다.") 

190 

191 self._headers.set_auth_bearer(access_token) 

192 headers = self._headers.build() 

193 # self._log_headers() 

194 

195 if method.upper() == 'GET': 

196 self._logger.debug(f"[GET] 요청 Url: {url}") 

197 self._logger.debug(f"[GET] 요청 Headers: {headers}") 

198 self._logger.debug(f"[GET] 요청 Data: {params}") 

199 return await self._async_session.get(url, headers=headers, params=params) 

200 elif method.upper() == 'POST': 

201 json_body = _dumps(data) if data else None 

202 

203 self._logger.debug(f"[POST] 요청 Url: {url}") 

204 self._logger.debug(f"[POST] 요청 Headers: {headers}") 

205 self._logger.debug(f"[POST] 요청 Data: {json_body}") 

206 

207 return await self._async_session.post( 

208 url, 

209 headers=headers, 

210 data=json_body, # json 넘기면 실패. 

211 ) 

212 else: 

213 raise ValueError(f"지원하지 않는 HTTP 메서드: {method}") 

214 

215 try: 

216 response = await make_request() 

217 if response is None: 

218 raise ValueError("response is None") 

219 

220 res_json = response.json() 

221 

222 # ✅ 토큰 만료 응답 감지 시 재발급 + 재시도 (단 1회만) 

223 if isinstance(res_json, dict) and res_json.get("msg_cd") == "EGW00123" and not token_refreshed: 

224 self._logger.warning("🔁 토큰 만료 감지 (EGW00123). 재발급 후 1회 재시도") 

225 await self.market_clock.async_sleep(3) 

226 await self._env.refresh_token() 

227 token_refreshed = True # ✅ 재시도 플래그 설정 

228 

229 # ✅ 강제 delay 삽입 

230 

231 # ✅ 반드시 새로 가져온 토큰으로 Authorization 헤더 재세팅 

232 new_token = await self._env.get_access_token() 

233 self._headers.set_auth_bearer(new_token) # ✅ 메서드 사용 

234 self._logger.debug(f"✅ 재발급 후 토큰 적용 확인: {new_token[:40]}...") 

235 

236 response = await make_request() 

237 

238 except httpx.RequestError as e: 

239 if self._logger: 239 ↛ 243line 239 didn't jump to line 243 because the condition on line 239 was always true

240 self._logger.error(f"요청 예외 발생 (httpx): {str(e)}") 

241 auth = self._headers.build().get("Authorization", "") # ✅ 안전 조회 

242 self._logger.debug(f"[EGW00123 대응] 현재 Authorization 헤더: {auth[:40]}...") 

243 return ResCommonResponse(rt_cd=ErrorCode.NETWORK_ERROR.value, msg1=str(e), data=None) 

244 

245 return response 

246 

247 async def _handle_response(self, response, expect_standard_schema: bool = True) -> dict: 

248 """HTTP 응답을 처리하고, 오류 유형에 따라 재시도 여부를 결정합니다.""" 

249 # 1. 호출 제한 오류 (Rate Limit) - 최상단에서 가장 먼저 검사하고 즉시 반환 

250 if response.status_code == 429 or \ 

251 (response.status_code == 500 and "초당 거래건수를 초과하였습니다" in response.text): 

252 raise ApiRetryError("Rate limit exceeded") 

253 

254 # 2. 그 외의 HTTP 오류 (HTTP 상태 코드 자체로 인한 오류) 

255 try: 

256 response.raise_for_status() 

257 except httpx.HTTPStatusError as e: 

258 self._logger.error(f"HTTP 오류 발생: {e.response.status_code} - {e.response.text}") 

259 raise ApiFatalError(f"HTTP Error {e.response.status_code}", rt_cd=ErrorCode.NETWORK_ERROR.value) 

260 

261 # 3. 성공적인 응답 처리 (JSON 디코딩) 

262 try: 

263 response_json = response.json() 

264 except (json.JSONDecodeError, ValueError): 

265 self._logger.error(f"응답 JSON 디코딩 실패: {response.text}") 

266 raise ApiFatalError("JSON Decode Error", rt_cd=ErrorCode.PARSING_ERROR.value) 

267 

268 # 3-1. 응답 형식 검증 (dict 여부) 

269 if not isinstance(response_json, dict): 

270 self._logger.error(f"API 응답 형식이 dict가 아님: {type(response_json)}") 

271 raise ApiFatalError("Response is not a dict", rt_cd=ErrorCode.PARSING_ERROR.value) 

272 

273 # 4. 토큰 만료 오류 처리 (API 응답 내용 기반) 

274 if response_json.get('msg_cd') == 'EGW00123': 

275 self._logger.error("최종 토큰 만료 오류(EGW00123) 감지.") 

276 self._env.invalidate_token() 

277 raise ApiRetryError("Token expired") 

278 

279 if not expect_standard_schema: 

280 # ✅ 표준 스키마(rt_cd 등) 미적용 엔드포인트: 2xx면 성공으로 간주 

281 return response_json 

282 

283 # 5. API 비즈니스 로직 오류 (rt_cd가 '0'이 아님) 

284 # 이 검사는 429/500 rate limit 케이스에서는 도달하지 않아야 합니다. 

285 if response_json.get('rt_cd') is None or response_json.get('rt_cd') != '0': 

286 # msg1이 있을 경우에만 로깅, 없을 경우 "None" 로깅 방지 

287 error_message = response_json.get('msg1', '알 수 없는 비즈니스 오류') 

288 self._logger.error(f"API 비즈니스 오류: {error_message}") 

289 raise ApiFatalError(f"Business Error: {error_message}", rt_cd=ErrorCode.API_ERROR.value) 

290 

291 # 6. 데이터 존재 여부 검증 (성공 응답인 경우) 

292 if not any(key in response_json for key in ["output", "output1", "output2"]): 

293 self._logger.error(f"API 응답에 output 데이터가 없습니다: {response.text}") 

294 raise ApiFatalError("Missing output data", rt_cd=ErrorCode.PARSING_ERROR.value) 

295 

296 # 모든 검사를 통과한 최종 성공적인 응답 

297 self._logger.debug(f"API 응답 성공: {response.text}") 

298 return response_json