Coverage for services / telegram_notifier.py: 95%
159 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
1import aiohttp
2import logging
3from typing import Optional, List, Dict
4from services.notification_service import NotificationEvent, NotificationCategory, NotificationLevel
5import unicodedata
7logger = logging.getLogger(__name__)
9class TelegramNotifier:
10 """Telegram 알림을 비동기적으로 전송하는 핸들러 클래스입니다."""
12 def __init__(self, bot_token: str, chat_id: str):
13 self.bot_token = bot_token
14 self.chat_id = chat_id
15 self.api_url = f"https://api.telegram.org/bot{self.bot_token}/sendMessage"
16 # 허용할 카테고리 목록 설정 (기본값: STRATEGY, BACKGROUND)
17 self.allowed_categories = [NotificationCategory.STRATEGY, NotificationCategory.BACKGROUND]
19 async def handle_event(self, event: NotificationEvent) -> None:
20 """NotificationService에서 호출할 비동기 콜백 메서드입니다."""
21 # ★ 필터링 로직: 허용된 카테고리가 설정되어 있고, 현재 이벤트 카테고리가 거기에 없으면 무시
22 if self.allowed_categories is not None and event.category not in self.allowed_categories:
23 return
25 # 특정 레벨(예: info, warning, error, critical)에 따라 이모지나 포맷을 다르게 할 수 있습니다.
26 level_emoji = {
27 "info": "ℹ️",
28 "warning": "⚠️",
29 "error": "❌",
30 "critical": "🚨"
31 }.get(event.level.value.lower(), "🔔")
33 # 메타데이터에 수익률(return_rate) 정보가 있으면 메시지 본문에 이모지와 함께 삽입
34 message_body = event.message
35 if event.metadata and "return_rate" in event.metadata:
36 rr = event.metadata.get("return_rate")
37 if rr is not None: 37 ↛ 51line 37 didn't jump to line 51 because the condition on line 37 was always true
38 if rr > 0:
39 rt_emoji = "📈"
40 elif rr < 0:
41 rt_emoji = "📉"
42 else:
43 rt_emoji = "➖"
45 if "사유:" in message_body:
46 message_body = message_body.replace("사유:", f"{rt_emoji} 수익: {rr:+.2f}%\n사유:")
47 else:
48 message_body += f"\n{rt_emoji} 수익: {rr:+.2f}%"
50 # 텔레그램으로 보낼 메시지 포맷 구성
51 text = (
52 f"{level_emoji} <b>[{event.category.value}] {event.title}</b>\n"
53 f"시간: {event.timestamp}\n"
54 f"내용:\n{message_body}"
55 )
57 payload = {
58 "chat_id": self.chat_id,
59 "text": text,
60 "parse_mode": "HTML"
61 }
63 # 비동기 HTTP 요청으로 Telegram API 호출
64 try:
65 async with aiohttp.ClientSession() as session:
66 async with session.post(self.api_url, json=payload) as response:
67 if response.status != 200:
68 response_text = await response.text()
69 logger.error(f"Telegram 알림 전송 실패: {response.status} - {response_text}")
70 except Exception as e:
71 logger.error(f"Telegram 알림 전송 중 예외 발생: {e}")
74class TelegramReporter:
75 """텔레그램으로 정형화된 리포트(랭킹 등)를 전송하는 클래스입니다."""
77 def __init__(self, bot_token: str, chat_id: str):
78 self.bot_token = bot_token
79 self.chat_id = chat_id
80 self.api_url = f"https://api.telegram.org/bot{self.bot_token}/sendMessage"
82 async def _send_message(self, text: str) -> bool:
83 """텔레그램으로 메시지를 비동기적으로 전송하는 헬퍼 메서드입니다."""
84 if not text:
85 return True
87 payload = {
88 "chat_id": self.chat_id,
89 "text": text,
90 "parse_mode": "HTML",
91 "disable_web_page_preview": True,
92 }
93 try:
94 async with aiohttp.ClientSession() as session:
95 async with session.post(self.api_url, json=payload) as response:
96 if response.status != 200:
97 response_text = await response.text()
98 logger.error(f"Telegram 리포트 전송 실패: {response.status} - {response_text}")
99 return False
100 return True
101 except Exception as e:
102 logger.error(f"Telegram 리포트 전송 중 예외 발생: {e}")
103 return False
105 def _format_ranking_table(self, title: str, ranking_data: List[Dict], value_field: str, limit: int = 10, divisor: float = 100_000_000, show_ratio: bool = True) -> str:
106 """랭킹 데이터를 HTML 포맷의 문자열로 변환합니다."""
107 if not ranking_data:
108 return ""
110 header = f"<b>🏆 {title} (Top {limit})</b>\n"
111 table = "<pre>"
112 if show_ratio: 112 ↛ 116line 112 didn't jump to line 116 because the condition on line 112 was always true
113 table += "순 종목 등락 금액(억) 비중\n"
114 table += "-" * 38 + "\n"
115 else:
116 table += "순 종목 등락 금액(억)\n"
117 table += "-" * 31 + "\n"
119 for i, item in enumerate(ranking_data[:limit]):
120 rank = i + 1
121 name = item.get('hts_kor_isnm', 'N/A')
123 # 1. 종목명 자르기 (최대 8칸)
124 display_name = ""
125 display_width = 0
126 for char in name:
127 char_width = 2 if unicodedata.east_asian_width(char) in ('F', 'W', 'A') else 1
128 if display_width + char_width > 8:
129 break
130 display_name += char
131 display_width += char_width
133 name_padding = 8 - display_width
134 name_str = f"{display_name}{' ' * name_padding}"
136 # 2. 등락률
137 try:
138 rate = float(item.get('prdy_ctrt', '0') or '0')
139 if rate > 0:
140 rate_str = f"+{rate:.1f}%"
141 elif rate < 0:
142 rate_str = f"{rate:.1f}%"
143 else:
144 rate_str = "0.0%"
145 except:
146 rate_str = "-"
148 # 3. 금액 (억)
149 value_str = item.get(value_field, "0")
150 raw_val = 0
151 try:
152 # 금액(원)을 억 단위로 변환
153 raw_val = float(value_str or "0")
154 val_100m = raw_val / divisor
155 val_str = f"{val_100m:,.0f}"
156 except (ValueError, TypeError):
157 val_str = "-"
159 if show_ratio: 159 ↛ 175line 159 didn't jump to line 175 because the condition on line 159 was always true
160 # 4. 비중 (순매수금액 / 총거래대금)
161 ratio_str = "-"
162 try:
163 acml_tr_pbmn = float(item.get('acml_tr_pbmn', '0') or '0')
164 if acml_tr_pbmn > 0:
165 # divisor=100이면 raw_val은 백만원 단위 -> 원 단위로 변환
166 net_won = raw_val * 1_000_000 if divisor == 100 else raw_val
168 ratio = abs(net_won) / acml_tr_pbmn * 100
169 ratio_str = f"{ratio:.1f}%"
170 except:
171 pass
173 table += f"{rank:<2} {name_str} {rate_str:>7} {val_str:>8} {ratio_str:>6}\n"
174 else:
175 table += f"{rank:<2} {name_str} {rate_str:>7} {val_str:>8}\n"
177 table += "</pre>"
178 return header + table
180 async def send_ranking_report(self, rankings: Dict[str, List[Dict]], report_date: str):
181 """
182 다양한 랭킹 정보를 하나의 리포트로 묶어 텔레그램에 전송합니다.
183 rankings 딕셔너리는 'foreign_buy', 'all_stocks' 등의 키를 가집니다.
184 """
185 report_parts = []
187 # 1. 개별 랭킹
188 report_parts.append(self._format_ranking_table("외국인 순매수", rankings.get('foreign_buy', []), 'frgn_ntby_tr_pbmn', divisor=100))
189 report_parts.append(self._format_ranking_table("기관 순매수", rankings.get('inst_buy', []), 'orgn_ntby_tr_pbmn', divisor=100))
190 report_parts.append(self._format_ranking_table("프로그램 순매수", rankings.get('program_buy', []), 'whol_smtn_ntby_tr_pbmn', divisor=100_000_000))
191 report_parts.append(self._format_ranking_table("외국인 순매도", rankings.get('foreign_sell', []), 'frgn_ntby_tr_pbmn', divisor=100))
192 report_parts.append(self._format_ranking_table("기관 순매도", rankings.get('inst_sell', []), 'orgn_ntby_tr_pbmn', divisor=100))
193 report_parts.append(self._format_ranking_table("프로그램 순매도", rankings.get('program_sell', []), 'whol_smtn_ntby_tr_pbmn', divisor=100_000_000))
195 # 2. 조합 랭킹
196 all_stocks = rankings.get('all_stocks')
197 program_all_stocks = rankings.get('program_all_stocks')
199 if all_stocks:
200 # 외인+기관
201 fi_combined = []
202 for stock in all_stocks:
203 try:
204 f_net = float(stock.get('frgn_ntby_tr_pbmn', '0') or '0')
205 i_net = float(stock.get('orgn_ntby_tr_pbmn', '0') or '0')
206 new_stock = stock.copy()
207 new_stock['fi_combined_net'] = f_net + i_net
208 fi_combined.append(new_stock)
209 except (ValueError, TypeError): continue
211 report_parts.append(self._format_ranking_table("외인+기관 순매수", sorted(fi_combined, key=lambda x: x['fi_combined_net'], reverse=True), 'fi_combined_net', divisor=100))
212 report_parts.append(self._format_ranking_table("외인+기관 순매도", sorted(fi_combined, key=lambda x: x['fi_combined_net']), 'fi_combined_net', divisor=100))
214 # 외인+기관+프로그램
215 if program_all_stocks:
216 prog_map = {p['stck_shrn_iscd']: float(p.get('whol_smtn_ntby_tr_pbmn', '0') or '0') for p in program_all_stocks}
217 fip_combined = []
218 for stock in all_stocks:
219 try:
220 f_net = float(stock.get('frgn_ntby_tr_pbmn', '0') or '0')
221 i_net = float(stock.get('orgn_ntby_tr_pbmn', '0') or '0')
222 p_net = prog_map.get(stock.get('stck_shrn_iscd'), 0.0)
223 new_stock = stock.copy()
224 new_stock['fip_combined_net'] = f_net + i_net + (p_net / 1_000_000)
225 fip_combined.append(new_stock)
226 except (ValueError, TypeError): continue
228 report_parts.append(self._format_ranking_table("외인+기관+프로그램 순매수", sorted(fip_combined, key=lambda x: x['fip_combined_net'], reverse=True), 'fip_combined_net', divisor=100))
229 report_parts.append(self._format_ranking_table("외인+기관+프로그램 순매도", sorted(fip_combined, key=lambda x: x['fip_combined_net']), 'fip_combined_net', divisor=100))
231 # 3. 거래대금
232 report_parts.append(self._format_ranking_table("거래대금 상위", rankings.get('trading_value', []), 'acml_tr_pbmn', divisor=100_000_000, show_ratio=False))
234 # 메시지 분할 전송
235 title = f"🔔 <b>장 마감 랭킹 리포트 ({report_date})</b>\n"
236 await self._send_message(title)
238 current_message = ""
239 for part in filter(None, report_parts):
240 if len(current_message.encode('utf-8')) + len(part.encode('utf-8')) + 2 > 4096: 240 ↛ 241line 240 didn't jump to line 241 because the condition on line 240 was never true
241 await self._send_message(current_message)
242 current_message = part
243 else:
244 current_message += ("\n\n" + part) if current_message else part
246 if current_message: 246 ↛ exitline 246 didn't return from function 'send_ranking_report' because the condition on line 246 was always true
247 await self._send_message(current_message)