Coverage for services / indicator_service.py: 93%

263 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-04 15:08 +0000

1import logging 

2import pandas as pd 

3import numpy as np 

4import math 

5import time 

6from datetime import datetime 

7from typing import List, Dict, Optional, TYPE_CHECKING, Union 

8from common.types import ResCommonResponse, ErrorCode, ResBollingerBand, ResRSI, ResMovingAverage, ResRelativeStrength 

9from core.cache.cache_store import CacheStore 

10from core.performance_profiler import PerformanceProfiler 

11 

12if TYPE_CHECKING: 12 ↛ 13line 12 didn't jump to line 13 because the condition on line 12 was never true

13 from services.stock_query_service import StockQueryService 

14 

15class IndicatorService: 

16 """ 

17 기술적 지표 계산을 담당하는 서비스. 

18 StockQueryService를 통해 데이터를 조회하고 가공하여 지표 값을 반환합니다. 

19 """ 

20 def __init__(self, stock_query_service: Optional['StockQueryService'] = None, 

21 cache_store: Optional[CacheStore] = None, 

22 performance_profiler: Optional[PerformanceProfiler] = None): 

23 self.stock_query_service = stock_query_service 

24 self.cache_store = cache_store 

25 self.pm = performance_profiler if performance_profiler else PerformanceProfiler(enabled=False) 

26 

27 @staticmethod 

28 def _safe_float(val): 

29 if val is None or pd.isna(val): 

30 return None 

31 try: 

32 f = float(val) 

33 if math.isnan(f) or math.isinf(f): 

34 return None 

35 return f 

36 except (ValueError, TypeError): 

37 return None 

38 

39 async def _get_ohlcv_data(self, stock_code: str, candle_type: str, ohlcv_data: Optional[List[Dict]] = None) -> tuple: 

40 """ 

41 OHLCV 데이터를 가져옵니다. ohlcv_data가 전달되면 API 호출을 생략합니다. 

42 Returns: (ohlcv_data, error_response) - 성공 시 error_response는 None 

43 """ 

44 if ohlcv_data is not None: 

45 return ohlcv_data, None 

46 

47 if not self.stock_query_service: 

48 return None, ResCommonResponse(rt_cd=ErrorCode.API_ERROR.value, msg1="StockQueryService not initialized", data=None) 

49 

50 resp = await self.stock_query_service.get_ohlcv(stock_code, period=candle_type) 

51 if resp.rt_cd != ErrorCode.SUCCESS.value or not resp.data: 

52 return None, resp 

53 return resp.data, None 

54 

55 async def _get_with_incremental_cache( 

56 self, 

57 stock_code: str, 

58 candle_type: str, 

59 indicator_name: str, 

60 data: List[Dict], 

61 lookback_period: int, 

62 calc_func: callable, 

63 *calc_args 

64 ) -> ResCommonResponse: 

65 """ 

66 [공통 지표 캐싱 & 병합 파이프라인] 

67 과거 확정 데이터는 캐싱하고, 당일(미확정) 데이터만 증분 계산하여 O(1)로 병합합니다. 

68  

69 :param stock_code: 종목코드 

70 :param candle_type: 캔들 타입 ("D", "W", "M" 등) 

71 :param indicator_name: 캐시 키 생성을 위한 지표명 (예: "rsi_14", "bb_20_2.0") 

72 :param data: 전체 OHLCV 데이터 

73 :param lookback_period: 증분 계산 시 필요한 최소 과거 데이터 개수 (예: RSI 14면 14) 

74 :param calc_func: 실제 지표 계산을 수행하고 List[Dict]를 반환하는 콜백 함수 

75 :param calc_args: calc_func에 전달할 추가 인자들 

76 """ 

77 # 1. 예외 처리 및 캐시 미적용 조건 (데이터가 너무 적거나, 일봉이 아니거나, 캐시가 꺼진 경우) 

78 if not self.cache_store or candle_type != "D" or len(data) <= lookback_period: 

79 resp = calc_func(stock_code, data, *calc_args) 

80 # [수정포인트 1] 이미 ResCommonResponse 객체라면 이중 래핑 방지 

81 return resp if isinstance(resp, ResCommonResponse) else ResCommonResponse(rt_cd=ErrorCode.SUCCESS.value, msg1="성공(NoCache)", data=resp) 

