Files
web-page-backend/docs/superpowers/plans/2026-05-07-music-youtube-pipeline.md
gahusb e03d074222 docs(plan): Music YouTube 파이프라인 구현 계획 — 16 task
스펙 2026-05-07-music-youtube-pipeline-design.md를 16개 task로 분해.
TDD 패턴: 각 task = 실패 테스트 → 구현 → 통과 → 커밋.

태스크 흐름:
1. DB 5개 테이블 + 헬퍼
2. 상태 머신
3. Storage + 커버 (DALL·E + 폴백)
4. 영상/썸네일 (FFmpeg)
5. 메타데이터 (Claude Haiku)
6. AI 검토 4축 (Claude Sonnet + 휴리스틱)
7. YouTube OAuth + 업로드
8. 오케스트레이터 + 13 엔드포인트
9. agent-office 자연어 의도 분류
10. youtube_publisher 에이전트 + 30s 폴링
11. web-ui api.js 헬퍼
12. SetupTab
13. PipelineTab + 카드
14. YoutubeTab 6 서브탭 + Library 트리거
15. docker-compose env + nginx
16. 통합 테스트 + 수동 E2E

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-07 16:23:46 +09:00

118 KiB
Raw Permalink Blame History

Music YouTube Pipeline Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: 트랙 → 영상 → YouTube 발행까지 단계별 텔레그램 승인 파이프라인 구축. 각 단계 자동 산출물 생성 → 텔레그램 알림 → 자연어 응답으로 승인/반려/피드백 처리 → 다음 단계 또는 재생성.

Architecture: music-lab(생성+상태머신+OAuth+업로드) ↔ agent-office(텔레그램 단일 채널 + 자연어 분류 + 폴링 오케스트레이션) ↔ web-ui(구성·진행 탭 + Library 트리거). 모든 AI/생성 작업은 BackgroundTask + DB job 상태 폴링.

Tech Stack:

  • music-lab: FastAPI, SQLite, Anthropic Claude SDK, OpenAI SDK, google-api-python-client, google-auth-oauthlib, FFmpeg, Pillow
  • agent-office: FastAPI, APScheduler, python-telegram (기존), Anthropic SDK
  • web-ui: React 18, Vite, fetch-based API helpers (기존 src/api.js)
  • Tests: pytest + httpx 모킹 (respx/httpx_mock), freezegun, 기존 conftest 패턴

Spec: docs/superpowers/specs/2026-05-07-music-youtube-pipeline-design.md


File Structure

music-lab (web-backend/music-lab/)

경로 책임
app/db.py (modify) 5개 신규 테이블 + 헬퍼 함수
app/pipeline/__init__.py (new) 패키지 초기화
app/pipeline/state_machine.py (new) 상태 전이 검증, transition() 함수
app/pipeline/orchestrator.py (new) start_step(pipeline_id, step) BackgroundTask 등록
app/pipeline/cover.py (new) DALL·E 3 호출 + 그라데이션 폴백
app/pipeline/video.py (move/rename from video_producer.py) FFmpeg visualizer/슬라이드쇼 (커버 입력 지원)
app/pipeline/thumb.py (new) 썸네일 추출 + 텍스트 오버레이
app/pipeline/metadata.py (new) Claude Haiku 메타 생성 + 템플릿 치환
app/pipeline/review.py (new) Claude Sonnet 4축 검토 + 가중평균
app/pipeline/youtube.py (new) OAuth flow + resumable upload
app/pipeline/storage.py (new) /data/videos/{id}/ 디렉토리 관리
app/pipeline/setup.py (new) youtube_setup CRUD
app/main.py (modify) 13개 엔드포인트 추가
tests/test_state_machine.py (new) 전이 검증
tests/test_pipeline_endpoints.py (new) CRUD + feedback
tests/test_cover_generation.py (new) DALL·E mock + 폴백
tests/test_metadata_generation.py (new) Claude mock + 템플릿
tests/test_review.py (new) 4축 검토 + verdict
tests/test_youtube_upload.py (new) google-api mock + retry
requirements.txt (modify) openai, google-api-python-client, google-auth-oauthlib

agent-office (web-backend/agent-office/)

경로 책임
app/agents/youtube_publisher.py (new) 오케스트레이터 — poll + classify + feedback
app/agents/__init__.py (modify) AGENT_REGISTRY 등록
app/scheduler.py (modify) _poll_pipelines 30초 잡 추가
app/telegram/conversational.py (modify) reply 매칭 → youtube_publisher 라우팅
app/service_proxy.py (modify) music-lab pipeline 헬퍼
tests/test_classify_intent.py (new) 화이트리스트/LLM 분기
tests/test_pipeline_polling.py (new) 멱등 폴링

web-ui (web-ui/)

경로 책임
src/api.js (modify) pipeline/setup/youtube 헬퍼
src/pages/music/components/SetupTab.jsx (new) 구성 탭
src/pages/music/components/PipelineTab.jsx (new) 진행 탭
src/pages/music/components/PipelineCard.jsx (new) 카드 1장 (진행도/현재상태/피드백)
src/pages/music/components/PipelineStartModal.jsx (new) Library 트랙 선택 모달
src/pages/music/components/YoutubeTab.jsx (modify) 서브탭 6개로
src/pages/music/components/Library.jsx 또는 MusicStudio 라이브러리 부분 (modify) "🎬 영상 파이프라인" 버튼
src/pages/music/MusicStudio.css (modify) 진행 탭/구성 탭 스타일

docker-compose / nginx

경로 변경
docker-compose.yml (modify) music-lab env 4개 추가
nginx/conf.d/default.conf (modify) /api/music/youtube/callback 외부 노출
music-lab/Dockerfile (modify) 새 의존성 빌드

Task 1: music-lab DB 신규 테이블 + 헬퍼

Files:

  • Modify: music-lab/app/db.py

  • Test: music-lab/tests/test_pipeline_db.py

  • Step 1: Write the failing test tests/test_pipeline_db.py

import os
import tempfile
import pytest
from app import db


@pytest.fixture
def fresh_db(monkeypatch, tmp_path):
    db_path = tmp_path / "music.db"
    monkeypatch.setattr(db, "DB_PATH", str(db_path))
    db.init_db()
    return db_path


def test_create_pipeline_inserts_row(fresh_db):
    pid = db.create_pipeline(track_id=1)
    row = db.get_pipeline(pid)
    assert row["id"] == pid
    assert row["state"] == "created"
    assert row["track_id"] == 1
    assert row["feedback_count_per_step"] == {}


def test_update_pipeline_state_records_started_at(fresh_db, freezer):
    pid = db.create_pipeline(track_id=1)
    freezer.move_to("2026-05-07T08:00:00")
    db.update_pipeline_state(pid, "cover_pending")
    row = db.get_pipeline(pid)
    assert row["state"] == "cover_pending"
    assert row["state_started_at"] == "2026-05-07T08:00:00"


def test_increment_feedback_count(fresh_db):
    pid = db.create_pipeline(track_id=1)
    db.increment_feedback_count(pid, "cover")
    db.increment_feedback_count(pid, "cover")
    row = db.get_pipeline(pid)
    assert row["feedback_count_per_step"] == {"cover": 2}


def test_record_feedback(fresh_db):
    pid = db.create_pipeline(track_id=1)
    db.record_feedback(pid, "cover", "더 어둡게")
    rows = db.get_feedback_history(pid)
    assert len(rows) == 1
    assert rows[0]["feedback_text"] == "더 어둡게"


def test_create_pipeline_job_lifecycle(fresh_db):
    pid = db.create_pipeline(track_id=1)
    job_id = db.create_pipeline_job(pid, "cover")
    db.update_pipeline_job(job_id, status="running")
    db.update_pipeline_job(job_id, status="succeeded", duration_ms=1234)
    jobs = db.list_pipeline_jobs(pid)
    assert jobs[0]["status"] == "succeeded"
    assert jobs[0]["duration_ms"] == 1234


def test_youtube_setup_default_row_created_on_init(fresh_db):
    setup = db.get_youtube_setup()
    assert setup["review_threshold"] == 60
    assert "metadata_template_json" in setup


def test_youtube_oauth_token_upsert(fresh_db):
    db.upsert_oauth_token(
        channel_id="UC123",
        channel_title="My Channel",
        avatar_url="https://...",
        refresh_token="r1",
        access_token="a1",
        expires_at="2026-05-07T09:00:00",
    )
    tok = db.get_oauth_token()
    assert tok["channel_id"] == "UC123"
    assert tok["refresh_token"] == "r1"
    db.upsert_oauth_token(
        channel_id="UC123", channel_title="My Channel",
        avatar_url=None, refresh_token="r2",
        access_token="a2", expires_at="2026-05-07T10:00:00",
    )
    tok = db.get_oauth_token()
    assert tok["refresh_token"] == "r2"  # upsert

requirements.txtfreezegun 추가 필요.

  • Step 2: Run test to verify it fails

Run: cd music-lab && python -m pytest tests/test_pipeline_db.py -v Expected: ImportError on db.create_pipeline 등 (함수 미존재)

  • Step 3: Add tables and helpers to db.py

init_db() 끝에 다음 5개 CREATE TABLE IF NOT EXISTS 추가:

