Files
ai-trade/services/video-render/worker.py
gahusb f79c5c26df fix(video-render): F6 ReliableQueue 적용 (F6 part 4)
- worker.py: poll_once + ReliableQueue + startup recovery
- 4 provider (sora/veo/kling/seedance) dispatch table 보존
- Dockerfile: build context=services/, _shared 포함, PYTHONPATH=/app
- docker-compose.yml: video-render build context 갱신

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-25 20:16:01 +09:00

99 lines
3.4 KiB
Python

"""Redis ReliableQueue worker — F6 신뢰성 패턴 (BLMOVE + ack/fail + recovery).
queue:paused 가 set이면 대기 (task-watcher가 박재오 활동 감지 시 set).
string-based dispatch + getattr (테스트 patch 호환).
"""
from __future__ import annotations
import asyncio
import json
import logging
import os
import sys
import redis.asyncio as aioredis
from nas_client import webhook_update_task
from providers.sora import run_sora_generation
from providers.veo import run_veo_generation
from providers.kling import run_kling_generation
from providers.seedance import run_seedance_generation
from _shared.reliable_queue import ReliableQueue
logger = logging.getLogger(__name__)
REDIS_URL = os.getenv("REDIS_URL", "redis://192.168.45.54:6379")
QUEUE_KEY = "queue:video-render"
PAUSED_KEY = "queue:paused"
# string names so `unittest.mock.patch` on `worker.<name>` is correctly intercepted
_DISPATCH_TABLE = {
"sora_generation": "run_sora_generation",
"veo_generation": "run_veo_generation",
"kling_generation": "run_kling_generation",
"seedance_generation": "run_seedance_generation",
}
def _dispatch(payload: dict) -> None:
"""payload[job_type] → provider 함수 호출 (sync, worker_loop에서 asyncio.to_thread로 wrap)."""
job_type = payload.get("job_type", "")
task_id = payload.get("task_id", "")
params = payload.get("params", {})
fn_name = _DISPATCH_TABLE.get(job_type)
if fn_name is None:
logger.error("unknown job_type=%s task=%s", job_type, task_id)
webhook_update_task(task_id, "failed", 0, "", error=f"unknown job_type: {job_type}")
return
try:
fn = getattr(sys.modules[__name__], fn_name)
except AttributeError:
logger.error("dispatch table typo for job_type=%s name=%s task=%s", job_type, fn_name, task_id)
webhook_update_task(task_id, "failed", 0, "", error=f"internal dispatch error: {fn_name}")
return
fn(task_id, params)
async def poll_once(queue: ReliableQueue) -> bool:
"""F6 — 1 cycle: dequeue → _dispatch → ack/fail. Returns True if a job handled."""
result = await queue.dequeue(timeout=5)
if result is None:
return False
payload, raw = result
try:
await asyncio.to_thread(_dispatch, payload)
except Exception:
logger.exception("dispatch unhandled exception task_id=%s",
payload.get("task_id"))
await queue.fail(raw, payload)
return True
await queue.ack(raw)
return True
async def worker_loop():
redis = aioredis.from_url(REDIS_URL, decode_responses=False)
queue = ReliableQueue(redis, queue_key=QUEUE_KEY)
logger.info("video-render worker started worker_id=%s queue=%s",
queue.worker_id, QUEUE_KEY)
try:
recovered = await queue.recover()
if recovered:
logger.info("recovered %d orphaned items at startup", recovered)
except Exception:
logger.exception("startup recover failed")
while True:
try:
paused = await redis.get(PAUSED_KEY)
if paused == b"1":
await asyncio.sleep(10)
continue
await poll_once(queue)
except asyncio.CancelledError:
logger.info("worker_loop cancelled")
raise
except Exception:
logger.exception("worker_loop iteration 실패, 5초 후 재시도")
await asyncio.sleep(5)