- worker.py: poll_once + ReliableQueue + startup recovery - 3 provider (gpt_image/nano_banana/flux) dispatch table 보존 - Dockerfile: build context=services/, _shared 포함, PYTHONPATH=/app - docker-compose.yml: image-render build context 갱신 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
103 lines
3.5 KiB
Python
103 lines
3.5 KiB
Python
"""Redis ReliableQueue worker — F6 신뢰성 패턴 (BLMOVE + ack/fail + recovery).
|
|
|
|
queue:paused 가 set이면 대기 (task-watcher가 박재오 활동 감지 시 set).
|
|
string-based dispatch + getattr (테스트 patch 호환).
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import os
|
|
import sys
|
|
|
|
import redis.asyncio as aioredis
|
|
|
|
from nas_client import webhook_update_task
|
|
from providers.gpt_image import run_gpt_image_generation
|
|
from providers.nano_banana import run_nano_banana_generation
|
|
from providers.flux import run_flux_generation
|
|
from _shared.reliable_queue import ReliableQueue
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
REDIS_URL = os.getenv("REDIS_URL", "redis://192.168.45.54:6379")
|
|
QUEUE_KEY = "queue:image-render"
|
|
PAUSED_KEY = "queue:paused"
|
|
|
|
# string names so `unittest.mock.patch` / `monkeypatch.setattr` on `worker.<name>`
|
|
# is correctly intercepted by getattr(sys.modules[__name__], ...)
|
|
_DISPATCH_TABLE = {
|
|
"gpt_image_generation": "run_gpt_image_generation",
|
|
"nano_banana_generation": "run_nano_banana_generation",
|
|
"flux_generation": "run_flux_generation",
|
|
}
|
|
|
|
|
|
def _dispatch(payload: dict) -> None:
|
|
"""payload[job_type] → provider 함수 호출 (sync, worker_loop에서 asyncio.to_thread로 wrap)."""
|
|
job_type = payload.get("job_type", "")
|
|
task_id = payload.get("task_id", "")
|
|
params = payload.get("params", {})
|
|
fn_name = _DISPATCH_TABLE.get(job_type)
|
|
if fn_name is None:
|
|
logger.error("unknown job_type=%s task=%s", job_type, task_id)
|
|
webhook_update_task(task_id, "failed", 0, "", error=f"unknown job_type: {job_type}")
|
|
return
|
|
try:
|
|
fn = getattr(sys.modules[__name__], fn_name)
|
|
except AttributeError:
|
|
logger.error("dispatch table typo for job_type=%s name=%s task=%s", job_type, fn_name, task_id)
|
|
webhook_update_task(task_id, "failed", 0, "", error=f"internal dispatch error: {fn_name}")
|
|
return
|
|
fn(task_id, params)
|
|
|
|
|
|
async def poll_once(queue: ReliableQueue) -> bool:
|
|
"""F6 — 1 cycle: dequeue → _dispatch → ack/fail. Returns True if a job handled."""
|
|
result = await queue.dequeue(timeout=5)
|
|
if result is None:
|
|
return False
|
|
payload, raw = result
|
|
try:
|
|
await asyncio.to_thread(_dispatch, payload)
|
|
except Exception:
|
|
logger.exception("dispatch unhandled exception task_id=%s",
|
|
payload.get("task_id"))
|
|
await queue.fail(raw, payload)
|
|
return True
|
|
await queue.ack(raw)
|
|
return True
|
|
|
|
|
|
async def worker_loop():
|
|
redis = aioredis.from_url(REDIS_URL, decode_responses=False)
|
|
queue = ReliableQueue(redis, queue_key=QUEUE_KEY)
|
|
logger.info("image-render worker started worker_id=%s queue=%s",
|
|
queue.worker_id, QUEUE_KEY)
|
|
try:
|
|
recovered = await queue.recover()
|
|
if recovered:
|
|
logger.info("recovered %d orphaned items at startup", recovered)
|
|
except Exception:
|
|
logger.exception("startup recover failed")
|
|
|
|
while True:
|
|
try:
|
|
paused = await redis.get(PAUSED_KEY)
|
|
if paused == b"1":
|
|
await asyncio.sleep(10)
|
|
continue
|
|
await poll_once(queue)
|
|
except asyncio.CancelledError:
|
|
logger.info("worker_loop cancelled")
|
|
raise
|
|
except Exception:
|
|
logger.exception("worker_loop iteration 실패, 5초 후 재시도")
|
|
await asyncio.sleep(5)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
logging.basicConfig(level=logging.INFO)
|
|
asyncio.run(worker_loop())
|