163 lines
7.3 KiB
Python
163 lines
7.3 KiB
Python
"""인스타 카드 에이전트 — 매일 09:30 뉴스 수집·키워드 추출 → 텔레그램 후보 푸시.
|
|
사용자가 키워드 버튼을 누르면 카드 슬레이트 생성 + 10장 미디어 그룹 발송."""
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
import httpx
|
|
|
|
from .base import BaseAgent
|
|
from ..db import (
|
|
create_task, update_task_status, add_log, get_agent_config,
|
|
)
|
|
from ..config import TELEGRAM_BOT_TOKEN, TELEGRAM_CHAT_ID
|
|
from .. import service_proxy
|
|
from ..telegram import messaging
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
async def _send_media_group(media: List[Dict[str, Any]], caption: str = "") -> Dict[str, Any]:
|
|
"""텔레그램 sendMediaGroup. media는 InputMediaPhoto dicts.
|
|
각 항목에는 임시 키 '_bytes'로 PNG 바이트가 담겨 있어 attach:// 형식으로 multipart 업로드."""
|
|
if not TELEGRAM_BOT_TOKEN:
|
|
return {"ok": False, "reason": "TELEGRAM_BOT_TOKEN missing"}
|
|
url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMediaGroup"
|
|
files: Dict[str, tuple] = {}
|
|
for i, m in enumerate(media):
|
|
attach_key = f"photo{i+1}"
|
|
files[attach_key] = (f"{i+1}.png", m["_bytes"], "image/png")
|
|
m["media"] = f"attach://{attach_key}"
|
|
m.pop("_bytes", None)
|
|
if caption and media:
|
|
media[0]["caption"] = caption[:1024]
|
|
payload = {"chat_id": TELEGRAM_CHAT_ID, "media": json.dumps(media, ensure_ascii=False)}
|
|
async with httpx.AsyncClient(timeout=60) as client:
|
|
resp = await client.post(url, data=payload, files=files)
|
|
return resp.json()
|
|
|
|
|
|
class InstaAgent(BaseAgent):
|
|
agent_id = "insta"
|
|
display_name = "인스타 큐레이터"
|
|
|
|
async def on_schedule(self) -> None:
|
|
"""09:30 매일: 뉴스 수집 → 키워드 추출 → 텔레그램 후보 푸시.
|
|
custom_config.auto_select=True면 카테고리당 1위 키워드 자동 슬레이트 생성."""
|
|
if self.state not in ("idle", "break"):
|
|
return
|
|
config = get_agent_config(self.agent_id) or {}
|
|
custom = config.get("custom_config", {}) or {}
|
|
auto_select = bool(custom.get("auto_select", False))
|
|
|
|
task_id = create_task(self.agent_id, "insta_daily", {"auto_select": auto_select},
|
|
requires_approval=False)
|
|
await self.transition("working", "뉴스 수집·키워드 추출", task_id)
|
|
try:
|
|
await self._run_collect_and_extract()
|
|
kws = await service_proxy.insta_list_keywords(used=False)
|
|
if auto_select:
|
|
await self._auto_render(kws)
|
|
else:
|
|
await self._push_keyword_candidates(kws)
|
|
update_task_status(task_id, "succeeded", {"keywords": len(kws)})
|
|
await self.transition("idle", "후보 푸시 완료")
|
|
except Exception as e:
|
|
add_log(self.agent_id, f"insta daily failed: {e}", "error", task_id)
|
|
update_task_status(task_id, "failed", {"error": str(e)})
|
|
await self.transition("idle", f"오류: {e}")
|
|
|
|
async def _run_collect_and_extract(self) -> None:
|
|
col = await service_proxy.insta_collect()
|
|
await self._wait_task(col["task_id"], step="collect", timeout_sec=300)
|
|
ext = await service_proxy.insta_extract()
|
|
await self._wait_task(ext["task_id"], step="extract", timeout_sec=300)
|
|
|
|
async def _wait_task(self, task_id: str, step: str, timeout_sec: int = 300) -> Dict[str, Any]:
|
|
attempts = max(1, timeout_sec // 5)
|
|
for _ in range(attempts):
|
|
await asyncio.sleep(5)
|
|
st = await service_proxy.insta_task_status(task_id)
|
|
if st["status"] == "succeeded":
|
|
return st
|
|
if st["status"] == "failed":
|
|
raise RuntimeError(f"{step} failed: {st.get('error')}")
|
|
raise TimeoutError(f"{step} timeout {timeout_sec}s")
|
|
|
|
async def _push_keyword_candidates(self, keywords: List[Dict[str, Any]]) -> None:
|
|
by_cat: Dict[str, List[Dict[str, Any]]] = {}
|
|
for k in keywords:
|
|
by_cat.setdefault(k["category"], []).append(k)
|
|
if not by_cat:
|
|
await messaging.send_raw("📰 [인스타 큐레이터] 오늘은 추천할 키워드가 없습니다.")
|
|
return
|
|
rows: List[List[Dict[str, Any]]] = []
|
|
text_lines = ["📰 <b>[인스타 큐레이터]</b> 오늘의 키워드 후보"]
|
|
for cat, items in by_cat.items():
|
|
text_lines.append(f"\n<b>{cat}</b>")
|
|
for k in items[:5]:
|
|
text_lines.append(f" · {k['keyword']} (score {k['score']:.2f})")
|
|
rows.append([{
|
|
"text": f"🎴 {k['keyword']}",
|
|
"callback_data": f"render_{k['id']}",
|
|
}])
|
|
await messaging.send_raw("\n".join(text_lines), reply_markup={"inline_keyboard": rows})
|
|
|
|
async def _auto_render(self, keywords: List[Dict[str, Any]]) -> None:
|
|
by_cat: Dict[str, Dict[str, Any]] = {}
|
|
for k in keywords:
|
|
cat = k["category"]
|
|
if cat not in by_cat or k["score"] > by_cat[cat]["score"]:
|
|
by_cat[cat] = k
|
|
for kw in by_cat.values():
|
|
await self._render_and_push(kw["id"])
|
|
|
|
async def _render_and_push(self, keyword_id: int) -> None:
|
|
kw = await service_proxy.insta_get_keyword(keyword_id)
|
|
if not kw:
|
|
await messaging.send_raw(f"⚠️ 키워드 {keyword_id} 없음")
|
|
return
|
|
await messaging.send_raw(f"🎨 카드 생성 중: <b>{kw['keyword']}</b>")
|
|
created = await service_proxy.insta_create_slate(
|
|
keyword=kw["keyword"], category=kw["category"], keyword_id=kw["id"],
|
|
)
|
|
st = await self._wait_task(created["task_id"], step="slate", timeout_sec=600)
|
|
slate_id = st["result_id"]
|
|
slate = await service_proxy.insta_get_slate(slate_id)
|
|
media = []
|
|
for a in slate["assets"][:10]:
|
|
data = await service_proxy.insta_get_asset_bytes(slate_id, a["page_index"])
|
|
media.append({"type": "photo", "_bytes": data})
|
|
caption = slate.get("suggested_caption", "")
|
|
hashtags = " ".join(slate.get("hashtags", []) or [])
|
|
full_caption = f"{caption}\n\n{hashtags}".strip()
|
|
await _send_media_group(media, caption=full_caption)
|
|
|
|
async def on_command(self, command: str, params: dict) -> dict:
|
|
if command == "extract":
|
|
await self._run_collect_and_extract()
|
|
kws = await service_proxy.insta_list_keywords(used=False)
|
|
await self._push_keyword_candidates(kws)
|
|
return {"ok": True, "count": len(kws)}
|
|
if command == "render":
|
|
kid = int(params.get("keyword_id") or 0)
|
|
if not kid:
|
|
return {"ok": False, "message": "keyword_id 필수"}
|
|
await self._render_and_push(kid)
|
|
return {"ok": True}
|
|
return {"ok": False, "message": f"Unknown command: {command}"}
|
|
|
|
async def on_callback(self, action: str, params: dict) -> dict:
|
|
if action == "render":
|
|
kid = int(params.get("keyword_id") or 0)
|
|
if not kid:
|
|
return {"ok": False}
|
|
await self._render_and_push(kid)
|
|
return {"ok": True}
|
|
return {"ok": False}
|
|
|
|
async def on_approval(self, task_id: str, approved: bool, feedback: str = "") -> None:
|
|
return
|