Coverage for services / virtual_trade_service.py: 89%

114 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-04 15:08 +0000

1# services/virtual_trade_service.py 

2import logging 

3from typing import List, Dict, Tuple, Optional 

4import pandas as pd 

5import bisect 

6from functools import lru_cache 

7from datetime import datetime, timedelta 

8 

9from repositories.virtual_trade_repository import VirtualTradeRepository 

10from core.market_clock import MarketClock 

11from utils.transaction_cost_utils import TransactionCostUtils 

12 

13logger = logging.getLogger(__name__) 

14 

15@lru_cache(maxsize=1024) 

16def _is_weekday(date_str: str) -> bool: 

17 return datetime.strptime(date_str, "%Y-%m-%d").weekday() < 5 

18 

19def _strategy_values(snapshot: dict) -> dict: 

20 return {k: v for k, v in snapshot.items() if k != "ALL"} 

21 

def _get_trading_dates(daily: dict) -> list[str]:
    """Return the weekday snapshot dates on which strategy values changed.

    Args:
        daily: mapping of ``YYYY-MM-DD`` date strings to snapshot dicts.

    Returns:
        Sorted weekday dates. The first weekday is always kept; each later
        weekday is kept only when its per-strategy values (``"ALL"`` ignored)
        differ from those of the most recently kept date.
    """
    weekday_dates = sorted(d for d in daily if _is_weekday(d))
    if not weekday_dates:
        return []

    trading = [weekday_dates[0]]
    for d in weekday_dates[1:]:
        # An unchanged snapshot is skipped -- presumably a non-trading
        # weekday whose values were carried forward (TODO confirm).
        if _strategy_values(daily[d]) != _strategy_values(daily[trading[-1]]):
            trading.append(d)
    return trading

30 

31class VirtualTradeService: 

32 """모의매매 통계 계산 및 성과 분석을 담당하는 비즈니스 서비스 계층""" 

33 def __init__(self, repository: VirtualTradeRepository, market_clock: MarketClock): 

34 self._repo = repository 

35 self.tm = market_clock 

36 

37 # ── 비즈니스 & 통계 계산 로직 ── 

38 

39 def calculate_return(self, buy_price, sell_price, qty=1, apply_cost=False) -> float: 

40 return round(TransactionCostUtils.get_return_rate(buy_price, sell_price, qty, apply_cost), 2) 

41 

42 def get_trade_amount(self, price, qty=1, is_sell=False, apply_cost=False) -> float: 

43 base_amount = price * qty 

44 if not apply_cost: return base_amount 

45 cost = TransactionCostUtils.calculate_cost(price, qty, is_sell) 

46 return base_amount - cost if is_sell else base_amount + cost 

47 

48 def get_all_trades(self, apply_cost: bool = False) -> list: 

49 df = self._repo._read() 

50 records = self._repo._to_json_records(df) 

51 if apply_cost: 

52 for r in records: 

53 if r.get('status') == 'SOLD' and r.get('buy_price') and r.get('sell_price'): 53 ↛ 52line 53 didn't jump to line 52 because the condition on line 53 was always true

54 r['return_rate'] = self.calculate_return(r['buy_price'], r['sell_price'], r.get('qty', 1), True) 

55 return records 

56 

57 def get_summary(self, apply_cost: bool = False) -> dict: 

58 df = self._repo._read() 

59 total_trades = len(df) 

60 sold_df = df[df['status'] == 'SOLD'] 

61 

62 if sold_df.empty: 

63 return {"total_trades": total_trades, "win_rate": 0, "avg_return": 0} 

64 

65 if apply_cost: 

66 returns = sold_df.apply(lambda row: self.calculate_return(row['buy_price'], row['sell_price'], row['qty'], True), axis=1) 

67 else: 

68 returns = sold_df['return_rate'] 

69 

70 win_trades = len(returns[returns > 0]) 

71 win_rate = (win_trades / len(sold_df) * 100) 

72 avg_return = returns.mean() 

73 

74 return { 

75 "total_trades": total_trades, 

76 "win_rate": round(win_rate, 1), 

77 "avg_return": round(avg_return, 2) 

78 } 

79 

80 def get_daily_change(self, strategy: str, current_return: float, *, _data: dict | None = None) -> tuple[float | None, str | None]: 

81 data = _data or self._repo._load_data() 

82 daily = data.get("daily", {}) 

83 if not daily: return None, None 

84 today = self.tm.get_current_kst_time().strftime("%Y-%m-%d") 

85 all_trading = _get_trading_dates(daily) 

86 trading = [d for d in all_trading if d <= today] 

87 if len(trading) < 2: return None, None 87 ↛ exitline 87 didn't return from function 'get_daily_change' because the return on line 87 wasn't executed

88 latest_date = trading[-1] 

89 prev_date = trading[-2] 

90 latest_val = daily[latest_date].get(strategy) 

91 prev_val = daily[prev_date].get(strategy) 

92 if latest_val is None or prev_val is None: return None, None 92 ↛ exitline 92 didn't return from function 'get_daily_change' because the return on line 92 wasn't executed

