Coverage for core / logger.py: 96%

268 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-04 15:08 +0000

1# core/logger.py 

2import logging 

3import os 

4import time 

5import glob 

6from datetime import datetime 

7import http.client 

8import json 

9from logging.handlers import RotatingFileHandler 

10 

11# --- Log Rotation Constants --- 

12LOG_MAX_BYTES = 10 * 1024 * 1024 # 10MB 

13LOG_BACKUP_COUNT = 30 

14 

15# --- Timestamp Singleton --- 

16_log_timestamp = None 

17 

18def get_log_timestamp(): 

19 """애플리케이션 실행 당 한 번만 타임스탬프를 생성하고, 이후에는 동일한 값을 반환합니다.""" 

20 global _log_timestamp 

21 if _log_timestamp is None: 

22 _log_timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") 

23 return _log_timestamp 

24 

25def reset_log_timestamp_for_test(): 

26 """테스트 격리를 위해 전역 타임스탬프를 리셋합니다.""" 

27 global _log_timestamp 

28 _log_timestamp = None 

29 

30class SizeTimeRotatingFileHandler(RotatingFileHandler): 

31 """ 

32 파일 크기가 maxBytes를 초과하면 인덱스를 붙여 새 파일로 교체하는 핸들러. 

33 인덱스가 클수록 최신 파일입니다. 

34 예: app_1.log (가장 오래됨) ... app_25.log (오래된 백업) -> app_26.log (현재 활성) 

35 """ 

36 def __init__(self, filename, mode='a', maxBytes=0, backupCount=0, encoding=None, delay=False): 

37 # 확장자 처리 (예: .log.json) 

38 if filename.endswith(".log.json"): 

39 root, ext = filename[:-len(".log.json")], ".log.json" 

40 else: 

41 root, ext = os.path.splitext(filename) 

42 

43 self._log_root = root 

44 self._log_ext = ext 

45 

46 # 기존 인덱스 파일 중 최대 인덱스 탐색 

47 pattern = f"{glob.escape(root)}_[0-9]*{glob.escape(ext)}" 

48 max_index = 0 

49 for f in glob.glob(pattern): 

50 try: 

51 idx_str = f[:-len(ext)].split('_')[-1] 

52 if idx_str.isdigit(): 52 ↛ 49line 52 didn't jump to line 49 because the condition on line 52 was always true

53 max_index = max(max_index, int(idx_str)) 

54 except (ValueError, IndexError): 

55 continue 

56 

57 # 초기 활성 파일은 max_index + 1 번 인덱스로 생성 

58 initial_filename = f"{root}_{max_index + 1}{ext}" 

59 super().__init__(initial_filename, mode=mode, maxBytes=maxBytes, 

60 backupCount=backupCount, encoding=encoding, delay=delay) 

61 

62 def doRollover(self): 

63 if self.stream: 63 ↛ 68line 63 didn't jump to line 68 because the condition on line 63 was always true

64 self.stream.close() 

65 self.stream = None 

66 

67 # 현재 존재하는 인덱스 파일 중 최대 인덱스 결정 

68 pattern = f"{glob.escape(self._log_root)}_[0-9]*{glob.escape(self._log_ext)}" 

69 existing = glob.glob(pattern) 

70 

71 max_index = 0 

72 for f in existing: 

73 try: 

74 idx_str = f[:-len(self._log_ext)].split('_')[-1] 

75 if idx_str.isdigit(): 75 ↛ 72line 75 didn't jump to line 72 because the condition on line 75 was always true

76 max_index = max(max_index, int(idx_str)) 

77 except (ValueError, IndexError): 

78 continue 

79 

80 # baseFilename을 다음 인덱스 파일로 업데이트 (이것이 새 활성 파일이 됨) 

81 next_filename = f"{self._log_root}_{max_index + 1}{self._log_ext}" 

82 self.baseFilename = os.path.abspath(next_filename) 

83 

84 # 오래된 파일 삭제 (backupCount 초과 시) 

85 if self.backupCount > 0: 85 ↛ 96line 85 didn't jump to line 96 because the condition on line 85 was always true

86 all_files = glob.glob(pattern) 

87 all_files.sort(key=lambda f: int(f[:-len(self._log_ext)].split('_')[-1]) 

88 if f[:-len(self._log_ext)].split('_')[-1].isdigit() else -1) 

89 if len(all_files) > self.backupCount: 

90 for f in all_files[:len(all_files) - self.backupCount]: 

91 try: 

92 os.remove(f) 

93 except OSError: 

94 pass 

95 

96 if not self.delay: 96 ↛ exitline 96 didn't return from function 'doRollover' because the condition on line 96 was always true

97 self.stream = self._open() 

98 

