Coverage for scheduler / strategy_scheduler_store.py: 93%
107 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
1# scheduler/strategy_scheduler_store.py
2"""SQLite 기반 스케줄러 영속화 스토어.
4signal_history (시그널 실행 이력) 와 scheduler_state (활성 전략/설정) 를
5단일 SQLite DB 파일에 ACID 트랜잭션으로 저장한다.
7레거시 파일이 존재하면 최초 1회 DB로 마이그레이션 후 .migrated 로 이름 변경.
8"""
9from __future__ import annotations
11import csv
12import json
13import logging
14import os
15import sqlite3
16import threading
17from typing import Optional
19SCHEDULER_DB_FILE = "data/StrategyScheduler/scheduler.db"
20_LEGACY_SIGNAL_CSV = "data/StrategyScheduler/signal_history.csv"
21_LEGACY_STATE_JSON = "data/StrategyScheduler/scheduler_state.json"
23_DDL_SIGNAL_HISTORY = """
24CREATE TABLE IF NOT EXISTS signal_history (
25 id INTEGER PRIMARY KEY AUTOINCREMENT,
26 strategy_name TEXT NOT NULL,
27 code TEXT NOT NULL,
28 name TEXT NOT NULL,
29 action TEXT NOT NULL,
30 price INTEGER NOT NULL,
31 qty INTEGER NOT NULL DEFAULT 1,
32 return_rate REAL,
33 reason TEXT,
34 timestamp TEXT NOT NULL,
35 api_success INTEGER NOT NULL DEFAULT 1
36);
37"""
39_DDL_SCHEDULER_STATE = """
40CREATE TABLE IF NOT EXISTS scheduler_state (
41 key TEXT PRIMARY KEY,
42 value TEXT NOT NULL
43);
44"""
47class StrategySchedulerStore:
48 """signal_history + scheduler_state 를 SQLite 에 영속화."""
50 def __init__(
51 self,
52 db_path: str = SCHEDULER_DB_FILE,
53 logger: Optional[logging.Logger] = None,
54 ):
55 self._db_path = db_path
56 self._logger = logger or logging.getLogger(__name__)
57 self._lock = threading.Lock()
58 os.makedirs(os.path.dirname(db_path) or ".", exist_ok=True)
59 self._conn = self._init_db()
60 self._migrate_legacy_files()
62 # ── 초기화 ──
64 def _init_db(self) -> sqlite3.Connection:
65 conn = sqlite3.connect(self._db_path, check_same_thread=False)
66 conn.execute("PRAGMA journal_mode=WAL")
67 conn.execute("PRAGMA synchronous=NORMAL")
68 conn.execute(_DDL_SIGNAL_HISTORY)
69 conn.execute(_DDL_SCHEDULER_STATE)
70 conn.commit()
71 return conn
73 def close(self) -> None:
74 with self._lock:
75 if self._conn:
76 self._conn.close()
77 self._conn = None
79 # ── Signal History ──
81 def append_signal(self, record) -> None:
82 """SignalRecord 1건을 DB에 삽입."""
83 with self._lock:
84 self._conn.execute(
85 """INSERT INTO signal_history
86 (strategy_name, code, name, action, price, qty,
87 return_rate, reason, timestamp, api_success)
88 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
89 (
90 record.strategy_name, record.code, record.name, record.action,
91 record.price, record.qty, record.return_rate, record.reason,
92 record.timestamp, 1 if record.api_success else 0,
93 ),
94 )
95 self._conn.commit()
97 def load_signal_history(self, limit: int = 200) -> list:
98 """최근 N건 시그널 이력을 오래된 순 dict list 로 반환."""
99 with self._lock:
100 cur = self._conn.execute(
101 """SELECT strategy_name, code, name, action, price, qty,
102 return_rate, reason, timestamp, api_success
103 FROM signal_history
104 ORDER BY id DESC LIMIT ?""",
105 (limit,),
106 )
107 rows = cur.fetchall()
108 return [
109 {
110 "strategy_name": r[0], "code": r[1], "name": r[2],
111 "action": r[3], "price": r[4], "qty": r[5],
112 "return_rate": r[6], "reason": r[7],
113 "timestamp": r[8], "api_success": bool(r[9]),
114 }
115 for r in reversed(rows)
116 ]
118 # ── Scheduler State ──
120 def save_state(self, state: dict) -> None:
121 with self._lock:
122 self._conn.execute(
123 "INSERT OR REPLACE INTO scheduler_state (key, value) VALUES ('state', ?)",
124 (json.dumps(state, ensure_ascii=False),),
125 )
126 self._conn.commit()
128 def load_state(self) -> Optional[dict]:
129 with self._lock:
130 cur = self._conn.execute(
131 "SELECT value FROM scheduler_state WHERE key = 'state'"
132 )
133 row = cur.fetchone()
134 if not row:
135 return None
136 try:
137 return json.loads(row[0])
138 except (json.JSONDecodeError, TypeError):
139 return None
141 def clear_state(self) -> None:
142 with self._lock:
143 self._conn.execute("DELETE FROM scheduler_state WHERE key = 'state'")
144 self._conn.commit()
146 # ── 레거시 파일 마이그레이션 ──
148 def _migrate_legacy_files(self) -> None:
149 self._migrate_csv()
150 self._migrate_json()
152 def _migrate_csv(self) -> None:
153 if not os.path.exists(_LEGACY_SIGNAL_CSV):
154 return
155 try:
156 with open(_LEGACY_SIGNAL_CSV, "r", encoding="utf-8") as f:
157 rows = list(csv.DictReader(f))
159 with self._lock:
160 cur = self._conn.execute("SELECT COUNT(*) FROM signal_history")
161 if cur.fetchone()[0] > 0:
162 os.rename(_LEGACY_SIGNAL_CSV, _LEGACY_SIGNAL_CSV + ".migrated")
163 return
165 for row in rows:
166 rv = row.get("return_rate")
167 try:
168 return_rate: Optional[float] = float(rv) if rv else None
169 except (ValueError, TypeError):
170 return_rate = None
171 try:
172 self._conn.execute(
173 """INSERT INTO signal_history
174 (strategy_name, code, name, action, price, qty,
175 return_rate, reason, timestamp, api_success)
176 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
177 (
178 row["strategy_name"], row["code"], row["name"], row["action"],
179 int(row["price"]), int(row.get("qty") or 1), return_rate,
180 row["reason"], row["timestamp"],
181 1 if row.get("api_success", "True") == "True" else 0,
182 ),
183 )
184 except Exception as e:
185 self._logger.warning(f"[Store] CSV 행 마이그레이션 실패: {e}")
186 self._conn.commit()
188 os.rename(_LEGACY_SIGNAL_CSV, _LEGACY_SIGNAL_CSV + ".migrated")
189 self._logger.info(f"[Store] signal_history.csv → DB 마이그레이션 완료 ({len(rows)}건)")
190 except Exception as e:
191 self._logger.error(f"[Store] CSV 마이그레이션 실패: {e}")
193 def _migrate_json(self) -> None:
194 if not os.path.exists(_LEGACY_STATE_JSON):
195 return
196 try:
197 with open(_LEGACY_STATE_JSON, "r", encoding="utf-8") as f:
198 state = json.load(f)
200 with self._lock:
201 cur = self._conn.execute("SELECT COUNT(*) FROM scheduler_state")
202 if cur.fetchone()[0] > 0: 202 ↛ 203line 202 didn't jump to line 203 because the condition on line 202 was never true
203 os.rename(_LEGACY_STATE_JSON, _LEGACY_STATE_JSON + ".migrated")
204 return
206 self._conn.execute(
207 "INSERT OR REPLACE INTO scheduler_state (key, value) VALUES ('state', ?)",
208 (json.dumps(state, ensure_ascii=False),),
209 )
210 self._conn.commit()
212 os.rename(_LEGACY_STATE_JSON, _LEGACY_STATE_JSON + ".migrated")
213 self._logger.info("[Store] scheduler_state.json → DB 마이그레이션 완료")
214 except Exception as e:
215 self._logger.error(f"[Store] JSON 상태 마이그레이션 실패: {e}")