Coverage for task / background / intraday / websocket_watchdog_task.py: 83%

195 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-04 15:08 +0000

1# task/background/intraday/websocket_watchdog_task.py 

2""" 

3프로그램매매 WebSocket 연결 감시 및 자동 복원 태스크. 

4WebSocket 수신 태스크 상태를 주기적으로 감시하고, 

5데이터 수신이 끊기면 재연결한다. 

6""" 

7import asyncio 

8import logging 

9import time 

10from typing import Dict, List, Optional, TYPE_CHECKING 

11 

12from interfaces.schedulable_task import SchedulableTask, TaskPriority, TaskState 

13from core.performance_profiler import PerformanceProfiler 

14from services.notification_service import NotificationService 

15 

16if TYPE_CHECKING: 16 ↛ 17line 16 didn't jump to line 17 because the condition on line 16 was never true

17 from services.streaming_service import StreamingService 

18 from services.program_trading_stream_service import ProgramTradingStreamService 

19 from services.market_calendar_service import MarketCalendarService 

20 from core.logger import StreamingEventLogger 

21 

22# 재구독 시 패킷 간 딜레이 (초) — 증권사 Rate Limit 방지 

23SUBSCRIBE_DELAY_SEC = 0.2 

24 

25 

26class WebSocketWatchdogTask(SchedulableTask): 

27 """프로그램매매 WebSocket 연결을 감시·복원하는 백그라운드 태스크.""" 

28 

29 def __init__( 

30 self, 

31 streaming_service: Optional["StreamingService"] = None, 

32 realtime_data_service: Optional["ProgramTradingStreamService"] = None, 

33 market_calendar_service: Optional["MarketCalendarService"] = None, 

34 performance_profiler: Optional[PerformanceProfiler] = None, 

35 notification_service: Optional[NotificationService] = None, 

36 logger=None, 

37 streaming_logger: Optional["StreamingEventLogger"] = None, 

38 ): 

39 self._streaming_service = streaming_service 

40 self._realtime_data_service = realtime_data_service 

41 self.mcs = market_calendar_service 

42 self.pm = performance_profiler if performance_profiler else PerformanceProfiler(enabled=False) 

43 self._ns = notification_service 

44 self._logger = logger or logging.getLogger(__name__) 

45 self._streaming_logger = streaming_logger 

46 

47 # SchedulableTask 상태 

48 self._state: TaskState = TaskState.IDLE 

49 self._tasks: List[asyncio.Task] = [] 

50 self._market_open: Optional[bool] = None # 가장 최근 시장 개장 여부 (워치독 루프에서 갱신) 

51 self._intentionally_disconnected: bool = False # 장 마감으로 인한 의도적 연결 종료 여부 

52 

53 # ── SchedulableTask 인터페이스 구현 ──────────────────────── 

54 

55 @property 

56 def task_name(self) -> str: 

57 return "websocket_watchdog" 

58 

59 @property 

60 def priority(self) -> TaskPriority: 

61 return TaskPriority.NORMAL 

62 

63 @property 

64 def state(self) -> TaskState: 

65 return self._state 

66 

67 async def start(self) -> None: 

68 """WebSocket 워치독 + 구독 복원 태스크를 시작한다.""" 

69 if self._state == TaskState.RUNNING: 

70 return 

71 self._state = TaskState.RUNNING 

72 

73 # 1. 실시간 데이터 매니저 백그라운드 태스크 (데이터 정리 등) 

74 if self._realtime_data_service: 74 ↛ 78line 74 didn't jump to line 78 because the condition on line 74 was always true

75 self._realtime_data_service.start_background_tasks() 

76 

77 # 2. 이전 구독 상태 자동 복원 

78 if self._realtime_data_service: 78 ↛ 86line 78 didn't jump to line 86 because the condition on line 78 was always true

79 saved_codes = self._realtime_data_service.get_subscribed_codes() 

80 if saved_codes: 80 ↛ 86line 80 didn't jump to line 86 because the condition on line 80 was always true

81 self._tasks.append( 

82 asyncio.create_task(self._restore_program_trading(saved_codes)) 

83 ) 

