Coverage for services / stock_query_service.py: 95%

387 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-04 15:08 +0000

1# app/stock_query_service.py 

2from __future__ import annotations 

3from common.types import ErrorCode, ResCommonResponse, ResTopMarketCapApiItem, ResBasicStockInfo, \ 

4 ResStockFullInfoApiOutput, Exchange 

5from config.DynamicConfig import DynamicConfig 

6from typing import List, Dict, Optional, Literal 

7from core.performance_profiler import PerformanceProfiler 

8from services.notification_service import NotificationService, NotificationCategory, NotificationLevel 

9from services.market_data_service import MarketDataService 

10 

11 

12class StockQueryService: 

13 """ 

14 주식 현재가, 계좌 잔고, 시가총액 조회 등 데이터 조회 관련 핸들러를 관리하는 클래스입니다. 

15 MarketDataService, BrokerAPIWrapper 등 인스턴스를 주입받아 사용합니다. 

16 """ 

17 

18 def __init__(self, market_data_service: MarketDataService, logger, market_clock, indicator_service=None, 

19 ranking_task=None, performance_profiler: Optional[PerformanceProfiler] = None, 

20 notification_service: Optional[NotificationService] = None, 

21 broker_api_wrapper=None): 

22 self.broker = broker_api_wrapper 

23 self.market_data_service = market_data_service 

24 self.logger = logger 

25 self.market_clock = market_clock 

26 self.indicator_service = indicator_service 

27 self.ranking_task = ranking_task 

28 self.pm = performance_profiler if performance_profiler else PerformanceProfiler(enabled=False) 

29 self._notification_service = notification_service 

30 

31 def _get_sign_from_code(self, sign_code): 

32 """API 응답의 부호 코드(1,2,3,4,5)를 실제 부호 문자열로 변환합니다.""" 

33 if sign_code == '1' or sign_code == '2': # 1:상한, 2:상승 

34 return "+" 

35 elif sign_code == '4' or sign_code == '5': # 4:하한, 5:하락 

36 return "-" 

37 else: # 3:보합 (또는 기타) 

38 return "" 

39 

40 async def get_current_price(self, stock_code: str, exchange: Exchange = Exchange.KRX, count_stats: bool = True, caller: str = "unknown") -> ResCommonResponse: 

41 """현재가만 빠르게 조회 (MarketDataService 래퍼).""" 

42 return await self.market_data_service.get_current_price(stock_code, exchange=exchange, count_stats=count_stats, caller=caller) 

43 

44 async def get_multi_price(self, stock_codes: list[str]) -> ResCommonResponse: 

45 """복수종목 현재가 조회 (최대 30종목, MarketDataService 래퍼).""" 

46 return await self.market_data_service.get_multi_price(stock_codes) 

47 

48 async def get_top_trading_value_stocks(self) -> ResCommonResponse: 

49 """거래대금 상위 종목 조회 (MarketDataService 래퍼).""" 

50 return await self.market_data_service.get_top_trading_value_stocks() 

51 

52 async def get_top_rise_fall_stocks(self, rise: bool = True) -> ResCommonResponse: 

53 """상승/하락 상위 종목 조회 (MarketDataService 래퍼).""" 

54 return await self.market_data_service.get_top_rise_fall_stocks(rise) 

55 

56 async def get_top_volume_stocks(self) -> ResCommonResponse: 

57 """거래량 상위 종목 조회 (MarketDataService 래퍼).""" 

58 return await self.market_data_service.get_top_volume_stocks() 

59 

60 async def get_financial_ratio(self, stock_code: str) -> ResCommonResponse: 

61 """재무비율 조회 (MarketDataService 래퍼).""" 

62 return await self.market_data_service.get_financial_ratio(stock_code) 

63 

64 async def get_stock_conclusion(self, stock_code: str) -> ResCommonResponse: 

65 """체결 정보 조회 (MarketDataService 래퍼).""" 

66 return await self.market_data_service.get_stock_conclusion(stock_code) 

67 

68 async def handle_get_current_stock_price(self, stock_code, caller: str = "unknown", exchange: Exchange = Exchange.KRX): 

69 """주식 현재가 및 상세 정보 조회 요청 및 결과 출력.""" 

70 self.logger.info(f"Stock_Query_Service - {stock_code} 현재가 및 상세 정보 조회 요청") 

71 resp: ResCommonResponse = await self.market_data_service.get_current_price(stock_code, exchange=exchange, caller=caller) 

72 

73 if not resp or resp.rt_cd != ErrorCode.SUCCESS.value: 

74 msg = resp.msg1 if resp else "응답 없음" 

75 self.logger.error(f"{stock_code} 현재가 및 상세 정보 조회 실패: {msg}") 

76 if self._notification_service: 

77 await self._notification_service.emit(NotificationCategory.SYSTEM, NotificationLevel.WARNING, "현재가 조회 실패", 

78 f"{stock_code} - {msg}", 

79 metadata={"code": stock_code}) 

80 return ResCommonResponse( 

81 rt_cd=(resp.rt_cd if resp else ErrorCode.API_ERROR.value), 

82 msg1=msg, 

83 data={"code": stock_code}, 

84 ) 

85 

86 # --- output 추출 및 통일화(ResStockFullInfoApiOutput) --- 

87 output = (resp.data or {}).get("output") if isinstance(resp.data, dict) else None 

88 

89 if not isinstance(output, ResStockFullInfoApiOutput): 

90 self.logger.error(f"잘못된 응답 데이터 타입 또는 output 없음: {type(output)}") 

91 return ResCommonResponse( 

92 rt_cd=ErrorCode.PARSING_ERROR.value, 

93 msg1=f"잘못된 응답 데이터 타입 또는 output 없음: {type(output)}", 

94 data={"code": stock_code}, 

95 ) 

