fix(pipeline): premature state update + reject 재생성 알림

버그1: /feedback approve가 bg task 시작 전에 state를 next_pending으로 set →
polling이 빈 video_url로 알림 발송. bg task의 run_step이 state를 set하도록
일임 — 이중 update 제거.

버그2: reject 후 같은 *_pending 상태로 재생성됐을 때 dedupe에 막혀 알림이
안 감. dedupe 키에 feedback_count_per_step[step]을 포함 — 재생성마다
count가 증가하므로 키가 달라져 재알림 동작.
This commit is contained in:
2026-05-08 23:08:24 +09:00
parent 6d416aab78
commit 97b15cb985
3 changed files with 40 additions and 6 deletions

View File

@@ -25,7 +25,7 @@ class YoutubePublisherAgent(BaseAgent):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self._notified_state_per_pipeline: dict[int, str] = {} self._notified_state_per_pipeline: dict[int, tuple] = {}
async def poll_state_changes(self) -> None: async def poll_state_changes(self) -> None:
"""주기적으로 호출되어 *_pending 신규 진입 시 텔레그램 발송.""" """주기적으로 호출되어 *_pending 신규 진입 시 텔레그램 발송."""
@@ -40,9 +40,13 @@ class YoutubePublisherAgent(BaseAgent):
pid = p.get("id") pid = p.get("id")
if pid is None: if pid is None:
continue continue
if state in _STEP_TITLES and self._notified_state_per_pipeline.get(pid) != state: if state in _STEP_TITLES:
_, step = _STEP_TITLES[state]
fb_count = (p.get("feedback_count_per_step") or {}).get(step, 0)
key = (state, fb_count)
if self._notified_state_per_pipeline.get(pid) != key:
await self._notify_step(p) await self._notify_step(p)
self._notified_state_per_pipeline[pid] = state self._notified_state_per_pipeline[pid] = key
async def _notify_step(self, pipeline: dict) -> None: async def _notify_step(self, pipeline: dict) -> None:
state = pipeline["state"] state = pipeline["state"]

View File

@@ -35,6 +35,7 @@ async def test_poll_notifies_once_per_state():
"state": "cover_pending", "state": "cover_pending",
"cover_url": "/x.jpg", "cover_url": "/x.jpg",
"track_title": "Test", "track_title": "Test",
"feedback_count_per_step": {},
}] }]
with patch( with patch(
"app.agents.youtube_publisher.service_proxy.list_active_pipelines", "app.agents.youtube_publisher.service_proxy.list_active_pipelines",
@@ -52,6 +53,27 @@ async def test_poll_notifies_once_per_state():
assert mock_send.call_count == 1 assert mock_send.call_count == 1
@pytest.mark.asyncio
async def test_poll_renotifies_on_reject_regen(monkeypatch):
from app.agents.youtube_publisher import YoutubePublisherAgent
pipelines_v1 = [{"id": 1, "state": "cover_pending", "cover_url": "/x.jpg",
"track_title": "Test", "feedback_count_per_step": {}}]
pipelines_v2 = [{"id": 1, "state": "cover_pending", "cover_url": "/x2.jpg",
"track_title": "Test", "feedback_count_per_step": {"cover": 1}}]
list_mock = AsyncMock(side_effect=[pipelines_v1, pipelines_v2])
with patch("app.agents.youtube_publisher.service_proxy.list_active_pipelines", list_mock), \
patch("app.agents.youtube_publisher.send_raw",
new=AsyncMock(return_value={"ok": True, "message_id": 99})), \
patch("app.agents.youtube_publisher.service_proxy.save_pipeline_telegram_msg",
new=AsyncMock()):
a = YoutubePublisherAgent()
await a.poll_state_changes() # 1st: notify
await a.poll_state_changes() # 2nd: feedback count differs → notify again
from app.agents.youtube_publisher import send_raw as sr
assert sr.call_count == 2
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_on_telegram_reply_approve_calls_feedback(): async def test_on_telegram_reply_approve_calls_feedback():
from app.agents.youtube_publisher import YoutubePublisherAgent from app.agents.youtube_publisher import YoutubePublisherAgent

View File

@@ -1009,11 +1009,19 @@ async def feedback(pid: int, req: FeedbackRequest, bg: BackgroundTasks):
if req.intent == "approve": if req.intent == "approve":
from .pipeline.state_machine import next_state_on_approve from .pipeline.state_machine import next_state_on_approve
# Validate transition is legal
try:
next_st = next_state_on_approve(state) next_st = next_state_on_approve(state)
_db_module.update_pipeline_state(pid, next_st) except ValueError as e:
raise HTTPException(400, str(e))
next_step = _state_to_step(next_st) next_step = _state_to_step(next_st)
if next_step: if next_step:
# bg task will set state to the new *_pending when step completes
bg.add_task(orchestrator.run_step, pid, next_step) bg.add_task(orchestrator.run_step, pid, next_step)
else:
# No step to run — fall through to direct state update
# (defensive — current code paths don't hit this)
_db_module.update_pipeline_state(pid, next_st)
return {"ok": True} return {"ok": True}
elif req.intent == "reject": elif req.intent == "reject":