blog-lab의 generate/market/review 엔드포인트는 task_id만 즉시 반환하고 BackgroundTask로 실제 작업을 수행한다. 기존 코드는 응답에서 바로 post_id를 꺼내려 해 항상 'generate did not return post_id' 실패. 공통 폴링 헬퍼 _await_task로 research처럼 status=succeeded 대기하도록 수정. 점수는 review 완료 후 post를 다시 읽어 review_score로 판정. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
193 lines
8.0 KiB
Python
193 lines
8.0 KiB
Python
import asyncio
|
|
from typing import Optional
|
|
|
|
from .base import BaseAgent
|
|
from ..db import (
|
|
create_task, update_task_status, approve_task, reject_task,
|
|
get_task, get_agent_config, add_log,
|
|
)
|
|
from .. import service_proxy
|
|
from .. import telegram_bot
|
|
|
|
|
|
DEFAULT_TREND_KEYWORDS = [
|
|
"다이어트 식단", "재택근무 꿀템", "캠핑 장비 추천",
|
|
"홈트레이닝", "제주도 여행", "에어프라이어 레시피",
|
|
]
|
|
|
|
|
|
class BlogAgent(BaseAgent):
|
|
"""블로그 마케팅 에이전트.
|
|
|
|
매일 10:00 자동 실행: 키워드 1개 리서치 → 글 생성 → 마케터 → 평가자
|
|
→ 평가 점수와 요약을 텔레그램 승인 요청으로 푸시
|
|
→ 승인 시 `published` 상태로 전환, 거절 시 재생성
|
|
"""
|
|
|
|
agent_id = "blog"
|
|
display_name = "블로그 마케터"
|
|
|
|
async def on_schedule(self) -> None:
|
|
if self.state not in ("idle", "break"):
|
|
return
|
|
|
|
config = get_agent_config(self.agent_id) or {}
|
|
custom = config.get("custom_config", {}) or {}
|
|
keywords = custom.get("trend_keywords") or DEFAULT_TREND_KEYWORDS
|
|
if not keywords:
|
|
return
|
|
|
|
import random
|
|
keyword = random.choice(keywords)
|
|
|
|
task_id = create_task(
|
|
self.agent_id,
|
|
"auto_blog_pipeline",
|
|
{"keyword": keyword},
|
|
requires_approval=True,
|
|
)
|
|
await self.transition("working", f"리서치: {keyword}", task_id)
|
|
asyncio.create_task(self._run_pipeline(task_id, keyword))
|
|
|
|
async def _await_task(self, step: str, task_id: str, timeout_sec: int = 240) -> Optional[int]:
|
|
"""blog-lab BackgroundTask 완료 폴링. 완료 시 result_id 반환."""
|
|
attempts = max(1, timeout_sec // 5)
|
|
for _ in range(attempts):
|
|
await asyncio.sleep(5)
|
|
status = await service_proxy.blog_task_status(task_id)
|
|
s = status.get("status")
|
|
if s == "succeeded":
|
|
return status.get("result_id")
|
|
if s == "failed":
|
|
raise Exception(f"{step} failed: {status.get('error')}")
|
|
raise Exception(f"{step} timeout ({timeout_sec}s 내 완료되지 않음)")
|
|
|
|
async def _run_pipeline(self, task_id: str, keyword: str) -> None:
|
|
try:
|
|
# 1) 리서치
|
|
research = await service_proxy.blog_research(keyword)
|
|
keyword_id = await self._await_task("research", research.get("task_id"), 180)
|
|
if not keyword_id:
|
|
raise Exception("research succeeded but result_id missing")
|
|
|
|
# 2) 작가 단계 (비동기)
|
|
await self.transition("working", f"글 생성: {keyword}", task_id)
|
|
gen = await service_proxy.blog_generate(keyword_id)
|
|
post_id = await self._await_task("generate", gen.get("task_id"), 300)
|
|
if not post_id:
|
|
raise Exception("generate succeeded but post_id missing")
|
|
|
|
# 3) 마케터 단계 (비동기)
|
|
await self.transition("working", "링크 삽입 중", task_id)
|
|
mkt = await service_proxy.blog_market(post_id)
|
|
await self._await_task("market", mkt.get("task_id"), 180)
|
|
|
|
# 4) 평가자 단계 (비동기)
|
|
await self.transition("working", "품질 리뷰 중", task_id)
|
|
rev = await service_proxy.blog_review(post_id)
|
|
await self._await_task("review", rev.get("task_id"), 180)
|
|
|
|
post_after = await service_proxy.blog_get_post(post_id)
|
|
score = post_after.get("review_score")
|
|
passed = (score or 0) >= 42
|
|
|
|
title = post_after.get("title", "(제목 없음)")
|
|
excerpt = (post_after.get("body") or "")[:300]
|
|
|
|
update_task_status(task_id, "pending", {
|
|
"keyword": keyword,
|
|
"post_id": post_id,
|
|
"score": score,
|
|
"passed": passed,
|
|
"title": title,
|
|
})
|
|
|
|
await self.transition("waiting", f"승인 대기 · {score}/60", task_id)
|
|
|
|
detail = (
|
|
f"키워드: {keyword}\n"
|
|
f"제목: {title}\n"
|
|
f"평가 점수: {score}/60 ({'통과' if passed else '미통과'})\n\n"
|
|
f"{excerpt}..."
|
|
)
|
|
await telegram_bot.send_approval_request(
|
|
self.agent_id, task_id,
|
|
"✍️ [블로그 에이전트] 발행 승인 요청", detail,
|
|
)
|
|
|
|
except Exception as e:
|
|
add_log(self.agent_id, f"Blog pipeline failed: {e}", "error", task_id)
|
|
update_task_status(task_id, "failed", {"error": str(e), "keyword": keyword})
|
|
await self.transition("idle", f"오류: {e}")
|
|
await telegram_bot.send_task_result(
|
|
self.agent_id, "✍️ [블로그 에이전트] 파이프라인 실패",
|
|
f"키워드: {keyword}\n오류: {e}",
|
|
)
|
|
|
|
async def on_command(self, command: str, params: dict) -> dict:
|
|
if command == "research":
|
|
keyword = (params.get("keyword") or "").strip()
|
|
if not keyword:
|
|
return {"ok": False, "message": "keyword 필수"}
|
|
task_id = create_task(
|
|
self.agent_id, "auto_blog_pipeline",
|
|
{"keyword": keyword}, requires_approval=True,
|
|
)
|
|
await self.transition("working", f"리서치: {keyword}", task_id)
|
|
asyncio.create_task(self._run_pipeline(task_id, keyword))
|
|
return {"ok": True, "task_id": task_id, "message": f"파이프라인 시작: {keyword}"}
|
|
|
|
if command == "add_trend_keyword":
|
|
keyword = (params.get("keyword") or "").strip()
|
|
if not keyword:
|
|
return {"ok": False, "message": "keyword 필수"}
|
|
config = get_agent_config(self.agent_id) or {}
|
|
custom = config.get("custom_config", {}) or {}
|
|
kws = list(custom.get("trend_keywords") or [])
|
|
if keyword not in kws:
|
|
kws.append(keyword)
|
|
from ..db import update_agent_config
|
|
update_agent_config(self.agent_id, custom_config={**custom, "trend_keywords": kws})
|
|
return {"ok": True, "keywords": kws}
|
|
|
|
if command == "list_trend_keywords":
|
|
config = get_agent_config(self.agent_id) or {}
|
|
custom = config.get("custom_config", {}) or {}
|
|
return {"ok": True, "keywords": custom.get("trend_keywords") or DEFAULT_TREND_KEYWORDS}
|
|
|
|
return {"ok": False, "message": f"Unknown command: {command}"}
|
|
|
|
async def on_approval(self, task_id: str, approved: bool, feedback: str = "") -> None:
|
|
task = get_task(task_id)
|
|
if not task:
|
|
return
|
|
result = task.get("result_data") or {}
|
|
post_id = result.get("post_id")
|
|
|
|
if not approved:
|
|
reject_task(task_id)
|
|
await self.transition("idle", "발행 거절됨")
|
|
await telegram_bot.send_task_result(
|
|
self.agent_id, "✍️ [블로그 에이전트] 발행 취소",
|
|
f"키워드: {result.get('keyword', '')}\n사용자가 거절했습니다.",
|
|
)
|
|
return
|
|
|
|
approve_task(task_id, via="telegram")
|
|
await self.transition("reporting", "발행 중...", task_id)
|
|
|
|
try:
|
|
if post_id:
|
|
await service_proxy.blog_publish(int(post_id))
|
|
update_task_status(task_id, "succeeded", {**result, "published": True})
|
|
await telegram_bot.send_task_result(
|
|
self.agent_id, "✍️ [블로그 에이전트] 발행 완료",
|
|
f"키워드: {result.get('keyword', '')}\n제목: {result.get('title', '')}\n"
|
|
f"점수: {result.get('score')}/60",
|
|
)
|
|
await self.transition("idle", "발행 완료")
|
|
except Exception as e:
|
|
add_log(self.agent_id, f"Blog publish failed: {e}", "error", task_id)
|
|
update_task_status(task_id, "failed", {**result, "publish_error": str(e)})
|
|
await self.transition("idle", f"발행 오류: {e}")
|