Coverage for services / streaming_service.py: 78%
134 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
1"""
2WebSocket 스트리밍 관련 기능을 담당하는 서비스.
4역할:
5 - WebSocket 연결/해제 수명주기 관리 (connect, disconnect)
6 - 실시간 데이터 구독/해지 (subscribe/unsubscribe)
7 - 수신 메시지 dispatch 및 최신가 메모리 캐시 유지
8 - 프로그램매매 히스토리 조회 (REST)
10ProgramTradingStreamService와의 역할 구분:
11 - StreamingService : WebSocket 연결·구독·메시지 처리 (프로토콜 레이어)
12 - ProgramTradingStreamService: 프로그램매매 데이터의 저장·버퍼링·SSE 배포 (데이터 레이어)
13"""
14from __future__ import annotations
16import time
17from typing import Optional, Dict, TYPE_CHECKING
19from common.types import ResCommonResponse, ErrorCode
21if TYPE_CHECKING: 21 ↛ 22line 21 didn't jump to line 22 because the condition on line 21 was never true
22 from brokers.broker_api_wrapper import BrokerAPIWrapper
23 from services.market_data_service import MarketDataService
24 from core.logger import StreamingEventLogger
27class StreamingService:
28 """
29 WebSocket 연결·구독·메시지 dispatch를 담당하는 서비스.
30 BrokerAPIWrapper를 통해 실제 WebSocket API에 위임한다.
31 """
33 def __init__(
34 self,
35 broker_api_wrapper: "BrokerAPIWrapper",
36 logger,
37 market_clock,
38 market_data_service: Optional["MarketDataService"] = None,
39 streaming_logger: Optional["StreamingEventLogger"] = None,
40 ):
41 self.broker = broker_api_wrapper
42 self.logger = logger
43 self.market_clock = market_clock
44 self.market_data_service = market_data_service
45 self._streaming_logger = streaming_logger
46 self._latest_prices: Dict[str, dict | str] = {}
47 self._last_console_print_time: float = 0.0
48 self._PRINT_THROTTLE_SEC: float = 0.5
49 self._callback = None # 재연결 시 콜백 유실 방지용 저장
51 # ── 연결 수명주기 ──────────────────────────────────────────────
53 async def connect_websocket(self, callback=None):
54 """WebSocket 연결 (BrokerAPIWrapper 위임).
56 callback이 전달되면 내부에 저장하여 이후 재연결 시에도 동일한 콜백을 사용한다.
57 callback이 None이면 기존에 저장된 콜백을 사용한다.
58 """
59 if callback is not None:
60 self._callback = callback
61 result = await self.broker.connect_websocket(self._callback)
62 if result and self._streaming_logger: 62 ↛ 63line 62 didn't jump to line 63 because the condition on line 62 was never true
63 self._streaming_logger.log_connect()
64 return result
66 async def disconnect_websocket(self):
67 """WebSocket 연결 해제 (BrokerAPIWrapper 위임)."""
68 result = await self.broker.disconnect_websocket()
69 if self._streaming_logger: 69 ↛ 70line 69 didn't jump to line 70 because the condition on line 69 was never true
70 self._streaming_logger.log_disconnect()
71 return result
73 # ── 구독 / 해지 ───────────────────────────────────────────────
75 async def subscribe_program_trading(self, code: str):
76 """프로그램매매 실시간 구독 (BrokerAPIWrapper 위임)."""
77 return await self.broker.subscribe_program_trading(code)
79 async def unsubscribe_program_trading(self, code: str):
80 """프로그램매매 구독 해지 (BrokerAPIWrapper 위임)."""
81 return await self.broker.unsubscribe_program_trading(code)
83 async def subscribe_realtime_price(self, code: str):
84 """실시간 체결가 구독 (BrokerAPIWrapper 위임)."""
85 return await self.broker.subscribe_realtime_price(code)
87 async def unsubscribe_realtime_price(self, code: str):
88 """실시간 체결가 구독 해지 (BrokerAPIWrapper 위임)."""
89 return await self.broker.unsubscribe_realtime_price(code)
91 async def subscribe_unified_price(self, code: str) -> bool:
92 """실시간 통합 체결가(H0UNCNT0) 구독 — PriceSubscriptionService 전용."""
93 return await self.broker.subscribe_unified_price(code)
95 async def unsubscribe_unified_price(self, code: str) -> bool:
96 """실시간 통합 체결가(H0UNCNT0) 구독 해지 — PriceSubscriptionService 전용."""
97 return await self.broker.unsubscribe_unified_price(code)
99 # ── 고수준 스트림 핸들러 ──────────────────────────────────────
101 async def handle_program_trading_stream(self, stock_code: str, duration: int = 60) -> None:
102 """
103 프로그램매매(H0STPGM0) 구독 → duration초 수신 → 해지.
104 CLI 등 단발성 스트리밍 용도.
105 """
106 await self.connect_websocket()
107 await self.subscribe_program_trading(stock_code)
108 try:
109 await self.market_clock.async_sleep(duration)
110 finally:
111 await self.unsubscribe_program_trading(stock_code)
112 await self.disconnect_websocket()
114 async def handle_realtime_stream(
115 self,
116 stock_codes: list[str],
117 fields: list[str],
118 duration: int = 30,
119 ) -> None:
120 """실시간 스트림 구독 및 처리 (price / quote 필드 지원)."""
121 self.logger.info(
122 f"StreamingService - 실시간 스트림 요청: 종목={stock_codes}, 필드={fields}, 시간={duration}s"
123 )
124 try:
125 await self.connect_websocket()
126 for code in stock_codes:
127 if "price" in fields: 127 ↛ 129line 127 didn't jump to line 129 because the condition on line 127 was always true
128 await self.subscribe_realtime_price(code)
129 if "quote" in fields: 129 ↛ 126line 129 didn't jump to line 126 because the condition on line 129 was always true
130 await self.broker.subscribe_realtime_quote(code)
132 from datetime import datetime, timedelta
133 start_time = datetime.now()
134 while (datetime.now() - start_time) < timedelta(seconds=duration): 134 ↛ 135line 134 didn't jump to line 135 because the condition on line 134 was never true
135 await self.market_clock.async_sleep(1)
136 except Exception as e:
137 self.logger.exception(f"실시간 스트림 처리 중 오류 발생: {str(e)}")
138 finally:
139 for code in stock_codes:
140 if "price" in fields: 140 ↛ 142line 140 didn't jump to line 142 because the condition on line 140 was always true
141 await self.unsubscribe_realtime_price(code)
142 if "quote" in fields:
143 await self.broker.unsubscribe_realtime_quote(code)
144 await self.disconnect_websocket()
145 self.logger.info("실시간 스트림 종료")
147 # ── 메시지 dispatch 및 캐시 ───────────────────────────────────
149 def dispatch_realtime_message(self, data: dict) -> None:
150 """실시간 WebSocket 메시지를 파싱하여 내부 최신가 캐시를 갱신한다."""
151 self.logger.debug(
152 f"실시간 데이터 수신: Type={data.get('type')}, TR_ID={data.get('tr_id')}, Data={data.get('data')}"
153 )
155 if data.get('type') == 'realtime_price':
156 realtime_data = data.get('data', {})
157 stock_code = realtime_data.get('유가증권단축종목코드')
158 current_price = realtime_data.get('주식현재가')
160 if stock_code and current_price: 160 ↛ 185line 160 didn't jump to line 185 because the condition on line 160 was always true
161 self._latest_prices[stock_code] = {
162 "price": current_price,
163 "change": realtime_data.get('전일대비', '0'),
164 "rate": realtime_data.get('전일대비율', '0.00'),
165 "sign": realtime_data.get('전일대비부호', '3'),
166 "received_at": time.time(),
167 }
169 # StockRepository 실시간 틱 캐시 즉시 반영
170 if ( 170 ↛ 185line 170 didn't jump to line 185 because the condition on line 170 was always true
171 self.market_data_service is not None
172 and hasattr(self.market_data_service, '_stock_repo')
173 and self.market_data_service._stock_repo
174 ):
175 try:
176 cum_vol = realtime_data.get('누적거래량', '0')
177 vol_int = int(cum_vol) if cum_vol and cum_vol != 'N/A' else 0
178 self.market_data_service._stock_repo.update_realtime_data(
179 stock_code, float(current_price), vol_int
180 )
181 except Exception as e:
182 self.logger.warning(f"StockRepository 실시간 틱 캐시 갱신 실패: {e}")
184 # 콘솔 출력 (0.5초 스로틀링 — 이벤트 루프 blocking 최소화)
185 now = time.monotonic()
186 if now - self._last_console_print_time >= self._PRINT_THROTTLE_SEC: 186 ↛ exitline 186 didn't return from function 'dispatch_realtime_message' because the condition on line 186 was always true
187 self._last_console_print_time = now
188 change = realtime_data.get('전일대비', 'N/A')
189 change_sign = realtime_data.get('전일대비부호', 'N/A')
190 change_rate = realtime_data.get('전일대비율', 'N/A')
191 cumulative_volume = realtime_data.get('누적거래량', 'N/A')
192 trade_time = realtime_data.get('주식체결시간', 'N/A')
193 display_message = (
194 f"\r[실시간 체결 - {trade_time}] 종목: {stock_code}: 현재가 {current_price}원, "
195 f"전일대비: {change_sign}{change} ({change_rate}%), 누적량: {cumulative_volume}"
196 )
197 self.logger.debug(f"\r{display_message}{' ' * (80 - len(display_message))}", end="")
199 elif data.get('type') == 'realtime_quote':
200 quote_data = data.get('data', {})
201 stock_code = quote_data.get('유가증권단축종목코드', 'N/A')
202 askp1 = quote_data.get('매도호가1', 'N/A')
203 bidp1 = quote_data.get('매수호가1', 'N/A')
204 trade_time = quote_data.get('영업시간', 'N/A')
205 now = time.monotonic()
206 if now - self._last_console_print_time >= self._PRINT_THROTTLE_SEC: 206 ↛ exitline 206 didn't return from function 'dispatch_realtime_message' because the condition on line 206 was always true
207 self._last_console_print_time = now
208 display_message = (
209 f"[실시간 호가 - {trade_time}] 종목: {stock_code}: 매도1호가: {askp1}, 매수1호가: {bidp1}"
210 )
211 self.logger.debug(f"\r{display_message}{' ' * (80 - len(display_message))}", end="")
213 elif data.get('type') == 'signing_notice': 213 ↛ 214line 213 didn't jump to line 214 because the condition on line 213 was never true
214 notice_data = data.get('data', {})
215 order_num = notice_data.get('주문번호', 'N/A')
216 trade_qty = notice_data.get('체결수량', 'N/A')
217 trade_price = notice_data.get('체결단가', 'N/A')
218 trade_time = notice_data.get('주식체결시간', 'N/A')
219 self.logger.debug(
220 f"\n[체결통보] 주문: {order_num}, 수량: {trade_qty}, "
221 f"단가: {trade_price}, 시간: {trade_time}"
222 )
224 elif data.get('type') == 'realtime_program_trading': 224 ↛ 225line 224 didn't jump to line 225 because the condition on line 224 was never true
225 d = data.get('data', {})
226 t = d.get('주식체결시간', 'N/A')
227 ntby = d.get('순매수거래대금', '0')
228 now = time.monotonic()
229 if now - self._last_console_print_time >= self._PRINT_THROTTLE_SEC:
230 self._last_console_print_time = now
231 msg = f"[프로그램매매 - {t}] 순매수거래대금: {ntby}"
232 self.logger.debug(f"\r{msg}{' ' * max(0, 80 - len(msg))}", end="")
234 else:
235 self.logger.debug(
236 f"처리되지 않은 실시간 메시지: {data.get('tr_id')} - {data}"
237 )
239 def get_cached_realtime_price(self, code: str) -> Optional[Dict | str]:
240 """메모리 캐시에서 실시간 최신가 정보를 반환한다."""
241 return self._latest_prices.get(code)
243 # ── REST 조회 ─────────────────────────────────────────────────
245 async def handle_get_program_trading_history(self, code: str) -> ResCommonResponse:
246 """종목별 프로그램매매 추이 히스토리 조회 (REST, 실전 전용)."""
247 self.logger.info(f"StreamingService - 프로그램매매 히스토리 조회: {code}")
248 try:
249 return await self.broker.get_program_trade_by_stock_daily(code)
250 except Exception as e:
251 self.logger.error(f"프로그램매매 히스토리 조회 실패 ({code}): {e}")
252 return ResCommonResponse(
253 rt_cd=ErrorCode.API_ERROR.value,
254 msg1=str(e),
255 data=None,
256 )