From 1d4354e4021a5d0c49632d1f549b779d730e7935 Mon Sep 17 00:00:00 2001 From: gahusb Date: Fri, 1 May 2026 12:13:12 +0900 Subject: [PATCH] =?UTF-8?q?feat(agent-office):=20YouTubeResearchAgent=20+?= =?UTF-8?q?=20=EC=8A=A4=EC=BC=80=EC=A4=84=EB=9F=AC=20+=20/youtube/research?= =?UTF-8?q?=20API?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - db.py: youtube_research_jobs 테이블 추가 + CRUD 3종 (add/update/get_latest) - agents/youtube.py: YouTubeResearchAgent 신규 구현 (on_schedule/on_command/on_approval/_run_research/send_weekly_report) - agents/__init__.py: YouTubeResearchAgent 등록 - scheduler.py: youtube_research(매일 09:00) + youtube_weekly_report(월 08:00) cron 추가 - main.py: POST /api/agent-office/youtube/research + GET /api/agent-office/youtube/research/status 엔드포인트 추가 Co-Authored-By: Claude Sonnet 4.6 --- agent-office/app/agents/__init__.py | 2 + agent-office/app/agents/youtube.py | 85 +++++++++++++++++++++++++++++ agent-office/app/db.py | 54 ++++++++++++++++++ agent-office/app/main.py | 28 +++++++++- agent-office/app/scheduler.py | 12 ++++ 5 files changed, 180 insertions(+), 1 deletion(-) create mode 100644 agent-office/app/agents/youtube.py diff --git a/agent-office/app/agents/__init__.py b/agent-office/app/agents/__init__.py index 4b913e2..6556440 100644 --- a/agent-office/app/agents/__init__.py +++ b/agent-office/app/agents/__init__.py @@ -3,6 +3,7 @@ from .music import MusicAgent from .blog import BlogAgent from .realestate import RealestateAgent from .lotto import LottoAgent +from .youtube import YouTubeResearchAgent AGENT_REGISTRY = {} @@ -12,6 +13,7 @@ def init_agents(): AGENT_REGISTRY["blog"] = BlogAgent() AGENT_REGISTRY["realestate"] = RealestateAgent() AGENT_REGISTRY["lotto"] = LottoAgent() + AGENT_REGISTRY["youtube"] = YouTubeResearchAgent() def get_agent(agent_id: str): return AGENT_REGISTRY.get(agent_id) diff --git a/agent-office/app/agents/youtube.py b/agent-office/app/agents/youtube.py new file mode 100644 index 0000000..11276a3 --- /dev/null +++ b/agent-office/app/agents/youtube.py @@ -0,0 +1,85 @@ +# agent-office/app/agents/youtube.py +import asyncio +from datetime import date + +from .base import BaseAgent +from ..db import add_youtube_research_job, update_youtube_research_job +from ..youtube_researcher import ( + TARGET_COUNTRIES, TREND_KEYWORDS, + fetch_youtube_trending, fetch_google_trends, fetch_billboard_top20, + push_to_music_lab, +) + + +class YouTubeResearchAgent(BaseAgent): + agent_id = "youtube" + display_name = "YouTube 리서치" + + async def on_schedule(self) -> None: + await self._run_research(TARGET_COUNTRIES) + + async def on_command(self, command: str, params: dict) -> dict: + if command == "research": + countries = params.get("countries", TARGET_COUNTRIES) + asyncio.create_task(self._run_research(countries)) + return {"ok": True, "message": f"리서치 시작: {countries}"} + return {"ok": False, "message": f"Unknown command: {command}"} + + async def on_approval(self, task_id: str, approved: bool, feedback: str = "") -> None: + pass + + async def _run_research(self, countries: list) -> None: + job_id = add_youtube_research_job(countries) + await self.transition("working", f"트렌드 수집 중 ({','.join(countries)})", str(job_id)) + + all_trends = [] + try: + for country in countries: + trends = await fetch_youtube_trending(country) + all_trends.extend(trends) + + gt = await fetch_google_trends(TREND_KEYWORDS, countries) + all_trends.extend(gt) + + bb = await fetch_billboard_top20() + all_trends.extend(bb) + + ok = await push_to_music_lab(all_trends, date.today().isoformat()) + if not ok: + raise RuntimeError("music-lab push 실패") + + update_youtube_research_job(job_id, "completed", len(all_trends)) + await self.transition("reporting", f"수집 완료: {len(all_trends)}건", str(job_id)) + except Exception as e: + update_youtube_research_job(job_id, "failed", len(all_trends), str(e)) + await self.transition("idle", f"수집 실패: {e}") + return + + await self.transition("idle", "리서치 완료") + + async def send_weekly_report(self) -> None: + """매주 월요일 08:00 — 주간 인사이트 텔레그램 발송.""" + import httpx + from ..youtube_researcher import MUSIC_LAB_URL + try: + async with httpx.AsyncClient(timeout=10.0) as client: + resp = await client.get(f"{MUSIC_LAB_URL}/api/music/market/report/latest") + if resp.status_code != 200: + return + report = resp.json() + except Exception: + return + + top = report.get("top_genres", [])[:3] + insights = report.get("insights", "") + text = "📊 *YouTube 시장 주간 리포트*\n\n🔥 인기 장르:\n" + for g in top: + text += f" • {g['genre']} (score: {g['score']:.2f})\n" + if insights: + text += f"\n💡 {insights[:300]}" + + try: + from ..telegram_bot import send_message + await send_message(text) + except (ImportError, Exception): + pass diff --git a/agent-office/app/db.py b/agent-office/app/db.py index 61aa7c2..eab9f93 100644 --- a/agent-office/app/db.py +++ b/agent-office/app/db.py @@ -86,6 +86,17 @@ def init_db() -> None: CREATE INDEX IF NOT EXISTS idx_conv_chat ON conversation_messages(chat_id, created_at DESC) """) + conn.execute(""" + CREATE TABLE IF NOT EXISTS youtube_research_jobs ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + status TEXT NOT NULL DEFAULT 'running', + countries TEXT NOT NULL DEFAULT '[]', + trends_collected INTEGER NOT NULL DEFAULT 0, + error TEXT, + started_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')), + completed_at TEXT + ) + """) # Seed default agent configs for agent_id, name in [ ("stock", "주식 트레이더"), @@ -93,6 +104,7 @@ def init_db() -> None: ("blog", "블로그 마케터"), ("realestate", "청약 애널리스트"), ("lotto", "로또 큐레이터"), + ("youtube", "YouTube 리서치"), ]: conn.execute( "INSERT OR IGNORE INTO agent_config(agent_id, display_name) VALUES(?,?)", @@ -501,3 +513,45 @@ def get_activity_feed(limit: int = 50, offset: int = 0) -> dict: items.append(item) return {"items": items, "total": total} + + +# ── youtube_research_jobs CRUD ──────────────────────────────────────────────── + +def add_youtube_research_job(countries: list) -> int: + with _conn() as conn: + conn.execute( + "INSERT INTO youtube_research_jobs (countries) VALUES (?)", + (json.dumps(countries),), + ) + return conn.execute("SELECT last_insert_rowid()").fetchone()[0] + + +def update_youtube_research_job( + job_id: int, status: str, trends_collected: int, error: Optional[str] = None +) -> None: + with _conn() as conn: + conn.execute( + """UPDATE youtube_research_jobs + SET status=?, trends_collected=?, error=?, + completed_at=strftime('%Y-%m-%dT%H:%M:%fZ','now') + WHERE id=?""", + (status, trends_collected, error, job_id), + ) + + +def get_latest_youtube_research_job() -> Optional[Dict[str, Any]]: + with _conn() as conn: + row = conn.execute( + "SELECT * FROM youtube_research_jobs ORDER BY started_at DESC LIMIT 1" + ).fetchone() + if not row: + return None + return { + "id": row["id"], + "status": row["status"], + "countries": json.loads(row["countries"]), + "trends_collected": row["trends_collected"], + "error": row["error"], + "started_at": row["started_at"], + "completed_at": row["completed_at"], + } diff --git a/agent-office/app/main.py b/agent-office/app/main.py index efab5ef..25c8b12 100644 --- a/agent-office/app/main.py +++ b/agent-office/app/main.py @@ -4,7 +4,7 @@ from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect from fastapi.middleware.cors import CORSMiddleware from .config import CORS_ALLOW_ORIGINS -from .db import init_db, get_all_agents, get_agent_config, update_agent_config, get_agent_tasks, get_pending_approvals, get_task, get_logs, get_activity_feed +from .db import init_db, get_all_agents, get_agent_config, update_agent_config, get_agent_tasks, get_pending_approvals, get_task, get_logs, get_activity_feed, get_latest_youtube_research_job from .models import CommandRequest, ApprovalRequest, AgentConfigUpdate from .websocket_manager import ws_manager from .agents import init_agents, get_agent, get_all_agent_states, AGENT_REGISTRY @@ -199,3 +199,29 @@ async def realestate_notify(body: RealestateNotifyBody): from fastapi import HTTPException raise HTTPException(status_code=503, detail="RealestateAgent not initialized") return await agent.on_new_matches(body.matches) + + +# --- YouTube Research Agent Endpoints --- + +class YouTubeResearchBody(BaseModel): + countries: List[str] = [] + + +@app.post("/api/agent-office/youtube/research") +async def trigger_youtube_research(body: YouTubeResearchBody = None): + agent = AGENT_REGISTRY.get("youtube") + if not agent: + raise HTTPException(status_code=503, detail="YouTubeResearchAgent 없음") + params = {} + if body and body.countries: + params["countries"] = body.countries + result = await agent.on_command("research", params) + return result + + +@app.get("/api/agent-office/youtube/research/status") +def youtube_research_status(): + job = get_latest_youtube_research_job() + if not job: + return {"status": "never_run"} + return job diff --git a/agent-office/app/scheduler.py b/agent-office/app/scheduler.py index 4b619ca..32fecf9 100644 --- a/agent-office/app/scheduler.py +++ b/agent-office/app/scheduler.py @@ -24,9 +24,21 @@ async def _run_lotto_schedule(): if agent: await agent.on_schedule() +async def _run_youtube_research(): + agent = AGENT_REGISTRY.get("youtube") + if agent: + await agent.on_schedule() + +async def _send_youtube_weekly_report(): + agent = AGENT_REGISTRY.get("youtube") + if agent: + await agent.send_weekly_report() + def init_scheduler(): scheduler.add_job(_run_stock_schedule, "cron", hour=7, minute=30, id="stock_news") scheduler.add_job(_run_blog_schedule, "cron", hour=10, minute=0, id="blog_pipeline") scheduler.add_job(_run_lotto_schedule, "cron", day_of_week="mon", hour=7, minute=0, id="lotto_curate") + scheduler.add_job(_run_youtube_research, "cron", hour=9, minute=0, id="youtube_research") + scheduler.add_job(_send_youtube_weekly_report, "cron", day_of_week="mon", hour=8, minute=0, id="youtube_weekly_report") scheduler.add_job(_check_idle_breaks, "interval", seconds=60, id="idle_check") scheduler.start()