From 8d246b5b322ea5c1427b3fdcea7a2b089814eea5 Mon Sep 17 00:00:00 2001 From: gahusb Date: Tue, 19 May 2026 08:37:45 +0900 Subject: [PATCH] =?UTF-8?q?feat(video-render):=20providers/veo.py=20?= =?UTF-8?q?=E2=80=94=20Veo=203.1=20Vertex=20AI=20client=20(SP-7)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit predictLongRunning → fetchPredictOperation 폴링 (12초 × 50). 결과 gs://bucket/veo/{task_id}/sample_0.mp4 → google-cloud-storage SDK로 다운로드 → NAS SMB. GOOGLE_PROJECT_ID/LOCATION/GCS_BUCKET/APPLICATION_CREDENTIALS env. Plan-B-Video Phase 2. Co-Authored-By: Claude Opus 4.7 (1M context) --- services/video-render/providers/veo.py | 176 +++++++++++++++++++++++++ 1 file changed, 176 insertions(+) create mode 100644 services/video-render/providers/veo.py diff --git a/services/video-render/providers/veo.py b/services/video-render/providers/veo.py new file mode 100644 index 0000000..7f8340f --- /dev/null +++ b/services/video-render/providers/veo.py @@ -0,0 +1,176 @@ +"""Veo 3.1 video generation — Google Vertex AI. + +POST .../models/{MODEL}:predictLongRunning → POST :fetchPredictOperation 폴링 → +결과 gs://bucket/path/sample_0.mp4 → google-cloud-storage로 다운로드 → NAS SMB. +""" +from __future__ import annotations + +import logging +import os +import subprocess +import time +from typing import Optional + +import requests + +from nas_client import webhook_update_task + +logger = logging.getLogger(__name__) + +VIDEO_MEDIA_ROOT = os.getenv("VIDEO_MEDIA_ROOT", "/mnt/nas/webpage/data/video") +VIDEO_MEDIA_URL_PREFIX = os.getenv("VIDEO_MEDIA_URL_PREFIX", "/media/video") + +POLL_INTERVAL = 12 # Veo는 30~120초 소요 +POLL_MAX_ATTEMPTS = 50 # 최대 ~10분 + +DEFAULT_MODEL = "veo-3.1-fast-generate-001" + + +def _gcloud_access_token() -> Optional[str]: + """GOOGLE_APPLICATION_CREDENTIALS service account JSON으로 access token 발행. + + google-auth가 컨테이너 안에서 자동 인증 — Bearer 토큰을 GCS SDK가 직접 사용. + REST API 호출용으로는 명시적 token이 필요 → google.auth로 발행. + """ + try: + from google.auth import default as google_default_auth + from google.auth.transport.requests import Request as GoogleAuthRequest + + credentials, _ = google_default_auth( + scopes=["https://www.googleapis.com/auth/cloud-platform"], + ) + credentials.refresh(GoogleAuthRequest()) + return credentials.token + except Exception: + logger.exception("Google credentials refresh 실패") + return None + + +def _download_gcs(gcs_uri: str, local_path: str) -> bool: + """gs://bucket/path/file.mp4 → local_path 다운로드. 성공 여부 반환.""" + try: + from google.cloud import storage as gcs_storage + if not gcs_uri.startswith("gs://"): + return False + without_scheme = gcs_uri[len("gs://"):] + bucket_name, blob_path = without_scheme.split("/", 1) + client = gcs_storage.Client(project=os.getenv("GOOGLE_PROJECT_ID")) + bucket = client.bucket(bucket_name) + blob = bucket.blob(blob_path) + blob.download_to_filename(local_path) + return True + except Exception: + logger.exception("GCS 다운로드 실패: %s", gcs_uri) + return False + + +def run_veo_generation(task_id: str, params: dict) -> None: + """Veo 3.1로 영상 생성 → GCS → NAS SMB → webhook.""" + try: + project_id = os.getenv("GOOGLE_PROJECT_ID", "") + location = os.getenv("GOOGLE_LOCATION", "us-central1") + gcs_bucket = os.getenv("GOOGLE_GCS_BUCKET", "") + + if not project_id or not gcs_bucket: + webhook_update_task(task_id, "failed", 0, "", + error="GOOGLE_PROJECT_ID 또는 GOOGLE_GCS_BUCKET 미설정") + return + + token = _gcloud_access_token() + if not token: + webhook_update_task(task_id, "failed", 0, "", + error="Google access token 발행 실패 (서비스 계정 JSON 확인)") + return + + webhook_update_task(task_id, "processing", 5, "Veo API 호출 중...") + + model_id = params.get("model") or DEFAULT_MODEL + endpoint_base = ( + f"https://{location}-aiplatform.googleapis.com/v1/projects/{project_id}" + f"/locations/{location}/publishers/google/models/{model_id}" + ) + headers = { + "Authorization": f"Bearer {token}", + "Content-Type": "application/json", + } + body = { + "instances": [{"prompt": params["prompt"]}], + "parameters": { + "storageUri": f"gs://{gcs_bucket}/veo/{task_id}/", + "sampleCount": 1, + "aspectRatio": params.get("aspect_ratio") or "16:9", + }, + } + if params.get("duration"): + body["parameters"]["duration"] = params["duration"] + if params.get("negative_prompt"): + body["parameters"]["negativePrompt"] = params["negative_prompt"] + + resp = requests.post(f"{endpoint_base}:predictLongRunning", + headers=headers, json=body, timeout=30) + if resp.status_code != 200: + webhook_update_task(task_id, "failed", 0, "", + error=f"Veo API 오류: {resp.status_code} {resp.text[:300]}") + return + + op_name = resp.json().get("name", "") + if not op_name: + webhook_update_task(task_id, "failed", 0, "", error="Veo 응답에 operation name 없음") + return + + webhook_update_task(task_id, "processing", 15, f"Veo 작업 시작됨") + + # 폴링 — fetchPredictOperation + gcs_uri = None + for attempt in range(POLL_MAX_ATTEMPTS): + time.sleep(POLL_INTERVAL) + fetch = requests.post( + f"{endpoint_base}:fetchPredictOperation", + headers=headers, + json={"operationName": op_name}, + timeout=30, + ) + if fetch.status_code != 200: + continue + fd = fetch.json() + done = fd.get("done", False) + scaled = min(15 + int((attempt / POLL_MAX_ATTEMPTS) * 65), 79) + webhook_update_task(task_id, "processing", scaled, "Veo 생성 중...") + + if done: + if "error" in fd: + webhook_update_task(task_id, "failed", 0, "", + error=f"Veo 작업 실패: {fd['error'].get('message','?')}") + return + videos = (fd.get("response") or {}).get("videos") or [] + if not videos: + webhook_update_task(task_id, "failed", 0, "", error="Veo 완료했으나 videos 비어 있음") + return + gcs_uri = videos[0].get("gcsUri", "") + break + else: + webhook_update_task(task_id, "failed", 0, "", error="Veo 폴링 timeout (10분)") + return + + if not gcs_uri: + webhook_update_task(task_id, "failed", 0, "", error="Veo 응답에 gcsUri 없음") + return + + webhook_update_task(task_id, "processing", 85, "GCS에서 mp4 다운로드 중...") + filename = f"{task_id}.mp4" + os.makedirs(VIDEO_MEDIA_ROOT, exist_ok=True) + file_path = os.path.join(VIDEO_MEDIA_ROOT, filename) + + ok = _download_gcs(gcs_uri, file_path) + if not ok: + webhook_update_task(task_id, "failed", 0, "", error=f"GCS 다운로드 실패: {gcs_uri}") + return + + local_url = f"{VIDEO_MEDIA_URL_PREFIX}/{filename}" + webhook_update_task(task_id, "succeeded", 100, "Veo 생성 완료", video_url=local_url) + + except requests.Timeout: + webhook_update_task(task_id, "failed", 0, "", error="Veo API 타임아웃") + except Exception as e: + logger.exception("Veo generation error task=%s", task_id) + webhook_update_task(task_id, "failed", 0, "", error=str(e))