Files
web-page/docs/superpowers/plans/2026-05-25-render-queue-reliability.md

25 KiB
Raw Blame History

Render Queue Reliability — Code Review F6 Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: 4개 render worker(insta/music/video/image-render)가 BLPOP 직후 crash 시 작업 손실되는 문제 해결. BLMOVE(또는 BRPOPLPUSH)로 atomic dequeue + processing list 패턴 + startup recovery + retry/dead-letter.

Architecture:

  1. 각 worker가 unique worker_id 보유: <queue>-<hostname>-<pid> (env로 override 가능).
  2. atomic dequeue: BLMOVE queue:<x>-render processing:<x>-render:<worker_id> RIGHT LEFT 5 — 5초 timeout. (BRPOPLPUSH는 Redis 6.2+ deprecated, BLMOVE가 후속).
  3. 작업 성공: LREM processing:<x>-render:<worker_id> 1 <payload> — 정확 1개 제거.
  4. 작업 실패: payload에 attempts counter 증가시켜 main queue 끝으로 LPUSH; 한계(기본 3) 초과 시 dead_letter:<queue> 로 이동.
  5. Startup recovery: worker 시작 시 자신의 processing list가 비어있지 않으면 → 모두 main queue로 되돌림 (재시도). attempts 증가.
  6. NAS측 producer는 무변경 (LPUSH 그대로). 단, payload schema에 attempts: int (optional) 필드 명시 — producer는 안 채워도 worker가 default 0으로.

Shared module 전략: 4개 worker가 동일 패턴이므로 services/_shared/reliable_queue.py 1개 만들고 각 Dockerfile에서 COPY services/_shared /app/_sharedfrom _shared.reliable_queue import ReliableQueue. compose entry/dockerfile 변경 4건. (DRY > inline 4중복.)

Tech Stack: Python 3.12, redis.asyncio 5.x, fakeredis (pytest dep), pytest-asyncio.

Working directory: C:\Users\jaeoh\Desktop\workspace\web-ai.


File Map

파일 변경 책임
services/_shared/__init__.py Create namespace package
services/_shared/reliable_queue.py Create ReliableQueue 클래스 — dequeue, ack, fail, recover
services/_shared/tests/test_reliable_queue.py Create fakeredis 단위 테스트 6개
services/_shared/requirements.txt Create redis>=5.0, fakeredis (test only)
services/insta-render/Dockerfile Modify COPY services/_shared /app/_shared + PYTHONPATH
services/insta-render/worker.py Modify L1~ BLPOP → ReliableQueue 사용
services/insta-render/tests/test_worker.py Append 1 integration test (recovery)
services/music-render/Dockerfile Modify shared copy
services/music-render/worker.py Modify ReliableQueue 사용
services/music-render/tests/test_worker.py Append recovery test
services/video-render/Dockerfile Modify shared copy
services/video-render/worker.py Modify ReliableQueue 사용
services/video-render/tests/test_worker.py Append recovery test
services/image-render/Dockerfile Modify shared copy
services/image-render/worker.py Modify ReliableQueue 사용
services/image-render/tests/test_worker.py Append recovery test
services/docker-compose.yml (있다면) Verify build context가 services/ 루트 포함하는지

Task 1: ReliableQueue 공유 모듈 작성

Files:

  • Create: services/_shared/__init__.py

  • Create: services/_shared/reliable_queue.py

  • Create: services/_shared/tests/__init__.py

  • Create: services/_shared/tests/test_reliable_queue.py

  • Create: services/_shared/requirements.txt

  • Step 1: Create namespace package

# services/_shared/__init__.py

(빈 파일)

# services/_shared/tests/__init__.py

(빈 파일)

  • Step 2: Write failing tests first
# services/_shared/tests/test_reliable_queue.py
"""F6 — ReliableQueue: atomic dequeue + recovery + retry."""
import json

import fakeredis.aioredis
import pytest

from _shared.reliable_queue import ReliableQueue


@pytest.fixture
async def redis():
    r = fakeredis.aioredis.FakeRedis(decode_responses=False)
    yield r
    await r.flushall()
    await r.aclose()


