Coverage for services / order_execution_service.py: 94%
146 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
1# app/order_execution_service.py
2import asyncio
3from typing import Optional
4from common.types import ErrorCode, ResCommonResponse, Exchange
5from core.performance_profiler import PerformanceProfiler
6from core.market_clock import MarketClock
7from services.notification_service import NotificationService, NotificationCategory, NotificationLevel
8from services.market_calendar_service import MarketCalendarService
9from services.price_subscription_service import SubscriptionPriority
12class OrderExecutionService:
13 """
14 주식 매수/매도 주문 및 실시간 체결가/호가 구독 관련 핸들러를 관리하는 클래스입니다.
15 TradingService, Logger, MarketClock 인스턴스를 주입받아 사용합니다.
16 """
18 _ORDER_MAX_RETRIES = 5
19 _ORDER_RETRY_DELAY_SEC = 3
21 def __init__(self, broker_api_wrapper, logger,
22 market_clock: Optional[MarketClock] = None,
23 performance_profiler: Optional[PerformanceProfiler] = None,
24 notification_service: Optional[NotificationService] = None,
25 market_calendar_service: Optional[MarketCalendarService] = None,
26 price_subscription_service=None):
27 self.broker_api_wrapper = broker_api_wrapper
28 self.logger = logger
29 self.market_clock = market_clock
30 self.pm = performance_profiler if performance_profiler else PerformanceProfiler(enabled=False)
31 self._notification_service = notification_service
32 self.market_calendar_service = market_calendar_service
33 self._price_sub_svc = price_subscription_service
35 async def _retry_order(self, order_fn, stock_code, price, qty) -> ResCommonResponse:
36 """재시도 가능한 오류에 대해 주문 API를 재시도."""
37 last_result = None
38 for attempt in range(1, self._ORDER_MAX_RETRIES + 1): 38 ↛ 59line 38 didn't jump to line 59 because the loop on line 38 didn't complete
39 result: ResCommonResponse = await order_fn(stock_code, price, qty)
40 if result and result.rt_cd == ErrorCode.SUCCESS.value:
41 return result
42 last_result = result
44 error_code = None
45 if result: 45 ↛ 51line 45 didn't jump to line 51 because the condition on line 45 was always true
46 try:
47 error_code = ErrorCode(result.rt_cd)
48 except ValueError:
49 pass
51 if error_code and error_code.is_retriable and attempt < self._ORDER_MAX_RETRIES:
52 self.logger.warning(
53 f"주문 재시도 {attempt}/{self._ORDER_MAX_RETRIES}: "
54 f"{stock_code}, 사유: {result.msg1}"
55 )
56 await self.market_clock.async_sleep(self._ORDER_RETRY_DELAY_SEC * attempt)
57 continue
58 break
59 return last_result
61 async def _execute_order_via_broker(self, stock_code, price, qty, is_buy: bool, exchange: Exchange = Exchange.KRX) -> ResCommonResponse:
62 action_str = "매수" if is_buy else "매도"
63 self.logger.info(f"OrderExecutionService - 주식 {action_str} 주문 요청 - 종목: {stock_code}, 수량: {qty}, 가격: {price}")
64 try:
65 return await self.broker_api_wrapper.place_stock_order(stock_code, price, qty, is_buy=is_buy, exchange=exchange)
66 except Exception as e:
67 self.logger.exception(f"{action_str} 주문 중 오류 발생: {str(e)}")
68 return ResCommonResponse(rt_cd=ErrorCode.UNKNOWN_ERROR.value, msg1=f"{action_str} 주문 처리 중 예외 발생: {str(e)}", data=None)
70 async def handle_place_buy_order(self, stock_code, price, qty, exchange: Exchange = Exchange.KRX):
71 """주식 매수 주문 요청 및 결과 출력."""
72 t_start = self.pm.start_timer()
73 if self.market_calendar_service and not await self.market_calendar_service.is_market_open_now():
74 self.logger.warning("시장이 닫혀 있어 매수 주문을 제출하지 못했습니다.")
75 return ResCommonResponse(rt_cd=ErrorCode.MARKET_CLOSED.value, msg1="장 마감 시간에는 주문할 수 없습니다.", data=None)
76 # Fallback if market_calendar_service is not available (though it should be)
77 elif not self.market_calendar_service and not self.market_clock.is_market_operating_hours(): 77 ↛ 78line 77 didn't jump to line 78 because the condition on line 77 was never true
78 return ResCommonResponse(rt_cd=ErrorCode.MARKET_CLOSED.value, msg1="장 마감 시간에는 주문할 수 없습니다.", data=None)
80 buy_order_result: ResCommonResponse = await self._retry_order(
81 lambda c, p, q: self._execute_order_via_broker(c, p, q, is_buy=True, exchange=exchange), stock_code, price, qty
82 )
83 if buy_order_result and buy_order_result.rt_cd == ErrorCode.SUCCESS.value:
84 self.logger.info(
85 f"주식 매수 주문 성공: 종목={stock_code}, 수량={qty}, 결과={{'rt_cd': '{buy_order_result.rt_cd}', 'msg1': '{buy_order_result.msg1}'}}")
86 if self._price_sub_svc:
87 asyncio.create_task(self._price_sub_svc.add_subscription(
88 stock_code, SubscriptionPriority.HIGH, "portfolio"
89 ))
90 if self._notification_service:
91 await self._notification_service.emit(NotificationCategory.API, NotificationLevel.INFO, "매수 주문 성공",
92 f"{stock_code} {qty}주 @ {price}원",
93 metadata={"code": stock_code, "qty": qty, "price": price})
94 else:
95 rt_cd = buy_order_result.rt_cd if buy_order_result else 'None'
96 msg1 = buy_order_result.msg1 if buy_order_result else '응답 없음'
97 self.logger.error(
98 f"주식 매수 주문 실패: 종목={stock_code}, 결과={{'rt_cd': '{rt_cd}', 'msg1': '{msg1}'}}")
99 if self._notification_service: 99 ↛ 100line 99 didn't jump to line 100 because the condition on line 99 was never true
100 await self._notification_service.emit(NotificationCategory.SYSTEM, NotificationLevel.ERROR, "매수 주문 실패",
101 f"{stock_code} - {msg1}",
102 metadata={"code": stock_code, "error": msg1})
103 self.pm.log_timer(f"OrderExecutionService.handle_place_buy_order({stock_code})", t_start)
104 return buy_order_result
106 async def handle_place_sell_order(self, stock_code, price, qty, exchange: Exchange = Exchange.KRX):
107 """주식 매도 주문 요청 및 결과 출력."""
108 t_start = self.pm.start_timer()
109 if self.market_calendar_service and not await self.market_calendar_service.is_market_open_now():
110 self.logger.warning("시장이 닫혀 있어 매도 주문을 제출하지 못했습니다.")
111 return ResCommonResponse(rt_cd=ErrorCode.MARKET_CLOSED.value, msg1="장 마감 시간에는 주문할 수 없습니다.", data=None)
112 # Fallback if market_calendar_service is not available
113 elif not self.market_calendar_service and not self.market_clock.is_market_operating_hours(): 113 ↛ 114line 113 didn't jump to line 114 because the condition on line 113 was never true
114 return ResCommonResponse(rt_cd=ErrorCode.MARKET_CLOSED.value, msg1="장 마감 시간에는 주문할 수 없습니다.", data=None)
116 sell_order_result: ResCommonResponse = await self._retry_order(
117 lambda c, p, q: self._execute_order_via_broker(c, p, q, is_buy=False, exchange=exchange), stock_code, price, qty
118 )
119 if sell_order_result and sell_order_result.rt_cd == ErrorCode.SUCCESS.value:
120 self.logger.info(
121 f"주식 매도 주문 성공: 종목={stock_code}, 수량={qty}, 결과={{'rt_cd': '{sell_order_result.rt_cd}', 'msg1': '{sell_order_result.msg1}'}}")
122 if self._price_sub_svc:
123 asyncio.create_task(self._price_sub_svc.remove_subscription(
124 stock_code, "portfolio"
125 ))
126 if self._notification_service:
127 await self._notification_service.emit(NotificationCategory.API, NotificationLevel.INFO, "매도 주문 성공",
128 f"{stock_code} {qty}주 @ {price}원",
129 metadata={"code": stock_code, "qty": qty, "price": price})
130 else:
131 rt_cd = sell_order_result.rt_cd if sell_order_result else 'None'
132 msg1 = sell_order_result.msg1 if sell_order_result else '응답 없음'
133 self.logger.error(
134 f"주식 매도 주문 실패: 종목={stock_code}, 결과={{'rt_cd': '{rt_cd}', 'msg1': '{msg1}'}}")
135 if self._notification_service:
136 await self._notification_service.emit(NotificationCategory.SYSTEM, NotificationLevel.ERROR, "매도 주문 실패",
137 f"{stock_code} - {msg1}",
138 metadata={"code": stock_code, "error": msg1})
139 self.pm.log_timer(f"OrderExecutionService.handle_place_sell_order({stock_code})", t_start)
140 return sell_order_result
142 async def handle_buy_stock(self, stock_code, qty_input, price_input, exchange: Exchange = Exchange.KRX):
143 """
144 사용자 입력을 받아 주식 매수 주문을 처리합니다.
145 trading_app.py의 '3'번 옵션에 매핑됩니다.
146 """
148 try:
149 qty = int(qty_input)
150 price = int(price_input)
151 except ValueError:
152 msg = f"잘못된 매수 입력: 수량={qty_input}, 가격={price_input}"
153 self.logger.warning(msg)
154 return ResCommonResponse(rt_cd=ErrorCode.INVALID_INPUT.value, msg1=msg, data=None)
156 # handle_place_buy_order 호출
157 return await self.handle_place_buy_order(stock_code, price, qty, exchange=exchange)
159 async def handle_sell_stock(self, stock_code, qty_input, price_input, exchange: Exchange = Exchange.KRX):
160 """
161 사용자 입력을 받아 주식 매도 주문을 처리합니다.
162 trading_app.py의 '4'번 옵션에 매핑됩니다.
163 """
164 try:
165 qty = int(qty_input)
166 price = int(price_input)
167 except ValueError:
168 msg = f"잘못된 매도 입력: 수량={qty_input}, 가격={price_input}"
169 self.logger.warning(msg)
170 return ResCommonResponse(rt_cd=ErrorCode.INVALID_INPUT.value, msg1=msg, data=None)
172 # handle_place_sell_order 호출
173 return await self.handle_place_sell_order(stock_code, price, qty, exchange=exchange)
175 async def handle_realtime_price_quote_stream(self, stock_code):
176 """
177 실시간 주식 체결가/호가 스트림을 시작하고,
178 사용자 입력이 있을 때까지 데이터를 수신합니다.
179 """
180 self.logger.info(f"\n--- 실시간 주식 체결가/호가 구독 시작 ({stock_code}) ---")
182 # 콜백 함수 정의
183 def realtime_data_display_callback(data):
184 if isinstance(data, dict):
185 data_type = data.get('type')
186 output = data.get('data', {})
188 if data_type == 'realtime_price': # 주식 체결
189 current_price = output.get('STCK_PRPR', 'N/A')
190 acml_vol = output.get('ACML_VOL', 'N/A')
191 trade_time = output.get('STCK_CNTG_HOUR', 'N/A')
192 change_val = output.get('PRDY_VRSS', 'N/A')
193 change_sign = output.get('PRDY_VRSS_SIGN', 'N/A')
194 change_rate = output.get('PRDY_CTRT', 'N/A')
196 display_message = (
197 f"\r[실시간 체결 - {trade_time}] 종목: {stock_code}: 현재가 {current_price}원, "
198 f"전일대비: {change_sign}{change_val} ({change_rate}%), 누적량: {acml_vol}"
199 )
200 self.logger.debug(f"\r{display_message}{' ' * (80 - len(display_message))}", end="")
201 elif data_type == 'realtime_quote': # 주식 호가
202 askp1 = output.get('매도호가1', 'N/A')
203 bidp1 = output.get('매수호가1', 'N/A')
204 trade_time = output.get('영업시간', 'N/A')
205 display_message = (
206 f"\r[실시간 호가 - {trade_time}] 종목: {stock_code}: 매도1: {askp1}, 매수1: {bidp1}{' ' * 20}"
207 )
208 self.logger.debug(f"\r{display_message}{' ' * (80 - len(display_message))}", end="")
209 elif data_type == 'signing_notice': # 체결 통보
210 order_num = output.get('주문번호', 'N/A')
211 trade_qty = output.get('체결수량', 'N/A')
212 trade_price = output.get('체결단가', 'N/A')
213 trade_time = output.get('주식체결시간', 'N/A')
214 self.logger.debug(f"\n[체결통보] 주문: {order_num}, 수량: {trade_qty}, 단가: {trade_price}, 시간: {trade_time}")
215 else:
216 self.logger.debug(f"처리되지 않은 실시간 메시지: {data.get('tr_id')} - {data}")
218 # 웹소켓 연결 및 구독 요청
219 if await self.broker_api_wrapper.connect_websocket(on_message_callback=realtime_data_display_callback):
220 await self.broker_api_wrapper.subscribe_realtime_price(stock_code)
221 await self.broker_api_wrapper.subscribe_realtime_quote(stock_code)
223 try:
224 await asyncio.to_thread(input)
226 except KeyboardInterrupt:
227 self.logger.info("실시간 구독 중단 (KeyboardInterrupt).")
228 finally:
229 await self.broker_api_wrapper.unsubscribe_realtime_price(stock_code)
230 await self.broker_api_wrapper.unsubscribe_realtime_quote(stock_code)
231 await self.broker_api_wrapper.disconnect_websocket()
232 self.logger.info(f"실시간 주식 스트림 종료: 종목={stock_code}")
233 else:
234 self.logger.error("실시간 웹소켓 연결 실패.")