22 Commits

Author SHA1 Message Date
a508a5633a Merge branch 'feat/realestate-targeting'
청약 서비스 타겟팅 고도화 — 12 task TDD 구현
- realestate-lab: 자치구 5티어 가중치 매칭, 30일 윈도우 수집, 90일 grace 자동 정리
- agent-office: 신규 매칭 즉시 텔레그램 푸시 + 인라인 키보드 (북마크/공고 보기)
- 단일 SoT: preferred_districts/min_match_score/notify_enabled 프로필 필드
- 데일리 리포트 cron 폐기, 09:00 수집 직후 push로 통합
- 62 tests passing (realestate-lab 44 + agent-office 18)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-28 09:55:23 +09:00
1d6c1b4329 fix(agent-office): bookmark field name + service_proxy contract + mktemp
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-28 09:09:05 +09:00
7b3ddd1b19 chore(deploy): wire realestate↔agent-office URLs for push notify
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-28 09:03:43 +09:00
32e021cfc7 feat(agent-office): drop daily realestate cron + bookmark callback routing
- scheduler.py: remove _run_realestate_schedule() and its 09:15 cron job
- service_proxy.py: add realestate_bookmark_toggle() helper (PATCH bookmark endpoint)
- webhook.py: add _handle_realestate_bookmark() dispatcher before DB-lookup path;
  realestate_bookmark_{id} callbacks are handled inline without a DB entry
- tests/test_realestate_callback.py: 4 new unit tests covering happy path,
  invalid id, proxy error, and regression that approve/reject still uses DB path

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-28 09:01:38 +09:00
3749d79168 feat(agent-office): realestate on_new_matches + /notify endpoint
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-28 08:57:52 +09:00
0de2d3cf93 feat(agent-office-telegram): realestate match formatter + keyboard
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-28 08:54:34 +09:00
55c37df703 feat(realestate): wire cleanup + notifier into scheduled flow
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-28 08:52:00 +09:00
c2939459e7 fix(realestate-notifier): preserve threshold=0 and test sent_ids retry path
- Replace `or 70` fallback with explicit None-check so that
  min_match_score=0 ("notify all matches") is no longer silently
  coerced to 70
- Add test: 200 OK + sent_ids=[] must not mark matches notified
- Add test: threshold=0 correctly pushes low-score matches

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-28 08:50:29 +09:00
7aa7ccc6d5 feat(realestate-notifier): push unnotified matches to agent-office
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-28 08:46:59 +09:00
d46d2cb30b test(realestate): truncate tables for isolation instead of file delete
Replace os.remove() in conftest autouse fixture with per-table DELETE
to avoid Windows SQLite file-lock PermissionError being swallowed
silently and leaking state across tests. Remove the inline DELETE
workaround from test_profile_api.py that was coupling the test to
internal DB knowledge.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-28 08:45:03 +09:00
20b51f706c feat(realestate-profile): expose 5tier districts + min_match_score + notify_enabled
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-28 08:40:51 +09:00
eb04b954a5 refactor(realestate-matcher): integer tier points + clearer legacy path docstring
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-28 08:38:09 +09:00
a75ff069df feat(realestate-matcher): 5-tier district weighting + eligibility curve
지역 점수를 35점(광역 10 + 자치구 S/A/B/C/D 티어 0~25)으로 재배분하고,
자격 점수를 25점(첫 자격 15 + 추가당 5, 최대 +10) 곡선으로 변경.
총점 구성: 지역 35 + 유형 10 + 면적 15 + 가격 15 + 자격 25 = 100.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-28 08:34:19 +09:00
d39d9f26ac fix(realestate-collector): district regex tolerates missing separator 2026-04-28 08:31:55 +09:00
9dd517e82a feat(realestate-collector): 30-day window + district extraction + completed skip
- Add _extract_district() helper with DISTRICT_PATTERN regex (서울 only)
- collect_all() now passes RCRIT_PBLANC_DE_FROM param (30-day window) to all detail endpoints
- collect_all() skips announcements where compute_status() returns '완료'
- collect_all() stamps district on each parsed announcement before upsert
- upsert_announcement(): add district to INSERT/VALUES/ON CONFLICT UPDATE; data.setdefault('district', None)
- ANNOUNCEMENT_COLUMNS: add 'district' (closes deferred gap from Task 2 review)
- 9 new tests in realestate-lab/tests/test_collector.py (6 unit + 3 integration)
- Full suite: 22 passed

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-28 08:28:10 +09:00
496e3a6a73 refactor(realestate-db): tuple param + status column in unnotified query
- mark_matches_notified: pass tuple(match_ids) to conn.execute for consistency
- get_unnotified_matches: add a.status to SELECT so notifier/formatter can skip stale '완료' matches
- add regression test: test_get_unnotified_matches_includes_status

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-28 08:24:03 +09:00
d6547edf0d feat(realestate-db): add notify queue + 90-day grace cleanup
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-28 08:19:03 +09:00
5749d4d35d feat(realestate-db): add district / notify / 5tier columns with migration
- announcements.district + idx_ann_district 인덱스
- user_profile: preferred_districts(JSON obj), min_match_score(int 70), notify_enabled(bool)
- match_results: notified_at(TEXT)
- _profile_row_to_dict: notify_enabled bool화, preferred_districts dict 역직렬화
- PROFILE_COLUMNS 확장 (3 신규 필드)
- upsert_profile: list|dict 모두 JSON 직렬화

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-28 08:13:44 +09:00
2477342272 test(realestate): fix mktemp deprecation + narrow exception in conftest
Replace deprecated mktemp with mkstemp to eliminate TOCTOU race, narrow
OSError catches to PermissionError (Windows SQLite lock intent only), and
add a deferred-import comment for clarity.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-28 08:11:19 +09:00
62a9009fea test(realestate): add pytest harness with isolated SQLite fixture
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-28 08:07:59 +09:00
0fadc774d8 docs(plan): 청약 타겟팅 고도화 구현 계획
12 task TDD 분할:
- realestate-lab: 테스트 셋업 → 스키마 마이그 → 신규 함수 → collector/matcher → profile API → notifier → 흐름 통합
- agent-office: 텔레그램 fmt → on_new_matches + endpoint → cron 폐기 + 콜백 라우팅
- 마지막: docker-compose 환경변수 + 회귀 검증

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-28 03:45:10 +09:00
eef2e3967e docs(spec): 청약 타겟팅 고도화 설계
- 수집 사전 좁힘(30일 윈도우) + 완료 공고 90일 grace 자동 정리
- 자치구 5티어 가중치 매칭 (S/A/B/C/D)
- realestate-lab → agent-office push 기반 즉시 텔레그램 알림
- 데일리 리포트 cron 폐기, 임계값 통과 신규 매칭만 푸시

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-28 03:36:38 +09:00
29 changed files with 4054 additions and 104 deletions

View File

@@ -82,3 +82,9 @@ SUNO_API_KEY=
# CORS 허용 도메인 (콤마 구분) # CORS 허용 도메인 (콤마 구분)
CORS_ALLOW_ORIGINS=https://gahusb.synology.me,http://localhost:3007,http://localhost:8080 CORS_ALLOW_ORIGINS=https://gahusb.synology.me,http://localhost:3007,http://localhost:8080
# [REALESTATE LAB — agent-office push notify]
AGENT_OFFICE_URL=http://agent-office:8000
REALESTATE_LAB_URL=http://realestate-lab:8000
REALESTATE_DASHBOARD_URL=http://localhost:8080/realestate
REALESTATE_NOTIFY_TIMEOUT=15

View File

@@ -1,89 +1,68 @@
import asyncio
from .base import BaseAgent from .base import BaseAgent
from ..db import create_task, update_task_status, add_log from ..db import create_task, update_task_status, add_log
from .. import service_proxy from .. import service_proxy
from .. import telegram_bot from ..telegram import messaging
from ..telegram.realestate_message import format_realestate_matches, build_match_keyboard
class RealestateAgent(BaseAgent): class RealestateAgent(BaseAgent):
"""부동산 청약 에이전트. """부동산 청약 에이전트.
매일 09:15 자동 실행: realestate-lab의 수집을 트리거하고 realestate-lab이 신규 매칭 발견 시 /realestate/notify로 push해 트리거됨.
신규 매칭 결과를 텔레그램으로 푸시 (승인 없는 리포트형). on_new_matches가 메인 진입점. on_schedule은 사용하지 않음(cron 폐기).
""" """
agent_id = "realestate" agent_id = "realestate"
display_name = "청약 애널리스트" display_name = "청약 애널리스트"
async def on_schedule(self) -> None: async def on_new_matches(self, matches: list[dict]) -> dict:
if self.state not in ("idle", "break"): """신규 매칭 N건을 텔레그램 1통으로 푸시.
return 성공 시 sent_ids 반환 → realestate-lab이 notified_at 마킹.
실패 시 sent=0, sent_ids=[] 반환 → 다음 사이클 재시도.
"""
if not matches:
return {"sent": 0, "sent_ids": []}
task_id = create_task(self.agent_id, "daily_match_report", {}) task_id = create_task(self.agent_id, "notify_matches", {"count": len(matches)})
await self.transition("working", "청약 공고 수집 중", task_id)
try: try:
collect = await service_proxy.realestate_collect() text = format_realestate_matches(matches)
new_count = collect.get("new_count", 0) or 0 keyboard = build_match_keyboard(matches)
await self.transition("reporting", f"매칭 {len(matches)}건 알림", task_id)
await self.transition("working", "신규 매칭 조회 중", task_id) tg = await messaging.send_raw(text, reply_markup=keyboard)
matches = await service_proxy.realestate_matches(limit=20) if not tg.get("ok"):
dashboard = await service_proxy.realestate_dashboard() update_task_status(task_id, "failed", {"error": tg.get("description")})
await self.transition("idle", "알림 실패")
await self.transition("reporting", "리포트 전송 중", task_id) return {"sent": 0, "sent_ids": [], "error": tg.get("description")}
if not matches:
body = (
f"수집된 신규 공고: {new_count}\n"
f"진행 중 공고: {dashboard.get('active_count', 0)}\n"
f"신규 매칭: 없음"
)
else:
lines = [
f"📌 수집 {new_count}건 / 매칭 {len(matches)}",
"",
]
for m in matches[:5]:
title = m.get("title") or m.get("announcement_title") or "(제목 없음)"
region = m.get("region") or ""
score = m.get("match_score") or m.get("score") or ""
lines.append(f"• [{region}] {title} (매칭 {score})")
if len(matches) > 5:
lines.append(f"… 외 {len(matches) - 5}")
body = "\n".join(lines)
tg = await telegram_bot.send_task_result(
self.agent_id,
"🏢 [청약 에이전트] 오늘의 매칭 리포트",
body,
)
# 확인한 매칭 read 처리
for m in matches[:5]:
mid = m.get("id")
if mid:
try:
await service_proxy.realestate_mark_read(int(mid))
except Exception:
pass
sent_ids = [m["id"] for m in matches if "id" in m]
update_task_status(task_id, "succeeded", { update_task_status(task_id, "succeeded", {
"new_count": new_count, "sent": len(matches),
"match_count": len(matches),
"telegram_sent": tg.get("ok", False),
"telegram_message_id": tg.get("message_id"), "telegram_message_id": tg.get("message_id"),
}) })
await self.transition("idle", f"매칭 {len(matches)}") await self.transition("idle", f"매칭 {len(matches)} 알림 완료")
return {
"sent": len(matches),
"sent_ids": sent_ids,
"message_id": tg.get("message_id"),
}
except Exception as e: except Exception as e:
add_log(self.agent_id, f"Realestate report failed: {e}", "error", task_id) add_log(self.agent_id, f"on_new_matches failed: {e}", "error", task_id)
update_task_status(task_id, "failed", {"error": str(e)}) update_task_status(task_id, "failed", {"error": str(e)})
await self.transition("idle", f"오류: {e}") await self.transition("idle", f"오류: {e}")
return {"sent": 0, "sent_ids": [], "error": str(e)}
async def on_command(self, command: str, params: dict) -> dict: async def on_command(self, command: str, params: dict) -> dict:
if command == "fetch_matches": if command == "fetch_matches":
await self.on_schedule() try:
return {"ok": True, "message": "매칭 리포트 시작"} matches = await service_proxy.realestate_matches(limit=20)
if not matches:
return {"ok": True, "message": "매칭 없음"}
result = await self.on_new_matches(matches)
return {"ok": True, "result": result}
except Exception as e:
return {"ok": False, "message": str(e)}
if command == "dashboard": if command == "dashboard":
try: try:

View File

