24 Commits

Author SHA1 Message Date
598adcbeb5 fix(lotto-lab): address code review issues - update_purchase JSON serialization, wire up EMA feedback loop
- add numbers/is_real type conversion in update_purchase (prevents a runtime error)
- call evolve_after_check from purchase_manager to activate the EMA feedback loop
- remove duplicate recalculate_weights call in checker.py

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-06 21:25:27 +09:00
d67e1fcd67 docs: update CLAUDE.md with new APIs, tables, and file layout
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-06 21:20:41 +09:00
7eda717326 lotto-lab: add purchase/strategy/smart-recommendation API endpoints
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-06 21:19:28 +09:00
28e3af12ec lotto-lab: checker integration - auto-check purchases and recalculate weights on new draw results
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-06 21:17:52 +09:00
c9f10aca4a lotto-lab: strategy_evolver - EMA/Softmax weight evolution + smart recommendation
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-06 21:15:42 +09:00
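The strategy_evolver commit above names an EMA + Softmax scheme for evolving strategy weights. The module itself is not shown in this changeset; the following is a minimal sketch of the idea, with `alpha` (EMA smoothing) and `temperature` (softmax sharpness) as assumed parameters — the actual constants and score definition may differ.

```python
import math

def evolve_weights(ema_scores: dict[str, float], new_scores: dict[str, float],
                   alpha: float = 0.3, temperature: float = 1.0) -> tuple[dict, dict]:
    """Blend each strategy's latest score into its EMA, then
    softmax-normalize the EMAs into weights that sum to 1."""
    updated = {
        s: alpha * new_scores.get(s, 0.0) + (1 - alpha) * ema_scores.get(s, 0.0)
        for s in ema_scores
    }
    # softmax over the updated EMA scores -> recommendation weights
    exps = {s: math.exp(v / temperature) for s, v in updated.items()}
    total = sum(exps.values())
    weights = {s: e / total for s, e in exps.items()}
    return updated, weights
```

A strategy that keeps scoring well sees its EMA (and hence weight) rise, while the softmax guarantees every strategy retains a nonzero share — the feedback loop never fully starves an underperformer.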
706ca410ca feat(lotto-lab): purchase_manager - auto-check purchase results + aggregate strategy performance
- add backend/app/purchase_manager.py
  - check_purchases_for_draw(): auto-scores unscored purchases for a given draw
  - reuses checker._calc_rank, defines the RANK_PRIZE constant
  - auto-upserts strategy_performance after scoring (aggregated per strategy)
- add 2 integration tests in backend/tests/test_purchase_manager.py
  - test_check_purchases_for_draw: verifies 1st-prize and losing results
  - test_check_purchases_updates_strategy_performance: verifies performance-table updates

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-06 21:12:43 +09:00
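The purchase_manager commit reuses `checker._calc_rank` and a `RANK_PRIZE` constant, neither of which appears in this changeset. Below is a minimal sketch under the standard Lotto 6/45 ranking rules (6 matched = 1st, 5 + bonus = 2nd, 5 = 3rd, 4 = 4th, 3 = 5th); the prize amounts are illustrative placeholders, since ranks 1–3 vary per draw and the repo's actual constants are not shown.

```python
# Illustrative fixed prizes for the low ranks; ranks 1-3 vary per draw.
RANK_PRIZE = {4: 50_000, 5: 5_000}

def calc_rank(picked: list[int], winning: list[int], bonus: int) -> int:
    """Return rank 1-5 for a winning set of numbers, 0 for a losing one."""
    correct = len(set(picked) & set(winning))
    if correct == 6:
        return 1
    if correct == 5 and bonus in picked:
        return 2
    if correct == 5:
        return 3
    if correct == 4:
        return 4
    if correct == 3:
        return 5
    return 0
```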
4c6e96d59c lotto-lab: extend purchase CRUD + add strategy_performance/weights CRUD
- _purchase_row_to_dict: include the new numbers/is_real/source_detail/results/total_prize columns
- add_purchase: add numbers, is_real, source_strategy, source_detail parameters
- get_purchases: add is_real, strategy, checked filters
- get_purchase_stats: split total/real/virtual/by_strategy stats + keep backward-compatible fields
- update_purchase: add numbers/is_real/source_strategy to the allowed set
- new: upsert_strategy_performance, get_strategy_performance
- new: get_strategy_weights, update_strategy_weight
- new: update_purchase_results (for checker integration)
- 5 tests added (TDD)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-06 21:09:59 +09:00
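`upsert_strategy_performance`, added in this commit, rests on SQLite's `INSERT ... ON CONFLICT ... DO UPDATE`. A self-contained demonstration of the pattern against an in-memory database, with the table trimmed to the conflict key plus one metric:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE strategy_performance (
    strategy TEXT NOT NULL,
    draw_no INTEGER NOT NULL,
    prize_total INTEGER NOT NULL DEFAULT 0,
    UNIQUE(strategy, draw_no))""")

def upsert(strategy: str, draw_no: int, prize_total: int) -> None:
    # A second call with the same (strategy, draw_no) updates
    # the existing row instead of inserting a duplicate.
    conn.execute(
        """INSERT INTO strategy_performance (strategy, draw_no, prize_total)
           VALUES (?, ?, ?)
           ON CONFLICT(strategy, draw_no) DO UPDATE SET
               prize_total = excluded.prize_total""",
        (strategy, draw_no, prize_total),
    )

upsert("combined", 1100, 0)
upsert("combined", 1100, 50_000)  # same key: row is updated, not duplicated
```

The `UNIQUE(strategy, draw_no)` constraint is what makes the conflict target valid; without it, SQLite rejects the `ON CONFLICT` clause.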
7cf4784c08 lotto-lab: extend DB schema - ALTER purchase_history + add strategy tables
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-06 21:07:08 +09:00
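The schema commit's idempotent ALTERs rely on an `_ensure_column` helper whose body does not appear in this changeset. A minimal sketch, assuming it inspects `PRAGMA table_info` and runs the ALTER only when the column is missing:

```python
import sqlite3

def _ensure_column(conn: sqlite3.Connection, table: str, column: str,
                   alter_sql: str) -> None:
    """Run alter_sql only if `column` is not yet present on `table`,
    so the migration is safe to re-run on every startup."""
    cols = {row[1] for row in conn.execute(f"PRAGMA table_info({table})")}
    if column not in cols:
        conn.execute(alter_sql)

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE purchase_history (id INTEGER PRIMARY KEY)")
# Safe to call repeatedly: the ALTER executes only on the first call.
_ensure_column(conn, "purchase_history", "checked",
               "ALTER TABLE purchase_history ADD COLUMN checked INTEGER NOT NULL DEFAULT 0")
_ensure_column(conn, "purchase_history", "checked",
               "ALTER TABLE purchase_history ADD COLUMN checked INTEGER NOT NULL DEFAULT 0")
```

This keeps existing rows intact (each ALTER only appends a column with a DEFAULT), which is why the commit can extend `purchase_history` without a data migration.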
afc159c84d fix(realestate-lab): address final review issues - FK CASCADE, single connection, concurrency guard
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-06 08:49:05 +09:00
bdfcdee5fd fix(realestate-lab): address code review issues - new-item tracking, security, async, preserve match state
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-06 08:43:27 +09:00
3b118725ca docs: add realestate-lab service info to CLAUDE.md
Adds the service list, Docker port, Nginx routing, local URL, and API list.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-06 08:35:35 +09:00
6344f957fa refactor(lotto-backend): remove all housing-subscription code - migrated to realestate-lab
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-06 08:34:12 +09:00
0be5693aee infra: integrate realestate-lab into Docker/Nginx/deploy scripts
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-06 08:32:08 +09:00
5a493664f2 feat(realestate-lab): FastAPI app + scheduler + full API routes
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-06 08:31:08 +09:00
c6328f7b04 Merge branch 'worktree-agent-a36803ff' 2026-04-06 08:30:11 +09:00
d6d6faf5c7 Merge branch 'worktree-agent-a395667a' 2026-04-06 08:29:55 +09:00
437838c28b feat(realestate-lab): profile-based matching engine
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-06 08:29:36 +09:00
4cb6296a3d feat(realestate-lab): Public Data Portal (data.go.kr) API collector
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-06 08:29:19 +09:00
9e7efc3f12 feat(realestate-lab): DB layer - table creation + full CRUD
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-06 08:27:11 +09:00
6b95c1e5a0 feat(realestate-lab): define Pydantic request models
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-06 08:26:01 +09:00
7d20527a17 feat(realestate-lab): project scaffolding - Dockerfile, requirements, init
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-06 08:25:43 +09:00
e91a5e6be6 docs: write realestate-lab implementation plan
10 tasks - scaffolding, models, DB, collector, matching, API, infra, lotto-backend cleanup, docs, verification

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-05 22:16:43 +09:00
c4406b9ecd lotto-lab: add implementation plan for purchase integration + strategy evolution system
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-05 22:10:16 +09:00
65ffdec7d2 docs: add realestate-lab design spec
Design for a service that auto-collects housing-subscription announcements and matches them against a user profile.
Covers Public Data Portal API integration, the split into a standalone service, and the matching-engine definition.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-05 20:10:31 +09:00
23 changed files with 5695 additions and 665 deletions

View File

@@ -7,7 +7,7 @@
## 1. Project Overview
Backend monorepo for a personal web platform running on a Synology NAS.
- **Services**: lotto-lab, stock-lab, travel-album, music-lab, deployer
- **Services**: lotto-lab, stock-lab, travel-album, music-lab, blog-lab, realestate-lab, deployer
- **Frontend**: separate repo (React + Vite SPA); only build artifacts are deployed to the NAS
- **Infra**: Docker Compose + Nginx (reverse proxy) + Gitea Webhook auto-deploy
@@ -57,6 +57,7 @@ Backend monorepo for a personal web platform running on a Synology NAS.
| `stock-lab` | 18500 | stock news, AI analysis, KIS API integration |
| `music-lab` | 18600 | AI music generation + library management API |
| `blog-lab` | 18700 | blog marketing monetization API |
| `realestate-lab` | 18800 | real-estate subscription auto-collection and matching API |
| `travel-proxy` | 19000 | travel photo API + thumbnail generation |
| `lotto-frontend` (nginx) | 8080 | static SPA serving + API reverse proxy |
| `webpage-deployer` | 19010 | receives Gitea Webhook → auto-deploy |
@@ -74,6 +75,7 @@ Backend monorepo for a personal web platform running on a Synology NAS.
| `/api/portfolio` | `stock-lab:8000` | matches with and without trailing slash |
| `/api/music/` | `music-lab:8000` | AI music generation and library API |
| `/api/blog-marketing/` | `blog-lab:8000` | blog marketing monetization API |
| `/api/realestate/` | `realestate-lab:8000` | real-estate subscription API |
| `/webhook`, `/webhook/` | `deployer:9000` | Gitea Webhook |
| `/media/music/` | `/data/music/` (files served directly) | generated audio files |
| `/media/travel/.thumb/` | `/data/thumbs/` (files served directly) | thumbnail cache |
@@ -125,6 +127,7 @@ docker compose up -d
| Travel API | http://localhost:19000 |
| Stock Lab | http://localhost:18500 |
| Blog Lab | http://localhost:18700 |
| Realestate Lab | http://localhost:18800 |
---
@@ -133,7 +136,7 @@ docker compose up -d
### lotto-lab (backend/)
- DB: `/app/data/lotto.db`
- Data source: `smok95.github.io/lotto/results/`
- File layout: `main.py`, `db.py`, `recommender.py`, `collector.py`, `checker.py`, `generator.py`, `analyzer.py`, `utils.py`
- File layout: `main.py`, `db.py`, `recommender.py`, `collector.py`, `checker.py`, `generator.py`, `analyzer.py`, `utils.py`, `purchase_manager.py`, `strategy_evolver.py`
**lotto.db tables**
@@ -144,6 +147,10 @@ docker compose up -d
| `simulation_runs` | simulation run records |
| `simulation_candidates` | simulation candidates (5 score types) |
| `best_picks` | 20 currently active best numbers (swapped via `is_active` flag) |
| `purchase_history` | purchase history (real/virtual, numbers, source strategy, results) |
| `strategy_performance` | per-strategy per-draw performance (EMA input data) |
| `strategy_weights` | meta-strategy weights (EMA + Softmax) |
| `weekly_reports` | weekly strategy report cache |
| `todos` | todo list (UUID PK) |
| `blog_posts` | blog posts (tags: JSON array) |
@@ -165,6 +172,15 @@ docker compose up -d
| GET | `/api/lotto/recommend/heatmap` | heatmap-based recommendation |
| GET | `/api/lotto/recommend/batch` | batch recommendation |
| POST | `/api/lotto/recommend/batch` | save batch recommendation |
| GET | `/api/lotto/recommend/smart` | meta recommendation based on strategy evolution |
| GET | `/api/lotto/purchase` | list purchase history (is_real, strategy, draw_no, days filters) |
| POST | `/api/lotto/purchase` | register a purchase (real/virtual, numbers, source strategy) |
| PUT | `/api/lotto/purchase/{id}` | update a purchase record |
| DELETE | `/api/lotto/purchase/{id}` | delete a purchase record |
| GET | `/api/lotto/purchase/stats` | purchase stats (total/real/virtual + per strategy) |
| GET | `/api/lotto/strategy/weights` | per-strategy weight + performance + trend |
| GET | `/api/lotto/strategy/performance` | per-strategy per-draw performance history (for charts) |
| POST | `/api/lotto/strategy/evolve` | manual weight recalculation |
| POST | `/api/admin/simulate` | run simulation manually |
| POST | `/api/admin/sync_latest` | manual winning-number sync |
| GET | `/api/history` | recommendation history (limit, offset, favorite, tag, sort) |
@@ -267,6 +283,36 @@ docker compose up -d
- CDN URLs (`cdn1.suno.ai`) are temporary → must be downloaded locally
- Lyric section tags: `[Verse]`, `[Chorus]`, `[Bridge]`, `[Instrumental]`
### realestate-lab (realestate-lab/)
- Public Data Portal (data.go.kr) API integration: Korea Real Estate Board ApplyHome housing sales information service
- DB: `/app/data/realestate.db` (announcements, announcement_models, user_profile, match_results, collect_log tables)
- File layout: `main.py`, `db.py`, `collector.py`, `matcher.py`, `models.py`
**Environment variables**
- `DATA_GO_KR_API_KEY`: Public Data Portal API key (only manual registration works if unset)
**Scheduler jobs**
- daily at 09:00: collect subscription announcements + run matching (`scheduled_collect`)
- daily at 00:00: refresh statuses + re-match (`scheduled_status_update`)
**realestate-lab API list**
| Method | Path | Description |
|--------|------|------|
| GET | `/api/realestate/announcements` | announcement list (region, status, house_type, matched_only, sort, page, size) |
| GET | `/api/realestate/announcements/{id}` | announcement detail (includes per-house-type models) |
| POST | `/api/realestate/announcements` | register an announcement manually |
| PUT | `/api/realestate/announcements/{id}` | update an announcement |
| DELETE | `/api/realestate/announcements/{id}` | delete an announcement |
| POST | `/api/realestate/collect` | trigger a manual collection |
| GET | `/api/realestate/collect/status` | last collection result |
| GET | `/api/realestate/profile` | get my profile |
| PUT | `/api/realestate/profile` | update profile (upsert) |
| GET | `/api/realestate/matches` | match result list |
| POST | `/api/realestate/matches/refresh` | recalculate matches |
| PATCH | `/api/realestate/matches/{id}/read` | mark a new-match notification as read |
| GET | `/api/realestate/dashboard` | summary (active announcement count, new match count, upcoming dates) |
### travel-proxy (travel-proxy/)
- Original photos: `/data/travel/` (RO)
- Thumbnail cache: `/data/thumbs/` (RW)

View File

@@ -62,5 +62,12 @@ def check_results_for_draw(drw_no: int) -> int:
update_recommendation_result(r["id"], rank, correct, has_bonus)
count += 1
# ── purchase history check integration ──────────────────────────
try:
from .purchase_manager import check_purchases_for_draw as _check_purchases
_check_purchases(drw_no)  # internally calls evolve_after_check → recalculate_weights
except ImportError:
pass  # ignore when purchase_manager is absent (backward compat)
return count

View File

@@ -181,76 +181,6 @@ def init_db() -> None:
"CREATE INDEX IF NOT EXISTS idx_blog_date ON blog_posts(date DESC);"
)
# ── realestate_complexes table ─────────────────────────────────────────
conn.execute(
"""
CREATE TABLE IF NOT EXISTS realestate_complexes (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
address TEXT NOT NULL DEFAULT '',
lat REAL,
lng REAL,
units INTEGER,
types TEXT NOT NULL DEFAULT '[]',
avg_price_per_pyeong INTEGER,
subscription_start TEXT,
subscription_end TEXT,
result_date TEXT,
status TEXT NOT NULL DEFAULT '청약예정'
CHECK(status IN ('청약예정','청약중','결과발표','완료')),
priority TEXT NOT NULL DEFAULT 'normal'
CHECK(priority IN ('high','normal','low')),
tags TEXT NOT NULL DEFAULT '[]',
naver_url TEXT NOT NULL DEFAULT '',
floor_plan_url TEXT NOT NULL DEFAULT '',
memo TEXT NOT NULL DEFAULT '',
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
);
"""
)
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_realestate_status ON realestate_complexes(status);"
)
# ── subscription_items table ───────────────────────────────────────────
conn.execute(
"""
CREATE TABLE IF NOT EXISTS subscription_items (
id INTEGER PRIMARY KEY AUTOINCREMENT,
complex_name TEXT NOT NULL,
address TEXT NOT NULL DEFAULT '',
pyeong TEXT,
total_price INTEGER,
type TEXT,
special_type TEXT,
supply_type TEXT,
status TEXT NOT NULL DEFAULT '검토중',
min_score INTEGER,
max_income INTEGER,
homeless_required INTEGER,
subscription_start TEXT,
subscription_end TEXT,
contract_date TEXT,
interim_date TEXT,
balance_date TEXT,
result_date TEXT,
deposit_rate INTEGER DEFAULT 10,
interim_rate INTEGER DEFAULT 60,
balance_rate INTEGER DEFAULT 30,
loan_type TEXT,
loan_rate REAL,
memo TEXT NOT NULL DEFAULT '',
naver_url TEXT NOT NULL DEFAULT '',
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
);
"""
)
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_sub_items_created ON subscription_items(created_at DESC);"
)
# ── purchase_history table ─────────────────────────────────────────────
conn.execute(
"""
@@ -267,6 +197,69 @@ def init_db() -> None:
)
conn.execute("CREATE INDEX IF NOT EXISTS idx_purchase_draw ON purchase_history(draw_no DESC);")
# ── purchase_history column extension (preserves existing data) ────────
_ensure_column(conn, "purchase_history", "numbers",
"ALTER TABLE purchase_history ADD COLUMN numbers TEXT NOT NULL DEFAULT '[]'")
_ensure_column(conn, "purchase_history", "is_real",
"ALTER TABLE purchase_history ADD COLUMN is_real INTEGER NOT NULL DEFAULT 1")
_ensure_column(conn, "purchase_history", "source_strategy",
"ALTER TABLE purchase_history ADD COLUMN source_strategy TEXT NOT NULL DEFAULT 'manual'")
_ensure_column(conn, "purchase_history", "source_detail",
"ALTER TABLE purchase_history ADD COLUMN source_detail TEXT NOT NULL DEFAULT '{}'")
_ensure_column(conn, "purchase_history", "checked",
"ALTER TABLE purchase_history ADD COLUMN checked INTEGER NOT NULL DEFAULT 0")
_ensure_column(conn, "purchase_history", "results",
"ALTER TABLE purchase_history ADD COLUMN results TEXT NOT NULL DEFAULT '[]'")
_ensure_column(conn, "purchase_history", "total_prize",
"ALTER TABLE purchase_history ADD COLUMN total_prize INTEGER NOT NULL DEFAULT 0")
# ── strategy_performance table ─────────────────────────────────────────
conn.execute(
"""
CREATE TABLE IF NOT EXISTS strategy_performance (
id INTEGER PRIMARY KEY AUTOINCREMENT,
strategy TEXT NOT NULL,
draw_no INTEGER NOT NULL,
sets_count INTEGER NOT NULL DEFAULT 0,
total_correct INTEGER NOT NULL DEFAULT 0,
max_correct INTEGER NOT NULL DEFAULT 0,
prize_total INTEGER NOT NULL DEFAULT 0,
avg_score REAL NOT NULL DEFAULT 0.0,
updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
UNIQUE(strategy, draw_no)
);
"""
)
# ── strategy_weights table ─────────────────────────────────────────────
conn.execute(
"""
CREATE TABLE IF NOT EXISTS strategy_weights (
id INTEGER PRIMARY KEY AUTOINCREMENT,
strategy TEXT NOT NULL UNIQUE,
weight REAL NOT NULL DEFAULT 0.2,
ema_score REAL NOT NULL DEFAULT 0.15,
total_sets INTEGER NOT NULL DEFAULT 0,
total_hits_3plus INTEGER NOT NULL DEFAULT 0,
updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
);
"""
)
# seed initial strategy_weights values (ignored if rows already exist)
_INIT_WEIGHTS = [
("combined", 0.30, 0.15),
("simulation", 0.25, 0.15),
("heatmap", 0.20, 0.15),
("manual", 0.15, 0.15),
("custom", 0.10, 0.15),
]
for strat, w, ema in _INIT_WEIGHTS:
conn.execute(
"INSERT OR IGNORE INTO strategy_weights (strategy, weight, ema_score) VALUES (?, ?, ?)",
(strat, w, ema),
)
# ── weekly_reports cache table ─────────────────────────────────────────
conn.execute(
"""
@@ -279,26 +272,6 @@ def init_db() -> None:
"""
)
# ── subscription_profile table (singleton id=1) ────────────────────────
conn.execute(
"""
CREATE TABLE IF NOT EXISTS subscription_profile (
id INTEGER PRIMARY KEY DEFAULT 1,
is_household_head INTEGER DEFAULT 1,
is_homeless INTEGER DEFAULT 1,
homeless_period INTEGER,
savings_months INTEGER,
savings_count INTEGER,
dependents INTEGER DEFAULT 0,
residency_area TEXT,
is_married INTEGER,
marriage_months INTEGER,
monthly_income INTEGER,
special_quals TEXT NOT NULL DEFAULT '[]'
);
"""
)
# ── todos CRUD ───────────────────────────────────────────────────────────────
@@ -834,312 +807,14 @@ def get_simulation_candidates(run_id: int, limit: int = 100) -> List[Dict[str, A
]
# ── realestate_complexes CRUD ─────────────────────────────────────────────────
def _complex_row_to_dict(r) -> Dict[str, Any]:
return {
"id": r["id"],
"name": r["name"],
"address": r["address"],
"lat": r["lat"],
"lng": r["lng"],
"units": r["units"],
"types": json.loads(r["types"]) if r["types"] else [],
"avgPricePerPyeong": r["avg_price_per_pyeong"],
"subscriptionStart": r["subscription_start"],
"subscriptionEnd": r["subscription_end"],
"resultDate": r["result_date"],
"status": r["status"],
"priority": r["priority"],
"tags": json.loads(r["tags"]) if r["tags"] else [],
"naverUrl": r["naver_url"],
"floorPlanUrl": r["floor_plan_url"],
"memo": r["memo"],
"created_at": r["created_at"],
"updated_at": r["updated_at"],
}
def get_all_complexes() -> List[Dict[str, Any]]:
with _conn() as conn:
rows = conn.execute(
"SELECT * FROM realestate_complexes ORDER BY id DESC"
).fetchall()
return [_complex_row_to_dict(r) for r in rows]
def get_complex(complex_id: int) -> Optional[Dict[str, Any]]:
with _conn() as conn:
r = conn.execute(
"SELECT * FROM realestate_complexes WHERE id = ?", (complex_id,)
).fetchone()
return _complex_row_to_dict(r) if r else None
def create_complex(data: Dict[str, Any]) -> Dict[str, Any]:
with _conn() as conn:
conn.execute(
"""
INSERT INTO realestate_complexes
(name, address, lat, lng, units, types, avg_price_per_pyeong,
subscription_start, subscription_end, result_date,
status, priority, tags, naver_url, floor_plan_url, memo)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
data["name"],
data.get("address", ""),
data.get("lat"),
data.get("lng"),
data.get("units"),
json.dumps(data.get("types", [])),
data.get("avgPricePerPyeong"),
data.get("subscriptionStart"),
data.get("subscriptionEnd"),
data.get("resultDate"),
data.get("status", "청약예정"),
data.get("priority", "normal"),
json.dumps(data.get("tags", [])),
data.get("naverUrl", ""),
data.get("floorPlanUrl", ""),
data.get("memo", ""),
),
)
row = conn.execute(
"SELECT * FROM realestate_complexes WHERE rowid = last_insert_rowid()"
).fetchone()
return _complex_row_to_dict(row)
def update_complex(complex_id: int, data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
field_map = {
"name": "name",
"address": "address",
"lat": "lat",
"lng": "lng",
"units": "units",
"avgPricePerPyeong": "avg_price_per_pyeong",
"subscriptionStart": "subscription_start",
"subscriptionEnd": "subscription_end",
"resultDate": "result_date",
"status": "status",
"priority": "priority",
"naverUrl": "naver_url",
"floorPlanUrl": "floor_plan_url",
"memo": "memo",
}
json_fields = {"types", "tags"}
updates: Dict[str, Any] = {}
for camel, snake in field_map.items():
if camel in data:
updates[snake] = data[camel]
for f in json_fields:
if f in data:
updates[f] = json.dumps(data[f])
if not updates:
return get_complex(complex_id)
set_clauses = ", ".join(f"{k} = ?" for k in updates)
set_clauses += ", updated_at = strftime('%Y-%m-%dT%H:%M:%fZ','now')"
args = list(updates.values()) + [complex_id]
with _conn() as conn:
conn.execute(
f"UPDATE realestate_complexes SET {set_clauses} WHERE id = ?", args
)
row = conn.execute(
"SELECT * FROM realestate_complexes WHERE id = ?", (complex_id,)
).fetchone()
return _complex_row_to_dict(row) if row else None
def delete_complex(complex_id: int) -> bool:
with _conn() as conn:
cur = conn.execute(
"DELETE FROM realestate_complexes WHERE id = ?", (complex_id,)
)
return cur.rowcount > 0
# ── subscription_items CRUD ───────────────────────────────────────────────────
_SUB_ITEM_FIELD_MAP = {
"complexName": "complex_name",
"address": "address",
"pyeong": "pyeong",
"totalPrice": "total_price",
"type": "type",
"specialType": "special_type",
"supplyType": "supply_type",
"status": "status",
"minScore": "min_score",
"maxIncome": "max_income",
"homelessRequired": "homeless_required",
"subscriptionStart": "subscription_start",
"subscriptionEnd": "subscription_end",
"contractDate": "contract_date",
"interimDate": "interim_date",
"balanceDate": "balance_date",
"resultDate": "result_date",
"depositRate": "deposit_rate",
"interimRate": "interim_rate",
"balanceRate": "balance_rate",
"loanType": "loan_type",
"loanRate": "loan_rate",
"memo": "memo",
"naverUrl": "naver_url",
}
def _sub_item_row_to_dict(r) -> Dict[str, Any]:
return {
"id": r["id"],
"complexName": r["complex_name"],
"address": r["address"],
"pyeong": r["pyeong"],
"totalPrice": r["total_price"],
"type": r["type"],
"specialType": r["special_type"],
"supplyType": r["supply_type"],
"status": r["status"],
"minScore": r["min_score"],
"maxIncome": r["max_income"],
"homelessRequired": r["homeless_required"],
"subscriptionStart": r["subscription_start"],
"subscriptionEnd": r["subscription_end"],
"contractDate": r["contract_date"],
"interimDate": r["interim_date"],
"balanceDate": r["balance_date"],
"resultDate": r["result_date"],
"depositRate": r["deposit_rate"],
"interimRate": r["interim_rate"],
"balanceRate": r["balance_rate"],
"loanType": r["loan_type"],
"loanRate": r["loan_rate"],
"memo": r["memo"],
"naverUrl": r["naver_url"],
"created_at": r["created_at"],
"updated_at": r["updated_at"],
}
def get_all_subscription_items() -> List[Dict[str, Any]]:
with _conn() as conn:
rows = conn.execute(
"SELECT * FROM subscription_items ORDER BY created_at DESC"
).fetchall()
return [_sub_item_row_to_dict(r) for r in rows]
def create_subscription_item(data: Dict[str, Any]) -> Dict[str, Any]:
with _conn() as conn:
conn.execute(
"""
INSERT INTO subscription_items
(complex_name, address, pyeong, total_price, type, special_type, supply_type,
status, min_score, max_income, homeless_required,
subscription_start, subscription_end, contract_date, interim_date,
balance_date, result_date, deposit_rate, interim_rate, balance_rate,
loan_type, loan_rate, memo, naver_url)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
data["complexName"],
data.get("address", ""),
data.get("pyeong"),
data.get("totalPrice"),
data.get("type"),
data.get("specialType"),
data.get("supplyType"),
data.get("status", "검토중"),
data.get("minScore"),
data.get("maxIncome"),
data.get("homelessRequired"),
data.get("subscriptionStart"),
data.get("subscriptionEnd"),
data.get("contractDate"),
data.get("interimDate"),
data.get("balanceDate"),
data.get("resultDate"),
data.get("depositRate", 10),
data.get("interimRate", 60),
data.get("balanceRate", 30),
data.get("loanType"),
data.get("loanRate"),
data.get("memo", ""),
data.get("naverUrl", ""),
),
)
row = conn.execute(
"SELECT * FROM subscription_items WHERE rowid = last_insert_rowid()"
).fetchone()
return _sub_item_row_to_dict(row)
def update_subscription_item(item_id: int, data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
updates: Dict[str, Any] = {}
for camel, snake in _SUB_ITEM_FIELD_MAP.items():
if camel in data:
updates[snake] = data[camel]
if not updates:
with _conn() as conn:
row = conn.execute(
"SELECT * FROM subscription_items WHERE id = ?", (item_id,)
).fetchone()
return _sub_item_row_to_dict(row) if row else None
set_clauses = ", ".join(f"{k} = ?" for k in updates)
set_clauses += ", updated_at = strftime('%Y-%m-%dT%H:%M:%fZ','now')"
args = list(updates.values()) + [item_id]
with _conn() as conn:
conn.execute(
f"UPDATE subscription_items SET {set_clauses} WHERE id = ?", args
)
row = conn.execute(
"SELECT * FROM subscription_items WHERE id = ?", (item_id,)
).fetchone()
return _sub_item_row_to_dict(row) if row else None
def delete_subscription_item(item_id: int) -> bool:
with _conn() as conn:
cur = conn.execute("DELETE FROM subscription_items WHERE id = ?", (item_id,))
return cur.rowcount > 0
# ── subscription_profile CRUD (singleton) ─────────────────────────────
def _profile_row_to_dict(r) -> Dict[str, Any]:
return {
"isHouseholdHead": bool(r["is_household_head"]) if r["is_household_head"] is not None else None,
"isHomeless": bool(r["is_homeless"]) if r["is_homeless"] is not None else None,
"homelessPeriod": r["homeless_period"],
"savingsMonths": r["savings_months"],
"savingsCount": r["savings_count"],
"dependents": r["dependents"],
"residencyArea": r["residency_area"],
"isMarried": bool(r["is_married"]) if r["is_married"] is not None else None,
"marriageMonths": r["marriage_months"],
"monthlyIncome": r["monthly_income"],
"specialQuals": json.loads(r["special_quals"]) if r["special_quals"] else [],
}
def get_subscription_profile() -> Optional[Dict[str, Any]]:
with _conn() as conn:
r = conn.execute(
"SELECT * FROM subscription_profile WHERE id = 1"
).fetchone()
return _profile_row_to_dict(r) if r else None
# ── purchase_history CRUD ─────────────────────────────────────────────────────
def _purchase_row_to_dict(r) -> Dict[str, Any]:
import json as _json
keys = r.keys()
numbers_raw = r["numbers"] if "numbers" in keys else "[]"
detail_raw = r["source_detail"] if "source_detail" in keys else "{}"
results_raw = r["results"] if "results" in keys else "[]"
return {
"id": r["id"],
"draw_no": r["draw_no"],
@@ -1148,20 +823,37 @@ def _purchase_row_to_dict(r) -> Dict[str, Any]:
"prize": r["prize"],
"note": r["note"],
"created_at": r["created_at"],
"numbers": _json.loads(numbers_raw) if numbers_raw else [],
"is_real": r["is_real"] if "is_real" in keys else 1,
"source_strategy": r["source_strategy"] if "source_strategy" in keys else "manual",
"source_detail": _json.loads(detail_raw) if detail_raw else {},
"checked": r["checked"] if "checked" in keys else 0,
"results": _json.loads(results_raw) if results_raw else [],
"total_prize": r["total_prize"] if "total_prize" in keys else 0,
}
def add_purchase(draw_no: int, amount: int, sets: int, prize: int = 0, note: str = "") -> Dict[str, Any]:
def add_purchase(draw_no: int, amount: int, sets: int, prize: int = 0, note: str = "",
numbers: list = None, is_real: bool = True,
source_strategy: str = "manual", source_detail: dict = None) -> Dict[str, Any]:
import json as _json
numbers_json = _json.dumps(numbers or [], ensure_ascii=False)
detail_json = _json.dumps(source_detail or {}, ensure_ascii=False)
is_real_int = 1 if is_real else 0
with _conn() as conn:
conn.execute(
"INSERT INTO purchase_history (draw_no, amount, sets, prize, note) VALUES (?, ?, ?, ?, ?)",
(draw_no, amount, sets, prize, note),
"""INSERT INTO purchase_history
(draw_no, amount, sets, prize, note, numbers, is_real, source_strategy, source_detail)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(draw_no, amount, sets, prize, note, numbers_json, is_real_int, source_strategy, detail_json),
)
row = conn.execute("SELECT * FROM purchase_history WHERE rowid = last_insert_rowid()").fetchone()
return _purchase_row_to_dict(row)
def get_purchases(draw_no: int = None, days: int = None) -> List[Dict[str, Any]]:
def get_purchases(draw_no: int = None, days: int = None,
is_real: bool = None, strategy: str = None,
checked: bool = None) -> List[Dict[str, Any]]:
conditions, params = [], []
if draw_no is not None:
conditions.append("draw_no = ?")
@@ -1169,6 +861,15 @@ def get_purchases(draw_no: int = None, days: int = None) -> List[Dict[str, Any]]
if days:
conditions.append("created_at >= datetime('now', ? || ' days')")
params.append(f"-{days}")
if is_real is not None:
conditions.append("is_real = ?")
params.append(1 if is_real else 0)
if strategy is not None:
conditions.append("source_strategy = ?")
params.append(strategy)
if checked is not None:
conditions.append("checked = ?")
params.append(1 if checked else 0)
where = f"WHERE {' AND '.join(conditions)}" if conditions else ""
with _conn() as conn:
rows = conn.execute(
@@ -1179,12 +880,18 @@ def get_purchases(draw_no: int = None, days: int = None) -> List[Dict[str, Any]]
def update_purchase(purchase_id: int, data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
allowed = {"draw_no", "amount", "sets", "prize", "note"}
import json as _json
allowed = {"draw_no", "amount", "sets", "prize", "note", "numbers", "is_real", "source_strategy"}
updates = {k: v for k, v in data.items() if k in allowed}
if not updates:
with _conn() as conn:
row = conn.execute("SELECT * FROM purchase_history WHERE id = ?", (purchase_id,)).fetchone()
return _purchase_row_to_dict(row) if row else None
# type conversion before handing values to SQLite
if "numbers" in updates:
updates["numbers"] = _json.dumps(updates["numbers"], ensure_ascii=False)
if "is_real" in updates:
updates["is_real"] = 1 if updates["is_real"] else 0
set_clause = ", ".join(f"{k} = ?" for k in updates)
with _conn() as conn:
cur = conn.execute(
@@ -1204,30 +911,74 @@ def delete_purchase(purchase_id: int) -> bool:
def get_purchase_stats() -> Dict[str, Any]:
with _conn() as conn:
rows = conn.execute("SELECT amount, prize FROM purchase_history").fetchall()
if not rows:
import json as _json
def _calc_group(rows):
if not rows:
return {"sets": 0, "invested": 0, "prize": 0, "roi": 0.0, "win_rate": 0.0}
invested = sum(r["amount"] for r in rows)
prize = sum(r.get("total_prize") or r["prize"] for r in rows)
wins = sum(1 for r in rows if (r.get("total_prize") or r["prize"]) > 0)
return {
"total_records": 0,
"total_invested": 0,
"total_prize": 0,
"net": 0,
"return_rate": 0.0,
"prize_count": 0,
"max_prize": 0,
"sets": sum(r["sets"] for r in rows),
"invested": invested,
"prize": prize,
"roi": round((prize / invested * 100 - 100) if invested else 0.0, 2),
"win_rate": round(wins / len(rows) * 100, 2) if rows else 0.0,
}
amounts = [r["amount"] for r in rows]
prizes = [r["prize"] for r in rows]
total_invested = sum(amounts)
total_prize = sum(prizes)
with _conn() as conn:
rows = conn.execute("SELECT * FROM purchase_history").fetchall()
all_rows = [dict(r) for r in rows]
real_rows = [r for r in all_rows if r.get("is_real", 1) == 1]
virtual_rows = [r for r in all_rows if r.get("is_real", 1) == 0]
# per-strategy aggregation
by_strategy: Dict[str, list] = {}
for r in all_rows:
strat = r.get("source_strategy", "manual")
if strat not in by_strategy:
by_strategy[strat] = []
by_strategy[strat].append(r)
strategy_stats: Dict[str, Any] = {}
for strat, srows in by_strategy.items():
s = _calc_group(srows)
total_correct = 0
count_sets = 0
hits_3plus = 0
for r in srows:
results_raw = r.get("results", "[]")
try:
results = _json.loads(results_raw) if isinstance(results_raw, str) else (results_raw or [])
except Exception:
results = []
for res in results:
count_sets += 1
c = res.get("correct", 0)
total_correct += c
if c >= 3:
hits_3plus += 1
s["avg_correct"] = round(total_correct / count_sets, 2) if count_sets else 0.0
s["hits_3plus"] = hits_3plus
strategy_stats[strat] = s
total_invested = sum(r["amount"] for r in all_rows)
total_prize_sum = sum(r.get("total_prize") or r["prize"] for r in all_rows)
return {
"total_records": len(rows),
"total": _calc_group(all_rows),
"real": _calc_group(real_rows),
"virtual": _calc_group(virtual_rows),
"by_strategy": strategy_stats,
# backward compatibility
"total_records": len(all_rows),
"total_invested": total_invested,
"total_prize": total_prize,
"net": total_prize - total_invested,
"return_rate": round((total_prize / total_invested * 100) if total_invested else 0.0, 2),
"prize_count": sum(1 for p in prizes if p > 0),
"max_prize": max(prizes),
"total_prize": total_prize_sum,
"net": total_prize_sum - total_invested,
"return_rate": round((total_prize_sum / total_invested * 100) if total_invested else 0.0, 2),
"prize_count": sum(1 for r in all_rows if (r.get("total_prize") or r["prize"]) > 0),
"max_prize": max((r.get("total_prize") or r["prize"] for r in all_rows), default=0),
}
@@ -1275,54 +1026,70 @@ def get_all_recommendation_numbers() -> List[List[int]]:
return [json.loads(r["numbers"]) for r in rows]
def upsert_subscription_profile(data: Dict[str, Any]) -> Dict[str, Any]:
field_map = {
"isHouseholdHead": "is_household_head",
"isHomeless": "is_homeless",
"homelessPeriod": "homeless_period",
"savingsMonths": "savings_months",
"savingsCount": "savings_count",
"dependents": "dependents",
"residencyArea": "residency_area",
"isMarried": "is_married",
"marriageMonths": "marriage_months",
"monthlyIncome": "monthly_income",
}
updates: Dict[str, Any] = {}
for camel, snake in field_map.items():
if camel in data:
val = data[camel]
# bool → int (SQLite)
if isinstance(val, bool):
val = 1 if val else 0
updates[snake] = val
if "specialQuals" in data:
updates["special_quals"] = json.dumps(data["specialQuals"])
    with _conn() as conn:
        existing = conn.execute(
            "SELECT id FROM subscription_profile WHERE id = 1"
        ).fetchone()
        if existing:
            if updates:
                set_clauses = ", ".join(f"{k} = ?" for k in updates)
                conn.execute(
                    f"UPDATE subscription_profile SET {set_clauses} WHERE id = 1",
                    list(updates.values()),
                )
        else:
            cols = ["id"] + list(updates.keys())
            vals = [1] + list(updates.values())
            placeholders = ", ".join("?" for _ in vals)
            conn.execute(
                f"INSERT INTO subscription_profile ({', '.join(cols)}) VALUES ({placeholders})",
                vals,
            )
        row = conn.execute(
            "SELECT * FROM subscription_profile WHERE id = 1"
        ).fetchone()
        return _profile_row_to_dict(row)
# ── strategy_performance CRUD ─────────────────────────────────────────────────
def upsert_strategy_performance(strategy: str, draw_no: int, sets_count: int = 0,
                                total_correct: int = 0, max_correct: int = 0,
                                prize_total: int = 0, avg_score: float = 0.0) -> None:
    with _conn() as conn:
        conn.execute(
            """INSERT INTO strategy_performance (strategy, draw_no, sets_count, total_correct, max_correct, prize_total, avg_score)
               VALUES (?, ?, ?, ?, ?, ?, ?)
               ON CONFLICT(strategy, draw_no) DO UPDATE SET
                 sets_count=excluded.sets_count, total_correct=excluded.total_correct,
                 max_correct=excluded.max_correct, prize_total=excluded.prize_total,
                 avg_score=excluded.avg_score,
                 updated_at=strftime('%Y-%m-%dT%H:%M:%fZ','now')""",
            (strategy, draw_no, sets_count, total_correct, max_correct, prize_total, avg_score),
        )
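The upsert in `upsert_strategy_performance` relies on SQLite's `ON CONFLICT … DO UPDATE` keyed on the `(strategy, draw_no)` unique constraint. A minimal standalone sketch (throwaway in-memory table with a trimmed-down schema, not the real one) of how repeated inserts behave:

```python
import sqlite3

# Throwaway in-memory table mirroring only the columns needed for the demo.
conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE strategy_performance (
    strategy TEXT, draw_no INTEGER, sets_count INTEGER,
    UNIQUE(strategy, draw_no))""")

sql = """INSERT INTO strategy_performance (strategy, draw_no, sets_count)
         VALUES (?, ?, ?)
         ON CONFLICT(strategy, draw_no) DO UPDATE SET
           sets_count = excluded.sets_count"""
conn.execute(sql, ("simulation", 1150, 10))
conn.execute(sql, ("simulation", 1150, 20))   # same key: updates in place, no duplicate row

rows = conn.execute("SELECT sets_count FROM strategy_performance").fetchall()
print(rows)  # [(20,)]
```

The `excluded.` pseudo-table refers to the values of the insert that hit the conflict, which is what lets a single statement serve as both insert and update.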
def get_strategy_performance(strategy: str = None, days: int = None) -> List[Dict[str, Any]]:
conditions, params = [], []
if strategy:
conditions.append("strategy = ?")
params.append(strategy)
if days:
conditions.append("updated_at >= datetime('now', ? || ' days')")
params.append(f"-{days}")
where = f"WHERE {' AND '.join(conditions)}" if conditions else ""
with _conn() as conn:
rows = conn.execute(
f"SELECT * FROM strategy_performance {where} ORDER BY draw_no ASC",
params,
).fetchall()
return [dict(r) for r in rows]
# ── strategy_weights CRUD ─────────────────────────────────────────────────────
def get_strategy_weights() -> List[Dict[str, Any]]:
with _conn() as conn:
rows = conn.execute("SELECT * FROM strategy_weights ORDER BY weight DESC").fetchall()
return [dict(r) for r in rows]
def update_strategy_weight(strategy: str, weight: float, ema_score: float,
total_sets: int = None, total_hits_3plus: int = None) -> None:
with _conn() as conn:
fields = "weight=?, ema_score=?, updated_at=strftime('%Y-%m-%dT%H:%M:%fZ','now')"
params = [weight, ema_score]
if total_sets is not None:
fields += ", total_sets=?"
params.append(total_sets)
if total_hits_3plus is not None:
fields += ", total_hits_3plus=?"
params.append(total_hits_3plus)
params.append(strategy)
conn.execute(f"UPDATE strategy_weights SET {fields} WHERE strategy=?", params)
def update_purchase_results(purchase_id: int, results: list, total_prize: int) -> None:
    """Update a purchase row with its checked results (called after the checker runs)."""
    with _conn() as conn:
        conn.execute(
            "UPDATE purchase_history SET results=?, total_prize=?, checked=1 WHERE id=?",
            (json.dumps(results, ensure_ascii=False), total_prize, purchase_id),
        )

View File

@@ -19,12 +19,6 @@ from .db import (
get_all_todos, create_todo, update_todo, delete_todo, delete_done_todos,
# blog
get_all_posts, create_post, update_post, delete_post,
# realestate
get_all_complexes, get_complex, create_complex, update_complex, delete_complex,
# subscription
get_all_subscription_items, create_subscription_item,
update_subscription_item, delete_subscription_item,
get_subscription_profile, upsert_subscription_profile,
# 성과 통계
get_recommendation_performance,
# Phase 2: 구매 이력
@@ -33,6 +27,8 @@ from .db import (
save_weekly_report, get_weekly_report_list, get_weekly_report,
# Phase 2: 개인 패턴 분석
get_all_recommendation_numbers,
# Phase 3: 전략 관련
get_strategy_performance as db_get_strategy_performance,
)
from .recommender import recommend_numbers, recommend_with_heatmap
from .collector import sync_latest, sync_ensure_all
@@ -40,6 +36,11 @@ from .generator import run_simulation, generate_smart_recommendations
from .checker import check_results_for_draw
from .utils import calc_metrics, calc_recent_overlap
from .analyzer import get_statistical_report, generate_weekly_report, analyze_personal_patterns, generate_combined_recommendation
from .purchase_manager import check_purchases_for_draw
from .strategy_evolver import (
get_weights_with_trend, recalculate_weights,
generate_smart_recommendation,
)
app = FastAPI()
scheduler = BackgroundScheduler(timezone=os.getenv("TZ", "Asia/Seoul"))
@@ -263,6 +264,10 @@ class PurchaseCreate(BaseModel):
sets: int = 1
prize: int = 0
note: str = ""
numbers: List[List[int]] = []
is_real: bool = True
source_strategy: str = "manual"
source_detail: dict = {}
class PurchaseUpdate(BaseModel):
@@ -271,6 +276,9 @@ class PurchaseUpdate(BaseModel):
sets: Optional[int] = None
prize: Optional[int] = None
note: Optional[str] = None
numbers: Optional[List[List[int]]] = None
is_real: Optional[bool] = None
source_strategy: Optional[str] = None
@app.get("/api/lotto/purchase/stats")
@@ -280,15 +288,28 @@ def api_purchase_stats():
@app.get("/api/lotto/purchase")
def api_purchase_list(draw_no: Optional[int] = None, days: Optional[int] = None,
                      is_real: Optional[bool] = None, strategy: Optional[str] = None):
    """Query purchase history (filters: draw_no, days, is_real, strategy)"""
    return {"records": get_purchases(draw_no=draw_no, days=days, is_real=is_real, strategy=strategy)}
@app.post("/api/lotto/purchase", status_code=201)
def api_purchase_create(body: PurchaseCreate):
"""구매 이력 추가"""
return add_purchase(body.draw_no, body.amount, body.sets, body.prize, body.note)
"""구매 이력 추가 (실제/가상)"""
sets = body.sets if body.sets > 0 else max(len(body.numbers), 1)
amount = body.amount if body.amount > 0 else sets * 1000
return add_purchase(
draw_no=body.draw_no,
amount=amount,
sets=sets,
prize=body.prize,
note=body.note,
numbers=body.numbers,
is_real=body.is_real,
source_strategy=body.source_strategy,
source_detail=body.source_detail,
)
@app.put("/api/lotto/purchase/{purchase_id}")
@@ -308,6 +329,40 @@ def api_purchase_delete(purchase_id: int):
return {"ok": True}
# ── 전략 진화 API ──────────────────────────────────────────────────────────
@app.get("/api/lotto/strategy/weights")
def api_strategy_weights():
"""현재 전략별 가중치 + 성과 요약 + trend"""
return get_weights_with_trend()
@app.get("/api/lotto/strategy/performance")
def api_strategy_performance(strategy: Optional[str] = None, days: Optional[int] = None):
"""전략별 회차 성과 이력 (차트용)"""
rows = db_get_strategy_performance(strategy=strategy, days=days)
return {"records": rows}
@app.post("/api/lotto/strategy/evolve")
def api_strategy_evolve():
"""수동 가중치 재계산 트리거"""
new_weights = recalculate_weights()
return {"ok": True, "weights": new_weights}
# ── 스마트 추천 API ────────────────────────────────────────────────────────
@app.get("/api/lotto/recommend/smart")
def api_recommend_smart(sets: int = 5):
"""전략 가중치 기반 메타 전략 추천"""
sets = max(1, min(sets, 10))
result = generate_smart_recommendation(sets=sets)
if "error" in result:
raise HTTPException(status_code=500, detail=result["error"])
return result
# ── 통계 분석 리포트 ────────────────────────────────────────────────────────
@app.get("/api/lotto/analysis")
def api_analysis():
@@ -876,189 +931,3 @@ def api_blog_delete(post_id: int):
if not ok:
raise HTTPException(status_code=404, detail="Post not found")
return {"ok": True}
# ── RealEstate API ─────────────────────────────────────────────────────────────
VALID_STATUSES = {"청약예정", "청약중", "결과발표", "완료"}
VALID_PRIORITIES = {"high", "normal", "low"}
class ComplexCreate(BaseModel):
name: str
address: str = ""
lat: Optional[float] = None
lng: Optional[float] = None
units: Optional[int] = None
types: List[str] = []
avgPricePerPyeong: Optional[int] = None
subscriptionStart: Optional[str] = None
subscriptionEnd: Optional[str] = None
resultDate: Optional[str] = None
status: str = "청약예정"
priority: str = "normal"
tags: List[str] = []
naverUrl: str = ""
floorPlanUrl: str = ""
memo: str = ""
class ComplexUpdate(BaseModel):
name: Optional[str] = None
address: Optional[str] = None
lat: Optional[float] = None
lng: Optional[float] = None
units: Optional[int] = None
types: Optional[List[str]] = None
avgPricePerPyeong: Optional[int] = None
subscriptionStart: Optional[str] = None
subscriptionEnd: Optional[str] = None
resultDate: Optional[str] = None
status: Optional[str] = None
priority: Optional[str] = None
tags: Optional[List[str]] = None
naverUrl: Optional[str] = None
floorPlanUrl: Optional[str] = None
memo: Optional[str] = None
@app.get("/api/realestate/complexes")
def api_realestate_list():
return get_all_complexes()
@app.post("/api/realestate/complexes", status_code=201)
def api_realestate_create(body: ComplexCreate):
if body.status not in VALID_STATUSES:
raise HTTPException(status_code=400, detail=f"status must be one of {VALID_STATUSES}")
if body.priority not in VALID_PRIORITIES:
raise HTTPException(status_code=400, detail=f"priority must be one of {VALID_PRIORITIES}")
return create_complex(body.model_dump())
@app.put("/api/realestate/complexes/{complex_id}")
def api_realestate_update(complex_id: int, body: ComplexUpdate):
data = body.model_dump(exclude_none=True)
if "status" in data and data["status"] not in VALID_STATUSES:
raise HTTPException(status_code=400, detail=f"status must be one of {VALID_STATUSES}")
if "priority" in data and data["priority"] not in VALID_PRIORITIES:
raise HTTPException(status_code=400, detail=f"priority must be one of {VALID_PRIORITIES}")
updated = update_complex(complex_id, data)
if updated is None:
raise HTTPException(status_code=404, detail="Complex not found")
return updated
@app.delete("/api/realestate/complexes/{complex_id}")
def api_realestate_delete(complex_id: int):
ok = delete_complex(complex_id)
if not ok:
raise HTTPException(status_code=404, detail="Complex not found")
return {"ok": True}
# ── Subscription API ───────────────────────────────────────────────────────────
class SubscriptionItemCreate(BaseModel):
complexName: str
address: str = ""
pyeong: Optional[str] = None
totalPrice: Optional[int] = None
type: Optional[str] = None
specialType: Optional[str] = None
supplyType: Optional[str] = None
status: str = "검토중"
minScore: Optional[int] = None
maxIncome: Optional[int] = None
homelessRequired: Optional[int] = None
subscriptionStart: Optional[str] = None
subscriptionEnd: Optional[str] = None
contractDate: Optional[str] = None
interimDate: Optional[str] = None
balanceDate: Optional[str] = None
resultDate: Optional[str] = None
depositRate: int = 10
interimRate: int = 60
balanceRate: int = 30
loanType: Optional[str] = None
loanRate: Optional[float] = None
memo: str = ""
naverUrl: str = ""
class SubscriptionItemUpdate(BaseModel):
complexName: Optional[str] = None
address: Optional[str] = None
pyeong: Optional[str] = None
totalPrice: Optional[int] = None
type: Optional[str] = None
specialType: Optional[str] = None
supplyType: Optional[str] = None
status: Optional[str] = None
minScore: Optional[int] = None
maxIncome: Optional[int] = None
homelessRequired: Optional[int] = None
subscriptionStart: Optional[str] = None
subscriptionEnd: Optional[str] = None
contractDate: Optional[str] = None
interimDate: Optional[str] = None
balanceDate: Optional[str] = None
resultDate: Optional[str] = None
depositRate: Optional[int] = None
interimRate: Optional[int] = None
balanceRate: Optional[int] = None
loanType: Optional[str] = None
loanRate: Optional[float] = None
memo: Optional[str] = None
naverUrl: Optional[str] = None
class SubscriptionProfile(BaseModel):
isHouseholdHead: Optional[bool] = None
isHomeless: Optional[bool] = None
homelessPeriod: Optional[int] = None
savingsMonths: Optional[int] = None
savingsCount: Optional[int] = None
dependents: Optional[int] = None
residencyArea: Optional[str] = None
isMarried: Optional[bool] = None
marriageMonths: Optional[int] = None
monthlyIncome: Optional[int] = None
specialQuals: Optional[List[str]] = None
@app.get("/api/subscription/items")
def api_subscription_list():
return get_all_subscription_items()
@app.post("/api/subscription/items", status_code=201)
def api_subscription_create(body: SubscriptionItemCreate):
return create_subscription_item(body.model_dump())
@app.put("/api/subscription/items/{item_id}")
def api_subscription_update(item_id: int, body: SubscriptionItemUpdate):
updated = update_subscription_item(item_id, body.model_dump(exclude_none=True))
if updated is None:
raise HTTPException(status_code=404, detail="Item not found")
return updated
@app.delete("/api/subscription/items/{item_id}")
def api_subscription_delete(item_id: int):
ok = delete_subscription_item(item_id)
if not ok:
raise HTTPException(status_code=404, detail="Item not found")
return {"ok": True}
@app.get("/api/subscription/profile")
def api_subscription_profile_get():
profile = get_subscription_profile()
return profile if profile is not None else {}
@app.put("/api/subscription/profile")
def api_subscription_profile_put(body: SubscriptionProfile):
return upsert_subscription_profile(body.model_dump(exclude_none=True))

View File

@@ -0,0 +1,99 @@
"""
구매 이력 관리 + 결과 체크 모듈.
- check_purchases_for_draw(): 특정 회차 구매 건들의 결과를 자동 체크
- 체커의 _calc_rank 재사용
- 결과 체크 후 strategy_performance 자동 갱신
"""
import logging
from .db import (
get_draw, get_purchases, update_purchase_results,
upsert_strategy_performance,
)
from .checker import _calc_rank
logger = logging.getLogger("lotto-backend")
RANK_PRIZE = {1: 0, 2: 0, 3: 1_500_000, 4: 50_000, 5: 5_000}  # 4th/5th are fixed amounts; 3rd is an average estimate; 1st/2nd vary per draw, so they are recorded as 0
def check_purchases_for_draw(drw_no: int) -> int:
"""
특정 회차 결과로 해당 회차 구매 건들을 채점한다.
Returns: 채점한 구매 건 수
"""
win_row = get_draw(drw_no)
if not win_row:
return 0
win_nums = [win_row["n1"], win_row["n2"], win_row["n3"],
win_row["n4"], win_row["n5"], win_row["n6"]]
bonus = win_row["bonus"]
unchecked = get_purchases(draw_no=drw_no, checked=False)
strategy_agg = {}
count = 0
for purchase in unchecked:
numbers_list = purchase["numbers"]
if not numbers_list:
continue
results = []
for nums in numbers_list:
rank, correct, has_bonus = _calc_rank(nums, win_nums, bonus)
prize = RANK_PRIZE.get(rank, 0)
results.append({
"numbers": nums,
"rank": rank,
"correct": correct,
"has_bonus": has_bonus,
"prize": prize,
})
total_prize = sum(r["prize"] for r in results)
update_purchase_results(purchase["id"], results, total_prize)
strat = purchase["source_strategy"]
if strat not in strategy_agg:
strategy_agg[strat] = {
"sets_count": 0,
"total_correct": 0,
"max_correct": 0,
"prize_total": 0,
"scores": [],
"_results": [],
}
agg = strategy_agg[strat]
agg["_results"].extend(results)
for r in results:
agg["sets_count"] += 1
agg["total_correct"] += r["correct"]
agg["max_correct"] = max(agg["max_correct"], r["correct"])
agg["prize_total"] += r["prize"]
agg["scores"].append(r["correct"] / 6.0)
count += 1
for strat, agg in strategy_agg.items():
avg_score = sum(agg["scores"]) / len(agg["scores"]) if agg["scores"] else 0.0
upsert_strategy_performance(
strategy=strat,
draw_no=drw_no,
sets_count=agg["sets_count"],
total_correct=agg["total_correct"],
max_correct=agg["max_correct"],
prize_total=agg["prize_total"],
avg_score=round(avg_score, 4),
)
# EMA 피드백 루프: 전략 가중치 진화
try:
from .strategy_evolver import evolve_after_check
evolve_after_check(strat, drw_no, agg["_results"])
except Exception:
logger.debug(f"[purchase_manager] evolve_after_check 건너뜀: {strat}")
logger.info(f"[purchase_manager] {drw_no}회차 구매 {count}건 체크 완료")
return count
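`_calc_rank` is reused from `checker.py` and its body is not shown in this diff. As a hypothetical stand-in (not the checker's actual code), standard Lotto 6/45 ranking can be sketched like this:

```python
def calc_rank_sketch(nums, win_nums, bonus):
    # Standard Lotto 6/45 ranking: 6 matches = 1st, 5 + bonus = 2nd,
    # 5 = 3rd, 4 = 4th, 3 = 5th, otherwise rank 0 (no prize).
    correct = len(set(nums) & set(win_nums))
    has_bonus = bonus in nums
    if correct == 6:
        rank = 1
    elif correct == 5 and has_bonus:
        rank = 2
    elif correct == 5:
        rank = 3
    elif correct == 4:
        rank = 4
    elif correct == 3:
        rank = 5
    else:
        rank = 0
    return rank, correct, has_bonus

win = [3, 12, 23, 34, 38, 45]
print(calc_rank_sketch([3, 12, 23, 34, 38, 45], win, 7))  # (1, 6, False)
print(calc_rank_sketch([3, 12, 23, 1, 2, 7], win, 7))     # (5, 3, True)
```

This matches the tuple shape `(rank, correct, has_bonus)` that `check_purchases_for_draw` unpacks above.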

View File

@@ -0,0 +1,277 @@
"""
전략 진화 엔진 — EMA + Softmax 기반 적응형 가중치 관리.
"""
import math
import json
import logging
from typing import Dict, List, Any
logger = logging.getLogger("lotto-backend")
# ── Constants (importable without DB) ─────────────────────────────────────────
ALPHA = 0.3 # EMA 감쇠율
TEMPERATURE = 2.0 # Softmax 온도
MIN_WEIGHT = 0.05 # 최소 가중치
INITIAL_EMA = 0.15 # 콜드스타트 초기값
MIN_DATA_DRAWS = 10 # 학습 최소 회차
STRATEGIES = ["combined", "simulation", "heatmap", "manual", "custom"]
RANK_BONUS = {5: 0.1, 4: 0.3, 3: 0.6, 2: 0.8, 1: 1.0}
# ── Pure functions (no DB dependency) ─────────────────────────────────────────
def calc_draw_score(results: List[Dict]) -> float:
"""구매 결과 리스트 → 평균 성과 점수"""
if not results:
return 0.0
scores = []
for r in results:
s = r.get("correct", 0) / 6.0
s += RANK_BONUS.get(r.get("rank", 0), 0)
scores.append(s)
return sum(scores) / len(scores)
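Plugging the `RANK_BONUS` table into the rule above, a quick self-contained check of the scoring (same constants as this module, function body restated for the sketch):

```python
RANK_BONUS = {5: 0.1, 4: 0.3, 3: 0.6, 2: 0.8, 1: 1.0}

def calc_draw_score(results):
    # mean of (correct/6 + rank bonus) over all sets in the draw
    if not results:
        return 0.0
    scores = [r.get("correct", 0) / 6.0 + RANK_BONUS.get(r.get("rank", 0), 0)
              for r in results]
    return sum(scores) / len(scores)

# One 5th-prize set (3 hits) and one losing set (1 hit):
score = calc_draw_score([{"correct": 3, "rank": 5}, {"correct": 1, "rank": 0}])
print(score)  # (0.5 + 0.1 + 1/6) / 2 ≈ 0.3833
```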
def _softmax_weights(ema_scores: Dict[str, float]) -> Dict[str, float]:
"""EMA 점수 → Softmax → 최소 가중치 보장 → 정규화"""
raw = {s: math.exp(ema / TEMPERATURE) for s, ema in ema_scores.items()}
total = sum(raw.values())
weights = {s: v / total for s, v in raw.items()}
clamped = {}
surplus = 0.0
unclamped = []
for s, w in weights.items():
if w < MIN_WEIGHT:
clamped[s] = MIN_WEIGHT
surplus += MIN_WEIGHT - w
else:
unclamped.append(s)
clamped[s] = w
if surplus > 0 and unclamped:
unclamped_total = sum(clamped[s] for s in unclamped)
for s in unclamped:
clamped[s] -= surplus * (clamped[s] / unclamped_total)
final_total = sum(clamped.values())
return {s: round(v / final_total, 4) for s, v in clamped.items()}
# ── DB-dependent functions (use lazy imports) ─────────────────────────────────
def _db():
"""Lazy import to avoid circular/relative import issues in tests"""
from . import db as _db_mod
return _db_mod
def _recommender():
from . import recommender as _rec_mod
return _rec_mod
def _analyzer():
from . import analyzer as _ana_mod
return _ana_mod
def update_ema_for_strategy(strategy: str, draw_score: float) -> float:
db = _db()
weights = db.get_strategy_weights()
current = next((w for w in weights if w["strategy"] == strategy), None)
old_ema = current["ema_score"] if current else INITIAL_EMA
new_ema = ALPHA * draw_score + (1 - ALPHA) * old_ema
return new_ema
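The EMA recurrence itself is independent of the DB layer: with `ALPHA = 0.3`, each new draw score contributes 30% and older history decays geometrically. A standalone sketch on hypothetical draw scores:

```python
ALPHA = 0.3          # weight of the newest draw score (module constant)
INITIAL_EMA = 0.15   # cold-start value (module constant)

def update_ema(old_ema, draw_score):
    # exponential moving average: new = α·score + (1−α)·old
    return ALPHA * draw_score + (1 - ALPHA) * old_ema

ema = INITIAL_EMA
for draw_score in [0.6, 0.1, 0.4]:   # three hypothetical draw scores
    ema = update_ema(ema, draw_score)
print(ema)  # 0.285 → 0.2295 → 0.28065
```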
def recalculate_weights() -> Dict[str, float]:
db = _db()
weights_rows = db.get_strategy_weights()
ema_scores = {w["strategy"]: w["ema_score"] for w in weights_rows}
for s in STRATEGIES:
if s not in ema_scores:
ema_scores[s] = INITIAL_EMA
new_weights = _softmax_weights(ema_scores)
for s, w in new_weights.items():
row = next((r for r in weights_rows if r["strategy"] == s), None)
db.update_strategy_weight(
strategy=s,
weight=w,
ema_score=ema_scores[s],
total_sets=row["total_sets"] if row else 0,
total_hits_3plus=row["total_hits_3plus"] if row else 0,
)
logger.info(f"[strategy_evolver] 가중치 재계산: {new_weights}")
return new_weights
def evolve_after_check(strategy: str, draw_no: int, results: List[Dict]) -> None:
db = _db()
draw_score = calc_draw_score(results)
new_ema = update_ema_for_strategy(strategy, draw_score)
weights_rows = db.get_strategy_weights()
current = next((w for w in weights_rows if w["strategy"] == strategy), None)
hits_3plus = sum(1 for r in results if r.get("correct", 0) >= 3)
db.update_strategy_weight(
strategy=strategy,
weight=current["weight"] if current else 0.2,
ema_score=new_ema,
total_sets=(current["total_sets"] if current else 0) + len(results),
total_hits_3plus=(current["total_hits_3plus"] if current else 0) + hits_3plus,
)
recalculate_weights()
def get_weights_with_trend() -> Dict[str, Any]:
db = _db()
weights = db.get_strategy_weights()
perfs = db.get_strategy_performance()
strat_perfs = {}
for p in perfs:
s = p["strategy"]
if s not in strat_perfs:
strat_perfs[s] = []
strat_perfs[s].append(p)
result = []
for w in weights:
sp = strat_perfs.get(w["strategy"], [])
if len(sp) >= 5:
recent_avg = sum(p["avg_score"] for p in sp[-3:]) / 3
older_avg = sum(p["avg_score"] for p in sp[-5:-2]) / 3
delta = recent_avg - older_avg
trend = "up" if delta > 0.02 else ("down" if delta < -0.02 else "stable")
else:
trend = "stable"
result.append({
"strategy": w["strategy"],
"weight": w["weight"],
"ema_score": w["ema_score"],
"total_sets": w["total_sets"],
"hits_3plus": w["total_hits_3plus"],
"trend": trend,
})
all_draws = set()
for p in perfs:
all_draws.add(p["draw_no"])
return {
"weights": result,
"last_evolved": weights[0]["updated_at"] if weights else None,
"min_data_draws": MIN_DATA_DRAWS,
"current_data_draws": len(all_draws),
"status": "active" if len(all_draws) >= MIN_DATA_DRAWS else "learning",
}
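The trend rule above compares the mean `avg_score` of the three most recent rows against the window ending three draws back (note the two slices share one row as written). A standalone sketch with hypothetical per-draw scores:

```python
# Hypothetical avg_score history for one strategy, oldest first:
avg_scores = [0.20, 0.22, 0.25, 0.30, 0.33]

recent_avg = sum(avg_scores[-3:]) / 3    # last 3 draws
older_avg = sum(avg_scores[-5:-2]) / 3   # draws -5..-3 (overlaps recent by one)
delta = recent_avg - older_avg
trend = "up" if delta > 0.02 else ("down" if delta < -0.02 else "stable")
print(trend)  # "up" — delta is 0.07, above the 0.02 dead band
```

The ±0.02 dead band keeps noisy single-draw swings from flipping the trend label.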
def generate_smart_recommendation(sets: int = 5) -> Dict[str, Any]:
db = _db()
rec = _recommender()
ana = _analyzer()
weights_data = db.get_strategy_weights()
weight_map = {w["strategy"]: w["weight"] for w in weights_data}
draws = db.get_all_draw_numbers()
if not draws:
return {"error": "No draw data"}
latest = db.get_latest_draw()
cache = ana.build_analysis_cache(draws)
past_recs = db.list_recommendations_ex(limit=100, sort="id_desc")
candidates = []
seen_keys = set()
def _add_candidate(nums: list, strategy: str, raw_score: float = None):
key = tuple(sorted(nums))
if key in seen_keys:
return
seen_keys.add(key)
if raw_score is None:
sc = ana.score_combination(nums, cache)
raw_score = sc["score_total"]
meta = raw_score * weight_map.get(strategy, 0.1)
candidates.append({
"numbers": sorted(nums),
"raw_score": round(raw_score, 4),
"strategy": strategy,
"meta_score": round(meta, 4),
})
# combined: 10세트
for _ in range(10):
try:
r = ana.generate_combined_recommendation(draws)
if "final_numbers" in r:
_add_candidate(r["final_numbers"], "combined")
except Exception:
pass
# simulation: best_picks 상위 10개
best = db.get_best_picks(limit=10)
for b in best:
nums = json.loads(b["numbers"]) if isinstance(b["numbers"], str) else b["numbers"]
_add_candidate(nums, "simulation", b.get("score_total"))
# heatmap: 10세트
for _ in range(10):
try:
r = rec.recommend_with_heatmap(draws, past_recs)
_add_candidate(r["numbers"], "heatmap")
except Exception:
pass
# manual: 10세트
for _ in range(10):
try:
r = rec.recommend_numbers(draws)
_add_candidate(r["numbers"], "manual")
except Exception:
pass
candidates.sort(key=lambda c: -c["meta_score"])
top = candidates[:sets]
result_sets = []
for c in top:
sc = ana.score_combination(c["numbers"], cache)
contributions = {}
for strat in STRATEGIES:
contributions[strat] = round(weight_map.get(strat, 0) * sc["score_total"], 4)
contrib_total = sum(contributions.values()) or 1
contributions = {s: round(v / contrib_total, 3) for s, v in contributions.items()}
result_sets.append({
"numbers": c["numbers"],
"meta_score": c["meta_score"],
"source_strategy": c["strategy"],
"contribution": contributions,
"individual_scores": {k: round(v, 4) for k, v in sc.items()},
})
perfs = db.get_strategy_performance()
data_draws = len(set(p["draw_no"] for p in perfs))
status = "active" if data_draws >= MIN_DATA_DRAWS else "learning"
return {
"sets": result_sets,
"strategy_weights_used": weight_map,
"learning_status": {
"draws_learned": data_draws,
"status": status,
"message": "" if status == "active" else f"{MIN_DATA_DRAWS}회차 이상 데이터 필요 (현재 {data_draws}회차)",
},
"based_on_latest_draw": latest["drw_no"] if latest else None,
}
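The meta-score step is a plain multiplication of each candidate's raw combination score by its source strategy's current weight. A self-contained sketch with hypothetical weights and scores shows how a lower raw score from a highly weighted strategy can outrank a higher one:

```python
# Hypothetical current weights (names follow STRATEGIES) and two candidates:
weight_map = {"combined": 0.35, "simulation": 0.25, "heatmap": 0.20,
              "manual": 0.15, "custom": 0.05}
candidates = [
    {"numbers": [1, 5, 12, 23, 34, 45], "raw_score": 0.70, "strategy": "manual"},
    {"numbers": [3, 8, 14, 27, 33, 41], "raw_score": 0.60, "strategy": "combined"},
]
for c in candidates:
    # meta_score = raw combination score × source strategy weight
    c["meta_score"] = round(c["raw_score"] * weight_map[c["strategy"]], 4)

candidates.sort(key=lambda c: -c["meta_score"])
print([c["strategy"] for c in candidates])  # ['combined', 'manual']
```

Here `combined` wins (0.60 × 0.35 = 0.21) over `manual` (0.70 × 0.15 = 0.105), which is the intended behavior: the evolver's weights, not raw scores alone, decide which sets surface.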

View File

@@ -0,0 +1,61 @@
# backend/tests/test_integration.py
"""checker.py → purchase_manager 연동 통합 테스트"""
import sys, os
import sqlite3
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app"))
import pytest
from unittest.mock import patch
def _make_mem_conn():
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
return conn
def test_check_results_triggers_purchase_check():
"""check_results_for_draw가 purchase 체크도 트리거하는지 검증"""
import db
import backend.app.purchase_manager as pm
mem = _make_mem_conn()
with patch("db._conn", return_value=mem):
db.init_db()
# 당첨번호 삽입
mem.execute(
"INSERT INTO draws (drw_no, drw_date, n1, n2, n3, n4, n5, n6, bonus) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
(1124, "2026-03-28", 1, 2, 3, 4, 5, 6, 7)
)
mem.execute(
"INSERT INTO draws (drw_no, drw_date, n1, n2, n3, n4, n5, n6, bonus) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
(1125, "2026-04-04", 10, 20, 30, 35, 40, 44, 15)
)
mem.commit()
# 1125회차 대상 구매 등록
db.add_purchase(
draw_no=1125, amount=1000, sets=1,
numbers=[[10, 20, 30, 1, 2, 3]],
is_real=True, source_strategy="combined",
)
# call purchase_manager.check_purchases_for_draw directly to verify the integration
with patch("db._conn", return_value=mem), \
patch("backend.app.purchase_manager.get_draw", side_effect=lambda drw: db.get_draw(drw)), \
patch("backend.app.purchase_manager.get_purchases", side_effect=lambda **kw: db.get_purchases(**kw)), \
patch("backend.app.purchase_manager.update_purchase_results", side_effect=lambda *a, **kw: db.update_purchase_results(*a, **kw)), \
patch("backend.app.purchase_manager.upsert_strategy_performance", side_effect=lambda **kw: db.upsert_strategy_performance(**kw)):
purchase_count = pm.check_purchases_for_draw(1125)
assert purchase_count == 1
# purchase가 체크되었는지 확인
with patch("db._conn", return_value=mem):
purchases = db.get_purchases(draw_no=1125)
assert purchases[0]["checked"] == 1
assert purchases[0]["results"][0]["correct"] == 3 # 10, 20, 30 맞음
mem.close()

View File

@@ -0,0 +1,309 @@
# backend/tests/test_purchase_manager.py
import sys, os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app"))
# Also insert the backend root so that "backend.app" package is importable
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", ".."))
import sqlite3
import pytest
from unittest.mock import patch, MagicMock
# ":memory:" 공유 커넥션 — 각 테스트에서 독립적으로 생성
def _make_mem_conn():
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
return conn
def test_purchase_history_has_new_columns():
"""purchase_history 테이블에 신규 컬럼이 존재하는지 검증"""
import db
mem = _make_mem_conn()
with patch("db._conn", return_value=mem):
db.init_db()
cols = {r["name"] for r in mem.execute("PRAGMA table_info(purchase_history)").fetchall()}
assert "numbers" in cols
assert "is_real" in cols
assert "source_strategy" in cols
assert "source_detail" in cols
assert "checked" in cols
assert "results" in cols
assert "total_prize" in cols
# 기존 컬럼도 유지
assert "draw_no" in cols
assert "amount" in cols
assert "sets" in cols
assert "prize" in cols
assert "note" in cols
mem.close()
def test_strategy_performance_table_exists():
"""strategy_performance 테이블이 생성되는지 검증"""
import db
mem = _make_mem_conn()
with patch("db._conn", return_value=mem):
db.init_db()
cols = {r["name"] for r in mem.execute("PRAGMA table_info(strategy_performance)").fetchall()}
assert "strategy" in cols
assert "draw_no" in cols
assert "sets_count" in cols
assert "total_correct" in cols
assert "avg_score" in cols
mem.close()
def test_strategy_weights_table_exists():
"""strategy_weights 테이블이 생성되고 초기값이 있는지 검증"""
import db
mem = _make_mem_conn()
with patch("db._conn", return_value=mem):
db.init_db()
rows = mem.execute("SELECT * FROM strategy_weights ORDER BY strategy").fetchall()
strategies = {r["strategy"] for r in rows}
assert strategies == {"combined", "simulation", "heatmap", "manual", "custom"}
# 가중치 합이 1.0
total_weight = sum(r["weight"] for r in rows)
assert abs(total_weight - 1.0) < 0.01
mem.close()
def test_add_purchase_with_numbers():
"""번호 포함 구매 등록"""
import db
mem = _make_mem_conn()
with patch("db._conn", return_value=mem):
db.init_db()
result = db.add_purchase(
draw_no=1150,
amount=5000,
sets=5,
numbers=[[1, 2, 3, 4, 5, 6], [7, 8, 9, 10, 11, 12]],
is_real=False,
source_strategy="simulation",
source_detail={"run_id": 42},
)
assert result["draw_no"] == 1150
assert result["amount"] == 5000
assert result["is_real"] == 0
assert result["source_strategy"] == "simulation"
assert result["numbers"] == [[1, 2, 3, 4, 5, 6], [7, 8, 9, 10, 11, 12]]
assert result["source_detail"] == {"run_id": 42}
mem.close()
def test_get_purchases_filter_is_real():
"""is_real 필터 동작"""
import db
mem = _make_mem_conn()
with patch("db._conn", return_value=mem):
db.init_db()
db.add_purchase(draw_no=1150, amount=5000, sets=5, is_real=True)
db.add_purchase(draw_no=1150, amount=1000, sets=1, is_real=False)
real_only = db.get_purchases(is_real=True)
virtual_only = db.get_purchases(is_real=False)
assert len(real_only) == 1
assert real_only[0]["is_real"] == 1
assert len(virtual_only) == 1
assert virtual_only[0]["is_real"] == 0
mem.close()
def test_get_purchase_stats_by_type():
"""실제/가상 분리 통계"""
import db
mem = _make_mem_conn()
with patch("db._conn", return_value=mem):
db.init_db()
db.add_purchase(draw_no=1150, amount=5000, sets=5, is_real=True, source_strategy="manual")
db.add_purchase(draw_no=1150, amount=1000, sets=1, is_real=False, source_strategy="simulation")
stats = db.get_purchase_stats()
assert "total" in stats
assert "real" in stats
assert "virtual" in stats
assert "by_strategy" in stats
assert stats["total"]["sets"] == 6
assert stats["real"]["sets"] == 5
assert stats["virtual"]["sets"] == 1
assert "manual" in stats["by_strategy"]
assert "simulation" in stats["by_strategy"]
# 하위호환 필드
assert "total_records" in stats
assert stats["total_records"] == 2
mem.close()
def test_upsert_strategy_performance():
"""전략 성과 upsert"""
import db
mem = _make_mem_conn()
with patch("db._conn", return_value=mem):
db.init_db()
# 최초 insert
db.upsert_strategy_performance(
strategy="simulation",
draw_no=1150,
sets_count=10,
total_correct=30,
max_correct=5,
prize_total=5000,
avg_score=3.0,
)
rows = db.get_strategy_performance(strategy="simulation")
assert len(rows) == 1
assert rows[0]["sets_count"] == 10
assert rows[0]["avg_score"] == 3.0
# upsert (동일 strategy+draw_no)
db.upsert_strategy_performance(
strategy="simulation",
draw_no=1150,
sets_count=20,
total_correct=60,
max_correct=6,
prize_total=10000,
avg_score=4.5,
)
rows = db.get_strategy_performance(strategy="simulation")
assert len(rows) == 1 # 중복 없이 1개
assert rows[0]["sets_count"] == 20
assert rows[0]["avg_score"] == 4.5
mem.close()
def test_update_strategy_weight():
"""전략 가중치 업데이트"""
import db
mem = _make_mem_conn()
with patch("db._conn", return_value=mem):
db.init_db()
# 초기값 확인
weights_before = db.get_strategy_weights()
combined_before = next(w for w in weights_before if w["strategy"] == "combined")
original_weight = combined_before["weight"]
# 업데이트
db.update_strategy_weight(
strategy="combined",
weight=0.5,
ema_score=0.75,
total_sets=100,
total_hits_3plus=20,
)
weights_after = db.get_strategy_weights()
combined_after = next(w for w in weights_after if w["strategy"] == "combined")
assert combined_after["weight"] == 0.5
assert combined_after["ema_score"] == 0.75
assert combined_after["total_sets"] == 100
assert combined_after["total_hits_3plus"] == 20
mem.close()
# ── purchase_manager 테스트 ───────────────────────────────────────────────────
def _import_purchase_manager_with_mem(mem_conn):
"""purchase_manager를 메모리 DB에 연결된 상태로 임포트."""
import db
import importlib
# backend.app 패키지로 로드해 상대 임포트가 동작하게 함
import backend.app.purchase_manager as pm
return pm
def test_check_purchases_for_draw():
"""특정 회차 구매 건들의 결과 체크"""
import db
import backend.app.purchase_manager as pm
mem = _make_mem_conn()
with patch("db._conn", return_value=mem):
db.init_db()
# 당첨번호 삽입: 1125회 [3,12,23,34,38,45] bonus=7
mem.execute(
"""INSERT INTO draws (drw_no, drw_date, n1, n2, n3, n4, n5, n6, bonus)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(1125, "2024-12-01", 3, 12, 23, 34, 38, 45, 7),
)
mem.commit()
# 구매 등록: 1등 번호 세트 + 낙첨 세트
purchase = db.add_purchase(
draw_no=1125,
amount=2000,
sets=2,
numbers=[[3, 12, 23, 34, 38, 45], [1, 2, 3, 4, 5, 6]],
is_real=False,
source_strategy="simulation",
)
with patch("db._conn", return_value=mem), \
patch("backend.app.purchase_manager.get_draw", side_effect=lambda drw: db.get_draw(drw)), \
patch("backend.app.purchase_manager.get_purchases", side_effect=lambda **kw: db.get_purchases(**kw)), \
patch("backend.app.purchase_manager.update_purchase_results", side_effect=lambda *a, **kw: db.update_purchase_results(*a, **kw)), \
patch("backend.app.purchase_manager.upsert_strategy_performance", side_effect=lambda **kw: db.upsert_strategy_performance(**kw)):
count = pm.check_purchases_for_draw(1125)
assert count == 1
# 결과 확인
with patch("db._conn", return_value=mem):
checked = db.get_purchases(draw_no=1125, checked=True)
assert len(checked) == 1
results = checked[0]["results"]
assert results is not None
assert len(results) == 2
# 첫 번째 세트: 6개 일치 → 1등
assert results[0]["rank"] == 1
assert results[0]["correct"] == 6
# 두 번째 세트: 3 하나만 일치 → 낙첨(correct=1)
assert results[1]["rank"] == 0
assert results[1]["correct"] == 1
mem.close()
def test_check_purchases_updates_strategy_performance():
    """Verify that strategy_performance is updated after results are checked."""
    import db
    import backend.app.purchase_manager as pm

    mem = _make_mem_conn()
    with patch("db._conn", return_value=mem):
        db.init_db()
        # Insert winning numbers for draw 1126
        mem.execute(
            """INSERT INTO draws (drw_no, drw_date, n1, n2, n3, n4, n5, n6, bonus)
               VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
            (1126, "2024-12-08", 1, 2, 3, 4, 5, 6, 7),
        )
        mem.commit()
        db.add_purchase(
            draw_no=1126,
            amount=5000,
            sets=5,
            numbers=[[1, 2, 3, 4, 5, 6], [10, 20, 30, 40, 41, 42]],
            is_real=False,
            source_strategy="simulation",
        )
    with patch("db._conn", return_value=mem), \
         patch("backend.app.purchase_manager.get_draw", side_effect=lambda drw: db.get_draw(drw)), \
         patch("backend.app.purchase_manager.get_purchases", side_effect=lambda **kw: db.get_purchases(**kw)), \
         patch("backend.app.purchase_manager.update_purchase_results", side_effect=lambda *a, **kw: db.update_purchase_results(*a, **kw)), \
         patch("backend.app.purchase_manager.upsert_strategy_performance", side_effect=lambda **kw: db.upsert_strategy_performance(**kw)):
        count = pm.check_purchases_for_draw(1126)
    assert count == 1
    with patch("db._conn", return_value=mem):
        perf = db.get_strategy_performance(strategy="simulation")
    assert len(perf) >= 1
    entry = next((p for p in perf if p["draw_no"] == 1126), None)
    assert entry is not None, "no strategy_performance row for draw_no=1126"
    assert entry["strategy"] == "simulation"
    assert entry["sets_count"] == 2  # two number sets
    mem.close()


@@ -0,0 +1,72 @@
import sys, os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app"))
import math
import pytest


def test_calc_draw_score_basic():
    """Per-set results → draw_score calculation."""
    from strategy_evolver import calc_draw_score
    results = [
        {"correct": 3, "rank": 5},  # 3/6 + 0.1 = 0.6
        {"correct": 1, "rank": 0},  # 1/6 + 0 ≈ 0.167
    ]
    score = calc_draw_score(results)
    expected = ((3/6 + 0.1) + (1/6)) / 2
    assert abs(score - expected) < 0.01


def test_calc_draw_score_empty():
    """Empty results → 0."""
    from strategy_evolver import calc_draw_score
    assert calc_draw_score([]) == 0.0


def test_recalculate_weights_softmax():
    """EMA → softmax weight conversion."""
    from strategy_evolver import _softmax_weights
    ema_scores = {
        "combined": 0.30,
        "simulation": 0.25,
        "heatmap": 0.15,
        "manual": 0.10,
        "custom": 0.05,
    }
    weights = _softmax_weights(ema_scores)
    assert abs(sum(weights.values()) - 1.0) < 0.001
    assert weights["combined"] > weights["simulation"]
    assert weights["simulation"] > weights["heatmap"]
    assert all(w >= 0.049 for w in weights.values())


def test_recalculate_weights_min_weight():
    """Even a strategy with a very low EMA keeps at least a 5% weight."""
    from strategy_evolver import _softmax_weights
    ema_scores = {
        "combined": 0.50,
        "simulation": 0.01,
        "heatmap": 0.01,
        "manual": 0.01,
        "custom": 0.01,
    }
    weights = _softmax_weights(ema_scores)
    assert weights["simulation"] >= 0.049
    assert weights["custom"] >= 0.049
    assert abs(sum(weights.values()) - 1.0) < 0.001


def test_update_ema():
    """Verify the EMA update formula."""
    from strategy_evolver import ALPHA
    old_ema = 0.15
    draw_score = 0.40
    new_ema = ALPHA * draw_score + (1 - ALPHA) * old_ema
    expected = 0.3 * 0.40 + 0.7 * 0.15  # = 0.225
    assert abs(new_ema - expected) < 0.001
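The floor-plus-softmax behavior these tests pin down can be sketched as follows. This is a hedged sketch, not the actual `strategy_evolver._softmax_weights`: the function name `softmax_weights`, the temperature value, and the floor mechanism are assumptions; only the 5% minimum and the sum-to-1 property come from the assertions above.

```python
import math
from typing import Dict

def softmax_weights(ema_scores: Dict[str, float],
                    temperature: float = 0.1,
                    floor: float = 0.05) -> Dict[str, float]:
    """Turn EMA scores into strategy weights.

    Softmax over score/temperature fixes the relative ordering; mixing the
    result with a uniform floor guarantees every strategy keeps at least
    `floor` weight while the total still sums to 1.
    """
    exps = {k: math.exp(v / temperature) for k, v in ema_scores.items()}
    total = sum(exps.values())
    # probability mass left to distribute after reserving the floor
    free = 1.0 - floor * len(ema_scores)
    return {k: floor + free * (e / total) for k, e in exps.items()}
```

Mixing with a uniform distribution, rather than clamping and renormalizing, is one way to satisfy the `>= 0.049` assertions exactly: clamp-then-renormalize can push floored weights slightly below the floor again.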


@@ -89,6 +89,25 @@ services:
      timeout: 5s
      retries: 3
  realestate-lab:
    build:
      context: ./realestate-lab
    container_name: realestate-lab
    restart: unless-stopped
    ports:
      - "18800:8000"
    environment:
      - TZ=${TZ:-Asia/Seoul}
      - DATA_GO_KR_API_KEY=${DATA_GO_KR_API_KEY:-}
      - CORS_ALLOW_ORIGINS=${CORS_ALLOW_ORIGINS:-http://localhost:3007,http://localhost:8080}
    volumes:
      - ${REALESTATE_DATA_PATH:-./data/realestate}:/app/data
    healthcheck:
      test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"]
      interval: 30s
      timeout: 5s
      retries: 3
  travel-proxy:
    build: ./travel-proxy
    container_name: travel-proxy
@@ -119,6 +138,7 @@ services:
    depends_on:
      - music-lab
      - blog-lab
      - realestate-lab
    ports:
      - "8080:80"
    volumes:

File diff suppressed because it is too large

File diff suppressed because it is too large


@@ -0,0 +1,342 @@
# realestate-lab Design Spec
> Automated collection of housing subscription announcements + profile-based eligibility matching

---

## 1. Overview

A standalone service that automatically collects subscription announcements from the public data portal (Korea Real Estate Board 청약홈 sale-information API) and, based on a user profile, automatically determines which announcements the user is eligible to apply for.

**Core goals:**
- Automatic collection → DB storage, no manual announcement entry
- Profile-based eligibility matching → filter down to applicable announcements only
- Frontend surfaces "N new announcements" → Telegram notifications as a future extension

---

## 2. Service Architecture

### Standalone service layout
```
realestate-lab/                # port 18800
├── app/
│   ├── main.py       # FastAPI app + APScheduler
│   ├── db.py         # SQLite CRUD (realestate.db)
│   ├── collector.py  # public data portal API collector
│   ├── matcher.py    # profile-based eligibility matching engine
│   └── models.py     # Pydantic request/response models
├── Dockerfile
└── requirements.txt
```

### Collection flow
```
APScheduler (daily at 09:00)
  → collector.py: call the 5 청약홈 API endpoints
  → upsert new announcements into the DB (keyed on HOUSE_MANAGE_NO + PBLANC_NO)
  → matcher.py: match against the profile → set match_status on eligible announcements
  → count newly matched announcements → (future) Telegram notification
```
---
## 3. Data Source

### Public Data Portal — Korea Real Estate Board 청약홈 Sale Information Service
- **Base URL**: `https://api.odcloud.kr/api`
- **Service key**: `DATA_GO_KR_API_KEY` environment variable
- **Daily call limit**: 40,000 requests
- **Data format**: JSON

### Detail endpoints collected
| Endpoint | Description |
|----------|-------------|
| `/ApplyhomeInfoDetailSvc/v1/getAPTLttotPblancDetail` | APT sale details |
| `/ApplyhomeInfoDetailSvc/v1/getUrbtyOfctlLttotPblancDetail` | Officetel / urban housing / private rental details |
| `/ApplyhomeInfoDetailSvc/v1/getRemndrLttotPblancDetail` | Remaining-units details |
| `/ApplyhomeInfoDetailSvc/v1/getPblPvtRentLttotPblancDetail` | Publicly supported private rental details |
| `/ApplyhomeInfoDetailSvc/v1/getOPTLttotPblancDetail` | Discretionary supply details |

### Per-housing-type detail APIs (units and sale price per model)
| Endpoint | Description |
|----------|-------------|
| `/ApplyhomeInfoDetailSvc/v1/getAPTLttotPblancMdl` | APT, per housing type |
| `/ApplyhomeInfoDetailSvc/v1/getUrbtyOfctlLttotPblancMdl` | Officetel, per housing type |
| `/ApplyhomeInfoDetailSvc/v1/getRemndrLttotPblancMdl` | Remaining units, per housing type |
| `/ApplyhomeInfoDetailSvc/v1/getPblPvtRentLttotPblancMdl` | Publicly supported private rental, per housing type |
| `/ApplyhomeInfoDetailSvc/v1/getOPTLttotPblancMdl` | Discretionary supply, per housing type |

### Common query parameters
- `page` (default: 1), `perPage` (default: 100)
- `serviceKey` — the encoded API key
- `cond[RCRIT_PBLANC_DE::GTE]` / `cond[RCRIT_PBLANC_DE::LTE]` — recruitment announcement date range filter
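The parameter conventions above can be illustrated with a small helper. This is a sketch only: the helper name `build_params` and the date-string format are my assumptions; the key names (`serviceKey`, `page`, `perPage`, `cond[RCRIT_PBLANC_DE::GTE]`, `cond[RCRIT_PBLANC_DE::LTE]`) come from the list above.

```python
def build_params(service_key: str, date_from: str, date_to: str,
                 page: int = 1, per_page: int = 100) -> dict:
    """Assemble the common odcloud query parameters for one page.

    date_from/date_to bound RCRIT_PBLANC_DE (recruitment announcement date);
    the exact date representation is assumed to match the API's.
    """
    return {
        "serviceKey": service_key,                # encoded API key
        "page": page,
        "perPage": per_page,
        "cond[RCRIT_PBLANC_DE::GTE]": date_from,  # announcement date >= from
        "cond[RCRIT_PBLANC_DE::LTE]": date_to,    # announcement date <= to
    }
```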
---
## 4. DB Schema (realestate.db)

### announcements (subscription announcements)
| Column | Type | Description |
|--------|------|-------------|
| id | INTEGER PK | auto-increment |
| house_manage_no | TEXT NOT NULL | housing management number |
| pblanc_no | TEXT NOT NULL | announcement number |
| house_nm | TEXT | housing name |
| house_secd | TEXT | housing type code (01: APT, 02: officetel, 04: unranked, etc.) |
| house_dtl_secd | TEXT | housing detail type code (01: private, 03: national, etc.) |
| rent_secd | TEXT | supply type (0: sale, 1: rental) |
| region_code | TEXT | supply region code |
| region_name | TEXT | supply region name |
| address | TEXT | supply location |
| total_units | INTEGER | supply size |
| rcrit_date | TEXT | recruitment announcement date |
| receipt_start | TEXT | application start date |
| receipt_end | TEXT | application end date |
| spsply_start | TEXT | special supply application start date |
| spsply_end | TEXT | special supply application end date |
| gnrl_rank1_start | TEXT | rank-1 application start date |
| gnrl_rank1_end | TEXT | rank-1 application end date |
| winner_date | TEXT | winner announcement date |
| contract_start | TEXT | contract start date |
| contract_end | TEXT | contract end date |
| homepage_url | TEXT | homepage |
| pblanc_url | TEXT | announcement URL |
| constructor | TEXT | construction company |
| developer | TEXT | developer |
| move_in_month | TEXT | expected move-in month |
| is_speculative_area | TEXT | speculation-overheated zone flag |
| is_price_cap | TEXT | sale-price-cap flag |
| contact | TEXT | contact number |
| status | TEXT | 청약예정/청약중/결과발표/완료 (auto-computed) |
| source | TEXT | auto/manual |
| created_at | TEXT | |
| updated_at | TEXT | |

- UNIQUE constraint: `(house_manage_no, pblanc_no)`
- INDEX: `idx_realestate_status` on `status`
- INDEX: `idx_realestate_region` on `region_name`

### announcement_models (per-housing-type details)
| Column | Type | Description |
|--------|------|-------------|
| id | INTEGER PK | |
| house_manage_no | TEXT | FK → announcements |
| pblanc_no | TEXT | FK → announcements |
| model_no | TEXT | model number |
| house_ty | TEXT | housing type (e.g. 84A) |
| supply_area | REAL | supply area (㎡) |
| general_units | INTEGER | general supply units |
| special_units | INTEGER | special supply units |
| multi_child_units | INTEGER | multi-child |
| newlywed_units | INTEGER | newlywed |
| first_life_units | INTEGER | first-time-in-life buyer |
| old_parent_units | INTEGER | elderly-parent support |
| institution_units | INTEGER | institutional recommendation |
| youth_units | INTEGER | youth |
| newborn_units | INTEGER | newborn |
| top_amount | INTEGER | highest sale price (10k KRW) |

- UNIQUE: `(house_manage_no, pblanc_no, model_no)`

### user_profile (user subscription profile)
| Column | Type | Description |
|--------|------|-------------|
| id | INTEGER PK | always 1 (single user) |
| name | TEXT | name |
| age | INTEGER | age |
| is_homeless | BOOLEAN | owns no home (무주택) |
| is_householder | BOOLEAN | head of household |
| subscription_months | INTEGER | subscription account age (months) |
| subscription_amount | INTEGER | total account deposits (10k KRW) |
| family_members | INTEGER | household size |
| has_dependents | BOOLEAN | has dependents |
| children_count | INTEGER | number of minor children |
| is_newlywed | BOOLEAN | newlywed |
| marriage_months | INTEGER | months married |
| has_newborn | BOOLEAN | has a child aged 2 or under |
| is_first_home | BOOLEAN | qualifies as first-time buyer |
| income_level | TEXT | income level (≤100% / 100–130% / 130–160%) |
| preferred_regions | TEXT | regions of interest, JSON array |
| preferred_types | TEXT | housing types of interest, JSON array |
| min_area | REAL | minimum desired area (㎡) |
| max_area | REAL | maximum desired area (㎡) |
| max_price | INTEGER | maximum sale price (10k KRW) |
| updated_at | TEXT | |

### match_results (matching results)
| Column | Type | Description |
|--------|------|-------------|
| id | INTEGER PK | |
| announcement_id | INTEGER | FK → announcements |
| model_id | INTEGER | FK → announcement_models (nullable) |
| match_score | INTEGER | match score (0–100) |
| match_reasons | TEXT | match reasons, JSON array |
| eligible_types | TEXT | eligible supply types, JSON array |
| is_new | BOOLEAN | newly matched flag (for notifications) |
| created_at | TEXT | |

- UNIQUE: `(announcement_id, model_id)`
---
## 5. API Endpoints

### Announcements
| Method | Path | Description |
|--------|------|-------------|
| GET | `/api/realestate/announcements` | list announcements (filters: region, status, house_type, matched_only, sort, page, size) |
| GET | `/api/realestate/announcements/{id}` | announcement detail (including housing types) |
| POST | `/api/realestate/announcements` | register an announcement manually |
| PUT | `/api/realestate/announcements/{id}` | update an announcement |
| DELETE | `/api/realestate/announcements/{id}` | delete an announcement |

### Collection management
| Method | Path | Description |
|--------|------|-------------|
| POST | `/api/realestate/collect` | trigger a manual collection run |
| GET | `/api/realestate/collect/status` | last collection result (timestamp, new count, errors) |

### Profile
| Method | Path | Description |
|--------|------|-------------|
| GET | `/api/realestate/profile` | fetch my profile |
| PUT | `/api/realestate/profile` | update profile (upsert) |

### Matching
| Method | Path | Description |
|--------|------|-------------|
| GET | `/api/realestate/matches` | list match results (by score, new first) |
| POST | `/api/realestate/matches/refresh` | recompute matches |
| PATCH | `/api/realestate/matches/{id}/read` | mark a new-match notification as read |

### Dashboard
| Method | Path | Description |
|--------|------|-------------|
| GET | `/api/realestate/dashboard` | summary (open announcements, new matches, upcoming dates) |
---
## 6. Matching Engine

### Score computation (0–100)
| Criterion | Weight | Logic |
|-----------|--------|-------|
| Region match | 30 | announcement region in preferred_regions → 30 pts |
| Housing-type match | 10 | housing type in preferred_types → 10 pts |
| Area match | 15 | some housing type within min_area–max_area → 15 pts |
| Price match | 15 | some housing type at or under max_price → 15 pts |
| Eligibility match | 30 | proportional to the number of eligible supply types |

### Eligibility details
| Supply type | Qualification condition |
|-------------|-------------------------|
| General rank 1 | owns no home + head of household + account age met (24 months in speculation-overheated zones, 12 months elsewhere) |
| General rank 2 | when rank 1 is not met |
| Special — newlywed | is_newlywed + owns no home + income criteria |
| Special — first-time buyer | is_first_home + owns no home + income criteria |
| Special — multi-child | children_count >= 2 + owns no home |
| Special — elderly-parent support | has_dependents + owns no home |
| Special — youth | age 19–39 + owns no home |
| Special — newborn | has_newborn + owns no home |

- 1 eligible type → 10 pts, 2 → 20 pts, 3 or more → 30 pts
- `eligible_types`: stores the list of eligible supply types
- `match_reasons`: stores the reason for each determination
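The rubric above adds up as in this sketch. Illustration only: the real scoring lives in `matcher.py`, and the boolean inputs stand in for the per-model range checks and profile lookups described in the tables.

```python
def match_score(region_ok: bool, type_ok: bool, area_ok: bool,
                price_ok: bool, eligible_types: list) -> int:
    """Combine the five criteria into the 0-100 score described above."""
    score = 0
    if region_ok:   # announcement region is in preferred_regions
        score += 30
    if type_ok:     # housing type is in preferred_types
        score += 10
    if area_ok:     # some housing type falls within min_area..max_area
        score += 15
    if price_ok:    # some housing type priced at or under max_price
        score += 15
    # eligibility: 1 type → 10 pts, 2 → 20 pts, 3 or more → 30 pts
    score += min(len(eligible_types), 3) * 10
    return score
```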
### Status auto-computation
```
today < receipt_start                → 청약예정 (upcoming)
receipt_start ≤ today ≤ receipt_end  → 청약중 (open)
receipt_end < today ≤ winner_date    → 결과발표 (results pending)
today > winner_date                  → 완료 (closed)
```

### When matching runs
- Automatically after new announcements are collected
- On profile changes, via `POST /matches/refresh`
- During the daily 00:00 status refresh
---
## 7. Infrastructure Integration

### Docker Compose
```yaml
realestate-lab:
  build: ./realestate-lab
  container_name: realestate-lab
  ports:
    - "18800:8000"
  volumes:
    - ${RUNTIME_PATH:-.}/data:/app/data
  environment:
    - DATA_GO_KR_API_KEY=${DATA_GO_KR_API_KEY}
  restart: unless-stopped
```

### Nginx
```nginx
location /api/realestate/ {
    proxy_pass http://realestate-lab:8000;
}
```

### APScheduler
| Time | Job | Description |
|------|-----|-------------|
| daily 09:00 | `run_collection` | collect from the 5 APIs → match |
| daily 00:00 | `update_statuses` | date-based status refresh |

### Deployment
- Add `realestate-lab/` to the rsync targets in `scripts/deploy-nas.sh`

---

## 8. Removal Targets in lotto-backend
| File | Items removed |
|------|---------------|
| `backend/app/db.py` | `realestate_complexes` table creation and its 5 CRUD functions |
| `backend/app/main.py` | `ComplexCreate`/`ComplexUpdate` models, the 4 `/api/realestate/complexes` routes |

No migration is needed for existing `realestate_complexes` data (the schemas are entirely different).

---

## 9. Environment Variables
| Variable | Description | Required |
|----------|-------------|----------|
| `DATA_GO_KR_API_KEY` | public data portal API key | optional (without it, only manual registration works) |

---

## 10. Future Extensions
- **Telegram notifications**: push newly matched announcements via a Telegram bot (notification module kept separable)
- **Competition-rate lookup**: collect live competition rates during the application window
- **Actual-price comparison**: compare sale prices against nearby market prices


@@ -42,6 +42,16 @@ server {
        proxy_pass http://$music_backend$request_uri;
    }

    # realestate API
    location /api/realestate/ {
        proxy_http_version 1.1;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
        proxy_pass http://realestate-lab:8000/api/realestate/;
    }

    # travel thumbnails (generated by travel-proxy, stored in /data/thumbs)
    location ^~ /media/travel/.thumb/ {
        alias /data/thumbs/;


@@ -0,0 +1,9 @@
FROM python:3.12-alpine
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]


@@ -0,0 +1,165 @@
import os
import logging
import requests
from typing import List, Dict, Any
from .db import upsert_announcement, upsert_model, save_collect_log

logger = logging.getLogger("realestate-lab")

API_BASE = "https://api.odcloud.kr/api/ApplyhomeInfoDetailSvc/v1"
API_KEY = os.getenv("DATA_GO_KR_API_KEY", "")

# 5 detail+model endpoint pairs
DETAIL_ENDPOINTS = [
    ("getAPTLttotPblancDetail", "getAPTLttotPblancMdl"),
    ("getUrbtyOfctlLttotPblancDetail", "getUrbtyOfctlLttotPblancMdl"),
    ("getRemndrLttotPblancDetail", "getRemndrLttotPblancMdl"),
    ("getPblPvtRentLttotPblancDetail", "getPblPvtRentLttotPblancMdl"),
    ("getOPTLttotPblancDetail", "getOPTLttotPblancMdl"),
]


def _api_call(endpoint: str, params: Dict[str, Any] = None) -> List[Dict]:
    """Walk every page of an endpoint and return the combined rows."""
    if not API_KEY:
        logger.warning("DATA_GO_KR_API_KEY not set — skipping API call")
        return []
    base_params = {
        "serviceKey": API_KEY,
        "perPage": 100,
        "returnType": "JSON",
    }
    if params:
        base_params.update(params)
    url = f"{API_BASE}/{endpoint}"
    all_data: List[Dict] = []
    page = 1
    while True:
        base_params["page"] = page
        try:
            resp = requests.get(url, params=base_params, timeout=30)
            resp.raise_for_status()
            body = resp.json()
        except requests.RequestException as e:
            logger.error("API call failed [%s page=%d]: %s", endpoint, page, e)
            break
        except ValueError as e:
            logger.error("JSON parse failed [%s page=%d]: %s", endpoint, page, e)
            break
        data = body.get("data", [])
        total_count = body.get("totalCount", 0)
        all_data.extend(data)
        if len(all_data) >= total_count:
            break
        page += 1
    logger.info("[%s] collected %d rows", endpoint, len(all_data))
    return all_data


def _parse_apt_detail(raw: Dict[str, Any]) -> Dict[str, Any]:
    """Map API response fields onto the DB schema."""
    return {
        "house_manage_no": raw.get("HOUSE_MANAGE_NO", ""),
        "pblanc_no": raw.get("PBLANC_NO", ""),
        "house_nm": raw.get("HOUSE_NM"),
        "house_secd": raw.get("HOUSE_SECD"),
        "house_dtl_secd": raw.get("HOUSE_DTL_SECD"),
        "rent_secd": raw.get("RENT_SECD"),
        "region_code": raw.get("SUBSCRPT_AREA_CODE"),
        "region_name": raw.get("SUBSCRPT_AREA_CODE_NM"),
        "address": raw.get("HSSPLY_ADRES"),
        "total_units": raw.get("TOT_SUPLY_HSHLDCO"),
        "rcrit_date": raw.get("RCRIT_PBLANC_DE"),
        "receipt_start": raw.get("RCEPT_BGNDE") or raw.get("SUBSCRPT_RCEPT_BGNDE"),
        "receipt_end": raw.get("RCEPT_ENDDE") or raw.get("SUBSCRPT_RCEPT_ENDDE"),
        "spsply_start": raw.get("SPSPLY_RCEPT_BGNDE"),
        "spsply_end": raw.get("SPSPLY_RCEPT_ENDDE"),
        "gnrl_rank1_start": raw.get("GNRL_RNK1_CRSPAREA_RCPTDE") or raw.get("GNRL_RCEPT_BGNDE"),
        "gnrl_rank1_end": raw.get("GNRL_RNK1_CRSPAREA_ENDDE") or raw.get("GNRL_RCEPT_ENDDE"),
        "winner_date": raw.get("PRZWNER_PRESNATN_DE"),
        "contract_start": raw.get("CNTRCT_CNCLS_BGNDE"),
        "contract_end": raw.get("CNTRCT_CNCLS_ENDDE"),
        "homepage_url": raw.get("HMPG_ADRES"),
        "pblanc_url": raw.get("PBLANC_URL"),
        "constructor": raw.get("CNSTRCT_ENTRPS_NM"),
        "developer": raw.get("BSNS_MBY_NM"),
        "move_in_month": raw.get("MVN_PREARNGE_YM"),
        "is_speculative_area": raw.get("SPECLT_RDN_EARTH_AT"),
        "is_price_cap": raw.get("PARCPRC_ULS_AT"),
        "contact": raw.get("MDHS_TELNO"),
        "source": "auto",
    }


def _parse_top_amount(val: Any) -> int | None:
    """Strip commas from the top-amount string and convert to int."""
    if val is None:
        return None
    try:
        return int(str(val).replace(",", ""))
    except (ValueError, TypeError):
        return None


def _parse_model(raw: Dict[str, Any]) -> Dict[str, Any]:
    """Map model API response fields onto the DB schema."""
    return {
        "house_manage_no": raw.get("HOUSE_MANAGE_NO", ""),
        "pblanc_no": raw.get("PBLANC_NO", ""),
        "model_no": raw.get("MODEL_NO"),
        "house_ty": raw.get("HOUSE_TY"),
        # guard against empty strings as well as None before float()
        "supply_area": float(raw["SUPLY_AR"]) if raw.get("SUPLY_AR") not in (None, "") else None,
        "general_units": raw.get("SUPLY_HSHLDCO") or 0,
        "special_units": raw.get("SPSPLY_HSHLDCO") or 0,
        "multi_child_units": raw.get("MNYCH_HSHLDCO") or 0,
        "newlywed_units": raw.get("NWWDS_HSHLDCO") or 0,
        "first_life_units": raw.get("LFE_FRST_HSHLDCO") or 0,
        "old_parent_units": raw.get("OLD_PARNTS_SUPORT_HSHLDCO") or 0,
        "institution_units": raw.get("INSTT_RECOMEND_HSHLDCO") or 0,
        "youth_units": raw.get("YGMN_HSHLDCO") or 0,
        "newborn_units": raw.get("NWBB_HSHLDCO") or 0,
        "top_amount": _parse_top_amount(raw.get("LTTOT_TOP_AMOUNT")),
    }


def collect_all() -> Dict[str, Any]:
    """Iterate every endpoint pair, collecting and storing announcement and model data."""
    if not API_KEY:
        logger.warning("API key not set — aborting collection")
        save_collect_log(0, 0, "API key not set")
        return {"new_count": 0, "total_count": 0}
    total_count = 0
    new_count = 0
    for detail_ep, model_ep in DETAIL_ENDPOINTS:
        # collect announcement details
        detail_rows = _api_call(detail_ep)
        for raw in detail_rows:
            try:
                parsed = _parse_apt_detail(raw)
                _, is_new = upsert_announcement(parsed)
                total_count += 1
                if is_new:
                    new_count += 1
            except Exception as e:
                logger.error("announcement upsert failed [%s]: %s", detail_ep, e)
        # collect per-model (housing type) rows
        model_rows = _api_call(model_ep)
        for raw in model_rows:
            try:
                parsed = _parse_model(raw)
                upsert_model(parsed)
            except Exception as e:
                logger.error("model upsert failed [%s]: %s", model_ep, e)
    save_collect_log(new_count, total_count)
    logger.info("collection done: new=%d, total=%d", new_count, total_count)
    return {"new_count": new_count, "total_count": total_count}

realestate-lab/app/db.py (new file, 546 lines)

@@ -0,0 +1,546 @@
# realestate-lab/app/db.py
import json
import sqlite3
import logging
from typing import Dict, Any, List, Optional
from datetime import date

logger = logging.getLogger("realestate-lab")

DB_PATH = "/app/data/realestate.db"


def _conn():
    c = sqlite3.connect(DB_PATH)
    c.row_factory = sqlite3.Row
    c.execute("PRAGMA journal_mode=WAL;")
    c.execute("PRAGMA foreign_keys=ON;")
    return c
def init_db():
    with _conn() as conn:
        # ── announcements ────────────────────────────────────────────────
        conn.execute("""
            CREATE TABLE IF NOT EXISTS announcements (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                house_manage_no TEXT NOT NULL,
                pblanc_no TEXT NOT NULL,
                house_nm TEXT,
                house_secd TEXT,
                house_dtl_secd TEXT,
                rent_secd TEXT,
                region_code TEXT,
                region_name TEXT,
                address TEXT,
                total_units INTEGER,
                rcrit_date TEXT,
                receipt_start TEXT,
                receipt_end TEXT,
                spsply_start TEXT,
                spsply_end TEXT,
                gnrl_rank1_start TEXT,
                gnrl_rank1_end TEXT,
                winner_date TEXT,
                contract_start TEXT,
                contract_end TEXT,
                homepage_url TEXT,
                pblanc_url TEXT,
                constructor TEXT,
                developer TEXT,
                move_in_month TEXT,
                is_speculative_area TEXT,
                is_price_cap TEXT,
                contact TEXT,
                status TEXT NOT NULL DEFAULT '청약예정',
                source TEXT NOT NULL DEFAULT 'manual',
                created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
                updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
                UNIQUE(house_manage_no, pblanc_no)
            );
        """)
        conn.execute("CREATE INDEX IF NOT EXISTS idx_ann_status ON announcements(status);")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_ann_region ON announcements(region_name);")
        # ── announcement_models ──────────────────────────────────────────
        conn.execute("""
            CREATE TABLE IF NOT EXISTS announcement_models (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                house_manage_no TEXT NOT NULL,
                pblanc_no TEXT NOT NULL,
                model_no TEXT,
                house_ty TEXT,
                supply_area REAL,
                general_units INTEGER DEFAULT 0,
                special_units INTEGER DEFAULT 0,
                multi_child_units INTEGER DEFAULT 0,
                newlywed_units INTEGER DEFAULT 0,
                first_life_units INTEGER DEFAULT 0,
                old_parent_units INTEGER DEFAULT 0,
                institution_units INTEGER DEFAULT 0,
                youth_units INTEGER DEFAULT 0,
                newborn_units INTEGER DEFAULT 0,
                top_amount INTEGER,
                UNIQUE(house_manage_no, pblanc_no, model_no)
            );
        """)
        # ── user_profile ─────────────────────────────────────────────────
        conn.execute("""
            CREATE TABLE IF NOT EXISTS user_profile (
                id INTEGER PRIMARY KEY DEFAULT 1,
                name TEXT,
                age INTEGER,
                is_homeless INTEGER,
                is_householder INTEGER,
                subscription_months INTEGER,
                subscription_amount INTEGER,
                family_members INTEGER,
                has_dependents INTEGER,
                children_count INTEGER DEFAULT 0,
                is_newlywed INTEGER,
                marriage_months INTEGER,
                has_newborn INTEGER,
                is_first_home INTEGER,
                income_level TEXT,
                preferred_regions TEXT NOT NULL DEFAULT '[]',
                preferred_types TEXT NOT NULL DEFAULT '[]',
                min_area REAL,
                max_area REAL,
                max_price INTEGER,
                updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
            );
        """)
        # ── match_results ────────────────────────────────────────────────
        conn.execute("""
            CREATE TABLE IF NOT EXISTS match_results (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                announcement_id INTEGER NOT NULL REFERENCES announcements(id) ON DELETE CASCADE,
                model_id INTEGER,
                match_score INTEGER NOT NULL DEFAULT 0,
                match_reasons TEXT NOT NULL DEFAULT '[]',
                eligible_types TEXT NOT NULL DEFAULT '[]',
                is_new INTEGER NOT NULL DEFAULT 1,
                created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
                UNIQUE(announcement_id, model_id)
            );
        """)
        # ── collect_log ──────────────────────────────────────────────────
        conn.execute("""
            CREATE TABLE IF NOT EXISTS collect_log (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                collected_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
                new_count INTEGER NOT NULL DEFAULT 0,
                total_count INTEGER NOT NULL DEFAULT 0,
                error TEXT
            );
        """)
# ── status auto-computation ──────────────────────────────────────────────────
def compute_status(receipt_start: str, receipt_end: str, winner_date: str) -> str:
    today = date.today().isoformat()
    if receipt_start and today < receipt_start:
        return "청약예정"
    if receipt_start and receipt_end and receipt_start <= today <= receipt_end:
        return "청약중"
    if receipt_end and winner_date and receipt_end < today <= winner_date:
        return "결과발표"
    if winner_date and today > winner_date:
        return "완료"
    return "청약예정"
# ── announcements CRUD ───────────────────────────────────────────────────────
def _ann_row_to_dict(r) -> Dict[str, Any]:
    return {c: r[c] for c in r.keys()}


def upsert_announcement(data: Dict[str, Any]) -> tuple:
    """Upsert an announcement keyed on house_manage_no + pblanc_no. Returns (dict, is_new: bool)."""
    status = compute_status(
        data.get("receipt_start", ""),
        data.get("receipt_end", ""),
        data.get("winner_date", ""),
    )
    with _conn() as conn:
        exists = conn.execute(
            "SELECT 1 FROM announcements WHERE house_manage_no = ? AND pblanc_no = ?",
            (data["house_manage_no"], data["pblanc_no"]),
        ).fetchone()
        is_new = exists is None
        conn.execute("""
            INSERT INTO announcements (
                house_manage_no, pblanc_no, house_nm, house_secd, house_dtl_secd,
                rent_secd, region_code, region_name, address, total_units,
                rcrit_date, receipt_start, receipt_end, spsply_start, spsply_end,
                gnrl_rank1_start, gnrl_rank1_end, winner_date, contract_start,
                contract_end, homepage_url, pblanc_url, constructor, developer,
                move_in_month, is_speculative_area, is_price_cap, contact,
                status, source
            ) VALUES (
                :house_manage_no, :pblanc_no, :house_nm, :house_secd, :house_dtl_secd,
                :rent_secd, :region_code, :region_name, :address, :total_units,
                :rcrit_date, :receipt_start, :receipt_end, :spsply_start, :spsply_end,
                :gnrl_rank1_start, :gnrl_rank1_end, :winner_date, :contract_start,
                :contract_end, :homepage_url, :pblanc_url, :constructor, :developer,
                :move_in_month, :is_speculative_area, :is_price_cap, :contact,
                :status, :source
            )
            ON CONFLICT(house_manage_no, pblanc_no) DO UPDATE SET
                house_nm=excluded.house_nm,
                house_secd=excluded.house_secd,
                house_dtl_secd=excluded.house_dtl_secd,
                rent_secd=excluded.rent_secd,
                region_code=excluded.region_code,
                region_name=excluded.region_name,
                address=excluded.address,
                total_units=excluded.total_units,
                rcrit_date=excluded.rcrit_date,
                receipt_start=excluded.receipt_start,
                receipt_end=excluded.receipt_end,
                spsply_start=excluded.spsply_start,
                spsply_end=excluded.spsply_end,
                gnrl_rank1_start=excluded.gnrl_rank1_start,
                gnrl_rank1_end=excluded.gnrl_rank1_end,
                winner_date=excluded.winner_date,
                contract_start=excluded.contract_start,
                contract_end=excluded.contract_end,
                homepage_url=excluded.homepage_url,
                pblanc_url=excluded.pblanc_url,
                constructor=excluded.constructor,
                developer=excluded.developer,
                move_in_month=excluded.move_in_month,
                is_speculative_area=excluded.is_speculative_area,
                is_price_cap=excluded.is_price_cap,
                contact=excluded.contact,
                status=excluded.status,
                updated_at=strftime('%Y-%m-%dT%H:%M:%fZ','now')
        """, {**data, "status": status})
        row = conn.execute(
            "SELECT * FROM announcements WHERE house_manage_no = ? AND pblanc_no = ?",
            (data["house_manage_no"], data["pblanc_no"]),
        ).fetchone()
    return _ann_row_to_dict(row), is_new
def get_announcements(
    region: str = None,
    status: str = None,
    house_type: str = None,
    matched_only: bool = False,
    sort: str = "date",
    page: int = 1,
    size: int = 20,
) -> Dict[str, Any]:
    conditions, params = [], []
    if region:
        conditions.append("a.region_name = ?")
        params.append(region)
    if status:
        conditions.append("a.status = ?")
        params.append(status)
    if house_type:
        conditions.append("a.house_secd = ?")
        params.append(house_type)
    if matched_only:
        conditions.append("a.id IN (SELECT announcement_id FROM match_results)")
    where = f"WHERE {' AND '.join(conditions)}" if conditions else ""
    order_map = {"date": "a.rcrit_date DESC", "score": "a.id DESC", "price": "a.id ASC"}
    order = order_map.get(sort, "a.rcrit_date DESC")
    if matched_only and sort == "score":
        order = "(SELECT MAX(match_score) FROM match_results WHERE announcement_id = a.id) DESC"
    offset = (page - 1) * size
    with _conn() as conn:
        total = conn.execute(
            f"SELECT COUNT(*) FROM announcements a {where}", params
        ).fetchone()[0]
        rows = conn.execute(
            f"SELECT a.* FROM announcements a {where} ORDER BY {order} LIMIT ? OFFSET ?",
            params + [size, offset],
        ).fetchall()
    return {
        "items": [_ann_row_to_dict(r) for r in rows],
        "total": total,
        "page": page,
        "size": size,
    }


def get_announcement(ann_id: int) -> Optional[Dict[str, Any]]:
    with _conn() as conn:
        row = conn.execute("SELECT * FROM announcements WHERE id = ?", (ann_id,)).fetchone()
        if not row:
            return None
        ann = _ann_row_to_dict(row)
        models = conn.execute(
            "SELECT * FROM announcement_models WHERE house_manage_no = ? AND pblanc_no = ?",
            (ann["house_manage_no"], ann["pblanc_no"]),
        ).fetchall()
        ann["models"] = [dict(m) for m in models]
    return ann


def create_announcement(data: Dict[str, Any]) -> Dict[str, Any]:
    """Register a manual announcement (house_manage_no auto-generated)."""
    import uuid
    data["house_manage_no"] = data.get("house_manage_no", f"MANUAL-{uuid.uuid4().hex[:8]}")
    data["pblanc_no"] = data.get("pblanc_no", "00")
    data["source"] = "manual"
    result, _ = upsert_announcement(data)
    return result


ANNOUNCEMENT_COLUMNS = {
    "house_nm", "house_secd", "house_dtl_secd", "rent_secd",
    "region_code", "region_name", "address", "total_units",
    "rcrit_date", "receipt_start", "receipt_end", "spsply_start", "spsply_end",
    "gnrl_rank1_start", "gnrl_rank1_end", "winner_date",
    "contract_start", "contract_end", "homepage_url", "pblanc_url",
    "constructor", "developer", "move_in_month",
    "is_speculative_area", "is_price_cap", "contact",
}


def update_announcement(ann_id: int, data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    fields = {k: v for k, v in data.items() if v is not None and k in ANNOUNCEMENT_COLUMNS}
    if not fields:
        return get_announcement(ann_id)
    # recompute status when any of the dates change
    with _conn() as conn:
        row = conn.execute("SELECT * FROM announcements WHERE id = ?", (ann_id,)).fetchone()
        if not row:
            return None
        current = _ann_row_to_dict(row)
        merged = {**current, **fields}
        status = compute_status(
            merged.get("receipt_start", ""),
            merged.get("receipt_end", ""),
            merged.get("winner_date", ""),
        )
        fields["status"] = status
        set_clauses = ", ".join(f"{k} = ?" for k in fields)
        set_clauses += ", updated_at = strftime('%Y-%m-%dT%H:%M:%fZ','now')"
        conn.execute(
            f"UPDATE announcements SET {set_clauses} WHERE id = ?",
            list(fields.values()) + [ann_id],
        )
    return get_announcement(ann_id)


def delete_announcement(ann_id: int) -> bool:
    with _conn() as conn:
        # match_results rows are removed automatically via FK CASCADE
        cur = conn.execute("DELETE FROM announcements WHERE id = ?", (ann_id,))
        return cur.rowcount > 0


def update_all_statuses():
    """Recompute status from dates for every announcement that is not closed."""
    with _conn() as conn:
        rows = conn.execute(
            "SELECT id, status, receipt_start, receipt_end, winner_date FROM announcements "
            "WHERE status != '완료' AND (receipt_start IS NOT NULL OR receipt_end IS NOT NULL OR winner_date IS NOT NULL)"
        ).fetchall()
        for r in rows:
            new_status = compute_status(r["receipt_start"], r["receipt_end"], r["winner_date"])
            if new_status != r["status"]:  # only update if status actually changed
                conn.execute(
                    "UPDATE announcements SET status = ?, updated_at = strftime('%Y-%m-%dT%H:%M:%fZ','now') WHERE id = ?",
                    (new_status, r["id"]),
                )
# ── announcement_models CRUD ─────────────────────────────────────────────────
def upsert_model(data: Dict[str, Any]):
    with _conn() as conn:
        conn.execute("""
            INSERT INTO announcement_models (
                house_manage_no, pblanc_no, model_no, house_ty, supply_area,
                general_units, special_units, multi_child_units, newlywed_units,
                first_life_units, old_parent_units, institution_units,
                youth_units, newborn_units, top_amount
            ) VALUES (
                :house_manage_no, :pblanc_no, :model_no, :house_ty, :supply_area,
                :general_units, :special_units, :multi_child_units, :newlywed_units,
                :first_life_units, :old_parent_units, :institution_units,
                :youth_units, :newborn_units, :top_amount
            )
            ON CONFLICT(house_manage_no, pblanc_no, model_no) DO UPDATE SET
                house_ty=excluded.house_ty,
                supply_area=excluded.supply_area,
                general_units=excluded.general_units,
                special_units=excluded.special_units,
                multi_child_units=excluded.multi_child_units,
                newlywed_units=excluded.newlywed_units,
                first_life_units=excluded.first_life_units,
                old_parent_units=excluded.old_parent_units,
                institution_units=excluded.institution_units,
                youth_units=excluded.youth_units,
                newborn_units=excluded.newborn_units,
                top_amount=excluded.top_amount
        """, data)
# ── user_profile CRUD ────────────────────────────────────────────────────────

def _profile_row_to_dict(r) -> Dict[str, Any]:
    d = {}
    for c in r.keys():
        val = r[c]
        if c in ("is_homeless", "is_householder", "has_dependents", "is_newlywed",
                 "has_newborn", "is_first_home"):
            d[c] = bool(val) if val is not None else None
        elif c in ("preferred_regions", "preferred_types"):
            d[c] = json.loads(val) if val else []
        else:
            d[c] = val
    return d


def get_profile() -> Optional[Dict[str, Any]]:
    with _conn() as conn:
        r = conn.execute("SELECT * FROM user_profile WHERE id = 1").fetchone()
        return _profile_row_to_dict(r) if r else None


PROFILE_COLUMNS = {
    "name", "age", "is_homeless", "is_householder",
    "subscription_months", "subscription_amount", "family_members",
    "has_dependents", "children_count", "is_newlywed", "marriage_months",
    "has_newborn", "is_first_home", "income_level",
    "preferred_regions", "preferred_types",
    "min_area", "max_area", "max_price",
}


def upsert_profile(data: Dict[str, Any]) -> Dict[str, Any]:
    updates = {}
    for k, v in data.items():
        if v is None or k not in PROFILE_COLUMNS:
            continue
        if isinstance(v, bool):
            updates[k] = 1 if v else 0
        elif isinstance(v, list):
            updates[k] = json.dumps(v)
        else:
            updates[k] = v
    with _conn() as conn:
        existing = conn.execute("SELECT id FROM user_profile WHERE id = 1").fetchone()
        if existing:
            if updates:
                set_clauses = ", ".join(f"{k} = ?" for k in updates)
                set_clauses += ", updated_at = strftime('%Y-%m-%dT%H:%M:%fZ','now')"
                conn.execute(
                    f"UPDATE user_profile SET {set_clauses} WHERE id = 1",
                    list(updates.values()),
                )
        else:
            cols = ["id"] + list(updates.keys())
            vals = [1] + list(updates.values())
            placeholders = ", ".join("?" for _ in vals)
            conn.execute(
                f"INSERT INTO user_profile ({', '.join(cols)}) VALUES ({placeholders})",
                vals,
            )
        row = conn.execute("SELECT * FROM user_profile WHERE id = 1").fetchone()
        return _profile_row_to_dict(row)
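The dynamic `UPDATE` above interpolates only column names that pass the `PROFILE_COLUMNS` whitelist, while all values stay bound as parameters, which is what keeps the f-string SQL safe. A minimal standalone sketch of the same pattern (the table and `ALLOWED` set here are hypothetical, for illustration only):

```python
ALLOWED = {"name", "age"}  # hypothetical whitelist of updatable columns

def build_update(data: dict) -> tuple[str, list]:
    """Build a parameterized UPDATE from whitelisted keys only."""
    updates = {k: v for k, v in data.items() if k in ALLOWED and v is not None}
    set_clause = ", ".join(f"{k} = ?" for k in updates)  # names from whitelist, values as ? placeholders
    return f"UPDATE user_profile SET {set_clause} WHERE id = 1", list(updates.values())

sql, params = build_update({"name": "kim", "age": 30, "evil; DROP TABLE x": 1})
# the unknown key is silently dropped; values are bound, never interpolated
print(sql)     # UPDATE user_profile SET name = ?, age = ? WHERE id = 1
print(params)  # ['kim', 30]
```

Unknown or `None`-valued keys never reach the SQL string, so a malicious key name cannot become SQL.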
# ── match_results CRUD ───────────────────────────────────────────────────────

def save_match_result(data: Dict[str, Any]):
    with _conn() as conn:
        conn.execute("""
            INSERT INTO match_results (announcement_id, model_id, match_score, match_reasons, eligible_types, is_new)
            VALUES (:announcement_id, :model_id, :match_score, :match_reasons, :eligible_types, 1)
            ON CONFLICT(announcement_id, model_id) DO UPDATE SET
                match_score=excluded.match_score,
                match_reasons=excluded.match_reasons,
                eligible_types=excluded.eligible_types
        """, {
            **data,
            "match_reasons": json.dumps(data.get("match_reasons", [])),
            "eligible_types": json.dumps(data.get("eligible_types", [])),
        })


def get_matches(page: int = 1, size: int = 20) -> Dict[str, Any]:
    offset = (page - 1) * size
    with _conn() as conn:
        total = conn.execute("SELECT COUNT(*) FROM match_results").fetchone()[0]
        rows = conn.execute("""
            SELECT m.*, a.house_nm, a.region_name, a.address, a.status as ann_status,
                   a.receipt_start, a.receipt_end, a.winner_date, a.pblanc_url
            FROM match_results m
            JOIN announcements a ON a.id = m.announcement_id
            ORDER BY m.is_new DESC, m.match_score DESC
            LIMIT ? OFFSET ?
        """, (size, offset)).fetchall()
        items = []
        for r in rows:
            d = {c: r[c] for c in r.keys()}
            d["match_reasons"] = json.loads(d["match_reasons"]) if d["match_reasons"] else []
            d["eligible_types"] = json.loads(d["eligible_types"]) if d["eligible_types"] else []
            items.append(d)
        return {"items": items, "total": total, "page": page, "size": size}


def mark_match_read(match_id: int) -> bool:
    with _conn() as conn:
        cur = conn.execute("UPDATE match_results SET is_new = 0 WHERE id = ?", (match_id,))
        return cur.rowcount > 0
# ── collect_log CRUD ─────────────────────────────────────────────────────────

def save_collect_log(new_count: int, total_count: int, error: Optional[str] = None):
    with _conn() as conn:
        conn.execute(
            "INSERT INTO collect_log (new_count, total_count, error) VALUES (?, ?, ?)",
            (new_count, total_count, error),
        )


def get_last_collect_log() -> Optional[Dict[str, Any]]:
    with _conn() as conn:
        r = conn.execute("SELECT * FROM collect_log ORDER BY id DESC LIMIT 1").fetchone()
        return dict(r) if r else None
# ── Dashboard ────────────────────────────────────────────────────────────────

def get_dashboard() -> Dict[str, Any]:
    with _conn() as conn:
        active = conn.execute(
            "SELECT COUNT(*) FROM announcements WHERE status IN ('청약예정', '청약중')"
        ).fetchone()[0]
        new_matches = conn.execute(
            "SELECT COUNT(*) FROM match_results WHERE is_new = 1"
        ).fetchone()[0]
        upcoming = conn.execute("""
            SELECT id, house_nm, receipt_start, receipt_end, status
            FROM announcements
            WHERE status IN ('청약예정', '청약중')
            ORDER BY receipt_start ASC
            LIMIT 5
        """).fetchall()
        return {
            "active_count": active,
            "new_match_count": new_matches,
            "upcoming": [dict(r) for r in upcoming],
        }

realestate-lab/app/main.py Normal file

@@ -0,0 +1,177 @@
import os
import logging
import threading
from contextlib import asynccontextmanager

from fastapi import BackgroundTasks, FastAPI, Query, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from apscheduler.schedulers.background import BackgroundScheduler

from .db import (
    init_db, get_announcements, get_announcement, create_announcement,
    update_announcement, delete_announcement, update_all_statuses,
    get_profile, upsert_profile, get_matches, mark_match_read,
    get_last_collect_log, get_dashboard,
)
from .collector import collect_all
from .matcher import run_matching
from .models import AnnouncementCreate, AnnouncementUpdate, ProfileUpdate

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(name)s] %(levelname)s %(message)s")
logger = logging.getLogger("realestate-lab")

scheduler = BackgroundScheduler(timezone=os.getenv("TZ", "Asia/Seoul"))


def scheduled_collect():
    """Daily at 09:00: collect, then run matching."""
    logger.info("스케줄 수집 시작")
    collect_all()
    run_matching()
    logger.info("스케줄 수집 + 매칭 완료")


def scheduled_status_update():
    """Daily at 00:00: refresh statuses, then re-run matching."""
    logger.info("상태 갱신 시작")
    update_all_statuses()
    run_matching()
    logger.info("상태 갱신 + 재매칭 완료")


@asynccontextmanager
async def lifespan(app: FastAPI):
    init_db()
    scheduler.add_job(scheduled_collect, "cron", hour=9, minute=0, id="collect")
    scheduler.add_job(scheduled_status_update, "cron", hour=0, minute=0, id="status_update")
    scheduler.start()
    logger.info("realestate-lab 시작")
    yield
    scheduler.shutdown()
app = FastAPI(lifespan=lifespan)

_cors_origins = os.getenv("CORS_ALLOW_ORIGINS", "http://localhost:3007,http://localhost:8080").split(",")
app.add_middleware(
    CORSMiddleware,
    allow_origins=[o.strip() for o in _cors_origins],
    allow_credentials=False,
    allow_methods=["GET", "POST", "PUT", "PATCH", "DELETE", "OPTIONS"],
    allow_headers=["Content-Type"],
)


@app.get("/health")
def health():
    return {"status": "ok"}
# ── Announcements API ────────────────────────────────────────────────────────

@app.get("/api/realestate/announcements")
def api_announcements(
    region: str | None = None,
    status: str | None = None,
    house_type: str | None = None,
    matched_only: bool = False,
    sort: str = "date",
    page: int = Query(1, ge=1),
    size: int = Query(20, ge=1, le=100),
):
    return get_announcements(region, status, house_type, matched_only, sort, page, size)


@app.get("/api/realestate/announcements/{ann_id}")
def api_announcement_detail(ann_id: int):
    ann = get_announcement(ann_id)
    if not ann:
        raise HTTPException(status_code=404, detail="Announcement not found")
    return ann


@app.post("/api/realestate/announcements", status_code=201)
def api_announcement_create(body: AnnouncementCreate):
    return create_announcement(body.model_dump())


@app.put("/api/realestate/announcements/{ann_id}")
def api_announcement_update(ann_id: int, body: AnnouncementUpdate):
    updated = update_announcement(ann_id, body.model_dump(exclude_none=True))
    if not updated:
        raise HTTPException(status_code=404, detail="Announcement not found")
    return updated


@app.delete("/api/realestate/announcements/{ann_id}")
def api_announcement_delete(ann_id: int):
    if not delete_announcement(ann_id):
        raise HTTPException(status_code=404, detail="Announcement not found")
    return {"ok": True}
# ── Collect API ──────────────────────────────────────────────────────────────

_collect_lock = threading.Lock()


def _run_collect_and_match():
    if not _collect_lock.acquire(blocking=False):
        logger.info("수집 이미 진행 중 — 건너뜀")
        return
    try:
        collect_all()
        run_matching()
    finally:
        _collect_lock.release()


@app.post("/api/realestate/collect")
def api_collect(background_tasks: BackgroundTasks):
    background_tasks.add_task(_run_collect_and_match)
    return {"ok": True, "message": "수집 시작됨"}


@app.get("/api/realestate/collect/status")
def api_collect_status():
    log = get_last_collect_log()
    return log if log else {"collected_at": None, "new_count": 0, "total_count": 0, "error": None}
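The non-blocking `acquire(blocking=False)` guard in `_run_collect_and_match` turns a second concurrent collect request into a no-op instead of queueing it behind the first. A minimal demonstration of the primitive it relies on:

```python
import threading

lock = threading.Lock()

first = lock.acquire(blocking=False)   # lock was free: returns True and takes it
second = lock.acquire(blocking=False)  # lock is held: returns False immediately, no wait
lock.release()

print(first, second)  # True False
```

With a plain `lock.acquire()` the second caller would block until the first run finished, piling up duplicate collections instead of skipping them.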
# ── Profile API ──────────────────────────────────────────────────────────────

@app.get("/api/realestate/profile")
def api_profile_get():
    profile = get_profile()
    return profile if profile else {}


@app.put("/api/realestate/profile")
def api_profile_update(body: ProfileUpdate):
    return upsert_profile(body.model_dump(exclude_none=True))


# ── Matching API ─────────────────────────────────────────────────────────────

@app.get("/api/realestate/matches")
def api_matches(page: int = Query(1, ge=1), size: int = Query(20, ge=1, le=100)):
    return get_matches(page, size)


@app.post("/api/realestate/matches/refresh")
def api_matches_refresh():
    run_matching()
    return {"ok": True}


@app.patch("/api/realestate/matches/{match_id}/read")
def api_match_read(match_id: int):
    if not mark_match_read(match_id):
        raise HTTPException(status_code=404, detail="Match not found")
    return {"ok": True}


# ── Dashboard API ────────────────────────────────────────────────────────────

@app.get("/api/realestate/dashboard")
def api_dashboard():
    return get_dashboard()

realestate-lab/app/matcher.py Normal file

@@ -0,0 +1,160 @@
import json
import logging
from typing import Dict, Any, List

from .db import get_profile, _conn

logger = logging.getLogger("realestate-lab")

# house_secd → house-type name mapping
_HOUSE_TYPE_MAP = {
    "01": "APT",
    "02": "오피스텔",
    "04": "무순위",
    "09": "민간사전청약",
    "10": "신혼희망타운",
}


def _check_eligible_types(profile: Dict[str, Any], ann: Dict[str, Any]) -> List[str]:
    """Return the list of supply types this profile is eligible to apply for."""
    eligible: List[str] = []
    is_homeless = profile.get("is_homeless", False)
    is_speculative = ann.get("is_speculative_area") == "Y"
    required_months = 24 if is_speculative else 12
    subscription_months = profile.get("subscription_months") or 0
    # General supply
    if is_homeless and profile.get("is_householder") and subscription_months >= required_months:
        eligible.append("일반1순위")
    elif is_homeless:
        eligible.append("일반2순위")
    # Special supply: newlyweds
    # NOTE: income-criteria validation is planned for later (will use the income_level field)
    if profile.get("is_newlywed") and is_homeless:
        eligible.append("특별-신혼부부")
    if profile.get("is_first_home") and is_homeless:
        eligible.append("특별-생애최초")
    children_count = profile.get("children_count") or 0
    if children_count >= 2 and is_homeless:
        eligible.append("특별-다자녀")
    if profile.get("has_dependents") and is_homeless:
        eligible.append("특별-노부모부양")
    age = profile.get("age") or 0
    if 19 <= age <= 39 and is_homeless:
        eligible.append("특별-청년")
    if profile.get("has_newborn") and is_homeless:
        eligible.append("특별-신생아")
    return eligible
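As a worked example of the rules above, here is a simplified standalone copy of the same checks (not an import of the module) applied to a sample profile: a homeless householder with 30 subscription months, newlywed, age 33, in a non-speculative area:

```python
def eligible_types(profile: dict, speculative: bool = False) -> list[str]:
    """Simplified restatement of _check_eligible_types, for illustration."""
    out = []
    homeless = profile.get("is_homeless", False)
    months = profile.get("subscription_months") or 0
    required = 24 if speculative else 12  # speculative areas need 24 subscription months
    if homeless and profile.get("is_householder") and months >= required:
        out.append("일반1순위")
    elif homeless:
        out.append("일반2순위")
    if profile.get("is_newlywed") and homeless:
        out.append("특별-신혼부부")
    if profile.get("is_first_home") and homeless:
        out.append("특별-생애최초")
    if (profile.get("children_count") or 0) >= 2 and homeless:
        out.append("특별-다자녀")
    if profile.get("has_dependents") and homeless:
        out.append("특별-노부모부양")
    if 19 <= (profile.get("age") or 0) <= 39 and homeless:
        out.append("특별-청년")
    if profile.get("has_newborn") and homeless:
        out.append("특별-신생아")
    return out

sample = {"is_homeless": True, "is_householder": True, "subscription_months": 30,
          "is_newlywed": True, "age": 33}
print(eligible_types(sample))  # ['일반1순위', '특별-신혼부부', '특별-청년']
```

In a speculative area the same profile would need 24 subscription months for 일반1순위; with fewer it falls back to 일반2순위.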
def _compute_score(
    profile: Dict[str, Any],
    ann: Dict[str, Any],
    models: List[Dict[str, Any]],
) -> Dict[str, Any]:
    """Compute a match score (0-100) and the reasons behind it."""
    score = 0
    reasons: List[str] = []
    # 1. Region (30 pts)
    preferred_regions = profile.get("preferred_regions") or []
    region_name = ann.get("region_name") or ""
    if region_name and any(r in region_name for r in preferred_regions):
        score += 30
        reasons.append(f"선호 지역 일치: {region_name}")
    # 2. House type (10 pts)
    preferred_types = profile.get("preferred_types") or []
    house_secd = ann.get("house_secd") or ""
    type_name = _HOUSE_TYPE_MAP.get(house_secd, house_secd)
    if type_name and type_name in preferred_types:
        score += 10
        reasons.append(f"선호 유형 일치: {type_name}")
    # 3. Area (15 pts)
    min_area = profile.get("min_area")
    max_area = profile.get("max_area")
    if min_area is not None and max_area is not None and models:
        for m in models:
            supply_area = m.get("supply_area")
            if supply_area is not None and min_area <= supply_area <= max_area:
                score += 15
                reasons.append(f"희망 면적 범위 내 모델 존재 ({supply_area}㎡)")
                break
    # 4. Price (15 pts)
    max_price = profile.get("max_price")
    if max_price is not None and models:
        for m in models:
            top_amount = m.get("top_amount")
            if top_amount is not None and top_amount <= max_price:
                score += 15
                reasons.append(f"예산 범위 내 모델 존재 (최고가 {top_amount:,}만원)")
                break
    # 5. Eligibility (30 pts)
    eligible_types = _check_eligible_types(profile, ann)
    eligibility_score = min(len(eligible_types) * 10, 30)
    if eligibility_score > 0:
        score += eligibility_score
        reasons.append(f"자격 유형 {len(eligible_types)}개: {', '.join(eligible_types)}")
    return {
        "match_score": score,
        "match_reasons": reasons,
        "eligible_types": eligible_types,
    }
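The five criteria cap at 30 + 10 + 15 + 15 + 30 = 100 points, and the eligibility portion is 10 points per qualifying supply type, capped at 30. A tiny sketch of that capping behavior:

```python
def eligibility_score(n_types: int) -> int:
    """10 points per eligible supply type, capped at 30 (mirrors _compute_score)."""
    return min(n_types * 10, 30)

print([eligibility_score(n) for n in range(5)])  # [0, 10, 20, 30, 30]
```

So a profile eligible for four or more supply types scores no higher on this axis than one eligible for three.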
def run_matching():
    """Run profile-based matching and persist the results."""
    profile = get_profile()
    if not profile:
        logger.info("프로필 미설정 — 매칭 건너뜀")
        return
    with _conn() as conn:
        anns = conn.execute(
            "SELECT * FROM announcements WHERE status IN ('청약예정', '청약중')"
        ).fetchall()
        for ann_row in anns:
            ann = {c: ann_row[c] for c in ann_row.keys()}
            models = conn.execute(
                "SELECT * FROM announcement_models WHERE house_manage_no = ? AND pblanc_no = ?",
                (ann["house_manage_no"], ann["pblanc_no"]),
            ).fetchall()
            model_list = [dict(m) for m in models]
            result = _compute_score(profile, ann, model_list)
            if result["match_score"] > 0:
                # SQLite treats NULLs as distinct in UNIQUE constraints, so an
                # ON CONFLICT(announcement_id, model_id) upsert never fires when
                # model_id is NULL. Check-then-update instead, preserving is_new.
                existing = conn.execute(
                    "SELECT id FROM match_results WHERE announcement_id = ? AND model_id IS NULL",
                    (ann["id"],),
                ).fetchone()
                payload = (
                    result["match_score"],
                    json.dumps(result["match_reasons"]),
                    json.dumps(result["eligible_types"]),
                )
                if existing:
                    conn.execute(
                        "UPDATE match_results SET match_score = ?, match_reasons = ?, eligible_types = ? WHERE id = ?",
                        payload + (existing["id"],),
                    )
                else:
                    conn.execute(
                        "INSERT INTO match_results (announcement_id, model_id, match_score, match_reasons, eligible_types, is_new) "
                        "VALUES (?, NULL, ?, ?, ?, 1)",
                        (ann["id"],) + payload,
                    )
        # Clean up stale match results for completed announcements
        conn.execute(
            "DELETE FROM match_results WHERE announcement_id NOT IN "
            "(SELECT id FROM announcements WHERE status IN ('청약예정', '청약중'))"
        )
    logger.info("매칭 완료")
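One subtlety worth knowing here: SQLite treats NULLs as distinct inside UNIQUE constraints, so an `ON CONFLICT(announcement_id, model_id)` upsert never fires for rows whose `model_id` is NULL and silently inserts duplicates instead. A minimal in-memory demonstration (simplified hypothetical schema):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE match_results (
        announcement_id INTEGER,
        model_id INTEGER,
        score INTEGER,
        UNIQUE(announcement_id, model_id)
    )
""")
sql = """
    INSERT INTO match_results (announcement_id, model_id, score) VALUES (?, ?, ?)
    ON CONFLICT(announcement_id, model_id) DO UPDATE SET score = excluded.score
"""
conn.execute(sql, (1, None, 50))
conn.execute(sql, (1, None, 80))  # NULL != NULL for UNIQUE: a second row is inserted
count = conn.execute("SELECT COUNT(*) FROM match_results").fetchone()[0]
print(count)  # 2, not 1 — the upsert never triggered
```

This is why announcement-level matches with a NULL `model_id` need an explicit `IS NULL` lookup (or a non-NULL sentinel) rather than relying on `ON CONFLICT`.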

realestate-lab/app/models.py Normal file

@@ -0,0 +1,82 @@
from typing import Optional, List
from pydantic import BaseModel


class AnnouncementCreate(BaseModel):
    house_nm: str
    house_secd: str = "01"
    house_dtl_secd: Optional[str] = None
    rent_secd: Optional[str] = None
    region_code: Optional[str] = None
    region_name: Optional[str] = None
    address: Optional[str] = None
    total_units: Optional[int] = None
    rcrit_date: Optional[str] = None
    receipt_start: Optional[str] = None
    receipt_end: Optional[str] = None
    spsply_start: Optional[str] = None
    spsply_end: Optional[str] = None
    gnrl_rank1_start: Optional[str] = None
    gnrl_rank1_end: Optional[str] = None
    winner_date: Optional[str] = None
    contract_start: Optional[str] = None
    contract_end: Optional[str] = None
    homepage_url: Optional[str] = None
    pblanc_url: Optional[str] = None
    constructor: Optional[str] = None
    developer: Optional[str] = None
    move_in_month: Optional[str] = None
    is_speculative_area: Optional[str] = None
    is_price_cap: Optional[str] = None
    contact: Optional[str] = None


class AnnouncementUpdate(BaseModel):
    house_nm: Optional[str] = None
    house_secd: Optional[str] = None
    house_dtl_secd: Optional[str] = None
    rent_secd: Optional[str] = None
    region_code: Optional[str] = None
    region_name: Optional[str] = None
    address: Optional[str] = None
    total_units: Optional[int] = None
    rcrit_date: Optional[str] = None
    receipt_start: Optional[str] = None
    receipt_end: Optional[str] = None
    spsply_start: Optional[str] = None
    spsply_end: Optional[str] = None
    gnrl_rank1_start: Optional[str] = None
    gnrl_rank1_end: Optional[str] = None
    winner_date: Optional[str] = None
    contract_start: Optional[str] = None
    contract_end: Optional[str] = None
    homepage_url: Optional[str] = None
    pblanc_url: Optional[str] = None
    constructor: Optional[str] = None
    developer: Optional[str] = None
    move_in_month: Optional[str] = None
    is_speculative_area: Optional[str] = None
    is_price_cap: Optional[str] = None
    contact: Optional[str] = None


class ProfileUpdate(BaseModel):
    name: Optional[str] = None
    age: Optional[int] = None
    is_homeless: Optional[bool] = None
    is_householder: Optional[bool] = None
    subscription_months: Optional[int] = None
    subscription_amount: Optional[int] = None
    family_members: Optional[int] = None
    has_dependents: Optional[bool] = None
    children_count: Optional[int] = None
    is_newlywed: Optional[bool] = None
    marriage_months: Optional[int] = None
    has_newborn: Optional[bool] = None
    is_first_home: Optional[bool] = None
    income_level: Optional[str] = None
    preferred_regions: Optional[List[str]] = None
    preferred_types: Optional[List[str]] = None
    min_area: Optional[float] = None
    max_area: Optional[float] = None
    max_price: Optional[int] = None


@@ -0,0 +1,5 @@
requests==2.32.3
fastapi==0.115.6
uvicorn[standard]==0.30.6
apscheduler==3.10.4
pydantic>=2.0


@@ -32,7 +32,7 @@ cd "$SRC"
 # backend, travel-proxy, deployer, nginx, scripts, docker-compose.yml, .env 등
 RSYNC_EXCLUDES="--exclude .git --exclude __pycache__ --exclude *.pyc --exclude data/"
-for dir in backend travel-proxy deployer stock-lab music-lab blog-lab nginx scripts; do
+for dir in backend travel-proxy deployer stock-lab music-lab blog-lab realestate-lab nginx scripts; do
   rsync -a --delete $RSYNC_EXCLUDES \
     "$SRC/$dir/" "$DST/$dir/"
 done