Coverage for scheduler / background_scheduler.py: 91%

60 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-04 15:08 +0000

1# scheduler/background_scheduler.py 

2""" 

3백그라운드 태스크 라이프사이클 관리 스케줄러. 

4SchedulableTask 인터페이스를 구현한 태스크들을 등록하고 

5start/stop/suspend/resume을 통합 관리한다. 

6""" 

7import logging 

8from typing import Dict, List, Optional 

9 

10from interfaces.schedulable_task import SchedulableTask, TaskState 

11from core.performance_profiler import PerformanceProfiler 

12 

13 

14class BackgroundScheduler: 

15 """SchedulableTask 기반 백그라운드 태스크 라이프사이클 관리자. 

16 

17 비즈니스 로직 없이 순수하게 태스크의 생명주기만 관리한다. 

18 """ 

19 

20 def __init__( 

21 self, 

22 logger=None, 

23 performance_profiler: Optional[PerformanceProfiler] = None, 

24 ): 

25 self._logger = logger or logging.getLogger(__name__) 

26 self._pm = performance_profiler if performance_profiler else PerformanceProfiler(enabled=False) 

27 self._tasks: Dict[str, SchedulableTask] = {} # name -> task 

28 

29 def register(self, task: SchedulableTask) -> None: 

30 """SchedulableTask를 등록한다.""" 

31 if task.task_name in self._tasks: 

32 self._logger.warning(f"[BackgroundScheduler] 태스크 '{task.task_name}' 이미 등록됨 — 덮어씁니다.") 

33 self._tasks[task.task_name] = task 

34 self._logger.info( 

35 f"[BackgroundScheduler] 태스크 등록: {task.task_name} (priority={task.priority})" 

36 ) 

37 

38 def unregister(self, task_name: str) -> None: 

39 """등록된 태스크를 제거한다.""" 

40 if task_name in self._tasks: 40 ↛ exitline 40 didn't return from function 'unregister' because the condition on line 40 was always true

41 del self._tasks[task_name] 

42 self._logger.info(f"[BackgroundScheduler] 태스크 제거: {task_name}") 

43 

44 async def start_all(self) -> None: 

45 """등록된 모든 태스크를 시작한다.""" 

46 t_start = self._pm.start_timer() 

47 self._logger.info(f"[BackgroundScheduler] 전체 시작: {len(self._tasks)}개 태스크") 

48 for name, task in self._tasks.items(): 

49 if task.state in (TaskState.IDLE, TaskState.STOPPED): 

50 try: 

51 await task.start() 

52 self._logger.info(f"[BackgroundScheduler] '{name}' 시작 완료") 

53 except Exception as e: 

54 self._logger.error(f"[BackgroundScheduler] '{name}' 시작 실패: {e}", exc_info=True) 

55 self._pm.log_timer("BackgroundScheduler.start_all", t_start) 

56 

57 async def shutdown(self) -> None: 

58 """등록된 모든 태스크를 정상 종료한다.""" 

59 t_start = self._pm.start_timer() 

60 self._logger.info(f"[BackgroundScheduler] 전체 종료: {len(self._tasks)}개 태스크") 

61 for name, task in self._tasks.items(): 

62 if task.state not in (TaskState.IDLE, TaskState.STOPPED): 

63 try: 

64 await task.stop() 

65 self._logger.info(f"[BackgroundScheduler] '{name}' 종료 완료") 

66 except Exception as e: 

67 self._logger.error(f"[BackgroundScheduler] '{name}' 종료 실패: {e}", exc_info=True) 

68 self._pm.log_timer("BackgroundScheduler.shutdown", t_start) 

69 

70 async def suspend_all(self) -> None: 

71 """실행 중인 모든 태스크를 일시 중지한다.""" 

72 self._logger.info("[BackgroundScheduler] 전체 일시 중지") 

73 for name, task in self._tasks.items(): 

74 if task.state == TaskState.RUNNING: 

75 try: 

76 await task.suspend() 

77 except Exception as e: 

78 self._logger.error(f"[BackgroundScheduler] '{name}' 일시 중지 실패: {e}") 

79 

80 async def resume_all(self) -> None: 

81 """일시 중지된 모든 태스크를 재개한다.""" 

82 self._logger.info("[BackgroundScheduler] 전체 재개") 

83 for name, task in self._tasks.items(): 

84 if task.state == TaskState.SUSPENDED: 

85 try: 

86 await task.resume() 

87 except Exception as e: 

88 self._logger.error(f"[BackgroundScheduler] '{name}' 재개 실패: {e}") 

89 

90 def get_task(self, name: str) -> Optional[SchedulableTask]: 

91 """이름으로 태스크를 조회한다.""" 

92 return self._tasks.get(name) 

93 

94 def get_all_status(self) -> List[dict]: 

95 """모든 등록된 태스크의 상태를 반환한다.""" 

96 return [ 

97 { 

98 "name": task.task_name, 

99 "state": task.state.value, 

100 "priority": int(task.priority), 

101 } 

102 for task in self._tasks.values() 

103 ]