def init_db():
    # ... existing tables ...
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS video_pipelines (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            track_id INTEGER NOT NULL,
            state TEXT NOT NULL DEFAULT 'created',
            state_started_at TEXT NOT NULL,
            cover_url TEXT,
            video_url TEXT,
            thumbnail_url TEXT,
            metadata_json TEXT,
            review_json TEXT,
            youtube_video_id TEXT,
            feedback_count_per_step TEXT NOT NULL DEFAULT '{}',
            last_telegram_msg_ids TEXT NOT NULL DEFAULT '{}',
            created_at TEXT NOT NULL,
            updated_at TEXT NOT NULL,
            cancelled_at TEXT,
            failed_reason TEXT
        )
    """)
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS pipeline_jobs (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            pipeline_id INTEGER NOT NULL,
            step TEXT NOT NULL,
            status TEXT NOT NULL DEFAULT 'queued',
            error TEXT,
            started_at TEXT,
            finished_at TEXT,
            duration_ms INTEGER,
            FOREIGN KEY (pipeline_id) REFERENCES video_pipelines(id)
        )
    """)
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS pipeline_feedback (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            pipeline_id INTEGER NOT NULL,
            step TEXT NOT NULL,
            feedback_text TEXT NOT NULL,
            received_at TEXT NOT NULL,
            FOREIGN KEY (pipeline_id) REFERENCES video_pipelines(id)
        )
    """)
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS youtube_oauth_tokens (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            channel_id TEXT NOT NULL,
            channel_title TEXT,
            avatar_url TEXT,
            refresh_token TEXT NOT NULL,
            access_token TEXT,
            expires_at TEXT,
            created_at TEXT NOT NULL
        )
    """)
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS youtube_setup (
            id INTEGER PRIMARY KEY AUTOINCREMENT CHECK (id = 1),
            metadata_template_json TEXT NOT NULL,
            cover_prompts_json TEXT NOT NULL,
            review_weights_json TEXT NOT NULL,
            review_threshold INTEGER NOT NULL DEFAULT 60,
            visual_defaults_json TEXT NOT NULL,
            publish_policy_json TEXT NOT NULL,
            updated_at TEXT NOT NULL
        )
    """)

    # 기본 setup 1행 보장
    cursor.execute("SELECT COUNT(*) FROM youtube_setup")
    if cursor.fetchone()[0] == 0:
        import json
        from datetime import datetime
        defaults = (
            json.dumps({
                "title": "[{genre}] {title} | {bpm}BPM",
                "description": "{title}\n\n장르: {genre}\nBPM: {bpm}\nKey: {key}\n",
                "tags": ["lofi", "chill", "instrumental"],
                "category_id": 10,
            }),
            json.dumps({
                "lo-fi": "moody anime cityscape at dusk, lofi aesthetic",
                "phonk": "dark drift car aesthetic, neon, phonk vibe",
                "ambient": "ethereal mountain landscape, ambient mood",
                "default": "abstract music album cover art",
            }),
            json.dumps({"meta": 25, "policy": 30, "viewer": 25, "trend": 20}),
            60,
            json.dumps({"resolution": "1920x1080", "style": "visualizer", "background": "ai_cover"}),
            json.dumps({"mode": "manual", "privacy": "private", "schedule_time": None}),
            datetime.utcnow().isoformat(timespec="seconds"),
        )
        cursor.execute("""
            INSERT INTO youtube_setup
              (metadata_template_json, cover_prompts_json, review_weights_json,
               review_threshold, visual_defaults_json, publish_policy_json, updated_at)
            VALUES (?, ?, ?, ?, ?, ?, ?)
        """, defaults)

    conn.commit()
    conn.close()

다음 헬퍼 함수 추가 (파일 하단에):

import json as _json
from datetime import datetime as _dt


def _now() -> str:
    return _dt.utcnow().isoformat(timespec="seconds")


def create_pipeline(track_id: int) -> int:
    conn = sqlite3.connect(DB_PATH)
    cur = conn.cursor()
    now = _now()
    cur.execute("""
        INSERT INTO video_pipelines (track_id, state, state_started_at, created_at, updated_at)
        VALUES (?, 'created', ?, ?, ?)
    """, (track_id, now, now, now))
    pid = cur.lastrowid
    conn.commit()
    conn.close()
    return pid


def get_pipeline(pid: int) -> dict | None:
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    row = conn.execute("SELECT * FROM video_pipelines WHERE id = ?", (pid,)).fetchone()
    conn.close()
    if not row:
        return None
    d = dict(row)
    d["feedback_count_per_step"] = _json.loads(d["feedback_count_per_step"] or "{}")
    d["last_telegram_msg_ids"] = _json.loads(d["last_telegram_msg_ids"] or "{}")
    if d.get("metadata_json"):
        d["metadata"] = _json.loads(d["metadata_json"])
    if d.get("review_json"):
        d["review"] = _json.loads(d["review_json"])
    return d


def update_pipeline_state(pid: int, state: str, **fields) -> None:
    cols = ["state = ?", "state_started_at = ?", "updated_at = ?"]
    vals = [state, _now(), _now()]
    for k, v in fields.items():
        cols.append(f"{k} = ?")
        vals.append(v)
    vals.append(pid)
    conn = sqlite3.connect(DB_PATH)
    conn.execute(f"UPDATE video_pipelines SET {', '.join(cols)} WHERE id = ?", vals)
    conn.commit()
    conn.close()


def list_pipelines(active_only: bool = False) -> list[dict]:
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    if active_only:
        rows = conn.execute("""
            SELECT * FROM video_pipelines
            WHERE state NOT IN ('published','cancelled','failed','awaiting_manual')
            ORDER BY created_at DESC
        """).fetchall()
    else:
        rows = conn.execute("SELECT * FROM video_pipelines ORDER BY created_at DESC").fetchall()
    conn.close()
    return [get_pipeline(r["id"]) for r in rows]


def increment_feedback_count(pid: int, step: str) -> int:
    p = get_pipeline(pid)
    counts = p["feedback_count_per_step"]
    counts[step] = counts.get(step, 0) + 1
    conn = sqlite3.connect(DB_PATH)
    conn.execute("UPDATE video_pipelines SET feedback_count_per_step = ?, updated_at = ? WHERE id = ?",
                 (_json.dumps(counts), _now(), pid))
    conn.commit()
    conn.close()
    return counts[step]


def record_feedback(pid: int, step: str, feedback_text: str) -> None:
    conn = sqlite3.connect(DB_PATH)
    conn.execute("""
        INSERT INTO pipeline_feedback (pipeline_id, step, feedback_text, received_at)
        VALUES (?, ?, ?, ?)
    """, (pid, step, feedback_text, _now()))
    conn.commit()
    conn.close()


def get_feedback_history(pid: int) -> list[dict]:
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    rows = conn.execute("""
        SELECT * FROM pipeline_feedback
        WHERE pipeline_id = ? ORDER BY id DESC
    """, (pid,)).fetchall()
    conn.close()
    return [dict(r) for r in rows]


def create_pipeline_job(pid: int, step: str) -> int:
    conn = sqlite3.connect(DB_PATH)
    cur = conn.cursor()
    cur.execute("""
        INSERT INTO pipeline_jobs (pipeline_id, step, status, started_at)
        VALUES (?, ?, 'queued', ?)
    """, (pid, step, _now()))
    job_id = cur.lastrowid
    conn.commit()
    conn.close()
    return job_id


def update_pipeline_job(job_id: int, **fields) -> None:
    if "status" in fields and fields["status"] in ("succeeded", "failed"):
        fields["finished_at"] = _now()
    cols = ", ".join(f"{k} = ?" for k in fields)
    vals = list(fields.values()) + [job_id]
    conn = sqlite3.connect(DB_PATH)
    conn.execute(f"UPDATE pipeline_jobs SET {cols} WHERE id = ?", vals)
    conn.commit()
    conn.close()


def list_pipeline_jobs(pid: int) -> list[dict]:
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    rows = conn.execute("""
        SELECT * FROM pipeline_jobs WHERE pipeline_id = ? ORDER BY id ASC
    """, (pid,)).fetchall()
    conn.close()
    return [dict(r) for r in rows]


def get_youtube_setup() -> dict:
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    row = conn.execute("SELECT * FROM youtube_setup WHERE id = 1").fetchone()
    conn.close()
    d = dict(row)
    for k in ("metadata_template_json", "cover_prompts_json",
              "review_weights_json", "visual_defaults_json", "publish_policy_json"):
        d[k.replace("_json", "")] = _json.loads(d[k])
    return d


def update_youtube_setup(**kwargs) -> None:
    field_map = {
        "metadata_template": "metadata_template_json",
        "cover_prompts": "cover_prompts_json",
        "review_weights": "review_weights_json",
        "visual_defaults": "visual_defaults_json",
        "publish_policy": "publish_policy_json",
    }
    cols = []
    vals = []
    for k, v in kwargs.items():
        if k in field_map:
            cols.append(f"{field_map[k]} = ?")
            vals.append(_json.dumps(v))
        elif k == "review_threshold":
            cols.append("review_threshold = ?")
            vals.append(int(v))
    if not cols:
        return
    cols.append("updated_at = ?")
    vals.append(_now())
    conn = sqlite3.connect(DB_PATH)
    conn.execute(f"UPDATE youtube_setup SET {', '.join(cols)} WHERE id = 1", vals)
    conn.commit()
    conn.close()


def upsert_oauth_token(channel_id: str, channel_title: str | None,
                        avatar_url: str | None, refresh_token: str,
                        access_token: str | None, expires_at: str | None) -> None:
    conn = sqlite3.connect(DB_PATH)
    cur = conn.cursor()
    cur.execute("DELETE FROM youtube_oauth_tokens")
    cur.execute("""
        INSERT INTO youtube_oauth_tokens
          (channel_id, channel_title, avatar_url, refresh_token, access_token, expires_at, created_at)
        VALUES (?, ?, ?, ?, ?, ?, ?)
    """, (channel_id, channel_title, avatar_url, refresh_token, access_token, expires_at, _now()))
    conn.commit()
    conn.close()


def get_oauth_token() -> dict | None:
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    row = conn.execute("SELECT * FROM youtube_oauth_tokens ORDER BY id DESC LIMIT 1").fetchone()
    conn.close()
    return dict(row) if row else None


def delete_oauth_token() -> None:
    conn = sqlite3.connect(DB_PATH)
    conn.execute("DELETE FROM youtube_oauth_tokens")
    conn.commit()
    conn.close()
  • Step 4: Run tests to verify pass

Run: cd music-lab && python -m pytest tests/test_pipeline_db.py -v Expected: 모두 PASS (7 tests)

  • Step 5: Commit
git add music-lab/app/db.py music-lab/tests/test_pipeline_db.py music-lab/requirements.txt
git commit -m "feat(music-lab): pipeline 5개 DB 테이블 + 헬퍼"

Task 2: 상태 머신

Files:

  • Create: music-lab/app/pipeline/__init__.py

  • Create: music-lab/app/pipeline/state_machine.py

  • Test: music-lab/tests/test_state_machine.py

  • Step 1: Write the failing test

# tests/test_state_machine.py
import pytest
from app.pipeline.state_machine import (
    next_state_on_approve, next_state_on_reject, can_transition, STEPS, USER_GATES,
)


def test_steps_sequence():
    assert STEPS == ["cover", "video", "thumb", "meta", "review", "publish"]


def test_user_gates_excludes_review():
    assert "review" not in USER_GATES
    assert "publish" in USER_GATES
    assert "cover" in USER_GATES


def test_approve_progression():
    assert next_state_on_approve("cover_pending") == "video_pending"
    assert next_state_on_approve("video_pending") == "thumb_pending"
    assert next_state_on_approve("thumb_pending") == "meta_pending"
    assert next_state_on_approve("meta_pending") == "ai_review"
    assert next_state_on_approve("publish_pending") == "publishing"


def test_approve_invalid_state_raises():
    with pytest.raises(ValueError):
        next_state_on_approve("ai_review")  # 자동 전이 — approve 호출 자체가 무효


def test_reject_keeps_same_state():
    # 반려는 같은 *_pending 상태를 유지(재생성 트리거)
    assert next_state_on_reject("cover_pending") == "cover_pending"
    assert next_state_on_reject("publish_pending") == "publish_pending"


def test_can_transition_blocks_terminal_states():
    assert not can_transition("published", "cover_pending")
    assert not can_transition("cancelled", "cover_pending")
    assert not can_transition("failed", "cover_pending")


def test_can_transition_allows_cancel_from_anywhere():
    assert can_transition("cover_pending", "cancelled")
    assert can_transition("publishing", "cancelled")


def test_can_transition_allows_failed_from_pending():
    assert can_transition("video_pending", "failed")
    assert can_transition("publishing", "failed")
  • Step 2: Run test to verify it fails

Run: cd music-lab && python -m pytest tests/test_state_machine.py -v Expected: ImportError

  • Step 3: Implement state_machine.py

app/pipeline/__init__.py:

# 빈 파일

app/pipeline/state_machine.py:

"""파이프라인 상태 머신 — 전이 규칙 단일 소스."""

STEPS = ["cover", "video", "thumb", "meta", "review", "publish"]
USER_GATES = ["cover", "video", "thumb", "meta", "publish"]  # review는 자동

_APPROVE_NEXT = {
    "cover_pending":   "video_pending",
    "video_pending":   "thumb_pending",
    "thumb_pending":   "meta_pending",
    "meta_pending":    "ai_review",      # 자동 검토 단계로
    "publish_pending": "publishing",
}

TERMINAL_STATES = {"published", "cancelled", "failed", "awaiting_manual"}


def next_state_on_approve(state: str) -> str:
    if state not in _APPROVE_NEXT:
        raise ValueError(f"승인 불가 상태: {state}")
    return _APPROVE_NEXT[state]


def next_state_on_reject(state: str) -> str:
    if not state.endswith("_pending"):
        raise ValueError(f"반려 불가 상태: {state}")
    return state  # 같은 상태 유지 (재생성 후 다시 _pending)


def can_transition(from_state: str, to_state: str) -> bool:
    if from_state in TERMINAL_STATES:
        return False
    if to_state in {"cancelled", "failed", "awaiting_manual"}:
        return True
    if to_state == _APPROVE_NEXT.get(from_state):
        return True
    # 자동 전이 (ai_review → publish_pending, publishing → published)
    auto_transitions = {
        ("ai_review", "publish_pending"),
        ("publishing", "published"),
    }
    return (from_state, to_state) in auto_transitions
  • Step 4: Run tests

Run: cd music-lab && python -m pytest tests/test_state_machine.py -v Expected: 8 PASS

  • Step 5: Commit
git add music-lab/app/pipeline/ music-lab/tests/test_state_machine.py
git commit -m "feat(music-lab): pipeline 상태 머신"

Task 3: Storage 헬퍼 + Cover 생성 (DALL·E + 폴백)

Files:

  • Create: music-lab/app/pipeline/storage.py

  • Create: music-lab/app/pipeline/cover.py

  • Test: music-lab/tests/test_cover_generation.py

  • Modify: music-lab/requirements.txt (openai, respx)

  • Step 1: Add deps

requirements.txt에 추가: openai>=1.20.0, respx>=0.21, freezegun>=1.4

  • Step 2: Write failing test
# tests/test_cover_generation.py
import pytest
import respx
from httpx import Response
from app.pipeline import cover, storage


@pytest.fixture
def tmp_storage(monkeypatch, tmp_path):
    monkeypatch.setattr(storage, "VIDEO_DATA_DIR", str(tmp_path))
    return tmp_path


@pytest.mark.asyncio
@respx.mock
async def test_dalle_success_saves_jpg(tmp_storage, monkeypatch):
    monkeypatch.setenv("OPENAI_API_KEY", "test-key")
    image_url = "https://oaidalleapiprodscus.blob.core.windows.net/x.png"
    respx.post("https://api.openai.com/v1/images/generations").mock(
        return_value=Response(200, json={"data": [{"url": image_url}]})
    )
    # PNG 1x1 픽셀 (간단)
    png_bytes = bytes.fromhex(
        "89504e470d0a1a0a0000000d49484452000000010000000108020000009077"
        "53de0000000c4944415478da6300010000050001"
    )
    respx.get(image_url).mock(return_value=Response(200, content=png_bytes))

    out = await cover.generate(pipeline_id=42, genre="lo-fi",
                               prompt_template="moody anime", mood="chill")
    assert out["used_fallback"] is False
    assert out["url"].startswith("/media/videos/42/cover")
    assert (tmp_storage / "42" / "cover.jpg").exists()


@pytest.mark.asyncio
@respx.mock
async def test_dalle_timeout_falls_back_to_gradient(tmp_storage, monkeypatch):
    monkeypatch.setenv("OPENAI_API_KEY", "test-key")
    respx.post("https://api.openai.com/v1/images/generations").mock(
        side_effect=lambda req: Response(504)
    )
    out = await cover.generate(pipeline_id=43, genre="phonk",
                               prompt_template="dark drift", mood="aggressive",
                               track_title="Midnight Drive")
    assert out["used_fallback"] is True
    assert (tmp_storage / "43" / "cover.jpg").exists()


@pytest.mark.asyncio
async def test_no_api_key_falls_back(tmp_storage, monkeypatch):
    monkeypatch.delenv("OPENAI_API_KEY", raising=False)
    out = await cover.generate(pipeline_id=44, genre="ambient",
                               prompt_template="x", mood="calm",
                               track_title="Calm")
    assert out["used_fallback"] is True


@pytest.mark.asyncio
@respx.mock
async def test_dalle_with_feedback_appends_to_prompt(tmp_storage, monkeypatch):
    monkeypatch.setenv("OPENAI_API_KEY", "test-key")
    captured = {}
    def hook(req):
        import json as _json
        captured["body"] = _json.loads(req.content)
        return Response(200, json={"data": [{"url": "https://x"}]})
    respx.post("https://api.openai.com/v1/images/generations").mock(side_effect=hook)
    respx.get("https://x").mock(return_value=Response(200, content=b"\x89PNG\r\n\x1a\n"))
    await cover.generate(pipeline_id=45, genre="lo-fi",
                          prompt_template="moody anime", mood="chill",
                          feedback="더 어둡게")
    assert "더 어둡게" in captured["body"]["prompt"]

pytest.ini 또는 conftest.pyasyncio_mode = auto 또는 마커 등록 필요. 기존 conftest 확인.

  • Step 3: Run test, verify fail

Run: python -m pytest tests/test_cover_generation.py -v Expected: ImportError

  • Step 4: Implement storage.py
"""파이프라인 산출물 디렉토리 관리."""
import os

VIDEO_DATA_DIR = os.getenv("VIDEO_DATA_DIR", "/app/data/videos")
VIDEO_MEDIA_BASE = os.getenv("VIDEO_MEDIA_BASE", "/media/videos")


def pipeline_dir(pipeline_id: int) -> str:
    path = os.path.join(VIDEO_DATA_DIR, str(pipeline_id))
    os.makedirs(path, exist_ok=True)
    return path


def media_url(pipeline_id: int, filename: str) -> str:
    return f"{VIDEO_MEDIA_BASE}/{pipeline_id}/{filename}"
  • Step 5: Implement cover.py
"""AI 커버 아트 생성 — DALL·E 3 + 그라데이션 폴백."""
import os
import logging
from typing import Optional

import httpx
from PIL import Image, ImageDraw, ImageFont

from . import storage

logger = logging.getLogger("music-lab.cover")

OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")
OPENAI_MODEL = os.getenv("OPENAI_IMAGE_MODEL", "gpt-image-1")
DALLE_TIMEOUT_S = 90

# 그라데이션 폴백 색 (장르별 RGB 페어)
GRADIENT_COLORS = {
    "lo-fi":   ((26, 26, 46),  (22, 33, 62)),
    "phonk":   ((26, 10, 10),  (45, 0,  0)),
    "ambient": ((13, 33, 55),  (10, 22, 40)),
    "pop":     ((26, 10, 46),  (45, 27, 78)),
    "default": ((17, 24, 39),  (31, 41, 55)),
}


async def generate(*, pipeline_id: int, genre: str, prompt_template: str,
                   mood: str = "", track_title: str = "", feedback: str = "") -> dict:
    """커버 아트 생성. 성공 시 jpg 저장 + URL 반환. 실패 시 그라데이션 폴백.

    반환: {"url": str, "used_fallback": bool, "error": str | None}
    """
    out_path = os.path.join(storage.pipeline_dir(pipeline_id), "cover.jpg")
    used_fallback = False
    error = None

    if OPENAI_API_KEY:
        try:
            await _generate_with_dalle(prompt_template, mood, feedback, out_path)
        except Exception as e:
            logger.warning("DALL·E 실패 — 폴백: %s", e)
            error = str(e)
            used_fallback = True
            _generate_gradient(genre, track_title, out_path)
    else:
        used_fallback = True
        error = "OPENAI_API_KEY 미설정"
        _generate_gradient(genre, track_title, out_path)

    return {
        "url": storage.media_url(pipeline_id, "cover.jpg"),
        "used_fallback": used_fallback,
        "error": error,
    }


async def _generate_with_dalle(prompt_template: str, mood: str,
                                feedback: str, out_path: str) -> None:
    prompt = prompt_template
    if mood:
        prompt = f"{prompt}, {mood} mood"
    if feedback:
        prompt = f"{prompt}. 추가 지시: {feedback}"
    prompt = f"{prompt}, no text, high quality"

    async with httpx.AsyncClient(timeout=DALLE_TIMEOUT_S) as client:
        resp = await client.post(
            "https://api.openai.com/v1/images/generations",
            headers={"Authorization": f"Bearer {OPENAI_API_KEY}"},
            json={"model": OPENAI_MODEL, "prompt": prompt, "size": "1024x1024", "n": 1},
        )
        resp.raise_for_status()
        url = resp.json()["data"][0]["url"]
        img_resp = await client.get(url)
        img_resp.raise_for_status()
        # PNG → JPG 변환
        from io import BytesIO
        img = Image.open(BytesIO(img_resp.content)).convert("RGB")
        img.save(out_path, "JPEG", quality=92)


def _generate_gradient(genre: str, track_title: str, out_path: str) -> None:
    w, h = 1024, 1024
    top, bot = GRADIENT_COLORS.get(genre.lower(), GRADIENT_COLORS["default"])
    img = Image.new("RGB", (w, h))
    px = img.load()
    for y in range(h):
        t = y / h
        r = int(top[0] + (bot[0] - top[0]) * t)
        g = int(top[1] + (bot[1] - top[1]) * t)
        b = int(top[2] + (bot[2] - top[2]) * t)
        for x in range(w):
            px[x, y] = (r, g, b)

    if track_title:
        try:
            font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", 64)
        except OSError:
            font = ImageFont.load_default()
        draw = ImageDraw.Draw(img)
        bbox = draw.textbbox((0, 0), track_title, font=font)
        tw, th = bbox[2] - bbox[0], bbox[3] - bbox[1]
        draw.text(((w - tw) // 2, (h - th) // 2), track_title, fill=(255, 255, 255), font=font)

    img.save(out_path, "JPEG", quality=92)
  • Step 6: Run tests

Run: python -m pytest tests/test_cover_generation.py -v Expected: 4 PASS

  • Step 7: Commit
git add music-lab/app/pipeline/storage.py music-lab/app/pipeline/cover.py \
        music-lab/tests/test_cover_generation.py music-lab/requirements.txt
git commit -m "feat(music-lab): AI 커버 생성 + 그라데이션 폴백"

Task 4: 영상/썸네일 생성 (FFmpeg, 기존 video_producer 이전)

Files:

  • Create: music-lab/app/pipeline/video.py (move logic from app/video_producer.py)

  • Create: music-lab/app/pipeline/thumb.py

  • Test: music-lab/tests/test_video_thumb.py

  • Step 1: Write failing test

FFmpeg 직접 실행 대신 subprocess.run을 mock 처리.

# tests/test_video_thumb.py
import os
import pytest
from unittest.mock import patch, MagicMock
from app.pipeline import video, thumb, storage


@pytest.fixture
def tmp_storage(monkeypatch, tmp_path):
    monkeypatch.setattr(storage, "VIDEO_DATA_DIR", str(tmp_path))
    # 더미 입력 파일들
    audio = tmp_path / "audio.mp3"
    audio.write_bytes(b"\x00" * 100)
    cover = tmp_path / str(50) / "cover.jpg"
    cover.parent.mkdir(parents=True)
    cover.write_bytes(b"\x00" * 100)
    return tmp_path


@patch("subprocess.run")
def test_generate_video_calls_ffmpeg(mock_run, tmp_storage):
    mock_run.return_value = MagicMock(returncode=0, stderr="")
    out = video.generate(pipeline_id=50, audio_path=str(tmp_storage / "audio.mp3"),
                         cover_path=str(tmp_storage / "50" / "cover.jpg"),
                         genre="lo-fi", duration_sec=120, resolution="1920x1080",
                         style="visualizer")
    assert out["url"].endswith("/50/video.mp4")
    assert out["used_fallback"] is False
    args = mock_run.call_args[0][0]
    assert args[0] == "ffmpeg"
    assert "-i" in args
    assert "showwaves" in " ".join(args)


@patch("subprocess.run")
def test_generate_video_failure_marks_failed(mock_run, tmp_storage):
    mock_run.return_value = MagicMock(returncode=1, stderr="bad codec")
    with pytest.raises(video.VideoGenerationError):
        video.generate(pipeline_id=51, audio_path=str(tmp_storage / "audio.mp3"),
                        cover_path=str(tmp_storage / "50" / "cover.jpg"),
                        genre="lo-fi", duration_sec=120, resolution="1920x1080",
                        style="visualizer")


@patch("subprocess.run")
def test_thumb_extracts_frame(mock_run, tmp_storage):
    mock_run.return_value = MagicMock(returncode=0, stderr="")
    video_path = tmp_storage / "60" / "video.mp4"
    video_path.parent.mkdir(parents=True)
    video_path.write_bytes(b"\x00" * 100)
    out = thumb.generate(pipeline_id=60, video_path=str(video_path),
                          track_title="Midnight Drive", overlay_text=True)
    assert out["url"].endswith("/60/thumbnail.jpg")
    args = mock_run.call_args[0][0]
    assert args[0] == "ffmpeg"
  • Step 2: Run, verify fail

Run: python -m pytest tests/test_video_thumb.py -v Expected: ImportError

  • Step 3: Implement video.py — 기존 video_producer.py의 ffmpeg 명령 빌더 활용
"""영상 비주얼 생성 — visualizer/슬라이드쇼 스타일."""
import os
import subprocess
import logging
from . import storage

logger = logging.getLogger("music-lab.video")

VIDEO_TIMEOUT_S = 300  # 5분


class VideoGenerationError(Exception):
    pass


def generate(*, pipeline_id: int, audio_path: str, cover_path: str,
             genre: str, duration_sec: int, resolution: str = "1920x1080",
             style: str = "visualizer") -> dict:
    """영상 생성. 성공 시 mp4 저장 + URL 반환. 실패 시 예외."""
    w, h = resolution.split("x")
    out_path = os.path.join(storage.pipeline_dir(pipeline_id), "video.mp4")

    if style == "visualizer":
        cmd = _build_visualizer_cmd(audio_path, cover_path, out_path, w, h)
    else:
        # 차후: 슬라이드쇼 등 다른 스타일
        cmd = _build_visualizer_cmd(audio_path, cover_path, out_path, w, h)

    logger.info("ffmpeg 실행: %s", " ".join(cmd))
    result = subprocess.run(cmd, capture_output=True, text=True, timeout=VIDEO_TIMEOUT_S)
    if result.returncode != 0:
        raise VideoGenerationError(f"ffmpeg 실패: {result.stderr[:500]}")

    return {
        "url": storage.media_url(pipeline_id, "video.mp4"),
        "used_fallback": False,
        "duration_sec": duration_sec,
    }


def _build_visualizer_cmd(audio: str, bg: str, out: str, w: str, h: str) -> list:
    return [
        "ffmpeg", "-y",
        "-loop", "1", "-i", bg,
        "-i", audio,
        "-filter_complex",
        f"[0:v]scale={w}:{h}[bg];"
        f"[1:a]showwaves=s={w}x200:mode=cline:colors=0xFF4444@0.8[wave];"
        f"[bg][wave]overlay=0:({h}-200)[out]",
        "-map", "[out]", "-map", "1:a",
        "-c:v", "libx264", "-preset", "fast", "-crf", "23",
        "-c:a", "aac", "-b:a", "192k",
        "-shortest", out,
    ]
  • Step 4: Implement thumb.py
"""썸네일 생성 — 영상 5초 프레임 추출 + 텍스트 오버레이."""
import os
import subprocess
import logging
from PIL import Image, ImageDraw, ImageFont

from . import storage

logger = logging.getLogger("music-lab.thumb")
THUMB_TIMEOUT_S = 60


class ThumbGenerationError(Exception):
    pass


def generate(*, pipeline_id: int, video_path: str,
             track_title: str = "", overlay_text: bool = True) -> dict:
    out_path = os.path.join(storage.pipeline_dir(pipeline_id), "thumbnail.jpg")
    cmd = ["ffmpeg", "-y", "-i", video_path,
           "-ss", "00:00:05", "-vframes", "1", "-q:v", "2", out_path]
    result = subprocess.run(cmd, capture_output=True, text=True, timeout=THUMB_TIMEOUT_S)
    if result.returncode != 0:
        raise ThumbGenerationError(f"ffmpeg 썸네일 실패: {result.stderr[:300]}")

    if overlay_text and track_title:
        _overlay_title(out_path, track_title)

    return {"url": storage.media_url(pipeline_id, "thumbnail.jpg"), "used_fallback": False}


def _overlay_title(path: str, title: str) -> None:
    try:
        img = Image.open(path).convert("RGB")
        draw = ImageDraw.Draw(img)
        try:
            font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", 80)
        except OSError:
            font = ImageFont.load_default()
        # 하단 30% 영역에 검정 반투명 박스 + 흰 글씨
        w, h = img.size
        box_h = int(h * 0.3)
        overlay = Image.new("RGBA", (w, box_h), (0, 0, 0, 160))
        img.paste(overlay, (0, h - box_h), overlay)
        bbox = draw.textbbox((0, 0), title, font=font)
        tw = bbox[2] - bbox[0]
        draw.text(((w - tw) // 2, h - box_h + 30), title, fill=(255, 255, 255), font=font)
        img.save(path, "JPEG", quality=92)
    except Exception as e:
        logger.warning("썸네일 오버레이 실패: %s", e)
  • Step 5: Run tests

Run: python -m pytest tests/test_video_thumb.py -v Expected: 3 PASS

  • Step 6: Commit
git add music-lab/app/pipeline/video.py music-lab/app/pipeline/thumb.py \
        music-lab/tests/test_video_thumb.py
git commit -m "feat(music-lab): pipeline 영상·썸네일 생성"

Task 5: 메타데이터 생성 (Claude Haiku)

Files:

  • Create: music-lab/app/pipeline/metadata.py

  • Test: music-lab/tests/test_metadata_generation.py

  • Step 1: Write failing test

# tests/test_metadata_generation.py
import pytest
import respx
from httpx import Response
from app.pipeline import metadata


@pytest.mark.asyncio
@respx.mock
async def test_metadata_calls_claude_and_parses_json(monkeypatch):
    monkeypatch.setenv("ANTHROPIC_API_KEY", "test-key")
    payload = {
        "content": [{"type": "text", "text": '{"title":"[Lo-fi] Drive | 85BPM",'
                                              '"description":"chill","tags":["lofi","85bpm"],'
                                              '"category_id":10}'}]
    }
    respx.post("https://api.anthropic.com/v1/messages").mock(
        return_value=Response(200, json=payload)
    )
    result = await metadata.generate(
        track={"title": "Drive", "genre": "lo-fi", "bpm": 85, "key": "C", "scale": "minor",
               "moods": ["chill"], "instruments": ["piano"]},
        template={"title": "[{genre}] {title} | {bpm}BPM",
                  "description": "{title}\n", "tags": [], "category_id": 10},
        trend_keywords=["lofi", "study"],
        feedback="",
    )
    assert result["title"].startswith("[Lo-fi]")
    assert "lofi" in result["tags"]


@pytest.mark.asyncio
@respx.mock
async def test_metadata_fallback_when_no_api_key(monkeypatch):
    monkeypatch.delenv("ANTHROPIC_API_KEY", raising=False)
    result = await metadata.generate(
        track={"title": "Drive", "genre": "lo-fi", "bpm": 85, "key": "C", "scale": "minor",
               "moods": [], "instruments": []},
        template={"title": "[{genre}] {title} | {bpm}BPM",
                  "description": "{title}", "tags": ["lofi"], "category_id": 10},
        trend_keywords=[],
    )
    # 템플릿 변수 그대로 치환된 폴백
    assert result["title"] == "[lo-fi] Drive | 85BPM"
    assert result["used_fallback"] is True


@pytest.mark.asyncio
@respx.mock
async def test_metadata_includes_feedback_in_prompt(monkeypatch):
    monkeypatch.setenv("ANTHROPIC_API_KEY", "test-key")
    captured = {}
    def hook(req):
        import json
        captured["body"] = json.loads(req.content)
        return Response(200, json={"content": [{"type": "text",
            "text": '{"title":"x","description":"y","tags":[],"category_id":10}'}]})
    respx.post("https://api.anthropic.com/v1/messages").mock(side_effect=hook)
    await metadata.generate(
        track={"title": "X", "genre": "lo-fi", "bpm": 85, "key": "C", "scale": "minor",
               "moods": [], "instruments": []},
        template={"title": "{title}", "description": "{title}", "tags": [], "category_id": 10},
        trend_keywords=[],
        feedback="제목을 짧게",
    )
    assert "제목을 짧게" in str(captured["body"])
  • Step 2: Run, verify fail

Run: python -m pytest tests/test_metadata_generation.py -v Expected: ImportError

  • Step 3: Implement metadata.py
"""메타데이터 생성 — Claude Haiku + 템플릿 폴백."""
import os
import json
import logging

import httpx

logger = logging.getLogger("music-lab.metadata")

ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY", "")
CLAUDE_MODEL = os.getenv("CLAUDE_HAIKU_MODEL", "claude-haiku-4-5-20251001")
TIMEOUT_S = 30


async def generate(*, track: dict, template: dict, trend_keywords: list[str],
                   feedback: str = "") -> dict:
    """메타데이터 생성. 성공 시 LLM, 실패/미설정 시 템플릿 치환 폴백.

    반환: {"title", "description", "tags", "category_id", "used_fallback", "error"}
    """
    if not ANTHROPIC_API_KEY:
        return {**_fallback_template(track, template), "used_fallback": True, "error": "no api key"}

    try:
        result = await _call_claude(track, template, trend_keywords, feedback)
        return {**result, "used_fallback": False, "error": None}
    except Exception as e:
        logger.warning("메타데이터 LLM 실패 — 폴백: %s", e)
        return {**_fallback_template(track, template), "used_fallback": True, "error": str(e)}


def _fallback_template(track: dict, template: dict) -> dict:
    fmt_vars = {
        "title": track.get("title", ""),
        "genre": track.get("genre", ""),
        "bpm": track.get("bpm", ""),
        "key": track.get("key", ""),
        "scale": track.get("scale", ""),
    }
    title = template.get("title", "{title}").format(**fmt_vars)
    description = template.get("description", "{title}").format(**fmt_vars)
    return {
        "title": title[:100],
        "description": description[:5000],
        "tags": (template.get("tags") or [])[:15],
        "category_id": template.get("category_id", 10),
    }


async def _call_claude(track: dict, template: dict,
                        trend_keywords: list[str], feedback: str) -> dict:
    user_prompt = (
        "다음 트랙의 YouTube 메타데이터를 생성하세요. JSON으로만 응답.\n\n"
        f"트랙: {json.dumps(track, ensure_ascii=False)}\n"
        f"템플릿: {json.dumps(template, ensure_ascii=False)}\n"
        f"트렌드 키워드: {', '.join(trend_keywords)}\n"
    )
    if feedback:
        user_prompt += f"\n사용자 피드백: {feedback}\n"
    user_prompt += (
        '\n출력 JSON: {"title": "60자 이내", "description": "1000자 이내, 3-5문단",'
        ' "tags": ["15개 이내"], "category_id": 10}'
    )

    async with httpx.AsyncClient(timeout=TIMEOUT_S) as client:
        resp = await client.post(
            "https://api.anthropic.com/v1/messages",
            headers={
                "x-api-key": ANTHROPIC_API_KEY,
                "anthropic-version": "2023-06-01",
                "content-type": "application/json",
            },
            json={
                "model": CLAUDE_MODEL,
                "max_tokens": 1024,
                "messages": [{"role": "user", "content": user_prompt}],
            },
        )
        resp.raise_for_status()
        text = resp.json()["content"][0]["text"]
        # 가장 첫 JSON 블록 추출
        start = text.find("{")
        end = text.rfind("}") + 1
        return json.loads(text[start:end])
  • Step 4: Run tests

Run: python -m pytest tests/test_metadata_generation.py -v Expected: 3 PASS

  • Step 5: Commit
git add music-lab/app/pipeline/metadata.py music-lab/tests/test_metadata_generation.py
git commit -m "feat(music-lab): pipeline 메타데이터 LLM 생성 + 폴백"

Task 6: AI 최종 검토 (4축)

Files:

  • Create: music-lab/app/pipeline/review.py

  • Test: music-lab/tests/test_review.py

  • Step 1: Write failing test

# tests/test_review.py
import pytest
import respx
from httpx import Response
from app.pipeline import review


@pytest.mark.asyncio
@respx.mock
async def test_review_returns_pass_when_above_threshold(monkeypatch):
    monkeypatch.setenv("ANTHROPIC_API_KEY", "k")
    body = {"content": [{"type": "text", "text":
        '{"metadata_quality":{"score":80,"notes":"x"},'
        '"policy_compliance":{"score":90,"issues":[]},'
        '"viewer_experience":{"score":75,"notes":"y"},'
        '"trend_alignment":{"score":70,"matched_keywords":["lofi"]},'
        '"summary":"good"}'}]}
    respx.post("https://api.anthropic.com/v1/messages").mock(return_value=Response(200, json=body))
    result = await review.run_4_axis(
        pipeline={"id": 1}, track={"title": "x", "genre": "lo-fi", "bpm": 85},
        video_meta={"length_sec": 120, "resolution": "1920x1080"},
        metadata={"title": "Y", "description": "Z", "tags": ["lofi"], "category_id": 10},
        thumbnail_url="/m/x.jpg", trend_top=["lofi"],
        weights={"meta": 25, "policy": 30, "viewer": 25, "trend": 20},
        threshold=60,
    )
    assert result["verdict"] == "pass"
    expected_total = 0.25 * 80 + 0.30 * 90 + 0.25 * 75 + 0.20 * 70
    assert result["weighted_total"] == pytest.approx(expected_total, abs=0.01)


@pytest.mark.asyncio
@respx.mock
async def test_review_fail_below_threshold(monkeypatch):
    monkeypatch.setenv("ANTHROPIC_API_KEY", "k")
    body = {"content": [{"type": "text", "text":
        '{"metadata_quality":{"score":40,"notes":"x"},'
        '"policy_compliance":{"score":50,"issues":[]},'
        '"viewer_experience":{"score":30,"notes":"y"},'
        '"trend_alignment":{"score":20,"matched_keywords":[]},'
        '"summary":"bad"}'}]}
    respx.post("https://api.anthropic.com/v1/messages").mock(return_value=Response(200, json=body))
    result = await review.run_4_axis(
        pipeline={"id": 2}, track={"title": "x", "genre": "lo-fi", "bpm": 85},
        video_meta={"length_sec": 120, "resolution": "1920x1080"},
        metadata={"title": "Y", "description": "Z", "tags": [], "category_id": 10},
        thumbnail_url="/m/x.jpg", trend_top=[],
        weights={"meta": 25, "policy": 30, "viewer": 25, "trend": 20},
        threshold=60,
    )
    assert result["verdict"] == "fail"


@pytest.mark.asyncio
@respx.mock
async def test_review_heuristic_fallback_on_llm_error(monkeypatch):
    monkeypatch.setenv("ANTHROPIC_API_KEY", "k")
    respx.post("https://api.anthropic.com/v1/messages").mock(return_value=Response(500))
    result = await review.run_4_axis(
        pipeline={"id": 3}, track={"title": "x", "genre": "lo-fi", "bpm": 85},
        video_meta={"length_sec": 120, "resolution": "1920x1080"},
        metadata={"title": "Y" * 30, "description": "Z" * 200, "tags": ["a", "b"], "category_id": 10},
        thumbnail_url="/m/x.jpg", trend_top=["lofi"],
        weights={"meta": 25, "policy": 30, "viewer": 25, "trend": 20},
        threshold=60,
    )
    assert result["used_fallback"] is True
    assert "weighted_total" in result
  • Step 2: Run, verify fail

Run: python -m pytest tests/test_review.py -v Expected: ImportError

  • Step 3: Implement review.py
"""AI 최종 검토 — 4축(메타/정책/시청/트렌드) 가중 평균."""
import os
import json
import logging

import httpx

logger = logging.getLogger("music-lab.review")

ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY", "")
CLAUDE_MODEL = os.getenv("CLAUDE_SONNET_MODEL", "claude-sonnet-4-6")
TIMEOUT_S = 60

POLICY_BANNED = {"f-word", "n-word"}  # 실제 단어는 운영 시 별도 파일로 — 데모용 자리


async def run_4_axis(*, pipeline: dict, track: dict, video_meta: dict,
                      metadata: dict, thumbnail_url: str, trend_top: list[str],
                      weights: dict, threshold: int) -> dict:
    if not ANTHROPIC_API_KEY:
        return _heuristic(metadata, video_meta, track, trend_top, weights, threshold,
                          fallback_reason="no api key")
    try:
        scores = await _call_claude(pipeline, track, video_meta, metadata,
                                    thumbnail_url, trend_top)
        return _weighted_verdict(scores, weights, threshold, used_fallback=False)
    except Exception as e:
        logger.warning("검토 LLM 실패 — 휴리스틱: %s", e)
        return _heuristic(metadata, video_meta, track, trend_top, weights, threshold,
                          fallback_reason=str(e))


def _weighted_verdict(scores: dict, weights: dict, threshold: int,
                       used_fallback: bool) -> dict:
    total = (
        weights["meta"]   / 100 * scores["metadata_quality"]["score"] +
        weights["policy"] / 100 * scores["policy_compliance"]["score"] +
        weights["viewer"] / 100 * scores["viewer_experience"]["score"] +
        weights["trend"]  / 100 * scores["trend_alignment"]["score"]
    )
    return {
        **scores,
        "weighted_total": round(total, 2),
        "verdict": "pass" if total >= threshold else "fail",
        "used_fallback": used_fallback,
    }


async def _call_claude(pipeline, track, video_meta, metadata, thumbnail_url, trend_top):
    user = (
        "트랙·영상·메타데이터를 4축으로 평가하고 JSON만 응답:\n"
        f"트랙: {json.dumps(track, ensure_ascii=False)}\n"
        f"영상: {json.dumps(video_meta)}\n"
        f"메타: {json.dumps(metadata, ensure_ascii=False)}\n"
        f"썸네일: {thumbnail_url}\n"
        f"트렌드: {trend_top}\n"
        '출력: {"metadata_quality":{"score":0-100,"notes":""},'
        '"policy_compliance":{"score":0-100,"issues":[]},'
        '"viewer_experience":{"score":0-100,"notes":""},'
        '"trend_alignment":{"score":0-100,"matched_keywords":[]},'
        '"summary":""}'
    )
    async with httpx.AsyncClient(timeout=TIMEOUT_S) as client:
        resp = await client.post(
            "https://api.anthropic.com/v1/messages",
            headers={
                "x-api-key": ANTHROPIC_API_KEY,
                "anthropic-version": "2023-06-01",
                "content-type": "application/json",
            },
            json={"model": CLAUDE_MODEL, "max_tokens": 1024,
                  "messages": [{"role": "user", "content": user}]},
        )
        resp.raise_for_status()
        text = resp.json()["content"][0]["text"]
        return json.loads(text[text.find("{"):text.rfind("}")+1])


def _heuristic(metadata, video_meta, track, trend_top, weights, threshold, fallback_reason):
    # 메타: 길이·태그 카운트
    title_len = len(metadata.get("title", ""))
    desc_len = len(metadata.get("description", ""))
    tag_n = len(metadata.get("tags", []))
    meta_score = 100 if 5 <= title_len <= 60 and 50 <= desc_len <= 1000 and 5 <= tag_n <= 15 else 50

    # 정책: 금칙어 매치
    text_blob = (metadata.get("title", "") + metadata.get("description", "")).lower()
    policy_score = 100 if not any(w in text_blob for w in POLICY_BANNED) else 30

    # 시청: 영상 길이가 트랙과 큰 차이 없는지 휴리스틱(±5초)
    expected = track.get("duration_sec", video_meta.get("length_sec", 0))
    delta = abs(video_meta.get("length_sec", 0) - expected)
    viewer_score = 90 if delta <= 5 else 60

    # 트렌드: 태그가 트렌드와 겹치는지
    overlap = set(metadata.get("tags", [])) & set(trend_top)
    trend_score = 100 if overlap else 40

    scores = {
        "metadata_quality": {"score": meta_score, "notes": "휴리스틱"},
        "policy_compliance": {"score": policy_score, "issues": []},
        "viewer_experience": {"score": viewer_score, "notes": "휴리스틱"},
        "trend_alignment": {"score": trend_score, "matched_keywords": list(overlap)},
        "summary": f"휴리스틱 fallback: {fallback_reason}",
    }
    return _weighted_verdict(scores, weights, threshold, used_fallback=True)
  • Step 4: Run tests

Run: python -m pytest tests/test_review.py -v Expected: 3 PASS

  • Step 5: Commit
git add music-lab/app/pipeline/review.py music-lab/tests/test_review.py
git commit -m "feat(music-lab): pipeline 4축 AI 검토 + 휴리스틱 폴백"

Task 7: YouTube OAuth + Upload

Files:

  • Create: music-lab/app/pipeline/youtube.py

  • Test: music-lab/tests/test_youtube_upload.py

  • Modify: music-lab/requirements.txt (google-api-python-client>=2.100, google-auth-oauthlib>=1.2)

  • Step 1: Add deps

requirements.txt:

google-api-python-client>=2.100
google-auth-oauthlib>=1.2
google-auth-httplib2>=0.2
  • Step 2: Write failing test
# tests/test_youtube_upload.py
import pytest
from unittest.mock import patch, MagicMock
from app.pipeline import youtube


def _setup_token(monkeypatch, db, refresh="r1", access="a1"):
    db.upsert_oauth_token(
        channel_id="UC1", channel_title="t", avatar_url=None,
        refresh_token=refresh, access_token=access, expires_at="2099-01-01T00:00:00",
    )


@pytest.fixture
def fresh_db(monkeypatch, tmp_path):
    from app import db
    monkeypatch.setattr(db, "DB_PATH", str(tmp_path / "music.db"))
    db.init_db()
    return db


@patch("app.pipeline.youtube._build_youtube_client")
def test_upload_succeeds_after_resumable(mock_client, fresh_db, tmp_path, monkeypatch):
    monkeypatch.setenv("YOUTUBE_OAUTH_CLIENT_ID", "cid")
    monkeypatch.setenv("YOUTUBE_OAUTH_CLIENT_SECRET", "sec")
    _setup_token(monkeypatch, fresh_db)

    yt = MagicMock()
    insert = MagicMock()
    insert.next_chunk.side_effect = [(None, None), (None, {"id": "VID123"})]
    yt.videos().insert.return_value = insert
    mock_client.return_value = yt

    video_path = tmp_path / "v.mp4"
    video_path.write_bytes(b"\x00")
    out = youtube.upload_video(
        video_path=str(video_path),
        thumbnail_path=None,
        metadata={"title": "T", "description": "D", "tags": ["x"], "category_id": 10},
        privacy="private",
    )
    assert out["video_id"] == "VID123"


@patch("app.pipeline.youtube._build_youtube_client")
def test_upload_no_token_raises(mock_client, fresh_db, tmp_path):
    video_path = tmp_path / "v.mp4"
    video_path.write_bytes(b"\x00")
    with pytest.raises(youtube.NotAuthenticatedError):
        youtube.upload_video(
            video_path=str(video_path), thumbnail_path=None,
            metadata={"title":"T","description":"D","tags":[],"category_id":10},
            privacy="private",
        )


@patch("app.pipeline.youtube._build_youtube_client")
def test_upload_quota_exceeded_marks_quota(mock_client, fresh_db, tmp_path, monkeypatch):
    from googleapiclient.errors import HttpError
    monkeypatch.setenv("YOUTUBE_OAUTH_CLIENT_ID", "cid")
    monkeypatch.setenv("YOUTUBE_OAUTH_CLIENT_SECRET", "sec")
    _setup_token(monkeypatch, fresh_db)

    yt = MagicMock()
    err = HttpError(MagicMock(status=403), b'{"error":{"errors":[{"reason":"quotaExceeded"}]}}')
    yt.videos().insert.return_value.next_chunk.side_effect = err
    mock_client.return_value = yt

    video_path = tmp_path / "v.mp4"
    video_path.write_bytes(b"\x00")
    with pytest.raises(youtube.QuotaExceededError):
        youtube.upload_video(
            video_path=str(video_path), thumbnail_path=None,
            metadata={"title":"T","description":"D","tags":[],"category_id":10},
            privacy="private",
        )
  • Step 3: Run, verify fail

Run: python -m pytest tests/test_youtube_upload.py -v Expected: ImportError

  • Step 4: Implement youtube.py
"""YouTube OAuth flow + resumable 업로드."""
import os
import logging
from urllib.parse import urlencode

import httpx
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload
from googleapiclient.errors import HttpError

from app import db

logger = logging.getLogger("music-lab.youtube")

CLIENT_ID = os.getenv("YOUTUBE_OAUTH_CLIENT_ID", "")
CLIENT_SECRET = os.getenv("YOUTUBE_OAUTH_CLIENT_SECRET", "")
REDIRECT_URI = os.getenv("YOUTUBE_OAUTH_REDIRECT_URI", "")
SCOPES = ["https://www.googleapis.com/auth/youtube.upload",
          "https://www.googleapis.com/auth/youtube.readonly"]


class NotAuthenticatedError(Exception):
    pass


class QuotaExceededError(Exception):
    pass


def get_auth_url() -> str:
    if not CLIENT_ID or not REDIRECT_URI:
        raise RuntimeError("OAuth 환경변수 미설정")
    params = {
        "client_id": CLIENT_ID,
        "redirect_uri": REDIRECT_URI,
        "response_type": "code",
        "scope": " ".join(SCOPES),
        "access_type": "offline",
        "prompt": "consent",
    }
    return "https://accounts.google.com/o/oauth2/v2/auth?" + urlencode(params)


async def exchange_code(code: str) -> dict:
    """code → refresh_token + access_token + 채널 정보 → DB 저장."""
    async with httpx.AsyncClient(timeout=30) as client:
        token_resp = await client.post(
            "https://oauth2.googleapis.com/token",
            data={
                "code": code,
                "client_id": CLIENT_ID,
                "client_secret": CLIENT_SECRET,
                "redirect_uri": REDIRECT_URI,
                "grant_type": "authorization_code",
            },
        )
        token_resp.raise_for_status()
        tok = token_resp.json()
    access = tok["access_token"]
    refresh = tok["refresh_token"]
    expires_at = _expiry_from_seconds(tok["expires_in"])

    # 채널 정보 조회
    creds = _creds(access=access, refresh=refresh)
    yt = build("youtube", "v3", credentials=creds, cache_discovery=False)
    ch = yt.channels().list(part="snippet", mine=True).execute()
    item = ch["items"][0]
    db.upsert_oauth_token(
        channel_id=item["id"],
        channel_title=item["snippet"]["title"],
        avatar_url=item["snippet"]["thumbnails"]["default"]["url"],
        refresh_token=refresh, access_token=access, expires_at=expires_at,
    )
    return {"channel_id": item["id"], "channel_title": item["snippet"]["title"]}


def get_status() -> dict | None:
    tok = db.get_oauth_token()
    if not tok:
        return None
    return {
        "channel_id": tok["channel_id"],
        "channel_title": tok["channel_title"],
        "avatar_url": tok["avatar_url"],
    }


def disconnect() -> None:
    db.delete_oauth_token()


def upload_video(*, video_path: str, thumbnail_path: str | None,
                  metadata: dict, privacy: str) -> dict:
    tok = db.get_oauth_token()
    if not tok:
        raise NotAuthenticatedError("YouTube 인증 없음")
    creds = _creds(access=tok["access_token"], refresh=tok["refresh_token"])
    yt = _build_youtube_client(creds)

    body = {
        "snippet": {
            "title": metadata["title"],
            "description": metadata["description"],
            "tags": metadata.get("tags", []),
            "categoryId": str(metadata.get("category_id", 10)),
        },
        "status": {"privacyStatus": privacy, "selfDeclaredMadeForKids": False},
    }
    media = MediaFileUpload(video_path, chunksize=4 * 1024 * 1024, resumable=True, mimetype="video/mp4")
    req = yt.videos().insert(part="snippet,status", body=body, media_body=media)

    try:
        response = None
        while response is None:
            status, response = req.next_chunk()
        video_id = response["id"]
    except HttpError as e:
        if b"quotaExceeded" in e.content:
            raise QuotaExceededError(str(e))
        raise

    if thumbnail_path:
        try:
            yt.thumbnails().set(videoId=video_id, media_body=thumbnail_path).execute()
        except HttpError as e:
            logger.warning("썸네일 업로드 실패: %s", e)

    return {"video_id": video_id}


def _build_youtube_client(creds):  # patch 포인트
    return build("youtube", "v3", credentials=creds, cache_discovery=False)


def _creds(access: str, refresh: str) -> Credentials:
    return Credentials(
        token=access, refresh_token=refresh,
        token_uri="https://oauth2.googleapis.com/token",
        client_id=CLIENT_ID, client_secret=CLIENT_SECRET, scopes=SCOPES,
    )


def _expiry_from_seconds(secs: int) -> str:
    from datetime import datetime, timedelta
    return (datetime.utcnow() + timedelta(seconds=secs)).isoformat(timespec="seconds")
  • Step 5: Run tests

Run: python -m pytest tests/test_youtube_upload.py -v Expected: 3 PASS

  • Step 6: Commit
git add music-lab/app/pipeline/youtube.py music-lab/tests/test_youtube_upload.py \
        music-lab/requirements.txt
git commit -m "feat(music-lab): YouTube OAuth + resumable 업로드"

Task 8: 오케스트레이터 + 13개 엔드포인트

Files:

  • Create: music-lab/app/pipeline/orchestrator.py
  • Modify: music-lab/app/main.py
  • Test: music-lab/tests/test_pipeline_endpoints.py

이 task는 코드량이 가장 많음. 엔드포인트 12개 + orchestrator의 BackgroundTask 분기.

  • Step 1: Write failing test
# tests/test_pipeline_endpoints.py
import pytest
from fastapi.testclient import TestClient
from app.main import app
from app import db


@pytest.fixture
def client(monkeypatch, tmp_path):
    monkeypatch.setattr(db, "DB_PATH", str(tmp_path / "music.db"))
    db.init_db()
    # 최소 트랙 1개 (라이브러리)
    from app.db import save_track
    save_track({"id": 1, "title": "T", "genre": "lo-fi", "bpm": 85, "key": "C", "scale": "minor",
                 "moods": [], "instruments": [], "audio_url": "/x.mp3", "duration_sec": 120})
    return TestClient(app)


def test_create_pipeline(client):
    r = client.post("/api/music/pipeline", json={"track_id": 1})
    assert r.status_code == 201
    assert r.json()["state"] == "created"


def test_create_duplicate_pipeline_returns_409(client):
    client.post("/api/music/pipeline", json={"track_id": 1})
    r = client.post("/api/music/pipeline", json={"track_id": 1})
    assert r.status_code == 409


def test_get_pipeline_returns_jobs_and_feedback(client):
    pid = client.post("/api/music/pipeline", json={"track_id": 1}).json()["id"]
    r = client.get(f"/api/music/pipeline/{pid}")
    assert "jobs" in r.json()
    assert "feedback" in r.json()


def test_list_pipelines_active_filter(client):
    pid = client.post("/api/music/pipeline", json={"track_id": 1}).json()["id"]
    db.update_pipeline_state(pid, "published")
    r = client.get("/api/music/pipeline?status=active")
    assert all(p["state"] != "published" for p in r.json()["pipelines"])


def test_feedback_approve_advances_state(client):
    pid = client.post("/api/music/pipeline", json={"track_id": 1}).json()["id"]
    db.update_pipeline_state(pid, "cover_pending", cover_url="/m/x.jpg")
    r = client.post(f"/api/music/pipeline/{pid}/feedback",
                     json={"step": "cover", "intent": "approve"})
    assert r.status_code == 202
    after = db.get_pipeline(pid)
    # video_pending이 BackgroundTask로 진입 후 결과는 mock 환경에서 즉시 안 보일 수 있음
    # 최소 cover_approved 또는 video_pending 단계 확인
    assert after["state"] in ("cover_approved", "video_pending", "video_running")


def test_feedback_reject_records_feedback_and_increments_count(client):
    pid = client.post("/api/music/pipeline", json={"track_id": 1}).json()["id"]
    db.update_pipeline_state(pid, "cover_pending")
    r = client.post(f"/api/music/pipeline/{pid}/feedback",
                     json={"step": "cover", "intent": "reject", "feedback_text": "더 어둡게"})
    assert r.status_code == 202
    p = db.get_pipeline(pid)
    assert p["feedback_count_per_step"]["cover"] == 1
    history = db.get_feedback_history(pid)
    assert history[0]["feedback_text"] == "더 어둡게"


def test_feedback_after_5_rejects_marks_awaiting_manual(client):
    pid = client.post("/api/music/pipeline", json={"track_id": 1}).json()["id"]
    db.update_pipeline_state(pid, "cover_pending")
    for i in range(5):
        client.post(f"/api/music/pipeline/{pid}/feedback",
                     json={"step": "cover", "intent": "reject", "feedback_text": f"again {i}"})
    r = client.post(f"/api/music/pipeline/{pid}/feedback",
                     json={"step": "cover", "intent": "reject", "feedback_text": "6th"})
    assert r.status_code == 409
    assert db.get_pipeline(pid)["state"] == "awaiting_manual"


def test_cancel_pipeline(client):
    pid = client.post("/api/music/pipeline", json={"track_id": 1}).json()["id"]
    r = client.post(f"/api/music/pipeline/{pid}/cancel")
    assert r.status_code == 200
    assert db.get_pipeline(pid)["state"] == "cancelled"
  • Step 2: Run, verify fail

Run: python -m pytest tests/test_pipeline_endpoints.py -v Expected: 404 또는 collection error

  • Step 3: Implement orchestrator.py
"""파이프라인 오케스트레이터 — 단계별 BackgroundTask 등록 및 산출물 → DB 반영."""
import asyncio
import json
import logging
from typing import Optional

from app import db
from . import cover, video, thumb, metadata, review, youtube

logger = logging.getLogger("music-lab.orchestrator")

REVIEW_WEIGHTS_DEFAULT = {"meta": 25, "policy": 30, "viewer": 25, "trend": 20}


async def run_step(pipeline_id: int, step: str, feedback: str = "") -> None:
    """단계 실행 → 결과를 DB에 반영하고 *_pending 또는 다음 단계로 전이.

    호출 직후 _running 상태로 전환, 끝나면 _pending(사용자 게이트) 또는 자동 다음.
    실패 시 failed 상태 + reason.
    """
    job_id = db.create_pipeline_job(pipeline_id, step)
    db.update_pipeline_job(job_id, status="running")
    p = db.get_pipeline(pipeline_id)
    track = _get_track(p["track_id"])

    try:
        if step == "cover":
            result = await _run_cover(p, track, feedback)
        elif step == "video":
            result = await _run_video(p, track)
        elif step == "thumb":
            result = await _run_thumb(p, track, feedback)
        elif step == "meta":
            result = await _run_meta(p, track, feedback)
        elif step == "review":
            result = await _run_review(p, track)
        elif step == "publish":
            result = await _run_publish(p, track)
        else:
            raise ValueError(f"unknown step: {step}")
        db.update_pipeline_job(job_id, status="succeeded")
        db.update_pipeline_state(pipeline_id, result["next_state"], **result.get("fields", {}))
    except Exception as e:
        logger.exception("step %s failed for pipeline %s", step, pipeline_id)
        db.update_pipeline_job(job_id, status="failed", error=str(e))
        db.update_pipeline_state(pipeline_id, "failed", failed_reason=f"{step}: {e}")


def _get_track(track_id: int) -> dict:
    # tracks 테이블 스키마는 기존 music-lab — 여기서는 dict로 추정
    t = db.get_track(track_id)
    if not t:
        raise ValueError(f"트랙 {track_id} 없음")
    return t


async def _run_cover(p, track, feedback):
    setup = db.get_youtube_setup()
    prompts = setup["cover_prompts"]
    template = prompts.get(track["genre"].lower(), prompts.get("default", ""))
    out = await cover.generate(
        pipeline_id=p["id"], genre=track["genre"], prompt_template=template,
        mood=", ".join(track.get("moods", [])), track_title=track["title"], feedback=feedback,
    )
    return {"next_state": "cover_pending", "fields": {"cover_url": out["url"]}}


async def _run_video(p, track):
    setup = db.get_youtube_setup()
    vd = setup["visual_defaults"]
    audio_path = _local_path(track["audio_url"])  # /media/... → /data/...
    cover_path = _local_path(p["cover_url"])
    out = video.generate(
        pipeline_id=p["id"], audio_path=audio_path, cover_path=cover_path,
        genre=track["genre"], duration_sec=track.get("duration_sec", 120),
        resolution=vd["resolution"], style=vd["style"],
    )
    return {"next_state": "video_pending", "fields": {"video_url": out["url"]}}


async def _run_thumb(p, track, feedback):
    video_path = _local_path(p["video_url"])
    out = thumb.generate(pipeline_id=p["id"], video_path=video_path,
                          track_title=track["title"], overlay_text=True)
    return {"next_state": "thumb_pending", "fields": {"thumbnail_url": out["url"]}}


async def _run_meta(p, track, feedback):
    setup = db.get_youtube_setup()
    trend_top = _get_trend_top()
    out = await metadata.generate(
        track=track, template=setup["metadata_template"],
        trend_keywords=trend_top, feedback=feedback,
    )
    return {"next_state": "meta_pending",
            "fields": {"metadata_json": json.dumps(out, ensure_ascii=False)}}


async def _run_review(p, track):
    setup = db.get_youtube_setup()
    meta = json.loads(p["metadata_json"])
    result = await review.run_4_axis(
        pipeline=p, track=track,
        video_meta={"length_sec": track.get("duration_sec", 120),
                    "resolution": setup["visual_defaults"]["resolution"]},
        metadata=meta, thumbnail_url=p["thumbnail_url"],
        trend_top=_get_trend_top(),
        weights=setup["review_weights"], threshold=setup["review_threshold"],
    )
    return {"next_state": "publish_pending",
            "fields": {"review_json": json.dumps(result, ensure_ascii=False)}}


async def _run_publish(p, track):
    setup = db.get_youtube_setup()
    meta = json.loads(p["metadata_json"])
    privacy = setup["publish_policy"].get("privacy", "private")
    result = youtube.upload_video(
        video_path=_local_path(p["video_url"]),
        thumbnail_path=_local_path(p["thumbnail_url"]),
        metadata=meta, privacy=privacy,
    )
    return {"next_state": "published",
            "fields": {"youtube_video_id": result["video_id"]}}


def _local_path(media_url: str) -> str:
    """ /media/videos/123/cover.jpg → /app/data/videos/123/cover.jpg """
    import os
    base_media = os.getenv("VIDEO_MEDIA_BASE", "/media/videos")
    base_data  = os.getenv("VIDEO_DATA_DIR",   "/app/data/videos")
    if media_url.startswith(base_media):
        return media_url.replace(base_media, base_data, 1)
    # /media/music/abc.mp3 → /app/data/music/abc.mp3
    return media_url.replace("/media/", "/app/data/", 1)


def _get_trend_top(n: int = 10) -> list[str]:
    try:
        rows = db.get_market_trends(limit=n)  # 기존 market_trends 테이블
        return [r["genre"] for r in rows]
    except Exception:
        return []
  • Step 4: Add endpoints to main.py

app/main.py에 다음 추가 (기존 endpoints 아래):

from fastapi import BackgroundTasks
from pydantic import BaseModel
from app.pipeline import orchestrator, youtube as yt_module
from app.pipeline.state_machine import (
    next_state_on_approve, next_state_on_reject, USER_GATES,
)


class PipelineCreate(BaseModel):
    track_id: int


class FeedbackRequest(BaseModel):
    step: str
    intent: str            # approve | reject
    feedback_text: str | None = None


@app.post("/api/music/pipeline", status_code=201)
def create_pipeline(req: PipelineCreate):
    # 동일 트랙 활성 파이프라인 중복 방지
    actives = db.list_pipelines(active_only=True)
    if any(p["track_id"] == req.track_id for p in actives):
        raise HTTPException(409, "이미 진행 중인 파이프라인이 있습니다")
    pid = db.create_pipeline(req.track_id)
    return db.get_pipeline(pid)


@app.get("/api/music/pipeline")
def list_pipelines_endpoint(status: str = "all"):
    pipelines = db.list_pipelines(active_only=(status == "active"))
    return {"pipelines": pipelines}


@app.get("/api/music/pipeline/{pid}")
def get_pipeline_endpoint(pid: int):
    p = db.get_pipeline(pid)
    if not p:
        raise HTTPException(404)
    p["jobs"] = db.list_pipeline_jobs(pid)
    p["feedback"] = db.get_feedback_history(pid)
    return p


@app.post("/api/music/pipeline/{pid}/start", status_code=202)
async def start_pipeline(pid: int, bg: BackgroundTasks):
    p = db.get_pipeline(pid)
    if not p:
        raise HTTPException(404)
    if p["state"] != "created":
        raise HTTPException(409, f"이미 시작됨 ({p['state']})")
    bg.add_task(orchestrator.run_step, pid, "cover")
    return {"ok": True}


@app.post("/api/music/pipeline/{pid}/feedback", status_code=202)
async def feedback(pid: int, req: FeedbackRequest, bg: BackgroundTasks):
    p = db.get_pipeline(pid)
    if not p:
        raise HTTPException(404)
    if p["state"] == "awaiting_manual":
        raise HTTPException(409, "수동 개입 대기 중")
    state = p["state"]
    expected = f"{req.step}_pending"
    if state != expected:
        # 멱등 처리 — 이미 다음 단계로 넘어갔으면 무시
        return {"ok": True, "skipped": True}

    if req.intent == "approve":
        next_st = next_state_on_approve(state)
        db.update_pipeline_state(pid, next_st)
        # 다음 단계가 *_pending이면 자동 생성 트리거; ai_review/publishing도 자동 트리거
        next_step = _state_to_step(next_st)
        if next_step:
            bg.add_task(orchestrator.run_step, pid, next_step)
        return {"ok": True}

    elif req.intent == "reject":
        count = db.increment_feedback_count(pid, req.step)
        if count > 5:
            db.update_pipeline_state(pid, "awaiting_manual")
            raise HTTPException(409, "재생성 한도 초과")
        if req.feedback_text:
            db.record_feedback(pid, req.step, req.feedback_text)
        bg.add_task(orchestrator.run_step, pid, req.step, req.feedback_text or "")
        return {"ok": True}

    else:
        raise HTTPException(400, f"unknown intent: {req.intent}")


def _state_to_step(state: str) -> str | None:
    return {
        "video_pending":   "video",
        "thumb_pending":   "thumb",
        "meta_pending":    "meta",
        "ai_review":       "review",
        "publish_pending": None,    # 사용자 명시 발행 호출 필요
        "publishing":      "publish",
    }.get(state)


@app.post("/api/music/pipeline/{pid}/cancel")
def cancel_pipeline(pid: int):
    p = db.get_pipeline(pid)
    if not p:
        raise HTTPException(404)
    db.update_pipeline_state(pid, "cancelled")
    return {"ok": True}


@app.post("/api/music/pipeline/{pid}/publish", status_code=202)
async def publish_pipeline(pid: int, bg: BackgroundTasks):
    p = db.get_pipeline(pid)
    if not p:
        raise HTTPException(404)
    if p["state"] != "publish_pending":
        raise HTTPException(409, f"발행 단계 아님 ({p['state']})")
    db.update_pipeline_state(pid, "publishing")
    bg.add_task(orchestrator.run_step, pid, "publish")
    return {"ok": True}


# --- Setup ---

class SetupRequest(BaseModel):
    metadata_template: dict | None = None
    cover_prompts: dict | None = None
    review_weights: dict | None = None
    review_threshold: int | None = None
    visual_defaults: dict | None = None
    publish_policy: dict | None = None


@app.get("/api/music/setup")
def get_setup():
    return db.get_youtube_setup()


@app.put("/api/music/setup")
def put_setup(req: SetupRequest):
    db.update_youtube_setup(**{k: v for k, v in req.dict().items() if v is not None})
    return db.get_youtube_setup()


# --- YouTube OAuth ---

@app.get("/api/music/youtube/auth-url")
def youtube_auth_url():
    return {"url": yt_module.get_auth_url()}


@app.get("/api/music/youtube/callback")
async def youtube_callback(code: str):
    info = await yt_module.exchange_code(code)
    return info


@app.post("/api/music/youtube/disconnect")
def youtube_disconnect():
    yt_module.disconnect()
    return {"ok": True}


@app.get("/api/music/youtube/status")
def youtube_status():
    return yt_module.get_status() or {"connected": False}
  • Step 5: Run tests

Run: python -m pytest tests/test_pipeline_endpoints.py -v Expected: 8 PASS (BackgroundTask는 TestClient에서 동기 실행 — 테스트는 mock orchestrator 또는 첫 단계만 검증)

참고: BackgroundTasks가 실제 cover.generate를 호출하면 OPENAI_API_KEY 없이 그라데이션 폴백을 만든다. 테스트에서는 monkeypatch.setattr(orchestrator, "run_step", AsyncMock())로 우회해도 됨. 위 테스트는 상태 검증 위주라 OK.

  • Step 6: Commit
git add music-lab/app/pipeline/orchestrator.py music-lab/app/main.py \
        music-lab/tests/test_pipeline_endpoints.py
git commit -m "feat(music-lab): pipeline 오케스트레이터 + 13개 엔드포인트"

Task 9: agent-office 자연어 의도 분류

Files:

  • Create: agent-office/app/agents/classify_intent.py

  • Test: agent-office/tests/test_classify_intent.py

  • Step 1: Write failing test

# tests/test_classify_intent.py
import pytest
import respx
from httpx import Response
from app.agents import classify_intent as ci


def test_clear_approve_no_llm(monkeypatch):
    called = {"n": 0}
    monkeypatch.setattr(ci, "_llm_classify", lambda t: (called.update(n=called["n"]+1), ("unclear", None))[1])
    assert ci.classify("승인") == ("approve", None)
    assert ci.classify("OK") == ("approve", None)
    assert ci.classify("진행") == ("approve", None)
    assert ci.classify("agree") == ("approve", None)
    assert called["n"] == 0


def test_clear_reject_only_no_llm(monkeypatch):
    monkeypatch.setattr(ci, "_llm_classify", lambda t: ("unclear", None))
    assert ci.classify("반려") == ("reject", None)
    assert ci.classify("거절") == ("reject", None)


def test_reject_with_text_split(monkeypatch):
    monkeypatch.setattr(ci, "_llm_classify", lambda t: ("unclear", None))
    intent, fb = ci.classify("반려, 제목 짧게")
    assert intent == "reject"
    assert "제목 짧게" in fb


@respx.mock
def test_ambiguous_calls_llm(monkeypatch):
    monkeypatch.setenv("ANTHROPIC_API_KEY", "k")
    respx.post("https://api.anthropic.com/v1/messages").mock(
        return_value=Response(200, json={"content": [{"type": "text",
            "text": '{"intent":"reject","feedback":"좀 더 화려하게"}'}]})
    )
    intent, fb = ci.classify("음... 좀 더 화려한 분위기가 좋겠어")
    assert intent == "reject"
    assert "화려하게" in fb
  • Step 2: Run, verify fail

Run: cd agent-office && python -m pytest tests/test_classify_intent.py -v Expected: ImportError

  • Step 3: Implement classify_intent.py
"""텔레그램 사용자 응답 자연어 분류 — 화이트리스트 우선, 모호 시 LLM."""
import os
import json
import logging
import httpx

logger = logging.getLogger("agent-office.classify_intent")

ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY", "")
CLAUDE_HAIKU = os.getenv("CLAUDE_HAIKU_MODEL", "claude-haiku-4-5-20251001")

APPROVE_WORDS = {
    "승인", "시작", "진행", "ok", "okay", "agree",
    "네", "예", "좋아", "좋아요", "go", "yes", "y",
}
REJECT_WORDS = {"반려", "거절", "취소", "no", "nope", "n"}


def classify(text: str) -> tuple[str, str | None]:
    """returns (intent, feedback) — intent ∈ {approve, reject, unclear}"""
    if not text:
        return ("unclear", None)
    t = text.strip().lower()
    if t in APPROVE_WORDS:
        return ("approve", None)
    if t in REJECT_WORDS:
        return ("reject", None)
    # 반려 단어로 시작 + 추가 텍스트
    for w in REJECT_WORDS:
        if t.startswith(w):
            rest = text.strip()[len(w):].lstrip(" ,.-:").strip()
            if rest:
                return ("reject", rest)
    # 승인 단어로 시작 + 추가 텍스트(추가 텍스트 무시)
    for w in APPROVE_WORDS:
        if t.startswith(w + " ") or t == w:
            return ("approve", None)
    return _llm_classify(text)


def _llm_classify(text: str) -> tuple[str, str | None]:
    if not ANTHROPIC_API_KEY:
        return ("unclear", None)
    prompt = (
        "사용자 응답을 분류하세요. JSON으로만 응답.\n"
        f'응답: "{text}"\n\n'
        '출력: {"intent":"approve|reject|unclear","feedback":"반려면 수정 방향, 아니면 빈 문자열"}'
    )
    try:
        resp = httpx.post(
            "https://api.anthropic.com/v1/messages",
            headers={"x-api-key": ANTHROPIC_API_KEY, "anthropic-version": "2023-06-01"},
            json={"model": CLAUDE_HAIKU, "max_tokens": 200,
                  "messages": [{"role": "user", "content": prompt}]},
            timeout=15,
        )
        resp.raise_for_status()
        text_out = resp.json()["content"][0]["text"]
        data = json.loads(text_out[text_out.find("{"):text_out.rfind("}")+1])
        return (data.get("intent", "unclear"), data.get("feedback") or None)
    except Exception as e:
        logger.warning("LLM 분류 실패: %s", e)
        return ("unclear", None)
  • Step 4: Run tests

Run: python -m pytest tests/test_classify_intent.py -v Expected: 4 PASS

  • Step 5: Commit
git add agent-office/app/agents/classify_intent.py agent-office/tests/test_classify_intent.py
git commit -m "feat(agent-office): 텔레그램 자연어 의도 분류"

Task 10: youtube_publisher 에이전트 + 폴링 잡

Files:

  • Create: agent-office/app/agents/youtube_publisher.py

  • Modify: agent-office/app/agents/__init__.py

  • Modify: agent-office/app/scheduler.py

  • Modify: agent-office/app/service_proxy.py

  • Modify: agent-office/app/telegram/conversational.py

  • Test: agent-office/tests/test_pipeline_polling.py

  • Step 1: Add service_proxy helpers

service_proxy.py에 추가:

async def list_active_pipelines() -> list[dict]:
    async with httpx.AsyncClient(timeout=15) as client:
        resp = await client.get(f"{MUSIC_LAB_URL}/api/music/pipeline?status=active")
        resp.raise_for_status()
        return resp.json()["pipelines"]


async def get_pipeline(pid: int) -> dict:
    async with httpx.AsyncClient(timeout=15) as client:
        resp = await client.get(f"{MUSIC_LAB_URL}/api/music/pipeline/{pid}")
        resp.raise_for_status()
        return resp.json()


async def post_pipeline_feedback(pid: int, step: str, intent: str,
                                   feedback_text: str | None = None) -> dict:
    async with httpx.AsyncClient(timeout=15) as client:
        resp = await client.post(
            f"{MUSIC_LAB_URL}/api/music/pipeline/{pid}/feedback",
            json={"step": step, "intent": intent, "feedback_text": feedback_text},
        )
        resp.raise_for_status()
        return resp.json()
  • Step 2: Implement youtube_publisher
# agents/youtube_publisher.py
"""텔레그램 단일 채널로 단계별 승인 인터랙션 오케스트레이션."""
import logging
from typing import Optional

from .base import BaseAgent
from . import classify_intent
from .. import service_proxy
from ..db import add_log
from ..telegram.messaging import send_raw

logger = logging.getLogger("agent-office.youtube_publisher")


_STEP_TITLES = {
    "cover_pending": ("커버 아트", "cover"),
    "video_pending": ("영상 비주얼", "video"),
    "thumb_pending": ("썸네일", "thumb"),
    "meta_pending":  ("메타데이터", "meta"),
    "publish_pending": ("최종 검토 + 발행", "publish"),
}


class YoutubePublisherAgent(BaseAgent):
    agent_id = "youtube_publisher"
    display_name = "YouTube 퍼블리셔"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._notified_state_per_pipeline: dict[int, str] = {}

    async def poll_state_changes(self) -> None:
        """30초마다 호출되어 *_pending 신규 진입 시 텔레그램 발송."""
        try:
            pipelines = await service_proxy.list_active_pipelines()
        except Exception as e:
            logger.warning("폴링 실패: %s", e)
            return

        for p in pipelines:
            state = p["state"]
            pid = p["id"]
            if state in _STEP_TITLES and self._notified_state_per_pipeline.get(pid) != state:
                await self._notify_step(p)
                self._notified_state_per_pipeline[pid] = state

    async def _notify_step(self, pipeline: dict) -> None:
        state = pipeline["state"]
        title_name, step = _STEP_TITLES[state]
        body = self._format_body(pipeline, step)
        sent = await send_raw(
            text=f"🎵 [{pipeline.get('track_title', 'Pipeline')}] {title_name} 검토\n\n{body}\n\n"
                 f"➡️ 답장으로 알려주세요: '승인' 또는 '반려 + 수정 방향'",
            metadata={"pipeline_id": pipeline["id"], "step": step},
        )
        if sent.get("ok"):
            add_log(self.agent_id, f"pipeline {pipeline['id']} {step} 알림 전송", "info")

    def _format_body(self, p: dict, step: str) -> str:
        if step == "cover":
            return f"🖼️ 커버: {p.get('cover_url', '-')}"
        if step == "video":
            return f"🎬 영상: {p.get('video_url', '-')}"
        if step == "thumb":
            return f"🎴 썸네일: {p.get('thumbnail_url', '-')}"
        if step == "meta":
            m = p.get("metadata", {})
            return (f"📝 제목: {m.get('title','')}\n"
                    f"🏷️ 태그: {', '.join(m.get('tags', [])[:8])}\n"
                    f"📄 설명(앞부분): {(m.get('description','') or '')[:200]}")
        if step == "publish":
            r = p.get("review", {})
            return (f"AI 검토 결과: {r.get('verdict','?')} "
                    f"(가중 {r.get('weighted_total','?')}/100)\n"
                    f"{r.get('summary','')}")
        return ""

    async def on_telegram_reply(self, pipeline_id: int, step: str, user_text: str) -> None:
        intent, feedback = classify_intent.classify(user_text)
        if intent == "unclear":
            await send_raw("다시 입력해주세요. 예: '승인' 또는 '반려, 제목 짧게'")
            return
        try:
            await service_proxy.post_pipeline_feedback(pipeline_id, step, intent, feedback)
        except Exception as e:
            await send_raw(f"⚠️ 처리 실패: {e}")

    async def on_schedule(self) -> None:
        # 폴링은 스케줄러에서 호출
        await self.poll_state_changes()

    async def on_command(self, command: str, params: dict) -> dict:
        return {"ok": False, "message": f"Unknown command: {command}"}

    async def on_approval(self, task_id: str, approved: bool, feedback: str = "") -> None:
        pass
  • Step 3: Register agent + scheduler

agents/__init__.py에 등록:

from .youtube_publisher import YoutubePublisherAgent

AGENT_REGISTRY = {
    # ... existing ...
    "youtube_publisher": YoutubePublisherAgent(...),
}

scheduler.py에 추가:

async def _poll_pipelines():
    agent = AGENT_REGISTRY.get("youtube_publisher")
    if agent:
        await agent.poll_state_changes()


def init_scheduler():
    # ... existing jobs ...
    scheduler.add_job(_poll_pipelines, "interval", seconds=30, id="pipeline_poll")
    scheduler.start()
  • Step 4: Telegram reply routing

telegram/conversational.py에서 reply 메시지 수신 시:

# 기존 핸들러에 추가
async def handle_reply(message: dict) -> None:
    reply_to = message.get("reply_to_message", {})
    msg_id = reply_to.get("message_id")
    if not msg_id:
        return
    # DB의 last_telegram_msg_ids에서 pipeline_id, step 찾기
    # (이는 messaging.send_raw가 metadata에 저장해두는 게 깔끔하지만,
    #  현재 스펙은 단순하게 매칭 — 또는 DB 별도 테이블 telegram_message_links 추가)
    from .. import service_proxy
    from ..agents import AGENT_REGISTRY

    # 단순 구현: pipeline_id/step을 메시지 캡션 또는 DB에서 조회
    link = _lookup_message_link(msg_id)
    if not link:
        return
    agent = AGENT_REGISTRY.get("youtube_publisher")
    await agent.on_telegram_reply(link["pipeline_id"], link["step"], message.get("text", ""))

Note

: _lookup_message_link는 별도 테이블(telegram_message_links) 또는 video_pipelines.last_telegram_msg_ids JSON 사용. 본 task에선 후자 사용 — 추가 마이그레이션 없이 기존 컬럼 활용.

messaging.send_raw 호출 후 반환된 message_id를 music-lab에 저장하기 위해 service_proxy에 헬퍼 추가 후 youtube_publisher에서 _notify_step 끝에 호출.

# service_proxy.py
async def save_pipeline_telegram_msg(pid: int, step: str, msg_id: int) -> None:
    async with httpx.AsyncClient(timeout=10) as client:
        await client.patch(
            f"{MUSIC_LAB_URL}/api/music/pipeline/{pid}/telegram-msg",
            json={"step": step, "message_id": msg_id},
        )

music-lab main.py에 endpoint 추가:

class TelegramMsgPatch(BaseModel):
    step: str
    message_id: int

@app.patch("/api/music/pipeline/{pid}/telegram-msg")
def save_telegram_msg(pid: int, req: TelegramMsgPatch):
    p = db.get_pipeline(pid)
    if not p: raise HTTPException(404)
    ids = p["last_telegram_msg_ids"]
    ids[req.step] = req.message_id
    conn = sqlite3.connect(db.DB_PATH)
    conn.execute("UPDATE video_pipelines SET last_telegram_msg_ids = ?, updated_at = ? WHERE id = ?",
                 (json.dumps(ids), db._now(), pid))
    conn.commit(); conn.close()
    return {"ok": True}

reply 매칭용 lookup endpoint:

@app.get("/api/music/pipeline/lookup-by-msg/{msg_id}")
def lookup_by_msg(msg_id: int):
    # DB 전체 스캔 (소수의 active 파이프라인만 — 성능 OK)
    for p in db.list_pipelines(active_only=True):
        for step, mid in p["last_telegram_msg_ids"].items():
            if mid == msg_id:
                return {"pipeline_id": p["id"], "step": step}
    raise HTTPException(404)

agent-office _lookup_message_link 구현:

def _lookup_message_link(msg_id: int) -> Optional[dict]:
    import httpx
    from ..config import MUSIC_LAB_URL
    try:
        resp = httpx.get(f"{MUSIC_LAB_URL}/api/music/pipeline/lookup-by-msg/{msg_id}", timeout=5)
        if resp.status_code == 200:
            return resp.json()
    except Exception:
        pass
    return None
  • Step 5: Polling test
# tests/test_pipeline_polling.py
import pytest
from unittest.mock import AsyncMock, patch
from app.agents.youtube_publisher import YoutubePublisherAgent


@pytest.mark.asyncio
@patch("app.agents.youtube_publisher.service_proxy.list_active_pipelines",
       new=AsyncMock(return_value=[{"id": 1, "state": "cover_pending", "cover_url": "/x.jpg"}]))
@patch("app.agents.youtube_publisher.send_raw", new=AsyncMock(return_value={"ok": True, "message_id": 99}))
async def test_poll_notifies_once_per_state(monkeypatch):
    a = YoutubePublisherAgent(ws_manager=None)
    await a.poll_state_changes()
    await a.poll_state_changes()  # 같은 상태 — 두 번째는 알림 안 함
    from app.agents.youtube_publisher import send_raw as sr
    assert sr.call_count == 1
  • Step 6: Run tests

Run: python -m pytest tests/test_pipeline_polling.py -v Expected: PASS

  • Step 7: Commit
git add agent-office/app/agents/youtube_publisher.py \
        agent-office/app/agents/__init__.py \
        agent-office/app/scheduler.py \
        agent-office/app/service_proxy.py \
        agent-office/app/telegram/conversational.py \
        agent-office/tests/test_pipeline_polling.py \
        music-lab/app/main.py
git commit -m "feat(agent-office): youtube_publisher 에이전트 + 30s 폴링"

Task 11: 프론트엔드 — api.js 헬퍼

Files:

  • Modify: web-ui/src/api.js

  • Step 1: Add helpers

src/api.js 끝에 추가:

// --- Music Pipeline ---
export const listPipelines       = (status='all')   => apiGet(`/api/music/pipeline?status=${status}`);
export const getPipeline         = (id)             => apiGet(`/api/music/pipeline/${id}`);
export const createPipeline      = (track_id)       => apiPost('/api/music/pipeline', { track_id });
export const startPipeline       = (id)             => apiPost(`/api/music/pipeline/${id}/start`);
export const cancelPipeline      = (id)             => apiPost(`/api/music/pipeline/${id}/cancel`);
export const publishPipeline     = (id)             => apiPost(`/api/music/pipeline/${id}/publish`);

// --- Music Setup ---
export const getMusicSetup       = ()               => apiGet('/api/music/setup');
export const updateMusicSetup    = (payload)        => apiPut('/api/music/setup', payload);

// --- YouTube OAuth ---
export const getYoutubeAuthUrl   = ()               => apiGet('/api/music/youtube/auth-url');
export const getYoutubeStatus    = ()               => apiGet('/api/music/youtube/status');
export const disconnectYoutube   = ()               => apiPost('/api/music/youtube/disconnect');
  • Step 2: Commit
git -C web-ui add src/api.js
git -C web-ui commit -m "feat(web-ui): pipeline/setup/youtube API 헬퍼"

Task 12: 프론트엔드 — SetupTab

Files:

  • Create: web-ui/src/pages/music/components/SetupTab.jsx

  • Modify: web-ui/src/pages/music/MusicStudio.css (스타일)

  • Step 1: Implement SetupTab

// SetupTab.jsx
import { useEffect, useState } from 'react';
import {
    getMusicSetup, updateMusicSetup,
    getYoutubeAuthUrl, getYoutubeStatus, disconnectYoutube,
} from '../../../api';

export default function SetupTab() {
    const [setup, setSetup] = useState(null);
    const [yt, setYt] = useState(null);
    const [saving, setSaving] = useState(false);
    const [error, setError] = useState('');

    useEffect(() => {
        Promise.all([getMusicSetup(), getYoutubeStatus()])
            .then(([s, y]) => { setSetup(s); setYt(y); })
            .catch(e => setError(String(e)));
    }, []);

    if (!setup) return <p className="ms-loading">Loading</p>;

    const save = async (patch) => {
        setSaving(true);
        try {
            const next = await updateMusicSetup(patch);
            setSetup(next);
        } catch (e) { setError(String(e)); }
        finally { setSaving(false); }
    };

    const connectYoutube = async () => {
        const { url } = await getYoutubeAuthUrl();
        window.location.href = url;  // OAuth flow
    };

    return (
        <div className="setup-container">
            {error && <div className="ms-error">{error}</div>}

            <section className="setup-card">
                <h3>YouTube 채널 연동</h3>
                {yt && yt.channel_id ? (
                    <div className="setup-channel">
                        <img src={yt.avatar_url} alt="" className="setup-avatar" />
                        <span>{yt.channel_title}</span>
                        <button onClick={async () => { await disconnectYoutube(); setYt({}); }}>
                            연결 해제
                        </button>
                    </div>
                ) : (
                    <button className="button primary" onClick={connectYoutube}>
                        Google 계정 연결
                    </button>
                )}
            </section>

            <section className="setup-card">
                <h3>메타데이터 템플릿</h3>
                <label>제목 패턴
                    <input
                        value={setup.metadata_template.title}
                        onChange={e => setSetup(s => ({...s, metadata_template: {...s.metadata_template, title: e.target.value}}))}
                    />
                </label>
                <label>설명 템플릿
                    <textarea
                        rows={6}
                        value={setup.metadata_template.description}
                        onChange={e => setSetup(s => ({...s, metadata_template: {...s.metadata_template, description: e.target.value}}))}
                    />
                </label>
                <label>기본 태그 (쉼표 구분)
                    <input
                        value={(setup.metadata_template.tags || []).join(', ')}
                        onChange={e => setSetup(s => ({...s, metadata_template: {...s.metadata_template,
                            tags: e.target.value.split(',').map(t => t.trim()).filter(Boolean)}}))}
                    />
                </label>
                <button onClick={() => save({ metadata_template: setup.metadata_template })}>저장</button>
            </section>

            <section className="setup-card">
                <h3>AI 커버 prompt (장르별)</h3>
                {Object.entries(setup.cover_prompts).map(([g, p]) => (
                    <div key={g} className="setup-prompt-row">
                        <span className="setup-prompt-genre">{g}</span>
                        <input
                            value={p}
                            onChange={e => setSetup(s => ({...s, cover_prompts: {...s.cover_prompts, [g]: e.target.value}}))}
                        />
                    </div>
                ))}
                <button onClick={() => save({ cover_prompts: setup.cover_prompts })}>저장</button>
            </section>

            <section className="setup-card">
                <h3>AI 최종 검토 기준</h3>
                {['meta','policy','viewer','trend'].map(k => (
                    <label key={k}>
                        {k} 가중치 ({setup.review_weights[k]})
                        <input type="range" min="0" max="100"
                            value={setup.review_weights[k]}
                            onChange={e => setSetup(s => ({...s, review_weights: {...s.review_weights, [k]: parseInt(e.target.value)}}))}
                        />
                    </label>
                ))}
                <label>임계값 ({setup.review_threshold})
                    <input type="range" min="0" max="100" value={setup.review_threshold}
                        onChange={e => setSetup(s => ({...s, review_threshold: parseInt(e.target.value)}))}
                    />
                </label>
                <button onClick={() => save({ review_weights: setup.review_weights, review_threshold: setup.review_threshold })}>저장</button>
            </section>

            <section className="setup-card">
                <h3>영상 비주얼 기본값</h3>
                <label>해상도
                    <select value={setup.visual_defaults.resolution}
                        onChange={e => setSetup(s => ({...s, visual_defaults: {...s.visual_defaults, resolution: e.target.value}}))}>
                        <option value="1920x1080">1920×1080 (가로)</option>
                        <option value="1080x1920">1080×1920 (세로/Shorts)</option>
                    </select>
                </label>
                <label>스타일
                    <select value={setup.visual_defaults.style}
                        onChange={e => setSetup(s => ({...s, visual_defaults: {...s.visual_defaults, style: e.target.value}}))}>
                        <option value="visualizer">Visualizer (파형)</option>
                    </select>
                </label>
                <button onClick={() => save({ visual_defaults: setup.visual_defaults })}>저장</button>
            </section>

            <section className="setup-card">
                <h3>발행 정책</h3>
                <label>privacy
                    <select value={setup.publish_policy.privacy}
                        onChange={e => setSetup(s => ({...s, publish_policy: {...s.publish_policy, privacy: e.target.value}}))}>
                        <option value="private">Private (비공개)</option>
                        <option value="unlisted">Unlisted</option>
                        <option value="public">Public</option>
                    </select>
                </label>
                <button onClick={() => save({ publish_policy: setup.publish_policy })}>저장</button>
            </section>

            {saving && <div className="setup-saving">저장 ...</div>}
        </div>
    );
}
  • Step 2: Add CSS

MusicStudio.css 끝에 추가:

.setup-container { display:flex; flex-direction:column; gap:16px; padding:16px; }
.setup-card { background:rgba(0,0,0,.3); border:1px solid var(--line); border-radius:14px; padding:16px; }
.setup-card h3 { margin:0 0 12px; font-size:15px; color:var(--text); }
.setup-card label { display:block; margin:8px 0; font-size:12px; color:var(--muted); }
.setup-card input, .setup-card textarea, .setup-card select {
    width:100%; padding:8px; margin-top:4px; background:rgba(255,255,255,.04);
    border:1px solid var(--line); border-radius:8px; color:var(--text); font-size:13px;
}
.setup-channel { display:flex; align-items:center; gap:12px; }
.setup-avatar { width:32px; height:32px; border-radius:50%; }
.setup-prompt-row { display:flex; gap:8px; margin:6px 0; }
.setup-prompt-genre { width:80px; font-size:12px; color:var(--muted); padding-top:8px; }
.setup-saving { position:fixed; bottom:16px; right:16px; background:#222; padding:8px 12px; border-radius:8px; }
  • Step 3: Manual UI verify

Run: cd web-ui && npm run dev Open: http://localhost:3007/lab/music → YouTube 탭 → (still need 6번 추가 후) 구성 탭. Verify 카드 6개 렌더링.

  • Step 4: Commit
git -C web-ui add src/pages/music/components/SetupTab.jsx src/pages/music/MusicStudio.css
git -C web-ui commit -m "feat(web-ui): SetupTab — YouTube 자동화 구성 허브"

Task 13: 프론트엔드 — PipelineTab + 카드 + 시작 모달

Files:

  • Create: web-ui/src/pages/music/components/PipelineCard.jsx

  • Create: web-ui/src/pages/music/components/PipelineStartModal.jsx

  • Create: web-ui/src/pages/music/components/PipelineTab.jsx

  • Modify: web-ui/src/pages/music/MusicStudio.css

  • Step 1: PipelineCard

// PipelineCard.jsx
import { cancelPipeline, publishPipeline } from '../../../api';

const STEP_LABELS = ['커버','영상','썸네','메타','검토','발행'];
const STEP_KEYS   = ['cover','video','thumb','meta','review','publish'];

function stepIndex(state) {
    if (state.startsWith('cover'))   return 0;
    if (state.startsWith('video'))   return 1;
    if (state.startsWith('thumb'))   return 2;
    if (state.startsWith('meta'))    return 3;
    if (state.startsWith('ai_review') || state.startsWith('publish_pending')) return 4;
    if (state.startsWith('publish')) return 5;
    if (state === 'published')       return 6;
    return -1;
}

export default function PipelineCard({ pipeline, onChanged }) {
    const i = stepIndex(pipeline.state);
    const isPending = pipeline.state.endsWith('_pending');
    return (
        <div className="pipeline-card">
            <div className="pipeline-card__head">
                <h4>{pipeline.track_title || `Track #${pipeline.track_id}`}</h4>
                {!['published','cancelled','failed'].includes(pipeline.state) && (
                    <button onClick={async () => { await cancelPipeline(pipeline.id); onChanged(); }}>
                        취소
                    </button>
                )}
            </div>
            <div className="pipeline-progress">
                {STEP_LABELS.map((lbl, idx) => (
                    <div key={lbl} className={`pipeline-dot ${idx <= i ? 'is-done' : ''} ${idx === i ? 'is-current' : ''}`}>
                        <span>{lbl}</span>
                    </div>
                ))}
            </div>
            <div className="pipeline-state">현재: {pipeline.state}</div>
            {pipeline.review && (
                <div className="pipeline-review">
                    AI 검토: <strong>{pipeline.review.verdict}</strong>
                    ({pipeline.review.weighted_total}/100)
                </div>
            )}
            {pipeline.state === 'publish_pending' && (
                <button className="button primary"
                    onClick={async () => { await publishPipeline(pipeline.id); onChanged(); }}>
                    YouTube 업로드
                </button>
            )}
            {pipeline.youtube_video_id && (
                <a href={`https://youtu.be/${pipeline.youtube_video_id}`} target="_blank" rel="noreferrer">
                    유튜브에서 보기
                </a>
            )}
            {pipeline.feedback && pipeline.feedback.length > 0 && (
                <details className="pipeline-feedback">
                    <summary>피드백 히스토리 ({pipeline.feedback.length})</summary>
                    {pipeline.feedback.map(f => (
                        <div key={f.id}> [{f.step}] {f.feedback_text}</div>
                    ))}
                </details>
            )}
        </div>
    );
}
  • Step 2: PipelineStartModal
// PipelineStartModal.jsx
import { useState } from 'react';
import { createPipeline, startPipeline } from '../../../api';

export default function PipelineStartModal({ library, onClose, onCreated }) {
    const [tid, setTid] = useState(library?.[0]?.id || '');
    const [error, setError] = useState('');

    const submit = async () => {
        try {
            const p = await createPipeline(parseInt(tid));
            await startPipeline(p.id);
            onCreated(p);
        } catch (e) { setError(String(e)); }
    };

    return (
        <div className="modal-overlay" onClick={onClose}>
            <div className="modal-body" onClick={e => e.stopPropagation()}>
                <h3> 파이프라인 시작</h3>
                <select value={tid} onChange={e => setTid(e.target.value)}>
                    {(library || []).map(t => (
                        <option key={t.id} value={t.id}>{t.title} ({t.genre})</option>
                    ))}
                </select>
                {error && <div className="ms-error">{error}</div>}
                <div className="modal-actions">
                    <button onClick={onClose}>취소</button>
                    <button className="button primary" onClick={submit}>시작</button>
                </div>
            </div>
        </div>
    );
}
  • Step 3: PipelineTab
// PipelineTab.jsx
import { useEffect, useState, useRef } from 'react';
import { listPipelines } from '../../../api';
import PipelineCard from './PipelineCard';
import PipelineStartModal from './PipelineStartModal';

export default function PipelineTab({ library }) {
    const [pipelines, setPipelines] = useState([]);
    const [filter, setFilter] = useState('active');
    const [modalOpen, setModalOpen] = useState(false);
    const timer = useRef(null);

    const load = async () => {
        try {
            const r = await listPipelines(filter);
            setPipelines(r.pipelines || []);
        } catch (e) { /* swallow */ }
    };

    useEffect(() => {
        load();
        timer.current = setInterval(load, 5000);
        return () => clearInterval(timer.current);
    }, [filter]);

    return (
        <div className="pipeline-container">
            <div className="pipeline-toolbar">
                <button className="button primary" onClick={() => setModalOpen(true)}>+  파이프라인</button>
                <select value={filter} onChange={e => setFilter(e.target.value)}>
                    <option value="active">진행 </option>
                    <option value="all">전체</option>
                </select>
            </div>
            <div className="pipeline-grid">
                {pipelines.map(p => (
                    <PipelineCard key={p.id} pipeline={p} onChanged={load} />
                ))}
                {pipelines.length === 0 && <p className="ms-empty">진행 중인 파이프라인이 없습니다</p>}
            </div>
            {modalOpen && (
                <PipelineStartModal
                    library={library}
                    onClose={() => setModalOpen(false)}
                    onCreated={() => { setModalOpen(false); load(); }}
                />
            )}
        </div>
    );
}
  • Step 4: CSS
.pipeline-container { padding:16px; }
.pipeline-toolbar { display:flex; gap:12px; margin-bottom:16px; }
.pipeline-grid { display:grid; grid-template-columns:repeat(auto-fill, minmax(320px, 1fr)); gap:16px; }
.pipeline-card { background:rgba(0,0,0,.3); border:1px solid var(--line); border-radius:14px; padding:16px; }
.pipeline-card__head { display:flex; justify-content:space-between; margin-bottom:12px; }
.pipeline-progress { display:flex; gap:6px; margin:12px 0; }
.pipeline-dot { flex:1; text-align:center; padding:6px 0; border-radius:8px;
                background:rgba(255,255,255,.05); font-size:11px; color:var(--muted); }
.pipeline-dot.is-done { background:rgba(56,189,248,.2); color:#bae6fd; }
.pipeline-dot.is-current { box-shadow:0 0 8px rgba(56,189,248,.6); }
.pipeline-state { font-size:13px; color:var(--text); margin:8px 0; }
.pipeline-review { font-size:12px; color:var(--muted); }
.pipeline-feedback { margin-top:12px; font-size:12px; }
.modal-overlay { position:fixed; inset:0; background:rgba(0,0,0,.6);
                display:flex; align-items:center; justify-content:center; z-index:1000; }
.modal-body { background:#1a1a2e; padding:24px; border-radius:14px; min-width:320px; }
.modal-actions { display:flex; justify-content:flex-end; gap:8px; margin-top:16px; }
  • Step 5: Commit
git -C web-ui add src/pages/music/components/PipelineCard.jsx \
                  src/pages/music/components/PipelineStartModal.jsx \
                  src/pages/music/components/PipelineTab.jsx \
                  src/pages/music/MusicStudio.css
git -C web-ui commit -m "feat(web-ui): PipelineTab — 진행 중 파이프라인 카드 보드"

Task 14: YoutubeTab 6 서브탭 연결 + Library 트리거

Files:

  • Modify: web-ui/src/pages/music/components/YoutubeTab.jsx

  • Modify: web-ui/src/pages/music/MusicStudio.jsx (Library 카드에 버튼)

  • Step 1: Update YoutubeTab

// YoutubeTab.jsx (replace existing)
import { useState, useEffect } from 'react';
import VideoProjectsTab from './VideoProjectsTab';
import RevenueTab from './RevenueTab';
import TrendsTab from './TrendsTab';
import CompileTab from './CompileTab';
import PipelineTab from './PipelineTab';
import SetupTab from './SetupTab';

export default function YoutubeTab({ library, initialTrackId, onClearInitialTrack, openPipelineFor }) {
    const [subtab, setSubtab] = useState('pipeline');

    useEffect(() => {
        if (initialTrackId) setSubtab('video');
        if (openPipelineFor) setSubtab('pipeline');
    }, [initialTrackId, openPipelineFor]);

    const tabs = [
        ['pipeline','🚀 진행'],
        ['video','🎬 영상 제작'],
        ['compile','🎵 컴파일'],
        ['trends','📊 시장 트렌드'],
        ['revenue','💰 수익 추적'],
        ['setup','⚙️ 구성'],
    ];

    return (
        <div className="yt-container">
            <nav className="yt-subtabs">
                {tabs.map(([key, label]) => (
                    <button key={key} type="button"
                        className={`yt-subtab ${subtab === key ? 'is-active' : ''}`}
                        onClick={() => setSubtab(key)}>{label}</button>
                ))}
            </nav>

            {subtab === 'pipeline' && <PipelineTab library={library} initialTrackId={openPipelineFor} />}
            {subtab === 'video'    && <VideoProjectsTab library={library} initialTrackId={initialTrackId} onClearInitialTrack={onClearInitialTrack} />}
            {subtab === 'compile'  && <CompileTab library={library} />}
            {subtab === 'trends'   && <TrendsTab />}
            {subtab === 'revenue'  && <RevenueTab />}
            {subtab === 'setup'    && <SetupTab />}
        </div>
    );
}
  • Step 2: Library 카드에 영상 파이프라인 트리거

MusicStudio.jsx에서 Library 트랙 카드 액션에 추가:

// 기존 트랙 카드 버튼 옆
<button
  type="button"
  className="ms-lib-action"
  onClick={() => {
      setActiveTab('youtube');
      setOpenPipelineFor(track.id);
  }}>
  🎬 영상 파이프라인
</button>

MusicStudio.jsx 상태에 openPipelineFor 추가, YoutubeTab에 prop 전달.

또한 PipelineTab에서 initialTrackId(prop)로 모달 자동 열기:

// PipelineTab.jsx 추가
useEffect(() => {
    if (initialTrackId) setModalOpen(true);
}, [initialTrackId]);
  • Step 3: Build + manual verify

Run: cd web-ui && npm run build && npm run dev Open: http://localhost:3007/lab/music Verify:

  • YouTube 탭에 6개 서브탭 (진행 / 영상 제작 / 컴파일 / 트렌드 / 수익 / 구성)

  • 구성 탭 카드 6개

  • 진행 탭 빈 상태 "진행 중인 파이프라인이 없습니다"

  • Library 카드의 "🎬 영상 파이프라인" 버튼 → 진행 탭 이동 + 시작 모달 자동

  • Step 4: Commit

git -C web-ui add src/pages/music/components/YoutubeTab.jsx \
                  src/pages/music/MusicStudio.jsx
git -C web-ui commit -m "feat(web-ui): YouTube 6 서브탭 + Library 영상 파이프라인 트리거"

Task 15: docker-compose env + 의존성 + 배포

Files:

  • Modify: docker-compose.yml

  • Modify: music-lab/Dockerfile (only if needs build deps)

  • Modify: nginx/conf.d/default.conf

  • Step 1: docker-compose env

docker-compose.yml music-lab 서비스 environment 섹션에 추가:

environment:
  # 기존 변수들...
  OPENAI_API_KEY: ${OPENAI_API_KEY}
  YOUTUBE_OAUTH_CLIENT_ID: ${YOUTUBE_OAUTH_CLIENT_ID}
  YOUTUBE_OAUTH_CLIENT_SECRET: ${YOUTUBE_OAUTH_CLIENT_SECRET}
  YOUTUBE_OAUTH_REDIRECT_URI: ${YOUTUBE_OAUTH_REDIRECT_URI}
  CLAUDE_HAIKU_MODEL: claude-haiku-4-5-20251001
  CLAUDE_SONNET_MODEL: claude-sonnet-4-6

.env (NAS local — 커밋 X)에 실제 값 추가.

  • Step 2: nginx OAuth callback 노출

nginx/conf.d/default.conf의 music-lab 라우트는 이미 /api/music/이 있어 자동 매칭. callback이 /api/music/youtube/callback이므로 추가 변경 불필요. 다만 외부 redirect URI는:

https://gahusb.synology.me/api/music/youtube/callback

Google Cloud Console OAuth 클라이언트 → 승인된 리디렉션 URI에 위 URL 등록 필수 (수동, 1회).

  • Step 3: Dockerfile (필요 시)

music-lab/Dockerfile이 base image에 libjpeg, fontconfig 등 없으면 추가:

RUN apt-get update && apt-get install -y --no-install-recommends \
    fonts-dejavu-core fontconfig && rm -rf /var/lib/apt/lists/*

(기존 video_producer.py가 PIL/FFmpeg 쓰고 있어 이미 있을 가능성 — 확인 후 필요 시만 추가)

  • Step 4: Commit + push (NAS 자동 배포)
git -C web-backend add docker-compose.yml music-lab/Dockerfile nginx/conf.d/default.conf
git -C web-backend commit -m "chore(infra): pipeline env + nginx callback 라우팅"
git -C web-backend push origin main

배포 완료 후 NAS .env에 값 채우고 docker compose up -d music-lab로 재시작.


Task 16: 통합 테스트 + 수동 E2E

  • Step 1: Integration test

music-lab/tests/test_pipeline_flow.py:

import pytest
from unittest.mock import AsyncMock, patch
from fastapi.testclient import TestClient
from app.main import app
from app import db


@pytest.fixture
def client(monkeypatch, tmp_path):
    monkeypatch.setattr(db, "DB_PATH", str(tmp_path / "music.db"))
    db.init_db()
    db.save_track({"id": 1, "title": "T", "genre": "lo-fi", "bpm": 85,
                   "key":"C","scale":"minor","moods":[],"instruments":[],
                   "audio_url":"/x.mp3","duration_sec":120})
    return TestClient(app)


@patch("app.pipeline.orchestrator.youtube.upload_video", return_value={"video_id": "VID999"})
@patch("app.pipeline.orchestrator.review.run_4_axis", new=AsyncMock(return_value={
    "metadata_quality":{"score":80,"notes":""},"policy_compliance":{"score":90,"issues":[]},
    "viewer_experience":{"score":80,"notes":""},"trend_alignment":{"score":70,"matched_keywords":[]},
    "weighted_total":80.0,"verdict":"pass","summary":"good","used_fallback":False}))
@patch("app.pipeline.orchestrator.metadata.generate", new=AsyncMock(return_value={
    "title":"X","description":"Y","tags":["lofi"],"category_id":10,"used_fallback":False,"error":None}))
@patch("app.pipeline.thumb.generate", return_value={"url":"/m/t.jpg","used_fallback":False})
@patch("app.pipeline.video.generate", return_value={"url":"/m/v.mp4","used_fallback":False,"duration_sec":120})
@patch("app.pipeline.cover.generate", new=AsyncMock(return_value={"url":"/m/c.jpg","used_fallback":False,"error":None}))
def test_full_pipeline_happy_path(*_, client):
    pid = client.post("/api/music/pipeline", json={"track_id": 1}).json()["id"]
    client.post(f"/api/music/pipeline/{pid}/start")
    # cover 자동 생성 후 cover_pending
    p = db.get_pipeline(pid)
    assert p["state"] == "cover_pending"
    # 4단계 모두 승인
    for step in ["cover", "video", "thumb", "meta"]:
        client.post(f"/api/music/pipeline/{pid}/feedback",
                     json={"step": step, "intent": "approve"})
    # ai_review 후 publish_pending
    p = db.get_pipeline(pid)
    assert p["state"] == "publish_pending"
    # 발행
    client.post(f"/api/music/pipeline/{pid}/publish")
    p = db.get_pipeline(pid)
    assert p["state"] == "published"
    assert p["youtube_video_id"] == "VID999"
  • Step 2: Run

Run: python -m pytest tests/test_pipeline_flow.py -v Expected: PASS

  • Step 3: Commit
git add music-lab/tests/test_pipeline_flow.py
git commit -m "test(music-lab): 풀 파이프라인 통합 테스트 (mock)"
git push origin main
  • Step 4: 수동 E2E 체크리스트

배포 후 운영 환경에서:

  • Google Cloud Console에서 OAuth 클라이언트 생성, redirect URI 등록
  • NAS .env에 OPENAI_API_KEY, YOUTUBE_OAUTH_* 채우고 music-lab 재시작
  • 구성 탭 → "Google 계정 연결" → 채널명 표시 확인
  • Library 트랙 → "🎬 영상 파이프라인" → 진행 탭 카드 생성
  • 텔레그램에 "커버 검토" 알림 도착 (커버 이미지 URL 포함)
  • 텔레그램에 "승인" 답장 → 다음 단계 카드 도착
  • 어떤 단계에서 "반려, 더 어둡게" 답장 → 같은 단계 재생성 + 알림 재도착
  • AI 검토 verdict + 점수 텔레그램 도착
  • "승인" → YouTube에 private 업로드 → URL 수신 + 진행 탭 카드 published
  • YouTube Studio에서 비공개 영상 확인

Self-Review

Spec coverage: 스펙 15개 섹션 → 본 plan 16 task 매핑 OK.

  • 1 배경: 의도만, plan에 직접 매핑 없음 (OK, 코드 안 만듦)
  • 2 비목표: 명시 (OK)
  • 3 사용자 흐름: Task 8(엔드포인트), Task 13(PipelineTab), Task 10(텔레그램)
  • 4 아키텍처: Task 8, 10
  • 5 상태 머신: Task 2
  • 6 프론트: Task 11, 12, 13, 14
  • 7 백엔드 상세: Task 3-9
  • 8 데이터 모델: Task 1
  • 9 API: Task 8 (12개 + telegram-msg + lookup-by-msg = 14개. 스펙은 12개 명시했으나 telegram 매칭용 2개 보완 필요로 추가)
  • 10 비동기 + 폴백: 각 Task의 fallback 분기에 포함
  • 11 에러 처리: Task 8(409, 404), Task 7(quota), Task 3(timeout 폴백) 등
  • 12 보안: 의존성 추가 외 별도 스텝 없음 (스펙도 "기본"이라 명시) → OK
  • 13 테스트 전략: Task 1, 2, 3, 5, 6, 7, 8, 9, 10, 16에 분산
  • 14 마이그레이션 / 환경: Task 1 (init_db), Task 7 (deps), Task 15 (compose)
  • 15 산출물 / 후속: 주석으로만, 코드 작업 없음

Placeholders: "TBD", "implement later" 등 검색 → 없음. "차후"는 비목표 명시용. Type consistency: state 값들(cover_pending 등) state_machine.py 한 곳에서 관리, 다른 모든 곳 일치. intent 값(approve/reject/unclear) 일관. 함수명 일관 (run_step, classify, next_state_on_approve). 스펙 커버 보정: telegram-msg 저장/조회 endpoint 2개를 plan Task 10에 명시(스펙 12개 + 운영용 2개). 큰 디자인 변경 아님.