Coverage for view / web / routes / virtual.py: 95%

255 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-04 15:08 +0000

1""" 

2가상 매매 관련 API 엔드포인트 (virtual.html). 

3""" 

4import asyncio 

5import math 

6import time 

7from fastapi import APIRouter 

8from view.web.api_common import _get_ctx, _PRICE_CACHE 

9import pandas as pd 

10import numpy as np 

11from datetime import datetime, timezone, timedelta 

12 

# Router on which the virtual-trading endpoints in this module register themselves via @router.get(...).
router = APIRouter()

14 

15 

@router.get("/virtual/summary")
async def get_virtual_summary(apply_cost: bool = False):
    """Return the summary figures for virtual trading.

    Args:
        apply_cost: Forwarded unchanged to ``virtual_trade_service.get_summary``.

    Returns:
        Whatever ``get_summary`` returns, or a zeroed placeholder dict when the
        context has no ``virtual_trade_service`` attribute.
    """
    context = _get_ctx()
    started_at = context.pm.start_timer()

    # The service has to be initialised on the context; only then is there
    # anything to summarise (and only then is the timer logged).
    if hasattr(context, 'virtual_trade_service'):
        summary = context.virtual_trade_service.get_summary(apply_cost=apply_cost)
        context.pm.log_timer("get_virtual_summary", started_at)
        return summary

    return {"total_trades": 0, "win_rate": 0, "avg_return": 0}

28 

29 

@router.get("/virtual/strategies")
async def get_strategies():
    """Return every registered strategy (used by the UI to build its tabs)."""
    service = _get_ctx().virtual_trade_service
    return service.get_all_strategies()

35 

36 

37async def _calculate_benchmark(ctx, code: str, ref_history: list, start_date: str, end_date: str) -> list: 

38 """Helper to calculate benchmark history for a given ETF code.""" 

39 try: 

40 resp = await ctx.stock_query_service.get_ohlcv_range( 

41 code, period="D", start_date=start_date, end_date=end_date 

42 ) 

43 

44 # API 실패 또는 데이터 없는 경우 0으로 채운 리스트 반환 

45 if not resp or resp.rt_cd != "0" or not resp.data: 

46 return [{"date": h['date'], "return_rate": 0} for h in ref_history] 

47 

48 ohlcv = resp.data 

49 

50 # 첫 거래일의 종가를 기준가로 설정 

51 base_price = ohlcv[0]['close'] 

52 if not isinstance(base_price, (int, float)) or base_price <= 0: 

53 return [{"date": h['date'], "return_rate": 0} for h in ref_history] 

54 

55 ohlcv_map = {item['date']: item['close'] for item in ohlcv} 

56 

57 benchmark_history = [] 

58 last_price = base_price 

59 for h in ref_history: 

60 date_key = h['date'].replace('-', '') 

61 price = ohlcv_map.get(date_key, last_price) 

62 

63 # 가격 데이터가 없는 경우(get 실패), 마지막 가격 유지 

64 if not isinstance(price, (int, float)): 

65 price = last_price 

66 else: 

67 last_price = price 

68 

69 bench_return = round(((price - base_price) / base_price) * 100, 2) 

70 benchmark_history.append({"date": h['date'], "return_rate": bench_return}) 

71 

72 return benchmark_history 

73 

74 except Exception: 

75 # 예외 발생 시 로깅하고 0으로 채운 리스트 반환 

76 return [{"date": h['date'], "return_rate": 0} for h in ref_history] 

77 

78 

