Coverage for brokers / korea_investment / korea_invest_websocket_api.py: 89%

416 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-04 15:08 +0000

1# api/korea_invest_websocket_api.py 

2from typing import Optional 

3import websockets # pip install websockets 

4import json 

5try: 

6 import orjson as _orjson 

7 def _loads(s): return _orjson.loads(s) 

8 def _dumps(obj) -> str: return _orjson.dumps(obj).decode() 

9except ImportError: 

10 _orjson = None 

11 _loads = json.loads 

12 _dumps = json.dumps 

13import logging 

14import requests 

15import certifi # requests의 SSL 인증서 검증에 필요 

16import asyncio # 비동기 처리를 위해 필요 

17import os # os.urandom (gt_uid 생성용) 

18 

19from Crypto.Cipher import AES # pip install pycryptodome 

20from Crypto.Util.Padding import unpad 

21from base64 import b64decode 

22 

23from brokers.korea_investment.korea_invest_env import KoreaInvestApiEnv # KoreaInvestEnv 클래스 임포트 

24from core.market_clock import MarketClock 

25from services.market_calendar_service import MarketCalendarService 

26 

27 

28class KoreaInvestWebSocketAPI: 

29 """ 

30 한국투자증권 Open API의 웹소켓 연결 및 실시간 데이터 수신을 관리하는 클래스입니다. 

31 `websockets` 라이브러리(asyncio 기반)를 사용하며, 다양한 실시간 데이터 파싱을 포함합니다. 

32 """ 

33 

34 def __init__(self, env: KoreaInvestApiEnv, logger=None, market_clock: MarketClock = None, 

35 market_calendar_service: Optional[MarketCalendarService] = None): 

36 self._env = env 

37 self._market_clock = market_clock 

38 self._mcs = market_calendar_service 

39 self._logger = logger if logger else logging.getLogger(__name__) 

40 # self._config = self._env.get_full_config() # 환경 설정 전체를 가져옴 (tr_ids 포함) 

41 # config에서 웹소켓 및 REST API 정보 가져오기 

42 # self._websocket_url = self._config['websocket_url'] 

43 # self._rest_api_key = self._config['api_key'] 

44 # self._rest_api_secret = self._config['api_secret_key'] 

45 # self._base_rest_url = self._config['base_url'] 

46 self._websocket_url = None 

47 self._rest_api_key = None 

48 self._rest_api_secret = None 

49 self._base_rest_url = None 

50 

51 self.ws = None # 웹소켓 연결 객체 (websockets.WebSocketClientProtocol) 

52 self.approval_key = None # 웹소켓 접속 키 (REST API로 발급) 

53 self._is_connected = False # 웹소켓 연결 상태 플래그 

54 self._auto_reconnect = False # 자동 재연결 활성화 플래그 

55 self._receive_task = None # 메시지 수신을 위한 asyncio.Task 

56 

57 # 실시간 메시지 수신 시 외부에서 등록할 콜백 함수 (TradingService의 핸들러) 

58 self.on_realtime_message_callback = None 

59 

60 # 암호화된 체결 통보 메시지 복호화를 위한 AES 키/IV 

61 # H0IFCNI0, H0STCNI0, H0MFCNI0, H0EUCNI0, H0STCNI9 등 통보 TR_ID 구독 시 서버로부터 수신 

62 self._aes_key = None 

63 self._aes_iv = None 

64 

65 # 재연결 시 복구를 위한 구독 목록 저장소 set((tr_id, tr_key)) 

66 self._subscribed_items = set() 

67 

68 def _aes_cbc_base64_dec(self, key, iv, cipher_text): 

69 """ 

70 AES256 DECODE (Base64 인코딩된 암호문을 복호화) 

71 :param key: AES256 Secret Key (str) 

72 :param iv: AES256 Initialize Vector (str) 

73 :param cipher_text: Base64 인코딩된 AES256 암호문 (str) 

74 :return: 복호화된 문자열 (str) 

75 """ 

76 try: 

77 cipher = AES.new(key.encode('utf-8'), AES.MODE_CBC, iv.encode('utf-8')) 

78 return bytes.decode(unpad(cipher.decrypt(b64decode(cipher_text)), AES.block_size)) 

79 except Exception as e: 

80 self._logger.exception(f"AES 복호화 오류 발생: {e} (key: {key[:5]}..., iv: {iv[:5]}..., cipher: {cipher_text[:50]}...)") 

81 

82 return None 

83 

84 async def _get_approval_key(self): 

85 """ 

86 웹소켓 접속 키(approval_key)를 한국투자증권 REST API를 통해 발급받습니다. 

87 """ 

88 self._websocket_url = self._env.active_config['websocket_url'] 

89 self._base_rest_url = self._env.active_config['base_url'] 

90 self._rest_api_key= self._env.active_config['api_key'] 

91 self._rest_api_secret= self._env.active_config['api_secret_key'] 

92 

93 path = "/oauth2/Approval" 

94 url = f"{self._base_rest_url}{path}" 

95 headers = {"content-type": "application/json; utf-8"} 

96 body = { 

97 "grant_type": "client_credentials", 

98 "appkey": self._rest_api_key, 

99 "secretkey": self._rest_api_secret # 웹소켓 접속키 발급 시 secretkey 필드명 사용 

100 } 

101 

102 self._logger.info("웹소켓 접속키 발급 시도...") 

103 try: 

104 # requests는 동기 함수이므로 asyncio의 loop.run_in_executor를 사용하여 비동기적으로 실행 

105 loop = asyncio.get_running_loop() 

106 res = await loop.run_in_executor( 

107 None, # 기본 ThreadPoolExecutor 사용 

108 lambda: requests.post(url, headers=headers, data=json.dumps(body), verify=certifi.where()) 

109 ) 

110 

111 res.raise_for_status() # HTTP 오류(4xx, 5xx) 발생 시 예외 발생 

112 auth_data = res.json() 

113 

114 if auth_data and auth_data.get('approval_key'): 

115 self.approval_key = auth_data['approval_key'] 

116 self._logger.info(f"웹소켓 접속키 발급 성공: {self.approval_key[:10]}...") # 키의 일부만 로깅 

117 return self.approval_key 

118 else: 

119 self._logger.exception(f"웹소켓 접속키 발급 실패 - 응답 데이터 오류: {auth_data}") 

120 return None 

121 except requests.exceptions.RequestException as e: 

122 self._logger.exception(f"웹소켓 접속키 발급 중 네트워크 오류: {e}") 

123 return None 

