Coverage for repositories / virtual_trade_repository.py: 90%
437 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
1# repositories/virtual_trade_repository.py
2import bisect
3import numpy as np
4import pandas as pd
5import asyncio
6import threading
7import os
8import json
9import math
10import logging
11from datetime import datetime, timedelta
12from functools import lru_cache
13from core.market_clock import MarketClock
14from utils.transaction_cost_utils import TransactionCostUtils
15logger = logging.getLogger(__name__)
# Column order used when a new trade-journal CSV is created.
COLUMNS = [
    "strategy",
    "code",
    "buy_date",
    "buy_price",
    "qty",
    "sell_date",
    "sell_price",
    "return_rate",
    "status",
]
# File name of the portfolio snapshot JSON.
SNAPSHOT_FILENAME = "portfolio_snapshots.json"
21@lru_cache(maxsize=1024)
22def _is_weekday(date_str: str) -> bool:
23 # 이미 처리한 날짜는 다시 계산하지 않음
24 return datetime.strptime(date_str, "%Y-%m-%d").weekday() < 5
27def _strategy_values(snapshot: dict) -> dict:
28 """스냅샷에서 개별 전략 값만 추출 (ALL 제외). ALL은 파생 값이라 비교에서 제외."""
29 return {k: v for k, v in snapshot.items() if k != "ALL"}
def _get_trading_dates(daily: dict) -> list[str]:
    """Return the dates of ``daily`` that count as trading days, ascending.

    A date qualifies when it is a weekday and its per-strategy values differ
    from those of the previously accepted date. The first weekday is always
    accepted.
    """
    trading: list[str] = []
    accepted_values = None  # per-strategy values of the last accepted date
    for day in sorted(daily):
        if not _is_weekday(day):
            continue
        values = _strategy_values(daily[day])
        if not trading or values != accepted_values:
            trading.append(day)
            accepted_values = values
    return trading
