Coverage for task / background / always_on / notification_queue_task.py: 89%
80 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
1"""
2NotificationQueueTask — 알림 외부 핸들러(Telegram 등) 큐 소비 태스크.
4NotificationService.emit()이 enqueue한 이벤트를 저우선순위 백그라운드에서
5순차적으로 소비하여 외부 핸들러에 전달한다.
7idle 감지 전략:
8 Layer 1 — ForegroundScheduler가 포그라운드 작업 시작 시 BackgroundScheduler.suspend_all()을
9 호출하므로, 이 태스크도 자동으로 SUSPENDED 상태로 전환된다.
10 Layer 2 — asyncio cooperative scheduling: asyncio.wait_for(timeout=1.0) 대기 중 다른
11 코루틴이 실행되고, 이벤트 처리 후 asyncio.sleep(poll_interval)로 추가 양보한다.
12"""
13from __future__ import annotations
15import asyncio
16import logging
17from typing import Dict, List, Optional, TYPE_CHECKING
19from interfaces.schedulable_task import SchedulableTask, TaskPriority, TaskState
21if TYPE_CHECKING: 21 ↛ 22line 21 didn't jump to line 22 because the condition on line 21 was never true
22 from services.notification_service import NotificationService
25class NotificationQueueTask(SchedulableTask):
26 """NotificationService의 외부 핸들러 큐를 소비하는 상시 동작 태스크."""
28 def __init__(
29 self,
30 notification_service: "NotificationService",
31 poll_interval: float = 1.0,
32 logger: Optional[logging.Logger] = None,
33 ) -> None:
34 self._ns = notification_service
35 self._poll_interval = poll_interval
36 self._logger = logger or logging.getLogger(__name__)
37 self._state: TaskState = TaskState.IDLE
38 self._tasks: List[asyncio.Task] = []
39 self._resume_event: Optional[asyncio.Event] = None
41 # ── SchedulableTask interface ─────────────────────────────────
43 @property
44 def task_name(self) -> str:
45 return "notification_queue"
47 @property
48 def priority(self) -> TaskPriority:
49 return TaskPriority.LOW
51 @property
52 def state(self) -> TaskState:
53 return self._state
55 async def start(self) -> None:
56 if self._state == TaskState.RUNNING:
57 return
58 self._resume_event = asyncio.Event()
59 self._resume_event.set() # 초기 상태는 RUNNING이므로 set
60 self._state = TaskState.RUNNING
61 self._tasks.append(asyncio.create_task(self._drain_loop()))
62 self._logger.info("NotificationQueueTask 시작")
64 async def stop(self) -> None:
65 self._logger.info(f"NotificationQueueTask 종료 시작: {len(self._tasks)}개 태스크")
66 # SUSPENDED 상태라면 drain_loop이 event.wait()에서 블로킹 중이므로 set() 후 cancel
67 if self._resume_event is not None: 67 ↛ 69line 67 didn't jump to line 69 because the condition on line 67 was always true
68 self._resume_event.set()
69 for task in self._tasks:
70 if not task.done(): 70 ↛ 69line 70 didn't jump to line 69 because the condition on line 70 was always true
71 task.cancel()
72 if self._tasks: 72 ↛ 74line 72 didn't jump to line 74 because the condition on line 72 was always true
73 await asyncio.gather(*self._tasks, return_exceptions=True)
74 self._tasks.clear()
75 self._state = TaskState.STOPPED
76 self._logger.info("NotificationQueueTask 종료 완료")
78 async def suspend(self) -> None:
79 """일시 중지: resume_event를 clear하여 drain_loop을 블로킹. 큐에는 이벤트가 계속 쌓임."""
80 if self._state == TaskState.RUNNING:
81 self._state = TaskState.SUSPENDED
82 if self._resume_event is not None: 82 ↛ 84line 82 didn't jump to line 84 because the condition on line 82 was always true
83 self._resume_event.clear()
84 self._logger.info("NotificationQueueTask 일시 중지 (큐 누적 중)")
86 async def resume(self) -> None:
87 """재개: resume_event를 set하여 drain_loop 블로킹 해제."""
88 if self._state == TaskState.SUSPENDED:
89 self._state = TaskState.RUNNING
90 if self._resume_event is not None: 90 ↛ 92line 90 didn't jump to line 92 because the condition on line 90 was always true
91 self._resume_event.set()
92 self._logger.info("NotificationQueueTask 재개")
94 def get_progress(self) -> Dict:
95 return {
96 "running": self._state == TaskState.RUNNING,
97 "queued_events": self._ns.external_handler_queue.qsize(),
98 }
100 # ── Drain loop ────────────────────────────────────────────────
102 async def _drain_loop(self) -> None:
103 """큐에서 이벤트를 하나씩 꺼내 모든 외부 핸들러에 순차 전달한다."""
104 queue = self._ns.external_handler_queue
105 while True:
106 try:
107 # SUSPENDED 상태에서는 event.wait()로 블로킹 (spin 없음)
108 await self._resume_event.wait()
110 try:
111 event = await asyncio.wait_for(queue.get(), timeout=1.0)
112 except asyncio.TimeoutError:
113 continue
115 for handler in self._ns.external_handlers:
116 try:
117 await handler(event)
118 except Exception as e:
119 self._logger.error(
120 f"[NotificationQueueTask] 핸들러 오류 ({getattr(handler, '__name__', handler)}): {e}"
121 )
123 queue.task_done()
124 await asyncio.sleep(self._poll_interval)
126 except asyncio.CancelledError:
127 self._logger.info("NotificationQueueTask drain_loop 취소됨")
128 break
129 except Exception as e:
130 self._logger.error(f"[NotificationQueueTask] drain_loop 예외: {e}", exc_info=True)
131 await asyncio.sleep(1.0)