"""Tests for failed-pipeline notifications and the ``ytpub_retry`` callback.

The module points ``AGENT_OFFICE_DB_PATH`` at a private temporary path before
any ``app`` module is imported, and an autouse fixture recreates the database
at that path for every test.
"""
import contextlib
import os
import sys
import tempfile

# Reserve a unique file name, then delete the placeholder so the database is
# created from scratch at that path.
_fd, _TMP = tempfile.mkstemp(suffix=".db")
os.close(_fd)
os.unlink(_TMP)
os.environ["AGENT_OFFICE_DB_PATH"] = _TMP
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

import pytest
from unittest.mock import AsyncMock, patch

# Dotted path of the module whose collaborators the agent tests replace.
_PUBLISHER = "app.agents.youtube_publisher"

# The main database file plus its WAL sidecar files.
_DB_FILE_SUFFIXES = ("", "-wal", "-shm")


@pytest.fixture(autouse=True)
def _init_db(monkeypatch):
    """Give every test a freshly initialised database at ``_TMP``."""
    import gc

    # NOTE(review): presumably releases lingering DB handles before the files
    # are removed -- confirm.
    gc.collect()

    # config.DB_PATH is fixed once, at first import, so when this file runs
    # together with other test files the db module may point at a path other
    # than this file's _TMP. Force db.DB_PATH to this file's own path so that
    # persistent tables (notified_failed_pipelines etc.) cannot leak between
    # tests.
    import app.db as _db

    monkeypatch.setattr(_db, "DB_PATH", _TMP)

    # The WAL sidecars (-wal/-shm) must go as well, otherwise persistent state
    # survives the reset.
    for suffix in _DB_FILE_SUFFIXES:
        with contextlib.suppress(FileNotFoundError):
            os.remove(_TMP + suffix)

    _db.init_db()
    yield
    gc.collect()


def _failed_pipeline(pipeline_id=7, reason="video: boom", title="T"):
    """Build the payload of a pipeline in the ``failed`` state."""
    return {
        "id": pipeline_id,
        "state": "failed",
        "failed_reason": reason,
        "track_title": title,
    }


def _send_mock():
    """Return an ``AsyncMock`` stand-in for ``send_raw`` that reports success."""
    return AsyncMock(return_value={"ok": True, "message_id": 1})


def _returning(value):
    """Return an ``AsyncMock`` whose awaited result is always *value*."""
    return AsyncMock(return_value=value)


@contextlib.contextmanager
def _publisher_patched(send, *, list_failed, list_active=None, **proxy_extras):
    """Patch the publisher module's collaborators for the ``with`` body.

    Args:
        send: replacement for ``app.agents.youtube_publisher.send_raw``.
        list_failed: replacement for ``service_proxy.list_failed_pipelines``.
        list_active: replacement for ``service_proxy.list_active_pipelines``;
            defaults to a mock that returns an empty list.
        **proxy_extras: further ``service_proxy`` attributes to replace,
            keyed by attribute name.
    """
    if list_active is None:
        list_active = _returning([])

    proxy = f"{_PUBLISHER}.service_proxy"
    replacements = [
        (f"{proxy}.list_active_pipelines", list_active),
        (f"{proxy}.list_failed_pipelines", list_failed),
    ]
    replacements.extend(
        (f"{proxy}.{name}", mock) for name, mock in proxy_extras.items()
    )
    replacements.append((f"{_PUBLISHER}.send_raw", send))

    with contextlib.ExitStack() as stack:
        for target, replacement in replacements:
            stack.enter_context(patch(target, new=replacement))
        yield


@contextlib.contextmanager
def _telegram_patched():
    """Patch the Telegram send/API calls used by the webhook handler tests."""
    send_patch = patch(
        "app.telegram.messaging.send_raw", AsyncMock(return_value={"ok": True})
    )
    api_patch = patch(
        "app.telegram.webhook.api_call", AsyncMock(return_value={"ok": True})
    )
    with send_patch, api_patch:
        yield


@pytest.mark.asyncio
async def test_failed_pipeline_notified_with_retry_button():
    """A failed pipeline triggers one notification carrying a retry button."""
    from app.agents.youtube_publisher import YoutubePublisherAgent

    agent = YoutubePublisherAgent()
    sent = _send_mock()

    with _publisher_patched(sent, list_failed=_returning([_failed_pipeline()])):
        await agent.poll_state_changes()

    assert sent.await_count == 1
    kwargs = sent.await_args.kwargs
    assert "실패" in (kwargs.get("text") or "")
    first_button = kwargs["reply_markup"]["inline_keyboard"][0][0]
    assert first_button["callback_data"] == "ytpub_retry_7"


