Coverage for services / price_subscription_service.py: 90%

101 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-04 15:08 +0000

1# services/price_subscription_service.py 

2""" 

3실시간 현재가 구독 정책을 담당하는 서비스. 

4 

5역할: 

6 - 여러 요청자(Portfolio, Strategy, UI)로부터 구독 요청을 받아 참조 카운팅으로 관리 

7 - 우선순위(HIGH > MEDIUM > LOW) 기반으로 MAX_SUBSCRIPTIONS(35) 한도 내 최적 구독 유지 

8 - 실제 WebSocket 구독/해지는 StreamingService에 위임 

9 - 구독 활성화 시 StockRepository에 mark_streaming() 알림 (TTL 우회 활성화) 

10 

11StreamingService와의 역할 구분: 

12 - PriceSubscriptionService : 무엇을, 왜 구독할지 결정 (정책 레이어) 

13 - StreamingService : 어떻게 구독하는지 처리 (프로토콜 레이어) 

14 

15우선순위 카테고리: 

16 - HIGH : 보유 종목 (Portfolio) — category_key: "portfolio" 

17 - MEDIUM : 전략 감시 종목 (Strategy watchlist, premium stocks) — category_key: "strategy_*" 

18 - LOW : 웹 UI 조회 종목 — category_key: "ui_*" 

19""" 

20from __future__ import annotations 

21 

22import logging 

23import time 

24from enum import IntEnum 

25from typing import Dict, Set, List, Optional, TYPE_CHECKING 

26 

27if TYPE_CHECKING: 27 ↛ 28line 27 didn't jump to line 28 because the condition on line 27 was never true

28 from services.streaming_service import StreamingService 

29 from repositories.stock_repository import StockRepository 

30 from core.logger import StreamingEventLogger 

31 

32 

33class SubscriptionPriority(IntEnum): 

34 HIGH = 1 # Portfolio (보유 종목) 

35 MEDIUM = 2 # Strategy watchlist / premium stocks 

36 LOW = 3 # UI page view / watchlist page 

37 

38 

39class PriceSubscriptionService: 

40 """ 

41 우선순위 기반 실시간 현재가 구독 관리 서비스. 

42 

43 참조 카운팅: 

44 - add_subscription(code, priority, category_key): 카운트 0→1이면 실제 구독 

45 - remove_subscription(code, category_key): 카운트 1→0이면 실제 구독 해지 

46 - sync_subscriptions(codes, category_key, priority): 카테고리 전체 원자적 교체 

47 

48 MAX 한도 초과 시: 

49 - 전체 요청 종목을 우선순위로 정렬 후 상위 MAX_SUBSCRIPTIONS개만 구독 

50 - 우선순위가 동일하면 종목코드 오름차순으로 결정적(deterministic) 선택 

51 """ 

52 

53 MAX_SUBSCRIPTIONS = 35 

54 

55 def __init__( 

56 self, 

57 streaming_service: "StreamingService", 

58 stock_repo: "StockRepository", 

59 logger=None, 

60 streaming_logger: Optional["StreamingEventLogger"] = None, 

61 ): 

62 self._streaming = streaming_service 

63 self._stock_repo = stock_repo 

64 self._logger = logger or logging.getLogger(__name__) 

65 self._streaming_logger = streaming_logger 

66 

67 # code -> {category_key -> SubscriptionPriority} 

68 self._refs: Dict[str, Dict[str, SubscriptionPriority]] = {} 

69 

70 # 현재 실제로 WebSocket 구독 중인 종목 집합 

71 self._active_codes: Set[str] = set() 

72 

73 # summary 로그 스로틀 (동시 다발적 rebalance 호출로 인한 중복 발화 방지) 

74 self._last_summary_time: float = 0.0 

75 self._SUMMARY_THROTTLE_SEC: float = 2.0 

76 

77 # ── Public API ───────────────────────────────────────────────── 

78 

79 async def add_subscription( 

80 self, code: str, priority: SubscriptionPriority, category_key: str 

81 ) -> None: 

82 """특정 카테고리에서 종목 구독을 요청합니다.""" 

83 self._refs.setdefault(code, {})[category_key] = priority 

84 await self._rebalance() 

85 

86 async def remove_subscription(self, code: str, category_key: str) -> None: 

87 """특정 카테고리에서 종목 구독을 해제합니다.""" 

88 if code in self._refs: 

89 self._refs[code].pop(category_key, None) 

90 if not self._refs[code]: 

91 del self._refs[code] 

92 await self._rebalance() 

93 

94 async def remove_category(self, category_key: str) -> None: 

95 """카테고리 전체의 구독을 한 번에 해제합니다 (전략 종료 시 사용).""" 

96 codes_in_category = [c for c, cats in self._refs.items() if category_key in cats] 

97 for code in codes_in_category: 

98 self._refs[code].pop(category_key, None) 

99 if not self._refs[code]: 99 ↛ 97line 99 didn't jump to line 97 because the condition on line 99 was always true

100 del self._refs[code] 

101 await self._rebalance() 

102 

103 async def sync_subscriptions( 

104 self, 

105 codes: List[str], 

106 category_key: str, 

107 priority: SubscriptionPriority, 

108 ) -> None: 

109 """ 

110 카테고리 전체를 새 코드 목록으로 원자적으로 교체합니다. 

111 전략 워치리스트 갱신 시 사용 — rebalance 1회만 호출. 

112 """ 

113 old_codes = {c for c, cats in self._refs.items() if category_key in cats} 

