Coverage for scheduler / strategy_scheduler.py: 89%

397 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-04 15:08 +0000

1# scheduler/strategy_scheduler.py 

2from __future__ import annotations 

3 

4import asyncio 

5import json 

6import logging 

7from dataclasses import dataclass, field 

8from datetime import datetime 

9from typing import Dict, List, Optional 

10 

11from interfaces.live_strategy import LiveStrategy 

12from common.types import TradeSignal, ErrorCode, Exchange 

13from services.market_calendar_service import MarketCalendarService 

14from services.virtual_trade_service import VirtualTradeService 

15from services.notification_service import NotificationService, NotificationCategory, NotificationLevel 

16from services.order_execution_service import OrderExecutionService 

17from repositories.stock_code_repository import StockCodeRepository 

18from services.stock_query_service import StockQueryService 

19from core.market_clock import MarketClock 

20from core.performance_profiler import PerformanceProfiler 

21 

22from scheduler.strategy_scheduler_store import StrategySchedulerStore, SCHEDULER_DB_FILE 

23 

24 

25@dataclass 

26class SignalRecord: 

27 """실행된 시그널 이력 레코드.""" 

28 strategy_name: str 

29 code: str 

30 name: str 

31 action: str # BUY / SELL 

32 price: int 

33 qty: int = 1 

34 reason: str = "" 

35 timestamp: str = "" # ISO format 

36 api_success: bool = True 

37 return_rate: Optional[float] = None 

38 

39 

40@dataclass 

41class StrategySchedulerConfig: 

42 """전략별 스케줄링 설정.""" 

43 strategy: LiveStrategy 

44 interval_minutes: int = 5 # 실행 주기 (분) 

45 max_positions: int = 3 # 최대 동시 보유 포지션 수 

46 order_qty: int = 1 # 주문 수량 

47 enabled: bool = True # 개별 전략 활성/비활성 

48 allow_pyramiding: bool = False # 불타기(추가매수) 허용 여부 

49 force_exit_on_close: bool = False # 당일 청산 여부 

50 

51 

52class StrategyScheduler: 

53 """asyncio 기반 단일 스레드 전략 스케줄러. 

54 

55 등록된 전략들을 장중에 주기적으로 실행하고, 

56 발생한 TradeSignal을 CSV 기록 + API 주문으로 처리한다. 

57 """ 

58 

59 LOOP_INTERVAL_SEC = 1 # 메인 루프 깨어나는 주기 

60 MARKET_CLOSED_SLEEP_SEC = 60 # 장 외 시간 sleep 

61 FORCE_EXIT_MINUTES_BEFORE = 30 # 장 마감 N분 전 강제 청산 

62 STAGGER_INTERVAL_SEC = 60 # 전략 간 실행 시차 (초) 

63 

64 def __init__( 

65 self, 

66 virtual_trade_service: VirtualTradeService, 

67 order_execution_service: OrderExecutionService, 

68 stock_query_service: StockQueryService, 

69 stock_code_repository: StockCodeRepository, 

70 market_clock: MarketClock, 

71 market_calendar_service: MarketCalendarService, 

72 logger: Optional[logging.Logger] = None, 

73 dry_run: bool = False, 

74 notification_service: Optional[NotificationService] = None, 

75 performance_profiler: Optional[PerformanceProfiler] = None, 

76 store: Optional[StrategySchedulerStore] = None, 

77 ): 

78 self._virtual_trade_service = virtual_trade_service 

79 self._oes = order_execution_service 

80 self._sqs = stock_query_service 

81 self.stock_code_repository = stock_code_repository 

82 self._tm = market_clock 

83 self._logger = logger or logging.getLogger(__name__) 

84 self._dry_run = dry_run 

85 self._notification_service = notification_service 

86 self._mcs = market_calendar_service 

87 self._pm = performance_profiler if performance_profiler else PerformanceProfiler(enabled=False) 

88 

89 self._store = store or StrategySchedulerStore(db_path=SCHEDULER_DB_FILE, logger=self._logger) 

