Coverage for core / retry_queue / api_request_queue.py: 99%

81 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-04 15:08 +0000

1# core/retry_queue/api_request_queue.py 

2import asyncio 

3import dataclasses 

4import random 

5from typing import Callable, Coroutine, Any 

6 

7from core.retry_queue.retry_classifier import classify, RequestOutcome 

8 

9 

10@dataclasses.dataclass 

11class QueuedRequest: 

12 fn: Callable[..., Coroutine] 

13 args: tuple 

14 kwargs: dict 

15 future: asyncio.Future # 호출자에게 최종 결과를 전달하는 Future 

16 attempt: int = 0 

17 request_id: str = "" 

18 

19 

20class ApiRequestQueue: 

21 """ 

22 조회 API 요청의 비동기 재시도 큐. 

23 

24 - 성공(DONE) : future 완료 + _done_q 에 적재 

25 - 재시도(RETRY): 지수 백오프 후 재실행 (asyncio.Task 생성, 큐 블로킹 없음) 

26 - 최종 실패(FAIL): future 완료(실패 결과) + _fail_q 에 적재 

27 

28 주의: 주문(trading) API 는 멱등성 문제로 이 큐를 사용하지 않습니다. 

29 """ 

30 

31 MAX_RETRIES = 5 

32 BASE_DELAY = 1.0 # 초 (지수 백오프: BASE_DELAY * 2^(attempt-1)) 

33 MAX_DELAY = 30.0 # 최대 지연 

34 

35 def __init__(self, logger): 

36 self._logger = logger 

37 self._done_q: asyncio.Queue[tuple[QueuedRequest, Any]] = asyncio.Queue() 

38 self._fail_q: asyncio.Queue[tuple[QueuedRequest, Any]] = asyncio.Queue() 

39 self._pending_tasks: set[asyncio.Task] = set() 

40 

41 # ------------------------------------------------------------------ 

42 # 공개 인터페이스 

43 # ------------------------------------------------------------------ 

44 

45 async def submit(self, fn: Callable, *args, request_id: str = "", **kwargs) -> asyncio.Future: 

46 """ 

47 요청을 즉시 실행합니다. 

48 실패 시 백그라운드에서 자동 재시도하며, 최종 결과를 Future로 반환합니다. 

49 """ 

50 loop = asyncio.get_event_loop() 

51 future = loop.create_future() 

52 req = QueuedRequest(fn, args, kwargs, future, attempt=0, request_id=request_id) 

53 self._spawn(self._execute(req)) 

54 return future 

55 

56 async def stop(self): 

57 """대기 중인 모든 재시도 태스크를 취소합니다.""" 

58 tasks = list(self._pending_tasks) 

59 for task in tasks: 

60 task.cancel() 

61 if tasks: 

62 await asyncio.gather(*tasks, return_exceptions=True) 

63 self._logger.info(f"[RetryQueue] 종료 완료 (취소된 태스크: {len(tasks)}개)") 

64 

65 # ------------------------------------------------------------------ 

66 # 내부 구현 

67 # ------------------------------------------------------------------ 

68 

69 def _spawn(self, coro: Coroutine) -> asyncio.Task: 

70 task = asyncio.create_task(coro) 

71 self._pending_tasks.add(task) 

72 task.add_done_callback(self._pending_tasks.discard) 

73 return task 

74 

75 async def _execute(self, req: QueuedRequest): 

76 try: 

77 result = await req.fn(*req.args, **req.kwargs) 

78 except Exception as e: 

79 self._logger.warning( 

80 f"[RetryQueue] 예외 발생 (id={req.request_id}, attempt={req.attempt}): {e}" 

81 ) 

82 result = None 

83 

84 outcome = classify(result) 

85 

86 if outcome == RequestOutcome.DONE: 

87 self._resolve(req, result) 

88 await self._done_q.put((req, result)) 

89 

90 elif outcome == RequestOutcome.RETRY: 

91 req.attempt += 1 

92 if req.attempt >= self.MAX_RETRIES: 

93 msg = getattr(result, "msg1", "응답 없음") 

94 self._logger.error( 

95 f"[RetryQueue] 최종 실패 (id={req.request_id}, " 

96 f"시도={req.attempt}/{self.MAX_RETRIES}): {msg}" 

97 ) 

98 self._resolve(req, result) 

99 await self._fail_q.put((req, result)) 

100 else: 

101 base = min(self.BASE_DELAY * (2 ** (req.attempt - 1)), self.MAX_DELAY) 

102 delay = base * (0.5 + random.random() * 0.5) # [50%, 100%] jitter 

103 msg = getattr(result, "msg1", "응답 없음") 

104 self._logger.warning( 

105 f"[RetryQueue] {delay:.1f}초 후 재시도 " 

106 f"(id={req.request_id}, {req.attempt}/{self.MAX_RETRIES}): {msg}" 

107 ) 

108 self._spawn(self._delay_and_execute(req, delay)) 

109 

110 else: # FAIL 

111 msg = getattr(result, "msg1", "응답 없음") 

112 self._logger.error( 

113 f"[RetryQueue] 재시도 불가 실패 (id={req.request_id}): {msg}" 

114 ) 

115 self._resolve(req, result) 

116 await self._fail_q.put((req, result)) 

117 

118 async def _delay_and_execute(self, req: QueuedRequest, delay: float): 

119 await asyncio.sleep(delay) 

120 await self._execute(req) 

121 

122 def _resolve(self, req: QueuedRequest, result: Any): 

123 """Future 가 아직 완료되지 않은 경우에만 결과를 설정합니다.""" 

124 if not req.future.done(): 124 ↛ exitline 124 didn't return from function '_resolve' because the condition on line 124 was always true

125 req.future.set_result(result) 

126 

127 # ------------------------------------------------------------------ 

128 # 외부 소비자용 (알림 등에 활용 가능) 

129 # ------------------------------------------------------------------ 

130 

131 @property 

132 def done_queue(self) -> asyncio.Queue: 

133 return self._done_q 

134 

135 @property 

136 def fail_queue(self) -> asyncio.Queue: 

137 return self._fail_q 

138 

139 @property 

140 def pending_count(self) -> int: 

141 return len(self._pending_tasks)