99 

100class JsonFormatter(logging.Formatter): 

101 """ 

102 로그 레코드를 JSON 형식으로 변환하는 포맷터. 

103 """ 

104 def format(self, record): 

105 log_object = { 

106 "timestamp": self.formatTime(record, self.datefmt), 

107 "level": record.levelname, 

108 "name": record.name, 

109 } 

110 # message가 dict 형태이면, 그대로 data 필드로 추가 

111 if isinstance(record.msg, dict): 

112 log_object["data"] = record.msg 

113 else: 

114 log_object["message"] = record.getMessage() 

115 

116 # 예외 정보가 있으면 추가 

117 if record.exc_info: 

118 log_object['exc_info'] = self.formatException(record.exc_info) 

119 

120 return json.dumps(log_object, ensure_ascii=False, default=str) 

121 

122 

123def get_streaming_logger(log_dir: str = "logs") -> "StreamingEventLogger": 

124 """ 

125 실시간 스트리밍 전용 이벤트 로거를 생성하고 반환합니다. 

126 경로: logs/streaming/{timestamp}_streaming.log.json 

127 

128 로그 항목 구조: 

129 - action: "subscribe" | "unsubscribe" | "summary" | 

130 "connect" | "disconnect" | 

131 "reconnect" | "restore" | 

132 "pt_subscribe" | "pt_unsubscribe" (프로그램매매 H0STPGM0) 

133 "price_subscribe" | "price_unsubscribe" (현재 체결가 H0STCNT0) 

134 + action별 세부 필드 (code, categories, reason, trigger, ...) 

135 """ 

136 streaming_log_dir = os.path.join(log_dir, "streaming") 

137 os.makedirs(streaming_log_dir, exist_ok=True) 

138 

139 logger_name = "streaming_event" 

140 logger = logging.getLogger(logger_name) 

141 

142 if not logger.handlers: 

143 logger.setLevel(logging.DEBUG) 

144 logger.propagate = False 

145 

146 timestamp = get_log_timestamp() 

147 log_file = os.path.join(streaming_log_dir, f"{timestamp}_streaming.log.json") 

148 handler = SizeTimeRotatingFileHandler( 

149 log_file, 

150 mode="a", 

151 encoding="utf-8", 

152 maxBytes=LOG_MAX_BYTES, 

153 backupCount=LOG_BACKUP_COUNT, 

154 ) 

155 handler.setFormatter(JsonFormatter()) 

156 logger.addHandler(handler) 

157 

158 return StreamingEventLogger(logger) 

159 

160 

161class StreamingEventLogger: 

162 """ 

163 실시간 구독·연결 이벤트를 JSON으로 기록하는 전용 로거. 

164 

165 logs/streaming/{timestamp}_streaming.log.json 에 기록한다. 

166 각 항목은 JsonFormatter를 통해 {"timestamp":..., "level":..., "data":{...}} 형태로 저장된다. 

167 

168 --- 로그 종류 --- 

169 [통합 가격 구독 - PriceSubscriptionService (H0UNCNT0)] 

170 log_subscribe : 통합 현재가 구독 등록 (categories = 구독 요청자 목록) 

171 log_unsubscribe : 통합 현재가 구독 해제 

172 log_summary : 현재 전체 구독 현황 스냅샷 

173 

174 [연결 이벤트 - StreamingService] 

175 log_connect : WebSocket 연결 성공 

176 log_disconnect : WebSocket 연결 해제 

177 

178 [워치독/복원 이벤트 - WebSocketWatchdogTask] 

179 log_reconnect : 강제 재연결 (trigger 포함) 

180 log_restore : 앱 시작 시 구독 상태 복원 

181 

182 [프로그램매매 구독 - WebSocketWatchdogTask (H0STPGM0)] 

183 log_pt_subscribe : 프로그램매매 구독 등록 

184 log_pt_unsubscribe : 프로그램매매 구독 해제 

185 

186 [실시간 체결가 구독 - WebSocketWatchdogTask (H0STCNT0)] 

187 log_price_subscribe : 실시간 현재 체결가 구독 등록 

188 log_price_unsubscribe : 실시간 현재 체결가 구독 해제 

189 """ 

190 

191 def __init__(self, logger: logging.Logger): 

192 self._logger = logger 

193 

194 # ── PriceSubscriptionService 이벤트 (H0UNCNT0) ────────────── 

195 

196 def log_subscribe(self, code: str, categories: dict, active_count: int) -> None: 

197 """통합 현재가 구독 등록. 

198 

199 Args: 

200 code: 종목코드 

201 categories: {category_key: priority_int} — 해당 종목을 요청한 카테고리 목록 

202 ex) {"portfolio": 1, "strategy_momentum": 2} 

203 active_count: 구독 등록 후 총 활성 구독 수 

204 """ 

