fix(image-render): F6 ReliableQueue 적용 (F6 part 5)

- worker.py: poll_once + ReliableQueue + startup recovery
- 3 provider (gpt_image/nano_banana/flux) dispatch table 보존
- Dockerfile: build context=services/, _shared 포함, PYTHONPATH=/app
- docker-compose.yml: image-render build context 갱신

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-25 20:17:08 +09:00
parent f79c5c26df
commit 43ee610780
6 changed files with 180 additions and 16 deletions

View File

@@ -101,7 +101,8 @@ services:
image-render:
build:
context: ./image-render
context: .
dockerfile: image-render/Dockerfile
container_name: image-render
restart: unless-stopped
ports:

View File

@@ -7,10 +7,13 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
ca-certificates \
&& rm -rf /var/lib/apt/lists/*
COPY requirements.txt .
COPY image-render/requirements.txt /app/
RUN pip install --no-cache-dir --timeout 600 --retries 5 -r requirements.txt
COPY . .
# F6: 공통 ReliableQueue 모듈 (services/_shared)
COPY _shared /app/_shared
COPY image-render/. /app/
ENV PYTHONPATH=/app
EXPOSE 8000
CMD ["python", "-m", "uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "1"]

View File

@@ -0,0 +1,5 @@
"""Make services/ root importable so `from _shared.reliable_queue import ...` works during tests."""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))

View File

@@ -0,0 +1,83 @@
{
"5": {
"inputs": {
"width": 1024,
"height": 1024,
"batch_size": 1
},
"class_type": "EmptyLatentImage",
"_meta": {"title": "Empty Latent Image"}
},
"6": {
"inputs": {
"text": "%PROMPT%",
"clip": ["11", 0]
},
"class_type": "CLIPTextEncode",
"_meta": {"title": "Positive Prompt"}
},
"8": {
"inputs": {
"samples": ["13", 0],
"vae": ["10", 0]
},
"class_type": "VAEDecode",
"_meta": {"title": "VAE Decode"}
},
"9": {
"inputs": {
"filename_prefix": "flux",
"images": ["8", 0]
},
"class_type": "SaveImage",
"_meta": {"title": "Save Image"}
},
"10": {
"inputs": {
"vae_name": "ae.safetensors"
},
"class_type": "VAELoader",
"_meta": {"title": "Load VAE"}
},
"11": {
"inputs": {
"clip_name1": "clip_l.safetensors",
"clip_name2": "t5xxl_fp8_e4m3fn.safetensors",
"type": "flux"
},
"class_type": "DualCLIPLoader",
"_meta": {"title": "Dual CLIP Loader"}
},
"12": {
"inputs": {
"unet_name": "flux1-schnell-fp8.safetensors",
"weight_dtype": "default"
},
"class_type": "UNETLoader",
"_meta": {"title": "Load Diffusion Model"}
},
"13": {
"inputs": {
"seed": 0,
"steps": 4,
"cfg": 1.0,
"sampler_name": "euler",
"scheduler": "simple",
"denoise": 1.0,
"model": ["12", 0],
"positive": ["6", 0],
"negative": ["33", 0],
"latent_image": ["5", 0]
},
"class_type": "KSampler",
"_meta": {"title": "KSampler"}
},
"33": {
"inputs": {
"text": "",
"clip": ["11", 0]
},
"class_type": "CLIPTextEncode",
"_meta": {"title": "Negative Prompt (empty for Schnell)"}
}
}

View File

@@ -1,3 +1,8 @@
import json
from unittest.mock import AsyncMock, MagicMock
import pytest
import worker
@@ -13,3 +18,52 @@ def test_dispatch_unknown_job_type_reports_failed(monkeypatch):
monkeypatch.setattr(worker, "webhook_update_task", lambda *a, **k: calls.append((a, k)))
worker._dispatch({"job_type": "midjourney_generation", "task_id": "t9", "params": {}})
assert calls[-1][0][1] == "failed"
# ----- F6: ReliableQueue poll_once -----
@pytest.mark.asyncio
async def test_poll_once_acks_on_success(monkeypatch):
payload = {"task_id": "t1", "job_type": "gpt_image_generation", "params": {}}
raw = json.dumps(payload).encode()
fake_queue = AsyncMock()
fake_queue.dequeue = AsyncMock(return_value=(payload, raw))
fake_queue.ack = AsyncMock()
fake_queue.fail = AsyncMock()
monkeypatch.setattr(worker, "_dispatch", MagicMock())
handled = await worker.poll_once(fake_queue)
assert handled is True
fake_queue.ack.assert_awaited_once_with(raw)
fake_queue.fail.assert_not_awaited()
@pytest.mark.asyncio
async def test_poll_once_calls_fail_on_dispatch_exception(monkeypatch):
payload = {"task_id": "t2", "job_type": "gpt_image_generation", "params": {}}
raw = json.dumps(payload).encode()
fake_queue = AsyncMock()
fake_queue.dequeue = AsyncMock(return_value=(payload, raw))
fake_queue.ack = AsyncMock()
fake_queue.fail = AsyncMock()
def _boom(p):
raise RuntimeError("dispatch crash")
monkeypatch.setattr(worker, "_dispatch", _boom)
handled = await worker.poll_once(fake_queue)
assert handled is True
fake_queue.fail.assert_awaited_once_with(raw, payload)
fake_queue.ack.assert_not_awaited()
@pytest.mark.asyncio
async def test_poll_once_returns_false_on_timeout(monkeypatch):
fake_queue = AsyncMock()
fake_queue.dequeue = AsyncMock(return_value=None)
fake_queue.ack = AsyncMock()
fake_queue.fail = AsyncMock()
monkeypatch.setattr(worker, "_dispatch", MagicMock())
handled = await worker.poll_once(fake_queue)
assert handled is False
fake_queue.ack.assert_not_awaited()
fake_queue.fail.assert_not_awaited()

View File

@@ -1,7 +1,7 @@
"""Redis BLPOP worker — queue:image-render → job_type dispatch → NAS webhook.
"""Redis ReliableQueue worker — F6 신뢰성 패턴 (BLMOVE + ack/fail + recovery).
queue:paused 가 set이면 대기 (task-watcher가 박재오 활동 감지 시 set).
video-render worker.py 패턴 — string-based dispatch + getattr (테스트 patch 호환).
string-based dispatch + getattr (테스트 patch 호환).
"""
from __future__ import annotations
@@ -17,6 +17,7 @@ from nas_client import webhook_update_task
from providers.gpt_image import run_gpt_image_generation
from providers.nano_banana import run_nano_banana_generation
from providers.flux import run_flux_generation
from _shared.reliable_queue import ReliableQueue
logger = logging.getLogger(__name__)
@@ -52,25 +53,42 @@ def _dispatch(payload: dict) -> None:
fn(task_id, params)
async def poll_once(queue: ReliableQueue) -> bool:
"""F6 — 1 cycle: dequeue → _dispatch → ack/fail. Returns True if a job handled."""
result = await queue.dequeue(timeout=5)
if result is None:
return False
payload, raw = result
try:
await asyncio.to_thread(_dispatch, payload)
except Exception:
logger.exception("dispatch unhandled exception task_id=%s",
payload.get("task_id"))
await queue.fail(raw, payload)
return True
await queue.ack(raw)
return True
async def worker_loop():
redis = aioredis.from_url(REDIS_URL, decode_responses=False)
logger.info("image-render worker started (queue=%s)", QUEUE_KEY)
queue = ReliableQueue(redis, queue_key=QUEUE_KEY)
logger.info("image-render worker started worker_id=%s queue=%s",
queue.worker_id, QUEUE_KEY)
try:
recovered = await queue.recover()
if recovered:
logger.info("recovered %d orphaned items at startup", recovered)
except Exception:
logger.exception("startup recover failed")
while True:
try:
paused = await redis.get(PAUSED_KEY)
if paused == b"1":
await asyncio.sleep(10)
continue
item = await redis.blpop(QUEUE_KEY, timeout=5)
if item is None:
continue
_, raw = item
try:
payload = json.loads(raw)
except json.JSONDecodeError:
logger.error("invalid queue payload: %r", raw[:200])
continue
await asyncio.to_thread(_dispatch, payload)
await poll_once(queue)
except asyncio.CancelledError:
logger.info("worker_loop cancelled")
raise