Merge branch 'feat/realestate-targeting'

청약 서비스 타겟팅 고도화 — 12 task TDD 구현
- realestate-lab: 자치구 5티어 가중치 매칭, 30일 윈도우 수집, 90일 grace 자동 정리
- agent-office: 신규 매칭 즉시 텔레그램 푸시 + 인라인 키보드 (북마크/공고 보기)
- 단일 SoT: preferred_districts/min_match_score/notify_enabled 프로필 필드
- 데일리 리포트 cron 폐기, 09:00 수집 직후 push로 통합
- 62 tests passing (realestate-lab 44 + agent-office 18)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-04-28 09:55:23 +09:00
27 changed files with 1377 additions and 104 deletions

View File

@@ -82,3 +82,9 @@ SUNO_API_KEY=
# CORS 허용 도메인 (콤마 구분)
CORS_ALLOW_ORIGINS=https://gahusb.synology.me,http://localhost:3007,http://localhost:8080
# [REALESTATE LAB — agent-office push notify]
AGENT_OFFICE_URL=http://agent-office:8000
REALESTATE_LAB_URL=http://realestate-lab:8000
REALESTATE_DASHBOARD_URL=http://localhost:8080/realestate
REALESTATE_NOTIFY_TIMEOUT=15

View File

@@ -1,89 +1,68 @@
import asyncio
from .base import BaseAgent
from ..db import create_task, update_task_status, add_log
from .. import service_proxy
from .. import telegram_bot
from ..telegram import messaging
from ..telegram.realestate_message import format_realestate_matches, build_match_keyboard
class RealestateAgent(BaseAgent):
"""부동산 청약 에이전트.
매일 09:15 자동 실행: realestate-lab의 수집을 트리거하고
신규 매칭 결과를 텔레그램으로 푸시 (승인 없는 리포트형).
realestate-lab이 신규 매칭 발견 시 /realestate/notify로 push해 트리거됨.
on_new_matches가 메인 진입점. on_schedule은 사용하지 않음(cron 폐기).
"""
agent_id = "realestate"
display_name = "청약 애널리스트"
async def on_schedule(self) -> None:
if self.state not in ("idle", "break"):
return
async def on_new_matches(self, matches: list[dict]) -> dict:
"""신규 매칭 N건을 텔레그램 1통으로 푸시.
성공 시 sent_ids 반환 → realestate-lab이 notified_at 마킹.
실패 시 sent=0, sent_ids=[] 반환 → 다음 사이클 재시도.
"""
if not matches:
return {"sent": 0, "sent_ids": []}
task_id = create_task(self.agent_id, "daily_match_report", {})
await self.transition("working", "청약 공고 수집 중", task_id)
task_id = create_task(self.agent_id, "notify_matches", {"count": len(matches)})
try:
collect = await service_proxy.realestate_collect()
new_count = collect.get("new_count", 0) or 0
text = format_realestate_matches(matches)
keyboard = build_match_keyboard(matches)
await self.transition("reporting", f"매칭 {len(matches)}건 알림", task_id)
await self.transition("working", "신규 매칭 조회 중", task_id)
matches = await service_proxy.realestate_matches(limit=20)
dashboard = await service_proxy.realestate_dashboard()
await self.transition("reporting", "리포트 전송 중", task_id)
if not matches:
body = (
f"수집된 신규 공고: {new_count}\n"
f"진행 중 공고: {dashboard.get('active_count', 0)}\n"
f"신규 매칭: 없음"
)
else:
lines = [
f"📌 수집 {new_count}건 / 매칭 {len(matches)}",
"",
]
for m in matches[:5]:
title = m.get("title") or m.get("announcement_title") or "(제목 없음)"
region = m.get("region") or ""
score = m.get("match_score") or m.get("score") or ""
lines.append(f"• [{region}] {title} (매칭 {score})")
if len(matches) > 5:
lines.append(f"… 외 {len(matches) - 5}")
body = "\n".join(lines)
tg = await telegram_bot.send_task_result(
self.agent_id,
"🏢 [청약 에이전트] 오늘의 매칭 리포트",
body,
)
# 확인한 매칭 read 처리
for m in matches[:5]:
mid = m.get("id")
if mid:
try:
await service_proxy.realestate_mark_read(int(mid))
except Exception:
pass
tg = await messaging.send_raw(text, reply_markup=keyboard)
if not tg.get("ok"):
update_task_status(task_id, "failed", {"error": tg.get("description")})
await self.transition("idle", "알림 실패")
return {"sent": 0, "sent_ids": [], "error": tg.get("description")}
sent_ids = [m["id"] for m in matches if "id" in m]
update_task_status(task_id, "succeeded", {
"new_count": new_count,
"match_count": len(matches),
"telegram_sent": tg.get("ok", False),
"sent": len(matches),
"telegram_message_id": tg.get("message_id"),
})
await self.transition("idle", f"매칭 {len(matches)}")
await self.transition("idle", f"매칭 {len(matches)} 알림 완료")
return {
"sent": len(matches),
"sent_ids": sent_ids,
"message_id": tg.get("message_id"),
}
except Exception as e:
add_log(self.agent_id, f"Realestate report failed: {e}", "error", task_id)
add_log(self.agent_id, f"on_new_matches failed: {e}", "error", task_id)
update_task_status(task_id, "failed", {"error": str(e)})
await self.transition("idle", f"오류: {e}")
return {"sent": 0, "sent_ids": [], "error": str(e)}
async def on_command(self, command: str, params: dict) -> dict:
if command == "fetch_matches":
await self.on_schedule()
return {"ok": True, "message": "매칭 리포트 시작"}
try:
matches = await service_proxy.realestate_matches(limit=20)
if not matches:
return {"ok": True, "message": "매칭 없음"}
result = await self.on_new_matches(matches)
return {"ok": True, "result": result}
except Exception as e:
return {"ok": False, "message": str(e)}
if command == "dashboard":
try:

View File

@@ -180,3 +180,22 @@ def conversation_stats(days: int = 7):
@app.get("/api/agent-office/activity")
def activity_feed(limit: int = 50, offset: int = 0):
return get_activity_feed(limit, offset)
# --- Realestate Agent Push Endpoint ---
from pydantic import BaseModel
from typing import List, Dict, Any
class RealestateNotifyBody(BaseModel):
matches: List[Dict[str, Any]]
@app.post("/api/agent-office/realestate/notify")
async def realestate_notify(body: RealestateNotifyBody):
agent = get_agent("realestate")
if agent is None:
from fastapi import HTTPException
raise HTTPException(status_code=503, detail="RealestateAgent not initialized")
return await agent.on_new_matches(body.matches)

View File

@@ -14,11 +14,6 @@ async def _run_stock_schedule():
if agent:
await agent.on_schedule()
async def _run_realestate_schedule():
agent = AGENT_REGISTRY.get("realestate")
if agent:
await agent.on_schedule()
async def _run_blog_schedule():
agent = AGENT_REGISTRY.get("blog")
if agent:
@@ -31,7 +26,6 @@ async def _run_lotto_schedule():
def init_scheduler():
scheduler.add_job(_run_stock_schedule, "cron", hour=7, minute=30, id="stock_news")
scheduler.add_job(_run_realestate_schedule, "cron", hour=9, minute=15, id="realestate_report")
scheduler.add_job(_run_blog_schedule, "cron", hour=10, minute=0, id="blog_pipeline")
scheduler.add_job(_run_lotto_schedule, "cron", day_of_week="mon", hour=7, minute=0, id="lotto_curate")
scheduler.add_job(_check_idle_breaks, "interval", seconds=60, id="idle_check")

View File

@@ -112,13 +112,15 @@ async def realestate_collect() -> Dict[str, Any]:
async def realestate_matches(limit: int = 20) -> List[Dict[str, Any]]:
resp = await _client.get(
f"{REALESTATE_LAB_URL}/api/realestate/matches",
params={"limit": limit, "unread_only": True},
)
resp.raise_for_status()
data = resp.json()
return data if isinstance(data, list) else data.get("matches", [])
"""realestate-lab의 GET /api/realestate/matches 호출."""
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.get(
f"{REALESTATE_LAB_URL}/api/realestate/matches",
params={"size": limit},
)
resp.raise_for_status()
data = resp.json()
return data.get("items", [])
async def realestate_dashboard() -> Dict[str, Any]:
@@ -133,6 +135,16 @@ async def realestate_mark_read(match_id: int) -> Dict[str, Any]:
return resp.json()
async def realestate_bookmark_toggle(announcement_id: int) -> Dict[str, Any]:
"""realestate-lab의 PATCH /api/realestate/announcements/{id}/bookmark 호출."""
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.patch(
f"{REALESTATE_LAB_URL}/api/realestate/announcements/{announcement_id}/bookmark"
)
resp.raise_for_status()
return resp.json()
# --- lotto-backend ---
async def lotto_candidates(n: int = 20) -> Dict[str, Any]:

