web-page-backend/docs/superpowers/plans/2026-05-19-plan-b-video-render.md
gahusb 211aff1e45 docs(plan): Plan-B-Video port 18800 → 18801 (conflict with realestate-lab)
Found by the T6 implementer: realestate-lab already occupies port 18800.
Corrected the video-lab port to 18801. All 18 occurrences in the plan were changed at once.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-19 08:32:56 +09:00

# Plan-B-Video: New NAS video-lab + Windows video-render Worker Implementation Plan
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
**Goal:** Offload four external video generation APIs (Sora 2, Veo 3.1, Kling via PiAPI, Seedance 2.0) to a `video-render` worker on the Windows AI machine (WSL2 Docker). A new `video-lab` container on the NAS acts only as the gateway (metadata, DB, Redis push); the Windows worker calls the four providers, saves the mp4 to the NAS over SMB, and updates the NAS DB through a webhook.
**Architecture:** NAS gateway → Redis RPUSH `queue:video-render` (`job_type=sora_generation|veo_generation|kling_generation|seedance_generation`) → Windows BLPOP → provider dispatch → external API call + download of the resulting mp4 (for Veo, a GCS → SMB copy) → `/mnt/nas/webpage/data/video/{task_id}.mp4` → HTTP POST `/api/internal/video/update` (X-Internal-Key, 3-layer blocking). Users poll `GET /api/video/tasks/{task_id}`.
**Tech Stack:** Python 3.12 / FastAPI / `redis>=5.0` async / `requests` / `httpx` / `google-cloud-storage` (Veo GCS download) / Docker Engine in WSL2 Ubuntu 24.04 / cifs SMB to NAS / SQLite
**Spec:** `web-backend/docs/superpowers/specs/2026-05-18-nas-windows-distributed-architecture-design.md` §10 SP-7·SP-8, §5 Windows Render Worker pattern, §6 Redis queue, §8 internal webhook + auth. **Spec §10 SP-7 update (decided by 박재오, 2026-05-19)**: 6 providers (Runway·Sora·Veo·Pika·Kling·Luma) → **4 providers (Sora 2 · Veo 3.1 · Kling via PiAPI · Seedance 2.0)**. Runway, Pika, and Luma are dropped; Seedance is added.
**Prerequisites (✅ all complete):**
- Plan-A: web-ai/NAS cache hardening
- Plan-B-Base: NAS Redis + Windows WSL2/Docker/SMB
- Plan-B-Insta: validated the NAS→Windows distribution pattern
- Plan-B-Music: resolved three infrastructure pitfalls (WSL2 mirror mode + Redis chown 999:999 + services/.env NAS_BASE_URL branching). All three fixes are applied permanently, so they are not expected to recur.
**Differences from Plan-B-Music:**
1. **New on the NAS side**: no existing code changes; the video-lab container is written from scratch
2. **All 4 providers use different APIs**: Sora (Bearer), Veo (gcloud Bearer + GCS), Kling (PiAPI x-api-key), Seedance (BytePlus Bearer). There is no single base URL
3. **No sync helper**: everything is async (video has no endpoint that responds immediately)
4. **No library/_sync_with_disk**: video is tracked per task only. Automatic registration like music-lab's is unnecessary
5. **Veo GCS detour**: the result lands at `gs://bucket/path/sample_0.mp4`. The worker needs an extra step that downloads it with the GCS Python SDK and then writes it over SMB
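
The Veo detour in item 5 starts by turning a `gs://bucket/path/sample_0.mp4` URI into a bucket name and an object path, which is what a GCS client needs before it can fetch anything. A minimal sketch of that split follows. `split_gs_uri` is a hypothetical helper name that the plan does not define, and the download itself is left out.

```python
from typing import Tuple


def split_gs_uri(uri: str) -> Tuple[str, str]:
    """Split 'gs://bucket/path/to/file' into (bucket, object_path).

    Hypothetical helper for illustration. Raises ValueError for anything
    that is not a gs:// URI with both a bucket and an object path.
    """
    prefix = "gs://"
    if not uri.startswith(prefix):
        raise ValueError(f"not a gs:// URI: {uri!r}")
    bucket, _, object_path = uri[len(prefix):].partition("/")
    if not bucket or not object_path:
        raise ValueError(f"gs:// URI needs a bucket and an object path: {uri!r}")
    return bucket, object_path


bucket, object_path = split_gs_uri("gs://bucket/path/sample_0.mp4")
print(bucket, object_path)
```

Rejecting malformed URIs early would let the worker report a clear `failed` status instead of an opaque SDK error.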
---
## Phase structure
| Phase | Content | Task |
|-------|------|------|
| **1. New NAS video-lab** (receiver + gateway) | Dockerfile + db + auth + internal_router + main + compose + nginx | 1~7 |
| **2. Windows video-render** | Dockerfile + nas_client + 4 providers + worker + main + services compose | 8~14 |
| **3. NAS push + 박재오 build + end-to-end** | push + `.env` written on 박재오's side + build + Kling single-track verification + memory | 15~17 |
**Important ordering:** Phase 1 first (create video-lab + nginx blocking) → Phase 2 (create the Windows worker) → Phase 3 (cutover + verification). Phases 1 and 2 target different machines (NAS and Windows), so they can run partly in parallel, but with a single controller they run sequentially.
---
## File Structure
### Phase 1: NAS web-backend (all new)
| File | Change | Responsibility |
|------|------|------|
| `web-backend/video-lab/Dockerfile` (Create) | python:3.12-alpine | image |
| `web-backend/video-lab/requirements.txt` (Create) | fastapi, uvicorn, redis>=5.0 | deps |
| `web-backend/video-lab/app/__init__.py` (Create) | empty marker | package |
| `web-backend/video-lab/app/db.py` (Create) | video_tasks table (SQLite) + CRUD | persistence |
| `web-backend/video-lab/app/auth.py` (Create) | `verify_internal_key` dependency | X-Internal-Key verification |
| `web-backend/video-lab/app/internal_router.py` (Create) | `POST /api/internal/video/update` | receives the Windows webhook |
| `web-backend/video-lab/app/main.py` (Create) | FastAPI + redis client + `POST /api/video/generate` + `GET /api/video/tasks/{id}` | API gateway |
| `web-backend/video-lab/tests/__init__.py` (Create) | empty marker | tests pkg |
| `web-backend/video-lab/tests/test_auth.py` (Create) | 3 tests | TDD |
| `web-backend/video-lab/tests/test_internal_router.py` (Create) | 5 tests | TDD |
| `web-backend/docker-compose.yml` (Modify) | add video-lab service (port 18801) + depends_on redis | compose |
| `web-backend/nginx/default.conf` (Modify) | `/api/video/` proxy + `/media/video/` alias + `/api/internal/video/` 3-layer blocking | routing |
### Phase 2: Windows web-ai/services/
| File | Change | Responsibility |
|------|------|------|
| `web-ai/services/video-render/Dockerfile` (Create) | python:3.12-slim + google-cloud-storage dependency | image |
| `web-ai/services/video-render/requirements.txt` (Create) | fastapi, requests, redis, httpx, google-cloud-storage, openai | deps |
| `web-ai/services/video-render/.env.example` (Create) | OPENAI_API_KEY, GOOGLE_PROJECT_ID, GOOGLE_LOCATION, GOOGLE_GCS_BUCKET, GOOGLE_APPLICATION_CREDENTIALS, PIAPI_API_KEY, SEEDANCE_API_KEY, VIDEO_MEDIA_ROOT, VIDEO_MEDIA_URL_PREFIX | secrets |
| `web-ai/services/video-render/nas_client.py` (Create) | `webhook_update_task` (same pattern as the Plan-B-Music nas_client) | webhook adapter |
| `web-ai/services/video-render/providers/__init__.py` (Create) | empty marker | package |
| `web-ai/services/video-render/providers/sora.py` (Create) | `run_sora_generation` (OpenAI Bearer) | Sora 2 client |
| `web-ai/services/video-render/providers/veo.py` (Create) | `run_veo_generation` (gcloud + GCS download) | Veo 3.1 client |
| `web-ai/services/video-render/providers/kling.py` (Create) | `run_kling_generation` (PiAPI x-api-key) | Kling client |
| `web-ai/services/video-render/providers/seedance.py` (Create) | `run_seedance_generation` (BytePlus Bearer) | Seedance client |
| `web-ai/services/video-render/worker.py` (Create) | Redis BLPOP + 4 job_type dispatchers + tests | dispatcher |
| `web-ai/services/video-render/main.py` (Create) | FastAPI app + lifespan(worker spawn) + /health | entry |
| `web-ai/services/video-render/tests/test_nas_client.py` (Create) | 5 tests | TDD |
| `web-ai/services/video-render/tests/test_worker.py` (Create) | 5 tests (4 job_type + unknown) | TDD |
| `web-ai/services/docker-compose.yml` (Modify) | add video-render service (port 18712) + GCS secret mount | compose |
---
## Task 1: NAS video-lab — Dockerfile + requirements + app package skeleton
**Files:**
- Create: `C:/Users/jaeoh/Desktop/workspace/web-backend/video-lab/Dockerfile`
- Create: `C:/Users/jaeoh/Desktop/workspace/web-backend/video-lab/requirements.txt`
- Create: `C:/Users/jaeoh/Desktop/workspace/web-backend/video-lab/app/__init__.py`
### Step 1: Write `requirements.txt`
`C:/Users/jaeoh/Desktop/workspace/web-backend/video-lab/requirements.txt`:
```
fastapi==0.115.6
uvicorn[standard]==0.30.6
redis>=5.0
pytest>=8.0.0
pytest-asyncio>=0.21
httpx>=0.27.0
```
### Step 2: Write the `Dockerfile`
`C:/Users/jaeoh/Desktop/workspace/web-backend/video-lab/Dockerfile`:
```dockerfile
FROM python:3.12-alpine
ENV PYTHONUNBUFFERED=1
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "1"]
```
### Step 3: Empty `app/__init__.py`
`C:/Users/jaeoh/Desktop/workspace/web-backend/video-lab/app/__init__.py`:
(empty file, 0 bytes)
### Step 4: Commit
```bash
cd C:/Users/jaeoh/Desktop/workspace/web-backend
git add video-lab/Dockerfile video-lab/requirements.txt video-lab/app/__init__.py
git commit -m "$(cat <<'EOF'
feat(video-lab): Dockerfile + requirements + app package skeleton (SP-8)

Create the NAS video-lab. Based on python:3.12-alpine. Depends on redis>=5.0.
No external video calls (gateway only), so no external API dependencies.
Plan-B-Video Phase 1.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
EOF
)"
```
## Context
- NAS video-lab does **not** contain any code that calls external video APIs (spec §9). It only pushes to Redis and receives webhooks.
- Same pattern as the music-lab Dockerfile (`python:3.12-alpine`).
- Working directory: `C:/Users/jaeoh/Desktop/workspace/web-backend`
## Report
- Status: DONE
- 3 files created
- Commit SHA
---
## Task 2: NAS video-lab - app/db.py (video_tasks table)
**Files:**
- Create: `C:/Users/jaeoh/Desktop/workspace/web-backend/video-lab/app/db.py`
A single video_tasks table. Same pattern as music-lab's music_tasks, plus a provider column.
### Step 1: Write `app/db.py`
`C:/Users/jaeoh/Desktop/workspace/web-backend/video-lab/app/db.py`:
```python
"""SQLite persistence for video_tasks. Single table: per-task tracking only."""
from __future__ import annotations

import json
import os
import sqlite3
from contextlib import contextmanager
from typing import Any, Dict, Optional

DB_PATH = os.path.join(os.getenv("VIDEO_DATA_DIR", "/app/data"), "video.db")


@contextmanager
def _conn():
    # Open a connection with WAL + busy_timeout; commit on success, always close.
    os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    conn.execute("PRAGMA journal_mode=WAL")
    conn.execute("PRAGMA busy_timeout=5000")
    try:
        yield conn
        conn.commit()
    finally:
        conn.close()


def init_db() -> None:
    with _conn() as conn:
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS video_tasks (
                id TEXT PRIMARY KEY,
                provider TEXT NOT NULL,
                params TEXT NOT NULL,
                status TEXT DEFAULT 'queued',
                progress INTEGER DEFAULT 0,
                message TEXT DEFAULT '',
                video_url TEXT,
                error TEXT,
                created_at TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
                updated_at TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
            )
            """
        )


def _row_to_dict(row) -> Dict[str, Any]:
    # Note: params is returned as the stored JSON string, not a decoded dict.
    return {
        "id": row["id"],
        "provider": row["provider"],
        "params": row["params"],
        "status": row["status"],
        "progress": row["progress"],
        "message": row["message"],
        "video_url": row["video_url"],
        "error": row["error"],
        "created_at": row["created_at"],
        "updated_at": row["updated_at"],
    }


def create_task(task_id: str, provider: str, params: Dict[str, Any]) -> Dict[str, Any]:
    with _conn() as conn:
        conn.execute(
            "INSERT INTO video_tasks (id, provider, params) VALUES (?, ?, ?)",
            (task_id, provider, json.dumps(params)),
        )
        row = conn.execute("SELECT * FROM video_tasks WHERE id = ?", (task_id,)).fetchone()
        return _row_to_dict(row)


def update_task(
    task_id: str,
    status: str,
    progress: int,
    message: str = "",
    video_url: Optional[str] = None,
    error: Optional[str] = None,
) -> None:
    with _conn() as conn:
        conn.execute(
            """
            UPDATE video_tasks
            SET status = ?, progress = ?, message = ?, video_url = ?, error = ?,
                updated_at = strftime('%Y-%m-%dT%H:%M:%fZ','now')
            WHERE id = ?
            """,
            (status, progress, message, video_url, error, task_id),
        )


def get_task(task_id: str) -> Optional[Dict[str, Any]]:
    with _conn() as conn:
        row = conn.execute("SELECT * FROM video_tasks WHERE id = ?", (task_id,)).fetchone()
        return _row_to_dict(row) if row else None
```
### Step 2: smoke test
Run: `cd C:/Users/jaeoh/Desktop/workspace/web-backend/video-lab && python -c "from app import db; db.init_db(); t = db.create_task('test-1', 'sora', {'prompt': 'x'}); print(db.get_task('test-1')); db.update_task('test-1', 'succeeded', 100, video_url='/media/video/test-1.mp4'); print(db.get_task('test-1'))"`
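Expected: two dict lines are printed. The second has status="succeeded" and includes video_url.
(The test may create `/app/data/video.db`, which can run into permission problems on Windows. Pointing the `VIDEO_DATA_DIR` env var at a temporary directory is recommended.)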
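
To make the temporary-directory recommendation concrete, the following standalone sketch opens a SQLite file in a throwaway directory and applies the same two PRAGMAs that `_conn()` uses. It does not import `app.db`; the variable names are illustrative only.

```python
import os
import sqlite3
import tempfile

# Work in a throwaway directory, much as one would by pointing
# VIDEO_DATA_DIR at a temp path before running the smoke test.
with tempfile.TemporaryDirectory() as data_dir:
    db_path = os.path.join(data_dir, "video.db")
    conn = sqlite3.connect(db_path)
    try:
        # Setting journal_mode returns the mode that is now in effect.
        journal_mode = conn.execute("PRAGMA journal_mode=WAL").fetchone()[0]
        conn.execute("PRAGMA busy_timeout=5000")
        # Reading busy_timeout back returns the value in milliseconds.
        busy_timeout = conn.execute("PRAGMA busy_timeout").fetchone()[0]
    finally:
        conn.close()

print(journal_mode, busy_timeout)
```

Closing the connection before the directory is cleaned up matters on Windows, where open SQLite files cannot be deleted.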
### Step 3: Commit
```bash
cd C:/Users/jaeoh/Desktop/workspace/web-backend
git add video-lab/app/db.py
git commit -m "$(cat <<'EOF'
feat(video-lab): app/db.py - video_tasks table + CRUD (SP-8)

Standard WAL + busy_timeout fix. create_task / update_task / get_task.
Adds a provider column (distinguishes Sora/Veo/Kling/Seedance) and a video_url field.
Plan-B-Video Phase 1.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
EOF
)"
```
---
## Task 3: NAS video-lab — auth.py + tests
**Files:**
- Create: `C:/Users/jaeoh/Desktop/workspace/web-backend/video-lab/app/auth.py`
- Create: `C:/Users/jaeoh/Desktop/workspace/web-backend/video-lab/tests/__init__.py`
- Create: `C:/Users/jaeoh/Desktop/workspace/web-backend/video-lab/tests/test_auth.py`
Replicates the same pattern as Plan-B-Music/Insta.
### Step 1: Write the failing tests
`tests/__init__.py`: (empty file)
`C:/Users/jaeoh/Desktop/workspace/web-backend/video-lab/tests/test_auth.py`:
```python
"""verify_internal_key: authentication for the Windows video-render webhook."""
import pytest
from fastapi import HTTPException

from app.auth import verify_internal_key


def test_valid_key_passes(monkeypatch):
    monkeypatch.setenv("INTERNAL_API_KEY", "secret123")
    verify_internal_key(x_internal_key="secret123")


def test_invalid_key_raises_401(monkeypatch):
    monkeypatch.setenv("INTERNAL_API_KEY", "secret123")
    with pytest.raises(HTTPException) as exc:
        verify_internal_key(x_internal_key="wrong")
    assert exc.value.status_code == 401


def test_missing_env_key_raises_401(monkeypatch):
    monkeypatch.delenv("INTERNAL_API_KEY", raising=False)
    with pytest.raises(HTTPException) as exc:
        verify_internal_key(x_internal_key="any")
    assert exc.value.status_code == 401
```
### Step 2: Confirm the tests fail
Run: `cd C:/Users/jaeoh/Desktop/workspace/web-backend/video-lab && python -m pytest tests/test_auth.py -v`
Expected: FAIL because `app.auth` does not exist yet.
### Step 3: Write `app/auth.py`
`C:/Users/jaeoh/Desktop/workspace/web-backend/video-lab/app/auth.py`:
```python
"""SP-8: authentication for the Windows worker → NAS video-lab internal webhook.

Compares the X-Internal-Key header with INTERNAL_API_KEY from .env.
Returns 401 when the key is not configured on the server (safe default).
"""
from __future__ import annotations

import os

from fastapi import Header, HTTPException


def verify_internal_key(x_internal_key: str = Header(...)):
    expected = os.getenv("INTERNAL_API_KEY")
    if not expected:
        raise HTTPException(401, "INTERNAL_API_KEY not configured on server")
    if x_internal_key != expected:
        raise HTTPException(401, "Invalid X-Internal-Key")
```
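
The check above compares the header with `!=`, which is not a constant-time comparison. One possible hardening, shown only as a sketch and not as part of this plan, is `hmac.compare_digest` from the standard library. `keys_match` is a hypothetical name, and wiring it into `verify_internal_key` is left as an assumption.

```python
import hmac
from typing import Optional


def keys_match(provided: Optional[str], expected: Optional[str]) -> bool:
    """Constant-time key comparison.

    A missing or empty value on either side never matches, which keeps the
    'unconfigured server rejects everything' default.
    """
    if not provided or not expected:
        return False
    # Encode to bytes so non-ASCII input cannot raise TypeError.
    return hmac.compare_digest(provided.encode("utf-8"), expected.encode("utf-8"))


print(keys_match("secret123", "secret123"), keys_match("wrong", "secret123"))
```

The three existing tests would still apply unchanged if the dependency raised 401 whenever `keys_match` returned `False`.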
### Step 4: Tests pass
Run: `python -m pytest tests/test_auth.py -v`
Expected: 3 PASS.
### Step 5: Commit
```bash
git add video-lab/app/auth.py video-lab/tests/__init__.py video-lab/tests/test_auth.py
git commit -m "$(cat <<'EOF'
feat(video-lab): verify_internal_key + tests (SP-8)

X-Internal-Key verification dependency. Same pattern as insta-lab/music-lab.
Plan-B-Video Phase 1.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
EOF
)"
```
---
## Task 4: NAS video-lab — internal_router.py + tests
**Files:**
- Create: `C:/Users/jaeoh/Desktop/workspace/web-backend/video-lab/app/internal_router.py`
- Create: `C:/Users/jaeoh/Desktop/workspace/web-backend/video-lab/tests/test_internal_router.py`
### Step 1: Write the failing tests
`C:/Users/jaeoh/Desktop/workspace/web-backend/video-lab/tests/test_internal_router.py`:
```python
"""POST /api/internal/video/update: Windows video-render webhook."""
import pytest
from fastapi import FastAPI
from fastapi.testclient import TestClient

from app.internal_router import router
from app import db


@pytest.fixture(autouse=True)
def _set_key(monkeypatch):
    monkeypatch.setenv("INTERNAL_API_KEY", "test-secret")


@pytest.fixture
def client(tmp_path, monkeypatch):
    monkeypatch.setenv("VIDEO_DATA_DIR", str(tmp_path))
    # DB_PATH is computed at import time, so patch the module attribute directly.
    monkeypatch.setattr(db, "DB_PATH", str(tmp_path / "test_video.db"))
    db.init_db()
    app = FastAPI()
    app.include_router(router)
    return TestClient(app)


def _make_task():
    tid = "video-task-1"
    db.create_task(tid, "sora", {"prompt": "test"})
    return tid


def test_update_with_valid_key_updates_db(client):
    tid = _make_task()
    r = client.post(
        "/api/internal/video/update",
        headers={"X-Internal-Key": "test-secret"},
        json={"task_id": tid, "status": "processing", "progress": 30, "message": "downloading"},
    )
    assert r.status_code == 200
    task = db.get_task(tid)
    assert task["status"] == "processing"
    assert task["progress"] == 30
    assert task["message"] == "downloading"


def test_update_with_invalid_key_returns_401(client):
    tid = _make_task()
    r = client.post(
        "/api/internal/video/update",
        headers={"X-Internal-Key": "wrong"},
        json={"task_id": tid, "status": "processing", "progress": 30},
    )
    assert r.status_code == 401


def test_update_succeeded_with_video_url(client):
    tid = _make_task()
    r = client.post(
        "/api/internal/video/update",
        headers={"X-Internal-Key": "test-secret"},
        json={
            "task_id": tid, "status": "succeeded", "progress": 100,
            "message": "done", "video_url": "/media/video/video-task-1.mp4",
        },
    )
    assert r.status_code == 200
    task = db.get_task(tid)
    assert task["status"] == "succeeded"
    assert task["video_url"] == "/media/video/video-task-1.mp4"


def test_update_failed_records_error(client):
    tid = _make_task()
    r = client.post(
        "/api/internal/video/update",
        headers={"X-Internal-Key": "test-secret"},
        json={"task_id": tid, "status": "failed", "progress": 0, "error": "Sora API rate limit"},
    )
    assert r.status_code == 200
    task = db.get_task(tid)
    assert task["status"] == "failed"
    assert "Sora" in (task.get("error") or "")


def test_update_unknown_task_returns_404(client):
    r = client.post(
        "/api/internal/video/update",
        headers={"X-Internal-Key": "test-secret"},
        json={"task_id": "nonexistent", "status": "processing", "progress": 10},
    )
    assert r.status_code == 404
```
### Step 2: Confirm the tests fail
Run: `python -m pytest tests/test_internal_router.py -v`
Expected: FAIL because `app.internal_router` does not exist yet.
### Step 3: Write `app/internal_router.py`
`C:/Users/jaeoh/Desktop/workspace/web-backend/video-lab/app/internal_router.py`:
```python
"""SP-8: Windows video-render → NAS video-lab internal webhook.

POST /api/internal/video/update
- X-Internal-Key authentication required
- Updates the video_tasks row (status, progress, message, video_url, error)
"""
from __future__ import annotations

import logging
from typing import Optional

from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel, Field

from . import db
from .auth import verify_internal_key

logger = logging.getLogger(__name__)
router = APIRouter()


class UpdatePayload(BaseModel):
    task_id: str
    status: str = Field(..., description="processing|succeeded|failed")
    progress: int = Field(..., ge=0, le=100)
    message: str = ""
    video_url: Optional[str] = None
    error: Optional[str] = None


@router.post(
    "/api/internal/video/update",
    dependencies=[Depends(verify_internal_key)],
)
def video_update(payload: UpdatePayload):
    task = db.get_task(payload.task_id)
    if task is None:
        raise HTTPException(404, f"task not found: {payload.task_id}")
    db.update_task(
        payload.task_id,
        payload.status,
        payload.progress,
        message=payload.message,
        video_url=payload.video_url,
        error=payload.error,
    )
    logger.info(
        "internal/video/update task=%s status=%s progress=%d",
        payload.task_id, payload.status, payload.progress,
    )
    return {"ok": True}
```
### Step 4: Tests pass
Run: `python -m pytest tests/test_internal_router.py -v`
Expected: 5 PASS.
### Step 5: Commit
```bash
git add video-lab/app/internal_router.py video-lab/tests/test_internal_router.py
git commit -m "$(cat <<'EOF'
feat(video-lab): /api/internal/video/update endpoint + tests (SP-8)

UpdatePayload schema (task_id/status/progress/message/video_url/error).
404 if task not found. Same pattern as insta/music-lab plus a video_url field.
Plan-B-Video Phase 1.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
EOF
)"
```
---
## Task 5: NAS video-lab — main.py (FastAPI + redis client + 2 endpoint)
**Files:**
- Create: `C:/Users/jaeoh/Desktop/workspace/web-backend/video-lab/app/main.py`
The two endpoints specified in spec §10 SP-8: `POST /api/video/generate` + `GET /api/video/tasks/{id}`.
### Step 1: Write `app/main.py`
`C:/Users/jaeoh/Desktop/workspace/web-backend/video-lab/app/main.py`:
```python
"""FastAPI entrypoint for video-lab.

POST /api/video/generate: provider + params → Redis push → returns task_id
GET /api/video/tasks/{id}: DB lookup
"""
from __future__ import annotations

import json
import logging
import os
import uuid
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Optional

import redis.asyncio as aioredis
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, ConfigDict, Field

from . import db
from .internal_router import router as internal_router

logger = logging.getLogger(__name__)

CORS_ALLOW_ORIGINS = os.getenv("CORS_ALLOW_ORIGINS", "http://localhost:3007,http://localhost:8080")
REDIS_URL = os.getenv("REDIS_URL", "redis://redis:6379")
redis_client = aioredis.from_url(REDIS_URL, decode_responses=False)
SUPPORTED_PROVIDERS = {"sora", "veo", "kling", "seedance"}

app = FastAPI()
app.include_router(internal_router)
app.add_middleware(
    CORSMiddleware,
    allow_origins=[o.strip() for o in CORS_ALLOW_ORIGINS.split(",")],
    allow_credentials=False,
    allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS", "PATCH"],
    allow_headers=["Content-Type"],
)


@app.on_event("startup")
def on_startup():
    db.init_db()


@app.get("/health")
def health():
    return {"ok": True, "service": "video-lab"}


@app.get("/api/video/providers")
def list_providers():
    """Always expose all 4 providers (a missing key is reported as failed by the worker)."""
    return {"providers": [
        {"id": "sora", "name": "Sora 2", "models": ["sora-2", "sora-2-pro"],
         "durations": [8, 16, 20], "sizes": ["1280x720", "1920x1080", "1080x1920", "848x480", "480x848"]},
        {"id": "veo", "name": "Veo 3.1", "models": ["veo-3.1-generate-001", "veo-3.1-fast-generate-001"],
         "durations": [4, 6, 8], "aspect_ratios": ["16:9", "9:16"]},
        {"id": "kling", "name": "Kling", "models": ["1.5", "1.6", "2.1", "2.1-master", "2.5", "2.6"],
         "durations": [5, 10], "aspect_ratios": ["16:9", "9:16", "1:1"]},
        {"id": "seedance", "name": "Seedance 2.0", "models": ["seedance-2.0"],
         "durations": [4, 5, 6, 8, 10, 12, 15], "aspect_ratios": ["16:9", "9:16", "1:1", "4:3"]},
    ]}


class GenerateRequest(BaseModel):
    # pydantic v2 style (model_dump below is v2 API); class-based Config is deprecated there.
    model_config = ConfigDict(extra="allow")

    provider: str = Field(..., description="sora|veo|kling|seedance")
    model: Optional[str] = None
    prompt: str
    # Optional, common to all providers
    duration: Optional[int] = None
    aspect_ratio: Optional[str] = None
    image_url: Optional[str] = None
    negative_prompt: Optional[str] = None
    # Provider-specific keys are accepted as extra fields
    extra: Optional[Dict[str, Any]] = None


async def _push_render_job(task_id: str, job_type: str, params: dict) -> None:
    """Push a job onto the Redis list queue:video-render."""
    kst = timezone(timedelta(hours=9))
    payload = {
        "task_id": task_id,
        "kind": "video",
        "job_type": job_type,
        "params": params,
        "submitted_at": datetime.now(kst).isoformat(),
    }
    await redis_client.rpush("queue:video-render", json.dumps(payload))


@app.post("/api/video/generate")
async def generate_video(req: GenerateRequest):
    """Generate a video: delegate to the Windows video-render worker through the Redis queue."""
    if req.provider not in SUPPORTED_PROVIDERS:
        raise HTTPException(400, f"unsupported provider: {req.provider} (supported: {sorted(SUPPORTED_PROVIDERS)})")
    task_id = str(uuid.uuid4())
    params = req.model_dump(exclude_none=True)
    db.create_task(task_id, req.provider, params)
    job_type = f"{req.provider}_generation"  # sora_generation, veo_generation, kling_generation, seedance_generation
    await _push_render_job(task_id, job_type, params)
    return {"task_id": task_id, "provider": req.provider}


@app.get("/api/video/tasks/{task_id}")
def get_task_status(task_id: str):
    t = db.get_task(task_id)
    if not t:
        raise HTTPException(404, "task not found")
    return t
```
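
On the consuming side, the payload built by `_push_render_job` has to be decoded and routed by `job_type`. The sketch below illustrates that routing with plain functions and no Redis connection. The handler stand-ins and the `dispatch` function are assumptions for illustration; the actual worker is defined in later tasks of this plan.

```python
import json
from typing import Any, Callable, Dict

Handler = Callable[[str, Dict[str, Any]], str]


def _fake_handler(provider: str) -> Handler:
    """Build a stand-in handler; a real one would call the provider API."""
    def handler(task_id: str, params: Dict[str, Any]) -> str:
        return f"{provider}:{task_id}"
    return handler


# job_type values follow the f"{provider}_generation" convention of the gateway.
HANDLERS: Dict[str, Handler] = {
    f"{p}_generation": _fake_handler(p) for p in ("sora", "veo", "kling", "seedance")
}


def dispatch(raw: bytes) -> str:
    """Decode one queue item and route it; unknown job types raise ValueError."""
    job = json.loads(raw)
    handler = HANDLERS.get(job["job_type"])
    if handler is None:
        raise ValueError(f"unknown job_type: {job['job_type']}")
    return handler(job["task_id"], job["params"])


raw_job = json.dumps({
    "task_id": "t-1", "kind": "video",
    "job_type": "kling_generation", "params": {"prompt": "x"},
}).encode("utf-8")
print(dispatch(raw_job))
```

A table keyed by `job_type` keeps the unknown-type case explicit, which matches the planned `test_worker.py` coverage of four job types plus one unknown.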
### Step 2: smoke test
Run: `cd video-lab && python -c "from app.main import app, redis_client; paths=[r.path for r in app.routes if hasattr(r,'path')]; [print(p) for p in paths]"`
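Expected: the output includes `/health`, `/api/video/providers`, `/api/video/generate`, `/api/video/tasks/{task_id}`, and `/api/internal/video/update`, alongside FastAPI's built-in documentation routes such as `/docs`.
### Step 3: Regression check (earlier tests still pass)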
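
With these routes in place, a client is expected to poll `GET /api/video/tasks/{task_id}` until the task leaves `queued` or `processing`. The sketch below shows one way such a loop could look. `poll_task` and its parameters are hypothetical, and the HTTP call is injected as `fetch` so the example stays self-contained.

```python
import time
from typing import Any, Callable, Dict

TERMINAL_STATUSES = {"succeeded", "failed"}


def poll_task(
    fetch: Callable[[], Dict[str, Any]],
    interval_s: float = 2.0,
    max_attempts: int = 150,
    sleep: Callable[[float], None] = time.sleep,
) -> Dict[str, Any]:
    """Call fetch() until the task reaches a terminal status or attempts run out."""
    for _ in range(max_attempts):
        task = fetch()
        if task.get("status") in TERMINAL_STATUSES:
            return task
        sleep(interval_s)
    raise TimeoutError("task did not finish within the polling budget")


# Simulated responses standing in for successive GET /api/video/tasks/{id} calls.
responses = iter([
    {"status": "queued"},
    {"status": "processing", "progress": 30},
    {"status": "succeeded", "progress": 100, "video_url": "/media/video/t-1.mp4"},
])
final = poll_task(lambda: next(responses), sleep=lambda _s: None)
print(final["status"])
```

The interval and attempt budget here are placeholders; suitable values depend on how long each provider typically takes.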
Run: `cd video-lab && python -m pytest tests/ -v`
Expected: 8 PASS (test_auth 3 + test_internal_router 5).
### Step 4: Commit
```bash
git add video-lab/app/main.py
git commit -m "$(cat <<'EOF'
feat(video-lab): main.py - FastAPI + redis client + 2 endpoint (SP-8)

POST /api/video/generate (provider validation + Redis push + returns task_id).
GET /api/video/tasks/{id} (DB lookup).
GET /api/video/providers (metadata for the 4 providers).
SUPPORTED_PROVIDERS = sora/veo/kling/seedance.
Plan-B-Video Phase 1.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
EOF
)"
```
---
## Task 6: NAS - docker-compose.yml entry + nginx routing + media alias
**Files:**
- Modify: `C:/Users/jaeoh/Desktop/workspace/web-backend/docker-compose.yml` (add the video-lab service)
- Modify: `C:/Users/jaeoh/Desktop/workspace/web-backend/nginx/default.conf` (`/api/video/` proxy + `/media/video/` alias)
### Step 1: docker-compose.yml: add the video-lab service after music-lab
In `C:/Users/jaeoh/Desktop/workspace/web-backend/docker-compose.yml`, add the following **after** the music-lab service block (around lines 55-90):
```yaml
  video-lab:
    build:
      context: ./video-lab
    container_name: video-lab
    restart: unless-stopped
    ports:
      - "18801:8000"
    environment:
      - TZ=${TZ:-Asia/Seoul}
      - REDIS_URL=${REDIS_URL:-redis://redis:6379}
      - INTERNAL_API_KEY=${INTERNAL_API_KEY:-}
      - VIDEO_DATA_DIR=/app/data
      - CORS_ALLOW_ORIGINS=${CORS_ALLOW_ORIGINS:-http://localhost:3007,http://localhost:8080}
    volumes:
      - ${RUNTIME_PATH}\data\video:/app/data
    depends_on:
      - redis
    healthcheck:
      test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"]
      interval: 60s
      timeout: 5s
      retries: 3
```
Also add the video media mount to the `volumes:` section of the `frontend` service (one line among the other mounts):
```yaml
      - ${RUNTIME_PATH}\data\video:/data/video:ro
```
(`${RUNTIME_PATH}\data\videos:/data/videos:ro` may already exist. That mount holds music-lab's videos. video-lab uses the separate, singular `/data/video`.)
Also add `- video-lab` to the `depends_on:` list of the `frontend` service (one line among the other -lab entries).
### Step 2: nginx default.conf (proxy + media alias)
In `C:/Users/jaeoh/Desktop/workspace/web-backend/nginx/default.conf`, add the following 2 blocks (the `/api/internal/video/` blocking block is added in Task 7):
**A. `/api/video/` proxy**: next to the existing `/api/insta/` or `/api/music/` block:
```nginx
# video-lab: video generation gateway
location /api/video/ {
    resolver 127.0.0.11 valid=10s;
    set $video_backend video-lab:8000;
    proxy_http_version 1.1;
    proxy_set_header Host $host;
    proxy_set_header X-Real-IP $remote_addr;
    proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
    proxy_pass http://$video_backend$request_uri;
    proxy_read_timeout 120s;
    proxy_connect_timeout 10s;
}
```
**B. `/media/video/` alias**: next to the existing `/media/music/` block (singular, to distinguish it from videos):
```nginx
# video media: nginx serves the mp4 files directly
location ^~ /media/video/ {
    alias /data/video/;
    expires 1d;
    add_header Cache-Control "public, max-age=86400" always;
    add_header Accept-Ranges bytes always;
    autoindex off;
}
```
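
The alias above only works if the worker writes each file under a name that maps cleanly onto the URL prefix. The sketch below shows that mapping, assuming the worker-side directory (`VIDEO_MEDIA_ROOT`) and the NAS directory mounted at `/data/video` are the same share, as the architecture summary implies. `media_paths` is a hypothetical helper name.

```python
import posixpath
from typing import Tuple

# Values taken from the plan's .env.example; treated as constants here.
VIDEO_MEDIA_ROOT = "/mnt/nas/webpage/data/video"  # worker-side SMB mount
VIDEO_MEDIA_URL_PREFIX = "/media/video"           # nginx location prefix


def media_paths(task_id: str) -> Tuple[str, str]:
    """Return (file_path, public_url) for a task's mp4."""
    filename = f"{task_id}.mp4"
    return (
        posixpath.join(VIDEO_MEDIA_ROOT, filename),
        posixpath.join(VIDEO_MEDIA_URL_PREFIX, filename),
    )


file_path, public_url = media_paths("video-task-1")
print(file_path)
print(public_url)
```

Deriving both values from the same `task_id` keeps the `video_url` reported in the webhook consistent with what nginx can actually serve.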
### Step 3: Verify
Run: `grep -nE "video-lab|/api/video/|/media/video/" C:/Users/jaeoh/Desktop/workspace/web-backend/docker-compose.yml C:/Users/jaeoh/Desktop/workspace/web-backend/nginx/default.conf`
Expected: every item appears in the output.
### Step 4: Commit
```bash
cd C:/Users/jaeoh/Desktop/workspace/web-backend
git add docker-compose.yml nginx/default.conf
git commit -m "$(cat <<'EOF'
feat(video-lab): docker-compose entry + nginx routing (SP-8)

video-lab service: port 18801, REDIS_URL/INTERNAL_API_KEY env,
depends_on redis, /app/data volume mount.
nginx: /api/video/ proxy + /media/video/ direct serve alias.
Adds frontend depends_on + volume mount.
Plan-B-Video Phase 1.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
EOF
)"
```
---
## Task 7: nginx 3-layer blocking for /api/internal/video/
**Files:**
- Modify: `C:/Users/jaeoh/Desktop/workspace/web-backend/nginx/default.conf`
Replicates the same pattern as the insta/music blocks.
### Step 1: Add the blocking block to default.conf
Add it immediately after the `/api/internal/music/` block (written in the previous plan):
```nginx
# Plan-B-Video: Windows video-render → NAS video-lab internal webhook
# Layer 1·2: nginx IP allowlist (LAN + Tailscale)
# Layer 3: X-Internal-Key (FastAPI dependency)
location /api/internal/video/ {
    allow 192.168.45.0/24;  # LAN allowlist
    allow 100.64.0.0/10;    # Tailscale CGNAT
    allow 127.0.0.1;        # NAS itself
    deny all;
    resolver 127.0.0.11 valid=10s;
    set $video_internal_backend video-lab:8000;
    proxy_http_version 1.1;
    proxy_set_header Host $host;
    proxy_set_header X-Real-IP $remote_addr;
    proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
    proxy_set_header X-Internal-Key $http_x_internal_key;
    proxy_pass http://$video_internal_backend$request_uri;
}
```
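
To reason about which callers the `allow`/`deny` rules admit, the same three ranges can be checked with Python's `ipaddress` module. This is only an illustration of the allowlist logic, not code that runs on the NAS; `is_allowed` is a hypothetical name.

```python
import ipaddress

# The same ranges as the nginx allow directives above.
ALLOWED_NETWORKS = [
    ipaddress.ip_network("192.168.45.0/24"),  # LAN
    ipaddress.ip_network("100.64.0.0/10"),    # Tailscale CGNAT
    ipaddress.ip_network("127.0.0.1/32"),     # NAS itself
]


def is_allowed(client_ip: str) -> bool:
    """Return True if client_ip falls inside any allowed network."""
    addr = ipaddress.ip_address(client_ip)
    return any(addr in net for net in ALLOWED_NETWORKS)


for ip in ("192.168.45.54", "100.100.1.2", "127.0.0.1", "192.168.46.1", "8.8.8.8"):
    print(ip, is_allowed(ip))
```

Callers outside these ranges are rejected by nginx before the request reaches the X-Internal-Key check in FastAPI.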
### Step 2: Verify
Run: `grep -c "location /api/internal/" C:/Users/jaeoh/Desktop/workspace/web-backend/nginx/default.conf`
Expected: `3` (insta + music + video).
### Step 3: Commit
```bash
git add nginx/default.conf
git commit -m "$(cat <<'EOF'
feat(nginx): /api/internal/video/ 3-layer blocking (SP-8)

LAN(192.168.45.0/24) + Tailscale(100.64.0.0/10) + 127.0.0.1 allow.
deny all. X-Internal-Key forward → video-lab:8000.
Same pattern as the insta/music blocks.
Plan-B-Video Phase 1.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
EOF
)"
```
---
## Task 8: Windows video-render — Dockerfile + requirements + .env.example + nas_client.py
**Files:**
- Create: `C:/Users/jaeoh/Desktop/workspace/web-ai/services/video-render/Dockerfile`
- Create: `C:/Users/jaeoh/Desktop/workspace/web-ai/services/video-render/requirements.txt`
- Create: `C:/Users/jaeoh/Desktop/workspace/web-ai/services/video-render/.env.example`
- Create: `C:/Users/jaeoh/Desktop/workspace/web-ai/services/video-render/nas_client.py`
- Create: `C:/Users/jaeoh/Desktop/workspace/web-ai/services/video-render/tests/__init__.py`
- Create: `C:/Users/jaeoh/Desktop/workspace/web-ai/services/video-render/tests/test_nas_client.py`
### Step 1: Write `requirements.txt`
```
fastapi==0.115.6
uvicorn[standard]==0.34.0
requests==2.32.3
redis>=5.0
httpx>=0.27
openai>=1.50.0
google-cloud-storage>=2.18.0
pytest>=8.0
pytest-asyncio>=0.24
respx>=0.21
```
### Step 2: Write the `Dockerfile`
```dockerfile
FROM python:3.12-slim-bookworm
ENV PYTHONUNBUFFERED=1
WORKDIR /app
RUN apt-get update && apt-get install -y --no-install-recommends \
ca-certificates \
&& rm -rf /var/lib/apt/lists/*
COPY requirements.txt .
RUN pip install --no-cache-dir --timeout 600 --retries 5 -r requirements.txt
COPY . .
EXPOSE 8000
CMD ["python", "-m", "uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "1"]
```
### Step 3: Write `.env.example`
```
# Plan-B-Video: Windows video-render worker
# NAS Redis queue
REDIS_URL=redis://192.168.45.54:6379
# NAS internal webhook (video-lab port 18801)
NAS_BASE_URL=http://192.168.45.54:18801
INTERNAL_API_KEY=__copy_from_nas_dotenv__
# Sora 2 (OpenAI)
OPENAI_API_KEY=__paste_openai_key__
# Veo 3.1 (Google Vertex AI)
GOOGLE_PROJECT_ID=__paste_gcp_project_id__
GOOGLE_LOCATION=us-central1
GOOGLE_GCS_BUCKET=__paste_gcs_bucket_name__
# Path to the service account JSON inside the container
# (kept on its own line because not every .env parser strips inline comments)
GOOGLE_APPLICATION_CREDENTIALS=/app/keys/gcp-sa.json
# Kling (PiAPI gateway)
PIAPI_API_KEY=__paste_piapi_key__
# Seedance 2.0 (BytePlus)
SEEDANCE_API_KEY=__paste_seedance_key__
# video directory inside the NAS SMB mount
VIDEO_MEDIA_ROOT=/mnt/nas/webpage/data/video
# nginx serving prefix (used in the NAS webhook payload)
VIDEO_MEDIA_URL_PREFIX=/media/video
```
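
Because the gateway always lists all four providers and leaves missing keys for the worker to report, the worker needs some way to tell which variables a given provider depends on. The sketch below groups the variables the way the comments in `.env.example` do. The grouping, `REQUIRED_ENV`, and `missing_env` are assumptions for illustration, not the plan's actual worker code.

```python
from typing import Dict, List, Mapping

# Assumed grouping, following the section comments in .env.example.
REQUIRED_ENV: Dict[str, List[str]] = {
    "sora": ["OPENAI_API_KEY"],
    "veo": [
        "GOOGLE_PROJECT_ID", "GOOGLE_LOCATION",
        "GOOGLE_GCS_BUCKET", "GOOGLE_APPLICATION_CREDENTIALS",
    ],
    "kling": ["PIAPI_API_KEY"],
    "seedance": ["SEEDANCE_API_KEY"],
}


def missing_env(provider: str, env: Mapping[str, str]) -> List[str]:
    """Return variable names that are unset, empty, or still a __placeholder__."""
    missing = []
    for name in REQUIRED_ENV[provider]:
        value = env.get(name, "")
        if not value or (value.startswith("__") and value.endswith("__")):
            missing.append(name)
    return missing


sample_env = {"OPENAI_API_KEY": "__paste_openai_key__", "PIAPI_API_KEY": "real-key"}
print(missing_env("sora", sample_env), missing_env("kling", sample_env))
```

In a real worker, `env` would be `os.environ`, and a non-empty result could be turned into a `failed` webhook update for the task.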
### Step 4: Write the failing tests
`tests/__init__.py`: (empty file)
`tests/test_nas_client.py`:
```python
"""nas_client: webhook adapter for video-render."""
import json

import httpx
import pytest
import respx

from nas_client import webhook_update_task


@pytest.fixture(autouse=True)
def _env(monkeypatch):
    monkeypatch.setenv("NAS_BASE_URL", "http://nas-test:18801")
    monkeypatch.setenv("INTERNAL_API_KEY", "test-key")


@respx.mock
def test_webhook_update_task_sends_x_internal_key():
    route = respx.post("http://nas-test:18801/api/internal/video/update").mock(
        return_value=httpx.Response(200, json={"ok": True})
    )
    webhook_update_task("task-1", "processing", 30, message="downloading")
    assert route.called
    req = route.calls[0].request
    assert req.headers["X-Internal-Key"] == "test-key"
    body = json.loads(req.content)
    assert body["task_id"] == "task-1"
    assert body["status"] == "processing"
    assert body["progress"] == 30


@respx.mock
def test_webhook_update_task_with_video_url():
    route = respx.post("http://nas-test:18801/api/internal/video/update").mock(
        return_value=httpx.Response(200, json={"ok": True})
    )
    webhook_update_task("task-2", "succeeded", 100, message="done",
                        video_url="/media/video/task-2.mp4")
    payload = json.loads(route.calls[0].request.content)
    assert payload["video_url"] == "/media/video/task-2.mp4"
    assert payload["status"] == "succeeded"


@respx.mock
def test_webhook_update_task_with_error():
    route = respx.post("http://nas-test:18801/api/internal/video/update").mock(
        return_value=httpx.Response(200, json={"ok": True})
    )
    webhook_update_task("task-3", "failed", 0, error="Sora API rate limit")
    payload = json.loads(route.calls[0].request.content)
    assert payload["error"] == "Sora API rate limit"


@respx.mock
def test_webhook_swallows_network_error(caplog):
    respx.post("http://nas-test:18801/api/internal/video/update").mock(
        side_effect=httpx.ConnectError("no host")
    )
    webhook_update_task("task-5", "processing", 10)
    assert "task-5" in caplog.text


@respx.mock
def test_webhook_swallows_non_200(caplog):
    respx.post("http://nas-test:18801/api/internal/video/update").mock(
        return_value=httpx.Response(500, text="server error")
    )
    webhook_update_task("task-6", "processing", 50)
    # Non-200 responses do not raise either; they are only logged as errors.
    assert "task-6" in caplog.text
```
### Step 5: Confirm the tests fail
Run: `cd C:/Users/jaeoh/Desktop/workspace/web-ai/services/video-render && python -m pytest tests/test_nas_client.py -v`
Expected: FAIL, because the `nas_client` module does not exist yet.
### Step 6: Write `nas_client.py`
```python
"""NAS webhook adapter.

The Windows worker cannot access the NAS DB directly, so updates are delegated over HTTP.
Same pattern as the Plan-B-Music nas_client.
"""
from __future__ import annotations

import logging
import os
from typing import Any, Dict, Optional

import httpx

logger = logging.getLogger(__name__)

_TIMEOUT = 10.0


def _post(payload: Dict[str, Any]) -> None:
    # Read env at call time (not import time) so tests can override it per test.
    nas_base_url = os.getenv("NAS_BASE_URL", "http://192.168.45.54:18801")
    internal_api_key = os.getenv("INTERNAL_API_KEY", "")
    url = f"{nas_base_url}/api/internal/video/update"
    try:
        r = httpx.post(
            url,
            headers={"X-Internal-Key": internal_api_key},
            json=payload,
            timeout=_TIMEOUT,
        )
        if r.status_code != 200:
            logger.error("webhook %s returned %d: %s",
                         payload.get("task_id"), r.status_code, r.text[:200])
    except Exception:
        # Never raise into the provider: a failed status update is only logged.
        logger.exception("webhook %s 호출 실패", payload.get("task_id"))


def webhook_update_task(
    task_id: str,
    status: str,
    progress: int,
    message: str = "",
    video_url: Optional[str] = None,
    error: Optional[str] = None,
) -> None:
    payload: Dict[str, Any] = {
        "task_id": task_id,
        "status": status,
        "progress": progress,
        "message": message,
    }
    # Optional fields are sent only when provided.
    if video_url is not None:
        payload["video_url"] = video_url
    if error is not None:
        payload["error"] = error
    _post(payload)
```
### Step 7: Confirm the tests pass
Run: `python -m pytest tests/test_nas_client.py -v`
Expected: 5 PASS.
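The `_env` fixture in the tests only works because `nas_client` reads `NAS_BASE_URL` and `INTERNAL_API_KEY` when the webhook is called, not when the module is imported. The following is a minimal sketch of that difference; the variable name `SKETCH_NAS_BASE_URL` and both helper names are hypothetical and exist only for illustration.

```python
import os

_DEFAULT = "http://192.168.45.54:18801"
_VAR = "SKETCH_NAS_BASE_URL"  # hypothetical variable name, used only in this sketch

# Read once at import: later changes to the environment are not seen.
BASE_URL_AT_IMPORT = os.getenv(_VAR, _DEFAULT)


def base_url_at_call() -> str:
    """Read on every call: sees values set after import (e.g. monkeypatch.setenv)."""
    return os.getenv(_VAR, _DEFAULT)
```

With an import-time read, a test would have to set the variable before importing the module (or reload it), which couples tests to import order.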
### Step 8: Commit
```bash
cd C:/Users/jaeoh/Desktop/workspace/web-ai
git add services/video-render/Dockerfile services/video-render/requirements.txt services/video-render/.env.example services/video-render/nas_client.py services/video-render/tests/__init__.py services/video-render/tests/test_nas_client.py
git commit -m "$(cat <<'EOF'
feat(video-render): scaffold + nas_client webhook adapter (SP-7)
Dockerfile (python:3.12-slim), requirements (openai + google-cloud-storage + httpx + redis).
.env.example: OPENAI/GOOGLE/PIAPI/SEEDANCE keys + VIDEO_MEDIA_ROOT.
nas_client.webhook_update_task: call-time os.getenv (테스트 격리), respx mock 5 tests.
Plan-B-Video Phase 2.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
EOF
)"
```
---
## Task 9: Windows video-render — providers/sora.py
**Files:**
- Create: `C:/Users/jaeoh/Desktop/workspace/web-ai/services/video-render/providers/__init__.py`
- Create: `C:/Users/jaeoh/Desktop/workspace/web-ai/services/video-render/providers/sora.py`
Uses the OpenAI Sora 2 API: `POST /v1/videos`, then poll `GET /v1/videos/{id}`, then download from `GET /v1/videos/{id}/content?variant=video`.
⚠️ The Sora 2 API is deprecated, with shutdown on 2026-09-24. The plan still proceeds as the spec states.
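All four providers in this plan share the same submit, poll, download shape. As a rough sketch of the polling step only (not the plan's actual code), the loop can be factored into a helper. `poll_until_done`, `fetch_status`, and the status strings are assumptions made for illustration; the injectable `sleep` exists so the loop can be tested without waiting.

```python
import time
from typing import Callable, Optional, Tuple


def poll_until_done(
    fetch_status: Callable[[], Tuple[str, Optional[str]]],
    interval: float,
    max_attempts: int,
    sleep: Callable[[float], None] = time.sleep,
) -> Tuple[str, Optional[str]]:
    """Poll until a terminal status or until max_attempts is exhausted.

    Returns ("completed", detail), ("failed", detail), or ("timeout", None).
    """
    for _ in range(max_attempts):
        sleep(interval)
        status, detail = fetch_status()
        if status in ("completed", "failed"):
            return status, detail
    # Upper bound on waiting time is roughly interval * max_attempts seconds.
    return "timeout", None
```

With `interval=15` and `max_attempts=40`, as in the Sora provider, the loop gives up after about 600 seconds.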
### Step 1: Create an empty `providers/__init__.py`
`C:/Users/jaeoh/Desktop/workspace/web-ai/services/video-render/providers/__init__.py`:
(empty file)
### Step 2: Write `providers/sora.py`
```python
"""Sora 2 video generation via the OpenAI Videos API.

POST /v1/videos → poll GET /v1/videos/{id} → download GET /v1/videos/{id}/content.
⚠️ Deprecated, shutdown 2026-09-24. Proceeding per 박재오's decision on the spec.
"""
from __future__ import annotations

import logging
import os
import time

import requests

from nas_client import webhook_update_task

logger = logging.getLogger(__name__)

SORA_BASE_URL = "https://api.openai.com/v1"
VIDEO_MEDIA_ROOT = os.getenv("VIDEO_MEDIA_ROOT", "/mnt/nas/webpage/data/video")
VIDEO_MEDIA_URL_PREFIX = os.getenv("VIDEO_MEDIA_URL_PREFIX", "/media/video")
POLL_INTERVAL = 15  # OpenAI recommendation: 10-20 s
POLL_MAX_ATTEMPTS = 40  # up to ~10 min
DEFAULT_MODEL = "sora-2"


def _headers() -> dict:
    api_key = os.getenv("OPENAI_API_KEY", "")
    return {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }


def run_sora_generation(task_id: str, params: dict) -> None:
    """Generate a video with Sora 2 → mp4 → save to NAS SMB → webhook."""
    try:
        if not os.getenv("OPENAI_API_KEY"):
            webhook_update_task(task_id, "failed", 0, "", error="OPENAI_API_KEY 미설정 (Windows .env)")
            return
        webhook_update_task(task_id, "processing", 5, "Sora API 호출 중...")
        payload = {
            "model": params.get("model") or DEFAULT_MODEL,
            "prompt": params["prompt"][:5000],
        }
        if params.get("duration"):
            payload["seconds"] = params["duration"]
        if params.get("size"):
            payload["size"] = params["size"]
        elif params.get("aspect_ratio") == "9:16":
            payload["size"] = "1080x1920"
        elif params.get("aspect_ratio") == "16:9":
            payload["size"] = "1920x1080"
        resp = requests.post(f"{SORA_BASE_URL}/videos", headers=_headers(), json=payload, timeout=30)
        if resp.status_code not in (200, 201):
            webhook_update_task(task_id, "failed", 0, "", error=f"Sora API 오류: {resp.status_code} {resp.text[:300]}")
            return
        body = resp.json()
        video_id = body.get("id", "")
        if not video_id:
            webhook_update_task(task_id, "failed", 0, "", error="Sora 응답에 video id 없음")
            return
        webhook_update_task(task_id, "processing", 15, f"Sora 작업 생성됨 (id={video_id[:16]})")

        # Poll; the for/else branch runs only when the loop ends without break (timeout).
        for _ in range(POLL_MAX_ATTEMPTS):
            time.sleep(POLL_INTERVAL)
            sr = requests.get(f"{SORA_BASE_URL}/videos/{video_id}", headers=_headers(), timeout=30)
            if sr.status_code != 200:
                continue  # transient error: retry on the next tick
            sd = sr.json()
            status = sd.get("status", "")
            progress = sd.get("progress") or 0  # guard against a null progress field
            # Map provider progress 0-100 into the 15-79 band of overall progress.
            scaled = min(15 + int(progress * 0.65), 79)
            webhook_update_task(task_id, "processing", scaled, f"Sora 생성 중... {progress}%")
            if status == "completed":
                break
            elif status == "failed":
                # "error" may be present but null, so fall back to an empty dict
                err = (sd.get("error") or {}).get("message", "Sora 작업 실패")
                webhook_update_task(task_id, "failed", 0, "", error=err)
                return
        else:
            webhook_update_task(task_id, "failed", 0, "", error="Sora 폴링 timeout (10분)")
            return

        # Download
        webhook_update_task(task_id, "processing", 80, "Sora 결과 다운로드 중...")
        filename = f"{task_id}.mp4"
        os.makedirs(VIDEO_MEDIA_ROOT, exist_ok=True)
        file_path = os.path.join(VIDEO_MEDIA_ROOT, filename)
        dl = requests.get(
            f"{SORA_BASE_URL}/videos/{video_id}/content",
            headers=_headers(),
            params={"variant": "video"},
            stream=True,
            timeout=300,
        )
        dl.raise_for_status()
        with open(file_path, "wb") as f:
            for chunk in dl.iter_content(chunk_size=8192):
                f.write(chunk)
        local_url = f"{VIDEO_MEDIA_URL_PREFIX}/{filename}"
        webhook_update_task(task_id, "succeeded", 100, "Sora 생성 완료", video_url=local_url)
    except requests.Timeout:
        webhook_update_task(task_id, "failed", 0, "", error="Sora API 타임아웃")
    except Exception as e:
        logger.exception("Sora generation error task=%s", task_id)
        webhook_update_task(task_id, "failed", 0, "", error=str(e))
```
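The Sora provider maps the provider-reported percentage into the 15 to 79 band of the task's overall progress, leaving 0 to 15 for submission and 80 to 100 for download. A small worked sketch of that arithmetic follows; `scale_progress` and its parameter names are hypothetical, and the clamping of out-of-range input is an added assumption rather than something the provider code does.

```python
def scale_progress(provider_pct, lo: int = 15, hi: int = 79, span: float = 0.65) -> int:
    """Map a provider percentage (0-100, possibly None) into the [lo, hi] band."""
    pct = max(0.0, min(float(provider_pct or 0), 100.0))  # treat None as 0, clamp to 0-100
    return min(lo + int(pct * span), hi)
```

For example, a provider value of 50 gives `15 + int(32.5) = 47`, and 100 gives `min(15 + 65, 79) = 79`, so the reported value never reaches the 80 that marks the start of the download phase.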
### Step 3: Import smoke test
Run: `cd C:/Users/jaeoh/Desktop/workspace/web-ai/services/video-render && python -c "from providers.sora import run_sora_generation; print('OK')"`
Expected: `OK`.
### Step 4: Commit
```bash
git add services/video-render/providers/__init__.py services/video-render/providers/sora.py
git commit -m "$(cat <<'EOF'
feat(video-render): providers/sora.py — Sora 2 client (SP-7)
POST /v1/videos → GET /v1/videos/{id} 폴링 (15초 × 40) → /content?variant=video 다운로드.
sora-2 / sora-2-pro 모델. aspect_ratio → size 매핑.
⚠️ OpenAI Sora 2 API deprecated 2026-09-24.
Plan-B-Video Phase 2.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
EOF
)"
```
---
## Task 10: Windows video-render — providers/veo.py (GCS download)
**Files:**
- Create: `C:/Users/jaeoh/Desktop/workspace/web-ai/services/video-render/providers/veo.py`
Veo 3.1 runs as a Vertex AI long-running operation. The result is an mp4 in a GCS bucket, which is downloaded with `google-cloud-storage`.
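Downloading the result requires splitting the `gs://bucket/path` URI into a bucket name and a blob path before handing them to the storage client. The helper below is a minimal sketch of that parsing step with explicit validation; `split_gcs_uri` is a hypothetical name, and the example URI is made up.

```python
from typing import Tuple


def split_gcs_uri(uri: str) -> Tuple[str, str]:
    """Split 'gs://bucket/path/to/blob' into (bucket, blob_path)."""
    prefix = "gs://"
    if not uri.startswith(prefix):
        raise ValueError(f"not a GCS URI: {uri!r}")
    bucket, sep, blob_path = uri[len(prefix):].partition("/")
    if not bucket or not sep or not blob_path:
        raise ValueError(f"GCS URI has no object path: {uri!r}")
    return bucket, blob_path
```

Raising on a malformed URI, rather than returning a boolean, lets the caller include the reason in the failure it reports.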
### Step 1: Write `providers/veo.py`
```python
"""Veo 3.1 video generation via Google Vertex AI.

POST .../models/{MODEL}:predictLongRunning → poll POST :fetchPredictOperation →
result gs://bucket/path/sample_0.mp4 → download with google-cloud-storage → NAS SMB.
"""
from __future__ import annotations

import logging
import os
import time
from typing import Optional

import requests

from nas_client import webhook_update_task

logger = logging.getLogger(__name__)

VIDEO_MEDIA_ROOT = os.getenv("VIDEO_MEDIA_ROOT", "/mnt/nas/webpage/data/video")
VIDEO_MEDIA_URL_PREFIX = os.getenv("VIDEO_MEDIA_URL_PREFIX", "/media/video")
POLL_INTERVAL = 12  # Veo typically takes 30-120 s
POLL_MAX_ATTEMPTS = 50  # up to ~10 min
DEFAULT_MODEL = "veo-3.1-fast-generate-001"


def _gcloud_access_token() -> Optional[str]:
    """Issue an access token from the service account JSON at GOOGLE_APPLICATION_CREDENTIALS.

    The GCS SDK authenticates on its own inside the container, but the REST calls
    need an explicit Bearer token, so one is issued through google.auth.
    """
    try:
        # Imported lazily so the module can be imported without google-auth installed.
        from google.auth import default as google_default_auth
        from google.auth.transport.requests import Request as GoogleAuthRequest

        credentials, _ = google_default_auth(
            scopes=["https://www.googleapis.com/auth/cloud-platform"],
        )
        credentials.refresh(GoogleAuthRequest())
        return credentials.token
    except Exception:
        logger.exception("Google credentials refresh 실패")
        return None


def _download_gcs(gcs_uri: str, local_path: str) -> bool:
    """Download gs://bucket/path/file.mp4 to local_path. Returns True on success."""
    try:
        from google.cloud import storage as gcs_storage

        if not gcs_uri.startswith("gs://"):
            return False
        without_scheme = gcs_uri[len("gs://"):]
        bucket_name, blob_path = without_scheme.split("/", 1)
        client = gcs_storage.Client(project=os.getenv("GOOGLE_PROJECT_ID"))
        bucket = client.bucket(bucket_name)
        blob = bucket.blob(blob_path)
        blob.download_to_filename(local_path)
        return True
    except Exception:
        logger.exception("GCS 다운로드 실패: %s", gcs_uri)
        return False


def run_veo_generation(task_id: str, params: dict) -> None:
    """Generate a video with Veo 3.1 → GCS → NAS SMB → webhook."""
    try:
        project_id = os.getenv("GOOGLE_PROJECT_ID", "")
        location = os.getenv("GOOGLE_LOCATION", "us-central1")
        gcs_bucket = os.getenv("GOOGLE_GCS_BUCKET", "")
        if not project_id or not gcs_bucket:
            webhook_update_task(task_id, "failed", 0, "",
                                error="GOOGLE_PROJECT_ID 또는 GOOGLE_GCS_BUCKET 미설정")
            return
        token = _gcloud_access_token()
        if not token:
            webhook_update_task(task_id, "failed", 0, "",
                                error="Google access token 발행 실패 (서비스 계정 JSON 확인)")
            return
        webhook_update_task(task_id, "processing", 5, "Veo API 호출 중...")
        model_id = params.get("model") or DEFAULT_MODEL
        endpoint_base = (
            f"https://{location}-aiplatform.googleapis.com/v1/projects/{project_id}"
            f"/locations/{location}/publishers/google/models/{model_id}"
        )
        headers = {
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        }
        body = {
            "instances": [{"prompt": params["prompt"]}],
            "parameters": {
                "storageUri": f"gs://{gcs_bucket}/veo/{task_id}/",
                "sampleCount": 1,
                "aspectRatio": params.get("aspect_ratio") or "16:9",
            },
        }
        if params.get("duration"):
            # The Vertex AI Veo parameter is named durationSeconds.
            body["parameters"]["durationSeconds"] = params["duration"]
        if params.get("negative_prompt"):
            body["parameters"]["negativePrompt"] = params["negative_prompt"]
        resp = requests.post(f"{endpoint_base}:predictLongRunning",
                             headers=headers, json=body, timeout=30)
        if resp.status_code != 200:
            webhook_update_task(task_id, "failed", 0, "",
                                error=f"Veo API 오류: {resp.status_code} {resp.text[:300]}")
            return
        op_name = resp.json().get("name", "")
        if not op_name:
            webhook_update_task(task_id, "failed", 0, "", error="Veo 응답에 operation name 없음")
            return
        webhook_update_task(task_id, "processing", 15, "Veo 작업 시작됨")

        # Poll fetchPredictOperation; the else branch runs only on timeout (no break).
        gcs_uri = None
        for attempt in range(POLL_MAX_ATTEMPTS):
            time.sleep(POLL_INTERVAL)
            fetch = requests.post(
                f"{endpoint_base}:fetchPredictOperation",
                headers=headers,
                json={"operationName": op_name},
                timeout=30,
            )
            if fetch.status_code != 200:
                continue  # transient error: retry on the next tick
            fd = fetch.json()
            done = fd.get("done", False)
            # No provider percentage is used here, so progress is estimated from elapsed attempts.
            scaled = min(15 + int((attempt / POLL_MAX_ATTEMPTS) * 65), 79)
            webhook_update_task(task_id, "processing", scaled, "Veo 생성 중...")
            if done:
                if "error" in fd:
                    webhook_update_task(task_id, "failed", 0, "",
                                        error=f"Veo 작업 실패: {fd['error'].get('message', '?')}")
                    return
                videos = (fd.get("response") or {}).get("videos") or []
                if not videos:
                    webhook_update_task(task_id, "failed", 0, "", error="Veo 완료했으나 videos 비어 있음")
                    return
                gcs_uri = videos[0].get("gcsUri", "")
                break
        else:
            webhook_update_task(task_id, "failed", 0, "", error="Veo 폴링 timeout (10분)")
            return
        if not gcs_uri:
            webhook_update_task(task_id, "failed", 0, "", error="Veo 응답에 gcsUri 없음")
            return

        webhook_update_task(task_id, "processing", 85, "GCS에서 mp4 다운로드 중...")
        filename = f"{task_id}.mp4"
        os.makedirs(VIDEO_MEDIA_ROOT, exist_ok=True)
        file_path = os.path.join(VIDEO_MEDIA_ROOT, filename)
        ok = _download_gcs(gcs_uri, file_path)
        if not ok:
            webhook_update_task(task_id, "failed", 0, "", error=f"GCS 다운로드 실패: {gcs_uri}")
            return
        local_url = f"{VIDEO_MEDIA_URL_PREFIX}/{filename}"
        webhook_update_task(task_id, "succeeded", 100, "Veo 생성 완료", video_url=local_url)
    except requests.Timeout:
        webhook_update_task(task_id, "failed", 0, "", error="Veo API 타임아웃")
    except Exception as e:
        logger.exception("Veo generation error task=%s", task_id)
        webhook_update_task(task_id, "failed", 0, "", error=str(e))
```
### Step 2: Import smoke test
Run: `python -c "from providers.veo import run_veo_generation; print('OK')"`
Expected: `OK` (the module does not import `google.cloud.storage` at import time; it is imported on the first real call).
### Step 3: Commit
```bash
git add services/video-render/providers/veo.py
git commit -m "$(cat <<'EOF'
feat(video-render): providers/veo.py — Veo 3.1 Vertex AI client (SP-7)
predictLongRunning → fetchPredictOperation 폴링 (12초 × 50).
결과 gs://bucket/veo/{task_id}/sample_0.mp4 → google-cloud-storage SDK로
다운로드 → NAS SMB. GOOGLE_PROJECT_ID/LOCATION/GCS_BUCKET/APPLICATION_CREDENTIALS env.
Plan-B-Video Phase 2.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
EOF
)"
```
---
## Task 11: Windows video-render — providers/kling.py (via PiAPI)
**Files:**
- Create: `C:/Users/jaeoh/Desktop/workspace/web-ai/services/video-render/providers/kling.py`
Kling AI through the PiAPI gateway: `POST /api/v1/task`, then poll the task status with GET (the standard PiAPI pattern).
### Step 1: Write `providers/kling.py`
```python
"""Kling AI video generation through the PiAPI gateway.

POST https://api.piapi.ai/api/v1/task → poll GET /api/v1/task/{id} → download data.output.video_url.
"""
from __future__ import annotations

import logging
import os
import time

import requests

from nas_client import webhook_update_task

logger = logging.getLogger(__name__)

PIAPI_BASE_URL = "https://api.piapi.ai/api/v1"
VIDEO_MEDIA_ROOT = os.getenv("VIDEO_MEDIA_ROOT", "/mnt/nas/webpage/data/video")
VIDEO_MEDIA_URL_PREFIX = os.getenv("VIDEO_MEDIA_URL_PREFIX", "/media/video")
POLL_INTERVAL = 10  # Kling typically takes 30-180 s
POLL_MAX_ATTEMPTS = 60  # up to 10 min
DEFAULT_VERSION = "2.6"


def _headers() -> dict:
    api_key = os.getenv("PIAPI_API_KEY", "")
    return {
        "x-api-key": api_key,
        "Content-Type": "application/json",
    }


def run_kling_generation(task_id: str, params: dict) -> None:
    """Generate a video with Kling → mp4 → NAS SMB → webhook."""
    try:
        if not os.getenv("PIAPI_API_KEY"):
            webhook_update_task(task_id, "failed", 0, "", error="PIAPI_API_KEY 미설정")
            return
        webhook_update_task(task_id, "processing", 5, "Kling API 호출 중...")
        input_obj = {
            "prompt": params["prompt"][:2500],
            "duration": params.get("duration", 5),
            "aspect_ratio": params.get("aspect_ratio", "16:9"),
            "mode": params.get("mode", "std"),
            "version": params.get("model") or DEFAULT_VERSION,
        }
        if params.get("negative_prompt"):
            input_obj["negative_prompt"] = params["negative_prompt"][:2500]
        if params.get("cfg_scale") is not None:
            input_obj["cfg_scale"] = str(params["cfg_scale"])
        if params.get("image_url"):
            input_obj["image_url"] = params["image_url"]
        body = {
            "model": "kling",
            "task_type": "video_generation",
            "input": input_obj,
            "config": {"service_mode": "public"},
        }
        resp = requests.post(f"{PIAPI_BASE_URL}/task", headers=_headers(), json=body, timeout=30)
        if resp.status_code != 200:
            webhook_update_task(task_id, "failed", 0, "",
                                error=f"Kling/PiAPI 오류: {resp.status_code} {resp.text[:300]}")
            return
        body_json = resp.json()
        if body_json.get("code") != 200:
            webhook_update_task(task_id, "failed", 0, "",
                                error=f"Kling/PiAPI 거부: {body_json.get('message', '?')}")
            return
        piapi_task_id = (body_json.get("data") or {}).get("task_id", "")
        if not piapi_task_id:
            webhook_update_task(task_id, "failed", 0, "", error="Kling/PiAPI 응답에 task_id 없음")
            return
        webhook_update_task(task_id, "processing", 15, "Kling 작업 등록됨")

        # Poll GET /task/{id}; the else branch runs only on timeout (no break).
        video_url = None
        for attempt in range(POLL_MAX_ATTEMPTS):
            time.sleep(POLL_INTERVAL)
            fetch = requests.get(f"{PIAPI_BASE_URL}/task/{piapi_task_id}",
                                 headers=_headers(), timeout=30)
            if fetch.status_code != 200:
                continue  # transient error: retry on the next tick
            fd = fetch.json()
            data = fd.get("data") or {}  # "data" may be present but null
            status = str(data.get("status") or "")
            scaled = min(15 + int((attempt / POLL_MAX_ATTEMPTS) * 65), 79)
            webhook_update_task(task_id, "processing", scaled, f"Kling 생성 중... ({status})")
            # Compare case-insensitively so "Completed"/"completed" and "Failed"/"failed" both match.
            if status.lower() == "completed":
                video_url = (data.get("output") or {}).get("video_url", "")
                break
            elif status.lower() == "failed":
                err = (data.get("error") or {}).get("message", "Kling 작업 실패")
                webhook_update_task(task_id, "failed", 0, "", error=err)
                return
            # Pending/Processing/Staged → keep polling
        else:
            webhook_update_task(task_id, "failed", 0, "", error="Kling 폴링 timeout (10분)")
            return
        if not video_url:
            webhook_update_task(task_id, "failed", 0, "", error="Kling 완료했으나 video_url 없음")
            return

        webhook_update_task(task_id, "processing", 85, "Kling 결과 다운로드 중...")
        filename = f"{task_id}.mp4"
        os.makedirs(VIDEO_MEDIA_ROOT, exist_ok=True)
        file_path = os.path.join(VIDEO_MEDIA_ROOT, filename)
        dl = requests.get(video_url, stream=True, timeout=300)
        dl.raise_for_status()
        with open(file_path, "wb") as f:
            for chunk in dl.iter_content(chunk_size=8192):
                f.write(chunk)
        local_url = f"{VIDEO_MEDIA_URL_PREFIX}/{filename}"
        webhook_update_task(task_id, "succeeded", 100, "Kling 생성 완료", video_url=local_url)
    except requests.Timeout:
        webhook_update_task(task_id, "failed", 0, "", error="Kling/PiAPI 타임아웃")
    except Exception as e:
        logger.exception("Kling generation error task=%s", task_id)
        webhook_update_task(task_id, "failed", 0, "", error=str(e))
```
### Step 2: Import smoke test
Run: `python -c "from providers.kling import run_kling_generation; print('OK')"`
Expected: `OK`.
### Step 3: Commit
```bash
git add services/video-render/providers/kling.py
git commit -m "$(cat <<'EOF'
feat(video-render): providers/kling.py — Kling AI via PiAPI gateway (SP-7)
POST /api/v1/task (model=kling, task_type=video_generation) →
GET /api/v1/task/{id} 폴링 (10초 × 60) → data.output.video_url 다운로드.
x-api-key 헤더. version 1.5/1.6/2.1/2.5/2.6 지원.
Plan-B-Video Phase 2.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
EOF
)"
```
---
## Task 12: Windows video-render — providers/seedance.py (BytePlus)
**Files:**
- Create: `C:/Users/jaeoh/Desktop/workspace/web-ai/services/video-render/providers/seedance.py`
Seedance 2.0 on BytePlus: `POST /seedance/v1/videos`, then poll `GET /videos/{id}`, then download `output.video_url`.
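The URL-based providers stream the result straight into `{task_id}.mp4` under the directory that nginx serves, so an interrupted download can leave a truncated file at the final path. One possible hardening, shown here only as a sketch and not as part of this plan's provider code, is to write to a temporary name and rename on success. `write_atomically` and the `.part` suffix are hypothetical, and whether the rename is truly atomic on a CIFS/SMB mount is an assumption that would need checking.

```python
import os
from typing import Iterable


def write_atomically(chunks: Iterable[bytes], dest_path: str) -> int:
    """Write chunks to dest_path + '.part', then rename into place. Returns bytes written."""
    tmp_path = dest_path + ".part"
    written = 0
    try:
        with open(tmp_path, "wb") as f:
            for chunk in chunks:
                if chunk:  # skip empty keep-alive chunks
                    f.write(chunk)
                    written += len(chunk)
        os.replace(tmp_path, dest_path)  # the final name appears only after a full write
    except BaseException:
        # Leave no partial file behind, then re-raise for the caller to report.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
    return written
```

In a provider this could be called as `write_atomically(dl.iter_content(chunk_size=8192), file_path)` in place of the `with open(...)` loop.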
### Step 1: Write `providers/seedance.py`
```python
"""Seedance 2.0 video generation on ByteDance Volcano Engine (BytePlus international endpoint).

POST https://api.byteplus.com/seedance/v1/videos → poll GET /videos/{id} → download output.video_url.
"""
from __future__ import annotations

import logging
import os
import time

import requests

from nas_client import webhook_update_task

logger = logging.getLogger(__name__)

SEEDANCE_BASE_URL = "https://api.byteplus.com/seedance/v1"
VIDEO_MEDIA_ROOT = os.getenv("VIDEO_MEDIA_ROOT", "/mnt/nas/webpage/data/video")
VIDEO_MEDIA_URL_PREFIX = os.getenv("VIDEO_MEDIA_URL_PREFIX", "/media/video")
POLL_INTERVAL = 8  # Seedance typically takes 30-120 s
POLL_MAX_ATTEMPTS = 60  # 8 s × 60 = up to ~8 min
DEFAULT_MODEL = "seedance-2.0"


def _headers() -> dict:
    api_key = os.getenv("SEEDANCE_API_KEY", "")
    return {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }


def run_seedance_generation(task_id: str, params: dict) -> None:
    """Generate a video with Seedance → mp4 → NAS SMB → webhook."""
    try:
        if not os.getenv("SEEDANCE_API_KEY"):
            webhook_update_task(task_id, "failed", 0, "", error="SEEDANCE_API_KEY 미설정")
            return
        webhook_update_task(task_id, "processing", 5, "Seedance API 호출 중...")
        body = {
            "model": params.get("model") or DEFAULT_MODEL,
            "prompt": params["prompt"][:2000],
            "resolution": params.get("resolution", "1080p"),
            "duration": params.get("duration", 5),
            "aspect_ratio": params.get("aspect_ratio", "16:9"),
        }
        if params.get("negative_prompt"):
            body["negative_prompt"] = params["negative_prompt"]
        if params.get("image_url"):
            body["references"] = [{"type": "image", "data": params["image_url"], "role": "subject"}]
        if params.get("audio") is not None:
            body["audio"] = bool(params["audio"])
        if params.get("seed") is not None:
            body["seed"] = int(params["seed"])
        resp = requests.post(f"{SEEDANCE_BASE_URL}/videos", headers=_headers(), json=body, timeout=30)
        if resp.status_code not in (200, 201):
            webhook_update_task(task_id, "failed", 0, "",
                                error=f"Seedance API 오류: {resp.status_code} {resp.text[:300]}")
            return
        body_json = resp.json()
        job_id = body_json.get("id", "")
        if not job_id:
            webhook_update_task(task_id, "failed", 0, "", error="Seedance 응답에 id 없음")
            return
        webhook_update_task(task_id, "processing", 15, "Seedance 작업 등록됨")

        # Poll; the else branch runs only on timeout (no break).
        video_url = None
        for attempt in range(POLL_MAX_ATTEMPTS):
            time.sleep(POLL_INTERVAL)
            fetch = requests.get(f"{SEEDANCE_BASE_URL}/videos/{job_id}",
                                 headers=_headers(), timeout=30)
            if fetch.status_code != 200:
                continue  # transient error: retry on the next tick
            fd = fetch.json()
            status = fd.get("status", "")
            scaled = min(15 + int((attempt / POLL_MAX_ATTEMPTS) * 65), 79)
            webhook_update_task(task_id, "processing", scaled, f"Seedance 생성 중... ({status})")
            if status == "completed":
                video_url = (fd.get("output") or {}).get("video_url", "")
                break
            elif status == "failed":
                err = fd.get("error") or "Seedance 작업 실패"
                webhook_update_task(task_id, "failed", 0, "", error=str(err)[:300])
                return
        else:
            # 8 s × 60 attempts = 480 s, so the message reports 8 minutes
            webhook_update_task(task_id, "failed", 0, "", error="Seedance 폴링 timeout (8분)")
            return
        if not video_url:
            webhook_update_task(task_id, "failed", 0, "", error="Seedance 완료했으나 video_url 없음")
            return

        webhook_update_task(task_id, "processing", 85, "Seedance 결과 다운로드 중...")
        filename = f"{task_id}.mp4"
        os.makedirs(VIDEO_MEDIA_ROOT, exist_ok=True)
        file_path = os.path.join(VIDEO_MEDIA_ROOT, filename)
        dl = requests.get(video_url, stream=True, timeout=300)
        dl.raise_for_status()
        with open(file_path, "wb") as f:
            for chunk in dl.iter_content(chunk_size=8192):
                f.write(chunk)
        local_url = f"{VIDEO_MEDIA_URL_PREFIX}/{filename}"
        webhook_update_task(task_id, "succeeded", 100, "Seedance 생성 완료", video_url=local_url)
    except requests.Timeout:
        webhook_update_task(task_id, "failed", 0, "", error="Seedance API 타임아웃")
    except Exception as e:
        logger.exception("Seedance generation error task=%s", task_id)
        webhook_update_task(task_id, "failed", 0, "", error=str(e))
```
### Step 2: Import smoke test
Run: `python -c "from providers.seedance import run_seedance_generation; print('OK')"`
Expected: `OK`.
### Step 3: Commit
```bash
git add services/video-render/providers/seedance.py
git commit -m "$(cat <<'EOF'
feat(video-render): providers/seedance.py — Seedance 2.0 BytePlus client (SP-7)
POST /seedance/v1/videos → GET /videos/{id} 폴링 (8초 × 60) → output.video_url 다운로드.
Bearer 토큰. resolution 1080p/720p/2k, duration 4~15s.
references 배열로 image-to-video 지원.
Plan-B-Video Phase 2.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
EOF
)"
```
---
## Task 13: Windows video-render — worker.py + tests
**Files:**
- Create: `C:/Users/jaeoh/Desktop/workspace/web-ai/services/video-render/worker.py`
- Create: `C:/Users/jaeoh/Desktop/workspace/web-ai/services/video-render/tests/test_worker.py`
Follows the Plan-B-Music `worker.py` pattern (string-based dispatch table + `getattr`).
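The reason for storing function names rather than function objects is that a table of objects captures the originals at import time, so a later patch of the module attribute is never seen by the dispatcher. The sketch below illustrates this under simplified assumptions: all names are hypothetical, and `globals()` stands in for `getattr(sys.modules[__name__], name)`.

```python
from unittest.mock import patch


def run_demo_job(task_id, params):
    return f"real:{task_id}"


_BY_REFERENCE = {"demo": run_demo_job}  # captures the function object at import time
_BY_NAME = {"demo": "run_demo_job"}     # stores only the attribute name


def dispatch_by_reference(job_type, task_id):
    return _BY_REFERENCE[job_type](task_id, {})


def dispatch_by_name(job_type, task_id):
    fn = globals()[_BY_NAME[job_type]]  # looked up at call time, so patches are visible
    return fn(task_id, {})


def demo():
    """Patch the module-level name and show which dispatcher notices."""
    fake = lambda task_id, params: f"fake:{task_id}"
    with patch.dict(globals(), {"run_demo_job": fake}):
        return dispatch_by_reference("demo", "t1"), dispatch_by_name("demo", "t1")
```

Inside `demo()`, only the name-based dispatcher reaches the fake, which is the behaviour the `patch("worker.run_sora_generation")` tests rely on.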
### Step 1: Write the failing tests
```python
"""worker.py: job_type dispatcher (4 providers)."""
from unittest.mock import patch

import worker


def test_dispatch_sora_calls_run_sora_generation():
    payload = {"task_id": "t1", "job_type": "sora_generation", "params": {"prompt": "x"}}
    with patch("worker.run_sora_generation") as m:
        worker._dispatch(payload)
    m.assert_called_once_with("t1", {"prompt": "x"})


def test_dispatch_veo_calls_run_veo_generation():
    payload = {"task_id": "t2", "job_type": "veo_generation", "params": {"prompt": "x"}}
    with patch("worker.run_veo_generation") as m:
        worker._dispatch(payload)
    m.assert_called_once_with("t2", {"prompt": "x"})


def test_dispatch_kling_calls_run_kling_generation():
    payload = {"task_id": "t3", "job_type": "kling_generation", "params": {"prompt": "x"}}
    with patch("worker.run_kling_generation") as m:
        worker._dispatch(payload)
    m.assert_called_once_with("t3", {"prompt": "x"})


def test_dispatch_seedance_calls_run_seedance_generation():
    payload = {"task_id": "t4", "job_type": "seedance_generation", "params": {"prompt": "x"}}
    with patch("worker.run_seedance_generation") as m:
        worker._dispatch(payload)
    m.assert_called_once_with("t4", {"prompt": "x"})


def test_dispatch_unknown_job_type_logs_error():
    payload = {"task_id": "t5", "job_type": "weird_type", "params": {}}
    with patch("worker.webhook_update_task") as m:
        worker._dispatch(payload)
    # an unknown job_type must be reported to the NAS as a failed task
    m.assert_called_once()
    args = m.call_args[0]
    assert args[0] == "t5"
    assert args[1] == "failed"
```
### Step 2: Confirm the tests fail
Run: `cd C:/Users/jaeoh/Desktop/workspace/web-ai/services/video-render && python -m pytest tests/test_worker.py -v`
Expected: FAIL.
### Step 3: Write `worker.py`
```python
"""Redis BLPOP worker: queue:video-render → job_type dispatch → NAS webhook.

Waits while queue:paused is set (task-watcher sets it when it detects that 박재오 is active).
Follows the Plan-B-Music worker.py pattern: string-based dispatch + getattr (compatible with test patching).
"""
from __future__ import annotations

import asyncio
import json
import logging
import os
import sys

import redis.asyncio as aioredis

from nas_client import webhook_update_task
from providers.sora import run_sora_generation
from providers.veo import run_veo_generation
from providers.kling import run_kling_generation
from providers.seedance import run_seedance_generation

logger = logging.getLogger(__name__)

REDIS_URL = os.getenv("REDIS_URL", "redis://192.168.45.54:6379")
QUEUE_KEY = "queue:video-render"
PAUSED_KEY = "queue:paused"

# string names so `unittest.mock.patch` on `worker.<name>` is correctly intercepted
_DISPATCH_TABLE = {
    "sora_generation": "run_sora_generation",
    "veo_generation": "run_veo_generation",
    "kling_generation": "run_kling_generation",
    "seedance_generation": "run_seedance_generation",
}


def _dispatch(payload: dict) -> None:
    """Call the provider function for payload["job_type"].

    Synchronous; worker_loop wraps it with asyncio.to_thread.
    """
    job_type = payload.get("job_type", "")
    task_id = payload.get("task_id", "")
    params = payload.get("params", {})
    fn_name = _DISPATCH_TABLE.get(job_type)
    if fn_name is None:
        logger.error("unknown job_type=%s task=%s", job_type, task_id)
        webhook_update_task(task_id, "failed", 0, "", error=f"unknown job_type: {job_type}")
        return
    try:
        # Resolve at call time so a patched module attribute is picked up.
        fn = getattr(sys.modules[__name__], fn_name)
    except AttributeError:
        logger.error("dispatch table typo for job_type=%s name=%s task=%s", job_type, fn_name, task_id)
        webhook_update_task(task_id, "failed", 0, "", error=f"internal dispatch error: {fn_name}")
        return
    fn(task_id, params)


async def worker_loop():
    redis = aioredis.from_url(REDIS_URL, decode_responses=False)
    logger.info("video-render worker started (queue=%s)", QUEUE_KEY)
    while True:
        try:
            paused = await redis.get(PAUSED_KEY)
            if paused == b"1":
                await asyncio.sleep(10)
                continue
            # Short timeout so the paused flag is re-checked about once per second.
            item = await redis.blpop(QUEUE_KEY, timeout=1)
            if item is None:
                continue
            _, raw = item
            try:
                payload = json.loads(raw)
            except json.JSONDecodeError:
                logger.error("invalid queue payload: %r", raw[:200])
                continue
            # Providers block on HTTP and time.sleep, so run them off the event loop.
            await asyncio.to_thread(_dispatch, payload)
        except asyncio.CancelledError:
            logger.info("worker_loop cancelled")
            raise
        except Exception:
            logger.exception("worker_loop iteration 실패, 5초 후 재시도")
            await asyncio.sleep(5)
```
### Step 4: Confirm the tests pass
Run: `python -m pytest tests/test_worker.py -v`
Expected: 5 PASS.
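The tests above fix the shape of a queue item: a JSON object with `task_id`, `job_type`, and `params`. As a rough sketch of how the two ends of the queue could agree on that shape, the helpers below encode and validate such a payload. `encode_job` and `decode_job` are hypothetical names; the NAS-side producer is outside this section, so its actual serialization is an assumption.

```python
import json

JOB_TYPES = frozenset({
    "sora_generation", "veo_generation", "kling_generation", "seedance_generation",
})


def encode_job(task_id: str, job_type: str, params: dict) -> bytes:
    """Serialize a job for RPUSH onto queue:video-render."""
    if job_type not in JOB_TYPES:
        raise ValueError(f"unknown job_type: {job_type}")
    payload = {"task_id": task_id, "job_type": job_type, "params": params}
    # ensure_ascii=False keeps non-ASCII prompts readable in the queue
    return json.dumps(payload, ensure_ascii=False).encode("utf-8")


def decode_job(raw: bytes) -> dict:
    """Parse a raw queue item; raises ValueError on malformed input."""
    payload = json.loads(raw)  # json.JSONDecodeError is a subclass of ValueError
    if not isinstance(payload, dict) or not payload.get("task_id"):
        raise ValueError("payload must be a JSON object with a task_id")
    payload.setdefault("params", {})
    return payload
```

Rejecting an unknown `job_type` at encode time surfaces a typo on the producer side instead of as a failed task after the worker pops it.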
### Step 5: Commit
```bash
git add services/video-render/worker.py services/video-render/tests/test_worker.py
git commit -m "$(cat <<'EOF'
feat(video-render): worker.py — Redis BLPOP + 4 job_type dispatch (SP-7)
queue:video-render BLPOP, queue:paused 체크 후 dispatch.
string-based _DISPATCH_TABLE + getattr (테스트 patch 호환, Plan-B-Music 패턴).
AttributeError 가드 포함. asyncio.to_thread로 sync provider wrap.
4 job_type: sora/veo/kling/seedance _generation.
Plan-B-Video Phase 2.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
EOF
)"
```
---
## Task 14: Windows video-render — main.py + services/docker-compose entry
**Files:**
- Create: `C:/Users/jaeoh/Desktop/workspace/web-ai/services/video-render/main.py`
- Modify: `C:/Users/jaeoh/Desktop/workspace/web-ai/services/docker-compose.yml`
### Step 1: Write `main.py`
```python
"""video-render FastAPI entry: health + lifespan (worker loop spawn)."""
from __future__ import annotations

import asyncio
import logging
from contextlib import asynccontextmanager

from fastapi import FastAPI

import worker

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(name)s %(levelname)s %(message)s")
logger = logging.getLogger(__name__)


@asynccontextmanager
async def lifespan(app: FastAPI):
    worker_task = asyncio.create_task(worker.worker_loop())
    logger.info("video-render lifespan 시작")
    try:
        yield
    finally:
        worker_task.cancel()
        try:
            await worker_task
        except asyncio.CancelledError:
            pass
        logger.info("video-render lifespan 종료")


app = FastAPI(lifespan=lifespan)


@app.get("/health")
def health():
    return {"ok": True, "service": "video-render"}
```
### Step 2: Import smoke test
Run: `python -c "from main import app; print(len(app.routes))"`
Expected: prints a number (>= 1; the routes include `/health`).
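The lifespan in `main.py` starts the worker loop as a background task and, on shutdown, cancels it and awaits it so the loop can finish its cleanup. The same start, cancel, await pattern can be sketched with `asyncio` alone; `background_task` and `demo` are hypothetical names, and the ticking loop merely stands in for `worker.worker_loop`.

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def background_task(coro_fn):
    """Run coro_fn() in the background for the duration of the with-block."""
    task = asyncio.create_task(coro_fn())
    try:
        yield task
    finally:
        task.cancel()
        try:
            await task  # wait until the task has actually processed the cancellation
        except asyncio.CancelledError:
            pass


async def demo() -> list:
    events = []

    async def loop_forever():
        try:
            while True:
                events.append("tick")
                await asyncio.sleep(0.01)
        except asyncio.CancelledError:
            events.append("cancelled")
            raise  # re-raise so the task ends up in the cancelled state

    async with background_task(loop_forever):
        await asyncio.sleep(0.03)
    return events
```

Awaiting the task after `cancel()` matters: without it, shutdown could proceed while the loop is still unwinding.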
### Step 3: Add video-render to docker-compose.yml
Add the following after the music-render service in `C:/Users/jaeoh/Desktop/workspace/web-ai/services/docker-compose.yml`:
```yaml
  video-render:
    build:
      context: ./video-render
    container_name: video-render
    restart: unless-stopped
    ports:
      - "18712:8000"
    environment:
      - TZ=Asia/Seoul
      - REDIS_URL=${REDIS_URL:-redis://192.168.45.54:6379}
      - NAS_BASE_URL=${NAS_BASE_URL:-http://192.168.45.54:18801}
      - INTERNAL_API_KEY=${INTERNAL_API_KEY:-}
      - OPENAI_API_KEY=${OPENAI_API_KEY:-}
      - GOOGLE_PROJECT_ID=${GOOGLE_PROJECT_ID:-}
      - GOOGLE_LOCATION=${GOOGLE_LOCATION:-us-central1}
      - GOOGLE_GCS_BUCKET=${GOOGLE_GCS_BUCKET:-}
      - GOOGLE_APPLICATION_CREDENTIALS=/app/keys/gcp-sa.json
      - PIAPI_API_KEY=${PIAPI_API_KEY:-}
      - SEEDANCE_API_KEY=${SEEDANCE_API_KEY:-}
      - VIDEO_MEDIA_ROOT=${VIDEO_MEDIA_ROOT:-/mnt/nas/webpage/data/video}
      - VIDEO_MEDIA_URL_PREFIX=${VIDEO_MEDIA_URL_PREFIX:-/media/video}
    volumes:
      - /mnt/nas/webpage/data/video:/mnt/nas/webpage/data/video
      - ${GCP_SA_JSON_HOST_PATH:-/etc/webai/gcp-sa.json}:/app/keys/gcp-sa.json:ro
    healthcheck:
      test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"]
      interval: 60s
      timeout: 5s
      retries: 3
```
**Important:**
- `NAS_BASE_URL` defaults to port 18801 (video-lab). Lesson from Plan-B-Music: `services/.env` must not define `NAS_BASE_URL`, otherwise the service-local default is not applied.
- `GCP_SA_JSON_HOST_PATH` is the host path where 박재오 keeps the service account JSON. Override it in `.env`.
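The first point follows from how Compose resolves `${NAME:-default}`: the default is used only when the variable is unset or empty, so any `NAS_BASE_URL` line in the shared `services/.env` overrides every service's own default. The function below is a small Python emulation of that rule, written only to make the behaviour concrete; `resolve` is a hypothetical helper and the port 18700 in the usage note is a made-up value.

```python
def resolve(env: dict, name: str, default: str) -> str:
    """Emulate Compose's ${NAME:-default}: use default when NAME is unset or empty."""
    value = env.get(name)
    return value if value else default
```

For instance, `resolve({}, "NAS_BASE_URL", "http://192.168.45.54:18801")` returns the video-lab default, while `resolve({"NAS_BASE_URL": "http://192.168.45.54:18700"}, "NAS_BASE_URL", "http://192.168.45.54:18801")` returns the shared value and would point video-render at the wrong service.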
### Step 4: Commit
```bash
cd C:/Users/jaeoh/Desktop/workspace/web-ai
git add services/video-render/main.py services/docker-compose.yml
git commit -m "$(cat <<'EOF'
feat(video-render): main.py + services/docker-compose entry (SP-7)
FastAPI lifespan에서 worker_loop 스폰. /health endpoint.
docker-compose: port 18712, NAS_BASE_URL default=18801 (video-lab),
4 provider env (OPENAI_API_KEY, GOOGLE_*, PIAPI_API_KEY, SEEDANCE_API_KEY),
GCP service account JSON read-only mount.
Plan-B-Video Phase 2.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
EOF
)"
```
---
## Task 15: NAS push + deployer rebuild + build instructions for 박재오
**Files:**
- (no file changes; push and instructions for 박재오 only)
### Step 1: Push web-backend
```bash
cd C:/Users/jaeoh/Desktop/workspace/web-backend
git push
```
Expected: the push to the NAS Gitea succeeds. The webhook triggers the deployer automatically, which builds and starts the new video-lab container.
### Step 2: NAS-side verification by 박재오
Ask 박재오 to run the following checks:
```bash
ssh nas
docker ps --filter "name=video-lab" # confirm the container is Up
docker logs video-lab --tail 30 # check the startup logs
docker exec video-lab env | grep -E "INTERNAL_API_KEY|REDIS_URL" # confirm env injection
# health
curl https://gahusb.synology.me/api/video/providers
# → shows metadata for the 4 providers
# Restart the NAS frontend (activates nginx /api/video/, /media/video/, /api/internal/video/)
docker compose -f /volume1/docker/webpage/docker-compose.yml restart frontend
```
### Step 3: Windows AI machine build instructions for 박재오
In 박재오's WSL2 (`/workspace/web-ai/`):
```bash
# 1. Push web-ai (done manually by 박재오 after refreshing credentials)
git push
# 2. Add the 4 provider keys to services/.env (do NOT add a NAS_BASE_URL line; Plan-B-Music lesson)
nano /workspace/web-ai/services/.env
```
Example `services/.env` after editing:
```
REDIS_URL=redis://192.168.45.54:6379
INTERNAL_API_KEY=<same value as NAS>
SUNO_API_KEY=<existing>
INSTA_MEDIA_ROOT=/mnt/nas/webpage/data/insta
INSTA_MEDIA_URL_PREFIX=/media/insta
# New (Plan-B-Video)
OPENAI_API_KEY=<paste>
GOOGLE_PROJECT_ID=<paste>
GOOGLE_LOCATION=us-central1
GOOGLE_GCS_BUCKET=<paste>
PIAPI_API_KEY=<paste>
SEEDANCE_API_KEY=<paste>
# Host path where 박재오 keeps the service account JSON
GCP_SA_JSON_HOST_PATH=/etc/webai/gcp-sa.json
```
```bash
# 3. Place the GCP service account JSON on the host
sudo mkdir -p /etc/webai
sudo cp <gcp-sa.json downloaded by 박재오> /etc/webai/gcp-sa.json
sudo chmod 644 /etc/webai/gcp-sa.json
# 4. Build + start
cd /workspace/web-ai/services
docker compose build video-render
docker compose up -d video-render
docker logs video-render --tail 30
# 5. Health check from inside WSL2
curl -m 3 http://localhost:18712/health
# 6. Confirm reachability from the NAS (mirror mode + firewall rule; requires 18712 inbound)
```
### Step 4: Add a Windows firewall inbound rule for 18712
박재오, in Windows PowerShell (administrator):
```powershell
New-NetFirewallRule -DisplayName "video-render-inbound-18712" `
-Direction Inbound -Protocol TCP -LocalPort 18712 `
-Action Allow -Profile Any -Enabled True
```
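(Plan-B-Music already applied mirror mode permanently and added the 18711 inbound rule. video-render has no sync forward, so strictly speaking no inbound rule is needed: the Windows worker pulls from Redis. The rule is optional and added only as a safety measure.)
### Step 5: Confirm the Windows worker reaches NAS Redis (worker is waiting in BLPOP)
```bash
# On 박재오's Windows machine (WSL2), in a separate terminal:
docker logs video-render -f
# On the NAS:
ssh nas
docker exec redis redis-cli LLEN queue:video-render
# → 0 (an empty queue is consistent with the worker waiting in BLPOP)
```
The push and build in this task run directly on 박재오's machine. The controller attempts the push, then waits for 박재오's verification results.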
---
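## Task 16: End-to-end verification with one Kling track
**Files:**
- (no changes: verification only)
Kling has the simplest API (PiAPI x-api-key plus the standard task pattern), so it is the recommended first verification.
### Step 1: Trigger one Kling video generation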
```bash
curl -X POST https://gahusb.synology.me/api/video/generate \
-H "Content-Type: application/json" \
-d '{
"provider": "kling",
"model": "2.6",
"prompt": "A serene mountain landscape with morning mist",
"duration": 5,
"aspect_ratio": "16:9",
"mode": "std"
}'
# → {"task_id": "<uuid>", "provider": "kling"}
```
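On the NAS side, the request above ends up as a queue entry with the `task_id` / `job_type` / `params` shape described elsewhere in this plan. The following is a minimal sketch of that mapping, assuming `job_type` is simply `<provider>_generation`; `build_render_job` is a hypothetical name, and the real `_push_render_job` from Task 5 may differ in detail.

```python
import json
import uuid
from typing import Optional

PROVIDERS = ("sora", "veo", "kling", "seedance")


def build_render_job(provider: str, params: dict, task_id: Optional[str] = None) -> str:
    """Serialize one render job in the task_id / job_type / params shape."""
    if provider not in PROVIDERS:
        raise ValueError(f"unknown provider: {provider}")
    payload = {
        "task_id": task_id or str(uuid.uuid4()),
        "job_type": f"{provider}_generation",
        "params": params,
    }
    # The resulting string is what would be RPUSHed onto queue:video-render.
    return json.dumps(payload, ensure_ascii=False)


print(build_render_job("kling", {"model": "2.6", "duration": 5, "mode": "std"}))
```

Rejecting unknown providers at this point keeps malformed jobs out of the queue, so the worker only ever sees one of the four known `job_type` values.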
### Step 2: Check the Windows worker logs (separate terminal)
```bash
docker logs video-render -f
```
Expected sequence:
```
... INFO Kling API 호출 중...
... INFO Kling 작업 등록됨
... POST http://192.168.45.54:18801/api/internal/video/update "HTTP/1.1 200 OK" ← 18801!
... INFO Kling 생성 중... (Processing)
... INFO Kling 결과 다운로드 중...
... INFO Kling 생성 완료
```
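The `POST .../api/internal/video/update` line in that sequence corresponds to a webhook request roughly like the one below. This is a sketch under assumptions: the field names follow the `webhook_update_task` signature listed in the Self-Review, while `build_update_request` itself is a hypothetical helper that only constructs the request and does not send it.

```python
import json
import urllib.request


def build_update_request(base_url, internal_key, task_id, status,
                         progress=0, message="", video_url=None, error=None):
    """Build (but do not send) the internal status-update webhook request."""
    body = {
        "task_id": task_id,
        "status": status,
        "progress": progress,
        "message": message,
        "video_url": video_url,
        "error": error,
    }
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/api/internal/video/update",
        data=json.dumps(body).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            "X-Internal-Key": internal_key,  # checked by the NAS internal router
        },
        method="POST",
    )


req = build_update_request(
    "http://192.168.45.54:18801", "example-key", "task-123",
    "succeeded", 100, "done", "/media/video/task-123.mp4",
)
print(req.get_method(), req.full_url)
```

Passing the result to `urllib.request.urlopen(req, timeout=10)` would send it. If the log shows a port other than 18801, a stray `NAS_BASE_URL` in `services/.env` is the first thing to check.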
### Step 3: Poll the NAS DB status
```bash
curl -s https://gahusb.synology.me/api/video/tasks/<task_id> | python3 -m json.tool
# Expected: status=succeeded, video_url=/media/video/<task_id>.mp4
```
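Repeating the `curl` by hand works for a single task. For longer generations, a small polling loop is more convenient. The sketch below assumes the task JSON carries a `status` field that ends in `succeeded` or `failed`, as described in this plan; `poll_task` and the injected `fetch` callable are hypothetical names, so the HTTP client is left to the caller.

```python
import time
from typing import Callable

TERMINAL_STATUSES = {"succeeded", "failed"}


def poll_task(fetch: Callable[[str], dict], task_id: str,
              interval_s: float = 5.0, timeout_s: float = 600.0,
              sleep: Callable[[float], None] = time.sleep,
              clock: Callable[[], float] = time.monotonic) -> dict:
    """Call fetch(task_id) until the task reaches a terminal status or times out."""
    deadline = clock() + timeout_s
    while True:
        task = fetch(task_id)
        if task.get("status") in TERMINAL_STATUSES:
            return task
        if clock() >= deadline:
            raise TimeoutError(f"task {task_id} still {task.get('status')!r} after {timeout_s}s")
        sleep(interval_s)


# Usage with a canned sequence standing in for GET /api/video/tasks/{task_id}.
responses = iter([
    {"status": "processing"},
    {"status": "succeeded", "video_url": "/media/video/task-123.mp4"},
])
print(poll_task(lambda _tid: next(responses), "task-123", interval_s=0))
```

In real use, `fetch` would wrap an HTTP GET against `/api/video/tasks/{task_id}` and return the decoded JSON.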
### Step 4: mp4 file + playback
```bash
ls -la /volume1/docker/webpage/data/video/<task_id>.mp4
# or in a browser: https://gahusb.synology.me/media/video/<task_id>.mp4
```
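A non-zero file size does not prove the download produced a real video, since an error page saved as `.mp4` also has a size. A quick sanity check is to look for the `ftyp` box type at byte offset 4, which a well-formed MP4 normally carries. This is a minimal sketch with hypothetical helper names, not a full container validation.

```python
def has_ftyp_box(header: bytes) -> bool:
    """True if the first box of the file is an 'ftyp' box (bytes 4..8)."""
    return len(header) >= 8 and header[4:8] == b"ftyp"


def looks_like_mp4(path: str) -> bool:
    """Read only the first 12 bytes of the file and check the box type."""
    with open(path, "rb") as f:
        return has_ftyp_box(f.read(12))


# A typical MP4 header: 4-byte box size, then 'ftyp', then the major brand.
print(has_ftyp_box(b"\x00\x00\x00\x18ftypmp42"))
# An HTML error body saved under an .mp4 name fails the check.
print(has_ftyp_box(b"<!DOCTYPE html>"))
```

On the NAS this could be run as `looks_like_mp4("/volume1/docker/webpage/data/video/<task_id>.mp4")` with the real task id substituted.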
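### Step 5: Sora, Veo, Seedance regression (optional)
Run one track per provider with the same command, changing `provider` (and `model`/`mode` where the provider expects different values). The plan fully passes once all 4 providers report succeeded.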
---
## Task 17: Memory record + final cleanup + push check
**Files:**
- Create: `C:/Users/jaeoh/.claude/projects/C--Users-jaeoh-Desktop-workspace-web-ui/memory/reference_plan_b_video_complete.md`
- Modify: `C:/Users/jaeoh/.claude/projects/C--Users-jaeoh-Desktop-workspace-web-ui/memory/MEMORY.md`
### Step 1: Write `reference_plan_b_video_complete.md`
```markdown
---
name: plan-b-video-complete
description: 2026-05-19 Plan-B-Video complete - new NAS video-lab + Windows video-render with 4 providers (Sora 2/Veo 3.1/Kling via PiAPI/Seedance 2.0)
metadata:
type: reference
---
Plan-B-Video completed 2026-05-19. 17 tasks. Spec §10 SP-7 reduced to 4 providers (Runway/Pika/Luma dropped, Seedance added).
## Structure
NAS video-lab (new container, port 18801):
- POST /api/video/generate → Redis RPUSH queue:video-render
- GET /api/video/tasks/{id}
- POST /api/internal/video/update (X-Internal-Key)
Windows video-render (port 18712):
- worker: Redis BLPOP queue:video-render → job_type dispatch (4 provider)
- providers/{sora,veo,kling,seedance}.py
- result mp4 → /mnt/nas/webpage/data/video/{task_id}.mp4
## Characteristics of the 4 providers
| Provider | Auth | Notes |
|----------|------|---------|
| Sora 2 | OPENAI_API_KEY Bearer | ⚠️ Deprecated 2026-09-24; an alternative will be needed by then |
| Veo 3.1 | gcloud Bearer (service account JSON) | Goes through a GCS bucket. Download gs://...mp4 with the google-cloud-storage SDK, then write over SMB |
| Kling | x-api-key (PiAPI gateway) | Via api.piapi.ai. The direct KlingAI API needs JWT and is more complex |
| Seedance 2.0 | SEEDANCE_API_KEY Bearer | BytePlus international endpoint (api.byteplus.com) |
## Pitfalls learned in Plan-B-Music, all applied up front
1. WSL2 mirror mode: already applied permanently (`.wslconfig networkingMode=mirrored`)
2. Redis chown 999:999: already applied permanently
3. NAS_BASE_URL in services/.env: keep it absent, so the video-render compose default 18801 applies
## Next plan
Plan-B-Infra (SP-9 NSSM + SP-10 task-watcher): Windows auto-start + time-of-day branching policy.
```
### Step 2: Add one line to the MEMORY.md index
After the `reference_plan_b_music_complete.md` entry:
```markdown
- [Plan-B-Video complete](reference_plan_b_video_complete.md): 2026-05-19 new video-lab + 4 providers (Sora/Veo/Kling/Seedance). Spec §10 SP-7 updated (6→4 providers)
```
### Step 3: Final push check
```bash
cd C:/Users/jaeoh/Desktop/workspace/web-backend && git status && git log --oneline -10
cd C:/Users/jaeoh/Desktop/workspace/web-ai && git status && git log --oneline -10
```
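Expected: both repositories are clean and all latest commits are pushed.
### Step 4: Summary report to 박재오
Report to 박재오:
- All 4 providers passed end-to-end verification
- ⚠️ Sora 2 API shutdown on 2026-09-24: an alternative must be chosen within 4 months
- A GCS bucket cleanup policy (TTL lifecycle rule) is recommended for Veo, so mp4 files do not stay in GCS indefinitely
- The frontend video UI is a separate follow-up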
---
## Self-Review
**1. Spec coverage**
| Spec requirement | Implemented in | Status |
|--------------|-----------|------|
| SP-7 §10: video provider gateway (updated 6→4) | Task 9~12 (providers/sora·veo·kling·seedance) | ✓ |
| SP-7 §10: provider is selected from the payload | Task 5 GenerateRequest.provider + Task 13 dispatch | ✓ |
| SP-7 §10: each external API + mp4 download → /mnt/nas/data/video/ | Task 9~12 _download | ✓ |
| SP-7 §10: NAS webhook | Task 8 nas_client + Task 13 worker | ✓ |
| SP-8 §10: new docker container | Task 1~6 | ✓ |
| SP-8 §10: POST /api/video/generate | Task 5 main.py | ✓ |
| SP-8 §10: GET /api/video/tasks/{id} | Task 5 main.py | ✓ |
| SP-8 §10: video_tasks table in sqlite | Task 2 db.py | ✓ |
| SP-8 §10: POST /api/internal/video/update | Task 4 internal_router | ✓ |
| SP-8 §10: Dockerfile + requirements + compose entry | Task 1, 6 | ✓ |
| §5 Windows Render Worker pattern | Task 13 worker_loop | ✓ |
| §6 Redis queue + standard payload | Task 5 _push_render_job | ✓ |
| §8 3-layer blocking (LAN allow + deny all + X-Internal-Key) | Task 7 nginx | ✓ |
| §9 external API keys live only in the Windows .env | Task 8 .env.example + Task 14 compose | ✓ |
| 박재오 decision: "migrate all 4 providers at once" | Task 9~12 | ✓ |
| Up-front awareness of the 3 Plan-B-Music pitfalls | Task 14 service-local default + memory | ✓ |
**Overall spec coverage**: 100%. The revised 4 providers are stated explicitly, and spec §10 SP-7 is updated.
**2. Placeholder scan**: passed. All code blocks are complete. Markers such as `<paste>` are explicit slots for 박재오's .env input, not unfinished placeholders.
**3. Type consistency**:
- `webhook_update_task(task_id, status, progress, message, video_url, error)`: defined in Task 8, called consistently in Task 9~12
- `_push_render_job(task_id, job_type, params)`: defined in Task 5
- `_dispatch(payload)`: Task 13. Uses payload[task_id/job_type/params] (matches the Task 5 push format)
- 4 `job_type` values: `sora_generation`, `veo_generation`, `kling_generation`, `seedance_generation`. Created in Task 5, mapped consistently in the Task 13 _DISPATCH_TABLE
- `UpdatePayload` (video_lab/internal_router) vs webhook payload: task_id/status/progress/message/video_url/error match
**4. Omission check**:
- nginx blocking (`/api/internal/video/`): Task 7 ✓
- frontend depends_on + volume mount: Task 6 ✓
- Build instructions for 박재오 (T15 + GCP_SA_JSON_HOST_PATH): ✓
- 18712 inbound rule: marked optional in Task 15 Step 4 ✓
The plan itself is complete. Present the execution choice to 박재오.
---
## Appendix: known limitations + follow-up
**Known limitations:**
1. **Sora 2 deprecated 2026-09-24**: a 4-month window. Migration candidates: Pika, Runway Gen-3, or a new Sora 3 if one is released
2. **Veo GCS bucket cost**: storage cost accumulates if mp4 files stay in GCS indefinitely. A lifecycle rule (for example, auto-delete after 30 days) is recommended
3. **Kling PiAPI dependency**: if a direct KlingAI contract makes the native API available, migrate and remove the PiAPI middle layer
4. **Frontend UI not included**: backend gateway only. The video gallery page is a separate plan
**Follow-up:**
- Sora alternative migration (by 2026-08)
- Apply the GCS lifecycle policy
- Frontend `/video` page (video generation UI + gallery + polling)
- Plan-B-Infra (SP-9 NSSM + SP-10 task-watcher), to proceed after Plan-B-Video
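The "4-month window" for Sora can be made concrete with simple date arithmetic. This sketch uses only the shutdown date stated in this plan; `days_until_shutdown` is a hypothetical helper.

```python
from datetime import date

SORA_SHUTDOWN = date(2026, 9, 24)  # shutdown date as stated in this plan


def days_until_shutdown(today: date) -> int:
    """Days remaining before the Sora 2 API shutdown (negative once passed)."""
    return (SORA_SHUTDOWN - today).days


# Measured from the plan date, 2026-05-19.
print(days_until_shutdown(date(2026, 5, 19)))  # → 128
```

That leaves a little over four months from the plan date, and noticeably less if the migration is to land by the end of August as the follow-up list suggests.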
---
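## Appendix: API key issuance guide for the 4 providers
Implementation proceeds while 박재오 does not yet hold the keys. In the verification phase (Task 15~16), the keys are issued and entered in `.env` before continuing. The sign-up, billing, and key issuance flow for each provider follows.
### 1. Sora 2 (OpenAI)
**Dashboard**: https://platform.openai.com
**Issuance flow**:
1. Sign up at https://platform.openai.com/signup (log in if an account already exists)
2. https://platform.openai.com/account/billing/overview → register a payment method (Sora 2 uses prepaid credit or a monthly plan)
3. **Sora 2 API tier access is required**: as of May 2026 it is offered to Tier 1 and above (unlocked automatically after at least $5 of usage, or by separate request)
4. https://platform.openai.com/api-keys → "Create new secret key" → copy `sk-proj-...`
5. Enter `OPENAI_API_KEY=sk-proj-...` in `.env`
**Pricing (as of 2026-05)**:
- sora-2 (720p, 8 s): ~$0.50
- sora-2-pro (1080p, 8 s): ~$1.50
**⚠️ Caution**: the Sora 2 API is scheduled to shut down on 2026-09-24. Use it with the 4-month window in mind.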
### 2. Veo 3.1 (Google Vertex AI)
**Dashboard**: https://console.cloud.google.com
**Issuance flow** (the most involved: requires a GCP project, a service account, and a GCS bucket):
```bash
# (1) Install the GCP CLI (Windows PowerShell)
# Download from https://cloud.google.com/sdk/docs/install
gcloud init
# (2) Create a project (or use an existing one)
gcloud projects create my-veo-project --name="Veo Project"
gcloud config set project my-veo-project
# (3) Link a billing account (required, via the Console UI)
# https://console.cloud.google.com/billing → link a billing account to the project
# (4) Enable the Vertex AI API
gcloud services enable aiplatform.googleapis.com
# (5) Create a GCS bucket (stores Veo results)
gcloud storage buckets create gs://my-veo-output --location=us-central1
# In .env: GOOGLE_GCS_BUCKET=my-veo-output
# (6) Create a service account, grant roles, and download the JSON key
gcloud iam service-accounts create veo-worker --display-name="Veo Worker"
gcloud projects add-iam-policy-binding my-veo-project \
--member="serviceAccount:veo-worker@my-veo-project.iam.gserviceaccount.com" \
--role="roles/aiplatform.user"
gcloud projects add-iam-policy-binding my-veo-project \
--member="serviceAccount:veo-worker@my-veo-project.iam.gserviceaccount.com" \
--role="roles/storage.objectAdmin"
gcloud iam service-accounts keys create ~/gcp-sa.json \
--iam-account=veo-worker@my-veo-project.iam.gserviceaccount.com
# (7) Copy the JSON key to 박재오's Windows AI WSL2 host, then run the commands below there
# e.g. scp ~/gcp-sa.json or USB transfer
sudo mkdir -p /etc/webai
sudo mv ~/gcp-sa.json /etc/webai/gcp-sa.json
sudo chmod 644 /etc/webai/gcp-sa.json
```
`.env` entries:
```
GOOGLE_PROJECT_ID=my-veo-project
GOOGLE_LOCATION=us-central1
GOOGLE_GCS_BUCKET=my-veo-output
GCP_SA_JSON_HOST_PATH=/etc/webai/gcp-sa.json
```
**Pricing (2026-05)**:
- Veo 3.1 Fast (1080p, 8 s): ~$0.40
- Veo 3.1 Generate (1080p, 8 s): ~$0.75
**GCS lifecycle rule recommended** (auto-deleting mp4 files reduces cost):
```bash
gcloud storage buckets update gs://my-veo-output \
--lifecycle-file=<(echo '{"lifecycle":{"rule":[{"action":{"type":"Delete"},"condition":{"age":30}}]}}')
```
→ objects are deleted automatically after 30 days.
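The one-liner above relies on bash process substitution, which is not available in every shell (plain PowerShell, for example). An alternative is to write the same lifecycle JSON to a file first. The sketch below generates that file; `lifecycle_config` and `write_lifecycle_file` are hypothetical helper names, and the JSON structure is the one used in the command above.

```python
import json


def lifecycle_config(max_age_days: int) -> dict:
    """Lifecycle config that deletes objects older than max_age_days."""
    return {
        "lifecycle": {
            "rule": [
                {"action": {"type": "Delete"}, "condition": {"age": max_age_days}}
            ]
        }
    }


def write_lifecycle_file(path: str, max_age_days: int = 30) -> None:
    """Write the config as JSON so it can be passed via --lifecycle-file."""
    with open(path, "w", encoding="utf-8") as f:
        json.dump(lifecycle_config(max_age_days), f, indent=2)


print(json.dumps(lifecycle_config(30)))
```

After calling `write_lifecycle_file("lifecycle.json")`, the rule can be applied with `gcloud storage buckets update gs://my-veo-output --lifecycle-file=lifecycle.json`.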
### 3. Kling (PiAPI gateway)
**Dashboard**: https://piapi.ai
**Issuance flow** (the simplest; direct KlingAI requires Chinese KYC, so PiAPI is recommended):
1. Sign up at https://piapi.ai
2. Dashboard → Billing → top up credit (minimum $10)
3. Dashboard → API Keys → "Create API Key" → copy
4. Enter `PIAPI_API_KEY=...` in `.env`
**Enabling Kling credit**: PiAPI also serves models other than Kling. No separate enablement is needed; the same API key is used for the calls.
**Pricing (2026-05, PiAPI)**:
- Kling v2.6 std, 5 s: ~$0.30
- Kling v2.6 pro, 10 s: ~$1.20
**Alternative (native KlingAI)**: https://klingai.com/global/dev issues API keys directly, but requires a Chinese membership sign-up and JWT authentication. PiAPI is faster to get running.
### 4. Seedance 2.0 (BytePlus)
**Dashboard**: https://console.byteplus.com
**Issuance flow**:
1. Sign up at https://www.byteplus.com (international users). Users in China use https://www.volcengine.com (Volcengine, RMB)
2. KYC verification (corporate or personal ID check)
3. Console → "AI/ML" → "Seedance" menu → request API access (approval may take ~24h)
4. After approval, obtain either an Access Key + Secret Key pair or a simple Bearer API key (selectable in the UI)
5. Enter `SEEDANCE_API_KEY=...` in `.env`
**Pricing (2026-05, BytePlus)**:
- Seedance 2.0 (720p, 5 s): ~$0.05
- Seedance 2.0 (1080p, 10 s): ~$0.20
**Reference**: https://www.byteplus.com/en/product/seedance
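### Recommended issuance priority for 박재오
| Priority | Provider | Reason |
|---|---|---|
| 1 | **Kling (PiAPI)** | Issued in 5 minutes and usable immediately; best suited for verification |
| 2 | **Seedance** | Cheap ($0.05~0.20), issued within 1 day, broadens 4-provider verification |
| 3 | **Sora 2** | Immediate with an existing OpenAI account, plus tier activation. ⚠️ Deprecated in 4 months |
| 4 | **Veo 3.1** | Heavy GCP setup, but strong on quality and native audio. Issue after the rest is stable |
**Task 16 (end-to-end verification) should start with one Kling track** (easiest issuance + short polling). Regress the other providers as their keys are issued.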
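A rough budget for the verification runs follows from the price lists above. This is a small arithmetic sketch using the cheapest listed option per provider. The prices are the approximate figures quoted in this appendix, and `verification_budget` is a hypothetical helper.

```python
from decimal import Decimal

# Cheapest listed option per provider, in USD (approximate, from this appendix).
CHEAPEST_RUN_USD = {
    "kling": Decimal("0.30"),     # Kling v2.6 std, 5 s
    "seedance": Decimal("0.05"),  # Seedance 2.0, 720p 5 s
    "sora": Decimal("0.50"),      # sora-2, 720p 8 s
    "veo": Decimal("0.40"),       # Veo 3.1 Fast, 1080p 8 s
}


def verification_budget(providers, runs_per_provider: int = 1) -> Decimal:
    """Total cost of running each listed provider runs_per_provider times."""
    per_round = sum((CHEAPEST_RUN_USD[p] for p in providers), Decimal("0"))
    return per_round * runs_per_provider


print(verification_budget(CHEAPEST_RUN_USD))  # one track on all 4 providers → 1.25
print(verification_budget(["kling"], 3))      # three Kling retries → 0.90
```

One successful track per provider therefore costs about $1.25 in generation fees, not counting minimum top-ups such as PiAPI's $10 credit.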
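### When to enter the API keys
After Plan T1~T14 (NAS + Windows code) are complete, 박재오 enters the keys in `.env` **during T15**. See Task 15 Step 3, item 2. The containers start normally even without keys, but on a call the worker immediately reports failed with an "API_KEY not set" message.
Each provider's code already contains a safe fallback that makes the missing-key behavior easy to verify: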
```python
if not os.getenv("OPENAI_API_KEY"):
webhook_update_task(task_id, "failed", 0, "", error="OPENAI_API_KEY 미설정 (Windows .env)")
return
```
→ The NAS DB immediately shows "failed + key not set", which keeps debugging straightforward.
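The same guard can be expressed once for all four providers. The following is a minimal sketch, assuming the required variables per provider are the ones injected by the compose entry; the `REQUIRED_ENV` grouping and the `missing_env` helper are hypothetical, and the actual provider modules may check a different set.

```python
import os
from typing import Mapping, Optional

# Assumed mapping of provider → env vars it cannot run without.
REQUIRED_ENV = {
    "sora": ("OPENAI_API_KEY",),
    "veo": ("GOOGLE_PROJECT_ID", "GOOGLE_GCS_BUCKET"),
    "kling": ("PIAPI_API_KEY",),
    "seedance": ("SEEDANCE_API_KEY",),
}


def missing_env(provider: str, env: Optional[Mapping[str, str]] = None) -> list:
    """Return the required variables that are unset or empty for a provider."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_ENV[provider] if not env.get(name)]


# Example with an explicit mapping instead of the real environment.
print(missing_env("veo", {"GOOGLE_PROJECT_ID": "my-veo-project"}))
# → ['GOOGLE_GCS_BUCKET']
```

A worker could call `missing_env(provider)` before dispatch and, if the list is non-empty, report `failed` through the webhook with the variable names in the error text.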