Coverage for services / order_execution_service.py: 94%

146 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-04 15:08 +0000

1# app/order_execution_service.py 

2import asyncio 

3from typing import Optional 

4from common.types import ErrorCode, ResCommonResponse, Exchange 

5from core.performance_profiler import PerformanceProfiler 

6from core.market_clock import MarketClock 

7from services.notification_service import NotificationService, NotificationCategory, NotificationLevel 

8from services.market_calendar_service import MarketCalendarService 

9from services.price_subscription_service import SubscriptionPriority 

10 

11 

12class OrderExecutionService: 

13 """ 

14 주식 매수/매도 주문 및 실시간 체결가/호가 구독 관련 핸들러를 관리하는 클래스입니다. 

15 TradingService, Logger, MarketClock 인스턴스를 주입받아 사용합니다. 

16 """ 

17 

18 _ORDER_MAX_RETRIES = 5 

19 _ORDER_RETRY_DELAY_SEC = 3 

20 

21 def __init__(self, broker_api_wrapper, logger, 

22 market_clock: Optional[MarketClock] = None, 

23 performance_profiler: Optional[PerformanceProfiler] = None, 

24 notification_service: Optional[NotificationService] = None, 

25 market_calendar_service: Optional[MarketCalendarService] = None, 

26 price_subscription_service=None): 

27 self.broker_api_wrapper = broker_api_wrapper 

28 self.logger = logger 

29 self.market_clock = market_clock 

30 self.pm = performance_profiler if performance_profiler else PerformanceProfiler(enabled=False) 

31 self._notification_service = notification_service 

32 self.market_calendar_service = market_calendar_service 

33 self._price_sub_svc = price_subscription_service 

34 

35 async def _retry_order(self, order_fn, stock_code, price, qty) -> ResCommonResponse: 

36 """재시도 가능한 오류에 대해 주문 API를 재시도.""" 

37 last_result = None 

38 for attempt in range(1, self._ORDER_MAX_RETRIES + 1): 38 ↛ 59line 38 didn't jump to line 59 because the loop on line 38 didn't complete

39 result: ResCommonResponse = await order_fn(stock_code, price, qty) 

40 if result and result.rt_cd == ErrorCode.SUCCESS.value: 

41 return result 

42 last_result = result 

43 

44 error_code = None 

45 if result: 45 ↛ 51line 45 didn't jump to line 51 because the condition on line 45 was always true

46 try: 

47 error_code = ErrorCode(result.rt_cd) 

48 except ValueError: 

49 pass 

50 

51 if error_code and error_code.is_retriable and attempt < self._ORDER_MAX_RETRIES: 

52 self.logger.warning( 

53 f"주문 재시도 {attempt}/{self._ORDER_MAX_RETRIES}: " 

54 f"{stock_code}, 사유: {result.msg1}" 

55 ) 

56 await self.market_clock.async_sleep(self._ORDER_RETRY_DELAY_SEC * attempt) 

57 continue 

58 break 

59 return last_result 

60 

61 async def _execute_order_via_broker(self, stock_code, price, qty, is_buy: bool, exchange: Exchange = Exchange.KRX) -> ResCommonResponse: 

62 action_str = "매수" if is_buy else "매도" 

63 self.logger.info(f"OrderExecutionService - 주식 {action_str} 주문 요청 - 종목: {stock_code}, 수량: {qty}, 가격: {price}") 

64 try: 

65 return await self.broker_api_wrapper.place_stock_order(stock_code, price, qty, is_buy=is_buy, exchange=exchange) 

66 except Exception as e: 

67 self.logger.exception(f"{action_str} 주문 중 오류 발생: {str(e)}") 

68 return ResCommonResponse(rt_cd=ErrorCode.UNKNOWN_ERROR.value, msg1=f"{action_str} 주문 처리 중 예외 발생: {str(e)}", data=None) 

69 

70 async def handle_place_buy_order(self, stock_code, price, qty, exchange: Exchange = Exchange.KRX): 

71 """주식 매수 주문 요청 및 결과 출력.""" 

72 t_start = self.pm.start_timer() 

73 if self.market_calendar_service and not await self.market_calendar_service.is_market_open_now(): 

74 self.logger.warning("시장이 닫혀 있어 매수 주문을 제출하지 못했습니다.") 

75 return ResCommonResponse(rt_cd=ErrorCode.MARKET_CLOSED.value, msg1="장 마감 시간에는 주문할 수 없습니다.", data=None) 

76 # Fallback if market_calendar_service is not available (though it should be) 

77 elif not self.market_calendar_service and not self.market_clock.is_market_operating_hours(): 77 ↛ 78line 77 didn't jump to line 78 because the condition on line 77 was never true

78 return ResCommonResponse(rt_cd=ErrorCode.MARKET_CLOSED.value, msg1="장 마감 시간에는 주문할 수 없습니다.", data=None) 

79 

