# Render Queue Reliability — Code Review F6 Implementation Plan
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
**Goal:** Fix the job loss that occurs when any of the four render workers (insta/music/video/image-render) crashes right after `BLPOP`. Use `BLMOVE` (or `BRPOPLPUSH`) for an atomic dequeue into a processing list, plus startup recovery and retry/dead-letter handling.
**Architecture:**
1. Each worker has a unique `worker_id`: `<queue>-<hostname>-<pid>` (can be overridden via env).
2. Atomic dequeue: `BLMOVE queue:<x>-render processing:queue:<x>-render:<worker_id> RIGHT LEFT 5`, with a 5-second timeout. (`BRPOPLPUSH` is deprecated as of Redis 6.2; `BLMOVE` is its successor.)
3. On success: `LREM processing:queue:<x>-render:<worker_id> 1 <payload>` removes exactly one entry.
4. On failure: increment the `attempts` counter in the payload and `LPUSH` it back onto the main queue (the back of the line, since workers pop from the right); once the limit (default 3) is reached, move it to `dead_letter:<queue>` instead.
5. **Startup recovery**: if the worker's own processing list is not empty at startup, push every entry back onto the main queue (retry) and increment `attempts`.
6. The NAS-side producer is unchanged (still `LPUSH`). The payload schema gains an optional `attempts: int` field; producers do not need to set it, and the worker defaults it to 0.
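The lifecycle in the list above can be modeled without Redis. The following is a minimal sketch, assuming plain Python deques stand in for the Redis lists; `InMemoryQueueModel` is a hypothetical illustration and not the `ReliableQueue` class defined later in this plan.

```python
from __future__ import annotations

import json
from collections import deque


class InMemoryQueueModel:
    """Toy stand-in for the Redis lists; not the real ReliableQueue."""

    def __init__(self, worker_id: str) -> None:
        self.main: deque[bytes] = deque()        # models queue:<x>-render
        self.processing: deque[bytes] = deque()  # models this worker's processing list
        self.worker_id = worker_id

    def produce(self, payload: dict) -> None:
        # Producer side: LPUSH onto the head of the main queue.
        self.main.appendleft(json.dumps(payload).encode())

    def dequeue(self) -> bytes | None:
        # BLMOVE ... RIGHT LEFT: pop the tail of main, push onto the head of processing.
        if not self.main:
            return None
        raw = self.main.pop()
        self.processing.appendleft(raw)
        return raw

    def ack(self, raw: bytes) -> None:
        # LREM processing 1 <payload>: remove exactly one matching entry.
        self.processing.remove(raw)

    def recover(self) -> int:
        # Startup recovery: push every orphan back onto main with attempts + 1.
        count = 0
        while self.processing:
            payload = json.loads(self.processing.popleft())
            payload["attempts"] = int(payload.get("attempts", 0)) + 1
            self.main.appendleft(json.dumps(payload).encode())
            count += 1
        return count


q = InMemoryQueueModel("w1")
q.produce({"task_id": "t1"})
q.produce({"task_id": "t2"})

first = q.dequeue()   # t1 is now in flight
q.ack(first)          # t1 processed successfully
q.dequeue()           # t2 is in flight, then the worker "crashes" (no ack)

recovered = q.recover()            # next startup finds t2 in the processing list
retried = json.loads(q.dequeue())  # t2 comes back with attempts incremented
print(recovered, retried)
# 1 {'task_id': 't2', 'attempts': 1}
```

The model leaves out blocking, timeouts, and the dead-letter branch; it only shows why a job that was dequeued but never acknowledged is still recoverable.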
**Shared module strategy:** All four workers follow the same pattern, so create a single `services/_shared/reliable_queue.py`, add `COPY services/_shared /app/_shared` to each Dockerfile, and import it with `from _shared.reliable_queue import ReliableQueue`. This changes four compose entries/Dockerfiles. (DRY beats four inline copies.)
**Tech Stack:** Python 3.12, redis.asyncio 5.x, fakeredis (pytest dep), pytest-asyncio.
**Working directory:** `C:\Users\jaeoh\Desktop\workspace\web-ai`.
---
## File Map
| File | Change | Responsibility |
|------|------|------|
| `services/_shared/__init__.py` | Create | namespace package |
| `services/_shared/reliable_queue.py` | Create | `ReliableQueue` class: dequeue, ack, fail, recover |
| `services/_shared/tests/test_reliable_queue.py` | Create | 6 fakeredis unit tests |
| `services/_shared/requirements.txt` | Create | redis>=5.0, fakeredis (test only) |
| `services/insta-render/Dockerfile` | Modify | `COPY services/_shared /app/_shared` + PYTHONPATH |
| `services/insta-render/worker.py` | Modify L1~ | BLPOP → use ReliableQueue |
| `services/insta-render/tests/test_worker.py` | Append | 1 integration test (recovery) |
| `services/music-render/Dockerfile` | Modify | shared copy |
| `services/music-render/worker.py` | Modify | use ReliableQueue |
| `services/music-render/tests/test_worker.py` | Append | recovery test |
| `services/video-render/Dockerfile` | Modify | shared copy |
| `services/video-render/worker.py` | Modify | use ReliableQueue |
| `services/video-render/tests/test_worker.py` | Append | recovery test |
| `services/image-render/Dockerfile` | Modify | shared copy |
| `services/image-render/worker.py` | Modify | use ReliableQueue |
| `services/image-render/tests/test_worker.py` | Append | recovery test |
| `services/docker-compose.yml` (if present) | Verify | whether the build context includes the `services/` root |
---
## Task 1: Write the shared ReliableQueue module
**Files:**
- Create: `services/_shared/__init__.py`
- Create: `services/_shared/reliable_queue.py`
- Create: `services/_shared/tests/__init__.py`
- Create: `services/_shared/tests/test_reliable_queue.py`
- Create: `services/_shared/requirements.txt`
- [ ] **Step 1: Create namespace package**
```python
# services/_shared/__init__.py
```
(empty file)
```python
# services/_shared/tests/__init__.py
```
(empty file)
- [ ] **Step 2: Write failing tests first**
```python
# services/_shared/tests/test_reliable_queue.py
"""F6: ReliableQueue atomic dequeue + recovery + retry."""
import json

import fakeredis.aioredis
import pytest

from _shared.reliable_queue import ReliableQueue


@pytest.fixture
async def redis():
    r = fakeredis.aioredis.FakeRedis(decode_responses=False)
    yield r
    await r.flushall()
    await r.aclose()


async def test_dequeue_atomically_moves_to_processing(redis):
    """BLMOVE: atomic move from queue to processing."""
    q = ReliableQueue(redis, queue_key="queue:test", worker_id="w1")
    await redis.lpush("queue:test", json.dumps({"task_id": "t1"}).encode())
    payload, raw = await q.dequeue(timeout=1)
    assert payload["task_id"] == "t1"
    # The main queue must be empty and the item must be in the processing list
    assert await redis.llen("queue:test") == 0
    assert await redis.llen("processing:queue:test:w1") == 1


async def test_dequeue_returns_none_on_timeout(redis):
    q = ReliableQueue(redis, queue_key="queue:test", worker_id="w1")
    result = await q.dequeue(timeout=1)
    assert result is None


async def test_ack_removes_from_processing(redis):
    q = ReliableQueue(redis, queue_key="queue:test", worker_id="w1")
    await redis.lpush("queue:test", json.dumps({"task_id": "t1"}).encode())
    payload, raw = await q.dequeue(timeout=1)
    await q.ack(raw)
    assert await redis.llen("processing:queue:test:w1") == 0


async def test_recover_returns_orphaned_to_main_queue(redis):
    """Startup recovery: return leftover processing-list items to the main queue."""
    # Simulate a previous crash: an item is left in the processing list
    orphan = json.dumps({"task_id": "t1", "attempts": 0}).encode()
    await redis.lpush("processing:queue:test:w1", orphan)
    q = ReliableQueue(redis, queue_key="queue:test", worker_id="w1")
    recovered = await q.recover()
    assert recovered == 1
    assert await redis.llen("processing:queue:test:w1") == 0
    # The item can be dequeued again
    payload, raw = await q.dequeue(timeout=1)
    assert payload["task_id"] == "t1"
    assert payload["attempts"] == 1  # incremented on recover


async def test_fail_below_max_attempts_returns_to_main_queue(redis):
    q = ReliableQueue(redis, queue_key="queue:test", worker_id="w1", max_attempts=3)
    await redis.lpush("queue:test", json.dumps({"task_id": "t1", "attempts": 0}).encode())
    payload, raw = await q.dequeue(timeout=1)
    await q.fail(raw, payload)
    assert await redis.llen("processing:queue:test:w1") == 0
    assert await redis.llen("queue:test") == 1
    # Check that attempts was incremented
    requeued_raw = await redis.lindex("queue:test", 0)
    requeued = json.loads(requeued_raw)
    assert requeued["attempts"] == 1


async def test_fail_at_max_attempts_moves_to_dead_letter(redis):
    q = ReliableQueue(redis, queue_key="queue:test", worker_id="w1", max_attempts=3)
    await redis.lpush(
        "queue:test", json.dumps({"task_id": "t1", "attempts": 2}).encode()
    )
    payload, raw = await q.dequeue(timeout=1)
    await q.fail(raw, payload)
    # attempts 2 → 3 (== max) → dead-letter
    assert await redis.llen("queue:test") == 0
    assert await redis.llen("processing:queue:test:w1") == 0
    assert await redis.llen("dead_letter:queue:test") == 1
```
- [ ] **Step 3: Add requirements**
```text
# services/_shared/requirements.txt
redis>=5.0.0
```
Separate dev requirements (for tests):
```text
# services/_shared/tests/requirements-dev.txt (optional)
fakeredis>=2.20.0
pytest>=8.0.0
pytest-asyncio>=0.23.0
```
- [ ] **Step 4: Run tests to verify they fail**
```
cd services/_shared && python -m pytest tests/ -v
```
Expected: ImportError, because `reliable_queue.py` does not exist yet.
- [ ] **Step 5: Write reliable_queue.py**
```python
# services/_shared/reliable_queue.py
"""F6: Reliable Redis queue with processing list + recovery + retry.

Pattern: BLMOVE main → processing (atomic), then either ack (LREM processing) or
fail (LREM processing + re-enqueue or dead-letter).

Startup recovery: any items left in the worker's processing list from a previous
crash are pushed back to main queue (with attempts incremented).
"""
from __future__ import annotations

import json
import logging
import os
import socket
from typing import Optional

logger = logging.getLogger(__name__)


def default_worker_id(queue_key: str) -> str:
    """env > hostname-pid."""
    explicit = os.getenv("WORKER_ID")
    if explicit:
        return explicit
    return f"{queue_key}-{socket.gethostname()}-{os.getpid()}"


class ReliableQueue:
    """Wraps a redis client to provide BLMOVE-backed atomic dequeue +
    processing list + retry/dead-letter.

    Producer side stays unchanged: LPUSH queue:<x> <json payload>.
    Worker side: dequeue() → process → ack(raw) on success or fail(raw, payload) on error.
    Startup: await queue.recover() to re-enqueue orphans.
    """

    def __init__(
        self,
        redis,
        queue_key: str,
        worker_id: Optional[str] = None,
        max_attempts: int = 3,
    ):
        self._redis = redis
        self._queue_key = queue_key
        self._worker_id = worker_id or default_worker_id(queue_key)
        self._processing_key = f"processing:{queue_key}:{self._worker_id}"
        self._dead_letter_key = f"dead_letter:{queue_key}"
        self._max_attempts = max_attempts

    @property
    def processing_key(self) -> str:
        return self._processing_key

    async def dequeue(self, timeout: int = 5) -> Optional[tuple[dict, bytes]]:
        """Atomically move 1 item from main queue tail to processing head.

        Returns (parsed_dict, raw_bytes) or None on timeout.
        Caller MUST call ack(raw) on success or fail(raw, payload) on error.
        """
        raw = await self._redis.blmove(
            self._queue_key, self._processing_key,
            timeout=timeout, src="RIGHT", dest="LEFT",
        )
        if raw is None:
            return None
        try:
            payload = json.loads(raw)
        except json.JSONDecodeError:
            logger.error("invalid payload on dequeue, moving to dead-letter: %r", raw[:200])
            await self._redis.lrem(self._processing_key, 1, raw)
            await self._redis.lpush(self._dead_letter_key, raw)
            return None
        return payload, raw

    async def ack(self, raw: bytes) -> None:
        """Successful processing: remove from processing list."""
        removed = await self._redis.lrem(self._processing_key, 1, raw)
        if removed == 0:
            logger.warning("ack on missing payload (already removed?): %r", raw[:100])

    async def fail(self, raw: bytes, payload: dict) -> None:
        """Failed processing: remove from processing list and either re-enqueue or dead-letter."""
        await self._redis.lrem(self._processing_key, 1, raw)
        attempts = int(payload.get("attempts", 0)) + 1
        if attempts >= self._max_attempts:
            payload["attempts"] = attempts
            await self._redis.lpush(self._dead_letter_key, json.dumps(payload).encode())
            logger.error(
                "task moved to dead-letter after %d attempts: task_id=%s",
                attempts, payload.get("task_id"),
            )
            return
        payload["attempts"] = attempts
        await self._redis.lpush(self._queue_key, json.dumps(payload).encode())
        logger.info(
            "task re-enqueued (attempt %d/%d): task_id=%s",
            attempts, self._max_attempts, payload.get("task_id"),
        )

    async def recover(self) -> int:
        """Startup: move all orphans from this worker's processing list back to main queue.

        Increments attempts counter (orphan == implicit failure).
        Returns count of recovered items.
        """
        count = 0
        while True:
            raw = await self._redis.lpop(self._processing_key)
            if raw is None:
                break
            try:
                payload = json.loads(raw)
            except json.JSONDecodeError:
                await self._redis.lpush(self._dead_letter_key, raw)
                count += 1
                continue
            payload["attempts"] = int(payload.get("attempts", 0)) + 1
            if payload["attempts"] >= self._max_attempts:
                await self._redis.lpush(self._dead_letter_key, json.dumps(payload).encode())
            else:
                await self._redis.lpush(self._queue_key, json.dumps(payload).encode())
            count += 1
        if count:
            logger.info("recovered %d orphaned items for worker %s", count, self._worker_id)
        return count
```
**Note on the redis-py `blmove` API**: `client.blmove(first_list, second_list, timeout, src=..., dest=...)`. `timeout=0` blocks forever. Payloads are received as bytes (assuming `decode_responses=False`).
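Because `LREM` matches list entries by exact byte equality, `ack()` and `fail()` need the original `raw` bytes rather than a re-serialized payload. The sketch below illustrates why, assuming a hypothetical producer that emits compact JSON (no spaces after separators); the payload contents are made up for the example.

```python
import json

# Hypothetical payload as a non-Python producer might serialize it (compact separators).
raw = b'{"task_id":"t1","job_type":"card_generation"}'

payload = json.loads(raw)                    # json.loads accepts bytes directly
reserialized = json.dumps(payload).encode()  # Python's defaults add spaces after ":" and ","

# The parsed content is identical, but the bytes are not, so an LREM with
# `reserialized` would not match the entry that BLMOVE put in the processing list.
same_bytes = reserialized == raw
attempts = int(payload.get("attempts", 0))   # a missing field defaults to 0

print(same_bytes, attempts)
# False 0
```

This is the reason `dequeue()` returns both the parsed dict and the untouched bytes.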
- [ ] **Step 6: Run tests to verify they pass**
```
cd services/_shared && python -m pytest tests/ -v
```
Expected: 6 PASS.
If an ImportError occurs (`fakeredis` not installed):
```
python -m pip install fakeredis pytest-asyncio
```
In addition, pytest-asyncio must run in auto mode (`asyncio_mode = auto`), either set in `pytest.ini` or emulated in `conftest.py`. A new conftest:
```python
# services/_shared/tests/conftest.py
import inspect

import pytest

pytest_plugins = ["pytest_asyncio"]


def pytest_collection_modifyitems(config, items):
    for item in items:
        if "asyncio" in item.fixturenames or item.get_closest_marker("asyncio") is not None:
            continue
        # auto-mark all async tests
        if item.function.__name__.startswith("test_"):
            if inspect.iscoroutinefunction(item.function):
                item.add_marker(pytest.mark.asyncio)
```
Or, more simply, `services/_shared/pytest.ini`:
```ini
[pytest]
asyncio_mode = auto
```
- [ ] **Step 7: Commit**
```bash
git add services/_shared/
git commit -m "feat(services): add _shared/reliable_queue with BLMOVE + processing list + retry (F6 part 1)"
```
---
## Task 2: Apply ReliableQueue to insta-render
**Files:**
- Modify: `services/insta-render/Dockerfile`
- Modify: `services/insta-render/worker.py`
- Modify: `services/insta-render/tests/test_worker.py` (append)
- [ ] **Step 1: Update Dockerfile**
Add the `_shared` copy to `services/insta-render/Dockerfile`. Read the existing Dockerfile pattern first; if it has a line like `COPY services/insta-render /app`, add the following above or next to it:
```dockerfile
COPY services/_shared /app/_shared
ENV PYTHONPATH=/app:/app/_shared:${PYTHONPATH}
```
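`COPY` sources are resolved relative to the build context, so the context must match the path used above: `COPY services/_shared ...` needs the parent of `services/` as the context, whereas a compose entry such as `build: { context: ./services, dockerfile: insta-render/Dockerfile }` needs `COPY _shared /app/_shared` instead. Check which one compose uses and adjust the context or the `COPY` path accordingly.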
- [ ] **Step 2: Modify worker.py — failing test first**
Append to the end of `services/insta-render/tests/test_worker.py`:
```python
import json
from unittest.mock import AsyncMock, patch

import pytest


@pytest.mark.asyncio
async def test_worker_calls_ack_on_success():
    """Calls ack() on success (F6)."""
    import worker

    fake_payload = {"task_id": "t1", "job_type": "card_generation", "params": {}}
    fake_raw = json.dumps(fake_payload).encode()
    fake_queue = AsyncMock()
    fake_queue.dequeue = AsyncMock(side_effect=[(fake_payload, fake_raw), None])
    fake_queue.ack = AsyncMock()
    fake_queue.fail = AsyncMock()
    fake_queue.recover = AsyncMock(return_value=0)
    with patch.object(worker, "ReliableQueue", return_value=fake_queue), \
            patch.object(worker, "_dispatch") as disp:
        # Run a single cycle via poll_once (avoids the real infinite loop)
        await worker.poll_once(fake_queue)
    disp.assert_called_once()
    fake_queue.ack.assert_called_once_with(fake_raw)
    fake_queue.fail.assert_not_called()


@pytest.mark.asyncio
async def test_worker_calls_fail_on_dispatch_exception():
    """Calls fail() when dispatch raises, so the job is not lost (F6)."""
    import worker

    fake_payload = {"task_id": "t2", "job_type": "card_generation", "params": {}}
    fake_raw = json.dumps(fake_payload).encode()
    fake_queue = AsyncMock()
    fake_queue.dequeue = AsyncMock(return_value=(fake_payload, fake_raw))
    fake_queue.ack = AsyncMock()
    fake_queue.fail = AsyncMock()
    with patch.object(worker, "_dispatch", side_effect=RuntimeError("boom")):
        await worker.poll_once(fake_queue)
    fake_queue.fail.assert_called_once_with(fake_raw, fake_payload)
    fake_queue.ack.assert_not_called()
```
- [ ] **Step 3: Run test to fail**
```
cd services/insta-render && python -m pytest tests/ -v -k "ack_on_success or fail_on_dispatch"
```
Expected: AttributeError (`worker.poll_once` and `worker.ReliableQueue` do not exist yet).
- [ ] **Step 4: Rewrite insta-render worker.py**
```python
"""Redis-based worker with the F6 reliability pattern (BLMOVE + processing list + retry)."""
from __future__ import annotations

import asyncio
import logging
import os
import sys

import redis.asyncio as aioredis

from _shared.reliable_queue import ReliableQueue
from nas_client import webhook_update_task
# Keep the existing dispatch target imports
from card_renderer import render_card

logger = logging.getLogger(__name__)

REDIS_URL = os.getenv("REDIS_URL", "redis://192.168.45.54:6379")
QUEUE_KEY = "queue:insta-render"
PAUSED_KEY = "queue:paused"

_DISPATCH_TABLE = {
    "card_generation": "render_card",
}


def _dispatch(payload: dict) -> None:
    job_type = payload.get("job_type", "")
    task_id = payload.get("task_id", "")
    params = payload.get("params", {})
    fn_name = _DISPATCH_TABLE.get(job_type)
    if fn_name is None:
        logger.error("unknown job_type=%s task=%s", job_type, task_id)
        webhook_update_task(task_id, "failed", 0, "", error=f"unknown job_type: {job_type}")
        return
    try:
        fn = getattr(sys.modules[__name__], fn_name)
    except AttributeError:
        webhook_update_task(task_id, "failed", 0, "", error=f"internal dispatch error: {fn_name}")
        return
    fn(task_id, params)


async def poll_once(queue: ReliableQueue) -> bool:
    """1 cycle: dequeue → dispatch → ack/fail. Returns True if a job was handled."""
    result = await queue.dequeue(timeout=5)
    if result is None:
        return False
    payload, raw = result
    try:
        await asyncio.to_thread(_dispatch, payload)
    except Exception:
        logger.exception("dispatch failed task_id=%s", payload.get("task_id"))
        await queue.fail(raw, payload)
        return True
    await queue.ack(raw)
    return True


async def worker_loop():
    redis = aioredis.from_url(REDIS_URL, decode_responses=False)
    queue = ReliableQueue(redis, queue_key=QUEUE_KEY)
    logger.info("insta-render worker started worker_id=%s", queue._worker_id)
    # F6: startup recovery
    try:
        recovered = await queue.recover()
        if recovered:
            logger.info("recovered %d orphaned items at startup", recovered)
    except Exception:
        logger.exception("startup recover failed")
    while True:
        try:
            paused = await redis.get(PAUSED_KEY)
            if paused == b"1":
                await asyncio.sleep(10)
                continue
            await poll_once(queue)
        except asyncio.CancelledError:
            logger.info("worker_loop cancelled")
            raise
        except Exception:
            logger.exception("worker_loop iteration failed, retrying in 5 seconds")
            await asyncio.sleep(5)


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    asyncio.run(worker_loop())
```
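**NOTE**: Keep the dispatch table and imports of the existing `insta-render/worker.py` by mapping them from the actual file. The example above is minimal; the job_type and function names must match the existing file. Before changing anything, check the exact dispatch table with `Read services/insta-render/worker.py`.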
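The ack/fail contract of `poll_once` can be exercised without Redis or the real renderers. This is a minimal sketch under stated assumptions: `StubQueue` and `dispatch` are hypothetical stand-ins, and the `poll_once` here is a trimmed copy (no logging) of the one in Step 4.

```python
import asyncio


class StubQueue:
    """Hypothetical in-memory stand-in exposing the ReliableQueue interface."""

    def __init__(self, jobs):
        self._jobs = list(jobs)
        self.acked, self.failed = [], []

    async def dequeue(self, timeout=5):
        # Returns (payload, raw) like ReliableQueue.dequeue, or None when empty.
        return self._jobs.pop(0) if self._jobs else None

    async def ack(self, raw):
        self.acked.append(raw)

    async def fail(self, raw, payload):
        self.failed.append(raw)


def dispatch(payload):
    # Placeholder handler: one made-up job type raises to simulate a render error.
    if payload["job_type"] == "broken":
        raise RuntimeError("boom")


async def poll_once(queue):
    result = await queue.dequeue(timeout=5)
    if result is None:
        return False
    payload, raw = result
    try:
        await asyncio.to_thread(dispatch, payload)
    except Exception:
        await queue.fail(raw, payload)
        return True
    await queue.ack(raw)
    return True


async def main():
    queue = StubQueue([
        ({"task_id": "t1", "job_type": "card_generation"}, b"raw-1"),
        ({"task_id": "t2", "job_type": "broken"}, b"raw-2"),
    ])
    handled = [await poll_once(queue) for _ in range(3)]
    return handled, queue.acked, queue.failed


handled, acked, failed = asyncio.run(main())
print(handled, acked, failed)
# [True, True, False] [b'raw-1'] [b'raw-2']
```

Every dequeued job ends in exactly one of `ack` or `fail`, which is the property the other three workers need to preserve when their own dispatch tables are plugged in.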
- [ ] **Step 5: Run tests**
```
cd services/insta-render && python -m pytest tests/ -v
```
Expected: the 2 new tests PASS and the existing tests still PASS (dispatch table test, etc.).
- [ ] **Step 6: Commit**
```bash
git add services/insta-render/
git commit -m "fix(insta-render): apply F6 ReliableQueue with BLMOVE + ack/fail (F6 part 2)"
```
---
## Task 3: Apply the same to music-render
**Files:**
- Modify: `services/music-render/Dockerfile`, `worker.py`
- Modify: `services/music-render/tests/test_worker.py` (append)
- [ ] **Step 1: Add `COPY services/_shared` to the Dockerfile**
- [ ] **Step 2: Add tests (same pattern as Task 2, but the dispatch target follows the existing pattern, e.g. `run_suno_generation`)**
```python
import json
from unittest.mock import AsyncMock, patch

import pytest


@pytest.mark.asyncio
async def test_music_worker_ack_on_success():
    import worker

    payload = {"task_id": "t1", "job_type": "suno_generation", "params": {}}
    raw = json.dumps(payload).encode()
    fake_queue = AsyncMock()
    fake_queue.dequeue = AsyncMock(return_value=(payload, raw))
    fake_queue.ack = AsyncMock()
    with patch.object(worker, "_dispatch"):
        await worker.poll_once(fake_queue)
    fake_queue.ack.assert_called_once_with(raw)


@pytest.mark.asyncio
async def test_music_worker_fail_on_exception():
    import worker

    payload = {"task_id": "t2", "job_type": "suno_generation", "params": {}}
    raw = json.dumps(payload).encode()
    fake_queue = AsyncMock()
    fake_queue.dequeue = AsyncMock(return_value=(payload, raw))
    fake_queue.fail = AsyncMock()
    with patch.object(worker, "_dispatch", side_effect=RuntimeError("x")):
        await worker.poll_once(fake_queue)
    fake_queue.fail.assert_called_once_with(raw, payload)
```
- [ ] **Step 3: Run test to fail**
- [ ] **Step 4: Rewrite music-render worker.py: the `worker_loop` structure is identical to insta-render; keep `_dispatch` + `_DISPATCH_TABLE` with the existing 12 functions unchanged**
- [ ] **Step 5: Run tests**
- [ ] **Step 6: Commit**
```bash
git add services/music-render/
git commit -m "fix(music-render): apply F6 ReliableQueue (F6 part 3)"
```
---
## Task 4: Apply the same to video-render
(Same pattern as Task 3; keep the 4-provider table: sora/veo/kling/seedance)
- [ ] **Step 1: Update the Dockerfile**
- [ ] **Step 2: Add 2 new tests (`test_video_worker_ack_on_success`, `test_video_worker_fail_on_exception`) with job_type `sora_generation`**
- [ ] **Step 3: Run failing test**
- [ ] **Step 4: Rewrite worker.py using the same pattern**
- [ ] **Step 5: Run tests**
- [ ] **Step 6: Commit**
```bash
git add services/video-render/
git commit -m "fix(video-render): apply F6 ReliableQueue (F6 part 4)"
```
---
## Task 5: Apply the same to image-render
(Keep the 3-provider table: gpt_image / nano_banana / flux)
- [ ] **Step 1-6: Same pattern as Tasks 3/4**
```bash
git add services/image-render/
git commit -m "fix(image-render): apply F6 ReliableQueue (F6 part 5)"
```
---
## Task 6: Operational verification + push
- [ ] **Step 1: Run the full services test suite**
```
cd services && for d in _shared insta-render music-render video-render image-render; do
  echo "--- $d ---"
  (cd "$d" && python -m pytest tests/ -q) || true
done
```
(Or in PowerShell:)
```powershell
foreach ($d in @("_shared","insta-render","music-render","video-render","image-render")) {
    Write-Output "--- $d ---"
    Push-Location services/$d
    python -m pytest tests/ -q
    Pop-Location
}
```
Expected: 2 new tests in each of the 4 workers + 6 in `_shared` + all existing tests PASS.
- [ ] **Step 2: Simulate the Docker build (optional, time permitting)**
```
cd services && docker compose build insta-render music-render video-render image-render
```
Expected: confirms that `_shared` is included in the build context.
- [ ] **Step 3: Push**
```bash
git push origin main
```
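- [ ] **Step 4: Notes for production deployment (manual)**
When redeploying containers on the NAS:
1. Check for existing orphans with `redis-cli -h 192.168.45.54 KEYS 'processing:*'`. Any that sit under a different `worker_id` are not picked up by startup recovery and may need a manual `LMOVE`.
2. Monitor dead-letter lists with `redis-cli -h 192.168.45.54 KEYS 'dead_letter:*'`; alerting is needed if entries accumulate.
3. Setting a unique `WORKER_ID` env (e.g. `WORKER_ID=insta-render-prod-1`) is recommended, because orphans cannot be traced if the hostname changes when the container restarts.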
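The manual `LMOVE` mentioned in item 1 could be scripted along these lines. This is a rough sketch, not part of the plan's code: `requeue_foreign_orphans` is a hypothetical helper, it assumes a synchronous redis-py client (only `scan_iter` and `lmove` are used), it does not increment `attempts`, and it should only be run while the other workers for that queue are stopped, since it cannot tell a dead worker's list from a live one.

```python
def requeue_foreign_orphans(client, queue_key: str, own_worker_id: str) -> int:
    """Move entries from other workers' processing lists back to the main queue.

    `client` is assumed to be a synchronous redis-py client (or anything with
    compatible `scan_iter` and `lmove` methods). Returns the number of moved items.
    """
    own_key = f"processing:{queue_key}:{own_worker_id}".encode()
    moved = 0
    for key in client.scan_iter(match=f"processing:{queue_key}:*"):
        key_bytes = key if isinstance(key, bytes) else key.encode()
        if key_bytes == own_key:
            continue  # this worker's own list is handled by recover()
        # Pop the oldest in-flight entry (tail) and push it onto the head of the
        # main queue, the same end recover() uses. LMOVE returns None once the
        # source list is empty.
        while client.lmove(key, queue_key, src="RIGHT", dest="LEFT") is not None:
            moved += 1
    return moved


# Usage (assumes the `redis` package is installed and the server is reachable):
#   import redis
#   client = redis.Redis(host="192.168.45.54")
#   requeue_foreign_orphans(client, "queue:insta-render", "insta-render-prod-1")
```

Pinning `WORKER_ID` as suggested in item 3 makes this step unnecessary for ordinary restarts, because the restarted worker then finds its own list.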
---
## Self-Review
1. **atomic dequeue**: a single `BLMOVE` command, which Redis executes atomically ✅
2. **ack on success**: `LREM processing 1 raw` removes exactly one entry ✅
3. **fail with retry**: attempts < max → requeue, attempts >= max → dead-letter ✅
4. **startup recovery**: orphans are requeued automatically (attempts incremented) ✅
5. **applied to 4 workers**: same pattern for insta/music/video/image ✅
6. **NAS producer compatibility**: `LPUSH` unchanged, `attempts` optional in the payload schema ✅
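The retry rule in item 3 can be traced by hand. The sketch below is an illustration only: `route_after_failure` is a hypothetical helper that mirrors the `>=` comparison used in `fail()`, applied to a job that fails on every run and starts without an `attempts` field.

```python
def route_after_failure(attempts: int, max_attempts: int = 3) -> tuple[str, int]:
    """Return (destination, new_attempts) using the plan's `>=` rule."""
    new_attempts = attempts + 1
    destination = "dead_letter" if new_attempts >= max_attempts else "main_queue"
    return destination, new_attempts


# Trace a job that fails every time; a missing `attempts` field counts as 0.
attempts, trace = 0, []
while True:
    destination, attempts = route_after_failure(attempts)
    trace.append((destination, attempts))
    if destination == "dead_letter":
        break

print(trace)
# [('main_queue', 1), ('main_queue', 2), ('dead_letter', 3)]
```

With the default `max_attempts=3`, a job is therefore executed at most three times before it lands in the dead-letter list.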
**Not covered (intentionally)**:
- dead-letter monitoring/alerting: an operations task (CHECK_POINT backlog)
- if the worker_id env is not set, orphans are lost when the hostname changes: documented in the operations guide
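The second gap follows directly from how the processing key is derived. A small sketch, assuming the key scheme from `reliable_queue.py`; `processing_key` is a hypothetical helper that takes the pid and environment as arguments so both cases can be compared side by side.

```python
import socket


def processing_key(queue_key: str, pid: int, env: dict) -> str:
    """Mirror of the plan's key scheme, with pid and env passed in for illustration."""
    worker_id = env.get("WORKER_ID") or f"{queue_key}-{socket.gethostname()}-{pid}"
    return f"processing:{queue_key}:{worker_id}"


queue_key = "queue:insta-render"

# Default id: a restart with a different pid (or hostname) yields a different key,
# so recover() never looks at the list the crashed process left behind.
before = processing_key(queue_key, pid=101, env={})
after = processing_key(queue_key, pid=202, env={})

# Pinned id: the key is stable across restarts, so recover() finds the orphans.
pinned_env = {"WORKER_ID": "insta-render-prod-1"}
pinned_before = processing_key(queue_key, pid=101, env=pinned_env)
pinned_after = processing_key(queue_key, pid=202, env=pinned_env)

print(before != after, pinned_before == pinned_after)
# True True
```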
**Assumptions to verify**:
- `redis.asyncio` (redis-py) `blmove` signature: `(first_list, second_list, timeout, src='LEFT', dest='RIGHT')`. redis>=5.0 recommended.
- fakeredis: `fakeredis.aioredis.FakeRedis` (>=2.20.0) is assumed to support BLMOVE; verify this before applying the plan, in case it does not.
---
## Execution Handoff
**Plan complete and saved to `docs/superpowers/plans/2026-05-25-render-queue-reliability.md`.**
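**1. Subagent-Driven (recommended)**: a fresh subagent per task. The four workers share the same pattern, but each has its own dispatch table, so add a review checkpoint to make sure the subagents keep them consistent.
**2. Inline Execution**: run in the current session.
Awaiting 박재오's decision. Starting after Plans 1 and 2 are finished is recommended (this plan has the largest workload: 4 workers × about 1 hour = 4 hours).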