Merge branch 'worktree-agent-a395667a'

This commit is contained in:
2026-04-06 08:29:55 +09:00

View File

@@ -0,0 +1,165 @@
import os
import logging
import requests
from typing import List, Dict, Any
from .db import upsert_announcement, upsert_model, save_collect_log
# Module logger, registered under the fixed name "realestate-lab".
logger = logging.getLogger("realestate-lab")

# Root URL that every endpoint name below is appended to.
API_BASE = "https://api.odcloud.kr/api/ApplyhomeInfoDetailSvc/v1"

# Service key read once at import time; an empty string means "not configured".
API_KEY = os.environ.get("DATA_GO_KR_API_KEY", "")

# Five (detail endpoint, model endpoint) pairs; collect_all walks them in
# this order, fetching the detail endpoint first and then its model endpoint.
DETAIL_ENDPOINTS = [
    ("getAPTLttotPblancDetail", "getAPTLttotPblancMdl"),
    ("getUrbtyOfctlLttotPblancDetail", "getUrbtyOfctlLttotPblancMdl"),
    ("getRemndrLttotPblancDetail", "getRemndrLttotPblancMdl"),
    ("getPblPvtRentLttotPblancDetail", "getPblPvtRentLttotPblancMdl"),
    ("getOPTLttotPblancDetail", "getOPTLttotPblancMdl"),
]
def _api_call(endpoint: str, params: Dict[str, Any] | None = None) -> List[Dict]:
    """Fetch every page of ``endpoint`` and return the rows as one list.

    Requests pages of 100 rows, starting at page 1, until the number of
    collected rows reaches the ``totalCount`` reported by the response or the
    server returns an empty page.

    Args:
        endpoint: Endpoint name appended to ``API_BASE``.
        params: Extra query parameters; they override the defaults
            (``serviceKey``, ``perPage``, ``returnType``) on key collision.

    Returns:
        The concatenated ``data`` rows. Returns an empty list when
        ``API_KEY`` is unset. If a request, JSON decode or response-shape
        check fails part-way, the rows gathered so far are returned and the
        failure is only logged.
    """
    if not API_KEY:
        logger.warning("DATA_GO_KR_API_KEY 미설정 — API 호출 건너뜀")
        return []

    query: Dict[str, Any] = {
        "serviceKey": API_KEY,
        "perPage": 100,
        "returnType": "JSON",
    }
    if params:
        query.update(params)

    url = f"{API_BASE}/{endpoint}"
    all_data: List[Dict] = []
    page = 1
    while True:
        query["page"] = page

        try:
            resp = requests.get(url, params=query, timeout=30)
            resp.raise_for_status()
        except requests.RequestException as e:
            # NOTE(review): the exception text can contain the request URL,
            # query string (serviceKey) included — confirm the log sink is
            # trusted.
            logger.error("API 호출 실패 [%s page=%d]: %s", endpoint, page, e)
            break

        # Decoded separately from the request: recent requests versions raise
        # a JSON error that is also a RequestException, which would otherwise
        # be reported as a failed call rather than a parse failure.
        try:
            body = resp.json()
        except ValueError as e:
            logger.error("JSON 파싱 실패 [%s page=%d]: %s", endpoint, page, e)
            break

        if not isinstance(body, dict):
            logger.error(
                "응답 형식 오류 [%s page=%d]: %s",
                endpoint, page, type(body).__name__,
            )
            break

        # `or` also covers an explicit JSON null for either field.
        data = body.get("data") or []
        total_count = body.get("totalCount") or 0
        all_data.extend(data)

        if len(all_data) >= total_count:
            break
        if not data:
            # An empty page while totalCount still promises more rows would
            # otherwise repeat forever, since the collected count can no
            # longer grow.
            logger.warning(
                "빈 페이지 수신 — 수집 중단 [%s page=%d]: %d/%d건",
                endpoint, page, len(all_data), total_count,
            )
            break
        page += 1

    logger.info("[%s] %d건 수집", endpoint, len(all_data))
    return all_data
