web-page-backend/docs/superpowers/plans/2026-05-07-music-youtube-pipeline.md
gahusb e03d074222 docs(plan): Music YouTube pipeline implementation plan, 16 tasks
Breaks the spec 2026-05-07-music-youtube-pipeline-design.md down into 16 tasks.
TDD pattern: each task = failing test → implementation → passing test → commit.

Task flow:
1. DB: 5 tables + helpers
2. State machine
3. Storage + cover (DALL·E + fallback)
4. Video/thumbnail (FFmpeg)
5. Metadata (Claude Haiku)
6. 4-axis AI review (Claude Sonnet + heuristics)
7. YouTube OAuth + upload
8. Orchestrator + 13 endpoints
9. agent-office natural-language intent classification
10. youtube_publisher agent + 30 s polling
11. web-ui api.js helpers
12. SetupTab
13. PipelineTab + cards
14. YoutubeTab with 6 sub-tabs + Library trigger
15. docker-compose env + nginx
16. Integration tests + manual E2E

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-07 16:23:46 +09:00

# Music YouTube Pipeline Implementation Plan
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
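
**Goal:** Build a track → video → YouTube publishing pipeline with a Telegram approval gate at every step. Each step automatically produces its artifact → sends a Telegram notification → interprets a natural-language reply as approve/reject/feedback → advances to the next step or regenerates.

**Architecture:** music-lab (generation + state machine + OAuth + upload) ↔ agent-office (single Telegram channel + natural-language classification + polling orchestration) ↔ web-ui (Setup/Pipeline tabs + Library trigger). Every AI/generation job runs as a BackgroundTask, and its progress is tracked by polling a job-status row in the DB.

**Tech Stack:**
- music-lab: FastAPI, SQLite, Anthropic Claude SDK, OpenAI SDK, google-api-python-client, google-auth-oauthlib, FFmpeg, Pillow
- agent-office: FastAPI, APScheduler, python-telegram (existing), Anthropic SDK
- web-ui: React 18, Vite, fetch-based API helpers (existing `src/api.js`)
- Tests: pytest + pytest-asyncio, httpx mocking (`respx`/`httpx_mock`), freezegun + pytest-freezer, existing conftest patterns

**Spec:** `docs/superpowers/specs/2026-05-07-music-youtube-pipeline-design.md`

---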
## File Structure
### music-lab (`web-backend/music-lab/`)
| Path | Responsibility |
|------|------|
| `app/db.py` (modify) | 5 new tables + helper functions |
| `app/pipeline/__init__.py` (new) | Package init |
| `app/pipeline/state_machine.py` (new) | Transition rules and validation (`can_transition()`, `next_state_on_approve()`, `next_state_on_reject()`) |
| `app/pipeline/orchestrator.py` (new) | Registers the `start_step(pipeline_id, step)` BackgroundTask |
| `app/pipeline/cover.py` (new) | DALL·E 3 call + gradient fallback |
| `app/pipeline/video.py` (move/rename from video_producer.py) | FFmpeg visualizer/slideshow (accepts the cover as input) |
| `app/pipeline/thumb.py` (new) | Thumbnail frame extraction + text overlay |
| `app/pipeline/metadata.py` (new) | Claude Haiku metadata generation + template substitution |
| `app/pipeline/review.py` (new) | Claude Sonnet 4-axis review + weighted average |
| `app/pipeline/youtube.py` (new) | OAuth flow + resumable upload |
| `app/pipeline/storage.py` (new) | Manages the per-pipeline `/data/videos/{id}/` directories |
| `app/pipeline/setup.py` (new) | youtube_setup CRUD |
| `app/main.py` (modify) | Adds 13 endpoints |
| `tests/test_pipeline_db.py` (new) | DB tables + helpers |
| `tests/test_state_machine.py` (new) | Transition validation |
| `tests/test_pipeline_endpoints.py` (new) | CRUD + feedback |
| `tests/test_cover_generation.py` (new) | DALL·E mock + fallback |
| `tests/test_video_thumb.py` (new) | Video/thumbnail with mocked FFmpeg |
| `tests/test_metadata_generation.py` (new) | Claude mock + template |
| `tests/test_review.py` (new) | 4-axis review + verdict |
| `tests/test_youtube_upload.py` (new) | google-api mock + retry |
| `requirements.txt` (modify) | openai, google-api-python-client, google-auth-oauthlib |
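
`metadata.py` fills the metadata template from track fields, for example the default title template `[{genre}] {title} | {bpm}BPM` seeded in Task 1. Plain `str.format` raises `KeyError` when a track lacks a placeholder's field. The following is a minimal sketch of tolerant substitution. It assumes missing fields should render as empty strings, and `render_template`, `render_metadata` and `_Blank` are hypothetical names, not part of the plan:

```python
class _Blank(dict):
    """Mapping that renders unknown placeholders as an empty string (hypothetical helper)."""

    def __missing__(self, key: str) -> str:
        return ""


def render_template(template: str, track: dict) -> str:
    # format_map looks keys up via __getitem__, so dict.__missing__ is used
    # for absent fields instead of raising KeyError
    return template.format_map(_Blank(track))


def render_metadata(template: dict, track: dict) -> dict:
    # Only string fields are templated; tags and category_id pass through unchanged
    return {k: render_template(v, track) if isinstance(v, str) else v
            for k, v in template.items()}
```

Tolerating missing fields keeps one incomplete track from failing the whole metadata step. The trade-off is that a typo in a placeholder name silently renders as blank, so the Setup tab may still want to validate templates when they are saved.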
### agent-office (`web-backend/agent-office/`)
| Path | Responsibility |
|------|------|
| `app/agents/youtube_publisher.py` (new) | Orchestrator: poll + classify + feedback |
| `app/agents/__init__.py` (modify) | Register in AGENT_REGISTRY |
| `app/scheduler.py` (modify) | Add the 30-second `_poll_pipelines` job |
| `app/telegram/conversational.py` (modify) | Match replies → route to youtube_publisher |
| `app/service_proxy.py` (modify) | music-lab pipeline helpers |
| `tests/test_classify_intent.py` (new) | Whitelist vs. LLM branching |
| `tests/test_pipeline_polling.py` (new) | Idempotent polling |
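
`_poll_pipelines` runs every 30 seconds, so it sees the same `*_pending` state many times. Idempotent polling means one Telegram message per pipeline state, not one per poll. The following is a minimal sketch. It assumes the poller remembers `(pipeline_id, state, state_started_at)` keys it has already announced (in the real design `last_telegram_msg_ids` plausibly plays this role), and it assumes a regenerated step gets a fresh `state_started_at`. `pending_notifications` is a hypothetical name:

```python
from typing import Iterable


def pending_notifications(pipelines: Iterable[dict], notified: set) -> list:
    """Return pipelines that still need a Telegram message and mark them as notified."""
    due = []
    for p in pipelines:
        # Only user-approval gates (*_pending) produce a Telegram prompt
        if not p["state"].endswith("_pending"):
            continue
        # state_started_at changes when a rejected step is regenerated,
        # so the same *_pending state is announced again after regeneration
        key = (p["id"], p["state"], p["state_started_at"])
        if key in notified:
            continue
        notified.add(key)
        due.append(p)
    return due
```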
### web-ui (`web-ui/`)
| 경로 | 책임 |
|------|------|
| `src/api.js` (modify) | pipeline/setup/youtube 헬퍼 |
| `src/pages/music/components/SetupTab.jsx` (new) | 구성 탭 |
| `src/pages/music/components/PipelineTab.jsx` (new) | 진행 탭 |
| `src/pages/music/components/PipelineCard.jsx` (new) | 카드 1장 (진행도/현재상태/피드백) |
| `src/pages/music/components/PipelineStartModal.jsx` (new) | Library 트랙 선택 모달 |
| `src/pages/music/components/YoutubeTab.jsx` (modify) | 서브탭 6개로 |
| `src/pages/music/components/Library.jsx` 또는 MusicStudio 라이브러리 부분 (modify) | "🎬 영상 파이프라인" 버튼 |
| `src/pages/music/MusicStudio.css` (modify) | 진행 탭/구성 탭 스타일 |
### docker-compose / nginx
| Path | Change |
|------|------|
| `docker-compose.yml` (modify) | Add 4 music-lab env vars |
| `nginx/conf.d/default.conf` (modify) | Expose `/api/music/youtube/callback` externally |
| `music-lab/Dockerfile` (modify) | Build with the new dependencies |
---
## Task 1: music-lab new DB tables + helpers
**Files:**
- Modify: `music-lab/app/db.py`
- Test: `music-lab/tests/test_pipeline_db.py`
- [ ] **Step 1: Write the failing test** `tests/test_pipeline_db.py`
```python
import pytest

from app import db


@pytest.fixture
def fresh_db(monkeypatch, tmp_path):
    db_path = tmp_path / "music.db"
    monkeypatch.setattr(db, "DB_PATH", str(db_path))
    db.init_db()
    return db_path


def test_create_pipeline_inserts_row(fresh_db):
    pid = db.create_pipeline(track_id=1)
    row = db.get_pipeline(pid)
    assert row["id"] == pid
    assert row["state"] == "created"
    assert row["track_id"] == 1
    assert row["feedback_count_per_step"] == {}


def test_update_pipeline_state_records_started_at(fresh_db, freezer):
    # The `freezer` fixture is provided by the pytest-freezer plugin
    pid = db.create_pipeline(track_id=1)
    freezer.move_to("2026-05-07T08:00:00")
    db.update_pipeline_state(pid, "cover_pending")
    row = db.get_pipeline(pid)
    assert row["state"] == "cover_pending"
    assert row["state_started_at"] == "2026-05-07T08:00:00"


def test_increment_feedback_count(fresh_db):
    pid = db.create_pipeline(track_id=1)
    db.increment_feedback_count(pid, "cover")
    db.increment_feedback_count(pid, "cover")
    row = db.get_pipeline(pid)
    assert row["feedback_count_per_step"] == {"cover": 2}


def test_record_feedback(fresh_db):
    pid = db.create_pipeline(track_id=1)
    db.record_feedback(pid, "cover", "darker")
    rows = db.get_feedback_history(pid)
    assert len(rows) == 1
    assert rows[0]["feedback_text"] == "darker"


def test_create_pipeline_job_lifecycle(fresh_db):
    pid = db.create_pipeline(track_id=1)
    job_id = db.create_pipeline_job(pid, "cover")
    db.update_pipeline_job(job_id, status="running")
    db.update_pipeline_job(job_id, status="succeeded", duration_ms=1234)
    jobs = db.list_pipeline_jobs(pid)
    assert jobs[0]["status"] == "succeeded"
    assert jobs[0]["duration_ms"] == 1234


def test_youtube_setup_default_row_created_on_init(fresh_db):
    setup = db.get_youtube_setup()
    assert setup["review_threshold"] == 60
    assert "metadata_template_json" in setup


def test_youtube_oauth_token_upsert(fresh_db):
    db.upsert_oauth_token(
        channel_id="UC123",
        channel_title="My Channel",
        avatar_url="https://...",
        refresh_token="r1",
        access_token="a1",
        expires_at="2026-05-07T09:00:00",
    )
    tok = db.get_oauth_token()
    assert tok["channel_id"] == "UC123"
    assert tok["refresh_token"] == "r1"
    db.upsert_oauth_token(
        channel_id="UC123", channel_title="My Channel",
        avatar_url=None, refresh_token="r2",
        access_token="a2", expires_at="2026-05-07T10:00:00",
    )
    tok = db.get_oauth_token()
    assert tok["refresh_token"] == "r2"  # upsert
```
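Add `freezegun` and `pytest-freezer` to `requirements.txt`. The `freezer` fixture used above comes from the pytest-freezer plugin, not from freezegun itself.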
- [ ] **Step 2: Run test to verify it fails**
Run: `cd music-lab && python -m pytest tests/test_pipeline_db.py -v`
Expected: `AttributeError` on `db.create_pipeline` and the other new helpers (the functions don't exist yet)
- [ ] **Step 3: Add tables and helpers to `db.py`**
Append these five `CREATE TABLE IF NOT EXISTS` statements to the end of `init_db()`:
```python
def init_db():
    # ... existing tables (conn / cursor are created earlier in this function) ...
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS video_pipelines (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            track_id INTEGER NOT NULL,
            state TEXT NOT NULL DEFAULT 'created',
            state_started_at TEXT NOT NULL,
            cover_url TEXT,
            video_url TEXT,
            thumbnail_url TEXT,
            metadata_json TEXT,
            review_json TEXT,
            youtube_video_id TEXT,
            feedback_count_per_step TEXT NOT NULL DEFAULT '{}',
            last_telegram_msg_ids TEXT NOT NULL DEFAULT '{}',
            created_at TEXT NOT NULL,
            updated_at TEXT NOT NULL,
            cancelled_at TEXT,
            failed_reason TEXT
        )
    """)
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS pipeline_jobs (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            pipeline_id INTEGER NOT NULL,
            step TEXT NOT NULL,
            status TEXT NOT NULL DEFAULT 'queued',
            error TEXT,
            started_at TEXT,
            finished_at TEXT,
            duration_ms INTEGER,
            FOREIGN KEY (pipeline_id) REFERENCES video_pipelines(id)
        )
    """)
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS pipeline_feedback (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            pipeline_id INTEGER NOT NULL,
            step TEXT NOT NULL,
            feedback_text TEXT NOT NULL,
            received_at TEXT NOT NULL,
            FOREIGN KEY (pipeline_id) REFERENCES video_pipelines(id)
        )
    """)
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS youtube_oauth_tokens (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            channel_id TEXT NOT NULL,
            channel_title TEXT,
            avatar_url TEXT,
            refresh_token TEXT NOT NULL,
            access_token TEXT,
            expires_at TEXT,
            created_at TEXT NOT NULL
        )
    """)
    # Singleton table: CHECK (id = 1) rejects any second row
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS youtube_setup (
            id INTEGER PRIMARY KEY AUTOINCREMENT CHECK (id = 1),
            metadata_template_json TEXT NOT NULL,
            cover_prompts_json TEXT NOT NULL,
            review_weights_json TEXT NOT NULL,
            review_threshold INTEGER NOT NULL DEFAULT 60,
            visual_defaults_json TEXT NOT NULL,
            publish_policy_json TEXT NOT NULL,
            updated_at TEXT NOT NULL
        )
    """)
    # Guarantee the single default setup row
    cursor.execute("SELECT COUNT(*) FROM youtube_setup")
    if cursor.fetchone()[0] == 0:
        import json
        from datetime import datetime

        defaults = (
            json.dumps({
                "title": "[{genre}] {title} | {bpm}BPM",
                "description": "{title}\n\nGenre: {genre}\nBPM: {bpm}\nKey: {key}\n",
                "tags": ["lofi", "chill", "instrumental"],
                "category_id": 10,
            }),
            json.dumps({
                "lo-fi": "moody anime cityscape at dusk, lofi aesthetic",
                "phonk": "dark drift car aesthetic, neon, phonk vibe",
                "ambient": "ethereal mountain landscape, ambient mood",
                "default": "abstract music album cover art",
            }),
            json.dumps({"meta": 25, "policy": 30, "viewer": 25, "trend": 20}),
            60,
            json.dumps({"resolution": "1920x1080", "style": "visualizer", "background": "ai_cover"}),
            json.dumps({"mode": "manual", "privacy": "private", "schedule_time": None}),
            datetime.utcnow().isoformat(timespec="seconds"),
        )
        cursor.execute("""
            INSERT INTO youtube_setup
            (metadata_template_json, cover_prompts_json, review_weights_json,
             review_threshold, visual_defaults_json, publish_policy_json, updated_at)
            VALUES (?, ?, ?, ?, ?, ?, ?)
        """, defaults)
    conn.commit()
    conn.close()
```
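The seeded `review_weights_json` (`meta` 25, `policy` 30, `viewer` 25, `trend` 20) sums to 100, and `review_threshold` defaults to 60. The following sketch shows how `review.py` might combine the four per-axis scores. It assumes each axis is scored from 0 to 100 and that a pipeline passes when the weighted average reaches the threshold. `weighted_score` and `verdict` are hypothetical helpers:

```python
def weighted_score(scores: dict[str, float], weights: dict[str, float]) -> float:
    """Weighted average of per-axis scores, normalised by the weight total."""
    total_w = sum(weights.values())
    if total_w <= 0:
        raise ValueError("review weights must sum to a positive number")
    # A missing axis score counts as 0 rather than being silently skipped
    return sum(scores.get(axis, 0.0) * w for axis, w in weights.items()) / total_w


def verdict(scores: dict[str, float], weights: dict[str, float],
            threshold: int = 60) -> tuple[float, bool]:
    """Return (rounded score, passed) where passing means score >= threshold."""
    score = round(weighted_score(scores, weights), 1)
    return score, score >= threshold
```

Dividing by the weight total keeps the score on the 0 to 100 scale even if the Setup tab saves weights that don't add up to 100.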
Add the following helper functions at the bottom of the file:
```python
import json as _json
from datetime import datetime as _dt

# sqlite3 and DB_PATH are already imported/defined at the top of db.py


def _now() -> str:
    return _dt.utcnow().isoformat(timespec="seconds")


def create_pipeline(track_id: int) -> int:
    conn = sqlite3.connect(DB_PATH)
    cur = conn.cursor()
    now = _now()
    cur.execute("""
        INSERT INTO video_pipelines (track_id, state, state_started_at, created_at, updated_at)
        VALUES (?, 'created', ?, ?, ?)
    """, (track_id, now, now, now))
    pid = cur.lastrowid
    conn.commit()
    conn.close()
    return pid


def get_pipeline(pid: int) -> dict | None:
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    row = conn.execute("SELECT * FROM video_pipelines WHERE id = ?", (pid,)).fetchone()
    conn.close()
    if not row:
        return None
    d = dict(row)
    # Decode the JSON-in-TEXT columns
    d["feedback_count_per_step"] = _json.loads(d["feedback_count_per_step"] or "{}")
    d["last_telegram_msg_ids"] = _json.loads(d["last_telegram_msg_ids"] or "{}")
    if d.get("metadata_json"):
        d["metadata"] = _json.loads(d["metadata_json"])
    if d.get("review_json"):
        d["review"] = _json.loads(d["review_json"])
    return d


def update_pipeline_state(pid: int, state: str, **fields) -> None:
    # Keyword names are interpolated into the SQL: pass trusted column names only
    now = _now()  # one timestamp so state_started_at == updated_at
    cols = ["state = ?", "state_started_at = ?", "updated_at = ?"]
    vals = [state, now, now]
    for k, v in fields.items():
        cols.append(f"{k} = ?")
        vals.append(v)
    vals.append(pid)
    conn = sqlite3.connect(DB_PATH)
    conn.execute(f"UPDATE video_pipelines SET {', '.join(cols)} WHERE id = ?", vals)
    conn.commit()
    conn.close()


def list_pipelines(active_only: bool = False) -> list[dict]:
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    if active_only:
        rows = conn.execute("""
            SELECT * FROM video_pipelines
            WHERE state NOT IN ('published','cancelled','failed','awaiting_manual')
            ORDER BY created_at DESC
        """).fetchall()
    else:
        rows = conn.execute("SELECT * FROM video_pipelines ORDER BY created_at DESC").fetchall()
    conn.close()
    return [get_pipeline(r["id"]) for r in rows]


def increment_feedback_count(pid: int, step: str) -> int:
    p = get_pipeline(pid)
    if p is None:
        raise ValueError(f"pipeline {pid} not found")
    counts = p["feedback_count_per_step"]
    counts[step] = counts.get(step, 0) + 1
    conn = sqlite3.connect(DB_PATH)
    conn.execute("UPDATE video_pipelines SET feedback_count_per_step = ?, updated_at = ? WHERE id = ?",
                 (_json.dumps(counts), _now(), pid))
    conn.commit()
    conn.close()
    return counts[step]


def record_feedback(pid: int, step: str, feedback_text: str) -> None:
    conn = sqlite3.connect(DB_PATH)
    conn.execute("""
        INSERT INTO pipeline_feedback (pipeline_id, step, feedback_text, received_at)
        VALUES (?, ?, ?, ?)
    """, (pid, step, feedback_text, _now()))
    conn.commit()
    conn.close()


def get_feedback_history(pid: int) -> list[dict]:
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    rows = conn.execute("""
        SELECT * FROM pipeline_feedback
        WHERE pipeline_id = ? ORDER BY id DESC
    """, (pid,)).fetchall()
    conn.close()
    return [dict(r) for r in rows]


def create_pipeline_job(pid: int, step: str) -> int:
    conn = sqlite3.connect(DB_PATH)
    cur = conn.cursor()
    cur.execute("""
        INSERT INTO pipeline_jobs (pipeline_id, step, status, started_at)
        VALUES (?, ?, 'queued', ?)
    """, (pid, step, _now()))
    job_id = cur.lastrowid
    conn.commit()
    conn.close()
    return job_id


def update_pipeline_job(job_id: int, **fields) -> None:
    # Stamp finished_at automatically when the job reaches a final status
    if "status" in fields and fields["status"] in ("succeeded", "failed"):
        fields["finished_at"] = _now()
    cols = ", ".join(f"{k} = ?" for k in fields)
    vals = list(fields.values()) + [job_id]
    conn = sqlite3.connect(DB_PATH)
    conn.execute(f"UPDATE pipeline_jobs SET {cols} WHERE id = ?", vals)
    conn.commit()
    conn.close()


def list_pipeline_jobs(pid: int) -> list[dict]:
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    rows = conn.execute("""
        SELECT * FROM pipeline_jobs WHERE pipeline_id = ? ORDER BY id ASC
    """, (pid,)).fetchall()
    conn.close()
    return [dict(r) for r in rows]


def get_youtube_setup() -> dict:
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    row = conn.execute("SELECT * FROM youtube_setup WHERE id = 1").fetchone()
    conn.close()
    d = dict(row)
    # Keep the raw *_json columns and add decoded copies without the suffix
    for k in ("metadata_template_json", "cover_prompts_json",
              "review_weights_json", "visual_defaults_json", "publish_policy_json"):
        d[k.replace("_json", "")] = _json.loads(d[k])
    return d


def update_youtube_setup(**kwargs) -> None:
    field_map = {
        "metadata_template": "metadata_template_json",
        "cover_prompts": "cover_prompts_json",
        "review_weights": "review_weights_json",
        "visual_defaults": "visual_defaults_json",
        "publish_policy": "publish_policy_json",
    }
    cols = []
    vals = []
    for k, v in kwargs.items():
        if k in field_map:
            cols.append(f"{field_map[k]} = ?")
            vals.append(_json.dumps(v))
        elif k == "review_threshold":
            cols.append("review_threshold = ?")
            vals.append(int(v))
    if not cols:
        return
    cols.append("updated_at = ?")
    vals.append(_now())
    conn = sqlite3.connect(DB_PATH)
    conn.execute(f"UPDATE youtube_setup SET {', '.join(cols)} WHERE id = 1", vals)
    conn.commit()
    conn.close()


def upsert_oauth_token(channel_id: str, channel_title: str | None,
                       avatar_url: str | None, refresh_token: str,
                       access_token: str | None, expires_at: str | None) -> None:
    # Single-channel design: replace any existing token (DELETE + INSERT in one commit)
    conn = sqlite3.connect(DB_PATH)
    cur = conn.cursor()
    cur.execute("DELETE FROM youtube_oauth_tokens")
    cur.execute("""
        INSERT INTO youtube_oauth_tokens
        (channel_id, channel_title, avatar_url, refresh_token, access_token, expires_at, created_at)
        VALUES (?, ?, ?, ?, ?, ?, ?)
    """, (channel_id, channel_title, avatar_url, refresh_token, access_token, expires_at, _now()))
    conn.commit()
    conn.close()


def get_oauth_token() -> dict | None:
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    row = conn.execute("SELECT * FROM youtube_oauth_tokens ORDER BY id DESC LIMIT 1").fetchone()
    conn.close()
    return dict(row) if row else None


def delete_oauth_token() -> None:
    conn = sqlite3.connect(DB_PATH)
    conn.execute("DELETE FROM youtube_oauth_tokens")
    conn.commit()
    conn.close()
```
- [ ] **Step 4: Run tests to verify pass**
Run: `cd music-lab && python -m pytest tests/test_pipeline_db.py -v`
Expected: all PASS (7 tests)
- [ ] **Step 5: Commit**
```bash
git add music-lab/app/db.py music-lab/tests/test_pipeline_db.py music-lab/requirements.txt
git commit -m "feat(music-lab): 5 pipeline DB tables + helpers"
```
---
## Task 2: State machine
**Files:**
- Create: `music-lab/app/pipeline/__init__.py`
- Create: `music-lab/app/pipeline/state_machine.py`
- Test: `music-lab/tests/test_state_machine.py`
- [ ] **Step 1: Write the failing test**
```python
# tests/test_state_machine.py
import pytest

from app.pipeline.state_machine import (
    next_state_on_approve, next_state_on_reject, can_transition, STEPS, USER_GATES,
)


def test_steps_sequence():
    assert STEPS == ["cover", "video", "thumb", "meta", "review", "publish"]


def test_user_gates_excludes_review():
    assert "review" not in USER_GATES
    assert "publish" in USER_GATES
    assert "cover" in USER_GATES


def test_approve_progression():
    assert next_state_on_approve("cover_pending") == "video_pending"
    assert next_state_on_approve("video_pending") == "thumb_pending"
    assert next_state_on_approve("thumb_pending") == "meta_pending"
    assert next_state_on_approve("meta_pending") == "ai_review"
    assert next_state_on_approve("publish_pending") == "publishing"


def test_approve_invalid_state_raises():
    with pytest.raises(ValueError):
        next_state_on_approve("ai_review")  # automatic transition: calling approve here is invalid


def test_reject_keeps_same_state():
    # A rejection keeps the same *_pending state (and triggers regeneration)
    assert next_state_on_reject("cover_pending") == "cover_pending"
    assert next_state_on_reject("publish_pending") == "publish_pending"


def test_can_transition_blocks_terminal_states():
    assert not can_transition("published", "cover_pending")
    assert not can_transition("cancelled", "cover_pending")
    assert not can_transition("failed", "cover_pending")


def test_can_transition_allows_cancel_from_anywhere():
    assert can_transition("cover_pending", "cancelled")
    assert can_transition("publishing", "cancelled")


def test_can_transition_allows_failed_from_pending():
    assert can_transition("video_pending", "failed")
    assert can_transition("publishing", "failed")
```
- [ ] **Step 2: Run test to verify it fails**
Run: `cd music-lab && python -m pytest tests/test_state_machine.py -v`
Expected: ImportError
- [ ] **Step 3: Implement `state_machine.py`**
`app/pipeline/__init__.py`:
```python
# empty file
```
`app/pipeline/state_machine.py`:
```python
"""Pipeline state machine: the single source of truth for transition rules."""
STEPS = ["cover", "video", "thumb", "meta", "review", "publish"]
USER_GATES = ["cover", "video", "thumb", "meta", "publish"]  # review runs automatically

_APPROVE_NEXT = {
    "cover_pending": "video_pending",
    "video_pending": "thumb_pending",
    "thumb_pending": "meta_pending",
    "meta_pending": "ai_review",  # hand off to the automatic review step
    "publish_pending": "publishing",
}
# Automatic (non-approval) transitions
_AUTO_TRANSITIONS = {
    ("created", "cover_pending"),  # pipeline start
    ("ai_review", "publish_pending"),
    ("publishing", "published"),
}
TERMINAL_STATES = {"published", "cancelled", "failed", "awaiting_manual"}


def next_state_on_approve(state: str) -> str:
    if state not in _APPROVE_NEXT:
        raise ValueError(f"cannot approve in state: {state}")
    return _APPROVE_NEXT[state]


def next_state_on_reject(state: str) -> str:
    if not state.endswith("_pending"):
        raise ValueError(f"cannot reject in state: {state}")
    return state  # stay in the same state (regenerate, then back to *_pending)


def can_transition(from_state: str, to_state: str) -> bool:
    if from_state in TERMINAL_STATES:
        return False
    # Cancel / fail / hand over to manual handling are allowed from any live state
    if to_state in {"cancelled", "failed", "awaiting_manual"}:
        return True
    if to_state == _APPROVE_NEXT.get(from_state):
        return True
    return (from_state, to_state) in _AUTO_TRANSITIONS
```
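Approvals only ever move forward through `_APPROVE_NEXT` plus the two automatic edges (`ai_review → publish_pending`, `publishing → published`), so the happy path can be replayed end to end. The following self-contained sketch copies those tables from `state_machine.py` and starts at `cover_pending`. `walk_happy_path` is a hypothetical helper that could serve as an extra regression test:

```python
# Copied from state_machine.py: approval edges plus the automatic edges
_APPROVE_NEXT = {
    "cover_pending": "video_pending",
    "video_pending": "thumb_pending",
    "thumb_pending": "meta_pending",
    "meta_pending": "ai_review",
    "publish_pending": "publishing",
}
_AUTO_NEXT = {"ai_review": "publish_pending", "publishing": "published"}


def walk_happy_path(start: str = "cover_pending") -> list[str]:
    """Follow approve/auto edges from `start` until a state has no outgoing edge."""
    path = [start]
    state = start
    while True:
        nxt = _APPROVE_NEXT.get(state) or _AUTO_NEXT.get(state)
        if nxt is None:
            return path
        if nxt in path:  # a cycle would mean the tables are inconsistent
            raise RuntimeError(f"cycle detected at {nxt}")
        path.append(nxt)
        state = nxt
```

In a real test, importing `_APPROVE_NEXT` from `app.pipeline.state_machine` instead of copying it keeps the check tied to the actual table.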
- [ ] **Step 4: Run tests**
Run: `cd music-lab && python -m pytest tests/test_state_machine.py -v`
Expected: 8 PASS
- [ ] **Step 5: Commit**
```bash
git add music-lab/app/pipeline/ music-lab/tests/test_state_machine.py
git commit -m "feat(music-lab): pipeline state machine"
```
---
## Task 3: Storage helpers + cover generation (DALL·E + fallback)
**Files:**
- Create: `music-lab/app/pipeline/storage.py`
- Create: `music-lab/app/pipeline/cover.py`
- Test: `music-lab/tests/test_cover_generation.py`
- Modify: `music-lab/requirements.txt` (`openai`, `respx`)
- [ ] **Step 1: Add deps**
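Add to `requirements.txt`: `openai>=1.20.0`, `respx>=0.21`, and `pytest-asyncio` (needed for `@pytest.mark.asyncio`). Also add `freezegun>=1.4` if Task 1 hasn't already added it.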
- [ ] **Step 2: Write failing test**
```python
# tests/test_cover_generation.py
import pytest
import respx
from httpx import Response
from app.pipeline import cover, storage
@pytest.fixture
def tmp_storage(monkeypatch, tmp_path):
monkeypatch.setattr(storage, "VIDEO_DATA_DIR", str(tmp_path))
return tmp_path
@pytest.mark.asyncio
@respx.mock
async def test_dalle_success_saves_jpg(tmp_storage, monkeypatch):
monkeypatch.setenv("OPENAI_API_KEY", "test-key")
image_url = "https://oaidalleapiprodscus.blob.core.windows.net/x.png"
respx.post("https://api.openai.com/v1/images/generations").mock(
return_value=Response(200, json={"data": [{"url": image_url}]})
)
# PNG 1x1 픽셀 (간단)
png_bytes = bytes.fromhex(
"89504e470d0a1a0a0000000d49484452000000010000000108020000009077"
"53de0000000c4944415478da6300010000050001"
)
respx.get(image_url).mock(return_value=Response(200, content=png_bytes))
out = await cover.generate(pipeline_id=42, genre="lo-fi",
prompt_template="moody anime", mood="chill")
assert out["used_fallback"] is False
assert out["url"].startswith("/media/videos/42/cover")
assert (tmp_storage / "42" / "cover.jpg").exists()
@pytest.mark.asyncio
@respx.mock
async def test_dalle_timeout_falls_back_to_gradient(tmp_storage, monkeypatch):
monkeypatch.setenv("OPENAI_API_KEY", "test-key")
respx.post("https://api.openai.com/v1/images/generations").mock(
side_effect=lambda req: Response(504)
)
out = await cover.generate(pipeline_id=43, genre="phonk",
prompt_template="dark drift", mood="aggressive",
track_title="Midnight Drive")
assert out["used_fallback"] is True
assert (tmp_storage / "43" / "cover.jpg").exists()
@pytest.mark.asyncio
async def test_no_api_key_falls_back(tmp_storage, monkeypatch):
monkeypatch.delenv("OPENAI_API_KEY", raising=False)
out = await cover.generate(pipeline_id=44, genre="ambient",
prompt_template="x", mood="calm",
track_title="Calm")
assert out["used_fallback"] is True
@pytest.mark.asyncio
@respx.mock
async def test_dalle_with_feedback_appends_to_prompt(tmp_storage, monkeypatch):
monkeypatch.setenv("OPENAI_API_KEY", "test-key")
captured = {}
def hook(req):
import json as _json
captured["body"] = _json.loads(req.content)
return Response(200, json={"data": [{"url": "https://x"}]})
respx.post("https://api.openai.com/v1/images/generations").mock(side_effect=hook)
respx.get("https://x").mock(return_value=Response(200, content=b"\x89PNG\r\n\x1a\n"))
await cover.generate(pipeline_id=45, genre="lo-fi",
prompt_template="moody anime", mood="chill",
feedback="더 어둡게")
assert "더 어둡게" in captured["body"]["prompt"]
```
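Pillow decodes the downloaded image before converting it to JPEG, so the mocked download must be a complete PNG. A truncated byte string sends `cover.generate` down the fallback path. If a fixture should not depend on Pillow, a minimal valid PNG can be assembled with the standard library alone. The following sketch follows the PNG chunk layout (length, type, data, CRC-32 over type + data). `tiny_png` is a hypothetical helper:

```python
import struct
import zlib


def _chunk(kind: bytes, data: bytes) -> bytes:
    # PNG chunk: 4-byte big-endian length, 4-byte type, data, CRC-32 of type + data
    return (struct.pack(">I", len(data)) + kind + data
            + struct.pack(">I", zlib.crc32(kind + data) & 0xFFFFFFFF))


def tiny_png(width: int = 1, height: int = 1, rgb=(0, 0, 0)) -> bytes:
    """Build an 8-bit RGB PNG filled with a single color."""
    # IHDR: width, height, bit depth 8, color type 2 (RGB), compression/filter/interlace 0
    ihdr = struct.pack(">IIBBBBB", width, height, 8, 2, 0, 0, 0)
    # Every scanline starts with filter byte 0 (None), followed by RGB triples
    row = b"\x00" + bytes(rgb) * width
    idat = zlib.compress(row * height)
    return (b"\x89PNG\r\n\x1a\n" + _chunk(b"IHDR", ihdr)
            + _chunk(b"IDAT", idat) + _chunk(b"IEND", b""))
```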
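pytest-asyncio must be installed. Either set `asyncio_mode = auto` in `pytest.ini`, or register the `asyncio` marker (e.g. in `conftest.py`) and keep the explicit `@pytest.mark.asyncio` markers. Check the existing conftest first.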
- [ ] **Step 3: Run test, verify fail**
Run: `python -m pytest tests/test_cover_generation.py -v`
Expected: ImportError
- [ ] **Step 4: Implement `storage.py`**
```python
"""Directory management for pipeline artifacts."""
import os

VIDEO_DATA_DIR = os.getenv("VIDEO_DATA_DIR", "/app/data/videos")
VIDEO_MEDIA_BASE = os.getenv("VIDEO_MEDIA_BASE", "/media/videos")


def pipeline_dir(pipeline_id: int) -> str:
    # Reads the module global at call time, so tests can monkeypatch VIDEO_DATA_DIR
    path = os.path.join(VIDEO_DATA_DIR, str(pipeline_id))
    os.makedirs(path, exist_ok=True)
    return path


def media_url(pipeline_id: int, filename: str) -> str:
    return f"{VIDEO_MEDIA_BASE}/{pipeline_id}/{filename}"
```
- [ ] **Step 5: Implement `cover.py`**
```python
"""AI cover art generation: DALL·E 3 + gradient fallback."""
import base64
import logging
import os
from io import BytesIO

import httpx
from PIL import Image, ImageDraw, ImageFont

from . import storage

logger = logging.getLogger("music-lab.cover")
# dall-e-3 returns a download URL; gpt-image-1 only returns b64_json (handled below too)
OPENAI_MODEL = os.getenv("OPENAI_IMAGE_MODEL", "dall-e-3")
DALLE_TIMEOUT_S = 90
# Gradient fallback colors (top/bottom RGB pair per genre)
GRADIENT_COLORS = {
    "lo-fi": ((26, 26, 46), (22, 33, 62)),
    "phonk": ((26, 10, 10), (45, 0, 0)),
    "ambient": ((13, 33, 55), (10, 22, 40)),
    "pop": ((26, 10, 46), (45, 27, 78)),
    "default": ((17, 24, 39), (31, 41, 55)),
}


def _api_key() -> str:
    # Read at call time: a module-level constant would ignore env changes made after import
    return os.getenv("OPENAI_API_KEY", "")


async def generate(*, pipeline_id: int, genre: str, prompt_template: str,
                   mood: str = "", track_title: str = "", feedback: str = "") -> dict:
    """Generate cover art. On success, save a JPEG and return its URL; on failure, fall back to a gradient.

    Returns: {"url": str, "used_fallback": bool, "error": str | None}
    """
    out_path = os.path.join(storage.pipeline_dir(pipeline_id), "cover.jpg")
    used_fallback = False
    error = None
    api_key = _api_key()
    if api_key:
        try:
            await _generate_with_dalle(api_key, prompt_template, mood, feedback, out_path)
        except Exception as e:
            logger.warning("DALL·E failed, using fallback: %s", e)
            error = str(e)
            used_fallback = True
            _generate_gradient(genre, track_title, out_path)
    else:
        used_fallback = True
        error = "OPENAI_API_KEY not set"
        _generate_gradient(genre, track_title, out_path)
    return {
        "url": storage.media_url(pipeline_id, "cover.jpg"),
        "used_fallback": used_fallback,
        "error": error,
    }


async def _generate_with_dalle(api_key: str, prompt_template: str, mood: str,
                               feedback: str, out_path: str) -> None:
    prompt = prompt_template
    if mood:
        prompt = f"{prompt}, {mood} mood"
    if feedback:
        prompt = f"{prompt}. Additional instructions: {feedback}"
    prompt = f"{prompt}, no text, high quality"
    async with httpx.AsyncClient(timeout=DALLE_TIMEOUT_S) as client:
        resp = await client.post(
            "https://api.openai.com/v1/images/generations",
            headers={"Authorization": f"Bearer {api_key}"},
            json={"model": OPENAI_MODEL, "prompt": prompt, "size": "1024x1024", "n": 1},
        )
        resp.raise_for_status()
        item = resp.json()["data"][0]
        if item.get("b64_json"):
            raw = base64.b64decode(item["b64_json"])
        else:
            img_resp = await client.get(item["url"])
            img_resp.raise_for_status()
            raw = img_resp.content
    # PNG → JPEG conversion
    img = Image.open(BytesIO(raw)).convert("RGB")
    img.save(out_path, "JPEG", quality=92)


def _generate_gradient(genre: str, track_title: str, out_path: str) -> None:
    w, h = 1024, 1024
    top, bot = GRADIENT_COLORS.get(genre.lower(), GRADIENT_COLORS["default"])
    img = Image.new("RGB", (w, h))
    draw = ImageDraw.Draw(img)
    # Linear vertical gradient: one horizontal line per row (much faster than per-pixel writes)
    for y in range(h):
        t = y / h
        color = tuple(int(top[i] + (bot[i] - top[i]) * t) for i in range(3))
        draw.line([(0, y), (w - 1, y)], fill=color)
    if track_title:
        try:
            font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", 64)
        except OSError:
            font = ImageFont.load_default()
        bbox = draw.textbbox((0, 0), track_title, font=font)
        tw, th = bbox[2] - bbox[0], bbox[3] - bbox[1]
        draw.text(((w - tw) // 2, (h - th) // 2), track_title, fill=(255, 255, 255), font=font)
    img.save(out_path, "JPEG", quality=92)
```
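`_generate_with_dalle` assembles the prompt inline, which is why the feedback test has to intercept the HTTP request to inspect it. Pulling the assembly into a pure function would let that rule be unit-tested without `respx`. The following sketch assumes the same ordering as above: template, then mood, then feedback, then the fixed quality suffix. `build_cover_prompt` is a hypothetical name:

```python
def build_cover_prompt(template: str, mood: str = "", feedback: str = "") -> str:
    """Template, then mood, then user feedback, then the fixed quality suffix."""
    parts = [template.strip()]
    if mood:
        parts.append(f", {mood} mood")
    if feedback:
        # The user's correction comes after the genre/mood context
        parts.append(f". Additional instructions: {feedback.strip()}")
    parts.append(", no text, high quality")
    return "".join(parts)
```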
- [ ] **Step 6: Run tests**
Run: `python -m pytest tests/test_cover_generation.py -v`
Expected: 4 PASS
- [ ] **Step 7: Commit**
```bash
git add music-lab/app/pipeline/storage.py music-lab/app/pipeline/cover.py \
music-lab/tests/test_cover_generation.py music-lab/requirements.txt
git commit -m "feat(music-lab): AI cover generation + gradient fallback"
```
---
## Task 4: Video/thumbnail generation (FFmpeg, migrating the existing video_producer)
**Files:**
- Create: `music-lab/app/pipeline/video.py` (move logic from `app/video_producer.py`)
- Create: `music-lab/app/pipeline/thumb.py`
- Test: `music-lab/tests/test_video_thumb.py`
- [ ] **Step 1: Write failing test**
Mock `subprocess.run` instead of actually running FFmpeg.
```python
# tests/test_video_thumb.py
from unittest.mock import MagicMock, patch

import pytest

from app.pipeline import storage, thumb, video


@pytest.fixture
def tmp_storage(monkeypatch, tmp_path):
    monkeypatch.setattr(storage, "VIDEO_DATA_DIR", str(tmp_path))
    # Dummy input files
    audio = tmp_path / "audio.mp3"
    audio.write_bytes(b"\x00" * 100)
    cover = tmp_path / "50" / "cover.jpg"
    cover.parent.mkdir(parents=True)
    cover.write_bytes(b"\x00" * 100)
    return tmp_path


@patch("subprocess.run")
def test_generate_video_calls_ffmpeg(mock_run, tmp_storage):
    mock_run.return_value = MagicMock(returncode=0, stderr="")
    out = video.generate(pipeline_id=50, audio_path=str(tmp_storage / "audio.mp3"),
                         cover_path=str(tmp_storage / "50" / "cover.jpg"),
                         genre="lo-fi", duration_sec=120, resolution="1920x1080",
                         style="visualizer")
    assert out["url"].endswith("/50/video.mp4")
    assert out["used_fallback"] is False
    args = mock_run.call_args[0][0]
    assert args[0] == "ffmpeg"
    assert "-i" in args
    assert "showwaves" in " ".join(args)


@patch("subprocess.run")
def test_generate_video_failure_marks_failed(mock_run, tmp_storage):
    mock_run.return_value = MagicMock(returncode=1, stderr="bad codec")
    with pytest.raises(video.VideoGenerationError):
        video.generate(pipeline_id=51, audio_path=str(tmp_storage / "audio.mp3"),
                       cover_path=str(tmp_storage / "50" / "cover.jpg"),
                       genre="lo-fi", duration_sec=120, resolution="1920x1080",
                       style="visualizer")


@patch("subprocess.run")
def test_thumb_extracts_frame(mock_run, tmp_storage):
    mock_run.return_value = MagicMock(returncode=0, stderr="")
    video_path = tmp_storage / "60" / "video.mp4"
    video_path.parent.mkdir(parents=True)
    video_path.write_bytes(b"\x00" * 100)
    out = thumb.generate(pipeline_id=60, video_path=str(video_path),
                         track_title="Midnight Drive", overlay_text=True)
    assert out["url"].endswith("/60/thumbnail.jpg")
    args = mock_run.call_args[0][0]
    assert args[0] == "ffmpeg"
```
- [ ] **Step 2: Run, verify fail**
Run: `python -m pytest tests/test_video_thumb.py -v`
Expected: ImportError
- [ ] **Step 3: Implement `video.py`**, reusing the ffmpeg command builder from the existing `video_producer.py`
```python
"""Video visual generation: visualizer / slideshow styles."""
import logging
import os
import subprocess

from . import storage

logger = logging.getLogger("music-lab.video")
VIDEO_TIMEOUT_S = 300  # 5 minutes


class VideoGenerationError(Exception):
    pass


def generate(*, pipeline_id: int, audio_path: str, cover_path: str,
             genre: str, duration_sec: int, resolution: str = "1920x1080",
             style: str = "visualizer") -> dict:
    """Generate the video. On success, save an mp4 and return its URL; raise on failure."""
    w, h = resolution.split("x")
    out_path = os.path.join(storage.pipeline_dir(pipeline_id), "video.mp4")
    if style == "visualizer":
        cmd = _build_visualizer_cmd(audio_path, cover_path, out_path, w, h)
    else:
        # Later: slideshow and other styles (uses the visualizer for now)
        cmd = _build_visualizer_cmd(audio_path, cover_path, out_path, w, h)
    logger.info("running ffmpeg: %s", " ".join(cmd))
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=VIDEO_TIMEOUT_S)
    except subprocess.TimeoutExpired as e:
        # Surface timeouts as the same error type the orchestrator already handles
        raise VideoGenerationError(f"ffmpeg timed out after {VIDEO_TIMEOUT_S}s") from e
    if result.returncode != 0:
        raise VideoGenerationError(f"ffmpeg failed: {result.stderr[:500]}")
    return {
        "url": storage.media_url(pipeline_id, "video.mp4"),
        "used_fallback": False,
        "duration_sec": duration_sec,
    }


def _build_visualizer_cmd(audio: str, bg: str, out: str, w: str, h: str) -> list:
    # Looped still image as background + 200 px waveform strip overlaid at the bottom
    return [
        "ffmpeg", "-y",
        "-loop", "1", "-i", bg,
        "-i", audio,
        "-filter_complex",
        f"[0:v]scale={w}:{h}[bg];"
        f"[1:a]showwaves=s={w}x200:mode=cline:colors=0xFF4444@0.8[wave];"
        f"[bg][wave]overlay=0:({h}-200)[out]",
        "-map", "[out]", "-map", "1:a",
        "-c:v", "libx264", "-preset", "fast", "-crf", "23",
        "-c:a", "aac", "-b:a", "192k",
        "-shortest", out,  # stop when the audio ends (the image loops forever)
    ]
```
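`generate()` splits `resolution` with `resolution.split("x")` and passes the strings straight into the filtergraph. A malformed value such as `"1920*1080"` therefore fails with an unpacking error rather than a clear message. Odd sizes can also be rejected by libx264 when it encodes 4:2:0 chroma. The following sketch shows stricter parsing. It assumes resolutions arrive from the Setup tab as `WIDTHxHEIGHT` strings and that the visualizer's 200 px waveform strip must fit inside the frame. `parse_resolution` is a hypothetical name:

```python
import re

_RES_RE = re.compile(r"^\s*(\d{2,5})\s*[xX]\s*(\d{2,5})\s*$")
WAVE_STRIP_PX = 200  # matches the showwaves height used in _build_visualizer_cmd


def parse_resolution(value: str) -> tuple[int, int]:
    """Parse 'WIDTHxHEIGHT' into ints, rejecting sizes the visualizer can't render."""
    m = _RES_RE.match(value)
    if not m:
        raise ValueError(f"bad resolution {value!r}, expected e.g. '1920x1080'")
    w, h = int(m.group(1)), int(m.group(2))
    if w % 2 or h % 2:
        raise ValueError(f"resolution {w}x{h} must have even width and height")
    if h <= WAVE_STRIP_PX:
        raise ValueError(f"height {h} leaves no room for the {WAVE_STRIP_PX}px waveform")
    return w, h
```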
- [ ] **Step 4: Implement `thumb.py`**
```python
"""Thumbnail generation: grab the frame at 5 s + text overlay."""
import logging
import os
import subprocess

from PIL import Image, ImageDraw, ImageFont

from . import storage

logger = logging.getLogger("music-lab.thumb")
THUMB_TIMEOUT_S = 60


class ThumbGenerationError(Exception):
    pass


def generate(*, pipeline_id: int, video_path: str,
             track_title: str = "", overlay_text: bool = True) -> dict:
    out_path = os.path.join(storage.pipeline_dir(pipeline_id), "thumbnail.jpg")
    cmd = ["ffmpeg", "-y", "-i", video_path,
           "-ss", "00:00:05", "-vframes", "1", "-q:v", "2", out_path]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=THUMB_TIMEOUT_S)
    except subprocess.TimeoutExpired as e:
        raise ThumbGenerationError(f"ffmpeg thumbnail timed out after {THUMB_TIMEOUT_S}s") from e
    if result.returncode != 0:
        raise ThumbGenerationError(f"ffmpeg thumbnail failed: {result.stderr[:300]}")
    if overlay_text and track_title:
        _overlay_title(out_path, track_title)
    return {"url": storage.media_url(pipeline_id, "thumbnail.jpg"), "used_fallback": False}


def _overlay_title(path: str, title: str) -> None:
    try:
        img = Image.open(path).convert("RGB")
        try:
            font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", 80)
        except OSError:
            font = ImageFont.load_default()
        # Semi-transparent black box over the bottom 30%, with white text on top
        w, h = img.size
        box_h = int(h * 0.3)
        overlay = Image.new("RGBA", (w, box_h), (0, 0, 0, 160))
        img.paste(overlay, (0, h - box_h), overlay)  # the RGBA alpha acts as the paste mask
        draw = ImageDraw.Draw(img)
        bbox = draw.textbbox((0, 0), title, font=font)
        tw = bbox[2] - bbox[0]
        draw.text(((w - tw) // 2, h - box_h + 30), title, fill=(255, 255, 255), font=font)
        img.save(path, "JPEG", quality=92)
    except Exception as e:
        # The overlay is cosmetic: keep the plain frame if it fails
        logger.warning("thumbnail overlay failed: %s", e)
```
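`_overlay_title` centers the title with `(w - tw) // 2`. Once the title is wider than the image, that offset goes negative and both ends are clipped. One option is to shorten the title before drawing it. The sketch below assumes a `measure` callable that returns the rendered width; with Pillow, this could be `lambda s: draw.textlength(s, font=font)`. `truncate_to_width` is a hypothetical helper:

```python
from typing import Callable


def truncate_to_width(title: str, max_width: float,
                      measure: Callable[[str], float], ellipsis: str = "…") -> str:
    """Return title unchanged if it fits, else the longest prefix + ellipsis that fits.

    Assumes measure() grows monotonically with string length. If not even one
    character fits, only the ellipsis is returned.
    """
    if measure(title) <= max_width:
        return title
    lo, hi = 0, len(title)
    # Binary search for the longest prefix whose "prefix + ellipsis" still fits
    while lo < hi:
        mid = (lo + hi + 1) // 2
        if measure(title[:mid] + ellipsis) <= max_width:
            lo = mid
        else:
            hi = mid - 1
    return title[:lo] + ellipsis
```

The binary search calls `measure` O(log n) times instead of once per character, which matters because each call renders text with the font.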
- [ ] **Step 5: Run tests**
Run: `python -m pytest tests/test_video_thumb.py -v`
Expected: 3 PASS
- [ ] **Step 6: Commit**
```bash
git add music-lab/app/pipeline/video.py music-lab/app/pipeline/thumb.py \
music-lab/tests/test_video_thumb.py
git commit -m "feat(music-lab): pipeline video and thumbnail generation"
```
---
## Task 5: Metadata generation (Claude Haiku)
**Files:**
- Create: `music-lab/app/pipeline/metadata.py`
- Test: `music-lab/tests/test_metadata_generation.py`
- [ ] **Step 1: Write failing test**
```python
# tests/test_metadata_generation.py
import json

import pytest
import respx
from httpx import Response

from app.pipeline import metadata


@pytest.mark.asyncio
@respx.mock
async def test_metadata_calls_claude_and_parses_json(monkeypatch):
    monkeypatch.setenv("ANTHROPIC_API_KEY", "test-key")
    payload = {
        "content": [{"type": "text", "text": '{"title":"[Lo-fi] Drive | 85BPM",'
                                             '"description":"chill","tags":["lofi","85bpm"],'
                                             '"category_id":10}'}]
    }
    respx.post("https://api.anthropic.com/v1/messages").mock(
        return_value=Response(200, json=payload)
    )
    result = await metadata.generate(
        track={"title": "Drive", "genre": "lo-fi", "bpm": 85, "key": "C", "scale": "minor",
               "moods": ["chill"], "instruments": ["piano"]},
        template={"title": "[{genre}] {title} | {bpm}BPM",
                  "description": "{title}\n", "tags": [], "category_id": 10},
        trend_keywords=["lofi", "study"],
        feedback="",
    )
    assert result["title"].startswith("[Lo-fi]")
    assert "lofi" in result["tags"]


@pytest.mark.asyncio
@respx.mock
async def test_metadata_fallback_when_no_api_key(monkeypatch):
    monkeypatch.delenv("ANTHROPIC_API_KEY", raising=False)
    result = await metadata.generate(
        track={"title": "Drive", "genre": "lo-fi", "bpm": 85, "key": "C", "scale": "minor",
               "moods": [], "instruments": []},
        template={"title": "[{genre}] {title} | {bpm}BPM",
                  "description": "{title}", "tags": ["lofi"], "category_id": 10},
        trend_keywords=[],
    )
    # Fallback: template variables substituted verbatim
    assert result["title"] == "[lo-fi] Drive | 85BPM"
    assert result["used_fallback"] is True


@pytest.mark.asyncio
@respx.mock
async def test_metadata_includes_feedback_in_prompt(monkeypatch):
    monkeypatch.setenv("ANTHROPIC_API_KEY", "test-key")
    captured = {}

    def hook(req):
        captured["body"] = json.loads(req.content)
        return Response(200, json={"content": [{"type": "text",
                                                "text": '{"title":"x","description":"y","tags":[],"category_id":10}'}]})

    respx.post("https://api.anthropic.com/v1/messages").mock(side_effect=hook)
    await metadata.generate(
        track={"title": "X", "genre": "lo-fi", "bpm": 85, "key": "C", "scale": "minor",
               "moods": [], "instruments": []},
        template={"title": "{title}", "description": "{title}", "tags": [], "category_id": 10},
        trend_keywords=[],
        feedback="Make the title shorter",
    )
    assert "Make the title shorter" in str(captured["body"])
```
- [ ] **Step 2: Run, verify fail**
Run: `python -m pytest tests/test_metadata_generation.py -v`
Expected: ImportError
- [ ] **Step 3: Implement `metadata.py`**
```python
"""Metadata generation: Claude Haiku with a template-substitution fallback."""
import os
import json
import logging

import httpx

logger = logging.getLogger("music-lab.metadata")
CLAUDE_MODEL = os.getenv("CLAUDE_HAIKU_MODEL", "claude-haiku-4-5-20251001")
TIMEOUT_S = 30


def _api_key() -> str:
    # Read at call time, not import time, so tests can toggle it with monkeypatch.setenv/delenv
    return os.getenv("ANTHROPIC_API_KEY", "")


async def generate(*, track: dict, template: dict, trend_keywords: list[str],
                   feedback: str = "") -> dict:
    """Generate metadata with the LLM; fall back to template substitution when the
    call fails or no API key is configured.

    Returns: {"title", "description", "tags", "category_id", "used_fallback", "error"}
    """
    api_key = _api_key()
    if not api_key:
        return {**_fallback_template(track, template), "used_fallback": True, "error": "no api key"}
    try:
        result = await _call_claude(api_key, track, template, trend_keywords, feedback)
        return {**result, "used_fallback": False, "error": None}
    except Exception as e:
        logger.warning("metadata LLM call failed, using fallback: %s", e)
        return {**_fallback_template(track, template), "used_fallback": True, "error": str(e)}


def _fallback_template(track: dict, template: dict) -> dict:
    fmt_vars = {
        "title": track.get("title", ""),
        "genre": track.get("genre", ""),
        "bpm": track.get("bpm", ""),
        "key": track.get("key", ""),
        "scale": track.get("scale", ""),
    }
    title = template.get("title", "{title}").format(**fmt_vars)
    description = template.get("description", "{title}").format(**fmt_vars)
    return {
        "title": title[:100],
        "description": description[:5000],
        "tags": (template.get("tags") or [])[:15],
        "category_id": template.get("category_id", 10),
    }


async def _call_claude(api_key: str, track: dict, template: dict,
                       trend_keywords: list[str], feedback: str) -> dict:
    user_prompt = (
        "Generate YouTube metadata for the following track. Respond with JSON only.\n\n"
        f"Track: {json.dumps(track, ensure_ascii=False)}\n"
        f"Template: {json.dumps(template, ensure_ascii=False)}\n"
        f"Trend keywords: {', '.join(trend_keywords)}\n"
    )
    if feedback:
        user_prompt += f"\nUser feedback: {feedback}\n"
    user_prompt += (
        '\nOutput JSON: {"title": "at most 60 characters", '
        '"description": "at most 1000 characters, 3-5 paragraphs", '
        '"tags": ["at most 15 tags"], "category_id": 10}'
    )
    async with httpx.AsyncClient(timeout=TIMEOUT_S) as client:
        resp = await client.post(
            "https://api.anthropic.com/v1/messages",
            headers={
                "x-api-key": api_key,
                "anthropic-version": "2023-06-01",
                "content-type": "application/json",
            },
            json={
                "model": CLAUDE_MODEL,
                "max_tokens": 1024,
                "messages": [{"role": "user", "content": user_prompt}],
            },
        )
        resp.raise_for_status()
    text = resp.json()["content"][0]["text"]
    # Take the span from the first "{" to the last "}"
    start = text.find("{")
    end = text.rfind("}") + 1
    return json.loads(text[start:end])
```
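`_call_claude` slices the reply from the first `{` to the last `}`. That breaks as soon as the model adds trailing text containing a brace. `json.JSONDecoder.raw_decode` parses exactly one value starting at a given index and ignores whatever follows it. The sketch below is a possible drop-in helper; `extract_json_object` is a hypothetical name. `review.py` uses the same slicing and could share it:

```python
import json


def extract_json_object(text: str) -> dict:
    """Return the first JSON object embedded in text, ignoring prose around it."""
    decoder = json.JSONDecoder()
    idx = text.find("{")
    while idx != -1:
        try:
            obj, _end = decoder.raw_decode(text, idx)
            return obj  # decoding from "{" can only yield a dict
        except json.JSONDecodeError:
            # Not a valid object at this brace; try the next one
            idx = text.find("{", idx + 1)
    raise ValueError("no JSON object found in model output")
```

Because it raises `ValueError` (which `json.JSONDecodeError` also subclasses), the existing `except Exception` in `generate` still routes failures to the template fallback.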
- [ ] **Step 4: Run tests**
Run: `python -m pytest tests/test_metadata_generation.py -v`
Expected: 3 PASS
- [ ] **Step 5: Commit**
```bash
git add music-lab/app/pipeline/metadata.py music-lab/tests/test_metadata_generation.py
git commit -m "feat(music-lab): pipeline metadata LLM generation + fallback"
```
---
## Task 6: Final AI review (4 axes)
**Files:**
- Create: `music-lab/app/pipeline/review.py`
- Test: `music-lab/tests/test_review.py`
- [ ] **Step 1: Write failing test**
```python
# tests/test_review.py
import pytest
import respx
from httpx import Response

from app.pipeline import review


@pytest.mark.asyncio
@respx.mock
async def test_review_returns_pass_when_above_threshold(monkeypatch):
    monkeypatch.setenv("ANTHROPIC_API_KEY", "k")
    body = {"content": [{"type": "text", "text":
            '{"metadata_quality":{"score":80,"notes":"x"},'
            '"policy_compliance":{"score":90,"issues":[]},'
            '"viewer_experience":{"score":75,"notes":"y"},'
            '"trend_alignment":{"score":70,"matched_keywords":["lofi"]},'
            '"summary":"good"}'}]}
    respx.post("https://api.anthropic.com/v1/messages").mock(return_value=Response(200, json=body))
    result = await review.run_4_axis(
        pipeline={"id": 1}, track={"title": "x", "genre": "lo-fi", "bpm": 85},
        video_meta={"length_sec": 120, "resolution": "1920x1080"},
        metadata={"title": "Y", "description": "Z", "tags": ["lofi"], "category_id": 10},
        thumbnail_url="/m/x.jpg", trend_top=["lofi"],
        weights={"meta": 25, "policy": 30, "viewer": 25, "trend": 20},
        threshold=60,
    )
    assert result["verdict"] == "pass"
    expected_total = 0.25 * 80 + 0.30 * 90 + 0.25 * 75 + 0.20 * 70
    assert result["weighted_total"] == pytest.approx(expected_total, abs=0.01)


@pytest.mark.asyncio
@respx.mock
async def test_review_fail_below_threshold(monkeypatch):
    monkeypatch.setenv("ANTHROPIC_API_KEY", "k")
    body = {"content": [{"type": "text", "text":
            '{"metadata_quality":{"score":40,"notes":"x"},'
            '"policy_compliance":{"score":50,"issues":[]},'
            '"viewer_experience":{"score":30,"notes":"y"},'
            '"trend_alignment":{"score":20,"matched_keywords":[]},'
            '"summary":"bad"}'}]}
    respx.post("https://api.anthropic.com/v1/messages").mock(return_value=Response(200, json=body))
    result = await review.run_4_axis(
        pipeline={"id": 2}, track={"title": "x", "genre": "lo-fi", "bpm": 85},
        video_meta={"length_sec": 120, "resolution": "1920x1080"},
        metadata={"title": "Y", "description": "Z", "tags": [], "category_id": 10},
        thumbnail_url="/m/x.jpg", trend_top=[],
        weights={"meta": 25, "policy": 30, "viewer": 25, "trend": 20},
        threshold=60,
    )
    assert result["verdict"] == "fail"


@pytest.mark.asyncio
@respx.mock
async def test_review_heuristic_fallback_on_llm_error(monkeypatch):
    monkeypatch.setenv("ANTHROPIC_API_KEY", "k")
    respx.post("https://api.anthropic.com/v1/messages").mock(return_value=Response(500))
    result = await review.run_4_axis(
        pipeline={"id": 3}, track={"title": "x", "genre": "lo-fi", "bpm": 85},
        video_meta={"length_sec": 120, "resolution": "1920x1080"},
        metadata={"title": "Y" * 30, "description": "Z" * 200, "tags": ["a", "b"], "category_id": 10},
        thumbnail_url="/m/x.jpg", trend_top=["lofi"],
        weights={"meta": 25, "policy": 30, "viewer": 25, "trend": 20},
        threshold=60,
    )
    assert result["used_fallback"] is True
    assert "weighted_total" in result
```
- [ ] **Step 2: Run, verify fail**
Run: `python -m pytest tests/test_review.py -v`
Expected: ImportError
- [ ] **Step 3: Implement `review.py`**
```python
"""Final AI review: weighted average over 4 axes (metadata / policy / viewer / trend)."""
import os
import json
import logging

import httpx

logger = logging.getLogger("music-lab.review")
CLAUDE_MODEL = os.getenv("CLAUDE_SONNET_MODEL", "claude-sonnet-4-6")
TIMEOUT_S = 60
# Demo placeholders only; in production, load the real word list from a separate file
POLICY_BANNED = {"f-word", "n-word"}


def _api_key() -> str:
    # Read at call time, not import time, so tests can set or clear it with monkeypatch
    return os.getenv("ANTHROPIC_API_KEY", "")


async def run_4_axis(*, pipeline: dict, track: dict, video_meta: dict,
                     metadata: dict, thumbnail_url: str, trend_top: list[str],
                     weights: dict, threshold: int) -> dict:
    api_key = _api_key()
    if not api_key:
        return _heuristic(metadata, video_meta, track, trend_top, weights, threshold,
                          fallback_reason="no api key")
    try:
        scores = await _call_claude(api_key, pipeline, track, video_meta, metadata,
                                    thumbnail_url, trend_top)
        return _weighted_verdict(scores, weights, threshold, used_fallback=False)
    except Exception as e:
        logger.warning("review LLM call failed, using heuristic: %s", e)
        return _heuristic(metadata, video_meta, track, trend_top, weights, threshold,
                          fallback_reason=str(e))


def _weighted_verdict(scores: dict, weights: dict, threshold: int,
                      used_fallback: bool) -> dict:
    total = (
        weights["meta"] / 100 * scores["metadata_quality"]["score"] +
        weights["policy"] / 100 * scores["policy_compliance"]["score"] +
        weights["viewer"] / 100 * scores["viewer_experience"]["score"] +
        weights["trend"] / 100 * scores["trend_alignment"]["score"]
    )
    return {
        **scores,
        "weighted_total": round(total, 2),
        "verdict": "pass" if total >= threshold else "fail",
        "used_fallback": used_fallback,
    }


async def _call_claude(api_key, pipeline, track, video_meta, metadata, thumbnail_url, trend_top):
    user = (
        "Score the track, video, and metadata on 4 axes. Respond with JSON only:\n"
        f"Track: {json.dumps(track, ensure_ascii=False)}\n"
        f"Video: {json.dumps(video_meta)}\n"
        f"Metadata: {json.dumps(metadata, ensure_ascii=False)}\n"
        f"Thumbnail: {thumbnail_url}\n"
        f"Trends: {trend_top}\n"
        'Output: {"metadata_quality":{"score":0-100,"notes":""},'
        '"policy_compliance":{"score":0-100,"issues":[]},'
        '"viewer_experience":{"score":0-100,"notes":""},'
        '"trend_alignment":{"score":0-100,"matched_keywords":[]},'
        '"summary":""}'
    )
    async with httpx.AsyncClient(timeout=TIMEOUT_S) as client:
        resp = await client.post(
            "https://api.anthropic.com/v1/messages",
            headers={
                "x-api-key": api_key,
                "anthropic-version": "2023-06-01",
                "content-type": "application/json",
            },
            json={"model": CLAUDE_MODEL, "max_tokens": 1024,
                  "messages": [{"role": "user", "content": user}]},
        )
        resp.raise_for_status()
    text = resp.json()["content"][0]["text"]
    return json.loads(text[text.find("{"):text.rfind("}") + 1])


def _heuristic(metadata, video_meta, track, trend_top, weights, threshold, fallback_reason):
    # Metadata: title/description length and tag count
    title_len = len(metadata.get("title", ""))
    desc_len = len(metadata.get("description", ""))
    tag_n = len(metadata.get("tags", []))
    meta_score = 100 if 5 <= title_len <= 60 and 50 <= desc_len <= 1000 and 5 <= tag_n <= 15 else 50
    # Policy: banned-word match
    text_blob = (metadata.get("title", "") + metadata.get("description", "")).lower()
    policy_score = 100 if not any(w in text_blob for w in POLICY_BANNED) else 30
    # Viewer: is the video length within ±5 s of the track length?
    expected = track.get("duration_sec", video_meta.get("length_sec", 0))
    delta = abs(video_meta.get("length_sec", 0) - expected)
    viewer_score = 90 if delta <= 5 else 60
    # Trend: do the tags overlap the trending keywords?
    overlap = set(metadata.get("tags", [])) & set(trend_top)
    trend_score = 100 if overlap else 40
    scores = {
        "metadata_quality": {"score": meta_score, "notes": "heuristic"},
        "policy_compliance": {"score": policy_score, "issues": []},
        "viewer_experience": {"score": viewer_score, "notes": "heuristic"},
        "trend_alignment": {"score": trend_score, "matched_keywords": list(overlap)},
        "summary": f"heuristic fallback: {fallback_reason}",
    }
    return _weighted_verdict(scores, weights, threshold, used_fallback=True)
```
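`_weighted_verdict` divides each weight by 100, so it silently assumes that the weights saved from SetupTab add up to exactly 100. Weights of 30/30/30/30, for example, would inflate every total by 20%. The sketch below normalizes by the actual sum instead. `weighted_total` is a hypothetical helper; the axis keys are the ones the plan uses:

```python
# Weight key in review_weights -> score key in the review JSON
AXES = {
    "meta": "metadata_quality",
    "policy": "policy_compliance",
    "viewer": "viewer_experience",
    "trend": "trend_alignment",
}


def weighted_total(scores: dict, weights: dict) -> float:
    """Weighted mean of the four axis scores, normalized by the sum of the weights."""
    total_w = sum(weights[k] for k in AXES)
    if total_w <= 0:
        raise ValueError("review weights must sum to a positive number")
    total = sum(weights[k] * scores[axis]["score"] for k, axis in AXES.items()) / total_w
    return round(total, 2)
```

With the default 25/30/25/20 weights, this gives the same 79.75 as the first review test. Doubling every weight leaves the result unchanged.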
- [ ] **Step 4: Run tests**
Run: `python -m pytest tests/test_review.py -v`
Expected: 3 PASS
- [ ] **Step 5: Commit**
```bash
git add music-lab/app/pipeline/review.py music-lab/tests/test_review.py
git commit -m "feat(music-lab): pipeline 4-axis AI review + heuristic fallback"
```
---
## Task 7: YouTube OAuth + Upload
**Files:**
- Create: `music-lab/app/pipeline/youtube.py`
- Test: `music-lab/tests/test_youtube_upload.py`
- Modify: `music-lab/requirements.txt` (`google-api-python-client>=2.100`, `google-auth-oauthlib>=1.2`)
- [ ] **Step 1: Add deps**
`requirements.txt`:
```
google-api-python-client>=2.100
google-auth-oauthlib>=1.2
google-auth-httplib2>=0.2
```
- [ ] **Step 2: Write failing test**
```python
# tests/test_youtube_upload.py
import pytest
from unittest.mock import patch, MagicMock

from app.pipeline import youtube


def _setup_token(monkeypatch, db, refresh="r1", access="a1"):
    db.upsert_oauth_token(
        channel_id="UC1", channel_title="t", avatar_url=None,
        refresh_token=refresh, access_token=access, expires_at="2099-01-01T00:00:00",
    )


@pytest.fixture
def fresh_db(monkeypatch, tmp_path):
    from app import db
    monkeypatch.setattr(db, "DB_PATH", str(tmp_path / "music.db"))
    db.init_db()
    return db


@patch("app.pipeline.youtube._build_youtube_client")
def test_upload_succeeds_after_resumable(mock_client, fresh_db, tmp_path, monkeypatch):
    monkeypatch.setenv("YOUTUBE_OAUTH_CLIENT_ID", "cid")
    monkeypatch.setenv("YOUTUBE_OAUTH_CLIENT_SECRET", "sec")
    _setup_token(monkeypatch, fresh_db)
    yt = MagicMock()
    insert = MagicMock()
    insert.next_chunk.side_effect = [(None, None), (None, {"id": "VID123"})]
    yt.videos().insert.return_value = insert
    mock_client.return_value = yt
    video_path = tmp_path / "v.mp4"
    video_path.write_bytes(b"\x00")
    out = youtube.upload_video(
        video_path=str(video_path),
        thumbnail_path=None,
        metadata={"title": "T", "description": "D", "tags": ["x"], "category_id": 10},
        privacy="private",
    )
    assert out["video_id"] == "VID123"


@patch("app.pipeline.youtube._build_youtube_client")
def test_upload_no_token_raises(mock_client, fresh_db, tmp_path):
    video_path = tmp_path / "v.mp4"
    video_path.write_bytes(b"\x00")
    with pytest.raises(youtube.NotAuthenticatedError):
        youtube.upload_video(
            video_path=str(video_path), thumbnail_path=None,
            metadata={"title": "T", "description": "D", "tags": [], "category_id": 10},
            privacy="private",
        )


@patch("app.pipeline.youtube._build_youtube_client")
def test_upload_quota_exceeded_marks_quota(mock_client, fresh_db, tmp_path, monkeypatch):
    from googleapiclient.errors import HttpError
    monkeypatch.setenv("YOUTUBE_OAUTH_CLIENT_ID", "cid")
    monkeypatch.setenv("YOUTUBE_OAUTH_CLIENT_SECRET", "sec")
    _setup_token(monkeypatch, fresh_db)
    yt = MagicMock()
    err = HttpError(MagicMock(status=403), b'{"error":{"errors":[{"reason":"quotaExceeded"}]}}')
    yt.videos().insert.return_value.next_chunk.side_effect = err
    mock_client.return_value = yt
    video_path = tmp_path / "v.mp4"
    video_path.write_bytes(b"\x00")
    with pytest.raises(youtube.QuotaExceededError):
        youtube.upload_video(
            video_path=str(video_path), thumbnail_path=None,
            metadata={"title": "T", "description": "D", "tags": [], "category_id": 10},
            privacy="private",
        )
```
- [ ] **Step 3: Run, verify fail**
Run: `python -m pytest tests/test_youtube_upload.py -v`
Expected: ImportError
- [ ] **Step 4: Implement `youtube.py`**
```python
"""YouTube OAuth flow + resumable upload."""
import os
import logging
from datetime import datetime, timedelta, timezone
from urllib.parse import urlencode

import httpx
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload
from googleapiclient.errors import HttpError

from app import db

logger = logging.getLogger("music-lab.youtube")
CLIENT_ID = os.getenv("YOUTUBE_OAUTH_CLIENT_ID", "")
CLIENT_SECRET = os.getenv("YOUTUBE_OAUTH_CLIENT_SECRET", "")
REDIRECT_URI = os.getenv("YOUTUBE_OAUTH_REDIRECT_URI", "")
SCOPES = ["https://www.googleapis.com/auth/youtube.upload",
          "https://www.googleapis.com/auth/youtube.readonly"]


class NotAuthenticatedError(Exception):
    pass


class QuotaExceededError(Exception):
    pass


def get_auth_url() -> str:
    if not CLIENT_ID or not REDIRECT_URI:
        raise RuntimeError("OAuth environment variables are not set")
    params = {
        "client_id": CLIENT_ID,
        "redirect_uri": REDIRECT_URI,
        "response_type": "code",
        "scope": " ".join(SCOPES),
        "access_type": "offline",
        "prompt": "consent",
    }
    return "https://accounts.google.com/o/oauth2/v2/auth?" + urlencode(params)


async def exchange_code(code: str) -> dict:
    """Exchange the auth code for refresh/access tokens, fetch the channel info, and store both in the DB."""
    async with httpx.AsyncClient(timeout=30) as client:
        token_resp = await client.post(
            "https://oauth2.googleapis.com/token",
            data={
                "code": code,
                "client_id": CLIENT_ID,
                "client_secret": CLIENT_SECRET,
                "redirect_uri": REDIRECT_URI,
                "grant_type": "authorization_code",
            },
        )
        token_resp.raise_for_status()
        tok = token_resp.json()
    access = tok["access_token"]
    refresh = tok["refresh_token"]
    expires_at = _expiry_from_seconds(tok["expires_in"])
    # Look up the channel that owns these credentials
    creds = _creds(access=access, refresh=refresh)
    yt = build("youtube", "v3", credentials=creds, cache_discovery=False)
    ch = yt.channels().list(part="snippet", mine=True).execute()
    item = ch["items"][0]
    db.upsert_oauth_token(
        channel_id=item["id"],
        channel_title=item["snippet"]["title"],
        avatar_url=item["snippet"]["thumbnails"]["default"]["url"],
        refresh_token=refresh, access_token=access, expires_at=expires_at,
    )
    return {"channel_id": item["id"], "channel_title": item["snippet"]["title"]}


def get_status() -> dict | None:
    tok = db.get_oauth_token()
    if not tok:
        return None
    return {
        "channel_id": tok["channel_id"],
        "channel_title": tok["channel_title"],
        "avatar_url": tok["avatar_url"],
    }


def disconnect() -> None:
    db.delete_oauth_token()


def upload_video(*, video_path: str, thumbnail_path: str | None,
                 metadata: dict, privacy: str) -> dict:
    tok = db.get_oauth_token()
    if not tok:
        raise NotAuthenticatedError("no stored YouTube credentials")
    creds = _creds(access=tok["access_token"], refresh=tok["refresh_token"])
    yt = _build_youtube_client(creds)
    body = {
        "snippet": {
            "title": metadata["title"],
            "description": metadata["description"],
            "tags": metadata.get("tags", []),
            "categoryId": str(metadata.get("category_id", 10)),
        },
        "status": {"privacyStatus": privacy, "selfDeclaredMadeForKids": False},
    }
    media = MediaFileUpload(video_path, chunksize=4 * 1024 * 1024, resumable=True, mimetype="video/mp4")
    req = yt.videos().insert(part="snippet,status", body=body, media_body=media)
    try:
        response = None
        while response is None:
            _status, response = req.next_chunk()
        video_id = response["id"]
    except HttpError as e:
        if b"quotaExceeded" in e.content:
            raise QuotaExceededError(str(e)) from e
        raise
    if thumbnail_path:
        try:
            yt.thumbnails().set(videoId=video_id, media_body=thumbnail_path).execute()
        except HttpError as e:
            logger.warning("thumbnail upload failed: %s", e)
    return {"video_id": video_id}


def _build_youtube_client(creds):  # patch point for tests
    return build("youtube", "v3", credentials=creds, cache_discovery=False)


def _creds(access: str, refresh: str) -> Credentials:
    return Credentials(
        token=access, refresh_token=refresh,
        token_uri="https://oauth2.googleapis.com/token",
        client_id=CLIENT_ID, client_secret=CLIENT_SECRET, scopes=SCOPES,
    )


def _expiry_from_seconds(secs: int) -> str:
    # Naive UTC ISO string, as before; datetime.utcnow() is deprecated since Python 3.12
    now = datetime.now(timezone.utc).replace(tzinfo=None)
    return (now + timedelta(seconds=secs)).isoformat(timespec="seconds")
```
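`upload_video` detects quota errors with a substring test on `e.content`. YouTube Data API errors carry a JSON body of the form `{"error": {"errors": [{"reason": ...}]}}`, which is the same shape the quota test fakes. The reasons can therefore be read structurally instead. In the sketch below, `http_error_reasons` is a hypothetical helper:

```python
import json


def http_error_reasons(content: bytes) -> list[str]:
    """Extract error.errors[].reason values from an API error body; [] if absent."""
    try:
        data = json.loads(content.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return []
    if not isinstance(data, dict):
        return []
    err = data.get("error")
    # OAuth endpoints return {"error": "invalid_grant"}, a string rather than an object
    if not isinstance(err, dict):
        return []
    return [e.get("reason", "") for e in err.get("errors") or [] if isinstance(e, dict)]
```

In `upload_video`, the check would then read `if "quotaExceeded" in http_error_reasons(e.content):`. This avoids matching the word by accident inside an unrelated message.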
- [ ] **Step 5: Run tests**
Run: `python -m pytest tests/test_youtube_upload.py -v`
Expected: 3 PASS
- [ ] **Step 6: Commit**
```bash
git add music-lab/app/pipeline/youtube.py music-lab/tests/test_youtube_upload.py \
music-lab/requirements.txt
git commit -m "feat(music-lab): YouTube OAuth + resumable upload"
```
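The resumable loop in `upload_video` re-raises any `HttpError` other than a quota error, so a single 5xx on one chunk aborts the whole upload. Google's Python upload sample for the YouTube Data API retries 500/502/503/504 responses with randomized exponential backoff. The sketch below shows that pattern with the HTTP error abstracted away. `TransientError` stands in for an `HttpError` whose `e.resp.status` is one of those codes, and `upload_with_retry` is a hypothetical helper:

```python
import random
import time
from typing import Any, Callable, Optional


class TransientError(Exception):
    """Stand-in for an HttpError whose e.resp.status is 500, 502, 503 or 504."""


def upload_with_retry(next_chunk: Callable[[], tuple[Any, Optional[dict]]],
                      max_retries: int = 5,
                      sleep: Callable[[float], None] = time.sleep) -> dict:
    """Drive a resumable upload to completion, retrying transient chunk failures."""
    response = None
    retries = 0
    while response is None:
        try:
            _status, response = next_chunk()
            retries = 0  # a chunk went through, so reset the backoff
        except TransientError:
            retries += 1
            if retries > max_retries:
                raise
            # Randomized exponential backoff: wait up to 2**retries seconds
            sleep(random.random() * (2 ** retries))
    return response
```

In `upload_video`, this would wrap `req.next_chunk`, translating `HttpError` with a retriable status into a retry. Quota errors (403) should still fail fast.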
---
## Task 8: Orchestrator + 13 endpoints
**Files:**
- Create: `music-lab/app/pipeline/orchestrator.py`
- Modify: `music-lab/app/main.py`
- Test: `music-lab/tests/test_pipeline_endpoints.py`
This task has the most code: 13 endpoints plus the orchestrator's per-step BackgroundTask dispatch.
- [ ] **Step 1: Write failing test**
```python
# tests/test_pipeline_endpoints.py
import pytest
from fastapi.testclient import TestClient

from app.main import app
from app import db


@pytest.fixture
def client(monkeypatch, tmp_path):
    monkeypatch.setattr(db, "DB_PATH", str(tmp_path / "music.db"))
    db.init_db()
    # Seed one library track
    from app.db import save_track
    save_track({"id": 1, "title": "T", "genre": "lo-fi", "bpm": 85, "key": "C", "scale": "minor",
                "moods": [], "instruments": [], "audio_url": "/x.mp3", "duration_sec": 120})
    return TestClient(app)


def test_create_pipeline(client):
    r = client.post("/api/music/pipeline", json={"track_id": 1})
    assert r.status_code == 201
    assert r.json()["state"] == "created"


def test_create_duplicate_pipeline_returns_409(client):
    client.post("/api/music/pipeline", json={"track_id": 1})
    r = client.post("/api/music/pipeline", json={"track_id": 1})
    assert r.status_code == 409


def test_get_pipeline_returns_jobs_and_feedback(client):
    pid = client.post("/api/music/pipeline", json={"track_id": 1}).json()["id"]
    r = client.get(f"/api/music/pipeline/{pid}")
    assert "jobs" in r.json()
    assert "feedback" in r.json()


def test_list_pipelines_active_filter(client):
    pid = client.post("/api/music/pipeline", json={"track_id": 1}).json()["id"]
    db.update_pipeline_state(pid, "published")
    r = client.get("/api/music/pipeline?status=active")
    assert all(p["state"] != "published" for p in r.json()["pipelines"])


def test_feedback_approve_advances_state(client):
    pid = client.post("/api/music/pipeline", json={"track_id": 1}).json()["id"]
    db.update_pipeline_state(pid, "cover_pending", cover_url="/m/x.jpg")
    r = client.post(f"/api/music/pipeline/{pid}/feedback",
                    json={"step": "cover", "intent": "approve"})
    assert r.status_code == 202
    after = db.get_pipeline(pid)
    # Video generation runs as a BackgroundTask, so with mocks its result may not be visible yet;
    # at minimum the pipeline must be at cover_approved or already in the video step
    assert after["state"] in ("cover_approved", "video_pending", "video_running")


def test_feedback_reject_records_feedback_and_increments_count(client):
    pid = client.post("/api/music/pipeline", json={"track_id": 1}).json()["id"]
    db.update_pipeline_state(pid, "cover_pending")
    r = client.post(f"/api/music/pipeline/{pid}/feedback",
                    json={"step": "cover", "intent": "reject", "feedback_text": "Make it darker"})
    assert r.status_code == 202
    p = db.get_pipeline(pid)
    assert p["feedback_count_per_step"]["cover"] == 1
    history = db.get_feedback_history(pid)
    assert history[0]["feedback_text"] == "Make it darker"


def test_feedback_after_5_rejects_marks_awaiting_manual(client):
    pid = client.post("/api/music/pipeline", json={"track_id": 1}).json()["id"]
    db.update_pipeline_state(pid, "cover_pending")
    for i in range(5):
        client.post(f"/api/music/pipeline/{pid}/feedback",
                    json={"step": "cover", "intent": "reject", "feedback_text": f"again {i}"})
    r = client.post(f"/api/music/pipeline/{pid}/feedback",
                    json={"step": "cover", "intent": "reject", "feedback_text": "6th"})
    assert r.status_code == 409
    assert db.get_pipeline(pid)["state"] == "awaiting_manual"


def test_cancel_pipeline(client):
    pid = client.post("/api/music/pipeline", json={"track_id": 1}).json()["id"]
    r = client.post(f"/api/music/pipeline/{pid}/cancel")
    assert r.status_code == 200
    assert db.get_pipeline(pid)["state"] == "cancelled"
```
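With the real `orchestrator.run_step` wired in, every `reject` in these tests also runs cover generation. A failed generation moves the pipeline to `failed`, and the endpoint then skips any further feedback for that step. One way to keep the endpoint tests focused on the HTTP layer is to replace `run_step` with a recorder. In the sketch below, `make_run_step_recorder` is a hypothetical helper:

```python
import asyncio


def make_run_step_recorder():
    """Return an async stand-in for orchestrator.run_step plus the list of recorded calls."""
    calls: list[tuple[int, str, str]] = []

    async def fake_run_step(pipeline_id: int, step: str, feedback: str = "") -> None:
        # Record the call only: no generation and no DB state change
        calls.append((pipeline_id, step, feedback))

    return fake_run_step, calls


# In the client fixture. This works because main.py looks up orchestrator.run_step
# at request time via bg.add_task(orchestrator.run_step, ...):
#     fake, calls = make_run_step_recorder()
#     monkeypatch.setattr(orchestrator, "run_step", fake)

if __name__ == "__main__":
    fake, calls = make_run_step_recorder()
    asyncio.run(fake(1, "cover", "Make it darker"))
    print(calls)  # [(1, 'cover', 'Make it darker')]
```

A plain `async def` is used instead of a class with an async `__call__` because older Starlette versions only detect coroutine functions when deciding whether to await a background task.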
- [ ] **Step 2: Run, verify fail**
Run: `python -m pytest tests/test_pipeline_endpoints.py -v`
Expected: failures with 404 (or a collection error)
- [ ] **Step 3: Implement `orchestrator.py`**
```python
"""Pipeline orchestrator: registers a BackgroundTask per step and writes each step's outputs to the DB."""
import asyncio
import json
import logging
import os

from app import db
from . import cover, video, thumb, metadata, review, youtube

logger = logging.getLogger("music-lab.orchestrator")
REVIEW_WEIGHTS_DEFAULT = {"meta": 25, "policy": 30, "viewer": 25, "trend": 20}


async def run_step(pipeline_id: int, step: str, feedback: str = "") -> None:
    """Run one step, write its result to the DB, and advance the pipeline state.

    The job row is marked running while the step executes. On success the pipeline
    moves to the state the step returns (usually a *_pending user gate); on failure
    it moves to failed with a reason.
    """
    job_id = db.create_pipeline_job(pipeline_id, step)
    db.update_pipeline_job(job_id, status="running")
    try:
        # Loaded inside the try so that a missing track also marks the job and pipeline failed
        p = db.get_pipeline(pipeline_id)
        track = _get_track(p["track_id"])
        if step == "cover":
            result = await _run_cover(p, track, feedback)
        elif step == "video":
            result = await _run_video(p, track)
        elif step == "thumb":
            result = await _run_thumb(p, track, feedback)
        elif step == "meta":
            result = await _run_meta(p, track, feedback)
        elif step == "review":
            result = await _run_review(p, track)
        elif step == "publish":
            result = await _run_publish(p, track)
        else:
            raise ValueError(f"unknown step: {step}")
        db.update_pipeline_job(job_id, status="succeeded")
        db.update_pipeline_state(pipeline_id, result["next_state"], **result.get("fields", {}))
    except Exception as e:
        logger.exception("step %s failed for pipeline %s", step, pipeline_id)
        db.update_pipeline_job(job_id, status="failed", error=str(e))
        db.update_pipeline_state(pipeline_id, "failed", failed_reason=f"{step}: {e}")


def _get_track(track_id: int) -> dict:
    # The tracks table belongs to the existing music-lab schema; assumed here to come back as a dict
    t = db.get_track(track_id)
    if not t:
        raise ValueError(f"track {track_id} not found")
    return t


async def _run_cover(p, track, feedback):
    setup = db.get_youtube_setup()
    prompts = setup["cover_prompts"]
    template = prompts.get(track["genre"].lower(), prompts.get("default", ""))
    out = await cover.generate(
        pipeline_id=p["id"], genre=track["genre"], prompt_template=template,
        mood=", ".join(track.get("moods", [])), track_title=track["title"], feedback=feedback,
    )
    return {"next_state": "cover_pending", "fields": {"cover_url": out["url"]}}


async def _run_video(p, track):
    setup = db.get_youtube_setup()
    vd = setup["visual_defaults"]
    audio_path = _local_path(track["audio_url"])  # /media/... → /app/data/...
    cover_path = _local_path(p["cover_url"])
    # ffmpeg runs synchronously; use a worker thread so the event loop stays responsive
    out = await asyncio.to_thread(
        video.generate,
        pipeline_id=p["id"], audio_path=audio_path, cover_path=cover_path,
        genre=track["genre"], duration_sec=track.get("duration_sec", 120),
        resolution=vd["resolution"], style=vd["style"],
    )
    return {"next_state": "video_pending", "fields": {"video_url": out["url"]}}


async def _run_thumb(p, track, feedback):
    # feedback is not used yet: the thumbnail is always re-extracted from the video
    video_path = _local_path(p["video_url"])
    out = await asyncio.to_thread(
        thumb.generate, pipeline_id=p["id"], video_path=video_path,
        track_title=track["title"], overlay_text=True,
    )
    return {"next_state": "thumb_pending", "fields": {"thumbnail_url": out["url"]}}


async def _run_meta(p, track, feedback):
    setup = db.get_youtube_setup()
    trend_top = _get_trend_top()
    out = await metadata.generate(
        track=track, template=setup["metadata_template"],
        trend_keywords=trend_top, feedback=feedback,
    )
    return {"next_state": "meta_pending",
            "fields": {"metadata_json": json.dumps(out, ensure_ascii=False)}}


async def _run_review(p, track):
    setup = db.get_youtube_setup()
    meta = json.loads(p["metadata_json"])
    result = await review.run_4_axis(
        pipeline=p, track=track,
        video_meta={"length_sec": track.get("duration_sec", 120),
                    "resolution": setup["visual_defaults"]["resolution"]},
        metadata=meta, thumbnail_url=p["thumbnail_url"],
        trend_top=_get_trend_top(),
        weights=setup["review_weights"], threshold=setup["review_threshold"],
    )
    return {"next_state": "publish_pending",
            "fields": {"review_json": json.dumps(result, ensure_ascii=False)}}


async def _run_publish(p, track):
    setup = db.get_youtube_setup()
    meta = json.loads(p["metadata_json"])
    privacy = setup["publish_policy"].get("privacy", "private")
    # The googleapiclient upload is blocking; run it off the event loop
    result = await asyncio.to_thread(
        youtube.upload_video,
        video_path=_local_path(p["video_url"]),
        thumbnail_path=_local_path(p["thumbnail_url"]),
        metadata=meta, privacy=privacy,
    )
    return {"next_state": "published",
            "fields": {"youtube_video_id": result["video_id"]}}


def _local_path(media_url: str) -> str:
    """Map a media URL to its on-disk path, e.g.
    /media/videos/123/cover.jpg → /app/data/videos/123/cover.jpg
    """
    base_media = os.getenv("VIDEO_MEDIA_BASE", "/media/videos")
    base_data = os.getenv("VIDEO_DATA_DIR", "/app/data/videos")
    if media_url.startswith(base_media):
        return media_url.replace(base_media, base_data, 1)
    # /media/music/abc.mp3 → /app/data/music/abc.mp3
    return media_url.replace("/media/", "/app/data/", 1)


def _get_trend_top(n: int = 10) -> list[str]:
    try:
        rows = db.get_market_trends(limit=n)  # existing market_trends table
        return [r["genre"] for r in rows]
    except Exception:
        return []
```
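`_local_path` only rewrites prefixes with `str.replace`. A stored URL containing `..` segments can therefore resolve outside the data directory, and an unexpected value (for example an `http://` link) is returned unchanged instead of being rejected. The stricter sketch below assumes that the two prefixes it lists are the only valid media roots; they mirror the `VIDEO_MEDIA_BASE`/`VIDEO_DATA_DIR` defaults. `local_path_strict` is hypothetical:

```python
import posixpath

# (media URL prefix, on-disk prefix); the more specific prefix must come first
PREFIX_MAP = (
    ("/media/videos/", "/app/data/videos/"),
    ("/media/", "/app/data/"),
)


def local_path_strict(media_url: str, prefix_map=PREFIX_MAP) -> str:
    """Map a media URL to a path under the data directory, or raise ValueError."""
    # normpath collapses "." and ".." first, so "/media/../etc" becomes "/etc" and is rejected
    norm = posixpath.normpath(media_url)
    for media_prefix, data_prefix in prefix_map:
        if norm.startswith(media_prefix):
            return data_prefix + norm[len(media_prefix):]
    raise ValueError(f"not a media URL: {media_url!r}")
```

Raising here turns a bad stored URL into a clear `failed_reason` from `run_step`, rather than an obscure ffmpeg or upload error later.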
- [ ] **Step 4: Add endpoints to `main.py`**
Add the following to `app/main.py` (below the existing endpoints):
```python
from fastapi import BackgroundTasks
from pydantic import BaseModel
from app.pipeline import orchestrator, youtube as yt_module
from app.pipeline.state_machine import (
next_state_on_approve, next_state_on_reject, USER_GATES,
)
class PipelineCreate(BaseModel):
track_id: int
class FeedbackRequest(BaseModel):
step: str
intent: str # approve | reject
feedback_text: str | None = None
@app.post("/api/music/pipeline", status_code=201)
def create_pipeline(req: PipelineCreate):
# 동일 트랙 활성 파이프라인 중복 방지
actives = db.list_pipelines(active_only=True)
if any(p["track_id"] == req.track_id for p in actives):
raise HTTPException(409, "이미 진행 중인 파이프라인이 있습니다")
pid = db.create_pipeline(req.track_id)
return db.get_pipeline(pid)
@app.get("/api/music/pipeline")
def list_pipelines_endpoint(status: str = "all"):
pipelines = db.list_pipelines(active_only=(status == "active"))
return {"pipelines": pipelines}
@app.get("/api/music/pipeline/{pid}")
def get_pipeline_endpoint(pid: int):
p = db.get_pipeline(pid)
if not p:
raise HTTPException(404)
p["jobs"] = db.list_pipeline_jobs(pid)
p["feedback"] = db.get_feedback_history(pid)
return p
@app.post("/api/music/pipeline/{pid}/start", status_code=202)
async def start_pipeline(pid: int, bg: BackgroundTasks):
p = db.get_pipeline(pid)
if not p:
raise HTTPException(404)
if p["state"] != "created":
raise HTTPException(409, f"이미 시작됨 ({p['state']})")
bg.add_task(orchestrator.run_step, pid, "cover")
return {"ok": True}

@app.post("/api/music/pipeline/{pid}/feedback", status_code=202)
async def feedback(pid: int, req: FeedbackRequest, bg: BackgroundTasks):
    p = db.get_pipeline(pid)
    if not p:
        raise HTTPException(404)
    if p["state"] == "awaiting_manual":
        raise HTTPException(409, "수동 개입 대기 중")
    state = p["state"]
    expected = f"{req.step}_pending"
    if state != expected:
        # Idempotent: a late or duplicate reply for a step that has already moved on is ignored
        return {"ok": True, "skipped": True}
    if req.intent == "approve":
        next_st = next_state_on_approve(state)
        db.update_pipeline_state(pid, next_st)
        # If the next state is *_pending, start generating it automatically;
        # ai_review and publishing are triggered automatically as well
        next_step = _state_to_step(next_st)
        if next_step:
            bg.add_task(orchestrator.run_step, pid, next_step)
        return {"ok": True}
    elif req.intent == "reject":
        count = db.increment_feedback_count(pid, req.step)
        if count > 5:  # the 6th rejection of the same step hands over to a human
            db.update_pipeline_state(pid, "awaiting_manual")
            raise HTTPException(409, "재생성 한도 초과")
        if req.feedback_text:
            db.record_feedback(pid, req.step, req.feedback_text)
        bg.add_task(orchestrator.run_step, pid, req.step, req.feedback_text or "")
        return {"ok": True}
    else:
        raise HTTPException(400, f"unknown intent: {req.intent}")


def _state_to_step(state: str) -> str | None:
    """Map the state entered on approval to the step that should run automatically."""
    return {
        "video_pending": "video",
        "thumb_pending": "thumb",
        "meta_pending": "meta",
        "ai_review": "review",
        "publish_pending": None,  # requires an explicit publish call from the user
        "publishing": "publish",
    }.get(state)

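
@app.post("/api/music/pipeline/{pid}/cancel")
def cancel_pipeline(pid: int):
    p = db.get_pipeline(pid)
    if not p:
        raise HTTPException(404)
    # Terminal pipelines cannot be cancelled (the UI also hides the button for them)
    if p["state"] in ("published", "cancelled"):
        raise HTTPException(409, f"already finished ({p['state']})")
    db.update_pipeline_state(pid, "cancelled")
    return {"ok": True}
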

@app.post("/api/music/pipeline/{pid}/publish", status_code=202)
async def publish_pipeline(pid: int, bg: BackgroundTasks):
    p = db.get_pipeline(pid)
    if not p:
        raise HTTPException(404)
    if p["state"] != "publish_pending":
        raise HTTPException(409, f"발행 단계 아님 ({p['state']})")
    db.update_pipeline_state(pid, "publishing")
    bg.add_task(orchestrator.run_step, pid, "publish")
    return {"ok": True}


# --- Setup ---
class SetupRequest(BaseModel):
    metadata_template: dict | None = None
    cover_prompts: dict | None = None
    review_weights: dict | None = None
    review_threshold: int | None = None
    visual_defaults: dict | None = None
    publish_policy: dict | None = None


@app.get("/api/music/setup")
def get_setup():
    return db.get_youtube_setup()


@app.put("/api/music/setup")
def put_setup(req: SetupRequest):
    # Partial update: only non-null fields are written
    db.update_youtube_setup(**{k: v for k, v in req.dict().items() if v is not None})
    return db.get_youtube_setup()


# --- YouTube OAuth ---
@app.get("/api/music/youtube/auth-url")
def youtube_auth_url():
    return {"url": yt_module.get_auth_url()}


@app.get("/api/music/youtube/callback")
async def youtube_callback(code: str):
    info = await yt_module.exchange_code(code)
    return info


@app.post("/api/music/youtube/disconnect")
def youtube_disconnect():
    yt_module.disconnect()
    return {"ok": True}


@app.get("/api/music/youtube/status")
def youtube_status():
    return yt_module.get_status() or {"connected": False}
```
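The `feedback` endpoint above mixes HTTP handling with a small decision table. To reason about it, or to unit-test it without FastAPI, the same rules can be written as a pure function. This is a sketch only: `APPROVE_NEXT` is a hypothetical stand-in for `next_state_on_approve` from Task 2, and `decide`/`Decision` are illustrative names that are not part of the plan.

```python
from dataclasses import dataclass
from typing import Optional

# Hypothetical approve transitions standing in for next_state_on_approve (Task 2).
APPROVE_NEXT = {
    "cover_pending": "video_pending",
    "video_pending": "thumb_pending",
    "thumb_pending": "meta_pending",
    "meta_pending": "ai_review",
    "publish_pending": "publishing",
}
MAX_REJECTS = 5  # same limit as `count > 5` in the endpoint


@dataclass(frozen=True)
class Decision:
    action: str                        # "blocked" | "skip" | "advance" | "regenerate" | "manual"
    next_state: Optional[str] = None


def decide(state: str, step: str, intent: str, reject_count: int) -> Decision:
    """Mirror of the /feedback rules; reject_count is the value *after* incrementing."""
    if state == "awaiting_manual":
        return Decision("blocked")             # the endpoint answers 409
    if state != f"{step}_pending":
        return Decision("skip")                # late or duplicate reply: idempotent no-op
    if intent == "approve":
        return Decision("advance", APPROVE_NEXT[state])
    if intent == "reject":
        if reject_count > MAX_REJECTS:
            return Decision("manual", "awaiting_manual")
        return Decision("regenerate", state)   # same step runs again with the feedback text
    raise ValueError(f"unknown intent: {intent}")  # the endpoint answers 400
```

With the rules in one pure function, the idempotency and retry-limit cases are cheap to test. The endpoint would then only translate each `action` into a DB update, a background task, or an HTTP status.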
- [ ] **Step 5: Run tests**
Run: `python -m pytest tests/test_pipeline_endpoints.py -v`
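Expected: 8 PASS (TestClient runs BackgroundTasks synchronously before returning the response, so the tests either mock the orchestrator or verify only the first step)
> Note: if a background task reaches the real `cover.generate`, it produces the gradient fallback because `OPENAI_API_KEY` is unset. To skip generation entirely, patch it out with `monkeypatch.setattr(orchestrator, "run_step", AsyncMock())`. The tests above mostly verify state, so either approach works.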
- [ ] **Step 6: Commit**
```bash
git add music-lab/app/pipeline/orchestrator.py music-lab/app/main.py \
music-lab/tests/test_pipeline_endpoints.py
git commit -m "feat(music-lab): pipeline 오케스트레이터 + 13개 엔드포인트"
```
---
## Task 9: agent-office natural-language intent classification
**Files:**
- Create: `agent-office/app/agents/classify_intent.py`
- Test: `agent-office/tests/test_classify_intent.py`
- [ ] **Step 1: Write failing test**
```python
# tests/test_classify_intent.py
import pytest
import respx
from httpx import Response

from app.agents import classify_intent as ci


def test_clear_approve_no_llm(monkeypatch):
    called = {"n": 0}

    def fake_llm(text):
        called["n"] += 1
        return ("unclear", None)

    monkeypatch.setattr(ci, "_llm_classify", fake_llm)
    assert ci.classify("승인") == ("approve", None)
    assert ci.classify("OK") == ("approve", None)
    assert ci.classify("진행") == ("approve", None)
    assert ci.classify("agree") == ("approve", None)
    assert called["n"] == 0  # whitelist hits never reach the LLM


def test_clear_reject_only_no_llm(monkeypatch):
    monkeypatch.setattr(ci, "_llm_classify", lambda t: ("unclear", None))
    assert ci.classify("반려") == ("reject", None)
    assert ci.classify("거절") == ("reject", None)


def test_reject_with_text_split(monkeypatch):
    monkeypatch.setattr(ci, "_llm_classify", lambda t: ("unclear", None))
    intent, fb = ci.classify("반려, 제목 짧게")
    assert intent == "reject"
    assert "제목 짧게" in fb


@respx.mock
def test_ambiguous_calls_llm(monkeypatch):
    monkeypatch.setenv("ANTHROPIC_API_KEY", "k")
    respx.post("https://api.anthropic.com/v1/messages").mock(
        return_value=Response(200, json={"content": [{
            "type": "text",
            "text": '{"intent":"reject","feedback":"좀 더 화려하게"}',
        }]})
    )
    intent, fb = ci.classify("음... 좀 더 화려한 분위기가 좋겠어")
    assert intent == "reject"
    assert "화려하게" in fb
```
- [ ] **Step 2: Run, verify fail**
Run: `cd agent-office && python -m pytest tests/test_classify_intent.py -v`
Expected: ImportError
- [ ] **Step 3: Implement `classify_intent.py`**
```python
"""Classify free-text Telegram replies: whitelist first, LLM only when ambiguous."""
import os
import json
import logging

import httpx

logger = logging.getLogger("agent-office.classify_intent")

CLAUDE_HAIKU = os.getenv("CLAUDE_HAIKU_MODEL", "claude-haiku-4-5-20251001")

APPROVE_WORDS = {
    "승인", "시작", "진행", "ok", "okay", "agree",
    "", "", "좋아", "좋아요", "go", "yes", "y",
}
REJECT_WORDS = {"반려", "거절", "취소", "no", "nope", "n"}
# A reject word used as a prefix only counts when followed by one of these characters,
# so "nice thumbnail" is not parsed as "n" + "ice thumbnail".
SEPARATORS = " ,.-:"


def classify(text: str) -> tuple[str, str | None]:
    """Return (intent, feedback), where intent is one of approve, reject, unclear."""
    original = (text or "").strip()
    t = original.lower()
    if not t:  # empty or whitespace-only reply
        return ("unclear", None)
    if t in APPROVE_WORDS:
        return ("approve", None)
    if t in REJECT_WORDS:
        return ("reject", None)
    # Reject word followed by extra text: the rest becomes the feedback.
    # t != w here (exact matches returned above), so t[len(w)] is always valid.
    for w in REJECT_WORDS:
        if t.startswith(w) and t[len(w)] in SEPARATORS:
            rest = original[len(w):].lstrip(SEPARATORS).strip()
            if rest:
                return ("reject", rest)
    # Approve word followed by extra text: the extra text is ignored
    for w in APPROVE_WORDS:
        if w and t.startswith(w + " "):
            return ("approve", None)
    return _llm_classify(original)


def _llm_classify(text: str) -> tuple[str, str | None]:
    # Read the key at call time so tests (and late-loaded env) can set it
    api_key = os.getenv("ANTHROPIC_API_KEY", "")
    if not api_key:
        return ("unclear", None)
    prompt = (
        "사용자 응답을 분류하세요. JSON으로만 응답.\n"
        f'응답: "{text}"\n\n'
        '출력: {"intent":"approve|reject|unclear","feedback":"반려면 수정 방향, 아니면 빈 문자열"}'
    )
    try:
        resp = httpx.post(
            "https://api.anthropic.com/v1/messages",
            headers={"x-api-key": api_key, "anthropic-version": "2023-06-01"},
            json={"model": CLAUDE_HAIKU, "max_tokens": 200,
                  "messages": [{"role": "user", "content": prompt}]},
            timeout=15,
        )
        resp.raise_for_status()
        text_out = resp.json()["content"][0]["text"]
        # Tolerate prose around the JSON object: slice from the first "{" to the last "}"
        data = json.loads(text_out[text_out.find("{"):text_out.rfind("}") + 1])
        intent = data.get("intent", "unclear")
        if intent not in ("approve", "reject", "unclear"):
            intent = "unclear"
        return (intent, data.get("feedback") or None)
    except Exception as e:
        logger.warning("LLM 분류 실패: %s", e)
        return ("unclear", None)
```
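Because `REJECT_WORDS` contains short English tokens (`n`, `no`), prefix matching needs a word boundary. Without one, a reply such as "nice thumbnail" would be read as a rejection with the feedback "ice thumbnail". Below is a minimal standalone sketch of separator-bounded matching. `split_reject` and `SEPARATORS` are illustrative names, and the separator set copies the `lstrip` characters used in `classify`.

```python
from typing import Optional

REJECT_WORDS = {"반려", "거절", "취소", "no", "nope", "n"}
SEPARATORS = " ,.-:"


def split_reject(text: str) -> Optional[str]:
    """Return the feedback that follows a leading reject word, or None.

    The word only counts when the next character is a separator, so
    "nice cover" is not read as "n" + "ice cover".
    """
    original = text.strip()
    t = original.lower()
    for w in REJECT_WORDS:
        if len(t) > len(w) and t.startswith(w) and t[len(w)] in SEPARATORS:
            rest = original[len(w):].lstrip(SEPARATORS).strip()
            if rest:
                return rest
    return None
```

At most one word can match a given reply: "nope, ..." matches `nope` because `no` and `n` are followed by letters, not separators. That also makes the result independent of set iteration order.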
- [ ] **Step 4: Run tests**
Run: `python -m pytest tests/test_classify_intent.py -v`
Expected: 4 PASS
- [ ] **Step 5: Commit**
```bash
git add agent-office/app/agents/classify_intent.py agent-office/tests/test_classify_intent.py
git commit -m "feat(agent-office): 텔레그램 자연어 의도 분류"
```
---
## Task 10: youtube_publisher agent + polling job
**Files:**
- Create: `agent-office/app/agents/youtube_publisher.py`
- Modify: `agent-office/app/agents/__init__.py`
- Modify: `agent-office/app/scheduler.py`
- Modify: `agent-office/app/service_proxy.py`
- Modify: `agent-office/app/telegram/conversational.py`
- Test: `agent-office/tests/test_pipeline_polling.py`
- [ ] **Step 1: Add service_proxy helpers**
Add to `service_proxy.py`:
```python
async def list_active_pipelines() -> list[dict]:
    async with httpx.AsyncClient(timeout=15) as client:
        resp = await client.get(f"{MUSIC_LAB_URL}/api/music/pipeline?status=active")
        resp.raise_for_status()
        return resp.json()["pipelines"]


async def get_pipeline(pid: int) -> dict:
    async with httpx.AsyncClient(timeout=15) as client:
        resp = await client.get(f"{MUSIC_LAB_URL}/api/music/pipeline/{pid}")
        resp.raise_for_status()
        return resp.json()


async def post_pipeline_feedback(pid: int, step: str, intent: str,
                                 feedback_text: str | None = None) -> dict:
    async with httpx.AsyncClient(timeout=15) as client:
        resp = await client.post(
            f"{MUSIC_LAB_URL}/api/music/pipeline/{pid}/feedback",
            json={"step": step, "intent": intent, "feedback_text": feedback_text},
        )
        resp.raise_for_status()
        return resp.json()
```
- [ ] **Step 2: Implement youtube_publisher**
```python
# agents/youtube_publisher.py
"""Orchestrate the step-by-step approval flow over a single Telegram channel."""
import asyncio
import logging

from .base import BaseAgent
from . import classify_intent
from .. import service_proxy
from ..db import add_log
from ..telegram.messaging import send_raw

logger = logging.getLogger("agent-office.youtube_publisher")

# pipeline state -> (label shown in Telegram, step name used by the feedback API)
_STEP_TITLES = {
    "cover_pending": ("커버 아트", "cover"),
    "video_pending": ("영상 비주얼", "video"),
    "thumb_pending": ("썸네일", "thumb"),
    "meta_pending": ("메타데이터", "meta"),
    "publish_pending": ("최종 검토 + 발행", "publish"),
}


class YoutubePublisherAgent(BaseAgent):
    agent_id = "youtube_publisher"
    display_name = "YouTube 퍼블리셔"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # pipeline id -> last pending state that was announced successfully
        self._notified_state_per_pipeline: dict[int, str] = {}

    async def poll_state_changes(self) -> None:
        """Called every 30 s; sends a Telegram message when a pipeline enters a new *_pending state."""
        try:
            pipelines = await service_proxy.list_active_pipelines()
        except Exception as e:
            logger.warning("폴링 실패: %s", e)
            return
        for p in pipelines:
            state = p["state"]
            pid = p["id"]
            if state in _STEP_TITLES and self._notified_state_per_pipeline.get(pid) != state:
                # Remember the state only after a successful send, so failures retry next poll
                if await self._notify_step(p):
                    self._notified_state_per_pipeline[pid] = state

    async def _notify_step(self, pipeline: dict) -> bool:
        state = pipeline["state"]
        title_name, step = _STEP_TITLES[state]
        body = self._format_body(pipeline, step)
        sent = await send_raw(
            text=f"🎵 [{pipeline.get('track_title', 'Pipeline')}] {title_name} 검토\n\n{body}\n\n"
                 f"➡️ 답장으로 알려주세요: '승인' 또는 '반려 + 수정 방향'",
            metadata={"pipeline_id": pipeline["id"], "step": step},
        )
        if not sent.get("ok"):
            return False
        add_log(self.agent_id, f"pipeline {pipeline['id']} {step} 알림 전송", "info")
        # Store the Telegram message id in music-lab so a reply can be routed back via lookup-by-msg
        if sent.get("message_id"):
            try:
                await service_proxy.save_pipeline_telegram_msg(
                    pipeline["id"], step, sent["message_id"])
            except Exception as e:
                logger.warning("failed to save telegram message id: %s", e)
        return True

    def _format_body(self, p: dict, step: str) -> str:
        if step == "cover":
            return f"🖼️ 커버: {p.get('cover_url', '-')}"
        if step == "video":
            return f"🎬 영상: {p.get('video_url', '-')}"
        if step == "thumb":
            return f"🎴 썸네일: {p.get('thumbnail_url', '-')}"
        if step == "meta":
            m = p.get("metadata") or {}
            return (f"📝 제목: {m.get('title', '')}\n"
                    f"🏷️ 태그: {', '.join((m.get('tags') or [])[:8])}\n"
                    f"📄 설명(앞부분): {(m.get('description', '') or '')[:200]}")
        if step == "publish":
            r = p.get("review") or {}
            return (f"AI 검토 결과: {r.get('verdict', '?')} "
                    f"(가중 {r.get('weighted_total', '?')}/100)\n"
                    f"{r.get('summary', '')}")
        return ""

    async def on_telegram_reply(self, pipeline_id: int, step: str, user_text: str) -> None:
        # classify() may make a blocking HTTP call to the LLM; keep it off the event loop
        intent, feedback = await asyncio.to_thread(classify_intent.classify, user_text)
        if intent == "unclear":
            await send_raw("다시 입력해주세요. 예: '승인' 또는 '반려, 제목 짧게'")
            return
        try:
            await service_proxy.post_pipeline_feedback(pipeline_id, step, intent, feedback)
        except Exception as e:
            await send_raw(f"⚠️ 처리 실패: {e}")

    async def on_schedule(self) -> None:
        # Polling is invoked by the scheduler job
        await self.poll_state_changes()

    async def on_command(self, command: str, params: dict) -> dict:
        return {"ok": False, "message": f"Unknown command: {command}"}

    async def on_approval(self, task_id: str, approved: bool, feedback: str = "") -> None:
        pass
```
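The notify-once rule in `poll_state_changes` can be modeled synchronously to check its edge cases. The hypothetical `StateChangeNotifier` below is not part of the plan. It is one possible variant: it records a state only after a successful send, forgets pipelines that drop out of the active list, and re-arms a pipeline when it leaves its pending state. That way a regenerated step is announced again, but only if some poll observes the intermediate non-pending state (the name `cover_generating` used in the checks is an assumption).

```python
from typing import Callable


class StateChangeNotifier:
    """Announce each pipeline once per pending state (synchronous model of the poller)."""

    def __init__(self, pending_states: set, send: Callable[[dict], bool]):
        self.pending_states = pending_states
        self.send = send                      # returns True when the message went out
        self._last: dict = {}                 # pipeline id -> last announced state

    def poll(self, pipelines: list) -> list:
        """Process one poll result and return the ids that were announced."""
        announced = []
        for p in pipelines:
            pid, state = p["id"], p["state"]
            if state not in self.pending_states:
                self._last.pop(pid, None)     # leaving a pending state re-arms the pipeline
                continue
            if self._last.get(pid) == state:
                continue                      # this state was already announced
            if self.send(p):                  # remember only successful sends
                self._last[pid] = state
                announced.append(pid)
        active = {p["id"] for p in pipelines}
        for pid in list(self._last):          # drop pipelines that are no longer active
            if pid not in active:
                del self._last[pid]
        return announced
```

Pruning inactive ids also keeps the dictionary from growing without bound over the agent's lifetime.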
- [ ] **Step 3: Register agent + scheduler**
Register it in `agents/__init__.py`:
```python
from .youtube_publisher import YoutubePublisherAgent

AGENT_REGISTRY = {
    # ... existing agents ...
    "youtube_publisher": YoutubePublisherAgent(...),  # same constructor args as the other agents
}
```
Add to `scheduler.py`:
```python
from .agents import AGENT_REGISTRY  # skip if scheduler.py already imports it


async def _poll_pipelines():
    agent = AGENT_REGISTRY.get("youtube_publisher")
    if agent:
        await agent.poll_state_changes()


def init_scheduler():
    # ... existing jobs ...
    scheduler.add_job(_poll_pipelines, "interval", seconds=30, id="pipeline_poll")
    scheduler.start()
```
- [ ] **Step 4: Telegram reply routing**
In `telegram/conversational.py`, handle incoming reply messages:
```python
# Add next to the existing handlers
import asyncio


async def handle_reply(message: dict) -> None:
    reply_to = message.get("reply_to_message") or {}
    msg_id = reply_to.get("message_id")
    if not msg_id:
        return
    from ..agents import AGENT_REGISTRY

    # Resolve (pipeline_id, step) for the replied-to bot message. music-lab keeps the
    # ids in video_pipelines.last_telegram_msg_ids (see the note below). The lookup is
    # blocking HTTP, so run it in a worker thread instead of on the event loop.
    link = await asyncio.to_thread(_lookup_message_link, msg_id)
    if not link:
        return
    agent = AGENT_REGISTRY.get("youtube_publisher")
    if agent is None:
        return
    await agent.on_telegram_reply(link["pipeline_id"], link["step"], message.get("text", ""))
```
> **Note**: `_lookup_message_link` can be backed either by a separate table (`telegram_message_links`) or by the `video_pipelines.last_telegram_msg_ids` JSON column. This task uses the latter, which reuses an existing column and needs no extra migration.

To store the `message_id` returned by `messaging.send_raw` in music-lab, add a helper to service_proxy and call it at the end of `_notify_step` in youtube_publisher.
```python
# service_proxy.py
async def save_pipeline_telegram_msg(pid: int, step: str, msg_id: int) -> None:
    async with httpx.AsyncClient(timeout=10) as client:
        resp = await client.patch(
            f"{MUSIC_LAB_URL}/api/music/pipeline/{pid}/telegram-msg",
            json={"step": step, "message_id": msg_id},
        )
        resp.raise_for_status()
```
Add the endpoint to music-lab `main.py`:
```python
class TelegramMsgPatch(BaseModel):
    step: str
    message_id: int


@app.patch("/api/music/pipeline/{pid}/telegram-msg")
def save_telegram_msg(pid: int, req: TelegramMsgPatch):
    # Requires `import json` and `import sqlite3` at the top of main.py
    p = db.get_pipeline(pid)
    if not p:
        raise HTTPException(404)
    ids = p.get("last_telegram_msg_ids") or {}
    ids[req.step] = req.message_id  # one id per step; a re-notification overwrites it
    conn = sqlite3.connect(db.DB_PATH)
    try:
        conn.execute(
            "UPDATE video_pipelines SET last_telegram_msg_ids = ?, updated_at = ? WHERE id = ?",
            (json.dumps(ids), db._now(), pid),
        )
        conn.commit()
    finally:
        conn.close()
    return {"ok": True}
```
`lookup` endpoint used to match replies:
```python
@app.get("/api/music/pipeline/lookup-by-msg/{msg_id}")
def lookup_by_msg(msg_id: int):
    # Linear scan over active pipelines only; there are few of them, so this is cheap
    for p in db.list_pipelines(active_only=True):
        for step, mid in (p.get("last_telegram_msg_ids") or {}).items():
            if mid == msg_id:
                return {"pipeline_id": p["id"], "step": step}
    raise HTTPException(404)
```
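The lookup is a reverse search over per-step message ids. The standalone sketch below shows the same scan. `find_step_for_message` is an illustrative name, and accepting either a decoded dict or the raw JSON text from SQLite is an assumption about what `db.list_pipelines` returns. The checks also illustrate a consequence of storing one id per step: after a re-notification, a reply to the older message for that step no longer resolves, so `lookup-by-msg` returns 404 and `handle_reply` ignores it.

```python
import json
from typing import Optional


def find_step_for_message(pipelines: list, msg_id: int) -> Optional[dict]:
    """Return {"pipeline_id", "step"} whose stored Telegram message id equals msg_id."""
    for p in pipelines:
        raw = p.get("last_telegram_msg_ids") or {}
        # Accept either the decoded dict or the raw JSON text from SQLite
        ids = json.loads(raw) if isinstance(raw, str) else raw
        for step, mid in ids.items():
            if int(mid) == msg_id:
                return {"pipeline_id": p["id"], "step": step}
    return None
```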
agent-office `_lookup_message_link` implementation:
```python
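from typing import Optional


def _lookup_message_link(msg_id: int) -> Optional[dict]:
    """Ask music-lab which (pipeline_id, step) a bot message belongs to; None if unknown."""
    import httpx
    from ..config import MUSIC_LAB_URL

    try:
        resp = httpx.get(f"{MUSIC_LAB_URL}/api/music/pipeline/lookup-by-msg/{msg_id}", timeout=5)
        if resp.status_code == 200:
            return resp.json()
    except (httpx.HTTPError, ValueError):
        pass  # treat network or JSON errors as "not a pipeline message"
    return None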
```
- [ ] **Step 5: Polling test**
```python
# tests/test_pipeline_polling.py
from unittest.mock import AsyncMock, patch

import pytest

from app.agents import youtube_publisher
from app.agents.youtube_publisher import YoutubePublisherAgent


@pytest.mark.asyncio
@patch("app.agents.youtube_publisher.service_proxy.list_active_pipelines",
       new=AsyncMock(return_value=[{"id": 1, "state": "cover_pending", "cover_url": "/x.jpg"}]))
@patch("app.agents.youtube_publisher.service_proxy.save_pipeline_telegram_msg", new=AsyncMock())
@patch("app.agents.youtube_publisher.send_raw",
       new=AsyncMock(return_value={"ok": True, "message_id": 99}))
async def test_poll_notifies_once_per_state():
    a = YoutubePublisherAgent(ws_manager=None)
    await a.poll_state_changes()
    await a.poll_state_changes()  # same state: the second poll must not notify again
    assert youtube_publisher.send_raw.call_count == 1
```
- [ ] **Step 6: Run tests**
Run: `python -m pytest tests/test_pipeline_polling.py -v`
Expected: PASS
- [ ] **Step 7: Commit**
```bash
git add agent-office/app/agents/youtube_publisher.py \
agent-office/app/agents/__init__.py \
agent-office/app/scheduler.py \
agent-office/app/service_proxy.py \
agent-office/app/telegram/conversational.py \
agent-office/tests/test_pipeline_polling.py \
music-lab/app/main.py
git commit -m "feat(agent-office): youtube_publisher 에이전트 + 30s 폴링"
```
---
## Task 11: Frontend api.js helpers
**Files:**
- Modify: `web-ui/src/api.js`
- [ ] **Step 1: Add helpers**
Append to the end of `src/api.js`:
```javascript
// --- Music Pipeline ---
export const listPipelines = (status='all') => apiGet(`/api/music/pipeline?status=${status}`);
export const getPipeline = (id) => apiGet(`/api/music/pipeline/${id}`);
export const createPipeline = (track_id) => apiPost('/api/music/pipeline', { track_id });
export const startPipeline = (id) => apiPost(`/api/music/pipeline/${id}/start`);
export const cancelPipeline = (id) => apiPost(`/api/music/pipeline/${id}/cancel`);
export const publishPipeline = (id) => apiPost(`/api/music/pipeline/${id}/publish`);
// --- Music Setup ---
export const getMusicSetup = () => apiGet('/api/music/setup');
export const updateMusicSetup = (payload) => apiPut('/api/music/setup', payload);
// --- YouTube OAuth ---
export const getYoutubeAuthUrl = () => apiGet('/api/music/youtube/auth-url');
export const getYoutubeStatus = () => apiGet('/api/music/youtube/status');
export const disconnectYoutube = () => apiPost('/api/music/youtube/disconnect');
```
- [ ] **Step 2: Commit**
```bash
git -C web-ui add src/api.js
git -C web-ui commit -m "feat(web-ui): pipeline/setup/youtube API 헬퍼"
```
---
## Task 12: Frontend SetupTab
**Files:**
- Create: `web-ui/src/pages/music/components/SetupTab.jsx`
- Modify: `web-ui/src/pages/music/MusicStudio.css` (styles)
- [ ] **Step 1: Implement SetupTab**
```jsx
// SetupTab.jsx
import { useEffect, useState } from 'react';
import {
  getMusicSetup, updateMusicSetup,
  getYoutubeAuthUrl, getYoutubeStatus, disconnectYoutube,
} from '../../../api';

export default function SetupTab() {
  const [setup, setSetup] = useState(null);
  const [yt, setYt] = useState(null);
  const [saving, setSaving] = useState(false);
  const [error, setError] = useState('');
  // Raw text of the tag input; parsed into an array only on save, so commas can be typed
  const [tagsText, setTagsText] = useState('');

  useEffect(() => {
    Promise.all([getMusicSetup(), getYoutubeStatus()])
      .then(([s, y]) => {
        setSetup(s);
        setYt(y);
        setTagsText((s.metadata_template?.tags || []).join(', '));
      })
      .catch(e => setError(String(e)));
  }, []);

  // Show the load error instead of "Loading" forever when the initial fetch fails
  if (!setup) {
    return error
      ? <div className="ms-error">{error}</div>
      : <p className="ms-loading">Loading</p>;
  }

  const save = async (patch) => {
    setSaving(true);
    setError('');
    try {
      const next = await updateMusicSetup(patch);
      setSetup(next);
    } catch (e) {
      setError(String(e));
    } finally {
      setSaving(false);
    }
  };

  // Shallow-merge `patch` into one nested section of the setup object
  const setSection = (section, patch) =>
    setSetup(s => ({ ...s, [section]: { ...s[section], ...patch } }));

  const connectYoutube = async () => {
    try {
      const { url } = await getYoutubeAuthUrl();
      window.location.href = url; // leave the SPA for Google's OAuth consent screen
    } catch (e) {
      setError(String(e));
    }
  };

  const tmpl = setup.metadata_template || {};

  return (
    <div className="setup-container">
      {error && <div className="ms-error">{error}</div>}

      <section className="setup-card">
        <h3>YouTube 채널 연동</h3>
        {yt && yt.channel_id ? (
          <div className="setup-channel">
            <img src={yt.avatar_url} alt="" className="setup-avatar" />
            <span>{yt.channel_title}</span>
            <button onClick={async () => { await disconnectYoutube(); setYt({}); }}>
              연결 해제
            </button>
          </div>
        ) : (
          <button className="button primary" onClick={connectYoutube}>
            Google 계정 연결
          </button>
        )}
      </section>

      <section className="setup-card">
        <h3>메타데이터 템플릿</h3>
        <label>제목 패턴
          <input
            value={tmpl.title ?? ''}
            onChange={e => setSection('metadata_template', { title: e.target.value })}
          />
        </label>
        <label>설명 템플릿
          <textarea
            rows={6}
            value={tmpl.description ?? ''}
            onChange={e => setSection('metadata_template', { description: e.target.value })}
          />
        </label>
        <label>기본 태그 (쉼표 구분)
          <input value={tagsText} onChange={e => setTagsText(e.target.value)} />
        </label>
        <button onClick={() => save({
          metadata_template: {
            ...tmpl,
            tags: tagsText.split(',').map(t => t.trim()).filter(Boolean),
          },
        })}>저장</button>
      </section>

      <section className="setup-card">
        <h3>AI 커버 prompt (장르별)</h3>
        {Object.entries(setup.cover_prompts || {}).map(([g, p]) => (
          <div key={g} className="setup-prompt-row">
            <span className="setup-prompt-genre">{g}</span>
            <input
              value={p}
              onChange={e => setSection('cover_prompts', { [g]: e.target.value })}
            />
          </div>
        ))}
        <button onClick={() => save({ cover_prompts: setup.cover_prompts })}>저장</button>
      </section>

      <section className="setup-card">
        <h3>AI 최종 검토 기준</h3>
        {['meta', 'policy', 'viewer', 'trend'].map(k => (
          <label key={k}>
            {k} 가중치 ({setup.review_weights[k]})
            <input type="range" min="0" max="100"
              value={setup.review_weights[k]}
              onChange={e => setSection('review_weights', { [k]: Number(e.target.value) })}
            />
          </label>
        ))}
        <label>임계값 ({setup.review_threshold})
          <input type="range" min="0" max="100" value={setup.review_threshold}
            onChange={e => setSetup(s => ({ ...s, review_threshold: Number(e.target.value) }))}
          />
        </label>
        <button onClick={() => save({
          review_weights: setup.review_weights,
          review_threshold: setup.review_threshold,
        })}>저장</button>
      </section>

      <section className="setup-card">
        <h3>영상 비주얼 기본값</h3>
        <label>해상도
          <select value={setup.visual_defaults.resolution}
            onChange={e => setSection('visual_defaults', { resolution: e.target.value })}>
            <option value="1920x1080">1920×1080 (가로)</option>
            <option value="1080x1920">1080×1920 (세로/Shorts)</option>
          </select>
        </label>
        <label>스타일
          <select value={setup.visual_defaults.style}
            onChange={e => setSection('visual_defaults', { style: e.target.value })}>
            <option value="visualizer">Visualizer (파형)</option>
          </select>
        </label>
        <button onClick={() => save({ visual_defaults: setup.visual_defaults })}>저장</button>
      </section>

      <section className="setup-card">
        <h3>발행 정책</h3>
        <label>privacy
          <select value={setup.publish_policy.privacy}
            onChange={e => setSection('publish_policy', { privacy: e.target.value })}>
            <option value="private">Private (비공개)</option>
            <option value="unlisted">Unlisted</option>
            <option value="public">Public</option>
          </select>
        </label>
        <button onClick={() => save({ publish_policy: setup.publish_policy })}>저장</button>
      </section>

      {saving && <div className="setup-saving">저장 중...</div>}
    </div>
  );
}
```
- [ ] **Step 2: Add CSS**
Append to the end of `MusicStudio.css`:
```css
.setup-container { display:flex; flex-direction:column; gap:16px; padding:16px; }
.setup-card { background:rgba(0,0,0,.3); border:1px solid var(--line); border-radius:14px; padding:16px; }
.setup-card h3 { margin:0 0 12px; font-size:15px; color:var(--text); }
.setup-card label { display:block; margin:8px 0; font-size:12px; color:var(--muted); }
.setup-card input, .setup-card textarea, .setup-card select {
width:100%; padding:8px; margin-top:4px; background:rgba(255,255,255,.04);
border:1px solid var(--line); border-radius:8px; color:var(--text); font-size:13px;
}
.setup-channel { display:flex; align-items:center; gap:12px; }
.setup-avatar { width:32px; height:32px; border-radius:50%; }
.setup-prompt-row { display:flex; gap:8px; margin:6px 0; }
.setup-prompt-genre { width:80px; font-size:12px; color:var(--muted); padding-top:8px; }
.setup-saving { position:fixed; bottom:16px; right:16px; background:#222; padding:8px 12px; border-radius:8px; }
```
- [ ] **Step 3: Manual UI verify**
Run: `cd web-ui && npm run dev`
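Open: http://localhost:3007/lab/music → YouTube tab → Setup (구성) subtab. The subtab is only wired into `YoutubeTab` in Task 14, so run this check after Task 14 (or mount `SetupTab` temporarily). Verify that all 6 cards render.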
- [ ] **Step 4: Commit**
```bash
git -C web-ui add src/pages/music/components/SetupTab.jsx src/pages/music/MusicStudio.css
git -C web-ui commit -m "feat(web-ui): SetupTab — YouTube 자동화 구성 허브"
```
---
## Task 13: Frontend PipelineTab + card + start modal
**Files:**
- Create: `web-ui/src/pages/music/components/PipelineCard.jsx`
- Create: `web-ui/src/pages/music/components/PipelineStartModal.jsx`
- Create: `web-ui/src/pages/music/components/PipelineTab.jsx`
- Modify: `web-ui/src/pages/music/MusicStudio.css`
- [ ] **Step 1: PipelineCard**
```jsx
// PipelineCard.jsx
import { cancelPipeline, publishPipeline } from '../../../api';

const STEP_LABELS = ['커버', '영상', '썸네', '메타', '검토', '발행'];
const TERMINAL_STATES = ['published', 'cancelled', 'failed'];

// Map a pipeline state to the current step index (6 = everything done, -1 = not on the track)
function stepIndex(state) {
  // Exact terminal state first: 'published' would otherwise match the 'publish' prefix below
  if (state === 'published') return 6;
  if (state.startsWith('cover')) return 0;
  if (state.startsWith('video')) return 1;
  if (state.startsWith('thumb')) return 2;
  if (state.startsWith('meta')) return 3;
  if (state.startsWith('ai_review') || state === 'publish_pending') return 4;
  if (state.startsWith('publish')) return 5;
  return -1;
}

export default function PipelineCard({ pipeline, onChanged }) {
  const i = stepIndex(pipeline.state);
  return (
    <div className="pipeline-card">
      <div className="pipeline-card__head">
        <h4>{pipeline.track_title || `Track #${pipeline.track_id}`}</h4>
        {!TERMINAL_STATES.includes(pipeline.state) && (
          <button onClick={async () => { await cancelPipeline(pipeline.id); onChanged(); }}>
            취소
          </button>
        )}
      </div>
      <div className="pipeline-progress">
        {STEP_LABELS.map((lbl, idx) => (
          <div key={lbl}
            className={`pipeline-dot ${idx <= i ? 'is-done' : ''} ${idx === i ? 'is-current' : ''}`}>
            <span>{lbl}</span>
          </div>
        ))}
      </div>
      <div className="pipeline-state">현재: {pipeline.state}</div>
      {pipeline.review && (
        <div className="pipeline-review">
          AI 검토: <strong>{pipeline.review.verdict}</strong>
          ({pipeline.review.weighted_total}/100)
        </div>
      )}
      {pipeline.state === 'publish_pending' && (
        <button className="button primary"
          onClick={async () => { await publishPipeline(pipeline.id); onChanged(); }}>
          YouTube 업로드
        </button>
      )}
      {pipeline.youtube_video_id && (
        <a href={`https://youtu.be/${pipeline.youtube_video_id}`} target="_blank" rel="noreferrer">
          유튜브에서 보기
        </a>
      )}
      {pipeline.feedback && pipeline.feedback.length > 0 && (
        <details className="pipeline-feedback">
          <summary>피드백 히스토리 ({pipeline.feedback.length})</summary>
          {pipeline.feedback.map(f => (
            <div key={f.id}> [{f.step}] {f.feedback_text}</div>
          ))}
        </details>
      )}
    </div>
  );
}
```
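Prefix-based state mapping is order-sensitive: `"published"` also starts with `"publish"`, so the exact terminal state has to be tested before the `publish` prefix. Below is a Python transcription of the card's mapping that makes the order easy to check. It uses the state names from this plan, and `step_index`/`_PREFIX_RULES` are illustrative names.

```python
# Prefix rules in evaluation order; the first match wins.
_PREFIX_RULES = [
    ("cover", 0),
    ("video", 1),
    ("thumb", 2),
    ("meta", 3),
    ("ai_review", 4),
    ("publish_pending", 4),   # final review is shown on the 검토 dot
    ("publish", 5),           # e.g. "publishing"
]


def step_index(state: str) -> int:
    """Return the progress-dot index for a state (6 = all done, -1 = not on the track)."""
    if state == "published":  # exact match first: it also starts with "publish"
        return 6
    for prefix, idx in _PREFIX_RULES:
        if state.startswith(prefix):
            return idx
    return -1
```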
- [ ] **Step 2: PipelineStartModal**
```jsx
// PipelineStartModal.jsx
import { useState } from 'react';
import { createPipeline, startPipeline } from '../../../api';

export default function PipelineStartModal({ library, initialTrackId, onClose, onCreated }) {
  // Preselect the track handed over from the Library trigger, else the first track
  const [tid, setTid] = useState(initialTrackId || library?.[0]?.id || '');
  const [error, setError] = useState('');
  const [busy, setBusy] = useState(false);

  const submit = async () => {
    if (!tid) return;
    setBusy(true);
    setError('');
    try {
      const p = await createPipeline(Number(tid));
      await startPipeline(p.id);
      onCreated(p);
    } catch (e) {
      setError(String(e));
    } finally {
      setBusy(false);
    }
  };

  return (
    <div className="modal-overlay" onClick={onClose}>
      <div className="modal-body" onClick={e => e.stopPropagation()}>
        <h3> 파이프라인 시작</h3>
        <select value={tid} onChange={e => setTid(e.target.value)}>
          {(library || []).map(t => (
            <option key={t.id} value={t.id}>{t.title} ({t.genre})</option>
          ))}
        </select>
        {error && <div className="ms-error">{error}</div>}
        <div className="modal-actions">
          <button onClick={onClose}>취소</button>
          <button className="button primary" onClick={submit} disabled={!tid || busy}>시작</button>
        </div>
      </div>
    </div>
  );
}
```
- [ ] **Step 3: PipelineTab**
```jsx
// PipelineTab.jsx
import { useEffect, useState, useRef } from 'react';
import { listPipelines } from '../../../api';
import PipelineCard from './PipelineCard';
import PipelineStartModal from './PipelineStartModal';

export default function PipelineTab({ library }) {
  const [pipelines, setPipelines] = useState([]);
  const [filter, setFilter] = useState('active');
  const [modalOpen, setModalOpen] = useState(false);
  const timer = useRef(null);

  const load = async () => {
    try {
      const r = await listPipelines(filter);
      setPipelines(r.pipelines || []);
    } catch (e) {
      /* ignore transient errors; the next 5 s poll retries */
    }
  };

  // Load now, then poll every 5 s; the timer restarts when the filter changes
  useEffect(() => {
    load();
    timer.current = setInterval(load, 5000);
    return () => clearInterval(timer.current);
  }, [filter]);

  return (
    <div className="pipeline-container">
      <div className="pipeline-toolbar">
        <button className="button primary" onClick={() => setModalOpen(true)}>+ 파이프라인</button>
        <select value={filter} onChange={e => setFilter(e.target.value)}>
          <option value="active">진행 중</option>
          <option value="all">전체</option>
        </select>
      </div>
      <div className="pipeline-grid">
        {pipelines.map(p => (
          <PipelineCard key={p.id} pipeline={p} onChanged={load} />
        ))}
        {pipelines.length === 0 && <p className="ms-empty">진행 중인 파이프라인이 없습니다</p>}
      </div>
      {modalOpen && (
        <PipelineStartModal
          library={library}
          onClose={() => setModalOpen(false)}
          onCreated={() => { setModalOpen(false); load(); }}
        />
      )}
    </div>
  );
}
```
- [ ] **Step 4: CSS**
```css
.pipeline-container { padding:16px; }
.pipeline-toolbar { display:flex; gap:12px; margin-bottom:16px; }
.pipeline-grid { display:grid; grid-template-columns:repeat(auto-fill, minmax(320px, 1fr)); gap:16px; }
.pipeline-card { background:rgba(0,0,0,.3); border:1px solid var(--line); border-radius:14px; padding:16px; }
.pipeline-card__head { display:flex; justify-content:space-between; margin-bottom:12px; }
.pipeline-progress { display:flex; gap:6px; margin:12px 0; }
.pipeline-dot { flex:1; text-align:center; padding:6px 0; border-radius:8px;
background:rgba(255,255,255,.05); font-size:11px; color:var(--muted); }
.pipeline-dot.is-done { background:rgba(56,189,248,.2); color:#bae6fd; }
.pipeline-dot.is-current { box-shadow:0 0 8px rgba(56,189,248,.6); }
.pipeline-state { font-size:13px; color:var(--text); margin:8px 0; }
.pipeline-review { font-size:12px; color:var(--muted); }
.pipeline-feedback { margin-top:12px; font-size:12px; }
.modal-overlay { position:fixed; inset:0; background:rgba(0,0,0,.6);
display:flex; align-items:center; justify-content:center; z-index:1000; }
.modal-body { background:#1a1a2e; padding:24px; border-radius:14px; min-width:320px; }
.modal-actions { display:flex; justify-content:flex-end; gap:8px; margin-top:16px; }
```
- [ ] **Step 5: Commit**
```bash
git -C web-ui add src/pages/music/components/PipelineCard.jsx \
src/pages/music/components/PipelineStartModal.jsx \
src/pages/music/components/PipelineTab.jsx \
src/pages/music/MusicStudio.css
git -C web-ui commit -m "feat(web-ui): PipelineTab — 진행 중 파이프라인 카드 보드"
```
---
## Task 14: Wire up the 6 YoutubeTab subtabs + Library trigger
**Files:**
- Modify: `web-ui/src/pages/music/components/YoutubeTab.jsx`
- Modify: `web-ui/src/pages/music/MusicStudio.jsx` (button on Library cards)
- [ ] **Step 1: Update YoutubeTab**
```jsx
// YoutubeTab.jsx (replace existing)
import { useState, useEffect } from 'react';
import VideoProjectsTab from './VideoProjectsTab';
import RevenueTab from './RevenueTab';
import TrendsTab from './TrendsTab';
import CompileTab from './CompileTab';
import PipelineTab from './PipelineTab';
import SetupTab from './SetupTab';

export default function YoutubeTab({ library, initialTrackId, onClearInitialTrack, openPipelineFor }) {
  const [subtab, setSubtab] = useState('pipeline');

  // Separate effects so a change to one prop is not overridden by a stale value of the other
  useEffect(() => {
    if (initialTrackId) setSubtab('video');
  }, [initialTrackId]);
  useEffect(() => {
    if (openPipelineFor) setSubtab('pipeline');
  }, [openPipelineFor]);

  const tabs = [
    ['pipeline', '🚀 진행'],
    ['video', '🎬 영상 제작'],
    ['compile', '🎵 컴파일'],
    ['trends', '📊 시장 트렌드'],
    ['revenue', '💰 수익 추적'],
    ['setup', '⚙️ 구성'],
  ];

  return (
    <div className="yt-container">
      <nav className="yt-subtabs">
        {tabs.map(([key, label]) => (
          <button key={key} type="button"
            className={`yt-subtab ${subtab === key ? 'is-active' : ''}`}
            onClick={() => setSubtab(key)}>{label}</button>
        ))}
      </nav>
      {subtab === 'pipeline' && <PipelineTab library={library} initialTrackId={openPipelineFor} />}
      {subtab === 'video' && <VideoProjectsTab library={library} initialTrackId={initialTrackId} onClearInitialTrack={onClearInitialTrack} />}
      {subtab === 'compile' && <CompileTab library={library} />}
      {subtab === 'trends' && <TrendsTab />}
      {subtab === 'revenue' && <RevenueTab />}
      {subtab === 'setup' && <SetupTab />}
    </div>
  );
}
```
- [ ] **Step 2: Video-pipeline trigger on Library cards**
In `MusicStudio.jsx`, add a button to the Library track card actions:
```jsx
{/* next to the existing track card buttons */}
<button
  type="button"
  className="ms-lib-action"
  onClick={() => {
    setActiveTab('youtube');
    setOpenPipelineFor(track.id);
  }}
>
  🎬 영상 파이프라인
</button>
```
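Add an `openPipelineFor` state to `MusicStudio.jsx` and pass it to `YoutubeTab` as a prop.
`PipelineTab` must also accept `initialTrackId` as a prop (the Task 13 version only destructures `library`), open the start modal automatically when it is set, and forward it to `PipelineStartModal`:
```jsx
// PipelineTab.jsx: additions
export default function PipelineTab({ library, initialTrackId }) {
  // ...existing state and load()...

  useEffect(() => {
    if (initialTrackId) setModalOpen(true);
  }, [initialTrackId]);

  // ...and in the JSX:
  //   <PipelineStartModal library={library} initialTrackId={initialTrackId} ... />
  // ...rest of the component unchanged
}
```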
- [ ] **Step 3: Build + manual verify**
Run: `cd web-ui && npm run build && npm run dev`
Open: http://localhost:3007/lab/music
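Verify:
- The YouTube tab shows 6 subtabs (진행 / 영상 제작 / 컴파일 / 트렌드 / 수익 / 구성)
- The 구성 (Setup) subtab shows 6 cards
- The 진행 subtab's empty state reads "진행 중인 파이프라인이 없습니다"
- The "🎬 영상 파이프라인" button on a Library card switches to the 진행 subtab and opens the start modal automatically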
- [ ] **Step 4: Commit**
```bash
git -C web-ui add src/pages/music/components/YoutubeTab.jsx \
src/pages/music/MusicStudio.jsx
git -C web-ui commit -m "feat(web-ui): YouTube 6 서브탭 + Library 영상 파이프라인 트리거"
```
---
## Task 15: docker-compose env + dependencies + deployment
**Files:**
- Modify: `docker-compose.yml`
- Modify: `music-lab/Dockerfile` (only if needs build deps)
- Modify: `nginx/conf.d/default.conf`
- [ ] **Step 1: docker-compose env**
Add to the `environment` section of the music-lab service in `docker-compose.yml`:
```yaml
    environment:
      # existing variables...
      OPENAI_API_KEY: ${OPENAI_API_KEY}
      YOUTUBE_OAUTH_CLIENT_ID: ${YOUTUBE_OAUTH_CLIENT_ID}
      YOUTUBE_OAUTH_CLIENT_SECRET: ${YOUTUBE_OAUTH_CLIENT_SECRET}
      YOUTUBE_OAUTH_REDIRECT_URI: ${YOUTUBE_OAUTH_REDIRECT_URI}
      CLAUDE_HAIKU_MODEL: claude-haiku-4-5-20251001
      CLAUDE_SONNET_MODEL: claude-sonnet-4-6
```
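When a `${VAR}` is missing from `.env`, Docker Compose substitutes an empty string and prints a warning. The container therefore sees the variable as defined but empty, and a presence check such as `"X" in os.environ` would pass. Below is a small sketch of an emptiness check that music-lab could run at startup. `missing_env` and `OAUTH_ENV` are hypothetical names, and where to call it (a startup hook, or the `/api/music/youtube/status` response) is left open.

```python
import os
from typing import Iterable, Mapping

# Needed for the YouTube OAuth flow. OPENAI_API_KEY is left out on purpose because
# cover generation falls back to a gradient without it.
OAUTH_ENV = (
    "YOUTUBE_OAUTH_CLIENT_ID",
    "YOUTUBE_OAUTH_CLIENT_SECRET",
    "YOUTUBE_OAUTH_REDIRECT_URI",
)


def missing_env(names: Iterable[str] = OAUTH_ENV,
                environ: Mapping[str, str] = os.environ) -> list:
    """Return the variables that are unset *or* empty/whitespace."""
    return [n for n in names if not environ.get(n, "").strip()]
```

For example, logging `missing_env()` once at startup surfaces a forgotten `.env` entry before the first OAuth attempt fails.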
Add the real values to `.env` (local to the NAS; do not commit it).
- [ ] **Step 2: Expose the OAuth callback through nginx**
The music-lab route in `nginx/conf.d/default.conf` already matches the `/api/music/` prefix, and the callback lives at `/api/music/youtube/callback`, so no nginx change is needed. The external redirect URI is:
```
https://gahusb.synology.me/api/music/youtube/callback
```
You must register the URL above as an authorized redirect URI on the OAuth client in Google Cloud Console. This is a manual, one-time step.
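Registration has to match exactly because Google compares the `redirect_uri` sent in the consent request against the registered list. The sketch below uses only the standard library to show which parameters that request carries. Task 7 will more likely build this URL with google-auth-oauthlib's `Flow`. `build_auth_url` is a hypothetical helper, and the scope pair (upload plus read-only access for showing the channel name) is an assumption.

```python
from urllib.parse import urlencode, urlsplit

GOOGLE_AUTH_ENDPOINT = "https://accounts.google.com/o/oauth2/v2/auth"
CALLBACK_PATH = "/api/music/youtube/callback"
# Assumed scopes: uploading, plus read-only access to show the channel name.
SCOPES = (
    "https://www.googleapis.com/auth/youtube.upload",
    "https://www.googleapis.com/auth/youtube.readonly",
)


def build_auth_url(client_id: str, redirect_uri: str, state: str) -> str:
    """Build a Google OAuth 2.0 consent URL for the web-server (code) flow."""
    # Google rejects a redirect_uri that differs from the registered one
    # (redirect_uri_mismatch), so catch a wrong callback path early.
    if urlsplit(redirect_uri).path != CALLBACK_PATH:
        raise ValueError(f"redirect URI must end at {CALLBACK_PATH}: {redirect_uri}")
    query = urlencode({
        "client_id": client_id,
        "redirect_uri": redirect_uri,
        "response_type": "code",
        "scope": " ".join(SCOPES),
        "access_type": "offline",  # request a refresh token for later uploads
        "prompt": "consent",       # re-issue the refresh token on reconnect
        "state": state,            # CSRF value to verify in the callback
    })
    return f"{GOOGLE_AUTH_ENDPOINT}?{query}"
```

With the production URI, `build_auth_url(client_id, "https://gahusb.synology.me/api/music/youtube/callback", state)` yields the link that the Setup tab's connect button would open.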
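- [ ] **Step 3: Dockerfile (if needed)**
If the base image in `music-lab/Dockerfile` lacks fonts or `fontconfig`, add them. Pillow's prebuilt wheels normally bundle libjpeg, so fonts are usually the only gap: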
```dockerfile
RUN apt-get update && apt-get install -y --no-install-recommends \
        fonts-dejavu-core fontconfig && rm -rf /var/lib/apt/lists/*
```
(The existing `video_producer.py` already uses PIL and FFmpeg, so these packages may already be present. Check first and add them only if they are missing.)
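To decide whether the Dockerfile change is needed, you can check inside the running container. This minimal sketch assumes the video and thumbnail steps depend on the `ffmpeg` binary and on fontconfig (`fc-list`). `missing_tools` and `has_font_family` are hypothetical helpers.

```python
import shutil
import subprocess

# Binaries the video and thumbnail steps are assumed to depend on.
REQUIRED_TOOLS = ("ffmpeg", "fc-list")


def missing_tools(tools=REQUIRED_TOOLS):
    """Return the entries of `tools` that cannot be found on PATH."""
    return [tool for tool in tools if shutil.which(tool) is None]


def has_font_family(name: str) -> bool:
    """True if fontconfig lists a font family containing `name` (case-insensitive)."""
    if shutil.which("fc-list") is None:
        return False
    result = subprocess.run(
        ["fc-list", ":", "family"], capture_output=True, text=True, check=False
    )
    return any(name.lower() in line.lower() for line in result.stdout.splitlines())


if __name__ == "__main__":
    print("missing:", missing_tools() or "none")
    print("DejaVu:", has_font_family("DejaVu"))
```

For example, save it as a hypothetical `preflight.py` and run `docker compose exec music-lab python preflight.py`. Add the `RUN` line only if it reports something missing.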
- [ ] **Step 4: Commit + push (triggers automatic NAS deployment)**
```bash
git -C web-backend add docker-compose.yml music-lab/Dockerfile nginx/conf.d/default.conf
git -C web-backend commit -m "chore(infra): pipeline env + nginx callback routing"
git -C web-backend push origin main
```
After deployment finishes, fill in the values in the NAS `.env` and restart with `docker compose up -d music-lab`.
---
## Task 16: Integration test + manual E2E
- [ ] **Step 1: Integration test**
`music-lab/tests/test_pipeline_flow.py`:
```python
import pytest
from unittest.mock import AsyncMock, patch
from fastapi.testclient import TestClient
from app.main import app
from app import db


@pytest.fixture
def client(monkeypatch, tmp_path):
    # Isolated SQLite file per test, seeded with one track.
    monkeypatch.setattr(db, "DB_PATH", str(tmp_path / "music.db"))
    db.init_db()
    db.save_track({"id": 1, "title": "T", "genre": "lo-fi", "bpm": 85,
                   "key": "C", "scale": "minor", "moods": [], "instruments": [],
                   "audio_url": "/x.mp3", "duration_sec": 120})
    return TestClient(app)


@patch("app.pipeline.orchestrator.youtube.upload_video", return_value={"video_id": "VID999"})
@patch("app.pipeline.orchestrator.review.run_4_axis", new=AsyncMock(return_value={
    "metadata_quality": {"score": 80, "notes": ""},
    "policy_compliance": {"score": 90, "issues": []},
    "viewer_experience": {"score": 80, "notes": ""},
    "trend_alignment": {"score": 70, "matched_keywords": []},
    "weighted_total": 80.0, "verdict": "pass", "summary": "good", "used_fallback": False}))
@patch("app.pipeline.orchestrator.metadata.generate", new=AsyncMock(return_value={
    "title": "X", "description": "Y", "tags": ["lofi"], "category_id": 10,
    "used_fallback": False, "error": None}))
@patch("app.pipeline.thumb.generate", return_value={"url": "/m/t.jpg", "used_fallback": False})
@patch("app.pipeline.video.generate",
       return_value={"url": "/m/v.mp4", "used_fallback": False, "duration_sec": 120})
@patch("app.pipeline.cover.generate", new=AsyncMock(return_value={
    "url": "/m/c.jpg", "used_fallback": False, "error": None}))
# Only patches without `new=` inject a mock argument, bottom decorator first
# (video, thumb, upload). Naming them keeps pytest resolving `client`; with
# `*_, client`, pytest's mock-argument skipping would drop `client` instead.
def test_full_pipeline_happy_path(mock_video, mock_thumb, mock_upload, client):
    pid = client.post("/api/music/pipeline", json={"track_id": 1}).json()["id"]
    client.post(f"/api/music/pipeline/{pid}/start")
    # Assumes generation runs via BackgroundTasks, which TestClient finishes
    # before returning, so the cover is generated and awaiting review.
    p = db.get_pipeline(pid)
    assert p["state"] == "cover_pending"
    # Approve all four steps in order.
    for step in ["cover", "video", "thumb", "meta"]:
        client.post(f"/api/music/pipeline/{pid}/feedback",
                    json={"step": step, "intent": "approve"})
    # After the AI review, the pipeline waits for publish approval.
    p = db.get_pipeline(pid)
    assert p["state"] == "publish_pending"
    # Publish.
    client.post(f"/api/music/pipeline/{pid}/publish")
    p = db.get_pipeline(pid)
    assert p["state"] == "published"
    assert p["youtube_video_id"] == "VID999"
    mock_upload.assert_called_once()
```
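The approval walk this test performs can be pictured as a small transition table. The following is a hypothetical model, not the Task 2 `state_machine.py`. Only `cover_pending`, `publish_pending` and `published` come from this plan. The intermediate state names and `sketch_next_state` are assumptions made for illustration.

```python
# Hypothetical approve-path model; the real table lives in state_machine.py (Task 2).
# Intermediate state names are assumptions made for this illustration.
APPROVE_NEXT = {
    "cover_pending": "video_pending",
    "video_pending": "thumb_pending",
    "thumb_pending": "meta_pending",
    "meta_pending": "ai_review",
    "ai_review": "publish_pending",   # reached automatically once the review job ends
    "publish_pending": "published",
}

# Which user-facing step each waiting state expects an approval for.
STEP_FOR_STATE = {
    "cover_pending": "cover",
    "video_pending": "video",
    "thumb_pending": "thumb",
    "meta_pending": "meta",
    "publish_pending": "publish",
}


def sketch_next_state(state: str, step: str) -> str:
    """Return the state after approving `step` while in `state`."""
    if STEP_FOR_STATE.get(state) != step:
        # An approval for the wrong step is a conflict (compare the 409s in Task 8).
        raise ValueError(f"cannot approve {step!r} while in {state!r}")
    nxt = APPROVE_NEXT[state]
    # ai_review has no human gate, so this sketch passes straight through it.
    return APPROVE_NEXT[nxt] if nxt == "ai_review" else nxt
```

Feeding it `cover`, `video`, `thumb` and `meta` from `cover_pending` ends in `publish_pending`, which is the state the test asserts before calling `/publish`.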
- [ ] **Step 2: Run**
Run: `cd music-lab && python -m pytest tests/test_pipeline_flow.py -v`
Expected: PASS
- [ ] **Step 3: Commit**
```bash
git add music-lab/tests/test_pipeline_flow.py
git commit -m "test(music-lab): full pipeline integration test (mocked)"
git push origin main
```
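- [ ] **Step 4: Manual E2E checklist**
After deployment, in the production environment:
- [ ] Create the OAuth client in Google Cloud Console and register the redirect URI
- [ ] Fill in OPENAI_API_KEY and YOUTUBE_OAUTH_* in the NAS .env and restart music-lab
- [ ] Setup tab → "Google 계정 연결" (Connect Google account) → the channel name is displayed
- [ ] Library track → "🎬 영상 파이프라인" → a card appears in the Progress tab
- [ ] A "커버 검토" (cover review) notification arrives in Telegram, including the cover image URL
- [ ] Reply "승인" (approve) in Telegram → the next step's card arrives
- [ ] At any step, reply "반려, 더 어둡게" (reject, make it darker) → that same step is regenerated and its notification arrives again
- [ ] The AI review verdict and scores arrive in Telegram
- [ ] "승인" → private upload to YouTube → the URL arrives and the Progress tab card shows published
- [ ] The private video is visible in YouTube Studio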
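The last check can also be scripted. This is a minimal sketch that assumes a google-api-python-client YouTube client built with the channel owner's OAuth credentials, because a private video is only returned to its owner. `privacy_status` is a hypothetical helper. The call it wraps, `videos().list(part="status", id=...)`, is the YouTube Data API v3 video lookup.

```python
def privacy_status(youtube, video_id: str):
    """Return "private", "unlisted" or "public" for `video_id`, or None if not found.

    `youtube` is assumed to come from
    googleapiclient.discovery.build("youtube", "v3", credentials=creds).
    """
    response = youtube.videos().list(part="status", id=video_id).execute()
    items = response.get("items", [])
    # An empty list means the video does not exist or these credentials cannot see it.
    return items[0]["status"]["privacyStatus"] if items else None
```

After the E2E upload, `privacy_status(youtube, video_id)` should return `"private"` for the id in the URL that was sent to Telegram.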
---
## Self-Review
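**Spec coverage**: the spec's 15 sections map onto this plan's 16 tasks. OK.
- 1 Background: intent only, with no direct mapping in the plan (OK, no code is produced)
- 2 Non-goals: stated explicitly (OK)
- 3 User flow: Task 8 (endpoints), Task 13 (PipelineTab), Task 10 (Telegram)
- 4 Architecture: Task 8, 10
- 5 State machine: Task 2
- 6 Frontend: Task 11, 12, 13, 14
- 7 Backend details: Task 3-9
- 8 Data model: Task 1
- 9 API: Task 8 (12 + telegram-msg + lookup-by-msg = 14. The spec lists 12 endpoints; the 2 needed for Telegram message matching were added.)
- 10 Async + fallback: covered by the fallback branch in each task
- 11 Error handling: Task 8 (409, 404), Task 7 (quota), Task 3 (timeout fallback), etc.
- 12 Security: no separate step beyond adding dependencies (the spec also describes it as "basic") → OK
- 13 Test strategy: spread across Tasks 1, 2, 3, 5, 6, 7, 8, 9, 10, 16
- 14 Migration / environment: Task 1 (init_db), Task 7 (deps), Task 15 (compose)
- 15 Deliverables / follow-ups: notes only, no code work
**Placeholders**: searched for "TBD", "implement later", etc.; none found. The word "later" (차후) appears only where non-goals are stated.
**Type consistency**: `state` values (`cover_pending`, etc.) are defined in one place, state_machine.py, and match everywhere else. `intent` values (`approve`/`reject`/`unclear`) are consistent. Function names are consistent (`run_step`, `classify`, `next_state_on_approve`).
**Spec coverage adjustment**: the 2 telegram-msg save/lookup endpoints are specified in plan Task 10 (12 in the spec + 2 operational). This is not a major design change.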
---