Coverage for services / stock_query_service.py: 95%
387 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
1# app/stock_query_service.py
2from __future__ import annotations
3from common.types import ErrorCode, ResCommonResponse, ResTopMarketCapApiItem, ResBasicStockInfo, \
4 ResStockFullInfoApiOutput, Exchange
5from config.DynamicConfig import DynamicConfig
6from typing import List, Dict, Optional, Literal
7from core.performance_profiler import PerformanceProfiler
8from services.notification_service import NotificationService, NotificationCategory, NotificationLevel
9from services.market_data_service import MarketDataService
12class StockQueryService:
13 """
14 주식 현재가, 계좌 잔고, 시가총액 조회 등 데이터 조회 관련 핸들러를 관리하는 클래스입니다.
15 MarketDataService, BrokerAPIWrapper 등 인스턴스를 주입받아 사용합니다.
16 """
18 def __init__(self, market_data_service: MarketDataService, logger, market_clock, indicator_service=None,
19 ranking_task=None, performance_profiler: Optional[PerformanceProfiler] = None,
20 notification_service: Optional[NotificationService] = None,
21 broker_api_wrapper=None):
22 self.broker = broker_api_wrapper
23 self.market_data_service = market_data_service
24 self.logger = logger
25 self.market_clock = market_clock
26 self.indicator_service = indicator_service
27 self.ranking_task = ranking_task
28 self.pm = performance_profiler if performance_profiler else PerformanceProfiler(enabled=False)
29 self._notification_service = notification_service
31 def _get_sign_from_code(self, sign_code):
32 """API 응답의 부호 코드(1,2,3,4,5)를 실제 부호 문자열로 변환합니다."""
33 if sign_code == '1' or sign_code == '2': # 1:상한, 2:상승
34 return "+"
35 elif sign_code == '4' or sign_code == '5': # 4:하한, 5:하락
36 return "-"
37 else: # 3:보합 (또는 기타)
38 return ""
40 async def get_current_price(self, stock_code: str, exchange: Exchange = Exchange.KRX, count_stats: bool = True, caller: str = "unknown") -> ResCommonResponse:
41 """현재가만 빠르게 조회 (MarketDataService 래퍼)."""
42 return await self.market_data_service.get_current_price(stock_code, exchange=exchange, count_stats=count_stats, caller=caller)
44 async def get_multi_price(self, stock_codes: list[str]) -> ResCommonResponse:
45 """복수종목 현재가 조회 (최대 30종목, MarketDataService 래퍼)."""
46 return await self.market_data_service.get_multi_price(stock_codes)
48 async def get_top_trading_value_stocks(self) -> ResCommonResponse:
49 """거래대금 상위 종목 조회 (MarketDataService 래퍼)."""
50 return await self.market_data_service.get_top_trading_value_stocks()
52 async def get_top_rise_fall_stocks(self, rise: bool = True) -> ResCommonResponse:
53 """상승/하락 상위 종목 조회 (MarketDataService 래퍼)."""
54 return await self.market_data_service.get_top_rise_fall_stocks(rise)
56 async def get_top_volume_stocks(self) -> ResCommonResponse:
57 """거래량 상위 종목 조회 (MarketDataService 래퍼)."""
58 return await self.market_data_service.get_top_volume_stocks()
60 async def get_financial_ratio(self, stock_code: str) -> ResCommonResponse:
61 """재무비율 조회 (MarketDataService 래퍼)."""
62 return await self.market_data_service.get_financial_ratio(stock_code)
64 async def get_stock_conclusion(self, stock_code: str) -> ResCommonResponse:
65 """체결 정보 조회 (MarketDataService 래퍼)."""
66 return await self.market_data_service.get_stock_conclusion(stock_code)
68 async def handle_get_current_stock_price(self, stock_code, caller: str = "unknown", exchange: Exchange = Exchange.KRX):
69 """주식 현재가 및 상세 정보 조회 요청 및 결과 출력."""
70 self.logger.info(f"Stock_Query_Service - {stock_code} 현재가 및 상세 정보 조회 요청")
71 resp: ResCommonResponse = await self.market_data_service.get_current_price(stock_code, exchange=exchange, caller=caller)
73 if not resp or resp.rt_cd != ErrorCode.SUCCESS.value:
74 msg = resp.msg1 if resp else "응답 없음"
75 self.logger.error(f"{stock_code} 현재가 및 상세 정보 조회 실패: {msg}")
76 if self._notification_service:
77 await self._notification_service.emit(NotificationCategory.SYSTEM, NotificationLevel.WARNING, "현재가 조회 실패",
78 f"{stock_code} - {msg}",
79 metadata={"code": stock_code})
80 return ResCommonResponse(
81 rt_cd=(resp.rt_cd if resp else ErrorCode.API_ERROR.value),
82 msg1=msg,
83 data={"code": stock_code},
84 )
86 # --- output 추출 및 통일화(ResStockFullInfoApiOutput) ---
87 output = (resp.data or {}).get("output") if isinstance(resp.data, dict) else None
89 if not isinstance(output, ResStockFullInfoApiOutput):
90 self.logger.error(f"잘못된 응답 데이터 타입 또는 output 없음: {type(output)}")
91 return ResCommonResponse(
92 rt_cd=ErrorCode.PARSING_ERROR.value,
93 msg1=f"잘못된 응답 데이터 타입 또는 output 없음: {type(output)}",
94 data={"code": stock_code},
95 )
97 status_code_map = {
98 "51": "관리종목", "52": "투자위험", "53": "투자경고", "54": "투자주의",
99 "55": "신용가능", "57": "증거금 100%", "58": "거래정지", "59": "단기과열"
100 }
101 status_description = status_code_map.get(output.iscd_stat_cls_code, "정보 없음")
103 # 부호 처리 로직 추가
104 change_val = output.prdy_vrss
105 sign_code = output.prdy_vrss_sign
106 actual_sign = self._get_sign_from_code(sign_code)
108 display_change = change_val
109 try:
110 f = float(change_val)
111 if f != 0:
112 display_change = f"{abs(int(f))}"
113 else:
114 display_change = "0"
115 except (ValueError, TypeError):
116 pass
118 view = {
119 # 기본 정보
120 "code": stock_code,
121 "name": await self.market_data_service.get_name_by_code(stock_code),
122 "is_new_high": output.is_new_high, # 신고가 여부 추가
123 "is_new_low": output.is_new_low, # 신저가 여부 추가
124 "price": output.stck_prpr,
125 "change": output.prdy_vrss,
126 "change_absolute": display_change,
127 "rate": output.prdy_ctrt,
128 "sign": actual_sign,
129 "time": self.market_clock.get_current_kst_time().strftime("%H:%M:%S"),
130 "bstp_kor_isnm": output.bstp_kor_isnm,
131 "iscd_stat_cls_code_desc": f"{status_description} ({output.iscd_stat_cls_code})",
133 # 거래 정보
134 "acml_tr_pbmn": output.acml_tr_pbmn,
135 "acml_vol": output.acml_vol,
136 "prdy_vrss_vol_rate": output.prdy_vrss_vol_rate,
137 "frgn_ntby_qty": output.frgn_ntby_qty,
138 "pgtr_ntby_qty": output.pgtr_ntby_qty,
140 # 당일 가격 정보
141 "open": output.stck_oprc,
142 "high": output.stck_hgpr,
143 "low": output.stck_lwpr,
144 "prev_close": output.stck_sdpr, # 기준가
146 # 투자 지표
147 "per": output.per,
148 "pbr": output.pbr,
149 "eps": output.eps,
150 "bps": output.bps,
152 # 250일 정보
153 "d250_hgpr": output.d250_hgpr,
154 "d250_hgpr_date": output.d250_hgpr_date,
155 "d250_hgpr_vrss_prpr_rate": output.d250_hgpr_vrss_prpr_rate,
156 "d250_lwpr": output.d250_lwpr,
157 "d250_lwpr_date": output.d250_lwpr_date,
158 "d250_lwpr_vrss_prpr_rate": output.d250_lwpr_vrss_prpr_rate,
160 # 연중 정보
161 "dryy_hgpr": output.stck_dryy_hgpr,
162 "dryy_hgpr_vrss_prpr_rate": output.dryy_hgpr_vrss_prpr_rate,
163 "dryy_hgpr_date": output.dryy_hgpr_date,
164 "dryy_lwpr": output.stck_dryy_lwpr,
165 "dryy_lwpr_vrss_prpr_rate": output.dryy_lwpr_vrss_prpr_rate,
166 "dryy_lwpr_date": output.dryy_lwpr_date,
168 # 52주 정보
169 "w52_hgpr": output.w52_hgpr,
170 "w52_hgpr_vrss_prpr_ctrt": output.w52_hgpr_vrss_prpr_ctrt,
171 "w52_hgpr_date": output.w52_hgpr_date,
172 "w52_lwpr": output.w52_lwpr,
173 "w52_lwpr_vrss_prpr_ctrt": output.w52_lwpr_vrss_prpr_ctrt,
174 "w52_lwpr_date": output.w52_lwpr_date,
176 # 기타 상태
177 "crdt_able_yn": "가능" if output.crdt_able_yn == "Y" else "불가능",
178 "short_over_yn": "예" if output.short_over_yn == "Y" else "아니오",
179 "sltr_yn": "예" if output.sltr_yn == "Y" else "아니오",
180 "mang_issu_cls_code": "예" if output.mang_issu_cls_code and output.mang_issu_cls_code.strip() else "아니오",
181 }
182 self.logger.info(f"{stock_code} 현재가 및 상세 정보 조회 성공")
183 if self._notification_service:
184 name = view.get("name", stock_code)
185 sign_str = actual_sign if actual_sign == "+" else ""
186 await self._notification_service.emit(NotificationCategory.API, NotificationLevel.INFO, "현재가 조회",
187 f"{name}({stock_code}) {view['price']}원 ({sign_str}{view['rate']}%)",
188 metadata={"code": stock_code, "price": view["price"]})
189 return ResCommonResponse(rt_cd=ErrorCode.SUCCESS.value, msg1="정상", data=view)
191 async def handle_get_account_balance(self, exchange: Exchange = Exchange.KRX) -> ResCommonResponse:
192 """계좌 잔고 조회 요청 및 결과 출력."""
193 resp = await self.broker.get_account_balance(exchange=exchange)
194 if self._notification_service:
195 if resp and resp.rt_cd == ErrorCode.SUCCESS.value:
196 await self._notification_service.emit(NotificationCategory.API, NotificationLevel.INFO, "잔고 조회 완료", "계좌 잔고 조회 성공")
197 else:
198 msg = resp.msg1 if resp else "응답 없음"
199 await self._notification_service.emit(NotificationCategory.SYSTEM, NotificationLevel.WARNING, "잔고 조회 실패", msg)
200 return resp
202 async def handle_get_top_market_cap_stocks_code(self, market_code: str = "0000", limit: int = 30) -> ResCommonResponse:
203 """
204 시가총액 상위 종목 중 상한가 도달 종목 조회 (출력 X).
205 data: List[dict(code,name,price,change_rate)]
206 """
207 self.logger.debug(f"상한가 스캔 요청 (시장={market_code}, limit={limit})")
209 # 모의투자 / 장시간 검증
210 if getattr(self.market_data_service._env, "is_paper_trading", False):
211 self.logger.warning("모의투자 환경에서는 상한가 조회 미지원")
212 return ResCommonResponse(
213 rt_cd=ErrorCode.API_ERROR.value,
214 msg1="모의투자 미지원 API입니다.",
215 data=None
216 )
218 try:
219 # 상위 종목 조회
220 top_res: ResCommonResponse = await self.market_data_service.get_top_market_cap_stocks_code(market_code, limit)
221 if not top_res or top_res.rt_cd != ErrorCode.SUCCESS.value:
222 self.logger.error(f"상위 종목 목록 조회 실패: {top_res}")
223 return ResCommonResponse(
224 rt_cd=ErrorCode.API_ERROR.value,
225 msg1="상위 종목 목록 조회 실패",
226 data=None
227 )
229 top_list: List[ResTopMarketCapApiItem] = top_res.data or []
230 if not top_list:
231 self.logger.debug("상위 종목 없음")
232 return ResCommonResponse(
233 rt_cd=ErrorCode.SUCCESS.value,
234 msg1="조회 성공 (종목 없음)",
235 data=[]
236 )
238 targets = top_list[:limit]
239 found: list[dict] = []
241 for item in targets:
242 # dataclass(ResTopMarketCapApiItem)와 dict 모두 지원
243 get = (lambda k: getattr(item, k, None)) if not isinstance(item, dict) else item.get
245 code = get("mksc_shrn_iscd") or get("iscd")
246 name = get("hts_kor_isnm")
247 prdy_vrss_sign = get("prdy_vrss_sign")
248 stck_prpr = get("stck_prpr")
249 prdy_ctrt = get("prdy_ctrt")
251 if not code:
252 self.logger.warning(f"유효하지 않은 종목코드: {item}")
253 continue
255 # 정책: prdy_vrss_sign == '1'이면 상한으로 간주
256 if prdy_vrss_sign == "1":
257 found.append({
258 "code": code,
259 "name": name,
260 "price": str(stck_prpr) if stck_prpr is not None else None,
261 "change_rate": str(prdy_ctrt) if prdy_ctrt is not None else None,
262 })
263 self.logger.debug(f"상한가 발견: {name}({code}) {stck_prpr}원 {prdy_ctrt}%")
264 else:
265 # 필요시 디버그 로그만
266 self.logger.debug(f"상한가 아님: {name}({code}) sign={prdy_vrss_sign}")
268 self.logger.info("시가총액 상위 종목 조회 성공")
269 return ResCommonResponse(
270 rt_cd=ErrorCode.SUCCESS.value,
271 msg1="조회 성공",
272 data=found # 빈 리스트 허용
273 )
275 except Exception as e:
276 self.logger.exception("상한가 조회 중 예외")
277 return ResCommonResponse(
278 rt_cd=ErrorCode.UNKNOWN_ERROR.value,
279 msg1=f"예외 발생: {e}",
280 data=None
281 )
283 async def get_stock_change_rate(self, stock_code: str) -> ResCommonResponse:
284 """
285 전일대비 등락률 조회. 출력 없음. 계산/포맷만 수행하여 ResCommonResponse로 반환.
286 data 예시:
287 {
288 "stock_code": "005930",
289 "current_price": "70400",
290 "change_value_display": "+500", # 부호/0 처리 적용된 표시값
291 "change_rate": "0.71" # API 그대로 문자열 유지
292 }
293 """
294 res: ResCommonResponse = await self.market_data_service.get_current_price(stock_code, caller="StockQueryService")
295 if not (res and res.rt_cd == ErrorCode.SUCCESS.value):
296 self.logger.error(f"{stock_code} 전일대비 등락률 조회 실패: {res}")
297 # 실패도 통일된 형태로 반환
298 return ResCommonResponse(rt_cd="1", msg1="조회 실패", data={"stock_code": stock_code})
300 output = res.data.get("output") or {}
301 current_price = output.stck_prpr
302 change_val_str = output.prdy_vrss
303 change_sign_code = output.prdy_vrss_sign
304 change_rate_str = output.prdy_ctrt
306 actual_sign = self._get_sign_from_code(change_sign_code)
308 display_change_val = change_val_str
309 try:
310 f = float(change_val_str)
311 if f != 0:
312 display_change_val = f"{actual_sign}{abs(int(f))}"
313 elif f == 0: 313 ↛ 319line 313 didn't jump to line 319 because the condition on line 313 was always true
314 display_change_val = "0"
315 except (ValueError, TypeError):
316 # 숫자 아님 → 그대로 노출
317 pass
319 data = {
320 "stock_code": stock_code,
321 "current_price": current_price,
322 "change_value_display": display_change_val,
323 "change_rate": change_rate_str,
324 }
325 self.logger.info(
326 f"{stock_code} 전일대비 등락률 조회 성공: 현재가={current_price}, "
327 f"전일대비={display_change_val}, 등락률={change_rate_str}%"
328 )
329 return ResCommonResponse(rt_cd="0", msg1="정상", data=data)
331 async def get_open_vs_current(self, stock_code: str) -> ResCommonResponse:
332 """
333 시가 대비 등락률/금액 계산 후 반환. 출력 없음.
334 data 예시:
335 {
336 "stock_code": "005930",
337 "current_price": "70400",
338 "open_price": "70000",
339 "vs_open_value_display": "+400", # 금액 부호/0 처리
340 "vs_open_rate_display": "+0.57%" # 퍼센트 부호/0 처리
341 }
342 """
343 res: ResCommonResponse = await self.market_data_service.get_current_price(stock_code, caller="StockQueryService")
344 if not (res and res.rt_cd == ErrorCode.SUCCESS.value):
345 self.logger.error(f"{stock_code} 시가대비 조회 실패: {res}")
346 return ResCommonResponse(rt_cd="1", msg1="조회 실패", data={"stock_code": stock_code})
348 output = res.data.get("output") or {}
349 cur_str = output.stck_prpr
350 open_str = output.stck_oprc
352 try:
353 cur = float(cur_str) if cur_str not in (None, "N/A") else None
354 opn = float(open_str) if open_str not in (None, "N/A") else None
355 except (ValueError, TypeError):
356 self.logger.warning(
357 f"{stock_code} 시가대비 조회 실패: 가격 파싱 오류 (현재가={cur_str}, 시가={open_str})"
358 )
359 return ResCommonResponse(rt_cd="1", msg1="가격 파싱 오류", data={"stock_code": stock_code})
361 vs_val_disp = "N/A"
362 vs_rate_disp = "N/A"
364 if cur is not None and opn is not None:
365 diff = cur - opn
366 vs_val_disp = "0" if diff == 0 else f"{diff:+.0f}"
367 if opn != 0:
368 vs_rate_disp = f"{(diff / opn) * 100:+.2f}%"
369 else:
370 vs_rate_disp = "N/A"
372 data = {
373 "stock_code": stock_code,
374 "current_price": cur_str,
375 "open_price": open_str,
376 "vs_open_value_display": vs_val_disp,
377 "vs_open_rate_display": vs_rate_disp,
378 }
379 self.logger.info(
380 f"{stock_code} 시가대비 조회 성공: 현재가={cur_str}, 시가={open_str}, "
381 f"시가대비={vs_val_disp} ({vs_rate_disp})"
382 )
383 return ResCommonResponse(rt_cd="0", msg1="정상", data=data)
385 async def handle_upper_limit_stocks(self, market_code: str = "0000", limit: int = 500):
386 """
387 시가총액 상위 종목 조회 (출력 X). TradingService 결과를 표준 스키마로 반환.
388 data: List[ResTopMarketCapApiItem]
389 """
391 try:
392 res: ResCommonResponse = await self.market_data_service.get_top_market_cap_stocks_code(market_code, limit)
393 if not res or res.rt_cd != ErrorCode.SUCCESS.value:
394 self.logger.error(f"시가총액 상위 종목 조회 실패: {res}")
395 return ResCommonResponse(
396 rt_cd=ErrorCode.API_ERROR.value,
397 msg1="시가총액 상위 종목 조회 실패",
398 data=None
399 )
400 # 성공
401 self.logger.info(f"시가총액 상위 종목 조회 성공 (시장: {market_code}, 개수={len(res.data) if res.data else 0})")
402 return ResCommonResponse(
403 rt_cd=ErrorCode.SUCCESS.value,
404 msg1="조회 성공",
405 data=res.data, # 그대로 전달 (List[ResTopMarketCapApiItem])
406 )
407 except Exception as e:
408 self.logger.exception("시가총액 상위 종목 조회 중 예외")
409 return ResCommonResponse(
410 rt_cd=ErrorCode.UNKNOWN_ERROR.value,
411 msg1=f"예외 발생: {e}",
412 data=None
413 )
415 async def handle_current_upper_limit_stocks(self):
416 """
417 전체 종목 중 현재 상한가에 도달한 종목을 조회하여 출력합니다.
418 """
419 self.logger.info("Service - 현재 상한가 종목 조회 요청 ")
421 try:
422 rise_res: ResCommonResponse = await self.market_data_service.get_top_rise_fall_stocks(rise=True)
423 if rise_res.rt_cd != ErrorCode.SUCCESS.value:
424 self.logger.warning("상승률 조회 실패.")
425 return rise_res
427 upper_limit_stocks: ResCommonResponse = await self.market_data_service.get_current_upper_limit_stocks(
428 rise_res.data)
430 if upper_limit_stocks.rt_cd != ErrorCode.SUCCESS.value:
431 self.logger.info("현재 상한가 종목 없음.")
433 return upper_limit_stocks
435 except Exception as e:
436 self.logger.error(f"현재 상한가 종목 조회 중 오류 발생: {e}", exc_info=True)
437 raise
439 async def handle_get_asking_price(self, stock_code: str, depth: int = 10):
440 """종목의 실시간 호가 정보 조회 및 출력."""
441 self.logger.info(f"Service - {stock_code} 호가 정보 조회 요청")
442 response = await self.market_data_service.get_asking_price(stock_code)
444 if not response or response.rt_cd != ErrorCode.SUCCESS.value:
445 msg = response.msg1 if response else "응답 없음"
446 self.logger.error(f"{stock_code} 호가 정보 조회 실패: {msg}")
447 return ResCommonResponse(
448 rt_cd=(response.rt_cd if response else ErrorCode.API_ERROR.value),
449 msg1=msg,
450 data={"code": stock_code},
451 )
453 raw1 = (response.data or {}).get("output1") or {}
454 # 일부 구현에서 list로 줄 수도 있으니 방어
455 if isinstance(raw1, list): 455 ↛ 456line 455 didn't jump to line 456 because the condition on line 455 was never true
456 raw1 = raw1[0] if raw1 else {}
458 rows = []
459 for i in range(1, depth + 1):
460 rows.append({
461 "level": i,
462 "ask_price": raw1.get(f"askp{i}", "N/A"),
463 "ask_rem": raw1.get(f"askp_rsqn{i}", "N/A"),
464 "bid_price": raw1.get(f"bidp{i}", "N/A"),
465 "bid_rem": raw1.get(f"bidp_rsqn{i}", "N/A"),
466 })
468 view_model = {
469 "code": stock_code,
470 "rows": rows,
471 # 필요시 추가 필드들(예: 현재가/참고값 등)
472 "meta": {
473 "prpr": raw1.get("stck_prpr"),
474 "time": raw1.get("aplm_hour") or raw1.get("stck_cntg_hour"),
475 }
476 }
478 self.logger.info(f"{stock_code} 호가 정보 조회 성공")
479 return ResCommonResponse(rt_cd=ErrorCode.SUCCESS.value, msg1="정상", data=view_model)
481 async def handle_get_time_concluded_prices(self, stock_code: str):
482 """종목의 시간대별 체결가 정보 조회 및 출력."""
483 self.logger.info(f"Service - {stock_code} 시간대별 체결가 조회 요청")
484 response = await self.market_data_service.get_time_concluded_prices(stock_code)
486 if not response or response.rt_cd != ErrorCode.SUCCESS.value:
487 msg = response.msg1 if response else "응답 없음"
488 self.logger.error(f"{stock_code} 시간대별 체결가 조회 실패: {msg}")
489 return ResCommonResponse(
490 rt_cd=(response.rt_cd if response else ErrorCode.API_ERROR.value),
491 msg1=msg,
492 data={"code": stock_code},
493 )
495 raw = (response.data or {}).get("output") or []
496 if isinstance(raw, dict):
497 raw = [raw]
499 rows = []
500 for item in raw:
501 rows.append({
502 "time": item.get("stck_cntg_hour", "N/A"),
503 "price": item.get("stck_prpr", "N/A"),
504 "change": item.get("prdy_vrss", "N/A"),
505 "volume": item.get("cntg_vol", "N/A"),
506 })
508 view_model = {"code": stock_code, "rows": rows}
509 self.logger.info(f"{stock_code} 시간대별 체결가 조회 성공")
510 return ResCommonResponse(rt_cd=ErrorCode.SUCCESS.value, msg1="정상", data=view_model)
512 async def handle_get_top_stocks(self, category: str) -> ResCommonResponse:
513 """상위 종목 조회 및 출력 (상승률, 하락률, 거래량, 외국인순매수 등)."""
514 t_start = self.pm.start_timer()
515 # (title, func, param, is_sync) — is_sync=True이면 동기 함수 호출
516 # 장마감 후 캐시된 기본 랭킹이 있으면 우선 사용 (trading_value 제외 — ranking_task에서 처리)
517 basic_categories = ("rise", "fall", "volume")
518 if category in basic_categories and self.ranking_task:
519 cached = self.ranking_task.get_basic_ranking_cache(category)
520 if cached is not None:
521 self.logger.info(f"Handler - {category} 캐시 히트 (장마감 후 캐시)")
522 return cached
524 category_map = {
525 "rise": ("상승률", self.market_data_service.get_top_rise_fall_stocks, True, False),
526 "fall": ("하락률", self.market_data_service.get_top_rise_fall_stocks, False, False),
527 "volume": ("거래량", self.market_data_service.get_top_volume_stocks, None, False),
528 "trading_value": ("거래대금", self.market_data_service.get_top_trading_value_stocks, None, False),
529 }
531 # 랭킹 태스크 카테고리 (동기 함수)
532 # 거래대금: 장마감 후에는 투자자 데이터(acml_tr_pbmn) 기반으로 전환
533 if self.ranking_task:
534 category_map["trading_value"] = (
535 "거래대금", self.ranking_task.get_trading_value_ranking, None, False
536 )
537 category_map["foreign_buy"] = (
538 "외인 순매수", self.ranking_task.get_foreign_net_buy_ranking, None, False
539 )
540 category_map["foreign_sell"] = (
541 "외인 순매도", self.ranking_task.get_foreign_net_sell_ranking, None, False
542 )
543 category_map["inst_buy"] = (
544 "기관 순매수", self.ranking_task.get_inst_net_buy_ranking, None, False
545 )
546 category_map["inst_sell"] = (
547 "기관 순매도", self.ranking_task.get_inst_net_sell_ranking, None, False
548 )
549 category_map["prsn_buy"] = (
550 "개인 순매수", self.ranking_task.get_prsn_net_buy_ranking, None, False
551 )
552 category_map["prsn_sell"] = (
553 "개인 순매도", self.ranking_task.get_prsn_net_sell_ranking, None, False
554 )
555 category_map["program_buy"] = (
556 "프로그램 순매수", self.ranking_task.get_program_net_buy_ranking, None, False
557 )
558 category_map["program_sell"] = (
559 "프로그램 순매도", self.ranking_task.get_program_net_sell_ranking, None, False
560 )
562 if category not in category_map:
563 self.logger.error(f"지원하지 않는 카테고리: {category}")
564 return ResCommonResponse(
565 rt_cd=ErrorCode.INVALID_INPUT.value,
566 msg1=f"지원하지 않는 카테고리: {category}",
567 data=None,
568 )
570 title, service_func, param, is_sync = category_map[category]
571 self.logger.info(f"Handler - {title} 상위 종목 조회 요청")
573 if is_sync: 573 ↛ 574line 573 didn't jump to line 574 because the condition on line 573 was never true
574 response = service_func(param) if param is not None else service_func()
575 else:
576 response = await (service_func(param) if param is not None else service_func())
578 if response and response.rt_cd == ErrorCode.SUCCESS.value:
579 self.logger.info(f"{title} 상위 종목 조회 성공")
580 if self._notification_service:
581 cnt = len(response.data) if response.data else 0
582 await self._notification_service.emit(NotificationCategory.API, NotificationLevel.INFO, f"{title} 랭킹 조회",
583 f"{title} 상위 {cnt}개 종목 조회 완료",
584 metadata={"category": category})
585 else:
586 msg = response.msg1 if response else "응답 없음"
587 self.logger.error(f"{title} 상위 종목 조회 실패: {msg}")
588 if self._notification_service: 588 ↛ 589line 588 didn't jump to line 589 because the condition on line 588 was never true
589 await self._notification_service.emit(NotificationCategory.SYSTEM, NotificationLevel.WARNING, f"{title} 랭킹 조회 실패", msg,
590 metadata={"category": category})
592 self.pm.log_timer(f"StockQueryService.handle_get_top_stocks({category})", t_start, threshold=0.5)
593 return response
595 async def handle_get_etf_info(self, etf_code: str):
596 """
597 ETF 정보를 TradingService에서 받아와 출력용 뷰모델로 가공하여 반환만 한다.
598 출력은 cli_view에 위임한다.
599 """
600 self.logger.info(f"Service - {etf_code} ETF 정보 조회 요청")
602 response = await self.market_data_service.get_etf_info(etf_code)
604 # 실패면 그대로 전달 (cli_view에서 실패 출력)
605 if not response or response.rt_cd != ErrorCode.SUCCESS.value:
606 msg = response.msg1 if response else "응답 없음"
607 self.logger.error(f"{etf_code} ETF 정보 조회 실패: {msg}")
608 # data에는 최소한 식별 정보만 넣어두면 뷰에서 에러 메시지에 활용 가능
609 return ResCommonResponse(
610 rt_cd=response.rt_cd if response else ErrorCode.API_ERROR.value,
611 msg1=msg,
612 data={"code": etf_code}
613 )
615 # 성공: 출력용 뷰모델로 가공
616 raw = response.data.get("output", {}) if response.data else {}
617 view_model = {
618 "code": etf_code,
619 "name": raw.get("etf_rprs_bstp_kor_isnm", "N/A"),
620 "price": raw.get("stck_prpr", "N/A"),
621 "nav": raw.get("nav", "N/A"),
622 "market_cap": raw.get("stck_llam", "N/A"),
623 }
625 self.logger.info(f"{etf_code} ETF 정보 조회 성공")
626 return ResCommonResponse(
627 rt_cd=ErrorCode.SUCCESS.value,
628 msg1="정상",
629 data=view_model
630 )
633 async def get_ohlcv(self, stock_code: str, period: str = "D", caller: str = "unknown", exchange: Exchange = Exchange.KRX) -> ResCommonResponse:
634 """
635 OHLCV 데이터를 반환합니다.
636 """
637 self.logger.info(f"ServiceHandler - {stock_code} OHLCV 데이터 요청 period={period}")
638 return await self.market_data_service.get_ohlcv(stock_code, period=period, caller=caller, exchange=exchange)
640 async def get_ohlcv_range(self, stock_code: str, period: str = "D", start_date: str = None, end_date: str = None, exchange: Exchange = Exchange.KRX) -> ResCommonResponse:
641 """
642 특정 기간의 OHLCV 데이터를 조회합니다.
643 """
644 return await self.market_data_service.get_ohlcv_range(stock_code, period, start_date, end_date, exchange=exchange)
646 async def get_ohlcv_with_indicators(self, stock_code: str, period: str = "D", caller: str = "unknown") -> ResCommonResponse:
647 """
648 OHLCV 데이터를 1회 조회한 후, 해당 데이터로 MA5/10/20/60/120 + 볼린저밴드 + RS를 한번에 계산하여 반환.
649 차트 렌더링 시 7개 API 호출을 1개로 통합하기 위한 메서드.
650 """
651 t_start = self.pm.start_timer()
652 self.logger.info(f"ServiceHandler - {stock_code} OHLCV+지표 통합 조회 period={period}")
653 try:
654 # 1. OHLCV 1회 조회
655 t0 = self.pm.start_timer()
656 resp = await self.market_data_service.get_ohlcv(stock_code, period=period, caller=caller)
657 self.pm.log_timer(f"{stock_code} OHLCV 조회", t0)
659 if not resp or resp.rt_cd != ErrorCode.SUCCESS.value:
660 return resp or ResCommonResponse(rt_cd=ErrorCode.API_ERROR.value, msg1="OHLCV 조회 실패", data=None)
662 if not resp.data:
663 return ResCommonResponse(rt_cd=ErrorCode.API_ERROR.value, msg1="OHLCV 조회 실패", data=None)
665 ohlcv_data = resp.data
667 # 2. 지표 계산 (OHLCV 데이터를 직접 전달하여 API 재호출 방지)
668 indicator_service = self.indicator_service
669 t2 = self.pm.start_timer()
671 # [최적화] 통합 지표 계산 메서드 호출 (DataFrame 변환 1회)
672 indicators_resp = await indicator_service.get_chart_indicators(stock_code, ohlcv_data)
674 self.pm.log_timer(f"{stock_code} 지표 통합 계산", t2)
676 if indicators_resp.rt_cd != ErrorCode.SUCCESS.value:
677 self.logger.error(f"지표 계산 실패: {indicators_resp.msg1}")
678 indicators_data = {"ma5": [], "ma10": [], "ma20": [], "ma60": [], "ma120": [], "bb": [], "rs": []}
679 else:
680 indicators_data = indicators_resp.data
682 result = {
683 "ohlcv": ohlcv_data,
684 "indicators": indicators_data
685 }
686 self.pm.log_timer(f"{stock_code} get_ohlcv_with_indicators 전체", t_start, threshold=0.5)
688 return ResCommonResponse(rt_cd=ErrorCode.SUCCESS.value, msg1=f"OHLCV+지표 {len(ohlcv_data)}건", data=result)
690 except Exception as e:
691 self.logger.error(f"{stock_code} OHLCV+지표 통합 조회 중 오류: {e}", exc_info=True)
692 return ResCommonResponse(rt_cd=ErrorCode.UNKNOWN_ERROR.value, msg1=str(e), data=None)
694 async def get_recent_daily_ohlcv(self, stock_code: str, limit: int = DynamicConfig.OHLCV.DAILY_ITEMCHARTPRICE_MAX_RANGE, end_date: Optional[str] = None) -> ResCommonResponse:
695 """
696 타겟 종목의 최근 일봉을 limit개 반환.
697 TradingService.get_recent_daily_ohlcv를 래핑하여 ResCommonResponse 형태로 통일.
698 """
699 try:
700 rows = await self.market_data_service.get_recent_daily_ohlcv(stock_code, limit=limit, end_date=end_date)
701 if not rows:
702 return ResCommonResponse(rt_cd=ErrorCode.EMPTY_VALUES.value, msg1="데이터 없음", data=[])
703 return ResCommonResponse(rt_cd=ErrorCode.SUCCESS.value, msg1="성공", data=rows)
704 except Exception as e:
705 self.logger.error(f"[OHLCV] {stock_code} 조회 실패: {e}", exc_info=True)
706 return ResCommonResponse(rt_cd=ErrorCode.EMPTY_VALUES.value, msg1=str(e), data=[])
708 async def get_investor_trade_daily_multi(self, stock_code: str, date: str = None, days: int = 3) -> ResCommonResponse:
709 """종목별 투자자 매매동향 다중일 조회 (실전 전용).
711 Returns:
712 data: list[dict] — 최대 days개, 각 항목 {frgn_ntby_tr_pbmn, orgn_ntby_tr_pbmn, acml_tr_pbmn, stck_bsop_date, ...}
713 단위: frgn/orgn_ntby_tr_pbmn 은 백만원, acml_tr_pbmn 은 원.
714 """
715 if not self.broker:
716 return ResCommonResponse(rt_cd=ErrorCode.UNKNOWN_ERROR.value, msg1="broker 미설정", data=[])
717 return await self.broker.get_investor_trade_by_stock_daily_multi(stock_code, date, days)
719 async def get_intraday_minutes_today(self, stock_code: str, *, input_hour_1: str) -> ResCommonResponse:
720 """
721 당일 분봉 조회. MarketDataService 위임.
722 """
723 return await self.market_data_service.get_intraday_minutes_today(
724 stock_code=stock_code, input_hour_1=input_hour_1
725 )
727 async def get_intraday_minutes_by_date(
728 self, stock_code: str, *, input_date_1: str, input_hour_1: str = ""
729 ) -> ResCommonResponse:
730 """
731 일별(특정 일자) 분봉 조회. MarketDataService 위임.
732 """
733 return await self.market_data_service.get_intraday_minutes_by_date(
734 stock_code=stock_code, input_date_1=input_date_1, input_hour_1=input_hour_1
735 )
737 async def get_day_intraday_minutes_list(
738 self,
739 stock_code: str,
740 *,
741 date_ymd: Optional[str] = None, # None이면 '오늘'(KST) 조회
742 session: Literal["REGULAR", "EXTENDED"] = "REGULAR", # REGULAR=09:00~15:40, EXTENDED=08:00~20:00
743 start_hhmmss: Optional[str] = None,
744 end_hhmmss: Optional[str] = None,
745 max_batches: int = 200
746 ) -> List[Dict]:
747 """
748 하루치 분봉(분봉 행 dict)의 '정규화된 리스트'를 반환한다. (출력은 호출부/cli_view에서)
749 - date_ymd=None: 오늘(KST) → get_intraday_minutes_today(배치당 30개; 모의/실전 모두 가능)
750 - date_ymd=YYYYMMDD: 지정일 → get_intraday_minutes_by_date(배치당 100개; 실전 전용)
751 - 시간 범위: session 프리셋으로 선택하거나 start/end를 직접 지정 가능
752 - 반환: 시간 오름차순(HHMMSS) 정렬된 리스트. 각 행은 최소 다음 키를 포함:
753 'stck_bsop_date'(YYYYMMDD), 'stck_cntg_hour'(HHMMSS), 나머지는 원본 필드 유지
754 """
755 t_start = self.pm.start_timer()
756 # 세션 범위 결정
757 if not start_hhmmss or not end_hhmmss:
758 if session.upper() == "EXTENDED":
759 start_hhmmss = start_hhmmss or "080000"
760 end_hhmmss = end_hhmmss or "200000"
761 else:
762 start_hhmmss = start_hhmmss or "090000"
763 end_hhmmss = end_hhmmss or "153000"
765 start_hhmmss = self.market_clock.to_hhmmss(start_hhmmss)
766 end_hhmmss = self.market_clock.to_hhmmss(end_hhmmss)
768 # 조회 날짜
769 if date_ymd:
770 ymd = date_ymd
771 else:
772 now_kst = self.market_clock.get_current_kst_time()
773 ymd = now_kst.strftime("%Y%m%d")
775 # 배치 호출 함수 선택
776 async def _fetch_batch(cursor_hhmmss: str):
777 cursor_hhmmss = self.market_clock.to_hhmmss(cursor_hhmmss)
778 if self.market_data_service._env.is_paper_trading:
779 # 오늘(모의/실전; 배치당 30개)
780 return await self.get_intraday_minutes_today(
781 stock_code, input_hour_1=cursor_hhmmss
782 )
783 else:
784 # 지정일(실전 전용; 배치당 100개)
785 return await self.get_intraday_minutes_by_date(
786 stock_code, input_date_1=ymd, input_hour_1=cursor_hhmmss
787 )
789 def _extract_rows(resp_obj) -> list[dict]:
790 """resp.data가 list 또는 dict(output2/rows/data 키)인 모든 경우를 수용."""
791 data = getattr(resp_obj, "data", None)
792 if isinstance(data, list):
793 return data
794 if isinstance(data, dict): 794 ↛ 797line 794 didn't jump to line 797 because the condition on line 794 was always true
795 rows = data.get("output2") or data.get("rows") or data.get("data") or []
796 return rows if isinstance(rows, list) else []
797 return []
799 # 커서: end부터 과거로 내려가며 수집
800 cursor = end_hhmmss
801 seen: set[tuple[str, str]] = set() # (date, hhmmss)
802 collected: List[Dict] = []
803 batches = 0
805 while batches < max_batches: 805 ↛ 855line 805 didn't jump to line 855 because the condition on line 805 was always true
806 batches += 1
807 resp = await _fetch_batch(cursor)
808 if not resp or str(getattr(resp, "rt_cd", "1")) != "0":
809 break
811 rows = _extract_rows(resp)
812 if not rows:
813 break
815 min_time_in_batch = None
816 added = 0
818 for row in rows:
819 d = str(row.get("stck_bsop_date") or ymd)
820 t = self.market_clock.to_hhmmss(row.get("stck_cntg_hour") or "")
822 if (min_time_in_batch is None) or (t < min_time_in_batch): 822 ↛ 826line 822 didn't jump to line 826 because the condition on line 822 was always true
823 min_time_in_batch = t
825 # 범위 필터
826 if t < start_hhmmss or t > end_hhmmss:
827 continue
828 key = (d, t)
829 if key in seen:
830 continue
831 seen.add(key)
833 norm = dict(row)
834 norm["stck_bsop_date"] = d
835 norm["stck_cntg_hour"] = t
836 collected.append(norm)
837 added += 1
839 if added == 0:
840 if min_time_in_batch: 840 ↛ 845line 840 didn't jump to line 845 because the condition on line 840 was always true
841 cursor = self.market_clock.dec_minute(min_time_in_batch, 1)
842 if cursor < start_hhmmss:
843 break
844 continue
845 break
847 if min_time_in_batch: 847 ↛ 852line 847 didn't jump to line 852 because the condition on line 847 was always true
848 cursor = self.market_clock.dec_minute(min_time_in_batch, 1)
849 if cursor < start_hhmmss:
850 break
851 else:
852 break
854 # 최종 정렬(과거→현재)
855 collected.sort(key=lambda r: r.get("stck_cntg_hour", ""))
857 self.pm.log_timer(f"StockQueryService.get_day_intraday_minutes_list({stock_code}, {batches}배치)", t_start, threshold=1.0)
858 return collected