Coverage for repositories / stock_ohlcv_repository.py: 88%
227 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
# repositories/stock_ohlcv_repository.py
"""
Repository for daily OHLCV candles (``ohlcv`` table) and the per-date
all-stock snapshot table ``daily_prices``.

- OHLCV reads go through an LFU cache (``_LFUCache``, capacity 500) keyed by
  stock code, intended to keep frequently requested codes in memory.
- Cache entries carry a ``historical_complete`` flag that ``get_stock_data``
  consults when deciding whether a cached series can answer a request
  without another DB query.
- ``upsert_ohlcv`` drops the cache entries of the codes it wrote, so the next
  ``get_stock_data`` call for those codes reloads them from SQLite.
"""
8import os
9import sqlite3
10import time
11import logging
12import asyncio
13import aiosqlite
14from contextlib import asynccontextmanager
15from typing import Optional, List, Dict, Any, TYPE_CHECKING
17from repositories.cache import _LFUCache
if TYPE_CHECKING:
    # Needed only for the string annotation on StockOhlcvRepository.__init__;
    # the import is skipped at runtime.
    from core.logger import CacheEventLogger

# Capacity passed to the OHLCV _LFUCache (entries are keyed by stock code).
_OHLCV_CACHE_CAPACITY = 500
class StockOhlcvRepository:
    """Storage for daily OHLCV candles and ``daily_prices`` snapshots.

    ``ohlcv`` reads go through an in-memory LFU cache keyed by stock code;
    every other query hits SQLite directly. Writes share one lazily opened
    aiosqlite connection serialized by an ``asyncio.Lock``; reads share a
    second lazily opened connection.
    """

    # Per-record columns of ``daily_prices`` (``trade_date`` and
    # ``collected_at`` are filled in by ``upsert_daily_snapshot`` itself).
    # Keys a record omits are bound as NULL instead of failing the whole batch.
    _DAILY_SNAPSHOT_FIELDS = (
        "code", "name",
        "current_price", "open_price", "high_price", "low_price", "prev_close",
        "change_price", "change_sign", "change_rate",
        "volume", "trading_value", "market_cap",
        "per", "pbr", "eps",
        "w52_high", "w52_low",
        "market",
    )

    def __init__(
        self,
        db_path: Optional[str] = None,
        logger=None,
        cache_logger: "CacheEventLogger | None" = None,
    ):
        """Create the repository and ensure the SQLite schema exists.

        Args:
            db_path: SQLite file path; defaults to ``data/stocks.db``.
            logger: Logger to use; defaults to this module's logger.
            cache_logger: Optional sink for cache events (hit/miss/load/evict...).
        """
        self._logger = logger or logging.getLogger(__name__)
        self._cache_logger = cache_logger
        self._db_path = db_path or os.path.join("data", "stocks.db")
        # Both connections are opened lazily by the _get_*_connection helpers.
        self._write_lock = None
        self._write_conn: Optional[aiosqlite.Connection] = None
        self._read_conn: Optional[aiosqlite.Connection] = None
        self._read_conn_lock = asyncio.Lock()

        # OHLCV-only LFU cache (kept separate from the price cache).
        self._ohlcv_cache = _LFUCache(
            capacity=_OHLCV_CACHE_CAPACITY,
            on_evict=self._on_ohlcv_evicted,
        )

        # os.makedirs("") raises FileNotFoundError, so only create a parent
        # directory when the path has one (a bare "stocks.db" does not).
        db_dir = os.path.dirname(self._db_path)
        if db_dir:
            os.makedirs(db_dir, exist_ok=True)
        self._init_db_sync()

    def _on_ohlcv_evicted(self, code: str, freq: int, ohlcv_count: int) -> None:
        """Eviction callback of the OHLCV cache; forwards the event to the cache logger."""
        if self._cache_logger:
            self._cache_logger.log_ohlcv_evicted(
                code, freq, ohlcv_count, capacity=self._ohlcv_cache.capacity
            )

    def _init_db_sync(self) -> None:
        """Create the ``ohlcv`` and ``daily_prices`` tables and indexes if missing.

        Runs synchronously (called from ``__init__``) and switches the database
        to WAL journaling. Errors are logged, not raised.
        """
        try:
            conn = sqlite3.connect(self._db_path)
            try:
                # ``with conn`` only commits / rolls back — it does NOT close
                # the connection, hence the explicit close in ``finally``.
                with conn:
                    conn.execute("PRAGMA journal_mode=WAL")
                    conn.execute("PRAGMA synchronous=NORMAL")
                    conn.execute("""
                        CREATE TABLE IF NOT EXISTS ohlcv (
                            code TEXT NOT NULL,
                            date TEXT NOT NULL,
                            open INTEGER,
                            high INTEGER,
                            low INTEGER,
                            close INTEGER,
                            volume INTEGER,
                            PRIMARY KEY (code, date)
                        )
                    """)
                    conn.execute("CREATE INDEX IF NOT EXISTS idx_ohlcv_date ON ohlcv(date)")
                    conn.execute("""
                        CREATE TABLE IF NOT EXISTS daily_prices (
                            code TEXT NOT NULL,
                            trade_date TEXT NOT NULL,
                            name TEXT,
                            current_price INTEGER,
                            open_price INTEGER,
                            high_price INTEGER,
                            low_price INTEGER,
                            prev_close INTEGER,
                            change_price INTEGER,
                            change_sign TEXT,
                            change_rate TEXT,
                            volume INTEGER,
                            trading_value INTEGER,
                            market_cap INTEGER,
                            per REAL,
                            pbr REAL,
                            eps REAL,
                            w52_high INTEGER,
                            w52_low INTEGER,
                            market TEXT,
                            collected_at REAL,
                            PRIMARY KEY (code, trade_date)
                        )
                    """)
                    conn.execute(
                        "CREATE INDEX IF NOT EXISTS idx_daily_prices_trade_date "
                        "ON daily_prices(trade_date)"
                    )
            finally:
                conn.close()
        except Exception as e:
            self._logger.error(f"StockOhlcvRepository DB 초기화 실패: {e}")

    @asynccontextmanager
    async def _get_write_connection(self):
        """Yield the shared write connection inside a serialized transaction.

        Writers are serialized by an ``asyncio.Lock``. The work done in the
        ``async with`` body is committed on normal exit and rolled back on
        ANY exception — including ``asyncio.CancelledError`` (a
        ``BaseException``). Without that, a cancelled write would leave
        uncommitted rows on the shared connection for the next writer to commit.
        """
        if self._write_lock is None:
            self._write_lock = asyncio.Lock()
        async with self._write_lock:
            if self._write_conn is None:
                self._write_conn = await aiosqlite.connect(self._db_path)
                await self._write_conn.execute("PRAGMA synchronous=NORMAL")
            conn = self._write_conn
            try:
                yield conn
                await conn.commit()
            except BaseException:
                try:
                    await conn.rollback()
                except Exception as rollback_err:
                    # Don't let a failed rollback mask the original error.
                    self._logger.error(f"StockOhlcvRepository 쓰기 롤백 실패: {rollback_err}")
                raise

    @asynccontextmanager
    async def _get_read_connection(self):
        """Yield the shared read connection, opening it on first use.

        The connection uses ``aiosqlite.Row`` rows and is reused across calls.
        The ``None`` check is repeated under ``_read_conn_lock`` so concurrent
        first callers open only one connection.
        """
        if self._read_conn is None:
            async with self._read_conn_lock:
                if self._read_conn is None:
                    conn = await aiosqlite.connect(self._db_path)
                    conn.row_factory = aiosqlite.Row
                    self._read_conn = conn
        yield self._read_conn

    # ── OHLCV cache / DB ──────────────────────────────────────────────────────

    async def get_stock_data(self, code: str, ohlcv_limit: int = 600,
                             caller: str = "unknown") -> Optional[Dict]:
        """Return up to ``ohlcv_limit`` daily candles for ``code``, oldest first.

        The cached series answers the request when it already holds at least
        ``ohlcv_limit`` candles, or when it is marked ``historical_complete``
        (the DB returned fewer rows than were asked for when it was loaded,
        i.e. there is no older data — e.g. a newly listed stock). The second
        condition stops such codes from re-querying the DB on every call.
        Otherwise the candles are loaded from SQLite and cached.

        Returns:
            ``{"code", "ohlcv", "last_updated", "historical_complete"}``, or
            ``None`` when the DB has no rows for ``code`` or the query fails.
            A cache hit returns the whole cached series (plus ``ohlcv_today``
            when set), which may exceed ``ohlcv_limit``. The response's
            ``historical_complete`` is always ``True`` (unchanged contract);
            the precise flag lives on the cache entry.
        """
        # 1. LFU cache.
        cached = self._ohlcv_cache.get(code, count_stats=True, caller=caller, item_type="ohlcv")
        if cached:
            cached_historical = cached.get("ohlcv_historical", [])
            if cached.get("historical_complete") or len(cached_historical) >= ohlcv_limit:
                ohlcv_today = cached.get("ohlcv_today")
                # Shallow copy: the candle dicts stay shared with the cache and
                # are mutated in place by update_today_candle.
                ohlcv = cached_historical[:]
                if ohlcv_today:
                    ohlcv = ohlcv + [ohlcv_today]
                if self._cache_logger:
                    self._cache_logger.log_ohlcv_hit(
                        code, caller, len(ohlcv), has_today_candle=ohlcv_today is not None
                    )
                return {
                    "code": code,
                    "ohlcv": ohlcv,
                    "last_updated": cached["last_loaded"],
                    "historical_complete": True,
                }

        if self._cache_logger:
            self._cache_logger.log_ohlcv_miss(code, caller)

        # 2. SQLite.
        try:
            async with self._get_read_connection() as conn:
                async with conn.execute(
                    "SELECT date, open, high, low, close, volume FROM ohlcv "
                    "WHERE code = ? ORDER BY date DESC LIMIT ?",
                    (code, ohlcv_limit),
                ) as cursor:
                    ohlcv_rows = await cursor.fetchall()

            if not ohlcv_rows:
                return None

            # The query is newest-first; store and return oldest-first.
            historical = [dict(r) for r in reversed(ohlcv_rows)]
            entry = {
                "ohlcv_historical": historical,
                "ohlcv_today": None,
                # True only when the DB ran out of rows before reaching the
                # limit, i.e. this entry holds every candle the DB has for the
                # code. SQLite treats a negative LIMIT as "no limit".
                "historical_complete": ohlcv_limit < 0 or len(ohlcv_rows) < ohlcv_limit,
                "last_loaded": time.time(),
            }
            self._ohlcv_cache.put(code, entry)
            latest_date = historical[-1].get("date")
            if self._cache_logger:
                self._cache_logger.log_ohlcv_loaded(code, caller, len(historical), latest_date)
            return {
                "code": code,
                "ohlcv": historical[:],
                "last_updated": entry["last_loaded"],
                "historical_complete": True,
            }
        except Exception as e:
            self._logger.error(f"StockOhlcvRepository OHLCV 조회 실패 ({code}): {e}")
            return None

    def update_today_candle(self, code: str, current_price: float, volume: int = 0):
        """Apply a real-time tick to the cached candle of ``code``.

        Updates ``ohlcv_today`` when present, otherwise the last historical
        candle (existing behaviour). ``close`` becomes ``current_price``;
        ``high``/``low`` are widened to include it (a missing or NULL high/low
        is replaced by ``current_price``); ``volume`` is overwritten only when
        positive. No-op when ``code`` is not cached. Only the in-memory cache
        is modified; nothing is written to SQLite.
        """
        cached = self._ohlcv_cache.get(code, count_stats=False, item_type="update_tick")
        if not cached:
            return

        today_candle = cached.get("ohlcv_today")
        is_new_candle = today_candle is not None
        target = today_candle
        if target is None and cached.get("ohlcv_historical"):
            target = cached["ohlcv_historical"][-1]
        if target is None:
            return

        before_price = target.get("close")
        target["close"] = current_price
        if volume > 0:
            target["volume"] = volume
        # DB columns are nullable: comparing a number with None would raise
        # TypeError, so a NULL extreme is simply replaced by the tick price.
        high = target.get("high")
        if high is None or current_price > high:
            target["high"] = current_price
        low = target.get("low")
        if low is None or current_price < low:
            target["low"] = current_price

        if self._cache_logger and before_price != current_price:
            self._cache_logger.log_today_candle(
                code, before_price, current_price,
                target.get("high"), target.get("low"),
                is_new_candle,
            )

    async def upsert_ohlcv(self, records: List[Dict]):
        """Bulk upsert daily candles (any number of codes), then drop their cache entries.

        Each record must supply ``code, date, open, high, low, close, volume``.
        Failures are logged, not raised; if the write fails no cache entry is
        dropped.
        """
        if not records:
            return
        codes_to_invalidate = {r["code"] for r in records if "code" in r}
        try:
            async with self._get_write_connection() as conn:
                await conn.executemany(
                    """
                    INSERT OR REPLACE INTO ohlcv (
                        code, date, open, high, low, close, volume
                    ) VALUES (
                        :code, :date, :open, :high, :low, :close, :volume
                    )
                    """,
                    records,
                )
            # Invalidate only after the commit, so the next get_stock_data call
            # for these codes reloads the new rows from SQLite.
            for code in codes_to_invalidate:
                self._ohlcv_cache.delete(code)
                if self._cache_logger:
                    self._cache_logger.log_ohlcv_invalidated(code)
            if self._cache_logger:
                self._cache_logger.log_ohlcv_upsert(
                    record_count=len(records),
                    code_count=len(codes_to_invalidate),
                    invalidated_codes=list(codes_to_invalidate),
                )
        except Exception as e:
            self._logger.error(f"StockOhlcvRepository OHLCV upsert 실패: {e}")

    async def get_ohlcv_summary(self, code: str) -> Dict[str, Any]:
        """Return row count and date range of ``code`` in ``ohlcv`` without loading candles.

        Returns:
            ``{"count": int, "latest_date": str|None, "oldest_date": str|None}``;
            zeros/``None`` when there are no rows or the query fails.
        """
        try:
            async with self._get_read_connection() as conn:
                async with conn.execute(
                    "SELECT COUNT(*), MAX(date), MIN(date) FROM ohlcv WHERE code = ?",
                    (code,),
                ) as cursor:
                    row = await cursor.fetchone()
            if row and row[0]:
                return {"count": row[0], "latest_date": row[1], "oldest_date": row[2]}
        except Exception as e:
            self._logger.error(f"StockOhlcvRepository OHLCV 요약 조회 실패 ({code}): {e}")
        return {"count": 0, "latest_date": None, "oldest_date": None}

    async def get_ohlcv_max_trading_days(self) -> int:
        """Return the number of distinct dates stored in ``ohlcv`` (0 on error)."""
        try:
            async with self._get_read_connection() as conn:
                async with conn.execute("SELECT COUNT(DISTINCT date) FROM ohlcv") as cursor:
                    row = await cursor.fetchone()
            return row[0] if row and row[0] else 0
        except Exception as e:
            self._logger.error(f"StockOhlcvRepository 거래일 수 조회 실패: {e}")
            return 0

    # ── daily_prices (all-stock snapshot taken after market close) ────────────

    async def upsert_daily_snapshot(self, trade_date: str, records: List[Dict]):
        """Bulk upsert one day's price + fundamentals snapshot for many codes.

        ``trade_date`` and a shared ``collected_at`` timestamp are stamped on
        every record. Columns a record omits are stored as NULL rather than
        failing the whole batch. Failures are logged, not raised.
        """
        if not records:
            return

        now = time.time()
        try:
            blank_row = dict.fromkeys(self._DAILY_SNAPSHOT_FIELDS)
            rows = [
                {**blank_row, **r, "trade_date": trade_date, "collected_at": now}
                for r in records
            ]
            async with self._get_write_connection() as conn:
                await conn.executemany(
                    """
                    INSERT OR REPLACE INTO daily_prices (
                        code, trade_date, name,
                        current_price, open_price, high_price, low_price, prev_close,
                        change_price, change_sign, change_rate,
                        volume, trading_value, market_cap,
                        per, pbr, eps,
                        w52_high, w52_low,
                        market, collected_at
                    ) VALUES (
                        :code, :trade_date, :name,
                        :current_price, :open_price, :high_price, :low_price, :prev_close,
                        :change_price, :change_sign, :change_rate,
                        :volume, :trading_value, :market_cap,
                        :per, :pbr, :eps,
                        :w52_high, :w52_low,
                        :market, :collected_at
                    )
                    """,
                    rows,
                )
            self._logger.debug(
                f"StockOhlcvRepository: daily_prices {len(records)}건 upsert 완료 (date={trade_date})"
            )
        except Exception as e:
            self._logger.error(f"StockOhlcvRepository daily_prices upsert 실패: {e}")

    async def get_prices_by_date(self, trade_date: str) -> List[Dict]:
        """Return every ``daily_prices`` row of ``trade_date``, ordered by code ([] on error)."""
        try:
            async with self._get_read_connection() as conn:
                async with conn.execute(
                    "SELECT * FROM daily_prices WHERE trade_date = ? ORDER BY code",
                    (trade_date,),
                ) as cursor:
                    rows = await cursor.fetchall()
            return [dict(row) for row in rows]
        except Exception as e:
            self._logger.error(f"StockOhlcvRepository daily_prices 날짜별 조회 실패: {e}")
            return []

    async def get_price_history(self, code: str, days: int = 30) -> List[Dict]:
        """Return the latest ``days`` snapshots of ``code``, newest first ([] on error)."""
        try:
            async with self._get_read_connection() as conn:
                async with conn.execute(
                    "SELECT * FROM daily_prices WHERE code = ? "
                    "ORDER BY trade_date DESC LIMIT ?",
                    (code, days),
                ) as cursor:
                    rows = await cursor.fetchall()
            return [dict(row) for row in rows]
        except Exception as e:
            self._logger.error(f"StockOhlcvRepository daily_prices 이력 조회 실패: {e}")
            return []

    async def get_latest_trade_date(self) -> Optional[str]:
        """Return the most recent ``trade_date`` in ``daily_prices`` (None if empty or on error)."""
        try:
            async with self._get_read_connection() as conn:
                async with conn.execute("SELECT MAX(trade_date) FROM daily_prices") as cursor:
                    row = await cursor.fetchone()
            return row[0] if row and row[0] else None
        except Exception as e:
            self._logger.error(f"StockOhlcvRepository daily_prices 최근 거래일 조회 실패: {e}")
            return None

    async def get_count_by_date(self, trade_date: str) -> int:
        """Return how many codes have a snapshot stored for ``trade_date`` (0 on error)."""
        try:
            async with self._get_read_connection() as conn:
                async with conn.execute(
                    "SELECT COUNT(*) FROM daily_prices WHERE trade_date = ?",
                    (trade_date,),
                ) as cursor:
                    row = await cursor.fetchone()
            return row[0] if row else 0
        except Exception as e:
            self._logger.error(f"StockOhlcvRepository daily_prices 카운트 조회 실패: {e}")
            return 0

    async def cleanup_old_data(self, keep_days: int = 365):
        """Delete ``daily_prices`` rows older than ``keep_days`` days.

        The cutoff comes from local ``datetime.now()`` formatted as
        ``YYYYMMDD`` and is compared as a string, so this assumes
        ``trade_date`` is stored in that format. ``ohlcv`` is not touched.
        """
        from datetime import datetime, timedelta

        cutoff_date = (datetime.now() - timedelta(days=keep_days)).strftime("%Y%m%d")
        try:
            async with self._get_write_connection() as conn:
                async with conn.execute(
                    "DELETE FROM daily_prices WHERE trade_date < ?", (cutoff_date,)
                ) as cursor:
                    deleted = cursor.rowcount
            if deleted > 0:
                self._logger.info(
                    f"StockOhlcvRepository: {deleted}건 오래된 daily_prices 삭제 (기준: {cutoff_date})"
                )
        except Exception as e:
            self._logger.error(f"StockOhlcvRepository daily_prices 데이터 정리 실패: {e}")

    async def get_latest_daily_snapshot(self, code: str) -> Optional[dict]:
        """Return the newest ``daily_prices`` row of ``code`` reshaped like a current-price API reply.

        Returns:
            ``{"output": {...string fields...}, "_source": "daily_snapshot",
            "_trade_date": ...}``; ``None`` when there is no row or on error.
            Falsy column values map to ``"0"``/``""`` defaults.
        """
        try:
            async with self._get_read_connection() as conn:
                async with conn.execute(
                    "SELECT * FROM daily_prices WHERE code = ? ORDER BY trade_date DESC LIMIT 1",
                    (code,),
                ) as cursor:
                    row = await cursor.fetchone()
            if not row:
                return None
            r = dict(row)
            output = {
                "stck_prpr": str(r.get("current_price") or 0),
                "stck_oprc": str(r.get("open_price") or 0),
                "stck_hgpr": str(r.get("high_price") or 0),
                "stck_lwpr": str(r.get("low_price") or 0),
                "stck_sdpr": str(r.get("prev_close") or 0),
                "prdy_vrss": str(r.get("change_price") or 0),
                "prdy_vrss_sign": str(r.get("change_sign") or ""),
                "prdy_ctrt": str(r.get("change_rate") or "0"),
                "acml_vol": str(r.get("volume") or 0),
                "acml_tr_pbmn": str(r.get("trading_value") or 0),
                "hts_avls": str(r.get("market_cap") or 0),
                "per": str(r.get("per") or ""),
                "pbr": str(r.get("pbr") or ""),
                "eps": str(r.get("eps") or ""),
                "d250_hgpr": str(r.get("w52_high") or 0),
                "d250_lwpr": str(r.get("w52_low") or 0),
                "hts_kor_isnm": str(r.get("name") or ""),
                "stck_bsop_date": str(r.get("trade_date") or ""),
                "stck_shrn_iscd": str(r.get("code") or ""),
            }
            return {"output": output, "_source": "daily_snapshot", "_trade_date": r.get("trade_date")}
        except Exception as e:
            self._logger.error(f"StockOhlcvRepository daily_prices 스냅샷 조회 실패 ({code}): {e}")
            return None

    def get_cache_stats(self, expand: bool = False, latest_trading_date: Optional[str] = None) -> dict:
        """Return the OHLCV cache statistics (delegates to the LFU cache)."""
        return self._ohlcv_cache.get_stats(expand=expand, latest_trading_date=latest_trading_date)

    async def close(self):
        """Close the write and read connections, if open."""
        if self._write_conn:
            await self._write_conn.close()
            self._write_conn = None
        if self._read_conn:
            await self._read_conn.close()
            self._read_conn = None

    def __del__(self):
        # getattr: __init__ may not have completed, so attributes can be missing.
        if getattr(self, "_write_conn", None) or getattr(self, "_read_conn", None):
            logger = getattr(self, "_logger", None) or logging.getLogger(__name__)
            logger.warning("StockOhlcvRepository was not closed explicitly.")