@@ -180,3 +180,22 @@ def conversation_stats(days: int = 7):
@app.get("/api/agent-office/activity") @app.get("/api/agent-office/activity")
def activity_feed(limit: int = 50, offset: int = 0): def activity_feed(limit: int = 50, offset: int = 0):
return get_activity_feed(limit, offset) return get_activity_feed(limit, offset)
# --- Realestate Agent Push Endpoint ---
from pydantic import BaseModel
from typing import List, Dict, Any
class RealestateNotifyBody(BaseModel):
matches: List[Dict[str, Any]]
@app.post("/api/agent-office/realestate/notify")
async def realestate_notify(body: RealestateNotifyBody):
agent = get_agent("realestate")
if agent is None:
from fastapi import HTTPException
raise HTTPException(status_code=503, detail="RealestateAgent not initialized")
return await agent.on_new_matches(body.matches)

View File

@@ -14,11 +14,6 @@ async def _run_stock_schedule():
if agent: if agent:
await agent.on_schedule() await agent.on_schedule()
async def _run_realestate_schedule():
agent = AGENT_REGISTRY.get("realestate")
if agent:
await agent.on_schedule()
async def _run_blog_schedule(): async def _run_blog_schedule():
agent = AGENT_REGISTRY.get("blog") agent = AGENT_REGISTRY.get("blog")
if agent: if agent:
@@ -31,7 +26,6 @@ async def _run_lotto_schedule():
def init_scheduler(): def init_scheduler():
scheduler.add_job(_run_stock_schedule, "cron", hour=7, minute=30, id="stock_news") scheduler.add_job(_run_stock_schedule, "cron", hour=7, minute=30, id="stock_news")
scheduler.add_job(_run_realestate_schedule, "cron", hour=9, minute=15, id="realestate_report")
scheduler.add_job(_run_blog_schedule, "cron", hour=10, minute=0, id="blog_pipeline") scheduler.add_job(_run_blog_schedule, "cron", hour=10, minute=0, id="blog_pipeline")
scheduler.add_job(_run_lotto_schedule, "cron", day_of_week="mon", hour=7, minute=0, id="lotto_curate") scheduler.add_job(_run_lotto_schedule, "cron", day_of_week="mon", hour=7, minute=0, id="lotto_curate")
scheduler.add_job(_check_idle_breaks, "interval", seconds=60, id="idle_check") scheduler.add_job(_check_idle_breaks, "interval", seconds=60, id="idle_check")

View File

@@ -112,13 +112,15 @@ async def realestate_collect() -> Dict[str, Any]:
async def realestate_matches(limit: int = 20) -> List[Dict[str, Any]]: async def realestate_matches(limit: int = 20) -> List[Dict[str, Any]]:
resp = await _client.get( """realestate-lab의 GET /api/realestate/matches 호출."""
f"{REALESTATE_LAB_URL}/api/realestate/matches", async with httpx.AsyncClient(timeout=10) as client:
params={"limit": limit, "unread_only": True}, resp = await client.get(
) f"{REALESTATE_LAB_URL}/api/realestate/matches",
resp.raise_for_status() params={"size": limit},
data = resp.json() )
return data if isinstance(data, list) else data.get("matches", []) resp.raise_for_status()
data = resp.json()
return data.get("items", [])
async def realestate_dashboard() -> Dict[str, Any]: async def realestate_dashboard() -> Dict[str, Any]:
@@ -133,6 +135,16 @@ async def realestate_mark_read(match_id: int) -> Dict[str, Any]:
return resp.json() return resp.json()
async def realestate_bookmark_toggle(announcement_id: int) -> Dict[str, Any]:
"""realestate-lab의 PATCH /api/realestate/announcements/{id}/bookmark 호출."""
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.patch(
f"{REALESTATE_LAB_URL}/api/realestate/announcements/{announcement_id}/bookmark"
)
resp.raise_for_status()
return resp.json()
# --- lotto-backend --- # --- lotto-backend ---
async def lotto_candidates(n: int = 20) -> Dict[str, Any]: async def lotto_candidates(n: int = 20) -> Dict[str, Any]:

View File

@@ -0,0 +1,93 @@
"""청약 매칭 알림 — 텔레그램 메시지 포맷터 + 인라인 키보드 빌더."""
import os
from html import escape as _h
from typing import Optional
DASHBOARD_URL = os.getenv("REALESTATE_DASHBOARD_URL", "https://example.com/realestate")
def _format_one_compact(m: dict) -> str:
score = m.get("match_score", 0)
name = _h(m.get("house_nm") or "(제목 없음)")
district = m.get("district") or ""
region = m.get("region_name") or ""
where = f"{region.split()[0] if region else ''} {district}".strip() or "위치 미상"
rstart = m.get("receipt_start") or ""
rend = m.get("receipt_end") or ""
return (
f"{score}점 — <b>{name}</b>\n"
f"📍 {_h(where)} 📅 {_h(rstart)} ~ {_h(rend)}"
)
def _format_one_full(m: dict) -> str:
score = m.get("match_score", 0)
name = _h(m.get("house_nm") or "(제목 없음)")
district = m.get("district") or ""
region = m.get("region_name") or ""
flags = []
if m.get("is_speculative_area") == "Y":
flags.append("투기과열")
if m.get("is_price_cap") == "Y":
flags.append("분양가상한제")
flag_str = f" ({', '.join(flags)})" if flags else ""
rstart = m.get("receipt_start") or ""
rend = m.get("receipt_end") or ""
elig = m.get("eligible_types") or []
reasons = m.get("match_reasons") or []
where = f"{region.split()[0] if region else ''} {district}".strip() or "위치 미상"
lines = [
f"{score}점 — <b>{name}</b>",
f"📍 {_h(where)}{_h(flag_str)}",
f"📅 청약 {_h(rstart)} ~ {_h(rend)}",
]
if elig:
lines.append(f"✓ 자격: {_h(', '.join(elig))}")
if reasons:
lines.append(f"💡 {_h(' / '.join(reasons[:4]))}")
return "\n".join(lines)
def format_realestate_matches(matches: list[dict]) -> str:
"""매칭 목록을 텔레그램 HTML 메시지로 변환.
1~2건은 풀 카드, 3건 이상은 묶음 카드(상위 5건).
"""
if not matches:
return "🏢 새 청약 매칭이 없습니다."
if len(matches) <= 2:
body = "\n\n".join(_format_one_full(m) for m in matches)
return f"🏢 <b>새 청약 매칭 {len(matches)}건</b>\n━━━━━━━━━━\n\n{body}"
top = matches[:5]
body = "\n\n".join(_format_one_compact(m) for m in top)
suffix = f"\n\n…외 {len(matches) - 5}" if len(matches) > 5 else ""
return f"🏢 <b>새 청약 매칭 {len(matches)}건</b>\n━━━━━━━━━━\n\n{body}{suffix}"
def build_match_keyboard(matches: list[dict]) -> Optional[dict]:
"""1~2건: 매치별 [북마크][공고 보기] 행. 3건 이상: [전체 보기] 단일 행."""
if not matches:
return None
if len(matches) <= 2:
rows = []
for m in matches:
buttons = [{
"text": "🔖 북마크",
"callback_data": f"realestate_bookmark_{m['id']}",
}]
url = m.get("pblanc_url")
if url:
buttons.append({"text": "📄 공고 보기", "url": url})
rows.append(buttons)
return {"inline_keyboard": rows}
return {
"inline_keyboard": [[
{"text": "📋 전체 보기", "url": DASHBOARD_URL},
]],
}

View File

@@ -30,8 +30,13 @@ async def handle_webhook(data: dict, agent_dispatcher=None) -> Optional[dict]:
async def _handle_callback(callback_query: dict) -> Optional[dict]: async def _handle_callback(callback_query: dict) -> Optional[dict]:
"""기존 승인/거절 콜백 처리 로직.""" """승인/거절 및 realestate 북마크 콜백 처리."""
callback_id = callback_query.get("data", "") callback_id = callback_query.get("data", "")
# realestate 북마크 토글 콜백 — DB 조회 없이 직접 처리
if callback_id.startswith("realestate_bookmark_"):
return await _handle_realestate_bookmark(callback_query, callback_id)
cb = get_telegram_callback(callback_id) cb = get_telegram_callback(callback_id)
if not cb: if not cb:
return None return None
@@ -60,6 +65,38 @@ async def _handle_callback(callback_query: dict) -> Optional[dict]:
} }
async def _handle_realestate_bookmark(callback_query: dict, callback_id: str) -> dict:
"""realestate_bookmark_{announcement_id} 콜백 처리."""
from .. import service_proxy
from .messaging import send_raw
# answerCallbackQuery 먼저 — 텔레그램 로딩 스피너 해제
await api_call(
"answerCallbackQuery",
{"callback_query_id": callback_query["id"], "text": "처리 중..."},
)
try:
ann_id = int(callback_id.removeprefix("realestate_bookmark_"))
except ValueError:
await send_raw("⚠️ 잘못된 북마크 콜백 데이터")
return {"ok": False, "error": "invalid_callback_data"}
try:
result = await service_proxy.realestate_bookmark_toggle(ann_id)
is_on = result.get("is_bookmarked")
if is_on == 1:
await send_raw(f"🔖 북마크 추가 완료 (#{ann_id})")
elif is_on == 0:
await send_raw(f"🔖 북마크 해제 완료 (#{ann_id})")
else:
await send_raw(f"🔖 북마크 토글 완료 (#{ann_id})")
return {"ok": True, "announcement_id": ann_id}
except Exception as e:
await send_raw(f"⚠️ 북마크 처리 실패: {e}")
return {"ok": False, "error": str(e)}
async def _handle_message(message: dict, agent_dispatcher) -> Optional[dict]: async def _handle_message(message: dict, agent_dispatcher) -> Optional[dict]:
"""슬래시 명령 메시지 처리.""" """슬래시 명령 메시지 처리."""
from .router import parse_command, resolve_agent_command, HELP_TEXT from .router import parse_command, resolve_agent_command, HELP_TEXT

View File

@@ -0,0 +1,99 @@
import os
import sys
import tempfile
_fd, _TMP = tempfile.mkstemp(suffix=".db")
os.close(_fd)
os.unlink(_TMP)
os.environ["AGENT_OFFICE_DB_PATH"] = _TMP
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import asyncio
from unittest.mock import AsyncMock, patch
import pytest
@pytest.fixture(autouse=True)
def _init_db():
import gc
gc.collect()
if os.path.exists(_TMP):
os.remove(_TMP)
from app.db import init_db
init_db()
yield
gc.collect()
def test_on_new_matches_returns_empty_when_no_matches():
from app.agents.realestate import RealestateAgent
agent = RealestateAgent()
result = asyncio.run(agent.on_new_matches([]))
assert result == {"sent": 0, "sent_ids": []}
def test_on_new_matches_sends_telegram_and_returns_ids():
from app.agents.realestate import RealestateAgent
from app.telegram import messaging
matches = [{
"id": 7, "match_score": 80, "house_nm": "단지A",
"region_name": "서울특별시", "district": "강남구",
"receipt_start": "2026-05-01", "receipt_end": "2026-05-05",
"match_reasons": [], "eligible_types": [], "pblanc_url": "https://x.test/7",
}]
fake_send = AsyncMock(return_value={"ok": True, "message_id": 123})
with patch.object(messaging, "send_raw", fake_send):
agent = RealestateAgent()
result = asyncio.run(agent.on_new_matches(matches))
assert result["sent"] == 1
assert result["sent_ids"] == [7]
assert result["message_id"] == 123
fake_send.assert_awaited_once()
args, kwargs = fake_send.call_args
text = args[0]
assert "단지A" in text
def test_on_new_matches_telegram_failure_returns_zero():
from app.agents.realestate import RealestateAgent
from app.telegram import messaging
matches = [{
"id": 8, "match_score": 80, "house_nm": "단지B",
"region_name": "서울", "district": "송파구",
"receipt_start": "", "receipt_end": "",
"match_reasons": [], "eligible_types": [], "pblanc_url": "",
}]
fake_send = AsyncMock(return_value={"ok": False, "description": "401"})
with patch.object(messaging, "send_raw", fake_send):
agent = RealestateAgent()
result = asyncio.run(agent.on_new_matches(matches))
assert result["sent"] == 0
assert result["sent_ids"] == []
assert "error" in result
def test_endpoint_calls_agent_on_new_matches():
from fastapi.testclient import TestClient
from app.main import app
from app.agents.realestate import RealestateAgent
fake = AsyncMock(return_value={"sent": 1, "sent_ids": [99], "message_id": 1})
with patch.object(RealestateAgent, "on_new_matches", fake):
with TestClient(app) as client:
resp = client.post(
"/api/agent-office/realestate/notify",
json={"matches": [{"id": 99, "match_score": 80}]},
)
assert resp.status_code == 200
body = resp.json()
assert body["sent"] == 1
assert body["sent_ids"] == [99]

