"""텔레그램 단일 채널로 단계별 승인 인터랙션 오케스트레이션.""" import logging from .base import BaseAgent from . import classify_intent from .. import service_proxy from ..db import add_log from ..telegram.messaging import send_raw logger = logging.getLogger("agent-office.youtube_publisher") _STEP_TITLES = { "cover_pending": ("커버 아트", "cover"), "video_pending": ("영상 비주얼", "video"), "thumb_pending": ("썸네일", "thumb"), "meta_pending": ("메타데이터", "meta"), "publish_pending": ("최종 검토 + 발행", "publish"), } class YoutubePublisherAgent(BaseAgent): agent_id = "youtube_publisher" display_name = "YouTube 퍼블리셔" def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._notified_state_per_pipeline: dict[int, str] = {} async def poll_state_changes(self) -> None: """주기적으로 호출되어 *_pending 신규 진입 시 텔레그램 발송.""" try: pipelines = await service_proxy.list_active_pipelines() except Exception as e: logger.warning("폴링 실패: %s", e) return for p in pipelines: state = p.get("state") pid = p.get("id") if pid is None: continue if state in _STEP_TITLES and self._notified_state_per_pipeline.get(pid) != state: await self._notify_step(p) self._notified_state_per_pipeline[pid] = state async def _notify_step(self, pipeline: dict) -> None: state = pipeline["state"] title_name, step = _STEP_TITLES[state] body = self._format_body(pipeline, step) track_title = pipeline.get("track_title") or f"Pipeline #{pipeline['id']}" text = ( f"🎵 [{track_title}] {title_name} 검토\n\n" f"{body}\n\n" f"➡️ 답장으로 알려주세요: '승인' 또는 '반려 + 수정 방향'" ) sent = await send_raw(text=text) if sent.get("ok"): msg_id = sent.get("message_id") try: await service_proxy.save_pipeline_telegram_msg(pipeline["id"], step, msg_id) except Exception as e: logger.warning("telegram-msg 저장 실패: %s", e) add_log(self.agent_id, f"pipeline {pipeline['id']} {step} 알림 전송", "info") def _format_body(self, p: dict, step: str) -> str: if step == "cover": return f"🖼️ 커버: {p.get('cover_url', '-')}" if step == "video": return f"🎬 영상: {p.get('video_url', '-')}" if step == "thumb": return f"🎴 썸네일: {p.get('thumbnail_url', '-')}" if step == "meta": m = p.get("metadata", {}) or {} tags = m.get("tags", []) or [] description = (m.get("description", "") or "") return ( f"📝 제목: {m.get('title', '')}\n" f"🏷️ 태그: {', '.join(tags[:8])}\n" f"📄 설명(앞부분): {description[:200]}" ) if step == "publish": r = p.get("review", {}) or {} return ( f"AI 검토 결과: {r.get('verdict', '?')} " f"(가중 {r.get('weighted_total', '?')}/100)\n" f"{r.get('summary', '')}" ) return "" async def on_telegram_reply(self, pipeline_id: int, step: str, user_text: str) -> None: intent, feedback = classify_intent.classify(user_text) if intent == "unclear": await send_raw("다시 입력해주세요. 예: '승인' 또는 '반려, 제목 짧게'") return try: await service_proxy.post_pipeline_feedback(pipeline_id, step, intent, feedback) except Exception as e: await send_raw(f"⚠️ 처리 실패: {e}") async def on_schedule(self) -> None: await self.poll_state_changes() async def on_command(self, command: str, params: dict) -> dict: return {"ok": False, "message": f"Unknown command: {command}"} async def on_approval(self, task_id: str, approved: bool, feedback: str = "") -> None: pass