Compare commits
7 Commits
f152545d3b
...
4db0551d33
| Author | SHA1 | Date | |
|---|---|---|---|
| 4db0551d33 | |||
| 4d837fdd31 | |||
| 2567a6f10b | |||
| 17ed1943f1 | |||
| 8d246b5b32 | |||
| b4bec9d51b | |||
| f32792e4a9 |
@@ -49,3 +49,33 @@ services:
|
|||||||
interval: 60s
|
interval: 60s
|
||||||
timeout: 5s
|
timeout: 5s
|
||||||
retries: 3
|
retries: 3
|
||||||
|
|
||||||
|
video-render:
|
||||||
|
build:
|
||||||
|
context: ./video-render
|
||||||
|
container_name: video-render
|
||||||
|
restart: unless-stopped
|
||||||
|
ports:
|
||||||
|
- "18712:8000"
|
||||||
|
environment:
|
||||||
|
- TZ=Asia/Seoul
|
||||||
|
- REDIS_URL=${REDIS_URL:-redis://192.168.45.54:6379}
|
||||||
|
- NAS_BASE_URL=${NAS_BASE_URL:-http://192.168.45.54:18801}
|
||||||
|
- INTERNAL_API_KEY=${INTERNAL_API_KEY:-}
|
||||||
|
- OPENAI_API_KEY=${OPENAI_API_KEY:-}
|
||||||
|
- GOOGLE_PROJECT_ID=${GOOGLE_PROJECT_ID:-}
|
||||||
|
- GOOGLE_LOCATION=${GOOGLE_LOCATION:-us-central1}
|
||||||
|
- GOOGLE_GCS_BUCKET=${GOOGLE_GCS_BUCKET:-}
|
||||||
|
- GOOGLE_APPLICATION_CREDENTIALS=/app/keys/gcp-sa.json
|
||||||
|
- PIAPI_API_KEY=${PIAPI_API_KEY:-}
|
||||||
|
- SEEDANCE_API_KEY=${SEEDANCE_API_KEY:-}
|
||||||
|
- VIDEO_MEDIA_ROOT=${VIDEO_MEDIA_ROOT:-/mnt/nas/webpage/data/video}
|
||||||
|
- VIDEO_MEDIA_URL_PREFIX=${VIDEO_MEDIA_URL_PREFIX:-/media/video}
|
||||||
|
volumes:
|
||||||
|
- /mnt/nas/webpage/data/video:/mnt/nas/webpage/data/video
|
||||||
|
- ${GCP_SA_JSON_HOST_PATH:-/etc/webai/gcp-sa.json}:/app/keys/gcp-sa.json:ro
|
||||||
|
healthcheck:
|
||||||
|
test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"]
|
||||||
|
interval: 60s
|
||||||
|
timeout: 5s
|
||||||
|
retries: 3
|
||||||
|
|||||||
29
services/video-render/.env.example
Normal file
29
services/video-render/.env.example
Normal file
@@ -0,0 +1,29 @@
|
|||||||
|
# Plan-B-Video — Windows video-render worker
|
||||||
|
|
||||||
|
# NAS Redis 큐
|
||||||
|
REDIS_URL=redis://192.168.45.54:6379
|
||||||
|
|
||||||
|
# NAS internal webhook (video-lab port 18801)
|
||||||
|
NAS_BASE_URL=http://192.168.45.54:18801
|
||||||
|
INTERNAL_API_KEY=__copy_from_nas_dotenv__
|
||||||
|
|
||||||
|
# Sora 2 (OpenAI)
|
||||||
|
OPENAI_API_KEY=__paste_openai_key__
|
||||||
|
|
||||||
|
# Veo 3.1 (Google Vertex AI)
|
||||||
|
GOOGLE_PROJECT_ID=__paste_gcp_project_id__
|
||||||
|
GOOGLE_LOCATION=us-central1
|
||||||
|
GOOGLE_GCS_BUCKET=__paste_gcs_bucket_name__
|
||||||
|
GOOGLE_APPLICATION_CREDENTIALS=/app/keys/gcp-sa.json
|
||||||
|
|
||||||
|
# Kling (PiAPI gateway)
|
||||||
|
PIAPI_API_KEY=__paste_piapi_key__
|
||||||
|
|
||||||
|
# Seedance 2.0 (BytePlus)
|
||||||
|
SEEDANCE_API_KEY=__paste_seedance_key__
|
||||||
|
|
||||||
|
# NAS SMB mount 안의 video 디렉토리
|
||||||
|
VIDEO_MEDIA_ROOT=/mnt/nas/webpage/data/video
|
||||||
|
|
||||||
|
# nginx 서빙 prefix (NAS webhook payload용)
|
||||||
|
VIDEO_MEDIA_URL_PREFIX=/media/video
|
||||||
16
services/video-render/Dockerfile
Normal file
16
services/video-render/Dockerfile
Normal file
@@ -0,0 +1,16 @@
|
|||||||
|
FROM python:3.12-slim-bookworm
|
||||||
|
ENV PYTHONUNBUFFERED=1
|
||||||
|
|
||||||
|
WORKDIR /app
|
||||||
|
|
||||||
|
RUN apt-get update && apt-get install -y --no-install-recommends \
|
||||||
|
ca-certificates \
|
||||||
|
&& rm -rf /var/lib/apt/lists/*
|
||||||
|
|
||||||
|
COPY requirements.txt .
|
||||||
|
RUN pip install --no-cache-dir --timeout 600 --retries 5 -r requirements.txt
|
||||||
|
|
||||||
|
COPY . .
|
||||||
|
|
||||||
|
EXPOSE 8000
|
||||||
|
CMD ["python", "-m", "uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "1"]
|
||||||
36
services/video-render/main.py
Normal file
36
services/video-render/main.py
Normal file
@@ -0,0 +1,36 @@
|
|||||||
|
"""video-render FastAPI entry — health + lifespan (worker loop spawn)."""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import logging
|
||||||
|
from contextlib import asynccontextmanager
|
||||||
|
|
||||||
|
from fastapi import FastAPI
|
||||||
|
|
||||||
|
import worker
|
||||||
|
|
||||||
|
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(name)s %(levelname)s %(message)s")
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
@asynccontextmanager
|
||||||
|
async def lifespan(app: FastAPI):
|
||||||
|
worker_task = asyncio.create_task(worker.worker_loop())
|
||||||
|
logger.info("video-render lifespan 시작")
|
||||||
|
try:
|
||||||
|
yield
|
||||||
|
finally:
|
||||||
|
worker_task.cancel()
|
||||||
|
try:
|
||||||
|
await worker_task
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
pass
|
||||||
|
logger.info("video-render lifespan 종료")
|
||||||
|
|
||||||
|
|
||||||
|
app = FastAPI(lifespan=lifespan)
|
||||||
|
|
||||||
|
|
||||||
|
@app.get("/health")
|
||||||
|
def health():
|
||||||
|
return {"ok": True, "service": "video-render"}
|
||||||
54
services/video-render/nas_client.py
Normal file
54
services/video-render/nas_client.py
Normal file
@@ -0,0 +1,54 @@
|
|||||||
|
"""NAS webhook 어댑터 — Windows worker가 NAS DB 직접 접근 못하므로 HTTP로 위임.
|
||||||
|
|
||||||
|
Plan-B-Music nas_client와 동일 패턴 (call-time os.getenv으로 테스트 격리).
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
from typing import Any, Dict, Optional
|
||||||
|
|
||||||
|
import httpx
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
_TIMEOUT = 10.0
|
||||||
|
|
||||||
|
|
||||||
|
def _post(payload: Dict[str, Any]) -> None:
|
||||||
|
nas_base_url = os.getenv("NAS_BASE_URL", "http://192.168.45.54:18801")
|
||||||
|
internal_api_key = os.getenv("INTERNAL_API_KEY", "")
|
||||||
|
url = f"{nas_base_url}/api/internal/video/update"
|
||||||
|
try:
|
||||||
|
r = httpx.post(
|
||||||
|
url,
|
||||||
|
headers={"X-Internal-Key": internal_api_key},
|
||||||
|
json=payload,
|
||||||
|
timeout=_TIMEOUT,
|
||||||
|
)
|
||||||
|
if r.status_code != 200:
|
||||||
|
logger.error("webhook %s returned %d: %s",
|
||||||
|
payload.get("task_id"), r.status_code, r.text[:200])
|
||||||
|
except Exception:
|
||||||
|
logger.exception("webhook %s 호출 실패", payload.get("task_id"))
|
||||||
|
|
||||||
|
|
||||||
|
def webhook_update_task(
|
||||||
|
task_id: str,
|
||||||
|
status: str,
|
||||||
|
progress: int,
|
||||||
|
message: str = "",
|
||||||
|
video_url: Optional[str] = None,
|
||||||
|
error: Optional[str] = None,
|
||||||
|
) -> None:
|
||||||
|
payload: Dict[str, Any] = {
|
||||||
|
"task_id": task_id,
|
||||||
|
"status": status,
|
||||||
|
"progress": progress,
|
||||||
|
"message": message,
|
||||||
|
}
|
||||||
|
if video_url is not None:
|
||||||
|
payload["video_url"] = video_url
|
||||||
|
if error is not None:
|
||||||
|
payload["error"] = error
|
||||||
|
_post(payload)
|
||||||
0
services/video-render/providers/__init__.py
Normal file
0
services/video-render/providers/__init__.py
Normal file
133
services/video-render/providers/kling.py
Normal file
133
services/video-render/providers/kling.py
Normal file
@@ -0,0 +1,133 @@
|
|||||||
|
"""Kling AI video generation — PiAPI gateway 경유.
|
||||||
|
|
||||||
|
POST https://api.piapi.ai/api/v1/task → GET /api/v1/task/{id} 폴링 → data.output.video_url 다운로드.
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
import time
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
import requests
|
||||||
|
|
||||||
|
from nas_client import webhook_update_task
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
PIAPI_BASE_URL = "https://api.piapi.ai/api/v1"
|
||||||
|
VIDEO_MEDIA_ROOT = os.getenv("VIDEO_MEDIA_ROOT", "/mnt/nas/webpage/data/video")
|
||||||
|
VIDEO_MEDIA_URL_PREFIX = os.getenv("VIDEO_MEDIA_URL_PREFIX", "/media/video")
|
||||||
|
|
||||||
|
POLL_INTERVAL = 10 # Kling은 30~180초
|
||||||
|
POLL_MAX_ATTEMPTS = 60 # 최대 10분
|
||||||
|
|
||||||
|
DEFAULT_VERSION = "2.6"
|
||||||
|
|
||||||
|
|
||||||
|
def _headers() -> dict:
|
||||||
|
api_key = os.getenv("PIAPI_API_KEY", "")
|
||||||
|
return {
|
||||||
|
"x-api-key": api_key,
|
||||||
|
"Content-Type": "application/json",
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def run_kling_generation(task_id: str, params: dict) -> None:
|
||||||
|
"""Kling으로 영상 생성 → mp4 → NAS SMB → webhook."""
|
||||||
|
try:
|
||||||
|
if not os.getenv("PIAPI_API_KEY"):
|
||||||
|
webhook_update_task(task_id, "failed", 0, "", error="PIAPI_API_KEY 미설정")
|
||||||
|
return
|
||||||
|
|
||||||
|
webhook_update_task(task_id, "processing", 5, "Kling API 호출 중...")
|
||||||
|
|
||||||
|
input_obj = {
|
||||||
|
"prompt": params["prompt"][:2500],
|
||||||
|
"duration": params.get("duration", 5),
|
||||||
|
"aspect_ratio": params.get("aspect_ratio", "16:9"),
|
||||||
|
"mode": params.get("mode", "std"),
|
||||||
|
"version": params.get("model") or DEFAULT_VERSION,
|
||||||
|
}
|
||||||
|
if params.get("negative_prompt"):
|
||||||
|
input_obj["negative_prompt"] = params["negative_prompt"][:2500]
|
||||||
|
if params.get("cfg_scale") is not None:
|
||||||
|
input_obj["cfg_scale"] = str(params["cfg_scale"])
|
||||||
|
if params.get("image_url"):
|
||||||
|
input_obj["image_url"] = params["image_url"]
|
||||||
|
|
||||||
|
body = {
|
||||||
|
"model": "kling",
|
||||||
|
"task_type": "video_generation",
|
||||||
|
"input": input_obj,
|
||||||
|
"config": {"service_mode": "public"},
|
||||||
|
}
|
||||||
|
|
||||||
|
resp = requests.post(f"{PIAPI_BASE_URL}/task", headers=_headers(), json=body, timeout=30)
|
||||||
|
if resp.status_code != 200:
|
||||||
|
webhook_update_task(task_id, "failed", 0, "",
|
||||||
|
error=f"Kling/PiAPI 오류: {resp.status_code} {resp.text[:300]}")
|
||||||
|
return
|
||||||
|
|
||||||
|
body_json = resp.json()
|
||||||
|
if body_json.get("code") != 200:
|
||||||
|
webhook_update_task(task_id, "failed", 0, "",
|
||||||
|
error=f"Kling/PiAPI 거부: {body_json.get('message', '?')}")
|
||||||
|
return
|
||||||
|
|
||||||
|
piapi_task_id = (body_json.get("data") or {}).get("task_id", "")
|
||||||
|
if not piapi_task_id:
|
||||||
|
webhook_update_task(task_id, "failed", 0, "", error="Kling/PiAPI 응답에 task_id 없음")
|
||||||
|
return
|
||||||
|
|
||||||
|
webhook_update_task(task_id, "processing", 15, "Kling 작업 등록됨")
|
||||||
|
|
||||||
|
# 폴링 — GET /task/{id}
|
||||||
|
video_url = None
|
||||||
|
for attempt in range(POLL_MAX_ATTEMPTS):
|
||||||
|
time.sleep(POLL_INTERVAL)
|
||||||
|
fetch = requests.get(f"{PIAPI_BASE_URL}/task/{piapi_task_id}",
|
||||||
|
headers=_headers(), timeout=30)
|
||||||
|
if fetch.status_code != 200:
|
||||||
|
continue
|
||||||
|
fd = fetch.json()
|
||||||
|
data = fd.get("data", {})
|
||||||
|
status = data.get("status", "")
|
||||||
|
scaled = min(15 + int((attempt / POLL_MAX_ATTEMPTS) * 65), 79)
|
||||||
|
webhook_update_task(task_id, "processing", scaled, f"Kling 생성 중... ({status})")
|
||||||
|
|
||||||
|
if status == "Completed":
|
||||||
|
video_url = (data.get("output") or {}).get("video_url", "")
|
||||||
|
break
|
||||||
|
elif status in ("Failed", "failed"):
|
||||||
|
err = (data.get("error") or {}).get("message", "Kling 작업 실패")
|
||||||
|
webhook_update_task(task_id, "failed", 0, "", error=err)
|
||||||
|
return
|
||||||
|
# Pending/Processing/Staged → 계속 폴링
|
||||||
|
else:
|
||||||
|
webhook_update_task(task_id, "failed", 0, "", error="Kling 폴링 timeout (10분)")
|
||||||
|
return
|
||||||
|
|
||||||
|
if not video_url:
|
||||||
|
webhook_update_task(task_id, "failed", 0, "", error="Kling 완료했으나 video_url 없음")
|
||||||
|
return
|
||||||
|
|
||||||
|
webhook_update_task(task_id, "processing", 85, "Kling 결과 다운로드 중...")
|
||||||
|
filename = f"{task_id}.mp4"
|
||||||
|
os.makedirs(VIDEO_MEDIA_ROOT, exist_ok=True)
|
||||||
|
file_path = os.path.join(VIDEO_MEDIA_ROOT, filename)
|
||||||
|
|
||||||
|
dl = requests.get(video_url, stream=True, timeout=300)
|
||||||
|
dl.raise_for_status()
|
||||||
|
with open(file_path, "wb") as f:
|
||||||
|
for chunk in dl.iter_content(chunk_size=8192):
|
||||||
|
f.write(chunk)
|
||||||
|
|
||||||
|
local_url = f"{VIDEO_MEDIA_URL_PREFIX}/{filename}"
|
||||||
|
webhook_update_task(task_id, "succeeded", 100, "Kling 생성 완료", video_url=local_url)
|
||||||
|
|
||||||
|
except requests.Timeout:
|
||||||
|
webhook_update_task(task_id, "failed", 0, "", error="Kling/PiAPI 타임아웃")
|
||||||
|
except Exception as e:
|
||||||
|
logger.exception("Kling generation error task=%s", task_id)
|
||||||
|
webhook_update_task(task_id, "failed", 0, "", error=str(e))
|
||||||
121
services/video-render/providers/seedance.py
Normal file
121
services/video-render/providers/seedance.py
Normal file
@@ -0,0 +1,121 @@
|
|||||||
|
"""Seedance 2.0 video generation — ByteDance Volcano Engine (BytePlus 국제 endpoint).
|
||||||
|
|
||||||
|
POST https://api.byteplus.com/seedance/v1/videos → GET /videos/{id} 폴링 → output.video_url 다운로드.
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
import time
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
import requests
|
||||||
|
|
||||||
|
from nas_client import webhook_update_task
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
SEEDANCE_BASE_URL = "https://api.byteplus.com/seedance/v1"
|
||||||
|
VIDEO_MEDIA_ROOT = os.getenv("VIDEO_MEDIA_ROOT", "/mnt/nas/webpage/data/video")
|
||||||
|
VIDEO_MEDIA_URL_PREFIX = os.getenv("VIDEO_MEDIA_URL_PREFIX", "/media/video")
|
||||||
|
|
||||||
|
POLL_INTERVAL = 8 # Seedance는 30~120초
|
||||||
|
POLL_MAX_ATTEMPTS = 60
|
||||||
|
|
||||||
|
DEFAULT_MODEL = "seedance-2.0"
|
||||||
|
|
||||||
|
|
||||||
|
def _headers() -> dict:
|
||||||
|
api_key = os.getenv("SEEDANCE_API_KEY", "")
|
||||||
|
return {
|
||||||
|
"Authorization": f"Bearer {api_key}",
|
||||||
|
"Content-Type": "application/json",
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def run_seedance_generation(task_id: str, params: dict) -> None:
|
||||||
|
"""Seedance로 영상 생성 → mp4 → NAS SMB → webhook."""
|
||||||
|
try:
|
||||||
|
if not os.getenv("SEEDANCE_API_KEY"):
|
||||||
|
webhook_update_task(task_id, "failed", 0, "", error="SEEDANCE_API_KEY 미설정")
|
||||||
|
return
|
||||||
|
|
||||||
|
webhook_update_task(task_id, "processing", 5, "Seedance API 호출 중...")
|
||||||
|
|
||||||
|
body = {
|
||||||
|
"model": params.get("model") or DEFAULT_MODEL,
|
||||||
|
"prompt": params["prompt"][:2000],
|
||||||
|
"resolution": params.get("resolution", "1080p"),
|
||||||
|
"duration": params.get("duration", 5),
|
||||||
|
"aspect_ratio": params.get("aspect_ratio", "16:9"),
|
||||||
|
}
|
||||||
|
if params.get("negative_prompt"):
|
||||||
|
body["negative_prompt"] = params["negative_prompt"]
|
||||||
|
if params.get("image_url"):
|
||||||
|
body["references"] = [{"type": "image", "data": params["image_url"], "role": "subject"}]
|
||||||
|
if params.get("audio") is not None:
|
||||||
|
body["audio"] = bool(params["audio"])
|
||||||
|
if params.get("seed") is not None:
|
||||||
|
body["seed"] = int(params["seed"])
|
||||||
|
|
||||||
|
resp = requests.post(f"{SEEDANCE_BASE_URL}/videos", headers=_headers(), json=body, timeout=30)
|
||||||
|
if resp.status_code not in (200, 201):
|
||||||
|
webhook_update_task(task_id, "failed", 0, "",
|
||||||
|
error=f"Seedance API 오류: {resp.status_code} {resp.text[:300]}")
|
||||||
|
return
|
||||||
|
|
||||||
|
body_json = resp.json()
|
||||||
|
job_id = body_json.get("id", "")
|
||||||
|
if not job_id:
|
||||||
|
webhook_update_task(task_id, "failed", 0, "", error="Seedance 응답에 id 없음")
|
||||||
|
return
|
||||||
|
|
||||||
|
webhook_update_task(task_id, "processing", 15, "Seedance 작업 등록됨")
|
||||||
|
|
||||||
|
# 폴링
|
||||||
|
video_url = None
|
||||||
|
for attempt in range(POLL_MAX_ATTEMPTS):
|
||||||
|
time.sleep(POLL_INTERVAL)
|
||||||
|
fetch = requests.get(f"{SEEDANCE_BASE_URL}/videos/{job_id}",
|
||||||
|
headers=_headers(), timeout=30)
|
||||||
|
if fetch.status_code != 200:
|
||||||
|
continue
|
||||||
|
fd = fetch.json()
|
||||||
|
status = fd.get("status", "")
|
||||||
|
scaled = min(15 + int((attempt / POLL_MAX_ATTEMPTS) * 65), 79)
|
||||||
|
webhook_update_task(task_id, "processing", scaled, f"Seedance 생성 중... ({status})")
|
||||||
|
|
||||||
|
if status == "completed":
|
||||||
|
video_url = (fd.get("output") or {}).get("video_url", "")
|
||||||
|
break
|
||||||
|
elif status == "failed":
|
||||||
|
err = fd.get("error") or "Seedance 작업 실패"
|
||||||
|
webhook_update_task(task_id, "failed", 0, "", error=str(err)[:300])
|
||||||
|
return
|
||||||
|
else:
|
||||||
|
webhook_update_task(task_id, "failed", 0, "", error="Seedance 폴링 timeout (10분)")
|
||||||
|
return
|
||||||
|
|
||||||
|
if not video_url:
|
||||||
|
webhook_update_task(task_id, "failed", 0, "", error="Seedance 완료했으나 video_url 없음")
|
||||||
|
return
|
||||||
|
|
||||||
|
webhook_update_task(task_id, "processing", 85, "Seedance 결과 다운로드 중...")
|
||||||
|
filename = f"{task_id}.mp4"
|
||||||
|
os.makedirs(VIDEO_MEDIA_ROOT, exist_ok=True)
|
||||||
|
file_path = os.path.join(VIDEO_MEDIA_ROOT, filename)
|
||||||
|
|
||||||
|
dl = requests.get(video_url, stream=True, timeout=300)
|
||||||
|
dl.raise_for_status()
|
||||||
|
with open(file_path, "wb") as f:
|
||||||
|
for chunk in dl.iter_content(chunk_size=8192):
|
||||||
|
f.write(chunk)
|
||||||
|
|
||||||
|
local_url = f"{VIDEO_MEDIA_URL_PREFIX}/{filename}"
|
||||||
|
webhook_update_task(task_id, "succeeded", 100, "Seedance 생성 완료", video_url=local_url)
|
||||||
|
|
||||||
|
except requests.Timeout:
|
||||||
|
webhook_update_task(task_id, "failed", 0, "", error="Seedance API 타임아웃")
|
||||||
|
except Exception as e:
|
||||||
|
logger.exception("Seedance generation error task=%s", task_id)
|
||||||
|
webhook_update_task(task_id, "failed", 0, "", error=str(e))
|
||||||
119
services/video-render/providers/sora.py
Normal file
119
services/video-render/providers/sora.py
Normal file
@@ -0,0 +1,119 @@
|
|||||||
|
"""Sora 2 video generation — OpenAI Videos API.
|
||||||
|
|
||||||
|
POST /v1/videos → poll GET /v1/videos/{id} → GET /v1/videos/{id}/content download.
|
||||||
|
⚠️ Deprecated, shutdown 2026-09-24. Spec 진행은 박재오 결정 따름.
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
import time
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
import requests
|
||||||
|
|
||||||
|
from nas_client import webhook_update_task
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
SORA_BASE_URL = "https://api.openai.com/v1"
|
||||||
|
VIDEO_MEDIA_ROOT = os.getenv("VIDEO_MEDIA_ROOT", "/mnt/nas/webpage/data/video")
|
||||||
|
VIDEO_MEDIA_URL_PREFIX = os.getenv("VIDEO_MEDIA_URL_PREFIX", "/media/video")
|
||||||
|
|
||||||
|
POLL_INTERVAL = 15 # OpenAI 권장: 10~20초
|
||||||
|
POLL_MAX_ATTEMPTS = 40 # 최대 ~10분
|
||||||
|
|
||||||
|
DEFAULT_MODEL = "sora-2"
|
||||||
|
|
||||||
|
|
||||||
|
def _headers() -> dict:
|
||||||
|
api_key = os.getenv("OPENAI_API_KEY", "")
|
||||||
|
return {
|
||||||
|
"Authorization": f"Bearer {api_key}",
|
||||||
|
"Content-Type": "application/json",
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def run_sora_generation(task_id: str, params: dict) -> None:
|
||||||
|
"""Sora 2로 영상 생성 → mp4 → NAS SMB 저장 → webhook."""
|
||||||
|
try:
|
||||||
|
if not os.getenv("OPENAI_API_KEY"):
|
||||||
|
webhook_update_task(task_id, "failed", 0, "", error="OPENAI_API_KEY 미설정 (Windows .env)")
|
||||||
|
return
|
||||||
|
|
||||||
|
webhook_update_task(task_id, "processing", 5, "Sora API 호출 중...")
|
||||||
|
|
||||||
|
payload = {
|
||||||
|
"model": params.get("model") or DEFAULT_MODEL,
|
||||||
|
"prompt": params["prompt"][:5000],
|
||||||
|
}
|
||||||
|
if params.get("duration"):
|
||||||
|
payload["seconds"] = params["duration"]
|
||||||
|
if params.get("size"):
|
||||||
|
payload["size"] = params["size"]
|
||||||
|
elif params.get("aspect_ratio") == "9:16":
|
||||||
|
payload["size"] = "1080x1920"
|
||||||
|
elif params.get("aspect_ratio") == "16:9":
|
||||||
|
payload["size"] = "1920x1080"
|
||||||
|
|
||||||
|
resp = requests.post(f"{SORA_BASE_URL}/videos", headers=_headers(), json=payload, timeout=30)
|
||||||
|
if resp.status_code not in (200, 201):
|
||||||
|
webhook_update_task(task_id, "failed", 0, "", error=f"Sora API 오류: {resp.status_code} {resp.text[:300]}")
|
||||||
|
return
|
||||||
|
|
||||||
|
body = resp.json()
|
||||||
|
video_id = body.get("id", "")
|
||||||
|
if not video_id:
|
||||||
|
webhook_update_task(task_id, "failed", 0, "", error="Sora 응답에 video id 없음")
|
||||||
|
return
|
||||||
|
|
||||||
|
webhook_update_task(task_id, "processing", 15, f"Sora 작업 생성됨 (id={video_id[:16]})")
|
||||||
|
|
||||||
|
# 폴링
|
||||||
|
for attempt in range(POLL_MAX_ATTEMPTS):
|
||||||
|
time.sleep(POLL_INTERVAL)
|
||||||
|
sr = requests.get(f"{SORA_BASE_URL}/videos/{video_id}", headers=_headers(), timeout=30)
|
||||||
|
if sr.status_code != 200:
|
||||||
|
continue
|
||||||
|
sd = sr.json()
|
||||||
|
status = sd.get("status", "")
|
||||||
|
progress = sd.get("progress", 0)
|
||||||
|
scaled = min(15 + int(progress * 0.65), 79)
|
||||||
|
webhook_update_task(task_id, "processing", scaled, f"Sora 생성 중... {progress}%")
|
||||||
|
|
||||||
|
if status == "completed":
|
||||||
|
break
|
||||||
|
elif status == "failed":
|
||||||
|
err = sd.get("error", {}).get("message", "Sora 작업 실패")
|
||||||
|
webhook_update_task(task_id, "failed", 0, "", error=err)
|
||||||
|
return
|
||||||
|
else:
|
||||||
|
webhook_update_task(task_id, "failed", 0, "", error="Sora 폴링 timeout (10분)")
|
||||||
|
return
|
||||||
|
|
||||||
|
# 다운로드
|
||||||
|
webhook_update_task(task_id, "processing", 80, "Sora 결과 다운로드 중...")
|
||||||
|
filename = f"{task_id}.mp4"
|
||||||
|
os.makedirs(VIDEO_MEDIA_ROOT, exist_ok=True)
|
||||||
|
file_path = os.path.join(VIDEO_MEDIA_ROOT, filename)
|
||||||
|
|
||||||
|
dl = requests.get(
|
||||||
|
f"{SORA_BASE_URL}/videos/{video_id}/content",
|
||||||
|
headers=_headers(),
|
||||||
|
params={"variant": "video"},
|
||||||
|
stream=True,
|
||||||
|
timeout=300,
|
||||||
|
)
|
||||||
|
dl.raise_for_status()
|
||||||
|
with open(file_path, "wb") as f:
|
||||||
|
for chunk in dl.iter_content(chunk_size=8192):
|
||||||
|
f.write(chunk)
|
||||||
|
|
||||||
|
local_url = f"{VIDEO_MEDIA_URL_PREFIX}/{filename}"
|
||||||
|
webhook_update_task(task_id, "succeeded", 100, "Sora 생성 완료", video_url=local_url)
|
||||||
|
|
||||||
|
except requests.Timeout:
|
||||||
|
webhook_update_task(task_id, "failed", 0, "", error="Sora API 타임아웃")
|
||||||
|
except Exception as e:
|
||||||
|
logger.exception("Sora generation error task=%s", task_id)
|
||||||
|
webhook_update_task(task_id, "failed", 0, "", error=str(e))
|
||||||
176
services/video-render/providers/veo.py
Normal file
176
services/video-render/providers/veo.py
Normal file
@@ -0,0 +1,176 @@
|
|||||||
|
"""Veo 3.1 video generation — Google Vertex AI.
|
||||||
|
|
||||||
|
POST .../models/{MODEL}:predictLongRunning → POST :fetchPredictOperation 폴링 →
|
||||||
|
결과 gs://bucket/path/sample_0.mp4 → google-cloud-storage로 다운로드 → NAS SMB.
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
import subprocess
|
||||||
|
import time
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
import requests
|
||||||
|
|
||||||
|
from nas_client import webhook_update_task
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
VIDEO_MEDIA_ROOT = os.getenv("VIDEO_MEDIA_ROOT", "/mnt/nas/webpage/data/video")
|
||||||
|
VIDEO_MEDIA_URL_PREFIX = os.getenv("VIDEO_MEDIA_URL_PREFIX", "/media/video")
|
||||||
|
|
||||||
|
POLL_INTERVAL = 12 # Veo는 30~120초 소요
|
||||||
|
POLL_MAX_ATTEMPTS = 50 # 최대 ~10분
|
||||||
|
|
||||||
|
DEFAULT_MODEL = "veo-3.1-fast-generate-001"
|
||||||
|
|
||||||
|
|
||||||
|
def _gcloud_access_token() -> Optional[str]:
|
||||||
|
"""GOOGLE_APPLICATION_CREDENTIALS service account JSON으로 access token 발행.
|
||||||
|
|
||||||
|
google-auth가 컨테이너 안에서 자동 인증 — Bearer 토큰을 GCS SDK가 직접 사용.
|
||||||
|
REST API 호출용으로는 명시적 token이 필요 → google.auth로 발행.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
from google.auth import default as google_default_auth
|
||||||
|
from google.auth.transport.requests import Request as GoogleAuthRequest
|
||||||
|
|
||||||
|
credentials, _ = google_default_auth(
|
||||||
|
scopes=["https://www.googleapis.com/auth/cloud-platform"],
|
||||||
|
)
|
||||||
|
credentials.refresh(GoogleAuthRequest())
|
||||||
|
return credentials.token
|
||||||
|
except Exception:
|
||||||
|
logger.exception("Google credentials refresh 실패")
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def _download_gcs(gcs_uri: str, local_path: str) -> bool:
|
||||||
|
"""gs://bucket/path/file.mp4 → local_path 다운로드. 성공 여부 반환."""
|
||||||
|
try:
|
||||||
|
from google.cloud import storage as gcs_storage
|
||||||
|
if not gcs_uri.startswith("gs://"):
|
||||||
|
return False
|
||||||
|
without_scheme = gcs_uri[len("gs://"):]
|
||||||
|
bucket_name, blob_path = without_scheme.split("/", 1)
|
||||||
|
client = gcs_storage.Client(project=os.getenv("GOOGLE_PROJECT_ID"))
|
||||||
|
bucket = client.bucket(bucket_name)
|
||||||
|
blob = bucket.blob(blob_path)
|
||||||
|
blob.download_to_filename(local_path)
|
||||||
|
return True
|
||||||
|
except Exception:
|
||||||
|
logger.exception("GCS 다운로드 실패: %s", gcs_uri)
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def run_veo_generation(task_id: str, params: dict) -> None:
|
||||||
|
"""Veo 3.1로 영상 생성 → GCS → NAS SMB → webhook."""
|
||||||
|
try:
|
||||||
|
project_id = os.getenv("GOOGLE_PROJECT_ID", "")
|
||||||
|
location = os.getenv("GOOGLE_LOCATION", "us-central1")
|
||||||
|
gcs_bucket = os.getenv("GOOGLE_GCS_BUCKET", "")
|
||||||
|
|
||||||
|
if not project_id or not gcs_bucket:
|
||||||
|
webhook_update_task(task_id, "failed", 0, "",
|
||||||
|
error="GOOGLE_PROJECT_ID 또는 GOOGLE_GCS_BUCKET 미설정")
|
||||||
|
return
|
||||||
|
|
||||||
|
token = _gcloud_access_token()
|
||||||
|
if not token:
|
||||||
|
webhook_update_task(task_id, "failed", 0, "",
|
||||||
|
error="Google access token 발행 실패 (서비스 계정 JSON 확인)")
|
||||||
|
return
|
||||||
|
|
||||||
|
webhook_update_task(task_id, "processing", 5, "Veo API 호출 중...")
|
||||||
|
|
||||||
|
model_id = params.get("model") or DEFAULT_MODEL
|
||||||
|
endpoint_base = (
|
||||||
|
f"https://{location}-aiplatform.googleapis.com/v1/projects/{project_id}"
|
||||||
|
f"/locations/{location}/publishers/google/models/{model_id}"
|
||||||
|
)
|
||||||
|
headers = {
|
||||||
|
"Authorization": f"Bearer {token}",
|
||||||
|
"Content-Type": "application/json",
|
||||||
|
}
|
||||||
|
body = {
|
||||||
|
"instances": [{"prompt": params["prompt"]}],
|
||||||
|
"parameters": {
|
||||||
|
"storageUri": f"gs://{gcs_bucket}/veo/{task_id}/",
|
||||||
|
"sampleCount": 1,
|
||||||
|
"aspectRatio": params.get("aspect_ratio") or "16:9",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
if params.get("duration"):
|
||||||
|
body["parameters"]["duration"] = params["duration"]
|
||||||
|
if params.get("negative_prompt"):
|
||||||
|
body["parameters"]["negativePrompt"] = params["negative_prompt"]
|
||||||
|
|
||||||
|
resp = requests.post(f"{endpoint_base}:predictLongRunning",
|
||||||
|
headers=headers, json=body, timeout=30)
|
||||||
|
if resp.status_code != 200:
|
||||||
|
webhook_update_task(task_id, "failed", 0, "",
|
||||||
|
error=f"Veo API 오류: {resp.status_code} {resp.text[:300]}")
|
||||||
|
return
|
||||||
|
|
||||||
|
op_name = resp.json().get("name", "")
|
||||||
|
if not op_name:
|
||||||
|
webhook_update_task(task_id, "failed", 0, "", error="Veo 응답에 operation name 없음")
|
||||||
|
return
|
||||||
|
|
||||||
|
webhook_update_task(task_id, "processing", 15, f"Veo 작업 시작됨")
|
||||||
|
|
||||||
|
# 폴링 — fetchPredictOperation
|
||||||
|
gcs_uri = None
|
||||||
|
for attempt in range(POLL_MAX_ATTEMPTS):
|
||||||
|
time.sleep(POLL_INTERVAL)
|
||||||
|
fetch = requests.post(
|
||||||
|
f"{endpoint_base}:fetchPredictOperation",
|
||||||
|
headers=headers,
|
||||||
|
json={"operationName": op_name},
|
||||||
|
timeout=30,
|
||||||
|
)
|
||||||
|
if fetch.status_code != 200:
|
||||||
|
continue
|
||||||
|
fd = fetch.json()
|
||||||
|
done = fd.get("done", False)
|
||||||
|
scaled = min(15 + int((attempt / POLL_MAX_ATTEMPTS) * 65), 79)
|
||||||
|
webhook_update_task(task_id, "processing", scaled, "Veo 생성 중...")
|
||||||
|
|
||||||
|
if done:
|
||||||
|
if "error" in fd:
|
||||||
|
webhook_update_task(task_id, "failed", 0, "",
|
||||||
|
error=f"Veo 작업 실패: {fd['error'].get('message','?')}")
|
||||||
|
return
|
||||||
|
videos = (fd.get("response") or {}).get("videos") or []
|
||||||
|
if not videos:
|
||||||
|
webhook_update_task(task_id, "failed", 0, "", error="Veo 완료했으나 videos 비어 있음")
|
||||||
|
return
|
||||||
|
gcs_uri = videos[0].get("gcsUri", "")
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
webhook_update_task(task_id, "failed", 0, "", error="Veo 폴링 timeout (10분)")
|
||||||
|
return
|
||||||
|
|
||||||
|
if not gcs_uri:
|
||||||
|
webhook_update_task(task_id, "failed", 0, "", error="Veo 응답에 gcsUri 없음")
|
||||||
|
return
|
||||||
|
|
||||||
|
webhook_update_task(task_id, "processing", 85, "GCS에서 mp4 다운로드 중...")
|
||||||
|
filename = f"{task_id}.mp4"
|
||||||
|
os.makedirs(VIDEO_MEDIA_ROOT, exist_ok=True)
|
||||||
|
file_path = os.path.join(VIDEO_MEDIA_ROOT, filename)
|
||||||
|
|
||||||
|
ok = _download_gcs(gcs_uri, file_path)
|
||||||
|
if not ok:
|
||||||
|
webhook_update_task(task_id, "failed", 0, "", error=f"GCS 다운로드 실패: {gcs_uri}")
|
||||||
|
return
|
||||||
|
|
||||||
|
local_url = f"{VIDEO_MEDIA_URL_PREFIX}/{filename}"
|
||||||
|
webhook_update_task(task_id, "succeeded", 100, "Veo 생성 완료", video_url=local_url)
|
||||||
|
|
||||||
|
except requests.Timeout:
|
||||||
|
webhook_update_task(task_id, "failed", 0, "", error="Veo API 타임아웃")
|
||||||
|
except Exception as e:
|
||||||
|
logger.exception("Veo generation error task=%s", task_id)
|
||||||
|
webhook_update_task(task_id, "failed", 0, "", error=str(e))
|
||||||
10
services/video-render/requirements.txt
Normal file
10
services/video-render/requirements.txt
Normal file
@@ -0,0 +1,10 @@
|
|||||||
|
fastapi==0.115.6
|
||||||
|
uvicorn[standard]==0.34.0
|
||||||
|
requests==2.32.3
|
||||||
|
redis>=5.0
|
||||||
|
httpx>=0.27
|
||||||
|
openai>=1.50.0
|
||||||
|
google-cloud-storage>=2.18.0
|
||||||
|
pytest>=8.0
|
||||||
|
pytest-asyncio>=0.24
|
||||||
|
respx>=0.21
|
||||||
0
services/video-render/tests/__init__.py
Normal file
0
services/video-render/tests/__init__.py
Normal file
70
services/video-render/tests/test_nas_client.py
Normal file
70
services/video-render/tests/test_nas_client.py
Normal file
@@ -0,0 +1,70 @@
|
|||||||
|
"""nas_client — webhook adapter for video-render."""
|
||||||
|
import pytest
|
||||||
|
import respx
|
||||||
|
import httpx
|
||||||
|
|
||||||
|
from nas_client import webhook_update_task
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True)
|
||||||
|
def _env(monkeypatch):
|
||||||
|
monkeypatch.setenv("NAS_BASE_URL", "http://nas-test:18801")
|
||||||
|
monkeypatch.setenv("INTERNAL_API_KEY", "test-key")
|
||||||
|
|
||||||
|
|
||||||
|
@respx.mock
|
||||||
|
def test_webhook_update_task_sends_x_internal_key():
|
||||||
|
route = respx.post("http://nas-test:18801/api/internal/video/update").mock(
|
||||||
|
return_value=httpx.Response(200, json={"ok": True})
|
||||||
|
)
|
||||||
|
webhook_update_task("task-1", "processing", 30, message="downloading")
|
||||||
|
assert route.called
|
||||||
|
req = route.calls[0].request
|
||||||
|
assert req.headers["X-Internal-Key"] == "test-key"
|
||||||
|
import json
|
||||||
|
body = json.loads(req.content)
|
||||||
|
assert body["task_id"] == "task-1"
|
||||||
|
assert body["status"] == "processing"
|
||||||
|
assert body["progress"] == 30
|
||||||
|
|
||||||
|
|
||||||
|
@respx.mock
|
||||||
|
def test_webhook_update_task_with_video_url():
|
||||||
|
route = respx.post("http://nas-test:18801/api/internal/video/update").mock(
|
||||||
|
return_value=httpx.Response(200, json={"ok": True})
|
||||||
|
)
|
||||||
|
webhook_update_task("task-2", "succeeded", 100, message="완료",
|
||||||
|
video_url="/media/video/task-2.mp4")
|
||||||
|
import json
|
||||||
|
payload = json.loads(route.calls[0].request.content)
|
||||||
|
assert payload["video_url"] == "/media/video/task-2.mp4"
|
||||||
|
assert payload["status"] == "succeeded"
|
||||||
|
|
||||||
|
|
||||||
|
@respx.mock
|
||||||
|
def test_webhook_update_task_with_error():
|
||||||
|
route = respx.post("http://nas-test:18801/api/internal/video/update").mock(
|
||||||
|
return_value=httpx.Response(200, json={"ok": True})
|
||||||
|
)
|
||||||
|
webhook_update_task("task-3", "failed", 0, error="Sora API rate limit")
|
||||||
|
import json
|
||||||
|
payload = json.loads(route.calls[0].request.content)
|
||||||
|
assert payload["error"] == "Sora API rate limit"
|
||||||
|
|
||||||
|
|
||||||
|
@respx.mock
|
||||||
|
def test_webhook_swallows_network_error(caplog):
|
||||||
|
respx.post("http://nas-test:18801/api/internal/video/update").mock(
|
||||||
|
side_effect=httpx.ConnectError("no host")
|
||||||
|
)
|
||||||
|
webhook_update_task("task-5", "processing", 10)
|
||||||
|
assert "task-5" in caplog.text
|
||||||
|
|
||||||
|
|
||||||
|
@respx.mock
|
||||||
|
def test_webhook_swallows_non_200(caplog):
|
||||||
|
respx.post("http://nas-test:18801/api/internal/video/update").mock(
|
||||||
|
return_value=httpx.Response(500, text="server error")
|
||||||
|
)
|
||||||
|
webhook_update_task("task-6", "processing", 50)
|
||||||
|
assert "task-6" in caplog.text
|
||||||
43
services/video-render/tests/test_worker.py
Normal file
43
services/video-render/tests/test_worker.py
Normal file
@@ -0,0 +1,43 @@
|
|||||||
|
"""worker.py — job_type 디스패처 (4 provider)."""
|
||||||
|
import pytest
|
||||||
|
from unittest.mock import patch
|
||||||
|
|
||||||
|
import worker
|
||||||
|
|
||||||
|
|
||||||
|
def test_dispatch_sora_calls_run_sora_generation():
|
||||||
|
payload = {"task_id": "t1", "job_type": "sora_generation", "params": {"prompt": "x"}}
|
||||||
|
with patch("worker.run_sora_generation") as m:
|
||||||
|
worker._dispatch(payload)
|
||||||
|
m.assert_called_once_with("t1", {"prompt": "x"})
|
||||||
|
|
||||||
|
|
||||||
|
def test_dispatch_veo_calls_run_veo_generation():
|
||||||
|
payload = {"task_id": "t2", "job_type": "veo_generation", "params": {"prompt": "x"}}
|
||||||
|
with patch("worker.run_veo_generation") as m:
|
||||||
|
worker._dispatch(payload)
|
||||||
|
m.assert_called_once_with("t2", {"prompt": "x"})
|
||||||
|
|
||||||
|
|
||||||
|
def test_dispatch_kling_calls_run_kling_generation():
|
||||||
|
payload = {"task_id": "t3", "job_type": "kling_generation", "params": {"prompt": "x"}}
|
||||||
|
with patch("worker.run_kling_generation") as m:
|
||||||
|
worker._dispatch(payload)
|
||||||
|
m.assert_called_once_with("t3", {"prompt": "x"})
|
||||||
|
|
||||||
|
|
||||||
|
def test_dispatch_seedance_calls_run_seedance_generation():
|
||||||
|
payload = {"task_id": "t4", "job_type": "seedance_generation", "params": {"prompt": "x"}}
|
||||||
|
with patch("worker.run_seedance_generation") as m:
|
||||||
|
worker._dispatch(payload)
|
||||||
|
m.assert_called_once_with("t4", {"prompt": "x"})
|
||||||
|
|
||||||
|
|
||||||
|
def test_dispatch_unknown_job_type_logs_error():
|
||||||
|
payload = {"task_id": "t5", "job_type": "weird_type", "params": {}}
|
||||||
|
with patch("worker.webhook_update_task") as m:
|
||||||
|
worker._dispatch(payload)
|
||||||
|
m.assert_called_once()
|
||||||
|
args = m.call_args[0]
|
||||||
|
assert args[0] == "t5"
|
||||||
|
assert args[1] == "failed"
|
||||||
80
services/video-render/worker.py
Normal file
80
services/video-render/worker.py
Normal file
@@ -0,0 +1,80 @@
|
|||||||
|
"""Redis BLPOP worker — queue:video-render → job_type 디스패치 → NAS webhook.
|
||||||
|
|
||||||
|
queue:paused 가 set이면 대기 (task-watcher가 박재오 활동 감지 시 set).
|
||||||
|
Plan-B-Music worker.py 패턴 — string-based dispatch + getattr (테스트 patch 호환).
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
|
||||||
|
import redis.asyncio as aioredis
|
||||||
|
|
||||||
|
from nas_client import webhook_update_task
|
||||||
|
from providers.sora import run_sora_generation
|
||||||
|
from providers.veo import run_veo_generation
|
||||||
|
from providers.kling import run_kling_generation
|
||||||
|
from providers.seedance import run_seedance_generation
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
REDIS_URL = os.getenv("REDIS_URL", "redis://192.168.45.54:6379")
|
||||||
|
QUEUE_KEY = "queue:video-render"
|
||||||
|
PAUSED_KEY = "queue:paused"
|
||||||
|
|
||||||
|
# string names so `unittest.mock.patch` on `worker.<name>` is correctly intercepted
|
||||||
|
_DISPATCH_TABLE = {
|
||||||
|
"sora_generation": "run_sora_generation",
|
||||||
|
"veo_generation": "run_veo_generation",
|
||||||
|
"kling_generation": "run_kling_generation",
|
||||||
|
"seedance_generation": "run_seedance_generation",
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def _dispatch(payload: dict) -> None:
|
||||||
|
"""payload[job_type] → provider 함수 호출 (sync, worker_loop에서 asyncio.to_thread로 wrap)."""
|
||||||
|
job_type = payload.get("job_type", "")
|
||||||
|
task_id = payload.get("task_id", "")
|
||||||
|
params = payload.get("params", {})
|
||||||
|
fn_name = _DISPATCH_TABLE.get(job_type)
|
||||||
|
if fn_name is None:
|
||||||
|
logger.error("unknown job_type=%s task=%s", job_type, task_id)
|
||||||
|
webhook_update_task(task_id, "failed", 0, "", error=f"unknown job_type: {job_type}")
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
fn = getattr(sys.modules[__name__], fn_name)
|
||||||
|
except AttributeError:
|
||||||
|
logger.error("dispatch table typo for job_type=%s name=%s task=%s", job_type, fn_name, task_id)
|
||||||
|
webhook_update_task(task_id, "failed", 0, "", error=f"internal dispatch error: {fn_name}")
|
||||||
|
return
|
||||||
|
fn(task_id, params)
|
||||||
|
|
||||||
|
|
||||||
|
async def worker_loop():
|
||||||
|
redis = aioredis.from_url(REDIS_URL, decode_responses=False)
|
||||||
|
logger.info("video-render worker started (queue=%s)", QUEUE_KEY)
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
paused = await redis.get(PAUSED_KEY)
|
||||||
|
if paused == b"1":
|
||||||
|
await asyncio.sleep(10)
|
||||||
|
continue
|
||||||
|
item = await redis.blpop(QUEUE_KEY, timeout=1)
|
||||||
|
if item is None:
|
||||||
|
continue
|
||||||
|
_, raw = item
|
||||||
|
try:
|
||||||
|
payload = json.loads(raw)
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
logger.error("invalid queue payload: %r", raw[:200])
|
||||||
|
continue
|
||||||
|
await asyncio.to_thread(_dispatch, payload)
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
logger.info("worker_loop cancelled")
|
||||||
|
raise
|
||||||
|
except Exception:
|
||||||
|
logger.exception("worker_loop iteration 실패, 5초 후 재시도")
|
||||||
|
await asyncio.sleep(5)
|
||||||
Reference in New Issue
Block a user