Coverage for repositories / cache.py: 98%

134 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-04 15:08 +0000

1# repositories/cache.py 

2""" 

3인메모리 캐시 구현체. 

4- _LRUCache : 현재가(price) 캐시용 — recency 기반 eviction 

5- _LFUCache : OHLCV 캐시용 — frequency 기반 eviction (자주 쓰이는 종목이 밀려나지 않음) 

6""" 

7import collections 

8from typing import Optional, Any 

9 

10 

11class _LRUCache: 

12 """내장 OrderedDict를 활용한 인메모리 LRU(Least Recently Used) 캐시""" 

13 def __init__(self, capacity: int = 500, on_evict=None): 

14 self.cache = collections.OrderedDict() 

15 self.capacity = capacity 

16 self.hits = 0 

17 self.misses = 0 

18 self.item_hits = collections.defaultdict(int) 

19 self.caller_stats = collections.defaultdict(lambda: {"hits": 0, "misses": 0, "keys": collections.defaultdict(int), "items": collections.defaultdict(int)}) 

20 self._on_evict = on_evict # callable(evicted_key: str) | None 

21 

22 def get(self, key, count_stats: bool = True, caller: str = "unknown", item_type: str = "unknown"): 

23 if count_stats: 

24 self.caller_stats[caller]["keys"][key] += 1 

25 self.caller_stats[caller]["items"][item_type] += 1 

26 

27 if key not in self.cache: 

28 if count_stats: 

29 self.misses += 1 

30 self.caller_stats[caller]["misses"] += 1 

31 return None 

32 if count_stats: 

33 self.hits += 1 

34 self.item_hits[key] += 1 

35 self.caller_stats[caller]["hits"] += 1 

36 self.cache.move_to_end(key) 

37 return self.cache[key] 

38 

39 def put(self, key, value): 

40 self.cache[key] = value 

41 self.cache.move_to_end(key) 

42 if len(self.cache) > self.capacity: 

43 removed_key, _ = self.cache.popitem(last=False) 

44 if removed_key in self.item_hits: 

45 del self.item_hits[removed_key] 

46 if self._on_evict: 

47 try: 

48 self._on_evict(removed_key) 

49 except Exception: 

50 pass 

51 

52 def delete(self, key): 

53 if key in self.cache: 

54 del self.cache[key] 

55 if key in self.item_hits: 55 ↛ exitline 55 didn't return from function 'delete' because the condition on line 55 was always true

56 del self.item_hits[key] 

57 

58 def get_stats(self, expand: bool = False) -> dict: 

59 """캐시 적중률 통계를 반환합니다.""" 

60 total = self.hits + self.misses 

61 hit_rate = (self.hits / total * 100) if total > 0 else 0.0 

62 

63 callers_out = {} 

64 for c, s in self.caller_stats.items(): 

65 callers_out[c] = { 

66 "hits": s["hits"], 

67 "misses": s["misses"], 

68 "items": dict(s["items"]) 

69 } 

70 if expand: 

71 callers_out[c]["keys"] = dict(sorted(s["keys"].items(), key=lambda item: item[1], reverse=True)[:20]) 

72 

73 stats = { 

74 "hits": self.hits, 

75 "misses": self.misses, 

76 "hit_rate": round(hit_rate, 2), 

77 "total_requests": total, 

78 "current_size": len(self.cache), 

79 "callers": callers_out 

80 } 

81 

82 if expand: 

83 items = [] 

84 for key, val in list(self.cache.items()): 

85 if isinstance(val, dict): 

86 items.append({ 

87 "code": key, 

88 "hit_count": self.item_hits.get(key, 0), 

89 "has_ohlcv": "ohlcv" in val and len(val["ohlcv"]) > 0, 

90 "ohlcv_length": len(val["ohlcv"]) if "ohlcv" in val else 0, 

91 "has_current_price": "current_price_data" in val, 

92 "last_updated": val.get("last_updated"), 

93 "price_updated_at": val.get("price_updated_at") 

94 }) 

95 else: 

96 items.append({ 

97 "code": key, 

98 "hit_count": self.item_hits.get(key, 0), 

99 }) 

100 items.sort(key=lambda x: x.get("hit_count", 0), reverse=True) 

101 stats["items"] = items 

102 

103 return stats 

104 

105 

106class _LFUCache: 

107 """ 

108 LFU(Least Frequently Used) 캐시 — OHLCV 데이터 전용. 

109 접근 빈도가 낮은 항목부터 evict하여 자주 분석되는 종목이 캐시에 오래 남음. 

110 """ 