205 self._logger.info({ 

206 "action": "subscribe", 

207 "code": code, 

208 "categories": {k: int(v) for k, v in categories.items()}, 

209 "active_count": active_count, 

210 }) 

211 

212 def log_unsubscribe(self, code: str, active_count: int) -> None: 

213 """통합 현재가 구독 해제. 

214 

215 Args: 

216 code: 종목코드 

217 active_count: 구독 해제 후 총 활성 구독 수 

218 """ 

219 self._logger.info({ 

220 "action": "unsubscribe", 

221 "code": code, 

222 "active_count": active_count, 

223 }) 

224 

225 def log_summary( 

226 self, 

227 active_count: int, 

228 active_codes: list, 

229 pending_by_priority: dict, 

230 ) -> None: 

231 """현재 구독 상태 전체 요약. 

232 

233 Args: 

234 active_count: 현재 활성 구독 수 

235 active_codes: 활성 구독 종목 목록 

236 pending_by_priority: {"HIGH": [...], "MEDIUM": [...], "LOW": [...]} 

237 """ 

238 self._logger.info({ 

239 "action": "summary", 

240 "active_count": active_count, 

241 "active_codes": sorted(active_codes), 

242 "pending_by_priority": pending_by_priority, 

243 }) 

244 

245 # ── StreamingService 이벤트 ────────────────────────────────── 

246 

247 def log_connect(self) -> None: 

248 """WebSocket 연결 성공.""" 

249 self._logger.info({"action": "connect"}) 

250 

251 def log_disconnect(self, reason: str = "") -> None: 

252 """WebSocket 연결 해제. 

253 

254 Args: 

255 reason: 해제 이유 (e.g., "market_closed", "manual", "") 

256 """ 

257 self._logger.info({"action": "disconnect", "reason": reason}) 

258 

259 # ── WebSocketWatchdogTask 이벤트 ───────────────────────────── 

260 

261 def log_reconnect( 

262 self, 

263 trigger: str, 

264 codes: list, 

265 success: int, 

266 total: int, 

267 ) -> None: 

268 """강제 재연결 완료. 

269 

270 Args: 

271 trigger: 재연결 원인 ("receive_task_dead" | "data_gap_{N}s") 

272 codes: 재구독 시도한 종목 목록 

273 success: 성공한 종목 수 

274 total: 전체 시도 종목 수 

275 """ 

276 self._logger.info({ 

277 "action": "reconnect", 

278 "trigger": trigger, 

279 "codes": sorted(codes), 

280 "success": success, 

281 "total": total, 

282 }) 

283 

284 def log_restore(self, codes: list, success: int, total: int) -> None: 

285 """앱 시작 시 구독 상태 복원 완료. 

286 

287 Args: 

288 codes: 복원 시도한 종목 목록 

289 success: 성공한 종목 수 

290 total: 전체 시도 종목 수 

291 """ 

292 self._logger.info({ 

293 "action": "restore", 

294 "codes": sorted(codes), 

295 "success": success, 

296 "total": total, 

297 }) 

298 

299 # ── 프로그램매매 구독 이벤트 (H0STPGM0) ──────────────────── 

300 

301 def log_pt_subscribe(self, code: str, reason: str = "") -> None: 

302 """프로그램매매 실시간 구독 등록 (H0STPGM0). 

303 

304 Args: 

305 code: 종목코드 

306 reason: 구독 이유 (e.g., "initial", "reconnect", "restore", "user_request") 

307 """ 

308 self._logger.info({"action": "pt_subscribe", "code": code, "reason": reason}) 

309 

310 def log_pt_unsubscribe(self, code: str, reason: str = "") -> None: 

311 """프로그램매매 실시간 구독 해제 (H0STPGM0). 

312 

313 Args: 

314 code: 종목코드 

315 reason: 해제 이유 (e.g., "failed", "user_request") 

316 """ 

317 self._logger.info({"action": "pt_unsubscribe", "code": code, "reason": reason}) 

318 

319 # ── 실시간 체결가 구독 이벤트 (H0STCNT0) ────────────────── 

320 

321 def log_price_subscribe(self, code: str, reason: str = "") -> None: 

322 """실시간 현재 체결가 구독 등록 (H0STCNT0). 

323 

324 워치독이 프로그램매매 종목과 함께 체결가도 함께 구독하는 경우 사용. 

325 

326 Args: 

327 code: 종목코드 

328 reason: 구독 이유 (e.g., "initial", "reconnect", "restore") 

329 """ 

330 self._logger.info({"action": "price_subscribe", "code": code, "reason": reason}) 

331 