124 except json.JSONDecodeError: 

125 self._logger.exception(f"웹소켓 접속키 발급 응답 JSON 디코딩 실패: {res.text if res else '응답 없음'}") 

126 return None 

127 except Exception as e: 

128 self._logger.exception(f"웹소켓 접속키 발급 중 알 수 없는 오류: {e}") 

129 return None 

130 

131 async def _establish_connection(self): 

132 """웹소켓 연결을 수립하는 내부 메서드 (재연결 로직에서 재사용).""" 

133 self._websocket_url = self._env.get_websocket_url() 

134 

135 # 1. approval_key 발급 (없으면 발급) 

136 if not self.approval_key: 

137 self.approval_key = await self._get_approval_key() 

138 if not self.approval_key: 

139 self._logger.error("웹소켓 접속 키 발급 실패로 연결할 수 없습니다.") 

140 return False 

141 

142 # 2. 웹소켓 연결 

143 try: 

144 self._logger.info(f"웹소켓 연결 시도: {self._websocket_url}") 

145 self.ws = await websockets.connect(self._websocket_url, ping_interval=20, ping_timeout=20) 

146 self._is_connected = True 

147 self._logger.info("웹소켓 연결 성공.") 

148 return True 

149 except Exception as e: 

150 self._logger.exception(f"웹소켓 연결 중 오류 발생: {e}") 

151 self._is_connected = False 

152 self.ws = None 

153 return False 

154 

155 async def _receive_messages(self): 

156 """웹소켓 메시지 수신 및 자동 재연결 루프.""" 

157 retry_count = 0 

158 max_retries = 30 # 최대 재시도 횟수 (약 30분간 시도) 

159 base_delay = 3 # 기본 대기 시간 (초) 

160 max_delay = 60 # 최대 대기 시간 (초) 

161 DATA_TIMEOUT = 60.0 # [추가] 데이터 수신 타임아웃 (초) 

162 

163 while self._auto_reconnect: 

164 # 1. 연결이 끊겨있다면 재연결 시도 

165 if not self._is_connected: 

166 # 장 운영 시간 확인 (MarketCalendarService가 있을 경우) 

167 if self._mcs and not await self._mcs.is_market_open_now(): 

168 self._logger.info("장이 종료되어 자동 재연결을 중단합니다.") 

169 self._auto_reconnect = False 

170 break 

171 

172 if retry_count >= max_retries: 

173 self._logger.error(f"웹소켓 재연결 실패: 최대 재시도 횟수({max_retries})를 초과했습니다.") 

174 self._auto_reconnect = False 

175 break 

176 

177 delay = min(max_delay, base_delay * (2 ** retry_count)) 

178 self._logger.info(f"웹소켓 재연결 대기 중 ({delay}초)... (시도 {retry_count + 1}/{max_retries})") 

179 await asyncio.sleep(delay) 

180 

181 retry_count += 1 

182 

183 if await self._establish_connection(): 

184 self._logger.info("웹소켓 재연결 성공. 기존 구독 항목을 복구합니다.") 

185 retry_count = 0 # 연결 성공 시 재시도 카운트 초기화 

186 try: 

187 await self._resubscribe_all() 

188 except Exception as e: 

189 self._logger.error(f"구독 복구 중 오류 발생: {e}") 

190 continue 

191 

192 # 2. 메시지 수신 

193 try: 

194 # [수정] 타임아웃 적용: 일정 시간 데이터가 없으면 Dead Connection으로 간주하고 재연결 

195 message = await asyncio.wait_for(self.ws.recv(), timeout=DATA_TIMEOUT) 

196 self._handle_websocket_message(message) 

197 except asyncio.TimeoutError: 

198 # 장 운영 시간 중에만 재연결 시도 (장 마감 후에는 데이터가 없는 것이 정상이므로 무시) 

199 if self._mcs and not await self._mcs.is_market_open_now(): 

200 self._logger.warning(f"{DATA_TIMEOUT}초간 데이터 수신 없음 (Dead Connection 의심). 재연결을 시도합니다.") 

201 self._is_connected = False 

202 if self.ws: 

203 await self.ws.close() 

204 self.ws = None 

205 self.approval_key = None # 재연결 시 새로운 접속키 발급 강제 

206 # 장 마감 후에는 타임아웃 발생해도 연결 유지 (Ping/Pong은 내부적으로 처리됨) 

207 except Exception as e: 

208 if self._auto_reconnect: 208 ↛ 210line 208 didn't jump to line 210 because the condition on line 208 was always true

209 self._logger.warning(f"웹소켓 연결 끊김 ({e}). 재연결을 시도합니다.", exc_info=True) 

210 self._is_connected = False 

211 self.ws = None 

212 self.approval_key = None # 재연결 시 새로운 접속키 발급 강제 

213 

214 def _handle_websocket_message(self, message: str): 

215 """수신된 웹소켓 메시지를 파싱하고 등록된 콜백으로 전달.""" 

216 self._websocket_url = self._env.active_config['websocket_url'] 

217 self._base_rest_url = self._env.active_config['base_url'] 

218 self._rest_api_key= self._env.active_config['api_key'] 

219 self._rest_api_secret= self._env.active_config['api_secret_key'] 

220 

221 # 한국투자증권 실시간 데이터는 '|'로 구분된 문자열 또는 JSON 객체로 수신됨 

222 if message and (message.startswith('0|') or message.startswith('1|')): # 실시간 데이터 (0: 일반, 1: 체결통보) 

223 recvstr = message.split('|') 

224 tr_id = recvstr[1] # 두 번째 요소가 TR_ID 

225 data_body = recvstr[3] # 네 번째 요소가 실제 데이터 본문 

226 

227 self._logger.debug(f"받은 TR_ID: {tr_id}") 

228 self._logger.debug(f"비교 대상: {self._env.active_config['tr_ids']['websocket']['realtime_price']}") 

229 

230 parsed_data = {} 

231 message_type = 'unknown' 

232 

233 # --- 주식 관련 실시간 데이터 파싱 --- 

234 if tr_id == self._env.active_config['tr_ids']['websocket']['realtime_price']: # H0STCNT0 (주식 체결) 

235 parsed_data = self._parse_stock_contract_data(data_body) 

236 message_type = 'realtime_price' 

237 elif tr_id == self._env.active_config['tr_ids']['websocket'].get('unified_realtime_price', 'H0UNCNT0'): # H0UNCNT0 (KRX+NXT 통합 체결) 237 ↛ 238line 237 didn't jump to line 238 because the condition on line 237 was never true

