17 Commits

Author SHA1 Message Date
be0094b83f feat: YouTube monetization pipeline — video production, revenue tracking, and market research agents
- music-lab: FFmpeg visualizer and slideshow video production (video_producer.py)
- music-lab: add 6 video project + 5 monetization + 5 market research APIs
- music-lab: add video_projects, revenue_records, market_trends, and trend_reports DB tables
- agent-office: YouTubeResearchAgent (YouTube API, pytrends, Billboard collection; daily at 09:00)
- agent-office: add youtube_researcher.py + 2 /youtube/research APIs
- infra: ffmpeg in Dockerfile + Nginx /media/videos/ + docker-compose volumes and env vars
2026-05-01 12:50:38 +09:00
e948393906 docs(CLAUDE.md): YouTube monetization — update API, DB, env var, and scheduler docs 2026-05-01 12:37:30 +09:00
0beceefeef fix(deploy): add PEXELS_API_KEY, YOUTUBE_DATA_API_KEY, and VIDEO_DATA_DIR env vars to docker-compose 2026-05-01 12:35:44 +09:00
355667cf9c fix(music-lab): tighten market API types, read ANTHROPIC_API_KEY at call time, add HTTP-layer tests 2026-05-01 12:32:24 +09:00
26b9eea0dc feat(music-lab): market_trends/trend_reports DB + market.py + 5 /api/music/market APIs
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-01 12:26:37 +09:00
3b9dcfe0dd fix(agent-office): YouTubeResearchAgent quality improvements (concurrency guard, error logging, type fixes)
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-01 12:22:33 +09:00
1d4354e402 feat(agent-office): YouTubeResearchAgent + scheduler + /youtube/research APIs
- db.py: add youtube_research_jobs table + 3 CRUD helpers (add/update/get_latest)
- agents/youtube.py: new YouTubeResearchAgent (on_schedule/on_command/on_approval/_run_research/send_weekly_report)
- agents/__init__.py: register YouTubeResearchAgent
- scheduler.py: add youtube_research (daily 09:00) + youtube_weekly_report (Mon 08:00) cron jobs
- main.py: add POST /api/agent-office/youtube/research + GET /api/agent-office/youtube/research/status endpoints

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-01 12:13:12 +09:00
8604c6292d fix(agent-office): get_running_loop + pytrends timeout + UA fix
- asyncio.get_event_loop() → asyncio.get_running_loop() (recommended on Python 3.10+)
- add timeout=(5, 15) to TrendReq (connect and read timeouts)
- remove 'bot' from the User-Agent: switch to a standard Chrome UA

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-01 12:10:09 +09:00
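The loop-access change this commit makes can be sketched in isolation. A minimal example of the run-in-executor pattern the fix targets; `blocking_fetch` is a hypothetical stand-in, not project code:

```python
import asyncio
import time

def blocking_fetch() -> int:
    # Stand-in for a synchronous pytrends call; anything blocking belongs
    # in an executor so the event loop stays responsive.
    time.sleep(0.01)
    return 42

async def collect() -> int:
    # get_running_loop() raises RuntimeError when called outside a running
    # loop, surfacing misuse early; get_event_loop() would guess or warn.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, blocking_fetch)

print(asyncio.run(collect()))  # 42
```

The `timeout=(5, 15)` tuple follows the requests convention that pytrends passes through: 5 seconds to connect, 15 seconds to read.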
21666f4372 feat(agent-office): youtube_researcher — collect from the YouTube API, pytrends, and Billboard
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-01 12:05:58 +09:00
f83b900320 fix: add a /data/videos volume mount to the frontend service 2026-05-01 12:04:32 +09:00
a7b2fc0d9d chore: install FFmpeg + Nginx /media/videos/ + docker-compose volumes + env vars 2026-05-01 12:03:19 +09:00
327d0b4e81 fix(music-lab): unify the VIDEO_DATA_DIR default + clean up lazy imports
- change the VIDEO_DATA_DIR default to /app/data/videos (fixes a bug that appended a duplicate videos subdirectory to /app/data)
- drop the duplicated "videos" subdirectory from the delete_project and export_project paths
- move the get_track_by_id lazy import inside create_project up to the top-level import block
2026-05-01 12:01:59 +09:00
8e7a3806c5 feat(music-lab): add 6 video project + 5 monetization APIs
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-01 11:59:11 +09:00
abf475433b fix(music-lab): fix cumulative xfade offset drift + strengthen tests
- _build_slideshow_cmd: change the offset formula to `duration_per_image * i - xd * i` (removes cumulative transition drift)
- _generate_metadata: stop inserting an empty string into yt_tags when genre is empty
- test: replace the VIDEO_DATA_DIR patch with monkeypatch (guarantees automatic restore)
- test: add assertions on the xfade offset values (29.00, 58.00)
- test: remove unused imports (pytest, sqlite3)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-01 11:56:41 +09:00
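The corrected offset formula can be checked with a quick sketch: each crossfade of duration `xd` overlaps the previous clip, so the i-th transition offset must subtract `xd * i` rather than letting the error accumulate. Assuming a hypothetical 30 s per-image duration and a 1 s crossfade, the offsets match the 29.00 and 58.00 test values:

```python
def xfade_offsets(num_images: int, duration_per_image: float, xd: float) -> list[float]:
    # Offset of the i-th transition: i full images have played, minus the
    # xd seconds consumed by each of the i crossfades so far.
    return [duration_per_image * i - xd * i for i in range(1, num_images)]

print(xfade_offsets(3, 30.0, 1.0))  # [29.0, 58.0]
```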
7336fd090e feat(music-lab): video_producer — FFmpeg visualizer/slideshow + Claude metadata
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-01 11:49:42 +09:00
62d79b2669 fix(music-lab): fix the revenue avg_rpm formula + UNIQUE constraint + stronger tests
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-01 11:46:00 +09:00
a5495aeaa4 feat(music-lab): video_projects/revenue_records DB migration + CRUD
- init_db(): add video_projects and revenue_records tables (CREATE IF NOT EXISTS)
- video_projects CRUD: create/get/get_all/update_status/delete + get_track_by_id
- revenue_records CRUD: create/get_all/update/delete/get_revenue_dashboard (automatic RPM calculation)
- TDD: all 5 tests in tests/test_db_video.py PASSED
- add pytest.ini (pythonpath=. setting)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-01 11:41:07 +09:00
22 changed files with 1670 additions and 8 deletions

View File

@@ -88,3 +88,8 @@ AGENT_OFFICE_URL=http://agent-office:8000
REALESTATE_LAB_URL=http://realestate-lab:8000
REALESTATE_DASHBOARD_URL=http://localhost:8080/realestate
REALESTATE_NOTIFY_TIMEOUT=15
# [MUSIC LAB — YouTube Video Generation]
PEXELS_API_KEY=
YOUTUBE_DATA_API_KEY=
# VIDEO_DATA_DIR=/app/data/videos  # default; set only when overriding

View File

@@ -84,6 +84,7 @@ Backend monorepo for a personal web platform running on a Synology NAS.
| `/api/agent-office/` | `agent-office:8000` | AI agent office API + WebSocket |
| `/webhook`, `/webhook/` | `deployer:9000` | Gitea webhook |
| `/media/music/` | `/data/music/` (files served directly) | generated audio files |
| `/media/videos/` | `/data/videos/` (files served directly) | YouTube video MP4s |
| `/media/travel/.thumb/` | `/data/thumbs/` (files served directly) | thumbnail cache |
| `/media/travel/` | `/data/travel/` (files served directly) | original photos |
| `/assets/` | static files (long-term cache) | Vite hashed files |
@@ -249,10 +250,11 @@ docker compose up -d
- 15:40 weekdays — save total asset snapshot (`save_daily_snapshot`)
### music-lab (music-lab/)
- Dual-provider music generation service (Suno API + local MusicGen) + YouTube video production + market trend research
- Generated audio files: `/app/data/music/` (served by Nginx at `/media/music/`)
- Generated video files: `/app/data/videos/` (served by Nginx at `/media/videos/`)
- DB: `/app/data/music.db` (music_tasks, music_library, video_projects, revenue_records, market_trends, trend_reports tables)
- File layout: `main.py`, `db.py`, `suno_provider.py`, `local_provider.py`, `video_producer.py`, `market.py`
- Generation flow: POST generate (specify provider) → task_id returned → BackgroundTask → file saved → auto-registered in the library
**Provider structure**
@@ -288,12 +290,51 @@ docker compose up -d
| POST | `/api/music/lyrics/library` | save lyrics |
| PUT | `/api/music/lyrics/library/{id}` | update lyrics |
| DELETE | `/api/music/lyrics/library/{id}` | delete lyrics |
| POST | `/api/music/video-project` | create a video project (track_id, format, target_countries) |
| GET | `/api/music/video-projects` | list video projects |
| GET | `/api/music/video-project/{id}` | video project detail |
| POST | `/api/music/video-project/{id}/render` | start FFmpeg rendering (BackgroundTask) |
| GET | `/api/music/video-project/{id}/export` | export package (mp4 + thumbnail + metadata.json) |
| DELETE | `/api/music/video-project/{id}` | delete a video project |
| GET | `/api/music/revenue/dashboard` | revenue dashboard (total revenue, views, weighted-average RPM) |
| GET | `/api/music/revenue` | list revenue records |
| POST | `/api/music/revenue` | add a revenue record (UNIQUE: yt_video_id+record_month+country) |
| PUT | `/api/music/revenue/{id}` | update a revenue record |
| DELETE | `/api/music/revenue/{id}` | delete a revenue record |
| POST | `/api/music/market/ingest` | receive agent-office trends + generate a report |
| GET | `/api/music/market/trends` | query trends (country, genre, source, days=7) |
| GET | `/api/music/market/report/latest` | latest trend report |
| GET | `/api/music/market/report` | list trend reports (limit=10) |
| GET | `/api/music/market/suggest` | Suno prompt suggestions (limit=5) |
**Environment variables**
- `SUNO_API_KEY`: Suno API key (Suno provider disabled if unset)
- `MUSIC_AI_SERVER_URL`: local MusicGen server URL (local provider disabled if unset)
- `MUSIC_MEDIA_BASE`: public URL prefix for audio files (default `/media/music`)
- `MUSIC_DATA_PATH`: NAS audio storage path (default `./data/music`)
- `PEXELS_API_KEY`: Pexels stock image API key (slideshow Pexels images disabled if unset)
- `ANTHROPIC_API_KEY`: Claude Haiku — YouTube metadata generation + market insights (fallback text if unset)
- `VIDEO_DATA_DIR`: video storage path (default `/app/data/videos`)
**video_projects table**
- format: `visualizer` | `slideshow`
- status: `pending` → `rendering` → `done` | `failed`
- target_countries: JSON array (e.g. `["BR","US"]`)
- render_params: JSON object (FFmpeg parameter cache)
**revenue_records table**
- UNIQUE(yt_video_id, record_month, country)
- avg_rpm: weighted average `SUM(revenue_usd)/SUM(views)*1000` (not a simple AVG of per-row RPMs)
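The difference matters whenever view counts are uneven across rows; a small illustration with made-up numbers:

```python
records = [
    {"revenue_usd": 10.0, "views": 10_000},  # per-row RPM 1.0
    {"revenue_usd": 5.0, "views": 2_000},    # per-row RPM 2.5
]

# Weighted average: total revenue per 1,000 total views.
weighted_rpm = (sum(r["revenue_usd"] for r in records)
                / sum(r["views"] for r in records) * 1000)

# A naive AVG of per-row RPMs overweights the low-view row.
naive_rpm = sum(r["revenue_usd"] / r["views"] * 1000 for r in records) / len(records)

print(round(weighted_rpm, 2), round(naive_rpm, 2))  # 1.25 1.75
```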
**market_trends table**
- source: `youtube` | `google_trends` | `billboard`
- metadata: JSON object (subset of the raw API response)
- index: `idx_mt_country_source` ON (country, source, collected_at DESC)
**trend_reports table**
- report_date UNIQUE — a second ingest on the same day upserts
- top_genres: JSON array `[{genre, score, countries}]` (max 10, descending by score)
- recommended_styles: JSON array `[{genre, suno_prompt, target_countries, reason}]` (max 5)
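The same-day upsert can be expressed with SQLite's `ON CONFLICT` clause. A minimal sketch against an in-memory table mirroring the `report_date UNIQUE` constraint (column list trimmed for brevity):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE trend_reports ("
    " report_date TEXT UNIQUE NOT NULL,"
    " insights TEXT NOT NULL DEFAULT '')"
)

def upsert_report(report_date: str, insights: str) -> None:
    # A second ingest for the same report_date overwrites instead of failing.
    conn.execute(
        """INSERT INTO trend_reports (report_date, insights) VALUES (?, ?)
           ON CONFLICT(report_date) DO UPDATE SET insights = excluded.insights""",
        (report_date, insights),
    )

upsert_report("2026-05-01", "first run")
upsert_report("2026-05-01", "second run")  # upsert, not a UNIQUE violation
rows = conn.execute("SELECT report_date, insights FROM trend_reports").fetchall()
print(rows)  # [('2026-05-01', 'second run')]
```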
**music_library table (extended columns)**
- `provider`: `suno` | `local` — provider used for generation
@@ -492,6 +533,16 @@ docker compose up -d
- `CONVERSATION_RATE_PER_MIN`: max messages per minute per chat (default 6)
- `LOTTO_BACKEND_URL`: default `http://lotto:8000`
- `LOTTO_CURATOR_MODEL`: default `claude-sonnet-4-5`
- `YOUTUBE_DATA_API_KEY`: YouTube Data API v3 key (YouTube trending collection skipped if unset)
**YouTubeResearchAgent (`agents/youtube.py`)**
- `agent_id = "youtube"` — registered in AGENT_REGISTRY
- daily at 09:00, `on_schedule()` → collect per-country YouTube trending + Google Trends + Billboard Top 20 → push to music-lab
- `on_command("research", {countries: []})` → manual trigger (background asyncio.create_task)
- collection sources: `youtube_researcher.py` (fetch_youtube_trending, fetch_google_trends, fetch_billboard_top20)
- DB: run history recorded in the `youtube_research_jobs` table
- concurrency guard: rejected when `self.state == "working"`
- Mondays at 08:00, `send_weekly_report()` → fetch the latest music-lab report → send via Telegram
**Telegram natural-language chat (option B)**
- Plain sentences (not slash commands) are answered by Claude Haiku 4.5
@@ -505,6 +556,8 @@ docker compose up -d
- Mondays 07:00 — lotto curator briefing (`lotto_curate`)
- every 60 s — idle-agent break check (`idle_check_job`)
- ~~daily 09:15 — subscription-matching daily report~~ (retired in Task 2026-04-28; switched to a push trigger from realestate-lab)
- daily 09:00 — YouTube trend collection (`youtube_research`) → push to music-lab `/api/music/market/ingest`
- Mondays 08:00 — YouTube weekly report sent via Telegram (`youtube_weekly_report`)
**RealestateAgent (`agents/realestate.py`)**
- Entry point: `on_new_matches(matches: list[dict]) -> {sent, sent_ids, message_id}`
@@ -534,6 +587,8 @@ docker compose up -d
| POST | `/api/agent-office/realestate/notify` | receive realestate-lab push → send via Telegram |
| GET | `/api/agent-office/states` | query all agent states |
| GET | `/api/agent-office/conversation/stats` | Telegram chat token/cache stats (`days` filter) |
| POST | `/api/agent-office/youtube/research` | manually trigger YouTube trend collection (body: `{countries: []}`) |
| GET | `/api/agent-office/youtube/research/status` | status of the last collection job |
### personal (personal/)
- Personal services (portfolio + blog + todo combined)

View File

@@ -3,6 +3,7 @@ from .music import MusicAgent
from .blog import BlogAgent
from .realestate import RealestateAgent
from .lotto import LottoAgent
from .youtube import YouTubeResearchAgent
AGENT_REGISTRY = {}
@@ -12,6 +13,7 @@ def init_agents():
AGENT_REGISTRY["blog"] = BlogAgent()
AGENT_REGISTRY["realestate"] = RealestateAgent()
AGENT_REGISTRY["lotto"] = LottoAgent()
AGENT_REGISTRY["youtube"] = YouTubeResearchAgent()
def get_agent(agent_id: str):
return AGENT_REGISTRY.get(agent_id)

View File

@@ -0,0 +1,93 @@
# agent-office/app/agents/youtube.py
import asyncio
import logging
from datetime import date
import httpx
from .base import BaseAgent
from ..db import add_youtube_research_job, update_youtube_research_job, add_log
from ..youtube_researcher import (
TARGET_COUNTRIES, TREND_KEYWORDS, MUSIC_LAB_URL,
fetch_youtube_trending, fetch_google_trends, fetch_billboard_top20,
push_to_music_lab,
)
logger = logging.getLogger(__name__)
class YouTubeResearchAgent(BaseAgent):
agent_id = "youtube"
display_name = "YouTube Research"
async def on_schedule(self) -> None:
await self._run_research(TARGET_COUNTRIES)
async def on_command(self, command: str, params: dict) -> dict:
if command == "research":
if self.state == "working":
return {"ok": False, "message": "collection already in progress"}
countries = params.get("countries", TARGET_COUNTRIES)
asyncio.create_task(self._run_research(countries))
return {"ok": True, "message": f"research started: {countries}"}
return {"ok": False, "message": f"Unknown command: {command}"}
async def on_approval(self, task_id: str, approved: bool, feedback: str = "") -> None:
pass
async def _run_research(self, countries: list) -> None:
job_id = add_youtube_research_job(countries)
await self.transition("working", f"collecting trends ({','.join(countries)})", str(job_id))
all_trends = []
try:
for country in countries:
trends = await fetch_youtube_trending(country)
all_trends.extend(trends)
gt = await fetch_google_trends(TREND_KEYWORDS, countries)
all_trends.extend(gt)
bb = await fetch_billboard_top20()
all_trends.extend(bb)
ok = await push_to_music_lab(all_trends, date.today().isoformat())
if not ok:
raise RuntimeError("music-lab push failed")
update_youtube_research_job(job_id, "completed", len(all_trends))
await self.transition("reporting", f"collected: {len(all_trends)}", str(job_id))
except Exception as e:
update_youtube_research_job(job_id, "failed", len(all_trends), str(e))
await self.transition("idle", f"collection failed: {e}")
return
await self.transition("idle", "research complete")
async def send_weekly_report(self) -> None:
"""Mondays 08:00 — send the weekly insight report via Telegram."""
try:
async with httpx.AsyncClient(timeout=10.0) as client:
resp = await client.get(f"{MUSIC_LAB_URL}/api/music/market/report/latest")
if resp.status_code != 200:
return
report = resp.json()
except Exception as e:
add_log(self.agent_id, f"weekly report fetch failed: {e}", level="error")
logger.error("send_weekly_report: music-lab fetch failed: %s", e)
return
top = report.get("top_genres", [])[:3]
insights = report.get("insights", "")
text = "📊 *YouTube Market Weekly Report*\n\n🔥 Top genres:\n"
for g in top:
text += f"{g['genre']} (score: {g['score']:.2f})\n"
if insights:
text += f"\n💡 {insights[:300]}"
try:
from ..telegram_bot import send_message
await send_message(text)
except Exception as e:  # Exception already covers ImportError
add_log(self.agent_id, f"weekly report Telegram send failed: {e}", level="error")
logger.error("send_weekly_report: Telegram send failed: %s", e)

View File

@@ -86,6 +86,17 @@ def init_db() -> None:
CREATE INDEX IF NOT EXISTS idx_conv_chat
ON conversation_messages(chat_id, created_at DESC)
""") 
conn.execute("""
CREATE TABLE IF NOT EXISTS youtube_research_jobs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
status TEXT NOT NULL DEFAULT 'running',
countries TEXT NOT NULL DEFAULT '[]',
trends_collected INTEGER NOT NULL DEFAULT 0,
error TEXT,
started_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
completed_at TEXT
)
""")
# Seed default agent configs
for agent_id, name in [
("stock", "Stock Trader"),
@@ -93,6 +104,7 @@ def init_db() -> None:
("blog", "Blog Marketer"),
("realestate", "Housing Subscription Analyst"),
("lotto", "Lotto Curator"),
("youtube", "YouTube Research"),
]:
conn.execute(
"INSERT OR IGNORE INTO agent_config(agent_id, display_name) VALUES(?,?)",
@@ -501,3 +513,45 @@ def get_activity_feed(limit: int = 50, offset: int = 0) -> dict:
items.append(item)
return {"items": items, "total": total}
# ── youtube_research_jobs CRUD ────────────────────────────────────────────────
def add_youtube_research_job(countries: list) -> int:
with _conn() as conn:
conn.execute(
"INSERT INTO youtube_research_jobs (countries) VALUES (?)",
(json.dumps(countries),),
)
return conn.execute("SELECT last_insert_rowid()").fetchone()[0]
def update_youtube_research_job(
job_id: int, status: str, trends_collected: int, error: Optional[str] = None
) -> None:
with _conn() as conn:
conn.execute(
"""UPDATE youtube_research_jobs
SET status=?, trends_collected=?, error=?,
completed_at=strftime('%Y-%m-%dT%H:%M:%fZ','now')
WHERE id=?""",
(status, trends_collected, error, job_id),
)
def get_latest_youtube_research_job() -> Optional[Dict[str, Any]]:
with _conn() as conn:
row = conn.execute(
"SELECT * FROM youtube_research_jobs ORDER BY id DESC LIMIT 1"
).fetchone()
if not row:
return None
return {
"id": row["id"],
"status": row["status"],
"countries": json.loads(row["countries"]),
"trends_collected": row["trends_collected"],
"error": row["error"],
"started_at": row["started_at"],
"completed_at": row["completed_at"],
}

View File

@@ -4,7 +4,7 @@ from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from .config import CORS_ALLOW_ORIGINS
from .db import init_db, get_all_agents, get_agent_config, update_agent_config, get_agent_tasks, get_pending_approvals, get_task, get_logs, get_activity_feed, get_latest_youtube_research_job
from .models import CommandRequest, ApprovalRequest, AgentConfigUpdate
from .websocket_manager import ws_manager
from .agents import init_agents, get_agent, get_all_agent_states, AGENT_REGISTRY
@@ -185,7 +185,7 @@ def activity_feed(limit: int = 50, offset: int = 0):
# --- Realestate Agent Push Endpoint ---
from pydantic import BaseModel
from typing import List, Dict, Any, Optional
class RealestateNotifyBody(BaseModel):
@@ -199,3 +199,29 @@ async def realestate_notify(body: RealestateNotifyBody):
from fastapi import HTTPException
raise HTTPException(status_code=503, detail="RealestateAgent not initialized")
return await agent.on_new_matches(body.matches)
# --- YouTube Research Agent Endpoints ---
class YouTubeResearchBody(BaseModel):
countries: List[str] = []
@app.post("/api/agent-office/youtube/research")
async def trigger_youtube_research(body: Optional[YouTubeResearchBody] = None):
agent = get_agent("youtube")
if not agent:
raise HTTPException(status_code=503, detail="YouTubeResearchAgent not initialized")
params = {}
if body and body.countries:
params["countries"] = body.countries
result = await agent.on_command("research", params)
return result
@app.get("/api/agent-office/youtube/research/status")
def youtube_research_status():
job = get_latest_youtube_research_job()
if not job:
return {"status": "never_run"}
return job

View File

@@ -24,9 +24,21 @@ async def _run_lotto_schedule():
if agent:
await agent.on_schedule()
async def _run_youtube_research():
agent = AGENT_REGISTRY.get("youtube")
if agent:
await agent.on_schedule()
async def _send_youtube_weekly_report():
agent = AGENT_REGISTRY.get("youtube")
if agent:
await agent.send_weekly_report()
def init_scheduler():
scheduler.add_job(_run_stock_schedule, "cron", hour=7, minute=30, id="stock_news")
scheduler.add_job(_run_blog_schedule, "cron", hour=10, minute=0, id="blog_pipeline")
scheduler.add_job(_run_lotto_schedule, "cron", day_of_week="mon", hour=7, minute=0, id="lotto_curate")
scheduler.add_job(_run_youtube_research, "cron", hour=9, minute=0, id="youtube_research")
scheduler.add_job(_send_youtube_weekly_report, "cron", day_of_week="mon", hour=8, minute=0, id="youtube_weekly_report")
scheduler.add_job(_check_idle_breaks, "interval", seconds=60, id="idle_check")
scheduler.start()

View File

@@ -0,0 +1,142 @@
import os
import re
import asyncio
from typing import List, Dict, Any
import httpx
YOUTUBE_DATA_API_KEY = os.getenv("YOUTUBE_DATA_API_KEY", "")
MUSIC_LAB_URL = os.getenv("MUSIC_LAB_URL", "http://music-lab:8000")
TARGET_COUNTRIES = ["BR", "ID", "MX", "US", "KR"]
TREND_KEYWORDS = ["lofi music", "phonk", "ambient music", "chill beats", "study music"]
YOUTUBE_MUSIC_CAT = "10"
GENRE_TAGS = {
"lo-fi": ["lofi", "lo-fi", "lo fi", "chill", "study"],
"phonk": ["phonk", "drift", "memphis"],
"ambient": ["ambient", "relaxing", "meditation"],
"pop": ["pop", "kpop", "k-pop"],
"funk": ["funk", "baile funk"],
"latin": ["latin", "reggaeton", "sertanejo"],
}
def _tags_to_genre(tags: list) -> str:
joined = " ".join(t.lower() for t in tags)
for genre, kws in GENRE_TAGS.items():
if any(kw in joined for kw in kws):
return genre
return "general"
async def fetch_youtube_trending(country: str, max_results: int = 50) -> List[Dict[str, Any]]:
"""YouTube Data API v3 — per-country trending music videos (categoryId=10)."""
if not YOUTUBE_DATA_API_KEY:
return []
async with httpx.AsyncClient(timeout=10.0) as client:
try:
resp = await client.get(
"https://www.googleapis.com/youtube/v3/videos",
params={
"part": "snippet,statistics",
"chart": "mostPopular",
"regionCode": country,
"videoCategoryId": YOUTUBE_MUSIC_CAT,
"maxResults": max_results,
"key": YOUTUBE_DATA_API_KEY,
},
)
if resp.status_code != 200:
return []
items = resp.json().get("items", [])
except Exception:
return []
results = []
for i, item in enumerate(items):
snippet = item.get("snippet", {})
stats = item.get("statistics", {})
genre = _tags_to_genre(snippet.get("tags") or [])
results.append({
"source": "youtube",
"country": country,
"genre": genre,
"keyword": snippet.get("title", "")[:100],
"score": round(1.0 - i / max_results, 3),
"rank": i + 1,
"metadata": {
"video_id": item["id"],
"view_count": int(stats.get("viewCount", 0)),
"channel": snippet.get("channelTitle", ""),
},
})
return results
async def fetch_google_trends(keywords: List[str], countries: List[str]) -> List[Dict[str, Any]]:
"""pytrends — Google interest per keyword (sync → threadpool)."""
try:
from pytrends.request import TrendReq
except ImportError:
return []
def _sync_fetch(kw: str) -> List[Dict[str, Any]]:
try:
pt = TrendReq(hl="en-US", tz=0, timeout=(5, 15))
pt.build_payload([kw], timeframe="now 7-d")
df = pt.interest_over_time()
if df.empty or kw not in df.columns:
return []
score = round(float(df[kw].mean()) / 100.0, 3)
return [
{"source": "google_trends", "country": c, "genre": "",
"keyword": kw, "score": score, "rank": None, "metadata": {}}
for c in countries
]
except Exception:
return []
loop = asyncio.get_running_loop()
results = []
for kw in keywords[:5]:
rows = await loop.run_in_executor(None, _sync_fetch, kw)
results.extend(rows)
await asyncio.sleep(1.0)
return results
async def fetch_billboard_top20() -> List[Dict[str, Any]]:
"""Scrape the Billboard Hot 100 — top 20 entries."""
async with httpx.AsyncClient(
timeout=10.0,
headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"},
follow_redirects=True,
) as client:
try:
resp = await client.get("https://www.billboard.com/charts/hot-100/")
if resp.status_code != 200:
return []
titles = re.findall(
r'class="c-title[^"]*"[^>]*>\s*([^<\n]{3,80})\s*<', resp.text
)[:20]
return [
{"source": "billboard", "country": "US", "genre": "pop",
"keyword": t.strip(), "score": round(1.0 - i / 20, 3),
"rank": i + 1, "metadata": {}}
for i, t in enumerate(titles) if t.strip()
]
except Exception:
return []
async def push_to_music_lab(trends: List[Dict[str, Any]], report_date: str) -> bool:
"""Push collected trends to music-lab /api/music/market/ingest."""
async with httpx.AsyncClient(timeout=15.0) as client:
try:
resp = await client.post(
f"{MUSIC_LAB_URL}/api/music/market/ingest",
json={"trends": trends, "report_date": report_date},
)
return resp.status_code == 200
except Exception:
return False

View File

@@ -3,3 +3,5 @@ uvicorn[standard]==0.30.6
apscheduler==3.10.4
websockets>=12.0
httpx>=0.27
google-api-python-client>=2.100.0
pytrends>=4.9.2

View File

@@ -64,8 +64,12 @@ services:
- SUNO_API_KEY=${SUNO_API_KEY:-}
- MUSIC_MEDIA_BASE=${MUSIC_MEDIA_BASE:-/media/music}
- CORS_ALLOW_ORIGINS=${CORS_ALLOW_ORIGINS:-http://localhost:3007,http://localhost:8080}
- PEXELS_API_KEY=${PEXELS_API_KEY:-}
- ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY:-}
- VIDEO_DATA_DIR=${VIDEO_DATA_DIR:-/app/data/videos}
volumes:
- ${RUNTIME_PATH}/data/music:/app/data
- ${RUNTIME_PATH:-.}/data/videos:/app/data/videos
healthcheck:
test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"]
interval: 30s
@@ -138,6 +142,7 @@ services:
- CONVERSATION_MODEL=${CONVERSATION_MODEL:-claude-haiku-4-5-20251001}
- CONVERSATION_HISTORY_LIMIT=${CONVERSATION_HISTORY_LIMIT:-20}
- CONVERSATION_RATE_PER_MIN=${CONVERSATION_RATE_PER_MIN:-6}
- YOUTUBE_DATA_API_KEY=${YOUTUBE_DATA_API_KEY:-}
volumes:
- ${RUNTIME_PATH:-.}/data/agent-office:/app/data
depends_on:
@@ -209,6 +214,7 @@ services:
- ${PHOTO_PATH}:/data/travel:ro
- ${RUNTIME_PATH}/travel-thumbs:/data/thumbs:ro
- ${RUNTIME_PATH}/data/music:/data/music:ro
- ${RUNTIME_PATH}/data/videos:/data/videos:ro
extra_hosts:
- "host.docker.internal:host-gateway"
healthcheck:

View File

@@ -1,6 +1,8 @@
FROM python:3.12-alpine
ENV PYTHONUNBUFFERED=1
RUN apk add --no-cache ffmpeg
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

View File

@@ -95,6 +95,80 @@ def init_db() -> None:
except sqlite3.OperationalError:
pass
# ── video_projects table ─────────────────────────────────────────
conn.execute("""
CREATE TABLE IF NOT EXISTS video_projects (
id INTEGER PRIMARY KEY AUTOINCREMENT,
track_id INTEGER,
format TEXT NOT NULL DEFAULT 'visualizer',
status TEXT NOT NULL DEFAULT 'pending',
output_path TEXT NOT NULL DEFAULT '',
output_url TEXT NOT NULL DEFAULT '',
thumbnail_path TEXT NOT NULL DEFAULT '',
target_countries TEXT NOT NULL DEFAULT '[]',
yt_title TEXT NOT NULL DEFAULT '',
yt_description TEXT NOT NULL DEFAULT '',
yt_tags TEXT NOT NULL DEFAULT '[]',
render_params TEXT NOT NULL DEFAULT '{}',
error TEXT,
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
completed_at TEXT
)
""")
conn.execute("CREATE INDEX IF NOT EXISTS idx_vp_track ON video_projects(track_id)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_vp_status ON video_projects(status)")
# ── revenue_records table ────────────────────────────────────────
conn.execute("""
CREATE TABLE IF NOT EXISTS revenue_records (
id INTEGER PRIMARY KEY AUTOINCREMENT,
video_project_id INTEGER,
yt_video_id TEXT NOT NULL DEFAULT '',
record_month TEXT NOT NULL DEFAULT '',
views INTEGER NOT NULL DEFAULT 0,
watch_hours REAL NOT NULL DEFAULT 0.0,
revenue_usd REAL NOT NULL DEFAULT 0.0,
rpm_usd REAL NOT NULL DEFAULT 0.0,
country TEXT NOT NULL DEFAULT '',
source TEXT NOT NULL DEFAULT 'manual',
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
UNIQUE(yt_video_id, record_month, country)
)
""")
conn.execute("CREATE INDEX IF NOT EXISTS idx_rr_month ON revenue_records(record_month DESC)")
# ── market_trends table ──────────────────────────────────────────
conn.execute("""
CREATE TABLE IF NOT EXISTS market_trends (
id INTEGER PRIMARY KEY AUTOINCREMENT,
source TEXT NOT NULL DEFAULT '',
country TEXT NOT NULL DEFAULT '',
genre TEXT NOT NULL DEFAULT '',
keyword TEXT NOT NULL DEFAULT '',
score REAL NOT NULL DEFAULT 0.0,
rank INTEGER,
metadata TEXT NOT NULL DEFAULT '{}',
collected_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
)
""")
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_mt_country_source "
"ON market_trends(country, source, collected_at DESC)"
)
# ── trend_reports table ──────────────────────────────────────────
conn.execute("""
CREATE TABLE IF NOT EXISTS trend_reports (
id INTEGER PRIMARY KEY AUTOINCREMENT,
report_date TEXT UNIQUE NOT NULL DEFAULT '',
top_genres TEXT NOT NULL DEFAULT '[]',
top_keywords TEXT NOT NULL DEFAULT '[]',
recommended_styles TEXT NOT NULL DEFAULT '[]',
insights TEXT NOT NULL DEFAULT '',
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
)
""")
# ── music_tasks CRUD ──────────────────────────────────────────────────────────
@@ -343,3 +417,286 @@ def delete_lyrics(lyrics_id: int) -> bool:
return False
conn.execute("DELETE FROM saved_lyrics WHERE id = ?", (lyrics_id,))
return True
# ── video_projects CRUD ───────────────────────────────────────────────────────
def _vp_row_to_dict(r) -> dict:
return {
"id": r["id"],
"track_id": r["track_id"],
"format": r["format"],
"status": r["status"],
"output_path": r["output_path"],
"output_url": r["output_url"],
"thumbnail_path": r["thumbnail_path"],
"target_countries": json.loads(r["target_countries"]) if r["target_countries"] else [],
"yt_title": r["yt_title"],
"yt_description": r["yt_description"],
"yt_tags": json.loads(r["yt_tags"]) if r["yt_tags"] else [],
"render_params": json.loads(r["render_params"]) if r["render_params"] else {},
"error": r["error"],
"created_at": r["created_at"],
"completed_at": r["completed_at"],
}
def create_video_project(data: dict) -> dict:
with _conn() as conn:
conn.execute(
"""INSERT INTO video_projects (track_id, format, target_countries, render_params)
VALUES (?, ?, ?, ?)""",
(data.get("track_id"), data.get("format", "visualizer"),
json.dumps(data.get("target_countries", [])),
json.dumps(data.get("render_params", {}))),
)
row = conn.execute("SELECT * FROM video_projects WHERE rowid = last_insert_rowid()").fetchone()
return _vp_row_to_dict(row)
def get_video_project(project_id: int) -> Optional[Dict[str, Any]]:
with _conn() as conn:
row = conn.execute("SELECT * FROM video_projects WHERE id = ?", (project_id,)).fetchone()
return _vp_row_to_dict(row) if row else None
def get_all_video_projects() -> list:
with _conn() as conn:
rows = conn.execute("SELECT * FROM video_projects ORDER BY created_at DESC").fetchall()
return [_vp_row_to_dict(r) for r in rows]
def update_video_project_status(
project_id: int,
status: str,
output_path: str = "",
output_url: str = "",
thumbnail_path: str = "",
yt_title: str = "",
yt_description: str = "",
yt_tags: Optional[list] = None,
error: Optional[str] = None,
) -> None:
completed_at_expr = (
"strftime('%Y-%m-%dT%H:%M:%fZ','now')" if status in ("done", "failed") else "NULL"
)
with _conn() as conn:
conn.execute(
f"""UPDATE video_projects
SET status=?, output_path=?, output_url=?, thumbnail_path=?,
yt_title=?, yt_description=?, yt_tags=?, error=?,
completed_at={completed_at_expr}
WHERE id=?""",
(status, output_path, output_url, thumbnail_path,
yt_title, yt_description, json.dumps(yt_tags or []), error, project_id),
)
def delete_video_project(project_id: int) -> bool:
with _conn() as conn:
row = conn.execute("SELECT id FROM video_projects WHERE id = ?", (project_id,)).fetchone()
if not row:
return False
conn.execute("DELETE FROM video_projects WHERE id = ?", (project_id,))
return True
def get_track_by_id(track_id: int) -> Optional[Dict[str, Any]]:
with _conn() as conn:
row = conn.execute("SELECT * FROM music_library WHERE id = ?", (track_id,)).fetchone()
return _track_row_to_dict(row) if row else None
# ── revenue_records CRUD ──────────────────────────────────────────────────────
def _rr_row_to_dict(r) -> dict:
return {
"id": r["id"],
"video_project_id": r["video_project_id"],
"yt_video_id": r["yt_video_id"],
"record_month": r["record_month"],
"views": r["views"],
"watch_hours": r["watch_hours"],
"revenue_usd": r["revenue_usd"],
"rpm_usd": r["rpm_usd"],
"country": r["country"],
"source": r["source"],
"created_at": r["created_at"],
}
def create_revenue_record(data: dict) -> dict:
views = data.get("views", 0)
revenue = data.get("revenue_usd", 0.0)
rpm = round(revenue / views * 1000, 4) if views > 0 else 0.0
with _conn() as conn:
conn.execute(
"""INSERT INTO revenue_records
(video_project_id, yt_video_id, record_month, views, watch_hours,
revenue_usd, rpm_usd, country, source)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(data.get("video_project_id"), data.get("yt_video_id", ""),
data.get("record_month", ""), views, data.get("watch_hours", 0.0),
revenue, rpm, data.get("country", ""), data.get("source", "manual")),
)
row = conn.execute("SELECT * FROM revenue_records WHERE rowid = last_insert_rowid()").fetchone()
return _rr_row_to_dict(row)
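The RPM stored above is revenue per thousand views. A minimal standalone sketch of the same formula (`rpm_usd` here is a hypothetical helper, not part of the module):

```python
# Hypothetical mirror of the RPM arithmetic in create_revenue_record:
# USD earned per 1000 views, rounded to 4 decimal places, 0.0 when there
# are no views (avoids division by zero).
def rpm_usd(revenue_usd: float, views: int) -> float:
    return round(revenue_usd / views * 1000, 4) if views > 0 else 0.0

print(rpm_usd(25.0, 10000))  # 2.5
```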
def get_all_revenue_records(yt_video_id: Optional[str] = None, year_month: Optional[str] = None) -> list:
with _conn() as conn:
q = "SELECT * FROM revenue_records WHERE 1=1"
params: list = []
if yt_video_id:
q += " AND yt_video_id=?"
params.append(yt_video_id)
if year_month:
q += " AND record_month=?"
params.append(year_month)
q += " ORDER BY record_month DESC"
rows = conn.execute(q, params).fetchall()
return [_rr_row_to_dict(r) for r in rows]
def update_revenue_record(record_id: int, data: dict) -> Optional[Dict[str, Any]]:
with _conn() as conn:
row = conn.execute("SELECT * FROM revenue_records WHERE id = ?", (record_id,)).fetchone()
if not row:
return None
cur = _rr_row_to_dict(row)
views = data.get("views", cur["views"])
revenue = data.get("revenue_usd", cur["revenue_usd"])
rpm = round(revenue / views * 1000, 4) if views > 0 else 0.0
conn.execute(
"""UPDATE revenue_records
SET yt_video_id=?, record_month=?, views=?, watch_hours=?,
revenue_usd=?, rpm_usd=?, country=?, source=?
WHERE id=?""",
(data.get("yt_video_id", cur["yt_video_id"]),
data.get("record_month", cur["record_month"]),
views, data.get("watch_hours", cur["watch_hours"]),
revenue, rpm,
data.get("country", cur["country"]),
data.get("source", cur["source"]),
record_id),
)
row = conn.execute("SELECT * FROM revenue_records WHERE id = ?", (record_id,)).fetchone()
return _rr_row_to_dict(row)
def delete_revenue_record(record_id: int) -> bool:
with _conn() as conn:
row = conn.execute("SELECT id FROM revenue_records WHERE id = ?", (record_id,)).fetchone()
if not row:
return False
conn.execute("DELETE FROM revenue_records WHERE id = ?", (record_id,))
return True
def get_revenue_dashboard() -> dict:
with _conn() as conn:
total = conn.execute(
"SELECT SUM(revenue_usd) as total, SUM(views) as views, SUM(watch_hours) as hours FROM revenue_records"
).fetchone()
by_month = conn.execute(
"""SELECT record_month, SUM(revenue_usd) as revenue, SUM(views) as views,
CASE WHEN SUM(views) > 0 THEN ROUND(SUM(revenue_usd) / SUM(views) * 1000, 4) ELSE 0.0 END as avg_rpm FROM revenue_records
GROUP BY record_month ORDER BY record_month DESC LIMIT 12"""
).fetchall()
by_country = conn.execute(
"""SELECT country, SUM(revenue_usd) as revenue, SUM(views) as views
FROM revenue_records WHERE country != ''
GROUP BY country ORDER BY revenue DESC LIMIT 10"""
).fetchall()
return {
"total_revenue_usd": total["total"] or 0.0,
"total_views": total["views"] or 0,
"total_watch_hours": total["hours"] or 0.0,
"by_month": [dict(r) for r in by_month],
"by_country": [dict(r) for r in by_country],
}
# ── market_trends CRUD ────────────────────────────────────────────────────────
def insert_market_trends(trends: list) -> None:
with _conn() as conn:
conn.executemany(
"""INSERT INTO market_trends (source, country, genre, keyword, score, rank, metadata)
VALUES (?, ?, ?, ?, ?, ?, ?)""",
[(t.get("source",""), t.get("country",""), t.get("genre",""),
t.get("keyword",""), t.get("score", 0.0), t.get("rank"),
json.dumps(t.get("metadata", {})))
for t in trends],
)
def get_market_trends(
country: Optional[str] = None, genre: Optional[str] = None, source: Optional[str] = None, days: int = 7
) -> list:
with _conn() as conn:
q = "SELECT * FROM market_trends WHERE collected_at >= datetime('now', ?)"
params: list = [f"-{days} days"]
if country:
q += " AND country=?"; params.append(country)
if genre:
q += " AND genre=?"; params.append(genre)
if source:
q += " AND source=?"; params.append(source)
q += " ORDER BY collected_at DESC LIMIT 500"
rows = conn.execute(q, params).fetchall()
return [
{"id": r["id"], "source": r["source"], "country": r["country"],
"genre": r["genre"], "keyword": r["keyword"], "score": r["score"],
"rank": r["rank"], "metadata": json.loads(r["metadata"]),
"collected_at": r["collected_at"]}
for r in rows
]
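The `collected_at` cutoff above relies on SQLite's `datetime()` accepting relative modifier strings like `'-7 days'`. A minimal demonstration of that modifier, independent of the app's schema:

```python
# SQLite's datetime() applies modifiers such as '-7 days' to a base
# timestamp; get_market_trends binds the modifier as a parameter.
import sqlite3

conn = sqlite3.connect(":memory:")
cutoff = conn.execute(
    "SELECT datetime('2026-05-01 09:00:00', ?)", ("-7 days",)
).fetchone()[0]
print(cutoff)  # 2026-04-24 09:00:00
```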
# ── trend_reports CRUD ────────────────────────────────────────────────────────
def upsert_trend_report(data: dict) -> None:
with _conn() as conn:
conn.execute(
"""INSERT INTO trend_reports
(report_date, top_genres, top_keywords, recommended_styles, insights)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT(report_date) DO UPDATE SET
top_genres=excluded.top_genres,
top_keywords=excluded.top_keywords,
recommended_styles=excluded.recommended_styles,
insights=excluded.insights""",
(data["report_date"], json.dumps(data["top_genres"]),
json.dumps(data["top_keywords"]), json.dumps(data["recommended_styles"]),
data["insights"]),
)
def get_latest_trend_report() -> Optional[Dict[str, Any]]:
with _conn() as conn:
row = conn.execute(
"SELECT * FROM trend_reports ORDER BY report_date DESC LIMIT 1"
).fetchone()
if not row:
return None
return {
"id": row["id"],
"report_date": row["report_date"],
"top_genres": json.loads(row["top_genres"]),
"top_keywords": json.loads(row["top_keywords"]),
"recommended_styles": json.loads(row["recommended_styles"]),
"insights": row["insights"],
"created_at": row["created_at"],
}
def get_trend_reports(limit: int = 10) -> list:
with _conn() as conn:
rows = conn.execute(
"SELECT id, report_date, insights, created_at FROM trend_reports "
"ORDER BY report_date DESC LIMIT ?", (limit,)
).fetchall()
return [{"id": r["id"], "report_date": r["report_date"],
"insights": r["insights"][:100], "created_at": r["created_at"]}
for r in rows]


@@ -1,17 +1,26 @@
import json
import os
import shutil
import uuid
from typing import Any, Dict, List, Optional
from fastapi import FastAPI, HTTPException, BackgroundTasks, Query
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from .db import (
init_db,
create_task, get_task,
get_all_tracks, add_track, delete_track, get_track_file_path, get_track_by_task_id, get_track_by_id,
update_track_duration, update_track_file_info, update_track_hash,
get_all_lyrics, add_lyrics, update_lyrics, delete_lyrics,
create_video_project, get_video_project, get_all_video_projects,
update_video_project_status, delete_video_project,
create_revenue_record, get_all_revenue_records,
update_revenue_record, delete_revenue_record, get_revenue_dashboard,
get_market_trends as _get_market_trends,
get_latest_trend_report, get_trend_reports as _get_trend_reports,
)
from .market import ingest_trends, get_suggestions
from .local_provider import run_local_generation
from .suno_provider import (
run_suno_generation, run_suno_extend, run_vocal_removal,
@@ -33,6 +42,7 @@ app.add_middleware(
)
MUSIC_DATA_DIR = "/app/data"
VIDEO_DATA_DIR = os.getenv("VIDEO_DATA_DIR", "/app/data/videos")
def _get_mp3_duration(file_path: str) -> Optional[int]:
@@ -669,3 +679,182 @@ def remove_lyrics(lyrics_id: int):
if not delete_lyrics(lyrics_id):
raise HTTPException(status_code=404, detail="Lyrics not found")
return {"ok": True}
# ── Video project models ──────────────────────────────────────────────────────
class VideoProjectCreate(BaseModel):
track_id: int
format: str = "visualizer"
target_countries: List[str] = []
render_params: dict = {}
class RevenueCreate(BaseModel):
video_project_id: Optional[int] = None
yt_video_id: str = ""
record_month: str
views: int = 0
watch_hours: float = 0.0
revenue_usd: float = 0.0
country: str = ""
source: str = "manual"
class RevenueUpdate(BaseModel):
yt_video_id: Optional[str] = None
record_month: Optional[str] = None
views: Optional[int] = None
watch_hours: Optional[float] = None
revenue_usd: Optional[float] = None
country: Optional[str] = None
source: Optional[str] = None
# ── Video project API ─────────────────────────────────────────────────────────
@app.post("/api/music/video-project", status_code=201)
def create_project(req: VideoProjectCreate, background_tasks: BackgroundTasks):
if not get_track_by_id(req.track_id):
raise HTTPException(status_code=404, detail="Track not found")
if req.format not in ("visualizer", "slideshow"):
raise HTTPException(status_code=400, detail="format must be 'visualizer' or 'slideshow'")
proj = create_video_project(req.model_dump())
return proj
@app.get("/api/music/video-projects")
def list_projects():
return {"projects": get_all_video_projects()}
@app.get("/api/music/video-project/{project_id}")
def get_project(project_id: int):
proj = get_video_project(project_id)
if not proj:
raise HTTPException(status_code=404, detail="Project not found")
return proj
@app.post("/api/music/video-project/{project_id}/render")
def render_project(project_id: int, background_tasks: BackgroundTasks):
proj = get_video_project(project_id)
if not proj:
raise HTTPException(status_code=404, detail="Project not found")
if proj["status"] == "rendering":
raise HTTPException(status_code=409, detail="Already rendering")
from .video_producer import produce_video
background_tasks.add_task(produce_video, project_id)
return {"ok": True, "project_id": project_id, "status": "rendering"}
@app.get("/api/music/video-project/{project_id}/export")
def export_project(project_id: int):
proj = get_video_project(project_id)
if not proj:
raise HTTPException(status_code=404, detail="Project not found")
if proj["status"] != "done":
raise HTTPException(status_code=400, detail=f"Rendering not finished (status: {proj['status']})")
meta_path = os.path.join(VIDEO_DATA_DIR, str(project_id), "metadata.json")
metadata = {}
if os.path.exists(meta_path):
with open(meta_path, encoding="utf-8") as f:
metadata = json.load(f)
thumb_url = proj["output_url"].replace("output.mp4", "thumbnail.jpg") if proj["output_url"] else ""
return {
"project_id": project_id,
"output_url": proj["output_url"],
"thumbnail_url": thumb_url,
"yt_title": proj["yt_title"],
"yt_description": proj["yt_description"],
"yt_tags": proj["yt_tags"],
"metadata": metadata,
}
@app.delete("/api/music/video-project/{project_id}")
def delete_project(project_id: int):
if not get_video_project(project_id):
raise HTTPException(status_code=404, detail="Project not found")
out_dir = os.path.join(VIDEO_DATA_DIR, str(project_id))
if os.path.isdir(out_dir):
shutil.rmtree(out_dir, ignore_errors=True)
delete_video_project(project_id)
return {"ok": True}
# ── Revenue tracking API ──────────────────────────────────────────────────────
@app.get("/api/music/revenue/dashboard")
def revenue_dashboard():
return get_revenue_dashboard()
@app.get("/api/music/revenue")
def list_revenue(yt_video_id: Optional[str] = None, year_month: Optional[str] = None):
return {"records": get_all_revenue_records(yt_video_id, year_month)}
@app.post("/api/music/revenue", status_code=201)
def add_revenue(req: RevenueCreate):
return create_revenue_record(req.model_dump())
@app.put("/api/music/revenue/{record_id}")
def edit_revenue(record_id: int, req: RevenueUpdate):
data = {k: v for k, v in req.model_dump().items() if v is not None}
result = update_revenue_record(record_id, data)
if not result:
raise HTTPException(status_code=404, detail="Record not found")
return result
@app.delete("/api/music/revenue/{record_id}")
def remove_revenue(record_id: int):
if not delete_revenue_record(record_id):
raise HTTPException(status_code=404, detail="Record not found")
return {"ok": True}
# ── Market research API ───────────────────────────────────────────────────────
class MarketIngestRequest(BaseModel):
trends: List[Dict[str, Any]]
report_date: str = ""
@app.post("/api/music/market/ingest")
def market_ingest(req: MarketIngestRequest):
"""agent-office → 트렌드 데이터 수신 + 리포트 생성."""
from datetime import date
report_date = req.report_date or date.today().isoformat()
report = ingest_trends(req.trends, report_date)
return {"ok": True, "trends_saved": len(req.trends), "report_date": report_date}
@app.get("/api/music/market/trends")
def list_market_trends(
country: Optional[str] = None,
genre: Optional[str] = None,
source: Optional[str] = None,
days: int = Query(7, ge=1),
):
return {"trends": _get_market_trends(country, genre, source, days)}
@app.get("/api/music/market/report/latest")
def get_market_report_latest():
report = get_latest_trend_report()
if not report:
raise HTTPException(status_code=404, detail="No report yet; trends have not been collected")
return report
@app.get("/api/music/market/report")
def list_market_reports(limit: int = 10):
return {"reports": _get_trend_reports(limit)}
@app.get("/api/music/market/suggest")
def market_suggest(limit: int = 5):
return {"suggestions": get_suggestions(limit)}

music-lab/app/market.py Normal file

@@ -0,0 +1,101 @@
# music-lab/app/market.py
import os
from collections import Counter, defaultdict
from typing import Any, Dict, List, Optional
from .db import (
get_latest_trend_report, get_trend_reports,
insert_market_trends, upsert_trend_report,
)
GENRE_PROMPTS: Dict[str, str] = {
"lo-fi": "lo-fi hip hop, chill, relaxing beats, study music, 85 BPM, jazzy chords",
"phonk": "dark phonk, aggressive 808 bass, Memphis trap, distorted synths, 140 BPM",
"ambient": "ambient, atmospheric, ethereal pads, slow evolving textures, no percussion",
"pop": "upbeat pop, catchy melody, modern production, 120 BPM",
"funk": "baile funk, Brazilian funk, energetic, 150 BPM",
"latin": "reggaeton, latin pop, dembow rhythm, 100 BPM",
"general": "music, modern production, wide appeal",
}
def ingest_trends(trends: List[Dict[str, Any]], report_date: str) -> Dict[str, Any]:
"""agent-office 트렌드 수신 → 저장 + 리포트 생성."""
insert_market_trends(trends)
report = _build_report(trends, report_date)
upsert_trend_report(report)
return report
def _build_report(trends: List[Dict[str, Any]], report_date: str) -> Dict[str, Any]:
genre_scores: Dict[str, float] = defaultdict(float)
genre_countries: Dict[str, set] = defaultdict(set)
keywords: List[str] = []
for t in trends:
g = t.get("genre") or "general"
genre_scores[g] += t.get("score", 0.0)
genre_countries[g].add(t.get("country", ""))
kw = t.get("keyword", "")
if kw:
keywords.append(kw)
top_genres = sorted(
[{"genre": g, "score": round(s, 3), "countries": list(genre_countries[g])}
for g, s in genre_scores.items()],
key=lambda x: x["score"], reverse=True,
)[:10]
kw_counts = Counter(keywords)
top_keywords = [kw for kw, _ in kw_counts.most_common(15)]
recommended_styles = [
{
"genre": g["genre"],
"suno_prompt": GENRE_PROMPTS.get(g["genre"], GENRE_PROMPTS["general"]),
"target_countries": g["countries"][:3],
"reason": f"트렌딩 score {g['score']:.2f}",
}
for g in top_genres[:5]
]
return {
"report_date": report_date,
"top_genres": top_genres,
"top_keywords": top_keywords,
"recommended_styles": recommended_styles,
"insights": _generate_insights(top_genres, top_keywords),
}
def _generate_insights(top_genres: list, top_keywords: list) -> str:
api_key = os.getenv("ANTHROPIC_API_KEY", "")
if not top_genres:
return "아직 수집된 트렌드 데이터가 없습니다."
if not api_key:
names = ", ".join(g["genre"] for g in top_genres[:3])
return f"이번 주 인기 장르: {names}. 해당 장르 중심 제작을 추천합니다."
import anthropic
client = anthropic.Anthropic(api_key=api_key)
genre_str = ", ".join(f"{g['genre']}({g['score']:.1f})" for g in top_genres[:5])
kw_str = ", ".join(top_keywords[:10])
try:
msg = client.messages.create(
model="claude-haiku-4-5-20251001",
max_tokens=300,
messages=[{"role": "user", "content":
f"YouTube 음악 트렌드 인사이트를 2-3문장으로 요약.\n"
f"인기 장르: {genre_str}\n인기 키워드: {kw_str}"}],
)
return msg.content[0].text.strip()
except Exception:
names = ", ".join(g["genre"] for g in top_genres[:3])
return f"인기 장르: {names}."
def get_suggestions(limit: int = 5) -> List[Dict[str, Any]]:
report = get_latest_trend_report()
if not report:
return []
return report.get("recommended_styles", [])[:limit]


@@ -0,0 +1,256 @@
import json
import os
import subprocess
from typing import Optional
import requests
from .db import get_video_project, get_track_by_id, update_video_project_status
VIDEO_DATA_DIR = os.getenv("VIDEO_DATA_DIR", "/app/data/videos")
VIDEO_MEDIA_BASE = os.getenv("VIDEO_MEDIA_BASE", "/media/videos")
PEXELS_API_KEY = os.getenv("PEXELS_API_KEY", "")
ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY", "")
GENRE_COLORS = {
"lo-fi": ((26, 26, 46), (22, 33, 62)),
"phonk": ((26, 10, 10), (45, 0, 0)),
"ambient": ((13, 33, 55), (10, 22, 40)),
"pop": ((26, 10, 46), (45, 27, 78)),
"default": ((17, 24, 39), (31, 41, 55)),
}
def _make_gradient_bg(width: int, height: int, genre: str, output_path: str) -> None:
from PIL import Image
top_rgb, bot_rgb = GENRE_COLORS.get(genre.lower(), GENRE_COLORS["default"])
img = Image.new("RGB", (width, height))
pixels = img.load()
for y in range(height):
t = y / height
r = int(top_rgb[0] + (bot_rgb[0] - top_rgb[0]) * t)
g = int(top_rgb[1] + (bot_rgb[1] - top_rgb[1]) * t)
b = int(top_rgb[2] + (bot_rgb[2] - top_rgb[2]) * t)
for x in range(width):
pixels[x, y] = (r, g, b)
img.save(output_path, "JPEG", quality=95)
def _build_visualizer_cmd(audio_path: str, bg_path: str, output_path: str) -> list:
return [
"ffmpeg", "-y",
"-loop", "1", "-i", bg_path,
"-i", audio_path,
"-filter_complex",
"[1:a]showwaves=s=1920x200:mode=cline:colors=0xFF4444@0.8[wave];"
"[0:v][wave]overlay=0:880[out]",
"-map", "[out]", "-map", "1:a",
"-c:v", "libx264", "-preset", "fast", "-crf", "23",
"-c:a", "aac", "-b:a", "192k",
"-shortest", output_path,
]
def _build_thumbnail_cmd(video_path: str, thumb_path: str) -> list:
return [
"ffmpeg", "-y",
"-i", video_path,
"-ss", "00:00:05",
"-vframes", "1",
"-q:v", "2",
thumb_path,
]
def _build_slideshow_cmd(
image_paths: list, audio_path: str, output_path: str, duration_per_image: float
) -> list:
n = len(image_paths)
inputs = []
for p in image_paths:
inputs += ["-i", p]
inputs += ["-i", audio_path]
scale = (
"scale=1920:1080:force_original_aspect_ratio=decrease,"
"pad=1920:1080:(ow-iw)/2:(oh-ih)/2,setsar=1"
)
filter_parts = [f"[{i}:v]{scale}[v{i}]" for i in range(n)]
xd = 1.0
if n == 1:
filter_str = ";".join(filter_parts) + ";[v0]copy[out]"
else:
filter_str = ";".join(filter_parts)
prev = "v0"
for i in range(1, n):
offset = max(0.0, duration_per_image * i - xd * i)
nxt = "out" if i == n - 1 else f"xf{i}"
filter_str += (
f";[{prev}][v{i}]xfade=transition=fade:"
f"duration={xd}:offset={offset:.2f}[{nxt}]"
)
prev = nxt
return [
"ffmpeg", "-y",
*inputs,
"-filter_complex", filter_str,
"-map", "[out]", "-map", f"{n}:a",
"-c:v", "libx264", "-preset", "fast", "-crf", "23",
"-c:a", "aac", "-b:a", "192k",
"-shortest", output_path,
]
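The xfade offset above starts each transition `xd` seconds before the naive slide boundary, so offsets grow by `duration_per_image - xd` per image. A hypothetical helper mirroring just that arithmetic:

```python
# Mirrors the offset math in _build_slideshow_cmd: transition i begins at
# i * (duration_per_image - xd), clamped at zero for very short slides.
def xfade_offsets(n_images: int, duration_per_image: float, xd: float = 1.0) -> list:
    return [max(0.0, duration_per_image * i - xd * i) for i in range(1, n_images)]

print(xfade_offsets(3, 5.0))  # [4.0, 8.0]
```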
def _fetch_pexels_images(keywords: list, count: int = 5) -> list:
if not PEXELS_API_KEY or not keywords:
return []
query = " ".join(k for k in keywords if k)[:60]
try:
resp = requests.get(
"https://api.pexels.com/v1/search",
headers={"Authorization": PEXELS_API_KEY},
params={"query": query, "per_page": count, "orientation": "landscape"},
timeout=10,
)
if resp.status_code != 200:
return []
return [p["src"]["large2x"] for p in resp.json().get("photos", [])]
except Exception:
return []
def _download_url(url: str, dest_path: str) -> bool:
try:
resp = requests.get(url, timeout=30, stream=True)
resp.raise_for_status()
with open(dest_path, "wb") as f:
for chunk in resp.iter_content(8192):
f.write(chunk)
return True
except Exception:
return False
def _generate_metadata(genre: str, moods: list, lyrics: str, target_countries: list) -> dict:
if not ANTHROPIC_API_KEY:
tags = [genre] + moods[:3] if genre else moods[:3]
return {"yt_title": f"{genre or 'Chill'} Music", "yt_description": "", "yt_tags": tags}
import anthropic
client = anthropic.Anthropic(api_key=ANTHROPIC_API_KEY)
countries_str = ", ".join(target_countries) if target_countries else "global"
prompt = (
f"YouTube 음악 영상 메타데이터를 JSON으로 생성해주세요.\n"
f"장르: {genre}\n분위기: {', '.join(moods)}\n"
f"가사 일부: {lyrics[:200] if lyrics else '인스트루멘탈'}\n"
f"타겟 국가: {countries_str}\n\n"
'{"yt_title":"제목(최대100자,SEO최적화)","yt_description":"설명(500자이내,해시태그포함)",'
'"yt_tags":["태그1",...]} 형식으로만 응답.'
)
try:
msg = client.messages.create(
model="claude-haiku-4-5-20251001",
max_tokens=1024,
messages=[{"role": "user", "content": prompt}],
)
text = msg.content[0].text
start, end = text.find("{"), text.rfind("}") + 1
return json.loads(text[start:end])
except Exception:
return {"yt_title": f"{genre or 'Music'} - Chill Beats", "yt_description": "", "yt_tags": [genre] if genre else []}
def _render_visualizer(track: dict, proj: dict, output_path: str) -> None:
out_dir = os.path.dirname(output_path)
bg_path = os.path.join(out_dir, "bg.jpg")
cover_images = track.get("cover_images") or []
if cover_images:
ok = _download_url(cover_images[0], bg_path)
if not ok:
cover_images = []
if not cover_images:
_make_gradient_bg(1920, 1080, track.get("genre", "default"), bg_path)
cmd = _build_visualizer_cmd(track["file_path"], bg_path, output_path)
subprocess.run(cmd, check=True, capture_output=True)
def _render_slideshow(track: dict, proj: dict, output_path: str) -> None:
out_dir = os.path.dirname(output_path)
img_dir = os.path.join(out_dir, "imgs")
os.makedirs(img_dir, exist_ok=True)
moods = track.get("moods") or []
genre = track.get("genre", "")
keywords = [genre] + moods[:2] if genre else moods[:3]
pexels_urls = _fetch_pexels_images(keywords, count=5)
suno_cover_urls = track.get("cover_images") or []
all_urls = pexels_urls + suno_cover_urls[:2]
img_paths = []
for i, url in enumerate(all_urls):
dest = os.path.join(img_dir, f"img_{i:02d}.jpg")
if url and _download_url(url, dest):
img_paths.append(dest)
if not img_paths:
bg = os.path.join(img_dir, "bg_fallback.jpg")
_make_gradient_bg(1920, 1080, genre or "default", bg)
img_paths = [bg]
duration = track.get("duration_sec") or 180
dur_per_img = max(3.0, duration / len(img_paths))
cmd = _build_slideshow_cmd(img_paths, track["file_path"], output_path, dur_per_img)
subprocess.run(cmd, check=True, capture_output=True)
def produce_video(project_id: int) -> None:
proj = get_video_project(project_id)
if not proj:
return
update_video_project_status(project_id, "rendering")
try:
track = get_track_by_id(proj["track_id"])
if not track or not track.get("file_path"):
raise ValueError(f"트랙 파일 없음 (track_id={proj['track_id']})")
out_dir = os.path.join(VIDEO_DATA_DIR, str(project_id))
os.makedirs(out_dir, exist_ok=True)
output_path = os.path.join(out_dir, "output.mp4")
if proj["format"] == "visualizer":
_render_visualizer(track, proj, output_path)
elif proj["format"] == "slideshow":
_render_slideshow(track, proj, output_path)
else:
raise ValueError(f"Unknown format: {proj['format']}")
thumb_path = os.path.join(out_dir, "thumbnail.jpg")
subprocess.run(_build_thumbnail_cmd(output_path, thumb_path), check=True, capture_output=True)
meta = _generate_metadata(
genre=track.get("genre", ""),
moods=track.get("moods") or [],
lyrics=track.get("lyrics", ""),
target_countries=proj.get("target_countries", []),
)
with open(os.path.join(out_dir, "metadata.json"), "w", encoding="utf-8") as f:
json.dump(meta, f, ensure_ascii=False, indent=2)
update_video_project_status(
project_id, "done",
output_path=output_path,
output_url=f"{VIDEO_MEDIA_BASE}/{project_id}/output.mp4",
thumbnail_path=thumb_path,
yt_title=meta.get("yt_title", ""),
yt_description=meta.get("yt_description", ""),
yt_tags=meta.get("yt_tags", []),
)
except Exception as e:
update_video_project_status(project_id, "failed", error=str(e))

music-lab/pytest.ini Normal file

@@ -0,0 +1,3 @@
[pytest]
testpaths = tests
pythonpath = .


@@ -3,3 +3,7 @@ uvicorn[standard]==0.30.6
requests==2.32.3 requests==2.32.3
python-multipart==0.0.12 python-multipart==0.0.12
mutagen==1.47.0 mutagen==1.47.0
anthropic>=0.40.0
Pillow>=11.0.0
pytest>=8.0.0
httpx>=0.27.0


@@ -0,0 +1,7 @@
import pytest
@pytest.fixture
def tmp_db(tmp_path, monkeypatch):
db_path = str(tmp_path / "test_music.db")
monkeypatch.setattr("app.db.DB_PATH", db_path)
return db_path


@@ -0,0 +1,125 @@
import pytest
def test_create_and_get_video_project(tmp_db):
from app.db import init_db, create_video_project, get_video_project
init_db()
proj = create_video_project({"track_id": 1, "format": "visualizer", "target_countries": ["BR", "ID"]})
assert proj["id"] == 1
assert proj["format"] == "visualizer"
assert proj["status"] == "pending"
assert "BR" in proj["target_countries"]
fetched = get_video_project(1)
assert fetched["id"] == 1
assert fetched["track_id"] == 1
def test_update_video_project_status(tmp_db):
from app.db import init_db, create_video_project, update_video_project_status, get_video_project
init_db()
create_video_project({"track_id": 2, "format": "slideshow"})
update_video_project_status(
1, "done",
output_path="/data/videos/1/output.mp4",
output_url="/media/videos/1/output.mp4",
thumbnail_path="/data/videos/1/thumbnail.jpg",
yt_title="Chill Beats Brazil",
yt_description="relaxing lofi",
yt_tags=["lofi", "chill"],
)
proj = get_video_project(1)
assert proj["status"] == "done"
assert proj["yt_title"] == "Chill Beats Brazil"
assert "lofi" in proj["yt_tags"]
assert proj["completed_at"] is not None
def test_delete_video_project(tmp_db):
from app.db import init_db, create_video_project, delete_video_project, get_video_project
init_db()
create_video_project({"track_id": 1, "format": "visualizer"})
assert delete_video_project(1) is True
assert get_video_project(1) is None
assert delete_video_project(99) is False
def test_create_revenue_record(tmp_db):
from app.db import init_db, create_revenue_record, get_all_revenue_records
init_db()
rec = create_revenue_record({
"yt_video_id": "abc123",
"record_month": "2026-04",
"views": 10000,
"watch_hours": 500.0,
"revenue_usd": 25.0,
"country": "BR",
})
assert rec["id"] == 1
assert rec["rpm_usd"] == pytest.approx(2.5)
records = get_all_revenue_records(yt_video_id="abc123")
assert len(records) == 1
def test_revenue_dashboard(tmp_db):
from app.db import init_db, create_revenue_record, get_revenue_dashboard
init_db()
create_revenue_record({"yt_video_id": "v1", "record_month": "2026-04", "views": 5000, "revenue_usd": 10.0})
create_revenue_record({"yt_video_id": "v2", "record_month": "2026-04", "views": 5000, "revenue_usd": 15.0})
dash = get_revenue_dashboard()
assert dash["total_revenue_usd"] == pytest.approx(25.0)
assert dash["total_views"] == 10000
assert len(dash["by_month"]) == 1
def test_create_revenue_record_zero_views(tmp_db):
from app.db import init_db, create_revenue_record
init_db()
rec = create_revenue_record({
"yt_video_id": "zero",
"record_month": "2026-04",
"views": 0,
"revenue_usd": 0.0,
})
assert rec["rpm_usd"] == 0.0
def test_update_revenue_record(tmp_db):
from app.db import init_db, create_revenue_record, update_revenue_record
init_db()
create_revenue_record({"yt_video_id": "x", "record_month": "2026-04", "views": 1000, "revenue_usd": 5.0})
updated = update_revenue_record(1, {"views": 2000, "revenue_usd": 8.0})
assert updated["rpm_usd"] == pytest.approx(4.0)
assert update_revenue_record(999, {}) is None
def test_delete_revenue_record(tmp_db):
from app.db import init_db, create_revenue_record, delete_revenue_record, get_all_revenue_records
init_db()
create_revenue_record({"yt_video_id": "del", "record_month": "2026-04", "views": 100, "revenue_usd": 1.0})
assert delete_revenue_record(1) is True
assert delete_revenue_record(99) is False
assert get_all_revenue_records(yt_video_id="del") == []
def test_video_project_status_failed(tmp_db):
from app.db import init_db, create_video_project, update_video_project_status, get_video_project
init_db()
create_video_project({"track_id": 1, "format": "visualizer"})
update_video_project_status(1, "failed", error="FFmpeg error")
proj = get_video_project(1)
assert proj["status"] == "failed"
assert proj["completed_at"] is not None
assert proj["error"] == "FFmpeg error"
def test_video_project_status_rendering_no_completed_at(tmp_db):
from app.db import init_db, create_video_project, update_video_project_status, get_video_project
init_db()
create_video_project({"track_id": 1, "format": "visualizer"})
update_video_project_status(1, "rendering")
proj = get_video_project(1)
assert proj["status"] == "rendering"
assert proj["completed_at"] is None
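The `rpm_usd` assertions above imply a revenue-per-mille formula. A minimal sketch of that calculation (the real logic lives in `app.db`'s `create_revenue_record`/`update_revenue_record`; `compute_rpm` is a hypothetical name):

```python
def compute_rpm(views: int, revenue_usd: float) -> float:
    """RPM = revenue earned per 1,000 views; 0.0 when there are no views."""
    if views == 0:
        # Guard matches test_create_revenue_record_zero_views:
        # no views must yield 0.0, not a ZeroDivisionError.
        return 0.0
    return revenue_usd / views * 1000.0

# Consistent with test_update_revenue_record: 8.0 USD over 2000 views -> 4.0 RPM.
```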


@@ -0,0 +1,99 @@
# music-lab/tests/test_market.py
def test_ingest_and_report(tmp_db):
from app.db import init_db
from app.market import ingest_trends, get_suggestions
init_db()
trends = [
{"source": "youtube", "country": "BR", "genre": "lo-fi", "keyword": "lofi study", "score": 0.9, "rank": 1, "metadata": {}},
{"source": "youtube", "country": "ID", "genre": "pop", "keyword": "pop hits", "score": 0.7, "rank": 2, "metadata": {}},
{"source": "billboard", "country": "US", "genre": "pop", "keyword": "top 40", "score": 0.8, "rank": 1, "metadata": {}},
]
report = ingest_trends(trends, "2026-05-01")
assert report["report_date"] == "2026-05-01"
assert len(report["top_genres"]) >= 2
    # pop (two entries: 0.7 + 0.8) should outscore lo-fi (one entry: 0.9)
genres_by_score = [g["genre"] for g in report["top_genres"]]
assert genres_by_score[0] == "pop"
suggestions = get_suggestions(limit=3)
assert len(suggestions) >= 1
assert "suno_prompt" in suggestions[0]
def test_ingest_idempotent(tmp_db):
    """Ingesting twice on the same day should upsert the report, not duplicate it."""
from app.db import init_db, get_trend_reports
from app.market import ingest_trends
init_db()
trends = [{"source": "youtube", "country": "BR", "genre": "lo-fi",
"keyword": "chill", "score": 0.8, "rank": 1, "metadata": {}}]
ingest_trends(trends, "2026-05-01")
    ingest_trends(trends, "2026-05-01")  # second ingest for the same date
reports = get_trend_reports()
assert len([r for r in reports if r["report_date"] == "2026-05-01"]) == 1
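The ordering asserted in `test_ingest_and_report` relies on scores being summed per genre, so two mid-scoring pop entries outrank a single high-scoring lo-fi entry. A sketch of that aggregation under this assumption (`rank_genres` is a hypothetical name; the real report builder lives in `app.market`):

```python
from collections import defaultdict

def rank_genres(trends: list[dict]) -> list[str]:
    # Sum scores per genre, then sort genres by total score, highest first.
    totals: dict[str, float] = defaultdict(float)
    for t in trends:
        totals[t["genre"]] += t["score"]
    return sorted(totals, key=lambda g: totals[g], reverse=True)

# With the fixture data above: pop = 0.7 + 0.8 = 1.5 beats lo-fi = 0.9.
```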
from fastapi.testclient import TestClient
def test_market_endpoints_empty(tmp_db):
"""Empty DB: /report/latest returns 404, /suggest returns []."""
from app.db import init_db
init_db()
from app.main import app
client = TestClient(app)
resp = client.get("/api/music/market/report/latest")
assert resp.status_code == 404
resp = client.get("/api/music/market/suggest")
assert resp.status_code == 200
assert resp.json()["suggestions"] == []
def test_market_ingest_endpoint(tmp_db):
"""POST /ingest returns ok, GET /report/latest returns report, GET /trends returns data."""
from app.db import init_db
init_db()
from app.main import app
client = TestClient(app)
payload = {
"trends": [
{"source": "youtube", "country": "BR", "genre": "lo-fi",
"keyword": "lofi", "score": 0.9, "rank": 1, "metadata": {}},
],
"report_date": "2026-05-01",
}
resp = client.post("/api/music/market/ingest", json=payload)
assert resp.status_code == 200
data = resp.json()
assert data["ok"] is True
assert data["trends_saved"] == 1
assert data["report_date"] == "2026-05-01"
resp = client.get("/api/music/market/report/latest")
assert resp.status_code == 200
assert resp.json()["report_date"] == "2026-05-01"
resp = client.get("/api/music/market/trends")
assert resp.status_code == 200
assert len(resp.json()["trends"]) == 1
def test_ingest_empty_trends(tmp_db):
"""Empty trends list ingests without error and returns a well-formed report."""
from app.db import init_db
from app.market import ingest_trends
init_db()
report = ingest_trends([], "2026-05-02")
assert report["report_date"] == "2026-05-02"
assert report["top_genres"] == []
assert report["recommended_styles"] == []
assert isinstance(report["insights"], str)
assert len(report["insights"]) > 0


@@ -0,0 +1,111 @@
# music-lab/tests/test_video_producer.py
import os
from unittest.mock import patch, MagicMock
def test_build_visualizer_cmd():
from app.video_producer import _build_visualizer_cmd
cmd = _build_visualizer_cmd(
audio_path="/data/music/test.mp3",
bg_path="/tmp/bg.jpg",
output_path="/data/videos/1/output.mp4",
)
assert cmd[0] == "ffmpeg"
assert "/data/music/test.mp3" in cmd
assert "/data/videos/1/output.mp4" in cmd
assert any("showwaves" in str(c) for c in cmd)
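For reference, an FFmpeg invocation that would satisfy these assertions might look like the following. The actual filter graph in `app.video_producer` is not shown in this diff, so the `showwaves` parameters and stream mapping below are illustrative only:

```python
def build_visualizer_cmd(audio_path: str, bg_path: str, output_path: str) -> list[str]:
    # Loop the still background, overlay a showwaves waveform of the audio,
    # and stop encoding when the (shorter) audio stream ends.
    filter_complex = (
        "[1:a]showwaves=s=1920x400:mode=cline:colors=white[waves];"
        "[0:v][waves]overlay=0:H-h[v]"
    )
    return [
        "ffmpeg", "-y",
        "-loop", "1", "-i", bg_path,
        "-i", audio_path,
        "-filter_complex", filter_complex,
        "-map", "[v]", "-map", "1:a",
        "-shortest", output_path,
    ]
```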
def test_make_gradient_bg_uses_pillow(tmp_path):
from app.video_producer import _make_gradient_bg
out = str(tmp_path / "bg.jpg")
_make_gradient_bg(1920, 1080, "lo-fi", out)
assert os.path.exists(out)
assert os.path.getsize(out) > 0
def test_extract_thumbnail_cmd():
from app.video_producer import _build_thumbnail_cmd
cmd = _build_thumbnail_cmd("/data/videos/1/output.mp4", "/data/videos/1/thumbnail.jpg")
assert cmd[0] == "ffmpeg"
assert "00:00:05" in cmd
assert "/data/videos/1/thumbnail.jpg" in cmd
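A command shape consistent with these assertions (a sketch; the real `_build_thumbnail_cmd` may use different flags, and the 5-second seek point is taken from the asserted `"00:00:05"`):

```python
def build_thumbnail_cmd(video_path: str, thumb_path: str) -> list[str]:
    # Seek before decoding (-ss ahead of -i is fast), then grab a single frame.
    return [
        "ffmpeg", "-y",
        "-ss", "00:00:05",
        "-i", video_path,
        "-frames:v", "1",
        thumb_path,
    ]
```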
def test_build_slideshow_cmd_single_image():
from app.video_producer import _build_slideshow_cmd
cmd = _build_slideshow_cmd(
image_paths=["/tmp/img0.jpg"],
audio_path="/tmp/audio.mp3",
output_path="/tmp/out.mp4",
duration_per_image=30.0,
)
assert "ffmpeg" in cmd[0]
assert "/tmp/out.mp4" in cmd
assert any("copy" in str(c) for c in cmd)
def test_build_slideshow_cmd_multiple_images():
from app.video_producer import _build_slideshow_cmd
cmd = _build_slideshow_cmd(
image_paths=["/tmp/img0.jpg", "/tmp/img1.jpg", "/tmp/img2.jpg"],
audio_path="/tmp/audio.mp3",
output_path="/tmp/out.mp4",
duration_per_image=60.0,
)
assert "ffmpeg" in cmd[0]
assert any("xfade" in str(c) for c in cmd)
assert "/tmp/out.mp4" in cmd
def test_produce_video_visualizer_calls_ffmpeg(tmp_db, tmp_path, monkeypatch):
    """Verify that produce_video invokes FFmpeg for the visualizer format."""
from app.db import init_db, create_video_project
init_db()
    # insert a track directly into music_library
import app.db as db_mod
with db_mod._conn() as conn:
conn.execute(
"""INSERT INTO music_library (title, genre, audio_url, file_path, provider)
VALUES (?, ?, ?, ?, ?)""",
("Test Track", "lo-fi", "/media/music/test.mp3",
str(tmp_path / "test.mp3"), "suno"),
)
    # create a dummy mp3 file
(tmp_path / "test.mp3").write_bytes(b"\x00" * 100)
create_video_project({"track_id": 1, "format": "visualizer", "target_countries": ["BR"]})
import app.video_producer as vp
monkeypatch.setattr("app.video_producer.VIDEO_DATA_DIR", str(tmp_path / "videos"))
with patch("app.video_producer.subprocess.run") as mock_run, \
patch("app.video_producer._generate_metadata", return_value={
"yt_title": "Chill Beats", "yt_description": "desc", "yt_tags": ["lofi"]
}), \
patch("app.video_producer._download_url", return_value=False):
mock_run.return_value = MagicMock(returncode=0)
vp.produce_video(1)
from app.db import get_video_project
proj = get_video_project(1)
assert proj["status"] == "done"
assert mock_run.called
def test_build_slideshow_cmd_offset_calculation():
from app.video_producer import _build_slideshow_cmd
imgs = ["/tmp/img0.jpg", "/tmp/img1.jpg", "/tmp/img2.jpg"]
cmd = _build_slideshow_cmd(imgs, "/tmp/audio.mp3", "/tmp/out.mp4", duration_per_image=30.0)
    # extract the filter_complex string
    fc_idx = cmd.index("-filter_complex")
    fc = cmd[fc_idx + 1]
    # xfade must appear twice (3 images -> 2 transitions)
    assert fc.count("xfade") == 2
    # first xfade offset: 30*1 - 1*1 = 29.0
    assert "offset=29.00" in fc
    # second xfade offset: 30*2 - 1*2 = 58.0
    assert "offset=58.00" in fc
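The offsets asserted above follow a simple pattern: the k-th crossfade starts after k full image durations, minus the time already consumed by the k earlier transitions. A sketch, assuming a 1-second transition (inferred from the 29.0/58.0 values, not stated in this diff):

```python
def xfade_offsets(n_images: int, duration_per_image: float,
                  transition: float = 1.0) -> list[str]:
    # One transition between each consecutive image pair: n_images - 1 total.
    # offset_k = duration_per_image * k - transition * k
    return [f"offset={duration_per_image * k - transition * k:.2f}"
            for k in range(1, n_images)]
```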


@@ -33,6 +33,17 @@ server {
        autoindex off;
    }

    # music videos: Nginx serves the video files directly
location ^~ /media/videos/ {
alias /data/videos/;
expires 1d;
add_header Cache-Control "public, max-age=86400" always;
        add_header Accept-Ranges bytes always;  # byte-range support for video streaming
autoindex off;
}
    # music API: variable-based proxy_pass, forwarding the full path via $request_uri
    location /api/music/ {
        resolver 127.0.0.11 valid=10s;