fix(video-render): Veo Vertex AI → Gemini API (T10 follow-up)

박재오 발견: Veo는 Gemini API key 단일로 충분 (ai.google.dev).
Vertex AI의 GCP project + service account JSON + GCS bucket 셋업 불필요.

변경:
- providers/veo.py: generativelanguage.googleapis.com/v1beta endpoint
  + x-goog-api-key 헤더 + response.generateVideoResponse.generatedSamples[0].video.uri
- .env.example: GOOGLE_PROJECT_ID/LOCATION/GCS_BUCKET/SA_JSON 4 변수 → GEMINI_API_KEY 1개
- docker-compose: GCP 4 env + SA JSON volume mount 제거, GEMINI_API_KEY 추가
- requirements.txt: google-cloud-storage 제거 (requests만 사용)

박재오 측 영향: /etc/webai/gcp-sa.json 더미 파일 + GCP_SA_JSON_HOST_PATH env 무관.
GEMINI_API_KEY 1개만 발급하여 .env에 추가하면 됨.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-20 02:32:11 +09:00
parent 4db0551d33
commit 8aa3f1c3b2
4 changed files with 54 additions and 101 deletions

View File

@@ -63,17 +63,13 @@ services:
- NAS_BASE_URL=${NAS_BASE_URL:-http://192.168.45.54:18801} - NAS_BASE_URL=${NAS_BASE_URL:-http://192.168.45.54:18801}
- INTERNAL_API_KEY=${INTERNAL_API_KEY:-} - INTERNAL_API_KEY=${INTERNAL_API_KEY:-}
- OPENAI_API_KEY=${OPENAI_API_KEY:-} - OPENAI_API_KEY=${OPENAI_API_KEY:-}
- GOOGLE_PROJECT_ID=${GOOGLE_PROJECT_ID:-} - GEMINI_API_KEY=${GEMINI_API_KEY:-}
- GOOGLE_LOCATION=${GOOGLE_LOCATION:-us-central1}
- GOOGLE_GCS_BUCKET=${GOOGLE_GCS_BUCKET:-}
- GOOGLE_APPLICATION_CREDENTIALS=/app/keys/gcp-sa.json
- PIAPI_API_KEY=${PIAPI_API_KEY:-} - PIAPI_API_KEY=${PIAPI_API_KEY:-}
- SEEDANCE_API_KEY=${SEEDANCE_API_KEY:-} - SEEDANCE_API_KEY=${SEEDANCE_API_KEY:-}
- VIDEO_MEDIA_ROOT=${VIDEO_MEDIA_ROOT:-/mnt/nas/webpage/data/video} - VIDEO_MEDIA_ROOT=${VIDEO_MEDIA_ROOT:-/mnt/nas/webpage/data/video}
- VIDEO_MEDIA_URL_PREFIX=${VIDEO_MEDIA_URL_PREFIX:-/media/video} - VIDEO_MEDIA_URL_PREFIX=${VIDEO_MEDIA_URL_PREFIX:-/media/video}
volumes: volumes:
- /mnt/nas/webpage/data/video:/mnt/nas/webpage/data/video - /mnt/nas/webpage/data/video:/mnt/nas/webpage/data/video
- ${GCP_SA_JSON_HOST_PATH:-/etc/webai/gcp-sa.json}:/app/keys/gcp-sa.json:ro
healthcheck: healthcheck:
test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"] test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"]
interval: 60s interval: 60s

View File

@@ -10,11 +10,8 @@ INTERNAL_API_KEY=__copy_from_nas_dotenv__
# Sora 2 (OpenAI) # Sora 2 (OpenAI)
OPENAI_API_KEY=__paste_openai_key__ OPENAI_API_KEY=__paste_openai_key__
# Veo 3.1 (Google Vertex AI) # Veo (Google Gemini API — ai.google.dev. Vertex AI 경로 아님, GCS bucket 불필요)
GOOGLE_PROJECT_ID=__paste_gcp_project_id__ GEMINI_API_KEY=__paste_gemini_key__
GOOGLE_LOCATION=us-central1
GOOGLE_GCS_BUCKET=__paste_gcs_bucket_name__
GOOGLE_APPLICATION_CREDENTIALS=/app/keys/gcp-sa.json
# Kling (PiAPI gateway) # Kling (PiAPI gateway)
PIAPI_API_KEY=__paste_piapi_key__ PIAPI_API_KEY=__paste_piapi_key__

