Coverage for scheduler / after_market_loop.py: 96%

42 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-04 15:08 +0000

1# scheduler/after_market_loop.py 

2""" 

3장 마감 후 작업을 자동 실행하는 공통 스케줄러 루프. 

4 

5RankingTask, DailyPriceCollectorTask 등 장 마감 후 1회 실행되는 

6백그라운드 태스크가 공유하는 스케줄링 패턴을 모듈화한다. 

7 

8Usage:: 

9 

10 await run_after_market_loop( 

11 mcs=self._mcs, 

12 market_clock=self._market_clock, 

13 logger=self._logger, 

14 on_market_closed=self._do_work, # async (latest_date: str) -> None 

15 label="MyTask", 

16 ) 

17""" 

18import asyncio 

19import logging 

20from typing import Optional, Callable, Awaitable 

21 

22from core.market_clock import MarketClock 

23from services.market_calendar_service import MarketCalendarService 

24 

25 

26async def run_after_market_loop( 

27 mcs: Optional[MarketCalendarService], 

28 market_clock: Optional[MarketClock], 

29 logger: Optional[logging.Logger], 

30 on_market_closed: Callable[[str], Awaitable[None]], 

31 label: str = "AfterMarketLoop", 

32 delay_sec: int = 0, 

33) -> None: 

34 """장 마감 후 작업을 자동으로 반복 실행하는 루프. 

35 

36 Args: 

37 mcs: 시장 개장/마감 판단용 MarketCalendar. 

38 market_clock: 장 마감까지 남은 시간 계산용 MarketClock. 

39 logger: 로깅용 Logger. 

40 on_market_closed: 장 마감 후 호출할 콜백. 

41 ``latest_trading_date`` (YYYYMMDD) 문자열을 받으며, 

42 내부에서 이미 처리한 날짜인지 직접 판단한다. 

43 label: 로그 메시지에 표시할 태스크 이름. 

44 delay_sec: 장 마감 감지 후 콜백 실행 전까지의 Padding 시간(초). 

45 여러 태스크의 실행 시점을 분산시킬 때 사용한다. 

46 """ 

47 _log = logger or logging.getLogger(__name__) 

48 _log.info(f"[{label}] 장마감 후 자동 스케줄러 시작 (delay={delay_sec}s)") 

49 

50 while True: 

51 try: 

52 # ── 1. 장 중이면 마감 시각까지 정확히 대기 ── 

53 if mcs and await mcs.is_market_open_now(): 

54 wait_sec = ( 

55 market_clock.get_sleep_seconds_until_market_close() 

56 if market_clock else 300 

57 ) 

58 if wait_sec and wait_sec > 0: 58 ↛ 63line 58 didn't jump to line 63 because the condition on line 58 was always true

59 _log.info( 

60 f"[{label}] 장 마감까지 {wait_sec:.0f}초 대기" 

61 ) 

62 await asyncio.sleep(wait_sec + 60) # 마감 1분 뒤 

63 continue 

64 

65 # ── 1b. 장 시작 전(09:00 이전)이면 마감 이후까지 대기 ── 

66 # is_market_open_now()는 장 중(09:00~15:40)에만 True를 반환하므로, 

67 # 09:00 이전과 15:40 이후를 구분하기 위해 별도로 확인한다. 

68 # 장 중이 아닌데도 마감까지 1시간(3600초) 이상 남아있다면 = 장 시작 전. 

69 if market_clock: 

70 secs_until_close = market_clock.get_sleep_seconds_until_market_close() 

71 if isinstance(secs_until_close, (int, float)) and secs_until_close > 3600: 

72 _log.info( 

73 f"[{label}] 장 시작 전 — 장 마감까지 {secs_until_close:.0f}초 대기 후 실행" 

74 ) 

75 await asyncio.sleep(secs_until_close + 60) 

76 continue 

77 

78 # ── 2. 장 마감 이후 — Padding 대기 후 콜백 실행 ── 

79 if delay_sec > 0: 

80 _log.info(f"[{label}] 장 마감 감지 — {delay_sec}초 Padding 대기 후 실행") 

81 await asyncio.sleep(delay_sec) 

82 

83 latest_trading_date = ( 

84 await mcs.get_latest_trading_date() if mcs else None 

85 ) 

86 if latest_trading_date: 

87 await on_market_closed(latest_trading_date) 

88 

89 # ── 3. 스마트 대기: 다음 장 마감까지 ── 

90 await _smart_sleep(market_clock, _log, label) 

91 

92 except asyncio.CancelledError: 

93 _log.info(f"[{label}] 장마감 후 스케줄러 종료") 

94 break 

95 except Exception as e: 

96 _log.error(f"[{label}] 스케줄러 오류: {e}", exc_info=True) 

97 await asyncio.sleep(60) 

98 

99 

100async def _smart_sleep( 

101 market_clock: Optional[MarketClock], 

102 logger: logging.Logger, 

103 label: str, 

104) -> None: 

105 """다음 장 마감까지 스마트하게 대기한다. 

106 

107 - 아직 오늘 장 마감 전이면 → 마감+1분까지 정확히 대기 

108 - 이미 장 마감 지났으면 → 12시간 대기 (다음날 장 마감 전 기상) 

109 """ 

110 wait_sec = ( 

111 market_clock.get_sleep_seconds_until_market_close() 

112 if market_clock else 0 

113 ) 

114 if wait_sec > 0: 

115 logger.info( 

116 f"[{label}] 다음 장 마감까지 {wait_sec / 3600:.1f}시간 대기" 

117 ) 

118 await asyncio.sleep(wait_sec + 60) 

119 else: 

120 logger.info( 

121 f"[{label}] 오늘 작업 완료 또는 휴장. 12시간 대기" 

122 ) 

123 await asyncio.sleep(12 * 3600)