Files
web-page-backend/agent-office/app/agents/stock.py
gahusb b23346143f feat(agent-office): 주식 브리핑 본문에 주요 뉴스 헤드라인+링크 추가
- stock-lab /news/summarize 응답에 top 8 기사(title/link/press) 포함
- agent-office stock.py: _build_briefing_body() 헬퍼 분리 — LLM 요약 + 📰 주요 뉴스 섹션(HTML <a> 링크). 향후 본문 고도화 시 이 함수만 수정
- telegram 포맷터/메시징에 body_is_html 플래그 추가 (링크 포함 메시지는 이중 escape 회피)
- 아내 전송도 동일 본문 재사용

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-15 00:56:20 +09:00

158 lines
6.4 KiB
Python

import asyncio
import html
from typing import Optional
from .base import BaseAgent
from ..db import create_task, update_task_status, get_agent_config, add_log
from .. import service_proxy
def _build_briefing_body(result: dict, max_headlines: int = 5) -> str:
"""아침 시장 브리핑 본문 조립.
LLM 요약 + 주요 뉴스 헤드라인(링크) 섹션을 합친다.
향후 본문 고도화 시 이 함수만 수정하면 됨 (텔레그램 HTML parse_mode).
"""
summary = (result.get("summary") or "").strip()
articles = result.get("articles") or []
# body_is_html=True 로 보낼 예정이므로 LLM 요약(plain text)도 escape
parts = [html.escape(summary)] if summary else []
headlines = []
for a in articles[:max_headlines]:
title = (a.get("title") or "").strip()
if not title:
continue
title_esc = html.escape(title)
link = (a.get("link") or "").strip()
press = (a.get("press") or "").strip()
press_suffix = f"{html.escape(press)}" if press else ""
if link:
headlines.append(f'• <a href="{html.escape(link, quote=True)}">{title_esc}</a>{press_suffix}')
else:
headlines.append(f"{title_esc}{press_suffix}")
if headlines:
parts.append("📰 <b>주요 뉴스</b>\n" + "\n".join(headlines))
return "\n\n".join(parts)
class StockAgent(BaseAgent):
agent_id = "stock"
display_name = "주식 트레이더"
async def on_schedule(self) -> None:
if self.state not in ("idle", "break"):
return
task_id = create_task(self.agent_id, "news_summary", {"limit": 15})
await self.transition("working", "AI 뉴스 요약 생성 중...", task_id)
try:
# AI 요약 호출 (뉴스 수집 + LLM 처리는 stock-lab이 담당)
result = await service_proxy.summarize_stock_news(limit=15)
await self.transition("reporting", "뉴스 요약 전송 중...")
body = _build_briefing_body(result)
# 새 통합 텔레그램 API 사용
from ..telegram import send_agent_message
tg_result = await send_agent_message(
agent_id=self.agent_id,
kind="report",
title="아침 시장 브리핑",
body=body,
body_is_html=True,
task_id=task_id,
metadata={
"tokens": result["tokens"]["total"],
"duration_ms": result["duration_ms"],
"model": result["model"],
},
)
# 아내 chat 추가 전송 (설정된 경우) — 제목 + 본문만 간결하게
from ..config import TELEGRAM_WIFE_CHAT_ID
if TELEGRAM_WIFE_CHAT_ID:
from ..telegram.messaging import send_raw
wife_text = f"📈 <b>아침 시장 브리핑</b>\n\n{body}"
wife_result = await send_raw(wife_text, chat_id=TELEGRAM_WIFE_CHAT_ID)
if not wife_result.get("ok"):
desc = wife_result.get("description") or "unknown"
add_log(self.agent_id, f"Wife telegram send failed: {desc}", "warning", task_id)
update_task_status(task_id, "succeeded", {
"summary": result["summary"],
"article_count": result.get("article_count", 0),
"tokens": result["tokens"],
"model": result["model"],
"duration_ms": result["duration_ms"],
"telegram_sent": tg_result.get("ok", False),
"telegram_message_id": tg_result.get("message_id"),
})
if not tg_result.get("ok"):
desc = tg_result.get("description") or "unknown"
code = tg_result.get("error_code")
add_log(self.agent_id, f"Telegram send failed: [{code}] {desc}", "warning", task_id)
if self._ws_manager:
await self._ws_manager.send_notification(
self.agent_id, "telegram_failed", task_id, "텔레그램 전송 실패"
)
await self.transition("idle", "뉴스 요약 완료")
except Exception as e:
add_log(self.agent_id, f"News summary failed: {e}", "error", task_id)
update_task_status(task_id, "failed", {"error": str(e)})
await self.transition("idle", f"오류: {e}")
async def on_command(self, command: str, params: dict) -> dict:
if command == "test_telegram":
from ..telegram import send_agent_message
result = await send_agent_message(
agent_id=self.agent_id,
kind="info",
title="연결 테스트",
body="텔레그램 연동이 정상적으로 동작합니다.",
)
return {
"ok": result.get("ok", False),
"message": "텔레그램 전송 성공" if result.get("ok") else "텔레그램 전송 실패",
"telegram_message_id": result.get("message_id"),
}
if command == "fetch_news":
await self.on_schedule()
return {"ok": True, "message": "뉴스 수집 시작"}
if command == "add_alert":
symbol = params.get("symbol")
target_price = params.get("target_price")
if not symbol or target_price is None:
return {"ok": False, "message": "symbol과 target_price는 필수입니다"}
config = get_agent_config(self.agent_id)
alerts = config["custom_config"].get("alerts", [])
alerts.append({
"symbol": symbol,
"name": params.get("name", symbol),
"target_price": target_price,
"direction": params.get("direction", "above"),
})
from ..db import update_agent_config
update_agent_config(self.agent_id, custom_config={**config["custom_config"], "alerts": alerts})
return {"ok": True, "message": f"알람 추가: {params['symbol']}"}
if command == "list_alerts":
config = get_agent_config(self.agent_id)
alerts = config["custom_config"].get("alerts", [])
return {"ok": True, "alerts": alerts}
return {"ok": False, "message": f"Unknown command: {command}"}
async def on_approval(self, task_id: str, approved: bool, feedback: str = "") -> None:
pass