Coverage for services / telegram_notifier.py: 95%

159 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-04 15:08 +0000

1import aiohttp 

2import logging 

3from typing import Optional, List, Dict 

4from services.notification_service import NotificationEvent, NotificationCategory, NotificationLevel 

5import unicodedata 

6 

7logger = logging.getLogger(__name__) 

8 

9class TelegramNotifier: 

10 """Telegram 알림을 비동기적으로 전송하는 핸들러 클래스입니다.""" 

11 

12 def __init__(self, bot_token: str, chat_id: str): 

13 self.bot_token = bot_token 

14 self.chat_id = chat_id 

15 self.api_url = f"https://api.telegram.org/bot{self.bot_token}/sendMessage" 

16 # 허용할 카테고리 목록 설정 (기본값: STRATEGY, BACKGROUND) 

17 self.allowed_categories = [NotificationCategory.STRATEGY, NotificationCategory.BACKGROUND] 

18 

19 async def handle_event(self, event: NotificationEvent) -> None: 

20 """NotificationService에서 호출할 비동기 콜백 메서드입니다.""" 

21 # ★ 필터링 로직: 허용된 카테고리가 설정되어 있고, 현재 이벤트 카테고리가 거기에 없으면 무시 

22 if self.allowed_categories is not None and event.category not in self.allowed_categories: 

23 return 

24 

25 # 특정 레벨(예: info, warning, error, critical)에 따라 이모지나 포맷을 다르게 할 수 있습니다. 

26 level_emoji = { 

27 "info": "ℹ️", 

28 "warning": "⚠️", 

29 "error": "❌", 

30 "critical": "🚨" 

31 }.get(event.level.value.lower(), "🔔") 

32 

33 # 메타데이터에 수익률(return_rate) 정보가 있으면 메시지 본문에 이모지와 함께 삽입 

34 message_body = event.message 

35 if event.metadata and "return_rate" in event.metadata: 

36 rr = event.metadata.get("return_rate") 

37 if rr is not None: 37 ↛ 51line 37 didn't jump to line 51 because the condition on line 37 was always true

38 if rr > 0: 

39 rt_emoji = "📈" 

40 elif rr < 0: 

41 rt_emoji = "📉" 

42 else: 

43 rt_emoji = "➖" 

44 

45 if "사유:" in message_body: 

46 message_body = message_body.replace("사유:", f"{rt_emoji} 수익: {rr:+.2f}%\n사유:") 

47 else: 

48 message_body += f"\n{rt_emoji} 수익: {rr:+.2f}%" 

49 

50 # 텔레그램으로 보낼 메시지 포맷 구성 

51 text = ( 

52 f"{level_emoji} <b>[{event.category.value}] {event.title}</b>\n" 

53 f"시간: {event.timestamp}\n" 

54 f"내용:\n{message_body}" 

55 ) 

56 

57 payload = { 

58 "chat_id": self.chat_id, 

59 "text": text, 

60 "parse_mode": "HTML" 

61 } 

62 

63 # 비동기 HTTP 요청으로 Telegram API 호출 

64 try: 

65 async with aiohttp.ClientSession() as session: 

66 async with session.post(self.api_url, json=payload) as response: 

67 if response.status != 200: 

68 response_text = await response.text() 

69 logger.error(f"Telegram 알림 전송 실패: {response.status} - {response_text}") 

70 except Exception as e: 

71 logger.error(f"Telegram 알림 전송 중 예외 발생: {e}") 

72 

73 

74class TelegramReporter: 

75 """텔레그램으로 정형화된 리포트(랭킹 등)를 전송하는 클래스입니다.""" 

76 

77 def __init__(self, bot_token: str, chat_id: str): 

78 self.bot_token = bot_token 

79 self.chat_id = chat_id 

80 self.api_url = f"https://api.telegram.org/bot{self.bot_token}/sendMessage" 

81 

82 async def _send_message(self, text: str) -> bool: 

83 """텔레그램으로 메시지를 비동기적으로 전송하는 헬퍼 메서드입니다.""" 

84 if not text: 

85 return True 

86 

87 payload = { 

88 "chat_id": self.chat_id, 

89 "text": text, 

90 "parse_mode": "HTML", 

91 "disable_web_page_preview": True, 

92 } 

93 try: 