96 

97 status_code_map = { 

98 "51": "관리종목", "52": "투자위험", "53": "투자경고", "54": "투자주의", 

99 "55": "신용가능", "57": "증거금 100%", "58": "거래정지", "59": "단기과열" 

100 } 

101 status_description = status_code_map.get(output.iscd_stat_cls_code, "정보 없음") 

102 

103 # 부호 처리 로직 추가 

104 change_val = output.prdy_vrss 

105 sign_code = output.prdy_vrss_sign 

106 actual_sign = self._get_sign_from_code(sign_code) 

107 

108 display_change = change_val 

109 try: 

110 f = float(change_val) 

111 if f != 0: 

112 display_change = f"{abs(int(f))}" 

113 else: 

114 display_change = "0" 

115 except (ValueError, TypeError): 

116 pass 

117 

118 view = { 

119 # 기본 정보 

120 "code": stock_code, 

121 "name": await self.market_data_service.get_name_by_code(stock_code), 

122 "is_new_high": output.is_new_high, # 신고가 여부 추가 

123 "is_new_low": output.is_new_low, # 신저가 여부 추가 

124 "price": output.stck_prpr, 

125 "change": output.prdy_vrss, 

126 "change_absolute": display_change, 

127 "rate": output.prdy_ctrt, 

128 "sign": actual_sign, 

129 "time": self.market_clock.get_current_kst_time().strftime("%H:%M:%S"), 

130 "bstp_kor_isnm": output.bstp_kor_isnm, 

131 "iscd_stat_cls_code_desc": f"{status_description} ({output.iscd_stat_cls_code})", 

132 

133 # 거래 정보 

134 "acml_tr_pbmn": output.acml_tr_pbmn, 

135 "acml_vol": output.acml_vol, 

136 "prdy_vrss_vol_rate": output.prdy_vrss_vol_rate, 

137 "frgn_ntby_qty": output.frgn_ntby_qty, 

138 "pgtr_ntby_qty": output.pgtr_ntby_qty, 

139 

140 # 당일 가격 정보 

141 "open": output.stck_oprc, 

142 "high": output.stck_hgpr, 

143 "low": output.stck_lwpr, 

144 "prev_close": output.stck_sdpr, # 기준가 

145 

146 # 투자 지표 

147 "per": output.per, 

148 "pbr": output.pbr, 

149 "eps": output.eps, 

150 "bps": output.bps, 

151 

152 # 250일 정보 

153 "d250_hgpr": output.d250_hgpr, 

154 "d250_hgpr_date": output.d250_hgpr_date, 

155 "d250_hgpr_vrss_prpr_rate": output.d250_hgpr_vrss_prpr_rate, 

156 "d250_lwpr": output.d250_lwpr, 

157 "d250_lwpr_date": output.d250_lwpr_date, 

158 "d250_lwpr_vrss_prpr_rate": output.d250_lwpr_vrss_prpr_rate, 

159 

160 # 연중 정보 

161 "dryy_hgpr": output.stck_dryy_hgpr, 

162 "dryy_hgpr_vrss_prpr_rate": output.dryy_hgpr_vrss_prpr_rate, 

163 "dryy_hgpr_date": output.dryy_hgpr_date, 

164 "dryy_lwpr": output.stck_dryy_lwpr, 

165 "dryy_lwpr_vrss_prpr_rate": output.dryy_lwpr_vrss_prpr_rate, 

166 "dryy_lwpr_date": output.dryy_lwpr_date, 

167 

168 # 52주 정보 

169 "w52_hgpr": output.w52_hgpr, 

170 "w52_hgpr_vrss_prpr_ctrt": output.w52_hgpr_vrss_prpr_ctrt, 

171 "w52_hgpr_date": output.w52_hgpr_date, 

172 "w52_lwpr": output.w52_lwpr, 

173 "w52_lwpr_vrss_prpr_ctrt": output.w52_lwpr_vrss_prpr_ctrt, 

174 "w52_lwpr_date": output.w52_lwpr_date, 

175 

176 # 기타 상태 

177 "crdt_able_yn": "가능" if output.crdt_able_yn == "Y" else "불가능", 

178 "short_over_yn": "예" if output.short_over_yn == "Y" else "아니오", 

179 "sltr_yn": "예" if output.sltr_yn == "Y" else "아니오", 

180 "mang_issu_cls_code": "예" if output.mang_issu_cls_code and output.mang_issu_cls_code.strip() else "아니오", 

181 } 

182 self.logger.info(f"{stock_code} 현재가 및 상세 정보 조회 성공") 

183 if self._notification_service: 

184 name = view.get("name", stock_code) 

185 sign_str = actual_sign if actual_sign == "+" else "" 

186 await self._notification_service.emit(NotificationCategory.API, NotificationLevel.INFO, "현재가 조회", 

187 f"{name}({stock_code}) {view['price']}원 ({sign_str}{view['rate']}%)", 

188 metadata={"code": stock_code, "price": view["price"]}) 

189 return ResCommonResponse(rt_cd=ErrorCode.SUCCESS.value, msg1="정상", data=view) 

190 

191 async def handle_get_account_balance(self, exchange: Exchange = Exchange.KRX) -> ResCommonResponse: 

192 """계좌 잔고 조회 요청 및 결과 출력.""" 

193 resp = await self.broker.get_account_balance(exchange=exchange) 

194 if self._notification_service: 

195 if resp and resp.rt_cd == ErrorCode.SUCCESS.value: 

196 await self._notification_service.emit(NotificationCategory.API, NotificationLevel.INFO, "잔고 조회 완료", "계좌 잔고 조회 성공") 

