"""Collector for the ``ApplyhomeInfoDetailSvc`` open API (api.odcloud.kr).

Downloads announcement-detail rows and model rows from each endpoint pair in
``DETAIL_ENDPOINTS`` and persists them through the helpers imported from
``.db``.
"""
import os
import logging
import requests
from typing import List, Dict, Any, Optional

from .db import upsert_announcement, upsert_model, save_collect_log

logger = logging.getLogger("realestate-lab")

API_BASE = "https://api.odcloud.kr/api/ApplyhomeInfoDetailSvc/v1"
API_KEY = os.getenv("DATA_GO_KR_API_KEY", "")

# 5 detail+model endpoint pairs
DETAIL_ENDPOINTS = [
    ("getAPTLttotPblancDetail", "getAPTLttotPblancMdl"),
    ("getUrbtyOfctlLttotPblancDetail", "getUrbtyOfctlLttotPblancMdl"),
    ("getRemndrLttotPblancDetail", "getRemndrLttotPblancMdl"),
    ("getPblPvtRentLttotPblancDetail", "getPblPvtRentLttotPblancMdl"),
    ("getOPTLttotPblancDetail", "getOPTLttotPblancMdl"),
]


def _api_call(endpoint: str, params: Optional[Dict[str, Any]] = None) -> List[Dict]:
    """Fetch every page of ``endpoint`` and return the concatenated rows.

    Args:
        endpoint: Operation name appended to ``API_BASE``.
        params: Extra query parameters. They are merged over the defaults
            (``serviceKey``, ``perPage``, ``returnType``) and win on collision.

    Returns:
        The rows found under the ``data`` key of each page, in page order.
        Collection is best effort: on a request failure, an unparsable body or
        an unexpected body shape the rows gathered so far are returned. An
        empty list is returned when ``API_KEY`` is not set.
    """
    if not API_KEY:
        logger.warning("DATA_GO_KR_API_KEY 미설정 — API 호출 건너뜀")
        return []

    query: Dict[str, Any] = {
        "serviceKey": API_KEY,
        "perPage": 100,
        "returnType": "JSON",
    }
    if params:
        query.update(params)

    url = f"{API_BASE}/{endpoint}"
    all_data: List[Dict] = []
    page = 1

    while True:
        query["page"] = page
        try:
            resp = requests.get(url, params=query, timeout=30)
            resp.raise_for_status()
        except requests.RequestException as e:
            logger.error("API 호출 실패 [%s page=%d]: %s", endpoint, page, e)
            break

        # Decoded in its own try block: recent requests versions raise a JSON
        # error that is also a RequestException, which would otherwise be
        # reported by the handler above as a call failure.
        try:
            body = resp.json()
        except ValueError as e:
            logger.error("JSON 파싱 실패 [%s page=%d]: %s", endpoint, page, e)
            break

        if not isinstance(body, dict):
            logger.error(
                "예상치 못한 응답 형식 [%s page=%d]: %s",
                endpoint, page, type(body).__name__,
            )
            break

        # `or` also covers keys that are present but null.
        data = body.get("data") or []
        total_count = body.get("totalCount") or 0
        all_data.extend(data)

        # Stop on an empty page as well: if totalCount is larger than what the
        # server actually serves, the count check alone would never be met and
        # the loop would keep requesting pages forever.
        if not data or len(all_data) >= total_count:
            break
        page += 1

    logger.info("[%s] %d건 수집", endpoint, len(all_data))
    return all_data


def _parse_apt_detail(raw: Dict[str, Any]) -> Dict[str, Any]:
    """Map an announcement-detail API row onto the DB column names.

    Missing keys become ``None``, except ``house_manage_no`` and ``pblanc_no``
    which default to ``""``. Where two source fields are tried with ``or``,
    the second is used when the first is absent or falsy. ``source`` is always
    ``"auto"``.
    """
    return {
        "house_manage_no": raw.get("HOUSE_MANAGE_NO", ""),
        "pblanc_no": raw.get("PBLANC_NO", ""),
        "house_nm": raw.get("HOUSE_NM"),
        "house_secd": raw.get("HOUSE_SECD"),
        "house_dtl_secd": raw.get("HOUSE_DTL_SECD"),
        "rent_secd": raw.get("RENT_SECD"),
        "region_code": raw.get("SUBSCRPT_AREA_CODE"),
        "region_name": raw.get("SUBSCRPT_AREA_CODE_NM"),
        "address": raw.get("HSSPLY_ADRES"),
        "total_units": raw.get("TOT_SUPLY_HSHLDCO"),
        "rcrit_date": raw.get("RCRIT_PBLANC_DE"),
        "receipt_start": raw.get("RCEPT_BGNDE") or raw.get("SUBSCRPT_RCEPT_BGNDE"),
        "receipt_end": raw.get("RCEPT_ENDDE") or raw.get("SUBSCRPT_RCEPT_ENDDE"),
        "spsply_start": raw.get("SPSPLY_RCEPT_BGNDE"),
        "spsply_end": raw.get("SPSPLY_RCEPT_ENDDE"),
        "gnrl_rank1_start": raw.get("GNRL_RNK1_CRSPAREA_RCPTDE") or raw.get("GNRL_RCEPT_BGNDE"),
        "gnrl_rank1_end": raw.get("GNRL_RNK1_CRSPAREA_ENDDE") or raw.get("GNRL_RCEPT_ENDDE"),
        "winner_date": raw.get("PRZWNER_PRESNATN_DE"),
        "contract_start": raw.get("CNTRCT_CNCLS_BGNDE"),
        "contract_end": raw.get("CNTRCT_CNCLS_ENDDE"),
        "homepage_url": raw.get("HMPG_ADRES"),
        "pblanc_url": raw.get("PBLANC_URL"),
        "constructor": raw.get("CNSTRCT_ENTRPS_NM"),
        "developer": raw.get("BSNS_MBY_NM"),
        "move_in_month": raw.get("MVN_PREARNGE_YM"),
        "is_speculative_area": raw.get("SPECLT_RDN_EARTH_AT"),
        "is_price_cap": raw.get("PARCPRC_ULS_AT"),
        "contact": raw.get("MDHS_TELNO"),
        "source": "auto",
    }


