Coverage for core/retry_queue/api_request_queue.py: 99%
81 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
1# core/retry_queue/api_request_queue.py
2import asyncio
3import dataclasses
4import random
5from typing import Callable, Coroutine, Any
7from core.retry_queue.retry_classifier import classify, RequestOutcome
@dataclasses.dataclass()
class QueuedRequest:
    """One API call tracked by the retry queue.

    Bundles the coroutine function with its call arguments, the future the
    caller is waiting on, and the retry bookkeeping for that call.
    """

    # Coroutine function to invoke as ``fn(*args, **kwargs)``.
    fn: Callable[..., Coroutine]
    # Positional arguments forwarded to ``fn``.
    args: tuple
    # Keyword arguments forwarded to ``fn``.
    kwargs: dict
    # Future that delivers the final result to the caller.
    future: asyncio.Future
    # Retry counter; a new request starts at 0.
    attempt: int = dataclasses.field(default=0)
    # Caller-supplied identifier (used in log messages); empty when not given.
    request_id: str = dataclasses.field(default="")
class ApiRequestQueue:
    """
    Asynchronous retry queue for query (read-only) API requests.

    The result of every attempt is passed to ``classify`` and handled by
    outcome:

    - DONE : complete the future and put ``(request, result)`` on ``_done_q``
    - RETRY: run again after an exponential backoff (each retry is its own
      asyncio.Task, so nothing blocks on the queue)
    - FAIL : complete the future with the failing result and put
      ``(request, result)`` on ``_fail_q``

    If the task working on a request is cancelled (for example by ``stop()``)
    or ends with an unexpected exception, the request's future is cancelled /
    given that exception, so a caller awaiting it never hangs.

    Note: order (trading) APIs do not use this queue because of idempotency
    concerns.
    """

    MAX_RETRIES = 5    # maximum number of executions, first try included
    BASE_DELAY = 1.0   # seconds (exponential backoff: BASE_DELAY * 2^(attempt-1))
    MAX_DELAY = 30.0   # upper bound for the backoff delay, in seconds

    def __init__(self, logger):
        """Create an empty queue that reports through *logger*.

        *logger* must provide ``info``, ``warning`` and ``error`` methods.
        """
        self._logger = logger
        self._done_q: asyncio.Queue[tuple[QueuedRequest, Any]] = asyncio.Queue()
        self._fail_q: asyncio.Queue[tuple[QueuedRequest, Any]] = asyncio.Queue()
        self._pending_tasks: set[asyncio.Task] = set()

    # ------------------------------------------------------------------
    # Public interface
    # ------------------------------------------------------------------

    async def submit(self, fn: Callable, *args, request_id: str = "", **kwargs) -> asyncio.Future:
        """
        Schedule the first attempt of ``fn(*args, **kwargs)`` right away.

        Failed attempts are retried automatically in the background. The
        returned Future receives the final result (successful or failing);
        it is cancelled if the request is abandoned by ``stop()``.
        """
        # Inside a coroutine the running loop is the one to use.
        future = asyncio.get_running_loop().create_future()
        req = QueuedRequest(fn, args, kwargs, future, attempt=0, request_id=request_id)
        self._spawn(self._execute(req), req)
        return future

    async def stop(self):
        """Cancel every pending attempt/retry task and wait for them to end.

        The futures of the abandoned requests are cancelled as a consequence,
        so their awaiters are released with ``asyncio.CancelledError``.
        """
        # Snapshot: the done-callbacks shrink the set while we are waiting.
        tasks = list(self._pending_tasks)
        for task in tasks:
            task.cancel()
        if tasks:
            await asyncio.gather(*tasks, return_exceptions=True)
        self._logger.info(f"[RetryQueue] 종료 완료 (취소된 태스크: {len(tasks)}개)")

    # ------------------------------------------------------------------
    # Internal implementation
    # ------------------------------------------------------------------

    def _spawn(self, coro: Coroutine, req: "QueuedRequest | None" = None) -> asyncio.Task:
        """Run *coro* as a tracked background task.

        When *req* is given, its future is settled if the task ends abnormally
        (cancelled or raised). A done-callback is used instead of a
        ``try/except`` in the coroutine because a task that is cancelled before
        its first step never enters the coroutine body.
        """
        task = asyncio.create_task(coro)
        self._pending_tasks.add(task)
        task.add_done_callback(self._pending_tasks.discard)
        if req is not None:
            task.add_done_callback(lambda done: self._settle_abandoned(req, done))
        return task

    @staticmethod
    def _settle_abandoned(req: "QueuedRequest", task: asyncio.Task) -> None:
        """Release the awaiter of *req* if *task* ended without doing so."""
        if req.future.done():
            return
        if task.cancelled():
            req.future.cancel()
            return
        exc = task.exception()
        if exc is not None:
            req.future.set_exception(exc)
        # Otherwise the task ended normally after scheduling a retry; the
        # retry task now owns the future.

    async def _execute(self, req: "QueuedRequest"):
        """Run one attempt of *req* and act on its classified outcome."""
        try:
            result = await req.fn(*req.args, **req.kwargs)
        except Exception as e:
            # An exception is handled like a missing response: the result
            # becomes None and classify() decides what happens next.
            self._logger.warning(
                f"[RetryQueue] 예외 발생 (id={req.request_id}, attempt={req.attempt}): {e}"
            )
            result = None

        outcome = classify(result)

        if outcome == RequestOutcome.DONE:
            self._resolve(req, result)
            await self._done_q.put((req, result))
            return

        # Every non-success path logs the response message (or a placeholder).
        msg = getattr(result, "msg1", "응답 없음")

        if outcome == RequestOutcome.RETRY:
            req.attempt += 1
            if req.attempt < self.MAX_RETRIES:
                base = min(self.BASE_DELAY * (2 ** (req.attempt - 1)), self.MAX_DELAY)
                delay = base * (0.5 + random.random() * 0.5)  # jitter: 50%-100% of base
                self._logger.warning(
                    f"[RetryQueue] {delay:.1f}초 후 재시도 "
                    f"(id={req.request_id}, {req.attempt}/{self.MAX_RETRIES}): {msg}"
                )
                self._spawn(self._delay_and_execute(req, delay), req)
                return
            # Retry budget exhausted: fall through to the failure handling.
            self._logger.error(
                f"[RetryQueue] 최종 실패 (id={req.request_id}, "
                f"시도={req.attempt}/{self.MAX_RETRIES}): {msg}"
            )
        else:  # FAIL
            self._logger.error(
                f"[RetryQueue] 재시도 불가 실패 (id={req.request_id}): {msg}"
            )

        self._resolve(req, result)
        await self._fail_q.put((req, result))

    async def _delay_and_execute(self, req: "QueuedRequest", delay: float):
        """Sleep for *delay* seconds, then run the next attempt of *req*."""
        await asyncio.sleep(delay)
        await self._execute(req)

    def _resolve(self, req: "QueuedRequest", result: Any):
        """Set *result* on the request's future unless it is already done."""
        if not req.future.done():
            req.future.set_result(result)

    # ------------------------------------------------------------------
    # For external consumers (e.g. notifications)
    # ------------------------------------------------------------------

    @property
    def done_queue(self) -> asyncio.Queue:
        """Queue of ``(request, result)`` pairs for successful requests."""
        return self._done_q

    @property
    def fail_queue(self) -> asyncio.Queue:
        """Queue of ``(request, result)`` pairs for finally failed requests."""
        return self._fail_q

    @property
    def pending_count(self) -> int:
        """Number of attempt/retry tasks that have not finished yet."""
        return len(self._pending_tasks)