Coverage for repositories / stock_repository.py: 90%

76 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-04 15:08 +0000

1# repositories/stock_repository.py 

2""" 

3StockRepository — 현재가(StockPriceRepository)와 OHLCV(StockOhlcvRepository)를 통합하는 Facade. 

4 

5기존 callers(MarketDataService, StreamingService 등)는 이 클래스만 참조하면 되며 

6내부적으로는 두 Repository에 위임한다. 

7""" 

8import logging 

9from typing import Optional, List, Dict, Any 

10 

11from repositories.cache import _LRUCache, _LFUCache # 하위호환 re-export 용 

12from repositories.stock_price_repository import StockPriceRepository 

13from repositories.stock_ohlcv_repository import StockOhlcvRepository 

14from core.logger import get_cache_event_logger 

15 

16 

17class StockRepository: 

18 """개별 종목 데이터(현재가 + OHLCV) 통합 Facade.""" 

19 

20 def __init__(self, db_path: str = None, logger=None): 

21 self._logger = logger or logging.getLogger(__name__) 

22 self._cache_logger = get_cache_event_logger() 

23 self._price_repo = StockPriceRepository(logger=self._logger, cache_logger=self._cache_logger) 

24 self._ohlcv_repo = StockOhlcvRepository(db_path=db_path, logger=self._logger, cache_logger=self._cache_logger) 

25 

26 # ── 하위호환 프로퍼티 (테스트 및 레거시 접근용) ───────────────────────────── 

27 

28 @property 

29 def _db_path(self) -> str: 

30 return self._ohlcv_repo._db_path 

31 

32 @property 

33 def _conn(self): 

34 return self._ohlcv_repo._write_conn 

35 

36 @_conn.setter 

37 def _conn(self, value): 

38 self._ohlcv_repo._write_conn = value 

39 

40 def _get_connection(self): 

41 """쓰기 전용 DB 연결 컨텍스트 매니저 (테스트에서 직접 접근 시 사용).""" 

42 return self._ohlcv_repo._get_write_connection() 

43 

44 # ── 현재가 캐시 ────────────────────────────────────────────────────────────── 

45 

46 def set_current_price(self, code: str, price_data: dict): 

47 """현재가 API 응답 전체 데이터를 캐시에 저장합니다.""" 

48 self._price_repo.set_current_price(code, price_data) 

49 

50 def get_current_price(self, code: str, max_age_sec: float = 3.0, 

51 count_stats: bool = True, caller: str = "unknown") -> Optional[dict]: 

52 """캐시된 현재가 데이터를 반환합니다. TTL 만료 시 None 반환.""" 

53 return self._price_repo.get_current_price( 

54 code, max_age_sec=max_age_sec, count_stats=count_stats, caller=caller 

55 ) 

56 

57 # ── OHLCV 캐시/DB ────────────────────────────────────────────────────────── 

58 

59 async def get_stock_data(self, code: str, ohlcv_limit: int = 600, 

60 caller: str = "unknown") -> Optional[Dict]: 

61 """메모리 캐시 또는 DB에서 OHLCV 데이터를 반환합니다.""" 

62 return await self._ohlcv_repo.get_stock_data(code, ohlcv_limit=ohlcv_limit, caller=caller) 

63 

64 async def upsert_ohlcv(self, records: List[Dict]): 

65 """여러 종목의 일봉(OHLCV) 데이터를 일괄 upsert 후 해당 종목 캐시 무효화.""" 

66 await self._ohlcv_repo.upsert_ohlcv(records) 

67 

68 async def get_ohlcv_summary(self, code: str) -> Dict[str, Any]: 

69 """DB에서 종목의 OHLCV 요약 정보를 반환합니다.""" 

70 return await self._ohlcv_repo.get_ohlcv_summary(code) 

71 

72 async def get_ohlcv_max_trading_days(self) -> int: 

73 """DB에 저장된 고유 거래일 수를 반환합니다.""" 

74 return await self._ohlcv_repo.get_ohlcv_max_trading_days() 

75 

76 # ── 실시간 틱 통합 업데이트 ─────────────────────────────────────────────────── 

77 

78 def update_realtime_data(self, code: str, current_price: float, volume: int = 0): 

79 """ 

80 장 중에 수신된 WebSocket 틱 데이터를 메모리 캐시에 즉시 반영합니다. 

81 - 현재가 캐시(price_repo) 갱신 

82 - OHLCV 당일 캔들(ohlcv_repo) 갱신 

83 """ 

84 self._price_repo.update_current_price(code, current_price, volume) 

85 self._ohlcv_repo.update_today_candle(code, current_price, volume) 