82 

83 # 2. 확정 데이터(어제까지)와 당일 데이터 분리 

84 confirmed_data = data[:-1] 

85 confirmed_last_date = str(confirmed_data[-1]['date']) 

86 

87 # 캐시 키 생성 (지표명_종목코드_마지막확정일자) 

88 cache_key = f"{indicator_name}_{stock_code}_{confirmed_last_date}" 

89 

90 # 3. 캐시 조회 

91 cached_result = self.cache_store.get(cache_key) 

92 

93 # 4. 캐시 미스: 확정 데이터 전체에 대해 계산 후 캐시 저장 

94 if not cached_result: 

95 calc_resp = calc_func(stock_code, confirmed_data, *calc_args) 

96 # [수정포인트 2] 반환값이 ResCommonResponse면 내부 data만 추출해서 캐싱 

97 if isinstance(calc_resp, ResCommonResponse): 97 ↛ 102line 97 didn't jump to line 102 because the condition on line 97 was always true

98 if calc_resp.rt_cd != ErrorCode.SUCCESS.value: 98 ↛ 99line 98 didn't jump to line 99 because the condition on line 98 was never true

99 return calc_resp # 에러 발생 시 즉시 반환 

100 cached_result = calc_resp.data 

101 else: 

102 cached_result = calc_resp 

103 

104 self.cache_store.set(cache_key, cached_result) 

105 

106 # 5. 당일 증분 계산 

107 slice_size = lookback_period + 5 

108 partial_data = data[-slice_size:] 

109 

110 partial_resp = calc_func(stock_code, partial_data, *calc_args) 

111 

112 # [수정포인트 3] 반환값이 ResCommonResponse면 내부 data만 추출해서 병합 준비 

113 if isinstance(partial_resp, ResCommonResponse): 113 ↛ 118line 113 didn't jump to line 118 because the condition on line 113 was always true

114 if partial_resp.rt_cd != ErrorCode.SUCCESS.value: 

115 return partial_resp 

116 partial_result = partial_resp.data 

117 else: 

118 partial_result = partial_resp 

119 

120 if not partial_result: 120 ↛ 121line 120 didn't jump to line 121 because the condition on line 120 was never true

121 return ResCommonResponse(rt_cd=ErrorCode.UNKNOWN_ERROR.value, msg1="부분 지표 계산 실패", data=None) 

122 

123 latest_indicator = partial_result[-1] 

124 # 6. O(1) 속도로 병합 (리스트 '+' 연산자 제거 최적화) 

125 final_data = cached_result.copy() # 얕은 복사로 원본 캐시 리스트 보호 

126 

127 if final_data and final_data[-1]['date'] == latest_indicator['date']: 

128 final_data[-1] = latest_indicator # 덮어쓰기 

129 else: 

130 final_data.append(latest_indicator) # 맨 뒤에 추가 

131 

132 return ResCommonResponse(rt_cd=ErrorCode.SUCCESS.value, msg1="성공(CacheHit)", data=final_data) 

133 

134 async def get_bollinger_bands(self, stock_code: str, period: int = 20, multiplier: float = 2.0, candle_type: str = "D", 

135 ohlcv_data: Optional[List[Dict]] = None) -> ResCommonResponse: 

136 """볼린저 밴드 조회""" 

137 data, err_resp = await self._get_ohlcv_data(stock_code, candle_type, ohlcv_data=ohlcv_data) 

138 if err_resp: return err_resp 

139 

140 return await self._get_with_incremental_cache( 

141 stock_code, 

142 candle_type, 

143 f"bb_{period}_{multiplier}", 

144 data, 

145 period, 

146 self._calculate_bollinger_bands_full, # DataFrame 변환 및 순수 계산 로직만 있는 내부 함수 

147 period, # *calc_args 첫 번째 

148 multiplier # *calc_args 두 번째 

149 ) 

150 

151 async def get_rsi(self, stock_code: str, period: int = 14, candle_type: str = "D", 

152 ohlcv_data: Optional[List[Dict]] = None) -> ResCommonResponse: 

153 """RSI (상대강도지수) 조회""" 

154 # 1. OHLCV 데이터 가져오기 

155 data, err_resp = await self._get_ohlcv_data(stock_code, candle_type, ohlcv_data=ohlcv_data) 

