Coverage for services / indicator_service.py: 93%
263 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
1import logging
2import pandas as pd
3import numpy as np
4import math
5import time
6from datetime import datetime
7from typing import List, Dict, Optional, TYPE_CHECKING, Union
8from common.types import ResCommonResponse, ErrorCode, ResBollingerBand, ResRSI, ResMovingAverage, ResRelativeStrength
9from core.cache.cache_store import CacheStore
10from core.performance_profiler import PerformanceProfiler
12if TYPE_CHECKING: 12 ↛ 13line 12 didn't jump to line 13 because the condition on line 12 was never true
13 from services.stock_query_service import StockQueryService
15class IndicatorService:
16 """
17 기술적 지표 계산을 담당하는 서비스.
18 StockQueryService를 통해 데이터를 조회하고 가공하여 지표 값을 반환합니다.
19 """
20 def __init__(self, stock_query_service: Optional['StockQueryService'] = None,
21 cache_store: Optional[CacheStore] = None,
22 performance_profiler: Optional[PerformanceProfiler] = None):
23 self.stock_query_service = stock_query_service
24 self.cache_store = cache_store
25 self.pm = performance_profiler if performance_profiler else PerformanceProfiler(enabled=False)
27 @staticmethod
28 def _safe_float(val):
29 if val is None or pd.isna(val):
30 return None
31 try:
32 f = float(val)
33 if math.isnan(f) or math.isinf(f):
34 return None
35 return f
36 except (ValueError, TypeError):
37 return None
39 async def _get_ohlcv_data(self, stock_code: str, candle_type: str, ohlcv_data: Optional[List[Dict]] = None) -> tuple:
40 """
41 OHLCV 데이터를 가져옵니다. ohlcv_data가 전달되면 API 호출을 생략합니다.
42 Returns: (ohlcv_data, error_response) - 성공 시 error_response는 None
43 """
44 if ohlcv_data is not None:
45 return ohlcv_data, None
47 if not self.stock_query_service:
48 return None, ResCommonResponse(rt_cd=ErrorCode.API_ERROR.value, msg1="StockQueryService not initialized", data=None)
50 resp = await self.stock_query_service.get_ohlcv(stock_code, period=candle_type)
51 if resp.rt_cd != ErrorCode.SUCCESS.value or not resp.data:
52 return None, resp
53 return resp.data, None
55 async def _get_with_incremental_cache(
56 self,
57 stock_code: str,
58 candle_type: str,
59 indicator_name: str,
60 data: List[Dict],
61 lookback_period: int,
62 calc_func: callable,
63 *calc_args
64 ) -> ResCommonResponse:
65 """
66 [공통 지표 캐싱 & 병합 파이프라인]
67 과거 확정 데이터는 캐싱하고, 당일(미확정) 데이터만 증분 계산하여 O(1)로 병합합니다.
69 :param stock_code: 종목코드
70 :param candle_type: 캔들 타입 ("D", "W", "M" 등)
71 :param indicator_name: 캐시 키 생성을 위한 지표명 (예: "rsi_14", "bb_20_2.0")
72 :param data: 전체 OHLCV 데이터
73 :param lookback_period: 증분 계산 시 필요한 최소 과거 데이터 개수 (예: RSI 14면 14)
74 :param calc_func: 실제 지표 계산을 수행하고 List[Dict]를 반환하는 콜백 함수
75 :param calc_args: calc_func에 전달할 추가 인자들
76 """
77 # 1. 예외 처리 및 캐시 미적용 조건 (데이터가 너무 적거나, 일봉이 아니거나, 캐시가 꺼진 경우)
78 if not self.cache_store or candle_type != "D" or len(data) <= lookback_period:
79 resp = calc_func(stock_code, data, *calc_args)
80 # [수정포인트 1] 이미 ResCommonResponse 객체라면 이중 래핑 방지
81 return resp if isinstance(resp, ResCommonResponse) else ResCommonResponse(rt_cd=ErrorCode.SUCCESS.value, msg1="성공(NoCache)", data=resp)
83 # 2. 확정 데이터(어제까지)와 당일 데이터 분리
84 confirmed_data = data[:-1]
85 confirmed_last_date = str(confirmed_data[-1]['date'])
87 # 캐시 키 생성 (지표명_종목코드_마지막확정일자)
88 cache_key = f"{indicator_name}_{stock_code}_{confirmed_last_date}"
90 # 3. 캐시 조회
91 cached_result = self.cache_store.get(cache_key)
93 # 4. 캐시 미스: 확정 데이터 전체에 대해 계산 후 캐시 저장
94 if not cached_result:
95 calc_resp = calc_func(stock_code, confirmed_data, *calc_args)
96 # [수정포인트 2] 반환값이 ResCommonResponse면 내부 data만 추출해서 캐싱
97 if isinstance(calc_resp, ResCommonResponse): 97 ↛ 102line 97 didn't jump to line 102 because the condition on line 97 was always true
98 if calc_resp.rt_cd != ErrorCode.SUCCESS.value: 98 ↛ 99line 98 didn't jump to line 99 because the condition on line 98 was never true
99 return calc_resp # 에러 발생 시 즉시 반환
100 cached_result = calc_resp.data
101 else:
102 cached_result = calc_resp
104 self.cache_store.set(cache_key, cached_result)
106 # 5. 당일 증분 계산
107 slice_size = lookback_period + 5
108 partial_data = data[-slice_size:]
110 partial_resp = calc_func(stock_code, partial_data, *calc_args)
112 # [수정포인트 3] 반환값이 ResCommonResponse면 내부 data만 추출해서 병합 준비
113 if isinstance(partial_resp, ResCommonResponse): 113 ↛ 118line 113 didn't jump to line 118 because the condition on line 113 was always true
114 if partial_resp.rt_cd != ErrorCode.SUCCESS.value:
115 return partial_resp
116 partial_result = partial_resp.data
117 else:
118 partial_result = partial_resp
120 if not partial_result: 120 ↛ 121line 120 didn't jump to line 121 because the condition on line 120 was never true
121 return ResCommonResponse(rt_cd=ErrorCode.UNKNOWN_ERROR.value, msg1="부분 지표 계산 실패", data=None)
123 latest_indicator = partial_result[-1]
124 # 6. O(1) 속도로 병합 (리스트 '+' 연산자 제거 최적화)
125 final_data = cached_result.copy() # 얕은 복사로 원본 캐시 리스트 보호
127 if final_data and final_data[-1]['date'] == latest_indicator['date']:
128 final_data[-1] = latest_indicator # 덮어쓰기
129 else:
130 final_data.append(latest_indicator) # 맨 뒤에 추가
132 return ResCommonResponse(rt_cd=ErrorCode.SUCCESS.value, msg1="성공(CacheHit)", data=final_data)
134 async def get_bollinger_bands(self, stock_code: str, period: int = 20, multiplier: float = 2.0, candle_type: str = "D",
135 ohlcv_data: Optional[List[Dict]] = None) -> ResCommonResponse:
136 """볼린저 밴드 조회"""
137 data, err_resp = await self._get_ohlcv_data(stock_code, candle_type, ohlcv_data=ohlcv_data)
138 if err_resp: return err_resp
140 return await self._get_with_incremental_cache(
141 stock_code,
142 candle_type,
143 f"bb_{period}_{multiplier}",
144 data,
145 period,
146 self._calculate_bollinger_bands_full, # DataFrame 변환 및 순수 계산 로직만 있는 내부 함수
147 period, # *calc_args 첫 번째
148 multiplier # *calc_args 두 번째
149 )
151 async def get_rsi(self, stock_code: str, period: int = 14, candle_type: str = "D",
152 ohlcv_data: Optional[List[Dict]] = None) -> ResCommonResponse:
153 """RSI (상대강도지수) 조회"""
154 # 1. OHLCV 데이터 가져오기
155 data, err_resp = await self._get_ohlcv_data(stock_code, candle_type, ohlcv_data=ohlcv_data)
156 if err_resp: return err_resp
158 # 2. 공통 파이프라인에 위임 (순수 계산 함수인 _calculate_rsi_series_internal 등을 넘김)
159 return await self._get_with_incremental_cache(
160 stock_code,
161 candle_type,
162 f"rsi_{period}",
163 data,
164 period,
165 self._calculate_rsi_series, # DataFrame 변환 및 순수 계산 로직만 있는 내부 함수
166 period # <-- calc_func에 전달될 *calc_args (정상 작동!)
167 )
169 async def get_moving_average(
170 self,
171 stock_code: str,
172 period: int = 20,
173 method: str = "sma",
174 candle_type: str = "D",
175 ohlcv_data: Optional[List[Dict]] = None
176 ) -> ResCommonResponse:
177 """이동평균선 조회"""
178 # 1. OHLCV 데이터 로드 및 에러 처리
179 data, err_resp = await self._get_ohlcv_data(stock_code, candle_type, ohlcv_data=ohlcv_data)
181 if err_resp: return err_resp 181 ↛ exitline 181 didn't return from function 'get_moving_average' because the return on line 181 wasn't executed
184 # 2. 공통 캐시 파이프라인에 위임 (method 파라미터 포함)
185 return await self._get_with_incremental_cache(
186 stock_code,
187 candle_type,
188 f"ma_{period}_{method}", # [수정됨] 캐시 키에 method 포함 (예: ma_20_sma)
189 data,
190 period,
191 self._calculate_moving_average_full, # 기존 이동평균선 전체 계산 함수
192 period, # *calc_args 첫 번째 인자
193 method # [수정됨] *calc_args 두 번째 인자로 method 전달
194 )
196 async def get_relative_strength(
197 self,
198 stock_code: str,
199 period_days: int = 63,
200 candle_type: str = "D",
201 ohlcv_data: Optional[List[Dict]] = None
202 ) -> ResCommonResponse:
203 """N일 수익률(상대강도 원시값)을 계산하여 반환합니다.
204 (단일 스칼라 값만 필요하므로, Pandas 변환 및 캐싱 없이 O(1) 리스트 직접 연산으로 초고속 처리합니다.)
205 """
206 t_start = self.pm.start_timer()
207 t_calc = self.pm.start_timer()
208 try:
209 # 1. OHLCV 데이터 로드
210 data, err_resp = await self._get_ohlcv_data(stock_code, candle_type, ohlcv_data)
211 if err_resp: return err_resp 211 ↛ exitline 211 didn't return from function 'get_relative_strength' because the return on line 211 wasn't executed
213 # pct_change(period_days)와 동일한 간격을 구하려면 데이터가 period_days + 1 개 필요합니다.
214 if len(data) < period_days + 1:
215 return ResCommonResponse(
216 rt_cd=ErrorCode.EMPTY_VALUES.value,
217 msg1=f"데이터 부족: {len(data)} < {period_days + 1}",
218 data=None
219 )
221 # 2. 무거운 DataFrame 변환 없이 리스트 인덱싱으로 즉시 추출!
222 recent_candle = data[-1] # 오늘(최근) 캔들
223 past_candle = data[-(period_days + 1)] # N일 전 캔들 (예: 63일 전)
225 recent_close = self._safe_float(recent_candle.get('close'))
226 past_close = self._safe_float(past_candle.get('close'))
228 # 데이터 유효성 검증
229 if recent_close is None or past_close is None or past_close <= 0:
230 return ResCommonResponse(
231 rt_cd=ErrorCode.EMPTY_VALUES.value,
232 msg1=f"유효하지 않은 종가 (past_close={past_close})",
233 data=None
234 )
236 # 3. 수익률 계산
237 return_pct = ((recent_close - past_close) / past_close) * 100
239 result = ResRelativeStrength(
240 code=stock_code,
241 date=str(recent_candle.get('date')),
242 return_pct=round(return_pct, 2),
243 )
245 if self.pm.enabled: 245 ↛ 246line 245 didn't jump to line 246 because the condition on line 245 was never true
246 self.pm.log_timer(f"IndicatorService.get_relative_strength({stock_code})", t_start)
248 return ResCommonResponse(rt_cd=ErrorCode.SUCCESS.value, msg1="성공", data=result)
250 except Exception as e:
251 logging.getLogger(__name__).exception(f"상대강도 계산 중 오류: {e}")
252 return ResCommonResponse(
253 rt_cd=ErrorCode.UNKNOWN_ERROR.value,
254 msg1=f"상대강도 계산 중 오류: {str(e)}",
255 data=None
256 )
258 async def get_chart_indicators(self, stock_code: str, ohlcv_data: List[Dict]) -> ResCommonResponse:
259 """
260 차트 렌더링용 지표(MA 5/10/20/60/120, BB, RS)를 한 번에 계산하여 반환합니다.
261 과거 데이터에 대한 계산 결과를 캐싱하여 성능을 최적화합니다.
262 """
263 t_start = self.pm.start_timer()
265 # 데이터가 너무 적거나 캐시 매니저가 없으면 전체 계산 (최대 기간 120일 + 여유)
266 if not ohlcv_data or len(ohlcv_data) < 130 or not self.cache_store:
267 resp = self._calculate_indicators_full(stock_code, ohlcv_data)
268 self.pm.log_timer(f"IndicatorService.get_chart_indicators({stock_code})", t_start, extra_info="Full Calc", threshold=0.5)
269 return resp
271 try:
272 # 1. 확정된 과거 데이터 분리 (마지막 데이터 제외)
273 # 마지막 데이터는 장 중 실시간으로 변할 수 있으므로 캐시 대상에서 제외
274 confirmed_data = ohlcv_data[:-1]
275 confirmed_len = len(confirmed_data)
276 # 캐시 키: 시작일 + 종료일 + 데이터 수로 구성하여 ohlcv_limit 변경이나
277 # DB 행 수 변화로 인한 캐시 불일치 방지
278 confirmed_first_date = str(confirmed_data[0]['date'])
279 confirmed_last_date = str(confirmed_data[-1]['date'])
281 cache_key = f"indicators_chart_{stock_code}_{confirmed_first_date}_{confirmed_last_date}_{confirmed_len}"
283 # 2. 캐시 조회
284 raw_cache = self.cache_store.get_raw(cache_key)
285 cached_wrapper = None
286 if raw_cache and isinstance(raw_cache, tuple):
287 cached_wrapper, _ = raw_cache
289 past_indicators = None
290 if cached_wrapper:
291 cached_data = cached_wrapper.get('data')
292 # 캐시된 지표 행 수가 confirmed_data와 일치하는지 검증
293 if cached_data: 293 ↛ 299line 293 didn't jump to line 299 because the condition on line 293 was always true
294 sample_key = next((k for k, v in cached_data.items() if isinstance(v, list)), None)
295 if sample_key and len(cached_data[sample_key]) == confirmed_len: 295 ↛ 299line 295 didn't jump to line 299 because the condition on line 295 was always true
296 past_indicators = cached_data
298 # 3. 캐시 미스 시 과거 데이터 전체 계산 및 저장
299 if not past_indicators:
300 resp = self._calculate_indicators_full(stock_code, confirmed_data)
301 if resp.rt_cd != ErrorCode.SUCCESS.value: 301 ↛ 302line 301 didn't jump to line 302 because the condition on line 301 was never true
302 return resp
303 past_indicators = resp.data
305 # 캐시 저장 (파일 저장 포함)
306 self.cache_store.set(cache_key, {
307 "timestamp": datetime.now().isoformat(),
308 "data": past_indicators
309 }, save_to_file=True)
311 # 4. 오늘 데이터(마지막 1개)에 대한 지표 계산 (증분 계산)
312 # 이동평균 등 계산을 위해 과거 데이터 일부가 필요함 (최대 120일 + 여유)
313 lookback = 130
314 partial_data = ohlcv_data[-lookback:]
316 resp_partial = self._calculate_indicators_full(stock_code, partial_data)
317 if resp_partial.rt_cd != ErrorCode.SUCCESS.value: 317 ↛ 318line 317 didn't jump to line 318 because the condition on line 317 was never true
318 return resp_partial
320 latest_indicators = resp_partial.data
322 # 5. 병합 (과거 지표 + 오늘 지표)
323 merged_indicators = {}
325 for key, val_list in past_indicators.items():
326 if isinstance(val_list, list): 326 ↛ 339line 326 didn't jump to line 339 because the condition on line 326 was always true
327 # latest_indicators[key]의 마지막 요소(오늘치) 가져오기
328 if key in latest_indicators and latest_indicators[key]:
329 latest_item = latest_indicators[key][-1]
330 merged_list = val_list.copy()
331 if merged_list and merged_list[-1].get('date') == latest_item.get('date'): 331 ↛ 332line 331 didn't jump to line 332 because the condition on line 331 was never true
332 merged_list[-1] = latest_item
333 else:
334 merged_list.append(latest_item)
335 merged_indicators[key] = merged_list
336 else:
337 merged_indicators[key] = val_list
338 else:
339 merged_indicators[key] = val_list
341 self.pm.log_timer(f"IndicatorService.get_chart_indicators({stock_code})", t_start, extra_info="Cached")
342 return ResCommonResponse(rt_cd=ErrorCode.SUCCESS.value, msg1="성공", data=merged_indicators)
344 except Exception as e:
345 logging.getLogger(__name__).exception(f"지표 캐싱 처리 중 오류: {e}")
346 # 오류 발생 시 안전하게 전체 재계산 시도
347 return self._calculate_indicators_full(stock_code, ohlcv_data)
349 # ── 계산 로직 공통화 (Helper Methods) ─────────────────────────────
351 def _to_dataframe(self, ohlcv_data: list) -> pd.DataFrame:
352 """리스트 데이터를 데이터프레임으로 변환하고 수치 전처리를 수행"""
353 try:
354 df = pd.DataFrame(ohlcv_data)
356 if df.empty:
357 return df
359 # 1. 수치형 변환 (errors='coerce'는 숫자가 아닌 것을 NaN으로 만듭니다)
360 if 'close' in df.columns:
361 df['close'] = pd.to_numeric(df['close'], errors='coerce')
363 # 2. [핵심] 무한대(inf) 값을 실제 NaN으로 치환
364 # np.inf를 쓰려면 위에 import numpy as np가 필요합니다.
365 df['close'] = df['close'].replace([np.inf, -np.inf], np.nan)
367 return df
369 except Exception as e:
370 # 여기서 에러가 나면 IndicatorService에서 999 에러를 반환하게 됩니다.
371 raise e
373 @staticmethod
374 def _compute_ma(df: pd.DataFrame, period: int, method: str = "sma", target_col: str = "ma") -> pd.DataFrame:
375 """이동평균 계산 및 컬럼 추가"""
376 if method.lower() == "ema":
377 df[target_col] = df['close'].ewm(span=period, adjust=False).mean()
378 else:
379 df[target_col] = df['close'].rolling(window=period).mean()
380 return df
382 @staticmethod
383 def _compute_bb(df: pd.DataFrame, period: int, std_dev: float, prefix: str = "bb") -> pd.DataFrame:
384 """볼린저 밴드 계산 및 컬럼 추가"""
385 mb = df['close'].rolling(window=period).mean()
386 std = df['close'].rolling(window=period).std(ddof=0)
387 df[f'{prefix}_middle'] = mb
388 df[f'{prefix}_upper'] = mb + (std * std_dev)
389 df[f'{prefix}_lower'] = mb - (std * std_dev)
390 return df
392 @staticmethod
393 def _compute_rsi(df: pd.DataFrame, period: int, target_col: str = "rsi") -> pd.DataFrame:
394 """RSI 계산 및 컬럼 추가"""
395 delta = df['close'].diff()
396 u = delta.clip(lower=0)
397 d = -1 * delta.clip(upper=0)
398 au = u.ewm(alpha=1/period, min_periods=period, adjust=False).mean()
399 ad = d.ewm(alpha=1/period, min_periods=period, adjust=False).mean()
400 rs = au / ad
401 df[target_col] = 100 - (100 / (1 + rs))
402 return df
404 def calc_bb_widths_sync(
405 self,
406 ohlcv_data: List[Dict],
407 period: int = 20,
408 multiplier: float = 2.0,
409 ) -> List[float]:
410 """이미 확보한 OHLCV 데이터로 BB 폭(upper-lower) 목록을 동기 계산합니다.
411 async 오버헤드 없이 순수 pandas 계산만 수행합니다."""
412 resp = self._calculate_bollinger_bands_full("", ohlcv_data, period, multiplier)
413 if resp.rt_cd != ErrorCode.SUCCESS.value or not resp.data:
414 return []
415 widths = []
416 for band in resp.data:
417 upper = band.get('upper') if isinstance(band, dict) else getattr(band, 'upper', None)
418 lower = band.get('lower') if isinstance(band, dict) else getattr(band, 'lower', None)
419 if upper is not None and lower is not None:
420 widths.append(upper - lower)
421 return widths
423 def calc_rs_sync(
424 self,
425 ohlcv_data: List[Dict],
426 period_days: int = 63,
427 ) -> float:
428 """이미 확보한 OHLCV 데이터로 RS 수익률을 동기 계산합니다.
429 async 오버헤드 없이 O(1) 리스트 인덱싱만 수행합니다."""
430 try:
431 if len(ohlcv_data) < period_days + 1:
432 return 0.0
433 recent_close = self._safe_float(ohlcv_data[-1].get('close'))
434 past_close = self._safe_float(ohlcv_data[-(period_days + 1)].get('close'))
435 if recent_close is None or past_close is None or past_close <= 0:
436 return 0.0
437 return round(((recent_close - past_close) / past_close) * 100, 2)
438 except Exception:
439 return 0.0
441 def _calculate_bollinger_bands_full(self, stock_code, data, period, std_dev) -> ResCommonResponse:
442 """볼린저 밴드 전체 계산 (내부용)"""
443 try:
444 df = self._to_dataframe(data)
446 df['MB'] = df['close'].rolling(window=period).mean()
447 df['std'] = df['close'].rolling(window=period).std(ddof=0)
448 df['UB'] = df['MB'] + (df['std'] * std_dev)
449 df['LB'] = df['MB'] - (df['std'] * std_dev)
451 results = [
452 {
453 "code": stock_code, "date": str(row.date),
454 "close": self._safe_float(row.close),
455 "middle": self._safe_float(row.MB),
456 "upper": self._safe_float(row.UB),
457 "lower": self._safe_float(row.LB)
458 }
459 for row in df.itertuples(index=False)
460 ]
461 return ResCommonResponse(rt_cd=ErrorCode.SUCCESS.value, msg1="OK", data=results)
462 except Exception as e:
463 return ResCommonResponse(rt_cd=ErrorCode.UNKNOWN_ERROR.value, msg1=str(e), data=None)
465 def _calculate_rsi_series(self, stock_code, data, period) -> ResCommonResponse:
466 """RSI 시계열 전체 계산 (내부용)"""
467 try:
468 df = self._to_dataframe(data)
470 # 공통 로직 사용
471 df = self._compute_rsi(df, period, target_col="rsi")
473 results = [
474 {
475 "code": stock_code, "date": str(row.date),
476 "close": self._safe_float(row.close),
477 "rsi": self._safe_float(row.rsi)
478 }
479 for row in df.itertuples(index=False)
480 ]
481 return ResCommonResponse(rt_cd=ErrorCode.SUCCESS.value, msg1="OK", data=results)
482 except Exception as e:
483 return ResCommonResponse(rt_cd=ErrorCode.UNKNOWN_ERROR.value, msg1=str(e), data=None)
485 def _calculate_moving_average_full(self, stock_code, data, period, method) -> ResCommonResponse:
486 """이동평균 전체 계산 (내부용)"""
487 try:
488 df = self._to_dataframe(data)
490 # 공통 로직 사용
491 df = self._compute_ma(df, period, method, target_col="ma")
493 results = [
494 {
495 "code": stock_code, "date": str(row.date),
496 "close": self._safe_float(row.close),
497 "ma": self._safe_float(row.ma)
498 }
499 for row in df.itertuples(index=False)
500 ]
501 return ResCommonResponse(rt_cd=ErrorCode.SUCCESS.value, msg1="OK", data=results)
502 except Exception as e:
503 return ResCommonResponse(rt_cd=ErrorCode.UNKNOWN_ERROR.value, msg1=str(e), data=None)
505 def _calculate_indicators_full(self, stock_code: str, ohlcv_data: List[Dict]) -> ResCommonResponse:
506 """전체 데이터를 받아 지표를 계산하는 내부 메서드 (Vectorized 최적화 버전)"""
507 try:
508 # 1. DataFrame 변환 (1회 수행)
509 df = self._to_dataframe(ohlcv_data)
510 if df.empty:
511 return ResCommonResponse(rt_cd=ErrorCode.EMPTY_VALUES.value, msg1="데이터 없음", data=None)
513 # 2. 지표 계산 (Vectorized operations)
514 # MA
515 for p in [5, 10, 20, 60, 120]:
516 df = self._compute_ma(df, p, "sma", target_col=f"ma{p}")
518 # BB (20일, 2.0)
519 df = self._compute_bb(df, 20, 2.0, prefix="bb")
521 # RS (63일 등락률)
522 rs_period = 63
523 df['rs'] = df['close'].pct_change(periods=rs_period) * 100
525 # 3. 결과 포맷팅 전처리 (빠른 일괄 변환)
526 # 3-1. 응답 규격을 맞추기 위해 date 컬럼을 일괄 문자열로 캐스팅
527 df['date'] = df['date'].astype(str)
529 # 3-2. 숫자형 컬럼의 무한대(inf)를 NaN으로 치환 (문자열 컬럼 순회 방지로 속도 극대화)
530 num_cols = df.select_dtypes(include=[np.number]).columns
531 df[num_cols] = df[num_cols].replace([np.inf, -np.inf], np.nan)
533 # 3-3. JSON 직렬화를 위해 모든 NaN을 None으로 일괄 치환
534 # (Pandas float64 자동 캐스팅 방지를 위해 object 명시적 캐스팅 적용)
535 df = df.astype(object).where(pd.notnull(df), None)
537 # 4. 결과 포맷팅 (Pandas to_dict 활용 - 파이썬 반복문 0번)
538 indicators = {}
540 # MA 추출
541 for p in [5, 10, 20, 60, 120]:
542 ma_key = f'ma{p}'
543 indicators[ma_key] = df[['date', 'close', ma_key]].rename(
544 columns={ma_key: 'ma'}
545 ).to_dict('records')
547 # BB 추출 (이름 매핑 및 code 컬럼 동적 추가)
548 indicators["bb"] = df[['date', 'close', 'bb_middle', 'bb_upper', 'bb_lower']].rename(
549 columns={'bb_middle': 'middle', 'bb_upper': 'upper', 'bb_lower': 'lower'}
550 ).assign(code=stock_code)[['code', 'date', 'close', 'middle', 'upper', 'lower']].to_dict('records')
552 # RS 추출
553 indicators["rs"] = df[['date', 'rs']].to_dict('records')
555 return ResCommonResponse(rt_cd=ErrorCode.SUCCESS.value, msg1="성공", data=indicators)
557 except Exception as e:
558 logging.getLogger(__name__).exception(f"지표 통합 계산 중 오류: {e}")
559 return ResCommonResponse(rt_cd=ErrorCode.UNKNOWN_ERROR.value, msg1=str(e), data=None)