@router.get("/virtual/chart/{strategy_name}")
async def get_strategy_chart(strategy_name: str):
    """Return chart-ready return histories plus KOSPI200 / KOSDAQ150 benchmarks.

    For ``strategy_name == "ALL"`` the history of every strategy is returned,
    together with a synthetic ``"ALL"`` series holding the per-date mean of the
    individual return rates. Otherwise only the named strategy is returned.

    Returns:
        ``{"histories": {...}, "benchmarks": {...}}``; both are empty when no
        history is available to derive a date range from.
    """
    context = _get_ctx()
    label = f"get_strategy_chart({strategy_name})"
    async with context.pm.profile_async(label):
        started_at = context.pm.start_timer()
        service = context.virtual_trade_service

        # 1. Gather the history series.
        if strategy_name != "ALL":
            histories = {strategy_name: service.get_strategy_return_history(strategy_name)}
        else:
            histories = {}
            for name in service.get_all_strategies():
                histories[name] = service.get_strategy_return_history(name)

            # Combined "ALL" series: average return rate of all strategies per date.
            rates_by_date: dict[str, list[float]] = {}
            for series in histories.values():
                for point in series:
                    rates_by_date.setdefault(point['date'], []).append(point['return_rate'])
            if rates_by_date:
                combined = []
                for day in sorted(rates_by_date):
                    rates = rates_by_date[day]
                    combined.append({"date": day, "return_rate": sum(rates) / len(rates)})
                histories["ALL"] = combined

        # Reference history: only used to derive the date range for the benchmarks.
        reference = histories.get("ALL")
        if not reference:
            reference = histories.get(strategy_name)
        if not reference:
            reference = next(iter(histories.values())) if histories else []

        if not reference:
            return {"histories": {}, "benchmarks": {}}

        first_day = reference[0]['date'].replace('-', '')
        last_day = reference[-1]['date'].replace('-', '')

        # Benchmark series (KOSPI 200, KOSDAQ 150), fetched one after the other.
        benchmarks = {}
        for bench_name, etf_code in (("KOSPI200", "069500"), ("KOSDAQ150", "229200")):
            benchmarks[bench_name] = await _calculate_benchmark(
                context, etf_code, reference, first_day, last_day
            )

        context.pm.log_timer(label, started_at)
        return {"histories": histories, "benchmarks": benchmarks}

124 

125 

126def _sanitize_for_json(obj): 

127 """NaN / Infinity → 0.0 으로 치환하여 JSON 직렬화 안전성 보장.""" 

128 if isinstance(obj, float): 

129 return 0.0 if (math.isnan(obj) or math.isinf(obj)) else obj 

130 if isinstance(obj, dict): 

131 return {k: _sanitize_for_json(v) for k, v in obj.items()} 

132 if isinstance(obj, list): 

133 return [_sanitize_for_json(v) for v in obj] 

134 return obj 

135 

136 

137def _aggregate_virtual_data(trades, vm, apply_cost): 

138 """Pandas 기반 집계 (CPU-bound) — thread pool에서 실행되는 순수 동기 함수.""" 

139 if not trades: 139 ↛ 140line 139 didn't jump to line 140 because the condition on line 139 was never true

140 return { 

141 "summary_agg": {}, "cumulative_returns": {}, 

142 "daily_changes": {}, "weekly_changes": {}, 

143 "daily_ref_dates": {}, "weekly_ref_dates": {}, 

144 "first_dates": {}, "counts": {}, 

145 "profit_factors": {}, "expectancies": {}, 

146 } 

147 

148 try: 

149 df = pd.DataFrame(trades) 

150 

151 for col, default in [('status', 'HOLD'), ('sell_date', None)]: 

152 if col not in df.columns: 

153 df[col] = default 

154 for col, default in [('qty', 1), ('buy_price', 0), ('current_price', 0), ('sell_price', 0)]: 

155 if col not in df.columns: 

156 df[col] = default 

157 df[col] = pd.to_numeric(df[col], errors='coerce').fillna(default) 

158 

159 df['eval_price'] = np.where( 

160 df['status'] == 'HOLD', df['current_price'], 

161 np.where(df['sell_price'] > 0, df['sell_price'], df['current_price']) 

162 ) 

163 df['eval_price'] = np.where(df['eval_price'] <= 0, df['buy_price'], df['eval_price']) 

164 

165 df['buy_amt'] = df.apply(lambda x: vm.get_trade_amount(x['buy_price'], x['qty'], is_sell=False, apply_cost=apply_cost), axis=1) 

166 df['eval_amt'] = df.apply(lambda x: vm.get_trade_amount(x['eval_price'], x['qty'], is_sell=True, apply_cost=apply_cost), axis=1) 

167 df['pnl'] = df['eval_amt'] - df['buy_amt'] 

168 

169 strategies = [s for s in df['strategy'].dropna().unique() if s] 

170 

171 # 전략별 누적수익률 

172 summary_agg = {"ALL": {"buy_sum": float(df['buy_amt'].sum()), "eval_sum": float(df['eval_amt'].sum())}} 

173 for strat in strategies: 

174 mask = df['strategy'] == strat 

