Files
ai-trade/services/video-render/providers/veo.py
gahusb 53a0657027 fix(video-render): Veo durationSeconds str → int (T10 follow-up 2)
end-to-end 검증 2차: Gemini API는 durationSeconds를 number로 요구.
str("6") → 400 INVALID_ARGUMENT. int(params["duration"])로 전송.
(WebFetch 문서는 string으로 표기했으나 실제 API는 number.)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-21 01:25:22 +09:00

140 lines
5.6 KiB
Python

"""Veo 3.1 video generation — Gemini API (ai.google.dev).
POST https://generativelanguage.googleapis.com/v1beta/models/{MODEL}:predictLongRunning
GET https://generativelanguage.googleapis.com/v1beta/{operation_name}
→ done=true 시 response.generateVideoResponse.generatedSamples[0].video.uri 다운로드
"""
from __future__ import annotations
import logging
import os
import time
from typing import Optional
import requests
from nas_client import webhook_update_task
logger = logging.getLogger(__name__)
GEMINI_BASE_URL = "https://generativelanguage.googleapis.com/v1beta"
VIDEO_MEDIA_ROOT = os.getenv("VIDEO_MEDIA_ROOT", "/mnt/nas/webpage/data/video")
VIDEO_MEDIA_URL_PREFIX = os.getenv("VIDEO_MEDIA_URL_PREFIX", "/media/video")
POLL_INTERVAL = 10 # Veo는 30~120초 소요
POLL_MAX_ATTEMPTS = 60 # 최대 ~10분
DEFAULT_MODEL = "veo-3.1-fast-generate-preview"
def _headers() -> dict:
api_key = os.getenv("GEMINI_API_KEY", "")
return {
"x-goog-api-key": api_key,
"Content-Type": "application/json",
}
def run_veo_generation(task_id: str, params: dict) -> None:
"""Veo로 영상 생성 → mp4 → NAS SMB → webhook."""
try:
if not os.getenv("GEMINI_API_KEY"):
webhook_update_task(task_id, "failed", 0, "", error="GEMINI_API_KEY 미설정 (Windows .env)")
return
webhook_update_task(task_id, "processing", 5, "Veo (Gemini API) 호출 중...")
model_id = params.get("model") or DEFAULT_MODEL
body = {
"instances": [{"prompt": params["prompt"]}],
"parameters": {
"aspectRatio": params.get("aspect_ratio") or "16:9",
},
}
# numberOfVideos는 일부 모델(veo-3.0-fast 등) 미지원 — 호출자 명시 시에만 추가
if params.get("number_of_videos"):
body["parameters"]["numberOfVideos"] = int(params["number_of_videos"])
if params.get("duration"):
body["parameters"]["durationSeconds"] = int(params["duration"])
if params.get("resolution"):
body["parameters"]["resolution"] = params["resolution"]
if params.get("negative_prompt"):
body["parameters"]["negativePrompt"] = params["negative_prompt"]
if params.get("person_generation"):
body["parameters"]["personGeneration"] = params["person_generation"]
resp = requests.post(
f"{GEMINI_BASE_URL}/models/{model_id}:predictLongRunning",
headers=_headers(), json=body, timeout=30,
)
if resp.status_code != 200:
webhook_update_task(task_id, "failed", 0, "",
error=f"Veo Gemini API 오류: {resp.status_code} {resp.text[:300]}")
return
op_name = resp.json().get("name", "")
if not op_name:
webhook_update_task(task_id, "failed", 0, "", error="Veo 응답에 operation name 없음")
return
webhook_update_task(task_id, "processing", 15, "Veo 작업 시작됨")
# 폴링 — GET /v1beta/{operation_name}
video_uri = None
for attempt in range(POLL_MAX_ATTEMPTS):
time.sleep(POLL_INTERVAL)
fetch = requests.get(
f"{GEMINI_BASE_URL}/{op_name}",
headers=_headers(),
timeout=30,
)
if fetch.status_code != 200:
continue
fd = fetch.json()
done = fd.get("done", False)
scaled = min(15 + int((attempt / POLL_MAX_ATTEMPTS) * 65), 79)
webhook_update_task(task_id, "processing", scaled, "Veo 생성 중...")
if done:
if "error" in fd:
webhook_update_task(task_id, "failed", 0, "",
error=f"Veo 작업 실패: {fd['error'].get('message','?')}")
return
# response.generateVideoResponse.generatedSamples[0].video.uri
response = fd.get("response") or {}
gen = response.get("generateVideoResponse") or {}
samples = gen.get("generatedSamples") or []
if not samples:
webhook_update_task(task_id, "failed", 0, "", error="Veo 완료했으나 generatedSamples 비어 있음")
return
video_uri = (samples[0].get("video") or {}).get("uri", "")
break
else:
webhook_update_task(task_id, "failed", 0, "", error="Veo 폴링 timeout (10분)")
return
if not video_uri:
webhook_update_task(task_id, "failed", 0, "", error="Veo 응답에 video.uri 없음")
return
webhook_update_task(task_id, "processing", 85, "Veo 결과 다운로드 중...")
filename = f"{task_id}.mp4"
os.makedirs(VIDEO_MEDIA_ROOT, exist_ok=True)
file_path = os.path.join(VIDEO_MEDIA_ROOT, filename)
# 다운로드 — x-goog-api-key 헤더 그대로 사용 (Gemini API가 인증 처리)
dl = requests.get(video_uri, headers=_headers(), stream=True, timeout=300)
dl.raise_for_status()
with open(file_path, "wb") as f:
for chunk in dl.iter_content(chunk_size=8192):
f.write(chunk)
local_url = f"{VIDEO_MEDIA_URL_PREFIX}/{filename}"
webhook_update_task(task_id, "succeeded", 100, "Veo 생성 완료", video_url=local_url)
except requests.Timeout:
webhook_update_task(task_id, "failed", 0, "", error="Veo API 타임아웃")
except Exception as e:
logger.exception("Veo generation error task=%s", task_id)
webhook_update_task(task_id, "failed", 0, "", error=str(e))