Coverage for repositories / stock_ohlcv_repository.py: 88%

227 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-04 15:08 +0000

1# repositories/stock_ohlcv_repository.py 

2""" 

3OHLCV 일봉 데이터 및 daily_prices 스냅샷을 관리하는 Repository. 

4- LFU 캐시(용량 500): 자주 분석되는 종목이 캐시에 오래 남음 

5- historical_complete 플래그: DB가 줄 수 있는 전체를 받은 경우 표시 → 불필요한 DB 재조회 방지 

6- upsert_ohlcv 호출 시 해당 종목의 캐시 무효화 → 항상 신선한 DB 데이터 보장 

7""" 

8import os 

9import sqlite3 

10import time 

11import logging 

12import asyncio 

13import aiosqlite 

14from contextlib import asynccontextmanager 

15from typing import Optional, List, Dict, Any, TYPE_CHECKING 

16 

17from repositories.cache import _LFUCache 

18 

19if TYPE_CHECKING: 19 ↛ 20line 19 didn't jump to line 20 because the condition on line 19 was never true

20 from core.logger import CacheEventLogger 

21 

# Capacity handed to the per-symbol OHLCV `_LFUCache` created in
# StockOhlcvRepository.__init__ (one cache entry per stock code).
_OHLCV_CACHE_CAPACITY = 500

23 

24 

