Coverage for task / background / intraday / websocket_watchdog_task.py: 83%
195 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
1# task/background/intraday/websocket_watchdog_task.py
2"""
3프로그램매매 WebSocket 연결 감시 및 자동 복원 태스크.
4WebSocket 수신 태스크 상태를 주기적으로 감시하고,
5데이터 수신이 끊기면 재연결한다.
6"""
7import asyncio
8import logging
9import time
10from typing import Dict, List, Optional, TYPE_CHECKING
12from interfaces.schedulable_task import SchedulableTask, TaskPriority, TaskState
13from core.performance_profiler import PerformanceProfiler
14from services.notification_service import NotificationService
16if TYPE_CHECKING: 16 ↛ 17line 16 didn't jump to line 17 because the condition on line 16 was never true
17 from services.streaming_service import StreamingService
18 from services.program_trading_stream_service import ProgramTradingStreamService
19 from services.market_calendar_service import MarketCalendarService
20 from core.logger import StreamingEventLogger
22# 재구독 시 패킷 간 딜레이 (초) — 증권사 Rate Limit 방지
23SUBSCRIBE_DELAY_SEC = 0.2
26class WebSocketWatchdogTask(SchedulableTask):
27 """프로그램매매 WebSocket 연결을 감시·복원하는 백그라운드 태스크."""
29 def __init__(
30 self,
31 streaming_service: Optional["StreamingService"] = None,
32 realtime_data_service: Optional["ProgramTradingStreamService"] = None,
33 market_calendar_service: Optional["MarketCalendarService"] = None,
34 performance_profiler: Optional[PerformanceProfiler] = None,
35 notification_service: Optional[NotificationService] = None,
36 logger=None,
37 streaming_logger: Optional["StreamingEventLogger"] = None,
38 ):
39 self._streaming_service = streaming_service
40 self._realtime_data_service = realtime_data_service
41 self.mcs = market_calendar_service
42 self.pm = performance_profiler if performance_profiler else PerformanceProfiler(enabled=False)
43 self._ns = notification_service
44 self._logger = logger or logging.getLogger(__name__)
45 self._streaming_logger = streaming_logger
47 # SchedulableTask 상태
48 self._state: TaskState = TaskState.IDLE
49 self._tasks: List[asyncio.Task] = []
50 self._market_open: Optional[bool] = None # 가장 최근 시장 개장 여부 (워치독 루프에서 갱신)
51 self._intentionally_disconnected: bool = False # 장 마감으로 인한 의도적 연결 종료 여부
53 # ── SchedulableTask 인터페이스 구현 ────────────────────────
55 @property
56 def task_name(self) -> str:
57 return "websocket_watchdog"
59 @property
60 def priority(self) -> TaskPriority:
61 return TaskPriority.NORMAL
63 @property
64 def state(self) -> TaskState:
65 return self._state
67 async def start(self) -> None:
68 """WebSocket 워치독 + 구독 복원 태스크를 시작한다."""
69 if self._state == TaskState.RUNNING:
70 return
71 self._state = TaskState.RUNNING
73 # 1. 실시간 데이터 매니저 백그라운드 태스크 (데이터 정리 등)
74 if self._realtime_data_service: 74 ↛ 78line 74 didn't jump to line 78 because the condition on line 74 was always true
75 self._realtime_data_service.start_background_tasks()
77 # 2. 이전 구독 상태 자동 복원
78 if self._realtime_data_service: 78 ↛ 86line 78 didn't jump to line 86 because the condition on line 78 was always true
79 saved_codes = self._realtime_data_service.get_subscribed_codes()
80 if saved_codes: 80 ↛ 86line 80 didn't jump to line 86 because the condition on line 80 was always true
81 self._tasks.append(
82 asyncio.create_task(self._restore_program_trading(saved_codes))
83 )
85 # 3. 프로그램매매 연결 상태 워치독
86 self._tasks.append(
87 asyncio.create_task(self._program_trading_watchdog())
88 )
90 self._logger.info(f"WebSocketWatchdogTask 시작: {len(self._tasks)}개 태스크")
92 async def stop(self) -> None:
93 """모든 워치독 태스크를 취소하고 정리한다."""
94 self._logger.info(f"WebSocketWatchdogTask 종료 시작: {len(self._tasks)}개 태스크")
96 for task in self._tasks:
97 if not task.done():
98 task.cancel()
100 if self._tasks: 100 ↛ 103line 100 didn't jump to line 103 because the condition on line 100 was always true
101 await asyncio.gather(*self._tasks, return_exceptions=True)
103 self._tasks.clear()
105 # 실시간 데이터 매니저 종료
106 if self._realtime_data_service: 106 ↛ 109line 106 didn't jump to line 109 because the condition on line 106 was always true
107 await self._realtime_data_service.shutdown()
109 self._state = TaskState.STOPPED
110 self._logger.info("WebSocketWatchdogTask 종료 완료")
112 async def suspend(self) -> None:
113 """워치독을 일시 중지한다."""
114 if self._state == TaskState.RUNNING:
115 self._state = TaskState.SUSPENDED
116 self._logger.info("WebSocketWatchdogTask 일시 중지")
118 async def resume(self) -> None:
119 """워치독을 재개한다."""
120 if self._state == TaskState.SUSPENDED:
121 self._state = TaskState.RUNNING
122 self._logger.info("WebSocketWatchdogTask 재개")
124 # ── 프로그램매매 워치독 / 복원 / 재연결 ──────────────────────
126 async def _restore_program_trading(self, codes: list) -> None:
127 """앱 시작 시 이전 구독 상태를 자동 복원 (백그라운드)."""
128 self._logger.info(f"프로그램매매 구독 복원 시작: {codes}")
129 success_count = 0
130 failed_codes = []
131 for code in codes:
132 try:
133 # StreamingService가 내부에 콜백을 저장하므로 인자 없이 호출
134 connected = await self._streaming_service.connect_websocket()
135 if not connected:
136 self._logger.warning(f"프로그램매매 복원 실패 (WebSocket 연결 불가): {code}")
137 failed_codes.append(code)
138 continue
139 await self._streaming_service.subscribe_program_trading(code)
140 if self._streaming_logger: 140 ↛ 141line 140 didn't jump to line 141 because the condition on line 140 was never true
141 self._streaming_logger.log_pt_subscribe(code, reason="restore")
142 await self._streaming_service.subscribe_realtime_price(code)
143 if self._streaming_logger: 143 ↛ 144line 143 didn't jump to line 144 because the condition on line 143 was never true
144 self._streaming_logger.log_price_subscribe(code, reason="restore")
145 success_count += 1
146 # 증권사 Rate Limit 방지: 패킷 간 딜레이
147 await asyncio.sleep(SUBSCRIBE_DELAY_SEC)
148 except Exception as e:
149 self._logger.error(f"프로그램매매 복원 중 오류 ({code}): {e}")
150 failed_codes.append(code)
152 if failed_codes:
153 self._logger.warning(f"복원에 실패한 구독 종목을 상태에서 제거합니다: {failed_codes}")
154 for code in failed_codes:
155 self._realtime_data_service.remove_subscribed_code(code)
156 if self._streaming_logger: 156 ↛ 157line 156 didn't jump to line 157 because the condition on line 156 was never true
157 self._streaming_logger.log_pt_unsubscribe(code, reason="restore_failed")
158 self._streaming_logger.log_price_unsubscribe(code, reason="restore_failed")
160 if self._streaming_logger: 160 ↛ 161line 160 didn't jump to line 161 because the condition on line 160 was never true
161 self._streaming_logger.log_restore(
162 codes=codes,
163 success=success_count,
164 total=len(codes),
165 )
166 self._logger.info(f"프로그램매매 구독 복원 완료: {success_count}/{len(codes)}개 종목")
168 async def _program_trading_watchdog(self) -> None:
169 """프로그램매매 WebSocket 연결 상태를 주기적으로 감시하고, 데이터 수신이 끊기면 재연결."""
170 WATCHDOG_INTERVAL = 60 # 감시 주기 (초)
171 DATA_GAP_THRESHOLD = 300 # 데이터 미수신 허용 최대 시간 (초) — 소외주 오탐 방지를 위해 120→300
173 while True:
174 try:
175 await asyncio.sleep(WATCHDOG_INTERVAL)
177 # suspend 상태이면 감시 스킵
178 if self._state == TaskState.SUSPENDED: 178 ↛ 179line 178 didn't jump to line 179 because the condition on line 178 was never true
179 continue
181 if not self._realtime_data_service: 181 ↛ 182line 181 didn't jump to line 182 because the condition on line 181 was never true
182 continue
184 codes = self._realtime_data_service.get_subscribed_codes()
185 if not codes: 185 ↛ 186line 185 didn't jump to line 186 because the condition on line 185 was never true
186 continue # 구독 중인 종목 없으면 스킵
188 market_is_open = bool(self.mcs and await self.mcs.is_market_open_now())
189 self._market_open = market_is_open
190 if not market_is_open:
191 # 장 마감 시간이면 연결을 명시적으로 종료하여 리소스 정리
192 if self._streaming_service and self._streaming_service.broker.is_websocket_receive_alive():
193 self._logger.info("[워치독] 장 마감 시간이므로 웹소켓 연결을 종료합니다.")
194 await self._streaming_service.disconnect_websocket()
195 self._intentionally_disconnected = True
196 continue
198 # 조건 1: 수신 태스크가 죽었는지 확인
199 receive_alive = (
200 self._streaming_service is not None
201 and self._streaming_service.broker.is_websocket_receive_alive()
202 )
204 # 조건 2: 데이터 수신 갭 확인 (한 번이라도 데이터를 받은 적이 있을 때만)
205 last_ts = self._realtime_data_service.last_data_ts
206 data_gap = (time.time() - last_ts) if last_ts > 0 else 0.0
208 reconnect_trigger = None
209 if not receive_alive: 209 ↛ 210line 209 didn't jump to line 210 because the condition on line 209 was never true
210 if self._intentionally_disconnected:
211 self._logger.info("[워치독] 장 시작 — 신규 WebSocket 연결을 수립합니다.")
212 reconnect_trigger = "market_open"
213 else:
214 self._logger.warning("[워치독] WebSocket 수신 태스크가 종료됨. 재연결을 시도합니다.")
215 reconnect_trigger = "receive_task_dead"
216 elif last_ts > 0 and data_gap > DATA_GAP_THRESHOLD:
217 self._logger.warning(f"[워치독] {data_gap:.0f}초간 데이터 미수신 (임계값: {DATA_GAP_THRESHOLD}초). 재연결을 시도합니다.")
218 reconnect_trigger = f"data_gap_{data_gap:.0f}s"
220 if reconnect_trigger:
221 self._intentionally_disconnected = False
222 await self.force_reconnect_program_trading(trigger=reconnect_trigger)
224 except asyncio.CancelledError:
225 break
226 except Exception as e:
227 self._logger.error(f"[워치독] 오류 발생: {e}")
229 def get_progress(self) -> Dict:
230 """태스크 진행률 반환 (SchedulableTask 인터페이스 구현).
232 Watchdog 태스크는 배치 진행률이 없으므로 연결 상태 정보를 반환한다.
233 """
234 subscribed = 0
235 if self._realtime_data_service: 235 ↛ 239line 235 didn't jump to line 239 because the condition on line 235 was always true
236 codes = self._realtime_data_service.get_subscribed_codes()
237 subscribed = len(codes) if codes else 0
239 last_ts = 0.0
240 data_gap = None
241 if self._realtime_data_service: 241 ↛ 246line 241 didn't jump to line 246 because the condition on line 241 was always true
242 last_ts = getattr(self._realtime_data_service, "last_data_ts", 0.0)
243 if last_ts > 0:
244 data_gap = round(time.time() - last_ts, 1)
246 return {
247 "running": self._state == TaskState.RUNNING,
248 "subscribed_codes": subscribed,
249 "data_gap_sec": data_gap,
250 "market_open": self._market_open,
251 }
253 async def force_reconnect_program_trading(self, trigger: str = "manual") -> None:
254 """WebSocket 연결을 강제로 끊고 재연결 + 재구독.
256 Args:
257 trigger: 재연결 원인 ("receive_task_dead" | "data_gap_{N}s" | "manual")
258 """
259 if not self._realtime_data_service:
260 return
262 codes = self._realtime_data_service.get_subscribed_codes()
263 if not codes:
264 return
266 t_start = self.pm.start_timer()
267 self._logger.info(f"[워치독] 강제 재연결 시작 (구독 종목: {codes})")
268 try:
269 # 1. 기존 WebSocket 연결 강제 종료
270 await self._streaming_service.disconnect_websocket()
271 except Exception as e:
272 self._logger.warning(f"[워치독] 기존 연결 종료 중 오류 (무시): {e}")
274 # 2. 새 연결 + 재구독 (StreamingService가 내부에 콜백을 저장하므로 인자 없이 호출)
275 success_count = 0
276 failed_codes = []
277 for code in codes:
278 try:
279 connected = await self._streaming_service.connect_websocket()
280 if not connected:
281 self._logger.warning(f"[워치독] 재연결 실패: {code}")
282 failed_codes.append(code)
283 continue
284 await self._streaming_service.subscribe_program_trading(code)
285 if self._streaming_logger: 285 ↛ 286line 285 didn't jump to line 286 because the condition on line 285 was never true
286 self._streaming_logger.log_pt_subscribe(code, reason="reconnect")
287 await self._streaming_service.subscribe_realtime_price(code)
288 if self._streaming_logger: 288 ↛ 289line 288 didn't jump to line 289 because the condition on line 288 was never true
289 self._streaming_logger.log_price_subscribe(code, reason="reconnect")
290 success_count += 1
291 # 증권사 Rate Limit 방지: 패킷 간 딜레이
292 await asyncio.sleep(SUBSCRIBE_DELAY_SEC)
293 except Exception as e:
294 self._logger.error(f"[워치독] 재구독 중 오류 ({code}): {e}")
295 failed_codes.append(code)
297 if failed_codes:
298 self._logger.warning(f"[워치독] 재구독에 실패한 종목을 상태에서 제거합니다: {failed_codes}")
299 for code in failed_codes:
300 self._realtime_data_service.remove_subscribed_code(code)
301 if self._streaming_logger: 301 ↛ 302line 301 didn't jump to line 302 because the condition on line 301 was never true
302 self._streaming_logger.log_pt_unsubscribe(code, reason="reconnect_failed")
303 self._streaming_logger.log_price_unsubscribe(code, reason="reconnect_failed")
305 if self._streaming_logger: 305 ↛ 306line 305 didn't jump to line 306 because the condition on line 305 was never true
306 self._streaming_logger.log_reconnect(
307 trigger=trigger,
308 codes=codes,
309 success=success_count,
310 total=len(codes),
311 )
312 self.pm.log_timer(f"WebSocketWatchdogTask.force_reconnect_program_trading({success_count}/{len(codes)})", t_start)
313 self._logger.info(f"[워치독] 강제 재연결 완료: {success_count}/{len(codes)}개 종목")