feat(agent-office): notification broadcast + telegram tracking + activity feed API

- Add WebSocket notification messages for task_assigned/task_completed
- Structure telegram send_message return value with ok/message_id
- Track telegram delivery status in task result_data
- Add test_telegram command to stock agent
- Add GET /api/agent-office/activity unified feed endpoint

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-04-11 15:15:01 +09:00
parent cce84de8be
commit de91f424a3
6 changed files with 112 additions and 8 deletions

View File

@@ -40,6 +40,14 @@ class BaseAgent:
if self._ws_manager: if self._ws_manager:
await self._ws_manager.send_agent_state(self.agent_id, new_state, detail, task_id) await self._ws_manager.send_agent_state(self.agent_id, new_state, detail, task_id)
if new_state == "working" and old != "working":
await self._ws_manager.send_notification(
self.agent_id, "task_assigned", task_id, detail or "새 작업 시작"
)
elif new_state == "idle" and old in ("working", "reporting"):
await self._ws_manager.send_notification(
self.agent_id, "task_completed", task_id, detail or "작업 완료"
)
if new_state == "break": if new_state == "break":
await self._ws_manager.send_agent_move(self.agent_id, "break_room") await self._ws_manager.send_agent_move(self.agent_id, "break_room")
elif old == "break" and new_state == "idle": elif old == "break" and new_state == "idle":

View File

@@ -22,15 +22,24 @@ class StockAgent(BaseAgent):
summary = self._format_news_summary(news, indices) summary = self._format_news_summary(news, indices)
update_task_status(task_id, "succeeded", {
"summary": summary,
"news_count": len(news) if isinstance(news, list) else 0,
})
await self.transition("reporting", "뉴스 요약 전송 중...") await self.transition("reporting", "뉴스 요약 전송 중...")
from ..telegram_bot import send_stock_summary from ..telegram_bot import send_stock_summary
await send_stock_summary(summary) tg_result = await send_stock_summary(summary)
update_task_status(task_id, "succeeded", {
"summary": summary,
"news_count": len(news) if isinstance(news, list) else 0,
"telegram_sent": tg_result.get("ok", False),
"telegram_message_id": tg_result.get("message_id"),
})
if not tg_result.get("ok"):
add_log(self.agent_id, "Telegram send failed", "warning", task_id)
if self._ws_manager:
await self._ws_manager.send_notification(
self.agent_id, "telegram_failed", task_id, "텔레그램 전송 실패"
)
await self.transition("idle", "뉴스 요약 완료") await self.transition("idle", "뉴스 요약 완료")
@@ -40,6 +49,15 @@ class StockAgent(BaseAgent):
await self.transition("idle", f"오류: {e}") await self.transition("idle", f"오류: {e}")
async def on_command(self, command: str, params: dict) -> dict: async def on_command(self, command: str, params: dict) -> dict:
if command == "test_telegram":
from ..telegram_bot import send_message
result = await send_message("🔔 [주식 에이전트] 텔레그램 테스트 메시지입니다.")
return {
"ok": result.get("ok", False),
"message": "텔레그램 전송 성공" if result.get("ok") else "텔레그램 전송 실패",
"telegram_message_id": result.get("message_id"),
}
if command == "fetch_news": if command == "fetch_news":
await self.on_schedule() await self.on_schedule()
return {"ok": True, "message": "뉴스 수집 시작"} return {"ok": True, "message": "뉴스 수집 시작"}

View File