async def test_dequeue_atomically_moves_to_processing(redis):
    """BLMOVE: queue → processing 원자적 이동."""
    q = ReliableQueue(redis, queue_key="queue:test", worker_id="w1")
    await redis.lpush("queue:test", json.dumps({"task_id": "t1"}).encode())
    payload, raw = await q.dequeue(timeout=1)
    assert payload["task_id"] == "t1"
    # main queue는 비어있고, processing list에 들어있어야 함
    assert await redis.llen("queue:test") == 0
    assert await redis.llen("processing:queue:test:w1") == 1


async def test_dequeue_returns_none_on_timeout(redis):
    q = ReliableQueue(redis, queue_key="queue:test", worker_id="w1")
    result = await q.dequeue(timeout=1)
    assert result is None


async def test_ack_removes_from_processing(redis):
    q = ReliableQueue(redis, queue_key="queue:test", worker_id="w1")
    await redis.lpush("queue:test", json.dumps({"task_id": "t1"}).encode())
    payload, raw = await q.dequeue(timeout=1)
    await q.ack(raw)
    assert await redis.llen("processing:queue:test:w1") == 0


async def test_recover_returns_orphaned_to_main_queue(redis):
    """startup recovery: 잔존 processing list 항목을 main queue로 되돌림."""
    # 이전 crash 시뮬레이션: processing list에 잔존
    orphan = json.dumps({"task_id": "t1", "attempts": 0}).encode()
    await redis.lpush("processing:queue:test:w1", orphan)
    q = ReliableQueue(redis, queue_key="queue:test", worker_id="w1")
    recovered = await q.recover()
    assert recovered == 1
    assert await redis.llen("processing:queue:test:w1") == 0
    # 다시 dequeue 가능
    payload, raw = await q.dequeue(timeout=1)
    assert payload["task_id"] == "t1"
    assert payload["attempts"] == 1  # incremented on recover


async def test_fail_below_max_attempts_returns_to_main_queue(redis):
    q = ReliableQueue(redis, queue_key="queue:test", worker_id="w1", max_attempts=3)
    await redis.lpush("queue:test", json.dumps({"task_id": "t1", "attempts": 0}).encode())
    payload, raw = await q.dequeue(timeout=1)
    await q.fail(raw, payload)
    assert await redis.llen("processing:queue:test:w1") == 0
    assert await redis.llen("queue:test") == 1
    # attempts 증가됐는지
    requeued_raw = await redis.lindex("queue:test", 0)
    requeued = json.loads(requeued_raw)
    assert requeued["attempts"] == 1


async def test_fail_at_max_attempts_moves_to_dead_letter(redis):
    q = ReliableQueue(redis, queue_key="queue:test", worker_id="w1", max_attempts=3)
    await redis.lpush(
        "queue:test", json.dumps({"task_id": "t1", "attempts": 2}).encode()
    )
    payload, raw = await q.dequeue(timeout=1)
    await q.fail(raw, payload)
    # attempts 2 → 3 (== max) → dead-letter
    assert await redis.llen("queue:test") == 0
    assert await redis.llen("processing:queue:test:w1") == 0
    assert await redis.llen("dead_letter:queue:test") == 1
  • Step 3: Add requirements
# services/_shared/requirements.txt
redis>=5.0.0

별도 dev requirements (test):

# services/_shared/tests/requirements-dev.txt (optional)
fakeredis>=2.20.0
pytest>=8.0.0
pytest-asyncio>=0.23.0
  • Step 4: Run tests to verify they fail
cd services/_shared && python -m pytest tests/ -v

Expected: ImportError — reliable_queue.py 미존재.

  • Step 5: Write reliable_queue.py
# services/_shared/reliable_queue.py
"""F6 — Reliable Redis queue with processing list + recovery + retry.

Pattern: BLMOVE main → processing (atomic), then either ack (LREM processing) or
fail (LREM processing + re-enqueue or dead-letter).

Startup recovery: any items left in the worker's processing list from a previous
crash are pushed back to main queue (with attempts incremented).
"""
from __future__ import annotations

import json
import logging
import os
import socket
from typing import Optional

logger = logging.getLogger(__name__)


def default_worker_id(queue_key: str) -> str:
    """env > hostname-pid."""
    explicit = os.getenv("WORKER_ID")
    if explicit:
        return explicit
    return f"{queue_key}-{socket.gethostname()}-{os.getpid()}"


