Coverage for repositories / virtual_trade_repository.py: 90%

437 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-04 15:08 +0000

1# repositories/virtual_trade_repository.py 

2import bisect 

3import numpy as np 

4import pandas as pd 

5import asyncio 

6import threading 

7import os 

8import json 

9import math 

10import logging 

11from datetime import datetime, timedelta 

12from functools import lru_cache 

13from core.market_clock import MarketClock 

14from utils.transaction_cost_utils import TransactionCostUtils 

15logger = logging.getLogger(__name__) 

16 

17COLUMNS = ["strategy", "code", "buy_date", "buy_price", "qty", "sell_date", "sell_price", "return_rate", "status"] 

18SNAPSHOT_FILENAME = "portfolio_snapshots.json" 

19 

20 

21@lru_cache(maxsize=1024) 

22def _is_weekday(date_str: str) -> bool: 

23 # 이미 처리한 날짜는 다시 계산하지 않음 

24 return datetime.strptime(date_str, "%Y-%m-%d").weekday() < 5 

25 

26 

27def _strategy_values(snapshot: dict) -> dict: 

28 """스냅샷에서 개별 전략 값만 추출 (ALL 제외). ALL은 파생 값이라 비교에서 제외.""" 

29 return {k: v for k, v in snapshot.items() if k != "ALL"} 

30 

31 

32def _get_trading_dates(daily: dict) -> list[str]: 

33 """스냅샷 dict에서 실제 거래일만 추출 (평일 + 개별 전략 값이 변한 날짜). 오름차순 반환.""" 

34 weekday_dates = sorted(d for d in daily if _is_weekday(d)) 

35 if not weekday_dates: 35 ↛ 36line 35 didn't jump to line 36 because the condition on line 35 was never true

36 return [] 

37 trading = [weekday_dates[0]] # 첫 날은 항상 포함 

38 for d in weekday_dates[1:]: 

39 if _strategy_values(daily[d]) != _strategy_values(daily[trading[-1]]): 

40 trading.append(d) 

41 return trading 

42PRICE_CACHE_FILENAME = "close_price_cache.json" 

43 

44 

45class VirtualTradeRepository: 

46 def __init__(self, filename="data/VirtualTradeRepository/trade_journal.csv", market_clock: MarketClock = None): 

47 self._cached_data = None # 메모리 캐시 변수 추가 

48 self.filename = filename 

49 self.tm = market_clock if market_clock else MarketClock() 

50 self._lock = threading.Lock() 

51 os.makedirs(os.path.dirname(self.filename), exist_ok=True) # 데이터 디렉토리 생성 

52 if not os.path.exists(self.filename): 

53 pd.DataFrame(columns=COLUMNS).to_csv(self.filename, index=False) 

54 

55 def _read(self) -> pd.DataFrame: 

56 df = pd.read_csv(self.filename, dtype={'code': str, 'sell_date': object}, encoding='utf-8') 

57 df['return_rate'] = df['return_rate'].fillna(0.0) 

58 # 기존 파일 호환성: qty 컬럼이 없으면 기본값 1로 채움 

59 if 'qty' not in df.columns: 

60 df['qty'] = 1 

61 return df 

62 

63 def _write(self, df: pd.DataFrame): 

64 df.to_csv(self.filename, index=False, encoding='utf-8') 

65 

66 # ---- 매수/매도 ---- 

67 

68 def log_buy(self, strategy_name: str, code: str, current_price, qty: int = 1): 

69 """가상 매수 기록. 동일 전략+종목 중복 매수 방지.""" 

70 with self._lock: 

71 if self.is_holding(strategy_name, code): 

72 logger.info(f"[가상매매] {strategy_name}/{code} 이미 보유 중 — 매수 스킵") 

73 return 

74 buy_date = self.tm.get_current_kst_time().strftime("%Y-%m-%d %H:%M:%S") 

75 # 기존 CSV 헤더 순서에 맞춰 append 

76 try: 

77 existing_cols = pd.read_csv(self.filename, nrows=0, encoding='utf-8').columns.tolist() 

78 except Exception: 

79 existing_cols = COLUMNS 

80 

81 new_row = pd.DataFrame({ 

82 "strategy": [strategy_name], 

83 "code": [code], 

84 "buy_date": [buy_date], 

85 "buy_price": [current_price], 

86 "qty": [qty], 

87 "sell_date": [np.nan], 

88 "sell_price": [np.nan], 

89 "return_rate": [0.0], 

90 "status": ["HOLD"] 

91 }) 

