"""Veo 3.1 video generation — Google Vertex AI. POST .../models/{MODEL}:predictLongRunning → POST :fetchPredictOperation 폴링 → 결과 gs://bucket/path/sample_0.mp4 → google-cloud-storage로 다운로드 → NAS SMB. """ from __future__ import annotations import logging import os import subprocess import time from typing import Optional import requests from nas_client import webhook_update_task logger = logging.getLogger(__name__) VIDEO_MEDIA_ROOT = os.getenv("VIDEO_MEDIA_ROOT", "/mnt/nas/webpage/data/video") VIDEO_MEDIA_URL_PREFIX = os.getenv("VIDEO_MEDIA_URL_PREFIX", "/media/video") POLL_INTERVAL = 12 # Veo는 30~120초 소요 POLL_MAX_ATTEMPTS = 50 # 최대 ~10분 DEFAULT_MODEL = "veo-3.1-fast-generate-001" def _gcloud_access_token() -> Optional[str]: """GOOGLE_APPLICATION_CREDENTIALS service account JSON으로 access token 발행. google-auth가 컨테이너 안에서 자동 인증 — Bearer 토큰을 GCS SDK가 직접 사용. REST API 호출용으로는 명시적 token이 필요 → google.auth로 발행. """ try: from google.auth import default as google_default_auth from google.auth.transport.requests import Request as GoogleAuthRequest credentials, _ = google_default_auth( scopes=["https://www.googleapis.com/auth/cloud-platform"], ) credentials.refresh(GoogleAuthRequest()) return credentials.token except Exception: logger.exception("Google credentials refresh 실패") return None def _download_gcs(gcs_uri: str, local_path: str) -> bool: """gs://bucket/path/file.mp4 → local_path 다운로드. 성공 여부 반환.""" try: from google.cloud import storage as gcs_storage if not gcs_uri.startswith("gs://"): return False without_scheme = gcs_uri[len("gs://"):] bucket_name, blob_path = without_scheme.split("/", 1) client = gcs_storage.Client(project=os.getenv("GOOGLE_PROJECT_ID")) bucket = client.bucket(bucket_name) blob = bucket.blob(blob_path) blob.download_to_filename(local_path) return True except Exception: logger.exception("GCS 다운로드 실패: %s", gcs_uri) return False def run_veo_generation(task_id: str, params: dict) -> None: """Veo 3.1로 영상 생성 → GCS → NAS SMB → webhook.""" try: project_id = os.getenv("GOOGLE_PROJECT_ID", "") location = os.getenv("GOOGLE_LOCATION", "us-central1") gcs_bucket = os.getenv("GOOGLE_GCS_BUCKET", "") if not project_id or not gcs_bucket: webhook_update_task(task_id, "failed", 0, "", error="GOOGLE_PROJECT_ID 또는 GOOGLE_GCS_BUCKET 미설정") return token = _gcloud_access_token() if not token: webhook_update_task(task_id, "failed", 0, "", error="Google access token 발행 실패 (서비스 계정 JSON 확인)") return webhook_update_task(task_id, "processing", 5, "Veo API 호출 중...") model_id = params.get("model") or DEFAULT_MODEL endpoint_base = ( f"https://{location}-aiplatform.googleapis.com/v1/projects/{project_id}" f"/locations/{location}/publishers/google/models/{model_id}" ) headers = { "Authorization": f"Bearer {token}", "Content-Type": "application/json", } body = { "instances": [{"prompt": params["prompt"]}], "parameters": { "storageUri": f"gs://{gcs_bucket}/veo/{task_id}/", "sampleCount": 1, "aspectRatio": params.get("aspect_ratio") or "16:9", }, } if params.get("duration"): body["parameters"]["duration"] = params["duration"] if params.get("negative_prompt"): body["parameters"]["negativePrompt"] = params["negative_prompt"] resp = requests.post(f"{endpoint_base}:predictLongRunning", headers=headers, json=body, timeout=30) if resp.status_code != 200: webhook_update_task(task_id, "failed", 0, "", error=f"Veo API 오류: {resp.status_code} {resp.text[:300]}") return op_name = resp.json().get("name", "") if not op_name: webhook_update_task(task_id, "failed", 0, "", error="Veo 응답에 operation name 없음") return webhook_update_task(task_id, "processing", 15, f"Veo 작업 시작됨") # 폴링 — fetchPredictOperation gcs_uri = None for attempt in range(POLL_MAX_ATTEMPTS): time.sleep(POLL_INTERVAL) fetch = requests.post( f"{endpoint_base}:fetchPredictOperation", headers=headers, json={"operationName": op_name}, timeout=30, ) if fetch.status_code != 200: continue fd = fetch.json() done = fd.get("done", False) scaled = min(15 + int((attempt / POLL_MAX_ATTEMPTS) * 65), 79) webhook_update_task(task_id, "processing", scaled, "Veo 생성 중...") if done: if "error" in fd: webhook_update_task(task_id, "failed", 0, "", error=f"Veo 작업 실패: {fd['error'].get('message','?')}") return videos = (fd.get("response") or {}).get("videos") or [] if not videos: webhook_update_task(task_id, "failed", 0, "", error="Veo 완료했으나 videos 비어 있음") return gcs_uri = videos[0].get("gcsUri", "") break else: webhook_update_task(task_id, "failed", 0, "", error="Veo 폴링 timeout (10분)") return if not gcs_uri: webhook_update_task(task_id, "failed", 0, "", error="Veo 응답에 gcsUri 없음") return webhook_update_task(task_id, "processing", 85, "GCS에서 mp4 다운로드 중...") filename = f"{task_id}.mp4" os.makedirs(VIDEO_MEDIA_ROOT, exist_ok=True) file_path = os.path.join(VIDEO_MEDIA_ROOT, filename) ok = _download_gcs(gcs_uri, file_path) if not ok: webhook_update_task(task_id, "failed", 0, "", error=f"GCS 다운로드 실패: {gcs_uri}") return local_url = f"{VIDEO_MEDIA_URL_PREFIX}/{filename}" webhook_update_task(task_id, "succeeded", 100, "Veo 생성 완료", video_url=local_url) except requests.Timeout: webhook_update_task(task_id, "failed", 0, "", error="Veo API 타임아웃") except Exception as e: logger.exception("Veo generation error task=%s", task_id) webhook_update_task(task_id, "failed", 0, "", error=str(e))