Coverage for scheduler / strategy_scheduler.py: 89%
397 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
1# scheduler/strategy_scheduler.py
2from __future__ import annotations
4import asyncio
5import json
6import logging
7from dataclasses import dataclass, field
8from datetime import datetime
9from typing import Dict, List, Optional
11from interfaces.live_strategy import LiveStrategy
12from common.types import TradeSignal, ErrorCode, Exchange
13from services.market_calendar_service import MarketCalendarService
14from services.virtual_trade_service import VirtualTradeService
15from services.notification_service import NotificationService, NotificationCategory, NotificationLevel
16from services.order_execution_service import OrderExecutionService
17from repositories.stock_code_repository import StockCodeRepository
18from services.stock_query_service import StockQueryService
19from core.market_clock import MarketClock
20from core.performance_profiler import PerformanceProfiler
22from scheduler.strategy_scheduler_store import StrategySchedulerStore, SCHEDULER_DB_FILE
@dataclass
class SignalRecord:
    """History record of one executed trade signal."""

    # Name of the strategy that produced the signal.
    strategy_name: str
    # Stock code of the traded instrument.
    code: str
    # Display name of the instrument.
    name: str
    # Trade direction: "BUY" or "SELL".
    action: str
    # Price recorded for the trade.
    price: int
    # Number of shares.
    qty: int = 1
    # Free-text reason supplied with the signal.
    reason: str = ""
    # Execution time; the scheduler writes it as "%Y-%m-%d %H:%M:%S".
    timestamp: str = ""
    # Whether the API order succeeded (stays True when no order was attempted).
    api_success: bool = True
    # Return rate reported when a SELL is logged; None otherwise.
    return_rate: Optional[float] = None
@dataclass
class StrategySchedulerConfig:
    """Per-strategy scheduling settings."""

    # Strategy instance the scheduler drives.
    strategy: LiveStrategy
    # Execution period in minutes.
    interval_minutes: int = 5
    # Maximum number of simultaneously held positions.
    max_positions: int = 3
    # Order quantity applied to buy signals that carry qty <= 1.
    order_qty: int = 1
    # Whether this individual strategy is active.
    enabled: bool = True
    # Whether buying more of an already-held stock (pyramiding) is allowed.
    allow_pyramiding: bool = False
    # Whether holdings are liquidated before the market closes.
    force_exit_on_close: bool = False
