Coverage for task / background / after_market / ranking_task.py: 87%

318 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-04 15:08 +0000

1# task/background/ranking_task.py 

2""" 

3랭킹 데이터 수집 및 캐시 관리 태스크. 

4전체 종목 순회가 필요한 랭킹 집계(외국인/기관/개인 순매수 등)와 

5장마감 후 기본 랭킹 캐시를 관리한다. 

6""" 

7import asyncio 

8import logging 

9import time 

10from datetime import datetime 

11from typing import List, Dict, Optional, TYPE_CHECKING 

12 

13from brokers.broker_api_wrapper import BrokerAPIWrapper 

14from brokers.korea_investment.korea_invest_env import KoreaInvestApiEnv 

15from common.types import ResCommonResponse, ErrorCode 

16from core.market_clock import MarketClock 

17from task.background.after_market.after_market_task_base import AfterMarketTask 

18from interfaces.schedulable_task import TaskState 

19from services.market_calendar_service import MarketCalendarService 

20from repositories.stock_code_repository import StockCodeRepository 

21from core.performance_profiler import PerformanceProfiler 

22from services.telegram_notifier import TelegramReporter 

23from services.notification_service import NotificationService, NotificationCategory, NotificationLevel 

24 

25 

26def _chunked(lst, size): 

27 for i in range(0, len(lst), size): 

28 yield lst[i:i + size] 

29 

30 

31# ETF/ETN 브랜드명 접두사 (TradingService._ETF_PREFIXES 와 동일) 

32_ETF_PREFIXES = ( 

33 "KODEX", "TIGER", "KBSTAR", "ARIRANG", "SOL", "ACE", 

34 "HANARO", "KOSEF", "PLUS", "TIMEFOLIO", "WON", "FOCUS", 

35 "VITA", "TREX", "MASTER", "WOORI", "KINDEX", 

36) 

37 

38 

39class RankingTask(AfterMarketTask): 

40 """랭킹 데이터를 수집·캐시하는 백그라운드 태스크.""" 

41 

42 # 청크 크기 및 레이트 리밋 

43 API_CHUNK_SIZE = 8 

44 CHUNK_SLEEP_SEC = 1.1 

45 

46 def __init__( 

47 self, 

48 broker_api_wrapper: BrokerAPIWrapper, 

49 stock_code_repository: StockCodeRepository, 

50 env: KoreaInvestApiEnv = None, 

51 logger=None, 

52 market_clock: MarketClock = None, 

53 market_data_service=None, 

54 performance_profiler: Optional[PerformanceProfiler] = None, 

55 notification_service: Optional[NotificationService] = None, 

56 telegram_reporter: Optional[TelegramReporter] = None, 

57 market_calendar_service: Optional[MarketCalendarService] = None, 

58 ): 

59 super().__init__( 

60 mcs=market_calendar_service, 

61 market_clock=market_clock, 

62 logger=logger or logging.getLogger(__name__), 

63 ) 

64 self._broker = broker_api_wrapper 

65 self.stock_code_repository = stock_code_repository 

66 self._env = env 

67 self._market_data_service = market_data_service 

68 self.pm = performance_profiler if performance_profiler else PerformanceProfiler(enabled=False) 

69 self._notification_service = notification_service 

70 self._telegram_reporter = telegram_reporter 

71 self._suspend_event: asyncio.Event = asyncio.Event() 

72 self._suspend_event.set() # 초기에는 실행 가능 상태 

73 

74 # 투자자별 순매수 랭킹 캐시 

75 self._foreign_net_buy_cache: List[Dict] = [] 

76 self._foreign_net_sell_cache: List[Dict] = [] 

77 self._inst_net_buy_cache: List[Dict] = [] 

78 self._inst_net_sell_cache: List[Dict] = [] 

79 self._prsn_net_buy_cache: List[Dict] = [] 

80 self._prsn_net_sell_cache: List[Dict] = [] 

81 self._trading_value_cache: List[Dict] = [] # 거래대금 랭킹 (투자자 데이터 기반) 

82 # 프로그램 매매 랭킹 캐시 

83 self._program_net_buy_cache: List[Dict] = [] 

