"""Kling AI video generation — PiAPI gateway 경유. POST https://api.piapi.ai/api/v1/task → GET /api/v1/task/{id} 폴링 → data.output.video_url 다운로드. """ from __future__ import annotations import logging import os import time from typing import Optional import requests from nas_client import webhook_update_task logger = logging.getLogger(__name__) PIAPI_BASE_URL = "https://api.piapi.ai/api/v1" VIDEO_MEDIA_ROOT = os.getenv("VIDEO_MEDIA_ROOT", "/mnt/nas/webpage/data/video") VIDEO_MEDIA_URL_PREFIX = os.getenv("VIDEO_MEDIA_URL_PREFIX", "/media/video") POLL_INTERVAL = 10 # Kling은 30~180초 POLL_MAX_ATTEMPTS = 60 # 최대 10분 DEFAULT_VERSION = "2.6" def _headers() -> dict: api_key = os.getenv("PIAPI_API_KEY", "") return { "x-api-key": api_key, "Content-Type": "application/json", } def run_kling_generation(task_id: str, params: dict) -> None: """Kling으로 영상 생성 → mp4 → NAS SMB → webhook.""" try: if not os.getenv("PIAPI_API_KEY"): webhook_update_task(task_id, "failed", 0, "", error="PIAPI_API_KEY 미설정") return webhook_update_task(task_id, "processing", 5, "Kling API 호출 중...") input_obj = { "prompt": params["prompt"][:2500], "duration": params.get("duration", 5), "aspect_ratio": params.get("aspect_ratio", "16:9"), "mode": params.get("mode", "std"), "version": params.get("model") or DEFAULT_VERSION, } if params.get("negative_prompt"): input_obj["negative_prompt"] = params["negative_prompt"][:2500] if params.get("cfg_scale") is not None: input_obj["cfg_scale"] = str(params["cfg_scale"]) if params.get("image_url"): input_obj["image_url"] = params["image_url"] body = { "model": "kling", "task_type": "video_generation", "input": input_obj, "config": {"service_mode": "public"}, } resp = requests.post(f"{PIAPI_BASE_URL}/task", headers=_headers(), json=body, timeout=30) if resp.status_code != 200: webhook_update_task(task_id, "failed", 0, "", error=f"Kling/PiAPI 오류: {resp.status_code} {resp.text[:300]}") return body_json = resp.json() if body_json.get("code") != 200: webhook_update_task(task_id, "failed", 0, "", error=f"Kling/PiAPI 거부: {body_json.get('message', '?')}") return piapi_task_id = (body_json.get("data") or {}).get("task_id", "") if not piapi_task_id: webhook_update_task(task_id, "failed", 0, "", error="Kling/PiAPI 응답에 task_id 없음") return webhook_update_task(task_id, "processing", 15, "Kling 작업 등록됨") # 폴링 — GET /task/{id} video_url = None for attempt in range(POLL_MAX_ATTEMPTS): time.sleep(POLL_INTERVAL) fetch = requests.get(f"{PIAPI_BASE_URL}/task/{piapi_task_id}", headers=_headers(), timeout=30) if fetch.status_code != 200: continue fd = fetch.json() data = fd.get("data", {}) status = data.get("status", "") scaled = min(15 + int((attempt / POLL_MAX_ATTEMPTS) * 65), 79) webhook_update_task(task_id, "processing", scaled, f"Kling 생성 중... ({status})") if status == "Completed": video_url = (data.get("output") or {}).get("video_url", "") break elif status in ("Failed", "failed"): err = (data.get("error") or {}).get("message", "Kling 작업 실패") webhook_update_task(task_id, "failed", 0, "", error=err) return # Pending/Processing/Staged → 계속 폴링 else: webhook_update_task(task_id, "failed", 0, "", error="Kling 폴링 timeout (10분)") return if not video_url: webhook_update_task(task_id, "failed", 0, "", error="Kling 완료했으나 video_url 없음") return webhook_update_task(task_id, "processing", 85, "Kling 결과 다운로드 중...") filename = f"{task_id}.mp4" os.makedirs(VIDEO_MEDIA_ROOT, exist_ok=True) file_path = os.path.join(VIDEO_MEDIA_ROOT, filename) dl = requests.get(video_url, stream=True, timeout=300) dl.raise_for_status() with open(file_path, "wb") as f: for chunk in dl.iter_content(chunk_size=8192): f.write(chunk) local_url = f"{VIDEO_MEDIA_URL_PREFIX}/{filename}" webhook_update_task(task_id, "succeeded", 100, "Kling 생성 완료", video_url=local_url) except requests.Timeout: webhook_update_task(task_id, "failed", 0, "", error="Kling/PiAPI 타임아웃") except Exception as e: logger.exception("Kling generation error task=%s", task_id) webhook_update_task(task_id, "failed", 0, "", error=str(e))