Coverage for repositories / stock_price_repository.py: 88%

89 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-04 15:08 +0000

1# repositories/stock_price_repository.py 

2""" 

3현재가 인메모리 캐시 및 WebSocket 스트리밍 상태를 전담하는 Repository. 

4- 용량 3000: KOSPI+KOSDAQ 전종목을 커버하여 대량 스캔 시에도 eviction 없음 

5- TTL: streaming 종목은 ∞, non-streaming 종목은 기본 3초 

6""" 

7import time 

8import logging 

9from typing import Optional, TYPE_CHECKING 

10 

11from repositories.cache import _LRUCache 

12 

13if TYPE_CHECKING: 13 ↛ 14line 13 didn't jump to line 14 because the condition on line 13 was never true

14 from core.logger import CacheEventLogger 

15 

16_PRICE_CACHE_CAPACITY = 3000 

17 

18 

19class StockPriceRepository: 

20 """현재가 캐시 및 WebSocket 스트리밍 TTL 관리 저장소.""" 

21 

22 def __init__(self, logger=None, cache_logger: "CacheEventLogger | None" = None): 

23 self._logger = logger or logging.getLogger(__name__) 

24 self._cache_logger = cache_logger 

25 # 전종목(~2300) + 여유분을 수용하는 현재가 전용 캐시 

26 self._price_cache = _LRUCache( 

27 capacity=_PRICE_CACHE_CAPACITY, 

28 on_evict=self._on_price_evicted, 

29 ) 

30 # 현재 WebSocket으로 실시간 스트리밍 중인 종목 코드 집합 

31 self._streaming_codes: set = set() 

32 

33 def _on_price_evicted(self, code: str) -> None: 

34 if self._cache_logger: 34 ↛ exitline 34 didn't return from function '_on_price_evicted' because the condition on line 34 was always true

35 self._cache_logger.log_price_evicted(code, capacity=self._price_cache.capacity) 

36 

37 def set_current_price(self, code: str, price_data: dict): 

38 """현재가 API 응답 전체 데이터를 캐시에 저장합니다.""" 

39 cached = self._price_cache.get(code, count_stats=False, item_type="set_price") 

40 is_new = cached is None 

41 if self._cache_logger: 41 ↛ 53line 41 didn't jump to line 53 because the condition on line 41 was always true

42 before_price = None 

43 if not is_new and isinstance(cached, dict): 

44 existing = cached.get("current_price_data") 

45 if isinstance(existing, dict): 45 ↛ 48line 45 didn't jump to line 48 because the condition on line 45 was always true

46 _out = existing.get("output", {}) 

47 before_price = (_out.get("stck_prpr") if isinstance(_out, dict) else getattr(_out, "stck_prpr", None)) if "output" in existing else existing.get("stck_prpr") 

48 after_price = None 

49 if isinstance(price_data, dict): 49 ↛ 52line 49 didn't jump to line 52 because the condition on line 49 was always true

50 _out = price_data.get("output", {}) 

51 after_price = (_out.get("stck_prpr") if isinstance(_out, dict) else getattr(_out, "stck_prpr", None)) if "output" in price_data else price_data.get("stck_prpr") 

52 self._cache_logger.log_price_set(code, "api", before_price, after_price, is_new) 

53 if not cached: 

54 cached = {} 

55 self._price_cache.put(code, cached) 

56 cached["current_price_data"] = price_data 

57 cached["price_updated_at"] = time.time() 

58 

59 def get_current_price(self, code: str, max_age_sec: float = 3.0, 

60 count_stats: bool = True, caller: str = "unknown") -> Optional[dict]: 

61 """캐시된 현재가 데이터를 반환합니다. TTL 만료 시 None 반환.""" 

62 cached = self._price_cache.get(code, count_stats=count_stats, 

63 caller=caller, item_type="current_price") 

64 if cached and "current_price_data" in cached: 

65 is_streaming = code in self._streaming_codes 

66 effective_max_age = float('inf') if is_streaming else max_age_sec 

67 age_sec = time.time() - cached.get("price_updated_at", 0) 

68 if age_sec <= effective_max_age: 

69 if self._cache_logger and count_stats: 69 ↛ 71line 69 didn't jump to line 71 because the condition on line 69 was always true

70 self._cache_logger.log_price_hit(code, caller, age_sec, is_streaming) 

71 return cached["current_price_data"] 

72 if self._cache_logger and count_stats: 72 ↛ 74line 72 didn't jump to line 74 because the condition on line 72 was always true

73 self._cache_logger.log_price_miss(code, caller, "ttl_expired") 

74 return None 

75 if self._cache_logger and count_stats: 75 ↛ 77line 75 didn't jump to line 77 because the condition on line 75 was always true

76 self._cache_logger.log_price_miss(code, caller, "not_found") 

77 return None 

78 

79 def update_current_price(self, code: str, current_price: float, volume: int = 0): 

80 """WebSocket 틱 데이터로 현재가 캐시를 즉시 갱신합니다.""" 

81 cached = self._price_cache.get(code, count_stats=False, item_type="update_tick") 

82 if not cached: 

83 cached = {} 

84 self._price_cache.put(code, cached) 

85 

86 if "current_price_data" not in cached: 

87 cached["current_price_data"] = {"output": {}} 

88 

89 output = cached["current_price_data"].get("output") 

90 before_price = None 

91 if isinstance(output, dict): 

92 before_price = output.get("stck_prpr") 

93 output["stck_prpr"] = str(int(current_price)) 

94 if volume > 0: 

95 output["acml_vol"] = str(volume) 

96 elif output is not None: 96 ↛ 105line 96 didn't jump to line 105 because the condition on line 96 was always true

97 try: 

98 before_price = getattr(output, "stck_prpr", None) 

99 setattr(output, "stck_prpr", str(int(current_price))) 

100 if volume > 0: 100 ↛ 105line 100 didn't jump to line 105 because the condition on line 100 was always true

101 setattr(output, "acml_vol", str(volume)) 

102 except Exception: 

103 pass 

104 

105 cached["price_updated_at"] = time.time() 

106 if self._cache_logger and before_price != str(int(current_price)): 

107 self._cache_logger.log_price_update_tick( 

108 code, before_price, str(int(current_price)), volume 

109 ) 

110 

111 def mark_streaming(self, code: str) -> None: 

112 """해당 종목이 실시간 스트리밍 중임을 등록. TTL 우회 활성화.""" 

113 self._streaming_codes.add(code) 

114 if self._cache_logger: 114 ↛ exitline 114 didn't return from function 'mark_streaming' because the condition on line 114 was always true

115 self._cache_logger.log_streaming_mark(code, len(self._streaming_codes)) 

116 

117 def unmark_streaming(self, code: str) -> None: 

118 """실시간 스트리밍 종료. TTL 우회 해제.""" 

119 self._streaming_codes.discard(code) 

120 if self._cache_logger: 120 ↛ exitline 120 didn't return from function 'unmark_streaming' because the condition on line 120 was always true

121 self._cache_logger.log_streaming_unmark(code, len(self._streaming_codes)) 

122 

123 def is_streaming(self, code: str) -> bool: 

124 """해당 종목이 현재 스트리밍 중인지 여부.""" 

125 return code in self._streaming_codes 

126 

127 def get_cache_stats(self, expand: bool = False) -> dict: 

128 """현재가 캐시 통계를 반환합니다.""" 

129 return self._price_cache.get_stats(expand=expand)