Coverage for core / cache / file_cache.py: 93%

168 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-04 15:08 +0000

1# core/cache/file_cache.py 

2 

3import os 

4import json 

5import importlib 

6import time 

7from typing import Optional, Any 

8from dataclasses import dataclass, field, fields, MISSING, asdict, is_dataclass 

9from datetime import datetime 

10from core.cache.cache_config import load_cache_config 

11from pydantic import BaseModel 

12 

13def load_deserializable_classes(class_paths: list[str]) -> list[type]: 

14 classes = [] 

15 for path in class_paths: 

16 try: 

17 module_path, class_name = path.rsplit('.', 1) 

18 module = importlib.import_module(module_path) 

19 cls = getattr(module, class_name) 

20 classes.append(cls) 

21 except Exception as e: 

22 print(f"[❌ 클래스 로드 실패] {path}: {e}") 

23 return classes 

24 

25class FileCache: 

26 def __init__(self, config: Optional[dict] = None): 

27 if config is None: 

28 config = load_cache_config() 

29 self._base_dir = config["cache"]["base_dir"] 

30 self._logger = None 

31 self._deserializable_classes = load_deserializable_classes(config["cache"].get("deserializable_classes", [])) 

32 if not os.path.exists(self._base_dir): 

33 os.makedirs(self._base_dir, exist_ok=True) 

34 

35 def set_logger(self, logger): 

36 self._logger = logger 

37 

38 def _serialize(self, value: Any) -> Any: 

39 """직렬화 불가능한 객체 (예: dataclass 인스턴스)를 처리""" 

40 if hasattr(value, "to_dict") and callable(getattr(value, "to_dict")): 

41 # to_dict 메서드가 있는 객체는 해당 메서드를 사용하여 딕셔너리로 변환 

42 return value.to_dict() 

43 elif isinstance(value, BaseModel): 

44 return value.model_dump() 

45 elif isinstance(value, (list, tuple)): 

46 # 리스트/튜플 내의 항목도 재귀적으로 직렬화 

47 return [self._serialize(item) for item in value] 

48 elif isinstance(value, dict): 

49 # 딕셔너리 내의 값도 재귀적으로 직렬화 

50 return {k: self._serialize(v) for k, v in value.items()} 

51 # 그 외 기본 JSON 직렬화 가능 타입은 그대로 반환 

52 return value 

53 

54 def _deserialize(self, raw_data: Any) -> Any: 

55 if isinstance(raw_data, dict): 

56 best_cls = None 

57 best_ratio = 0.0 

58 best_is_dataclass = False 

59 

60 for cls in self._deserializable_classes: 

61 try: 

62 if issubclass(cls, BaseModel): 

63 cls_fields = set(cls.model_fields.keys()) 

64 elif is_dataclass(cls): 64 ↛ 67line 64 didn't jump to line 67 because the condition on line 64 was always true

65 cls_fields = {f.name for f in fields(cls)} 

66 else: 

67 continue 

68 

69 if not cls_fields.issubset(raw_data.keys()): 

70 continue 

71 

72 ratio = len(cls_fields) / len(raw_data) if raw_data else 0 

73 # 클래스 필드가 dict 키의 50% 이상을 커버해야 매칭 

74 # (소수 필드 클래스가 대형 dict에 잘못 매칭되는 것을 방지) 

75 if ratio < 0.5: 

76 continue 

77 

78 # ResCommonResponse는 래퍼이므로 즉시 처리 

79 if cls.__name__ == "ResCommonResponse": 

80 if "data" in raw_data: 80 ↛ 82line 80 didn't jump to line 82 because the condition on line 80 was always true

81 raw_data["data"] = self._deserialize(raw_data["data"]) 

82 if is_dataclass(cls): 

83 return cls.from_dict(raw_data) 

84 return cls.model_validate(raw_data) 

85 

86 if ratio > best_ratio: 86 ↛ 60line 86 didn't jump to line 60 because the condition on line 86 was always true

87 best_ratio = ratio 

88 best_cls = cls 

89 best_is_dataclass = is_dataclass(cls) 

90 except Exception: 

91 ... 

92 

93 if best_cls is not None: 

94 try: 

95 if best_is_dataclass: 

96 return best_cls.from_dict(raw_data) 

97 return best_cls.model_validate(raw_data) 

98 except Exception: 

99 ... 

100 

101 return {k: self._deserialize(v) for k, v in raw_data.items()} 

102 

103 elif isinstance(raw_data, (list, tuple)): 

104 return [self._deserialize(item) for item in raw_data] 

105 

106 return raw_data 

107 

108 def _get_path(self, key: str): 

109 return os.path.join(self._base_dir, f"{key}.json") 

110 

111 def set(self, key: str, value: Any, save_to_file: bool = False): 

112 if save_to_file: 

113 try: 

114 path = self._get_path(key) 

