# Plan-B-Video: New NAS video-lab + Windows video-render Worker Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Distribute four external video generation APIs (Sora 2, Veo 3.1, Kling via PiAPI, Seedance 2.0) to a `video-render` worker on the Windows AI machine (WSL2 Docker). A new `video-lab` container on the NAS acts only as the gateway (metadata, DB, Redis push); the Windows worker calls the four providers, saves the mp4 to the NAS over SMB, and updates the NAS DB through a webhook.

**Architecture:** NAS gateway → Redis RPUSH `queue:video-render` (`job_type=sora_generation|veo_generation|kling_generation|seedance_generation`) → Windows BLPOP → provider dispatch → external API call + result mp4 download (Veo: GCS → SMB conversion) → `/mnt/nas/webpage/data/video/{task_id}.mp4` → HTTP POST `/api/internal/video/update` (X-Internal-Key, three-layer block). Users poll `GET /api/video/tasks/{task_id}`.

**Tech Stack:** Python 3.12 / FastAPI / `redis>=5.0` async / `requests` / `httpx` / `google-cloud-storage` (Veo GCS download) / Docker Engine in WSL2 Ubuntu 24.04 / cifs SMB to NAS / SQLite

**Spec:** `web-backend/docs/superpowers/specs/2026-05-18-nas-windows-distributed-architecture-design.md` §10 SP-7·SP-8, §5 Windows Render Worker pattern, §6 Redis queue, §8 internal webhook + auth.

**Spec §10 SP-7 update (decided by 박재오, 2026-05-19):** 6 providers (Runway, Sora, Veo, Pika, Kling, Luma) → **4 providers (Sora 2 · Veo 3.1 · Kling via PiAPI · Seedance 2.0)**. Runway, Pika, and Luma are dropped; Seedance is added.

**Prerequisites (✅ all complete):**

- Plan-A: web-ai/NAS cache hardening
- Plan-B-Base: NAS Redis + Windows WSL2/Docker/SMB
- Plan-B-Insta: NAS→Windows distribution pattern verified
- Plan-B-Music: three infrastructure pitfalls resolved (WSL2 mirror mode + Redis chown 999:999 + services/.env NAS_BASE_URL branching). All fixes are applied permanently, so the recurrence risk is zero.

**Differences from Plan-B-Music:**

1. **New NAS side**: no changes to existing code; the video-lab container is written from scratch.
2. **All four providers use different APIs**: Sora (Bearer), Veo (gcloud Bearer + GCS), Kling (PiAPI x-api-key), Seedance (BytePlus Bearer).
   There is no single base URL.
3. **No sync helper**: everything is async (video has no immediate-response endpoint).
4. **No library/_sync_with_disk**: video is tracked as a single task per job, so the automatic registration used by music-lab is unnecessary.
5. **Veo GCS detour**: results land at `gs://bucket/path/sample_0.mp4`, so the worker needs an extra step that downloads through the GCS Python SDK and then writes to SMB.

---

## Phase Structure

| Phase | Content | Task |
|-------|------|------|
| **1. New NAS video-lab** (receiver + gateway) | Dockerfile + db + auth + internal_router + main + compose + nginx | 1~7 |
| **2. Windows video-render** | Dockerfile + nas_client + 4 providers + worker + main + services compose | 8~14 |
| **3. NAS push + 박재오 build + end-to-end** | push + .env written by 박재오 + build + single-track Kling verification + memory | 15~17 |

**Important ordering:** Phase 1 first (new video-lab + nginx block) → Phase 2 (new Windows worker) → Phase 3 (cutover + verification). Phases 1 and 2 target different machines (NAS and Windows), so they can run partly in parallel, but a single controller runs them sequentially.

---

## File Structure

### Phase 1 - NAS web-backend (all new)

| File | Change | Responsibility |
|------|------|------|
| `web-backend/video-lab/Dockerfile` (Create) | python:3.12-alpine | image |
| `web-backend/video-lab/requirements.txt` (Create) | fastapi, uvicorn, redis>=5.0 | deps |
| `web-backend/video-lab/app/__init__.py` (Create) | empty marker | package |
| `web-backend/video-lab/app/db.py` (Create) | video_tasks table sqlite + CRUD | persistence |
| `web-backend/video-lab/app/auth.py` (Create) | `verify_internal_key` dependency | X-Internal-Key verification |
| `web-backend/video-lab/app/internal_router.py` (Create) | `POST /api/internal/video/update` | receives the Windows webhook |
| `web-backend/video-lab/app/main.py` (Create) | FastAPI + redis client + `POST /api/video/generate` + `GET /api/video/tasks/{id}` | API gateway |
| `web-backend/video-lab/tests/__init__.py` (Create) | empty marker | tests pkg |
| `web-backend/video-lab/tests/test_auth.py` (Create) | 3 tests | TDD |
| `web-backend/video-lab/tests/test_internal_router.py` (Create) | 5 tests | TDD |
| `web-backend/docker-compose.yml` (Modify) | add video-lab service (port 18800) + depends_on redis | compose |
| `web-backend/nginx/default.conf` (Modify) | `/api/video/` proxy + `/media/video/` alias + `/api/internal/video/` three-layer block | routing |

### Phase 2 - Windows web-ai/services/

| File | Change | Responsibility |
|------|------|------|
| `web-ai/services/video-render/Dockerfile` (Create) | python:3.12-slim + google-cloud-storage dependency | image |
| `web-ai/services/video-render/requirements.txt` (Create) | fastapi, requests, redis, httpx, google-cloud-storage, openai | deps |
| `web-ai/services/video-render/.env.example` (Create) | OPENAI_API_KEY, GOOGLE_PROJECT_ID, GOOGLE_LOCATION, GOOGLE_GCS_BUCKET, GOOGLE_APPLICATION_CREDENTIALS, PIAPI_API_KEY, SEEDANCE_API_KEY, VIDEO_MEDIA_ROOT, VIDEO_MEDIA_URL_PREFIX | secrets |
| `web-ai/services/video-render/nas_client.py` (Create) | `webhook_update_task` (same pattern as the Plan-B-Music nas_client) | webhook adapter |
| `web-ai/services/video-render/providers/__init__.py` (Create) | empty marker | package |
| `web-ai/services/video-render/providers/sora.py` (Create) | `run_sora_generation` (OpenAI Bearer) | Sora 2 client |
| `web-ai/services/video-render/providers/veo.py` (Create) | `run_veo_generation` (gcloud + GCS download) | Veo 3.1 client |
| `web-ai/services/video-render/providers/kling.py` (Create) | `run_kling_generation` (PiAPI x-api-key) | Kling client |
| `web-ai/services/video-render/providers/seedance.py` (Create) | `run_seedance_generation` (BytePlus Bearer) | Seedance client |
| `web-ai/services/video-render/worker.py` (Create) | Redis BLPOP + 4 job_type dispatchers + tests | dispatcher |
| `web-ai/services/video-render/main.py` (Create) | FastAPI app + lifespan(worker spawn) + /health | entry |
| `web-ai/services/video-render/tests/test_nas_client.py` (Create) | 5 tests | TDD |
| `web-ai/services/video-render/tests/test_worker.py` (Create) | 5 tests (4 job_type + unknown) | TDD |
| `web-ai/services/docker-compose.yml` (Modify) | add video-render service (port 18712) + GCS secret mount | compose |

---

## Task 1: NAS video-lab - Dockerfile + requirements + app package skeleton

**Files:**

- Create: `C:/Users/jaeoh/Desktop/workspace/web-backend/video-lab/Dockerfile`
- Create: `C:/Users/jaeoh/Desktop/workspace/web-backend/video-lab/requirements.txt`
- Create: `C:/Users/jaeoh/Desktop/workspace/web-backend/video-lab/app/__init__.py`

### Step 1: Write `requirements.txt`

`C:/Users/jaeoh/Desktop/workspace/web-backend/video-lab/requirements.txt`:

```
fastapi==0.115.6
uvicorn[standard]==0.30.6
redis>=5.0
pytest>=8.0.0
pytest-asyncio>=0.21
httpx>=0.27.0
```

### Step 2: Write `Dockerfile`

`C:/Users/jaeoh/Desktop/workspace/web-backend/video-lab/Dockerfile`:

```dockerfile
FROM python:3.12-alpine

ENV PYTHONUNBUFFERED=1
WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "1"]
```

### Step 3: Empty `app/__init__.py`

`C:/Users/jaeoh/Desktop/workspace/web-backend/video-lab/app/__init__.py`: (empty file, 0 bytes)

### Step 4: Commit

```bash
cd C:/Users/jaeoh/Desktop/workspace/web-backend
git add video-lab/Dockerfile video-lab/requirements.txt video-lab/app/__init__.py
git commit -m "$(cat <<'EOF'
feat(video-lab): Dockerfile + requirements + app package skeleton (SP-8)

New NAS video-lab. Based on python:3.12-alpine. Depends on redis>=5.0.
Gateway only, so it makes no external video calls and has no external API dependencies.

Plan-B-Video Phase 1.

Co-Authored-By: Claude Opus 4.7 (1M context)
EOF
)"
```

## Context

- NAS video-lab does **not** contain any code that calls external video APIs (spec §9). It only pushes to Redis and receives the webhook.
- Same pattern as the music-lab Dockerfile (`python:3.12-alpine`).
- Working directory: `C:/Users/jaeoh/Desktop/workspace/web-backend`

## Report

- Status: DONE
- 3 files created
- Commit SHA

---

## Task 2: NAS video-lab - app/db.py (video_tasks table)

**Files:**

- Create: `C:/Users/jaeoh/Desktop/workspace/web-backend/video-lab/app/db.py`

A single video_tasks table. Same pattern as music-lab's music_tasks, plus a provider column.

### Step 1: Write `app/db.py`

`C:/Users/jaeoh/Desktop/workspace/web-backend/video-lab/app/db.py`:

```python
"""SQLite persistence for video_tasks.
Single table: per-task tracking only."""
from __future__ import annotations

import json
import os
import sqlite3
from contextlib import contextmanager
from typing import Any, Dict, Optional

DB_PATH = os.path.join(os.getenv("VIDEO_DATA_DIR", "/app/data"), "video.db")


@contextmanager
def _conn():
    os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    conn.execute("PRAGMA journal_mode=WAL")
    conn.execute("PRAGMA busy_timeout=5000")
    try:
        yield conn
        conn.commit()
    finally:
        conn.close()


def init_db() -> None:
    with _conn() as conn:
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS video_tasks (
                id TEXT PRIMARY KEY,
                provider TEXT NOT NULL,
                params TEXT NOT NULL,
                status TEXT DEFAULT 'queued',
                progress INTEGER DEFAULT 0,
                message TEXT DEFAULT '',
                video_url TEXT,
                error TEXT,
                created_at TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
                updated_at TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
            )
            """
        )


def _row_to_dict(row) -> Dict[str, Any]:
    return {
        "id": row["id"],
        "provider": row["provider"],
        "params": row["params"],
        "status": row["status"],
        "progress": row["progress"],
        "message": row["message"],
        "video_url": row["video_url"],
        "error": row["error"],
        "created_at": row["created_at"],
        "updated_at": row["updated_at"],
    }


def create_task(task_id: str, provider: str, params: Dict[str, Any]) -> Dict[str, Any]:
    with _conn() as conn:
        conn.execute(
            "INSERT INTO video_tasks (id, provider, params) VALUES (?, ?, ?)",
            (task_id, provider, json.dumps(params)),
        )
        row = conn.execute("SELECT * FROM video_tasks WHERE id = ?", (task_id,)).fetchone()
        return _row_to_dict(row)


def update_task(
    task_id: str,
    status: str,
    progress: int,
    message: str = "",
    video_url: Optional[str] = None,
    error: Optional[str] = None,
) -> None:
    with _conn() as conn:
        conn.execute(
            """
            UPDATE video_tasks
            SET status = ?, progress = ?, message = ?, video_url = ?, error = ?,
                updated_at = strftime('%Y-%m-%dT%H:%M:%fZ','now')
            WHERE id = ?
""", (status, progress, message, video_url, error, task_id), ) def get_task(task_id: str) -> Optional[Dict[str, Any]]: with _conn() as conn: row = conn.execute("SELECT * FROM video_tasks WHERE id = ?", (task_id,)).fetchone() return _row_to_dict(row) if row else None ``` ### Step 2: smoke test Run: `cd C:/Users/jaeoh/Desktop/workspace/web-backend/video-lab && python -c "from app import db; db.init_db(); t = db.create_task('test-1', 'sora', {'prompt': 'x'}); print(db.get_task('test-1')); db.update_task('test-1', 'succeeded', 100, video_url='/media/video/test-1.mp4'); print(db.get_task('test-1'))"` Expected: 두 줄 dict 출력. 두 번째에 status="succeeded", video_url 포함. (테스트용 `/app/data/video.db`가 생성될 수 있음 — Windows 환경에서 권한 문제 가능. `VIDEO_DATA_DIR` env로 임시 디렉토리 지정 권장.) ### Step 3: 커밋 ```bash cd C:/Users/jaeoh/Desktop/workspace/web-backend git add video-lab/app/db.py git commit -m "$(cat <<'EOF' feat(video-lab): app/db.py — video_tasks 테이블 + CRUD (SP-8) WAL + busy_timeout 표준 fix. create_task / update_task / get_task. provider 컬럼 추가(Sora/Veo/Kling/Seedance 구분). video_url 필드. Plan-B-Video Phase 1. Co-Authored-By: Claude Opus 4.7 (1M context) EOF )" ``` --- ## Task 3: NAS video-lab — auth.py + tests **Files:** - Create: `C:/Users/jaeoh/Desktop/workspace/web-backend/video-lab/app/auth.py` - Create: `C:/Users/jaeoh/Desktop/workspace/web-backend/video-lab/tests/__init__.py` - Create: `C:/Users/jaeoh/Desktop/workspace/web-backend/video-lab/tests/test_auth.py` Plan-B-Music/Insta와 동일 패턴 복제. 
### Step 1: Write the failing test

`tests/__init__.py`: (empty file)

`C:/Users/jaeoh/Desktop/workspace/web-backend/video-lab/tests/test_auth.py`:

```python
"""verify_internal_key: Windows video-render webhook authentication."""
import pytest
from fastapi import HTTPException

from app.auth import verify_internal_key


def test_valid_key_passes(monkeypatch):
    monkeypatch.setenv("INTERNAL_API_KEY", "secret123")
    verify_internal_key(x_internal_key="secret123")


def test_invalid_key_raises_401(monkeypatch):
    monkeypatch.setenv("INTERNAL_API_KEY", "secret123")
    with pytest.raises(HTTPException) as exc:
        verify_internal_key(x_internal_key="wrong")
    assert exc.value.status_code == 401


def test_missing_env_key_raises_401(monkeypatch):
    monkeypatch.delenv("INTERNAL_API_KEY", raising=False)
    with pytest.raises(HTTPException) as exc:
        verify_internal_key(x_internal_key="any")
    assert exc.value.status_code == 401
```

### Step 2: Confirm the test fails

Run: `cd C:/Users/jaeoh/Desktop/workspace/web-backend/video-lab && python -m pytest tests/test_auth.py -v`

Expected: FAIL because `app.auth` does not exist yet.

### Step 3: Write `app/auth.py`

`C:/Users/jaeoh/Desktop/workspace/web-backend/video-lab/app/auth.py`:

```python
"""SP-8: internal webhook authentication for Windows worker → NAS video-lab.

Compares the X-Internal-Key header against INTERNAL_API_KEY from .env.
Returns 401 when the server-side key is not configured (safe default).
"""
from __future__ import annotations

import os

from fastapi import Header, HTTPException


def verify_internal_key(x_internal_key: str = Header(...)):
    expected = os.getenv("INTERNAL_API_KEY")
    if not expected:
        raise HTTPException(401, "INTERNAL_API_KEY not configured on server")
    if x_internal_key != expected:
        raise HTTPException(401, "Invalid X-Internal-Key")
```

### Step 4: Tests pass

Run: `python -m pytest tests/test_auth.py -v`

Expected: 3 PASS.

### Step 5: Commit

```bash
git add video-lab/app/auth.py video-lab/tests/__init__.py video-lab/tests/test_auth.py
git commit -m "$(cat <<'EOF'
feat(video-lab): verify_internal_key + tests (SP-8)

X-Internal-Key verification dependency. Same pattern as insta-lab/music-lab.
Plan-B-Video Phase 1.

Co-Authored-By: Claude Opus 4.7 (1M context)
EOF
)"
```

---

## Task 4: NAS video-lab - internal_router.py + tests

**Files:**

- Create: `C:/Users/jaeoh/Desktop/workspace/web-backend/video-lab/app/internal_router.py`
- Create: `C:/Users/jaeoh/Desktop/workspace/web-backend/video-lab/tests/test_internal_router.py`

### Step 1: Write the failing test

`C:/Users/jaeoh/Desktop/workspace/web-backend/video-lab/tests/test_internal_router.py`:

```python
"""POST /api/internal/video/update: Windows video-render webhook."""
import pytest
from fastapi import FastAPI
from fastapi.testclient import TestClient

from app.internal_router import router
from app import db


@pytest.fixture(autouse=True)
def _set_key(monkeypatch):
    monkeypatch.setenv("INTERNAL_API_KEY", "test-secret")


@pytest.fixture
def client(tmp_path, monkeypatch):
    monkeypatch.setenv("VIDEO_DATA_DIR", str(tmp_path))
    monkeypatch.setattr(db, "DB_PATH", str(tmp_path / "test_video.db"))
    db.init_db()
    app = FastAPI()
    app.include_router(router)
    return TestClient(app)


def _make_task():
    tid = "video-task-1"
    db.create_task(tid, "sora", {"prompt": "test"})
    return tid


def test_update_with_valid_key_updates_db(client):
    tid = _make_task()
    r = client.post(
        "/api/internal/video/update",
        headers={"X-Internal-Key": "test-secret"},
        json={"task_id": tid, "status": "processing", "progress": 30, "message": "downloading"},
    )
    assert r.status_code == 200
    task = db.get_task(tid)
    assert task["status"] == "processing"
    assert task["progress"] == 30
    assert task["message"] == "downloading"


def test_update_with_invalid_key_returns_401(client):
    tid = _make_task()
    r = client.post(
        "/api/internal/video/update",
        headers={"X-Internal-Key": "wrong"},
        json={"task_id": tid, "status": "processing", "progress": 30},
    )
    assert r.status_code == 401


def test_update_succeeded_with_video_url(client):
    tid = _make_task()
    r = client.post(
        "/api/internal/video/update",
        headers={"X-Internal-Key": "test-secret"},
        json={
            "task_id": tid,
            "status": "succeeded",
"progress": 100, "message": "완료", "video_url": "/media/video/video-task-1.mp4", }, ) assert r.status_code == 200 task = db.get_task(tid) assert task["status"] == "succeeded" assert task["video_url"] == "/media/video/video-task-1.mp4" def test_update_failed_records_error(client): tid = _make_task() r = client.post( "/api/internal/video/update", headers={"X-Internal-Key": "test-secret"}, json={"task_id": tid, "status": "failed", "progress": 0, "error": "Sora API rate limit"}, ) assert r.status_code == 200 task = db.get_task(tid) assert task["status"] == "failed" assert "Sora" in (task.get("error") or "") def test_update_unknown_task_returns_404(client): r = client.post( "/api/internal/video/update", headers={"X-Internal-Key": "test-secret"}, json={"task_id": "nonexistent", "status": "processing", "progress": 10}, ) assert r.status_code == 404 ``` ### Step 2: 테스트 실패 확인 Run: `python -m pytest tests/test_internal_router.py -v` Expected: FAIL — `app.internal_router` 미존재. ### Step 3: `app/internal_router.py` 작성 `C:/Users/jaeoh/Desktop/workspace/web-backend/video-lab/app/internal_router.py`: ```python """SP-8 — Windows video-render → NAS video-lab internal webhook. POST /api/internal/video/update - X-Internal-Key 인증 필수 - video_tasks row update (status, progress, message, video_url, error) """ from __future__ import annotations import logging from typing import Optional from fastapi import APIRouter, Depends, HTTPException from pydantic import BaseModel, Field from . 
import db from .auth import verify_internal_key logger = logging.getLogger(__name__) router = APIRouter() class UpdatePayload(BaseModel): task_id: str status: str = Field(..., description="processing|succeeded|failed") progress: int = Field(..., ge=0, le=100) message: str = "" video_url: Optional[str] = None error: Optional[str] = None @router.post( "/api/internal/video/update", dependencies=[Depends(verify_internal_key)], ) def video_update(payload: UpdatePayload): task = db.get_task(payload.task_id) if task is None: raise HTTPException(404, f"task not found: {payload.task_id}") db.update_task( payload.task_id, payload.status, payload.progress, message=payload.message, video_url=payload.video_url, error=payload.error, ) logger.info( "internal/video/update task=%s status=%s progress=%d", payload.task_id, payload.status, payload.progress, ) return {"ok": True} ``` ### Step 4: 테스트 통과 Run: `python -m pytest tests/test_internal_router.py -v` Expected: 5 PASS. ### Step 5: 커밋 ```bash git add video-lab/app/internal_router.py video-lab/tests/test_internal_router.py git commit -m "$(cat <<'EOF' feat(video-lab): /api/internal/video/update endpoint + tests (SP-8) UpdatePayload schema (task_id/status/progress/message/video_url/error). 404 if task not found. insta/music-lab과 동일 패턴 + video_url 필드. Plan-B-Video Phase 1. Co-Authored-By: Claude Opus 4.7 (1M context) EOF )" ``` --- ## Task 5: NAS video-lab — main.py (FastAPI + redis client + 2 endpoint) **Files:** - Create: `C:/Users/jaeoh/Desktop/workspace/web-backend/video-lab/app/main.py` spec §10 SP-8 명시 2 endpoint: `POST /api/video/generate` + `GET /api/video/tasks/{id}`. ### Step 1: `app/main.py` 작성 `C:/Users/jaeoh/Desktop/workspace/web-backend/video-lab/app/main.py`: ```python """FastAPI entrypoint for video-lab. 
POST /api/video/generate: provider + params → Redis push → returns task_id
GET  /api/video/tasks/{id}: DB lookup
"""
from __future__ import annotations

import json
import logging
import os
import uuid
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional

import redis.asyncio as aioredis
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field

from . import db
from .internal_router import router as internal_router

logger = logging.getLogger(__name__)

CORS_ALLOW_ORIGINS = os.getenv("CORS_ALLOW_ORIGINS", "http://localhost:3007,http://localhost:8080")
REDIS_URL = os.getenv("REDIS_URL", "redis://redis:6379")
redis_client = aioredis.from_url(REDIS_URL, decode_responses=False)

SUPPORTED_PROVIDERS = {"sora", "veo", "kling", "seedance"}

app = FastAPI()
app.include_router(internal_router)
app.add_middleware(
    CORSMiddleware,
    allow_origins=[o.strip() for o in CORS_ALLOW_ORIGINS.split(",")],
    allow_credentials=False,
    allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS", "PATCH"],
    allow_headers=["Content-Type"],
)


@app.on_event("startup")
def on_startup():
    db.init_db()


@app.get("/health")
def health():
    return {"ok": True, "service": "video-lab"}


@app.get("/api/video/providers")
def list_providers():
    """Always exposes all 4 providers (a missing key is reported as failed by the worker)."""
    return {"providers": [
        {"id": "sora", "name": "Sora 2",
         "models": ["sora-2", "sora-2-pro"],
         "durations": [8, 16, 20],
         "sizes": ["1280x720", "1920x1080", "1080x1920", "848x480", "480x848"]},
        {"id": "veo", "name": "Veo 3.1",
         "models": ["veo-3.1-generate-001", "veo-3.1-fast-generate-001"],
         "durations": [4, 6, 8],
         "aspect_ratios": ["16:9", "9:16"]},
        {"id": "kling", "name": "Kling",
         "models": ["1.5", "1.6", "2.1", "2.1-master", "2.5", "2.6"],
         "durations": [5, 10],
         "aspect_ratios": ["16:9", "9:16", "1:1"]},
        {"id": "seedance", "name": "Seedance 2.0",
         "models": ["seedance-2.0"],
         "durations": [4, 5, 6, 8, 10, 12, 15],
"aspect_ratios": ["16:9", "9:16", "1:1", "4:3"]}, ]} class GenerateRequest(BaseModel): provider: str = Field(..., description="sora|veo|kling|seedance") model: Optional[str] = None prompt: str # Optional 공통 duration: Optional[int] = None aspect_ratio: Optional[str] = None image_url: Optional[str] = None negative_prompt: Optional[str] = None # Provider 별 추가 키는 extra 허용 extra: Optional[Dict[str, Any]] = None class Config: extra = "allow" async def _push_render_job(task_id: str, job_type: str, params: dict) -> None: """Redis queue:video-render에 push.""" kst = timezone(timedelta(hours=9)) payload = { "task_id": task_id, "kind": "video", "job_type": job_type, "params": params, "submitted_at": datetime.now(kst).isoformat(), } await redis_client.rpush("queue:video-render", json.dumps(payload)) @app.post("/api/video/generate") async def generate_video(req: GenerateRequest): """영상 생성 — Redis 큐로 Windows video-render에 위임.""" if req.provider not in SUPPORTED_PROVIDERS: raise HTTPException(400, f"지원하지 않는 provider: {req.provider} (supported: {sorted(SUPPORTED_PROVIDERS)})") task_id = str(uuid.uuid4()) params = req.model_dump(exclude_none=True) db.create_task(task_id, req.provider, params) job_type = f"{req.provider}_generation" # sora_generation, veo_generation, kling_generation, seedance_generation await _push_render_job(task_id, job_type, params) return {"task_id": task_id, "provider": req.provider} @app.get("/api/video/tasks/{task_id}") def get_task_status(task_id: str): t = db.get_task(task_id) if not t: raise HTTPException(404, "task not found") return t ``` ### Step 2: smoke test Run: `cd video-lab && python -c "from app.main import app, redis_client; paths=[r.path for r in app.routes if hasattr(r,'path')]; [print(p) for p in paths]"` Expected: `/health`, `/api/video/providers`, `/api/video/generate`, `/api/video/tasks/{task_id}`, `/api/internal/video/update` 출력. 
### Step 3: Regression check, earlier tests still pass

Run: `cd video-lab && python -m pytest tests/ -v`

Expected: 8 PASS (test_auth 3 + test_internal_router 5).

### Step 4: Commit

```bash
git add video-lab/app/main.py
git commit -m "$(cat <<'EOF'
feat(video-lab): main.py: FastAPI + redis client + 2 endpoints (SP-8)

POST /api/video/generate (provider validation + Redis push + returns task_id).
GET /api/video/tasks/{id} (DB lookup).
GET /api/video/providers (metadata for the 4 providers).
SUPPORTED_PROVIDERS = sora/veo/kling/seedance.

Plan-B-Video Phase 1.

Co-Authored-By: Claude Opus 4.7 (1M context)
EOF
)"
```

---

## Task 6: NAS - docker-compose.yml entry + nginx routing + media alias

**Files:**

- Modify: `C:/Users/jaeoh/Desktop/workspace/web-backend/docker-compose.yml` (add video-lab service)
- Modify: `C:/Users/jaeoh/Desktop/workspace/web-backend/nginx/default.conf` (`/api/video/` proxy + `/media/video/` alias)

### Step 1: docker-compose.yml, add the video-lab service after music-lab

In `C:/Users/jaeoh/Desktop/workspace/web-backend/docker-compose.yml`, add the following **after** the music-lab service block (around lines 55-90):

```yaml
  video-lab:
    build:
      context: ./video-lab
    container_name: video-lab
    restart: unless-stopped
    ports:
      - "18800:8000"
    environment:
      - TZ=${TZ:-Asia/Seoul}
      - REDIS_URL=${REDIS_URL:-redis://redis:6379}
      - INTERNAL_API_KEY=${INTERNAL_API_KEY:-}
      - VIDEO_DATA_DIR=/app/data
      - CORS_ALLOW_ORIGINS=${CORS_ALLOW_ORIGINS:-http://localhost:3007,http://localhost:8080}
    volumes:
      - ${RUNTIME_PATH}\data\video:/app/data
    depends_on:
      - redis
    healthcheck:
      test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"]
      interval: 60s
      timeout: 5s
      retries: 3
```

Also add the video media mount to the `volumes:` section of the `frontend` service (one line, among the other mounts):

```yaml
      - ${RUNTIME_PATH}\data\video:/data/video:ro
```

(A `${RUNTIME_PATH}\data\videos:/data/videos:ro` entry may already exist. That one belongs to music-lab's videos. video-lab uses the separate, singular `/data/video`.)

Also add `- video-lab` to the `depends_on:` list of the `frontend` service (one line, among the other -lab entries).
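The singular `video` directory shows up under three different roots: the worker's SMB path, the public `/media/video/` URL, and the read-only `/data/video` mount added above. The sketch below lines them up for one task_id. The function names are hypothetical, and the constants are the defaults stated in this plan rather than values read from a live environment.

```python
import posixpath

VIDEO_MEDIA_ROOT = "/mnt/nas/webpage/data/video"  # worker-side SMB path (plan default)
VIDEO_MEDIA_URL_PREFIX = "/media/video"           # public URL prefix (plan default)
NGINX_ALIAS_DIR = "/data/video"                   # read-only mount from the compose step


def worker_path(task_id: str) -> str:
    """Where the Windows worker writes the finished mp4."""
    return posixpath.join(VIDEO_MEDIA_ROOT, f"{task_id}.mp4")


def public_url(task_id: str) -> str:
    """The video_url value reported back through the webhook."""
    return f"{VIDEO_MEDIA_URL_PREFIX}/{task_id}.mp4"


def aliased_file(url: str) -> str:
    """Map a /media/video/ URL to a file by swapping the prefix for the alias directory."""
    prefix = VIDEO_MEDIA_URL_PREFIX + "/"
    if not url.startswith(prefix):
        raise ValueError(f"not a video media URL: {url}")
    return posixpath.join(NGINX_ALIAS_DIR, url[len(prefix):])


print(worker_path("task-1"))
print(public_url("task-1"))
print(aliased_file(public_url("task-1")))
```

All three names resolve to the same file only if the SMB share and `${RUNTIME_PATH}\data\video` point at the same NAS directory, which is the assumption this plan relies on.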
### Step 2: nginx default.conf, proxy + media alias

Add the following two blocks to `C:/Users/jaeoh/Desktop/workspace/web-backend/nginx/default.conf` (the third block, the `/api/internal/video/` restriction, is added in Task 7):

**A. `/api/video/` proxy**, next to the existing `/api/insta/` or `/api/music/` block:

```nginx
    # video-lab: video generation gateway
    location /api/video/ {
        resolver 127.0.0.11 valid=10s;
        set $video_backend video-lab:8000;
        proxy_http_version 1.1;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_pass http://$video_backend$request_uri;
        proxy_read_timeout 120s;
        proxy_connect_timeout 10s;
    }
```

**B. `/media/video/` alias**, next to the existing `/media/music/` block (singular, to distinguish it from videos):

```nginx
    # video media: mp4 files served directly by nginx
    location ^~ /media/video/ {
        alias /data/video/;
        expires 1d;
        add_header Cache-Control "public, max-age=86400" always;
        add_header Accept-Ranges bytes always;
        autoindex off;
    }
```

### Step 3: Verify

Run: `grep -nE "video-lab|/api/video/|/media/video/" C:/Users/jaeoh/Desktop/workspace/web-backend/docker-compose.yml C:/Users/jaeoh/Desktop/workspace/web-backend/nginx/default.conf`

Expected: every item appears in the output.

### Step 4: Commit

```bash
cd C:/Users/jaeoh/Desktop/workspace/web-backend
git add docker-compose.yml nginx/default.conf
git commit -m "$(cat <<'EOF'
feat(video-lab): docker-compose entry + nginx routing (SP-8)

video-lab service: port 18800, REDIS_URL/INTERNAL_API_KEY env,
depends_on redis, /app/data volume mount.
nginx: /api/video/ proxy + /media/video/ direct serve alias.
frontend depends_on + volume mount added.

Plan-B-Video Phase 1.

Co-Authored-By: Claude Opus 4.7 (1M context)
EOF
)"
```

---

## Task 7: nginx three-layer block for /api/internal/video/

**Files:**

- Modify: `C:/Users/jaeoh/Desktop/workspace/web-backend/nginx/default.conf`

This replicates the same pattern as the insta/music blocks.
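The IP layers of this block amount to an allowlist followed by `deny all`. As a rough model of that rule (a sketch for reasoning about which callers get through, not how nginx evaluates it internally), the standard `ipaddress` module can express the same three ranges used in Step 1. `is_allowed` is a hypothetical name.

```python
import ipaddress

# The three ranges allowed by the nginx block in this task.
ALLOWED_NETWORKS = [
    ipaddress.ip_network("192.168.45.0/24"),  # LAN
    ipaddress.ip_network("100.64.0.0/10"),    # Tailscale CGNAT
    ipaddress.ip_network("127.0.0.1/32"),     # NAS-local
]


def is_allowed(remote_addr: str) -> bool:
    """True when the client address falls inside any allowed network; otherwise deny."""
    addr = ipaddress.ip_address(remote_addr)
    return any(addr in net for net in ALLOWED_NETWORKS)


for ip in ("192.168.45.10", "100.100.1.2", "127.0.0.1", "8.8.8.8"):
    print(ip, is_allowed(ip))
```

A request that passes this check still has to present a valid X-Internal-Key, which is the final layer enforced by the FastAPI dependency.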
### Step 1: Add the restriction block to default.conf

Add this immediately after the `/api/internal/music/` block (written in the previous plan):

```nginx
    # Plan-B-Video: Windows video-render → NAS video-lab internal webhook
    # Layers 1 and 2: nginx IP allowlist (LAN + Tailscale)
    # Layer 3: X-Internal-Key (FastAPI dependency)
    location /api/internal/video/ {
        allow 192.168.45.0/24;   # LAN allowlist
        allow 100.64.0.0/10;     # Tailscale CGNAT
        allow 127.0.0.1;         # NAS internal
        deny all;

        resolver 127.0.0.11 valid=10s;
        set $video_internal_backend video-lab:8000;
        proxy_http_version 1.1;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Internal-Key $http_x_internal_key;
        proxy_pass http://$video_internal_backend$request_uri;
    }
```

### Step 2: Verify

Run: `grep -c "location /api/internal/" C:/Users/jaeoh/Desktop/workspace/web-backend/nginx/default.conf`

Expected: `3` (insta + music + video).

### Step 3: Commit

```bash
git add nginx/default.conf
git commit -m "$(cat <<'EOF'
feat(nginx): /api/internal/video/ three-layer block (SP-8)

LAN(192.168.45.0/24) + Tailscale(100.64.0.0/10) + 127.0.0.1 allow. deny all.
X-Internal-Key forward → video-lab:8000. Same pattern as the insta/music blocks.

Plan-B-Video Phase 1.
Co-Authored-By: Claude Opus 4.7 (1M context)
EOF
)"
```

---

## Task 8: Windows video-render - Dockerfile + requirements + .env.example + nas_client.py

**Files:**

- Create: `C:/Users/jaeoh/Desktop/workspace/web-ai/services/video-render/Dockerfile`
- Create: `C:/Users/jaeoh/Desktop/workspace/web-ai/services/video-render/requirements.txt`
- Create: `C:/Users/jaeoh/Desktop/workspace/web-ai/services/video-render/.env.example`
- Create: `C:/Users/jaeoh/Desktop/workspace/web-ai/services/video-render/nas_client.py`
- Create: `C:/Users/jaeoh/Desktop/workspace/web-ai/services/video-render/tests/__init__.py`
- Create: `C:/Users/jaeoh/Desktop/workspace/web-ai/services/video-render/tests/test_nas_client.py`

### Step 1: Write `requirements.txt`

```
fastapi==0.115.6
uvicorn[standard]==0.34.0
requests==2.32.3
redis>=5.0
httpx>=0.27
openai>=1.50.0
google-cloud-storage>=2.18.0
pytest>=8.0
pytest-asyncio>=0.24
respx>=0.21
```

### Step 2: Write `Dockerfile`

```dockerfile
FROM python:3.12-slim-bookworm

ENV PYTHONUNBUFFERED=1
WORKDIR /app

RUN apt-get update && apt-get install -y --no-install-recommends \
        ca-certificates \
    && rm -rf /var/lib/apt/lists/*

COPY requirements.txt .
RUN pip install --no-cache-dir --timeout 600 --retries 5 -r requirements.txt

COPY . .
EXPOSE 8000

CMD ["python", "-m", "uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "1"]
```

### Step 3: Write `.env.example`

```
# Plan-B-Video: Windows video-render worker

# NAS Redis queue
REDIS_URL=redis://192.168.45.54:6379

# NAS internal webhook (video-lab port 18800)
NAS_BASE_URL=http://192.168.45.54:18800
INTERNAL_API_KEY=__copy_from_nas_dotenv__

# Sora 2 (OpenAI)
OPENAI_API_KEY=__paste_openai_key__

# Veo 3.1 (Google Vertex AI)
GOOGLE_PROJECT_ID=__paste_gcp_project_id__
GOOGLE_LOCATION=us-central1
GOOGLE_GCS_BUCKET=__paste_gcs_bucket_name__
# Path of the service account JSON inside the container
GOOGLE_APPLICATION_CREDENTIALS=/app/keys/gcp-sa.json

# Kling (PiAPI gateway)
PIAPI_API_KEY=__paste_piapi_key__

# Seedance 2.0 (BytePlus)
SEEDANCE_API_KEY=__paste_seedance_key__

# video directory inside the NAS SMB mount
VIDEO_MEDIA_ROOT=/mnt/nas/webpage/data/video

# nginx serving prefix (used in the NAS webhook payload)
VIDEO_MEDIA_URL_PREFIX=/media/video
```

### Step 4: Write the failing test

`tests/__init__.py`: (empty file)

`tests/test_nas_client.py`:

```python
"""nas_client: webhook adapter for video-render."""
import pytest
import respx
import httpx

from nas_client import webhook_update_task


@pytest.fixture(autouse=True)
def _env(monkeypatch):
    monkeypatch.setenv("NAS_BASE_URL", "http://nas-test:18800")
    monkeypatch.setenv("INTERNAL_API_KEY", "test-key")


@respx.mock
def test_webhook_update_task_sends_x_internal_key():
    route = respx.post("http://nas-test:18800/api/internal/video/update").mock(
        return_value=httpx.Response(200, json={"ok": True})
    )
    webhook_update_task("task-1", "processing", 30, message="downloading")
    assert route.called
    req = route.calls[0].request
    assert req.headers["X-Internal-Key"] == "test-key"
    import json
    body = json.loads(req.content)
    assert body["task_id"] == "task-1"
    assert body["status"] == "processing"
    assert body["progress"] == 30


@respx.mock
def test_webhook_update_task_with_video_url():
    route = respx.post("http://nas-test:18800/api/internal/video/update").mock(
        return_value=httpx.Response(200, json={"ok": True})
    )
    webhook_update_task("task-2", "succeeded", 100, message="done",
                        video_url="/media/video/task-2.mp4")
    import json
    payload = json.loads(route.calls[0].request.content)
    assert payload["video_url"] == "/media/video/task-2.mp4"
    assert payload["status"] == "succeeded"


@respx.mock
def test_webhook_update_task_with_error():
    route = respx.post("http://nas-test:18800/api/internal/video/update").mock(
        return_value=httpx.Response(200, json={"ok": True})
    )
    webhook_update_task("task-3", "failed", 0, error="Sora API rate limit")
    import json
    payload = json.loads(route.calls[0].request.content)
    assert payload["error"] == "Sora API rate limit"


@respx.mock
def test_webhook_swallows_network_error(caplog):
    respx.post("http://nas-test:18800/api/internal/video/update").mock(
        side_effect=httpx.ConnectError("no host")
    )
    webhook_update_task("task-5", "processing", 10)
    assert "task-5" in caplog.text


@respx.mock
def test_webhook_swallows_non_200(caplog):
    respx.post("http://nas-test:18800/api/internal/video/update").mock(
        return_value=httpx.Response(500, text="server error")
    )
    webhook_update_task("task-6", "processing", 50)
    # A non-200 response does not raise either; it is only logged as an error
    assert "task-6" in caplog.text
```

### Step 5: Confirm the test fails

Run: `cd C:/Users/jaeoh/Desktop/workspace/web-ai/services/video-render && python -m pytest tests/test_nas_client.py -v`

Expected: FAIL because `nas_client` does not exist yet.

### Step 6: Write `nas_client.py`

```python
"""NAS webhook adapter: the Windows worker cannot reach the NAS DB directly, so it delegates over HTTP.

Same pattern as the Plan-B-Music nas_client.
""" from __future__ import annotations import logging import os from typing import Any, Dict, Optional import httpx logger = logging.getLogger(__name__) _TIMEOUT = 10.0 def _post(payload: Dict[str, Any]) -> None: nas_base_url = os.getenv("NAS_BASE_URL", "http://192.168.45.54:18800") internal_api_key = os.getenv("INTERNAL_API_KEY", "") url = f"{nas_base_url}/api/internal/video/update" try: r = httpx.post( url, headers={"X-Internal-Key": internal_api_key}, json=payload, timeout=_TIMEOUT, ) if r.status_code != 200: logger.error("webhook %s returned %d: %s", payload.get("task_id"), r.status_code, r.text[:200]) except Exception: logger.exception("webhook %s 호출 실패", payload.get("task_id")) def webhook_update_task( task_id: str, status: str, progress: int, message: str = "", video_url: Optional[str] = None, error: Optional[str] = None, ) -> None: payload: Dict[str, Any] = { "task_id": task_id, "status": status, "progress": progress, "message": message, } if video_url is not None: payload["video_url"] = video_url if error is not None: payload["error"] = error _post(payload) ``` ### Step 7: 테스트 통과 Run: `python -m pytest tests/test_nas_client.py -v` Expected: 5 PASS. ### Step 8: 커밋 ```bash cd C:/Users/jaeoh/Desktop/workspace/web-ai git add services/video-render/Dockerfile services/video-render/requirements.txt services/video-render/.env.example services/video-render/nas_client.py services/video-render/tests/__init__.py services/video-render/tests/test_nas_client.py git commit -m "$(cat <<'EOF' feat(video-render): scaffold + nas_client webhook adapter (SP-7) Dockerfile (python:3.12-slim), requirements (openai + google-cloud-storage + httpx + redis). .env.example: OPENAI/GOOGLE/PIAPI/SEEDANCE keys + VIDEO_MEDIA_ROOT. nas_client.webhook_update_task: call-time os.getenv (테스트 격리), respx mock 5 tests. Plan-B-Video Phase 2. 
Co-Authored-By: Claude Opus 4.7 (1M context)
EOF
)"
```

---

## Task 9: Windows video-render - providers/sora.py

**Files:**
- Create: `C:/Users/jaeoh/Desktop/workspace/web-ai/services/video-render/providers/__init__.py`
- Create: `C:/Users/jaeoh/Desktop/workspace/web-ai/services/video-render/providers/sora.py`

OpenAI Sora 2 API. `POST /v1/videos` → polling → download via `GET /v1/videos/{id}/content?variant=video`.

⚠️ The Sora 2 API is deprecated, with shutdown on 2026-09-24. The plan still follows the stated spec.

### Step 1: Empty `providers/__init__.py`

`C:/Users/jaeoh/Desktop/workspace/web-ai/services/video-render/providers/__init__.py`: (empty file)

### Step 2: Write `providers/sora.py`

```python
"""Sora 2 video generation via the OpenAI Videos API.

POST /v1/videos → poll GET /v1/videos/{id} → download GET /v1/videos/{id}/content.
⚠️ Deprecated, shutdown 2026-09-24. Proceeding per the spec is 박재오's decision.
"""
from __future__ import annotations

import logging
import os
import time

import requests

from nas_client import webhook_update_task

logger = logging.getLogger(__name__)

SORA_BASE_URL = "https://api.openai.com/v1"
VIDEO_MEDIA_ROOT = os.getenv("VIDEO_MEDIA_ROOT", "/mnt/nas/webpage/data/video")
VIDEO_MEDIA_URL_PREFIX = os.getenv("VIDEO_MEDIA_URL_PREFIX", "/media/video")

POLL_INTERVAL = 15       # OpenAI recommendation: 10-20 seconds
POLL_MAX_ATTEMPTS = 40   # up to ~10 minutes
DEFAULT_MODEL = "sora-2"


def _headers() -> dict:
    api_key = os.getenv("OPENAI_API_KEY", "")
    return {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }


def run_sora_generation(task_id: str, params: dict) -> None:
    """Generate a video with Sora 2 → mp4 → save to NAS SMB → webhook."""
    try:
        if not os.getenv("OPENAI_API_KEY"):
            webhook_update_task(task_id, "failed", 0, "",
                                error="OPENAI_API_KEY 미설정 (Windows .env)")
            return

        webhook_update_task(task_id, "processing", 5, "Sora API 호출 중...")

        payload = {
            "model": params.get("model") or DEFAULT_MODEL,
            "prompt": params["prompt"][:5000],
        }
        if params.get("duration"):
            # The Videos API takes `seconds` as a string value
            payload["seconds"] = str(params["duration"])
        if params.get("size"):
            payload["size"] = params["size"]
        # Confirm these sizes against the list the selected Sora model accepts
        elif params.get("aspect_ratio") == "9:16":
            payload["size"] = "1080x1920"
        elif params.get("aspect_ratio") == "16:9":
            payload["size"] = "1920x1080"

        resp = requests.post(f"{SORA_BASE_URL}/videos", headers=_headers(),
                             json=payload, timeout=30)
        if resp.status_code not in (200, 201):
            webhook_update_task(task_id, "failed", 0, "",
                                error=f"Sora API 오류: {resp.status_code} {resp.text[:300]}")
            return

        body = resp.json()
        video_id = body.get("id", "")
        if not video_id:
            webhook_update_task(task_id, "failed", 0, "", error="Sora 응답에 video id 없음")
            return

        webhook_update_task(task_id, "processing", 15, f"Sora 작업 생성됨 (id={video_id[:16]})")

        # Polling
        for attempt in range(POLL_MAX_ATTEMPTS):
            time.sleep(POLL_INTERVAL)
            sr = requests.get(f"{SORA_BASE_URL}/videos/{video_id}",
                              headers=_headers(), timeout=30)
            if sr.status_code != 200:
                continue
            sd = sr.json()
            status = sd.get("status", "")
            progress = sd.get("progress") or 0  # tolerate a null progress field
            # Map provider progress 0-100 onto the 15-79 band of the task progress
            scaled = min(15 + int(progress * 0.65), 79)
            webhook_update_task(task_id, "processing", scaled, f"Sora 생성 중... {progress}%")
            if status == "completed":
                break
            elif status == "failed":
                # `error` may be present but null, so fall back to an empty dict
                err = (sd.get("error") or {}).get("message", "Sora 작업 실패")
                webhook_update_task(task_id, "failed", 0, "", error=err)
                return
        else:
            webhook_update_task(task_id, "failed", 0, "", error="Sora 폴링 timeout (10분)")
            return

        # Download
        webhook_update_task(task_id, "processing", 80, "Sora 결과 다운로드 중...")
        filename = f"{task_id}.mp4"
        os.makedirs(VIDEO_MEDIA_ROOT, exist_ok=True)
        file_path = os.path.join(VIDEO_MEDIA_ROOT, filename)

        dl = requests.get(
            f"{SORA_BASE_URL}/videos/{video_id}/content",
            headers=_headers(),
            params={"variant": "video"},
            stream=True,
            timeout=300,
        )
        dl.raise_for_status()
        with open(file_path, "wb") as f:
            for chunk in dl.iter_content(chunk_size=8192):
                f.write(chunk)

        local_url = f"{VIDEO_MEDIA_URL_PREFIX}/{filename}"
        webhook_update_task(task_id, "succeeded", 100, "Sora 생성 완료", video_url=local_url)

    except requests.Timeout:
        webhook_update_task(task_id, "failed", 0, "", error="Sora API 타임아웃")
    except Exception as e:
        logger.exception("Sora generation error task=%s", task_id)
        webhook_update_task(task_id, "failed", 0, "", error=str(e))
```

### Step 3: Import smoke test

Run: `cd C:/Users/jaeoh/Desktop/workspace/web-ai/services/video-render && python -c "from providers.sora import run_sora_generation; print('OK')"`

Expected: `OK`.

### Step 4: Commit

```bash
git add services/video-render/providers/__init__.py services/video-render/providers/sora.py
git commit -m "$(cat <<'EOF'
feat(video-render): providers/sora.py - Sora 2 client (SP-7)

POST /v1/videos → poll GET /v1/videos/{id} (15s × 40) → download /content?variant=video.
sora-2 / sora-2-pro models. aspect_ratio → size mapping.
⚠️ OpenAI Sora 2 API deprecated 2026-09-24.

Plan-B-Video Phase 2.

Co-Authored-By: Claude Opus 4.7 (1M context)
EOF
)"
```

---

## Task 10: Windows video-render - providers/veo.py (GCS download)

**Files:**
- Create: `C:/Users/jaeoh/Desktop/workspace/web-ai/services/video-render/providers/veo.py`

Veo 3.1 runs as a Vertex AI long-running operation. The result is an mp4 in a GCS bucket.
The worker downloads it with `google-cloud-storage`.

### Step 1: Write `providers/veo.py`

```python
"""Veo 3.1 video generation via Google Vertex AI.

POST .../models/{MODEL}:predictLongRunning → poll POST :fetchPredictOperation →
result gs://bucket/path/sample_0.mp4 → download with google-cloud-storage → NAS SMB.
"""
from __future__ import annotations

import logging
import os
import time
from typing import Optional

import requests

from nas_client import webhook_update_task

logger = logging.getLogger(__name__)

VIDEO_MEDIA_ROOT = os.getenv("VIDEO_MEDIA_ROOT", "/mnt/nas/webpage/data/video")
VIDEO_MEDIA_URL_PREFIX = os.getenv("VIDEO_MEDIA_URL_PREFIX", "/media/video")

POLL_INTERVAL = 12       # Veo takes 30-120 seconds
POLL_MAX_ATTEMPTS = 50   # up to ~10 minutes
DEFAULT_MODEL = "veo-3.1-fast-generate-001"


def _gcloud_access_token() -> Optional[str]:
    """Issue an access token from the GOOGLE_APPLICATION_CREDENTIALS service account JSON.

    The GCS SDK authenticates automatically inside the container via google-auth.
    The REST API calls need an explicit Bearer token, so it is issued with google.auth.
    """
    try:
        # Imported lazily so the module loads even when google-auth is not exercised
        from google.auth import default as google_default_auth
        from google.auth.transport.requests import Request as GoogleAuthRequest

        credentials, _ = google_default_auth(
            scopes=["https://www.googleapis.com/auth/cloud-platform"],
        )
        credentials.refresh(GoogleAuthRequest())
        return credentials.token
    except Exception:
        logger.exception("Google credentials refresh failed")
        return None


def _download_gcs(gcs_uri: str, local_path: str) -> bool:
    """Download gs://bucket/path/file.mp4 to local_path.
    Returns whether the download succeeded."""
    try:
        from google.cloud import storage as gcs_storage

        if not gcs_uri.startswith("gs://"):
            return False
        without_scheme = gcs_uri[len("gs://"):]
        bucket_name, blob_path = without_scheme.split("/", 1)
        client = gcs_storage.Client(project=os.getenv("GOOGLE_PROJECT_ID"))
        bucket = client.bucket(bucket_name)
        blob = bucket.blob(blob_path)
        blob.download_to_filename(local_path)
        return True
    except Exception:
        logger.exception("GCS download failed: %s", gcs_uri)
        return False


def run_veo_generation(task_id: str, params: dict) -> None:
    """Generate a video with Veo 3.1 → GCS → NAS SMB → webhook."""
    try:
        project_id = os.getenv("GOOGLE_PROJECT_ID", "")
        location = os.getenv("GOOGLE_LOCATION", "us-central1")
        gcs_bucket = os.getenv("GOOGLE_GCS_BUCKET", "")
        if not project_id or not gcs_bucket:
            webhook_update_task(task_id, "failed", 0, "",
                                error="GOOGLE_PROJECT_ID 또는 GOOGLE_GCS_BUCKET 미설정")
            return

        token = _gcloud_access_token()
        if not token:
            webhook_update_task(task_id, "failed", 0, "",
                                error="Google access token 발행 실패 (서비스 계정 JSON 확인)")
            return

        webhook_update_task(task_id, "processing", 5, "Veo API 호출 중...")

        model_id = params.get("model") or DEFAULT_MODEL
        endpoint_base = (
            f"https://{location}-aiplatform.googleapis.com/v1/projects/{project_id}"
            f"/locations/{location}/publishers/google/models/{model_id}"
        )
        headers = {
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        }
        body = {
            "instances": [{"prompt": params["prompt"]}],
            "parameters": {
                "storageUri": f"gs://{gcs_bucket}/veo/{task_id}/",
                "sampleCount": 1,
                "aspectRatio": params.get("aspect_ratio") or "16:9",
            },
        }
        if params.get("duration"):
            # Vertex AI names the clip length parameter `durationSeconds`
            body["parameters"]["durationSeconds"] = params["duration"]
        if params.get("negative_prompt"):
            body["parameters"]["negativePrompt"] = params["negative_prompt"]

        resp = requests.post(f"{endpoint_base}:predictLongRunning",
                             headers=headers, json=body, timeout=30)
        if resp.status_code != 200:
            webhook_update_task(task_id, "failed", 0, "",
                                error=f"Veo API 오류: {resp.status_code} {resp.text[:300]}")
            return
        op_name = resp.json().get("name", "")
        if not op_name:
            webhook_update_task(task_id, "failed", 0, "", error="Veo 응답에 operation name 없음")
            return

        webhook_update_task(task_id, "processing", 15, "Veo 작업 시작됨")

        # Polling via fetchPredictOperation
        gcs_uri = None
        for attempt in range(POLL_MAX_ATTEMPTS):
            time.sleep(POLL_INTERVAL)
            fetch = requests.post(
                f"{endpoint_base}:fetchPredictOperation",
                headers=headers,
                json={"operationName": op_name},
                timeout=30,
            )
            if fetch.status_code != 200:
                continue
            fd = fetch.json()
            done = fd.get("done", False)
            # No provider-side percentage, so progress is estimated from the attempt count
            scaled = min(15 + int((attempt / POLL_MAX_ATTEMPTS) * 65), 79)
            webhook_update_task(task_id, "processing", scaled, "Veo 생성 중...")
            if done:
                if "error" in fd:
                    webhook_update_task(task_id, "failed", 0, "",
                                        error=f"Veo 작업 실패: {fd['error'].get('message','?')}")
                    return
                videos = (fd.get("response") or {}).get("videos") or []
                if not videos:
                    webhook_update_task(task_id, "failed", 0, "",
                                        error="Veo 완료했으나 videos 비어 있음")
                    return
                gcs_uri = videos[0].get("gcsUri", "")
                break
        else:
            webhook_update_task(task_id, "failed", 0, "", error="Veo 폴링 timeout (10분)")
            return

        if not gcs_uri:
            webhook_update_task(task_id, "failed", 0, "", error="Veo 응답에 gcsUri 없음")
            return

        webhook_update_task(task_id, "processing", 85, "GCS에서 mp4 다운로드 중...")
        filename = f"{task_id}.mp4"
        os.makedirs(VIDEO_MEDIA_ROOT, exist_ok=True)
        file_path = os.path.join(VIDEO_MEDIA_ROOT, filename)

        ok = _download_gcs(gcs_uri, file_path)
        if not ok:
            webhook_update_task(task_id, "failed", 0, "",
                                error=f"GCS 다운로드 실패: {gcs_uri}")
            return

        local_url = f"{VIDEO_MEDIA_URL_PREFIX}/{filename}"
        webhook_update_task(task_id, "succeeded", 100, "Veo 생성 완료", video_url=local_url)

    except requests.Timeout:
        webhook_update_task(task_id, "failed", 0, "", error="Veo API 타임아웃")
    except Exception as e:
        logger.exception("Veo generation error task=%s", task_id)
        webhook_update_task(task_id, "failed", 0, "", error=str(e))
```

### Step 2: Import smoke test

Run: `python -c "from providers.veo import run_veo_generation; print('OK')"`

Expected: `OK`
(This passes because `google.cloud.storage` is not imported at module load; it is imported only when the function is actually called.)

### Step 3: Commit

```bash
git add services/video-render/providers/veo.py
git commit -m "$(cat <<'EOF'
feat(video-render): providers/veo.py - Veo 3.1 Vertex AI client (SP-7)

predictLongRunning → poll fetchPredictOperation (12s × 50).
Result gs://bucket/veo/{task_id}/sample_0.mp4 → download with the google-cloud-storage SDK → NAS SMB.
GOOGLE_PROJECT_ID/LOCATION/GCS_BUCKET/APPLICATION_CREDENTIALS env.

Plan-B-Video Phase 2.

Co-Authored-By: Claude Opus 4.7 (1M context)
EOF
)"
```

---

## Task 11: Windows video-render - providers/kling.py (via PiAPI)

**Files:**
- Create: `C:/Users/jaeoh/Desktop/workspace/web-ai/services/video-render/providers/kling.py`

Kling AI is called through the PiAPI gateway: `POST /api/v1/task` plus GET task status (the standard PiAPI pattern).

### Step 1: Write `providers/kling.py`

```python
"""Kling AI video generation via the PiAPI gateway.

POST https://api.piapi.ai/api/v1/task → poll GET /api/v1/task/{id} →
download data.output.video_url.
"""
from __future__ import annotations

import logging
import os
import time

import requests

from nas_client import webhook_update_task

logger = logging.getLogger(__name__)

PIAPI_BASE_URL = "https://api.piapi.ai/api/v1"
VIDEO_MEDIA_ROOT = os.getenv("VIDEO_MEDIA_ROOT", "/mnt/nas/webpage/data/video")
VIDEO_MEDIA_URL_PREFIX = os.getenv("VIDEO_MEDIA_URL_PREFIX", "/media/video")

POLL_INTERVAL = 10       # Kling takes 30-180 seconds
POLL_MAX_ATTEMPTS = 60   # up to 10 minutes
DEFAULT_VERSION = "2.6"


def _headers() -> dict:
    api_key = os.getenv("PIAPI_API_KEY", "")
    return {
        "x-api-key": api_key,
        "Content-Type": "application/json",
    }


def run_kling_generation(task_id: str, params: dict) -> None:
    """Generate a video with Kling → mp4 → NAS SMB → webhook."""
    try:
        if not os.getenv("PIAPI_API_KEY"):
            webhook_update_task(task_id, "failed", 0, "", error="PIAPI_API_KEY 미설정")
            return

        webhook_update_task(task_id, "processing", 5, "Kling API 호출 중...")

        input_obj = {
            "prompt": params["prompt"][:2500],
            "duration": params.get("duration", 5),
            "aspect_ratio":
                params.get("aspect_ratio", "16:9"),
            "mode": params.get("mode", "std"),
            "version": params.get("model") or DEFAULT_VERSION,
        }
        if params.get("negative_prompt"):
            input_obj["negative_prompt"] = params["negative_prompt"][:2500]
        if params.get("cfg_scale") is not None:
            input_obj["cfg_scale"] = str(params["cfg_scale"])
        if params.get("image_url"):
            input_obj["image_url"] = params["image_url"]

        body = {
            "model": "kling",
            "task_type": "video_generation",
            "input": input_obj,
            "config": {"service_mode": "public"},
        }

        resp = requests.post(f"{PIAPI_BASE_URL}/task", headers=_headers(),
                             json=body, timeout=30)
        if resp.status_code != 200:
            webhook_update_task(task_id, "failed", 0, "",
                                error=f"Kling/PiAPI 오류: {resp.status_code} {resp.text[:300]}")
            return

        body_json = resp.json()
        if body_json.get("code") != 200:
            webhook_update_task(task_id, "failed", 0, "",
                                error=f"Kling/PiAPI 거부: {body_json.get('message', '?')}")
            return

        piapi_task_id = (body_json.get("data") or {}).get("task_id", "")
        if not piapi_task_id:
            webhook_update_task(task_id, "failed", 0, "", error="Kling/PiAPI 응답에 task_id 없음")
            return

        webhook_update_task(task_id, "processing", 15, "Kling 작업 등록됨")

        # Polling: GET /task/{id}
        video_url = None
        for attempt in range(POLL_MAX_ATTEMPTS):
            time.sleep(POLL_INTERVAL)
            fetch = requests.get(f"{PIAPI_BASE_URL}/task/{piapi_task_id}",
                                 headers=_headers(), timeout=30)
            if fetch.status_code != 200:
                continue
            fd = fetch.json()
            data = fd.get("data") or {}  # tolerate a null `data` field
            status = data.get("status", "")
            # No provider-side percentage, so progress is estimated from the attempt count
            scaled = min(15 + int((attempt / POLL_MAX_ATTEMPTS) * 65), 79)
            webhook_update_task(task_id, "processing", scaled, f"Kling 생성 중... ({status})")
            if status == "Completed":
                video_url = (data.get("output") or {}).get("video_url", "")
                break
            elif status in ("Failed", "failed"):
                err = (data.get("error") or {}).get("message", "Kling 작업 실패")
                webhook_update_task(task_id, "failed", 0, "", error=err)
                return
            # Pending/Processing/Staged → keep polling
        else:
            webhook_update_task(task_id, "failed", 0, "", error="Kling 폴링 timeout (10분)")
            return

        if not video_url:
            webhook_update_task(task_id, "failed", 0, "", error="Kling 완료했으나 video_url 없음")
            return

        webhook_update_task(task_id, "processing", 85, "Kling 결과 다운로드 중...")
        filename = f"{task_id}.mp4"
        os.makedirs(VIDEO_MEDIA_ROOT, exist_ok=True)
        file_path = os.path.join(VIDEO_MEDIA_ROOT, filename)

        dl = requests.get(video_url, stream=True, timeout=300)
        dl.raise_for_status()
        with open(file_path, "wb") as f:
            for chunk in dl.iter_content(chunk_size=8192):
                f.write(chunk)

        local_url = f"{VIDEO_MEDIA_URL_PREFIX}/{filename}"
        webhook_update_task(task_id, "succeeded", 100, "Kling 생성 완료", video_url=local_url)

    except requests.Timeout:
        webhook_update_task(task_id, "failed", 0, "", error="Kling/PiAPI 타임아웃")
    except Exception as e:
        logger.exception("Kling generation error task=%s", task_id)
        webhook_update_task(task_id, "failed", 0, "", error=str(e))
```

### Step 2: Import smoke test

Run: `python -c "from providers.kling import run_kling_generation; print('OK')"`

Expected: `OK`.

### Step 3: Commit

```bash
git add services/video-render/providers/kling.py
git commit -m "$(cat <<'EOF'
feat(video-render): providers/kling.py - Kling AI via PiAPI gateway (SP-7)

POST /api/v1/task (model=kling, task_type=video_generation) → poll GET /api/v1/task/{id}
(10s × 60) → download data.output.video_url. x-api-key header.
Supports version 1.5/1.6/2.1/2.5/2.6.

Plan-B-Video Phase 2.

Co-Authored-By: Claude Opus 4.7 (1M context)
EOF
)"
```

---

## Task 12: Windows video-render - providers/seedance.py (BytePlus)

**Files:**
- Create: `C:/Users/jaeoh/Desktop/workspace/web-ai/services/video-render/providers/seedance.py`

Seedance 2.0 is served through BytePlus.
The flow is `POST /seedance/v1/videos` → `GET /videos/{id}` → download `output.video_url`.

### Step 1: Write `providers/seedance.py`

```python
"""Seedance 2.0 video generation via ByteDance Volcano Engine (BytePlus international endpoint).

POST https://api.byteplus.com/seedance/v1/videos → poll GET /videos/{id} →
download output.video_url.
"""
from __future__ import annotations

import logging
import os
import time

import requests

from nas_client import webhook_update_task

logger = logging.getLogger(__name__)

SEEDANCE_BASE_URL = "https://api.byteplus.com/seedance/v1"
VIDEO_MEDIA_ROOT = os.getenv("VIDEO_MEDIA_ROOT", "/mnt/nas/webpage/data/video")
VIDEO_MEDIA_URL_PREFIX = os.getenv("VIDEO_MEDIA_URL_PREFIX", "/media/video")

POLL_INTERVAL = 8        # Seedance takes 30-120 seconds
POLL_MAX_ATTEMPTS = 60   # up to ~8 minutes (8s × 60)
DEFAULT_MODEL = "seedance-2.0"


def _headers() -> dict:
    api_key = os.getenv("SEEDANCE_API_KEY", "")
    return {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }


def run_seedance_generation(task_id: str, params: dict) -> None:
    """Generate a video with Seedance → mp4 → NAS SMB → webhook."""
    try:
        if not os.getenv("SEEDANCE_API_KEY"):
            webhook_update_task(task_id, "failed", 0, "", error="SEEDANCE_API_KEY 미설정")
            return

        webhook_update_task(task_id, "processing", 5, "Seedance API 호출 중...")

        body = {
            "model": params.get("model") or DEFAULT_MODEL,
            "prompt": params["prompt"][:2000],
            "resolution": params.get("resolution", "1080p"),
            "duration": params.get("duration", 5),
            "aspect_ratio": params.get("aspect_ratio", "16:9"),
        }
        if params.get("negative_prompt"):
            body["negative_prompt"] = params["negative_prompt"]
        if params.get("image_url"):
            body["references"] = [{"type": "image", "data": params["image_url"], "role": "subject"}]
        if params.get("audio") is not None:
            body["audio"] = bool(params["audio"])
        if params.get("seed") is not None:
            body["seed"] = int(params["seed"])

        resp = requests.post(f"{SEEDANCE_BASE_URL}/videos", headers=_headers(),
                             json=body, timeout=30)
        if resp.status_code not in (200, 201):
            webhook_update_task(task_id, "failed", 0, "",
                                error=f"Seedance API 오류: {resp.status_code} {resp.text[:300]}")
            return

        body_json = resp.json()
        job_id = body_json.get("id", "")
        if not job_id:
            webhook_update_task(task_id, "failed", 0, "", error="Seedance 응답에 id 없음")
            return

        webhook_update_task(task_id, "processing", 15, "Seedance 작업 등록됨")

        # Polling
        video_url = None
        for attempt in range(POLL_MAX_ATTEMPTS):
            time.sleep(POLL_INTERVAL)
            fetch = requests.get(f"{SEEDANCE_BASE_URL}/videos/{job_id}",
                                 headers=_headers(), timeout=30)
            if fetch.status_code != 200:
                continue
            fd = fetch.json()
            status = fd.get("status", "")
            # No provider-side percentage, so progress is estimated from the attempt count
            scaled = min(15 + int((attempt / POLL_MAX_ATTEMPTS) * 65), 79)
            webhook_update_task(task_id, "processing", scaled, f"Seedance 생성 중... ({status})")
            if status == "completed":
                video_url = (fd.get("output") or {}).get("video_url", "")
                break
            elif status == "failed":
                err = fd.get("error") or "Seedance 작업 실패"
                webhook_update_task(task_id, "failed", 0, "", error=str(err)[:300])
                return
        else:
            # 8s × 60 attempts = 480s, so the timeout is 8 minutes, not 10
            webhook_update_task(task_id, "failed", 0, "", error="Seedance 폴링 timeout (8분)")
            return

        if not video_url:
            webhook_update_task(task_id, "failed", 0, "", error="Seedance 완료했으나 video_url 없음")
            return

        webhook_update_task(task_id, "processing", 85, "Seedance 결과 다운로드 중...")
        filename = f"{task_id}.mp4"
        os.makedirs(VIDEO_MEDIA_ROOT, exist_ok=True)
        file_path = os.path.join(VIDEO_MEDIA_ROOT, filename)

        dl = requests.get(video_url, stream=True, timeout=300)
        dl.raise_for_status()
        with open(file_path, "wb") as f:
            for chunk in dl.iter_content(chunk_size=8192):
                f.write(chunk)

        local_url = f"{VIDEO_MEDIA_URL_PREFIX}/{filename}"
        webhook_update_task(task_id, "succeeded", 100, "Seedance 생성 완료", video_url=local_url)

    except requests.Timeout:
        webhook_update_task(task_id, "failed", 0, "", error="Seedance API 타임아웃")
    except Exception as e:
        logger.exception("Seedance generation error task=%s", task_id)
        webhook_update_task(task_id, "failed", 0, "", error=str(e))
```

### Step 2: Import smoke test

Run: `python -c "from providers.seedance import run_seedance_generation; print('OK')"`

Expected: `OK`.

### Step 3: Commit

```bash
git add services/video-render/providers/seedance.py
git commit -m "$(cat <<'EOF'
feat(video-render): providers/seedance.py - Seedance 2.0 BytePlus client (SP-7)

POST /seedance/v1/videos → poll GET /videos/{id} (8s × 60) → download output.video_url.
Bearer token. resolution 1080p/720p/2k, duration 4-15s.
image-to-video supported through the references array.

Plan-B-Video Phase 2.

Co-Authored-By: Claude Opus 4.7 (1M context)
EOF
)"
```

---

## Task 13: Windows video-render - worker.py + tests

**Files:**
- Create: `C:/Users/jaeoh/Desktop/workspace/web-ai/services/video-render/worker.py`
- Create: `C:/Users/jaeoh/Desktop/workspace/web-ai/services/video-render/tests/test_worker.py`

Follows the Plan-B-Music worker.py pattern (string-based dispatch table + getattr).

### Step 1: Write failing tests

```python
"""worker.py: job_type dispatcher (4 providers)."""
import pytest
from unittest.mock import patch

import worker


def test_dispatch_sora_calls_run_sora_generation():
    payload = {"task_id": "t1", "job_type": "sora_generation", "params": {"prompt": "x"}}
    with patch("worker.run_sora_generation") as m:
        worker._dispatch(payload)
        m.assert_called_once_with("t1", {"prompt": "x"})


def test_dispatch_veo_calls_run_veo_generation():
    payload = {"task_id": "t2", "job_type": "veo_generation", "params": {"prompt": "x"}}
    with patch("worker.run_veo_generation") as m:
        worker._dispatch(payload)
        m.assert_called_once_with("t2", {"prompt": "x"})


def test_dispatch_kling_calls_run_kling_generation():
    payload = {"task_id": "t3", "job_type": "kling_generation", "params": {"prompt": "x"}}
    with patch("worker.run_kling_generation") as m:
        worker._dispatch(payload)
        m.assert_called_once_with("t3", {"prompt": "x"})


def test_dispatch_seedance_calls_run_seedance_generation():
    payload = {"task_id": "t4", "job_type": "seedance_generation", "params": {"prompt": "x"}}
    with patch("worker.run_seedance_generation") as m:
        worker._dispatch(payload)
        m.assert_called_once_with("t4", {"prompt": "x"})


def test_dispatch_unknown_job_type_logs_error():
    payload = {"task_id": "t5", "job_type": "weird_type", "params": {}}
    with patch("worker.webhook_update_task") as m:
        worker._dispatch(payload)
        m.assert_called_once()
        args = m.call_args[0]
        assert args[0] == "t5"
        assert args[1] == "failed"
```

### Step 2: Confirm the tests fail

Run: `cd C:/Users/jaeoh/Desktop/workspace/web-ai/services/video-render && python -m pytest tests/test_worker.py -v`

Expected: FAIL.

### Step 3: Write `worker.py`

```python
"""Redis BLPOP worker: queue:video-render → job_type dispatch → NAS webhook.

Waits while queue:paused is set (task-watcher sets it when it detects 박재오's activity).
Follows the Plan-B-Music worker.py pattern: string-based dispatch + getattr (compatible with test patching).
"""
from __future__ import annotations

import asyncio
import json
import logging
import os
import sys

import redis.asyncio as aioredis

from nas_client import webhook_update_task
from providers.sora import run_sora_generation
from providers.veo import run_veo_generation
from providers.kling import run_kling_generation
from providers.seedance import run_seedance_generation

logger = logging.getLogger(__name__)

REDIS_URL = os.getenv("REDIS_URL", "redis://192.168.45.54:6379")
QUEUE_KEY = "queue:video-render"
PAUSED_KEY = "queue:paused"

# string names so `unittest.mock.patch` on `worker.<name>` is correctly intercepted
_DISPATCH_TABLE = {
    "sora_generation": "run_sora_generation",
    "veo_generation": "run_veo_generation",
    "kling_generation": "run_kling_generation",
    "seedance_generation": "run_seedance_generation",
}


def _dispatch(payload: dict) -> None:
    """payload[job_type] → call the provider function (sync; worker_loop wraps it with asyncio.to_thread)."""
    job_type = payload.get("job_type", "")
    task_id = payload.get("task_id", "")
    params = payload.get("params", {})

    fn_name = _DISPATCH_TABLE.get(job_type)
    if fn_name is None:
        logger.error("unknown job_type=%s task=%s", job_type, task_id)
        webhook_update_task(task_id, "failed", 0, "", error=f"unknown job_type: {job_type}")
        return

    try:
        # Look the function up at call time so patched module attributes are honored
        fn = getattr(sys.modules[__name__], fn_name)
    except AttributeError:
        logger.error("dispatch table typo for job_type=%s name=%s task=%s",
                     job_type, fn_name, task_id)
        webhook_update_task(task_id, "failed", 0, "",
                            error=f"internal dispatch error: {fn_name}")
        return

    fn(task_id, params)


async def worker_loop():
    redis = aioredis.from_url(REDIS_URL, decode_responses=False)
    logger.info("video-render worker started (queue=%s)", QUEUE_KEY)
    while True:
        try:
            paused = await redis.get(PAUSED_KEY)
            if paused == b"1":
                await asyncio.sleep(10)
                continue

            item = await redis.blpop(QUEUE_KEY, timeout=1)
            if item is None:
                continue

            _, raw = item
            try:
                payload = json.loads(raw)
            except json.JSONDecodeError:
                logger.error("invalid queue payload: %r", raw[:200])
                continue

            # Providers are blocking (requests + time.sleep), so run them off the event loop
            await asyncio.to_thread(_dispatch, payload)

        except asyncio.CancelledError:
            logger.info("worker_loop cancelled")
            raise
        except Exception:
            logger.exception("worker_loop iteration failed, retrying in 5 seconds")
            await asyncio.sleep(5)
```

### Step 4: Tests pass

Run: `python -m pytest tests/test_worker.py -v`

Expected: 5 PASS.

### Step 5: Commit

```bash
git add services/video-render/worker.py services/video-render/tests/test_worker.py
git commit -m "$(cat <<'EOF'
feat(video-render): worker.py - Redis BLPOP + 4 job_type dispatch (SP-7)

BLPOP on queue:video-render, dispatch after checking queue:paused.
string-based _DISPATCH_TABLE + getattr (test patch compatible, Plan-B-Music pattern).
Includes an AttributeError guard. Sync providers wrapped with asyncio.to_thread.
4 job_type: sora/veo/kling/seedance _generation.

Plan-B-Video Phase 2.

Co-Authored-By: Claude Opus 4.7 (1M context)
EOF
)"
```

---

## Task 14: Windows video-render - main.py + services/docker-compose entry

**Files:**
- Create: `C:/Users/jaeoh/Desktop/workspace/web-ai/services/video-render/main.py`
- Modify: `C:/Users/jaeoh/Desktop/workspace/web-ai/services/docker-compose.yml`

### Step 1: Write `main.py`

```python
"""video-render FastAPI entry: health + lifespan (worker loop spawn)."""
from __future__ import annotations

import asyncio
import logging
from contextlib import asynccontextmanager

from fastapi import FastAPI

import worker

logging.basicConfig(level=logging.INFO,
                    format="%(asctime)s %(name)s %(levelname)s %(message)s")
logger = logging.getLogger(__name__)


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Start the Redis worker loop as a background task for the app's lifetime
    worker_task = asyncio.create_task(worker.worker_loop())
    logger.info("video-render lifespan started")
    try:
        yield
    finally:
        # Cancel the loop on shutdown and wait for it to unwind
        worker_task.cancel()
        try:
            await worker_task
        except asyncio.CancelledError:
            pass
        logger.info("video-render lifespan stopped")


app = FastAPI(lifespan=lifespan)


@app.get("/health")
def health():
    return {"ok": True, "service": "video-render"}
```

### Step 2: Import smoke test

Run: `python -c "from main import app; print(len(app.routes))"`

Expected: prints a number (>=1, including /health).
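
The start-and-cancel behavior of the lifespan in `main.py` can be checked in isolation, without FastAPI or Redis. The sketch below is only an illustration of that pattern: `fake_worker_loop`, `lifespan_sketch`, and `events` are hypothetical names that stand in for `worker.worker_loop` and the real lifespan, and are not part of the plan's code.

```python
import asyncio
from contextlib import asynccontextmanager

events = []  # records what the background task observed


async def fake_worker_loop():
    """Hypothetical stand-in for worker.worker_loop (no Redis involved)."""
    events.append("started")
    try:
        while True:
            await asyncio.sleep(0.01)
    except asyncio.CancelledError:
        events.append("cancelled")
        raise  # re-raise so the task ends in the cancelled state


@asynccontextmanager
async def lifespan_sketch():
    # Same shape as the lifespan in main.py: spawn on entry, cancel on exit
    task = asyncio.create_task(fake_worker_loop())
    try:
        yield task
    finally:
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass


async def demo() -> bool:
    async with lifespan_sketch() as task:
        await asyncio.sleep(0.03)  # let the loop run a few iterations
    return task.cancelled()


result = asyncio.run(demo())
print(result, events)
```

Re-raising `CancelledError` inside the loop matters here: it is what lets the shutdown path wait for the task and see it end as cancelled instead of hanging.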
### Step 3: Add video-render to docker-compose.yml

In `C:/Users/jaeoh/Desktop/workspace/web-ai/services/docker-compose.yml`, add the following after the music-render service:

```yaml
  video-render:
    build:
      context: ./video-render
    container_name: video-render
    restart: unless-stopped
    ports:
      - "18712:8000"
    environment:
      - TZ=Asia/Seoul
      - REDIS_URL=${REDIS_URL:-redis://192.168.45.54:6379}
      - NAS_BASE_URL=${NAS_BASE_URL:-http://192.168.45.54:18800}
      - INTERNAL_API_KEY=${INTERNAL_API_KEY:-}
      - OPENAI_API_KEY=${OPENAI_API_KEY:-}
      - GOOGLE_PROJECT_ID=${GOOGLE_PROJECT_ID:-}
      - GOOGLE_LOCATION=${GOOGLE_LOCATION:-us-central1}
      - GOOGLE_GCS_BUCKET=${GOOGLE_GCS_BUCKET:-}
      - GOOGLE_APPLICATION_CREDENTIALS=/app/keys/gcp-sa.json
      - PIAPI_API_KEY=${PIAPI_API_KEY:-}
      - SEEDANCE_API_KEY=${SEEDANCE_API_KEY:-}
      - VIDEO_MEDIA_ROOT=${VIDEO_MEDIA_ROOT:-/mnt/nas/webpage/data/video}
      - VIDEO_MEDIA_URL_PREFIX=${VIDEO_MEDIA_URL_PREFIX:-/media/video}
    volumes:
      - /mnt/nas/webpage/data/video:/mnt/nas/webpage/data/video
      - ${GCP_SA_JSON_HOST_PATH:-/etc/webai/gcp-sa.json}:/app/keys/gcp-sa.json:ro
    healthcheck:
      test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"]
      interval: 60s
      timeout: 5s
      retries: 3
```

**Important:**
- `NAS_BASE_URL` defaults to port 18800 (video-lab). Lesson from Plan-B-Music: `services/.env` must not define `NAS_BASE_URL`, otherwise this service-local default is not applied.
- `GCP_SA_JSON_HOST_PATH` is the host path where 박재오 places the service account JSON. Override it in `.env`.

### Step 4: Commit

```bash
cd C:/Users/jaeoh/Desktop/workspace/web-ai
git add services/video-render/main.py services/docker-compose.yml
git commit -m "$(cat <<'EOF'
feat(video-render): main.py + services/docker-compose entry (SP-7)

worker_loop is spawned from the FastAPI lifespan. /health endpoint.
docker-compose: port 18712, NAS_BASE_URL default=18800 (video-lab),
4 provider env (OPENAI_API_KEY, GOOGLE_*, PIAPI_API_KEY, SEEDANCE_API_KEY),
GCP service account JSON read-only mount.

Plan-B-Video Phase 2.

Co-Authored-By: Claude Opus 4.7 (1M context) EOF )" ``` --- ## Task 15: NAS push + deployer rebuild + 박재오 빌드 안내 **Files:** - (변경 없음 — push + 박재오 안내) ### Step 1: web-backend push ```bash cd C:/Users/jaeoh/Desktop/workspace/web-backend git push ``` Expected: NAS Gitea로 push 성공. Webhook이 deployer 자동 트리거 → video-lab 신규 컨테이너 빌드 + 시작. ### Step 2: 박재오 NAS 측 확인 안내 박재오에게 다음 검증 요청: ```bash ssh nas docker ps --filter "name=video-lab" # Up 확인 docker logs video-lab --tail 30 # 시작 로그 확인 docker exec video-lab env | grep -E "INTERNAL_API_KEY|REDIS_URL" # env 주입 확인 # health curl https://gahusb.synology.me/api/video/providers # → 4 provider 메타 표시 # NAS frontend 재시작 (nginx /api/video/, /media/video/, /api/internal/video/ 활성화) docker compose -f /volume1/docker/webpage/docker-compose.yml restart frontend ``` ### Step 3: 박재오 Windows AI 머신 빌드 안내 박재오 WSL2 (`/workspace/web-ai/`): ```bash # 1. web-ai push (자격증명 갱신 후 박재오 수동) git push # 2. services/.env에 4 provider key 추가 (NAS_BASE_URL은 라인 추가하지 말 것 — Plan-B-Music 학습) nano /workspace/web-ai/services/.env ``` 수정 후 `services/.env` 예시: ``` REDIS_URL=redis://192.168.45.54:6379 INTERNAL_API_KEY= SUNO_API_KEY=<기존> INSTA_MEDIA_ROOT=/mnt/nas/webpage/data/insta INSTA_MEDIA_URL_PREFIX=/media/insta # 신규 (Plan-B-Video) OPENAI_API_KEY= GOOGLE_PROJECT_ID= GOOGLE_LOCATION=us-central1 GOOGLE_GCS_BUCKET= PIAPI_API_KEY= SEEDANCE_API_KEY= GCP_SA_JSON_HOST_PATH=/etc/webai/gcp-sa.json # 박재오 host에 두는 service account JSON 경로 ``` ```bash # 3. GCP service account JSON 호스트에 배치 sudo mkdir -p /etc/webai sudo cp <박재오 다운받은 gcp-sa.json> /etc/webai/gcp-sa.json sudo chmod 644 /etc/webai/gcp-sa.json # 4. 빌드 + 기동 cd /workspace/web-ai/services docker compose build video-render docker compose up -d video-render docker logs video-render --tail 30 # 5. WSL2 안 health curl -m 3 http://localhost:18712/health # 6. 
NAS에서 도달 확인 (mirror mode + 방화벽 룰 — 18712 inbound 필요) ``` ### Step 4: Windows 방화벽 18712 inbound 룰 추가 박재오 Windows PowerShell (관리자): ```powershell New-NetFirewallRule -DisplayName "video-render-inbound-18712" ` -Direction Inbound -Protocol TCP -LocalPort 18712 ` -Action Allow -Profile Any -Enabled True ``` (Plan-B-Music에서 mirror mode 영구 적용 + 18711 inbound 룰 추가 학습. video-render는 sync forward 없으므로 사실 inbound 룰 불필요 — Windows pulls from Redis. 안전 차원에서만 추가, optional.) ### Step 5: NAS → Windows redis 도달 확인 (worker가 BLPOP 중) ```bash ssh nas docker logs video-render -f & # 박재오 머신에서 따로 보고 # NAS에서 docker exec redis redis-cli LLEN queue:video-render # → 0 (큐 비어 있으면 worker가 BLPOP 중) ``` 이 task의 push + 빌드는 박재오 머신에서 직접. controller는 push 시도 후 박재오 검증 결과 대기. --- ## Task 16: end-to-end 검증 — Kling 1 트랙 **Files:** - (변경 없음 — 검증만) Kling이 가장 단순한 API (PiAPI x-api-key + 표준 task pattern) — 1차 검증 권장. ### Step 1: Kling 1 영상 생성 트리거 ```bash curl -X POST https://gahusb.synology.me/api/video/generate \ -H "Content-Type: application/json" \ -d '{ "provider": "kling", "model": "2.6", "prompt": "A serene mountain landscape with morning mist", "duration": 5, "aspect_ratio": "16:9", "mode": "std" }' # → {"task_id": "", "provider": "kling"} ``` ### Step 2: Windows worker 로그 확인 (별도 터미널) ```bash docker logs video-render -f ``` 기대 시퀀스: ``` ... INFO Kling API 호출 중... ... INFO Kling 작업 등록됨 ... POST http://192.168.45.54:18800/api/internal/video/update "HTTP/1.1 200 OK" ← 18800! ... INFO Kling 생성 중... (Processing) ... INFO Kling 결과 다운로드 중... ... INFO Kling 생성 완료 ``` ### Step 3: NAS DB 상태 폴링 ```bash curl -s https://gahusb.synology.me/api/video/tasks/ | python3 -m json.tool # 기대: status=succeeded, video_url=/media/video/.mp4 ``` ### Step 4: mp4 파일 + 재생 ```bash ls -la /volume1/docker/webpage/data/video/.mp4 # 또는 브라우저: https://gahusb.synology.me/media/video/.mp4 ``` ### Step 5: Sora·Veo·Seedance 회귀 (선택) 각 provider 1 트랙씩 동일 명령 (`provider`만 변경) — 4 provider 모두 succeeded 확인 시 plan 완전 통과. 
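
The manual polling in Task 16 Step 3 could also be scripted. The following is a minimal sketch, not part of the plan's codebase: `fetch_task` and `wait_for_task` are hypothetical helper names, and the terminal status set assumes only the `succeeded` and `failed` values that this plan mentions. The endpoint `GET /api/video/tasks/{task_id}` and the host come from the plan itself.

```python
import json
import time
import urllib.request

BASE_URL = "https://gahusb.synology.me"  # gateway host used in the curl examples
TERMINAL = {"succeeded", "failed"}       # assumption: the only terminal statuses


def fetch_task(task_id: str) -> dict:
    """GET /api/video/tasks/{task_id} and decode the JSON body."""
    url = f"{BASE_URL}/api/video/tasks/{task_id}"
    with urllib.request.urlopen(url, timeout=10) as resp:
        return json.load(resp)


def wait_for_task(task_id, fetch=fetch_task, interval=5.0, timeout=600.0,
                  sleep=time.sleep, clock=time.monotonic) -> dict:
    """Poll until the task reaches a terminal status or the timeout expires.

    fetch, sleep and clock are injectable so the loop can be exercised
    without a network connection.
    """
    deadline = clock() + timeout
    while True:
        task = fetch(task_id)
        if task.get("status") in TERMINAL:
            return task
        if clock() >= deadline:
            raise TimeoutError(f"task {task_id} did not finish within {timeout}s")
        sleep(interval)
```

Calling `wait_for_task(task_id)` with the id returned by Step 1 would block until the NAS DB reports a terminal status. Any status other than the two terminal values is treated as "still running", which keeps the sketch independent of whatever intermediate status names the worker reports.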

---

## Task 17: Memory record + final cleanup + push check

**Files:**
- Create: `C:/Users/jaeoh/.claude/projects/C--Users-jaeoh-Desktop-workspace-web-ui/memory/reference_plan_b_video_complete.md`
- Modify: `C:/Users/jaeoh/.claude/projects/C--Users-jaeoh-Desktop-workspace-web-ui/memory/MEMORY.md`

### Step 1: Write `reference_plan_b_video_complete.md`

```markdown
---
name: plan-b-video-complete
description: 2026-05-19 Plan-B-Video complete - new NAS video-lab + Windows video-render with 4 providers (Sora 2/Veo 3.1/Kling via PiAPI/Seedance 2.0)
metadata:
  type: reference
---

Plan-B-Video completed 2026-05-19. 17 tasks. Spec §10 SP-7 reduced to 4 providers (Runway/Pika/Luma dropped, Seedance added).

## Structure

NAS video-lab (new container, port 18800):
- POST /api/video/generate → Redis RPUSH queue:video-render
- GET /api/video/tasks/{id}
- POST /api/internal/video/update (X-Internal-Key)

Windows video-render (port 18712):
- worker: Redis BLPOP queue:video-render → job_type dispatch (4 providers)
- providers/{sora,veo,kling,seedance}.py
- result mp4 → /mnt/nas/webpage/data/video/{task_id}.mp4

## 4 Provider notes

| Provider | Auth | Notes |
|----------|------|-------|
| Sora 2 | OPENAI_API_KEY Bearer | ⚠️ Deprecated 2026-09-24; an alternative will be needed by then |
| Veo 3.1 | gcloud Bearer (service account JSON) | Goes through a GCS bucket. Download gs://...mp4 with the google-cloud-storage SDK, then write over SMB |
| Kling | x-api-key (PiAPI gateway) | Via api.piapi.ai. The direct KlingAI API requires JWT and is more complex |
| Seedance 2.0 | SEEDANCE_API_KEY Bearer | BytePlus international endpoint (api.byteplus.com) |

## Pitfalls learned in Plan-B-Music (all applied in advance)

1. WSL2 mirror mode: already applied permanently (`.wslconfig networkingMode=mirrored`)
2. Redis chown 999:999: already applied permanently
3. NAS_BASE_URL in services/.env: keep it absent, so the video-render compose default of 18800 applies

## Next plan

Plan-B-Infra (SP-9 NSSM + SP-10 task-watcher): Windows auto-start + time-of-day branching policy.
```

### Step 2: Add one line to the MEMORY.md index

After the `reference_plan_b_music_complete.md` entry:

```markdown
- [Plan-B-Video complete](reference_plan_b_video_complete.md): 2026-05-19 new video-lab + 4 providers (Sora/Veo/Kling/Seedance). Spec §10 SP-7 updated (6→4 providers)
```

### Step 3: Final push check

```bash
cd C:/Users/jaeoh/Desktop/workspace/web-backend && git status && git log --oneline -10
cd C:/Users/jaeoh/Desktop/workspace/web-ai && git status && git log --oneline -10
```

Expected: both repositories are clean and all of the latest commits are pushed.

### Step 4: Summary report to 박재오

Report to 박재오:
- All 4 providers passed end-to-end
- ⚠️ Sora 2 API shutdown on 2026-09-24: an alternative must be chosen within 4 months
- Automatic cleanup of the Veo GCS bucket (TTL lifecycle rule) is recommended so that mp4 files do not stay in GCS permanently
- The frontend video UI is a separate follow-up

---

## Self-Review

**1. Spec coverage**

| Spec requirement | Implemented in | Status |
|--------------|-----------|------|
| SP-7 §10: video provider gateway (updated 6→4) | Task 9~12 (providers/sora·veo·kling·seedance) | ✓ |
| SP-7 §10: provider is selected from the payload | Task 5 GenerateRequest.provider + Task 13 dispatch | ✓ |
| SP-7 §10: each external API + mp4 download → /mnt/nas/data/video/ | Task 9~12 _download | ✓ |
| SP-7 §10: NAS webhook | Task 8 nas_client + Task 13 worker | ✓ |
| SP-8 §10: new docker container | Task 1~6 | ✓ |
| SP-8 §10: POST /api/video/generate | Task 5 main.py | ✓ |
| SP-8 §10: GET /api/video/tasks/{id} | Task 5 main.py | ✓ |
| SP-8 §10: video_tasks table in sqlite | Task 2 db.py | ✓ |
| SP-8 §10: POST /api/internal/video/update | Task 4 internal_router | ✓ |
| SP-8 §10: Dockerfile + requirements + compose entry | Task 1, 6 | ✓ |
| §5 Windows Render Worker pattern | Task 13 worker_loop | ✓ |
| §6 Redis queue + standard payload | Task 5 _push_render_job | ✓ |
| §8 3-layer blocking (LAN allow + deny all + X-Internal-Key) | Task 7 nginx | ✓ |
| §9 external API keys live only in the Windows .env | Task 8 .env.example + Task 14 compose | ✓ |
| 박재오's decision to "move all 4 providers at once" | Task 9~12 | ✓ |
| Awareness of the 3 Plan-B-Music pitfalls | Task 14 service-local default + memory | ✓ |

**Overall spec coverage**: 100%. The changed set of 4 providers is stated explicitly and spec §10 SP-7 is updated.

**2. Placeholder scan**: Passed. All code blocks are complete. Placeholders such as `` are explicit slots for the values 박재오 enters in `.env` (they are not unfinished-plan placeholders).

**3. Type consistency**:
- `webhook_update_task(task_id, status, progress, message, video_url, error)`: defined in Task 8, called consistently in Task 9~12
- `_push_render_job(task_id, job_type, params)`: defined in Task 5
- `_dispatch(payload)`: Task 13. Uses payload[task_id/job_type/params] (matches the push format in Task 5)
- The 4 `job_type` values `sora_generation`, `veo_generation`, `kling_generation`, `seedance_generation` are produced in Task 5 and mapped consistently in the Task 13 _DISPATCH_TABLE
- `UpdatePayload` (video_lab/internal_router) vs. the webhook payload: task_id/status/progress/message/video_url/error match

**4. Omission check**:
- nginx blocking (`/api/internal/video/`): Task 7 ✓
- frontend depends_on + volume mount: Task 6 ✓
- Build instructions for 박재오 (T15 + GCP_SA_JSON_HOST_PATH): ✓
- 18712 inbound rule: marked optional in Task 15 Step 4 ✓

The plan itself is complete. Present the execution choice to 박재오.

---

## Appendix: Known limitations + follow-up

**Known limitations:**
1. **Sora 2 deprecated 2026-09-24**: a 4-month window. Migration alternatives: Pika, Runway Gen-3, or a new Sora 3 if one is released
2. **Veo GCS bucket cost**: storage cost accumulates if mp4 files stay in GCS permanently. A lifecycle rule (e.g. automatic deletion after 30 days) is recommended
3. **Kling depends on PiAPI**: if a direct KlingAI contract makes the native API available, the PiAPI intermediate layer can be removed
4. **Frontend UI not included**: backend gateway only. The video gallery page belongs to a separate plan

**Follow-up:**
- Sora alternative migration (within 2026-08)
- Apply a GCS lifecycle policy
- Frontend `/video` page (video generation UI + gallery + polling)
- Plan-B-Infra (SP-9 NSSM + SP-10 task-watcher), to proceed after Plan-B-Video

---

## Appendix: API key issuance guide for the 4 providers

Implementation proceeds while 박재오 does not yet hold the keys. At the verification stage (Task 15~16), 박재오 obtains the API keys and enters them in `.env` before continuing. The sign-up, billing, and key issuance flow for each provider follows.

### 1. Sora 2 (OpenAI)

**Dashboard**: https://platform.openai.com

**Issuance flow**:
1. Sign up at https://platform.openai.com/signup (log in if an account already exists)
2. https://platform.openai.com/account/billing/overview → register a payment method (Sora 2 uses prepaid credit or a monthly plan)
3. **Sora 2 API tier access is required**: as of May 2026 it is offered to Tier 1 and above (unlocked automatically after at least $5 of usage, or by separate request)
4. https://platform.openai.com/api-keys → "Create new secret key" → copy `sk-proj-...`
5. Enter `OPENAI_API_KEY=sk-proj-...` in `.env`

**Pricing (as of 2026-05)**:
- sora-2 (720p, 8 s): ~$0.50
- sora-2-pro (1080p, 8 s): ~$1.50

**⚠️ Caution**: the Sora 2 API is scheduled to shut down on 2026-09-24. Use it with the 4-month window in mind.

### 2. Veo 3.1 (Google Vertex AI)

**Dashboard**: https://console.cloud.google.com

**Issuance flow** (the most involved: requires a GCP project, a service account, and GCS bucket setup):

```bash
# (1) Install the GCP CLI (Windows PowerShell)
# Download from https://cloud.google.com/sdk/docs/install
gcloud init

# (2) Create a project (or use an existing one)
gcloud projects create my-veo-project --name="Veo Project"
gcloud config set project my-veo-project

# (3) Link a billing account (required, in the Console UI)
# https://console.cloud.google.com/billing → link a billing account to the project

# (4) Enable the Vertex AI API
gcloud services enable aiplatform.googleapis.com

# (5) Create a GCS bucket (stores Veo results)
gcloud storage buckets create gs://my-veo-output --location=us-central1
# .env: GOOGLE_GCS_BUCKET=my-veo-output

# (6) Create a service account + grant roles + download the JSON key
gcloud iam service-accounts create veo-worker --display-name="Veo Worker"
gcloud projects add-iam-policy-binding my-veo-project \
  --member="serviceAccount:veo-worker@my-veo-project.iam.gserviceaccount.com" \
  --role="roles/aiplatform.user"
gcloud projects add-iam-policy-binding my-veo-project \
  --member="serviceAccount:veo-worker@my-veo-project.iam.gserviceaccount.com" \
  --role="roles/storage.objectAdmin"
gcloud iam service-accounts keys create ~/gcp-sa.json \
  --iam-account=veo-worker@my-veo-project.iam.gserviceaccount.com

# (7) Copy the JSON key to the WSL2 host on 박재오's Windows AI machine
# e.g. scp ~/gcp-sa.json or USB transfer
sudo mkdir -p /etc/webai
sudo mv ~/gcp-sa.json /etc/webai/gcp-sa.json
sudo chmod 644 /etc/webai/gcp-sa.json
```

`.env` entries:

```
GOOGLE_PROJECT_ID=my-veo-project
GOOGLE_LOCATION=us-central1
GOOGLE_GCS_BUCKET=my-veo-output
GCP_SA_JSON_HOST_PATH=/etc/webai/gcp-sa.json
```

**Pricing (2026-05)**:
- Veo 3.1 Fast (1080p, 8 s): ~$0.40
- Veo 3.1 Generate (1080p, 8 s): ~$0.75

**GCS lifecycle rule recommended** (automatic mp4 deletion to reduce cost):

```bash
gcloud storage buckets update gs://my-veo-output \
  --lifecycle-file=<(echo '{"lifecycle":{"rule":[{"action":{"type":"Delete"},"condition":{"age":30}}]}}')
```

→ Objects are deleted automatically after 30 days.

### 3. Kling (PiAPI gateway)

**Dashboard**: https://piapi.ai

**Issuance flow** (the simplest; direct KlingAI requires Chinese KYC, so PiAPI is recommended):
1. Sign up at https://piapi.ai
2. Dashboard → Billing → add credit (minimum $10)
3. Dashboard → API Keys → "Create API Key" → copy
4. Enter `PIAPI_API_KEY=...` in `.env`

**Kling credit activation**: PiAPI also supports models other than Kling. No separate enable step is needed; the same API key is used for the calls.

**Pricing (2026-05, PiAPI)**:
- Kling v2.6 std, 5 s: ~$0.30
- Kling v2.6 pro, 10 s: ~$1.20

**Alternative (native KlingAI)**: https://klingai.com/global/dev. An API key can be issued directly, but it requires a Chinese member sign-up and JWT authentication. PiAPI gets the infrastructure settled faster.

### 4. Seedance 2.0 (BytePlus)

**Dashboard**: https://console.byteplus.com

**Issuance flow**:
1. Sign up at https://www.byteplus.com (international users). Users in China use https://www.volcengine.com (Volcengine, RMB)
2. KYC verification (corporate or personal ID check)
3. Console → "AI/ML" → "Seedance" menu → request API access (waiting time can be ~24h)
4. After approval, an Access Key + Secret Key pair or a simple Bearer API Key is issued (selected in the UI)
5. Enter `SEEDANCE_API_KEY=...` in `.env`

**Pricing (2026-05, BytePlus)**:
- Seedance 2.0 (720p, 5 s): ~$0.05
- Seedance 2.0 (1080p, 10 s): ~$0.20

**Reference documentation**: https://www.byteplus.com/en/product/seedance

### Recommended issuance priority for 박재오

| Priority | Provider | Reason |
|---|---|---|
| 1 | **Kling (PiAPI)** | Issued in 5 minutes and usable immediately, so it suits verification best |
| 2 | **Seedance** | Cheap ($0.05~0.20), issued within 1 day, diversifies the 4-provider verification |
| 3 | **Sora 2** | Immediate with an existing OpenAI account, plus tier activation. ⚠️ Deprecated in 4 months |
| 4 | **Veo 3.1** | GCP setup is a heavy burden, but quality and native audio are strengths. Issue after things stabilize |

**Starting Task 16 (end-to-end verification) with one Kling track is recommended** (easiest issuance + short polling). After that, run the regression for the other providers as their keys are issued.

### When to enter the API keys

After T1~T14 (writing the NAS + Windows code) are complete, 박재오 enters the keys in `.env` **at the T15 stage**. See step 2 of the T15 build instructions for 박재오. The containers start normally even without keys (but on a call the worker immediately reports failed with an "API_KEY not set" error).

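
For checking worker behavior when a key is missing, each provider's code already contains a safe fallback:

```python
# "미설정" in the error string means "not set"
if not os.getenv("OPENAI_API_KEY"):
    webhook_update_task(task_id, "failed", 0, "", error="OPENAI_API_KEY 미설정 (Windows .env)")
    return
```

→ The NAS DB immediately shows "failed + key not set", which makes debugging straightforward.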
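
The per-provider check above could also be expressed as one table-driven helper. This is a sketch under assumptions, not the plan's actual provider code: `REQUIRED_ENV`, `missing_env`, and `missing_env_error` are hypothetical names, and the mapping of providers to required variables is inferred from the `environment:` list in the compose entry (in particular, which `GOOGLE_*` variables Veo strictly needs is an assumption).

```python
import os
from typing import List, Mapping, Optional

# Assumption: required variables per provider, taken from the compose entry.
REQUIRED_ENV = {
    "sora": ("OPENAI_API_KEY",),
    "veo": ("GOOGLE_PROJECT_ID", "GOOGLE_GCS_BUCKET", "GOOGLE_APPLICATION_CREDENTIALS"),
    "kling": ("PIAPI_API_KEY",),
    "seedance": ("SEEDANCE_API_KEY",),
}


def missing_env(provider: str, env: Mapping[str, str] = os.environ) -> List[str]:
    """Return the required variable names that are unset or empty."""
    return [name for name in REQUIRED_ENV[provider] if not env.get(name)]


def missing_env_error(provider: str, env: Mapping[str, str] = os.environ) -> Optional[str]:
    """Build an error string in the same style as the fallback, or None if all keys are set."""
    missing = missing_env(provider, env)
    if not missing:
        return None
    return f"{', '.join(missing)} 미설정 (Windows .env)"
```

A provider could then call `missing_env_error("kling")` once at the start of a job and pass any non-`None` result as the `error` argument of `webhook_update_task`. Empty strings count as missing because the compose defaults (`${PIAPI_API_KEY:-}`) inject an empty value when `.env` has no entry.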