90 

91 self._strategies: List[StrategySchedulerConfig] = [] 

92 self._running = False 

93 self._task: Optional[asyncio.Task] = None 

94 self._last_run: Dict[str, datetime] = {} 

95 self._last_execution_time: Optional[datetime] = None # 전략 간 실행 쿨다운용 

96 self._force_exit_done: set = set() # 당일 강제 청산 완료된 전략 

97 self.MAX_HISTORY = 200 # 최대 보관 이력 수 

98 self._signal_history: List[SignalRecord] = self._load_signal_history() 

99 self._subscriber_queues: List[asyncio.Queue] = [] 

100 

101 # ── 전략 등록 ── 

102 

103 def register(self, config: StrategySchedulerConfig): 

104 self._strategies.append(config) 

105 self._logger.info( 

106 f"[Scheduler] 전략 등록: {config.strategy.name} " 

107 f"(주기={config.interval_minutes}분, 최대포지션={config.max_positions})" 

108 ) 

109 

110 # ── 생명주기 ── 

111 

112 async def start(self): 

113 if self._running: 

114 self._logger.warning("[Scheduler] 이미 실행 중") 

115 return 

116 for cfg in self._strategies: 

117 cfg.enabled = True 

118 self._running = True 

119 self._task = asyncio.create_task(self._loop()) 

120 self._logger.info("[Scheduler] 시작 (전체 전략 활성화)") 

121 if self._notification_service: 121 ↛ 122line 121 didn't jump to line 122 because the condition on line 121 was never true

122 names = [c.strategy.name for c in self._strategies if c.enabled] 

123 await self._notification_service.emit(NotificationCategory.SYSTEM, NotificationLevel.INFO, "스케줄러 시작", f"활성 전략: {', '.join(names)}") 

124 

125 async def stop(self, save_state: bool = False): 

126 if save_state: 

127 self._save_scheduler_state() 

128 

129 self._running = False 

130 for cfg in self._strategies: 

131 cfg.enabled = False 

132 if self._task: 

133 self._task.cancel() 

134 try: 

135 await self._task 

136 except asyncio.CancelledError: 

137 pass 

138 self._task = None 

139 

140 # 상태 저장을 동반한 종료(재시작 등)라면 강제 청산을 하지 않는다. 

141 # save_state=True -> perform_exit=False (청산 스킵) 

142 # save_state=False -> perform_exit=True (청산 수행) 

143 perform_exit = not save_state 

144 for cfg in self._strategies: 

145 await self.stop_strategy(cfg.strategy.name, perform_force_exit=perform_exit) 

146 

147 self.close() 

148 self._logger.info("[Scheduler] 정지 (전체 전략 비활성화)") 

149 if self._notification_service: 149 ↛ 150line 149 didn't jump to line 150 because the condition on line 149 was never true

150 await self._notification_service.emit(NotificationCategory.SYSTEM, NotificationLevel.INFO, "스케줄러 정지", "전체 전략 비활성화") 

151 

152 # ── 메인 루프 ── 

153 

154 async def _loop(self): 

155 self._logger.info("스케줄러 메인 루프 시작.") 

156 while self._running: 156 ↛ exitline 156 didn't return from function '_loop' because the condition on line 156 was always true

157 try: 

158 market_open = await self._mcs.is_market_open_now() 

159 

160 if not market_open: 

161 # 장이 닫힌 직후(15:40~) 아직 강제 청산 미완료된 전략이 있으면 실행 

162 if self._force_exit_done is not None: 162 ↛ 178line 162 didn't jump to line 178 because the condition on line 162 was always true

163 for cfg in self._strategies: 163 ↛ 164line 163 didn't jump to line 164 because the loop on line 163 never started

164 if (cfg.enabled and cfg.force_exit_on_close 

165 and cfg.strategy.name not in self._force_exit_done): 

166 name = cfg.strategy.name 

167 self._force_exit_done.add(name) 

168 self._logger.info( 

169 f"[Scheduler] {name}: 장 마감 후 미처리 강제 청산 실행" 

170 ) 

