Compare commits

..

7 Commits

Author SHA1 Message Date
4db0551d33 feat(video-render): main.py + services/docker-compose entry (SP-7)
FastAPI lifespan에서 worker_loop 스폰. /health endpoint.
docker-compose: port 18712, NAS_BASE_URL default=18801 (video-lab),
4 provider env (OPENAI_API_KEY, GOOGLE_*, PIAPI_API_KEY, SEEDANCE_API_KEY),
GCP service account JSON read-only mount.
Plan-B-Video Phase 2 완료 — 박재오 머신에서 .env + GCP JSON 작성 + 빌드 대기.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-19 08:42:34 +09:00
4d837fdd31 feat(video-render): worker.py — Redis BLPOP + 4 job_type dispatch (SP-7)
queue:video-render BLPOP, queue:paused 체크 후 dispatch.
string-based _DISPATCH_TABLE + getattr (테스트 patch 호환, Plan-B-Music 패턴).
AttributeError 가드 포함. asyncio.to_thread로 sync provider wrap.
4 job_type: sora/veo/kling/seedance _generation.
Plan-B-Video Phase 2.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-19 08:41:15 +09:00
2567a6f10b feat(video-render): providers/seedance.py — Seedance 2.0 BytePlus client (SP-7)
POST /seedance/v1/videos → GET /videos/{id} 폴링 (8초 × 60) → output.video_url 다운로드.
Bearer 토큰. resolution 1080p/720p/2k, duration 4~15s.
references 배열로 image-to-video 지원.
Plan-B-Video Phase 2.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-19 08:39:54 +09:00
17ed1943f1 feat(video-render): providers/kling.py — Kling AI via PiAPI gateway (SP-7)
POST /api/v1/task (model=kling, task_type=video_generation) →
GET /api/v1/task/{id} 폴링 (10초 × 60) → data.output.video_url 다운로드.
x-api-key 헤더. version 1.5/1.6/2.1/2.5/2.6 지원.
Plan-B-Video Phase 2.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-19 08:38:51 +09:00
8d246b5b32 feat(video-render): providers/veo.py — Veo 3.1 Vertex AI client (SP-7)
predictLongRunning → fetchPredictOperation 폴링 (12초 × 50).
결과 gs://bucket/veo/{task_id}/sample_0.mp4 → google-cloud-storage SDK로
다운로드 → NAS SMB. GOOGLE_PROJECT_ID/LOCATION/GCS_BUCKET/APPLICATION_CREDENTIALS env.
Plan-B-Video Phase 2.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-19 08:37:45 +09:00
b4bec9d51b feat(video-render): providers/sora.py — Sora 2 client (SP-7)
POST /v1/videos → GET /v1/videos/{id} 폴링 (15초 × 40) → /content?variant=video 다운로드.
sora-2 / sora-2-pro 모델. aspect_ratio → size 매핑.
⚠️ OpenAI Sora 2 API deprecated 2026-09-24.
Plan-B-Video Phase 2.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-19 08:36:27 +09:00
f32792e4a9 feat(video-render): scaffold + nas_client webhook adapter (SP-7)
Dockerfile (python:3.12-slim), requirements (openai + google-cloud-storage + httpx + redis).
.env.example: OPENAI/GOOGLE/PIAPI/SEEDANCE keys + VIDEO_MEDIA_ROOT.
nas_client.webhook_update_task: call-time os.getenv (테스트 격리), respx mock 5 tests.
Plan-B-Video Phase 2.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-19 08:35:20 +09:00
15 changed files with 917 additions and 0 deletions

View File