332 def log_price_unsubscribe(self, code: str, reason: str = "") -> None: 

333 """실시간 현재 체결가 구독 해제 (H0STCNT0). 

334 

335 Args: 

336 code: 종목코드 

337 reason: 해제 이유 (e.g., "failed", "user_request") 

338 """ 

339 self._logger.info({"action": "price_unsubscribe", "code": code, "reason": reason}) 

340 

341 

342def get_cache_event_logger(log_dir: str = "logs") -> "CacheEventLogger": 

343 """ 

344 캐시 동작 전용 이벤트 로거를 생성하고 반환합니다. 

345 경로: logs/cache/{timestamp}_cache.log.json 

346 

347 로그 항목 구조: 

348 - action: 아래 CacheEventLogger 참조 

349 + action별 세부 필드 (code, caller, before_price, after_price, ohlcv_count, ...) 

350 """ 

351 cache_log_dir = os.path.join(log_dir, "cache") 

352 os.makedirs(cache_log_dir, exist_ok=True) 

353 

354 logger_name = "cache_event" 

355 logger = logging.getLogger(logger_name) 

356 

357 if not logger.handlers: 

358 logger.setLevel(logging.DEBUG) 

359 logger.propagate = False 

360 

361 timestamp = get_log_timestamp() 

362 log_file = os.path.join(cache_log_dir, f"{timestamp}_cache.log.json") 

363 handler = SizeTimeRotatingFileHandler( 

364 log_file, 

365 mode="a", 

366 encoding="utf-8", 

367 maxBytes=LOG_MAX_BYTES, 

368 backupCount=LOG_BACKUP_COUNT, 

369 ) 

370 handler.setFormatter(JsonFormatter()) 

371 logger.addHandler(handler) 

372 

373 return CacheEventLogger(logger) 

374 

375 

376class CacheEventLogger: 

377 """ 

378 현재가·OHLCV 캐시 동작을 JSON으로 기록하는 전용 로거. 

379 

380 logs/cache/{timestamp}_cache.log.json 에 기록한다. 

381 

382 --- 로그 종류 --- 

383 

384 [현재가 캐시 — StockPriceRepository (LRU)] 

385 price_set : API 응답으로 현재가 캐시 등록/갱신 (before/after price, is_new) 

386 price_update_tick : WebSocket 틱으로 현재가 갱신 (before/after price, volume) 

387 price_hit : 캐시 히트 (caller, age_sec, is_streaming) 

388 price_miss : 캐시 미스 (caller, reason: "not_found" | "ttl_expired") 

389 price_evicted : LRU capacity 초과로 캐시 제거 

390 

391 [스트리밍 상태 — StockPriceRepository] 

392 streaming_mark : 실시간 스트리밍 등록 (streaming_count) 

393 streaming_unmark : 실시간 스트리밍 해제 (streaming_count) 

394 

395 [OHLCV 캐시 — StockOhlcvRepository (LFU)] 

396 ohlcv_loaded : DB에서 OHLCV 로드 후 캐시 등록 (caller, ohlcv_count, latest_date) 

397 ohlcv_hit : 캐시 히트 (caller, ohlcv_count, has_today_candle) 

398 ohlcv_miss : 캐시 미스 (caller) 

399 ohlcv_evicted : LFU capacity 초과로 캐시 제거 (freq, ohlcv_count) 

400 ohlcv_invalidated : upsert 후 캐시 무효화 

401 ohlcv_upsert : OHLCV upsert 배치 완료 (record_count, code_count, invalidated_codes) 

402 today_candle : 당일 캔들 갱신 (before/after price, high, low, is_new_candle) 

403 

404 [통합 통계] 

405 cache_stats : 현재가+OHLCV 캐시 hit/miss 통계 스냅샷 

406 """ 

407 

408 def __init__(self, logger: logging.Logger): 

409 self._logger = logger 

410 

411 # ── 현재가 캐시 이벤트 ─────────────────────────────────────────── 

412 

413 def log_price_set( 

414 self, 

415 code: str, 

416 caller: str, 

417 before_price: str, 

418 after_price: str, 

419 is_new: bool, 

420 ) -> None: 

421 """API 응답으로 현재가 캐시 등록 또는 갱신. 

422 

423 Args: 

424 code: 종목코드 

425 caller: 호출 출처 (e.g., "market_data_service", "streaming") 

426 before_price: 갱신 전 stck_prpr (캐시 미존재 시 None) 

427 after_price: 갱신 후 stck_prpr 

428 is_new: True이면 캐시에 처음 등록 (신규 종목) 

429 """ 

430 self._logger.info({ 

431 "action": "price_set", 

432 "code": code, 

433 "caller": caller, 

434 "before_price": before_price, 

435 "after_price": after_price, 

436 "is_new": is_new, 

437 }) 