View File

@@ -0,0 +1,133 @@
import os
import sys
import tempfile
import gc
from unittest.mock import AsyncMock, patch
_fd, _TMP = tempfile.mkstemp(suffix=".db")
os.close(_fd)
os.unlink(_TMP)
os.environ["AGENT_OFFICE_DB_PATH"] = _TMP
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import asyncio
import pytest
@pytest.fixture(autouse=True)
def _init_db():
gc.collect()
if os.path.exists(_TMP):
try:
os.remove(_TMP)
except PermissionError:
pass
from app.db import init_db
init_db()
yield
def test_callback_realestate_bookmark_calls_proxy():
"""callback_data 'realestate_bookmark_42' 가 service_proxy.realestate_bookmark_toggle(42) 를 호출하고
is_bookmarked=1 이면 '추가 완료' 메시지를 전송한다."""
from app import service_proxy
from app.telegram import webhook
fake_toggle = AsyncMock(return_value={"is_bookmarked": 1})
fake_send = AsyncMock(return_value={"ok": True})
fake_api_call = AsyncMock(return_value={"ok": True})
update = {
"callback_query": {
"id": "cb1",
"from": {"id": 1},
"data": "realestate_bookmark_42",
}
}
with patch.object(service_proxy, "realestate_bookmark_toggle", fake_toggle), \
patch("app.telegram.messaging.send_raw", fake_send), \
patch("app.telegram.webhook.api_call", fake_api_call):
result = asyncio.run(webhook.handle_webhook(update))
fake_toggle.assert_awaited_once_with(42)
assert result == {"ok": True, "announcement_id": 42}
args, _ = fake_send.call_args
assert "추가" in args[0]
def test_callback_realestate_bookmark_invalid_id():
"""callback_data 'realestate_bookmark_abc' 는 ValueError를 처리하고 에러 응답 반환."""
from app import service_proxy
from app.telegram import webhook
fake_toggle = AsyncMock(return_value={"bookmarked": True})
fake_send = AsyncMock(return_value={"ok": True})
fake_api_call = AsyncMock(return_value={"ok": True})
update = {
"callback_query": {
"id": "cb2",
"from": {"id": 1},
"data": "realestate_bookmark_abc",
}
}
with patch.object(service_proxy, "realestate_bookmark_toggle", fake_toggle), \
patch("app.telegram.messaging.send_raw", fake_send), \
patch("app.telegram.webhook.api_call", fake_api_call):
result = asyncio.run(webhook.handle_webhook(update))
fake_toggle.assert_not_awaited()
assert result is not None
assert result.get("ok") is False
assert result.get("error") == "invalid_callback_data"
def test_callback_realestate_bookmark_proxy_error():
"""service_proxy 가 예외를 던질 때 에러 응답 반환."""
from app import service_proxy
from app.telegram import webhook
fake_toggle = AsyncMock(side_effect=Exception("connection refused"))
fake_send = AsyncMock(return_value={"ok": True})
fake_api_call = AsyncMock(return_value={"ok": True})
update = {
"callback_query": {
"id": "cb3",
"from": {"id": 1},
"data": "realestate_bookmark_99",
}
}
with patch.object(service_proxy, "realestate_bookmark_toggle", fake_toggle), \
patch("app.telegram.messaging.send_raw", fake_send), \
patch("app.telegram.webhook.api_call", fake_api_call):
result = asyncio.run(webhook.handle_webhook(update))
fake_toggle.assert_awaited_once_with(99)
assert result is not None
assert result.get("ok") is False
assert "connection refused" in result.get("error", "")
def test_non_realestate_callback_uses_db_path():
"""approve_*/reject_* 콜백은 기존 DB 조회 경로를 사용 (realestate 분기를 타지 않음)."""
from app.telegram import webhook
fake_api_call = AsyncMock(return_value={"ok": True})
update = {
"callback_query": {
"id": "cb4",
"from": {"id": 1},
"data": "approve_abcd1234",
}
}
# DB에 등록되지 않은 콜백이므로 None 반환 — 기존 로직 진입 확인
with patch("app.telegram.webhook.api_call", fake_api_call):
result = asyncio.run(webhook.handle_webhook(update))
assert result is None # DB에 없으면 None 반환 (기존 동작 유지)

View File

@@ -0,0 +1,59 @@
def test_format_realestate_match_full_card_single():
from app.telegram.realestate_message import format_realestate_matches
matches = [{
"id": 1,
"match_score": 90,
"house_nm": "디에이치 강남",
"region_name": "서울특별시",
"district": "강남구",
"is_speculative_area": "Y",
"is_price_cap": "Y",
"receipt_start": "2026-05-15",
"receipt_end": "2026-05-19",
"match_reasons": ["광역 일치", "자치구 S티어: 강남구 (+25)", "예산 범위"],
"eligible_types": ["일반1순위", "특별-신혼부부"],
"pblanc_url": "https://example.com/p/1",
}]
text = format_realestate_matches(matches)
assert "디에이치 강남" in text
assert "90점" in text
assert "강남구" in text
assert "2026-05-15" in text
def test_format_realestate_match_compact_when_three_or_more():
from app.telegram.realestate_message import format_realestate_matches
matches = [
{"id": i, "match_score": 90 - i, "house_nm": f"단지{i}", "district": "강남구",
"region_name": "서울특별시", "receipt_start": "2026-05-15", "receipt_end": "2026-05-19",
"match_reasons": [], "eligible_types": [], "pblanc_url": ""}
for i in range(3)
]
text = format_realestate_matches(matches)
assert "3건" in text or "3" in text
for i in range(3):
assert f"단지{i}" in text
def test_build_keyboard_single_match_has_bookmark_and_url():
from app.telegram.realestate_message import build_match_keyboard
matches = [{"id": 42, "pblanc_url": "https://example.com/p/42"}]
kb = build_match_keyboard(matches)
rows = kb["inline_keyboard"]
flat = [b for row in rows for b in row]
assert any(b.get("callback_data", "").startswith("realestate_bookmark_42") for b in flat)
assert any(b.get("url") == "https://example.com/p/42" for b in flat)
def test_build_keyboard_multi_matches_uses_dashboard_link():
from app.telegram.realestate_message import build_match_keyboard
matches = [{"id": i, "pblanc_url": ""} for i in range(3)]
kb = build_match_keyboard(matches)
flat = [b for row in kb["inline_keyboard"] for b in row]
# 3건 이상이면 [전체 보기] 단일 URL 버튼
assert any("전체" in b.get("text", "") for b in flat)
def test_build_keyboard_empty_returns_none():
from app.telegram.realestate_message import build_match_keyboard
assert build_match_keyboard([]) is None

View File

