feat(video-lab): /api/internal/video/update endpoint + tests (SP-8)

UpdatePayload schema (task_id/status/progress/message/video_url/error).
404 if task not found. insta/music-lab과 동일 패턴 + video_url 필드.
Plan-B-Video Phase 1.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-19 08:29:05 +09:00
parent 21cf0114f4
commit e8dbf8092a
2 changed files with 141 additions and 0 deletions

View File

@@ -0,0 +1,52 @@
"""SP-8 — Windows video-render → NAS video-lab internal webhook.
POST /api/internal/video/update
- X-Internal-Key 인증 필수
- video_tasks row update (status, progress, message, video_url, error)
"""
from __future__ import annotations
import logging
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel, Field
from . import db
from .auth import verify_internal_key
logger = logging.getLogger(__name__)
router = APIRouter()
class UpdatePayload(BaseModel):
task_id: str
status: str = Field(..., description="processing|succeeded|failed")
progress: int = Field(..., ge=0, le=100)
message: str = ""
video_url: Optional[str] = None
error: Optional[str] = None
@router.post(
"/api/internal/video/update",
dependencies=[Depends(verify_internal_key)],
)
def video_update(payload: UpdatePayload):
task = db.get_task(payload.task_id)
if task is None:
raise HTTPException(404, f"task not found: {payload.task_id}")
db.update_task(
payload.task_id,
payload.status,
payload.progress,
message=payload.message,
video_url=payload.video_url,
error=payload.error,
)
logger.info(
"internal/video/update task=%s status=%s progress=%d",
payload.task_id, payload.status, payload.progress,
)
return {"ok": True}