"""
텔레그램 봇 - Shared Memory IPC + 양방향 명령 채널
"""
import os
import asyncio
import logging
import subprocess
from telegram import Update
from telegram.ext import Application, CommandHandler, ContextTypes
# [디버깅] 파일 로깅 추가
log_file = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(__file__)))),
"telegram_bot.log")
file_handler = logging.FileHandler(log_file, encoding='utf-8')
file_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO,
handlers=[logging.StreamHandler(), file_handler]
)
logging.getLogger("httpx").setLevel(logging.WARNING)
class TelegramBotServer:
def __init__(self, bot_token, ipc=None, shutdown_event=None):
self.application = Application.builder()\
.token(bot_token)\
.concurrent_updates(True)\
.build()
self.bot_instance = None
self.ipc = ipc
self.shutdown_event = shutdown_event
self.is_shutting_down = False
self.should_restart = False
def set_bot_instance(self, bot):
self.bot_instance = bot
def refresh_bot_instance(self):
"""IPC에서 최신 봇 인스턴스 데이터 읽기"""
if self.ipc:
self.bot_instance = self.ipc.get_bot_instance_data()
else:
# fallback: 새 IPC 인스턴스 생성
from modules.utils.ipc import SharedIPC
ipc = SharedIPC()
self.bot_instance = ipc.get_bot_instance_data()
return self.bot_instance is not None
async def start_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
logging.info(f"[Command] /start from user {update.effective_user.id}")
await update.message.reply_text(
"AI Trading Bot Command Center\n"
"명령어 목록:\n"
"/status - 현재 봇 및 시장 상태 조회\n"
"/portfolio - 현재 보유 종목 및 평가액\n"
"/watchlist - 현재 감시 중인 종목 리스트\n"
"/update_watchlist - Watchlist 즉시 업데이트\n"
"/macro - 거시경제 지표 및 시장 위험도\n"
"/system - PC 리소스(CPU/GPU) 상태\n"
"/ai - AI 모델 학습 상태 조회\n"
"/evaluate - 즉시 성과 평가 보고서 생성\n\n"
"[관리 명령어]\n"
"/restart - 메인 봇 재시작 요청\n"
"/exec 명령어 - 원격 명령어 실행\n"
"/stop - 봇 종료",
parse_mode="HTML"
)
async def status_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
logging.info(f"[Command] /status from user {update.effective_user.id}")
if not self.refresh_bot_instance():
await update.message.reply_text("메인 봇이 실행 중이 아닙니다.")
return
from datetime import datetime
now = datetime.now()
is_market_open = (9 <= now.hour < 15) or (now.hour == 15 and now.minute < 30)
status_msg = "System Status: ONLINE\n"
status_msg += f"Market: {'OPEN' if is_market_open else 'CLOSED'}\n"
macro_warn = self.bot_instance.is_macro_warning_sent
status_msg += f"Macro Filter: {'DANGER (Trading Halted)' if macro_warn else 'SAFE'}\n"
await update.message.reply_text(status_msg, parse_mode="HTML")
async def portfolio_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
if not self.refresh_bot_instance():
await update.message.reply_text("봇 인스턴스가 연결되지 않았습니다.")
return
await update.message.reply_text("잔고를 조회 중입니다...")
try:
balance = self.bot_instance.kis.get_balance()
if "error" in balance:
await update.message.reply_text(f"잔고 조회 실패: {balance['error']}")
return
msg = f"Total Asset: {int(balance['total_eval']):,} KRW\n" \
f"Deposit: {int(balance['deposit']):,} KRW\n\n"
if balance['holdings']:
msg += "[Holdings]\n"
for stock in balance['holdings']:
yld = float(stock.get('yield', 0))
# 상승(빨강), 하락(파랑) 이모지 적용
if yld > 0:
icon = "🔴"
yld_str = f"+{yld}"
elif yld < 0:
icon = "🔵"
yld_str = f"{yld}"
else:
icon = "⚪"
yld_str = f"{yld}"
msg += f"{icon} {stock['name']}: {yld_str}%\n" \
f" (수량: {stock['qty']} / 손익: {stock['profit_loss']:,})\n"
else:
msg += "보유 중인 종목이 없습니다."
await update.message.reply_text(msg, parse_mode="HTML")
except Exception as e:
await update.message.reply_text(f"Error: {str(e)}")
async def watchlist_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
if not self.refresh_bot_instance():
await update.message.reply_text("봇 인스턴스가 연결되지 않았습니다.")
return
target_dict = self.bot_instance.load_watchlist()
discovered = self.bot_instance.discovered_stocks
msg = f"Watchlist: {len(target_dict)} items\n"
for code, name in target_dict.items():
themes = self.bot_instance.theme_manager.get_themes(code)
theme_str = f" ({', '.join(themes)})" if themes else ""
msg += f"• {name}{theme_str}\n"
if discovered:
msg += f"\nDiscovered Today ({len(discovered)}):\n"
for code in discovered:
msg += f"- {code}\n"
await update.message.reply_text(msg, parse_mode="HTML")
async def update_watchlist_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Watchlist 업데이트 - command queue를 통해 메인 봇에 요청"""
if self.ipc and self.ipc.send_command('update_watchlist'):
await update.message.reply_text("Watchlist 업데이트를 메인 봇에 요청했습니다.")
else:
# fallback: 직접 업데이트
await update.message.reply_text("Watchlist를 업데이트하고 있습니다... (30초 소요)")
try:
from modules.services.kis import KISClient
from watchlist_manager import WatchlistManager
from modules.config import Config
temp_kis = KISClient()
mgr = WatchlistManager(temp_kis, watchlist_file=Config.WATCHLIST_FILE)
summary = mgr.update_watchlist_daily()
summary = summary.replace("&", "&").replace("<", "<").replace(">", ">")
await update.message.reply_text(summary)
except Exception as e:
await update.message.reply_text(f"업데이트 실패: {e}")
async def macro_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
if not self.refresh_bot_instance():
await update.message.reply_text("메인 봇 연결 대기 중...")
return
await update.message.reply_text("거시경제 데이터를 불러옵니다...")
try:
indices = getattr(self.bot_instance.kis, '_macro_indices', {})
if not indices:
await update.message.reply_text("데이터가 아직 수집되지 않았습니다.")
return
msi = float(indices.get('MSI', 0))
if msi >= 50:
risk_status = "🔴 DANGER"
risk_desc = "시장 극도 불안정 - 매수 중단 권고"
elif msi >= 30:
risk_status = "🟡 CAUTION"
risk_desc = "시장 불안정 - 보수적 매매 권고"
else:
risk_status = "🟢 SAFE"
risk_desc = "시장 안정 - 정상 매매 가능"
from datetime import datetime
now_str = datetime.now().strftime("%m/%d %H:%M")
msg = f"거시경제 지표 {now_str}\n"
msg += f"━━━━━━━━━━━━━━━━━━\n"
msg += f"Market Risk: {risk_status}\n"
msg += f"{risk_desc}\n\n"
# MSI 상세
msi_bar = "█" * int(msi / 10) + "░" * (10 - int(msi / 10))
msg += f"Stress Index (MSI): {msi:.1f}/100\n"
msg += f"[{msi_bar}]\n\n"
# 지수 상세
index_order = ["KOSPI", "KOSDAQ", "KOSPI200"]
for k in index_order:
if k not in indices:
continue
v = indices[k]
price = float(v.get('price', 0))
change = float(v.get('change', 0))
change_val = float(v.get('change_val', 0))
high = float(v.get('high', 0))
low = float(v.get('low', 0))
prev_close = float(v.get('prev_close', 0))
volume = int(v.get('volume', 0))
if price == 0:
msg += f"⚫ {k}: 데이터 없음 (장 마감 후)\n\n"
continue
if change > 0:
icon = "🔴"
chg_str = f"+{change:.2f}% (+{change_val:.2f}pt)"
elif change < 0:
icon = "🔵"
chg_str = f"{change:.2f}% ({change_val:.2f}pt)"
else:
icon = "⚪"
chg_str = f"{change:.2f}%"
msg += f"{icon} {k}: {price:,.2f} {chg_str}\n"
if high and low:
msg += f" 고: {high:,.2f} 저: {low:,.2f}"
if prev_close:
msg += f" 전일종가: {prev_close:,.2f}"
msg += "\n"
if volume:
msg += f" 거래량: {volume:,}천주\n"
msg += "\n"
await update.message.reply_text(msg, parse_mode="HTML")
except Exception as e:
await update.message.reply_text(f"Error: {e}")
async def system_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
if not self.refresh_bot_instance():
await update.message.reply_text("메인 봇이 실행 중이 아닙니다.")
return
import psutil
# non-blocking CPU 측정
cpu = psutil.cpu_percent(interval=0)
ram = psutil.virtual_memory().percent
top_processes = []
for proc in psutil.process_iter(['pid', 'name', 'cpu_percent']):
try:
proc_info = proc.info
if proc_info['name'] == 'System Idle Process':
continue
top_processes.append(proc_info)
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
pass
top_processes.sort(key=lambda x: x.get('cpu_percent', 0), reverse=True)
top_3 = top_processes[:3]
gpu_status = self.bot_instance.ollama_monitor.get_gpu_status()
gpu_msg = "N/A"
if gpu_status and gpu_status.get('name') != 'N/A':
gpu_name = gpu_status.get('name', 'GPU')
gpu_msg = f"{gpu_name}\n Temp: {gpu_status.get('temp', 0)}C / " \
f"VRAM: {gpu_status.get('vram_used', 0)}GB / {gpu_status.get('vram_total', 0)}GB"
msg = "PC System Status\n" \
f"CPU: {cpu}%\n" \
f"RAM: {ram}%\n" \
f"GPU: {gpu_msg}\n\n"
if top_3:
msg += "Top CPU Processes:\n"
for i, proc in enumerate(top_3, 1):
proc_name = proc.get('name', 'Unknown')
proc_cpu = proc.get('cpu_percent', 0)
msg += f" {i}. {proc_name} - {proc_cpu:.1f}%\n"
await update.message.reply_text(msg, parse_mode="HTML")
async def ai_status_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
if not self.refresh_bot_instance():
await update.message.reply_text("메인 봇이 실행 중이 아닙니다.")
return
from modules.config import Config
gpu = self.bot_instance.ollama_monitor.get_gpu_status()
msg = "AI Model Status\n"
msg += f"* LLM Engine: Ollama ({Config.OLLAMA_MODEL})\n"
msg += f"* Device: {gpu.get('name', 'GPU')}\n"
if gpu:
msg += f"* GPU Load: {gpu.get('load', 0)}%\n"
msg += f"* VRAM Usage: {gpu.get('vram_used', 0)}GB / {gpu.get('vram_total', 0)}GB"
await update.message.reply_text(msg, parse_mode="HTML")
async def restart_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""/restart: 메인 봇에 재시작 명령 전달"""
if self.ipc and self.ipc.send_command('restart'):
await update.message.reply_text(
"메인 봇에 재시작 요청을 전송했습니다.", parse_mode="HTML")
else:
# IPC 명령 실패 시 텔레그램 봇만 재시작
await update.message.reply_text(
"텔레그램 인터페이스를 재시작합니다...", parse_mode="HTML")
self.should_restart = True
self.application.stop_running()
async def stop_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
await update.message.reply_text(
"텔레그램 봇을 종료합니다.", parse_mode="HTML")
self.should_restart = False
self.application.stop_running()
async def exec_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
text = update.message.text.strip()
parts = text.split(maxsplit=1)
if len(parts) < 2:
await update.message.reply_text("사용법: /exec 명령어")
return
command = parts[1]
await update.message.reply_text(f"실행 중: {command}", parse_mode="HTML")
try:
dangerous_keywords = ['rm', 'del', 'format', 'shutdown', 'reboot']
if any(keyword in command.lower() for keyword in dangerous_keywords):
await update.message.reply_text("위험한 명령어는 실행할 수 없습니다.")
return
import platform
is_windows = platform.system() == 'Windows'
if is_windows:
exec_cmd = ['powershell', '-Command', command]
else:
exec_cmd = command
def run_subprocess():
return subprocess.run(
exec_cmd,
shell=not is_windows,
capture_output=True,
text=True,
encoding='utf-8',
errors='replace',
timeout=30,
cwd=os.getcwd()
)
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(None, run_subprocess)
output = result.stdout.strip() if result.stdout else ""
error_output = result.stderr.strip() if result.stderr else ""
if output and error_output:
combined = f"[STDOUT]\n{output}\n\n[STDERR]\n{error_output}"
elif output:
combined = output
elif error_output:
combined = f"[ERROR]\n{error_output}"
else:
combined = "명령어 실행 완료 (출력 없음)"
if len(combined) > 3000:
combined = combined[:3000] + "\n... (Truncated)"
combined = combined.replace("&", "&").replace("<", "<").replace(">", ">")
await update.message.reply_text(f"
{combined}", parse_mode="HTML")
except asyncio.TimeoutError:
await update.message.reply_text("명령어 실행 시간 초과 (30초)")
except Exception as e:
await update.message.reply_text(f"실행 오류: {e}")
async def evaluate_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""/evaluate: 즉시 성과 평가 보고서 생성 (LLM 분석 포함)"""
await update.message.reply_text(
"📊 성과 평가를 실행합니다...\n"
"LLM 전문가 패널 분석 포함 시 30초~1분 소요됩니다.",
parse_mode="HTML"
)
try:
from modules.utils.performance_db import PerformanceDB
from modules.analysis.evaluator import PerformanceEvaluator
evaluator = PerformanceEvaluator()
loop = asyncio.get_running_loop()
report = await loop.run_in_executor(None, evaluator.generate_weekly_report)
if len(report) > 4000:
report = report[:4000] + "\n... (일부 생략)"
await update.message.reply_text(report, parse_mode="HTML")
except Exception as e:
logging.error(f"[Command] /evaluate error: {e}")
await update.message.reply_text(f"평가 오류: {e}")
def run(self):
handlers = [
("start", self.start_command),
("status", self.status_command),
("portfolio", self.portfolio_command),
("watchlist", self.watchlist_command),
("update_watchlist", self.update_watchlist_command),
("macro", self.macro_command),
("system", self.system_command),
("ai", self.ai_status_command),
("evaluate", self.evaluate_command),
("restart", self.restart_command),
("stop", self.stop_command),
("exec", self.exec_command)
]
for cmd, func in handlers:
self.application.add_handler(CommandHandler(cmd, func))
async def error_handler(update: object, context: ContextTypes.DEFAULT_TYPE) -> None:
if "Conflict" in str(context.error):
print(f"[Telegram] Conflict detected. Stopping...")
if self.application.running:
await self.application.stop()
return
print(f"[Telegram Error] {context.error}")
self.application.add_error_handler(error_handler)
logging.info("[Telegram] Command Server Started (Shared Memory IPC Mode).")
print("[Telegram] Command Server Started (Shared Memory IPC Mode).")
try:
self.application.run_polling(
allowed_updates=Update.ALL_TYPES,
drop_pending_updates=True
)
except Exception as e:
print(f"[Telegram] Polling Error: {e}")