156 if err_resp: return err_resp 

157 

158 # 2. 공통 파이프라인에 위임 (순수 계산 함수인 _calculate_rsi_series_internal 등을 넘김) 

159 return await self._get_with_incremental_cache( 

160 stock_code, 

161 candle_type, 

162 f"rsi_{period}", 

163 data, 

164 period, 

165 self._calculate_rsi_series, # DataFrame 변환 및 순수 계산 로직만 있는 내부 함수 

166 period # <-- calc_func에 전달될 *calc_args (정상 작동!) 

167 ) 

168 

169 async def get_moving_average( 

170 self, 

171 stock_code: str, 

172 period: int = 20, 

173 method: str = "sma", 

174 candle_type: str = "D", 

175 ohlcv_data: Optional[List[Dict]] = None 

176 ) -> ResCommonResponse: 

177 """이동평균선 조회""" 

178 # 1. OHLCV 데이터 로드 및 에러 처리 

179 data, err_resp = await self._get_ohlcv_data(stock_code, candle_type, ohlcv_data=ohlcv_data) 

180 

181 if err_resp: return err_resp 181 ↛ exitline 181 didn't return from function 'get_moving_average' because the return on line 181 wasn't executed

182 

183 

184 # 2. 공통 캐시 파이프라인에 위임 (method 파라미터 포함) 

185 return await self._get_with_incremental_cache( 

186 stock_code, 

187 candle_type, 

188 f"ma_{period}_{method}", # [수정됨] 캐시 키에 method 포함 (예: ma_20_sma) 

189 data, 

190 period, 

191 self._calculate_moving_average_full, # 기존 이동평균선 전체 계산 함수 

192 period, # *calc_args 첫 번째 인자 

193 method # [수정됨] *calc_args 두 번째 인자로 method 전달 

194 ) 

195 

196 async def get_relative_strength( 

197 self, 

198 stock_code: str, 

199 period_days: int = 63, 

200 candle_type: str = "D", 

201 ohlcv_data: Optional[List[Dict]] = None 

202 ) -> ResCommonResponse: 

203 """N일 수익률(상대강도 원시값)을 계산하여 반환합니다. 

204 (단일 스칼라 값만 필요하므로, Pandas 변환 및 캐싱 없이 O(1) 리스트 직접 연산으로 초고속 처리합니다.) 

205 """ 

206 t_start = self.pm.start_timer() 

207 t_calc = self.pm.start_timer() 

208 try: 

209 # 1. OHLCV 데이터 로드 

210 data, err_resp = await self._get_ohlcv_data(stock_code, candle_type, ohlcv_data) 

211 if err_resp: return err_resp 211 ↛ exitline 211 didn't return from function 'get_relative_strength' because the return on line 211 wasn't executed

212 

213 # pct_change(period_days)와 동일한 간격을 구하려면 데이터가 period_days + 1 개 필요합니다. 

214 if len(data) < period_days + 1: 

215 return ResCommonResponse( 

216 rt_cd=ErrorCode.EMPTY_VALUES.value, 

217 msg1=f"데이터 부족: {len(data)} < {period_days + 1}", 

218 data=None 

219 ) 

220 

221 # 2. 무거운 DataFrame 변환 없이 리스트 인덱싱으로 즉시 추출! 

222 recent_candle = data[-1] # 오늘(최근) 캔들 

223 past_candle = data[-(period_days + 1)] # N일 전 캔들 (예: 63일 전) 

224 

225 recent_close = self._safe_float(recent_candle.get('close')) 

226 past_close = self._safe_float(past_candle.get('close')) 

227 

228 # 데이터 유효성 검증 

229 if recent_close is None or past_close is None or past_close <= 0: 

230 return ResCommonResponse( 

231 rt_cd=ErrorCode.EMPTY_VALUES.value, 

232 msg1=f"유효하지 않은 종가 (past_close={past_close})", 

233 data=None 

234 ) 

235 

236 # 3. 수익률 계산 

237 return_pct = ((recent_close - past_close) / past_close) * 100 

238 

239 result = ResRelativeStrength( 

240 code=stock_code, 

241 date=str(recent_candle.get('date')), 

242 return_pct=round(return_pct, 2), 

243 ) 

244 