@@ -259,3 +259,64 @@ def mark_telegram_responded(callback_id: str, action: str) -> None:
"UPDATE telegram_state SET responded=1, action=? WHERE callback_id=?", "UPDATE telegram_state SET responded=1, action=? WHERE callback_id=?",
(action, callback_id), (action, callback_id),
) )
def get_activity_feed(limit: int = 50, offset: int = 0) -> dict:
with _conn() as conn:
total_row = conn.execute("""
SELECT (SELECT COUNT(*) FROM agent_tasks) + (SELECT COUNT(*) FROM agent_logs) AS total
""").fetchone()
total = total_row["total"] if total_row else 0
rows = conn.execute("""
SELECT 'task' AS type, agent_id, id AS task_id, task_type,
status, NULL AS level,
COALESCE(
json_extract(result_data, '$.summary'),
task_type
) AS message,
created_at, completed_at,
result_data
FROM agent_tasks
UNION ALL
SELECT 'log' AS type, agent_id, task_id, NULL AS task_type,
NULL AS status, level,
message,
created_at, NULL AS completed_at,
NULL AS result_data
FROM agent_logs
ORDER BY created_at DESC
LIMIT ? OFFSET ?
""", (limit, offset)).fetchall()
items = []
for r in rows:
item = {
"type": r["type"],
"agent_id": r["agent_id"],
"task_id": r["task_id"],
"message": r["message"],
"created_at": r["created_at"],
}
if r["type"] == "task":
item["task_type"] = r["task_type"]
item["status"] = r["status"]
item["completed_at"] = r["completed_at"]
if r["created_at"] and r["completed_at"]:
try:
from datetime import datetime
start = datetime.fromisoformat(r["created_at"].replace("Z", "+00:00"))
end = datetime.fromisoformat(r["completed_at"].replace("Z", "+00:00"))
item["duration_seconds"] = round((end - start).total_seconds())
except Exception:
item["duration_seconds"] = None
else:
item["duration_seconds"] = None
result_data = json.loads(r["result_data"]) if r["result_data"] else None
if result_data and "telegram_sent" in result_data:
item["telegram_sent"] = result_data["telegram_sent"]
else:
item["level"] = r["level"]
items.append(item)
return {"items": items, "total": total}

View File

@@ -4,7 +4,7 @@ from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.cors import CORSMiddleware
from .config import CORS_ALLOW_ORIGINS from .config import CORS_ALLOW_ORIGINS
from .db import init_db, get_all_agents, get_agent_config, update_agent_config, get_agent_tasks, get_pending_approvals, get_task, get_logs from .db import init_db, get_all_agents, get_agent_config, update_agent_config, get_agent_tasks, get_pending_approvals, get_task, get_logs, get_activity_feed
from .models import CommandRequest, ApprovalRequest, AgentConfigUpdate from .models import CommandRequest, ApprovalRequest, AgentConfigUpdate
from .websocket_manager import ws_manager from .websocket_manager import ws_manager
from .agents import init_agents, get_agent, get_all_agent_states, AGENT_REGISTRY from .agents import init_agents, get_agent, get_all_agent_states, AGENT_REGISTRY
@@ -150,3 +150,7 @@ async def telegram_webhook(data: dict):
@app.get("/api/agent-office/states") @app.get("/api/agent-office/states")
def all_states(): def all_states():
return {"agents": get_all_agent_states()} return {"agents": get_all_agent_states()}
@app.get("/api/agent-office/activity")
def activity_feed(limit: int = 50, offset: int = 0):
return get_activity_feed(limit, offset)

View File

@@ -26,7 +26,11 @@ async def send_message(text: str, reply_markup: dict = None) -> dict:
} }
if reply_markup: if reply_markup:
payload["reply_markup"] = reply_markup payload["reply_markup"] = reply_markup
return await _api("sendMessage", payload) result = await _api("sendMessage", payload)
return {
"ok": result.get("ok", False),
"message_id": result.get("result", {}).get("message_id") if result.get("ok") else None,
}
async def send_stock_summary(summary: str) -> dict: async def send_stock_summary(summary: str) -> dict:
return await send_message(summary) return await send_message(summary)

View File

@@ -43,4 +43,13 @@ class WebSocketManager:
async def send_agent_move(self, agent_id: str, target: str) -> None: async def send_agent_move(self, agent_id: str, target: str) -> None:
await self.broadcast({"type": "agent_move", "agent": agent_id, "target": target}) await self.broadcast({"type": "agent_move", "agent": agent_id, "target": target})
async def send_notification(self, agent_id: str, event: str, task_id: str = None, message: str = "") -> None:
await self.broadcast({
"type": "notification",
"agent": agent_id,
"event": event,
"task_id": task_id,
"message": message,
})
ws_manager = WebSocketManager() ws_manager = WebSocketManager()