diff --git a/docs/superpowers/plans/2026-05-07-music-youtube-pipeline.md b/docs/superpowers/plans/2026-05-07-music-youtube-pipeline.md new file mode 100644 index 0000000..fda0b90 --- /dev/null +++ b/docs/superpowers/plans/2026-05-07-music-youtube-pipeline.md @@ -0,0 +1,3325 @@ +# Music YouTube Pipeline Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** 트랙 → 영상 → YouTube 발행까지 단계별 텔레그램 승인 파이프라인 구축. 각 단계 자동 산출물 생성 → 텔레그램 알림 → 자연어 응답으로 승인/반려/피드백 처리 → 다음 단계 또는 재생성. + +**Architecture:** music-lab(생성+상태머신+OAuth+업로드) ↔ agent-office(텔레그램 단일 채널 + 자연어 분류 + 폴링 오케스트레이션) ↔ web-ui(구성·진행 탭 + Library 트리거). 모든 AI/생성 작업은 BackgroundTask + DB job 상태 폴링. + +**Tech Stack:** +- music-lab: FastAPI, SQLite, Anthropic Claude SDK, OpenAI SDK, google-api-python-client, google-auth-oauthlib, FFmpeg, Pillow +- agent-office: FastAPI, APScheduler, python-telegram (기존), Anthropic SDK +- web-ui: React 18, Vite, fetch-based API helpers (기존 `src/api.js`) +- Tests: pytest + httpx 모킹 (`respx`/`httpx_mock`), freezegun, 기존 conftest 패턴 + +**Spec:** `docs/superpowers/specs/2026-05-07-music-youtube-pipeline-design.md` + +--- + +## File Structure + +### music-lab (`web-backend/music-lab/`) + +| 경로 | 책임 | +|------|------| +| `app/db.py` (modify) | 5개 신규 테이블 + 헬퍼 함수 | +| `app/pipeline/__init__.py` (new) | 패키지 초기화 | +| `app/pipeline/state_machine.py` (new) | 상태 전이 검증, `transition()` 함수 | +| `app/pipeline/orchestrator.py` (new) | `start_step(pipeline_id, step)` BackgroundTask 등록 | +| `app/pipeline/cover.py` (new) | DALL·E 3 호출 + 그라데이션 폴백 | +| `app/pipeline/video.py` (move/rename from video_producer.py) | FFmpeg visualizer/슬라이드쇼 (커버 입력 지원) | +| `app/pipeline/thumb.py` (new) | 썸네일 추출 + 텍스트 오버레이 | +| `app/pipeline/metadata.py` (new) | Claude Haiku 메타 생성 + 템플릿 치환 | +| `app/pipeline/review.py` (new) | Claude Sonnet 4축 검토 + 가중평균 | +| `app/pipeline/youtube.py` (new) | OAuth flow + resumable upload | +| `app/pipeline/storage.py` (new) | `/data/videos/{id}/` 디렉토리 관리 | +| `app/pipeline/setup.py` (new) | youtube_setup CRUD | +| `app/main.py` (modify) | 13개 엔드포인트 추가 | +| `tests/test_state_machine.py` (new) | 전이 검증 | +| `tests/test_pipeline_endpoints.py` (new) | CRUD + feedback | +| `tests/test_cover_generation.py` (new) | DALL·E mock + 폴백 | +| `tests/test_metadata_generation.py` (new) | Claude mock + 템플릿 | +| `tests/test_review.py` (new) | 4축 검토 + verdict | +| `tests/test_youtube_upload.py` (new) | google-api mock + retry | +| `requirements.txt` (modify) | openai, google-api-python-client, google-auth-oauthlib | + +### agent-office (`web-backend/agent-office/`) + +| 경로 | 책임 | +|------|------| +| `app/agents/youtube_publisher.py` (new) | 오케스트레이터 — poll + classify + feedback | +| `app/agents/__init__.py` (modify) | AGENT_REGISTRY 등록 | +| `app/scheduler.py` (modify) | `_poll_pipelines` 30초 잡 추가 | +| `app/telegram/conversational.py` (modify) | reply 매칭 → youtube_publisher 라우팅 | +| `app/service_proxy.py` (modify) | music-lab pipeline 헬퍼 | +| `tests/test_classify_intent.py` (new) | 화이트리스트/LLM 분기 | +| `tests/test_pipeline_polling.py` (new) | 멱등 폴링 | + +### web-ui (`web-ui/`) + +| 경로 | 책임 | +|------|------| +| `src/api.js` (modify) | pipeline/setup/youtube 헬퍼 | +| `src/pages/music/components/SetupTab.jsx` (new) | 구성 탭 | +| `src/pages/music/components/PipelineTab.jsx` (new) | 진행 탭 | +| `src/pages/music/components/PipelineCard.jsx` (new) | 카드 1장 (진행도/현재상태/피드백) | +| `src/pages/music/components/PipelineStartModal.jsx` (new) | Library 트랙 선택 모달 | +| `src/pages/music/components/YoutubeTab.jsx` (modify) | 서브탭 6개로 | +| `src/pages/music/components/Library.jsx` 또는 MusicStudio 라이브러리 부분 (modify) | "🎬 영상 파이프라인" 버튼 | +| `src/pages/music/MusicStudio.css` (modify) | 진행 탭/구성 탭 스타일 | + +### docker-compose / nginx + +| 경로 | 변경 | +|------|------| +| `docker-compose.yml` (modify) | music-lab env 4개 추가 | +| `nginx/conf.d/default.conf` (modify) | `/api/music/youtube/callback` 외부 노출 | +| `music-lab/Dockerfile` (modify) | 새 의존성 빌드 | + +--- + +## Task 1: music-lab DB 신규 테이블 + 헬퍼 + +**Files:** +- Modify: `music-lab/app/db.py` +- Test: `music-lab/tests/test_pipeline_db.py` + +- [ ] **Step 1: Write the failing test** `tests/test_pipeline_db.py` + +```python +import os +import tempfile +import pytest +from app import db + + +@pytest.fixture +def fresh_db(monkeypatch, tmp_path): + db_path = tmp_path / "music.db" + monkeypatch.setattr(db, "DB_PATH", str(db_path)) + db.init_db() + return db_path + + +def test_create_pipeline_inserts_row(fresh_db): + pid = db.create_pipeline(track_id=1) + row = db.get_pipeline(pid) + assert row["id"] == pid + assert row["state"] == "created" + assert row["track_id"] == 1 + assert row["feedback_count_per_step"] == {} + + +def test_update_pipeline_state_records_started_at(fresh_db, freezer): + pid = db.create_pipeline(track_id=1) + freezer.move_to("2026-05-07T08:00:00") + db.update_pipeline_state(pid, "cover_pending") + row = db.get_pipeline(pid) + assert row["state"] == "cover_pending" + assert row["state_started_at"] == "2026-05-07T08:00:00" + + +def test_increment_feedback_count(fresh_db): + pid = db.create_pipeline(track_id=1) + db.increment_feedback_count(pid, "cover") + db.increment_feedback_count(pid, "cover") + row = db.get_pipeline(pid) + assert row["feedback_count_per_step"] == {"cover": 2} + + +def test_record_feedback(fresh_db): + pid = db.create_pipeline(track_id=1) + db.record_feedback(pid, "cover", "더 어둡게") + rows = db.get_feedback_history(pid) + assert len(rows) == 1 + assert rows[0]["feedback_text"] == "더 어둡게" + + +def test_create_pipeline_job_lifecycle(fresh_db): + pid = db.create_pipeline(track_id=1) + job_id = db.create_pipeline_job(pid, "cover") + db.update_pipeline_job(job_id, status="running") + db.update_pipeline_job(job_id, status="succeeded", duration_ms=1234) + jobs = db.list_pipeline_jobs(pid) + assert jobs[0]["status"] == "succeeded" + assert jobs[0]["duration_ms"] == 1234 + + +def test_youtube_setup_default_row_created_on_init(fresh_db): + setup = db.get_youtube_setup() + assert setup["review_threshold"] == 60 + assert "metadata_template_json" in setup + + +def test_youtube_oauth_token_upsert(fresh_db): + db.upsert_oauth_token( + channel_id="UC123", + channel_title="My Channel", + avatar_url="https://...", + refresh_token="r1", + access_token="a1", + expires_at="2026-05-07T09:00:00", + ) + tok = db.get_oauth_token() + assert tok["channel_id"] == "UC123" + assert tok["refresh_token"] == "r1" + db.upsert_oauth_token( + channel_id="UC123", channel_title="My Channel", + avatar_url=None, refresh_token="r2", + access_token="a2", expires_at="2026-05-07T10:00:00", + ) + tok = db.get_oauth_token() + assert tok["refresh_token"] == "r2" # upsert +``` + +`requirements.txt`에 `freezegun` 추가 필요. + +- [ ] **Step 2: Run test to verify it fails** + +Run: `cd music-lab && python -m pytest tests/test_pipeline_db.py -v` +Expected: ImportError on `db.create_pipeline` 등 (함수 미존재) + +- [ ] **Step 3: Add tables and helpers to `db.py`** + +`init_db()` 끝에 다음 5개 `CREATE TABLE IF NOT EXISTS` 추가: + +```python +def init_db(): + # ... existing tables ... + cursor.execute(""" + CREATE TABLE IF NOT EXISTS video_pipelines ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + track_id INTEGER NOT NULL, + state TEXT NOT NULL DEFAULT 'created', + state_started_at TEXT NOT NULL, + cover_url TEXT, + video_url TEXT, + thumbnail_url TEXT, + metadata_json TEXT, + review_json TEXT, + youtube_video_id TEXT, + feedback_count_per_step TEXT NOT NULL DEFAULT '{}', + last_telegram_msg_ids TEXT NOT NULL DEFAULT '{}', + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL, + cancelled_at TEXT, + failed_reason TEXT + ) + """) + cursor.execute(""" + CREATE TABLE IF NOT EXISTS pipeline_jobs ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + pipeline_id INTEGER NOT NULL, + step TEXT NOT NULL, + status TEXT NOT NULL DEFAULT 'queued', + error TEXT, + started_at TEXT, + finished_at TEXT, + duration_ms INTEGER, + FOREIGN KEY (pipeline_id) REFERENCES video_pipelines(id) + ) + """) + cursor.execute(""" + CREATE TABLE IF NOT EXISTS pipeline_feedback ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + pipeline_id INTEGER NOT NULL, + step TEXT NOT NULL, + feedback_text TEXT NOT NULL, + received_at TEXT NOT NULL, + FOREIGN KEY (pipeline_id) REFERENCES video_pipelines(id) + ) + """) + cursor.execute(""" + CREATE TABLE IF NOT EXISTS youtube_oauth_tokens ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + channel_id TEXT NOT NULL, + channel_title TEXT, + avatar_url TEXT, + refresh_token TEXT NOT NULL, + access_token TEXT, + expires_at TEXT, + created_at TEXT NOT NULL + ) + """) + cursor.execute(""" + CREATE TABLE IF NOT EXISTS youtube_setup ( + id INTEGER PRIMARY KEY AUTOINCREMENT CHECK (id = 1), + metadata_template_json TEXT NOT NULL, + cover_prompts_json TEXT NOT NULL, + review_weights_json TEXT NOT NULL, + review_threshold INTEGER NOT NULL DEFAULT 60, + visual_defaults_json TEXT NOT NULL, + publish_policy_json TEXT NOT NULL, + updated_at TEXT NOT NULL + ) + """) + + # 기본 setup 1행 보장 + cursor.execute("SELECT COUNT(*) FROM youtube_setup") + if cursor.fetchone()[0] == 0: + import json + from datetime import datetime + defaults = ( + json.dumps({ + "title": "[{genre}] {title} | {bpm}BPM", + "description": "{title}\n\n장르: {genre}\nBPM: {bpm}\nKey: {key}\n", + "tags": ["lofi", "chill", "instrumental"], + "category_id": 10, + }), + json.dumps({ + "lo-fi": "moody anime cityscape at dusk, lofi aesthetic", + "phonk": "dark drift car aesthetic, neon, phonk vibe", + "ambient": "ethereal mountain landscape, ambient mood", + "default": "abstract music album cover art", + }), + json.dumps({"meta": 25, "policy": 30, "viewer": 25, "trend": 20}), + 60, + json.dumps({"resolution": "1920x1080", "style": "visualizer", "background": "ai_cover"}), + json.dumps({"mode": "manual", "privacy": "private", "schedule_time": None}), + datetime.utcnow().isoformat(timespec="seconds"), + ) + cursor.execute(""" + INSERT INTO youtube_setup + (metadata_template_json, cover_prompts_json, review_weights_json, + review_threshold, visual_defaults_json, publish_policy_json, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?) + """, defaults) + + conn.commit() + conn.close() +``` + +다음 헬퍼 함수 추가 (파일 하단에): + +```python +import json as _json +from datetime import datetime as _dt + + +def _now() -> str: + return _dt.utcnow().isoformat(timespec="seconds") + + +def create_pipeline(track_id: int) -> int: + conn = sqlite3.connect(DB_PATH) + cur = conn.cursor() + now = _now() + cur.execute(""" + INSERT INTO video_pipelines (track_id, state, state_started_at, created_at, updated_at) + VALUES (?, 'created', ?, ?, ?) + """, (track_id, now, now, now)) + pid = cur.lastrowid + conn.commit() + conn.close() + return pid + + +def get_pipeline(pid: int) -> dict | None: + conn = sqlite3.connect(DB_PATH) + conn.row_factory = sqlite3.Row + row = conn.execute("SELECT * FROM video_pipelines WHERE id = ?", (pid,)).fetchone() + conn.close() + if not row: + return None + d = dict(row) + d["feedback_count_per_step"] = _json.loads(d["feedback_count_per_step"] or "{}") + d["last_telegram_msg_ids"] = _json.loads(d["last_telegram_msg_ids"] or "{}") + if d.get("metadata_json"): + d["metadata"] = _json.loads(d["metadata_json"]) + if d.get("review_json"): + d["review"] = _json.loads(d["review_json"]) + return d + + +def update_pipeline_state(pid: int, state: str, **fields) -> None: + cols = ["state = ?", "state_started_at = ?", "updated_at = ?"] + vals = [state, _now(), _now()] + for k, v in fields.items(): + cols.append(f"{k} = ?") + vals.append(v) + vals.append(pid) + conn = sqlite3.connect(DB_PATH) + conn.execute(f"UPDATE video_pipelines SET {', '.join(cols)} WHERE id = ?", vals) + conn.commit() + conn.close() + + +def list_pipelines(active_only: bool = False) -> list[dict]: + conn = sqlite3.connect(DB_PATH) + conn.row_factory = sqlite3.Row + if active_only: + rows = conn.execute(""" + SELECT * FROM video_pipelines + WHERE state NOT IN ('published','cancelled','failed','awaiting_manual') + ORDER BY created_at DESC + """).fetchall() + else: + rows = conn.execute("SELECT * FROM video_pipelines ORDER BY created_at DESC").fetchall() + conn.close() + return [get_pipeline(r["id"]) for r in rows] + + +def increment_feedback_count(pid: int, step: str) -> int: + p = get_pipeline(pid) + counts = p["feedback_count_per_step"] + counts[step] = counts.get(step, 0) + 1 + conn = sqlite3.connect(DB_PATH) + conn.execute("UPDATE video_pipelines SET feedback_count_per_step = ?, updated_at = ? WHERE id = ?", + (_json.dumps(counts), _now(), pid)) + conn.commit() + conn.close() + return counts[step] + + +def record_feedback(pid: int, step: str, feedback_text: str) -> None: + conn = sqlite3.connect(DB_PATH) + conn.execute(""" + INSERT INTO pipeline_feedback (pipeline_id, step, feedback_text, received_at) + VALUES (?, ?, ?, ?) + """, (pid, step, feedback_text, _now())) + conn.commit() + conn.close() + + +def get_feedback_history(pid: int) -> list[dict]: + conn = sqlite3.connect(DB_PATH) + conn.row_factory = sqlite3.Row + rows = conn.execute(""" + SELECT * FROM pipeline_feedback + WHERE pipeline_id = ? ORDER BY id DESC + """, (pid,)).fetchall() + conn.close() + return [dict(r) for r in rows] + + +def create_pipeline_job(pid: int, step: str) -> int: + conn = sqlite3.connect(DB_PATH) + cur = conn.cursor() + cur.execute(""" + INSERT INTO pipeline_jobs (pipeline_id, step, status, started_at) + VALUES (?, ?, 'queued', ?) + """, (pid, step, _now())) + job_id = cur.lastrowid + conn.commit() + conn.close() + return job_id + + +def update_pipeline_job(job_id: int, **fields) -> None: + if "status" in fields and fields["status"] in ("succeeded", "failed"): + fields["finished_at"] = _now() + cols = ", ".join(f"{k} = ?" for k in fields) + vals = list(fields.values()) + [job_id] + conn = sqlite3.connect(DB_PATH) + conn.execute(f"UPDATE pipeline_jobs SET {cols} WHERE id = ?", vals) + conn.commit() + conn.close() + + +def list_pipeline_jobs(pid: int) -> list[dict]: + conn = sqlite3.connect(DB_PATH) + conn.row_factory = sqlite3.Row + rows = conn.execute(""" + SELECT * FROM pipeline_jobs WHERE pipeline_id = ? ORDER BY id ASC + """, (pid,)).fetchall() + conn.close() + return [dict(r) for r in rows] + + +def get_youtube_setup() -> dict: + conn = sqlite3.connect(DB_PATH) + conn.row_factory = sqlite3.Row + row = conn.execute("SELECT * FROM youtube_setup WHERE id = 1").fetchone() + conn.close() + d = dict(row) + for k in ("metadata_template_json", "cover_prompts_json", + "review_weights_json", "visual_defaults_json", "publish_policy_json"): + d[k.replace("_json", "")] = _json.loads(d[k]) + return d + + +def update_youtube_setup(**kwargs) -> None: + field_map = { + "metadata_template": "metadata_template_json", + "cover_prompts": "cover_prompts_json", + "review_weights": "review_weights_json", + "visual_defaults": "visual_defaults_json", + "publish_policy": "publish_policy_json", + } + cols = [] + vals = [] + for k, v in kwargs.items(): + if k in field_map: + cols.append(f"{field_map[k]} = ?") + vals.append(_json.dumps(v)) + elif k == "review_threshold": + cols.append("review_threshold = ?") + vals.append(int(v)) + if not cols: + return + cols.append("updated_at = ?") + vals.append(_now()) + conn = sqlite3.connect(DB_PATH) + conn.execute(f"UPDATE youtube_setup SET {', '.join(cols)} WHERE id = 1", vals) + conn.commit() + conn.close() + + +def upsert_oauth_token(channel_id: str, channel_title: str | None, + avatar_url: str | None, refresh_token: str, + access_token: str | None, expires_at: str | None) -> None: + conn = sqlite3.connect(DB_PATH) + cur = conn.cursor() + cur.execute("DELETE FROM youtube_oauth_tokens") + cur.execute(""" + INSERT INTO youtube_oauth_tokens + (channel_id, channel_title, avatar_url, refresh_token, access_token, expires_at, created_at) + VALUES (?, ?, ?, ?, ?, ?, ?) + """, (channel_id, channel_title, avatar_url, refresh_token, access_token, expires_at, _now())) + conn.commit() + conn.close() + + +def get_oauth_token() -> dict | None: + conn = sqlite3.connect(DB_PATH) + conn.row_factory = sqlite3.Row + row = conn.execute("SELECT * FROM youtube_oauth_tokens ORDER BY id DESC LIMIT 1").fetchone() + conn.close() + return dict(row) if row else None + + +def delete_oauth_token() -> None: + conn = sqlite3.connect(DB_PATH) + conn.execute("DELETE FROM youtube_oauth_tokens") + conn.commit() + conn.close() +``` + +- [ ] **Step 4: Run tests to verify pass** + +Run: `cd music-lab && python -m pytest tests/test_pipeline_db.py -v` +Expected: 모두 PASS (7 tests) + +- [ ] **Step 5: Commit** + +```bash +git add music-lab/app/db.py music-lab/tests/test_pipeline_db.py music-lab/requirements.txt +git commit -m "feat(music-lab): pipeline 5개 DB 테이블 + 헬퍼" +``` + +--- + +## Task 2: 상태 머신 + +**Files:** +- Create: `music-lab/app/pipeline/__init__.py` +- Create: `music-lab/app/pipeline/state_machine.py` +- Test: `music-lab/tests/test_state_machine.py` + +- [ ] **Step 1: Write the failing test** + +```python +# tests/test_state_machine.py +import pytest +from app.pipeline.state_machine import ( + next_state_on_approve, next_state_on_reject, can_transition, STEPS, USER_GATES, +) + + +def test_steps_sequence(): + assert STEPS == ["cover", "video", "thumb", "meta", "review", "publish"] + + +def test_user_gates_excludes_review(): + assert "review" not in USER_GATES + assert "publish" in USER_GATES + assert "cover" in USER_GATES + + +def test_approve_progression(): + assert next_state_on_approve("cover_pending") == "video_pending" + assert next_state_on_approve("video_pending") == "thumb_pending" + assert next_state_on_approve("thumb_pending") == "meta_pending" + assert next_state_on_approve("meta_pending") == "ai_review" + assert next_state_on_approve("publish_pending") == "publishing" + + +def test_approve_invalid_state_raises(): + with pytest.raises(ValueError): + next_state_on_approve("ai_review") # 자동 전이 — approve 호출 자체가 무효 + + +def test_reject_keeps_same_state(): + # 반려는 같은 *_pending 상태를 유지(재생성 트리거) + assert next_state_on_reject("cover_pending") == "cover_pending" + assert next_state_on_reject("publish_pending") == "publish_pending" + + +def test_can_transition_blocks_terminal_states(): + assert not can_transition("published", "cover_pending") + assert not can_transition("cancelled", "cover_pending") + assert not can_transition("failed", "cover_pending") + + +def test_can_transition_allows_cancel_from_anywhere(): + assert can_transition("cover_pending", "cancelled") + assert can_transition("publishing", "cancelled") + + +def test_can_transition_allows_failed_from_pending(): + assert can_transition("video_pending", "failed") + assert can_transition("publishing", "failed") +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `cd music-lab && python -m pytest tests/test_state_machine.py -v` +Expected: ImportError + +- [ ] **Step 3: Implement `state_machine.py`** + +`app/pipeline/__init__.py`: +```python +# 빈 파일 +``` + +`app/pipeline/state_machine.py`: +```python +"""파이프라인 상태 머신 — 전이 규칙 단일 소스.""" + +STEPS = ["cover", "video", "thumb", "meta", "review", "publish"] +USER_GATES = ["cover", "video", "thumb", "meta", "publish"] # review는 자동 + +_APPROVE_NEXT = { + "cover_pending": "video_pending", + "video_pending": "thumb_pending", + "thumb_pending": "meta_pending", + "meta_pending": "ai_review", # 자동 검토 단계로 + "publish_pending": "publishing", +} + +TERMINAL_STATES = {"published", "cancelled", "failed", "awaiting_manual"} + + +def next_state_on_approve(state: str) -> str: + if state not in _APPROVE_NEXT: + raise ValueError(f"승인 불가 상태: {state}") + return _APPROVE_NEXT[state] + + +def next_state_on_reject(state: str) -> str: + if not state.endswith("_pending"): + raise ValueError(f"반려 불가 상태: {state}") + return state # 같은 상태 유지 (재생성 후 다시 _pending) + + +def can_transition(from_state: str, to_state: str) -> bool: + if from_state in TERMINAL_STATES: + return False + if to_state in {"cancelled", "failed", "awaiting_manual"}: + return True + if to_state == _APPROVE_NEXT.get(from_state): + return True + # 자동 전이 (ai_review → publish_pending, publishing → published) + auto_transitions = { + ("ai_review", "publish_pending"), + ("publishing", "published"), + } + return (from_state, to_state) in auto_transitions +``` + +- [ ] **Step 4: Run tests** + +Run: `cd music-lab && python -m pytest tests/test_state_machine.py -v` +Expected: 8 PASS + +- [ ] **Step 5: Commit** + +```bash +git add music-lab/app/pipeline/ music-lab/tests/test_state_machine.py +git commit -m "feat(music-lab): pipeline 상태 머신" +``` + +--- + +## Task 3: Storage 헬퍼 + Cover 생성 (DALL·E + 폴백) + +**Files:** +- Create: `music-lab/app/pipeline/storage.py` +- Create: `music-lab/app/pipeline/cover.py` +- Test: `music-lab/tests/test_cover_generation.py` +- Modify: `music-lab/requirements.txt` (`openai`, `respx`) + +- [ ] **Step 1: Add deps** + +`requirements.txt`에 추가: `openai>=1.20.0`, `respx>=0.21`, `freezegun>=1.4` + +- [ ] **Step 2: Write failing test** + +```python +# tests/test_cover_generation.py +import pytest +import respx +from httpx import Response +from app.pipeline import cover, storage + + +@pytest.fixture +def tmp_storage(monkeypatch, tmp_path): + monkeypatch.setattr(storage, "VIDEO_DATA_DIR", str(tmp_path)) + return tmp_path + + +@pytest.mark.asyncio +@respx.mock +async def test_dalle_success_saves_jpg(tmp_storage, monkeypatch): + monkeypatch.setenv("OPENAI_API_KEY", "test-key") + image_url = "https://oaidalleapiprodscus.blob.core.windows.net/x.png" + respx.post("https://api.openai.com/v1/images/generations").mock( + return_value=Response(200, json={"data": [{"url": image_url}]}) + ) + # PNG 1x1 픽셀 (간단) + png_bytes = bytes.fromhex( + "89504e470d0a1a0a0000000d49484452000000010000000108020000009077" + "53de0000000c4944415478da6300010000050001" + ) + respx.get(image_url).mock(return_value=Response(200, content=png_bytes)) + + out = await cover.generate(pipeline_id=42, genre="lo-fi", + prompt_template="moody anime", mood="chill") + assert out["used_fallback"] is False + assert out["url"].startswith("/media/videos/42/cover") + assert (tmp_storage / "42" / "cover.jpg").exists() + + +@pytest.mark.asyncio +@respx.mock +async def test_dalle_timeout_falls_back_to_gradient(tmp_storage, monkeypatch): + monkeypatch.setenv("OPENAI_API_KEY", "test-key") + respx.post("https://api.openai.com/v1/images/generations").mock( + side_effect=lambda req: Response(504) + ) + out = await cover.generate(pipeline_id=43, genre="phonk", + prompt_template="dark drift", mood="aggressive", + track_title="Midnight Drive") + assert out["used_fallback"] is True + assert (tmp_storage / "43" / "cover.jpg").exists() + + +@pytest.mark.asyncio +async def test_no_api_key_falls_back(tmp_storage, monkeypatch): + monkeypatch.delenv("OPENAI_API_KEY", raising=False) + out = await cover.generate(pipeline_id=44, genre="ambient", + prompt_template="x", mood="calm", + track_title="Calm") + assert out["used_fallback"] is True + + +@pytest.mark.asyncio +@respx.mock +async def test_dalle_with_feedback_appends_to_prompt(tmp_storage, monkeypatch): + monkeypatch.setenv("OPENAI_API_KEY", "test-key") + captured = {} + def hook(req): + import json as _json + captured["body"] = _json.loads(req.content) + return Response(200, json={"data": [{"url": "https://x"}]}) + respx.post("https://api.openai.com/v1/images/generations").mock(side_effect=hook) + respx.get("https://x").mock(return_value=Response(200, content=b"\x89PNG\r\n\x1a\n")) + await cover.generate(pipeline_id=45, genre="lo-fi", + prompt_template="moody anime", mood="chill", + feedback="더 어둡게") + assert "더 어둡게" in captured["body"]["prompt"] +``` + +`pytest.ini` 또는 `conftest.py`에 `asyncio_mode = auto` 또는 마커 등록 필요. 기존 conftest 확인. + +- [ ] **Step 3: Run test, verify fail** + +Run: `python -m pytest tests/test_cover_generation.py -v` +Expected: ImportError + +- [ ] **Step 4: Implement `storage.py`** + +```python +"""파이프라인 산출물 디렉토리 관리.""" +import os + +VIDEO_DATA_DIR = os.getenv("VIDEO_DATA_DIR", "/app/data/videos") +VIDEO_MEDIA_BASE = os.getenv("VIDEO_MEDIA_BASE", "/media/videos") + + +def pipeline_dir(pipeline_id: int) -> str: + path = os.path.join(VIDEO_DATA_DIR, str(pipeline_id)) + os.makedirs(path, exist_ok=True) + return path + + +def media_url(pipeline_id: int, filename: str) -> str: + return f"{VIDEO_MEDIA_BASE}/{pipeline_id}/{filename}" +``` + +- [ ] **Step 5: Implement `cover.py`** + +```python +"""AI 커버 아트 생성 — DALL·E 3 + 그라데이션 폴백.""" +import os +import logging +from typing import Optional + +import httpx +from PIL import Image, ImageDraw, ImageFont + +from . import storage + +logger = logging.getLogger("music-lab.cover") + +OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "") +OPENAI_MODEL = os.getenv("OPENAI_IMAGE_MODEL", "gpt-image-1") +DALLE_TIMEOUT_S = 90 + +# 그라데이션 폴백 색 (장르별 RGB 페어) +GRADIENT_COLORS = { + "lo-fi": ((26, 26, 46), (22, 33, 62)), + "phonk": ((26, 10, 10), (45, 0, 0)), + "ambient": ((13, 33, 55), (10, 22, 40)), + "pop": ((26, 10, 46), (45, 27, 78)), + "default": ((17, 24, 39), (31, 41, 55)), +} + + +async def generate(*, pipeline_id: int, genre: str, prompt_template: str, + mood: str = "", track_title: str = "", feedback: str = "") -> dict: + """커버 아트 생성. 성공 시 jpg 저장 + URL 반환. 실패 시 그라데이션 폴백. + + 반환: {"url": str, "used_fallback": bool, "error": str | None} + """ + out_path = os.path.join(storage.pipeline_dir(pipeline_id), "cover.jpg") + used_fallback = False + error = None + + if OPENAI_API_KEY: + try: + await _generate_with_dalle(prompt_template, mood, feedback, out_path) + except Exception as e: + logger.warning("DALL·E 실패 — 폴백: %s", e) + error = str(e) + used_fallback = True + _generate_gradient(genre, track_title, out_path) + else: + used_fallback = True + error = "OPENAI_API_KEY 미설정" + _generate_gradient(genre, track_title, out_path) + + return { + "url": storage.media_url(pipeline_id, "cover.jpg"), + "used_fallback": used_fallback, + "error": error, + } + + +async def _generate_with_dalle(prompt_template: str, mood: str, + feedback: str, out_path: str) -> None: + prompt = prompt_template + if mood: + prompt = f"{prompt}, {mood} mood" + if feedback: + prompt = f"{prompt}. 추가 지시: {feedback}" + prompt = f"{prompt}, no text, high quality" + + async with httpx.AsyncClient(timeout=DALLE_TIMEOUT_S) as client: + resp = await client.post( + "https://api.openai.com/v1/images/generations", + headers={"Authorization": f"Bearer {OPENAI_API_KEY}"}, + json={"model": OPENAI_MODEL, "prompt": prompt, "size": "1024x1024", "n": 1}, + ) + resp.raise_for_status() + url = resp.json()["data"][0]["url"] + img_resp = await client.get(url) + img_resp.raise_for_status() + # PNG → JPG 변환 + from io import BytesIO + img = Image.open(BytesIO(img_resp.content)).convert("RGB") + img.save(out_path, "JPEG", quality=92) + + +def _generate_gradient(genre: str, track_title: str, out_path: str) -> None: + w, h = 1024, 1024 + top, bot = GRADIENT_COLORS.get(genre.lower(), GRADIENT_COLORS["default"]) + img = Image.new("RGB", (w, h)) + px = img.load() + for y in range(h): + t = y / h + r = int(top[0] + (bot[0] - top[0]) * t) + g = int(top[1] + (bot[1] - top[1]) * t) + b = int(top[2] + (bot[2] - top[2]) * t) + for x in range(w): + px[x, y] = (r, g, b) + + if track_title: + try: + font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", 64) + except OSError: + font = ImageFont.load_default() + draw = ImageDraw.Draw(img) + bbox = draw.textbbox((0, 0), track_title, font=font) + tw, th = bbox[2] - bbox[0], bbox[3] - bbox[1] + draw.text(((w - tw) // 2, (h - th) // 2), track_title, fill=(255, 255, 255), font=font) + + img.save(out_path, "JPEG", quality=92) +``` + +- [ ] **Step 6: Run tests** + +Run: `python -m pytest tests/test_cover_generation.py -v` +Expected: 4 PASS + +- [ ] **Step 7: Commit** + +```bash +git add music-lab/app/pipeline/storage.py music-lab/app/pipeline/cover.py \ + music-lab/tests/test_cover_generation.py music-lab/requirements.txt +git commit -m "feat(music-lab): AI 커버 생성 + 그라데이션 폴백" +``` + +--- + +## Task 4: 영상/썸네일 생성 (FFmpeg, 기존 video_producer 이전) + +**Files:** +- Create: `music-lab/app/pipeline/video.py` (move logic from `app/video_producer.py`) +- Create: `music-lab/app/pipeline/thumb.py` +- Test: `music-lab/tests/test_video_thumb.py` + +- [ ] **Step 1: Write failing test** + +FFmpeg 직접 실행 대신 subprocess.run을 mock 처리. + +```python +# tests/test_video_thumb.py +import os +import pytest +from unittest.mock import patch, MagicMock +from app.pipeline import video, thumb, storage + + +@pytest.fixture +def tmp_storage(monkeypatch, tmp_path): + monkeypatch.setattr(storage, "VIDEO_DATA_DIR", str(tmp_path)) + # 더미 입력 파일들 + audio = tmp_path / "audio.mp3" + audio.write_bytes(b"\x00" * 100) + cover = tmp_path / str(50) / "cover.jpg" + cover.parent.mkdir(parents=True) + cover.write_bytes(b"\x00" * 100) + return tmp_path + + +@patch("subprocess.run") +def test_generate_video_calls_ffmpeg(mock_run, tmp_storage): + mock_run.return_value = MagicMock(returncode=0, stderr="") + out = video.generate(pipeline_id=50, audio_path=str(tmp_storage / "audio.mp3"), + cover_path=str(tmp_storage / "50" / "cover.jpg"), + genre="lo-fi", duration_sec=120, resolution="1920x1080", + style="visualizer") + assert out["url"].endswith("/50/video.mp4") + assert out["used_fallback"] is False + args = mock_run.call_args[0][0] + assert args[0] == "ffmpeg" + assert "-i" in args + assert "showwaves" in " ".join(args) + + +@patch("subprocess.run") +def test_generate_video_failure_marks_failed(mock_run, tmp_storage): + mock_run.return_value = MagicMock(returncode=1, stderr="bad codec") + with pytest.raises(video.VideoGenerationError): + video.generate(pipeline_id=51, audio_path=str(tmp_storage / "audio.mp3"), + cover_path=str(tmp_storage / "50" / "cover.jpg"), + genre="lo-fi", duration_sec=120, resolution="1920x1080", + style="visualizer") + + +@patch("subprocess.run") +def test_thumb_extracts_frame(mock_run, tmp_storage): + mock_run.return_value = MagicMock(returncode=0, stderr="") + video_path = tmp_storage / "60" / "video.mp4" + video_path.parent.mkdir(parents=True) + video_path.write_bytes(b"\x00" * 100) + out = thumb.generate(pipeline_id=60, video_path=str(video_path), + track_title="Midnight Drive", overlay_text=True) + assert out["url"].endswith("/60/thumbnail.jpg") + args = mock_run.call_args[0][0] + assert args[0] == "ffmpeg" +``` + +- [ ] **Step 2: Run, verify fail** + +Run: `python -m pytest tests/test_video_thumb.py -v` +Expected: ImportError + +- [ ] **Step 3: Implement `video.py`** — 기존 `video_producer.py`의 ffmpeg 명령 빌더 활용 + +```python +"""영상 비주얼 생성 — visualizer/슬라이드쇼 스타일.""" +import os +import subprocess +import logging +from . import storage + +logger = logging.getLogger("music-lab.video") + +VIDEO_TIMEOUT_S = 300 # 5분 + + +class VideoGenerationError(Exception): + pass + + +def generate(*, pipeline_id: int, audio_path: str, cover_path: str, + genre: str, duration_sec: int, resolution: str = "1920x1080", + style: str = "visualizer") -> dict: + """영상 생성. 성공 시 mp4 저장 + URL 반환. 실패 시 예외.""" + w, h = resolution.split("x") + out_path = os.path.join(storage.pipeline_dir(pipeline_id), "video.mp4") + + if style == "visualizer": + cmd = _build_visualizer_cmd(audio_path, cover_path, out_path, w, h) + else: + # 차후: 슬라이드쇼 등 다른 스타일 + cmd = _build_visualizer_cmd(audio_path, cover_path, out_path, w, h) + + logger.info("ffmpeg 실행: %s", " ".join(cmd)) + result = subprocess.run(cmd, capture_output=True, text=True, timeout=VIDEO_TIMEOUT_S) + if result.returncode != 0: + raise VideoGenerationError(f"ffmpeg 실패: {result.stderr[:500]}") + + return { + "url": storage.media_url(pipeline_id, "video.mp4"), + "used_fallback": False, + "duration_sec": duration_sec, + } + + +def _build_visualizer_cmd(audio: str, bg: str, out: str, w: str, h: str) -> list: + return [ + "ffmpeg", "-y", + "-loop", "1", "-i", bg, + "-i", audio, + "-filter_complex", + f"[0:v]scale={w}:{h}[bg];" + f"[1:a]showwaves=s={w}x200:mode=cline:colors=0xFF4444@0.8[wave];" + f"[bg][wave]overlay=0:({h}-200)[out]", + "-map", "[out]", "-map", "1:a", + "-c:v", "libx264", "-preset", "fast", "-crf", "23", + "-c:a", "aac", "-b:a", "192k", + "-shortest", out, + ] +``` + +- [ ] **Step 4: Implement `thumb.py`** + +```python +"""썸네일 생성 — 영상 5초 프레임 추출 + 텍스트 오버레이.""" +import os +import subprocess +import logging +from PIL import Image, ImageDraw, ImageFont + +from . import storage + +logger = logging.getLogger("music-lab.thumb") +THUMB_TIMEOUT_S = 60 + + +class ThumbGenerationError(Exception): + pass + + +def generate(*, pipeline_id: int, video_path: str, + track_title: str = "", overlay_text: bool = True) -> dict: + out_path = os.path.join(storage.pipeline_dir(pipeline_id), "thumbnail.jpg") + cmd = ["ffmpeg", "-y", "-i", video_path, + "-ss", "00:00:05", "-vframes", "1", "-q:v", "2", out_path] + result = subprocess.run(cmd, capture_output=True, text=True, timeout=THUMB_TIMEOUT_S) + if result.returncode != 0: + raise ThumbGenerationError(f"ffmpeg 썸네일 실패: {result.stderr[:300]}") + + if overlay_text and track_title: + _overlay_title(out_path, track_title) + + return {"url": storage.media_url(pipeline_id, "thumbnail.jpg"), "used_fallback": False} + + +def _overlay_title(path: str, title: str) -> None: + try: + img = Image.open(path).convert("RGB") + draw = ImageDraw.Draw(img) + try: + font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", 80) + except OSError: + font = ImageFont.load_default() + # 하단 30% 영역에 검정 반투명 박스 + 흰 글씨 + w, h = img.size + box_h = int(h * 0.3) + overlay = Image.new("RGBA", (w, box_h), (0, 0, 0, 160)) + img.paste(overlay, (0, h - box_h), overlay) + bbox = draw.textbbox((0, 0), title, font=font) + tw = bbox[2] - bbox[0] + draw.text(((w - tw) // 2, h - box_h + 30), title, fill=(255, 255, 255), font=font) + img.save(path, "JPEG", quality=92) + except Exception as e: + logger.warning("썸네일 오버레이 실패: %s", e) +``` + +- [ ] **Step 5: Run tests** + +Run: `python -m pytest tests/test_video_thumb.py -v` +Expected: 3 PASS + +- [ ] **Step 6: Commit** + +```bash +git add music-lab/app/pipeline/video.py music-lab/app/pipeline/thumb.py \ + music-lab/tests/test_video_thumb.py +git commit -m "feat(music-lab): pipeline 영상·썸네일 생성" +``` + +--- + +## Task 5: 메타데이터 생성 (Claude Haiku) + +**Files:** +- Create: `music-lab/app/pipeline/metadata.py` +- Test: `music-lab/tests/test_metadata_generation.py` + +- [ ] **Step 1: Write failing test** + +```python +# tests/test_metadata_generation.py +import pytest +import respx +from httpx import Response +from app.pipeline import metadata + + +@pytest.mark.asyncio +@respx.mock +async def test_metadata_calls_claude_and_parses_json(monkeypatch): + monkeypatch.setenv("ANTHROPIC_API_KEY", "test-key") + payload = { + "content": [{"type": "text", "text": '{"title":"[Lo-fi] Drive | 85BPM",' + '"description":"chill","tags":["lofi","85bpm"],' + '"category_id":10}'}] + } + respx.post("https://api.anthropic.com/v1/messages").mock( + return_value=Response(200, json=payload) + ) + result = await metadata.generate( + track={"title": "Drive", "genre": "lo-fi", "bpm": 85, "key": "C", "scale": "minor", + "moods": ["chill"], "instruments": ["piano"]}, + template={"title": "[{genre}] {title} | {bpm}BPM", + "description": "{title}\n", "tags": [], "category_id": 10}, + trend_keywords=["lofi", "study"], + feedback="", + ) + assert result["title"].startswith("[Lo-fi]") + assert "lofi" in result["tags"] + + +@pytest.mark.asyncio +@respx.mock +async def test_metadata_fallback_when_no_api_key(monkeypatch): + monkeypatch.delenv("ANTHROPIC_API_KEY", raising=False) + result = await metadata.generate( + track={"title": "Drive", "genre": "lo-fi", "bpm": 85, "key": "C", "scale": "minor", + "moods": [], "instruments": []}, + template={"title": "[{genre}] {title} | {bpm}BPM", + "description": "{title}", "tags": ["lofi"], "category_id": 10}, + trend_keywords=[], + ) + # 템플릿 변수 그대로 치환된 폴백 + assert result["title"] == "[lo-fi] Drive | 85BPM" + assert result["used_fallback"] is True + + +@pytest.mark.asyncio +@respx.mock +async def test_metadata_includes_feedback_in_prompt(monkeypatch): + monkeypatch.setenv("ANTHROPIC_API_KEY", "test-key") + captured = {} + def hook(req): + import json + captured["body"] = json.loads(req.content) + return Response(200, json={"content": [{"type": "text", + "text": '{"title":"x","description":"y","tags":[],"category_id":10}'}]}) + respx.post("https://api.anthropic.com/v1/messages").mock(side_effect=hook) + await metadata.generate( + track={"title": "X", "genre": "lo-fi", "bpm": 85, "key": "C", "scale": "minor", + "moods": [], "instruments": []}, + template={"title": "{title}", "description": "{title}", "tags": [], "category_id": 10}, + trend_keywords=[], + feedback="제목을 짧게", + ) + assert "제목을 짧게" in str(captured["body"]) +``` + +- [ ] **Step 2: Run, verify fail** + +Run: `python -m pytest tests/test_metadata_generation.py -v` +Expected: ImportError + +- [ ] **Step 3: Implement `metadata.py`** + +```python +"""메타데이터 생성 — Claude Haiku + 템플릿 폴백.""" +import os +import json +import logging + +import httpx + +logger = logging.getLogger("music-lab.metadata") + +ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY", "") +CLAUDE_MODEL = os.getenv("CLAUDE_HAIKU_MODEL", "claude-haiku-4-5-20251001") +TIMEOUT_S = 30 + + +async def generate(*, track: dict, template: dict, trend_keywords: list[str], + feedback: str = "") -> dict: + """메타데이터 생성. 성공 시 LLM, 실패/미설정 시 템플릿 치환 폴백. + + 반환: {"title", "description", "tags", "category_id", "used_fallback", "error"} + """ + if not ANTHROPIC_API_KEY: + return {**_fallback_template(track, template), "used_fallback": True, "error": "no api key"} + + try: + result = await _call_claude(track, template, trend_keywords, feedback) + return {**result, "used_fallback": False, "error": None} + except Exception as e: + logger.warning("메타데이터 LLM 실패 — 폴백: %s", e) + return {**_fallback_template(track, template), "used_fallback": True, "error": str(e)} + + +def _fallback_template(track: dict, template: dict) -> dict: + fmt_vars = { + "title": track.get("title", ""), + "genre": track.get("genre", ""), + "bpm": track.get("bpm", ""), + "key": track.get("key", ""), + "scale": track.get("scale", ""), + } + title = template.get("title", "{title}").format(**fmt_vars) + description = template.get("description", "{title}").format(**fmt_vars) + return { + "title": title[:100], + "description": description[:5000], + "tags": (template.get("tags") or [])[:15], + "category_id": template.get("category_id", 10), + } + + +async def _call_claude(track: dict, template: dict, + trend_keywords: list[str], feedback: str) -> dict: + user_prompt = ( + "다음 트랙의 YouTube 메타데이터를 생성하세요. JSON으로만 응답.\n\n" + f"트랙: {json.dumps(track, ensure_ascii=False)}\n" + f"템플릿: {json.dumps(template, ensure_ascii=False)}\n" + f"트렌드 키워드: {', '.join(trend_keywords)}\n" + ) + if feedback: + user_prompt += f"\n사용자 피드백: {feedback}\n" + user_prompt += ( + '\n출력 JSON: {"title": "60자 이내", "description": "1000자 이내, 3-5문단",' + ' "tags": ["15개 이내"], "category_id": 10}' + ) + + async with httpx.AsyncClient(timeout=TIMEOUT_S) as client: + resp = await client.post( + "https://api.anthropic.com/v1/messages", + headers={ + "x-api-key": ANTHROPIC_API_KEY, + "anthropic-version": "2023-06-01", + "content-type": "application/json", + }, + json={ + "model": CLAUDE_MODEL, + "max_tokens": 1024, + "messages": [{"role": "user", "content": user_prompt}], + }, + ) + resp.raise_for_status() + text = resp.json()["content"][0]["text"] + # 가장 첫 JSON 블록 추출 + start = text.find("{") + end = text.rfind("}") + 1 + return json.loads(text[start:end]) +``` + +- [ ] **Step 4: Run tests** + +Run: `python -m pytest tests/test_metadata_generation.py -v` +Expected: 3 PASS + +- [ ] **Step 5: Commit** + +```bash +git add music-lab/app/pipeline/metadata.py music-lab/tests/test_metadata_generation.py +git commit -m "feat(music-lab): pipeline 메타데이터 LLM 생성 + 폴백" +``` + +--- + +## Task 6: AI 최종 검토 (4축) + +**Files:** +- Create: `music-lab/app/pipeline/review.py` +- Test: `music-lab/tests/test_review.py` + +- [ ] **Step 1: Write failing test** + +```python +# tests/test_review.py +import pytest +import respx +from httpx import Response +from app.pipeline import review + + +@pytest.mark.asyncio +@respx.mock +async def test_review_returns_pass_when_above_threshold(monkeypatch): + monkeypatch.setenv("ANTHROPIC_API_KEY", "k") + body = {"content": [{"type": "text", "text": + '{"metadata_quality":{"score":80,"notes":"x"},' + '"policy_compliance":{"score":90,"issues":[]},' + '"viewer_experience":{"score":75,"notes":"y"},' + '"trend_alignment":{"score":70,"matched_keywords":["lofi"]},' + '"summary":"good"}'}]} + respx.post("https://api.anthropic.com/v1/messages").mock(return_value=Response(200, json=body)) + result = await review.run_4_axis( + pipeline={"id": 1}, track={"title": "x", "genre": "lo-fi", "bpm": 85}, + video_meta={"length_sec": 120, "resolution": "1920x1080"}, + metadata={"title": "Y", "description": "Z", "tags": ["lofi"], "category_id": 10}, + thumbnail_url="/m/x.jpg", trend_top=["lofi"], + weights={"meta": 25, "policy": 30, "viewer": 25, "trend": 20}, + threshold=60, + ) + assert result["verdict"] == "pass" + expected_total = 0.25 * 80 + 0.30 * 90 + 0.25 * 75 + 0.20 * 70 + assert result["weighted_total"] == pytest.approx(expected_total, abs=0.01) + + +@pytest.mark.asyncio +@respx.mock +async def test_review_fail_below_threshold(monkeypatch): + monkeypatch.setenv("ANTHROPIC_API_KEY", "k") + body = {"content": [{"type": "text", "text": + '{"metadata_quality":{"score":40,"notes":"x"},' + '"policy_compliance":{"score":50,"issues":[]},' + '"viewer_experience":{"score":30,"notes":"y"},' + '"trend_alignment":{"score":20,"matched_keywords":[]},' + '"summary":"bad"}'}]} + respx.post("https://api.anthropic.com/v1/messages").mock(return_value=Response(200, json=body)) + result = await review.run_4_axis( + pipeline={"id": 2}, track={"title": "x", "genre": "lo-fi", "bpm": 85}, + video_meta={"length_sec": 120, "resolution": "1920x1080"}, + metadata={"title": "Y", "description": "Z", "tags": [], "category_id": 10}, + thumbnail_url="/m/x.jpg", trend_top=[], + weights={"meta": 25, "policy": 30, "viewer": 25, "trend": 20}, + threshold=60, + ) + assert result["verdict"] == "fail" + + +@pytest.mark.asyncio +@respx.mock +async def test_review_heuristic_fallback_on_llm_error(monkeypatch): + monkeypatch.setenv("ANTHROPIC_API_KEY", "k") + respx.post("https://api.anthropic.com/v1/messages").mock(return_value=Response(500)) + result = await review.run_4_axis( + pipeline={"id": 3}, track={"title": "x", "genre": "lo-fi", "bpm": 85}, + video_meta={"length_sec": 120, "resolution": "1920x1080"}, + metadata={"title": "Y" * 30, "description": "Z" * 200, "tags": ["a", "b"], "category_id": 10}, + thumbnail_url="/m/x.jpg", trend_top=["lofi"], + weights={"meta": 25, "policy": 30, "viewer": 25, "trend": 20}, + threshold=60, + ) + assert result["used_fallback"] is True + assert "weighted_total" in result +``` + +- [ ] **Step 2: Run, verify fail** + +Run: `python -m pytest tests/test_review.py -v` +Expected: ImportError + +- [ ] **Step 3: Implement `review.py`** + +```python +"""AI 최종 검토 — 4축(메타/정책/시청/트렌드) 가중 평균.""" +import os +import json +import logging + +import httpx + +logger = logging.getLogger("music-lab.review") + +ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY", "") +CLAUDE_MODEL = os.getenv("CLAUDE_SONNET_MODEL", "claude-sonnet-4-6") +TIMEOUT_S = 60 + +POLICY_BANNED = {"f-word", "n-word"} # 실제 단어는 운영 시 별도 파일로 — 데모용 자리 + + +async def run_4_axis(*, pipeline: dict, track: dict, video_meta: dict, + metadata: dict, thumbnail_url: str, trend_top: list[str], + weights: dict, threshold: int) -> dict: + if not ANTHROPIC_API_KEY: + return _heuristic(metadata, video_meta, track, trend_top, weights, threshold, + fallback_reason="no api key") + try: + scores = await _call_claude(pipeline, track, video_meta, metadata, + thumbnail_url, trend_top) + return _weighted_verdict(scores, weights, threshold, used_fallback=False) + except Exception as e: + logger.warning("검토 LLM 실패 — 휴리스틱: %s", e) + return _heuristic(metadata, video_meta, track, trend_top, weights, threshold, + fallback_reason=str(e)) + + +def _weighted_verdict(scores: dict, weights: dict, threshold: int, + used_fallback: bool) -> dict: + total = ( + weights["meta"] / 100 * scores["metadata_quality"]["score"] + + weights["policy"] / 100 * scores["policy_compliance"]["score"] + + weights["viewer"] / 100 * scores["viewer_experience"]["score"] + + weights["trend"] / 100 * scores["trend_alignment"]["score"] + ) + return { + **scores, + "weighted_total": round(total, 2), + "verdict": "pass" if total >= threshold else "fail", + "used_fallback": used_fallback, + } + + +async def _call_claude(pipeline, track, video_meta, metadata, thumbnail_url, trend_top): + user = ( + "트랙·영상·메타데이터를 4축으로 평가하고 JSON만 응답:\n" + f"트랙: {json.dumps(track, ensure_ascii=False)}\n" + f"영상: {json.dumps(video_meta)}\n" + f"메타: {json.dumps(metadata, ensure_ascii=False)}\n" + f"썸네일: {thumbnail_url}\n" + f"트렌드: {trend_top}\n" + '출력: {"metadata_quality":{"score":0-100,"notes":""},' + '"policy_compliance":{"score":0-100,"issues":[]},' + '"viewer_experience":{"score":0-100,"notes":""},' + '"trend_alignment":{"score":0-100,"matched_keywords":[]},' + '"summary":""}' + ) + async with httpx.AsyncClient(timeout=TIMEOUT_S) as client: + resp = await client.post( + "https://api.anthropic.com/v1/messages", + headers={ + "x-api-key": ANTHROPIC_API_KEY, + "anthropic-version": "2023-06-01", + "content-type": "application/json", + }, + json={"model": CLAUDE_MODEL, "max_tokens": 1024, + "messages": [{"role": "user", "content": user}]}, + ) + resp.raise_for_status() + text = resp.json()["content"][0]["text"] + return json.loads(text[text.find("{"):text.rfind("}")+1]) + + +def _heuristic(metadata, video_meta, track, trend_top, weights, threshold, fallback_reason): + # 메타: 길이·태그 카운트 + title_len = len(metadata.get("title", "")) + desc_len = len(metadata.get("description", "")) + tag_n = len(metadata.get("tags", [])) + meta_score = 100 if 5 <= title_len <= 60 and 50 <= desc_len <= 1000 and 5 <= tag_n <= 15 else 50 + + # 정책: 금칙어 매치 + text_blob = (metadata.get("title", "") + metadata.get("description", "")).lower() + policy_score = 100 if not any(w in text_blob for w in POLICY_BANNED) else 30 + + # 시청: 영상 길이가 트랙과 큰 차이 없는지 휴리스틱(±5초) + expected = track.get("duration_sec", video_meta.get("length_sec", 0)) + delta = abs(video_meta.get("length_sec", 0) - expected) + viewer_score = 90 if delta <= 5 else 60 + + # 트렌드: 태그가 트렌드와 겹치는지 + overlap = set(metadata.get("tags", [])) & set(trend_top) + trend_score = 100 if overlap else 40 + + scores = { + "metadata_quality": {"score": meta_score, "notes": "휴리스틱"}, + "policy_compliance": {"score": policy_score, "issues": []}, + "viewer_experience": {"score": viewer_score, "notes": "휴리스틱"}, + "trend_alignment": {"score": trend_score, "matched_keywords": list(overlap)}, + "summary": f"휴리스틱 fallback: {fallback_reason}", + } + return _weighted_verdict(scores, weights, threshold, used_fallback=True) +``` + +- [ ] **Step 4: Run tests** + +Run: `python -m pytest tests/test_review.py -v` +Expected: 3 PASS + +- [ ] **Step 5: Commit** + +```bash +git add music-lab/app/pipeline/review.py music-lab/tests/test_review.py +git commit -m "feat(music-lab): pipeline 4축 AI 검토 + 휴리스틱 폴백" +``` + +--- + +## Task 7: YouTube OAuth + Upload + +**Files:** +- Create: `music-lab/app/pipeline/youtube.py` +- Test: `music-lab/tests/test_youtube_upload.py` +- Modify: `music-lab/requirements.txt` (`google-api-python-client>=2.100`, `google-auth-oauthlib>=1.2`) + +- [ ] **Step 1: Add deps** + +`requirements.txt`: +``` +google-api-python-client>=2.100 +google-auth-oauthlib>=1.2 +google-auth-httplib2>=0.2 +``` + +- [ ] **Step 2: Write failing test** + +```python +# tests/test_youtube_upload.py +import pytest +from unittest.mock import patch, MagicMock +from app.pipeline import youtube + + +def _setup_token(monkeypatch, db, refresh="r1", access="a1"): + db.upsert_oauth_token( + channel_id="UC1", channel_title="t", avatar_url=None, + refresh_token=refresh, access_token=access, expires_at="2099-01-01T00:00:00", + ) + + +@pytest.fixture +def fresh_db(monkeypatch, tmp_path): + from app import db + monkeypatch.setattr(db, "DB_PATH", str(tmp_path / "music.db")) + db.init_db() + return db + + +@patch("app.pipeline.youtube._build_youtube_client") +def test_upload_succeeds_after_resumable(mock_client, fresh_db, tmp_path, monkeypatch): + monkeypatch.setenv("YOUTUBE_OAUTH_CLIENT_ID", "cid") + monkeypatch.setenv("YOUTUBE_OAUTH_CLIENT_SECRET", "sec") + _setup_token(monkeypatch, fresh_db) + + yt = MagicMock() + insert = MagicMock() + insert.next_chunk.side_effect = [(None, None), (None, {"id": "VID123"})] + yt.videos().insert.return_value = insert + mock_client.return_value = yt + + video_path = tmp_path / "v.mp4" + video_path.write_bytes(b"\x00") + out = youtube.upload_video( + video_path=str(video_path), + thumbnail_path=None, + metadata={"title": "T", "description": "D", "tags": ["x"], "category_id": 10}, + privacy="private", + ) + assert out["video_id"] == "VID123" + + +@patch("app.pipeline.youtube._build_youtube_client") +def test_upload_no_token_raises(mock_client, fresh_db, tmp_path): + video_path = tmp_path / "v.mp4" + video_path.write_bytes(b"\x00") + with pytest.raises(youtube.NotAuthenticatedError): + youtube.upload_video( + video_path=str(video_path), thumbnail_path=None, + metadata={"title":"T","description":"D","tags":[],"category_id":10}, + privacy="private", + ) + + +@patch("app.pipeline.youtube._build_youtube_client") +def test_upload_quota_exceeded_marks_quota(mock_client, fresh_db, tmp_path, monkeypatch): + from googleapiclient.errors import HttpError + monkeypatch.setenv("YOUTUBE_OAUTH_CLIENT_ID", "cid") + monkeypatch.setenv("YOUTUBE_OAUTH_CLIENT_SECRET", "sec") + _setup_token(monkeypatch, fresh_db) + + yt = MagicMock() + err = HttpError(MagicMock(status=403), b'{"error":{"errors":[{"reason":"quotaExceeded"}]}}') + yt.videos().insert.return_value.next_chunk.side_effect = err + mock_client.return_value = yt + + video_path = tmp_path / "v.mp4" + video_path.write_bytes(b"\x00") + with pytest.raises(youtube.QuotaExceededError): + youtube.upload_video( + video_path=str(video_path), thumbnail_path=None, + metadata={"title":"T","description":"D","tags":[],"category_id":10}, + privacy="private", + ) +``` + +- [ ] **Step 3: Run, verify fail** + +Run: `python -m pytest tests/test_youtube_upload.py -v` +Expected: ImportError + +- [ ] **Step 4: Implement `youtube.py`** + +```python +"""YouTube OAuth flow + resumable 업로드.""" +import os +import logging +from urllib.parse import urlencode + +import httpx +from google.oauth2.credentials import Credentials +from googleapiclient.discovery import build +from googleapiclient.http import MediaFileUpload +from googleapiclient.errors import HttpError + +from app import db + +logger = logging.getLogger("music-lab.youtube") + +CLIENT_ID = os.getenv("YOUTUBE_OAUTH_CLIENT_ID", "") +CLIENT_SECRET = os.getenv("YOUTUBE_OAUTH_CLIENT_SECRET", "") +REDIRECT_URI = os.getenv("YOUTUBE_OAUTH_REDIRECT_URI", "") +SCOPES = ["https://www.googleapis.com/auth/youtube.upload", + "https://www.googleapis.com/auth/youtube.readonly"] + + +class NotAuthenticatedError(Exception): + pass + + +class QuotaExceededError(Exception): + pass + + +def get_auth_url() -> str: + if not CLIENT_ID or not REDIRECT_URI: + raise RuntimeError("OAuth 환경변수 미설정") + params = { + "client_id": CLIENT_ID, + "redirect_uri": REDIRECT_URI, + "response_type": "code", + "scope": " ".join(SCOPES), + "access_type": "offline", + "prompt": "consent", + } + return "https://accounts.google.com/o/oauth2/v2/auth?" + urlencode(params) + + +async def exchange_code(code: str) -> dict: + """code → refresh_token + access_token + 채널 정보 → DB 저장.""" + async with httpx.AsyncClient(timeout=30) as client: + token_resp = await client.post( + "https://oauth2.googleapis.com/token", + data={ + "code": code, + "client_id": CLIENT_ID, + "client_secret": CLIENT_SECRET, + "redirect_uri": REDIRECT_URI, + "grant_type": "authorization_code", + }, + ) + token_resp.raise_for_status() + tok = token_resp.json() + access = tok["access_token"] + refresh = tok["refresh_token"] + expires_at = _expiry_from_seconds(tok["expires_in"]) + + # 채널 정보 조회 + creds = _creds(access=access, refresh=refresh) + yt = build("youtube", "v3", credentials=creds, cache_discovery=False) + ch = yt.channels().list(part="snippet", mine=True).execute() + item = ch["items"][0] + db.upsert_oauth_token( + channel_id=item["id"], + channel_title=item["snippet"]["title"], + avatar_url=item["snippet"]["thumbnails"]["default"]["url"], + refresh_token=refresh, access_token=access, expires_at=expires_at, + ) + return {"channel_id": item["id"], "channel_title": item["snippet"]["title"]} + + +def get_status() -> dict | None: + tok = db.get_oauth_token() + if not tok: + return None + return { + "channel_id": tok["channel_id"], + "channel_title": tok["channel_title"], + "avatar_url": tok["avatar_url"], + } + + +def disconnect() -> None: + db.delete_oauth_token() + + +def upload_video(*, video_path: str, thumbnail_path: str | None, + metadata: dict, privacy: str) -> dict: + tok = db.get_oauth_token() + if not tok: + raise NotAuthenticatedError("YouTube 인증 없음") + creds = _creds(access=tok["access_token"], refresh=tok["refresh_token"]) + yt = _build_youtube_client(creds) + + body = { + "snippet": { + "title": metadata["title"], + "description": metadata["description"], + "tags": metadata.get("tags", []), + "categoryId": str(metadata.get("category_id", 10)), + }, + "status": {"privacyStatus": privacy, "selfDeclaredMadeForKids": False}, + } + media = MediaFileUpload(video_path, chunksize=4 * 1024 * 1024, resumable=True, mimetype="video/mp4") + req = yt.videos().insert(part="snippet,status", body=body, media_body=media) + + try: + response = None + while response is None: + status, response = req.next_chunk() + video_id = response["id"] + except HttpError as e: + if b"quotaExceeded" in e.content: + raise QuotaExceededError(str(e)) + raise + + if thumbnail_path: + try: + yt.thumbnails().set(videoId=video_id, media_body=thumbnail_path).execute() + except HttpError as e: + logger.warning("썸네일 업로드 실패: %s", e) + + return {"video_id": video_id} + + +def _build_youtube_client(creds): # patch 포인트 + return build("youtube", "v3", credentials=creds, cache_discovery=False) + + +def _creds(access: str, refresh: str) -> Credentials: + return Credentials( + token=access, refresh_token=refresh, + token_uri="https://oauth2.googleapis.com/token", + client_id=CLIENT_ID, client_secret=CLIENT_SECRET, scopes=SCOPES, + ) + + +def _expiry_from_seconds(secs: int) -> str: + from datetime import datetime, timedelta + return (datetime.utcnow() + timedelta(seconds=secs)).isoformat(timespec="seconds") +``` + +- [ ] **Step 5: Run tests** + +Run: `python -m pytest tests/test_youtube_upload.py -v` +Expected: 3 PASS + +- [ ] **Step 6: Commit** + +```bash +git add music-lab/app/pipeline/youtube.py music-lab/tests/test_youtube_upload.py \ + music-lab/requirements.txt +git commit -m "feat(music-lab): YouTube OAuth + resumable 업로드" +``` + +--- + +## Task 8: 오케스트레이터 + 13개 엔드포인트 + +**Files:** +- Create: `music-lab/app/pipeline/orchestrator.py` +- Modify: `music-lab/app/main.py` +- Test: `music-lab/tests/test_pipeline_endpoints.py` + +이 task는 코드량이 가장 많음. 엔드포인트 12개 + orchestrator의 BackgroundTask 분기. + +- [ ] **Step 1: Write failing test** + +```python +# tests/test_pipeline_endpoints.py +import pytest +from fastapi.testclient import TestClient +from app.main import app +from app import db + + +@pytest.fixture +def client(monkeypatch, tmp_path): + monkeypatch.setattr(db, "DB_PATH", str(tmp_path / "music.db")) + db.init_db() + # 최소 트랙 1개 (라이브러리) + from app.db import save_track + save_track({"id": 1, "title": "T", "genre": "lo-fi", "bpm": 85, "key": "C", "scale": "minor", + "moods": [], "instruments": [], "audio_url": "/x.mp3", "duration_sec": 120}) + return TestClient(app) + + +def test_create_pipeline(client): + r = client.post("/api/music/pipeline", json={"track_id": 1}) + assert r.status_code == 201 + assert r.json()["state"] == "created" + + +def test_create_duplicate_pipeline_returns_409(client): + client.post("/api/music/pipeline", json={"track_id": 1}) + r = client.post("/api/music/pipeline", json={"track_id": 1}) + assert r.status_code == 409 + + +def test_get_pipeline_returns_jobs_and_feedback(client): + pid = client.post("/api/music/pipeline", json={"track_id": 1}).json()["id"] + r = client.get(f"/api/music/pipeline/{pid}") + assert "jobs" in r.json() + assert "feedback" in r.json() + + +def test_list_pipelines_active_filter(client): + pid = client.post("/api/music/pipeline", json={"track_id": 1}).json()["id"] + db.update_pipeline_state(pid, "published") + r = client.get("/api/music/pipeline?status=active") + assert all(p["state"] != "published" for p in r.json()["pipelines"]) + + +def test_feedback_approve_advances_state(client): + pid = client.post("/api/music/pipeline", json={"track_id": 1}).json()["id"] + db.update_pipeline_state(pid, "cover_pending", cover_url="/m/x.jpg") + r = client.post(f"/api/music/pipeline/{pid}/feedback", + json={"step": "cover", "intent": "approve"}) + assert r.status_code == 202 + after = db.get_pipeline(pid) + # video_pending이 BackgroundTask로 진입 후 결과는 mock 환경에서 즉시 안 보일 수 있음 + # 최소 cover_approved 또는 video_pending 단계 확인 + assert after["state"] in ("cover_approved", "video_pending", "video_running") + + +def test_feedback_reject_records_feedback_and_increments_count(client): + pid = client.post("/api/music/pipeline", json={"track_id": 1}).json()["id"] + db.update_pipeline_state(pid, "cover_pending") + r = client.post(f"/api/music/pipeline/{pid}/feedback", + json={"step": "cover", "intent": "reject", "feedback_text": "더 어둡게"}) + assert r.status_code == 202 + p = db.get_pipeline(pid) + assert p["feedback_count_per_step"]["cover"] == 1 + history = db.get_feedback_history(pid) + assert history[0]["feedback_text"] == "더 어둡게" + + +def test_feedback_after_5_rejects_marks_awaiting_manual(client): + pid = client.post("/api/music/pipeline", json={"track_id": 1}).json()["id"] + db.update_pipeline_state(pid, "cover_pending") + for i in range(5): + client.post(f"/api/music/pipeline/{pid}/feedback", + json={"step": "cover", "intent": "reject", "feedback_text": f"again {i}"}) + r = client.post(f"/api/music/pipeline/{pid}/feedback", + json={"step": "cover", "intent": "reject", "feedback_text": "6th"}) + assert r.status_code == 409 + assert db.get_pipeline(pid)["state"] == "awaiting_manual" + + +def test_cancel_pipeline(client): + pid = client.post("/api/music/pipeline", json={"track_id": 1}).json()["id"] + r = client.post(f"/api/music/pipeline/{pid}/cancel") + assert r.status_code == 200 + assert db.get_pipeline(pid)["state"] == "cancelled" +``` + +- [ ] **Step 2: Run, verify fail** + +Run: `python -m pytest tests/test_pipeline_endpoints.py -v` +Expected: 404 또는 collection error + +- [ ] **Step 3: Implement `orchestrator.py`** + +```python +"""파이프라인 오케스트레이터 — 단계별 BackgroundTask 등록 및 산출물 → DB 반영.""" +import asyncio +import json +import logging +from typing import Optional + +from app import db +from . import cover, video, thumb, metadata, review, youtube + +logger = logging.getLogger("music-lab.orchestrator") + +REVIEW_WEIGHTS_DEFAULT = {"meta": 25, "policy": 30, "viewer": 25, "trend": 20} + + +async def run_step(pipeline_id: int, step: str, feedback: str = "") -> None: + """단계 실행 → 결과를 DB에 반영하고 *_pending 또는 다음 단계로 전이. + + 호출 직후 _running 상태로 전환, 끝나면 _pending(사용자 게이트) 또는 자동 다음. + 실패 시 failed 상태 + reason. + """ + job_id = db.create_pipeline_job(pipeline_id, step) + db.update_pipeline_job(job_id, status="running") + p = db.get_pipeline(pipeline_id) + track = _get_track(p["track_id"]) + + try: + if step == "cover": + result = await _run_cover(p, track, feedback) + elif step == "video": + result = await _run_video(p, track) + elif step == "thumb": + result = await _run_thumb(p, track, feedback) + elif step == "meta": + result = await _run_meta(p, track, feedback) + elif step == "review": + result = await _run_review(p, track) + elif step == "publish": + result = await _run_publish(p, track) + else: + raise ValueError(f"unknown step: {step}") + db.update_pipeline_job(job_id, status="succeeded") + db.update_pipeline_state(pipeline_id, result["next_state"], **result.get("fields", {})) + except Exception as e: + logger.exception("step %s failed for pipeline %s", step, pipeline_id) + db.update_pipeline_job(job_id, status="failed", error=str(e)) + db.update_pipeline_state(pipeline_id, "failed", failed_reason=f"{step}: {e}") + + +def _get_track(track_id: int) -> dict: + # tracks 테이블 스키마는 기존 music-lab — 여기서는 dict로 추정 + t = db.get_track(track_id) + if not t: + raise ValueError(f"트랙 {track_id} 없음") + return t + + +async def _run_cover(p, track, feedback): + setup = db.get_youtube_setup() + prompts = setup["cover_prompts"] + template = prompts.get(track["genre"].lower(), prompts.get("default", "")) + out = await cover.generate( + pipeline_id=p["id"], genre=track["genre"], prompt_template=template, + mood=", ".join(track.get("moods", [])), track_title=track["title"], feedback=feedback, + ) + return {"next_state": "cover_pending", "fields": {"cover_url": out["url"]}} + + +async def _run_video(p, track): + setup = db.get_youtube_setup() + vd = setup["visual_defaults"] + audio_path = _local_path(track["audio_url"]) # /media/... → /data/... + cover_path = _local_path(p["cover_url"]) + out = video.generate( + pipeline_id=p["id"], audio_path=audio_path, cover_path=cover_path, + genre=track["genre"], duration_sec=track.get("duration_sec", 120), + resolution=vd["resolution"], style=vd["style"], + ) + return {"next_state": "video_pending", "fields": {"video_url": out["url"]}} + + +async def _run_thumb(p, track, feedback): + video_path = _local_path(p["video_url"]) + out = thumb.generate(pipeline_id=p["id"], video_path=video_path, + track_title=track["title"], overlay_text=True) + return {"next_state": "thumb_pending", "fields": {"thumbnail_url": out["url"]}} + + +async def _run_meta(p, track, feedback): + setup = db.get_youtube_setup() + trend_top = _get_trend_top() + out = await metadata.generate( + track=track, template=setup["metadata_template"], + trend_keywords=trend_top, feedback=feedback, + ) + return {"next_state": "meta_pending", + "fields": {"metadata_json": json.dumps(out, ensure_ascii=False)}} + + +async def _run_review(p, track): + setup = db.get_youtube_setup() + meta = json.loads(p["metadata_json"]) + result = await review.run_4_axis( + pipeline=p, track=track, + video_meta={"length_sec": track.get("duration_sec", 120), + "resolution": setup["visual_defaults"]["resolution"]}, + metadata=meta, thumbnail_url=p["thumbnail_url"], + trend_top=_get_trend_top(), + weights=setup["review_weights"], threshold=setup["review_threshold"], + ) + return {"next_state": "publish_pending", + "fields": {"review_json": json.dumps(result, ensure_ascii=False)}} + + +async def _run_publish(p, track): + setup = db.get_youtube_setup() + meta = json.loads(p["metadata_json"]) + privacy = setup["publish_policy"].get("privacy", "private") + result = youtube.upload_video( + video_path=_local_path(p["video_url"]), + thumbnail_path=_local_path(p["thumbnail_url"]), + metadata=meta, privacy=privacy, + ) + return {"next_state": "published", + "fields": {"youtube_video_id": result["video_id"]}} + + +def _local_path(media_url: str) -> str: + """ /media/videos/123/cover.jpg → /app/data/videos/123/cover.jpg """ + import os + base_media = os.getenv("VIDEO_MEDIA_BASE", "/media/videos") + base_data = os.getenv("VIDEO_DATA_DIR", "/app/data/videos") + if media_url.startswith(base_media): + return media_url.replace(base_media, base_data, 1) + # /media/music/abc.mp3 → /app/data/music/abc.mp3 + return media_url.replace("/media/", "/app/data/", 1) + + +def _get_trend_top(n: int = 10) -> list[str]: + try: + rows = db.get_market_trends(limit=n) # 기존 market_trends 테이블 + return [r["genre"] for r in rows] + except Exception: + return [] +``` + +- [ ] **Step 4: Add endpoints to `main.py`** + +`app/main.py`에 다음 추가 (기존 endpoints 아래): + +```python +from fastapi import BackgroundTasks +from pydantic import BaseModel +from app.pipeline import orchestrator, youtube as yt_module +from app.pipeline.state_machine import ( + next_state_on_approve, next_state_on_reject, USER_GATES, +) + + +class PipelineCreate(BaseModel): + track_id: int + + +class FeedbackRequest(BaseModel): + step: str + intent: str # approve | reject + feedback_text: str | None = None + + +@app.post("/api/music/pipeline", status_code=201) +def create_pipeline(req: PipelineCreate): + # 동일 트랙 활성 파이프라인 중복 방지 + actives = db.list_pipelines(active_only=True) + if any(p["track_id"] == req.track_id for p in actives): + raise HTTPException(409, "이미 진행 중인 파이프라인이 있습니다") + pid = db.create_pipeline(req.track_id) + return db.get_pipeline(pid) + + +@app.get("/api/music/pipeline") +def list_pipelines_endpoint(status: str = "all"): + pipelines = db.list_pipelines(active_only=(status == "active")) + return {"pipelines": pipelines} + + +@app.get("/api/music/pipeline/{pid}") +def get_pipeline_endpoint(pid: int): + p = db.get_pipeline(pid) + if not p: + raise HTTPException(404) + p["jobs"] = db.list_pipeline_jobs(pid) + p["feedback"] = db.get_feedback_history(pid) + return p + + +@app.post("/api/music/pipeline/{pid}/start", status_code=202) +async def start_pipeline(pid: int, bg: BackgroundTasks): + p = db.get_pipeline(pid) + if not p: + raise HTTPException(404) + if p["state"] != "created": + raise HTTPException(409, f"이미 시작됨 ({p['state']})") + bg.add_task(orchestrator.run_step, pid, "cover") + return {"ok": True} + + +@app.post("/api/music/pipeline/{pid}/feedback", status_code=202) +async def feedback(pid: int, req: FeedbackRequest, bg: BackgroundTasks): + p = db.get_pipeline(pid) + if not p: + raise HTTPException(404) + if p["state"] == "awaiting_manual": + raise HTTPException(409, "수동 개입 대기 중") + state = p["state"] + expected = f"{req.step}_pending" + if state != expected: + # 멱등 처리 — 이미 다음 단계로 넘어갔으면 무시 + return {"ok": True, "skipped": True} + + if req.intent == "approve": + next_st = next_state_on_approve(state) + db.update_pipeline_state(pid, next_st) + # 다음 단계가 *_pending이면 자동 생성 트리거; ai_review/publishing도 자동 트리거 + next_step = _state_to_step(next_st) + if next_step: + bg.add_task(orchestrator.run_step, pid, next_step) + return {"ok": True} + + elif req.intent == "reject": + count = db.increment_feedback_count(pid, req.step) + if count > 5: + db.update_pipeline_state(pid, "awaiting_manual") + raise HTTPException(409, "재생성 한도 초과") + if req.feedback_text: + db.record_feedback(pid, req.step, req.feedback_text) + bg.add_task(orchestrator.run_step, pid, req.step, req.feedback_text or "") + return {"ok": True} + + else: + raise HTTPException(400, f"unknown intent: {req.intent}") + + +def _state_to_step(state: str) -> str | None: + return { + "video_pending": "video", + "thumb_pending": "thumb", + "meta_pending": "meta", + "ai_review": "review", + "publish_pending": None, # 사용자 명시 발행 호출 필요 + "publishing": "publish", + }.get(state) + + +@app.post("/api/music/pipeline/{pid}/cancel") +def cancel_pipeline(pid: int): + p = db.get_pipeline(pid) + if not p: + raise HTTPException(404) + db.update_pipeline_state(pid, "cancelled") + return {"ok": True} + + +@app.post("/api/music/pipeline/{pid}/publish", status_code=202) +async def publish_pipeline(pid: int, bg: BackgroundTasks): + p = db.get_pipeline(pid) + if not p: + raise HTTPException(404) + if p["state"] != "publish_pending": + raise HTTPException(409, f"발행 단계 아님 ({p['state']})") + db.update_pipeline_state(pid, "publishing") + bg.add_task(orchestrator.run_step, pid, "publish") + return {"ok": True} + + +# --- Setup --- + +class SetupRequest(BaseModel): + metadata_template: dict | None = None + cover_prompts: dict | None = None + review_weights: dict | None = None + review_threshold: int | None = None + visual_defaults: dict | None = None + publish_policy: dict | None = None + + +@app.get("/api/music/setup") +def get_setup(): + return db.get_youtube_setup() + + +@app.put("/api/music/setup") +def put_setup(req: SetupRequest): + db.update_youtube_setup(**{k: v for k, v in req.dict().items() if v is not None}) + return db.get_youtube_setup() + + +# --- YouTube OAuth --- + +@app.get("/api/music/youtube/auth-url") +def youtube_auth_url(): + return {"url": yt_module.get_auth_url()} + + +@app.get("/api/music/youtube/callback") +async def youtube_callback(code: str): + info = await yt_module.exchange_code(code) + return info + + +@app.post("/api/music/youtube/disconnect") +def youtube_disconnect(): + yt_module.disconnect() + return {"ok": True} + + +@app.get("/api/music/youtube/status") +def youtube_status(): + return yt_module.get_status() or {"connected": False} +``` + +- [ ] **Step 5: Run tests** + +Run: `python -m pytest tests/test_pipeline_endpoints.py -v` +Expected: 8 PASS (BackgroundTask는 TestClient에서 동기 실행 — 테스트는 mock orchestrator 또는 첫 단계만 검증) + +> 참고: BackgroundTasks가 실제 cover.generate를 호출하면 OPENAI_API_KEY 없이 그라데이션 폴백을 만든다. 테스트에서는 `monkeypatch.setattr(orchestrator, "run_step", AsyncMock())`로 우회해도 됨. 위 테스트는 상태 검증 위주라 OK. + +- [ ] **Step 6: Commit** + +```bash +git add music-lab/app/pipeline/orchestrator.py music-lab/app/main.py \ + music-lab/tests/test_pipeline_endpoints.py +git commit -m "feat(music-lab): pipeline 오케스트레이터 + 13개 엔드포인트" +``` + +--- + +## Task 9: agent-office 자연어 의도 분류 + +**Files:** +- Create: `agent-office/app/agents/classify_intent.py` +- Test: `agent-office/tests/test_classify_intent.py` + +- [ ] **Step 1: Write failing test** + +```python +# tests/test_classify_intent.py +import pytest +import respx +from httpx import Response +from app.agents import classify_intent as ci + + +def test_clear_approve_no_llm(monkeypatch): + called = {"n": 0} + monkeypatch.setattr(ci, "_llm_classify", lambda t: (called.update(n=called["n"]+1), ("unclear", None))[1]) + assert ci.classify("승인") == ("approve", None) + assert ci.classify("OK") == ("approve", None) + assert ci.classify("진행") == ("approve", None) + assert ci.classify("agree") == ("approve", None) + assert called["n"] == 0 + + +def test_clear_reject_only_no_llm(monkeypatch): + monkeypatch.setattr(ci, "_llm_classify", lambda t: ("unclear", None)) + assert ci.classify("반려") == ("reject", None) + assert ci.classify("거절") == ("reject", None) + + +def test_reject_with_text_split(monkeypatch): + monkeypatch.setattr(ci, "_llm_classify", lambda t: ("unclear", None)) + intent, fb = ci.classify("반려, 제목 짧게") + assert intent == "reject" + assert "제목 짧게" in fb + + +@respx.mock +def test_ambiguous_calls_llm(monkeypatch): + monkeypatch.setenv("ANTHROPIC_API_KEY", "k") + respx.post("https://api.anthropic.com/v1/messages").mock( + return_value=Response(200, json={"content": [{"type": "text", + "text": '{"intent":"reject","feedback":"좀 더 화려하게"}'}]}) + ) + intent, fb = ci.classify("음... 좀 더 화려한 분위기가 좋겠어") + assert intent == "reject" + assert "화려하게" in fb +``` + +- [ ] **Step 2: Run, verify fail** + +Run: `cd agent-office && python -m pytest tests/test_classify_intent.py -v` +Expected: ImportError + +- [ ] **Step 3: Implement `classify_intent.py`** + +```python +"""텔레그램 사용자 응답 자연어 분류 — 화이트리스트 우선, 모호 시 LLM.""" +import os +import json +import logging +import httpx + +logger = logging.getLogger("agent-office.classify_intent") + +ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY", "") +CLAUDE_HAIKU = os.getenv("CLAUDE_HAIKU_MODEL", "claude-haiku-4-5-20251001") + +APPROVE_WORDS = { + "승인", "시작", "진행", "ok", "okay", "agree", + "네", "예", "좋아", "좋아요", "go", "yes", "y", +} +REJECT_WORDS = {"반려", "거절", "취소", "no", "nope", "n"} + + +def classify(text: str) -> tuple[str, str | None]: + """returns (intent, feedback) — intent ∈ {approve, reject, unclear}""" + if not text: + return ("unclear", None) + t = text.strip().lower() + if t in APPROVE_WORDS: + return ("approve", None) + if t in REJECT_WORDS: + return ("reject", None) + # 반려 단어로 시작 + 추가 텍스트 + for w in REJECT_WORDS: + if t.startswith(w): + rest = text.strip()[len(w):].lstrip(" ,.-:").strip() + if rest: + return ("reject", rest) + # 승인 단어로 시작 + 추가 텍스트(추가 텍스트 무시) + for w in APPROVE_WORDS: + if t.startswith(w + " ") or t == w: + return ("approve", None) + return _llm_classify(text) + + +def _llm_classify(text: str) -> tuple[str, str | None]: + if not ANTHROPIC_API_KEY: + return ("unclear", None) + prompt = ( + "사용자 응답을 분류하세요. JSON으로만 응답.\n" + f'응답: "{text}"\n\n' + '출력: {"intent":"approve|reject|unclear","feedback":"반려면 수정 방향, 아니면 빈 문자열"}' + ) + try: + resp = httpx.post( + "https://api.anthropic.com/v1/messages", + headers={"x-api-key": ANTHROPIC_API_KEY, "anthropic-version": "2023-06-01"}, + json={"model": CLAUDE_HAIKU, "max_tokens": 200, + "messages": [{"role": "user", "content": prompt}]}, + timeout=15, + ) + resp.raise_for_status() + text_out = resp.json()["content"][0]["text"] + data = json.loads(text_out[text_out.find("{"):text_out.rfind("}")+1]) + return (data.get("intent", "unclear"), data.get("feedback") or None) + except Exception as e: + logger.warning("LLM 분류 실패: %s", e) + return ("unclear", None) +``` + +- [ ] **Step 4: Run tests** + +Run: `python -m pytest tests/test_classify_intent.py -v` +Expected: 4 PASS + +- [ ] **Step 5: Commit** + +```bash +git add agent-office/app/agents/classify_intent.py agent-office/tests/test_classify_intent.py +git commit -m "feat(agent-office): 텔레그램 자연어 의도 분류" +``` + +--- + +## Task 10: youtube_publisher 에이전트 + 폴링 잡 + +**Files:** +- Create: `agent-office/app/agents/youtube_publisher.py` +- Modify: `agent-office/app/agents/__init__.py` +- Modify: `agent-office/app/scheduler.py` +- Modify: `agent-office/app/service_proxy.py` +- Modify: `agent-office/app/telegram/conversational.py` +- Test: `agent-office/tests/test_pipeline_polling.py` + +- [ ] **Step 1: Add service_proxy helpers** + +`service_proxy.py`에 추가: + +```python +async def list_active_pipelines() -> list[dict]: + async with httpx.AsyncClient(timeout=15) as client: + resp = await client.get(f"{MUSIC_LAB_URL}/api/music/pipeline?status=active") + resp.raise_for_status() + return resp.json()["pipelines"] + + +async def get_pipeline(pid: int) -> dict: + async with httpx.AsyncClient(timeout=15) as client: + resp = await client.get(f"{MUSIC_LAB_URL}/api/music/pipeline/{pid}") + resp.raise_for_status() + return resp.json() + + +async def post_pipeline_feedback(pid: int, step: str, intent: str, + feedback_text: str | None = None) -> dict: + async with httpx.AsyncClient(timeout=15) as client: + resp = await client.post( + f"{MUSIC_LAB_URL}/api/music/pipeline/{pid}/feedback", + json={"step": step, "intent": intent, "feedback_text": feedback_text}, + ) + resp.raise_for_status() + return resp.json() +``` + +- [ ] **Step 2: Implement youtube_publisher** + +```python +# agents/youtube_publisher.py +"""텔레그램 단일 채널로 단계별 승인 인터랙션 오케스트레이션.""" +import logging +from typing import Optional + +from .base import BaseAgent +from . import classify_intent +from .. import service_proxy +from ..db import add_log +from ..telegram.messaging import send_raw + +logger = logging.getLogger("agent-office.youtube_publisher") + + +_STEP_TITLES = { + "cover_pending": ("커버 아트", "cover"), + "video_pending": ("영상 비주얼", "video"), + "thumb_pending": ("썸네일", "thumb"), + "meta_pending": ("메타데이터", "meta"), + "publish_pending": ("최종 검토 + 발행", "publish"), +} + + +class YoutubePublisherAgent(BaseAgent): + agent_id = "youtube_publisher" + display_name = "YouTube 퍼블리셔" + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._notified_state_per_pipeline: dict[int, str] = {} + + async def poll_state_changes(self) -> None: + """30초마다 호출되어 *_pending 신규 진입 시 텔레그램 발송.""" + try: + pipelines = await service_proxy.list_active_pipelines() + except Exception as e: + logger.warning("폴링 실패: %s", e) + return + + for p in pipelines: + state = p["state"] + pid = p["id"] + if state in _STEP_TITLES and self._notified_state_per_pipeline.get(pid) != state: + await self._notify_step(p) + self._notified_state_per_pipeline[pid] = state + + async def _notify_step(self, pipeline: dict) -> None: + state = pipeline["state"] + title_name, step = _STEP_TITLES[state] + body = self._format_body(pipeline, step) + sent = await send_raw( + text=f"🎵 [{pipeline.get('track_title', 'Pipeline')}] {title_name} 검토\n\n{body}\n\n" + f"➡️ 답장으로 알려주세요: '승인' 또는 '반려 + 수정 방향'", + metadata={"pipeline_id": pipeline["id"], "step": step}, + ) + if sent.get("ok"): + add_log(self.agent_id, f"pipeline {pipeline['id']} {step} 알림 전송", "info") + + def _format_body(self, p: dict, step: str) -> str: + if step == "cover": + return f"🖼️ 커버: {p.get('cover_url', '-')}" + if step == "video": + return f"🎬 영상: {p.get('video_url', '-')}" + if step == "thumb": + return f"🎴 썸네일: {p.get('thumbnail_url', '-')}" + if step == "meta": + m = p.get("metadata", {}) + return (f"📝 제목: {m.get('title','')}\n" + f"🏷️ 태그: {', '.join(m.get('tags', [])[:8])}\n" + f"📄 설명(앞부분): {(m.get('description','') or '')[:200]}") + if step == "publish": + r = p.get("review", {}) + return (f"AI 검토 결과: {r.get('verdict','?')} " + f"(가중 {r.get('weighted_total','?')}/100)\n" + f"{r.get('summary','')}") + return "" + + async def on_telegram_reply(self, pipeline_id: int, step: str, user_text: str) -> None: + intent, feedback = classify_intent.classify(user_text) + if intent == "unclear": + await send_raw("다시 입력해주세요. 예: '승인' 또는 '반려, 제목 짧게'") + return + try: + await service_proxy.post_pipeline_feedback(pipeline_id, step, intent, feedback) + except Exception as e: + await send_raw(f"⚠️ 처리 실패: {e}") + + async def on_schedule(self) -> None: + # 폴링은 스케줄러에서 호출 + await self.poll_state_changes() + + async def on_command(self, command: str, params: dict) -> dict: + return {"ok": False, "message": f"Unknown command: {command}"} + + async def on_approval(self, task_id: str, approved: bool, feedback: str = "") -> None: + pass +``` + +- [ ] **Step 3: Register agent + scheduler** + +`agents/__init__.py`에 등록: +```python +from .youtube_publisher import YoutubePublisherAgent + +AGENT_REGISTRY = { + # ... existing ... + "youtube_publisher": YoutubePublisherAgent(...), +} +``` + +`scheduler.py`에 추가: +```python +async def _poll_pipelines(): + agent = AGENT_REGISTRY.get("youtube_publisher") + if agent: + await agent.poll_state_changes() + + +def init_scheduler(): + # ... existing jobs ... + scheduler.add_job(_poll_pipelines, "interval", seconds=30, id="pipeline_poll") + scheduler.start() +``` + +- [ ] **Step 4: Telegram reply routing** + +`telegram/conversational.py`에서 reply 메시지 수신 시: + +```python +# 기존 핸들러에 추가 +async def handle_reply(message: dict) -> None: + reply_to = message.get("reply_to_message", {}) + msg_id = reply_to.get("message_id") + if not msg_id: + return + # DB의 last_telegram_msg_ids에서 pipeline_id, step 찾기 + # (이는 messaging.send_raw가 metadata에 저장해두는 게 깔끔하지만, + # 현재 스펙은 단순하게 매칭 — 또는 DB 별도 테이블 telegram_message_links 추가) + from .. import service_proxy + from ..agents import AGENT_REGISTRY + + # 단순 구현: pipeline_id/step을 메시지 캡션 또는 DB에서 조회 + link = _lookup_message_link(msg_id) + if not link: + return + agent = AGENT_REGISTRY.get("youtube_publisher") + await agent.on_telegram_reply(link["pipeline_id"], link["step"], message.get("text", "")) +``` + +> **Note**: `_lookup_message_link`는 별도 테이블(`telegram_message_links`) 또는 `video_pipelines.last_telegram_msg_ids` JSON 사용. 본 task에선 후자 사용 — 추가 마이그레이션 없이 기존 컬럼 활용. + +`messaging.send_raw` 호출 후 반환된 message_id를 music-lab에 저장하기 위해 service_proxy에 헬퍼 추가 후 youtube_publisher에서 `_notify_step` 끝에 호출. + +```python +# service_proxy.py +async def save_pipeline_telegram_msg(pid: int, step: str, msg_id: int) -> None: + async with httpx.AsyncClient(timeout=10) as client: + await client.patch( + f"{MUSIC_LAB_URL}/api/music/pipeline/{pid}/telegram-msg", + json={"step": step, "message_id": msg_id}, + ) +``` + +music-lab `main.py`에 endpoint 추가: +```python +class TelegramMsgPatch(BaseModel): + step: str + message_id: int + +@app.patch("/api/music/pipeline/{pid}/telegram-msg") +def save_telegram_msg(pid: int, req: TelegramMsgPatch): + p = db.get_pipeline(pid) + if not p: raise HTTPException(404) + ids = p["last_telegram_msg_ids"] + ids[req.step] = req.message_id + conn = sqlite3.connect(db.DB_PATH) + conn.execute("UPDATE video_pipelines SET last_telegram_msg_ids = ?, updated_at = ? WHERE id = ?", + (json.dumps(ids), db._now(), pid)) + conn.commit(); conn.close() + return {"ok": True} +``` + +reply 매칭용 `lookup` endpoint: +```python +@app.get("/api/music/pipeline/lookup-by-msg/{msg_id}") +def lookup_by_msg(msg_id: int): + # DB 전체 스캔 (소수의 active 파이프라인만 — 성능 OK) + for p in db.list_pipelines(active_only=True): + for step, mid in p["last_telegram_msg_ids"].items(): + if mid == msg_id: + return {"pipeline_id": p["id"], "step": step} + raise HTTPException(404) +``` + +agent-office `_lookup_message_link` 구현: +```python +def _lookup_message_link(msg_id: int) -> Optional[dict]: + import httpx + from ..config import MUSIC_LAB_URL + try: + resp = httpx.get(f"{MUSIC_LAB_URL}/api/music/pipeline/lookup-by-msg/{msg_id}", timeout=5) + if resp.status_code == 200: + return resp.json() + except Exception: + pass + return None +``` + +- [ ] **Step 5: Polling test** + +```python +# tests/test_pipeline_polling.py +import pytest +from unittest.mock import AsyncMock, patch +from app.agents.youtube_publisher import YoutubePublisherAgent + + +@pytest.mark.asyncio +@patch("app.agents.youtube_publisher.service_proxy.list_active_pipelines", + new=AsyncMock(return_value=[{"id": 1, "state": "cover_pending", "cover_url": "/x.jpg"}])) +@patch("app.agents.youtube_publisher.send_raw", new=AsyncMock(return_value={"ok": True, "message_id": 99})) +async def test_poll_notifies_once_per_state(monkeypatch): + a = YoutubePublisherAgent(ws_manager=None) + await a.poll_state_changes() + await a.poll_state_changes() # 같은 상태 — 두 번째는 알림 안 함 + from app.agents.youtube_publisher import send_raw as sr + assert sr.call_count == 1 +``` + +- [ ] **Step 6: Run tests** + +Run: `python -m pytest tests/test_pipeline_polling.py -v` +Expected: PASS + +- [ ] **Step 7: Commit** + +```bash +git add agent-office/app/agents/youtube_publisher.py \ + agent-office/app/agents/__init__.py \ + agent-office/app/scheduler.py \ + agent-office/app/service_proxy.py \ + agent-office/app/telegram/conversational.py \ + agent-office/tests/test_pipeline_polling.py \ + music-lab/app/main.py +git commit -m "feat(agent-office): youtube_publisher 에이전트 + 30s 폴링" +``` + +--- + +## Task 11: 프론트엔드 — api.js 헬퍼 + +**Files:** +- Modify: `web-ui/src/api.js` + +- [ ] **Step 1: Add helpers** + +`src/api.js` 끝에 추가: + +```javascript +// --- Music Pipeline --- +export const listPipelines = (status='all') => apiGet(`/api/music/pipeline?status=${status}`); +export const getPipeline = (id) => apiGet(`/api/music/pipeline/${id}`); +export const createPipeline = (track_id) => apiPost('/api/music/pipeline', { track_id }); +export const startPipeline = (id) => apiPost(`/api/music/pipeline/${id}/start`); +export const cancelPipeline = (id) => apiPost(`/api/music/pipeline/${id}/cancel`); +export const publishPipeline = (id) => apiPost(`/api/music/pipeline/${id}/publish`); + +// --- Music Setup --- +export const getMusicSetup = () => apiGet('/api/music/setup'); +export const updateMusicSetup = (payload) => apiPut('/api/music/setup', payload); + +// --- YouTube OAuth --- +export const getYoutubeAuthUrl = () => apiGet('/api/music/youtube/auth-url'); +export const getYoutubeStatus = () => apiGet('/api/music/youtube/status'); +export const disconnectYoutube = () => apiPost('/api/music/youtube/disconnect'); +``` + +- [ ] **Step 2: Commit** + +```bash +git -C web-ui add src/api.js +git -C web-ui commit -m "feat(web-ui): pipeline/setup/youtube API 헬퍼" +``` + +--- + +## Task 12: 프론트엔드 — SetupTab + +**Files:** +- Create: `web-ui/src/pages/music/components/SetupTab.jsx` +- Modify: `web-ui/src/pages/music/MusicStudio.css` (스타일) + +- [ ] **Step 1: Implement SetupTab** + +```jsx +// SetupTab.jsx +import { useEffect, useState } from 'react'; +import { + getMusicSetup, updateMusicSetup, + getYoutubeAuthUrl, getYoutubeStatus, disconnectYoutube, +} from '../../../api'; + +export default function SetupTab() { + const [setup, setSetup] = useState(null); + const [yt, setYt] = useState(null); + const [saving, setSaving] = useState(false); + const [error, setError] = useState(''); + + useEffect(() => { + Promise.all([getMusicSetup(), getYoutubeStatus()]) + .then(([s, y]) => { setSetup(s); setYt(y); }) + .catch(e => setError(String(e))); + }, []); + + if (!setup) return

Loading…

; + + const save = async (patch) => { + setSaving(true); + try { + const next = await updateMusicSetup(patch); + setSetup(next); + } catch (e) { setError(String(e)); } + finally { setSaving(false); } + }; + + const connectYoutube = async () => { + const { url } = await getYoutubeAuthUrl(); + window.location.href = url; // OAuth flow + }; + + return ( +
+ {error &&
{error}
} + +
+

YouTube 채널 연동

+ {yt && yt.channel_id ? ( +
+ + {yt.channel_title} + +
+ ) : ( + + )} +
+ +
+

메타데이터 템플릿

+ +