115 os.makedirs(os.path.dirname(path), exist_ok=True) 

116 

117 # _serialize 메서드를 사용하여 모든 객체를 직렬화 가능하도록 변환 

118 serialized_data = self._serialize(value) 

119 wrapper = serialized_data 

120 

121 with open(path, "w", encoding="utf-8") as f: 

122 json.dump(wrapper, f, ensure_ascii=False, indent=2) 

123 

124 if self._logger: 

125 self._logger.debug(f"💾 File cache 저장: {path}") 

126 except Exception as e: 

127 if self._logger: 127 ↛ exitline 127 didn't return from function 'set' because the condition on line 127 was always true

128 self._logger.error(f"❌ File cache 저장 실패: {e}") 

129 

130 def delete(self, key: str): 

131 path = self._get_path(key) 

132 if os.path.exists(path): 

133 try: 

134 os.remove(path) 

135 if self._logger: 

136 self._logger.debug(f"🗑️ File cache 삭제됨: {key}") 

137 except Exception as e: 

138 if self._logger: 138 ↛ exitline 138 didn't return from function 'delete' because the condition on line 138 was always true

139 self._logger.error(f"❌ File cache 삭제 실패: {e}") 

140 

141 def clear(self): 

142 """파일 캐시 전체 삭제""" 

143 if not os.path.exists(self._base_dir): 143 ↛ 144line 143 didn't jump to line 144 because the condition on line 143 was never true

144 return 

145 

146 try: 

147 for root, _, files in os.walk(self._base_dir): 

148 for file in files: 

149 if file.endswith(".json"): 

150 path = os.path.join(root, file) 

151 try: 

152 os.remove(path) 

153 if self._logger: 

154 self._logger.debug(f"🗑️ File cache 삭제됨: {path}") 

155 except Exception as e: 

156 if self._logger: 156 ↛ 148line 156 didn't jump to line 148 because the condition on line 156 was always true

157 self._logger.error(f"❌ 파일 삭제 실패: {path} - {e}") 

158 except Exception as e: 

159 if self._logger: 159 ↛ exitline 159 didn't return from function 'clear' because the condition on line 159 was always true

160 self._logger.error(f"❌ 전체 캐시 삭제 실패: {e}") 

161 

162 def cleanup_old_files(self, days: int = 7, max_size_mb: int = 0): 

163 """오래된 캐시 파일 삭제 (기본 7일)""" 

164 if not os.path.exists(self._base_dir): 164 ↛ 165line 164 didn't jump to line 165 because the condition on line 164 was never true

165 return 

166 

167 cutoff = time.time() - (days * 86400) 

168 ohlcv_cutoff = time.time() - (365 * 86400) # OHLCV 데이터는 1년 보관 

169 

170 try: 

171 for root, _, files in os.walk(self._base_dir): 

172 for file in files: 

173 if file.endswith(".json"): 

174 path = os.path.join(root, file) 

175 try: 

176 mtime = os.path.getmtime(path) 

177 # OHLCV 데이터 별도 정책 적용 

178 if "ohlcv_past_" in file or "indicators_chart_" in file: 

179 if mtime < ohlcv_cutoff: 

180 os.remove(path) 

181 if self._logger: 181 ↛ 182line 181 didn't jump to line 182 because the condition on line 181 was never true

182 self._logger.debug(f"🗑️ 오래된 OHLCV File cache 삭제됨: {path}") 

183 else: 

184 if mtime < cutoff: 

185 os.remove(path) 

186 if self._logger: 

187 self._logger.debug(f"🗑️ 오래된 File cache 삭제됨: {path}") 

188 except Exception as e: 

189 if self._logger: 189 ↛ 172line 189 didn't jump to line 172 because the condition on line 189 was always true

190 self._logger.error(f"❌ 오래된 파일 삭제 실패: {path} - {e}") 

191 except Exception as e: 

192 if self._logger: 192 ↛ exitline 192 didn't return from function 'cleanup_old_files' because the condition on line 192 was always true

193 self._logger.error(f"❌ 캐시 정리 실패: {e}") 

194 

195 def get_raw(self, key: str): 

196 path = self._get_path(key) 

197 if not os.path.exists(path): 

198 return None 

199 try: 

200 with open(path, "r", encoding="utf-8") as f: 

201 wrapper = json.load(f) 

202 data = wrapper["data"] 

203 wrapper['data'] = self._deserialize(data) 

204 

205 return wrapper 

206 except Exception as e: 

207 if self._logger: 207 ↛ 209line 207 didn't jump to line 209 because the condition on line 207 was always true

208 self._logger.error(f"[FileCache] Load Error: {e}") 

209 return None 

210 

211 def exists(self, key: str) -> bool: 

212 """파일 캐시 존재 여부 확인""" 

213 return os.path.exists(self._get_path(key))