docs(plan): music 파이프라인 신뢰성·복구 구현 계획 (7 tasks, TDD)
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
556
docs/superpowers/plans/2026-06-12-music-pipeline-reliability.md
Normal file
556
docs/superpowers/plans/2026-06-12-music-pipeline-reliability.md
Normal file
@@ -0,0 +1,556 @@
|
||||
# music/YouTube 파이프라인 신뢰성·복구 Implementation Plan
|
||||
|
||||
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
|
||||
|
||||
**Goal:** 파이프라인 step 실패를 자동 재시도(일시적, publish 제외)로 흡수하고, 영구 실패는 terminal `failed`로 둔 뒤 실패 step부터 수동 재개(텔레그램 [🔄재시도])할 수 있게 한다.
|
||||
|
||||
**Architecture:** music-lab `orchestrator.run_step`에 bounded 재시도 루프 + `POST /pipeline/{id}/retry` 재개 엔드포인트 + `db.get_last_failed_step`. agent-office `youtube_publisher`가 `failed` 감지 → 텔레그램 알림+버튼, `webhook`이 `ytpub_retry_{pid}` 콜백을 music-lab retry로 프록시.
|
||||
|
||||
**Tech Stack:** Python 3.12 / FastAPI / SQLite / asyncio / pytest. 기존 패턴: `orchestrator.run_step`(BackgroundTask), `main.py` pipeline 엔드포인트(404/409 + `_db_module`), `service_proxy`(httpx + `MUSIC_LAB_URL`), `telegram/webhook.py`(callback prefix 디스패치).
|
||||
|
||||
**Spec:** `docs/superpowers/specs/2026-06-12-music-pipeline-reliability-design.md`
|
||||
|
||||
> **테스트 fixture 주의**: music-lab/agent-office 각 `tests/conftest.py`의 DB 격리 방식(`db.DB_PATH` monkeypatch + `init_db`)을 먼저 확인하고 아래 테스트의 fixture를 그 관례에 맞춰라. 아래 코드는 `db.DB_PATH`를 tmp로 monkeypatch하는 표준 패턴을 가정한다.
|
||||
|
||||
---
|
||||
|
||||
## File Structure
|
||||
|
||||
| 파일 | 변경 | 책임 |
|
||||
|------|------|------|
|
||||
| `music-lab/app/db.py` | Modify | `get_last_failed_step(pid)` 추가 |
|
||||
| `music-lab/app/pipeline/orchestrator.py` | Modify | `_dispatch_step` 추출 + `run_step` 재시도 루프 |
|
||||
| `music-lab/app/main.py` | Modify | `POST /api/music/pipeline/{pid}/retry` |
|
||||
| `music-lab/tests/test_pipeline_retry.py` | Create | db + orchestrator + endpoint 테스트 |
|
||||
| `agent-office/app/service_proxy.py` | Modify | `pipeline_retry(pid)`, `list_failed_pipelines()` |
|
||||
| `agent-office/app/agents/youtube_publisher.py` | Modify | `failed` 감지 → 텔레그램 알림+버튼 |
|
||||
| `agent-office/app/telegram/webhook.py` | Modify | `ytpub_retry_` 디스패치 |
|
||||
| `agent-office/tests/test_youtube_publisher_retry.py` | Create | 알림 + 콜백 테스트 |
|
||||
| `web-backend/CLAUDE.md` + `memory/service_music.md` | Modify | API 표 + 메모리 |
|
||||
|
||||
---
|
||||
|
||||
## Task 1: music-lab db — `get_last_failed_step`
|
||||
|
||||
**Files:** Modify `music-lab/app/db.py`; Test `music-lab/tests/test_pipeline_retry.py` (Create)
|
||||
|
||||
- [ ] **Step 1: 실패 테스트 작성**
|
||||
|
||||
`music-lab/tests/test_pipeline_retry.py` (fixture는 music-lab conftest 관례에 맞춰 조정):
|
||||
```python
|
||||
import pytest
|
||||
from app import db
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _tmp_db(tmp_path, monkeypatch):
|
||||
monkeypatch.setattr(db, "DB_PATH", str(tmp_path / "music.db"))
|
||||
db.init_db()
|
||||
|
||||
|
||||
def _make_pipeline_with_failed_step(step: str) -> int:
|
||||
pid = db.create_pipeline(track_id=1) # 시그니처는 conftest/db 확인 후 맞출 것
|
||||
job = db.create_pipeline_job(pid, step)
|
||||
db.update_pipeline_job(job, status="failed", error="boom")
|
||||
db.update_pipeline_state(pid, "failed", failed_reason=f"{step}: boom")
|
||||
return pid
|
||||
|
||||
|
||||
def test_get_last_failed_step_returns_step():
|
||||
pid = _make_pipeline_with_failed_step("video")
|
||||
assert db.get_last_failed_step(pid) == "video"
|
||||
|
||||
|
||||
def test_get_last_failed_step_none_when_no_failure():
|
||||
pid = db.create_pipeline(track_id=1)
|
||||
db.create_pipeline_job(pid, "cover") # status 기본(running/succeeded), failed 아님
|
||||
assert db.get_last_failed_step(pid) is None
|
||||
```
|
||||
|
||||
- [ ] **Step 2: 실패 확인**
|
||||
|
||||
Run: `cd music-lab && PYTHONPATH=.. python -m pytest tests/test_pipeline_retry.py::test_get_last_failed_step_returns_step -v`
|
||||
Expected: FAIL — `db.get_last_failed_step` 미존재. (create_pipeline 시그니처가 다르면 helper를 db의 실제 생성 함수에 맞춰 수정.)
|
||||
|
||||
- [ ] **Step 3: 구현**
|
||||
|
||||
`music-lab/app/db.py`의 pipeline_jobs 섹션(`list_pipeline_jobs` 근처)에 추가:
|
||||
```python
|
||||
def get_last_failed_step(pid: int) -> Optional[str]:
|
||||
"""파이프라인의 가장 최근 status='failed' pipeline_job의 step. 없으면 None."""
|
||||
with _connect() as conn: # music-lab의 커넥션 헬퍼 이름에 맞출 것
|
||||
row = conn.execute(
|
||||
"SELECT step FROM pipeline_jobs "
|
||||
"WHERE pipeline_id = ? AND status = 'failed' "
|
||||
"ORDER BY id DESC LIMIT 1",
|
||||
(pid,),
|
||||
).fetchone()
|
||||
return row["step"] if row else None
|
||||
```
|
||||
(`_connect`/`_conn` 등 실제 커넥션 컨텍스트매니저 이름은 db.py 상단 확인 후 일치시킬 것.)
|
||||
|
||||
- [ ] **Step 4: 통과 확인**
|
||||
|
||||
Run: `cd music-lab && PYTHONPATH=.. python -m pytest tests/test_pipeline_retry.py -v -k get_last_failed`
|
||||
Expected: 2 PASS.
|
||||
|
||||
- [ ] **Step 5: 커밋**
|
||||
```bash
|
||||
git add music-lab/app/db.py music-lab/tests/test_pipeline_retry.py
|
||||
git commit -m "feat(music-lab): get_last_failed_step — 파이프라인 재개용 실패 step 판별
|
||||
|
||||
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Task 2: orchestrator 자동 재시도
|
||||
|
||||
**Files:** Modify `music-lab/app/pipeline/orchestrator.py`; Test `music-lab/tests/test_pipeline_retry.py`
|
||||
|
||||
- [ ] **Step 1: 실패 테스트 작성** (test_pipeline_retry.py에 추가)
|
||||
|
||||
```python
|
||||
import asyncio
|
||||
from app.pipeline import orchestrator
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _no_backoff(monkeypatch):
|
||||
monkeypatch.setattr(orchestrator, "STEP_RETRY_BACKOFF_SEC", [0, 0])
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_retryable_step_retries_then_succeeds(monkeypatch):
|
||||
pid = db.create_pipeline(track_id=1)
|
||||
calls = {"n": 0}
|
||||
|
||||
async def flaky(step, p, ctx, feedback):
|
||||
calls["n"] += 1
|
||||
if calls["n"] < 3:
|
||||
raise RuntimeError("transient")
|
||||
return {"next_state": "video_pending", "fields": {}}
|
||||
|
||||
monkeypatch.setattr(orchestrator, "_dispatch_step", flaky)
|
||||
monkeypatch.setattr(orchestrator, "_resolve_input", lambda p: {"genre": "x", "title": "t", "moods": [], "tracks": [], "audio_path": "", "duration_sec": 0})
|
||||
|
||||
await orchestrator.run_step(pid, "cover")
|
||||
assert calls["n"] == 3
|
||||
assert db.get_pipeline(pid)["state"] == "video_pending"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_retryable_step_exhausts_to_failed(monkeypatch):
|
||||
pid = db.create_pipeline(track_id=1)
|
||||
|
||||
async def always_fail(step, p, ctx, feedback):
|
||||
raise RuntimeError("permanent")
|
||||
|
||||
monkeypatch.setattr(orchestrator, "_dispatch_step", always_fail)
|
||||
monkeypatch.setattr(orchestrator, "_resolve_input", lambda p: {"genre": "x", "title": "t", "moods": [], "tracks": [], "audio_path": "", "duration_sec": 0})
|
||||
|
||||
await orchestrator.run_step(pid, "cover")
|
||||
assert db.get_pipeline(pid)["state"] == "failed"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_publish_not_retried(monkeypatch):
|
||||
pid = db.create_pipeline(track_id=1)
|
||||
calls = {"n": 0}
|
||||
|
||||
async def fail_publish(step, p, ctx, feedback):
|
||||
calls["n"] += 1
|
||||
raise RuntimeError("upload error")
|
||||
|
||||
monkeypatch.setattr(orchestrator, "_dispatch_step", fail_publish)
|
||||
monkeypatch.setattr(orchestrator, "_resolve_input", lambda p: {"genre": "x", "title": "t", "moods": [], "tracks": [], "audio_path": "", "duration_sec": 0})
|
||||
|
||||
await orchestrator.run_step(pid, "publish")
|
||||
assert calls["n"] == 1 # 재시도 없음
|
||||
assert db.get_pipeline(pid)["state"] == "failed"
|
||||
```
|
||||
|
||||
- [ ] **Step 2: 실패 확인**
|
||||
|
||||
Run: `cd music-lab && PYTHONPATH=.. python -m pytest tests/test_pipeline_retry.py -v -k "retry or publish_not"`
|
||||
Expected: FAIL — `_dispatch_step`/`STEP_RETRY_BACKOFF_SEC` 미존재.
|
||||
|
||||
- [ ] **Step 3: 구현 — `_dispatch_step` 추출 + 재시도 루프**
|
||||
|
||||
`orchestrator.py` 상단 상수 추가:
|
||||
```python
|
||||
STEP_MAX_RETRIES = 2 # 추가 재시도 횟수 (총 시도 = +1)
|
||||
STEP_RETRY_BACKOFF_SEC = [5, 15]
|
||||
NON_RETRY_STEPS = {"publish"}
|
||||
```
|
||||
|
||||
기존 if/elif 분기(현재 `run_step` 내 lines 32-45)를 헬퍼로 추출:
|
||||
```python
|
||||
async def _dispatch_step(step: str, p: dict, ctx: dict, feedback: str) -> dict:
|
||||
if step == "cover":
|
||||
return await _run_cover(p, ctx, feedback)
|
||||
if step == "video":
|
||||
return await _run_video(p, ctx)
|
||||
if step == "thumb":
|
||||
return await _run_thumb(p, ctx, feedback)
|
||||
if step == "meta":
|
||||
return await _run_meta(p, ctx, feedback)
|
||||
if step == "review":
|
||||
return await _run_review(p, ctx)
|
||||
if step == "publish":
|
||||
return await _run_publish(p, ctx)
|
||||
raise ValueError(f"unknown step: {step}")
|
||||
```
|
||||
|
||||
`run_step`의 try 블록(step 실행부)을 재시도 루프로 교체:
|
||||
```python
|
||||
try:
|
||||
ctx = _resolve_input(p)
|
||||
except ValueError as e:
|
||||
db.update_pipeline_job(job_id, status="failed", error=str(e))
|
||||
db.update_pipeline_state(pipeline_id, "failed", failed_reason=f"{step}: {e}")
|
||||
return
|
||||
|
||||
attempts = 1 if step in NON_RETRY_STEPS else (STEP_MAX_RETRIES + 1)
|
||||
last_err = None
|
||||
for i in range(attempts):
|
||||
try:
|
||||
result = await _dispatch_step(step, p, ctx, feedback)
|
||||
db.update_pipeline_job(job_id, status="succeeded")
|
||||
db.update_pipeline_state(pipeline_id, result["next_state"], **result.get("fields", {}))
|
||||
return
|
||||
except Exception as e:
|
||||
last_err = e
|
||||
logger.exception("step %s 실패 (pipeline %s, attempt %d/%d)", step, pipeline_id, i + 1, attempts)
|
||||
if i < attempts - 1:
|
||||
await asyncio.sleep(STEP_RETRY_BACKOFF_SEC[min(i, len(STEP_RETRY_BACKOFF_SEC) - 1)])
|
||||
db.update_pipeline_job(job_id, status="failed", error=str(last_err))
|
||||
db.update_pipeline_state(pipeline_id, "failed", failed_reason=f"{step}: {last_err}")
|
||||
```
|
||||
(`asyncio`는 이미 import됨.)
|
||||
|
||||
- [ ] **Step 4: 통과 확인**
|
||||
|
||||
Run: `cd music-lab && PYTHONPATH=.. python -m pytest tests/test_pipeline_retry.py -v -k "retry or publish_not"`
|
||||
Expected: 3 PASS.
|
||||
|
||||
- [ ] **Step 5: 커밋**
|
||||
```bash
|
||||
git add music-lab/app/pipeline/orchestrator.py music-lab/tests/test_pipeline_retry.py
|
||||
git commit -m "feat(music-lab): orchestrator step 자동 재시도 (publish 제외)
|
||||
|
||||
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Task 3: retry 엔드포인트
|
||||
|
||||
**Files:** Modify `music-lab/app/main.py`; Test `music-lab/tests/test_pipeline_retry.py`
|
||||
|
||||
- [ ] **Step 1: 실패 테스트 작성**
|
||||
|
||||
```python
|
||||
from fastapi.testclient import TestClient
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def client(monkeypatch):
|
||||
from app.main import app
|
||||
return TestClient(app)
|
||||
|
||||
|
||||
def test_retry_failed_pipeline_retriggers(client, monkeypatch):
|
||||
pid = db.create_pipeline(track_id=1)
|
||||
job = db.create_pipeline_job(pid, "video")
|
||||
db.update_pipeline_job(job, status="failed", error="boom")
|
||||
db.update_pipeline_state(pid, "failed", failed_reason="video: boom")
|
||||
|
||||
called = {}
|
||||
from app.pipeline import orchestrator
|
||||
async def fake_run(p, step, *a):
|
||||
called["pid"], called["step"] = p, step
|
||||
monkeypatch.setattr(orchestrator, "run_step", fake_run)
|
||||
|
||||
r = client.post(f"/api/music/pipeline/{pid}/retry")
|
||||
assert r.status_code in (200, 202)
|
||||
assert r.json()["retrying_step"] == "video"
|
||||
|
||||
|
||||
def test_retry_non_failed_409(client):
|
||||
pid = db.create_pipeline(track_id=1) # state='created'
|
||||
r = client.post(f"/api/music/pipeline/{pid}/retry")
|
||||
assert r.status_code == 409
|
||||
|
||||
|
||||
def test_retry_publish_with_video_id_rejected(client):
|
||||
pid = db.create_pipeline(track_id=1)
|
||||
job = db.create_pipeline_job(pid, "publish")
|
||||
db.update_pipeline_job(job, status="failed", error="x")
|
||||
db.update_pipeline_state(pid, "failed", failed_reason="publish: x", youtube_video_id="abc123")
|
||||
r = client.post(f"/api/music/pipeline/{pid}/retry")
|
||||
assert r.status_code == 409
|
||||
```
|
||||
|
||||
- [ ] **Step 2: 실패 확인**
|
||||
|
||||
Run: `cd music-lab && PYTHONPATH=.. python -m pytest tests/test_pipeline_retry.py -v -k retry_`
|
||||
Expected: FAIL — 라우트 404.
|
||||
|
||||
- [ ] **Step 3: 구현**
|
||||
|
||||
`music-lab/app/main.py`의 `cancel_pipeline` 아래에 추가:
|
||||
```python
|
||||
@app.post("/api/music/pipeline/{pid}/retry", status_code=202)
|
||||
async def retry_pipeline(pid: int, bg: BackgroundTasks):
|
||||
p = _db_module.get_pipeline(pid)
|
||||
if not p:
|
||||
raise HTTPException(404)
|
||||
if p["state"] != "failed":
|
||||
raise HTTPException(409, f"재개 불가 (state={p['state']})")
|
||||
failed_step = _db_module.get_last_failed_step(pid)
|
||||
if not failed_step:
|
||||
# 폴백: failed_reason "{step}: ..." prefix
|
||||
reason = p.get("failed_reason") or ""
|
||||
failed_step = reason.split(":", 1)[0].strip() or None
|
||||
if not failed_step:
|
||||
raise HTTPException(409, "실패 step을 판별할 수 없음")
|
||||
if failed_step == "publish" and p.get("youtube_video_id"):
|
||||
raise HTTPException(409, "이미 업로드됨 (중복 방지)")
|
||||
bg.add_task(orchestrator.run_step, pid, failed_step)
|
||||
return {"ok": True, "retrying_step": failed_step}
|
||||
```
|
||||
|
||||
- [ ] **Step 4: 통과 확인 + 전체 회귀**
|
||||
|
||||
Run: `cd music-lab && PYTHONPATH=.. python -m pytest tests/test_pipeline_retry.py -v` → 모두 PASS
|
||||
Run: `cd music-lab && PYTHONPATH=.. python -m pytest tests/ -q` → 회귀 0
|
||||
|
||||
- [ ] **Step 5: 커밋**
|
||||
```bash
|
||||
git add music-lab/app/main.py music-lab/tests/test_pipeline_retry.py
|
||||
git commit -m "feat(music-lab): POST /pipeline/{id}/retry — 실패 step 수동 재개
|
||||
|
||||
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Task 4: agent-office service_proxy — pipeline_retry + list_failed
|
||||
|
||||
**Files:** Modify `agent-office/app/service_proxy.py`
|
||||
|
||||
> **먼저 확인**: `list_active_pipelines`가 호출하는 `GET /api/music/pipeline?status=active`가 failed를 포함하는지. 미포함이면 music-lab의 pipeline list 엔드포인트가 `status=failed`도 지원하는지 확인하고, 없으면 그 엔드포인트에 failed 필터를 추가(별도 작은 수정)하거나 `status` 화이트리스트에 'failed' 추가.
|
||||
|
||||
- [ ] **Step 1: 헬퍼 추가** — 기존 `list_active_pipelines`/`post_pipeline_feedback` 패턴(async with httpx.AsyncClient + MUSIC_LAB_URL) 그대로:
|
||||
```python
|
||||
async def list_failed_pipelines() -> list[dict]:
|
||||
async with httpx.AsyncClient(timeout=10) as client:
|
||||
resp = await client.get(f"{MUSIC_LAB_URL}/api/music/pipeline?status=failed")
|
||||
resp.raise_for_status()
|
||||
data = resp.json()
|
||||
return data if isinstance(data, list) else data.get("items", data.get("pipelines", []))
|
||||
|
||||
|
||||
async def pipeline_retry(pid: int) -> dict:
|
||||
async with httpx.AsyncClient(timeout=15) as client:
|
||||
resp = await client.post(f"{MUSIC_LAB_URL}/api/music/pipeline/{pid}/retry")
|
||||
# 409(재개 불가/중복)도 본문 반환 위해 raise 안 함
|
||||
return {"status_code": resp.status_code, **(resp.json() if resp.headers.get("content-type","").startswith("application/json") else {})}
|
||||
```
|
||||
(`list_active_pipelines`가 이미 failed를 포함하면 `list_failed_pipelines`는 생략하고 Task 5에서 active 목록에서 state=='failed' 필터.)
|
||||
|
||||
- [ ] **Step 2: import sanity** — `cd agent-office && PYTHONPATH=.. python -c "from app import service_proxy; print('OK')"` → OK
|
||||
|
||||
- [ ] **Step 3: 커밋**
|
||||
```bash
|
||||
git add agent-office/app/service_proxy.py
|
||||
git commit -m "feat(agent-office): service_proxy pipeline_retry + list_failed_pipelines
|
||||
|
||||
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Task 5: youtube_publisher — failed 감지 + 텔레그램 알림/버튼
|
||||
|
||||
**Files:** Modify `agent-office/app/agents/youtube_publisher.py`; Test `agent-office/tests/test_youtube_publisher_retry.py` (Create)
|
||||
|
||||
- [ ] **Step 1: 실패 테스트 작성**
|
||||
|
||||
`agent-office/tests/test_youtube_publisher_retry.py` (DB fixture는 agent-office conftest 관례 따름):
|
||||
```python
|
||||
import pytest
|
||||
from unittest.mock import AsyncMock
|
||||
from app.agents.youtube_publisher import YoutubePublisherAgent
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_failed_pipeline_notified_with_retry_button(monkeypatch):
|
||||
agent = YoutubePublisherAgent()
|
||||
monkeypatch.setattr(
|
||||
"app.agents.youtube_publisher.service_proxy.list_active_pipelines",
|
||||
AsyncMock(return_value=[
|
||||
{"id": 7, "state": "failed", "failed_reason": "video: boom", "track_title": "T"}
|
||||
]),
|
||||
)
|
||||
sent = AsyncMock(return_value={"ok": True, "message_id": 1})
|
||||
monkeypatch.setattr("app.agents.youtube_publisher.send_raw", sent)
|
||||
|
||||
await agent.poll_state_changes()
|
||||
assert sent.await_count == 1
|
||||
args, kwargs = sent.await_args
|
||||
text = kwargs.get("text") or (args[0] if args else "")
|
||||
assert "실패" in text
|
||||
# 인라인 retry 버튼 callback_data
|
||||
rm = kwargs.get("reply_markup") or {}
|
||||
cb = rm["inline_keyboard"][0][0]["callback_data"]
|
||||
assert cb == "ytpub_retry_7"
|
||||
|
||||
# 중복 방지: 같은 failed 재폴링 시 미발송
|
||||
await agent.poll_state_changes()
|
||||
assert sent.await_count == 1
|
||||
```
|
||||
(주의: `send_raw`가 `reply_markup`을 지원하는지 messaging 확인 — 미지원 시 Task에 messaging.send_raw에 reply_markup 인자 추가 포함. insta는 send_photo로 했으나 여기선 텍스트+버튼이므로 send_raw에 reply_markup 필요.)
|
||||
|
||||
- [ ] **Step 2: 실패 확인** — `cd agent-office && PYTHONPATH=.. python -m pytest tests/test_youtube_publisher_retry.py -v` → FAIL (failed 미처리)
|
||||
|
||||
- [ ] **Step 3: 구현** — `poll_state_changes`에 failed 분기 추가:
|
||||
```python
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self._notified_state_per_pipeline: dict[int, tuple] = {}
|
||||
self._notified_failed: set[int] = set()
|
||||
```
|
||||
`poll_state_changes` 루프 내, `*_pending` 처리 뒤:
|
||||
```python
|
||||
if state == "failed" and pid not in self._notified_failed:
|
||||
await self._notify_failed(p)
|
||||
self._notified_failed.add(pid)
|
||||
if state != "failed":
|
||||
self._notified_failed.discard(pid) # 재개 후 다시 실패하면 재알림
|
||||
```
|
||||
새 메서드:
|
||||
```python
|
||||
async def _notify_failed(self, p: dict) -> None:
|
||||
reason = p.get("failed_reason") or "?"
|
||||
step = reason.split(":", 1)[0].strip()
|
||||
title = p.get("track_title") or f"Pipeline #{p['id']}"
|
||||
text = f"⚠️ [{title}] 파이프라인 #{p['id']} '{step}' 실패\n사유: {reason}"
|
||||
kb = {"inline_keyboard": [[{"text": "🔄 재시도", "callback_data": f"ytpub_retry_{p['id']}"}]]}
|
||||
await send_raw(text=text, reply_markup=kb)
|
||||
add_log(self.agent_id, f"pipeline {p['id']} 실패 알림", "warning")
|
||||
```
|
||||
`send_raw`가 `reply_markup`을 받도록 `agent-office/app/telegram/messaging.py`의 `send_raw` 시그니처 확인/확장(이미 지원하면 그대로).
|
||||
|
||||
- [ ] **Step 4: 통과 확인** — `cd agent-office && PYTHONPATH=.. python -m pytest tests/test_youtube_publisher_retry.py -v` → PASS + 전체 회귀
|
||||
|
||||
- [ ] **Step 5: 커밋**
|
||||
```bash
|
||||
git add agent-office/app/agents/youtube_publisher.py agent-office/app/telegram/messaging.py agent-office/tests/test_youtube_publisher_retry.py
|
||||
git commit -m "feat(agent-office): youtube_publisher 파이프라인 실패 텔레그램 알림+재시도 버튼
|
||||
|
||||
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Task 6: webhook ytpub_retry 디스패치
|
||||
|
||||
**Files:** Modify `agent-office/app/telegram/webhook.py`; Test `agent-office/tests/test_youtube_publisher_retry.py`
|
||||
|
||||
> **먼저 확인**: `_handle_callback`의 prefix 분기 구조 + 기존 핸들러(`_handle_insta_issue` 등)가 service_proxy를 호출/회신하는 패턴.
|
||||
|
||||
- [ ] **Step 1: 실패 테스트 추가**
|
||||
```python
|
||||
@pytest.mark.asyncio
|
||||
async def test_handle_ytpub_retry_calls_proxy(monkeypatch):
|
||||
from app.telegram import webhook
|
||||
retry = AsyncMock(return_value={"status_code": 202, "ok": True, "retrying_step": "video"})
|
||||
monkeypatch.setattr("app.telegram.webhook.service_proxy.pipeline_retry", retry, raising=False)
|
||||
monkeypatch.setattr("app.telegram.webhook.send_raw", AsyncMock(), raising=False)
|
||||
res = await webhook._handle_ytpub_retry({"id": 1}, "ytpub_retry_7")
|
||||
retry.assert_awaited_once_with(7)
|
||||
```
|
||||
(import 경로/`send_raw` 위치는 webhook.py 실제에 맞춤.)
|
||||
|
||||
- [ ] **Step 2: 실패 확인** → FAIL (`_handle_ytpub_retry` 미존재)
|
||||
|
||||
- [ ] **Step 3: 구현** — `_handle_callback`에 분기:
|
||||
```python
|
||||
if callback_id.startswith("ytpub_retry_"):
|
||||
return await _handle_ytpub_retry(callback_query, callback_id)
|
||||
```
|
||||
핸들러:
|
||||
```python
|
||||
async def _handle_ytpub_retry(callback_query: dict, callback_id: str) -> dict:
|
||||
try:
|
||||
pid = int(callback_id.removeprefix("ytpub_retry_"))
|
||||
except (ValueError, AttributeError):
|
||||
return {"ok": False, "error": "invalid_callback_data"}
|
||||
res = await service_proxy.pipeline_retry(pid)
|
||||
sc = res.get("status_code")
|
||||
if sc in (200, 202):
|
||||
await send_raw(text=f"🔄 파이프라인 #{pid} 재개: {res.get('retrying_step','?')}")
|
||||
else:
|
||||
await send_raw(text=f"⚠️ 재개 불가 (#{pid}): {res.get('detail', sc)}")
|
||||
return {"ok": True}
|
||||
```
|
||||
(`service_proxy`/`send_raw` import는 webhook.py 기존 방식 따름.)
|
||||
|
||||
- [ ] **Step 4: 통과 확인** + 전체 agent-office 회귀
|
||||
|
||||
- [ ] **Step 5: 커밋**
|
||||
```bash
|
||||
git add agent-office/app/telegram/webhook.py agent-office/tests/test_youtube_publisher_retry.py
|
||||
git commit -m "feat(agent-office): ytpub_retry 텔레그램 콜백 → music-lab retry 프록시
|
||||
|
||||
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Task 7: 문서 + 배포 + 메모리
|
||||
|
||||
**Files:** Modify `web-backend/CLAUDE.md`, `memory/service_music.md`
|
||||
|
||||
- [ ] **Step 1: CLAUDE.md music API 표에 추가**
|
||||
```
|
||||
| POST | `/api/music/pipeline/{id}/retry` | 실패 파이프라인 실패 step부터 재개 (publish+업로드완료 시 409) |
|
||||
```
|
||||
|
||||
- [ ] **Step 2: 전체 회귀**
|
||||
```bash
|
||||
cd music-lab && PYTHONPATH=.. python -m pytest tests/ -q
|
||||
cd ../agent-office && PYTHONPATH=.. python -m pytest tests/ -q
|
||||
```
|
||||
Expected: 모두 PASS (사전존재 stale 제외).
|
||||
|
||||
- [ ] **Step 3: 커밋 + push (NAS 배포)**
|
||||
```bash
|
||||
cd C:/Users/jaeoh/Desktop/workspace/web-backend
|
||||
git add CLAUDE.md docs/superpowers/plans/2026-06-12-music-pipeline-reliability.md
|
||||
git commit -m "docs(music): 파이프라인 retry API 문서 + 구현 계획
|
||||
|
||||
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>"
|
||||
git push origin main
|
||||
```
|
||||
|
||||
- [ ] **Step 4: 메모리 갱신** — `service_music.md`에 신뢰성/복구(자동 재시도 publish 제외 + 수동 retry 엔드포인트 + youtube_publisher 실패 알림) 추가.
|
||||
|
||||
- [ ] **Step 5: 프로덕션 확인(경량)** — 배포 후 `POST /api/music/pipeline/<없는id>/retry` → 404, 실제 failed 파이프라인 있으면 retry 동작. (없으면 단위 테스트로 갈음.)
|
||||
|
||||
---
|
||||
|
||||
## Self-Review
|
||||
|
||||
**Spec coverage:**
|
||||
- 자동 재시도(publish 제외, _resolve_input 제외) → Task 2 ✓
|
||||
- 수동 재개(실패 step, publish+video_id 가드) → Task 1(step 판별)+Task 3 ✓
|
||||
- 실패 알림 + [🔄재시도] → Task 5 ✓
|
||||
- 재시도 콜백 → Task 4(proxy)+Task 6(dispatch) ✓
|
||||
- stuck 감지 제외(YAGNI) → 계획에 없음 ✓
|
||||
|
||||
**Placeholder scan:** 코드 스텝 모두 구체. "conftest 관례 확인"·"list_active가 failed 포함하는지 확인"은 기존 코드 소유를 존중하는 의도적 검증 지시(placeholder 아님).
|
||||
|
||||
**Type consistency:** `get_last_failed_step(pid)` Task1↔Task3 일치. `_dispatch_step(step,p,ctx,feedback)` Task2 정의↔테스트 mock 일치. `run_step(pid, step)` 시그니처 기존 일치. callback `ytpub_retry_{pid}` Task5 생성↔Task6 파싱 일치. `pipeline_retry(pid)` Task4↔Task6 일치. retry 응답 `retrying_step`/`status_code` Task3↔Task4↔Task6 일치.
|
||||
Reference in New Issue
Block a user