92 new_row = new_row[existing_cols] 

93 new_row.to_csv(self.filename, mode='a', header=False, index=False, encoding='utf-8') 

94 logger.info(f"[가상매매] {strategy_name}/{code} 매수 기록 (가격: {current_price}, 수량: {qty})") 

95 

96 async def log_buy_async(self, strategy_name: str, code: str, current_price, qty: int = 1): 

97 """log_buy의 비동기 래퍼 (스레드 실행).""" 

98 await asyncio.to_thread(self.log_buy, strategy_name, code, current_price, qty) 

99 

100 def log_sell(self, code: str, current_price, qty: int = 1): 

101 """가상 매도 — 해당 종목 가장 최근 HOLD 건.""" 

102 with self._lock: 

103 df = self._read() 

104 mask = (df['code'] == code) & (df['status'] == 'HOLD') 

105 if df.loc[mask].empty: 

106 logger.warning(f"[가상매매] {code} 매도 실패: 보유 내역 없음") 

107 return 

108 idx = df.loc[mask].index[-1] 

109 buy_price = df.loc[idx, 'buy_price'] 

110 return_rate = ((current_price - buy_price) / buy_price) * 100 if buy_price else 0 

111 df.loc[idx, 'sell_date'] = self.tm.get_current_kst_time().strftime("%Y-%m-%d %H:%M:%S") 

112 df.loc[idx, 'sell_price'] = current_price 

113 df.loc[idx, 'return_rate'] = round(return_rate, 2) 

114 df.loc[idx, 'status'] = 'SOLD' 

115 self._write(df) 

116 logger.info(f"[가상매매] {code} 매도 기록 (수익률: {return_rate:.2f}%)") 

117 

118 async def log_sell_async(self, code: str, current_price, qty: int = 1): 

119 """log_sell의 비동기 래퍼 (스레드 실행).""" 

120 await asyncio.to_thread(self.log_sell, code, current_price, qty) 

121 

122 def log_sell_by_strategy(self, strategy_name: str, code: str, current_price, qty: int = 1) -> float | None: 

123 """전략+종목 매칭 매도. 성공 시 수익률 반환, 실패 시 None 반환.""" 

124 with self._lock: 

125 df = self._read() 

126 mask = (df['strategy'] == strategy_name) & (df['code'] == code) & (df['status'] == 'HOLD') 

127 if df.loc[mask].empty: 

128 logger.warning(f"[가상매매] {strategy_name}/{code} 매도 실패: 보유 내역 없음") 

129 return None 

130 idx = df.loc[mask].index[-1] 

131 buy_price = df.loc[idx, 'buy_price'] 

132 return_rate = ((current_price - buy_price) / buy_price) * 100 if buy_price else 0 

133 df.loc[idx, 'sell_date'] = self.tm.get_current_kst_time().strftime("%Y-%m-%d %H:%M:%S") 

134 df.loc[idx, 'sell_price'] = current_price 

135 df.loc[idx, 'return_rate'] = round(return_rate, 2) 

136 df.loc[idx, 'status'] = 'SOLD' 

137 self._write(df) 

138 logger.info(f"[가상매매] {strategy_name}/{code} 매도 기록 (수익률: {return_rate:.2f}%)") 

139 return round(return_rate, 2) 

140 

141 async def log_sell_by_strategy_async(self, strategy_name: str, code: str, current_price, qty: int = 1) -> float | None: 

142 """log_sell_by_strategy의 비동기 래퍼 (스레드 실행). 성공 시 수익률 반환.""" 

143 return await asyncio.to_thread(self.log_sell_by_strategy, strategy_name, code, current_price, qty) 

144 

145 # ---- 조회 ---- 

146 

147 def _to_json_records(self, df: pd.DataFrame) -> list: 

148 """DataFrame을 JSON 직렬화 가능한 dict 리스트로 변환 (NaN -> None).""" 

149 records = df.to_dict(orient='records') 

150 for record in records: 

151 for key, value in record.items(): 

152 if isinstance(value, float) and math.isnan(value): 

153 record[key] = None 

154 return records 

155 

156 def calculate_return(self, buy_price, sell_price, qty=1, apply_cost=False) -> float: 

157 """수익률 계산 헬퍼 (TransactionCostManager 위임)""" 

158 return round(TransactionCostUtils.get_return_rate(buy_price, sell_price, qty, apply_cost), 2) 

159 

