Coverage for strategies / program_buy_follow_strategy.py: 93%

165 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-04 15:08 +0000

1# strategies/program_buy_follow_strategy.py 

2from __future__ import annotations 

3 

4import logging 

5from dataclasses import dataclass 

6from typing import List, Optional 

7 

8from interfaces.live_strategy import LiveStrategy 

9from common.types import TradeSignal, ErrorCode, ResStockFullInfoApiOutput 

10from services.stock_query_service import StockQueryService 

11from core.market_clock import MarketClock 

12from strategies.base_strategy_config import BaseStrategyConfig 

13from core.logger import get_strategy_logger 

14 

15 

@dataclass
class ProgramBuyFollowConfig(BaseStrategyConfig):
    """Settings for the program-buy-follow strategy.

    All ``*_pct`` / ``*_ratio`` values are expressed in percent (5.0 means 5%).
    """
    # Minimum program net-buy threshold; intended as a strict lower bound (> 0).
    min_program_net_buy: int = 0
    # Minimum program net-buy amount as a share of accumulated traded value (%).
    min_program_buy_ratio: float = 5.0
    # Trailing stop: exit once the price is this many percent below the day high.
    trailing_stop_pct: float = 8.0
    # Stop loss: exit once P&L versus the buy price is at or below this percent.
    stop_loss_pct: float = -5.0
    # Allow same-day re-entry (False = at most one buy signal per stock per day).
    allow_reentry: bool = True

24 

25 