84 self._program_net_sell_cache: List[Dict] = [] 

85 self._investor_ranking_updated_at: Optional[datetime] = None 

86 self._is_refreshing: bool = False 

87 self._last_collected_date: Optional[str] = None 

88 

89 # 기본 랭킹 캐시 (상승/하락/거래량/거래대금) — 장마감 후 1회 

90 self._basic_ranking_cache: Dict[str, ResCommonResponse] = {} 

91 self._basic_ranking_updated_at: Optional[datetime] = None 

92 self._basic_last_collected_date: Optional[str] = None 

93 

94 # 진행률 상태 

95 self._progress: Dict = { 

96 "running": False, 

97 "processed": 0, 

98 "total": 0, 

99 "collected": 0, 

100 "elapsed": 0.0, 

101 } 

102 

103 # ── SchedulableTask 인터페이스 구현 ──────────────────────── 

104 

105 @property 

106 def task_name(self) -> str: 

107 return "ranking_refresh" 

108 

109 @property 

110 def _scheduler_label(self) -> str: 

111 return "RankingTask" 

112 

113 async def start(self) -> None: 

114 """장마감 후 자동 갱신 스케줄러 시작.""" 

115 if self._state == TaskState.RUNNING: 

116 return 

117 self._state = TaskState.RUNNING 

118 self._suspend_event.set() 

119 

120 self._tasks.append( 

121 asyncio.create_task(self.start_after_market_scheduler()) 

122 ) 

123 self._logger.info(f"RankingTask 시작: {len(self._tasks)}개 태스크") 

124 

125 async def suspend(self) -> None: 

126 """랭킹 수집을 일시 중지한다 (chunk 사이에서 대기).""" 

127 if self._state == TaskState.RUNNING: 

128 self._suspend_event.clear() 

129 self._state = TaskState.SUSPENDED 

130 self._logger.info("RankingTask 일시 중지") 

131 

132 async def resume(self) -> None: 

133 """일시 중지된 랭킹 수집을 재개한다.""" 

134 if self._state == TaskState.SUSPENDED: 

135 self._suspend_event.set() 

136 self._state = TaskState.RUNNING 

137 self._logger.info("RankingTask 재개") 

138 

139 # ── 장마감 후 자동 갱신 스케줄러 ──────────────────────────── 

140 

141 async def start_after_market_scheduler(self) -> None: 

142 """장마감 후 자동으로 랭킹 갱신을 스케줄링하는 루프.""" 

143 await self._after_market_scheduler() 

144 

145 async def _on_market_closed(self, latest_trading_date: str) -> None: 

146 """장 마감 후 콜백: 해당 거래일의 랭킹 갱신이 필요하면 실행.""" 

147 needs_basic = ( 

148 not self._basic_last_collected_date 

149 or self._basic_last_collected_date != latest_trading_date 

150 ) 

151 needs_investor = ( 

152 not self._last_collected_date 

153 or self._last_collected_date != latest_trading_date 

154 ) 

155 

156 if needs_basic: 156 ↛ 159line 156 didn't jump to line 159 because the condition on line 156 was always true

157 await self.refresh_basic_ranking() 

158 self._basic_last_collected_date = latest_trading_date 

159 if needs_investor: 159 ↛ exitline 159 didn't return from function '_on_market_closed' because the condition on line 159 was always true

160 await self.refresh_investor_ranking() 

161 self._last_collected_date = latest_trading_date 

162 

163 # ── 기본 랭킹 캐시 (상승/하락/거래량/거래대금) ─────────────── 

164 

165 async def refresh_basic_ranking(self) -> None: 

166 """상승률/하락률/거래량/거래대금 랭킹을 1회 조회하여 캐시.""" 

167 if not self._market_data_service: 

168 self._logger.warning("MarketDataService 미설정 — 기본 랭킹 캐시 스킵") 

169 return 

170 

171 t_start = self.pm.start_timer() 

172 self._logger.info("기본 랭킹 캐시 갱신 시작 (상승/하락/거래량/거래대금)") 

173 try: 

