diff --git a/services/docker-compose.yml b/services/docker-compose.yml index 36274e3..cb4d618 100644 --- a/services/docker-compose.yml +++ b/services/docker-compose.yml @@ -64,7 +64,8 @@ services: - INTERNAL_API_KEY=${INTERNAL_API_KEY:-} - OPENAI_API_KEY=${OPENAI_API_KEY:-} - GEMINI_API_KEY=${GEMINI_API_KEY:-} - - PIAPI_API_KEY=${PIAPI_API_KEY:-} + - KLING_ACCESS_KEY=${KLING_ACCESS_KEY:-} + - KLING_SECRET_KEY=${KLING_SECRET_KEY:-} - SEEDANCE_API_KEY=${SEEDANCE_API_KEY:-} - VIDEO_MEDIA_ROOT=${VIDEO_MEDIA_ROOT:-/mnt/nas/webpage/data/video} - VIDEO_MEDIA_URL_PREFIX=${VIDEO_MEDIA_URL_PREFIX:-/media/video} diff --git a/services/video-render/.env.example b/services/video-render/.env.example index 459b796..a92d8fd 100644 --- a/services/video-render/.env.example +++ b/services/video-render/.env.example @@ -13,8 +13,9 @@ OPENAI_API_KEY=__paste_openai_key__ # Veo (Google Gemini API — ai.google.dev. Vertex AI 경로 아님, GCS bucket 불필요) GEMINI_API_KEY=__paste_gemini_key__ -# Kling (PiAPI gateway) -PIAPI_API_KEY=__paste_piapi_key__ +# Kling (Native KlingAI — JWT auth with Access Key + Secret Key) +KLING_ACCESS_KEY=__paste_kling_access_key__ +KLING_SECRET_KEY=__paste_kling_secret_key__ # Seedance 2.0 (BytePlus) SEEDANCE_API_KEY=__paste_seedance_key__ diff --git a/services/video-render/providers/kling.py b/services/video-render/providers/kling.py index c45e161..d120286 100644 --- a/services/video-render/providers/kling.py +++ b/services/video-render/providers/kling.py @@ -1,6 +1,7 @@ -"""Kling AI video generation — PiAPI gateway 경유. +"""Kling AI video generation — Native KlingAI API (api.klingai.com). -POST https://api.piapi.ai/api/v1/task → GET /api/v1/task/{id} 폴링 → data.output.video_url 다운로드. +JWT auth: HS256, payload {iss: ACCESS_KEY, exp: now+1800, nbf: now-5}. +POST /v1/videos/text2video → GET /v1/videos/text2video/{task_id} → task_result.videos[0].url 다운로드. """ from __future__ import annotations @@ -9,26 +10,41 @@ import os import time from typing import Optional +import jwt as pyjwt import requests from nas_client import webhook_update_task logger = logging.getLogger(__name__) -PIAPI_BASE_URL = "https://api.piapi.ai/api/v1" +KLING_BASE_URL = "https://api.klingai.com" VIDEO_MEDIA_ROOT = os.getenv("VIDEO_MEDIA_ROOT", "/mnt/nas/webpage/data/video") VIDEO_MEDIA_URL_PREFIX = os.getenv("VIDEO_MEDIA_URL_PREFIX", "/media/video") -POLL_INTERVAL = 10 # Kling은 30~180초 -POLL_MAX_ATTEMPTS = 60 # 최대 10분 +POLL_INTERVAL = 10 +POLL_MAX_ATTEMPTS = 60 # 최대 ~10분 -DEFAULT_VERSION = "2.6" +DEFAULT_MODEL = "kling-v1-6" + +JWT_EXP_SECONDS = 1800 # 30분 +JWT_NBF_OFFSET = -5 # 5초 뒤로 + + +def _generate_jwt() -> Optional[str]: + access_key = os.getenv("KLING_ACCESS_KEY", "") + secret_key = os.getenv("KLING_SECRET_KEY", "") + if not access_key or not secret_key: + return None + now = int(time.time()) + headers = {"alg": "HS256", "typ": "JWT"} + payload = {"iss": access_key, "exp": now + JWT_EXP_SECONDS, "nbf": now + JWT_NBF_OFFSET} + return pyjwt.encode(payload, secret_key, algorithm="HS256", headers=headers) def _headers() -> dict: - api_key = os.getenv("PIAPI_API_KEY", "") + token = _generate_jwt() return { - "x-api-key": api_key, + "Authorization": f"Bearer {token}" if token else "", "Content-Type": "application/json", } @@ -36,80 +52,83 @@ def _headers() -> dict: def run_kling_generation(task_id: str, params: dict) -> None: """Kling으로 영상 생성 → mp4 → NAS SMB → webhook.""" try: - if not os.getenv("PIAPI_API_KEY"): - webhook_update_task(task_id, "failed", 0, "", error="PIAPI_API_KEY 미설정") + if not os.getenv("KLING_ACCESS_KEY") or not os.getenv("KLING_SECRET_KEY"): + webhook_update_task(task_id, "failed", 0, "", + error="KLING_ACCESS_KEY 또는 KLING_SECRET_KEY 미설정") return webhook_update_task(task_id, "processing", 5, "Kling API 호출 중...") - input_obj = { - "prompt": params["prompt"][:2500], - "duration": params.get("duration", 5), - "aspect_ratio": params.get("aspect_ratio", "16:9"), - "mode": params.get("mode", "std"), - "version": params.get("model") or DEFAULT_VERSION, - } - if params.get("negative_prompt"): - input_obj["negative_prompt"] = params["negative_prompt"][:2500] - if params.get("cfg_scale") is not None: - input_obj["cfg_scale"] = str(params["cfg_scale"]) - if params.get("image_url"): - input_obj["image_url"] = params["image_url"] + # image_url 있으면 image2video, 없으면 text2video + is_image2video = bool(params.get("image_url")) + endpoint_path = "/v1/videos/image2video" if is_image2video else "/v1/videos/text2video" body = { - "model": "kling", - "task_type": "video_generation", - "input": input_obj, - "config": {"service_mode": "public"}, + "model_name": params.get("model") or DEFAULT_MODEL, + "prompt": params["prompt"][:2500], + "duration": str(params.get("duration", 5)), + "aspect_ratio": params.get("aspect_ratio", "16:9"), + "mode": params.get("mode", "std"), } + if params.get("negative_prompt"): + body["negative_prompt"] = params["negative_prompt"][:2500] + if params.get("cfg_scale") is not None: + body["cfg_scale"] = float(params["cfg_scale"]) + if is_image2video: + body["image"] = params["image_url"] - resp = requests.post(f"{PIAPI_BASE_URL}/task", headers=_headers(), json=body, timeout=30) + resp = requests.post(f"{KLING_BASE_URL}{endpoint_path}", + headers=_headers(), json=body, timeout=30) if resp.status_code != 200: webhook_update_task(task_id, "failed", 0, "", - error=f"Kling/PiAPI 오류: {resp.status_code} {resp.text[:300]}") + error=f"Kling API 오류: {resp.status_code} {resp.text[:300]}") return body_json = resp.json() - if body_json.get("code") != 200: + if body_json.get("code") != 0: webhook_update_task(task_id, "failed", 0, "", - error=f"Kling/PiAPI 거부: {body_json.get('message', '?')}") + error=f"Kling API 거부: {body_json.get('message', '?')}") return - piapi_task_id = (body_json.get("data") or {}).get("task_id", "") - if not piapi_task_id: - webhook_update_task(task_id, "failed", 0, "", error="Kling/PiAPI 응답에 task_id 없음") + kling_task_id = (body_json.get("data") or {}).get("task_id", "") + if not kling_task_id: + webhook_update_task(task_id, "failed", 0, "", error="Kling 응답에 task_id 없음") return webhook_update_task(task_id, "processing", 15, "Kling 작업 등록됨") - # 폴링 — GET /task/{id} + # 폴링 — GET /v1/videos/{text2video|image2video}/{task_id} video_url = None for attempt in range(POLL_MAX_ATTEMPTS): time.sleep(POLL_INTERVAL) - fetch = requests.get(f"{PIAPI_BASE_URL}/task/{piapi_task_id}", + fetch = requests.get(f"{KLING_BASE_URL}{endpoint_path}/{kling_task_id}", headers=_headers(), timeout=30) if fetch.status_code != 200: continue fd = fetch.json() - data = fd.get("data", {}) - status = data.get("status", "") + if fd.get("code") != 0: + continue + data = fd.get("data") or {} + status = data.get("task_status", "") scaled = min(15 + int((attempt / POLL_MAX_ATTEMPTS) * 65), 79) webhook_update_task(task_id, "processing", scaled, f"Kling 생성 중... ({status})") - if status == "Completed": - video_url = (data.get("output") or {}).get("video_url", "") + if status == "succeed": + videos = ((data.get("task_result") or {}).get("videos") or []) + if videos: + video_url = videos[0].get("url", "") break - elif status in ("Failed", "failed"): - err = (data.get("error") or {}).get("message", "Kling 작업 실패") + elif status == "failed": + err = data.get("task_status_msg") or "Kling 작업 실패" webhook_update_task(task_id, "failed", 0, "", error=err) return - # Pending/Processing/Staged → 계속 폴링 + # submitted/processing → 계속 폴링 else: webhook_update_task(task_id, "failed", 0, "", error="Kling 폴링 timeout (10분)") return if not video_url: - webhook_update_task(task_id, "failed", 0, "", error="Kling 완료했으나 video_url 없음") + webhook_update_task(task_id, "failed", 0, "", error="Kling 완료했으나 video url 없음") return webhook_update_task(task_id, "processing", 85, "Kling 결과 다운로드 중...") @@ -117,6 +136,7 @@ def run_kling_generation(task_id: str, params: dict) -> None: os.makedirs(VIDEO_MEDIA_ROOT, exist_ok=True) file_path = os.path.join(VIDEO_MEDIA_ROOT, filename) + # Kling 결과 url은 일반적으로 인증 불필요 (signed URL) dl = requests.get(video_url, stream=True, timeout=300) dl.raise_for_status() with open(file_path, "wb") as f: @@ -127,7 +147,7 @@ def run_kling_generation(task_id: str, params: dict) -> None: webhook_update_task(task_id, "succeeded", 100, "Kling 생성 완료", video_url=local_url) except requests.Timeout: - webhook_update_task(task_id, "failed", 0, "", error="Kling/PiAPI 타임아웃") + webhook_update_task(task_id, "failed", 0, "", error="Kling API 타임아웃") except Exception as e: logger.exception("Kling generation error task=%s", task_id) webhook_update_task(task_id, "failed", 0, "", error=str(e)) diff --git a/services/video-render/requirements.txt b/services/video-render/requirements.txt index 631b3a0..a66080c 100644 --- a/services/video-render/requirements.txt +++ b/services/video-render/requirements.txt @@ -4,6 +4,7 @@ requests==2.32.3 redis>=5.0 httpx>=0.27 openai>=1.50.0 +PyJWT>=2.8.0 pytest>=8.0 pytest-asyncio>=0.24 respx>=0.21