171 try: 

172 await self._run_strategy(cfg, force_exit_only=True) 

173 except Exception as e: 

174 self._logger.error( 

175 f"[Scheduler] {name} 강제 청산 오류: {e}", exc_info=True 

176 ) 

177 

178 self._logger.info("현재는 휴장일이거나 장 운영 시간이 아닙니다.") 

179 self._force_exit_done.clear() 

180 await self._mcs.wait_until_next_open() 

181 continue 

182 

183 # 장중: 시간 계산 

184 now = self._tm.get_current_kst_time() 

185 close_time = self._tm.get_market_close_time() 

186 minutes_to_close = (close_time - now).total_seconds() / 60 

187 in_force_exit_window = minutes_to_close <= self.FORCE_EXIT_MINUTES_BEFORE 

188 

189 # 1. 실행이 필요한 전략들을 수집 (기아 현상 방지를 위해 나중에 우선순위 정렬) 

190 evaluations = [] 

191 for cfg in self._strategies: 

192 if not cfg.enabled: 192 ↛ 193line 192 didn't jump to line 193 because the condition on line 192 was never true

193 continue 

194 name = cfg.strategy.name 

195 last = self._last_run.get(name) 

196 elapsed = (now - last).total_seconds() if last else float('inf') 

197 

198 # 강제 청산: 마감 N분 전이면 즉시 실행 (1회만) 

199 force_exit = (cfg.force_exit_on_close 

200 and in_force_exit_window 

201 and name not in self._force_exit_done) 

202 

203 # 정규 실행: force_exit_on_close 전략은 마감 전 구간에서 새 매수 금지 

204 should_run = (not force_exit 

205 and not (cfg.force_exit_on_close and in_force_exit_window) 

206 and elapsed >= cfg.interval_minutes * 60) 

207 

208 if should_run or force_exit: 208 ↛ 191line 208 didn't jump to line 191 because the condition on line 208 was always true

209 # 지연 시간(초) 계산 - 처음 실행 시(last가 None) 무한대로 처리 

210 overdue = elapsed - (cfg.interval_minutes * 60) if last else float('inf') 

211 if force_exit: 

212 overdue = float('inf') # 강제 청산은 최우선순위 

213 evaluations.append((overdue, cfg, force_exit)) 

214 

215 # 2. 가장 오래 지연된(overdue가 큰) 전략부터 내림차순 정렬 

216 evaluations.sort(key=lambda x: x[0], reverse=True) 

217 

218 for overdue, cfg, force_exit in evaluations: 

219 name = cfg.strategy.name 

220 

221 # 전략 간 API 자원 충돌 방지 (강제 청산은 쿨다운 무시) 

222 if not force_exit and self._last_execution_time: 222 ↛ 223line 222 didn't jump to line 223 because the condition on line 222 was never true

223 since_last_exec = (now - self._last_execution_time).total_seconds() 

224 if since_last_exec < self.STAGGER_INTERVAL_SEC: 

225 continue 

226 

227 self._last_run[name] = now 

228 if force_exit: 

229 self._force_exit_done.add(name) 

230 self._logger.info(f"[Scheduler] {name}: 장 마감 {minutes_to_close:.1f}분 전 — 강제 청산 실행") 

231 

232 try: 

233 await self._run_strategy(cfg, force_exit_only=force_exit) 

234 except Exception as e: 

235 self._logger.error(f"[Scheduler] {name} 실행 오류: {e}", exc_info=True) 

236 finally: 

237 # 3. 전략 실행이 끝난 이후 시점을 기준으로 쿨다운 타이머를 갱신하여  

238 # 실행 시간이 긴 전략 이후에도 확실하게 60초의 휴지기 보장 

239 if not force_exit: 

240 self._last_execution_time = self._tm.get_current_kst_time() 

241 

242 await asyncio.sleep(self.LOOP_INTERVAL_SEC) 

243 

244 except asyncio.CancelledError: 

245 break 

246 except Exception as e: 