111 def __init__(self, capacity: int = 500, on_evict=None): 

112 self._cache: dict = {} # key → value 

113 self._freq: dict = collections.defaultdict(int) # key → access count 

114 self.capacity = capacity 

115 self.hits = 0 

116 self.misses = 0 

117 self.caller_stats = collections.defaultdict(lambda: {"hits": 0, "misses": 0, "keys": collections.defaultdict(int), "items": collections.defaultdict(int)}) 

118 self._on_evict = on_evict # callable(evicted_key: str, freq: int, ohlcv_count: int) | None 

119 

120 def get(self, key, count_stats: bool = True, caller: str = "unknown", item_type: str = "unknown") -> Optional[Any]: 

121 if key not in self._cache: 

122 if count_stats: 

123 self.misses += 1 

124 self.caller_stats[caller]["misses"] += 1 

125 self.caller_stats[caller]["items"][item_type] += 1 

126 return None 

127 if count_stats: 

128 self.hits += 1 

129 self._freq[key] += 1 

130 self.caller_stats[caller]["hits"] += 1 

131 self.caller_stats[caller]["keys"][key] += 1 

132 self.caller_stats[caller]["items"][item_type] += 1 

133 return self._cache[key] 

134 

135 def put(self, key, value): 

136 if key in self._cache: 

137 self._cache[key] = value 

138 self._freq[key] += 1 

139 return 

140 if len(self._cache) >= self.capacity: 

141 lfu_key = min(self._freq, key=lambda k: self._freq[k]) 

142 lfu_freq = self._freq[lfu_key] 

143 lfu_val = self._cache[lfu_key] 

144 lfu_ohlcv_count = len(lfu_val.get("ohlcv_historical", [])) if isinstance(lfu_val, dict) else 0 

145 del self._cache[lfu_key] 

146 del self._freq[lfu_key] 

147 if self._on_evict: 

148 try: 

149 self._on_evict(lfu_key, lfu_freq, lfu_ohlcv_count) 

150 except Exception: 

151 pass 

152 self._cache[key] = value 

153 self._freq[key] = 0 

154 

155 def delete(self, key): 

156 if key in self._cache: 

157 del self._cache[key] 

158 if key in self._freq: 158 ↛ exitline 158 didn't return from function 'delete' because the condition on line 158 was always true

159 del self._freq[key] 

160 

161 def get_stats(self, expand: bool = False, latest_trading_date: str = None) -> dict: 

162 """캐시 적중률 통계를 반환합니다.""" 

163 total = self.hits + self.misses 

164 hit_rate = (self.hits / total * 100) if total > 0 else 0.0 

165 

166 callers_out = {} 

167 for c, s in self.caller_stats.items(): 

168 callers_out[c] = { 

169 "hits": s["hits"], 

170 "misses": s["misses"], 

171 "items": dict(s["items"]), 

172 } 

173 if expand: 

174 callers_out[c]["keys"] = dict(sorted(s["keys"].items(), key=lambda item: item[1], reverse=True)[:20]) 

175 

176 stats = { 

177 "hits": self.hits, 

178 "misses": self.misses, 

179 "hit_rate": round(hit_rate, 2), 

180 "total_requests": total, 

181 "current_size": len(self._cache), 

182 "callers": callers_out, 

183 } 

184 if expand: 

185 items = [] 

186 for k, v in list(self._cache.items()): 

187 recent_dates: list[str] = [] 

188 has_today_candle = False 

189 if isinstance(v, dict): 

190 historical = v.get("ohlcv_historical", []) 

191 tail = historical[-5:] if len(historical) >= 5 else historical 

192 recent_dates = [ 

193 c["date"] for c in reversed(tail) 

194 if isinstance(c, dict) and "date" in c 

195 ] 

196 recent_dates = recent_dates[:5] 

197 

198 if latest_trading_date and historical: 

199 last_candle = historical[-1] 

200 has_today_candle = ( 

201 isinstance(last_candle, dict) and 

202 last_candle.get("date") == latest_trading_date 

203 ) 

204 

205 items.append({ 

206 "code": k, 

207 "freq": self._freq.get(k, 0), 

208 "historical_complete": v.get("historical_complete", False) if isinstance(v, dict) else False, 

209 "ohlcv_count": len(v.get("ohlcv_historical", [])) if isinstance(v, dict) else 0, 

210 "has_today_candle": has_today_candle, 

211 "recent_dates": recent_dates, 

212 }) 

213 items.sort(key=lambda x: x["freq"], reverse=True) 

214 stats["items"] = items 

215 return stats