# agent-office/app/agents/youtube.py
import asyncio
import logging
from datetime import date

import httpx

from .base import BaseAgent
from ..db import add_youtube_research_job, update_youtube_research_job, add_log
from ..youtube_researcher import (
    TARGET_COUNTRIES,
    TREND_KEYWORDS,
    MUSIC_LAB_URL,
    fetch_youtube_trending,
    fetch_google_trends,
    fetch_billboard_top20,
    push_to_music_lab,
)

logger = logging.getLogger(__name__)

# Strong references to fire-and-forget tasks. The event loop only keeps weak
# references, so a task nobody else holds may be garbage-collected before it
# finishes.
_BACKGROUND_TASKS: set = set()


class YouTubeResearchAgent(BaseAgent):
    """Agent that collects music trend data and pushes it to music-lab.

    A research run gathers YouTube trending entries per country, Google
    Trends results for ``TREND_KEYWORDS`` and the Billboard top 20, then
    hands the combined list to ``push_to_music_lab``. The agent can also
    send a weekly report message through the Telegram bot.
    """

    agent_id = "youtube"
    display_name = "YouTube 리서치"

    async def on_schedule(self) -> None:
        """Run a research pass over ``TARGET_COUNTRIES`` and wait for it."""
        await self._run_research(TARGET_COUNTRIES)

    async def on_command(self, command: str, params: dict) -> dict:
        """Handle an external command.

        Only ``"research"`` is recognised. It starts a research run in the
        background and returns immediately.

        Args:
            command: Command name.
            params: Command parameters; ``"countries"`` optionally overrides
                ``TARGET_COUNTRIES``.

        Returns:
            A dict with ``ok`` (bool) and ``message`` (str).
        """
        if command != "research":
            return {"ok": False, "message": f"Unknown command: {command}"}

        if self.state == "working":
            return {"ok": False, "message": "이미 수집 중"}

        countries = params.get("countries")
        if countries is None:
            # An explicit None would otherwise crash later in ','.join(...).
            countries = TARGET_COUNTRIES
        elif isinstance(countries, str):
            # A bare string would be iterated character by character.
            countries = [countries]

        task = asyncio.create_task(self._run_research(countries))
        # Keep the task alive until it completes, then drop the reference.
        _BACKGROUND_TASKS.add(task)
        task.add_done_callback(_BACKGROUND_TASKS.discard)
        return {"ok": True, "message": f"리서치 시작: {countries}"}

    async def on_approval(self, task_id: str, approved: bool, feedback: str = "") -> None:
        """Intentionally a no-op: this agent does nothing on approval events."""

    async def _run_research(self, countries: list) -> None:
        """Collect trends for ``countries`` and push them to music-lab.

        Registers a research job, moves the agent to ``working``, gathers all
        sources, pushes the result and records the outcome on the job. Any
        exception raised while collecting or pushing is recorded on the job
        as ``failed`` and the agent returns to ``idle``.

        Args:
            countries: Country identifiers passed to the fetchers.
        """
        job_id = add_youtube_research_job(countries)
        await self.transition(
            "working", f"트렌드 수집 중 ({','.join(countries)})", str(job_id)
        )

        all_trends = []
        try:
            for country in countries:
                all_trends.extend(await fetch_youtube_trending(country))
            all_trends.extend(await fetch_google_trends(TREND_KEYWORDS, countries))
            all_trends.extend(await fetch_billboard_top20())

            pushed = await push_to_music_lab(all_trends, date.today().isoformat())
            if not pushed:
                raise RuntimeError("music-lab push 실패")

            update_youtube_research_job(job_id, "completed", len(all_trends))
            await self.transition(
                "reporting", f"수집 완료: {len(all_trends)}건", str(job_id)
            )
        except Exception as e:
            # Boundary handler: keep the traceback in the logs, mark the job
            # failed and release the agent back to idle.
            logger.exception("_run_research: job %s failed", job_id)
            update_youtube_research_job(job_id, "failed", len(all_trends), str(e))
            await self.transition("idle", f"수집 실패: {e}")
            return

        await self.transition("idle", "리서치 완료")

    async def send_weekly_report(self) -> None:
        """Every Monday 08:00 — send the weekly insight report via Telegram.

        Fetches the latest market report from music-lab, formats up to three
        top genres plus the first 300 characters of the insights text, and
        sends the message. Fetch and send failures are logged, not raised.
        """
        try:
            async with httpx.AsyncClient(timeout=10.0) as client:
                resp = await client.get(
                    f"{MUSIC_LAB_URL}/api/music/market/report/latest"
                )
            if resp.status_code != 200:
                logger.warning(
                    "send_weekly_report: music-lab returned HTTP %s",
                    resp.status_code,
                )
                return
            report = resp.json()
        except Exception as e:
            add_log(self.agent_id, f"주간 리포트 조회 실패: {e}", level="error")
            logger.error("send_weekly_report: music-lab 조회 실패: %s", e)
            return

        if not isinstance(report, dict):
            logger.warning(
                "send_weekly_report: unexpected report payload type %s",
                type(report).__name__,
            )
            return

        top_genres = report.get("top_genres") or []
        if not isinstance(top_genres, list):
            logger.warning(
                "send_weekly_report: ignoring non-list top_genres %r", top_genres
            )
            top_genres = []
        insights = report.get("insights") or ""

        parts = ["📊 *YouTube 시장 주간 리포트*\n\n🔥 인기 장르:\n"]
        for entry in top_genres[:3]:
            try:
                parts.append(f" • {entry['genre']} (score: {entry['score']:.2f})\n")
            except (KeyError, TypeError, ValueError) as e:
                # A malformed entry should not abort the whole report.
                logger.warning(
                    "send_weekly_report: skipping malformed genre entry %r: %s",
                    entry,
                    e,
                )
        if insights:
            parts.append(f"\n💡 {str(insights)[:300]}")
        text = "".join(parts)

        try:
            # Imported here rather than at module top, as in the original.
            from ..telegram_bot import send_message

            await send_message(text)
        except Exception as e:
            add_log(self.agent_id, f"주간 리포트 텔레그램 발송 실패: {e}", level="error")
            logger.error("send_weekly_report: 텔레그램 발송 실패: %s", e)