Coverage for services / streaming_service.py: 78%

134 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-04 15:08 +0000

1""" 

2WebSocket 스트리밍 관련 기능을 담당하는 서비스. 

3 

4역할: 

5 - WebSocket 연결/해제 수명주기 관리 (connect, disconnect) 

6 - 실시간 데이터 구독/해지 (subscribe/unsubscribe) 

7 - 수신 메시지 dispatch 및 최신가 메모리 캐시 유지 

8 - 프로그램매매 히스토리 조회 (REST) 

9 

10ProgramTradingStreamService와의 역할 구분: 

11 - StreamingService : WebSocket 연결·구독·메시지 처리 (프로토콜 레이어) 

12 - ProgramTradingStreamService: 프로그램매매 데이터의 저장·버퍼링·SSE 배포 (데이터 레이어) 

13""" 

14from __future__ import annotations 

15 

16import time 

17from typing import Optional, Dict, TYPE_CHECKING 

18 

19from common.types import ResCommonResponse, ErrorCode 

20 

21if TYPE_CHECKING: 21 ↛ 22line 21 didn't jump to line 22 because the condition on line 21 was never true

22 from brokers.broker_api_wrapper import BrokerAPIWrapper 

23 from services.market_data_service import MarketDataService 

24 from core.logger import StreamingEventLogger 

25 

26 

27class StreamingService: 

28 """ 

29 WebSocket 연결·구독·메시지 dispatch를 담당하는 서비스. 

30 BrokerAPIWrapper를 통해 실제 WebSocket API에 위임한다. 

31 """ 

32 

33 def __init__( 

34 self, 

35 broker_api_wrapper: "BrokerAPIWrapper", 

36 logger, 

37 market_clock, 

38 market_data_service: Optional["MarketDataService"] = None, 

39 streaming_logger: Optional["StreamingEventLogger"] = None, 

40 ): 

41 self.broker = broker_api_wrapper 

42 self.logger = logger 

43 self.market_clock = market_clock 

44 self.market_data_service = market_data_service 

45 self._streaming_logger = streaming_logger 

46 self._latest_prices: Dict[str, dict | str] = {} 

47 self._last_console_print_time: float = 0.0 

48 self._PRINT_THROTTLE_SEC: float = 0.5 

49 self._callback = None # 재연결 시 콜백 유실 방지용 저장 

50 

51 # ── 연결 수명주기 ────────────────────────────────────────────── 

52 

53 async def connect_websocket(self, callback=None): 

54 """WebSocket 연결 (BrokerAPIWrapper 위임). 

55 

56 callback이 전달되면 내부에 저장하여 이후 재연결 시에도 동일한 콜백을 사용한다. 

57 callback이 None이면 기존에 저장된 콜백을 사용한다. 

58 """ 

59 if callback is not None: 

60 self._callback = callback 

61 result = await self.broker.connect_websocket(self._callback) 

62 if result and self._streaming_logger: 62 ↛ 63line 62 didn't jump to line 63 because the condition on line 62 was never true

63 self._streaming_logger.log_connect() 

64 return result 

65 

66 async def disconnect_websocket(self): 

67 """WebSocket 연결 해제 (BrokerAPIWrapper 위임).""" 

68 result = await self.broker.disconnect_websocket() 

69 if self._streaming_logger: 69 ↛ 70line 69 didn't jump to line 70 because the condition on line 69 was never true

70 self._streaming_logger.log_disconnect() 

71 return result 

72 

73 # ── 구독 / 해지 ─────────────────────────────────────────────── 

74 

75 async def subscribe_program_trading(self, code: str): 

76 """프로그램매매 실시간 구독 (BrokerAPIWrapper 위임).""" 

77 return await self.broker.subscribe_program_trading(code) 

78 

79 async def unsubscribe_program_trading(self, code: str): 

80 """프로그램매매 구독 해지 (BrokerAPIWrapper 위임).""" 

81 return await self.broker.unsubscribe_program_trading(code) 

82 

83 async def subscribe_realtime_price(self, code: str): 

84 """실시간 체결가 구독 (BrokerAPIWrapper 위임).""" 

85 return await self.broker.subscribe_realtime_price(code) 

86 

87 async def unsubscribe_realtime_price(self, code: str): 

88 """실시간 체결가 구독 해지 (BrokerAPIWrapper 위임).""" 

89 return await self.broker.unsubscribe_realtime_price(code) 

90 

91 async def subscribe_unified_price(self, code: str) -> bool: 

92 """실시간 통합 체결가(H0UNCNT0) 구독 — PriceSubscriptionService 전용.""" 