def _parse_apt_detail(raw: Dict[str, Any]) -> Dict[str, Any]:
    """Translate one announcement-detail API row into a DB-column dict.

    ``house_manage_no`` and ``pblanc_no`` fall back to an empty string when
    the API key is absent; every other column is ``None`` when absent. Where
    a column lists two API fields, the second is used whenever the first is
    missing or falsy. ``source`` is always ``"auto"``.
    """
    # (DB column, candidate API fields) in output order.
    column_sources = (
        ("house_nm", ("HOUSE_NM",)),
        ("house_secd", ("HOUSE_SECD",)),
        ("house_dtl_secd", ("HOUSE_DTL_SECD",)),
        ("rent_secd", ("RENT_SECD",)),
        ("region_code", ("SUBSCRPT_AREA_CODE",)),
        ("region_name", ("SUBSCRPT_AREA_CODE_NM",)),
        ("address", ("HSSPLY_ADRES",)),
        ("total_units", ("TOT_SUPLY_HSHLDCO",)),
        ("rcrit_date", ("RCRIT_PBLANC_DE",)),
        ("receipt_start", ("RCEPT_BGNDE", "SUBSCRPT_RCEPT_BGNDE")),
        ("receipt_end", ("RCEPT_ENDDE", "SUBSCRPT_RCEPT_ENDDE")),
        ("spsply_start", ("SPSPLY_RCEPT_BGNDE",)),
        ("spsply_end", ("SPSPLY_RCEPT_ENDDE",)),
        ("gnrl_rank1_start", ("GNRL_RNK1_CRSPAREA_RCPTDE", "GNRL_RCEPT_BGNDE")),
        ("gnrl_rank1_end", ("GNRL_RNK1_CRSPAREA_ENDDE", "GNRL_RCEPT_ENDDE")),
        ("winner_date", ("PRZWNER_PRESNATN_DE",)),
        ("contract_start", ("CNTRCT_CNCLS_BGNDE",)),
        ("contract_end", ("CNTRCT_CNCLS_ENDDE",)),
        ("homepage_url", ("HMPG_ADRES",)),
        ("pblanc_url", ("PBLANC_URL",)),
        ("constructor", ("CNSTRCT_ENTRPS_NM",)),
        ("developer", ("BSNS_MBY_NM",)),
        ("move_in_month", ("MVN_PREARNGE_YM",)),
        ("is_speculative_area", ("SPECLT_RDN_EARTH_AT",)),
        ("is_price_cap", ("PARCPRC_ULS_AT",)),
        ("contact", ("MDHS_TELNO",)),
    )

    record: Dict[str, Any] = {
        "house_manage_no": raw.get("HOUSE_MANAGE_NO", ""),
        "pblanc_no": raw.get("PBLANC_NO", ""),
    }
    for column, candidates in column_sources:
        value = None
        for api_field in candidates:
            value = raw.get(api_field)
            if value:
                # First truthy candidate wins; otherwise the last lookup
                # (possibly None or falsy) is what gets stored.
                break
        record[column] = value
    record["source"] = "auto"
    return record
def _parse_top_amount(val: Any) -> int | None:
"""최고 금액 문자열에서 콤마를 제거하고 정수로 변환한다."""
if val is None:
return None
try:
return int(str(val).replace(",", ""))
except (ValueError, TypeError):
return None
def _parse_supply_area(val: Any) -> float | None:
    """Convert a supply-area value to float; None when missing or not numeric."""
    if val is None:
        return None
    try:
        return float(val)
    except (ValueError, TypeError):
        return None


def _parse_model(raw: Dict[str, Any]) -> Dict[str, Any]:
    """Map one model (unit-type) API row onto the DB column names.

    ``house_manage_no`` and ``pblanc_no`` default to an empty string when the
    API key is absent. Unit-count columns become ``0`` when the API value is
    missing or falsy. ``supply_area`` is a float, or ``None`` when the API
    value is missing or cannot be parsed as a number, so one malformed area
    no longer raises and discards the whole row. ``top_amount`` is parsed by
    ``_parse_top_amount``.
    """
    return {
        "house_manage_no": raw.get("HOUSE_MANAGE_NO", ""),
        "pblanc_no": raw.get("PBLANC_NO", ""),
        "model_no": raw.get("MODEL_NO"),
        "house_ty": raw.get("HOUSE_TY"),
        "supply_area": _parse_supply_area(raw.get("SUPLY_AR")),
        "general_units": raw.get("SUPLY_HSHLDCO") or 0,
        "special_units": raw.get("SPSPLY_HSHLDCO") or 0,
        "multi_child_units": raw.get("MNYCH_HSHLDCO") or 0,
        "newlywed_units": raw.get("NWWDS_HSHLDCO") or 0,
        "first_life_units": raw.get("LFE_FRST_HSHLDCO") or 0,
        "old_parent_units": raw.get("OLD_PARNTS_SUPORT_HSHLDCO") or 0,
        "institution_units": raw.get("INSTT_RECOMEND_HSHLDCO") or 0,
        "youth_units": raw.get("YGMN_HSHLDCO") or 0,
        "newborn_units": raw.get("NWBB_HSHLDCO") or 0,
        "top_amount": _parse_top_amount(raw.get("LTTOT_TOP_AMOUNT")),
    }
def collect_all() -> Dict[str, Any]:
    """Collect and store announcements and models from every endpoint pair.

    For each pair in ``DETAIL_ENDPOINTS`` the detail endpoint is fetched and
    its rows upserted, then the model endpoint likewise. A row whose parsing
    or upsert raises is logged and skipped. Only successfully stored
    announcements are counted; model rows are not.

    Returns:
        ``{"new_count": n, "total_count": n}``. Both values are the same
        number, because every stored announcement is recorded as new. Both
        are ``0`` when ``API_KEY`` is unset, in which case only a collect-log
        entry is written.
    """
    if not API_KEY:
        logger.warning("API 키 미설정 — 수집 중단")
        save_collect_log(0, 0, "API 키 미설정")
        return {"new_count": 0, "total_count": 0}

    stored = 0
    for detail_ep, model_ep in DETAIL_ENDPOINTS:
        # Announcement details
        for item in _api_call(detail_ep):
            try:
                upsert_announcement(_parse_apt_detail(item))
            except Exception as err:
                logger.error("공고 upsert 실패 [%s]: %s", detail_ep, err)
            else:
                stored += 1

        # Models (unit types) belonging to the same endpoint family
        for item in _api_call(model_ep):
            try:
                upsert_model(_parse_model(item))
            except Exception as err:
                logger.error("모델 upsert 실패 [%s]: %s", model_ep, err)

    # Every stored announcement is reported as new.
    summary = {"new_count": stored, "total_count": stored}
    save_collect_log(summary["new_count"], summary["total_count"])
    logger.info(
        "수집 완료: new=%d, total=%d",
        summary["new_count"], summary["total_count"],
    )
    return summary