# File name of the local close-price cache JSON.
PRICE_CACHE_FILENAME = "close_price_cache.json"
45class VirtualTradeRepository:
46 def __init__(self, filename="data/VirtualTradeRepository/trade_journal.csv", market_clock: MarketClock = None):
47 self._cached_data = None # 메모리 캐시 변수 추가
48 self.filename = filename
49 self.tm = market_clock if market_clock else MarketClock()
50 self._lock = threading.Lock()
51 os.makedirs(os.path.dirname(self.filename), exist_ok=True) # 데이터 디렉토리 생성
52 if not os.path.exists(self.filename):
53 pd.DataFrame(columns=COLUMNS).to_csv(self.filename, index=False)
55 def _read(self) -> pd.DataFrame:
56 df = pd.read_csv(self.filename, dtype={'code': str, 'sell_date': object}, encoding='utf-8')
57 df['return_rate'] = df['return_rate'].fillna(0.0)
58 # 기존 파일 호환성: qty 컬럼이 없으면 기본값 1로 채움
59 if 'qty' not in df.columns:
60 df['qty'] = 1
61 return df
63 def _write(self, df: pd.DataFrame):
64 df.to_csv(self.filename, index=False, encoding='utf-8')
66 # ---- 매수/매도 ----
68 def log_buy(self, strategy_name: str, code: str, current_price, qty: int = 1):
69 """가상 매수 기록. 동일 전략+종목 중복 매수 방지."""
70 with self._lock:
71 if self.is_holding(strategy_name, code):
72 logger.info(f"[가상매매] {strategy_name}/{code} 이미 보유 중 — 매수 스킵")
73 return
74 buy_date = self.tm.get_current_kst_time().strftime("%Y-%m-%d %H:%M:%S")
75 # 기존 CSV 헤더 순서에 맞춰 append
76 try:
77 existing_cols = pd.read_csv(self.filename, nrows=0, encoding='utf-8').columns.tolist()
78 except Exception:
79 existing_cols = COLUMNS
81 new_row = pd.DataFrame({
82 "strategy": [strategy_name],
83 "code": [code],
84 "buy_date": [buy_date],
85 "buy_price": [current_price],
86 "qty": [qty],
87 "sell_date": [np.nan],
88 "sell_price": [np.nan],
89 "return_rate": [0.0],
90 "status": ["HOLD"]
91 })
92 new_row = new_row[existing_cols]
93 new_row.to_csv(self.filename, mode='a', header=False, index=False, encoding='utf-8')
94 logger.info(f"[가상매매] {strategy_name}/{code} 매수 기록 (가격: {current_price}, 수량: {qty})")
96 async def log_buy_async(self, strategy_name: str, code: str, current_price, qty: int = 1):
97 """log_buy의 비동기 래퍼 (스레드 실행)."""
98 await asyncio.to_thread(self.log_buy, strategy_name, code, current_price, qty)
100 def log_sell(self, code: str, current_price, qty: int = 1):
101 """가상 매도 — 해당 종목 가장 최근 HOLD 건."""
102 with self._lock:
103 df = self._read()
104 mask = (df['code'] == code) & (df['status'] == 'HOLD')
105 if df.loc[mask].empty:
106 logger.warning(f"[가상매매] {code} 매도 실패: 보유 내역 없음")
107 return
108 idx = df.loc[mask].index[-1]
109 buy_price = df.loc[idx, 'buy_price']
110 return_rate = ((current_price - buy_price) / buy_price) * 100 if buy_price else 0
111 df.loc[idx, 'sell_date'] = self.tm.get_current_kst_time().strftime("%Y-%m-%d %H:%M:%S")
112 df.loc[idx, 'sell_price'] = current_price
113 df.loc[idx, 'return_rate'] = round(return_rate, 2)
114 df.loc[idx, 'status'] = 'SOLD'
115 self._write(df)
116 logger.info(f"[가상매매] {code} 매도 기록 (수익률: {return_rate:.2f}%)")
118 async def log_sell_async(self, code: str, current_price, qty: int = 1):
119 """log_sell의 비동기 래퍼 (스레드 실행)."""
120 await asyncio.to_thread(self.log_sell, code, current_price, qty)
122 def log_sell_by_strategy(self, strategy_name: str, code: str, current_price, qty: int = 1) -> float | None:
123 """전략+종목 매칭 매도. 성공 시 수익률 반환, 실패 시 None 반환."""
124 with self._lock:
125 df = self._read()
126 mask = (df['strategy'] == strategy_name) & (df['code'] == code) & (df['status'] == 'HOLD')
127 if df.loc[mask].empty:
128 logger.warning(f"[가상매매] {strategy_name}/{code} 매도 실패: 보유 내역 없음")
129 return None
130 idx = df.loc[mask].index[-1]
131 buy_price = df.loc[idx, 'buy_price']
132 return_rate = ((current_price - buy_price) / buy_price) * 100 if buy_price else 0
133 df.loc[idx, 'sell_date'] = self.tm.get_current_kst_time().strftime("%Y-%m-%d %H:%M:%S")
134 df.loc[idx, 'sell_price'] = current_price
135 df.loc[idx, 'return_rate'] = round(return_rate, 2)
136 df.loc[idx, 'status'] = 'SOLD'
137 self._write(df)
138 logger.info(f"[가상매매] {strategy_name}/{code} 매도 기록 (수익률: {return_rate:.2f}%)")
139 return round(return_rate, 2)
141 async def log_sell_by_strategy_async(self, strategy_name: str, code: str, current_price, qty: int = 1) -> float | None:
142 """log_sell_by_strategy의 비동기 래퍼 (스레드 실행). 성공 시 수익률 반환."""
143 return await asyncio.to_thread(self.log_sell_by_strategy, strategy_name, code, current_price, qty)
145 # ---- 조회 ----
147 def _to_json_records(self, df: pd.DataFrame) -> list:
148 """DataFrame을 JSON 직렬화 가능한 dict 리스트로 변환 (NaN -> None)."""
149 records = df.to_dict(orient='records')
150 for record in records:
151 for key, value in record.items():
152 if isinstance(value, float) and math.isnan(value):
153 record[key] = None
154 return records
156 def calculate_return(self, buy_price, sell_price, qty=1, apply_cost=False) -> float:
157 """수익률 계산 헬퍼 (TransactionCostManager 위임)"""
158 return round(TransactionCostUtils.get_return_rate(buy_price, sell_price, qty, apply_cost), 2)
160 def get_trade_amount(self, price, qty=1, is_sell=False, apply_cost=False) -> float:
161 """거래 금액 계산 (비용 포함 매수금액 또는 비용 차감 매도금액)"""
162 base_amount = price * qty
163 if not apply_cost:
164 return base_amount
166 cost = TransactionCostUtils.calculate_cost(price, qty, is_sell)
167 return base_amount - cost if is_sell else base_amount + cost
169 def get_all_trades(self, apply_cost: bool = False) -> list:
170 """전체 거래 기록 반환 (웹 API용). apply_cost=True 시 수익률 재계산."""
171 df = self._read()
172 records = self._to_json_records(df)
173 if apply_cost:
174 for r in records:
175 if r.get('status') == 'SOLD' and r.get('buy_price') and r.get('sell_price'): 175 ↛ 174line 175 didn't jump to line 174 because the condition on line 175 was always true
176 r['return_rate'] = self.calculate_return(r['buy_price'], r['sell_price'], r.get('qty', 1), True)
177 return records
179 def get_solds(self) -> list:
180 """전체 SOLD 포지션 반환."""
181 df = self._read()
182 return self._to_json_records(df.loc[df['status'] == 'SOLD'])
184 def get_holds(self) -> list:
185 """전체 HOLD 포지션 반환."""
186 df = self._read()
187 return self._to_json_records(df.loc[df['status'] == 'HOLD'])
189 def get_holds_by_strategy(self, strategy_name: str) -> list:
190 """전략별 HOLD 포지션 반환."""
191 df = self._read()
192 mask = (df['strategy'] == strategy_name) & (df['status'] == 'HOLD')
193 return self._to_json_records(df.loc[mask])
195 def is_holding(self, strategy_name: str, code: str) -> bool:
196 """해당 전략에서 종목 보유 중인지 확인."""
197 df = self._read()
198 mask = (df['strategy'] == strategy_name) & (df['code'] == code) & (df['status'] == 'HOLD')
199 return not df.loc[mask].empty
201 def fix_sell_price(self, code: str, buy_date: str, correct_price):
202 """sell_price가 0인 SOLD 기록의 매도가/수익률을 보정합니다."""
203 with self._lock:
204 df = self._read()
205 mask = (df['code'] == code) & (df['status'] == 'SOLD') & (df['sell_price'] == 0)
206 if buy_date: 206 ↛ 207line 206 didn't jump to line 207 because the condition on line 206 was never true
207 mask = mask & (df['buy_date'] == buy_date)
208 if df.loc[mask].empty: 208 ↛ 209line 208 didn't jump to line 209 because the condition on line 208 was never true
209 return
210 for idx in df.loc[mask].index:
211 bp = df.loc[idx, 'buy_price']
212 df.loc[idx, 'sell_price'] = correct_price
213 df.loc[idx, 'return_rate'] = round(((correct_price - bp) / bp) * 100, 2) if bp else 0
214 self._write(df)
215 logger.info(f"[가상매매] {code} sell_price 보정 완료 → {correct_price}")
217 def get_summary(self, apply_cost: bool = False) -> dict:
218 """전체 매매 요약 통계 (HOLD + SOLD 모두 포함)."""
219 df = self._read()
220 total_trades = len(df)
221 sold_df = df[df['status'] == 'SOLD']
223 if sold_df.empty: 223 ↛ 224line 223 didn't jump to line 224 because the condition on line 223 was never true
224 return {"total_trades": total_trades, "win_rate": 0, "avg_return": 0}
226 # 수익률 시리즈 추출 (비용 적용 시 재계산)
227 if apply_cost:
228 returns = sold_df.apply(lambda row: self.calculate_return(row['buy_price'], row['sell_price'], row['qty'], True), axis=1)
229 else:
230 returns = sold_df['return_rate']
232 win_trades = len(returns[returns > 0])
233 win_rate = (win_trades / len(sold_df) * 100)
234 avg_return = returns.mean()
236 return {
237 "total_trades": total_trades,
238 "win_rate": round(win_rate, 1),
239 "avg_return": round(avg_return, 2)
240 }
242 # ---- 종가 캐시 (backfill용) ----
244 def _price_cache_path(self) -> str:
245 return os.path.join(os.path.dirname(self.filename), PRICE_CACHE_FILENAME)
247 def _load_price_cache(self) -> dict:
248 """로컬 종가 캐시 로드. 구조: { "005930": {"2026-02-13": 56000, ...}, ... }"""
249 path = self._price_cache_path()
250 if not os.path.exists(path):
251 return {}
252 try:
253 with open(path, 'r', encoding='utf-8') as f:
254 return json.load(f)
255 except (json.JSONDecodeError, IOError):
256 return {}
258 def _save_price_cache(self, cache: dict):
259 path = self._price_cache_path()
260 with open(path, 'w', encoding='utf-8') as f:
261 json.dump(cache, f, ensure_ascii=False, indent=2)
263 def _fetch_close_prices(self, codes: list[str], start_date: str, end_date: str) -> dict:
264 """pykrx로 종가 조회 후 캐시에 병합. 캐시에 이미 있으면 API 스킵.
265 Returns: { code: { "YYYY-MM-DD": close_price, ... }, ... }
266 """
267 from pykrx import stock as pykrx_stock
269 cache = self._load_price_cache()
270 start_fmt = start_date.replace('-', '')
271 end_fmt = end_date.replace('-', '')
272 fetched = 0
274 for code in codes:
275 # 캐시에 해당 종목+기간 데이터가 이미 있는지 확인
276 if code in cache:
277 cached_dates = set(cache[code].keys())
278 # start~end 범위의 영업일 중 누락된 날짜가 없으면 스킵
279 needed_dates = set(
280 d.strftime('%Y-%m-%d')
281 for d in pd.date_range(start_date, end_date, freq='B')
282 )
283 if needed_dates.issubset(cached_dates): 283 ↛ 286line 283 didn't jump to line 286 because the condition on line 283 was always true
284 continue
286 try:
287 df = pykrx_stock.get_market_ohlcv_by_date(start_fmt, end_fmt, code)
288 if df.empty:
289 continue
291 if code not in cache: 291 ↛ 294line 291 didn't jump to line 294 because the condition on line 291 was always true
292 cache[code] = {}
294 for date_idx, row in df.iterrows():
295 day_str = date_idx.strftime('%Y-%m-%d')
296 cache[code][day_str] = int(row['종가'])
298 fetched += 1
299 except Exception as e:
300 logger.warning(f"[가상매매] pykrx 종가 조회 실패 {code}: {e}")
301 continue
303 if fetched > 0:
304 self._save_price_cache(cache)
305 logger.info(f"[가상매매] 종가 캐시 업데이트: {fetched}개 종목 조회")
307 return cache
309 # ---- backfill ----
311 def backfill_snapshots(self):
312 """CSV 거래 기록을 기반으로 과거 일별 스냅샷을 역산하여 채웁니다.
313 이미 스냅샷이 존재하는 날짜는 덮어쓰지 않습니다.
315 계산 방식 (web_api.py의 save_daily_snapshot과 동일):
316 - 해당 날짜 기준 '활성 거래' = 매수일 <= day인 모든 거래
317 - SOLD: sell_day <= day → 확정 return_rate 사용
318 - HOLD(당시 기준): 당일 종가 기준 수익률 (pykrx 조회, 로컬 캐시)
319 - 전략별 평균 return_rate 저장
320 """
321 df = self._read()
322 if df.empty:
323 return
325 data = self._load_data()
326 daily = data["daily"]
328 # 1. 날짜 전처리
329 # itertuples 접근을 위해 underscore 없는 컬럼명 사용
330 df['buy_day_str'] = pd.to_datetime(df['buy_date'], errors='coerce').dt.strftime('%Y-%m-%d')
331 sell_mask = df['sell_date'].notna() & (df['sell_date'] != '')
332 df['sell_day_str'] = None
333 sell_dt = pd.to_datetime(df.loc[sell_mask, 'sell_date'], errors='coerce')
334 valid_sell = sell_mask & sell_dt.notna().reindex(df.index, fill_value=False)
335 df.loc[valid_sell, 'sell_day_str'] = sell_dt.dropna().dt.strftime('%Y-%m-%d')
337 all_days = set(df['buy_day_str'].dropna().tolist())
338 all_days |= set(df.loc[sell_mask, 'sell_day_str'].dropna().tolist())
340 if not all_days: 340 ↛ 341line 340 didn't jump to line 341 because the condition on line 340 was never true
341 return
343 min_day = min(all_days)
344 max_day = max(all_days)
346 # [수정] 현재 시점(어제)까지 backfill 범위 확장 (보유 중인 경우 등 고려)
347 yesterday = (self.tm.get_current_kst_time() - timedelta(days=1)).strftime('%Y-%m-%d')
348 if yesterday > max_day:
349 max_day = yesterday
351 # backfill이 필요한 날짜 확인
352 date_range = pd.date_range(min_day, max_day, freq='D')
353 date_strs = [d.strftime('%Y-%m-%d') for d in date_range]
354 missing_days = [d for d in date_strs if d not in daily]
356 if not missing_days: 356 ↛ 357line 356 didn't jump to line 357 because the condition on line 356 was never true
357 return # backfill 불필요
359 # 2. 종가 캐시 조회 (HOLD 포지션 수익률 계산용)
360 all_codes = df['code'].unique().tolist()
361 price_cache = self._fetch_close_prices(all_codes, min_day, max_day)
363 # [성능 개선] 종가 데이터를 DataFrame으로 변환하고 전처리 (ffill)
364 # _find_prev_close 반복 호출 제거를 위해 전체 기간 데이터를 미리 채움
365 price_df = pd.DataFrame()
366 if price_cache: 366 ↛ 380line 366 didn't jump to line 380 because the condition on line 366 was always true
367 try:
368 price_df = pd.DataFrame(price_cache)
369 # 인덱스(날짜)를 datetime으로 변환하여 정렬
370 price_df.index = pd.to_datetime(price_df.index)
371 price_df = price_df.sort_index()
372 # 전체 기간 reindex & ffill (휴장일 데이터 채우기)
373 full_idx = pd.date_range(start=min_day, end=max_day)
374 price_df = price_df.reindex(full_idx).ffill()
375 except Exception as e:
376 logger.warning(f"[가상매매] 종가 데이터프레임 변환 실패: {e}")
377 price_df = pd.DataFrame()
379 # 3. 날짜별 스냅샷 생성 (Numpy Optimization)
380 added = 0
381 missing_days.sort()
382 n_days = len(missing_days)
384 strategies = sorted(df['strategy'].unique().tolist())
385 strat_to_idx = {s: i for i, s in enumerate(strategies)}
386 n_strats = len(strategies)
388 # Arrays for aggregation
389 buy_sums = np.zeros((n_days, n_strats), dtype=np.float64)
390 eval_sums = np.zeros((n_days, n_strats), dtype=np.float64)
392 # Prepare Price Matrix
393 price_matrix = None
394 code_to_idx = {}
396 if not price_df.empty: 396 ↛ 406line 396 didn't jump to line 406 because the condition on line 396 was always true
397 md_dt = pd.to_datetime(missing_days)
398 # Reindex to missing days only
399 price_df_aligned = price_df.reindex(md_dt)
401 codes = price_df_aligned.columns.tolist()
402 code_to_idx = {c: i for i, c in enumerate(codes)}
403 price_matrix = price_df_aligned.to_numpy(dtype=np.float64)
405 # Iterate trades
406 for row in df.itertuples():
407 strat = row.strategy
408 s_idx = strat_to_idx.get(strat)
409 if s_idx is None: continue 409 ↛ 406line 409 didn't jump to line 406 because the continue on line 409 wasn't executed
411 code = row.code
412 try:
413 qty = float(row.qty) if hasattr(row, 'qty') and pd.notna(row.qty) else 1.0
414 except (ValueError, TypeError):
415 qty = 1.0
416 bp = float(row.buy_price) if pd.notna(row.buy_price) else 0.0
417 if bp == 0: continue 417 ↛ 406line 417 didn't jump to line 406 because the continue on line 417 wasn't executed
419 buy_date = row.buy_day_str
420 sell_date = row.sell_day_str if row.status == 'SOLD' else None
422 # Find start index in missing_days
423 start_idx = bisect.bisect_left(missing_days, buy_date)
424 if start_idx >= n_days: 424 ↛ 425line 424 didn't jump to line 425 because the condition on line 424 was never true
425 continue
427 # Determine end index (sell date)
428 if sell_date:
429 sell_idx = bisect.bisect_left(missing_days, sell_date)
431 # Period 2: SOLD (from sell_idx onwards)
432 if sell_idx < n_days: 432 ↛ 443line 432 didn't jump to line 443 because the condition on line 432 was always true
433 sp = float(row.sell_price) if pd.notna(row.sell_price) else 0.0
434 val = sp if sp > 0 else bp
436 # Apply to [max(start_idx, sell_idx) : ]
437 s_start = max(start_idx, sell_idx)
438 if s_start < n_days: 438 ↛ 443line 438 didn't jump to line 443 because the condition on line 438 was always true
439 buy_sums[s_start:, s_idx] += (bp * qty)
440 eval_sums[s_start:, s_idx] += (val * qty)
442 # Period 1: HOLD (from start_idx to sell_idx)
443 h_end = min(sell_idx, n_days)
444 if start_idx < h_end: 444 ↛ 406line 444 didn't jump to line 406 because the condition on line 444 was always true
445 buy_sums[start_idx:h_end, s_idx] += (bp * qty)
447 # Eval using market price
448 c_idx = code_to_idx.get(code)
449 if c_idx is not None and price_matrix is not None:
450 prices = price_matrix[start_idx:h_end, c_idx]
451 # Handle NaNs
452 if np.isnan(prices).any(): 452 ↛ 453line 452 didn't jump to line 453 because the condition on line 452 was never true
453 prices = prices.copy()
454 prices[np.isnan(prices)] = bp
455 eval_sums[start_idx:h_end, s_idx] += (prices * qty)
456 else:
457 eval_sums[start_idx:h_end, s_idx] += (bp * qty)
458 else:
459 # HOLD until end
460 buy_sums[start_idx:, s_idx] += (bp * qty)
462 c_idx = code_to_idx.get(code)
463 if c_idx is not None and price_matrix is not None: 463 ↛ 470line 463 didn't jump to line 470 because the condition on line 463 was always true
464 prices = price_matrix[start_idx:, c_idx]
465 if np.isnan(prices).any():
466 prices = prices.copy()
467 prices[np.isnan(prices)] = bp
468 eval_sums[start_idx:, s_idx] += (prices * qty)
469 else:
470 eval_sums[start_idx:, s_idx] += (bp * qty)
472 # Calculate Returns
473 with np.errstate(divide='ignore', invalid='ignore'):
474 returns = ((eval_sums - buy_sums) / buy_sums) * 100
475 returns[~np.isfinite(returns)] = 0.0
476 returns = np.round(returns, 2)
478 # Calculate ALL
479 total_buy = buy_sums.sum(axis=1)
480 total_eval = eval_sums.sum(axis=1)
481 with np.errstate(divide='ignore', invalid='ignore'):
482 all_returns = ((total_eval - total_buy) / total_buy) * 100
483 all_returns[~np.isfinite(all_returns)] = 0.0
484 all_returns = np.round(all_returns, 2)
486 # Build result dict
487 for i, day_str in enumerate(missing_days):
488 if total_buy[i] > 0: 488 ↛ 487line 488 didn't jump to line 487 because the condition on line 488 was always true
489 day_stats = {}
490 for j, strat in enumerate(strategies):
491 if buy_sums[i, j] > 0:
492 day_stats[strat] = returns[i, j]
494 day_stats['ALL'] = all_returns[i]
495 daily[day_str] = day_stats
496 added += 1
498 if added > 0: 498 ↛ exitline 498 didn't return from function 'backfill_snapshots' because the condition on line 498 was always true
499 cutoff = (self.tm.get_current_kst_time() - timedelta(days=30)).strftime("%Y-%m-%d")
500 data["daily"] = {d: v for d, v in sorted(daily.items()) if d >= cutoff}
501 self._save_data(data)
502 logger.info(f"[가상매매] 스냅샷 backfill 완료: {added}일 추가")
504 @staticmethod
505 def _find_prev_close(price_cache: dict, code: str, day: str):
506 """해당 날짜 이전 가장 가까운 종가를 찾습니다 (휴장일 대응)."""
507 code_prices = price_cache.get(code)
508 if not code_prices: 508 ↛ 509line 508 didn't jump to line 509 because the condition on line 508 was never true
509 return None
510 prev_dates = sorted([d for d in code_prices if d < day], reverse=True)
511 return code_prices[prev_dates[0]] if prev_dates else None
513 # ---- 포트폴리오 스냅샷 (전일/전주대비 계산용) ----
514 #
515 # JSON 구조:
516 # {
517 # "daily": {"2026-02-13": {"ALL": 2.5, "수동매매": 2.5}, ...},
518 # "prev_values": {"ALL": 0.0, "수동매매": 0.0} ← 마지막 변동 전 기준값
519 # }
521 def _snapshot_path(self) -> str:
522 return os.path.join(os.path.dirname(self.filename), SNAPSHOT_FILENAME)
524 def _load_data(self) -> dict:
525 """메모리에 데이터가 있으면 즉시 반환하고, 없으면 파일에서 읽어옵니다."""
526 # 1. 캐시 확인 (메모리에 있으면 I/O 생략)
527 if self._cached_data is not None:
528 return self._cached_data
530 path = self._snapshot_path()
531 if not os.path.exists(path):
532 self._cached_data = {"daily": {}, "prev_values": {}}
533 return self._cached_data
535 try:
536 with open(path, 'r', encoding='utf-8') as f:
537 data = json.load(f)
539 # 데이터 마이그레이션 로직
540 if "daily" not in data: 540 ↛ 544line 540 didn't jump to line 544 because the condition on line 540 was always true
541 data = {"daily": data, "prev_values": {}}
543 # 2. 읽어온 데이터를 메모리에 저장 (캐싱)
544 self._cached_data = data
545 return self._cached_data
547 except (json.JSONDecodeError, IOError):
548 self._cached_data = {"daily": {}, "prev_values": {}}
549 return self._cached_data
551 def _save_data(self, data: dict):
552 path = self._snapshot_path()
553 try:
554 with open(path, 'w', encoding='utf-8') as f:
555 json.dump(data, f, ensure_ascii=False, indent=2)
556 # 중요: 파일 저장 성공 시 메모리 캐시도 즉시 최신화
557 self._cached_data = data
558 except IOError as e:
559 logger.error(f"Failed to save snapshot: {e}")
561 def save_daily_snapshot(self, strategy_returns: dict):
562 """오늘 스냅샷 저장. 성능 최적화 버전."""
563 now = self.tm.get_current_kst_time()
564 if now.weekday() >= 5: # 주말 제외
565 return
567 today = now.strftime("%Y-%m-%d")
569 # 1. 메모리 캐시를 우선 사용하는 _load_data 호출
570 data = self._load_data()
571 daily = data.get("daily", {})
573 # 2. 직전 날짜 비교 로직 최적화 (전체 정렬 대신 필요한 값만 찾기)
574 if daily:
575 # 오늘보다 이전 날짜 중 가장 최근 날짜 하나만 찾음
576 prev_dates = [d for d in daily if d < today]
577 if prev_dates: 577 ↛ 585line 577 didn't jump to line 585 because the condition on line 577 was always true
578 last_date = max(prev_dates) # sorted()보다 max()가 훨씬 빠름
579 if _is_weekday(last_date): 579 ↛ 585line 579 didn't jump to line 585 because the condition on line 579 was always true
580 last_snapshot = daily[last_date]
581 if _strategy_values(last_snapshot) == _strategy_values(strategy_returns):
582 return
584 # 3. 데이터 업데이트
585 daily[today] = strategy_returns
587 # 4. 데이터 정리 로직 개선 (30일치 유지)
588 cutoff_dt = now - timedelta(days=30)
589 cutoff_str = cutoff_dt.strftime("%Y-%m-%d")
591 # dict comprehension 시 기준 날짜보다 큰 것만 필터링
592 new_daily = {d: v for d, v in daily.items() if d >= cutoff_str}
593 data["daily"] = new_daily
595 # 5. 파일 및 캐시 저장
596 self._save_data(data)
598 def get_daily_change(self, strategy: str, current_return: float, *, _data: dict | None = None) -> tuple[float | None, str | None]:
599 data = _data or self._load_data()
600 daily = data.get("daily", {})
601 if not daily: return None, None 601 ↛ exitline 601 didn't return from function 'get_daily_change' because the return on line 601 wasn't executed
603 today = self.tm.get_current_kst_time().strftime("%Y-%m-%d")
605 # _get_trading_dates()로 주말/공휴일(전략값 미변동) 제외
606 all_trading = _get_trading_dates(daily)
607 trading = [d for d in all_trading if d <= today]
608 if len(trading) < 2:
609 return None, None
611 latest_date = trading[-1]
612 prev_date = trading[-2]
614 latest_val = daily[latest_date].get(strategy)
615 prev_val = daily[prev_date].get(strategy)
617 if latest_val is None or prev_val is None: 617 ↛ 618line 617 didn't jump to line 618 because the condition on line 617 was never true
618 return None, None
619 return round(latest_val - prev_val, 2), prev_date
621 def get_weekly_change(self, strategy: str, current_return: float, *, _data: dict | None = None) -> tuple[float | None, str | None]:
622 """7일 전 거래일 스냅샷 대비 변화. (변동값, 기준날짜) 튜플 반환."""
623 data = _data or self._load_data()
624 daily = data.get("daily", {})
625 if not daily: return None, None 625 ↛ exitline 625 didn't return from function 'get_weekly_change' because the return on line 625 wasn't executed
627 today = self.tm.get_current_kst_time().strftime("%Y-%m-%d")
628 target = (self.tm.get_current_kst_time() - timedelta(days=7)).strftime("%Y-%m-%d")
630 # _get_trading_dates()의 무거운 루프를 피하고 바로 키 정렬 사용
631 sorted_dates = sorted(daily.keys())
632 candidates = [d for d in sorted_dates if d <= target and d != today]
634 if not candidates: 634 ↛ 635line 634 didn't jump to line 635 because the condition on line 634 was never true
635 return None, None
637 ref_date = candidates[-1]
638 ref_val = daily[ref_date].get(strategy)
639 if ref_val is None: 639 ↛ 640line 639 didn't jump to line 640 because the condition on line 639 was never true
640 return None, None
641 return round(current_return - ref_val, 2), ref_date
643 def get_strategy_return_history(self, strategy_name: str) -> list[dict]:
644 data = self._load_data()
645 daily = data.get("daily", {})
646 if not daily: return [] 646 ↛ exitline 646 didn't return from function 'get_strategy_return_history' because the return on line 646 wasn't executed
648 df = pd.DataFrame.from_dict(daily, orient='index')
649 if strategy_name not in df.columns: return []
651 # [수정 포인트] ffill() 후에도 남는 과거 빈값(최초 거래 이전)을 0.0으로 채움
652 # .fillna(0.0) 을 반드시 추가해 주세요.
653 series = df[strategy_name].sort_index().ffill().fillna(0.0)
655 # [수정 포인트] Numpy float64 타입을 순수 Python float로 캐스팅하여 에러 방지
656 # 주말 날짜 제외 (_is_weekday 필터)
657 return [{"date": date, "return_rate": float(val)} for date, val in series.items() if _is_weekday(date)]
659 def get_all_strategies(self) -> list[str]:
660 data = self._load_data()
661 daily = data.get("daily", {})
662 if not daily: return [] 662 ↛ exitline 662 didn't return from function 'get_all_strategies' because the return on line 662 wasn't executed
664 # 모든 날짜를 다 뒤지는 대신, 최근 30일 내에 등장한 전략들만 합집합
665 # (과거에 삭제된 전략이 계속 나타나는 것을 방지)
666 strategies = set()
667 recent_dates = sorted(daily.keys(), reverse=True)[:5] # 최근 5거래일만 확인
668 for date in recent_dates:
669 strategies.update(daily[date].keys())
671 if "ALL" in strategies: strategies.remove("ALL")
672 return sorted(list(strategies))