238 parsed_data = self._parse_stock_contract_data(data_body) # H0STCNT0와 동일 포맷 

239 message_type = 'realtime_price' 

240 elif tr_id == self._env.active_config['tr_ids']['websocket']['realtime_quote']: # H0STASP0 (주식 호가) 

241 parsed_data = self._parse_stock_quote_data(data_body) 

242 message_type = 'realtime_quote' 

243 

244 # --- 파생상품 및 기타 실시간 데이터 파싱 (제공된 예제 코드 기반) --- 

245 # 각 TR_ID에 따라 정확한 파싱 함수 호출 

246 elif tr_id == "H0IFASP0" or tr_id == "H0IOASP0": # 지수선물/옵션 호가 

247 parsed_data = self._parse_futs_optn_quote_data(data_body) 

248 message_type = 'realtime_futs_optn_quote' 

249 elif tr_id == "H0IFCNT0" or tr_id == "H0IOCNT0": # 지수선물/옵션 체결 

250 parsed_data = self._parse_futs_optn_contract_data(data_body) 

251 message_type = 'realtime_futs_optn_contract' 

252 elif tr_id == "H0CFASP0": # 상품선물 호가 

253 parsed_data = self._parse_product_futs_quote_data(data_body) 

254 message_type = 'realtime_product_futs_quote' 

255 elif tr_id == "H0CFCNT0": # 상품선물 체결 

256 parsed_data = self._parse_product_futs_contract_data(data_body) 

257 message_type = 'realtime_product_futs_contract' 

258 elif tr_id == "H0ZFASP0" or tr_id == "H0ZOASP0": # 주식선물/옵션 호가 

259 parsed_data = self._parse_stock_futs_optn_quote_data(data_body) 

260 message_type = 'realtime_stock_futs_optn_quote' 

261 elif tr_id == "H0ZFCNT0" or tr_id == "H0ZOCNT0": # 주식선물/옵션 체결 

262 parsed_data = self._parse_stock_futs_optn_contract_data(data_body) 

263 message_type = 'realtime_stock_futs_optn_contract' 

264 elif tr_id == "H0ZFANC0" or tr_id == "H0ZOANC0": # 주식선물/옵션 예상체결 

265 parsed_data = self._parse_stock_futs_optn_exp_contract_data(data_body) 

266 message_type = 'realtime_stock_futs_optn_exp_contract' 

267 elif tr_id == "H0MFASP0": # 야간선물(CME) 호가 

268 parsed_data = self._parse_cmefuts_quote_data(data_body) 

269 message_type = 'realtime_cmefuts_quote' 

270 elif tr_id == "H0MFCNT0": # 야간선물(CME) 체결 

271 parsed_data = self._parse_cmefuts_contract_data(data_body) 

272 message_type = 'realtime_cmefuts_contract' 

273 elif tr_id == "H0EUASP0": # 야간옵션(EUREX) 호가 

274 parsed_data = self._parse_eurex_optn_quote_data(data_body) 

275 message_type = 'realtime_eurex_optn_quote' 

276 elif tr_id == "H0EUCNT0": # 야간옵션(EUREX) 체결 

277 parsed_data = self._parse_eurex_optn_contract_data(data_body) 

278 message_type = 'realtime_eurex_optn_contract' 

279 elif tr_id == "H0EUANC0": # 야간옵션(EUREX) 예상체결 

280 parsed_data = self._parse_eurex_optn_exp_contract_data(data_body) 

281 message_type = 'realtime_eurex_optn_exp_contract' 

282 

283 # --- 체결/주문 통보 (암호화됨) --- 

284 elif tr_id in ["H0STCNI0", "H0STCNI9", "H0IFCNI0", "H0MFCNI0", "H0EUCNI0"]: # 모든 체결 통보 TR_ID 284 ↛ 297line 284 didn't jump to line 297 because the condition on line 284 was always true

285 if self._aes_key and self._aes_iv: 

286 decrypted_str = self._aes_cbc_base64_dec(self._aes_key, self._aes_iv, data_body) 

287 if decrypted_str: 

288 parsed_data = self._parse_signing_notice(decrypted_str, tr_id) 

289 message_type = 'signing_notice' 

290 else: 

291 self._logger.exception(f"체결통보 복호화 실패: {tr_id}, 데이터: {data_body[:50]}...") 

292 return 

293 else: 

294 self._logger.warning(f"체결통보 암호화 해제 실패: AES 키/IV 없음. TR_ID: {tr_id}, 메시지: {message[:50]}...") 

295 return 

296 

297 elif tr_id == self._env.active_config['tr_ids']['websocket'].get('realtime_program_trading', 'H0STPGM0'): 

298 parsed_data = self._parse_program_trading_data(data_body) 

299 message_type = 'realtime_program_trading' 

300 

301 # [추가] 파싱된 데이터 디버그 로그 (데이터 내용 확인용) 

302 self._logger.debug(f"WS 수신 데이터 파싱: Type={message_type}, TR_ID={tr_id}, Data={parsed_data}") 

303 

304 # 외부 콜백 함수로 파싱된 데이터 전달 

305 if self.on_realtime_message_callback: 305 ↛ exitline 305 didn't return from function '_handle_websocket_message' because the condition on line 305 was always true

306 self.on_realtime_message_callback({'type': message_type, 'tr_id': tr_id, 'data': parsed_data}) 

307 

308 else: # 제어 메시지 (응답, PINGPONG 등) 

309 try: 

310 json_object = _loads(message) 

311 header = json_object.get("header", {}) 

312 tr_id = header.get("tr_id") 

313 

314 if tr_id == "PINGPONG": 

315 self._logger.debug("PINGPONG 수신됨. PONG 응답.") 

316 # websockets 라이브러리 내부에서 PONG 응답 자동 처리 (ping_interval, ping_timeout 설정 시) 

317 elif json_object.get("body", {}).get("rt_cd") == '0': 

318 self._logger.info(f"실시간 요청 응답 성공: TR_KEY={header.get('tr_key')}, MSG={json_object['body']['msg1']}") 

319 # 체결통보용 AES KEY, IV 수신 처리 

320 if tr_id in ["H0IFCNI0", "H0STCNI0", "H0STCNI9", "H0MFCNI0", "H0EUCNI0"] and json_object.get("body", 

321 {}).get( 

322 "output"): 

323 self._aes_key = json_object["body"]["output"].get("key") 

324 self._aes_iv = json_object["body"]["output"].get("iv") 

325 self._logger.info(f"체결통보용 AES KEY/IV 수신 성공. TRID={tr_id}") 

