기존 Stock/Music 에이전트 패턴을 따라 2개 신규 에이전트 도입. - Blog 에이전트 (10:00 매일): 트렌드 키워드 1개 자동 선택 → blog-lab 파이프라인 전체 (research→generate→market→review) 자동 실행 → 평가 점수와 본문 요약을 텔레그램 승인 요청으로 푸시 → 승인 시 published 전환, 거절 시 작업 종료 - Realestate 에이전트 (09:15 매일): realestate-lab 수집 트리거 → 신규 매칭 상위 5건 + 대시보드를 텔레그램 리포트 → 조회한 매칭은 자동 읽음 처리 - service_proxy: blog-lab/realestate-lab REST 호출 래퍼 추가 - agents 레지스트리 + DB 시드 + 스케줄러 3개 잡 등록 - docker-compose: agent-office에 BLOG_LAB_URL/REALESTATE_LAB_URL 주입 - README: 에이전트 구성 표 + 명령어 + 스케줄러 잡 정리 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
99 lines
3.7 KiB
Python
99 lines
3.7 KiB
Python
import asyncio
|
|
from .base import BaseAgent
|
|
from ..db import create_task, update_task_status, add_log
|
|
from .. import service_proxy
|
|
from .. import telegram_bot
|
|
|
|
|
|
class RealestateAgent(BaseAgent):
|
|
"""부동산 청약 에이전트.
|
|
|
|
매일 09:15 자동 실행: realestate-lab의 수집을 트리거하고
|
|
신규 매칭 결과를 텔레그램으로 푸시 (승인 없는 리포트형).
|
|
"""
|
|
|
|
agent_id = "realestate"
|
|
display_name = "청약 애널리스트"
|
|
|
|
async def on_schedule(self) -> None:
|
|
if self.state not in ("idle", "break"):
|
|
return
|
|
|
|
task_id = create_task(self.agent_id, "daily_match_report", {})
|
|
await self.transition("working", "청약 공고 수집 중", task_id)
|
|
|
|
try:
|
|
collect = await service_proxy.realestate_collect()
|
|
new_count = collect.get("new_count", 0) or 0
|
|
|
|
await self.transition("working", "신규 매칭 조회 중", task_id)
|
|
matches = await service_proxy.realestate_matches(limit=20)
|
|
dashboard = await service_proxy.realestate_dashboard()
|
|
|
|
await self.transition("reporting", "리포트 전송 중", task_id)
|
|
|
|
if not matches:
|
|
body = (
|
|
f"수집된 신규 공고: {new_count}건\n"
|
|
f"진행 중 공고: {dashboard.get('active_count', 0)}건\n"
|
|
f"신규 매칭: 없음"
|
|
)
|
|
else:
|
|
lines = [
|
|
f"📌 수집 {new_count}건 / 매칭 {len(matches)}건",
|
|
"",
|
|
]
|
|
for m in matches[:5]:
|
|
title = m.get("title") or m.get("announcement_title") or "(제목 없음)"
|
|
region = m.get("region") or ""
|
|
score = m.get("match_score") or m.get("score") or ""
|
|
lines.append(f"• [{region}] {title} (매칭 {score})")
|
|
if len(matches) > 5:
|
|
lines.append(f"… 외 {len(matches) - 5}건")
|
|
body = "\n".join(lines)
|
|
|
|
tg = await telegram_bot.send_task_result(
|
|
self.agent_id,
|
|
"🏢 [청약 에이전트] 오늘의 매칭 리포트",
|
|
body,
|
|
)
|
|
|
|
# 확인한 매칭 read 처리
|
|
for m in matches[:5]:
|
|
mid = m.get("id")
|
|
if mid:
|
|
try:
|
|
await service_proxy.realestate_mark_read(int(mid))
|
|
except Exception:
|
|
pass
|
|
|
|
update_task_status(task_id, "succeeded", {
|
|
"new_count": new_count,
|
|
"match_count": len(matches),
|
|
"telegram_sent": tg.get("ok", False),
|
|
"telegram_message_id": tg.get("message_id"),
|
|
})
|
|
await self.transition("idle", f"매칭 {len(matches)}건")
|
|
|
|
except Exception as e:
|
|
add_log(self.agent_id, f"Realestate report failed: {e}", "error", task_id)
|
|
update_task_status(task_id, "failed", {"error": str(e)})
|
|
await self.transition("idle", f"오류: {e}")
|
|
|
|
async def on_command(self, command: str, params: dict) -> dict:
|
|
if command == "fetch_matches":
|
|
await self.on_schedule()
|
|
return {"ok": True, "message": "매칭 리포트 시작"}
|
|
|
|
if command == "dashboard":
|
|
try:
|
|
data = await service_proxy.realestate_dashboard()
|
|
return {"ok": True, "dashboard": data}
|
|
except Exception as e:
|
|
return {"ok": False, "message": str(e)}
|
|
|
|
return {"ok": False, "message": f"Unknown command: {command}"}
|
|
|
|
async def on_approval(self, task_id: str, approved: bool, feedback: str = "") -> None:
|
|
pass
|