diff --git a/agent-office/app/agents/youtube_publisher.py b/agent-office/app/agents/youtube_publisher.py index 24ed747..ac93095 100644 --- a/agent-office/app/agents/youtube_publisher.py +++ b/agent-office/app/agents/youtube_publisher.py @@ -26,6 +26,7 @@ class YoutubePublisherAgent(BaseAgent): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._notified_state_per_pipeline: dict[int, tuple] = {} + self._notified_failed: set[int] = set() async def poll_state_changes(self) -> None: """주기적으로 호출되어 *_pending 신규 진입 시 텔레그램 발송.""" @@ -48,6 +49,32 @@ class YoutubePublisherAgent(BaseAgent): await self._notify_step(p) self._notified_state_per_pipeline[pid] = key + try: + failed = await service_proxy.list_failed_pipelines() + except Exception as e: + logger.warning("failed 폴링 실패: %s", e) + failed = [] + for p in failed: + pid = p.get("id") + if pid is None: + continue + if pid not in self._notified_failed: + await self._notify_failed(p) + self._notified_failed.add(pid) + # 재개되어 failed에서 벗어난 파이프라인은 재알림 가능하도록 해제 + failed_ids = {p.get("id") for p in failed} + self._notified_failed &= failed_ids + + async def _notify_failed(self, p: dict) -> None: + reason = p.get("failed_reason") or "?" + step = reason.split(":", 1)[0].strip() + title = p.get("track_title") or f"Pipeline #{p['id']}" + text = f"⚠️ [{title}] 파이프라인 #{p['id']} '{step}' 실패\n사유: {reason}" + kb = {"inline_keyboard": [[{"text": "🔄 재시도", "callback_data": f"ytpub_retry_{p['id']}"}]]} + sent = await send_raw(text=text, reply_markup=kb) + if sent.get("ok"): + add_log(self.agent_id, f"pipeline {p['id']} 실패 알림", "warning") + async def _notify_step(self, pipeline: dict) -> None: state = pipeline["state"] title_name, step = _STEP_TITLES[state] diff --git a/agent-office/tests/test_pipeline_polling.py b/agent-office/tests/test_pipeline_polling.py index 83eafc7..3008cec 100644 --- a/agent-office/tests/test_pipeline_polling.py +++ b/agent-office/tests/test_pipeline_polling.py @@ -40,6 +40,9 @@ async def test_poll_notifies_once_per_state(): with patch( "app.agents.youtube_publisher.service_proxy.list_active_pipelines", new=AsyncMock(return_value=pipelines), + ), patch( + "app.agents.youtube_publisher.service_proxy.list_failed_pipelines", + new=AsyncMock(return_value=[]), ), patch( "app.agents.youtube_publisher.send_raw", new=AsyncMock(return_value={"ok": True, "message_id": 99}), @@ -63,6 +66,8 @@ async def test_poll_renotifies_on_reject_regen(monkeypatch): "track_title": "Test", "feedback_count_per_step": {"cover": 1}}] list_mock = AsyncMock(side_effect=[pipelines_v1, pipelines_v2]) with patch("app.agents.youtube_publisher.service_proxy.list_active_pipelines", list_mock), \ + patch("app.agents.youtube_publisher.service_proxy.list_failed_pipelines", + new=AsyncMock(return_value=[])), \ patch("app.agents.youtube_publisher.send_raw", new=AsyncMock(return_value={"ok": True, "message_id": 99})), \ patch("app.agents.youtube_publisher.service_proxy.save_pipeline_telegram_msg", @@ -83,7 +88,7 @@ async def test_on_telegram_reply_approve_calls_feedback(): new=AsyncMock(), ) as mock_fb, patch( "app.agents.youtube_publisher.send_raw", - new=AsyncMock(), + new=AsyncMock(return_value={"ok": True, "message_id": 1}), ): a = YoutubePublisherAgent() await a.on_telegram_reply(pipeline_id=42, step="cover", user_text="승인") @@ -99,7 +104,7 @@ async def test_on_telegram_reply_reject_with_feedback(): new=AsyncMock(), ) as mock_fb, patch( "app.agents.youtube_publisher.send_raw", - new=AsyncMock(), + new=AsyncMock(return_value={"ok": True, "message_id": 1}), ): a = YoutubePublisherAgent() await a.on_telegram_reply(pipeline_id=43, step="meta", user_text="반려, 제목 짧게") diff --git a/agent-office/tests/test_youtube_publisher_retry.py b/agent-office/tests/test_youtube_publisher_retry.py new file mode 100644 index 0000000..e9c005c --- /dev/null +++ b/agent-office/tests/test_youtube_publisher_retry.py @@ -0,0 +1,181 @@ +import os +import sys +import tempfile + +_fd, _TMP = tempfile.mkstemp(suffix=".db") +os.close(_fd) +os.unlink(_TMP) +os.environ["AGENT_OFFICE_DB_PATH"] = _TMP + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +import pytest +from unittest.mock import AsyncMock, patch + + +@pytest.fixture(autouse=True) +def _init_db(): + import gc + gc.collect() + if os.path.exists(_TMP): + os.remove(_TMP) + from app.db import init_db + init_db() + yield + gc.collect() + + +@pytest.mark.asyncio +async def test_failed_pipeline_notified_with_retry_button(): + from app.agents.youtube_publisher import YoutubePublisherAgent + + agent = YoutubePublisherAgent() + failed_pipeline = { + "id": 7, + "state": "failed", + "failed_reason": "video: boom", + "track_title": "T", + } + sent = AsyncMock(return_value={"ok": True, "message_id": 1}) + + with patch( + "app.agents.youtube_publisher.service_proxy.list_active_pipelines", + new=AsyncMock(return_value=[]), + ), patch( + "app.agents.youtube_publisher.service_proxy.list_failed_pipelines", + new=AsyncMock(return_value=[failed_pipeline]), + ), patch( + "app.agents.youtube_publisher.send_raw", + new=sent, + ): + await agent.poll_state_changes() + + assert sent.await_count == 1 + _, kwargs = sent.await_args + assert "실패" in (kwargs.get("text") or "") + assert kwargs["reply_markup"]["inline_keyboard"][0][0]["callback_data"] == "ytpub_retry_7" + + +@pytest.mark.asyncio +async def test_failed_pipeline_no_duplicate_notification(): + """같은 failed 파이프라인은 두 번째 poll에서 알림 안 함.""" + from app.agents.youtube_publisher import YoutubePublisherAgent + + agent = YoutubePublisherAgent() + failed_pipeline = { + "id": 7, + "state": "failed", + "failed_reason": "video: boom", + "track_title": "T", + } + sent = AsyncMock(return_value={"ok": True, "message_id": 1}) + + with patch( + "app.agents.youtube_publisher.service_proxy.list_active_pipelines", + new=AsyncMock(return_value=[]), + ), patch( + "app.agents.youtube_publisher.service_proxy.list_failed_pipelines", + new=AsyncMock(return_value=[failed_pipeline]), + ), patch( + "app.agents.youtube_publisher.send_raw", + new=sent, + ): + await agent.poll_state_changes() + await agent.poll_state_changes() + + # 중복 방지: 같은 failed 파이프라인에 대해 1회만 알림 + assert sent.await_count == 1 + + +@pytest.mark.asyncio +async def test_failed_pipeline_renotify_after_recovery(): + """failed에서 벗어난 파이프라인이 다시 failed 되면 재알림.""" + from app.agents.youtube_publisher import YoutubePublisherAgent + + agent = YoutubePublisherAgent() + failed_pipeline = { + "id": 7, + "state": "failed", + "failed_reason": "video: boom", + "track_title": "T", + } + sent = AsyncMock(return_value={"ok": True, "message_id": 1}) + + # 첫 번째 poll: failed 존재 → 알림 + with patch( + "app.agents.youtube_publisher.service_proxy.list_active_pipelines", + new=AsyncMock(return_value=[]), + ), patch( + "app.agents.youtube_publisher.service_proxy.list_failed_pipelines", + new=AsyncMock(return_value=[failed_pipeline]), + ), patch( + "app.agents.youtube_publisher.send_raw", + new=sent, + ): + await agent.poll_state_changes() + + assert sent.await_count == 1 + + # 두 번째 poll: failed 목록에서 사라짐(재개됨) → _notified_failed에서 제거 + with patch( + "app.agents.youtube_publisher.service_proxy.list_active_pipelines", + new=AsyncMock(return_value=[]), + ), patch( + "app.agents.youtube_publisher.service_proxy.list_failed_pipelines", + new=AsyncMock(return_value=[]), + ), patch( + "app.agents.youtube_publisher.send_raw", + new=sent, + ): + await agent.poll_state_changes() + + assert sent.await_count == 1 # 아직 추가 알림 없음 + + # 세 번째 poll: 다시 failed → 재알림 가능 + with patch( + "app.agents.youtube_publisher.service_proxy.list_active_pipelines", + new=AsyncMock(return_value=[]), + ), patch( + "app.agents.youtube_publisher.service_proxy.list_failed_pipelines", + new=AsyncMock(return_value=[failed_pipeline]), + ), patch( + "app.agents.youtube_publisher.send_raw", + new=sent, + ): + await agent.poll_state_changes() + + assert sent.await_count == 2 # 재알림 + + +@pytest.mark.asyncio +async def test_failed_poll_exception_is_silent(): + """list_failed_pipelines 예외 시 poll이 조용히 넘어감 (active 알림에 영향 없음).""" + from app.agents.youtube_publisher import YoutubePublisherAgent + + agent = YoutubePublisherAgent() + active_pipeline = { + "id": 1, + "state": "cover_pending", + "cover_url": "/x.jpg", + "track_title": "Track", + "feedback_count_per_step": {}, + } + sent = AsyncMock(return_value={"ok": True, "message_id": 1}) + + with patch( + "app.agents.youtube_publisher.service_proxy.list_active_pipelines", + new=AsyncMock(return_value=[active_pipeline]), + ), patch( + "app.agents.youtube_publisher.service_proxy.list_failed_pipelines", + new=AsyncMock(side_effect=Exception("network error")), + ), patch( + "app.agents.youtube_publisher.service_proxy.save_pipeline_telegram_msg", + new=AsyncMock(), + ), patch( + "app.agents.youtube_publisher.send_raw", + new=sent, + ): + await agent.poll_state_changes() + + # active 알림은 정상 발송 + assert sent.await_count == 1