feat: YouTube 수익화 파이프라인 — 영상 제작·수익 추적·시장 조사 에이전트
- music-lab: FFmpeg 비주얼라이저·슬라이드쇼 영상 제작 (video_producer.py) - music-lab: 영상 프로젝트 6개 + 수익화 5개 + 시장조사 5개 API 추가 - music-lab: video_projects·revenue_records·market_trends·trend_reports DB 추가 - agent-office: YouTubeResearchAgent (YouTube API·pytrends·Billboard 수집, 09:00 daily) - agent-office: youtube_researcher.py + /youtube/research API 2개 추가 - infra: Dockerfile ffmpeg + Nginx /media/videos/ + docker-compose 볼륨·환경변수
This commit is contained in:
@@ -3,6 +3,7 @@ from .music import MusicAgent
|
||||
from .blog import BlogAgent
|
||||
from .realestate import RealestateAgent
|
||||
from .lotto import LottoAgent
|
||||
from .youtube import YouTubeResearchAgent
|
||||
|
||||
AGENT_REGISTRY = {}
|
||||
|
||||
@@ -12,6 +13,7 @@ def init_agents():
|
||||
AGENT_REGISTRY["blog"] = BlogAgent()
|
||||
AGENT_REGISTRY["realestate"] = RealestateAgent()
|
||||
AGENT_REGISTRY["lotto"] = LottoAgent()
|
||||
AGENT_REGISTRY["youtube"] = YouTubeResearchAgent()
|
||||
|
||||
def get_agent(agent_id: str):
|
||||
return AGENT_REGISTRY.get(agent_id)
|
||||
|
||||
93
agent-office/app/agents/youtube.py
Normal file
93
agent-office/app/agents/youtube.py
Normal file
@@ -0,0 +1,93 @@
|
||||
# agent-office/app/agents/youtube.py
|
||||
import asyncio
|
||||
import logging
|
||||
from datetime import date
|
||||
|
||||
import httpx
|
||||
|
||||
from .base import BaseAgent
|
||||
from ..db import add_youtube_research_job, update_youtube_research_job, add_log
|
||||
from ..youtube_researcher import (
|
||||
TARGET_COUNTRIES, TREND_KEYWORDS, MUSIC_LAB_URL,
|
||||
fetch_youtube_trending, fetch_google_trends, fetch_billboard_top20,
|
||||
push_to_music_lab,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class YouTubeResearchAgent(BaseAgent):
|
||||
agent_id = "youtube"
|
||||
display_name = "YouTube 리서치"
|
||||
|
||||
async def on_schedule(self) -> None:
|
||||
await self._run_research(TARGET_COUNTRIES)
|
||||
|
||||
async def on_command(self, command: str, params: dict) -> dict:
|
||||
if command == "research":
|
||||
if self.state == "working":
|
||||
return {"ok": False, "message": "이미 수집 중"}
|
||||
countries = params.get("countries", TARGET_COUNTRIES)
|
||||
asyncio.create_task(self._run_research(countries))
|
||||
return {"ok": True, "message": f"리서치 시작: {countries}"}
|
||||
return {"ok": False, "message": f"Unknown command: {command}"}
|
||||
|
||||
async def on_approval(self, task_id: str, approved: bool, feedback: str = "") -> None:
|
||||
pass
|
||||
|
||||
async def _run_research(self, countries: list) -> None:
|
||||
job_id = add_youtube_research_job(countries)
|
||||
await self.transition("working", f"트렌드 수집 중 ({','.join(countries)})", str(job_id))
|
||||
|
||||
all_trends = []
|
||||
try:
|
||||
for country in countries:
|
||||
trends = await fetch_youtube_trending(country)
|
||||
all_trends.extend(trends)
|
||||
|
||||
gt = await fetch_google_trends(TREND_KEYWORDS, countries)
|
||||
all_trends.extend(gt)
|
||||
|
||||
bb = await fetch_billboard_top20()
|
||||
all_trends.extend(bb)
|
||||
|
||||
ok = await push_to_music_lab(all_trends, date.today().isoformat())
|
||||
if not ok:
|
||||
raise RuntimeError("music-lab push 실패")
|
||||
|
||||
update_youtube_research_job(job_id, "completed", len(all_trends))
|
||||
await self.transition("reporting", f"수집 완료: {len(all_trends)}건", str(job_id))
|
||||
except Exception as e:
|
||||
update_youtube_research_job(job_id, "failed", len(all_trends), str(e))
|
||||
await self.transition("idle", f"수집 실패: {e}")
|
||||
return
|
||||
|
||||
await self.transition("idle", "리서치 완료")
|
||||
|
||||
async def send_weekly_report(self) -> None:
|
||||
"""매주 월요일 08:00 — 주간 인사이트 텔레그램 발송."""
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=10.0) as client:
|
||||
resp = await client.get(f"{MUSIC_LAB_URL}/api/music/market/report/latest")
|
||||
if resp.status_code != 200:
|
||||
return
|
||||
report = resp.json()
|
||||
except Exception as e:
|
||||
add_log(self.agent_id, f"주간 리포트 조회 실패: {e}", level="error")
|
||||
logger.error("send_weekly_report: music-lab 조회 실패: %s", e)
|
||||
return
|
||||
|
||||
top = report.get("top_genres", [])[:3]
|
||||
insights = report.get("insights", "")
|
||||
text = "📊 *YouTube 시장 주간 리포트*\n\n🔥 인기 장르:\n"
|
||||
for g in top:
|
||||
text += f" • {g['genre']} (score: {g['score']:.2f})\n"
|
||||
if insights:
|
||||
text += f"\n💡 {insights[:300]}"
|
||||
|
||||
try:
|
||||
from ..telegram_bot import send_message
|
||||
await send_message(text)
|
||||
except (ImportError, Exception) as e:
|
||||
add_log(self.agent_id, f"주간 리포트 텔레그램 발송 실패: {e}", level="error")
|
||||
logger.error("send_weekly_report: 텔레그램 발송 실패: %s", e)
|
||||
@@ -86,6 +86,17 @@ def init_db() -> None:
|
||||
CREATE INDEX IF NOT EXISTS idx_conv_chat
|
||||
ON conversation_messages(chat_id, created_at DESC)
|
||||
""")
|
||||
conn.execute("""
|
||||
CREATE TABLE IF NOT EXISTS youtube_research_jobs (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
status TEXT NOT NULL DEFAULT 'running',
|
||||
countries TEXT NOT NULL DEFAULT '[]',
|
||||
trends_collected INTEGER NOT NULL DEFAULT 0,
|
||||
error TEXT,
|
||||
started_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
|
||||
completed_at TEXT
|
||||
)
|
||||
""")
|
||||
# Seed default agent configs
|
||||
for agent_id, name in [
|
||||
("stock", "주식 트레이더"),
|
||||
@@ -93,6 +104,7 @@ def init_db() -> None:
|
||||
("blog", "블로그 마케터"),
|
||||
("realestate", "청약 애널리스트"),
|
||||
("lotto", "로또 큐레이터"),
|
||||
("youtube", "YouTube 리서치"),
|
||||
]:
|
||||
conn.execute(
|
||||
"INSERT OR IGNORE INTO agent_config(agent_id, display_name) VALUES(?,?)",
|
||||
@@ -501,3 +513,45 @@ def get_activity_feed(limit: int = 50, offset: int = 0) -> dict:
|
||||
items.append(item)
|
||||
|
||||
return {"items": items, "total": total}
|
||||
|
||||
|
||||
# ── youtube_research_jobs CRUD ────────────────────────────────────────────────
|
||||
|
||||
def add_youtube_research_job(countries: list) -> int:
|
||||
with _conn() as conn:
|
||||
conn.execute(
|
||||
"INSERT INTO youtube_research_jobs (countries) VALUES (?)",
|
||||
(json.dumps(countries),),
|
||||
)
|
||||
return conn.execute("SELECT last_insert_rowid()").fetchone()[0]
|
||||
|
||||
|
||||
def update_youtube_research_job(
|
||||
job_id: int, status: str, trends_collected: int, error: Optional[str] = None
|
||||
) -> None:
|
||||
with _conn() as conn:
|
||||
conn.execute(
|
||||
"""UPDATE youtube_research_jobs
|
||||
SET status=?, trends_collected=?, error=?,
|
||||
completed_at=strftime('%Y-%m-%dT%H:%M:%fZ','now')
|
||||
WHERE id=?""",
|
||||
(status, trends_collected, error, job_id),
|
||||
)
|
||||
|
||||
|
||||
def get_latest_youtube_research_job() -> Optional[Dict[str, Any]]:
|
||||
with _conn() as conn:
|
||||
row = conn.execute(
|
||||
"SELECT * FROM youtube_research_jobs ORDER BY id DESC LIMIT 1"
|
||||
).fetchone()
|
||||
if not row:
|
||||
return None
|
||||
return {
|
||||
"id": row["id"],
|
||||
"status": row["status"],
|
||||
"countries": json.loads(row["countries"]),
|
||||
"trends_collected": row["trends_collected"],
|
||||
"error": row["error"],
|
||||
"started_at": row["started_at"],
|
||||
"completed_at": row["completed_at"],
|
||||
}
|
||||
|
||||
@@ -4,7 +4,7 @@ from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
|
||||
from .config import CORS_ALLOW_ORIGINS
|
||||
from .db import init_db, get_all_agents, get_agent_config, update_agent_config, get_agent_tasks, get_pending_approvals, get_task, get_logs, get_activity_feed
|
||||
from .db import init_db, get_all_agents, get_agent_config, update_agent_config, get_agent_tasks, get_pending_approvals, get_task, get_logs, get_activity_feed, get_latest_youtube_research_job
|
||||
from .models import CommandRequest, ApprovalRequest, AgentConfigUpdate
|
||||
from .websocket_manager import ws_manager
|
||||
from .agents import init_agents, get_agent, get_all_agent_states, AGENT_REGISTRY
|
||||
@@ -185,7 +185,7 @@ def activity_feed(limit: int = 50, offset: int = 0):
|
||||
# --- Realestate Agent Push Endpoint ---
|
||||
|
||||
from pydantic import BaseModel
|
||||
from typing import List, Dict, Any
|
||||
from typing import List, Dict, Any, Optional
|
||||
|
||||
|
||||
class RealestateNotifyBody(BaseModel):
|
||||
@@ -199,3 +199,29 @@ async def realestate_notify(body: RealestateNotifyBody):
|
||||
from fastapi import HTTPException
|
||||
raise HTTPException(status_code=503, detail="RealestateAgent not initialized")
|
||||
return await agent.on_new_matches(body.matches)
|
||||
|
||||
|
||||
# --- YouTube Research Agent Endpoints ---
|
||||
|
||||
class YouTubeResearchBody(BaseModel):
|
||||
countries: List[str] = []
|
||||
|
||||
|
||||
@app.post("/api/agent-office/youtube/research")
|
||||
async def trigger_youtube_research(body: Optional[YouTubeResearchBody] = None):
|
||||
agent = get_agent("youtube")
|
||||
if not agent:
|
||||
raise HTTPException(status_code=503, detail="YouTubeResearchAgent 없음")
|
||||
params = {}
|
||||
if body and body.countries:
|
||||
params["countries"] = body.countries
|
||||
result = await agent.on_command("research", params)
|
||||
return result
|
||||
|
||||
|
||||
@app.get("/api/agent-office/youtube/research/status")
|
||||
def youtube_research_status():
|
||||
job = get_latest_youtube_research_job()
|
||||
if not job:
|
||||
return {"status": "never_run"}
|
||||
return job
|
||||
|
||||
@@ -24,9 +24,21 @@ async def _run_lotto_schedule():
|
||||
if agent:
|
||||
await agent.on_schedule()
|
||||
|
||||
async def _run_youtube_research():
|
||||
agent = AGENT_REGISTRY.get("youtube")
|
||||
if agent:
|
||||
await agent.on_schedule()
|
||||
|
||||
async def _send_youtube_weekly_report():
|
||||
agent = AGENT_REGISTRY.get("youtube")
|
||||
if agent:
|
||||
await agent.send_weekly_report()
|
||||
|
||||
def init_scheduler():
|
||||
scheduler.add_job(_run_stock_schedule, "cron", hour=7, minute=30, id="stock_news")
|
||||
scheduler.add_job(_run_blog_schedule, "cron", hour=10, minute=0, id="blog_pipeline")
|
||||
scheduler.add_job(_run_lotto_schedule, "cron", day_of_week="mon", hour=7, minute=0, id="lotto_curate")
|
||||
scheduler.add_job(_run_youtube_research, "cron", hour=9, minute=0, id="youtube_research")
|
||||
scheduler.add_job(_send_youtube_weekly_report, "cron", day_of_week="mon", hour=8, minute=0, id="youtube_weekly_report")
|
||||
scheduler.add_job(_check_idle_breaks, "interval", seconds=60, id="idle_check")
|
||||
scheduler.start()
|
||||
|
||||
142
agent-office/app/youtube_researcher.py
Normal file
142
agent-office/app/youtube_researcher.py
Normal file
@@ -0,0 +1,142 @@
|
||||
import os
|
||||
import re
|
||||
import asyncio
|
||||
from typing import List, Dict, Any
|
||||
|
||||
import httpx
|
||||
|
||||
YOUTUBE_DATA_API_KEY = os.getenv("YOUTUBE_DATA_API_KEY", "")
|
||||
MUSIC_LAB_URL = os.getenv("MUSIC_LAB_URL", "http://music-lab:8000")
|
||||
TARGET_COUNTRIES = ["BR", "ID", "MX", "US", "KR"]
|
||||
TREND_KEYWORDS = ["lofi music", "phonk", "ambient music", "chill beats", "study music"]
|
||||
YOUTUBE_MUSIC_CAT = "10"
|
||||
|
||||
GENRE_TAGS = {
|
||||
"lo-fi": ["lofi", "lo-fi", "lo fi", "chill", "study"],
|
||||
"phonk": ["phonk", "drift", "memphis"],
|
||||
"ambient": ["ambient", "relaxing", "meditation"],
|
||||
"pop": ["pop", "kpop", "k-pop"],
|
||||
"funk": ["funk", "baile funk"],
|
||||
"latin": ["latin", "reggaeton", "sertanejo"],
|
||||
}
|
||||
|
||||
|
||||
def _tags_to_genre(tags: list) -> str:
|
||||
joined = " ".join(t.lower() for t in tags)
|
||||
for genre, kws in GENRE_TAGS.items():
|
||||
if any(kw in joined for kw in kws):
|
||||
return genre
|
||||
return "general"
|
||||
|
||||
|
||||
async def fetch_youtube_trending(country: str, max_results: int = 50) -> List[Dict[str, Any]]:
|
||||
"""YouTube Data API v3 — 국가별 트렌딩 음악 영상 (categoryId=10)."""
|
||||
if not YOUTUBE_DATA_API_KEY:
|
||||
return []
|
||||
async with httpx.AsyncClient(timeout=10.0) as client:
|
||||
try:
|
||||
resp = await client.get(
|
||||
"https://www.googleapis.com/youtube/v3/videos",
|
||||
params={
|
||||
"part": "snippet,statistics",
|
||||
"chart": "mostPopular",
|
||||
"regionCode": country,
|
||||
"videoCategoryId": YOUTUBE_MUSIC_CAT,
|
||||
"maxResults": max_results,
|
||||
"key": YOUTUBE_DATA_API_KEY,
|
||||
},
|
||||
)
|
||||
if resp.status_code != 200:
|
||||
return []
|
||||
items = resp.json().get("items", [])
|
||||
except Exception:
|
||||
return []
|
||||
|
||||
results = []
|
||||
for i, item in enumerate(items):
|
||||
snippet = item.get("snippet", {})
|
||||
stats = item.get("statistics", {})
|
||||
genre = _tags_to_genre(snippet.get("tags") or [])
|
||||
results.append({
|
||||
"source": "youtube",
|
||||
"country": country,
|
||||
"genre": genre,
|
||||
"keyword": snippet.get("title", "")[:100],
|
||||
"score": round(1.0 - i / max_results, 3),
|
||||
"rank": i + 1,
|
||||
"metadata": {
|
||||
"video_id": item["id"],
|
||||
"view_count": int(stats.get("viewCount", 0)),
|
||||
"channel": snippet.get("channelTitle", ""),
|
||||
},
|
||||
})
|
||||
return results
|
||||
|
||||
|
||||
async def fetch_google_trends(keywords: List[str], countries: List[str]) -> List[Dict[str, Any]]:
|
||||
"""pytrends — 키워드별 Google 관심도 (sync → threadpool)."""
|
||||
try:
|
||||
from pytrends.request import TrendReq
|
||||
except ImportError:
|
||||
return []
|
||||
|
||||
def _sync_fetch(kw: str) -> List[Dict[str, Any]]:
|
||||
try:
|
||||
pt = TrendReq(hl="en-US", tz=0, timeout=(5, 15))
|
||||
pt.build_payload([kw], timeframe="now 7-d")
|
||||
df = pt.interest_over_time()
|
||||
if df.empty or kw not in df.columns:
|
||||
return []
|
||||
score = round(float(df[kw].mean()) / 100.0, 3)
|
||||
return [
|
||||
{"source": "google_trends", "country": c, "genre": "",
|
||||
"keyword": kw, "score": score, "rank": None, "metadata": {}}
|
||||
for c in countries
|
||||
]
|
||||
except Exception:
|
||||
return []
|
||||
|
||||
loop = asyncio.get_running_loop()
|
||||
results = []
|
||||
for kw in keywords[:5]:
|
||||
rows = await loop.run_in_executor(None, _sync_fetch, kw)
|
||||
results.extend(rows)
|
||||
await asyncio.sleep(1.0)
|
||||
return results
|
||||
|
||||
|
||||
async def fetch_billboard_top20() -> List[Dict[str, Any]]:
|
||||
"""Billboard Hot 100 스크래핑 — 상위 20위."""
|
||||
async with httpx.AsyncClient(
|
||||
timeout=10.0,
|
||||
headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"},
|
||||
follow_redirects=True,
|
||||
) as client:
|
||||
try:
|
||||
resp = await client.get("https://www.billboard.com/charts/hot-100/")
|
||||
if resp.status_code != 200:
|
||||
return []
|
||||
titles = re.findall(
|
||||
r'class="c-title[^"]*"[^>]*>\s*([^<\n]{3,80})\s*<', resp.text
|
||||
)[:20]
|
||||
return [
|
||||
{"source": "billboard", "country": "US", "genre": "pop",
|
||||
"keyword": t.strip(), "score": round(1.0 - i / 20, 3),
|
||||
"rank": i + 1, "metadata": {}}
|
||||
for i, t in enumerate(titles) if t.strip()
|
||||
]
|
||||
except Exception:
|
||||
return []
|
||||
|
||||
|
||||
async def push_to_music_lab(trends: List[Dict[str, Any]], report_date: str) -> bool:
|
||||
"""수집한 트렌드를 music-lab /api/music/market/ingest로 push."""
|
||||
async with httpx.AsyncClient(timeout=15.0) as client:
|
||||
try:
|
||||
resp = await client.post(
|
||||
f"{MUSIC_LAB_URL}/api/music/market/ingest",
|
||||
json={"trends": trends, "report_date": report_date},
|
||||
)
|
||||
return resp.status_code == 200
|
||||
except Exception:
|
||||
return False
|
||||
Reference in New Issue
Block a user