""" 텔레그램 봇 - Shared Memory IPC + 양방향 명령 채널 """ import os import asyncio import logging import subprocess from telegram import Update from telegram.ext import Application, CommandHandler, ContextTypes # [디버깅] 파일 로깅 추가 log_file = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(__file__)))), "telegram_bot.log") file_handler = logging.FileHandler(log_file, encoding='utf-8') file_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')) logging.basicConfig( format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO, handlers=[logging.StreamHandler(), file_handler] ) logging.getLogger("httpx").setLevel(logging.WARNING) class TelegramBotServer: def __init__(self, bot_token, ipc=None, shutdown_event=None): self.application = Application.builder()\ .token(bot_token)\ .concurrent_updates(True)\ .build() self.bot_instance = None self.ipc = ipc self.shutdown_event = shutdown_event self.is_shutting_down = False self.should_restart = False def set_bot_instance(self, bot): self.bot_instance = bot def refresh_bot_instance(self): """IPC에서 최신 봇 인스턴스 데이터 읽기""" if self.ipc: self.bot_instance = self.ipc.get_bot_instance_data() else: # fallback: 새 IPC 인스턴스 생성 from modules.utils.ipc import SharedIPC ipc = SharedIPC() self.bot_instance = ipc.get_bot_instance_data() return self.bot_instance is not None async def start_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE): logging.info(f"[Command] /start from user {update.effective_user.id}") await update.message.reply_text( "AI Trading Bot Command Center\n" "명령어 목록:\n" "/status - 현재 봇 및 시장 상태 조회\n" "/portfolio - 현재 보유 종목 및 평가액\n" "/watchlist - 현재 감시 중인 종목 리스트\n" "/update_watchlist - Watchlist 즉시 업데이트\n" "/macro - 거시경제 지표 및 시장 위험도\n" "/system - PC 리소스(CPU/GPU) 상태\n" "/ai - AI 모델 학습 상태 조회\n" "/evaluate - 즉시 성과 평가 보고서 생성\n\n" "[AI 진단 스킬]\n" "/syshealth - 시스템 종합 건강 진단\n" "/risk - 리스크 대시보드 (MDD, 연속손절)\n" "/regime - 코스피 시장 레짐 감지\n" "/model_health - LSTM 모델 건강 체크\n" "/weights - 앙상블 가중치 분석\n" "/postmortem [일수] - 매매 사후 분석 (기본 30일)\n" "/watchlist_check - 감시 종목 스코어링\n\n" "[관리 명령어]\n" "/restart - 메인 봇 재시작 요청\n" "/exec 명령어 - 원격 명령어 실행\n" "/stop - 봇 종료", parse_mode="HTML" ) async def status_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE): logging.info(f"[Command] /status from user {update.effective_user.id}") if not self.refresh_bot_instance(): await update.message.reply_text("메인 봇이 실행 중이 아닙니다.") return from datetime import datetime now = datetime.now() is_market_open = (9 <= now.hour < 15) or (now.hour == 15 and now.minute < 30) status_msg = "System Status: ONLINE\n" status_msg += f"Market: {'OPEN' if is_market_open else 'CLOSED'}\n" macro_warn = self.bot_instance.is_macro_warning_sent status_msg += f"Macro Filter: {'DANGER (Trading Halted)' if macro_warn else 'SAFE'}\n" await update.message.reply_text(status_msg, parse_mode="HTML") async def portfolio_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE): if not self.refresh_bot_instance(): await update.message.reply_text("봇 인스턴스가 연결되지 않았습니다.") return await update.message.reply_text("잔고를 조회 중입니다...") try: balance = self.bot_instance.kis.get_balance() if "error" in balance: await update.message.reply_text(f"잔고 조회 실패: {balance['error']}") return msg = f"Total Asset: {int(balance['total_eval']):,} KRW\n" \ f"Deposit: {int(balance['deposit']):,} KRW\n\n" if balance['holdings']: msg += "[Holdings]\n" for stock in balance['holdings']: yld = float(stock.get('yield', 0)) # 상승(빨강), 하락(파랑) 이모지 적용 if yld > 0: icon = "🔴" yld_str = f"+{yld}" elif yld < 0: icon = "🔵" yld_str = f"{yld}" else: icon = "⚪" yld_str = f"{yld}" msg += f"{icon} {stock['name']}: {yld_str}%\n" \ f" (수량: {stock['qty']} / 손익: {stock['profit_loss']:,})\n" else: msg += "보유 중인 종목이 없습니다." await update.message.reply_text(msg, parse_mode="HTML") except Exception as e: await update.message.reply_text(f"Error: {str(e)}") async def watchlist_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE): if not self.refresh_bot_instance(): await update.message.reply_text("봇 인스턴스가 연결되지 않았습니다.") return target_dict = self.bot_instance.load_watchlist() discovered = self.bot_instance.discovered_stocks msg = f"Watchlist: {len(target_dict)} items\n" for code, name in target_dict.items(): themes = self.bot_instance.theme_manager.get_themes(code) theme_str = f" ({', '.join(themes)})" if themes else "" msg += f"• {name}{theme_str}\n" if discovered: msg += f"\nDiscovered Today ({len(discovered)}):\n" for code in discovered: msg += f"- {code}\n" await update.message.reply_text(msg, parse_mode="HTML") async def update_watchlist_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """Watchlist 업데이트 - command queue를 통해 메인 봇에 요청""" if self.ipc and self.ipc.send_command('update_watchlist'): await update.message.reply_text("Watchlist 업데이트를 메인 봇에 요청했습니다.") else: # fallback: 직접 업데이트 await update.message.reply_text("Watchlist를 업데이트하고 있습니다... (30초 소요)") try: from modules.services.kis import KISClient from watchlist_manager import WatchlistManager from modules.config import Config temp_kis = KISClient() mgr = WatchlistManager(temp_kis, watchlist_file=Config.WATCHLIST_FILE) summary = mgr.update_watchlist_daily() summary = summary.replace("&", "&").replace("<", "<").replace(">", ">") await update.message.reply_text(summary) except Exception as e: await update.message.reply_text(f"업데이트 실패: {e}") async def macro_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE): if not self.refresh_bot_instance(): await update.message.reply_text("메인 봇 연결 대기 중...") return await update.message.reply_text("거시경제 데이터를 불러옵니다...") try: indices = getattr(self.bot_instance.kis, '_macro_indices', {}) if not indices: await update.message.reply_text("데이터가 아직 수집되지 않았습니다.") return msi = float(indices.get('MSI', 0)) if msi >= 50: risk_status = "🔴 DANGER" risk_desc = "시장 극도 불안정 - 매수 중단 권고" elif msi >= 30: risk_status = "🟡 CAUTION" risk_desc = "시장 불안정 - 보수적 매매 권고" else: risk_status = "🟢 SAFE" risk_desc = "시장 안정 - 정상 매매 가능" from datetime import datetime now_str = datetime.now().strftime("%m/%d %H:%M") msg = f"거시경제 지표 {now_str}\n" msg += f"━━━━━━━━━━━━━━━━━━\n" msg += f"Market Risk: {risk_status}\n" msg += f"{risk_desc}\n\n" # MSI 상세 msi_bar = "█" * int(msi / 10) + "░" * (10 - int(msi / 10)) msg += f"Stress Index (MSI): {msi:.1f}/100\n" msg += f"[{msi_bar}]\n\n" # 지수 상세 index_order = ["KOSPI", "KOSDAQ", "KOSPI200"] for k in index_order: if k not in indices: continue v = indices[k] price = float(v.get('price', 0)) change = float(v.get('change', 0)) change_val = float(v.get('change_val', 0)) high = float(v.get('high', 0)) low = float(v.get('low', 0)) prev_close = float(v.get('prev_close', 0)) volume = int(v.get('volume', 0)) if price == 0: # 장 마감 후: prev_close(전일 종가)라도 표시 if prev_close > 0: msg += f"⚫ {k}: {prev_close:,.2f} (전일 종가 기준, 장 마감)\n\n" else: msg += f"⚫ {k}: 데이터 없음 (장 마감 후)\n\n" continue if change > 0: icon = "🔴" chg_str = f"+{change:.2f}% (+{change_val:.2f}pt)" elif change < 0: icon = "🔵" chg_str = f"{change:.2f}% ({change_val:.2f}pt)" else: icon = "⚪" chg_str = f"{change:.2f}%" msg += f"{icon} {k}: {price:,.2f} {chg_str}\n" if high and low: msg += f" 고: {high:,.2f} 저: {low:,.2f}" if prev_close: msg += f" 전일종가: {prev_close:,.2f}" msg += "\n" if volume: msg += f" 거래량: {volume:,}천주\n" msg += "\n" await update.message.reply_text(msg, parse_mode="HTML") except Exception as e: await update.message.reply_text(f"Error: {e}") async def system_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE): if not self.refresh_bot_instance(): await update.message.reply_text("메인 봇이 실행 중이 아닙니다.") return import psutil # non-blocking CPU 측정 cpu = psutil.cpu_percent(interval=0) ram = psutil.virtual_memory().percent top_processes = [] for proc in psutil.process_iter(['pid', 'name', 'cpu_percent']): try: proc_info = proc.info if proc_info['name'] == 'System Idle Process': continue top_processes.append(proc_info) except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess): pass top_processes.sort(key=lambda x: x.get('cpu_percent', 0), reverse=True) top_3 = top_processes[:3] gpu_status = self.bot_instance.ollama_monitor.get_gpu_status() gpu_msg = "N/A" if gpu_status and gpu_status.get('name') != 'N/A': gpu_name = gpu_status.get('name', 'GPU') gpu_msg = f"{gpu_name}\n Temp: {gpu_status.get('temp', 0)}C / " \ f"VRAM: {gpu_status.get('vram_used', 0)}GB / {gpu_status.get('vram_total', 0)}GB" msg = "PC System Status\n" \ f"CPU: {cpu}%\n" \ f"RAM: {ram}%\n" \ f"GPU: {gpu_msg}\n\n" if top_3: msg += "Top CPU Processes:\n" for i, proc in enumerate(top_3, 1): proc_name = proc.get('name', 'Unknown') proc_cpu = proc.get('cpu_percent', 0) msg += f" {i}. {proc_name} - {proc_cpu:.1f}%\n" await update.message.reply_text(msg, parse_mode="HTML") async def ai_status_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE): if not self.refresh_bot_instance(): await update.message.reply_text("메인 봇이 실행 중이 아닙니다.") return from modules.config import Config gpu = self.bot_instance.ollama_monitor.get_gpu_status() if Config.GEMINI_API_KEY: llm_primary = f"Gemini ({Config.GEMINI_MODEL})" llm_fallback = f"Ollama ({Config.OLLAMA_MODEL})" else: llm_primary = f"Ollama ({Config.OLLAMA_MODEL})" llm_fallback = None msg = "AI Model Status\n" msg += f"* LLM Engine: {llm_primary}\n" if llm_fallback: msg += f"* Fallback: {llm_fallback}\n" msg += f"* LSTM Device: {gpu.get('name', 'GPU')}\n" if gpu: msg += f"* GPU Load: {gpu.get('load', 0)}%\n" msg += f"* VRAM Usage: {gpu.get('vram_used', 0)}GB / {gpu.get('vram_total', 0)}GB" await update.message.reply_text(msg, parse_mode="HTML") async def restart_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """/restart: 메인 봇에 재시작 명령 전달""" if self.ipc and self.ipc.send_command('restart'): await update.message.reply_text( "메인 봇에 재시작 요청을 전송했습니다.", parse_mode="HTML") else: # IPC 명령 실패 시 텔레그램 봇만 재시작 await update.message.reply_text( "텔레그램 인터페이스를 재시작합니다...", parse_mode="HTML") self.should_restart = True self.application.stop_running() async def stop_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE): await update.message.reply_text( "텔레그램 봇을 종료합니다.", parse_mode="HTML") self.should_restart = False self.application.stop_running() async def exec_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE): text = update.message.text.strip() parts = text.split(maxsplit=1) if len(parts) < 2: await update.message.reply_text("사용법: /exec 명령어") return command = parts[1] await update.message.reply_text(f"실행 중: {command}", parse_mode="HTML") try: dangerous_keywords = ['rm', 'del', 'format', 'shutdown', 'reboot'] if any(keyword in command.lower() for keyword in dangerous_keywords): await update.message.reply_text("위험한 명령어는 실행할 수 없습니다.") return import platform is_windows = platform.system() == 'Windows' if is_windows: exec_cmd = ['powershell', '-Command', command] else: exec_cmd = command def run_subprocess(): return subprocess.run( exec_cmd, shell=not is_windows, capture_output=True, text=True, encoding='utf-8', errors='replace', timeout=30, cwd=os.getcwd() ) loop = asyncio.get_running_loop() result = await loop.run_in_executor(None, run_subprocess) output = result.stdout.strip() if result.stdout else "" error_output = result.stderr.strip() if result.stderr else "" if output and error_output: combined = f"[STDOUT]\n{output}\n\n[STDERR]\n{error_output}" elif output: combined = output elif error_output: combined = f"[ERROR]\n{error_output}" else: combined = "명령어 실행 완료 (출력 없음)" if len(combined) > 3000: combined = combined[:3000] + "\n... (Truncated)" combined = combined.replace("&", "&").replace("<", "<").replace(">", ">") await update.message.reply_text(f"
{combined}
", parse_mode="HTML") except asyncio.TimeoutError: await update.message.reply_text("명령어 실행 시간 초과 (30초)") except Exception as e: await update.message.reply_text(f"실행 오류: {e}") async def evaluate_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """/evaluate: 즉시 성과 평가 보고서 생성 (LLM 분석 포함)""" await update.message.reply_text( "📊 성과 평가를 실행합니다...\n" "LLM 전문가 패널 분석 포함 시 30초~1분 소요됩니다.", parse_mode="HTML" ) try: from modules.utils.performance_db import PerformanceDB from modules.analysis.evaluator import PerformanceEvaluator evaluator = PerformanceEvaluator() loop = asyncio.get_running_loop() report = await loop.run_in_executor(None, evaluator.generate_weekly_report) if len(report) > 4000: report = report[:4000] + "\n... (일부 생략)" await update.message.reply_text(report, parse_mode="HTML") except Exception as e: logging.error(f"[Command] /evaluate error: {e}") await update.message.reply_text(f"평가 오류: {e}") # ────────────────────────────────────────────── # AI 진단 스킬 명령어 (skill_runner 기반) # ────────────────────────────────────────────── async def syshealth_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """/syshealth: 시스템 종합 건강 진단""" await update.message.reply_text("🔍 시스템 건강 진단 중... (최대 30초 소요)", parse_mode="HTML") try: from modules.services.telegram_bot import skill_runner result = await skill_runner.run_syshealth() for chunk in result: await update.message.reply_text(chunk, parse_mode="HTML") except Exception as e: logging.error(f"[Command] /syshealth error: {e}") await update.message.reply_text(f"진단 오류: {e}") async def risk_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """/risk: 리스크 대시보드 (MDD, 연속손절, 포지션 집중도)""" await update.message.reply_text("📊 리스크 데이터 분석 중...", parse_mode="HTML") try: from modules.services.telegram_bot import skill_runner result = await skill_runner.run_risk() for chunk in result: await update.message.reply_text(chunk, parse_mode="HTML") except Exception as e: logging.error(f"[Command] /risk error: {e}") await update.message.reply_text(f"리스크 분석 오류: {e}") async def regime_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """/regime: 코스피 시장 레짐 감지""" await update.message.reply_text("📈 시장 레짐 분석 중...", parse_mode="HTML") try: from modules.services.telegram_bot import skill_runner result = await skill_runner.run_regime() for chunk in result: await update.message.reply_text(chunk, parse_mode="HTML") except Exception as e: logging.error(f"[Command] /regime error: {e}") await update.message.reply_text(f"레짐 분석 오류: {e}") async def model_health_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """/model_health: LSTM 모델 건강 체크""" await update.message.reply_text("🧠 LSTM 모델 체크포인트 스캔 중...", parse_mode="HTML") try: from modules.services.telegram_bot import skill_runner result = await skill_runner.run_model_health() for chunk in result: await update.message.reply_text(chunk, parse_mode="HTML") except Exception as e: logging.error(f"[Command] /model_health error: {e}") await update.message.reply_text(f"모델 건강 체크 오류: {e}") async def weights_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """/weights: 앙상블 가중치 분석""" await update.message.reply_text("⚖️ 앙상블 가중치 분석 중...", parse_mode="HTML") try: from modules.services.telegram_bot import skill_runner result = await skill_runner.run_weights() for chunk in result: await update.message.reply_text(chunk, parse_mode="HTML") except Exception as e: logging.error(f"[Command] /weights error: {e}") await update.message.reply_text(f"가중치 분석 오류: {e}") async def postmortem_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """/postmortem [days]: 매매 사후 분석 (기본 30일)""" args = context.args days = 30 if args: try: days = int(args[0]) days = max(7, min(days, 365)) except ValueError: pass await update.message.reply_text( f"🔬 최근 {days}일 매매 사후 분석 중...", parse_mode="HTML") try: from modules.services.telegram_bot import skill_runner result = await skill_runner.run_postmortem(days) for chunk in result: await update.message.reply_text(chunk, parse_mode="HTML") except Exception as e: logging.error(f"[Command] /postmortem error: {e}") await update.message.reply_text(f"사후 분석 오류: {e}") async def watchlist_check_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """/watchlist_check: 현재 감시 종목 스코어링""" await update.message.reply_text("🔎 감시 종목 스코어링 중...", parse_mode="HTML") try: from modules.services.telegram_bot import skill_runner # 현재 watchlist에서 종목 코드 목록 로드 candidates = [] try: import json, os from modules.config import Config wl_path = Config.WATCHLIST_FILE if os.path.exists(wl_path): with open(wl_path, encoding="utf-8") as f: wl_data = json.load(f) if isinstance(wl_data, dict): candidates = list(wl_data.keys()) elif isinstance(wl_data, list): candidates = wl_data except Exception: pass result = await skill_runner.run_watchlist_check(candidates) for chunk in result: await update.message.reply_text(chunk, parse_mode="HTML") except Exception as e: logging.error(f"[Command] /watchlist_check error: {e}") await update.message.reply_text(f"스코어링 오류: {e}") def run(self): handlers = [ ("start", self.start_command), ("status", self.status_command), ("portfolio", self.portfolio_command), ("watchlist", self.watchlist_command), ("update_watchlist", self.update_watchlist_command), ("macro", self.macro_command), ("system", self.system_command), ("ai", self.ai_status_command), ("evaluate", self.evaluate_command), ("syshealth", self.syshealth_command), ("risk", self.risk_command), ("regime", self.regime_command), ("model_health", self.model_health_command), ("weights", self.weights_command), ("postmortem", self.postmortem_command), ("watchlist_check", self.watchlist_check_command), ("restart", self.restart_command), ("stop", self.stop_command), ("exec", self.exec_command) ] for cmd, func in handlers: self.application.add_handler(CommandHandler(cmd, func)) async def error_handler(update: object, context: ContextTypes.DEFAULT_TYPE) -> None: if "Conflict" in str(context.error): print(f"[Telegram] Conflict detected. Stopping...") if self.application.running: await self.application.stop() return print(f"[Telegram Error] {context.error}") self.application.add_error_handler(error_handler) logging.info("[Telegram] Command Server Started (Shared Memory IPC Mode).") print("[Telegram] Command Server Started (Shared Memory IPC Mode).") try: self.application.run_polling( allowed_updates=Update.ALL_TYPES, drop_pending_updates=True ) except Exception as e: print(f"[Telegram] Polling Error: {e}")