# Render Queue Reliability — Code Review F6 Implementation Plan > **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. **Goal:** 4개 render worker(insta/music/video/image-render)가 BLPOP 직후 crash 시 작업 손실되는 문제 해결. BLMOVE(또는 BRPOPLPUSH)로 atomic dequeue + processing list 패턴 + startup recovery + retry/dead-letter. **Architecture:** 1. 각 worker가 unique `worker_id` 보유: `--` (env로 override 가능). 2. atomic dequeue: `BLMOVE queue:-render processing:-render: RIGHT LEFT 5` — 5초 timeout. (`BRPOPLPUSH`는 Redis 6.2+ deprecated, `BLMOVE`가 후속). 3. 작업 성공: `LREM processing:-render: 1 ` — 정확 1개 제거. 4. 작업 실패: payload에 `attempts` counter 증가시켜 main queue 끝으로 LPUSH; 한계(기본 3) 초과 시 `dead_letter:` 로 이동. 5. **Startup recovery**: worker 시작 시 자신의 processing list가 비어있지 않으면 → 모두 main queue로 되돌림 (재시도). attempts 증가. 6. NAS측 producer는 무변경 (LPUSH 그대로). 단, payload schema에 `attempts: int` (optional) 필드 명시 — producer는 안 채워도 worker가 default 0으로. **Shared module 전략:** 4개 worker가 동일 패턴이므로 `services/_shared/reliable_queue.py` 1개 만들고 각 Dockerfile에서 `COPY services/_shared /app/_shared` 후 `from _shared.reliable_queue import ReliableQueue`. compose entry/dockerfile 변경 4건. (DRY > inline 4중복.) **Tech Stack:** Python 3.12, redis.asyncio 5.x, fakeredis (pytest dep), pytest-asyncio. **Working directory:** `C:\Users\jaeoh\Desktop\workspace\web-ai`. --- ## File Map | 파일 | 변경 | 책임 | |------|------|------| | `services/_shared/__init__.py` | Create | namespace package | | `services/_shared/reliable_queue.py` | Create | `ReliableQueue` 클래스 — dequeue, ack, fail, recover | | `services/_shared/tests/test_reliable_queue.py` | Create | fakeredis 단위 테스트 6개 | | `services/_shared/requirements.txt` | Create | redis>=5.0, fakeredis (test only) | | `services/insta-render/Dockerfile` | Modify | `COPY services/_shared /app/_shared` + PYTHONPATH | | `services/insta-render/worker.py` | Modify L1~ | BLPOP → ReliableQueue 사용 | | `services/insta-render/tests/test_worker.py` | Append | 1 integration test (recovery) | | `services/music-render/Dockerfile` | Modify | shared copy | | `services/music-render/worker.py` | Modify | ReliableQueue 사용 | | `services/music-render/tests/test_worker.py` | Append | recovery test | | `services/video-render/Dockerfile` | Modify | shared copy | | `services/video-render/worker.py` | Modify | ReliableQueue 사용 | | `services/video-render/tests/test_worker.py` | Append | recovery test | | `services/image-render/Dockerfile` | Modify | shared copy | | `services/image-render/worker.py` | Modify | ReliableQueue 사용 | | `services/image-render/tests/test_worker.py` | Append | recovery test | | `services/docker-compose.yml` (있다면) | Verify | build context가 services/ 루트 포함하는지 | --- ## Task 1: ReliableQueue 공유 모듈 작성 **Files:** - Create: `services/_shared/__init__.py` - Create: `services/_shared/reliable_queue.py` - Create: `services/_shared/tests/__init__.py` - Create: `services/_shared/tests/test_reliable_queue.py` - Create: `services/_shared/requirements.txt` - [ ] **Step 1: Create namespace package** ```python # services/_shared/__init__.py ``` (빈 파일) ```python # services/_shared/tests/__init__.py ``` (빈 파일) - [ ] **Step 2: Write failing tests first** ```python # services/_shared/tests/test_reliable_queue.py """F6 — ReliableQueue: atomic dequeue + recovery + retry.""" import json import fakeredis.aioredis import pytest from _shared.reliable_queue import ReliableQueue @pytest.fixture async def redis(): r = fakeredis.aioredis.FakeRedis(decode_responses=False) yield r await r.flushall() await r.aclose() async def test_dequeue_atomically_moves_to_processing(redis): """BLMOVE: queue → processing 원자적 이동.""" q = ReliableQueue(redis, queue_key="queue:test", worker_id="w1") await redis.lpush("queue:test", json.dumps({"task_id": "t1"}).encode()) payload, raw = await q.dequeue(timeout=1) assert payload["task_id"] == "t1" # main queue는 비어있고, processing list에 들어있어야 함 assert await redis.llen("queue:test") == 0 assert await redis.llen("processing:queue:test:w1") == 1 async def test_dequeue_returns_none_on_timeout(redis): q = ReliableQueue(redis, queue_key="queue:test", worker_id="w1") result = await q.dequeue(timeout=1) assert result is None async def test_ack_removes_from_processing(redis): q = ReliableQueue(redis, queue_key="queue:test", worker_id="w1") await redis.lpush("queue:test", json.dumps({"task_id": "t1"}).encode()) payload, raw = await q.dequeue(timeout=1) await q.ack(raw) assert await redis.llen("processing:queue:test:w1") == 0 async def test_recover_returns_orphaned_to_main_queue(redis): """startup recovery: 잔존 processing list 항목을 main queue로 되돌림.""" # 이전 crash 시뮬레이션: processing list에 잔존 orphan = json.dumps({"task_id": "t1", "attempts": 0}).encode() await redis.lpush("processing:queue:test:w1", orphan) q = ReliableQueue(redis, queue_key="queue:test", worker_id="w1") recovered = await q.recover() assert recovered == 1 assert await redis.llen("processing:queue:test:w1") == 0 # 다시 dequeue 가능 payload, raw = await q.dequeue(timeout=1) assert payload["task_id"] == "t1" assert payload["attempts"] == 1 # incremented on recover async def test_fail_below_max_attempts_returns_to_main_queue(redis): q = ReliableQueue(redis, queue_key="queue:test", worker_id="w1", max_attempts=3) await redis.lpush("queue:test", json.dumps({"task_id": "t1", "attempts": 0}).encode()) payload, raw = await q.dequeue(timeout=1) await q.fail(raw, payload) assert await redis.llen("processing:queue:test:w1") == 0 assert await redis.llen("queue:test") == 1 # attempts 증가됐는지 requeued_raw = await redis.lindex("queue:test", 0) requeued = json.loads(requeued_raw) assert requeued["attempts"] == 1 async def test_fail_at_max_attempts_moves_to_dead_letter(redis): q = ReliableQueue(redis, queue_key="queue:test", worker_id="w1", max_attempts=3) await redis.lpush( "queue:test", json.dumps({"task_id": "t1", "attempts": 2}).encode() ) payload, raw = await q.dequeue(timeout=1) await q.fail(raw, payload) # attempts 2 → 3 (== max) → dead-letter assert await redis.llen("queue:test") == 0 assert await redis.llen("processing:queue:test:w1") == 0 assert await redis.llen("dead_letter:queue:test") == 1 ``` - [ ] **Step 3: Add requirements** ```text # services/_shared/requirements.txt redis>=5.0.0 ``` 별도 dev requirements (test): ```text # services/_shared/tests/requirements-dev.txt (optional) fakeredis>=2.20.0 pytest>=8.0.0 pytest-asyncio>=0.23.0 ``` - [ ] **Step 4: Run tests to verify they fail** ``` cd services/_shared && python -m pytest tests/ -v ``` Expected: ImportError — reliable_queue.py 미존재. - [ ] **Step 5: Write reliable_queue.py** ```python # services/_shared/reliable_queue.py """F6 — Reliable Redis queue with processing list + recovery + retry. Pattern: BLMOVE main → processing (atomic), then either ack (LREM processing) or fail (LREM processing + re-enqueue or dead-letter). Startup recovery: any items left in the worker's processing list from a previous crash are pushed back to main queue (with attempts incremented). """ from __future__ import annotations import json import logging import os import socket from typing import Optional logger = logging.getLogger(__name__) def default_worker_id(queue_key: str) -> str: """env > hostname-pid.""" explicit = os.getenv("WORKER_ID") if explicit: return explicit return f"{queue_key}-{socket.gethostname()}-{os.getpid()}" class ReliableQueue: """Wraps a redis client to provide BLMOVE-backed atomic dequeue + processing list + retry/dead-letter. Producer side stays unchanged: LPUSH queue: . Worker side: dequeue() → process → ack(raw) on success or fail(raw, payload) on error. Startup: await queue.recover() to re-enqueue orphans. """ def __init__( self, redis, queue_key: str, worker_id: Optional[str] = None, max_attempts: int = 3, ): self._redis = redis self._queue_key = queue_key self._worker_id = worker_id or default_worker_id(queue_key) self._processing_key = f"processing:{queue_key}:{self._worker_id}" self._dead_letter_key = f"dead_letter:{queue_key}" self._max_attempts = max_attempts @property def processing_key(self) -> str: return self._processing_key async def dequeue(self, timeout: int = 5) -> Optional[tuple[dict, bytes]]: """Atomically move 1 item from main queue tail to processing head. Returns (parsed_dict, raw_bytes) or None on timeout. Caller MUST call ack(raw) on success or fail(raw, payload) on error. """ raw = await self._redis.blmove( self._queue_key, self._processing_key, timeout=timeout, src="RIGHT", dest="LEFT", ) if raw is None: return None try: payload = json.loads(raw) except json.JSONDecodeError: logger.error("invalid payload on dequeue, moving to dead-letter: %r", raw[:200]) await self._redis.lrem(self._processing_key, 1, raw) await self._redis.lpush(self._dead_letter_key, raw) return None return payload, raw async def ack(self, raw: bytes) -> None: """Successful processing — remove from processing list.""" removed = await self._redis.lrem(self._processing_key, 1, raw) if removed == 0: logger.warning("ack on missing payload (already removed?): %r", raw[:100]) async def fail(self, raw: bytes, payload: dict) -> None: """Failed processing — remove from processing list and either re-enqueue or dead-letter.""" await self._redis.lrem(self._processing_key, 1, raw) attempts = int(payload.get("attempts", 0)) + 1 if attempts >= self._max_attempts: payload["attempts"] = attempts await self._redis.lpush(self._dead_letter_key, json.dumps(payload).encode()) logger.error( "task moved to dead-letter after %d attempts: task_id=%s", attempts, payload.get("task_id"), ) return payload["attempts"] = attempts await self._redis.lpush(self._queue_key, json.dumps(payload).encode()) logger.info( "task re-enqueued (attempt %d/%d): task_id=%s", attempts, self._max_attempts, payload.get("task_id"), ) async def recover(self) -> int: """Startup: move all orphans from this worker's processing list back to main queue. Increments attempts counter (orphan == implicit failure). Returns count of recovered items. """ count = 0 while True: raw = await self._redis.lpop(self._processing_key) if raw is None: break try: payload = json.loads(raw) except json.JSONDecodeError: await self._redis.lpush(self._dead_letter_key, raw) count += 1 continue payload["attempts"] = int(payload.get("attempts", 0)) + 1 if payload["attempts"] >= self._max_attempts: await self._redis.lpush(self._dead_letter_key, json.dumps(payload).encode()) else: await self._redis.lpush(self._queue_key, json.dumps(payload).encode()) count += 1 if count: logger.info("recovered %d orphaned items for worker %s", count, self._worker_id) return count ``` **참고: redis-py blmove API**: `client.blmove(first_list, second_list, timeout, src=..., dest=...)`. timeout=0 은 block forever. payload는 bytes로 받음 (`decode_responses=False` 가정). - [ ] **Step 6: Run tests to verify they pass** ``` cd services/_shared && python -m pytest tests/ -v ``` Expected: 6 PASS. 만약 ImportError (`fakeredis` 미설치) 발생 시: ``` python -m pip install fakeredis pytest-asyncio ``` 또한 `pytest.ini` 또는 `conftest.py`에 `asyncio_mode = "auto"` 필요. 신규 conftest: ```python # services/_shared/tests/conftest.py import pytest pytest_plugins = ["pytest_asyncio"] def pytest_collection_modifyitems(config, items): for item in items: if "asyncio" in item.fixturenames or item.get_closest_marker("asyncio") is not None: continue # auto-mark all async tests if item.function.__name__.startswith("test_"): import asyncio, inspect if inspect.iscoroutinefunction(item.function): item.add_marker(pytest.mark.asyncio) ``` 또는 더 간단히 `services/_shared/pytest.ini`: ```ini [pytest] asyncio_mode = auto ``` - [ ] **Step 7: Commit** ```bash git add services/_shared/ git commit -m "feat(services): _shared/reliable_queue 신설 — BLMOVE + processing list + retry (F6 part 1)" ``` --- ## Task 2: insta-render에 ReliableQueue 적용 **Files:** - Modify: `services/insta-render/Dockerfile` - Modify: `services/insta-render/worker.py` - Modify: `services/insta-render/tests/test_worker.py` (append) - [ ] **Step 1: Update Dockerfile** `services/insta-render/Dockerfile` 에 `_shared` 복사 추가. 기존 Dockerfile 패턴을 먼저 읽고, `COPY services/insta-render /app` 같은 라인이 있다면 그 위 또는 옆에: ```dockerfile COPY services/_shared /app/_shared ENV PYTHONPATH=/app:/app/_shared:${PYTHONPATH} ``` build context가 `services/` 루트여야 함. compose에서 `build: { context: ./services, dockerfile: insta-render/Dockerfile }` 인지 확인 — 아니라면 context 조정 필요. - [ ] **Step 2: Modify worker.py — failing test first** `services/insta-render/tests/test_worker.py` 끝에 추가: ```python import json from unittest.mock import AsyncMock, patch import pytest @pytest.mark.asyncio async def test_worker_calls_ack_on_success(): """성공 시 ack() 호출 (F6).""" import worker fake_payload = {"task_id": "t1", "job_type": "card_generation", "params": {}} fake_raw = json.dumps(fake_payload).encode() fake_queue = AsyncMock() fake_queue.dequeue = AsyncMock(side_effect=[(fake_payload, fake_raw), None]) fake_queue.ack = AsyncMock() fake_queue.fail = AsyncMock() fake_queue.recover = AsyncMock(return_value=0) with patch.object(worker, "ReliableQueue", return_value=fake_queue), \ patch.object(worker, "_dispatch") as disp: # poll_once로 1 cycle만 실행 (실제 loop 끊기 위해) await worker.poll_once(fake_queue) disp.assert_called_once() fake_queue.ack.assert_called_once_with(fake_raw) fake_queue.fail.assert_not_called() @pytest.mark.asyncio async def test_worker_calls_fail_on_dispatch_exception(): """dispatch 예외 시 fail() 호출 — 작업 손실 안 됨 (F6).""" import worker fake_payload = {"task_id": "t2", "job_type": "card_generation", "params": {}} fake_raw = json.dumps(fake_payload).encode() fake_queue = AsyncMock() fake_queue.dequeue = AsyncMock(return_value=(fake_payload, fake_raw)) fake_queue.ack = AsyncMock() fake_queue.fail = AsyncMock() with patch.object(worker, "_dispatch", side_effect=RuntimeError("boom")): await worker.poll_once(fake_queue) fake_queue.fail.assert_called_once_with(fake_raw, fake_payload) fake_queue.ack.assert_not_called() ``` - [ ] **Step 3: Run test to fail** ``` cd services/insta-render && python -m pytest tests/ -v -k "ack_on_success or fail_on_dispatch" ``` Expected: AttributeError (`worker.poll_once` 미존재, `worker.ReliableQueue` 미존재). - [ ] **Step 4: Rewrite insta-render worker.py** ```python """Redis 기반 worker — F6 신뢰성 패턴 적용 (BLMOVE + processing list + retry).""" from __future__ import annotations import asyncio import logging import os import sys import redis.asyncio as aioredis from _shared.reliable_queue import ReliableQueue from nas_client import webhook_update_task # 기존 dispatch 대상 import 유지 from card_renderer import render_card logger = logging.getLogger(__name__) REDIS_URL = os.getenv("REDIS_URL", "redis://192.168.45.54:6379") QUEUE_KEY = "queue:insta-render" PAUSED_KEY = "queue:paused" _DISPATCH_TABLE = { "card_generation": "render_card", } def _dispatch(payload: dict) -> None: job_type = payload.get("job_type", "") task_id = payload.get("task_id", "") params = payload.get("params", {}) fn_name = _DISPATCH_TABLE.get(job_type) if fn_name is None: logger.error("unknown job_type=%s task=%s", job_type, task_id) webhook_update_task(task_id, "failed", 0, "", error=f"unknown job_type: {job_type}") return try: fn = getattr(sys.modules[__name__], fn_name) except AttributeError: webhook_update_task(task_id, "failed", 0, "", error=f"internal dispatch error: {fn_name}") return fn(task_id, params) async def poll_once(queue: ReliableQueue) -> bool: """1 cycle: dequeue → dispatch → ack/fail. Returns True if a job was handled.""" result = await queue.dequeue(timeout=5) if result is None: return False payload, raw = result try: await asyncio.to_thread(_dispatch, payload) except Exception: logger.exception("dispatch failed task_id=%s", payload.get("task_id")) await queue.fail(raw, payload) return True await queue.ack(raw) return True async def worker_loop(): redis = aioredis.from_url(REDIS_URL, decode_responses=False) queue = ReliableQueue(redis, queue_key=QUEUE_KEY) logger.info("insta-render worker started worker_id=%s", queue._worker_id) # F6: startup recovery try: recovered = await queue.recover() if recovered: logger.info("recovered %d orphaned items at startup", recovered) except Exception: logger.exception("startup recover failed") while True: try: paused = await redis.get(PAUSED_KEY) if paused == b"1": await asyncio.sleep(10) continue await poll_once(queue) except asyncio.CancelledError: logger.info("worker_loop cancelled") raise except Exception: logger.exception("worker_loop iteration 실패, 5초 후 재시도") await asyncio.sleep(5) if __name__ == "__main__": logging.basicConfig(level=logging.INFO) asyncio.run(worker_loop()) ``` **NOTE**: 기존 `insta-render/worker.py`의 dispatch table·import는 실제 파일을 보고 매핑 유지. 위 예시는 minimal — job_type / function 이름은 기존 파일과 맞춰야 함. 변경 전 `Read services/insta-render/worker.py`로 정확한 dispatch table 확인할 것. - [ ] **Step 5: Run tests** ``` cd services/insta-render && python -m pytest tests/ -v ``` Expected: 신규 2 PASS, 기존 PASS (dispatch table test 등). - [ ] **Step 6: Commit** ```bash git add services/insta-render/ git commit -m "fix(insta-render): F6 ReliableQueue 적용 — BLMOVE + ack/fail (F6 part 2)" ``` --- ## Task 3: music-render에 동일 적용 **Files:** - Modify: `services/music-render/Dockerfile`, `worker.py` - Modify: `services/music-render/tests/test_worker.py` (append) - [ ] **Step 1: Dockerfile에 `COPY services/_shared` 추가** - [ ] **Step 2: Test 추가 (Task 2 패턴 동일, 단 dispatch target은 `run_suno_generation` 등 기존 패턴)** ```python @pytest.mark.asyncio async def test_music_worker_ack_on_success(): import worker payload = {"task_id": "t1", "job_type": "suno_generation", "params": {}} raw = json.dumps(payload).encode() fake_queue = AsyncMock() fake_queue.dequeue = AsyncMock(return_value=(payload, raw)) fake_queue.ack = AsyncMock() with patch.object(worker, "_dispatch"): await worker.poll_once(fake_queue) fake_queue.ack.assert_called_once_with(raw) @pytest.mark.asyncio async def test_music_worker_fail_on_exception(): import worker payload = {"task_id": "t2", "job_type": "suno_generation", "params": {}} raw = json.dumps(payload).encode() fake_queue = AsyncMock() fake_queue.dequeue = AsyncMock(return_value=(payload, raw)) fake_queue.fail = AsyncMock() with patch.object(worker, "_dispatch", side_effect=RuntimeError("x")): await worker.poll_once(fake_queue) fake_queue.fail.assert_called_once_with(raw, payload) ``` - [ ] **Step 3: Run test to fail** - [ ] **Step 4: Rewrite music-render worker.py — `worker_loop` 구조는 insta-render와 동일, `_dispatch` + `_DISPATCH_TABLE`은 기존 12개 함수 그대로 유지** - [ ] **Step 5: Run tests** - [ ] **Step 6: Commit** ```bash git add services/music-render/ git commit -m "fix(music-render): F6 ReliableQueue 적용 (F6 part 3)" ``` --- ## Task 4: video-render에 동일 적용 (Task 3와 동일 패턴 — sora/veo/kling/seedance 4 provider table 유지) - [ ] **Step 1: Dockerfile 수정** - [ ] **Step 2: 신규 test 2개 추가 (`test_video_worker_ack_on_success`, `test_video_worker_fail_on_exception`) — job_type은 `sora_generation`** - [ ] **Step 3: Run failing test** - [ ] **Step 4: Rewrite worker.py — 동일 패턴** - [ ] **Step 5: Run tests** - [ ] **Step 6: Commit** ```bash git add services/video-render/ git commit -m "fix(video-render): F6 ReliableQueue 적용 (F6 part 4)" ``` --- ## Task 5: image-render에 동일 적용 (gpt_image / nano_banana / flux 3 provider table 유지) - [ ] **Step 1-6: Task 3/4 동일 패턴** ```bash git add services/image-render/ git commit -m "fix(image-render): F6 ReliableQueue 적용 (F6 part 5)" ``` --- ## Task 6: 운영 검증 + push - [ ] **Step 1: 전체 services test 실행** ``` cd services && for d in _shared insta-render music-render video-render image-render; do echo "--- $d ---" (cd $d && python -m pytest tests/ -q) || true done ``` (또는 PowerShell:) ```powershell foreach ($d in @("_shared","insta-render","music-render","video-render","image-render")) { Write-Output "--- $d ---" Push-Location services/$d python -m pytest tests/ -q Pop-Location } ``` Expected: 4개 worker 각 신규 2개 + _shared 6개 + 기존 test 전부 PASS. - [ ] **Step 2: Docker build 시뮬 (옵션, 시간 허용 시)** ``` cd services && docker compose build insta-render music-render video-render image-render ``` Expected: build context에 `_shared` 포함됨 검증. - [ ] **Step 3: Push** ```bash git push origin main ``` - [ ] **Step 4: 운영 deploy 시 주의사항 (수동)** NAS에서 컨테이너 재배포 시: 1. `redis-cli -h 192.168.45.54 KEYS 'processing:*'` 로 기존 orphan 확인 — 있다면 worker_id 다르면 안 잡힘. 수동으로 `LMOVE` 해야 할 수도 있음. 2. `redis-cli -h 192.168.45.54 KEYS 'dead_letter:*'` 로 dead-letter 모니터 — 누적되면 alerting 필요. 3. WORKER_ID env로 unique 하게 (`WORKER_ID=insta-render-prod-1` 등) 권장 — hostname이 컨테이너 재기동 시 바뀌면 orphan 추적 안 됨. --- ## Self-Review 1. **atomic dequeue**: `BLMOVE` 단일 명령 — Redis 단일 트랜잭션 ✅ 2. **ack on success**: `LREM processing 1 raw` — 정확 1개 ✅ 3. **fail with retry**: attempts < max → 재큐, attempts >= max → dead-letter ✅ 4. **startup recovery**: orphan 자동 재큐 (attempts 증가) ✅ 5. **4 worker 적용**: insta/music/video/image 동일 패턴 ✅ 6. **NAS producer 호환**: LPUSH 그대로, payload schema에 attempts 선택적 ✅ **미커버 (의도적)**: - dead-letter monitor/alert — 운영 작업 (CHECK_POINT 백로그) - worker_id env 미설정 시 hostname 변경 시 orphan 분실 — 운영 가이드에 명시 **가정 검증**: - `redis-py.aioredis.blmove` 시그니처: `(first_list, second_list, timeout, src='LEFT', dest='RIGHT')`. redis>=5.0 권장. - fakeredis: `fakeredis.aioredis.FakeRedis` (>=2.20.0) 가 BLMOVE 지원함 — 미지원 시 plan 적용 전 검증. --- ## Execution Handoff **Plan complete and saved to `docs/superpowers/plans/2026-05-25-render-queue-reliability.md`.** **1. Subagent-Driven (recommended)** — Task 별 fresh subagent. 4개 worker는 패턴 같으나 dispatch table은 각 worker 고유 — subagent가 정확히 일관성 유지하도록 review checkpoint. **2. Inline Execution** — 현 세션 실행. 박재오 결정 대기. Plan 1·2 마친 후 진입 권장 (작업량 가장 큼 — 4개 worker × 약 1시간 = 4시간).