326 else: 

327 self._logger.error( 

328 f"실시간 요청 응답 오류: TR_KEY={header.get('tr_key')}, RT_CD={json_object.get('body', {}).get('rt_cd')}, MSG={json_object.get('body', {}).get('msg1')}") 

329 if json_object.get("body", {}).get("msg1") == 'ALREADY IN SUBSCRIBE': 

330 self._logger.warning("이미 구독 중인 종목입니다.") 

331 except json.JSONDecodeError: 

332 self._logger.exception(f"제어 메시지 JSON 디코딩 실패: {message}") 

333 except Exception as e: 

334 self._logger.exception(f"제어 메시지 처리 중 오류 발생: {e}, 메시지: {message}") 

335 

336 # --- 실시간 데이터 파싱 헬퍼 함수들 --- 

337 

338 def _parse_stock_quote_data(self, data_str): 

339 """H0STASP0 (주식 호가) 데이터를 파싱합니다.""" 

340 recvvalue = data_str.split('^') 

341 # API 문서: 주식호가 (H0STASP0) 필드명과 인덱스 참고 

342 return { 

343 "유가증권단축종목코드": recvvalue[0], "영업시간": recvvalue[1], "시간구분코드": recvvalue[2], 

344 "매도호가1": recvvalue[3], "매도호가2": recvvalue[4], "매도호가3": recvvalue[5], "매도호가4": recvvalue[6], 

345 "매도호가5": recvvalue[7], 

346 "매도호가6": recvvalue[8], "매도호가7": recvvalue[9], "매도호가8": recvvalue[10], "매도호가9": recvvalue[11], 

347 "매도호가10": recvvalue[12], 

348 "매수호가1": recvvalue[13], "매수호가2": recvvalue[14], "매수호가3": recvvalue[15], "매수호가4": recvvalue[16], 

349 "매수호가5": recvvalue[17], 

350 "매수호가6": recvvalue[18], "매수호가7": recvvalue[19], "매수호가8": recvvalue[20], "매수호가9": recvvalue[21], 

351 "매수호가10": recvvalue[22], 

352 "매도호가잔량1": recvvalue[23], "매도호가잔량2": recvvalue[24], "매도호가잔량3": recvvalue[25], "매도호가잔량4": recvvalue[26], 

353 "매도호가잔량5": recvvalue[27], 

354 "매도호가잔량6": recvvalue[28], "매도호가잔량7": recvvalue[29], "매도호가잔량8": recvvalue[30], "매도호가잔량9": recvvalue[31], 

355 "매도호가잔량10": recvvalue[32], 

356 "매수호가잔량1": recvvalue[33], "매수호가잔량2": recvvalue[34], "매수호가잔량3": recvvalue[35], "매수호가잔량4": recvvalue[36], 

357 "매수호가잔량5": recvvalue[37], 

358 "매수호가잔량6": recvvalue[38], "매수호가잔량7": recvvalue[39], "매수호가잔량8": recvvalue[40], "매수호가잔량9": recvvalue[41], 

359 "매수호가잔량10": recvvalue[42], 

360 "총매도호가잔량": recvvalue[43], "총매수호가잔량": recvvalue[44], "시간외총매도호가잔량": recvvalue[45], 

361 "시간외총매수호가잔량": recvvalue[46], 

362 "예상체결가": recvvalue[47], "예상체결량": recvvalue[48], "예상거래량": recvvalue[49], "예상체결대비": recvvalue[50], 

363 "부호": recvvalue[51], 

364 "예상체결전일대비율": recvvalue[52], "누적거래량": recvvalue[53], "주식매매구분코드": recvvalue[58] 

365 } 

366 

367 def _parse_stock_contract_data(self, data_str): 

368 """H0STCNT0 (주식 체결) 데이터를 파싱합니다.""" 

369 menulist = "유가증권단축종목코드|주식체결시간|주식현재가|전일대비부호|전일대비|전일대비율|가중평균주식가격|주식시가|주식최고가|주식최저가|매도호가1|매수호가1|체결거래량|누적거래량|누적거래대금|매도체결건수|매수체결건수|순매수체결건수|체결강도|총매도수량|총매수수량|체결구분|매수비율|전일거래량대비등락율|시가시간|시가대비구분|시가대비|최고가시간|고가대비구분|고가대비|최저가시간|저가대비구분|저가대비|영업일자|신장운영구분코드|거래정지여부|매도호가잔량|매수호가잔량|총매도호가잔량|총매수호가잔량|거래량회전율|전일동시간누적거래량|전일동시간누적거래량비율|시간구분코드|임의종료구분코드|정적VI발동기준가" 

370 keys = menulist.split('|') 

371 values = data_str.split('^') 

372 return dict(zip(keys, values[:len(keys)])) 

373 

374 def _parse_futs_optn_quote_data(self, data_str): 

375 """H0IFASP0, H0IOASP0 (지수선물/옵션 호가) 데이터를 파싱합니다.""" 

376 recvvalue = data_str.split('^') 

377 return { 

378 "종목코드": recvvalue[0], "영업시간": recvvalue[1], 

379 "매도호가1": recvvalue[2], "매도호가2": recvvalue[3], "매도호가3": recvvalue[4], "매도호가4": recvvalue[5], 

380 "매도호가5": recvvalue[6], 

381 "매수호가1": recvvalue[7], "매수호가2": recvvalue[8], "매수호가3": recvvalue[9], "매수호가4": recvvalue[10], 

382 "매수호가5": recvvalue[11], 

383 "매도호가건수1": recvvalue[12], "매도호가건수2": recvvalue[13], "매도호가건수3": recvvalue[14], "매도호가건수4": recvvalue[15], 

384 "매도호가건수5": recvvalue[16], 

385 "매수호가건수1": recvvalue[17], "매수호가건수2": recvvalue[18], "매수호가건수3": recvvalue[19], "매수호가건수4": recvvalue[20], 

386 "매수호가건수5": recvvalue[21], 

387 "매도호가잔량1": recvvalue[22], "매도호가잔량2": recvvalue[23], "매도호가잔량3": recvvalue[24], "매도호가잔량4": recvvalue[25], 

388 "매도호가잔량5": recvvalue[26], 

389 "매수호가잔량1": recvvalue[27], "매수호가잔량2": recvvalue[28], "매수호가잔량3": recvvalue[29], "매수호가잔량4": recvvalue[30], 

390 "매수호가잔량5": recvvalue[31], 

391 "총매도호가건수": recvvalue[32], "총매수호가건수": recvvalue[33], 

392 "총매도호가잔량": recvvalue[34], "총매수호가잔량": recvvalue[35], 

393 "총매도호가잔량증감": recvvalue[36], "총매수호가잔량증감": recvvalue[37] 

394 } 

