Coverage for services / notification_service.py: 96%

85 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-04 15:08 +0000

1"""중앙 집중형 알림 이벤트 관리자. 

2 

3모든 시스템 이벤트(매매 시그널, API 응답, 오류 등)를 수집하고 

4SSE 구독자에게 실시간 전파한다. 

5""" 

6from __future__ import annotations 

7 

8import asyncio 

9import json 

10import uuid 

11from dataclasses import dataclass, field, asdict 

12from datetime import datetime 

13from typing import Callable, Coroutine, Any, Dict, List, Optional 

14from enum import Enum 

15 

16from core.market_clock import MarketClock 

17 

18 

19class NotificationCategory(str, Enum): 

20 STRATEGY = "STRATEGY" 

21 BACKGROUND = "BACKGROUND" 

22 TRADE = "TRADE" 

23 API = "API" 

24 SYSTEM = "SYSTEM" 

25 

26class NotificationLevel(str, Enum): 

27 INFO = "info" 

28 WARNING = "warning" 

29 ERROR = "error" 

30 CRITICAL = "critical" 

31 

32 

33@dataclass 

34class NotificationEvent: 

35 """알림 이벤트.""" 

36 id: str 

37 timestamp: str # ISO format (KST) 

38 category: NotificationCategory 

39 level: NotificationLevel 

40 title: str 

41 message: str 

42 metadata: Dict[str, Any] = field(default_factory=dict) 

43 

44 def to_dict(self) -> dict: 

45 return asdict(self) 

46 

47 

48class NotificationService: 

49 """시스템 전체 알림 이벤트 허브. 

50 

51 사용법: 

52 nm = NotificationService(market_clock) 

53 await nm.emit(NotificationCategory.TRADE, NotificationCategory.CIRITICAL, "매수 시그널", "삼성전자 72,000원") 

54 """ 

55 

56 MAX_HISTORY = 200 

57 MAX_EXTERNAL_QUEUE_SIZE = 500 

58 

59 def __init__(self, market_clock: MarketClock): 

60 self._market_clock = market_clock 

61 self._history: List[NotificationEvent] = [] 

62 self._subscriber_queues: List[asyncio.Queue] = [] 

63 self._external_handlers: List[Callable[..., Coroutine[Any, Any, None]]] = [] 

64 self._external_handler_queue: asyncio.Queue = asyncio.Queue(maxsize=self.MAX_EXTERNAL_QUEUE_SIZE) 

65 

66 # ── 이벤트 발행 ── 

67 

68 async def emit( 

69 self, 

70 category: NotificationCategory, 

71 level: NotificationLevel, 

72 title: str, 

73 message: str, 

74 metadata: Optional[Dict[str, Any]] = None, 

75 ) -> NotificationEvent: 

76 """이벤트 생성 → 히스토리 저장 → 구독자 전파.""" 

77 event = NotificationEvent( 

78 id=uuid.uuid4().hex[:12], 

79 timestamp=self._market_clock.get_current_kst_time().isoformat(), 

80 category=category, 

81 level=level, 

82 title=title, 

83 message=message, 

84 metadata=metadata or {}, 

85 ) 

86 

87 self._history.append(event) 

88 if len(self._history) > self.MAX_HISTORY: 

89 self._history = self._history[-self.MAX_HISTORY:] 

90 

91 json_data = json.dumps(event.to_dict(), ensure_ascii=False) 

92 for queue in list(self._subscriber_queues): 

93 try: 

94 queue.put_nowait(json_data) 

95 except asyncio.QueueFull: 

96 try: 

97 queue.get_nowait() 

98 queue.put_nowait(json_data) 

99 except Exception: 

100 pass 

101 

102 if self._external_handlers: 

103 try: 

104 self._external_handler_queue.put_nowait(event) 

105 except asyncio.QueueFull: 

106 try: 

107 self._external_handler_queue.get_nowait() 

108 self._external_handler_queue.put_nowait(event) 

109 except Exception: 

110 pass 

111 

112 return event 

113 

114 # ── SSE 구독자 관리 ── 

115 

116 def create_subscriber_queue(self) -> asyncio.Queue: 

117 queue: asyncio.Queue = asyncio.Queue(maxsize=100) 

118 self._subscriber_queues.append(queue) 

119 return queue 

120 

121 def remove_subscriber_queue(self, queue: asyncio.Queue): 

122 if queue in self._subscriber_queues: 

123 self._subscriber_queues.remove(queue) 

124 

125 # ── 최근 이벤트 조회 ── 

126 

127 def get_recent( 

128 self, count: int = 50, category: Optional[NotificationCategory] = None 

129 ) -> List[dict]: 

130 items = self._history 

131 if category: 

132 items = [e for e in items if e.category == category] 

133 return [e.to_dict() for e in items[-count:]][::-1] 

134 

135 # ── 외부 핸들러 등록 (Telegram/Slack 등) ── 

136 

137 def register_external_handler( 

138 self, handler: Callable[..., Coroutine[Any, Any, None]] 

139 ): 

140 self._external_handlers.append(handler) 

141 

142 @property 

143 def external_handler_queue(self) -> asyncio.Queue: 

144 """외부 핸들러 전달 대기 큐 (NotificationQueueTask가 소비).""" 

145 return self._external_handler_queue 

146 

147 @property 

148 def external_handlers(self): 

149 """등록된 외부 핸들러 목록 (방어적 복사).""" 

150 return list(self._external_handlers)