@pytest.mark.asyncio
async def test_failed_pipeline_no_duplicate_notification():
    """The same failed pipeline is not notified again on the second poll."""
    from app.agents.youtube_publisher import YoutubePublisherAgent

    agent = YoutubePublisherAgent()
    sent = _send_mock()

    with _publisher_patched(sent, list_failed=_returning([_failed_pipeline()])):
        await agent.poll_state_changes()
        await agent.poll_state_changes()

    # De-duplication: only one notification for the same failed pipeline.
    assert sent.await_count == 1


@pytest.mark.asyncio
async def test_failed_pipeline_renotify_after_recovery():
    """A pipeline that left ``failed`` and fails again is notified again."""
    from app.agents.youtube_publisher import YoutubePublisherAgent

    agent = YoutubePublisherAgent()
    failed = _failed_pipeline()
    sent = _send_mock()

    # First poll: the failed pipeline is present -> notification.
    with _publisher_patched(sent, list_failed=_returning([failed])):
        await agent.poll_state_changes()
    assert sent.await_count == 1

    # Second poll: it disappeared from the failed list (resumed) -> it is
    # removed from _notified_failed.
    with _publisher_patched(sent, list_failed=_returning([])):
        await agent.poll_state_changes()
    assert sent.await_count == 1  # no additional notification yet

    # Third poll: failed again -> may be notified again.
    with _publisher_patched(sent, list_failed=_returning([failed])):
        await agent.poll_state_changes()
    assert sent.await_count == 2  # re-notified


@pytest.mark.asyncio
async def test_handle_ytpub_retry_calls_proxy():
    """A valid ``ytpub_retry_<id>`` callback calls ``pipeline_retry(<id>)``."""
    from app import service_proxy
    from app.telegram import webhook

    retry = AsyncMock(
        return_value={"status_code": 202, "ok": True, "retrying_step": "video"}
    )

    with patch.object(service_proxy, "pipeline_retry", retry), _telegram_patched():
        res = await webhook._handle_ytpub_retry({"id": 1}, "ytpub_retry_7")

    retry.assert_awaited_once_with(7)
    assert res["ok"] is True


@pytest.mark.asyncio
async def test_handle_ytpub_retry_invalid_data():
    """Callback data with a non-numeric id yields a result with ``ok`` False."""
    from app.telegram import webhook

    with _telegram_patched():
        res = await webhook._handle_ytpub_retry({"id": 1}, "ytpub_retry_abc")

    assert res["ok"] is False


@pytest.mark.asyncio
async def test_failed_poll_exception_is_silent():
    """An exception from list_failed_pipelines is swallowed by the poll.

    Notifications for active pipelines must not be affected.
    """
    from app.agents.youtube_publisher import YoutubePublisherAgent

    agent = YoutubePublisherAgent()
    active_pipeline = {
        "id": 1,
        "state": "cover_pending",
        "cover_url": "/x.jpg",
        "track_title": "Track",
        "feedback_count_per_step": {},
    }
    sent = _send_mock()

    with _publisher_patched(
        sent,
        list_active=_returning([active_pipeline]),
        list_failed=AsyncMock(side_effect=Exception("network error")),
        save_pipeline_telegram_msg=AsyncMock(),
    ):
        await agent.poll_state_changes()

    # The notification for the active pipeline is still sent.
    assert sent.await_count == 1


@pytest.mark.asyncio
async def test_failed_notification_persists_across_restart():
    """An already-notified failure is not re-sent by a new agent instance.

    A new instance stands in for a container restart.
    """
    from app.agents.youtube_publisher import YoutubePublisherAgent

    failed = _failed_pipeline(3, "video: timeout", "beat music v2")
    sent = _send_mock()

    with _publisher_patched(sent, list_failed=_returning([failed])):
        agent1 = YoutubePublisherAgent()
        await agent1.poll_state_changes()

        # Simulate a container restart: a brand-new instance, so all
        # in-memory state is lost.
        agent2 = YoutubePublisherAgent()
        await agent2.poll_state_changes()

    # The DB ledger prevents the duplicate across the restart -> one
    # notification only.
    assert sent.await_count == 1


@pytest.mark.asyncio
async def test_transient_failed_poll_keeps_ledger():
    """A transient failed-poll exception must not clear the ledger.

    The next successful poll therefore does not notify again.
    """
    from app.agents.youtube_publisher import YoutubePublisherAgent

    failed = _failed_pipeline(3, "video: timeout", "beat music v2")
    list_failed = AsyncMock(side_effect=[[failed], Exception("boom"), [failed]])
    sent = _send_mock()

    with _publisher_patched(sent, list_failed=list_failed):
        agent = YoutubePublisherAgent()
        await agent.poll_state_changes()  # first notification for #3
        await agent.poll_state_changes()  # exception -> ledger must be kept
        await agent.poll_state_changes()  # #3 still failed -> no re-notify

    assert sent.await_count == 1