11 Commits

Author SHA1 Message Date
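796ac6d39f test(agent-office): fix stale assertion in test_init_and_seed (fixed count → subset)
The agent registry grew from 2 to 7 entries, so the fixed len==2/{stock,music} assertions had gone stale. Replaced them with a subset check on the core seeded agents, which is robust to registry growth. This debt was flagged repeatedly in this session's audit.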

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-12 00:48:58 +09:00
18cea427be docs(music): document the pipeline retry endpoint
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-12 00:46:04 +09:00
6c178006d3 feat(agent-office): ytpub_retry Telegram callback → music-lab retry proxy
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-12 00:39:31 +09:00
084e4f1b4d feat(agent-office): youtube_publisher Telegram alert on pipeline failure + retry button
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-12 00:36:38 +09:00
d048251a97 feat(agent-office): service_proxy pipeline_retry/list_failed_pipelines (+ music-lab status=failed filter)
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-12 00:33:28 +09:00
ef1a7a92fd fix(music-lab): retry race guard (transition to retrying) + failed_step validation + empty backoff list guard
- Fix 1: retry_pipeline transitions the state to 'retrying' just before bg.add_task, so a concurrent retry is rejected with 409
- Fix 2: add called[pid/step] asserts to test_retry_failed_pipeline_retriggers
- Fix 3: return 409 when failed_step is not in STEPS (guards against a bogus prefix)
- Fix 4: an empty STEP_RETRY_BACKOFF_SEC list raised IndexError → now falls back to 0

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-12 00:31:19 +09:00
44dbe7c426 feat(music-lab): POST /pipeline/{id}/retry (manually resume the failed step)
Resumes a terminal failed pipeline from the last failed step.
If the step is publish and youtube_video_id is set, returns 409 to prevent a duplicate upload.
Adds pythonpath=.. to pytest.ini (runs the TestClient tests without PYTHONPATH=..).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-12 00:23:24 +09:00
e90e25d78f feat(music-lab): automatic orchestrator step retry (publish excluded)
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-12 00:20:29 +09:00
d638666659 feat(music-lab): get_last_failed_step (identify the failed step for pipeline resume)
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-12 00:18:07 +09:00
51eff1538e docs(plan): music pipeline reliability and recovery implementation plan (7 tasks, TDD)
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-12 00:12:33 +09:00
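ffb96de61d docs(spec): music/YouTube pipeline reliability and recovery design
Automatic step retry (publish excluded) + manual resume of the failed step for terminal failed pipelines (Telegram [Retry]). Covers the orchestrator, the retry endpoint, and youtube_publisher failure alerts.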

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-12 00:08:01 +09:00
15 changed files with 1238 additions and 28 deletions


@@ -265,7 +265,7 @@ docker compose up -d
 | POST/GET | `/api/music/generate-batch` | Batch generation |
 | POST/GET | `/api/music/compile` (+ `/compiles/{id}/export`) | Compile |
 | POST/GET/DELETE | `/api/music/video-project` (+ `/{id}/render`, `/export`) | Video project |
-| ALL | `/api/music/pipeline` (create/start/feedback/cancel/publish/telegram-msg/lookup) | YouTube automation pipeline |
+| ALL | `/api/music/pipeline` (create/start/feedback/cancel/publish/retry/telegram-msg/lookup) | YouTube automation pipeline. `POST /{id}/retry` resumes the failed step (409 if the step is publish and the upload already completed) |
 | GET/PUT | `/api/music/setup` | Pipeline settings |
 | GET | `/api/music/youtube/auth-url`, `/callback`, `/status`; POST `/disconnect` | YouTube OAuth |
 | GET/POST/PUT/DELETE | `/api/music/revenue` (+ `/dashboard`) | Revenue records |


@@ -26,6 +26,7 @@ class YoutubePublisherAgent(BaseAgent):
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
         self._notified_state_per_pipeline: dict[int, tuple] = {}
+        self._notified_failed: set[int] = set()

     async def poll_state_changes(self) -> None:
         """Called periodically; sends a Telegram message when a pipeline newly enters a *_pending state."""
@@ -48,6 +49,32 @@ class YoutubePublisherAgent(BaseAgent):
             await self._notify_step(p)
             self._notified_state_per_pipeline[pid] = key

+        try:
+            failed = await service_proxy.list_failed_pipelines()
+        except Exception as e:
+            logger.warning("failed 폴링 실패: %s", e)
+            failed = []
+        for p in failed:
+            pid = p.get("id")
+            if pid is None:
+                continue
+            if pid not in self._notified_failed:
+                await self._notify_failed(p)
+                self._notified_failed.add(pid)
+        # Release pipelines that were resumed and left the failed state so they can be re-notified
+        failed_ids = {p.get("id") for p in failed}
+        self._notified_failed &= failed_ids
+
+    async def _notify_failed(self, p: dict) -> None:
+        reason = p.get("failed_reason") or "?"
+        step = reason.split(":", 1)[0].strip()
+        title = p.get("track_title") or f"Pipeline #{p['id']}"
+        text = f"⚠️ [{title}] 파이프라인 #{p['id']} '{step}' 실패\n사유: {reason}"
+        kb = {"inline_keyboard": [[{"text": "🔄 재시도", "callback_data": f"ytpub_retry_{p['id']}"}]]}
+        sent = await send_raw(text=text, reply_markup=kb)
+        if sent.get("ok"):
+            add_log(self.agent_id, f"pipeline {p['id']} 실패 알림", "warning")
+
     async def _notify_step(self, pipeline: dict) -> None:
         state = pipeline["state"]
         title_name, step = _STEP_TITLES[state]


@@ -352,6 +352,25 @@ async def list_active_pipelines() -> list[dict]:
         return resp.json().get("pipelines", [])


+async def list_failed_pipelines() -> list[dict]:
+    async with httpx.AsyncClient(timeout=10) as client:
+        resp = await client.get(f"{MUSIC_LAB_URL}/api/music/pipeline?status=failed")
+        resp.raise_for_status()
+        data = resp.json()
+        return data if isinstance(data, list) else data.get("items", data.get("pipelines", []))
+
+
+async def pipeline_retry(pid: int) -> dict:
+    async with httpx.AsyncClient(timeout=15) as client:
+        resp = await client.post(f"{MUSIC_LAB_URL}/api/music/pipeline/{pid}/retry")
+        out = {"status_code": resp.status_code}
+        try:
+            out.update(resp.json())
+        except Exception:
+            pass
+        return out
+
+
 async def get_pipeline(pid: int) -> dict:
     async with httpx.AsyncClient(timeout=15) as client:
         resp = await client.get(f"{MUSIC_LAB_URL}/api/music/pipeline/{pid}")


@@ -43,6 +43,9 @@ async def _handle_callback(callback_query: dict) -> Optional[dict]:
     if callback_id.startswith("issue_"):
         return await _handle_insta_issue(callback_query, callback_id)

+    if callback_id.startswith("ytpub_retry_"):
+        return await _handle_ytpub_retry(callback_query, callback_id)
+
     cb = get_telegram_callback(callback_id)
     if not cb:
         return None
@@ -169,6 +172,30 @@ async def _handle_insta_issue(callback_query: dict, callback_id: str) -> dict:
         return {"ok": False, "error": str(e)}


+async def _handle_ytpub_retry(callback_query: dict, callback_id: str) -> dict:
+    """Proxy a ytpub_retry_{pipeline_id} callback to the music-lab pipeline retry."""
+    from .. import service_proxy
+    from .messaging import send_raw
+
+    await api_call(
+        "answerCallbackQuery",
+        {"callback_query_id": callback_query["id"], "text": "재시도 요청 중..."},
+    )
+    try:
+        pid = int(callback_id.removeprefix("ytpub_retry_"))
+    except (ValueError, AttributeError):
+        return {"ok": False, "error": "invalid_callback_data"}
+
+    res = await service_proxy.pipeline_retry(pid)
+    sc = res.get("status_code")
+    if sc in (200, 202):
+        await send_raw(text=f"🔄 파이프라인 #{pid} 재개: {res.get('retrying_step', '?')}")
+    else:
+        await send_raw(text=f"⚠️ 재개 불가 (#{pid}): {res.get('detail', sc)}")
+    return {"ok": True}
+
+
 async def _handle_message(message: dict, agent_dispatcher) -> Optional[dict]:
     """Handle slash-command messages."""
     from .router import parse_command, resolve_agent_command, HELP_TEXT


@@ -18,9 +18,11 @@ from app.db import (
 def test_init_and_seed():
     init_db()
     agents = get_all_agents()
-    assert len(agents) == 2, f"Expected 2 agents, got {len(agents)}"
     ids = {a["agent_id"] for a in agents}
-    assert ids == {"stock", "music"}, f"Unexpected agent ids: {ids}"
+    # Verify that the core seeded agents exist. Assert a subset rather than a fixed count/set so the
+    # test stays robust as the registry grows (insta/lotto/realestate/youtube, etc.); the previous
+    # fixed len==2/{stock,music} assertions had gone stale.
+    assert {"stock", "music"} <= ids, f"core agents missing: {ids}"
+    assert len(agents) >= 2
     print("  [PASS] test_init_and_seed")


@@ -40,6 +40,9 @@ async def test_poll_notifies_once_per_state():
     with patch(
         "app.agents.youtube_publisher.service_proxy.list_active_pipelines",
         new=AsyncMock(return_value=pipelines),
+    ), patch(
+        "app.agents.youtube_publisher.service_proxy.list_failed_pipelines",
+        new=AsyncMock(return_value=[]),
     ), patch(
         "app.agents.youtube_publisher.send_raw",
         new=AsyncMock(return_value={"ok": True, "message_id": 99}),
@@ -63,6 +66,8 @@ async def test_poll_renotifies_on_reject_regen(monkeypatch):
                      "track_title": "Test", "feedback_count_per_step": {"cover": 1}}]
     list_mock = AsyncMock(side_effect=[pipelines_v1, pipelines_v2])
     with patch("app.agents.youtube_publisher.service_proxy.list_active_pipelines", list_mock), \
+         patch("app.agents.youtube_publisher.service_proxy.list_failed_pipelines",
+               new=AsyncMock(return_value=[])), \
          patch("app.agents.youtube_publisher.send_raw",
                new=AsyncMock(return_value={"ok": True, "message_id": 99})), \
          patch("app.agents.youtube_publisher.service_proxy.save_pipeline_telegram_msg",
@@ -83,7 +88,7 @@ async def test_on_telegram_reply_approve_calls_feedback():
         new=AsyncMock(),
     ) as mock_fb, patch(
         "app.agents.youtube_publisher.send_raw",
-        new=AsyncMock(),
+        new=AsyncMock(return_value={"ok": True, "message_id": 1}),
     ):
         a = YoutubePublisherAgent()
         await a.on_telegram_reply(pipeline_id=42, step="cover", user_text="승인")
@@ -99,7 +104,7 @@ async def test_on_telegram_reply_reject_with_feedback():
         new=AsyncMock(),
     ) as mock_fb, patch(
         "app.agents.youtube_publisher.send_raw",
-        new=AsyncMock(),
+        new=AsyncMock(return_value={"ok": True, "message_id": 1}),
     ):
         a = YoutubePublisherAgent()
         await a.on_telegram_reply(pipeline_id=43, step="meta", user_text="반려, 제목 짧게")


@@ -0,0 +1,213 @@
import os
import sys
import tempfile

_fd, _TMP = tempfile.mkstemp(suffix=".db")
os.close(_fd)
os.unlink(_TMP)
os.environ["AGENT_OFFICE_DB_PATH"] = _TMP
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

import pytest
from unittest.mock import AsyncMock, patch


@pytest.fixture(autouse=True)
def _init_db():
    import gc
    gc.collect()
    if os.path.exists(_TMP):
        os.remove(_TMP)
    from app.db import init_db
    init_db()
    yield
    gc.collect()


@pytest.mark.asyncio
async def test_failed_pipeline_notified_with_retry_button():
    from app.agents.youtube_publisher import YoutubePublisherAgent

    agent = YoutubePublisherAgent()
    failed_pipeline = {
        "id": 7,
        "state": "failed",
        "failed_reason": "video: boom",
        "track_title": "T",
    }
    sent = AsyncMock(return_value={"ok": True, "message_id": 1})
    with patch(
        "app.agents.youtube_publisher.service_proxy.list_active_pipelines",
        new=AsyncMock(return_value=[]),
    ), patch(
        "app.agents.youtube_publisher.service_proxy.list_failed_pipelines",
        new=AsyncMock(return_value=[failed_pipeline]),
    ), patch(
        "app.agents.youtube_publisher.send_raw",
        new=sent,
    ):
        await agent.poll_state_changes()

    assert sent.await_count == 1
    _, kwargs = sent.await_args
    assert "실패" in (kwargs.get("text") or "")
    assert kwargs["reply_markup"]["inline_keyboard"][0][0]["callback_data"] == "ytpub_retry_7"


@pytest.mark.asyncio
async def test_failed_pipeline_no_duplicate_notification():
    """The same failed pipeline is not notified again on the second poll."""
    from app.agents.youtube_publisher import YoutubePublisherAgent

    agent = YoutubePublisherAgent()
    failed_pipeline = {
        "id": 7,
        "state": "failed",
        "failed_reason": "video: boom",
        "track_title": "T",
    }
    sent = AsyncMock(return_value={"ok": True, "message_id": 1})
    with patch(
        "app.agents.youtube_publisher.service_proxy.list_active_pipelines",
        new=AsyncMock(return_value=[]),
    ), patch(
        "app.agents.youtube_publisher.service_proxy.list_failed_pipelines",
        new=AsyncMock(return_value=[failed_pipeline]),
    ), patch(
        "app.agents.youtube_publisher.send_raw",
        new=sent,
    ):
        await agent.poll_state_changes()
        await agent.poll_state_changes()

    # Dedupe: only one notification for the same failed pipeline
    assert sent.await_count == 1


@pytest.mark.asyncio
async def test_failed_pipeline_renotify_after_recovery():
    """A pipeline that left failed and then fails again is notified again."""
    from app.agents.youtube_publisher import YoutubePublisherAgent

    agent = YoutubePublisherAgent()
    failed_pipeline = {
        "id": 7,
        "state": "failed",
        "failed_reason": "video: boom",
        "track_title": "T",
    }
    sent = AsyncMock(return_value={"ok": True, "message_id": 1})

    # First poll: failed pipeline present → notify
    with patch(
        "app.agents.youtube_publisher.service_proxy.list_active_pipelines",
        new=AsyncMock(return_value=[]),
    ), patch(
        "app.agents.youtube_publisher.service_proxy.list_failed_pipelines",
        new=AsyncMock(return_value=[failed_pipeline]),
    ), patch(
        "app.agents.youtube_publisher.send_raw",
        new=sent,
    ):
        await agent.poll_state_changes()
    assert sent.await_count == 1

    # Second poll: gone from the failed list (resumed) → removed from _notified_failed
    with patch(
        "app.agents.youtube_publisher.service_proxy.list_active_pipelines",
        new=AsyncMock(return_value=[]),
    ), patch(
        "app.agents.youtube_publisher.service_proxy.list_failed_pipelines",
        new=AsyncMock(return_value=[]),
    ), patch(
        "app.agents.youtube_publisher.send_raw",
        new=sent,
    ):
        await agent.poll_state_changes()
    assert sent.await_count == 1  # no additional notification yet

    # Third poll: failed again → re-notification is allowed
    with patch(
        "app.agents.youtube_publisher.service_proxy.list_active_pipelines",
        new=AsyncMock(return_value=[]),
    ), patch(
        "app.agents.youtube_publisher.service_proxy.list_failed_pipelines",
        new=AsyncMock(return_value=[failed_pipeline]),
    ), patch(
        "app.agents.youtube_publisher.send_raw",
        new=sent,
    ):
        await agent.poll_state_changes()
    assert sent.await_count == 2  # re-notified


@pytest.mark.asyncio
async def test_handle_ytpub_retry_calls_proxy():
    from app import service_proxy
    from app.telegram import webhook

    retry = AsyncMock(return_value={"status_code": 202, "ok": True, "retrying_step": "video"})
    fake_send = AsyncMock(return_value={"ok": True})
    fake_api_call = AsyncMock(return_value={"ok": True})
    with patch.object(service_proxy, "pipeline_retry", retry), \
         patch("app.telegram.messaging.send_raw", fake_send), \
         patch("app.telegram.webhook.api_call", fake_api_call):
        res = await webhook._handle_ytpub_retry({"id": 1}, "ytpub_retry_7")

    retry.assert_awaited_once_with(7)
    assert res["ok"] is True


@pytest.mark.asyncio
async def test_handle_ytpub_retry_invalid_data():
    from app.telegram import webhook

    fake_send = AsyncMock(return_value={"ok": True})
    fake_api_call = AsyncMock(return_value={"ok": True})
    with patch("app.telegram.messaging.send_raw", fake_send), \
         patch("app.telegram.webhook.api_call", fake_api_call):
        res = await webhook._handle_ytpub_retry({"id": 1}, "ytpub_retry_abc")

    assert res["ok"] is False


@pytest.mark.asyncio
async def test_failed_poll_exception_is_silent():
    """If list_failed_pipelines raises, the poll moves on quietly (active notifications are unaffected)."""
    from app.agents.youtube_publisher import YoutubePublisherAgent

    agent = YoutubePublisherAgent()
    active_pipeline = {
        "id": 1,
        "state": "cover_pending",
        "cover_url": "/x.jpg",
        "track_title": "Track",
        "feedback_count_per_step": {},
    }
    sent = AsyncMock(return_value={"ok": True, "message_id": 1})
    with patch(
        "app.agents.youtube_publisher.service_proxy.list_active_pipelines",
        new=AsyncMock(return_value=[active_pipeline]),
    ), patch(
        "app.agents.youtube_publisher.service_proxy.list_failed_pipelines",
        new=AsyncMock(side_effect=Exception("network error")),
    ), patch(
        "app.agents.youtube_publisher.service_proxy.save_pipeline_telegram_msg",
        new=AsyncMock(),
    ), patch(
        "app.agents.youtube_publisher.send_raw",
        new=sent,
    ):
        await agent.poll_state_changes()

    # The active notification is still sent normally
    assert sent.await_count == 1


@@ -0,0 +1,556 @@
# music/YouTube Pipeline Reliability and Recovery Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Absorb transient pipeline step failures with automatic retries (publish excluded). Leave permanent failures in the terminal `failed` state and allow a manual resume from the failed step (Telegram [🔄 Retry] button).

**Architecture:** In music-lab: a bounded retry loop in `orchestrator.run_step`, a `POST /pipeline/{id}/retry` resume endpoint, and `db.get_last_failed_step`. In agent-office: `youtube_publisher` detects `failed` pipelines and sends a Telegram alert with a button, and `webhook` proxies the `ytpub_retry_{pid}` callback to the music-lab retry endpoint.

**Tech Stack:** Python 3.12 / FastAPI / SQLite / asyncio / pytest. Existing patterns: `orchestrator.run_step` (BackgroundTask), the `main.py` pipeline endpoints (404/409 + `_db_module`), `service_proxy` (httpx + `MUSIC_LAB_URL`), `telegram/webhook.py` (callback-prefix dispatch).

**Spec:** `docs/superpowers/specs/2026-06-12-music-pipeline-reliability-design.md`

> **Test fixture note**: First check how each `tests/conftest.py` in music-lab and agent-office isolates the DB (monkeypatching `db.DB_PATH` + `init_db`), and adapt the fixtures in the tests below to that convention. The code below assumes the standard pattern of monkeypatching `db.DB_PATH` to a temporary path.
---
## File Structure

| File | Change | Responsibility |
|------|------|------|
| `music-lab/app/db.py` | Modify | Add `get_last_failed_step(pid)` |
| `music-lab/app/pipeline/orchestrator.py` | Modify | Extract `_dispatch_step` + retry loop in `run_step` |
| `music-lab/app/main.py` | Modify | `POST /api/music/pipeline/{pid}/retry` |
| `music-lab/tests/test_pipeline_retry.py` | Create | db + orchestrator + endpoint tests |
| `agent-office/app/service_proxy.py` | Modify | `pipeline_retry(pid)`, `list_failed_pipelines()` |
| `agent-office/app/agents/youtube_publisher.py` | Modify | Detect `failed` → Telegram alert + button |
| `agent-office/app/telegram/webhook.py` | Modify | `ytpub_retry_` dispatch |
| `agent-office/tests/test_youtube_publisher_retry.py` | Create | Alert + callback tests |
| `web-backend/CLAUDE.md` + `memory/service_music.md` | Modify | API table + memory |
---
## Task 1: music-lab db — `get_last_failed_step`
**Files:** Modify `music-lab/app/db.py`; Test `music-lab/tests/test_pipeline_retry.py` (Create)
- [ ] **Step 1: Write a failing test**

`music-lab/tests/test_pipeline_retry.py` (adapt the fixture to the music-lab conftest convention):

```python
import pytest
from app import db


@pytest.fixture(autouse=True)
def _tmp_db(tmp_path, monkeypatch):
    monkeypatch.setattr(db, "DB_PATH", str(tmp_path / "music.db"))
    db.init_db()


def _make_pipeline_with_failed_step(step: str) -> int:
    pid = db.create_pipeline(track_id=1)  # check conftest/db and match the real signature
    job = db.create_pipeline_job(pid, step)
    db.update_pipeline_job(job, status="failed", error="boom")
    db.update_pipeline_state(pid, "failed", failed_reason=f"{step}: boom")
    return pid


def test_get_last_failed_step_returns_step():
    pid = _make_pipeline_with_failed_step("video")
    assert db.get_last_failed_step(pid) == "video"


def test_get_last_failed_step_none_when_no_failure():
    pid = db.create_pipeline(track_id=1)
    db.create_pipeline_job(pid, "cover")  # default status (running/succeeded), not failed
    assert db.get_last_failed_step(pid) is None
```
- [ ] **Step 2: Confirm the failure**

Run: `cd music-lab && PYTHONPATH=.. python -m pytest tests/test_pipeline_retry.py::test_get_last_failed_step_returns_step -v`
Expected: FAIL because `db.get_last_failed_step` does not exist. (If the `create_pipeline` signature differs, adapt the helper to the actual creation function in db.)

- [ ] **Step 3: Implement**

Add to the pipeline_jobs section of `music-lab/app/db.py` (near `list_pipeline_jobs`):

```python
def get_last_failed_step(pid: int) -> Optional[str]:
    """Step of the pipeline's most recent pipeline_job with status='failed', or None."""
    with _connect() as conn:  # match the name of music-lab's connection helper
        row = conn.execute(
            "SELECT step FROM pipeline_jobs "
            "WHERE pipeline_id = ? AND status = 'failed' "
            "ORDER BY id DESC LIMIT 1",
            (pid,),
        ).fetchone()
    return row["step"] if row else None
```

(Check the top of db.py for the actual connection context manager name, e.g. `_connect`/`_conn`, and match it.)
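
To see what the query above selects, here is a minimal self-contained sketch against an in-memory SQLite database. The `pipeline_jobs` schema and the `demo_connection` helper are assumptions made up for this illustration; the real table in music-lab may have more columns, and the real function takes only `pid`.

```python
import sqlite3
from typing import Optional


def get_last_failed_step(conn: sqlite3.Connection, pid: int) -> Optional[str]:
    """Return the step of the newest failed job for a pipeline, or None."""
    row = conn.execute(
        "SELECT step FROM pipeline_jobs "
        "WHERE pipeline_id = ? AND status = 'failed' "
        "ORDER BY id DESC LIMIT 1",
        (pid,),
    ).fetchone()
    return row["step"] if row else None


def demo_connection() -> sqlite3.Connection:
    """Build a throwaway DB with a hypothetical, simplified pipeline_jobs table."""
    conn = sqlite3.connect(":memory:")
    conn.row_factory = sqlite3.Row  # needed so rows support row["step"]
    conn.execute(
        "CREATE TABLE pipeline_jobs ("
        "id INTEGER PRIMARY KEY AUTOINCREMENT, "
        "pipeline_id INTEGER NOT NULL, step TEXT NOT NULL, status TEXT NOT NULL)"
    )
    conn.executemany(
        "INSERT INTO pipeline_jobs (pipeline_id, step, status) VALUES (?, ?, ?)",
        [
            (1, "cover", "succeeded"),
            (1, "video", "failed"),     # older failure
            (1, "video", "succeeded"),  # an earlier retry that worked
            (1, "thumb", "failed"),     # newest failure: this one is returned
            (2, "cover", "succeeded"),  # pipeline 2 never failed
        ],
    )
    return conn
```

Note that the query looks only at job rows, so it would still return a step for a pipeline that has since recovered. That is presumably why the retry endpoint checks `state == "failed"` before calling it.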
- [ ] **Step 4: Confirm the tests pass**

Run: `cd music-lab && PYTHONPATH=.. python -m pytest tests/test_pipeline_retry.py -v -k get_last_failed`
Expected: 2 PASS.

- [ ] **Step 5: Commit**

```bash
git add music-lab/app/db.py music-lab/tests/test_pipeline_retry.py
git commit -m "feat(music-lab): get_last_failed_step (identify the failed step for pipeline resume)

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>"
```
---
## Task 2: orchestrator automatic retry

**Files:** Modify `music-lab/app/pipeline/orchestrator.py`; Test `music-lab/tests/test_pipeline_retry.py`

- [ ] **Step 1: Write failing tests** (append to test_pipeline_retry.py)

```python
import asyncio
from app.pipeline import orchestrator


@pytest.fixture(autouse=True)
def _no_backoff(monkeypatch):
    monkeypatch.setattr(orchestrator, "STEP_RETRY_BACKOFF_SEC", [0, 0])


@pytest.mark.asyncio
async def test_retryable_step_retries_then_succeeds(monkeypatch):
    pid = db.create_pipeline(track_id=1)
    calls = {"n": 0}

    async def flaky(step, p, ctx, feedback):
        calls["n"] += 1
        if calls["n"] < 3:
            raise RuntimeError("transient")
        return {"next_state": "video_pending", "fields": {}}

    monkeypatch.setattr(orchestrator, "_dispatch_step", flaky)
    monkeypatch.setattr(orchestrator, "_resolve_input", lambda p: {"genre": "x", "title": "t", "moods": [], "tracks": [], "audio_path": "", "duration_sec": 0})
    await orchestrator.run_step(pid, "cover")
    assert calls["n"] == 3
    assert db.get_pipeline(pid)["state"] == "video_pending"


@pytest.mark.asyncio
async def test_retryable_step_exhausts_to_failed(monkeypatch):
    pid = db.create_pipeline(track_id=1)

    async def always_fail(step, p, ctx, feedback):
        raise RuntimeError("permanent")

    monkeypatch.setattr(orchestrator, "_dispatch_step", always_fail)
    monkeypatch.setattr(orchestrator, "_resolve_input", lambda p: {"genre": "x", "title": "t", "moods": [], "tracks": [], "audio_path": "", "duration_sec": 0})
    await orchestrator.run_step(pid, "cover")
    assert db.get_pipeline(pid)["state"] == "failed"


@pytest.mark.asyncio
async def test_publish_not_retried(monkeypatch):
    pid = db.create_pipeline(track_id=1)
    calls = {"n": 0}

    async def fail_publish(step, p, ctx, feedback):
        calls["n"] += 1
        raise RuntimeError("upload error")

    monkeypatch.setattr(orchestrator, "_dispatch_step", fail_publish)
    monkeypatch.setattr(orchestrator, "_resolve_input", lambda p: {"genre": "x", "title": "t", "moods": [], "tracks": [], "audio_path": "", "duration_sec": 0})
    await orchestrator.run_step(pid, "publish")
    assert calls["n"] == 1  # no retry
    assert db.get_pipeline(pid)["state"] == "failed"
```
- [ ] **Step 2: Confirm the failure**

Run: `cd music-lab && PYTHONPATH=.. python -m pytest tests/test_pipeline_retry.py -v -k "retry or publish_not"`
Expected: FAIL because `_dispatch_step`/`STEP_RETRY_BACKOFF_SEC` do not exist.

- [ ] **Step 3: Implement: extract `_dispatch_step` + retry loop**

Add constants at the top of `orchestrator.py`:

```python
STEP_MAX_RETRIES = 2  # number of additional retries (total attempts = this + 1)
STEP_RETRY_BACKOFF_SEC = [5, 15]
NON_RETRY_STEPS = {"publish"}
```

Extract the existing if/elif branches (currently lines 32-45 inside `run_step`) into a helper:

```python
async def _dispatch_step(step: str, p: dict, ctx: dict, feedback: str) -> dict:
    if step == "cover":
        return await _run_cover(p, ctx, feedback)
    if step == "video":
        return await _run_video(p, ctx)
    if step == "thumb":
        return await _run_thumb(p, ctx, feedback)
    if step == "meta":
        return await _run_meta(p, ctx, feedback)
    if step == "review":
        return await _run_review(p, ctx)
    if step == "publish":
        return await _run_publish(p, ctx)
    raise ValueError(f"unknown step: {step}")
```
Replace the try block in `run_step` (the step-execution part) with a retry loop:

```python
    try:
        ctx = _resolve_input(p)
    except ValueError as e:
        db.update_pipeline_job(job_id, status="failed", error=str(e))
        db.update_pipeline_state(pipeline_id, "failed", failed_reason=f"{step}: {e}")
        return

    attempts = 1 if step in NON_RETRY_STEPS else (STEP_MAX_RETRIES + 1)
    last_err = None
    for i in range(attempts):
        try:
            result = await _dispatch_step(step, p, ctx, feedback)
            db.update_pipeline_job(job_id, status="succeeded")
            db.update_pipeline_state(pipeline_id, result["next_state"], **result.get("fields", {}))
            return
        except Exception as e:
            last_err = e
            logger.exception("step %s 실패 (pipeline %s, attempt %d/%d)", step, pipeline_id, i + 1, attempts)
            if i < attempts - 1:
                # Clamp to the last entry; an empty list means no delay (indexing it would raise IndexError).
                backoff = STEP_RETRY_BACKOFF_SEC[min(i, len(STEP_RETRY_BACKOFF_SEC) - 1)] if STEP_RETRY_BACKOFF_SEC else 0
                await asyncio.sleep(backoff)

    db.update_pipeline_job(job_id, status="failed", error=str(last_err))
    db.update_pipeline_state(pipeline_id, "failed", failed_reason=f"{step}: {last_err}")
```

(`asyncio` is already imported.)
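
The retry loop above can be exercised without the database by isolating it. The following is a minimal sketch only: `run_with_retries`, `backoff_for`, and the injectable `sleep` parameter are names invented for this illustration and are not part of `orchestrator.py`. It also includes the empty-schedule fallback that commit ef1a7a92fd (Fix 4) describes.

```python
import asyncio
from typing import Any, Awaitable, Callable, Sequence


def backoff_for(attempt_index: int, schedule: Sequence[float]) -> float:
    """Pick the delay for a given attempt, clamping to the last entry.

    An empty schedule falls back to 0 instead of raising IndexError.
    """
    if not schedule:
        return 0.0
    return float(schedule[min(attempt_index, len(schedule) - 1)])


async def run_with_retries(
    action: Callable[[], Awaitable[Any]],
    *,
    max_retries: int,
    schedule: Sequence[float],
    retryable: bool = True,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> Any:
    """Run `action` up to max_retries + 1 times; non-retryable actions run once."""
    attempts = max(1, max_retries + 1) if retryable else 1
    last_err: Exception = RuntimeError("action was never attempted")
    for i in range(attempts):
        try:
            return await action()
        except Exception as e:  # broad on purpose, mirroring the plan's loop
            last_err = e
            if i < attempts - 1:
                await sleep(backoff_for(i, schedule))
    raise last_err
```

Passing `retryable=False` corresponds to the `NON_RETRY_STEPS` branch: the action runs exactly once and its error propagates, which is the behaviour wanted for publish so that an upload is never repeated automatically.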
- [ ] **Step 4: Confirm the tests pass**

Run: `cd music-lab && PYTHONPATH=.. python -m pytest tests/test_pipeline_retry.py -v -k "retry or publish_not"`
Expected: 3 PASS.

- [ ] **Step 5: Commit**

```bash
git add music-lab/app/pipeline/orchestrator.py music-lab/tests/test_pipeline_retry.py
git commit -m "feat(music-lab): automatic orchestrator step retry (publish excluded)

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>"
```
---
## Task 3: retry endpoint

**Files:** Modify `music-lab/app/main.py`; Test `music-lab/tests/test_pipeline_retry.py`

- [ ] **Step 1: Write failing tests**

```python
from fastapi.testclient import TestClient


@pytest.fixture
def client(monkeypatch):
    from app.main import app
    return TestClient(app)


def test_retry_failed_pipeline_retriggers(client, monkeypatch):
    pid = db.create_pipeline(track_id=1)
    job = db.create_pipeline_job(pid, "video")
    db.update_pipeline_job(job, status="failed", error="boom")
    db.update_pipeline_state(pid, "failed", failed_reason="video: boom")
    called = {}
    from app.pipeline import orchestrator

    async def fake_run(p, step, *a):
        called["pid"], called["step"] = p, step

    monkeypatch.setattr(orchestrator, "run_step", fake_run)
    r = client.post(f"/api/music/pipeline/{pid}/retry")
    assert r.status_code in (200, 202)
    assert r.json()["retrying_step"] == "video"
    # The background task must have been scheduled with the failed step.
    assert called == {"pid": pid, "step": "video"}


def test_retry_non_failed_409(client):
    pid = db.create_pipeline(track_id=1)  # state='created'
    r = client.post(f"/api/music/pipeline/{pid}/retry")
    assert r.status_code == 409


def test_retry_publish_with_video_id_rejected(client):
    pid = db.create_pipeline(track_id=1)
    job = db.create_pipeline_job(pid, "publish")
    db.update_pipeline_job(job, status="failed", error="x")
    db.update_pipeline_state(pid, "failed", failed_reason="publish: x", youtube_video_id="abc123")
    r = client.post(f"/api/music/pipeline/{pid}/retry")
    assert r.status_code == 409
```
- [ ] **Step 2: Confirm the failure**

Run: `cd music-lab && PYTHONPATH=.. python -m pytest tests/test_pipeline_retry.py -v -k retry_`
Expected: FAIL with 404 (the route does not exist yet).

- [ ] **Step 3: Implement**

Add to `music-lab/app/main.py`, below `cancel_pipeline`:

```python
@app.post("/api/music/pipeline/{pid}/retry", status_code=202)
async def retry_pipeline(pid: int, bg: BackgroundTasks):
    p = _db_module.get_pipeline(pid)
    if not p:
        raise HTTPException(404)
    if p["state"] != "failed":
        raise HTTPException(409, f"재개 불가 (state={p['state']})")
    failed_step = _db_module.get_last_failed_step(pid)
    if not failed_step:
        # Fallback: the "{step}: ..." prefix of failed_reason
        reason = p.get("failed_reason") or ""
        failed_step = reason.split(":", 1)[0].strip() or None
    if not failed_step:
        raise HTTPException(409, "실패 step을 판별할 수 없음")
    if failed_step == "publish" and p.get("youtube_video_id"):
        raise HTTPException(409, "이미 업로드됨 (중복 방지)")
    bg.add_task(orchestrator.run_step, pid, failed_step)
    return {"ok": True, "retrying_step": failed_step}
```
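
The decision logic of the endpoint can also be written as a pure function, which makes the three 409 cases easy to test in isolation. This is a sketch under stated assumptions: `resolve_retry_step` and `RetryRejected` are hypothetical names, and the `STEPS` tuple is inferred from the branches of `_dispatch_step` rather than taken from the real module. The membership check reflects the validation that commit ef1a7a92fd (Fix 3) describes.

```python
from typing import Optional

# Assumed step names, taken from the branches of _dispatch_step in this plan.
STEPS = ("cover", "video", "thumb", "meta", "review", "publish")


class RetryRejected(Exception):
    """Raised where the endpoint would answer 409."""


def resolve_retry_step(pipeline: dict, last_failed_job_step: Optional[str]) -> str:
    """Decide which step to resume, or raise RetryRejected."""
    if pipeline.get("state") != "failed":
        raise RetryRejected(f"not resumable (state={pipeline.get('state')})")

    step = last_failed_job_step
    if not step:
        # Fallback: failed_reason is written as "{step}: {error}".
        reason = pipeline.get("failed_reason") or ""
        step = reason.split(":", 1)[0].strip() or None

    if step not in STEPS:
        # Covers both "no step found" and a bogus prefix such as "Traceback".
        raise RetryRejected("cannot determine the failed step")

    if step == "publish" and pipeline.get("youtube_video_id"):
        raise RetryRejected("already uploaded (duplicate guard)")
    return step
```

According to the same follow-up commit (Fix 1), the shipped endpoint additionally moves the pipeline to a 'retrying' state just before scheduling the background task, so a second concurrent request fails the `state == "failed"` check.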
- [ ] **Step 4: Confirm the tests pass + full regression run**

Run: `cd music-lab && PYTHONPATH=.. python -m pytest tests/test_pipeline_retry.py -v` → all PASS
Run: `cd music-lab && PYTHONPATH=.. python -m pytest tests/ -q` → 0 regressions

- [ ] **Step 5: Commit**

```bash
git add music-lab/app/main.py music-lab/tests/test_pipeline_retry.py
git commit -m "feat(music-lab): POST /pipeline/{id}/retry (manually resume the failed step)

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>"
```
---
## Task 4: agent-office service_proxy — pipeline_retry + list_failed
**Files:** Modify `agent-office/app/service_proxy.py`
> **Check first**: whether `GET /api/music/pipeline?status=active`, which `list_active_pipelines` calls, includes failed pipelines. If it does not, check whether music-lab's pipeline list endpoint also supports `status=failed`; if it does not, either add a failed filter to that endpoint (a separate small change) or add 'failed' to the `status` whitelist.

- [ ] **Step 1: Add helpers**, following the existing `list_active_pipelines`/`post_pipeline_feedback` pattern (async with httpx.AsyncClient + MUSIC_LAB_URL):

```python
async def list_failed_pipelines() -> list[dict]:
    async with httpx.AsyncClient(timeout=10) as client:
        resp = await client.get(f"{MUSIC_LAB_URL}/api/music/pipeline?status=failed")
        resp.raise_for_status()
        data = resp.json()
        return data if isinstance(data, list) else data.get("items", data.get("pipelines", []))


async def pipeline_retry(pid: int) -> dict:
    async with httpx.AsyncClient(timeout=15) as client:
        resp = await client.post(f"{MUSIC_LAB_URL}/api/music/pipeline/{pid}/retry")
        # Do not raise: the body is returned for 409 (not resumable/duplicate) as well
        return {"status_code": resp.status_code, **(resp.json() if resp.headers.get("content-type","").startswith("application/json") else {})}
```

(If `list_active_pipelines` already includes failed pipelines, skip `list_failed_pipelines` and filter the active list for state == 'failed' in Task 5.)
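
Because the plan does not pin down the response shape of the list endpoint (bare list, `items`, or `pipelines`), the normalization step is worth isolating. The sketch below is one possible, slightly more defensive variant; `extract_pipelines` is a hypothetical helper name, not something in `service_proxy`.

```python
from typing import Any


def extract_pipelines(data: Any) -> list[dict]:
    """Normalize a list-endpoint payload to a plain list of pipeline dicts.

    Accepts a bare list, or a dict wrapping the list under "items" or
    "pipelines". Anything else yields an empty list instead of raising.
    """
    if isinstance(data, list):
        return data
    if isinstance(data, dict):
        for key in ("items", "pipelines"):
            value = data.get(key)
            if isinstance(value, list):
                return value
    return []
```

Compared with the one-line expression in `list_failed_pipelines`, this version also tolerates a `null` body or an `items` key whose value is not a list, at the cost of silently hiding a malformed response. Whether that trade-off is right depends on how much the caller wants to know about upstream errors.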
- [ ] **Step 2: Import sanity check**: `cd agent-office && PYTHONPATH=.. python -c "from app import service_proxy; print('OK')"` → OK

- [ ] **Step 3: Commit**

```bash
git add agent-office/app/service_proxy.py
git commit -m "feat(agent-office): service_proxy pipeline_retry + list_failed_pipelines

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>"
```
---
## Task 5: youtube_publisher: detect failed pipelines + Telegram alert/button
**Files:** Modify `agent-office/app/agents/youtube_publisher.py`; Test `agent-office/tests/test_youtube_publisher_retry.py` (Create)
- [ ] **Step 1: Write the failing test**
`agent-office/tests/test_youtube_publisher_retry.py` (the DB fixture follows the agent-office conftest conventions):
```python
import pytest
from unittest.mock import AsyncMock

from app.agents.youtube_publisher import YoutubePublisherAgent


@pytest.mark.asyncio
async def test_failed_pipeline_notified_with_retry_button(monkeypatch):
    agent = YoutubePublisherAgent()
    monkeypatch.setattr(
        "app.agents.youtube_publisher.service_proxy.list_active_pipelines",
        AsyncMock(return_value=[
            {"id": 7, "state": "failed", "failed_reason": "video: boom", "track_title": "T"}
        ]),
    )
    sent = AsyncMock(return_value={"ok": True, "message_id": 1})
    monkeypatch.setattr("app.agents.youtube_publisher.send_raw", sent)

    await agent.poll_state_changes()

    assert sent.await_count == 1
    args, kwargs = sent.await_args
    text = kwargs.get("text") or (args[0] if args else "")
    assert "실패" in text
    # callback_data of the inline retry button
    rm = kwargs.get("reply_markup") or {}
    cb = rm["inline_keyboard"][0][0]["callback_data"]
    assert cb == "ytpub_retry_7"

    # Dedupe: polling the same failed pipeline again sends nothing
    await agent.poll_state_changes()
    assert sent.await_count == 1
```
(Note: check in messaging whether `send_raw` supports `reply_markup`. If it does not, this task also includes adding a `reply_markup` argument to messaging.send_raw. The insta flow used send_photo, but this alert is text plus a button, so send_raw needs reply_markup.)
- [ ] **Step 2: Confirm failure** `cd agent-office && PYTHONPATH=.. python -m pytest tests/test_youtube_publisher_retry.py -v` → FAIL (failed state not handled)
- [ ] **Step 3: Implement** Add a failed branch to `poll_state_changes`:
```python
def __init__(self, *args, **kwargs):
    super().__init__(*args, **kwargs)
    self._notified_state_per_pipeline: dict[int, tuple] = {}
    self._notified_failed: set[int] = set()
```
Inside the `poll_state_changes` loop, after the `*_pending` handling:
```python
if state == "failed" and pid not in self._notified_failed:
    await self._notify_failed(p)
    self._notified_failed.add(pid)
if state != "failed":
    self._notified_failed.discard(pid)  # re-notify if it fails again after a resume
```
New method:
```python
async def _notify_failed(self, p: dict) -> None:
    reason = p.get("failed_reason") or "?"
    step = reason.split(":", 1)[0].strip()
    title = p.get("track_title") or f"Pipeline #{p['id']}"
    text = f"⚠️ [{title}] 파이프라인 #{p['id']} '{step}' 실패\n사유: {reason}"
    kb = {"inline_keyboard": [[{"text": "🔄 재시도", "callback_data": f"ytpub_retry_{p['id']}"}]]}
    await send_raw(text=text, reply_markup=kb)
    add_log(self.agent_id, f"pipeline {p['id']} 실패 알림", "warning")
```
Check whether `send_raw` in `agent-office/app/telegram/messaging.py` accepts `reply_markup`, and extend its signature if needed (leave it as is if already supported).
- [ ] **Step 4: Confirm pass** `cd agent-office && PYTHONPATH=.. python -m pytest tests/test_youtube_publisher_retry.py -v` → PASS + full regression
- [ ] **Step 5: Commit**
```bash
git add agent-office/app/agents/youtube_publisher.py agent-office/app/telegram/messaging.py agent-office/tests/test_youtube_publisher_retry.py
git commit -m "feat(agent-office): youtube_publisher 파이프라인 실패 텔레그램 알림+재시도 버튼
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>"
```
---
## Task 6: webhook ytpub_retry dispatch
**Files:** Modify `agent-office/app/telegram/webhook.py`; Test `agent-office/tests/test_youtube_publisher_retry.py`
> **Check first**: the prefix-branch structure of `_handle_callback`, and the pattern existing handlers (`_handle_insta_issue`, etc.) use to call service_proxy and reply.
- [ ] **Step 1: Add the failing test**
```python
@pytest.mark.asyncio
async def test_handle_ytpub_retry_calls_proxy(monkeypatch):
    from app.telegram import webhook
    retry = AsyncMock(return_value={"status_code": 202, "ok": True, "retrying_step": "video"})
    monkeypatch.setattr("app.telegram.webhook.service_proxy.pipeline_retry", retry, raising=False)
    monkeypatch.setattr("app.telegram.webhook.send_raw", AsyncMock(), raising=False)
    res = await webhook._handle_ytpub_retry({"id": 1}, "ytpub_retry_7")
    retry.assert_awaited_once_with(7)
```
(Match the import path and the `send_raw` location to the actual webhook.py.)
- [ ] **Step 2: Confirm failure** → FAIL (`_handle_ytpub_retry` does not exist)
- [ ] **Step 3: Implement** Add a branch to `_handle_callback`:
```python
if callback_id.startswith("ytpub_retry_"):
    return await _handle_ytpub_retry(callback_query, callback_id)
```
Handler:
```python
async def _handle_ytpub_retry(callback_query: dict, callback_id: str) -> dict:
    try:
        pid = int(callback_id.removeprefix("ytpub_retry_"))
    except (ValueError, AttributeError):
        return {"ok": False, "error": "invalid_callback_data"}
    res = await service_proxy.pipeline_retry(pid)
    sc = res.get("status_code")
    if sc in (200, 202):
        await send_raw(text=f"🔄 파이프라인 #{pid} 재개: {res.get('retrying_step','?')}")
    else:
        await send_raw(text=f"⚠️ 재개 불가 (#{pid}): {res.get('detail', sc)}")
    return {"ok": True}
```
(Import `service_proxy`/`send_raw` the same way webhook.py already does.)
- [ ] **Step 4: Confirm pass** + full agent-office regression
- [ ] **Step 5: Commit**
```bash
git add agent-office/app/telegram/webhook.py agent-office/tests/test_youtube_publisher_retry.py
git commit -m "feat(agent-office): ytpub_retry 텔레그램 콜백 → music-lab retry 프록시
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>"
```
---
## Task 7: Docs + deploy + memory
**Files:** Modify `web-backend/CLAUDE.md`, `memory/service_music.md`
- [ ] **Step 1: Add to the music API table in CLAUDE.md**
```
| POST | `/api/music/pipeline/{id}/retry` | Resume a failed pipeline from its failed step (409 if the step is publish and the upload already completed) |
```
- [ ] **Step 2: Full regression**
```bash
cd music-lab && PYTHONPATH=.. python -m pytest tests/ -q
cd ../agent-office && PYTHONPATH=.. python -m pytest tests/ -q
```
Expected: all PASS (excluding pre-existing stale tests).
- [ ] **Step 3: Commit + push (NAS deploy)**
```bash
cd C:/Users/jaeoh/Desktop/workspace/web-backend
git add CLAUDE.md docs/superpowers/plans/2026-06-12-music-pipeline-reliability.md
git commit -m "docs(music): 파이프라인 retry API 문서 + 구현 계획
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>"
git push origin main
```
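- [ ] **Step 4: Update memory** Add reliability/recovery notes to `service_music.md` (automatic retry excluding publish + manual retry endpoint + youtube_publisher failure alert).
- [ ] **Step 5: Production check (lightweight)**: after deploy, `POST /api/music/pipeline/<nonexistent-id>/retry` → 404; if a real failed pipeline exists, verify that retry works. (If none exists, the unit tests stand in.)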
---
## Self-Review
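**Spec coverage:**
- Automatic retry (excluding publish and _resolve_input) → Task 2 ✓
- Manual resume (failed step, publish+video_id guard) → Task 1 (step detection) + Task 3 ✓
- Failure alert + `[🔄재시도]` (retry) button → Task 5 ✓
- Retry callback → Task 4 (proxy) + Task 6 (dispatch) ✓
- Stuck detection excluded (YAGNI) → not in the plan ✓
**Placeholder scan:** Every code step is concrete. "Check the conftest conventions" and "check whether list_active includes failed" are deliberate verification instructions that respect ownership of the existing code (not placeholders).
**Type consistency:** `get_last_failed_step(pid)` matches between Task 1 and Task 3. `_dispatch_step(step,p,ctx,feedback)` matches between the Task 2 definition and the test mock. The `run_step(pid, step)` signature matches the existing one. The callback `ytpub_retry_{pid}` matches between Task 5 (creation) and Task 6 (parsing). `pipeline_retry(pid)` matches between Task 4 and Task 6. The retry response fields `retrying_step`/`status_code` match across Task 3, Task 4, and Task 6.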


@@ -0,0 +1,105 @@
# music/YouTube pipeline reliability and recovery: design
> Written 2026-06-12. Step failures in the YouTube automation pipeline are retried automatically when transient, and permanent failures can be resumed manually from the failed step (Telegram `[🔄재시도]`, the retry button). This is the **reliability/recovery** slice of the "music/YouTube pipeline enhancement" work.
## 1. Goal
When a pipeline step (`cover→video→thumb→meta→review→publish`) fails, ① a transient failure is absorbed by automatic retry, and ② a permanent failure leaves the pipeline in terminal `failed`, from which it can be **resumed at the failed step with earlier artifacts preserved**. Today a single step failure makes the whole pipeline terminal `failed` with no recovery path, so it has to be rebuilt from scratch.
## 2. Background (current behavior)
- `orchestrator.run_step(pipeline_id, step, feedback)`: creates a `pipeline_jobs` row → runs the step → on success calls `update_pipeline_state(next_state)`; on exception sets `pipeline_jobs.status='failed'`, pipeline `state='failed'`, and `failed_reason="{step}: {e}"`. **No retry or resume.**
- Always invoked as a BackgroundTask via `bg.add_task(orchestrator.run_step, pid, step, ...)` (start_pipeline→cover, feedback→next_step, publish_pipeline→publish).
- Artifacts from earlier steps (`cover_url`/`video_url`/`thumbnail_url`/`metadata_json`/`review_json`) are **preserved** on the pipeline row, so rerunning only the failed step is enough to continue.
- `state_machine`: STEPS, `_APPROVE_NEXT`, TERMINAL_STATES={published, cancelled, **failed**, awaiting_manual}.
- `agent-office youtube_publisher.poll_state_changes`: sends a Telegram alert only when a pipeline newly enters `*_pending`. **`failed` is silent**, so the user never learns about the failure.
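## 3. Requirements (confirmed)
- **Automatic retry**: when a step fails, retry with backoff up to `STEP_MAX_RETRIES` times (default 2, so 3 attempts in total). After exhaustion, terminal `failed`.
  - `_resolve_input` errors (input/config) are not retried (a retry would not fix them).
  - **The `publish` step is excluded from automatic retry**: a YouTube upload is not idempotent (risk of duplicate uploads). It gets one attempt and goes terminal immediately on failure.
  - Retry targets = `cover/video/thumb/meta/review`.
- **Manual resume**: rerun a terminal `failed` pipeline from the failed step. Earlier artifacts are preserved.
  - publish resume guard: if `youtube_video_id` is already set, refuse to resume (the original upload may have succeeded, so this prevents a duplicate).
- **Failure alert**: on permanent failure, send a Telegram alert with an inline `[🔄재시도]` button (closes the current silent gap).
- **Out of scope (YAGNI)**: stuck detection (`*_running` hang / `*_pending` left unattended). Manual retry can recover these, so it is excluded from this slice.
## 4. Architecture
Three components: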
```
[music-lab orchestrator]  run_step: run the step in a retry loop (except publish) → failed on exhaustion
[music-lab API]           POST /api/music/pipeline/{id}/retry → re-trigger run_step from the failed step
[agent-office]            youtube_publisher: detect failed → Telegram alert + [🔄재시도]
                          webhook: ytpub_retry_{pid} → service_proxy.pipeline_retry → music-lab retry
```
## 5. music-lab details
### 5.1 Automatic retry (`pipeline/orchestrator.py`)
- Constants: `STEP_MAX_RETRIES = 2`, `STEP_RETRY_BACKOFF_SEC = [5, 15]` (wait between attempts), `NON_RETRY_STEPS = {"publish"}`.
- Turn the step-execution part of `run_step` (currently the try at lines 31-47) into a loop:
```
attempts = 1 if step in NON_RETRY_STEPS else (STEP_MAX_RETRIES + 1)
last = None
for i in range(attempts):
    try:
        result = await _dispatch_step(step, p, ctx, feedback)
        update_pipeline_job(job_id, status="succeeded")
        update_pipeline_state(pipeline_id, result["next_state"], **result.get("fields", {}))
        return
    except Exception as e:
        last = e
        if i < attempts - 1:
            # add_log / pipeline_job note: f"retry {i+1}"
            await asyncio.sleep(STEP_RETRY_BACKOFF_SEC[min(i, len(STEP_RETRY_BACKOFF_SEC) - 1)])
# retries exhausted
update_pipeline_job(job_id, status="failed", error=str(last))
update_pipeline_state(pipeline_id, "failed", failed_reason=f"{step}: {last}")
```
- A `_resolve_input` failure returns early before entering the loop (current behavior kept, no retry).
- Make retry attempts visible: record each attempt in `pipeline_jobs` (or put "attempt n/N" in the error message).
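The attempt count and backoff selection described in 5.1 can be isolated into two small pure functions. This is only a sketch: `attempts_for` and `backoff_for` are hypothetical helper names that are not in the codebase, and the empty-schedule fallback to 0 follows the guard visible in the orchestrator diff.

```python
from typing import FrozenSet, Sequence

# Values taken from the constants listed in section 5.1.
STEP_MAX_RETRIES = 2
STEP_RETRY_BACKOFF_SEC = [5, 15]
NON_RETRY_STEPS: FrozenSet[str] = frozenset({"publish"})


def attempts_for(step: str) -> int:
    """Total attempts for a step: 1 for non-retryable steps, else retries + 1."""
    return 1 if step in NON_RETRY_STEPS else STEP_MAX_RETRIES + 1


def backoff_for(attempt_index: int, schedule: Sequence[float]) -> float:
    """Wait before the next attempt (hypothetical helper).

    The index is clamped to the last schedule entry, and an empty schedule
    falls back to 0 instead of raising IndexError.
    """
    if not schedule:
        return 0
    return schedule[min(attempt_index, len(schedule) - 1)]
```

Keeping these as pure functions makes the retry policy testable without running an event loop or touching the database.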
### 5.2 Resume endpoint (`main.py`)
- `POST /api/music/pipeline/{id}/retry`:
  - Look up the pipeline; 404 if it does not exist.
  - `state != "failed"` → 409 "재개 불가 (state=...)" (cannot resume).
  - Determine the failed step: `db.get_last_failed_step(pipeline_id)` (the step of the most recent status='failed' row in pipeline_jobs). If there is none, fall back to `failed_reason.split(":")[0].strip()`.
  - If the failed step is `publish` and `youtube_video_id` is already set → 409 "이미 업로드됨 (중복 방지)" (already uploaded, duplicate prevention).
  - Re-trigger with `bg.add_task(orchestrator.run_step, pid, failed_step)`. Returns `{ok: true, retrying_step}`.
- New helper `db.get_last_failed_step(pipeline_id) -> str | None`.
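The failed-step resolution above (job row first, then the `failed_reason` prefix) can be sketched as one pure function. `resolve_failed_step` is a hypothetical name, the `STEPS` tuple is assumed from the step order given in section 1, and the membership check mirrors the validation in the retry endpoint diff.

```python
from typing import Optional, Sequence

# Assumed from the step order in section 1; the real list lives in state_machine.
STEPS = ("cover", "video", "thumb", "meta", "review", "publish")


def resolve_failed_step(
    job_step: Optional[str],
    failed_reason: Optional[str],
    steps: Sequence[str] = STEPS,
) -> Optional[str]:
    """Pick the step to resume from, or None if it cannot be determined.

    job_step is what a lookup such as get_last_failed_step would return.
    When it is missing, the prefix of failed_reason ("{step}: {error}") is used.
    """
    candidate = job_step or (failed_reason or "").split(":", 1)[0].strip()
    # Reject prefixes that are not real steps, e.g. "ValueError: ...".
    return candidate if candidate in steps else None
```

A caller would map a `None` result to a 409 response rather than guessing a step.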
## 6. agent-office details
### 6.1 Failure alert (`agents/youtube_publisher.py`)
- `poll_state_changes`: after handling `_STEP_TITLES` (*_pending), also check pipelines with `state == "failed"`.
- For a newly failed pipeline (dedupe via `self._notified_failed: set[int]`, or ('failed', reason_hash) in the existing dict), send a Telegram message:
`⚠️ [{track_title}] 파이프라인 #{id} '{step}' 실패\n사유: {failed_reason}` + inline `[🔄 재시도]` (callback_data `ytpub_retry_{id}`).
- Record the pipeline as notified after sending.
- Check whether `service_proxy.list_active_pipelines()` includes failed pipelines. If it does not, extend it to return failed ones as well (or query them separately). (Verified in the plan.)
### 6.2 Retry callback (`telegram/webhook.py`)
- Add a `callback_id.startswith("ytpub_retry_")` branch to `_handle_callback` → `_handle_ytpub_retry`.
- `_handle_ytpub_retry`: `pid = int(callback_id.removeprefix("ytpub_retry_"))` → `service_proxy.pipeline_retry(pid)` → reply on Telegram with the result ("재개: {step}" on resume, or the rejection reason).
- New `service_proxy.pipeline_retry(pid)`: `POST {MUSIC_LAB_URL}/api/music/pipeline/{pid}/retry`.
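Parsing the callback data in 6.2 is the one place where untrusted input becomes a pipeline id, so a small standalone sketch may help. `parse_retry_callback` is a hypothetical helper and not part of webhook.py; it uses slicing rather than `str.removeprefix` so it also runs on Python versions before 3.9.

```python
from typing import Optional

RETRY_PREFIX = "ytpub_retry_"


def parse_retry_callback(callback_data: str) -> Optional[int]:
    """Extract the pipeline id from 'ytpub_retry_{pid}', or None if malformed."""
    if not callback_data.startswith(RETRY_PREFIX):
        return None
    try:
        return int(callback_data[len(RETRY_PREFIX):])
    except ValueError:
        # Covers an empty or non-numeric suffix such as "ytpub_retry_abc".
        return None
```

Returning `None` instead of raising lets the handler answer with an "invalid callback data" result without a try/except at the call site.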
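## 7. Error handling / edge cases
- Container restart during retry backoff → the step's work is lost and the pipeline is stuck in a non-terminal state. Out of scope, but recoverable with a manual `[🔄재시도]` (safety net).
- Resume with state≠failed → 409 (prevents duplicate resume and races). Repeated taps on the Telegram `[🔄재시도]` button are rejected idempotently as well.
- No failed row in pipeline_jobs while state is failed → fall back to the `failed_reason` prefix.
- publish resume + `youtube_video_id` present → 409 (prevents a duplicate upload).
- Duplicate alerts: the notified record ensures a given failure is sent only once.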
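The alert dedupe rule (send once per failure, send again if the pipeline fails after a resume) can be sketched as a tiny state holder. `FailedNotifier` and `should_notify` are hypothetical names used only for illustration; the plan keeps the same set directly on the agent as `_notified_failed`.

```python
from typing import Set


class FailedNotifier:
    """Tracks which failed pipelines have already triggered an alert."""

    def __init__(self) -> None:
        self._notified: Set[int] = set()

    def should_notify(self, pid: int, state: str) -> bool:
        """Return True only the first time a pipeline is seen in 'failed'."""
        if state != "failed":
            # Leaving 'failed' (e.g. after a resume) re-arms the alert.
            self._notified.discard(pid)
            return False
        if pid in self._notified:
            return False
        self._notified.add(pid)
        return True
```

Because the set lives in memory, a process restart would presumably re-send alerts for pipelines that are still failed; whether that is acceptable is a design choice the document does not settle.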
## 8. Tests
- **orchestrator (retry)**: a step fails twice and then succeeds → reaches next_state (3 attempts). Fails every time → failed. publish goes to failed after 1 attempt (no retry). `_resolve_input` failure → failed without retry.
- **API retry**: failed → run_step re-triggered (verified with a mock) + retrying_step returned. Non-failed → 409. publish+youtube_video_id → 409.
- **db**: `get_last_failed_step` returns the step of the latest failed job, or None if there is none.
- **agent-office**: poll sends a Telegram message for a newly failed pipeline (with dedupe). `_handle_ytpub_retry` calls service_proxy.pipeline_retry and parses the pid.
## 9. Affected files
- music-lab: `app/pipeline/orchestrator.py` (retry loop + `_dispatch_step` extraction), `app/main.py` (retry endpoint), `app/db.py` (`get_last_failed_step`), `tests/`.
- agent-office: `app/agents/youtube_publisher.py` (failed alert), `app/telegram/webhook.py` (ytpub_retry dispatch), `app/service_proxy.py` (`pipeline_retry`, plus failed pipelines in `list_active_pipelines` if needed), `tests/`.
- web-backend/CLAUDE.md music API table + `service_music.md` memory update.


@@ -1135,6 +1135,21 @@ def list_pipelines(active_only: bool = False) -> List[Dict[str, Any]]:
     return [_parse_pipeline_row(r) for r in rows]


+def list_pipelines_by_state(state: str) -> List[Dict[str, Any]]:
+    """Return only pipelines in the given state (e.g. 'failed')."""
+    sql = """
+        SELECT vp.*, ml.title AS track_title, cj.title AS compile_title
+        FROM video_pipelines vp
+        LEFT JOIN music_library ml ON ml.id = vp.track_id
+        LEFT JOIN compile_jobs cj ON cj.id = vp.compile_job_id
+        WHERE vp.state = ?
+        ORDER BY vp.created_at DESC
+    """
+    with _conn() as conn:
+        rows = conn.execute(sql, (state,)).fetchall()
+    return [_parse_pipeline_row(r) for r in rows]
+
+
 def increment_feedback_count(pid: int, step: str) -> int:
     """Atomically increment feedback_count_per_step.<step> by 1 and return the new value.
@@ -1220,6 +1235,18 @@ def list_pipeline_jobs(pid: int) -> List[Dict[str, Any]]:
     return [dict(r) for r in rows]


+def get_last_failed_step(pid: int) -> Optional[str]:
+    """Step of the pipeline's most recent status='failed' pipeline_job, or None."""
+    with _conn() as conn:
+        row = conn.execute(
+            "SELECT step FROM pipeline_jobs "
+            "WHERE pipeline_id = ? AND status = 'failed' "
+            "ORDER BY id DESC LIMIT 1",
+            (pid,),
+        ).fetchone()
+    return row["step"] if row else None
+
+
 def get_youtube_setup() -> Dict[str, Any]:
     """Return the default single row of youtube_setup; seed it and re-query if missing."""
     with _conn() as conn:


@@ -1030,7 +1030,12 @@ def create_pipeline(req: PipelineCreate):
 @app.get("/api/music/pipeline")
 def list_pipelines_endpoint(status: str = "all"):
-    pipelines = _db_module.list_pipelines(active_only=(status == "active"))
+    if status == "active":
+        pipelines = _db_module.list_pipelines(active_only=True)
+    elif status == "failed":
+        pipelines = _db_module.list_pipelines_by_state("failed")
+    else:
+        pipelines = _db_module.list_pipelines(active_only=False)
     return {"pipelines": pipelines}
@@ -1128,6 +1133,31 @@ def cancel_pipeline(pid: int):
     return {"ok": True}


+@app.post("/api/music/pipeline/{pid}/retry", status_code=202)
+async def retry_pipeline(pid: int, bg: BackgroundTasks):
+    from .pipeline.state_machine import STEPS
+    p = _db_module.get_pipeline(pid)
+    if not p:
+        raise HTTPException(404)
+    if p["state"] != "failed":
+        raise HTTPException(409, f"재개 불가 (state={p['state']})")
+    failed_step = _db_module.get_last_failed_step(pid)
+    if not failed_step:
+        reason = p.get("failed_reason") or ""
+        failed_step = reason.split(":", 1)[0].strip() or None
+    if not failed_step:
+        raise HTTPException(409, "실패 step을 판별할 수 없음")
+    # Fix 3: 409 if failed_step is not one of the known STEPS
+    if failed_step not in STEPS:
+        raise HTTPException(409, "실패 step 판별 불가")
+    if failed_step == "publish" and p.get("youtube_video_id"):
+        raise HTTPException(409, "이미 업로드됨 (중복 방지)")
+    # Fix 1: move the state to 'retrying' right before bg.add_task so a concurrent retry gets 409
+    _db_module.update_pipeline_state(pid, "retrying")
+    bg.add_task(orchestrator.run_step, pid, failed_step)
+    return {"ok": True, "retrying_step": failed_step}
+
+
 @app.post("/api/music/pipeline/{pid}/publish", status_code=202)
 async def publish_pipeline(pid: int, bg: BackgroundTasks):
     p = _db_module.get_pipeline(pid)


@@ -11,6 +11,10 @@ from .gradient import make_gradient_with_title
 logger = logging.getLogger("music-lab.orchestrator")

+STEP_MAX_RETRIES = 2  # additional retries (total attempts = this + 1)
+STEP_RETRY_BACKOFF_SEC = [5, 15]
+NON_RETRY_STEPS = {"publish"}
+

 async def run_step(pipeline_id: int, step: str, feedback: str = "") -> None:
     """Run the step, write the result to the DB, then transition to *_pending or the next step.
@@ -28,27 +32,35 @@ async def run_step(pipeline_id: int, step: str, feedback: str = "") -> None:
         db.update_pipeline_state(pipeline_id, "failed", failed_reason=f"{step}: {e}")
         return

-    try:
-        if step == "cover":
-            result = await _run_cover(p, ctx, feedback)
-        elif step == "video":
-            result = await _run_video(p, ctx)
-        elif step == "thumb":
-            result = await _run_thumb(p, ctx, feedback)
-        elif step == "meta":
-            result = await _run_meta(p, ctx, feedback)
-        elif step == "review":
-            result = await _run_review(p, ctx)
-        elif step == "publish":
-            result = await _run_publish(p, ctx)
-        else:
-            raise ValueError(f"unknown step: {step}")
-        db.update_pipeline_job(job_id, status="succeeded")
-        db.update_pipeline_state(pipeline_id, result["next_state"], **result.get("fields", {}))
-    except Exception as e:
-        logger.exception("step %s failed for pipeline %s", step, pipeline_id)
-        db.update_pipeline_job(job_id, status="failed", error=str(e))
-        db.update_pipeline_state(pipeline_id, "failed", failed_reason=f"{step}: {e}")
+    attempts = 1 if step in NON_RETRY_STEPS else (STEP_MAX_RETRIES + 1)
+    last_err = None
+    for i in range(attempts):
+        try:
+            result = await _dispatch_step(step, p, ctx, feedback)
+            db.update_pipeline_job(job_id, status="succeeded")
+            db.update_pipeline_state(pipeline_id, result["next_state"], **result.get("fields", {}))
+            return
+        except Exception as e:
+            last_err = e
+            logger.exception(
+                "step %s 실패 (pipeline %s, attempt %d/%d)", step, pipeline_id, i + 1, attempts
+            )
+            if i < attempts - 1:
+                backoff = STEP_RETRY_BACKOFF_SEC[min(i, len(STEP_RETRY_BACKOFF_SEC) - 1)] if STEP_RETRY_BACKOFF_SEC else 0
+                await asyncio.sleep(backoff)
+    db.update_pipeline_job(job_id, status="failed", error=str(last_err))
+    db.update_pipeline_state(pipeline_id, "failed", failed_reason=f"{step}: {last_err}")
+
+
+async def _dispatch_step(step: str, p: dict, ctx: dict, feedback: str) -> dict:
+    """Dispatch to the run function for the given step name."""
+    if step == "cover": return await _run_cover(p, ctx, feedback)
+    if step == "video": return await _run_video(p, ctx)
+    if step == "thumb": return await _run_thumb(p, ctx, feedback)
+    if step == "meta": return await _run_meta(p, ctx, feedback)
+    if step == "review": return await _run_review(p, ctx)
+    if step == "publish": return await _run_publish(p, ctx)
+    raise ValueError(f"unknown step: {step}")


 def _resolve_input(p: dict) -> dict:


@@ -1,4 +1,4 @@
 [pytest]
 testpaths = tests
-pythonpath = .
+pythonpath = . ..
 asyncio_mode = auto


@@ -52,6 +52,19 @@ def test_list_pipelines_active_filter(client):
     assert all(p["state"] != "published" for p in r.json()["pipelines"])


+def test_list_pipelines_failed_filter(client):
+    """The status=failed filter returns only pipelines with state='failed'."""
+    # Create a failed pipeline
+    pid_f = client.post("/api/music/pipeline", json={"track_id": 1}).json()["id"]
+    db.update_pipeline_state(pid_f, "failed", failed_reason="cover: oops")
+    r = client.get("/api/music/pipeline?status=failed")
+    assert r.status_code == 200
+    pipelines = r.json()["pipelines"]
+    assert len(pipelines) == 1
+    assert pipelines[0]["state"] == "failed"
+    assert pipelines[0]["id"] == pid_f
+
+
 def test_feedback_reject_records_feedback_and_increments_count(client):
     pid = client.post("/api/music/pipeline", json={"track_id": 1}).json()["id"]
     db.update_pipeline_state(pid, "cover_pending")


@@ -0,0 +1,174 @@
import pytest
from fastapi.testclient import TestClient

from app import db
from app.pipeline import orchestrator


@pytest.fixture
def fresh_db(monkeypatch, tmp_path):
    db_path = tmp_path / "music.db"
    monkeypatch.setattr(db, "DB_PATH", str(db_path))
    db.init_db()
    return db_path


@pytest.fixture(autouse=True)
def _no_backoff(monkeypatch):
    monkeypatch.setattr(orchestrator, "STEP_RETRY_BACKOFF_SEC", [0, 0])


def test_get_last_failed_step_returns_step(fresh_db):
    pid = db.create_pipeline(track_id=1)
    job_id = db.create_pipeline_job(pid, "video")
    db.update_pipeline_job(job_id, status="failed", error="boom")
    db.update_pipeline_state(pid, "failed", failed_reason="video: boom")
    assert db.get_last_failed_step(pid) == "video"


def test_get_last_failed_step_none_when_no_failure(fresh_db):
    pid = db.create_pipeline(track_id=1)
    db.create_pipeline_job(pid, "cover")
    assert db.get_last_failed_step(pid) is None


async def test_retryable_step_retries_then_succeeds(fresh_db, monkeypatch):
    pid = db.create_pipeline(track_id=1)
    calls = {"n": 0}

    async def flaky(step, p, ctx, feedback):
        calls["n"] += 1
        if calls["n"] < 3:
            raise RuntimeError("transient")
        return {"next_state": "video_pending", "fields": {}}

    monkeypatch.setattr(orchestrator, "_dispatch_step", flaky)
    monkeypatch.setattr(
        orchestrator, "_resolve_input",
        lambda p: {"genre": "x", "title": "t", "moods": [], "tracks": [], "audio_path": "", "duration_sec": 0},
    )
    await orchestrator.run_step(pid, "cover")
    assert calls["n"] == 3
    assert db.get_pipeline(pid)["state"] == "video_pending"


async def test_retryable_step_exhausts_to_failed(fresh_db, monkeypatch):
    pid = db.create_pipeline(track_id=1)

    async def always_fail(step, p, ctx, feedback):
        raise RuntimeError("permanent")

    monkeypatch.setattr(orchestrator, "_dispatch_step", always_fail)
    monkeypatch.setattr(
        orchestrator, "_resolve_input",
        lambda p: {"genre": "x", "title": "t", "moods": [], "tracks": [], "audio_path": "", "duration_sec": 0},
    )
    await orchestrator.run_step(pid, "cover")
    assert db.get_pipeline(pid)["state"] == "failed"


async def test_publish_not_retried(fresh_db, monkeypatch):
    pid = db.create_pipeline(track_id=1)
    calls = {"n": 0}

    async def fail_publish(step, p, ctx, feedback):
        calls["n"] += 1
        raise RuntimeError("upload error")

    monkeypatch.setattr(orchestrator, "_dispatch_step", fail_publish)
    monkeypatch.setattr(
        orchestrator, "_resolve_input",
        lambda p: {"genre": "x", "title": "t", "moods": [], "tracks": [], "audio_path": "", "duration_sec": 0},
    )
    await orchestrator.run_step(pid, "publish")
    assert calls["n"] == 1
    assert db.get_pipeline(pid)["state"] == "failed"


# ── Task 3: retry endpoint tests ─────────────────────────────────────────────
@pytest.fixture
def client(fresh_db):
    from app.main import app
    return TestClient(app)


def test_retry_failed_pipeline_retriggers(fresh_db, client, monkeypatch):
    pid = db.create_pipeline(track_id=1)
    job = db.create_pipeline_job(pid, "video")
    db.update_pipeline_job(job, status="failed", error="boom")
    db.update_pipeline_state(pid, "failed", failed_reason="video: boom")
    called = {}

    async def fake_run(p, step, *a):
        called["pid"], called["step"] = p, step

    monkeypatch.setattr(orchestrator, "run_step", fake_run)
    r = client.post(f"/api/music/pipeline/{pid}/retry")
    assert r.status_code in (200, 202)
    assert r.json()["retrying_step"] == "video"


def test_retry_non_failed_409(fresh_db, client):
    pid = db.create_pipeline(track_id=1)  # state='created'
    r = client.post(f"/api/music/pipeline/{pid}/retry")
    assert r.status_code == 409


def test_retry_publish_with_video_id_rejected(fresh_db, client):
    pid = db.create_pipeline(track_id=1)
    job = db.create_pipeline_job(pid, "publish")
    db.update_pipeline_job(job, status="failed", error="x")
    db.update_pipeline_state(pid, "failed", failed_reason="publish: x",
                             youtube_video_id="abc123")
    r = client.post(f"/api/music/pipeline/{pid}/retry")
    assert r.status_code == 409


# ── Fix 2: verify fake_run arguments ─────────────────────────────────────────
def test_retry_failed_pipeline_retriggers_with_correct_args(fresh_db, client, monkeypatch):
    """Verify that fake_run is called with (pid, failed_step)."""
    pid = db.create_pipeline(track_id=1)
    job = db.create_pipeline_job(pid, "video")
    db.update_pipeline_job(job, status="failed", error="boom")
    db.update_pipeline_state(pid, "failed", failed_reason="video: boom")
    called = {}

    async def fake_run(p, step, *a):
        called["pid"], called["step"] = p, step

    monkeypatch.setattr(orchestrator, "run_step", fake_run)
    r = client.post(f"/api/music/pipeline/{pid}/retry")
    assert r.status_code in (200, 202)
    assert called["pid"] == pid
    assert called["step"] == "video"


# ── Fix 1: 'retrying' transition makes a duplicate retry return 409 ──────────
def test_retry_twice_second_is_409(fresh_db, client, monkeypatch):
    """The first retry moves the state to 'retrying', so the second retry gets 409."""
    pid = db.create_pipeline(track_id=1)
    job = db.create_pipeline_job(pid, "video")
    db.update_pipeline_job(job, status="failed", error="boom")
    db.update_pipeline_state(pid, "failed", failed_reason="video: boom")

    async def fake_run(p, step, *a):
        pass

    monkeypatch.setattr(orchestrator, "run_step", fake_run)
    r1 = client.post(f"/api/music/pipeline/{pid}/retry")
    assert r1.status_code in (200, 202)
    r2 = client.post(f"/api/music/pipeline/{pid}/retry")  # already retrying → 409
    assert r2.status_code == 409


# ── Fix 3: unknown step prefix → 409 ─────────────────────────────────────────
def test_retry_unparseable_failed_reason_409(fresh_db, client):
    """409 when the failed_reason prefix is not one of the known STEPS."""
    pid = db.create_pipeline(track_id=1)
    # No failed job row: only state is failed, with a non-step prefix in the reason
    db.update_pipeline_state(pid, "failed", failed_reason="ValueError: track 1 없음")
    r = client.post(f"/api/music/pipeline/{pid}/retry")
    assert r.status_code == 409