Coverage for scheduler / background_scheduler.py: 91%
60 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
1# scheduler/background_scheduler.py
2"""
3백그라운드 태스크 라이프사이클 관리 스케줄러.
4SchedulableTask 인터페이스를 구현한 태스크들을 등록하고
5start/stop/suspend/resume을 통합 관리한다.
6"""
7import logging
8from typing import Dict, List, Optional
10from interfaces.schedulable_task import SchedulableTask, TaskState
11from core.performance_profiler import PerformanceProfiler
14class BackgroundScheduler:
15 """SchedulableTask 기반 백그라운드 태스크 라이프사이클 관리자.
17 비즈니스 로직 없이 순수하게 태스크의 생명주기만 관리한다.
18 """
20 def __init__(
21 self,
22 logger=None,
23 performance_profiler: Optional[PerformanceProfiler] = None,
24 ):
25 self._logger = logger or logging.getLogger(__name__)
26 self._pm = performance_profiler if performance_profiler else PerformanceProfiler(enabled=False)
27 self._tasks: Dict[str, SchedulableTask] = {} # name -> task
29 def register(self, task: SchedulableTask) -> None:
30 """SchedulableTask를 등록한다."""
31 if task.task_name in self._tasks:
32 self._logger.warning(f"[BackgroundScheduler] 태스크 '{task.task_name}' 이미 등록됨 — 덮어씁니다.")
33 self._tasks[task.task_name] = task
34 self._logger.info(
35 f"[BackgroundScheduler] 태스크 등록: {task.task_name} (priority={task.priority})"
36 )
38 def unregister(self, task_name: str) -> None:
39 """등록된 태스크를 제거한다."""
40 if task_name in self._tasks: 40 ↛ exitline 40 didn't return from function 'unregister' because the condition on line 40 was always true
41 del self._tasks[task_name]
42 self._logger.info(f"[BackgroundScheduler] 태스크 제거: {task_name}")
44 async def start_all(self) -> None:
45 """등록된 모든 태스크를 시작한다."""
46 t_start = self._pm.start_timer()
47 self._logger.info(f"[BackgroundScheduler] 전체 시작: {len(self._tasks)}개 태스크")
48 for name, task in self._tasks.items():
49 if task.state in (TaskState.IDLE, TaskState.STOPPED):
50 try:
51 await task.start()
52 self._logger.info(f"[BackgroundScheduler] '{name}' 시작 완료")
53 except Exception as e:
54 self._logger.error(f"[BackgroundScheduler] '{name}' 시작 실패: {e}", exc_info=True)
55 self._pm.log_timer("BackgroundScheduler.start_all", t_start)
57 async def shutdown(self) -> None:
58 """등록된 모든 태스크를 정상 종료한다."""
59 t_start = self._pm.start_timer()
60 self._logger.info(f"[BackgroundScheduler] 전체 종료: {len(self._tasks)}개 태스크")
61 for name, task in self._tasks.items():
62 if task.state not in (TaskState.IDLE, TaskState.STOPPED):
63 try:
64 await task.stop()
65 self._logger.info(f"[BackgroundScheduler] '{name}' 종료 완료")
66 except Exception as e:
67 self._logger.error(f"[BackgroundScheduler] '{name}' 종료 실패: {e}", exc_info=True)
68 self._pm.log_timer("BackgroundScheduler.shutdown", t_start)
70 async def suspend_all(self) -> None:
71 """실행 중인 모든 태스크를 일시 중지한다."""
72 self._logger.info("[BackgroundScheduler] 전체 일시 중지")
73 for name, task in self._tasks.items():
74 if task.state == TaskState.RUNNING:
75 try:
76 await task.suspend()
77 except Exception as e:
78 self._logger.error(f"[BackgroundScheduler] '{name}' 일시 중지 실패: {e}")
80 async def resume_all(self) -> None:
81 """일시 중지된 모든 태스크를 재개한다."""
82 self._logger.info("[BackgroundScheduler] 전체 재개")
83 for name, task in self._tasks.items():
84 if task.state == TaskState.SUSPENDED:
85 try:
86 await task.resume()
87 except Exception as e:
88 self._logger.error(f"[BackgroundScheduler] '{name}' 재개 실패: {e}")
90 def get_task(self, name: str) -> Optional[SchedulableTask]:
91 """이름으로 태스크를 조회한다."""
92 return self._tasks.get(name)
94 def get_all_status(self) -> List[dict]:
95 """모든 등록된 태스크의 상태를 반환한다."""
96 return [
97 {
98 "name": task.task_name,
99 "state": task.state.value,
100 "priority": int(task.priority),
101 }
102 for task in self._tasks.values()
103 ]