Files
web-page-backend/realestate-lab/app/collector.py
gahusb 9dd517e82a feat(realestate-collector): 30-day window + district extraction + completed skip
- Add _extract_district() helper with DISTRICT_PATTERN regex (서울 only)
- collect_all() now passes RCRIT_PBLANC_DE_FROM param (30-day window) to all detail endpoints
- collect_all() skips announcements where compute_status() returns '완료'
- collect_all() stamps district on each parsed announcement before upsert
- upsert_announcement(): add district to INSERT/VALUES/ON CONFLICT UPDATE; data.setdefault('district', None)
- ANNOUNCEMENT_COLUMNS: add 'district' (closes deferred gap from Task 2 review)
- 9 new tests in realestate-lab/tests/test_collector.py (6 unit + 3 integration)
- Full suite: 22 passed

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-28 08:28:10 +09:00

209 lines
8.1 KiB
Python

import os
import re
import logging
from datetime import date, timedelta
import requests
from typing import List, Dict, Any
from .db import upsert_announcement, upsert_model, save_collect_log, compute_status
# Module-level logger shared by the functions below.
logger = logging.getLogger("realestate-lab")
# Base URL of the detail service; _api_call() appends "/<endpoint>" to it.
API_BASE = "https://api.odcloud.kr/api/ApplyhomeInfoDetailSvc/v1"
# Service key sent as the `serviceKey` query parameter. Read once at import
# time; when the variable is unset this is "" and both _api_call() and
# collect_all() return early without contacting the API.
API_KEY = os.getenv("DATA_GO_KR_API_KEY", "")
# Five (detail endpoint, model endpoint) name pairs. collect_all() walks the
# list in order: the first name of each pair is fetched for announcements,
# the second for the per-model rows.
DETAIL_ENDPOINTS = [
("getAPTLttotPblancDetail", "getAPTLttotPblancMdl"),
("getUrbtyOfctlLttotPblancDetail", "getUrbtyOfctlLttotPblancMdl"),
("getRemndrLttotPblancDetail", "getRemndrLttotPblancMdl"),
("getPblPvtRentLttotPblancDetail", "getPblPvtRentLttotPblancMdl"),
("getOPTLttotPblancDetail", "getOPTLttotPblancMdl"),
]
DISTRICT_PATTERN = re.compile(r"(?:서울특별시|서울시|서울)\s+(\S+?(?:구|군))")
def _extract_district(parsed: Dict[str, Any]) -> str | None:
"""파싱된 공고에서 자치구를 추출. 서울 외 지역·실패 시 None."""
for src in (parsed.get("address"), parsed.get("region_name")):
if not src:
continue
m = DISTRICT_PATTERN.search(src)
if m:
return m.group(1)
return None
def _api_call(endpoint: str, params: Dict[str, Any] | None = None) -> List[Dict]:
    """Fetch every page of ``endpoint`` and return the concatenated rows.

    Args:
        endpoint: Operation name appended to ``API_BASE``.
        params: Optional extra query parameters, merged over the defaults
            (``serviceKey``, ``perPage=100``, ``returnType=JSON``).

    Returns:
        All rows gathered from the ``data`` array of each page. An empty list
        is returned when ``API_KEY`` is unset. If a request fails, the body is
        not valid JSON, or the body is not a JSON object, paging stops and the
        rows collected from earlier pages are returned.
    """
    if not API_KEY:
        logger.warning("DATA_GO_KR_API_KEY 미설정 — API 호출 건너뜀")
        return []

    query: Dict[str, Any] = {
        "serviceKey": API_KEY,
        "perPage": 100,
        "returnType": "JSON",
    }
    if params:
        query.update(params)

    url = f"{API_BASE}/{endpoint}"
    all_data: List[Dict] = []
    page = 1
    while True:
        query["page"] = page
        try:
            resp = requests.get(url, params=query, timeout=30)
            resp.raise_for_status()
            body = resp.json()
        except requests.RequestException as e:
            # NOTE(review): the exception text may contain the full request
            # URL, including serviceKey — consider redacting before logging.
            logger.error("API 호출 실패 [%s page=%d]: %s", endpoint, page, e)
            break
        except ValueError as e:
            logger.error("JSON 파싱 실패 [%s page=%d]: %s", endpoint, page, e)
            break

        if not isinstance(body, dict):
            # Anything other than a JSON object has no data/totalCount to read.
            logger.error(
                "응답 형식 오류 [%s page=%d]: %s", endpoint, page, type(body).__name__
            )
            break

        # `or` also covers an explicit JSON null for either field.
        page_rows = body.get("data") or []
        total_count = body.get("totalCount") or 0
        all_data.extend(page_rows)

        # Stop on an empty page as well: if totalCount overstates what the
        # server actually returns, the count check alone would never be met
        # and the loop would request pages forever.
        if not page_rows or len(all_data) >= total_count:
            break
        page += 1

    logger.info("[%s] %d건 수집", endpoint, len(all_data))
    return all_data
