import os
import logging
from typing import List, Dict, Any, Optional

import requests

from .db import upsert_announcement, upsert_model, save_collect_log

logger = logging.getLogger("realestate-lab")

API_BASE = "https://api.odcloud.kr/api/ApplyhomeInfoDetailSvc/v1"
# Read once at import time; an empty string disables every API call below.
API_KEY = os.getenv("DATA_GO_KR_API_KEY", "")

# 5 detail+model endpoint pairs
DETAIL_ENDPOINTS = [
    ("getAPTLttotPblancDetail", "getAPTLttotPblancMdl"),
    ("getUrbtyOfctlLttotPblancDetail", "getUrbtyOfctlLttotPblancMdl"),
    ("getRemndrLttotPblancDetail", "getRemndrLttotPblancMdl"),
    ("getPblPvtRentLttotPblancDetail", "getPblPvtRentLttotPblancMdl"),
    ("getOPTLttotPblancDetail", "getOPTLttotPblancMdl"),
]


def _api_call(endpoint: str, params: Optional[Dict[str, Any]] = None) -> List[Dict]:
    """Fetch every page of ``endpoint`` and return the concatenated rows.

    Args:
        endpoint: Path segment appended to ``API_BASE``.
        params: Extra query parameters merged over the defaults
            (``serviceKey``, ``perPage``, ``returnType``). ``page`` is
            always set by this function.

    Returns:
        The rows collected from the ``data`` field of each page. If
        ``API_KEY`` is empty an empty list is returned without any request.
        On a request or JSON decoding failure the error is logged and the
        rows collected so far are returned (best effort, no exception).
    """
    if not API_KEY:
        logger.warning("DATA_GO_KR_API_KEY 미설정 — API 호출 건너뜀")
        return []

    base_params: Dict[str, Any] = {
        "serviceKey": API_KEY,
        "perPage": 100,
        "returnType": "JSON",
    }
    if params:
        base_params.update(params)

    url = f"{API_BASE}/{endpoint}"
    all_data: List[Dict] = []
    page = 1

    while True:
        base_params["page"] = page
        try:
            resp = requests.get(url, params=base_params, timeout=30)
            resp.raise_for_status()
            body = resp.json()
        except requests.RequestException as e:
            logger.error("API 호출 실패 [%s page=%d]: %s", endpoint, page, e)
            break
        except ValueError as e:
            logger.error("JSON 파싱 실패 [%s page=%d]: %s", endpoint, page, e)
            break

        # ``or`` also covers an explicit JSON null, which would otherwise
        # crash ``extend`` / the ``>=`` comparison below.
        data = body.get("data") or []
        total_count = body.get("totalCount") or 0
        all_data.extend(data)

        if len(all_data) >= total_count:
            break
        if not data:
            # The server reports more rows than it actually delivers; without
            # this guard the loop would request empty pages forever.
            logger.warning(
                "빈 페이지 수신 — 페이지네이션 중단 [%s page=%d]", endpoint, page
            )
            break
        page += 1

    logger.info("[%s] %d건 수집", endpoint, len(all_data))
    return all_data


def _parse_apt_detail(raw: Dict[str, Any]) -> Dict[str, Any]:
    """Map a detail-endpoint row onto the announcement dict passed to the DB layer.

    Missing keys become ``None``, except ``house_manage_no`` and
    ``pblanc_no`` which default to ``""``. Several date fields fall back to
    an alternative source key when the primary one is absent or falsy.
    ``source`` is always ``"auto"``.
    """
    return {
        "house_manage_no": raw.get("HOUSE_MANAGE_NO", ""),
        "pblanc_no": raw.get("PBLANC_NO", ""),
        "house_nm": raw.get("HOUSE_NM"),
        "house_secd": raw.get("HOUSE_SECD"),
        "house_dtl_secd": raw.get("HOUSE_DTL_SECD"),
        "rent_secd": raw.get("RENT_SECD"),
        "region_code": raw.get("SUBSCRPT_AREA_CODE"),
        "region_name": raw.get("SUBSCRPT_AREA_CODE_NM"),
        "address": raw.get("HSSPLY_ADRES"),
        "total_units": raw.get("TOT_SUPLY_HSHLDCO"),
        "rcrit_date": raw.get("RCRIT_PBLANC_DE"),
        # The fallbacks below use a second key name for the same field.
        # NOTE(review): presumably the endpoints name these differently — confirm.
        "receipt_start": raw.get("RCEPT_BGNDE") or raw.get("SUBSCRPT_RCEPT_BGNDE"),
        "receipt_end": raw.get("RCEPT_ENDDE") or raw.get("SUBSCRPT_RCEPT_ENDDE"),
        "spsply_start": raw.get("SPSPLY_RCEPT_BGNDE"),
        "spsply_end": raw.get("SPSPLY_RCEPT_ENDDE"),
        "gnrl_rank1_start": raw.get("GNRL_RNK1_CRSPAREA_RCPTDE") or raw.get("GNRL_RCEPT_BGNDE"),
        "gnrl_rank1_end": raw.get("GNRL_RNK1_CRSPAREA_ENDDE") or raw.get("GNRL_RCEPT_ENDDE"),
        "winner_date": raw.get("PRZWNER_PRESNATN_DE"),
        "contract_start": raw.get("CNTRCT_CNCLS_BGNDE"),
        "contract_end": raw.get("CNTRCT_CNCLS_ENDDE"),
        "homepage_url": raw.get("HMPG_ADRES"),
        "pblanc_url": raw.get("PBLANC_URL"),
        "constructor": raw.get("CNSTRCT_ENTRPS_NM"),
        "developer": raw.get("BSNS_MBY_NM"),
        "move_in_month": raw.get("MVN_PREARNGE_YM"),
        "is_speculative_area": raw.get("SPECLT_RDN_EARTH_AT"),
        "is_price_cap": raw.get("PARCPRC_ULS_AT"),
        "contact": raw.get("MDHS_TELNO"),
        "source": "auto",
    }