197 else: 

198 msg = resp.msg1 if resp else "응답 없음" 

199 await self._notification_service.emit(NotificationCategory.SYSTEM, NotificationLevel.WARNING, "잔고 조회 실패", msg) 

200 return resp 

201 

202 async def handle_get_top_market_cap_stocks_code(self, market_code: str = "0000", limit: int = 30) -> ResCommonResponse: 

203 """ 

204 시가총액 상위 종목 중 상한가 도달 종목 조회 (출력 X). 

205 data: List[dict(code,name,price,change_rate)] 

206 """ 

207 self.logger.debug(f"상한가 스캔 요청 (시장={market_code}, limit={limit})") 

208 

209 # 모의투자 / 장시간 검증 

210 if getattr(self.market_data_service._env, "is_paper_trading", False): 

211 self.logger.warning("모의투자 환경에서는 상한가 조회 미지원") 

212 return ResCommonResponse( 

213 rt_cd=ErrorCode.API_ERROR.value, 

214 msg1="모의투자 미지원 API입니다.", 

215 data=None 

216 ) 

217 

218 try: 

219 # 상위 종목 조회 

220 top_res: ResCommonResponse = await self.market_data_service.get_top_market_cap_stocks_code(market_code, limit) 

221 if not top_res or top_res.rt_cd != ErrorCode.SUCCESS.value: 

222 self.logger.error(f"상위 종목 목록 조회 실패: {top_res}") 

223 return ResCommonResponse( 

224 rt_cd=ErrorCode.API_ERROR.value, 

225 msg1="상위 종목 목록 조회 실패", 

226 data=None 

227 ) 

228 

229 top_list: List[ResTopMarketCapApiItem] = top_res.data or [] 

230 if not top_list: 

231 self.logger.debug("상위 종목 없음") 

232 return ResCommonResponse( 

233 rt_cd=ErrorCode.SUCCESS.value, 

234 msg1="조회 성공 (종목 없음)", 

235 data=[] 

236 ) 

237 

238 targets = top_list[:limit] 

239 found: list[dict] = [] 

240 

241 for item in targets: 

242 # dataclass(ResTopMarketCapApiItem)와 dict 모두 지원 

243 get = (lambda k: getattr(item, k, None)) if not isinstance(item, dict) else item.get 

244 

245 code = get("mksc_shrn_iscd") or get("iscd") 

246 name = get("hts_kor_isnm") 

247 prdy_vrss_sign = get("prdy_vrss_sign") 

248 stck_prpr = get("stck_prpr") 

249 prdy_ctrt = get("prdy_ctrt") 

250 

251 if not code: 

252 self.logger.warning(f"유효하지 않은 종목코드: {item}") 

253 continue 

254 

255 # 정책: prdy_vrss_sign == '1'이면 상한으로 간주 

256 if prdy_vrss_sign == "1": 

257 found.append({ 

258 "code": code, 

259 "name": name, 

260 "price": str(stck_prpr) if stck_prpr is not None else None, 

261 "change_rate": str(prdy_ctrt) if prdy_ctrt is not None else None, 

262 }) 

263 self.logger.debug(f"상한가 발견: {name}({code}) {stck_prpr}원 {prdy_ctrt}%") 

264 else: 

265 # 필요시 디버그 로그만 

266 self.logger.debug(f"상한가 아님: {name}({code}) sign={prdy_vrss_sign}") 

267 

268 self.logger.info("시가총액 상위 종목 조회 성공") 

269 return ResCommonResponse( 

270 rt_cd=ErrorCode.SUCCESS.value, 

271 msg1="조회 성공", 

272 data=found # 빈 리스트 허용 

273 ) 

274 

275 except Exception as e: 

276 self.logger.exception("상한가 조회 중 예외") 

277 return ResCommonResponse( 

278 rt_cd=ErrorCode.UNKNOWN_ERROR.value, 

279 msg1=f"예외 발생: {e}", 

280 data=None 

281 ) 

282 

283 async def get_stock_change_rate(self, stock_code: str) -> ResCommonResponse: 

284 """ 

285 전일대비 등락률 조회. 출력 없음. 계산/포맷만 수행하여 ResCommonResponse로 반환. 

286 data 예시: 

287 { 

288 "stock_code": "005930", 

289 "current_price": "70400", 

290 "change_value_display": "+500", # 부호/0 처리 적용된 표시값 

291 "change_rate": "0.71" # API 그대로 문자열 유지 

292 } 

293 """ 

294 res: ResCommonResponse = await self.market_data_service.get_current_price(stock_code, caller="StockQueryService") 

295 if not (res and res.rt_cd == ErrorCode.SUCCESS.value): 

296 self.logger.error(f"{stock_code} 전일대비 등락률 조회 실패: {res}") 

297 # 실패도 통일된 형태로 반환 

298 return ResCommonResponse(rt_cd="1", msg1="조회 실패", data={"stock_code": stock_code}) 

299 

300 output = res.data.get("output") or {} 

301 current_price = output.stck_prpr 

302 change_val_str = output.prdy_vrss 

303 change_sign_code = output.prdy_vrss_sign 

304 change_rate_str = output.prdy_ctrt 

305 

306 actual_sign = self._get_sign_from_code(change_sign_code) 

307 

308 display_change_val = change_val_str 

309 try: 

310 f = float(change_val_str) 

311 if f != 0: 

312 display_change_val = f"{actual_sign}{abs(int(f))}" 

313 elif f == 0: 313 ↛ 319line 313 didn't jump to line 319 because the condition on line 313 was always true

314 display_change_val = "0" 

315 except (ValueError, TypeError): 

316 # 숫자 아님 → 그대로 노출 

317 pass 

