Coverage for services / notification_service.py: 96%
85 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
1"""중앙 집중형 알림 이벤트 관리자.
3모든 시스템 이벤트(매매 시그널, API 응답, 오류 등)를 수집하고
4SSE 구독자에게 실시간 전파한다.
5"""
6from __future__ import annotations
8import asyncio
9import json
10import uuid
11from dataclasses import dataclass, field, asdict
12from datetime import datetime
13from typing import Callable, Coroutine, Any, Dict, List, Optional
14from enum import Enum
16from core.market_clock import MarketClock
class NotificationCategory(str, Enum):
    """Category tag attached to every notification event.

    The ``str`` mixin makes each member an actual string, so a member
    compares equal to its plain value (``NotificationCategory.TRADE == "TRADE"``)
    and is written out as that string by ``json.dumps``.
    """

    STRATEGY = "STRATEGY"
    BACKGROUND = "BACKGROUND"
    TRADE = "TRADE"
    API = "API"
    SYSTEM = "SYSTEM"
class NotificationLevel(str, Enum):
    """Severity tag attached to every notification event.

    Members are lowercase strings; thanks to the ``str`` mixin a member
    compares equal to its plain value (``NotificationLevel.ERROR == "error"``).
    """

    INFO = "info"
    WARNING = "warning"
    ERROR = "error"
    CRITICAL = "critical"
@dataclass
class NotificationEvent:
    """A single notification record.

    ``metadata`` defaults to a fresh empty dict per instance, so instances
    never share the same default mapping.
    """

    id: str
    timestamp: str  # ISO format (KST)
    category: NotificationCategory
    level: NotificationLevel
    title: str
    message: str
    metadata: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict:
        """Return the event's fields as a plain dict.

        Built with ``dataclasses.asdict``, so ``metadata`` is copied rather
        than shared with the event instance.
        """
        as_mapping = asdict(self)
        return as_mapping
class NotificationService:
    """System-wide notification event hub.

    Events created through :meth:`emit` are kept in a bounded in-memory
    history, pushed as JSON strings to every subscriber queue and, when at
    least one external handler is registered, placed on the external
    handler queue as event objects.

    Usage:
        nm = NotificationService(market_clock)
        await nm.emit(
            NotificationCategory.TRADE,
            NotificationLevel.CRITICAL,
            "Buy signal",
            "Samsung Electronics 72,000 KRW",
        )
    """

    # Upper bound on the number of events kept in the history.
    MAX_HISTORY = 200
    # Capacity of the queue feeding external handlers.
    MAX_EXTERNAL_QUEUE_SIZE = 500
    # Capacity of each queue handed out by create_subscriber_queue().
    MAX_SUBSCRIBER_QUEUE_SIZE = 100

    def __init__(self, market_clock: MarketClock):
        """Create an empty hub.

        Args:
            market_clock: Object whose ``get_current_kst_time()`` supplies
                the timestamp of every emitted event.
        """
        self._market_clock = market_clock
        self._history: List[NotificationEvent] = []
        self._subscriber_queues: List[asyncio.Queue] = []
        self._external_handlers: List[Callable[..., Coroutine[Any, Any, None]]] = []
        self._external_handler_queue: asyncio.Queue = asyncio.Queue(
            maxsize=self.MAX_EXTERNAL_QUEUE_SIZE
        )

    @staticmethod
    def _put_dropping_oldest(queue: asyncio.Queue, item: Any) -> None:
        """Enqueue *item* without blocking, evicting the oldest entry if full."""
        try:
            queue.put_nowait(item)
        except asyncio.QueueFull:
            try:
                queue.get_nowait()
                queue.put_nowait(item)
            except (asyncio.QueueEmpty, asyncio.QueueFull):
                # Delivery is deliberately best effort: if the retry cannot
                # go through, the item is dropped instead of failing emit().
                pass

    # -- Event publishing --

    async def emit(
        self,
        category: NotificationCategory,
        level: NotificationLevel,
        title: str,
        message: str,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> NotificationEvent:
        """Create an event, store it in the history and fan it out.

        Args:
            category: Category tag of the event.
            level: Severity tag of the event.
            title: Short title.
            message: Body text.
            metadata: Optional extra data; a falsy value becomes ``{}``.

        Returns:
            The newly created event.

        Raises:
            TypeError: If the event cannot be serialized by ``json.dumps``
                (e.g. ``metadata`` holds a non-JSON value). The event has
                already been appended to the history at that point.
        """
        event = NotificationEvent(
            id=uuid.uuid4().hex[:12],
            timestamp=self._market_clock.get_current_kst_time().isoformat(),
            category=category,
            level=level,
            title=title,
            message=message,
            metadata=metadata or {},
        )

        self._history.append(event)
        # Trim from the front so only the newest MAX_HISTORY events remain.
        # An explicit overflow count also behaves correctly for a limit of 0,
        # where a negative-index slice would keep everything.
        overflow = len(self._history) - self.MAX_HISTORY
        if overflow > 0:
            del self._history[:overflow]

        # Serialize once and share the same string with every subscriber.
        json_data = json.dumps(event.to_dict(), ensure_ascii=False)
        # Iterate over a copy so the subscriber list may change meanwhile.
        for queue in list(self._subscriber_queues):
            self._put_dropping_oldest(queue, json_data)

        # The external queue is only fed while at least one handler exists.
        if self._external_handlers:
            self._put_dropping_oldest(self._external_handler_queue, event)

        return event

    # -- SSE subscriber management --

    def create_subscriber_queue(self) -> asyncio.Queue:
        """Create, register and return a new bounded subscriber queue."""
        queue: asyncio.Queue = asyncio.Queue(maxsize=self.MAX_SUBSCRIBER_QUEUE_SIZE)
        self._subscriber_queues.append(queue)
        return queue

    def remove_subscriber_queue(self, queue: asyncio.Queue) -> None:
        """Unregister *queue*; unknown queues are ignored."""
        if queue in self._subscriber_queues:
            self._subscriber_queues.remove(queue)

    # -- Recent event lookup --

    def get_recent(
        self, count: int = 50, category: Optional[NotificationCategory] = None
    ) -> List[dict]:
        """Return up to *count* of the latest events as dicts, newest first.

        Args:
            count: Maximum number of events to return. Zero or a negative
                value yields an empty list.
            category: When given (truthy), only events of this category
                are considered.

        Returns:
            Event dicts produced by ``to_dict()``, ordered newest to oldest.
        """
        # Guard first: items[-0:] would otherwise return the whole history.
        if count <= 0:
            return []
        items = self._history
        if category:
            items = [e for e in items if e.category == category]
        return [e.to_dict() for e in reversed(items[-count:])]

    # -- External handler registration (Telegram/Slack, etc.) --

    def register_external_handler(
        self, handler: Callable[..., Coroutine[Any, Any, None]]
    ) -> None:
        """Register a coroutine function as an external handler."""
        self._external_handlers.append(handler)

    @property
    def external_handler_queue(self) -> asyncio.Queue:
        """Queue of events awaiting external handlers (consumed by NotificationQueueTask)."""
        return self._external_handler_queue

    @property
    def external_handlers(self) -> List[Callable[..., Coroutine[Any, Any, None]]]:
        """Registered external handlers (defensive copy)."""
        return list(self._external_handlers)