174 rise_resp, fall_resp, vol_resp, tv_resp = await asyncio.gather( 

175 self._market_data_service.get_top_rise_fall_stocks(True), 

176 self._market_data_service.get_top_rise_fall_stocks(False), 

177 self._market_data_service.get_top_volume_stocks(), 

178 self._market_data_service.get_top_trading_value_stocks(), 

179 return_exceptions=True, 

180 ) 

181 for key, resp in [("rise", rise_resp), ("fall", fall_resp), 

182 ("volume", vol_resp), ("trading_value", tv_resp)]: 

183 if isinstance(resp, Exception): 

184 self._logger.error(f"기본 랭킹 '{key}' 조회 실패: {resp}") 

185 else: 

186 self._basic_ranking_cache[key] = resp 

187 

188 self._basic_ranking_updated_at = datetime.now() 

189 self._logger.info(f"기본 랭킹 캐시 갱신 완료: {list(self._basic_ranking_cache.keys())}") 

190 self.pm.log_timer("RankingTask.refresh_basic_ranking", t_start, threshold=1.0) 

191 if self._notification_service: 

192 await self._notification_service.emit( 

193 NotificationCategory.BACKGROUND, NotificationLevel.INFO, "기본 랭킹 갱신 완료", 

194 f"상승/하락/거래량/거래대금 캐시 갱신 완료", 

195 ) 

196 except Exception as e: 

197 self._logger.error(f"기본 랭킹 캐시 갱신 실패: {e}", exc_info=True) 

198 if self._notification_service: 198 ↛ exitline 198 didn't return from function 'refresh_basic_ranking' because the condition on line 198 was always true

199 await self._notification_service.emit(NotificationCategory.SYSTEM, NotificationLevel.ERROR, "기본 랭킹 갱신 실패", str(e)) 

200 

201 def get_progress(self) -> Dict: 

202 """태스크 진행률 반환 (SchedulableTask 인터페이스 구현).""" 

203 return dict(self._progress) 

204 

205 def get_investor_ranking_progress(self) -> Dict: 

206 """투자자 랭킹 수집 진행률 반환.""" 

207 return self.get_progress() 

208 

209 def get_basic_ranking_cache(self, category: str) -> Optional[ResCommonResponse]: 

210 """장마감 후 캐시된 기본 랭킹 반환. 캐시 없으면 None.""" 

211 return self._basic_ranking_cache.get(category) 

212 

213 # ── 투자자별 순매수/순매도 랭킹 ──────────────────────────── 

214 

215 async def _fetch_with_retry(self, api_call, *args, **kwargs): 

216 """API 호출을 재시도 로직으로 감싸는 헬퍼.""" 

217 t_start = self.pm.start_timer() 

218 max_retries = 3 

219 delay = 1.0 # 초 

220 for attempt in range(max_retries): 

221 try: 

222 resp = await api_call(*args, **kwargs) 

223 if resp and resp.rt_cd == ErrorCode.SUCCESS.value: 

224 self.pm.log_timer(f"RankingTask._fetch_with_retry({api_call.__name__}, {args[0]})", t_start) 

225 return resp 

226 

227 error_msg = resp.msg1 if resp else "응답 없음" 

228 self._logger.warning( 

229 f"API 호출 실패 (시도 {attempt + 1}/{max_retries}): {api_call.__name__}({args[0]}), 사유: {error_msg}. {delay}초 후 재시도." 

230 ) 

231 except Exception as e: 

232 self._logger.error( 

233 f"API 호출 예외 (시도 {attempt + 1}/{max_retries}): {api_call.__name__}({args[0]}), 오류: {e}. {delay}초 후 재시도.", 

234 exc_info=True 

235 ) 

236 

237 if attempt < max_retries - 1: 

238 await asyncio.sleep(delay) 

239 delay *= 1.5 # 약간의 지수 백오프 

240 

241 self._logger.error(f"API 호출 최종 실패: {api_call.__name__}({args[0]})") 

242 self.pm.log_timer(f"RankingTask._fetch_with_retry({api_call.__name__}, {args[0]}) [최종실패]", t_start) 

243 return None # 최종 실패 시 None 반환 

244 