160 def get_trade_amount(self, price, qty=1, is_sell=False, apply_cost=False) -> float: 

161 """거래 금액 계산 (비용 포함 매수금액 또는 비용 차감 매도금액)""" 

162 base_amount = price * qty 

163 if not apply_cost: 

164 return base_amount 

165 

166 cost = TransactionCostUtils.calculate_cost(price, qty, is_sell) 

167 return base_amount - cost if is_sell else base_amount + cost 

168 

169 def get_all_trades(self, apply_cost: bool = False) -> list: 

170 """전체 거래 기록 반환 (웹 API용). apply_cost=True 시 수익률 재계산.""" 

171 df = self._read() 

172 records = self._to_json_records(df) 

173 if apply_cost: 

174 for r in records: 

175 if r.get('status') == 'SOLD' and r.get('buy_price') and r.get('sell_price'): 175 ↛ 174line 175 didn't jump to line 174 because the condition on line 175 was always true

176 r['return_rate'] = self.calculate_return(r['buy_price'], r['sell_price'], r.get('qty', 1), True) 

177 return records 

178 

179 def get_solds(self) -> list: 

180 """전체 SOLD 포지션 반환.""" 

181 df = self._read() 

182 return self._to_json_records(df.loc[df['status'] == 'SOLD']) 

183 

184 def get_holds(self) -> list: 

185 """전체 HOLD 포지션 반환.""" 

186 df = self._read() 

187 return self._to_json_records(df.loc[df['status'] == 'HOLD']) 

188 

189 def get_holds_by_strategy(self, strategy_name: str) -> list: 

190 """전략별 HOLD 포지션 반환.""" 

191 df = self._read() 

192 mask = (df['strategy'] == strategy_name) & (df['status'] == 'HOLD') 

193 return self._to_json_records(df.loc[mask]) 

194 

195 def is_holding(self, strategy_name: str, code: str) -> bool: 

196 """해당 전략에서 종목 보유 중인지 확인.""" 

197 df = self._read() 

198 mask = (df['strategy'] == strategy_name) & (df['code'] == code) & (df['status'] == 'HOLD') 

199 return not df.loc[mask].empty 

200 

201 def fix_sell_price(self, code: str, buy_date: str, correct_price): 

202 """sell_price가 0인 SOLD 기록의 매도가/수익률을 보정합니다.""" 

203 with self._lock: 

204 df = self._read() 

205 mask = (df['code'] == code) & (df['status'] == 'SOLD') & (df['sell_price'] == 0) 

206 if buy_date: 206 ↛ 207line 206 didn't jump to line 207 because the condition on line 206 was never true

207 mask = mask & (df['buy_date'] == buy_date) 

208 if df.loc[mask].empty: 208 ↛ 209line 208 didn't jump to line 209 because the condition on line 208 was never true

209 return 

210 for idx in df.loc[mask].index: 

211 bp = df.loc[idx, 'buy_price'] 

212 df.loc[idx, 'sell_price'] = correct_price 

213 df.loc[idx, 'return_rate'] = round(((correct_price - bp) / bp) * 100, 2) if bp else 0 

214 self._write(df) 

215 logger.info(f"[가상매매] {code} sell_price 보정 완료 → {correct_price}") 

216 

217 def get_summary(self, apply_cost: bool = False) -> dict: 

218 """전체 매매 요약 통계 (HOLD + SOLD 모두 포함).""" 

219 df = self._read() 

220 total_trades = len(df) 

221 sold_df = df[df['status'] == 'SOLD'] 

222 

223 if sold_df.empty: 223 ↛ 224line 223 didn't jump to line 224 because the condition on line 223 was never true

224 return {"total_trades": total_trades, "win_rate": 0, "avg_return": 0} 

225 

226 # 수익률 시리즈 추출 (비용 적용 시 재계산) 

227 if apply_cost: 

228 returns = sold_df.apply(lambda row: self.calculate_return(row['buy_price'], row['sell_price'], row['qty'], True), axis=1) 

229 else: 

230 returns = sold_df['return_rate'] 

231 

232 win_trades = len(returns[returns > 0]) 

233 win_rate = (win_trades / len(sold_df) * 100) 

234 avg_return = returns.mean() 

235 

236 return { 

237 "total_trades": total_trades, 

238 "win_rate": round(win_rate, 1), 

239 "avg_return": round(avg_return, 2) 

240 } 

241 

242 # ---- 종가 캐시 (backfill용) ---- 

243 

