"""Veo 3.1 video generation — Gemini API (ai.google.dev). POST https://generativelanguage.googleapis.com/v1beta/models/{MODEL}:predictLongRunning GET https://generativelanguage.googleapis.com/v1beta/{operation_name} → done=true 시 response.generateVideoResponse.generatedSamples[0].video.uri 다운로드 """ from __future__ import annotations import logging import os import time from typing import Optional import requests from nas_client import webhook_update_task logger = logging.getLogger(__name__) GEMINI_BASE_URL = "https://generativelanguage.googleapis.com/v1beta" VIDEO_MEDIA_ROOT = os.getenv("VIDEO_MEDIA_ROOT", "/mnt/nas/webpage/data/video") VIDEO_MEDIA_URL_PREFIX = os.getenv("VIDEO_MEDIA_URL_PREFIX", "/media/video") POLL_INTERVAL = 10 # Veo는 30~120초 소요 POLL_MAX_ATTEMPTS = 60 # 최대 ~10분 DEFAULT_MODEL = "veo-3.1-fast-generate-preview" def _headers() -> dict: api_key = os.getenv("GEMINI_API_KEY", "") return { "x-goog-api-key": api_key, "Content-Type": "application/json", } def run_veo_generation(task_id: str, params: dict) -> None: """Veo로 영상 생성 → mp4 → NAS SMB → webhook.""" try: if not os.getenv("GEMINI_API_KEY"): webhook_update_task(task_id, "failed", 0, "", error="GEMINI_API_KEY 미설정 (Windows .env)") return webhook_update_task(task_id, "processing", 5, "Veo (Gemini API) 호출 중...") model_id = params.get("model") or DEFAULT_MODEL body = { "instances": [{"prompt": params["prompt"]}], "parameters": { "aspectRatio": params.get("aspect_ratio") or "16:9", }, } # numberOfVideos는 일부 모델(veo-3.0-fast 등) 미지원 — 호출자 명시 시에만 추가 if params.get("number_of_videos"): body["parameters"]["numberOfVideos"] = int(params["number_of_videos"]) if params.get("duration"): body["parameters"]["durationSeconds"] = str(params["duration"]) if params.get("resolution"): body["parameters"]["resolution"] = params["resolution"] if params.get("negative_prompt"): body["parameters"]["negativePrompt"] = params["negative_prompt"] if params.get("person_generation"): body["parameters"]["personGeneration"] = params["person_generation"] resp = requests.post( f"{GEMINI_BASE_URL}/models/{model_id}:predictLongRunning", headers=_headers(), json=body, timeout=30, ) if resp.status_code != 200: webhook_update_task(task_id, "failed", 0, "", error=f"Veo Gemini API 오류: {resp.status_code} {resp.text[:300]}") return op_name = resp.json().get("name", "") if not op_name: webhook_update_task(task_id, "failed", 0, "", error="Veo 응답에 operation name 없음") return webhook_update_task(task_id, "processing", 15, "Veo 작업 시작됨") # 폴링 — GET /v1beta/{operation_name} video_uri = None for attempt in range(POLL_MAX_ATTEMPTS): time.sleep(POLL_INTERVAL) fetch = requests.get( f"{GEMINI_BASE_URL}/{op_name}", headers=_headers(), timeout=30, ) if fetch.status_code != 200: continue fd = fetch.json() done = fd.get("done", False) scaled = min(15 + int((attempt / POLL_MAX_ATTEMPTS) * 65), 79) webhook_update_task(task_id, "processing", scaled, "Veo 생성 중...") if done: if "error" in fd: webhook_update_task(task_id, "failed", 0, "", error=f"Veo 작업 실패: {fd['error'].get('message','?')}") return # response.generateVideoResponse.generatedSamples[0].video.uri response = fd.get("response") or {} gen = response.get("generateVideoResponse") or {} samples = gen.get("generatedSamples") or [] if not samples: webhook_update_task(task_id, "failed", 0, "", error="Veo 완료했으나 generatedSamples 비어 있음") return video_uri = (samples[0].get("video") or {}).get("uri", "") break else: webhook_update_task(task_id, "failed", 0, "", error="Veo 폴링 timeout (10분)") return if not video_uri: webhook_update_task(task_id, "failed", 0, "", error="Veo 응답에 video.uri 없음") return webhook_update_task(task_id, "processing", 85, "Veo 결과 다운로드 중...") filename = f"{task_id}.mp4" os.makedirs(VIDEO_MEDIA_ROOT, exist_ok=True) file_path = os.path.join(VIDEO_MEDIA_ROOT, filename) # 다운로드 — x-goog-api-key 헤더 그대로 사용 (Gemini API가 인증 처리) dl = requests.get(video_uri, headers=_headers(), stream=True, timeout=300) dl.raise_for_status() with open(file_path, "wb") as f: for chunk in dl.iter_content(chunk_size=8192): f.write(chunk) local_url = f"{VIDEO_MEDIA_URL_PREFIX}/{filename}" webhook_update_task(task_id, "succeeded", 100, "Veo 생성 완료", video_url=local_url) except requests.Timeout: webhook_update_task(task_id, "failed", 0, "", error="Veo API 타임아웃") except Exception as e: logger.exception("Veo generation error task=%s", task_id) webhook_update_task(task_id, "failed", 0, "", error=str(e))