def _parse_top_amount(val: Any) -> int | None:
    """Strip thousands separators from ``val`` and convert it to ``int``.

    Returns ``None`` when ``val`` is ``None`` or is not an integer literal
    once the commas are removed.
    """
    if val is None:
        return None
    try:
        return int(str(val).replace(",", ""))
    except (ValueError, TypeError):
        return None


def _parse_float(val: Any) -> float | None:
    """Convert ``val`` to ``float``; return ``None`` if missing or not numeric."""
    if val is None:
        return None
    try:
        return float(val)
    except (ValueError, TypeError):
        return None


def _parse_model(raw: Dict[str, Any]) -> Dict[str, Any]:
    """Map a model API row onto the DB column names.

    Unit-count fields fall back to ``0`` when absent or falsy. A
    ``supply_area`` that cannot be read as a number is stored as ``None``
    instead of raising, mirroring how ``top_amount`` is handled, so one
    malformed field does not discard the whole row.
    """
    return {
        "house_manage_no": raw.get("HOUSE_MANAGE_NO", ""),
        "pblanc_no": raw.get("PBLANC_NO", ""),
        "model_no": raw.get("MODEL_NO"),
        "house_ty": raw.get("HOUSE_TY"),
        "supply_area": _parse_float(raw.get("SUPLY_AR")),
        "general_units": raw.get("SUPLY_HSHLDCO") or 0,
        "special_units": raw.get("SPSPLY_HSHLDCO") or 0,
        "multi_child_units": raw.get("MNYCH_HSHLDCO") or 0,
        "newlywed_units": raw.get("NWWDS_HSHLDCO") or 0,
        "first_life_units": raw.get("LFE_FRST_HSHLDCO") or 0,
        "old_parent_units": raw.get("OLD_PARNTS_SUPORT_HSHLDCO") or 0,
        "institution_units": raw.get("INSTT_RECOMEND_HSHLDCO") or 0,
        "youth_units": raw.get("YGMN_HSHLDCO") or 0,
        "newborn_units": raw.get("NWBB_HSHLDCO") or 0,
        "top_amount": _parse_top_amount(raw.get("LTTOT_TOP_AMOUNT")),
    }


def collect_all() -> Dict[str, Any]:
    """Collect and store announcement and model rows from every endpoint pair.

    For each pair in ``DETAIL_ENDPOINTS`` the detail rows are upserted first,
    then the model rows. A row whose parsing or upsert raises is logged and
    skipped so the remaining rows are still processed.

    Returns:
        ``{"new_count": int, "total_count": int}``. ``total_count`` is the
        number of announcement rows upserted without error; model rows are not
        counted. ``new_count`` currently always equals ``total_count``. Both
        are ``0`` when ``API_KEY`` is not set.
    """
    if not API_KEY:
        logger.warning("API 키 미설정 — 수집 중단")
        save_collect_log(0, 0, "API 키 미설정")
        return {"new_count": 0, "total_count": 0}

    total_count = 0

    for detail_ep, model_ep in DETAIL_ENDPOINTS:
        # Announcement details
        detail_rows = _api_call(detail_ep)
        for raw in detail_rows:
            try:
                parsed = _parse_apt_detail(raw)
                upsert_announcement(parsed)
                total_count += 1
            except Exception as e:
                logger.error("공고 upsert 실패 [%s]: %s", detail_ep, e)

        # Models (unit types) belonging to those announcements
        model_rows = _api_call(model_ep)
        for raw in model_rows:
            try:
                parsed = _parse_model(raw)
                upsert_model(parsed)
            except Exception as e:
                logger.error("모델 upsert 실패 [%s]: %s", model_ep, e)

    # Every upserted row is recorded as new (written with the first run in
    # mind); new and updated rows are not distinguished here.
    new_count = total_count
    save_collect_log(new_count, total_count)
    logger.info("수집 완료: new=%d, total=%d", new_count, total_count)
    return {"new_count": new_count, "total_count": total_count}