318 

319 data = { 

320 "stock_code": stock_code, 

321 "current_price": current_price, 

322 "change_value_display": display_change_val, 

323 "change_rate": change_rate_str, 

324 } 

325 self.logger.info( 

326 f"{stock_code} 전일대비 등락률 조회 성공: 현재가={current_price}, " 

327 f"전일대비={display_change_val}, 등락률={change_rate_str}%" 

328 ) 

329 return ResCommonResponse(rt_cd="0", msg1="정상", data=data) 

330 

331 async def get_open_vs_current(self, stock_code: str) -> ResCommonResponse: 

332 """ 

333 시가 대비 등락률/금액 계산 후 반환. 출력 없음. 

334 data 예시: 

335 { 

336 "stock_code": "005930", 

337 "current_price": "70400", 

338 "open_price": "70000", 

339 "vs_open_value_display": "+400", # 금액 부호/0 처리 

340 "vs_open_rate_display": "+0.57%" # 퍼센트 부호/0 처리 

341 } 

342 """ 

343 res: ResCommonResponse = await self.market_data_service.get_current_price(stock_code, caller="StockQueryService") 

344 if not (res and res.rt_cd == ErrorCode.SUCCESS.value): 

345 self.logger.error(f"{stock_code} 시가대비 조회 실패: {res}") 

346 return ResCommonResponse(rt_cd="1", msg1="조회 실패", data={"stock_code": stock_code}) 

347 

348 output = res.data.get("output") or {} 

349 cur_str = output.stck_prpr 

350 open_str = output.stck_oprc 

351 

352 try: 

353 cur = float(cur_str) if cur_str not in (None, "N/A") else None 

354 opn = float(open_str) if open_str not in (None, "N/A") else None 

355 except (ValueError, TypeError): 

356 self.logger.warning( 

357 f"{stock_code} 시가대비 조회 실패: 가격 파싱 오류 (현재가={cur_str}, 시가={open_str})" 

358 ) 

359 return ResCommonResponse(rt_cd="1", msg1="가격 파싱 오류", data={"stock_code": stock_code}) 

360 

361 vs_val_disp = "N/A" 

362 vs_rate_disp = "N/A" 

363 

364 if cur is not None and opn is not None: 

365 diff = cur - opn 

366 vs_val_disp = "0" if diff == 0 else f"{diff:+.0f}" 

367 if opn != 0: 

368 vs_rate_disp = f"{(diff / opn) * 100:+.2f}%" 

369 else: 

370 vs_rate_disp = "N/A" 

371 

372 data = { 

373 "stock_code": stock_code, 

374 "current_price": cur_str, 

375 "open_price": open_str, 

376 "vs_open_value_display": vs_val_disp, 

377 "vs_open_rate_display": vs_rate_disp, 

378 } 

379 self.logger.info( 

380 f"{stock_code} 시가대비 조회 성공: 현재가={cur_str}, 시가={open_str}, " 

381 f"시가대비={vs_val_disp} ({vs_rate_disp})" 

382 ) 

383 return ResCommonResponse(rt_cd="0", msg1="정상", data=data) 

384 

385 async def handle_upper_limit_stocks(self, market_code: str = "0000", limit: int = 500): 

386 """ 

387 시가총액 상위 종목 조회 (출력 X). TradingService 결과를 표준 스키마로 반환. 

388 data: List[ResTopMarketCapApiItem] 

389 """ 

390 

391 try: 

392 res: ResCommonResponse = await self.market_data_service.get_top_market_cap_stocks_code(market_code, limit) 

393 if not res or res.rt_cd != ErrorCode.SUCCESS.value: 

394 self.logger.error(f"시가총액 상위 종목 조회 실패: {res}") 

395 return ResCommonResponse( 

396 rt_cd=ErrorCode.API_ERROR.value, 

397 msg1="시가총액 상위 종목 조회 실패", 

398 data=None 

399 ) 

400 # 성공 

401 self.logger.info(f"시가총액 상위 종목 조회 성공 (시장: {market_code}, 개수={len(res.data) if res.data else 0})") 

402 return ResCommonResponse( 

403 rt_cd=ErrorCode.SUCCESS.value, 

404 msg1="조회 성공", 

405 data=res.data, # 그대로 전달 (List[ResTopMarketCapApiItem]) 

406 ) 

407 except Exception as e: 

408 self.logger.exception("시가총액 상위 종목 조회 중 예외") 

409 return ResCommonResponse( 

410 rt_cd=ErrorCode.UNKNOWN_ERROR.value, 

411 msg1=f"예외 발생: {e}", 

412 data=None 

413 ) 

414 

415 async def handle_current_upper_limit_stocks(self): 

416 """ 

417 전체 종목 중 현재 상한가에 도달한 종목을 조회하여 출력합니다. 

418 """ 

419 self.logger.info("Service - 현재 상한가 종목 조회 요청 ") 

420 

421 try: 

422 rise_res: ResCommonResponse = await self.market_data_service.get_top_rise_fall_stocks(rise=True) 

423 if rise_res.rt_cd != ErrorCode.SUCCESS.value: 

424 self.logger.warning("상승률 조회 실패.") 

425 return rise_res 

426 

427 upper_limit_stocks: ResCommonResponse = await self.market_data_service.get_current_upper_limit_stocks( 

428 rise_res.data) 

429 

430 if upper_limit_stocks.rt_cd != ErrorCode.SUCCESS.value: 

431 self.logger.info("현재 상한가 종목 없음.") 

432 

433 return upper_limit_stocks 

434 

435 except Exception as e: 

436 self.logger.error(f"현재 상한가 종목 조회 중 오류 발생: {e}", exc_info=True) 

437 raise 

