# Music YouTube Pipeline Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Build a pipeline that takes a track through video creation to YouTube publication, with Telegram approval at every step. Each step generates its artifact automatically → sends a Telegram notification → handles approve/reject/feedback from a natural-language reply → moves on to the next step or regenerates.

**Architecture:** music-lab (generation + state machine + OAuth + upload) ↔ agent-office (single Telegram channel + natural-language classification + polling orchestration) ↔ web-ui (Setup and Progress tabs + Library trigger). Every AI/generation task runs as a BackgroundTask with DB job-status polling.

**Tech Stack:**
- music-lab: FastAPI, SQLite, Anthropic Claude SDK, OpenAI SDK, google-api-python-client, google-auth-oauthlib, FFmpeg, Pillow
- agent-office: FastAPI, APScheduler, python-telegram (existing), Anthropic SDK
- web-ui: React 18, Vite, fetch-based API helpers (existing `src/api.js`)
- Tests: pytest + httpx mocking (`respx`/`httpx_mock`), freezegun, existing conftest patterns

**Spec:** `docs/superpowers/specs/2026-05-07-music-youtube-pipeline-design.md`

---

## File Structure

### music-lab (`web-backend/music-lab/`)

| Path | Responsibility |
|------|------|
| `app/db.py` (modify) | 5 new tables + helper functions |
| `app/pipeline/__init__.py` (new) | Package init |
| `app/pipeline/state_machine.py` (new) | State transition validation, `transition()` function |
| `app/pipeline/orchestrator.py` (new) | `start_step(pipeline_id, step)` BackgroundTask registration |
| `app/pipeline/cover.py` (new) | DALL·E 3 call + gradient fallback |
| `app/pipeline/video.py` (move/rename from video_producer.py) | FFmpeg visualizer/slideshow (accepts a cover as input) |
| `app/pipeline/thumb.py` (new) | Thumbnail extraction + text overlay |
| `app/pipeline/metadata.py` (new) | Claude Haiku metadata generation + template substitution |
| `app/pipeline/review.py` (new) | Claude Sonnet 4-axis review + weighted average |
| `app/pipeline/youtube.py` (new) | OAuth flow + resumable upload |
| `app/pipeline/storage.py` (new) | `/data/videos/{id}/` directory management |
| `app/pipeline/setup.py` (new) | youtube_setup CRUD |
| `app/main.py` (modify) | 13 new endpoints |
| `tests/test_state_machine.py` (new) | Transition validation |
| `tests/test_pipeline_endpoints.py` (new) | CRUD + feedback |
| `tests/test_cover_generation.py` (new) | DALL·E mock + fallback |
| `tests/test_metadata_generation.py` (new) | Claude mock + template |
| `tests/test_review.py` (new) | 4-axis review + verdict |
| `tests/test_youtube_upload.py` (new) | google-api mock + retry |
| `requirements.txt` (modify) | openai, google-api-python-client, google-auth-oauthlib |

### agent-office (`web-backend/agent-office/`)

| Path | Responsibility |
|------|------|
| `app/agents/youtube_publisher.py` (new) | Orchestrator: poll + classify + feedback |
| `app/agents/__init__.py` (modify) | Register in AGENT_REGISTRY |
| `app/scheduler.py` (modify) | Add the 30-second `_poll_pipelines` job |
| `app/telegram/conversational.py` (modify) | Reply matching → route to youtube_publisher |
| `app/service_proxy.py` (modify) | music-lab pipeline helpers |
| `tests/test_classify_intent.py` (new) | Whitelist/LLM branching |
| `tests/test_pipeline_polling.py` (new) | Idempotent polling |

### web-ui (`web-ui/`)

| Path | Responsibility |
|------|------|
| `src/api.js` (modify) | pipeline/setup/youtube helpers |
| `src/pages/music/components/SetupTab.jsx` (new) | Setup tab |
| `src/pages/music/components/PipelineTab.jsx` (new) | Progress tab |
| `src/pages/music/components/PipelineCard.jsx` (new) | A single card (progress/current state/feedback) |
| `src/pages/music/components/PipelineStartModal.jsx` (new) | Library track selection modal |
| `src/pages/music/components/YoutubeTab.jsx` (modify) | Expand to 6 sub-tabs |
| `src/pages/music/components/Library.jsx` or the library section of MusicStudio (modify) | "🎬 영상 파이프라인" (video pipeline) button |
| `src/pages/music/MusicStudio.css` (modify) | Progress tab/Setup tab styles |

### docker-compose / nginx

| Path | Change |
|------|------|
| `docker-compose.yml` (modify) | Add 4 music-lab env vars |
| `nginx/conf.d/default.conf` (modify) | Expose `/api/music/youtube/callback` externally |
| `music-lab/Dockerfile` (modify) | Build with the new dependencies |

---

## Task 1: music-lab DB new tables + helpers

**Files:**
- Modify: `music-lab/app/db.py`
- Test: `music-lab/tests/test_pipeline_db.py`

- [ ] **Step 1: Write the failing test**

`tests/test_pipeline_db.py`

```python
import os
import tempfile

import pytest

from app import db


@pytest.fixture
def fresh_db(monkeypatch, tmp_path):
    db_path = tmp_path / "music.db"
    monkeypatch.setattr(db, "DB_PATH", str(db_path))
    db.init_db()
    return db_path


def test_create_pipeline_inserts_row(fresh_db):
    pid = db.create_pipeline(track_id=1)
    row = db.get_pipeline(pid)
    assert row["id"] == pid
    assert row["state"] == "created"
    assert row["track_id"] == 1
    assert row["feedback_count_per_step"] == {}


def test_update_pipeline_state_records_started_at(fresh_db, freezer):
    pid = db.create_pipeline(track_id=1)
    freezer.move_to("2026-05-07T08:00:00")
    db.update_pipeline_state(pid, "cover_pending")
    row = db.get_pipeline(pid)
    assert row["state"] == "cover_pending"
    assert row["state_started_at"] == "2026-05-07T08:00:00"


def test_increment_feedback_count(fresh_db):
    pid = db.create_pipeline(track_id=1)
    db.increment_feedback_count(pid, "cover")
    db.increment_feedback_count(pid, "cover")
    row = db.get_pipeline(pid)
    assert row["feedback_count_per_step"] == {"cover": 2}


def test_record_feedback(fresh_db):
    pid = db.create_pipeline(track_id=1)
    db.record_feedback(pid, "cover", "더 어둡게")
    rows = db.get_feedback_history(pid)
    assert len(rows) == 1
    assert rows[0]["feedback_text"] == "더 어둡게"


def test_create_pipeline_job_lifecycle(fresh_db):
    pid = db.create_pipeline(track_id=1)
    job_id = db.create_pipeline_job(pid, "cover")
    db.update_pipeline_job(job_id, status="running")
    db.update_pipeline_job(job_id, status="succeeded", duration_ms=1234)
    jobs = db.list_pipeline_jobs(pid)
    assert jobs[0]["status"] == "succeeded"
    assert jobs[0]["duration_ms"] == 1234


def test_youtube_setup_default_row_created_on_init(fresh_db):
    setup = db.get_youtube_setup()
    assert setup["review_threshold"] == 60
    assert "metadata_template_json" in setup


def test_youtube_oauth_token_upsert(fresh_db):
    db.upsert_oauth_token(
        channel_id="UC123", channel_title="My Channel", avatar_url="https://...",
        refresh_token="r1", access_token="a1", expires_at="2026-05-07T09:00:00",
    )
    tok = db.get_oauth_token()
    assert tok["channel_id"] == "UC123"
    assert tok["refresh_token"] == "r1"
    db.upsert_oauth_token(
        channel_id="UC123", channel_title="My Channel", avatar_url=None,
        refresh_token="r2", access_token="a2", expires_at="2026-05-07T10:00:00",
    )
    tok = db.get_oauth_token()
    assert tok["refresh_token"] == "r2"  # upsert
```

Add `freezegun` to `requirements.txt`. The `freezer` fixture used above is not part of `freezegun` itself; it comes from a pytest plugin such as `pytest-freezer`, so add that as well.

- [ ] **Step 2: Run test to verify it fails**

Run: `cd music-lab && python -m pytest tests/test_pipeline_db.py -v`
Expected: `AttributeError` on `db.create_pipeline` and the other helpers (the functions do not exist yet)

- [ ] **Step 3: Add tables and helpers to `db.py`**

Add the following five `CREATE TABLE IF NOT EXISTS` statements at the end of `init_db()`:

```python
def init_db():
    # ... existing tables ...
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS video_pipelines (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            track_id INTEGER NOT NULL,
            state TEXT NOT NULL DEFAULT 'created',
            state_started_at TEXT NOT NULL,
            cover_url TEXT,
            video_url TEXT,
            thumbnail_url TEXT,
            metadata_json TEXT,
            review_json TEXT,
            youtube_video_id TEXT,
            feedback_count_per_step TEXT NOT NULL DEFAULT '{}',
            last_telegram_msg_ids TEXT NOT NULL DEFAULT '{}',
            created_at TEXT NOT NULL,
            updated_at TEXT NOT NULL,
            cancelled_at TEXT,
            failed_reason TEXT
        )
    """)
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS pipeline_jobs (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            pipeline_id INTEGER NOT NULL,
            step TEXT NOT NULL,
            status TEXT NOT NULL DEFAULT 'queued',
            error TEXT,
            started_at TEXT,
            finished_at TEXT,
            duration_ms INTEGER,
            FOREIGN KEY (pipeline_id) REFERENCES video_pipelines(id)
        )
    """)
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS pipeline_feedback (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            pipeline_id INTEGER NOT NULL,
            step TEXT NOT NULL,
            feedback_text TEXT NOT NULL,
            received_at TEXT NOT NULL,
            FOREIGN KEY (pipeline_id) REFERENCES video_pipelines(id)
        )
    """)
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS youtube_oauth_tokens (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            channel_id TEXT NOT NULL,
            channel_title TEXT,
            avatar_url TEXT,
            refresh_token TEXT NOT NULL,
            access_token TEXT,
            expires_at TEXT,
            created_at TEXT NOT NULL
        )
    """)
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS youtube_setup (
            id INTEGER PRIMARY KEY AUTOINCREMENT CHECK (id = 1),
            metadata_template_json TEXT NOT NULL,
            cover_prompts_json TEXT NOT NULL,
            review_weights_json TEXT NOT NULL,
            review_threshold INTEGER NOT NULL DEFAULT 60,
            visual_defaults_json TEXT NOT NULL,
            publish_policy_json TEXT NOT NULL,
            updated_at TEXT NOT NULL
        )
    """)

    # Guarantee that the single default setup row exists
    cursor.execute("SELECT COUNT(*) FROM youtube_setup")
    if cursor.fetchone()[0] == 0:
        import json
        from datetime import datetime
        defaults = (
            json.dumps({
                "title": "[{genre}] {title} | {bpm}BPM",
                "description": "{title}\n\n장르: {genre}\nBPM: {bpm}\nKey: {key}\n",
                "tags": ["lofi", "chill", "instrumental"],
                "category_id": 10,
            }),
            json.dumps({
                "lo-fi": "moody anime cityscape at dusk, lofi aesthetic",
                "phonk": "dark drift car aesthetic, neon, phonk vibe",
                "ambient": "ethereal mountain landscape, ambient mood",
                "default": "abstract music album cover art",
            }),
            json.dumps({"meta": 25, "policy": 30, "viewer": 25, "trend": 20}),
            60,
            json.dumps({"resolution": "1920x1080", "style": "visualizer", "background": "ai_cover"}),
            json.dumps({"mode": "manual", "privacy": "private", "schedule_time": None}),
            datetime.utcnow().isoformat(timespec="seconds"),
        )
        cursor.execute("""
            INSERT INTO youtube_setup
                (metadata_template_json, cover_prompts_json, review_weights_json,
                 review_threshold, visual_defaults_json, publish_policy_json, updated_at)
            VALUES (?, ?, ?, ?, ?, ?, ?)
        """, defaults)

    conn.commit()
    conn.close()
```

Add the following helper functions at the bottom of the file:

```python
import json as _json
from datetime import datetime as _dt


def _now() -> str:
    return _dt.utcnow().isoformat(timespec="seconds")


def create_pipeline(track_id: int) -> int:
    conn = sqlite3.connect(DB_PATH)
    cur = conn.cursor()
    now = _now()
    cur.execute("""
        INSERT INTO video_pipelines (track_id, state, state_started_at, created_at, updated_at)
        VALUES (?, 'created', ?, ?, ?)
    """, (track_id, now, now, now))
    pid = cur.lastrowid
    conn.commit()
    conn.close()
    return pid


def get_pipeline(pid: int) -> dict | None:
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    row = conn.execute("SELECT * FROM video_pipelines WHERE id = ?", (pid,)).fetchone()
    conn.close()
    if not row:
        return None
    d = dict(row)
    # Decode the JSON-encoded columns into Python objects
    d["feedback_count_per_step"] = _json.loads(d["feedback_count_per_step"] or "{}")
    d["last_telegram_msg_ids"] = _json.loads(d["last_telegram_msg_ids"] or "{}")
    if d.get("metadata_json"):
        d["metadata"] = _json.loads(d["metadata_json"])
    if d.get("review_json"):
        d["review"] = _json.loads(d["review_json"])
    return d


def update_pipeline_state(pid: int, state: str, **fields) -> None:
    # Every state change also resets state_started_at and updated_at
    now = _now()
    cols = ["state = ?", "state_started_at = ?", "updated_at = ?"]
    vals = [state, now, now]
    for k, v in fields.items():
        cols.append(f"{k} = ?")
        vals.append(v)
    vals.append(pid)
    conn = sqlite3.connect(DB_PATH)
    conn.execute(f"UPDATE video_pipelines SET {', '.join(cols)} WHERE id = ?", vals)
    conn.commit()
    conn.close()


def list_pipelines(active_only: bool = False) -> list[dict]:
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    if active_only:
        rows = conn.execute("""
            SELECT * FROM video_pipelines
            WHERE state NOT IN ('published','cancelled','failed','awaiting_manual')
            ORDER BY created_at DESC
        """).fetchall()
    else:
        rows = conn.execute("SELECT * FROM video_pipelines ORDER BY created_at DESC").fetchall()
    conn.close()
    return [get_pipeline(r["id"]) for r in rows]


def increment_feedback_count(pid: int, step: str) -> int:
    p = get_pipeline(pid)
    counts = p["feedback_count_per_step"]
    counts[step] = counts.get(step, 0) + 1
    conn = sqlite3.connect(DB_PATH)
    conn.execute(
        "UPDATE video_pipelines SET feedback_count_per_step = ?, updated_at = ? WHERE id = ?",
        (_json.dumps(counts), _now(), pid),
    )
    conn.commit()
    conn.close()
    return counts[step]


def record_feedback(pid: int, step: str, feedback_text: str) -> None:
    conn = sqlite3.connect(DB_PATH)
    conn.execute("""
        INSERT INTO pipeline_feedback (pipeline_id, step, feedback_text, received_at)
        VALUES (?, ?, ?, ?)
    """, (pid, step, feedback_text, _now()))
    conn.commit()
    conn.close()


def get_feedback_history(pid: int) -> list[dict]:
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    rows = conn.execute("""
        SELECT * FROM pipeline_feedback WHERE pipeline_id = ?
        ORDER BY id DESC
    """, (pid,)).fetchall()
    conn.close()
    return [dict(r) for r in rows]


def create_pipeline_job(pid: int, step: str) -> int:
    conn = sqlite3.connect(DB_PATH)
    cur = conn.cursor()
    cur.execute("""
        INSERT INTO pipeline_jobs (pipeline_id, step, status, started_at)
        VALUES (?, ?, 'queued', ?)
    """, (pid, step, _now()))
    job_id = cur.lastrowid
    conn.commit()
    conn.close()
    return job_id


def update_pipeline_job(job_id: int, **fields) -> None:
    # Reaching a final status stamps finished_at automatically
    if "status" in fields and fields["status"] in ("succeeded", "failed"):
        fields["finished_at"] = _now()
    cols = ", ".join(f"{k} = ?" for k in fields)
    vals = list(fields.values()) + [job_id]
    conn = sqlite3.connect(DB_PATH)
    conn.execute(f"UPDATE pipeline_jobs SET {cols} WHERE id = ?", vals)
    conn.commit()
    conn.close()


def list_pipeline_jobs(pid: int) -> list[dict]:
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    rows = conn.execute("""
        SELECT * FROM pipeline_jobs WHERE pipeline_id = ?
        ORDER BY id ASC
    """, (pid,)).fetchall()
    conn.close()
    return [dict(r) for r in rows]


def get_youtube_setup() -> dict:
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    row = conn.execute("SELECT * FROM youtube_setup WHERE id = 1").fetchone()
    conn.close()
    d = dict(row)
    # Expose each *_json column as a decoded key without the suffix
    for k in ("metadata_template_json", "cover_prompts_json", "review_weights_json",
              "visual_defaults_json", "publish_policy_json"):
        d[k.replace("_json", "")] = _json.loads(d[k])
    return d


def update_youtube_setup(**kwargs) -> None:
    field_map = {
        "metadata_template": "metadata_template_json",
        "cover_prompts": "cover_prompts_json",
        "review_weights": "review_weights_json",
        "visual_defaults": "visual_defaults_json",
        "publish_policy": "publish_policy_json",
    }
    cols = []
    vals = []
    for k, v in kwargs.items():
        if k in field_map:
            cols.append(f"{field_map[k]} = ?")
            vals.append(_json.dumps(v))
        elif k == "review_threshold":
            cols.append("review_threshold = ?")
            vals.append(int(v))
    if not cols:
        return
    cols.append("updated_at = ?")
    vals.append(_now())
    conn = sqlite3.connect(DB_PATH)
    conn.execute(f"UPDATE youtube_setup SET {', '.join(cols)} WHERE id = 1", vals)
    conn.commit()
    conn.close()


def upsert_oauth_token(channel_id: str, channel_title: str | None, avatar_url: str | None,
                       refresh_token: str, access_token: str | None,
                       expires_at: str | None) -> None:
    conn = sqlite3.connect(DB_PATH)
    cur = conn.cursor()
    # Only one channel is stored: drop any existing row before inserting
    cur.execute("DELETE FROM youtube_oauth_tokens")
    cur.execute("""
        INSERT INTO youtube_oauth_tokens
            (channel_id, channel_title, avatar_url, refresh_token, access_token, expires_at, created_at)
        VALUES (?, ?, ?, ?, ?, ?, ?)
    """, (channel_id, channel_title, avatar_url, refresh_token, access_token, expires_at, _now()))
    conn.commit()
    conn.close()


def get_oauth_token() -> dict | None:
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    row = conn.execute("SELECT * FROM youtube_oauth_tokens ORDER BY id DESC LIMIT 1").fetchone()
    conn.close()
    return dict(row) if row else None


def delete_oauth_token() -> None:
    conn = sqlite3.connect(DB_PATH)
    conn.execute("DELETE FROM youtube_oauth_tokens")
    conn.commit()
    conn.close()
```

- [ ] **Step 4: Run tests to verify pass**

Run: `cd music-lab && python -m pytest tests/test_pipeline_db.py -v`
Expected: all PASS (7 tests)

- [ ] **Step 5: Commit**

```bash
git add music-lab/app/db.py music-lab/tests/test_pipeline_db.py music-lab/requirements.txt
git commit -m "feat(music-lab): pipeline 5개 DB 테이블 + 헬퍼"
```

---

## Task 2: State machine

**Files:**
- Create: `music-lab/app/pipeline/__init__.py`
- Create: `music-lab/app/pipeline/state_machine.py`
- Test: `music-lab/tests/test_state_machine.py`

- [ ] **Step 1: Write the failing test**

```python
# tests/test_state_machine.py
import pytest

from app.pipeline.state_machine import (
    next_state_on_approve, next_state_on_reject, can_transition,
    STEPS, USER_GATES,
)


def test_steps_sequence():
    assert STEPS == ["cover", "video", "thumb", "meta", "review", "publish"]


def test_user_gates_excludes_review():
    assert "review" not in USER_GATES
    assert "publish" in USER_GATES
    assert "cover" in USER_GATES


def test_approve_progression():
    assert next_state_on_approve("cover_pending") == "video_pending"
    assert next_state_on_approve("video_pending") == "thumb_pending"
    assert next_state_on_approve("thumb_pending") == "meta_pending"
    assert next_state_on_approve("meta_pending") == "ai_review"
    assert next_state_on_approve("publish_pending") == "publishing"


def test_approve_invalid_state_raises():
    with pytest.raises(ValueError):
        next_state_on_approve("ai_review")  # automatic transition: calling approve here is invalid


def test_reject_keeps_same_state():
    # A rejection keeps the same
    # *_pending state (and triggers regeneration)
    assert next_state_on_reject("cover_pending") == "cover_pending"
    assert next_state_on_reject("publish_pending") == "publish_pending"


def test_can_transition_blocks_terminal_states():
    assert not can_transition("published", "cover_pending")
    assert not can_transition("cancelled", "cover_pending")
    assert not can_transition("failed", "cover_pending")


def test_can_transition_allows_cancel_from_anywhere():
    assert can_transition("cover_pending", "cancelled")
    assert can_transition("publishing", "cancelled")


def test_can_transition_allows_failed_from_pending():
    assert can_transition("video_pending", "failed")
    assert can_transition("publishing", "failed")
```

- [ ] **Step 2: Run test to verify it fails**

Run: `cd music-lab && python -m pytest tests/test_state_machine.py -v`
Expected: ImportError

- [ ] **Step 3: Implement `state_machine.py`**

`app/pipeline/__init__.py`:

```python
# empty file
```

`app/pipeline/state_machine.py`:

```python
"""Pipeline state machine: the single source of truth for transition rules."""

STEPS = ["cover", "video", "thumb", "meta", "review", "publish"]
USER_GATES = ["cover", "video", "thumb", "meta", "publish"]  # review is automatic

_APPROVE_NEXT = {
    "cover_pending": "video_pending",
    "video_pending": "thumb_pending",
    "thumb_pending": "meta_pending",
    "meta_pending": "ai_review",  # on to the automatic review step
    "publish_pending": "publishing",
}

TERMINAL_STATES = {"published", "cancelled", "failed", "awaiting_manual"}


def next_state_on_approve(state: str) -> str:
    if state not in _APPROVE_NEXT:
        raise ValueError(f"cannot approve from state: {state}")
    return _APPROVE_NEXT[state]


def next_state_on_reject(state: str) -> str:
    if not state.endswith("_pending"):
        raise ValueError(f"cannot reject from state: {state}")
    return state  # stays in the same state (back to _pending after regeneration)


def can_transition(from_state: str, to_state: str) -> bool:
    if from_state in TERMINAL_STATES:
        return False
    if to_state in {"cancelled", "failed", "awaiting_manual"}:
        return True
    if to_state == _APPROVE_NEXT.get(from_state):
        return True
    # Automatic transitions (ai_review → publish_pending, publishing →
    # published)
    auto_transitions = {
        ("ai_review", "publish_pending"),
        ("publishing", "published"),
    }
    return (from_state, to_state) in auto_transitions
```

- [ ] **Step 4: Run tests**

Run: `cd music-lab && python -m pytest tests/test_state_machine.py -v`
Expected: 8 PASS

- [ ] **Step 5: Commit**

```bash
git add music-lab/app/pipeline/ music-lab/tests/test_state_machine.py
git commit -m "feat(music-lab): pipeline 상태 머신"
```

---

## Task 3: Storage helper + cover generation (DALL·E + fallback)

**Files:**
- Create: `music-lab/app/pipeline/storage.py`
- Create: `music-lab/app/pipeline/cover.py`
- Test: `music-lab/tests/test_cover_generation.py`
- Modify: `music-lab/requirements.txt` (`openai`, `respx`)

- [ ] **Step 1: Add deps**

Add to `requirements.txt`: `openai>=1.20.0`, `respx>=0.21`, `freezegun>=1.4`

- [ ] **Step 2: Write failing test**

```python
# tests/test_cover_generation.py
import json
from io import BytesIO

import pytest
import respx
from httpx import Response
from PIL import Image

from app.pipeline import cover, storage


@pytest.fixture
def tmp_storage(monkeypatch, tmp_path):
    monkeypatch.setattr(storage, "VIDEO_DATA_DIR", str(tmp_path))
    return tmp_path


@pytest.mark.asyncio
@respx.mock
async def test_dalle_success_saves_jpg(tmp_storage, monkeypatch):
    monkeypatch.setenv("OPENAI_API_KEY", "test-key")
    image_url = "https://oaidalleapiprodscus.blob.core.windows.net/x.png"
    respx.post("https://api.openai.com/v1/images/generations").mock(
        return_value=Response(200, json={"data": [{"url": image_url}]})
    )
    # A valid 1x1 PNG built with Pillow; the original hand-written hex literal was
    # truncated (no IEND chunk), so it could not be decoded reliably
    buf = BytesIO()
    Image.new("RGB", (1, 1)).save(buf, "PNG")
    respx.get(image_url).mock(return_value=Response(200, content=buf.getvalue()))

    out = await cover.generate(pipeline_id=42, genre="lo-fi",
                               prompt_template="moody anime", mood="chill")
    assert out["used_fallback"] is False
    assert out["url"].startswith("/media/videos/42/cover")
    assert (tmp_storage / "42" / "cover.jpg").exists()


@pytest.mark.asyncio
@respx.mock
async def test_dalle_timeout_falls_back_to_gradient(tmp_storage, monkeypatch):
    monkeypatch.setenv("OPENAI_API_KEY", "test-key")
    respx.post("https://api.openai.com/v1/images/generations").mock(
        side_effect=lambda req: Response(504)
    )
    out = await cover.generate(pipeline_id=43, genre="phonk",
                               prompt_template="dark drift", mood="aggressive",
                               track_title="Midnight Drive")
    assert out["used_fallback"] is True
    assert (tmp_storage / "43" / "cover.jpg").exists()


@pytest.mark.asyncio
async def test_no_api_key_falls_back(tmp_storage, monkeypatch):
    monkeypatch.delenv("OPENAI_API_KEY", raising=False)
    out = await cover.generate(pipeline_id=44, genre="ambient",
                               prompt_template="x", mood="calm", track_title="Calm")
    assert out["used_fallback"] is True


@pytest.mark.asyncio
@respx.mock
async def test_dalle_with_feedback_appends_to_prompt(tmp_storage, monkeypatch):
    monkeypatch.setenv("OPENAI_API_KEY", "test-key")
    captured = {}

    def hook(req):
        # Capture the request body so the prompt can be inspected
        captured["body"] = json.loads(req.content)
        return Response(200, json={"data": [{"url": "https://x"}]})

    respx.post("https://api.openai.com/v1/images/generations").mock(side_effect=hook)
    respx.get("https://x").mock(return_value=Response(200, content=b"\x89PNG\r\n\x1a\n"))

    await cover.generate(pipeline_id=45, genre="lo-fi",
                         prompt_template="moody anime", mood="chill",
                         feedback="더 어둡게")
    assert "더 어둡게" in captured["body"]["prompt"]
```

The async tests rely on `pytest-asyncio`. Either set `asyncio_mode = auto` in `pytest.ini`, or register the `asyncio` marker in `conftest.py` and keep the explicit `@pytest.mark.asyncio` decorators. Check the existing conftest first.
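If the existing conftest has no asyncio setup at all, one possible way to cover async tests that lack the decorator is a small collection hook. The sketch below assumes `pytest-asyncio` is installed and that `asyncio_mode = auto` is not already set; `needs_asyncio_marker` is a hypothetical helper name introduced here for illustration, not something taken from the project's conftest.

```python
# conftest.py (sketch, assuming pytest-asyncio is installed)
import inspect


def needs_asyncio_marker(test_obj) -> bool:
    """Hypothetical helper: True when the collected object is an `async def` function."""
    return inspect.iscoroutinefunction(test_obj)


def pytest_collection_modifyitems(items):
    """Add the asyncio marker to every coroutine test that does not carry it yet."""
    import pytest  # imported lazily so the helper above can be used without pytest

    for item in items:
        test_obj = getattr(item, "obj", None)
        if needs_asyncio_marker(test_obj) and item.get_closest_marker("asyncio") is None:
            item.add_marker(pytest.mark.asyncio)
```

With `asyncio_mode = auto` in `pytest.ini` this hook is unnecessary, so pick one of the two approaches rather than both.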
- [ ] **Step 3: Run test, verify fail**

Run: `python -m pytest tests/test_cover_generation.py -v`
Expected: ImportError

- [ ] **Step 4: Implement `storage.py`**

```python
"""Directory management for pipeline artifacts."""
import os

VIDEO_DATA_DIR = os.getenv("VIDEO_DATA_DIR", "/app/data/videos")
VIDEO_MEDIA_BASE = os.getenv("VIDEO_MEDIA_BASE", "/media/videos")


def pipeline_dir(pipeline_id: int) -> str:
    path = os.path.join(VIDEO_DATA_DIR, str(pipeline_id))
    os.makedirs(path, exist_ok=True)
    return path


def media_url(pipeline_id: int, filename: str) -> str:
    return f"{VIDEO_MEDIA_BASE}/{pipeline_id}/{filename}"
```

- [ ] **Step 5: Implement `cover.py`**

```python
"""AI cover art generation: DALL·E 3 + gradient fallback."""
import os
import logging
from io import BytesIO
from typing import Optional

import httpx
from PIL import Image, ImageDraw, ImageFont

from . import storage

logger = logging.getLogger("music-lab.cover")

# Defaults to DALL·E 3, which returns an image URL; the code below reads data[0].url
OPENAI_MODEL = os.getenv("OPENAI_IMAGE_MODEL", "dall-e-3")
DALLE_TIMEOUT_S = 90

# Gradient fallback colors (one RGB pair per genre)
GRADIENT_COLORS = {
    "lo-fi": ((26, 26, 46), (22, 33, 62)),
    "phonk": ((26, 10, 10), (45, 0, 0)),
    "ambient": ((13, 33, 55), (10, 22, 40)),
    "pop": ((26, 10, 46), (45, 27, 78)),
    "default": ((17, 24, 39), (31, 41, 55)),
}


async def generate(*, pipeline_id: int, genre: str, prompt_template: str,
                   mood: str = "", track_title: str = "", feedback: str = "") -> dict:
    """Generate cover art. On success, saves a jpg and returns its URL. On failure, falls back to a gradient.

    Returns: {"url": str, "used_fallback": bool, "error": str | None}
    """
    out_path = os.path.join(storage.pipeline_dir(pipeline_id), "cover.jpg")
    used_fallback = False
    error = None

    # Read the key at call time so that monkeypatch.setenv/delenv in the tests takes effect;
    # a module-level constant would be frozen at import
    api_key = os.getenv("OPENAI_API_KEY", "")
    if api_key:
        try:
            await _generate_with_dalle(api_key, prompt_template, mood, feedback, out_path)
        except Exception as e:
            logger.warning("DALL·E failed, using fallback: %s", e)
            error = str(e)
            used_fallback = True
            _generate_gradient(genre, track_title, out_path)
    else:
        used_fallback = True
        error = "OPENAI_API_KEY not set"
        _generate_gradient(genre, track_title, out_path)

    return {
        "url": storage.media_url(pipeline_id, "cover.jpg"),
        "used_fallback": used_fallback,
        "error": error,
    }


async def _generate_with_dalle(api_key: str, prompt_template: str, mood: str,
                               feedback: str, out_path: str) -> None:
    prompt = prompt_template
    if mood:
        prompt = f"{prompt}, {mood} mood"
    if feedback:
        prompt = f"{prompt}. 추가 지시: {feedback}"
    prompt = f"{prompt}, no text, high quality"

    async with httpx.AsyncClient(timeout=DALLE_TIMEOUT_S) as client:
        resp = await client.post(
            "https://api.openai.com/v1/images/generations",
            headers={"Authorization": f"Bearer {api_key}"},
            json={"model": OPENAI_MODEL, "prompt": prompt, "size": "1024x1024", "n": 1},
        )
        resp.raise_for_status()
        url = resp.json()["data"][0]["url"]
        img_resp = await client.get(url)
        img_resp.raise_for_status()

    # PNG → JPG conversion
    img = Image.open(BytesIO(img_resp.content)).convert("RGB")
    img.save(out_path, "JPEG", quality=92)


def _generate_gradient(genre: str, track_title: str, out_path: str) -> None:
    w, h = 1024, 1024
    top, bot = GRADIENT_COLORS.get(genre.lower(), GRADIENT_COLORS["default"])
    img = Image.new("RGB", (w, h))
    draw = ImageDraw.Draw(img)
    # Vertical gradient: interpolate top → bottom and draw one horizontal line per row
    for y in range(h):
        t = y / h
        color = tuple(int(top[i] + (bot[i] - top[i]) * t) for i in range(3))
        draw.line([(0, y), (w - 1, y)], fill=color)

    if track_title:
        try:
            font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", 64)
        except OSError:
            font = ImageFont.load_default()
        # Center the track title on the image
        bbox = draw.textbbox((0, 0), track_title, font=font)
        tw, th = bbox[2] - bbox[0], bbox[3] - bbox[1]
        draw.text(((w - tw) // 2, (h - th) // 2), track_title,
                  fill=(255, 255, 255), font=font)

    img.save(out_path, "JPEG", quality=92)
```

- [ ] **Step 6: Run tests**

Run: `python -m pytest tests/test_cover_generation.py -v`
Expected: 4 PASS

- [ ] **Step 7: Commit**

```bash
git add music-lab/app/pipeline/storage.py music-lab/app/pipeline/cover.py \
        music-lab/tests/test_cover_generation.py music-lab/requirements.txt
git commit -m "feat(music-lab): AI 커버 생성 + 그라데이션 폴백"
```

---

## Task 4: Video/thumbnail generation (FFmpeg, migrated from the existing video_producer)

**Files:**
- Create: `music-lab/app/pipeline/video.py` (move logic from `app/video_producer.py`)
- Create: `music-lab/app/pipeline/thumb.py`
- Test: `music-lab/tests/test_video_thumb.py`

- [ ] **Step 1: Write failing test**

Mock `subprocess.run` instead of running FFmpeg directly.

```python
# tests/test_video_thumb.py
import os
from unittest.mock import patch, MagicMock

import pytest

from app.pipeline import video, thumb, storage


@pytest.fixture
def tmp_storage(monkeypatch, tmp_path):
    monkeypatch.setattr(storage, "VIDEO_DATA_DIR", str(tmp_path))
    # Dummy input files
    audio = tmp_path / "audio.mp3"
    audio.write_bytes(b"\x00" * 100)
    cover = tmp_path / str(50) / "cover.jpg"
    cover.parent.mkdir(parents=True)
    cover.write_bytes(b"\x00" * 100)
    return tmp_path


@patch("subprocess.run")
def test_generate_video_calls_ffmpeg(mock_run, tmp_storage):
    mock_run.return_value = MagicMock(returncode=0, stderr="")
    out = video.generate(pipeline_id=50,
                         audio_path=str(tmp_storage / "audio.mp3"),
                         cover_path=str(tmp_storage / "50" / "cover.jpg"),
                         genre="lo-fi", duration_sec=120,
                         resolution="1920x1080", style="visualizer")
    assert out["url"].endswith("/50/video.mp4")
    assert out["used_fallback"] is False
    args = mock_run.call_args[0][0]
    assert args[0] == "ffmpeg"
    assert "-i" in args
    assert "showwaves" in " ".join(args)


@patch("subprocess.run")
def test_generate_video_failure_marks_failed(mock_run, tmp_storage):
    mock_run.return_value = MagicMock(returncode=1, stderr="bad codec")
    with pytest.raises(video.VideoGenerationError):
        video.generate(pipeline_id=51,
                       audio_path=str(tmp_storage / "audio.mp3"),
                       cover_path=str(tmp_storage / "50" / "cover.jpg"),
                       genre="lo-fi", duration_sec=120,
                       resolution="1920x1080", style="visualizer")


@patch("subprocess.run")
def test_thumb_extracts_frame(mock_run, tmp_storage):
    mock_run.return_value = MagicMock(returncode=0, stderr="")
    video_path = tmp_storage / "60" / "video.mp4"
    video_path.parent.mkdir(parents=True)
    video_path.write_bytes(b"\x00" * 100)
    out = thumb.generate(pipeline_id=60, video_path=str(video_path),
                         track_title="Midnight Drive", overlay_text=True)
    assert out["url"].endswith("/60/thumbnail.jpg")
    args = mock_run.call_args[0][0]
    assert args[0] == "ffmpeg"
```

- [ ] **Step 2: Run, verify fail**

Run: `python -m pytest tests/test_video_thumb.py -v`
Expected: ImportError

- [ ] **Step 3: Implement `video.py`** (reuse the ffmpeg command builder from the existing `video_producer.py`)

```python
"""Video visual generation: visualizer/slideshow styles."""
import os
import subprocess
import logging

from . import storage

logger = logging.getLogger("music-lab.video")
VIDEO_TIMEOUT_S = 300  # 5 minutes


class VideoGenerationError(Exception):
    pass


def generate(*, pipeline_id: int, audio_path: str, cover_path: str,
             genre: str, duration_sec: int,
             resolution: str = "1920x1080", style: str = "visualizer") -> dict:
    """Generate the video. On success, saves the mp4 and returns its URL.
실패 시 예외.""" w, h = resolution.split("x") out_path = os.path.join(storage.pipeline_dir(pipeline_id), "video.mp4") if style == "visualizer": cmd = _build_visualizer_cmd(audio_path, cover_path, out_path, w, h) else: # 차후: 슬라이드쇼 등 다른 스타일 cmd = _build_visualizer_cmd(audio_path, cover_path, out_path, w, h) logger.info("ffmpeg 실행: %s", " ".join(cmd)) result = subprocess.run(cmd, capture_output=True, text=True, timeout=VIDEO_TIMEOUT_S) if result.returncode != 0: raise VideoGenerationError(f"ffmpeg 실패: {result.stderr[:500]}") return { "url": storage.media_url(pipeline_id, "video.mp4"), "used_fallback": False, "duration_sec": duration_sec, } def _build_visualizer_cmd(audio: str, bg: str, out: str, w: str, h: str) -> list: return [ "ffmpeg", "-y", "-loop", "1", "-i", bg, "-i", audio, "-filter_complex", f"[0:v]scale={w}:{h}[bg];" f"[1:a]showwaves=s={w}x200:mode=cline:colors=0xFF4444@0.8[wave];" f"[bg][wave]overlay=0:({h}-200)[out]", "-map", "[out]", "-map", "1:a", "-c:v", "libx264", "-preset", "fast", "-crf", "23", "-c:a", "aac", "-b:a", "192k", "-shortest", out, ] ``` - [ ] **Step 4: Implement `thumb.py`** ```python """썸네일 생성 — 영상 5초 프레임 추출 + 텍스트 오버레이.""" import os import subprocess import logging from PIL import Image, ImageDraw, ImageFont from . 
import storage logger = logging.getLogger("music-lab.thumb") THUMB_TIMEOUT_S = 60 class ThumbGenerationError(Exception): pass def generate(*, pipeline_id: int, video_path: str, track_title: str = "", overlay_text: bool = True) -> dict: out_path = os.path.join(storage.pipeline_dir(pipeline_id), "thumbnail.jpg") cmd = ["ffmpeg", "-y", "-i", video_path, "-ss", "00:00:05", "-vframes", "1", "-q:v", "2", out_path] result = subprocess.run(cmd, capture_output=True, text=True, timeout=THUMB_TIMEOUT_S) if result.returncode != 0: raise ThumbGenerationError(f"ffmpeg 썸네일 실패: {result.stderr[:300]}") if overlay_text and track_title: _overlay_title(out_path, track_title) return {"url": storage.media_url(pipeline_id, "thumbnail.jpg"), "used_fallback": False} def _overlay_title(path: str, title: str) -> None: try: img = Image.open(path).convert("RGB") draw = ImageDraw.Draw(img) try: font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", 80) except OSError: font = ImageFont.load_default() # 하단 30% 영역에 검정 반투명 박스 + 흰 글씨 w, h = img.size box_h = int(h * 0.3) overlay = Image.new("RGBA", (w, box_h), (0, 0, 0, 160)) img.paste(overlay, (0, h - box_h), overlay) bbox = draw.textbbox((0, 0), title, font=font) tw = bbox[2] - bbox[0] draw.text(((w - tw) // 2, h - box_h + 30), title, fill=(255, 255, 255), font=font) img.save(path, "JPEG", quality=92) except Exception as e: logger.warning("썸네일 오버레이 실패: %s", e) ``` - [ ] **Step 5: Run tests** Run: `python -m pytest tests/test_video_thumb.py -v` Expected: 3 PASS - [ ] **Step 6: Commit** ```bash git add music-lab/app/pipeline/video.py music-lab/app/pipeline/thumb.py \ music-lab/tests/test_video_thumb.py git commit -m "feat(music-lab): pipeline 영상·썸네일 생성" ``` --- ## Task 5: 메타데이터 생성 (Claude Haiku) **Files:** - Create: `music-lab/app/pipeline/metadata.py` - Test: `music-lab/tests/test_metadata_generation.py` - [ ] **Step 1: Write failing test** ```python # tests/test_metadata_generation.py import pytest import respx from httpx 
import Response

from app.pipeline import metadata


@pytest.mark.asyncio
@respx.mock
async def test_metadata_calls_claude_and_parses_json(monkeypatch):
    # metadata.py reads ANTHROPIC_API_KEY once at import time, so patch the
    # module attribute; setting the environment variable here would be too late.
    monkeypatch.setattr(metadata, "ANTHROPIC_API_KEY", "test-key")
    payload = {
        "content": [{"type": "text",
                     "text": '{"title":"[Lo-fi] Drive | 85BPM",'
                             '"description":"chill","tags":["lofi","85bpm"],'
                             '"category_id":10}'}]
    }
    respx.post("https://api.anthropic.com/v1/messages").mock(
        return_value=Response(200, json=payload)
    )
    result = await metadata.generate(
        track={"title": "Drive", "genre": "lo-fi", "bpm": 85, "key": "C",
               "scale": "minor", "moods": ["chill"], "instruments": ["piano"]},
        template={"title": "[{genre}] {title} | {bpm}BPM",
                  "description": "{title}\n", "tags": [], "category_id": 10},
        trend_keywords=["lofi", "study"],
        feedback="",
    )
    assert result["title"].startswith("[Lo-fi]")
    assert "lofi" in result["tags"]


@pytest.mark.asyncio
@respx.mock
async def test_metadata_fallback_when_no_api_key(monkeypatch):
    # Force the "no key" branch regardless of the developer's environment.
    monkeypatch.setattr(metadata, "ANTHROPIC_API_KEY", "")
    result = await metadata.generate(
        track={"title": "Drive", "genre": "lo-fi", "bpm": 85, "key": "C",
               "scale": "minor", "moods": [], "instruments": []},
        template={"title": "[{genre}] {title} | {bpm}BPM",
                  "description": "{title}", "tags": ["lofi"], "category_id": 10},
        trend_keywords=[],
    )
    # Fallback: template variables substituted verbatim
    assert result["title"] == "[lo-fi] Drive | 85BPM"
    assert result["used_fallback"] is True


@pytest.mark.asyncio
@respx.mock
async def test_metadata_includes_feedback_in_prompt(monkeypatch):
    monkeypatch.setattr(metadata, "ANTHROPIC_API_KEY", "test-key")
    captured = {}

    def hook(req):
        import json
        captured["body"] = json.loads(req.content)
        return Response(200, json={"content": [{"type": "text",
            "text": '{"title":"x","description":"y","tags":[],"category_id":10}'}]})

    respx.post("https://api.anthropic.com/v1/messages").mock(side_effect=hook)
    await metadata.generate(
        track={"title": "X", "genre": "lo-fi", "bpm": 85, "key": "C",
               "scale": "minor", "moods": [], "instruments": []},
template={"title": "{title}", "description": "{title}", "tags": [], "category_id": 10}, trend_keywords=[], feedback="제목을 짧게", ) assert "제목을 짧게" in str(captured["body"]) ``` - [ ] **Step 2: Run, verify fail** Run: `python -m pytest tests/test_metadata_generation.py -v` Expected: ImportError - [ ] **Step 3: Implement `metadata.py`** ```python """메타데이터 생성 — Claude Haiku + 템플릿 폴백.""" import os import json import logging import httpx logger = logging.getLogger("music-lab.metadata") ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY", "") CLAUDE_MODEL = os.getenv("CLAUDE_HAIKU_MODEL", "claude-haiku-4-5-20251001") TIMEOUT_S = 30 async def generate(*, track: dict, template: dict, trend_keywords: list[str], feedback: str = "") -> dict: """메타데이터 생성. 성공 시 LLM, 실패/미설정 시 템플릿 치환 폴백. 반환: {"title", "description", "tags", "category_id", "used_fallback", "error"} """ if not ANTHROPIC_API_KEY: return {**_fallback_template(track, template), "used_fallback": True, "error": "no api key"} try: result = await _call_claude(track, template, trend_keywords, feedback) return {**result, "used_fallback": False, "error": None} except Exception as e: logger.warning("메타데이터 LLM 실패 — 폴백: %s", e) return {**_fallback_template(track, template), "used_fallback": True, "error": str(e)} def _fallback_template(track: dict, template: dict) -> dict: fmt_vars = { "title": track.get("title", ""), "genre": track.get("genre", ""), "bpm": track.get("bpm", ""), "key": track.get("key", ""), "scale": track.get("scale", ""), } title = template.get("title", "{title}").format(**fmt_vars) description = template.get("description", "{title}").format(**fmt_vars) return { "title": title[:100], "description": description[:5000], "tags": (template.get("tags") or [])[:15], "category_id": template.get("category_id", 10), } async def _call_claude(track: dict, template: dict, trend_keywords: list[str], feedback: str) -> dict: user_prompt = ( "다음 트랙의 YouTube 메타데이터를 생성하세요. 
JSON으로만 응답.\n\n"
        f"트랙: {json.dumps(track, ensure_ascii=False)}\n"
        f"템플릿: {json.dumps(template, ensure_ascii=False)}\n"
        f"트렌드 키워드: {', '.join(trend_keywords)}\n"
    )
    if feedback:
        user_prompt += f"\n사용자 피드백: {feedback}\n"
    user_prompt += (
        '\n출력 JSON: {"title": "60자 이내", "description": "1000자 이내, 3-5문단",'
        ' "tags": ["15개 이내"], "category_id": 10}'
    )
    async with httpx.AsyncClient(timeout=TIMEOUT_S) as client:
        resp = await client.post(
            "https://api.anthropic.com/v1/messages",
            headers={
                "x-api-key": ANTHROPIC_API_KEY,
                "anthropic-version": "2023-06-01",
                "content-type": "application/json",
            },
            json={
                "model": CLAUDE_MODEL,
                "max_tokens": 1024,
                "messages": [{"role": "user", "content": user_prompt}],
            },
        )
    resp.raise_for_status()
    text = resp.json()["content"][0]["text"]
    # Take everything from the first "{" to the last "}" so that any prose
    # the model wraps around the JSON object is ignored.
    start = text.find("{")
    end = text.rfind("}") + 1
    return json.loads(text[start:end])
```

- [ ] **Step 4: Run tests**

Run: `python -m pytest tests/test_metadata_generation.py -v`
Expected: 3 PASS

- [ ] **Step 5: Commit**

```bash
git add music-lab/app/pipeline/metadata.py music-lab/tests/test_metadata_generation.py
git commit -m "feat(music-lab): pipeline metadata LLM generation + fallback"
```

---

## Task 6: AI final review (4 axes)

**Files:**
- Create: `music-lab/app/pipeline/review.py`
- Test: `music-lab/tests/test_review.py`

- [ ] **Step 1: Write failing test**

```python
# tests/test_review.py
import pytest
import respx
from httpx import Response

from app.pipeline import review


@pytest.mark.asyncio
@respx.mock
async def test_review_returns_pass_when_above_threshold(monkeypatch):
    # review.py reads ANTHROPIC_API_KEY at import time, so patch the module
    # attribute rather than the environment variable.
    monkeypatch.setattr(review, "ANTHROPIC_API_KEY", "k")
    body = {"content": [{"type": "text", "text":
        '{"metadata_quality":{"score":80,"notes":"x"},'
        '"policy_compliance":{"score":90,"issues":[]},'
        '"viewer_experience":{"score":75,"notes":"y"},'
        '"trend_alignment":{"score":70,"matched_keywords":["lofi"]},'
        '"summary":"good"}'}]}
    respx.post("https://api.anthropic.com/v1/messages").mock(return_value=Response(200, json=body))
    result = await review.run_4_axis(
        pipeline={"id": 1},
        track={"title": "x", "genre": "lo-fi", "bpm": 85},
        video_meta={"length_sec": 120, "resolution": "1920x1080"},
        metadata={"title": "Y", "description": "Z", "tags": ["lofi"], "category_id": 10},
        thumbnail_url="/m/x.jpg",
        trend_top=["lofi"],
        weights={"meta": 25, "policy": 30, "viewer": 25, "trend": 20},
        threshold=60,
    )
    assert result["verdict"] == "pass"
    expected_total = 0.25 * 80 + 0.30 * 90 + 0.25 * 75 + 0.20 * 70
    assert result["weighted_total"] == pytest.approx(expected_total, abs=0.01)


@pytest.mark.asyncio
@respx.mock
async def test_review_fail_below_threshold(monkeypatch):
    # Patch the module attribute: review.py reads the key at import time.
    monkeypatch.setattr(review, "ANTHROPIC_API_KEY", "k")
    body = {"content": [{"type": "text", "text":
        '{"metadata_quality":{"score":40,"notes":"x"},'
        '"policy_compliance":{"score":50,"issues":[]},'
        '"viewer_experience":{"score":30,"notes":"y"},'
        '"trend_alignment":{"score":20,"matched_keywords":[]},'
        '"summary":"bad"}'}]}
    respx.post("https://api.anthropic.com/v1/messages").mock(return_value=Response(200, json=body))
    result = await review.run_4_axis(
        pipeline={"id": 2},
        track={"title": "x", "genre": "lo-fi", "bpm": 85},
        video_meta={"length_sec": 120, "resolution": "1920x1080"},
        metadata={"title": "Y", "description": "Z", "tags": [], "category_id": 10},
        thumbnail_url="/m/x.jpg",
        trend_top=[],
        weights={"meta": 25, "policy": 30, "viewer": 25, "trend": 20},
        threshold=60,
    )
    assert result["verdict"] == "fail"


@pytest.mark.asyncio
@respx.mock
async def test_review_heuristic_fallback_on_llm_error(monkeypatch):
    # With a key set, the mocked 500 response is what triggers the fallback.
    monkeypatch.setattr(review, "ANTHROPIC_API_KEY", "k")
    respx.post("https://api.anthropic.com/v1/messages").mock(return_value=Response(500))
    result = await review.run_4_axis(
        pipeline={"id": 3},
        track={"title": "x", "genre": "lo-fi", "bpm": 85},
        video_meta={"length_sec": 120, "resolution": "1920x1080"},
        metadata={"title": "Y" * 30, "description": "Z" * 200,
                  "tags": ["a", "b"], "category_id": 10},
        thumbnail_url="/m/x.jpg",
        trend_top=["lofi"],
        weights={"meta": 25, "policy": 30, "viewer": 25, "trend":
20}, threshold=60, ) assert result["used_fallback"] is True assert "weighted_total" in result ``` - [ ] **Step 2: Run, verify fail** Run: `python -m pytest tests/test_review.py -v` Expected: ImportError - [ ] **Step 3: Implement `review.py`** ```python """AI 최종 검토 — 4축(메타/정책/시청/트렌드) 가중 평균.""" import os import json import logging import httpx logger = logging.getLogger("music-lab.review") ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY", "") CLAUDE_MODEL = os.getenv("CLAUDE_SONNET_MODEL", "claude-sonnet-4-6") TIMEOUT_S = 60 POLICY_BANNED = {"f-word", "n-word"} # 실제 단어는 운영 시 별도 파일로 — 데모용 자리 async def run_4_axis(*, pipeline: dict, track: dict, video_meta: dict, metadata: dict, thumbnail_url: str, trend_top: list[str], weights: dict, threshold: int) -> dict: if not ANTHROPIC_API_KEY: return _heuristic(metadata, video_meta, track, trend_top, weights, threshold, fallback_reason="no api key") try: scores = await _call_claude(pipeline, track, video_meta, metadata, thumbnail_url, trend_top) return _weighted_verdict(scores, weights, threshold, used_fallback=False) except Exception as e: logger.warning("검토 LLM 실패 — 휴리스틱: %s", e) return _heuristic(metadata, video_meta, track, trend_top, weights, threshold, fallback_reason=str(e)) def _weighted_verdict(scores: dict, weights: dict, threshold: int, used_fallback: bool) -> dict: total = ( weights["meta"] / 100 * scores["metadata_quality"]["score"] + weights["policy"] / 100 * scores["policy_compliance"]["score"] + weights["viewer"] / 100 * scores["viewer_experience"]["score"] + weights["trend"] / 100 * scores["trend_alignment"]["score"] ) return { **scores, "weighted_total": round(total, 2), "verdict": "pass" if total >= threshold else "fail", "used_fallback": used_fallback, } async def _call_claude(pipeline, track, video_meta, metadata, thumbnail_url, trend_top): user = ( "트랙·영상·메타데이터를 4축으로 평가하고 JSON만 응답:\n" f"트랙: {json.dumps(track, ensure_ascii=False)}\n" f"영상: {json.dumps(video_meta)}\n" f"메타: {json.dumps(metadata, 
ensure_ascii=False)}\n" f"썸네일: {thumbnail_url}\n" f"트렌드: {trend_top}\n" '출력: {"metadata_quality":{"score":0-100,"notes":""},' '"policy_compliance":{"score":0-100,"issues":[]},' '"viewer_experience":{"score":0-100,"notes":""},' '"trend_alignment":{"score":0-100,"matched_keywords":[]},' '"summary":""}' ) async with httpx.AsyncClient(timeout=TIMEOUT_S) as client: resp = await client.post( "https://api.anthropic.com/v1/messages", headers={ "x-api-key": ANTHROPIC_API_KEY, "anthropic-version": "2023-06-01", "content-type": "application/json", }, json={"model": CLAUDE_MODEL, "max_tokens": 1024, "messages": [{"role": "user", "content": user}]}, ) resp.raise_for_status() text = resp.json()["content"][0]["text"] return json.loads(text[text.find("{"):text.rfind("}")+1]) def _heuristic(metadata, video_meta, track, trend_top, weights, threshold, fallback_reason): # 메타: 길이·태그 카운트 title_len = len(metadata.get("title", "")) desc_len = len(metadata.get("description", "")) tag_n = len(metadata.get("tags", [])) meta_score = 100 if 5 <= title_len <= 60 and 50 <= desc_len <= 1000 and 5 <= tag_n <= 15 else 50 # 정책: 금칙어 매치 text_blob = (metadata.get("title", "") + metadata.get("description", "")).lower() policy_score = 100 if not any(w in text_blob for w in POLICY_BANNED) else 30 # 시청: 영상 길이가 트랙과 큰 차이 없는지 휴리스틱(±5초) expected = track.get("duration_sec", video_meta.get("length_sec", 0)) delta = abs(video_meta.get("length_sec", 0) - expected) viewer_score = 90 if delta <= 5 else 60 # 트렌드: 태그가 트렌드와 겹치는지 overlap = set(metadata.get("tags", [])) & set(trend_top) trend_score = 100 if overlap else 40 scores = { "metadata_quality": {"score": meta_score, "notes": "휴리스틱"}, "policy_compliance": {"score": policy_score, "issues": []}, "viewer_experience": {"score": viewer_score, "notes": "휴리스틱"}, "trend_alignment": {"score": trend_score, "matched_keywords": list(overlap)}, "summary": f"휴리스틱 fallback: {fallback_reason}", } return _weighted_verdict(scores, weights, threshold, used_fallback=True) ``` - [ ] 
**Step 4: Run tests**

Run: `python -m pytest tests/test_review.py -v`
Expected: 3 PASS

- [ ] **Step 5: Commit**

```bash
git add music-lab/app/pipeline/review.py music-lab/tests/test_review.py
git commit -m "feat(music-lab): pipeline 4-axis AI review + heuristic fallback"
```

---

## Task 7: YouTube OAuth + Upload

**Files:**
- Create: `music-lab/app/pipeline/youtube.py`
- Test: `music-lab/tests/test_youtube_upload.py`
- Modify: `music-lab/requirements.txt` (`google-api-python-client>=2.100`, `google-auth-oauthlib>=1.2`, `google-auth-httplib2>=0.2`)

- [ ] **Step 1: Add deps**

`requirements.txt`:

```
google-api-python-client>=2.100
google-auth-oauthlib>=1.2
google-auth-httplib2>=0.2
```

- [ ] **Step 2: Write failing test**

```python
# tests/test_youtube_upload.py
import pytest
from unittest.mock import patch, MagicMock

from app.pipeline import youtube


def _setup_token(monkeypatch, db, refresh="r1", access="a1"):
    db.upsert_oauth_token(
        channel_id="UC1", channel_title="t", avatar_url=None,
        refresh_token=refresh, access_token=access,
        expires_at="2099-01-01T00:00:00",
    )


@pytest.fixture
def fresh_db(monkeypatch, tmp_path):
    from app import db
    monkeypatch.setattr(db, "DB_PATH", str(tmp_path / "music.db"))
    db.init_db()
    return db


@patch("app.pipeline.youtube._build_youtube_client")
def test_upload_succeeds_after_resumable(mock_client, fresh_db, tmp_path, monkeypatch):
    monkeypatch.setenv("YOUTUBE_OAUTH_CLIENT_ID", "cid")
    monkeypatch.setenv("YOUTUBE_OAUTH_CLIENT_SECRET", "sec")
    _setup_token(monkeypatch, fresh_db)
    yt = MagicMock()
    insert = MagicMock()
    # First chunk: still uploading; second chunk: final response with the id
    insert.next_chunk.side_effect = [(None, None), (None, {"id": "VID123"})]
    yt.videos().insert.return_value = insert
    mock_client.return_value = yt
    video_path = tmp_path / "v.mp4"
    video_path.write_bytes(b"\x00")
    out = youtube.upload_video(
        video_path=str(video_path),
        thumbnail_path=None,
        metadata={"title": "T", "description": "D", "tags": ["x"], "category_id": 10},
        privacy="private",
    )
    assert out["video_id"] == "VID123"


@patch("app.pipeline.youtube._build_youtube_client")
def
test_upload_no_token_raises(mock_client, fresh_db, tmp_path): video_path = tmp_path / "v.mp4" video_path.write_bytes(b"\x00") with pytest.raises(youtube.NotAuthenticatedError): youtube.upload_video( video_path=str(video_path), thumbnail_path=None, metadata={"title":"T","description":"D","tags":[],"category_id":10}, privacy="private", ) @patch("app.pipeline.youtube._build_youtube_client") def test_upload_quota_exceeded_marks_quota(mock_client, fresh_db, tmp_path, monkeypatch): from googleapiclient.errors import HttpError monkeypatch.setenv("YOUTUBE_OAUTH_CLIENT_ID", "cid") monkeypatch.setenv("YOUTUBE_OAUTH_CLIENT_SECRET", "sec") _setup_token(monkeypatch, fresh_db) yt = MagicMock() err = HttpError(MagicMock(status=403), b'{"error":{"errors":[{"reason":"quotaExceeded"}]}}') yt.videos().insert.return_value.next_chunk.side_effect = err mock_client.return_value = yt video_path = tmp_path / "v.mp4" video_path.write_bytes(b"\x00") with pytest.raises(youtube.QuotaExceededError): youtube.upload_video( video_path=str(video_path), thumbnail_path=None, metadata={"title":"T","description":"D","tags":[],"category_id":10}, privacy="private", ) ``` - [ ] **Step 3: Run, verify fail** Run: `python -m pytest tests/test_youtube_upload.py -v` Expected: ImportError - [ ] **Step 4: Implement `youtube.py`** ```python """YouTube OAuth flow + resumable 업로드.""" import os import logging from urllib.parse import urlencode import httpx from google.oauth2.credentials import Credentials from googleapiclient.discovery import build from googleapiclient.http import MediaFileUpload from googleapiclient.errors import HttpError from app import db logger = logging.getLogger("music-lab.youtube") CLIENT_ID = os.getenv("YOUTUBE_OAUTH_CLIENT_ID", "") CLIENT_SECRET = os.getenv("YOUTUBE_OAUTH_CLIENT_SECRET", "") REDIRECT_URI = os.getenv("YOUTUBE_OAUTH_REDIRECT_URI", "") SCOPES = ["https://www.googleapis.com/auth/youtube.upload", "https://www.googleapis.com/auth/youtube.readonly"] class 
NotAuthenticatedError(Exception): pass class QuotaExceededError(Exception): pass def get_auth_url() -> str: if not CLIENT_ID or not REDIRECT_URI: raise RuntimeError("OAuth 환경변수 미설정") params = { "client_id": CLIENT_ID, "redirect_uri": REDIRECT_URI, "response_type": "code", "scope": " ".join(SCOPES), "access_type": "offline", "prompt": "consent", } return "https://accounts.google.com/o/oauth2/v2/auth?" + urlencode(params) async def exchange_code(code: str) -> dict: """code → refresh_token + access_token + 채널 정보 → DB 저장.""" async with httpx.AsyncClient(timeout=30) as client: token_resp = await client.post( "https://oauth2.googleapis.com/token", data={ "code": code, "client_id": CLIENT_ID, "client_secret": CLIENT_SECRET, "redirect_uri": REDIRECT_URI, "grant_type": "authorization_code", }, ) token_resp.raise_for_status() tok = token_resp.json() access = tok["access_token"] refresh = tok["refresh_token"] expires_at = _expiry_from_seconds(tok["expires_in"]) # 채널 정보 조회 creds = _creds(access=access, refresh=refresh) yt = build("youtube", "v3", credentials=creds, cache_discovery=False) ch = yt.channels().list(part="snippet", mine=True).execute() item = ch["items"][0] db.upsert_oauth_token( channel_id=item["id"], channel_title=item["snippet"]["title"], avatar_url=item["snippet"]["thumbnails"]["default"]["url"], refresh_token=refresh, access_token=access, expires_at=expires_at, ) return {"channel_id": item["id"], "channel_title": item["snippet"]["title"]} def get_status() -> dict | None: tok = db.get_oauth_token() if not tok: return None return { "channel_id": tok["channel_id"], "channel_title": tok["channel_title"], "avatar_url": tok["avatar_url"], } def disconnect() -> None: db.delete_oauth_token() def upload_video(*, video_path: str, thumbnail_path: str | None, metadata: dict, privacy: str) -> dict: tok = db.get_oauth_token() if not tok: raise NotAuthenticatedError("YouTube 인증 없음") creds = _creds(access=tok["access_token"], refresh=tok["refresh_token"]) yt = 
_build_youtube_client(creds)
    body = {
        "snippet": {
            "title": metadata["title"],
            "description": metadata["description"],
            "tags": metadata.get("tags", []),
            "categoryId": str(metadata.get("category_id", 10)),
        },
        "status": {"privacyStatus": privacy, "selfDeclaredMadeForKids": False},
    }
    media = MediaFileUpload(video_path, chunksize=4 * 1024 * 1024,
                            resumable=True, mimetype="video/mp4")
    req = yt.videos().insert(part="snippet,status", body=body, media_body=media)
    try:
        response = None
        while response is None:
            # next_chunk() yields a None response until the last chunk is sent
            status, response = req.next_chunk()
        video_id = response["id"]
    except HttpError as e:
        if b"quotaExceeded" in e.content:
            raise QuotaExceededError(str(e)) from e
        raise
    if thumbnail_path:
        try:
            yt.thumbnails().set(videoId=video_id, media_body=thumbnail_path).execute()
        except HttpError as e:
            # A thumbnail failure does not invalidate the uploaded video
            logger.warning("썸네일 업로드 실패: %s", e)
    return {"video_id": video_id}


def _build_youtube_client(creds):
    # Patch point for tests
    return build("youtube", "v3", credentials=creds, cache_discovery=False)


def _creds(access: str, refresh: str) -> Credentials:
    return Credentials(
        token=access,
        refresh_token=refresh,
        token_uri="https://oauth2.googleapis.com/token",
        client_id=CLIENT_ID,
        client_secret=CLIENT_SECRET,
        scopes=SCOPES,
    )


def _expiry_from_seconds(secs: int) -> str:
    from datetime import datetime, timedelta, timezone
    # Same naive-UTC ISO string as before, without the deprecated utcnow()
    expiry = datetime.now(timezone.utc) + timedelta(seconds=secs)
    return expiry.replace(tzinfo=None).isoformat(timespec="seconds")
```

- [ ] **Step 5: Run tests**

Run: `python -m pytest tests/test_youtube_upload.py -v`
Expected: 3 PASS

- [ ] **Step 6: Commit**

```bash
git add music-lab/app/pipeline/youtube.py music-lab/tests/test_youtube_upload.py \
  music-lab/requirements.txt
git commit -m "feat(music-lab): YouTube OAuth + resumable upload"
```

---

## Task 8: Orchestrator + 13 endpoints

**Files:**
- Create: `music-lab/app/pipeline/orchestrator.py`
- Modify: `music-lab/app/main.py`
- Test: `music-lab/tests/test_pipeline_endpoints.py`

This task carries the most code: 13 endpoints plus the orchestrator's per-step BackgroundTask dispatch.
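The per-step dispatch can also be pictured as a lookup table instead of an `if`/`elif` chain. The following is only a minimal sketch under stated assumptions: `run_cover`, `run_thumb`, `STEP_RUNNERS` and `dispatch` are hypothetical names that do not appear in this plan, the runners are stubs that return canned URLs, and no database is touched. It only illustrates the contract that each runner returns a `next_state` plus `fields`, and that any error, including an unknown step, maps to the `failed` state.

```python
import asyncio
from typing import Awaitable, Callable


# Hypothetical stand-ins for the per-step runners; real runners would call
# the cover/thumb modules and return the URLs of the produced media.
async def run_cover(pipeline_id: int, feedback: str) -> dict:
    url = f"/media/videos/{pipeline_id}/cover.jpg"
    return {"next_state": "cover_pending", "fields": {"cover_url": url}}


async def run_thumb(pipeline_id: int, feedback: str) -> dict:
    url = f"/media/videos/{pipeline_id}/thumbnail.jpg"
    return {"next_state": "thumb_pending", "fields": {"thumbnail_url": url}}


StepRunner = Callable[[int, str], Awaitable[dict]]
STEP_RUNNERS: dict[str, StepRunner] = {"cover": run_cover, "thumb": run_thumb}


async def dispatch(pipeline_id: int, step: str, feedback: str = "") -> dict:
    """Run one step; any error (including an unknown step) maps to 'failed'."""
    try:
        runner = STEP_RUNNERS.get(step)
        if runner is None:
            raise ValueError(f"unknown step: {step}")
        return await runner(pipeline_id, feedback)
    except Exception as exc:
        return {"next_state": "failed",
                "fields": {"failed_reason": f"{step}: {exc}"}}


print(asyncio.run(dispatch(7, "cover"))["next_state"])  # cover_pending
print(asyncio.run(dispatch(7, "nope"))["next_state"])   # failed
```

With a table like this, supporting a new step is a one-line registration, and the failure path stays in a single place.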
- [ ] **Step 1: Write failing test** ```python # tests/test_pipeline_endpoints.py import pytest from fastapi.testclient import TestClient from app.main import app from app import db @pytest.fixture def client(monkeypatch, tmp_path): monkeypatch.setattr(db, "DB_PATH", str(tmp_path / "music.db")) db.init_db() # 최소 트랙 1개 (라이브러리) from app.db import save_track save_track({"id": 1, "title": "T", "genre": "lo-fi", "bpm": 85, "key": "C", "scale": "minor", "moods": [], "instruments": [], "audio_url": "/x.mp3", "duration_sec": 120}) return TestClient(app) def test_create_pipeline(client): r = client.post("/api/music/pipeline", json={"track_id": 1}) assert r.status_code == 201 assert r.json()["state"] == "created" def test_create_duplicate_pipeline_returns_409(client): client.post("/api/music/pipeline", json={"track_id": 1}) r = client.post("/api/music/pipeline", json={"track_id": 1}) assert r.status_code == 409 def test_get_pipeline_returns_jobs_and_feedback(client): pid = client.post("/api/music/pipeline", json={"track_id": 1}).json()["id"] r = client.get(f"/api/music/pipeline/{pid}") assert "jobs" in r.json() assert "feedback" in r.json() def test_list_pipelines_active_filter(client): pid = client.post("/api/music/pipeline", json={"track_id": 1}).json()["id"] db.update_pipeline_state(pid, "published") r = client.get("/api/music/pipeline?status=active") assert all(p["state"] != "published" for p in r.json()["pipelines"]) def test_feedback_approve_advances_state(client): pid = client.post("/api/music/pipeline", json={"track_id": 1}).json()["id"] db.update_pipeline_state(pid, "cover_pending", cover_url="/m/x.jpg") r = client.post(f"/api/music/pipeline/{pid}/feedback", json={"step": "cover", "intent": "approve"}) assert r.status_code == 202 after = db.get_pipeline(pid) # video_pending이 BackgroundTask로 진입 후 결과는 mock 환경에서 즉시 안 보일 수 있음 # 최소 cover_approved 또는 video_pending 단계 확인 assert after["state"] in ("cover_approved", "video_pending", "video_running") def 
test_feedback_reject_records_feedback_and_increments_count(client): pid = client.post("/api/music/pipeline", json={"track_id": 1}).json()["id"] db.update_pipeline_state(pid, "cover_pending") r = client.post(f"/api/music/pipeline/{pid}/feedback", json={"step": "cover", "intent": "reject", "feedback_text": "더 어둡게"}) assert r.status_code == 202 p = db.get_pipeline(pid) assert p["feedback_count_per_step"]["cover"] == 1 history = db.get_feedback_history(pid) assert history[0]["feedback_text"] == "더 어둡게" def test_feedback_after_5_rejects_marks_awaiting_manual(client): pid = client.post("/api/music/pipeline", json={"track_id": 1}).json()["id"] db.update_pipeline_state(pid, "cover_pending") for i in range(5): client.post(f"/api/music/pipeline/{pid}/feedback", json={"step": "cover", "intent": "reject", "feedback_text": f"again {i}"}) r = client.post(f"/api/music/pipeline/{pid}/feedback", json={"step": "cover", "intent": "reject", "feedback_text": "6th"}) assert r.status_code == 409 assert db.get_pipeline(pid)["state"] == "awaiting_manual" def test_cancel_pipeline(client): pid = client.post("/api/music/pipeline", json={"track_id": 1}).json()["id"] r = client.post(f"/api/music/pipeline/{pid}/cancel") assert r.status_code == 200 assert db.get_pipeline(pid)["state"] == "cancelled" ``` - [ ] **Step 2: Run, verify fail** Run: `python -m pytest tests/test_pipeline_endpoints.py -v` Expected: 404 또는 collection error - [ ] **Step 3: Implement `orchestrator.py`** ```python """파이프라인 오케스트레이터 — 단계별 BackgroundTask 등록 및 산출물 → DB 반영.""" import asyncio import json import logging from typing import Optional from app import db from . import cover, video, thumb, metadata, review, youtube logger = logging.getLogger("music-lab.orchestrator") REVIEW_WEIGHTS_DEFAULT = {"meta": 25, "policy": 30, "viewer": 25, "trend": 20} async def run_step(pipeline_id: int, step: str, feedback: str = "") -> None: """단계 실행 → 결과를 DB에 반영하고 *_pending 또는 다음 단계로 전이. 호출 직후 _running 상태로 전환, 끝나면 _pending(사용자 게이트) 또는 자동 다음. 
실패 시 failed 상태 + reason. """ job_id = db.create_pipeline_job(pipeline_id, step) db.update_pipeline_job(job_id, status="running") p = db.get_pipeline(pipeline_id) track = _get_track(p["track_id"]) try: if step == "cover": result = await _run_cover(p, track, feedback) elif step == "video": result = await _run_video(p, track) elif step == "thumb": result = await _run_thumb(p, track, feedback) elif step == "meta": result = await _run_meta(p, track, feedback) elif step == "review": result = await _run_review(p, track) elif step == "publish": result = await _run_publish(p, track) else: raise ValueError(f"unknown step: {step}") db.update_pipeline_job(job_id, status="succeeded") db.update_pipeline_state(pipeline_id, result["next_state"], **result.get("fields", {})) except Exception as e: logger.exception("step %s failed for pipeline %s", step, pipeline_id) db.update_pipeline_job(job_id, status="failed", error=str(e)) db.update_pipeline_state(pipeline_id, "failed", failed_reason=f"{step}: {e}") def _get_track(track_id: int) -> dict: # tracks 테이블 스키마는 기존 music-lab — 여기서는 dict로 추정 t = db.get_track(track_id) if not t: raise ValueError(f"트랙 {track_id} 없음") return t async def _run_cover(p, track, feedback): setup = db.get_youtube_setup() prompts = setup["cover_prompts"] template = prompts.get(track["genre"].lower(), prompts.get("default", "")) out = await cover.generate( pipeline_id=p["id"], genre=track["genre"], prompt_template=template, mood=", ".join(track.get("moods", [])), track_title=track["title"], feedback=feedback, ) return {"next_state": "cover_pending", "fields": {"cover_url": out["url"]}} async def _run_video(p, track): setup = db.get_youtube_setup() vd = setup["visual_defaults"] audio_path = _local_path(track["audio_url"]) # /media/... → /data/... 
cover_path = _local_path(p["cover_url"]) out = video.generate( pipeline_id=p["id"], audio_path=audio_path, cover_path=cover_path, genre=track["genre"], duration_sec=track.get("duration_sec", 120), resolution=vd["resolution"], style=vd["style"], ) return {"next_state": "video_pending", "fields": {"video_url": out["url"]}} async def _run_thumb(p, track, feedback): video_path = _local_path(p["video_url"]) out = thumb.generate(pipeline_id=p["id"], video_path=video_path, track_title=track["title"], overlay_text=True) return {"next_state": "thumb_pending", "fields": {"thumbnail_url": out["url"]}} async def _run_meta(p, track, feedback): setup = db.get_youtube_setup() trend_top = _get_trend_top() out = await metadata.generate( track=track, template=setup["metadata_template"], trend_keywords=trend_top, feedback=feedback, ) return {"next_state": "meta_pending", "fields": {"metadata_json": json.dumps(out, ensure_ascii=False)}} async def _run_review(p, track): setup = db.get_youtube_setup() meta = json.loads(p["metadata_json"]) result = await review.run_4_axis( pipeline=p, track=track, video_meta={"length_sec": track.get("duration_sec", 120), "resolution": setup["visual_defaults"]["resolution"]}, metadata=meta, thumbnail_url=p["thumbnail_url"], trend_top=_get_trend_top(), weights=setup["review_weights"], threshold=setup["review_threshold"], ) return {"next_state": "publish_pending", "fields": {"review_json": json.dumps(result, ensure_ascii=False)}} async def _run_publish(p, track): setup = db.get_youtube_setup() meta = json.loads(p["metadata_json"]) privacy = setup["publish_policy"].get("privacy", "private") result = youtube.upload_video( video_path=_local_path(p["video_url"]), thumbnail_path=_local_path(p["thumbnail_url"]), metadata=meta, privacy=privacy, ) return {"next_state": "published", "fields": {"youtube_video_id": result["video_id"]}} def _local_path(media_url: str) -> str: """ /media/videos/123/cover.jpg → /app/data/videos/123/cover.jpg """ import os base_media = 
os.getenv("VIDEO_MEDIA_BASE", "/media/videos") base_data = os.getenv("VIDEO_DATA_DIR", "/app/data/videos") if media_url.startswith(base_media): return media_url.replace(base_media, base_data, 1) # /media/music/abc.mp3 → /app/data/music/abc.mp3 return media_url.replace("/media/", "/app/data/", 1) def _get_trend_top(n: int = 10) -> list[str]: try: rows = db.get_market_trends(limit=n) # 기존 market_trends 테이블 return [r["genre"] for r in rows] except Exception: return [] ``` - [ ] **Step 4: Add endpoints to `main.py`** `app/main.py`에 다음 추가 (기존 endpoints 아래): ```python from fastapi import BackgroundTasks from pydantic import BaseModel from app.pipeline import orchestrator, youtube as yt_module from app.pipeline.state_machine import ( next_state_on_approve, next_state_on_reject, USER_GATES, ) class PipelineCreate(BaseModel): track_id: int class FeedbackRequest(BaseModel): step: str intent: str # approve | reject feedback_text: str | None = None @app.post("/api/music/pipeline", status_code=201) def create_pipeline(req: PipelineCreate): # 동일 트랙 활성 파이프라인 중복 방지 actives = db.list_pipelines(active_only=True) if any(p["track_id"] == req.track_id for p in actives): raise HTTPException(409, "이미 진행 중인 파이프라인이 있습니다") pid = db.create_pipeline(req.track_id) return db.get_pipeline(pid) @app.get("/api/music/pipeline") def list_pipelines_endpoint(status: str = "all"): pipelines = db.list_pipelines(active_only=(status == "active")) return {"pipelines": pipelines} @app.get("/api/music/pipeline/{pid}") def get_pipeline_endpoint(pid: int): p = db.get_pipeline(pid) if not p: raise HTTPException(404) p["jobs"] = db.list_pipeline_jobs(pid) p["feedback"] = db.get_feedback_history(pid) return p @app.post("/api/music/pipeline/{pid}/start", status_code=202) async def start_pipeline(pid: int, bg: BackgroundTasks): p = db.get_pipeline(pid) if not p: raise HTTPException(404) if p["state"] != "created": raise HTTPException(409, f"이미 시작됨 ({p['state']})") bg.add_task(orchestrator.run_step, pid, "cover") 
return {"ok": True} @app.post("/api/music/pipeline/{pid}/feedback", status_code=202) async def feedback(pid: int, req: FeedbackRequest, bg: BackgroundTasks): p = db.get_pipeline(pid) if not p: raise HTTPException(404) if p["state"] == "awaiting_manual": raise HTTPException(409, "수동 개입 대기 중") state = p["state"] expected = f"{req.step}_pending" if state != expected: # 멱등 처리 — 이미 다음 단계로 넘어갔으면 무시 return {"ok": True, "skipped": True} if req.intent == "approve": next_st = next_state_on_approve(state) db.update_pipeline_state(pid, next_st) # 다음 단계가 *_pending이면 자동 생성 트리거; ai_review/publishing도 자동 트리거 next_step = _state_to_step(next_st) if next_step: bg.add_task(orchestrator.run_step, pid, next_step) return {"ok": True} elif req.intent == "reject": count = db.increment_feedback_count(pid, req.step) if count > 5: db.update_pipeline_state(pid, "awaiting_manual") raise HTTPException(409, "재생성 한도 초과") if req.feedback_text: db.record_feedback(pid, req.step, req.feedback_text) bg.add_task(orchestrator.run_step, pid, req.step, req.feedback_text or "") return {"ok": True} else: raise HTTPException(400, f"unknown intent: {req.intent}") def _state_to_step(state: str) -> str | None: return { "video_pending": "video", "thumb_pending": "thumb", "meta_pending": "meta", "ai_review": "review", "publish_pending": None, # 사용자 명시 발행 호출 필요 "publishing": "publish", }.get(state) @app.post("/api/music/pipeline/{pid}/cancel") def cancel_pipeline(pid: int): p = db.get_pipeline(pid) if not p: raise HTTPException(404) db.update_pipeline_state(pid, "cancelled") return {"ok": True} @app.post("/api/music/pipeline/{pid}/publish", status_code=202) async def publish_pipeline(pid: int, bg: BackgroundTasks): p = db.get_pipeline(pid) if not p: raise HTTPException(404) if p["state"] != "publish_pending": raise HTTPException(409, f"발행 단계 아님 ({p['state']})") db.update_pipeline_state(pid, "publishing") bg.add_task(orchestrator.run_step, pid, "publish") return {"ok": True} # --- Setup --- class 
SetupRequest(BaseModel): metadata_template: dict | None = None cover_prompts: dict | None = None review_weights: dict | None = None review_threshold: int | None = None visual_defaults: dict | None = None publish_policy: dict | None = None @app.get("/api/music/setup") def get_setup(): return db.get_youtube_setup() @app.put("/api/music/setup") def put_setup(req: SetupRequest): db.update_youtube_setup(**{k: v for k, v in req.dict().items() if v is not None}) return db.get_youtube_setup() # --- YouTube OAuth --- @app.get("/api/music/youtube/auth-url") def youtube_auth_url(): return {"url": yt_module.get_auth_url()} @app.get("/api/music/youtube/callback") async def youtube_callback(code: str): info = await yt_module.exchange_code(code) return info @app.post("/api/music/youtube/disconnect") def youtube_disconnect(): yt_module.disconnect() return {"ok": True} @app.get("/api/music/youtube/status") def youtube_status(): return yt_module.get_status() or {"connected": False} ``` - [ ] **Step 5: Run tests** Run: `python -m pytest tests/test_pipeline_endpoints.py -v` Expected: 8 PASS (BackgroundTask는 TestClient에서 동기 실행 — 테스트는 mock orchestrator 또는 첫 단계만 검증) > 참고: BackgroundTasks가 실제 cover.generate를 호출하면 OPENAI_API_KEY 없이 그라데이션 폴백을 만든다. 테스트에서는 `monkeypatch.setattr(orchestrator, "run_step", AsyncMock())`로 우회해도 됨. 위 테스트는 상태 검증 위주라 OK. 
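The `AsyncMock` bypass mentioned in the note works because the endpoints look up `orchestrator.run_step` as a module attribute each time they are called, so a replacement installed by the test is the one that runs. Below is a minimal, stdlib-only sketch of that mechanism, assuming a hypothetical `handle_reject` coroutine and a `SimpleNamespace` standing in for the real `app.pipeline.orchestrator` module; it is not the plan's endpoint code.

```python
import asyncio
from types import SimpleNamespace
from unittest.mock import AsyncMock

# Hypothetical stand-in for the app.pipeline.orchestrator module object.
orchestrator = SimpleNamespace(run_step=None)


async def handle_reject(pid: int, step: str, feedback_text: str) -> dict:
    # The attribute is resolved at call time, so a later patch is honoured.
    await orchestrator.run_step(pid, step, feedback_text)
    return {"ok": True}


# Equivalent to monkeypatch.setattr(orchestrator, "run_step", AsyncMock())
orchestrator.run_step = AsyncMock()
result = asyncio.run(handle_reject(7, "cover", "darker"))

# The stub records the call, so the test can check the arguments without
# running any real cover generation.
orchestrator.run_step.assert_awaited_once_with(7, "cover", "darker")
```

In the real test suite the same assertion could be made after posting to the feedback endpoint, which keeps the endpoint tests focused on state transitions rather than on generation side effects.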

- [ ] **Step 6: Commit**

```bash
git add music-lab/app/pipeline/orchestrator.py music-lab/app/main.py \
        music-lab/tests/test_pipeline_endpoints.py
git commit -m "feat(music-lab): pipeline 오케스트레이터 + 13개 엔드포인트"
```

---

## Task 9: agent-office natural-language intent classification

**Files:**
- Create: `agent-office/app/agents/classify_intent.py`
- Test: `agent-office/tests/test_classify_intent.py`

- [ ] **Step 1: Write failing test**

```python
# tests/test_classify_intent.py
import pytest
import respx
from httpx import Response
from app.agents import classify_intent as ci


def test_clear_approve_no_llm(monkeypatch):
    called = {"n": 0}
    monkeypatch.setattr(ci, "_llm_classify",
                        lambda t: (called.update(n=called["n"]+1), ("unclear", None))[1])
    assert ci.classify("승인") == ("approve", None)
    assert ci.classify("OK") == ("approve", None)
    assert ci.classify("진행") == ("approve", None)
    assert ci.classify("agree") == ("approve", None)
    assert called["n"] == 0


def test_clear_reject_only_no_llm(monkeypatch):
    monkeypatch.setattr(ci, "_llm_classify", lambda t: ("unclear", None))
    assert ci.classify("반려") == ("reject", None)
    assert ci.classify("거절") == ("reject", None)


def test_reject_with_text_split(monkeypatch):
    monkeypatch.setattr(ci, "_llm_classify", lambda t: ("unclear", None))
    intent, fb = ci.classify("반려, 제목 짧게")
    assert intent == "reject"
    assert "제목 짧게" in fb


@respx.mock
def test_ambiguous_calls_llm(monkeypatch):
    # The module reads ANTHROPIC_API_KEY once at import time, so patch the
    # module attribute; setting the environment variable here would be too late.
    monkeypatch.setattr(ci, "ANTHROPIC_API_KEY", "k")
    respx.post("https://api.anthropic.com/v1/messages").mock(
        return_value=Response(200, json={"content": [{"type": "text",
            "text": '{"intent":"reject","feedback":"좀 더 화려하게"}'}]})
    )
    intent, fb = ci.classify("음... 좀 더 화려한 분위기가 좋겠어")
    assert intent == "reject"
    assert "화려하게" in fb
```

- [ ] **Step 2: Run, verify fail**

Run: `cd agent-office && python -m pytest tests/test_classify_intent.py -v`
Expected: ImportError

- [ ] **Step 3: Implement `classify_intent.py`**

```python
"""Classify a Telegram user's natural-language reply: whitelist first, LLM when ambiguous."""
import os
import json
import logging
import httpx

logger = logging.getLogger("agent-office.classify_intent")

ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY", "")
CLAUDE_HAIKU = os.getenv("CLAUDE_HAIKU_MODEL", "claude-haiku-4-5-20251001")

APPROVE_WORDS = {
    "승인", "시작", "진행", "ok", "okay", "agree", "네", "예", "좋아", "좋아요",
    "go", "yes", "y",
}
REJECT_WORDS = {"반려", "거절", "취소", "no", "nope", "n"}
_VALID_INTENTS = {"approve", "reject", "unclear"}
_SEPARATORS = " ,.-:"


def classify(text: str) -> tuple[str, str | None]:
    """Return (intent, feedback), where intent is one of approve, reject, unclear."""
    if not text:
        return ("unclear", None)
    t = text.strip().lower()
    if t in APPROVE_WORDS:
        return ("approve", None)
    if t in REJECT_WORDS:
        return ("reject", None)
    # Starts with a reject word, then a separator, then optional extra text.
    # Longest words are tried first and a separator is required, so that
    # inputs like "nice" or "now" are not mistaken for the reject word "n"/"no".
    for w in sorted(REJECT_WORDS, key=len, reverse=True):
        if t.startswith(w) and t[len(w)] in _SEPARATORS:
            rest = text.strip()[len(w):].lstrip(_SEPARATORS).strip()
            return ("reject", rest or None)
    # Starts with an approve word followed by a space (the extra text is ignored)
    for w in APPROVE_WORDS:
        if t.startswith(w + " "):
            return ("approve", None)
    return _llm_classify(text)


def _llm_classify(text: str) -> tuple[str, str | None]:
    if not ANTHROPIC_API_KEY:
        return ("unclear", None)
    prompt = (
        "사용자 응답을 분류하세요. JSON으로만 응답.\n"
        f'응답: "{text}"\n\n'
        '출력: {"intent":"approve|reject|unclear","feedback":"반려면 수정 방향, 아니면 빈 문자열"}'
    )
    try:
        resp = httpx.post(
            "https://api.anthropic.com/v1/messages",
            headers={"x-api-key": ANTHROPIC_API_KEY, "anthropic-version": "2023-06-01"},
            json={"model": CLAUDE_HAIKU, "max_tokens": 200,
                  "messages": [{"role": "user", "content": prompt}]},
            timeout=15,
        )
        resp.raise_for_status()
        text_out = resp.json()["content"][0]["text"]
        # Extract the outermost JSON object in case the model adds surrounding text
        data = json.loads(text_out[text_out.find("{"):text_out.rfind("}")+1])
        intent = data.get("intent", "unclear")
        if intent not in _VALID_INTENTS:
            intent = "unclear"
        return (intent, data.get("feedback") or None)
    except Exception as e:
        logger.warning("LLM 분류 실패: %s", e)
        return ("unclear", None)
```

- [ ] **Step 4: Run tests**

Run: `python -m pytest tests/test_classify_intent.py -v`
Expected: 4 PASS

- [ ] **Step 5: Commit**

```bash
git add agent-office/app/agents/classify_intent.py agent-office/tests/test_classify_intent.py
git commit -m "feat(agent-office): 텔레그램 자연어 의도 분류"
```

---

## Task 10: youtube_publisher agent + polling job

**Files:**
- Create: `agent-office/app/agents/youtube_publisher.py`
- Modify: `agent-office/app/agents/__init__.py`
- Modify: `agent-office/app/scheduler.py`
- Modify: `agent-office/app/service_proxy.py`
- Modify: `agent-office/app/telegram/conversational.py`
- Test: `agent-office/tests/test_pipeline_polling.py`

- [ ] **Step 1: Add service_proxy helpers**

Add to `service_proxy.py`:

```python
async def list_active_pipelines() -> list[dict]:
    async with httpx.AsyncClient(timeout=15) as client:
        resp = await client.get(f"{MUSIC_LAB_URL}/api/music/pipeline?status=active")
        resp.raise_for_status()
        return resp.json()["pipelines"]


async def get_pipeline(pid: int) -> dict:
    async with httpx.AsyncClient(timeout=15) as client:
        resp = await client.get(f"{MUSIC_LAB_URL}/api/music/pipeline/{pid}")
        resp.raise_for_status()
        return resp.json()


async def post_pipeline_feedback(pid: int, step: str, intent: str,
                                 feedback_text: str | None = None) -> dict:
    async with httpx.AsyncClient(timeout=15) as client:
        resp = await client.post(
            f"{MUSIC_LAB_URL}/api/music/pipeline/{pid}/feedback",
            json={"step": step, "intent": intent, "feedback_text": feedback_text},
        )
        resp.raise_for_status()
        return resp.json()
```

- [ ] **Step 2: Implement youtube_publisher**

```python
# agents/youtube_publisher.py
"""Orchestrates step-by-step approval interactions over a single Telegram channel."""
import logging
from typing import Optional

from .base import BaseAgent
from . import classify_intent
from .. import service_proxy
from ..db import add_log
from ..telegram.messaging import send_raw

logger = logging.getLogger("agent-office.youtube_publisher")

# pipeline state -> (display title, step name)
_STEP_TITLES = {
    "cover_pending": ("커버 아트", "cover"),
    "video_pending": ("영상 비주얼", "video"),
    "thumb_pending": ("썸네일", "thumb"),
    "meta_pending": ("메타데이터", "meta"),
    "publish_pending": ("최종 검토 + 발행", "publish"),
}


class YoutubePublisherAgent(BaseAgent):
    agent_id = "youtube_publisher"
    display_name = "YouTube 퍼블리셔"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Last state notified per pipeline; kept in memory only
        self._notified_state_per_pipeline: dict[int, str] = {}

    async def poll_state_changes(self) -> None:
        """Called every 30 seconds; sends a Telegram message when a pipeline newly enters *_pending."""
        try:
            pipelines = await service_proxy.list_active_pipelines()
        except Exception as e:
            logger.warning("폴링 실패: %s", e)
            return
        for p in pipelines:
            state = p["state"]
            pid = p["id"]
            if state in _STEP_TITLES and self._notified_state_per_pipeline.get(pid) != state:
                await self._notify_step(p)
                self._notified_state_per_pipeline[pid] = state

    async def _notify_step(self, pipeline: dict) -> None:
        state = pipeline["state"]
        title_name, step = _STEP_TITLES[state]
        body = self._format_body(pipeline, step)
        sent = await send_raw(
            text=f"🎵 [{pipeline.get('track_title', 'Pipeline')}] {title_name} 검토\n\n{body}\n\n"
                 f"➡️ 답장으로 알려주세요: '승인' 또는 '반려 + 수정 방향'",
            metadata={"pipeline_id": pipeline["id"], "step": step},
        )
        if sent.get("ok"):
            add_log(self.agent_id, f"pipeline {pipeline['id']} {step} 알림 전송", "info")

    def _format_body(self, p: dict, step: str) -> str:
        if step == "cover":
            return f"🖼️ 커버: {p.get('cover_url', '-')}"
        if step == "video":
            return f"🎬 영상: {p.get('video_url', '-')}"
        if step == "thumb":
            return f"🎴 썸네일: {p.get('thumbnail_url', '-')}"
        if step == "meta":
            m = p.get("metadata", {})
            return (f"📝 제목: {m.get('title','')}\n"
                    f"🏷️ 태그: {', '.join(m.get('tags', [])[:8])}\n"
                    f"📄 설명(앞부분): {(m.get('description','') or '')[:200]}")
        if step == "publish":
            r = p.get("review", {})
            return (f"AI 검토 결과: {r.get('verdict','?')} "
                    f"(가중 {r.get('weighted_total','?')}/100)\n"
                    f"{r.get('summary','')}")
        return ""

    async def on_telegram_reply(self, pipeline_id: int, step: str, user_text: str) -> None:
        intent, feedback = classify_intent.classify(user_text)
        if intent == "unclear":
            await send_raw("다시 입력해주세요. 예: '승인' 또는 '반려, 제목 짧게'")
            return
        try:
            await service_proxy.post_pipeline_feedback(pipeline_id, step, intent, feedback)
        except Exception as e:
            await send_raw(f"⚠️ 처리 실패: {e}")

    async def on_schedule(self) -> None:
        # Polling is invoked from the scheduler
        await self.poll_state_changes()

    async def on_command(self, command: str, params: dict) -> dict:
        return {"ok": False, "message": f"Unknown command: {command}"}

    async def on_approval(self, task_id: str, approved: bool, feedback: str = "") -> None:
        pass
```

- [ ] **Step 3: Register agent + scheduler**

Register in `agents/__init__.py`:

```python
from .youtube_publisher import YoutubePublisherAgent

AGENT_REGISTRY = {
    # ... existing ...
    "youtube_publisher": YoutubePublisherAgent(...),
}
```

Add to `scheduler.py`:

```python
async def _poll_pipelines():
    agent = AGENT_REGISTRY.get("youtube_publisher")
    if agent:
        await agent.poll_state_changes()


def init_scheduler():
    # ... existing jobs ...
    scheduler.add_job(_poll_pipelines, "interval", seconds=30, id="pipeline_poll")
    scheduler.start()
```

- [ ] **Step 4: Telegram reply routing**

In `telegram/conversational.py`, when a reply message is received:

```python
# Add to the existing handler
async def handle_reply(message: dict) -> None:
    reply_to = message.get("reply_to_message", {})
    msg_id = reply_to.get("message_id")
    if not msg_id:
        return
    # Find pipeline_id and step via last_telegram_msg_ids in the DB.
    # (Having messaging.send_raw persist its metadata would be cleaner, but the
    # current spec keeps the matching simple; a separate telegram_message_links
    # table is the alternative.)
    from ..agents import AGENT_REGISTRY
    # Simple implementation: look up pipeline_id/step from the message caption or the DB
    link = _lookup_message_link(msg_id)
    if not link:
        return
    agent = AGENT_REGISTRY.get("youtube_publisher")
    if not agent:
        return
    await agent.on_telegram_reply(link["pipeline_id"], link["step"], message.get("text", ""))
```

> **Note**: `_lookup_message_link` can use either a separate table (`telegram_message_links`) or the `video_pipelines.last_telegram_msg_ids` JSON column. This task uses the latter, reusing the existing column with no additional migration. To store the message_id returned by `messaging.send_raw` in music-lab, add a helper to service_proxy and call it from youtube_publisher at the end of `_notify_step`.

```python
# service_proxy.py
async def save_pipeline_telegram_msg(pid: int, step: str, msg_id: int) -> None:
    async with httpx.AsyncClient(timeout=10) as client:
        await client.patch(
            f"{MUSIC_LAB_URL}/api/music/pipeline/{pid}/telegram-msg",
            json={"step": step, "message_id": msg_id},
        )
```

Add the endpoint to music-lab `main.py`:

```python
import json
import sqlite3


class TelegramMsgPatch(BaseModel):
    step: str
    message_id: int


@app.patch("/api/music/pipeline/{pid}/telegram-msg")
def save_telegram_msg(pid: int, req: TelegramMsgPatch):
    p = db.get_pipeline(pid)
    if not p:
        raise HTTPException(404)
    ids = p["last_telegram_msg_ids"]
    ids[req.step] = req.message_id
    conn = sqlite3.connect(db.DB_PATH)
    try:
        conn.execute(
            "UPDATE video_pipelines SET last_telegram_msg_ids = ?, updated_at = ? WHERE id = ?",
            (json.dumps(ids), db._now(), pid),
        )
        conn.commit()
    finally:
        conn.close()
    return {"ok": True}
```

`lookup` endpoint for reply matching:

```python
@app.get("/api/music/pipeline/lookup-by-msg/{msg_id}")
def lookup_by_msg(msg_id: int):
    # Full scan of the DB (only a handful of active pipelines, so performance is fine)
    for p in db.list_pipelines(active_only=True):
        for step, mid in p["last_telegram_msg_ids"].items():
            if mid == msg_id:
                return {"pipeline_id": p["id"], "step": step}
    raise HTTPException(404)
```

agent-office `_lookup_message_link` implementation:

```python
def _lookup_message_link(msg_id: int) -> Optional[dict]:
    import httpx
    from ..config import MUSIC_LAB_URL
    try:
        resp = httpx.get(f"{MUSIC_LAB_URL}/api/music/pipeline/lookup-by-msg/{msg_id}", timeout=5)
        if resp.status_code == 200:
            return resp.json()
    except Exception:
        pass
    return None
```

- [ ] **Step 5: Polling test**

```python
# tests/test_pipeline_polling.py
import pytest
from unittest.mock import AsyncMock, patch
from app.agents.youtube_publisher import YoutubePublisherAgent


@pytest.mark.asyncio
@patch("app.agents.youtube_publisher.service_proxy.list_active_pipelines",
       new=AsyncMock(return_value=[{"id": 1, "state": "cover_pending", "cover_url": "/x.jpg"}]))
@patch("app.agents.youtube_publisher.send_raw",
       new=AsyncMock(return_value={"ok": True, "message_id": 99}))
async def test_poll_notifies_once_per_state(monkeypatch):
    a = YoutubePublisherAgent(ws_manager=None)
    await a.poll_state_changes()
    await a.poll_state_changes()  # same state: the second poll must not notify again
    from app.agents.youtube_publisher import send_raw as sr
    assert sr.call_count == 1
```

- [ ] **Step 6: Run tests**

Run: `python -m pytest tests/test_pipeline_polling.py -v`
Expected: PASS

- [ ] **Step 7: Commit**

```bash
git add agent-office/app/agents/youtube_publisher.py \
        agent-office/app/agents/__init__.py \
        agent-office/app/scheduler.py \
        agent-office/app/service_proxy.py \
        agent-office/app/telegram/conversational.py \
        agent-office/tests/test_pipeline_polling.py \
        music-lab/app/main.py
git commit -m "feat(agent-office): youtube_publisher 에이전트 + 30s 폴링"
```

---

## Task 11: Frontend api.js helpers

**Files:**
- Modify: `web-ui/src/api.js`

- [ ] **Step 1: Add helpers**

Append to the end of `src/api.js`:

```javascript
// --- Music Pipeline ---
export const listPipelines = (status='all') => apiGet(`/api/music/pipeline?status=${status}`);
export const getPipeline = (id) => apiGet(`/api/music/pipeline/${id}`);
export const createPipeline = (track_id) => apiPost('/api/music/pipeline', { track_id });
export const startPipeline = (id) => apiPost(`/api/music/pipeline/${id}/start`);
export const cancelPipeline = (id) => apiPost(`/api/music/pipeline/${id}/cancel`);
export const publishPipeline = (id) => apiPost(`/api/music/pipeline/${id}/publish`);

// --- Music Setup ---
export const getMusicSetup = () => apiGet('/api/music/setup');
export const updateMusicSetup = (payload) => apiPut('/api/music/setup', payload);

// --- YouTube OAuth ---
export const getYoutubeAuthUrl = () => apiGet('/api/music/youtube/auth-url');
export const getYoutubeStatus = () => apiGet('/api/music/youtube/status');
export const disconnectYoutube = () => apiPost('/api/music/youtube/disconnect');
```

- [ ] **Step 2: Commit**

```bash
git -C web-ui add src/api.js
git -C web-ui commit -m "feat(web-ui): pipeline/setup/youtube API 헬퍼"
```

---

## Task 12: Frontend SetupTab

**Files:**
- Create: `web-ui/src/pages/music/components/SetupTab.jsx`
- Modify: `web-ui/src/pages/music/MusicStudio.css` (styles)

- [ ] **Step 1: Implement SetupTab**

```jsx
// SetupTab.jsx
import { useEffect, useState } from 'react';
import {
  getMusicSetup, updateMusicSetup,
  getYoutubeAuthUrl, getYoutubeStatus, disconnectYoutube,
} from '../../../api';

export default function SetupTab() {
  const [setup, setSetup] = useState(null);
  const [yt, setYt] = useState(null);
  const [saving, setSaving] = useState(false);
  const [error, setError] = useState('');

  useEffect(() => {
    Promise.all([getMusicSetup(), getYoutubeStatus()])
      .then(([s, y]) => { setSetup(s); setYt(y); })
      .catch(e =>
        setError(String(e)));
  }, []);

  // The wrapper element was lost in extraction; <div> is an assumption
  if (!setup) return <div>Loading…</div>;

  const save = async (patch) => {
    setSaving(true);
    try {
      const next = await updateMusicSetup(patch);
      setSetup(next);
    } catch (e) {
      setError(String(e));
    } finally {
      setSaving(false);
    }
  };

  const connectYoutube = async () => {
    const { url } = await getYoutubeAuthUrl();
    window.location.href = url;  // OAuth flow
  };

  return (
{error &&
{error}
}

YouTube 채널 연동

{yt && yt.channel_id ? (
{yt.channel_title}
) : ( )}

메타데이터 템플릿