Coverage for scheduler / after_market_loop.py: 96%
42 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
1# scheduler/after_market_loop.py
2"""
3장 마감 후 작업을 자동 실행하는 공통 스케줄러 루프.
5RankingTask, DailyPriceCollectorTask 등 장 마감 후 1회 실행되는
6백그라운드 태스크가 공유하는 스케줄링 패턴을 모듈화한다.
8Usage::
10 await run_after_market_loop(
11 mcs=self._mcs,
12 market_clock=self._market_clock,
13 logger=self._logger,
14 on_market_closed=self._do_work, # async (latest_date: str) -> None
15 label="MyTask",
16 )
17"""
18import asyncio
19import logging
20from typing import Optional, Callable, Awaitable
22from core.market_clock import MarketClock
23from services.market_calendar_service import MarketCalendarService
26async def run_after_market_loop(
27 mcs: Optional[MarketCalendarService],
28 market_clock: Optional[MarketClock],
29 logger: Optional[logging.Logger],
30 on_market_closed: Callable[[str], Awaitable[None]],
31 label: str = "AfterMarketLoop",
32 delay_sec: int = 0,
33) -> None:
34 """장 마감 후 작업을 자동으로 반복 실행하는 루프.
36 Args:
37 mcs: 시장 개장/마감 판단용 MarketCalendar.
38 market_clock: 장 마감까지 남은 시간 계산용 MarketClock.
39 logger: 로깅용 Logger.
40 on_market_closed: 장 마감 후 호출할 콜백.
41 ``latest_trading_date`` (YYYYMMDD) 문자열을 받으며,
42 내부에서 이미 처리한 날짜인지 직접 판단한다.
43 label: 로그 메시지에 표시할 태스크 이름.
44 delay_sec: 장 마감 감지 후 콜백 실행 전까지의 Padding 시간(초).
45 여러 태스크의 실행 시점을 분산시킬 때 사용한다.
46 """
47 _log = logger or logging.getLogger(__name__)
48 _log.info(f"[{label}] 장마감 후 자동 스케줄러 시작 (delay={delay_sec}s)")
50 while True:
51 try:
52 # ── 1. 장 중이면 마감 시각까지 정확히 대기 ──
53 if mcs and await mcs.is_market_open_now():
54 wait_sec = (
55 market_clock.get_sleep_seconds_until_market_close()
56 if market_clock else 300
57 )
58 if wait_sec and wait_sec > 0: 58 ↛ 63line 58 didn't jump to line 63 because the condition on line 58 was always true
59 _log.info(
60 f"[{label}] 장 마감까지 {wait_sec:.0f}초 대기"
61 )
62 await asyncio.sleep(wait_sec + 60) # 마감 1분 뒤
63 continue
65 # ── 1b. 장 시작 전(09:00 이전)이면 마감 이후까지 대기 ──
66 # is_market_open_now()는 장 중(09:00~15:40)에만 True를 반환하므로,
67 # 09:00 이전과 15:40 이후를 구분하기 위해 별도로 확인한다.
68 # 장 중이 아닌데도 마감까지 1시간(3600초) 이상 남아있다면 = 장 시작 전.
69 if market_clock:
70 secs_until_close = market_clock.get_sleep_seconds_until_market_close()
71 if isinstance(secs_until_close, (int, float)) and secs_until_close > 3600:
72 _log.info(
73 f"[{label}] 장 시작 전 — 장 마감까지 {secs_until_close:.0f}초 대기 후 실행"
74 )
75 await asyncio.sleep(secs_until_close + 60)
76 continue
78 # ── 2. 장 마감 이후 — Padding 대기 후 콜백 실행 ──
79 if delay_sec > 0:
80 _log.info(f"[{label}] 장 마감 감지 — {delay_sec}초 Padding 대기 후 실행")
81 await asyncio.sleep(delay_sec)
83 latest_trading_date = (
84 await mcs.get_latest_trading_date() if mcs else None
85 )
86 if latest_trading_date:
87 await on_market_closed(latest_trading_date)
89 # ── 3. 스마트 대기: 다음 장 마감까지 ──
90 await _smart_sleep(market_clock, _log, label)
92 except asyncio.CancelledError:
93 _log.info(f"[{label}] 장마감 후 스케줄러 종료")
94 break
95 except Exception as e:
96 _log.error(f"[{label}] 스케줄러 오류: {e}", exc_info=True)
97 await asyncio.sleep(60)
100async def _smart_sleep(
101 market_clock: Optional[MarketClock],
102 logger: logging.Logger,
103 label: str,
104) -> None:
105 """다음 장 마감까지 스마트하게 대기한다.
107 - 아직 오늘 장 마감 전이면 → 마감+1분까지 정확히 대기
108 - 이미 장 마감 지났으면 → 12시간 대기 (다음날 장 마감 전 기상)
109 """
110 wait_sec = (
111 market_clock.get_sleep_seconds_until_market_close()
112 if market_clock else 0
113 )
114 if wait_sec > 0:
115 logger.info(
116 f"[{label}] 다음 장 마감까지 {wait_sec / 3600:.1f}시간 대기"
117 )
118 await asyncio.sleep(wait_sec + 60)
119 else:
120 logger.info(
121 f"[{label}] 오늘 작업 완료 또는 휴장. 12시간 대기"
122 )
123 await asyncio.sleep(12 * 3600)