80 buy_order_result: ResCommonResponse = await self._retry_order( 

81 lambda c, p, q: self._execute_order_via_broker(c, p, q, is_buy=True, exchange=exchange), stock_code, price, qty 

82 ) 

83 if buy_order_result and buy_order_result.rt_cd == ErrorCode.SUCCESS.value: 

84 self.logger.info( 

85 f"주식 매수 주문 성공: 종목={stock_code}, 수량={qty}, 결과={{'rt_cd': '{buy_order_result.rt_cd}', 'msg1': '{buy_order_result.msg1}'}}") 

86 if self._price_sub_svc: 

87 asyncio.create_task(self._price_sub_svc.add_subscription( 

88 stock_code, SubscriptionPriority.HIGH, "portfolio" 

89 )) 

90 if self._notification_service: 

91 await self._notification_service.emit(NotificationCategory.API, NotificationLevel.INFO, "매수 주문 성공", 

92 f"{stock_code} {qty}주 @ {price}원", 

93 metadata={"code": stock_code, "qty": qty, "price": price}) 

94 else: 

95 rt_cd = buy_order_result.rt_cd if buy_order_result else 'None' 

96 msg1 = buy_order_result.msg1 if buy_order_result else '응답 없음' 

97 self.logger.error( 

98 f"주식 매수 주문 실패: 종목={stock_code}, 결과={{'rt_cd': '{rt_cd}', 'msg1': '{msg1}'}}") 

99 if self._notification_service: 99 ↛ 100line 99 didn't jump to line 100 because the condition on line 99 was never true

100 await self._notification_service.emit(NotificationCategory.SYSTEM, NotificationLevel.ERROR, "매수 주문 실패", 

101 f"{stock_code} - {msg1}", 

102 metadata={"code": stock_code, "error": msg1}) 

103 self.pm.log_timer(f"OrderExecutionService.handle_place_buy_order({stock_code})", t_start) 

104 return buy_order_result 

105 

106 async def handle_place_sell_order(self, stock_code, price, qty, exchange: Exchange = Exchange.KRX): 

107 """주식 매도 주문 요청 및 결과 출력.""" 

108 t_start = self.pm.start_timer() 

109 if self.market_calendar_service and not await self.market_calendar_service.is_market_open_now(): 

110 self.logger.warning("시장이 닫혀 있어 매도 주문을 제출하지 못했습니다.") 

111 return ResCommonResponse(rt_cd=ErrorCode.MARKET_CLOSED.value, msg1="장 마감 시간에는 주문할 수 없습니다.", data=None) 

112 # Fallback if market_calendar_service is not available 

113 elif not self.market_calendar_service and not self.market_clock.is_market_operating_hours(): 113 ↛ 114line 113 didn't jump to line 114 because the condition on line 113 was never true

114 return ResCommonResponse(rt_cd=ErrorCode.MARKET_CLOSED.value, msg1="장 마감 시간에는 주문할 수 없습니다.", data=None) 

115 

116 sell_order_result: ResCommonResponse = await self._retry_order( 

117 lambda c, p, q: self._execute_order_via_broker(c, p, q, is_buy=False, exchange=exchange), stock_code, price, qty 

118 ) 

119 if sell_order_result and sell_order_result.rt_cd == ErrorCode.SUCCESS.value: 

120 self.logger.info( 

121 f"주식 매도 주문 성공: 종목={stock_code}, 수량={qty}, 결과={{'rt_cd': '{sell_order_result.rt_cd}', 'msg1': '{sell_order_result.msg1}'}}") 

122 if self._price_sub_svc: 

123 asyncio.create_task(self._price_sub_svc.remove_subscription( 

124 stock_code, "portfolio" 

125 )) 

126 if self._notification_service: 

127 await self._notification_service.emit(NotificationCategory.API, NotificationLevel.INFO, "매도 주문 성공", 

128 f"{stock_code} {qty}주 @ {price}원", 

129 metadata={"code": stock_code, "qty": qty, "price": price}) 

130 else: 

131 rt_cd = sell_order_result.rt_cd if sell_order_result else 'None' 

132 msg1 = sell_order_result.msg1 if sell_order_result else '응답 없음' 

133 self.logger.error( 

134 f"주식 매도 주문 실패: 종목={stock_code}, 결과={{'rt_cd': '{rt_cd}', 'msg1': '{msg1}'}}") 

135 if self._notification_service: 

136 await self._notification_service.emit(NotificationCategory.SYSTEM, NotificationLevel.ERROR, "매도 주문 실패", 

137 f"{stock_code} - {msg1}", 

138 metadata={"code": stock_code, "error": msg1}) 

139 self.pm.log_timer(f"OrderExecutionService.handle_place_sell_order({stock_code})", t_start) 

140 return sell_order_result 

141 

142 async def handle_buy_stock(self, stock_code, qty_input, price_input, exchange: Exchange = Exchange.KRX): 

143 """ 

144 사용자 입력을 받아 주식 매수 주문을 처리합니다. 

145 trading_app.py의 '3'번 옵션에 매핑됩니다. 

146 """ 