244 def _price_cache_path(self) -> str: 

245 return os.path.join(os.path.dirname(self.filename), PRICE_CACHE_FILENAME) 

246 

247 def _load_price_cache(self) -> dict: 

248 """로컬 종가 캐시 로드. 구조: { "005930": {"2026-02-13": 56000, ...}, ... }""" 

249 path = self._price_cache_path() 

250 if not os.path.exists(path): 

251 return {} 

252 try: 

253 with open(path, 'r', encoding='utf-8') as f: 

254 return json.load(f) 

255 except (json.JSONDecodeError, IOError): 

256 return {} 

257 

258 def _save_price_cache(self, cache: dict): 

259 path = self._price_cache_path() 

260 with open(path, 'w', encoding='utf-8') as f: 

261 json.dump(cache, f, ensure_ascii=False, indent=2) 

262 

263 def _fetch_close_prices(self, codes: list[str], start_date: str, end_date: str) -> dict: 

264 """pykrx로 종가 조회 후 캐시에 병합. 캐시에 이미 있으면 API 스킵. 

265 Returns: { code: { "YYYY-MM-DD": close_price, ... }, ... } 

266 """ 

267 from pykrx import stock as pykrx_stock 

268 

269 cache = self._load_price_cache() 

270 start_fmt = start_date.replace('-', '') 

271 end_fmt = end_date.replace('-', '') 

272 fetched = 0 

273 

274 for code in codes: 

275 # 캐시에 해당 종목+기간 데이터가 이미 있는지 확인 

276 if code in cache: 

277 cached_dates = set(cache[code].keys()) 

278 # start~end 범위의 영업일 중 누락된 날짜가 없으면 스킵 

279 needed_dates = set( 

280 d.strftime('%Y-%m-%d') 

281 for d in pd.date_range(start_date, end_date, freq='B') 

282 ) 

283 if needed_dates.issubset(cached_dates): 283 ↛ 286line 283 didn't jump to line 286 because the condition on line 283 was always true

284 continue 

285 

286 try: 

287 df = pykrx_stock.get_market_ohlcv_by_date(start_fmt, end_fmt, code) 

288 if df.empty: 

289 continue 

290 

291 if code not in cache: 291 ↛ 294line 291 didn't jump to line 294 because the condition on line 291 was always true

292 cache[code] = {} 

293 

294 for date_idx, row in df.iterrows(): 

295 day_str = date_idx.strftime('%Y-%m-%d') 

296 cache[code][day_str] = int(row['종가']) 

297 

298 fetched += 1 

299 except Exception as e: 

300 logger.warning(f"[가상매매] pykrx 종가 조회 실패 {code}: {e}") 

301 continue 

302 

303 if fetched > 0: 

304 self._save_price_cache(cache) 

305 logger.info(f"[가상매매] 종가 캐시 업데이트: {fetched}개 종목 조회") 

306 

307 return cache 

308 

309 # ---- backfill ---- 

310 

311 def backfill_snapshots(self): 

312 """CSV 거래 기록을 기반으로 과거 일별 스냅샷을 역산하여 채웁니다. 

313 이미 스냅샷이 존재하는 날짜는 덮어쓰지 않습니다. 

314 

315 계산 방식 (web_api.py의 save_daily_snapshot과 동일): 

316 - 해당 날짜 기준 '활성 거래' = 매수일 <= day인 모든 거래 

317 - SOLD: sell_day <= day → 확정 return_rate 사용 

318 - HOLD(당시 기준): 당일 종가 기준 수익률 (pykrx 조회, 로컬 캐시) 

319 - 전략별 평균 return_rate 저장 

320 """ 

321 df = self._read() 

322 if df.empty: 

323 return 

324 

325 data = self._load_data() 

326 daily = data["daily"] 

327 

328 # 1. 날짜 전처리 

329 # itertuples 접근을 위해 underscore 없는 컬럼명 사용 

330 df['buy_day_str'] = pd.to_datetime(df['buy_date'], errors='coerce').dt.strftime('%Y-%m-%d') 

331 sell_mask = df['sell_date'].notna() & (df['sell_date'] != '') 

332 df['sell_day_str'] = None 

333 sell_dt = pd.to_datetime(df.loc[sell_mask, 'sell_date'], errors='coerce') 

334 valid_sell = sell_mask & sell_dt.notna().reindex(df.index, fill_value=False) 

335 df.loc[valid_sell, 'sell_day_str'] = sell_dt.dropna().dt.strftime('%Y-%m-%d') 