@@ -104,6 +104,7 @@ services:
- TZ=${TZ:-Asia/Seoul} - TZ=${TZ:-Asia/Seoul}
- DATA_GO_KR_API_KEY=${DATA_GO_KR_API_KEY:-} - DATA_GO_KR_API_KEY=${DATA_GO_KR_API_KEY:-}
- CORS_ALLOW_ORIGINS=${CORS_ALLOW_ORIGINS:-http://localhost:3007,http://localhost:8080} - CORS_ALLOW_ORIGINS=${CORS_ALLOW_ORIGINS:-http://localhost:3007,http://localhost:8080}
- AGENT_OFFICE_URL=${AGENT_OFFICE_URL:-http://agent-office:8000}
volumes: volumes:
- ${RUNTIME_PATH}/data/realestate:/app/data - ${RUNTIME_PATH}/data/realestate:/app/data
healthcheck: healthcheck:
@@ -126,6 +127,7 @@ services:
- MUSIC_LAB_URL=http://music-lab:8000 - MUSIC_LAB_URL=http://music-lab:8000
- BLOG_LAB_URL=http://blog-lab:8000 - BLOG_LAB_URL=http://blog-lab:8000
- REALESTATE_LAB_URL=http://realestate-lab:8000 - REALESTATE_LAB_URL=http://realestate-lab:8000
- REALESTATE_DASHBOARD_URL=${REALESTATE_DASHBOARD_URL:-http://localhost:8080/realestate}
- TELEGRAM_BOT_TOKEN=${TELEGRAM_BOT_TOKEN:-} - TELEGRAM_BOT_TOKEN=${TELEGRAM_BOT_TOKEN:-}
- TELEGRAM_CHAT_ID=${TELEGRAM_CHAT_ID:-} - TELEGRAM_CHAT_ID=${TELEGRAM_CHAT_ID:-}
- TELEGRAM_WEBHOOK_URL=${TELEGRAM_WEBHOOK_URL:-} - TELEGRAM_WEBHOOK_URL=${TELEGRAM_WEBHOOK_URL:-}

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,479 @@
# 청약 서비스 타겟팅 고도화 설계
> 대상: `web-backend/realestate-lab/` + `web-backend/agent-office/`
> 후속 별도 스펙: 프론트 자치구 입력 UI(`web-ui`), 청약 가점 vs 커트라인 비교, 서울 외 광역 자치구 파싱
---
## 1. 목표
현재 청약 서비스가 1) 완료된 공고까지 무차별 수집하고, 2) 매칭이 binary라 단지별 의미 있는 점수 차이가 없으며, 3) 데일리 리포트라 "발견 즉시"의 가치를 못 살리는 문제를 해결한다.
### 핵심 변경
- **수집**: 모집공고 30일 이전 + 이미 `완료` 상태인 공고는 저장하지 않음. 90일 경과 완료 공고 자동 정리.
- **단일 SoT**: `user_profile.preferred_regions`를 수집·조회·매칭의 단일 기준점으로 사용 (서울 default).
- **매칭**: 자치구 5티어 가중치(S=100% / A=80% / B=60% / C=40% / D=20%) 도입. 자격 점수 미세 조정.
- **알림**: 데일리 리포트 폐기. "신규 매칭 + 임계값 통과" 즉시 텔레그램 푸시. realestate-lab → agent-office HTTP push.
### 변경하지 않는 것
- 공공데이터 API 엔드포인트 5종 구성
- 매칭 총점 100점 체계
- 텔레그램 봇 토큰·formatter는 agent-office에 단일 보관
- realestate-lab의 09:00 / 00:00 cron 스케줄(기존 그대로 유지, 트리거 로직만 변경)
---
## 2. 아키텍처 변경 개요
### 2.1 변경 포인트
| # | 위치 | 변경 |
|---|------|------|
| 1 | `realestate-lab/collector.py` | API 호출 시 모집공고일 윈도우 사전 적용. 응답 시 `완료` 상태 skip. 자치구 파싱. 90일 경과 완료 공고 정리. |
| 2 | `realestate-lab/db.py` | `user_profile`에 3컬럼, `announcements``district`, `match_results``notified_at` 추가. `delete_old_completed_announcements()` 신규. |
| 3 | `realestate-lab/matcher.py` | 자치구 5티어 가중치 + 자격 점수 재배분. binary → 자치구 그라디언트. |
| 4 | `realestate-lab` 신규 모듈 | `notifier.py`: 임계값 통과 신규 매칭 추출 + agent-office push. `notified_at` 멱등 마킹. |
| 5 | `agent-office/agents/realestate.py` | 데일리 cron 폐기. `on_new_matches(matches)` 신규. 메시지 fmt + 인라인 키보드. |
| 6 | `agent-office/main.py` | `POST /api/agent-office/realestate/notify` 신규 엔드포인트. |
### 2.2 데이터 흐름
```
[09:00 cron] realestate-lab.scheduled_collect()
├─ collect_all()
│ ├─ API 호출 (RCRIT_PBLANC_DE_FROM = today 30일)
│ ├─ 응답 파싱 + district 추출
│ ├─ status='완료' skip → upsert
│ └─ delete_old_completed_announcements(grace_days=90)
├─ run_matching() // 5티어 가중치 적용
└─ notify_new_matches()
├─ SELECT match_results WHERE notified_at IS NULL
│ AND match_score >= profile.min_match_score
│ AND profile.notify_enabled = 1
├─ POST agent-office /api/agent-office/realestate/notify
└─ 성공 → UPDATE notified_at = now()
[agent-office] POST /api/agent-office/realestate/notify
└─ RealestateAgent.on_new_matches(matches)
├─ formatter로 텔레그램 텍스트 + 인라인 키보드 빌드
└─ telegram_bot.send_message()
```
### 2.3 기각된 대안
| 대안 | 기각 사유 |
|------|-----------|
| 매칭 로직을 agent-office에 이식 | 두 서비스에 매칭 코드 복제 → 동기화 부담 |
| 완료 공고 즉시 삭제 | 사용자가 회고 못 함. 90일 grace 채택 |
| agent-office가 realestate-lab을 폴링 | 트래픽 + 지연 |
| realestate-lab이 직접 텔레그램 호출 | 토큰·formatter 분산. 봇 단일 책임 위반 |
| 가격·면적 그라디언트 곡선 | 점수 해석 어려움. binary 유지 (자치구 1축에만 곡선 적용) |
---
## 3. DB 스키마 변경
### 3.1 `user_profile` — 3컬럼 추가
```sql
ALTER TABLE user_profile ADD COLUMN preferred_districts TEXT NOT NULL DEFAULT '{}';
ALTER TABLE user_profile ADD COLUMN min_match_score INTEGER NOT NULL DEFAULT 70;
ALTER TABLE user_profile ADD COLUMN notify_enabled INTEGER NOT NULL DEFAULT 1;
```
- **`preferred_districts`**: JSON. 5티어 분류.
```json
{"S": ["강남구", "서초구"], "A": ["송파구", "마포구"], "B": [], "C": [], "D": []}
```
모든 티어 비어있으면 자치구 기준 미설정으로 간주 (기존 호환 동작).
- **`min_match_score`**: 알림 트리거 임계값(0~100). 기본 70.
- **`notify_enabled`**: 텔레그램 푸시 ON/OFF. 0이면 알림 전체 차단.
### 3.2 `announcements` — `district` 컬럼 추가
```sql
ALTER TABLE announcements ADD COLUMN district TEXT;
CREATE INDEX IF NOT EXISTS idx_ann_district ON announcements(district);
```
- collector가 응답의 `HSSPLY_ADRES` / `region_name`을 정규식 파싱하여 채움.
- 서울 외 지역, 파싱 실패 → NULL.
### 3.3 `match_results` — `notified_at` 컬럼 추가
```sql
ALTER TABLE match_results ADD COLUMN notified_at TEXT;
```
- NULL이면 미알림. 알림 송신 후 `strftime('%Y-%m-%dT%H:%M:%fZ','now')` 기록.
- 기존 `is_new`(사용자가 UI에서 봤는지)와 의미 분리.
### 3.4 신규 함수
```python
def delete_old_completed_announcements(grace_days: int = 90) -> int:
"""winner_date + grace_days 경과한 status='완료' 공고를 삭제.
winner_date가 NULL인 행은 안전하게 보존(수동 검토 대상).
match_results는 FK CASCADE로 자동 삭제. 삭제된 건수 반환.
"""
```
```python
def get_unnotified_matches(min_score: int) -> list[dict]:
"""notified_at IS NULL AND match_score >= min_score 인 매칭 + 공고 정보 조인 반환."""
```
```python
def mark_matches_notified(match_ids: list[int]) -> None:
"""notified_at = now() 일괄 업데이트."""
```
### 3.5 마이그레이션 패턴
기존 db.py의 `init_db()` 안에서 try/except로 컬럼 존재 여부 검사 후 ALTER (운영 DB 무중단).
---
## 4. collector 변경
### 4.1 모집공고일 윈도우 사전 좁힘
```python
def collect_all() -> dict:
today = date.today()
date_from = (today - timedelta(days=30)).strftime("%Y%m%d")
for detail_ep, model_ep in DETAIL_ENDPOINTS:
rows = _api_call(detail_ep, params={
# 공공데이터 API 파라미터명은 엔드포인트별로 다를 수 있음.
# 구현 시 한국부동산원 API 스펙 확인 후 정확한 키 적용.
"RCRIT_PBLANC_DE_FROM": date_from,
})
# ...
```
> ⚠️ **구현 시 검증 필요**: `ApplyhomeInfoDetailSvc`의 5개 엔드포인트가 모두 모집공고일 필터 파라미터를 지원하지 않을 수 있음. 미지원 시 응답 수신 후 클라이언트 측에서 `parsed["rcrit_date"] < date_from` skip하는 fallback을 적용.
### 4.2 `완료` 상태 skip
```python
parsed = _parse_apt_detail(raw)
parsed["district"] = _extract_district(parsed)
status = compute_status(
parsed.get("receipt_start", ""),
parsed.get("receipt_end", ""),
parsed.get("winner_date", ""),
)
if status == "완료":
continue # DB 자원 절감
# 일정 정보 없는 공고 skip (기존 로직 유지)
has_dates = any(parsed.get(f) for f in (...))
if not has_dates:
continue
upsert_announcement(parsed)
```
### 4.3 자치구 추출
```python
DISTRICT_PATTERN = re.compile(r"(?:서울특별시|서울시|서울)\s+(\S+?(?:구|군))")
def _extract_district(parsed: dict) -> str | None:
for src in (parsed.get("address"), parsed.get("region_name")):
if not src:
continue
m = DISTRICT_PATTERN.search(src)
if m:
return m.group(1)
return None
```
### 4.4 정리 + 매칭 + 알림 트리거
```python
def collect_all() -> dict:
# ... 위 수집 로직
save_collect_log(new_count, total_count)
return {"new_count": new_count, "total_count": total_count}
def scheduled_collect():
"""09:00 cron — 수집 + 정리 + 매칭 + 알림"""
collect_all()
deleted = delete_old_completed_announcements(grace_days=90)
logger.info("정리: %d건 삭제", deleted)
run_matching()
notify_new_matches() # NEW
```
---
## 5. matcher 변경
### 5.1 가중치 재배분 (총 100점 유지)
| 축 | 기존 | 신규 |
|----|------|------|
| 지역 | 30 | **35** (광역 10 + 자치구 가중 0~25) |
| 주택유형 | 10 | 10 |
| 면적 | 15 | 15 |
| 가격 | 15 | 15 |
| 자격 | 30 | **25** |
### 5.2 지역 점수 (35점)
```python
TIER_WEIGHTS = {"S": 1.00, "A": 0.80, "B": 0.60, "C": 0.40, "D": 0.20}
def _region_score(profile: dict, ann: dict) -> tuple[int, list[str]]:
region_name = ann.get("region_name") or ""
district = ann.get("district") or ""
preferred_regions = profile.get("preferred_regions") or []
preferred_districts = profile.get("preferred_districts") or {}
region_match = bool(region_name and any(r in region_name for r in preferred_regions))
if not region_match:
return 0, []
# 자치구 기준 미설정 → 광역만으로 풀 점수 (기존 호환)
has_districts = any(preferred_districts.get(t) for t in TIER_WEIGHTS)
if not has_districts:
return 35, [f"선호 지역 일치: {region_name}"]
score = 10
reasons = [f"광역 일치: {region_name}"]
for tier, weight in TIER_WEIGHTS.items():
if district in (preferred_districts.get(tier) or []):
tier_score = round(25 * weight)
score += tier_score
reasons.append(f"자치구 {tier}티어: {district} (+{tier_score})")
break
return score, reasons
```
### 5.3 자격 점수 (25점)
```python
def _eligibility_score(eligible_types: list[str]) -> int:
if not eligible_types:
return 0
score = 15 # 첫 자격
score += min((len(eligible_types) - 1) * 5, 10) # 추가 자격당 +5, 최대 +10
return score
```
다른 축(주택유형 10, 면적 15, 가격 15)은 기존 binary 로직 유지.
### 5.4 매칭 결과 저장
`run_matching()`은 기존 흐름 유지. `match_results.notified_at`은 손대지 않음 (notifier가 관리).
---
## 6. 알림 흐름
### 6.1 realestate-lab 측 — `notifier.py`
```python
import os
import requests
from .db import get_unnotified_matches, mark_matches_notified, get_profile
AGENT_OFFICE_URL = os.getenv("AGENT_OFFICE_URL", "http://agent-office:8000")
def notify_new_matches() -> dict:
profile = get_profile()
if not profile or not profile.get("notify_enabled"):
return {"sent": 0, "skipped": "notify_disabled"}
threshold = profile.get("min_match_score", 70)
matches = get_unnotified_matches(threshold)
if not matches:
return {"sent": 0}
try:
resp = requests.post(
f"{AGENT_OFFICE_URL}/api/agent-office/realestate/notify",
json={"matches": matches},
timeout=15,
)
resp.raise_for_status()
body = resp.json()
sent_ids = body.get("sent_ids", [])
if sent_ids:
mark_matches_notified(sent_ids)
return body
except requests.RequestException as e:
logger.error("알림 push 실패: %s", e)
return {"sent": 0, "error": str(e)}
```
알림 push 실패 시 `notified_at`을 채우지 않아 다음 사이클에서 재시도된다.
### 6.2 agent-office 측 — 신규 엔드포인트
```python
# agent-office/main.py
@app.post("/api/agent-office/realestate/notify")
async def realestate_notify(body: dict):
matches = body.get("matches", [])
agent = registry.get("realestate")
result = await agent.on_new_matches(matches)
return result
```
```python
# agents/realestate.py
async def on_new_matches(self, matches: list[dict]) -> dict:
if not matches:
return {"sent": 0, "sent_ids": []}
text = telegram_formatter.format_realestate_matches(matches)
keyboard = telegram_formatter.build_match_keyboard(matches)
tg = await telegram_bot.send_message(text, reply_markup=keyboard)
if not tg.get("ok"):
return {"sent": 0, "sent_ids": [], "error": tg.get("error")}
sent_ids = [m["id"] for m in matches]
return {"sent": len(matches), "sent_ids": sent_ids, "message_id": tg.get("message_id")}
```
### 6.3 텔레그램 메시지 포맷
**3건 이상 — 묶음 카드**
```
🏢 새 청약 매칭 3건
⭐ 92점 — 디에이치 강남 [S]
📍 서울 강남구 (분양가상한제) · 32~45㎡ · 6.2~9.8억
📅 청약 05/15(수) ~ 05/19(일)
⭐ 78점 — 마포 푸르지오 [A]
📍 서울 마포구 · 59~84㎡ · 8.0~11.5억
📅 청약 05/22(수) ~ 05/26(일)
⭐ 72점 — 송파 데시앙 [A]
📍 서울 송파구 · 39~59㎡ · 5.8~7.9억
📅 청약 05/27(월) ~ 05/30(목)
[전체 보기]
```
**1~2건 — 풀 카드**
```
⭐ 90점 — 디에이치 강남 [S]
📍 서울 강남구 (분양가상한제)
🏠 32~45㎡ · 6.2~9.8억
📅 청약 05/15(수) ~ 05/19(일)
✓ 자격: 일반1순위, 특별-신혼부부
💡 광역 일치 / 자치구 S티어 / 예산 범위 / 자격 2개
[🔖 북마크] [📄 공고 보기]
```
### 6.4 인라인 키보드 콜백
| 버튼 | 콜백 동작 |
|------|-----------|
| `[🔖 북마크]` | `PATCH /api/realestate/announcements/{id}/bookmark` (기존 endpoint) |
| `[📄 공고 보기]` | `pblanc_url` (텔레그램 URL 버튼) |
| `[전체 보기]` | 대시보드 deep link (`/realestate?tab=matches`) |
agent-office의 텔레그램 webhook(`/api/agent-office/telegram/webhook`)이 callback_query를 받아 service_proxy로 realestate-lab API 호출.
### 6.5 기존 RealestateAgent 동작 정리
```python
# agent-office/scheduler.py — 09:15 데일리 cron 제거
# scheduler.add_job(realestate_agent.on_schedule, ...) ← REMOVE
```
`RealestateAgent.on_schedule()`은 호출 지점이 사라지므로 제거. `on_command("fetch_matches")`는 수동 트리거(텔레그램 슬래시 명령)용으로 보존하되 `on_new_matches()`를 직접 호출하도록 단순화.
### 6.6 환경변수
| 변수 | 위치 | 기본값 |
|------|------|--------|
| `AGENT_OFFICE_URL` | realestate-lab `.env` | `http://agent-office:8000` |
| `TELEGRAM_BOT_TOKEN` / `TELEGRAM_CHAT_ID` | agent-office (기존) | (기존) |
docker-compose의 사내 네트워크로 호출되므로 외부 노출 없음.
---
## 7. API 변경 요약
### 7.1 realestate-lab
| 메서드 | 경로 | 변경 |
|--------|------|------|
| PUT | `/api/realestate/profile` | body에 `preferred_districts`, `min_match_score`, `notify_enabled` 수용 |
| GET | `/api/realestate/profile` | 응답에 위 3필드 포함 |
| GET | `/api/realestate/announcements` | 응답 item에 `district` 포함 |
| GET | `/api/realestate/announcements/{id}` | 응답에 `district` 포함 |
| GET | `/api/realestate/matches` | 응답 item에 `notified_at` 포함 (디버깅용) |
### 7.2 agent-office
| 메서드 | 경로 | 변경 |
|--------|------|------|
| POST | `/api/agent-office/realestate/notify` | **신규** — realestate-lab 전용 push 수신 |
### 7.3 Pydantic 모델 확장
```python
# realestate-lab/app/models.py
class ProfileUpdate(BaseModel):
# ... 기존 필드
preferred_districts: Optional[Dict[str, List[str]]] = None
min_match_score: Optional[int] = Field(default=None, ge=0, le=100)
notify_enabled: Optional[bool] = None
```
---
## 8. 테스트 전략
| 영역 | 테스트 항목 |
|------|-------------|
| `_extract_district` | "서울특별시 강남구 도곡동" → `"강남구"`, "서울 송파구" → `"송파구"`, "부산 해운대구" → NULL, "" → NULL |
| `compute_status` | 변경 없음. 기존 테스트 유지 |
| `_region_score` | 광역 미매칭 / 광역만 매칭 + 자치구 미설정 / S~D 티어별 / 광역 매칭 + 비선호 자치구 — 5케이스 |
| `_eligibility_score` | 자격 0개 / 1개 / 3개 / 5개 — 점수 단조 증가 + 25 상한 |
| `delete_old_completed_announcements` | winner_date 91일 전 → 삭제, 89일 전 → 보존, status≠'완료' → 보존 |
| collector 사전 좁힘 | mock API 응답으로 30일 윈도우 외 데이터 skip 확인. `완료` skip 확인 |
| `notify_new_matches` 멱등성 | `notified_at` 채워진 매치는 push 후보 제외, push 실패 시 `notified_at` 미기록 → 다음 사이클 재시도 |
| agent-office push endpoint | mock telegram client로 `format_realestate_matches` 호출 + send 검증 |
| 알림 임계값 필터 | min_match_score=70, score=69 → push 대상 외 / score=70 → 포함 |
| `notify_enabled=0` | push 자체 skip |
NAS Docker는 git push 자동 배포이므로 별도 절차 없음. ALTER TABLE은 init_db에서 try/except 패턴으로 운영 DB 무중단 적용.
---
## 9. 스코프
### 본 스펙 범위
- ✅ realestate-lab: collector, matcher, db 변경, notifier 신규
- ✅ agent-office: `/realestate/notify` 엔드포인트, `on_new_matches` 메소드, 메시지 formatter
- ✅ 기존 데일리 RealestateAgent cron 폐기
### 후속 별도 스펙
- ❌ 프론트(`web-ui`) 자치구 5티어 입력 UI (별도 frontend 스펙)
- ❌ 청약 가점 vs 공고별 예상 커트라인 비교 (외부 데이터 의존성, 별도 연구)
- ❌ 서울 외 광역(부산 해운대구 등) 자치구 파싱 확장
- ❌ 매칭 임계값 변경 후 재발송 트리거 (`POST /notifications/resend`)
- ❌ 자치구별 매칭 분포 대시보드 위젯