245 async def refresh_investor_ranking(self, force: bool = False) -> None: 

246 """전체 종목을 순회하여 외국인/기관/개인 순매수/순매도 랭킹을 갱신한다.""" 

247 # [성능 보호] 장 중에는 실행하지 않음 

248 if self._mcs and await self._mcs.is_market_open_now(): 

249 self._logger.info("장 운영 중이므로 투자자 랭킹 전체 갱신을 건너뜁니다.") 

250 return 

251 

252 if self._is_refreshing: 

253 self._logger.info("투자자 랭킹 갱신 이미 진행 중 — 스킵") 

254 return 

255 

256 t_start_total = self.pm.start_timer() 

257 self._is_refreshing = True 

258 start_time = time.time() 

259 self._logger.info("투자자 랭킹 백그라운드 갱신 시작") 

260 

261 # [변경] 오늘 날짜 대신 실제 장이 열린 최근 날짜 조회 

262 target_date = None 

263 if self._mcs: 263 ↛ 266line 263 didn't jump to line 266 because the condition on line 263 was always true

264 target_date = await self._mcs.get_latest_trading_date() 

265 

266 if not target_date: 

267 self._logger.error("최근 거래일을 확인할 수 없어 투자자 랭킹 갱신을 중단합니다.") 

268 self._is_refreshing = False 

269 return 

270 

271 if not force and self._last_collected_date == target_date: 271 ↛ 272line 271 didn't jump to line 272 because the condition on line 271 was never true

272 self._logger.info(f"이미 {target_date} 투자자 랭킹 갱신 완료 — 스킵") 

273 if self._notification_service: 

274 await self._notification_service.emit( 

275 NotificationCategory.BACKGROUND, NotificationLevel.INFO, "투자자 랭킹 갱신 스킵", 

276 f"{target_date} 이미 갱신 완료된 상태입니다." 

277 ) 

278 self._is_refreshing = False 

279 return 

280 

281 self._logger.info(f"투자자 랭킹 백그라운드 갱신 시작 (기준일: {target_date})") 

282 self._progress = {"running": True, "processed": 0, "total": 0, "collected": 0, "elapsed": 0.0} 

283 

284 try: 

285 # 1. 전체 종목 로드 

286 all_stocks = self._load_all_stocks() 

287 total = len(all_stocks) 

288 self._progress["total"] = total 

289 self._logger.info(f"투자자 랭킹: 전체 {total}개 종목 순회 시작") 

290 

291 # 2. 종목별 투자자 매매동향 + 프로그램매매추이 조회 

292 results: List[Dict] = [] 

293 program_results: List[Dict] = [] 

294 processed = 0 

295 

296 for chunk in _chunked(all_stocks, self.API_CHUNK_SIZE): 

297 # suspend 상태이면 resume될 때까지 대기 

298 await self._suspend_event.wait() 

299 

300 # 투자자 매매동향 + 프로그램매매추이 동시 호출 

301 investor_tasks = [ 

302 self._fetch_with_retry(self._broker.get_investor_trade_by_stock_daily, code, target_date) 

303 for code, _, _ in chunk 

304 ] 

305 program_tasks = [ 

306 self._fetch_with_retry(self._broker.get_program_trade_by_stock_daily, code, target_date) 

307 for code, _, _ in chunk 

308 ] 

309 all_responses = await asyncio.gather( 

310 *investor_tasks, *program_tasks, return_exceptions=True 

311 ) 

312 investor_responses = all_responses[:len(chunk)] 

313 program_responses = all_responses[len(chunk):] 

314 

315 for (code, name, market), resp in zip(chunk, investor_responses): 

316 if isinstance(resp, Exception): 316 ↛ 317line 316 didn't jump to line 317 because the condition on line 316 was never true

317 continue 

318 if not resp: 

319 continue 

320 data = resp.data 

321 if not data: 

322 continue 

323 # 캐시 역직렬화 시 dataclass로 변환될 수 있으므로 dict로 통일 

324 if hasattr(data, 'to_dict') and callable(data.to_dict): 324 ↛ 325line 324 didn't jump to line 325 because the condition on line 324 was never true

325 data = data.to_dict() 