84 

85 # 3. 프로그램매매 연결 상태 워치독 

86 self._tasks.append( 

87 asyncio.create_task(self._program_trading_watchdog()) 

88 ) 

89 

90 self._logger.info(f"WebSocketWatchdogTask 시작: {len(self._tasks)}개 태스크") 

91 

92 async def stop(self) -> None: 

93 """모든 워치독 태스크를 취소하고 정리한다.""" 

94 self._logger.info(f"WebSocketWatchdogTask 종료 시작: {len(self._tasks)}개 태스크") 

95 

96 for task in self._tasks: 

97 if not task.done(): 

98 task.cancel() 

99 

100 if self._tasks: 100 ↛ 103line 100 didn't jump to line 103 because the condition on line 100 was always true

101 await asyncio.gather(*self._tasks, return_exceptions=True) 

102 

103 self._tasks.clear() 

104 

105 # 실시간 데이터 매니저 종료 

106 if self._realtime_data_service: 106 ↛ 109line 106 didn't jump to line 109 because the condition on line 106 was always true

107 await self._realtime_data_service.shutdown() 

108 

109 self._state = TaskState.STOPPED 

110 self._logger.info("WebSocketWatchdogTask 종료 완료") 

111 

112 async def suspend(self) -> None: 

113 """워치독을 일시 중지한다.""" 

114 if self._state == TaskState.RUNNING: 

115 self._state = TaskState.SUSPENDED 

116 self._logger.info("WebSocketWatchdogTask 일시 중지") 

117 

118 async def resume(self) -> None: 

119 """워치독을 재개한다.""" 

120 if self._state == TaskState.SUSPENDED: 

121 self._state = TaskState.RUNNING 

122 self._logger.info("WebSocketWatchdogTask 재개") 

123 

124 # ── 프로그램매매 워치독 / 복원 / 재연결 ────────────────────── 

125 

126 async def _restore_program_trading(self, codes: list) -> None: 

127 """앱 시작 시 이전 구독 상태를 자동 복원 (백그라운드).""" 

128 self._logger.info(f"프로그램매매 구독 복원 시작: {codes}") 

129 success_count = 0 

130 failed_codes = [] 

131 for code in codes: 

132 try: 

133 # StreamingService가 내부에 콜백을 저장하므로 인자 없이 호출 

134 connected = await self._streaming_service.connect_websocket() 

135 if not connected: 

136 self._logger.warning(f"프로그램매매 복원 실패 (WebSocket 연결 불가): {code}") 

137 failed_codes.append(code) 

138 continue 

139 await self._streaming_service.subscribe_program_trading(code) 

140 if self._streaming_logger: 140 ↛ 141line 140 didn't jump to line 141 because the condition on line 140 was never true

141 self._streaming_logger.log_pt_subscribe(code, reason="restore") 

142 await self._streaming_service.subscribe_realtime_price(code) 

143 if self._streaming_logger: 143 ↛ 144line 143 didn't jump to line 144 because the condition on line 143 was never true

144 self._streaming_logger.log_price_subscribe(code, reason="restore") 

145 success_count += 1 

146 # 증권사 Rate Limit 방지: 패킷 간 딜레이 

147 await asyncio.sleep(SUBSCRIBE_DELAY_SEC) 

148 except Exception as e: 

149 self._logger.error(f"프로그램매매 복원 중 오류 ({code}): {e}") 

150 failed_codes.append(code) 

151 

152 if failed_codes: 

153 self._logger.warning(f"복원에 실패한 구독 종목을 상태에서 제거합니다: {failed_codes}") 

154 for code in failed_codes: 

155 self._realtime_data_service.remove_subscribed_code(code) 

156 if self._streaming_logger: 156 ↛ 157line 156 didn't jump to line 157 because the condition on line 156 was never true

157 self._streaming_logger.log_pt_unsubscribe(code, reason="restore_failed") 

158 self._streaming_logger.log_price_unsubscribe(code, reason="restore_failed") 

159 

160 if self._streaming_logger: 160 ↛ 161line 160 didn't jump to line 161 because the condition on line 160 was never true

