Coverage for view / web / routes / system.py: 100%
125 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
1"""
2시스템 상태 및 캐시 모니터링 API 엔드포인트.
3"""
4import asyncio
5import time
6from fastapi import APIRouter, HTTPException
7from view.web.api_common import _get_ctx
8import view.web.api_common as api_common
10router = APIRouter()
12# 태스크별 실행 스케줄 유형
13# intraday: 장 중에만 의미있는 태스크 (장 마감 후 비활성)
14# after_market: 장 마감 후 실행되는 배치 태스크 (장 중 비활성)
15# always_on: 항상 동작해야 하는 실시간 태스크
16_SCHEDULE_TYPES = {
17 "websocket_watchdog": "intraday",
18 "strategy_scheduler": "intraday",
19 "ranking_refresh": "after_market",
20 "daily_price_collector": "after_market",
21 "ohlcv_update": "after_market",
22 "전일기준주도주_생성": "after_market",
23 "notification_queue_task": "always_on",
24}
26_SCHEDULE_ORDER = {
27 "always_on": 0,
28 "intraday": 1,
29 "after_market": 2,
30 "unknown": 99,
31}
34@router.get("/cache/status")
35async def get_cache_status(expand: bool = True):
36 """메모리 캐시 상태 및 적중률 통계 반환"""
37 ctx = _get_ctx()
38 latest_trading_date = await ctx._mcs.get_latest_trading_date() if ctx._mcs else None
39 # get_cache_stats()는 CPU 집약적 작업(대량 JSON 직렬화)이므로 스레드 풀에서 실행해 이벤트 루프 블로킹을 방지
40 stats = await asyncio.to_thread(
41 ctx.get_cache_stats, expand=expand, latest_trading_date=latest_trading_date
42 ) or {}
44 if "items" not in stats:
45 stats["items"] = []
47 if stats.get("items"):
48 for item in stats["items"]:
49 code = item.get("code")
50 if code:
51 code_str = str(code).zfill(6) # 확실한 6자리 문자열로 변환
52 name = ctx.stock_code_repository.get_name_by_code(code_str)
53 item["name"] = name if name and name != code_str else code_str
55 return {
56 "success": True,
57 "data": stats
58 }
61@router.get("/debug/requests")
62def get_active_requests():
63 """현재 서버에서 처리 중인 in-flight 요청 목록 반환 (hang 진단용).
65 ForegroundScheduler가 백그라운드 태스크 중단을 기다리거나,
66 무거운 응답 직렬화로 이벤트 루프가 블로킹될 때 어떤 요청이 대기 중인지 확인한다.
67 이 엔드포인트 자체는 Foreground 미들웨어를 우회하므로 hang 상태에서도 응답한다.
68 """
69 try:
70 ctx = _get_ctx()
71 fg = getattr(ctx, "foreground_scheduler", None)
72 except Exception:
73 fg = None
75 now = time.monotonic()
76 active = [
77 {
78 "path": r["path"],
79 "method": r["method"],
80 "elapsed_sec": round(now - r["start"], 1),
81 "query": r["query"],
82 }
83 for r in api_common._active_requests.values()
84 ]
85 active.sort(key=lambda x: x["elapsed_sec"], reverse=True)
86 return {
87 "success": True,
88 "count": len(active),
89 "foreground": {
90 "active_count": fg.active_count if fg else 0,
91 "is_blocking_background": fg.is_active if fg else False,
92 },
93 "data": active,
94 }
97@router.get("/background/status")
98def get_background_status():
99 """백그라운드 태스크 상태 및 진행률 반환"""
100 ctx = _get_ctx()
102 fg = getattr(ctx, "foreground_scheduler", None)
103 foreground_info = {
104 "active_count": fg.active_count if fg else 0,
105 "is_blocking_background": fg.is_active if fg else False,
106 }
108 if not ctx.background_scheduler:
109 return {"success": True, "foreground": foreground_info, "data": []}
111 result = []
112 for item in ctx.background_scheduler.get_all_status():
113 name = item["name"]
114 task = ctx.background_scheduler.get_task(name)
115 schedule_type = _SCHEDULE_TYPES.get(name, "unknown")
117 # --- 모듈(파일) 경로를 기반으로 스케줄 유형 동적 판별 ---
118 schedule_type = "unknown"
119 if task:
120 module_name = task.__class__.__module__ # 예: "task.background.after_market.ranking_task"
122 # 특정 태스크명 우선 확인 (폴더 경로에 포함된 이름으로 인해 잘못 분류되는 것을 방지)
123 if "strategy_scheduler" in module_name:
124 schedule_type = "intraday"
125 elif "after_market" in module_name:
126 schedule_type = "after_market"
127 elif "intraday" in module_name:
128 schedule_type = "intraday"
129 elif "always_on" in module_name:
130 schedule_type = "always_on"
132 result.append({
133 "name": name,
134 "state": item["state"],
135 "priority": item["priority"],
136 "schedule_type": schedule_type,
137 "schedule_order": _SCHEDULE_ORDER.get(schedule_type, 99),
138 "progress": task.get_progress() if task else None,
139 })
141 # 스케줄 유형(실시간 -> 장중 -> 장마감) 순서로 정렬
142 result.sort(key=lambda x: x["schedule_order"])
143 return {"success": True, "foreground": foreground_info, "data": result}
146# ── 장 마감 후 태스크 강제 수집 엔드포인트 ─────────────────────────────
148@router.post("/background/ranking/force-update")
149async def force_ranking_update():
150 """skip 조건을 무시하고 투자자 랭킹을 강제 재수집한다."""
151 ctx = _get_ctx()
152 task = getattr(ctx, "ranking_task", None)
153 if not task:
154 raise HTTPException(status_code=503, detail="RankingTask가 초기화되지 않았습니다")
156 progress = task.get_progress()
157 if progress.get("running"):
158 raise HTTPException(status_code=409, detail="이미 수집이 진행 중입니다")
160 asyncio.create_task(task.force_collect())
161 return {"success": True, "message": "투자자 랭킹 강제 수집이 시작되었습니다."}
164@router.post("/background/daily-price/force-update")
165async def force_daily_price_update():
166 """skip 조건을 무시하고 전 종목 현재가를 강제 재수집한다."""
167 ctx = _get_ctx()
168 task = getattr(ctx, "daily_price_collector_task", None)
169 if not task:
170 raise HTTPException(status_code=503, detail="DailyPriceCollectorTask가 초기화되지 않았습니다")
172 progress = task.get_progress()
173 if progress.get("running"):
174 raise HTTPException(status_code=409, detail="이미 수집이 진행 중입니다")
176 asyncio.create_task(task.force_collect())
177 return {"success": True, "message": "현재가 강제 수집이 시작되었습니다."}
180@router.get("/subscriptions/status")
181def get_subscription_status():
182 """실시간 현재가 구독 현황 반환 (우선순위별 종목 + 마지막 수신 시각)"""
183 ctx = _get_ctx()
184 svc = getattr(ctx, "price_subscription_service", None)
185 if not svc:
186 return {"success": True, "data": None}
188 status = svc.get_status()
190 # 우선순위별 종목 목록 (active 여부 + 이름 + 마지막 수신 시각 부가)
191 streaming_svc = getattr(ctx, "streaming_service", None)
192 active_set = set(status.get("active_codes", []))
194 def _enrich(codes: list) -> list:
195 result = []
196 for code in codes:
197 name = ctx.stock_code_repository.get_name_by_code(code) or code
198 price_info = streaming_svc.get_cached_realtime_price(code) if streaming_svc else None
199 received_at = None
200 if isinstance(price_info, dict):
201 received_at = price_info.get("received_at")
202 result.append({
203 "code": code,
204 "name": name,
205 "active": code in active_set,
206 "received_at": received_at,
207 })
208 return result
210 by_priority = status.get("pending_by_priority", {})
211 return {
212 "success": True,
213 "data": {
214 "active_count": status["active_count"],
215 "max_subscriptions": status["max_subscriptions"],
216 "pending_count": status["pending_count"],
217 "HIGH": _enrich(by_priority.get("HIGH", [])),
218 "MEDIUM": _enrich(by_priority.get("MEDIUM", [])),
219 "LOW": _enrich(by_priority.get("LOW", [])),
220 },
221 }
224@router.post("/background/watchlist/force-update")
225async def force_watchlist_update():
226 """skip 조건을 무시하고 전일 기준 우량주를 강제 재생성한다."""
227 ctx = _get_ctx()
228 task = getattr(ctx, "premium_watchlist_generator_task", None)
229 if not task:
230 raise HTTPException(status_code=503, detail="PremiumWatchlistGeneratorTask가 초기화되지 않았습니다")
232 progress = task.get_progress()
233 if progress.get("running"):
234 raise HTTPException(status_code=409, detail="이미 생성이 진행 중입니다")
236 asyncio.create_task(task.force_generate())
237 return {"success": True, "message": "전일기준우량주 강제 생성이 시작되었습니다."}
240@router.post("/background/cache-warmup/force-update")
241async def force_cache_warmup():
242 """skip 조건을 무시하고 캐시 웜업을 강제 실행한다."""
243 ctx = _get_ctx()
244 task = getattr(ctx, "cache_warmup_task", None)
245 if not task:
246 raise HTTPException(status_code=503, detail="CacheWarmupTask가 초기화되지 않았습니다")
248 progress = task.get_progress()
249 if progress.get("running"):
250 raise HTTPException(status_code=409, detail="이미 웜업이 진행 중입니다")
252 asyncio.create_task(task.force_warmup())
253 return {"success": True, "message": "캐시 웜업이 시작되었습니다."}