Files
web-page-backend/realestate-lab/app/db.py

555 lines
24 KiB
Python

# realestate-lab/app/db.py
import json
import sqlite3
import logging
from typing import Dict, Any, List, Optional
from datetime import date
logger = logging.getLogger("realestate-lab")
DB_PATH = "/app/data/realestate.db"
def _conn():
c = sqlite3.connect(DB_PATH)
c.row_factory = sqlite3.Row
c.execute("PRAGMA journal_mode=WAL;")
c.execute("PRAGMA foreign_keys=ON;")
return c
def init_db():
with _conn() as conn:
# ── announcements ────────────────────────────────────────────────
conn.execute("""
CREATE TABLE IF NOT EXISTS announcements (
id INTEGER PRIMARY KEY AUTOINCREMENT,
house_manage_no TEXT NOT NULL,
pblanc_no TEXT NOT NULL,
house_nm TEXT,
house_secd TEXT,
house_dtl_secd TEXT,
rent_secd TEXT,
region_code TEXT,
region_name TEXT,
address TEXT,
total_units INTEGER,
rcrit_date TEXT,
receipt_start TEXT,
receipt_end TEXT,
spsply_start TEXT,
spsply_end TEXT,
gnrl_rank1_start TEXT,
gnrl_rank1_end TEXT,
winner_date TEXT,
contract_start TEXT,
contract_end TEXT,
homepage_url TEXT,
pblanc_url TEXT,
constructor TEXT,
developer TEXT,
move_in_month TEXT,
is_speculative_area TEXT,
is_price_cap TEXT,
contact TEXT,
status TEXT NOT NULL DEFAULT '청약예정',
source TEXT NOT NULL DEFAULT 'manual',
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
UNIQUE(house_manage_no, pblanc_no)
);
""")
conn.execute("CREATE INDEX IF NOT EXISTS idx_ann_status ON announcements(status);")
conn.execute("CREATE INDEX IF NOT EXISTS idx_ann_region ON announcements(region_name);")
# ── announcement_models ──────────────────────────────────────────
conn.execute("""
CREATE TABLE IF NOT EXISTS announcement_models (
id INTEGER PRIMARY KEY AUTOINCREMENT,
house_manage_no TEXT NOT NULL,
pblanc_no TEXT NOT NULL,
model_no TEXT,
house_ty TEXT,
supply_area REAL,
general_units INTEGER DEFAULT 0,
special_units INTEGER DEFAULT 0,
multi_child_units INTEGER DEFAULT 0,
newlywed_units INTEGER DEFAULT 0,
first_life_units INTEGER DEFAULT 0,
old_parent_units INTEGER DEFAULT 0,
institution_units INTEGER DEFAULT 0,
youth_units INTEGER DEFAULT 0,
newborn_units INTEGER DEFAULT 0,
top_amount INTEGER,
UNIQUE(house_manage_no, pblanc_no, model_no)
);
""")
# ── user_profile ─────────────────────────────────────────────────
conn.execute("""
CREATE TABLE IF NOT EXISTS user_profile (
id INTEGER PRIMARY KEY DEFAULT 1,
name TEXT,
age INTEGER,
is_homeless INTEGER,
is_householder INTEGER,
subscription_months INTEGER,
subscription_amount INTEGER,
family_members INTEGER,
has_dependents INTEGER,
children_count INTEGER DEFAULT 0,
is_newlywed INTEGER,
marriage_months INTEGER,
has_newborn INTEGER,
is_first_home INTEGER,
income_level TEXT,
preferred_regions TEXT NOT NULL DEFAULT '[]',
preferred_types TEXT NOT NULL DEFAULT '[]',
min_area REAL,
max_area REAL,
max_price INTEGER,
updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
);
""")
# ── match_results ────────────────────────────────────────────────
conn.execute("""
CREATE TABLE IF NOT EXISTS match_results (
id INTEGER PRIMARY KEY AUTOINCREMENT,
announcement_id INTEGER NOT NULL,
model_id INTEGER,
match_score INTEGER NOT NULL DEFAULT 0,
match_reasons TEXT NOT NULL DEFAULT '[]',
eligible_types TEXT NOT NULL DEFAULT '[]',
is_new INTEGER NOT NULL DEFAULT 1,
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
UNIQUE(announcement_id, model_id)
);
""")
# ── collect_log ──────────────────────────────────────────────────
conn.execute("""
CREATE TABLE IF NOT EXISTS collect_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
collected_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
new_count INTEGER NOT NULL DEFAULT 0,
total_count INTEGER NOT NULL DEFAULT 0,
error TEXT
);
""")
# ── 상태 자동 계산 ───────────────────────────────────────────────────────────
def compute_status(receipt_start: str, receipt_end: str, winner_date: str) -> str:
today = date.today().isoformat()
if receipt_start and today < receipt_start:
return "청약예정"
if receipt_start and receipt_end and receipt_start <= today <= receipt_end:
return "청약중"
if receipt_end and winner_date and receipt_end < today <= winner_date:
return "결과발표"
if winner_date and today > winner_date:
return "완료"
return "청약예정"
# ── announcements CRUD ───────────────────────────────────────────────────────
def _ann_row_to_dict(r) -> Dict[str, Any]:
return {c: r[c] for c in r.keys()}
def upsert_announcement(data: Dict[str, Any]) -> tuple:
"""공고 upsert — house_manage_no + pblanc_no 기준. Returns (dict, is_new: bool)."""
status = compute_status(
data.get("receipt_start", ""),
data.get("receipt_end", ""),
data.get("winner_date", ""),
)
with _conn() as conn:
exists = conn.execute(
"SELECT 1 FROM announcements WHERE house_manage_no = ? AND pblanc_no = ?",
(data["house_manage_no"], data["pblanc_no"]),
).fetchone()
is_new = exists is None
conn.execute("""
INSERT INTO announcements (
house_manage_no, pblanc_no, house_nm, house_secd, house_dtl_secd,
rent_secd, region_code, region_name, address, total_units,
rcrit_date, receipt_start, receipt_end, spsply_start, spsply_end,
gnrl_rank1_start, gnrl_rank1_end, winner_date, contract_start,
contract_end, homepage_url, pblanc_url, constructor, developer,
move_in_month, is_speculative_area, is_price_cap, contact,
status, source
) VALUES (
:house_manage_no, :pblanc_no, :house_nm, :house_secd, :house_dtl_secd,
:rent_secd, :region_code, :region_name, :address, :total_units,
:rcrit_date, :receipt_start, :receipt_end, :spsply_start, :spsply_end,
:gnrl_rank1_start, :gnrl_rank1_end, :winner_date, :contract_start,
:contract_end, :homepage_url, :pblanc_url, :constructor, :developer,
:move_in_month, :is_speculative_area, :is_price_cap, :contact,
:status, :source
)
ON CONFLICT(house_manage_no, pblanc_no) DO UPDATE SET
house_nm=excluded.house_nm,
house_secd=excluded.house_secd,
house_dtl_secd=excluded.house_dtl_secd,
rent_secd=excluded.rent_secd,
region_code=excluded.region_code,
region_name=excluded.region_name,
address=excluded.address,
total_units=excluded.total_units,
rcrit_date=excluded.rcrit_date,
receipt_start=excluded.receipt_start,
receipt_end=excluded.receipt_end,
spsply_start=excluded.spsply_start,
spsply_end=excluded.spsply_end,
gnrl_rank1_start=excluded.gnrl_rank1_start,
gnrl_rank1_end=excluded.gnrl_rank1_end,
winner_date=excluded.winner_date,
contract_start=excluded.contract_start,
contract_end=excluded.contract_end,
homepage_url=excluded.homepage_url,
pblanc_url=excluded.pblanc_url,
constructor=excluded.constructor,
developer=excluded.developer,
move_in_month=excluded.move_in_month,
is_speculative_area=excluded.is_speculative_area,
is_price_cap=excluded.is_price_cap,
contact=excluded.contact,
status=excluded.status,
updated_at=strftime('%Y-%m-%dT%H:%M:%fZ','now')
""", {**data, "status": status})
row = conn.execute(
"SELECT * FROM announcements WHERE house_manage_no = ? AND pblanc_no = ?",
(data["house_manage_no"], data["pblanc_no"]),
).fetchone()
return _ann_row_to_dict(row), is_new
def get_announcements(
region: str = None,
status: str = None,
house_type: str = None,
matched_only: bool = False,
sort: str = "date",
page: int = 1,
size: int = 20,
) -> Dict[str, Any]:
conditions, params = [], []
if region:
conditions.append("a.region_name = ?")
params.append(region)
if status:
conditions.append("a.status = ?")
params.append(status)
if house_type:
conditions.append("a.house_secd = ?")
params.append(house_type)
if matched_only:
conditions.append("a.id IN (SELECT announcement_id FROM match_results)")
where = f"WHERE {' AND '.join(conditions)}" if conditions else ""
order_map = {"date": "a.rcrit_date DESC", "score": "a.id DESC", "price": "a.id ASC"}
order = order_map.get(sort, "a.rcrit_date DESC")
if matched_only and sort == "score":
order = "(SELECT MAX(match_score) FROM match_results WHERE announcement_id = a.id) DESC"
offset = (page - 1) * size
with _conn() as conn:
total = conn.execute(
f"SELECT COUNT(*) FROM announcements a {where}", params
).fetchone()[0]
rows = conn.execute(
f"SELECT a.* FROM announcements a {where} ORDER BY {order} LIMIT ? OFFSET ?",
params + [size, offset],
).fetchall()
return {
"items": [_ann_row_to_dict(r) for r in rows],
"total": total,
"page": page,
"size": size,
}
def get_announcement(ann_id: int) -> Optional[Dict[str, Any]]:
with _conn() as conn:
row = conn.execute("SELECT * FROM announcements WHERE id = ?", (ann_id,)).fetchone()
if not row:
return None
ann = _ann_row_to_dict(row)
models = conn.execute(
"SELECT * FROM announcement_models WHERE house_manage_no = ? AND pblanc_no = ?",
(ann["house_manage_no"], ann["pblanc_no"]),
).fetchall()
ann["models"] = [dict(m) for m in models]
return ann
def create_announcement(data: Dict[str, Any]) -> Dict[str, Any]:
"""수동 공고 등록 (house_manage_no 자동 생성)."""
import uuid
data["house_manage_no"] = data.get("house_manage_no", f"MANUAL-{uuid.uuid4().hex[:8]}")
data["pblanc_no"] = data.get("pblanc_no", "00")
data["source"] = "manual"
result, _ = upsert_announcement(data)
return result
ANNOUNCEMENT_COLUMNS = {
"house_nm", "house_secd", "house_dtl_secd", "rent_secd",
"region_code", "region_name", "address", "total_units",
"rcrit_date", "receipt_start", "receipt_end", "spsply_start", "spsply_end",
"gnrl_rank1_start", "gnrl_rank1_end", "winner_date",
"contract_start", "contract_end", "homepage_url", "pblanc_url",
"constructor", "developer", "move_in_month",
"is_speculative_area", "is_price_cap", "contact",
}
def update_announcement(ann_id: int, data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
fields = {k: v for k, v in data.items() if v is not None and k in ANNOUNCEMENT_COLUMNS}
if not fields:
return get_announcement(ann_id)
# 날짜 변경 시 status 재계산
with _conn() as conn:
row = conn.execute("SELECT * FROM announcements WHERE id = ?", (ann_id,)).fetchone()
if not row:
return None
current = _ann_row_to_dict(row)
merged = {**current, **fields}
status = compute_status(
merged.get("receipt_start", ""),
merged.get("receipt_end", ""),
merged.get("winner_date", ""),
)
fields["status"] = status
set_clauses = ", ".join(f"{k} = ?" for k in fields)
set_clauses += ", updated_at = strftime('%Y-%m-%dT%H:%M:%fZ','now')"
conn.execute(
f"UPDATE announcements SET {set_clauses} WHERE id = ?",
list(fields.values()) + [ann_id],
)
return get_announcement(ann_id)
def delete_announcement(ann_id: int) -> bool:
with _conn() as conn:
# 관련 매칭 결과도 삭제
conn.execute("DELETE FROM match_results WHERE announcement_id = ?", (ann_id,))
cur = conn.execute("DELETE FROM announcements WHERE id = ?", (ann_id,))
return cur.rowcount > 0
def update_all_statuses():
"""모든 진행중 공고의 status를 날짜 기반으로 재계산."""
with _conn() as conn:
rows = conn.execute(
"SELECT id, receipt_start, receipt_end, winner_date FROM announcements "
"WHERE status != '완료' AND (receipt_start IS NOT NULL OR receipt_end IS NOT NULL OR winner_date IS NOT NULL)"
).fetchall()
for r in rows:
new_status = compute_status(r["receipt_start"], r["receipt_end"], r["winner_date"])
if new_status != "완료":
conn.execute("UPDATE announcements SET status = ? WHERE id = ?", (new_status, r["id"]))
else:
conn.execute(
"UPDATE announcements SET status = ?, updated_at = strftime('%Y-%m-%dT%H:%M:%fZ','now') WHERE id = ?",
(new_status, r["id"]),
)
# ── announcement_models CRUD ─────────────────────────────────────────────────
def upsert_model(data: Dict[str, Any]):
with _conn() as conn:
conn.execute("""
INSERT INTO announcement_models (
house_manage_no, pblanc_no, model_no, house_ty, supply_area,
general_units, special_units, multi_child_units, newlywed_units,
first_life_units, old_parent_units, institution_units,
youth_units, newborn_units, top_amount
) VALUES (
:house_manage_no, :pblanc_no, :model_no, :house_ty, :supply_area,
:general_units, :special_units, :multi_child_units, :newlywed_units,
:first_life_units, :old_parent_units, :institution_units,
:youth_units, :newborn_units, :top_amount
)
ON CONFLICT(house_manage_no, pblanc_no, model_no) DO UPDATE SET
house_ty=excluded.house_ty,
supply_area=excluded.supply_area,
general_units=excluded.general_units,
special_units=excluded.special_units,
multi_child_units=excluded.multi_child_units,
newlywed_units=excluded.newlywed_units,
first_life_units=excluded.first_life_units,
old_parent_units=excluded.old_parent_units,
institution_units=excluded.institution_units,
youth_units=excluded.youth_units,
newborn_units=excluded.newborn_units,
top_amount=excluded.top_amount
""", data)
# ── user_profile CRUD ────────────────────────────────────────────────────────
def _profile_row_to_dict(r) -> Dict[str, Any]:
d = {}
for c in r.keys():
val = r[c]
if c in ("is_homeless", "is_householder", "has_dependents", "is_newlywed",
"has_newborn", "is_first_home"):
d[c] = bool(val) if val is not None else None
elif c in ("preferred_regions", "preferred_types"):
d[c] = json.loads(val) if val else []
else:
d[c] = val
return d
def get_profile() -> Optional[Dict[str, Any]]:
with _conn() as conn:
r = conn.execute("SELECT * FROM user_profile WHERE id = 1").fetchone()
return _profile_row_to_dict(r) if r else None
PROFILE_COLUMNS = {
"name", "age", "is_homeless", "is_householder",
"subscription_months", "subscription_amount", "family_members",
"has_dependents", "children_count", "is_newlywed", "marriage_months",
"has_newborn", "is_first_home", "income_level",
"preferred_regions", "preferred_types",
"min_area", "max_area", "max_price",
}
def upsert_profile(data: Dict[str, Any]) -> Dict[str, Any]:
updates = {}
for k, v in data.items():
if v is None or k not in PROFILE_COLUMNS:
continue
if isinstance(v, bool):
updates[k] = 1 if v else 0
elif isinstance(v, list):
updates[k] = json.dumps(v)
else:
updates[k] = v
with _conn() as conn:
existing = conn.execute("SELECT id FROM user_profile WHERE id = 1").fetchone()
if existing:
if updates:
set_clauses = ", ".join(f"{k} = ?" for k in updates)
set_clauses += ", updated_at = strftime('%Y-%m-%dT%H:%M:%fZ','now')"
conn.execute(
f"UPDATE user_profile SET {set_clauses} WHERE id = 1",
list(updates.values()),
)
else:
cols = ["id"] + list(updates.keys())
vals = [1] + list(updates.values())
placeholders = ", ".join("?" for _ in vals)
conn.execute(
f"INSERT INTO user_profile ({', '.join(cols)}) VALUES ({placeholders})",
vals,
)
row = conn.execute("SELECT * FROM user_profile WHERE id = 1").fetchone()
return _profile_row_to_dict(row)
# ── match_results CRUD ───────────────────────────────────────────────────────
def save_match_result(data: Dict[str, Any]):
with _conn() as conn:
conn.execute("""
INSERT INTO match_results (announcement_id, model_id, match_score, match_reasons, eligible_types, is_new)
VALUES (:announcement_id, :model_id, :match_score, :match_reasons, :eligible_types, 1)
ON CONFLICT(announcement_id, model_id) DO UPDATE SET
match_score=excluded.match_score,
match_reasons=excluded.match_reasons,
eligible_types=excluded.eligible_types
""", {
**data,
"match_reasons": json.dumps(data.get("match_reasons", [])),
"eligible_types": json.dumps(data.get("eligible_types", [])),
})
def get_matches(page: int = 1, size: int = 20) -> Dict[str, Any]:
offset = (page - 1) * size
with _conn() as conn:
total = conn.execute("SELECT COUNT(*) FROM match_results").fetchone()[0]
rows = conn.execute("""
SELECT m.*, a.house_nm, a.region_name, a.address, a.status as ann_status,
a.receipt_start, a.receipt_end, a.winner_date, a.pblanc_url
FROM match_results m
JOIN announcements a ON a.id = m.announcement_id
ORDER BY m.is_new DESC, m.match_score DESC
LIMIT ? OFFSET ?
""", (size, offset)).fetchall()
items = []
for r in rows:
d = {c: r[c] for c in r.keys()}
d["match_reasons"] = json.loads(d["match_reasons"]) if d["match_reasons"] else []
d["eligible_types"] = json.loads(d["eligible_types"]) if d["eligible_types"] else []
items.append(d)
return {"items": items, "total": total, "page": page, "size": size}
def mark_match_read(match_id: int) -> bool:
with _conn() as conn:
cur = conn.execute("UPDATE match_results SET is_new = 0 WHERE id = ?", (match_id,))
return cur.rowcount > 0
def clear_match_results():
with _conn() as conn:
conn.execute("DELETE FROM match_results")
# ── collect_log CRUD ─────────────────────────────────────────────────────────
def save_collect_log(new_count: int, total_count: int, error: str = None):
with _conn() as conn:
conn.execute(
"INSERT INTO collect_log (new_count, total_count, error) VALUES (?, ?, ?)",
(new_count, total_count, error),
)
def get_last_collect_log() -> Optional[Dict[str, Any]]:
with _conn() as conn:
r = conn.execute("SELECT * FROM collect_log ORDER BY id DESC LIMIT 1").fetchone()
return dict(r) if r else None
# ── 대시보드 ─────────────────────────────────────────────────────────────────
def get_dashboard() -> Dict[str, Any]:
with _conn() as conn:
active = conn.execute(
"SELECT COUNT(*) FROM announcements WHERE status IN ('청약예정', '청약중')"
).fetchone()[0]
new_matches = conn.execute(
"SELECT COUNT(*) FROM match_results WHERE is_new = 1"
).fetchone()[0]
upcoming = conn.execute("""
SELECT id, house_nm, receipt_start, receipt_end, status
FROM announcements
WHERE status IN ('청약예정', '청약중')
ORDER BY receipt_start ASC
LIMIT 5
""").fetchall()
return {
"active_count": active,
"new_match_count": new_matches,
"upcoming": [dict(r) for r in upcoming],
}