395 

396 def _parse_futs_optn_contract_data(self, data_str): 

397 """H0IFCNT0, H0IOCNT0 (지수선물/옵션 체결) 데이터를 파싱합니다.""" 

398 menulist = "선물단축종목코드|영업시간|선물전일대비|전일대비부호|선물전일대비율|선물현재가|선물시가|선물최고가|선물최저가|최종거래량|누적거래량|누적거래대금|HTS이론가|시장베이시스|괴리율|근월물약정가|원월물약정가|스프레드|미결제약정수량|미결제약정수량증감|시가시간|시가대비현재가부호|시가대비지수현재가|최고가시간|최고가대비현재가부호|최고가대비지수현재가|최저가시간|최저가대비현재가부호|최저가대비지수현재가|매수비율|체결강도|괴리도|미결제약정직전수량증감|이론베이시스|선물매도호가|선물매수호가|매도호가잔량|매수호가잔량|매도체결건수|매수체결건수|순매수체결건수|총매도수량|총매수수량|총매도호가잔량|총매수호가잔량|전일거래량대비등락율|협의대량거래량|실시간상한가|실시간하한가|실시간가격제한구분" 

399 keys = menulist.split('|') 

400 values = data_str.split('^') 

401 return dict(zip(keys, values[:len(keys)])) 

402 

403 def _parse_product_futs_quote_data(self, data_str): 

404 """H0CFASP0 (상품선물 호가) 데이터를 파싱합니다.""" 

405 recvvalue = data_str.split('^') 

406 return { 

407 "종목코드": recvvalue[0], "영업시간": recvvalue[1], 

408 "매도호가1": recvvalue[2], "매도호가2": recvvalue[3], "매도호가3": recvvalue[4], "매도호가4": recvvalue[5], 

409 "매도호가5": recvvalue[6], 

410 "매수호가1": recvvalue[7], "매수호가2": recvvalue[8], "매수호가3": recvvalue[9], "매수호가4": recvvalue[10], 

411 "매수호가5": recvvalue[11], 

412 "매도호가건수1": recvvalue[12], "매도호가건수2": recvvalue[13], "매도호가건수3": recvvalue[14], "매도호가건수4": recvvalue[15], 

413 "매도호가건수5": recvvalue[16], 

414 "매수호가건수1": recvvalue[17], "매수호가건수2": recvvalue[18], "매수호가건수3": recvvalue[19], "매수호가건수4": recvvalue[20], 

415 "매수호가건수5": recvvalue[21], 

416 "매도호가잔량1": recvvalue[22], "매도호가잔량2": recvvalue[23], "매도호가잔량3": recvvalue[24], "매도호가잔량4": recvvalue[25], 

417 "매도호가잔량5": recvvalue[26], 

418 "매수호가잔량1": recvvalue[27], "매수호가잔량2": recvvalue[28], "매수호가잔량3": recvvalue[29], "매수호가잔량4": recvvalue[30], 

419 "매수호가잔량5": recvvalue[31], 

420 "총매도호가건수": recvvalue[32], "총매수호가건수": recvvalue[33], 

421 "총매도호가잔량": recvvalue[34], "총매수호가잔량": recvvalue[35], 

422 "총매도호가잔량증감": recvvalue[36], "총매수호가잔량증감": recvvalue[37] 

423 } 

424 

425 def _parse_product_futs_contract_data(self, data_str): 

426 """H0CFCNT0 (상품선물 체결) 데이터를 파싱합니다.""" 

427 menulist = "선물단축종목코드|영업시간|선물전일대비|전일대비부호|선물전일대비율|선물현재가|선물시가|선물최고가|선물최저가|최종거래량|누적거래량|누적거래대금|HTS이론가|시장베이시스|괴리율|근월물약정가|원월물약정가|스프레드|미결제약정수량|미결제약정수량증감|시가시간|시가대비현재가부호|시가대비지수현재가|최고가시간|최고가대비현재가부호|최고가대비지수현재가|최저가시간|최저가대비현재가부호|최저가대비지수현재가|매수비율|체결강도|괴리도|미결제약정직전수량증감|이론베이시스|선물매도호가|선물매수호가|매도호가잔량|매수호가잔량|매도체결건수|매수체결건수|순매수체결건수|총매도수량|총매수수량|총매도호가잔량|총매수호가잔량|전일거래량대비등락율|협의대량거래량|실시간상한가|실시간하한가|실시간가격제한구분" 

428 keys = menulist.split('|') 

429 values = data_str.split('^') 

430 return dict(zip(keys, values[:len(keys)])) 

431 

432 def _parse_stock_futs_optn_quote_data(self, data_str): 

433 """H0ZFASP0, H0ZOASP0 (주식선물/옵션 호가) 데이터를 파싱합니다.""" 

434 recvvalue = data_str.split('^') 

435 return { 

436 "종목코드": recvvalue[0], "영업시간": recvvalue[1], 

437 "매도호가1": recvvalue[2], "매도호가2": recvvalue[3], "매도호가3": recvvalue[4], "매도호가4": recvvalue[5], 

438 "매도호가5": recvvalue[6], 

439 "매수호가1": recvvalue[7], "매수호가2": recvvalue[8], "매수호가3": recvvalue[9], "매수호가4": recvvalue[10], 

440 "매수호가5": recvvalue[11], 

441 "매도호가건수1": recvvalue[12], "매도호가건수2": recvvalue[13], "매도호가건수3": recvvalue[14], "매도호가건수4": recvvalue[15], 

442 "매도호가건수5": recvvalue[16], 

443 "매수호가건수1": recvvalue[17], "매수호가건수2": recvvalue[18], "매수호가건수3": recvvalue[19], "매수호가건수4": recvvalue[20], 

444 "매수호가건수5": recvvalue[21], 

445 "매도호가잔량1": recvvalue[22], "매도호가잔량2": recvvalue[23], "매도호가잔량3": recvvalue[24], "매도호가잔량4": recvvalue[25], 

446 "매도호가잔량5": recvvalue[26], 

447 "매수호가잔량1": recvvalue[27], "매수호가잔량2": recvvalue[28], "매수호가잔량3": recvvalue[29], "매수호가잔량4": recvvalue[30], 

448 "매수호가잔량5": recvvalue[31], 

449 "총매도호가건수": recvvalue[32], "총매수호가건수": recvvalue[33], 

450 "총매도호가잔량": recvvalue[34], "총매수호가잔량": recvvalue[35], 

451 "총매도호가잔량증감": recvvalue[36], "총매수호가잔량증감": recvvalue[37] 

452 } 

