fix(agent-office): 파이프라인 실패 알림 dedup을 DB 영속화 (재시작 재알림 스팸 해소)
youtube_publisher._notified_failed(인메모리 set)가 컨테이너 재시작 시 소실되어 기존 failed 파이프라인(예: video 인코딩 구버전 실패 #3)을 매 재시작마다 "신규"로 재알림하던 스팸 버그를 notified_failed_pipelines 테이블로 영속화해 해결. 부수 버그 fix: failed 폴링이 예외를 던지면 failed=[]로 오해해 원장을 통째로 비우던 코드 → 예외 시 early-return(원장 보존). 진행 중 *_pending 승인 dedup(_notified_state_per_pipeline)은 의도적으로 인메모리 유지(재시작 시 살아있는 파이프라인 승인 재알림은 유용한 리마인더). 테스트: 재시작 지속성 + 일시적 폴링 예외 재현 테스트 2종 추가(TDD Red→Green). DB_PATH 첫 import 고정으로 인한 테스트 간 영속 테이블 누수를 monkeypatch로 격리. agent-office 전체 140개 통과. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01EqCYBhvTcdeCTUDX3RhWx9
This commit is contained in:
@@ -4,7 +4,12 @@ import logging
|
|||||||
from .base import BaseAgent
|
from .base import BaseAgent
|
||||||
from . import classify_intent
|
from . import classify_intent
|
||||||
from .. import service_proxy
|
from .. import service_proxy
|
||||||
from ..db import add_log
|
from ..db import (
|
||||||
|
add_log,
|
||||||
|
get_notified_failed_pipelines,
|
||||||
|
add_notified_failed_pipeline,
|
||||||
|
prune_notified_failed_pipelines,
|
||||||
|
)
|
||||||
from ..telegram.messaging import send_raw
|
from ..telegram.messaging import send_raw
|
||||||
|
|
||||||
logger = logging.getLogger("agent-office.youtube_publisher")
|
logger = logging.getLogger("agent-office.youtube_publisher")
|
||||||
@@ -25,8 +30,9 @@ class YoutubePublisherAgent(BaseAgent):
|
|||||||
|
|
||||||
def __init__(self, *args, **kwargs):
|
def __init__(self, *args, **kwargs):
|
||||||
super().__init__(*args, **kwargs)
|
super().__init__(*args, **kwargs)
|
||||||
|
# 진행 중(*_pending) 승인 요청 dedup — 인메모리 유지(의도적).
|
||||||
|
# 재시작 시 살아있는 파이프라인 승인 재알림은 유용한 리마인더라 스팸 아님.
|
||||||
self._notified_state_per_pipeline: dict[int, tuple] = {}
|
self._notified_state_per_pipeline: dict[int, tuple] = {}
|
||||||
self._notified_failed: set[int] = set()
|
|
||||||
|
|
||||||
async def poll_state_changes(self) -> None:
|
async def poll_state_changes(self) -> None:
|
||||||
"""주기적으로 호출되어 *_pending 신규 진입 시 텔레그램 발송."""
|
"""주기적으로 호출되어 *_pending 신규 진입 시 텔레그램 발송."""
|
||||||
@@ -52,18 +58,21 @@ class YoutubePublisherAgent(BaseAgent):
|
|||||||
try:
|
try:
|
||||||
failed = await service_proxy.list_failed_pipelines()
|
failed = await service_proxy.list_failed_pipelines()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
# 일시적 폴링 실패를 "failed 없음"으로 오해하면 원장을 비워 재알림 스팸이 남.
|
||||||
|
# → 원장을 건드리지 않고 조용히 종료(다음 폴링에서 재시도).
|
||||||
logger.warning("failed 폴링 실패: %s", e)
|
logger.warning("failed 폴링 실패: %s", e)
|
||||||
failed = []
|
return
|
||||||
|
notified = get_notified_failed_pipelines()
|
||||||
for p in failed:
|
for p in failed:
|
||||||
pid = p.get("id")
|
pid = p.get("id")
|
||||||
if pid is None:
|
if pid is None:
|
||||||
continue
|
continue
|
||||||
if pid not in self._notified_failed:
|
if pid not in notified:
|
||||||
await self._notify_failed(p)
|
await self._notify_failed(p)
|
||||||
self._notified_failed.add(pid)
|
add_notified_failed_pipeline(pid)
|
||||||
# 재개되어 failed에서 벗어난 파이프라인은 재알림 가능하도록 해제
|
# 재개되어 failed에서 벗어난 파이프라인은 재알림 가능하도록 원장에서 제거
|
||||||
failed_ids = {p.get("id") for p in failed}
|
failed_ids = {p.get("id") for p in failed if p.get("id") is not None}
|
||||||
self._notified_failed &= failed_ids
|
prune_notified_failed_pipelines(failed_ids)
|
||||||
|
|
||||||
async def _notify_failed(self, p: dict) -> None:
|
async def _notify_failed(self, p: dict) -> None:
|
||||||
reason = p.get("failed_reason") or "?"
|
reason = p.get("failed_reason") or "?"
|
||||||
|
|||||||
@@ -158,6 +158,12 @@ def init_db() -> None:
|
|||||||
CREATE INDEX IF NOT EXISTS idx_tarot_favorite
|
CREATE INDEX IF NOT EXISTS idx_tarot_favorite
|
||||||
ON tarot_readings(favorite, created_at DESC)
|
ON tarot_readings(favorite, created_at DESC)
|
||||||
""")
|
""")
|
||||||
|
conn.execute("""
|
||||||
|
CREATE TABLE IF NOT EXISTS notified_failed_pipelines (
|
||||||
|
pipeline_id INTEGER PRIMARY KEY,
|
||||||
|
notified_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
|
||||||
|
)
|
||||||
|
""")
|
||||||
# Seed default agent configs
|
# Seed default agent configs
|
||||||
for agent_id, name in [
|
for agent_id, name in [
|
||||||
("stock", "주식 트레이더"),
|
("stock", "주식 트레이더"),
|
||||||
@@ -826,6 +832,47 @@ def get_all_baselines() -> List[Dict[str, Any]]:
|
|||||||
return out
|
return out
|
||||||
|
|
||||||
|
|
||||||
|
# --- notified_failed_pipelines (파이프라인 실패 알림 dedup 원장, 재시작 지속) ---
|
||||||
|
|
||||||
|
def get_notified_failed_pipelines() -> set:
|
||||||
|
"""이미 실패 알림을 발송한 pipeline_id 집합."""
|
||||||
|
with _conn() as conn:
|
||||||
|
rows = conn.execute(
|
||||||
|
"SELECT pipeline_id FROM notified_failed_pipelines"
|
||||||
|
).fetchall()
|
||||||
|
return {r["pipeline_id"] for r in rows}
|
||||||
|
|
||||||
|
|
||||||
|
def add_notified_failed_pipeline(pipeline_id: int) -> None:
|
||||||
|
with _conn() as conn:
|
||||||
|
conn.execute(
|
||||||
|
"INSERT OR IGNORE INTO notified_failed_pipelines(pipeline_id) VALUES(?)",
|
||||||
|
(pipeline_id,),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def prune_notified_failed_pipelines(active_failed_ids) -> None:
|
||||||
|
"""현재 failed 목록에 없는 pipeline_id를 원장에서 제거.
|
||||||
|
|
||||||
|
재개되어 failed에서 벗어난 파이프라인이 다시 실패하면 재알림 가능하도록 함.
|
||||||
|
(기존 인메모리 `_notified_failed &= failed_ids`의 영속 버전)
|
||||||
|
"""
|
||||||
|
keep = set(active_failed_ids)
|
||||||
|
with _conn() as conn:
|
||||||
|
existing = {
|
||||||
|
r["pipeline_id"]
|
||||||
|
for r in conn.execute(
|
||||||
|
"SELECT pipeline_id FROM notified_failed_pipelines"
|
||||||
|
).fetchall()
|
||||||
|
}
|
||||||
|
stale = existing - keep
|
||||||
|
for pid in stale:
|
||||||
|
conn.execute(
|
||||||
|
"DELETE FROM notified_failed_pipelines WHERE pipeline_id=?",
|
||||||
|
(pid,),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def get_tasks_by_agent_date_kind(agent_id: str, date_iso: str, task_type: str) -> List[Dict[str, Any]]:
|
def get_tasks_by_agent_date_kind(agent_id: str, date_iso: str, task_type: str) -> List[Dict[str, Any]]:
|
||||||
"""같은 (agent, date, task_type)으로 이미 생성된 task 조회. 멱등 guard."""
|
"""같은 (agent, date, task_type)으로 이미 생성된 task 조회. 멱등 guard."""
|
||||||
with _conn() as conn:
|
with _conn() as conn:
|
||||||
|
|||||||
@@ -14,13 +14,20 @@ from unittest.mock import AsyncMock, patch
|
|||||||
|
|
||||||
|
|
||||||
@pytest.fixture(autouse=True)
|
@pytest.fixture(autouse=True)
|
||||||
def _init_db():
|
def _init_db(monkeypatch):
|
||||||
import gc
|
import gc
|
||||||
gc.collect()
|
gc.collect()
|
||||||
if os.path.exists(_TMP):
|
# config.DB_PATH는 첫 import 시 1회 고정되므로, 다른 테스트 파일과 조합 실행 시
|
||||||
os.remove(_TMP)
|
# db가 이 파일의 _TMP가 아닌 다른 경로를 쓸 수 있다. db.DB_PATH를 이 파일 전용으로
|
||||||
from app.db import init_db
|
# 강제해 영속 테이블(notified_failed_pipelines 등)의 테스트 간 누수를 결정적으로 차단.
|
||||||
init_db()
|
import app.db as _db
|
||||||
|
monkeypatch.setattr(_db, "DB_PATH", _TMP)
|
||||||
|
# WAL 사이드카(-wal/-shm)까지 지워야 영속 상태가 남지 않음
|
||||||
|
for suffix in ("", "-wal", "-shm"):
|
||||||
|
p = _TMP + suffix
|
||||||
|
if os.path.exists(p):
|
||||||
|
os.remove(p)
|
||||||
|
_db.init_db()
|
||||||
yield
|
yield
|
||||||
gc.collect()
|
gc.collect()
|
||||||
|
|
||||||
@@ -211,3 +218,70 @@ async def test_failed_poll_exception_is_silent():
|
|||||||
|
|
||||||
# active 알림은 정상 발송
|
# active 알림은 정상 발송
|
||||||
assert sent.await_count == 1
|
assert sent.await_count == 1
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_failed_notification_persists_across_restart():
|
||||||
|
"""컨테이너 재시작(새 에이전트 인스턴스)해도 이미 알린 failed는 재알림하지 않음."""
|
||||||
|
from app.agents.youtube_publisher import YoutubePublisherAgent
|
||||||
|
|
||||||
|
failed_pipeline = {
|
||||||
|
"id": 3,
|
||||||
|
"state": "failed",
|
||||||
|
"failed_reason": "video: timeout",
|
||||||
|
"track_title": "beat music v2",
|
||||||
|
}
|
||||||
|
sent = AsyncMock(return_value={"ok": True, "message_id": 1})
|
||||||
|
|
||||||
|
with patch(
|
||||||
|
"app.agents.youtube_publisher.service_proxy.list_active_pipelines",
|
||||||
|
new=AsyncMock(return_value=[]),
|
||||||
|
), patch(
|
||||||
|
"app.agents.youtube_publisher.service_proxy.list_failed_pipelines",
|
||||||
|
new=AsyncMock(return_value=[failed_pipeline]),
|
||||||
|
), patch(
|
||||||
|
"app.agents.youtube_publisher.send_raw",
|
||||||
|
new=sent,
|
||||||
|
):
|
||||||
|
agent1 = YoutubePublisherAgent()
|
||||||
|
await agent1.poll_state_changes()
|
||||||
|
# 컨테이너 재시작 시뮬레이션: 완전히 새로운 인스턴스(인메모리 상태 소실)
|
||||||
|
agent2 = YoutubePublisherAgent()
|
||||||
|
await agent2.poll_state_changes()
|
||||||
|
|
||||||
|
# 재시작해도 DB 원장으로 중복 방지 → 1회만 알림
|
||||||
|
assert sent.await_count == 1
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_transient_failed_poll_keeps_ledger():
|
||||||
|
"""failed 폴링이 일시적으로 예외를 던져도 원장을 비우지 않아 다음 폴링에서 재알림하지 않음."""
|
||||||
|
from app.agents.youtube_publisher import YoutubePublisherAgent
|
||||||
|
|
||||||
|
failed_pipeline = {
|
||||||
|
"id": 3,
|
||||||
|
"state": "failed",
|
||||||
|
"failed_reason": "video: timeout",
|
||||||
|
"track_title": "beat music v2",
|
||||||
|
}
|
||||||
|
list_failed = AsyncMock(
|
||||||
|
side_effect=[[failed_pipeline], Exception("boom"), [failed_pipeline]]
|
||||||
|
)
|
||||||
|
sent = AsyncMock(return_value={"ok": True, "message_id": 1})
|
||||||
|
|
||||||
|
with patch(
|
||||||
|
"app.agents.youtube_publisher.service_proxy.list_active_pipelines",
|
||||||
|
new=AsyncMock(return_value=[]),
|
||||||
|
), patch(
|
||||||
|
"app.agents.youtube_publisher.service_proxy.list_failed_pipelines",
|
||||||
|
new=list_failed,
|
||||||
|
), patch(
|
||||||
|
"app.agents.youtube_publisher.send_raw",
|
||||||
|
new=sent,
|
||||||
|
):
|
||||||
|
agent = YoutubePublisherAgent()
|
||||||
|
await agent.poll_state_changes() # #3 최초 알림
|
||||||
|
await agent.poll_state_changes() # 예외 → 원장 유지되어야 (섣부른 정리 금지)
|
||||||
|
await agent.poll_state_changes() # #3 여전히 failed → 재알림 없어야
|
||||||
|
|
||||||
|
assert sent.await_count == 1
|
||||||
|
|||||||
Reference in New Issue
Block a user