class ReliableQueue:
    """Wraps a redis client to provide BLMOVE-backed atomic dequeue +
    processing list + retry/dead-letter.

    Producer side stays unchanged: LPUSH queue:<x> <json payload>.
    Worker side: dequeue() → process → ack(raw) on success or fail(raw, payload) on error.
    Startup: await queue.recover() to re-enqueue orphans.
    """

    def __init__(
        self,
        redis,
        queue_key: str,
        worker_id: Optional[str] = None,
        max_attempts: int = 3,
    ):
        self._redis = redis
        self._queue_key = queue_key
        self._worker_id = worker_id or default_worker_id(queue_key)
        self._processing_key = f"processing:{queue_key}:{self._worker_id}"
        self._dead_letter_key = f"dead_letter:{queue_key}"
        self._max_attempts = max_attempts

    @property
    def processing_key(self) -> str:
        return self._processing_key

    async def dequeue(self, timeout: int = 5) -> Optional[tuple[dict, bytes]]:
        """Atomically move 1 item from main queue tail to processing head.

        Returns (parsed_dict, raw_bytes) or None on timeout.
        Caller MUST call ack(raw) on success or fail(raw, payload) on error.
        """
        raw = await self._redis.blmove(
            self._queue_key, self._processing_key,
            timeout=timeout, src="RIGHT", dest="LEFT",
        )
        if raw is None:
            return None
        try:
            payload = json.loads(raw)
        except json.JSONDecodeError:
            logger.error("invalid payload on dequeue, moving to dead-letter: %r", raw[:200])
            await self._redis.lrem(self._processing_key, 1, raw)
            await self._redis.lpush(self._dead_letter_key, raw)
            return None
        return payload, raw

    async def ack(self, raw: bytes) -> None:
        """Successful processing — remove from processing list."""
        removed = await self._redis.lrem(self._processing_key, 1, raw)
        if removed == 0:
            logger.warning("ack on missing payload (already removed?): %r", raw[:100])

    async def fail(self, raw: bytes, payload: dict) -> None:
        """Failed processing — remove from processing list and either re-enqueue or dead-letter."""
        await self._redis.lrem(self._processing_key, 1, raw)
        attempts = int(payload.get("attempts", 0)) + 1
        if attempts >= self._max_attempts:
            payload["attempts"] = attempts
            await self._redis.lpush(self._dead_letter_key, json.dumps(payload).encode())
            logger.error(
                "task moved to dead-letter after %d attempts: task_id=%s",
                attempts, payload.get("task_id"),
            )
            return
        payload["attempts"] = attempts
        await self._redis.lpush(self._queue_key, json.dumps(payload).encode())
        logger.info(
            "task re-enqueued (attempt %d/%d): task_id=%s",
            attempts, self._max_attempts, payload.get("task_id"),
        )

    async def recover(self) -> int:
        """Startup: move all orphans from this worker's processing list back to main queue.

        Increments attempts counter (orphan == implicit failure).
        Returns count of recovered items.
        """
        count = 0
        while True:
            raw = await self._redis.lpop(self._processing_key)
            if raw is None:
                break
            try:
                payload = json.loads(raw)
            except json.JSONDecodeError:
                await self._redis.lpush(self._dead_letter_key, raw)
                count += 1
                continue
            payload["attempts"] = int(payload.get("attempts", 0)) + 1
            if payload["attempts"] >= self._max_attempts:
                await self._redis.lpush(self._dead_letter_key, json.dumps(payload).encode())
            else:
                await self._redis.lpush(self._queue_key, json.dumps(payload).encode())
            count += 1
        if count:
            logger.info("recovered %d orphaned items for worker %s", count, self._worker_id)
        return count

참고: redis-py blmove API: client.blmove(first_list, second_list, timeout, src=..., dest=...). timeout=0 은 block forever. payload는 bytes로 받음 (decode_responses=False 가정).

  • Step 6: Run tests to verify they pass
cd services/_shared && python -m pytest tests/ -v

Expected: 6 PASS.

만약 ImportError (fakeredis 미설치) 발생 시:

python -m pip install fakeredis pytest-asyncio

또한 pytest.ini 또는 conftest.pyasyncio_mode = "auto" 필요. 신규 conftest:

# services/_shared/tests/conftest.py
import pytest
pytest_plugins = ["pytest_asyncio"]