View File

@@ -0,0 +1,93 @@
"""청약 매칭 알림 — 텔레그램 메시지 포맷터 + 인라인 키보드 빌더."""
import os
from html import escape as _h
from typing import Optional
DASHBOARD_URL = os.getenv("REALESTATE_DASHBOARD_URL", "https://example.com/realestate")
def _format_one_compact(m: dict) -> str:
score = m.get("match_score", 0)
name = _h(m.get("house_nm") or "(제목 없음)")
district = m.get("district") or ""
region = m.get("region_name") or ""
where = f"{region.split()[0] if region else ''} {district}".strip() or "위치 미상"
rstart = m.get("receipt_start") or ""
rend = m.get("receipt_end") or ""
return (
f"{score}점 — <b>{name}</b>\n"
f"📍 {_h(where)} 📅 {_h(rstart)} ~ {_h(rend)}"
)
def _format_one_full(m: dict) -> str:
score = m.get("match_score", 0)
name = _h(m.get("house_nm") or "(제목 없음)")
district = m.get("district") or ""
region = m.get("region_name") or ""
flags = []
if m.get("is_speculative_area") == "Y":
flags.append("투기과열")
if m.get("is_price_cap") == "Y":
flags.append("분양가상한제")
flag_str = f" ({', '.join(flags)})" if flags else ""
rstart = m.get("receipt_start") or ""
rend = m.get("receipt_end") or ""
elig = m.get("eligible_types") or []
reasons = m.get("match_reasons") or []
where = f"{region.split()[0] if region else ''} {district}".strip() or "위치 미상"
lines = [
f"{score}점 — <b>{name}</b>",
f"📍 {_h(where)}{_h(flag_str)}",
f"📅 청약 {_h(rstart)} ~ {_h(rend)}",
]
if elig:
lines.append(f"✓ 자격: {_h(', '.join(elig))}")
if reasons:
lines.append(f"💡 {_h(' / '.join(reasons[:4]))}")
return "\n".join(lines)
def format_realestate_matches(matches: list[dict]) -> str:
"""매칭 목록을 텔레그램 HTML 메시지로 변환.
1~2건은 풀 카드, 3건 이상은 묶음 카드(상위 5건).
"""
if not matches:
return "🏢 새 청약 매칭이 없습니다."
if len(matches) <= 2:
body = "\n\n".join(_format_one_full(m) for m in matches)
return f"🏢 <b>새 청약 매칭 {len(matches)}건</b>\n━━━━━━━━━━\n\n{body}"
top = matches[:5]
body = "\n\n".join(_format_one_compact(m) for m in top)
suffix = f"\n\n…외 {len(matches) - 5}" if len(matches) > 5 else ""
return f"🏢 <b>새 청약 매칭 {len(matches)}건</b>\n━━━━━━━━━━\n\n{body}{suffix}"
def build_match_keyboard(matches: list[dict]) -> Optional[dict]:
"""1~2건: 매치별 [북마크][공고 보기] 행. 3건 이상: [전체 보기] 단일 행."""
if not matches:
return None
if len(matches) <= 2:
rows = []
for m in matches:
buttons = [{
"text": "🔖 북마크",
"callback_data": f"realestate_bookmark_{m['id']}",
}]
url = m.get("pblanc_url")
if url:
buttons.append({"text": "📄 공고 보기", "url": url})
rows.append(buttons)
return {"inline_keyboard": rows}
return {
"inline_keyboard": [[
{"text": "📋 전체 보기", "url": DASHBOARD_URL},
]],
}

View File

@@ -30,8 +30,13 @@ async def handle_webhook(data: dict, agent_dispatcher=None) -> Optional[dict]:
async def _handle_callback(callback_query: dict) -> Optional[dict]:
"""기존 승인/거절 콜백 처리 로직."""
"""승인/거절 및 realestate 북마크 콜백 처리."""
callback_id = callback_query.get("data", "")
# realestate 북마크 토글 콜백 — DB 조회 없이 직접 처리
if callback_id.startswith("realestate_bookmark_"):
return await _handle_realestate_bookmark(callback_query, callback_id)
cb = get_telegram_callback(callback_id)
if not cb:
return None
@@ -60,6 +65,38 @@ async def _handle_callback(callback_query: dict) -> Optional[dict]:
}
async def _handle_realestate_bookmark(callback_query: dict, callback_id: str) -> dict:
"""realestate_bookmark_{announcement_id} 콜백 처리."""
from .. import service_proxy
from .messaging import send_raw
# answerCallbackQuery 먼저 — 텔레그램 로딩 스피너 해제
await api_call(
"answerCallbackQuery",
{"callback_query_id": callback_query["id"], "text": "처리 중..."},
)
try:
ann_id = int(callback_id.removeprefix("realestate_bookmark_"))
except ValueError:
await send_raw("⚠️ 잘못된 북마크 콜백 데이터")
return {"ok": False, "error": "invalid_callback_data"}
try:
result = await service_proxy.realestate_bookmark_toggle(ann_id)
is_on = result.get("is_bookmarked")
if is_on == 1:
await send_raw(f"🔖 북마크 추가 완료 (#{ann_id})")
elif is_on == 0:
await send_raw(f"🔖 북마크 해제 완료 (#{ann_id})")
else:
await send_raw(f"🔖 북마크 토글 완료 (#{ann_id})")
return {"ok": True, "announcement_id": ann_id}
except Exception as e:
await send_raw(f"⚠️ 북마크 처리 실패: {e}")
return {"ok": False, "error": str(e)}
async def _handle_message(message: dict, agent_dispatcher) -> Optional[dict]:
"""슬래시 명령 메시지 처리."""
from .router import parse_command, resolve_agent_command, HELP_TEXT

View File

@@ -0,0 +1,99 @@
import os
import sys
import tempfile
_fd, _TMP = tempfile.mkstemp(suffix=".db")
os.close(_fd)
os.unlink(_TMP)
os.environ["AGENT_OFFICE_DB_PATH"] = _TMP
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import asyncio
from unittest.mock import AsyncMock, patch
import pytest
@pytest.fixture(autouse=True)
def _init_db():
import gc
gc.collect()
if os.path.exists(_TMP):
os.remove(_TMP)
from app.db import init_db
init_db()
yield
gc.collect()
def test_on_new_matches_returns_empty_when_no_matches():
from app.agents.realestate import RealestateAgent
agent = RealestateAgent()
result = asyncio.run(agent.on_new_matches([]))
assert result == {"sent": 0, "sent_ids": []}
def test_on_new_matches_sends_telegram_and_returns_ids():
from app.agents.realestate import RealestateAgent
from app.telegram import messaging
matches = [{
"id": 7, "match_score": 80, "house_nm": "단지A",
"region_name": "서울특별시", "district": "강남구",
"receipt_start": "2026-05-01", "receipt_end": "2026-05-05",
"match_reasons": [], "eligible_types": [], "pblanc_url": "https://x.test/7",
}]
fake_send = AsyncMock(return_value={"ok": True, "message_id": 123})
with patch.object(messaging, "send_raw", fake_send):
agent = RealestateAgent()
result = asyncio.run(agent.on_new_matches(matches))
assert result["sent"] == 1
assert result["sent_ids"] == [7]
assert result["message_id"] == 123
fake_send.assert_awaited_once()
args, kwargs = fake_send.call_args
text = args[0]
assert "단지A" in text
def test_on_new_matches_telegram_failure_returns_zero():
from app.agents.realestate import RealestateAgent
from app.telegram import messaging
matches = [{
"id": 8, "match_score": 80, "house_nm": "단지B",
"region_name": "서울", "district": "송파구",
"receipt_start": "", "receipt_end": "",
"match_reasons": [], "eligible_types": [], "pblanc_url": "",
}]
fake_send = AsyncMock(return_value={"ok": False, "description": "401"})
with patch.object(messaging, "send_raw", fake_send):
agent = RealestateAgent()
result = asyncio.run(agent.on_new_matches(matches))
assert result["sent"] == 0
assert result["sent_ids"] == []
assert "error" in result
def test_endpoint_calls_agent_on_new_matches():
from fastapi.testclient import TestClient
from app.main import app
from app.agents.realestate import RealestateAgent
fake = AsyncMock(return_value={"sent": 1, "sent_ids": [99], "message_id": 1})
with patch.object(RealestateAgent, "on_new_matches", fake):
with TestClient(app) as client:
resp = client.post(
"/api/agent-office/realestate/notify",
json={"matches": [{"id": 99, "match_score": 80}]},
)
assert resp.status_code == 200
body = resp.json()
assert body["sent"] == 1
assert body["sent_ids"] == [99]

