Coverage for view / web / web_app_initializer.py: 84%
294 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
1"""
2웹 애플리케이션용 서비스 초기화 모듈.
3TradingApp의 초기화 로직을 참고하여 서비스 레이어만 초기화한다.
4"""
5import asyncio
6from config.config_loader import load_configs
7from brokers.korea_investment.korea_invest_env import KoreaInvestApiEnv
8import os
9from brokers.broker_api_wrapper import BrokerAPIWrapper
10from services.stock_query_service import StockQueryService
11from services.streaming_service import StreamingService
12from services.order_execution_service import OrderExecutionService
13from repositories.virtual_trade_repository import VirtualTradeRepository
14from services.virtual_trade_service import VirtualTradeService
15from repositories.stock_code_repository import StockCodeRepository
16from services.indicator_service import IndicatorService
17from core.market_clock import MarketClock
18from core.logger import Logger, get_strategy_logger, get_streaming_logger
19from core.performance_profiler import PerformanceProfiler
20from scheduler.strategy_scheduler import StrategyScheduler, StrategySchedulerConfig
21from scheduler.background_scheduler import BackgroundScheduler
22from scheduler.foreground_scheduler import ForegroundScheduler
23from task.background.intraday.strategy_scheduler_task_adapter import StrategySchedulerTaskAdapter
24from task.background.intraday.websocket_watchdog_task import WebSocketWatchdogTask
25from task.background.after_market.ranking_task import RankingTask
26from task.background.after_market.daily_price_collector_task import DailyPriceCollectorTask
27from task.background.after_market.ohlcv_update_task import OhlcvUpdateTask
28from task.background.after_market.premium_watchlist_generator_task import PremiumWatchlistGeneratorTask
29from task.background.after_market.cache_warmup_task import CacheWarmupTask
30from task.background.always_on.notification_queue_task import NotificationQueueTask
31from services.naver_finance_scraper_service import NaverFinanceScraperService
32from strategies.volume_breakout_live_strategy import VolumeBreakoutLiveStrategy
33from strategies.program_buy_follow_strategy import ProgramBuyFollowStrategy
34from strategies.traditional_volume_breakout_strategy import TraditionalVolumeBreakoutStrategy
35from strategies.oneil_squeeze_breakout_strategy import OneilSqueezeBreakoutStrategy
36from strategies.oneil_pocket_pivot_strategy import OneilPocketPivotStrategy
37from strategies.high_tight_flag_strategy import HighTightFlagStrategy
38from strategies.first_pullback_strategy import FirstPullbackStrategy
39from services.oneil_universe_service import OneilUniverseService
40from repositories.stock_repository import StockRepository
41from services.program_trading_stream_service import ProgramTradingStreamService
42from services.market_data_service import MarketDataService
43from services.market_calendar_service import MarketCalendarService
44from services.notification_service import NotificationService, NotificationCategory
45from services.price_subscription_service import PriceSubscriptionService
46from services.telegram_notifier import TelegramNotifier, TelegramReporter
47from view.web import web_api # 임포트 확인
48from core.cache.cache_store import CacheStore
class WebAppContext:
    """Service container used by the web application.

    Only the service layer is built here, following the initialization flow
    of ``TradingApp``.  The methods depend on each other's results:
    ``load_config_and_env()`` creates the environment, clock and calendar
    service that ``initialize_services()`` consumes, ``initialize_scheduler()``
    uses the services created there, and ``start_background_tasks()`` starts
    the background scheduler built by ``initialize_services()``.
    """

    def __init__(self, app_context):
        """Create the context with every late-bound service set to ``None``.

        Args:
            app_context: Optional parent context; when truthy its ``env``
                attribute is reused, otherwise ``env`` starts as ``None``.
        """
        self.logger = Logger()
        self.env = app_context.env if app_context else None
        self.full_config = {}  # holds the complete loaded configuration
        self.market_clock: MarketClock = None
        self.broker: BrokerAPIWrapper = None
        self.stock_query_service: StockQueryService = None
        self.streaming_service: StreamingService = None
        self.order_execution_service: OrderExecutionService = None
        self.indicator_service: IndicatorService = None
        self.market_data_service: MarketDataService = None
        self.virtual_repo = VirtualTradeRepository()
        # NOTE(review): market_clock is still None at this point, so the
        # virtual trade service is constructed without a clock — confirm it
        # tolerates that or is given the clock later.
        self.virtual_trade_service = VirtualTradeService(
            repository=self.virtual_repo, market_clock=self.market_clock
        )
        # Back-compute snapshots from the past CSV records.
        self.virtual_trade_service.backfill_snapshots()
        self.stock_code_repository = StockCodeRepository(logger=self.logger)
        self.scheduler: StrategyScheduler = None
        self.oneil_universe_service: OneilUniverseService = None
        self.ranking_task: RankingTask = None
        self.websocket_watchdog_task: WebSocketWatchdogTask = None
        self.daily_price_collector_task: DailyPriceCollectorTask = None
        self.ohlcv_update_task: OhlcvUpdateTask = None
        self.premium_watchlist_generator_task: PremiumWatchlistGeneratorTask = None
        self.cache_warmup_task: CacheWarmupTask = None
        self.stock_repository: StockRepository = None
        self.background_scheduler: BackgroundScheduler = None
        self.foreground_scheduler: ForegroundScheduler = None
        self._mcs: MarketCalendarService = None
        self.notification_service: NotificationService = None
        self.notification_queue_task: NotificationQueueTask = None
        self.initialized = False
        self.pm: PerformanceProfiler = None

        # Strong references to fire-and-forget asyncio tasks; the event loop
        # itself only keeps weak references to them.
        self._background_tasks = set()

        # Real-time program-trading data service.
        self.realtime_data_service = ProgramTradingStreamService(self.logger)
        self.price_subscription_service: PriceSubscriptionService = None

        # Event logger dedicated to real-time streaming (logs/streaming/).
        self.streaming_event_logger = get_streaming_logger()

        web_api.set_ctx(self)

    @staticmethod
    def _config_as_dict(config):
        """Return ``config`` as a plain dict, converting Pydantic models."""
        if hasattr(config, "model_dump"):
            return config.model_dump()
        if hasattr(config, "dict"):
            return config.dict()
        return config

    def _ensure_profiler(self, config_dict):
        """Create the shared PerformanceProfiler once and return it."""
        if getattr(self, "pm", None) is None:
            self.pm = PerformanceProfiler(
                enabled=config_dict.get("performance_logging", False),
                threshold=config_dict.get("performance_threshold", 0.1),
            )
        return self.pm

    def _spawn_background(self, coro):
        """Schedule ``coro`` as a task and keep it referenced until it ends.

        Returns:
            The created ``asyncio.Task``.
        """
        task = asyncio.create_task(coro)
        tasks = getattr(self, "_background_tasks", None)
        if tasks is None:
            tasks = self._background_tasks = set()
        tasks.add(task)
        task.add_done_callback(tasks.discard)
        return task

    def load_config_and_env(self):
        """Load the configuration and build the environment-level objects.

        Creates the API environment, market clock, notification service,
        optional Telegram notifier/reporter and the market calendar service.
        """
        config_data = load_configs()
        self.full_config = config_data  # keep the complete configuration
        config_dict = self._config_as_dict(config_data)

        self.env = KoreaInvestApiEnv(config_dict, self.logger)
        self.market_clock = MarketClock(
            market_open_time=config_dict.get('market_open_time', "09:00"),
            market_close_time=config_dict.get('market_close_time', "15:40"),
            timezone=config_dict.get('market_timezone', "Asia/Seoul"),
            logger=self.logger
        )
        self.notification_service = NotificationService(self.market_clock)

        # Telegram notifier: only wired up when both settings are present.
        telegram_token = config_dict.get("telegram_bot_token")
        telegram_chat_id = config_dict.get("telegram_chat_id")
        if telegram_token and telegram_chat_id:
            # Kept as an instance attribute so its lifetime follows the context.
            self.telegram_notifier = TelegramNotifier(
                bot_token=telegram_token,
                chat_id=telegram_chat_id,
            )
            self.notification_service.register_external_handler(
                self.telegram_notifier.handle_event
            )
            self.logger.info("텔레그램 외부 알림 핸들러가 성공적으로 등록되었습니다.")

            # Telegram reporter, later injected into RankingTask.
            self.telegram_reporter = TelegramReporter(bot_token=telegram_token, chat_id=telegram_chat_id)
            self.logger.info("텔레그램 리포터가 초기화되었습니다.")
        else:
            self.logger.info("텔레그램 설정이 누락되어 알림 핸들러를 등록하지 않습니다.")

        self.logger.info("웹 앱: 환경 설정 로드 완료.")

        # The profiler must exist before the calendar service is built;
        # otherwise the calendar service would be handed None.
        self._mcs = MarketCalendarService(
            self.market_clock, self.logger,
            performance_profiler=self._ensure_profiler(config_dict),
        )

    async def initialize_services(self, is_paper_trading: bool = True):
        """Build the service layer (see TradingApp._complete_api_initialization()).

        Args:
            is_paper_trading: ``True`` for paper trading, ``False`` for real
                trading.

        Returns:
            ``True`` when initialization finished, ``False`` when the access
            token could not be acquired.
        """
        self.env.set_trading_mode(is_paper_trading)
        if not await self.env.get_access_token():
            self.logger.critical("웹 앱: 토큰 발급 실패.")
            return False
        # Even in paper-trading mode the real-trading token is issued up
        # front (query APIs always use real credentials).
        if is_paper_trading:
            await self.env.get_real_access_token()

        self.broker = BrokerAPIWrapper(
            env=self.env, logger=self.logger, market_clock=self.market_clock,
            market_calendar_service=self._mcs
        )

        # Inject the broker into the calendar service.
        self._mcs.set_broker(self.broker)

        # Sync market-holiday information so that next-open calculations can
        # account for ad-hoc holidays.
        # NOTE(review): this calls a private method of MarketCalendarService.
        await self._mcs._sync_calendar_if_needed()

        config_dict = self._config_as_dict(self.full_config)
        self._ensure_profiler(config_dict)

        cache_store = CacheStore(config_dict)
        cache_store.set_logger(self.logger)

        # Repository is created first because later services receive it.
        self.stock_repository = StockRepository(logger=self.logger)

        self.market_data_service = MarketDataService(
            broker_api_wrapper=self.broker, env=self.env, logger=self.logger,
            market_clock=self.market_clock, cache_store=cache_store,
            market_calendar_service=self._mcs,
            performance_profiler=self.pm,
            stock_repository=self.stock_repository
        )

        # IndicatorService is created before StockQueryService and receives
        # it afterwards, to resolve the circular reference between the two.
        self.indicator_service = IndicatorService(cache_store=cache_store, performance_profiler=self.pm)

        self.ranking_task = RankingTask(
            broker_api_wrapper=self.broker,
            stock_code_repository=self.stock_code_repository,
            env=self.env,
            logger=self.logger,
            market_clock=self.market_clock,
            performance_profiler=self.pm,
            notification_service=self.notification_service,
            # Only set when Telegram is configured.
            telegram_reporter=getattr(self, 'telegram_reporter', None),
            market_calendar_service=self._mcs,
            market_data_service=self.market_data_service,
        )
        self.stock_query_service = StockQueryService(
            market_data_service=self.market_data_service, logger=self.logger,
            market_clock=self.market_clock,
            indicator_service=self.indicator_service,
            ranking_task=self.ranking_task,
            performance_profiler=self.pm,
            notification_service=self.notification_service,
            broker_api_wrapper=self.broker,
        )
        self.indicator_service.stock_query_service = self.stock_query_service

        self.streaming_service = StreamingService(
            broker_api_wrapper=self.broker,
            logger=self.logger,
            market_clock=self.market_clock,
            market_data_service=self.market_data_service,
            streaming_logger=self.streaming_event_logger,
        )
        # Must come after StreamingService, which it wraps.
        self.price_subscription_service = PriceSubscriptionService(
            streaming_service=self.streaming_service,
            stock_repo=self.stock_repository,
            logger=self.logger,
            streaming_logger=self.streaming_event_logger,
        )

        self.websocket_watchdog_task = WebSocketWatchdogTask(
            streaming_service=self.streaming_service,
            realtime_data_service=self.realtime_data_service,
            market_calendar_service=self._mcs,
            performance_profiler=self.pm,
            notification_service=self.notification_service,
            logger=self.logger,
            streaming_logger=self.streaming_event_logger,
        )

        self.daily_price_collector_task = DailyPriceCollectorTask(
            stock_query_service=self.stock_query_service,
            stock_code_repository=self.stock_code_repository,
            stock_repo=self.stock_repository,
            market_calendar_service=self._mcs,
            market_clock=self.market_clock,
            performance_profiler=self.pm,
            notification_service=self.notification_service,
            logger=self.logger,
        )

        self.ohlcv_update_task = OhlcvUpdateTask(
            stock_query_service=self.stock_query_service,
            stock_code_repository=self.stock_code_repository,
            stock_repo=self.stock_repository,
            market_calendar_service=self._mcs,
            market_clock=self.market_clock,
            performance_profiler=self.pm,
            notification_service=self.notification_service,
            logger=self.logger,
        )

        self.order_execution_service = OrderExecutionService(
            broker_api_wrapper=self.broker,
            logger=self.logger, market_clock=self.market_clock,
            performance_profiler=self.pm,
            notification_service=self.notification_service,
            market_calendar_service=self._mcs,
            price_subscription_service=self.price_subscription_service,
        )

        # O'Neil universe service.
        self.oneil_universe_service = OneilUniverseService(
            stock_query_service=self.stock_query_service,
            indicator_service=self.indicator_service,
            stock_code_repository=self.stock_code_repository,
            market_clock=self.market_clock,
            scraper_service=NaverFinanceScraperService(logger=self.logger),
            logger=self.logger,
            performance_profiler=self.pm,
            price_subscription_service=self.price_subscription_service,
        )

        self.premium_watchlist_generator_task = PremiumWatchlistGeneratorTask(
            universe_service=self.oneil_universe_service,
            market_calendar_service=self._mcs,
            market_clock=self.market_clock,
            notification_service=self.notification_service,
            logger=self.logger,
        )

        self.cache_warmup_task = CacheWarmupTask(
            market_data_service=self.market_data_service,
            stock_query_service=self.stock_query_service,
            universe_service=self.oneil_universe_service,
            market_calendar_service=self._mcs,
            market_clock=self.market_clock,
            notification_service=self.notification_service,
            logger=self.logger,
        )

        self.notification_queue_task = NotificationQueueTask(
            notification_service=self.notification_service,
            poll_interval=config_dict.get("notification_queue_poll_interval", 1.0),
            logger=self.logger,
        )

        # Background scheduler: register every task, in the original order.
        self.background_scheduler = BackgroundScheduler(
            logger=self.logger,
            performance_profiler=self.pm,
        )
        for task in (
            self.ranking_task,
            self.websocket_watchdog_task,
            self.daily_price_collector_task,
            self.ohlcv_update_task,
            self.premium_watchlist_generator_task,
            self.cache_warmup_task,
            self.notification_queue_task,
        ):
            if task:
                self.background_scheduler.register(task)

        self.foreground_scheduler = ForegroundScheduler(
            background_scheduler=self.background_scheduler,
            logger=self.logger,
            performance_profiler=self.pm,
        )

        # Subscribe portfolio / premium stocks in the background at startup.
        self._spawn_background(self._initialize_price_subscriptions())

        self.initialized = True
        mode = "모의투자" if is_paper_trading else "실전투자"
        self.logger.info(f"웹 앱: 서비스 초기화 완료 ({mode})")
        return True

    async def _initialize_price_subscriptions(self) -> None:
        """Subscribe held stocks (HIGH) and premium stocks (MEDIUM) at startup.

        Both steps are best-effort: a failure in one is logged as a warning
        and does not prevent the other from running.
        """
        if not self.price_subscription_service:
            return

        from services.price_subscription_service import SubscriptionPriority

        await self._subscribe_portfolio_holdings(SubscriptionPriority.HIGH)
        await self._subscribe_premium_stocks(SubscriptionPriority.MEDIUM)

    async def _subscribe_portfolio_holdings(self, priority) -> None:
        """Subscribe every stock in the account balance with ``priority``."""
        try:
            resp = await self.broker.get_account_balance()
            if not (resp and resp.rt_cd == "0" and resp.data):
                return
            holdings = (resp.data.get("output2") or []) if isinstance(resp.data, dict) else []
            for item in holdings:
                # A missing or null code must not abort the remaining holdings.
                code = (item.get("pdno") or "").strip()
                if code:
                    await self.price_subscription_service.add_subscription(
                        code, priority, "portfolio"
                    )
        except Exception as e:
            self.logger.warning(f"보유 종목 구독 초기화 실패: {e}")

    async def _subscribe_premium_stocks(self, priority) -> None:
        """Sync subscriptions for the codes in data/premium_stocks.json."""
        try:
            import json

            premium_path = os.path.join(
                os.path.dirname(__file__), "..", "..", "data", "premium_stocks.json"
            )
            if not os.path.exists(premium_path):
                return
            with open(premium_path, "r", encoding="utf-8") as f:
                data = json.load(f)
            codes = data.get("kospi", []) + data.get("kosdaq", [])
            if codes:
                await self.price_subscription_service.sync_subscriptions(
                    codes=codes,
                    category_key="strategy_premium",
                    priority=priority,
                )
        except Exception as e:
            self.logger.warning(f"프리미엄 종목 구독 초기화 실패: {e}")

    def get_env_type(self) -> str:
        """Return the trading-mode label, or "미설정" when no env is set."""
        if self.env is None:
            return "미설정"
        return "모의투자" if self.env.is_paper_trading else "실전투자"

    async def is_market_open_now(self) -> bool:
        """Ask the calendar service whether the market is open.

        Returns ``False`` when the calendar service has not been created.
        """
        if not self._mcs:
            return False
        return await self._mcs.is_market_open_now()

    def get_current_time_str(self) -> str:
        """Return the current KST time as ``YYYY-MM-DD HH:MM:SS`` (or "")."""
        if self.market_clock is None:
            return ""
        return self.market_clock.get_current_kst_time().strftime('%Y-%m-%d %H:%M:%S')

    def get_cache_stats(self, expand: bool = False, latest_trading_date: str = None) -> dict:
        """Return the stock repository's cache statistics.

        Returns an empty dict when the repository has not been created.
        """
        if self.stock_repository:
            return self.stock_repository.get_cache_stats(expand=expand, latest_trading_date=latest_trading_date)
        return {}

    # --- Strategy scheduler ---

    def initialize_scheduler(self):
        """Create the strategy scheduler and register all strategies.

        Every strategy is registered with ``enabled=False`` and
        ``order_qty=1``; nothing is started here (the log message states that
        the scheduler waits for a manual start).
        """
        self.scheduler = StrategyScheduler(
            virtual_trade_service=self.virtual_trade_service,
            order_execution_service=self.order_execution_service,
            stock_query_service=self.stock_query_service,
            stock_code_repository=self.stock_code_repository,
            market_clock=self.market_clock,
            market_calendar_service=self._mcs,
            logger=get_strategy_logger('StrategyScheduler'),
            dry_run=False,
            notification_service=self.notification_service,
            performance_profiler=self.pm,
        )

        def register(strategy, **options):
            """Register ``strategy`` disabled, one share per order."""
            self.scheduler.register(StrategySchedulerConfig(
                strategy=strategy,
                order_qty=1,
                enabled=False,
                **options,
            ))
            return strategy

        # Volume breakout (intraday: forced exit before close, no pyramiding).
        register(
            VolumeBreakoutLiveStrategy(
                stock_query_service=self.stock_query_service,
                market_clock=self.market_clock,
                logger=get_strategy_logger('VolumeBreakoutLive'),
            ),
            interval_minutes=5,
            max_positions=3,
            force_exit_on_close=True,
            allow_pyramiding=False,
        )

        # Program-buy follow (intraday: forced exit before close, no pyramiding).
        register(
            ProgramBuyFollowStrategy(
                stock_query_service=self.stock_query_service,
                market_clock=self.market_clock,
                logger=get_strategy_logger('ProgramBuyFollow'),
            ),
            interval_minutes=10,
            max_positions=3,
            force_exit_on_close=True,
            allow_pyramiding=False,
        )

        # Traditional volume breakout (intraday: forced exit, no pyramiding).
        register(
            TraditionalVolumeBreakoutStrategy(
                stock_query_service=self.stock_query_service,
                stock_code_repository=self.stock_code_repository,
                market_clock=self.market_clock,
                logger=get_strategy_logger('TraditionalVolumeBreakout'),
            ),
            interval_minutes=1,
            max_positions=5,
            force_exit_on_close=True,
            allow_pyramiding=False,
        )

        # O'Neil squeeze breakout (overnight holding and pyramiding allowed).
        osb_strategy = register(
            OneilSqueezeBreakoutStrategy(
                stock_query_service=self.stock_query_service,
                universe_service=self.oneil_universe_service,
                market_clock=self.market_clock,
                logger=get_strategy_logger('OneilSqueezeBreakout'),
            ),
            interval_minutes=3,
            max_positions=5,
            force_exit_on_close=False,
            allow_pyramiding=True,
        )
        # Kept for backward compatibility with the web API.
        self.osb_strategy = osb_strategy
        self.oneil_universe_service_ref = self.oneil_universe_service

        # O'Neil pocket pivot & BGU (overnight holding and pyramiding allowed).
        register(
            OneilPocketPivotStrategy(
                stock_query_service=self.stock_query_service,
                universe_service=self.oneil_universe_service,
                market_clock=self.market_clock,
                logger=get_strategy_logger('OneilPocketPivot'),
            ),
            interval_minutes=3,
            max_positions=5,
            force_exit_on_close=False,
            allow_pyramiding=True,
        )

        # High tight flag (overnight holding allowed; allow_pyramiding is
        # left at the StrategySchedulerConfig default, as before).
        register(
            HighTightFlagStrategy(
                stock_query_service=self.stock_query_service,
                universe_service=self.oneil_universe_service,
                market_clock=self.market_clock,
                logger=get_strategy_logger('HighTightFlag'),
            ),
            interval_minutes=3,
            max_positions=5,
            force_exit_on_close=False,
        )

        # First pullback / Holy Grail (swing: overnight allowed, no pyramiding).
        register(
            FirstPullbackStrategy(
                stock_query_service=self.stock_query_service,
                universe_service=self.oneil_universe_service,
                market_clock=self.market_clock,
                logger=get_strategy_logger('FirstPullback'),
            ),
            interval_minutes=3,
            max_positions=5,
            force_exit_on_close=False,
            allow_pyramiding=False,
        )
        self.logger.info("웹 앱: 전략 스케줄러 초기화 완료 (수동 시작 대기)")

        # Expose the strategy scheduler to the background scheduler through
        # an adapter.
        if self.background_scheduler and self.scheduler:
            adapter = StrategySchedulerTaskAdapter(self.scheduler)
            self.background_scheduler.register(adapter)

    def start_background_tasks(self):
        """Start background work by delegating to the BackgroundScheduler.

        Must be called while an event loop is running, because it schedules
        an asyncio task.
        """
        # Register the real-time callback on the streaming service.
        # NOTE(review): this assigns a private attribute of StreamingService.
        if self.streaming_service:
            self.streaming_service._callback = self._web_realtime_callback

        if self.background_scheduler:
            self._spawn_background(self.background_scheduler.start_all())

    async def shutdown(self):
        """Shut down the background scheduler and stop the broker."""
        if self.background_scheduler:
            await self.background_scheduler.shutdown()
        if self.broker:
            await self.broker.stop()
        self.logger.info("웹 앱: 서비스 종료 완료")

    # --- Real-time program-trading streaming ---

    def _web_realtime_callback(self, data):
        """WebSocket callback: run the regular dispatch, then feed the web side.

        Every message is dispatched through the streaming service.  Messages
        of type ``realtime_program_trading`` are additionally enriched with
        the cached current price and handed to the real-time data service.
        """
        streaming = self.streaming_service
        if streaming:
            streaming.dispatch_realtime_message(data)

        if data.get('type') != 'realtime_program_trading':
            return

        item = data.get('data', {})
        if streaming:
            # Attach current-price information to the item.
            quote = streaming.get_cached_realtime_price(item.get('유가증권단축종목코드'))
            if quote:
                if isinstance(quote, dict):
                    for key in ('price', 'change', 'rate', 'sign'):
                        item[key] = quote.get(key)
                else:
                    item['price'] = quote

        # Processing of the item is delegated to the data service.
        self.realtime_data_service.on_data_received(item)

    async def start_program_trading(self, code: str) -> bool:
        """Start a program-trading subscription (WebSocket connect + subscribe).

        An existing subscription is reused unless the broker reports that the
        WebSocket receive task is no longer alive; in that case a reconnect is
        forced, and a fresh subscription is attempted only if the code dropped
        out of the subscribed set during the reconnect.

        Args:
            code: Stock code to subscribe.

        Returns:
            ``True`` when the code is subscribed afterwards, ``False`` on any
            failure (exceptions during subscription are logged, not raised).
        """
        if self.realtime_data_service.is_subscribed(code):
            receiver_dead = self.broker and not self.broker.is_websocket_receive_alive()
            if not receiver_dead:
                return True

            self.logger.warning(f"[프로그램매매] {code} 구독 상태이나 수신 태스크 종료됨. 재연결 시도.")
            await self.websocket_watchdog_task.force_reconnect_program_trading()

            # The reconnect may have failed and removed the code.
            if self.realtime_data_service.is_subscribed(code):
                return True
            self.logger.info(f"[프로그램매매] {code} 재연결 실패로 구독 해제됨. 신규 구독 재시도.")

        try:
            t_start = self.pm.start_timer()
            connected = await self.streaming_service.connect_websocket(self._web_realtime_callback)
            self.pm.log_timer(f"connect_websocket({code})", t_start)
            if not connected:
                self.logger.warning(f"프로그램매매 구독 실패 (WebSocket 연결 불가): {code}")
                return False

            t_sub_pt = self.pm.start_timer()
            sub_pt_success = await self.streaming_service.subscribe_program_trading(code)
            self.pm.log_timer(f"subscribe_program_trading({code})", t_sub_pt)

            t_sub_price = self.pm.start_timer()
            sub_price_success = await self.streaming_service.subscribe_realtime_price(code)
            self.pm.log_timer(f"subscribe_realtime_price({code})", t_sub_price)

            if sub_pt_success and sub_price_success:
                self.realtime_data_service.add_subscribed_code(code)
                self.logger.info(f"프로그램매매 신규 구독 성공: {code}")
                return True

            # If either subscription failed, undo the one that succeeded so
            # no half-subscribed state is left behind.
            self.logger.warning(f"프로그램매매 구독 실패 (pt: {sub_pt_success}, price: {sub_price_success}) - {code}")
            if sub_pt_success:
                await self.streaming_service.unsubscribe_program_trading(code)
            if sub_price_success:
                await self.streaming_service.unsubscribe_realtime_price(code)
            return False

        except Exception as e:
            self.logger.error(f"프로그램매매 구독 중 예외 발생 ({code}): {e}", exc_info=True)
            return False

    async def stop_program_trading(self, code: str):
        """Unsubscribe program trading and price for one code, if subscribed."""
        if not self.realtime_data_service.is_subscribed(code):
            return
        await self.streaming_service.unsubscribe_program_trading(code)
        await self.streaming_service.unsubscribe_realtime_price(code)
        self.realtime_data_service.remove_subscribed_code(code)

    async def stop_all_program_trading(self):
        """Unsubscribe every subscribed code, then clear the subscribed set."""
        # Iterate a snapshot: the underlying collection may change while the
        # unsubscribe calls are awaited.
        for code in list(self.realtime_data_service.get_subscribed_codes()):
            await self.streaming_service.unsubscribe_program_trading(code)
            await self.streaming_service.unsubscribe_realtime_price(code)
        self.realtime_data_service.clear_subscribed_codes()