Coverage for view / web / routes / system.py: 100%

125 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-04 15:08 +0000

1""" 

2시스템 상태 및 캐시 모니터링 API 엔드포인트. 

3""" 

4import asyncio 

5import time 

6from fastapi import APIRouter, HTTPException 

7from view.web.api_common import _get_ctx 

8import view.web.api_common as api_common 

9 

10router = APIRouter() 

11 

12# 태스크별 실행 스케줄 유형 

13# intraday: 장 중에만 의미있는 태스크 (장 마감 후 비활성) 

14# after_market: 장 마감 후 실행되는 배치 태스크 (장 중 비활성) 

15# always_on: 항상 동작해야 하는 실시간 태스크 

16_SCHEDULE_TYPES = { 

17 "websocket_watchdog": "intraday", 

18 "strategy_scheduler": "intraday", 

19 "ranking_refresh": "after_market", 

20 "daily_price_collector": "after_market", 

21 "ohlcv_update": "after_market", 

22 "전일기준주도주_생성": "after_market", 

23 "notification_queue_task": "always_on", 

24} 

25 

26_SCHEDULE_ORDER = { 

27 "always_on": 0, 

28 "intraday": 1, 

29 "after_market": 2, 

30 "unknown": 99, 

31} 

32 

33 

34@router.get("/cache/status") 

35async def get_cache_status(expand: bool = True): 

36 """메모리 캐시 상태 및 적중률 통계 반환""" 

37 ctx = _get_ctx() 

38 latest_trading_date = await ctx._mcs.get_latest_trading_date() if ctx._mcs else None 

39 # get_cache_stats()는 CPU 집약적 작업(대량 JSON 직렬화)이므로 스레드 풀에서 실행해 이벤트 루프 블로킹을 방지 

40 stats = await asyncio.to_thread( 

41 ctx.get_cache_stats, expand=expand, latest_trading_date=latest_trading_date 

42 ) or {} 

43 

44 if "items" not in stats: 

45 stats["items"] = [] 

46 

47 if stats.get("items"): 

48 for item in stats["items"]: 

49 code = item.get("code") 

50 if code: 

51 code_str = str(code).zfill(6) # 확실한 6자리 문자열로 변환 

52 name = ctx.stock_code_repository.get_name_by_code(code_str) 

53 item["name"] = name if name and name != code_str else code_str 

54 

55 return { 

56 "success": True, 

57 "data": stats 

58 } 

59 

60 

61@router.get("/debug/requests") 

62def get_active_requests(): 

63 """현재 서버에서 처리 중인 in-flight 요청 목록 반환 (hang 진단용). 

64 

65 ForegroundScheduler가 백그라운드 태스크 중단을 기다리거나, 

66 무거운 응답 직렬화로 이벤트 루프가 블로킹될 때 어떤 요청이 대기 중인지 확인한다. 

67 이 엔드포인트 자체는 Foreground 미들웨어를 우회하므로 hang 상태에서도 응답한다. 

68 """ 

69 try: 

70 ctx = _get_ctx() 

71 fg = getattr(ctx, "foreground_scheduler", None) 

72 except Exception: 

73 fg = None 

74 

75 now = time.monotonic() 

76 active = [ 

77 { 

78 "path": r["path"], 

79 "method": r["method"], 

80 "elapsed_sec": round(now - r["start"], 1), 

81 "query": r["query"], 

82 } 

83 for r in api_common._active_requests.values() 

84 ] 

85 active.sort(key=lambda x: x["elapsed_sec"], reverse=True) 

86 return { 

87 "success": True, 

88 "count": len(active), 

89 "foreground": { 

90 "active_count": fg.active_count if fg else 0, 

91 "is_blocking_background": fg.is_active if fg else False, 

92 }, 

93 "data": active, 

94 } 

95 

96 

97@router.get("/background/status") 

98def get_background_status(): 

99 """백그라운드 태스크 상태 및 진행률 반환""" 

100 ctx = _get_ctx() 

101 

102 fg = getattr(ctx, "foreground_scheduler", None) 

103 foreground_info = { 

104 "active_count": fg.active_count if fg else 0, 

105 "is_blocking_background": fg.is_active if fg else False, 

106 } 

107 

108 if not ctx.background_scheduler: 

109 return {"success": True, "foreground": foreground_info, "data": []} 

110 

111 result = [] 

112 for item in ctx.background_scheduler.get_all_status(): 

113 name = item["name"] 

114 task = ctx.background_scheduler.get_task(name) 

115 schedule_type = _SCHEDULE_TYPES.get(name, "unknown") 

116 

117 # --- 모듈(파일) 경로를 기반으로 스케줄 유형 동적 판별 --- 

118 schedule_type = "unknown" 

119 if task: 

120 module_name = task.__class__.__module__ # 예: "task.background.after_market.ranking_task" 

121 

122 # 특정 태스크명 우선 확인 (폴더 경로에 포함된 이름으로 인해 잘못 분류되는 것을 방지) 

123 if "strategy_scheduler" in module_name: 

124 schedule_type = "intraday" 

125 elif "after_market" in module_name: 

126 schedule_type = "after_market" 

127 elif "intraday" in module_name: 

128 schedule_type = "intraday" 

129 elif "always_on" in module_name: 

130 schedule_type = "always_on" 

131 