438 

439 def log_price_update_tick( 

440 self, 

441 code: str, 

442 before_price: str, 

443 after_price: str, 

444 volume: int, 

445 ) -> None: 

446 """WebSocket 틱 데이터로 현재가 갱신. 가격 변동 시에만 기록. 

447 

448 Args: 

449 code: 종목코드 

450 before_price: 갱신 전 stck_prpr 

451 after_price: 갱신 후 stck_prpr 

452 volume: 누적 거래량 

453 """ 

454 self._logger.debug({ 

455 "action": "price_update_tick", 

456 "code": code, 

457 "before_price": before_price, 

458 "after_price": after_price, 

459 "volume": volume, 

460 }) 

461 

462 def log_price_hit( 

463 self, 

464 code: str, 

465 caller: str, 

466 age_sec: float, 

467 is_streaming: bool, 

468 ) -> None: 

469 """현재가 캐시 히트. 

470 

471 Args: 

472 code: 종목코드 

473 caller: 호출 출처 

474 age_sec: 캐시 데이터 경과 시간 (초) 

475 is_streaming: 실시간 스트리밍 중 여부 (TTL 무제한) 

476 """ 

477 self._logger.debug({ 

478 "action": "price_hit", 

479 "code": code, 

480 "caller": caller, 

481 "age_sec": round(age_sec, 2), 

482 "is_streaming": is_streaming, 

483 }) 

484 

485 def log_price_miss(self, code: str, caller: str, reason: str) -> None: 

486 """현재가 캐시 미스. 

487 

488 Args: 

489 code: 종목코드 

490 caller: 호출 출처 

491 reason: "not_found" | "ttl_expired" 

492 """ 

493 self._logger.debug({ 

494 "action": "price_miss", 

495 "code": code, 

496 "caller": caller, 

497 "reason": reason, 

498 }) 

499 

500 def log_price_evicted(self, code: str, capacity: int) -> None: 

501 """LRU 용량 초과로 현재가 캐시에서 제거. 

502 

503 Args: 

504 code: 제거된 종목코드 

505 capacity: 캐시 최대 용량 

506 """ 

507 self._logger.warning({ 

508 "action": "price_evicted", 

509 "code": code, 

510 "capacity": capacity, 

511 }) 

512 

513 # ── 스트리밍 상태 이벤트 ───────────────────────────────────────── 

514 

515 def log_streaming_mark(self, code: str, streaming_count: int) -> None: 

516 """실시간 스트리밍 등록 (TTL 무제한 전환). 

517 

518 Args: 

519 code: 종목코드 

520 streaming_count: 등록 후 총 스트리밍 종목 수 

521 """ 

522 self._logger.info({ 

523 "action": "streaming_mark", 

524 "code": code, 

525 "streaming_count": streaming_count, 

526 }) 

527 

528 def log_streaming_unmark(self, code: str, streaming_count: int) -> None: 

529 """실시간 스트리밍 해제 (TTL 정상 적용 복귀). 

530 

531 Args: 

532 code: 종목코드 

533 streaming_count: 해제 후 총 스트리밍 종목 수 

534 """ 

535 self._logger.info({ 

536 "action": "streaming_unmark", 

537 "code": code, 

538 "streaming_count": streaming_count, 

539 }) 

540 

541 # ── OHLCV 캐시 이벤트 ─────────────────────────────────────────── 

542 

543 def log_ohlcv_loaded( 

544 self, 

545 code: str, 

546 caller: str, 

547 ohlcv_count: int, 

548 latest_date: str, 

549 ) -> None: 

550 """DB에서 OHLCV 데이터를 읽어 캐시에 등록. 

551 

552 Args: 

553 code: 종목코드 

554 caller: 호출 출처 

555 ohlcv_count: 적재된 OHLCV 일수 

556 latest_date: 가장 최근 OHLCV 날짜 (데이터 신선도 확인) 

557 """ 

558 self._logger.info({ 

559 "action": "ohlcv_loaded", 

560 "code": code, 

561 "caller": caller, 

562 "ohlcv_count": ohlcv_count, 

563 "latest_date": latest_date, 

564 }) 

565 

566 def log_ohlcv_hit( 

567 self, 

568 code: str, 

569 caller: str, 

570 ohlcv_count: int, 

571 has_today_candle: bool, 

572 ) -> None: 

573 """OHLCV 캐시 히트. 

574 

575 Args: 

576 code: 종목코드 

577 caller: 호출 출처 

578 ohlcv_count: 캐시에 있는 총 OHLCV 일수 (historical + today 포함) 

579 has_today_candle: 당일 캔들 존재 여부 

580 """ 

