Coverage for task / background / intraday / strategy_scheduler_task_adapter.py: 100%

42 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-04 15:08 +0000

1# task/background/strategy_scheduler_task_adapter.py 

2""" 

3StrategyScheduler를 SchedulableTask 인터페이스로 래핑하는 어댑터. 

4기존 StrategyScheduler 코드를 변경하지 않고 BackgroundScheduler에 등록할 수 있게 한다. 

5""" 

6from typing import Dict 

7 

8from interfaces.schedulable_task import SchedulableTask, TaskPriority, TaskState 

9from scheduler.strategy_scheduler import StrategyScheduler 

10 

11 

12class StrategySchedulerTaskAdapter(SchedulableTask): 

13 """StrategyScheduler를 SchedulableTask 인터페이스로 래핑한다. 

14 

15 전략 스케줄러의 scan 루프는 background로 동작하지만, 

16 전략에서 발생하는 매수/매도 주문은 foreground 우선순위로 실행된다. 

17 """ 

18 

19 def __init__(self, scheduler: StrategyScheduler): 

20 self._scheduler = scheduler 

21 self._state: TaskState = TaskState.IDLE 

22 

23 @property 

24 def task_name(self) -> str: 

25 return "strategy_scheduler" 

26 

27 @property 

28 def priority(self) -> TaskPriority: 

29 return TaskPriority.NORMAL 

30 

31 @property 

32 def state(self) -> TaskState: 

33 return self._state 

34 

35 async def start(self) -> None: 

36 """전략 스케줄러의 이전 상태를 복원하고 시작한다.""" 

37 if self._state == TaskState.RUNNING: 

38 return 

39 await self._scheduler.restore_state() 

40 self._state = TaskState.RUNNING 

41 

42 async def stop(self) -> None: 

43 """전략 스케줄러를 정지하고 상태를 저장한다.""" 

44 if self._scheduler._running: 

45 await self._scheduler.stop(save_state=True) 

46 self._state = TaskState.STOPPED 

47 

48 async def suspend(self) -> None: 

49 """전략 스케줄러를 일시 중지한다. 

50 

51 Note: 전략의 매수/매도는 critical 우선순위이므로 실제로는 

52 scan 루프만 일시 중지하는 것이 이상적이다. 

53 현재는 상태 플래그만 변경하고, 실제 루프 중지는 하지 않는다. 

54 """ 

55 if self._state == TaskState.RUNNING: 

56 self._state = TaskState.SUSPENDED 

57 

58 async def resume(self) -> None: 

59 """전략 스케줄러를 재개한다.""" 

60 if self._state == TaskState.SUSPENDED: 

61 self._state = TaskState.RUNNING 

62 

63 def get_progress(self) -> Dict: 

64 """태스크 진행률 반환 (SchedulableTask 인터페이스 구현). 

65 

66 전략 스케줄러는 배치 진행률이 없으므로 활성 전략 수를 반환한다. 

67 """ 

68 active_strategies = 0 

69 total_strategies = 0 

70 try: 

71 status = self._scheduler.get_status() 

72 strategies = status.get("strategies", []) 

73 total_strategies = len(strategies) 

74 active_strategies = sum(1 for s in strategies if s.get("enabled")) 

75 except Exception: 

76 pass 

77 

78 return { 

79 "running": self._state == TaskState.RUNNING, 

80 "active_strategies": active_strategies, 

81 "total_strategies": total_strategies, 

82 }