132 result.append({ 

133 "name": name, 

134 "state": item["state"], 

135 "priority": item["priority"], 

136 "schedule_type": schedule_type, 

137 "schedule_order": _SCHEDULE_ORDER.get(schedule_type, 99), 

138 "progress": task.get_progress() if task else None, 

139 }) 

140 

141 # 스케줄 유형(실시간 -> 장중 -> 장마감) 순서로 정렬 

142 result.sort(key=lambda x: x["schedule_order"]) 

143 return {"success": True, "foreground": foreground_info, "data": result} 

144 

145 

146# ── 장 마감 후 태스크 강제 수집 엔드포인트 ───────────────────────────── 

147 

148@router.post("/background/ranking/force-update") 

149async def force_ranking_update(): 

150 """skip 조건을 무시하고 투자자 랭킹을 강제 재수집한다.""" 

151 ctx = _get_ctx() 

152 task = getattr(ctx, "ranking_task", None) 

153 if not task: 

154 raise HTTPException(status_code=503, detail="RankingTask가 초기화되지 않았습니다") 

155 

156 progress = task.get_progress() 

157 if progress.get("running"): 

158 raise HTTPException(status_code=409, detail="이미 수집이 진행 중입니다") 

159 

160 asyncio.create_task(task.force_collect()) 

161 return {"success": True, "message": "투자자 랭킹 강제 수집이 시작되었습니다."} 

162 

163 

164@router.post("/background/daily-price/force-update") 

165async def force_daily_price_update(): 

166 """skip 조건을 무시하고 전 종목 현재가를 강제 재수집한다.""" 

167 ctx = _get_ctx() 

168 task = getattr(ctx, "daily_price_collector_task", None) 

169 if not task: 

170 raise HTTPException(status_code=503, detail="DailyPriceCollectorTask가 초기화되지 않았습니다") 

171 

172 progress = task.get_progress() 

173 if progress.get("running"): 

174 raise HTTPException(status_code=409, detail="이미 수집이 진행 중입니다") 

175 

176 asyncio.create_task(task.force_collect()) 

177 return {"success": True, "message": "현재가 강제 수집이 시작되었습니다."} 

178 

179 

180@router.get("/subscriptions/status") 

181def get_subscription_status(): 

182 """실시간 현재가 구독 현황 반환 (우선순위별 종목 + 마지막 수신 시각)""" 

183 ctx = _get_ctx() 

184 svc = getattr(ctx, "price_subscription_service", None) 

185 if not svc: 

186 return {"success": True, "data": None} 

187 

188 status = svc.get_status() 

189 

190 # 우선순위별 종목 목록 (active 여부 + 이름 + 마지막 수신 시각 부가) 

191 streaming_svc = getattr(ctx, "streaming_service", None) 

192 active_set = set(status.get("active_codes", [])) 

193 

194 def _enrich(codes: list) -> list: 

195 result = [] 

196 for code in codes: 

197 name = ctx.stock_code_repository.get_name_by_code(code) or code 

198 price_info = streaming_svc.get_cached_realtime_price(code) if streaming_svc else None 

199 received_at = None 

200 if isinstance(price_info, dict): 

201 received_at = price_info.get("received_at") 

202 result.append({ 

203 "code": code, 

204 "name": name, 

205 "active": code in active_set, 

206 "received_at": received_at, 

207 }) 

208 return result 

209 

210 by_priority = status.get("pending_by_priority", {}) 

211 return { 

212 "success": True, 

213 "data": { 

214 "active_count": status["active_count"], 

215 "max_subscriptions": status["max_subscriptions"], 

216 "pending_count": status["pending_count"], 

217 "HIGH": _enrich(by_priority.get("HIGH", [])), 

218 "MEDIUM": _enrich(by_priority.get("MEDIUM", [])), 

219 "LOW": _enrich(by_priority.get("LOW", [])), 

220 }, 

221 } 

222 

223 

224@router.post("/background/watchlist/force-update") 

225async def force_watchlist_update(): 

226 """skip 조건을 무시하고 전일 기준 우량주를 강제 재생성한다.""" 

227 ctx = _get_ctx() 

228 task = getattr(ctx, "premium_watchlist_generator_task", None) 

229 if not task: 

230 raise HTTPException(status_code=503, detail="PremiumWatchlistGeneratorTask가 초기화되지 않았습니다") 

231 

232 progress = task.get_progress() 

233 if progress.get("running"): 

234 raise HTTPException(status_code=409, detail="이미 생성이 진행 중입니다") 

235 

236 asyncio.create_task(task.force_generate()) 

237 return {"success": True, "message": "전일기준우량주 강제 생성이 시작되었습니다."} 

238 

239 

240@router.post("/background/cache-warmup/force-update") 

241async def force_cache_warmup(): 

242 """skip 조건을 무시하고 캐시 웜업을 강제 실행한다.""" 

243 ctx = _get_ctx() 

244 task = getattr(ctx, "cache_warmup_task", None) 

245 if not task: 

246 raise HTTPException(status_code=503, detail="CacheWarmupTask가 초기화되지 않았습니다") 

247 

248 progress = task.get_progress() 

249 if progress.get("running"): 

250 raise HTTPException(status_code=409, detail="이미 웜업이 진행 중입니다") 

251 

252 asyncio.create_task(task.force_warmup()) 

253 return {"success": True, "message": "캐시 웜업이 시작되었습니다."}