View File

@@ -1,9 +1,11 @@
import os import os
import re
import logging import logging
from datetime import date, timedelta
import requests import requests
from typing import List, Dict, Any from typing import List, Dict, Any
from .db import upsert_announcement, upsert_model, save_collect_log from .db import upsert_announcement, upsert_model, save_collect_log, compute_status
logger = logging.getLogger("realestate-lab") logger = logging.getLogger("realestate-lab")
@@ -19,6 +21,19 @@ DETAIL_ENDPOINTS = [
("getOPTLttotPblancDetail", "getOPTLttotPblancMdl"), ("getOPTLttotPblancDetail", "getOPTLttotPblancMdl"),
] ]
DISTRICT_PATTERN = re.compile(r"(?:서울특별시|서울시|서울)\s*(\S+?(?:구|군))")
def _extract_district(parsed: Dict[str, Any]) -> str | None:
"""파싱된 공고에서 자치구를 추출. 서울 외 지역·실패 시 None."""
for src in (parsed.get("address"), parsed.get("region_name")):
if not src:
continue
m = DISTRICT_PATTERN.search(src)
if m:
return m.group(1)
return None
def _api_call(endpoint: str, params: Dict[str, Any] = None) -> List[Dict]: def _api_call(endpoint: str, params: Dict[str, Any] = None) -> List[Dict]:
"""페이지네이션 처리하여 API 전체 데이터를 반환한다.""" """페이지네이션 처리하여 API 전체 데이터를 반환한다."""
@@ -130,28 +145,49 @@ def _parse_model(raw: Dict[str, Any]) -> Dict[str, Any]:
def collect_all() -> Dict[str, Any]: def collect_all() -> Dict[str, Any]:
"""모든 엔드포인트를 순회하며 공고 + 모델 데이터를 수집·저장한다.""" """모든 엔드포인트를 순회하며 공고 + 모델 데이터를 수집·저장한다.
모집공고일 30일 이전 데이터는 API 파라미터로 사전 좁힘.
status='완료'로 판정되는 응답은 저장하지 않음.
"""
if not API_KEY: if not API_KEY:
logger.warning("API 키 미설정 — 수집 중단") logger.warning("API 키 미설정 — 수집 중단")
save_collect_log(0, 0, "API 키 미설정") save_collect_log(0, 0, "API 키 미설정")
return {"new_count": 0, "total_count": 0} return {"new_count": 0, "total_count": 0}
today = date.today()
date_from = (today - timedelta(days=30)).strftime("%Y%m%d")
total_count = 0 total_count = 0
new_count = 0 new_count = 0
skipped_completed = 0
for detail_ep, model_ep in DETAIL_ENDPOINTS: for detail_ep, model_ep in DETAIL_ENDPOINTS:
# 공고 상세 수집 # 공고 상세 수집 — API에 모집공고일 윈도우 파라미터 전달
detail_rows = _api_call(detail_ep) # 일부 엔드포인트는 파라미터 미지원일 수 있어 무시되지만 응답에 영향 없음
detail_rows = _api_call(detail_ep, params={"RCRIT_PBLANC_DE_FROM": date_from})
for raw in detail_rows: for raw in detail_rows:
try: try:
parsed = _parse_apt_detail(raw) parsed = _parse_apt_detail(raw)
# 일정 정보가 하나도 없는 공고는 건너뜀 parsed["district"] = _extract_district(parsed)
# 일정 정보가 하나도 없는 공고는 건너뜀 (기존)
has_dates = any(parsed.get(f) for f in ( has_dates = any(parsed.get(f) for f in (
"receipt_start", "receipt_end", "spsply_start", "receipt_start", "receipt_end", "spsply_start",
"gnrl_rank1_start", "winner_date", "contract_start", "gnrl_rank1_start", "winner_date", "contract_start",
)) ))
if not has_dates: if not has_dates:
continue continue
# status='완료'면 저장하지 않음 (자원 절감)
status = compute_status(
parsed.get("receipt_start", "") or "",
parsed.get("receipt_end", "") or "",
parsed.get("winner_date", "") or "",
)
if status == "완료":
skipped_completed += 1
continue
_, is_new = upsert_announcement(parsed) _, is_new = upsert_announcement(parsed)
total_count += 1 total_count += 1
if is_new: if is_new:
@@ -168,5 +204,5 @@ def collect_all() -> Dict[str, Any]:
except Exception as e: except Exception as e:
logger.error("모델 upsert 실패 [%s]: %s", model_ep, e) logger.error("모델 upsert 실패 [%s]: %s", model_ep, e)
save_collect_log(new_count, total_count) save_collect_log(new_count, total_count)
logger.info("수집 완료: new=%d, total=%d", new_count, total_count) logger.info("수집 완료: new=%d, total=%d, skipped_completed=%d", new_count, total_count, skipped_completed)
return {"new_count": new_count, "total_count": total_count} return {"new_count": new_count, "total_count": total_count}

View File

