Coverage for brokers / korea_investment / korea_invest_websocket_api.py: 89%
416 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
1# api/korea_invest_websocket_api.py
2from typing import Optional
3import websockets # pip install websockets
4import json
5try:
6 import orjson as _orjson
7 def _loads(s): return _orjson.loads(s)
8 def _dumps(obj) -> str: return _orjson.dumps(obj).decode()
9except ImportError:
10 _orjson = None
11 _loads = json.loads
12 _dumps = json.dumps
13import logging
14import requests
15import certifi # requests의 SSL 인증서 검증에 필요
16import asyncio # 비동기 처리를 위해 필요
17import os # os.urandom (gt_uid 생성용)
19from Crypto.Cipher import AES # pip install pycryptodome
20from Crypto.Util.Padding import unpad
21from base64 import b64decode
23from brokers.korea_investment.korea_invest_env import KoreaInvestApiEnv # KoreaInvestEnv 클래스 임포트
24from core.market_clock import MarketClock
25from services.market_calendar_service import MarketCalendarService
28class KoreaInvestWebSocketAPI:
29 """
30 한국투자증권 Open API의 웹소켓 연결 및 실시간 데이터 수신을 관리하는 클래스입니다.
31 `websockets` 라이브러리(asyncio 기반)를 사용하며, 다양한 실시간 데이터 파싱을 포함합니다.
32 """
34 def __init__(self, env: KoreaInvestApiEnv, logger=None, market_clock: MarketClock = None,
35 market_calendar_service: Optional[MarketCalendarService] = None):
36 self._env = env
37 self._market_clock = market_clock
38 self._mcs = market_calendar_service
39 self._logger = logger if logger else logging.getLogger(__name__)
40 # self._config = self._env.get_full_config() # 환경 설정 전체를 가져옴 (tr_ids 포함)
41 # config에서 웹소켓 및 REST API 정보 가져오기
42 # self._websocket_url = self._config['websocket_url']
43 # self._rest_api_key = self._config['api_key']
44 # self._rest_api_secret = self._config['api_secret_key']
45 # self._base_rest_url = self._config['base_url']
46 self._websocket_url = None
47 self._rest_api_key = None
48 self._rest_api_secret = None
49 self._base_rest_url = None
51 self.ws = None # 웹소켓 연결 객체 (websockets.WebSocketClientProtocol)
52 self.approval_key = None # 웹소켓 접속 키 (REST API로 발급)
53 self._is_connected = False # 웹소켓 연결 상태 플래그
54 self._auto_reconnect = False # 자동 재연결 활성화 플래그
55 self._receive_task = None # 메시지 수신을 위한 asyncio.Task
57 # 실시간 메시지 수신 시 외부에서 등록할 콜백 함수 (TradingService의 핸들러)
58 self.on_realtime_message_callback = None
60 # 암호화된 체결 통보 메시지 복호화를 위한 AES 키/IV
61 # H0IFCNI0, H0STCNI0, H0MFCNI0, H0EUCNI0, H0STCNI9 등 통보 TR_ID 구독 시 서버로부터 수신
62 self._aes_key = None
63 self._aes_iv = None
65 # 재연결 시 복구를 위한 구독 목록 저장소 set((tr_id, tr_key))
66 self._subscribed_items = set()
68 def _aes_cbc_base64_dec(self, key, iv, cipher_text):
69 """
70 AES256 DECODE (Base64 인코딩된 암호문을 복호화)
71 :param key: AES256 Secret Key (str)
72 :param iv: AES256 Initialize Vector (str)
73 :param cipher_text: Base64 인코딩된 AES256 암호문 (str)
74 :return: 복호화된 문자열 (str)
75 """
76 try:
77 cipher = AES.new(key.encode('utf-8'), AES.MODE_CBC, iv.encode('utf-8'))
78 return bytes.decode(unpad(cipher.decrypt(b64decode(cipher_text)), AES.block_size))
79 except Exception as e:
80 self._logger.exception(f"AES 복호화 오류 발생: {e} (key: {key[:5]}..., iv: {iv[:5]}..., cipher: {cipher_text[:50]}...)")
82 return None
84 async def _get_approval_key(self):
85 """
86 웹소켓 접속 키(approval_key)를 한국투자증권 REST API를 통해 발급받습니다.
87 """
88 self._websocket_url = self._env.active_config['websocket_url']
89 self._base_rest_url = self._env.active_config['base_url']
90 self._rest_api_key= self._env.active_config['api_key']
91 self._rest_api_secret= self._env.active_config['api_secret_key']
93 path = "/oauth2/Approval"
94 url = f"{self._base_rest_url}{path}"
95 headers = {"content-type": "application/json; utf-8"}
96 body = {
97 "grant_type": "client_credentials",
98 "appkey": self._rest_api_key,
99 "secretkey": self._rest_api_secret # 웹소켓 접속키 발급 시 secretkey 필드명 사용
100 }
102 self._logger.info("웹소켓 접속키 발급 시도...")
103 try:
104 # requests는 동기 함수이므로 asyncio의 loop.run_in_executor를 사용하여 비동기적으로 실행
105 loop = asyncio.get_running_loop()
106 res = await loop.run_in_executor(
107 None, # 기본 ThreadPoolExecutor 사용
108 lambda: requests.post(url, headers=headers, data=json.dumps(body), verify=certifi.where())
109 )
111 res.raise_for_status() # HTTP 오류(4xx, 5xx) 발생 시 예외 발생
112 auth_data = res.json()
114 if auth_data and auth_data.get('approval_key'):
115 self.approval_key = auth_data['approval_key']
116 self._logger.info(f"웹소켓 접속키 발급 성공: {self.approval_key[:10]}...") # 키의 일부만 로깅
117 return self.approval_key
118 else:
119 self._logger.exception(f"웹소켓 접속키 발급 실패 - 응답 데이터 오류: {auth_data}")
120 return None
121 except requests.exceptions.RequestException as e:
122 self._logger.exception(f"웹소켓 접속키 발급 중 네트워크 오류: {e}")
123 return None
124 except json.JSONDecodeError:
125 self._logger.exception(f"웹소켓 접속키 발급 응답 JSON 디코딩 실패: {res.text if res else '응답 없음'}")
126 return None
127 except Exception as e:
128 self._logger.exception(f"웹소켓 접속키 발급 중 알 수 없는 오류: {e}")
129 return None
131 async def _establish_connection(self):
132 """웹소켓 연결을 수립하는 내부 메서드 (재연결 로직에서 재사용)."""
133 self._websocket_url = self._env.get_websocket_url()
135 # 1. approval_key 발급 (없으면 발급)
136 if not self.approval_key:
137 self.approval_key = await self._get_approval_key()
138 if not self.approval_key:
139 self._logger.error("웹소켓 접속 키 발급 실패로 연결할 수 없습니다.")
140 return False
142 # 2. 웹소켓 연결
143 try:
144 self._logger.info(f"웹소켓 연결 시도: {self._websocket_url}")
145 self.ws = await websockets.connect(self._websocket_url, ping_interval=20, ping_timeout=20)
146 self._is_connected = True
147 self._logger.info("웹소켓 연결 성공.")
148 return True
149 except Exception as e:
150 self._logger.exception(f"웹소켓 연결 중 오류 발생: {e}")
151 self._is_connected = False
152 self.ws = None
153 return False
155 async def _receive_messages(self):
156 """웹소켓 메시지 수신 및 자동 재연결 루프."""
157 retry_count = 0
158 max_retries = 30 # 최대 재시도 횟수 (약 30분간 시도)
159 base_delay = 3 # 기본 대기 시간 (초)
160 max_delay = 60 # 최대 대기 시간 (초)
161 DATA_TIMEOUT = 60.0 # [추가] 데이터 수신 타임아웃 (초)
163 while self._auto_reconnect:
164 # 1. 연결이 끊겨있다면 재연결 시도
165 if not self._is_connected:
166 # 장 운영 시간 확인 (MarketCalendarService가 있을 경우)
167 if self._mcs and not await self._mcs.is_market_open_now():
168 self._logger.info("장이 종료되어 자동 재연결을 중단합니다.")
169 self._auto_reconnect = False
170 break
172 if retry_count >= max_retries:
173 self._logger.error(f"웹소켓 재연결 실패: 최대 재시도 횟수({max_retries})를 초과했습니다.")
174 self._auto_reconnect = False
175 break
177 delay = min(max_delay, base_delay * (2 ** retry_count))
178 self._logger.info(f"웹소켓 재연결 대기 중 ({delay}초)... (시도 {retry_count + 1}/{max_retries})")
179 await asyncio.sleep(delay)
181 retry_count += 1
183 if await self._establish_connection():
184 self._logger.info("웹소켓 재연결 성공. 기존 구독 항목을 복구합니다.")
185 retry_count = 0 # 연결 성공 시 재시도 카운트 초기화
186 try:
187 await self._resubscribe_all()
188 except Exception as e:
189 self._logger.error(f"구독 복구 중 오류 발생: {e}")
190 continue
192 # 2. 메시지 수신
193 try:
194 # [수정] 타임아웃 적용: 일정 시간 데이터가 없으면 Dead Connection으로 간주하고 재연결
195 message = await asyncio.wait_for(self.ws.recv(), timeout=DATA_TIMEOUT)
196 self._handle_websocket_message(message)
197 except asyncio.TimeoutError:
198 # 장 운영 시간 중에만 재연결 시도 (장 마감 후에는 데이터가 없는 것이 정상이므로 무시)
199 if self._mcs and not await self._mcs.is_market_open_now():
200 self._logger.warning(f"{DATA_TIMEOUT}초간 데이터 수신 없음 (Dead Connection 의심). 재연결을 시도합니다.")
201 self._is_connected = False
202 if self.ws:
203 await self.ws.close()
204 self.ws = None
205 self.approval_key = None # 재연결 시 새로운 접속키 발급 강제
206 # 장 마감 후에는 타임아웃 발생해도 연결 유지 (Ping/Pong은 내부적으로 처리됨)
207 except Exception as e:
208 if self._auto_reconnect: 208 ↛ 210line 208 didn't jump to line 210 because the condition on line 208 was always true
209 self._logger.warning(f"웹소켓 연결 끊김 ({e}). 재연결을 시도합니다.", exc_info=True)
210 self._is_connected = False
211 self.ws = None
212 self.approval_key = None # 재연결 시 새로운 접속키 발급 강제
214 def _handle_websocket_message(self, message: str):
215 """수신된 웹소켓 메시지를 파싱하고 등록된 콜백으로 전달."""
216 self._websocket_url = self._env.active_config['websocket_url']
217 self._base_rest_url = self._env.active_config['base_url']
218 self._rest_api_key= self._env.active_config['api_key']
219 self._rest_api_secret= self._env.active_config['api_secret_key']
221 # 한국투자증권 실시간 데이터는 '|'로 구분된 문자열 또는 JSON 객체로 수신됨
222 if message and (message.startswith('0|') or message.startswith('1|')): # 실시간 데이터 (0: 일반, 1: 체결통보)
223 recvstr = message.split('|')
224 tr_id = recvstr[1] # 두 번째 요소가 TR_ID
225 data_body = recvstr[3] # 네 번째 요소가 실제 데이터 본문
227 self._logger.debug(f"받은 TR_ID: {tr_id}")
228 self._logger.debug(f"비교 대상: {self._env.active_config['tr_ids']['websocket']['realtime_price']}")
230 parsed_data = {}
231 message_type = 'unknown'
233 # --- 주식 관련 실시간 데이터 파싱 ---
234 if tr_id == self._env.active_config['tr_ids']['websocket']['realtime_price']: # H0STCNT0 (주식 체결)
235 parsed_data = self._parse_stock_contract_data(data_body)
236 message_type = 'realtime_price'
237 elif tr_id == self._env.active_config['tr_ids']['websocket'].get('unified_realtime_price', 'H0UNCNT0'): # H0UNCNT0 (KRX+NXT 통합 체결) 237 ↛ 238line 237 didn't jump to line 238 because the condition on line 237 was never true
238 parsed_data = self._parse_stock_contract_data(data_body) # H0STCNT0와 동일 포맷
239 message_type = 'realtime_price'
240 elif tr_id == self._env.active_config['tr_ids']['websocket']['realtime_quote']: # H0STASP0 (주식 호가)
241 parsed_data = self._parse_stock_quote_data(data_body)
242 message_type = 'realtime_quote'
244 # --- 파생상품 및 기타 실시간 데이터 파싱 (제공된 예제 코드 기반) ---
245 # 각 TR_ID에 따라 정확한 파싱 함수 호출
246 elif tr_id == "H0IFASP0" or tr_id == "H0IOASP0": # 지수선물/옵션 호가
247 parsed_data = self._parse_futs_optn_quote_data(data_body)
248 message_type = 'realtime_futs_optn_quote'
249 elif tr_id == "H0IFCNT0" or tr_id == "H0IOCNT0": # 지수선물/옵션 체결
250 parsed_data = self._parse_futs_optn_contract_data(data_body)
251 message_type = 'realtime_futs_optn_contract'
252 elif tr_id == "H0CFASP0": # 상품선물 호가
253 parsed_data = self._parse_product_futs_quote_data(data_body)
254 message_type = 'realtime_product_futs_quote'
255 elif tr_id == "H0CFCNT0": # 상품선물 체결
256 parsed_data = self._parse_product_futs_contract_data(data_body)
257 message_type = 'realtime_product_futs_contract'
258 elif tr_id == "H0ZFASP0" or tr_id == "H0ZOASP0": # 주식선물/옵션 호가
259 parsed_data = self._parse_stock_futs_optn_quote_data(data_body)
260 message_type = 'realtime_stock_futs_optn_quote'
261 elif tr_id == "H0ZFCNT0" or tr_id == "H0ZOCNT0": # 주식선물/옵션 체결
262 parsed_data = self._parse_stock_futs_optn_contract_data(data_body)
263 message_type = 'realtime_stock_futs_optn_contract'
264 elif tr_id == "H0ZFANC0" or tr_id == "H0ZOANC0": # 주식선물/옵션 예상체결
265 parsed_data = self._parse_stock_futs_optn_exp_contract_data(data_body)
266 message_type = 'realtime_stock_futs_optn_exp_contract'
267 elif tr_id == "H0MFASP0": # 야간선물(CME) 호가
268 parsed_data = self._parse_cmefuts_quote_data(data_body)
269 message_type = 'realtime_cmefuts_quote'
270 elif tr_id == "H0MFCNT0": # 야간선물(CME) 체결
271 parsed_data = self._parse_cmefuts_contract_data(data_body)
272 message_type = 'realtime_cmefuts_contract'
273 elif tr_id == "H0EUASP0": # 야간옵션(EUREX) 호가
274 parsed_data = self._parse_eurex_optn_quote_data(data_body)
275 message_type = 'realtime_eurex_optn_quote'
276 elif tr_id == "H0EUCNT0": # 야간옵션(EUREX) 체결
277 parsed_data = self._parse_eurex_optn_contract_data(data_body)
278 message_type = 'realtime_eurex_optn_contract'
279 elif tr_id == "H0EUANC0": # 야간옵션(EUREX) 예상체결
280 parsed_data = self._parse_eurex_optn_exp_contract_data(data_body)
281 message_type = 'realtime_eurex_optn_exp_contract'
283 # --- 체결/주문 통보 (암호화됨) ---
284 elif tr_id in ["H0STCNI0", "H0STCNI9", "H0IFCNI0", "H0MFCNI0", "H0EUCNI0"]: # 모든 체결 통보 TR_ID 284 ↛ 297line 284 didn't jump to line 297 because the condition on line 284 was always true
285 if self._aes_key and self._aes_iv:
286 decrypted_str = self._aes_cbc_base64_dec(self._aes_key, self._aes_iv, data_body)
287 if decrypted_str:
288 parsed_data = self._parse_signing_notice(decrypted_str, tr_id)
289 message_type = 'signing_notice'
290 else:
291 self._logger.exception(f"체결통보 복호화 실패: {tr_id}, 데이터: {data_body[:50]}...")
292 return
293 else:
294 self._logger.warning(f"체결통보 암호화 해제 실패: AES 키/IV 없음. TR_ID: {tr_id}, 메시지: {message[:50]}...")
295 return
297 elif tr_id == self._env.active_config['tr_ids']['websocket'].get('realtime_program_trading', 'H0STPGM0'):
298 parsed_data = self._parse_program_trading_data(data_body)
299 message_type = 'realtime_program_trading'
301 # [추가] 파싱된 데이터 디버그 로그 (데이터 내용 확인용)
302 self._logger.debug(f"WS 수신 데이터 파싱: Type={message_type}, TR_ID={tr_id}, Data={parsed_data}")
304 # 외부 콜백 함수로 파싱된 데이터 전달
305 if self.on_realtime_message_callback: 305 ↛ exitline 305 didn't return from function '_handle_websocket_message' because the condition on line 305 was always true
306 self.on_realtime_message_callback({'type': message_type, 'tr_id': tr_id, 'data': parsed_data})
308 else: # 제어 메시지 (응답, PINGPONG 등)
309 try:
310 json_object = _loads(message)
311 header = json_object.get("header", {})
312 tr_id = header.get("tr_id")
314 if tr_id == "PINGPONG":
315 self._logger.debug("PINGPONG 수신됨. PONG 응답.")
316 # websockets 라이브러리 내부에서 PONG 응답 자동 처리 (ping_interval, ping_timeout 설정 시)
317 elif json_object.get("body", {}).get("rt_cd") == '0':
318 self._logger.info(f"실시간 요청 응답 성공: TR_KEY={header.get('tr_key')}, MSG={json_object['body']['msg1']}")
319 # 체결통보용 AES KEY, IV 수신 처리
320 if tr_id in ["H0IFCNI0", "H0STCNI0", "H0STCNI9", "H0MFCNI0", "H0EUCNI0"] and json_object.get("body",
321 {}).get(
322 "output"):
323 self._aes_key = json_object["body"]["output"].get("key")
324 self._aes_iv = json_object["body"]["output"].get("iv")
325 self._logger.info(f"체결통보용 AES KEY/IV 수신 성공. TRID={tr_id}")
326 else:
327 self._logger.error(
328 f"실시간 요청 응답 오류: TR_KEY={header.get('tr_key')}, RT_CD={json_object.get('body', {}).get('rt_cd')}, MSG={json_object.get('body', {}).get('msg1')}")
329 if json_object.get("body", {}).get("msg1") == 'ALREADY IN SUBSCRIBE':
330 self._logger.warning("이미 구독 중인 종목입니다.")
331 except json.JSONDecodeError:
332 self._logger.exception(f"제어 메시지 JSON 디코딩 실패: {message}")
333 except Exception as e:
334 self._logger.exception(f"제어 메시지 처리 중 오류 발생: {e}, 메시지: {message}")
336 # --- 실시간 데이터 파싱 헬퍼 함수들 ---
338 def _parse_stock_quote_data(self, data_str):
339 """H0STASP0 (주식 호가) 데이터를 파싱합니다."""
340 recvvalue = data_str.split('^')
341 # API 문서: 주식호가 (H0STASP0) 필드명과 인덱스 참고
342 return {
343 "유가증권단축종목코드": recvvalue[0], "영업시간": recvvalue[1], "시간구분코드": recvvalue[2],
344 "매도호가1": recvvalue[3], "매도호가2": recvvalue[4], "매도호가3": recvvalue[5], "매도호가4": recvvalue[6],
345 "매도호가5": recvvalue[7],
346 "매도호가6": recvvalue[8], "매도호가7": recvvalue[9], "매도호가8": recvvalue[10], "매도호가9": recvvalue[11],
347 "매도호가10": recvvalue[12],
348 "매수호가1": recvvalue[13], "매수호가2": recvvalue[14], "매수호가3": recvvalue[15], "매수호가4": recvvalue[16],
349 "매수호가5": recvvalue[17],
350 "매수호가6": recvvalue[18], "매수호가7": recvvalue[19], "매수호가8": recvvalue[20], "매수호가9": recvvalue[21],
351 "매수호가10": recvvalue[22],
352 "매도호가잔량1": recvvalue[23], "매도호가잔량2": recvvalue[24], "매도호가잔량3": recvvalue[25], "매도호가잔량4": recvvalue[26],
353 "매도호가잔량5": recvvalue[27],
354 "매도호가잔량6": recvvalue[28], "매도호가잔량7": recvvalue[29], "매도호가잔량8": recvvalue[30], "매도호가잔량9": recvvalue[31],
355 "매도호가잔량10": recvvalue[32],
356 "매수호가잔량1": recvvalue[33], "매수호가잔량2": recvvalue[34], "매수호가잔량3": recvvalue[35], "매수호가잔량4": recvvalue[36],
357 "매수호가잔량5": recvvalue[37],
358 "매수호가잔량6": recvvalue[38], "매수호가잔량7": recvvalue[39], "매수호가잔량8": recvvalue[40], "매수호가잔량9": recvvalue[41],
359 "매수호가잔량10": recvvalue[42],
360 "총매도호가잔량": recvvalue[43], "총매수호가잔량": recvvalue[44], "시간외총매도호가잔량": recvvalue[45],
361 "시간외총매수호가잔량": recvvalue[46],
362 "예상체결가": recvvalue[47], "예상체결량": recvvalue[48], "예상거래량": recvvalue[49], "예상체결대비": recvvalue[50],
363 "부호": recvvalue[51],
364 "예상체결전일대비율": recvvalue[52], "누적거래량": recvvalue[53], "주식매매구분코드": recvvalue[58]
365 }
367 def _parse_stock_contract_data(self, data_str):
368 """H0STCNT0 (주식 체결) 데이터를 파싱합니다."""
369 menulist = "유가증권단축종목코드|주식체결시간|주식현재가|전일대비부호|전일대비|전일대비율|가중평균주식가격|주식시가|주식최고가|주식최저가|매도호가1|매수호가1|체결거래량|누적거래량|누적거래대금|매도체결건수|매수체결건수|순매수체결건수|체결강도|총매도수량|총매수수량|체결구분|매수비율|전일거래량대비등락율|시가시간|시가대비구분|시가대비|최고가시간|고가대비구분|고가대비|최저가시간|저가대비구분|저가대비|영업일자|신장운영구분코드|거래정지여부|매도호가잔량|매수호가잔량|총매도호가잔량|총매수호가잔량|거래량회전율|전일동시간누적거래량|전일동시간누적거래량비율|시간구분코드|임의종료구분코드|정적VI발동기준가"
370 keys = menulist.split('|')
371 values = data_str.split('^')
372 return dict(zip(keys, values[:len(keys)]))
374 def _parse_futs_optn_quote_data(self, data_str):
375 """H0IFASP0, H0IOASP0 (지수선물/옵션 호가) 데이터를 파싱합니다."""
376 recvvalue = data_str.split('^')
377 return {
378 "종목코드": recvvalue[0], "영업시간": recvvalue[1],
379 "매도호가1": recvvalue[2], "매도호가2": recvvalue[3], "매도호가3": recvvalue[4], "매도호가4": recvvalue[5],
380 "매도호가5": recvvalue[6],
381 "매수호가1": recvvalue[7], "매수호가2": recvvalue[8], "매수호가3": recvvalue[9], "매수호가4": recvvalue[10],
382 "매수호가5": recvvalue[11],
383 "매도호가건수1": recvvalue[12], "매도호가건수2": recvvalue[13], "매도호가건수3": recvvalue[14], "매도호가건수4": recvvalue[15],
384 "매도호가건수5": recvvalue[16],
385 "매수호가건수1": recvvalue[17], "매수호가건수2": recvvalue[18], "매수호가건수3": recvvalue[19], "매수호가건수4": recvvalue[20],
386 "매수호가건수5": recvvalue[21],
387 "매도호가잔량1": recvvalue[22], "매도호가잔량2": recvvalue[23], "매도호가잔량3": recvvalue[24], "매도호가잔량4": recvvalue[25],
388 "매도호가잔량5": recvvalue[26],
389 "매수호가잔량1": recvvalue[27], "매수호가잔량2": recvvalue[28], "매수호가잔량3": recvvalue[29], "매수호가잔량4": recvvalue[30],
390 "매수호가잔량5": recvvalue[31],
391 "총매도호가건수": recvvalue[32], "총매수호가건수": recvvalue[33],
392 "총매도호가잔량": recvvalue[34], "총매수호가잔량": recvvalue[35],
393 "총매도호가잔량증감": recvvalue[36], "총매수호가잔량증감": recvvalue[37]
394 }
396 def _parse_futs_optn_contract_data(self, data_str):
397 """H0IFCNT0, H0IOCNT0 (지수선물/옵션 체결) 데이터를 파싱합니다."""
398 menulist = "선물단축종목코드|영업시간|선물전일대비|전일대비부호|선물전일대비율|선물현재가|선물시가|선물최고가|선물최저가|최종거래량|누적거래량|누적거래대금|HTS이론가|시장베이시스|괴리율|근월물약정가|원월물약정가|스프레드|미결제약정수량|미결제약정수량증감|시가시간|시가대비현재가부호|시가대비지수현재가|최고가시간|최고가대비현재가부호|최고가대비지수현재가|최저가시간|최저가대비현재가부호|최저가대비지수현재가|매수비율|체결강도|괴리도|미결제약정직전수량증감|이론베이시스|선물매도호가|선물매수호가|매도호가잔량|매수호가잔량|매도체결건수|매수체결건수|순매수체결건수|총매도수량|총매수수량|총매도호가잔량|총매수호가잔량|전일거래량대비등락율|협의대량거래량|실시간상한가|실시간하한가|실시간가격제한구분"
399 keys = menulist.split('|')
400 values = data_str.split('^')
401 return dict(zip(keys, values[:len(keys)]))
403 def _parse_product_futs_quote_data(self, data_str):
404 """H0CFASP0 (상품선물 호가) 데이터를 파싱합니다."""
405 recvvalue = data_str.split('^')
406 return {
407 "종목코드": recvvalue[0], "영업시간": recvvalue[1],
408 "매도호가1": recvvalue[2], "매도호가2": recvvalue[3], "매도호가3": recvvalue[4], "매도호가4": recvvalue[5],
409 "매도호가5": recvvalue[6],
410 "매수호가1": recvvalue[7], "매수호가2": recvvalue[8], "매수호가3": recvvalue[9], "매수호가4": recvvalue[10],
411 "매수호가5": recvvalue[11],
412 "매도호가건수1": recvvalue[12], "매도호가건수2": recvvalue[13], "매도호가건수3": recvvalue[14], "매도호가건수4": recvvalue[15],
413 "매도호가건수5": recvvalue[16],
414 "매수호가건수1": recvvalue[17], "매수호가건수2": recvvalue[18], "매수호가건수3": recvvalue[19], "매수호가건수4": recvvalue[20],
415 "매수호가건수5": recvvalue[21],
416 "매도호가잔량1": recvvalue[22], "매도호가잔량2": recvvalue[23], "매도호가잔량3": recvvalue[24], "매도호가잔량4": recvvalue[25],
417 "매도호가잔량5": recvvalue[26],
418 "매수호가잔량1": recvvalue[27], "매수호가잔량2": recvvalue[28], "매수호가잔량3": recvvalue[29], "매수호가잔량4": recvvalue[30],
419 "매수호가잔량5": recvvalue[31],
420 "총매도호가건수": recvvalue[32], "총매수호가건수": recvvalue[33],
421 "총매도호가잔량": recvvalue[34], "총매수호가잔량": recvvalue[35],
422 "총매도호가잔량증감": recvvalue[36], "총매수호가잔량증감": recvvalue[37]
423 }
425 def _parse_product_futs_contract_data(self, data_str):
426 """H0CFCNT0 (상품선물 체결) 데이터를 파싱합니다."""
427 menulist = "선물단축종목코드|영업시간|선물전일대비|전일대비부호|선물전일대비율|선물현재가|선물시가|선물최고가|선물최저가|최종거래량|누적거래량|누적거래대금|HTS이론가|시장베이시스|괴리율|근월물약정가|원월물약정가|스프레드|미결제약정수량|미결제약정수량증감|시가시간|시가대비현재가부호|시가대비지수현재가|최고가시간|최고가대비현재가부호|최고가대비지수현재가|최저가시간|최저가대비현재가부호|최저가대비지수현재가|매수비율|체결강도|괴리도|미결제약정직전수량증감|이론베이시스|선물매도호가|선물매수호가|매도호가잔량|매수호가잔량|매도체결건수|매수체결건수|순매수체결건수|총매도수량|총매수수량|총매도호가잔량|총매수호가잔량|전일거래량대비등락율|협의대량거래량|실시간상한가|실시간하한가|실시간가격제한구분"
428 keys = menulist.split('|')
429 values = data_str.split('^')
430 return dict(zip(keys, values[:len(keys)]))
432 def _parse_stock_futs_optn_quote_data(self, data_str):
433 """H0ZFASP0, H0ZOASP0 (주식선물/옵션 호가) 데이터를 파싱합니다."""
434 recvvalue = data_str.split('^')
435 return {
436 "종목코드": recvvalue[0], "영업시간": recvvalue[1],
437 "매도호가1": recvvalue[2], "매도호가2": recvvalue[3], "매도호가3": recvvalue[4], "매도호가4": recvvalue[5],
438 "매도호가5": recvvalue[6],
439 "매수호가1": recvvalue[7], "매수호가2": recvvalue[8], "매수호가3": recvvalue[9], "매수호가4": recvvalue[10],
440 "매수호가5": recvvalue[11],
441 "매도호가건수1": recvvalue[12], "매도호가건수2": recvvalue[13], "매도호가건수3": recvvalue[14], "매도호가건수4": recvvalue[15],
442 "매도호가건수5": recvvalue[16],
443 "매수호가건수1": recvvalue[17], "매수호가건수2": recvvalue[18], "매수호가건수3": recvvalue[19], "매수호가건수4": recvvalue[20],
444 "매수호가건수5": recvvalue[21],
445 "매도호가잔량1": recvvalue[22], "매도호가잔량2": recvvalue[23], "매도호가잔량3": recvvalue[24], "매도호가잔량4": recvvalue[25],
446 "매도호가잔량5": recvvalue[26],
447 "매수호가잔량1": recvvalue[27], "매수호가잔량2": recvvalue[28], "매수호가잔량3": recvvalue[29], "매수호가잔량4": recvvalue[30],
448 "매수호가잔량5": recvvalue[31],
449 "총매도호가건수": recvvalue[32], "총매수호가건수": recvvalue[33],
450 "총매도호가잔량": recvvalue[34], "총매수호가잔량": recvvalue[35],
451 "총매도호가잔량증감": recvvalue[36], "총매수호가잔량증감": recvvalue[37]
452 }
454 def _parse_stock_futs_optn_contract_data(self, data_str):
455 """H0ZFCNT0, H0ZOCNT0 (주식선물/옵션 체결) 데이터를 파싱합니다."""
456 menulist = "선물단축종목코드|영업시간|주식현재가|전일대비부호|전일대비|선물전일대비율|주식시가2|주식최고가|주식최저가|최종거래량|누적거래량|누적거래대금|HTS이론가|시장베이시스|괴리율|근월물약정가|원월물약정가|스프레드1|HTS미결제약정수량|미결제약정수량증감|시가시간|시가2대비현재가부호|시가2대비현재가|최고가시간|최고가대비현재가부호|최고가대비현재가|최저가시간|최저가대비현재가부호|최저가대비현재가|매수2비율|체결강도|괴리도|미결제약정직전수량증감|이론베이시스|매도호가1|매수호가1|매도호가잔량1|매수호가잔량1|매도체결건수|매수체결건수|순매수체결건수|총매도수량|총매수수량|총매도호가잔량|총매수호가잔량|전일거래량대비등락율|실시간상한가|실시간하한가|실시간가격제한구분"
457 keys = menulist.split('|')
458 values = data_str.split('^')
459 return dict(zip(keys, values[:len(keys)]))
461 def _parse_stock_futs_optn_exp_contract_data(self, data_str):
462 """H0ZFANC0, H0ZOANC0 (주식선물/옵션 예상체결) 데이터를 파싱합니다."""
463 menulist = "선물단축종목코드|영업시간|예상체결가|예상체결대비|예상체결대비부호|예상체결전일대비율|예상장운영구분코드"
464 keys = menulist.split('|')
465 values = data_str.split('^')
466 return dict(zip(keys, values[:len(keys)]))
468 def _parse_cmefuts_quote_data(self, data_str):
469 """H0MFASP0 (야간선물(CME) 호가) 데이터를 파싱합니다."""
470 recvvalue = data_str.split('^')
471 return {
472 "종목코드": recvvalue[0], "영업시간": recvvalue[1],
473 "매도호가1": recvvalue[2], "매도호가2": recvvalue[3], "매도호가3": recvvalue[4], "매도호가4": recvvalue[5],
474 "매도호가5": recvvalue[6],
475 "매수호가1": recvvalue[7], "매수호가2": recvvalue[8], "매수호가3": recvvalue[9], "매수호가4": recvvalue[10],
476 "매수호가5": recvvalue[11],
477 "매도호가건수1": recvvalue[12], "매도호가건수2": recvvalue[13], "매도호가건수3": recvvalue[14], "매도호가건수4": recvvalue[15],
478 "매도호가건수5": recvvalue[16],
479 "매수호가건수1": recvvalue[17], "매수호가건수2": recvvalue[18], "매수호가건수3": recvvalue[19], "매수호가건수4": recvvalue[20],
480 "매수호가건수5": recvvalue[21],
481 "매도호가잔량1": recvvalue[22], "매도호가잔량2": recvvalue[23], "매도호가잔량3": recvvalue[24], "매도호가잔량4": recvvalue[25],
482 "매도호가잔량5": recvvalue[26],
483 "매수호가잔량1": recvvalue[27], "매수호가잔량2": recvvalue[28], "매수호가잔량3": recvvalue[29], "매수호가잔량4": recvvalue[30],
484 "매수호가잔량5": recvvalue[31],
485 "총매도호가건수": recvvalue[32], "총매수호가건수": recvvalue[33],
486 "총매도호가잔량": recvvalue[34], "총매수호가잔량": recvvalue[35],
487 "총매도호가잔량증감": recvvalue[36], "총매수호가잔량증감": recvvalue[37]
488 }
490 def _parse_cmefuts_contract_data(self, data_str):
491 """H0MFCNT0 (야간선물(CME) 체결) 데이터를 파싱합니다."""
492 menulist = "선물단축종목코드|영업시간|선물전일대비|전일대비부호|선물전일대비율|선물현재가|선물시가2|선물최고가|선물최저가|최종거래량|누적거래량|누적거래대금|HTS이론가|시장베이시스|괴리율|근월물약정가|원월물약정가|스프레드1|HTS미결제약정수량|미결제약정수량증감|시가시간|시가2대비현재가부호|시가2대비현재가|최고가시간|최고가대비현재가부호|최고가대비현재가|최저가시간|최저가대비현재가부호|최저가대비현재가|매수2비율|체결강도|괴리도|미결제약정직전수량증감|이론베이시스|선물매도호가1|선물매수호가1|매도호가잔량1|매수호가잔량1|매도체결건수|매수체결건수|순매수체결건수|총매도수량|총매수수량|총매도호가잔량|총매수호가잔량|전일거래량대비등락율"
493 keys = menulist.split('|')
494 values = data_str.split('^')
495 return dict(zip(keys, values[:len(keys)]))
497 def _parse_eurex_optn_quote_data(self, data_str):
498 """H0EUASP0 (야간옵션(EUREX) 호가) 데이터를 파싱합니다."""
499 recvvalue = data_str.split('^')
500 return {
501 "종목코드": recvvalue[0], "영업시간": recvvalue[1],
502 "매도호가1": recvvalue[2], "매도호가2": recvvalue[3], "매도호가3": recvvalue[4], "매도호가4": recvvalue[5],
503 "매도호가5": recvvalue[6],
504 "매수호가1": recvvalue[7], "매수호가2": recvvalue[8], "매수호가3": recvvalue[9], "매수호가4": recvvalue[10],
505 "매수호가5": recvvalue[11],
506 "매도호가건수1": recvvalue[12], "매도호가건수2": recvvalue[13], "매도호가건수3": recvvalue[14], "매도호가건수4": recvvalue[15],
507 "매도호가건수5": recvvalue[16],
508 "매수호가건수1": recvvalue[17], "매수호가건수2": recvvalue[18], "매수호가건수3": recvvalue[19], "매수호가건수4": recvvalue[20],
509 "매수호가건수5": recvvalue[21],
510 "매도호가잔량1": recvvalue[22], "매도호가잔량2": recvvalue[23], "매도호가잔량3": recvvalue[24], "매도호가잔량4": recvvalue[25],
511 "매도호가잔량5": recvvalue[26],
512 "매수호가잔량1": recvvalue[27], "매수호가잔량2": recvvalue[28], "매수호가잔량3": recvvalue[29], "매수호가잔량4": recvvalue[30],
513 "매수호가잔량5": recvvalue[31],
514 "총매도호가건수": recvvalue[32], "총매수호가건수": recvvalue[33],
515 "총매도호가잔량": recvvalue[34], "총매수호가잔량": recvvalue[35],
516 "총매도호가잔량증감": recvvalue[36], "총매수호가잔량증감": recvvalue[37]
517 }
519 def _parse_eurex_optn_contract_data(self, data_str):
520 """H0EUCNT0 (야간옵션(EUREX) 체결) 데이터를 파싱합니다."""
521 menulist = "옵션단축종목코드|영업시간|옵션현재가|전일대비부호|옵션전일대비|전일대비율|옵션시가2|옵션최고가|옵션최저가|최종거래량|누적거래량|누적거래대금|HTS이론가|HTS미결제약정수량|미결제약정수량증감|시가시간|시가2대비현재가부호|시가대비지수현재가|최고가시간|최고가대비현재가부호|최고가대비지수현재가|최저가시간|최저가대비현재가부호|최저가대비지수현재가|매수2비율|프리미엄값|내재가치값|시간가치값|델타|감마|베가|세타|로우|HTS내재변동성|괴리도|미결제약정직전수량증감|이론베이시스|역사적변동성|체결강도|괴리율|시장베이시스|옵션매도호가1|옵션매수호가1|매도호가잔량1|매수호가잔량1|매도체결건수|매수체결건수|순매수체결건수|총매도수량|총매수수량|총매도호가잔량|총매수호가잔량|전일거래량대비등락율"
522 keys = menulist.split('|')
523 values = data_str.split('^')
524 return dict(zip(keys, values[:len(keys)]))
526 def _parse_eurex_optn_exp_contract_data(self, data_str):
527 """H0EUANC0 (야간옵션(EUREX) 예상체결) 데이터를 파싱합니다."""
528 menulist = "옵션단축종목코드|영업시간|예상체결가|예상체결대비|예상체결대비부호|예상체결전일대비율|예상장운영구분코드"
529 keys = menulist.split('|')
530 values = data_str.split('^')
531 return dict(zip(keys, values[:len(keys)]))
533 def _parse_program_trading_data(self, data_str: str) -> dict:
534 """H0STPGM0 (국내주식 실시간 프로그램매매) 데이터를 파싱합니다."""
535 menulist = "유가증권단축종목코드|주식체결시간|매도체결량|매도거래대금|매수2체결량|매수2거래대금|순매수체결량|순매수거래대금|매도호가잔량|매수호가잔량|전체순매수호가잔량"
536 keys = menulist.split('|')
537 values = data_str.split('^')
538 return dict(zip(keys, values[:len(keys)]))
540 async def subscribe_program_trading(self, stock_code: str):
541 """국내주식 실시간 프로그램매매 동향 (H0STPGM0) 구독."""
542 tr_id = self._env.active_config['tr_ids']['websocket'].get('realtime_program_trading', 'H0STPGM0')
543 self._logger.info(f"[프로그램매매] 종목 {stock_code} 구독 요청 ({tr_id})...")
544 return await self.send_realtime_request(tr_id, stock_code, tr_type="1")
546 async def unsubscribe_program_trading(self, stock_code: str):
547 """국내주식 실시간 프로그램매매 동향 (H0STPGM0) 구독 해지."""
548 tr_id = self._env.active_config['tr_ids']['websocket'].get('realtime_program_trading', 'H0STPGM0')
549 self._logger.info(f"[프로그램매매] 종목 {stock_code} 구독 해지 요청 ({tr_id})...")
550 return await self.send_realtime_request(tr_id, stock_code, tr_type="2")
552 async def _resubscribe_all(self):
553 """재연결 시 기존 구독 항목들을 다시 구독 요청합니다."""
554 for tr_id, tr_key in list(self._subscribed_items):
555 self._logger.info(f"구독 복구 요청: TR_ID={tr_id}, KEY={tr_key}")
556 # send_realtime_request 내부에서 _subscribed_items에 다시 추가하므로 중복 방지 로직 필요 없음 (Set이므로)
557 await self.send_realtime_request(tr_id, tr_key, tr_type="1")
559 def is_receive_alive(self) -> bool:
560 """수신 태스크가 살아있는지 확인 (외부 워치독용)."""
561 return (self._receive_task is not None
562 and not self._receive_task.done()
563 and self._auto_reconnect)
565 # --- 웹소켓 연결 및 해지 ---
566 async def connect(self, on_message_callback=None):
567 """웹소켓 연결을 시작하고 실시간 데이터 수신을 준비합니다."""
568 self._websocket_url = self._env.get_websocket_url()
569 if self.ws and self._is_connected:
570 self._logger.info("웹소켓이 이미 연결되어 있습니다.")
571 return True
573 self.on_realtime_message_callback = on_message_callback # 외부 콜백 등록
574 self._auto_reconnect = True # 자동 재연결 활성화
576 if await self._establish_connection():
577 # 메시지 수신 태스크 시작 (이미 실행 중이면 건너뜀)
578 if self._receive_task and not self._receive_task.done(): 578 ↛ 579line 578 didn't jump to line 579 because the condition on line 578 was never true
579 self._receive_task.cancel()
580 try:
581 await self._receive_task
582 except asyncio.CancelledError:
583 pass
584 self._receive_task = asyncio.create_task(self._receive_messages())
585 return True
586 return False
588 async def disconnect(self):
589 """웹소켓 연결을 종료합니다."""
590 self._auto_reconnect = False # 수동 종료 시 재연결 비활성화
591 if self._is_connected and self.ws:
592 self._logger.info("웹소켓 연결 종료 요청.")
593 await self.ws.close()
594 self._is_connected = False
595 if self._receive_task:
596 self._receive_task.cancel()
597 try:
598 await self._receive_task
599 except asyncio.CancelledError:
600 self._logger.info("웹소켓 수신 태스크 취소됨.")
601 except Exception as e:
602 self._logger.error(f"웹소켓 수신 태스크 종료 중 오류: {e}", exc_info=True)
604 self._logger.info("웹소켓 연결 종료 완료.")
605 self._is_connected = False
606 self.ws = None
607 else:
608 self._logger.info("웹소켓이 연결되어 있지 않습니다.")
609 if self._receive_task:
610 self._receive_task.cancel()
611 try:
612 await self._receive_task
613 except asyncio.CancelledError:
614 self._logger.info("웹소켓 수신 태스크 취소됨.")
615 except Exception as e:
616 self._logger.error(f"웹소켓 수신 태스크 종료 중 오류: {e}")
617 self._is_connected = False
618 self.ws = None
620 # --- 실시간 요청 전송 ---
621 async def send_realtime_request(self, tr_id, tr_key, tr_type="1"):
622 """
623 실시간 데이터 구독/해지 요청 메시지를 웹소켓으로 전송합니다.
624 :param tr_id: 실시간 TR ID
625 :param tr_key: 구독할 종목코드 또는 HTS ID (체결통보용)
626 :param tr_type: 1: 등록, 2: 해지
627 """
628 self._websocket_url = self._env.active_config['websocket_url']
629 self._base_rest_url = self._env.active_config['base_url']
630 self._rest_api_key= self._env.active_config['api_key']
631 self._rest_api_secret= self._env.active_config['api_secret_key']
633 if not self._is_connected or not self.ws:
634 self._logger.error("웹소켓이 연결되어 있지 않아 실시간 요청을 보낼 수 없습니다.")
635 return False
636 if not self.approval_key:
637 self._logger.error("approval_key가 없어 실시간 요청을 보낼 수 없습니다.")
638 return False
640 header = {
641 "approval_key": self.approval_key,
642 "custtype": self._env.active_config['custtype'],
643 "tr_type": tr_type,
644 "content-type": "utf-8",
645 }
646 body = {
647 "input": {
648 "tr_id": tr_id,
649 "tr_key": tr_key,
650 }
651 }
653 request_message = {"header": header, "body": body}
654 message_json = _dumps(request_message)
656 self._logger.info(f"실시간 요청 전송: TR_ID={tr_id}, TR_KEY={tr_key}, TYPE={tr_type}")
657 try:
658 await self.ws.send(message_json)
660 # 구독 목록 관리
661 if tr_type == "1": 661 ↛ 663line 661 didn't jump to line 663 because the condition on line 661 was always true
662 self._subscribed_items.add((tr_id, tr_key))
663 elif tr_type == "2":
664 self._subscribed_items.discard((tr_id, tr_key))
665 return True
666 except Exception as e:
667 self._logger.exception(f"실시간 요청 전송 중 오류 발생: {e}")
668 self._is_connected = False
669 self.ws = None
670 return False
672 async def subscribe_realtime_price(self, stock_code):
673 """실시간 주식체결 데이터(현재가)를 구독합니다."""
674 tr_id = self._env.active_config['tr_ids']['websocket']['realtime_price']
675 self._logger.info(f"종목 {stock_code} 실시간 체결 데이터 구독 요청 ({tr_id})...")
676 return await self.send_realtime_request(tr_id, stock_code, tr_type="1")
678 async def unsubscribe_realtime_price(self, stock_code):
679 """실시간 주식체결 데이터(현재가) 구독을 해지합니다."""
680 tr_id = self._env.active_config['tr_ids']['websocket']['realtime_price']
681 self._logger.info(f"종목 {stock_code} 실시간 체결 데이터 구독 해지 요청 ({tr_id})...")
682 return await self.send_realtime_request(tr_id, stock_code, tr_type="2")
684 async def subscribe_unified_price(self, stock_code: str) -> bool:
685 """실시간 통합 체결가(H0UNCNT0)를 구독합니다. KRX+NXT 통합."""
686 tr_id = self._env.active_config['tr_ids']['websocket'].get('unified_realtime_price', 'H0UNCNT0')
687 self._logger.info(f"종목 {stock_code} 통합 체결가 구독 요청 ({tr_id})...")
688 return await self.send_realtime_request(tr_id, stock_code, tr_type="1")
690 async def unsubscribe_unified_price(self, stock_code: str) -> bool:
691 """실시간 통합 체결가(H0UNCNT0) 구독을 해지합니다."""
692 tr_id = self._env.active_config['tr_ids']['websocket'].get('unified_realtime_price', 'H0UNCNT0')
693 self._logger.info(f"종목 {stock_code} 통합 체결가 구독 해지 ({tr_id})...")
694 return await self.send_realtime_request(tr_id, stock_code, tr_type="2")
696 async def subscribe_realtime_quote(self, stock_code):
697 """실시간 주식호가 데이터를 구독합니다."""
698 tr_id = self._env.active_config['tr_ids']['websocket']['realtime_quote']
699 self._logger.info(f"종목 {stock_code} 실시간 호가 데이터 구독 요청 ({tr_id})...")
700 return await self.send_realtime_request(tr_id, stock_code, tr_type="1")
702 async def unsubscribe_realtime_quote(self, stock_code):
703 """실시간 주식호가 데이터 구독을 해지합니다."""
704 tr_id = self._env.active_config['tr_ids']['websocket']['realtime_quote']
705 self._logger.info(f"종목 {stock_code} 실시간 호가 데이터 구독 해지 요청 ({tr_id})...")
706 return await self.send_realtime_request(tr_id, stock_code, tr_type="2")
708 # For test only
709 async def _on_receive(self, message):
710 try:
711 parsed = _loads(message) # <-- JSON 문자열을 dict로 파싱
712 if self.on_realtime_message_callback:
713 await self.on_realtime_message_callback(parsed)
714 else:
715 self._logger.warning("수신된 메시지를 처리할 콜백이 등록되지 않았습니다.")
716 except Exception as e:
717 self._logger.exception(f"수신 메시지 처리 중 예외 발생: {e}")