Coverage for view / web / routes / program.py: 90%
100 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
1"""
2프로그램매매 실시간 스트리밍 관련 API 엔드포인트 (program.html).
3"""
4import asyncio
5import json
6from fastapi import APIRouter, HTTPException, Request, WebSocket, WebSocketDisconnect
7from fastapi.responses import StreamingResponse
8from view.web.api_common import (
9 _get_ctx, _serialize_response,
10 ProgramTradingRequest, ProgramTradingUnsubscribeRequest, ProgramTradingDataModel,
11)
13router = APIRouter()
16@router.post("/program-trading/subscribe")
17async def subscribe_program_trading(req: ProgramTradingRequest):
18 """프로그램매매 실시간 구독 시작 (다중 종목 추가 구독)."""
19 ctx = _get_ctx()
20 async with ctx.pm.profile_async(f"subscribe({req.code})"):
21 success = await ctx.start_program_trading(req.code)
22 if not success:
23 raise HTTPException(status_code=500, detail="WebSocket 연결 실패")
24 mapper = getattr(ctx, 'stock_code_repository', None)
25 stock_name = mapper.get_name_by_code(req.code) if mapper else ''
26 # [변경] 매니저 사용
27 return {"success": True, "code": req.code, "stock_name": stock_name, "codes": ctx.realtime_data_service.get_subscribed_codes()}
30@router.get("/program-trading/history/{code}")
31async def get_program_trading_history(code: str):
32 """프로그램 매매 추이 히스토리 조회 (차트용)."""
33 ctx = _get_ctx()
34 async with ctx.pm.profile_async(f"get_program_trading_history({code})"):
35 t_start = ctx.pm.start_timer()
36 resp = await ctx.streaming_service.handle_get_program_trading_history(code)
37 result = _serialize_response(resp)
39 if result.get("rt_cd") == "0" and isinstance(result.get("data"), dict): 39 ↛ 42line 39 didn't jump to line 42 because the condition on line 39 was always true
40 mapper = getattr(ctx, 'stock_code_repository', None)
41 result["data"]["name"] = mapper.get_name_by_code(code) if mapper else ""
42 ctx.pm.log_timer(f"get_program_trading_history({code})", t_start)
43 return result
46@router.post("/program-trading/unsubscribe")
47async def unsubscribe_program_trading(req: ProgramTradingUnsubscribeRequest = None):
48 """프로그램매매 구독 해지. code 지정 시 개별 해지, 미지정 시 전체 해지."""
49 ctx = _get_ctx()
50 if req and req.code:
51 await ctx.stop_program_trading(req.code)
52 else:
53 await ctx.stop_all_program_trading()
54 # [변경] 매니저 사용
55 return {"success": True, "codes": ctx.realtime_data_service.get_subscribed_codes()}
58@router.get("/program-trading/status")
59async def get_program_trading_status():
60 """프로그램매매 구독 상태 확인 (시장 개장 여부 포함)."""
61 ctx = _get_ctx()
62 codes = ctx.realtime_data_service.get_subscribed_codes()
63 is_market_open = await ctx.is_market_open_now()
64 return {
65 "subscribed": len(codes) > 0,
66 "codes": codes,
67 "is_market_open": is_market_open,
68 }
71@router.get("/program-trading/stream")
72async def stream_program_trading(request: Request):
73 """SSE 스트리밍: 프로그램매매 실시간 데이터를 브라우저에 전달 (Array 배열 전송 최적화 적용)."""
74 ctx = _get_ctx()
75 # 매니저를 통해 큐 생성 및 등록
76 queue = ctx.realtime_data_service.create_subscriber_queue()
78 async def event_generator():
79 try:
80 # 1. 저장된 과거 데이터 먼저 전송 (Replay)
81 history = ctx.realtime_data_service.get_history_data()
82 for code, items in list(history.items()): 82 ↛ 83line 82 didn't jump to line 83 because the loop on line 82 never started
83 for item in list(items):
84 # 과거 데이터도 클라이언트가 해석할 수 있도록 Array로 변환하여 전송
85 # 배열 순서: [종목코드, 체결시간, 현재가, 등락률, 대비, 부호, 매도체결, 매수체결, 순매수체결, 순매수대금, 매도잔량, 매수잔량]
86 payload = [
87 code,
88 item.get('주식체결시간', ''),
89 item.get('price', 0),
90 item.get('rate', 0),
91 item.get('change', 0),
92 item.get('sign', ''),
93 item.get('매도체결량', 0),
94 item.get('매수2체결량', 0),
95 item.get('순매수체결량', 0),
96 item.get('순매수거래대금', 0),
97 item.get('매도호가잔량', 0),
98 item.get('매수호가잔량', 0)
99 ]
100 yield f"data: {json.dumps(payload, ensure_ascii=False)}\n\n"
101 await asyncio.sleep(0.0001)
103 # 2. 실시간 데이터 전송 (realtime_data_service에서 JSON 직렬화 후 큐에 삽입됨)
104 while True:
105 try:
106 data = await asyncio.wait_for(queue.get(), timeout=15.0)
107 if data is None: # 테스트 종료 신호 (Poison Pill)
108 break
109 yield f"data: {data}\n\n"
110 except asyncio.TimeoutError:
111 if await request.is_disconnected(): 111 ↛ 112line 111 didn't jump to line 112 because the condition on line 111 was never true
112 break
113 yield ": keepalive\n\n"
114 except asyncio.CancelledError:
115 pass
116 finally:
117 # 연결 종료 시 매니저를 통해 큐 제거
118 ctx.realtime_data_service.remove_subscriber_queue(queue)
120 return StreamingResponse(event_generator(), media_type="text/event-stream")
123@router.post("/program-trading/save-data")
124async def save_pt_data(data: ProgramTradingDataModel):
125 """프로그램 매매 데이터를 서버 파일(data/program_subscribe/pt_data.json)에 저장"""
126 try:
127 # [변경] 매니저를 통해 스냅샷 저장
128 ctx = _get_ctx()
129 t_start = ctx.pm.start_timer()
130 ctx.realtime_data_service.save_snapshot(data.model_dump())
131 ctx.pm.log_timer("save_pt_data", t_start)
132 return {"success": True}
133 except Exception as e:
134 print(f"[WebAPI] PT Data Save Error: {e}")
135 return {"success": False, "msg": str(e)}
138@router.get("/program-trading/load-data")
139async def load_pt_data():
140 """서버 파일에서 프로그램 매매 데이터 로드"""
141 # [변경] 매니저를 통해 스냅샷 로드
142 ctx = _get_ctx()
143 t_start = ctx.pm.start_timer()
144 data = ctx.realtime_data_service.load_snapshot()
145 ctx.pm.log_timer("load_pt_data", t_start)
147 if data is None:
148 return {"success": False, "msg": "File not found"}
149 return {"success": True, "data": data}
152@router.get("/program-trading/db-status")
153async def get_db_status():
154 """DB 내부 상태(스냅샷 시간, 히스토리 건수 등) 조회."""
155 ctx = _get_ctx()
156 return ctx.realtime_data_service.inspect_db_status()
159@router.websocket("/ws/echo")
160async def websocket_endpoint(websocket: WebSocket):
161 """WebSocket 테스트용 에코 엔드포인트."""
162 await websocket.accept()
163 try:
164 while True:
165 data = await websocket.receive_text()
166 await websocket.send_text(f"Message text was: {data}")
167 except WebSocketDisconnect:
168 pass