175 summary_agg[strat] = { 

176 "buy_sum": float(df.loc[mask, 'buy_amt'].sum()), 

177 "eval_sum": float(df.loc[mask, 'eval_amt'].sum()), 

178 } 

179 strategy_returns = { 

180 k: (round(((v["eval_sum"] - v["buy_sum"]) / v["buy_sum"]) * 100, 2) if v["buy_sum"] > 0 else 0.0) 

181 for k, v in summary_agg.items() 

182 } 

183 

184 # 스냅샷 저장 및 변화율 

185 daily_changes, weekly_changes, daily_ref_dates, weekly_ref_dates = {}, {}, {}, {} 

186 try: 

187 vm.save_daily_snapshot(strategy_returns) 

188 snapshot_data = vm._load_data() 

189 for key in ["ALL"] + list(strategies): 

190 d_val, d_date = vm.get_daily_change(key, strategy_returns.get(key, 0), _data=snapshot_data) 

191 w_val, w_date = vm.get_weekly_change(key, strategy_returns.get(key, 0), _data=snapshot_data) 

192 daily_changes[key], weekly_changes[key] = d_val, w_val 

193 if d_date: daily_ref_dates[key] = d_date 

194 if w_date: weekly_ref_dates[key] = w_date 

195 except Exception as e: 

196 print(f"[WebAPI] virtual/history 스냅샷 처리 오류: {e}") 

197 

198 # 최초 매매일 

199 df['buy_date_str'] = df['buy_date'].astype(str).str[:10] 

200 valid_mask = df['buy_date_str'].str.match(r'^\d{4}-\d{2}-\d{2}$', na=False) 

201 valid_df_dates = df[valid_mask] 

202 all_min = valid_df_dates['buy_date_str'].min() 

203 first_dates = {} 

204 if pd.notna(all_min) and all_min: 

205 first_dates["ALL"] = str(all_min) 

206 for strat in strategies: 

207 mn = valid_df_dates[valid_df_dates['strategy'] == strat]['buy_date_str'].min() 

208 if pd.notna(mn) and mn: 

209 first_dates[strat] = str(mn) 

210 

211 # 상태별 카운트 

212 KST = timezone(timedelta(hours=9)) 

213 today_str = datetime.now(KST).strftime("%Y-%m-%d") 

214 try: 

215 snap = vm._load_data() 

216 if isinstance(snap, dict) and snap.get('daily'): 

217 daily_keys = sorted(snap['daily'].keys()) 

218 if daily_keys and today_str > daily_keys[-1]: 

219 today_str = daily_keys[-1] 

220 except Exception: 

221 pass 

222 

223 df['is_hold'] = df['status'] == 'HOLD' 

224 df['is_today_buy'] = df['buy_date'].astype(str).str.startswith(today_str) 

225 df['is_today_sell'] = (df['status'] == 'SOLD') & df['sell_date'].astype(str).str.startswith(today_str) 

226 

227 counts = {"ALL": { 

228 "hold": int(df['is_hold'].sum()), 

229 "today_buy": int(df['is_today_buy'].sum()), 

230 "today_sell": int(df['is_today_sell'].sum()), 

231 }} 

232 for strat in strategies: 

233 mask = df['strategy'] == strat 

234 counts[strat] = { 

235 "hold": int(df.loc[mask, 'is_hold'].sum()), 

236 "today_buy": int(df.loc[mask, 'is_today_buy'].sum()), 

237 "today_sell": int(df.loc[mask, 'is_today_sell'].sum()), 

238 } 

239 

240 # Profit Factor & Expectancy 

241 def calc_metrics(sub_df): 

242 gains = sub_df[sub_df['pnl'] >= 0]['pnl'] 

243 losses = sub_df[sub_df['pnl'] < 0]['pnl'] 

244 tot_g, tot_l = float(gains.sum()), abs(float(losses.sum())) 

245 wins, loss_c = len(gains), len(losses) 

246 tot_c = wins + loss_c 

247 pf = { 

248 "value": (round(tot_g / tot_l, 2) if tot_l > 0 else (None if tot_g > 0 else 0.0)), 

249 "total_gain": round(tot_g), "total_loss": round(tot_l), 

250 } 

251 exp = {"value": 0.0, "win_rate": 0.0, "avg_gain": 0, "avg_loss": 0, "wins": 0, "losses": 0} 

252 if tot_c > 0: 