86 

87 # ── daily_prices (장마감 후 전종목 스냅샷) ────────────────────────────────── 

88 

89 async def upsert_daily_snapshot(self, trade_date: str, records: List[Dict]): 

90 """장마감 후 전체 종목 현재가+펀더멘털 스냅샷을 일괄 upsert.""" 

91 await self._ohlcv_repo.upsert_daily_snapshot(trade_date, records) 

92 

93 async def get_prices_by_date(self, trade_date: str) -> List[Dict]: 

94 """특정 날짜의 전체 종목 스냅샷 조회.""" 

95 return await self._ohlcv_repo.get_prices_by_date(trade_date) 

96 

97 async def get_price_history(self, code: str, days: int = 30) -> List[Dict]: 

98 """특정 종목의 최근 N일간 스냅샷 이력 조회.""" 

99 return await self._ohlcv_repo.get_price_history(code, days) 

100 

101 async def get_latest_trade_date(self) -> Optional[str]: 

102 """daily_prices에 저장된 가장 최근 거래일 반환.""" 

103 return await self._ohlcv_repo.get_latest_trade_date() 

104 

105 async def get_count_by_date(self, trade_date: str) -> int: 

106 """특정 날짜에 저장된 종목 수 반환.""" 

107 return await self._ohlcv_repo.get_count_by_date(trade_date) 

108 

109 async def cleanup_old_data(self, keep_days: int = 365): 

110 """오래된 daily_prices 데이터 정리.""" 

111 await self._ohlcv_repo.cleanup_old_data(keep_days) 

112 

113 async def get_latest_daily_snapshot(self, code: str) -> Optional[dict]: 

114 """daily_prices에서 최신 스냅샷을 현재가 API 응답 포맷으로 변환하여 반환합니다.""" 

115 return await self._ohlcv_repo.get_latest_daily_snapshot(code) 

116 

117 # ── 스트리밍 상태 ───────────────────────────────────────────────────────────── 

118 

119 def mark_streaming(self, code: str) -> None: 

120 """해당 종목이 실시간 스트리밍 중임을 등록. TTL 우회 활성화.""" 

121 self._price_repo.mark_streaming(code) 

122 

123 def unmark_streaming(self, code: str) -> None: 

124 """실시간 스트리밍 종료. TTL 우회 해제.""" 

125 self._price_repo.unmark_streaming(code) 

126 

127 def is_streaming(self, code: str) -> bool: 

128 """해당 종목이 현재 스트리밍 중인지 여부.""" 

129 return self._price_repo.is_streaming(code) 

130 

131 # ── 통합 캐시 통계 ──────────────────────────────────────────────────────────── 

132 

133 def get_cache_stats(self, expand: bool = False, latest_trading_date: str = None, log_stats: bool = False) -> dict: 

134 """현재가 캐시 + OHLCV 캐시의 통합 통계를 반환합니다.""" 

135 price_stats = self._price_repo.get_cache_stats(expand=expand) 

136 ohlcv_stats = self._ohlcv_repo.get_cache_stats(expand=expand, latest_trading_date=latest_trading_date) 

137 if log_stats: 137 ↛ 138line 137 didn't jump to line 138 because the condition on line 137 was never true

138 self._cache_logger.log_stats(price_stats, ohlcv_stats) 

139 

140 total_hits = price_stats["hits"] + ohlcv_stats["hits"] 

141 total_misses = price_stats["misses"] + ohlcv_stats["misses"] 

142 total = total_hits + total_misses 

143 hit_rate = (total_hits / total * 100) if total > 0 else 0.0 

144 

145 result = { 

146 "hits": total_hits, 

147 "misses": total_misses, 

148 "hit_rate": round(hit_rate, 2), 

149 "total_requests": total, 

150 "current_size": price_stats["current_size"] + ohlcv_stats["current_size"], 

151 "callers": price_stats.get("callers", {}), 

152 "price_cache": price_stats, 

153 "ohlcv_cache": ohlcv_stats, 

154 } 

155 if expand: 155 ↛ 157line 155 didn't jump to line 157 because the condition on line 155 was always true

156 result["items"] = price_stats.get("items", []) + ohlcv_stats.get("items", []) 

157 return result 

158 

159 async def close(self): 

160 """DB 연결을 닫습니다.""" 

161 await self._ohlcv_repo.close() 

162 

163 def __del__(self): 

164 if self._ohlcv_repo._write_conn: 164 ↛ 165line 164 didn't jump to line 165 because the condition on line 164 was never true

165 self._logger.warning("StockRepository was not closed explicitly.")