From 97b15cb9857a15bd0635ec38cf183093893d2ef4 Mon Sep 17 00:00:00 2001 From: gahusb Date: Fri, 8 May 2026 23:08:24 +0900 Subject: [PATCH] =?UTF-8?q?fix(pipeline):=20premature=20state=20update=20+?= =?UTF-8?q?=20reject=20=EC=9E=AC=EC=83=9D=EC=84=B1=20=EC=95=8C=EB=A6=BC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 버그1: /feedback approve가 bg task 시작 전에 state를 next_pending으로 set → polling이 빈 video_url로 알림 발송. bg task의 run_step이 state를 set하도록 일임 — 이중 update 제거. 버그2: reject 후 같은 *_pending 상태로 재생성됐을 때 dedupe에 막혀 알림이 안 감. dedupe 키에 feedback_count_per_step[step]을 포함 — 재생성마다 count가 증가하므로 키가 달라져 재알림 동작. --- agent-office/app/agents/youtube_publisher.py | 12 +++++++---- agent-office/tests/test_pipeline_polling.py | 22 ++++++++++++++++++++ music-lab/app/main.py | 12 +++++++++-- 3 files changed, 40 insertions(+), 6 deletions(-) diff --git a/agent-office/app/agents/youtube_publisher.py b/agent-office/app/agents/youtube_publisher.py index fc52bc8..24ed747 100644 --- a/agent-office/app/agents/youtube_publisher.py +++ b/agent-office/app/agents/youtube_publisher.py @@ -25,7 +25,7 @@ class YoutubePublisherAgent(BaseAgent): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - self._notified_state_per_pipeline: dict[int, str] = {} + self._notified_state_per_pipeline: dict[int, tuple] = {} async def poll_state_changes(self) -> None: """주기적으로 호출되어 *_pending 신규 진입 시 텔레그램 발송.""" @@ -40,9 +40,13 @@ class YoutubePublisherAgent(BaseAgent): pid = p.get("id") if pid is None: continue - if state in _STEP_TITLES and self._notified_state_per_pipeline.get(pid) != state: - await self._notify_step(p) - self._notified_state_per_pipeline[pid] = state + if state in _STEP_TITLES: + _, step = _STEP_TITLES[state] + fb_count = (p.get("feedback_count_per_step") or {}).get(step, 0) + key = (state, fb_count) + if self._notified_state_per_pipeline.get(pid) != key: + await self._notify_step(p) + self._notified_state_per_pipeline[pid] = key async def _notify_step(self, pipeline: dict) -> None: state = pipeline["state"] diff --git a/agent-office/tests/test_pipeline_polling.py b/agent-office/tests/test_pipeline_polling.py index 74e8c52..83eafc7 100644 --- a/agent-office/tests/test_pipeline_polling.py +++ b/agent-office/tests/test_pipeline_polling.py @@ -35,6 +35,7 @@ async def test_poll_notifies_once_per_state(): "state": "cover_pending", "cover_url": "/x.jpg", "track_title": "Test", + "feedback_count_per_step": {}, }] with patch( "app.agents.youtube_publisher.service_proxy.list_active_pipelines", @@ -52,6 +53,27 @@ async def test_poll_notifies_once_per_state(): assert mock_send.call_count == 1 +@pytest.mark.asyncio +async def test_poll_renotifies_on_reject_regen(monkeypatch): + from app.agents.youtube_publisher import YoutubePublisherAgent + + pipelines_v1 = [{"id": 1, "state": "cover_pending", "cover_url": "/x.jpg", + "track_title": "Test", "feedback_count_per_step": {}}] + pipelines_v2 = [{"id": 1, "state": "cover_pending", "cover_url": "/x2.jpg", + "track_title": "Test", "feedback_count_per_step": {"cover": 1}}] + list_mock = AsyncMock(side_effect=[pipelines_v1, pipelines_v2]) + with patch("app.agents.youtube_publisher.service_proxy.list_active_pipelines", list_mock), \ + patch("app.agents.youtube_publisher.send_raw", + new=AsyncMock(return_value={"ok": True, "message_id": 99})), \ + patch("app.agents.youtube_publisher.service_proxy.save_pipeline_telegram_msg", + new=AsyncMock()): + a = YoutubePublisherAgent() + await a.poll_state_changes() # 1st: notify + await a.poll_state_changes() # 2nd: feedback count differs → notify again + from app.agents.youtube_publisher import send_raw as sr + assert sr.call_count == 2 + + @pytest.mark.asyncio async def test_on_telegram_reply_approve_calls_feedback(): from app.agents.youtube_publisher import YoutubePublisherAgent diff --git a/music-lab/app/main.py b/music-lab/app/main.py index 2db9944..07e4fd0 100644 --- a/music-lab/app/main.py +++ b/music-lab/app/main.py @@ -1009,11 +1009,19 @@ async def feedback(pid: int, req: FeedbackRequest, bg: BackgroundTasks): if req.intent == "approve": from .pipeline.state_machine import next_state_on_approve - next_st = next_state_on_approve(state) - _db_module.update_pipeline_state(pid, next_st) + # Validate transition is legal + try: + next_st = next_state_on_approve(state) + except ValueError as e: + raise HTTPException(400, str(e)) next_step = _state_to_step(next_st) if next_step: + # bg task will set state to the new *_pending when step completes bg.add_task(orchestrator.run_step, pid, next_step) + else: + # No step to run — fall through to direct state update + # (defensive — current code paths don't hit this) + _db_module.update_pipeline_state(pid, next_st) return {"ok": True} elif req.intent == "reject":