Coverage for view / web / routes / program.py: 90%

100 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-04 15:08 +0000

1""" 

2프로그램매매 실시간 스트리밍 관련 API 엔드포인트 (program.html). 

3""" 

4import asyncio 

5import json 

6from fastapi import APIRouter, HTTPException, Request, WebSocket, WebSocketDisconnect 

7from fastapi.responses import StreamingResponse 

8from view.web.api_common import ( 

9 _get_ctx, _serialize_response, 

10 ProgramTradingRequest, ProgramTradingUnsubscribeRequest, ProgramTradingDataModel, 

11) 

12 

13router = APIRouter() 

14 

15 

16@router.post("/program-trading/subscribe") 

17async def subscribe_program_trading(req: ProgramTradingRequest): 

18 """프로그램매매 실시간 구독 시작 (다중 종목 추가 구독).""" 

19 ctx = _get_ctx() 

20 async with ctx.pm.profile_async(f"subscribe({req.code})"): 

21 success = await ctx.start_program_trading(req.code) 

22 if not success: 

23 raise HTTPException(status_code=500, detail="WebSocket 연결 실패") 

24 mapper = getattr(ctx, 'stock_code_repository', None) 

25 stock_name = mapper.get_name_by_code(req.code) if mapper else '' 

26 # [변경] 매니저 사용 

27 return {"success": True, "code": req.code, "stock_name": stock_name, "codes": ctx.realtime_data_service.get_subscribed_codes()} 

28 

29 

30@router.get("/program-trading/history/{code}") 

31async def get_program_trading_history(code: str): 

32 """프로그램 매매 추이 히스토리 조회 (차트용).""" 

33 ctx = _get_ctx() 

34 async with ctx.pm.profile_async(f"get_program_trading_history({code})"): 

35 t_start = ctx.pm.start_timer() 

36 resp = await ctx.streaming_service.handle_get_program_trading_history(code) 

37 result = _serialize_response(resp) 

38 

39 if result.get("rt_cd") == "0" and isinstance(result.get("data"), dict): 39 ↛ 42line 39 didn't jump to line 42 because the condition on line 39 was always true

40 mapper = getattr(ctx, 'stock_code_repository', None) 

41 result["data"]["name"] = mapper.get_name_by_code(code) if mapper else "" 

42 ctx.pm.log_timer(f"get_program_trading_history({code})", t_start) 

43 return result 

44 

45 

46@router.post("/program-trading/unsubscribe") 

47async def unsubscribe_program_trading(req: ProgramTradingUnsubscribeRequest = None): 

48 """프로그램매매 구독 해지. code 지정 시 개별 해지, 미지정 시 전체 해지.""" 

49 ctx = _get_ctx() 

50 if req and req.code: 

51 await ctx.stop_program_trading(req.code) 

52 else: 

53 await ctx.stop_all_program_trading() 

54 # [변경] 매니저 사용 

55 return {"success": True, "codes": ctx.realtime_data_service.get_subscribed_codes()} 

56 

57 

58@router.get("/program-trading/status") 

59async def get_program_trading_status(): 

60 """프로그램매매 구독 상태 확인 (시장 개장 여부 포함).""" 

61 ctx = _get_ctx() 

62 codes = ctx.realtime_data_service.get_subscribed_codes() 

63 is_market_open = await ctx.is_market_open_now() 

64 return { 

65 "subscribed": len(codes) > 0, 

66 "codes": codes, 

67 "is_market_open": is_market_open, 

68 } 

69 

70 

71@router.get("/program-trading/stream") 

72async def stream_program_trading(request: Request): 

73 """SSE 스트리밍: 프로그램매매 실시간 데이터를 브라우저에 전달 (Array 배열 전송 최적화 적용).""" 

74 ctx = _get_ctx() 

75 # 매니저를 통해 큐 생성 및 등록 

76 queue = ctx.realtime_data_service.create_subscriber_queue() 

77 

78 async def event_generator(): 

79 try: 

80 # 1. 저장된 과거 데이터 먼저 전송 (Replay) 