453 

454 def _parse_stock_futs_optn_contract_data(self, data_str): 

455 """H0ZFCNT0, H0ZOCNT0 (주식선물/옵션 체결) 데이터를 파싱합니다.""" 

456 menulist = "선물단축종목코드|영업시간|주식현재가|전일대비부호|전일대비|선물전일대비율|주식시가2|주식최고가|주식최저가|최종거래량|누적거래량|누적거래대금|HTS이론가|시장베이시스|괴리율|근월물약정가|원월물약정가|스프레드1|HTS미결제약정수량|미결제약정수량증감|시가시간|시가2대비현재가부호|시가2대비현재가|최고가시간|최고가대비현재가부호|최고가대비현재가|최저가시간|최저가대비현재가부호|최저가대비현재가|매수2비율|체결강도|괴리도|미결제약정직전수량증감|이론베이시스|매도호가1|매수호가1|매도호가잔량1|매수호가잔량1|매도체결건수|매수체결건수|순매수체결건수|총매도수량|총매수수량|총매도호가잔량|총매수호가잔량|전일거래량대비등락율|실시간상한가|실시간하한가|실시간가격제한구분" 

457 keys = menulist.split('|') 

458 values = data_str.split('^') 

459 return dict(zip(keys, values[:len(keys)])) 

460 

461 def _parse_stock_futs_optn_exp_contract_data(self, data_str): 

462 """H0ZFANC0, H0ZOANC0 (주식선물/옵션 예상체결) 데이터를 파싱합니다.""" 

463 menulist = "선물단축종목코드|영업시간|예상체결가|예상체결대비|예상체결대비부호|예상체결전일대비율|예상장운영구분코드" 

464 keys = menulist.split('|') 

465 values = data_str.split('^') 

466 return dict(zip(keys, values[:len(keys)])) 

467 

468 def _parse_cmefuts_quote_data(self, data_str): 

469 """H0MFASP0 (야간선물(CME) 호가) 데이터를 파싱합니다.""" 

470 recvvalue = data_str.split('^') 

471 return { 

472 "종목코드": recvvalue[0], "영업시간": recvvalue[1], 

473 "매도호가1": recvvalue[2], "매도호가2": recvvalue[3], "매도호가3": recvvalue[4], "매도호가4": recvvalue[5], 

474 "매도호가5": recvvalue[6], 

475 "매수호가1": recvvalue[7], "매수호가2": recvvalue[8], "매수호가3": recvvalue[9], "매수호가4": recvvalue[10], 

476 "매수호가5": recvvalue[11], 

477 "매도호가건수1": recvvalue[12], "매도호가건수2": recvvalue[13], "매도호가건수3": recvvalue[14], "매도호가건수4": recvvalue[15], 

478 "매도호가건수5": recvvalue[16], 

479 "매수호가건수1": recvvalue[17], "매수호가건수2": recvvalue[18], "매수호가건수3": recvvalue[19], "매수호가건수4": recvvalue[20], 

480 "매수호가건수5": recvvalue[21], 

481 "매도호가잔량1": recvvalue[22], "매도호가잔량2": recvvalue[23], "매도호가잔량3": recvvalue[24], "매도호가잔량4": recvvalue[25], 

482 "매도호가잔량5": recvvalue[26], 

483 "매수호가잔량1": recvvalue[27], "매수호가잔량2": recvvalue[28], "매수호가잔량3": recvvalue[29], "매수호가잔량4": recvvalue[30], 

484 "매수호가잔량5": recvvalue[31], 

485 "총매도호가건수": recvvalue[32], "총매수호가건수": recvvalue[33], 

486 "총매도호가잔량": recvvalue[34], "총매수호가잔량": recvvalue[35], 

487 "총매도호가잔량증감": recvvalue[36], "총매수호가잔량증감": recvvalue[37] 

488 } 

489 

490 def _parse_cmefuts_contract_data(self, data_str): 

491 """H0MFCNT0 (야간선물(CME) 체결) 데이터를 파싱합니다.""" 

492 menulist = "선물단축종목코드|영업시간|선물전일대비|전일대비부호|선물전일대비율|선물현재가|선물시가2|선물최고가|선물최저가|최종거래량|누적거래량|누적거래대금|HTS이론가|시장베이시스|괴리율|근월물약정가|원월물약정가|스프레드1|HTS미결제약정수량|미결제약정수량증감|시가시간|시가2대비현재가부호|시가2대비현재가|최고가시간|최고가대비현재가부호|최고가대비현재가|최저가시간|최저가대비현재가부호|최저가대비현재가|매수2비율|체결강도|괴리도|미결제약정직전수량증감|이론베이시스|선물매도호가1|선물매수호가1|매도호가잔량1|매수호가잔량1|매도체결건수|매수체결건수|순매수체결건수|총매도수량|총매수수량|총매도호가잔량|총매수호가잔량|전일거래량대비등락율" 

493 keys = menulist.split('|') 

494 values = data_str.split('^') 

495 return dict(zip(keys, values[:len(keys)])) 

496 

497 def _parse_eurex_optn_quote_data(self, data_str): 

498 """H0EUASP0 (야간옵션(EUREX) 호가) 데이터를 파싱합니다.""" 

499 recvvalue = data_str.split('^') 

500 return { 

501 "종목코드": recvvalue[0], "영업시간": recvvalue[1], 

502 "매도호가1": recvvalue[2], "매도호가2": recvvalue[3], "매도호가3": recvvalue[4], "매도호가4": recvvalue[5], 

503 "매도호가5": recvvalue[6], 

504 "매수호가1": recvvalue[7], "매수호가2": recvvalue[8], "매수호가3": recvvalue[9], "매수호가4": recvvalue[10], 

505 "매수호가5": recvvalue[11], 

506 "매도호가건수1": recvvalue[12], "매도호가건수2": recvvalue[13], "매도호가건수3": recvvalue[14], "매도호가건수4": recvvalue[15], 

507 "매도호가건수5": recvvalue[16], 

508 "매수호가건수1": recvvalue[17], "매수호가건수2": recvvalue[18], "매수호가건수3": recvvalue[19], "매수호가건수4": recvvalue[20], 

509 "매수호가건수5": recvvalue[21], 

510 "매도호가잔량1": recvvalue[22], "매도호가잔량2": recvvalue[23], "매도호가잔량3": recvvalue[24], "매도호가잔량4": recvvalue[25], 

511 "매도호가잔량5": recvvalue[26], 

512 "매수호가잔량1": recvvalue[27], "매수호가잔량2": recvvalue[28], "매수호가잔량3": recvvalue[29], "매수호가잔량4": recvvalue[30], 

513 "매수호가잔량5": recvvalue[31], 

514 "총매도호가건수": recvvalue[32], "총매수호가건수": recvvalue[33], 

515 "총매도호가잔량": recvvalue[34], "총매수호가잔량": recvvalue[35], 

516 "총매도호가잔량증감": recvvalue[36], "총매수호가잔량증감": recvvalue[37] 

517 } 

