"""FastAPI entrypoint for video-lab. POST /api/video/generate — provider + params → Redis push → task_id 반환 GET /api/video/tasks/{id} — DB 조회 """ from __future__ import annotations import json import logging import os import uuid from datetime import datetime, timedelta, timezone from typing import Any, Dict, List, Optional import redis.asyncio as aioredis from fastapi import FastAPI, HTTPException from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel, Field from . import db from .internal_router import router as internal_router logger = logging.getLogger(__name__) CORS_ALLOW_ORIGINS = os.getenv("CORS_ALLOW_ORIGINS", "http://localhost:3007,http://localhost:8080") REDIS_URL = os.getenv("REDIS_URL", "redis://redis:6379") redis_client = aioredis.from_url(REDIS_URL, decode_responses=False) SUPPORTED_PROVIDERS = {"sora", "veo", "kling", "seedance"} app = FastAPI() app.include_router(internal_router) app.add_middleware( CORSMiddleware, allow_origins=[o.strip() for o in CORS_ALLOW_ORIGINS.split(",")], allow_credentials=False, allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS", "PATCH"], allow_headers=["Content-Type"], ) @app.on_event("startup") def on_startup(): db.init_db() @app.get("/health") def health(): return {"ok": True, "service": "video-lab"} @app.get("/api/video/providers") def list_providers(): """4 provider 항상 노출 (key 누락은 worker가 failed 보고).""" return {"providers": [ {"id": "sora", "name": "Sora 2", "models": ["sora-2", "sora-2-pro"], "durations": [8, 16, 20], "sizes": ["1280x720", "1920x1080", "1080x1920", "848x480", "480x848"]}, {"id": "veo", "name": "Veo 3.1", "models": ["veo-3.1-generate-001", "veo-3.1-fast-generate-001"], "durations": [4, 6, 8], "aspect_ratios": ["16:9", "9:16"]}, {"id": "kling", "name": "Kling", "models": ["1.5", "1.6", "2.1", "2.1-master", "2.5", "2.6"], "durations": [5, 10], "aspect_ratios": ["16:9", "9:16", "1:1"]}, {"id": "seedance", "name": "Seedance 2.0", "models": ["seedance-2.0"], "durations": [4, 5, 6, 8, 10, 12, 15], "aspect_ratios": ["16:9", "9:16", "1:1", "4:3"]}, ]} class GenerateRequest(BaseModel): provider: str = Field(..., description="sora|veo|kling|seedance") model: Optional[str] = None prompt: str # Optional 공통 duration: Optional[int] = None aspect_ratio: Optional[str] = None image_url: Optional[str] = None negative_prompt: Optional[str] = None # Provider 별 추가 키는 extra 허용 extra: Optional[Dict[str, Any]] = None class Config: extra = "allow" async def _push_render_job(task_id: str, job_type: str, params: dict) -> None: """Redis queue:video-render에 push.""" kst = timezone(timedelta(hours=9)) payload = { "task_id": task_id, "kind": "video", "job_type": job_type, "params": params, "submitted_at": datetime.now(kst).isoformat(), } await redis_client.rpush("queue:video-render", json.dumps(payload)) @app.post("/api/video/generate") async def generate_video(req: GenerateRequest): """영상 생성 — Redis 큐로 Windows video-render에 위임.""" if req.provider not in SUPPORTED_PROVIDERS: raise HTTPException(400, f"지원하지 않는 provider: {req.provider} (supported: {sorted(SUPPORTED_PROVIDERS)})") task_id = str(uuid.uuid4()) params = req.model_dump(exclude_none=True) db.create_task(task_id, req.provider, params) job_type = f"{req.provider}_generation" # sora_generation, veo_generation, kling_generation, seedance_generation await _push_render_job(task_id, job_type, params) return {"task_id": task_id, "provider": req.provider} @app.get("/api/video/tasks/{task_id}") def get_task_status(task_id: str): t = db.get_task(task_id) if not t: raise HTTPException(404, "task not found") return t