diff --git a/agent-office/app/agents/__init__.py b/agent-office/app/agents/__init__.py index 6556440..94ae6d0 100644 --- a/agent-office/app/agents/__init__.py +++ b/agent-office/app/agents/__init__.py @@ -4,6 +4,7 @@ from .blog import BlogAgent from .realestate import RealestateAgent from .lotto import LottoAgent from .youtube import YouTubeResearchAgent +from .youtube_publisher import YoutubePublisherAgent AGENT_REGISTRY = {} @@ -14,6 +15,7 @@ def init_agents(): AGENT_REGISTRY["realestate"] = RealestateAgent() AGENT_REGISTRY["lotto"] = LottoAgent() AGENT_REGISTRY["youtube"] = YouTubeResearchAgent() + AGENT_REGISTRY["youtube_publisher"] = YoutubePublisherAgent() def get_agent(agent_id: str): return AGENT_REGISTRY.get(agent_id) diff --git a/agent-office/app/agents/youtube_publisher.py b/agent-office/app/agents/youtube_publisher.py new file mode 100644 index 0000000..fc52bc8 --- /dev/null +++ b/agent-office/app/agents/youtube_publisher.py @@ -0,0 +1,108 @@ +"""텔레그램 단일 채널로 단계별 승인 인터랙션 오케스트레이션.""" +import logging + +from .base import BaseAgent +from . import classify_intent +from .. import service_proxy +from ..db import add_log +from ..telegram.messaging import send_raw + +logger = logging.getLogger("agent-office.youtube_publisher") + + +_STEP_TITLES = { + "cover_pending": ("커버 아트", "cover"), + "video_pending": ("영상 비주얼", "video"), + "thumb_pending": ("썸네일", "thumb"), + "meta_pending": ("메타데이터", "meta"), + "publish_pending": ("최종 검토 + 발행", "publish"), +} + + +class YoutubePublisherAgent(BaseAgent): + agent_id = "youtube_publisher" + display_name = "YouTube 퍼블리셔" + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._notified_state_per_pipeline: dict[int, str] = {} + + async def poll_state_changes(self) -> None: + """주기적으로 호출되어 *_pending 신규 진입 시 텔레그램 발송.""" + try: + pipelines = await service_proxy.list_active_pipelines() + except Exception as e: + logger.warning("폴링 실패: %s", e) + return + + for p in pipelines: + state = p.get("state") + pid = p.get("id") + if pid is None: + continue + if state in _STEP_TITLES and self._notified_state_per_pipeline.get(pid) != state: + await self._notify_step(p) + self._notified_state_per_pipeline[pid] = state + + async def _notify_step(self, pipeline: dict) -> None: + state = pipeline["state"] + title_name, step = _STEP_TITLES[state] + body = self._format_body(pipeline, step) + track_title = pipeline.get("track_title") or f"Pipeline #{pipeline['id']}" + text = ( + f"🎵 [{track_title}] {title_name} 검토\n\n" + f"{body}\n\n" + f"➡️ 답장으로 알려주세요: '승인' 또는 '반려 + 수정 방향'" + ) + sent = await send_raw(text=text) + if sent.get("ok"): + msg_id = sent.get("message_id") + try: + await service_proxy.save_pipeline_telegram_msg(pipeline["id"], step, msg_id) + except Exception as e: + logger.warning("telegram-msg 저장 실패: %s", e) + add_log(self.agent_id, f"pipeline {pipeline['id']} {step} 알림 전송", "info") + + def _format_body(self, p: dict, step: str) -> str: + if step == "cover": + return f"🖼️ 커버: {p.get('cover_url', '-')}" + if step == "video": + return f"🎬 영상: {p.get('video_url', '-')}" + if step == "thumb": + return f"🎴 썸네일: {p.get('thumbnail_url', '-')}" + if step == "meta": + m = p.get("metadata", {}) or {} + tags = m.get("tags", []) or [] + description = (m.get("description", "") or "") + return ( + f"📝 제목: {m.get('title', '')}\n" + f"🏷️ 태그: {', '.join(tags[:8])}\n" + f"📄 설명(앞부분): {description[:200]}" + ) + if step == "publish": + r = p.get("review", {}) or {} + return ( + f"AI 검토 결과: {r.get('verdict', '?')} " + f"(가중 {r.get('weighted_total', '?')}/100)\n" + f"{r.get('summary', '')}" + ) + return "" + + async def on_telegram_reply(self, pipeline_id: int, step: str, user_text: str) -> None: + intent, feedback = classify_intent.classify(user_text) + if intent == "unclear": + await send_raw("다시 입력해주세요. 예: '승인' 또는 '반려, 제목 짧게'") + return + try: + await service_proxy.post_pipeline_feedback(pipeline_id, step, intent, feedback) + except Exception as e: + await send_raw(f"⚠️ 처리 실패: {e}") + + async def on_schedule(self) -> None: + await self.poll_state_changes() + + async def on_command(self, command: str, params: dict) -> dict: + return {"ok": False, "message": f"Unknown command: {command}"} + + async def on_approval(self, task_id: str, approved: bool, feedback: str = "") -> None: + pass diff --git a/agent-office/app/scheduler.py b/agent-office/app/scheduler.py index 32fecf9..afab21a 100644 --- a/agent-office/app/scheduler.py +++ b/agent-office/app/scheduler.py @@ -34,6 +34,11 @@ async def _send_youtube_weekly_report(): if agent: await agent.send_weekly_report() +async def _poll_pipelines(): + agent = AGENT_REGISTRY.get("youtube_publisher") + if agent: + await agent.poll_state_changes() + def init_scheduler(): scheduler.add_job(_run_stock_schedule, "cron", hour=7, minute=30, id="stock_news") scheduler.add_job(_run_blog_schedule, "cron", hour=10, minute=0, id="blog_pipeline") @@ -41,4 +46,5 @@ def init_scheduler(): scheduler.add_job(_run_youtube_research, "cron", hour=9, minute=0, id="youtube_research") scheduler.add_job(_send_youtube_weekly_report, "cron", day_of_week="mon", hour=8, minute=0, id="youtube_weekly_report") scheduler.add_job(_check_idle_breaks, "interval", seconds=60, id="idle_check") + scheduler.add_job(_poll_pipelines, "interval", seconds=30, id="pipeline_poll") scheduler.start() diff --git a/agent-office/app/service_proxy.py b/agent-office/app/service_proxy.py index 194de97..4353c10 100644 --- a/agent-office/app/service_proxy.py +++ b/agent-office/app/service_proxy.py @@ -178,3 +178,46 @@ async def lotto_save_briefing(payload: dict) -> Dict[str, Any]: resp = await _client.post(f"{LOTTO_BACKEND_URL}/api/lotto/briefing", json=payload) resp.raise_for_status() return resp.json() + + +# --- music-lab pipeline (YouTube publisher orchestration) --- + +async def list_active_pipelines() -> list[dict]: + async with httpx.AsyncClient(timeout=15) as client: + resp = await client.get(f"{MUSIC_LAB_URL}/api/music/pipeline?status=active") + resp.raise_for_status() + return resp.json().get("pipelines", []) + + +async def get_pipeline(pid: int) -> dict: + async with httpx.AsyncClient(timeout=15) as client: + resp = await client.get(f"{MUSIC_LAB_URL}/api/music/pipeline/{pid}") + resp.raise_for_status() + return resp.json() + + +async def post_pipeline_feedback(pid: int, step: str, intent: str, + feedback_text: Optional[str] = None) -> dict: + async with httpx.AsyncClient(timeout=15) as client: + resp = await client.post( + f"{MUSIC_LAB_URL}/api/music/pipeline/{pid}/feedback", + json={"step": step, "intent": intent, "feedback_text": feedback_text}, + ) + resp.raise_for_status() + return resp.json() + + +async def save_pipeline_telegram_msg(pid: int, step: str, msg_id: int) -> None: + async with httpx.AsyncClient(timeout=10) as client: + await client.patch( + f"{MUSIC_LAB_URL}/api/music/pipeline/{pid}/telegram-msg", + json={"step": step, "message_id": msg_id}, + ) + + +async def lookup_pipeline_by_msg(msg_id: int) -> Optional[dict]: + async with httpx.AsyncClient(timeout=10) as client: + resp = await client.get(f"{MUSIC_LAB_URL}/api/music/pipeline/lookup-by-msg/{msg_id}") + if resp.status_code == 200: + return resp.json() + return None diff --git a/agent-office/app/telegram/conversational.py b/agent-office/app/telegram/conversational.py index 2fa3615..39a2961 100644 --- a/agent-office/app/telegram/conversational.py +++ b/agent-office/app/telegram/conversational.py @@ -103,6 +103,34 @@ def _build_messages(history: list, user_text: str) -> list: return msgs +async def maybe_route_to_pipeline(message: dict) -> bool: + """파이프라인 텔레그램 메시지에 대한 reply 인 경우 youtube_publisher 로 라우팅. + + Returns True if message was routed (caller should stop further processing). + """ + reply_to = message.get("reply_to_message") or {} + msg_id = reply_to.get("message_id") + if not msg_id: + return False + from .. import service_proxy + try: + link = await service_proxy.lookup_pipeline_by_msg(msg_id) + except Exception: + return False + if not link: + return False + from ..agents import AGENT_REGISTRY + agent = AGENT_REGISTRY.get("youtube_publisher") + if not agent: + return False + pipeline_id = link.get("pipeline_id") + step = link.get("step") + if pipeline_id is None or not step: + return False + await agent.on_telegram_reply(pipeline_id, step, message.get("text", "")) + return True + + async def respond_to_message(chat_id: str, user_text: str) -> Optional[str]: """자연어 메시지에 응답. 실패 시 사용자에게 돌려줄 문자열 반환(또는 None = 무시).""" if not ANTHROPIC_API_KEY: diff --git a/agent-office/app/telegram/webhook.py b/agent-office/app/telegram/webhook.py index 107d533..b66d38a 100644 --- a/agent-office/app/telegram/webhook.py +++ b/agent-office/app/telegram/webhook.py @@ -102,6 +102,11 @@ async def _handle_message(message: dict, agent_dispatcher) -> Optional[dict]: from .router import parse_command, resolve_agent_command, HELP_TEXT from .messaging import send_raw, send_agent_message from .agent_registry import AGENT_META + from .conversational import maybe_route_to_pipeline + + # 파이프라인 메시지에 대한 reply라면 youtube_publisher 로 라우팅 + if await maybe_route_to_pipeline(message): + return {"handled": "pipeline_reply"} text = message.get("text", "") parsed = parse_command(text) diff --git a/agent-office/tests/test_pipeline_polling.py b/agent-office/tests/test_pipeline_polling.py new file mode 100644 index 0000000..74e8c52 --- /dev/null +++ b/agent-office/tests/test_pipeline_polling.py @@ -0,0 +1,110 @@ +import os +import sys +import tempfile + +_fd, _TMP = tempfile.mkstemp(suffix=".db") +os.close(_fd) +os.unlink(_TMP) +os.environ["AGENT_OFFICE_DB_PATH"] = _TMP + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + + +import pytest +from unittest.mock import AsyncMock, patch + + +@pytest.fixture(autouse=True) +def _init_db(): + import gc + gc.collect() + if os.path.exists(_TMP): + os.remove(_TMP) + from app.db import init_db + init_db() + yield + gc.collect() + + +@pytest.mark.asyncio +async def test_poll_notifies_once_per_state(): + from app.agents.youtube_publisher import YoutubePublisherAgent + + pipelines = [{ + "id": 1, + "state": "cover_pending", + "cover_url": "/x.jpg", + "track_title": "Test", + }] + with patch( + "app.agents.youtube_publisher.service_proxy.list_active_pipelines", + new=AsyncMock(return_value=pipelines), + ), patch( + "app.agents.youtube_publisher.send_raw", + new=AsyncMock(return_value={"ok": True, "message_id": 99}), + ) as mock_send, patch( + "app.agents.youtube_publisher.service_proxy.save_pipeline_telegram_msg", + new=AsyncMock(), + ): + a = YoutubePublisherAgent() + await a.poll_state_changes() + await a.poll_state_changes() # 같은 상태 — 두 번째는 알림 안 함 + assert mock_send.call_count == 1 + + +@pytest.mark.asyncio +async def test_on_telegram_reply_approve_calls_feedback(): + from app.agents.youtube_publisher import YoutubePublisherAgent + + with patch( + "app.agents.youtube_publisher.service_proxy.post_pipeline_feedback", + new=AsyncMock(), + ) as mock_fb, patch( + "app.agents.youtube_publisher.send_raw", + new=AsyncMock(), + ): + a = YoutubePublisherAgent() + await a.on_telegram_reply(pipeline_id=42, step="cover", user_text="승인") + mock_fb.assert_called_once_with(42, "cover", "approve", None) + + +@pytest.mark.asyncio +async def test_on_telegram_reply_reject_with_feedback(): + from app.agents.youtube_publisher import YoutubePublisherAgent + + with patch( + "app.agents.youtube_publisher.service_proxy.post_pipeline_feedback", + new=AsyncMock(), + ) as mock_fb, patch( + "app.agents.youtube_publisher.send_raw", + new=AsyncMock(), + ): + a = YoutubePublisherAgent() + await a.on_telegram_reply(pipeline_id=43, step="meta", user_text="반려, 제목 짧게") + args = mock_fb.call_args[0] + assert args[0] == 43 + assert args[1] == "meta" + assert args[2] == "reject" + assert "제목 짧게" in (args[3] or "") + + +@pytest.mark.asyncio +async def test_on_telegram_reply_unclear_asks_again(): + from app.agents.youtube_publisher import YoutubePublisherAgent + + sent = [] + + async def mock_send(text=None, **kw): + sent.append(text) + return {"ok": True, "message_id": 1} + + with patch( + "app.agents.youtube_publisher.send_raw", + new=mock_send, + ), patch( + "app.agents.youtube_publisher.classify_intent.classify", + return_value=("unclear", None), + ): + a = YoutubePublisherAgent() + await a.on_telegram_reply(pipeline_id=44, step="cover", user_text="huh?") + assert any("다시 입력" in (s or "") for s in sent)