518 

519 def _parse_eurex_optn_contract_data(self, data_str): 

520 """H0EUCNT0 (야간옵션(EUREX) 체결) 데이터를 파싱합니다.""" 

521 menulist = "옵션단축종목코드|영업시간|옵션현재가|전일대비부호|옵션전일대비|전일대비율|옵션시가2|옵션최고가|옵션최저가|최종거래량|누적거래량|누적거래대금|HTS이론가|HTS미결제약정수량|미결제약정수량증감|시가시간|시가2대비현재가부호|시가대비지수현재가|최고가시간|최고가대비현재가부호|최고가대비지수현재가|최저가시간|최저가대비현재가부호|최저가대비지수현재가|매수2비율|프리미엄값|내재가치값|시간가치값|델타|감마|베가|세타|로우|HTS내재변동성|괴리도|미결제약정직전수량증감|이론베이시스|역사적변동성|체결강도|괴리율|시장베이시스|옵션매도호가1|옵션매수호가1|매도호가잔량1|매수호가잔량1|매도체결건수|매수체결건수|순매수체결건수|총매도수량|총매수수량|총매도호가잔량|총매수호가잔량|전일거래량대비등락율" 

522 keys = menulist.split('|') 

523 values = data_str.split('^') 

524 return dict(zip(keys, values[:len(keys)])) 

525 

526 def _parse_eurex_optn_exp_contract_data(self, data_str): 

527 """H0EUANC0 (야간옵션(EUREX) 예상체결) 데이터를 파싱합니다.""" 

528 menulist = "옵션단축종목코드|영업시간|예상체결가|예상체결대비|예상체결대비부호|예상체결전일대비율|예상장운영구분코드" 

529 keys = menulist.split('|') 

530 values = data_str.split('^') 

531 return dict(zip(keys, values[:len(keys)])) 

532 

533 def _parse_program_trading_data(self, data_str: str) -> dict: 

534 """H0STPGM0 (국내주식 실시간 프로그램매매) 데이터를 파싱합니다.""" 

535 menulist = "유가증권단축종목코드|주식체결시간|매도체결량|매도거래대금|매수2체결량|매수2거래대금|순매수체결량|순매수거래대금|매도호가잔량|매수호가잔량|전체순매수호가잔량" 

536 keys = menulist.split('|') 

537 values = data_str.split('^') 

538 return dict(zip(keys, values[:len(keys)])) 

539 

540 async def subscribe_program_trading(self, stock_code: str): 

541 """국내주식 실시간 프로그램매매 동향 (H0STPGM0) 구독.""" 

542 tr_id = self._env.active_config['tr_ids']['websocket'].get('realtime_program_trading', 'H0STPGM0') 

543 self._logger.info(f"[프로그램매매] 종목 {stock_code} 구독 요청 ({tr_id})...") 

544 return await self.send_realtime_request(tr_id, stock_code, tr_type="1") 

545 

546 async def unsubscribe_program_trading(self, stock_code: str): 

547 """국내주식 실시간 프로그램매매 동향 (H0STPGM0) 구독 해지.""" 

548 tr_id = self._env.active_config['tr_ids']['websocket'].get('realtime_program_trading', 'H0STPGM0') 

549 self._logger.info(f"[프로그램매매] 종목 {stock_code} 구독 해지 요청 ({tr_id})...") 

550 return await self.send_realtime_request(tr_id, stock_code, tr_type="2") 

551 

552 async def _resubscribe_all(self): 

553 """재연결 시 기존 구독 항목들을 다시 구독 요청합니다.""" 

554 for tr_id, tr_key in list(self._subscribed_items): 

555 self._logger.info(f"구독 복구 요청: TR_ID={tr_id}, KEY={tr_key}") 

556 # send_realtime_request 내부에서 _subscribed_items에 다시 추가하므로 중복 방지 로직 필요 없음 (Set이므로) 

557 await self.send_realtime_request(tr_id, tr_key, tr_type="1") 

558 

559 def is_receive_alive(self) -> bool: 

560 """수신 태스크가 살아있는지 확인 (외부 워치독용).""" 

561 return (self._receive_task is not None 

562 and not self._receive_task.done() 

563 and self._auto_reconnect) 

564 

565 # --- 웹소켓 연결 및 해지 --- 

566 async def connect(self, on_message_callback=None): 

567 """웹소켓 연결을 시작하고 실시간 데이터 수신을 준비합니다.""" 

568 self._websocket_url = self._env.get_websocket_url() 

569 if self.ws and self._is_connected: 

570 self._logger.info("웹소켓이 이미 연결되어 있습니다.") 

571 return True 

572 

573 self.on_realtime_message_callback = on_message_callback # 외부 콜백 등록 

574 self._auto_reconnect = True # 자동 재연결 활성화 

575 

576 if await self._establish_connection(): 

577 # 메시지 수신 태스크 시작 (이미 실행 중이면 건너뜀) 

578 if self._receive_task and not self._receive_task.done(): 578 ↛ 579line 578 didn't jump to line 579 because the condition on line 578 was never true

579 self._receive_task.cancel() 

580 try: 

581 await self._receive_task 

582 except asyncio.CancelledError: 

583 pass 

584 self._receive_task = asyncio.create_task(self._receive_messages()) 

585 return True 

586 return False 

587 

588 async def disconnect(self): 

589 """웹소켓 연결을 종료합니다.""" 

590 self._auto_reconnect = False # 수동 종료 시 재연결 비활성화 

591 if self._is_connected and self.ws: 

592 self._logger.info("웹소켓 연결 종료 요청.") 

593 await self.ws.close() 

594 self._is_connected = False 

595 if self._receive_task: 

596 self._receive_task.cancel() 