93 return round(latest_val - prev_val, 2), prev_date 

94 

95 def get_weekly_change(self, strategy: str, current_return: float, *, _data: dict | None = None) -> tuple[float | None, str | None]: 

96 data = _data or self._repo._load_data() 

97 daily = data.get("daily", {}) 

98 if not daily: return None, None 98 ↛ exitline 98 didn't return from function 'get_weekly_change' because the return on line 98 wasn't executed

99 today = self.tm.get_current_kst_time().strftime("%Y-%m-%d") 

100 target = (self.tm.get_current_kst_time() - timedelta(days=7)).strftime("%Y-%m-%d") 

101 sorted_dates = sorted(daily.keys()) 

102 candidates = [d for d in sorted_dates if d <= target and d != today] 

103 if not candidates: return None, None 103 ↛ exitline 103 didn't return from function 'get_weekly_change' because the return on line 103 wasn't executed

104 ref_date = candidates[-1] 

105 ref_val = daily[ref_date].get(strategy) 

106 if ref_val is None: return None, None 106 ↛ exitline 106 didn't return from function 'get_weekly_change' because the return on line 106 wasn't executed

107 return round(current_return - ref_val, 2), ref_date 

108 

109 def get_strategy_return_history(self, strategy_name: str) -> list[dict]: 

110 data = self._repo._load_data() 

111 daily = data.get("daily", {}) 

112 if not daily: return [] 112 ↛ exitline 112 didn't return from function 'get_strategy_return_history' because the return on line 112 wasn't executed

113 df = pd.DataFrame.from_dict(daily, orient='index') 

114 if strategy_name not in df.columns: return [] 114 ↛ exitline 114 didn't return from function 'get_strategy_return_history' because the return on line 114 wasn't executed

115 series = df[strategy_name].sort_index().ffill().fillna(0.0) 

116 return [{"date": date, "return_rate": float(val)} for date, val in series.items() if _is_weekday(date)] 

117 

118 def get_all_strategies(self) -> list[str]: 

119 data = self._repo._load_data() 

120 daily = data.get("daily", {}) 

121 if not daily: return [] 121 ↛ exitline 121 didn't return from function 'get_all_strategies' because the return on line 121 wasn't executed

122 strategies = set() 

123 recent_dates = sorted(daily.keys(), reverse=True)[:5] 

124 for date in recent_dates: strategies.update(daily[date].keys()) 

125 if "ALL" in strategies: strategies.remove("ALL") 

126 return sorted(list(strategies)) 

127 

128 # ── 데이터 영속성 위임 (Facade) ── 

129 # 기존 코드 호환성을 위해 순수 I/O 요청을 Repository로 그대로 전달합니다. 

130 def log_buy(self, *args, **kwargs): return self._repo.log_buy(*args, **kwargs) 

131 async def log_buy_async(self, *args, **kwargs): return await self._repo.log_buy_async(*args, **kwargs) 

132 def log_sell(self, *args, **kwargs): return self._repo.log_sell(*args, **kwargs) 

133 async def log_sell_async(self, *args, **kwargs): return await self._repo.log_sell_async(*args, **kwargs) 

134 def log_sell_by_strategy(self, *args, **kwargs): return self._repo.log_sell_by_strategy(*args, **kwargs) 134 ↛ exitline 134 didn't return from function 'log_sell_by_strategy' because the return on line 134 wasn't executed

135 async def log_sell_by_strategy_async(self, *args, **kwargs): return await self._repo.log_sell_by_strategy_async(*args, **kwargs) 135 ↛ exitline 135 didn't return from function 'log_sell_by_strategy_async' because the return on line 135 wasn't executed

136 def get_holds(self): return self._repo.get_holds() 

137 def get_solds(self): return self._repo.get_solds() 137 ↛ exitline 137 didn't return from function 'get_solds' because the return on line 137 wasn't executed

138 def get_holds_by_strategy(self, strategy_name: str): return self._repo.get_holds_by_strategy(strategy_name) 

139 def is_holding(self, strategy_name: str, code: str): return self._repo.is_holding(strategy_name, code) 139 ↛ exitline 139 didn't return from function 'is_holding' because the return on line 139 wasn't executed

140 def fix_sell_price(self, *args, **kwargs): return self._repo.fix_sell_price(*args, **kwargs) 140 ↛ exitline 140 didn't return from function 'fix_sell_price' because the return on line 140 wasn't executed

141 def backfill_snapshots(self): return self._repo.backfill_snapshots() 

142 def save_daily_snapshot(self, strategy_returns: dict): return self._repo.save_daily_snapshot(strategy_returns) 142 ↛ exitline 142 didn't return from function 'save_daily_snapshot' because the return on line 142 wasn't executed

143 def _load_data(self): return self._repo._load_data() 143 ↛ exitline 143 didn't return from function '_load_data' because the return on line 143 wasn't executed

144 def _save_data(self, data: dict): return self._repo._save_data(data) 144 ↛ exitline 144 didn't return from function '_save_data' because the return on line 144 wasn't executed