161 self._streaming_logger.log_restore( 

162 codes=codes, 

163 success=success_count, 

164 total=len(codes), 

165 ) 

166 self._logger.info(f"프로그램매매 구독 복원 완료: {success_count}/{len(codes)}개 종목") 

167 

168 async def _program_trading_watchdog(self) -> None: 

169 """프로그램매매 WebSocket 연결 상태를 주기적으로 감시하고, 데이터 수신이 끊기면 재연결.""" 

170 WATCHDOG_INTERVAL = 60 # 감시 주기 (초) 

171 DATA_GAP_THRESHOLD = 300 # 데이터 미수신 허용 최대 시간 (초) — 소외주 오탐 방지를 위해 120→300 

172 

173 while True: 

174 try: 

175 await asyncio.sleep(WATCHDOG_INTERVAL) 

176 

177 # suspend 상태이면 감시 스킵 

178 if self._state == TaskState.SUSPENDED: 178 ↛ 179line 178 didn't jump to line 179 because the condition on line 178 was never true

179 continue 

180 

181 if not self._realtime_data_service: 181 ↛ 182line 181 didn't jump to line 182 because the condition on line 181 was never true

182 continue 

183 

184 codes = self._realtime_data_service.get_subscribed_codes() 

185 if not codes: 185 ↛ 186line 185 didn't jump to line 186 because the condition on line 185 was never true

186 continue # 구독 중인 종목 없으면 스킵 

187 

188 market_is_open = bool(self.mcs and await self.mcs.is_market_open_now()) 

189 self._market_open = market_is_open 

190 if not market_is_open: 

191 # 장 마감 시간이면 연결을 명시적으로 종료하여 리소스 정리 

192 if self._streaming_service and self._streaming_service.broker.is_websocket_receive_alive(): 

193 self._logger.info("[워치독] 장 마감 시간이므로 웹소켓 연결을 종료합니다.") 

194 await self._streaming_service.disconnect_websocket() 

195 self._intentionally_disconnected = True 

196 continue 

197 

198 # 조건 1: 수신 태스크가 죽었는지 확인 

199 receive_alive = ( 

200 self._streaming_service is not None 

201 and self._streaming_service.broker.is_websocket_receive_alive() 

202 ) 

203 

204 # 조건 2: 데이터 수신 갭 확인 (한 번이라도 데이터를 받은 적이 있을 때만) 

205 last_ts = self._realtime_data_service.last_data_ts 

206 data_gap = (time.time() - last_ts) if last_ts > 0 else 0.0 

207 

208 reconnect_trigger = None 

209 if not receive_alive: 209 ↛ 210line 209 didn't jump to line 210 because the condition on line 209 was never true

210 if self._intentionally_disconnected: 

211 self._logger.info("[워치독] 장 시작 — 신규 WebSocket 연결을 수립합니다.") 

212 reconnect_trigger = "market_open" 

213 else: 

214 self._logger.warning("[워치독] WebSocket 수신 태스크가 종료됨. 재연결을 시도합니다.") 

215 reconnect_trigger = "receive_task_dead" 

216 elif last_ts > 0 and data_gap > DATA_GAP_THRESHOLD: 

217 self._logger.warning(f"[워치독] {data_gap:.0f}초간 데이터 미수신 (임계값: {DATA_GAP_THRESHOLD}초). 재연결을 시도합니다.") 

218 reconnect_trigger = f"data_gap_{data_gap:.0f}s" 

219 

220 if reconnect_trigger: 

221 self._intentionally_disconnected = False 

222 await self.force_reconnect_program_trading(trigger=reconnect_trigger) 

223 

224 except asyncio.CancelledError: 

225 break 

226 except Exception as e: 

227 self._logger.error(f"[워치독] 오류 발생: {e}") 

228 

229 def get_progress(self) -> Dict: 

230 """태스크 진행률 반환 (SchedulableTask 인터페이스 구현). 

231 

232 Watchdog 태스크는 배치 진행률이 없으므로 연결 상태 정보를 반환한다. 

233 """ 

234 subscribed = 0 

235 if self._realtime_data_service: 235 ↛ 239line 235 didn't jump to line 239 because the condition on line 235 was always true