93 return await self.broker.subscribe_unified_price(code) 

94 

95 async def unsubscribe_unified_price(self, code: str) -> bool: 

96 """실시간 통합 체결가(H0UNCNT0) 구독 해지 — PriceSubscriptionService 전용.""" 

97 return await self.broker.unsubscribe_unified_price(code) 

98 

99 # ── 고수준 스트림 핸들러 ────────────────────────────────────── 

100 

101 async def handle_program_trading_stream(self, stock_code: str, duration: int = 60) -> None: 

102 """ 

103 프로그램매매(H0STPGM0) 구독 → duration초 수신 → 해지. 

104 CLI 등 단발성 스트리밍 용도. 

105 """ 

106 await self.connect_websocket() 

107 await self.subscribe_program_trading(stock_code) 

108 try: 

109 await self.market_clock.async_sleep(duration) 

110 finally: 

111 await self.unsubscribe_program_trading(stock_code) 

112 await self.disconnect_websocket() 

113 

114 async def handle_realtime_stream( 

115 self, 

116 stock_codes: list[str], 

117 fields: list[str], 

118 duration: int = 30, 

119 ) -> None: 

120 """실시간 스트림 구독 및 처리 (price / quote 필드 지원).""" 

121 self.logger.info( 

122 f"StreamingService - 실시간 스트림 요청: 종목={stock_codes}, 필드={fields}, 시간={duration}s" 

123 ) 

124 try: 

125 await self.connect_websocket() 

126 for code in stock_codes: 

127 if "price" in fields: 127 ↛ 129line 127 didn't jump to line 129 because the condition on line 127 was always true

128 await self.subscribe_realtime_price(code) 

129 if "quote" in fields: 129 ↛ 126line 129 didn't jump to line 126 because the condition on line 129 was always true

130 await self.broker.subscribe_realtime_quote(code) 

131 

132 from datetime import datetime, timedelta 

133 start_time = datetime.now() 

134 while (datetime.now() - start_time) < timedelta(seconds=duration): 134 ↛ 135line 134 didn't jump to line 135 because the condition on line 134 was never true

135 await self.market_clock.async_sleep(1) 

136 except Exception as e: 

137 self.logger.exception(f"실시간 스트림 처리 중 오류 발생: {str(e)}") 

138 finally: 

139 for code in stock_codes: 

140 if "price" in fields: 140 ↛ 142line 140 didn't jump to line 142 because the condition on line 140 was always true

141 await self.unsubscribe_realtime_price(code) 

142 if "quote" in fields: 

143 await self.broker.unsubscribe_realtime_quote(code) 

144 await self.disconnect_websocket() 

145 self.logger.info("실시간 스트림 종료") 

146 

147 # ── 메시지 dispatch 및 캐시 ─────────────────────────────────── 

148 

149 def dispatch_realtime_message(self, data: dict) -> None: 

150 """실시간 WebSocket 메시지를 파싱하여 내부 최신가 캐시를 갱신한다.""" 

151 self.logger.debug( 

152 f"실시간 데이터 수신: Type={data.get('type')}, TR_ID={data.get('tr_id')}, Data={data.get('data')}" 

153 ) 

154 

155 if data.get('type') == 'realtime_price': 

156 realtime_data = data.get('data', {}) 

157 stock_code = realtime_data.get('유가증권단축종목코드') 

158 current_price = realtime_data.get('주식현재가') 

159 

160 if stock_code and current_price: 160 ↛ 185line 160 didn't jump to line 185 because the condition on line 160 was always true

161 self._latest_prices[stock_code] = { 

162 "price": current_price, 

163 "change": realtime_data.get('전일대비', '0'), 

164 "rate": realtime_data.get('전일대비율', '0.00'), 

165 "sign": realtime_data.get('전일대비부호', '3'), 

166 "received_at": time.time(), 

167 } 

168 

169 # StockRepository 실시간 틱 캐시 즉시 반영 

170 if ( 170 ↛ 185line 170 didn't jump to line 185 because the condition on line 170 was always true

171 self.market_data_service is not None 

172 and hasattr(self.market_data_service, '_stock_repo') 

173 and self.market_data_service._stock_repo 

174 ): 

175 try: 

176 cum_vol = realtime_data.get('누적거래량', '0') 

177 vol_int = int(cum_vol) if cum_vol and cum_vol != 'N/A' else 0 

178 self.market_data_service._stock_repo.update_realtime_data( 

179 stock_code, float(current_price), vol_int 

180 ) 

181 except Exception as e: 

182 self.logger.warning(f"StockRepository 실시간 틱 캐시 갱신 실패: {e}") 

