From 43ee61078014379b61ef67894160830612cba8af Mon Sep 17 00:00:00 2001 From: gahusb Date: Mon, 25 May 2026 20:17:08 +0900 Subject: [PATCH] =?UTF-8?q?fix(image-render):=20F6=20ReliableQueue=20?= =?UTF-8?q?=EC=A0=81=EC=9A=A9=20(F6=20part=205)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - worker.py: poll_once + ReliableQueue + startup recovery - 3 provider (gpt_image/nano_banana/flux) dispatch table 보존 - Dockerfile: build context=services/, _shared 포함, PYTHONPATH=/app - docker-compose.yml: image-render build context 갱신 Co-Authored-By: Claude Opus 4.7 (1M context) --- services/docker-compose.yml | 3 +- services/image-render/Dockerfile | 7 +- services/image-render/conftest.py | 5 ++ .../image-render/providers/flux_workflow.json | 83 +++++++++++++++++++ services/image-render/tests/test_worker.py | 54 ++++++++++++ services/image-render/worker.py | 44 +++++++--- 6 files changed, 180 insertions(+), 16 deletions(-) create mode 100644 services/image-render/conftest.py create mode 100644 services/image-render/providers/flux_workflow.json diff --git a/services/docker-compose.yml b/services/docker-compose.yml index d14d305..f7b7c76 100644 --- a/services/docker-compose.yml +++ b/services/docker-compose.yml @@ -101,7 +101,8 @@ services: image-render: build: - context: ./image-render + context: . + dockerfile: image-render/Dockerfile container_name: image-render restart: unless-stopped ports: diff --git a/services/image-render/Dockerfile b/services/image-render/Dockerfile index ab72368..ec391d5 100644 --- a/services/image-render/Dockerfile +++ b/services/image-render/Dockerfile @@ -7,10 +7,13 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ ca-certificates \ && rm -rf /var/lib/apt/lists/* -COPY requirements.txt . +COPY image-render/requirements.txt /app/ RUN pip install --no-cache-dir --timeout 600 --retries 5 -r requirements.txt -COPY . . +# F6: 공통 ReliableQueue 모듈 (services/_shared) +COPY _shared /app/_shared +COPY image-render/. /app/ +ENV PYTHONPATH=/app EXPOSE 8000 CMD ["python", "-m", "uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "1"] diff --git a/services/image-render/conftest.py b/services/image-render/conftest.py new file mode 100644 index 0000000..b2e8533 --- /dev/null +++ b/services/image-render/conftest.py @@ -0,0 +1,5 @@ +"""Make services/ root importable so `from _shared.reliable_queue import ...` works during tests.""" +import sys +from pathlib import Path + +sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) diff --git a/services/image-render/providers/flux_workflow.json b/services/image-render/providers/flux_workflow.json new file mode 100644 index 0000000..2dab30f --- /dev/null +++ b/services/image-render/providers/flux_workflow.json @@ -0,0 +1,83 @@ +{ + "5": { + "inputs": { + "width": 1024, + "height": 1024, + "batch_size": 1 + }, + "class_type": "EmptyLatentImage", + "_meta": {"title": "Empty Latent Image"} + }, + "6": { + "inputs": { + "text": "%PROMPT%", + "clip": ["11", 0] + }, + "class_type": "CLIPTextEncode", + "_meta": {"title": "Positive Prompt"} + }, + "8": { + "inputs": { + "samples": ["13", 0], + "vae": ["10", 0] + }, + "class_type": "VAEDecode", + "_meta": {"title": "VAE Decode"} + }, + "9": { + "inputs": { + "filename_prefix": "flux", + "images": ["8", 0] + }, + "class_type": "SaveImage", + "_meta": {"title": "Save Image"} + }, + "10": { + "inputs": { + "vae_name": "ae.safetensors" + }, + "class_type": "VAELoader", + "_meta": {"title": "Load VAE"} + }, + "11": { + "inputs": { + "clip_name1": "clip_l.safetensors", + "clip_name2": "t5xxl_fp8_e4m3fn.safetensors", + "type": "flux" + }, + "class_type": "DualCLIPLoader", + "_meta": {"title": "Dual CLIP Loader"} + }, + "12": { + "inputs": { + "unet_name": "flux1-schnell-fp8.safetensors", + "weight_dtype": "default" + }, + "class_type": "UNETLoader", + "_meta": {"title": "Load Diffusion Model"} + }, + "13": { + "inputs": { + "seed": 0, + "steps": 4, + "cfg": 1.0, + "sampler_name": "euler", + "scheduler": "simple", + "denoise": 1.0, + "model": ["12", 0], + "positive": ["6", 0], + "negative": ["33", 0], + "latent_image": ["5", 0] + }, + "class_type": "KSampler", + "_meta": {"title": "KSampler"} + }, + "33": { + "inputs": { + "text": "", + "clip": ["11", 0] + }, + "class_type": "CLIPTextEncode", + "_meta": {"title": "Negative Prompt (empty for Schnell)"} + } +} diff --git a/services/image-render/tests/test_worker.py b/services/image-render/tests/test_worker.py index 879bb6c..207b737 100644 --- a/services/image-render/tests/test_worker.py +++ b/services/image-render/tests/test_worker.py @@ -1,3 +1,8 @@ +import json +from unittest.mock import AsyncMock, MagicMock + +import pytest + import worker @@ -13,3 +18,52 @@ def test_dispatch_unknown_job_type_reports_failed(monkeypatch): monkeypatch.setattr(worker, "webhook_update_task", lambda *a, **k: calls.append((a, k))) worker._dispatch({"job_type": "midjourney_generation", "task_id": "t9", "params": {}}) assert calls[-1][0][1] == "failed" + + +# ----- F6: ReliableQueue poll_once ----- + +@pytest.mark.asyncio +async def test_poll_once_acks_on_success(monkeypatch): + payload = {"task_id": "t1", "job_type": "gpt_image_generation", "params": {}} + raw = json.dumps(payload).encode() + fake_queue = AsyncMock() + fake_queue.dequeue = AsyncMock(return_value=(payload, raw)) + fake_queue.ack = AsyncMock() + fake_queue.fail = AsyncMock() + monkeypatch.setattr(worker, "_dispatch", MagicMock()) + handled = await worker.poll_once(fake_queue) + assert handled is True + fake_queue.ack.assert_awaited_once_with(raw) + fake_queue.fail.assert_not_awaited() + + +@pytest.mark.asyncio +async def test_poll_once_calls_fail_on_dispatch_exception(monkeypatch): + payload = {"task_id": "t2", "job_type": "gpt_image_generation", "params": {}} + raw = json.dumps(payload).encode() + fake_queue = AsyncMock() + fake_queue.dequeue = AsyncMock(return_value=(payload, raw)) + fake_queue.ack = AsyncMock() + fake_queue.fail = AsyncMock() + + def _boom(p): + raise RuntimeError("dispatch crash") + + monkeypatch.setattr(worker, "_dispatch", _boom) + handled = await worker.poll_once(fake_queue) + assert handled is True + fake_queue.fail.assert_awaited_once_with(raw, payload) + fake_queue.ack.assert_not_awaited() + + +@pytest.mark.asyncio +async def test_poll_once_returns_false_on_timeout(monkeypatch): + fake_queue = AsyncMock() + fake_queue.dequeue = AsyncMock(return_value=None) + fake_queue.ack = AsyncMock() + fake_queue.fail = AsyncMock() + monkeypatch.setattr(worker, "_dispatch", MagicMock()) + handled = await worker.poll_once(fake_queue) + assert handled is False + fake_queue.ack.assert_not_awaited() + fake_queue.fail.assert_not_awaited() diff --git a/services/image-render/worker.py b/services/image-render/worker.py index 963a900..03ab021 100644 --- a/services/image-render/worker.py +++ b/services/image-render/worker.py @@ -1,7 +1,7 @@ -"""Redis BLPOP worker — queue:image-render → job_type dispatch → NAS webhook. +"""Redis ReliableQueue worker — F6 신뢰성 패턴 (BLMOVE + ack/fail + recovery). queue:paused 가 set이면 대기 (task-watcher가 박재오 활동 감지 시 set). -video-render worker.py 패턴 — string-based dispatch + getattr (테스트 patch 호환). +string-based dispatch + getattr (테스트 patch 호환). """ from __future__ import annotations @@ -17,6 +17,7 @@ from nas_client import webhook_update_task from providers.gpt_image import run_gpt_image_generation from providers.nano_banana import run_nano_banana_generation from providers.flux import run_flux_generation +from _shared.reliable_queue import ReliableQueue logger = logging.getLogger(__name__) @@ -52,25 +53,42 @@ def _dispatch(payload: dict) -> None: fn(task_id, params) +async def poll_once(queue: ReliableQueue) -> bool: + """F6 — 1 cycle: dequeue → _dispatch → ack/fail. Returns True if a job handled.""" + result = await queue.dequeue(timeout=5) + if result is None: + return False + payload, raw = result + try: + await asyncio.to_thread(_dispatch, payload) + except Exception: + logger.exception("dispatch unhandled exception task_id=%s", + payload.get("task_id")) + await queue.fail(raw, payload) + return True + await queue.ack(raw) + return True + + async def worker_loop(): redis = aioredis.from_url(REDIS_URL, decode_responses=False) - logger.info("image-render worker started (queue=%s)", QUEUE_KEY) + queue = ReliableQueue(redis, queue_key=QUEUE_KEY) + logger.info("image-render worker started worker_id=%s queue=%s", + queue.worker_id, QUEUE_KEY) + try: + recovered = await queue.recover() + if recovered: + logger.info("recovered %d orphaned items at startup", recovered) + except Exception: + logger.exception("startup recover failed") + while True: try: paused = await redis.get(PAUSED_KEY) if paused == b"1": await asyncio.sleep(10) continue - item = await redis.blpop(QUEUE_KEY, timeout=5) - if item is None: - continue - _, raw = item - try: - payload = json.loads(raw) - except json.JSONDecodeError: - logger.error("invalid queue payload: %r", raw[:200]) - continue - await asyncio.to_thread(_dispatch, payload) + await poll_once(queue) except asyncio.CancelledError: logger.info("worker_loop cancelled") raise