81 history = ctx.realtime_data_service.get_history_data() 

82 for code, items in list(history.items()): 82 ↛ 83line 82 didn't jump to line 83 because the loop on line 82 never started

83 for item in list(items): 

84 # 과거 데이터도 클라이언트가 해석할 수 있도록 Array로 변환하여 전송 

85 # 배열 순서: [종목코드, 체결시간, 현재가, 등락률, 대비, 부호, 매도체결, 매수체결, 순매수체결, 순매수대금, 매도잔량, 매수잔량] 

86 payload = [ 

87 code, 

88 item.get('주식체결시간', ''), 

89 item.get('price', 0), 

90 item.get('rate', 0), 

91 item.get('change', 0), 

92 item.get('sign', ''), 

93 item.get('매도체결량', 0), 

94 item.get('매수2체결량', 0), 

95 item.get('순매수체결량', 0), 

96 item.get('순매수거래대금', 0), 

97 item.get('매도호가잔량', 0), 

98 item.get('매수호가잔량', 0) 

99 ] 

100 yield f"data: {json.dumps(payload, ensure_ascii=False)}\n\n" 

101 await asyncio.sleep(0.0001) 

102 

103 # 2. 실시간 데이터 전송 (realtime_data_service에서 JSON 직렬화 후 큐에 삽입됨) 

104 while True: 

105 try: 

106 data = await asyncio.wait_for(queue.get(), timeout=15.0) 

107 if data is None: # 테스트 종료 신호 (Poison Pill) 

108 break 

109 yield f"data: {data}\n\n" 

110 except asyncio.TimeoutError: 

111 if await request.is_disconnected(): 111 ↛ 112line 111 didn't jump to line 112 because the condition on line 111 was never true

112 break 

113 yield ": keepalive\n\n" 

114 except asyncio.CancelledError: 

115 pass 

116 finally: 

117 # 연결 종료 시 매니저를 통해 큐 제거 

118 ctx.realtime_data_service.remove_subscriber_queue(queue) 

119 

120 return StreamingResponse(event_generator(), media_type="text/event-stream") 

121 

122 

123@router.post("/program-trading/save-data") 

124async def save_pt_data(data: ProgramTradingDataModel): 

125 """프로그램 매매 데이터를 서버 파일(data/program_subscribe/pt_data.json)에 저장""" 

126 try: 

127 # [변경] 매니저를 통해 스냅샷 저장 

128 ctx = _get_ctx() 

129 t_start = ctx.pm.start_timer() 

130 ctx.realtime_data_service.save_snapshot(data.model_dump()) 

131 ctx.pm.log_timer("save_pt_data", t_start) 

132 return {"success": True} 

133 except Exception as e: 

134 print(f"[WebAPI] PT Data Save Error: {e}") 

135 return {"success": False, "msg": str(e)} 

136 

137 

138@router.get("/program-trading/load-data") 

139async def load_pt_data(): 

140 """서버 파일에서 프로그램 매매 데이터 로드""" 

141 # [변경] 매니저를 통해 스냅샷 로드 

142 ctx = _get_ctx() 

143 t_start = ctx.pm.start_timer() 

144 data = ctx.realtime_data_service.load_snapshot() 

145 ctx.pm.log_timer("load_pt_data", t_start) 

146 

147 if data is None: 

148 return {"success": False, "msg": "File not found"} 

149 return {"success": True, "data": data} 

150 

151 

152@router.get("/program-trading/db-status") 

153async def get_db_status(): 

154 """DB 내부 상태(스냅샷 시간, 히스토리 건수 등) 조회.""" 

155 ctx = _get_ctx() 

156 return ctx.realtime_data_service.inspect_db_status() 

157 

158 

159@router.websocket("/ws/echo") 

160async def websocket_endpoint(websocket: WebSocket): 

161 """WebSocket 테스트용 에코 엔드포인트.""" 

162 await websocket.accept() 

163 try: 

164 while True: 

165 data = await websocket.receive_text() 

166 await websocket.send_text(f"Message text was: {data}") 

167 except WebSocketDisconnect: 

168 pass