fix(video-render): F6 ReliableQueue 적용 (F6 part 4)
- worker.py: poll_once + ReliableQueue + startup recovery - 4 provider (sora/veo/kling/seedance) dispatch table 보존 - Dockerfile: build context=services/, _shared 포함, PYTHONPATH=/app - docker-compose.yml: video-render build context 갱신 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -1,7 +1,7 @@
|
||||
"""Redis BLPOP worker — queue:video-render → job_type 디스패치 → NAS webhook.
|
||||
"""Redis ReliableQueue worker — F6 신뢰성 패턴 (BLMOVE + ack/fail + recovery).
|
||||
|
||||
queue:paused 가 set이면 대기 (task-watcher가 박재오 활동 감지 시 set).
|
||||
Plan-B-Music worker.py 패턴 — string-based dispatch + getattr (테스트 patch 호환).
|
||||
string-based dispatch + getattr (테스트 patch 호환).
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
@@ -18,6 +18,7 @@ from providers.sora import run_sora_generation
|
||||
from providers.veo import run_veo_generation
|
||||
from providers.kling import run_kling_generation
|
||||
from providers.seedance import run_seedance_generation
|
||||
from _shared.reliable_queue import ReliableQueue
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -53,25 +54,42 @@ def _dispatch(payload: dict) -> None:
|
||||
fn(task_id, params)
|
||||
|
||||
|
||||
async def poll_once(queue: ReliableQueue) -> bool:
|
||||
"""F6 — 1 cycle: dequeue → _dispatch → ack/fail. Returns True if a job handled."""
|
||||
result = await queue.dequeue(timeout=5)
|
||||
if result is None:
|
||||
return False
|
||||
payload, raw = result
|
||||
try:
|
||||
await asyncio.to_thread(_dispatch, payload)
|
||||
except Exception:
|
||||
logger.exception("dispatch unhandled exception task_id=%s",
|
||||
payload.get("task_id"))
|
||||
await queue.fail(raw, payload)
|
||||
return True
|
||||
await queue.ack(raw)
|
||||
return True
|
||||
|
||||
|
||||
async def worker_loop():
|
||||
redis = aioredis.from_url(REDIS_URL, decode_responses=False)
|
||||
logger.info("video-render worker started (queue=%s)", QUEUE_KEY)
|
||||
queue = ReliableQueue(redis, queue_key=QUEUE_KEY)
|
||||
logger.info("video-render worker started worker_id=%s queue=%s",
|
||||
queue.worker_id, QUEUE_KEY)
|
||||
try:
|
||||
recovered = await queue.recover()
|
||||
if recovered:
|
||||
logger.info("recovered %d orphaned items at startup", recovered)
|
||||
except Exception:
|
||||
logger.exception("startup recover failed")
|
||||
|
||||
while True:
|
||||
try:
|
||||
paused = await redis.get(PAUSED_KEY)
|
||||
if paused == b"1":
|
||||
await asyncio.sleep(10)
|
||||
continue
|
||||
item = await redis.blpop(QUEUE_KEY, timeout=1)
|
||||
if item is None:
|
||||
continue
|
||||
_, raw = item
|
||||
try:
|
||||
payload = json.loads(raw)
|
||||
except json.JSONDecodeError:
|
||||
logger.error("invalid queue payload: %r", raw[:200])
|
||||
continue
|
||||
await asyncio.to_thread(_dispatch, payload)
|
||||
await poll_once(queue)
|
||||
except asyncio.CancelledError:
|
||||
logger.info("worker_loop cancelled")
|
||||
raise
|
||||
|
||||
Reference in New Issue
Block a user