245 if self.pm.enabled: 245 ↛ 246line 245 didn't jump to line 246 because the condition on line 245 was never true

246 self.pm.log_timer(f"IndicatorService.get_relative_strength({stock_code})", t_start) 

247 

248 return ResCommonResponse(rt_cd=ErrorCode.SUCCESS.value, msg1="성공", data=result) 

249 

250 except Exception as e: 

251 logging.getLogger(__name__).exception(f"상대강도 계산 중 오류: {e}") 

252 return ResCommonResponse( 

253 rt_cd=ErrorCode.UNKNOWN_ERROR.value, 

254 msg1=f"상대강도 계산 중 오류: {str(e)}", 

255 data=None 

256 ) 

257 

258 async def get_chart_indicators(self, stock_code: str, ohlcv_data: List[Dict]) -> ResCommonResponse: 

259 """ 

260 차트 렌더링용 지표(MA 5/10/20/60/120, BB, RS)를 한 번에 계산하여 반환합니다. 

261 과거 데이터에 대한 계산 결과를 캐싱하여 성능을 최적화합니다. 

262 """ 

263 t_start = self.pm.start_timer() 

264 

265 # 데이터가 너무 적거나 캐시 매니저가 없으면 전체 계산 (최대 기간 120일 + 여유) 

266 if not ohlcv_data or len(ohlcv_data) < 130 or not self.cache_store: 

267 resp = self._calculate_indicators_full(stock_code, ohlcv_data) 

268 self.pm.log_timer(f"IndicatorService.get_chart_indicators({stock_code})", t_start, extra_info="Full Calc", threshold=0.5) 

269 return resp 

270 

271 try: 

272 # 1. 확정된 과거 데이터 분리 (마지막 데이터 제외) 

273 # 마지막 데이터는 장 중 실시간으로 변할 수 있으므로 캐시 대상에서 제외 

274 confirmed_data = ohlcv_data[:-1] 

275 confirmed_len = len(confirmed_data) 

276 # 캐시 키: 시작일 + 종료일 + 데이터 수로 구성하여 ohlcv_limit 변경이나 

277 # DB 행 수 변화로 인한 캐시 불일치 방지 

278 confirmed_first_date = str(confirmed_data[0]['date']) 

279 confirmed_last_date = str(confirmed_data[-1]['date']) 

280 

281 cache_key = f"indicators_chart_{stock_code}_{confirmed_first_date}_{confirmed_last_date}_{confirmed_len}" 

282 

283 # 2. 캐시 조회 

284 raw_cache = self.cache_store.get_raw(cache_key) 

285 cached_wrapper = None 

286 if raw_cache and isinstance(raw_cache, tuple): 

287 cached_wrapper, _ = raw_cache 

288 

289 past_indicators = None 

290 if cached_wrapper: 

291 cached_data = cached_wrapper.get('data') 

292 # 캐시된 지표 행 수가 confirmed_data와 일치하는지 검증 

293 if cached_data: 293 ↛ 299line 293 didn't jump to line 299 because the condition on line 293 was always true

294 sample_key = next((k for k, v in cached_data.items() if isinstance(v, list)), None) 

295 if sample_key and len(cached_data[sample_key]) == confirmed_len: 295 ↛ 299line 295 didn't jump to line 299 because the condition on line 295 was always true

296 past_indicators = cached_data 

297 

298 # 3. 캐시 미스 시 과거 데이터 전체 계산 및 저장 

299 if not past_indicators: 

300 resp = self._calculate_indicators_full(stock_code, confirmed_data) 

301 if resp.rt_cd != ErrorCode.SUCCESS.value: 301 ↛ 302line 301 didn't jump to line 302 because the condition on line 301 was never true

302 return resp 

303 past_indicators = resp.data 

304 

305 # 캐시 저장 (파일 저장 포함) 

306 self.cache_store.set(cache_key, { 

307 "timestamp": datetime.now().isoformat(), 

308 "data": past_indicators 

309 }, save_to_file=True) 

310 

311 # 4. 오늘 데이터(마지막 1개)에 대한 지표 계산 (증분 계산) 

312 # 이동평균 등 계산을 위해 과거 데이터 일부가 필요함 (최대 120일 + 여유) 

313 lookback = 130 

314 partial_data = ohlcv_data[-lookback:] 

