Coverage for task / background / after_market / premium_watchlist_generator_task.py: 96%

82 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-04 15:08 +0000

1# task/background/after_market/premium_watchlist_generator_task.py 

2""" 

3전일 기준 우량주 생성 백그라운드 태스크. 

4 

5장 마감 후 자동으로 OneilUniverseService.generate_premium_watchlist()를 실행하여 

6오닐 전략 전일 기준 우량주 풀을 갱신한다. 

7""" 

8import asyncio 

9import logging 

10import time 

11from typing import Dict, Optional, TYPE_CHECKING 

12 

13from task.background.after_market.after_market_task_base import AfterMarketTask 

14from interfaces.schedulable_task import TaskState 

15from services.notification_service import NotificationService, NotificationCategory, NotificationLevel 

16 

17if TYPE_CHECKING: 17 ↛ 18line 17 didn't jump to line 18 because the condition on line 17 was never true

18 from services.oneil_universe_service import OneilUniverseService 

19 from services.market_calendar_service import MarketCalendarService 

20 from core.market_clock import MarketClock 

21 

22 

23class PremiumWatchlistGeneratorTask(AfterMarketTask): 

24 """장 마감 후 전일 기준 우량주 풀을 자동 생성하는 백그라운드 태스크.""" 

25 

26 def __init__( 

27 self, 

28 universe_service: "OneilUniverseService", 

29 market_calendar_service: Optional["MarketCalendarService"] = None, 

30 market_clock: Optional["MarketClock"] = None, 

31 logger=None, 

32 notification_service: Optional["NotificationService"] = None, 

33 ): 

34 super().__init__( 

35 mcs=market_calendar_service, 

36 market_clock=market_clock, 

37 logger=logger or logging.getLogger(__name__), 

38 ) 

39 self._universe_service = universe_service 

40 self._ns = notification_service 

41 

42 self._is_generating: bool = False 

43 self._last_generated_date: Optional[str] = None 

44 self._progress: Dict = { 

45 "running": False, 

46 "last_generated_date": None, 

47 "last_result": None, 

48 } 

49 

50 # ── SchedulableTask 인터페이스 구현 ──────────────────────── 

51 

52 @property 

53 def task_name(self) -> str: 

54 return "전일기준주도주_생성" 

55 

56 @property 

57 def _scheduler_label(self) -> str: 

58 return "전일기준우량주생성" 

59 

60 async def start(self) -> None: 

61 if self._state == TaskState.RUNNING: 

62 return 

63 self._state = TaskState.RUNNING 

64 self._tasks.append(asyncio.create_task(self._after_market_scheduler())) 

65 self._logger.info("PremiumWatchlistGeneratorTask 시작") 

66 

67 def get_progress(self) -> Dict: 

68 result = dict(self._progress) 

69 gen = self._universe_service.generation_progress 

70 result.update({ 

71 "phase": gen.get("phase"), 

72 "processed": gen.get("processed", 0), 

73 "total": gen.get("total", 0), 

74 "passed": gen.get("passed", 0), 

75 "selected": gen.get("selected", 0), 

76 "elapsed": gen.get("elapsed", 0.0), 

77 }) 

78 return result 

79 

80 async def _on_market_closed(self, latest_trading_date: str) -> None: 

81 """장 마감 후 콜백: 해당 거래일의 생성이 필요하면 실행. 

82 

83 인메모리 기록과 파일 메타데이터를 모두 확인하여 

84 이미 생성된 경우 재실행을 생략한다 (서버 재시작 후에도 유효). 

85 """ 

86 if self._last_generated_date == latest_trading_date: 

87 if self._ns: 

88 await self._ns.emit( 

89 NotificationCategory.BACKGROUND, NotificationLevel.INFO, "전일기준우량주 생성 스킵", 

90 f"{latest_trading_date} 이미 생성 완료된 상태입니다." 

91 ) 

92 return 

93 

94 # 파일에 이미 당일 기준 우량주가 저장되어 있으면 재생성 불필요 

95 meta = self._universe_service.get_premium_stocks_meta() 

96 if meta and meta.get("generated_date") == latest_trading_date: 

97 self._last_generated_date = latest_trading_date 

98 self._progress["last_generated_date"] = latest_trading_date 

99 self._logger.info( 

100 f"전일 기준 우량주 이미 생성됨 (기준일: {latest_trading_date}, " 

101 f"생성시각: {meta.get('generated_at', '알 수 없음')}) — 생성 스킵" 

102 ) 

103 if self._ns: 

104 await self._ns.emit( 

105 NotificationCategory.BACKGROUND, NotificationLevel.INFO, "전일기준우량주 생성 스킵", 

106 f"{latest_trading_date} 이미 생성 완료된 상태입니다." 

107 ) 

108 return 

109 

110 await self._run_generation(latest_trading_date) 

111 

112 async def _run_generation(self, trading_date: str) -> None: 

113 if self._is_generating: 

114 self._logger.info("전일 기준 우량주 생성 이미 진행 중 — 스킵") 

115 return 

116 

117 self._is_generating = True 

118 self._progress["running"] = True 

119 start_time = time.time() 

120 self._logger.info(f"전일 기준 우량주 생성 시작 (기준일: {trading_date})") 

121 

122 try: 

123 result = await self._universe_service.generate_premium_watchlist(trading_date=trading_date) 

124 elapsed = time.time() - start_time 

125 self._last_generated_date = trading_date 

126 self._progress["last_generated_date"] = trading_date 

127 self._progress["last_result"] = result 

128 self._logger.info( 

129 f"전일 기준 우량주 생성 완료: " 

130 f"KOSPI {result.get('kospi_count')}종목, " 

131 f"KOSDAQ {result.get('kosdaq_count')}종목, " 

132 f"소요: {elapsed:.1f}초" 

133 ) 

134 if self._ns: 

135 await self._ns.emit( 

136 NotificationCategory.BACKGROUND, NotificationLevel.INFO, "전일기준우량주 생성 완료", 

137 f"KOSPI {result.get('kospi_count')}개, KOSDAQ {result.get('kosdaq_count')}개 종목 수집 완료 (소요: {elapsed:.1f}초)" 

138 ) 

139 except Exception as e: 

140 self._logger.error(f"전일 기준 우량주 생성 실패: {e}", exc_info=True) 

141 if self._ns: 

142 await self._ns.emit(NotificationCategory.BACKGROUND, NotificationLevel.ERROR, "전일기준우량주 생성 실패", str(e)) 

143 finally: 

144 self._is_generating = False 

145 self._progress["running"] = False 

146 

147 async def force_generate(self) -> None: 

148 """강제 생성: skip 조건을 무시하고 전일 기준 우량주를 재생성한다.""" 

149 self._logger.info("PremiumWatchlistGeneratorTask 강제 생성 요청") 

150 target_date = None 

151 if self._mcs: 

152 target_date = await self._mcs.get_latest_trading_date() 

153 if not target_date: 

154 self._logger.error("최근 거래일을 확인할 수 없어 강제 생성을 중단합니다.") 

155 return 

156 await self._run_generation(target_date)