336 

337 all_days = set(df['buy_day_str'].dropna().tolist()) 

338 all_days |= set(df.loc[sell_mask, 'sell_day_str'].dropna().tolist()) 

339 

340 if not all_days: 340 ↛ 341line 340 didn't jump to line 341 because the condition on line 340 was never true

341 return 

342 

343 min_day = min(all_days) 

344 max_day = max(all_days) 

345 

346 # [수정] 현재 시점(어제)까지 backfill 범위 확장 (보유 중인 경우 등 고려) 

347 yesterday = (self.tm.get_current_kst_time() - timedelta(days=1)).strftime('%Y-%m-%d') 

348 if yesterday > max_day: 

349 max_day = yesterday 

350 

351 # backfill이 필요한 날짜 확인 

352 date_range = pd.date_range(min_day, max_day, freq='D') 

353 date_strs = [d.strftime('%Y-%m-%d') for d in date_range] 

354 missing_days = [d for d in date_strs if d not in daily] 

355 

356 if not missing_days: 356 ↛ 357line 356 didn't jump to line 357 because the condition on line 356 was never true

357 return # backfill 불필요 

358 

359 # 2. 종가 캐시 조회 (HOLD 포지션 수익률 계산용) 

360 all_codes = df['code'].unique().tolist() 

361 price_cache = self._fetch_close_prices(all_codes, min_day, max_day) 

362 

363 # [성능 개선] 종가 데이터를 DataFrame으로 변환하고 전처리 (ffill) 

364 # _find_prev_close 반복 호출 제거를 위해 전체 기간 데이터를 미리 채움 

365 price_df = pd.DataFrame() 

366 if price_cache: 366 ↛ 380line 366 didn't jump to line 380 because the condition on line 366 was always true

367 try: 

368 price_df = pd.DataFrame(price_cache) 

369 # 인덱스(날짜)를 datetime으로 변환하여 정렬 

370 price_df.index = pd.to_datetime(price_df.index) 

371 price_df = price_df.sort_index() 

372 # 전체 기간 reindex & ffill (휴장일 데이터 채우기) 

373 full_idx = pd.date_range(start=min_day, end=max_day) 

374 price_df = price_df.reindex(full_idx).ffill() 

375 except Exception as e: 

376 logger.warning(f"[가상매매] 종가 데이터프레임 변환 실패: {e}") 

377 price_df = pd.DataFrame() 

378 

379 # 3. 날짜별 스냅샷 생성 (Numpy Optimization) 

380 added = 0 

381 missing_days.sort() 

382 n_days = len(missing_days) 

383 

384 strategies = sorted(df['strategy'].unique().tolist()) 

385 strat_to_idx = {s: i for i, s in enumerate(strategies)} 

386 n_strats = len(strategies) 

387 

388 # Arrays for aggregation 

389 buy_sums = np.zeros((n_days, n_strats), dtype=np.float64) 

390 eval_sums = np.zeros((n_days, n_strats), dtype=np.float64) 

391 

392 # Prepare Price Matrix 

393 price_matrix = None 

394 code_to_idx = {} 

395 

396 if not price_df.empty: 396 ↛ 406line 396 didn't jump to line 406 because the condition on line 396 was always true

397 md_dt = pd.to_datetime(missing_days) 

398 # Reindex to missing days only 

399 price_df_aligned = price_df.reindex(md_dt) 

400 

401 codes = price_df_aligned.columns.tolist() 

402 code_to_idx = {c: i for i, c in enumerate(codes)} 

403 price_matrix = price_df_aligned.to_numpy(dtype=np.float64) 

404 

405 # Iterate trades 

406 for row in df.itertuples(): 

407 strat = row.strategy 

408 s_idx = strat_to_idx.get(strat) 

409 if s_idx is None: continue 409 ↛ 406line 409 didn't jump to line 406 because the continue on line 409 wasn't executed

410 

411 code = row.code 

412 try: 

413 qty = float(row.qty) if hasattr(row, 'qty') and pd.notna(row.qty) else 1.0 

414 except (ValueError, TypeError): 

415 qty = 1.0 

416 bp = float(row.buy_price) if pd.notna(row.buy_price) else 0.0 

417 if bp == 0: continue 417 ↛ 406line 417 didn't jump to line 406 because the continue on line 417 wasn't executed

418 

419 buy_date = row.buy_day_str 

420 sell_date = row.sell_day_str if row.status == 'SOLD' else None 