315 

316 resp_partial = self._calculate_indicators_full(stock_code, partial_data) 

317 if resp_partial.rt_cd != ErrorCode.SUCCESS.value: 317 ↛ 318line 317 didn't jump to line 318 because the condition on line 317 was never true

318 return resp_partial 

319 

320 latest_indicators = resp_partial.data 

321 

322 # 5. 병합 (과거 지표 + 오늘 지표) 

323 merged_indicators = {} 

324 

325 for key, val_list in past_indicators.items(): 

326 if isinstance(val_list, list): 326 ↛ 339line 326 didn't jump to line 339 because the condition on line 326 was always true

327 # latest_indicators[key]의 마지막 요소(오늘치) 가져오기 

328 if key in latest_indicators and latest_indicators[key]: 

329 latest_item = latest_indicators[key][-1] 

330 merged_list = val_list.copy() 

331 if merged_list and merged_list[-1].get('date') == latest_item.get('date'): 331 ↛ 332line 331 didn't jump to line 332 because the condition on line 331 was never true

332 merged_list[-1] = latest_item 

333 else: 

334 merged_list.append(latest_item) 

335 merged_indicators[key] = merged_list 

336 else: 

337 merged_indicators[key] = val_list 

338 else: 

339 merged_indicators[key] = val_list 

340 

341 self.pm.log_timer(f"IndicatorService.get_chart_indicators({stock_code})", t_start, extra_info="Cached") 

342 return ResCommonResponse(rt_cd=ErrorCode.SUCCESS.value, msg1="성공", data=merged_indicators) 

343 

344 except Exception as e: 

345 logging.getLogger(__name__).exception(f"지표 캐싱 처리 중 오류: {e}") 

346 # 오류 발생 시 안전하게 전체 재계산 시도 

347 return self._calculate_indicators_full(stock_code, ohlcv_data) 

348 

349 # ── 계산 로직 공통화 (Helper Methods) ───────────────────────────── 

350 

351 def _to_dataframe(self, ohlcv_data: list) -> pd.DataFrame: 

352 """리스트 데이터를 데이터프레임으로 변환하고 수치 전처리를 수행""" 

353 try: 

354 df = pd.DataFrame(ohlcv_data) 

355 

356 if df.empty: 

357 return df 

358 

359 # 1. 수치형 변환 (errors='coerce'는 숫자가 아닌 것을 NaN으로 만듭니다) 

360 if 'close' in df.columns: 

361 df['close'] = pd.to_numeric(df['close'], errors='coerce') 

362 

363 # 2. [핵심] 무한대(inf) 값을 실제 NaN으로 치환 

364 # np.inf를 쓰려면 위에 import numpy as np가 필요합니다. 

365 df['close'] = df['close'].replace([np.inf, -np.inf], np.nan) 

366 

367 return df 

368 

369 except Exception as e: 

370 # 여기서 에러가 나면 IndicatorService에서 999 에러를 반환하게 됩니다. 

371 raise e 

372 

373 @staticmethod 

374 def _compute_ma(df: pd.DataFrame, period: int, method: str = "sma", target_col: str = "ma") -> pd.DataFrame: 

375 """이동평균 계산 및 컬럼 추가""" 

376 if method.lower() == "ema": 

377 df[target_col] = df['close'].ewm(span=period, adjust=False).mean() 

378 else: 

379 df[target_col] = df['close'].rolling(window=period).mean() 

380 return df 

381 

382 @staticmethod 

383 def _compute_bb(df: pd.DataFrame, period: int, std_dev: float, prefix: str = "bb") -> pd.DataFrame: 

384 """볼린저 밴드 계산 및 컬럼 추가""" 

385 mb = df['close'].rolling(window=period).mean() 

386 std = df['close'].rolling(window=period).std(ddof=0) 

387 df[f'{prefix}_middle'] = mb 

388 df[f'{prefix}_upper'] = mb + (std * std_dev) 

389 df[f'{prefix}_lower'] = mb - (std * std_dev) 

390 return df 

391 

392 @staticmethod 

393 def _compute_rsi(df: pd.DataFrame, period: int, target_col: str = "rsi") -> pd.DataFrame: 

394 """RSI 계산 및 컬럼 추가""" 

395 delta = df['close'].diff() 