581 self._logger.debug({ 

582 "action": "ohlcv_hit", 

583 "code": code, 

584 "caller": caller, 

585 "ohlcv_count": ohlcv_count, 

586 "has_today_candle": has_today_candle, 

587 }) 

588 

589 def log_ohlcv_miss(self, code: str, caller: str) -> None: 

590 """OHLCV 캐시 미스 (DB 조회 필요). 

591 

592 Args: 

593 code: 종목코드 

594 caller: 호출 출처 

595 """ 

596 self._logger.debug({ 

597 "action": "ohlcv_miss", 

598 "code": code, 

599 "caller": caller, 

600 }) 

601 

602 def log_ohlcv_evicted(self, code: str, freq: int, ohlcv_count: int, capacity: int) -> None: 

603 """LFU 용량 초과로 OHLCV 캐시에서 제거. 

604 

605 Args: 

606 code: 제거된 종목코드 

607 freq: 제거 시점까지의 누적 접근 횟수 (낮을수록 자주 안 쓰인 종목) 

608 ohlcv_count: 제거된 종목의 OHLCV 일수 

609 capacity: 캐시 최대 용량 

610 """ 

611 self._logger.warning({ 

612 "action": "ohlcv_evicted", 

613 "code": code, 

614 "freq": freq, 

615 "ohlcv_count": ohlcv_count, 

616 "capacity": capacity, 

617 }) 

618 

619 def log_ohlcv_invalidated(self, code: str) -> None: 

620 """upsert 이후 해당 종목 OHLCV 캐시 무효화 (다음 조회 시 DB에서 재로드). 

621 

622 Args: 

623 code: 무효화된 종목코드 

624 """ 

625 self._logger.info({ 

626 "action": "ohlcv_invalidated", 

627 "code": code, 

628 }) 

629 

630 def log_ohlcv_upsert( 

631 self, 

632 record_count: int, 

633 code_count: int, 

634 invalidated_codes: list, 

635 ) -> None: 

636 """OHLCV upsert 배치 완료 및 캐시 무효화 요약. 

637 

638 Args: 

639 record_count: upsert된 총 레코드 수 

640 code_count: 영향 받은 고유 종목 수 

641 invalidated_codes: 캐시 무효화된 종목코드 목록 

642 """ 

643 self._logger.info({ 

644 "action": "ohlcv_upsert", 

645 "record_count": record_count, 

646 "code_count": code_count, 

647 "invalidated_codes": sorted(invalidated_codes), 

648 }) 

649 

650 def log_today_candle( 

651 self, 

652 code: str, 

653 before_price, 

654 after_price: float, 

655 high: float, 

656 low: float, 

657 is_new_candle: bool, 

658 ) -> None: 

659 """WebSocket 틱으로 당일 캔들 갱신. 

660 

661 Args: 

662 code: 종목코드 

663 before_price: 갱신 전 close 가격 (캔들 없으면 None) 

664 after_price: 갱신 후 close 가격 

665 high: 갱신 후 고가 

666 low: 갱신 후 저가 

667 is_new_candle: True이면 ohlcv_today 신규 생성 (기존 historical[-1] 갱신이 아님) 

668 """ 

669 self._logger.debug({ 

670 "action": "today_candle", 

671 "code": code, 

672 "before_price": before_price, 

673 "after_price": after_price, 

674 "high": high, 

675 "low": low, 

676 "is_new_candle": is_new_candle, 

677 }) 

678 

679 # ── 통합 통계 ───────────────────────────────────────────────────── 

680 

681 def log_stats(self, price_stats: dict, ohlcv_stats: dict) -> None: 

682 """현재가 + OHLCV 캐시 통합 hit/miss 통계 스냅샷. 

683 

684 Args: 

685 price_stats: StockPriceRepository.get_cache_stats() 결과 

686 ohlcv_stats: StockOhlcvRepository.get_cache_stats() 결과 

687 """ 

688 total_hits = price_stats.get("hits", 0) + ohlcv_stats.get("hits", 0) 

689 total_misses = price_stats.get("misses", 0) + ohlcv_stats.get("misses", 0) 

690 total = total_hits + total_misses 

691 self._logger.info({ 

692 "action": "cache_stats", 

693 "price": { 

694 "hits": price_stats.get("hits", 0), 

695 "misses": price_stats.get("misses", 0), 

696 "hit_rate": price_stats.get("hit_rate", 0.0), 

697 "current_size": price_stats.get("current_size", 0), 

698 }, 

699 "ohlcv": { 

700 "hits": ohlcv_stats.get("hits", 0), 

701 "misses": ohlcv_stats.get("misses", 0), 

702 "hit_rate": ohlcv_stats.get("hit_rate", 0.0), 

703 "current_size": ohlcv_stats.get("current_size", 0), 

704 }, 

705 "combined": { 

706 "hits": total_hits, 

707 "misses": total_misses, 

708 "hit_rate": round(total_hits / total * 100, 2) if total > 0 else 0.0, 

709 }, 

710 }) 