253 w_rate, l_rate = wins / tot_c, loss_c / tot_c 

254 avg_g = tot_g / wins if wins > 0 else 0 

255 avg_l = tot_l / loss_c if loss_c > 0 else 0 

256 exp.update({ 

257 "value": round((w_rate * avg_g) - (l_rate * avg_l), 0), 

258 "win_rate": round(w_rate * 100, 1), 

259 "avg_gain": round(avg_g), "avg_loss": round(avg_l), 

260 "wins": wins, "losses": loss_c, 

261 }) 

262 return pf, exp 

263 

264 valid_df = df[df['buy_price'] > 0] 

265 profit_factors, expectancies = {}, {} 

266 pf_all, exp_all = calc_metrics(valid_df) 

267 profit_factors["ALL"], expectancies["ALL"] = pf_all, exp_all 

268 for strat in strategies: 

269 pf_s, exp_s = calc_metrics(valid_df[valid_df['strategy'] == strat]) 

270 profit_factors[strat], expectancies[strat] = pf_s, exp_s 

271 

272 except Exception as e: 

273 print(f"[WebAPI] virtual/history Pandas 집계 오류: {e}") 

274 summary_agg = {} 

275 strategy_returns = {} 

276 daily_changes = weekly_changes = daily_ref_dates = weekly_ref_dates = {} 

277 first_dates = counts = profit_factors = expectancies = {} 

278 

279 return { 

280 "summary_agg": summary_agg, 

281 "cumulative_returns": strategy_returns, 

282 "daily_changes": daily_changes, 

283 "weekly_changes": weekly_changes, 

284 "daily_ref_dates": daily_ref_dates, 

285 "weekly_ref_dates": weekly_ref_dates, 

286 "first_dates": first_dates, 

287 "counts": counts, 

288 "profit_factors": profit_factors, 

289 "expectancies": expectancies, 

290 } 

291 

292 

@router.get("/virtual/history")
async def get_virtual_history(force_code: str = None, apply_cost: bool = False):
    """Return the full virtual-trade history.

    Args:
        force_code: When given, the price cache is bypassed for that stock code.
        apply_cost: Forwarded to the implementation.
    """
    context = _get_ctx()
    async with context.pm.profile_async("get_virtual_history"):
        payload = await _get_virtual_history_impl(context, force_code, apply_cost)
        return payload

299 

300 