147 

148 try: 

149 qty = int(qty_input) 

150 price = int(price_input) 

151 except ValueError: 

152 msg = f"잘못된 매수 입력: 수량={qty_input}, 가격={price_input}" 

153 self.logger.warning(msg) 

154 return ResCommonResponse(rt_cd=ErrorCode.INVALID_INPUT.value, msg1=msg, data=None) 

155 

156 # handle_place_buy_order 호출 

157 return await self.handle_place_buy_order(stock_code, price, qty, exchange=exchange) 

158 

159 async def handle_sell_stock(self, stock_code, qty_input, price_input, exchange: Exchange = Exchange.KRX): 

160 """ 

161 사용자 입력을 받아 주식 매도 주문을 처리합니다. 

162 trading_app.py의 '4'번 옵션에 매핑됩니다. 

163 """ 

164 try: 

165 qty = int(qty_input) 

166 price = int(price_input) 

167 except ValueError: 

168 msg = f"잘못된 매도 입력: 수량={qty_input}, 가격={price_input}" 

169 self.logger.warning(msg) 

170 return ResCommonResponse(rt_cd=ErrorCode.INVALID_INPUT.value, msg1=msg, data=None) 

171 

172 # handle_place_sell_order 호출 

173 return await self.handle_place_sell_order(stock_code, price, qty, exchange=exchange) 

174 

175 async def handle_realtime_price_quote_stream(self, stock_code): 

176 """ 

177 실시간 주식 체결가/호가 스트림을 시작하고, 

178 사용자 입력이 있을 때까지 데이터를 수신합니다. 

179 """ 

180 self.logger.info(f"\n--- 실시간 주식 체결가/호가 구독 시작 ({stock_code}) ---") 

181 

182 # 콜백 함수 정의 

183 def realtime_data_display_callback(data): 

184 if isinstance(data, dict): 

185 data_type = data.get('type') 

186 output = data.get('data', {}) 

187 

188 if data_type == 'realtime_price': # 주식 체결 

189 current_price = output.get('STCK_PRPR', 'N/A') 

190 acml_vol = output.get('ACML_VOL', 'N/A') 

191 trade_time = output.get('STCK_CNTG_HOUR', 'N/A') 

192 change_val = output.get('PRDY_VRSS', 'N/A') 

193 change_sign = output.get('PRDY_VRSS_SIGN', 'N/A') 

194 change_rate = output.get('PRDY_CTRT', 'N/A') 

195 

196 display_message = ( 

197 f"\r[실시간 체결 - {trade_time}] 종목: {stock_code}: 현재가 {current_price}원, " 

198 f"전일대비: {change_sign}{change_val} ({change_rate}%), 누적량: {acml_vol}" 

199 ) 

200 self.logger.debug(f"\r{display_message}{' ' * (80 - len(display_message))}", end="") 

201 elif data_type == 'realtime_quote': # 주식 호가 

202 askp1 = output.get('매도호가1', 'N/A') 

203 bidp1 = output.get('매수호가1', 'N/A') 

204 trade_time = output.get('영업시간', 'N/A') 

205 display_message = ( 

206 f"\r[실시간 호가 - {trade_time}] 종목: {stock_code}: 매도1: {askp1}, 매수1: {bidp1}{' ' * 20}" 

207 ) 

208 self.logger.debug(f"\r{display_message}{' ' * (80 - len(display_message))}", end="") 

209 elif data_type == 'signing_notice': # 체결 통보 

210 order_num = output.get('주문번호', 'N/A') 

211 trade_qty = output.get('체결수량', 'N/A') 

212 trade_price = output.get('체결단가', 'N/A') 

213 trade_time = output.get('주식체결시간', 'N/A') 

214 self.logger.debug(f"\n[체결통보] 주문: {order_num}, 수량: {trade_qty}, 단가: {trade_price}, 시간: {trade_time}") 

215 else: 

216 self.logger.debug(f"처리되지 않은 실시간 메시지: {data.get('tr_id')} - {data}") 

217 

218 # 웹소켓 연결 및 구독 요청 

219 if await self.broker_api_wrapper.connect_websocket(on_message_callback=realtime_data_display_callback): 

220 await self.broker_api_wrapper.subscribe_realtime_price(stock_code) 

221 await self.broker_api_wrapper.subscribe_realtime_quote(stock_code) 

222 

223 try: 

224 await asyncio.to_thread(input) 

225 

226 except KeyboardInterrupt: 

227 self.logger.info("실시간 구독 중단 (KeyboardInterrupt).") 

228 finally: 

229 await self.broker_api_wrapper.unsubscribe_realtime_price(stock_code) 

230 await self.broker_api_wrapper.unsubscribe_realtime_quote(stock_code) 

231 await self.broker_api_wrapper.disconnect_websocket() 

232 self.logger.info(f"실시간 주식 스트림 종료: 종목={stock_code}") 

233 else: 

234 self.logger.error("실시간 웹소켓 연결 실패.")