396 u = delta.clip(lower=0) 

397 d = -1 * delta.clip(upper=0) 

398 au = u.ewm(alpha=1/period, min_periods=period, adjust=False).mean() 

399 ad = d.ewm(alpha=1/period, min_periods=period, adjust=False).mean() 

400 rs = au / ad 

401 df[target_col] = 100 - (100 / (1 + rs)) 

402 return df 

403 

404 def calc_bb_widths_sync( 

405 self, 

406 ohlcv_data: List[Dict], 

407 period: int = 20, 

408 multiplier: float = 2.0, 

409 ) -> List[float]: 

410 """이미 확보한 OHLCV 데이터로 BB 폭(upper-lower) 목록을 동기 계산합니다. 

411 async 오버헤드 없이 순수 pandas 계산만 수행합니다.""" 

412 resp = self._calculate_bollinger_bands_full("", ohlcv_data, period, multiplier) 

413 if resp.rt_cd != ErrorCode.SUCCESS.value or not resp.data: 

414 return [] 

415 widths = [] 

416 for band in resp.data: 

417 upper = band.get('upper') if isinstance(band, dict) else getattr(band, 'upper', None) 

418 lower = band.get('lower') if isinstance(band, dict) else getattr(band, 'lower', None) 

419 if upper is not None and lower is not None: 

420 widths.append(upper - lower) 

421 return widths 

422 

423 def calc_rs_sync( 

424 self, 

425 ohlcv_data: List[Dict], 

426 period_days: int = 63, 

427 ) -> float: 

428 """이미 확보한 OHLCV 데이터로 RS 수익률을 동기 계산합니다. 

429 async 오버헤드 없이 O(1) 리스트 인덱싱만 수행합니다.""" 

430 try: 

431 if len(ohlcv_data) < period_days + 1: 

432 return 0.0 

433 recent_close = self._safe_float(ohlcv_data[-1].get('close')) 

434 past_close = self._safe_float(ohlcv_data[-(period_days + 1)].get('close')) 

435 if recent_close is None or past_close is None or past_close <= 0: 

436 return 0.0 

437 return round(((recent_close - past_close) / past_close) * 100, 2) 

438 except Exception: 

439 return 0.0 

440 

441 def _calculate_bollinger_bands_full(self, stock_code, data, period, std_dev) -> ResCommonResponse: 

442 """볼린저 밴드 전체 계산 (내부용)""" 

443 try: 

444 df = self._to_dataframe(data) 

445 

446 df['MB'] = df['close'].rolling(window=period).mean() 

447 df['std'] = df['close'].rolling(window=period).std(ddof=0) 

448 df['UB'] = df['MB'] + (df['std'] * std_dev) 

449 df['LB'] = df['MB'] - (df['std'] * std_dev) 

450 

451 results = [ 

452 { 

453 "code": stock_code, "date": str(row.date), 

454 "close": self._safe_float(row.close), 

455 "middle": self._safe_float(row.MB), 

456 "upper": self._safe_float(row.UB), 

457 "lower": self._safe_float(row.LB) 

458 } 

459 for row in df.itertuples(index=False) 

460 ] 

461 return ResCommonResponse(rt_cd=ErrorCode.SUCCESS.value, msg1="OK", data=results) 

462 except Exception as e: 

463 return ResCommonResponse(rt_cd=ErrorCode.UNKNOWN_ERROR.value, msg1=str(e), data=None) 

464 

465 def _calculate_rsi_series(self, stock_code, data, period) -> ResCommonResponse: 

466 """RSI 시계열 전체 계산 (내부용)""" 

467 try: 

468 df = self._to_dataframe(data) 

469 

470 # 공통 로직 사용 

471 df = self._compute_rsi(df, period, target_col="rsi") 

472 

473 results = [ 

474 { 

475 "code": stock_code, "date": str(row.date), 

476 "close": self._safe_float(row.close), 

477 "rsi": self._safe_float(row.rsi) 

478 } 

479 for row in df.itertuples(index=False) 

480 ] 

481 return ResCommonResponse(rt_cd=ErrorCode.SUCCESS.value, msg1="OK", data=results) 

482 except Exception as e: 

483 return ResCommonResponse(rt_cd=ErrorCode.UNKNOWN_ERROR.value, msg1=str(e), data=None) 

