Coverage for task / background / after_market / after_market_task_base.py: 95%
67 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
1# task/background/after_market/after_market_task_base.py
2"""
3AfterMarketTask — 장 마감 후 실행되는 배치 태스크의 공통 기반 클래스.
5모든 after_market 태스크가 공유하는 보일러플레이트를 단일 위치에서 관리한다.
7공통 제공
8---------
9- ``_state``, ``_tasks`` 필드 초기화
10- ``state`` / ``priority`` property
11- ``stop()`` — asyncio.Task 취소 및 정리
12- ``suspend()`` / ``resume()`` — 기본(상태 전환만). 청크 중단이 필요한 서브클래스는 재정의.
13- ``_after_market_scheduler()`` — ``run_after_market_loop`` 연결
15서브클래스 필수 구현
16--------------------
17- ``task_name`` property
18- ``_scheduler_label`` property — run_after_market_loop 레이블 (로그 식별자)
19- ``_on_market_closed(latest_trading_date: str)`` — 장 마감 콜백
20- ``start()`` — 태스크 시작 (초기 1회 실행 + 스케줄러 등록)
21"""
22from __future__ import annotations
24import asyncio
25import logging
26import os
27from abc import ABC, abstractmethod
28from typing import Dict, List, Optional, TYPE_CHECKING
30import yaml
32from interfaces.schedulable_task import SchedulableTask, TaskPriority, TaskState
33from scheduler.after_market_loop import run_after_market_loop
35if TYPE_CHECKING: 35 ↛ 36line 35 didn't jump to line 36 because the condition on line 35 was never true
36 from core.market_clock import MarketClock
37 from services.market_calendar_service import MarketCalendarService
39from pydantic import BaseModel, Field
41class AfterMarketTasksConfig(BaseModel):
42 after_market_delay_sec: Dict[str, int] = Field(default_factory=dict)
44class TaskConfigModel(BaseModel):
45 after_market_tasks: AfterMarketTasksConfig = Field(default_factory=AfterMarketTasksConfig)
47_TASK_CONFIG_PATH = os.path.join(
48 os.path.dirname(os.path.abspath(__file__)),
49 "..", "..", "..", "config", "task_config.yaml",
50)
51_DEFAULT_DELAYS: Dict[str, int] = {}
53def _load_after_market_delays() -> Dict[str, int]:
54 """task_config.yaml 에서 after_market_delay_sec 매핑을 로드한다."""
55 global _DEFAULT_DELAYS
56 if _DEFAULT_DELAYS:
57 return _DEFAULT_DELAYS
58 try:
59 with open(_TASK_CONFIG_PATH, encoding="utf-8") as f:
60 raw = yaml.safe_load(f) or {}
62 # Pydantic 모델을 통한 안전한 파싱, 타입 캐스팅 및 기본값 할당
63 config = TaskConfigModel(**raw)
64 _DEFAULT_DELAYS = {k: v * 60 for k, v in config.after_market_tasks.after_market_delay_sec.items()}
65 except Exception:
66 _DEFAULT_DELAYS = {}
67 return _DEFAULT_DELAYS
70class AfterMarketTask(SchedulableTask, ABC):
71 """장 마감 후 주기적으로 실행되는 배치 태스크의 공통 기반 클래스."""
73 def __init__(
74 self,
75 mcs: Optional["MarketCalendarService"],
76 market_clock: Optional["MarketClock"],
77 logger: Optional[logging.Logger],
78 ) -> None:
79 self._mcs = mcs
80 self._market_clock = market_clock
81 self._logger = logger or logging.getLogger(self.__class__.__module__)
82 self._state: TaskState = TaskState.IDLE
83 self._tasks: List[asyncio.Task] = []
85 # ── SchedulableTask 공통 구현 ────────────────────────────────
87 @property
88 def state(self) -> TaskState:
89 return self._state
91 @property
92 def priority(self) -> TaskPriority:
93 return TaskPriority.LOW
95 async def stop(self) -> None:
96 self._logger.info(f"{self.task_name} 종료 시작: {len(self._tasks)}개 태스크")
97 for task in self._tasks:
98 if not task.done(): 98 ↛ 97line 98 didn't jump to line 97 because the condition on line 98 was always true
99 task.cancel()
100 if self._tasks:
101 await asyncio.gather(*self._tasks, return_exceptions=True)
102 self._tasks.clear()
103 self._state = TaskState.STOPPED
104 self._logger.info(f"{self.task_name} 종료 완료")
106 async def suspend(self) -> None:
107 """기본 구현: 상태만 전환. 청크 중단이 필요한 태스크는 재정의."""
108 if self._state == TaskState.RUNNING:
109 self._state = TaskState.SUSPENDED
111 async def resume(self) -> None:
112 """기본 구현: 상태만 전환. 청크 중단이 필요한 태스크는 재정의."""
113 if self._state == TaskState.SUSPENDED:
114 self._state = TaskState.RUNNING
116 # ── 장마감 후 스케줄러 ────────────────────────────────────────
118 @property
119 @abstractmethod
120 def _scheduler_label(self) -> str:
121 """run_after_market_loop 에 전달할 레이블 (로그 식별자)."""
123 async def _after_market_scheduler(self) -> None:
124 """장 마감 후 자동으로 작업을 스케줄링하는 루프."""
125 delay_sec = _load_after_market_delays().get(self.task_name, 0)
126 await run_after_market_loop(
127 mcs=self._mcs,
128 market_clock=self._market_clock,
129 logger=self._logger,
130 on_market_closed=self._on_market_closed,
131 label=self._scheduler_label,
132 delay_sec=delay_sec,
133 )
135 @abstractmethod
136 async def _on_market_closed(self, latest_trading_date: str) -> None:
137 """장 마감 후 콜백 — 서브클래스에서 구체적인 작업을 구현한다."""