711 

712 

713def get_strategy_logger(strategy_name: str, log_dir="logs", sub_dir: str = None): 

714 """ 

715 전략별 전용 로거를 생성하고 반환합니다. 

716 - 실행 시마다 타임스탬프가 찍힌 JSON 파일 핸들러 생성 

717 - 콘솔 스트림 핸들러 

718 """ 

719 logger = logging.getLogger(f"strategy.{strategy_name}") 

720 

721 if logger.handlers: 

722 # 이미 핸들러가 설정된 경우, 새 실행을 위해 기존 핸들러를 제거하고 다시 설정 

723 for handler in logger.handlers[:]: 

724 handler.close() 

725 logger.removeHandler(handler) 

726 

727 logger.setLevel(logging.DEBUG) 

728 logger.propagate = True 

729 

730 strategy_log_dir = os.path.join(log_dir, "strategies") 

731 if sub_dir: 

732 strategy_log_dir = os.path.join(strategy_log_dir, sub_dir) 

733 if not os.path.exists(strategy_log_dir): 

734 os.makedirs(strategy_log_dir, exist_ok=True) 

735 

736 timestamp = get_log_timestamp() 

737 

738 # 1. JSON 파일 핸들러 (실행마다 새로 생성) 

739 log_file = os.path.join(strategy_log_dir, f"{timestamp}_{strategy_name}.log.json") 

740 file_handler = SizeTimeRotatingFileHandler( 

741 log_file, 

742 mode='a', 

743 encoding='utf-8', 

744 maxBytes=LOG_MAX_BYTES, 

745 backupCount=LOG_BACKUP_COUNT 

746 ) 

747 file_handler.setFormatter(JsonFormatter()) 

748 logger.addHandler(file_handler) 

749 

750 return logger 

751 

752 

753def get_performance_logger(log_dir="logs"): 

754 """ 

755 성능 측정 전용 로거를 생성하고 반환합니다. 

756 경로: logs/performance/{timestamp}_perf.log 

757 """ 

758 logger = logging.getLogger("performance") 

759 

760 if logger.handlers: 

761 return logger 

762 

763 logger.setLevel(logging.INFO) 

764 logger.propagate = False 

765 

766 perf_log_dir = os.path.join(log_dir, "performance") 

767 if not os.path.exists(perf_log_dir): 

768 os.makedirs(perf_log_dir, exist_ok=True) 

769 

770 timestamp = get_log_timestamp() 

771 log_file = os.path.join(perf_log_dir, f"{timestamp}_perf.log") 

772 

773 file_handler = SizeTimeRotatingFileHandler( 

774 log_file, 

775 mode='a', 

776 encoding='utf-8', 

777 maxBytes=LOG_MAX_BYTES, 

778 backupCount=LOG_BACKUP_COUNT 

779 ) 

780 file_handler.setFormatter(logging.Formatter('%(asctime)s - %(message)s')) 

781 logger.addHandler(file_handler) 

782 

783 return logger 

784 

785 

786class StrategyInfoFilter(logging.Filter): 

787 """ 

788 전략 로거(strategy.*)의 로그는 INFO 레벨 이상만 통과시키는 필터. 

789 통합 로그(debug.log)에 전략의 과도한 DEBUG 로그가 쌓이는 것을 방지함. 

790 """ 

791 def filter(self, record): 

792 if record.name.startswith("strategy."): 

793 return record.levelno >= logging.INFO 

794 return True 

795 

796class Logger: 

797 """ 

798 애플리케이션의 로깅을 관리하는 클래스입니다. 

799 운영에 필요한 정보(operational.log)와 디버깅에 필요한 데이터(debug.log)를 분리하여 저장합니다. 

800 매 실행마다 시간이 적힌 새로운 로그 파일을 생성합니다. 

801 """ 

802 

803 def __init__(self, log_dir="logs"): 

804 self.log_dir = log_dir 

805 self.common_log_dir = os.path.join(self.log_dir, "common") 

806 self.strategy_log_dir = os.path.join(self.log_dir, "strategies") 

807 

808 # 공유 타임스탬프 생성 

809 timestamp = get_log_timestamp() 

810 

811 # 로그 디렉토리(logs/, logs/common, logs/strategies)가 없으면 생성 

812 if not os.path.exists(self.log_dir): 

813 os.makedirs(self.log_dir) 

814 if not os.path.exists(self.common_log_dir): 