def pytest_collection_modifyitems(config, items):
    for item in items:
        if "asyncio" in item.fixturenames or item.get_closest_marker("asyncio") is not None:
            continue
        # auto-mark all async tests
        if item.function.__name__.startswith("test_"):
            import asyncio, inspect
            if inspect.iscoroutinefunction(item.function):
                item.add_marker(pytest.mark.asyncio)

또는 더 간단히 services/_shared/pytest.ini:

[pytest]
asyncio_mode = auto
  • Step 7: Commit
git add services/_shared/
git commit -m "feat(services): _shared/reliable_queue 신설 — BLMOVE + processing list + retry (F6 part 1)"

Task 2: insta-render에 ReliableQueue 적용

Files:

  • Modify: services/insta-render/Dockerfile

  • Modify: services/insta-render/worker.py

  • Modify: services/insta-render/tests/test_worker.py (append)

  • Step 1: Update Dockerfile

services/insta-render/Dockerfile_shared 복사 추가. 기존 Dockerfile 패턴을 먼저 읽고, COPY services/insta-render /app 같은 라인이 있다면 그 위 또는 옆에:

COPY services/_shared /app/_shared
ENV PYTHONPATH=/app:/app/_shared:${PYTHONPATH}

build context가 services/ 루트여야 함. compose에서 build: { context: ./services, dockerfile: insta-render/Dockerfile } 인지 확인 — 아니라면 context 조정 필요.

  • Step 2: Modify worker.py — failing test first

services/insta-render/tests/test_worker.py 끝에 추가:

import json
from unittest.mock import AsyncMock, patch
import pytest


@pytest.mark.asyncio
async def test_worker_calls_ack_on_success():
    """성공 시 ack() 호출 (F6)."""
    import worker
    fake_payload = {"task_id": "t1", "job_type": "card_generation", "params": {}}
    fake_raw = json.dumps(fake_payload).encode()

    fake_queue = AsyncMock()
    fake_queue.dequeue = AsyncMock(side_effect=[(fake_payload, fake_raw), None])
    fake_queue.ack = AsyncMock()
    fake_queue.fail = AsyncMock()
    fake_queue.recover = AsyncMock(return_value=0)

    with patch.object(worker, "ReliableQueue", return_value=fake_queue), \
         patch.object(worker, "_dispatch") as disp:
        # poll_once로 1 cycle만 실행 (실제 loop 끊기 위해)
        await worker.poll_once(fake_queue)
        disp.assert_called_once()
        fake_queue.ack.assert_called_once_with(fake_raw)
        fake_queue.fail.assert_not_called()


@pytest.mark.asyncio
async def test_worker_calls_fail_on_dispatch_exception():
    """dispatch 예외 시 fail() 호출 — 작업 손실 안 됨 (F6)."""
    import worker
    fake_payload = {"task_id": "t2", "job_type": "card_generation", "params": {}}
    fake_raw = json.dumps(fake_payload).encode()

    fake_queue = AsyncMock()
    fake_queue.dequeue = AsyncMock(return_value=(fake_payload, fake_raw))
    fake_queue.ack = AsyncMock()
    fake_queue.fail = AsyncMock()

    with patch.object(worker, "_dispatch", side_effect=RuntimeError("boom")):
        await worker.poll_once(fake_queue)
        fake_queue.fail.assert_called_once_with(fake_raw, fake_payload)
        fake_queue.ack.assert_not_called()
  • Step 3: Run test to fail
cd services/insta-render && python -m pytest tests/ -v -k "ack_on_success or fail_on_dispatch"

Expected: AttributeError (worker.poll_once 미존재, worker.ReliableQueue 미존재).

  • Step 4: Rewrite insta-render worker.py
"""Redis 기반 worker — F6 신뢰성 패턴 적용 (BLMOVE + processing list + retry)."""
from __future__ import annotations

import asyncio
import logging
import os
import sys

import redis.asyncio as aioredis

from _shared.reliable_queue import ReliableQueue
from nas_client import webhook_update_task
# 기존 dispatch 대상 import 유지
from card_renderer import render_card

logger = logging.getLogger(__name__)

REDIS_URL = os.getenv("REDIS_URL", "redis://192.168.45.54:6379")
QUEUE_KEY = "queue:insta-render"
PAUSED_KEY = "queue:paused"


_DISPATCH_TABLE = {
    "card_generation": "render_card",
}


