Coverage for task / background / after_market / ranking_task.py: 87%
318 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
1# task/background/ranking_task.py
2"""
3랭킹 데이터 수집 및 캐시 관리 태스크.
4전체 종목 순회가 필요한 랭킹 집계(외국인/기관/개인 순매수 등)와
5장마감 후 기본 랭킹 캐시를 관리한다.
6"""
7import asyncio
8import logging
9import time
10from datetime import datetime
11from typing import List, Dict, Optional, TYPE_CHECKING
13from brokers.broker_api_wrapper import BrokerAPIWrapper
14from brokers.korea_investment.korea_invest_env import KoreaInvestApiEnv
15from common.types import ResCommonResponse, ErrorCode
16from core.market_clock import MarketClock
17from task.background.after_market.after_market_task_base import AfterMarketTask
18from interfaces.schedulable_task import TaskState
19from services.market_calendar_service import MarketCalendarService
20from repositories.stock_code_repository import StockCodeRepository
21from core.performance_profiler import PerformanceProfiler
22from services.telegram_notifier import TelegramReporter
23from services.notification_service import NotificationService, NotificationCategory, NotificationLevel
26def _chunked(lst, size):
27 for i in range(0, len(lst), size):
28 yield lst[i:i + size]
31# ETF/ETN 브랜드명 접두사 (TradingService._ETF_PREFIXES 와 동일)
32_ETF_PREFIXES = (
33 "KODEX", "TIGER", "KBSTAR", "ARIRANG", "SOL", "ACE",
34 "HANARO", "KOSEF", "PLUS", "TIMEFOLIO", "WON", "FOCUS",
35 "VITA", "TREX", "MASTER", "WOORI", "KINDEX",
36)
39class RankingTask(AfterMarketTask):
40 """랭킹 데이터를 수집·캐시하는 백그라운드 태스크."""
42 # 청크 크기 및 레이트 리밋
43 API_CHUNK_SIZE = 8
44 CHUNK_SLEEP_SEC = 1.1
46 def __init__(
47 self,
48 broker_api_wrapper: BrokerAPIWrapper,
49 stock_code_repository: StockCodeRepository,
50 env: KoreaInvestApiEnv = None,
51 logger=None,
52 market_clock: MarketClock = None,
53 market_data_service=None,
54 performance_profiler: Optional[PerformanceProfiler] = None,
55 notification_service: Optional[NotificationService] = None,
56 telegram_reporter: Optional[TelegramReporter] = None,
57 market_calendar_service: Optional[MarketCalendarService] = None,
58 ):
59 super().__init__(
60 mcs=market_calendar_service,
61 market_clock=market_clock,
62 logger=logger or logging.getLogger(__name__),
63 )
64 self._broker = broker_api_wrapper
65 self.stock_code_repository = stock_code_repository
66 self._env = env
67 self._market_data_service = market_data_service
68 self.pm = performance_profiler if performance_profiler else PerformanceProfiler(enabled=False)
69 self._notification_service = notification_service
70 self._telegram_reporter = telegram_reporter
71 self._suspend_event: asyncio.Event = asyncio.Event()
72 self._suspend_event.set() # 초기에는 실행 가능 상태
74 # 투자자별 순매수 랭킹 캐시
75 self._foreign_net_buy_cache: List[Dict] = []
76 self._foreign_net_sell_cache: List[Dict] = []
77 self._inst_net_buy_cache: List[Dict] = []
78 self._inst_net_sell_cache: List[Dict] = []
79 self._prsn_net_buy_cache: List[Dict] = []
80 self._prsn_net_sell_cache: List[Dict] = []
81 self._trading_value_cache: List[Dict] = [] # 거래대금 랭킹 (투자자 데이터 기반)
82 # 프로그램 매매 랭킹 캐시
83 self._program_net_buy_cache: List[Dict] = []
84 self._program_net_sell_cache: List[Dict] = []
85 self._investor_ranking_updated_at: Optional[datetime] = None
86 self._is_refreshing: bool = False
87 self._last_collected_date: Optional[str] = None
89 # 기본 랭킹 캐시 (상승/하락/거래량/거래대금) — 장마감 후 1회
90 self._basic_ranking_cache: Dict[str, ResCommonResponse] = {}
91 self._basic_ranking_updated_at: Optional[datetime] = None
92 self._basic_last_collected_date: Optional[str] = None
94 # 진행률 상태
95 self._progress: Dict = {
96 "running": False,
97 "processed": 0,
98 "total": 0,
99 "collected": 0,
100 "elapsed": 0.0,
101 }
103 # ── SchedulableTask 인터페이스 구현 ────────────────────────
105 @property
106 def task_name(self) -> str:
107 return "ranking_refresh"
109 @property
110 def _scheduler_label(self) -> str:
111 return "RankingTask"
113 async def start(self) -> None:
114 """장마감 후 자동 갱신 스케줄러 시작."""
115 if self._state == TaskState.RUNNING:
116 return
117 self._state = TaskState.RUNNING
118 self._suspend_event.set()
120 self._tasks.append(
121 asyncio.create_task(self.start_after_market_scheduler())
122 )
123 self._logger.info(f"RankingTask 시작: {len(self._tasks)}개 태스크")
125 async def suspend(self) -> None:
126 """랭킹 수집을 일시 중지한다 (chunk 사이에서 대기)."""
127 if self._state == TaskState.RUNNING:
128 self._suspend_event.clear()
129 self._state = TaskState.SUSPENDED
130 self._logger.info("RankingTask 일시 중지")
132 async def resume(self) -> None:
133 """일시 중지된 랭킹 수집을 재개한다."""
134 if self._state == TaskState.SUSPENDED:
135 self._suspend_event.set()
136 self._state = TaskState.RUNNING
137 self._logger.info("RankingTask 재개")
139 # ── 장마감 후 자동 갱신 스케줄러 ────────────────────────────
141 async def start_after_market_scheduler(self) -> None:
142 """장마감 후 자동으로 랭킹 갱신을 스케줄링하는 루프."""
143 await self._after_market_scheduler()
145 async def _on_market_closed(self, latest_trading_date: str) -> None:
146 """장 마감 후 콜백: 해당 거래일의 랭킹 갱신이 필요하면 실행."""
147 needs_basic = (
148 not self._basic_last_collected_date
149 or self._basic_last_collected_date != latest_trading_date
150 )
151 needs_investor = (
152 not self._last_collected_date
153 or self._last_collected_date != latest_trading_date
154 )
156 if needs_basic: 156 ↛ 159line 156 didn't jump to line 159 because the condition on line 156 was always true
157 await self.refresh_basic_ranking()
158 self._basic_last_collected_date = latest_trading_date
159 if needs_investor: 159 ↛ exitline 159 didn't return from function '_on_market_closed' because the condition on line 159 was always true
160 await self.refresh_investor_ranking()
161 self._last_collected_date = latest_trading_date
163 # ── 기본 랭킹 캐시 (상승/하락/거래량/거래대금) ───────────────
165 async def refresh_basic_ranking(self) -> None:
166 """상승률/하락률/거래량/거래대금 랭킹을 1회 조회하여 캐시."""
167 if not self._market_data_service:
168 self._logger.warning("MarketDataService 미설정 — 기본 랭킹 캐시 스킵")
169 return
171 t_start = self.pm.start_timer()
172 self._logger.info("기본 랭킹 캐시 갱신 시작 (상승/하락/거래량/거래대금)")
173 try:
174 rise_resp, fall_resp, vol_resp, tv_resp = await asyncio.gather(
175 self._market_data_service.get_top_rise_fall_stocks(True),
176 self._market_data_service.get_top_rise_fall_stocks(False),
177 self._market_data_service.get_top_volume_stocks(),
178 self._market_data_service.get_top_trading_value_stocks(),
179 return_exceptions=True,
180 )
181 for key, resp in [("rise", rise_resp), ("fall", fall_resp),
182 ("volume", vol_resp), ("trading_value", tv_resp)]:
183 if isinstance(resp, Exception):
184 self._logger.error(f"기본 랭킹 '{key}' 조회 실패: {resp}")
185 else:
186 self._basic_ranking_cache[key] = resp
188 self._basic_ranking_updated_at = datetime.now()
189 self._logger.info(f"기본 랭킹 캐시 갱신 완료: {list(self._basic_ranking_cache.keys())}")
190 self.pm.log_timer("RankingTask.refresh_basic_ranking", t_start, threshold=1.0)
191 if self._notification_service:
192 await self._notification_service.emit(
193 NotificationCategory.BACKGROUND, NotificationLevel.INFO, "기본 랭킹 갱신 완료",
194 f"상승/하락/거래량/거래대금 캐시 갱신 완료",
195 )
196 except Exception as e:
197 self._logger.error(f"기본 랭킹 캐시 갱신 실패: {e}", exc_info=True)
198 if self._notification_service: 198 ↛ exitline 198 didn't return from function 'refresh_basic_ranking' because the condition on line 198 was always true
199 await self._notification_service.emit(NotificationCategory.SYSTEM, NotificationLevel.ERROR, "기본 랭킹 갱신 실패", str(e))
201 def get_progress(self) -> Dict:
202 """태스크 진행률 반환 (SchedulableTask 인터페이스 구현)."""
203 return dict(self._progress)
205 def get_investor_ranking_progress(self) -> Dict:
206 """투자자 랭킹 수집 진행률 반환."""
207 return self.get_progress()
209 def get_basic_ranking_cache(self, category: str) -> Optional[ResCommonResponse]:
210 """장마감 후 캐시된 기본 랭킹 반환. 캐시 없으면 None."""
211 return self._basic_ranking_cache.get(category)
213 # ── 투자자별 순매수/순매도 랭킹 ────────────────────────────
215 async def _fetch_with_retry(self, api_call, *args, **kwargs):
216 """API 호출을 재시도 로직으로 감싸는 헬퍼."""
217 t_start = self.pm.start_timer()
218 max_retries = 3
219 delay = 1.0 # 초
220 for attempt in range(max_retries):
221 try:
222 resp = await api_call(*args, **kwargs)
223 if resp and resp.rt_cd == ErrorCode.SUCCESS.value:
224 self.pm.log_timer(f"RankingTask._fetch_with_retry({api_call.__name__}, {args[0]})", t_start)
225 return resp
227 error_msg = resp.msg1 if resp else "응답 없음"
228 self._logger.warning(
229 f"API 호출 실패 (시도 {attempt + 1}/{max_retries}): {api_call.__name__}({args[0]}), 사유: {error_msg}. {delay}초 후 재시도."
230 )
231 except Exception as e:
232 self._logger.error(
233 f"API 호출 예외 (시도 {attempt + 1}/{max_retries}): {api_call.__name__}({args[0]}), 오류: {e}. {delay}초 후 재시도.",
234 exc_info=True
235 )
237 if attempt < max_retries - 1:
238 await asyncio.sleep(delay)
239 delay *= 1.5 # 약간의 지수 백오프
241 self._logger.error(f"API 호출 최종 실패: {api_call.__name__}({args[0]})")
242 self.pm.log_timer(f"RankingTask._fetch_with_retry({api_call.__name__}, {args[0]}) [최종실패]", t_start)
243 return None # 최종 실패 시 None 반환
245 async def refresh_investor_ranking(self, force: bool = False) -> None:
246 """전체 종목을 순회하여 외국인/기관/개인 순매수/순매도 랭킹을 갱신한다."""
247 # [성능 보호] 장 중에는 실행하지 않음
248 if self._mcs and await self._mcs.is_market_open_now():
249 self._logger.info("장 운영 중이므로 투자자 랭킹 전체 갱신을 건너뜁니다.")
250 return
252 if self._is_refreshing:
253 self._logger.info("투자자 랭킹 갱신 이미 진행 중 — 스킵")
254 return
256 t_start_total = self.pm.start_timer()
257 self._is_refreshing = True
258 start_time = time.time()
259 self._logger.info("투자자 랭킹 백그라운드 갱신 시작")
261 # [변경] 오늘 날짜 대신 실제 장이 열린 최근 날짜 조회
262 target_date = None
263 if self._mcs: 263 ↛ 266line 263 didn't jump to line 266 because the condition on line 263 was always true
264 target_date = await self._mcs.get_latest_trading_date()
266 if not target_date:
267 self._logger.error("최근 거래일을 확인할 수 없어 투자자 랭킹 갱신을 중단합니다.")
268 self._is_refreshing = False
269 return
271 if not force and self._last_collected_date == target_date: 271 ↛ 272line 271 didn't jump to line 272 because the condition on line 271 was never true
272 self._logger.info(f"이미 {target_date} 투자자 랭킹 갱신 완료 — 스킵")
273 if self._notification_service:
274 await self._notification_service.emit(
275 NotificationCategory.BACKGROUND, NotificationLevel.INFO, "투자자 랭킹 갱신 스킵",
276 f"{target_date} 이미 갱신 완료된 상태입니다."
277 )
278 self._is_refreshing = False
279 return
281 self._logger.info(f"투자자 랭킹 백그라운드 갱신 시작 (기준일: {target_date})")
282 self._progress = {"running": True, "processed": 0, "total": 0, "collected": 0, "elapsed": 0.0}
284 try:
285 # 1. 전체 종목 로드
286 all_stocks = self._load_all_stocks()
287 total = len(all_stocks)
288 self._progress["total"] = total
289 self._logger.info(f"투자자 랭킹: 전체 {total}개 종목 순회 시작")
291 # 2. 종목별 투자자 매매동향 + 프로그램매매추이 조회
292 results: List[Dict] = []
293 program_results: List[Dict] = []
294 processed = 0
296 for chunk in _chunked(all_stocks, self.API_CHUNK_SIZE):
297 # suspend 상태이면 resume될 때까지 대기
298 await self._suspend_event.wait()
300 # 투자자 매매동향 + 프로그램매매추이 동시 호출
301 investor_tasks = [
302 self._fetch_with_retry(self._broker.get_investor_trade_by_stock_daily, code, target_date)
303 for code, _, _ in chunk
304 ]
305 program_tasks = [
306 self._fetch_with_retry(self._broker.get_program_trade_by_stock_daily, code, target_date)
307 for code, _, _ in chunk
308 ]
309 all_responses = await asyncio.gather(
310 *investor_tasks, *program_tasks, return_exceptions=True
311 )
312 investor_responses = all_responses[:len(chunk)]
313 program_responses = all_responses[len(chunk):]
315 for (code, name, market), resp in zip(chunk, investor_responses):
316 if isinstance(resp, Exception): 316 ↛ 317line 316 didn't jump to line 317 because the condition on line 316 was never true
317 continue
318 if not resp:
319 continue
320 data = resp.data
321 if not data:
322 continue
323 # 캐시 역직렬화 시 dataclass로 변환될 수 있으므로 dict로 통일
324 if hasattr(data, 'to_dict') and callable(data.to_dict): 324 ↛ 325line 324 didn't jump to line 325 because the condition on line 324 was never true
325 data = data.to_dict()
326 if not isinstance(data, dict):
327 continue
329 frgn_qty = int(data.get("frgn_ntby_qty", "0") or "0")
330 orgn_qty = int(data.get("orgn_ntby_qty", "0") or "0")
331 prsn_qty = int(data.get("prsn_ntby_qty", "0") or "0")
332 frgn_pbmn = int(data.get("frgn_ntby_tr_pbmn", "0") or "0")
333 orgn_pbmn = int(data.get("orgn_ntby_tr_pbmn", "0") or "0")
334 prsn_pbmn = int(data.get("prsn_ntby_tr_pbmn", "0") or "0")
336 acml_tr_pbmn = data.get("acml_tr_pbmn", "0") or "0"
338 results.append({
339 "stck_shrn_iscd": code,
340 "hts_kor_isnm": name,
341 "stck_prpr": data.get("stck_prpr", "0"),
342 "prdy_ctrt": data.get("prdy_ctrt", "0"),
343 "prdy_vrss": data.get("prdy_vrss", "0"),
344 "prdy_vrss_sign": data.get("prdy_vrss_sign", ""),
345 "acml_vol": data.get("acml_vol", "0"),
346 "acml_tr_pbmn": acml_tr_pbmn,
347 "frgn_ntby_qty": str(frgn_qty),
348 "orgn_ntby_qty": str(orgn_qty),
349 "prsn_ntby_qty": str(prsn_qty),
350 "frgn_ntby_tr_pbmn": str(frgn_pbmn),
351 "orgn_ntby_tr_pbmn": str(orgn_pbmn),
352 "prsn_ntby_tr_pbmn": str(prsn_pbmn),
353 })
355 # 프로그램매매추이 수집
356 for (code, name, market), resp in zip(chunk, program_responses):
357 if isinstance(resp, Exception): 357 ↛ 358line 357 didn't jump to line 358 because the condition on line 357 was never true
358 continue
359 if not resp:
360 continue
361 data = resp.data
362 if not data: 362 ↛ 363line 362 didn't jump to line 363 because the condition on line 362 was never true
363 continue
364 if hasattr(data, 'to_dict') and callable(data.to_dict): 364 ↛ 365line 364 didn't jump to line 365 because the condition on line 364 was never true
365 data = data.to_dict()
366 if not isinstance(data, dict):
367 continue
369 ntby_tr_pbmn = int(data.get("whol_smtn_ntby_tr_pbmn", "0") or "0")
371 program_results.append({
372 "stck_shrn_iscd": code,
373 "hts_kor_isnm": name,
374 "stck_prpr": data.get("stck_clpr", "0"),
375 "prdy_ctrt": data.get("prdy_ctrt", "0"),
376 "prdy_vrss": data.get("prdy_vrss", "0"),
377 "prdy_vrss_sign": data.get("prdy_vrss_sign", ""),
378 "acml_vol": data.get("acml_vol", "0"),
379 "acml_tr_pbmn": data.get("acml_tr_pbmn", "0") or "0",
380 "whol_smtn_ntby_tr_pbmn": str(ntby_tr_pbmn),
381 "whol_smtn_ntby_qty": data.get("whol_smtn_ntby_qty", "0") or "0",
382 "whol_smtn_seln_tr_pbmn": data.get("whol_smtn_seln_tr_pbmn", "0") or "0",
383 "whol_smtn_shnu_tr_pbmn": data.get("whol_smtn_shnu_tr_pbmn", "0") or "0",
384 })
386 processed += len(chunk)
387 elapsed = time.time() - start_time
388 self._progress.update({
389 "processed": processed,
390 "collected": len(results),
391 "elapsed": round(elapsed, 1),
392 })
393 if processed % 50 == 0 or processed >= total:
394 self._logger.info(
395 f"투자자 랭킹 진행: {processed}/{total} ({processed/total*100:.1f}%) "
396 f"| 수집: {len(results)} | 프로그램: {len(program_results)} | 소요: {elapsed:.1f}s"
397 )
399 # 전체 캐시 HIT면 sleep 불필요, 실제 API 호출이 있었으면 rate limit sleep
400 all_cache_hit = all(
401 getattr(r, '_cache_hit', False)
402 for r in all_responses if not isinstance(r, Exception)
403 )
404 if not all_cache_hit: 404 ↛ 296line 404 didn't jump to line 296 because the condition on line 404 was always true
405 await asyncio.sleep(self.CHUNK_SLEEP_SEC)
407 # 2-1. 프로그램 데이터의 acml_tr_pbmn으로 투자자 결과 보정
408 prog_tr_map = {r["stck_shrn_iscd"]: r["acml_tr_pbmn"] for r in program_results}
409 for r in results:
410 if int(r.get("acml_tr_pbmn", "0") or "0") == 0: 410 ↛ 409line 410 didn't jump to line 409 because the condition on line 410 was always true
411 r["acml_tr_pbmn"] = prog_tr_map.get(r["stck_shrn_iscd"], "0")
413 # 3. 투자자별 정렬 → 순매수대금 기준 상위 30 / 하위 30
414 self._foreign_net_buy_cache, self._foreign_net_sell_cache = \
415 self._build_ranking(results, "frgn_ntby_tr_pbmn")
416 self._inst_net_buy_cache, self._inst_net_sell_cache = \
417 self._build_ranking(results, "orgn_ntby_tr_pbmn")
418 self._prsn_net_buy_cache, self._prsn_net_sell_cache = \
419 self._build_ranking(results, "prsn_ntby_tr_pbmn")
421 # 거래대금 랭킹도 함께 구축 (acml_tr_pbmn 기준 상위 30)
422 self._trading_value_cache = self._build_trading_value_ranking(results, top_n=30)
424 # 4. 프로그램 순매수대금 정렬 → 상위 30 / 하위 30
425 self._program_net_buy_cache, self._program_net_sell_cache = \
426 self._build_ranking(program_results, "whol_smtn_ntby_tr_pbmn")
428 self._investor_ranking_updated_at = datetime.now()
429 self._last_collected_date = target_date
431 elapsed = time.time() - start_time
432 self._logger.info(
433 f"투자자 랭킹 갱신 완료: {len(results)}개 종목 수집, 소요: {elapsed:.1f}s"
434 )
435 self.pm.log_timer("RankingTask.refresh_investor_ranking", t_start_total, threshold=10.0)
436 if self._notification_service:
437 await self._notification_service.emit(
438 NotificationCategory.BACKGROUND, NotificationLevel.INFO, "투자자 랭킹 갱신 완료",
439 f"{len(results)}개 종목 수집, 소요: {elapsed:.1f}초",
440 )
441 if self._telegram_reporter:
442 self._logger.info("텔레그램 랭킹 리포트 전송 시작")
443 rankings_for_report = {
444 'foreign_buy': self._foreign_net_buy_cache,
445 'foreign_sell': self._foreign_net_sell_cache,
446 'inst_buy': self._inst_net_buy_cache,
447 'inst_sell': self._inst_net_sell_cache,
448 'prsn_buy': self._prsn_net_buy_cache,
449 'prsn_sell': self._prsn_net_sell_cache,
450 'program_buy': self._program_net_buy_cache,
451 'program_sell': self._program_net_sell_cache,
452 'trading_value': self._trading_value_cache,
453 'all_stocks': results,
454 'program_all_stocks': program_results
455 }
456 try:
457 await self._telegram_reporter.send_ranking_report(rankings_for_report, report_date=target_date)
458 self._logger.info("텔레그램 랭킹 리포트 전송 완료")
459 except Exception as e:
460 self._logger.error(f"텔레그램 랭킹 리포트 전송 중 오류: {e}", exc_info=True)
461 except Exception as e:
462 self._logger.error(f"투자자 랭킹 갱신 실패: {e}", exc_info=True)
463 if self._notification_service: 463 ↛ 466line 463 didn't jump to line 466 because the condition on line 463 was always true
464 await self._notification_service.emit(NotificationCategory.SYSTEM, NotificationLevel.ERROR, "투자자 랭킹 갱신 실패", str(e))
465 finally:
466 self._is_refreshing = False
467 self._progress["running"] = False
469 @staticmethod
470 def _build_ranking(results: List[Dict], pbmn_field: str, top_n: int = 30):
471 """순매수대금 필드 기준 정렬 → (상위 30, 하위 30) 튜플 반환."""
472 sorted_list = sorted(results, key=lambda x: int(x[pbmn_field]), reverse=True)
474 buy_top = [dict(item) for item in sorted_list[:top_n]]
475 for i, item in enumerate(buy_top, 1):
476 item["data_rank"] = str(i)
478 sell_slice = sorted_list[-top_n:] if len(sorted_list) >= top_n else sorted_list[:]
479 sell_top = [dict(item) for item in reversed(sell_slice)]
480 for i, item in enumerate(sell_top, 1):
481 item["data_rank"] = str(i)
483 return buy_top, sell_top
485 @staticmethod
486 def _build_trading_value_ranking(results: List[Dict], top_n: int = 30) -> List[Dict]:
487 """누적거래대금(acml_tr_pbmn) 기준 내림차순 상위 N개 반환."""
488 sorted_list = sorted(results, key=lambda x: int(x.get("acml_tr_pbmn", "0") or "0"), reverse=True)
489 top = [dict(item) for item in sorted_list[:top_n]]
490 for i, item in enumerate(top, 1):
491 item["data_rank"] = str(i)
492 return top
494 async def get_trading_value_ranking(self, limit: int = 30) -> ResCommonResponse:
495 """투자자 데이터 기반 거래대금 랭킹 반환 (캐시에서 즉시)."""
496 return await self._get_ranking_from_cache(self._trading_value_cache, "거래대금", limit)
498 async def _check_and_trigger_refresh(self) -> Optional[ResCommonResponse]:
499 """캐시 비어있으면 온디맨드 갱신 트리거. 즉시 반환할 응답이 있으면 반환."""
500 # [성능 보호] 장 중에는 온디맨드 갱신 트리거 안 함
501 if self._mcs and await self._mcs.is_market_open_now():
502 return None
504 # 캐시 비어있고 갱신 중이 아니면 온디맨드 트리거
505 if not self._foreign_net_buy_cache and not self._is_refreshing:
506 try:
507 asyncio.get_running_loop()
508 self._logger.info("투자자 랭킹 캐시 없음 → 온디맨드 백그라운드 갱신 트리거")
509 asyncio.create_task(self.refresh_investor_ranking())
510 except RuntimeError:
511 self._logger.warning("이벤트 루프 없음 — 온디맨드 갱신 스킵")
512 return None
514 # ── 외국인 ──
516 async def get_foreign_net_buy_ranking(self, limit: int = 30) -> ResCommonResponse:
517 """외국인 순매수 상위 랭킹 반환 (캐시에서 즉시)."""
518 return await self._get_ranking_from_cache(self._foreign_net_buy_cache, "외국인 순매수", limit)
520 async def get_foreign_net_sell_ranking(self, limit: int = 30) -> ResCommonResponse:
521 """외국인 순매도 상위 랭킹 반환 (캐시에서 즉시)."""
522 return await self._get_ranking_from_cache(self._foreign_net_sell_cache, "외국인 순매도", limit)
524 # ── 기관 ──
526 async def get_inst_net_buy_ranking(self, limit: int = 30) -> ResCommonResponse:
527 """기관 순매수 상위 랭킹 반환 (캐시에서 즉시)."""
528 return await self._get_ranking_from_cache(self._inst_net_buy_cache, "기관 순매수", limit)
530 async def get_inst_net_sell_ranking(self, limit: int = 30) -> ResCommonResponse:
531 """기관 순매도 상위 랭킹 반환 (캐시에서 즉시)."""
532 return await self._get_ranking_from_cache(self._inst_net_sell_cache, "기관 순매도", limit)
534 # ── 개인 ──
536 async def get_prsn_net_buy_ranking(self, limit: int = 30) -> ResCommonResponse:
537 """개인 순매수 상위 랭킹 반환 (캐시에서 즉시)."""
538 return await self._get_ranking_from_cache(self._prsn_net_buy_cache, "개인 순매수", limit)
540 async def get_prsn_net_sell_ranking(self, limit: int = 30) -> ResCommonResponse:
541 """개인 순매도 상위 랭킹 반환 (캐시에서 즉시)."""
542 return await self._get_ranking_from_cache(self._prsn_net_sell_cache, "개인 순매도", limit)
544 # ── 프로그램 ──
546 async def get_program_net_buy_ranking(self, limit: int = 30) -> ResCommonResponse:
547 """프로그램 순매수 상위 랭킹 반환 (캐시에서 즉시)."""
548 return await self._get_ranking_from_cache(self._program_net_buy_cache, "프로그램 순매수", limit)
550 async def get_program_net_sell_ranking(self, limit: int = 30) -> ResCommonResponse:
551 """프로그램 순매도 상위 랭킹 반환 (캐시에서 즉시)."""
552 return await self._get_ranking_from_cache(self._program_net_sell_cache, "프로그램 순매도", limit)
554 # ── 내부 헬퍼 ─────────────────────────────────────────────
556 async def _get_ranking_from_cache(self, cache: List[Dict], label: str, limit: int) -> ResCommonResponse:
557 """캐시에서 랭킹 데이터 반환. 캐시 없으면 트리거 + 빈 응답."""
558 blocked = await self._check_and_trigger_refresh()
559 if blocked: 559 ↛ 560line 559 didn't jump to line 560 because the condition on line 559 was never true
560 return blocked
561 if not cache:
562 return ResCommonResponse(
563 rt_cd=ErrorCode.SUCCESS.value,
564 msg1="데이터 수집 중...",
565 data=[]
566 )
567 return ResCommonResponse(
568 rt_cd=ErrorCode.SUCCESS.value,
569 msg1=f"{label} 상위 종목 조회 성공",
570 data=cache[:limit]
571 )
573 def _load_all_stocks(self) -> List[tuple]:
574 """StockCodeRepository에서 KOSPI/KOSDAQ 전체 종목 로드."""
575 all_stocks = []
576 for _, row in self.stock_code_repository.df.iterrows():
577 code = row.get("종목코드", "")
578 name = row.get("종목명", "")
579 market = row.get("시장구분", "")
581 if not code: 581 ↛ 582line 581 didn't jump to line 582 because the condition on line 581 was never true
582 continue
584 # ETF/ETN 사전 필터링으로 불필요한 API 호출 방지
585 if any(name.startswith(p) for p in _ETF_PREFIXES):
586 continue
588 # [성능 개선] 우선주(코드 끝자리가 0이 아님) 및 스팩(SPAC) 제외
589 if code[-1] != '0':
590 continue
591 if "스팩" in name: 591 ↛ 592line 591 didn't jump to line 592 because the condition on line 591 was never true
592 continue
594 if market in ("KOSPI", "KOSDAQ"): 594 ↛ 576line 594 didn't jump to line 576 because the condition on line 594 was always true
595 all_stocks.append((code, name, market))
596 return all_stocks
598 async def force_collect(self) -> None:
599 """강제 수집: skip 조건을 무시하고 투자자 랭킹을 재수집한다."""
600 self._logger.info("RankingTask 강제 수집 요청")
601 await self.refresh_investor_ranking(force=True)