def _parse_apt_detail(raw: Dict[str, Any]) -> Dict[str, Any]:
"""API 응답 필드를 DB 스키마에 맞게 매핑한다."""
return {
"house_manage_no": raw.get("HOUSE_MANAGE_NO", ""),
"pblanc_no": raw.get("PBLANC_NO", ""),
"house_nm": raw.get("HOUSE_NM"),
"house_secd": raw.get("HOUSE_SECD"),
"house_dtl_secd": raw.get("HOUSE_DTL_SECD"),
"rent_secd": raw.get("RENT_SECD"),
"region_code": raw.get("SUBSCRPT_AREA_CODE"),
"region_name": raw.get("SUBSCRPT_AREA_CODE_NM"),
"address": raw.get("HSSPLY_ADRES"),
"total_units": raw.get("TOT_SUPLY_HSHLDCO"),
"rcrit_date": raw.get("RCRIT_PBLANC_DE"),
"receipt_start": raw.get("RCEPT_BGNDE") or raw.get("SUBSCRPT_RCEPT_BGNDE"),
"receipt_end": raw.get("RCEPT_ENDDE") or raw.get("SUBSCRPT_RCEPT_ENDDE"),
"spsply_start": raw.get("SPSPLY_RCEPT_BGNDE"),
"spsply_end": raw.get("SPSPLY_RCEPT_ENDDE"),
"gnrl_rank1_start": raw.get("GNRL_RNK1_CRSPAREA_RCPTDE") or raw.get("GNRL_RCEPT_BGNDE"),
"gnrl_rank1_end": raw.get("GNRL_RNK1_CRSPAREA_ENDDE") or raw.get("GNRL_RCEPT_ENDDE"),
"winner_date": raw.get("PRZWNER_PRESNATN_DE"),
"contract_start": raw.get("CNTRCT_CNCLS_BGNDE"),
"contract_end": raw.get("CNTRCT_CNCLS_ENDDE"),
"homepage_url": raw.get("HMPG_ADRES"),
"pblanc_url": raw.get("PBLANC_URL"),
"constructor": raw.get("CNSTRCT_ENTRPS_NM"),
"developer": raw.get("BSNS_MBY_NM"),
"move_in_month": raw.get("MVN_PREARNGE_YM"),
"is_speculative_area": raw.get("SPECLT_RDN_EARTH_AT"),
"is_price_cap": raw.get("PARCPRC_ULS_AT"),
"contact": raw.get("MDHS_TELNO"),
"source": "auto",
}
def _parse_top_amount(val: Any) -> int | None:
"""최고 금액 문자열에서 콤마를 제거하고 정수로 변환한다."""
if val is None:
return None
try:
return int(str(val).replace(",", ""))
except (ValueError, TypeError):
return None
def _parse_supply_area(val: Any) -> float | None:
    """Convert a supply-area value to float; ``None`` when missing or not numeric."""
    if val is None:
        return None
    try:
        return float(val)
    except (ValueError, TypeError):
        logger.warning("공급면적 파싱 실패: %r", val)
        return None