def _dispatch(payload: dict) -> None:
    job_type = payload.get("job_type", "")
    task_id = payload.get("task_id", "")
    params = payload.get("params", {})
    fn_name = _DISPATCH_TABLE.get(job_type)
    if fn_name is None:
        logger.error("unknown job_type=%s task=%s", job_type, task_id)
        webhook_update_task(task_id, "failed", 0, "", error=f"unknown job_type: {job_type}")
        return
    try:
        fn = getattr(sys.modules[__name__], fn_name)
    except AttributeError:
        webhook_update_task(task_id, "failed", 0, "", error=f"internal dispatch error: {fn_name}")
        return
    fn(task_id, params)


async def poll_once(queue: ReliableQueue) -> bool:
    """1 cycle: dequeue → dispatch → ack/fail. Returns True if a job was handled."""
    result = await queue.dequeue(timeout=5)
    if result is None:
        return False
    payload, raw = result
    try:
        await asyncio.to_thread(_dispatch, payload)
    except Exception:
        logger.exception("dispatch failed task_id=%s", payload.get("task_id"))
        await queue.fail(raw, payload)
        return True
    await queue.ack(raw)
    return True


async def worker_loop():
    redis = aioredis.from_url(REDIS_URL, decode_responses=False)
    queue = ReliableQueue(redis, queue_key=QUEUE_KEY)
    logger.info("insta-render worker started worker_id=%s", queue._worker_id)
    # F6: startup recovery
    try:
        recovered = await queue.recover()
        if recovered:
            logger.info("recovered %d orphaned items at startup", recovered)
    except Exception:
        logger.exception("startup recover failed")
    while True:
        try:
            paused = await redis.get(PAUSED_KEY)
            if paused == b"1":
                await asyncio.sleep(10)
                continue
            await poll_once(queue)
        except asyncio.CancelledError:
            logger.info("worker_loop cancelled")
            raise
        except Exception:
            logger.exception("worker_loop iteration 실패, 5초 후 재시도")
            await asyncio.sleep(5)


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    asyncio.run(worker_loop())

NOTE: 기존 insta-render/worker.py의 dispatch table·import는 실제 파일을 보고 매핑 유지. 위 예시는 minimal — job_type / function 이름은 기존 파일과 맞춰야 함. 변경 전 Read services/insta-render/worker.py로 정확한 dispatch table 확인할 것.

  • Step 5: Run tests
cd services/insta-render && python -m pytest tests/ -v

Expected: 신규 2 PASS, 기존 PASS (dispatch table test 등).

  • Step 6: Commit
git add services/insta-render/
git commit -m "fix(insta-render): F6 ReliableQueue 적용 — BLMOVE + ack/fail (F6 part 2)"

Task 3: music-render에 동일 적용

Files:

  • Modify: services/music-render/Dockerfile, worker.py

  • Modify: services/music-render/tests/test_worker.py (append)

  • Step 1: Dockerfile에 COPY services/_shared 추가

  • Step 2: Test 추가 (Task 2 패턴 동일, 단 dispatch target은 run_suno_generation 등 기존 패턴)

@pytest.mark.asyncio
async def test_music_worker_ack_on_success():
    import worker
    payload = {"task_id": "t1", "job_type": "suno_generation", "params": {}}
    raw = json.dumps(payload).encode()
    fake_queue = AsyncMock()
    fake_queue.dequeue = AsyncMock(return_value=(payload, raw))
    fake_queue.ack = AsyncMock()
    with patch.object(worker, "_dispatch"):
        await worker.poll_once(fake_queue)
        fake_queue.ack.assert_called_once_with(raw)


@pytest.mark.asyncio
async def test_music_worker_fail_on_exception():
    import worker
    payload = {"task_id": "t2", "job_type": "suno_generation", "params": {}}
    raw = json.dumps(payload).encode()
    fake_queue = AsyncMock()
    fake_queue.dequeue = AsyncMock(return_value=(payload, raw))
    fake_queue.fail = AsyncMock()
    with patch.object(worker, "_dispatch", side_effect=RuntimeError("x")):
        await worker.poll_once(fake_queue)
        fake_queue.fail.assert_called_once_with(raw, payload)
  • Step 3: Run test to fail
  • Step 4: Rewrite music-render worker.py — worker_loop 구조는 insta-render와 동일, _dispatch + _DISPATCH_TABLE은 기존 12개 함수 그대로 유지
  • Step 5: Run tests
  • Step 6: Commit