326 if not isinstance(data, dict): 

327 continue 

328 

329 frgn_qty = int(data.get("frgn_ntby_qty", "0") or "0") 

330 orgn_qty = int(data.get("orgn_ntby_qty", "0") or "0") 

331 prsn_qty = int(data.get("prsn_ntby_qty", "0") or "0") 

332 frgn_pbmn = int(data.get("frgn_ntby_tr_pbmn", "0") or "0") 

333 orgn_pbmn = int(data.get("orgn_ntby_tr_pbmn", "0") or "0") 

334 prsn_pbmn = int(data.get("prsn_ntby_tr_pbmn", "0") or "0") 

335 

336 acml_tr_pbmn = data.get("acml_tr_pbmn", "0") or "0" 

337 

338 results.append({ 

339 "stck_shrn_iscd": code, 

340 "hts_kor_isnm": name, 

341 "stck_prpr": data.get("stck_prpr", "0"), 

342 "prdy_ctrt": data.get("prdy_ctrt", "0"), 

343 "prdy_vrss": data.get("prdy_vrss", "0"), 

344 "prdy_vrss_sign": data.get("prdy_vrss_sign", ""), 

345 "acml_vol": data.get("acml_vol", "0"), 

346 "acml_tr_pbmn": acml_tr_pbmn, 

347 "frgn_ntby_qty": str(frgn_qty), 

348 "orgn_ntby_qty": str(orgn_qty), 

349 "prsn_ntby_qty": str(prsn_qty), 

350 "frgn_ntby_tr_pbmn": str(frgn_pbmn), 

351 "orgn_ntby_tr_pbmn": str(orgn_pbmn), 

352 "prsn_ntby_tr_pbmn": str(prsn_pbmn), 

353 }) 

354 

355 # 프로그램매매추이 수집 

356 for (code, name, market), resp in zip(chunk, program_responses): 

357 if isinstance(resp, Exception): 357 ↛ 358line 357 didn't jump to line 358 because the condition on line 357 was never true

358 continue 

359 if not resp: 

360 continue 

361 data = resp.data 

362 if not data: 362 ↛ 363line 362 didn't jump to line 363 because the condition on line 362 was never true

363 continue 

364 if hasattr(data, 'to_dict') and callable(data.to_dict): 364 ↛ 365line 364 didn't jump to line 365 because the condition on line 364 was never true

365 data = data.to_dict() 

366 if not isinstance(data, dict): 

367 continue 

368 

369 ntby_tr_pbmn = int(data.get("whol_smtn_ntby_tr_pbmn", "0") or "0") 

370 

371 program_results.append({ 

372 "stck_shrn_iscd": code, 

373 "hts_kor_isnm": name, 

374 "stck_prpr": data.get("stck_clpr", "0"), 

375 "prdy_ctrt": data.get("prdy_ctrt", "0"), 

376 "prdy_vrss": data.get("prdy_vrss", "0"), 

377 "prdy_vrss_sign": data.get("prdy_vrss_sign", ""), 

378 "acml_vol": data.get("acml_vol", "0"), 

379 "acml_tr_pbmn": data.get("acml_tr_pbmn", "0") or "0", 

380 "whol_smtn_ntby_tr_pbmn": str(ntby_tr_pbmn), 

381 "whol_smtn_ntby_qty": data.get("whol_smtn_ntby_qty", "0") or "0", 

382 "whol_smtn_seln_tr_pbmn": data.get("whol_smtn_seln_tr_pbmn", "0") or "0", 

383 "whol_smtn_shnu_tr_pbmn": data.get("whol_smtn_shnu_tr_pbmn", "0") or "0", 

384 }) 

385 

386 processed += len(chunk) 

387 elapsed = time.time() - start_time 

388 self._progress.update({ 

389 "processed": processed, 

390 "collected": len(results), 

391 "elapsed": round(elapsed, 1), 

392 }) 

393 if processed % 50 == 0 or processed >= total: 

394 self._logger.info( 

395 f"투자자 랭킹 진행: {processed}/{total} ({processed/total*100:.1f}%) " 

396 f"| 수집: {len(results)} | 프로그램: {len(program_results)} | 소요: {elapsed:.1f}s" 

397 ) 

