Coverage for services / price_subscription_service.py: 90%
101 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
1# services/price_subscription_service.py
2"""
3실시간 현재가 구독 정책을 담당하는 서비스.
5역할:
6 - 여러 요청자(Portfolio, Strategy, UI)로부터 구독 요청을 받아 참조 카운팅으로 관리
7 - 우선순위(HIGH > MEDIUM > LOW) 기반으로 MAX_SUBSCRIPTIONS(35) 한도 내 최적 구독 유지
8 - 실제 WebSocket 구독/해지는 StreamingService에 위임
9 - 구독 활성화 시 StockRepository에 mark_streaming() 알림 (TTL 우회 활성화)
11StreamingService와의 역할 구분:
12 - PriceSubscriptionService : 무엇을, 왜 구독할지 결정 (정책 레이어)
13 - StreamingService : 어떻게 구독하는지 처리 (프로토콜 레이어)
15우선순위 카테고리:
16 - HIGH : 보유 종목 (Portfolio) — category_key: "portfolio"
17 - MEDIUM : 전략 감시 종목 (Strategy watchlist, premium stocks) — category_key: "strategy_*"
18 - LOW : 웹 UI 조회 종목 — category_key: "ui_*"
19"""
20from __future__ import annotations
22import logging
23import time
24from enum import IntEnum
25from typing import Dict, Set, List, Optional, TYPE_CHECKING
27if TYPE_CHECKING: 27 ↛ 28line 27 didn't jump to line 28 because the condition on line 27 was never true
28 from services.streaming_service import StreamingService
29 from repositories.stock_repository import StockRepository
30 from core.logger import StreamingEventLogger
33class SubscriptionPriority(IntEnum):
34 HIGH = 1 # Portfolio (보유 종목)
35 MEDIUM = 2 # Strategy watchlist / premium stocks
36 LOW = 3 # UI page view / watchlist page
39class PriceSubscriptionService:
40 """
41 우선순위 기반 실시간 현재가 구독 관리 서비스.
43 참조 카운팅:
44 - add_subscription(code, priority, category_key): 카운트 0→1이면 실제 구독
45 - remove_subscription(code, category_key): 카운트 1→0이면 실제 구독 해지
46 - sync_subscriptions(codes, category_key, priority): 카테고리 전체 원자적 교체
48 MAX 한도 초과 시:
49 - 전체 요청 종목을 우선순위로 정렬 후 상위 MAX_SUBSCRIPTIONS개만 구독
50 - 우선순위가 동일하면 종목코드 오름차순으로 결정적(deterministic) 선택
51 """
53 MAX_SUBSCRIPTIONS = 35
55 def __init__(
56 self,
57 streaming_service: "StreamingService",
58 stock_repo: "StockRepository",
59 logger=None,
60 streaming_logger: Optional["StreamingEventLogger"] = None,
61 ):
62 self._streaming = streaming_service
63 self._stock_repo = stock_repo
64 self._logger = logger or logging.getLogger(__name__)
65 self._streaming_logger = streaming_logger
67 # code -> {category_key -> SubscriptionPriority}
68 self._refs: Dict[str, Dict[str, SubscriptionPriority]] = {}
70 # 현재 실제로 WebSocket 구독 중인 종목 집합
71 self._active_codes: Set[str] = set()
73 # summary 로그 스로틀 (동시 다발적 rebalance 호출로 인한 중복 발화 방지)
74 self._last_summary_time: float = 0.0
75 self._SUMMARY_THROTTLE_SEC: float = 2.0
77 # ── Public API ─────────────────────────────────────────────────
79 async def add_subscription(
80 self, code: str, priority: SubscriptionPriority, category_key: str
81 ) -> None:
82 """특정 카테고리에서 종목 구독을 요청합니다."""
83 self._refs.setdefault(code, {})[category_key] = priority
84 await self._rebalance()
86 async def remove_subscription(self, code: str, category_key: str) -> None:
87 """특정 카테고리에서 종목 구독을 해제합니다."""
88 if code in self._refs:
89 self._refs[code].pop(category_key, None)
90 if not self._refs[code]:
91 del self._refs[code]
92 await self._rebalance()
94 async def remove_category(self, category_key: str) -> None:
95 """카테고리 전체의 구독을 한 번에 해제합니다 (전략 종료 시 사용)."""
96 codes_in_category = [c for c, cats in self._refs.items() if category_key in cats]
97 for code in codes_in_category:
98 self._refs[code].pop(category_key, None)
99 if not self._refs[code]: 99 ↛ 97line 99 didn't jump to line 97 because the condition on line 99 was always true
100 del self._refs[code]
101 await self._rebalance()
103 async def sync_subscriptions(
104 self,
105 codes: List[str],
106 category_key: str,
107 priority: SubscriptionPriority,
108 ) -> None:
109 """
110 카테고리 전체를 새 코드 목록으로 원자적으로 교체합니다.
111 전략 워치리스트 갱신 시 사용 — rebalance 1회만 호출.
112 """
113 old_codes = {c for c, cats in self._refs.items() if category_key in cats}
114 new_codes = set(codes)
116 for code in old_codes - new_codes:
117 self._refs[code].pop(category_key, None)
118 if not self._refs[code]: 118 ↛ 116line 118 didn't jump to line 116 because the condition on line 118 was always true
119 del self._refs[code]
121 for code in new_codes:
122 self._refs.setdefault(code, {})[category_key] = priority
124 await self._rebalance()
126 def is_streaming(self, code: str) -> bool:
127 """해당 종목이 현재 실시간 구독 중인지 여부."""
128 return code in self._active_codes
130 def get_status(self) -> dict:
131 """현재 구독 현황을 반환합니다 (모니터링/API 용)."""
132 pending_by_priority: Dict[int, List[str]] = {}
133 for code, cats in self._refs.items():
134 best = min(int(p) for p in cats.values())
135 pending_by_priority.setdefault(best, []).append(code)
137 return {
138 "active_count": len(self._active_codes),
139 "max_subscriptions": self.MAX_SUBSCRIPTIONS,
140 "active_codes": sorted(self._active_codes),
141 "pending_count": len(self._refs),
142 "pending_by_priority": {
143 "HIGH": sorted(pending_by_priority.get(int(SubscriptionPriority.HIGH), [])),
144 "MEDIUM": sorted(pending_by_priority.get(int(SubscriptionPriority.MEDIUM), [])),
145 "LOW": sorted(pending_by_priority.get(int(SubscriptionPriority.LOW), [])),
146 },
147 }
149 # ── Internal rebalance logic ────────────────────────────────────
151 async def _rebalance(self) -> None:
152 """
153 요청된 구독 목록을 우선순위로 정렬하여 MAX_SUBSCRIPTIONS개 이내로 유지.
154 변경이 필요한 종목만 구독/해지 처리.
155 """
156 def _best_priority(code: str) -> int:
157 return min(int(p) for p in self._refs[code].values())
159 ranked = sorted(self._refs.keys(), key=lambda c: (_best_priority(c), c))
160 desired: Set[str] = set(ranked[: self.MAX_SUBSCRIPTIONS])
162 to_unsubscribe = self._active_codes - desired
163 to_subscribe = desired - self._active_codes
165 for code in to_unsubscribe:
166 await self._do_unsubscribe(code)
168 for code in to_subscribe:
169 await self._do_subscribe(code)
171 # MAX 초과로 탈락된 종목이 있으면 경고 로그
172 dropped = len(self._refs) - len(desired)
173 if dropped > 0:
174 self._logger.warning(
175 f"PriceSubscriptionService: 구독 한도 초과 — {dropped}개 종목이 대기 상태 "
176 f"(active={len(self._active_codes)}, requested={len(self._refs)}, max={self.MAX_SUBSCRIPTIONS})"
177 )
179 # 변경이 있었을 때 현재 구독 상태 요약 기록 (2초 스로틀로 중복 발화 방지)
180 if (to_subscribe or to_unsubscribe) and self._streaming_logger:
181 now = time.monotonic()
182 if now - self._last_summary_time >= self._SUMMARY_THROTTLE_SEC: 182 ↛ exitline 182 didn't return from function '_rebalance' because the condition on line 182 was always true
183 self._last_summary_time = now
184 status = self.get_status()
185 self._streaming_logger.log_summary(
186 active_count=status["active_count"],
187 active_codes=status["active_codes"],
188 pending_by_priority=status["pending_by_priority"],
189 )
191 async def _do_subscribe(self, code: str) -> None:
192 try:
193 success = await self._streaming.subscribe_unified_price(code)
194 if success:
195 self._active_codes.add(code)
196 self._stock_repo.mark_streaming(code)
197 self._logger.debug(f"PriceSubscriptionService: 구독 등록 {code}")
198 if self._streaming_logger: 198 ↛ 199line 198 didn't jump to line 199 because the condition on line 198 was never true
199 categories = self._refs.get(code, {})
200 self._streaming_logger.log_subscribe(
201 code=code,
202 categories=categories,
203 active_count=len(self._active_codes),
204 )
205 else:
206 self._logger.warning(
207 f"PriceSubscriptionService: 구독 실패(False 반환) {code} "
208 f"— WebSocket 미연결 또는 브로커 거부 가능성"
209 )
210 except Exception as e:
211 self._logger.error(f"PriceSubscriptionService: 구독 실패 {code}: {e}")
213 async def _do_unsubscribe(self, code: str) -> None:
214 try:
215 await self._streaming.unsubscribe_unified_price(code)
216 self._active_codes.discard(code)
217 self._stock_repo.unmark_streaming(code)
218 self._logger.debug(f"PriceSubscriptionService: 구독 해지 {code}")
219 if self._streaming_logger: 219 ↛ 220line 219 didn't jump to line 220 because the condition on line 219 was never true
220 self._streaming_logger.log_unsubscribe(
221 code=code,
222 active_count=len(self._active_codes),
223 )
224 except Exception as e:
225 self._logger.error(f"PriceSubscriptionService: 구독 해지 실패 {code}: {e}")