94 async with aiohttp.ClientSession() as session: 

95 async with session.post(self.api_url, json=payload) as response: 

96 if response.status != 200: 

97 response_text = await response.text() 

98 logger.error(f"Telegram 리포트 전송 실패: {response.status} - {response_text}") 

99 return False 

100 return True 

101 except Exception as e: 

102 logger.error(f"Telegram 리포트 전송 중 예외 발생: {e}") 

103 return False 

104 

105 def _format_ranking_table(self, title: str, ranking_data: List[Dict], value_field: str, limit: int = 10, divisor: float = 100_000_000, show_ratio: bool = True) -> str: 

106 """랭킹 데이터를 HTML 포맷의 문자열로 변환합니다.""" 

107 if not ranking_data: 

108 return "" 

109 

110 header = f"<b>🏆 {title} (Top {limit})</b>\n" 

111 table = "<pre>" 

112 if show_ratio: 112 ↛ 116line 112 didn't jump to line 116 because the condition on line 112 was always true

113 table += "순 종목 등락 금액(억) 비중\n" 

114 table += "-" * 38 + "\n" 

115 else: 

116 table += "순 종목 등락 금액(억)\n" 

117 table += "-" * 31 + "\n" 

118 

119 for i, item in enumerate(ranking_data[:limit]): 

120 rank = i + 1 

121 name = item.get('hts_kor_isnm', 'N/A') 

122 

123 # 1. 종목명 자르기 (최대 8칸) 

124 display_name = "" 

125 display_width = 0 

126 for char in name: 

127 char_width = 2 if unicodedata.east_asian_width(char) in ('F', 'W', 'A') else 1 

128 if display_width + char_width > 8: 

129 break 

130 display_name += char 

131 display_width += char_width 

132 

133 name_padding = 8 - display_width 

134 name_str = f"{display_name}{' ' * name_padding}" 

135 

136 # 2. 등락률 

137 try: 

138 rate = float(item.get('prdy_ctrt', '0') or '0') 

139 if rate > 0: 

140 rate_str = f"+{rate:.1f}%" 

141 elif rate < 0: 

142 rate_str = f"{rate:.1f}%" 

143 else: 

144 rate_str = "0.0%" 

145 except: 

146 rate_str = "-" 

147 

148 # 3. 금액 (억) 

149 value_str = item.get(value_field, "0") 

150 raw_val = 0 

151 try: 

152 # 금액(원)을 억 단위로 변환 

153 raw_val = float(value_str or "0") 

154 val_100m = raw_val / divisor 

155 val_str = f"{val_100m:,.0f}" 

156 except (ValueError, TypeError): 

157 val_str = "-" 

158 

159 if show_ratio: 159 ↛ 175line 159 didn't jump to line 175 because the condition on line 159 was always true

160 # 4. 비중 (순매수금액 / 총거래대금) 

161 ratio_str = "-" 

162 try: 

163 acml_tr_pbmn = float(item.get('acml_tr_pbmn', '0') or '0') 

164 if acml_tr_pbmn > 0: 

165 # divisor=100이면 raw_val은 백만원 단위 -> 원 단위로 변환 

166 net_won = raw_val * 1_000_000 if divisor == 100 else raw_val 

167 

168 ratio = abs(net_won) / acml_tr_pbmn * 100 

169 ratio_str = f"{ratio:.1f}%" 

170 except: 

171 pass 

172 

173 table += f"{rank:<2} {name_str} {rate_str:>7} {val_str:>8} {ratio_str:>6}\n" 

174 else: 

175 table += f"{rank:<2} {name_str} {rate_str:>7} {val_str:>8}\n" 

176 

177 table += "</pre>" 

178 return header + table 

179 

180 async def send_ranking_report(self, rankings: Dict[str, List[Dict]], report_date: str): 

181 """ 

182 다양한 랭킹 정보를 하나의 리포트로 묶어 텔레그램에 전송합니다. 

183 rankings 딕셔너리는 'foreign_buy', 'all_stocks' 등의 키를 가집니다. 

184 """ 

185 report_parts = [] 

186 

187 # 1. 개별 랭킹 

188 report_parts.append(self._format_ranking_table("외국인 순매수", rankings.get('foreign_buy', []), 'frgn_ntby_tr_pbmn', divisor=100)) 