183 

184 # 콘솔 출력 (0.5초 스로틀링 — 이벤트 루프 blocking 최소화) 

185 now = time.monotonic() 

186 if now - self._last_console_print_time >= self._PRINT_THROTTLE_SEC: 186 ↛ exitline 186 didn't return from function 'dispatch_realtime_message' because the condition on line 186 was always true

187 self._last_console_print_time = now 

188 change = realtime_data.get('전일대비', 'N/A') 

189 change_sign = realtime_data.get('전일대비부호', 'N/A') 

190 change_rate = realtime_data.get('전일대비율', 'N/A') 

191 cumulative_volume = realtime_data.get('누적거래량', 'N/A') 

192 trade_time = realtime_data.get('주식체결시간', 'N/A') 

193 display_message = ( 

194 f"\r[실시간 체결 - {trade_time}] 종목: {stock_code}: 현재가 {current_price}원, " 

195 f"전일대비: {change_sign}{change} ({change_rate}%), 누적량: {cumulative_volume}" 

196 ) 

197 self.logger.debug(f"\r{display_message}{' ' * (80 - len(display_message))}", end="") 

198 

199 elif data.get('type') == 'realtime_quote': 

200 quote_data = data.get('data', {}) 

201 stock_code = quote_data.get('유가증권단축종목코드', 'N/A') 

202 askp1 = quote_data.get('매도호가1', 'N/A') 

203 bidp1 = quote_data.get('매수호가1', 'N/A') 

204 trade_time = quote_data.get('영업시간', 'N/A') 

205 now = time.monotonic() 

206 if now - self._last_console_print_time >= self._PRINT_THROTTLE_SEC: 206 ↛ exitline 206 didn't return from function 'dispatch_realtime_message' because the condition on line 206 was always true

207 self._last_console_print_time = now 

208 display_message = ( 

209 f"[실시간 호가 - {trade_time}] 종목: {stock_code}: 매도1호가: {askp1}, 매수1호가: {bidp1}" 

210 ) 

211 self.logger.debug(f"\r{display_message}{' ' * (80 - len(display_message))}", end="") 

212 

213 elif data.get('type') == 'signing_notice': 213 ↛ 214line 213 didn't jump to line 214 because the condition on line 213 was never true

214 notice_data = data.get('data', {}) 

215 order_num = notice_data.get('주문번호', 'N/A') 

216 trade_qty = notice_data.get('체결수량', 'N/A') 

217 trade_price = notice_data.get('체결단가', 'N/A') 

218 trade_time = notice_data.get('주식체결시간', 'N/A') 

219 self.logger.debug( 

220 f"\n[체결통보] 주문: {order_num}, 수량: {trade_qty}, " 

221 f"단가: {trade_price}, 시간: {trade_time}" 

222 ) 

223 

224 elif data.get('type') == 'realtime_program_trading': 224 ↛ 225line 224 didn't jump to line 225 because the condition on line 224 was never true

225 d = data.get('data', {}) 

226 t = d.get('주식체결시간', 'N/A') 

227 ntby = d.get('순매수거래대금', '0') 

228 now = time.monotonic() 

229 if now - self._last_console_print_time >= self._PRINT_THROTTLE_SEC: 

230 self._last_console_print_time = now 

231 msg = f"[프로그램매매 - {t}] 순매수거래대금: {ntby}" 

232 self.logger.debug(f"\r{msg}{' ' * max(0, 80 - len(msg))}", end="") 

233 

234 else: 

235 self.logger.debug( 

236 f"처리되지 않은 실시간 메시지: {data.get('tr_id')} - {data}" 

237 ) 

238 

239 def get_cached_realtime_price(self, code: str) -> Optional[Dict | str]: 

240 """메모리 캐시에서 실시간 최신가 정보를 반환한다.""" 

241 return self._latest_prices.get(code) 

242 

243 # ── REST 조회 ───────────────────────────────────────────────── 

244 

245 async def handle_get_program_trading_history(self, code: str) -> ResCommonResponse: 

246 """종목별 프로그램매매 추이 히스토리 조회 (REST, 실전 전용).""" 

247 self.logger.info(f"StreamingService - 프로그램매매 히스토리 조회: {code}") 

248 try: 

249 return await self.broker.get_program_trade_by_stock_daily(code) 

250 except Exception as e: 

251 self.logger.error(f"프로그램매매 히스토리 조회 실패 ({code}): {e}") 

252 return ResCommonResponse( 

253 rt_cd=ErrorCode.API_ERROR.value, 

254 msg1=str(e), 

255 data=None, 

256 )