438 

439 async def handle_get_asking_price(self, stock_code: str, depth: int = 10): 

440 """종목의 실시간 호가 정보 조회 및 출력.""" 

441 self.logger.info(f"Service - {stock_code} 호가 정보 조회 요청") 

442 response = await self.market_data_service.get_asking_price(stock_code) 

443 

444 if not response or response.rt_cd != ErrorCode.SUCCESS.value: 

445 msg = response.msg1 if response else "응답 없음" 

446 self.logger.error(f"{stock_code} 호가 정보 조회 실패: {msg}") 

447 return ResCommonResponse( 

448 rt_cd=(response.rt_cd if response else ErrorCode.API_ERROR.value), 

449 msg1=msg, 

450 data={"code": stock_code}, 

451 ) 

452 

453 raw1 = (response.data or {}).get("output1") or {} 

454 # 일부 구현에서 list로 줄 수도 있으니 방어 

455 if isinstance(raw1, list): 455 ↛ 456line 455 didn't jump to line 456 because the condition on line 455 was never true

456 raw1 = raw1[0] if raw1 else {} 

457 

458 rows = [] 

459 for i in range(1, depth + 1): 

460 rows.append({ 

461 "level": i, 

462 "ask_price": raw1.get(f"askp{i}", "N/A"), 

463 "ask_rem": raw1.get(f"askp_rsqn{i}", "N/A"), 

464 "bid_price": raw1.get(f"bidp{i}", "N/A"), 

465 "bid_rem": raw1.get(f"bidp_rsqn{i}", "N/A"), 

466 }) 

467 

468 view_model = { 

469 "code": stock_code, 

470 "rows": rows, 

471 # 필요시 추가 필드들(예: 현재가/참고값 등) 

472 "meta": { 

473 "prpr": raw1.get("stck_prpr"), 

474 "time": raw1.get("aplm_hour") or raw1.get("stck_cntg_hour"), 

475 } 

476 } 

477 

478 self.logger.info(f"{stock_code} 호가 정보 조회 성공") 

479 return ResCommonResponse(rt_cd=ErrorCode.SUCCESS.value, msg1="정상", data=view_model) 

480 

481 async def handle_get_time_concluded_prices(self, stock_code: str): 

482 """종목의 시간대별 체결가 정보 조회 및 출력.""" 

483 self.logger.info(f"Service - {stock_code} 시간대별 체결가 조회 요청") 

484 response = await self.market_data_service.get_time_concluded_prices(stock_code) 

485 

486 if not response or response.rt_cd != ErrorCode.SUCCESS.value: 

487 msg = response.msg1 if response else "응답 없음" 

488 self.logger.error(f"{stock_code} 시간대별 체결가 조회 실패: {msg}") 

489 return ResCommonResponse( 

490 rt_cd=(response.rt_cd if response else ErrorCode.API_ERROR.value), 

491 msg1=msg, 

492 data={"code": stock_code}, 

493 ) 

494 

495 raw = (response.data or {}).get("output") or [] 

496 if isinstance(raw, dict): 

497 raw = [raw] 

498 

499 rows = [] 

500 for item in raw: 

501 rows.append({ 

502 "time": item.get("stck_cntg_hour", "N/A"), 

503 "price": item.get("stck_prpr", "N/A"), 

504 "change": item.get("prdy_vrss", "N/A"), 

505 "volume": item.get("cntg_vol", "N/A"), 

506 }) 

507 

508 view_model = {"code": stock_code, "rows": rows} 

509 self.logger.info(f"{stock_code} 시간대별 체결가 조회 성공") 

510 return ResCommonResponse(rt_cd=ErrorCode.SUCCESS.value, msg1="정상", data=view_model) 

511 

512 async def handle_get_top_stocks(self, category: str) -> ResCommonResponse: 

513 """상위 종목 조회 및 출력 (상승률, 하락률, 거래량, 외국인순매수 등).""" 

514 t_start = self.pm.start_timer() 

515 # (title, func, param, is_sync) — is_sync=True이면 동기 함수 호출 

516 # 장마감 후 캐시된 기본 랭킹이 있으면 우선 사용 (trading_value 제외 — ranking_task에서 처리) 

517 basic_categories = ("rise", "fall", "volume") 

518 if category in basic_categories and self.ranking_task: 

519 cached = self.ranking_task.get_basic_ranking_cache(category) 

520 if cached is not None: 

521 self.logger.info(f"Handler - {category} 캐시 히트 (장마감 후 캐시)") 

522 return cached 

523 

524 category_map = { 

525 "rise": ("상승률", self.market_data_service.get_top_rise_fall_stocks, True, False), 

526 "fall": ("하락률", self.market_data_service.get_top_rise_fall_stocks, False, False), 

527 "volume": ("거래량", self.market_data_service.get_top_volume_stocks, None, False), 

528 "trading_value": ("거래대금", self.market_data_service.get_top_trading_value_stocks, None, False), 

529 } 

530 

531 # 랭킹 태스크 카테고리 (동기 함수) 

532 # 거래대금: 장마감 후에는 투자자 데이터(acml_tr_pbmn) 기반으로 전환 

533 if self.ranking_task: 

534 category_map["trading_value"] = ( 

535 "거래대금", self.ranking_task.get_trading_value_ranking, None, False 

536 ) 

537 category_map["foreign_buy"] = ( 

538 "외인 순매수", self.ranking_task.get_foreign_net_buy_ranking, None, False 

539 ) 

540 category_map["foreign_sell"] = ( 

541 "외인 순매도", self.ranking_task.get_foreign_net_sell_ranking, None, False 

542 ) 

543 category_map["inst_buy"] = ( 

544 "기관 순매수", self.ranking_task.get_inst_net_buy_ranking, None, False 

545 ) 