421 

422 # Find start index in missing_days 

423 start_idx = bisect.bisect_left(missing_days, buy_date) 

424 if start_idx >= n_days: 424 ↛ 425line 424 didn't jump to line 425 because the condition on line 424 was never true

425 continue 

426 

427 # Determine end index (sell date) 

428 if sell_date: 

429 sell_idx = bisect.bisect_left(missing_days, sell_date) 

430 

431 # Period 2: SOLD (from sell_idx onwards) 

432 if sell_idx < n_days: 432 ↛ 443line 432 didn't jump to line 443 because the condition on line 432 was always true

433 sp = float(row.sell_price) if pd.notna(row.sell_price) else 0.0 

434 val = sp if sp > 0 else bp 

435 

436 # Apply to [max(start_idx, sell_idx) : ] 

437 s_start = max(start_idx, sell_idx) 

438 if s_start < n_days: 438 ↛ 443line 438 didn't jump to line 443 because the condition on line 438 was always true

439 buy_sums[s_start:, s_idx] += (bp * qty) 

440 eval_sums[s_start:, s_idx] += (val * qty) 

441 

442 # Period 1: HOLD (from start_idx to sell_idx) 

443 h_end = min(sell_idx, n_days) 

444 if start_idx < h_end: 444 ↛ 406line 444 didn't jump to line 406 because the condition on line 444 was always true

445 buy_sums[start_idx:h_end, s_idx] += (bp * qty) 

446 

447 # Eval using market price 

448 c_idx = code_to_idx.get(code) 

449 if c_idx is not None and price_matrix is not None: 

450 prices = price_matrix[start_idx:h_end, c_idx] 

451 # Handle NaNs 

452 if np.isnan(prices).any(): 452 ↛ 453line 452 didn't jump to line 453 because the condition on line 452 was never true

453 prices = prices.copy() 

454 prices[np.isnan(prices)] = bp 

455 eval_sums[start_idx:h_end, s_idx] += (prices * qty) 

456 else: 

457 eval_sums[start_idx:h_end, s_idx] += (bp * qty) 

458 else: 

459 # HOLD until end 

460 buy_sums[start_idx:, s_idx] += (bp * qty) 

461 

462 c_idx = code_to_idx.get(code) 

463 if c_idx is not None and price_matrix is not None: 463 ↛ 470line 463 didn't jump to line 470 because the condition on line 463 was always true

464 prices = price_matrix[start_idx:, c_idx] 

465 if np.isnan(prices).any(): 

466 prices = prices.copy() 

467 prices[np.isnan(prices)] = bp 

468 eval_sums[start_idx:, s_idx] += (prices * qty) 

469 else: 

470 eval_sums[start_idx:, s_idx] += (bp * qty) 

471 

472 # Calculate Returns 

473 with np.errstate(divide='ignore', invalid='ignore'): 

474 returns = ((eval_sums - buy_sums) / buy_sums) * 100 

475 returns[~np.isfinite(returns)] = 0.0 

476 returns = np.round(returns, 2) 

477 

478 # Calculate ALL 

479 total_buy = buy_sums.sum(axis=1) 

480 total_eval = eval_sums.sum(axis=1) 

481 with np.errstate(divide='ignore', invalid='ignore'): 

482 all_returns = ((total_eval - total_buy) / total_buy) * 100 

483 all_returns[~np.isfinite(all_returns)] = 0.0 

484 all_returns = np.round(all_returns, 2) 

485 

486 # Build result dict 

487 for i, day_str in enumerate(missing_days): 

488 if total_buy[i] > 0: 488 ↛ 487line 488 didn't jump to line 487 because the condition on line 488 was always true

489 day_stats = {} 

490 for j, strat in enumerate(strategies): 

491 if buy_sums[i, j] > 0: 

492 day_stats[strat] = returns[i, j] 

493 

494 day_stats['ALL'] = all_returns[i] 

495 daily[day_str] = day_stats 

496 added += 1 

497 

498 if added > 0: 498 ↛ exitline 498 didn't return from function 'backfill_snapshots' because the condition on line 498 was always true

499 cutoff = (self.tm.get_current_kst_time() - timedelta(days=30)).strftime("%Y-%m-%d") 

500 data["daily"] = {d: v for d, v in sorted(daily.items()) if d >= cutoff} 

501 self._save_data(data) 

502 logger.info(f"[가상매매] 스냅샷 backfill 완료: {added}일 추가") 

503 

