Coverage for brokers / korea_investment / korea_invest_token_provider.py: 96%
94 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
1import os
2import json
3import httpx # 비동기 HTTP 클라이언트
4from datetime import datetime, timedelta
5import logging
6import pytz # pytz 임포트
7from typing import Optional
10class TokenProvider:
11 """
12 한국투자증권 API의 액세스 토큰 관리를 전담하는 클래스.
13 - 토큰을 파일에 저장하여 영속성을 보장합니다.
14 - 토큰의 유효성을 검사하고, 만료 시 자동으로 재발급합니다.
15 """
17 def __init__(self, token_file_path: Optional[str] = None, logger=None):
18 """
19 :param token_file_path: 명시적으로 토큰 파일 경로를 지정할 수 있음
20 :param is_paper_trading: True면 모의투자용 토큰 파일, False면 실전투자용
21 """
22 self.token_file_path = token_file_path
23 self._access_token = None
24 self._token_expired_at = None
25 self._logger = logger if logger else logging.getLogger(__name__)
27 async def get_access_token(self, base_url: str, app_key: str, app_secret: str) -> str: # env 인자 대신 필요한 정보만 받음
28 """유효한 액세스 토큰을 반환합니다. 필요 시 파일에서 로드하거나 새로 발급합니다."""
29 # 1. 메모리에 토큰이 있고 유효한지 먼저 확인
30 if self._access_token and self._is_token_valid():
31 return self._access_token
33 # 2. 파일에서 토큰을 로드하고 유효한지 확인
34 self._load_token_from_file()
35 if self._access_token and self._is_token_valid():
36 # 파일에서 로드한 토큰이 현재 환경의 base_url과 일치하는지 확인
37 loaded_token_base_url = self._get_token_base_url_from_file()
38 if loaded_token_base_url == base_url: # 전달받은 base_url과 비교
39 self._logger.info("파일에서 유효한 토큰을 로드했습니다.")
40 return self._access_token
41 else:
42 self._logger.warning(f"파일에서 로드한 토큰의 base_url이 현재 환경과 다릅니다. 저장된: {loaded_token_base_url}, 현재: {base_url}. 새 토큰 발급 필요.")
43 self._access_token = None # base_url이 다르면 토큰 무효화
44 self._token_expired_at = None
46 # 3. 위 모든 경우에 해당하지 않으면 새로 발급
47 self._logger.info("새로운 액세스 토큰을 발급합니다.")
48 await self._issue_new_token(base_url, app_key, app_secret) # 필요한 정보 전달
50 return self._access_token
52 def _is_token_valid(self):
53 """토큰이 존재하고, 만료되지 않았는지 확인합니다. (5분 여유시간)"""
54 if not self._access_token or not self._token_expired_at:
55 return False
56 # 만료 시간 5분 전에 갱신하도록 여유를 둡니다.
57 now_kst = datetime.now(pytz.timezone('Asia/Seoul'))
58 # self._logger.debug(
59 # f"현재 시각: {now_kst}, 만료 시각: {self._token_expired_at}, 기준 시각: {self._token_expired_at - timedelta(minutes=10)}")
61 return now_kst < self._token_expired_at - timedelta(minutes=10)
63 def _load_token_from_file(self):
64 """파일에서 토큰 정보를 로드합니다."""
65 try:
66 with open(self.token_file_path, 'r') as f:
67 token_data = json.load(f)
68 self._access_token = token_data.get('access_token')
69 expiry_str = token_data.get('expired_at')
71 if expiry_str:
72 # fromisoformat()이 이미 타임존 정보를 파싱하므로, localize()를 다시 호출할 필요 없음
73 self._token_expired_at = datetime.fromisoformat(expiry_str)
75 except (FileNotFoundError, json.JSONDecodeError):
76 self._logger.warning(f"토큰 파일({self.token_file_path})을 찾을 수 없거나 형식이 잘못되었습니다. 새 토큰 발급을 시도합니다.")
77 self._access_token = None
78 self._token_expired_at = None
80 def _get_token_base_url_from_file(self):
81 """토큰 파일에서 base_url을 읽어옵니다."""
82 try:
83 if not os.path.exists(self.token_file_path):
84 return None
85 with open(self.token_file_path, 'r') as f:
86 token_data = json.load(f)
87 return token_data.get('base_url') # base_url 필드를 읽음
88 except (FileNotFoundError, json.JSONDecodeError):
89 return None
91 def _save_token_to_file(self, base_url_for_token: str): # base_url 인자 추가
92 """현재 토큰 정보를 파일에 저장합니다."""
93 token_data = {
94 'access_token': self._access_token,
95 'expired_at': self._token_expired_at.isoformat(),
96 'base_url': base_url_for_token # base_url도 함께 저장
97 }
98 os.makedirs(os.path.dirname(self.token_file_path), exist_ok=True)
99 with open(self.token_file_path, 'w') as f:
100 json.dump(token_data, f, indent=4)
101 self._logger.info("새 토큰을 파일에 저장했습니다.")
103 async def _issue_new_token(self, base_url: str, app_key: str, app_secret: str): # 필요한 정보만 받음
104 """API 서버에 요청하여 새로운 토큰을 발급받고, 상태를 업데이트합니다."""
106 if not base_url or not app_key or not app_secret:
107 self._logger.critical("토큰 발급에 필요한 환경 설정(base_url, app_key, app_secret)이 부족합니다.")
108 raise ValueError("Missing environment configuration for token issuance.")
110 url = f"{base_url}/oauth2/tokenP" # 전달받은 base_url 사용
111 body = {
112 "grant_type": "client_credentials",
113 "appkey": app_key, # 전달받은 app_key 사용
114 "appsecret": app_secret # 전달받은 app_secret 사용
115 }
116 async with httpx.AsyncClient() as client:
117 headers = {
118 "Cache-Control": "no-cache",
119 "Pragma": "no-cache"
120 }
121 response = await client.post(url, json=body, headers=headers)
122 # response = await client.post(url, json=body)
123 response.raise_for_status()
124 res_data = response.json()
126 self._access_token = res_data.get('access_token')
127 self._logger.info(f"✅ _issue_new_token - {self._access_token}")
128 expires_in = int(res_data.get('expires_in', 0))
130 # KST timezone을 고려하여 datetime 객체 생성
131 kst_timezone = pytz.timezone('Asia/Seoul')
132 self._token_expired_at = kst_timezone.localize(datetime.now() + timedelta(seconds=expires_in))
134 self._save_token_to_file(base_url) # 현재 발급된 토큰의 base_url 저장
136 def invalidate_token(self):
137 """외부에서 토큰 만료를 감지했을 때, 현재 토큰을 강제로 무효화합니다."""
138 self._access_token = None
139 self._token_expired_at = None
140 if os.path.exists(self.token_file_path):
141 os.remove(self.token_file_path)
142 self._logger.info("저장된 토큰이 무효화되었습니다.")
144 async def refresh_token(self, base_url: str, app_key: str, app_secret: str):
145 """
146 외부에서 강제로 토큰을 재발급하고 상태를 초기화할 때 사용합니다.
147 EGW00123 오류 응답을 받았을 때 호출하면 됩니다.
148 """
149 self._logger.info("🔁 refresh_token() 호출됨 - 강제 토큰 재발급 시작")
150 if self._access_token: 150 ↛ 151line 150 didn't jump to line 151 because the condition on line 150 was never true
151 self._logger.debug(f"✅ refresh_token 기존 토큰: {self._access_token[:40]}...")
152 else:
153 self._logger.debug("✅ refresh_token 기존 토큰: (None)")
154 self.invalidate_token() # ✅ 캐시/파일 무효화 먼저!
155 await self._issue_new_token(base_url, app_key, app_secret)
157 if self._access_token: 157 ↛ 160line 157 didn't jump to line 160 because the condition on line 157 was always true
158 self._logger.debug(f"✅ refresh_token 재발급 후 토큰: {self._access_token[:40]}...")
159 else:
160 raise Exception
162 self._logger.info("✅ 강제 토큰 재발급 완료")