546 category_map["inst_sell"] = ( 

547 "기관 순매도", self.ranking_task.get_inst_net_sell_ranking, None, False 

548 ) 

549 category_map["prsn_buy"] = ( 

550 "개인 순매수", self.ranking_task.get_prsn_net_buy_ranking, None, False 

551 ) 

552 category_map["prsn_sell"] = ( 

553 "개인 순매도", self.ranking_task.get_prsn_net_sell_ranking, None, False 

554 ) 

555 category_map["program_buy"] = ( 

556 "프로그램 순매수", self.ranking_task.get_program_net_buy_ranking, None, False 

557 ) 

558 category_map["program_sell"] = ( 

559 "프로그램 순매도", self.ranking_task.get_program_net_sell_ranking, None, False 

560 ) 

561 

562 if category not in category_map: 

563 self.logger.error(f"지원하지 않는 카테고리: {category}") 

564 return ResCommonResponse( 

565 rt_cd=ErrorCode.INVALID_INPUT.value, 

566 msg1=f"지원하지 않는 카테고리: {category}", 

567 data=None, 

568 ) 

569 

570 title, service_func, param, is_sync = category_map[category] 

571 self.logger.info(f"Handler - {title} 상위 종목 조회 요청") 

572 

573 if is_sync: 573 ↛ 574line 573 didn't jump to line 574 because the condition on line 573 was never true

574 response = service_func(param) if param is not None else service_func() 

575 else: 

576 response = await (service_func(param) if param is not None else service_func()) 

577 

578 if response and response.rt_cd == ErrorCode.SUCCESS.value: 

579 self.logger.info(f"{title} 상위 종목 조회 성공") 

580 if self._notification_service: 

581 cnt = len(response.data) if response.data else 0 

582 await self._notification_service.emit(NotificationCategory.API, NotificationLevel.INFO, f"{title} 랭킹 조회", 

583 f"{title} 상위 {cnt}개 종목 조회 완료", 

584 metadata={"category": category}) 

585 else: 

586 msg = response.msg1 if response else "응답 없음" 

587 self.logger.error(f"{title} 상위 종목 조회 실패: {msg}") 

588 if self._notification_service: 588 ↛ 589line 588 didn't jump to line 589 because the condition on line 588 was never true

589 await self._notification_service.emit(NotificationCategory.SYSTEM, NotificationLevel.WARNING, f"{title} 랭킹 조회 실패", msg, 

590 metadata={"category": category}) 

591 

592 self.pm.log_timer(f"StockQueryService.handle_get_top_stocks({category})", t_start, threshold=0.5) 

593 return response 

594 

595 async def handle_get_etf_info(self, etf_code: str): 

596 """ 

597 ETF 정보를 TradingService에서 받아와 출력용 뷰모델로 가공하여 반환만 한다. 

598 출력은 cli_view에 위임한다. 

599 """ 

600 self.logger.info(f"Service - {etf_code} ETF 정보 조회 요청") 

601 

602 response = await self.market_data_service.get_etf_info(etf_code) 

603 

604 # 실패면 그대로 전달 (cli_view에서 실패 출력) 

605 if not response or response.rt_cd != ErrorCode.SUCCESS.value: 

606 msg = response.msg1 if response else "응답 없음" 

607 self.logger.error(f"{etf_code} ETF 정보 조회 실패: {msg}") 

608 # data에는 최소한 식별 정보만 넣어두면 뷰에서 에러 메시지에 활용 가능 

609 return ResCommonResponse( 

610 rt_cd=response.rt_cd if response else ErrorCode.API_ERROR.value, 

611 msg1=msg, 

612 data={"code": etf_code} 

613 ) 

614 

615 # 성공: 출력용 뷰모델로 가공 

616 raw = response.data.get("output", {}) if response.data else {} 

617 view_model = { 

618 "code": etf_code, 

619 "name": raw.get("etf_rprs_bstp_kor_isnm", "N/A"), 

620 "price": raw.get("stck_prpr", "N/A"), 

621 "nav": raw.get("nav", "N/A"), 

622 "market_cap": raw.get("stck_llam", "N/A"), 

623 } 

624 

625 self.logger.info(f"{etf_code} ETF 정보 조회 성공") 

626 return ResCommonResponse( 

627 rt_cd=ErrorCode.SUCCESS.value, 

628 msg1="정상", 

629 data=view_model 

630 ) 

631 

632 

633 async def get_ohlcv(self, stock_code: str, period: str = "D", caller: str = "unknown", exchange: Exchange = Exchange.KRX) -> ResCommonResponse: 

634 """ 

635 OHLCV 데이터를 반환합니다. 

636 """ 

637 self.logger.info(f"ServiceHandler - {stock_code} OHLCV 데이터 요청 period={period}") 

638 return await self.market_data_service.get_ohlcv(stock_code, period=period, caller=caller, exchange=exchange) 

639 

640 async def get_ohlcv_range(self, stock_code: str, period: str = "D", start_date: str = None, end_date: str = None, exchange: Exchange = Exchange.KRX) -> ResCommonResponse: 

641 """ 

642 특정 기간의 OHLCV 데이터를 조회합니다. 

643 """ 

644 return await self.market_data_service.get_ohlcv_range(stock_code, period, start_date, end_date, exchange=exchange) 

645 

646 async def get_ohlcv_with_indicators(self, stock_code: str, period: str = "D", caller: str = "unknown") -> ResCommonResponse: 

647 """ 

648 OHLCV 데이터를 1회 조회한 후, 해당 데이터로 MA5/10/20/60/120 + 볼린저밴드 + RS를 한번에 계산하여 반환. 

649 차트 렌더링 시 7개 API 호출을 1개로 통합하기 위한 메서드. 

650 """ 