247 self._logger.error(f"[Scheduler] 루프 오류: {e}", exc_info=True) 

248 if self._notification_service: 248 ↛ 249line 248 didn't jump to line 249 because the condition on line 248 was never true

249 await self._notification_service.emit(NotificationCategory.SYSTEM, NotificationLevel.ERROR, "스케줄러 루프 오류", str(e)) 

250 await asyncio.sleep(self.LOOP_INTERVAL_SEC) 

251 

252 # ── 전략 실행 ── 

253 

254 async def _run_strategy(self, cfg: StrategySchedulerConfig, force_exit_only: bool = False): 

255 name = cfg.strategy.name 

256 t_run = self._pm.start_timer() 

257 self._logger.info(f"[Scheduler] {name} 실행 시작 (force_exit_only={force_exit_only})") 

258 

259 # 강제 청산 모드: 전략의 check_exits 로직 무시, 보유 종목 전량 시장가 매도 

260 if force_exit_only: 

261 await self._force_liquidate_strategy(cfg) 

262 self._pm.log_timer(f"{name}.run_strategy(force_exit)", t_run) 

263 return 

264 

265 # 1) 보유 종목 청산 조건 체크 

266 holdings = self._virtual_trade_service.get_holds_by_strategy(name) 

267 if holdings: 

268 t_exit = self._pm.start_timer() 

269 sell_signals = await cfg.strategy.check_exits(holdings) 

270 self._pm.log_timer(f"{name}.check_exits({len(holdings)}건)", t_exit) 

271 if sell_signals: 

272 tasks = [self._execute_signal(sig) for sig in sell_signals] 

273 for f in asyncio.as_completed(tasks): 

274 await f 

275 

276 # 2) 새 매수 스캔 

277 current_holdings = self._virtual_trade_service.get_holds_by_strategy(name) 

278 current_holds_count = len(current_holdings) 

279 

280 if current_holds_count >= cfg.max_positions: 

281 self._logger.info( 

282 f"[Scheduler] {name}: 최대 포지션 도달 " 

283 f"({current_holds_count}/{cfg.max_positions}), 스캔 스킵" 

284 ) 

285 self._pm.log_timer(f"{name}.run_strategy", t_run) 

286 return 

287 

288 t_scan = self._pm.start_timer() 

289 buy_signals = await cfg.strategy.scan() 

290 self._pm.log_timer(f"{name}.scan()", t_scan) 

291 

292 # 이미 보유 중인 종목은 추가 매수(불타기) 방지 

293 if cfg.allow_pyramiding: 

294 valid_signals = buy_signals 

295 else: 

296 holding_codes = {str(h.get('code')) for h in current_holdings if h.get('code')} 

297 valid_signals = [s for s in buy_signals if str(s.code) not in holding_codes] 

298 

299 remaining = cfg.max_positions - current_holds_count 

300 target_signals = valid_signals[:remaining] 

301 

302 if target_signals: 

303 for sig in target_signals: 

304 if sig.qty <= 1: 304 ↛ 303line 304 didn't jump to line 303 because the condition on line 304 was always true

305 sig.qty = cfg.order_qty 

306 

307 # 순차 실행: DB 재조회 없이 메모리 카운터로 max_positions 초과 방지 

308 current_count = current_holds_count 

309 for sig in target_signals: 

310 if current_count >= cfg.max_positions: 310 ↛ 311line 310 didn't jump to line 311 because the condition on line 310 was never true

311 self._logger.info( 

312 f"[Scheduler] {name}: 매수 중단 — 최대 포지션 도달 " 

313 f"({current_count}/{cfg.max_positions})" 

314 ) 

315 break 

316 await self._execute_signal(sig) 

317 current_count += 1 

318 

319 self._pm.log_timer(f"{name}.run_strategy", t_run) 

320 self._logger.info(f"[Scheduler] {name} 실행 완료") 

321 

322 # ── 시그널 실행 ── 

323 

324 async def _execute_signal(self, signal: TradeSignal): 

