# realestate-lab Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Build a standalone service that automatically collects real-estate subscription (청약) announcements and matches eligibility against a user profile, and remove the existing subscription code from lotto-backend.

**Architecture:** A standalone FastAPI service that collects announcements daily from the public data portal (공공데이터포털, Korea Real Estate Board 청약홈 API), stores them in SQLite, and computes an eligibility match score from the user profile. It follows the same Docker container pattern as stock-lab/music-lab.

**Tech Stack:** Python 3.12, FastAPI, SQLite, APScheduler, requests, pydantic

---

## File Structure

### New files (realestate-lab/)

| File | Responsibility |
|------|----------------|
| `realestate-lab/app/__init__.py` | Package marker |
| `realestate-lab/app/main.py` | FastAPI app, routes, APScheduler, startup/shutdown |
| `realestate-lab/app/db.py` | SQLite table creation, all CRUD functions |
| `realestate-lab/app/collector.py` | Public data portal API calls, response parsing, DB writes |
| `realestate-lab/app/matcher.py` | Profile-based match scoring engine |
| `realestate-lab/app/models.py` | Pydantic request/response models |
| `realestate-lab/Dockerfile` | Container based on python:3.12-alpine |
| `realestate-lab/requirements.txt` | Dependency list |

### Modified files

| File | Change |
|------|--------|
| `docker-compose.yml` | Add the realestate-lab service |
| `nginx/default.conf` | Add `/api/realestate/` proxy routing |
| `scripts/deploy-nas.sh` | Add realestate-lab to the rsync targets |
| `backend/app/main.py` | Remove subscription-related models, routes, and imports |
| `backend/app/db.py` | Remove the realestate_complexes, subscription_items, subscription_profile tables and their CRUD |

---

### Task 1: Project scaffolding (Dockerfile, requirements.txt, `__init__.py`)

**Files:**
- Create: `realestate-lab/app/__init__.py`
- Create: `realestate-lab/requirements.txt`
- Create: `realestate-lab/Dockerfile`

- [ ] **Step 1: Create the directory and `__init__.py`**

```bash
mkdir -p realestate-lab/app
```

```python
# realestate-lab/app/__init__.py
```

(empty file)

- [ ] **Step 2: Write requirements.txt**

```
# realestate-lab/requirements.txt
requests==2.32.3
fastapi==0.115.6
uvicorn[standard]==0.30.6
apscheduler==3.10.4
pydantic>=2.0
```

- [ ] **Step 3: Write the Dockerfile**

```dockerfile
# realestate-lab/Dockerfile
FROM python:3.12-alpine
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
```

- [ ] **Step 4: Commit**

```bash
git add realestate-lab/
git commit -m "feat(realestate-lab): project scaffolding (Dockerfile, requirements, init)"
```

---

### Task 2: Define Pydantic models (models.py)

**Files:**
- Create: `realestate-lab/app/models.py`

- [ ] **Step 1: Write models.py**

```python
# realestate-lab/app/models.py
from typing import Optional, List
from pydantic import BaseModel


# ── Announcements ────────────────────────────────────────────────────────────
class AnnouncementCreate(BaseModel):
    house_nm: str
    house_secd: str = "01"
    house_dtl_secd: Optional[str] = None
    rent_secd: Optional[str] = None
    region_code: Optional[str] = None
    region_name: Optional[str] = None
    address: Optional[str] = None
    total_units: Optional[int] = None
    rcrit_date: Optional[str] = None
    receipt_start: Optional[str] = None
    receipt_end: Optional[str] = None
    spsply_start: Optional[str] = None
    spsply_end: Optional[str] = None
    gnrl_rank1_start: Optional[str] = None
    gnrl_rank1_end: Optional[str] = None
    winner_date: Optional[str] = None
    contract_start: Optional[str] = None
    contract_end: Optional[str] = None
    homepage_url: Optional[str] = None
    pblanc_url: Optional[str] = None
    constructor: Optional[str] = None
    developer: Optional[str] = None
    move_in_month: Optional[str] = None
    is_speculative_area: Optional[str] = None
    is_price_cap: Optional[str] = None
    contact: Optional[str] = None


class AnnouncementUpdate(BaseModel):
    house_nm: Optional[str] = None
    house_secd: Optional[str] = None
    house_dtl_secd: Optional[str] = None
    rent_secd: Optional[str] = None
    region_code: Optional[str] = None
    region_name: Optional[str] = None
    address: Optional[str] = None
    total_units: Optional[int] = None
    rcrit_date: Optional[str] = None
    receipt_start: Optional[str] = None
    receipt_end: Optional[str] = None
    spsply_start: Optional[str] = None
    spsply_end: Optional[str] = None
    gnrl_rank1_start: Optional[str] = None
    gnrl_rank1_end: Optional[str] = None
    winner_date: Optional[str] = None
    contract_start: Optional[str] = None
    contract_end: Optional[str] = None
    homepage_url: Optional[str] = None
    pblanc_url: Optional[str] = None
    constructor: Optional[str] = None
    developer: Optional[str] = None
    move_in_month: Optional[str] = None
    is_speculative_area: Optional[str] = None
    is_price_cap: Optional[str] = None
    contact: Optional[str] = None


# ── Profile ──────────────────────────────────────────────────────────────────
class ProfileUpdate(BaseModel):
    name: Optional[str] = None
    age: Optional[int] = None
    is_homeless: Optional[bool] = None
    is_householder: Optional[bool] = None
    subscription_months: Optional[int] = None
    subscription_amount: Optional[int] = None
    family_members: Optional[int] = None
    has_dependents: Optional[bool] = None
    children_count: Optional[int] = None
    is_newlywed: Optional[bool] = None
    marriage_months: Optional[int] = None
    has_newborn: Optional[bool] = None
    is_first_home: Optional[bool] = None
    income_level: Optional[str] = None
    preferred_regions: Optional[List[str]] = None
    preferred_types: Optional[List[str]] = None
    min_area: Optional[float] = None
    max_area: Optional[float] = None
    max_price: Optional[int] = None
```

- [ ] **Step 2: Commit**

```bash
git add realestate-lab/app/models.py
git commit -m "feat(realestate-lab): define Pydantic request models"
```

---

### Task 3: DB layer (db.py)

**Files:**
- Create: `realestate-lab/app/db.py`

- [ ] **Step 1: Write db.py (table creation + announcements CRUD)**

```python
# realestate-lab/app/db.py
import json
import sqlite3
import logging
from typing import Dict, Any, List, Optional
from datetime import date

logger = logging.getLogger("realestate-lab")

DB_PATH = "/app/data/realestate.db"


def _conn():
    c = sqlite3.connect(DB_PATH)
    c.row_factory = sqlite3.Row
    c.execute("PRAGMA journal_mode=WAL;")
    c.execute("PRAGMA foreign_keys=ON;")
    return c


def init_db():
    with _conn() as conn:
        # ── announcements ────────────────────────────────────────────────
        conn.execute("""
            CREATE TABLE IF NOT EXISTS announcements (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                house_manage_no TEXT NOT NULL,
                pblanc_no TEXT NOT NULL,
                house_nm TEXT,
                house_secd TEXT,
                house_dtl_secd TEXT,
                rent_secd TEXT,
                region_code TEXT,
                region_name TEXT,
                address TEXT,
                total_units INTEGER,
                rcrit_date TEXT,
                receipt_start TEXT,
                receipt_end TEXT,
                spsply_start TEXT,
                spsply_end TEXT,
                gnrl_rank1_start TEXT,
                gnrl_rank1_end TEXT,
                winner_date TEXT,
                contract_start TEXT,
                contract_end TEXT,
                homepage_url TEXT,
                pblanc_url TEXT,
                constructor TEXT,
                developer TEXT,
                move_in_month TEXT,
                is_speculative_area TEXT,
                is_price_cap TEXT,
                contact TEXT,
                status TEXT NOT NULL DEFAULT '청약예정',
                source TEXT NOT NULL DEFAULT 'manual',
                created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
                updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
                UNIQUE(house_manage_no, pblanc_no)
            );
        """)
        conn.execute("CREATE INDEX IF NOT EXISTS idx_ann_status ON announcements(status);")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_ann_region ON announcements(region_name);")

        # ── announcement_models ──────────────────────────────────────────
        conn.execute("""
            CREATE TABLE IF NOT EXISTS announcement_models (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                house_manage_no TEXT NOT NULL,
                pblanc_no TEXT NOT NULL,
                model_no TEXT,
                house_ty TEXT,
                supply_area REAL,
                general_units INTEGER DEFAULT 0,
                special_units INTEGER DEFAULT 0,
                multi_child_units INTEGER DEFAULT 0,
                newlywed_units INTEGER DEFAULT 0,
                first_life_units INTEGER DEFAULT 0,
                old_parent_units INTEGER DEFAULT 0,
                institution_units INTEGER DEFAULT 0,
                youth_units INTEGER DEFAULT 0,
                newborn_units INTEGER DEFAULT 0,
                top_amount INTEGER,
                UNIQUE(house_manage_no, pblanc_no, model_no)
            );
        """)

        # ── user_profile ─────────────────────────────────────────────────
        conn.execute("""
            CREATE TABLE IF NOT EXISTS user_profile (
                id INTEGER PRIMARY KEY DEFAULT 1,
                name TEXT,
                age INTEGER,
                is_homeless INTEGER,
                is_householder INTEGER,
                subscription_months INTEGER,
                subscription_amount INTEGER,
                family_members INTEGER,
                has_dependents INTEGER,
                children_count INTEGER DEFAULT 0,
                is_newlywed INTEGER,
                marriage_months INTEGER,
                has_newborn INTEGER,
                is_first_home INTEGER,
                income_level TEXT,
                preferred_regions TEXT NOT NULL DEFAULT '[]',
                preferred_types TEXT NOT NULL DEFAULT '[]',
                min_area REAL,
                max_area REAL,
                max_price INTEGER,
                updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
            );
        """)

        # ── match_results ────────────────────────────────────────────────
        conn.execute("""
            CREATE TABLE IF NOT EXISTS match_results (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                announcement_id INTEGER NOT NULL,
                model_id INTEGER,
                match_score INTEGER NOT NULL DEFAULT 0,
                match_reasons TEXT NOT NULL DEFAULT '[]',
                eligible_types TEXT NOT NULL DEFAULT '[]',
                is_new INTEGER NOT NULL DEFAULT 1,
                created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
                UNIQUE(announcement_id, model_id)
            );
        """)

        # ── collect_log ──────────────────────────────────────────────────
        conn.execute("""
            CREATE TABLE IF NOT EXISTS collect_log (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                collected_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
                new_count INTEGER NOT NULL DEFAULT 0,
                total_count INTEGER NOT NULL DEFAULT 0,
                error TEXT
            );
        """)


# ── Automatic status computation ─────────────────────────────────────────────
def compute_status(receipt_start: str, receipt_end: str, winner_date: str) -> str:
    today = date.today().isoformat()
    if receipt_start and today < receipt_start:
        return "청약예정"
    if receipt_start and receipt_end and receipt_start <= today <= receipt_end:
        return "청약중"
    if receipt_end and winner_date and receipt_end < today <= winner_date:
        return "결과발표"
    if winner_date and today > winner_date:
        return "완료"
    return "청약예정"


# ── announcements CRUD
def _ann_row_to_dict(r) -> Dict[str, Any]:
    return {c: r[c] for c in r.keys()}


def upsert_announcement(data: Dict[str, Any]) -> Dict[str, Any]:
    """Upsert an announcement, keyed on house_manage_no + pblanc_no."""
    status = compute_status(
        data.get("receipt_start", ""),
        data.get("receipt_end", ""),
        data.get("winner_date", ""),
    )
    with _conn() as conn:
        conn.execute("""
            INSERT INTO announcements (
                house_manage_no, pblanc_no, house_nm, house_secd, house_dtl_secd,
                rent_secd, region_code, region_name, address, total_units,
                rcrit_date, receipt_start, receipt_end, spsply_start, spsply_end,
                gnrl_rank1_start, gnrl_rank1_end, winner_date,
                contract_start, contract_end, homepage_url, pblanc_url,
                constructor, developer, move_in_month,
                is_speculative_area, is_price_cap, contact, status, source
            ) VALUES (
                :house_manage_no, :pblanc_no, :house_nm, :house_secd, :house_dtl_secd,
                :rent_secd, :region_code, :region_name, :address, :total_units,
                :rcrit_date, :receipt_start, :receipt_end, :spsply_start, :spsply_end,
                :gnrl_rank1_start, :gnrl_rank1_end, :winner_date,
                :contract_start, :contract_end, :homepage_url, :pblanc_url,
                :constructor, :developer, :move_in_month,
                :is_speculative_area, :is_price_cap, :contact, :status, :source
            )
            ON CONFLICT(house_manage_no, pblanc_no) DO UPDATE SET
                house_nm=excluded.house_nm,
                house_secd=excluded.house_secd,
                house_dtl_secd=excluded.house_dtl_secd,
                rent_secd=excluded.rent_secd,
                region_code=excluded.region_code,
                region_name=excluded.region_name,
                address=excluded.address,
                total_units=excluded.total_units,
                rcrit_date=excluded.rcrit_date,
                receipt_start=excluded.receipt_start,
                receipt_end=excluded.receipt_end,
                spsply_start=excluded.spsply_start,
                spsply_end=excluded.spsply_end,
                gnrl_rank1_start=excluded.gnrl_rank1_start,
                gnrl_rank1_end=excluded.gnrl_rank1_end,
                winner_date=excluded.winner_date,
                contract_start=excluded.contract_start,
                contract_end=excluded.contract_end,
                homepage_url=excluded.homepage_url,
                pblanc_url=excluded.pblanc_url,
                constructor=excluded.constructor,
                developer=excluded.developer,
                move_in_month=excluded.move_in_month,
                is_speculative_area=excluded.is_speculative_area,
                is_price_cap=excluded.is_price_cap,
                contact=excluded.contact,
                status=excluded.status,
                updated_at=strftime('%Y-%m-%dT%H:%M:%fZ','now')
        """, {**data, "status": status})
        row = conn.execute(
            "SELECT * FROM announcements WHERE house_manage_no = ? AND pblanc_no = ?",
            (data["house_manage_no"], data["pblanc_no"]),
        ).fetchone()
    return _ann_row_to_dict(row)


def get_announcements(
    region: str = None,
    status: str = None,
    house_type: str = None,
    matched_only: bool = False,
    sort: str = "date",
    page: int = 1,
    size: int = 20,
) -> Dict[str, Any]:
    conditions, params = [], []
    if region:
        conditions.append("a.region_name = ?")
        params.append(region)
    if status:
        conditions.append("a.status = ?")
        params.append(status)
    if house_type:
        conditions.append("a.house_secd = ?")
        params.append(house_type)

    join_clause = ""
    if matched_only:
        join_clause = "INNER JOIN match_results m ON m.announcement_id = a.id"

    where = f"WHERE {' AND '.join(conditions)}" if conditions else ""

    # Sort keys come from this fixed map, never from raw user input.
    order_map = {"date": "a.rcrit_date DESC", "score": "a.id DESC", "price": "a.id ASC"}
    order = order_map.get(sort, "a.rcrit_date DESC")
    if matched_only and sort == "score":
        order = "m.match_score DESC"

    offset = (page - 1) * size
    with _conn() as conn:
        total = conn.execute(
            f"SELECT COUNT(*) FROM announcements a {join_clause} {where}", params
        ).fetchone()[0]
        rows = conn.execute(
            f"SELECT a.* FROM announcements a {join_clause} {where} ORDER BY {order} LIMIT ? "
            "OFFSET ?",
            params + [size, offset],
        ).fetchall()
    return {
        "items": [_ann_row_to_dict(r) for r in rows],
        "total": total,
        "page": page,
        "size": size,
    }


def get_announcement(ann_id: int) -> Optional[Dict[str, Any]]:
    with _conn() as conn:
        row = conn.execute("SELECT * FROM announcements WHERE id = ?", (ann_id,)).fetchone()
        if not row:
            return None
        ann = _ann_row_to_dict(row)
        models = conn.execute(
            "SELECT * FROM announcement_models WHERE house_manage_no = ? AND pblanc_no = ?",
            (ann["house_manage_no"], ann["pblanc_no"]),
        ).fetchall()
        ann["models"] = [dict(m) for m in models]
    return ann


def create_announcement(data: Dict[str, Any]) -> Dict[str, Any]:
    """Register an announcement manually (house_manage_no is auto-generated)."""
    import uuid
    data["house_manage_no"] = data.get("house_manage_no", f"MANUAL-{uuid.uuid4().hex[:8]}")
    data["pblanc_no"] = data.get("pblanc_no", "00")
    data["source"] = "manual"
    return upsert_announcement(data)


def update_announcement(ann_id: int, data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    fields = {k: v for k, v in data.items() if v is not None}
    if not fields:
        return get_announcement(ann_id)

    # Recompute status in case any date field changed
    with _conn() as conn:
        row = conn.execute("SELECT * FROM announcements WHERE id = ?", (ann_id,)).fetchone()
        if not row:
            return None
        current = _ann_row_to_dict(row)
        merged = {**current, **fields}
        status = compute_status(
            merged.get("receipt_start", ""),
            merged.get("receipt_end", ""),
            merged.get("winner_date", ""),
        )
        fields["status"] = status

        set_clauses = ", ".join(f"{k} = ?"
                                for k in fields)
        set_clauses += ", updated_at = strftime('%Y-%m-%dT%H:%M:%fZ','now')"
        conn.execute(
            f"UPDATE announcements SET {set_clauses} WHERE id = ?",
            list(fields.values()) + [ann_id],
        )
    return get_announcement(ann_id)


def delete_announcement(ann_id: int) -> bool:
    with _conn() as conn:
        # Delete the related match results as well
        conn.execute("DELETE FROM match_results WHERE announcement_id = ?", (ann_id,))
        cur = conn.execute("DELETE FROM announcements WHERE id = ?", (ann_id,))
    return cur.rowcount > 0


def update_all_statuses():
    """Recompute the status of every not-yet-completed announcement from its dates."""
    with _conn() as conn:
        rows = conn.execute(
            "SELECT id, receipt_start, receipt_end, winner_date FROM announcements WHERE status != '완료'"
        ).fetchall()
        for r in rows:
            new_status = compute_status(r["receipt_start"], r["receipt_end"], r["winner_date"])
            if new_status != "완료":
                conn.execute("UPDATE announcements SET status = ? WHERE id = ?", (new_status, r["id"]))
            else:
                # Only the transition to completed also bumps updated_at
                conn.execute(
                    "UPDATE announcements SET status = ?, updated_at = strftime('%Y-%m-%dT%H:%M:%fZ','now') WHERE id = ?",
                    (new_status, r["id"]),
                )


# ── announcement_models CRUD ─────────────────────────────────────────────────
def upsert_model(data: Dict[str, Any]):
    with _conn() as conn:
        conn.execute("""
            INSERT INTO announcement_models (
                house_manage_no, pblanc_no, model_no, house_ty, supply_area,
                general_units, special_units, multi_child_units, newlywed_units,
                first_life_units, old_parent_units, institution_units,
                youth_units, newborn_units, top_amount
            ) VALUES (
                :house_manage_no, :pblanc_no, :model_no, :house_ty, :supply_area,
                :general_units, :special_units, :multi_child_units, :newlywed_units,
                :first_life_units, :old_parent_units, :institution_units,
                :youth_units, :newborn_units, :top_amount
            )
            ON CONFLICT(house_manage_no, pblanc_no, model_no) DO UPDATE SET
                house_ty=excluded.house_ty,
                supply_area=excluded.supply_area,
                general_units=excluded.general_units,
                special_units=excluded.special_units,
                multi_child_units=excluded.multi_child_units,
                newlywed_units=excluded.newlywed_units,
                first_life_units=excluded.first_life_units,
                old_parent_units=excluded.old_parent_units,
                institution_units=excluded.institution_units,
                youth_units=excluded.youth_units,
                newborn_units=excluded.newborn_units,
                top_amount=excluded.top_amount
        """, data)


# ── user_profile CRUD ────────────────────────────────────────────────────────
def _profile_row_to_dict(r) -> Dict[str, Any]:
    d = {}
    for c in r.keys():
        val = r[c]
        if c in ("is_homeless", "is_householder", "has_dependents",
                 "is_newlywed", "has_newborn", "is_first_home"):
            # SQLite stores booleans as 0/1; keep NULL as None
            d[c] = bool(val) if val is not None else None
        elif c in ("preferred_regions", "preferred_types"):
            # List fields are stored as JSON text
            d[c] = json.loads(val) if val else []
        else:
            d[c] = val
    return d


def get_profile() -> Optional[Dict[str, Any]]:
    with _conn() as conn:
        r = conn.execute("SELECT * FROM user_profile WHERE id = 1").fetchone()
    return _profile_row_to_dict(r) if r else None


def upsert_profile(data: Dict[str, Any]) -> Dict[str, Any]:
    updates = {}
    for k, v in data.items():
        if v is None:
            continue
        if isinstance(v, bool):
            updates[k] = 1 if v else 0
        elif isinstance(v, list):
            updates[k] = json.dumps(v)
        else:
            updates[k] = v

    with _conn() as conn:
        existing = conn.execute("SELECT id FROM user_profile WHERE id = 1").fetchone()
        if existing:
            if updates:
                set_clauses = ", ".join(f"{k} = ?" for k in updates)
                set_clauses += ", updated_at = strftime('%Y-%m-%dT%H:%M:%fZ','now')"
                conn.execute(
                    f"UPDATE user_profile SET {set_clauses} WHERE id = 1",
                    list(updates.values()),
                )
        else:
            cols = ["id"] + list(updates.keys())
            vals = [1] + list(updates.values())
            placeholders = ", ".join("?"
                                     for _ in vals)
            conn.execute(
                f"INSERT INTO user_profile ({', '.join(cols)}) VALUES ({placeholders})",
                vals,
            )
        row = conn.execute("SELECT * FROM user_profile WHERE id = 1").fetchone()
    return _profile_row_to_dict(row)


# ── match_results CRUD ───────────────────────────────────────────────────────
def save_match_result(data: Dict[str, Any]):
    with _conn() as conn:
        conn.execute("""
            INSERT INTO match_results
                (announcement_id, model_id, match_score, match_reasons, eligible_types, is_new)
            VALUES
                (:announcement_id, :model_id, :match_score, :match_reasons, :eligible_types, 1)
            ON CONFLICT(announcement_id, model_id) DO UPDATE SET
                match_score=excluded.match_score,
                match_reasons=excluded.match_reasons,
                eligible_types=excluded.eligible_types
        """, {
            **data,
            "match_reasons": json.dumps(data.get("match_reasons", [])),
            "eligible_types": json.dumps(data.get("eligible_types", [])),
        })


def get_matches(page: int = 1, size: int = 20) -> Dict[str, Any]:
    offset = (page - 1) * size
    with _conn() as conn:
        total = conn.execute("SELECT COUNT(*) FROM match_results").fetchone()[0]
        rows = conn.execute("""
            SELECT m.*, a.house_nm, a.region_name, a.address, a.status as ann_status,
                   a.receipt_start, a.receipt_end, a.winner_date, a.pblanc_url
            FROM match_results m
            JOIN announcements a ON a.id = m.announcement_id
            ORDER BY m.is_new DESC, m.match_score DESC
            LIMIT ? OFFSET ?
""", (size, offset)).fetchall() items = [] for r in rows: d = {c: r[c] for c in r.keys()} d["match_reasons"] = json.loads(d["match_reasons"]) if d["match_reasons"] else [] d["eligible_types"] = json.loads(d["eligible_types"]) if d["eligible_types"] else [] items.append(d) return {"items": items, "total": total, "page": page, "size": size} def mark_match_read(match_id: int) -> bool: with _conn() as conn: cur = conn.execute("UPDATE match_results SET is_new = 0 WHERE id = ?", (match_id,)) return cur.rowcount > 0 def clear_match_results(): with _conn() as conn: conn.execute("DELETE FROM match_results") # ── collect_log CRUD ───────────────────────────────────────────────────────── def save_collect_log(new_count: int, total_count: int, error: str = None): with _conn() as conn: conn.execute( "INSERT INTO collect_log (new_count, total_count, error) VALUES (?, ?, ?)", (new_count, total_count, error), ) def get_last_collect_log() -> Optional[Dict[str, Any]]: with _conn() as conn: r = conn.execute("SELECT * FROM collect_log ORDER BY id DESC LIMIT 1").fetchone() return dict(r) if r else None # ── 대시보드 ───────────────────────────────────────────────────────────────── def get_dashboard() -> Dict[str, Any]: with _conn() as conn: active = conn.execute( "SELECT COUNT(*) FROM announcements WHERE status IN ('청약예정', '청약중')" ).fetchone()[0] new_matches = conn.execute( "SELECT COUNT(*) FROM match_results WHERE is_new = 1" ).fetchone()[0] upcoming = conn.execute(""" SELECT id, house_nm, receipt_start, receipt_end, status FROM announcements WHERE status IN ('청약예정', '청약중') ORDER BY receipt_start ASC LIMIT 5 """).fetchall() return { "active_count": active, "new_match_count": new_matches, "upcoming": [dict(r) for r in upcoming], } ``` - [ ] **Step 2: Commit** ```bash git add realestate-lab/app/db.py git commit -m "feat(realestate-lab): DB 레이어 — 테이블 생성 + 전체 CRUD" ``` --- ### Task 4: 수집기 (collector.py) **Files:** - Create: `realestate-lab/app/collector.py` - [ ] **Step 1: collector.py 작성** 
```python
# realestate-lab/app/collector.py
import os
import logging
import requests
from typing import List, Dict, Any

from .db import upsert_announcement, upsert_model, save_collect_log

logger = logging.getLogger("realestate-lab")

API_BASE = "https://api.odcloud.kr/api/ApplyhomeInfoDetailSvc/v1"
API_KEY = os.getenv("DATA_GO_KR_API_KEY", "")

# Endpoints to collect, as (announcement detail, per-housing-type) pairs
DETAIL_ENDPOINTS = [
    ("getAPTLttotPblancDetail", "getAPTLttotPblancMdl"),
    ("getUrbtyOfctlLttotPblancDetail", "getUrbtyOfctlLttotPblancMdl"),
    ("getRemndrLttotPblancDetail", "getRemndrLttotPblancMdl"),
    ("getPblPvtRentLttotPblancDetail", "getPblPvtRentLttotPblancMdl"),
    ("getOPTLttotPblancDetail", "getOPTLttotPblancMdl"),
]


def _api_call(endpoint: str, params: dict = None) -> List[Dict[str, Any]]:
    """Call the public data portal API, following pagination automatically."""
    if not API_KEY:
        logger.warning("DATA_GO_KR_API_KEY 미설정 — API 수집 건너뜀")
        return []

    url = f"{API_BASE}/{endpoint}"
    base_params = {
        "serviceKey": API_KEY,
        "perPage": 100,
        "returnType": "JSON",
    }
    if params:
        base_params.update(params)

    all_data = []
    page = 1
    while True:
        base_params["page"] = page
        try:
            resp = requests.get(url, params=base_params, timeout=30)
            resp.raise_for_status()
            body = resp.json()
        except Exception as e:
            # On failure, stop paging and return whatever was fetched so far
            logger.error(f"API 호출 실패: {endpoint} page={page} — {e}")
            break

        data = body.get("data", [])
        if not data:
            break
        all_data.extend(data)

        total = body.get("totalCount", 0)
        if len(all_data) >= total:
            break
        page += 1

    return all_data


def _parse_apt_detail(raw: Dict[str, Any]) -> Dict[str, Any]:
    """Convert an APT detail API record to the announcements schema."""
    return {
        "house_manage_no": str(raw.get("HOUSE_MANAGE_NO", "")),
        "pblanc_no": str(raw.get("PBLANC_NO", "")),
        "house_nm": raw.get("HOUSE_NM"),
        "house_secd": raw.get("HOUSE_SECD"),
        "house_dtl_secd": raw.get("HOUSE_DTL_SECD"),
        "rent_secd": raw.get("RENT_SECD"),
        "region_code": raw.get("SUBSCRPT_AREA_CODE"),
        "region_name": raw.get("SUBSCRPT_AREA_CODE_NM"),
        "address": raw.get("HSSPLY_ADRES"),
        "total_units": raw.get("TOT_SUPLY_HSHLDCO"),
"rcrit_date": raw.get("RCRIT_PBLANC_DE"), "receipt_start": raw.get("RCEPT_BGNDE") or raw.get("SUBSCRPT_RCEPT_BGNDE"), "receipt_end": raw.get("RCEPT_ENDDE") or raw.get("SUBSCRPT_RCEPT_ENDDE"), "spsply_start": raw.get("SPSPLY_RCEPT_BGNDE"), "spsply_end": raw.get("SPSPLY_RCEPT_ENDDE"), "gnrl_rank1_start": raw.get("GNRL_RNK1_CRSPAREA_RCPTDE") or raw.get("GNRL_RCEPT_BGNDE"), "gnrl_rank1_end": raw.get("GNRL_RNK1_CRSPAREA_ENDDE") or raw.get("GNRL_RCEPT_ENDDE"), "winner_date": raw.get("PRZWNER_PRESNATN_DE"), "contract_start": raw.get("CNTRCT_CNCLS_BGNDE"), "contract_end": raw.get("CNTRCT_CNCLS_ENDDE"), "homepage_url": raw.get("HMPG_ADRES"), "pblanc_url": raw.get("PBLANC_URL"), "constructor": raw.get("CNSTRCT_ENTRPS_NM"), "developer": raw.get("BSNS_MBY_NM"), "move_in_month": raw.get("MVN_PREARNGE_YM"), "is_speculative_area": raw.get("SPECLT_RDN_EARTH_AT"), "is_price_cap": raw.get("PARCPRC_ULS_AT"), "contact": raw.get("MDHS_TELNO"), "source": "auto", } def _parse_model(raw: Dict[str, Any]) -> Dict[str, Any]: """주택형별 API 응답을 announcement_models 스키마로 변환.""" top = raw.get("LTTOT_TOP_AMOUNT") if isinstance(top, str): top = int(top.replace(",", "")) if top.strip() else None return { "house_manage_no": str(raw.get("HOUSE_MANAGE_NO", "")), "pblanc_no": str(raw.get("PBLANC_NO", "")), "model_no": raw.get("MODEL_NO"), "house_ty": raw.get("HOUSE_TY"), "supply_area": float(raw["SUPLY_AR"]) if raw.get("SUPLY_AR") else None, "general_units": raw.get("SUPLY_HSHLDCO", 0) or 0, "special_units": raw.get("SPSPLY_HSHLDCO", 0) or 0, "multi_child_units": raw.get("MNYCH_HSHLDCO", 0) or 0, "newlywed_units": raw.get("NWWDS_HSHLDCO", 0) or 0, "first_life_units": raw.get("LFE_FRST_HSHLDCO", 0) or 0, "old_parent_units": raw.get("OLD_PARNTS_SUPORT_HSHLDCO", 0) or 0, "institution_units": raw.get("INSTT_RECOMEND_HSHLDCO", 0) or 0, "youth_units": raw.get("YGMN_HSHLDCO", 0) or 0, "newborn_units": raw.get("NWBB_HSHLDCO", 0) or 0, "top_amount": top, } def collect_all() -> Dict[str, int]: """전체 수집 실행. 
    Returns: {"new_count": N, "total_count": N}"""
    if not API_KEY:
        logger.warning("DATA_GO_KR_API_KEY 미설정 — 수집 건너뜀")
        save_collect_log(0, 0, "API 키 미설정")
        return {"new_count": 0, "total_count": 0}

    total_count = 0
    new_count = 0
    errors = []

    for detail_ep, model_ep in DETAIL_ENDPOINTS:
        try:
            # Collect announcement details
            details = _api_call(detail_ep)
            for raw in details:
                parsed = _parse_apt_detail(raw)
                if not parsed["house_manage_no"]:
                    continue
                row = upsert_announcement(parsed)
                total_count += 1
                # A freshly inserted row still has created_at == updated_at;
                # the upsert's update path bumps updated_at, so only new rows count.
                if row["created_at"] == row["updated_at"]:
                    new_count += 1

            # Collect per-housing-type details
            models = _api_call(model_ep)
            for raw in models:
                parsed = _parse_model(raw)
                if not parsed["house_manage_no"]:
                    continue
                upsert_model(parsed)
        except Exception as e:
            logger.error(f"수집 에러 ({detail_ep}): {e}")
            errors.append(f"{detail_ep}: {str(e)}")

    error_msg = "; ".join(errors) if errors else None
    save_collect_log(new_count, total_count, error_msg)
    logger.info(f"수집 완료: total={total_count}, new={new_count}, errors={len(errors)}")
    return {"new_count": new_count, "total_count": total_count}
```

- [ ] **Step 2: Commit**

```bash
git add realestate-lab/app/collector.py
git commit -m "feat(realestate-lab): public data portal API collector"
```

---

### Task 5: Matching engine (matcher.py)

**Files:**
- Create: `realestate-lab/app/matcher.py`

- [ ] **Step 1: Write matcher.py**

```python
# realestate-lab/app/matcher.py
import json
import logging
from typing import Dict, Any, List

from .db import (
    get_profile, get_announcements, save_match_result,
    clear_match_results, _conn,
)

logger = logging.getLogger("realestate-lab")


def _check_eligible_types(profile: Dict[str, Any], ann: Dict[str, Any]) -> List[str]:
    """Determine which supply types the profile can apply for."""
    types = []
    is_homeless = profile.get("is_homeless", False)
    is_householder = profile.get("is_householder", False)
    sub_months = profile.get("subscription_months", 0) or 0
    is_speculative = ann.get("is_speculative_area", "") == "Y"

    # General supply, rank 1 (24 months required in speculative areas, else 12)
    required_months = 24 if is_speculative else 12
    if is_homeless and is_householder and sub_months >= required_months:
        types.append("일반1순위")
    elif is_homeless:
        types.append("일반2순위")

    # Special supply: newlyweds
    if profile.get("is_newlywed") and is_homeless:
        types.append("특별-신혼부부")

    # Special supply: first-time buyer
    if profile.get("is_first_home") and is_homeless:
        types.append("특별-생애최초")

    # Special supply: multiple children
    children = profile.get("children_count", 0) or 0
    if children >= 2 and is_homeless:
        types.append("특별-다자녀")

    # Special supply: supporting elderly parents
    if profile.get("has_dependents") and is_homeless:
        types.append("특별-노부모부양")

    # Special supply: youth
    age = profile.get("age", 0) or 0
    if 19 <= age <= 39 and is_homeless:
        types.append("특별-청년")

    # Special supply: newborn
    if profile.get("has_newborn") and is_homeless:
        types.append("특별-신생아")

    return types


def _compute_score(profile: Dict[str, Any], ann: Dict[str, Any], models: List[Dict]) -> Dict[str, Any]:
    """Compute the match score (0 to 100)."""
    score = 0
    reasons = []

    # 1. Region match (30 points)
    pref_regions = profile.get("preferred_regions", [])
    if pref_regions and ann.get("region_name"):
        if ann["region_name"] in pref_regions:
            score += 30
            reasons.append(f"지역 일치: {ann['region_name']}")

    # 2. Housing type match (10 points)
    pref_types = profile.get("preferred_types", [])
    type_map = {"01": "APT", "02": "오피스텔", "04": "무순위", "09": "민간사전청약", "10": "신혼희망타운"}
    ann_type = type_map.get(ann.get("house_secd", ""), ann.get("house_secd", ""))
    if pref_types and ann_type in pref_types:
        score += 10
        reasons.append(f"유형 일치: {ann_type}")

    # 3. Area match (15 points): the first model inside the range scores
    min_area = profile.get("min_area")
    max_area = profile.get("max_area")
    if models and (min_area is not None or max_area is not None):
        for m in models:
            area = m.get("supply_area", 0) or 0
            in_range = True
            if min_area and area < min_area:
                in_range = False
            if max_area and area > max_area:
                in_range = False
            if in_range and area > 0:
                score += 15
                reasons.append(f"면적 범위 내: {area}㎡")
                break

    # 4. Price match (15 points): the first model within budget scores
    max_price = profile.get("max_price")
    if models and max_price:
        for m in models:
            top = m.get("top_amount")
            if top and top <= max_price:
                score += 15
                reasons.append(f"가격 범위 내: {top}만원")
                break

    # 5. Eligibility match (30 points): 10 per eligible supply type, capped at 30
    eligible = _check_eligible_types(profile, ann)
    if eligible:
        eligibility_score = min(len(eligible) * 10, 30)
        score += eligibility_score
        reasons.append(f"지원 가능: {', '.join(eligible)}")

    return {
        "match_score": score,
        "match_reasons": reasons,
        "eligible_types": eligible,
    }


def run_matching():
    """Run a full match of the profile against every active announcement."""
    profile = get_profile()
    if not profile:
        logger.info("프로필 미설정 — 매칭 건너뜀")
        return

    # Reset previous match results
    clear_match_results()

    with _conn() as conn:
        anns = conn.execute(
            "SELECT * FROM announcements WHERE status IN ('청약예정', '청약중')"
        ).fetchall()

        for ann_row in anns:
            ann = {c: ann_row[c] for c in ann_row.keys()}
            models = conn.execute(
                "SELECT * FROM announcement_models WHERE house_manage_no = ? AND pblanc_no = ?",
                (ann["house_manage_no"], ann["pblanc_no"]),
            ).fetchall()
            model_list = [dict(m) for m in models]

            result = _compute_score(profile, ann, model_list)
            if result["match_score"] > 0:
                save_match_result({
                    "announcement_id": ann["id"],
                    "model_id": None,
                    "match_score": result["match_score"],
                    "match_reasons": result["match_reasons"],
                    "eligible_types": result["eligible_types"],
                })

    logger.info("매칭 완료")
```

- [ ] **Step 2: Commit**

```bash
git add realestate-lab/app/matcher.py
git commit -m "feat(realestate-lab): profile-based matching engine"
```

---

### Task 6: FastAPI app (main.py)

**Files:**
- Create: `realestate-lab/app/main.py`

- [ ] **Step 1: Write main.py**

```python
# realestate-lab/app/main.py
import os
import logging
from contextlib import asynccontextmanager

from fastapi import FastAPI, Query, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from apscheduler.schedulers.background import BackgroundScheduler

from .db import (
    init_db, get_announcements, get_announcement,
    create_announcement, update_announcement, delete_announcement,
    update_all_statuses, get_profile, upsert_profile,
    get_matches, mark_match_read, get_last_collect_log, get_dashboard,
)
from .collector import collect_all
from .matcher import run_matching
from .models import \
    AnnouncementCreate, AnnouncementUpdate, ProfileUpdate

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(name)s] %(levelname)s %(message)s")
logger = logging.getLogger("realestate-lab")

scheduler = BackgroundScheduler(timezone=os.getenv("TZ", "Asia/Seoul"))


def scheduled_collect():
    """Daily at 09:00: collect + match."""
    logger.info("스케줄 수집 시작")
    collect_all()
    run_matching()
    logger.info("스케줄 수집 + 매칭 완료")


def scheduled_status_update():
    """Daily at 00:00: refresh statuses + re-match."""
    logger.info("상태 갱신 시작")
    update_all_statuses()
    run_matching()
    logger.info("상태 갱신 + 재매칭 완료")


@asynccontextmanager
async def lifespan(app: FastAPI):
    init_db()
    scheduler.add_job(scheduled_collect, "cron", hour=9, minute=0, id="collect")
    scheduler.add_job(scheduled_status_update, "cron", hour=0, minute=0, id="status_update")
    scheduler.start()
    logger.info("realestate-lab 시작")
    yield
    scheduler.shutdown()


app = FastAPI(lifespan=lifespan)

_cors_origins = os.getenv("CORS_ALLOW_ORIGINS", "http://localhost:3007,http://localhost:8080").split(",")
app.add_middleware(
    CORSMiddleware,
    allow_origins=[o.strip() for o in _cors_origins],
    allow_credentials=False,
    allow_methods=["GET", "POST", "PUT", "PATCH", "DELETE", "OPTIONS"],
    allow_headers=["Content-Type"],
)


@app.get("/health")
def health():
    return {"status": "ok"}


# ── Announcements API ────────────────────────────────────────────────────────
@app.get("/api/realestate/announcements")
def api_announcements(
    region: str = None,
    status: str = None,
    house_type: str = None,
    matched_only: bool = False,
    sort: str = "date",
    page: int = Query(1, ge=1),
    size: int = Query(20, ge=1, le=100),
):
    return get_announcements(region, status, house_type, matched_only, sort, page, size)


@app.get("/api/realestate/announcements/{ann_id}")
def api_announcement_detail(ann_id: int):
    ann = get_announcement(ann_id)
    if not ann:
        raise HTTPException(status_code=404, detail="Announcement not found")
    return ann


@app.post("/api/realestate/announcements", status_code=201)
def api_announcement_create(body:
AnnouncementCreate): return create_announcement(body.model_dump()) @app.put("/api/realestate/announcements/{ann_id}") def api_announcement_update(ann_id: int, body: AnnouncementUpdate): updated = update_announcement(ann_id, body.model_dump(exclude_none=True)) if not updated: raise HTTPException(status_code=404, detail="Announcement not found") return updated @app.delete("/api/realestate/announcements/{ann_id}") def api_announcement_delete(ann_id: int): if not delete_announcement(ann_id): raise HTTPException(status_code=404, detail="Announcement not found") return {"ok": True} # ── 수집 API ───────────────────────────────────────────────────────────────── @app.post("/api/realestate/collect") def api_collect(): result = collect_all() run_matching() return result @app.get("/api/realestate/collect/status") def api_collect_status(): log = get_last_collect_log() return log if log else {"collected_at": None, "new_count": 0, "total_count": 0, "error": None} # ── 프로필 API ─────────────────────────────────────────────────────────────── @app.get("/api/realestate/profile") def api_profile_get(): profile = get_profile() return profile if profile else {} @app.put("/api/realestate/profile") def api_profile_update(body: ProfileUpdate): return upsert_profile(body.model_dump(exclude_none=True)) # ── 매칭 API ───────────────────────────────────────────────────────────────── @app.get("/api/realestate/matches") def api_matches(page: int = Query(1, ge=1), size: int = Query(20, ge=1, le=100)): return get_matches(page, size) @app.post("/api/realestate/matches/refresh") def api_matches_refresh(): run_matching() return {"ok": True} @app.patch("/api/realestate/matches/{match_id}/read") def api_match_read(match_id: int): if not mark_match_read(match_id): raise HTTPException(status_code=404, detail="Match not found") return {"ok": True} # ── 대시보드 API ───────────────────────────────────────────────────────────── @app.get("/api/realestate/dashboard") def api_dashboard(): return get_dashboard() ``` - 
[ ] **Step 2: Commit** ```bash git add realestate-lab/app/main.py git commit -m "feat(realestate-lab): FastAPI 앱 + 스케줄러 + 전체 API 라우트" ``` --- ### Task 7: 인프라 통합 (docker-compose, nginx, deploy script) **Files:** - Modify: `docker-compose.yml` - Modify: `nginx/default.conf` - Modify: `scripts/deploy-nas.sh` - [ ] **Step 1: docker-compose.yml에 realestate-lab 서비스 추가** `blog-lab` 서비스 블록 뒤, `travel-proxy` 블록 앞에 추가: ```yaml realestate-lab: build: context: ./realestate-lab container_name: realestate-lab restart: unless-stopped ports: - "18800:8000" environment: - TZ=${TZ:-Asia/Seoul} - DATA_GO_KR_API_KEY=${DATA_GO_KR_API_KEY:-} - CORS_ALLOW_ORIGINS=${CORS_ALLOW_ORIGINS:-http://localhost:3007,http://localhost:8080} volumes: - ${REALESTATE_DATA_PATH:-./data/realestate}:/app/data healthcheck: test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"] interval: 30s timeout: 5s retries: 3 ``` - [ ] **Step 2: nginx/default.conf에 realestate 프록시 추가** `/api/music/` 블록 뒤에 추가: ```nginx # realestate API location /api/realestate/ { proxy_http_version 1.1; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_set_header X-Forwarded-Proto $scheme; proxy_pass http://realestate-lab:8000/api/realestate/; } ``` - [ ] **Step 3: scripts/deploy-nas.sh rsync 대상에 realestate-lab 추가** 변경 전: ```bash for dir in backend travel-proxy deployer stock-lab music-lab blog-lab nginx scripts; do ``` 변경 후: ```bash for dir in backend travel-proxy deployer stock-lab music-lab blog-lab realestate-lab nginx scripts; do ``` - [ ] **Step 4: nginx frontend 컨테이너 depends_on에 realestate-lab 추가** ```yaml depends_on: - music-lab - blog-lab - realestate-lab ``` - [ ] **Step 5: Commit** ```bash git add docker-compose.yml nginx/default.conf scripts/deploy-nas.sh git commit -m "infra: realestate-lab Docker/Nginx/배포 스크립트 통합" ``` --- ### Task 8: lotto-backend에서 청약 코드 제거 **Files:** - Modify: 
`backend/app/main.py` — 라인 22~27 import, 881~957 (realestate), 960~1064 (subscription) - Modify: `backend/app/db.py` — 라인 184~214 (realestate_complexes 테이블), 216~300 (subscription 테이블), 837~963 (realestate CRUD), 966~1327 (subscription CRUD) - [ ] **Step 1: backend/app/main.py에서 청약 import 제거** 라인 22~27 변경 — `# realestate` 와 `# subscription` import 블록 삭제: 변경 전: ```python # realestate get_all_complexes, get_complex, create_complex, update_complex, delete_complex, # subscription get_all_subscription_items, create_subscription_item, update_subscription_item, delete_subscription_item, get_subscription_profile, upsert_subscription_profile, ``` 변경 후: (해당 6줄 삭제) - [ ] **Step 2: backend/app/main.py에서 RealEstate API 섹션 삭제 (라인 881~957)** `# ── RealEstate API ──` 부터 `return {"ok": True}` (957번 라인)까지 전체 삭제: - `VALID_STATUSES`, `VALID_PRIORITIES` 상수 - `ComplexCreate`, `ComplexUpdate` 모델 - `api_realestate_list`, `api_realestate_create`, `api_realestate_update`, `api_realestate_delete` 라우트 - [ ] **Step 3: backend/app/main.py에서 Subscription API 섹션 삭제 (라인 960~1064)** `# ── Subscription API ──` 부터 파일 끝 `upsert_subscription_profile` 호출까지 전체 삭제: - `SubscriptionItemCreate`, `SubscriptionItemUpdate`, `SubscriptionProfile` 모델 - `api_subscription_list`, `api_subscription_create`, `api_subscription_update`, `api_subscription_delete` 라우트 - `api_subscription_profile_get`, `api_subscription_profile_put` 라우트 - [ ] **Step 4: backend/app/db.py에서 realestate_complexes 테이블 생성 삭제 (라인 184~214)** `# ── realestate_complexes 테이블 ──` 블록 전체 삭제 (CREATE TABLE + CREATE INDEX) - [ ] **Step 5: backend/app/db.py에서 subscription_items 테이블 생성 삭제 (라인 216~252)** `# ── subscription_items 테이블 ──` 블록 전체 삭제 (CREATE TABLE + CREATE INDEX) - [ ] **Step 6: backend/app/db.py에서 subscription_profile 테이블 생성 삭제 (라인 282~300)** `# ── subscription_profile 테이블 ──` 블록 전체 삭제 - [ ] **Step 7: backend/app/db.py에서 realestate_complexes CRUD 삭제 (라인 837~963)** `# ── realestate_complexes CRUD ──` 부터 `delete_complex` 함수 끝까지 전체 삭제: - 
`_complex_row_to_dict`, `get_all_complexes`, `get_complex`, `create_complex`, `update_complex`, `delete_complex` - [ ] **Step 8: backend/app/db.py에서 subscription CRUD 삭제 (라인 966~1327)** `# ── subscription_items CRUD ──` 부터 `upsert_subscription_profile` 함수 끝까지 전체 삭제: - `_SUB_ITEM_FIELD_MAP`, `_sub_item_row_to_dict`, `get_all_subscription_items`, `create_subscription_item`, `update_subscription_item`, `delete_subscription_item` - `_profile_row_to_dict`, `get_subscription_profile`, `upsert_subscription_profile` - [ ] **Step 9: Commit** ```bash git add backend/app/main.py backend/app/db.py git commit -m "refactor(lotto-backend): 청약 관련 코드 완전 제거 — realestate-lab으로 이관" ``` --- ### Task 9: CLAUDE.md 업데이트 **Files:** - Modify: `CLAUDE.md` - [ ] **Step 1: CLAUDE.md 서비스 목록에 realestate-lab 추가** Docker 서비스 & 포트 테이블에 추가: ``` | `realestate-lab` | 18800 | 부동산 청약 자동 수집·매칭 API | ``` Nginx 라우팅 규칙 테이블에 추가: ``` | `/api/realestate/` | `realestate-lab:8000` | 부동산 청약 API | ``` 서비스별 핵심 정보 섹션에 realestate-lab 추가. lotto-lab 테이블 목록에서 `realestate_complexes`, `subscription_items`, `subscription_profile` 참조 삭제 (해당 테이블이 lotto.db 테이블 목록에 있는 경우). 
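
Before committing, a quick scan can confirm that no reference to the removed tables is left in `CLAUDE.md`. The following is a minimal sketch, not part of the plan itself: `REMOVED_TABLES` and `find_stale_references` are hypothetical helper names, and only the three table names come from the steps above.

```python
import re

# Table names taken from the plan; the helper itself is illustrative.
REMOVED_TABLES = ("realestate_complexes", "subscription_items", "subscription_profile")


def find_stale_references(text, names=REMOVED_TABLES):
    """Return (line_number, table_name) pairs for each removed table still mentioned."""
    hits = []
    for lineno, line in enumerate(text.splitlines(), start=1):
        for name in names:
            # Word boundaries keep function names such as get_subscription_profile
            # from being reported as a table reference.
            if re.search(rf"\b{re.escape(name)}\b", line):
                hits.append((lineno, name))
    return hits
```

Calling it with the contents of `CLAUDE.md` (for example `find_stale_references(Path("CLAUDE.md").read_text(encoding="utf-8"))`) should return an empty list once Step 1 is done.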
- [ ] **Step 2: Commit**

```bash
git add CLAUDE.md
git commit -m "docs: add realestate-lab service info to CLAUDE.md"
```

---

### Task 10: Local build verification

- [ ] **Step 1: Docker build test**

```bash
cd C:/Users/jaeoh/Desktop/workspace/web-backend
docker compose build realestate-lab
```
Expected: build succeeds

- [ ] **Step 2: Start the service on its own**

```bash
docker compose up -d realestate-lab
```

- [ ] **Step 3: Health check**

```bash
curl http://localhost:18800/health
```
Expected: `{"status":"ok"}`

- [ ] **Step 4: Profile API test**

```bash
curl -X PUT http://localhost:18800/api/realestate/profile \
  -H "Content-Type: application/json" \
  -d '{"name":"test","age":30,"is_homeless":true,"preferred_regions":["서울"]}'
```
Expected: profile JSON response

- [ ] **Step 5: Manual collection test**

```bash
curl -X POST http://localhost:18800/api/realestate/collect
```
Expected: `{"new_count": N, "total_count": N}` (real data is collected when the API key is set)

- [ ] **Step 6: Dashboard test**

```bash
curl http://localhost:18800/api/realestate/dashboard
```
Expected: `{"active_count": N, "new_match_count": N, "upcoming": [...]}`

- [ ] **Step 7: Confirm lotto-backend still works**

```bash
curl http://localhost:18000/health
```
Expected: `{"status":"ok"}` (still healthy after the subscription code is removed)
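
The curl checks above could also be scripted so they can be rerun after each change. This is a rough sketch using only the standard library. The port, paths, and expected keys are taken from the steps above, while `CHECKS`, `missing_keys`, `run_check`, and `run_all` are hypothetical names. It assumes each endpoint returns a JSON object, and note that the collect check triggers a real collection run.

```python
import json
import urllib.request

BASE_URL = "http://localhost:18800"  # host port mapped in docker-compose.yml

# (method, path, keys the plan expects in the JSON response)
CHECKS = [
    ("GET", "/health", ["status"]),
    ("POST", "/api/realestate/collect", ["new_count", "total_count"]),
    ("GET", "/api/realestate/dashboard", ["active_count", "new_match_count", "upcoming"]),
]


def missing_keys(payload, expected):
    """Return the expected keys that are absent from a decoded JSON object."""
    return [key for key in expected if key not in payload]


def run_check(method, path, expected, base_url=BASE_URL):
    """Call one endpoint and return its missing keys (an empty list means pass)."""
    request = urllib.request.Request(base_url + path, method=method)
    with urllib.request.urlopen(request, timeout=30) as response:
        payload = json.loads(response.read().decode("utf-8"))
    return missing_keys(payload, expected)


def run_all(base_url=BASE_URL):
    """Run every check and map each path to its list of missing keys."""
    return {path: run_check(method, path, expected, base_url)
            for method, path, expected in CHECKS}
```

With the container running, `run_all()` returns a dict keyed by path; any non-empty list points at a response that lacks a field the plan expects.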