Coverage for brokers / korea_investment / korea_invest_token_provider.py: 96%

94 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-04 15:08 +0000

1import os 

2import json 

3import httpx # 비동기 HTTP 클라이언트 

4from datetime import datetime, timedelta 

5import logging 

6import pytz # pytz 임포트 

7from typing import Optional 

8 

9 

10class TokenProvider: 

11 """ 

12 한국투자증권 API의 액세스 토큰 관리를 전담하는 클래스. 

13 - 토큰을 파일에 저장하여 영속성을 보장합니다. 

14 - 토큰의 유효성을 검사하고, 만료 시 자동으로 재발급합니다. 

15 """ 

16 

17 def __init__(self, token_file_path: Optional[str] = None, logger=None): 

18 """ 

19 :param token_file_path: 명시적으로 토큰 파일 경로를 지정할 수 있음 

20 :param is_paper_trading: True면 모의투자용 토큰 파일, False면 실전투자용 

21 """ 

22 self.token_file_path = token_file_path 

23 self._access_token = None 

24 self._token_expired_at = None 

25 self._logger = logger if logger else logging.getLogger(__name__) 

26 

27 async def get_access_token(self, base_url: str, app_key: str, app_secret: str) -> str: # env 인자 대신 필요한 정보만 받음 

28 """유효한 액세스 토큰을 반환합니다. 필요 시 파일에서 로드하거나 새로 발급합니다.""" 

29 # 1. 메모리에 토큰이 있고 유효한지 먼저 확인 

30 if self._access_token and self._is_token_valid(): 

31 return self._access_token 

32 

33 # 2. 파일에서 토큰을 로드하고 유효한지 확인 

34 self._load_token_from_file() 

35 if self._access_token and self._is_token_valid(): 

36 # 파일에서 로드한 토큰이 현재 환경의 base_url과 일치하는지 확인 

37 loaded_token_base_url = self._get_token_base_url_from_file() 

38 if loaded_token_base_url == base_url: # 전달받은 base_url과 비교 

39 self._logger.info("파일에서 유효한 토큰을 로드했습니다.") 

40 return self._access_token 

41 else: 

42 self._logger.warning(f"파일에서 로드한 토큰의 base_url이 현재 환경과 다릅니다. 저장된: {loaded_token_base_url}, 현재: {base_url}. 새 토큰 발급 필요.") 

43 self._access_token = None # base_url이 다르면 토큰 무효화 

44 self._token_expired_at = None 

45 

46 # 3. 위 모든 경우에 해당하지 않으면 새로 발급 

47 self._logger.info("새로운 액세스 토큰을 발급합니다.") 

48 await self._issue_new_token(base_url, app_key, app_secret) # 필요한 정보 전달 

49 

50 return self._access_token 

51 

52 def _is_token_valid(self): 

53 """토큰이 존재하고, 만료되지 않았는지 확인합니다. (5분 여유시간)""" 

54 if not self._access_token or not self._token_expired_at: 

55 return False 

56 # 만료 시간 5분 전에 갱신하도록 여유를 둡니다. 

57 now_kst = datetime.now(pytz.timezone('Asia/Seoul')) 

58 # self._logger.debug( 

59 # f"현재 시각: {now_kst}, 만료 시각: {self._token_expired_at}, 기준 시각: {self._token_expired_at - timedelta(minutes=10)}") 

60 

61 return now_kst < self._token_expired_at - timedelta(minutes=10) 

62 

63 def _load_token_from_file(self): 

64 """파일에서 토큰 정보를 로드합니다.""" 

65 try: 

66 with open(self.token_file_path, 'r') as f: 

67 token_data = json.load(f) 

68 self._access_token = token_data.get('access_token') 

69 expiry_str = token_data.get('expired_at') 

70 

71 if expiry_str: 

72 # fromisoformat()이 이미 타임존 정보를 파싱하므로, localize()를 다시 호출할 필요 없음 

73 self._token_expired_at = datetime.fromisoformat(expiry_str) 

74 

75 except (FileNotFoundError, json.JSONDecodeError): 

76 self._logger.warning(f"토큰 파일({self.token_file_path})을 찾을 수 없거나 형식이 잘못되었습니다. 새 토큰 발급을 시도합니다.") 

77 self._access_token = None 

78 self._token_expired_at = None 

79 

80 def _get_token_base_url_from_file(self): 

