버그1: /feedback approve가 bg task 시작 전에 state를 next_pending으로 set → polling이 빈 video_url로 알림 발송. bg task의 run_step이 state를 set하도록 일임 — 이중 update 제거. 버그2: reject 후 같은 *_pending 상태로 재생성됐을 때 dedupe에 막혀 알림이 안 감. dedupe 키에 feedback_count_per_step[step]을 포함 — 재생성마다 count가 증가하므로 키가 달라져 재알림 동작.
113 lines
4.4 KiB
Python
113 lines
4.4 KiB
Python
"""텔레그램 단일 채널로 단계별 승인 인터랙션 오케스트레이션."""
|
|
import logging
|
|
|
|
from .base import BaseAgent
|
|
from . import classify_intent
|
|
from .. import service_proxy
|
|
from ..db import add_log
|
|
from ..telegram.messaging import send_raw
|
|
|
|
logger = logging.getLogger("agent-office.youtube_publisher")
|
|
|
|
|
|
_STEP_TITLES = {
|
|
"cover_pending": ("커버 아트", "cover"),
|
|
"video_pending": ("영상 비주얼", "video"),
|
|
"thumb_pending": ("썸네일", "thumb"),
|
|
"meta_pending": ("메타데이터", "meta"),
|
|
"publish_pending": ("최종 검토 + 발행", "publish"),
|
|
}
|
|
|
|
|
|
class YoutubePublisherAgent(BaseAgent):
|
|
agent_id = "youtube_publisher"
|
|
display_name = "YouTube 퍼블리셔"
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
super().__init__(*args, **kwargs)
|
|
self._notified_state_per_pipeline: dict[int, tuple] = {}
|
|
|
|
async def poll_state_changes(self) -> None:
|
|
"""주기적으로 호출되어 *_pending 신규 진입 시 텔레그램 발송."""
|
|
try:
|
|
pipelines = await service_proxy.list_active_pipelines()
|
|
except Exception as e:
|
|
logger.warning("폴링 실패: %s", e)
|
|
return
|
|
|
|
for p in pipelines:
|
|
state = p.get("state")
|
|
pid = p.get("id")
|
|
if pid is None:
|
|
continue
|
|
if state in _STEP_TITLES:
|
|
_, step = _STEP_TITLES[state]
|
|
fb_count = (p.get("feedback_count_per_step") or {}).get(step, 0)
|
|
key = (state, fb_count)
|
|
if self._notified_state_per_pipeline.get(pid) != key:
|
|
await self._notify_step(p)
|
|
self._notified_state_per_pipeline[pid] = key
|
|
|
|
async def _notify_step(self, pipeline: dict) -> None:
|
|
state = pipeline["state"]
|
|
title_name, step = _STEP_TITLES[state]
|
|
body = self._format_body(pipeline, step)
|
|
track_title = pipeline.get("track_title") or f"Pipeline #{pipeline['id']}"
|
|
text = (
|
|
f"🎵 [{track_title}] {title_name} 검토\n\n"
|
|
f"{body}\n\n"
|
|
f"➡️ 답장으로 알려주세요: '승인' 또는 '반려 + 수정 방향'"
|
|
)
|
|
sent = await send_raw(text=text)
|
|
if sent.get("ok"):
|
|
msg_id = sent.get("message_id")
|
|
try:
|
|
await service_proxy.save_pipeline_telegram_msg(pipeline["id"], step, msg_id)
|
|
except Exception as e:
|
|
logger.warning("telegram-msg 저장 실패: %s", e)
|
|
add_log(self.agent_id, f"pipeline {pipeline['id']} {step} 알림 전송", "info")
|
|
|
|
def _format_body(self, p: dict, step: str) -> str:
|
|
if step == "cover":
|
|
return f"🖼️ 커버: {p.get('cover_url', '-')}"
|
|
if step == "video":
|
|
return f"🎬 영상: {p.get('video_url', '-')}"
|
|
if step == "thumb":
|
|
return f"🎴 썸네일: {p.get('thumbnail_url', '-')}"
|
|
if step == "meta":
|
|
m = p.get("metadata", {}) or {}
|
|
tags = m.get("tags", []) or []
|
|
description = (m.get("description", "") or "")
|
|
return (
|
|
f"📝 제목: {m.get('title', '')}\n"
|
|
f"🏷️ 태그: {', '.join(tags[:8])}\n"
|
|
f"📄 설명(앞부분): {description[:200]}"
|
|
)
|
|
if step == "publish":
|
|
r = p.get("review", {}) or {}
|
|
return (
|
|
f"AI 검토 결과: {r.get('verdict', '?')} "
|
|
f"(가중 {r.get('weighted_total', '?')}/100)\n"
|
|
f"{r.get('summary', '')}"
|
|
)
|
|
return ""
|
|
|
|
async def on_telegram_reply(self, pipeline_id: int, step: str, user_text: str) -> None:
|
|
intent, feedback = classify_intent.classify(user_text)
|
|
if intent == "unclear":
|
|
await send_raw("다시 입력해주세요. 예: '승인' 또는 '반려, 제목 짧게'")
|
|
return
|
|
try:
|
|
await service_proxy.post_pipeline_feedback(pipeline_id, step, intent, feedback)
|
|
except Exception as e:
|
|
await send_raw(f"⚠️ 처리 실패: {e}")
|
|
|
|
async def on_schedule(self) -> None:
|
|
await self.poll_state_changes()
|
|
|
|
async def on_command(self, command: str, params: dict) -> dict:
|
|
return {"ok": False, "message": f"Unknown command: {command}"}
|
|
|
|
async def on_approval(self, task_id: str, approved: bool, feedback: str = "") -> None:
|
|
pass
|