815 os.makedirs(self.common_log_dir, exist_ok=True) 

816 if not os.path.exists(self.strategy_log_dir): 

817 os.makedirs(self.strategy_log_dir, exist_ok=True) 

818 

819 # 오래된 로그 파일 정리 (30일 경과) 

820 self._cleanup_old_logs(days=30) 

821 

822 # 로그 파일 경로 설정 (logs/common/ 하위) 

823 self.operational_log_path = os.path.join(self.common_log_dir, f"{timestamp}_operational.log") 

824 self.debug_log_path = os.path.join(self.common_log_dir, f"{timestamp}_debug.log") 

825 

826 # 로거 인스턴스 생성 

827 self.operational_logger = self._setup_logger('operational_logger', self.operational_log_path, logging.INFO) 

828 self.debug_logger = self._setup_logger('debug_logger', self.debug_log_path, logging.DEBUG) 

829 

830 # 전략 로그 필터 생성 (debug.log 용량 관리용) 

831 strategy_filter = StrategyInfoFilter() 

832 

833 # 기존 로깅 핸들러 제거 및 urllib3 로거 레벨 설정 (중복 로깅 방지) 

834 for handler in logging.root.handlers[:]: 

835 logging.root.removeHandler(handler) 

836 

837 # 루트 로거에 통합 로그 핸들러 연결 (전략 로거 등 전파된 로그 수집) 

838 root_logger = logging.getLogger() 

839 root_logger.setLevel(logging.DEBUG) 

840 for h in self.debug_logger.handlers: 

841 h.addFilter(strategy_filter) # 전략 로그는 INFO 이상만 debug.log에 기록 

842 root_logger.addHandler(h) 

843 for h in self.operational_logger.handlers: 

844 root_logger.addHandler(h) 

845 

846 logging.getLogger('urllib3.connectionpool').setLevel(logging.WARNING) 

847 logging.getLogger('aiosqlite').setLevel(logging.WARNING) 

848 http.client.HTTPConnection.debuglevel = 0 # HTTP 통신 디버그 레벨 비활성화 

849 

850 def _cleanup_old_logs(self, days=30): 

851 """ 

852 로그 디렉토리를 순회하며 지정된 일수(days)보다 오래된 파일을 삭제합니다. 

853 """ 

854 now = time.time() 

855 cutoff = now - (days * 86400) 

856 

857 for root, _, files in os.walk(self.log_dir): 

858 for filename in files: 

859 # 로그 파일 확장자 또는 패턴 확인 

860 if ".log" in filename or ".json" in filename: 

861 file_path = os.path.join(root, filename) 

862 try: 

863 if os.path.getmtime(file_path) < cutoff: 

864 os.remove(file_path) 

865 except Exception: 

866 pass # 삭제 실패(권한 문제, 파일 잠김 등) 시 무시 

867 

868 def _setup_logger(self, name, log_file, level, mode='a'): 

869 """단일 로거를 설정합니다.""" 

870 logger = logging.getLogger(name) 

871 logger.setLevel(level) 

872 logger.propagate = False 

873 

874 if logger.handlers: 

875 return logger 

876 

877 file_handler = SizeTimeRotatingFileHandler( 

878 log_file, 

879 mode=mode, 

880 encoding='utf-8', 

881 maxBytes=LOG_MAX_BYTES, 

882 backupCount=LOG_BACKUP_COUNT 

883 ) 

884 file_handler.setLevel(level) 

885 file_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s')) 

886 logger.addHandler(file_handler) 

887 

888 return logger 

889 

890 def info(self, message): 

891 self.operational_logger.info(message, stacklevel=2) 

892 self.debug_logger.info(message, stacklevel=2) 

893 

894 def debug(self, message): 

895 self.debug_logger.debug(message, stacklevel=2) 

896 

897 def warning(self, message, exc_info=False): 

898 self.operational_logger.warning(message, exc_info=exc_info, stacklevel=2) 

899 self.debug_logger.warning(message, exc_info=exc_info, stacklevel=2) 

900 

901 def error(self, message, exc_info=False): 

902 self.operational_logger.error(message, exc_info=exc_info, stacklevel=2) 

903 self.debug_logger.error(message, exc_info=exc_info, stacklevel=2) 

904 

905 def critical(self, message, exc_info=False): 

906 self.operational_logger.critical(message, exc_info=exc_info, stacklevel=2) 

907 self.debug_logger.critical(message, exc_info=exc_info, stacklevel=2) 

908 

909 def exception(self, message): 

910 """ 

911 예외 정보를 포함하여 ERROR 레벨로 로그를 남깁니다. 

912 주로 except 블록 안에서 사용합니다. 

913 """ 

914 self.error(message, exc_info=True)