View File

@@ -1,13 +1,13 @@
"""Veo 3.1 video generation — Google Vertex AI. """Veo 3.1 video generation — Gemini API (ai.google.dev).
POST .../models/{MODEL}:predictLongRunning → POST :fetchPredictOperation 폴링 → POST https://generativelanguage.googleapis.com/v1beta/models/{MODEL}:predictLongRunning
결과 gs://bucket/path/sample_0.mp4 → google-cloud-storage로 다운로드 → NAS SMB. GET https://generativelanguage.googleapis.com/v1beta/{operation_name}
→ done=true 시 response.generateVideoResponse.generatedSamples[0].video.uri 다운로드
""" """
from __future__ import annotations from __future__ import annotations
import logging import logging
import os import os
import subprocess
import time import time
from typing import Optional from typing import Optional
@@ -17,100 +17,57 @@ from nas_client import webhook_update_task
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
GEMINI_BASE_URL = "https://generativelanguage.googleapis.com/v1beta"
VIDEO_MEDIA_ROOT = os.getenv("VIDEO_MEDIA_ROOT", "/mnt/nas/webpage/data/video") VIDEO_MEDIA_ROOT = os.getenv("VIDEO_MEDIA_ROOT", "/mnt/nas/webpage/data/video")
VIDEO_MEDIA_URL_PREFIX = os.getenv("VIDEO_MEDIA_URL_PREFIX", "/media/video") VIDEO_MEDIA_URL_PREFIX = os.getenv("VIDEO_MEDIA_URL_PREFIX", "/media/video")
POLL_INTERVAL = 12 # Veo는 30~120초 소요 POLL_INTERVAL = 10 # Veo는 30~120초 소요
POLL_MAX_ATTEMPTS = 50 # 최대 ~10분 POLL_MAX_ATTEMPTS = 60 # 최대 ~10분
DEFAULT_MODEL = "veo-3.1-fast-generate-001" DEFAULT_MODEL = "veo-3.1-fast-generate-preview"
def _gcloud_access_token() -> Optional[str]: def _headers() -> dict:
"""GOOGLE_APPLICATION_CREDENTIALS service account JSON으로 access token 발행. api_key = os.getenv("GEMINI_API_KEY", "")
return {
google-auth가 컨테이너 안에서 자동 인증 — Bearer 토큰을 GCS SDK가 직접 사용. "x-goog-api-key": api_key,
REST API 호출용으로는 명시적 token이 필요 → google.auth로 발행. "Content-Type": "application/json",
""" }
try:
from google.auth import default as google_default_auth
from google.auth.transport.requests import Request as GoogleAuthRequest
credentials, _ = google_default_auth(
scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
credentials.refresh(GoogleAuthRequest())
return credentials.token
except Exception:
logger.exception("Google credentials refresh 실패")
return None
def _download_gcs(gcs_uri: str, local_path: str) -> bool:
"""gs://bucket/path/file.mp4 → local_path 다운로드. 성공 여부 반환."""
try:
from google.cloud import storage as gcs_storage
if not gcs_uri.startswith("gs://"):
return False
without_scheme = gcs_uri[len("gs://"):]
bucket_name, blob_path = without_scheme.split("/", 1)
client = gcs_storage.Client(project=os.getenv("GOOGLE_PROJECT_ID"))
bucket = client.bucket(bucket_name)
blob = bucket.blob(blob_path)
blob.download_to_filename(local_path)
return True
except Exception:
logger.exception("GCS 다운로드 실패: %s", gcs_uri)
return False
def run_veo_generation(task_id: str, params: dict) -> None: def run_veo_generation(task_id: str, params: dict) -> None:
"""Veo 3.1로 영상 생성 → GCS → NAS SMB → webhook.""" """Veo로 영상 생성 → mp4 → NAS SMB → webhook."""
try: try:
project_id = os.getenv("GOOGLE_PROJECT_ID", "") if not os.getenv("GEMINI_API_KEY"):
location = os.getenv("GOOGLE_LOCATION", "us-central1") webhook_update_task(task_id, "failed", 0, "", error="GEMINI_API_KEY 미설정 (Windows .env)")
gcs_bucket = os.getenv("GOOGLE_GCS_BUCKET", "")
if not project_id or not gcs_bucket:
webhook_update_task(task_id, "failed", 0, "",
error="GOOGLE_PROJECT_ID 또는 GOOGLE_GCS_BUCKET 미설정")
return return
token = _gcloud_access_token() webhook_update_task(task_id, "processing", 5, "Veo (Gemini API) 호출 중...")
if not token:
webhook_update_task(task_id, "failed", 0, "",
error="Google access token 발행 실패 (서비스 계정 JSON 확인)")
return
webhook_update_task(task_id, "processing", 5, "Veo API 호출 중...")
model_id = params.get("model") or DEFAULT_MODEL model_id = params.get("model") or DEFAULT_MODEL
endpoint_base = (
f"https://{location}-aiplatform.googleapis.com/v1/projects/{project_id}"
f"/locations/{location}/publishers/google/models/{model_id}"
)
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
}
body = { body = {
"instances": [{"prompt": params["prompt"]}], "instances": [{"prompt": params["prompt"]}],
"parameters": { "parameters": {
"storageUri": f"gs://{gcs_bucket}/veo/{task_id}/",
"sampleCount": 1,
"aspectRatio": params.get("aspect_ratio") or "16:9", "aspectRatio": params.get("aspect_ratio") or "16:9",
"numberOfVideos": 1,
}, },
} }
if params.get("duration"): if params.get("duration"):
body["parameters"]["duration"] = params["duration"] body["parameters"]["durationSeconds"] = str(params["duration"])
if params.get("resolution"):
body["parameters"]["resolution"] = params["resolution"]
if params.get("negative_prompt"): if params.get("negative_prompt"):
body["parameters"]["negativePrompt"] = params["negative_prompt"] body["parameters"]["negativePrompt"] = params["negative_prompt"]
if params.get("person_generation"):
body["parameters"]["personGeneration"] = params["person_generation"]
resp = requests.post(f"{endpoint_base}:predictLongRunning", resp = requests.post(
headers=headers, json=body, timeout=30) f"{GEMINI_BASE_URL}/models/{model_id}:predictLongRunning",
headers=_headers(), json=body, timeout=30,
)
if resp.status_code != 200: if resp.status_code != 200:
webhook_update_task(task_id, "failed", 0, "", webhook_update_task(task_id, "failed", 0, "",
error=f"Veo API 오류: {resp.status_code} {resp.text[:300]}") error=f"Veo Gemini API 오류: {resp.status_code} {resp.text[:300]}")
return return
op_name = resp.json().get("name", "") op_name = resp.json().get("name", "")
@@ -118,16 +75,15 @@ def run_veo_generation(task_id: str, params: dict) -> None:
webhook_update_task(task_id, "failed", 0, "", error="Veo 응답에 operation name 없음") webhook_update_task(task_id, "failed", 0, "", error="Veo 응답에 operation name 없음")
return return
webhook_update_task(task_id, "processing", 15, f"Veo 작업 시작됨") webhook_update_task(task_id, "processing", 15, "Veo 작업 시작됨")
# 폴링 — fetchPredictOperation # 폴링 — GET /v1beta/{operation_name}
gcs_uri = None video_uri = None
for attempt in range(POLL_MAX_ATTEMPTS): for attempt in range(POLL_MAX_ATTEMPTS):
time.sleep(POLL_INTERVAL) time.sleep(POLL_INTERVAL)
fetch = requests.post( fetch = requests.get(
f"{endpoint_base}:fetchPredictOperation", f"{GEMINI_BASE_URL}/{op_name}",
headers=headers, headers=_headers(),
json={"operationName": op_name},
timeout=30, timeout=30,
) )
if fetch.status_code != 200: if fetch.status_code != 200:
@@ -142,29 +98,34 @@ def run_veo_generation(task_id: str, params: dict) -> None:
webhook_update_task(task_id, "failed", 0, "", webhook_update_task(task_id, "failed", 0, "",
error=f"Veo 작업 실패: {fd['error'].get('message','?')}") error=f"Veo 작업 실패: {fd['error'].get('message','?')}")
return return
videos = (fd.get("response") or {}).get("videos") or [] # response.generateVideoResponse.generatedSamples[0].video.uri
if not videos: response = fd.get("response") or {}
webhook_update_task(task_id, "failed", 0, "", error="Veo 완료했으나 videos 비어 있음") gen = response.get("generateVideoResponse") or {}
samples = gen.get("generatedSamples") or []
if not samples:
webhook_update_task(task_id, "failed", 0, "", error="Veo 완료했으나 generatedSamples 비어 있음")
return return
gcs_uri = videos[0].get("gcsUri", "") video_uri = (samples[0].get("video") or {}).get("uri", "")
break break
else: else:
webhook_update_task(task_id, "failed", 0, "", error="Veo 폴링 timeout (10분)") webhook_update_task(task_id, "failed", 0, "", error="Veo 폴링 timeout (10분)")
return return
if not gcs_uri: if not video_uri:
webhook_update_task(task_id, "failed", 0, "", error="Veo 응답에 gcsUri 없음") webhook_update_task(task_id, "failed", 0, "", error="Veo 응답에 video.uri 없음")
return return
webhook_update_task(task_id, "processing", 85, "GCS에서 mp4 다운로드 중...") webhook_update_task(task_id, "processing", 85, "Veo 결과 다운로드 중...")
filename = f"{task_id}.mp4" filename = f"{task_id}.mp4"
os.makedirs(VIDEO_MEDIA_ROOT, exist_ok=True) os.makedirs(VIDEO_MEDIA_ROOT, exist_ok=True)
file_path = os.path.join(VIDEO_MEDIA_ROOT, filename) file_path = os.path.join(VIDEO_MEDIA_ROOT, filename)
ok = _download_gcs(gcs_uri, file_path) # 다운로드 — x-goog-api-key 헤더 그대로 사용 (Gemini API가 인증 처리)
if not ok: dl = requests.get(video_uri, headers=_headers(), stream=True, timeout=300)
webhook_update_task(task_id, "failed", 0, "", error=f"GCS 다운로드 실패: {gcs_uri}") dl.raise_for_status()
return with open(file_path, "wb") as f:
for chunk in dl.iter_content(chunk_size=8192):
f.write(chunk)
local_url = f"{VIDEO_MEDIA_URL_PREFIX}/{filename}" local_url = f"{VIDEO_MEDIA_URL_PREFIX}/{filename}"
webhook_update_task(task_id, "succeeded", 100, "Veo 생성 완료", video_url=local_url) webhook_update_task(task_id, "succeeded", 100, "Veo 생성 완료", video_url=local_url)

View File

@@ -4,7 +4,6 @@ requests==2.32.3
redis>=5.0 redis>=5.0
httpx>=0.27 httpx>=0.27
openai>=1.50.0 openai>=1.50.0
google-cloud-storage>=2.18.0
pytest>=8.0 pytest>=8.0
pytest-asyncio>=0.24 pytest-asyncio>=0.24
respx>=0.21 respx>=0.21