651 t_start = self.pm.start_timer() 

652 self.logger.info(f"ServiceHandler - {stock_code} OHLCV+지표 통합 조회 period={period}") 

653 try: 

654 # 1. OHLCV 1회 조회 

655 t0 = self.pm.start_timer() 

656 resp = await self.market_data_service.get_ohlcv(stock_code, period=period, caller=caller) 

657 self.pm.log_timer(f"{stock_code} OHLCV 조회", t0) 

658 

659 if not resp or resp.rt_cd != ErrorCode.SUCCESS.value: 

660 return resp or ResCommonResponse(rt_cd=ErrorCode.API_ERROR.value, msg1="OHLCV 조회 실패", data=None) 

661 

662 if not resp.data: 

663 return ResCommonResponse(rt_cd=ErrorCode.API_ERROR.value, msg1="OHLCV 조회 실패", data=None) 

664 

665 ohlcv_data = resp.data 

666 

667 # 2. 지표 계산 (OHLCV 데이터를 직접 전달하여 API 재호출 방지) 

668 indicator_service = self.indicator_service 

669 t2 = self.pm.start_timer() 

670 

671 # [최적화] 통합 지표 계산 메서드 호출 (DataFrame 변환 1회) 

672 indicators_resp = await indicator_service.get_chart_indicators(stock_code, ohlcv_data) 

673 

674 self.pm.log_timer(f"{stock_code} 지표 통합 계산", t2) 

675 

676 if indicators_resp.rt_cd != ErrorCode.SUCCESS.value: 

677 self.logger.error(f"지표 계산 실패: {indicators_resp.msg1}") 

678 indicators_data = {"ma5": [], "ma10": [], "ma20": [], "ma60": [], "ma120": [], "bb": [], "rs": []} 

679 else: 

680 indicators_data = indicators_resp.data 

681 

682 result = { 

683 "ohlcv": ohlcv_data, 

684 "indicators": indicators_data 

685 } 

686 self.pm.log_timer(f"{stock_code} get_ohlcv_with_indicators 전체", t_start, threshold=0.5) 

687 

688 return ResCommonResponse(rt_cd=ErrorCode.SUCCESS.value, msg1=f"OHLCV+지표 {len(ohlcv_data)}건", data=result) 

689 

690 except Exception as e: 

691 self.logger.error(f"{stock_code} OHLCV+지표 통합 조회 중 오류: {e}", exc_info=True) 

692 return ResCommonResponse(rt_cd=ErrorCode.UNKNOWN_ERROR.value, msg1=str(e), data=None) 

693 

694 async def get_recent_daily_ohlcv(self, stock_code: str, limit: int = DynamicConfig.OHLCV.DAILY_ITEMCHARTPRICE_MAX_RANGE, end_date: Optional[str] = None) -> ResCommonResponse: 

695 """ 

696 타겟 종목의 최근 일봉을 limit개 반환. 

697 TradingService.get_recent_daily_ohlcv를 래핑하여 ResCommonResponse 형태로 통일. 

698 """ 

699 try: 

700 rows = await self.market_data_service.get_recent_daily_ohlcv(stock_code, limit=limit, end_date=end_date) 

701 if not rows: 

702 return ResCommonResponse(rt_cd=ErrorCode.EMPTY_VALUES.value, msg1="데이터 없음", data=[]) 

703 return ResCommonResponse(rt_cd=ErrorCode.SUCCESS.value, msg1="성공", data=rows) 

704 except Exception as e: 

705 self.logger.error(f"[OHLCV] {stock_code} 조회 실패: {e}", exc_info=True) 

706 return ResCommonResponse(rt_cd=ErrorCode.EMPTY_VALUES.value, msg1=str(e), data=[]) 

707 

708 async def get_investor_trade_daily_multi(self, stock_code: str, date: str = None, days: int = 3) -> ResCommonResponse: 

709 """종목별 투자자 매매동향 다중일 조회 (실전 전용). 

710 

711 Returns: 

712 data: list[dict] — 최대 days개, 각 항목 {frgn_ntby_tr_pbmn, orgn_ntby_tr_pbmn, acml_tr_pbmn, stck_bsop_date, ...} 

713 단위: frgn/orgn_ntby_tr_pbmn 은 백만원, acml_tr_pbmn 은 원. 

714 """ 

715 if not self.broker: 

716 return ResCommonResponse(rt_cd=ErrorCode.UNKNOWN_ERROR.value, msg1="broker 미설정", data=[]) 

717 return await self.broker.get_investor_trade_by_stock_daily_multi(stock_code, date, days) 

718 

719 async def get_intraday_minutes_today(self, stock_code: str, *, input_hour_1: str) -> ResCommonResponse: 

720 """ 

721 당일 분봉 조회. MarketDataService 위임. 

722 """ 

723 return await self.market_data_service.get_intraday_minutes_today( 

724 stock_code=stock_code, input_hour_1=input_hour_1 

725 ) 

726 

727 async def get_intraday_minutes_by_date( 

728 self, stock_code: str, *, input_date_1: str, input_hour_1: str = "" 

729 ) -> ResCommonResponse: 

730 """ 

731 일별(특정 일자) 분봉 조회. MarketDataService 위임. 

732 """ 

733 return await self.market_data_service.get_intraday_minutes_by_date( 

734 stock_code=stock_code, input_date_1=input_date_1, input_hour_1=input_hour_1 

735 ) 

736 

