Housing Subscription (청약) Service Targeting Enhancement: Implementation Plan
For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.
Goal: Refine realestate-lab's collection and matching with a five-tier district (자치구) weighting, and push a Telegram notification through agent-office as soon as a new match is found.
Architecture: In its 09:00 cron job, realestate-lab runs collect → cleanup → match → notifier push, in that order. The notifier gathers the matches that pass the threshold and have not yet been notified, then sends them by HTTP push to agent-office's /api/agent-office/realestate/notify endpoint. agent-office's RealestateAgent.on_new_matches() formats the Telegram message, builds the inline keyboard, sends it, and then marks notified_at. The daily report cron is retired.
Tech Stack: Python 3.12 / FastAPI / SQLite (WAL) / APScheduler / requests / pytest / unittest.mock / aiogram-style raw Telegram Bot API.
Spec reference: docs/superpowers/specs/2026-04-28-realestate-targeting-enhancement-design.md
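The stage ordering described under Architecture can be sketched as a small driver. This is only an illustration: run_daily_job and its four callable parameters are hypothetical names, not functions from the codebase, and the real cron job may be wired differently.

```python
from typing import Callable, Dict


def run_daily_job(
    collect: Callable[[], int],
    cleanup: Callable[[], int],
    match: Callable[[], int],
    notify: Callable[[], int],
) -> Dict[str, int]:
    """Run the four stages in the planned order and return a count per stage."""
    results: Dict[str, int] = {}
    # Order matters: notify should only see rows that survived cleanup
    # and were scored by the matcher in this same run.
    for name, stage in (
        ("collect", collect),
        ("cleanup", cleanup),
        ("match", match),
        ("notify", notify),
    ):
        results[name] = stage()
    return results
```

Passing the stages in as callables keeps the ordering testable without touching the network or the database.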
Task 1: realestate-lab test environment setup (DB path environment variable + conftest)
Make the DB path overridable through an environment variable so that tests never touch the production DB (/app/data/realestate.db).
Files:
- Modify: realestate-lab/app/db.py:10 (read DB_PATH from an environment variable)
- Create: realestate-lab/tests/__init__.py
- Create: realestate-lab/tests/conftest.py
- Create: realestate-lab/tests/test_db_basic.py (smoke test)
- Step 1: Read DB_PATH from an environment variable
Edit realestate-lab/app/db.py:10.
Before:
DB_PATH = "/app/data/realestate.db"
After:
import os
DB_PATH = os.getenv("REALESTATE_DB_PATH", "/app/data/realestate.db")
Add the os import to the import block at the top of the file.
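Because DB_PATH is a module-level constant, os.getenv is evaluated once, when app.db is first imported; this is why the test harness has to set the variable before that import. A minimal sketch of the lookup, where resolve_db_path and its env parameter are hypothetical helpers used only for illustration:

```python
import os
from typing import Mapping, Optional

DEFAULT_DB_PATH = "/app/data/realestate.db"


def resolve_db_path(env: Optional[Mapping[str, str]] = None) -> str:
    """Return REALESTATE_DB_PATH when set, otherwise the production default."""
    env = os.environ if env is None else env
    return env.get("REALESTATE_DB_PATH", DEFAULT_DB_PATH)


# With no override the production path is used; with one, the override wins.
production = resolve_db_path({})
overridden = resolve_db_path({"REALESTATE_DB_PATH": "/tmp/test.db"})
```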
- Step 2: Mark the tests directory as a package
Create realestate-lab/tests/__init__.py as an empty file.
- Step 3: Create conftest.py with a temporary DB fixture
realestate-lab/tests/conftest.py:
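import os
import sys
import tempfile
import pytest
# Inject the temporary test DB path before app.db is imported
# (mkstemp replaces the deprecated, race-prone tempfile.mktemp)
_fd, _TMP_DB = tempfile.mkstemp(suffix=".db")
os.close(_fd)
os.environ["REALESTATE_DB_PATH"] = _TMP_DB
# Make the app package importable by adjusting sys.path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
def _remove_db_files():
    # In WAL mode SQLite may leave -wal and -shm sidecar files next to the DB
    for suffix in ("", "-wal", "-shm"):
        path = _TMP_DB + suffix
        if os.path.exists(path):
            os.remove(path)
@pytest.fixture(autouse=True)
def _clean_db():
    """Reset the DB for every test."""
    _remove_db_files()
    from app.db import init_db
    init_db()
    yield
    _remove_db_files()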
- Step 4: Write the smoke test
realestate-lab/tests/test_db_basic.py:
def test_init_db_creates_tables():
from app.db import _conn
with _conn() as conn:
tables = {row[0] for row in conn.execute(
"SELECT name FROM sqlite_master WHERE type='table'"
)}
assert "announcements" in tables
assert "announcement_models" in tables
assert "user_profile" in tables
assert "match_results" in tables
assert "collect_log" in tables
- Step 5: Run the tests
Run: cd realestate-lab && python -m pytest tests/ -v
Expected: 1 passed
- Step 6: Commit
git add realestate-lab/app/db.py realestate-lab/tests/__init__.py realestate-lab/tests/conftest.py realestate-lab/tests/test_db_basic.py
git commit -m "test(realestate): add pytest harness with isolated SQLite fixture"
Task 2: realestate-lab DB schema migration
Add three columns to user_profile, district to announcements, and notified_at to match_results. The migration runs inside init_db() using a try/except probe, so the production DB is migrated without downtime.
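The idea behind the migration is that adding a column must be safe to repeat on every startup. The sketch below shows that idempotence on an in-memory database. It is a variation, not the plan's method: it checks PRAGMA table_info instead of the SELECT probe with try/except used in this plan, and add_column_if_missing is a hypothetical helper name.

```python
import sqlite3


def add_column_if_missing(
    conn: sqlite3.Connection, table: str, column: str, ddl_type: str
) -> bool:
    """Add `column` to `table` unless it already exists. Returns True if added.

    Table and column names are interpolated into SQL, so they must be
    trusted constants, never user input.
    """
    # PRAGMA table_info rows are (cid, name, type, notnull, dflt_value, pk)
    existing = {row[1] for row in conn.execute(f"PRAGMA table_info({table})")}
    if column in existing:
        return False
    conn.execute(f"ALTER TABLE {table} ADD COLUMN {column} {ddl_type}")
    return True


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE match_results (id INTEGER PRIMARY KEY, match_score INTEGER)")
first = add_column_if_missing(conn, "match_results", "notified_at", "TEXT")
second = add_column_if_missing(conn, "match_results", "notified_at", "TEXT")
```

The second call finds the column already present and does nothing, which is the property init_db() relies on when it runs against an already-migrated production DB.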
Files:
- Modify: realestate-lab/app/db.py (add ALTER statements inside init_db, extend _profile_row_to_dict, extend PROFILE_COLUMNS)
- Test: realestate-lab/tests/test_db_migration.py
- Step 1: Write the migration unit tests (expected to fail)
realestate-lab/tests/test_db_migration.py:
def test_user_profile_has_new_columns():
from app.db import _conn
with _conn() as conn:
cols = {row["name"] for row in conn.execute("PRAGMA table_info(user_profile)")}
assert "preferred_districts" in cols
assert "min_match_score" in cols
assert "notify_enabled" in cols
def test_announcements_has_district():
from app.db import _conn
with _conn() as conn:
cols = {row["name"] for row in conn.execute("PRAGMA table_info(announcements)")}
assert "district" in cols
def test_match_results_has_notified_at():
from app.db import _conn
with _conn() as conn:
cols = {row["name"] for row in conn.execute("PRAGMA table_info(match_results)")}
assert "notified_at" in cols
def test_district_index_exists():
from app.db import _conn
with _conn() as conn:
idx = {row["name"] for row in conn.execute(
"SELECT name FROM sqlite_master WHERE type='index'"
)}
assert "idx_ann_district" in idx
def test_profile_defaults():
from app.db import upsert_profile, get_profile
upsert_profile({"name": "테스트"})
profile = get_profile()
assert profile["preferred_districts"] == {}
assert profile["min_match_score"] == 70
assert profile["notify_enabled"] is True
- Step 2: Run the tests and confirm they fail
Run: cd realestate-lab && python -m pytest tests/test_db_migration.py -v
Expected: 5 failed (columns missing, defaults not applied)
- Step 3: Migrate the announcements table: district column + index
In init_db() in realestate-lab/app/db.py, add the block on the line right after the idx_ann_region index is created.
Location to find (current code):
conn.execute("CREATE INDEX IF NOT EXISTS idx_ann_region ON announcements(region_name);")
# ── 마이그레이션: is_bookmarked 컬럼 추가 ──
Change it to:
conn.execute("CREATE INDEX IF NOT EXISTS idx_ann_region ON announcements(region_name);")
# ── Migration: add district column + index ──
try:
    conn.execute("SELECT district FROM announcements LIMIT 1")
except Exception:
    conn.execute("ALTER TABLE announcements ADD COLUMN district TEXT")
conn.execute("CREATE INDEX IF NOT EXISTS idx_ann_district ON announcements(district);")
# ── 마이그레이션: is_bookmarked 컬럼 추가 ──
- Step 4: Migrate user_profile: three new columns
Add this after the user_profile CREATE statement in init_db(), immediately before the point where the match_results CREATE statement currently begins.
Location to find:
);
""")
# ── match_results ────────────────────────────────────────────────
(The user_profile CREATE statement ends just before this point.)
Add the following:
);
""")
# ── Migration: three new user_profile columns ──
for col, ddl in (
    ("preferred_districts", "ALTER TABLE user_profile ADD COLUMN preferred_districts TEXT NOT NULL DEFAULT '{}'"),
    ("min_match_score", "ALTER TABLE user_profile ADD COLUMN min_match_score INTEGER NOT NULL DEFAULT 70"),
    ("notify_enabled", "ALTER TABLE user_profile ADD COLUMN notify_enabled INTEGER NOT NULL DEFAULT 1"),
):
    try:
        conn.execute(f"SELECT {col} FROM user_profile LIMIT 1")
    except Exception:
        conn.execute(ddl)
# ── match_results ────────────────────────────────────────────────
- Step 5: Migrate match_results: notified_at column
Add this right after the match_results CREATE statement in init_db().
Location to find:
UNIQUE(announcement_id, model_id)
);
""")
# ── collect_log ──────────────────────────────────────────────────
Change it to:
UNIQUE(announcement_id, model_id)
);
""")
# ── Migration: add notified_at column ──
try:
    conn.execute("SELECT notified_at FROM match_results LIMIT 1")
except Exception:
    conn.execute("ALTER TABLE match_results ADD COLUMN notified_at TEXT")
# ── collect_log ──────────────────────────────────────────────────
- Step 6: Handle the new fields in PROFILE_COLUMNS, _profile_row_to_dict, and upsert_profile
Update the _profile_row_to_dict function in realestate-lab/app/db.py (currently around line 536) as follows:
def _profile_row_to_dict(r) -> Dict[str, Any]:
    d = {}
    for c in r.keys():
        val = r[c]
        if c in ("is_homeless", "is_householder", "has_dependents", "is_newlywed",
                 "has_newborn", "is_first_home", "notify_enabled"):
            d[c] = bool(val) if val is not None else None
        elif c in ("preferred_regions", "preferred_types"):
            d[c] = json.loads(val) if val else []
        elif c == "preferred_districts":
            d[c] = json.loads(val) if val else {}
        else:
            d[c] = val
    return d
Extend the PROFILE_COLUMNS constant (currently around line 560) as follows:
PROFILE_COLUMNS = {
"name", "age", "is_homeless", "is_householder",
"subscription_months", "subscription_amount", "family_members",
"has_dependents", "children_count", "is_newlywed", "marriage_months",
"has_newborn", "is_first_home", "income_level",
"preferred_regions", "preferred_types", "preferred_districts",
"min_area", "max_area", "max_price",
"min_match_score", "notify_enabled",
}
upsert_profile already has a branch for list values, so a dict branch must be added. Current code:
if isinstance(v, bool):
updates[k] = 1 if v else 0
elif isinstance(v, list):
updates[k] = json.dumps(v)
else:
updates[k] = v
Change it to:
if isinstance(v, bool):
updates[k] = 1 if v else 0
elif isinstance(v, (list, dict)):
updates[k] = json.dumps(v)
else:
updates[k] = v
- Step 7: Run the tests and confirm they pass
Run: cd realestate-lab && python -m pytest tests/test_db_migration.py tests/test_db_basic.py -v
Expected: 6 passed
- Step 8: Commit
git add realestate-lab/app/db.py realestate-lab/tests/test_db_migration.py
git commit -m "feat(realestate-db): add district / notify / 5tier columns with migration"
Task 3: Three new realestate-lab DB functions
delete_old_completed_announcements, get_unnotified_matches, mark_matches_notified.
Files:
- Modify: realestate-lab/app/db.py (add the three functions)
- Test: realestate-lab/tests/test_db_functions.py
- Step 1: Write the tests, starting with delete_old_completed_announcements
realestate-lab/tests/test_db_functions.py:
import json
from datetime import date, timedelta
from app.db import _conn
def _seed_announcement(house_nm, status, winner_date=None, hmno="HM1", pno="P1"):
with _conn() as conn:
conn.execute("""
INSERT INTO announcements (house_manage_no, pblanc_no, house_nm, status, winner_date, source)
VALUES (?, ?, ?, ?, ?, 'manual')
""", (hmno, pno, house_nm, status, winner_date))
return conn.execute("SELECT id FROM announcements WHERE house_manage_no=?", (hmno,)).fetchone()["id"]
def test_delete_old_completed_removes_expired():
from app.db import delete_old_completed_announcements
old = (date.today() - timedelta(days=100)).isoformat()
_seed_announcement("OldA", "완료", old, hmno="OLD", pno="1")
deleted = delete_old_completed_announcements(grace_days=90)
assert deleted == 1
def test_delete_old_completed_keeps_recent():
from app.db import delete_old_completed_announcements
recent = (date.today() - timedelta(days=30)).isoformat()
_seed_announcement("RecentA", "완료", recent, hmno="REC", pno="1")
deleted = delete_old_completed_announcements(grace_days=90)
assert deleted == 0
def test_delete_old_completed_keeps_active():
from app.db import delete_old_completed_announcements
old = (date.today() - timedelta(days=200)).isoformat()
_seed_announcement("ActiveA", "청약중", old, hmno="ACT", pno="1")
deleted = delete_old_completed_announcements(grace_days=90)
assert deleted == 0
def test_delete_old_completed_keeps_null_winner_date():
from app.db import delete_old_completed_announcements
_seed_announcement("NullA", "완료", None, hmno="NULL", pno="1")
deleted = delete_old_completed_announcements(grace_days=90)
assert deleted == 0  # rows with a NULL winner_date are kept for safety
def test_get_unnotified_matches_filters_by_score_and_null():
from app.db import get_unnotified_matches
aid = _seed_announcement("MatchA", "청약중", hmno="MA", pno="1")
with _conn() as conn:
# Below the threshold
conn.execute("""
INSERT INTO match_results (announcement_id, model_id, match_score, match_reasons, eligible_types, is_new)
VALUES (?, NULL, 50, '[]', '[]', 1)
""", (aid,))
# Passes the threshold, not yet notified
conn.execute("""
INSERT INTO match_results (announcement_id, model_id, match_score, match_reasons, eligible_types, is_new)
VALUES (?, 1, 80, '[]', '[]', 1)
""", (aid,))
# Passes the threshold, already notified
conn.execute("""
INSERT INTO match_results (announcement_id, model_id, match_score, match_reasons, eligible_types, is_new, notified_at)
VALUES (?, 2, 90, '[]', '[]', 1, '2026-04-01T00:00:00.000Z')
""", (aid,))
matches = get_unnotified_matches(min_score=70)
assert len(matches) == 1
assert matches[0]["match_score"] == 80
assert matches[0]["house_nm"] == "MatchA"
def test_mark_matches_notified_sets_timestamp():
from app.db import mark_matches_notified
aid = _seed_announcement("NotifyA", "청약중", hmno="NT", pno="1")
with _conn() as conn:
cur = conn.execute("""
INSERT INTO match_results (announcement_id, model_id, match_score, match_reasons, eligible_types, is_new)
VALUES (?, NULL, 80, '[]', '[]', 1)
""", (aid,))
match_id = cur.lastrowid
mark_matches_notified([match_id])
with _conn() as conn:
row = conn.execute("SELECT notified_at FROM match_results WHERE id = ?", (match_id,)).fetchone()
assert row["notified_at"] is not None
- Step 2: Run the tests and confirm they fail
Run: cd realestate-lab && python -m pytest tests/test_db_functions.py -v
Expected: 6 failed (functions not defined)
- Step 3: Add the three functions to db.py
In realestate-lab/app/db.py, add the first one after the delete_closed_announcements function (currently around line 408).
Location to find:
def delete_closed_announcements() -> int:
"""status='완료' 공고 일괄 삭제. 삭제된 건수 반환."""
with _conn() as conn:
cur = conn.execute("DELETE FROM announcements WHERE status = '완료'")
return cur.rowcount
Add right after it:
def delete_old_completed_announcements(grace_days: int = 90) -> int:
    """Delete status='완료' announcements whose winner_date is more than grace_days in the past.
    Rows with a NULL winner_date are kept for safety (candidates for manual review).
    match_results rows are removed automatically by FK CASCADE. Returns the number of deleted rows.
    """
    with _conn() as conn:
        cur = conn.execute(
            """
            DELETE FROM announcements
            WHERE status = '완료'
              AND winner_date IS NOT NULL
              AND date(winner_date) < date('now', ?)
            """,
            (f"-{grace_days} days",),
        )
        return cur.rowcount
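To see how the grace window behaves, the same DELETE condition can be run against a throwaway in-memory table. This is a sketch only: the three-column table is a simplified stand-in for the real announcements schema, and the four rows mirror the four test cases above.

```python
import sqlite3
from datetime import date, timedelta

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE announcements (id INTEGER PRIMARY KEY, status TEXT, winner_date TEXT)"
)
today = date.today()
rows = [
    ("완료", (today - timedelta(days=100)).isoformat()),    # past the grace window
    ("완료", (today - timedelta(days=30)).isoformat()),     # still inside the window
    ("청약중", (today - timedelta(days=200)).isoformat()),  # old, but not completed
    ("완료", None),                                          # NULL winner_date
]
conn.executemany("INSERT INTO announcements (status, winner_date) VALUES (?, ?)", rows)

# date('now', '-90 days') is the cutoff; only completed rows older than it match.
cur = conn.execute(
    "DELETE FROM announcements "
    "WHERE status = '완료' AND winner_date IS NOT NULL "
    "AND date(winner_date) < date('now', ?)",
    ("-90 days",),
)
deleted = cur.rowcount
remaining = conn.execute("SELECT COUNT(*) FROM announcements").fetchone()[0]
```

Only the first row is removed. Note that SQLite's date('now') is UTC while date.today() is local time, so a row sitting exactly on the 90-day boundary could fall on either side depending on the time zone.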
Add the other two after the mark_match_read function in the same file (currently around line 660). Location to find:
def mark_match_read(match_id: int) -> bool:
with _conn() as conn:
cur = conn.execute("UPDATE match_results SET is_new = 0 WHERE id = ?", (match_id,))
return cur.rowcount > 0
Add right after it:
def get_unnotified_matches(min_score: int) -> List[Dict[str, Any]]:
    """Return matches with notified_at IS NULL AND match_score >= min_score, joined with announcement data."""
    with _conn() as conn:
        rows = conn.execute("""
            SELECT m.id, m.announcement_id, m.match_score, m.match_reasons, m.eligible_types,
                   a.house_nm, a.region_name, a.district, a.address,
                   a.receipt_start, a.receipt_end, a.winner_date,
                   a.house_secd, a.is_speculative_area, a.is_price_cap, a.pblanc_url
            FROM match_results m
            JOIN announcements a ON a.id = m.announcement_id
            WHERE m.notified_at IS NULL
              AND m.match_score >= ?
            ORDER BY m.match_score DESC
        """, (min_score,)).fetchall()
        items = []
        for r in rows:
            d = {c: r[c] for c in r.keys()}
            d["match_reasons"] = json.loads(d["match_reasons"]) if d["match_reasons"] else []
            d["eligible_types"] = json.loads(d["eligible_types"]) if d["eligible_types"] else []
            items.append(d)
        return items
def mark_matches_notified(match_ids: List[int]) -> None:
    """Set notified_at to the current time for the given match_results IDs in one statement."""
    if not match_ids:
        return
    placeholders = ",".join("?" for _ in match_ids)
    with _conn() as conn:
        conn.execute(
            f"UPDATE match_results SET notified_at = strftime('%Y-%m-%dT%H:%M:%fZ','now') "
            f"WHERE id IN ({placeholders})",
            match_ids,
        )
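Together these two functions act as a notification queue: a row is pending while notified_at is NULL and leaves the queue once it is stamped. The sketch below replays that cycle on an in-memory table. The table is a cut-down stand-in for match_results, and pending_ids and mark_notified are hypothetical names that mirror, but are not, the functions above.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE match_results ("
    "id INTEGER PRIMARY KEY, match_score INTEGER, notified_at TEXT)"
)
conn.executemany(
    "INSERT INTO match_results (match_score, notified_at) VALUES (?, ?)",
    [(50, None), (80, None), (90, "2026-04-01T00:00:00.000Z")],
)


def pending_ids(min_score: int) -> list:
    """IDs of rows that pass the threshold and have never been notified."""
    rows = conn.execute(
        "SELECT id FROM match_results "
        "WHERE notified_at IS NULL AND match_score >= ? "
        "ORDER BY match_score DESC",
        (min_score,),
    )
    return [r[0] for r in rows]


def mark_notified(ids: list) -> None:
    """Stamp notified_at on the given rows; an empty list is a no-op."""
    if not ids:
        return
    # One "?" per id keeps the values parameterized instead of interpolated.
    placeholders = ",".join("?" for _ in ids)
    conn.execute(
        "UPDATE match_results "
        "SET notified_at = strftime('%Y-%m-%dT%H:%M:%fZ','now') "
        f"WHERE id IN ({placeholders})",
        ids,
    )


first_run = pending_ids(70)   # only the score-80 row qualifies
mark_notified(first_run)
second_run = pending_ids(70)  # the queue is now empty
```

Running the query a second time returns nothing, which is what keeps a repeated cron run from sending the same match twice.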
- Step 4: Run the tests and confirm they pass
Run: cd realestate-lab && python -m pytest tests/test_db_functions.py -v
Expected: 6 passed
- Step 5: Commit
git add realestate-lab/app/db.py realestate-lab/tests/test_db_functions.py
git commit -m "feat(realestate-db): add notify queue + 90-day grace cleanup"
Task 4: realestate-lab collector changes
Add _extract_district, narrow the request up front with an announcement-date (모집공고일) window, and skip announcements whose status is 완료 (completed).
Files:
- Modify: realestate-lab/app/collector.py
- Test: realestate-lab/tests/test_collector.py
- Step 1: Write the _extract_district unit tests
realestate-lab/tests/test_collector.py:
def test_extract_district_seoul_full_address():
from app.collector import _extract_district
parsed = {"address": "서울특별시 강남구 도곡동 123-45", "region_name": None}
assert _extract_district(parsed) == "강남구"
def test_extract_district_seoul_short():
from app.collector import _extract_district
parsed = {"address": None, "region_name": "서울 송파구"}
assert _extract_district(parsed) == "송파구"
def test_extract_district_busan_returns_none():
from app.collector import _extract_district
parsed = {"address": "부산광역시 해운대구 우동", "region_name": None}
assert _extract_district(parsed) is None
def test_extract_district_empty_returns_none():
from app.collector import _extract_district
parsed = {"address": "", "region_name": ""}
assert _extract_district(parsed) is None
def test_extract_district_seoul_county():
from app.collector import _extract_district
parsed = {"address": "서울 강서구", "region_name": None}
assert _extract_district(parsed) == "강서구"
def test_extract_district_prefers_address_over_region():
from app.collector import _extract_district
parsed = {"address": "서울특별시 마포구 합정동", "region_name": "서울 강남구"}
assert _extract_district(parsed) == "마포구"
- Step 2: Run the tests and confirm they fail
Run: cd realestate-lab && python -m pytest tests/test_collector.py -v
Expected: 6 failed (_extract_district not defined)
- Step 3: Add _extract_district to collector.py
Change the import block of realestate-lab/app/collector.py:
import os
import re
import logging
from datetime import date, timedelta
import requests
from typing import List, Dict, Any
(adds re, date, timedelta)
Add the regex and the helper function above the existing _parse_apt_detail function:
DISTRICT_PATTERN = re.compile(r"(?:서울특별시|서울시|서울)\s+(\S+?(?:구|군))")
def _extract_district(parsed: Dict[str, Any]) -> str | None:
    """Extract the district (자치구) from a parsed announcement. Returns None outside Seoul or on failure."""
    for src in (parsed.get("address"), parsed.get("region_name")):
        if not src:
            continue
        m = DISTRICT_PATTERN.search(src)
        if m:
            return m.group(1)
    return None
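The pattern relies on a lazy \S+? followed by 구 or 군, so it stops at the first district suffix in the token after the Seoul prefix. The standalone sketch below copies the regex from the plan and runs it on a few sample addresses; extract is a hypothetical single-string wrapper used only for this demonstration.

```python
import re
from typing import Optional

DISTRICT_PATTERN = re.compile(r"(?:서울특별시|서울시|서울)\s+(\S+?(?:구|군))")


def extract(text: Optional[str]) -> Optional[str]:
    """Return the Seoul district found in `text`, or None."""
    if not text:
        return None
    m = DISTRICT_PATTERN.search(text)
    return m.group(1) if m else None


results = {
    "full": extract("서울특별시 강남구 도곡동 123-45"),
    "short": extract("서울 송파구"),
    "leading_gu": extract("서울 구로구"),      # name itself starts with 구
    "two_chars": extract("서울특별시 중구 명동"),
    "busan": extract("부산광역시 해운대구 우동"),  # no Seoul prefix
    "empty": extract(""),
}
```

Because at least one character must precede the suffix, a district such as 구로구 is captured whole rather than being cut at its first syllable.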
- Step 4: Run the tests and confirm they pass
Run: cd realestate-lab && python -m pytest tests/test_collector.py -v
Expected: 6 passed
- Step 5: Add collect_all integration tests: completed skip + window
Append to the end of realestate-lab/tests/test_collector.py:
from datetime import date, timedelta
from unittest.mock import patch
def test_collect_skips_completed_status(monkeypatch):
"""A response whose winner_date is in the past is classified as status='완료' and is not upserted."""
from app import collector
from app.db import _conn
monkeypatch.setenv("DATA_GO_KR_API_KEY", "TEST")
# Also update the module-level constant
monkeypatch.setattr(collector, "API_KEY", "TEST")
past_winner = (date.today() - timedelta(days=10)).strftime("%Y-%m-%d")
fake_detail_rows = [{
"HOUSE_MANAGE_NO": "DONE-1",
"PBLANC_NO": "01",
"HOUSE_NM": "완료된단지",
"HSSPLY_ADRES": "서울특별시 강남구",
"RCEPT_BGNDE": "2026-01-01",
"RCEPT_ENDDE": "2026-01-05",
"PRZWNER_PRESNATN_DE": past_winner,
}]
def fake_call(endpoint, params=None):
if "Detail" in endpoint:
return fake_detail_rows
return []
monkeypatch.setattr(collector, "_api_call", fake_call)
collector.collect_all()
with _conn() as conn:
rows = conn.execute("SELECT * FROM announcements WHERE house_manage_no='DONE-1'").fetchall()
assert len(rows) == 0
def test_collect_stores_district_for_seoul_announcement(monkeypatch):
from app import collector
from app.db import _conn
monkeypatch.setenv("DATA_GO_KR_API_KEY", "TEST")
monkeypatch.setattr(collector, "API_KEY", "TEST")
future_start = (date.today() + timedelta(days=10)).strftime("%Y-%m-%d")
future_end = (date.today() + timedelta(days=15)).strftime("%Y-%m-%d")
future_winner = (date.today() + timedelta(days=30)).strftime("%Y-%m-%d")
fake_detail = [{
"HOUSE_MANAGE_NO": "SEOUL-1",
"PBLANC_NO": "01",
"HOUSE_NM": "강남단지",
"HSSPLY_ADRES": "서울특별시 강남구 도곡동 1",
"RCEPT_BGNDE": future_start,
"RCEPT_ENDDE": future_end,
"PRZWNER_PRESNATN_DE": future_winner,
}]
def fake_call(endpoint, params=None):
if "Detail" in endpoint:
return fake_detail
return []
monkeypatch.setattr(collector, "_api_call", fake_call)
collector.collect_all()
with _conn() as conn:
row = conn.execute("SELECT district, status FROM announcements WHERE house_manage_no='SEOUL-1'").fetchone()
assert row["district"] == "강남구"
assert row["status"] in ("청약예정", "청약중")
def test_collect_passes_date_window_param(monkeypatch):
from app import collector
monkeypatch.setenv("DATA_GO_KR_API_KEY", "TEST")
monkeypatch.setattr(collector, "API_KEY", "TEST")
captured_params = []
def fake_call(endpoint, params=None):
captured_params.append(params or {})
return []
monkeypatch.setattr(collector, "_api_call", fake_call)
collector.collect_all()
expected_from = (date.today() - timedelta(days=30)).strftime("%Y%m%d")
detail_calls = [p for p in captured_params if "RCRIT_PBLANC_DE_FROM" in p]
assert detail_calls, "no detail endpoint call carried the window parameter"
assert detail_calls[0]["RCRIT_PBLANC_DE_FROM"] == expected_from
- Step 6: Run the integration tests and confirm they fail
Run: cd realestate-lab && python -m pytest tests/test_collector.py -v -k "test_collect_"
Expected: 3 failed (window / completed skip / district not yet applied)
- Step 7: Change the body of collect_all: window + skip + district
Update the collect_all function in realestate-lab/app/collector.py as follows:
Before:
def collect_all() -> Dict[str, Any]:
"""모든 엔드포인트를 순회하며 공고 + 모델 데이터를 수집·저장한다."""
if not API_KEY:
logger.warning("API 키 미설정 — 수집 중단")
save_collect_log(0, 0, "API 키 미설정")
return {"new_count": 0, "total_count": 0}
total_count = 0
new_count = 0
for detail_ep, model_ep in DETAIL_ENDPOINTS:
# 공고 상세 수집
detail_rows = _api_call(detail_ep)
for raw in detail_rows:
try:
parsed = _parse_apt_detail(raw)
# 일정 정보가 하나도 없는 공고는 건너뜀
has_dates = any(parsed.get(f) for f in (
"receipt_start", "receipt_end", "spsply_start",
"gnrl_rank1_start", "winner_date", "contract_start",
))
if not has_dates:
continue
_, is_new = upsert_announcement(parsed)
total_count += 1
if is_new:
new_count += 1
except Exception as e:
logger.error("공고 upsert 실패 [%s]: %s", detail_ep, e)
After:
def collect_all() -> Dict[str, Any]:
    """Iterate over every endpoint and collect and store announcement + model data.
    Data whose announcement date is more than 30 days old is filtered out up front via an API parameter.
    Responses classified as status='완료' are not stored.
    """
    if not API_KEY:
        logger.warning("API 키 미설정 — 수집 중단")
        save_collect_log(0, 0, "API 키 미설정")
        return {"new_count": 0, "total_count": 0}
    today = date.today()
    date_from = (today - timedelta(days=30)).strftime("%Y%m%d")
    total_count = 0
    new_count = 0
    skipped_completed = 0
    for detail_ep, model_ep in DETAIL_ENDPOINTS:
        # Collect announcement details, passing the announcement-date window to the API
        # Some endpoints may not support the parameter; it is then ignored and does not affect the response
        detail_rows = _api_call(detail_ep, params={"RCRIT_PBLANC_DE_FROM": date_from})
        for raw in detail_rows:
            try:
                parsed = _parse_apt_detail(raw)
                parsed["district"] = _extract_district(parsed)
                # Skip announcements with no schedule information at all (existing behavior)
                has_dates = any(parsed.get(f) for f in (
                    "receipt_start", "receipt_end", "spsply_start",
                    "gnrl_rank1_start", "winner_date", "contract_start",
                ))
                if not has_dates:
                    continue
                # Do not store announcements whose status is '완료' (saves resources)
                from .db import compute_status
                status = compute_status(
                    parsed.get("receipt_start", "") or "",
                    parsed.get("receipt_end", "") or "",
                    parsed.get("winner_date", "") or "",
                )
                if status == "완료":
                    skipped_completed += 1
                    continue
                _, is_new = upsert_announcement(parsed)
                total_count += 1
                if is_new:
                    new_count += 1
            except Exception as e:
                logger.error("공고 upsert 실패 [%s]: %s", detail_ep, e)
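One detail worth isolating is the date format: the window parameter is sent as compact YYYYMMDD, while the test fixtures above use dashed YYYY-MM-DD dates for the response fields. A minimal sketch of the window computation, where announcement_window_start is a hypothetical helper name and the fixed date is only an example:

```python
from datetime import date, timedelta


def announcement_window_start(today: date, days: int = 30) -> str:
    """Return the start of the collection window as a compact YYYYMMDD string."""
    return (today - timedelta(days=days)).strftime("%Y%m%d")


# Example with a fixed date so the result does not depend on when it runs.
params = {"RCRIT_PBLANC_DE_FROM": announcement_window_start(date(2026, 4, 28))}
```

Taking today as an argument rather than calling date.today() inside the helper makes the window easy to pin in tests.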
- Step 8: Handle the district column in upsert_announcement (db.py change)
In the upsert_announcement function in realestate-lab/app/db.py, add district to the INSERT column list and to the ON CONFLICT clause.
Location to find (the INSERT INTO announcements part):
conn.execute("""
INSERT INTO announcements (
house_manage_no, pblanc_no, house_nm, house_secd, house_dtl_secd,
rent_secd, region_code, region_name, address, total_units,
Change it to:
conn.execute("""
INSERT INTO announcements (
house_manage_no, pblanc_no, house_nm, house_secd, house_dtl_secd,
rent_secd, region_code, region_name, district, address, total_units,
Change the VALUES block as well. Before:
) VALUES (
:house_manage_no, :pblanc_no, :house_nm, :house_secd, :house_dtl_secd,
:rent_secd, :region_code, :region_name, :address, :total_units,
After:
) VALUES (
:house_manage_no, :pblanc_no, :house_nm, :house_secd, :house_dtl_secd,
:rent_secd, :region_code, :region_name, :district, :address, :total_units,
Add it to the ON CONFLICT clause too. Location to find:
region_code=excluded.region_code,
region_name=excluded.region_name,
address=excluded.address,
Change it to:
region_code=excluded.region_code,
region_name=excluded.region_name,
district=excluded.district,
address=excluded.address,
Finally, add a default for district at the start of upsert_announcement, right after the function's first line:
def upsert_announcement(data: Dict[str, Any]) -> tuple:
    """공고 upsert — house_manage_no + pblanc_no 기준. Returns (dict, is_new: bool)."""
    data.setdefault("district", None)  # safe default when the key is missing, e.g. manual registration
    status = compute_status(
    ...
Also add it to the ANNOUNCEMENT_COLUMNS constant:
ANNOUNCEMENT_COLUMNS = {
"house_nm", "house_secd", "house_dtl_secd", "rent_secd",
"region_code", "region_name", "district", "address", "total_units",
...
}
- Step 9: Run the tests and confirm they pass
Run: cd realestate-lab && python -m pytest tests/test_collector.py -v
Expected: 9 passed
- Step 10: Commit
git add realestate-lab/app/collector.py realestate-lab/app/db.py realestate-lab/tests/test_collector.py
git commit -m "feat(realestate-collector): 30-day window + district extraction + completed skip"
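Task 5: realestate-lab matcher: five tiers + eligibility score redistribution
Region is worth 35 points (metropolitan region 10 + weighted district 25) and eligibility is worth 25 points (15 for the first eligible type, +5 for each additional type, capped at +10).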
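The point allocation can be worked through numerically. The sketch below uses the tier weights that this task defines for matcher.py (S 1.00 down to D 0.20); REGION_BASE, DISTRICT_MAX, and eligibility_points are hypothetical names introduced only for this illustration.

```python
TIER_WEIGHTS = {"S": 1.00, "A": 0.80, "B": 0.60, "C": 0.40, "D": 0.20}
REGION_BASE = 10    # awarded for a metropolitan-region match
DISTRICT_MAX = 25   # scaled by the tier weight

# Region score per tier when the district is listed in that tier
tier_points = {
    tier: REGION_BASE + round(DISTRICT_MAX * weight)
    for tier, weight in TIER_WEIGHTS.items()
}


def eligibility_points(type_count: int) -> int:
    """15 for the first eligible type, +5 per extra type, bonus capped at +10."""
    if type_count <= 0:
        return 0
    return 15 + min((type_count - 1) * 5, 10)


eligibility_curve = [eligibility_points(n) for n in range(0, 6)]
```

With these weights an S-tier district reaches the full 35 region points and a D-tier district 15, while the eligibility score saturates at 25 from three eligible types onward.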
Files:
- Modify: realestate-lab/app/matcher.py
- Test: realestate-lab/tests/test_matcher.py
- Step 1: Write the region and eligibility score unit tests
realestate-lab/tests/test_matcher.py:
def test_region_score_no_districts_full_when_region_match():
"""No districts configured: 35 points when the metropolitan region matches."""
from app.matcher import _region_score
profile = {"preferred_regions": ["서울"], "preferred_districts": {}}
ann = {"region_name": "서울특별시", "district": None}
score, _ = _region_score(profile, ann)
assert score == 35
def test_region_score_no_districts_zero_when_region_mismatch():
from app.matcher import _region_score
profile = {"preferred_regions": ["서울"], "preferred_districts": {}}
ann = {"region_name": "부산광역시", "district": None}
score, _ = _region_score(profile, ann)
assert score == 0
def test_region_score_s_tier_district():
"""Region match + S-tier district: 10 + 25 = 35."""
from app.matcher import _region_score
profile = {
"preferred_regions": ["서울"],
"preferred_districts": {"S": ["강남구"], "A": [], "B": [], "C": [], "D": []},
}
ann = {"region_name": "서울특별시", "district": "강남구"}
score, _ = _region_score(profile, ann)
assert score == 35
def test_region_score_a_tier_district():
"""Region match + A-tier district: 10 + 20 = 30."""
from app.matcher import _region_score
profile = {
"preferred_regions": ["서울"],
"preferred_districts": {"S": [], "A": ["송파구"], "B": [], "C": [], "D": []},
}
ann = {"region_name": "서울특별시", "district": "송파구"}
score, _ = _region_score(profile, ann)
assert score == 30
def test_region_score_d_tier_district():
"""Region match + D-tier district: 10 + 5 = 15."""
from app.matcher import _region_score
profile = {
"preferred_regions": ["서울"],
"preferred_districts": {"S": [], "A": [], "B": [], "C": [], "D": ["도봉구"]},
}
ann = {"region_name": "서울특별시", "district": "도봉구"}
score, _ = _region_score(profile, ann)
assert score == 15
def test_region_score_district_set_but_not_listed():
"""Region match + district not in any of the five tiers: 10 points only."""
from app.matcher import _region_score
profile = {
"preferred_regions": ["서울"],
"preferred_districts": {"S": ["강남구"], "A": [], "B": [], "C": [], "D": []},
}
ann = {"region_name": "서울특별시", "district": "강서구"}
score, _ = _region_score(profile, ann)
assert score == 10
def test_eligibility_score_zero_when_empty():
from app.matcher import _eligibility_score
assert _eligibility_score([]) == 0
def test_eligibility_score_one_type_returns_15():
from app.matcher import _eligibility_score
assert _eligibility_score(["일반1순위"]) == 15
def test_eligibility_score_two_types_returns_20():
from app.matcher import _eligibility_score
assert _eligibility_score(["일반1순위", "특별-신혼부부"]) == 20
def test_eligibility_score_caps_at_25():
from app.matcher import _eligibility_score
assert _eligibility_score(["a", "b", "c", "d", "e"]) == 25
- Step 2: Run the tests and confirm they fail
Run: cd realestate-lab && python -m pytest tests/test_matcher.py -v
Expected: 10 failed
- Step 3: Add the new functions to matcher.py
Add near the top of realestate-lab/app/matcher.py (after the logger definition, above _HOUSE_TYPE_MAP):
TIER_WEIGHTS = {"S": 1.00, "A": 0.80, "B": 0.60, "C": 0.40, "D": 0.20}
def _region_score(profile: Dict[str, Any], ann: Dict[str, Any]) -> tuple[int, list[str]]:
    """Compute the region score: 10 points for the metropolitan region + 0 to 25 points from the five-tier district weight.
    When no district preference is configured, a metropolitan-region match alone earns the full 35 points (backward compatible).
    """
    region_name = ann.get("region_name") or ""
    district = ann.get("district") or ""
    preferred_regions = profile.get("preferred_regions") or []
    preferred_districts = profile.get("preferred_districts") or {}
    region_match = bool(region_name and any(r in region_name for r in preferred_regions))
    if not region_match:
        return 0, []
    has_districts = any(preferred_districts.get(t) for t in TIER_WEIGHTS)
    if not has_districts:
        return 35, [f"선호 지역 일치: {region_name}"]
    score = 10
    reasons = [f"광역 일치: {region_name}"]
    for tier, weight in TIER_WEIGHTS.items():
        if district and district in (preferred_districts.get(tier) or []):
            tier_score = round(25 * weight)
            score += tier_score
            reasons.append(f"자치구 {tier}티어: {district} (+{tier_score})")
            break
    return score, reasons
def _eligibility_score(eligible_types: List[str]) -> int:
    """Eligibility score, 0 to 25: 15 for the first eligible type + 5 per additional type, capped at +10."""
    if not eligible_types:
        return 0
    return 15 + min((len(eligible_types) - 1) * 5, 10)
- Step 4: Replace the body of _compute_score to call the new functions
Replace the body of the _compute_score function in realestate-lab/app/matcher.py with the following:
def _compute_score(
    profile: Dict[str, Any],
    ann: Dict[str, Any],
    models: List[Dict[str, Any]],
) -> Dict[str, Any]:
    """Compute the match score (0-100) and the reasons behind it.
    Allocation: region 35 / house type 10 / area 15 / price 15 / eligibility 25.
    """
    score = 0
    reasons: List[str] = []
    # 1. Region (35 points): metropolitan region + five-tier district
    region_score, region_reasons = _region_score(profile, ann)
    score += region_score
    reasons.extend(region_reasons)
    # 2. House type (10 points): binary
    preferred_types = profile.get("preferred_types") or []
    house_secd = ann.get("house_secd") or ""
    type_name = _HOUSE_TYPE_MAP.get(house_secd, house_secd)
    if type_name and type_name in preferred_types:
        score += 10
        reasons.append(f"선호 유형 일치: {type_name}")
    # 3. Area (15 points): binary, passes if at least one model is inside the range
    min_area = profile.get("min_area")
    max_area = profile.get("max_area")
    if min_area is not None and max_area is not None and models:
        for m in models:
            supply_area = m.get("supply_area")
            if supply_area is not None and min_area <= supply_area <= max_area:
                score += 15
                reasons.append(f"희망 면적 범위 내 모델 존재 ({supply_area}㎡)")
                break
    # 4. Price (15 points): binary, passes if at least one model is within budget
    max_price = profile.get("max_price")
    if max_price is not None and models:
        for m in models:
            top_amount = m.get("top_amount")
            if top_amount is not None and top_amount <= max_price:
                score += 15
                reasons.append(f"예산 범위 내 모델 존재 (최고가 {top_amount:,}만원)")
                break
    # 5. Eligibility (25 points): 15 for the first type + 5 per additional type
    eligible_types = _check_eligible_types(profile, ann)
    elig_score = _eligibility_score(eligible_types)
    if elig_score > 0:
        score += elig_score
        reasons.append(f"자격 유형 {len(eligible_types)}개: {', '.join(eligible_types)}")
    return {
        "match_score": score,
        "match_reasons": reasons,
        "eligible_types": eligible_types,
    }
- Step 5: Run the tests and confirm they pass
Run: cd realestate-lab && python -m pytest tests/test_matcher.py -v
Expected: 10 passed
- Step 6: Commit
git add realestate-lab/app/matcher.py realestate-lab/tests/test_matcher.py
git commit -m "feat(realestate-matcher): 5-tier district weighting + eligibility curve"
Task 6: realestate-lab Profile API extension
Add three fields to the ProfileUpdate Pydantic model. Only models.py needs to change; the main.py flow picks the fields up automatically because it is already driven by PROFILE_COLUMNS.
Files:
- Modify: realestate-lab/app/models.py
- Test: realestate-lab/tests/test_profile_api.py
- Step 1: Write the API integration tests
realestate-lab/tests/test_profile_api.py:
from fastapi.testclient import TestClient
def test_profile_update_accepts_new_fields():
from app.main import app
client = TestClient(app)
body = {
"name": "테스트",
"preferred_districts": {
"S": ["강남구", "서초구"],
"A": ["송파구"],
"B": [],
"C": [],
"D": [],
},
"min_match_score": 75,
"notify_enabled": True,
}
resp = client.put("/api/realestate/profile", json=body)
assert resp.status_code == 200
data = resp.json()
assert data["preferred_districts"]["S"] == ["강남구", "서초구"]
assert data["min_match_score"] == 75
assert data["notify_enabled"] is True
def test_profile_get_returns_defaults_for_new_fields():
from app.main import app
from app.db import upsert_profile
upsert_profile({"name": "기본"})
client = TestClient(app)
resp = client.get("/api/realestate/profile")
assert resp.status_code == 200
data = resp.json()
assert data["preferred_districts"] == {}
assert data["min_match_score"] == 70
assert data["notify_enabled"] is True
- Step 2: Run the tests and confirm they fail
Run: cd realestate-lab && python -m pytest tests/test_profile_api.py -v
Expected: 2 failed (fields missing from the model → 422 or dict serialization failure)
- Step 3: Add the fields to the ProfileUpdate model
Change the imports in realestate-lab/app/models.py:
from typing import Optional, List, Dict
from pydantic import BaseModel, Field
Add the fields at the end of the ProfileUpdate class:
class ProfileUpdate(BaseModel):
    # ... existing fields unchanged
    name: Optional[str] = None
    age: Optional[int] = None
    is_homeless: Optional[bool] = None
    is_householder: Optional[bool] = None
    subscription_months: Optional[int] = None
    subscription_amount: Optional[int] = None
    family_members: Optional[int] = None
    has_dependents: Optional[bool] = None
    children_count: Optional[int] = None
    is_newlywed: Optional[bool] = None
    marriage_months: Optional[int] = None
    has_newborn: Optional[bool] = None
    is_first_home: Optional[bool] = None
    income_level: Optional[str] = None
    preferred_regions: Optional[List[str]] = None
    preferred_types: Optional[List[str]] = None
    min_area: Optional[float] = None
    max_area: Optional[float] = None
    max_price: Optional[int] = None
    # New
    preferred_districts: Optional[Dict[str, List[str]]] = None
    min_match_score: Optional[int] = Field(default=None, ge=0, le=100)
    notify_enabled: Optional[bool] = None
- Step 4: Run the tests and confirm they pass
Run: cd realestate-lab && python -m pytest tests/test_profile_api.py -v
Expected: 2 passed
- Step 5: Commit
git add realestate-lab/app/models.py realestate-lab/tests/test_profile_api.py
git commit -m "feat(realestate-profile): expose 5tier districts + min_match_score + notify_enabled"
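The `preferred_districts` field is typed as `Dict[str, List[str]]`, so the model itself accepts any tier key. The following is a minimal sketch of an extra check, assuming the five tier keys S, A, B, C, D from the test body above. `validate_district_tiers` and `ALLOWED_TIERS` are hypothetical names used for illustration and are not part of the plan's code.

```python
from typing import Dict, List

# Assumption: the five tiers used in the test request body.
ALLOWED_TIERS = ("S", "A", "B", "C", "D")


def validate_district_tiers(districts: Dict[str, List[str]]) -> Dict[str, List[str]]:
    """Hypothetical helper: reject unknown tier keys and districts listed in two tiers."""
    seen: Dict[str, str] = {}
    for tier, names in districts.items():
        if tier not in ALLOWED_TIERS:
            raise ValueError(f"unknown tier: {tier!r}")
        for name in names:
            if name in seen:
                raise ValueError(f"{name} is in both tier {seen[name]} and tier {tier}")
            seen[name] = tier
    return districts
```

Whether such a check belongs in the model, in the API handler, or nowhere at all is a design choice the plan leaves open.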
Task 7: Add realestate-lab notifier.py
Triggers the push to agent-office. Applies the score threshold and the notify_enabled filter, and marks notified matches idempotently.
Files:
- Create: realestate-lab/app/notifier.py
- Create: realestate-lab/tests/test_notifier.py
- Step 1: Write the notifier unit tests
realestate-lab/tests/test_notifier.py:
from unittest.mock import patch, MagicMock


def _seed_profile_and_match(score, notify_enabled=True, threshold=70):
    from app.db import _conn, upsert_profile
    upsert_profile({
        "name": "u",
        "notify_enabled": notify_enabled,
        "min_match_score": threshold,
    })
    with _conn() as conn:
        conn.execute("""
            INSERT INTO announcements (house_manage_no, pblanc_no, house_nm, status, source)
            VALUES ('NF1', '01', '단지', '청약중', 'manual')
        """)
        ann_id = conn.execute("SELECT id FROM announcements WHERE house_manage_no='NF1'").fetchone()["id"]
        conn.execute("""
            INSERT INTO match_results (announcement_id, model_id, match_score, match_reasons, eligible_types, is_new)
            VALUES (?, NULL, ?, '[]', '[]', 1)
        """, (ann_id, score))
        match_id = conn.execute("SELECT id FROM match_results WHERE announcement_id=?", (ann_id,)).fetchone()["id"]
    return match_id


def test_notify_skips_when_disabled():
    from app import notifier
    _seed_profile_and_match(score=80, notify_enabled=False)
    with patch.object(notifier, "requests") as r:
        result = notifier.notify_new_matches()
    assert r.post.call_count == 0
    assert result["sent"] == 0
    assert result.get("skipped") == "notify_disabled"


def test_notify_filters_below_threshold():
    from app import notifier
    _seed_profile_and_match(score=60, threshold=70)
    with patch.object(notifier, "requests") as r:
        result = notifier.notify_new_matches()
    assert r.post.call_count == 0
    assert result["sent"] == 0


def test_notify_pushes_and_marks_notified():
    from app import notifier
    from app.db import _conn
    match_id = _seed_profile_and_match(score=80, threshold=70)
    fake_resp = MagicMock()
    fake_resp.json.return_value = {"sent": 1, "sent_ids": [match_id]}
    fake_resp.raise_for_status.return_value = None
    with patch.object(notifier.requests, "post", return_value=fake_resp) as post:
        result = notifier.notify_new_matches()
    assert post.call_count == 1
    args, kwargs = post.call_args
    assert "/api/agent-office/realestate/notify" in args[0]
    assert kwargs["json"]["matches"][0]["id"] == match_id
    with _conn() as conn:
        row = conn.execute("SELECT notified_at FROM match_results WHERE id=?", (match_id,)).fetchone()
    assert row["notified_at"] is not None
    assert result["sent"] == 1


def test_notify_does_not_mark_on_failure():
    from app import notifier
    from app.db import _conn
    import requests as real_requests
    match_id = _seed_profile_and_match(score=80, threshold=70)

    def boom(*a, **k):
        raise real_requests.RequestException("agent-office down")

    with patch.object(notifier.requests, "post", side_effect=boom):
        result = notifier.notify_new_matches()
    with _conn() as conn:
        row = conn.execute("SELECT notified_at FROM match_results WHERE id=?", (match_id,)).fetchone()
    assert row["notified_at"] is None
    assert result["sent"] == 0
    assert "error" in result
- Step 2: Run the tests and confirm they fail
Run: cd realestate-lab && python -m pytest tests/test_notifier.py -v
Expected: 4 failed (the module does not exist yet)
- Step 3: Write notifier.py
realestate-lab/app/notifier.py:
"""Push new matches to agent-office to trigger Telegram notifications."""
import os
import logging
import requests

from .db import get_profile, get_unnotified_matches, mark_matches_notified

logger = logging.getLogger("realestate-lab")

AGENT_OFFICE_URL = os.getenv("AGENT_OFFICE_URL", "http://agent-office:8000")
NOTIFY_TIMEOUT_SECONDS = int(os.getenv("REALESTATE_NOTIFY_TIMEOUT", "15"))


def notify_new_matches() -> dict:
    """Push unnotified matches that pass the profile threshold to agent-office.

    If the response is 200 and sent_ids is non-empty, mark notified_at for those IDs.
    On failure nothing is marked, so the matches are retried in the next cycle.
    """
    profile = get_profile()
    if not profile:
        return {"sent": 0, "skipped": "no_profile"}
    if not profile.get("notify_enabled"):
        return {"sent": 0, "skipped": "notify_disabled"}
    threshold = profile.get("min_match_score")
    if threshold is None:  # 0 is a valid threshold, so `or 70` would override it
        threshold = 70
    matches = get_unnotified_matches(threshold)
    if not matches:
        return {"sent": 0}
    url = f"{AGENT_OFFICE_URL}/api/agent-office/realestate/notify"
    try:
        resp = requests.post(url, json={"matches": matches}, timeout=NOTIFY_TIMEOUT_SECONDS)
        resp.raise_for_status()
        body = resp.json()
    except requests.RequestException as e:
        logger.error("agent-office push 실패: %s", e)
        return {"sent": 0, "error": str(e)}
    sent_ids = body.get("sent_ids") or []
    if sent_ids:
        mark_matches_notified(sent_ids)
        logger.info("알림 송신: %d건", len(sent_ids))
    return body
- Step 4: Run the tests and confirm they pass
Run: cd realestate-lab && python -m pytest tests/test_notifier.py -v
Expected: 4 passed
- Step 5: Commit
git add realestate-lab/app/notifier.py realestate-lab/tests/test_notifier.py
git commit -m "feat(realestate-notifier): push unnotified matches to agent-office"
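The threshold filter and the idempotent marking that notifier.py relies on can be sketched with an in-memory SQLite table. This is only an illustration under assumptions: the table is reduced to three columns, and `unnotified_ids` and `mark_notified` are hypothetical stand-ins for get_unnotified_matches and mark_matches_notified, whose real implementations in app/db.py may differ.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
# Assumption: a reduced match_results table with only the columns needed here.
conn.execute(
    "CREATE TABLE match_results ("
    "id INTEGER PRIMARY KEY, match_score INTEGER, notified_at TEXT)"
)
conn.executemany(
    "INSERT INTO match_results (id, match_score) VALUES (?, ?)",
    [(1, 80), (2, 60), (3, 95)],
)


def unnotified_ids(threshold: int) -> list[int]:
    """IDs of matches at or above the threshold that have not been notified yet."""
    rows = conn.execute(
        "SELECT id FROM match_results "
        "WHERE match_score >= ? AND notified_at IS NULL ORDER BY id",
        (threshold,),
    ).fetchall()
    return [r["id"] for r in rows]


def mark_notified(ids: list[int]) -> int:
    """Set notified_at only where it is still NULL, so repeating the call changes nothing."""
    if not ids:
        return 0
    placeholders = ",".join("?" for _ in ids)
    cur = conn.execute(
        f"UPDATE match_results SET notified_at = datetime('now') "
        f"WHERE id IN ({placeholders}) AND notified_at IS NULL",
        ids,
    )
    return cur.rowcount


pending = unnotified_ids(70)        # matches 1 and 3 pass the threshold
first = mark_notified(pending)      # marks both rows
second = mark_notified(pending)     # already marked, so no rows change
```

The `AND notified_at IS NULL` guard is what keeps a repeated call from overwriting the original timestamp.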
Task 8: Integrate the realestate-lab scheduled_collect flow
Add the cleanup step and the notifier call to scheduled_collect.
Files:
- Modify: realestate-lab/app/main.py
- Test: realestate-lab/tests/test_scheduled_flow.py
- Step 1: Write the flow integration test
realestate-lab/tests/test_scheduled_flow.py:
from unittest.mock import patch


def test_scheduled_collect_calls_cleanup_and_notifier():
    from app import main as app_main
    calls = []

    def fake_collect():
        calls.append("collect")
        return {"new_count": 0, "total_count": 0}

    def fake_cleanup(grace_days=90):
        calls.append(("cleanup", grace_days))
        return 0

    def fake_match():
        calls.append("match")

    def fake_notify():
        calls.append("notify")
        return {"sent": 0}

    with patch.object(app_main, "collect_all", side_effect=fake_collect), \
         patch.object(app_main, "delete_old_completed_announcements", side_effect=fake_cleanup), \
         patch.object(app_main, "run_matching", side_effect=fake_match), \
         patch.object(app_main, "notify_new_matches", side_effect=fake_notify):
        app_main.scheduled_collect()
    assert calls == ["collect", ("cleanup", 90), "match", "notify"]
- Step 2: Run the tests and confirm they fail
Run: cd realestate-lab && python -m pytest tests/test_scheduled_flow.py -v
Expected: 1 failed (notify_new_matches and delete_old_completed_announcements are not imported in main.py yet)
- Step 3: Modify main.py
Add to the import block of realestate-lab/app/main.py:
from .db import (
    init_db, get_announcements, get_announcement, create_announcement,
    update_announcement, delete_announcement, delete_closed_announcements, toggle_bookmark,
    update_all_statuses,
    get_profile, upsert_profile, get_matches, mark_match_read,
    get_last_collect_log, get_dashboard,
    delete_old_completed_announcements,  # NEW
)
from .collector import collect_all
from .matcher import run_matching
from .notifier import notify_new_matches  # NEW
from .models import AnnouncementCreate, AnnouncementUpdate, ProfileUpdate
Modify the scheduled_collect function:
Before:
def scheduled_collect():
    """Daily at 09:00: collect + match"""
    logger.info("스케줄 수집 시작")
    collect_all()
    run_matching()
    logger.info("스케줄 수집 + 매칭 완료")
After:
def scheduled_collect():
    """Daily at 09:00: collect + cleanup + match + notification push"""
    logger.info("스케줄 수집 시작")
    collect_all()
    deleted = delete_old_completed_announcements(grace_days=90)
    if deleted:
        logger.info("정리: %d건 삭제", deleted)
    run_matching()
    notify_new_matches()
    logger.info("스케줄 수집 + 매칭 + 알림 완료")
Apply the same flow to _run_collect_and_match for consistency:
Before:
def _run_collect_and_match():
    if not _collect_lock.acquire(blocking=False):
        logger.info("수집 이미 진행 중 — 건너뜀")
        return
    try:
        collect_all()
        run_matching()
    finally:
        _collect_lock.release()
After:
def _run_collect_and_match():
    if not _collect_lock.acquire(blocking=False):
        logger.info("수집 이미 진행 중 — 건너뜀")
        return
    try:
        collect_all()
        delete_old_completed_announcements(grace_days=90)
        run_matching()
        notify_new_matches()
    finally:
        _collect_lock.release()
- Step 4: Run the tests and confirm they pass
Run: cd realestate-lab && python -m pytest tests/test_scheduled_flow.py -v
Expected: 1 passed
- Step 5: Full regression check
Run: cd realestate-lab && python -m pytest tests/ -v
Expected: all tests pass
- Step 6: Commit
git add realestate-lab/app/main.py realestate-lab/tests/test_scheduled_flow.py
git commit -m "feat(realestate): wire cleanup + notifier into scheduled flow"
Task 9: agent-office Telegram formatter + messaging helper
A message formatter for housing subscription matches (compact and full cards), an inline keyboard builder, and a send helper.
Files:
- Create: agent-office/app/telegram/realestate_message.py
- Create: agent-office/tests/test_realestate_message.py
- Step 1: Write the tests
agent-office/tests/test_realestate_message.py:
def test_format_realestate_match_full_card_single():
    from app.telegram.realestate_message import format_realestate_matches
    matches = [{
        "id": 1,
        "match_score": 90,
        "house_nm": "디에이치 강남",
        "region_name": "서울특별시",
        "district": "강남구",
        "is_speculative_area": "Y",
        "is_price_cap": "Y",
        "receipt_start": "2026-05-15",
        "receipt_end": "2026-05-19",
        "match_reasons": ["광역 일치", "자치구 S티어: 강남구 (+25)", "예산 범위"],
        "eligible_types": ["일반1순위", "특별-신혼부부"],
        "pblanc_url": "https://example.com/p/1",
    }]
    text = format_realestate_matches(matches)
    assert "디에이치 강남" in text
    assert "90점" in text
    assert "강남구" in text
    assert "2026-05-15" in text


def test_format_realestate_match_compact_when_three_or_more():
    from app.telegram.realestate_message import format_realestate_matches
    matches = [
        {"id": i, "match_score": 90 - i, "house_nm": f"단지{i}", "district": "강남구",
         "region_name": "서울특별시", "receipt_start": "2026-05-15", "receipt_end": "2026-05-19",
         "match_reasons": [], "eligible_types": [], "pblanc_url": ""}
        for i in range(3)
    ]
    text = format_realestate_matches(matches)
    # The header must carry the match count; a bare "3" check would be trivially weak.
    assert "3건" in text
    for i in range(3):
        assert f"단지{i}" in text


def test_build_keyboard_single_match_has_bookmark_and_url():
    from app.telegram.realestate_message import build_match_keyboard
    matches = [{"id": 42, "pblanc_url": "https://example.com/p/42"}]
    kb = build_match_keyboard(matches)
    rows = kb["inline_keyboard"]
    flat = [b for row in rows for b in row]
    assert any(b.get("callback_data", "").startswith("realestate_bookmark_42") for b in flat)
    assert any(b.get("url") == "https://example.com/p/42" for b in flat)


def test_build_keyboard_multi_matches_uses_dashboard_link():
    from app.telegram.realestate_message import build_match_keyboard
    matches = [{"id": i, "pblanc_url": ""} for i in range(3)]
    kb = build_match_keyboard(matches)
    flat = [b for row in kb["inline_keyboard"] for b in row]
    # With three or more matches: a single [전체 보기] URL button
    assert any("전체" in b.get("text", "") for b in flat)


def test_build_keyboard_empty_returns_none():
    from app.telegram.realestate_message import build_match_keyboard
    assert build_match_keyboard([]) is None
- Step 2: Run the tests and confirm they fail
Run: cd agent-office && python -m pytest tests/test_realestate_message.py -v
Expected: 5 failed (the module does not exist yet)
- Step 3: Write realestate_message.py
agent-office/app/telegram/realestate_message.py:
"""Housing subscription match alerts: Telegram message formatter + inline keyboard builder."""
import os
from html import escape as _h
from typing import Optional

DASHBOARD_URL = os.getenv("REALESTATE_DASHBOARD_URL", "https://example.com/realestate")
_TIER_BADGE = {"S": "S", "A": "A", "B": "B", "C": "C", "D": "D"}


def _format_one_compact(m: dict) -> str:
    score = m.get("match_score", 0)
    name = _h(m.get("house_nm") or "(제목 없음)")
    district = m.get("district") or ""
    region = m.get("region_name") or ""
    where = f"{region.split()[0] if region else ''} {district}".strip() or "위치 미상"
    rstart = m.get("receipt_start") or ""
    rend = m.get("receipt_end") or ""
    return (
        f"⭐ {score}점 — <b>{name}</b>\n"
        f"📍 {_h(where)} 📅 {_h(rstart)} ~ {_h(rend)}"
    )


def _format_one_full(m: dict) -> str:
    score = m.get("match_score", 0)
    name = _h(m.get("house_nm") or "(제목 없음)")
    district = m.get("district") or ""
    region = m.get("region_name") or ""
    flags = []
    if m.get("is_speculative_area") == "Y":
        flags.append("투기과열")
    if m.get("is_price_cap") == "Y":
        flags.append("분양가상한제")
    flag_str = f" ({', '.join(flags)})" if flags else ""
    rstart = m.get("receipt_start") or ""
    rend = m.get("receipt_end") or ""
    elig = m.get("eligible_types") or []
    reasons = m.get("match_reasons") or []
    where = f"{region.split()[0] if region else ''} {district}".strip() or "위치 미상"
    lines = [
        f"⭐ {score}점 — <b>{name}</b>",
        f"📍 {_h(where)}{_h(flag_str)}",
        f"📅 청약 {_h(rstart)} ~ {_h(rend)}",
    ]
    if elig:
        lines.append(f"✓ 자격: {_h(', '.join(elig))}")
    if reasons:
        lines.append(f"💡 {_h(' / '.join(reasons[:4]))}")
    return "\n".join(lines)


def format_realestate_matches(matches: list[dict]) -> str:
    """Convert a list of matches into a Telegram HTML message.

    One or two matches get full cards; three or more get compact cards (top 5).
    """
    if not matches:
        return "🏢 새 청약 매칭이 없습니다."
    if len(matches) <= 2:
        body = "\n\n".join(_format_one_full(m) for m in matches)
        return f"🏢 <b>새 청약 매칭 {len(matches)}건</b>\n━━━━━━━━━━\n\n{body}"
    top = matches[:5]
    body = "\n\n".join(_format_one_compact(m) for m in top)
    suffix = f"\n\n…외 {len(matches) - 5}건" if len(matches) > 5 else ""
    return f"🏢 <b>새 청약 매칭 {len(matches)}건</b>\n━━━━━━━━━━\n\n{body}{suffix}"


def build_match_keyboard(matches: list[dict]) -> Optional[dict]:
    """1-2 matches: one [bookmark][view announcement] row per match. 3+ matches: a single [view all] row."""
    if not matches:
        return None
    if len(matches) <= 2:
        rows = []
        for m in matches:
            buttons = [{
                "text": "🔖 북마크",
                "callback_data": f"realestate_bookmark_{m['id']}",
            }]
            url = m.get("pblanc_url")
            if url:
                buttons.append({"text": "📄 공고 보기", "url": url})
            rows.append(buttons)
        return {"inline_keyboard": rows}
    return {
        "inline_keyboard": [[
            {"text": "📋 전체 보기", "url": DASHBOARD_URL},
        ]],
    }
- Step 4: Run the tests and confirm they pass
Run: cd agent-office && python -m pytest tests/test_realestate_message.py -v
Expected: 5 passed
- Step 5: Commit
git add agent-office/app/telegram/realestate_message.py agent-office/tests/test_realestate_message.py
git commit -m "feat(agent-office-telegram): realestate match formatter + keyboard"
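The formatter output and the keyboard dict end up in a Telegram Bot API sendMessage request. The sketch below shows roughly what such a request body could look like, assuming a JSON body with `parse_mode` set to HTML. `build_send_message_payload` is a hypothetical helper and the chat ID is a placeholder; how the project's messaging.send_raw actually assembles the request is not shown in this plan.

```python
import json

MAX_CALLBACK_DATA_BYTES = 64  # Telegram Bot API limit for callback_data


def build_send_message_payload(chat_id, text, keyboard=None):
    """Hypothetical helper: assemble a sendMessage body with HTML parse mode."""
    payload = {"chat_id": chat_id, "text": text, "parse_mode": "HTML"}
    if keyboard is not None:
        for row in keyboard["inline_keyboard"]:
            for button in row:
                data = button.get("callback_data", "")
                if len(data.encode("utf-8")) > MAX_CALLBACK_DATA_BYTES:
                    raise ValueError(f"callback_data too long: {data!r}")
        payload["reply_markup"] = keyboard
    return payload


keyboard = {"inline_keyboard": [[
    {"text": "🔖 북마크", "callback_data": "realestate_bookmark_42"},
    {"text": "📄 공고 보기", "url": "https://example.com/p/42"},
]]}
# 12345 is a placeholder chat ID used only for this illustration.
payload = build_send_message_payload(12345, "<b>단지A</b>", keyboard)
body = json.dumps(payload, ensure_ascii=False)
```

The `realestate_bookmark_<id>` format stays far below the 64-byte limit, so the length check only guards against future callback formats that embed more data.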
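Task 10: agent-office RealestateAgent.on_new_matches + endpoint
Add the new method and the endpoint, and mark on_schedule as deprecated (it is no longer called once the cron registration is removed; the fetch_matches command is simplified to call on_new_matches directly).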
Files:
- Modify: agent-office/app/agents/realestate.py
- Modify: agent-office/app/main.py (add the endpoint)
- Test: agent-office/tests/test_realestate_agent.py
- Step 1: Write the agent + endpoint tests
agent-office/tests/test_realestate_agent.py:
import os
import sys
import tempfile

_TMP = tempfile.mktemp(suffix=".db")
os.environ["AGENT_OFFICE_DB_PATH"] = _TMP
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

import asyncio
from unittest.mock import AsyncMock, patch
import pytest


@pytest.fixture(autouse=True)
def _init_db():
    if os.path.exists(_TMP):
        os.remove(_TMP)
    from app.db import init_db
    init_db()
    yield


def test_on_new_matches_returns_empty_when_no_matches():
    from app.agents.realestate import RealestateAgent
    agent = RealestateAgent()
    result = asyncio.run(agent.on_new_matches([]))
    assert result == {"sent": 0, "sent_ids": []}


def test_on_new_matches_sends_telegram_and_returns_ids():
    from app.agents.realestate import RealestateAgent
    from app.telegram import messaging
    matches = [{
        "id": 7, "match_score": 80, "house_nm": "단지A",
        "region_name": "서울특별시", "district": "강남구",
        "receipt_start": "2026-05-01", "receipt_end": "2026-05-05",
        "match_reasons": [], "eligible_types": [], "pblanc_url": "https://x.test/7",
    }]
    fake_send = AsyncMock(return_value={"ok": True, "message_id": 123})
    with patch.object(messaging, "send_raw", fake_send):
        agent = RealestateAgent()
        result = asyncio.run(agent.on_new_matches(matches))
    assert result["sent"] == 1
    assert result["sent_ids"] == [7]
    assert result["message_id"] == 123
    fake_send.assert_awaited_once()
    args, kwargs = fake_send.call_args
    text = args[0]
    assert "단지A" in text


def test_on_new_matches_telegram_failure_returns_zero():
    from app.agents.realestate import RealestateAgent
    from app.telegram import messaging
    matches = [{
        "id": 8, "match_score": 80, "house_nm": "단지B",
        "region_name": "서울", "district": "송파구",
        "receipt_start": "", "receipt_end": "",
        "match_reasons": [], "eligible_types": [], "pblanc_url": "",
    }]
    fake_send = AsyncMock(return_value={"ok": False, "description": "401"})
    with patch.object(messaging, "send_raw", fake_send):
        agent = RealestateAgent()
        result = asyncio.run(agent.on_new_matches(matches))
    assert result["sent"] == 0
    assert result["sent_ids"] == []
    assert "error" in result


def test_endpoint_calls_agent_on_new_matches():
    from fastapi.testclient import TestClient
    from app.main import app
    from app.agents.realestate import RealestateAgent
    fake = AsyncMock(return_value={"sent": 1, "sent_ids": [99], "message_id": 1})
    with patch.object(RealestateAgent, "on_new_matches", fake):
        client = TestClient(app)
        resp = client.post(
            "/api/agent-office/realestate/notify",
            json={"matches": [{"id": 99, "match_score": 80}]},
        )
    assert resp.status_code == 200
    body = resp.json()
    assert body["sent"] == 1
    assert body["sent_ids"] == [99]
- Step 2: Run the tests and confirm they fail
Run: cd agent-office && python -m pytest tests/test_realestate_agent.py -v
Expected: 4 failed (the method and the endpoint do not exist yet)
- Step 3: Modify agents/realestate.py: add on_new_matches, simplify on_schedule
Replace the entire contents of agent-office/app/agents/realestate.py with the following:
from .base import BaseAgent
from ..db import create_task, update_task_status, add_log
from .. import service_proxy
from ..telegram import messaging
from ..telegram.realestate_message import format_realestate_matches, build_match_keyboard


class RealestateAgent(BaseAgent):
    """Real estate housing subscription agent.

    Triggered when realestate-lab finds new matches and pushes them to /realestate/notify.
    on_new_matches is the main entry point. on_schedule is not used (the cron is retired).
    """
    agent_id = "realestate"
    display_name = "청약 애널리스트"

    async def on_new_matches(self, matches: list[dict]) -> dict:
        """Push N new matches as a single Telegram message.

        On success, return sent_ids so realestate-lab can mark notified_at.
        On failure, return sent=0 and sent_ids=[] so the next cycle retries.
        """
        if not matches:
            return {"sent": 0, "sent_ids": []}
        task_id = create_task(self.agent_id, "notify_matches", {"count": len(matches)})
        try:
            text = format_realestate_matches(matches)
            keyboard = build_match_keyboard(matches)
            await self.transition("reporting", f"매칭 {len(matches)}건 알림", task_id)
            tg = await messaging.send_raw(text, reply_markup=keyboard)
            if not tg.get("ok"):
                update_task_status(task_id, "failed", {"error": tg.get("description")})
                await self.transition("idle", "알림 실패")
                return {"sent": 0, "sent_ids": [], "error": tg.get("description")}
            sent_ids = [m["id"] for m in matches if "id" in m]
            update_task_status(task_id, "succeeded", {
                "sent": len(matches),
                "telegram_message_id": tg.get("message_id"),
            })
            await self.transition("idle", f"매칭 {len(matches)}건 알림 완료")
            return {
                "sent": len(matches),
                "sent_ids": sent_ids,
                "message_id": tg.get("message_id"),
            }
        except Exception as e:
            add_log(self.agent_id, f"on_new_matches failed: {e}", "error", task_id)
            update_task_status(task_id, "failed", {"error": str(e)})
            await self.transition("idle", f"오류: {e}")
            return {"sent": 0, "sent_ids": [], "error": str(e)}

    async def on_command(self, command: str, params: dict) -> dict:
        if command == "fetch_matches":
            try:
                matches = await service_proxy.realestate_matches(limit=20)
                if not matches:
                    return {"ok": True, "message": "매칭 없음"}
                result = await self.on_new_matches(matches)
                return {"ok": True, "result": result}
            except Exception as e:
                return {"ok": False, "message": str(e)}
        if command == "dashboard":
            try:
                data = await service_proxy.realestate_dashboard()
                return {"ok": True, "dashboard": data}
            except Exception as e:
                return {"ok": False, "message": str(e)}
        return {"ok": False, "message": f"Unknown command: {command}"}

    async def on_approval(self, task_id: str, approved: bool, feedback: str = "") -> None:
        pass
- Step 4: Add the endpoint to main.py
Open agent-office/app/main.py and add the endpoint below where the other routes are defined, near the other /api/agent-office/... paths. If RealestateAgent is managed through a registry, follow that pattern; otherwise instantiate it directly.
First check the agent dispatch pattern: look at how other agents are invoked in agent-office/app/main.py (for example, how the stock agent's commands are handled). Obtain the RealestateAgent instance the same way and call on_new_matches.
Add the following endpoint:
from .agents.realestate import RealestateAgent
from pydantic import BaseModel
from typing import List, Dict, Any


class RealestateNotifyBody(BaseModel):
    matches: List[Dict[str, Any]]


_realestate_agent_singleton: RealestateAgent | None = None


def _get_realestate_agent() -> RealestateAgent:
    global _realestate_agent_singleton
    if _realestate_agent_singleton is None:
        _realestate_agent_singleton = RealestateAgent()
    return _realestate_agent_singleton


@app.post("/api/agent-office/realestate/notify")
async def realestate_notify(body: RealestateNotifyBody):
    agent = _get_realestate_agent()
    return await agent.on_new_matches(body.matches)
⚠️ Verify during implementation: if agent-office already has an agent instance registry or dispatcher (in agents/__init__.py or at the top of main.py), follow that pattern and do not use the singleton pattern above. Use the code above as-is only when no such pattern exists. First grep how the other agents (stock/music/lotto) manage their instances: cd agent-office && grep -rn "StockAgent\|MusicAgent\|LottoAgent" app/main.py app/agents/__init__.py
- Step 5: Run the tests and confirm they pass
Run: cd agent-office && python -m pytest tests/test_realestate_agent.py -v
Expected: 4 passed
- Step 6: Commit
git add agent-office/app/agents/realestate.py agent-office/app/main.py agent-office/tests/test_realestate_agent.py
git commit -m "feat(agent-office): realestate on_new_matches + /notify endpoint"
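The contract between the notifier and on_new_matches (mark only the IDs the agent confirms, retry everything else in the next cycle) can be simulated without any network or database. This is a simplified sketch: `push`, `cycle`, and the two fake senders are hypothetical stand-ins, and the `notified` set stands in for rows whose notified_at is set.

```python
import asyncio

notified = set()                      # stands in for rows with notified_at set
pending = [{"id": 7}, {"id": 8}]      # matches that passed the threshold


async def push(matches, send):
    """Agent side: report sent_ids only when the Telegram send succeeded."""
    tg = await send(matches)
    if not tg.get("ok"):
        return {"sent": 0, "sent_ids": [], "error": tg.get("description")}
    return {"sent": len(matches), "sent_ids": [m["id"] for m in matches]}


async def cycle(send):
    """Notifier side: push unnotified matches and mark only the confirmed IDs."""
    todo = [m for m in pending if m["id"] not in notified]
    if not todo:
        return {"sent": 0}
    result = await push(todo, send)
    notified.update(result["sent_ids"])
    return result


async def failing_send(matches):
    return {"ok": False, "description": "401"}


async def working_send(matches):
    return {"ok": True, "message_id": 123}


first = asyncio.run(cycle(failing_send))   # send fails, nothing is marked
second = asyncio.run(cycle(working_send))  # same matches are retried and marked
third = asyncio.run(cycle(working_send))   # nothing left to send
```

The result is at-least-once delivery: a match is never silently dropped, at the cost of a possible duplicate if the send succeeds but the marking step fails afterwards.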
Task 11: Remove the realestate cron from the agent-office scheduler + callback routing
Remove the daily 09:15 cron registration. Handle the inline keyboard callbacks (realestate_bookmark_*) in the Telegram webhook and proxy them to realestate-lab.
Files:
- Modify: agent-office/app/scheduler.py
- Modify: agent-office/app/telegram/webhook.py (or wherever callbacks are routed)
- Modify: agent-office/app/service_proxy.py (add the bookmark toggle helper)
- Step 1: Remove the realestate cron from the scheduler
Open agent-office/app/scheduler.py and find the realestate-related add_job call (for example, where realestate_agent.on_schedule is registered). Remove that line and clean up the imports.
Example of the pattern to look for:
scheduler.add_job(realestate_agent.on_schedule, "cron", hour=9, minute=15, id="realestate_daily")
Delete this line and the related import. If the RealestateAgent import is not used anywhere else, remove it as well.
- Step 2: Locate the callback handler
Run: cd agent-office && grep -rn "callback_query\|callback_data" app/telegram/
Check the following:
- The function in webhook.py or router.py that receives and handles callback_query
- The current pattern (for example, how approve_* and reject_* callbacks are handled)
- Step 3: Add the bookmark toggle to service_proxy
Append to the end of agent-office/app/service_proxy.py:
async def realestate_bookmark_toggle(announcement_id: int) -> dict:
    """Call realestate-lab's PATCH /api/realestate/announcements/{id}/bookmark."""
    url = f"{REALESTATE_LAB_URL}/api/realestate/announcements/{announcement_id}/bookmark"
    async with httpx.AsyncClient(timeout=10) as client:
        resp = await client.patch(url)
        resp.raise_for_status()
        return resp.json()
(REALESTATE_LAB_URL and the httpx import should already exist at the top of service_proxy, so only the new environment variable needs to be exposed in the agent-office environment in docker-compose. If REALESTATE_LAB_URL is missing, add the following line at the top of service_proxy:)
REALESTATE_LAB_URL = os.getenv("REALESTATE_LAB_URL", "http://realestate-lab:8000")
- Step 4: Add realestate_bookmark handling to the callback routing
Add the following branch to the callback_query dispatcher in the webhook or router (adapt the exact function name to the location found in Step 2):
if data.startswith("realestate_bookmark_"):
    try:
        ann_id = int(data.removeprefix("realestate_bookmark_"))
    except ValueError:
        return {"ok": False, "message": "잘못된 콜백 데이터"}
    try:
        await service_proxy.realestate_bookmark_toggle(ann_id)
        await messaging.send_raw(f"🔖 북마크 토글 완료 (#{ann_id})")
        return {"ok": True}
    except Exception as e:
        await messaging.send_raw(f"⚠️ 북마크 실패: {e}")
        return {"ok": False, "error": str(e)}
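⚠️ Adapt to the location: integrate this with the existing branch style of the callback dispatcher found in Step 2. If the existing branches look like if data.startswith("approve_"): ... elif data.startswith("reject_"): ..., add this one as an elif. Also confirm which ID the callback carries: build_match_keyboard puts the match's id into callback_data, while realestate_bookmark_toggle expects an announcement ID, so the two must refer to the same record or be mapped before the PATCH call.
- Step 5: Unit test for the callback routing
agent-office/tests/test_realestate_callback.py: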
import os
import sys
import tempfile
from unittest.mock import AsyncMock, patch

_TMP = tempfile.mktemp(suffix=".db")
os.environ["AGENT_OFFICE_DB_PATH"] = _TMP
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))


def test_callback_realestate_bookmark_calls_proxy():
    """callback_data 'realestate_bookmark_42' calls service_proxy.realestate_bookmark_toggle(42)."""
    import asyncio
    from app import service_proxy
    from app.telegram import webhook  # adjust to the actual module name
    fake = AsyncMock(return_value={"ok": True})
    with patch.object(service_proxy, "realestate_bookmark_toggle", fake):
        # Call the webhook's callback dispatcher directly. Fill in the exact signature from Step 2.
        # Example: await webhook.handle_callback({"data": "realestate_bookmark_42", ...})
        result = asyncio.run(webhook.handle_callback({"data": "realestate_bookmark_42", "from": {"id": 1}, "id": "cb1"}))
    fake.assert_awaited_once_with(42)
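⚠️ Adapt the test signature: adjust it to the exact function name and arguments of the callback handler found in Step 2. This test is a realistic stub and will pass only after it is updated to match the actual webhook module structure.
- Step 6: Regression check
Run: cd agent-office && python -m pytest tests/ -v
Expected: all existing tests pass, plus the newly added ones
- Step 7: Commit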
git add agent-office/app/scheduler.py agent-office/app/telegram/ agent-office/app/service_proxy.py agent-office/tests/test_realestate_callback.py
git commit -m "feat(agent-office): drop daily realestate cron + bookmark callback routing"
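The parsing half of the bookmark branch can be isolated into a small pure function, which makes it testable without the webhook module. A minimal sketch, assuming the `realestate_bookmark_<id>` format produced by build_match_keyboard; `parse_bookmark_callback` is a hypothetical helper name.

```python
from typing import Optional

PREFIX = "realestate_bookmark_"


def parse_bookmark_callback(data: str) -> Optional[int]:
    """Return the numeric ID from a bookmark callback, or None if the data does not apply."""
    if not data.startswith(PREFIX):
        return None
    suffix = data.removeprefix(PREFIX)  # str.removeprefix requires Python 3.9+
    if not suffix.isdecimal():
        # Covers an empty suffix, signs, and non-numeric text.
        return None
    return int(suffix)
```

A dispatcher branch could then call `parse_bookmark_callback(data)` and fall through to the next branch on `None`, instead of nesting a try/except around `int()`.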
Task 12: docker-compose environment variables + integration verification
Add AGENT_OFFICE_URL to the realestate-lab environment, and add REALESTATE_LAB_URL to agent-office if it is missing.
Files:
- Modify: docker-compose.yml
- Modify: .env.example
- Step 1: Add AGENT_OFFICE_URL to the realestate-lab service in docker-compose.yml
Find the realestate-lab service definition in docker-compose.yml and add the following to its environment: block:
realestate-lab:
  # ... existing
  environment:
    # ... existing
    AGENT_OFFICE_URL: http://agent-office:8000
- Step 2: Add REALESTATE_LAB_URL to the agent-office service in docker-compose.yml (if missing)
In the agent-office service environment:
agent-office:
  # ... existing
  environment:
    # ... existing
    REALESTATE_LAB_URL: http://realestate-lab:8000
    REALESTATE_DASHBOARD_URL: ${REALESTATE_DASHBOARD_URL:-http://localhost:8080/realestate}
- Step 3: List the new variables in .env.example
Append to the end of .env.example (check that none duplicates an existing variable):
# Housing subscription alerts: agent-office push
AGENT_OFFICE_URL=http://agent-office:8000
REALESTATE_LAB_URL=http://realestate-lab:8000
REALESTATE_DASHBOARD_URL=http://localhost:8080/realestate
REALESTATE_NOTIFY_TIMEOUT=15
- Step 4: Full regression check for both services
Run:
cd realestate-lab && python -m pytest tests/ -v
cd ../agent-office && python -m pytest tests/ -v
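Expected: all tests pass
- Step 5: Production deployment sanity check (manual, optional)
After deploying to the NAS (automatic on git push), manually verify the following scenarios:
- PUT /api/realestate/profile with preferred_districts, min_match_score, and notify_enabled in the body → 200, and the response reflects the new fields
- Trigger POST /api/realestate/collect → after collection, GET /api/realestate/announcements?region=서울 includes the district field and returns no 완료 (completed) announcements
- When at least one unnotified match scores 70 or higher → a Telegram message arrives and the inline keyboard is shown
- Click [🔖 북마크] in Telegram → confirm that is_bookmarked toggles in realestate-lab
- Set notify_enabled=false, then POST /collect → no Telegram push is sent
- Step 6: Commit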
git add docker-compose.yml .env.example
git commit -m "chore(deploy): wire realestate↔agent-office URLs for push notify"
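How the notifier-side variables resolve at runtime can be sketched as a small settings loader. This is an illustration only: `NotifySettings` and `load_notify_settings` are hypothetical names, and the defaults are the ones shown in notifier.py (http://agent-office:8000 and 15 seconds).

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class NotifySettings:
    agent_office_url: str
    timeout_seconds: int


def load_notify_settings(env: Mapping[str, str] = os.environ) -> NotifySettings:
    """Hypothetical loader: read the notifier settings with the notifier.py defaults."""
    url = env.get("AGENT_OFFICE_URL", "http://agent-office:8000").rstrip("/")
    raw_timeout = env.get("REALESTATE_NOTIFY_TIMEOUT", "15")
    try:
        timeout = int(raw_timeout)
    except ValueError:
        raise ValueError(f"REALESTATE_NOTIFY_TIMEOUT must be an integer, got {raw_timeout!r}")
    if timeout <= 0:
        raise ValueError("REALESTATE_NOTIFY_TIMEOUT must be positive")
    return NotifySettings(url, timeout)


defaults = load_notify_settings({})
custom = load_notify_settings({
    "AGENT_OFFICE_URL": "http://localhost:9000/",
    "REALESTATE_NOTIFY_TIMEOUT": "5",
})
```

Passing the environment as a mapping keeps the loader testable without touching `os.environ`; stripping the trailing slash avoids a double slash when the notify path is appended.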
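Completion criteria
- All tasks' tests pass (realestate-lab/tests/, agent-office/tests/)
- Existing endpoints keep working with no regressions
- Match score model is consistent: 35 + 10 + 15 + 15 + 25 = 100 points
- The realestate-lab 09:00 cron runs in the order collect → cleanup → match → notify
- The agent-office 09:15 daily cron is removed
- New-match Telegram alerts and the inline keyboard work
- match_results.notified_at is marked idempotently
Reference: separate follow-up plans
- web-ui frontend UI for entering the five district tiers (separate frontend plan)
- Comparing the applicant's subscription score with the expected cutoff per announcement (research into external data dependencies)
- Extending district parsing to metropolitan areas outside Seoul (for example, Haeundae-gu in Busan)
- POST /notifications/resend (re-send after a threshold change)
- Dashboard widget for the per-district match distribution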