From 17ed1943f1bbae3271e229e615252a38e5523607 Mon Sep 17 00:00:00 2001 From: gahusb Date: Tue, 19 May 2026 08:38:51 +0900 Subject: [PATCH] =?UTF-8?q?feat(video-render):=20providers/kling.py=20?= =?UTF-8?q?=E2=80=94=20Kling=20AI=20via=20PiAPI=20gateway=20(SP-7)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit POST /api/v1/task (model=kling, task_type=video_generation) → GET /api/v1/task/{id} 폴링 (10초 × 60) → data.output.video_url 다운로드. x-api-key 헤더. version 1.5/1.6/2.1/2.5/2.6 지원. Plan-B-Video Phase 2. Co-Authored-By: Claude Opus 4.7 (1M context) --- services/video-render/providers/kling.py | 133 +++++++++++++++++++++++ 1 file changed, 133 insertions(+) create mode 100644 services/video-render/providers/kling.py diff --git a/services/video-render/providers/kling.py b/services/video-render/providers/kling.py new file mode 100644 index 0000000..c45e161 --- /dev/null +++ b/services/video-render/providers/kling.py @@ -0,0 +1,133 @@ +"""Kling AI video generation — PiAPI gateway 경유. + +POST https://api.piapi.ai/api/v1/task → GET /api/v1/task/{id} 폴링 → data.output.video_url 다운로드. +""" +from __future__ import annotations + +import logging +import os +import time +from typing import Optional + +import requests + +from nas_client import webhook_update_task + +logger = logging.getLogger(__name__) + +PIAPI_BASE_URL = "https://api.piapi.ai/api/v1" +VIDEO_MEDIA_ROOT = os.getenv("VIDEO_MEDIA_ROOT", "/mnt/nas/webpage/data/video") +VIDEO_MEDIA_URL_PREFIX = os.getenv("VIDEO_MEDIA_URL_PREFIX", "/media/video") + +POLL_INTERVAL = 10 # Kling은 30~180초 +POLL_MAX_ATTEMPTS = 60 # 최대 10분 + +DEFAULT_VERSION = "2.6" + + +def _headers() -> dict: + api_key = os.getenv("PIAPI_API_KEY", "") + return { + "x-api-key": api_key, + "Content-Type": "application/json", + } + + +def run_kling_generation(task_id: str, params: dict) -> None: + """Kling으로 영상 생성 → mp4 → NAS SMB → webhook.""" + try: + if not os.getenv("PIAPI_API_KEY"): + webhook_update_task(task_id, "failed", 0, "", error="PIAPI_API_KEY 미설정") + return + + webhook_update_task(task_id, "processing", 5, "Kling API 호출 중...") + + input_obj = { + "prompt": params["prompt"][:2500], + "duration": params.get("duration", 5), + "aspect_ratio": params.get("aspect_ratio", "16:9"), + "mode": params.get("mode", "std"), + "version": params.get("model") or DEFAULT_VERSION, + } + if params.get("negative_prompt"): + input_obj["negative_prompt"] = params["negative_prompt"][:2500] + if params.get("cfg_scale") is not None: + input_obj["cfg_scale"] = str(params["cfg_scale"]) + if params.get("image_url"): + input_obj["image_url"] = params["image_url"] + + body = { + "model": "kling", + "task_type": "video_generation", + "input": input_obj, + "config": {"service_mode": "public"}, + } + + resp = requests.post(f"{PIAPI_BASE_URL}/task", headers=_headers(), json=body, timeout=30) + if resp.status_code != 200: + webhook_update_task(task_id, "failed", 0, "", + error=f"Kling/PiAPI 오류: {resp.status_code} {resp.text[:300]}") + return + + body_json = resp.json() + if body_json.get("code") != 200: + webhook_update_task(task_id, "failed", 0, "", + error=f"Kling/PiAPI 거부: {body_json.get('message', '?')}") + return + + piapi_task_id = (body_json.get("data") or {}).get("task_id", "") + if not piapi_task_id: + webhook_update_task(task_id, "failed", 0, "", error="Kling/PiAPI 응답에 task_id 없음") + return + + webhook_update_task(task_id, "processing", 15, "Kling 작업 등록됨") + + # 폴링 — GET /task/{id} + video_url = None + for attempt in range(POLL_MAX_ATTEMPTS): + time.sleep(POLL_INTERVAL) + fetch = requests.get(f"{PIAPI_BASE_URL}/task/{piapi_task_id}", + headers=_headers(), timeout=30) + if fetch.status_code != 200: + continue + fd = fetch.json() + data = fd.get("data", {}) + status = data.get("status", "") + scaled = min(15 + int((attempt / POLL_MAX_ATTEMPTS) * 65), 79) + webhook_update_task(task_id, "processing", scaled, f"Kling 생성 중... ({status})") + + if status == "Completed": + video_url = (data.get("output") or {}).get("video_url", "") + break + elif status in ("Failed", "failed"): + err = (data.get("error") or {}).get("message", "Kling 작업 실패") + webhook_update_task(task_id, "failed", 0, "", error=err) + return + # Pending/Processing/Staged → 계속 폴링 + else: + webhook_update_task(task_id, "failed", 0, "", error="Kling 폴링 timeout (10분)") + return + + if not video_url: + webhook_update_task(task_id, "failed", 0, "", error="Kling 완료했으나 video_url 없음") + return + + webhook_update_task(task_id, "processing", 85, "Kling 결과 다운로드 중...") + filename = f"{task_id}.mp4" + os.makedirs(VIDEO_MEDIA_ROOT, exist_ok=True) + file_path = os.path.join(VIDEO_MEDIA_ROOT, filename) + + dl = requests.get(video_url, stream=True, timeout=300) + dl.raise_for_status() + with open(file_path, "wb") as f: + for chunk in dl.iter_content(chunk_size=8192): + f.write(chunk) + + local_url = f"{VIDEO_MEDIA_URL_PREFIX}/{filename}" + webhook_update_task(task_id, "succeeded", 100, "Kling 생성 완료", video_url=local_url) + + except requests.Timeout: + webhook_update_task(task_id, "failed", 0, "", error="Kling/PiAPI 타임아웃") + except Exception as e: + logger.exception("Kling generation error task=%s", task_id) + webhook_update_task(task_id, "failed", 0, "", error=str(e))