@@ -49,3 +49,33 @@ services:
interval: 60s
timeout: 5s
retries: 3
video-render:
build:
context: ./video-render
container_name: video-render
restart: unless-stopped
ports:
- "18712:8000"
environment:
- TZ=Asia/Seoul
- REDIS_URL=${REDIS_URL:-redis://192.168.45.54:6379}
- NAS_BASE_URL=${NAS_BASE_URL:-http://192.168.45.54:18801}
- INTERNAL_API_KEY=${INTERNAL_API_KEY:-}
- OPENAI_API_KEY=${OPENAI_API_KEY:-}
- GOOGLE_PROJECT_ID=${GOOGLE_PROJECT_ID:-}
- GOOGLE_LOCATION=${GOOGLE_LOCATION:-us-central1}
- GOOGLE_GCS_BUCKET=${GOOGLE_GCS_BUCKET:-}
- GOOGLE_APPLICATION_CREDENTIALS=/app/keys/gcp-sa.json
- PIAPI_API_KEY=${PIAPI_API_KEY:-}
- SEEDANCE_API_KEY=${SEEDANCE_API_KEY:-}
- VIDEO_MEDIA_ROOT=${VIDEO_MEDIA_ROOT:-/mnt/nas/webpage/data/video}
- VIDEO_MEDIA_URL_PREFIX=${VIDEO_MEDIA_URL_PREFIX:-/media/video}
volumes:
- /mnt/nas/webpage/data/video:/mnt/nas/webpage/data/video
- ${GCP_SA_JSON_HOST_PATH:-/etc/webai/gcp-sa.json}:/app/keys/gcp-sa.json:ro
healthcheck:
test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"]
interval: 60s
timeout: 5s
retries: 3

View File

@@ -0,0 +1,29 @@
# Plan-B-Video — Windows video-render worker
# NAS Redis 큐
REDIS_URL=redis://192.168.45.54:6379
# NAS internal webhook (video-lab port 18801)
NAS_BASE_URL=http://192.168.45.54:18801
INTERNAL_API_KEY=__copy_from_nas_dotenv__
# Sora 2 (OpenAI)
OPENAI_API_KEY=__paste_openai_key__
# Veo 3.1 (Google Vertex AI)
GOOGLE_PROJECT_ID=__paste_gcp_project_id__
GOOGLE_LOCATION=us-central1
GOOGLE_GCS_BUCKET=__paste_gcs_bucket_name__
GOOGLE_APPLICATION_CREDENTIALS=/app/keys/gcp-sa.json
# Kling (PiAPI gateway)
PIAPI_API_KEY=__paste_piapi_key__
# Seedance 2.0 (BytePlus)
SEEDANCE_API_KEY=__paste_seedance_key__
# NAS SMB mount 안의 video 디렉토리
VIDEO_MEDIA_ROOT=/mnt/nas/webpage/data/video
# nginx 서빙 prefix (NAS webhook payload용)
VIDEO_MEDIA_URL_PREFIX=/media/video

View File

@@ -0,0 +1,16 @@
FROM python:3.12-slim-bookworm
ENV PYTHONUNBUFFERED=1
WORKDIR /app
RUN apt-get update && apt-get install -y --no-install-recommends \
ca-certificates \
&& rm -rf /var/lib/apt/lists/*
COPY requirements.txt .
RUN pip install --no-cache-dir --timeout 600 --retries 5 -r requirements.txt
COPY . .
EXPOSE 8000
CMD ["python", "-m", "uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "1"]

View File

@@ -0,0 +1,36 @@
"""video-render FastAPI entry — health + lifespan (worker loop spawn)."""
from __future__ import annotations
import asyncio
import logging
from contextlib import asynccontextmanager
from fastapi import FastAPI
import worker
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(name)s %(levelname)s %(message)s")
logger = logging.getLogger(__name__)
@asynccontextmanager
async def lifespan(app: FastAPI):
worker_task = asyncio.create_task(worker.worker_loop())
logger.info("video-render lifespan 시작")
try:
yield
finally:
worker_task.cancel()
try:
await worker_task
except asyncio.CancelledError:
pass
logger.info("video-render lifespan 종료")
app = FastAPI(lifespan=lifespan)
@app.get("/health")
def health():
return {"ok": True, "service": "video-render"}

View File

@@ -0,0 +1,54 @@
"""NAS webhook 어댑터 — Windows worker가 NAS DB 직접 접근 못하므로 HTTP로 위임.
Plan-B-Music nas_client와 동일 패턴 (call-time os.getenv으로 테스트 격리).
"""
from __future__ import annotations
import logging
import os
from typing import Any, Dict, Optional
import httpx
logger = logging.getLogger(__name__)
_TIMEOUT = 10.0
def _post(payload: Dict[str, Any]) -> None:
nas_base_url = os.getenv("NAS_BASE_URL", "http://192.168.45.54:18801")
internal_api_key = os.getenv("INTERNAL_API_KEY", "")
url = f"{nas_base_url}/api/internal/video/update"
try:
r = httpx.post(
url,
headers={"X-Internal-Key": internal_api_key},
json=payload,
timeout=_TIMEOUT,
)
if r.status_code != 200:
logger.error("webhook %s returned %d: %s",
payload.get("task_id"), r.status_code, r.text[:200])
except Exception:
logger.exception("webhook %s 호출 실패", payload.get("task_id"))
def webhook_update_task(
task_id: str,
status: str,
progress: int,
message: str = "",
video_url: Optional[str] = None,
error: Optional[str] = None,
) -> None:
payload: Dict[str, Any] = {
"task_id": task_id,
"status": status,
"progress": progress,
"message": message,
}
if video_url is not None:
payload["video_url"] = video_url
if error is not None:
payload["error"] = error
_post(payload)

View File

@@ -0,0 +1,133 @@
"""Kling AI video generation — PiAPI gateway 경유.
POST https://api.piapi.ai/api/v1/task → GET /api/v1/task/{id} 폴링 → data.output.video_url 다운로드.
"""
from __future__ import annotations
import logging
import os
import time
from typing import Optional
import requests
from nas_client import webhook_update_task
logger = logging.getLogger(__name__)
PIAPI_BASE_URL = "https://api.piapi.ai/api/v1"
VIDEO_MEDIA_ROOT = os.getenv("VIDEO_MEDIA_ROOT", "/mnt/nas/webpage/data/video")
VIDEO_MEDIA_URL_PREFIX = os.getenv("VIDEO_MEDIA_URL_PREFIX", "/media/video")
POLL_INTERVAL = 10 # Kling은 30~180초
POLL_MAX_ATTEMPTS = 60 # 최대 10분
DEFAULT_VERSION = "2.6"
def _headers() -> dict:
api_key = os.getenv("PIAPI_API_KEY", "")
return {
"x-api-key": api_key,
"Content-Type": "application/json",
}
def run_kling_generation(task_id: str, params: dict) -> None:
"""Kling으로 영상 생성 → mp4 → NAS SMB → webhook."""
try:
if not os.getenv("PIAPI_API_KEY"):
webhook_update_task(task_id, "failed", 0, "", error="PIAPI_API_KEY 미설정")
return
webhook_update_task(task_id, "processing", 5, "Kling API 호출 중...")
input_obj = {
"prompt": params["prompt"][:2500],
"duration": params.get("duration", 5),
"aspect_ratio": params.get("aspect_ratio", "16:9"),
"mode": params.get("mode", "std"),
"version": params.get("model") or DEFAULT_VERSION,
}
if params.get("negative_prompt"):
input_obj["negative_prompt"] = params["negative_prompt"][:2500]
if params.get("cfg_scale") is not None:
input_obj["cfg_scale"] = str(params["cfg_scale"])
if params.get("image_url"):
input_obj["image_url"] = params["image_url"]
body = {
"model": "kling",
"task_type": "video_generation",
"input": input_obj,
"config": {"service_mode": "public"},
}
resp = requests.post(f"{PIAPI_BASE_URL}/task", headers=_headers(), json=body, timeout=30)
if resp.status_code != 200:
webhook_update_task(task_id, "failed", 0, "",
error=f"Kling/PiAPI 오류: {resp.status_code} {resp.text[:300]}")
return
body_json = resp.json()
if body_json.get("code") != 200:
webhook_update_task(task_id, "failed", 0, "",
error=f"Kling/PiAPI 거부: {body_json.get('message', '?')}")
return
piapi_task_id = (body_json.get("data") or {}).get("task_id", "")
if not piapi_task_id:
webhook_update_task(task_id, "failed", 0, "", error="Kling/PiAPI 응답에 task_id 없음")
return
webhook_update_task(task_id, "processing", 15, "Kling 작업 등록됨")
# 폴링 — GET /task/{id}
video_url = None
for attempt in range(POLL_MAX_ATTEMPTS):
time.sleep(POLL_INTERVAL)
fetch = requests.get(f"{PIAPI_BASE_URL}/task/{piapi_task_id}",
headers=_headers(), timeout=30)
if fetch.status_code != 200:
continue
fd = fetch.json()
data = fd.get("data", {})
status = data.get("status", "")
scaled = min(15 + int((attempt / POLL_MAX_ATTEMPTS) * 65), 79)
webhook_update_task(task_id, "processing", scaled, f"Kling 생성 중... ({status})")
if status == "Completed":
video_url = (data.get("output") or {}).get("video_url", "")
break
elif status in ("Failed", "failed"):
err = (data.get("error") or {}).get("message", "Kling 작업 실패")
webhook_update_task(task_id, "failed", 0, "", error=err)
return
# Pending/Processing/Staged → 계속 폴링
else:
webhook_update_task(task_id, "failed", 0, "", error="Kling 폴링 timeout (10분)")
return
if not video_url:
webhook_update_task(task_id, "failed", 0, "", error="Kling 완료했으나 video_url 없음")
return
webhook_update_task(task_id, "processing", 85, "Kling 결과 다운로드 중...")
filename = f"{task_id}.mp4"
os.makedirs(VIDEO_MEDIA_ROOT, exist_ok=True)
file_path = os.path.join(VIDEO_MEDIA_ROOT, filename)
dl = requests.get(video_url, stream=True, timeout=300)
dl.raise_for_status()
with open(file_path, "wb") as f:
for chunk in dl.iter_content(chunk_size=8192):
f.write(chunk)
local_url = f"{VIDEO_MEDIA_URL_PREFIX}/{filename}"
webhook_update_task(task_id, "succeeded", 100, "Kling 생성 완료", video_url=local_url)
except requests.Timeout:
webhook_update_task(task_id, "failed", 0, "", error="Kling/PiAPI 타임아웃")
except Exception as e:
logger.exception("Kling generation error task=%s", task_id)
webhook_update_task(task_id, "failed", 0, "", error=str(e))

View File

@@ -0,0 +1,121 @@
"""Seedance 2.0 video generation — ByteDance Volcano Engine (BytePlus 국제 endpoint).
POST https://api.byteplus.com/seedance/v1/videos → GET /videos/{id} 폴링 → output.video_url 다운로드.
"""
from __future__ import annotations
import logging
import os
import time
from typing import Optional
import requests
from nas_client import webhook_update_task
logger = logging.getLogger(__name__)
SEEDANCE_BASE_URL = "https://api.byteplus.com/seedance/v1"
VIDEO_MEDIA_ROOT = os.getenv("VIDEO_MEDIA_ROOT", "/mnt/nas/webpage/data/video")
VIDEO_MEDIA_URL_PREFIX = os.getenv("VIDEO_MEDIA_URL_PREFIX", "/media/video")
POLL_INTERVAL = 8 # Seedance는 30~120초
POLL_MAX_ATTEMPTS = 60
DEFAULT_MODEL = "seedance-2.0"
def _headers() -> dict:
api_key = os.getenv("SEEDANCE_API_KEY", "")
return {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
}
def run_seedance_generation(task_id: str, params: dict) -> None:
"""Seedance로 영상 생성 → mp4 → NAS SMB → webhook."""
try:
if not os.getenv("SEEDANCE_API_KEY"):
webhook_update_task(task_id, "failed", 0, "", error="SEEDANCE_API_KEY 미설정")
return
webhook_update_task(task_id, "processing", 5, "Seedance API 호출 중...")
body = {
"model": params.get("model") or DEFAULT_MODEL,
"prompt": params["prompt"][:2000],
"resolution": params.get("resolution", "1080p"),
"duration": params.get("duration", 5),
"aspect_ratio": params.get("aspect_ratio", "16:9"),
}
if params.get("negative_prompt"):
body["negative_prompt"] = params["negative_prompt"]
if params.get("image_url"):
body["references"] = [{"type": "image", "data": params["image_url"], "role": "subject"}]
if params.get("audio") is not None:
body["audio"] = bool(params["audio"])
if params.get("seed") is not None:
body["seed"] = int(params["seed"])
resp = requests.post(f"{SEEDANCE_BASE_URL}/videos", headers=_headers(), json=body, timeout=30)
if resp.status_code not in (200, 201):
webhook_update_task(task_id, "failed", 0, "",
error=f"Seedance API 오류: {resp.status_code} {resp.text[:300]}")
return
body_json = resp.json()
job_id = body_json.get("id", "")
if not job_id:
webhook_update_task(task_id, "failed", 0, "", error="Seedance 응답에 id 없음")
return
webhook_update_task(task_id, "processing", 15, "Seedance 작업 등록됨")
# 폴링
video_url = None
for attempt in range(POLL_MAX_ATTEMPTS):
time.sleep(POLL_INTERVAL)
fetch = requests.get(f"{SEEDANCE_BASE_URL}/videos/{job_id}",
headers=_headers(), timeout=30)
if fetch.status_code != 200:
continue
fd = fetch.json()
status = fd.get("status", "")
scaled = min(15 + int((attempt / POLL_MAX_ATTEMPTS) * 65), 79)
webhook_update_task(task_id, "processing", scaled, f"Seedance 생성 중... ({status})")
if status == "completed":
video_url = (fd.get("output") or {}).get("video_url", "")
break
elif status == "failed":
err = fd.get("error") or "Seedance 작업 실패"
webhook_update_task(task_id, "failed", 0, "", error=str(err)[:300])
return
else:
webhook_update_task(task_id, "failed", 0, "", error="Seedance 폴링 timeout (10분)")
return
if not video_url:
webhook_update_task(task_id, "failed", 0, "", error="Seedance 완료했으나 video_url 없음")
return
webhook_update_task(task_id, "processing", 85, "Seedance 결과 다운로드 중...")
filename = f"{task_id}.mp4"
os.makedirs(VIDEO_MEDIA_ROOT, exist_ok=True)
file_path = os.path.join(VIDEO_MEDIA_ROOT, filename)
dl = requests.get(video_url, stream=True, timeout=300)
dl.raise_for_status()
with open(file_path, "wb") as f:
for chunk in dl.iter_content(chunk_size=8192):
f.write(chunk)
local_url = f"{VIDEO_MEDIA_URL_PREFIX}/{filename}"
webhook_update_task(task_id, "succeeded", 100, "Seedance 생성 완료", video_url=local_url)
except requests.Timeout:
webhook_update_task(task_id, "failed", 0, "", error="Seedance API 타임아웃")
except Exception as e:
logger.exception("Seedance generation error task=%s", task_id)
webhook_update_task(task_id, "failed", 0, "", error=str(e))

View File

@@ -0,0 +1,119 @@
"""Sora 2 video generation — OpenAI Videos API.
POST /v1/videos → poll GET /v1/videos/{id} → GET /v1/videos/{id}/content download.
⚠️ Deprecated, shutdown 2026-09-24. Spec 진행은 박재오 결정 따름.
"""
from __future__ import annotations
import logging
import os
import time
from typing import Optional
import requests
from nas_client import webhook_update_task
logger = logging.getLogger(__name__)
SORA_BASE_URL = "https://api.openai.com/v1"
VIDEO_MEDIA_ROOT = os.getenv("VIDEO_MEDIA_ROOT", "/mnt/nas/webpage/data/video")
VIDEO_MEDIA_URL_PREFIX = os.getenv("VIDEO_MEDIA_URL_PREFIX", "/media/video")
POLL_INTERVAL = 15 # OpenAI 권장: 10~20초
POLL_MAX_ATTEMPTS = 40 # 최대 ~10분
DEFAULT_MODEL = "sora-2"
def _headers() -> dict:
api_key = os.getenv("OPENAI_API_KEY", "")
return {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
}
def run_sora_generation(task_id: str, params: dict) -> None:
"""Sora 2로 영상 생성 → mp4 → NAS SMB 저장 → webhook."""
try:
if not os.getenv("OPENAI_API_KEY"):
webhook_update_task(task_id, "failed", 0, "", error="OPENAI_API_KEY 미설정 (Windows .env)")
return
webhook_update_task(task_id, "processing", 5, "Sora API 호출 중...")
payload = {
"model": params.get("model") or DEFAULT_MODEL,
"prompt": params["prompt"][:5000],
}
if params.get("duration"):
payload["seconds"] = params["duration"]
if params.get("size"):
payload["size"] = params["size"]
elif params.get("aspect_ratio") == "9:16":
payload["size"] = "1080x1920"
elif params.get("aspect_ratio") == "16:9":
payload["size"] = "1920x1080"
resp = requests.post(f"{SORA_BASE_URL}/videos", headers=_headers(), json=payload, timeout=30)
if resp.status_code not in (200, 201):
webhook_update_task(task_id, "failed", 0, "", error=f"Sora API 오류: {resp.status_code} {resp.text[:300]}")
return
body = resp.json()
video_id = body.get("id", "")
if not video_id:
webhook_update_task(task_id, "failed", 0, "", error="Sora 응답에 video id 없음")
return
webhook_update_task(task_id, "processing", 15, f"Sora 작업 생성됨 (id={video_id[:16]})")
# 폴링
for attempt in range(POLL_MAX_ATTEMPTS):
time.sleep(POLL_INTERVAL)
sr = requests.get(f"{SORA_BASE_URL}/videos/{video_id}", headers=_headers(), timeout=30)
if sr.status_code != 200:
continue
sd = sr.json()
status = sd.get("status", "")
progress = sd.get("progress", 0)
scaled = min(15 + int(progress * 0.65), 79)
webhook_update_task(task_id, "processing", scaled, f"Sora 생성 중... {progress}%")
if status == "completed":
break
elif status == "failed":
err = sd.get("error", {}).get("message", "Sora 작업 실패")
webhook_update_task(task_id, "failed", 0, "", error=err)
return
else:
webhook_update_task(task_id, "failed", 0, "", error="Sora 폴링 timeout (10분)")
return
# 다운로드
webhook_update_task(task_id, "processing", 80, "Sora 결과 다운로드 중...")
filename = f"{task_id}.mp4"
os.makedirs(VIDEO_MEDIA_ROOT, exist_ok=True)
file_path = os.path.join(VIDEO_MEDIA_ROOT, filename)
dl = requests.get(
f"{SORA_BASE_URL}/videos/{video_id}/content",
headers=_headers(),
params={"variant": "video"},
stream=True,
timeout=300,
)
dl.raise_for_status()
with open(file_path, "wb") as f:
for chunk in dl.iter_content(chunk_size=8192):
f.write(chunk)
local_url = f"{VIDEO_MEDIA_URL_PREFIX}/{filename}"
webhook_update_task(task_id, "succeeded", 100, "Sora 생성 완료", video_url=local_url)
except requests.Timeout:
webhook_update_task(task_id, "failed", 0, "", error="Sora API 타임아웃")
except Exception as e:
logger.exception("Sora generation error task=%s", task_id)
webhook_update_task(task_id, "failed", 0, "", error=str(e))

View File

@@ -0,0 +1,176 @@
"""Veo 3.1 video generation — Google Vertex AI.
POST .../models/{MODEL}:predictLongRunning → POST :fetchPredictOperation 폴링 →
결과 gs://bucket/path/sample_0.mp4 → google-cloud-storage로 다운로드 → NAS SMB.
"""
from __future__ import annotations
import logging
import os
import subprocess
import time
from typing import Optional
import requests
from nas_client import webhook_update_task
logger = logging.getLogger(__name__)
VIDEO_MEDIA_ROOT = os.getenv("VIDEO_MEDIA_ROOT", "/mnt/nas/webpage/data/video")
VIDEO_MEDIA_URL_PREFIX = os.getenv("VIDEO_MEDIA_URL_PREFIX", "/media/video")
POLL_INTERVAL = 12 # Veo는 30~120초 소요
POLL_MAX_ATTEMPTS = 50 # 최대 ~10분
DEFAULT_MODEL = "veo-3.1-fast-generate-001"
def _gcloud_access_token() -> Optional[str]:
"""GOOGLE_APPLICATION_CREDENTIALS service account JSON으로 access token 발행.
google-auth가 컨테이너 안에서 자동 인증 — Bearer 토큰을 GCS SDK가 직접 사용.
REST API 호출용으로는 명시적 token이 필요 → google.auth로 발행.
"""
try:
from google.auth import default as google_default_auth
from google.auth.transport.requests import Request as GoogleAuthRequest
credentials, _ = google_default_auth(
scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
credentials.refresh(GoogleAuthRequest())
return credentials.token
except Exception:
logger.exception("Google credentials refresh 실패")
return None
def _download_gcs(gcs_uri: str, local_path: str) -> bool:
"""gs://bucket/path/file.mp4 → local_path 다운로드. 성공 여부 반환."""
try:
from google.cloud import storage as gcs_storage
if not gcs_uri.startswith("gs://"):
return False
without_scheme = gcs_uri[len("gs://"):]
bucket_name, blob_path = without_scheme.split("/", 1)
client = gcs_storage.Client(project=os.getenv("GOOGLE_PROJECT_ID"))
bucket = client.bucket(bucket_name)
blob = bucket.blob(blob_path)
blob.download_to_filename(local_path)
return True
except Exception:
logger.exception("GCS 다운로드 실패: %s", gcs_uri)
return False
def run_veo_generation(task_id: str, params: dict) -> None:
"""Veo 3.1로 영상 생성 → GCS → NAS SMB → webhook."""
try:
project_id = os.getenv("GOOGLE_PROJECT_ID", "")
location = os.getenv("GOOGLE_LOCATION", "us-central1")
gcs_bucket = os.getenv("GOOGLE_GCS_BUCKET", "")
if not project_id or not gcs_bucket:
webhook_update_task(task_id, "failed", 0, "",
error="GOOGLE_PROJECT_ID 또는 GOOGLE_GCS_BUCKET 미설정")
return
token = _gcloud_access_token()
if not token:
webhook_update_task(task_id, "failed", 0, "",
error="Google access token 발행 실패 (서비스 계정 JSON 확인)")
return
webhook_update_task(task_id, "processing", 5, "Veo API 호출 중...")
model_id = params.get("model") or DEFAULT_MODEL
endpoint_base = (
f"https://{location}-aiplatform.googleapis.com/v1/projects/{project_id}"
f"/locations/{location}/publishers/google/models/{model_id}"
)
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
}
body = {
"instances": [{"prompt": params["prompt"]}],
"parameters": {
"storageUri": f"gs://{gcs_bucket}/veo/{task_id}/",
"sampleCount": 1,
"aspectRatio": params.get("aspect_ratio") or "16:9",
},
}
if params.get("duration"):
body["parameters"]["duration"] = params["duration"]
if params.get("negative_prompt"):
body["parameters"]["negativePrompt"] = params["negative_prompt"]
resp = requests.post(f"{endpoint_base}:predictLongRunning",
headers=headers, json=body, timeout=30)
if resp.status_code != 200:
webhook_update_task(task_id, "failed", 0, "",
error=f"Veo API 오류: {resp.status_code} {resp.text[:300]}")
return
op_name = resp.json().get("name", "")
if not op_name:
webhook_update_task(task_id, "failed", 0, "", error="Veo 응답에 operation name 없음")
return
webhook_update_task(task_id, "processing", 15, f"Veo 작업 시작됨")
# 폴링 — fetchPredictOperation
gcs_uri = None
for attempt in range(POLL_MAX_ATTEMPTS):
time.sleep(POLL_INTERVAL)
fetch = requests.post(
f"{endpoint_base}:fetchPredictOperation",
headers=headers,
json={"operationName": op_name},
timeout=30,
)
if fetch.status_code != 200:
continue
fd = fetch.json()
done = fd.get("done", False)
scaled = min(15 + int((attempt / POLL_MAX_ATTEMPTS) * 65), 79)
webhook_update_task(task_id, "processing", scaled, "Veo 생성 중...")
if done:
if "error" in fd:
webhook_update_task(task_id, "failed", 0, "",
error=f"Veo 작업 실패: {fd['error'].get('message','?')}")
return
videos = (fd.get("response") or {}).get("videos") or []
if not videos:
webhook_update_task(task_id, "failed", 0, "", error="Veo 완료했으나 videos 비어 있음")
return
gcs_uri = videos[0].get("gcsUri", "")
break
else:
webhook_update_task(task_id, "failed", 0, "", error="Veo 폴링 timeout (10분)")
return
if not gcs_uri:
webhook_update_task(task_id, "failed", 0, "", error="Veo 응답에 gcsUri 없음")
return
webhook_update_task(task_id, "processing", 85, "GCS에서 mp4 다운로드 중...")
filename = f"{task_id}.mp4"
os.makedirs(VIDEO_MEDIA_ROOT, exist_ok=True)
file_path = os.path.join(VIDEO_MEDIA_ROOT, filename)
ok = _download_gcs(gcs_uri, file_path)
if not ok:
webhook_update_task(task_id, "failed", 0, "", error=f"GCS 다운로드 실패: {gcs_uri}")
return
local_url = f"{VIDEO_MEDIA_URL_PREFIX}/{filename}"
webhook_update_task(task_id, "succeeded", 100, "Veo 생성 완료", video_url=local_url)
except requests.Timeout:
webhook_update_task(task_id, "failed", 0, "", error="Veo API 타임아웃")
except Exception as e:
logger.exception("Veo generation error task=%s", task_id)
webhook_update_task(task_id, "failed", 0, "", error=str(e))

View File

@@ -0,0 +1,10 @@
fastapi==0.115.6
uvicorn[standard]==0.34.0
requests==2.32.3
redis>=5.0
httpx>=0.27
openai>=1.50.0
google-cloud-storage>=2.18.0
pytest>=8.0
pytest-asyncio>=0.24
respx>=0.21

View File

View File

@@ -0,0 +1,70 @@
"""nas_client — webhook adapter for video-render."""
import pytest
import respx
import httpx
from nas_client import webhook_update_task
@pytest.fixture(autouse=True)
def _env(monkeypatch):
monkeypatch.setenv("NAS_BASE_URL", "http://nas-test:18801")
monkeypatch.setenv("INTERNAL_API_KEY", "test-key")
@respx.mock
def test_webhook_update_task_sends_x_internal_key():
route = respx.post("http://nas-test:18801/api/internal/video/update").mock(
return_value=httpx.Response(200, json={"ok": True})
)
webhook_update_task("task-1", "processing", 30, message="downloading")
assert route.called
req = route.calls[0].request
assert req.headers["X-Internal-Key"] == "test-key"
import json
body = json.loads(req.content)
assert body["task_id"] == "task-1"
assert body["status"] == "processing"
assert body["progress"] == 30
@respx.mock
def test_webhook_update_task_with_video_url():
route = respx.post("http://nas-test:18801/api/internal/video/update").mock(
return_value=httpx.Response(200, json={"ok": True})
)
webhook_update_task("task-2", "succeeded", 100, message="완료",
video_url="/media/video/task-2.mp4")
import json
payload = json.loads(route.calls[0].request.content)
assert payload["video_url"] == "/media/video/task-2.mp4"
assert payload["status"] == "succeeded"
@respx.mock
def test_webhook_update_task_with_error():
route = respx.post("http://nas-test:18801/api/internal/video/update").mock(
return_value=httpx.Response(200, json={"ok": True})
)
webhook_update_task("task-3", "failed", 0, error="Sora API rate limit")
import json
payload = json.loads(route.calls[0].request.content)
assert payload["error"] == "Sora API rate limit"
@respx.mock
def test_webhook_swallows_network_error(caplog):
respx.post("http://nas-test:18801/api/internal/video/update").mock(
side_effect=httpx.ConnectError("no host")
)
webhook_update_task("task-5", "processing", 10)
assert "task-5" in caplog.text
@respx.mock
def test_webhook_swallows_non_200(caplog):
respx.post("http://nas-test:18801/api/internal/video/update").mock(
return_value=httpx.Response(500, text="server error")
)
webhook_update_task("task-6", "processing", 50)
assert "task-6" in caplog.text

View File

@@ -0,0 +1,43 @@
"""worker.py — job_type 디스패처 (4 provider)."""
import pytest
from unittest.mock import patch
import worker
def test_dispatch_sora_calls_run_sora_generation():
payload = {"task_id": "t1", "job_type": "sora_generation", "params": {"prompt": "x"}}
with patch("worker.run_sora_generation") as m:
worker._dispatch(payload)
m.assert_called_once_with("t1", {"prompt": "x"})
def test_dispatch_veo_calls_run_veo_generation():
payload = {"task_id": "t2", "job_type": "veo_generation", "params": {"prompt": "x"}}
with patch("worker.run_veo_generation") as m:
worker._dispatch(payload)
m.assert_called_once_with("t2", {"prompt": "x"})
def test_dispatch_kling_calls_run_kling_generation():
payload = {"task_id": "t3", "job_type": "kling_generation", "params": {"prompt": "x"}}
with patch("worker.run_kling_generation") as m:
worker._dispatch(payload)
m.assert_called_once_with("t3", {"prompt": "x"})
def test_dispatch_seedance_calls_run_seedance_generation():
payload = {"task_id": "t4", "job_type": "seedance_generation", "params": {"prompt": "x"}}
with patch("worker.run_seedance_generation") as m:
worker._dispatch(payload)
m.assert_called_once_with("t4", {"prompt": "x"})
def test_dispatch_unknown_job_type_logs_error():
payload = {"task_id": "t5", "job_type": "weird_type", "params": {}}
with patch("worker.webhook_update_task") as m:
worker._dispatch(payload)
m.assert_called_once()
args = m.call_args[0]
assert args[0] == "t5"
assert args[1] == "failed"

View File

@@ -0,0 +1,80 @@
"""Redis BLPOP worker — queue:video-render → job_type 디스패치 → NAS webhook.
queue:paused 가 set이면 대기 (task-watcher가 박재오 활동 감지 시 set).
Plan-B-Music worker.py 패턴 — string-based dispatch + getattr (테스트 patch 호환).
"""
from __future__ import annotations
import asyncio
import json
import logging
import os
import sys
import redis.asyncio as aioredis
from nas_client import webhook_update_task
from providers.sora import run_sora_generation
from providers.veo import run_veo_generation
from providers.kling import run_kling_generation
from providers.seedance import run_seedance_generation
logger = logging.getLogger(__name__)
REDIS_URL = os.getenv("REDIS_URL", "redis://192.168.45.54:6379")
QUEUE_KEY = "queue:video-render"
PAUSED_KEY = "queue:paused"
# string names so `unittest.mock.patch` on `worker.<name>` is correctly intercepted
_DISPATCH_TABLE = {
"sora_generation": "run_sora_generation",
"veo_generation": "run_veo_generation",
"kling_generation": "run_kling_generation",
"seedance_generation": "run_seedance_generation",
}
def _dispatch(payload: dict) -> None:
"""payload[job_type] → provider 함수 호출 (sync, worker_loop에서 asyncio.to_thread로 wrap)."""
job_type = payload.get("job_type", "")
task_id = payload.get("task_id", "")
params = payload.get("params", {})
fn_name = _DISPATCH_TABLE.get(job_type)
if fn_name is None:
logger.error("unknown job_type=%s task=%s", job_type, task_id)
webhook_update_task(task_id, "failed", 0, "", error=f"unknown job_type: {job_type}")
return
try:
fn = getattr(sys.modules[__name__], fn_name)
except AttributeError:
logger.error("dispatch table typo for job_type=%s name=%s task=%s", job_type, fn_name, task_id)
webhook_update_task(task_id, "failed", 0, "", error=f"internal dispatch error: {fn_name}")
return
fn(task_id, params)
async def worker_loop():
redis = aioredis.from_url(REDIS_URL, decode_responses=False)
logger.info("video-render worker started (queue=%s)", QUEUE_KEY)
while True:
try:
paused = await redis.get(PAUSED_KEY)
if paused == b"1":
await asyncio.sleep(10)
continue
item = await redis.blpop(QUEUE_KEY, timeout=1)
if item is None:
continue
_, raw = item
try:
payload = json.loads(raw)
except json.JSONDecodeError:
logger.error("invalid queue payload: %r", raw[:200])
continue
await asyncio.to_thread(_dispatch, payload)
except asyncio.CancelledError:
logger.info("worker_loop cancelled")
raise
except Exception:
logger.exception("worker_loop iteration 실패, 5초 후 재시도")
await asyncio.sleep(5)