View File

@@ -0,0 +1,133 @@
import os
import sys
import tempfile
import gc
from unittest.mock import AsyncMock, patch
_fd, _TMP = tempfile.mkstemp(suffix=".db")
os.close(_fd)
os.unlink(_TMP)
os.environ["AGENT_OFFICE_DB_PATH"] = _TMP
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import asyncio
import pytest
@pytest.fixture(autouse=True)
def _init_db():
gc.collect()
if os.path.exists(_TMP):
try:
os.remove(_TMP)
except PermissionError:
pass
from app.db import init_db
init_db()
yield
def test_callback_realestate_bookmark_calls_proxy():
"""callback_data 'realestate_bookmark_42' 가 service_proxy.realestate_bookmark_toggle(42) 를 호출하고
is_bookmarked=1 이면 '추가 완료' 메시지를 전송한다."""
from app import service_proxy
from app.telegram import webhook
fake_toggle = AsyncMock(return_value={"is_bookmarked": 1})
fake_send = AsyncMock(return_value={"ok": True})
fake_api_call = AsyncMock(return_value={"ok": True})
update = {
"callback_query": {
"id": "cb1",
"from": {"id": 1},
"data": "realestate_bookmark_42",
}
}
with patch.object(service_proxy, "realestate_bookmark_toggle", fake_toggle), \
patch("app.telegram.messaging.send_raw", fake_send), \
patch("app.telegram.webhook.api_call", fake_api_call):
result = asyncio.run(webhook.handle_webhook(update))
fake_toggle.assert_awaited_once_with(42)
assert result == {"ok": True, "announcement_id": 42}
args, _ = fake_send.call_args
assert "추가" in args[0]
def test_callback_realestate_bookmark_invalid_id():
"""callback_data 'realestate_bookmark_abc' 는 ValueError를 처리하고 에러 응답 반환."""
from app import service_proxy
from app.telegram import webhook
fake_toggle = AsyncMock(return_value={"bookmarked": True})
fake_send = AsyncMock(return_value={"ok": True})
fake_api_call = AsyncMock(return_value={"ok": True})
update = {
"callback_query": {
"id": "cb2",
"from": {"id": 1},
"data": "realestate_bookmark_abc",
}
}
with patch.object(service_proxy, "realestate_bookmark_toggle", fake_toggle), \
patch("app.telegram.messaging.send_raw", fake_send), \
patch("app.telegram.webhook.api_call", fake_api_call):
result = asyncio.run(webhook.handle_webhook(update))
fake_toggle.assert_not_awaited()
assert result is not None
assert result.get("ok") is False
assert result.get("error") == "invalid_callback_data"
def test_callback_realestate_bookmark_proxy_error():
"""service_proxy 가 예외를 던질 때 에러 응답 반환."""
from app import service_proxy
from app.telegram import webhook
fake_toggle = AsyncMock(side_effect=Exception("connection refused"))
fake_send = AsyncMock(return_value={"ok": True})
fake_api_call = AsyncMock(return_value={"ok": True})
update = {
"callback_query": {
"id": "cb3",
"from": {"id": 1},
"data": "realestate_bookmark_99",
}
}
with patch.object(service_proxy, "realestate_bookmark_toggle", fake_toggle), \
patch("app.telegram.messaging.send_raw", fake_send), \
patch("app.telegram.webhook.api_call", fake_api_call):
result = asyncio.run(webhook.handle_webhook(update))
fake_toggle.assert_awaited_once_with(99)
assert result is not None
assert result.get("ok") is False
assert "connection refused" in result.get("error", "")
def test_non_realestate_callback_uses_db_path():
"""approve_*/reject_* 콜백은 기존 DB 조회 경로를 사용 (realestate 분기를 타지 않음)."""
from app.telegram import webhook
fake_api_call = AsyncMock(return_value={"ok": True})
update = {
"callback_query": {
"id": "cb4",
"from": {"id": 1},
"data": "approve_abcd1234",
}
}
# DB에 등록되지 않은 콜백이므로 None 반환 — 기존 로직 진입 확인
with patch("app.telegram.webhook.api_call", fake_api_call):
result = asyncio.run(webhook.handle_webhook(update))
assert result is None # DB에 없으면 None 반환 (기존 동작 유지)

View File

@@ -0,0 +1,59 @@
def test_format_realestate_match_full_card_single():
from app.telegram.realestate_message import format_realestate_matches
matches = [{
"id": 1,
"match_score": 90,
"house_nm": "디에이치 강남",
"region_name": "서울특별시",
"district": "강남구",
"is_speculative_area": "Y",
"is_price_cap": "Y",
"receipt_start": "2026-05-15",
"receipt_end": "2026-05-19",
"match_reasons": ["광역 일치", "자치구 S티어: 강남구 (+25)", "예산 범위"],
"eligible_types": ["일반1순위", "특별-신혼부부"],
"pblanc_url": "https://example.com/p/1",
}]
text = format_realestate_matches(matches)
assert "디에이치 강남" in text
assert "90점" in text
assert "강남구" in text
assert "2026-05-15" in text
def test_format_realestate_match_compact_when_three_or_more():
from app.telegram.realestate_message import format_realestate_matches
matches = [
{"id": i, "match_score": 90 - i, "house_nm": f"단지{i}", "district": "강남구",
"region_name": "서울특별시", "receipt_start": "2026-05-15", "receipt_end": "2026-05-19",
"match_reasons": [], "eligible_types": [], "pblanc_url": ""}
for i in range(3)
]
text = format_realestate_matches(matches)
assert "3건" in text or "3" in text
for i in range(3):
assert f"단지{i}" in text
def test_build_keyboard_single_match_has_bookmark_and_url():
from app.telegram.realestate_message import build_match_keyboard
matches = [{"id": 42, "pblanc_url": "https://example.com/p/42"}]
kb = build_match_keyboard(matches)
rows = kb["inline_keyboard"]
flat = [b for row in rows for b in row]
assert any(b.get("callback_data", "").startswith("realestate_bookmark_42") for b in flat)
assert any(b.get("url") == "https://example.com/p/42" for b in flat)
def test_build_keyboard_multi_matches_uses_dashboard_link():
from app.telegram.realestate_message import build_match_keyboard
matches = [{"id": i, "pblanc_url": ""} for i in range(3)]
kb = build_match_keyboard(matches)
flat = [b for row in kb["inline_keyboard"] for b in row]
# 3건 이상이면 [전체 보기] 단일 URL 버튼
assert any("전체" in b.get("text", "") for b in flat)
def test_build_keyboard_empty_returns_none():
from app.telegram.realestate_message import build_match_keyboard
assert build_match_keyboard([]) is None

View File

