web-page-backend/docs/superpowers/plans/2026-06-12-music-pipeline-reliability.md
2026-06-12 00:12:33 +09:00

music/YouTube Pipeline Reliability and Recovery Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Absorb transient pipeline step failures with automatic retries (publish excluded), leave permanent failures in the terminal failed state, and allow a manual resume from the failed step via the Telegram [🔄 재시도] (Retry) button.

Architecture: In music-lab, add a bounded retry loop to orchestrator.run_step, a POST /pipeline/{id}/retry resume endpoint, and db.get_last_failed_step. In agent-office, youtube_publisher detects failed pipelines and sends a Telegram alert with a retry button, and webhook proxies the ytpub_retry_{pid} callback to the music-lab retry endpoint.

Tech Stack: Python 3.12 / FastAPI / SQLite / asyncio / pytest. Existing patterns: orchestrator.run_step (run as a BackgroundTask), the pipeline endpoints in main.py (404/409 + _db_module), service_proxy (httpx + MUSIC_LAB_URL), telegram/webhook.py (callback-prefix dispatch).

Spec: docs/superpowers/specs/2026-06-12-music-pipeline-reliability-design.md

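Test fixture note: First check how tests/conftest.py in music-lab and in agent-office isolates the DB (monkeypatching db.DB_PATH + init_db), and adapt the fixtures in the tests below to that convention. The code below assumes the standard pattern of monkeypatching db.DB_PATH to a tmp path.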


File Structure

| File | Change | Responsibility |
| --- | --- | --- |
| music-lab/app/db.py | Modify | Add get_last_failed_step(pid) |
| music-lab/app/pipeline/orchestrator.py | Modify | Extract _dispatch_step + retry loop in run_step |
| music-lab/app/main.py | Modify | POST /api/music/pipeline/{pid}/retry |
| music-lab/tests/test_pipeline_retry.py | Create | db + orchestrator + endpoint tests |
| agent-office/app/service_proxy.py | Modify | pipeline_retry(pid), list_failed_pipelines() |
| agent-office/app/agents/youtube_publisher.py | Modify | Detect failed → Telegram alert + button |
| agent-office/app/telegram/webhook.py | Modify | ytpub_retry_ dispatch |
| agent-office/tests/test_youtube_publisher_retry.py | Create | Alert + callback tests |
| web-backend/CLAUDE.md + memory/service_music.md | Modify | API table + memory |

Task 1: music-lab db — get_last_failed_step

Files: Modify music-lab/app/db.py; Test music-lab/tests/test_pipeline_retry.py (Create)

  • Step 1: Write the failing tests

music-lab/tests/test_pipeline_retry.py (adjust the fixture to the music-lab conftest convention):

import pytest
from app import db


@pytest.fixture(autouse=True)
def _tmp_db(tmp_path, monkeypatch):
    monkeypatch.setattr(db, "DB_PATH", str(tmp_path / "music.db"))
    db.init_db()


def _make_pipeline_with_failed_step(step: str) -> int:
    pid = db.create_pipeline(track_id=1)  # check conftest/db and match the actual signature
    job = db.create_pipeline_job(pid, step)
    db.update_pipeline_job(job, status="failed", error="boom")
    db.update_pipeline_state(pid, "failed", failed_reason=f"{step}: boom")
    return pid


def test_get_last_failed_step_returns_step():
    pid = _make_pipeline_with_failed_step("video")
    assert db.get_last_failed_step(pid) == "video"


def test_get_last_failed_step_none_when_no_failure():
    pid = db.create_pipeline(track_id=1)
    db.create_pipeline_job(pid, "cover")  # default status (running/succeeded), not failed
    assert db.get_last_failed_step(pid) is None
  • Step 2: Confirm it fails

Run: cd music-lab && PYTHONPATH=.. python -m pytest tests/test_pipeline_retry.py::test_get_last_failed_step_returns_step -v
Expected: FAIL, because db.get_last_failed_step does not exist yet. (If the create_pipeline signature differs, adapt the helper to db's actual creation function.)

  • Step 3: Implement

Add to the pipeline_jobs section of music-lab/app/db.py (near list_pipeline_jobs):

def get_last_failed_step(pid: int) -> Optional[str]:
    """Return the step of the pipeline's most recent status='failed' pipeline_job, or None."""
    with _connect() as conn:   # match music-lab's actual connection helper name
        row = conn.execute(
            "SELECT step FROM pipeline_jobs "
            "WHERE pipeline_id = ? AND status = 'failed' "
            "ORDER BY id DESC LIMIT 1",
            (pid,),
        ).fetchone()
    return row["step"] if row else None

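(Check the top of db.py for the actual connection context manager name, _connect/_conn etc., and match it. row["step"] assumes the connection uses sqlite3.Row as its row_factory, and Optional must be imported from typing if db.py does not already import it.)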
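The query in Step 3 can be tried in isolation before wiring it into db.py. The following is a minimal sketch against an in-memory SQLite database; the table layout here is an assumption reduced to the three columns the query touches, and make_conn/last_failed_step are illustrative names, not music-lab functions.

```python
import sqlite3
from typing import Optional


def make_conn() -> sqlite3.Connection:
    """In-memory DB with a reduced, assumed pipeline_jobs schema."""
    conn = sqlite3.connect(":memory:")
    conn.row_factory = sqlite3.Row  # enables row["step"] access
    conn.execute(
        "CREATE TABLE pipeline_jobs ("
        "id INTEGER PRIMARY KEY AUTOINCREMENT, "
        "pipeline_id INTEGER NOT NULL, "
        "step TEXT NOT NULL, "
        "status TEXT NOT NULL)"
    )
    return conn


def last_failed_step(conn: sqlite3.Connection, pid: int) -> Optional[str]:
    """Same query as the plan: newest failed job of the pipeline, or None."""
    row = conn.execute(
        "SELECT step FROM pipeline_jobs "
        "WHERE pipeline_id = ? AND status = 'failed' "
        "ORDER BY id DESC LIMIT 1",
        (pid,),
    ).fetchone()
    return row["step"] if row else None


conn = make_conn()
conn.executemany(
    "INSERT INTO pipeline_jobs (pipeline_id, step, status) VALUES (?, ?, ?)",
    [
        (1, "cover", "failed"),     # first cover attempt failed
        (1, "cover", "succeeded"),  # a later cover run succeeded
        (1, "video", "failed"),     # newest failure for pipeline 1
        (2, "meta", "succeeded"),   # pipeline 2 never failed
    ],
)
```

Because rows are ordered by id, an older failure of a step that later succeeded (cover above) is shadowed by the newest failure (video). The query alone does not know whether the pipeline is currently failed, which is why the endpoint in Task 3 checks the pipeline state first.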

  • Step 4: Confirm it passes

Run: cd music-lab && PYTHONPATH=.. python -m pytest tests/test_pipeline_retry.py -v -k get_last_failed
Expected: 2 PASS.

  • Step 5: Commit
git add music-lab/app/db.py music-lab/tests/test_pipeline_retry.py
git commit -m "feat(music-lab): get_last_failed_step - identify the failed step for pipeline resume

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>"

Task 2: orchestrator automatic retry

Files: Modify music-lab/app/pipeline/orchestrator.py; Test music-lab/tests/test_pipeline_retry.py

  • Step 1: Write the failing tests (append to test_pipeline_retry.py; @pytest.mark.asyncio requires the pytest-asyncio plugin)
import asyncio
from app.pipeline import orchestrator


@pytest.fixture(autouse=True)
def _no_backoff(monkeypatch):
    monkeypatch.setattr(orchestrator, "STEP_RETRY_BACKOFF_SEC", [0, 0])


@pytest.mark.asyncio
async def test_retryable_step_retries_then_succeeds(monkeypatch):
    pid = db.create_pipeline(track_id=1)
    calls = {"n": 0}

    async def flaky(step, p, ctx, feedback):
        calls["n"] += 1
        if calls["n"] < 3:
            raise RuntimeError("transient")
        return {"next_state": "video_pending", "fields": {}}

    monkeypatch.setattr(orchestrator, "_dispatch_step", flaky)
    monkeypatch.setattr(orchestrator, "_resolve_input", lambda p: {"genre": "x", "title": "t", "moods": [], "tracks": [], "audio_path": "", "duration_sec": 0})

    await orchestrator.run_step(pid, "cover")
    assert calls["n"] == 3
    assert db.get_pipeline(pid)["state"] == "video_pending"


@pytest.mark.asyncio
async def test_retryable_step_exhausts_to_failed(monkeypatch):
    pid = db.create_pipeline(track_id=1)

    async def always_fail(step, p, ctx, feedback):
        raise RuntimeError("permanent")

    monkeypatch.setattr(orchestrator, "_dispatch_step", always_fail)
    monkeypatch.setattr(orchestrator, "_resolve_input", lambda p: {"genre": "x", "title": "t", "moods": [], "tracks": [], "audio_path": "", "duration_sec": 0})

    await orchestrator.run_step(pid, "cover")
    assert db.get_pipeline(pid)["state"] == "failed"


@pytest.mark.asyncio
async def test_publish_not_retried(monkeypatch):
    pid = db.create_pipeline(track_id=1)
    calls = {"n": 0}

    async def fail_publish(step, p, ctx, feedback):
        calls["n"] += 1
        raise RuntimeError("upload error")

    monkeypatch.setattr(orchestrator, "_dispatch_step", fail_publish)
    monkeypatch.setattr(orchestrator, "_resolve_input", lambda p: {"genre": "x", "title": "t", "moods": [], "tracks": [], "audio_path": "", "duration_sec": 0})

    await orchestrator.run_step(pid, "publish")
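    assert calls["n"] == 1          # no retry
    assert db.get_pipeline(pid)["state"] == "failed"
  • Step 2: Confirm they fail

Run: cd music-lab && PYTHONPATH=.. python -m pytest tests/test_pipeline_retry.py -v -k "retryable or publish_not"
Expected: FAIL, because _dispatch_step/STEP_RETRY_BACKOFF_SEC do not exist yet. (-k also matches the module name test_pipeline_retry.py, so a bare "retry" would select every test in the file.)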

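  • Step 3: Implement: extract _dispatch_step + add the retry loop

Add constants at the top of orchestrator.py:

STEP_MAX_RETRIES = 2          # extra retries (total attempts = STEP_MAX_RETRIES + 1)
STEP_RETRY_BACKOFF_SEC = [5, 15]
NON_RETRY_STEPS = {"publish"}

Extract the existing if/elif branches (currently lines 32-45 inside run_step) into a helper: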

async def _dispatch_step(step: str, p: dict, ctx: dict, feedback: str) -> dict:
    if step == "cover":
        return await _run_cover(p, ctx, feedback)
    if step == "video":
        return await _run_video(p, ctx)
    if step == "thumb":
        return await _run_thumb(p, ctx, feedback)
    if step == "meta":
        return await _run_meta(p, ctx, feedback)
    if step == "review":
        return await _run_review(p, ctx)
    if step == "publish":
        return await _run_publish(p, ctx)
    raise ValueError(f"unknown step: {step}")

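Replace the try block of run_step (the step execution part) with a retry loop. _resolve_input stays outside the loop, so an input-resolution ValueError fails immediately without retrying: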

    try:
        ctx = _resolve_input(p)
    except ValueError as e:
        db.update_pipeline_job(job_id, status="failed", error=str(e))
        db.update_pipeline_state(pipeline_id, "failed", failed_reason=f"{step}: {e}")
        return

    attempts = 1 if step in NON_RETRY_STEPS else (STEP_MAX_RETRIES + 1)
    last_err = None
    for i in range(attempts):
        try:
            result = await _dispatch_step(step, p, ctx, feedback)
            db.update_pipeline_job(job_id, status="succeeded")
            db.update_pipeline_state(pipeline_id, result["next_state"], **result.get("fields", {}))
            return
        except Exception as e:
            last_err = e
            logger.exception("step %s 실패 (pipeline %s, attempt %d/%d)", step, pipeline_id, i + 1, attempts)
            if i < attempts - 1:
                await asyncio.sleep(STEP_RETRY_BACKOFF_SEC[min(i, len(STEP_RETRY_BACKOFF_SEC) - 1)])
    db.update_pipeline_job(job_id, status="failed", error=str(last_err))
    db.update_pipeline_state(pipeline_id, "failed", failed_reason=f"{step}: {last_err}")

(asyncio is already imported.)
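The retry policy above (bounded attempts, a backoff index clamped to the last entry, no retry for publish) can be sketched without any DB or orchestrator dependency. This is an illustrative reduction, not the run_step implementation: run_with_retry, make_flaky and the injectable sleep parameter are hypothetical names introduced so the delays can be observed without waiting.

```python
import asyncio

MAX_RETRIES = 2            # extra retries, so 3 attempts in total
BACKOFF_SEC = [5, 15]      # delay before retry 1 and retry 2
NON_RETRY = {"publish"}    # steps that get exactly one attempt


async def run_with_retry(step, action, sleep=asyncio.sleep):
    """Return (status, value_or_error, attempts_used)."""
    attempts = 1 if step in NON_RETRY else MAX_RETRIES + 1
    last_err = None
    for i in range(attempts):
        try:
            return "succeeded", await action(), i + 1
        except Exception as e:  # broad on purpose: any step error counts
            last_err = e
            if i < attempts - 1:
                # clamp the index so a longer retry budget reuses the last delay
                await sleep(BACKOFF_SEC[min(i, len(BACKOFF_SEC) - 1)])
    return "failed", str(last_err), attempts


def make_flaky(fail_times):
    """Build an async action that raises fail_times times, then returns 'ok'."""
    calls = {"n": 0}

    async def action():
        calls["n"] += 1
        if calls["n"] <= fail_times:
            raise RuntimeError("transient")
        return "ok"

    return action


async def demo():
    delays = []

    async def fake_sleep(sec):  # records the delay instead of sleeping
        delays.append(sec)

    cover = await run_with_retry("cover", make_flaky(2), sleep=fake_sleep)
    publish = await run_with_retry("publish", make_flaky(1), sleep=fake_sleep)
    return cover, publish, delays


cover_result, publish_result, delays = asyncio.run(demo())
```

With the default constants a retryable step that keeps failing waits 5 s and then 15 s before it is marked failed, so the worst-case added latency per step is 20 s plus the step's own run time.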

  • Step 4: Confirm they pass

Run: cd music-lab && PYTHONPATH=.. python -m pytest tests/test_pipeline_retry.py -v -k "retryable or publish_not"
Expected: 3 PASS.

  • Step 5: Commit
git add music-lab/app/pipeline/orchestrator.py music-lab/tests/test_pipeline_retry.py
git commit -m "feat(music-lab): orchestrator step auto-retry (publish excluded)

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>"

Task 3: retry endpoint

Files: Modify music-lab/app/main.py; Test music-lab/tests/test_pipeline_retry.py

  • Step 1: Write the failing tests
from fastapi.testclient import TestClient


@pytest.fixture
def client(monkeypatch):
    from app.main import app
    return TestClient(app)


def test_retry_failed_pipeline_retriggers(client, monkeypatch):
    pid = db.create_pipeline(track_id=1)
    job = db.create_pipeline_job(pid, "video")
    db.update_pipeline_job(job, status="failed", error="boom")
    db.update_pipeline_state(pid, "failed", failed_reason="video: boom")

    called = {}
    from app.pipeline import orchestrator
    async def fake_run(p, step, *a):
        called["pid"], called["step"] = p, step
    monkeypatch.setattr(orchestrator, "run_step", fake_run)

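    r = client.post(f"/api/music/pipeline/{pid}/retry")
    assert r.status_code in (200, 202)
    assert r.json()["retrying_step"] == "video"
    # TestClient runs background tasks before returning, so the fake was called
    assert called == {"pid": pid, "step": "video"}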


def test_retry_non_failed_409(client):
    pid = db.create_pipeline(track_id=1)  # state='created'
    r = client.post(f"/api/music/pipeline/{pid}/retry")
    assert r.status_code == 409


def test_retry_publish_with_video_id_rejected(client):
    pid = db.create_pipeline(track_id=1)
    job = db.create_pipeline_job(pid, "publish")
    db.update_pipeline_job(job, status="failed", error="x")
    db.update_pipeline_state(pid, "failed", failed_reason="publish: x", youtube_video_id="abc123")
    r = client.post(f"/api/music/pipeline/{pid}/retry")
    assert r.status_code == 409
  • Step 2: Confirm they fail

Run: cd music-lab && PYTHONPATH=.. python -m pytest tests/test_pipeline_retry.py -v -k retry_
Expected: FAIL, because the route does not exist yet (404).

  • Step 3: Implement

Add to music-lab/app/main.py, below cancel_pipeline:

@app.post("/api/music/pipeline/{pid}/retry", status_code=202)
async def retry_pipeline(pid: int, bg: BackgroundTasks):
    p = _db_module.get_pipeline(pid)
    if not p:
        raise HTTPException(404)
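    if p["state"] != "failed":
        raise HTTPException(409, f"재개 불가 (state={p['state']})")  # "cannot resume"
    failed_step = _db_module.get_last_failed_step(pid)
    if not failed_step:
        # Fallback: parse the "{step}: ..." prefix of failed_reason. Trust it only
        # when the ":" separator is present, so a free-form reason is never
        # passed to run_step as a step name.
        head, sep, _ = (p.get("failed_reason") or "").partition(":")
        failed_step = head.strip() if sep else None
    if not failed_step:
        raise HTTPException(409, "실패 step을 판별할 수 없음")  # "cannot determine the failed step"
    if failed_step == "publish" and p.get("youtube_video_id"):
        raise HTTPException(409, "이미 업로드됨 (중복 방지)")  # "already uploaded (duplicate guard)"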
    bg.add_task(orchestrator.run_step, pid, failed_step)
    return {"ok": True, "retrying_step": failed_step}
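
The endpoint's guards reduce to a small decision table, which can be checked as a pure function before touching FastAPI. The sketch below is an assumption-laden reduction: resolve_retry is a hypothetical helper that returns an (HTTP status, step) pair instead of raising HTTPException, and it accepts the failed_reason prefix only when a ":" separator is present, which is a choice of this sketch.

```python
from typing import Optional


def resolve_retry(
    p: Optional[dict], last_failed_step: Optional[str]
) -> tuple[int, Optional[str]]:
    """Map a pipeline row and its last failed job step to (status, step)."""
    if p is None:
        return 404, None                      # unknown pipeline
    if p["state"] != "failed":
        return 409, None                      # only failed pipelines resume
    step = last_failed_step
    if not step:
        # fallback: "{step}: reason" prefix, only if a ":" is actually there
        head, sep, _ = (p.get("failed_reason") or "").partition(":")
        step = head.strip() if sep else None
    if not step:
        return 409, None                      # failed step cannot be determined
    if step == "publish" and p.get("youtube_video_id"):
        return 409, None                      # already uploaded: avoid duplicates
    return 202, step


# A failed publish is still resumable as long as no video id was recorded.
publish_without_upload = resolve_retry(
    {"state": "failed", "youtube_video_id": None}, "publish"
)
```

Keeping the decision separate from the route makes the 404/409/202 cases testable without a TestClient; the route itself would then only translate the pair into a response or an HTTPException.
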
  • Step 4: Confirm they pass + full regression

Run: cd music-lab && PYTHONPATH=.. python -m pytest tests/test_pipeline_retry.py -v → all PASS
Run: cd music-lab && PYTHONPATH=.. python -m pytest tests/ -q → 0 regressions

  • Step 5: Commit
git add music-lab/app/main.py music-lab/tests/test_pipeline_retry.py
git commit -m "feat(music-lab): POST /pipeline/{id}/retry - manual resume from the failed step

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>"

Task 4: agent-office service_proxy — pipeline_retry + list_failed

Files: Modify agent-office/app/service_proxy.py

Check first: whether GET /api/music/pipeline?status=active, which list_active_pipelines calls, includes failed pipelines. If it does not, check whether music-lab's pipeline list endpoint also supports status=failed; if it does not, either add a failed filter to that endpoint (a separate small change) or add 'failed' to the status whitelist.

  • Step 1: Add helpers, following the existing list_active_pipelines/post_pipeline_feedback pattern (async with httpx.AsyncClient + MUSIC_LAB_URL) as-is:
async def list_failed_pipelines() -> list[dict]:
    async with httpx.AsyncClient(timeout=10) as client:
        resp = await client.get(f"{MUSIC_LAB_URL}/api/music/pipeline?status=failed")
        resp.raise_for_status()
        data = resp.json()
        return data if isinstance(data, list) else data.get("items", data.get("pipelines", []))


async def pipeline_retry(pid: int) -> dict:
    async with httpx.AsyncClient(timeout=15) as client:
        resp = await client.post(f"{MUSIC_LAB_URL}/api/music/pipeline/{pid}/retry")
        # Do not raise on 409 (not resumable / duplicate) so the body can be returned.
        is_json = resp.headers.get("content-type", "").startswith("application/json")
        return {"status_code": resp.status_code, **(resp.json() if is_json else {})}

(If list_active_pipelines already includes failed pipelines, skip list_failed_pipelines and filter the active list by state == 'failed' in Task 5.)
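Both helpers above contain a small amount of response-shaping logic that can be checked without a network call. The following sketch pulls it into two pure functions; normalize_pipeline_list and merge_retry_response are hypothetical names, and writing status_code last (so a body key can never overwrite it) is a choice of this sketch rather than something the plan specifies.

```python
from typing import Any


def normalize_pipeline_list(data: Any) -> list[dict]:
    """Accept a bare list or an {'items': [...]} / {'pipelines': [...]} envelope."""
    if isinstance(data, list):
        return data
    if isinstance(data, dict):
        return data.get("items", data.get("pipelines", []))
    return []  # unexpected payload type: treat as empty


def merge_retry_response(status_code: int, content_type: str, body: Any) -> dict:
    """Merge a JSON object body with the HTTP status; ignore non-JSON bodies."""
    merged: dict = {}
    if content_type.startswith("application/json") and isinstance(body, dict):
        merged.update(body)
    merged["status_code"] = status_code  # set last so the body cannot shadow it
    return merged


accepted = merge_retry_response(
    202, "application/json", {"ok": True, "retrying_step": "video"}
)
rejected = merge_retry_response(
    409, "application/json; charset=utf-8", {"detail": "not resumable"}
)
gateway_error = merge_retry_response(502, "text/html", None)
```

The rejected case shows why pipeline_retry does not call raise_for_status: the 409 body carries the detail field that the Telegram handler in Task 6 reports back to the user.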

  • Step 2: Import sanity check: cd agent-office && PYTHONPATH=.. python -c "from app import service_proxy; print('OK')" → OK

  • Step 3: Commit

git add agent-office/app/service_proxy.py
git commit -m "feat(agent-office): service_proxy pipeline_retry + list_failed_pipelines

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>"

Task 5: youtube_publisher failed detection + Telegram alert/button

Files: Modify agent-office/app/agents/youtube_publisher.py; Test agent-office/tests/test_youtube_publisher_retry.py (Create)

  • Step 1: Write the failing test

agent-office/tests/test_youtube_publisher_retry.py (the DB fixture follows the agent-office conftest convention):

import pytest
from unittest.mock import AsyncMock
from app.agents.youtube_publisher import YoutubePublisherAgent


@pytest.mark.asyncio
async def test_failed_pipeline_notified_with_retry_button(monkeypatch):
    agent = YoutubePublisherAgent()
    monkeypatch.setattr(
        "app.agents.youtube_publisher.service_proxy.list_active_pipelines",
        AsyncMock(return_value=[
            {"id": 7, "state": "failed", "failed_reason": "video: boom", "track_title": "T"}
        ]),
    )
    sent = AsyncMock(return_value={"ok": True, "message_id": 1})
    monkeypatch.setattr("app.agents.youtube_publisher.send_raw", sent)

    await agent.poll_state_changes()
    assert sent.await_count == 1
    args, kwargs = sent.await_args
    text = kwargs.get("text") or (args[0] if args else "")
    assert "실패" in text
    # callback_data of the inline retry button
    rm = kwargs.get("reply_markup") or {}
    cb = rm["inline_keyboard"][0][0]["callback_data"]
    assert cb == "ytpub_retry_7"

    # Dedup: polling the same failed pipeline again must not send another alert
    await agent.poll_state_changes()
    assert sent.await_count == 1

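(Note: check in messaging whether send_raw supports reply_markup. If it does not, this Task also includes adding a reply_markup argument to messaging.send_raw. insta used send_photo, but this case is text + button, so send_raw needs reply_markup.)

  • Step 2: Confirm it fails: cd agent-office && PYTHONPATH=.. python -m pytest tests/test_youtube_publisher_retry.py -v → FAIL (the failed state is not handled yet)

  • Step 3: Implement: add a failed branch to poll_state_changes. First, track already-notified pipelines in __init__: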

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._notified_state_per_pipeline: dict[int, tuple] = {}
        self._notified_failed: set[int] = set()

Inside the poll_state_changes loop, after the *_pending handling:

            if state == "failed" and pid not in self._notified_failed:
                await self._notify_failed(p)
                self._notified_failed.add(pid)
            if state != "failed":
                self._notified_failed.discard(pid)  # re-notify if it fails again after a resume

New method:

    async def _notify_failed(self, p: dict) -> None:
        reason = p.get("failed_reason") or "?"
        step = reason.split(":", 1)[0].strip()
        title = p.get("track_title") or f"Pipeline #{p['id']}"
        text = f"⚠️ [{title}] 파이프라인 #{p['id']} '{step}' 실패\n사유: {reason}"
        kb = {"inline_keyboard": [[{"text": "🔄 재시도", "callback_data": f"ytpub_retry_{p['id']}"}]]}
        await send_raw(text=text, reply_markup=kb)
        add_log(self.agent_id, f"pipeline {p['id']} 실패 알림", "warning")

Check the send_raw signature in agent-office/app/telegram/messaging.py and extend it so that send_raw accepts reply_markup (leave it as-is if it is already supported).
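The notify-once behaviour of the failed branch can be illustrated with a small stand-alone model. This is a sketch under assumptions: FailedNotifier and retry_keyboard are hypothetical names, and appending to a list stands in for the real send_raw call so that the number of alerts can be inspected.

```python
class FailedNotifier:
    """Models the _notified_failed set: alert once per failure episode."""

    def __init__(self) -> None:
        self._notified_failed: set[int] = set()
        self.sent: list[int] = []  # stand-in for Telegram alerts actually sent

    def poll(self, pipelines: list[dict]) -> None:
        for p in pipelines:
            pid, state = p["id"], p["state"]
            if state == "failed":
                if pid not in self._notified_failed:
                    self.sent.append(pid)
                    self._notified_failed.add(pid)
            else:
                # leaving the failed state re-arms the alert for this pipeline
                self._notified_failed.discard(pid)


def retry_keyboard(pid: int) -> dict:
    """Inline keyboard with one retry button, in the reply_markup shape used above."""
    return {
        "inline_keyboard": [
            [{"text": "🔄 재시도", "callback_data": f"ytpub_retry_{pid}"}]
        ]
    }


notifier = FailedNotifier()
notifier.poll([{"id": 7, "state": "failed"}])         # first failure: alert
notifier.poll([{"id": 7, "state": "failed"}])         # same failure: no alert
notifier.poll([{"id": 7, "state": "video_pending"}])  # resumed: re-armed
notifier.poll([{"id": 7, "state": "failed"}])         # failed again: alert
```

Note that the re-arm only happens when the pipeline is seen in a non-failed state during a poll. Because the set lives in memory, a process restart would also re-send alerts for pipelines that are still failed.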

  • Step 4: Confirm it passes: cd agent-office && PYTHONPATH=.. python -m pytest tests/test_youtube_publisher_retry.py -v → PASS, then run the full regression

  • Step 5: Commit

git add agent-office/app/agents/youtube_publisher.py agent-office/app/telegram/messaging.py agent-office/tests/test_youtube_publisher_retry.py
git commit -m "feat(agent-office): youtube_publisher Telegram alert + retry button on pipeline failure

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>"

Task 6: webhook ytpub_retry dispatch

Files: Modify agent-office/app/telegram/webhook.py; Test agent-office/tests/test_youtube_publisher_retry.py

Check first: the prefix-branch structure of _handle_callback, and the pattern existing handlers (_handle_insta_issue etc.) use to call service_proxy and reply.

  • Step 1: Add the failing test
@pytest.mark.asyncio
async def test_handle_ytpub_retry_calls_proxy(monkeypatch):
    from app.telegram import webhook
    retry = AsyncMock(return_value={"status_code": 202, "ok": True, "retrying_step": "video"})
    monkeypatch.setattr("app.telegram.webhook.service_proxy.pipeline_retry", retry, raising=False)
    monkeypatch.setattr("app.telegram.webhook.send_raw", AsyncMock(), raising=False)
    res = await webhook._handle_ytpub_retry({"id": 1}, "ytpub_retry_7")
    retry.assert_awaited_once_with(7)
    assert res == {"ok": True}

(Match the import path and the send_raw location to the actual webhook.py.)

  • Step 2: Confirm it fails → FAIL (_handle_ytpub_retry does not exist)

  • Step 3: Implement: add a branch to _handle_callback:

    if callback_id.startswith("ytpub_retry_"):
        return await _handle_ytpub_retry(callback_query, callback_id)

Handler:

async def _handle_ytpub_retry(callback_query: dict, callback_id: str) -> dict:
    try:
        pid = int(callback_id.removeprefix("ytpub_retry_"))
    except (ValueError, AttributeError):
        return {"ok": False, "error": "invalid_callback_data"}
    res = await service_proxy.pipeline_retry(pid)
    sc = res.get("status_code")
    if sc in (200, 202):
        await send_raw(text=f"🔄 파이프라인 #{pid} 재개: {res.get('retrying_step','?')}")
    else:
        await send_raw(text=f"⚠️ 재개 불가 (#{pid}): {res.get('detail', sc)}")
    return {"ok": True}

(Import service_proxy/send_raw the way webhook.py already does.)
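The callback string is the contract between Task 5 (which builds it) and Task 6 (which parses it), so a round-trip check is cheap insurance. The sketch below uses hypothetical helpers build_retry_callback and parse_retry_callback; it is deliberately stricter than int(...), rejecting signs, whitespace and underscores, which is a choice of this sketch. Telegram limits callback_data to 1-64 bytes, which the prefix plus any realistic integer id stays well within.

```python
from typing import Optional

PREFIX = "ytpub_retry_"


def build_retry_callback(pid: int) -> str:
    """callback_data for the retry button of pipeline pid."""
    return f"{PREFIX}{pid}"


def parse_retry_callback(data: str) -> Optional[int]:
    """Return the pipeline id, or None if data is not a well-formed retry callback."""
    if not data.startswith(PREFIX):
        return None
    suffix = data[len(PREFIX):]
    # ASCII digits only: rejects "", "-1", "7x", " 7" and non-ASCII digits
    if not (suffix.isascii() and suffix.isdigit()):
        return None
    return int(suffix)


round_trip = parse_retry_callback(build_retry_callback(7))
largest_len = len(build_retry_callback(2**63).encode("utf-8"))
```

A parser that returns None instead of raising lets the handler answer with a single invalid_callback_data branch, matching the error shape used in _handle_ytpub_retry.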

  • Step 4: Confirm it passes + full agent-office regression

  • Step 5: Commit

git add agent-office/app/telegram/webhook.py agent-office/tests/test_youtube_publisher_retry.py
git commit -m "feat(agent-office): proxy ytpub_retry Telegram callback to music-lab retry

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>"

Task 7: Docs + deployment + memory

Files: Modify web-backend/CLAUDE.md, memory/service_music.md

  • Step 1: Add to the music API table in CLAUDE.md
| POST | `/api/music/pipeline/{id}/retry` | Resume a failed pipeline from its failed step (409 if the publish step failed after the upload already completed) |
  • Step 2: Full regression
cd music-lab && PYTHONPATH=.. python -m pytest tests/ -q
cd ../agent-office && PYTHONPATH=.. python -m pytest tests/ -q

Expected: all PASS (excluding pre-existing stale failures).

  • Step 3: Commit + push (NAS deployment)
cd C:/Users/jaeoh/Desktop/workspace/web-backend
git add CLAUDE.md docs/superpowers/plans/2026-06-12-music-pipeline-reliability.md
git commit -m "docs(music): pipeline retry API docs + implementation plan

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>"
git push origin main
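  • Step 4: Update memory: add reliability/recovery (automatic retry excluding publish + manual retry endpoint + youtube_publisher failure alert) to service_music.md.

  • Step 5: Production check (lightweight): after deployment, POST /api/music/pipeline/<nonexistent id>/retry → 404; if a real failed pipeline exists, verify that retry works. (If none exists, the unit tests stand in.)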


Self-Review

Spec coverage:

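  • Automatic retry (publish excluded, _resolve_input excluded) → Task 2 ✓
  • Manual resume (failed step, publish + video_id guard) → Task 1 (step detection) + Task 3 ✓
  • Failure alert + [🔄 재시도] (Retry) button → Task 5 ✓
  • Retry callback → Task 4 (proxy) + Task 6 (dispatch) ✓
  • Stuck detection excluded (YAGNI) → not in the plan ✓

Placeholder scan: Every code step is concrete. "Check the conftest convention" and "check whether list_active includes failed" are deliberate verification instructions that respect ownership of the existing code (not placeholders).

Type consistency: get_last_failed_step(pid) matches between Task 1 and Task 3. _dispatch_step(step, p, ctx, feedback) matches between the Task 2 definition and the test mocks. The run_step(pid, step) signature matches the existing one. The ytpub_retry_{pid} callback matches between generation in Task 5 and parsing in Task 6. pipeline_retry(pid) matches between Task 4 and Task 6. The retry response fields retrying_step/status_code match across Tasks 3, 4, and 6.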