504 @staticmethod 

505 def _find_prev_close(price_cache: dict, code: str, day: str): 

506 """해당 날짜 이전 가장 가까운 종가를 찾습니다 (휴장일 대응).""" 

507 code_prices = price_cache.get(code) 

508 if not code_prices: 508 ↛ 509line 508 didn't jump to line 509 because the condition on line 508 was never true

509 return None 

510 prev_dates = sorted([d for d in code_prices if d < day], reverse=True) 

511 return code_prices[prev_dates[0]] if prev_dates else None 

512 

513 # ---- 포트폴리오 스냅샷 (전일/전주대비 계산용) ---- 

514 # 

515 # JSON 구조: 

516 # { 

517 # "daily": {"2026-02-13": {"ALL": 2.5, "수동매매": 2.5}, ...}, 

518 # "prev_values": {"ALL": 0.0, "수동매매": 0.0} ← 마지막 변동 전 기준값 

519 # } 

520 

521 def _snapshot_path(self) -> str: 

522 return os.path.join(os.path.dirname(self.filename), SNAPSHOT_FILENAME) 

523 

524 def _load_data(self) -> dict: 

525 """메모리에 데이터가 있으면 즉시 반환하고, 없으면 파일에서 읽어옵니다.""" 

526 # 1. 캐시 확인 (메모리에 있으면 I/O 생략) 

527 if self._cached_data is not None: 

528 return self._cached_data 

529 

530 path = self._snapshot_path() 

531 if not os.path.exists(path): 

532 self._cached_data = {"daily": {}, "prev_values": {}} 

533 return self._cached_data 

534 

535 try: 

536 with open(path, 'r', encoding='utf-8') as f: 

537 data = json.load(f) 

538 

539 # 데이터 마이그레이션 로직 

540 if "daily" not in data: 540 ↛ 544line 540 didn't jump to line 544 because the condition on line 540 was always true

541 data = {"daily": data, "prev_values": {}} 

542 

543 # 2. 읽어온 데이터를 메모리에 저장 (캐싱) 

544 self._cached_data = data 

545 return self._cached_data 

546 

547 except (json.JSONDecodeError, IOError): 

548 self._cached_data = {"daily": {}, "prev_values": {}} 

549 return self._cached_data 

550 

551 def _save_data(self, data: dict): 

552 path = self._snapshot_path() 

553 try: 

554 with open(path, 'w', encoding='utf-8') as f: 

555 json.dump(data, f, ensure_ascii=False, indent=2) 

556 # 중요: 파일 저장 성공 시 메모리 캐시도 즉시 최신화 

557 self._cached_data = data 

558 except IOError as e: 

559 logger.error(f"Failed to save snapshot: {e}") 

560 

561 def save_daily_snapshot(self, strategy_returns: dict): 

562 """오늘 스냅샷 저장. 성능 최적화 버전.""" 

563 now = self.tm.get_current_kst_time() 

564 if now.weekday() >= 5: # 주말 제외 

565 return 

566 

567 today = now.strftime("%Y-%m-%d") 

568 

569 # 1. 메모리 캐시를 우선 사용하는 _load_data 호출 

570 data = self._load_data() 

571 daily = data.get("daily", {}) 

572 

573 # 2. 직전 날짜 비교 로직 최적화 (전체 정렬 대신 필요한 값만 찾기) 

574 if daily: 

575 # 오늘보다 이전 날짜 중 가장 최근 날짜 하나만 찾음 

576 prev_dates = [d for d in daily if d < today] 

577 if prev_dates: 577 ↛ 585line 577 didn't jump to line 585 because the condition on line 577 was always true

578 last_date = max(prev_dates) # sorted()보다 max()가 훨씬 빠름 

579 if _is_weekday(last_date): 579 ↛ 585line 579 didn't jump to line 585 because the condition on line 579 was always true

580 last_snapshot = daily[last_date] 

581 if _strategy_values(last_snapshot) == _strategy_values(strategy_returns): 

582 return 

583 

584 # 3. 데이터 업데이트 

585 daily[today] = strategy_returns 

586 

587 # 4. 데이터 정리 로직 개선 (30일치 유지) 

588 cutoff_dt = now - timedelta(days=30) 

589 cutoff_str = cutoff_dt.strftime("%Y-%m-%d") 

590 

591 # dict comprehension 시 기준 날짜보다 큰 것만 필터링 

592 new_daily = {d: v for d, v in daily.items() if d >= cutoff_str} 