325 self._logger.info( 

326 f"[Scheduler] 시그널 실행: [{signal.strategy_name}] {signal.action} {signal.name}({signal.code}) " 

327 f"@ {signal.price:,}원 | {signal.reason}" 

328 ) 

329 

330 # 기록용 가격 결정 (시장가 0원인 경우 현재가 조회 시도하여 기록 정확도 향상) 

331 log_price = signal.price 

332 if log_price == 0: 

333 try: 

334 # StockQueryService를 통해 현재가 조회 

335 resp = await self._sqs.get_current_price(signal.code, caller="StrategyScheduler") 

336 if resp and resp.rt_cd == ErrorCode.SUCCESS.value: 

337 data = resp.data 

338 output = data.get("output") if isinstance(data, dict) else getattr(data, "output", None) 

339 if output: 

340 val = output.get("stck_prpr") if isinstance(output, dict) else getattr(output, "stck_prpr", 0) 

341 log_price = int(val) 

342 except Exception: 

343 pass # 조회 실패 시 0원으로 기록 유지 

344 

345 # 종목명 보정 (이름이 비어있거나, 종목 코드와 동일하게 들어온 경우) 

346 if not signal.name or signal.name == signal.code: 346 ↛ 347line 346 didn't jump to line 347 because the condition on line 346 was never true

347 signal.name = self.stock_code_repository.get_name_by_code(signal.code) or signal.code 

348 

349 # CSV 기록 (항상) 

350 return_rate = None 

351 if signal.action == "BUY": 

352 await self._virtual_trade_service.log_buy_async(signal.strategy_name, signal.code, log_price, signal.qty) 

353 elif signal.action == "SELL": 353 ↛ 356line 353 didn't jump to line 356 because the condition on line 353 was always true

354 return_rate = await self._virtual_trade_service.log_sell_by_strategy_async(signal.strategy_name, signal.code, log_price, signal.qty) 

355 

356 api_success = True 

357 

358 # API 주문 (dry_run이 아닐 때) 

359 if not self._dry_run: 

360 try: 

361 try: 

362 signal_exchange = Exchange(signal.exchange) if signal.exchange else Exchange.KRX 

363 except ValueError: 

364 signal_exchange = Exchange.KRX 

365 if signal.action == "BUY": 

366 resp = await self._oes.handle_place_buy_order( 

367 signal.code, signal.price, signal.qty, exchange=signal_exchange 

368 ) 

369 else: 

370 resp = await self._oes.handle_place_sell_order( 

371 signal.code, signal.price, signal.qty, exchange=signal_exchange 

372 ) 

373 

374 if resp and resp.rt_cd == ErrorCode.SUCCESS.value: 

375 self._logger.info( 

376 f"[Scheduler] API 주문 성공: {signal.action} {signal.code}" 

377 ) 

378 else: 

379 api_success = False 

380 msg = resp.msg1 if resp else "응답 없음" 

381 self._logger.warning( 

382 f"[Scheduler] API 주문 실패: {signal.action} {signal.code} - {msg} " 

383 f"(CSV는 기록됨)" 

384 ) 

385 except Exception as e: 

386 api_success = False 

387 self._logger.error( 

388 f"[Scheduler] API 주문 예외: {signal.action} {signal.code} - {e} " 

389 f"(CSV는 기록됨)" 

390 ) 

391 

392 # 시그널 이력 기록 (메모리 + CSV 영속화) 

393 now = self._tm.get_current_kst_time() 

394 record = SignalRecord( 

395 strategy_name=signal.strategy_name, 

396 code=signal.code, 

397 name=signal.name, 

398 action=signal.action, 

399 price=log_price, 

400 qty=signal.qty, 

401 reason=signal.reason, 

402 timestamp=now.strftime("%Y-%m-%d %H:%M:%S"), 

403 api_success=api_success, 

404 return_rate=return_rate, 

405 ) 

406 self._signal_history.append(record) 

407 if len(self._signal_history) > self.MAX_HISTORY: 

408 self._signal_history = self._signal_history[-self.MAX_HISTORY:] 

