"""텔레그램 Webhook 이벤트 처리.""" from typing import Optional from ..db import get_telegram_callback, mark_telegram_responded from .client import _enabled, api_call async def handle_webhook(data: dict, agent_dispatcher=None) -> Optional[dict]: """텔레그램에서 들어오는 이벤트 처리. - callback_query(인라인 버튼)는 항상 처리 → 승인/거절 dict 반환 - message(텍스트 슬래시 명령)는 `agent_dispatcher`가 주입된 경우에만 처리 agent_dispatcher: async (agent_id, command, params) -> dict - agent_id == "__global__", command == "status" 특수 케이스는 {agent_id: {state, detail}} dict를 반환해야 함. """ callback_query = data.get("callback_query") if callback_query: return await _handle_callback(callback_query) message = data.get("message") if message: chat = message.get("chat", {}) print(f"[TG-WEBHOOK] chat.id={chat.get('id')} type={chat.get('type')} text={message.get('text')!r}", flush=True) if message and message.get("text") and agent_dispatcher is not None: return await _handle_message(message, agent_dispatcher) return None async def _handle_callback(callback_query: dict) -> Optional[dict]: """기존 승인/거절 콜백 처리 로직.""" callback_id = callback_query.get("data", "") cb = get_telegram_callback(callback_id) if not cb: return None action = callback_id.split("_")[0] mark_telegram_responded(callback_id, action) feedback_text = { "approve": "승인됨 ✅", "reject": "거절됨 ❌", }.get(action, f"처리됨: {action}") await api_call( "answerCallbackQuery", { "callback_query_id": callback_query["id"], "text": feedback_text, }, ) return { "task_id": cb["task_id"], "agent_id": cb["agent_id"], "action": action, "approved": action == "approve", } async def _handle_message(message: dict, agent_dispatcher) -> Optional[dict]: """슬래시 명령 메시지 처리.""" from .router import parse_command, resolve_agent_command, HELP_TEXT from .messaging import send_raw, send_agent_message from .agent_registry import AGENT_META text = message.get("text", "") parsed = parse_command(text) if not parsed: # 슬래시 명령이 아니면 자연어 대화로 라우팅 chat_id = str(message.get("chat", {}).get("id", "")) if not chat_id: return None from .conversational import respond_to_message reply = await respond_to_message(chat_id, text) if reply: import html as _html await send_raw(_html.escape(reply), chat_id=chat_id) return {"handled": "chat"} return None agent_id, command, args = parsed # 전역 명령 if agent_id is None: if command == "help": await send_raw(HELP_TEXT) return {"handled": "help"} if command == "agents": lines = ["📋 등록된 에이전트", ""] for aid, meta in AGENT_META.items(): lines.append( f"{meta['emoji']} {meta['display_name']} /{aid}" ) await send_raw("\n".join(lines)) return {"handled": "agents"} if command == "status": try: result = await agent_dispatcher("__global__", "status", {}) body_lines = [] if isinstance(result, dict): for aid, info in result.items(): meta = AGENT_META.get( aid, {"emoji": "🤖", "display_name": aid} ) state = info.get("state", "unknown") if isinstance(info, dict) else "unknown" body_lines.append( f"{meta['emoji']} {meta['display_name']}: {state}" ) detail = info.get("detail") if isinstance(info, dict) else None if detail: body_lines.append(f" └ {detail}") await send_raw("📊 전체 상태\n\n" + "\n".join(body_lines)) except Exception as e: await send_raw(f"⚠️ 상태 조회 실패: {e}") return {"handled": "status"} return None # 에이전트 명령 if agent_id not in AGENT_META: await send_raw( f"⚠️ 알 수 없는 에이전트: {agent_id}\n/help 로 사용 가능한 명령 확인" ) return {"handled": "unknown_agent"} resolved = resolve_agent_command(agent_id, command, args) if resolved is None: await send_raw( f"⚠️ {agent_id}에서 {command} 명령은 지원하지 않습니다." ) return {"handled": "unknown_command"} internal_cmd, params = resolved try: result = await agent_dispatcher(agent_id, internal_cmd, params) ok = result.get("ok", False) if isinstance(result, dict) else False msg = result.get("message", "") if isinstance(result, dict) else str(result) await send_agent_message( agent_id=agent_id, kind="info" if ok else "error", title=f"{internal_cmd} 실행 결과", body=msg or str(result), ) except Exception as e: await send_raw(f"⚠️ 명령 실행 실패: {e}") return {"handled": "command", "agent_id": agent_id, "command": internal_cmd} async def setup_webhook() -> dict: from ..config import TELEGRAM_WEBHOOK_URL if not _enabled() or not TELEGRAM_WEBHOOK_URL: return {"ok": False, "description": "Webhook URL not configured"} return await api_call("setWebhook", {"url": TELEGRAM_WEBHOOK_URL})