def _parse_model(raw: Dict[str, Any]) -> Dict[str, Any]:
    """Map one model-API row onto the model column names.

    Args:
        raw: A single row from a model endpoint, keyed by API field name.

    Returns:
        A dict keyed by DB column name. ``house_manage_no`` and ``pblanc_no``
        default to ``""`` when absent, the ``*_units`` counts fall back to
        ``0`` when missing or falsy, and ``supply_area`` / ``top_amount`` are
        ``None`` when missing or unparseable.
    """
    return {
        "house_manage_no": raw.get("HOUSE_MANAGE_NO", ""),
        "pblanc_no": raw.get("PBLANC_NO", ""),
        "model_no": raw.get("MODEL_NO"),
        "house_ty": raw.get("HOUSE_TY"),
        # A blank or malformed area used to raise out of float(), so the
        # caller discarded the whole row; it now degrades to None, the same
        # way top_amount does.
        "supply_area": _parse_supply_area(raw.get("SUPLY_AR")),
        "general_units": raw.get("SUPLY_HSHLDCO") or 0,
        "special_units": raw.get("SPSPLY_HSHLDCO") or 0,
        "multi_child_units": raw.get("MNYCH_HSHLDCO") or 0,
        "newlywed_units": raw.get("NWWDS_HSHLDCO") or 0,
        "first_life_units": raw.get("LFE_FRST_HSHLDCO") or 0,
        "old_parent_units": raw.get("OLD_PARNTS_SUPORT_HSHLDCO") or 0,
        "institution_units": raw.get("INSTT_RECOMEND_HSHLDCO") or 0,
        "youth_units": raw.get("YGMN_HSHLDCO") or 0,
        "newborn_units": raw.get("NWBB_HSHLDCO") or 0,
        "top_amount": _parse_top_amount(raw.get("LTTOT_TOP_AMOUNT")),
    }
def collect_all() -> Dict[str, Any]:
    """Collect announcements and models from every endpoint pair and store them.

    Announcement requests carry a ``RCRIT_PBLANC_DE_FROM`` parameter set to
    30 days before today. Rows without any schedule date, and rows for which
    ``compute_status`` returns '완료', are not stored. A failure on a single
    row is logged and does not stop the run.

    Returns:
        ``{"new_count": ..., "total_count": ...}`` where ``total_count`` is
        the number of announcements upserted and ``new_count`` the number of
        those that ``upsert_announcement`` reported as new. Both are 0 when
        ``API_KEY`` is unset.
    """
    if not API_KEY:
        logger.warning("API 키 미설정 — 수집 중단")
        save_collect_log(0, 0, "API 키 미설정")
        return {"new_count": 0, "total_count": 0}

    window_start = (date.today() - timedelta(days=30)).strftime("%Y%m%d")
    schedule_fields = (
        "receipt_start", "receipt_end", "spsply_start",
        "gnrl_rank1_start", "winner_date", "contract_start",
    )
    stored = 0
    created = 0
    completed_skips = 0

    for detail_ep, model_ep in DETAIL_ENDPOINTS:
        # Announcements: pass the announcement-date window to the API.
        # Per the original author's note, endpoints that do not support the
        # parameter are expected to ignore it without affecting the response.
        announcements = _api_call(
            detail_ep, params={"RCRIT_PBLANC_DE_FROM": window_start}
        )
        for item in announcements:
            try:
                record = _parse_apt_detail(item)
                record["district"] = _extract_district(record)

                # Drop announcements that carry no schedule date at all.
                if not any(record.get(name) for name in schedule_fields):
                    continue

                # Announcements already judged complete are not stored.
                status = compute_status(
                    record.get("receipt_start", "") or "",
                    record.get("receipt_end", "") or "",
                    record.get("winner_date", "") or "",
                )
                if status == "완료":
                    completed_skips += 1
                    continue

                _, is_new = upsert_announcement(record)
                stored += 1
                if is_new:
                    created += 1
            except Exception as e:
                logger.error("공고 upsert 실패 [%s]: %s", detail_ep, e)

        # Models (unit types): fetched without the date window.
        for item in _api_call(model_ep):
            try:
                upsert_model(_parse_model(item))
            except Exception as e:
                logger.error("모델 upsert 실패 [%s]: %s", model_ep, e)

    save_collect_log(created, stored)
    logger.info(
        "수집 완료: new=%d, total=%d, skipped_completed=%d",
        created, stored, completed_skips,
    )
    return {"new_count": created, "total_count": stored}