236 codes = self._realtime_data_service.get_subscribed_codes() 

237 subscribed = len(codes) if codes else 0 

238 

239 last_ts = 0.0 

240 data_gap = None 

241 if self._realtime_data_service: 241 ↛ 246line 241 didn't jump to line 246 because the condition on line 241 was always true

242 last_ts = getattr(self._realtime_data_service, "last_data_ts", 0.0) 

243 if last_ts > 0: 

244 data_gap = round(time.time() - last_ts, 1) 

245 

246 return { 

247 "running": self._state == TaskState.RUNNING, 

248 "subscribed_codes": subscribed, 

249 "data_gap_sec": data_gap, 

250 "market_open": self._market_open, 

251 } 

252 

253 async def force_reconnect_program_trading(self, trigger: str = "manual") -> None: 

254 """WebSocket 연결을 강제로 끊고 재연결 + 재구독. 

255 

256 Args: 

257 trigger: 재연결 원인 ("receive_task_dead" | "data_gap_{N}s" | "manual") 

258 """ 

259 if not self._realtime_data_service: 

260 return 

261 

262 codes = self._realtime_data_service.get_subscribed_codes() 

263 if not codes: 

264 return 

265 

266 t_start = self.pm.start_timer() 

267 self._logger.info(f"[워치독] 강제 재연결 시작 (구독 종목: {codes})") 

268 try: 

269 # 1. 기존 WebSocket 연결 강제 종료 

270 await self._streaming_service.disconnect_websocket() 

271 except Exception as e: 

272 self._logger.warning(f"[워치독] 기존 연결 종료 중 오류 (무시): {e}") 

273 

274 # 2. 새 연결 + 재구독 (StreamingService가 내부에 콜백을 저장하므로 인자 없이 호출) 

275 success_count = 0 

276 failed_codes = [] 

277 for code in codes: 

278 try: 

279 connected = await self._streaming_service.connect_websocket() 

280 if not connected: 

281 self._logger.warning(f"[워치독] 재연결 실패: {code}") 

282 failed_codes.append(code) 

283 continue 

284 await self._streaming_service.subscribe_program_trading(code) 

285 if self._streaming_logger: 285 ↛ 286line 285 didn't jump to line 286 because the condition on line 285 was never true

286 self._streaming_logger.log_pt_subscribe(code, reason="reconnect") 

287 await self._streaming_service.subscribe_realtime_price(code) 

288 if self._streaming_logger: 288 ↛ 289line 288 didn't jump to line 289 because the condition on line 288 was never true

289 self._streaming_logger.log_price_subscribe(code, reason="reconnect") 

290 success_count += 1 

291 # 증권사 Rate Limit 방지: 패킷 간 딜레이 

292 await asyncio.sleep(SUBSCRIBE_DELAY_SEC) 

293 except Exception as e: 

294 self._logger.error(f"[워치독] 재구독 중 오류 ({code}): {e}") 

295 failed_codes.append(code) 

296 

297 if failed_codes: 

298 self._logger.warning(f"[워치독] 재구독에 실패한 종목을 상태에서 제거합니다: {failed_codes}") 

299 for code in failed_codes: 

300 self._realtime_data_service.remove_subscribed_code(code) 

301 if self._streaming_logger: 301 ↛ 302line 301 didn't jump to line 302 because the condition on line 301 was never true

302 self._streaming_logger.log_pt_unsubscribe(code, reason="reconnect_failed") 

303 self._streaming_logger.log_price_unsubscribe(code, reason="reconnect_failed") 

304 

305 if self._streaming_logger: 305 ↛ 306line 305 didn't jump to line 306 because the condition on line 305 was never true

306 self._streaming_logger.log_reconnect( 

307 trigger=trigger, 

308 codes=codes, 

309 success=success_count, 

310 total=len(codes), 

311 ) 

312 self.pm.log_timer(f"WebSocketWatchdogTask.force_reconnect_program_trading({success_count}/{len(codes)})", t_start) 

313 self._logger.info(f"[워치독] 강제 재연결 완료: {success_count}/{len(codes)}개 종목")