Coverage for repositories / stock_repository.py: 90%
76 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
1# repositories/stock_repository.py
2"""
3StockRepository — 현재가(StockPriceRepository)와 OHLCV(StockOhlcvRepository)를 통합하는 Facade.
5기존 callers(MarketDataService, StreamingService 등)는 이 클래스만 참조하면 되며
6내부적으로는 두 Repository에 위임한다.
7"""
8import logging
9from typing import Optional, List, Dict, Any
11from repositories.cache import _LRUCache, _LFUCache # 하위호환 re-export 용
12from repositories.stock_price_repository import StockPriceRepository
13from repositories.stock_ohlcv_repository import StockOhlcvRepository
14from core.logger import get_cache_event_logger
17class StockRepository:
18 """개별 종목 데이터(현재가 + OHLCV) 통합 Facade."""
20 def __init__(self, db_path: str = None, logger=None):
21 self._logger = logger or logging.getLogger(__name__)
22 self._cache_logger = get_cache_event_logger()
23 self._price_repo = StockPriceRepository(logger=self._logger, cache_logger=self._cache_logger)
24 self._ohlcv_repo = StockOhlcvRepository(db_path=db_path, logger=self._logger, cache_logger=self._cache_logger)
26 # ── 하위호환 프로퍼티 (테스트 및 레거시 접근용) ─────────────────────────────
28 @property
29 def _db_path(self) -> str:
30 return self._ohlcv_repo._db_path
32 @property
33 def _conn(self):
34 return self._ohlcv_repo._write_conn
36 @_conn.setter
37 def _conn(self, value):
38 self._ohlcv_repo._write_conn = value
40 def _get_connection(self):
41 """쓰기 전용 DB 연결 컨텍스트 매니저 (테스트에서 직접 접근 시 사용)."""
42 return self._ohlcv_repo._get_write_connection()
44 # ── 현재가 캐시 ──────────────────────────────────────────────────────────────
46 def set_current_price(self, code: str, price_data: dict):
47 """현재가 API 응답 전체 데이터를 캐시에 저장합니다."""
48 self._price_repo.set_current_price(code, price_data)
50 def get_current_price(self, code: str, max_age_sec: float = 3.0,
51 count_stats: bool = True, caller: str = "unknown") -> Optional[dict]:
52 """캐시된 현재가 데이터를 반환합니다. TTL 만료 시 None 반환."""
53 return self._price_repo.get_current_price(
54 code, max_age_sec=max_age_sec, count_stats=count_stats, caller=caller
55 )
57 # ── OHLCV 캐시/DB ──────────────────────────────────────────────────────────
59 async def get_stock_data(self, code: str, ohlcv_limit: int = 600,
60 caller: str = "unknown") -> Optional[Dict]:
61 """메모리 캐시 또는 DB에서 OHLCV 데이터를 반환합니다."""
62 return await self._ohlcv_repo.get_stock_data(code, ohlcv_limit=ohlcv_limit, caller=caller)
64 async def upsert_ohlcv(self, records: List[Dict]):
65 """여러 종목의 일봉(OHLCV) 데이터를 일괄 upsert 후 해당 종목 캐시 무효화."""
66 await self._ohlcv_repo.upsert_ohlcv(records)
68 async def get_ohlcv_summary(self, code: str) -> Dict[str, Any]:
69 """DB에서 종목의 OHLCV 요약 정보를 반환합니다."""
70 return await self._ohlcv_repo.get_ohlcv_summary(code)
72 async def get_ohlcv_max_trading_days(self) -> int:
73 """DB에 저장된 고유 거래일 수를 반환합니다."""
74 return await self._ohlcv_repo.get_ohlcv_max_trading_days()
76 # ── 실시간 틱 통합 업데이트 ───────────────────────────────────────────────────
78 def update_realtime_data(self, code: str, current_price: float, volume: int = 0):
79 """
80 장 중에 수신된 WebSocket 틱 데이터를 메모리 캐시에 즉시 반영합니다.
81 - 현재가 캐시(price_repo) 갱신
82 - OHLCV 당일 캔들(ohlcv_repo) 갱신
83 """
84 self._price_repo.update_current_price(code, current_price, volume)
85 self._ohlcv_repo.update_today_candle(code, current_price, volume)
87 # ── daily_prices (장마감 후 전종목 스냅샷) ──────────────────────────────────
89 async def upsert_daily_snapshot(self, trade_date: str, records: List[Dict]):
90 """장마감 후 전체 종목 현재가+펀더멘털 스냅샷을 일괄 upsert."""
91 await self._ohlcv_repo.upsert_daily_snapshot(trade_date, records)
93 async def get_prices_by_date(self, trade_date: str) -> List[Dict]:
94 """특정 날짜의 전체 종목 스냅샷 조회."""
95 return await self._ohlcv_repo.get_prices_by_date(trade_date)
97 async def get_price_history(self, code: str, days: int = 30) -> List[Dict]:
98 """특정 종목의 최근 N일간 스냅샷 이력 조회."""
99 return await self._ohlcv_repo.get_price_history(code, days)
101 async def get_latest_trade_date(self) -> Optional[str]:
102 """daily_prices에 저장된 가장 최근 거래일 반환."""
103 return await self._ohlcv_repo.get_latest_trade_date()
105 async def get_count_by_date(self, trade_date: str) -> int:
106 """특정 날짜에 저장된 종목 수 반환."""
107 return await self._ohlcv_repo.get_count_by_date(trade_date)
109 async def cleanup_old_data(self, keep_days: int = 365):
110 """오래된 daily_prices 데이터 정리."""
111 await self._ohlcv_repo.cleanup_old_data(keep_days)
113 async def get_latest_daily_snapshot(self, code: str) -> Optional[dict]:
114 """daily_prices에서 최신 스냅샷을 현재가 API 응답 포맷으로 변환하여 반환합니다."""
115 return await self._ohlcv_repo.get_latest_daily_snapshot(code)
117 # ── 스트리밍 상태 ─────────────────────────────────────────────────────────────
119 def mark_streaming(self, code: str) -> None:
120 """해당 종목이 실시간 스트리밍 중임을 등록. TTL 우회 활성화."""
121 self._price_repo.mark_streaming(code)
123 def unmark_streaming(self, code: str) -> None:
124 """실시간 스트리밍 종료. TTL 우회 해제."""
125 self._price_repo.unmark_streaming(code)
127 def is_streaming(self, code: str) -> bool:
128 """해당 종목이 현재 스트리밍 중인지 여부."""
129 return self._price_repo.is_streaming(code)
131 # ── 통합 캐시 통계 ────────────────────────────────────────────────────────────
133 def get_cache_stats(self, expand: bool = False, latest_trading_date: str = None, log_stats: bool = False) -> dict:
134 """현재가 캐시 + OHLCV 캐시의 통합 통계를 반환합니다."""
135 price_stats = self._price_repo.get_cache_stats(expand=expand)
136 ohlcv_stats = self._ohlcv_repo.get_cache_stats(expand=expand, latest_trading_date=latest_trading_date)
137 if log_stats: 137 ↛ 138line 137 didn't jump to line 138 because the condition on line 137 was never true
138 self._cache_logger.log_stats(price_stats, ohlcv_stats)
140 total_hits = price_stats["hits"] + ohlcv_stats["hits"]
141 total_misses = price_stats["misses"] + ohlcv_stats["misses"]
142 total = total_hits + total_misses
143 hit_rate = (total_hits / total * 100) if total > 0 else 0.0
145 result = {
146 "hits": total_hits,
147 "misses": total_misses,
148 "hit_rate": round(hit_rate, 2),
149 "total_requests": total,
150 "current_size": price_stats["current_size"] + ohlcv_stats["current_size"],
151 "callers": price_stats.get("callers", {}),
152 "price_cache": price_stats,
153 "ohlcv_cache": ohlcv_stats,
154 }
155 if expand: 155 ↛ 157line 155 didn't jump to line 157 because the condition on line 155 was always true
156 result["items"] = price_stats.get("items", []) + ohlcv_stats.get("items", [])
157 return result
159 async def close(self):
160 """DB 연결을 닫습니다."""
161 await self._ohlcv_repo.close()
163 def __del__(self):
164 if self._ohlcv_repo._write_conn: 164 ↛ 165line 164 didn't jump to line 165 because the condition on line 164 was never true
165 self._logger.warning("StockRepository was not closed explicitly.")