Coverage for view / web / routes / notification.py: 100%
32 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
1"""
Notification center API endpoints.
3"""
4import asyncio
5from typing import Optional
7from fastapi import APIRouter, Query, Request
8from fastapi.responses import StreamingResponse
10from view.web.api_common import _get_ctx
11from services.notification_service import NotificationCategory
# Router on which this module's notification endpoints are registered
# via the @router.get decorators.
router = APIRouter()
@router.get("/notifications/recent")
async def get_recent_notifications(
    category: Optional[NotificationCategory] = Query(None),
    count: int = Query(50, ge=1, le=200),
):
    """Return the most recent notifications.

    Args:
        category: Optional category filter; ``None`` (the default) is passed
            through to the notification service unchanged.
        count: Number of notifications requested; validated to 1..200,
            default 50.

    Returns:
        A dict with a single ``"notifications"`` key holding whatever the
        notification service's ``get_recent`` returns.
    """
    service = _get_ctx().notification_service
    return {"notifications": service.get_recent(count=count, category=category)}
def _format_sse_data(payload: object) -> str:
    """Frame *payload* as a single Server-Sent Event.

    The payload is rendered exactly as an f-string would render it, then
    split on CRLF / CR / LF so that every line gets its own ``data:`` field.
    Without this, a payload containing a line break would end the event
    early or inject stray fields into the stream. Single-line payloads
    produce ``"data: <payload>\\n\\n"``.
    """
    text = f"{payload}"
    lines = text.replace("\r\n", "\n").replace("\r", "\n").split("\n")
    # One "data:" field per line, then the blank line that dispatches the event.
    return "".join(f"data: {line}\n" for line in lines) + "\n"


@router.get("/notifications/stream")
async def stream_notifications(request: Request):
    """Stream notification events to the browser over Server-Sent Events.

    Registers a subscriber queue with the notification service and returns a
    ``text/event-stream`` response that relays each queued item as one SSE
    event. A ``None`` item ends the stream. When no item arrives within
    15 seconds the generator stops if the client has disconnected, and
    otherwise emits an SSE comment line as a keepalive. The subscriber queue
    is removed when the generator finishes.

    Args:
        request: Incoming request, used to poll for client disconnect.

    Returns:
        A ``StreamingResponse`` with media type ``text/event-stream``.
    """
    ctx = _get_ctx()
    # NOTE(review): the queue is registered before the generator starts; if
    # the response body is never iterated, the ``finally`` below never runs.
    # Confirm the service tolerates an orphaned subscriber queue.
    queue = ctx.notification_service.create_subscriber_queue()

    async def event_generator():
        try:
            while True:
                try:
                    data = await asyncio.wait_for(queue.get(), timeout=15)
                except asyncio.TimeoutError:
                    # Idle period: stop if the client is gone, otherwise send
                    # an SSE comment so the connection is not left silent.
                    if await request.is_disconnected():
                        break
                    yield ": keepalive\n\n"
                    continue
                if data is None:
                    # ``None`` is treated as the end-of-stream marker.
                    break
                yield _format_sse_data(data)
        except asyncio.CancelledError:
            # NOTE(review): cancellation is swallowed so the generator ends
            # quietly; presumably deliberate. Confirm nothing upstream
            # relies on CancelledError propagating.
            pass
        finally:
            ctx.notification_service.remove_subscriber_queue(queue)

    return StreamingResponse(event_generator(), media_type="text/event-stream")