class StrategyScheduler:
    """Single-threaded, asyncio-based strategy scheduler.

    Runs the registered strategies periodically while the market is open and
    handles every resulting TradeSignal by recording it through the virtual
    trade service and, unless ``dry_run`` is set, placing an API order.
    """

    LOOP_INTERVAL_SEC = 1  # wake-up period of the main loop
    MARKET_CLOSED_SLEEP_SEC = 60  # sleep outside market hours (unused in this class)
    FORCE_EXIT_MINUTES_BEFORE = 30  # force liquidation N minutes before close
    STAGGER_INTERVAL_SEC = 60  # minimum gap between two strategy runs (seconds)
    MAX_HISTORY = 200  # maximum number of signal records kept in memory

    def __init__(
        self,
        virtual_trade_service: VirtualTradeService,
        order_execution_service: OrderExecutionService,
        stock_query_service: StockQueryService,
        stock_code_repository: StockCodeRepository,
        market_clock: MarketClock,
        market_calendar_service: MarketCalendarService,
        logger: Optional[logging.Logger] = None,
        dry_run: bool = False,
        notification_service: Optional[NotificationService] = None,
        performance_profiler: Optional[PerformanceProfiler] = None,
        store: Optional[StrategySchedulerStore] = None,
    ):
        """Wire up collaborators and restore the persisted signal history.

        Args:
            virtual_trade_service: Records virtual buys/sells and reports holdings.
            order_execution_service: Places the real buy/sell API orders.
            stock_query_service: Used to look up the current price of a stock.
            stock_code_repository: Resolves a stock name from its code.
            market_clock: Source of the current KST time and market close time.
            market_calendar_service: Tells whether the market is open and waits
                for the next open.
            logger: Logger to use; defaults to this module's logger.
            dry_run: When True, no API orders are placed.
            notification_service: Optional sink for user notifications.
            performance_profiler: Optional profiler; a disabled one is created
                when omitted.
            store: Optional persistence store; a default one backed by
                ``SCHEDULER_DB_FILE`` is created when omitted.
        """
        self._virtual_trade_service = virtual_trade_service
        self._oes = order_execution_service
        self._sqs = stock_query_service
        self.stock_code_repository = stock_code_repository
        self._tm = market_clock
        self._logger = logger or logging.getLogger(__name__)
        self._dry_run = dry_run
        self._notification_service = notification_service
        self._mcs = market_calendar_service
        self._pm = performance_profiler if performance_profiler else PerformanceProfiler(enabled=False)

        self._store = store or StrategySchedulerStore(db_path=SCHEDULER_DB_FILE, logger=self._logger)

        self._strategies: List[StrategySchedulerConfig] = []
        self._running = False
        self._task: Optional[asyncio.Task] = None
        self._last_run: Dict[str, datetime] = {}
        # End time of the most recent regular run; drives the stagger cooldown.
        self._last_execution_time: Optional[datetime] = None
        # Names of strategies whose forced liquidation already ran today.
        self._force_exit_done: set = set()
        self._signal_history: List[SignalRecord] = self._load_signal_history()
        self._subscriber_queues: List[asyncio.Queue] = []

    # ── Strategy registration ──

    def register(self, config: StrategySchedulerConfig):
        """Add a strategy configuration to the scheduler."""
        self._strategies.append(config)
        self._logger.info(
            f"[Scheduler] 전략 등록: {config.strategy.name} "
            f"(주기={config.interval_minutes}분, 최대포지션={config.max_positions})"
        )

    # ── Lifecycle ──

    def _ensure_loop_running(self) -> bool:
        """Start the main loop task unless one is already running.

        Returns:
            True when a new loop task was created, False when the scheduler
            was already running.
        """
        if self._running:
            return False
        self._running = True
        self._task = asyncio.create_task(self._loop())
        return True

    async def start(self):
        """Enable every registered strategy and start the main loop."""
        if self._running:
            self._logger.warning("[Scheduler] 이미 실행 중")
            return
        for cfg in self._strategies:
            cfg.enabled = True
        self._running = True
        self._task = asyncio.create_task(self._loop())
        self._logger.info("[Scheduler] 시작 (전체 전략 활성화)")
        if self._notification_service:
            names = [c.strategy.name for c in self._strategies if c.enabled]
            await self._notification_service.emit(
                NotificationCategory.SYSTEM,
                NotificationLevel.INFO,
                "스케줄러 시작",
                f"활성 전략: {', '.join(names)}",
            )

    async def stop(self, save_state: bool = False):
        """Stop the main loop, disable every strategy and close the store.

        Args:
            save_state: When True (e.g. a restart) the current state is
                persisted first and forced liquidation is skipped. When False,
                strategies with ``force_exit_on_close`` that are still enabled
                are liquidated before being disabled.
        """
        if save_state:
            # Persist before anything is disabled so the enabled set is kept.
            self._save_scheduler_state()

        self._running = False
        if self._task:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
            self._task = None

        # save_state=True  -> perform_exit=False (skip liquidation)
        # save_state=False -> perform_exit=True  (liquidate)
        perform_exit = not save_state
        for cfg in self._strategies:
            # The enabled flag must still be set here: stop_strategy() only
            # liquidates strategies that are enabled, so disabling everything
            # up front would make the liquidation unreachable.
            try:
                await self.stop_strategy(cfg.strategy.name, perform_force_exit=perform_exit)
            except Exception as e:
                # A failed liquidation must not prevent the shutdown.
                self._logger.error(
                    f"[Scheduler] {cfg.strategy.name} 강제 청산 오류: {e}", exc_info=True
                )
            # stop_strategy() disables the first config matching the name;
            # make sure this exact config ends up disabled as well.
            cfg.enabled = False

        self.close()
        self._logger.info("[Scheduler] 정지 (전체 전략 비활성화)")
        if self._notification_service:
            await self._notification_service.emit(
                NotificationCategory.SYSTEM,
                NotificationLevel.INFO,
                "스케줄러 정지",
                "전체 전략 비활성화",
            )

    # ── Main loop ──

    async def _run_pending_force_exits(self):
        """Liquidate enabled force-exit strategies not yet handled today.

        Called when the market is found closed, to catch strategies whose
        forced liquidation did not run before the close.
        """
        for cfg in self._strategies:
            if not (cfg.enabled and cfg.force_exit_on_close):
                continue
            name = cfg.strategy.name
            if name in self._force_exit_done:
                continue
            self._force_exit_done.add(name)
            self._logger.info(
                f"[Scheduler] {name}: 장 마감 후 미처리 강제 청산 실행"
            )
            try:
                await self._run_strategy(cfg, force_exit_only=True)
            except Exception as e:
                self._logger.error(
                    f"[Scheduler] {name} 강제 청산 오류: {e}", exc_info=True
                )

    def _collect_due_strategies(self, now: datetime, in_force_exit_window: bool) -> list:
        """Return ``(overdue_seconds, cfg, force_exit)`` for strategies due to run.

        Args:
            now: Current time used to measure the elapsed interval.
            in_force_exit_window: True when the market closes within
                ``FORCE_EXIT_MINUTES_BEFORE`` minutes.
        """
        due = []
        for cfg in self._strategies:
            if not cfg.enabled:
                continue
            name = cfg.strategy.name
            last = self._last_run.get(name)
            interval_sec = cfg.interval_minutes * 60
            elapsed = (now - last).total_seconds() if last else float('inf')

            # Forced liquidation: runs once as soon as the window is entered.
            force_exit = (cfg.force_exit_on_close
                          and in_force_exit_window
                          and name not in self._force_exit_done)

            # Regular run: force-exit strategies must not open new positions
            # inside the pre-close window.
            should_run = (not force_exit
                          and not (cfg.force_exit_on_close and in_force_exit_window)
                          and elapsed >= interval_sec)

            if not (should_run or force_exit):
                continue

            # Never-run strategies and forced liquidations get top priority.
            if force_exit or not last:
                overdue = float('inf')
            else:
                overdue = elapsed - interval_sec
            due.append((overdue, cfg, force_exit))
        return due

    async def _loop(self):
        """Main scheduling loop; runs until stopped or cancelled."""
        self._logger.info("스케줄러 메인 루프 시작.")
        while self._running:
            try:
                market_open = await self._mcs.is_market_open_now()

                if not market_open:
                    await self._run_pending_force_exits()
                    self._logger.info("현재는 휴장일이거나 장 운영 시간이 아닙니다.")
                    self._force_exit_done.clear()
                    await self._mcs.wait_until_next_open()
                    continue

                # Market is open: compute the time left until the close.
                now = self._tm.get_current_kst_time()
                close_time = self._tm.get_market_close_time()
                minutes_to_close = (close_time - now).total_seconds() / 60
                in_force_exit_window = minutes_to_close <= self.FORCE_EXIT_MINUTES_BEFORE

                # Most overdue first, so no strategy is starved by the cooldown.
                evaluations = self._collect_due_strategies(now, in_force_exit_window)
                evaluations.sort(key=lambda x: x[0], reverse=True)

                for _overdue, cfg, force_exit in evaluations:
                    name = cfg.strategy.name

                    # Keep strategies from competing for API resources; forced
                    # liquidation ignores the cooldown.
                    if not force_exit and self._last_execution_time:
                        since_last_exec = (now - self._last_execution_time).total_seconds()
                        if since_last_exec < self.STAGGER_INTERVAL_SEC:
                            continue

                    self._last_run[name] = now
                    if force_exit:
                        self._force_exit_done.add(name)
                        self._logger.info(f"[Scheduler] {name}: 장 마감 {minutes_to_close:.1f}분 전 — 강제 청산 실행")

                    try:
                        await self._run_strategy(cfg, force_exit_only=force_exit)
                    except Exception as e:
                        self._logger.error(f"[Scheduler] {name} 실행 오류: {e}", exc_info=True)
                    finally:
                        # Restart the cooldown from the moment the run ended so
                        # a long run is still followed by a full pause.
                        if not force_exit:
                            self._last_execution_time = self._tm.get_current_kst_time()

                await asyncio.sleep(self.LOOP_INTERVAL_SEC)

            except asyncio.CancelledError:
                break
            except Exception as e:
                self._logger.error(f"[Scheduler] 루프 오류: {e}", exc_info=True)
                if self._notification_service:
                    # A failing notification must not terminate the loop.
                    try:
                        await self._notification_service.emit(
                            NotificationCategory.SYSTEM,
                            NotificationLevel.ERROR,
                            "스케줄러 루프 오류",
                            str(e),
                        )
                    except Exception as notify_err:
                        self._logger.error(f"[Scheduler] 루프 오류 알림 실패: {notify_err}")
                await asyncio.sleep(self.LOOP_INTERVAL_SEC)

    # ── Strategy execution ──

    async def _run_strategy(self, cfg: StrategySchedulerConfig, force_exit_only: bool = False):
        """Run one strategy cycle: exit checks first, then a scan for new buys.

        Args:
            cfg: Configuration of the strategy to run.
            force_exit_only: When True, skip the strategy logic and sell every
                holding of the strategy at market price.
        """
        name = cfg.strategy.name
        t_run = self._pm.start_timer()
        self._logger.info(f"[Scheduler] {name} 실행 시작 (force_exit_only={force_exit_only})")

        if force_exit_only:
            await self._force_liquidate_strategy(cfg)
            self._pm.log_timer(f"{name}.run_strategy(force_exit)", t_run)
            return

        # 1) Check exit conditions of current holdings.
        holdings = self._virtual_trade_service.get_holds_by_strategy(name)
        if holdings:
            t_exit = self._pm.start_timer()
            sell_signals = await cfg.strategy.check_exits(holdings)
            self._pm.log_timer(f"{name}.check_exits({len(holdings)}건)", t_exit)
            if sell_signals:
                tasks = [self._execute_signal(sig) for sig in sell_signals]
                for f in asyncio.as_completed(tasks):
                    await f

        # 2) Scan for new buys.
        current_holdings = self._virtual_trade_service.get_holds_by_strategy(name)
        current_holds_count = len(current_holdings)

        if current_holds_count >= cfg.max_positions:
            self._logger.info(
                f"[Scheduler] {name}: 최대 포지션 도달 "
                f"({current_holds_count}/{cfg.max_positions}), 스캔 스킵"
            )
            self._pm.log_timer(f"{name}.run_strategy", t_run)
            return

        t_scan = self._pm.start_timer()
        # Treat a None result like "no signals", as check_exits results are.
        buy_signals = await cfg.strategy.scan() or []
        self._pm.log_timer(f"{name}.scan()", t_scan)

        # Without pyramiding, drop signals for stocks that are already held.
        if cfg.allow_pyramiding:
            valid_signals = buy_signals
        else:
            holding_codes = {str(h.get('code')) for h in current_holdings if h.get('code')}
            valid_signals = [s for s in buy_signals if str(s.code) not in holding_codes]

        remaining = cfg.max_positions - current_holds_count
        target_signals = valid_signals[:remaining]

        if target_signals:
            for sig in target_signals:
                if sig.qty <= 1:
                    sig.qty = cfg.order_qty

            # Sequential execution with an in-memory counter, so the position
            # cap is enforced without re-querying holdings.
            current_count = current_holds_count
            for sig in target_signals:
                if current_count >= cfg.max_positions:
                    self._logger.info(
                        f"[Scheduler] {name}: 매수 중단 — 최대 포지션 도달 "
                        f"({current_count}/{cfg.max_positions})"
                    )
                    break
                await self._execute_signal(sig)
                current_count += 1

        self._pm.log_timer(f"{name}.run_strategy", t_run)
        self._logger.info(f"[Scheduler] {name} 실행 완료")

    # ── Signal execution ──

    async def _resolve_log_price(self, signal: TradeSignal):
        """Return the price to record for ``signal``.

        A signal price of 0 (market order) is replaced by the current price
        when the lookup succeeds; otherwise the signal price is returned.
        """
        log_price = signal.price
        if log_price != 0:
            return log_price
        try:
            resp = await self._sqs.get_current_price(signal.code, caller="StrategyScheduler")
            if resp and resp.rt_cd == ErrorCode.SUCCESS.value:
                data = resp.data
                output = data.get("output") if isinstance(data, dict) else getattr(data, "output", None)
                if output:
                    val = output.get("stck_prpr") if isinstance(output, dict) else getattr(output, "stck_prpr", 0)
                    log_price = int(val)
        except Exception as e:
            # Best effort: on failure the record keeps the price 0.
            self._logger.debug(f"[Scheduler] 현재가 조회 실패 ({signal.code}): {e}")
        return log_price

    async def _place_api_order(self, signal: TradeSignal) -> bool:
        """Place the API order for ``signal``; return True on success.

        Failures and exceptions are logged and reported as False; they never
        propagate to the caller.
        """
        try:
            try:
                signal_exchange = Exchange(signal.exchange) if signal.exchange else Exchange.KRX
            except ValueError:
                # Unknown exchange value: fall back to KRX.
                signal_exchange = Exchange.KRX

            if signal.action == "BUY":
                resp = await self._oes.handle_place_buy_order(
                    signal.code, signal.price, signal.qty, exchange=signal_exchange
                )
            else:
                resp = await self._oes.handle_place_sell_order(
                    signal.code, signal.price, signal.qty, exchange=signal_exchange
                )

            if resp and resp.rt_cd == ErrorCode.SUCCESS.value:
                self._logger.info(
                    f"[Scheduler] API 주문 성공: {signal.action} {signal.code}"
                )
                return True

            msg = resp.msg1 if resp else "응답 없음"
            self._logger.warning(
                f"[Scheduler] API 주문 실패: {signal.action} {signal.code} - {msg} "
                f"(CSV는 기록됨)"
            )
            return False
        except Exception as e:
            self._logger.error(
                f"[Scheduler] API 주문 예외: {signal.action} {signal.code} - {e} "
                f"(CSV는 기록됨)"
            )
            return False

    async def _execute_signal(self, signal: TradeSignal):
        """Record a signal, place its order and publish the result.

        The virtual trade is always logged first; the API order is placed only
        when ``dry_run`` is off. The outcome is appended to the in-memory
        history, persisted, pushed to SSE subscribers and, when a notification
        service is configured, emitted as a notification.
        """
        self._logger.info(
            f"[Scheduler] 시그널 실행: [{signal.strategy_name}] {signal.action} {signal.name}({signal.code}) "
            f"@ {signal.price:,}원 | {signal.reason}"
        )

        log_price = await self._resolve_log_price(signal)

        # Fix up the name when it is empty or merely repeats the stock code.
        if not signal.name or signal.name == signal.code:
            signal.name = self.stock_code_repository.get_name_by_code(signal.code) or signal.code

        # Virtual trade record (always written).
        return_rate = None
        if signal.action == "BUY":
            await self._virtual_trade_service.log_buy_async(
                signal.strategy_name, signal.code, log_price, signal.qty
            )
        elif signal.action == "SELL":
            return_rate = await self._virtual_trade_service.log_sell_by_strategy_async(
                signal.strategy_name, signal.code, log_price, signal.qty
            )

        # API order (skipped in dry-run mode, which counts as success).
        api_success = True
        if not self._dry_run:
            api_success = await self._place_api_order(signal)

        # Signal history: memory + persistent store.
        now = self._tm.get_current_kst_time()
        record = SignalRecord(
            strategy_name=signal.strategy_name,
            code=signal.code,
            name=signal.name,
            action=signal.action,
            price=log_price,
            qty=signal.qty,
            reason=signal.reason,
            timestamp=now.strftime("%Y-%m-%d %H:%M:%S"),
            api_success=api_success,
            return_rate=return_rate,
        )
        self._signal_history.append(record)
        if len(self._signal_history) > self.MAX_HISTORY:
            self._signal_history = self._signal_history[-self.MAX_HISTORY:]
        await self._append_signal_db(record)
        await self._notify_subscribers(record)

        if self._notification_service:
            action_kr = "매수" if signal.action == "BUY" else "매도"
            level = NotificationLevel.CRITICAL if api_success else NotificationLevel.ERROR
            title = f"[{signal.strategy_name}] {signal.name} {action_kr} {'성공' if api_success else '실패'}"
            msg = (f"종목: {signal.name}({signal.code})\n"
                   f"주문: {log_price:,}원 × {signal.qty}주\n"
                   f"사유: {signal.reason}")
            await self._notification_service.emit(NotificationCategory.STRATEGY, level, title, msg, metadata={
                "strategy_name": signal.strategy_name,
                "code": signal.code,
                "action": signal.action,
                "price": log_price,
                "qty": signal.qty,
                "reason": signal.reason,
                "api_success": api_success,
                "return_rate": return_rate,
            })

    async def _force_liquidate_strategy(self, cfg: StrategySchedulerConfig):
        """Sell every holding of the strategy with market-price SELL signals."""
        name = cfg.strategy.name
        holdings = self._virtual_trade_service.get_holds_by_strategy(name)
        if not holdings:
            return

        self._logger.info(f"[Scheduler] {name} 종료로 인한 강제 청산 실행 (보유 {len(holdings)}건)")

        for hold in holdings:
            code = hold.get("code")
            if not code:
                continue

            stock_name = hold.get("name", code)

            # Use the held quantity; fall back to the configured order
            # quantity when the holding carries no usable qty.
            holding_qty = int(hold.get("qty") or 0)
            if holding_qty <= 0:
                holding_qty = cfg.order_qty

            signal = TradeSignal(
                strategy_name=name,
                code=code,
                name=stock_name,
                action="SELL",
                price=0,  # 0 means market price
                qty=holding_qty,
                reason="전략 종료 강제 청산 (시장가)"
            )
            await self._execute_signal(signal)

    # ── Individual strategy control ──

    async def start_strategy(self, name: str) -> bool:
        """Enable one strategy, starting the loop if needed.

        Returns:
            True when a strategy with that name exists, False otherwise.
        """
        for cfg in self._strategies:
            if cfg.strategy.name == name:
                cfg.enabled = True
                self._logger.info(f"[Scheduler] 전략 활성화: {name}")
                if self._ensure_loop_running():
                    self._logger.info("[Scheduler] 루프 자동 시작 (개별 전략 활성화)")
                return True
        return False

    async def stop_strategy(self, name: str, perform_force_exit: bool = True) -> bool:
        """Disable one strategy.

        Args:
            name: Name of the strategy to disable.
            perform_force_exit: When True, an enabled strategy with
                ``force_exit_on_close`` is liquidated before being disabled.

        Returns:
            True when a strategy with that name exists, False otherwise.
        """
        for cfg in self._strategies:
            if cfg.strategy.name == name:
                if perform_force_exit and cfg.enabled and cfg.force_exit_on_close:
                    await self._force_liquidate_strategy(cfg)

                cfg.enabled = False
                self._logger.info(f"[Scheduler] 전략 비활성화: {name}")
                return True
        return False

    async def update_max_positions(self, name: str, new_max: int) -> bool:
        """Change a strategy's maximum position count and persist the state.

        Returns:
            False when ``new_max`` is below 1 or the strategy is unknown.
        """
        if new_max < 1:
            return False
        for cfg in self._strategies:
            if cfg.strategy.name == name:
                cfg.max_positions = new_max
                self._logger.info(f"[Scheduler] '{name}' 전략 최대 포지션 수 변경: {new_max}")
                self._save_scheduler_state()
                return True
        return False

    # ── Status ──

    def get_status(self) -> dict:
        """Return the scheduler's run state and a summary of each strategy."""
        strategies = []
        for cfg in self._strategies:
            name = cfg.strategy.name
            last = self._last_run.get(name)
            holdings = self._virtual_trade_service.get_holds_by_strategy(name)
            strategies.append({
                "name": name,
                "interval_minutes": cfg.interval_minutes,
                "max_positions": cfg.max_positions,
                "enabled": cfg.enabled,
                "current_holds": len(holdings),
                "holdings": holdings,  # detailed holdings
                "last_run": last.strftime("%H:%M:%S") if last else None,
            })
        return {
            "running": self._running,
            "dry_run": self._dry_run,
            "strategies": strategies,
        }

    # ── Persistence ──

    def close(self):
        """Close the persistence store."""
        self._store.close()

    def _load_signal_history(self) -> List[SignalRecord]:
        """Restore up to ``MAX_HISTORY`` signal records from the store.

        Returns an empty list when loading fails.
        """
        try:
            records_data = self._store.load_signal_history(limit=self.MAX_HISTORY)
            records = [
                SignalRecord(
                    strategy_name=d["strategy_name"],
                    code=d["code"],
                    name=d["name"],
                    action=d["action"],
                    price=d["price"],
                    qty=d["qty"],
                    return_rate=d["return_rate"],
                    reason=d["reason"],
                    timestamp=d["timestamp"],
                    api_success=d["api_success"],
                )
                for d in records_data
            ]
            self._logger.info(f"[Scheduler] 시그널 이력 {len(records)}건 로드 완료")
            return records
        except Exception as e:
            self._logger.error(f"[Scheduler] 시그널 이력 로드 실패: {e}")
            return []

    def _save_scheduler_state(self):
        """Persist the enabled strategies, positions and per-strategy settings."""
        enabled_names = [cfg.strategy.name for cfg in self._strategies if cfg.enabled]
        current_positions = self._virtual_trade_service.get_holds()
        state = {
            "running": self._running,
            "enabled_strategies": enabled_names,
            "current_positions": current_positions,
            "strategy_configs": {
                cfg.strategy.name: {"max_positions": cfg.max_positions}
                for cfg in self._strategies
            },
        }
        try:
            self._store.save_state(state)
            self._logger.info(
                f"[Scheduler] 상태 저장 완료: {enabled_names}, 보유종목 {len(current_positions)}건"
            )
        except Exception as e:
            self._logger.error(f"[Scheduler] 상태 저장 실패: {e}")

    def clear_saved_state(self):
        """Delete the persisted state (called on a manual stop)."""
        try:
            self._store.clear_state()
            self._logger.info("[Scheduler] 저장된 상태 삭제")
        except Exception as e:
            self._logger.error(f"[Scheduler] 상태 삭제 실패: {e}")

    async def restore_state(self):
        """Restore the previous run state; start the loop if anything was enabled."""
        try:
            state = self._store.load_state()
        except Exception as e:
            self._logger.error(f"[Scheduler] 상태 복원 파일 읽기 실패: {e}")
            return
        if not state:
            return

        try:
            enabled_names = state.get("enabled_strategies", [])
            saved_positions = state.get("current_positions", [])
            strategy_configs = state.get("strategy_configs", {})

            if saved_positions:
                self._logger.info(
                    f"[Scheduler] 이전 상태 파일에 저장된 보유 포지션: {len(saved_positions)}건"
                )

            restored = []
            for cfg in self._strategies:
                if cfg.strategy.name in strategy_configs:
                    cfg.max_positions = strategy_configs[cfg.strategy.name].get(
                        "max_positions", cfg.max_positions
                    )
                if cfg.strategy.name in enabled_names:
                    cfg.enabled = True
                    restored.append(cfg.strategy.name)

            if restored:
                # Guarded start: never spawn a second loop task when the
                # scheduler is already running.
                self._ensure_loop_running()
                self._logger.info(f"[Scheduler] 이전 상태 복원 — 자동 시작: {restored}")
        except Exception as e:
            self._logger.error(f"[Scheduler] 상태 복원 실패: {e}")

    async def _append_signal_db(self, record: SignalRecord):
        """Insert one signal record into the store on a worker thread."""
        try:
            await asyncio.to_thread(self._store.append_signal, record)
        except Exception as e:
            self._logger.error(f"[Scheduler] 시그널 DB 저장 실패: {e}")

    @staticmethod
    def _record_to_dict(record) -> dict:
        """Serialize a signal record into the dict shape exposed to clients."""
        return {
            "strategy_name": record.strategy_name,
            "code": record.code,
            "name": record.name,
            "action": record.action,
            "price": record.price,
            "qty": record.qty,
            "return_rate": record.return_rate,
            "reason": record.reason,
            "timestamp": record.timestamp,
            "api_success": record.api_success,
        }

    def get_signal_history(self, strategy_name: Optional[str] = None) -> list:
        """Return the signal history as dicts, newest first.

        Args:
            strategy_name: When given, only records of that strategy are returned.
        """
        records = self._signal_history
        if strategy_name:
            records = [r for r in records if r.strategy_name == strategy_name]
        return [self._record_to_dict(r) for r in reversed(records)]

    # ── SSE subscriber management ──

    def create_subscriber_queue(self) -> asyncio.Queue:
        """Create and register a bounded queue for one SSE client."""
        queue: asyncio.Queue = asyncio.Queue(maxsize=100)
        self._subscriber_queues.append(queue)
        return queue

    def remove_subscriber_queue(self, queue: asyncio.Queue):
        """Unregister the queue of a disconnected SSE client."""
        if queue in self._subscriber_queues:
            self._subscriber_queues.remove(queue)

    async def _notify_subscribers(self, record: SignalRecord):
        """Push a new signal, serialized as JSON, to every SSE subscriber.

        The payload has the same fields as ``get_signal_history`` entries.
        When a subscriber's queue is full its oldest entry is dropped to make
        room for the new one.
        """
        json_data = json.dumps(self._record_to_dict(record), ensure_ascii=False)
        # Iterate over a copy so concurrent (un)registration cannot disturb us.
        for queue in list(self._subscriber_queues):
            try:
                queue.put_nowait(json_data)
            except asyncio.QueueFull:
                try:
                    queue.get_nowait()
                    queue.put_nowait(json_data)
                except (asyncio.QueueEmpty, asyncio.QueueFull):
                    pass