597 try: 

598 await self._receive_task 

599 except asyncio.CancelledError: 

600 self._logger.info("웹소켓 수신 태스크 취소됨.") 

601 except Exception as e: 

602 self._logger.error(f"웹소켓 수신 태스크 종료 중 오류: {e}", exc_info=True) 

603 

604 self._logger.info("웹소켓 연결 종료 완료.") 

605 self._is_connected = False 

606 self.ws = None 

607 else: 

608 self._logger.info("웹소켓이 연결되어 있지 않습니다.") 

609 if self._receive_task: 

610 self._receive_task.cancel() 

611 try: 

612 await self._receive_task 

613 except asyncio.CancelledError: 

614 self._logger.info("웹소켓 수신 태스크 취소됨.") 

615 except Exception as e: 

616 self._logger.error(f"웹소켓 수신 태스크 종료 중 오류: {e}") 

617 self._is_connected = False 

618 self.ws = None 

619 

620 # --- 실시간 요청 전송 --- 

621 async def send_realtime_request(self, tr_id, tr_key, tr_type="1"): 

622 """ 

623 실시간 데이터 구독/해지 요청 메시지를 웹소켓으로 전송합니다. 

624 :param tr_id: 실시간 TR ID 

625 :param tr_key: 구독할 종목코드 또는 HTS ID (체결통보용) 

626 :param tr_type: 1: 등록, 2: 해지 

627 """ 

628 self._websocket_url = self._env.active_config['websocket_url'] 

629 self._base_rest_url = self._env.active_config['base_url'] 

630 self._rest_api_key= self._env.active_config['api_key'] 

631 self._rest_api_secret= self._env.active_config['api_secret_key'] 

632 

633 if not self._is_connected or not self.ws: 

634 self._logger.error("웹소켓이 연결되어 있지 않아 실시간 요청을 보낼 수 없습니다.") 

635 return False 

636 if not self.approval_key: 

637 self._logger.error("approval_key가 없어 실시간 요청을 보낼 수 없습니다.") 

638 return False 

639 

640 header = { 

641 "approval_key": self.approval_key, 

642 "custtype": self._env.active_config['custtype'], 

643 "tr_type": tr_type, 

644 "content-type": "utf-8", 

645 } 

646 body = { 

647 "input": { 

648 "tr_id": tr_id, 

649 "tr_key": tr_key, 

650 } 

651 } 

652 

653 request_message = {"header": header, "body": body} 

654 message_json = _dumps(request_message) 

655 

656 self._logger.info(f"실시간 요청 전송: TR_ID={tr_id}, TR_KEY={tr_key}, TYPE={tr_type}") 

657 try: 

658 await self.ws.send(message_json) 

659 

660 # 구독 목록 관리 

661 if tr_type == "1": 661 ↛ 663line 661 didn't jump to line 663 because the condition on line 661 was always true

662 self._subscribed_items.add((tr_id, tr_key)) 

663 elif tr_type == "2": 

664 self._subscribed_items.discard((tr_id, tr_key)) 

665 return True 

666 except Exception as e: 

667 self._logger.exception(f"실시간 요청 전송 중 오류 발생: {e}") 

668 self._is_connected = False 

669 self.ws = None 

670 return False 

671 

672 async def subscribe_realtime_price(self, stock_code): 

673 """실시간 주식체결 데이터(현재가)를 구독합니다.""" 

674 tr_id = self._env.active_config['tr_ids']['websocket']['realtime_price'] 

675 self._logger.info(f"종목 {stock_code} 실시간 체결 데이터 구독 요청 ({tr_id})...") 

676 return await self.send_realtime_request(tr_id, stock_code, tr_type="1") 

677 

678 async def unsubscribe_realtime_price(self, stock_code): 

679 """실시간 주식체결 데이터(현재가) 구독을 해지합니다.""" 

680 tr_id = self._env.active_config['tr_ids']['websocket']['realtime_price'] 

681 self._logger.info(f"종목 {stock_code} 실시간 체결 데이터 구독 해지 요청 ({tr_id})...") 

682 return await self.send_realtime_request(tr_id, stock_code, tr_type="2") 

683 

684 async def subscribe_unified_price(self, stock_code: str) -> bool: 

685 """실시간 통합 체결가(H0UNCNT0)를 구독합니다. KRX+NXT 통합.""" 

686 tr_id = self._env.active_config['tr_ids']['websocket'].get('unified_realtime_price', 'H0UNCNT0') 

687 self._logger.info(f"종목 {stock_code} 통합 체결가 구독 요청 ({tr_id})...") 

688 return await self.send_realtime_request(tr_id, stock_code, tr_type="1") 

689 

690 async def unsubscribe_unified_price(self, stock_code: str) -> bool: 

691 """실시간 통합 체결가(H0UNCNT0) 구독을 해지합니다.""" 

692 tr_id = self._env.active_config['tr_ids']['websocket'].get('unified_realtime_price', 'H0UNCNT0') 

693 self._logger.info(f"종목 {stock_code} 통합 체결가 구독 해지 ({tr_id})...") 

694 return await self.send_realtime_request(tr_id, stock_code, tr_type="2") 

695 

696 async def subscribe_realtime_quote(self, stock_code): 

697 """실시간 주식호가 데이터를 구독합니다.""" 

698 tr_id = self._env.active_config['tr_ids']['websocket']['realtime_quote'] 

699 self._logger.info(f"종목 {stock_code} 실시간 호가 데이터 구독 요청 ({tr_id})...") 

700 return await self.send_realtime_request(tr_id, stock_code, tr_type="1") 

701 

702 async def unsubscribe_realtime_quote(self, stock_code): 

703 """실시간 주식호가 데이터 구독을 해지합니다.""" 

704 tr_id = self._env.active_config['tr_ids']['websocket']['realtime_quote'] 

705 self._logger.info(f"종목 {stock_code} 실시간 호가 데이터 구독 해지 요청 ({tr_id})...") 

706 return await self.send_realtime_request(tr_id, stock_code, tr_type="2") 

707 

708 # For test only 

709 async def _on_receive(self, message): 

710 try: 

711 parsed = _loads(message) # <-- JSON 문자열을 dict로 파싱 

712 if self.on_realtime_message_callback: 

713 await self.on_realtime_message_callback(parsed) 

714 else: 

715 self._logger.warning("수신된 메시지를 처리할 콜백이 등록되지 않았습니다.") 

716 except Exception as e: 

717 self._logger.exception(f"수신 메시지 처리 중 예외 발생: {e}")