409 await self._append_signal_db(record) 

410 await self._notify_subscribers(record) 

411 

412 if self._notification_service: 412 ↛ 413line 412 didn't jump to line 413 because the condition on line 412 was never true

413 action_kr = "매수" if signal.action == "BUY" else "매도" 

414 level = NotificationLevel.CRITICAL if api_success else NotificationLevel.ERROR 

415 title = f"[{signal.strategy_name}] {signal.name} {action_kr} {'성공' if api_success else '실패'}" 

416 msg = (f"종목: {signal.name}({signal.code})\n" 

417 f"주문: {log_price:,}원 × {signal.qty}주\n" 

418 f"사유: {signal.reason}") 

419 if not api_success: 

420 title = f"[{signal.strategy_name}] {signal.name} {action_kr} 실패" 

421 await self._notification_service.emit(NotificationCategory.STRATEGY, level, title, msg, metadata={ 

422 "strategy_name": signal.strategy_name, 

423 "code": signal.code, 

424 "action": signal.action, 

425 "price": log_price, 

426 "qty": signal.qty, 

427 "reason": signal.reason, 

428 "api_success": api_success, 

429 "return_rate": return_rate, 

430 }) 

431 

432 async def _force_liquidate_strategy(self, cfg: StrategySchedulerConfig): 

433 """전략 중지 시 보유 종목 강제 청산 (force_exit_on_close=True).""" 

434 name = cfg.strategy.name 

435 holdings = self._virtual_trade_service.get_holds_by_strategy(name) 

436 if not holdings: 

437 return 

438 

439 self._logger.info(f"[Scheduler] {name} 종료로 인한 강제 청산 실행 (보유 {len(holdings)}건)") 

440 

441 for hold in holdings: 

442 code = hold.get("code") 

443 if not code: 

444 continue 

445 

446 stock_name = hold.get("name", code) 

447 

448 # 보유 수량 조회 (VirtualTradeRepository가 qty를 반환해야 함) 

449 # 만약 qty 정보가 없다면 설정된 주문 수량(cfg.order_qty)을 fallback으로 사용 

450 holding_qty = int(hold.get("qty") or 0) 

451 if holding_qty <= 0: 

452 holding_qty = cfg.order_qty 

453 

454 # 시장가 매도를 위해 가격 0 설정 

455 signal = TradeSignal( 

456 strategy_name=name, 

457 code=code, 

458 name=stock_name, 

459 action="SELL", 

460 price=0, # 시장가 

461 qty=holding_qty, 

462 reason="전략 종료 강제 청산 (시장가)" 

463 ) 

464 await self._execute_signal(signal) 

465 

466 # ── 개별 전략 제어 ── 

467 

468 async def start_strategy(self, name: str) -> bool: 

469 """개별 전략 활성화. 루프가 돌고 있지 않으면 자동 시작. 성공 시 True 반환.""" 

470 for cfg in self._strategies: 

471 if cfg.strategy.name == name: 471 ↛ 470line 471 didn't jump to line 470 because the condition on line 471 was always true

472 cfg.enabled = True 

473 self._logger.info(f"[Scheduler] 전략 활성화: {name}") 

474 # 루프가 안 돌고 있으면 자동으로 시작 

475 if not self._running: 

476 self._running = True 

477 self._task = asyncio.create_task(self._loop()) 

478 self._logger.info("[Scheduler] 루프 자동 시작 (개별 전략 활성화)") 

479 return True 

480 return False 

481 

482 async def stop_strategy(self, name: str, perform_force_exit: bool = True) -> bool: 

483 """개별 전략 비활성화. 성공 시 True 반환.""" 

484 for cfg in self._strategies: 

485 if cfg.strategy.name == name: 

486 if perform_force_exit and cfg.enabled and cfg.force_exit_on_close: 

487 await self._force_liquidate_strategy(cfg) 

488 

489 cfg.enabled = False 

490 self._logger.info(f"[Scheduler] 전략 비활성화: {name}") 

491 return True 

492 return False 

493 