737 async def get_day_intraday_minutes_list( 

738 self, 

739 stock_code: str, 

740 *, 

741 date_ymd: Optional[str] = None, # None이면 '오늘'(KST) 조회 

742 session: Literal["REGULAR", "EXTENDED"] = "REGULAR", # REGULAR=09:00~15:40, EXTENDED=08:00~20:00 

743 start_hhmmss: Optional[str] = None, 

744 end_hhmmss: Optional[str] = None, 

745 max_batches: int = 200 

746 ) -> List[Dict]: 

747 """ 

748 하루치 분봉(분봉 행 dict)의 '정규화된 리스트'를 반환한다. (출력은 호출부/cli_view에서) 

749 - date_ymd=None: 오늘(KST) → get_intraday_minutes_today(배치당 30개; 모의/실전 모두 가능) 

750 - date_ymd=YYYYMMDD: 지정일 → get_intraday_minutes_by_date(배치당 100개; 실전 전용) 

751 - 시간 범위: session 프리셋으로 선택하거나 start/end를 직접 지정 가능 

752 - 반환: 시간 오름차순(HHMMSS) 정렬된 리스트. 각 행은 최소 다음 키를 포함: 

753 'stck_bsop_date'(YYYYMMDD), 'stck_cntg_hour'(HHMMSS), 나머지는 원본 필드 유지 

754 """ 

755 t_start = self.pm.start_timer() 

756 # 세션 범위 결정 

757 if not start_hhmmss or not end_hhmmss: 

758 if session.upper() == "EXTENDED": 

759 start_hhmmss = start_hhmmss or "080000" 

760 end_hhmmss = end_hhmmss or "200000" 

761 else: 

762 start_hhmmss = start_hhmmss or "090000" 

763 end_hhmmss = end_hhmmss or "153000" 

764 

765 start_hhmmss = self.market_clock.to_hhmmss(start_hhmmss) 

766 end_hhmmss = self.market_clock.to_hhmmss(end_hhmmss) 

767 

768 # 조회 날짜 

769 if date_ymd: 

770 ymd = date_ymd 

771 else: 

772 now_kst = self.market_clock.get_current_kst_time() 

773 ymd = now_kst.strftime("%Y%m%d") 

774 

775 # 배치 호출 함수 선택 

776 async def _fetch_batch(cursor_hhmmss: str): 

777 cursor_hhmmss = self.market_clock.to_hhmmss(cursor_hhmmss) 

778 if self.market_data_service._env.is_paper_trading: 

779 # 오늘(모의/실전; 배치당 30개) 

780 return await self.get_intraday_minutes_today( 

781 stock_code, input_hour_1=cursor_hhmmss 

782 ) 

783 else: 

784 # 지정일(실전 전용; 배치당 100개) 

785 return await self.get_intraday_minutes_by_date( 

786 stock_code, input_date_1=ymd, input_hour_1=cursor_hhmmss 

787 ) 

788 

789 def _extract_rows(resp_obj) -> list[dict]: 

790 """resp.data가 list 또는 dict(output2/rows/data 키)인 모든 경우를 수용.""" 

791 data = getattr(resp_obj, "data", None) 

792 if isinstance(data, list): 

793 return data 

794 if isinstance(data, dict): 794 ↛ 797line 794 didn't jump to line 797 because the condition on line 794 was always true

795 rows = data.get("output2") or data.get("rows") or data.get("data") or [] 

796 return rows if isinstance(rows, list) else [] 

797 return [] 

798 

799 # 커서: end부터 과거로 내려가며 수집 

800 cursor = end_hhmmss 

801 seen: set[tuple[str, str]] = set() # (date, hhmmss) 

802 collected: List[Dict] = [] 

803 batches = 0 

804 

805 while batches < max_batches: 805 ↛ 855line 805 didn't jump to line 855 because the condition on line 805 was always true

806 batches += 1 

807 resp = await _fetch_batch(cursor) 

808 if not resp or str(getattr(resp, "rt_cd", "1")) != "0": 

809 break 

810 

811 rows = _extract_rows(resp) 

812 if not rows: 

813 break 

814 

815 min_time_in_batch = None 

816 added = 0 

817 

818 for row in rows: 

819 d = str(row.get("stck_bsop_date") or ymd) 

820 t = self.market_clock.to_hhmmss(row.get("stck_cntg_hour") or "") 

821 

822 if (min_time_in_batch is None) or (t < min_time_in_batch): 822 ↛ 826line 822 didn't jump to line 826 because the condition on line 822 was always true

823 min_time_in_batch = t 

824 

825 # 범위 필터 

826 if t < start_hhmmss or t > end_hhmmss: 

827 continue 

828 key = (d, t) 

829 if key in seen: 

830 continue 

831 seen.add(key) 

832 

833 norm = dict(row) 

834 norm["stck_bsop_date"] = d 

835 norm["stck_cntg_hour"] = t 

836 collected.append(norm) 

837 added += 1 

838 

839 if added == 0: 

840 if min_time_in_batch: 840 ↛ 845line 840 didn't jump to line 845 because the condition on line 840 was always true

841 cursor = self.market_clock.dec_minute(min_time_in_batch, 1) 

842 if cursor < start_hhmmss: 

843 break 

844 continue 

845 break 

846 

847 if min_time_in_batch: 847 ↛ 852line 847 didn't jump to line 852 because the condition on line 847 was always true

848 cursor = self.market_clock.dec_minute(min_time_in_batch, 1) 

849 if cursor < start_hhmmss: 

850 break 

851 else: 

852 break 

853 

854 # 최종 정렬(과거→현재) 

855 collected.sort(key=lambda r: r.get("stck_cntg_hour", "")) 

856 

857 self.pm.log_timer(f"StockQueryService.get_day_intraday_minutes_list({stock_code}, {batches}배치)", t_start, threshold=1.0) 

858 return collected