Render Queue Reliability — Code Review F6 Implementation Plan
For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.
Goal: Fix the job loss that occurs when any of the four render workers (insta/music/video/image-render) crashes right after BLPOP. Use BLMOVE (or BRPOPLPUSH) for atomic dequeue, combined with a processing-list pattern, startup recovery, and retry/dead-letter handling.
Architecture:
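- Each worker has a unique worker_id: <queue>-<hostname>-<pid> (can be overridden via env).
- Atomic dequeue: BLMOVE queue:<x>-render processing:queue:<x>-render:<worker_id> RIGHT LEFT 5, i.e. a 5-second timeout. (BRPOPLPUSH is deprecated as of Redis 6.2; BLMOVE is its successor.)
- On success: LREM processing:queue:<x>-render:<worker_id> 1 <payload> removes exactly one entry.
- On failure: increment the attempts counter in the payload and LPUSH it onto the main queue, which puts it at the back because workers pop from the right. Once attempts reaches the limit (default 3), move it to dead_letter:<queue> instead.
- Startup recovery: if the worker's own processing list is non-empty at startup, push everything back to the main queue for retry, with attempts incremented.
- The NAS-side producer is unchanged (LPUSH as before). The payload schema gains an optional attempts: int field; the producer may leave it out and the worker defaults it to 0.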
Shared module strategy: all four workers follow the same pattern, so create a single services/_shared/reliable_queue.py, add COPY services/_shared /app/_shared to each Dockerfile, and import it with from _shared.reliable_queue import ReliableQueue. This requires four compose entry/Dockerfile changes. (DRY beats four inline copies.)
Tech Stack: Python 3.12, redis.asyncio 5.x, fakeredis (pytest dep), pytest-asyncio.
Working directory: C:\Users\jaeoh\Desktop\workspace\web-ai.
File Map
| File | Change | Responsibility |
|---|---|---|
| services/_shared/__init__.py | Create | package marker (empty) |
| services/_shared/reliable_queue.py | Create | ReliableQueue class: dequeue, ack, fail, recover |
| services/_shared/tests/test_reliable_queue.py | Create | 6 fakeredis unit tests |
| services/_shared/requirements.txt | Create | redis>=5.0, fakeredis (test only) |
| services/insta-render/Dockerfile | Modify | COPY services/_shared /app/_shared + PYTHONPATH |
| services/insta-render/worker.py | Modify (L1 onward) | BLPOP → ReliableQueue |
| services/insta-render/tests/test_worker.py | Append | 1 integration test (recovery) |
| services/music-render/Dockerfile | Modify | shared copy |
| services/music-render/worker.py | Modify | use ReliableQueue |
| services/music-render/tests/test_worker.py | Append | recovery test |
| services/video-render/Dockerfile | Modify | shared copy |
| services/video-render/worker.py | Modify | use ReliableQueue |
| services/video-render/tests/test_worker.py | Append | recovery test |
| services/image-render/Dockerfile | Modify | shared copy |
| services/image-render/worker.py | Modify | use ReliableQueue |
| services/image-render/tests/test_worker.py | Append | recovery test |
| services/docker-compose.yml (if present) | Verify | build context includes the services/ root |
Task 1: Write the shared ReliableQueue module
Files:
- Create: services/_shared/__init__.py
- Create: services/_shared/reliable_queue.py
- Create: services/_shared/tests/__init__.py
- Create: services/_shared/tests/test_reliable_queue.py
- Create: services/_shared/requirements.txt
- Step 1: Create the package marker files
# services/_shared/__init__.py
(empty file)
# services/_shared/tests/__init__.py
(empty file)
- Step 2: Write failing tests first
# services/_shared/tests/test_reliable_queue.py
"""F6: ReliableQueue atomic dequeue + recovery + retry."""
import json

import fakeredis.aioredis
import pytest

from _shared.reliable_queue import ReliableQueue


@pytest.fixture
async def redis():
    r = fakeredis.aioredis.FakeRedis(decode_responses=False)
    yield r
    await r.flushall()
    await r.aclose()


async def test_dequeue_atomically_moves_to_processing(redis):
    """BLMOVE moves the item from queue to processing atomically."""
    q = ReliableQueue(redis, queue_key="queue:test", worker_id="w1")
    await redis.lpush("queue:test", json.dumps({"task_id": "t1"}).encode())
    payload, raw = await q.dequeue(timeout=1)
    assert payload["task_id"] == "t1"
    # The main queue must be empty and the item must sit in the processing list.
    assert await redis.llen("queue:test") == 0
    assert await redis.llen("processing:queue:test:w1") == 1


async def test_dequeue_returns_none_on_timeout(redis):
    q = ReliableQueue(redis, queue_key="queue:test", worker_id="w1")
    result = await q.dequeue(timeout=1)
    assert result is None


async def test_ack_removes_from_processing(redis):
    q = ReliableQueue(redis, queue_key="queue:test", worker_id="w1")
    await redis.lpush("queue:test", json.dumps({"task_id": "t1"}).encode())
    payload, raw = await q.dequeue(timeout=1)
    await q.ack(raw)
    assert await redis.llen("processing:queue:test:w1") == 0


async def test_recover_returns_orphaned_to_main_queue(redis):
    """Startup recovery returns leftover processing-list items to the main queue."""
    # Simulate a previous crash: an item is left in the processing list.
    orphan = json.dumps({"task_id": "t1", "attempts": 0}).encode()
    await redis.lpush("processing:queue:test:w1", orphan)
    q = ReliableQueue(redis, queue_key="queue:test", worker_id="w1")
    recovered = await q.recover()
    assert recovered == 1
    assert await redis.llen("processing:queue:test:w1") == 0
    # The item can be dequeued again.
    payload, raw = await q.dequeue(timeout=1)
    assert payload["task_id"] == "t1"
    assert payload["attempts"] == 1  # incremented on recover


async def test_fail_below_max_attempts_returns_to_main_queue(redis):
    q = ReliableQueue(redis, queue_key="queue:test", worker_id="w1", max_attempts=3)
    await redis.lpush("queue:test", json.dumps({"task_id": "t1", "attempts": 0}).encode())
    payload, raw = await q.dequeue(timeout=1)
    await q.fail(raw, payload)
    assert await redis.llen("processing:queue:test:w1") == 0
    assert await redis.llen("queue:test") == 1
    # attempts must have been incremented
    requeued_raw = await redis.lindex("queue:test", 0)
    requeued = json.loads(requeued_raw)
    assert requeued["attempts"] == 1


async def test_fail_at_max_attempts_moves_to_dead_letter(redis):
    q = ReliableQueue(redis, queue_key="queue:test", worker_id="w1", max_attempts=3)
    await redis.lpush(
        "queue:test", json.dumps({"task_id": "t1", "attempts": 2}).encode()
    )
    payload, raw = await q.dequeue(timeout=1)
    await q.fail(raw, payload)
    # attempts 2 → 3 (== max) → dead-letter
    assert await redis.llen("queue:test") == 0
    assert await redis.llen("processing:queue:test:w1") == 0
    assert await redis.llen("dead_letter:queue:test") == 1
- Step 3: Add requirements
# services/_shared/requirements.txt
redis>=5.0.0
Separate dev requirements (tests):
# services/_shared/tests/requirements-dev.txt (optional)
fakeredis>=2.20.0
pytest>=8.0.0
pytest-asyncio>=0.23.0
- Step 4: Run tests to verify they fail
cd services/_shared && python -m pytest tests/ -v
Expected: ImportError (ModuleNotFoundError), because reliable_queue.py does not exist yet.
- Step 5: Write reliable_queue.py
# services/_shared/reliable_queue.py
"""F6: Reliable Redis queue with processing list + recovery + retry.

Pattern: BLMOVE main → processing (atomic), then either ack (LREM processing) or
fail (LREM processing + re-enqueue or dead-letter).
Startup recovery: any items left in the worker's processing list from a previous
crash are pushed back to main queue (with attempts incremented).
"""
from __future__ import annotations

import json
import logging
import os
import socket
from typing import Optional

logger = logging.getLogger(__name__)


def default_worker_id(queue_key: str) -> str:
    """WORKER_ID env var if set, otherwise <queue_key>-<hostname>-<pid>."""
    explicit = os.getenv("WORKER_ID")
    if explicit:
        return explicit
    return f"{queue_key}-{socket.gethostname()}-{os.getpid()}"


class ReliableQueue:
    """Wraps a redis client to provide BLMOVE-backed atomic dequeue +
    processing list + retry/dead-letter.

    Producer side stays unchanged: LPUSH queue:<x> <json payload>.
    Worker side: dequeue() → process → ack(raw) on success or fail(raw, payload) on error.
    Startup: await queue.recover() to re-enqueue orphans.
    """

    def __init__(
        self,
        redis,
        queue_key: str,
        worker_id: Optional[str] = None,
        max_attempts: int = 3,
    ):
        self._redis = redis
        self._queue_key = queue_key
        self._worker_id = worker_id or default_worker_id(queue_key)
        self._processing_key = f"processing:{queue_key}:{self._worker_id}"
        self._dead_letter_key = f"dead_letter:{queue_key}"
        self._max_attempts = max_attempts

    @property
    def processing_key(self) -> str:
        return self._processing_key

    async def dequeue(self, timeout: int = 5) -> Optional[tuple[dict, bytes]]:
        """Atomically move 1 item from main queue tail to processing head.

        Returns (parsed_dict, raw_bytes) or None on timeout.
        Caller MUST call ack(raw) on success or fail(raw, payload) on error.
        """
        raw = await self._redis.blmove(
            self._queue_key, self._processing_key,
            timeout=timeout, src="RIGHT", dest="LEFT",
        )
        if raw is None:
            return None
        try:
            payload = json.loads(raw)
        except json.JSONDecodeError:
            # Unparseable payloads can never succeed, so skip retries entirely.
            logger.error("invalid payload on dequeue, moving to dead-letter: %r", raw[:200])
            await self._redis.lrem(self._processing_key, 1, raw)
            await self._redis.lpush(self._dead_letter_key, raw)
            return None
        return payload, raw

    async def ack(self, raw: bytes) -> None:
        """Successful processing: remove from processing list."""
        removed = await self._redis.lrem(self._processing_key, 1, raw)
        if removed == 0:
            logger.warning("ack on missing payload (already removed?): %r", raw[:100])

    async def fail(self, raw: bytes, payload: dict) -> None:
        """Failed processing: remove from processing list and either re-enqueue or dead-letter."""
        await self._redis.lrem(self._processing_key, 1, raw)
        attempts = int(payload.get("attempts", 0)) + 1
        if attempts >= self._max_attempts:
            payload["attempts"] = attempts
            await self._redis.lpush(self._dead_letter_key, json.dumps(payload).encode())
            logger.error(
                "task moved to dead-letter after %d attempts: task_id=%s",
                attempts, payload.get("task_id"),
            )
            return
        payload["attempts"] = attempts
        await self._redis.lpush(self._queue_key, json.dumps(payload).encode())
        logger.info(
            "task re-enqueued (attempt %d/%d): task_id=%s",
            attempts, self._max_attempts, payload.get("task_id"),
        )

    async def recover(self) -> int:
        """Startup: move all orphans from this worker's processing list back to main queue.

        Increments attempts counter (orphan == implicit failure).
        Returns count of recovered items.
        """
        count = 0
        while True:
            raw = await self._redis.lpop(self._processing_key)
            if raw is None:
                break
            try:
                payload = json.loads(raw)
            except json.JSONDecodeError:
                # Unparseable orphan: dead-letter it as-is, but still count it.
                await self._redis.lpush(self._dead_letter_key, raw)
                count += 1
                continue
            payload["attempts"] = int(payload.get("attempts", 0)) + 1
            if payload["attempts"] >= self._max_attempts:
                await self._redis.lpush(self._dead_letter_key, json.dumps(payload).encode())
            else:
                await self._redis.lpush(self._queue_key, json.dumps(payload).encode())
            count += 1
        if count:
            logger.info("recovered %d orphaned items for worker %s", count, self._worker_id)
        return count
Note: the redis-py blmove API is client.blmove(first_list, second_list, timeout, src=..., dest=...). timeout=0 blocks forever. Payloads are received as bytes (assuming decode_responses=False).
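Because payloads arrive as bytes and LREM matches elements byte for byte, ack() has to receive the exact bytes that dequeue() returned rather than a re-serialized copy of the parsed dict. The sketch below illustrates this with a plain Python list standing in for the processing list. lrem_one is a hypothetical helper that models LREM with a count of 1, and the compact producer formatting is an assumption made for the demonstration.

```python
import json


def lrem_one(items: list[bytes], value: bytes) -> int:
    """Model of LREM <key> 1 <value>: remove the first byte-for-byte equal element."""
    try:
        items.remove(value)
    except ValueError:
        return 0
    return 1


# Assumed producer output: compact JSON without spaces after separators.
raw = b'{"task_id":"t1","params":{}}'
payload = json.loads(raw)

# json.dumps uses ", " and ": " separators by default, so the bytes differ.
reserialized = json.dumps(payload).encode()

processing = [raw]
removed_with_reserialized = lrem_one(processing, reserialized)  # no match
removed_with_raw = lrem_one(processing, raw)  # exact bytes match
```

This is why dequeue() hands back both the parsed payload and the raw bytes, and why ack(raw) and fail(raw, payload) take the raw value for the LREM step.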
- Step 6: Run tests to verify they pass
cd services/_shared && python -m pytest tests/ -v
Expected: 6 PASS.
If an ImportError occurs because fakeredis is not installed:
python -m pip install fakeredis pytest-asyncio
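pytest-asyncio also needs to run in auto mode, either set in pytest.ini or emulated from conftest.py. A new conftest that marks async tests:
# services/_shared/tests/conftest.py
import inspect

import pytest

pytest_plugins = ["pytest_asyncio"]


def pytest_collection_modifyitems(config, items):
    """Auto-mark every async test function with pytest.mark.asyncio."""
    for item in items:
        if item.get_closest_marker("asyncio") is not None:
            continue
        func = getattr(item, "function", None)
        if func is not None and inspect.iscoroutinefunction(func):
            item.add_marker(pytest.mark.asyncio)
This conftest only marks test functions. In strict mode pytest-asyncio does not handle an async fixture declared with plain @pytest.fixture, so the redis fixture would also need @pytest_asyncio.fixture. The simpler and preferred option is services/_shared/pytest.ini: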
[pytest]
asyncio_mode = auto
- Step 7: Commit
git add services/_shared/
git commit -m "feat(services): add _shared/reliable_queue with BLMOVE + processing list + retry (F6 part 1)"
Task 2: Apply ReliableQueue to insta-render
Files:
- Modify: services/insta-render/Dockerfile
- Modify: services/insta-render/worker.py
- Modify: services/insta-render/tests/test_worker.py (append)
- Step 1: Update Dockerfile
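Add the _shared copy to services/insta-render/Dockerfile. Read the existing Dockerfile first; if it has a line like COPY services/insta-render /app, add the following above or next to it:
COPY services/_shared /app/_shared
ENV PYTHONPATH=/app:/app/_shared:${PYTHONPATH}
The import from _shared.reliable_queue resolves through the /app entry. COPY source paths are relative to the build context, so services/_shared as written assumes the context is the repository root. If compose uses build: { context: ./services, dockerfile: insta-render/Dockerfile }, the source path becomes _shared instead. Either way the context must contain services/_shared, so adjust it if it currently points at services/insta-render only.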
- Step 2: Modify worker.py — failing test first
Append to the end of services/insta-render/tests/test_worker.py:
import json
from unittest.mock import AsyncMock, patch

import pytest


@pytest.mark.asyncio
async def test_worker_calls_ack_on_success():
    """ack() is called on success (F6)."""
    import worker

    fake_payload = {"task_id": "t1", "job_type": "card_generation", "params": {}}
    fake_raw = json.dumps(fake_payload).encode()
    fake_queue = AsyncMock()
    fake_queue.dequeue = AsyncMock(side_effect=[(fake_payload, fake_raw), None])
    fake_queue.ack = AsyncMock()
    fake_queue.fail = AsyncMock()
    fake_queue.recover = AsyncMock(return_value=0)
    with patch.object(worker, "ReliableQueue", return_value=fake_queue), \
            patch.object(worker, "_dispatch") as disp:
        # Run a single cycle via poll_once so the real loop never starts.
        await worker.poll_once(fake_queue)
    disp.assert_called_once()
    fake_queue.ack.assert_called_once_with(fake_raw)
    fake_queue.fail.assert_not_called()


@pytest.mark.asyncio
async def test_worker_calls_fail_on_dispatch_exception():
    """fail() is called when dispatch raises, so the job is not lost (F6)."""
    import worker

    fake_payload = {"task_id": "t2", "job_type": "card_generation", "params": {}}
    fake_raw = json.dumps(fake_payload).encode()
    fake_queue = AsyncMock()
    fake_queue.dequeue = AsyncMock(return_value=(fake_payload, fake_raw))
    fake_queue.ack = AsyncMock()
    fake_queue.fail = AsyncMock()
    with patch.object(worker, "_dispatch", side_effect=RuntimeError("boom")):
        await worker.poll_once(fake_queue)
    fake_queue.fail.assert_called_once_with(fake_raw, fake_payload)
    fake_queue.ack.assert_not_called()
- Step 3: Run test to fail
cd services/insta-render && python -m pytest tests/ -v -k "ack_on_success or fail_on_dispatch"
Expected: AttributeError (worker.poll_once and worker.ReliableQueue do not exist yet).
- Step 4: Rewrite insta-render worker.py
"""Redis-based worker with the F6 reliability pattern (BLMOVE + processing list + retry)."""
from __future__ import annotations

import asyncio
import logging
import os
import sys

import redis.asyncio as aioredis

from _shared.reliable_queue import ReliableQueue
from nas_client import webhook_update_task
# Keep the existing dispatch-target imports; they are looked up by name in _dispatch.
from card_renderer import render_card  # noqa: F401

logger = logging.getLogger(__name__)

REDIS_URL = os.getenv("REDIS_URL", "redis://192.168.45.54:6379")
QUEUE_KEY = "queue:insta-render"
PAUSED_KEY = "queue:paused"

_DISPATCH_TABLE = {
    "card_generation": "render_card",
}


def _dispatch(payload: dict) -> None:
    job_type = payload.get("job_type", "")
    task_id = payload.get("task_id", "")
    params = payload.get("params", {})
    fn_name = _DISPATCH_TABLE.get(job_type)
    if fn_name is None:
        logger.error("unknown job_type=%s task=%s", job_type, task_id)
        webhook_update_task(task_id, "failed", 0, "", error=f"unknown job_type: {job_type}")
        return
    try:
        fn = getattr(sys.modules[__name__], fn_name)
    except AttributeError:
        webhook_update_task(task_id, "failed", 0, "", error=f"internal dispatch error: {fn_name}")
        return
    fn(task_id, params)


async def poll_once(queue: ReliableQueue) -> bool:
    """1 cycle: dequeue → dispatch → ack/fail. Returns True if a job was handled."""
    result = await queue.dequeue(timeout=5)
    if result is None:
        return False
    payload, raw = result
    try:
        # _dispatch is blocking, so run it off the event loop.
        await asyncio.to_thread(_dispatch, payload)
    except Exception:
        logger.exception("dispatch failed task_id=%s", payload.get("task_id"))
        await queue.fail(raw, payload)
        return True
    await queue.ack(raw)
    return True


async def worker_loop():
    redis = aioredis.from_url(REDIS_URL, decode_responses=False)
    queue = ReliableQueue(redis, queue_key=QUEUE_KEY)
    logger.info("insta-render worker started worker_id=%s", queue._worker_id)
    # F6: startup recovery
    try:
        recovered = await queue.recover()
        if recovered:
            logger.info("recovered %d orphaned items at startup", recovered)
    except Exception:
        logger.exception("startup recover failed")
    while True:
        try:
            paused = await redis.get(PAUSED_KEY)
            if paused == b"1":
                await asyncio.sleep(10)
                continue
            await poll_once(queue)
        except asyncio.CancelledError:
            logger.info("worker_loop cancelled")
            raise
        except Exception:
            logger.exception("worker_loop iteration failed, retrying in 5 seconds")
            await asyncio.sleep(5)


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    asyncio.run(worker_loop())
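NOTE: Keep the dispatch table and imports of the existing insta-render/worker.py, mapped from the actual file. The example above is minimal, and the job_type and function names must match the existing file. Before changing anything, read services/insta-render/worker.py to confirm the exact dispatch table.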
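If the existing table turns out to be simple enough, one optional variation is to map job types directly to callables instead of resolving names through sys.modules. The following is a sketch only, not the current worker's code: render_card and report_failure are stand-ins (the latter for webhook_update_task), and the calls list exists purely to make the demonstration observable.

```python
from typing import Callable

Handler = Callable[[str, dict], None]
calls: list[tuple] = []  # records what ran, for demonstration only


def render_card(task_id: str, params: dict) -> None:
    """Stand-in for the real card_renderer.render_card."""
    calls.append(("render_card", task_id))


def report_failure(task_id: str, error: str) -> None:
    """Stand-in for webhook_update_task(task_id, "failed", ...)."""
    calls.append(("failed", task_id, error))


# job_type → callable, so a typo in a handler name fails at import time.
DISPATCH_TABLE: dict[str, Handler] = {"card_generation": render_card}


def dispatch(payload: dict) -> None:
    job_type = payload.get("job_type", "")
    task_id = payload.get("task_id", "")
    handler = DISPATCH_TABLE.get(job_type)
    if handler is None:
        # Same behavior as the plan: report the failure and return normally,
        # which means poll_once() acks the job instead of retrying it.
        report_failure(task_id, f"unknown job_type: {job_type}")
        return
    handler(task_id, payload.get("params", {}))


dispatch({"task_id": "t1", "job_type": "card_generation", "params": {}})
dispatch({"task_id": "t2", "job_type": "no_such_job", "params": {}})
```

The trade-off is that tests which patch a handler by name on the worker module would have to patch the table entry instead, so this is only worth doing if the existing tests allow it.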
- Step 5: Run tests
cd services/insta-render && python -m pytest tests/ -v
Expected: the 2 new tests PASS and the existing tests still PASS (dispatch table test, etc.).
- Step 6: Commit
git add services/insta-render/
git commit -m "fix(insta-render): apply F6 ReliableQueue with BLMOVE + ack/fail (F6 part 2)"
Task 3: Apply the same change to music-render
Files:
- Modify: services/music-render/Dockerfile, worker.py
- Modify: services/music-render/tests/test_worker.py (append)
- Step 1: Add COPY services/_shared to the Dockerfile
- Step 2: Add tests (same pattern as Task 2, except that the dispatch target follows the existing pattern, e.g. run_suno_generation)
import json
from unittest.mock import AsyncMock, patch

import pytest


@pytest.mark.asyncio
async def test_music_worker_ack_on_success():
    import worker

    payload = {"task_id": "t1", "job_type": "suno_generation", "params": {}}
    raw = json.dumps(payload).encode()
    fake_queue = AsyncMock()
    fake_queue.dequeue = AsyncMock(return_value=(payload, raw))
    fake_queue.ack = AsyncMock()
    with patch.object(worker, "_dispatch"):
        await worker.poll_once(fake_queue)
    fake_queue.ack.assert_called_once_with(raw)


@pytest.mark.asyncio
async def test_music_worker_fail_on_exception():
    import worker

    payload = {"task_id": "t2", "job_type": "suno_generation", "params": {}}
    raw = json.dumps(payload).encode()
    fake_queue = AsyncMock()
    fake_queue.dequeue = AsyncMock(return_value=(payload, raw))
    fake_queue.fail = AsyncMock()
    with patch.object(worker, "_dispatch", side_effect=RuntimeError("x")):
        await worker.poll_once(fake_queue)
    fake_queue.fail.assert_called_once_with(raw, payload)
- Step 3: Run test to fail
- Step 4: Rewrite music-render worker.py. The worker_loop structure is identical to insta-render; keep _dispatch and _DISPATCH_TABLE with the existing 12 functions unchanged.
- Step 5: Run tests
- Step 6: Commit
git add services/music-render/
git commit -m "fix(music-render): apply F6 ReliableQueue (F6 part 3)"
Task 4: Apply the same change to video-render
(Same pattern as Task 3; keep the 4-provider table: sora/veo/kling/seedance)
- Step 1: Modify the Dockerfile
- Step 2: Add 2 new tests (test_video_worker_ack_on_success, test_video_worker_fail_on_exception) with job_type sora_generation
- Step 3: Run failing test
- Step 4: Rewrite worker.py using the same pattern
- Step 5: Run tests
- Step 6: Commit
git add services/video-render/
git commit -m "fix(video-render): apply F6 ReliableQueue (F6 part 4)"
Task 5: Apply the same change to image-render
(Keep the 3-provider table: gpt_image / nano_banana / flux)
- Step 1-6: same pattern as Tasks 3 and 4
git add services/image-render/
git commit -m "fix(image-render): apply F6 ReliableQueue (F6 part 5)"
Task 6: Operational verification + push
- Step 1: Run the full services test suite
cd services && for d in _shared insta-render music-render video-render image-render; do
echo "--- $d ---"
(cd $d && python -m pytest tests/ -q) || true
done
(or in PowerShell, from the repository root:)
foreach ($d in @("_shared","insta-render","music-render","video-render","image-render")) {
Write-Output "--- $d ---"
Push-Location services/$d
python -m pytest tests/ -q
Pop-Location
}
Expected: the 2 new tests in each of the 4 workers, the 6 _shared tests, and all existing tests PASS.
- Step 2: Docker build dry run (optional, time permitting)
cd services && docker compose build insta-render music-render video-render image-render
Expected: confirms that _shared is included in the build context.
- Step 3: Push
git push origin main
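- Step 4: Deployment cautions (manual)
When redeploying containers on the NAS:
- Check for existing orphans with redis-cli -h 192.168.45.54 KEYS 'processing:*'. A list left under a different worker_id is not picked up by recover(), so it may need a manual LMOVE back to the main queue. (KEYS blocks the server while it scans; on a large keyspace, redis-cli --scan --pattern 'processing:*' is the gentler option.)
- Monitor dead-letter lists with redis-cli -h 192.168.45.54 KEYS 'dead_letter:*'. Alerting is needed if entries accumulate.
- Setting a unique WORKER_ID env var (e.g. WORKER_ID=insta-render-prod-1) is recommended. If the hostname changes when a container restarts, orphans under the old worker_id are no longer tracked.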
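The manual LMOVE mentioned above could be scripted along these lines. This is a sketch under assumptions, not part of the plan's deliverables: sweep_orphans and the live_worker_ids argument are hypothetical, and FakeListStore is an in-memory stand-in that implements only the two client methods the sketch relies on (scan_iter and lmove, which redis-py's asyncio client also provides). Because LMOVE moves raw bytes, attempts is not incremented here, unlike in recover().

```python
import asyncio
from collections import defaultdict
from fnmatch import fnmatchcase


async def sweep_orphans(client, queue_key: str, live_worker_ids: set[str]) -> int:
    """Move items left by workers that no longer exist back to the main queue."""
    prefix = f"processing:{queue_key}:"
    moved = 0
    async for key in client.scan_iter(match=prefix + "*"):
        key = key.decode() if isinstance(key, bytes) else key
        if key[len(prefix):] in live_worker_ids:
            continue  # never touch the list of a worker that may still be running
        # Drain the orphaned list onto the head (back) of the main queue.
        while await client.lmove(key, queue_key, src="RIGHT", dest="LEFT") is not None:
            moved += 1
    return moved


class FakeListStore:
    """In-memory stand-in with just scan_iter() and lmove(), for the demo only."""

    def __init__(self) -> None:
        self.lists: dict[str, list[bytes]] = defaultdict(list)

    async def scan_iter(self, match: str):
        for key in [k for k, v in self.lists.items() if v]:
            if fnmatchcase(key, match):
                yield key

    async def lmove(self, first_list, second_list, src="LEFT", dest="RIGHT"):
        source = self.lists[first_list]
        if not source:
            return None
        item = source.pop(0) if src == "LEFT" else source.pop()
        target = self.lists[second_list]
        if dest == "LEFT":
            target.insert(0, item)
        else:
            target.append(item)
        return item


async def _demo():
    store = FakeListStore()
    store.lists["processing:queue:insta-render:old-host-1"] = [b'{"task_id":"t1"}']
    store.lists["processing:queue:insta-render:insta-render-prod-1"] = [b'{"task_id":"t2"}']
    moved = await sweep_orphans(store, "queue:insta-render", {"insta-render-prod-1"})
    return moved, store.lists["queue:insta-render"], store


moved, requeued, store = asyncio.run(_demo())
```

Skipping live worker IDs matters: draining the list of a worker that is still running would re-enqueue a job that is currently in flight and cause it to run twice.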
Self-Review
- atomic dequeue: BLMOVE is a single command, which Redis executes atomically ✅
- ack on success: LREM processing 1 raw removes exactly one entry ✅
- fail with retry: attempts < max → requeue, attempts >= max → dead-letter ✅
- startup recovery: orphans are requeued automatically (attempts incremented) ✅
- applied to 4 workers: insta/music/video/image follow the same pattern ✅
- NAS producer compatibility: LPUSH unchanged, attempts is optional in the payload schema ✅
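One point the checklist does not examine is that fail() issues LREM and LPUSH as two separate commands, so a crash between them behaves differently depending on their order. The sketch below models both orders with plain lists. The function names and the crash_between flag are hypothetical, and the model ignores the attempts update for brevity.

```python
class Crash(Exception):
    """Simulated process death between two commands."""


def fail_remove_then_push(processing: list, queue: list, item: str, crash_between: bool = False) -> None:
    """Order used by fail() in the plan: LREM first, then LPUSH."""
    processing.remove(item)
    if crash_between:
        raise Crash
    queue.insert(0, item)


def fail_push_then_remove(processing: list, queue: list, item: str, crash_between: bool = False) -> None:
    """Alternative order: LPUSH first, then LREM."""
    queue.insert(0, item)
    if crash_between:
        raise Crash
    processing.remove(item)


def copies_after_crash(step) -> int:
    """Count how many copies of the job survive a crash between the two commands."""
    processing, queue = ["job"], []
    try:
        step(processing, queue, "job", crash_between=True)
    except Crash:
        pass
    return processing.count("job") + queue.count("job")


lost = copies_after_crash(fail_remove_then_push)  # job is in neither list
duplicated = copies_after_crash(fail_push_then_remove)  # job is in both lists
```

Remove-then-push can drop the job, while push-then-remove can run it twice after recovery. If neither is acceptable, sending both commands in one MULTI/EXEC transaction (redis-py exposes this as pipeline(transaction=True)) would make the pair atomic; whether that is worth the change is a judgment call for the plan's owner.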
Not covered (intentionally):
- dead-letter monitoring/alerting: an operations task (CHECK_POINT backlog)
- orphans lost when the WORKER_ID env var is unset and the hostname changes: to be documented in the operations guide
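The second item can be made concrete with a small sketch of how the processing key is derived. The helper names and the container hostnames are hypothetical. worker_id mirrors the logic of default_worker_id, but takes the environment, hostname and pid as arguments so that a restart can be simulated.

```python
def worker_id(queue_key: str, env: dict, hostname: str, pid: int) -> str:
    """Mirror of default_worker_id(): explicit WORKER_ID wins, else host and pid."""
    return env.get("WORKER_ID") or f"{queue_key}-{hostname}-{pid}"


def processing_key(queue_key: str, wid: str) -> str:
    return f"processing:{queue_key}:{wid}"


QUEUE = "queue:insta-render"

# Without WORKER_ID: a container restart typically brings a new hostname.
before = processing_key(QUEUE, worker_id(QUEUE, {}, hostname="a1b2c3", pid=1))
after = processing_key(QUEUE, worker_id(QUEUE, {}, hostname="d4e5f6", pid=1))

# With WORKER_ID pinned, the key survives the restart.
pinned_env = {"WORKER_ID": "insta-render-prod-1"}
pinned_before = processing_key(QUEUE, worker_id(QUEUE, pinned_env, hostname="a1b2c3", pid=1))
pinned_after = processing_key(QUEUE, worker_id(QUEUE, pinned_env, hostname="d4e5f6", pid=1))
```

recover() only looks at the key of the current worker_id, so in the unpinned case the restarted worker never sees the list its predecessor left behind.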
Assumptions to verify:
- redis.asyncio blmove signature: (first_list, second_list, timeout, src='LEFT', dest='RIGHT'). redis>=5.0 recommended.
- fakeredis: fakeredis.aioredis.FakeRedis (>=2.20.0) is expected to support BLMOVE. Verify this before applying the plan in case it does not.
Execution Handoff
Plan complete and saved to docs/superpowers/plans/2026-05-25-render-queue-reliability.md.
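1. Subagent-Driven (recommended): a fresh subagent per task. The four workers share the pattern but each has its own dispatch table, so add a review checkpoint to make sure the subagents keep them consistent.
2. Inline Execution: run in the current session.
Awaiting 박재오's decision. Starting after Plans 1 and 2 are finished is recommended, since this is the largest workload (4 workers × about 1 hour = 4 hours).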