Coverage for scheduler / strategy_scheduler_store.py: 93%

107 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-04 15:08 +0000

1# scheduler/strategy_scheduler_store.py 

2"""SQLite 기반 스케줄러 영속화 스토어. 

3 

4signal_history (시그널 실행 이력) 와 scheduler_state (활성 전략/설정) 를 

5단일 SQLite DB 파일에 ACID 트랜잭션으로 저장한다. 

6 

7레거시 파일이 존재하면 최초 1회 DB로 마이그레이션 후 .migrated 로 이름 변경. 

8""" 

9from __future__ import annotations 

10 

11import csv 

12import json 

13import logging 

14import os 

15import sqlite3 

16import threading 

17from typing import Optional 

18 

19SCHEDULER_DB_FILE = "data/StrategyScheduler/scheduler.db" 

20_LEGACY_SIGNAL_CSV = "data/StrategyScheduler/signal_history.csv" 

21_LEGACY_STATE_JSON = "data/StrategyScheduler/scheduler_state.json" 

22 

23_DDL_SIGNAL_HISTORY = """ 

24CREATE TABLE IF NOT EXISTS signal_history ( 

25 id INTEGER PRIMARY KEY AUTOINCREMENT, 

26 strategy_name TEXT NOT NULL, 

27 code TEXT NOT NULL, 

28 name TEXT NOT NULL, 

29 action TEXT NOT NULL, 

30 price INTEGER NOT NULL, 

31 qty INTEGER NOT NULL DEFAULT 1, 

32 return_rate REAL, 

33 reason TEXT, 

34 timestamp TEXT NOT NULL, 

35 api_success INTEGER NOT NULL DEFAULT 1 

36); 

37""" 

38 

39_DDL_SCHEDULER_STATE = """ 

40CREATE TABLE IF NOT EXISTS scheduler_state ( 

41 key TEXT PRIMARY KEY, 

42 value TEXT NOT NULL 

43); 

44""" 

45 

46 

47class StrategySchedulerStore: 

48 """signal_history + scheduler_state 를 SQLite 에 영속화.""" 

49 

50 def __init__( 

51 self, 

52 db_path: str = SCHEDULER_DB_FILE, 

53 logger: Optional[logging.Logger] = None, 

54 ): 

55 self._db_path = db_path 

56 self._logger = logger or logging.getLogger(__name__) 

57 self._lock = threading.Lock() 

58 os.makedirs(os.path.dirname(db_path) or ".", exist_ok=True) 

59 self._conn = self._init_db() 

60 self._migrate_legacy_files() 

61 

62 # ── 초기화 ── 

63 

64 def _init_db(self) -> sqlite3.Connection: 

65 conn = sqlite3.connect(self._db_path, check_same_thread=False) 

66 conn.execute("PRAGMA journal_mode=WAL") 

67 conn.execute("PRAGMA synchronous=NORMAL") 

68 conn.execute(_DDL_SIGNAL_HISTORY) 

69 conn.execute(_DDL_SCHEDULER_STATE) 

70 conn.commit() 

71 return conn 

72 

73 def close(self) -> None: 

74 with self._lock: 

75 if self._conn: 

76 self._conn.close() 

77 self._conn = None 

78 

79 # ── Signal History ── 

80 

81 def append_signal(self, record) -> None: 

82 """SignalRecord 1건을 DB에 삽입.""" 

83 with self._lock: 

