Files
web-page-backend/agent-office/tests/test_youtube_publisher_retry.py
gahusb 7cce5c422f fix(agent-office): 파이프라인 실패 알림 dedup을 DB 영속화 (재시작 재알림 스팸 해소)
youtube_publisher._notified_failed(인메모리 set)가 컨테이너 재시작 시 소실되어
기존 failed 파이프라인(예: video 인코딩 구버전 실패 #3)을 매 재시작마다 "신규"로
재알림하던 스팸 버그를 notified_failed_pipelines 테이블로 영속화해 해결.

부수 버그 fix: failed 폴링이 예외를 던지면 failed=[]로 오해해 원장을 통째로
비우던 코드 → 예외 시 early-return(원장 보존).

진행 중 *_pending 승인 dedup(_notified_state_per_pipeline)은 의도적으로 인메모리
유지(재시작 시 살아있는 파이프라인 승인 재알림은 유용한 리마인더).

테스트: 재시작 지속성 + 일시적 폴링 예외 재현 테스트 2종 추가(TDD Red→Green).
DB_PATH 첫 import 고정으로 인한 테스트 간 영속 테이블 누수를 monkeypatch로 격리.
agent-office 전체 140개 통과.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01EqCYBhvTcdeCTUDX3RhWx9
2026-07-01 15:20:07 +09:00

288 lines
9.7 KiB
Python

import os
import sys
import tempfile
_fd, _TMP = tempfile.mkstemp(suffix=".db")
os.close(_fd)
os.unlink(_TMP)
os.environ["AGENT_OFFICE_DB_PATH"] = _TMP
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import pytest
from unittest.mock import AsyncMock, patch
@pytest.fixture(autouse=True)
def _init_db(monkeypatch):
import gc
gc.collect()
# config.DB_PATH는 첫 import 시 1회 고정되므로, 다른 테스트 파일과 조합 실행 시
# db가 이 파일의 _TMP가 아닌 다른 경로를 쓸 수 있다. db.DB_PATH를 이 파일 전용으로
# 강제해 영속 테이블(notified_failed_pipelines 등)의 테스트 간 누수를 결정적으로 차단.
import app.db as _db
monkeypatch.setattr(_db, "DB_PATH", _TMP)
# WAL 사이드카(-wal/-shm)까지 지워야 영속 상태가 남지 않음
for suffix in ("", "-wal", "-shm"):
p = _TMP + suffix
if os.path.exists(p):
os.remove(p)
_db.init_db()
yield
gc.collect()
@pytest.mark.asyncio
async def test_failed_pipeline_notified_with_retry_button():
from app.agents.youtube_publisher import YoutubePublisherAgent
agent = YoutubePublisherAgent()
failed_pipeline = {
"id": 7,
"state": "failed",
"failed_reason": "video: boom",
"track_title": "T",
}
sent = AsyncMock(return_value={"ok": True, "message_id": 1})
with patch(
"app.agents.youtube_publisher.service_proxy.list_active_pipelines",
new=AsyncMock(return_value=[]),
), patch(
"app.agents.youtube_publisher.service_proxy.list_failed_pipelines",
new=AsyncMock(return_value=[failed_pipeline]),
), patch(
"app.agents.youtube_publisher.send_raw",
new=sent,
):
await agent.poll_state_changes()
assert sent.await_count == 1
_, kwargs = sent.await_args
assert "실패" in (kwargs.get("text") or "")
assert kwargs["reply_markup"]["inline_keyboard"][0][0]["callback_data"] == "ytpub_retry_7"
@pytest.mark.asyncio
async def test_failed_pipeline_no_duplicate_notification():
"""같은 failed 파이프라인은 두 번째 poll에서 알림 안 함."""
from app.agents.youtube_publisher import YoutubePublisherAgent
agent = YoutubePublisherAgent()
failed_pipeline = {
"id": 7,
"state": "failed",
"failed_reason": "video: boom",
"track_title": "T",
}
sent = AsyncMock(return_value={"ok": True, "message_id": 1})
with patch(
"app.agents.youtube_publisher.service_proxy.list_active_pipelines",
new=AsyncMock(return_value=[]),
), patch(
"app.agents.youtube_publisher.service_proxy.list_failed_pipelines",
new=AsyncMock(return_value=[failed_pipeline]),
), patch(
"app.agents.youtube_publisher.send_raw",
new=sent,
):
await agent.poll_state_changes()
await agent.poll_state_changes()
# 중복 방지: 같은 failed 파이프라인에 대해 1회만 알림
assert sent.await_count == 1
@pytest.mark.asyncio
async def test_failed_pipeline_renotify_after_recovery():
"""failed에서 벗어난 파이프라인이 다시 failed 되면 재알림."""
from app.agents.youtube_publisher import YoutubePublisherAgent
agent = YoutubePublisherAgent()
failed_pipeline = {
"id": 7,
"state": "failed",
"failed_reason": "video: boom",
"track_title": "T",
}
sent = AsyncMock(return_value={"ok": True, "message_id": 1})
# 첫 번째 poll: failed 존재 → 알림
with patch(
"app.agents.youtube_publisher.service_proxy.list_active_pipelines",
new=AsyncMock(return_value=[]),
), patch(
"app.agents.youtube_publisher.service_proxy.list_failed_pipelines",
new=AsyncMock(return_value=[failed_pipeline]),
), patch(
"app.agents.youtube_publisher.send_raw",
new=sent,
):
await agent.poll_state_changes()
assert sent.await_count == 1
# 두 번째 poll: failed 목록에서 사라짐(재개됨) → _notified_failed에서 제거
with patch(
"app.agents.youtube_publisher.service_proxy.list_active_pipelines",
new=AsyncMock(return_value=[]),
), patch(
"app.agents.youtube_publisher.service_proxy.list_failed_pipelines",
new=AsyncMock(return_value=[]),
), patch(
"app.agents.youtube_publisher.send_raw",
new=sent,
):
await agent.poll_state_changes()
assert sent.await_count == 1 # 아직 추가 알림 없음
# 세 번째 poll: 다시 failed → 재알림 가능
with patch(
"app.agents.youtube_publisher.service_proxy.list_active_pipelines",
new=AsyncMock(return_value=[]),
), patch(
"app.agents.youtube_publisher.service_proxy.list_failed_pipelines",
new=AsyncMock(return_value=[failed_pipeline]),
), patch(
"app.agents.youtube_publisher.send_raw",
new=sent,
):
await agent.poll_state_changes()
assert sent.await_count == 2 # 재알림
@pytest.mark.asyncio
async def test_handle_ytpub_retry_calls_proxy():
from app import service_proxy
from app.telegram import webhook
retry = AsyncMock(return_value={"status_code": 202, "ok": True, "retrying_step": "video"})
fake_send = AsyncMock(return_value={"ok": True})
fake_api_call = AsyncMock(return_value={"ok": True})
with patch.object(service_proxy, "pipeline_retry", retry), \
patch("app.telegram.messaging.send_raw", fake_send), \
patch("app.telegram.webhook.api_call", fake_api_call):
res = await webhook._handle_ytpub_retry({"id": 1}, "ytpub_retry_7")
retry.assert_awaited_once_with(7)
assert res["ok"] is True
@pytest.mark.asyncio
async def test_handle_ytpub_retry_invalid_data():
from app.telegram import webhook
fake_send = AsyncMock(return_value={"ok": True})
fake_api_call = AsyncMock(return_value={"ok": True})
with patch("app.telegram.messaging.send_raw", fake_send), \
patch("app.telegram.webhook.api_call", fake_api_call):
res = await webhook._handle_ytpub_retry({"id": 1}, "ytpub_retry_abc")
assert res["ok"] is False
@pytest.mark.asyncio
async def test_failed_poll_exception_is_silent():
"""list_failed_pipelines 예외 시 poll이 조용히 넘어감 (active 알림에 영향 없음)."""
from app.agents.youtube_publisher import YoutubePublisherAgent
agent = YoutubePublisherAgent()
active_pipeline = {
"id": 1,
"state": "cover_pending",
"cover_url": "/x.jpg",
"track_title": "Track",
"feedback_count_per_step": {},
}
sent = AsyncMock(return_value={"ok": True, "message_id": 1})
with patch(
"app.agents.youtube_publisher.service_proxy.list_active_pipelines",
new=AsyncMock(return_value=[active_pipeline]),
), patch(
"app.agents.youtube_publisher.service_proxy.list_failed_pipelines",
new=AsyncMock(side_effect=Exception("network error")),
), patch(
"app.agents.youtube_publisher.service_proxy.save_pipeline_telegram_msg",
new=AsyncMock(),
), patch(
"app.agents.youtube_publisher.send_raw",
new=sent,
):
await agent.poll_state_changes()
# active 알림은 정상 발송
assert sent.await_count == 1
@pytest.mark.asyncio
async def test_failed_notification_persists_across_restart():
"""컨테이너 재시작(새 에이전트 인스턴스)해도 이미 알린 failed는 재알림하지 않음."""
from app.agents.youtube_publisher import YoutubePublisherAgent
failed_pipeline = {
"id": 3,
"state": "failed",
"failed_reason": "video: timeout",
"track_title": "beat music v2",
}
sent = AsyncMock(return_value={"ok": True, "message_id": 1})
with patch(
"app.agents.youtube_publisher.service_proxy.list_active_pipelines",
new=AsyncMock(return_value=[]),
), patch(
"app.agents.youtube_publisher.service_proxy.list_failed_pipelines",
new=AsyncMock(return_value=[failed_pipeline]),
), patch(
"app.agents.youtube_publisher.send_raw",
new=sent,
):
agent1 = YoutubePublisherAgent()
await agent1.poll_state_changes()
# 컨테이너 재시작 시뮬레이션: 완전히 새로운 인스턴스(인메모리 상태 소실)
agent2 = YoutubePublisherAgent()
await agent2.poll_state_changes()
# 재시작해도 DB 원장으로 중복 방지 → 1회만 알림
assert sent.await_count == 1
@pytest.mark.asyncio
async def test_transient_failed_poll_keeps_ledger():
"""failed 폴링이 일시적으로 예외를 던져도 원장을 비우지 않아 다음 폴링에서 재알림하지 않음."""
from app.agents.youtube_publisher import YoutubePublisherAgent
failed_pipeline = {
"id": 3,
"state": "failed",
"failed_reason": "video: timeout",
"track_title": "beat music v2",
}
list_failed = AsyncMock(
side_effect=[[failed_pipeline], Exception("boom"), [failed_pipeline]]
)
sent = AsyncMock(return_value={"ok": True, "message_id": 1})
with patch(
"app.agents.youtube_publisher.service_proxy.list_active_pipelines",
new=AsyncMock(return_value=[]),
), patch(
"app.agents.youtube_publisher.service_proxy.list_failed_pipelines",
new=list_failed,
), patch(
"app.agents.youtube_publisher.send_raw",
new=sent,
):
agent = YoutubePublisherAgent()
await agent.poll_state_changes() # #3 최초 알림
await agent.poll_state_changes() # 예외 → 원장 유지되어야 (섣부른 정리 금지)
await agent.poll_state_changes() # #3 여전히 failed → 재알림 없어야
assert sent.await_count == 1