182 lines
5.6 KiB
Python
182 lines
5.6 KiB
Python
import os
|
|
import sys
|
|
import tempfile
|
|
|
|
_fd, _TMP = tempfile.mkstemp(suffix=".db")
|
|
os.close(_fd)
|
|
os.unlink(_TMP)
|
|
os.environ["AGENT_OFFICE_DB_PATH"] = _TMP
|
|
|
|
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
|
|
import pytest
|
|
from unittest.mock import AsyncMock, patch
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def _init_db():
|
|
import gc
|
|
gc.collect()
|
|
if os.path.exists(_TMP):
|
|
os.remove(_TMP)
|
|
from app.db import init_db
|
|
init_db()
|
|
yield
|
|
gc.collect()
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_failed_pipeline_notified_with_retry_button():
|
|
from app.agents.youtube_publisher import YoutubePublisherAgent
|
|
|
|
agent = YoutubePublisherAgent()
|
|
failed_pipeline = {
|
|
"id": 7,
|
|
"state": "failed",
|
|
"failed_reason": "video: boom",
|
|
"track_title": "T",
|
|
}
|
|
sent = AsyncMock(return_value={"ok": True, "message_id": 1})
|
|
|
|
with patch(
|
|
"app.agents.youtube_publisher.service_proxy.list_active_pipelines",
|
|
new=AsyncMock(return_value=[]),
|
|
), patch(
|
|
"app.agents.youtube_publisher.service_proxy.list_failed_pipelines",
|
|
new=AsyncMock(return_value=[failed_pipeline]),
|
|
), patch(
|
|
"app.agents.youtube_publisher.send_raw",
|
|
new=sent,
|
|
):
|
|
await agent.poll_state_changes()
|
|
|
|
assert sent.await_count == 1
|
|
_, kwargs = sent.await_args
|
|
assert "실패" in (kwargs.get("text") or "")
|
|
assert kwargs["reply_markup"]["inline_keyboard"][0][0]["callback_data"] == "ytpub_retry_7"
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_failed_pipeline_no_duplicate_notification():
|
|
"""같은 failed 파이프라인은 두 번째 poll에서 알림 안 함."""
|
|
from app.agents.youtube_publisher import YoutubePublisherAgent
|
|
|
|
agent = YoutubePublisherAgent()
|
|
failed_pipeline = {
|
|
"id": 7,
|
|
"state": "failed",
|
|
"failed_reason": "video: boom",
|
|
"track_title": "T",
|
|
}
|
|
sent = AsyncMock(return_value={"ok": True, "message_id": 1})
|
|
|
|
with patch(
|
|
"app.agents.youtube_publisher.service_proxy.list_active_pipelines",
|
|
new=AsyncMock(return_value=[]),
|
|
), patch(
|
|
"app.agents.youtube_publisher.service_proxy.list_failed_pipelines",
|
|
new=AsyncMock(return_value=[failed_pipeline]),
|
|
), patch(
|
|
"app.agents.youtube_publisher.send_raw",
|
|
new=sent,
|
|
):
|
|
await agent.poll_state_changes()
|
|
await agent.poll_state_changes()
|
|
|
|
# 중복 방지: 같은 failed 파이프라인에 대해 1회만 알림
|
|
assert sent.await_count == 1
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_failed_pipeline_renotify_after_recovery():
|
|
"""failed에서 벗어난 파이프라인이 다시 failed 되면 재알림."""
|
|
from app.agents.youtube_publisher import YoutubePublisherAgent
|
|
|
|
agent = YoutubePublisherAgent()
|
|
failed_pipeline = {
|
|
"id": 7,
|
|
"state": "failed",
|
|
"failed_reason": "video: boom",
|
|
"track_title": "T",
|
|
}
|
|
sent = AsyncMock(return_value={"ok": True, "message_id": 1})
|
|
|
|
# 첫 번째 poll: failed 존재 → 알림
|
|
with patch(
|
|
"app.agents.youtube_publisher.service_proxy.list_active_pipelines",
|
|
new=AsyncMock(return_value=[]),
|
|
), patch(
|
|
"app.agents.youtube_publisher.service_proxy.list_failed_pipelines",
|
|
new=AsyncMock(return_value=[failed_pipeline]),
|
|
), patch(
|
|
"app.agents.youtube_publisher.send_raw",
|
|
new=sent,
|
|
):
|
|
await agent.poll_state_changes()
|
|
|
|
assert sent.await_count == 1
|
|
|
|
# 두 번째 poll: failed 목록에서 사라짐(재개됨) → _notified_failed에서 제거
|
|
with patch(
|
|
"app.agents.youtube_publisher.service_proxy.list_active_pipelines",
|
|
new=AsyncMock(return_value=[]),
|
|
), patch(
|
|
"app.agents.youtube_publisher.service_proxy.list_failed_pipelines",
|
|
new=AsyncMock(return_value=[]),
|
|
), patch(
|
|
"app.agents.youtube_publisher.send_raw",
|
|
new=sent,
|
|
):
|
|
await agent.poll_state_changes()
|
|
|
|
assert sent.await_count == 1 # 아직 추가 알림 없음
|
|
|
|
# 세 번째 poll: 다시 failed → 재알림 가능
|
|
with patch(
|
|
"app.agents.youtube_publisher.service_proxy.list_active_pipelines",
|
|
new=AsyncMock(return_value=[]),
|
|
), patch(
|
|
"app.agents.youtube_publisher.service_proxy.list_failed_pipelines",
|
|
new=AsyncMock(return_value=[failed_pipeline]),
|
|
), patch(
|
|
"app.agents.youtube_publisher.send_raw",
|
|
new=sent,
|
|
):
|
|
await agent.poll_state_changes()
|
|
|
|
assert sent.await_count == 2 # 재알림
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_failed_poll_exception_is_silent():
|
|
"""list_failed_pipelines 예외 시 poll이 조용히 넘어감 (active 알림에 영향 없음)."""
|
|
from app.agents.youtube_publisher import YoutubePublisherAgent
|
|
|
|
agent = YoutubePublisherAgent()
|
|
active_pipeline = {
|
|
"id": 1,
|
|
"state": "cover_pending",
|
|
"cover_url": "/x.jpg",
|
|
"track_title": "Track",
|
|
"feedback_count_per_step": {},
|
|
}
|
|
sent = AsyncMock(return_value={"ok": True, "message_id": 1})
|
|
|
|
with patch(
|
|
"app.agents.youtube_publisher.service_proxy.list_active_pipelines",
|
|
new=AsyncMock(return_value=[active_pipeline]),
|
|
), patch(
|
|
"app.agents.youtube_publisher.service_proxy.list_failed_pipelines",
|
|
new=AsyncMock(side_effect=Exception("network error")),
|
|
), patch(
|
|
"app.agents.youtube_publisher.service_proxy.save_pipeline_telegram_msg",
|
|
new=AsyncMock(),
|
|
), patch(
|
|
"app.agents.youtube_publisher.send_raw",
|
|
new=sent,
|
|
):
|
|
await agent.poll_state_changes()
|
|
|
|
# active 알림은 정상 발송
|
|
assert sent.await_count == 1
|