Coverage for task / background / always_on / notification_queue_task.py: 89%

80 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-04 15:08 +0000

1""" 

2NotificationQueueTask — 알림 외부 핸들러(Telegram 등) 큐 소비 태스크. 

3 

4NotificationService.emit()이 enqueue한 이벤트를 저우선순위 백그라운드에서 

5순차적으로 소비하여 외부 핸들러에 전달한다. 

6 

7idle 감지 전략: 

8 Layer 1 — ForegroundScheduler가 포그라운드 작업 시작 시 BackgroundScheduler.suspend_all()을 

9 호출하므로, 이 태스크도 자동으로 SUSPENDED 상태로 전환된다. 

10 Layer 2 — asyncio cooperative scheduling: asyncio.wait_for(timeout=1.0) 대기 중 다른 

11 코루틴이 실행되고, 이벤트 처리 후 asyncio.sleep(poll_interval)로 추가 양보한다. 

12""" 

13from __future__ import annotations 

14 

15import asyncio 

16import logging 

17from typing import Dict, List, Optional, TYPE_CHECKING 

18 

19from interfaces.schedulable_task import SchedulableTask, TaskPriority, TaskState 

20 

21if TYPE_CHECKING: 21 ↛ 22line 21 didn't jump to line 22 because the condition on line 21 was never true

22 from services.notification_service import NotificationService 

23 

24 

class NotificationQueueTask(SchedulableTask):
    """Always-on task that consumes the NotificationService external-handler queue.

    A single background ``_drain_loop`` coroutine pulls events from
    ``notification_service.external_handler_queue`` one at a time and awaits
    every callable in ``notification_service.external_handlers`` with it.
    Suspension is implemented with an ``asyncio.Event``: while the event is
    cleared the loop blocks before dequeuing, so events keep accumulating in
    the queue until the task is resumed.
    """

    def __init__(
        self,
        notification_service: "NotificationService",
        poll_interval: float = 1.0,
        logger: Optional[logging.Logger] = None,
    ) -> None:
        """Store collaborators; no asyncio objects are created until ``start()``.

        Args:
            notification_service: Object exposing ``external_handler_queue``
                and ``external_handlers``.
            poll_interval: Seconds to sleep after each processed event so that
                other coroutines get a chance to run.
            logger: Logger to use; defaults to this module's logger.
        """
        self._ns = notification_service
        self._poll_interval = poll_interval
        self._logger = logger or logging.getLogger(__name__)
        self._state: TaskState = TaskState.IDLE
        self._tasks: List[asyncio.Task] = []
        # Created in start() so that it is bound to the running event loop.
        self._resume_event: Optional[asyncio.Event] = None

    # ── SchedulableTask interface ─────────────────────────────────

    @property
    def task_name(self) -> str:
        """Identifier of this task."""
        return "notification_queue"

    @property
    def priority(self) -> TaskPriority:
        """Scheduling priority; this task always reports LOW."""
        return TaskPriority.LOW

    @property
    def state(self) -> TaskState:
        """Current lifecycle state."""
        return self._state

    async def start(self) -> None:
        """Start (or wake) the drain loop and mark the task RUNNING.

        Calling this while already RUNNING is a no-op. If a drain loop is
        still alive (the task was SUSPENDED), that loop is woken instead of
        spawning a second one; replacing the event here would otherwise leave
        the old loop blocked forever on an orphaned event.
        """
        if self._state == TaskState.RUNNING:
            return

        # Forget loops that have already finished; keep the same list object.
        self._tasks[:] = [t for t in self._tasks if not t.done()]

        if self._tasks and self._resume_event is not None:
            # A live loop is waiting on the current event: just release it.
            self._resume_event.set()
            self._state = TaskState.RUNNING
        else:
            self._resume_event = asyncio.Event()
            self._resume_event.set()  # initial state is RUNNING, so start unblocked
            self._state = TaskState.RUNNING
            self._tasks.append(asyncio.create_task(self._drain_loop()))
        self._logger.info("NotificationQueueTask 시작")

    async def stop(self) -> None:
        """Cancel the drain loop, wait for it to finish, and mark the task STOPPED."""
        self._logger.info(f"NotificationQueueTask 종료 시작: {len(self._tasks)}개 태스크")
        # When SUSPENDED the loop is blocked in event.wait(); release it, then cancel.
        if self._resume_event is not None:
            self._resume_event.set()
        for task in self._tasks:
            if not task.done():
                task.cancel()
        if self._tasks:
            # return_exceptions=True: a cancelled or failed loop must not abort shutdown.
            await asyncio.gather(*self._tasks, return_exceptions=True)
        self._tasks.clear()
        self._state = TaskState.STOPPED
        self._logger.info("NotificationQueueTask 종료 완료")

    async def suspend(self) -> None:
        """Pause consumption by clearing the resume event.

        Only acts when RUNNING. The queue is not touched, so events continue
        to accumulate while suspended.
        """
        if self._state == TaskState.RUNNING:
            self._state = TaskState.SUSPENDED
            if self._resume_event is not None:
                self._resume_event.clear()
            self._logger.info("NotificationQueueTask 일시 중지 (큐 누적 중)")

    async def resume(self) -> None:
        """Resume consumption by setting the resume event. Only acts when SUSPENDED."""
        if self._state == TaskState.SUSPENDED:
            self._state = TaskState.RUNNING
            if self._resume_event is not None:
                self._resume_event.set()
            self._logger.info("NotificationQueueTask 재개")

    def get_progress(self) -> Dict:
        """Return a snapshot: whether the task is RUNNING and the current queue size."""
        return {
            "running": self._state == TaskState.RUNNING,
            "queued_events": self._ns.external_handler_queue.qsize(),
        }

    # ── Drain loop ────────────────────────────────────────────────

    async def _drain_loop(self) -> None:
        """Dequeue events one by one and deliver each to every external handler in order.

        A failing handler is logged and does not prevent the remaining
        handlers from receiving the event. The loop exits on cancellation;
        any other unexpected error is logged and the loop retries after one
        second.
        """
        queue = self._ns.external_handler_queue
        while True:
            try:
                # While SUSPENDED this blocks on the event (no busy spinning).
                await self._resume_event.wait()

                try:
                    # Bounded wait so the suspend gate above is re-checked periodically.
                    event = await asyncio.wait_for(queue.get(), timeout=1.0)
                except asyncio.TimeoutError:
                    continue

                try:
                    for handler in self._ns.external_handlers:
                        try:
                            await handler(event)
                        except Exception as e:
                            self._logger.error(
                                f"[NotificationQueueTask] 핸들러 오류 ({getattr(handler, '__name__', handler)}): {e}"
                            )
                finally:
                    # Always acknowledge the dequeued item, even on cancellation or an
                    # unexpected error, so queue.join() waiters cannot hang.
                    queue.task_done()

                # Yield to other coroutines between events.
                await asyncio.sleep(self._poll_interval)

            except asyncio.CancelledError:
                self._logger.info("NotificationQueueTask drain_loop 취소됨")
                break
            except Exception as e:
                self._logger.error(f"[NotificationQueueTask] drain_loop 예외: {e}", exc_info=True)
                await asyncio.sleep(1.0)