81 """토큰 파일에서 base_url을 읽어옵니다.""" 

82 try: 

83 if not os.path.exists(self.token_file_path): 

84 return None 

85 with open(self.token_file_path, 'r') as f: 

86 token_data = json.load(f) 

87 return token_data.get('base_url') # base_url 필드를 읽음 

88 except (FileNotFoundError, json.JSONDecodeError): 

89 return None 

90 

91 def _save_token_to_file(self, base_url_for_token: str): # base_url 인자 추가 

92 """현재 토큰 정보를 파일에 저장합니다.""" 

93 token_data = { 

94 'access_token': self._access_token, 

95 'expired_at': self._token_expired_at.isoformat(), 

96 'base_url': base_url_for_token # base_url도 함께 저장 

97 } 

98 os.makedirs(os.path.dirname(self.token_file_path), exist_ok=True) 

99 with open(self.token_file_path, 'w') as f: 

100 json.dump(token_data, f, indent=4) 

101 self._logger.info("새 토큰을 파일에 저장했습니다.") 

102 

103 async def _issue_new_token(self, base_url: str, app_key: str, app_secret: str): # 필요한 정보만 받음 

104 """API 서버에 요청하여 새로운 토큰을 발급받고, 상태를 업데이트합니다.""" 

105 

106 if not base_url or not app_key or not app_secret: 

107 self._logger.critical("토큰 발급에 필요한 환경 설정(base_url, app_key, app_secret)이 부족합니다.") 

108 raise ValueError("Missing environment configuration for token issuance.") 

109 

110 url = f"{base_url}/oauth2/tokenP" # 전달받은 base_url 사용 

111 body = { 

112 "grant_type": "client_credentials", 

113 "appkey": app_key, # 전달받은 app_key 사용 

114 "appsecret": app_secret # 전달받은 app_secret 사용 

115 } 

116 async with httpx.AsyncClient() as client: 

117 headers = { 

118 "Cache-Control": "no-cache", 

119 "Pragma": "no-cache" 

120 } 

121 response = await client.post(url, json=body, headers=headers) 

122 # response = await client.post(url, json=body) 

123 response.raise_for_status() 

124 res_data = response.json() 

125 

126 self._access_token = res_data.get('access_token') 

127 self._logger.info(f"✅ _issue_new_token - {self._access_token}") 

128 expires_in = int(res_data.get('expires_in', 0)) 

129 

130 # KST timezone을 고려하여 datetime 객체 생성 

131 kst_timezone = pytz.timezone('Asia/Seoul') 

132 self._token_expired_at = kst_timezone.localize(datetime.now() + timedelta(seconds=expires_in)) 

133 

134 self._save_token_to_file(base_url) # 현재 발급된 토큰의 base_url 저장 

135 

136 def invalidate_token(self): 

137 """외부에서 토큰 만료를 감지했을 때, 현재 토큰을 강제로 무효화합니다.""" 

138 self._access_token = None 

139 self._token_expired_at = None 

140 if os.path.exists(self.token_file_path): 

141 os.remove(self.token_file_path) 

142 self._logger.info("저장된 토큰이 무효화되었습니다.") 

143 

144 async def refresh_token(self, base_url: str, app_key: str, app_secret: str): 

145 """ 

146 외부에서 강제로 토큰을 재발급하고 상태를 초기화할 때 사용합니다. 

147 EGW00123 오류 응답을 받았을 때 호출하면 됩니다. 

148 """ 

149 self._logger.info("🔁 refresh_token() 호출됨 - 강제 토큰 재발급 시작") 

150 if self._access_token: 150 ↛ 151line 150 didn't jump to line 151 because the condition on line 150 was never true

151 self._logger.debug(f"✅ refresh_token 기존 토큰: {self._access_token[:40]}...") 

152 else: 

153 self._logger.debug("✅ refresh_token 기존 토큰: (None)") 

154 self.invalidate_token() # ✅ 캐시/파일 무효화 먼저! 

155 await self._issue_new_token(base_url, app_key, app_secret) 

156 

157 if self._access_token: 157 ↛ 160line 157 didn't jump to line 160 because the condition on line 157 was always true

158 self._logger.debug(f"✅ refresh_token 재발급 후 토큰: {self._access_token[:40]}...") 

159 else: 

160 raise Exception 

161 

162 self._logger.info("✅ 강제 토큰 재발급 완료")