398 

399 # 전체 캐시 HIT면 sleep 불필요, 실제 API 호출이 있었으면 rate limit sleep 

400 all_cache_hit = all( 

401 getattr(r, '_cache_hit', False) 

402 for r in all_responses if not isinstance(r, Exception) 

403 ) 

404 if not all_cache_hit: 404 ↛ 296line 404 didn't jump to line 296 because the condition on line 404 was always true

405 await asyncio.sleep(self.CHUNK_SLEEP_SEC) 

406 

407 # 2-1. 프로그램 데이터의 acml_tr_pbmn으로 투자자 결과 보정 

408 prog_tr_map = {r["stck_shrn_iscd"]: r["acml_tr_pbmn"] for r in program_results} 

409 for r in results: 

410 if int(r.get("acml_tr_pbmn", "0") or "0") == 0: 410 ↛ 409line 410 didn't jump to line 409 because the condition on line 410 was always true

411 r["acml_tr_pbmn"] = prog_tr_map.get(r["stck_shrn_iscd"], "0") 

412 

413 # 3. 투자자별 정렬 → 순매수대금 기준 상위 30 / 하위 30 

414 self._foreign_net_buy_cache, self._foreign_net_sell_cache = \ 

415 self._build_ranking(results, "frgn_ntby_tr_pbmn") 

416 self._inst_net_buy_cache, self._inst_net_sell_cache = \ 

417 self._build_ranking(results, "orgn_ntby_tr_pbmn") 

418 self._prsn_net_buy_cache, self._prsn_net_sell_cache = \ 

419 self._build_ranking(results, "prsn_ntby_tr_pbmn") 

420 

421 # 거래대금 랭킹도 함께 구축 (acml_tr_pbmn 기준 상위 30) 

422 self._trading_value_cache = self._build_trading_value_ranking(results, top_n=30) 

423 

424 # 4. 프로그램 순매수대금 정렬 → 상위 30 / 하위 30 

425 self._program_net_buy_cache, self._program_net_sell_cache = \ 

426 self._build_ranking(program_results, "whol_smtn_ntby_tr_pbmn") 

427 

428 self._investor_ranking_updated_at = datetime.now() 

429 self._last_collected_date = target_date 

430 

431 elapsed = time.time() - start_time 

432 self._logger.info( 

433 f"투자자 랭킹 갱신 완료: {len(results)}개 종목 수집, 소요: {elapsed:.1f}s" 

434 ) 

435 self.pm.log_timer("RankingTask.refresh_investor_ranking", t_start_total, threshold=10.0) 

436 if self._notification_service: 

437 await self._notification_service.emit( 

438 NotificationCategory.BACKGROUND, NotificationLevel.INFO, "투자자 랭킹 갱신 완료", 

439 f"{len(results)}개 종목 수집, 소요: {elapsed:.1f}초", 

440 ) 

441 if self._telegram_reporter: 

442 self._logger.info("텔레그램 랭킹 리포트 전송 시작") 

443 rankings_for_report = { 

444 'foreign_buy': self._foreign_net_buy_cache, 

445 'foreign_sell': self._foreign_net_sell_cache, 

446 'inst_buy': self._inst_net_buy_cache, 

447 'inst_sell': self._inst_net_sell_cache, 

448 'prsn_buy': self._prsn_net_buy_cache, 

449 'prsn_sell': self._prsn_net_sell_cache, 

450 'program_buy': self._program_net_buy_cache, 

451 'program_sell': self._program_net_sell_cache, 

452 'trading_value': self._trading_value_cache, 

453 'all_stocks': results, 

454 'program_all_stocks': program_results 

455 } 

456 try: 

457 await self._telegram_reporter.send_ranking_report(rankings_for_report, report_date=target_date) 

458 self._logger.info("텔레그램 랭킹 리포트 전송 완료") 

459 except Exception as e: 

460 self._logger.error(f"텔레그램 랭킹 리포트 전송 중 오류: {e}", exc_info=True) 

461 except Exception as e: 

