Coverage for strategies / program_buy_follow_strategy.py: 93%
165 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
1# strategies/program_buy_follow_strategy.py
2from __future__ import annotations
4import logging
5from dataclasses import dataclass
6from typing import List, Optional
8from interfaces.live_strategy import LiveStrategy
9from common.types import TradeSignal, ErrorCode, ResStockFullInfoApiOutput
10from services.stock_query_service import StockQueryService
11from core.market_clock import MarketClock
12from strategies.base_strategy_config import BaseStrategyConfig
13from core.logger import get_strategy_logger
16@dataclass
17class ProgramBuyFollowConfig(BaseStrategyConfig):
18 """프로그램 매수 추종 전략 설정."""
19 min_program_net_buy: int = 0 # 프로그램 순매수 최소 기준 (> 0)
20 min_program_buy_ratio: float = 5.0 # 거래대금 대비 프로그램 순매수 금액 비율 (%)
21 trailing_stop_pct: float = 8.0 # 고가 대비 -8% 하락 시 익절
22 stop_loss_pct: float = -5.0 # 매수가 대비 -5% 손절
23 allow_reentry: bool = True # 당일 재진입 허용 여부 (False면 종목당 하루 1회만 매수)
26class ProgramBuyFollowStrategy(LiveStrategy):
27 """거래대금 상위 + 프로그램 순매수 추종 전략.
29 scan():
30 1. 거래대금 상위 30종목 조회
31 2. 각 종목의 프로그램 순매수 금액 비중(거래대금 대비) 확인
32 3. 기준(5%) 이상인 종목을 내림차순 정렬, BUY 시그널 반환
34 check_exits():
35 - 익절: 당일 고가 대비 <= -trailing_stop_pct
36 - 손절: 매수가 대비 stop_loss_pct%
37 - 프로그램 매도 전환: pgtr_ntby_qty < 0
38 - 시간청산: 장 마감 15분 전
39 """
41 def __init__(
42 self,
43 stock_query_service: StockQueryService,
44 market_clock: MarketClock,
45 config: Optional[ProgramBuyFollowConfig] = None,
46 logger: Optional[logging.Logger] = None,
47 ):
48 self._sqs = stock_query_service
49 self._tm = market_clock
50 self._cfg = config or ProgramBuyFollowConfig()
51 if logger:
52 self._logger = logger
53 else:
54 self._logger = get_strategy_logger("ProgramBuyFollow")
56 self._bought_today = set()
57 self._last_date = ""
59 @property
60 def name(self) -> str:
61 return "프로그램매수추종"
63 async def scan(self) -> List[TradeSignal]:
64 signals: List[TradeSignal] = []
65 self._logger.info({"event": "scan_started", "strategy_name": self.name})
67 # 날짜 변경 시 당일 매수 기록 초기화
68 today = self._tm.get_current_kst_time().strftime("%Y%m%d")
69 if self._last_date != today:
70 self._bought_today.clear()
71 self._last_date = today
73 # 1) 거래대금 상위 종목 조회
74 resp = await self._sqs.get_top_trading_value_stocks()
75 if not resp or resp.rt_cd != ErrorCode.SUCCESS.value:
76 self._logger.warning({
77 "event": "scan_failed",
78 "reason": "Failed to get top trading value stocks",
79 "response": vars(resp) if resp else None,
80 })
81 return signals
83 candidates = resp.data or []
84 self._logger.info({"event": "scan_candidates_fetched", "count": len(candidates)})
86 scored = []
87 for stock in candidates:
88 code = stock.get("mksc_shrn_iscd") or stock.get("stck_shrn_iscd") or ""
89 stock_name = stock.get("hts_kor_isnm", code)
90 log_data = {"code": code, "name": stock_name}
92 if not code:
93 continue
95 # 재진입 불허 설정 시, 이미 매수한 종목은 스킵
96 if not self._cfg.allow_reentry and code in self._bought_today:
97 continue
99 try:
100 # 2) 종목 상세 정보 조회 (pgtr_ntby_qty 포함)
101 full_resp = await self._sqs.get_current_price(code, caller=self.name)
102 if not full_resp or full_resp.rt_cd != ErrorCode.SUCCESS.value:
103 log_data["reason"] = "Failed to get full stock price"
104 self._logger.info({"event": "candidate_rejected", **log_data})
105 continue
107 output = self._extract_output(full_resp)
108 if output is None:
109 log_data["reason"] = "No 'output' in full response"
110 self._logger.info({"event": "candidate_rejected", **log_data})
111 continue
113 pgtr_ntby = self._get_int_field(output, "pgtr_ntby_qty")
114 current = self._get_int_field(output, "stck_prpr")
115 acml_tr_pbmn = self._get_int_field(output, "acml_tr_pbmn")
117 # 프로그램 순매수 금액 및 비중 계산
118 pg_buy_amt = pgtr_ntby * current
119 pg_ratio = (pg_buy_amt / acml_tr_pbmn * 100) if acml_tr_pbmn > 0 else 0.0
121 log_data.update({
122 "program_net_buy": pgtr_ntby,
123 "current_price": current,
124 "pg_ratio": round(pg_ratio, 2),
125 "trade_value": acml_tr_pbmn
126 })
128 # 조건: 순매수 수량 > 0 AND 거래대금 대비 비중 >= 5%
129 if pgtr_ntby <= 0 or pg_ratio < self._cfg.min_program_buy_ratio:
130 log_data["reason"] = f"Ratio {pg_ratio:.2f}% < {self._cfg.min_program_buy_ratio}% or Not net buy"
131 self._logger.info({"event": "candidate_rejected", **log_data})
132 continue
134 if current <= 0: 134 ↛ 135line 134 didn't jump to line 135 because the condition on line 134 was never true
135 log_data["reason"] = "Current price is zero"
136 self._logger.info({"event": "candidate_rejected", **log_data})
137 continue
139 scored.append((pg_ratio, code, stock_name, current, log_data))
141 except Exception as e:
142 self._logger.error({
143 "event": "scan_error", "code": code, "error": str(e),
144 }, exc_info=True)
146 # 3) 프로그램 순매수 비중 높은 순으로 정렬
147 scored.sort(key=lambda x: x[0], reverse=True)
149 for pg_ratio, code, stock_name, current, log_data in scored:
150 pgtr_ntby = log_data.get("program_net_buy", 0)
151 trade_value = log_data.get("trade_value", 0)
152 pg_buy_amt = pgtr_ntby * current
154 reason_msg = (
155 f"PG추종(PG매수 {pg_buy_amt // 100_000_000:,}억({pg_ratio:.1f}%), "
156 f"누적대금 {trade_value // 100_000_000:,}억)"
157 )
158 signals.append(TradeSignal(
159 code=code, name=stock_name, action="BUY", price=current, qty=1,
160 reason=reason_msg, strategy_name=self.name,
161 ))
162 self._bought_today.add(code) # 매수 신호 발생 기록
163 self._logger.info({
164 "event": "buy_signal_generated",
165 "code": code, "name": stock_name, "price": current,
166 "reason": reason_msg, "data": log_data,
167 })
169 self._logger.info({"event": "scan_finished", "signals_found": len(signals)})
170 return signals
172 async def check_exits(self, holdings: List[dict]) -> List[TradeSignal]:
173 signals: List[TradeSignal] = []
174 self._logger.info({"event": "check_exits_started", "holdings_count": len(holdings)})
176 now = self._tm.get_current_kst_time()
177 close_time = self._tm.get_market_close_time()
178 minutes_to_close = (close_time - now).total_seconds() / 60
180 for hold in holdings:
181 code = str(hold.get("code", ""))
182 buy_price = hold.get("buy_price", 0)
183 stock_name = hold.get("name", code)
184 log_data = {"code": code, "name": stock_name, "buy_price": buy_price}
186 if not code or not buy_price:
187 continue
189 try:
190 full_resp = await self._sqs.get_current_price(code, caller=self.name)
191 if not full_resp or full_resp.rt_cd != ErrorCode.SUCCESS.value: 191 ↛ 192line 191 didn't jump to line 192 because the condition on line 191 was never true
192 self._logger.warning({
193 "event": "check_exits_failed",
194 "reason": "Failed to get current price for holding",
195 **log_data,
196 })
197 continue
199 output = self._extract_output(full_resp)
200 if output is None: 200 ↛ 201line 200 didn't jump to line 201 because the condition on line 200 was never true
201 continue
203 current = self._get_int_field(output, "stck_prpr")
204 high = self._get_int_field(output, "stck_hgpr")
205 pgtr_ntby = self._get_int_field(output, "pgtr_ntby_qty")
206 log_data.update({"current_price": current, "day_high": high, "program_net_buy": pgtr_ntby})
208 if current <= 0 or high <= 0: 208 ↛ 209line 208 didn't jump to line 209 because the condition on line 208 was never true
209 continue
211 pnl_pct = ((current - buy_price) / buy_price) * 100
212 drop_from_high = ((current - high) / high) * 100
213 log_data.update({"pnl_pct": round(pnl_pct, 2), "drop_from_high_pct": round(drop_from_high, 2)})
215 reason = ""
216 should_sell = False
218 if drop_from_high <= -self._cfg.trailing_stop_pct:
219 reason = f"익절(트레일링): 고가({high:,})대비 {drop_from_high:.1f}%"
220 should_sell = True
221 elif pnl_pct <= self._cfg.stop_loss_pct:
222 reason = f"손절: 매수가대비 {pnl_pct:.1f}%"
223 should_sell = True
224 elif pgtr_ntby < 0:
225 reason = f"프로그램매도전환: 순매수 {pgtr_ntby:,}주"
226 should_sell = True
227 elif minutes_to_close <= 15: 227 ↛ 231line 227 didn't jump to line 231 because the condition on line 227 was always true
228 reason = f"시간청산: 장마감 {minutes_to_close:.0f}분전"
229 should_sell = True
231 if should_sell: 231 ↛ 244line 231 didn't jump to line 244 because the condition on line 231 was always true
232 stock_name_from_api = self._get_str_field(output, "hts_kor_isnm") or stock_name
233 holding_qty = int(hold.get("qty", 1))
234 signals.append(TradeSignal(
235 code=code, name=stock_name_from_api, action="SELL", price=current, qty=holding_qty,
236 reason=reason, strategy_name=self.name,
237 ))
238 self._logger.info({
239 "event": "sell_signal_generated",
240 "code": code, "name": stock_name_from_api, "price": current,
241 "reason": reason, "data": log_data,
242 })
243 else:
244 self._logger.info({ "event": "hold_checked", "code": code, "reason": "No exit condition met", "data": log_data })
247 except Exception as e:
248 self._logger.error({
249 "event": "check_exits_error", "code": code, "error": str(e)
250 }, exc_info=True)
252 self._logger.info({"event": "check_exits_finished", "signals_found": len(signals)})
253 return signals
255 # ── 내부 유틸 ──
257 @staticmethod
258 def _extract_output(resp):
259 """API 응답에서 output 객체(dict 또는 dataclass) 추출."""
260 data = resp.data
261 if isinstance(data, dict):
262 return data.get("output")
263 return data
265 @staticmethod
266 def _get_int_field(output, field_name: str) -> int:
267 if isinstance(output, dict):
268 val = output.get(field_name, "0")
269 else:
270 val = getattr(output, field_name, "0")
271 try:
272 return int(val or "0")
273 except (ValueError, TypeError):
274 return 0
276 @staticmethod
277 def _get_str_field(output, field_name: str) -> str:
278 if isinstance(output, dict):
279 return str(output.get(field_name, "") or "")
280 return str(getattr(output, field_name, "") or "")