def _parse_top_amount(val: Any) -> Optional[int]:
    """Convert a top-amount value to ``int``, stripping thousands commas.

    Returns ``None`` when ``val`` is ``None`` or cannot be parsed as an
    integer after the commas are removed.
    """
    if val is None:
        return None
    try:
        return int(str(val).replace(",", ""))
    except (ValueError, TypeError):
        return None


def _parse_model(raw: Dict[str, Any]) -> Dict[str, Any]:
    """Map a model-endpoint row onto the model dict passed to the DB layer.

    Unit-count fields default to ``0`` when missing or falsy.
    ``supply_area`` is converted with ``float`` and is ``None`` only when
    ``SUPLY_AR`` is missing or ``None``.

    Raises:
        ValueError: If ``SUPLY_AR`` is present but not convertible to
            ``float`` (the caller in this file logs and skips such rows).
    """
    return {
        "house_manage_no": raw.get("HOUSE_MANAGE_NO", ""),
        "pblanc_no": raw.get("PBLANC_NO", ""),
        "model_no": raw.get("MODEL_NO"),
        "house_ty": raw.get("HOUSE_TY"),
        "supply_area": float(raw["SUPLY_AR"]) if raw.get("SUPLY_AR") is not None else None,
        "general_units": raw.get("SUPLY_HSHLDCO") or 0,
        "special_units": raw.get("SPSPLY_HSHLDCO") or 0,
        "multi_child_units": raw.get("MNYCH_HSHLDCO") or 0,
        "newlywed_units": raw.get("NWWDS_HSHLDCO") or 0,
        "first_life_units": raw.get("LFE_FRST_HSHLDCO") or 0,
        "old_parent_units": raw.get("OLD_PARNTS_SUPORT_HSHLDCO") or 0,
        "institution_units": raw.get("INSTT_RECOMEND_HSHLDCO") or 0,
        "youth_units": raw.get("YGMN_HSHLDCO") or 0,
        "newborn_units": raw.get("NWBB_HSHLDCO") or 0,
        "top_amount": _parse_top_amount(raw.get("LTTOT_TOP_AMOUNT")),
    }


def collect_all() -> Dict[str, Any]:
    """Collect and store announcements and models from every endpoint pair.

    For each pair in ``DETAIL_ENDPOINTS`` the detail rows are upserted as
    announcements and the model rows as models. A failure on a single row
    is logged and does not stop the run. A collect-log entry is written at
    the end (or immediately, with a message, when the API key is missing).

    Returns:
        ``{"new_count": int, "total_count": int}`` where ``total_count`` is
        the number of announcements upserted without error. Model rows are
        not counted.
    """
    if not API_KEY:
        logger.warning("API 키 미설정 — 수집 중단")
        save_collect_log(0, 0, "API 키 미설정")
        return {"new_count": 0, "total_count": 0}

    total_count = 0

    for detail_ep, model_ep in DETAIL_ENDPOINTS:
        # Collect announcement details.
        detail_rows = _api_call(detail_ep)
        for raw in detail_rows:
            try:
                parsed = _parse_apt_detail(raw)
                upsert_announcement(parsed)
                total_count += 1
            except Exception as e:  # best effort: one bad row must not abort the run
                logger.error("공고 upsert 실패 [%s]: %s", detail_ep, e)

        # Collect models (unit types) for the same endpoint family.
        model_rows = _api_call(model_ep)
        for raw in model_rows:
            try:
                parsed = _parse_model(raw)
                upsert_model(parsed)
            except Exception as e:  # best effort: one bad row must not abort the run
                logger.error("모델 upsert 실패 [%s]: %s", model_ep, e)

    # Every upserted announcement is recorded as new (the original comment
    # says this is meant for the first collection).
    # NOTE(review): later runs will also report all rows as new — confirm
    # whether upsert_announcement can report inserts vs. updates.
    new_count = total_count

    save_collect_log(new_count, total_count)
    logger.info("수집 완료: new=%d, total=%d", new_count, total_count)
    return {"new_count": new_count, "total_count": total_count}