Files
ai-trade/modules/services/telegram.py

35 lines
1.2 KiB
Python

import requests
import os
import threading
from modules.config import Config
class TelegramMessenger:
    """Fire-and-forget sender for the Telegram Bot API ``sendMessage`` method.

    Credentials come from the constructor arguments or, when those are
    falsy, from ``Config.TELEGRAM_BOT_TOKEN`` / ``Config.TELEGRAM_CHAT_ID``.
    If either credential is missing a warning is printed once at
    construction time and every ``send_message`` call becomes a no-op.
    """

    def __init__(self, token=None, chat_id=None):
        """Store the bot credentials.

        Args:
            token: Bot token; falls back to ``Config.TELEGRAM_BOT_TOKEN``.
            chat_id: Target chat; falls back to ``Config.TELEGRAM_CHAT_ID``.
        """
        # Explicit arguments win; otherwise use the project configuration.
        self.token = token or Config.TELEGRAM_BOT_TOKEN
        self.chat_id = chat_id or Config.TELEGRAM_CHAT_ID
        if not self.token or not self.chat_id:
            print("⚠️ [Telegram] Token or Chat ID not found.")

    def _redact(self, text):
        """Return ``str(text)`` with the bot token masked.

        The token is part of the request URL, and HTTP client exceptions
        usually quote that URL, so error text must be scrubbed before it
        is printed.
        """
        text = str(text)
        if self.token:
            text = text.replace(str(self.token), "***")
        return text

    def send_message(self, message):
        """Send ``message`` on a background thread so the caller never blocks.

        The message is first sent with ``parse_mode="Markdown"``. If
        Telegram rejects it because the Markdown entities cannot be parsed,
        it is retried once as plain text instead of being dropped. Any
        remaining failure (non-2xx response or raised exception) is printed
        with the bot token masked; nothing is raised to the caller.

        Args:
            message: Text to deliver.

        Returns:
            None. Delivery happens asynchronously and is best-effort.
        """
        if not self.token or not self.chat_id:
            return

        def _send():
            url = f"https://api.telegram.org/bot{self.token}/sendMessage"
            payload = {
                "chat_id": self.chat_id,
                "text": message,
                "parse_mode": "Markdown",
            }
            try:
                response = requests.post(url, json=payload, timeout=5)
                # Unbalanced Markdown markers make Telegram answer 400
                # "can't parse entities"; resend without parse_mode so the
                # alert still arrives, just unformatted.
                if response.status_code == 400 and "parse entities" in response.text:
                    plain = {"chat_id": self.chat_id, "text": message}
                    response = requests.post(url, json=plain, timeout=5)
                if not response.ok:
                    print(
                        "⚠️ [Telegram] Error: "
                        f"HTTP {response.status_code} {self._redact(response.text)}"
                    )
            except Exception as e:
                # Best-effort boundary: a notification failure must never
                # take down the caller, so report it and move on.
                print(f"⚠️ [Telegram] Error: {self._redact(e)}")

        # Daemon thread: fire-and-forget, does not keep the process alive.
        threading.Thread(target=_send, daemon=True).start()