Coverage for task / background / after_market / after_market_task_base.py: 95%

67 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-04 15:08 +0000

1# task/background/after_market/after_market_task_base.py 

2""" 

3AfterMarketTask — 장 마감 후 실행되는 배치 태스크의 공통 기반 클래스. 

4 

5모든 after_market 태스크가 공유하는 보일러플레이트를 단일 위치에서 관리한다. 

6 

7공통 제공 

8--------- 

9- ``_state``, ``_tasks`` 필드 초기화 

10- ``state`` / ``priority`` property 

11- ``stop()`` — asyncio.Task 취소 및 정리 

12- ``suspend()`` / ``resume()`` — 기본(상태 전환만). 청크 중단이 필요한 서브클래스는 재정의. 

13- ``_after_market_scheduler()`` — ``run_after_market_loop`` 연결 

14 

15서브클래스 필수 구현 

16-------------------- 

17- ``task_name`` property 

18- ``_scheduler_label`` property — run_after_market_loop 레이블 (로그 식별자) 

19- ``_on_market_closed(latest_trading_date: str)`` — 장 마감 콜백 

20- ``start()`` — 태스크 시작 (초기 1회 실행 + 스케줄러 등록) 

21""" 

22from __future__ import annotations 

23 

24import asyncio 

25import logging 

26import os 

27from abc import ABC, abstractmethod 

28from typing import Dict, List, Optional, TYPE_CHECKING 

29 

30import yaml 

31 

32from interfaces.schedulable_task import SchedulableTask, TaskPriority, TaskState 

33from scheduler.after_market_loop import run_after_market_loop 

34 

35if TYPE_CHECKING: 35 ↛ 36line 35 didn't jump to line 36 because the condition on line 35 was never true

36 from core.market_clock import MarketClock 

37 from services.market_calendar_service import MarketCalendarService 

38 

39from pydantic import BaseModel, Field 

40 

41class AfterMarketTasksConfig(BaseModel): 

42 after_market_delay_sec: Dict[str, int] = Field(default_factory=dict) 

43 

44class TaskConfigModel(BaseModel): 

45 after_market_tasks: AfterMarketTasksConfig = Field(default_factory=AfterMarketTasksConfig) 

46 

47_TASK_CONFIG_PATH = os.path.join( 

48 os.path.dirname(os.path.abspath(__file__)), 

49 "..", "..", "..", "config", "task_config.yaml", 

50) 

51_DEFAULT_DELAYS: Dict[str, int] = {} 

52 

53def _load_after_market_delays() -> Dict[str, int]: 

54 """task_config.yaml 에서 after_market_delay_sec 매핑을 로드한다.""" 

55 global _DEFAULT_DELAYS 

56 if _DEFAULT_DELAYS: 

57 return _DEFAULT_DELAYS 

58 try: 

59 with open(_TASK_CONFIG_PATH, encoding="utf-8") as f: 

60 raw = yaml.safe_load(f) or {} 

61 

62 # Pydantic 모델을 통한 안전한 파싱, 타입 캐스팅 및 기본값 할당 

63 config = TaskConfigModel(**raw) 

64 _DEFAULT_DELAYS = {k: v * 60 for k, v in config.after_market_tasks.after_market_delay_sec.items()} 

65 except Exception: 

66 _DEFAULT_DELAYS = {} 

67 return _DEFAULT_DELAYS 

68 

69 

70class AfterMarketTask(SchedulableTask, ABC): 

71 """장 마감 후 주기적으로 실행되는 배치 태스크의 공통 기반 클래스.""" 

72 

73 def __init__( 

74 self, 

75 mcs: Optional["MarketCalendarService"], 

76 market_clock: Optional["MarketClock"], 

77 logger: Optional[logging.Logger], 

78 ) -> None: 

79 self._mcs = mcs 

80 self._market_clock = market_clock 

81 self._logger = logger or logging.getLogger(self.__class__.__module__) 

82 self._state: TaskState = TaskState.IDLE 

83 self._tasks: List[asyncio.Task] = [] 

84 

85 # ── SchedulableTask 공통 구현 ──────────────────────────────── 

86 

87 @property 

88 def state(self) -> TaskState: 

89 return self._state 

90 

91 @property 

92 def priority(self) -> TaskPriority: 

93 return TaskPriority.LOW 

94 

95 async def stop(self) -> None: 

96 self._logger.info(f"{self.task_name} 종료 시작: {len(self._tasks)}개 태스크") 

97 for task in self._tasks: 

98 if not task.done(): 98 ↛ 97line 98 didn't jump to line 97 because the condition on line 98 was always true

99 task.cancel() 

100 if self._tasks: 

101 await asyncio.gather(*self._tasks, return_exceptions=True) 

102 self._tasks.clear() 

103 self._state = TaskState.STOPPED 

104 self._logger.info(f"{self.task_name} 종료 완료") 

105 

106 async def suspend(self) -> None: 

107 """기본 구현: 상태만 전환. 청크 중단이 필요한 태스크는 재정의.""" 

108 if self._state == TaskState.RUNNING: 

109 self._state = TaskState.SUSPENDED 

110 

111 async def resume(self) -> None: 

112 """기본 구현: 상태만 전환. 청크 중단이 필요한 태스크는 재정의.""" 

113 if self._state == TaskState.SUSPENDED: 

114 self._state = TaskState.RUNNING 

115 

116 # ── 장마감 후 스케줄러 ──────────────────────────────────────── 

117 

118 @property 

119 @abstractmethod 

120 def _scheduler_label(self) -> str: 

121 """run_after_market_loop 에 전달할 레이블 (로그 식별자).""" 

122 

123 async def _after_market_scheduler(self) -> None: 

124 """장 마감 후 자동으로 작업을 스케줄링하는 루프.""" 

125 delay_sec = _load_after_market_delays().get(self.task_name, 0) 

126 await run_after_market_loop( 

127 mcs=self._mcs, 

128 market_clock=self._market_clock, 

129 logger=self._logger, 

130 on_market_closed=self._on_market_closed, 

131 label=self._scheduler_label, 

132 delay_sec=delay_sec, 

133 ) 

134 

135 @abstractmethod 

136 async def _on_market_closed(self, latest_trading_date: str) -> None: 

137 """장 마감 후 콜백 — 서브클래스에서 구체적인 작업을 구현한다."""