"""SP-8 — Windows video-render → NAS video-lab internal webhook. POST /api/internal/video/update - X-Internal-Key 인증 필수 - video_tasks row update (status, progress, message, video_url, error) """ from __future__ import annotations import logging from typing import Optional from fastapi import APIRouter, Depends, HTTPException from pydantic import BaseModel, Field from . import db from .auth import verify_internal_key logger = logging.getLogger(__name__) router = APIRouter() class UpdatePayload(BaseModel): task_id: str status: str = Field(..., description="processing|succeeded|failed") progress: int = Field(..., ge=0, le=100) message: str = "" video_url: Optional[str] = None error: Optional[str] = None @router.post( "/api/internal/video/update", dependencies=[Depends(verify_internal_key)], ) def video_update(payload: UpdatePayload): task = db.get_task(payload.task_id) if task is None: raise HTTPException(404, f"task not found: {payload.task_id}") db.update_task( payload.task_id, payload.status, payload.progress, message=payload.message, video_url=payload.video_url, error=payload.error, ) logger.info( "internal/video/update task=%s status=%s progress=%d", payload.task_id, payload.status, payload.progress, ) return {"ok": True}