@@ -104,6 +104,7 @@ services:
- TZ=${TZ:-Asia/Seoul}
- DATA_GO_KR_API_KEY=${DATA_GO_KR_API_KEY:-}
- CORS_ALLOW_ORIGINS=${CORS_ALLOW_ORIGINS:-http://localhost:3007,http://localhost:8080}
- AGENT_OFFICE_URL=${AGENT_OFFICE_URL:-http://agent-office:8000}
volumes:
- ${RUNTIME_PATH}/data/realestate:/app/data
healthcheck:
@@ -126,6 +127,7 @@ services:
- MUSIC_LAB_URL=http://music-lab:8000
- BLOG_LAB_URL=http://blog-lab:8000
- REALESTATE_LAB_URL=http://realestate-lab:8000
- REALESTATE_DASHBOARD_URL=${REALESTATE_DASHBOARD_URL:-http://localhost:8080/realestate}
- TELEGRAM_BOT_TOKEN=${TELEGRAM_BOT_TOKEN:-}
- TELEGRAM_CHAT_ID=${TELEGRAM_CHAT_ID:-}
- TELEGRAM_WEBHOOK_URL=${TELEGRAM_WEBHOOK_URL:-}

View File

@@ -1,9 +1,11 @@
import os
import re
import logging
from datetime import date, timedelta
import requests
from typing import List, Dict, Any
from .db import upsert_announcement, upsert_model, save_collect_log
from .db import upsert_announcement, upsert_model, save_collect_log, compute_status
logger = logging.getLogger("realestate-lab")
@@ -19,6 +21,19 @@ DETAIL_ENDPOINTS = [
("getOPTLttotPblancDetail", "getOPTLttotPblancMdl"),
]
DISTRICT_PATTERN = re.compile(r"(?:서울특별시|서울시|서울)\s*(\S+?(?:구|군))")
def _extract_district(parsed: Dict[str, Any]) -> str | None:
"""파싱된 공고에서 자치구를 추출. 서울 외 지역·실패 시 None."""
for src in (parsed.get("address"), parsed.get("region_name")):
if not src:
continue
m = DISTRICT_PATTERN.search(src)
if m:
return m.group(1)
return None
def _api_call(endpoint: str, params: Dict[str, Any] = None) -> List[Dict]:
"""페이지네이션 처리하여 API 전체 데이터를 반환한다."""
@@ -130,28 +145,49 @@ def _parse_model(raw: Dict[str, Any]) -> Dict[str, Any]:
def collect_all() -> Dict[str, Any]:
"""모든 엔드포인트를 순회하며 공고 + 모델 데이터를 수집·저장한다."""
"""모든 엔드포인트를 순회하며 공고 + 모델 데이터를 수집·저장한다.
모집공고일 30일 이전 데이터는 API 파라미터로 사전 좁힘.
status='완료'로 판정되는 응답은 저장하지 않음.
"""
if not API_KEY:
logger.warning("API 키 미설정 — 수집 중단")
save_collect_log(0, 0, "API 키 미설정")
return {"new_count": 0, "total_count": 0}
today = date.today()
date_from = (today - timedelta(days=30)).strftime("%Y%m%d")
total_count = 0
new_count = 0
skipped_completed = 0
for detail_ep, model_ep in DETAIL_ENDPOINTS:
# 공고 상세 수집
detail_rows = _api_call(detail_ep)
# 공고 상세 수집 — API에 모집공고일 윈도우 파라미터 전달
# 일부 엔드포인트는 파라미터 미지원일 수 있어 무시되지만 응답에 영향 없음
detail_rows = _api_call(detail_ep, params={"RCRIT_PBLANC_DE_FROM": date_from})
for raw in detail_rows:
try:
parsed = _parse_apt_detail(raw)
# 일정 정보가 하나도 없는 공고는 건너뜀
parsed["district"] = _extract_district(parsed)
# 일정 정보가 하나도 없는 공고는 건너뜀 (기존)
has_dates = any(parsed.get(f) for f in (
"receipt_start", "receipt_end", "spsply_start",
"gnrl_rank1_start", "winner_date", "contract_start",
))
if not has_dates:
continue
# status='완료'면 저장하지 않음 (자원 절감)
status = compute_status(
parsed.get("receipt_start", "") or "",
parsed.get("receipt_end", "") or "",
parsed.get("winner_date", "") or "",
)
if status == "완료":
skipped_completed += 1
continue
_, is_new = upsert_announcement(parsed)
total_count += 1
if is_new:
@@ -168,5 +204,5 @@ def collect_all() -> Dict[str, Any]:
except Exception as e:
logger.error("모델 upsert 실패 [%s]: %s", model_ep, e)
save_collect_log(new_count, total_count)
logger.info("수집 완료: new=%d, total=%d", new_count, total_count)
logger.info("수집 완료: new=%d, total=%d, skipped_completed=%d", new_count, total_count, skipped_completed)
return {"new_count": new_count, "total_count": total_count}

View File

@@ -1,5 +1,6 @@
# realestate-lab/app/db.py
import json
import os
import sqlite3
import logging
from typing import Dict, Any, List, Optional
@@ -7,7 +8,7 @@ from datetime import date
logger = logging.getLogger("realestate-lab")
DB_PATH = "/app/data/realestate.db"
DB_PATH = os.getenv("REALESTATE_DB_PATH", "/app/data/realestate.db")
def _conn():
@@ -63,6 +64,13 @@ def init_db():
conn.execute("CREATE INDEX IF NOT EXISTS idx_ann_status ON announcements(status);")
conn.execute("CREATE INDEX IF NOT EXISTS idx_ann_region ON announcements(region_name);")
# ── 마이그레이션: district 컬럼 + 인덱스 추가 ──
try:
conn.execute("SELECT district FROM announcements LIMIT 1")
except Exception:
conn.execute("ALTER TABLE announcements ADD COLUMN district TEXT")
conn.execute("CREATE INDEX IF NOT EXISTS idx_ann_district ON announcements(district);")
# ── 마이그레이션: is_bookmarked 컬럼 추가 ──
try:
conn.execute("SELECT is_bookmarked FROM announcements LIMIT 1")
@@ -119,6 +127,17 @@ def init_db():
);
""")
# ── 마이그레이션: user_profile 신규 3컬럼 ──
for col, ddl in (
("preferred_districts", "ALTER TABLE user_profile ADD COLUMN preferred_districts TEXT NOT NULL DEFAULT '{}'"),
("min_match_score", "ALTER TABLE user_profile ADD COLUMN min_match_score INTEGER NOT NULL DEFAULT 70"),
("notify_enabled", "ALTER TABLE user_profile ADD COLUMN notify_enabled INTEGER NOT NULL DEFAULT 1"),
):
try:
conn.execute(f"SELECT {col} FROM user_profile LIMIT 1")
except Exception:
conn.execute(ddl)
# ── match_results ────────────────────────────────────────────────
conn.execute("""
CREATE TABLE IF NOT EXISTS match_results (
@@ -134,6 +153,12 @@ def init_db():
);
""")
# ── 마이그레이션: notified_at 컬럼 추가 ──
try:
conn.execute("SELECT notified_at FROM match_results LIMIT 1")
except Exception:
conn.execute("ALTER TABLE match_results ADD COLUMN notified_at TEXT")
# ── collect_log ──────────────────────────────────────────────────
conn.execute("""
CREATE TABLE IF NOT EXISTS collect_log (
@@ -169,6 +194,7 @@ def _ann_row_to_dict(r) -> Dict[str, Any]:
def upsert_announcement(data: Dict[str, Any]) -> tuple:
"""공고 upsert — house_manage_no + pblanc_no 기준. Returns (dict, is_new: bool)."""
data.setdefault("district", None) # 수동 등록 등에서 누락 시 안전 처리
status = compute_status(
data.get("receipt_start", ""),
data.get("receipt_end", ""),
@@ -183,7 +209,7 @@ def upsert_announcement(data: Dict[str, Any]) -> tuple:
conn.execute("""
INSERT INTO announcements (
house_manage_no, pblanc_no, house_nm, house_secd, house_dtl_secd,
rent_secd, region_code, region_name, address, total_units,
rent_secd, region_code, region_name, district, address, total_units,
rcrit_date, receipt_start, receipt_end, spsply_start, spsply_end,
gnrl_rank1_start, gnrl_rank1_end, winner_date, contract_start,
contract_end, homepage_url, pblanc_url, constructor, developer,
@@ -191,7 +217,7 @@ def upsert_announcement(data: Dict[str, Any]) -> tuple:
status, source
) VALUES (
:house_manage_no, :pblanc_no, :house_nm, :house_secd, :house_dtl_secd,
:rent_secd, :region_code, :region_name, :address, :total_units,
:rent_secd, :region_code, :region_name, :district, :address, :total_units,
:rcrit_date, :receipt_start, :receipt_end, :spsply_start, :spsply_end,
:gnrl_rank1_start, :gnrl_rank1_end, :winner_date, :contract_start,
:contract_end, :homepage_url, :pblanc_url, :constructor, :developer,
@@ -205,6 +231,7 @@ def upsert_announcement(data: Dict[str, Any]) -> tuple:
rent_secd=excluded.rent_secd,
region_code=excluded.region_code,
region_name=excluded.region_name,
district=excluded.district,
address=excluded.address,
total_units=excluded.total_units,
rcrit_date=excluded.rcrit_date,
@@ -343,7 +370,7 @@ def create_announcement(data: Dict[str, Any]) -> Dict[str, Any]:
ANNOUNCEMENT_COLUMNS = {
"house_nm", "house_secd", "house_dtl_secd", "rent_secd",
"region_code", "region_name", "address", "total_units",
"region_code", "region_name", "district", "address", "total_units",
"rcrit_date", "receipt_start", "receipt_end", "spsply_start", "spsply_end",
"gnrl_rank1_start", "gnrl_rank1_end", "winner_date",
"contract_start", "contract_end", "homepage_url", "pblanc_url",
@@ -408,6 +435,24 @@ def delete_closed_announcements() -> int:
return cur.rowcount
def delete_old_completed_announcements(grace_days: int = 90) -> int:
"""winner_date + grace_days 경과한 status='완료' 공고를 삭제.
winner_date가 NULL인 행은 안전하게 보존(수동 검토 대상).
match_results는 FK CASCADE로 자동 삭제. 삭제된 건수 반환.
"""
with _conn() as conn:
cur = conn.execute(
"""
DELETE FROM announcements
WHERE status = '완료'
AND winner_date IS NOT NULL
AND date(winner_date) < date('now', ?)
""",
(f"-{grace_days} days",),
)
return cur.rowcount
def update_all_statuses():
"""모든 진행중 공고의 status를 날짜 기반으로 재계산."""
with _conn() as conn:
@@ -538,10 +583,12 @@ def _profile_row_to_dict(r) -> Dict[str, Any]:
for c in r.keys():
val = r[c]
if c in ("is_homeless", "is_householder", "has_dependents", "is_newlywed",
"has_newborn", "is_first_home"):
"has_newborn", "is_first_home", "notify_enabled"):
d[c] = bool(val) if val is not None else None
elif c in ("preferred_regions", "preferred_types"):
d[c] = json.loads(val) if val else []
elif c == "preferred_districts":
d[c] = json.loads(val) if val else {}
else:
d[c] = val
return d
@@ -562,8 +609,9 @@ PROFILE_COLUMNS = {
"subscription_months", "subscription_amount", "family_members",
"has_dependents", "children_count", "is_newlywed", "marriage_months",
"has_newborn", "is_first_home", "income_level",
"preferred_regions", "preferred_types",
"preferred_regions", "preferred_types", "preferred_districts",
"min_area", "max_area", "max_price",
"min_match_score", "notify_enabled",
}
@@ -574,7 +622,7 @@ def upsert_profile(data: Dict[str, Any]) -> Dict[str, Any]:
continue
if isinstance(v, bool):
updates[k] = 1 if v else 0
elif isinstance(v, list):
elif isinstance(v, (list, dict)):
updates[k] = json.dumps(v)
else:
updates[k] = v
@@ -663,6 +711,42 @@ def mark_match_read(match_id: int) -> bool:
return cur.rowcount > 0
def get_unnotified_matches(min_score: int) -> List[Dict[str, Any]]:
"""notified_at IS NULL AND match_score >= min_score 인 매칭과 공고 정보 조인 반환."""
with _conn() as conn:
rows = conn.execute("""
SELECT m.id, m.announcement_id, m.match_score, m.match_reasons, m.eligible_types,
a.house_nm, a.region_name, a.district, a.address, a.status,
a.receipt_start, a.receipt_end, a.winner_date,
a.house_secd, a.is_speculative_area, a.is_price_cap, a.pblanc_url
FROM match_results m
JOIN announcements a ON a.id = m.announcement_id
WHERE m.notified_at IS NULL
AND m.match_score >= ?
ORDER BY m.match_score DESC
""", (min_score,)).fetchall()
items = []
for r in rows:
d = {c: r[c] for c in r.keys()}
d["match_reasons"] = json.loads(d["match_reasons"]) if d["match_reasons"] else []
d["eligible_types"] = json.loads(d["eligible_types"]) if d["eligible_types"] else []
items.append(d)
return items
def mark_matches_notified(match_ids: List[int]) -> None:
"""주어진 match_results IDs의 notified_at을 현재 시각으로 일괄 업데이트."""
if not match_ids:
return
placeholders = ",".join("?" for _ in match_ids)
with _conn() as conn:
conn.execute(
f"UPDATE match_results SET notified_at = strftime('%Y-%m-%dT%H:%M:%fZ','now') "
f"WHERE id IN ({placeholders})",
tuple(match_ids),
)
# ── collect_log CRUD ─────────────────────────────────────────────────────────
def save_collect_log(new_count: int, total_count: int, error: str = None):

View File

@@ -12,9 +12,11 @@ from .db import (
update_all_statuses,
get_profile, upsert_profile, get_matches, mark_match_read,
get_last_collect_log, get_dashboard,
delete_old_completed_announcements,
)
from .collector import collect_all
from .matcher import run_matching
from .notifier import notify_new_matches
from .models import AnnouncementCreate, AnnouncementUpdate, ProfileUpdate
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(name)s] %(levelname)s %(message)s")
@@ -24,11 +26,15 @@ scheduler = BackgroundScheduler(timezone=os.getenv("TZ", "Asia/Seoul"))
def scheduled_collect():
"""매일 09:00 — 수집 + 매칭"""
"""매일 09:00 — 수집 + 정리 + 매칭 + 알림 push"""
logger.info("스케줄 수집 시작")
collect_all()
deleted = delete_old_completed_announcements(grace_days=90)
if deleted:
logger.info("정리: %d건 삭제", deleted)
run_matching()
logger.info("스케줄 수집 + 매칭 완료")
notify_new_matches()
logger.info("스케줄 수집 + 매칭 + 알림 완료")
def scheduled_status_update():
@@ -137,7 +143,9 @@ def _run_collect_and_match():
return
try:
collect_all()
delete_old_completed_announcements(grace_days=90)
run_matching()
notify_new_matches()
finally:
_collect_lock.release()

View File

@@ -6,6 +6,44 @@ from .db import _conn, _profile_row_to_dict
logger = logging.getLogger("realestate-lab")
TIER_POINTS = {"S": 25, "A": 20, "B": 15, "C": 10, "D": 5}
def _region_score(profile: Dict[str, Any], ann: Dict[str, Any]) -> tuple[int, list[str]]:
"""지역 점수 계산. 광역 10점 + 자치구 5티어 점수 0~25점.
preferred_districts에 자치구가 하나라도 등록되면 티어 가중 모드로 동작.
자치구가 하나도 등록되지 않으면(빈 dict 또는 모든 티어가 빈 리스트) 광역 매칭만으로 35점 풀 점수(기존 호환).
"""
region_name = ann.get("region_name") or ""
district = ann.get("district") or ""
preferred_regions = profile.get("preferred_regions") or []
preferred_districts = profile.get("preferred_districts") or {}
region_match = bool(region_name and any(r in region_name for r in preferred_regions))
if not region_match:
return 0, []
has_districts = any(preferred_districts.values())
if not has_districts:
return 35, [f"선호 지역 일치: {region_name}"]
score = 10
reasons = [f"광역 일치: {region_name}"]
for tier, tier_score in TIER_POINTS.items():
if district and district in (preferred_districts.get(tier) or []):
score += tier_score
reasons.append(f"자치구 {tier}티어: {district} (+{tier_score})")
break
return score, reasons
def _eligibility_score(eligible_types: List[str]) -> int:
"""자격 점수 0~25. 첫 자격 15점 + 추가 자격당 5점, 최대 +10."""
if not eligible_types:
return 0
return 15 + min((len(eligible_types) - 1) * 5, 10)
# house_secd → 주택유형 이름 매핑
_HOUSE_TYPE_MAP = {
"01": "APT",
@@ -60,18 +98,18 @@ def _compute_score(
ann: Dict[str, Any],
models: List[Dict[str, Any]],
) -> Dict[str, Any]:
"""매칭 점수(0-100)와 사유를 계산한다."""
"""매칭 점수(0-100)와 사유를 계산한다.
배분: 지역 35 / 유형 10 / 면적 15 / 가격 15 / 자격 25.
"""
score = 0
reasons: List[str] = []
# 1. 지역 (30점)
preferred_regions = profile.get("preferred_regions") or []
region_name = ann.get("region_name") or ""
if region_name and any(r in region_name for r in preferred_regions):
score += 30
reasons.append(f"선호 지역 일치: {region_name}")
# 1. 지역 (35점) — 광역 + 자치구 5티어
region_score, region_reasons = _region_score(profile, ann)
score += region_score
reasons.extend(region_reasons)
# 2. 주택유형 (10점)
# 2. 주택유형 (10점) — binary
preferred_types = profile.get("preferred_types") or []
house_secd = ann.get("house_secd") or ""
type_name = _HOUSE_TYPE_MAP.get(house_secd, house_secd)
@@ -79,7 +117,7 @@ def _compute_score(
score += 10
reasons.append(f"선호 유형 일치: {type_name}")
# 3. 면적 (15점)
# 3. 면적 (15점) — binary, 범위 안 모델 1개라도 있으면 통과
min_area = profile.get("min_area")
max_area = profile.get("max_area")
if min_area is not None and max_area is not None and models:
@@ -90,7 +128,7 @@ def _compute_score(
reasons.append(f"희망 면적 범위 내 모델 존재 ({supply_area}㎡)")
break
# 4. 가격 (15점)
# 4. 가격 (15점) — binary, 예산 이하 모델 1개라도 있으면 통과
max_price = profile.get("max_price")
if max_price is not None and models:
for m in models:
@@ -100,11 +138,11 @@ def _compute_score(
reasons.append(f"예산 범위 내 모델 존재 (최고가 {top_amount:,}만원)")
break
# 5. 자격 (30점)
# 5. 자격 (25점) — 첫 자격 15 + 추가당 5
eligible_types = _check_eligible_types(profile, ann)
eligibility_score = min(len(eligible_types) * 10, 30)
if eligibility_score > 0:
score += eligibility_score
elig_score = _eligibility_score(eligible_types)
if elig_score > 0:
score += elig_score
reasons.append(f"자격 유형 {len(eligible_types)}개: {', '.join(eligible_types)}")
return {

View File

@@ -1,5 +1,5 @@
from typing import Optional, List
from pydantic import BaseModel
from typing import Optional, List, Dict
from pydantic import BaseModel, Field
class AnnouncementCreate(BaseModel):
@@ -80,3 +80,7 @@ class ProfileUpdate(BaseModel):
min_area: Optional[float] = None
max_area: Optional[float] = None
max_price: Optional[int] = None
# 신규
preferred_districts: Optional[Dict[str, List[str]]] = None
min_match_score: Optional[int] = Field(default=None, ge=0, le=100)
notify_enabled: Optional[bool] = None

View File

@@ -0,0 +1,46 @@
"""신규 매칭을 agent-office로 push하여 텔레그램 알림을 트리거한다."""
import os
import logging
import requests
from .db import get_profile, get_unnotified_matches, mark_matches_notified
logger = logging.getLogger("realestate-lab")
AGENT_OFFICE_URL = os.getenv("AGENT_OFFICE_URL", "http://agent-office:8000")
NOTIFY_TIMEOUT_SECONDS = int(os.getenv("REALESTATE_NOTIFY_TIMEOUT", "15"))
def notify_new_matches() -> dict:
"""프로필의 임계값을 통과한 미알림 매칭을 agent-office로 push한다.
응답이 200이고 sent_ids가 비어있지 않으면 해당 IDs의 notified_at을 마킹.
실패 시 마킹하지 않아 다음 사이클에서 재시도된다.
"""
profile = get_profile()
if not profile:
return {"sent": 0, "skipped": "no_profile"}
if not profile.get("notify_enabled"):
return {"sent": 0, "skipped": "notify_disabled"}
raw_threshold = profile.get("min_match_score")
threshold = 70 if raw_threshold is None else raw_threshold
matches = get_unnotified_matches(threshold)
if not matches:
return {"sent": 0}
url = f"{AGENT_OFFICE_URL}/api/agent-office/realestate/notify"
try:
resp = requests.post(url, json={"matches": matches}, timeout=NOTIFY_TIMEOUT_SECONDS)
resp.raise_for_status()
body = resp.json()
except requests.RequestException as e:
logger.error("agent-office push 실패: %s", e)
return {"sent": 0, "error": str(e)}
sent_ids = body.get("sent_ids") or []
if sent_ids:
mark_matches_notified(sent_ids)
logger.info("알림 송신: %d", len(sent_ids))
return body

View File

View File

@@ -0,0 +1,37 @@
import os
import sys
import tempfile
import pytest
# 테스트 임시 DB 경로를 import 전에 주입
# mkstemp으로 충돌 없는 고유 경로 확보 후 SQLite가 직접 생성하도록 즉시 삭제
_fd, _TMP_DB = tempfile.mkstemp(suffix=".db")
os.close(_fd)
os.unlink(_TMP_DB)
os.environ["REALESTATE_DB_PATH"] = _TMP_DB
# app 패키지 import 가능하게 PYTHONPATH 보정
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# 테이블 목록 — init_db가 생성하는 모든 테이블
_USER_TABLES = (
"match_results", # FK CASCADE 대비 자식 테이블 먼저
"announcement_models",
"announcements",
"user_profile",
"collect_log",
)
@pytest.fixture(autouse=True)
def _clean_db():
"""각 테스트마다 DB 테이블 비우기. 파일 삭제 대신 TRUNCATE 패턴 사용해
Windows SQLite 파일 잠금 이슈를 회피한다."""
# deferred to ensure REALESTATE_DB_PATH is set before first module load
from app.db import _conn, init_db
init_db()
with _conn() as conn:
for table in _USER_TABLES:
conn.execute(f"DELETE FROM {table}")
yield

View File

@@ -0,0 +1,138 @@
"""Tests for collector.py — _extract_district unit tests + collect_all integration tests."""
from datetime import date, timedelta
# ── _extract_district unit tests ─────────────────────────────────────────────
def test_extract_district_seoul_full_address():
from app.collector import _extract_district
parsed = {"address": "서울특별시 강남구 도곡동 123-45", "region_name": None}
assert _extract_district(parsed) == "강남구"
def test_extract_district_seoul_short():
from app.collector import _extract_district
parsed = {"address": None, "region_name": "서울 송파구"}
assert _extract_district(parsed) == "송파구"
def test_extract_district_busan_returns_none():
from app.collector import _extract_district
parsed = {"address": "부산광역시 해운대구 우동", "region_name": None}
assert _extract_district(parsed) is None
def test_extract_district_empty_returns_none():
from app.collector import _extract_district
parsed = {"address": "", "region_name": ""}
assert _extract_district(parsed) is None
def test_extract_district_seoul_county():
from app.collector import _extract_district
parsed = {"address": "서울 강서구", "region_name": None}
assert _extract_district(parsed) == "강서구"
def test_extract_district_prefers_address_over_region():
from app.collector import _extract_district
parsed = {"address": "서울특별시 마포구 합정동", "region_name": "서울 강남구"}
assert _extract_district(parsed) == "마포구"
# ── collect_all integration tests ────────────────────────────────────────────
def test_collect_skips_completed_status(monkeypatch):
"""winner_date가 과거인 응답은 status='완료'로 판정되어 upsert되지 않는다."""
from app import collector
from app.db import _conn
monkeypatch.setenv("DATA_GO_KR_API_KEY", "TEST")
monkeypatch.setattr(collector, "API_KEY", "TEST")
past_winner = (date.today() - timedelta(days=10)).strftime("%Y-%m-%d")
fake_detail_rows = [{
"HOUSE_MANAGE_NO": "DONE-1",
"PBLANC_NO": "01",
"HOUSE_NM": "완료된단지",
"HSSPLY_ADRES": "서울특별시 강남구",
"RCEPT_BGNDE": "2026-01-01",
"RCEPT_ENDDE": "2026-01-05",
"PRZWNER_PRESNATN_DE": past_winner,
}]
def fake_call(endpoint, params=None):
if "Detail" in endpoint:
return fake_detail_rows
return []
monkeypatch.setattr(collector, "_api_call", fake_call)
collector.collect_all()
with _conn() as conn:
rows = conn.execute("SELECT * FROM announcements WHERE house_manage_no='DONE-1'").fetchall()
assert len(rows) == 0
def test_collect_stores_district_for_seoul_announcement(monkeypatch):
from app import collector
from app.db import _conn
monkeypatch.setenv("DATA_GO_KR_API_KEY", "TEST")
monkeypatch.setattr(collector, "API_KEY", "TEST")
future_start = (date.today() + timedelta(days=10)).strftime("%Y-%m-%d")
future_end = (date.today() + timedelta(days=15)).strftime("%Y-%m-%d")
future_winner = (date.today() + timedelta(days=30)).strftime("%Y-%m-%d")
fake_detail = [{
"HOUSE_MANAGE_NO": "SEOUL-1",
"PBLANC_NO": "01",
"HOUSE_NM": "강남단지",
"HSSPLY_ADRES": "서울특별시 강남구 도곡동 1",
"RCEPT_BGNDE": future_start,
"RCEPT_ENDDE": future_end,
"PRZWNER_PRESNATN_DE": future_winner,
}]
def fake_call(endpoint, params=None):
if "Detail" in endpoint:
return fake_detail
return []
monkeypatch.setattr(collector, "_api_call", fake_call)
collector.collect_all()
with _conn() as conn:
row = conn.execute("SELECT district, status FROM announcements WHERE house_manage_no='SEOUL-1'").fetchone()
assert row["district"] == "강남구"
assert row["status"] in ("청약예정", "청약중")
def test_extract_district_no_separator():
"""주소에 공백이 없어도 자치구를 추출해야 한다."""
from app.collector import _extract_district
parsed = {"address": "서울특별시강남구도곡동", "region_name": None}
assert _extract_district(parsed) == "강남구"
def test_collect_passes_date_window_param(monkeypatch):
from app import collector
monkeypatch.setenv("DATA_GO_KR_API_KEY", "TEST")
monkeypatch.setattr(collector, "API_KEY", "TEST")
captured_params = []
def fake_call(endpoint, params=None):
captured_params.append(params or {})
return []
monkeypatch.setattr(collector, "_api_call", fake_call)
collector.collect_all()
expected_from = (date.today() - timedelta(days=30)).strftime("%Y%m%d")
detail_calls = [p for p in captured_params if "RCRIT_PBLANC_DE_FROM" in p]
assert detail_calls, "detail 엔드포인트 호출에 윈도우 파라미터가 없음"
assert detail_calls[0]["RCRIT_PBLANC_DE_FROM"] == expected_from

View File

@@ -0,0 +1,11 @@
def test_init_db_creates_tables():
from app.db import _conn
with _conn() as conn:
tables = {row[0] for row in conn.execute(
"SELECT name FROM sqlite_master WHERE type='table'"
)}
assert "announcements" in tables
assert "announcement_models" in tables
assert "user_profile" in tables
assert "match_results" in tables
assert "collect_log" in tables

View File

@@ -0,0 +1,100 @@
import json
from datetime import date, timedelta
from app.db import _conn
def _seed_announcement(house_nm, status, winner_date=None, hmno="HM1", pno="P1"):
with _conn() as conn:
conn.execute("""
INSERT INTO announcements (house_manage_no, pblanc_no, house_nm, status, winner_date, source)
VALUES (?, ?, ?, ?, ?, 'manual')
""", (hmno, pno, house_nm, status, winner_date))
return conn.execute("SELECT id FROM announcements WHERE house_manage_no=?", (hmno,)).fetchone()["id"]
def test_delete_old_completed_removes_expired():
from app.db import delete_old_completed_announcements
old = (date.today() - timedelta(days=100)).isoformat()
_seed_announcement("OldA", "완료", old, hmno="OLD", pno="1")
deleted = delete_old_completed_announcements(grace_days=90)
assert deleted == 1
def test_delete_old_completed_keeps_recent():
from app.db import delete_old_completed_announcements
recent = (date.today() - timedelta(days=30)).isoformat()
_seed_announcement("RecentA", "완료", recent, hmno="REC", pno="1")
deleted = delete_old_completed_announcements(grace_days=90)
assert deleted == 0
def test_delete_old_completed_keeps_active():
from app.db import delete_old_completed_announcements
old = (date.today() - timedelta(days=200)).isoformat()
_seed_announcement("ActiveA", "청약중", old, hmno="ACT", pno="1")
deleted = delete_old_completed_announcements(grace_days=90)
assert deleted == 0
def test_delete_old_completed_keeps_null_winner_date():
from app.db import delete_old_completed_announcements
_seed_announcement("NullA", "완료", None, hmno="NULL", pno="1")
deleted = delete_old_completed_announcements(grace_days=90)
assert deleted == 0 # winner_date NULL은 안전 보존
def test_get_unnotified_matches_filters_by_score_and_null():
from app.db import get_unnotified_matches
aid = _seed_announcement("MatchA", "청약중", hmno="MA", pno="1")
with _conn() as conn:
# 임계값 미만
conn.execute("""
INSERT INTO match_results (announcement_id, model_id, match_score, match_reasons, eligible_types, is_new)
VALUES (?, NULL, 50, '[]', '[]', 1)
""", (aid,))
# 임계값 통과 — 미알림
conn.execute("""
INSERT INTO match_results (announcement_id, model_id, match_score, match_reasons, eligible_types, is_new)
VALUES (?, 1, 80, '[]', '[]', 1)
""", (aid,))
# 임계값 통과 — 이미 알림됨
conn.execute("""
INSERT INTO match_results (announcement_id, model_id, match_score, match_reasons, eligible_types, is_new, notified_at)
VALUES (?, 2, 90, '[]', '[]', 1, '2026-04-01T00:00:00.000Z')
""", (aid,))
matches = get_unnotified_matches(min_score=70)
assert len(matches) == 1
assert matches[0]["match_score"] == 80
assert matches[0]["house_nm"] == "MatchA"
def test_mark_matches_notified_sets_timestamp():
from app.db import mark_matches_notified
aid = _seed_announcement("NotifyA", "청약중", hmno="NT", pno="1")
with _conn() as conn:
cur = conn.execute("""
INSERT INTO match_results (announcement_id, model_id, match_score, match_reasons, eligible_types, is_new)
VALUES (?, NULL, 80, '[]', '[]', 1)
""", (aid,))
match_id = cur.lastrowid
mark_matches_notified([match_id])
with _conn() as conn:
row = conn.execute("SELECT notified_at FROM match_results WHERE id = ?", (match_id,)).fetchone()
assert row["notified_at"] is not None
def test_get_unnotified_matches_includes_status():
from app.db import get_unnotified_matches
aid = _seed_announcement("StatusA", "청약중", hmno="ST", pno="1")
with _conn() as conn:
conn.execute("""
INSERT INTO match_results (announcement_id, model_id, match_score, match_reasons, eligible_types, is_new)
VALUES (?, NULL, 80, '[]', '[]', 1)
""", (aid,))
matches = get_unnotified_matches(min_score=70)
status_matches = [m for m in matches if m["house_nm"] == "StatusA"]
assert len(status_matches) == 1
assert status_matches[0]["status"] == "청약중"

View File

@@ -0,0 +1,39 @@
def test_user_profile_has_new_columns():
from app.db import _conn
with _conn() as conn:
cols = {row["name"] for row in conn.execute("PRAGMA table_info(user_profile)")}
assert "preferred_districts" in cols
assert "min_match_score" in cols
assert "notify_enabled" in cols
def test_announcements_has_district():
from app.db import _conn
with _conn() as conn:
cols = {row["name"] for row in conn.execute("PRAGMA table_info(announcements)")}
assert "district" in cols
def test_match_results_has_notified_at():
from app.db import _conn
with _conn() as conn:
cols = {row["name"] for row in conn.execute("PRAGMA table_info(match_results)")}
assert "notified_at" in cols
def test_district_index_exists():
from app.db import _conn
with _conn() as conn:
idx = {row["name"] for row in conn.execute(
"SELECT name FROM sqlite_master WHERE type='index'"
)}
assert "idx_ann_district" in idx
def test_profile_defaults():
from app.db import upsert_profile, get_profile
upsert_profile({"name": "테스트"})
profile = get_profile()
assert profile["preferred_districts"] == {}
assert profile["min_match_score"] == 70
assert profile["notify_enabled"] is True

View File

@@ -0,0 +1,107 @@
def test_region_score_no_districts_full_when_region_match():
"""자치구 미설정: 광역 일치 시 35점."""
from app.matcher import _region_score
profile = {"preferred_regions": ["서울"], "preferred_districts": {}}
ann = {"region_name": "서울특별시", "district": None}
score, _ = _region_score(profile, ann)
assert score == 35
def test_region_score_no_districts_zero_when_region_mismatch():
from app.matcher import _region_score
profile = {"preferred_regions": ["서울"], "preferred_districts": {}}
ann = {"region_name": "부산광역시", "district": None}
score, _ = _region_score(profile, ann)
assert score == 0
def test_region_score_s_tier_district():
"""광역 매칭 + S티어 자치구: 10 + 25 = 35."""
from app.matcher import _region_score
profile = {
"preferred_regions": ["서울"],
"preferred_districts": {"S": ["강남구"], "A": [], "B": [], "C": [], "D": []},
}
ann = {"region_name": "서울특별시", "district": "강남구"}
score, _ = _region_score(profile, ann)
assert score == 35
def test_region_score_a_tier_district():
"""광역 매칭 + A티어 자치구: 10 + 20 = 30."""
from app.matcher import _region_score
profile = {
"preferred_regions": ["서울"],
"preferred_districts": {"S": [], "A": ["송파구"], "B": [], "C": [], "D": []},
}
ann = {"region_name": "서울특별시", "district": "송파구"}
score, _ = _region_score(profile, ann)
assert score == 30
def test_region_score_d_tier_district():
"""광역 매칭 + D티어 자치구: 10 + 5 = 15."""
from app.matcher import _region_score
profile = {
"preferred_regions": ["서울"],
"preferred_districts": {"S": [], "A": [], "B": [], "C": [], "D": ["도봉구"]},
}
ann = {"region_name": "서울특별시", "district": "도봉구"}
score, _ = _region_score(profile, ann)
assert score == 15
def test_region_score_district_set_but_not_listed():
"""광역 매칭 + 자치구 5티어 어디에도 없음: 10점만."""
from app.matcher import _region_score
profile = {
"preferred_regions": ["서울"],
"preferred_districts": {"S": ["강남구"], "A": [], "B": [], "C": [], "D": []},
}
ann = {"region_name": "서울특별시", "district": "강서구"}
score, _ = _region_score(profile, ann)
assert score == 10
def test_eligibility_score_zero_when_empty():
from app.matcher import _eligibility_score
assert _eligibility_score([]) == 0
def test_eligibility_score_one_type_returns_15():
from app.matcher import _eligibility_score
assert _eligibility_score(["일반1순위"]) == 15
def test_eligibility_score_two_types_returns_20():
from app.matcher import _eligibility_score
assert _eligibility_score(["일반1순위", "특별-신혼부부"]) == 20
def test_eligibility_score_caps_at_25():
from app.matcher import _eligibility_score
assert _eligibility_score(["a", "b", "c", "d", "e"]) == 25
def test_region_score_b_tier_district():
"""광역 매칭 + B티어 자치구: 10 + 15 = 25."""
from app.matcher import _region_score
profile = {
"preferred_regions": ["서울"],
"preferred_districts": {"S": [], "A": [], "B": ["관악구"], "C": [], "D": []},
}
ann = {"region_name": "서울특별시", "district": "관악구"}
score, _ = _region_score(profile, ann)
assert score == 25
def test_region_score_c_tier_district():
"""광역 매칭 + C티어 자치구: 10 + 10 = 20."""
from app.matcher import _region_score
profile = {
"preferred_regions": ["서울"],
"preferred_districts": {"S": [], "A": [], "B": [], "C": ["은평구"], "D": []},
}
ann = {"region_name": "서울특별시", "district": "은평구"}
score, _ = _region_score(profile, ann)
assert score == 20

View File

@@ -0,0 +1,124 @@
from unittest.mock import patch, MagicMock
def _seed_profile_and_match(score, notify_enabled=True, threshold=70):
from app.db import _conn, upsert_profile
upsert_profile({
"name": "u",
"notify_enabled": notify_enabled,
"min_match_score": threshold,
})
with _conn() as conn:
conn.execute("""
INSERT INTO announcements (house_manage_no, pblanc_no, house_nm, status, source)
VALUES ('NF1', '01', '단지', '청약중', 'manual')
""")
ann_id = conn.execute("SELECT id FROM announcements WHERE house_manage_no='NF1'").fetchone()["id"]
conn.execute("""
INSERT INTO match_results (announcement_id, model_id, match_score, match_reasons, eligible_types, is_new)
VALUES (?, NULL, ?, '[]', '[]', 1)
""", (ann_id, score))
match_id = conn.execute("SELECT id FROM match_results WHERE announcement_id=?", (ann_id,)).fetchone()["id"]
return match_id
def test_notify_skips_when_disabled():
from app import notifier
_seed_profile_and_match(score=80, notify_enabled=False)
with patch.object(notifier, "requests") as r:
result = notifier.notify_new_matches()
assert r.post.call_count == 0
assert result["sent"] == 0
assert result.get("skipped") == "notify_disabled"
def test_notify_filters_below_threshold():
from app import notifier
_seed_profile_and_match(score=60, threshold=70)
with patch.object(notifier, "requests") as r:
result = notifier.notify_new_matches()
assert r.post.call_count == 0
assert result["sent"] == 0
def test_notify_pushes_and_marks_notified():
from app import notifier
from app.db import _conn
match_id = _seed_profile_and_match(score=80, threshold=70)
fake_resp = MagicMock()
fake_resp.json.return_value = {"sent": 1, "sent_ids": [match_id]}
fake_resp.raise_for_status.return_value = None
with patch.object(notifier.requests, "post", return_value=fake_resp) as post:
result = notifier.notify_new_matches()
assert post.call_count == 1
args, kwargs = post.call_args
assert "/api/agent-office/realestate/notify" in args[0]
assert kwargs["json"]["matches"][0]["id"] == match_id
with _conn() as conn:
row = conn.execute("SELECT notified_at FROM match_results WHERE id=?", (match_id,)).fetchone()
assert row["notified_at"] is not None
assert result["sent"] == 1
def test_notify_does_not_mark_on_failure():
from app import notifier
from app.db import _conn
import requests as real_requests
match_id = _seed_profile_and_match(score=80, threshold=70)
def boom(*a, **k):
raise real_requests.RequestException("agent-office down")
with patch.object(notifier.requests, "post", side_effect=boom):
result = notifier.notify_new_matches()
with _conn() as conn:
row = conn.execute("SELECT notified_at FROM match_results WHERE id=?", (match_id,)).fetchone()
assert row["notified_at"] is None
assert result["sent"] == 0
assert "error" in result
def test_notify_does_not_mark_when_sent_ids_empty():
"""agent-office가 200 OK + sent_ids=[]을 반환하면 마킹하지 않고 다음 사이클 재시도 가능."""
from app import notifier
from app.db import _conn
match_id = _seed_profile_and_match(score=80, threshold=70)
fake_resp = MagicMock()
fake_resp.json.return_value = {"sent": 0, "sent_ids": []}
fake_resp.raise_for_status.return_value = None
with patch.object(notifier.requests, "post", return_value=fake_resp):
result = notifier.notify_new_matches()
with _conn() as conn:
row = conn.execute("SELECT notified_at FROM match_results WHERE id=?", (match_id,)).fetchone()
assert row["notified_at"] is None
assert result["sent"] == 0
def test_notify_threshold_zero_pushes_all_matches():
"""min_match_score=0이면 모든 양수 점수 매치를 알림."""
from app import notifier
from app.db import _conn
match_id = _seed_profile_and_match(score=10, threshold=0)
fake_resp = MagicMock()
fake_resp.json.return_value = {"sent": 1, "sent_ids": [match_id]}
fake_resp.raise_for_status.return_value = None
with patch.object(notifier.requests, "post", return_value=fake_resp) as post:
result = notifier.notify_new_matches()
assert post.call_count == 1
assert result["sent"] == 1

View File

@@ -0,0 +1,38 @@
from fastapi.testclient import TestClient
def test_profile_update_accepts_new_fields():
from app.main import app
body = {
"name": "테스트",
"preferred_districts": {
"S": ["강남구", "서초구"],
"A": ["송파구"],
"B": [],
"C": [],
"D": [],
},
"min_match_score": 75,
"notify_enabled": True,
}
with TestClient(app) as client:
resp = client.put("/api/realestate/profile", json=body)
assert resp.status_code == 200
data = resp.json()
assert data["preferred_districts"]["S"] == ["강남구", "서초구"]
assert data["min_match_score"] == 75
assert data["notify_enabled"] is True
def test_profile_get_returns_defaults_for_new_fields():
from app.main import app
from app.db import upsert_profile
upsert_profile({"name": "기본"})
with TestClient(app) as client:
resp = client.get("/api/realestate/profile")
assert resp.status_code == 200
data = resp.json()
assert data["preferred_districts"] == {}
assert data["min_match_score"] == 70
assert data["notify_enabled"] is True

View File

@@ -0,0 +1,30 @@
from unittest.mock import patch
def test_scheduled_collect_calls_cleanup_and_notifier():
from app import main as app_main
calls = []
def fake_collect():
calls.append("collect")
return {"new_count": 0, "total_count": 0}
def fake_cleanup(grace_days=90):
calls.append(("cleanup", grace_days))
return 0
def fake_match():
calls.append("match")
def fake_notify():
calls.append("notify")
return {"sent": 0}
with patch.object(app_main, "collect_all", side_effect=fake_collect), \
patch.object(app_main, "delete_old_completed_announcements", side_effect=fake_cleanup), \
patch.object(app_main, "run_matching", side_effect=fake_match), \
patch.object(app_main, "notify_new_matches", side_effect=fake_notify):
app_main.scheduled_collect()
assert calls == ["collect", ("cleanup", 90), "match", "notify"]