Files
web-page-backend/agent-office/app/telegram/messaging.py
gahusb b867b8ce13 feat(agent-office): 아침 시장 브리핑 아내 텔레그램 추가 전송
- TELEGRAM_WIFE_CHAT_ID 환경변수 추가 (빈 값이면 비활성)
- send_raw()에 chat_id override 파라미터 추가
- 주식 에이전트 브리핑 전송 후, 아내 chat에 제목+본문만 간결 포맷으로 추가 전송
  (기술 메타데이터/버튼 없음, 읽기 전용)

NAS .env에 TELEGRAM_WIFE_CHAT_ID 추가 필요.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-15 00:49:55 +09:00

72 lines
2.3 KiB
Python

"""고수준 메시지 전송 API."""
import uuid
from typing import Optional
from ..config import TELEGRAM_CHAT_ID
from ..db import save_telegram_callback
from .client import _enabled, api_call
from .formatter import MessageKind, format_agent_message
async def send_raw(text: str, reply_markup: Optional[dict] = None, chat_id: Optional[str] = None) -> dict:
"""가장 저수준. 원문 텍스트 그대로 전송. chat_id 생략 시 기본 TELEGRAM_CHAT_ID로."""
if not _enabled():
return {"ok": False, "message_id": None}
payload = {
"chat_id": chat_id or TELEGRAM_CHAT_ID,
"text": text,
"parse_mode": "HTML",
}
if reply_markup:
payload["reply_markup"] = reply_markup
result = await api_call("sendMessage", payload)
ok = result.get("ok", False)
return {
"ok": ok,
"message_id": result.get("result", {}).get("message_id") if ok else None,
"description": result.get("description") if not ok else None,
"error_code": result.get("error_code") if not ok else None,
}
async def send_agent_message(
agent_id: str,
kind: MessageKind,
title: str,
body: str,
task_id: Optional[str] = None,
actions: Optional[list] = None,
metadata: Optional[dict] = None,
) -> dict:
"""통합 에이전트 메시지 API. 모든 에이전트가 이걸 씀."""
text = format_agent_message(agent_id, kind, title, body, metadata)
reply_markup = None
if actions:
buttons = []
for action in actions:
cb_id = f"{action['action']}_{uuid.uuid4().hex[:8]}"
save_telegram_callback(cb_id, task_id or "", agent_id)
buttons.append({"text": action["label"], "callback_data": cb_id})
reply_markup = {"inline_keyboard": [buttons]}
return await send_raw(text, reply_markup)
async def send_approval_request(
agent_id: str,
task_id: str,
title: str,
detail: str,
) -> dict:
"""승인/거절 단축 헬퍼."""
return await send_agent_message(
agent_id=agent_id,
kind="approval",
title=title,
body=detail,
task_id=task_id,
actions=[
{"label": "✅ 승인", "action": "approve"},
{"label": "❌ 거절", "action": "reject"},
],
)