Coverage for view / web / routes / scheduler.py: 92%

98 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-04 15:08 +0000

1""" 

2전략 스케줄러 제어 API 엔드포인트 (scheduler.html). 

3""" 

4import asyncio 

5from fastapi import APIRouter, HTTPException, Request 

6from fastapi.responses import StreamingResponse 

7from pydantic import BaseModel 

8from view.web.api_common import _get_ctx 

9 

10router = APIRouter() 

11 

12class UpdateMaxPositionsRequest(BaseModel): 

13 max_positions: int 

14 

15 

16@router.get("/scheduler/status") 

17async def get_scheduler_status(): 

18 """스케줄러 상태 조회.""" 

19 ctx = _get_ctx() 

20 if not ctx.scheduler: 

21 return {"running": False, "strategies": []} 

22 

23 status = ctx.scheduler.get_status() 

24 

25 # [BugFix] 보유 종목명 보정 

26 mapper = getattr(ctx, 'stock_code_repository', None) 

27 if mapper: 27 ↛ 36line 27 didn't jump to line 36 because the condition on line 27 was always true

28 for s in status.get("strategies", []): 

29 for hold in s.get("holdings", []): 

30 code = str(hold.get("code", "")) 

31 if code: 31 ↛ 29line 31 didn't jump to line 29 because the condition on line 31 was always true

32 real_name = mapper.get_name_by_code(code) 

33 if real_name: 33 ↛ 29line 33 didn't jump to line 29 because the condition on line 33 was always true

34 hold["name"] = real_name 

35 

36 return status 

37 

38 

39@router.post("/scheduler/start") 

40async def start_scheduler(): 

41 """스케줄러 시작 (상태 저장 — 재시작 시 자동 복원).""" 

42 ctx = _get_ctx() 

43 if not ctx.scheduler: 

44 raise HTTPException(status_code=503, detail="스케줄러가 초기화되지 않았습니다") 

45 await ctx.scheduler.start() 

46 ctx.scheduler._save_scheduler_state() 

47 return {"success": True, "status": ctx.scheduler.get_status()} 

48 

49 

50@router.post("/scheduler/stop") 

51async def stop_scheduler(): 

52 """스케줄러 정지 (수동 정지 — 재시작 시 자동 실행 안 함).""" 

53 ctx = _get_ctx() 

54 if not ctx.scheduler: 

55 raise HTTPException(status_code=503, detail="스케줄러가 초기화되지 않았습니다") 

56 await ctx.scheduler.stop(save_state=False) 

57 ctx.scheduler.clear_saved_state() 

58 return {"success": True, "status": ctx.scheduler.get_status()} 

59 

60 

61@router.post("/scheduler/strategy/{name:path}/start") 

62async def start_strategy(name: str): 

63 """개별 전략 활성화 (상태 저장 — 재시작 시 자동 복원).""" 

64 ctx = _get_ctx() 

65 if not ctx.scheduler: 

66 raise HTTPException(status_code=503, detail="스케줄러가 초기화되지 않았습니다") 

67 if not await ctx.scheduler.start_strategy(name): 

68 raise HTTPException(status_code=404, detail=f"전략 '{name}'을 찾을 수 없습니다") 

69 ctx.scheduler._save_scheduler_state() 

70 return {"success": True, "status": ctx.scheduler.get_status()} 

71 

72 

73@router.post("/scheduler/strategy/{name:path}/stop") 

74async def stop_strategy(name: str): 

75 """개별 전략 비활성화 (상태 저장 — 재시작 시 반영).""" 

76 ctx = _get_ctx() 

77 if not ctx.scheduler: 

78 raise HTTPException(status_code=503, detail="스케줄러가 초기화되지 않았습니다") 

79 if not await ctx.scheduler.stop_strategy(name): 

80 raise HTTPException(status_code=404, detail=f"전략 '{name}'을 찾을 수 없습니다") 

81 ctx.scheduler._save_scheduler_state() 

82 return {"success": True, "status": ctx.scheduler.get_status()} 

83 

84 

85@router.post("/scheduler/strategy/{name:path}/max-positions") 

86async def update_strategy_max_positions(name: str, req: UpdateMaxPositionsRequest): 

87 """개별 전략의 최대 포지션 수 동적 변경.""" 

88 ctx = _get_ctx() 

89 if not ctx.scheduler: 

90 raise HTTPException(status_code=503, detail="스케줄러가 초기화되지 않았습니다") 

91 

92 success = await ctx.scheduler.update_max_positions(name, req.max_positions) 

93 if not success: 

94 raise HTTPException(status_code=400, detail="최대 포지션 수 변경 실패 (1 이상이어야 함)") 

95 return {"success": True, "status": ctx.scheduler.get_status()} 

96 

97@router.get("/scheduler/history") 

98async def get_scheduler_history(strategy: str = None): 

99 """스케줄러 시그널 실행 이력 조회. ?strategy=전략명 으로 필터 가능.""" 

100 ctx = _get_ctx() 

101 t_start = ctx.pm.start_timer() 

102 if not ctx.scheduler: 

103 return {"history": []} 

104 

105 history = ctx.scheduler.get_signal_history(strategy) 

106 

107 ctx.pm.log_timer("get_scheduler_history", t_start) 

108 return {"history": history} 

109 

110@router.get("/scheduler/stream") 

111async def stream_scheduler_signals(request: Request): 

112 """SSE 스트리밍: 스케줄러 시그널 실행 이력을 실시간으로 브라우저에 전달.""" 

113 ctx = _get_ctx() 

114 if not ctx.scheduler: 

115 return StreamingResponse( 

116 iter([": no scheduler\n\n"]), media_type="text/event-stream" 

117 ) 

118 

119 queue = ctx.scheduler.create_subscriber_queue() 

120 

121 async def event_generator(): 

122 try: 

123 while True: 

124 try: 

125 data = await asyncio.wait_for(queue.get(), timeout=15) 

126 if data is None: 

127 break 

128 yield f"data: {data}\n\n" 

129 except asyncio.TimeoutError: 

130 if await request.is_disconnected(): 

131 break 

132 yield ": keepalive\n\n" 

133 except asyncio.CancelledError: 

134 pass 

135 finally: 

136 ctx.scheduler.remove_subscriber_queue(queue) 

137 

138 return StreamingResponse(event_generator(), media_type="text/event-stream") 

139 

140