버그1: /feedback approve가 bg task 시작 전에 state를 next_pending으로 set → polling이 빈 video_url로 알림 발송. bg task의 run_step이 state를 set하도록 일임 — 이중 update 제거. 버그2: reject 후 같은 *_pending 상태로 재생성됐을 때 dedupe에 막혀 알림이 안 감. dedupe 키에 feedback_count_per_step[step]을 포함 — 재생성마다 count가 증가하므로 키가 달라져 재알림 동작.
133 lines
4.4 KiB
Python
133 lines
4.4 KiB
Python
import os
|
|
import sys
|
|
import tempfile
|
|
|
|
_fd, _TMP = tempfile.mkstemp(suffix=".db")
|
|
os.close(_fd)
|
|
os.unlink(_TMP)
|
|
os.environ["AGENT_OFFICE_DB_PATH"] = _TMP
|
|
|
|
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
|
|
|
|
import pytest
|
|
from unittest.mock import AsyncMock, patch
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def _init_db():
|
|
import gc
|
|
gc.collect()
|
|
if os.path.exists(_TMP):
|
|
os.remove(_TMP)
|
|
from app.db import init_db
|
|
init_db()
|
|
yield
|
|
gc.collect()
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_poll_notifies_once_per_state():
|
|
from app.agents.youtube_publisher import YoutubePublisherAgent
|
|
|
|
pipelines = [{
|
|
"id": 1,
|
|
"state": "cover_pending",
|
|
"cover_url": "/x.jpg",
|
|
"track_title": "Test",
|
|
"feedback_count_per_step": {},
|
|
}]
|
|
with patch(
|
|
"app.agents.youtube_publisher.service_proxy.list_active_pipelines",
|
|
new=AsyncMock(return_value=pipelines),
|
|
), patch(
|
|
"app.agents.youtube_publisher.send_raw",
|
|
new=AsyncMock(return_value={"ok": True, "message_id": 99}),
|
|
) as mock_send, patch(
|
|
"app.agents.youtube_publisher.service_proxy.save_pipeline_telegram_msg",
|
|
new=AsyncMock(),
|
|
):
|
|
a = YoutubePublisherAgent()
|
|
await a.poll_state_changes()
|
|
await a.poll_state_changes() # 같은 상태 — 두 번째는 알림 안 함
|
|
assert mock_send.call_count == 1
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_poll_renotifies_on_reject_regen(monkeypatch):
|
|
from app.agents.youtube_publisher import YoutubePublisherAgent
|
|
|
|
pipelines_v1 = [{"id": 1, "state": "cover_pending", "cover_url": "/x.jpg",
|
|
"track_title": "Test", "feedback_count_per_step": {}}]
|
|
pipelines_v2 = [{"id": 1, "state": "cover_pending", "cover_url": "/x2.jpg",
|
|
"track_title": "Test", "feedback_count_per_step": {"cover": 1}}]
|
|
list_mock = AsyncMock(side_effect=[pipelines_v1, pipelines_v2])
|
|
with patch("app.agents.youtube_publisher.service_proxy.list_active_pipelines", list_mock), \
|
|
patch("app.agents.youtube_publisher.send_raw",
|
|
new=AsyncMock(return_value={"ok": True, "message_id": 99})), \
|
|
patch("app.agents.youtube_publisher.service_proxy.save_pipeline_telegram_msg",
|
|
new=AsyncMock()):
|
|
a = YoutubePublisherAgent()
|
|
await a.poll_state_changes() # 1st: notify
|
|
await a.poll_state_changes() # 2nd: feedback count differs → notify again
|
|
from app.agents.youtube_publisher import send_raw as sr
|
|
assert sr.call_count == 2
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_on_telegram_reply_approve_calls_feedback():
|
|
from app.agents.youtube_publisher import YoutubePublisherAgent
|
|
|
|
with patch(
|
|
"app.agents.youtube_publisher.service_proxy.post_pipeline_feedback",
|
|
new=AsyncMock(),
|
|
) as mock_fb, patch(
|
|
"app.agents.youtube_publisher.send_raw",
|
|
new=AsyncMock(),
|
|
):
|
|
a = YoutubePublisherAgent()
|
|
await a.on_telegram_reply(pipeline_id=42, step="cover", user_text="승인")
|
|
mock_fb.assert_called_once_with(42, "cover", "approve", None)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_on_telegram_reply_reject_with_feedback():
|
|
from app.agents.youtube_publisher import YoutubePublisherAgent
|
|
|
|
with patch(
|
|
"app.agents.youtube_publisher.service_proxy.post_pipeline_feedback",
|
|
new=AsyncMock(),
|
|
) as mock_fb, patch(
|
|
"app.agents.youtube_publisher.send_raw",
|
|
new=AsyncMock(),
|
|
):
|
|
a = YoutubePublisherAgent()
|
|
await a.on_telegram_reply(pipeline_id=43, step="meta", user_text="반려, 제목 짧게")
|
|
args = mock_fb.call_args[0]
|
|
assert args[0] == 43
|
|
assert args[1] == "meta"
|
|
assert args[2] == "reject"
|
|
assert "제목 짧게" in (args[3] or "")
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_on_telegram_reply_unclear_asks_again():
|
|
from app.agents.youtube_publisher import YoutubePublisherAgent
|
|
|
|
sent = []
|
|
|
|
async def mock_send(text=None, **kw):
|
|
sent.append(text)
|
|
return {"ok": True, "message_id": 1}
|
|
|
|
with patch(
|
|
"app.agents.youtube_publisher.send_raw",
|
|
new=mock_send,
|
|
), patch(
|
|
"app.agents.youtube_publisher.classify_intent.classify",
|
|
return_value=("unclear", None),
|
|
):
|
|
a = YoutubePublisherAgent()
|
|
await a.on_telegram_reply(pipeline_id=44, step="cover", user_text="huh?")
|
|
assert any("다시 입력" in (s or "") for s in sent)
|