593 data["daily"] = new_daily 

594 

595 # 5. 파일 및 캐시 저장 

596 self._save_data(data) 

597 

598 def get_daily_change(self, strategy: str, current_return: float, *, _data: dict | None = None) -> tuple[float | None, str | None]: 

599 data = _data or self._load_data() 

600 daily = data.get("daily", {}) 

601 if not daily: return None, None 601 ↛ exitline 601 didn't return from function 'get_daily_change' because the return on line 601 wasn't executed

602 

603 today = self.tm.get_current_kst_time().strftime("%Y-%m-%d") 

604 

605 # _get_trading_dates()로 주말/공휴일(전략값 미변동) 제외 

606 all_trading = _get_trading_dates(daily) 

607 trading = [d for d in all_trading if d <= today] 

608 if len(trading) < 2: 

609 return None, None 

610 

611 latest_date = trading[-1] 

612 prev_date = trading[-2] 

613 

614 latest_val = daily[latest_date].get(strategy) 

615 prev_val = daily[prev_date].get(strategy) 

616 

617 if latest_val is None or prev_val is None: 617 ↛ 618line 617 didn't jump to line 618 because the condition on line 617 was never true

618 return None, None 

619 return round(latest_val - prev_val, 2), prev_date 

620 

621 def get_weekly_change(self, strategy: str, current_return: float, *, _data: dict | None = None) -> tuple[float | None, str | None]: 

622 """7일 전 거래일 스냅샷 대비 변화. (변동값, 기준날짜) 튜플 반환.""" 

623 data = _data or self._load_data() 

624 daily = data.get("daily", {}) 

625 if not daily: return None, None 625 ↛ exitline 625 didn't return from function 'get_weekly_change' because the return on line 625 wasn't executed

626 

627 today = self.tm.get_current_kst_time().strftime("%Y-%m-%d") 

628 target = (self.tm.get_current_kst_time() - timedelta(days=7)).strftime("%Y-%m-%d") 

629 

630 # _get_trading_dates()의 무거운 루프를 피하고 바로 키 정렬 사용 

631 sorted_dates = sorted(daily.keys()) 

632 candidates = [d for d in sorted_dates if d <= target and d != today] 

633 

634 if not candidates: 634 ↛ 635line 634 didn't jump to line 635 because the condition on line 634 was never true

635 return None, None 

636 

637 ref_date = candidates[-1] 

638 ref_val = daily[ref_date].get(strategy) 

639 if ref_val is None: 639 ↛ 640line 639 didn't jump to line 640 because the condition on line 639 was never true

640 return None, None 

641 return round(current_return - ref_val, 2), ref_date 

642 

643 def get_strategy_return_history(self, strategy_name: str) -> list[dict]: 

644 data = self._load_data() 

645 daily = data.get("daily", {}) 

646 if not daily: return [] 646 ↛ exitline 646 didn't return from function 'get_strategy_return_history' because the return on line 646 wasn't executed

647 

648 df = pd.DataFrame.from_dict(daily, orient='index') 

649 if strategy_name not in df.columns: return [] 

650 

651 # [수정 포인트] ffill() 후에도 남는 과거 빈값(최초 거래 이전)을 0.0으로 채움 

652 # .fillna(0.0) 을 반드시 추가해 주세요. 

653 series = df[strategy_name].sort_index().ffill().fillna(0.0) 

654 

655 # [수정 포인트] Numpy float64 타입을 순수 Python float로 캐스팅하여 에러 방지 

656 # 주말 날짜 제외 (_is_weekday 필터) 

657 return [{"date": date, "return_rate": float(val)} for date, val in series.items() if _is_weekday(date)] 

658 

659 def get_all_strategies(self) -> list[str]: 

660 data = self._load_data() 

661 daily = data.get("daily", {}) 

662 if not daily: return [] 662 ↛ exitline 662 didn't return from function 'get_all_strategies' because the return on line 662 wasn't executed

663 

664 # 모든 날짜를 다 뒤지는 대신, 최근 30일 내에 등장한 전략들만 합집합 

665 # (과거에 삭제된 전략이 계속 나타나는 것을 방지) 

666 strategies = set() 

667 recent_dates = sorted(daily.keys(), reverse=True)[:5] # 최근 5거래일만 확인 

668 for date in recent_dates: 

669 strategies.update(daily[date].keys()) 

670 

671 if "ALL" in strategies: strategies.remove("ALL") 

672 return sorted(list(strategies))