Coverage for core / retry_queue / client_with_retry_queue.py: 100%

17 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-04 15:08 +0000

1# core/retry_queue/client_with_retry_queue.py 

2import asyncio 

3from core.retry_queue.api_request_queue import ApiRequestQueue 

4 

5 

6# 주문(멱등성 우려) 및 WebSocket(상태 기반) 메서드는 큐를 통하지 않고 직접 호출 

7_EXCLUDED_METHODS = frozenset({ 

8 # --- Trading (멱등성 보장 불가) --- 

9 "place_stock_order", 

10 

11 # --- WebSocket (상태 기반, 재시도 의미 없음) --- 

12 "connect_websocket", 

13 "disconnect_websocket", 

14 "subscribe_realtime_price", 

15 "unsubscribe_realtime_price", 

16 "subscribe_realtime_quote", 

17 "unsubscribe_realtime_quote", 

18 "subscribe_program_trading", 

19 "unsubscribe_program_trading", 

20 "subscribe_unified_price", 

21 "unsubscribe_unified_price", 

22 "is_websocket_receive_alive", 

23}) 

24 

25 

26class ClientWithRetryQueue: 

27 """ 

28 BrokerAPIWrapper._client 를 감싸는 retry-queue 프록시. 

29 

30 - 조회/계좌 API: submit() 을 통해 실패 시 자동 재시도 

31 - 주문/WebSocket API: 큐 우회, 직접 위임 (기존 동작 유지) 

32 """ 

33 

34 def __init__(self, client, queue: ApiRequestQueue): 

35 self._client = client 

36 self._queue = queue 

37 

38 def __getattr__(self, name: str): 

39 attr = getattr(self._client, name) 

40 

41 # 동기 메서드 또는 제외 목록 → 그대로 반환 

42 if name in _EXCLUDED_METHODS or not asyncio.iscoroutinefunction(attr): 

43 return attr 

44 

45 # 비동기 조회 메서드 → 큐를 통해 실행 

46 async def queued(*args, **kwargs): 

47 future = await self._queue.submit(attr, *args, request_id=name, **kwargs) 

48 return await future 

49 

50 return queued 

51 

52 

53def retry_queue_wrap_client(client, queue: ApiRequestQueue) -> ClientWithRetryQueue: 

54 """BrokerAPIWrapper.__init__ 에서 호출하는 팩토리 함수.""" 

55 return ClientWithRetryQueue(client, queue)