484 

485 def _calculate_moving_average_full(self, stock_code, data, period, method) -> ResCommonResponse: 

486 """이동평균 전체 계산 (내부용)""" 

487 try: 

488 df = self._to_dataframe(data) 

489 

490 # 공통 로직 사용 

491 df = self._compute_ma(df, period, method, target_col="ma") 

492 

493 results = [ 

494 { 

495 "code": stock_code, "date": str(row.date), 

496 "close": self._safe_float(row.close), 

497 "ma": self._safe_float(row.ma) 

498 } 

499 for row in df.itertuples(index=False) 

500 ] 

501 return ResCommonResponse(rt_cd=ErrorCode.SUCCESS.value, msg1="OK", data=results) 

502 except Exception as e: 

503 return ResCommonResponse(rt_cd=ErrorCode.UNKNOWN_ERROR.value, msg1=str(e), data=None) 

504 

505 def _calculate_indicators_full(self, stock_code: str, ohlcv_data: List[Dict]) -> ResCommonResponse: 

506 """전체 데이터를 받아 지표를 계산하는 내부 메서드 (Vectorized 최적화 버전)""" 

507 try: 

508 # 1. DataFrame 변환 (1회 수행) 

509 df = self._to_dataframe(ohlcv_data) 

510 if df.empty: 

511 return ResCommonResponse(rt_cd=ErrorCode.EMPTY_VALUES.value, msg1="데이터 없음", data=None) 

512 

513 # 2. 지표 계산 (Vectorized operations) 

514 # MA 

515 for p in [5, 10, 20, 60, 120]: 

516 df = self._compute_ma(df, p, "sma", target_col=f"ma{p}") 

517 

518 # BB (20일, 2.0) 

519 df = self._compute_bb(df, 20, 2.0, prefix="bb") 

520 

521 # RS (63일 등락률) 

522 rs_period = 63 

523 df['rs'] = df['close'].pct_change(periods=rs_period) * 100 

524 

525 # 3. 결과 포맷팅 전처리 (빠른 일괄 변환) 

526 # 3-1. 응답 규격을 맞추기 위해 date 컬럼을 일괄 문자열로 캐스팅 

527 df['date'] = df['date'].astype(str) 

528 

529 # 3-2. 숫자형 컬럼의 무한대(inf)를 NaN으로 치환 (문자열 컬럼 순회 방지로 속도 극대화) 

530 num_cols = df.select_dtypes(include=[np.number]).columns 

531 df[num_cols] = df[num_cols].replace([np.inf, -np.inf], np.nan) 

532 

533 # 3-3. JSON 직렬화를 위해 모든 NaN을 None으로 일괄 치환 

534 # (Pandas float64 자동 캐스팅 방지를 위해 object 명시적 캐스팅 적용) 

535 df = df.astype(object).where(pd.notnull(df), None) 

536 

537 # 4. 결과 포맷팅 (Pandas to_dict 활용 - 파이썬 반복문 0번) 

538 indicators = {} 

539 

540 # MA 추출 

541 for p in [5, 10, 20, 60, 120]: 

542 ma_key = f'ma{p}' 

543 indicators[ma_key] = df[['date', 'close', ma_key]].rename( 

544 columns={ma_key: 'ma'} 

545 ).to_dict('records') 

546 

547 # BB 추출 (이름 매핑 및 code 컬럼 동적 추가) 

548 indicators["bb"] = df[['date', 'close', 'bb_middle', 'bb_upper', 'bb_lower']].rename( 

549 columns={'bb_middle': 'middle', 'bb_upper': 'upper', 'bb_lower': 'lower'} 

550 ).assign(code=stock_code)[['code', 'date', 'close', 'middle', 'upper', 'lower']].to_dict('records') 

551 

552 # RS 추출 

553 indicators["rs"] = df[['date', 'rs']].to_dict('records') 

554 

555 return ResCommonResponse(rt_cd=ErrorCode.SUCCESS.value, msg1="성공", data=indicators) 

556 

557 except Exception as e: 

558 logging.getLogger(__name__).exception(f"지표 통합 계산 중 오류: {e}") 

559 return ResCommonResponse(rt_cd=ErrorCode.UNKNOWN_ERROR.value, msg1=str(e), data=None)