class StockOhlcvRepository:
    """Repository for daily OHLCV candles and ``daily_prices`` snapshots.

    Storage is a SQLite database accessed through aiosqlite, fronted by an
    in-memory LFU cache keyed by stock code. One shared write connection
    (serialised by an asyncio lock) and one shared read connection are opened
    lazily and must be released with :meth:`close`.
    """

    def __init__(self, db_path: Optional[str] = None, logger=None,
                 cache_logger: "CacheEventLogger | None" = None):
        """Create the repository and make sure the schema exists.

        Args:
            db_path: SQLite file path; defaults to ``data/stocks.db``.
            logger: Logger for errors; defaults to this module's logger.
            cache_logger: Optional sink for cache events (hit/miss/evict/...).
        """
        self._logger = logger or logging.getLogger(__name__)
        self._cache_logger = cache_logger
        self._db_path = db_path or os.path.join("data", "stocks.db")
        # The write lock is created lazily on first use inside a coroutine.
        self._write_lock = None
        self._write_conn: Optional[aiosqlite.Connection] = None
        self._read_conn: Optional[aiosqlite.Connection] = None
        self._read_conn_lock = asyncio.Lock()

        # Dedicated LFU cache for OHLCV data, kept separate from any price cache.
        self._ohlcv_cache = _LFUCache(
            capacity=_OHLCV_CACHE_CAPACITY,
            on_evict=self._on_ohlcv_evicted,
        )

        # A bare file name has an empty dirname; os.makedirs("") would raise.
        db_dir = os.path.dirname(self._db_path)
        if db_dir:
            os.makedirs(db_dir, exist_ok=True)
        self._init_db_sync()

    def _on_ohlcv_evicted(self, code: str, freq: int, ohlcv_count: int) -> None:
        """Forward a cache eviction to the cache event logger, if configured."""
        if self._cache_logger:
            self._cache_logger.log_ohlcv_evicted(
                code, freq, ohlcv_count, capacity=self._ohlcv_cache.capacity
            )

    def _init_db_sync(self):
        """Create tables and indexes synchronously; failures are logged, not raised."""
        try:
            conn = sqlite3.connect(self._db_path)
            try:
                # `with conn` commits or rolls back but does NOT close the
                # connection, hence the explicit close() in `finally`.
                with conn:
                    conn.execute("PRAGMA journal_mode=WAL")
                    conn.execute("PRAGMA synchronous=NORMAL")
                    conn.execute("""
                        CREATE TABLE IF NOT EXISTS ohlcv (
                            code    TEXT NOT NULL,
                            date    TEXT NOT NULL,
                            open    INTEGER,
                            high    INTEGER,
                            low     INTEGER,
                            close   INTEGER,
                            volume  INTEGER,
                            PRIMARY KEY (code, date)
                        )
                    """)
                    conn.execute("CREATE INDEX IF NOT EXISTS idx_ohlcv_date ON ohlcv(date)")
                    conn.execute("""
                        CREATE TABLE IF NOT EXISTS daily_prices (
                            code            TEXT NOT NULL,
                            trade_date      TEXT NOT NULL,
                            name            TEXT,
                            current_price   INTEGER,
                            open_price      INTEGER,
                            high_price      INTEGER,
                            low_price       INTEGER,
                            prev_close      INTEGER,
                            change_price    INTEGER,
                            change_sign     TEXT,
                            change_rate     TEXT,
                            volume          INTEGER,
                            trading_value   INTEGER,
                            market_cap      INTEGER,
                            per             REAL,
                            pbr             REAL,
                            eps             REAL,
                            w52_high        INTEGER,
                            w52_low         INTEGER,
                            market          TEXT,
                            collected_at    REAL,
                            PRIMARY KEY (code, trade_date)
                        )
                    """)
                    conn.execute(
                        "CREATE INDEX IF NOT EXISTS idx_daily_prices_trade_date "
                        "ON daily_prices(trade_date)"
                    )
            finally:
                conn.close()
        except Exception as e:
            self._logger.error(f"StockOhlcvRepository DB 초기화 실패: {e}")

    @asynccontextmanager
    async def _get_write_connection(self):
        """Yield the shared write connection under the write lock.

        Commits when the body finishes normally; rolls back and re-raises when
        the body fails or the task is cancelled.
        """
        if self._write_lock is None:
            self._write_lock = asyncio.Lock()
        async with self._write_lock:
            if self._write_conn is None:
                self._write_conn = await aiosqlite.connect(self._db_path)
                await self._write_conn.execute("PRAGMA synchronous=NORMAL")
            try:
                yield self._write_conn
                await self._write_conn.commit()
            except (Exception, asyncio.CancelledError):
                # CancelledError is a BaseException on Python 3.8+. Without
                # catching it, a cancelled writer would leave its statements
                # pending on the shared connection for the next writer to commit.
                await self._write_conn.rollback()
                raise

    @asynccontextmanager
    async def _get_read_connection(self):
        """Yield the shared read connection, opening it once on first use.

        The connection is checked before and after acquiring the lock so that
        concurrent first callers open only one connection.
        """
        if self._read_conn is None:
            async with self._read_conn_lock:
                if self._read_conn is None:
                    conn = await aiosqlite.connect(self._db_path)
                    conn.row_factory = aiosqlite.Row
                    self._read_conn = conn
        yield self._read_conn

    # -- OHLCV cache / DB ------------------------------------------------------

    async def get_stock_data(self, code: str, ohlcv_limit: int = 600,
                             caller: str = "unknown") -> Optional[Dict]:
        """Return OHLCV candles for ``code`` from the cache or the local DB.

        A cached entry is served when it holds at least ``ohlcv_limit`` candles,
        or when its last DB load returned fewer rows than requested (the DB has
        nothing more to give, e.g. a newly listed symbol). Otherwise the latest
        ``ohlcv_limit`` rows are reloaded from the DB and cached.

        Returns:
            ``{"code", "ohlcv", "last_updated", "historical_complete"}`` with
            candles in ascending date order, or ``None`` when the DB has no rows
            for ``code`` or the query fails (the failure is logged).
        """
        # 1. LFU cache lookup.
        cached = self._ohlcv_cache.get(code, count_stats=True, caller=caller, item_type="ohlcv")
        if cached and cached.get("historical_complete"):
            cached_count = len(cached.get("ohlcv_historical", []))
            # Example: 90 rows cached for limit=90, then limit=600 is requested
            # -> reload. 50 rows cached for limit=600 -> DB exhausted, serve the
            # cache instead of re-querying on every call.
            if cached_count >= ohlcv_limit or cached.get("db_exhausted"):
                ohlcv_today = cached.get("ohlcv_today")
                ohlcv = cached["ohlcv_historical"][:]
                if ohlcv_today:
                    ohlcv = ohlcv + [ohlcv_today]
                if self._cache_logger:
                    self._cache_logger.log_ohlcv_hit(
                        code, caller, len(ohlcv), has_today_candle=ohlcv_today is not None
                    )
                return {
                    "code": code,
                    "ohlcv": ohlcv,
                    "last_updated": cached["last_loaded"],
                    "historical_complete": True,
                }

        if self._cache_logger:
            self._cache_logger.log_ohlcv_miss(code, caller)

        # 2. Load from the DB.
        try:
            async with self._get_read_connection() as conn:
                async with conn.execute(
                    "SELECT date, open, high, low, close, volume FROM ohlcv "
                    "WHERE code = ? ORDER BY date DESC LIMIT ?",
                    (code, ohlcv_limit)
                ) as cursor:
                    ohlcv_rows = await cursor.fetchall()

            if not ohlcv_rows:
                return None

            # Rows arrive newest-first; callers get them oldest-first.
            historical = [dict(r) for r in reversed(ohlcv_rows)]
            entry = {
                "ohlcv_historical": historical,
                "ohlcv_today": None,
                "historical_complete": True,
                # True when the DB returned fewer rows than requested, i.e. a
                # larger limit could not produce more data.
                "db_exhausted": len(historical) < ohlcv_limit,
                "last_loaded": time.time(),
            }
            self._ohlcv_cache.put(code, entry)
            latest_date = historical[-1].get("date") if historical else None
            if self._cache_logger:
                self._cache_logger.log_ohlcv_loaded(code, caller, len(historical), latest_date)
            return {
                "code": code,
                "ohlcv": historical[:],
                "last_updated": entry["last_loaded"],
                "historical_complete": True,
            }
        except Exception as e:
            self._logger.error(f"StockOhlcvRepository OHLCV 조회 실패 ({code}): {e}")
            return None

    def update_today_candle(self, code: str, current_price: float, volume: int = 0):
        """Apply a live tick to the cached candle for ``code``.

        Updates ``ohlcv_today`` when present, otherwise the last historical
        candle. Does nothing when ``code`` is not cached. ``close`` is always
        overwritten, ``volume`` only when positive, and ``high``/``low`` are
        widened (or seeded when missing/NULL) to include ``current_price``.
        """
        cached = self._ohlcv_cache.get(code, count_stats=False, item_type="update_tick")
        if not cached:
            return

        today = cached.get("ohlcv_today")
        is_new_candle = today is not None
        target = today
        if target is None and cached.get("ohlcv_historical"):
            target = cached["ohlcv_historical"][-1]
        if target is None:
            return

        before_price = target.get("close")
        target["close"] = current_price
        if volume > 0:
            target["volume"] = volume

        # DB columns are nullable, so high/low may be None; comparing a number
        # with None would raise TypeError.
        high = target.get("high")
        if high is None or current_price > high:
            target["high"] = current_price
        low = target.get("low")
        if low is None or current_price < low:
            target["low"] = current_price

        if self._cache_logger and before_price != current_price:
            self._cache_logger.log_today_candle(
                code, before_price, current_price,
                target.get("high"), target.get("low"),
                is_new_candle,
            )

    async def upsert_ohlcv(self, records: List[Dict]):
        """Bulk upsert daily candles, then invalidate the affected cache entries.

        Each record must provide ``code, date, open, high, low, close, volume``.
        Failures are logged and swallowed; the cache is left untouched then.
        """
        if not records:
            return
        codes_to_invalidate = {r["code"] for r in records if "code" in r}
        try:
            async with self._get_write_connection() as conn:
                await conn.executemany(
                    """
                    INSERT OR REPLACE INTO ohlcv (
                        code, date, open, high, low, close, volume
                    ) VALUES (
                        :code, :date, :open, :high, :low, :close, :volume
                    )
                    """,
                    records,
                )
            # Invalidate only after the commit succeeded, so the next
            # get_stock_data call reloads the fresh rows.
            for code in codes_to_invalidate:
                self._ohlcv_cache.delete(code)
                if self._cache_logger:
                    self._cache_logger.log_ohlcv_invalidated(code)
            if self._cache_logger:
                self._cache_logger.log_ohlcv_upsert(
                    record_count=len(records),
                    code_count=len(codes_to_invalidate),
                    invalidated_codes=list(codes_to_invalidate),
                )
        except Exception as e:
            self._logger.error(f"StockOhlcvRepository OHLCV upsert 실패: {e}")

    async def get_ohlcv_summary(self, code: str) -> Dict[str, Any]:
        """Return row count and date range for ``code`` without loading candles.

        Returns:
            ``{"count": int, "latest_date": str | None, "oldest_date": str | None}``;
            zero/None values when there are no rows or the query fails.
        """
        try:
            async with self._get_read_connection() as conn:
                async with conn.execute(
                    "SELECT COUNT(*), MAX(date), MIN(date) FROM ohlcv WHERE code = ?",
                    (code,),
                ) as cursor:
                    row = await cursor.fetchone()
                    if row and row[0]:
                        return {"count": row[0], "latest_date": row[1], "oldest_date": row[2]}
        except Exception as e:
            self._logger.error(f"StockOhlcvRepository OHLCV 요약 조회 실패 ({code}): {e}")
        return {"count": 0, "latest_date": None, "oldest_date": None}

    async def get_ohlcv_max_trading_days(self) -> int:
        """Return the number of distinct dates in ``ohlcv`` (0 on error)."""
        try:
            async with self._get_read_connection() as conn:
                async with conn.execute("SELECT COUNT(DISTINCT date) FROM ohlcv") as cursor:
                    row = await cursor.fetchone()
                    return row[0] if row and row[0] else 0
        except Exception as e:
            self._logger.error(f"StockOhlcvRepository 거래일 수 조회 실패: {e}")
            return 0

    # -- daily_prices (all-symbol snapshot) ------------------------------------

    async def upsert_daily_snapshot(self, trade_date: str, records: List[Dict]):
        """Bulk upsert price/fundamental snapshot rows for ``trade_date``.

        ``trade_date`` and a shared ``collected_at`` timestamp are merged into
        every record, overriding same-named keys. Failures are logged and
        swallowed.
        """
        if not records:
            return

        now = time.time()
        try:
            async with self._get_write_connection() as conn:
                await conn.executemany(
                    """
                    INSERT OR REPLACE INTO daily_prices (
                        code, trade_date, name,
                        current_price, open_price, high_price, low_price, prev_close,
                        change_price, change_sign, change_rate,
                        volume, trading_value, market_cap,
                        per, pbr, eps,
                        w52_high, w52_low,
                        market, collected_at
                    ) VALUES (
                        :code, :trade_date, :name,
                        :current_price, :open_price, :high_price, :low_price, :prev_close,
                        :change_price, :change_sign, :change_rate,
                        :volume, :trading_value, :market_cap,
                        :per, :pbr, :eps,
                        :w52_high, :w52_low,
                        :market, :collected_at
                    )
                    """,
                    [{**r, "trade_date": trade_date, "collected_at": now} for r in records],
                )
            self._logger.debug(
                f"StockOhlcvRepository: daily_prices {len(records)}건 upsert 완료 (date={trade_date})"
            )
        except Exception as e:
            self._logger.error(f"StockOhlcvRepository daily_prices upsert 실패: {e}")

    async def get_prices_by_date(self, trade_date: str) -> List[Dict]:
        """Return all snapshot rows for ``trade_date`` ordered by code ([] on error)."""
        try:
            async with self._get_read_connection() as conn:
                async with conn.execute(
                    "SELECT * FROM daily_prices WHERE trade_date = ? ORDER BY code",
                    (trade_date,),
                ) as cursor:
                    rows = await cursor.fetchall()
                    return [dict(row) for row in rows]
        except Exception as e:
            self._logger.error(f"StockOhlcvRepository daily_prices 날짜별 조회 실패: {e}")
            return []

    async def get_price_history(self, code: str, days: int = 30) -> List[Dict]:
        """Return up to ``days`` snapshot rows for ``code``, newest first ([] on error)."""
        try:
            async with self._get_read_connection() as conn:
                async with conn.execute(
                    "SELECT * FROM daily_prices WHERE code = ? "
                    "ORDER BY trade_date DESC LIMIT ?",
                    (code, days),
                ) as cursor:
                    rows = await cursor.fetchall()
                    return [dict(row) for row in rows]
        except Exception as e:
            self._logger.error(f"StockOhlcvRepository daily_prices 이력 조회 실패: {e}")
            return []

    async def get_latest_trade_date(self) -> Optional[str]:
        """Return the greatest ``trade_date`` in ``daily_prices`` (None if empty or on error)."""
        try:
            async with self._get_read_connection() as conn:
                async with conn.execute("SELECT MAX(trade_date) FROM daily_prices") as cursor:
                    row = await cursor.fetchone()
                    return row[0] if row and row[0] else None
        except Exception as e:
            self._logger.error(f"StockOhlcvRepository daily_prices 최근 거래일 조회 실패: {e}")
            return None

    async def get_count_by_date(self, trade_date: str) -> int:
        """Return how many snapshot rows exist for ``trade_date`` (0 on error)."""
        try:
            async with self._get_read_connection() as conn:
                async with conn.execute(
                    "SELECT COUNT(*) FROM daily_prices WHERE trade_date = ?",
                    (trade_date,),
                ) as cursor:
                    row = await cursor.fetchone()
                    return row[0] if row else 0
        except Exception as e:
            self._logger.error(f"StockOhlcvRepository daily_prices 카운트 조회 실패: {e}")
            return 0

    async def cleanup_old_data(self, keep_days: int = 365):
        """Delete ``daily_prices`` rows older than ``keep_days`` days.

        The cutoff is formatted as ``%Y%m%d`` from the local clock and compared
        as text. NOTE(review): assumes ``trade_date`` is stored as YYYYMMDD;
        confirm against the writers.
        """
        from datetime import datetime, timedelta

        cutoff_date = (datetime.now() - timedelta(days=keep_days)).strftime("%Y%m%d")
        try:
            async with self._get_write_connection() as conn:
                async with conn.execute(
                    "DELETE FROM daily_prices WHERE trade_date < ?", (cutoff_date,)
                ) as cursor:
                    deleted = cursor.rowcount
            if deleted > 0:
                self._logger.info(
                    f"StockOhlcvRepository: {deleted}건 오래된 daily_prices 삭제 (기준: {cutoff_date})"
                )
        except Exception as e:
            self._logger.error(f"StockOhlcvRepository daily_prices 데이터 정리 실패: {e}")

    async def get_latest_daily_snapshot(self, code: str) -> Optional[dict]:
        """Return the newest snapshot for ``code`` reshaped as a quote response.

        Every output value is a string; NULL or zero numeric columns become
        ``"0"`` (or ``""`` for per/pbr/eps and text columns).

        Returns:
            ``{"output": {...}, "_source": "daily_snapshot", "_trade_date": ...}``
            or ``None`` when no row exists or the query fails.
        """
        try:
            async with self._get_read_connection() as conn:
                async with conn.execute(
                    "SELECT * FROM daily_prices WHERE code = ? ORDER BY trade_date DESC LIMIT 1",
                    (code,),
                ) as cursor:
                    row = await cursor.fetchone()
                    if not row:
                        return None
                    r = dict(row)
                    output = {
                        "stck_prpr": str(r.get("current_price") or 0),
                        "stck_oprc": str(r.get("open_price") or 0),
                        "stck_hgpr": str(r.get("high_price") or 0),
                        "stck_lwpr": str(r.get("low_price") or 0),
                        "stck_sdpr": str(r.get("prev_close") or 0),
                        "prdy_vrss": str(r.get("change_price") or 0),
                        "prdy_vrss_sign": str(r.get("change_sign") or ""),
                        "prdy_ctrt": str(r.get("change_rate") or "0"),
                        "acml_vol": str(r.get("volume") or 0),
                        "acml_tr_pbmn": str(r.get("trading_value") or 0),
                        "hts_avls": str(r.get("market_cap") or 0),
                        "per": str(r.get("per") or ""),
                        "pbr": str(r.get("pbr") or ""),
                        "eps": str(r.get("eps") or ""),
                        "d250_hgpr": str(r.get("w52_high") or 0),
                        "d250_lwpr": str(r.get("w52_low") or 0),
                        "hts_kor_isnm": str(r.get("name") or ""),
                        "stck_bsop_date": str(r.get("trade_date") or ""),
                        "stck_shrn_iscd": str(r.get("code") or ""),
                    }
                    return {"output": output, "_source": "daily_snapshot", "_trade_date": r.get("trade_date")}
        except Exception as e:
            self._logger.error(f"StockOhlcvRepository daily_prices 스냅샷 조회 실패 ({code}): {e}")
            return None

    def get_cache_stats(self, expand: bool = False, latest_trading_date: str = None) -> dict:
        """Return the OHLCV cache statistics reported by the LFU cache."""
        return self._ohlcv_cache.get_stats(expand=expand, latest_trading_date=latest_trading_date)

    async def close(self):
        """Close the write and read connections, if open."""
        # Detach both handles first so the read connection is still closed
        # when closing the write connection raises.
        write_conn, self._write_conn = self._write_conn, None
        read_conn, self._read_conn = self._read_conn, None
        try:
            if write_conn is not None:
                await write_conn.close()
        finally:
            if read_conn is not None:
                await read_conn.close()

    def __del__(self):
        # getattr keeps finalisation quiet for instances whose __init__ never
        # ran to completion.
        if getattr(self, "_write_conn", None) or getattr(self, "_read_conn", None):
            logger = getattr(self, "_logger", None)
            if logger is not None:
                logger.warning("StockOhlcvRepository was not closed explicitly.")