# music/YouTube Pipeline Reliability and Recovery Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Absorb transient pipeline step failures with automatic retries (publish excluded), leave permanent failures in the terminal `failed` state, and allow manual resumption from the failed step through the Telegram [🔄 Retry] button.

**Architecture:** In music-lab, `orchestrator.run_step` gains a bounded retry loop, alongside a `POST /pipeline/{id}/retry` resume endpoint and `db.get_last_failed_step`. In agent-office, `youtube_publisher` detects `failed` and sends a Telegram alert with a button, and `webhook` proxies the `ytpub_retry_{pid}` callback to the music-lab retry endpoint.

**Tech Stack:** Python 3.12 / FastAPI / SQLite / asyncio / pytest. Existing patterns: `orchestrator.run_step` (BackgroundTask), the pipeline endpoints in `main.py` (404/409 + `_db_module`), `service_proxy` (httpx + `MUSIC_LAB_URL`), `telegram/webhook.py` (callback prefix dispatch).

**Spec:** `docs/superpowers/specs/2026-06-12-music-pipeline-reliability-design.md`

> **Test fixture note:** First check how `tests/conftest.py` in music-lab and in agent-office isolates the DB (`db.DB_PATH` monkeypatch + `init_db`), then adapt the fixtures in the tests below to that convention. The code below assumes the standard pattern of monkeypatching `db.DB_PATH` to a tmp path.
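
The bounded retry named under Architecture can be illustrated in isolation. The following is a minimal sketch, not the orchestrator code: `run_with_retries`, `make_flaky`, `MAX_RETRIES`, and `BACKOFF_SEC` are hypothetical names introduced for the example, and the backoff delays are set to zero so the sketch runs instantly.

```python
import asyncio

MAX_RETRIES = 2              # hypothetical: extra attempts after the first one
BACKOFF_SEC = [0.0, 0.0]     # hypothetical: zero delays so the sketch runs instantly
NON_RETRY_STEPS = {"publish"}


async def run_with_retries(step, action):
    """Run `action` up to MAX_RETRIES + 1 times; non-retry steps get a single attempt."""
    attempts = 1 if step in NON_RETRY_STEPS else MAX_RETRIES + 1
    last_err = None
    for i in range(attempts):
        try:
            return {"state": "ok", "result": await action(), "attempts": i + 1}
        except Exception as e:
            last_err = e
            if i < attempts - 1:
                # Reuse the last backoff value if the list is shorter than the retry count.
                await asyncio.sleep(BACKOFF_SEC[min(i, len(BACKOFF_SEC) - 1)])
    return {"state": "failed", "error": f"{step}: {last_err}", "attempts": attempts}


def make_flaky(fail_times):
    """Return an async action that raises `fail_times` times, then succeeds."""
    calls = {"n": 0}

    async def action():
        calls["n"] += 1
        if calls["n"] <= fail_times:
            raise RuntimeError("transient")
        return "done"

    return action


cover = asyncio.run(run_with_retries("cover", make_flaky(2)))
publish = asyncio.run(run_with_retries("publish", make_flaky(1)))
```

In this sketch a `cover` step that fails twice still succeeds on its third attempt, while a `publish` step that fails once goes straight to `failed`, which mirrors the duplicate-upload concern behind excluding publish from automatic retry.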
---

## File Structure

| File | Change | Responsibility |
|------|--------|----------------|
| `music-lab/app/db.py` | Modify | Add `get_last_failed_step(pid)` |
| `music-lab/app/pipeline/orchestrator.py` | Modify | Extract `_dispatch_step` + `run_step` retry loop |
| `music-lab/app/main.py` | Modify | `POST /api/music/pipeline/{pid}/retry` |
| `music-lab/tests/test_pipeline_retry.py` | Create | db + orchestrator + endpoint tests |
| `agent-office/app/service_proxy.py` | Modify | `pipeline_retry(pid)`, `list_failed_pipelines()` |
| `agent-office/app/agents/youtube_publisher.py` | Modify | Detect `failed` → Telegram alert + button |
| `agent-office/app/telegram/webhook.py` | Modify | `ytpub_retry_` dispatch |
| `agent-office/tests/test_youtube_publisher_retry.py` | Create | Alert + callback tests |
| `web-backend/CLAUDE.md` + `memory/service_music.md` | Modify | API table + memory |

---

## Task 1: `get_last_failed_step` in music-lab db

**Files:** Modify `music-lab/app/db.py`; Test `music-lab/tests/test_pipeline_retry.py` (Create)

- [ ] **Step 1: Write the failing test**

`music-lab/tests/test_pipeline_retry.py` (adjust the fixture to the music-lab conftest convention):

```python
import pytest
from app import db


@pytest.fixture(autouse=True)
def _tmp_db(tmp_path, monkeypatch):
    monkeypatch.setattr(db, "DB_PATH", str(tmp_path / "music.db"))
    db.init_db()


def _make_pipeline_with_failed_step(step: str) -> int:
    pid = db.create_pipeline(track_id=1)  # check conftest/db for the real signature and adjust
    job = db.create_pipeline_job(pid, step)
    db.update_pipeline_job(job, status="failed", error="boom")
    db.update_pipeline_state(pid, "failed", failed_reason=f"{step}: boom")
    return pid


def test_get_last_failed_step_returns_step():
    pid = _make_pipeline_with_failed_step("video")
    assert db.get_last_failed_step(pid) == "video"


def test_get_last_failed_step_none_when_no_failure():
    pid = db.create_pipeline(track_id=1)
    db.create_pipeline_job(pid, "cover")  # default status (running/succeeded), not failed
    assert db.get_last_failed_step(pid) is None
```

- [ ] **Step 2: Confirm the failure**

Run: `cd music-lab && PYTHONPATH=.. python -m pytest tests/test_pipeline_retry.py::test_get_last_failed_step_returns_step -v`

Expected: FAIL, because `db.get_last_failed_step` does not exist yet. (If the `create_pipeline` signature differs, adjust the helper to the actual creation function in db.)

- [ ] **Step 3: Implement**

Add to the pipeline_jobs section of `music-lab/app/db.py` (near `list_pipeline_jobs`):

```python
def get_last_failed_step(pid: int) -> str | None:
    """Return the step of the pipeline's most recent status='failed' pipeline_job, or None."""
    with _connect() as conn:  # match the name of music-lab's connection helper
        row = conn.execute(
            "SELECT step FROM pipeline_jobs "
            "WHERE pipeline_id = ? AND status = 'failed' "
            "ORDER BY id DESC LIMIT 1",
            (pid,),
        ).fetchone()
    # row["step"] assumes the connection uses a name-addressable row factory (e.g. sqlite3.Row)
    return row["step"] if row else None
```

(Check the top of db.py for the actual connection context manager name, `_connect`/`_conn` or similar, and match it.)

- [ ] **Step 4: Confirm the pass**

Run: `cd music-lab && PYTHONPATH=.. python -m pytest tests/test_pipeline_retry.py -v -k get_last_failed`

Expected: 2 PASS.

- [ ] **Step 5: Commit**

```bash
git add music-lab/app/db.py music-lab/tests/test_pipeline_retry.py
git commit -m "feat(music-lab): get_last_failed_step to identify the failed step for pipeline resume

Co-Authored-By: Claude Opus 4.8 (1M context)"
```

---

## Task 2: orchestrator automatic retry

**Files:** Modify `music-lab/app/pipeline/orchestrator.py`; Test `music-lab/tests/test_pipeline_retry.py`

- [ ] **Step 1: Write the failing tests** (append to test_pipeline_retry.py)

```python
from app.pipeline import orchestrator


@pytest.fixture(autouse=True)
def _no_backoff(monkeypatch):
    # Zero out the backoff so retry tests do not actually sleep
    monkeypatch.setattr(orchestrator, "STEP_RETRY_BACKOFF_SEC", [0, 0])


@pytest.mark.asyncio
async def test_retryable_step_retries_then_succeeds(monkeypatch):
    pid = db.create_pipeline(track_id=1)
    calls = {"n": 0}

    async def flaky(step, p, ctx, feedback):
        calls["n"] += 1
        if calls["n"] < 3:
            raise RuntimeError("transient")
        return {"next_state": "video_pending", "fields": {}}

    monkeypatch.setattr(orchestrator, "_dispatch_step", flaky)
    monkeypatch.setattr(
        orchestrator, "_resolve_input",
        lambda p: {"genre": "x", "title": "t", "moods": [], "tracks": [],
                   "audio_path": "", "duration_sec": 0},
    )
    await orchestrator.run_step(pid, "cover")
    assert calls["n"] == 3
    assert db.get_pipeline(pid)["state"] == "video_pending"


@pytest.mark.asyncio
async def test_retryable_step_exhausts_to_failed(monkeypatch):
    pid = db.create_pipeline(track_id=1)

    async def always_fail(step, p, ctx, feedback):
        raise RuntimeError("permanent")

    monkeypatch.setattr(orchestrator, "_dispatch_step", always_fail)
    monkeypatch.setattr(
        orchestrator, "_resolve_input",
        lambda p: {"genre": "x", "title": "t", "moods": [], "tracks": [],
                   "audio_path": "", "duration_sec": 0},
    )
    await orchestrator.run_step(pid, "cover")
    assert db.get_pipeline(pid)["state"] == "failed"


@pytest.mark.asyncio
async def test_publish_not_retried(monkeypatch):
    pid = db.create_pipeline(track_id=1)
    calls = {"n": 0}

    async def fail_publish(step, p, ctx, feedback):
        calls["n"] += 1
        raise RuntimeError("upload error")

    monkeypatch.setattr(orchestrator, "_dispatch_step", fail_publish)
    monkeypatch.setattr(
        orchestrator, "_resolve_input",
        lambda p: {"genre": "x", "title": "t", "moods": [], "tracks": [],
                   "audio_path": "", "duration_sec": 0},
    )
    await orchestrator.run_step(pid, "publish")
    assert calls["n"] == 1  # no retry
    assert db.get_pipeline(pid)["state"] == "failed"
```

- [ ] **Step 2: Confirm the failure**

Run: `cd music-lab && PYTHONPATH=.. python -m pytest tests/test_pipeline_retry.py -v -k "retry or publish_not"`

Expected: FAIL, because `_dispatch_step`/`STEP_RETRY_BACKOFF_SEC` do not exist yet.
- [ ] **Step 3: Implement (extract `_dispatch_step` + retry loop)**

Add constants at the top of `orchestrator.py`:

```python
STEP_MAX_RETRIES = 2  # additional retries (total attempts = STEP_MAX_RETRIES + 1)
STEP_RETRY_BACKOFF_SEC = [5, 15]
NON_RETRY_STEPS = {"publish"}
```

Extract the existing if/elif branch (currently lines 32-45 inside `run_step`) into a helper:

```python
async def _dispatch_step(step: str, p: dict, ctx: dict, feedback: str) -> dict:
    if step == "cover":
        return await _run_cover(p, ctx, feedback)
    if step == "video":
        return await _run_video(p, ctx)
    if step == "thumb":
        return await _run_thumb(p, ctx, feedback)
    if step == "meta":
        return await _run_meta(p, ctx, feedback)
    if step == "review":
        return await _run_review(p, ctx)
    if step == "publish":
        return await _run_publish(p, ctx)
    raise ValueError(f"unknown step: {step}")
```

Replace the try block in `run_step` (the step execution part) with a retry loop:

```python
    # Input resolution stays outside the retry loop: a bad input is not transient.
    try:
        ctx = _resolve_input(p)
    except ValueError as e:
        db.update_pipeline_job(job_id, status="failed", error=str(e))
        db.update_pipeline_state(pipeline_id, "failed", failed_reason=f"{step}: {e}")
        return

    attempts = 1 if step in NON_RETRY_STEPS else (STEP_MAX_RETRIES + 1)
    last_err = None
    for i in range(attempts):
        try:
            result = await _dispatch_step(step, p, ctx, feedback)
            db.update_pipeline_job(job_id, status="succeeded")
            db.update_pipeline_state(pipeline_id, result["next_state"],
                                     **result.get("fields", {}))
            return
        except Exception as e:
            last_err = e
            logger.exception("step %s failed (pipeline %s, attempt %d/%d)",
                             step, pipeline_id, i + 1, attempts)
            if i < attempts - 1:
                # Reuse the last backoff value if the list is shorter than the retry count
                await asyncio.sleep(
                    STEP_RETRY_BACKOFF_SEC[min(i, len(STEP_RETRY_BACKOFF_SEC) - 1)]
                )
    db.update_pipeline_job(job_id, status="failed", error=str(last_err))
    db.update_pipeline_state(pipeline_id, "failed", failed_reason=f"{step}: {last_err}")
```

(`asyncio` is already imported.)

- [ ] **Step 4: Confirm the pass**

Run: `cd music-lab && PYTHONPATH=.. python -m pytest tests/test_pipeline_retry.py -v -k "retry or publish_not"`

Expected: the 3 new tests PASS. (pytest's `-k` also matches the module name `test_pipeline_retry.py`, so the Task 1 tests are selected too and should pass as well.)
- [ ] **Step 5: Commit**

```bash
git add music-lab/app/pipeline/orchestrator.py music-lab/tests/test_pipeline_retry.py
git commit -m "feat(music-lab): orchestrator step automatic retry (publish excluded)

Co-Authored-By: Claude Opus 4.8 (1M context)"
```

---

## Task 3: retry endpoint

**Files:** Modify `music-lab/app/main.py`; Test `music-lab/tests/test_pipeline_retry.py`

- [ ] **Step 1: Write the failing tests**

```python
from fastapi.testclient import TestClient


@pytest.fixture
def client():
    from app.main import app
    return TestClient(app)


def test_retry_failed_pipeline_retriggers(client, monkeypatch):
    pid = db.create_pipeline(track_id=1)
    job = db.create_pipeline_job(pid, "video")
    db.update_pipeline_job(job, status="failed", error="boom")
    db.update_pipeline_state(pid, "failed", failed_reason="video: boom")
    called = {}
    from app.pipeline import orchestrator

    async def fake_run(p, step, *a):
        called["pid"], called["step"] = p, step

    monkeypatch.setattr(orchestrator, "run_step", fake_run)
    r = client.post(f"/api/music/pipeline/{pid}/retry")
    assert r.status_code in (200, 202)
    assert r.json()["retrying_step"] == "video"
    # TestClient runs background tasks before returning, so the fake has been called
    assert called == {"pid": pid, "step": "video"}


def test_retry_non_failed_409(client):
    pid = db.create_pipeline(track_id=1)  # state='created'
    r = client.post(f"/api/music/pipeline/{pid}/retry")
    assert r.status_code == 409


def test_retry_publish_with_video_id_rejected(client):
    pid = db.create_pipeline(track_id=1)
    job = db.create_pipeline_job(pid, "publish")
    db.update_pipeline_job(job, status="failed", error="x")
    db.update_pipeline_state(pid, "failed", failed_reason="publish: x",
                             youtube_video_id="abc123")
    r = client.post(f"/api/music/pipeline/{pid}/retry")
    assert r.status_code == 409
```

- [ ] **Step 2: Confirm the failure**

Run: `cd music-lab && PYTHONPATH=.. python -m pytest tests/test_pipeline_retry.py -v -k retry_`

Expected: FAIL, because the route returns 404.
- [ ] **Step 3: Implement**

Add below `cancel_pipeline` in `music-lab/app/main.py`:

```python
@app.post("/api/music/pipeline/{pid}/retry", status_code=202)
async def retry_pipeline(pid: int, bg: BackgroundTasks):
    p = _db_module.get_pipeline(pid)
    if not p:
        raise HTTPException(404)
    if p["state"] != "failed":
        raise HTTPException(409, f"cannot resume (state={p['state']})")
    failed_step = _db_module.get_last_failed_step(pid)
    if not failed_step:
        # Fallback: parse the "{step}: ..." prefix of failed_reason.
        # Only trust it when the separator is present, so free-form text is not taken as a step.
        reason = p.get("failed_reason") or ""
        prefix, sep, _ = reason.partition(":")
        failed_step = (prefix.strip() or None) if sep else None
    if not failed_step:
        raise HTTPException(409, "cannot determine the failed step")
    if failed_step == "publish" and p.get("youtube_video_id"):
        raise HTTPException(409, "already uploaded (duplicate guard)")
    bg.add_task(orchestrator.run_step, pid, failed_step)
    return {"ok": True, "retrying_step": failed_step}
```

- [ ] **Step 4: Confirm the pass + full regression**

Run: `cd music-lab && PYTHONPATH=.. python -m pytest tests/test_pipeline_retry.py -v` → all PASS

Run: `cd music-lab && PYTHONPATH=.. python -m pytest tests/ -q` → no regressions

- [ ] **Step 5: Commit**

```bash
git add music-lab/app/main.py music-lab/tests/test_pipeline_retry.py
git commit -m "feat(music-lab): POST /pipeline/{id}/retry for manual resume from the failed step

Co-Authored-By: Claude Opus 4.8 (1M context)"
```

---

## Task 4: agent-office service_proxy: pipeline_retry + list_failed

**Files:** Modify `agent-office/app/service_proxy.py`

> **Check first:** whether `GET /api/music/pipeline?status=active`, which `list_active_pipelines` calls, includes failed pipelines. If it does not, check whether the music-lab pipeline list endpoint also supports `status=failed`. If that is missing too, either add a failed filter to that endpoint (a separate small change) or add 'failed' to the `status` whitelist.
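
If the check above shows that the active list already carries failed pipelines, the client-side alternative is a plain filter over that list. A minimal sketch, assuming each pipeline is a dict with a `state` key and that the response is either a bare list or a dict wrapping the list under `items` or `pipelines`; `normalize_pipelines` and `select_failed` are hypothetical helper names, not existing `service_proxy` functions:

```python
def normalize_pipelines(data) -> list[dict]:
    """Accept a bare list, or a dict wrapping the list under 'items' or 'pipelines'."""
    if isinstance(data, list):
        return data
    return data.get("items", data.get("pipelines", []))


def select_failed(pipelines: list[dict]) -> list[dict]:
    """Keep only the pipelines whose state is 'failed'."""
    return [p for p in pipelines if p.get("state") == "failed"]


# Example payload shaped like a wrapped list response (made-up data for illustration)
payload = {"items": [{"id": 7, "state": "failed"}, {"id": 8, "state": "video_pending"}]}
failed = select_failed(normalize_pipelines(payload))
```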
- [ ] **Step 1: Add helpers**, following the existing `list_active_pipelines`/`post_pipeline_feedback` pattern (async with httpx.AsyncClient + MUSIC_LAB_URL) as-is:

```python
async def list_failed_pipelines() -> list[dict]:
    async with httpx.AsyncClient(timeout=10) as client:
        resp = await client.get(f"{MUSIC_LAB_URL}/api/music/pipeline?status=failed")
        resp.raise_for_status()
        data = resp.json()
        return data if isinstance(data, list) else data.get("items", data.get("pipelines", []))


async def pipeline_retry(pid: int) -> dict:
    async with httpx.AsyncClient(timeout=15) as client:
        resp = await client.post(f"{MUSIC_LAB_URL}/api/music/pipeline/{pid}/retry")
        # Do not raise on 409 (not resumable / duplicate) so the body can be returned too
        is_json = resp.headers.get("content-type", "").startswith("application/json")
        return {"status_code": resp.status_code, **(resp.json() if is_json else {})}
```

(If `list_active_pipelines` already includes failed pipelines, skip `list_failed_pipelines` and filter the active list by state=='failed' in Task 5.)

- [ ] **Step 2: Import sanity check**: `cd agent-office && PYTHONPATH=.. python -c "from app import service_proxy; print('OK')"` → OK

- [ ] **Step 3: Commit**

```bash
git add agent-office/app/service_proxy.py
git commit -m "feat(agent-office): service_proxy pipeline_retry + list_failed_pipelines

Co-Authored-By: Claude Opus 4.8 (1M context)"
```

---

## Task 5: youtube_publisher: failed detection + Telegram alert/button

**Files:** Modify `agent-office/app/agents/youtube_publisher.py`; Test `agent-office/tests/test_youtube_publisher_retry.py` (Create)

- [ ] **Step 1: Write the failing test**

`agent-office/tests/test_youtube_publisher_retry.py` (the DB fixture follows the agent-office conftest convention):

```python
import pytest
from unittest.mock import AsyncMock
from app.agents.youtube_publisher import YoutubePublisherAgent


@pytest.mark.asyncio
async def test_failed_pipeline_notified_with_retry_button(monkeypatch):
    agent = YoutubePublisherAgent()
    monkeypatch.setattr(
        "app.agents.youtube_publisher.service_proxy.list_active_pipelines",
        AsyncMock(return_value=[
            {"id": 7, "state": "failed", "failed_reason": "video: boom",
             "track_title": "T"}
        ]),
    )
    sent = AsyncMock(return_value={"ok": True, "message_id": 1})
    monkeypatch.setattr("app.agents.youtube_publisher.send_raw", sent)
    await agent.poll_state_changes()
    assert sent.await_count == 1
    args, kwargs = sent.await_args
    text = kwargs.get("text") or (args[0] if args else "")
    assert "failed" in text
    # callback_data of the inline retry button
    rm = kwargs.get("reply_markup") or {}
    cb = rm["inline_keyboard"][0][0]["callback_data"]
    assert cb == "ytpub_retry_7"
    # Dedupe: polling the same failed pipeline again sends nothing
    await agent.poll_state_changes()
    assert sent.await_count == 1
```

(Note: check in messaging whether `send_raw` supports `reply_markup`. If it does not, this task includes adding a `reply_markup` argument to `messaging.send_raw`. The insta flow used `send_photo`, but here the message is text + button, so `send_raw` needs `reply_markup`.)

- [ ] **Step 2: Confirm the failure**: `cd agent-office && PYTHONPATH=.. python -m pytest tests/test_youtube_publisher_retry.py -v` → FAIL (failed state not handled)

- [ ] **Step 3: Implement**: add a failed branch to `poll_state_changes`:

```python
def __init__(self, *args, **kwargs):
    super().__init__(*args, **kwargs)
    self._notified_state_per_pipeline: dict[int, tuple] = {}
    self._notified_failed: set[int] = set()
```

Inside the `poll_state_changes` loop, after the `*_pending` handling:

```python
if state == "failed" and pid not in self._notified_failed:
    await self._notify_failed(p)
    self._notified_failed.add(pid)
if state != "failed":
    self._notified_failed.discard(pid)  # alert again if it fails again after a resume
```

New method:

```python
async def _notify_failed(self, p: dict) -> None:
    reason = p.get("failed_reason") or "?"
    step = reason.split(":", 1)[0].strip()
    title = p.get("track_title") or f"Pipeline #{p['id']}"
    text = f"⚠️ [{title}] pipeline #{p['id']} step '{step}' failed\nReason: {reason}"
    kb = {"inline_keyboard": [[{"text": "🔄 Retry", "callback_data": f"ytpub_retry_{p['id']}"}]]}
    await send_raw(text=text, reply_markup=kb)
    add_log(self.agent_id, f"pipeline {p['id']} failure alert", "warning")
```

Check the `send_raw` signature in `agent-office/app/telegram/messaging.py` and extend it to accept `reply_markup` (leave it as-is if already supported).
- [ ] **Step 4: Confirm the pass**: `cd agent-office && PYTHONPATH=.. python -m pytest tests/test_youtube_publisher_retry.py -v` → PASS + full regression

- [ ] **Step 5: Commit**

```bash
git add agent-office/app/agents/youtube_publisher.py agent-office/app/telegram/messaging.py agent-office/tests/test_youtube_publisher_retry.py
git commit -m "feat(agent-office): youtube_publisher pipeline failure Telegram alert + retry button

Co-Authored-By: Claude Opus 4.8 (1M context)"
```

---

## Task 6: webhook ytpub_retry dispatch

**Files:** Modify `agent-office/app/telegram/webhook.py`; Test `agent-office/tests/test_youtube_publisher_retry.py`

> **Check first:** the prefix branch structure of `_handle_callback`, and the pattern existing handlers (`_handle_insta_issue` and others) use to call service_proxy and reply.

- [ ] **Step 1: Add the failing test**

```python
@pytest.mark.asyncio
async def test_handle_ytpub_retry_calls_proxy(monkeypatch):
    from app.telegram import webhook
    retry = AsyncMock(return_value={"status_code": 202, "ok": True, "retrying_step": "video"})
    monkeypatch.setattr("app.telegram.webhook.service_proxy.pipeline_retry", retry, raising=False)
    monkeypatch.setattr("app.telegram.webhook.send_raw", AsyncMock(), raising=False)
    res = await webhook._handle_ytpub_retry({"id": 1}, "ytpub_retry_7")
    retry.assert_awaited_once_with(7)
    assert res == {"ok": True}
```

(Match the import path and the location of `send_raw` to the actual webhook.py.)
- [ ] **Step 2: Confirm the failure** → FAIL (`_handle_ytpub_retry` does not exist yet)

- [ ] **Step 3: Implement**: add a branch to `_handle_callback`:

```python
if callback_id.startswith("ytpub_retry_"):
    return await _handle_ytpub_retry(callback_query, callback_id)
```

Handler:

```python
async def _handle_ytpub_retry(callback_query: dict, callback_id: str) -> dict:
    try:
        pid = int(callback_id.removeprefix("ytpub_retry_"))
    except (ValueError, AttributeError):
        return {"ok": False, "error": "invalid_callback_data"}
    res = await service_proxy.pipeline_retry(pid)
    sc = res.get("status_code")
    if sc in (200, 202):
        await send_raw(text=f"🔄 Pipeline #{pid} resumed: {res.get('retrying_step', '?')}")
    else:
        await send_raw(text=f"⚠️ Cannot resume (#{pid}): {res.get('detail', sc)}")
    return {"ok": True}
```

(Import `service_proxy`/`send_raw` the way webhook.py already does.)

- [ ] **Step 4: Confirm the pass** + full agent-office regression

- [ ] **Step 5: Commit**

```bash
git add agent-office/app/telegram/webhook.py agent-office/tests/test_youtube_publisher_retry.py
git commit -m "feat(agent-office): ytpub_retry Telegram callback proxied to music-lab retry

Co-Authored-By: Claude Opus 4.8 (1M context)"
```

---

## Task 7: Docs + deploy + memory

**Files:** Modify `web-backend/CLAUDE.md`, `memory/service_music.md`

- [ ] **Step 1: Add to the music API table in CLAUDE.md**

```
| POST | `/api/music/pipeline/{id}/retry` | Resume a failed pipeline from its failed step (409 if publish has already uploaded) |
```

- [ ] **Step 2: Full regression**

```bash
cd music-lab && PYTHONPATH=.. python -m pytest tests/ -q
cd ../agent-office && PYTHONPATH=.. python -m pytest tests/ -q
```

Expected: all PASS (excluding pre-existing stale failures).

- [ ] **Step 3: Commit + push (NAS deploy)**

```bash
cd C:/Users/jaeoh/Desktop/workspace/web-backend
git add CLAUDE.md docs/superpowers/plans/2026-06-12-music-pipeline-reliability.md
git commit -m "docs(music): pipeline retry API docs + implementation plan

Co-Authored-By: Claude Opus 4.8 (1M context)"
git push origin main
```

- [ ] **Step 4: Update memory**: add reliability/recovery (automatic retry excluding publish + manual retry endpoint + youtube_publisher failure alert) to `service_music.md`.
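- [ ] **Step 5: Production check (lightweight)**: after deploy, `POST /api/music/pipeline/<nonexistent id>/retry` → 404. If a real failed pipeline exists, confirm that retry works. (If none exists, the unit tests stand in for this check.)

---

## Self-Review

**Spec coverage:**

- Automatic retry (publish excluded, `_resolve_input` excluded) → Task 2 ✓
- Manual resume (failed step, publish + video_id guard) → Task 1 (step identification) + Task 3 ✓
- Failure alert + [🔄 Retry] → Task 5 ✓
- Retry callback → Task 4 (proxy) + Task 6 (dispatch) ✓
- Stuck detection excluded (YAGNI) → not in the plan ✓

**Placeholder scan:** All code steps are concrete. "Check the conftest convention" and "check whether list_active includes failed" are deliberate verification instructions that respect ownership of the existing code (not placeholders).

**Type consistency:**

- `get_last_failed_step(pid)`: Task 1 ↔ Task 3 match.
- `_dispatch_step(step, p, ctx, feedback)`: Task 2 definition ↔ test mocks match.
- `run_step(pid, step)`: matches the existing signature.
- Callback `ytpub_retry_{pid}`: Task 5 generation ↔ Task 6 parsing match.
- `pipeline_retry(pid)`: Task 4 ↔ Task 6 match.
- Retry response `retrying_step`/`status_code`: Task 3 ↔ Task 4 ↔ Task 6 match.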