Coverage for repositories / stock_price_repository.py: 88%
89 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
1# repositories/stock_price_repository.py
2"""
3현재가 인메모리 캐시 및 WebSocket 스트리밍 상태를 전담하는 Repository.
4- 용량 3000: KOSPI+KOSDAQ 전종목을 커버하여 대량 스캔 시에도 eviction 없음
5- TTL: streaming 종목은 ∞, non-streaming 종목은 기본 3초
6"""
7import time
8import logging
9from typing import Optional, TYPE_CHECKING
11from repositories.cache import _LRUCache
13if TYPE_CHECKING: 13 ↛ 14line 13 didn't jump to line 14 because the condition on line 13 was never true
14 from core.logger import CacheEventLogger
16_PRICE_CACHE_CAPACITY = 3000
19class StockPriceRepository:
20 """현재가 캐시 및 WebSocket 스트리밍 TTL 관리 저장소."""
22 def __init__(self, logger=None, cache_logger: "CacheEventLogger | None" = None):
23 self._logger = logger or logging.getLogger(__name__)
24 self._cache_logger = cache_logger
25 # 전종목(~2300) + 여유분을 수용하는 현재가 전용 캐시
26 self._price_cache = _LRUCache(
27 capacity=_PRICE_CACHE_CAPACITY,
28 on_evict=self._on_price_evicted,
29 )
30 # 현재 WebSocket으로 실시간 스트리밍 중인 종목 코드 집합
31 self._streaming_codes: set = set()
33 def _on_price_evicted(self, code: str) -> None:
34 if self._cache_logger: 34 ↛ exitline 34 didn't return from function '_on_price_evicted' because the condition on line 34 was always true
35 self._cache_logger.log_price_evicted(code, capacity=self._price_cache.capacity)
37 def set_current_price(self, code: str, price_data: dict):
38 """현재가 API 응답 전체 데이터를 캐시에 저장합니다."""
39 cached = self._price_cache.get(code, count_stats=False, item_type="set_price")
40 is_new = cached is None
41 if self._cache_logger: 41 ↛ 53line 41 didn't jump to line 53 because the condition on line 41 was always true
42 before_price = None
43 if not is_new and isinstance(cached, dict):
44 existing = cached.get("current_price_data")
45 if isinstance(existing, dict): 45 ↛ 48line 45 didn't jump to line 48 because the condition on line 45 was always true
46 _out = existing.get("output", {})
47 before_price = (_out.get("stck_prpr") if isinstance(_out, dict) else getattr(_out, "stck_prpr", None)) if "output" in existing else existing.get("stck_prpr")
48 after_price = None
49 if isinstance(price_data, dict): 49 ↛ 52line 49 didn't jump to line 52 because the condition on line 49 was always true
50 _out = price_data.get("output", {})
51 after_price = (_out.get("stck_prpr") if isinstance(_out, dict) else getattr(_out, "stck_prpr", None)) if "output" in price_data else price_data.get("stck_prpr")
52 self._cache_logger.log_price_set(code, "api", before_price, after_price, is_new)
53 if not cached:
54 cached = {}
55 self._price_cache.put(code, cached)
56 cached["current_price_data"] = price_data
57 cached["price_updated_at"] = time.time()
59 def get_current_price(self, code: str, max_age_sec: float = 3.0,
60 count_stats: bool = True, caller: str = "unknown") -> Optional[dict]:
61 """캐시된 현재가 데이터를 반환합니다. TTL 만료 시 None 반환."""
62 cached = self._price_cache.get(code, count_stats=count_stats,
63 caller=caller, item_type="current_price")
64 if cached and "current_price_data" in cached:
65 is_streaming = code in self._streaming_codes
66 effective_max_age = float('inf') if is_streaming else max_age_sec
67 age_sec = time.time() - cached.get("price_updated_at", 0)
68 if age_sec <= effective_max_age:
69 if self._cache_logger and count_stats: 69 ↛ 71line 69 didn't jump to line 71 because the condition on line 69 was always true
70 self._cache_logger.log_price_hit(code, caller, age_sec, is_streaming)
71 return cached["current_price_data"]
72 if self._cache_logger and count_stats: 72 ↛ 74line 72 didn't jump to line 74 because the condition on line 72 was always true
73 self._cache_logger.log_price_miss(code, caller, "ttl_expired")
74 return None
75 if self._cache_logger and count_stats: 75 ↛ 77line 75 didn't jump to line 77 because the condition on line 75 was always true
76 self._cache_logger.log_price_miss(code, caller, "not_found")
77 return None
79 def update_current_price(self, code: str, current_price: float, volume: int = 0):
80 """WebSocket 틱 데이터로 현재가 캐시를 즉시 갱신합니다."""
81 cached = self._price_cache.get(code, count_stats=False, item_type="update_tick")
82 if not cached:
83 cached = {}
84 self._price_cache.put(code, cached)
86 if "current_price_data" not in cached:
87 cached["current_price_data"] = {"output": {}}
89 output = cached["current_price_data"].get("output")
90 before_price = None
91 if isinstance(output, dict):
92 before_price = output.get("stck_prpr")
93 output["stck_prpr"] = str(int(current_price))
94 if volume > 0:
95 output["acml_vol"] = str(volume)
96 elif output is not None: 96 ↛ 105line 96 didn't jump to line 105 because the condition on line 96 was always true
97 try:
98 before_price = getattr(output, "stck_prpr", None)
99 setattr(output, "stck_prpr", str(int(current_price)))
100 if volume > 0: 100 ↛ 105line 100 didn't jump to line 105 because the condition on line 100 was always true
101 setattr(output, "acml_vol", str(volume))
102 except Exception:
103 pass
105 cached["price_updated_at"] = time.time()
106 if self._cache_logger and before_price != str(int(current_price)):
107 self._cache_logger.log_price_update_tick(
108 code, before_price, str(int(current_price)), volume
109 )
111 def mark_streaming(self, code: str) -> None:
112 """해당 종목이 실시간 스트리밍 중임을 등록. TTL 우회 활성화."""
113 self._streaming_codes.add(code)
114 if self._cache_logger: 114 ↛ exitline 114 didn't return from function 'mark_streaming' because the condition on line 114 was always true
115 self._cache_logger.log_streaming_mark(code, len(self._streaming_codes))
117 def unmark_streaming(self, code: str) -> None:
118 """실시간 스트리밍 종료. TTL 우회 해제."""
119 self._streaming_codes.discard(code)
120 if self._cache_logger: 120 ↛ exitline 120 didn't return from function 'unmark_streaming' because the condition on line 120 was always true
121 self._cache_logger.log_streaming_unmark(code, len(self._streaming_codes))
123 def is_streaming(self, code: str) -> bool:
124 """해당 종목이 현재 스트리밍 중인지 여부."""
125 return code in self._streaming_codes
127 def get_cache_stats(self, expand: bool = False) -> dict:
128 """현재가 캐시 통계를 반환합니다."""
129 return self._price_cache.get_stats(expand=expand)