diff --git a/agent-office/app/agents/youtube_publisher.py b/agent-office/app/agents/youtube_publisher.py index ac93095..04e1297 100644 --- a/agent-office/app/agents/youtube_publisher.py +++ b/agent-office/app/agents/youtube_publisher.py @@ -4,7 +4,12 @@ import logging from .base import BaseAgent from . import classify_intent from .. import service_proxy -from ..db import add_log +from ..db import ( + add_log, + get_notified_failed_pipelines, + add_notified_failed_pipeline, + prune_notified_failed_pipelines, +) from ..telegram.messaging import send_raw logger = logging.getLogger("agent-office.youtube_publisher") @@ -25,8 +30,9 @@ class YoutubePublisherAgent(BaseAgent): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) + # 진행 중(*_pending) 승인 요청 dedup — 인메모리 유지(의도적). + # 재시작 시 살아있는 파이프라인 승인 재알림은 유용한 리마인더라 스팸 아님. self._notified_state_per_pipeline: dict[int, tuple] = {} - self._notified_failed: set[int] = set() async def poll_state_changes(self) -> None: """주기적으로 호출되어 *_pending 신규 진입 시 텔레그램 발송.""" @@ -52,18 +58,21 @@ class YoutubePublisherAgent(BaseAgent): try: failed = await service_proxy.list_failed_pipelines() except Exception as e: + # 일시적 폴링 실패를 "failed 없음"으로 오해하면 원장을 비워 재알림 스팸이 남. + # → 원장을 건드리지 않고 조용히 종료(다음 폴링에서 재시도). logger.warning("failed 폴링 실패: %s", e) - failed = [] + return + notified = get_notified_failed_pipelines() for p in failed: pid = p.get("id") if pid is None: continue - if pid not in self._notified_failed: + if pid not in notified: await self._notify_failed(p) - self._notified_failed.add(pid) - # 재개되어 failed에서 벗어난 파이프라인은 재알림 가능하도록 해제 - failed_ids = {p.get("id") for p in failed} - self._notified_failed &= failed_ids + add_notified_failed_pipeline(pid) + # 재개되어 failed에서 벗어난 파이프라인은 재알림 가능하도록 원장에서 제거 + failed_ids = {p.get("id") for p in failed if p.get("id") is not None} + prune_notified_failed_pipelines(failed_ids) async def _notify_failed(self, p: dict) -> None: reason = p.get("failed_reason") or "?" diff --git a/agent-office/app/db.py b/agent-office/app/db.py index cb31815..e8e1899 100644 --- a/agent-office/app/db.py +++ b/agent-office/app/db.py @@ -158,6 +158,12 @@ def init_db() -> None: CREATE INDEX IF NOT EXISTS idx_tarot_favorite ON tarot_readings(favorite, created_at DESC) """) + conn.execute(""" + CREATE TABLE IF NOT EXISTS notified_failed_pipelines ( + pipeline_id INTEGER PRIMARY KEY, + notified_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')) + ) + """) # Seed default agent configs for agent_id, name in [ ("stock", "주식 트레이더"), @@ -826,6 +832,47 @@ def get_all_baselines() -> List[Dict[str, Any]]: return out +# --- notified_failed_pipelines (파이프라인 실패 알림 dedup 원장, 재시작 지속) --- + +def get_notified_failed_pipelines() -> set: + """이미 실패 알림을 발송한 pipeline_id 집합.""" + with _conn() as conn: + rows = conn.execute( + "SELECT pipeline_id FROM notified_failed_pipelines" + ).fetchall() + return {r["pipeline_id"] for r in rows} + + +def add_notified_failed_pipeline(pipeline_id: int) -> None: + with _conn() as conn: + conn.execute( + "INSERT OR IGNORE INTO notified_failed_pipelines(pipeline_id) VALUES(?)", + (pipeline_id,), + ) + + +def prune_notified_failed_pipelines(active_failed_ids) -> None: + """현재 failed 목록에 없는 pipeline_id를 원장에서 제거. + + 재개되어 failed에서 벗어난 파이프라인이 다시 실패하면 재알림 가능하도록 함. + (기존 인메모리 `_notified_failed &= failed_ids`의 영속 버전) + """ + keep = set(active_failed_ids) + with _conn() as conn: + existing = { + r["pipeline_id"] + for r in conn.execute( + "SELECT pipeline_id FROM notified_failed_pipelines" + ).fetchall() + } + stale = existing - keep + for pid in stale: + conn.execute( + "DELETE FROM notified_failed_pipelines WHERE pipeline_id=?", + (pid,), + ) + + def get_tasks_by_agent_date_kind(agent_id: str, date_iso: str, task_type: str) -> List[Dict[str, Any]]: """같은 (agent, date, task_type)으로 이미 생성된 task 조회. 멱등 guard.""" with _conn() as conn: diff --git a/agent-office/tests/test_youtube_publisher_retry.py b/agent-office/tests/test_youtube_publisher_retry.py index 86393d4..5eb9a32 100644 --- a/agent-office/tests/test_youtube_publisher_retry.py +++ b/agent-office/tests/test_youtube_publisher_retry.py @@ -14,13 +14,20 @@ from unittest.mock import AsyncMock, patch @pytest.fixture(autouse=True) -def _init_db(): +def _init_db(monkeypatch): import gc gc.collect() - if os.path.exists(_TMP): - os.remove(_TMP) - from app.db import init_db - init_db() + # config.DB_PATH는 첫 import 시 1회 고정되므로, 다른 테스트 파일과 조합 실행 시 + # db가 이 파일의 _TMP가 아닌 다른 경로를 쓸 수 있다. db.DB_PATH를 이 파일 전용으로 + # 강제해 영속 테이블(notified_failed_pipelines 등)의 테스트 간 누수를 결정적으로 차단. + import app.db as _db + monkeypatch.setattr(_db, "DB_PATH", _TMP) + # WAL 사이드카(-wal/-shm)까지 지워야 영속 상태가 남지 않음 + for suffix in ("", "-wal", "-shm"): + p = _TMP + suffix + if os.path.exists(p): + os.remove(p) + _db.init_db() yield gc.collect() @@ -211,3 +218,70 @@ async def test_failed_poll_exception_is_silent(): # active 알림은 정상 발송 assert sent.await_count == 1 + + +@pytest.mark.asyncio +async def test_failed_notification_persists_across_restart(): + """컨테이너 재시작(새 에이전트 인스턴스)해도 이미 알린 failed는 재알림하지 않음.""" + from app.agents.youtube_publisher import YoutubePublisherAgent + + failed_pipeline = { + "id": 3, + "state": "failed", + "failed_reason": "video: timeout", + "track_title": "beat music v2", + } + sent = AsyncMock(return_value={"ok": True, "message_id": 1}) + + with patch( + "app.agents.youtube_publisher.service_proxy.list_active_pipelines", + new=AsyncMock(return_value=[]), + ), patch( + "app.agents.youtube_publisher.service_proxy.list_failed_pipelines", + new=AsyncMock(return_value=[failed_pipeline]), + ), patch( + "app.agents.youtube_publisher.send_raw", + new=sent, + ): + agent1 = YoutubePublisherAgent() + await agent1.poll_state_changes() + # 컨테이너 재시작 시뮬레이션: 완전히 새로운 인스턴스(인메모리 상태 소실) + agent2 = YoutubePublisherAgent() + await agent2.poll_state_changes() + + # 재시작해도 DB 원장으로 중복 방지 → 1회만 알림 + assert sent.await_count == 1 + + +@pytest.mark.asyncio +async def test_transient_failed_poll_keeps_ledger(): + """failed 폴링이 일시적으로 예외를 던져도 원장을 비우지 않아 다음 폴링에서 재알림하지 않음.""" + from app.agents.youtube_publisher import YoutubePublisherAgent + + failed_pipeline = { + "id": 3, + "state": "failed", + "failed_reason": "video: timeout", + "track_title": "beat music v2", + } + list_failed = AsyncMock( + side_effect=[[failed_pipeline], Exception("boom"), [failed_pipeline]] + ) + sent = AsyncMock(return_value={"ok": True, "message_id": 1}) + + with patch( + "app.agents.youtube_publisher.service_proxy.list_active_pipelines", + new=AsyncMock(return_value=[]), + ), patch( + "app.agents.youtube_publisher.service_proxy.list_failed_pipelines", + new=list_failed, + ), patch( + "app.agents.youtube_publisher.send_raw", + new=sent, + ): + agent = YoutubePublisherAgent() + await agent.poll_state_changes() # #3 최초 알림 + await agent.poll_state_changes() # 예외 → 원장 유지되어야 (섣부른 정리 금지) + await agent.poll_state_changes() # #3 여전히 failed → 재알림 없어야 + + assert sent.await_count == 1