Coverage for view / web / routes / notification.py: 100%

32 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-04 15:08 +0000

1""" 

2알림 센터 API 엔드포인트. 

3""" 

4import asyncio 

5from typing import Optional 

6 

7from fastapi import APIRouter, Query, Request 

8from fastapi.responses import StreamingResponse 

9 

10from view.web.api_common import _get_ctx 

11from services.notification_service import NotificationCategory 

12 

13router = APIRouter() 

14 

15 

@router.get("/notifications/recent")
async def get_recent_notifications(
    category: Optional[NotificationCategory] = Query(None),
    count: int = Query(50, ge=1, le=200),
):
    """Return the most recent notifications.

    Args:
        category: Optional category filter; passed through unchanged to the
            notification service. ``None`` (the default) is passed as-is.
        count: Number of notifications requested. Declared as a query
            parameter with default 50 and bounds 1..200.

    Returns:
        A dict with the single key ``"notifications"`` whose value is
        whatever ``notification_service.get_recent`` returned.
    """
    service = _get_ctx().notification_service
    return {"notifications": service.get_recent(count=count, category=category)}

25 

26 

def _format_sse_event(data):
    """Frame one payload as a Server-Sent Events ``data`` event.

    The event-stream format is line based: every line of the payload has to
    be sent on its own ``data:`` line, and a blank line terminates the event.
    A payload that contains a line break would otherwise lose everything
    after the first break (or be split into several events).

    A payload without line breaks yields the same text as a plain
    ``data: <payload>`` line followed by a blank line.
    """
    text = f"{data}"
    # CRLF, CR and LF are all line terminators in the event-stream format.
    lines = text.replace("\r\n", "\n").replace("\r", "\n").split("\n")
    return "".join(f"data: {line}\n" for line in lines) + "\n"


@router.get("/notifications/stream")
async def stream_notifications(request: Request):
    """SSE streaming: deliver notification events to the browser in real time.

    A subscriber queue is obtained from the notification service and drained
    by an async generator that is handed to a ``StreamingResponse`` with media
    type ``text/event-stream``.

    Stream behaviour:
        * every item taken from the queue is sent as one ``data`` event;
        * ``None`` taken from the queue ends the stream;
        * if nothing arrives for 15 seconds, the stream ends when the request
          reports itself disconnected, otherwise a ``: keepalive`` comment
          line is sent;
        * the queue is handed back to the notification service when the
          generator finishes, however it finishes.

    Args:
        request: The incoming request; only used to poll for disconnection.

    Returns:
        The ``StreamingResponse`` wrapping the event generator.
    """
    ctx = _get_ctx()
    queue = ctx.notification_service.create_subscriber_queue()

    async def event_generator():
        try:
            while True:
                # Only the wait itself can time out, so keep the try minimal.
                try:
                    data = await asyncio.wait_for(queue.get(), timeout=15)
                except asyncio.TimeoutError:
                    if await request.is_disconnected():
                        break
                    # SSE comment line (leading colon).
                    yield ": keepalive\n\n"
                    continue
                if data is None:
                    break
                yield _format_sse_event(data)
        except asyncio.CancelledError:
            # Cancellation ends the stream quietly; cleanup happens below.
            # NOTE(review): asyncio generally recommends re-raising
            # CancelledError; suppression is kept to preserve the existing
            # behaviour -- confirm whether propagating it would be preferable.
            pass
        finally:
            ctx.notification_service.remove_subscriber_queue(queue)

    return StreamingResponse(event_generator(), media_type="text/event-stream")