84 self._conn.execute( 

85 """INSERT INTO signal_history 

86 (strategy_name, code, name, action, price, qty, 

87 return_rate, reason, timestamp, api_success) 

88 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", 

89 ( 

90 record.strategy_name, record.code, record.name, record.action, 

91 record.price, record.qty, record.return_rate, record.reason, 

92 record.timestamp, 1 if record.api_success else 0, 

93 ), 

94 ) 

95 self._conn.commit() 

96 

97 def load_signal_history(self, limit: int = 200) -> list: 

98 """최근 N건 시그널 이력을 오래된 순 dict list 로 반환.""" 

99 with self._lock: 

100 cur = self._conn.execute( 

101 """SELECT strategy_name, code, name, action, price, qty, 

102 return_rate, reason, timestamp, api_success 

103 FROM signal_history 

104 ORDER BY id DESC LIMIT ?""", 

105 (limit,), 

106 ) 

107 rows = cur.fetchall() 

108 return [ 

109 { 

110 "strategy_name": r[0], "code": r[1], "name": r[2], 

111 "action": r[3], "price": r[4], "qty": r[5], 

112 "return_rate": r[6], "reason": r[7], 

113 "timestamp": r[8], "api_success": bool(r[9]), 

114 } 

115 for r in reversed(rows) 

116 ] 

117 

118 # ── Scheduler State ── 

119 

120 def save_state(self, state: dict) -> None: 

121 with self._lock: 

122 self._conn.execute( 

123 "INSERT OR REPLACE INTO scheduler_state (key, value) VALUES ('state', ?)", 

124 (json.dumps(state, ensure_ascii=False),), 

125 ) 

126 self._conn.commit() 

127 

128 def load_state(self) -> Optional[dict]: 

129 with self._lock: 

130 cur = self._conn.execute( 

131 "SELECT value FROM scheduler_state WHERE key = 'state'" 

132 ) 

133 row = cur.fetchone() 

134 if not row: 

135 return None 

136 try: 

137 return json.loads(row[0]) 

138 except (json.JSONDecodeError, TypeError): 

139 return None 

140 

141 def clear_state(self) -> None: 

142 with self._lock: 

143 self._conn.execute("DELETE FROM scheduler_state WHERE key = 'state'") 

144 self._conn.commit() 

145 

146 # ── 레거시 파일 마이그레이션 ── 

147 

148 def _migrate_legacy_files(self) -> None: 

149 self._migrate_csv() 

150 self._migrate_json() 

151 

152 def _migrate_csv(self) -> None: 

153 if not os.path.exists(_LEGACY_SIGNAL_CSV): 

154 return 

155 try: 

156 with open(_LEGACY_SIGNAL_CSV, "r", encoding="utf-8") as f: 

157 rows = list(csv.DictReader(f)) 

158 

159 with self._lock: 

160 cur = self._conn.execute("SELECT COUNT(*) FROM signal_history") 

161 if cur.fetchone()[0] > 0: 

162 os.rename(_LEGACY_SIGNAL_CSV, _LEGACY_SIGNAL_CSV + ".migrated") 

163 return 

164 

165 for row in rows: 

166 rv = row.get("return_rate") 

167 try: 

168 return_rate: Optional[float] = float(rv) if rv else None 

169 except (ValueError, TypeError): 

170 return_rate = None 

171 try: 

172 self._conn.execute( 

173 """INSERT INTO signal_history 

174 (strategy_name, code, name, action, price, qty, 

175 return_rate, reason, timestamp, api_success) 

176 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", 

177 ( 

178 row["strategy_name"], row["code"], row["name"], row["action"], 

179 int(row["price"]), int(row.get("qty") or 1), return_rate, 

180 row["reason"], row["timestamp"], 

181 1 if row.get("api_success", "True") == "True" else 0, 

182 ), 

183 ) 

184 except Exception as e: 

185 self._logger.warning(f"[Store] CSV 행 마이그레이션 실패: {e}") 

186 self._conn.commit() 

187 

188 os.rename(_LEGACY_SIGNAL_CSV, _LEGACY_SIGNAL_CSV + ".migrated") 

189 self._logger.info(f"[Store] signal_history.csv → DB 마이그레이션 완료 ({len(rows)}건)") 

190 except Exception as e: 

191 self._logger.error(f"[Store] CSV 마이그레이션 실패: {e}") 

192 

193 def _migrate_json(self) -> None: 

194 if not os.path.exists(_LEGACY_STATE_JSON): 

195 return 

196 try: 

197 with open(_LEGACY_STATE_JSON, "r", encoding="utf-8") as f: 

198 state = json.load(f) 

199 

200 with self._lock: 

201 cur = self._conn.execute("SELECT COUNT(*) FROM scheduler_state") 

202 if cur.fetchone()[0] > 0: 202 ↛ 203line 202 didn't jump to line 203 because the condition on line 202 was never true

203 os.rename(_LEGACY_STATE_JSON, _LEGACY_STATE_JSON + ".migrated") 

204 return 

205 

206 self._conn.execute( 

207 "INSERT OR REPLACE INTO scheduler_state (key, value) VALUES ('state', ?)", 

208 (json.dumps(state, ensure_ascii=False),), 

209 ) 

210 self._conn.commit() 

211 

212 os.rename(_LEGACY_STATE_JSON, _LEGACY_STATE_JSON + ".migrated") 

213 self._logger.info("[Store] scheduler_state.json → DB 마이그레이션 완료") 

214 except Exception as e: 

215 self._logger.error(f"[Store] JSON 상태 마이그레이션 실패: {e}")