189 report_parts.append(self._format_ranking_table("기관 순매수", rankings.get('inst_buy', []), 'orgn_ntby_tr_pbmn', divisor=100)) 

190 report_parts.append(self._format_ranking_table("프로그램 순매수", rankings.get('program_buy', []), 'whol_smtn_ntby_tr_pbmn', divisor=100_000_000)) 

191 report_parts.append(self._format_ranking_table("외국인 순매도", rankings.get('foreign_sell', []), 'frgn_ntby_tr_pbmn', divisor=100)) 

192 report_parts.append(self._format_ranking_table("기관 순매도", rankings.get('inst_sell', []), 'orgn_ntby_tr_pbmn', divisor=100)) 

193 report_parts.append(self._format_ranking_table("프로그램 순매도", rankings.get('program_sell', []), 'whol_smtn_ntby_tr_pbmn', divisor=100_000_000)) 

194 

195 # 2. 조합 랭킹 

196 all_stocks = rankings.get('all_stocks') 

197 program_all_stocks = rankings.get('program_all_stocks') 

198 

199 if all_stocks: 

200 # 외인+기관 

201 fi_combined = [] 

202 for stock in all_stocks: 

203 try: 

204 f_net = float(stock.get('frgn_ntby_tr_pbmn', '0') or '0') 

205 i_net = float(stock.get('orgn_ntby_tr_pbmn', '0') or '0') 

206 new_stock = stock.copy() 

207 new_stock['fi_combined_net'] = f_net + i_net 

208 fi_combined.append(new_stock) 

209 except (ValueError, TypeError): continue 

210 

211 report_parts.append(self._format_ranking_table("외인+기관 순매수", sorted(fi_combined, key=lambda x: x['fi_combined_net'], reverse=True), 'fi_combined_net', divisor=100)) 

212 report_parts.append(self._format_ranking_table("외인+기관 순매도", sorted(fi_combined, key=lambda x: x['fi_combined_net']), 'fi_combined_net', divisor=100)) 

213 

214 # 외인+기관+프로그램 

215 if program_all_stocks: 

216 prog_map = {p['stck_shrn_iscd']: float(p.get('whol_smtn_ntby_tr_pbmn', '0') or '0') for p in program_all_stocks} 

217 fip_combined = [] 

218 for stock in all_stocks: 

219 try: 

220 f_net = float(stock.get('frgn_ntby_tr_pbmn', '0') or '0') 

221 i_net = float(stock.get('orgn_ntby_tr_pbmn', '0') or '0') 

222 p_net = prog_map.get(stock.get('stck_shrn_iscd'), 0.0) 

223 new_stock = stock.copy() 

224 new_stock['fip_combined_net'] = f_net + i_net + (p_net / 1_000_000) 

225 fip_combined.append(new_stock) 

226 except (ValueError, TypeError): continue 

227 

228 report_parts.append(self._format_ranking_table("외인+기관+프로그램 순매수", sorted(fip_combined, key=lambda x: x['fip_combined_net'], reverse=True), 'fip_combined_net', divisor=100)) 

229 report_parts.append(self._format_ranking_table("외인+기관+프로그램 순매도", sorted(fip_combined, key=lambda x: x['fip_combined_net']), 'fip_combined_net', divisor=100)) 

230 

231 # 3. 거래대금 

232 report_parts.append(self._format_ranking_table("거래대금 상위", rankings.get('trading_value', []), 'acml_tr_pbmn', divisor=100_000_000, show_ratio=False)) 

233 

234 # 메시지 분할 전송 

235 title = f"🔔 <b>장 마감 랭킹 리포트 ({report_date})</b>\n" 

236 await self._send_message(title) 

237 

238 current_message = "" 

239 for part in filter(None, report_parts): 

240 if len(current_message.encode('utf-8')) + len(part.encode('utf-8')) + 2 > 4096: 240 ↛ 241line 240 didn't jump to line 241 because the condition on line 240 was never true

241 await self._send_message(current_message) 

242 current_message = part 

243 else: 

244 current_message += ("\n\n" + part) if current_message else part 

245 

246 if current_message: 246 ↛ exitline 246 didn't return from function 'send_ranking_report' because the condition on line 246 was always true

247 await self._send_message(current_message)