# 청약 서비스 타겟팅 고도화 구현 계획 > **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. **Goal:** realestate-lab의 수집·매칭을 자치구 5티어 가중치 기반으로 정교화하고, agent-office를 통해 신규 매칭 발견 즉시 텔레그램 푸시 한다. **Architecture:** realestate-lab은 09:00 cron에서 `collect → 정리 → 매칭 → notifier 푸시` 순으로 진행. notifier는 임계값 통과한 미알림 매칭을 모아 agent-office의 `/api/agent-office/realestate/notify` 엔드포인트로 HTTP push. agent-office의 `RealestateAgent.on_new_matches()`가 텔레그램 메시지 fmt + 인라인 키보드 빌드 + 송신 후 `notified_at`를 마킹. 데일리 리포트 cron은 폐기. **Tech Stack:** Python 3.12 / FastAPI / SQLite (WAL) / APScheduler / requests / pytest / unittest.mock / aiogram-style raw Telegram Bot API. **스펙 참조:** `docs/superpowers/specs/2026-04-28-realestate-targeting-enhancement-design.md` --- ## Task 1: realestate-lab 테스트 환경 셋업 (DB 경로 환경변수 + conftest) 테스트가 운영 DB(`/app/data/realestate.db`)를 건드리지 않도록 환경변수로 DB 경로 오버라이드 가능하게 만든다. **Files:** - Modify: `realestate-lab/app/db.py:10` (DB_PATH 환경변수화) - Create: `realestate-lab/tests/__init__.py` - Create: `realestate-lab/tests/conftest.py` - Create: `realestate-lab/tests/test_db_basic.py` (smoke test) - [ ] **Step 1: DB_PATH를 환경변수로 변경** `realestate-lab/app/db.py:10` 수정. 기존: ```python DB_PATH = "/app/data/realestate.db" ``` 변경: ```python import os DB_PATH = os.getenv("REALESTATE_DB_PATH", "/app/data/realestate.db") ``` `os` import는 파일 상단의 import 블록에 추가. - [ ] **Step 2: tests 디렉토리 패키지 표시** `realestate-lab/tests/__init__.py` 생성. 비어 있는 파일. - [ ] **Step 3: conftest.py 생성 — 임시 DB 픽스처** `realestate-lab/tests/conftest.py`: ```python import os import sys import tempfile import pytest # 테스트 임시 DB 경로를 import 전에 주입 _TMP_DB = tempfile.mktemp(suffix=".db") os.environ["REALESTATE_DB_PATH"] = _TMP_DB # app 패키지 import 가능하게 PYTHONPATH 보정 sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) @pytest.fixture(autouse=True) def _clean_db(): """각 테스트마다 DB 초기화.""" if os.path.exists(_TMP_DB): os.remove(_TMP_DB) from app.db import init_db init_db() yield if os.path.exists(_TMP_DB): os.remove(_TMP_DB) ``` - [ ] **Step 4: smoke test 작성** `realestate-lab/tests/test_db_basic.py`: ```python def test_init_db_creates_tables(): from app.db import _conn with _conn() as conn: tables = {row[0] for row in conn.execute( "SELECT name FROM sqlite_master WHERE type='table'" )} assert "announcements" in tables assert "announcement_models" in tables assert "user_profile" in tables assert "match_results" in tables assert "collect_log" in tables ``` - [ ] **Step 5: 테스트 실행 확인** Run: `cd realestate-lab && python -m pytest tests/ -v` Expected: 1 passed - [ ] **Step 6: 커밋** ```bash git add realestate-lab/app/db.py realestate-lab/tests/__init__.py realestate-lab/tests/conftest.py realestate-lab/tests/test_db_basic.py git commit -m "test(realestate): add pytest harness with isolated SQLite fixture" ``` --- ## Task 2: realestate-lab DB 스키마 마이그레이션 `user_profile`에 3 컬럼, `announcements`에 `district`, `match_results`에 `notified_at` 추가. `init_db()` 안에서 try/except 패턴으로 운영 DB 무중단 마이그레이션. **Files:** - Modify: `realestate-lab/app/db.py` (init_db 내부 ALTER 추가, _profile_row_to_dict 확장, PROFILE_COLUMNS 확장) - Test: `realestate-lab/tests/test_db_migration.py` - [ ] **Step 1: 마이그레이션 단위 테스트 작성 (실패 예상)** `realestate-lab/tests/test_db_migration.py`: ```python def test_user_profile_has_new_columns(): from app.db import _conn with _conn() as conn: cols = {row["name"] for row in conn.execute("PRAGMA table_info(user_profile)")} assert "preferred_districts" in cols assert "min_match_score" in cols assert "notify_enabled" in cols def test_announcements_has_district(): from app.db import _conn with _conn() as conn: cols = {row["name"] for row in conn.execute("PRAGMA table_info(announcements)")} assert "district" in cols def test_match_results_has_notified_at(): from app.db import _conn with _conn() as conn: cols = {row["name"] for row in conn.execute("PRAGMA table_info(match_results)")} assert "notified_at" in cols def test_district_index_exists(): from app.db import _conn with _conn() as conn: idx = {row["name"] for row in conn.execute( "SELECT name FROM sqlite_master WHERE type='index'" )} assert "idx_ann_district" in idx def test_profile_defaults(): from app.db import upsert_profile, get_profile upsert_profile({"name": "테스트"}) profile = get_profile() assert profile["preferred_districts"] == {} assert profile["min_match_score"] == 70 assert profile["notify_enabled"] is True ``` - [ ] **Step 2: 테스트 실행 — 실패 확인** Run: `cd realestate-lab && python -m pytest tests/test_db_migration.py -v` Expected: 5 failed (컬럼 없음, default 미적용) - [ ] **Step 3: announcements 테이블에 district 컬럼 + 인덱스 마이그레이션** `realestate-lab/app/db.py` 의 `init_db()` 안, `idx_ann_region` 인덱스 생성 다음 줄에 추가. 찾을 위치 (현재 코드): ```python conn.execute("CREATE INDEX IF NOT EXISTS idx_ann_region ON announcements(region_name);") # ── 마이그레이션: is_bookmarked 컬럼 추가 ── ``` 다음과 같이 수정: ```python conn.execute("CREATE INDEX IF NOT EXISTS idx_ann_region ON announcements(region_name);") # ── 마이그레이션: district 컬럼 + 인덱스 추가 ── try: conn.execute("SELECT district FROM announcements LIMIT 1") except Exception: conn.execute("ALTER TABLE announcements ADD COLUMN district TEXT") conn.execute("CREATE INDEX IF NOT EXISTS idx_ann_district ON announcements(district);") # ── 마이그레이션: is_bookmarked 컬럼 추가 ── ``` - [ ] **Step 4: user_profile에 3 컬럼 마이그레이션** `init_db()`의 user_profile CREATE 문 뒤에 추가. 현재 `match_results` CREATE 문 시작 부분 직전에 끼워 넣음. 찾을 위치: ```python ); """) # ── match_results ──────────────────────────────────────────────── ``` (이 직전이 user_profile CREATE 종료점) 다음과 같이 추가: ```python ); """) # ── 마이그레이션: user_profile 신규 3컬럼 ── for col, ddl in ( ("preferred_districts", "ALTER TABLE user_profile ADD COLUMN preferred_districts TEXT NOT NULL DEFAULT '{}'"), ("min_match_score", "ALTER TABLE user_profile ADD COLUMN min_match_score INTEGER NOT NULL DEFAULT 70"), ("notify_enabled", "ALTER TABLE user_profile ADD COLUMN notify_enabled INTEGER NOT NULL DEFAULT 1"), ): try: conn.execute(f"SELECT {col} FROM user_profile LIMIT 1") except Exception: conn.execute(ddl) # ── match_results ──────────────────────────────────────────────── ``` - [ ] **Step 5: match_results에 notified_at 컬럼 마이그레이션** `init_db()`의 match_results CREATE 직후에 추가. 찾을 위치: ```python UNIQUE(announcement_id, model_id) ); """) # ── collect_log ────────────────────────────────────────────────── ``` 다음과 같이 변경: ```python UNIQUE(announcement_id, model_id) ); """) # ── 마이그레이션: notified_at 컬럼 추가 ── try: conn.execute("SELECT notified_at FROM match_results LIMIT 1") except Exception: conn.execute("ALTER TABLE match_results ADD COLUMN notified_at TEXT") # ── collect_log ────────────────────────────────────────────────── ``` - [ ] **Step 6: PROFILE_COLUMNS와 _profile_row_to_dict, upsert_profile에 신규 필드 처리** `realestate-lab/app/db.py`의 `_profile_row_to_dict` 함수 (현재 파일 라인 ~536)를 다음과 같이 수정: ```python def _profile_row_to_dict(r) -> Dict[str, Any]: d = {} for c in r.keys(): val = r[c] if c in ("is_homeless", "is_householder", "has_dependents", "is_newlywed", "has_newborn", "is_first_home", "notify_enabled"): d[c] = bool(val) if val is not None else None elif c in ("preferred_regions", "preferred_types"): d[c] = json.loads(val) if val else [] elif c == "preferred_districts": d[c] = json.loads(val) if val else {} else: d[c] = val return d ``` `PROFILE_COLUMNS` 상수 (현재 라인 ~560)를 다음과 같이 확장: ```python PROFILE_COLUMNS = { "name", "age", "is_homeless", "is_householder", "subscription_months", "subscription_amount", "family_members", "has_dependents", "children_count", "is_newlywed", "marriage_months", "has_newborn", "is_first_home", "income_level", "preferred_regions", "preferred_types", "preferred_districts", "min_area", "max_area", "max_price", "min_match_score", "notify_enabled", } ``` `upsert_profile` 함수 안에서 list 처리 분기는 이미 있으므로, dict 분기를 추가해야 한다. 현재 코드: ```python if isinstance(v, bool): updates[k] = 1 if v else 0 elif isinstance(v, list): updates[k] = json.dumps(v) else: updates[k] = v ``` 다음과 같이 수정: ```python if isinstance(v, bool): updates[k] = 1 if v else 0 elif isinstance(v, (list, dict)): updates[k] = json.dumps(v) else: updates[k] = v ``` - [ ] **Step 7: 테스트 실행 — 통과 확인** Run: `cd realestate-lab && python -m pytest tests/test_db_migration.py tests/test_db_basic.py -v` Expected: 6 passed - [ ] **Step 8: 커밋** ```bash git add realestate-lab/app/db.py realestate-lab/tests/test_db_migration.py git commit -m "feat(realestate-db): add district / notify / 5tier columns with migration" ``` --- ## Task 3: realestate-lab DB 신규 함수 3종 `delete_old_completed_announcements`, `get_unnotified_matches`, `mark_matches_notified`. **Files:** - Modify: `realestate-lab/app/db.py` (함수 3종 추가) - Test: `realestate-lab/tests/test_db_functions.py` - [ ] **Step 1: 테스트 작성 — delete_old_completed_announcements** `realestate-lab/tests/test_db_functions.py`: ```python import json from datetime import date, timedelta from app.db import _conn def _seed_announcement(house_nm, status, winner_date=None, hmno="HM1", pno="P1"): with _conn() as conn: conn.execute(""" INSERT INTO announcements (house_manage_no, pblanc_no, house_nm, status, winner_date, source) VALUES (?, ?, ?, ?, ?, 'manual') """, (hmno, pno, house_nm, status, winner_date)) return conn.execute("SELECT id FROM announcements WHERE house_manage_no=?", (hmno,)).fetchone()["id"] def test_delete_old_completed_removes_expired(): from app.db import delete_old_completed_announcements old = (date.today() - timedelta(days=100)).isoformat() _seed_announcement("OldA", "완료", old, hmno="OLD", pno="1") deleted = delete_old_completed_announcements(grace_days=90) assert deleted == 1 def test_delete_old_completed_keeps_recent(): from app.db import delete_old_completed_announcements recent = (date.today() - timedelta(days=30)).isoformat() _seed_announcement("RecentA", "완료", recent, hmno="REC", pno="1") deleted = delete_old_completed_announcements(grace_days=90) assert deleted == 0 def test_delete_old_completed_keeps_active(): from app.db import delete_old_completed_announcements old = (date.today() - timedelta(days=200)).isoformat() _seed_announcement("ActiveA", "청약중", old, hmno="ACT", pno="1") deleted = delete_old_completed_announcements(grace_days=90) assert deleted == 0 def test_delete_old_completed_keeps_null_winner_date(): from app.db import delete_old_completed_announcements _seed_announcement("NullA", "완료", None, hmno="NULL", pno="1") deleted = delete_old_completed_announcements(grace_days=90) assert deleted == 0 # winner_date NULL은 안전 보존 def test_get_unnotified_matches_filters_by_score_and_null(): from app.db import get_unnotified_matches aid = _seed_announcement("MatchA", "청약중", hmno="MA", pno="1") with _conn() as conn: # 임계값 미만 conn.execute(""" INSERT INTO match_results (announcement_id, model_id, match_score, match_reasons, eligible_types, is_new) VALUES (?, NULL, 50, '[]', '[]', 1) """, (aid,)) # 임계값 통과 — 미알림 conn.execute(""" INSERT INTO match_results (announcement_id, model_id, match_score, match_reasons, eligible_types, is_new) VALUES (?, 1, 80, '[]', '[]', 1) """, (aid,)) # 임계값 통과 — 이미 알림됨 conn.execute(""" INSERT INTO match_results (announcement_id, model_id, match_score, match_reasons, eligible_types, is_new, notified_at) VALUES (?, 2, 90, '[]', '[]', 1, '2026-04-01T00:00:00.000Z') """, (aid,)) matches = get_unnotified_matches(min_score=70) assert len(matches) == 1 assert matches[0]["match_score"] == 80 assert matches[0]["house_nm"] == "MatchA" def test_mark_matches_notified_sets_timestamp(): from app.db import mark_matches_notified aid = _seed_announcement("NotifyA", "청약중", hmno="NT", pno="1") with _conn() as conn: cur = conn.execute(""" INSERT INTO match_results (announcement_id, model_id, match_score, match_reasons, eligible_types, is_new) VALUES (?, NULL, 80, '[]', '[]', 1) """, (aid,)) match_id = cur.lastrowid mark_matches_notified([match_id]) with _conn() as conn: row = conn.execute("SELECT notified_at FROM match_results WHERE id = ?", (match_id,)).fetchone() assert row["notified_at"] is not None ``` - [ ] **Step 2: 테스트 실행 — 실패 확인** Run: `cd realestate-lab && python -m pytest tests/test_db_functions.py -v` Expected: 6 failed (함수 미정의) - [ ] **Step 3: db.py에 함수 3종 추가** `realestate-lab/app/db.py` 파일의 `delete_closed_announcements` 함수 다음에 추가 (현재 라인 ~408 인근). 찾을 위치: ```python def delete_closed_announcements() -> int: """status='완료' 공고 일괄 삭제. 삭제된 건수 반환.""" with _conn() as conn: cur = conn.execute("DELETE FROM announcements WHERE status = '완료'") return cur.rowcount ``` 직후에 추가: ```python def delete_old_completed_announcements(grace_days: int = 90) -> int: """winner_date + grace_days 경과한 status='완료' 공고를 삭제. winner_date가 NULL인 행은 안전하게 보존(수동 검토 대상). match_results는 FK CASCADE로 자동 삭제. 삭제된 건수 반환. """ with _conn() as conn: cur = conn.execute( """ DELETE FROM announcements WHERE status = '완료' AND winner_date IS NOT NULL AND date(winner_date) < date('now', ?) """, (f"-{grace_days} days",), ) return cur.rowcount ``` 같은 파일의 `mark_match_read` 함수 다음에 추가 (현재 라인 ~660 인근). 찾을 위치: ```python def mark_match_read(match_id: int) -> bool: with _conn() as conn: cur = conn.execute("UPDATE match_results SET is_new = 0 WHERE id = ?", (match_id,)) return cur.rowcount > 0 ``` 직후에 추가: ```python def get_unnotified_matches(min_score: int) -> List[Dict[str, Any]]: """notified_at IS NULL AND match_score >= min_score 인 매칭과 공고 정보 조인 반환.""" with _conn() as conn: rows = conn.execute(""" SELECT m.id, m.announcement_id, m.match_score, m.match_reasons, m.eligible_types, a.house_nm, a.region_name, a.district, a.address, a.receipt_start, a.receipt_end, a.winner_date, a.house_secd, a.is_speculative_area, a.is_price_cap, a.pblanc_url FROM match_results m JOIN announcements a ON a.id = m.announcement_id WHERE m.notified_at IS NULL AND m.match_score >= ? ORDER BY m.match_score DESC """, (min_score,)).fetchall() items = [] for r in rows: d = {c: r[c] for c in r.keys()} d["match_reasons"] = json.loads(d["match_reasons"]) if d["match_reasons"] else [] d["eligible_types"] = json.loads(d["eligible_types"]) if d["eligible_types"] else [] items.append(d) return items def mark_matches_notified(match_ids: List[int]) -> None: """주어진 match_results IDs의 notified_at을 현재 시각으로 일괄 업데이트.""" if not match_ids: return placeholders = ",".join("?" for _ in match_ids) with _conn() as conn: conn.execute( f"UPDATE match_results SET notified_at = strftime('%Y-%m-%dT%H:%M:%fZ','now') " f"WHERE id IN ({placeholders})", match_ids, ) ``` - [ ] **Step 4: 테스트 실행 — 통과 확인** Run: `cd realestate-lab && python -m pytest tests/test_db_functions.py -v` Expected: 6 passed - [ ] **Step 5: 커밋** ```bash git add realestate-lab/app/db.py realestate-lab/tests/test_db_functions.py git commit -m "feat(realestate-db): add notify queue + 90-day grace cleanup" ``` --- ## Task 4: realestate-lab collector 변경 `_extract_district` 추가, 모집공고일 윈도우 사전 좁힘, `완료` 상태 skip. **Files:** - Modify: `realestate-lab/app/collector.py` - Test: `realestate-lab/tests/test_collector.py` - [ ] **Step 1: _extract_district 단위 테스트 작성** `realestate-lab/tests/test_collector.py`: ```python def test_extract_district_seoul_full_address(): from app.collector import _extract_district parsed = {"address": "서울특별시 강남구 도곡동 123-45", "region_name": None} assert _extract_district(parsed) == "강남구" def test_extract_district_seoul_short(): from app.collector import _extract_district parsed = {"address": None, "region_name": "서울 송파구"} assert _extract_district(parsed) == "송파구" def test_extract_district_busan_returns_none(): from app.collector import _extract_district parsed = {"address": "부산광역시 해운대구 우동", "region_name": None} assert _extract_district(parsed) is None def test_extract_district_empty_returns_none(): from app.collector import _extract_district parsed = {"address": "", "region_name": ""} assert _extract_district(parsed) is None def test_extract_district_seoul_county(): from app.collector import _extract_district parsed = {"address": "서울 강서구", "region_name": None} assert _extract_district(parsed) == "강서구" def test_extract_district_prefers_address_over_region(): from app.collector import _extract_district parsed = {"address": "서울특별시 마포구 합정동", "region_name": "서울 강남구"} assert _extract_district(parsed) == "마포구" ``` - [ ] **Step 2: 테스트 실행 — 실패 확인** Run: `cd realestate-lab && python -m pytest tests/test_collector.py -v` Expected: 6 failed (`_extract_district` 미정의) - [ ] **Step 3: collector.py에 _extract_district 추가** `realestate-lab/app/collector.py`의 import 블록 변경: ```python import os import re import logging from datetime import date, timedelta import requests from typing import List, Dict, Any ``` (`re`, `date`, `timedelta` 추가) 기존 `_parse_apt_detail` 함수 위에 정규식 + 헬퍼 함수 추가: ```python DISTRICT_PATTERN = re.compile(r"(?:서울특별시|서울시|서울)\s+(\S+?(?:구|군))") def _extract_district(parsed: Dict[str, Any]) -> str | None: """파싱된 공고에서 자치구를 추출. 서울 외 지역·실패 시 None.""" for src in (parsed.get("address"), parsed.get("region_name")): if not src: continue m = DISTRICT_PATTERN.search(src) if m: return m.group(1) return None ``` - [ ] **Step 4: 테스트 실행 — 통과 확인** Run: `cd realestate-lab && python -m pytest tests/test_collector.py -v` Expected: 6 passed - [ ] **Step 5: collect_all 통합 테스트 추가 — 완료 skip + 윈도우** `realestate-lab/tests/test_collector.py` 파일 끝에 추가: ```python from datetime import date, timedelta from unittest.mock import patch def test_collect_skips_completed_status(monkeypatch): """winner_date가 과거인 응답은 status='완료'로 판정되어 upsert되지 않는다.""" from app import collector from app.db import _conn monkeypatch.setenv("DATA_GO_KR_API_KEY", "TEST") # 모듈 상수도 갱신 monkeypatch.setattr(collector, "API_KEY", "TEST") past_winner = (date.today() - timedelta(days=10)).strftime("%Y-%m-%d") fake_detail_rows = [{ "HOUSE_MANAGE_NO": "DONE-1", "PBLANC_NO": "01", "HOUSE_NM": "완료된단지", "HSSPLY_ADRES": "서울특별시 강남구", "RCEPT_BGNDE": "2026-01-01", "RCEPT_ENDDE": "2026-01-05", "PRZWNER_PRESNATN_DE": past_winner, }] def fake_call(endpoint, params=None): if "Detail" in endpoint: return fake_detail_rows return [] monkeypatch.setattr(collector, "_api_call", fake_call) collector.collect_all() with _conn() as conn: rows = conn.execute("SELECT * FROM announcements WHERE house_manage_no='DONE-1'").fetchall() assert len(rows) == 0 def test_collect_stores_district_for_seoul_announcement(monkeypatch): from app import collector from app.db import _conn monkeypatch.setenv("DATA_GO_KR_API_KEY", "TEST") monkeypatch.setattr(collector, "API_KEY", "TEST") future_start = (date.today() + timedelta(days=10)).strftime("%Y-%m-%d") future_end = (date.today() + timedelta(days=15)).strftime("%Y-%m-%d") future_winner = (date.today() + timedelta(days=30)).strftime("%Y-%m-%d") fake_detail = [{ "HOUSE_MANAGE_NO": "SEOUL-1", "PBLANC_NO": "01", "HOUSE_NM": "강남단지", "HSSPLY_ADRES": "서울특별시 강남구 도곡동 1", "RCEPT_BGNDE": future_start, "RCEPT_ENDDE": future_end, "PRZWNER_PRESNATN_DE": future_winner, }] def fake_call(endpoint, params=None): if "Detail" in endpoint: return fake_detail return [] monkeypatch.setattr(collector, "_api_call", fake_call) collector.collect_all() with _conn() as conn: row = conn.execute("SELECT district, status FROM announcements WHERE house_manage_no='SEOUL-1'").fetchone() assert row["district"] == "강남구" assert row["status"] in ("청약예정", "청약중") def test_collect_passes_date_window_param(monkeypatch): from app import collector monkeypatch.setenv("DATA_GO_KR_API_KEY", "TEST") monkeypatch.setattr(collector, "API_KEY", "TEST") captured_params = [] def fake_call(endpoint, params=None): captured_params.append(params or {}) return [] monkeypatch.setattr(collector, "_api_call", fake_call) collector.collect_all() expected_from = (date.today() - timedelta(days=30)).strftime("%Y%m%d") detail_calls = [p for p in captured_params if "RCRIT_PBLANC_DE_FROM" in p] assert detail_calls, "detail 엔드포인트 호출에 윈도우 파라미터가 없음" assert detail_calls[0]["RCRIT_PBLANC_DE_FROM"] == expected_from ``` - [ ] **Step 6: 통합 테스트 실행 — 실패 확인** Run: `cd realestate-lab && python -m pytest tests/test_collector.py -v -k "test_collect"` Expected: 3 failed (윈도우/완료/district 미적용) - [ ] **Step 7: collect_all 본문 변경 — 윈도우 + skip + district** `realestate-lab/app/collector.py`의 `collect_all` 함수를 다음과 같이 수정: 기존: ```python def collect_all() -> Dict[str, Any]: """모든 엔드포인트를 순회하며 공고 + 모델 데이터를 수집·저장한다.""" if not API_KEY: logger.warning("API 키 미설정 — 수집 중단") save_collect_log(0, 0, "API 키 미설정") return {"new_count": 0, "total_count": 0} total_count = 0 new_count = 0 for detail_ep, model_ep in DETAIL_ENDPOINTS: # 공고 상세 수집 detail_rows = _api_call(detail_ep) for raw in detail_rows: try: parsed = _parse_apt_detail(raw) # 일정 정보가 하나도 없는 공고는 건너뜀 has_dates = any(parsed.get(f) for f in ( "receipt_start", "receipt_end", "spsply_start", "gnrl_rank1_start", "winner_date", "contract_start", )) if not has_dates: continue _, is_new = upsert_announcement(parsed) total_count += 1 if is_new: new_count += 1 except Exception as e: logger.error("공고 upsert 실패 [%s]: %s", detail_ep, e) ``` 변경: ```python def collect_all() -> Dict[str, Any]: """모든 엔드포인트를 순회하며 공고 + 모델 데이터를 수집·저장한다. 모집공고일 30일 이전 데이터는 API 파라미터로 사전 좁힘. status='완료'로 판정되는 응답은 저장하지 않음. """ if not API_KEY: logger.warning("API 키 미설정 — 수집 중단") save_collect_log(0, 0, "API 키 미설정") return {"new_count": 0, "total_count": 0} today = date.today() date_from = (today - timedelta(days=30)).strftime("%Y%m%d") total_count = 0 new_count = 0 skipped_completed = 0 for detail_ep, model_ep in DETAIL_ENDPOINTS: # 공고 상세 수집 — API에 모집공고일 윈도우 파라미터 전달 # 일부 엔드포인트는 파라미터 미지원일 수 있어 무시되지만 응답에 영향 없음 detail_rows = _api_call(detail_ep, params={"RCRIT_PBLANC_DE_FROM": date_from}) for raw in detail_rows: try: parsed = _parse_apt_detail(raw) parsed["district"] = _extract_district(parsed) # 일정 정보가 하나도 없는 공고는 건너뜀 (기존) has_dates = any(parsed.get(f) for f in ( "receipt_start", "receipt_end", "spsply_start", "gnrl_rank1_start", "winner_date", "contract_start", )) if not has_dates: continue # status='완료'면 저장하지 않음 (자원 절감) from .db import compute_status status = compute_status( parsed.get("receipt_start", "") or "", parsed.get("receipt_end", "") or "", parsed.get("winner_date", "") or "", ) if status == "완료": skipped_completed += 1 continue _, is_new = upsert_announcement(parsed) total_count += 1 if is_new: new_count += 1 except Exception as e: logger.error("공고 upsert 실패 [%s]: %s", detail_ep, e) ``` - [ ] **Step 8: upsert_announcement에 district 컬럼 처리 — db.py 변경** `realestate-lab/app/db.py`의 `upsert_announcement` 함수의 INSERT 컬럼 목록과 ON CONFLICT 절에 `district`를 추가. 찾을 위치 (`INSERT INTO announcements` 부분): ```python conn.execute(""" INSERT INTO announcements ( house_manage_no, pblanc_no, house_nm, house_secd, house_dtl_secd, rent_secd, region_code, region_name, address, total_units, ``` 다음과 같이 변경: ```python conn.execute(""" INSERT INTO announcements ( house_manage_no, pblanc_no, house_nm, house_secd, house_dtl_secd, rent_secd, region_code, region_name, district, address, total_units, ``` VALUES 블록도 변경: 기존: ```python ) VALUES ( :house_manage_no, :pblanc_no, :house_nm, :house_secd, :house_dtl_secd, :rent_secd, :region_code, :region_name, :address, :total_units, ``` 변경: ```python ) VALUES ( :house_manage_no, :pblanc_no, :house_nm, :house_secd, :house_dtl_secd, :rent_secd, :region_code, :region_name, :district, :address, :total_units, ``` ON CONFLICT 절에도 추가. 찾을 위치: ```python region_code=excluded.region_code, region_name=excluded.region_name, address=excluded.address, ``` 다음과 같이 변경: ```python region_code=excluded.region_code, region_name=excluded.region_name, district=excluded.district, address=excluded.address, ``` 마지막으로, `upsert_announcement` 함수 시작부에 district 기본값 보정 추가. 함수 첫 줄 직후: ```python def upsert_announcement(data: Dict[str, Any]) -> tuple: """공고 upsert — house_manage_no + pblanc_no 기준. Returns (dict, is_new: bool).""" data.setdefault("district", None) # 수동 등록 등에서 누락 시 안전 처리 status = compute_status( ... ``` `ANNOUNCEMENT_COLUMNS` 상수에도 추가: ```python ANNOUNCEMENT_COLUMNS = { "house_nm", "house_secd", "house_dtl_secd", "rent_secd", "region_code", "region_name", "district", "address", "total_units", ... } ``` - [ ] **Step 9: 테스트 실행 — 통과 확인** Run: `cd realestate-lab && python -m pytest tests/test_collector.py -v` Expected: 9 passed - [ ] **Step 10: 커밋** ```bash git add realestate-lab/app/collector.py realestate-lab/app/db.py realestate-lab/tests/test_collector.py git commit -m "feat(realestate-collector): 30-day window + district extraction + completed skip" ``` --- ## Task 5: realestate-lab matcher 5티어 + 자격 점수 재배분 지역 35점(광역 10 + 자치구 가중 25), 자격 25점(첫 자격 15 + 추가 5씩 최대 +10). **Files:** - Modify: `realestate-lab/app/matcher.py` - Test: `realestate-lab/tests/test_matcher.py` - [ ] **Step 1: 지역·자격 점수 단위 테스트 작성** `realestate-lab/tests/test_matcher.py`: ```python def test_region_score_no_districts_full_when_region_match(): """자치구 미설정: 광역 일치 시 35점.""" from app.matcher import _region_score profile = {"preferred_regions": ["서울"], "preferred_districts": {}} ann = {"region_name": "서울특별시", "district": None} score, _ = _region_score(profile, ann) assert score == 35 def test_region_score_no_districts_zero_when_region_mismatch(): from app.matcher import _region_score profile = {"preferred_regions": ["서울"], "preferred_districts": {}} ann = {"region_name": "부산광역시", "district": None} score, _ = _region_score(profile, ann) assert score == 0 def test_region_score_s_tier_district(): """광역 매칭 + S티어 자치구: 10 + 25 = 35.""" from app.matcher import _region_score profile = { "preferred_regions": ["서울"], "preferred_districts": {"S": ["강남구"], "A": [], "B": [], "C": [], "D": []}, } ann = {"region_name": "서울특별시", "district": "강남구"} score, _ = _region_score(profile, ann) assert score == 35 def test_region_score_a_tier_district(): """광역 매칭 + A티어 자치구: 10 + 20 = 30.""" from app.matcher import _region_score profile = { "preferred_regions": ["서울"], "preferred_districts": {"S": [], "A": ["송파구"], "B": [], "C": [], "D": []}, } ann = {"region_name": "서울특별시", "district": "송파구"} score, _ = _region_score(profile, ann) assert score == 30 def test_region_score_d_tier_district(): """광역 매칭 + D티어 자치구: 10 + 5 = 15.""" from app.matcher import _region_score profile = { "preferred_regions": ["서울"], "preferred_districts": {"S": [], "A": [], "B": [], "C": [], "D": ["도봉구"]}, } ann = {"region_name": "서울특별시", "district": "도봉구"} score, _ = _region_score(profile, ann) assert score == 15 def test_region_score_district_set_but_not_listed(): """광역 매칭 + 자치구 5티어 어디에도 없음: 10점만.""" from app.matcher import _region_score profile = { "preferred_regions": ["서울"], "preferred_districts": {"S": ["강남구"], "A": [], "B": [], "C": [], "D": []}, } ann = {"region_name": "서울특별시", "district": "강서구"} score, _ = _region_score(profile, ann) assert score == 10 def test_eligibility_score_zero_when_empty(): from app.matcher import _eligibility_score assert _eligibility_score([]) == 0 def test_eligibility_score_one_type_returns_15(): from app.matcher import _eligibility_score assert _eligibility_score(["일반1순위"]) == 15 def test_eligibility_score_two_types_returns_20(): from app.matcher import _eligibility_score assert _eligibility_score(["일반1순위", "특별-신혼부부"]) == 20 def test_eligibility_score_caps_at_25(): from app.matcher import _eligibility_score assert _eligibility_score(["a", "b", "c", "d", "e"]) == 25 ``` - [ ] **Step 2: 테스트 실행 — 실패 확인** Run: `cd realestate-lab && python -m pytest tests/test_matcher.py -v` Expected: 10 failed - [ ] **Step 3: matcher.py에 신규 함수 추가** `realestate-lab/app/matcher.py` 파일 상단 (logger 정의 후, _HOUSE_TYPE_MAP 위)에 추가: ```python TIER_WEIGHTS = {"S": 1.00, "A": 0.80, "B": 0.60, "C": 0.40, "D": 0.20} def _region_score(profile: Dict[str, Any], ann: Dict[str, Any]) -> tuple[int, list[str]]: """지역 점수 계산. 광역 10점 + 자치구 5티어 가중치 0~25점. 자치구 기준 미설정 시 광역 매칭만으로 35점 풀 점수(기존 호환). """ region_name = ann.get("region_name") or "" district = ann.get("district") or "" preferred_regions = profile.get("preferred_regions") or [] preferred_districts = profile.get("preferred_districts") or {} region_match = bool(region_name and any(r in region_name for r in preferred_regions)) if not region_match: return 0, [] has_districts = any(preferred_districts.get(t) for t in TIER_WEIGHTS) if not has_districts: return 35, [f"선호 지역 일치: {region_name}"] score = 10 reasons = [f"광역 일치: {region_name}"] for tier, weight in TIER_WEIGHTS.items(): if district and district in (preferred_districts.get(tier) or []): tier_score = round(25 * weight) score += tier_score reasons.append(f"자치구 {tier}티어: {district} (+{tier_score})") break return score, reasons def _eligibility_score(eligible_types: List[str]) -> int: """자격 점수 0~25. 첫 자격 15점 + 추가 자격당 5점, 최대 +10.""" if not eligible_types: return 0 return 15 + min((len(eligible_types) - 1) * 5, 10) ``` - [ ] **Step 4: _compute_score 본문 교체 — 새 함수 호출 통합** `realestate-lab/app/matcher.py`의 `_compute_score` 함수 본문을 다음과 같이 교체: ```python def _compute_score( profile: Dict[str, Any], ann: Dict[str, Any], models: List[Dict[str, Any]], ) -> Dict[str, Any]: """매칭 점수(0-100)와 사유를 계산한다. 배분: 지역 35 / 유형 10 / 면적 15 / 가격 15 / 자격 25. """ score = 0 reasons: List[str] = [] # 1. 지역 (35점) — 광역 + 자치구 5티어 region_score, region_reasons = _region_score(profile, ann) score += region_score reasons.extend(region_reasons) # 2. 주택유형 (10점) — binary preferred_types = profile.get("preferred_types") or [] house_secd = ann.get("house_secd") or "" type_name = _HOUSE_TYPE_MAP.get(house_secd, house_secd) if type_name and type_name in preferred_types: score += 10 reasons.append(f"선호 유형 일치: {type_name}") # 3. 면적 (15점) — binary, 범위 안 모델 1개라도 있으면 통과 min_area = profile.get("min_area") max_area = profile.get("max_area") if min_area is not None and max_area is not None and models: for m in models: supply_area = m.get("supply_area") if supply_area is not None and min_area <= supply_area <= max_area: score += 15 reasons.append(f"희망 면적 범위 내 모델 존재 ({supply_area}㎡)") break # 4. 가격 (15점) — binary, 예산 이하 모델 1개라도 있으면 통과 max_price = profile.get("max_price") if max_price is not None and models: for m in models: top_amount = m.get("top_amount") if top_amount is not None and top_amount <= max_price: score += 15 reasons.append(f"예산 범위 내 모델 존재 (최고가 {top_amount:,}만원)") break # 5. 자격 (25점) — 첫 자격 15 + 추가당 5 eligible_types = _check_eligible_types(profile, ann) elig_score = _eligibility_score(eligible_types) if elig_score > 0: score += elig_score reasons.append(f"자격 유형 {len(eligible_types)}개: {', '.join(eligible_types)}") return { "match_score": score, "match_reasons": reasons, "eligible_types": eligible_types, } ``` - [ ] **Step 5: 테스트 실행 — 통과 확인** Run: `cd realestate-lab && python -m pytest tests/test_matcher.py -v` Expected: 10 passed - [ ] **Step 6: 커밋** ```bash git add realestate-lab/app/matcher.py realestate-lab/tests/test_matcher.py git commit -m "feat(realestate-matcher): 5-tier district weighting + eligibility curve" ``` --- ## Task 6: realestate-lab Profile API 확장 `ProfileUpdate` Pydantic 모델에 3 필드 추가. `models.py`만 수정하면 main.py 흐름은 자동 반영(이미 PROFILE_COLUMNS 기반). **Files:** - Modify: `realestate-lab/app/models.py` - Test: `realestate-lab/tests/test_profile_api.py` - [ ] **Step 1: API 통합 테스트 작성** `realestate-lab/tests/test_profile_api.py`: ```python from fastapi.testclient import TestClient def test_profile_update_accepts_new_fields(): from app.main import app client = TestClient(app) body = { "name": "테스트", "preferred_districts": { "S": ["강남구", "서초구"], "A": ["송파구"], "B": [], "C": [], "D": [], }, "min_match_score": 75, "notify_enabled": True, } resp = client.put("/api/realestate/profile", json=body) assert resp.status_code == 200 data = resp.json() assert data["preferred_districts"]["S"] == ["강남구", "서초구"] assert data["min_match_score"] == 75 assert data["notify_enabled"] is True def test_profile_get_returns_defaults_for_new_fields(): from app.main import app from app.db import upsert_profile upsert_profile({"name": "기본"}) client = TestClient(app) resp = client.get("/api/realestate/profile") assert resp.status_code == 200 data = resp.json() assert data["preferred_districts"] == {} assert data["min_match_score"] == 70 assert data["notify_enabled"] is True ``` - [ ] **Step 2: 테스트 실행 — 실패 확인** Run: `cd realestate-lab && python -m pytest tests/test_profile_api.py -v` Expected: 2 failed (모델에 필드 없음 → 422 또는 dict 직렬화 실패) - [ ] **Step 3: ProfileUpdate 모델에 필드 추가** `realestate-lab/app/models.py` 파일의 import 변경: ```python from typing import Optional, List, Dict from pydantic import BaseModel, Field ``` `ProfileUpdate` 클래스 끝에 필드 추가: ```python class ProfileUpdate(BaseModel): # ... 기존 필드 그대로 name: Optional[str] = None age: Optional[int] = None is_homeless: Optional[bool] = None is_householder: Optional[bool] = None subscription_months: Optional[int] = None subscription_amount: Optional[int] = None family_members: Optional[int] = None has_dependents: Optional[bool] = None children_count: Optional[int] = None is_newlywed: Optional[bool] = None marriage_months: Optional[int] = None has_newborn: Optional[bool] = None is_first_home: Optional[bool] = None income_level: Optional[str] = None preferred_regions: Optional[List[str]] = None preferred_types: Optional[List[str]] = None min_area: Optional[float] = None max_area: Optional[float] = None max_price: Optional[int] = None # 신규 preferred_districts: Optional[Dict[str, List[str]]] = None min_match_score: Optional[int] = Field(default=None, ge=0, le=100) notify_enabled: Optional[bool] = None ``` - [ ] **Step 4: 테스트 실행 — 통과 확인** Run: `cd realestate-lab && python -m pytest tests/test_profile_api.py -v` Expected: 2 passed - [ ] **Step 5: 커밋** ```bash git add realestate-lab/app/models.py realestate-lab/tests/test_profile_api.py git commit -m "feat(realestate-profile): expose 5tier districts + min_match_score + notify_enabled" ``` --- ## Task 7: realestate-lab notifier.py 신규 agent-office로 push 트리거. 임계값 + notify_enabled 필터 + 멱등 마킹. **Files:** - Create: `realestate-lab/app/notifier.py` - Create: `realestate-lab/tests/test_notifier.py` - [ ] **Step 1: notifier 단위 테스트 작성** `realestate-lab/tests/test_notifier.py`: ```python from unittest.mock import patch, MagicMock def _seed_profile_and_match(score, notify_enabled=True, threshold=70): from app.db import _conn, upsert_profile upsert_profile({ "name": "u", "notify_enabled": notify_enabled, "min_match_score": threshold, }) with _conn() as conn: conn.execute(""" INSERT INTO announcements (house_manage_no, pblanc_no, house_nm, status, source) VALUES ('NF1', '01', '단지', '청약중', 'manual') """) ann_id = conn.execute("SELECT id FROM announcements WHERE house_manage_no='NF1'").fetchone()["id"] conn.execute(""" INSERT INTO match_results (announcement_id, model_id, match_score, match_reasons, eligible_types, is_new) VALUES (?, NULL, ?, '[]', '[]', 1) """, (ann_id, score)) match_id = conn.execute("SELECT id FROM match_results WHERE announcement_id=?", (ann_id,)).fetchone()["id"] return match_id def test_notify_skips_when_disabled(): from app import notifier _seed_profile_and_match(score=80, notify_enabled=False) with patch.object(notifier, "requests") as r: result = notifier.notify_new_matches() assert r.post.call_count == 0 assert result["sent"] == 0 assert result.get("skipped") == "notify_disabled" def test_notify_filters_below_threshold(): from app import notifier _seed_profile_and_match(score=60, threshold=70) with patch.object(notifier, "requests") as r: result = notifier.notify_new_matches() assert r.post.call_count == 0 assert result["sent"] == 0 def test_notify_pushes_and_marks_notified(): from app import notifier from app.db import _conn match_id = _seed_profile_and_match(score=80, threshold=70) fake_resp = MagicMock() fake_resp.json.return_value = {"sent": 1, "sent_ids": [match_id]} fake_resp.raise_for_status.return_value = None with patch.object(notifier.requests, "post", return_value=fake_resp) as post: result = notifier.notify_new_matches() assert post.call_count == 1 args, kwargs = post.call_args assert "/api/agent-office/realestate/notify" in args[0] assert kwargs["json"]["matches"][0]["id"] == match_id with _conn() as conn: row = conn.execute("SELECT notified_at FROM match_results WHERE id=?", (match_id,)).fetchone() assert row["notified_at"] is not None assert result["sent"] == 1 def test_notify_does_not_mark_on_failure(): from app import notifier from app.db import _conn import requests as real_requests match_id = _seed_profile_and_match(score=80, threshold=70) def boom(*a, **k): raise real_requests.RequestException("agent-office down") with patch.object(notifier.requests, "post", side_effect=boom): result = notifier.notify_new_matches() with _conn() as conn: row = conn.execute("SELECT notified_at FROM match_results WHERE id=?", (match_id,)).fetchone() assert row["notified_at"] is None assert result["sent"] == 0 assert "error" in result ``` - [ ] **Step 2: 테스트 실행 — 실패 확인** Run: `cd realestate-lab && python -m pytest tests/test_notifier.py -v` Expected: 4 failed (모듈 없음) - [ ] **Step 3: notifier.py 작성** `realestate-lab/app/notifier.py`: ```python """신규 매칭을 agent-office로 push하여 텔레그램 알림을 트리거한다.""" import os import logging import requests from .db import get_profile, get_unnotified_matches, mark_matches_notified logger = logging.getLogger("realestate-lab") AGENT_OFFICE_URL = os.getenv("AGENT_OFFICE_URL", "http://agent-office:8000") NOTIFY_TIMEOUT_SECONDS = int(os.getenv("REALESTATE_NOTIFY_TIMEOUT", "15")) def notify_new_matches() -> dict: """프로필의 임계값을 통과한 미알림 매칭을 agent-office로 push한다. 응답이 200이고 sent_ids가 비어있지 않으면 해당 IDs의 notified_at을 마킹. 실패 시 마킹하지 않아 다음 사이클에서 재시도된다. """ profile = get_profile() if not profile: return {"sent": 0, "skipped": "no_profile"} if not profile.get("notify_enabled"): return {"sent": 0, "skipped": "notify_disabled"} threshold = profile.get("min_match_score") or 70 matches = get_unnotified_matches(threshold) if not matches: return {"sent": 0} url = f"{AGENT_OFFICE_URL}/api/agent-office/realestate/notify" try: resp = requests.post(url, json={"matches": matches}, timeout=NOTIFY_TIMEOUT_SECONDS) resp.raise_for_status() body = resp.json() except requests.RequestException as e: logger.error("agent-office push 실패: %s", e) return {"sent": 0, "error": str(e)} sent_ids = body.get("sent_ids") or [] if sent_ids: mark_matches_notified(sent_ids) logger.info("알림 송신: %d건", len(sent_ids)) return body ``` - [ ] **Step 4: 테스트 실행 — 통과 확인** Run: `cd realestate-lab && python -m pytest tests/test_notifier.py -v` Expected: 4 passed - [ ] **Step 5: 커밋** ```bash git add realestate-lab/app/notifier.py realestate-lab/tests/test_notifier.py git commit -m "feat(realestate-notifier): push unnotified matches to agent-office" ``` --- ## Task 8: realestate-lab scheduled_collect 흐름 통합 `scheduled_collect`에 정리 + notifier 호출 추가. **Files:** - Modify: `realestate-lab/app/main.py` - Test: `realestate-lab/tests/test_scheduled_flow.py` - [ ] **Step 1: 흐름 통합 테스트 작성** `realestate-lab/tests/test_scheduled_flow.py`: ```python from unittest.mock import patch def test_scheduled_collect_calls_cleanup_and_notifier(): from app import main as app_main calls = [] def fake_collect(): calls.append("collect") return {"new_count": 0, "total_count": 0} def fake_cleanup(grace_days=90): calls.append(("cleanup", grace_days)) return 0 def fake_match(): calls.append("match") def fake_notify(): calls.append("notify") return {"sent": 0} with patch.object(app_main, "collect_all", side_effect=fake_collect), \ patch.object(app_main, "delete_old_completed_announcements", side_effect=fake_cleanup), \ patch.object(app_main, "run_matching", side_effect=fake_match), \ patch.object(app_main, "notify_new_matches", side_effect=fake_notify): app_main.scheduled_collect() assert calls == ["collect", ("cleanup", 90), "match", "notify"] ``` - [ ] **Step 2: 테스트 실행 — 실패 확인** Run: `cd realestate-lab && python -m pytest tests/test_scheduled_flow.py -v` Expected: 1 failed (notify_new_matches/delete_old_completed import 안 됨) - [ ] **Step 3: main.py 수정** `realestate-lab/app/main.py`의 import 블록에 추가: ```python from .db import ( init_db, get_announcements, get_announcement, create_announcement, update_announcement, delete_announcement, delete_closed_announcements, toggle_bookmark, update_all_statuses, get_profile, upsert_profile, get_matches, mark_match_read, get_last_collect_log, get_dashboard, delete_old_completed_announcements, # NEW ) from .collector import collect_all from .matcher import run_matching from .notifier import notify_new_matches # NEW from .models import AnnouncementCreate, AnnouncementUpdate, ProfileUpdate ``` `scheduled_collect` 함수 수정: 기존: ```python def scheduled_collect(): """매일 09:00 — 수집 + 매칭""" logger.info("스케줄 수집 시작") collect_all() run_matching() logger.info("스케줄 수집 + 매칭 완료") ``` 변경: ```python def scheduled_collect(): """매일 09:00 — 수집 + 정리 + 매칭 + 알림 push""" logger.info("스케줄 수집 시작") collect_all() deleted = delete_old_completed_announcements(grace_days=90) if deleted: logger.info("정리: %d건 삭제", deleted) run_matching() notify_new_matches() logger.info("스케줄 수집 + 매칭 + 알림 완료") ``` `_run_collect_and_match`도 통일성을 위해 같은 흐름 적용: 기존: ```python def _run_collect_and_match(): if not _collect_lock.acquire(blocking=False): logger.info("수집 이미 진행 중 — 건너뜀") return try: collect_all() run_matching() finally: _collect_lock.release() ``` 변경: ```python def _run_collect_and_match(): if not _collect_lock.acquire(blocking=False): logger.info("수집 이미 진행 중 — 건너뜀") return try: collect_all() delete_old_completed_announcements(grace_days=90) run_matching() notify_new_matches() finally: _collect_lock.release() ``` - [ ] **Step 4: 테스트 실행 — 통과 확인** Run: `cd realestate-lab && python -m pytest tests/test_scheduled_flow.py -v` Expected: 1 passed - [ ] **Step 5: 전체 회귀 검증** Run: `cd realestate-lab && python -m pytest tests/ -v` Expected: 모든 테스트 통과 - [ ] **Step 6: 커밋** ```bash git add realestate-lab/app/main.py realestate-lab/tests/test_scheduled_flow.py git commit -m "feat(realestate): wire cleanup + notifier into scheduled flow" ``` --- ## Task 9: agent-office 텔레그램 fmt + messaging 헬퍼 청약 매칭 메시지 포맷터(묶음/풀 카드) + 인라인 키보드 빌더 + 송신 헬퍼. **Files:** - Create: `agent-office/app/telegram/realestate_message.py` - Create: `agent-office/tests/test_realestate_message.py` - [ ] **Step 1: 테스트 작성** `agent-office/tests/test_realestate_message.py`: ```python def test_format_realestate_match_full_card_single(): from app.telegram.realestate_message import format_realestate_matches matches = [{ "id": 1, "match_score": 90, "house_nm": "디에이치 강남", "region_name": "서울특별시", "district": "강남구", "is_speculative_area": "Y", "is_price_cap": "Y", "receipt_start": "2026-05-15", "receipt_end": "2026-05-19", "match_reasons": ["광역 일치", "자치구 S티어: 강남구 (+25)", "예산 범위"], "eligible_types": ["일반1순위", "특별-신혼부부"], "pblanc_url": "https://example.com/p/1", }] text = format_realestate_matches(matches) assert "디에이치 강남" in text assert "90점" in text assert "강남구" in text assert "2026-05-15" in text def test_format_realestate_match_compact_when_three_or_more(): from app.telegram.realestate_message import format_realestate_matches matches = [ {"id": i, "match_score": 90 - i, "house_nm": f"단지{i}", "district": "강남구", "region_name": "서울특별시", "receipt_start": "2026-05-15", "receipt_end": "2026-05-19", "match_reasons": [], "eligible_types": [], "pblanc_url": ""} for i in range(3) ] text = format_realestate_matches(matches) assert "3건" in text or "3" in text for i in range(3): assert f"단지{i}" in text def test_build_keyboard_single_match_has_bookmark_and_url(): from app.telegram.realestate_message import build_match_keyboard matches = [{"id": 42, "pblanc_url": "https://example.com/p/42"}] kb = build_match_keyboard(matches) rows = kb["inline_keyboard"] flat = [b for row in rows for b in row] assert any(b.get("callback_data", "").startswith("realestate_bookmark_42") for b in flat) assert any(b.get("url") == "https://example.com/p/42" for b in flat) def test_build_keyboard_multi_matches_uses_dashboard_link(): from app.telegram.realestate_message import build_match_keyboard matches = [{"id": i, "pblanc_url": ""} for i in range(3)] kb = build_match_keyboard(matches) flat = [b for row in kb["inline_keyboard"] for b in row] # 3건 이상이면 [전체 보기] 단일 URL 버튼 assert any("전체" in b.get("text", "") for b in flat) def test_build_keyboard_empty_returns_none(): from app.telegram.realestate_message import build_match_keyboard assert build_match_keyboard([]) is None ``` - [ ] **Step 2: 테스트 실행 — 실패 확인** Run: `cd agent-office && python -m pytest tests/test_realestate_message.py -v` Expected: 5 failed (모듈 없음) - [ ] **Step 3: realestate_message.py 작성** `agent-office/app/telegram/realestate_message.py`: ```python """청약 매칭 알림 — 텔레그램 메시지 포맷터 + 인라인 키보드 빌더.""" import os from html import escape as _h from typing import Optional DASHBOARD_URL = os.getenv("REALESTATE_DASHBOARD_URL", "https://example.com/realestate") _TIER_BADGE = {"S": "S", "A": "A", "B": "B", "C": "C", "D": "D"} def _format_one_compact(m: dict) -> str: score = m.get("match_score", 0) name = _h(m.get("house_nm") or "(제목 없음)") district = m.get("district") or "" region = m.get("region_name") or "" where = f"{region.split()[0] if region else ''} {district}".strip() or "위치 미상" rstart = m.get("receipt_start") or "" rend = m.get("receipt_end") or "" return ( f"⭐ {score}점 — {name}\n" f"📍 {_h(where)} 📅 {_h(rstart)} ~ {_h(rend)}" ) def _format_one_full(m: dict) -> str: score = m.get("match_score", 0) name = _h(m.get("house_nm") or "(제목 없음)") district = m.get("district") or "" region = m.get("region_name") or "" flags = [] if m.get("is_speculative_area") == "Y": flags.append("투기과열") if m.get("is_price_cap") == "Y": flags.append("분양가상한제") flag_str = f" ({', '.join(flags)})" if flags else "" rstart = m.get("receipt_start") or "" rend = m.get("receipt_end") or "" elig = m.get("eligible_types") or [] reasons = m.get("match_reasons") or [] where = f"{region.split()[0] if region else ''} {district}".strip() or "위치 미상" lines = [ f"⭐ {score}점 — {name}", f"📍 {_h(where)}{_h(flag_str)}", f"📅 청약 {_h(rstart)} ~ {_h(rend)}", ] if elig: lines.append(f"✓ 자격: {_h(', '.join(elig))}") if reasons: lines.append(f"💡 {_h(' / '.join(reasons[:4]))}") return "\n".join(lines) def format_realestate_matches(matches: list[dict]) -> str: """매칭 목록을 텔레그램 HTML 메시지로 변환. 1~2건은 풀 카드, 3건 이상은 묶음 카드(상위 5건). """ if not matches: return "🏢 새 청약 매칭이 없습니다." if len(matches) <= 2: body = "\n\n".join(_format_one_full(m) for m in matches) return f"🏢 새 청약 매칭 {len(matches)}건\n━━━━━━━━━━\n\n{body}" top = matches[:5] body = "\n\n".join(_format_one_compact(m) for m in top) suffix = f"\n\n…외 {len(matches) - 5}건" if len(matches) > 5 else "" return f"🏢 새 청약 매칭 {len(matches)}건\n━━━━━━━━━━\n\n{body}{suffix}" def build_match_keyboard(matches: list[dict]) -> Optional[dict]: """1~2건: 매치별 [북마크][공고 보기] 행. 3건 이상: [전체 보기] 단일 행.""" if not matches: return None if len(matches) <= 2: rows = [] for m in matches: buttons = [{ "text": "🔖 북마크", "callback_data": f"realestate_bookmark_{m['id']}", }] url = m.get("pblanc_url") if url: buttons.append({"text": "📄 공고 보기", "url": url}) rows.append(buttons) return {"inline_keyboard": rows} return { "inline_keyboard": [[ {"text": "📋 전체 보기", "url": DASHBOARD_URL}, ]], } ``` - [ ] **Step 4: 테스트 실행 — 통과 확인** Run: `cd agent-office && python -m pytest tests/test_realestate_message.py -v` Expected: 5 passed - [ ] **Step 5: 커밋** ```bash git add agent-office/app/telegram/realestate_message.py agent-office/tests/test_realestate_message.py git commit -m "feat(agent-office-telegram): realestate match formatter + keyboard" ``` --- ## Task 10: agent-office RealestateAgent.on_new_matches + endpoint 신규 메소드 추가, endpoint 추가, on_schedule는 폐기 표시(cron 등록 빠지면 호출 안 됨, fetch_matches 명령은 on_new_matches 직접 호출로 단순화). **Files:** - Modify: `agent-office/app/agents/realestate.py` - Modify: `agent-office/app/main.py` (endpoint 추가) - Test: `agent-office/tests/test_realestate_agent.py` - [ ] **Step 1: 에이전트 + endpoint 테스트 작성** `agent-office/tests/test_realestate_agent.py`: ```python import os import sys import tempfile _TMP = tempfile.mktemp(suffix=".db") os.environ["AGENT_OFFICE_DB_PATH"] = _TMP sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) import asyncio from unittest.mock import AsyncMock, patch import pytest @pytest.fixture(autouse=True) def _init_db(): if os.path.exists(_TMP): os.remove(_TMP) from app.db import init_db init_db() yield def test_on_new_matches_returns_empty_when_no_matches(): from app.agents.realestate import RealestateAgent agent = RealestateAgent() result = asyncio.run(agent.on_new_matches([])) assert result == {"sent": 0, "sent_ids": []} def test_on_new_matches_sends_telegram_and_returns_ids(): from app.agents.realestate import RealestateAgent from app.telegram import messaging matches = [{ "id": 7, "match_score": 80, "house_nm": "단지A", "region_name": "서울특별시", "district": "강남구", "receipt_start": "2026-05-01", "receipt_end": "2026-05-05", "match_reasons": [], "eligible_types": [], "pblanc_url": "https://x.test/7", }] fake_send = AsyncMock(return_value={"ok": True, "message_id": 123}) with patch.object(messaging, "send_raw", fake_send): agent = RealestateAgent() result = asyncio.run(agent.on_new_matches(matches)) assert result["sent"] == 1 assert result["sent_ids"] == [7] assert result["message_id"] == 123 fake_send.assert_awaited_once() args, kwargs = fake_send.call_args text = args[0] assert "단지A" in text def test_on_new_matches_telegram_failure_returns_zero(): from app.agents.realestate import RealestateAgent from app.telegram import messaging matches = [{ "id": 8, "match_score": 80, "house_nm": "단지B", "region_name": "서울", "district": "송파구", "receipt_start": "", "receipt_end": "", "match_reasons": [], "eligible_types": [], "pblanc_url": "", }] fake_send = AsyncMock(return_value={"ok": False, "description": "401"}) with patch.object(messaging, "send_raw", fake_send): agent = RealestateAgent() result = asyncio.run(agent.on_new_matches(matches)) assert result["sent"] == 0 assert result["sent_ids"] == [] assert "error" in result def test_endpoint_calls_agent_on_new_matches(): from fastapi.testclient import TestClient from app.main import app from app.agents.realestate import RealestateAgent fake = AsyncMock(return_value={"sent": 1, "sent_ids": [99], "message_id": 1}) with patch.object(RealestateAgent, "on_new_matches", fake): client = TestClient(app) resp = client.post( "/api/agent-office/realestate/notify", json={"matches": [{"id": 99, "match_score": 80}]}, ) assert resp.status_code == 200 body = resp.json() assert body["sent"] == 1 assert body["sent_ids"] == [99] ``` - [ ] **Step 2: 테스트 실행 — 실패 확인** Run: `cd agent-office && python -m pytest tests/test_realestate_agent.py -v` Expected: 4 failed (메소드/endpoint 없음) - [ ] **Step 3: agents/realestate.py 수정 — on_new_matches 추가, on_schedule 단순화** `agent-office/app/agents/realestate.py` 전체 파일을 다음 내용으로 교체: ```python from .base import BaseAgent from ..db import create_task, update_task_status, add_log from .. import service_proxy from ..telegram import messaging from ..telegram.realestate_message import format_realestate_matches, build_match_keyboard class RealestateAgent(BaseAgent): """부동산 청약 에이전트. realestate-lab이 신규 매칭 발견 시 /realestate/notify로 push해 트리거됨. on_new_matches가 메인 진입점. on_schedule은 사용하지 않음(cron 폐기). """ agent_id = "realestate" display_name = "청약 애널리스트" async def on_new_matches(self, matches: list[dict]) -> dict: """신규 매칭 N건을 텔레그램 1통으로 푸시. 성공 시 sent_ids 반환 → realestate-lab이 notified_at 마킹. 실패 시 sent=0, sent_ids=[] 반환 → 다음 사이클 재시도. """ if not matches: return {"sent": 0, "sent_ids": []} task_id = create_task(self.agent_id, "notify_matches", {"count": len(matches)}) try: text = format_realestate_matches(matches) keyboard = build_match_keyboard(matches) await self.transition("reporting", f"매칭 {len(matches)}건 알림", task_id) tg = await messaging.send_raw(text, reply_markup=keyboard) if not tg.get("ok"): update_task_status(task_id, "failed", {"error": tg.get("description")}) await self.transition("idle", "알림 실패") return {"sent": 0, "sent_ids": [], "error": tg.get("description")} sent_ids = [m["id"] for m in matches if "id" in m] update_task_status(task_id, "succeeded", { "sent": len(matches), "telegram_message_id": tg.get("message_id"), }) await self.transition("idle", f"매칭 {len(matches)}건 알림 완료") return { "sent": len(matches), "sent_ids": sent_ids, "message_id": tg.get("message_id"), } except Exception as e: add_log(self.agent_id, f"on_new_matches failed: {e}", "error", task_id) update_task_status(task_id, "failed", {"error": str(e)}) await self.transition("idle", f"오류: {e}") return {"sent": 0, "sent_ids": [], "error": str(e)} async def on_command(self, command: str, params: dict) -> dict: if command == "fetch_matches": try: matches = await service_proxy.realestate_matches(limit=20) if not matches: return {"ok": True, "message": "매칭 없음"} result = await self.on_new_matches(matches) return {"ok": True, "result": result} except Exception as e: return {"ok": False, "message": str(e)} if command == "dashboard": try: data = await service_proxy.realestate_dashboard() return {"ok": True, "dashboard": data} except Exception as e: return {"ok": False, "message": str(e)} return {"ok": False, "message": f"Unknown command: {command}"} async def on_approval(self, task_id: str, approved: bool, feedback: str = "") -> None: pass ``` - [ ] **Step 4: main.py에 endpoint 추가** `agent-office/app/main.py`를 열고 다른 라우트들이 정의된 곳에 다음 endpoint를 추가한다. 정확한 위치는 다른 `/api/agent-office/...` 경로 근처. 만약 RealestateAgent가 registry로 관리된다면 그 패턴을 따르고, 아니면 직접 인스턴스화. 먼저 agent dispatch 패턴을 확인해야 한다. `agent-office/app/main.py`에서 다른 에이전트가 어떻게 호출되는지 본다 (예: stock 에이전트의 명령 처리). 그 패턴에 맞춰 RealestateAgent 인스턴스를 얻고 `on_new_matches` 호출. 다음 엔드포인트를 추가: ```python from .agents.realestate import RealestateAgent from pydantic import BaseModel from typing import List, Dict, Any class RealestateNotifyBody(BaseModel): matches: List[Dict[str, Any]] _realestate_agent_singleton: RealestateAgent | None = None def _get_realestate_agent() -> RealestateAgent: global _realestate_agent_singleton if _realestate_agent_singleton is None: _realestate_agent_singleton = RealestateAgent() return _realestate_agent_singleton @app.post("/api/agent-office/realestate/notify") async def realestate_notify(body: RealestateNotifyBody): agent = _get_realestate_agent() return await agent.on_new_matches(body.matches) ``` > ⚠️ **구현 시 검증**: agent-office가 이미 에이전트 인스턴스 registry/dispatcher를 가지고 있다면 (`agents/__init__.py` 또는 `main.py` 상단), 그 패턴을 따르고 위의 singleton 패턴은 사용하지 않는다. 기존 패턴이 없을 때만 위 코드 그대로 사용. 다른 에이전트(stock/music/lotto)의 인스턴스 관리 방식을 먼저 grep해서 확인할 것: `cd agent-office && grep -rn "StockAgent\|MusicAgent\|LottoAgent" app/main.py app/agents/__init__.py`. - [ ] **Step 5: 테스트 실행 — 통과 확인** Run: `cd agent-office && python -m pytest tests/test_realestate_agent.py -v` Expected: 4 passed - [ ] **Step 6: 커밋** ```bash git add agent-office/app/agents/realestate.py agent-office/app/main.py agent-office/tests/test_realestate_agent.py git commit -m "feat(agent-office): realestate on_new_matches + /notify endpoint" ``` --- ## Task 11: agent-office scheduler에서 realestate cron 제거 + 콜백 라우팅 데일리 09:15 cron 등록 제거. 인라인 키보드 콜백(`realestate_bookmark_*`)을 텔레그램 webhook에서 처리해 realestate-lab으로 프록시. **Files:** - Modify: `agent-office/app/scheduler.py` - Modify: `agent-office/app/telegram/webhook.py` (또는 callback 라우팅 위치) - Modify: `agent-office/app/service_proxy.py` (북마크 토글 헬퍼 추가) - [ ] **Step 1: scheduler에서 realestate cron 제거** `agent-office/app/scheduler.py`를 열어 realestate 관련 add_job 호출을 찾는다 (예: `realestate_agent.on_schedule` 등록부). 해당 라인을 제거하고 import도 정리. 찾는 패턴 예시: ```python scheduler.add_job(realestate_agent.on_schedule, "cron", hour=9, minute=15, id="realestate_daily") ``` 이 라인과 관련 import를 삭제한다. 만약 RealestateAgent import가 다른 곳에서 안 쓰이면 import도 함께 제거. - [ ] **Step 2: 콜백 핸들러 위치 확인** Run: `cd agent-office && grep -rn "callback_query\|callback_data" app/telegram/` 다음을 확인: - `webhook.py` 또는 `router.py`에서 `callback_query`를 받아 처리하는 함수 - 현재 패턴(예: `approve_*`, `reject_*` 콜백 처리 방식) - [ ] **Step 3: service_proxy에 북마크 토글 추가** `agent-office/app/service_proxy.py` 끝에 추가: ```python async def realestate_bookmark_toggle(announcement_id: int) -> dict: """realestate-lab의 PATCH /api/realestate/announcements/{id}/bookmark 호출.""" url = f"{REALESTATE_LAB_URL}/api/realestate/announcements/{announcement_id}/bookmark" async with httpx.AsyncClient(timeout=10) as client: resp = await client.patch(url) resp.raise_for_status() return resp.json() ``` (상단의 `REALESTATE_LAB_URL`, `httpx` import는 이미 service_proxy에 있을 것이므로 신규 환경변수만 docker-compose의 agent-office 환경에 노출. 만약 없다면 다음 라인을 service_proxy 상단에 추가:) ```python REALESTATE_LAB_URL = os.getenv("REALESTATE_LAB_URL", "http://realestate-lab:8000") ``` - [ ] **Step 4: 콜백 라우팅에 realestate_bookmark 처리 추가** webhook 또는 router의 `callback_query` 디스패처에 다음 분기를 추가 (정확한 함수명은 step 2에서 확인한 위치에 맞춤): ```python if data.startswith("realestate_bookmark_"): try: ann_id = int(data.removeprefix("realestate_bookmark_")) except ValueError: return {"ok": False, "message": "잘못된 콜백 데이터"} try: await service_proxy.realestate_bookmark_toggle(ann_id) await messaging.send_raw(f"🔖 북마크 토글 완료 (#{ann_id})") return {"ok": True} except Exception as e: await messaging.send_raw(f"⚠️ 북마크 실패: {e}") return {"ok": False, "error": str(e)} ``` > ⚠️ **위치 적응 필요**: step 2에서 확인한 콜백 디스패처의 기존 분기 스타일에 맞춰 통합. 기존 분기가 `if data.startswith("approve_"): ... elif data.startswith("reject_"): ...` 라면 `elif`로 추가. - [ ] **Step 5: 콜백 라우팅 단위 테스트** `agent-office/tests/test_realestate_callback.py`: ```python import os import sys import tempfile from unittest.mock import AsyncMock, patch _TMP = tempfile.mktemp(suffix=".db") os.environ["AGENT_OFFICE_DB_PATH"] = _TMP sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) def test_callback_realestate_bookmark_calls_proxy(): """callback_data 'realestate_bookmark_42' 가 service_proxy.realestate_bookmark_toggle(42) 를 호출.""" import asyncio from app import service_proxy from app.telegram import webhook # 실제 모듈명에 맞춰 수정 필요 fake = AsyncMock(return_value={"ok": True}) with patch.object(service_proxy, "realestate_bookmark_toggle", fake): # webhook의 callback dispatcher를 직접 호출. 정확한 함수 시그니처는 step 2 확인 결과로 채움. # 예시: await webhook.handle_callback({"data": "realestate_bookmark_42", ...}) result = asyncio.run(webhook.handle_callback({"data": "realestate_bookmark_42", "from": {"id": 1}, "id": "cb1"})) fake.assert_awaited_once_with(42) ``` > ⚠️ **테스트 시그니처 적응**: step 2에서 확인한 콜백 핸들러의 정확한 함수명·인자에 맞춰 수정. 이 테스트는 realistic stub이며 실제 webhook 모듈 구조에 맞게 업데이트해야 통과. - [ ] **Step 6: 회귀 검증** Run: `cd agent-office && python -m pytest tests/ -v` Expected: 모든 기존 테스트 통과 + 신규 추가분 통과 - [ ] **Step 7: 커밋** ```bash git add agent-office/app/scheduler.py agent-office/app/telegram/ agent-office/app/service_proxy.py agent-office/tests/test_realestate_callback.py git commit -m "feat(agent-office): drop daily realestate cron + bookmark callback routing" ``` --- ## Task 12: docker-compose 환경변수 + 통합 검증 `AGENT_OFFICE_URL`을 realestate-lab 환경에 추가, `REALESTATE_LAB_URL`이 agent-office에 없으면 추가. **Files:** - Modify: `docker-compose.yml` - Modify: `.env.example` - [ ] **Step 1: docker-compose.yml의 realestate-lab 서비스에 AGENT_OFFICE_URL 추가** `docker-compose.yml`에서 `realestate-lab` 서비스 정의를 찾고, `environment:` 블록에 추가: ```yaml realestate-lab: # ... 기존 environment: # ... 기존 AGENT_OFFICE_URL: http://agent-office:8000 ``` - [ ] **Step 2: docker-compose.yml의 agent-office 서비스에 REALESTATE_LAB_URL 추가 (없는 경우)** agent-office 서비스 environment에: ```yaml agent-office: # ... 기존 environment: # ... 기존 REALESTATE_LAB_URL: http://realestate-lab:8000 REALESTATE_DASHBOARD_URL: ${REALESTATE_DASHBOARD_URL:-http://localhost:8080/realestate} ``` - [ ] **Step 3: .env.example에 신규 변수 명시** `.env.example` 파일 끝에 추가 (이미 있는 변수와 중복 없는지 확인): ```bash # 청약 알림 — agent-office push AGENT_OFFICE_URL=http://agent-office:8000 REALESTATE_LAB_URL=http://realestate-lab:8000 REALESTATE_DASHBOARD_URL=http://localhost:8080/realestate REALESTATE_NOTIFY_TIMEOUT=15 ``` - [ ] **Step 4: 두 서비스 전체 회귀 검증** Run: ```bash cd realestate-lab && python -m pytest tests/ -v cd ../agent-office && python -m pytest tests/ -v ``` Expected: 모든 테스트 통과 - [ ] **Step 5: 운영 배포 sanity check (수동, 선택)** NAS 배포 후 (`git push`로 자동), 다음 시나리오 수동 검증: 1. `PUT /api/realestate/profile` body에 `preferred_districts`/`min_match_score`/`notify_enabled` 포함하여 저장 → 200, 응답에 새 필드 반영 2. `POST /api/realestate/collect` 트리거 → 수집 후 `GET /api/realestate/announcements?region=서울` 시 `district` 필드 포함, `완료` 공고 없음 3. 매칭 점수 70점 이상이고 미알림인 매치 1건 이상 존재 시 → 텔레그램에 메시지 도착, 인라인 키보드 표시 4. 텔레그램 [🔖 북마크] 클릭 → realestate-lab의 `is_bookmarked` 토글 확인 5. `notify_enabled=false`로 변경 후 `POST /collect` → 텔레그램 푸시 미발생 - [ ] **Step 6: 커밋** ```bash git add docker-compose.yml .env.example git commit -m "chore(deploy): wire realestate↔agent-office URLs for push notify" ``` --- ## 완료 기준 - 모든 task의 테스트 통과 (`realestate-lab/tests/`, `agent-office/tests/`) - 회귀 없이 기존 endpoint 동작 유지 - 매칭 점수 모델: 35 + 10 + 15 + 15 + 25 = 100점 일관 - realestate-lab 09:00 cron이 `collect → cleanup → match → notify` 순으로 동작 - agent-office 09:15 데일리 cron 제거됨 - 텔레그램에 신규 매칭 알림 + 인라인 키보드 동작 - `match_results.notified_at` 멱등 마킹 --- ## 참고 — 후속 별도 plan - `web-ui` 프론트 자치구 5티어 입력 UI (별도 frontend plan) - 청약 가점 vs 공고별 예상 커트라인 비교 (외부 데이터 의존성 연구) - 서울 외 광역(부산 해운대구 등) 자치구 파싱 확장 - `POST /notifications/resend` (임계값 변경 후 재발송) - 자치구별 매칭 분포 대시보드 위젯