Compare commits
11 Commits
c8ce6cb617
...
796ac6d39f
| Author | SHA1 | Date | |
|---|---|---|---|
| 796ac6d39f | |||
| 18cea427be | |||
| 6c178006d3 | |||
| 084e4f1b4d | |||
| d048251a97 | |||
| ef1a7a92fd | |||
| 44dbe7c426 | |||
| e90e25d78f | |||
| d638666659 | |||
| 51eff1538e | |||
| ffb96de61d |
@@ -265,7 +265,7 @@ docker compose up -d
|
|||||||
| POST/GET | `/api/music/generate-batch` | 배치 생성 |
|
| POST/GET | `/api/music/generate-batch` | 배치 생성 |
|
||||||
| POST/GET | `/api/music/compile` (+ `/compiles/{id}/export`) | 컴파일 |
|
| POST/GET | `/api/music/compile` (+ `/compiles/{id}/export`) | 컴파일 |
|
||||||
| POST/GET/DELETE | `/api/music/video-project` (+ `/{id}/render`, `/export`) | 영상 프로젝트 |
|
| POST/GET/DELETE | `/api/music/video-project` (+ `/{id}/render`, `/export`) | 영상 프로젝트 |
|
||||||
| ALL | `/api/music/pipeline` (생성/start/feedback/cancel/publish/telegram-msg/lookup) | YouTube 자동화 파이프라인 |
|
| ALL | `/api/music/pipeline` (생성/start/feedback/cancel/publish/retry/telegram-msg/lookup) | YouTube 자동화 파이프라인. `POST /{id}/retry`=실패 step 재개(publish+업로드완료 시 409) |
|
||||||
| GET/PUT | `/api/music/setup` | 파이프라인 설정 |
|
| GET/PUT | `/api/music/setup` | 파이프라인 설정 |
|
||||||
| GET | `/api/music/youtube/auth-url`, `/callback`, `/status`; POST `/disconnect` | YouTube OAuth |
|
| GET | `/api/music/youtube/auth-url`, `/callback`, `/status`; POST `/disconnect` | YouTube OAuth |
|
||||||
| GET/POST/PUT/DELETE | `/api/music/revenue` (+ `/dashboard`) | 수익 기록 |
|
| GET/POST/PUT/DELETE | `/api/music/revenue` (+ `/dashboard`) | 수익 기록 |
|
||||||
|
|||||||
@@ -26,6 +26,7 @@ class YoutubePublisherAgent(BaseAgent):
|
|||||||
def __init__(self, *args, **kwargs):
|
def __init__(self, *args, **kwargs):
|
||||||
super().__init__(*args, **kwargs)
|
super().__init__(*args, **kwargs)
|
||||||
self._notified_state_per_pipeline: dict[int, tuple] = {}
|
self._notified_state_per_pipeline: dict[int, tuple] = {}
|
||||||
|
self._notified_failed: set[int] = set()
|
||||||
|
|
||||||
async def poll_state_changes(self) -> None:
|
async def poll_state_changes(self) -> None:
|
||||||
"""주기적으로 호출되어 *_pending 신규 진입 시 텔레그램 발송."""
|
"""주기적으로 호출되어 *_pending 신규 진입 시 텔레그램 발송."""
|
||||||
@@ -48,6 +49,32 @@ class YoutubePublisherAgent(BaseAgent):
|
|||||||
await self._notify_step(p)
|
await self._notify_step(p)
|
||||||
self._notified_state_per_pipeline[pid] = key
|
self._notified_state_per_pipeline[pid] = key
|
||||||
|
|
||||||
|
try:
|
||||||
|
failed = await service_proxy.list_failed_pipelines()
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning("failed 폴링 실패: %s", e)
|
||||||
|
failed = []
|
||||||
|
for p in failed:
|
||||||
|
pid = p.get("id")
|
||||||
|
if pid is None:
|
||||||
|
continue
|
||||||
|
if pid not in self._notified_failed:
|
||||||
|
await self._notify_failed(p)
|
||||||
|
self._notified_failed.add(pid)
|
||||||
|
# 재개되어 failed에서 벗어난 파이프라인은 재알림 가능하도록 해제
|
||||||
|
failed_ids = {p.get("id") for p in failed}
|
||||||
|
self._notified_failed &= failed_ids
|
||||||
|
|
||||||
|
async def _notify_failed(self, p: dict) -> None:
|
||||||
|
reason = p.get("failed_reason") or "?"
|
||||||
|
step = reason.split(":", 1)[0].strip()
|
||||||
|
title = p.get("track_title") or f"Pipeline #{p['id']}"
|
||||||
|
text = f"⚠️ [{title}] 파이프라인 #{p['id']} '{step}' 실패\n사유: {reason}"
|
||||||
|
kb = {"inline_keyboard": [[{"text": "🔄 재시도", "callback_data": f"ytpub_retry_{p['id']}"}]]}
|
||||||
|
sent = await send_raw(text=text, reply_markup=kb)
|
||||||
|
if sent.get("ok"):
|
||||||
|
add_log(self.agent_id, f"pipeline {p['id']} 실패 알림", "warning")
|
||||||
|
|
||||||
async def _notify_step(self, pipeline: dict) -> None:
|
async def _notify_step(self, pipeline: dict) -> None:
|
||||||
state = pipeline["state"]
|
state = pipeline["state"]
|
||||||
title_name, step = _STEP_TITLES[state]
|
title_name, step = _STEP_TITLES[state]
|
||||||
|
|||||||
@@ -352,6 +352,25 @@ async def list_active_pipelines() -> list[dict]:
|
|||||||
return resp.json().get("pipelines", [])
|
return resp.json().get("pipelines", [])
|
||||||
|
|
||||||
|
|
||||||
|
async def list_failed_pipelines() -> list[dict]:
|
||||||
|
async with httpx.AsyncClient(timeout=10) as client:
|
||||||
|
resp = await client.get(f"{MUSIC_LAB_URL}/api/music/pipeline?status=failed")
|
||||||
|
resp.raise_for_status()
|
||||||
|
data = resp.json()
|
||||||
|
return data if isinstance(data, list) else data.get("items", data.get("pipelines", []))
|
||||||
|
|
||||||
|
|
||||||
|
async def pipeline_retry(pid: int) -> dict:
|
||||||
|
async with httpx.AsyncClient(timeout=15) as client:
|
||||||
|
resp = await client.post(f"{MUSIC_LAB_URL}/api/music/pipeline/{pid}/retry")
|
||||||
|
out = {"status_code": resp.status_code}
|
||||||
|
try:
|
||||||
|
out.update(resp.json())
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
return out
|
||||||
|
|
||||||
|
|
||||||
async def get_pipeline(pid: int) -> dict:
|
async def get_pipeline(pid: int) -> dict:
|
||||||
async with httpx.AsyncClient(timeout=15) as client:
|
async with httpx.AsyncClient(timeout=15) as client:
|
||||||
resp = await client.get(f"{MUSIC_LAB_URL}/api/music/pipeline/{pid}")
|
resp = await client.get(f"{MUSIC_LAB_URL}/api/music/pipeline/{pid}")
|
||||||
|
|||||||
@@ -43,6 +43,9 @@ async def _handle_callback(callback_query: dict) -> Optional[dict]:
|
|||||||
if callback_id.startswith("issue_"):
|
if callback_id.startswith("issue_"):
|
||||||
return await _handle_insta_issue(callback_query, callback_id)
|
return await _handle_insta_issue(callback_query, callback_id)
|
||||||
|
|
||||||
|
if callback_id.startswith("ytpub_retry_"):
|
||||||
|
return await _handle_ytpub_retry(callback_query, callback_id)
|
||||||
|
|
||||||
cb = get_telegram_callback(callback_id)
|
cb = get_telegram_callback(callback_id)
|
||||||
if not cb:
|
if not cb:
|
||||||
return None
|
return None
|
||||||
@@ -169,6 +172,30 @@ async def _handle_insta_issue(callback_query: dict, callback_id: str) -> dict:
|
|||||||
return {"ok": False, "error": str(e)}
|
return {"ok": False, "error": str(e)}
|
||||||
|
|
||||||
|
|
||||||
|
async def _handle_ytpub_retry(callback_query: dict, callback_id: str) -> dict:
|
||||||
|
"""ytpub_retry_{pipeline_id} 콜백 → music-lab pipeline retry 프록시."""
|
||||||
|
from .. import service_proxy
|
||||||
|
from .messaging import send_raw
|
||||||
|
|
||||||
|
await api_call(
|
||||||
|
"answerCallbackQuery",
|
||||||
|
{"callback_query_id": callback_query["id"], "text": "재시도 요청 중..."},
|
||||||
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
|
pid = int(callback_id.removeprefix("ytpub_retry_"))
|
||||||
|
except (ValueError, AttributeError):
|
||||||
|
return {"ok": False, "error": "invalid_callback_data"}
|
||||||
|
|
||||||
|
res = await service_proxy.pipeline_retry(pid)
|
||||||
|
sc = res.get("status_code")
|
||||||
|
if sc in (200, 202):
|
||||||
|
await send_raw(text=f"🔄 파이프라인 #{pid} 재개: {res.get('retrying_step', '?')}")
|
||||||
|
else:
|
||||||
|
await send_raw(text=f"⚠️ 재개 불가 (#{pid}): {res.get('detail', sc)}")
|
||||||
|
return {"ok": True}
|
||||||
|
|
||||||
|
|
||||||
async def _handle_message(message: dict, agent_dispatcher) -> Optional[dict]:
|
async def _handle_message(message: dict, agent_dispatcher) -> Optional[dict]:
|
||||||
"""슬래시 명령 메시지 처리."""
|
"""슬래시 명령 메시지 처리."""
|
||||||
from .router import parse_command, resolve_agent_command, HELP_TEXT
|
from .router import parse_command, resolve_agent_command, HELP_TEXT
|
||||||
|
|||||||
@@ -18,9 +18,11 @@ from app.db import (
|
|||||||
def test_init_and_seed():
|
def test_init_and_seed():
|
||||||
init_db()
|
init_db()
|
||||||
agents = get_all_agents()
|
agents = get_all_agents()
|
||||||
assert len(agents) == 2, f"Expected 2 agents, got {len(agents)}"
|
|
||||||
ids = {a["agent_id"] for a in agents}
|
ids = {a["agent_id"] for a in agents}
|
||||||
assert ids == {"stock", "music"}, f"Unexpected agent ids: {ids}"
|
# 시드된 핵심 에이전트 존재 검증 — 레지스트리 확장(insta/lotto/realestate/youtube 등)에 견고하도록
|
||||||
|
# 고정 개수/집합이 아닌 subset으로 단언 (이전 len==2/{stock,music} 고정 단언은 stale였음).
|
||||||
|
assert {"stock", "music"} <= ids, f"core agents missing: {ids}"
|
||||||
|
assert len(agents) >= 2
|
||||||
print(" [PASS] test_init_and_seed")
|
print(" [PASS] test_init_and_seed")
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -40,6 +40,9 @@ async def test_poll_notifies_once_per_state():
|
|||||||
with patch(
|
with patch(
|
||||||
"app.agents.youtube_publisher.service_proxy.list_active_pipelines",
|
"app.agents.youtube_publisher.service_proxy.list_active_pipelines",
|
||||||
new=AsyncMock(return_value=pipelines),
|
new=AsyncMock(return_value=pipelines),
|
||||||
|
), patch(
|
||||||
|
"app.agents.youtube_publisher.service_proxy.list_failed_pipelines",
|
||||||
|
new=AsyncMock(return_value=[]),
|
||||||
), patch(
|
), patch(
|
||||||
"app.agents.youtube_publisher.send_raw",
|
"app.agents.youtube_publisher.send_raw",
|
||||||
new=AsyncMock(return_value={"ok": True, "message_id": 99}),
|
new=AsyncMock(return_value={"ok": True, "message_id": 99}),
|
||||||
@@ -63,6 +66,8 @@ async def test_poll_renotifies_on_reject_regen(monkeypatch):
|
|||||||
"track_title": "Test", "feedback_count_per_step": {"cover": 1}}]
|
"track_title": "Test", "feedback_count_per_step": {"cover": 1}}]
|
||||||
list_mock = AsyncMock(side_effect=[pipelines_v1, pipelines_v2])
|
list_mock = AsyncMock(side_effect=[pipelines_v1, pipelines_v2])
|
||||||
with patch("app.agents.youtube_publisher.service_proxy.list_active_pipelines", list_mock), \
|
with patch("app.agents.youtube_publisher.service_proxy.list_active_pipelines", list_mock), \
|
||||||
|
patch("app.agents.youtube_publisher.service_proxy.list_failed_pipelines",
|
||||||
|
new=AsyncMock(return_value=[])), \
|
||||||
patch("app.agents.youtube_publisher.send_raw",
|
patch("app.agents.youtube_publisher.send_raw",
|
||||||
new=AsyncMock(return_value={"ok": True, "message_id": 99})), \
|
new=AsyncMock(return_value={"ok": True, "message_id": 99})), \
|
||||||
patch("app.agents.youtube_publisher.service_proxy.save_pipeline_telegram_msg",
|
patch("app.agents.youtube_publisher.service_proxy.save_pipeline_telegram_msg",
|
||||||
@@ -83,7 +88,7 @@ async def test_on_telegram_reply_approve_calls_feedback():
|
|||||||
new=AsyncMock(),
|
new=AsyncMock(),
|
||||||
) as mock_fb, patch(
|
) as mock_fb, patch(
|
||||||
"app.agents.youtube_publisher.send_raw",
|
"app.agents.youtube_publisher.send_raw",
|
||||||
new=AsyncMock(),
|
new=AsyncMock(return_value={"ok": True, "message_id": 1}),
|
||||||
):
|
):
|
||||||
a = YoutubePublisherAgent()
|
a = YoutubePublisherAgent()
|
||||||
await a.on_telegram_reply(pipeline_id=42, step="cover", user_text="승인")
|
await a.on_telegram_reply(pipeline_id=42, step="cover", user_text="승인")
|
||||||
@@ -99,7 +104,7 @@ async def test_on_telegram_reply_reject_with_feedback():
|
|||||||
new=AsyncMock(),
|
new=AsyncMock(),
|
||||||
) as mock_fb, patch(
|
) as mock_fb, patch(
|
||||||
"app.agents.youtube_publisher.send_raw",
|
"app.agents.youtube_publisher.send_raw",
|
||||||
new=AsyncMock(),
|
new=AsyncMock(return_value={"ok": True, "message_id": 1}),
|
||||||
):
|
):
|
||||||
a = YoutubePublisherAgent()
|
a = YoutubePublisherAgent()
|
||||||
await a.on_telegram_reply(pipeline_id=43, step="meta", user_text="반려, 제목 짧게")
|
await a.on_telegram_reply(pipeline_id=43, step="meta", user_text="반려, 제목 짧게")
|
||||||
|
|||||||
213
agent-office/tests/test_youtube_publisher_retry.py
Normal file
213
agent-office/tests/test_youtube_publisher_retry.py
Normal file
@@ -0,0 +1,213 @@
|
|||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import tempfile
|
||||||
|
|
||||||
|
_fd, _TMP = tempfile.mkstemp(suffix=".db")
|
||||||
|
os.close(_fd)
|
||||||
|
os.unlink(_TMP)
|
||||||
|
os.environ["AGENT_OFFICE_DB_PATH"] = _TMP
|
||||||
|
|
||||||
|
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from unittest.mock import AsyncMock, patch
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True)
|
||||||
|
def _init_db():
|
||||||
|
import gc
|
||||||
|
gc.collect()
|
||||||
|
if os.path.exists(_TMP):
|
||||||
|
os.remove(_TMP)
|
||||||
|
from app.db import init_db
|
||||||
|
init_db()
|
||||||
|
yield
|
||||||
|
gc.collect()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_failed_pipeline_notified_with_retry_button():
|
||||||
|
from app.agents.youtube_publisher import YoutubePublisherAgent
|
||||||
|
|
||||||
|
agent = YoutubePublisherAgent()
|
||||||
|
failed_pipeline = {
|
||||||
|
"id": 7,
|
||||||
|
"state": "failed",
|
||||||
|
"failed_reason": "video: boom",
|
||||||
|
"track_title": "T",
|
||||||
|
}
|
||||||
|
sent = AsyncMock(return_value={"ok": True, "message_id": 1})
|
||||||
|
|
||||||
|
with patch(
|
||||||
|
"app.agents.youtube_publisher.service_proxy.list_active_pipelines",
|
||||||
|
new=AsyncMock(return_value=[]),
|
||||||
|
), patch(
|
||||||
|
"app.agents.youtube_publisher.service_proxy.list_failed_pipelines",
|
||||||
|
new=AsyncMock(return_value=[failed_pipeline]),
|
||||||
|
), patch(
|
||||||
|
"app.agents.youtube_publisher.send_raw",
|
||||||
|
new=sent,
|
||||||
|
):
|
||||||
|
await agent.poll_state_changes()
|
||||||
|
|
||||||
|
assert sent.await_count == 1
|
||||||
|
_, kwargs = sent.await_args
|
||||||
|
assert "실패" in (kwargs.get("text") or "")
|
||||||
|
assert kwargs["reply_markup"]["inline_keyboard"][0][0]["callback_data"] == "ytpub_retry_7"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_failed_pipeline_no_duplicate_notification():
|
||||||
|
"""같은 failed 파이프라인은 두 번째 poll에서 알림 안 함."""
|
||||||
|
from app.agents.youtube_publisher import YoutubePublisherAgent
|
||||||
|
|
||||||
|
agent = YoutubePublisherAgent()
|
||||||
|
failed_pipeline = {
|
||||||
|
"id": 7,
|
||||||
|
"state": "failed",
|
||||||
|
"failed_reason": "video: boom",
|
||||||
|
"track_title": "T",
|
||||||
|
}
|
||||||
|
sent = AsyncMock(return_value={"ok": True, "message_id": 1})
|
||||||
|
|
||||||
|
with patch(
|
||||||
|
"app.agents.youtube_publisher.service_proxy.list_active_pipelines",
|
||||||
|
new=AsyncMock(return_value=[]),
|
||||||
|
), patch(
|
||||||
|
"app.agents.youtube_publisher.service_proxy.list_failed_pipelines",
|
||||||
|
new=AsyncMock(return_value=[failed_pipeline]),
|
||||||
|
), patch(
|
||||||
|
"app.agents.youtube_publisher.send_raw",
|
||||||
|
new=sent,
|
||||||
|
):
|
||||||
|
await agent.poll_state_changes()
|
||||||
|
await agent.poll_state_changes()
|
||||||
|
|
||||||
|
# 중복 방지: 같은 failed 파이프라인에 대해 1회만 알림
|
||||||
|
assert sent.await_count == 1
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_failed_pipeline_renotify_after_recovery():
|
||||||
|
"""failed에서 벗어난 파이프라인이 다시 failed 되면 재알림."""
|
||||||
|
from app.agents.youtube_publisher import YoutubePublisherAgent
|
||||||
|
|
||||||
|
agent = YoutubePublisherAgent()
|
||||||
|
failed_pipeline = {
|
||||||
|
"id": 7,
|
||||||
|
"state": "failed",
|
||||||
|
"failed_reason": "video: boom",
|
||||||
|
"track_title": "T",
|
||||||
|
}
|
||||||
|
sent = AsyncMock(return_value={"ok": True, "message_id": 1})
|
||||||
|
|
||||||
|
# 첫 번째 poll: failed 존재 → 알림
|
||||||
|
with patch(
|
||||||
|
"app.agents.youtube_publisher.service_proxy.list_active_pipelines",
|
||||||
|
new=AsyncMock(return_value=[]),
|
||||||
|
), patch(
|
||||||
|
"app.agents.youtube_publisher.service_proxy.list_failed_pipelines",
|
||||||
|
new=AsyncMock(return_value=[failed_pipeline]),
|
||||||
|
), patch(
|
||||||
|
"app.agents.youtube_publisher.send_raw",
|
||||||
|
new=sent,
|
||||||
|
):
|
||||||
|
await agent.poll_state_changes()
|
||||||
|
|
||||||
|
assert sent.await_count == 1
|
||||||
|
|
||||||
|
# 두 번째 poll: failed 목록에서 사라짐(재개됨) → _notified_failed에서 제거
|
||||||
|
with patch(
|
||||||
|
"app.agents.youtube_publisher.service_proxy.list_active_pipelines",
|
||||||
|
new=AsyncMock(return_value=[]),
|
||||||
|
), patch(
|
||||||
|
"app.agents.youtube_publisher.service_proxy.list_failed_pipelines",
|
||||||
|
new=AsyncMock(return_value=[]),
|
||||||
|
), patch(
|
||||||
|
"app.agents.youtube_publisher.send_raw",
|
||||||
|
new=sent,
|
||||||
|
):
|
||||||
|
await agent.poll_state_changes()
|
||||||
|
|
||||||
|
assert sent.await_count == 1 # 아직 추가 알림 없음
|
||||||
|
|
||||||
|
# 세 번째 poll: 다시 failed → 재알림 가능
|
||||||
|
with patch(
|
||||||
|
"app.agents.youtube_publisher.service_proxy.list_active_pipelines",
|
||||||
|
new=AsyncMock(return_value=[]),
|
||||||
|
), patch(
|
||||||
|
"app.agents.youtube_publisher.service_proxy.list_failed_pipelines",
|
||||||
|
new=AsyncMock(return_value=[failed_pipeline]),
|
||||||
|
), patch(
|
||||||
|
"app.agents.youtube_publisher.send_raw",
|
||||||
|
new=sent,
|
||||||
|
):
|
||||||
|
await agent.poll_state_changes()
|
||||||
|
|
||||||
|
assert sent.await_count == 2 # 재알림
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_handle_ytpub_retry_calls_proxy():
|
||||||
|
from app import service_proxy
|
||||||
|
from app.telegram import webhook
|
||||||
|
|
||||||
|
retry = AsyncMock(return_value={"status_code": 202, "ok": True, "retrying_step": "video"})
|
||||||
|
fake_send = AsyncMock(return_value={"ok": True})
|
||||||
|
fake_api_call = AsyncMock(return_value={"ok": True})
|
||||||
|
|
||||||
|
with patch.object(service_proxy, "pipeline_retry", retry), \
|
||||||
|
patch("app.telegram.messaging.send_raw", fake_send), \
|
||||||
|
patch("app.telegram.webhook.api_call", fake_api_call):
|
||||||
|
res = await webhook._handle_ytpub_retry({"id": 1}, "ytpub_retry_7")
|
||||||
|
|
||||||
|
retry.assert_awaited_once_with(7)
|
||||||
|
assert res["ok"] is True
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_handle_ytpub_retry_invalid_data():
|
||||||
|
from app.telegram import webhook
|
||||||
|
|
||||||
|
fake_send = AsyncMock(return_value={"ok": True})
|
||||||
|
fake_api_call = AsyncMock(return_value={"ok": True})
|
||||||
|
|
||||||
|
with patch("app.telegram.messaging.send_raw", fake_send), \
|
||||||
|
patch("app.telegram.webhook.api_call", fake_api_call):
|
||||||
|
res = await webhook._handle_ytpub_retry({"id": 1}, "ytpub_retry_abc")
|
||||||
|
|
||||||
|
assert res["ok"] is False
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_failed_poll_exception_is_silent():
|
||||||
|
"""list_failed_pipelines 예외 시 poll이 조용히 넘어감 (active 알림에 영향 없음)."""
|
||||||
|
from app.agents.youtube_publisher import YoutubePublisherAgent
|
||||||
|
|
||||||
|
agent = YoutubePublisherAgent()
|
||||||
|
active_pipeline = {
|
||||||
|
"id": 1,
|
||||||
|
"state": "cover_pending",
|
||||||
|
"cover_url": "/x.jpg",
|
||||||
|
"track_title": "Track",
|
||||||
|
"feedback_count_per_step": {},
|
||||||
|
}
|
||||||
|
sent = AsyncMock(return_value={"ok": True, "message_id": 1})
|
||||||
|
|
||||||
|
with patch(
|
||||||
|
"app.agents.youtube_publisher.service_proxy.list_active_pipelines",
|
||||||
|
new=AsyncMock(return_value=[active_pipeline]),
|
||||||
|
), patch(
|
||||||
|
"app.agents.youtube_publisher.service_proxy.list_failed_pipelines",
|
||||||
|
new=AsyncMock(side_effect=Exception("network error")),
|
||||||
|
), patch(
|
||||||
|
"app.agents.youtube_publisher.service_proxy.save_pipeline_telegram_msg",
|
||||||
|
new=AsyncMock(),
|
||||||
|
), patch(
|
||||||
|
"app.agents.youtube_publisher.send_raw",
|
||||||
|
new=sent,
|
||||||
|
):
|
||||||
|
await agent.poll_state_changes()
|
||||||
|
|
||||||
|
# active 알림은 정상 발송
|
||||||
|
assert sent.await_count == 1
|
||||||
556
docs/superpowers/plans/2026-06-12-music-pipeline-reliability.md
Normal file
556
docs/superpowers/plans/2026-06-12-music-pipeline-reliability.md
Normal file
@@ -0,0 +1,556 @@
|
|||||||
|
# music/YouTube 파이프라인 신뢰성·복구 Implementation Plan
|
||||||
|
|
||||||
|
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
|
||||||
|
|
||||||
|
**Goal:** 파이프라인 step 실패를 자동 재시도(일시적, publish 제외)로 흡수하고, 영구 실패는 terminal `failed`로 둔 뒤 실패 step부터 수동 재개(텔레그램 [🔄재시도])할 수 있게 한다.
|
||||||
|
|
||||||
|
**Architecture:** music-lab `orchestrator.run_step`에 bounded 재시도 루프 + `POST /pipeline/{id}/retry` 재개 엔드포인트 + `db.get_last_failed_step`. agent-office `youtube_publisher`가 `failed` 감지 → 텔레그램 알림+버튼, `webhook`이 `ytpub_retry_{pid}` 콜백을 music-lab retry로 프록시.
|
||||||
|
|
||||||
|
**Tech Stack:** Python 3.12 / FastAPI / SQLite / asyncio / pytest. 기존 패턴: `orchestrator.run_step`(BackgroundTask), `main.py` pipeline 엔드포인트(404/409 + `_db_module`), `service_proxy`(httpx + `MUSIC_LAB_URL`), `telegram/webhook.py`(callback prefix 디스패치).
|
||||||
|
|
||||||
|
**Spec:** `docs/superpowers/specs/2026-06-12-music-pipeline-reliability-design.md`
|
||||||
|
|
||||||
|
> **테스트 fixture 주의**: music-lab/agent-office 각 `tests/conftest.py`의 DB 격리 방식(`db.DB_PATH` monkeypatch + `init_db`)을 먼저 확인하고 아래 테스트의 fixture를 그 관례에 맞춰라. 아래 코드는 `db.DB_PATH`를 tmp로 monkeypatch하는 표준 패턴을 가정한다.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## File Structure
|
||||||
|
|
||||||
|
| 파일 | 변경 | 책임 |
|
||||||
|
|------|------|------|
|
||||||
|
| `music-lab/app/db.py` | Modify | `get_last_failed_step(pid)` 추가 |
|
||||||
|
| `music-lab/app/pipeline/orchestrator.py` | Modify | `_dispatch_step` 추출 + `run_step` 재시도 루프 |
|
||||||
|
| `music-lab/app/main.py` | Modify | `POST /api/music/pipeline/{pid}/retry` |
|
||||||
|
| `music-lab/tests/test_pipeline_retry.py` | Create | db + orchestrator + endpoint 테스트 |
|
||||||
|
| `agent-office/app/service_proxy.py` | Modify | `pipeline_retry(pid)`, `list_failed_pipelines()` |
|
||||||
|
| `agent-office/app/agents/youtube_publisher.py` | Modify | `failed` 감지 → 텔레그램 알림+버튼 |
|
||||||
|
| `agent-office/app/telegram/webhook.py` | Modify | `ytpub_retry_` 디스패치 |
|
||||||
|
| `agent-office/tests/test_youtube_publisher_retry.py` | Create | 알림 + 콜백 테스트 |
|
||||||
|
| `web-backend/CLAUDE.md` + `memory/service_music.md` | Modify | API 표 + 메모리 |
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Task 1: music-lab db — `get_last_failed_step`
|
||||||
|
|
||||||
|
**Files:** Modify `music-lab/app/db.py`; Test `music-lab/tests/test_pipeline_retry.py` (Create)
|
||||||
|
|
||||||
|
- [ ] **Step 1: 실패 테스트 작성**
|
||||||
|
|
||||||
|
`music-lab/tests/test_pipeline_retry.py` (fixture는 music-lab conftest 관례에 맞춰 조정):
|
||||||
|
```python
|
||||||
|
import pytest
|
||||||
|
from app import db
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True)
|
||||||
|
def _tmp_db(tmp_path, monkeypatch):
|
||||||
|
monkeypatch.setattr(db, "DB_PATH", str(tmp_path / "music.db"))
|
||||||
|
db.init_db()
|
||||||
|
|
||||||
|
|
||||||
|
def _make_pipeline_with_failed_step(step: str) -> int:
|
||||||
|
pid = db.create_pipeline(track_id=1) # 시그니처는 conftest/db 확인 후 맞출 것
|
||||||
|
job = db.create_pipeline_job(pid, step)
|
||||||
|
db.update_pipeline_job(job, status="failed", error="boom")
|
||||||
|
db.update_pipeline_state(pid, "failed", failed_reason=f"{step}: boom")
|
||||||
|
return pid
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_last_failed_step_returns_step():
|
||||||
|
pid = _make_pipeline_with_failed_step("video")
|
||||||
|
assert db.get_last_failed_step(pid) == "video"
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_last_failed_step_none_when_no_failure():
|
||||||
|
pid = db.create_pipeline(track_id=1)
|
||||||
|
db.create_pipeline_job(pid, "cover") # status 기본(running/succeeded), failed 아님
|
||||||
|
assert db.get_last_failed_step(pid) is None
|
||||||
|
```
|
||||||
|
|
||||||
|
- [ ] **Step 2: 실패 확인**
|
||||||
|
|
||||||
|
Run: `cd music-lab && PYTHONPATH=.. python -m pytest tests/test_pipeline_retry.py::test_get_last_failed_step_returns_step -v`
|
||||||
|
Expected: FAIL — `db.get_last_failed_step` 미존재. (create_pipeline 시그니처가 다르면 helper를 db의 실제 생성 함수에 맞춰 수정.)
|
||||||
|
|
||||||
|
- [ ] **Step 3: 구현**
|
||||||
|
|
||||||
|
`music-lab/app/db.py`의 pipeline_jobs 섹션(`list_pipeline_jobs` 근처)에 추가:
|
||||||
|
```python
|
||||||
|
def get_last_failed_step(pid: int) -> Optional[str]:
|
||||||
|
"""파이프라인의 가장 최근 status='failed' pipeline_job의 step. 없으면 None."""
|
||||||
|
with _connect() as conn: # music-lab의 커넥션 헬퍼 이름에 맞출 것
|
||||||
|
row = conn.execute(
|
||||||
|
"SELECT step FROM pipeline_jobs "
|
||||||
|
"WHERE pipeline_id = ? AND status = 'failed' "
|
||||||
|
"ORDER BY id DESC LIMIT 1",
|
||||||
|
(pid,),
|
||||||
|
).fetchone()
|
||||||
|
return row["step"] if row else None
|
||||||
|
```
|
||||||
|
(`_connect`/`_conn` 등 실제 커넥션 컨텍스트매니저 이름은 db.py 상단 확인 후 일치시킬 것.)
|
||||||
|
|
||||||
|
- [ ] **Step 4: 통과 확인**
|
||||||
|
|
||||||
|
Run: `cd music-lab && PYTHONPATH=.. python -m pytest tests/test_pipeline_retry.py -v -k get_last_failed`
|
||||||
|
Expected: 2 PASS.
|
||||||
|
|
||||||
|
- [ ] **Step 5: 커밋**
|
||||||
|
```bash
|
||||||
|
git add music-lab/app/db.py music-lab/tests/test_pipeline_retry.py
|
||||||
|
git commit -m "feat(music-lab): get_last_failed_step — 파이프라인 재개용 실패 step 판별
|
||||||
|
|
||||||
|
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>"
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Task 2: orchestrator 자동 재시도
|
||||||
|
|
||||||
|
**Files:** Modify `music-lab/app/pipeline/orchestrator.py`; Test `music-lab/tests/test_pipeline_retry.py`
|
||||||
|
|
||||||
|
- [ ] **Step 1: 실패 테스트 작성** (test_pipeline_retry.py에 추가)
|
||||||
|
|
||||||
|
```python
|
||||||
|
import asyncio
|
||||||
|
from app.pipeline import orchestrator
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True)
|
||||||
|
def _no_backoff(monkeypatch):
|
||||||
|
monkeypatch.setattr(orchestrator, "STEP_RETRY_BACKOFF_SEC", [0, 0])
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_retryable_step_retries_then_succeeds(monkeypatch):
|
||||||
|
pid = db.create_pipeline(track_id=1)
|
||||||
|
calls = {"n": 0}
|
||||||
|
|
||||||
|
async def flaky(step, p, ctx, feedback):
|
||||||
|
calls["n"] += 1
|
||||||
|
if calls["n"] < 3:
|
||||||
|
raise RuntimeError("transient")
|
||||||
|
return {"next_state": "video_pending", "fields": {}}
|
||||||
|
|
||||||
|
monkeypatch.setattr(orchestrator, "_dispatch_step", flaky)
|
||||||
|
monkeypatch.setattr(orchestrator, "_resolve_input", lambda p: {"genre": "x", "title": "t", "moods": [], "tracks": [], "audio_path": "", "duration_sec": 0})
|
||||||
|
|
||||||
|
await orchestrator.run_step(pid, "cover")
|
||||||
|
assert calls["n"] == 3
|
||||||
|
assert db.get_pipeline(pid)["state"] == "video_pending"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_retryable_step_exhausts_to_failed(monkeypatch):
|
||||||
|
pid = db.create_pipeline(track_id=1)
|
||||||
|
|
||||||
|
async def always_fail(step, p, ctx, feedback):
|
||||||
|
raise RuntimeError("permanent")
|
||||||
|
|
||||||
|
monkeypatch.setattr(orchestrator, "_dispatch_step", always_fail)
|
||||||
|
monkeypatch.setattr(orchestrator, "_resolve_input", lambda p: {"genre": "x", "title": "t", "moods": [], "tracks": [], "audio_path": "", "duration_sec": 0})
|
||||||
|
|
||||||
|
await orchestrator.run_step(pid, "cover")
|
||||||
|
assert db.get_pipeline(pid)["state"] == "failed"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_publish_not_retried(monkeypatch):
|
||||||
|
pid = db.create_pipeline(track_id=1)
|
||||||
|
calls = {"n": 0}
|
||||||
|
|
||||||
|
async def fail_publish(step, p, ctx, feedback):
|
||||||
|
calls["n"] += 1
|
||||||
|
raise RuntimeError("upload error")
|
||||||
|
|
||||||
|
monkeypatch.setattr(orchestrator, "_dispatch_step", fail_publish)
|
||||||
|
monkeypatch.setattr(orchestrator, "_resolve_input", lambda p: {"genre": "x", "title": "t", "moods": [], "tracks": [], "audio_path": "", "duration_sec": 0})
|
||||||
|
|
||||||
|
await orchestrator.run_step(pid, "publish")
|
||||||
|
assert calls["n"] == 1 # 재시도 없음
|
||||||
|
assert db.get_pipeline(pid)["state"] == "failed"
|
||||||
|
```
|
||||||
|
|
||||||
|
- [ ] **Step 2: 실패 확인**
|
||||||
|
|
||||||
|
Run: `cd music-lab && PYTHONPATH=.. python -m pytest tests/test_pipeline_retry.py -v -k "retry or publish_not"`
|
||||||
|
Expected: FAIL — `_dispatch_step`/`STEP_RETRY_BACKOFF_SEC` 미존재.
|
||||||
|
|
||||||
|
- [ ] **Step 3: 구현 — `_dispatch_step` 추출 + 재시도 루프**
|
||||||
|
|
||||||
|
`orchestrator.py` 상단 상수 추가:
|
||||||
|
```python
|
||||||
|
STEP_MAX_RETRIES = 2 # 추가 재시도 횟수 (총 시도 = +1)
|
||||||
|
STEP_RETRY_BACKOFF_SEC = [5, 15]
|
||||||
|
NON_RETRY_STEPS = {"publish"}
|
||||||
|
```
|
||||||
|
|
||||||
|
기존 if/elif 분기(현재 `run_step` 내 lines 32-45)를 헬퍼로 추출:
|
||||||
|
```python
|
||||||
|
async def _dispatch_step(step: str, p: dict, ctx: dict, feedback: str) -> dict:
|
||||||
|
if step == "cover":
|
||||||
|
return await _run_cover(p, ctx, feedback)
|
||||||
|
if step == "video":
|
||||||
|
return await _run_video(p, ctx)
|
||||||
|
if step == "thumb":
|
||||||
|
return await _run_thumb(p, ctx, feedback)
|
||||||
|
if step == "meta":
|
||||||
|
return await _run_meta(p, ctx, feedback)
|
||||||
|
if step == "review":
|
||||||
|
return await _run_review(p, ctx)
|
||||||
|
if step == "publish":
|
||||||
|
return await _run_publish(p, ctx)
|
||||||
|
raise ValueError(f"unknown step: {step}")
|
||||||
|
```
|
||||||
|
|
||||||
|
`run_step`의 try 블록(step 실행부)을 재시도 루프로 교체:
|
||||||
|
```python
|
||||||
|
try:
|
||||||
|
ctx = _resolve_input(p)
|
||||||
|
except ValueError as e:
|
||||||
|
db.update_pipeline_job(job_id, status="failed", error=str(e))
|
||||||
|
db.update_pipeline_state(pipeline_id, "failed", failed_reason=f"{step}: {e}")
|
||||||
|
return
|
||||||
|
|
||||||
|
attempts = 1 if step in NON_RETRY_STEPS else (STEP_MAX_RETRIES + 1)
|
||||||
|
last_err = None
|
||||||
|
for i in range(attempts):
|
||||||
|
try:
|
||||||
|
result = await _dispatch_step(step, p, ctx, feedback)
|
||||||
|
db.update_pipeline_job(job_id, status="succeeded")
|
||||||
|
db.update_pipeline_state(pipeline_id, result["next_state"], **result.get("fields", {}))
|
||||||
|
return
|
||||||
|
except Exception as e:
|
||||||
|
last_err = e
|
||||||
|
logger.exception("step %s 실패 (pipeline %s, attempt %d/%d)", step, pipeline_id, i + 1, attempts)
|
||||||
|
if i < attempts - 1:
|
||||||
|
await asyncio.sleep(STEP_RETRY_BACKOFF_SEC[min(i, len(STEP_RETRY_BACKOFF_SEC) - 1)])
|
||||||
|
db.update_pipeline_job(job_id, status="failed", error=str(last_err))
|
||||||
|
db.update_pipeline_state(pipeline_id, "failed", failed_reason=f"{step}: {last_err}")
|
||||||
|
```
|
||||||
|
(`asyncio`는 이미 import됨.)
|
||||||
|
|
||||||
|
- [ ] **Step 4: 통과 확인**
|
||||||
|
|
||||||
|
Run: `cd music-lab && PYTHONPATH=.. python -m pytest tests/test_pipeline_retry.py -v -k "retry or publish_not"`
|
||||||
|
Expected: 3 PASS.
|
||||||
|
|
||||||
|
- [ ] **Step 5: 커밋**
|
||||||
|
```bash
|
||||||
|
git add music-lab/app/pipeline/orchestrator.py music-lab/tests/test_pipeline_retry.py
|
||||||
|
git commit -m "feat(music-lab): orchestrator step 자동 재시도 (publish 제외)
|
||||||
|
|
||||||
|
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>"
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Task 3: retry 엔드포인트
|
||||||
|
|
||||||
|
**Files:** Modify `music-lab/app/main.py`; Test `music-lab/tests/test_pipeline_retry.py`
|
||||||
|
|
||||||
|
- [ ] **Step 1: 실패 테스트 작성**
|
||||||
|
|
||||||
|
```python
|
||||||
|
from fastapi.testclient import TestClient
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def client(monkeypatch):
|
||||||
|
from app.main import app
|
||||||
|
return TestClient(app)
|
||||||
|
|
||||||
|
|
||||||
|
def test_retry_failed_pipeline_retriggers(client, monkeypatch):
|
||||||
|
pid = db.create_pipeline(track_id=1)
|
||||||
|
job = db.create_pipeline_job(pid, "video")
|
||||||
|
db.update_pipeline_job(job, status="failed", error="boom")
|
||||||
|
db.update_pipeline_state(pid, "failed", failed_reason="video: boom")
|
||||||
|
|
||||||
|
called = {}
|
||||||
|
from app.pipeline import orchestrator
|
||||||
|
async def fake_run(p, step, *a):
|
||||||
|
called["pid"], called["step"] = p, step
|
||||||
|
monkeypatch.setattr(orchestrator, "run_step", fake_run)
|
||||||
|
|
||||||
|
r = client.post(f"/api/music/pipeline/{pid}/retry")
|
||||||
|
assert r.status_code in (200, 202)
|
||||||
|
assert r.json()["retrying_step"] == "video"
|
||||||
|
|
||||||
|
|
||||||
|
def test_retry_non_failed_409(client):
|
||||||
|
pid = db.create_pipeline(track_id=1) # state='created'
|
||||||
|
r = client.post(f"/api/music/pipeline/{pid}/retry")
|
||||||
|
assert r.status_code == 409
|
||||||
|
|
||||||
|
|
||||||
|
def test_retry_publish_with_video_id_rejected(client):
|
||||||
|
pid = db.create_pipeline(track_id=1)
|
||||||
|
job = db.create_pipeline_job(pid, "publish")
|
||||||
|
db.update_pipeline_job(job, status="failed", error="x")
|
||||||
|
db.update_pipeline_state(pid, "failed", failed_reason="publish: x", youtube_video_id="abc123")
|
||||||
|
r = client.post(f"/api/music/pipeline/{pid}/retry")
|
||||||
|
assert r.status_code == 409
|
||||||
|
```
|
||||||
|
|
||||||
|
- [ ] **Step 2: 실패 확인**
|
||||||
|
|
||||||
|
Run: `cd music-lab && PYTHONPATH=.. python -m pytest tests/test_pipeline_retry.py -v -k retry_`
|
||||||
|
Expected: FAIL — 라우트 404.
|
||||||
|
|
||||||
|
- [ ] **Step 3: 구현**
|
||||||
|
|
||||||
|
`music-lab/app/main.py`의 `cancel_pipeline` 아래에 추가:
|
||||||
|
```python
|
||||||
|
@app.post("/api/music/pipeline/{pid}/retry", status_code=202)
|
||||||
|
async def retry_pipeline(pid: int, bg: BackgroundTasks):
|
||||||
|
p = _db_module.get_pipeline(pid)
|
||||||
|
if not p:
|
||||||
|
raise HTTPException(404)
|
||||||
|
if p["state"] != "failed":
|
||||||
|
raise HTTPException(409, f"재개 불가 (state={p['state']})")
|
||||||
|
failed_step = _db_module.get_last_failed_step(pid)
|
||||||
|
if not failed_step:
|
||||||
|
# 폴백: failed_reason "{step}: ..." prefix
|
||||||
|
reason = p.get("failed_reason") or ""
|
||||||
|
failed_step = reason.split(":", 1)[0].strip() or None
|
||||||
|
if not failed_step:
|
||||||
|
raise HTTPException(409, "실패 step을 판별할 수 없음")
|
||||||
|
if failed_step == "publish" and p.get("youtube_video_id"):
|
||||||
|
raise HTTPException(409, "이미 업로드됨 (중복 방지)")
|
||||||
|
bg.add_task(orchestrator.run_step, pid, failed_step)
|
||||||
|
return {"ok": True, "retrying_step": failed_step}
|
||||||
|
```
|
||||||
|
|
||||||
|
- [ ] **Step 4: 통과 확인 + 전체 회귀**
|
||||||
|
|
||||||
|
Run: `cd music-lab && PYTHONPATH=.. python -m pytest tests/test_pipeline_retry.py -v` → 모두 PASS
|
||||||
|
Run: `cd music-lab && PYTHONPATH=.. python -m pytest tests/ -q` → 회귀 0
|
||||||
|
|
||||||
|
- [ ] **Step 5: 커밋**
|
||||||
|
```bash
|
||||||
|
git add music-lab/app/main.py music-lab/tests/test_pipeline_retry.py
|
||||||
|
git commit -m "feat(music-lab): POST /pipeline/{id}/retry — 실패 step 수동 재개
|
||||||
|
|
||||||
|
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>"
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Task 4: agent-office service_proxy — pipeline_retry + list_failed
|
||||||
|
|
||||||
|
**Files:** Modify `agent-office/app/service_proxy.py`
|
||||||
|
|
||||||
|
> **먼저 확인**: `list_active_pipelines`가 호출하는 `GET /api/music/pipeline?status=active`가 failed를 포함하는지. 미포함이면 music-lab의 pipeline list 엔드포인트가 `status=failed`도 지원하는지 확인하고, 없으면 그 엔드포인트에 failed 필터를 추가(별도 작은 수정)하거나 `status` 화이트리스트에 'failed' 추가.
|
||||||
|
|
||||||
|
- [ ] **Step 1: 헬퍼 추가** — 기존 `list_active_pipelines`/`post_pipeline_feedback` 패턴(async with httpx.AsyncClient + MUSIC_LAB_URL) 그대로:
|
||||||
|
```python
|
||||||
|
async def list_failed_pipelines() -> list[dict]:
|
||||||
|
async with httpx.AsyncClient(timeout=10) as client:
|
||||||
|
resp = await client.get(f"{MUSIC_LAB_URL}/api/music/pipeline?status=failed")
|
||||||
|
resp.raise_for_status()
|
||||||
|
data = resp.json()
|
||||||
|
return data if isinstance(data, list) else data.get("items", data.get("pipelines", []))
|
||||||
|
|
||||||
|
|
||||||
|
async def pipeline_retry(pid: int) -> dict:
|
||||||
|
async with httpx.AsyncClient(timeout=15) as client:
|
||||||
|
resp = await client.post(f"{MUSIC_LAB_URL}/api/music/pipeline/{pid}/retry")
|
||||||
|
# 409(재개 불가/중복)도 본문 반환 위해 raise 안 함
|
||||||
|
return {"status_code": resp.status_code, **(resp.json() if resp.headers.get("content-type","").startswith("application/json") else {})}
|
||||||
|
```
|
||||||
|
(`list_active_pipelines`가 이미 failed를 포함하면 `list_failed_pipelines`는 생략하고 Task 5에서 active 목록에서 state=='failed' 필터.)
|
||||||
|
|
||||||
|
- [ ] **Step 2: import sanity** — `cd agent-office && PYTHONPATH=.. python -c "from app import service_proxy; print('OK')"` → OK
|
||||||
|
|
||||||
|
- [ ] **Step 3: 커밋**
|
||||||
|
```bash
|
||||||
|
git add agent-office/app/service_proxy.py
|
||||||
|
git commit -m "feat(agent-office): service_proxy pipeline_retry + list_failed_pipelines
|
||||||
|
|
||||||
|
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>"
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Task 5: youtube_publisher — failed 감지 + 텔레그램 알림/버튼
|
||||||
|
|
||||||
|
**Files:** Modify `agent-office/app/agents/youtube_publisher.py`; Test `agent-office/tests/test_youtube_publisher_retry.py` (Create)
|
||||||
|
|
||||||
|
- [ ] **Step 1: 실패 테스트 작성**
|
||||||
|
|
||||||
|
`agent-office/tests/test_youtube_publisher_retry.py` (DB fixture는 agent-office conftest 관례 따름):
|
||||||
|
```python
|
||||||
|
import pytest
|
||||||
|
from unittest.mock import AsyncMock
|
||||||
|
from app.agents.youtube_publisher import YoutubePublisherAgent
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_failed_pipeline_notified_with_retry_button(monkeypatch):
|
||||||
|
agent = YoutubePublisherAgent()
|
||||||
|
monkeypatch.setattr(
|
||||||
|
"app.agents.youtube_publisher.service_proxy.list_active_pipelines",
|
||||||
|
AsyncMock(return_value=[
|
||||||
|
{"id": 7, "state": "failed", "failed_reason": "video: boom", "track_title": "T"}
|
||||||
|
]),
|
||||||
|
)
|
||||||
|
sent = AsyncMock(return_value={"ok": True, "message_id": 1})
|
||||||
|
monkeypatch.setattr("app.agents.youtube_publisher.send_raw", sent)
|
||||||
|
|
||||||
|
await agent.poll_state_changes()
|
||||||
|
assert sent.await_count == 1
|
||||||
|
args, kwargs = sent.await_args
|
||||||
|
text = kwargs.get("text") or (args[0] if args else "")
|
||||||
|
assert "실패" in text
|
||||||
|
# 인라인 retry 버튼 callback_data
|
||||||
|
rm = kwargs.get("reply_markup") or {}
|
||||||
|
cb = rm["inline_keyboard"][0][0]["callback_data"]
|
||||||
|
assert cb == "ytpub_retry_7"
|
||||||
|
|
||||||
|
# 중복 방지: 같은 failed 재폴링 시 미발송
|
||||||
|
await agent.poll_state_changes()
|
||||||
|
assert sent.await_count == 1
|
||||||
|
```
|
||||||
|
(주의: `send_raw`가 `reply_markup`을 지원하는지 messaging 확인 — 미지원 시 Task에 messaging.send_raw에 reply_markup 인자 추가 포함. insta는 send_photo로 했으나 여기선 텍스트+버튼이므로 send_raw에 reply_markup 필요.)
|
||||||
|
|
||||||
|
- [ ] **Step 2: 실패 확인** — `cd agent-office && PYTHONPATH=.. python -m pytest tests/test_youtube_publisher_retry.py -v` → FAIL (failed 미처리)
|
||||||
|
|
||||||
|
- [ ] **Step 3: 구현** — `poll_state_changes`에 failed 분기 추가:
|
||||||
|
```python
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
super().__init__(*args, **kwargs)
|
||||||
|
self._notified_state_per_pipeline: dict[int, tuple] = {}
|
||||||
|
self._notified_failed: set[int] = set()
|
||||||
|
```
|
||||||
|
`poll_state_changes` 루프 내, `*_pending` 처리 뒤:
|
||||||
|
```python
|
||||||
|
if state == "failed" and pid not in self._notified_failed:
|
||||||
|
await self._notify_failed(p)
|
||||||
|
self._notified_failed.add(pid)
|
||||||
|
if state != "failed":
|
||||||
|
self._notified_failed.discard(pid) # 재개 후 다시 실패하면 재알림
|
||||||
|
```
|
||||||
|
새 메서드:
|
||||||
|
```python
|
||||||
|
async def _notify_failed(self, p: dict) -> None:
|
||||||
|
reason = p.get("failed_reason") or "?"
|
||||||
|
step = reason.split(":", 1)[0].strip()
|
||||||
|
title = p.get("track_title") or f"Pipeline #{p['id']}"
|
||||||
|
text = f"⚠️ [{title}] 파이프라인 #{p['id']} '{step}' 실패\n사유: {reason}"
|
||||||
|
kb = {"inline_keyboard": [[{"text": "🔄 재시도", "callback_data": f"ytpub_retry_{p['id']}"}]]}
|
||||||
|
await send_raw(text=text, reply_markup=kb)
|
||||||
|
add_log(self.agent_id, f"pipeline {p['id']} 실패 알림", "warning")
|
||||||
|
```
|
||||||
|
`send_raw`가 `reply_markup`을 받도록 `agent-office/app/telegram/messaging.py`의 `send_raw` 시그니처 확인/확장(이미 지원하면 그대로).
|
||||||
|
|
||||||
|
- [ ] **Step 4: 통과 확인** — `cd agent-office && PYTHONPATH=.. python -m pytest tests/test_youtube_publisher_retry.py -v` → PASS + 전체 회귀
|
||||||
|
|
||||||
|
- [ ] **Step 5: 커밋**
|
||||||
|
```bash
|
||||||
|
git add agent-office/app/agents/youtube_publisher.py agent-office/app/telegram/messaging.py agent-office/tests/test_youtube_publisher_retry.py
|
||||||
|
git commit -m "feat(agent-office): youtube_publisher 파이프라인 실패 텔레그램 알림+재시도 버튼
|
||||||
|
|
||||||
|
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>"
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Task 6: webhook ytpub_retry 디스패치
|
||||||
|
|
||||||
|
**Files:** Modify `agent-office/app/telegram/webhook.py`; Test `agent-office/tests/test_youtube_publisher_retry.py`
|
||||||
|
|
||||||
|
> **먼저 확인**: `_handle_callback`의 prefix 분기 구조 + 기존 핸들러(`_handle_insta_issue` 등)가 service_proxy를 호출/회신하는 패턴.
|
||||||
|
|
||||||
|
- [ ] **Step 1: 실패 테스트 추가**
|
||||||
|
```python
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_handle_ytpub_retry_calls_proxy(monkeypatch):
|
||||||
|
from app.telegram import webhook
|
||||||
|
retry = AsyncMock(return_value={"status_code": 202, "ok": True, "retrying_step": "video"})
|
||||||
|
monkeypatch.setattr("app.telegram.webhook.service_proxy.pipeline_retry", retry, raising=False)
|
||||||
|
monkeypatch.setattr("app.telegram.webhook.send_raw", AsyncMock(), raising=False)
|
||||||
|
res = await webhook._handle_ytpub_retry({"id": 1}, "ytpub_retry_7")
|
||||||
|
retry.assert_awaited_once_with(7)
|
||||||
|
```
|
||||||
|
(import 경로/`send_raw` 위치는 webhook.py 실제에 맞춤.)
|
||||||
|
|
||||||
|
- [ ] **Step 2: 실패 확인** → FAIL (`_handle_ytpub_retry` 미존재)
|
||||||
|
|
||||||
|
- [ ] **Step 3: 구현** — `_handle_callback`에 분기:
|
||||||
|
```python
|
||||||
|
if callback_id.startswith("ytpub_retry_"):
|
||||||
|
return await _handle_ytpub_retry(callback_query, callback_id)
|
||||||
|
```
|
||||||
|
핸들러:
|
||||||
|
```python
|
||||||
|
async def _handle_ytpub_retry(callback_query: dict, callback_id: str) -> dict:
|
||||||
|
try:
|
||||||
|
pid = int(callback_id.removeprefix("ytpub_retry_"))
|
||||||
|
except (ValueError, AttributeError):
|
||||||
|
return {"ok": False, "error": "invalid_callback_data"}
|
||||||
|
res = await service_proxy.pipeline_retry(pid)
|
||||||
|
sc = res.get("status_code")
|
||||||
|
if sc in (200, 202):
|
||||||
|
await send_raw(text=f"🔄 파이프라인 #{pid} 재개: {res.get('retrying_step','?')}")
|
||||||
|
else:
|
||||||
|
await send_raw(text=f"⚠️ 재개 불가 (#{pid}): {res.get('detail', sc)}")
|
||||||
|
return {"ok": True}
|
||||||
|
```
|
||||||
|
(`service_proxy`/`send_raw` import는 webhook.py 기존 방식 따름.)
|
||||||
|
|
||||||
|
- [ ] **Step 4: 통과 확인** + 전체 agent-office 회귀
|
||||||
|
|
||||||
|
- [ ] **Step 5: 커밋**
|
||||||
|
```bash
|
||||||
|
git add agent-office/app/telegram/webhook.py agent-office/tests/test_youtube_publisher_retry.py
|
||||||
|
git commit -m "feat(agent-office): ytpub_retry 텔레그램 콜백 → music-lab retry 프록시
|
||||||
|
|
||||||
|
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>"
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Task 7: 문서 + 배포 + 메모리
|
||||||
|
|
||||||
|
**Files:** Modify `web-backend/CLAUDE.md`, `memory/service_music.md`
|
||||||
|
|
||||||
|
- [ ] **Step 1: CLAUDE.md music API 표에 추가**
|
||||||
|
```
|
||||||
|
| POST | `/api/music/pipeline/{id}/retry` | 실패 파이프라인 실패 step부터 재개 (publish+업로드완료 시 409) |
|
||||||
|
```
|
||||||
|
|
||||||
|
- [ ] **Step 2: 전체 회귀**
|
||||||
|
```bash
|
||||||
|
cd music-lab && PYTHONPATH=.. python -m pytest tests/ -q
|
||||||
|
cd ../agent-office && PYTHONPATH=.. python -m pytest tests/ -q
|
||||||
|
```
|
||||||
|
Expected: 모두 PASS (사전존재 stale 제외).
|
||||||
|
|
||||||
|
- [ ] **Step 3: 커밋 + push (NAS 배포)**
|
||||||
|
```bash
|
||||||
|
cd C:/Users/jaeoh/Desktop/workspace/web-backend
|
||||||
|
git add CLAUDE.md docs/superpowers/plans/2026-06-12-music-pipeline-reliability.md
|
||||||
|
git commit -m "docs(music): 파이프라인 retry API 문서 + 구현 계획
|
||||||
|
|
||||||
|
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>"
|
||||||
|
git push origin main
|
||||||
|
```
|
||||||
|
|
||||||
|
- [ ] **Step 4: 메모리 갱신** — `service_music.md`에 신뢰성/복구(자동 재시도 publish 제외 + 수동 retry 엔드포인트 + youtube_publisher 실패 알림) 추가.
|
||||||
|
|
||||||
|
- [ ] **Step 5: 프로덕션 확인(경량)** — 배포 후 `POST /api/music/pipeline/<없는id>/retry` → 404, 실제 failed 파이프라인 있으면 retry 동작. (없으면 단위 테스트로 갈음.)
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Self-Review
|
||||||
|
|
||||||
|
**Spec coverage:**
|
||||||
|
- 자동 재시도(publish 제외, _resolve_input 제외) → Task 2 ✓
|
||||||
|
- 수동 재개(실패 step, publish+video_id 가드) → Task 1(step 판별)+Task 3 ✓
|
||||||
|
- 실패 알림 + [🔄재시도] → Task 5 ✓
|
||||||
|
- 재시도 콜백 → Task 4(proxy)+Task 6(dispatch) ✓
|
||||||
|
- stuck 감지 제외(YAGNI) → 계획에 없음 ✓
|
||||||
|
|
||||||
|
**Placeholder scan:** 코드 스텝 모두 구체. "conftest 관례 확인"·"list_active가 failed 포함하는지 확인"은 기존 코드 소유를 존중하는 의도적 검증 지시(placeholder 아님).
|
||||||
|
|
||||||
|
**Type consistency:** `get_last_failed_step(pid)` Task1↔Task3 일치. `_dispatch_step(step,p,ctx,feedback)` Task2 정의↔테스트 mock 일치. `run_step(pid, step)` 시그니처 기존 일치. callback `ytpub_retry_{pid}` Task5 생성↔Task6 파싱 일치. `pipeline_retry(pid)` Task4↔Task6 일치. retry 응답 `retrying_step`/`status_code` Task3↔Task4↔Task6 일치.
|
||||||
@@ -0,0 +1,105 @@
|
|||||||
|
# music/YouTube 파이프라인 신뢰성·복구 — 설계
|
||||||
|
|
||||||
|
> 작성 2026-06-12. YouTube 자동화 파이프라인의 step 실패를 자동 재시도(일시적)하고, 영구 실패는 실패 step부터 수동 재개(텔레그램 [🔄재시도])할 수 있게 한다. "music/YouTube 파이프라인 고도화" 중 **신뢰성/복구** 슬라이스.
|
||||||
|
|
||||||
|
## 1. 목표
|
||||||
|
|
||||||
|
파이프라인 step(`cover→video→thumb→meta→review→publish`) 실패가 ① 일시적이면 자동 재시도로 흡수하고, ② 영구적이면 terminal `failed`로 둔 뒤 **이전 산출물을 보존한 채 실패 step부터 재개**할 수 있게 한다. 현재는 step 한 번 실패하면 전체 파이프라인이 terminal `failed`가 되고 복구 경로가 없어 처음부터 다시 만들어야 한다.
|
||||||
|
|
||||||
|
## 2. 배경 (현재 동작)
|
||||||
|
|
||||||
|
- `orchestrator.run_step(pipeline_id, step, feedback)`: `pipeline_jobs` row 생성 → step 실행 → 성공 시 `update_pipeline_state(next_state)`, 예외 시 `pipeline_jobs.status='failed'` + 파이프라인 `state='failed'` + `failed_reason="{step}: {e}"`. **재시도/재개 없음.**
|
||||||
|
- 항상 `bg.add_task(orchestrator.run_step, pid, step, ...)`로 BackgroundTask 호출(start_pipeline→cover, feedback→next_step, publish_pipeline→publish).
|
||||||
|
- 이전 step 산출물(`cover_url`/`video_url`/`thumbnail_url`/`metadata_json`/`review_json`)은 파이프라인 row에 **보존**됨 → 실패 step만 재실행하면 이어갈 수 있는 구조.
|
||||||
|
- `state_machine`: STEPS, `_APPROVE_NEXT`, TERMINAL_STATES={published, cancelled, **failed**, awaiting_manual}.
|
||||||
|
- `agent-office youtube_publisher.poll_state_changes`: `*_pending` 신규 진입만 텔레그램 알림. **`failed`는 무알림(silent)** — 사용자가 실패를 모름.
|
||||||
|
|
||||||
|
## 3. 요구사항 (확정)
|
||||||
|
|
||||||
|
- **자동 재시도**: step 실행 실패 시 `STEP_MAX_RETRIES`(기본 2 → 총 3회)까지 backoff 재시도. 소진 후 terminal `failed`.
|
||||||
|
- `_resolve_input` 에러(입력/설정)는 재시도 안 함(재시도해도 안 고쳐짐).
|
||||||
|
- **`publish` step은 자동 재시도 제외** — youtube 업로드는 비멱등(중복 업로드 위험). 1회 시도 후 실패면 즉시 terminal.
|
||||||
|
- 재시도 대상 = `cover/video/thumb/meta/review`.
|
||||||
|
- **수동 재개**: terminal `failed` 파이프라인을 실패 step부터 재실행. 이전 산출물 보존.
|
||||||
|
- publish 재개 가드: `youtube_video_id`가 이미 있으면 재개 거부(원 업로드 성공 가능성 → 중복 방지).
|
||||||
|
- **실패 알림**: 영구 실패 시 텔레그램 알림 + 인라인 `[🔄재시도]` 버튼(현재 silent 갭 해소).
|
||||||
|
- **범위 밖(YAGNI)**: stuck 감지(*_running hang / *_pending 방치). 수동 재시도로 복구 가능하므로 이번 슬라이스 제외.
|
||||||
|
|
||||||
|
## 4. 아키텍처
|
||||||
|
|
||||||
|
3 컴포넌트:
|
||||||
|
```
|
||||||
|
[music-lab orchestrator] run_step: step 실행을 재시도 루프로 (publish 제외) → 소진 시 failed
|
||||||
|
[music-lab API] POST /api/music/pipeline/{id}/retry → 실패 step부터 run_step 재트리거
|
||||||
|
[agent-office] youtube_publisher: failed 감지 → 텔레그램 알림+[🔄재시도]
|
||||||
|
webhook: ytpub_retry_{pid} → service_proxy.pipeline_retry → music-lab retry
|
||||||
|
```
|
||||||
|
|
||||||
|
## 5. music-lab 상세
|
||||||
|
|
||||||
|
### 5.1 자동 재시도 (`pipeline/orchestrator.py`)
|
||||||
|
- 상수: `STEP_MAX_RETRIES = 2`, `STEP_RETRY_BACKOFF_SEC = [5, 15]`(시도 간 대기), `NON_RETRY_STEPS = {"publish"}`.
|
||||||
|
- `run_step`의 step 실행부(현재 try lines 31-47)를 루프로:
|
||||||
|
```
|
||||||
|
attempts = 1 if step in NON_RETRY_STEPS else (STEP_MAX_RETRIES + 1)
|
||||||
|
for i in range(attempts):
|
||||||
|
try:
|
||||||
|
result = await _dispatch_step(step, p, ctx, feedback)
|
||||||
|
update_pipeline_job(job_id, status="succeeded")
|
||||||
|
update_pipeline_state(pipeline_id, result["next_state"], **fields)
|
||||||
|
return
|
||||||
|
except Exception as e:
|
||||||
|
last = e
|
||||||
|
if i < attempts - 1:
|
||||||
|
add_log/pipeline_job note "retry {i+1}"
|
||||||
|
await asyncio.sleep(STEP_RETRY_BACKOFF_SEC[min(i, len-1)])
|
||||||
|
# 소진
|
||||||
|
update_pipeline_job(job_id, status="failed", error=str(last))
|
||||||
|
update_pipeline_state(pipeline_id, "failed", failed_reason=f"{step}: {last}")
|
||||||
|
```
|
||||||
|
- `_resolve_input` 실패는 루프 진입 전 early-return(현행 유지, 재시도 X).
|
||||||
|
- 재시도 시도 가시화: `pipeline_jobs`에 attempt별 기록(또는 error 메시지에 "attempt n/N").
|
||||||
|
|
||||||
|
### 5.2 resume 엔드포인트 (`main.py`)
|
||||||
|
- `POST /api/music/pipeline/{id}/retry`:
|
||||||
|
- 파이프라인 조회 없으면 404.
|
||||||
|
- `state != "failed"` → 409 "재개 불가 (state=...)".
|
||||||
|
- 실패 step 판별: `db.get_last_failed_step(pipeline_id)` (pipeline_jobs에서 status='failed' 최신 step). 없으면 `failed_reason.split(":")[0].strip()` 폴백.
|
||||||
|
- 실패 step이 `publish`이고 `youtube_video_id`가 이미 있으면 → 409 "이미 업로드됨 (중복 방지)".
|
||||||
|
- `bg.add_task(orchestrator.run_step, pid, failed_step)` 재트리거. 반환 `{ok: true, retrying_step}`.
|
||||||
|
- `db.get_last_failed_step(pipeline_id) -> str | None` 헬퍼 신규.
|
||||||
|
|
||||||
|
## 6. agent-office 상세
|
||||||
|
|
||||||
|
### 6.1 실패 알림 (`agents/youtube_publisher.py`)
|
||||||
|
- `poll_state_changes`: `_STEP_TITLES`(*_pending) 처리 후, `state == "failed"` 인 파이프라인도 검사.
|
||||||
|
- 신규 failed(중복 방지: `self._notified_failed: set[int]`, 또는 기존 dict에 ('failed', reason_hash))면 텔레그램 발송:
|
||||||
|
`⚠️ [{track_title}] 파이프라인 #{id} '{step}' 실패\n사유: {failed_reason}` + 인라인 `[🔄 재시도]` (callback_data `ytpub_retry_{id}`).
|
||||||
|
- 발송 후 notified 기록.
|
||||||
|
- `service_proxy.list_active_pipelines()`가 failed를 포함하는지 확인 — 미포함이면 failed도 반환하도록 보강(또는 별도 조회). (plan에서 확인.)
|
||||||
|
|
||||||
|
### 6.2 재시도 콜백 (`telegram/webhook.py`)
|
||||||
|
- `_handle_callback`에 `callback_id.startswith("ytpub_retry_")` 분기 → `_handle_ytpub_retry`.
|
||||||
|
- `_handle_ytpub_retry`: `pid = int(callback_id.removeprefix("ytpub_retry_"))` → `service_proxy.pipeline_retry(pid)` → 결과 텔레그램 회신("재개: {step}" / 거부 사유).
|
||||||
|
- `service_proxy.pipeline_retry(pid)` 신규: `POST {MUSIC_LAB_URL}/api/music/pipeline/{pid}/retry`.
|
||||||
|
|
||||||
|
## 7. 에러 처리 / 엣지
|
||||||
|
|
||||||
|
- 재시도 backoff 중 컨테이너 재시작 → 해당 step 작업 유실, 파이프라인 비-terminal stuck. 범위 밖이나 수동 [🔄재시도]로 복구 가능(안전망).
|
||||||
|
- resume 시 state≠failed → 409(중복 재개·동시성 방지). 텔레그램 [🔄재시도] 중복 탭도 멱등 거부.
|
||||||
|
- pipeline_jobs에 failed row 없고 state만 failed → `failed_reason` prefix 폴백.
|
||||||
|
- publish 재개 + `youtube_video_id` 존재 → 409(중복 업로드 방지).
|
||||||
|
- 알림 중복: notified 기록으로 같은 failed 1회만 발송.
|
||||||
|
|
||||||
|
## 8. 테스트
|
||||||
|
|
||||||
|
- **orchestrator (재시도)**: step 2회 실패 후 성공 → next_state 도달(3시도). 끝까지 실패 → failed. publish는 1시도 후 즉시 failed(재시도 X). `_resolve_input` 실패 → 재시도 없이 failed.
|
||||||
|
- **API retry**: failed→run_step 재트리거(mock 확인) + retrying_step 반환. 비-failed→409. publish+youtube_video_id→409.
|
||||||
|
- **db**: `get_last_failed_step` — 최신 failed job step 반환, 없으면 None.
|
||||||
|
- **agent-office**: poll 신규 failed→텔레그램 발송(중복 방지). `_handle_ytpub_retry`→service_proxy.pipeline_retry 호출 + pid 파싱.
|
||||||
|
|
||||||
|
## 9. 영향받는 파일
|
||||||
|
|
||||||
|
- music-lab: `app/pipeline/orchestrator.py`(재시도 루프 + `_dispatch_step` 추출), `app/main.py`(retry 엔드포인트), `app/db.py`(`get_last_failed_step`), `tests/`.
|
||||||
|
- agent-office: `app/agents/youtube_publisher.py`(failed 알림), `app/telegram/webhook.py`(ytpub_retry 디스패치), `app/service_proxy.py`(`pipeline_retry`, 필요 시 `list_active_pipelines` failed 포함), `tests/`.
|
||||||
|
- web-backend/CLAUDE.md music API 표 + `service_music.md` 메모리 갱신.
|
||||||
@@ -1135,6 +1135,21 @@ def list_pipelines(active_only: bool = False) -> List[Dict[str, Any]]:
|
|||||||
return [_parse_pipeline_row(r) for r in rows]
|
return [_parse_pipeline_row(r) for r in rows]
|
||||||
|
|
||||||
|
|
||||||
|
def list_pipelines_by_state(state: str) -> List[Dict[str, Any]]:
|
||||||
|
"""특정 state의 파이프라인만 조회 (예: 'failed')."""
|
||||||
|
sql = """
|
||||||
|
SELECT vp.*, ml.title AS track_title, cj.title AS compile_title
|
||||||
|
FROM video_pipelines vp
|
||||||
|
LEFT JOIN music_library ml ON ml.id = vp.track_id
|
||||||
|
LEFT JOIN compile_jobs cj ON cj.id = vp.compile_job_id
|
||||||
|
WHERE vp.state = ?
|
||||||
|
ORDER BY vp.created_at DESC
|
||||||
|
"""
|
||||||
|
with _conn() as conn:
|
||||||
|
rows = conn.execute(sql, (state,)).fetchall()
|
||||||
|
return [_parse_pipeline_row(r) for r in rows]
|
||||||
|
|
||||||
|
|
||||||
def increment_feedback_count(pid: int, step: str) -> int:
|
def increment_feedback_count(pid: int, step: str) -> int:
|
||||||
"""원자적으로 feedback_count_per_step.<step>를 +1 한 뒤 새 값을 반환.
|
"""원자적으로 feedback_count_per_step.<step>를 +1 한 뒤 새 값을 반환.
|
||||||
|
|
||||||
@@ -1220,6 +1235,18 @@ def list_pipeline_jobs(pid: int) -> List[Dict[str, Any]]:
|
|||||||
return [dict(r) for r in rows]
|
return [dict(r) for r in rows]
|
||||||
|
|
||||||
|
|
||||||
|
def get_last_failed_step(pid: int) -> Optional[str]:
|
||||||
|
"""파이프라인의 가장 최근 status='failed' pipeline_job의 step. 없으면 None."""
|
||||||
|
with _conn() as conn:
|
||||||
|
row = conn.execute(
|
||||||
|
"SELECT step FROM pipeline_jobs "
|
||||||
|
"WHERE pipeline_id = ? AND status = 'failed' "
|
||||||
|
"ORDER BY id DESC LIMIT 1",
|
||||||
|
(pid,),
|
||||||
|
).fetchone()
|
||||||
|
return row["step"] if row else None
|
||||||
|
|
||||||
|
|
||||||
def get_youtube_setup() -> Dict[str, Any]:
|
def get_youtube_setup() -> Dict[str, Any]:
|
||||||
"""youtube_setup의 기본 1행을 반환. 누락 시 자동 시드 후 재조회."""
|
"""youtube_setup의 기본 1행을 반환. 누락 시 자동 시드 후 재조회."""
|
||||||
with _conn() as conn:
|
with _conn() as conn:
|
||||||
|
|||||||
@@ -1030,7 +1030,12 @@ def create_pipeline(req: PipelineCreate):
|
|||||||
|
|
||||||
@app.get("/api/music/pipeline")
|
@app.get("/api/music/pipeline")
|
||||||
def list_pipelines_endpoint(status: str = "all"):
|
def list_pipelines_endpoint(status: str = "all"):
|
||||||
pipelines = _db_module.list_pipelines(active_only=(status == "active"))
|
if status == "active":
|
||||||
|
pipelines = _db_module.list_pipelines(active_only=True)
|
||||||
|
elif status == "failed":
|
||||||
|
pipelines = _db_module.list_pipelines_by_state("failed")
|
||||||
|
else:
|
||||||
|
pipelines = _db_module.list_pipelines(active_only=False)
|
||||||
return {"pipelines": pipelines}
|
return {"pipelines": pipelines}
|
||||||
|
|
||||||
|
|
||||||
@@ -1128,6 +1133,31 @@ def cancel_pipeline(pid: int):
|
|||||||
return {"ok": True}
|
return {"ok": True}
|
||||||
|
|
||||||
|
|
||||||
|
@app.post("/api/music/pipeline/{pid}/retry", status_code=202)
|
||||||
|
async def retry_pipeline(pid: int, bg: BackgroundTasks):
|
||||||
|
from .pipeline.state_machine import STEPS
|
||||||
|
p = _db_module.get_pipeline(pid)
|
||||||
|
if not p:
|
||||||
|
raise HTTPException(404)
|
||||||
|
if p["state"] != "failed":
|
||||||
|
raise HTTPException(409, f"재개 불가 (state={p['state']})")
|
||||||
|
failed_step = _db_module.get_last_failed_step(pid)
|
||||||
|
if not failed_step:
|
||||||
|
reason = p.get("failed_reason") or ""
|
||||||
|
failed_step = reason.split(":", 1)[0].strip() or None
|
||||||
|
if not failed_step:
|
||||||
|
raise HTTPException(409, "실패 step을 판별할 수 없음")
|
||||||
|
# Fix 3: failed_step이 알려진 STEPS에 없으면 409
|
||||||
|
if failed_step not in STEPS:
|
||||||
|
raise HTTPException(409, "실패 step 판별 불가")
|
||||||
|
if failed_step == "publish" and p.get("youtube_video_id"):
|
||||||
|
raise HTTPException(409, "이미 업로드됨 (중복 방지)")
|
||||||
|
# Fix 1: bg.add_task 직전에 상태를 'retrying'으로 전이 → 동시 retry 409 방지
|
||||||
|
_db_module.update_pipeline_state(pid, "retrying")
|
||||||
|
bg.add_task(orchestrator.run_step, pid, failed_step)
|
||||||
|
return {"ok": True, "retrying_step": failed_step}
|
||||||
|
|
||||||
|
|
||||||
@app.post("/api/music/pipeline/{pid}/publish", status_code=202)
|
@app.post("/api/music/pipeline/{pid}/publish", status_code=202)
|
||||||
async def publish_pipeline(pid: int, bg: BackgroundTasks):
|
async def publish_pipeline(pid: int, bg: BackgroundTasks):
|
||||||
p = _db_module.get_pipeline(pid)
|
p = _db_module.get_pipeline(pid)
|
||||||
|
|||||||
@@ -11,6 +11,10 @@ from .gradient import make_gradient_with_title
|
|||||||
|
|
||||||
logger = logging.getLogger("music-lab.orchestrator")
|
logger = logging.getLogger("music-lab.orchestrator")
|
||||||
|
|
||||||
|
STEP_MAX_RETRIES = 2 # 추가 재시도 (총 시도 = +1)
|
||||||
|
STEP_RETRY_BACKOFF_SEC = [5, 15]
|
||||||
|
NON_RETRY_STEPS = {"publish"}
|
||||||
|
|
||||||
|
|
||||||
async def run_step(pipeline_id: int, step: str, feedback: str = "") -> None:
|
async def run_step(pipeline_id: int, step: str, feedback: str = "") -> None:
|
||||||
"""단계 실행 → 결과를 DB에 반영하고 *_pending 또는 다음 단계로 전이.
|
"""단계 실행 → 결과를 DB에 반영하고 *_pending 또는 다음 단계로 전이.
|
||||||
@@ -28,27 +32,35 @@ async def run_step(pipeline_id: int, step: str, feedback: str = "") -> None:
|
|||||||
db.update_pipeline_state(pipeline_id, "failed", failed_reason=f"{step}: {e}")
|
db.update_pipeline_state(pipeline_id, "failed", failed_reason=f"{step}: {e}")
|
||||||
return
|
return
|
||||||
|
|
||||||
try:
|
attempts = 1 if step in NON_RETRY_STEPS else (STEP_MAX_RETRIES + 1)
|
||||||
if step == "cover":
|
last_err = None
|
||||||
result = await _run_cover(p, ctx, feedback)
|
for i in range(attempts):
|
||||||
elif step == "video":
|
try:
|
||||||
result = await _run_video(p, ctx)
|
result = await _dispatch_step(step, p, ctx, feedback)
|
||||||
elif step == "thumb":
|
db.update_pipeline_job(job_id, status="succeeded")
|
||||||
result = await _run_thumb(p, ctx, feedback)
|
db.update_pipeline_state(pipeline_id, result["next_state"], **result.get("fields", {}))
|
||||||
elif step == "meta":
|
return
|
||||||
result = await _run_meta(p, ctx, feedback)
|
except Exception as e:
|
||||||
elif step == "review":
|
last_err = e
|
||||||
result = await _run_review(p, ctx)
|
logger.exception(
|
||||||
elif step == "publish":
|
"step %s 실패 (pipeline %s, attempt %d/%d)", step, pipeline_id, i + 1, attempts
|
||||||
result = await _run_publish(p, ctx)
|
)
|
||||||
else:
|
if i < attempts - 1:
|
||||||
raise ValueError(f"unknown step: {step}")
|
backoff = STEP_RETRY_BACKOFF_SEC[min(i, len(STEP_RETRY_BACKOFF_SEC) - 1)] if STEP_RETRY_BACKOFF_SEC else 0
|
||||||
db.update_pipeline_job(job_id, status="succeeded")
|
await asyncio.sleep(backoff)
|
||||||
db.update_pipeline_state(pipeline_id, result["next_state"], **result.get("fields", {}))
|
db.update_pipeline_job(job_id, status="failed", error=str(last_err))
|
||||||
except Exception as e:
|
db.update_pipeline_state(pipeline_id, "failed", failed_reason=f"{step}: {last_err}")
|
||||||
logger.exception("step %s failed for pipeline %s", step, pipeline_id)
|
|
||||||
db.update_pipeline_job(job_id, status="failed", error=str(e))
|
|
||||||
db.update_pipeline_state(pipeline_id, "failed", failed_reason=f"{step}: {e}")
|
async def _dispatch_step(step: str, p: dict, ctx: dict, feedback: str) -> dict:
|
||||||
|
"""step 이름으로 실행 함수 디스패치."""
|
||||||
|
if step == "cover": return await _run_cover(p, ctx, feedback)
|
||||||
|
if step == "video": return await _run_video(p, ctx)
|
||||||
|
if step == "thumb": return await _run_thumb(p, ctx, feedback)
|
||||||
|
if step == "meta": return await _run_meta(p, ctx, feedback)
|
||||||
|
if step == "review": return await _run_review(p, ctx)
|
||||||
|
if step == "publish": return await _run_publish(p, ctx)
|
||||||
|
raise ValueError(f"unknown step: {step}")
|
||||||
|
|
||||||
|
|
||||||
def _resolve_input(p: dict) -> dict:
|
def _resolve_input(p: dict) -> dict:
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
[pytest]
|
[pytest]
|
||||||
testpaths = tests
|
testpaths = tests
|
||||||
pythonpath = .
|
pythonpath = . ..
|
||||||
asyncio_mode = auto
|
asyncio_mode = auto
|
||||||
|
|||||||
@@ -52,6 +52,19 @@ def test_list_pipelines_active_filter(client):
|
|||||||
assert all(p["state"] != "published" for p in r.json()["pipelines"])
|
assert all(p["state"] != "published" for p in r.json()["pipelines"])
|
||||||
|
|
||||||
|
|
||||||
|
def test_list_pipelines_failed_filter(client):
|
||||||
|
"""status=failed 필터는 state='failed' 파이프라인만 반환한다."""
|
||||||
|
# failed 파이프라인 생성
|
||||||
|
pid_f = client.post("/api/music/pipeline", json={"track_id": 1}).json()["id"]
|
||||||
|
db.update_pipeline_state(pid_f, "failed", failed_reason="cover: oops")
|
||||||
|
r = client.get("/api/music/pipeline?status=failed")
|
||||||
|
assert r.status_code == 200
|
||||||
|
pipelines = r.json()["pipelines"]
|
||||||
|
assert len(pipelines) == 1
|
||||||
|
assert pipelines[0]["state"] == "failed"
|
||||||
|
assert pipelines[0]["id"] == pid_f
|
||||||
|
|
||||||
|
|
||||||
def test_feedback_reject_records_feedback_and_increments_count(client):
|
def test_feedback_reject_records_feedback_and_increments_count(client):
|
||||||
pid = client.post("/api/music/pipeline", json={"track_id": 1}).json()["id"]
|
pid = client.post("/api/music/pipeline", json={"track_id": 1}).json()["id"]
|
||||||
db.update_pipeline_state(pid, "cover_pending")
|
db.update_pipeline_state(pid, "cover_pending")
|
||||||
|
|||||||
174
music-lab/tests/test_pipeline_retry.py
Normal file
174
music-lab/tests/test_pipeline_retry.py
Normal file
@@ -0,0 +1,174 @@
|
|||||||
|
import pytest
|
||||||
|
from fastapi.testclient import TestClient
|
||||||
|
from app import db
|
||||||
|
from app.pipeline import orchestrator
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def fresh_db(monkeypatch, tmp_path):
|
||||||
|
db_path = tmp_path / "music.db"
|
||||||
|
monkeypatch.setattr(db, "DB_PATH", str(db_path))
|
||||||
|
db.init_db()
|
||||||
|
return db_path
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True)
|
||||||
|
def _no_backoff(monkeypatch):
|
||||||
|
monkeypatch.setattr(orchestrator, "STEP_RETRY_BACKOFF_SEC", [0, 0])
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_last_failed_step_returns_step(fresh_db):
|
||||||
|
pid = db.create_pipeline(track_id=1)
|
||||||
|
job_id = db.create_pipeline_job(pid, "video")
|
||||||
|
db.update_pipeline_job(job_id, status="failed", error="boom")
|
||||||
|
db.update_pipeline_state(pid, "failed", failed_reason="video: boom")
|
||||||
|
assert db.get_last_failed_step(pid) == "video"
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_last_failed_step_none_when_no_failure(fresh_db):
|
||||||
|
pid = db.create_pipeline(track_id=1)
|
||||||
|
db.create_pipeline_job(pid, "cover")
|
||||||
|
assert db.get_last_failed_step(pid) is None
|
||||||
|
|
||||||
|
|
||||||
|
async def test_retryable_step_retries_then_succeeds(fresh_db, monkeypatch):
|
||||||
|
pid = db.create_pipeline(track_id=1)
|
||||||
|
calls = {"n": 0}
|
||||||
|
|
||||||
|
async def flaky(step, p, ctx, feedback):
|
||||||
|
calls["n"] += 1
|
||||||
|
if calls["n"] < 3:
|
||||||
|
raise RuntimeError("transient")
|
||||||
|
return {"next_state": "video_pending", "fields": {}}
|
||||||
|
|
||||||
|
monkeypatch.setattr(orchestrator, "_dispatch_step", flaky)
|
||||||
|
monkeypatch.setattr(
|
||||||
|
orchestrator, "_resolve_input",
|
||||||
|
lambda p: {"genre": "x", "title": "t", "moods": [], "tracks": [], "audio_path": "", "duration_sec": 0},
|
||||||
|
)
|
||||||
|
await orchestrator.run_step(pid, "cover")
|
||||||
|
assert calls["n"] == 3
|
||||||
|
assert db.get_pipeline(pid)["state"] == "video_pending"
|
||||||
|
|
||||||
|
|
||||||
|
async def test_retryable_step_exhausts_to_failed(fresh_db, monkeypatch):
|
||||||
|
pid = db.create_pipeline(track_id=1)
|
||||||
|
|
||||||
|
async def always_fail(step, p, ctx, feedback):
|
||||||
|
raise RuntimeError("permanent")
|
||||||
|
|
||||||
|
monkeypatch.setattr(orchestrator, "_dispatch_step", always_fail)
|
||||||
|
monkeypatch.setattr(
|
||||||
|
orchestrator, "_resolve_input",
|
||||||
|
lambda p: {"genre": "x", "title": "t", "moods": [], "tracks": [], "audio_path": "", "duration_sec": 0},
|
||||||
|
)
|
||||||
|
await orchestrator.run_step(pid, "cover")
|
||||||
|
assert db.get_pipeline(pid)["state"] == "failed"
|
||||||
|
|
||||||
|
|
||||||
|
async def test_publish_not_retried(fresh_db, monkeypatch):
|
||||||
|
pid = db.create_pipeline(track_id=1)
|
||||||
|
calls = {"n": 0}
|
||||||
|
|
||||||
|
async def fail_publish(step, p, ctx, feedback):
|
||||||
|
calls["n"] += 1
|
||||||
|
raise RuntimeError("upload error")
|
||||||
|
|
||||||
|
monkeypatch.setattr(orchestrator, "_dispatch_step", fail_publish)
|
||||||
|
monkeypatch.setattr(
|
||||||
|
orchestrator, "_resolve_input",
|
||||||
|
lambda p: {"genre": "x", "title": "t", "moods": [], "tracks": [], "audio_path": "", "duration_sec": 0},
|
||||||
|
)
|
||||||
|
await orchestrator.run_step(pid, "publish")
|
||||||
|
assert calls["n"] == 1
|
||||||
|
assert db.get_pipeline(pid)["state"] == "failed"
|
||||||
|
|
||||||
|
|
||||||
|
# ── Task 3: retry endpoint tests ─────────────────────────────────────────────
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def client(fresh_db):
|
||||||
|
from app.main import app
|
||||||
|
return TestClient(app)
|
||||||
|
|
||||||
|
|
||||||
|
def test_retry_failed_pipeline_retriggers(fresh_db, client, monkeypatch):
|
||||||
|
pid = db.create_pipeline(track_id=1)
|
||||||
|
job = db.create_pipeline_job(pid, "video")
|
||||||
|
db.update_pipeline_job(job, status="failed", error="boom")
|
||||||
|
db.update_pipeline_state(pid, "failed", failed_reason="video: boom")
|
||||||
|
called = {}
|
||||||
|
|
||||||
|
async def fake_run(p, step, *a):
|
||||||
|
called["pid"], called["step"] = p, step
|
||||||
|
|
||||||
|
monkeypatch.setattr(orchestrator, "run_step", fake_run)
|
||||||
|
r = client.post(f"/api/music/pipeline/{pid}/retry")
|
||||||
|
assert r.status_code in (200, 202)
|
||||||
|
assert r.json()["retrying_step"] == "video"
|
||||||
|
|
||||||
|
|
||||||
|
def test_retry_non_failed_409(fresh_db, client):
|
||||||
|
pid = db.create_pipeline(track_id=1) # state='created'
|
||||||
|
r = client.post(f"/api/music/pipeline/{pid}/retry")
|
||||||
|
assert r.status_code == 409
|
||||||
|
|
||||||
|
|
||||||
|
def test_retry_publish_with_video_id_rejected(fresh_db, client):
|
||||||
|
pid = db.create_pipeline(track_id=1)
|
||||||
|
job = db.create_pipeline_job(pid, "publish")
|
||||||
|
db.update_pipeline_job(job, status="failed", error="x")
|
||||||
|
db.update_pipeline_state(pid, "failed", failed_reason="publish: x",
|
||||||
|
youtube_video_id="abc123")
|
||||||
|
r = client.post(f"/api/music/pipeline/{pid}/retry")
|
||||||
|
assert r.status_code == 409
|
||||||
|
|
||||||
|
|
||||||
|
# ── Fix 2: fake_run 인자 검증 ────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def test_retry_failed_pipeline_retriggers_with_correct_args(fresh_db, client, monkeypatch):
|
||||||
|
"""fake_run이 (pid, failed_step)으로 호출되는지 검증."""
|
||||||
|
pid = db.create_pipeline(track_id=1)
|
||||||
|
job = db.create_pipeline_job(pid, "video")
|
||||||
|
db.update_pipeline_job(job, status="failed", error="boom")
|
||||||
|
db.update_pipeline_state(pid, "failed", failed_reason="video: boom")
|
||||||
|
called = {}
|
||||||
|
|
||||||
|
async def fake_run(p, step, *a):
|
||||||
|
called["pid"], called["step"] = p, step
|
||||||
|
|
||||||
|
monkeypatch.setattr(orchestrator, "run_step", fake_run)
|
||||||
|
r = client.post(f"/api/music/pipeline/{pid}/retry")
|
||||||
|
assert r.status_code in (200, 202)
|
||||||
|
assert called["pid"] == pid
|
||||||
|
assert called["step"] == "video"
|
||||||
|
|
||||||
|
|
||||||
|
# ── Fix 1: retrying 전이로 중복 retry 409 ────────────────────────────────────
|
||||||
|
|
||||||
|
def test_retry_twice_second_is_409(fresh_db, client, monkeypatch):
|
||||||
|
"""첫 번째 retry가 상태를 'retrying'으로 전이 → 두 번째 retry는 409."""
|
||||||
|
pid = db.create_pipeline(track_id=1)
|
||||||
|
job = db.create_pipeline_job(pid, "video")
|
||||||
|
db.update_pipeline_job(job, status="failed", error="boom")
|
||||||
|
db.update_pipeline_state(pid, "failed", failed_reason="video: boom")
|
||||||
|
|
||||||
|
async def fake_run(p, step, *a):
|
||||||
|
pass
|
||||||
|
|
||||||
|
monkeypatch.setattr(orchestrator, "run_step", fake_run)
|
||||||
|
r1 = client.post(f"/api/music/pipeline/{pid}/retry")
|
||||||
|
assert r1.status_code in (200, 202)
|
||||||
|
r2 = client.post(f"/api/music/pipeline/{pid}/retry") # 이미 retrying → 409
|
||||||
|
assert r2.status_code == 409
|
||||||
|
|
||||||
|
|
||||||
|
# ── Fix 3: 알 수 없는 step prefix → 409 ─────────────────────────────────────
|
||||||
|
|
||||||
|
def test_retry_unparseable_failed_reason_409(fresh_db, client):
|
||||||
|
"""failed_reason이 known STEPS에 없는 prefix면 409."""
|
||||||
|
pid = db.create_pipeline(track_id=1)
|
||||||
|
# failed job row 없이 state만 failed + 비-step prefix reason
|
||||||
|
db.update_pipeline_state(pid, "failed", failed_reason="ValueError: track 1 없음")
|
||||||
|
r = client.post(f"/api/music/pipeline/{pid}/retry")
|
||||||
|
assert r.status_code == 409
|
||||||
Reference in New Issue
Block a user