class ProgramBuyFollowStrategy(LiveStrategy):
    """Follow program net buying among the top stocks by traded value.

    scan():
        1. Fetch the top-traded-value stocks from the stock query service.
        2. For each stock, compute the program net-buy amount as a share of
           the accumulated traded value.
        3. Keep the stocks at or above the configured ratio, sort them by
           ratio in descending order and return BUY signals.

    check_exits() (the first matching rule wins):
        - trailing stop: price versus day high <= -trailing_stop_pct
        - stop loss: P&L versus buy price <= stop_loss_pct
        - program selling: pgtr_ntby_qty < 0
        - time exit: 15 minutes or less until market close
    """

    def __init__(
        self,
        stock_query_service: StockQueryService,
        market_clock: MarketClock,
        config: Optional[ProgramBuyFollowConfig] = None,
        logger: Optional[logging.Logger] = None,
    ):
        """Store collaborators and initialise the per-day bookkeeping.

        Args:
            stock_query_service: Source of ranking and current-price data.
            market_clock: Provides the current KST time and the close time.
            config: Strategy settings; defaults to ``ProgramBuyFollowConfig()``.
            logger: Logger to use; a strategy logger is created when omitted.
        """
        self._sqs = stock_query_service
        self._tm = market_clock
        self._cfg = config or ProgramBuyFollowConfig()
        if logger:
            self._logger = logger
        else:
            self._logger = get_strategy_logger("ProgramBuyFollow")

        # Codes that already produced a BUY signal on ``_last_date``.
        self._bought_today = set()
        self._last_date = ""

    @property
    def name(self) -> str:
        """Display name of the strategy (also passed as ``caller`` to the service)."""
        return "프로그램매수추종"

    async def scan(self) -> List[TradeSignal]:
        """Return BUY signals for stocks with strong program net buying.

        Returns:
            Signals ordered by program net-buy ratio, highest first. An empty
            list is returned when the ranking query fails.
        """
        signals: List[TradeSignal] = []
        self._logger.info({"event": "scan_started", "strategy_name": self.name})

        self._reset_daily_state()

        # 1) Top stocks by traded value.
        resp = await self._sqs.get_top_trading_value_stocks()
        if not resp or resp.rt_cd != ErrorCode.SUCCESS.value:
            self._logger.warning({
                "event": "scan_failed",
                "reason": "Failed to get top trading value stocks",
                # getattr instead of vars(): a response object without a
                # __dict__ must not crash the failure path itself.
                "response": getattr(resp, "__dict__", None) if resp else None,
            })
            return signals

        candidates = resp.data or []
        self._logger.info({"event": "scan_candidates_fetched", "count": len(candidates)})

        scored = []
        for stock in candidates:
            code = stock.get("mksc_shrn_iscd") or stock.get("stck_shrn_iscd") or ""
            if not code:
                continue
            stock_name = stock.get("hts_kor_isnm", code)

            # With re-entry disabled, skip stocks already signalled today.
            if not self._cfg.allow_reentry and code in self._bought_today:
                continue

            try:
                # 2) Per-stock detail lookup and filtering.
                entry = await self._evaluate_candidate(code, stock_name)
            except Exception as e:
                # Best effort: one failing candidate must not abort the scan.
                self._logger.error({
                    "event": "scan_error", "code": code, "error": str(e),
                }, exc_info=True)
                continue

            if entry is not None:
                scored.append(entry)

        # 3) Highest program net-buy ratio first (stable for equal ratios).
        scored.sort(key=lambda x: x[0], reverse=True)

        for pg_ratio, code, stock_name, current, pg_buy_amt, log_data in scored:
            trade_value = log_data.get("trade_value", 0)

            reason_msg = (
                f"PG추종(PG매수 {pg_buy_amt // 100_000_000:,}억({pg_ratio:.1f}%), "
                f"누적대금 {trade_value // 100_000_000:,}억)"
            )
            signals.append(TradeSignal(
                code=code, name=stock_name, action="BUY", price=current, qty=1,
                reason=reason_msg, strategy_name=self.name,
            ))
            self._bought_today.add(code)  # remember that a buy signal was issued
            self._logger.info({
                "event": "buy_signal_generated",
                "code": code, "name": stock_name, "price": current,
                "reason": reason_msg, "data": log_data,
            })

        self._logger.info({"event": "scan_finished", "signals_found": len(signals)})
        return signals

    async def check_exits(self, holdings: List[dict]) -> List[TradeSignal]:
        """Return SELL signals for holdings that hit an exit rule.

        Args:
            holdings: Dicts read via the keys ``code``, ``buy_price``, ``name``
                and ``qty``. Entries with an empty code or a falsy buy price
                are skipped.

        Returns:
            One SELL signal per holding that matched an exit rule.
        """
        signals: List[TradeSignal] = []
        self._logger.info({"event": "check_exits_started", "holdings_count": len(holdings)})

        now = self._tm.get_current_kst_time()
        close_time = self._tm.get_market_close_time()
        minutes_to_close = (close_time - now).total_seconds() / 60

        for hold in holdings:
            code = str(hold.get("code", ""))
            raw_buy_price = hold.get("buy_price", 0)
            stock_name = hold.get("name", code)
            log_data = {"code": code, "name": stock_name, "buy_price": raw_buy_price}

            if not code or not raw_buy_price:
                continue

            try:
                # Coerce before the API call: a numeric string would otherwise
                # only fail later, inside the P&L arithmetic.
                buy_price = float(raw_buy_price)
                if not buy_price:
                    continue

                full_resp = await self._sqs.get_current_price(code, caller=self.name)
                if not full_resp or full_resp.rt_cd != ErrorCode.SUCCESS.value:
                    self._logger.warning({
                        "event": "check_exits_failed",
                        "reason": "Failed to get current price for holding",
                        **log_data,
                    })
                    continue

                output = self._extract_output(full_resp)
                if output is None:
                    self._logger.warning({
                        "event": "check_exits_failed",
                        "reason": "No 'output' in full response",
                        **log_data,
                    })
                    continue

                current = self._get_int_field(output, "stck_prpr")
                high = self._get_int_field(output, "stck_hgpr")
                pgtr_ntby = self._get_int_field(output, "pgtr_ntby_qty")
                log_data.update({"current_price": current, "day_high": high, "program_net_buy": pgtr_ntby})

                # Without a usable price and day high no rule can be evaluated.
                if current <= 0 or high <= 0:
                    continue

                pnl_pct = ((current - buy_price) / buy_price) * 100
                drop_from_high = ((current - high) / high) * 100
                log_data.update({"pnl_pct": round(pnl_pct, 2), "drop_from_high_pct": round(drop_from_high, 2)})

                reason = self._exit_reason(pnl_pct, drop_from_high, high, pgtr_ntby, minutes_to_close)

                if reason:
                    stock_name_from_api = self._get_str_field(output, "hts_kor_isnm") or stock_name
                    holding_qty = int(hold.get("qty", 1))
                    signals.append(TradeSignal(
                        code=code, name=stock_name_from_api, action="SELL", price=current, qty=holding_qty,
                        reason=reason, strategy_name=self.name,
                    ))
                    self._logger.info({
                        "event": "sell_signal_generated",
                        "code": code, "name": stock_name_from_api, "price": current,
                        "reason": reason, "data": log_data,
                    })
                else:
                    self._logger.info({
                        "event": "hold_checked", "code": code,
                        "reason": "No exit condition met", "data": log_data,
                    })

            except Exception as e:
                # Best effort: one failing holding must not block the others.
                self._logger.error({
                    "event": "check_exits_error", "code": code, "error": str(e)
                }, exc_info=True)

        self._logger.info({"event": "check_exits_finished", "signals_found": len(signals)})
        return signals

    # ── internal helpers ──

    def _reset_daily_state(self) -> None:
        """Clear the bought-today set when the KST calendar date has changed."""
        today = self._tm.get_current_kst_time().strftime("%Y%m%d")
        if self._last_date != today:
            self._bought_today.clear()
            self._last_date = today

    def _reject(self, log_data: dict, reason: str) -> None:
        """Log a ``candidate_rejected`` event with ``reason`` and return None."""
        log_data["reason"] = reason
        self._logger.info({"event": "candidate_rejected", **log_data})
        return None

    async def _evaluate_candidate(self, code: str, stock_name: str):
        """Fetch one candidate's quote and apply the entry filters.

        Returns:
            ``(pg_ratio, code, stock_name, current, pg_buy_amt, log_data)``
            when the candidate qualifies, otherwise ``None`` (the rejection
            reason is logged).
        """
        log_data = {"code": code, "name": stock_name}

        full_resp = await self._sqs.get_current_price(code, caller=self.name)
        if not full_resp or full_resp.rt_cd != ErrorCode.SUCCESS.value:
            return self._reject(log_data, "Failed to get full stock price")

        output = self._extract_output(full_resp)
        if output is None:
            return self._reject(log_data, "No 'output' in full response")

        pgtr_ntby = self._get_int_field(output, "pgtr_ntby_qty")
        current = self._get_int_field(output, "stck_prpr")
        acml_tr_pbmn = self._get_int_field(output, "acml_tr_pbmn")

        # Program net-buy amount and its share of the accumulated traded value.
        pg_buy_amt = pgtr_ntby * current
        pg_ratio = (pg_buy_amt / acml_tr_pbmn * 100) if acml_tr_pbmn > 0 else 0.0

        log_data.update({
            "program_net_buy": pgtr_ntby,
            "current_price": current,
            "pg_ratio": round(pg_ratio, 2),
            "trade_value": acml_tr_pbmn,
        })

        # Checked before the ratio filter: a zero price forces the ratio to
        # zero, so this would otherwise be reported as a ratio rejection.
        if current <= 0:
            return self._reject(log_data, "Current price is zero")

        # Net buying must exceed the configured minimum, and is never allowed
        # to be non-positive even if the setting is below zero.
        min_net_buy = max(self._cfg.min_program_net_buy, 0)
        if pgtr_ntby <= min_net_buy or pg_ratio < self._cfg.min_program_buy_ratio:
            return self._reject(
                log_data,
                f"Ratio {pg_ratio:.2f}% < {self._cfg.min_program_buy_ratio}% or Not net buy",
            )

        return (pg_ratio, code, stock_name, current, pg_buy_amt, log_data)

    def _exit_reason(self, pnl_pct, drop_from_high, high, pgtr_ntby, minutes_to_close) -> str:
        """Return the message of the first matching exit rule, or "" to hold."""
        if drop_from_high <= -self._cfg.trailing_stop_pct:
            return f"익절(트레일링): 고가({high:,})대비 {drop_from_high:.1f}%"
        if pnl_pct <= self._cfg.stop_loss_pct:
            return f"손절: 매수가대비 {pnl_pct:.1f}%"
        if pgtr_ntby < 0:
            return f"프로그램매도전환: 순매수 {pgtr_ntby:,}주"
        if minutes_to_close <= 15:
            return f"시간청산: 장마감 {minutes_to_close:.0f}분전"
        return ""

    @staticmethod
    def _extract_output(resp):
        """Return the output object of an API response.

        For a dict payload this is ``data["output"]`` (``None`` when absent);
        any other payload is returned as is.
        """
        data = resp.data
        if isinstance(data, dict):
            return data.get("output")
        return data

    @staticmethod
    def _get_int_field(output, field_name: str) -> int:
        """Read ``field_name`` from a dict or object and convert it to ``int``.

        Missing, empty or unparsable values yield 0. Decimal-formatted strings
        such as ``"12.0"`` are truncated the same way real floats are.
        """
        if isinstance(output, dict):
            val = output.get(field_name, "0")
        else:
            val = getattr(output, field_name, "0")
        if not val:
            return 0
        try:
            return int(val)
        except (ValueError, TypeError, OverflowError):
            pass
        try:
            # Fallback for strings like "12.0"; NaN and infinity end up as 0.
            return int(float(val))
        except (ValueError, TypeError, OverflowError):
            return 0

    @staticmethod
    def _get_str_field(output, field_name: str) -> str:
        """Read ``field_name`` from a dict or object as ``str`` ("" when missing or falsy)."""
        if isinstance(output, dict):
            return str(output.get(field_name, "") or "")
        return str(getattr(output, field_name, "") or "")

280 return str(getattr(output, field_name, "") or "")