462 self._logger.error(f"투자자 랭킹 갱신 실패: {e}", exc_info=True) 

463 if self._notification_service: 463 ↛ 466line 463 didn't jump to line 466 because the condition on line 463 was always true

464 await self._notification_service.emit(NotificationCategory.SYSTEM, NotificationLevel.ERROR, "투자자 랭킹 갱신 실패", str(e)) 

465 finally: 

466 self._is_refreshing = False 

467 self._progress["running"] = False 

468 

469 @staticmethod 

470 def _build_ranking(results: List[Dict], pbmn_field: str, top_n: int = 30): 

471 """순매수대금 필드 기준 정렬 → (상위 30, 하위 30) 튜플 반환.""" 

472 sorted_list = sorted(results, key=lambda x: int(x[pbmn_field]), reverse=True) 

473 

474 buy_top = [dict(item) for item in sorted_list[:top_n]] 

475 for i, item in enumerate(buy_top, 1): 

476 item["data_rank"] = str(i) 

477 

478 sell_slice = sorted_list[-top_n:] if len(sorted_list) >= top_n else sorted_list[:] 

479 sell_top = [dict(item) for item in reversed(sell_slice)] 

480 for i, item in enumerate(sell_top, 1): 

481 item["data_rank"] = str(i) 

482 

483 return buy_top, sell_top 

484 

485 @staticmethod 

486 def _build_trading_value_ranking(results: List[Dict], top_n: int = 30) -> List[Dict]: 

487 """누적거래대금(acml_tr_pbmn) 기준 내림차순 상위 N개 반환.""" 

488 sorted_list = sorted(results, key=lambda x: int(x.get("acml_tr_pbmn", "0") or "0"), reverse=True) 

489 top = [dict(item) for item in sorted_list[:top_n]] 

490 for i, item in enumerate(top, 1): 

491 item["data_rank"] = str(i) 

492 return top 

493 

494 async def get_trading_value_ranking(self, limit: int = 30) -> ResCommonResponse: 

495 """투자자 데이터 기반 거래대금 랭킹 반환 (캐시에서 즉시).""" 

496 return await self._get_ranking_from_cache(self._trading_value_cache, "거래대금", limit) 

497 

498 async def _check_and_trigger_refresh(self) -> Optional[ResCommonResponse]: 

499 """캐시 비어있으면 온디맨드 갱신 트리거. 즉시 반환할 응답이 있으면 반환.""" 

500 # [성능 보호] 장 중에는 온디맨드 갱신 트리거 안 함 

501 if self._mcs and await self._mcs.is_market_open_now(): 

502 return None 

503 

504 # 캐시 비어있고 갱신 중이 아니면 온디맨드 트리거 

505 if not self._foreign_net_buy_cache and not self._is_refreshing: 

506 try: 

507 asyncio.get_running_loop() 

508 self._logger.info("투자자 랭킹 캐시 없음 → 온디맨드 백그라운드 갱신 트리거") 

509 asyncio.create_task(self.refresh_investor_ranking()) 

510 except RuntimeError: 

511 self._logger.warning("이벤트 루프 없음 — 온디맨드 갱신 스킵") 

512 return None 

513 

514 # ── 외국인 ── 

515 

516 async def get_foreign_net_buy_ranking(self, limit: int = 30) -> ResCommonResponse: 

517 """외국인 순매수 상위 랭킹 반환 (캐시에서 즉시).""" 

518 return await self._get_ranking_from_cache(self._foreign_net_buy_cache, "외국인 순매수", limit) 

519 

520 async def get_foreign_net_sell_ranking(self, limit: int = 30) -> ResCommonResponse: 

521 """외국인 순매도 상위 랭킹 반환 (캐시에서 즉시).""" 

522 return await self._get_ranking_from_cache(self._foreign_net_sell_cache, "외국인 순매도", limit) 

523 

524 # ── 기관 ── 

525 

526 async def get_inst_net_buy_ranking(self, limit: int = 30) -> ResCommonResponse: 

527 """기관 순매수 상위 랭킹 반환 (캐시에서 즉시).""" 

528 return await self._get_ranking_from_cache(self._inst_net_buy_cache, "기관 순매수", limit) 