494 async def update_max_positions(self, name: str, new_max: int) -> bool: 

495 """개별 전략의 최대 포지션 수를 동적으로 변경하고 상태를 저장합니다.""" 

496 if new_max < 1: 

497 return False 

498 for cfg in self._strategies: 

499 if cfg.strategy.name == name: 499 ↛ 498line 499 didn't jump to line 498 because the condition on line 499 was always true

500 cfg.max_positions = new_max 

501 self._logger.info(f"[Scheduler] '{name}' 전략 최대 포지션 수 변경: {new_max}") 

502 self._save_scheduler_state() 

503 return True 

504 return False 

505 

506 # ── 상태 조회 ── 

507 

508 def get_status(self) -> dict: 

509 strategies = [] 

510 for cfg in self._strategies: 

511 name = cfg.strategy.name 

512 last = self._last_run.get(name) 

513 holdings = self._virtual_trade_service.get_holds_by_strategy(name) 

514 strategies.append({ 

515 "name": name, 

516 "interval_minutes": cfg.interval_minutes, 

517 "max_positions": cfg.max_positions, 

518 "enabled": cfg.enabled, 

519 "current_holds": len(holdings), 

520 "holdings": holdings, # 상세 보유 내역 추가 

521 "last_run": last.strftime("%H:%M:%S") if last else None, 

522 }) 

523 return { 

524 "running": self._running, 

525 "dry_run": self._dry_run, 

526 "strategies": strategies, 

527 } 

528 

529 # ── DB 영속화 ── 

530 

531 def close(self): 

532 """DB 연결을 닫습니다.""" 

533 self._store.close() 

534 

535 def _load_signal_history(self) -> List[SignalRecord]: 

536 """DB에서 시그널 이력 복원.""" 

537 try: 

538 records_data = self._store.load_signal_history(limit=self.MAX_HISTORY) 

539 records = [ 

540 SignalRecord( 

541 strategy_name=d["strategy_name"], 

542 code=d["code"], 

543 name=d["name"], 

544 action=d["action"], 

545 price=d["price"], 

546 qty=d["qty"], 

547 return_rate=d["return_rate"], 

548 reason=d["reason"], 

549 timestamp=d["timestamp"], 

550 api_success=d["api_success"], 

551 ) 

552 for d in records_data 

553 ] 

554 self._logger.info(f"[Scheduler] 시그널 이력 {len(records)}건 로드 완료") 

555 return records 

556 except Exception as e: 

557 self._logger.error(f"[Scheduler] 시그널 이력 로드 실패: {e}") 

558 return [] 

559 

560 def _save_scheduler_state(self): 

561 """활성 전략 목록 및 설정을 DB에 저장.""" 

562 enabled_names = [cfg.strategy.name for cfg in self._strategies if cfg.enabled] 

563 current_positions = self._virtual_trade_service.get_holds() 

564 state = { 

565 "running": self._running, 

566 "enabled_strategies": enabled_names, 

567 "current_positions": current_positions, 

568 "strategy_configs": { 

569 cfg.strategy.name: {"max_positions": cfg.max_positions} 

570 for cfg in self._strategies 

571 }, 

572 } 

573 try: 

574 self._store.save_state(state) 

575 self._logger.info( 

576 f"[Scheduler] 상태 저장 완료: {enabled_names}, 보유종목 {len(current_positions)}건" 

577 ) 

578 except Exception as e: 

579 self._logger.error(f"[Scheduler] 상태 저장 실패: {e}") 

580 

581 def clear_saved_state(self): 

582 """저장된 상태 삭제 (수동 정지 시 호출).""" 

583 try: 

584 self._store.clear_state() 

585 self._logger.info("[Scheduler] 저장된 상태 삭제") 

586 except Exception as e: 

587 self._logger.error(f"[Scheduler] 상태 삭제 실패: {e}") 

588 

589 async def restore_state(self): 

590 """이전 실행 상태 복원. 활성 전략이 있으면 자동 시작.""" 

591 try: 

592 state = self._store.load_state() 