114 new_codes = set(codes) 

115 

116 for code in old_codes - new_codes: 

117 self._refs[code].pop(category_key, None) 

118 if not self._refs[code]: 118 ↛ 116line 118 didn't jump to line 116 because the condition on line 118 was always true

119 del self._refs[code] 

120 

121 for code in new_codes: 

122 self._refs.setdefault(code, {})[category_key] = priority 

123 

124 await self._rebalance() 

125 

126 def is_streaming(self, code: str) -> bool: 

127 """해당 종목이 현재 실시간 구독 중인지 여부.""" 

128 return code in self._active_codes 

129 

130 def get_status(self) -> dict: 

131 """현재 구독 현황을 반환합니다 (모니터링/API 용).""" 

132 pending_by_priority: Dict[int, List[str]] = {} 

133 for code, cats in self._refs.items(): 

134 best = min(int(p) for p in cats.values()) 

135 pending_by_priority.setdefault(best, []).append(code) 

136 

137 return { 

138 "active_count": len(self._active_codes), 

139 "max_subscriptions": self.MAX_SUBSCRIPTIONS, 

140 "active_codes": sorted(self._active_codes), 

141 "pending_count": len(self._refs), 

142 "pending_by_priority": { 

143 "HIGH": sorted(pending_by_priority.get(int(SubscriptionPriority.HIGH), [])), 

144 "MEDIUM": sorted(pending_by_priority.get(int(SubscriptionPriority.MEDIUM), [])), 

145 "LOW": sorted(pending_by_priority.get(int(SubscriptionPriority.LOW), [])), 

146 }, 

147 } 

148 

149 # ── Internal rebalance logic ──────────────────────────────────── 

150 

151 async def _rebalance(self) -> None: 

152 """ 

153 요청된 구독 목록을 우선순위로 정렬하여 MAX_SUBSCRIPTIONS개 이내로 유지. 

154 변경이 필요한 종목만 구독/해지 처리. 

155 """ 

156 def _best_priority(code: str) -> int: 

157 return min(int(p) for p in self._refs[code].values()) 

158 

159 ranked = sorted(self._refs.keys(), key=lambda c: (_best_priority(c), c)) 

160 desired: Set[str] = set(ranked[: self.MAX_SUBSCRIPTIONS]) 

161 

162 to_unsubscribe = self._active_codes - desired 

163 to_subscribe = desired - self._active_codes 

164 

165 for code in to_unsubscribe: 

166 await self._do_unsubscribe(code) 

167 

168 for code in to_subscribe: 

169 await self._do_subscribe(code) 

170 

171 # MAX 초과로 탈락된 종목이 있으면 경고 로그 

172 dropped = len(self._refs) - len(desired) 

173 if dropped > 0: 

174 self._logger.warning( 

175 f"PriceSubscriptionService: 구독 한도 초과 — {dropped}개 종목이 대기 상태 " 

176 f"(active={len(self._active_codes)}, requested={len(self._refs)}, max={self.MAX_SUBSCRIPTIONS})" 

177 ) 

178 

179 # 변경이 있었을 때 현재 구독 상태 요약 기록 (2초 스로틀로 중복 발화 방지) 

180 if (to_subscribe or to_unsubscribe) and self._streaming_logger: 

181 now = time.monotonic() 

182 if now - self._last_summary_time >= self._SUMMARY_THROTTLE_SEC: 182 ↛ exitline 182 didn't return from function '_rebalance' because the condition on line 182 was always true

183 self._last_summary_time = now 

184 status = self.get_status() 

185 self._streaming_logger.log_summary( 

186 active_count=status["active_count"], 

187 active_codes=status["active_codes"], 

188 pending_by_priority=status["pending_by_priority"], 

189 ) 

190 

191 async def _do_subscribe(self, code: str) -> None: 

192 try: 

193 success = await self._streaming.subscribe_unified_price(code) 

194 if success: 

195 self._active_codes.add(code) 

196 self._stock_repo.mark_streaming(code) 

197 self._logger.debug(f"PriceSubscriptionService: 구독 등록 {code}") 

198 if self._streaming_logger: 198 ↛ 199line 198 didn't jump to line 199 because the condition on line 198 was never true

199 categories = self._refs.get(code, {}) 

200 self._streaming_logger.log_subscribe( 

201 code=code, 

202 categories=categories, 

203 active_count=len(self._active_codes), 

204 ) 

205 else: 

206 self._logger.warning( 

207 f"PriceSubscriptionService: 구독 실패(False 반환) {code} " 

208 f"— WebSocket 미연결 또는 브로커 거부 가능성" 

209 ) 

210 except Exception as e: 

211 self._logger.error(f"PriceSubscriptionService: 구독 실패 {code}: {e}") 

212 

213 async def _do_unsubscribe(self, code: str) -> None: 

214 try: 

215 await self._streaming.unsubscribe_unified_price(code) 

216 self._active_codes.discard(code) 

217 self._stock_repo.unmark_streaming(code) 

218 self._logger.debug(f"PriceSubscriptionService: 구독 해지 {code}") 

219 if self._streaming_logger: 219 ↛ 220line 219 didn't jump to line 220 because the condition on line 219 was never true

220 self._streaming_logger.log_unsubscribe( 

221 code=code, 

222 active_count=len(self._active_codes), 

223 ) 

224 except Exception as e: 

225 self._logger.error(f"PriceSubscriptionService: 구독 해지 실패 {code}: {e}")