529 

530 async def get_inst_net_sell_ranking(self, limit: int = 30) -> ResCommonResponse: 

531 """기관 순매도 상위 랭킹 반환 (캐시에서 즉시).""" 

532 return await self._get_ranking_from_cache(self._inst_net_sell_cache, "기관 순매도", limit) 

533 

534 # ── 개인 ── 

535 

536 async def get_prsn_net_buy_ranking(self, limit: int = 30) -> ResCommonResponse: 

537 """개인 순매수 상위 랭킹 반환 (캐시에서 즉시).""" 

538 return await self._get_ranking_from_cache(self._prsn_net_buy_cache, "개인 순매수", limit) 

539 

540 async def get_prsn_net_sell_ranking(self, limit: int = 30) -> ResCommonResponse: 

541 """개인 순매도 상위 랭킹 반환 (캐시에서 즉시).""" 

542 return await self._get_ranking_from_cache(self._prsn_net_sell_cache, "개인 순매도", limit) 

543 

544 # ── 프로그램 ── 

545 

546 async def get_program_net_buy_ranking(self, limit: int = 30) -> ResCommonResponse: 

547 """프로그램 순매수 상위 랭킹 반환 (캐시에서 즉시).""" 

548 return await self._get_ranking_from_cache(self._program_net_buy_cache, "프로그램 순매수", limit) 

549 

550 async def get_program_net_sell_ranking(self, limit: int = 30) -> ResCommonResponse: 

551 """프로그램 순매도 상위 랭킹 반환 (캐시에서 즉시).""" 

552 return await self._get_ranking_from_cache(self._program_net_sell_cache, "프로그램 순매도", limit) 

553 

554 # ── 내부 헬퍼 ───────────────────────────────────────────── 

555 

556 async def _get_ranking_from_cache(self, cache: List[Dict], label: str, limit: int) -> ResCommonResponse: 

557 """캐시에서 랭킹 데이터 반환. 캐시 없으면 트리거 + 빈 응답.""" 

558 blocked = await self._check_and_trigger_refresh() 

559 if blocked: 559 ↛ 560line 559 didn't jump to line 560 because the condition on line 559 was never true

560 return blocked 

561 if not cache: 

562 return ResCommonResponse( 

563 rt_cd=ErrorCode.SUCCESS.value, 

564 msg1="데이터 수집 중...", 

565 data=[] 

566 ) 

567 return ResCommonResponse( 

568 rt_cd=ErrorCode.SUCCESS.value, 

569 msg1=f"{label} 상위 종목 조회 성공", 

570 data=cache[:limit] 

571 ) 

572 

573 def _load_all_stocks(self) -> List[tuple]: 

574 """StockCodeRepository에서 KOSPI/KOSDAQ 전체 종목 로드.""" 

575 all_stocks = [] 

576 for _, row in self.stock_code_repository.df.iterrows(): 

577 code = row.get("종목코드", "") 

578 name = row.get("종목명", "") 

579 market = row.get("시장구분", "") 

580 

581 if not code: 581 ↛ 582line 581 didn't jump to line 582 because the condition on line 581 was never true

582 continue 

583 

584 # ETF/ETN 사전 필터링으로 불필요한 API 호출 방지 

585 if any(name.startswith(p) for p in _ETF_PREFIXES): 

586 continue 

587 

588 # [성능 개선] 우선주(코드 끝자리가 0이 아님) 및 스팩(SPAC) 제외 

589 if code[-1] != '0': 

590 continue 

591 if "스팩" in name: 591 ↛ 592line 591 didn't jump to line 592 because the condition on line 591 was never true

592 continue 

593 

594 if market in ("KOSPI", "KOSDAQ"): 594 ↛ 576line 594 didn't jump to line 576 because the condition on line 594 was always true

595 all_stocks.append((code, name, market)) 

596 return all_stocks 

597 

598 async def force_collect(self) -> None: 

599 """강제 수집: skip 조건을 무시하고 투자자 랭킹을 재수집한다.""" 

600 self._logger.info("RankingTask 강제 수집 요청") 

601 await self.refresh_investor_ranking(force=True)