""" Skill Runner — 텔레그램 봇에서 Claude Skills 스크립트를 실행하는 유틸리티 각 스킬 스크립트를 subprocess로 실행하고, 결과를 텔레그램 HTML 메시지로 포맷합니다. Claude Code 없이도 텔레그램 명령어만으로 분석 리포트를 받을 수 있습니다. """ import asyncio import json import logging import os import subprocess import sys from pathlib import Path from typing import List, Optional logger = logging.getLogger(__name__) # 봇 프로젝트 루트 (이 파일 기준 3단계 상위) BOT_ROOT = Path(__file__).resolve().parent.parent.parent.parent SKILLS_DIR = BOT_ROOT / ".claude" / "skills" PYTHON_EXE = sys.executable # 현재 봇과 동일한 Python 인터프리터 사용 def _skill_script(skill_name: str, script_name: str) -> Path: return SKILLS_DIR / skill_name / "scripts" / script_name async def _run_script(script_path: Path, extra_args: Optional[list] = None, timeout: int = 60) -> dict: """ 스킬 스크립트를 비동기 subprocess로 실행. --bot-path, --json 플래그를 자동으로 추가. 반환: {"ok": bool, "output": str, "json_data": dict|None} """ if not script_path.exists(): return {"ok": False, "output": f"스크립트 없음: {script_path}", "json_data": None} cmd = [PYTHON_EXE, str(script_path), "--bot-path", str(BOT_ROOT), "--json"] if extra_args: cmd.extend(extra_args) try: loop = asyncio.get_running_loop() # PYTHONIOENCODING=utf-8: 서브프로세스 stdout에서 유니코드/이모지 출력 허용 _env = {**os.environ, "PYTHONIOENCODING": "utf-8"} result = await loop.run_in_executor( None, lambda: subprocess.run( cmd, capture_output=True, text=True, encoding="utf-8", errors="replace", timeout=timeout, cwd=str(BOT_ROOT), env=_env, ) ) raw_out = result.stdout.strip() raw_err = result.stderr.strip() # JSON 파싱 시도 json_data = None if raw_out: try: json_data = json.loads(raw_out) except json.JSONDecodeError: pass if result.returncode != 0 and not raw_out: return {"ok": False, "output": raw_err or "알 수 없는 오류", "json_data": None} return {"ok": True, "output": raw_out, "json_data": json_data} except subprocess.TimeoutExpired: return {"ok": False, "output": f"실행 시간 초과 ({timeout}초)", "json_data": None} except Exception as e: return {"ok": False, "output": str(e), "json_data": None} def _truncate(text: str, limit: int = 3800) -> str: if len(text) <= limit: return text return text[:limit] + "\n... (일부 생략)" def _escape_html(text: str) -> str: return text.replace("&", "&").replace("<", "<").replace(">", ">") # ───────────────────────────────────────────── # 스킬별 포맷터 # ───────────────────────────────────────────── def _fmt_syshealth(data: dict) -> str: ipc = data.get("ipc", {}) gpu = data.get("gpu", {}) token = data.get("kis_token", {}) procs = data.get("processes", {}) ipc_status = ipc.get("status", "?") ipc_emoji = {"FRESH": "✅", "NORMAL": "✅", "STALE": "⚠️", "EXPIRED": "🔴", "EMPTY": "⚠️", "ERROR": "🔴"}.get(ipc_status, "❓") age = ipc.get("age_seconds") age_str = f"{age}초 전" if age is not None else "알 수 없음" api_str = "✅ 실행 중" if procs.get("api_running") else "🔴 오프라인" token_str = "✅ 유효" if token.get("status") == "VALID" else f"🔴 {token.get('status','?')}" token_env = token.get("env", "?") vram = gpu.get("vram_used_gb") vram_str = f"{vram}GB / {gpu.get('vram_total_gb', 16)}GB" if vram else "측정 불가" cuda_str = "✅" if gpu.get("cuda_available") else "❌" # 로그 에러 집계 logs = data.get("logs", {}) all_errors = {} for ld in logs.values(): for k, v in ld.get("errors", {}).items(): all_errors[k] = all_errors.get(k, 0) + v err_lines = "\n".join( f" ⚠️ {k}: {v}회" for k, v in sorted(all_errors.items(), key=lambda x: x[1], reverse=True) ) or " ✅ 없음" balance = ipc.get("balance") balance_str = f"\n 잔고: {int(balance):,}원" if balance else "" wl_count = ipc.get("watchlist_count", 0) msg = ( f"🔧 시스템 헬스 진단\n" f"━━━━━━━━━━━━━━━━━━\n" f"API 서버: {api_str}\n" f"IPC 상태: {ipc_emoji} {ipc_status} ({age_str})" f"{balance_str}\n" f" 감시종목: {wl_count}개\n" f"GPU/CUDA: {cuda_str} VRAM: {vram_str}\n" f"KIS 토큰: {token_str} ({token_env})\n\n" f"로그 에러 (최근):\n{err_lines}" ) return msg def _fmt_risk(data: dict) -> str: mdd = data.get("mdd", {}) dl = data.get("daily_loss", {}) cl = data.get("consecutive_losses", {}) cap = data.get("total_capital", 0) mdd_val = mdd.get("mdd", 0) or 0 mdd_emoji = "✅" if mdd_val > -5 else ("⚠️" if mdd_val > -10 else "🔴") dl_ratio = dl.get("ratio", 0) or 0 dl_emoji = "✅" if dl_ratio < 50 else ("⚠️" if dl_ratio < 75 else "🔴") cl_count = cl.get("count", 0) cl_active = cl.get("cooldown_active", False) cl_emoji = "🚨" if cl_active else ("⚠️" if cl_count >= 2 else "✅") msg = ( f"🛡️ 리스크 대시보드\n" f"━━━━━━━━━━━━━━━━━━\n" f"총 자산: {int(cap):,}원\n\n" f"MDD: {mdd_emoji} {mdd_val:.1f}% ({mdd.get('level','?')})\n" f" 최고점: {int(mdd.get('peak',0) or 0):,}원 ({mdd.get('peak_days_ago','?')}일 전)\n" f" 복구 필요: +{mdd.get('recovery_needed',0):.1f}%\n\n" f"일일 손실한도: {dl_emoji} {dl_ratio:.0f}% 소진\n" f" 한도: {int(dl.get('limit',0) or 0):,}원 " f"사용: {int(dl.get('used',0) or 0):,}원\n\n" f"연속 손절: {cl_emoji} {cl_count}회" ) if cl_active: msg += f"\n 🚨 매수 중단 중 (재개: {cl.get('resume_time','?')})" return msg def _fmt_regime(data: dict) -> str: regime = data.get("regime", "?") msi = data.get("msi", {}) params = data.get("recommended_params", {}) ens = params.get("ensemble", {}) data_source = data.get("data_source", "ipc") source_note = " (IPC 데이터 없음 — 기본값 기반)\n" if data_source == "default" else "" regime_emoji = { "BULL_EXTREME": "🔥", "BULL_STRONG": "📈", "NORMAL": "➡️", "BEAR_WEAK": "📉", "BEAR_STRONG": "🚨" }.get(regime, "❓") status_emoji = {"SAFE": "✅", "CAUTION": "⚠️", "DANGER": "🚨"}.get(msi.get("status", ""), "❓") flags = msi.get("flags", {}) flag_lines = "\n".join(f" {v}" for v in flags.values()) msg = ( f"📊 시장 레짐 분석\n" f"━━━━━━━━━━━━━━━━━━\n" f"{source_note}" f"레짐: {regime_emoji} {regime}\n" f"MSI: {status_emoji} {msi.get('score','?')}/{msi.get('max','?')} ({msi.get('status','?')})\n\n" f"지표 현황:\n{flag_lines}\n\n" f"권고 파라미터:\n" f" buy_threshold: {params.get('buy_threshold','?')}\n" f" max_position: {params.get('max_position_ratio','?')}\n" f" sl_atr_mult: {params.get('sl_atr_multiplier','?')}\n\n" f"앙상블 권고:\n" f" tech: {ens.get('tech','?')} " f"lstm: {ens.get('lstm','?')} " f"sent: {ens.get('sentiment','?')}\n" f"다음 점검: {params.get('next_check_days','?')}일 후" ) return msg def _fmt_model_health(data: dict) -> str: models = data.get("models", {}) missing = data.get("missing_models", []) grade_emoji = {"HEALTHY": "🟢", "WARNING": "🟡", "DEGRADED": "🟠", "CRITICAL": "🔴", "MISSING": "⚫"} grade_counts = {} for info in models.values(): g = info.get("grade", "?") grade_counts[g] = grade_counts.get(g, 0) + 1 # 우선순위 높은 종목 상위 5개 critical = [(t, i) for t, i in models.items() if i.get("grade") in ("CRITICAL", "DEGRADED")] critical.sort(key=lambda x: {"CRITICAL": 0, "DEGRADED": 1}.get(x[1].get("grade"), 9)) summary_lines = "\n".join( f" {grade_emoji.get(g,'?')} {g}: {cnt}개" for g, cnt in grade_counts.items() ) critical_lines = "" for t, info in critical[:5]: critical_lines += f"\n {grade_emoji.get(info['grade'],'?')} {t}: {info.get('reason','?')}" missing_str = "" if missing: missing_str = f"\n\n모델 없는 감시종목:\n " + ", ".join(missing[:5]) if len(missing) > 5: missing_str += f" 외 {len(missing)-5}개" msg = ( f"🤖 LSTM 모델 건강도\n" f"━━━━━━━━━━━━━━━━━━\n" f"체크포인트 {len(models)}개:\n" f"{summary_lines}" ) if critical_lines: msg += f"\n\n조치 필요:{critical_lines}" msg += missing_str if not critical and not missing: msg += "\n\n✅ 모든 모델 정상" return msg def _fmt_weights(data: dict) -> str: current = data.get("current_global", {}) optimal = data.get("optimal_global", {}) health = data.get("ema_health", {}) contribs = data.get("signal_contributions", {}) issues = "\n".join(f" {i}" for i in health.get("issues", [])) health_status = "✅" if health.get("status") == "OK" else "⚠️" contrib_lines = "" for sig, c in contribs.items(): if c.get("total_trades", 0) > 0: acc = c.get("accuracy", 0) contrib_lines += f"\n {sig}: 정확도 {acc:.1%} ({c['total_trades']}거래)" delta_lines = "" for sig in ["tech", "lstm", "sentiment"]: cur = current.get(sig, 0) opt = optimal.get(sig, cur) diff = round(opt - cur, 3) arrow = "↑" if diff > 0 else ("↓" if diff < 0 else "→") delta_lines += f"\n {sig:12s}: {cur} {arrow} {opt}" msg = ( f"⚖️ 앙상블 가중치\n" f"━━━━━━━━━━━━━━━━━━\n" f"EMA 학습 상태: {health_status}\n{issues}\n" ) if contrib_lines: msg += f"\n신호 기여도:{contrib_lines}\n" msg += f"\n권고 조정:{delta_lines}" return msg def _fmt_postmortem(data: dict) -> str: stats = data.get("basic_stats", {}) combos = data.get("signal_combinations", {}) suggestions = data.get("parameter_suggestions", {}) days = data.get("days", 30) wr = stats.get("win_rate", 0) pr = stats.get("profit_ratio", 0) wr_emoji = "✅" if wr >= 55 else ("⚠️" if wr >= 50 else "🔴") pr_emoji = "✅" if pr >= 2.0 else ("⚠️" if pr >= 1.5 else "🔴") best_combos = list(combos.items())[:2] worst_combos = list(combos.items())[-2:] combo_lines = "" for k, v in best_combos: combo_lines += f"\n ✅ {k}: 승률 {v['win_rate']}% ({v['trades']}건)" for k, v in worst_combos: if v["win_rate"] < 50: combo_lines += f"\n ⚠️ {k}: 승률 {v['win_rate']}% ({v['trades']}건)" suggest_lines = "" for param, s in suggestions.items(): suggest_lines += f"\n {param}: {s.get('current','?')} → {s.get('recommended','?')}" msg = ( f"📊 매매 사후분석 (최근 {days}일)\n" f"━━━━━━━━━━━━━━━━━━\n" f"총 거래: {stats.get('total',0)}건 " f"승률: {wr_emoji} {wr}%\n" f"손익비: {pr_emoji} {pr} " f"Sharpe: {stats.get('sharpe',0)}\n" f"평균 수익: +{stats.get('avg_win_pct',0)}% " f"평균 손실: -{stats.get('avg_loss_pct',0)}%" ) if combo_lines: msg += f"\n\n신호 조합:{combo_lines}" if suggest_lines: msg += f"\n\n파라미터 권고:{suggest_lines}" return msg def _fmt_watchlist(data: dict) -> str: scored = data.get("scored", []) current = data.get("current_watchlist", []) r_min, r_max = data.get("recommended_range", (8, 15)) to_add = [s for s in scored if s.get("action") == "편입"] to_remove = [s for s in scored if s.get("action") == "제거"] to_keep = [s for s in scored if s.get("action") == "유지" and s.get("in_watchlist")] to_keep.sort(key=lambda x: x.get("total_score", 0), reverse=True) add_lines = "" for s in to_add[:5]: wr = f" ({s['win_rate']:.0%})" if s.get("win_rate") else "" add_lines += f"\n ✅ {s['ticker']} {s['total_score']}점 — {s.get('theme','?')}{wr}" remove_lines = "" for s in to_remove: remove_lines += f"\n ✕ {s['ticker']} {s['total_score']}점" keep_lines = "" for s in to_keep[:3]: keep_lines += f"\n • {s['ticker']} {s['total_score']}점" final = len(current) - len(to_remove) + len(to_add) size_ok = "✅" if r_min <= final <= r_max else "⚠️" msg = ( f"📋 Watchlist 분석\n" f"━━━━━━━━━━━━━━━━━━\n" f"현재 {len(current)}종목 → 최종 {final}종목 {size_ok}\n" f"권고 규모: {r_min}~{r_max}종목" ) if add_lines: msg += f"\n\n편입 추천:{add_lines}" if remove_lines: msg += f"\n\n제거 추천:{remove_lines}" if keep_lines: msg += f"\n\n상위 유지 종목:{keep_lines}" return msg # ───────────────────────────────────────────── # 공개 API — 텔레그램 핸들러에서 호출 # ───────────────────────────────────────────── def _to_chunks(text: str, limit: int = 3800) -> List[str]: """메시지가 Telegram 4096자 제한을 초과하면 청크로 분할""" if len(text) <= limit: return [text] chunks = [] while text: chunks.append(text[:limit]) text = text[limit:] return chunks async def run_syshealth() -> List[str]: script = _skill_script("bot-system-health-diagnostics", "health_checker.py") r = await _run_script(script, timeout=30) if not r["ok"]: return [f"⚠️ 시스템 헬스 실행 오류:\n{_escape_html(r['output'])}"] if r["json_data"]: return _to_chunks(_fmt_syshealth(r["json_data"])) return _to_chunks(f"
{_escape_html(r['output'])}
") async def run_risk() -> List[str]: script = _skill_script("auto-trade-risk-manager", "risk_dashboard.py") r = await _run_script(script, timeout=30) if not r["ok"]: return [f"⚠️ 리스크 분석 오류:\n{_escape_html(r['output'])}"] if r["json_data"]: return _to_chunks(_fmt_risk(r["json_data"])) return _to_chunks(f"
{_escape_html(r['output'])}
") async def run_regime() -> List[str]: script = _skill_script("korean-market-regime-detector", "regime_calculator.py") r = await _run_script(script, timeout=60) if not r["ok"]: return [f"⚠️ 레짐 분석 오류:\n{_escape_html(r['output'])}"] if r["json_data"]: return _to_chunks(_fmt_regime(r["json_data"])) return _to_chunks(f"
{_escape_html(r['output'])}
") async def run_model_health() -> List[str]: script = _skill_script("lstm-model-health-monitor", "model_health_report.py") r = await _run_script(script, timeout=60) if not r["ok"]: return [f"⚠️ 모델 건강도 오류:\n{_escape_html(r['output'])}"] if r["json_data"]: return _to_chunks(_fmt_model_health(r["json_data"])) return _to_chunks(f"
{_escape_html(r['output'])}
") async def run_weights() -> List[str]: script = _skill_script("ensemble-weight-optimizer", "weight_optimizer.py") r = await _run_script(script, timeout=30) if not r["ok"]: return [f"⚠️ 가중치 분석 오류:\n{_escape_html(r['output'])}"] if r["json_data"]: return _to_chunks(_fmt_weights(r["json_data"])) return _to_chunks(f"
{_escape_html(r['output'])}
") async def run_postmortem(days: int = 30) -> List[str]: script = _skill_script("trade-post-mortem-analyzer", "post_mortem_report.py") r = await _run_script(script, extra_args=["--days", str(days)], timeout=30) if not r["ok"]: return [f"⚠️ 매매 분석 오류:\n{_escape_html(r['output'])}"] if r["json_data"]: return _to_chunks(_fmt_postmortem(r["json_data"])) if not r["output"].strip(): return [f"📊 매매 사후분석 (최근 {days}일)\n━━━━━━━━━━━━━━━━━━\n분석 대상 매매 기록이 없습니다."] return _to_chunks(f"
{_escape_html(r['output'])}
") async def run_watchlist_check(candidates: Optional[List[str]] = None) -> List[str]: script = _skill_script("watchlist-intelligence-curator", "watchlist_scorer.py") extra = [] if candidates: extra = ["--candidates"] + candidates r = await _run_script(script, extra_args=extra, timeout=30) if not r["ok"]: return [f"⚠️ Watchlist 분석 오류:\n{_escape_html(r['output'])}"] if r["json_data"]: return _to_chunks(_fmt_watchlist(r["json_data"])) return _to_chunks(f"
{_escape_html(r['output'])}
")