async def _get_virtual_history_impl(ctx, force_code, apply_cost):
    """Implementation behind ``get_virtual_history``: enrich the trades, then aggregate.

    Steps:
      1. Attach ``stock_name`` to every trade via ``ctx.stock_code_repository``
         (empty string when the repository is absent).
      2. Resolve current prices. ``_PRICE_CACHE`` entries younger than 60 seconds
         are reused, except for ``force_code``. The remaining codes are fetched
         in batches of 30 through ``ctx.stock_query_service.get_multi_price``.
         Codes the fetch did not resolve fall back to any cache entry, flagged
         as cached.
      3. Write prices back into the trade dicts and recompute ``return_rate``.
      4. Run ``_aggregate_virtual_data`` in the default executor.

    Args:
        ctx: Application context (uses ``pm``, ``virtual_trade_service`` and,
            when present, ``stock_code_repository`` / ``stock_query_service``).
        force_code: Stock code for which the price cache is ignored, or None.
        apply_cost: Forwarded to the service's trade and return calculations.

    Returns:
        ``{"trades": [...], **aggregates}`` with NaN / Infinity replaced by 0.0,
        or ``{"trades": [], "weekly_changes": {}}`` when the context has no
        ``virtual_trade_service``. Enrichment errors are printed, and the
        aggregation still runs on whatever was enriched up to that point.
    """
    t_start = ctx.pm.start_timer()
    if not hasattr(ctx, 'virtual_trade_service'):
        return {"trades": [], "weekly_changes": {}}

    vm = ctx.virtual_trade_service
    trades = vm.get_all_trades(apply_cost=apply_cost)

    # ---------------------------------------------------------
    # Steps 1-3: stock-name and current-price enrichment (API calls)
    # ---------------------------------------------------------
    try:
        mapper = getattr(ctx, 'stock_code_repository', None)
        hold_codes = set()

        for trade in trades:
            code = str(trade.get('code', ''))
            trade['stock_name'] = mapper.get_name_by_code(code) if mapper else ''
            if code.strip():
                hold_codes.add(code)

        hold_codes = list(hold_codes)
        price_map = {}  # code -> (price, daily change rate, served-from-stale-cache flag, timestamp)

        if hold_codes and getattr(ctx, 'stock_query_service', None):
            now = time.time()
            cached_codes = {}
            fetch_codes = []

            # Reuse fresh cache entries; everything else has to be fetched.
            for code in hold_codes:
                if code != force_code and code in _PRICE_CACHE:
                    c_price, c_rate, c_ts = _PRICE_CACHE[code]
                    if now - c_ts < 60:
                        cached_codes[code] = (c_price, c_rate, False, c_ts)
                        continue
                fetch_codes.append(code)

            price_map.update(cached_codes)

            # Batched API lookup.
            if fetch_codes:
                for batch_start in range(0, len(fetch_codes), 30):
                    batch = fetch_codes[batch_start:batch_start + 30]
                    try:
                        resp = await ctx.stock_query_service.get_multi_price(batch)
                        if resp and resp.rt_cd == "0" and isinstance(resp.data, list):
                            for item in resp.data:
                                if not isinstance(item, dict):
                                    continue
                                code = item.get("stck_shrn_iscd", "")
                                if not code:
                                    continue

                                price_val = int(float(item.get("stck_prpr", "0")))
                                rate_str = item.get("prdy_ctrt", "0")
                                rate_val = float(rate_str) if rate_str not in ('N/A', '', 'None') else 0.0

                                if price_val > 0:
                                    _PRICE_CACHE[code] = (price_val, rate_val, time.time())
                                    price_map[code] = (price_val, rate_val, False, time.time())
                    except Exception as e:
                        # One failing batch must not stop the remaining batches.
                        print(f"[WebAPI] 복수종목 조회 예외: {e}")

                # Fall back to the existing (possibly stale) cache for unresolved codes.
                for code in fetch_codes:
                    if code not in price_map and code in _PRICE_CACHE:
                        cached_price, cached_rate, cached_time = _PRICE_CACHE[code]
                        price_map[code] = (cached_price, cached_rate, True, cached_time)

        # Apply current prices and recompute each trade's return.
        for trade in trades:
            # Normalise the code exactly as when price_map was built; a raw
            # lookup misses non-string codes and raises on a missing key.
            code = str(trade.get('code', ''))
            status = trade.get('status')

            if code in price_map:
                cur, daily_rate, cached, ts = price_map[code]
                trade['current_price'] = cur
                trade['is_cached'] = cached
                trade['cache_ts'] = ts

                bp = float(trade.get('buy_price', 0) or 0)
                qty = float(trade.get('qty', 1) or 1)

                if status == 'HOLD':
                    trade['daily_change_rate'] = daily_rate
                    trade['return_rate'] = vm.calculate_return(bp, cur, qty, apply_cost=apply_cost)

            if status == 'SOLD':
                sp = float(trade.get('sell_price') or 0)
                bp = float(trade.get('buy_price', 0) or 0)
                qty = float(trade.get('qty', 1) or 1)

                if sp == 0.0 and trade.get('current_price'):
                    # Sold without a recorded sell price: use the current price
                    # and ask the service to record it.
                    cur = trade['current_price']
                    trade['sell_price'] = cur
                    trade['return_rate'] = vm.calculate_return(bp, cur, qty, apply_cost=apply_cost)
                    try:
                        vm.fix_sell_price(trade['code'], trade.get('buy_date', ''), cur)
                    except Exception as e:
                        # Best effort: the response is still usable if recording fails.
                        print(f"[WebAPI] virtual/history fix_sell_price 오류: {e}")
                else:
                    trade['return_rate'] = vm.calculate_return(bp, sp, qty, apply_cost=apply_cost)
                    trade['sell_price'] = sp

    except Exception as e:
        print(f"[WebAPI] virtual/history enrichment 오류: {e}")

    # ---------------------------------------------------------
    # Steps 4-7: pandas aggregation, delegated to the default executor so the
    # event loop can keep serving other requests while it runs.
    # ---------------------------------------------------------
    loop = asyncio.get_running_loop()
    agg = await loop.run_in_executor(None, _aggregate_virtual_data, trades, vm, apply_cost)

    ctx.pm.log_timer("get_virtual_history", t_start)
    return _sanitize_for_json({"trades": trades, **agg})