593 except Exception as e: 

594 self._logger.error(f"[Scheduler] 상태 복원 파일 읽기 실패: {e}") 

595 return 

596 if not state: 

597 return 

598 

599 try: 

600 enabled_names = state.get("enabled_strategies", []) 

601 saved_positions = state.get("current_positions", []) 

602 strategy_configs = state.get("strategy_configs", {}) 

603 

604 if saved_positions: 

605 self._logger.info( 

606 f"[Scheduler] 이전 상태 파일에 저장된 보유 포지션: {len(saved_positions)}건" 

607 ) 

608 

609 restored = [] 

610 for cfg in self._strategies: 

611 if cfg.strategy.name in strategy_configs: 611 ↛ 612line 611 didn't jump to line 612 because the condition on line 611 was never true

612 cfg.max_positions = strategy_configs[cfg.strategy.name].get( 

613 "max_positions", cfg.max_positions 

614 ) 

615 if cfg.strategy.name in enabled_names: 615 ↛ 610line 615 didn't jump to line 610 because the condition on line 615 was always true

616 cfg.enabled = True 

617 restored.append(cfg.strategy.name) 

618 

619 if restored: 

620 self._running = True 

621 self._task = asyncio.create_task(self._loop()) 

622 self._logger.info(f"[Scheduler] 이전 상태 복원 — 자동 시작: {restored}") 

623 except Exception as e: 

624 self._logger.error(f"[Scheduler] 상태 복원 실패: {e}") 

625 

626 async def _append_signal_db(self, record: SignalRecord): 

627 """시그널 1건을 DB에 비동기 삽입.""" 

628 try: 

629 await asyncio.to_thread(self._store.append_signal, record) 

630 except Exception as e: 

631 self._logger.error(f"[Scheduler] 시그널 DB 저장 실패: {e}") 

632 

633 def get_signal_history(self, strategy_name: str = None) -> list: 

634 """시그널 실행 이력 반환. strategy_name 지정 시 해당 전략만 필터.""" 

635 records = self._signal_history 

636 if strategy_name: 

637 records = [r for r in records if r.strategy_name == strategy_name] 

638 # 최신순으로 반환 

639 return [ 

640 { 

641 "strategy_name": r.strategy_name, 

642 "code": r.code, 

643 "name": r.name, 

644 "action": r.action, 

645 "price": r.price, 

646 "qty": r.qty, 

647 "return_rate": r.return_rate, 

648 "reason": r.reason, 

649 "timestamp": r.timestamp, 

650 "api_success": r.api_success, 

651 } 

652 for r in reversed(records) 

653 ] 

654 

655 # ── SSE 구독자 관리 ── 

656 

657 def create_subscriber_queue(self) -> asyncio.Queue: 

658 """SSE 클라이언트용 큐 생성 및 등록.""" 

659 queue: asyncio.Queue = asyncio.Queue(maxsize=100) 

660 self._subscriber_queues.append(queue) 

661 return queue 

662 

663 def remove_subscriber_queue(self, queue: asyncio.Queue): 

664 """SSE 클라이언트 연결 해제 시 큐 제거.""" 

665 if queue in self._subscriber_queues: 

666 self._subscriber_queues.remove(queue) 

667 

668 async def _notify_subscribers(self, record: SignalRecord): 

669 """새 시그널을 모든 SSE 구독자에게 전파.""" 

670 json_data = json.dumps({ 

671 "strategy_name": record.strategy_name, 

672 "code": record.code, 

673 "name": record.name, 

674 "action": record.action, 

675 "price": record.price, 

676 "return_rate": record.return_rate, 

677 "reason": record.reason, 

678 "timestamp": record.timestamp, 

679 "api_success": record.api_success, 

680 }, ensure_ascii=False) 

681 for queue in list(self._subscriber_queues): 

682 try: 

683 queue.put_nowait(json_data) 

684 except asyncio.QueueFull: 

685 try: 

686 queue.get_nowait() 

687 queue.put_nowait(json_data) 

688 except Exception: 

689 pass