git add services/music-render/
git commit -m "fix(music-render): F6 ReliableQueue 적용 (F6 part 3)"

Task 4: video-render에 동일 적용

(Task 3와 동일 패턴 — sora/veo/kling/seedance 4 provider table 유지)

  • Step 1: Dockerfile 수정
  • Step 2: 신규 test 2개 추가 (test_video_worker_ack_on_success, test_video_worker_fail_on_exception) — job_type은 sora_generation
  • Step 3: Run failing test
  • Step 4: Rewrite worker.py — 동일 패턴
  • Step 5: Run tests
  • Step 6: Commit
git add services/video-render/
git commit -m "fix(video-render): F6 ReliableQueue 적용 (F6 part 4)"

Task 5: image-render에 동일 적용

(gpt_image / nano_banana / flux 3 provider table 유지)

  • Step 1-6: Task 3/4 동일 패턴
git add services/image-render/
git commit -m "fix(image-render): F6 ReliableQueue 적용 (F6 part 5)"

Task 6: 운영 검증 + push

  • Step 1: 전체 services test 실행
cd services && for d in _shared insta-render music-render video-render image-render; do
  echo "--- $d ---"
  (cd $d && python -m pytest tests/ -q) || true
done

(또는 PowerShell:)

foreach ($d in @("_shared","insta-render","music-render","video-render","image-render")) {
  Write-Output "--- $d ---"
  Push-Location services/$d
  python -m pytest tests/ -q
  Pop-Location
}

Expected: 4개 worker 각 신규 2개 + _shared 6개 + 기존 test 전부 PASS.

  • Step 2: Docker build 시뮬 (옵션, 시간 허용 시)
cd services && docker compose build insta-render music-render video-render image-render

Expected: build context에 _shared 포함됨 검증.

  • Step 3: Push
git push origin main
  • Step 4: 운영 deploy 시 주의사항 (수동)

NAS에서 컨테이너 재배포 시:

  1. redis-cli -h 192.168.45.54 KEYS 'processing:*' 로 기존 orphan 확인 — 있다면 worker_id 다르면 안 잡힘. 수동으로 LMOVE 해야 할 수도 있음.
  2. redis-cli -h 192.168.45.54 KEYS 'dead_letter:*' 로 dead-letter 모니터 — 누적되면 alerting 필요.
  3. WORKER_ID env로 unique 하게 (WORKER_ID=insta-render-prod-1 등) 권장 — hostname이 컨테이너 재기동 시 바뀌면 orphan 추적 안 됨.

Self-Review

  1. atomic dequeue: BLMOVE 단일 명령 — Redis 단일 트랜잭션
  2. ack on success: LREM processing 1 raw — 정확 1개
  3. fail with retry: attempts < max → 재큐, attempts >= max → dead-letter
  4. startup recovery: orphan 자동 재큐 (attempts 증가)
  5. 4 worker 적용: insta/music/video/image 동일 패턴
  6. NAS producer 호환: LPUSH 그대로, payload schema에 attempts 선택적

미커버 (의도적):

  • dead-letter monitor/alert — 운영 작업 (CHECK_POINT 백로그)
  • worker_id env 미설정 시 hostname 변경 시 orphan 분실 — 운영 가이드에 명시

가정 검증:

  • redis-py.aioredis.blmove 시그니처: (first_list, second_list, timeout, src='LEFT', dest='RIGHT'). redis>=5.0 권장.
  • fakeredis: fakeredis.aioredis.FakeRedis (>=2.20.0) 가 BLMOVE 지원함 — 미지원 시 plan 적용 전 검증.

Execution Handoff

Plan complete and saved to docs/superpowers/plans/2026-05-25-render-queue-reliability.md.

1. Subagent-Driven (recommended) — Task 별 fresh subagent. 4개 worker는 패턴 같으나 dispatch table은 각 worker 고유 — subagent가 정확히 일관성 유지하도록 review checkpoint.

2. Inline Execution — 현 세션 실행.

박재오 결정 대기. Plan 1·2 마친 후 진입 권장 (작업량 가장 큼 — 4개 worker × 약 1시간 = 4시간).