@@ -1,5 +1,6 @@
# realestate-lab/app/db.py # realestate-lab/app/db.py
import json import json
import os
import sqlite3 import sqlite3
import logging import logging
from typing import Dict, Any, List, Optional from typing import Dict, Any, List, Optional
@@ -7,7 +8,7 @@ from datetime import date
logger = logging.getLogger("realestate-lab") logger = logging.getLogger("realestate-lab")
DB_PATH = "/app/data/realestate.db" DB_PATH = os.getenv("REALESTATE_DB_PATH", "/app/data/realestate.db")
def _conn(): def _conn():
@@ -63,6 +64,13 @@ def init_db():
conn.execute("CREATE INDEX IF NOT EXISTS idx_ann_status ON announcements(status);") conn.execute("CREATE INDEX IF NOT EXISTS idx_ann_status ON announcements(status);")
conn.execute("CREATE INDEX IF NOT EXISTS idx_ann_region ON announcements(region_name);") conn.execute("CREATE INDEX IF NOT EXISTS idx_ann_region ON announcements(region_name);")
# ── 마이그레이션: district 컬럼 + 인덱스 추가 ──
try:
conn.execute("SELECT district FROM announcements LIMIT 1")
except Exception:
conn.execute("ALTER TABLE announcements ADD COLUMN district TEXT")
conn.execute("CREATE INDEX IF NOT EXISTS idx_ann_district ON announcements(district);")
# ── 마이그레이션: is_bookmarked 컬럼 추가 ── # ── 마이그레이션: is_bookmarked 컬럼 추가 ──
try: try:
conn.execute("SELECT is_bookmarked FROM announcements LIMIT 1") conn.execute("SELECT is_bookmarked FROM announcements LIMIT 1")
@@ -119,6 +127,17 @@ def init_db():
); );
""") """)
# ── 마이그레이션: user_profile 신규 3컬럼 ──
for col, ddl in (
("preferred_districts", "ALTER TABLE user_profile ADD COLUMN preferred_districts TEXT NOT NULL DEFAULT '{}'"),
("min_match_score", "ALTER TABLE user_profile ADD COLUMN min_match_score INTEGER NOT NULL DEFAULT 70"),
("notify_enabled", "ALTER TABLE user_profile ADD COLUMN notify_enabled INTEGER NOT NULL DEFAULT 1"),
):
try:
conn.execute(f"SELECT {col} FROM user_profile LIMIT 1")
except Exception:
conn.execute(ddl)
# ── match_results ──────────────────────────────────────────────── # ── match_results ────────────────────────────────────────────────
conn.execute(""" conn.execute("""
CREATE TABLE IF NOT EXISTS match_results ( CREATE TABLE IF NOT EXISTS match_results (
@@ -134,6 +153,12 @@ def init_db():
); );
""") """)
# ── 마이그레이션: notified_at 컬럼 추가 ──
try:
conn.execute("SELECT notified_at FROM match_results LIMIT 1")
except Exception:
conn.execute("ALTER TABLE match_results ADD COLUMN notified_at TEXT")
# ── collect_log ────────────────────────────────────────────────── # ── collect_log ──────────────────────────────────────────────────
conn.execute(""" conn.execute("""
CREATE TABLE IF NOT EXISTS collect_log ( CREATE TABLE IF NOT EXISTS collect_log (
@@ -169,6 +194,7 @@ def _ann_row_to_dict(r) -> Dict[str, Any]:
def upsert_announcement(data: Dict[str, Any]) -> tuple: def upsert_announcement(data: Dict[str, Any]) -> tuple:
"""공고 upsert — house_manage_no + pblanc_no 기준. Returns (dict, is_new: bool).""" """공고 upsert — house_manage_no + pblanc_no 기준. Returns (dict, is_new: bool)."""
data.setdefault("district", None) # 수동 등록 등에서 누락 시 안전 처리
status = compute_status( status = compute_status(
data.get("receipt_start", ""), data.get("receipt_start", ""),
data.get("receipt_end", ""), data.get("receipt_end", ""),
@@ -183,7 +209,7 @@ def upsert_announcement(data: Dict[str, Any]) -> tuple:
conn.execute(""" conn.execute("""
INSERT INTO announcements ( INSERT INTO announcements (
house_manage_no, pblanc_no, house_nm, house_secd, house_dtl_secd, house_manage_no, pblanc_no, house_nm, house_secd, house_dtl_secd,
rent_secd, region_code, region_name, address, total_units, rent_secd, region_code, region_name, district, address, total_units,
rcrit_date, receipt_start, receipt_end, spsply_start, spsply_end, rcrit_date, receipt_start, receipt_end, spsply_start, spsply_end,
gnrl_rank1_start, gnrl_rank1_end, winner_date, contract_start, gnrl_rank1_start, gnrl_rank1_end, winner_date, contract_start,
contract_end, homepage_url, pblanc_url, constructor, developer, contract_end, homepage_url, pblanc_url, constructor, developer,
@@ -191,7 +217,7 @@ def upsert_announcement(data: Dict[str, Any]) -> tuple:
status, source status, source
) VALUES ( ) VALUES (
:house_manage_no, :pblanc_no, :house_nm, :house_secd, :house_dtl_secd, :house_manage_no, :pblanc_no, :house_nm, :house_secd, :house_dtl_secd,
:rent_secd, :region_code, :region_name, :address, :total_units, :rent_secd, :region_code, :region_name, :district, :address, :total_units,
:rcrit_date, :receipt_start, :receipt_end, :spsply_start, :spsply_end, :rcrit_date, :receipt_start, :receipt_end, :spsply_start, :spsply_end,
:gnrl_rank1_start, :gnrl_rank1_end, :winner_date, :contract_start, :gnrl_rank1_start, :gnrl_rank1_end, :winner_date, :contract_start,
:contract_end, :homepage_url, :pblanc_url, :constructor, :developer, :contract_end, :homepage_url, :pblanc_url, :constructor, :developer,
@@ -205,6 +231,7 @@ def upsert_announcement(data: Dict[str, Any]) -> tuple:
rent_secd=excluded.rent_secd, rent_secd=excluded.rent_secd,
region_code=excluded.region_code, region_code=excluded.region_code,
region_name=excluded.region_name, region_name=excluded.region_name,
district=excluded.district,
address=excluded.address, address=excluded.address,
total_units=excluded.total_units, total_units=excluded.total_units,
rcrit_date=excluded.rcrit_date, rcrit_date=excluded.rcrit_date,
@@ -343,7 +370,7 @@ def create_announcement(data: Dict[str, Any]) -> Dict[str, Any]:
ANNOUNCEMENT_COLUMNS = { ANNOUNCEMENT_COLUMNS = {
"house_nm", "house_secd", "house_dtl_secd", "rent_secd", "house_nm", "house_secd", "house_dtl_secd", "rent_secd",
"region_code", "region_name", "address", "total_units", "region_code", "region_name", "district", "address", "total_units",
"rcrit_date", "receipt_start", "receipt_end", "spsply_start", "spsply_end", "rcrit_date", "receipt_start", "receipt_end", "spsply_start", "spsply_end",
"gnrl_rank1_start", "gnrl_rank1_end", "winner_date", "gnrl_rank1_start", "gnrl_rank1_end", "winner_date",
"contract_start", "contract_end", "homepage_url", "pblanc_url", "contract_start", "contract_end", "homepage_url", "pblanc_url",
@@ -408,6 +435,24 @@ def delete_closed_announcements() -> int:
return cur.rowcount return cur.rowcount
def delete_old_completed_announcements(grace_days: int = 90) -> int:
"""winner_date + grace_days 경과한 status='완료' 공고를 삭제.
winner_date가 NULL인 행은 안전하게 보존(수동 검토 대상).
match_results는 FK CASCADE로 자동 삭제. 삭제된 건수 반환.
"""
with _conn() as conn:
cur = conn.execute(
"""
DELETE FROM announcements
WHERE status = '완료'
AND winner_date IS NOT NULL
AND date(winner_date) < date('now', ?)
""",
(f"-{grace_days} days",),
)
return cur.rowcount
def update_all_statuses(): def update_all_statuses():
"""모든 진행중 공고의 status를 날짜 기반으로 재계산.""" """모든 진행중 공고의 status를 날짜 기반으로 재계산."""
with _conn() as conn: with _conn() as conn:
@@ -538,10 +583,12 @@ def _profile_row_to_dict(r) -> Dict[str, Any]:
for c in r.keys(): for c in r.keys():
val = r[c] val = r[c]
if c in ("is_homeless", "is_householder", "has_dependents", "is_newlywed", if c in ("is_homeless", "is_householder", "has_dependents", "is_newlywed",
"has_newborn", "is_first_home"): "has_newborn", "is_first_home", "notify_enabled"):
d[c] = bool(val) if val is not None else None d[c] = bool(val) if val is not None else None
elif c in ("preferred_regions", "preferred_types"): elif c in ("preferred_regions", "preferred_types"):
d[c] = json.loads(val) if val else [] d[c] = json.loads(val) if val else []
elif c == "preferred_districts":
d[c] = json.loads(val) if val else {}
else: else:
d[c] = val d[c] = val
return d return d
@@ -562,8 +609,9 @@ PROFILE_COLUMNS = {
"subscription_months", "subscription_amount", "family_members", "subscription_months", "subscription_amount", "family_members",
"has_dependents", "children_count", "is_newlywed", "marriage_months", "has_dependents", "children_count", "is_newlywed", "marriage_months",
"has_newborn", "is_first_home", "income_level", "has_newborn", "is_first_home", "income_level",
"preferred_regions", "preferred_types", "preferred_regions", "preferred_types", "preferred_districts",
"min_area", "max_area", "max_price", "min_area", "max_area", "max_price",
"min_match_score", "notify_enabled",
} }
@@ -574,7 +622,7 @@ def upsert_profile(data: Dict[str, Any]) -> Dict[str, Any]:
continue continue
if isinstance(v, bool): if isinstance(v, bool):
updates[k] = 1 if v else 0 updates[k] = 1 if v else 0
elif isinstance(v, list): elif isinstance(v, (list, dict)):
updates[k] = json.dumps(v) updates[k] = json.dumps(v)
else: else:
updates[k] = v updates[k] = v
@@ -663,6 +711,42 @@ def mark_match_read(match_id: int) -> bool:
return cur.rowcount > 0 return cur.rowcount > 0
def get_unnotified_matches(min_score: int) -> List[Dict[str, Any]]:
"""notified_at IS NULL AND match_score >= min_score 인 매칭과 공고 정보 조인 반환."""
with _conn() as conn:
rows = conn.execute("""
SELECT m.id, m.announcement_id, m.match_score, m.match_reasons, m.eligible_types,
a.house_nm, a.region_name, a.district, a.address, a.status,
a.receipt_start, a.receipt_end, a.winner_date,
a.house_secd, a.is_speculative_area, a.is_price_cap, a.pblanc_url
FROM match_results m
JOIN announcements a ON a.id = m.announcement_id
WHERE m.notified_at IS NULL
AND m.match_score >= ?
ORDER BY m.match_score DESC
""", (min_score,)).fetchall()
items = []
for r in rows:
d = {c: r[c] for c in r.keys()}
d["match_reasons"] = json.loads(d["match_reasons"]) if d["match_reasons"] else []
d["eligible_types"] = json.loads(d["eligible_types"]) if d["eligible_types"] else []
items.append(d)
return items
def mark_matches_notified(match_ids: List[int]) -> None:
"""주어진 match_results IDs의 notified_at을 현재 시각으로 일괄 업데이트."""
if not match_ids:
return
placeholders = ",".join("?" for _ in match_ids)
with _conn() as conn:
conn.execute(
f"UPDATE match_results SET notified_at = strftime('%Y-%m-%dT%H:%M:%fZ','now') "
f"WHERE id IN ({placeholders})",
tuple(match_ids),
)
# ── collect_log CRUD ───────────────────────────────────────────────────────── # ── collect_log CRUD ─────────────────────────────────────────────────────────
def save_collect_log(new_count: int, total_count: int, error: str = None): def save_collect_log(new_count: int, total_count: int, error: str = None):

View File

@@ -12,9 +12,11 @@ from .db import (
update_all_statuses, update_all_statuses,
get_profile, upsert_profile, get_matches, mark_match_read, get_profile, upsert_profile, get_matches, mark_match_read,
get_last_collect_log, get_dashboard, get_last_collect_log, get_dashboard,
delete_old_completed_announcements,
) )
from .collector import collect_all from .collector import collect_all
from .matcher import run_matching from .matcher import run_matching
from .notifier import notify_new_matches
from .models import AnnouncementCreate, AnnouncementUpdate, ProfileUpdate from .models import AnnouncementCreate, AnnouncementUpdate, ProfileUpdate
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(name)s] %(levelname)s %(message)s") logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(name)s] %(levelname)s %(message)s")
@@ -24,11 +26,15 @@ scheduler = BackgroundScheduler(timezone=os.getenv("TZ", "Asia/Seoul"))
def scheduled_collect(): def scheduled_collect():
"""매일 09:00 — 수집 + 매칭""" """매일 09:00 — 수집 + 정리 + 매칭 + 알림 push"""
logger.info("스케줄 수집 시작") logger.info("스케줄 수집 시작")
collect_all() collect_all()
deleted = delete_old_completed_announcements(grace_days=90)
if deleted:
logger.info("정리: %d건 삭제", deleted)
run_matching() run_matching()
logger.info("스케줄 수집 + 매칭 완료") notify_new_matches()
logger.info("스케줄 수집 + 매칭 + 알림 완료")
def scheduled_status_update(): def scheduled_status_update():
@@ -137,7 +143,9 @@ def _run_collect_and_match():
return return
try: try:
collect_all() collect_all()
delete_old_completed_announcements(grace_days=90)
run_matching() run_matching()
notify_new_matches()
finally: finally:
_collect_lock.release() _collect_lock.release()

View File

@@ -6,6 +6,44 @@ from .db import _conn, _profile_row_to_dict
logger = logging.getLogger("realestate-lab") logger = logging.getLogger("realestate-lab")
TIER_POINTS = {"S": 25, "A": 20, "B": 15, "C": 10, "D": 5}
def _region_score(profile: Dict[str, Any], ann: Dict[str, Any]) -> tuple[int, list[str]]:
"""지역 점수 계산. 광역 10점 + 자치구 5티어 점수 0~25점.
preferred_districts에 자치구가 하나라도 등록되면 티어 가중 모드로 동작.
자치구가 하나도 등록되지 않으면(빈 dict 또는 모든 티어가 빈 리스트) 광역 매칭만으로 35점 풀 점수(기존 호환).
"""
region_name = ann.get("region_name") or ""
district = ann.get("district") or ""
preferred_regions = profile.get("preferred_regions") or []
preferred_districts = profile.get("preferred_districts") or {}
region_match = bool(region_name and any(r in region_name for r in preferred_regions))
if not region_match:
return 0, []
has_districts = any(preferred_districts.values())
if not has_districts:
return 35, [f"선호 지역 일치: {region_name}"]
score = 10
reasons = [f"광역 일치: {region_name}"]
for tier, tier_score in TIER_POINTS.items():
if district and district in (preferred_districts.get(tier) or []):
score += tier_score
reasons.append(f"자치구 {tier}티어: {district} (+{tier_score})")
break
return score, reasons
def _eligibility_score(eligible_types: List[str]) -> int:
"""자격 점수 0~25. 첫 자격 15점 + 추가 자격당 5점, 최대 +10."""
if not eligible_types:
return 0
return 15 + min((len(eligible_types) - 1) * 5, 10)
# house_secd → 주택유형 이름 매핑 # house_secd → 주택유형 이름 매핑
_HOUSE_TYPE_MAP = { _HOUSE_TYPE_MAP = {
"01": "APT", "01": "APT",
@@ -60,18 +98,18 @@ def _compute_score(
ann: Dict[str, Any], ann: Dict[str, Any],
models: List[Dict[str, Any]], models: List[Dict[str, Any]],
) -> Dict[str, Any]: ) -> Dict[str, Any]:
"""매칭 점수(0-100)와 사유를 계산한다.""" """매칭 점수(0-100)와 사유를 계산한다.
배분: 지역 35 / 유형 10 / 면적 15 / 가격 15 / 자격 25.
"""
score = 0 score = 0
reasons: List[str] = [] reasons: List[str] = []
# 1. 지역 (30점) # 1. 지역 (35점) — 광역 + 자치구 5티어
preferred_regions = profile.get("preferred_regions") or [] region_score, region_reasons = _region_score(profile, ann)
region_name = ann.get("region_name") or "" score += region_score
if region_name and any(r in region_name for r in preferred_regions): reasons.extend(region_reasons)
score += 30
reasons.append(f"선호 지역 일치: {region_name}")
# 2. 주택유형 (10점) # 2. 주택유형 (10점) — binary
preferred_types = profile.get("preferred_types") or [] preferred_types = profile.get("preferred_types") or []
house_secd = ann.get("house_secd") or "" house_secd = ann.get("house_secd") or ""
type_name = _HOUSE_TYPE_MAP.get(house_secd, house_secd) type_name = _HOUSE_TYPE_MAP.get(house_secd, house_secd)
@@ -79,7 +117,7 @@ def _compute_score(
score += 10 score += 10
reasons.append(f"선호 유형 일치: {type_name}") reasons.append(f"선호 유형 일치: {type_name}")
# 3. 면적 (15점) # 3. 면적 (15점) — binary, 범위 안 모델 1개라도 있으면 통과
min_area = profile.get("min_area") min_area = profile.get("min_area")
max_area = profile.get("max_area") max_area = profile.get("max_area")
if min_area is not None and max_area is not None and models: if min_area is not None and max_area is not None and models:
@@ -90,7 +128,7 @@ def _compute_score(
reasons.append(f"희망 면적 범위 내 모델 존재 ({supply_area}㎡)") reasons.append(f"희망 면적 범위 내 모델 존재 ({supply_area}㎡)")
break break
# 4. 가격 (15점) # 4. 가격 (15점) — binary, 예산 이하 모델 1개라도 있으면 통과
max_price = profile.get("max_price") max_price = profile.get("max_price")
if max_price is not None and models: if max_price is not None and models:
for m in models: for m in models:
@@ -100,11 +138,11 @@ def _compute_score(
reasons.append(f"예산 범위 내 모델 존재 (최고가 {top_amount:,}만원)") reasons.append(f"예산 범위 내 모델 존재 (최고가 {top_amount:,}만원)")
break break
# 5. 자격 (30점) # 5. 자격 (25점) — 첫 자격 15 + 추가당 5
eligible_types = _check_eligible_types(profile, ann) eligible_types = _check_eligible_types(profile, ann)
eligibility_score = min(len(eligible_types) * 10, 30) elig_score = _eligibility_score(eligible_types)
if eligibility_score > 0: if elig_score > 0:
score += eligibility_score score += elig_score
reasons.append(f"자격 유형 {len(eligible_types)}개: {', '.join(eligible_types)}") reasons.append(f"자격 유형 {len(eligible_types)}개: {', '.join(eligible_types)}")
return { return {

View File

@@ -1,5 +1,5 @@
from typing import Optional, List from typing import Optional, List, Dict
from pydantic import BaseModel from pydantic import BaseModel, Field
class AnnouncementCreate(BaseModel): class AnnouncementCreate(BaseModel):
@@ -80,3 +80,7 @@ class ProfileUpdate(BaseModel):
min_area: Optional[float] = None min_area: Optional[float] = None
max_area: Optional[float] = None max_area: Optional[float] = None
max_price: Optional[int] = None max_price: Optional[int] = None
# 신규
preferred_districts: Optional[Dict[str, List[str]]] = None
min_match_score: Optional[int] = Field(default=None, ge=0, le=100)
notify_enabled: Optional[bool] = None

View File

@@ -0,0 +1,46 @@
"""신규 매칭을 agent-office로 push하여 텔레그램 알림을 트리거한다."""
import os
import logging
import requests
from .db import get_profile, get_unnotified_matches, mark_matches_notified
logger = logging.getLogger("realestate-lab")
AGENT_OFFICE_URL = os.getenv("AGENT_OFFICE_URL", "http://agent-office:8000")
NOTIFY_TIMEOUT_SECONDS = int(os.getenv("REALESTATE_NOTIFY_TIMEOUT", "15"))
def notify_new_matches() -> dict:
"""프로필의 임계값을 통과한 미알림 매칭을 agent-office로 push한다.
응답이 200이고 sent_ids가 비어있지 않으면 해당 IDs의 notified_at을 마킹.
실패 시 마킹하지 않아 다음 사이클에서 재시도된다.
"""
profile = get_profile()
if not profile:
return {"sent": 0, "skipped": "no_profile"}
if not profile.get("notify_enabled"):
return {"sent": 0, "skipped": "notify_disabled"}
raw_threshold = profile.get("min_match_score")
threshold = 70 if raw_threshold is None else raw_threshold
matches = get_unnotified_matches(threshold)
if not matches:
return {"sent": 0}
url = f"{AGENT_OFFICE_URL}/api/agent-office/realestate/notify"
try:
resp = requests.post(url, json={"matches": matches}, timeout=NOTIFY_TIMEOUT_SECONDS)
resp.raise_for_status()
body = resp.json()
except requests.RequestException as e:
logger.error("agent-office push 실패: %s", e)
return {"sent": 0, "error": str(e)}
sent_ids = body.get("sent_ids") or []
if sent_ids:
mark_matches_notified(sent_ids)
logger.info("알림 송신: %d", len(sent_ids))
return body

View File

View File

@@ -0,0 +1,37 @@
import os
import sys
import tempfile
import pytest
# 테스트 임시 DB 경로를 import 전에 주입
# mkstemp으로 충돌 없는 고유 경로 확보 후 SQLite가 직접 생성하도록 즉시 삭제
_fd, _TMP_DB = tempfile.mkstemp(suffix=".db")
os.close(_fd)
os.unlink(_TMP_DB)
os.environ["REALESTATE_DB_PATH"] = _TMP_DB
# app 패키지 import 가능하게 PYTHONPATH 보정
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# 테이블 목록 — init_db가 생성하는 모든 테이블
_USER_TABLES = (
"match_results", # FK CASCADE 대비 자식 테이블 먼저
"announcement_models",
"announcements",
"user_profile",
"collect_log",
)
@pytest.fixture(autouse=True)
def _clean_db():
"""각 테스트마다 DB 테이블 비우기. 파일 삭제 대신 TRUNCATE 패턴 사용해
Windows SQLite 파일 잠금 이슈를 회피한다."""
# deferred to ensure REALESTATE_DB_PATH is set before first module load
from app.db import _conn, init_db
init_db()
with _conn() as conn:
for table in _USER_TABLES:
conn.execute(f"DELETE FROM {table}")
yield

View File

@@ -0,0 +1,138 @@
"""Tests for collector.py — _extract_district unit tests + collect_all integration tests."""
from datetime import date, timedelta
# ── _extract_district unit tests ─────────────────────────────────────────────
def test_extract_district_seoul_full_address():
from app.collector import _extract_district
parsed = {"address": "서울특별시 강남구 도곡동 123-45", "region_name": None}
assert _extract_district(parsed) == "강남구"
def test_extract_district_seoul_short():
from app.collector import _extract_district
parsed = {"address": None, "region_name": "서울 송파구"}
assert _extract_district(parsed) == "송파구"
def test_extract_district_busan_returns_none():
from app.collector import _extract_district
parsed = {"address": "부산광역시 해운대구 우동", "region_name": None}
assert _extract_district(parsed) is None
def test_extract_district_empty_returns_none():
from app.collector import _extract_district
parsed = {"address": "", "region_name": ""}
assert _extract_district(parsed) is None
def test_extract_district_seoul_county():
from app.collector import _extract_district
parsed = {"address": "서울 강서구", "region_name": None}
assert _extract_district(parsed) == "강서구"
def test_extract_district_prefers_address_over_region():
from app.collector import _extract_district
parsed = {"address": "서울특별시 마포구 합정동", "region_name": "서울 강남구"}
assert _extract_district(parsed) == "마포구"
# ── collect_all integration tests ────────────────────────────────────────────
def test_collect_skips_completed_status(monkeypatch):
"""winner_date가 과거인 응답은 status='완료'로 판정되어 upsert되지 않는다."""
from app import collector
from app.db import _conn
monkeypatch.setenv("DATA_GO_KR_API_KEY", "TEST")
monkeypatch.setattr(collector, "API_KEY", "TEST")
past_winner = (date.today() - timedelta(days=10)).strftime("%Y-%m-%d")
fake_detail_rows = [{
"HOUSE_MANAGE_NO": "DONE-1",
"PBLANC_NO": "01",
"HOUSE_NM": "완료된단지",
"HSSPLY_ADRES": "서울특별시 강남구",
"RCEPT_BGNDE": "2026-01-01",
"RCEPT_ENDDE": "2026-01-05",
"PRZWNER_PRESNATN_DE": past_winner,
}]
def fake_call(endpoint, params=None):
if "Detail" in endpoint:
return fake_detail_rows
return []
monkeypatch.setattr(collector, "_api_call", fake_call)
collector.collect_all()
with _conn() as conn:
rows = conn.execute("SELECT * FROM announcements WHERE house_manage_no='DONE-1'").fetchall()
assert len(rows) == 0
def test_collect_stores_district_for_seoul_announcement(monkeypatch):
from app import collector
from app.db import _conn
monkeypatch.setenv("DATA_GO_KR_API_KEY", "TEST")
monkeypatch.setattr(collector, "API_KEY", "TEST")
future_start = (date.today() + timedelta(days=10)).strftime("%Y-%m-%d")
future_end = (date.today() + timedelta(days=15)).strftime("%Y-%m-%d")
future_winner = (date.today() + timedelta(days=30)).strftime("%Y-%m-%d")
fake_detail = [{
"HOUSE_MANAGE_NO": "SEOUL-1",
"PBLANC_NO": "01",
"HOUSE_NM": "강남단지",
"HSSPLY_ADRES": "서울특별시 강남구 도곡동 1",
"RCEPT_BGNDE": future_start,
"RCEPT_ENDDE": future_end,
"PRZWNER_PRESNATN_DE": future_winner,
}]
def fake_call(endpoint, params=None):
if "Detail" in endpoint:
return fake_detail
return []
monkeypatch.setattr(collector, "_api_call", fake_call)
collector.collect_all()
with _conn() as conn:
row = conn.execute("SELECT district, status FROM announcements WHERE house_manage_no='SEOUL-1'").fetchone()
assert row["district"] == "강남구"
assert row["status"] in ("청약예정", "청약중")
def test_extract_district_no_separator():
"""주소에 공백이 없어도 자치구를 추출해야 한다."""
from app.collector import _extract_district
parsed = {"address": "서울특별시강남구도곡동", "region_name": None}
assert _extract_district(parsed) == "강남구"
def test_collect_passes_date_window_param(monkeypatch):
from app import collector
monkeypatch.setenv("DATA_GO_KR_API_KEY", "TEST")
monkeypatch.setattr(collector, "API_KEY", "TEST")
captured_params = []
def fake_call(endpoint, params=None):
captured_params.append(params or {})
return []
monkeypatch.setattr(collector, "_api_call", fake_call)
collector.collect_all()
expected_from = (date.today() - timedelta(days=30)).strftime("%Y%m%d")
detail_calls = [p for p in captured_params if "RCRIT_PBLANC_DE_FROM" in p]
assert detail_calls, "detail 엔드포인트 호출에 윈도우 파라미터가 없음"
assert detail_calls[0]["RCRIT_PBLANC_DE_FROM"] == expected_from

View File

@@ -0,0 +1,11 @@
def test_init_db_creates_tables():
from app.db import _conn
with _conn() as conn:
tables = {row[0] for row in conn.execute(
"SELECT name FROM sqlite_master WHERE type='table'"
)}
assert "announcements" in tables
assert "announcement_models" in tables
assert "user_profile" in tables
assert "match_results" in tables
assert "collect_log" in tables

View File

@@ -0,0 +1,100 @@
import json
from datetime import date, timedelta
from app.db import _conn
def _seed_announcement(house_nm, status, winner_date=None, hmno="HM1", pno="P1"):
with _conn() as conn:
conn.execute("""
INSERT INTO announcements (house_manage_no, pblanc_no, house_nm, status, winner_date, source)
VALUES (?, ?, ?, ?, ?, 'manual')
""", (hmno, pno, house_nm, status, winner_date))
return conn.execute("SELECT id FROM announcements WHERE house_manage_no=?", (hmno,)).fetchone()["id"]
def test_delete_old_completed_removes_expired():
from app.db import delete_old_completed_announcements
old = (date.today() - timedelta(days=100)).isoformat()
_seed_announcement("OldA", "완료", old, hmno="OLD", pno="1")
deleted = delete_old_completed_announcements(grace_days=90)
assert deleted == 1
def test_delete_old_completed_keeps_recent():
from app.db import delete_old_completed_announcements
recent = (date.today() - timedelta(days=30)).isoformat()
_seed_announcement("RecentA", "완료", recent, hmno="REC", pno="1")
deleted = delete_old_completed_announcements(grace_days=90)
assert deleted == 0
def test_delete_old_completed_keeps_active():
from app.db import delete_old_completed_announcements
old = (date.today() - timedelta(days=200)).isoformat()
_seed_announcement("ActiveA", "청약중", old, hmno="ACT", pno="1")
deleted = delete_old_completed_announcements(grace_days=90)
assert deleted == 0
def test_delete_old_completed_keeps_null_winner_date():
from app.db import delete_old_completed_announcements
_seed_announcement("NullA", "완료", None, hmno="NULL", pno="1")
deleted = delete_old_completed_announcements(grace_days=90)
assert deleted == 0 # winner_date NULL은 안전 보존
def test_get_unnotified_matches_filters_by_score_and_null():
from app.db import get_unnotified_matches
aid = _seed_announcement("MatchA", "청약중", hmno="MA", pno="1")
with _conn() as conn:
# 임계값 미만
conn.execute("""
INSERT INTO match_results (announcement_id, model_id, match_score, match_reasons, eligible_types, is_new)
VALUES (?, NULL, 50, '[]', '[]', 1)
""", (aid,))
# 임계값 통과 — 미알림
conn.execute("""
INSERT INTO match_results (announcement_id, model_id, match_score, match_reasons, eligible_types, is_new)
VALUES (?, 1, 80, '[]', '[]', 1)
""", (aid,))
# 임계값 통과 — 이미 알림됨
conn.execute("""
INSERT INTO match_results (announcement_id, model_id, match_score, match_reasons, eligible_types, is_new, notified_at)
VALUES (?, 2, 90, '[]', '[]', 1, '2026-04-01T00:00:00.000Z')
""", (aid,))
matches = get_unnotified_matches(min_score=70)
assert len(matches) == 1
assert matches[0]["match_score"] == 80
assert matches[0]["house_nm"] == "MatchA"
def test_mark_matches_notified_sets_timestamp():
from app.db import mark_matches_notified
aid = _seed_announcement("NotifyA", "청약중", hmno="NT", pno="1")
with _conn() as conn:
cur = conn.execute("""
INSERT INTO match_results (announcement_id, model_id, match_score, match_reasons, eligible_types, is_new)
VALUES (?, NULL, 80, '[]', '[]', 1)
""", (aid,))
match_id = cur.lastrowid
mark_matches_notified([match_id])
with _conn() as conn:
row = conn.execute("SELECT notified_at FROM match_results WHERE id = ?", (match_id,)).fetchone()
assert row["notified_at"] is not None
def test_get_unnotified_matches_includes_status():
from app.db import get_unnotified_matches
aid = _seed_announcement("StatusA", "청약중", hmno="ST", pno="1")
with _conn() as conn:
conn.execute("""
INSERT INTO match_results (announcement_id, model_id, match_score, match_reasons, eligible_types, is_new)
VALUES (?, NULL, 80, '[]', '[]', 1)
""", (aid,))
matches = get_unnotified_matches(min_score=70)
status_matches = [m for m in matches if m["house_nm"] == "StatusA"]
assert len(status_matches) == 1
assert status_matches[0]["status"] == "청약중"

View File

@@ -0,0 +1,39 @@
def test_user_profile_has_new_columns():
from app.db import _conn
with _conn() as conn:
cols = {row["name"] for row in conn.execute("PRAGMA table_info(user_profile)")}
assert "preferred_districts" in cols
assert "min_match_score" in cols
assert "notify_enabled" in cols
def test_announcements_has_district():
from app.db import _conn
with _conn() as conn:
cols = {row["name"] for row in conn.execute("PRAGMA table_info(announcements)")}
assert "district" in cols
def test_match_results_has_notified_at():
from app.db import _conn
with _conn() as conn:
cols = {row["name"] for row in conn.execute("PRAGMA table_info(match_results)")}
assert "notified_at" in cols
def test_district_index_exists():
from app.db import _conn
with _conn() as conn:
idx = {row["name"] for row in conn.execute(
"SELECT name FROM sqlite_master WHERE type='index'"
)}
assert "idx_ann_district" in idx
def test_profile_defaults():
from app.db import upsert_profile, get_profile
upsert_profile({"name": "테스트"})
profile = get_profile()
assert profile["preferred_districts"] == {}
assert profile["min_match_score"] == 70
assert profile["notify_enabled"] is True

View File

@@ -0,0 +1,107 @@
def test_region_score_no_districts_full_when_region_match():
"""자치구 미설정: 광역 일치 시 35점."""
from app.matcher import _region_score
profile = {"preferred_regions": ["서울"], "preferred_districts": {}}
ann = {"region_name": "서울특별시", "district": None}
score, _ = _region_score(profile, ann)
assert score == 35
def test_region_score_no_districts_zero_when_region_mismatch():
from app.matcher import _region_score
profile = {"preferred_regions": ["서울"], "preferred_districts": {}}
ann = {"region_name": "부산광역시", "district": None}
score, _ = _region_score(profile, ann)
assert score == 0
def test_region_score_s_tier_district():
"""광역 매칭 + S티어 자치구: 10 + 25 = 35."""
from app.matcher import _region_score
profile = {
"preferred_regions": ["서울"],
"preferred_districts": {"S": ["강남구"], "A": [], "B": [], "C": [], "D": []},
}
ann = {"region_name": "서울특별시", "district": "강남구"}
score, _ = _region_score(profile, ann)
assert score == 35
def test_region_score_a_tier_district():
"""광역 매칭 + A티어 자치구: 10 + 20 = 30."""
from app.matcher import _region_score
profile = {
"preferred_regions": ["서울"],
"preferred_districts": {"S": [], "A": ["송파구"], "B": [], "C": [], "D": []},
}
ann = {"region_name": "서울특별시", "district": "송파구"}
score, _ = _region_score(profile, ann)
assert score == 30
def test_region_score_d_tier_district():
"""광역 매칭 + D티어 자치구: 10 + 5 = 15."""
from app.matcher import _region_score
profile = {
"preferred_regions": ["서울"],
"preferred_districts": {"S": [], "A": [], "B": [], "C": [], "D": ["도봉구"]},
}
ann = {"region_name": "서울특별시", "district": "도봉구"}
score, _ = _region_score(profile, ann)
assert score == 15
def test_region_score_district_set_but_not_listed():
"""광역 매칭 + 자치구 5티어 어디에도 없음: 10점만."""
from app.matcher import _region_score
profile = {
"preferred_regions": ["서울"],
"preferred_districts": {"S": ["강남구"], "A": [], "B": [], "C": [], "D": []},
}
ann = {"region_name": "서울특별시", "district": "강서구"}
score, _ = _region_score(profile, ann)
assert score == 10
def test_eligibility_score_zero_when_empty():
from app.matcher import _eligibility_score
assert _eligibility_score([]) == 0
def test_eligibility_score_one_type_returns_15():
from app.matcher import _eligibility_score
assert _eligibility_score(["일반1순위"]) == 15
def test_eligibility_score_two_types_returns_20():
from app.matcher import _eligibility_score
assert _eligibility_score(["일반1순위", "특별-신혼부부"]) == 20
def test_eligibility_score_caps_at_25():
from app.matcher import _eligibility_score
assert _eligibility_score(["a", "b", "c", "d", "e"]) == 25
def test_region_score_b_tier_district():
"""광역 매칭 + B티어 자치구: 10 + 15 = 25."""
from app.matcher import _region_score
profile = {
"preferred_regions": ["서울"],
"preferred_districts": {"S": [], "A": [], "B": ["관악구"], "C": [], "D": []},
}
ann = {"region_name": "서울특별시", "district": "관악구"}
score, _ = _region_score(profile, ann)
assert score == 25
def test_region_score_c_tier_district():
"""광역 매칭 + C티어 자치구: 10 + 10 = 20."""
from app.matcher import _region_score
profile = {
"preferred_regions": ["서울"],
"preferred_districts": {"S": [], "A": [], "B": [], "C": ["은평구"], "D": []},
}
ann = {"region_name": "서울특별시", "district": "은평구"}
score, _ = _region_score(profile, ann)
assert score == 20

View File

@@ -0,0 +1,124 @@
from unittest.mock import patch, MagicMock
def _seed_profile_and_match(score, notify_enabled=True, threshold=70):
from app.db import _conn, upsert_profile
upsert_profile({
"name": "u",
"notify_enabled": notify_enabled,
"min_match_score": threshold,
})
with _conn() as conn:
conn.execute("""
INSERT INTO announcements (house_manage_no, pblanc_no, house_nm, status, source)
VALUES ('NF1', '01', '단지', '청약중', 'manual')
""")
ann_id = conn.execute("SELECT id FROM announcements WHERE house_manage_no='NF1'").fetchone()["id"]
conn.execute("""
INSERT INTO match_results (announcement_id, model_id, match_score, match_reasons, eligible_types, is_new)
VALUES (?, NULL, ?, '[]', '[]', 1)
""", (ann_id, score))
match_id = conn.execute("SELECT id FROM match_results WHERE announcement_id=?", (ann_id,)).fetchone()["id"]
return match_id
def test_notify_skips_when_disabled():
from app import notifier
_seed_profile_and_match(score=80, notify_enabled=False)
with patch.object(notifier, "requests") as r:
result = notifier.notify_new_matches()
assert r.post.call_count == 0
assert result["sent"] == 0
assert result.get("skipped") == "notify_disabled"
def test_notify_filters_below_threshold():
from app import notifier
_seed_profile_and_match(score=60, threshold=70)
with patch.object(notifier, "requests") as r:
result = notifier.notify_new_matches()
assert r.post.call_count == 0
assert result["sent"] == 0
def test_notify_pushes_and_marks_notified():
from app import notifier
from app.db import _conn
match_id = _seed_profile_and_match(score=80, threshold=70)
fake_resp = MagicMock()
fake_resp.json.return_value = {"sent": 1, "sent_ids": [match_id]}
fake_resp.raise_for_status.return_value = None
with patch.object(notifier.requests, "post", return_value=fake_resp) as post:
result = notifier.notify_new_matches()
assert post.call_count == 1
args, kwargs = post.call_args
assert "/api/agent-office/realestate/notify" in args[0]
assert kwargs["json"]["matches"][0]["id"] == match_id
with _conn() as conn:
row = conn.execute("SELECT notified_at FROM match_results WHERE id=?", (match_id,)).fetchone()
assert row["notified_at"] is not None
assert result["sent"] == 1
def test_notify_does_not_mark_on_failure():
from app import notifier
from app.db import _conn
import requests as real_requests
match_id = _seed_profile_and_match(score=80, threshold=70)
def boom(*a, **k):
raise real_requests.RequestException("agent-office down")
with patch.object(notifier.requests, "post", side_effect=boom):
result = notifier.notify_new_matches()
with _conn() as conn:
row = conn.execute("SELECT notified_at FROM match_results WHERE id=?", (match_id,)).fetchone()
assert row["notified_at"] is None
assert result["sent"] == 0
assert "error" in result
def test_notify_does_not_mark_when_sent_ids_empty():
"""agent-office가 200 OK + sent_ids=[]을 반환하면 마킹하지 않고 다음 사이클 재시도 가능."""
from app import notifier
from app.db import _conn
match_id = _seed_profile_and_match(score=80, threshold=70)
fake_resp = MagicMock()
fake_resp.json.return_value = {"sent": 0, "sent_ids": []}
fake_resp.raise_for_status.return_value = None
with patch.object(notifier.requests, "post", return_value=fake_resp):
result = notifier.notify_new_matches()
with _conn() as conn:
row = conn.execute("SELECT notified_at FROM match_results WHERE id=?", (match_id,)).fetchone()
assert row["notified_at"] is None
assert result["sent"] == 0
def test_notify_threshold_zero_pushes_all_matches():
"""min_match_score=0이면 모든 양수 점수 매치를 알림."""
from app import notifier
from app.db import _conn
match_id = _seed_profile_and_match(score=10, threshold=0)
fake_resp = MagicMock()
fake_resp.json.return_value = {"sent": 1, "sent_ids": [match_id]}
fake_resp.raise_for_status.return_value = None
with patch.object(notifier.requests, "post", return_value=fake_resp) as post:
result = notifier.notify_new_matches()
assert post.call_count == 1
assert result["sent"] == 1

View File

@@ -0,0 +1,38 @@
from fastapi.testclient import TestClient
def test_profile_update_accepts_new_fields():
from app.main import app
body = {
"name": "테스트",
"preferred_districts": {
"S": ["강남구", "서초구"],
"A": ["송파구"],
"B": [],
"C": [],
"D": [],
},
"min_match_score": 75,
"notify_enabled": True,
}
with TestClient(app) as client:
resp = client.put("/api/realestate/profile", json=body)
assert resp.status_code == 200
data = resp.json()
assert data["preferred_districts"]["S"] == ["강남구", "서초구"]
assert data["min_match_score"] == 75
assert data["notify_enabled"] is True
def test_profile_get_returns_defaults_for_new_fields():
from app.main import app
from app.db import upsert_profile
upsert_profile({"name": "기본"})
with TestClient(app) as client:
resp = client.get("/api/realestate/profile")
assert resp.status_code == 200
data = resp.json()
assert data["preferred_districts"] == {}
assert data["min_match_score"] == 70
assert data["notify_enabled"] is True

View File

@@ -0,0 +1,30 @@
from unittest.mock import patch
def test_scheduled_collect_calls_cleanup_and_notifier():
from app import main as app_main
calls = []
def fake_collect():
calls.append("collect")
return {"new_count": 0, "total_count": 0}
def fake_cleanup(grace_days=90):
calls.append(("cleanup", grace_days))
return 0
def fake_match():
calls.append("match")
def fake_notify():
calls.append("notify")
return {"sent": 0}
with patch.object(app_main, "collect_all", side_effect=fake_collect), \
patch.object(app_main, "delete_old_completed_announcements", side_effect=fake_cleanup), \
patch.object(app_main, "run_matching", side_effect=fake_match), \
patch.object(app_main, "notify_new_matches", side_effect=fake_notify):
app_main.scheduled_collect()
assert calls == ["collect", ("cleanup", 90), "match", "notify"]