feat: auto-sync full lotto history on stats api access

This commit is contained in:
2026-01-26 00:45:32 +09:00
parent bd43c99221
commit 597353e6d4
3 changed files with 74 additions and 15 deletions

View File

@@ -1,7 +1,7 @@
import requests import requests
from typing import Dict, Any from typing import Dict, Any
from .db import get_draw, upsert_draw from .db import get_draw, upsert_draw, upsert_many_draws, get_latest_draw, count_draws
def _normalize_item(item: dict) -> dict: def _normalize_item(item: dict) -> dict:
# smok95 all.json / latest.json 구조 # smok95 all.json / latest.json 구조
@@ -27,20 +27,13 @@ def sync_all_from_json(all_url: str) -> Dict[str, Any]:
r.raise_for_status() r.raise_for_status()
data = r.json() # list[dict] data = r.json() # list[dict]
inserted = 0 # 정규화
skipped = 0 rows = [_normalize_item(item) for item in data]
for item in data: # Bulk Insert (성능 향상)
row = _normalize_item(item) upsert_many_draws(rows)
if get_draw(row["drw_no"]): return {"mode": "all_json", "url": all_url, "total": len(rows)}
skipped += 1
continue
upsert_draw(row)
inserted += 1
return {"mode": "all_json", "url": all_url, "inserted": inserted, "skipped": skipped, "total": len(data)}
def sync_latest(latest_url: str) -> Dict[str, Any]: def sync_latest(latest_url: str) -> Dict[str, Any]:
r = requests.get(latest_url, timeout=30) r = requests.get(latest_url, timeout=30)
@@ -53,3 +46,40 @@ def sync_latest(latest_url: str) -> Dict[str, Any]:
return {"mode": "latest_json", "url": latest_url, "was_new": (before is None), "drawNo": row["drw_no"]} return {"mode": "latest_json", "url": latest_url, "was_new": (before is None), "drawNo": row["drw_no"]}
def sync_ensure_all(latest_url: str, all_url: str) -> Dict[str, Any]:
"""
1회부터 최신 회차까지 빠짐없이 있는지 확인하고, 없으면 전체 동기화 수행.
반환값: {"synced": bool, "reason": str, ...}
"""
# 1. 원격 최신 회차 확인
try:
r = requests.get(latest_url, timeout=10)
r.raise_for_status()
remote_item = r.json()
remote_no = int(remote_item["draw_no"])
except Exception as e:
# 외부 통신 실패 시, 그냥 로컬 데이터로 진행하도록 에러 억제 (혹은 에러 발생)
# 여기서는 통계 기능 작동이 우선이므로 로그만 남기고 pass하고 싶지만,
# 확실한 동기화를 위해 에러를 던지거나 False 리턴
return {"synced": False, "error": str(e)}
# 2. 로컬 상태 확인
local_latest_row = get_latest_draw()
local_no = local_latest_row["drw_no"] if local_latest_row else 0
local_cnt = count_draws()
# 3. 동기화 필요 여부 판단
# - 전체 개수가 최신 회차 번호보다 적으면 중간에 빈 것 (1회부터 시작한다고 가정)
# - 혹은 DB 최신 번호가 원격보다 낮으면 업데이트 필요
need_sync = (local_no < remote_no) or (local_cnt < local_no)
if not need_sync:
return {"synced": True, "updated": False, "local_no": local_no}
# 4. 전체 동기화 실행
# (단순 latest sync로는 중간 구멍을 못 채우므로, 구멍이 있거나 차이가 크면 all_sync 수행)
# 만약 차이가 1회차 뿐이고 구멍이 없다면 sync_latest만 해도 되지만,
# 로직 단순화를 위해 missing 감지 시 그냥 all_sync (Bulk Insert라 빠름)
res = sync_all_from_json(all_url)
return {"synced": True, "updated": True, "detail": res}

View File

@@ -88,6 +88,30 @@ def upsert_draw(row: Dict[str, Any]) -> None:
), ),
) )
def upsert_many_draws(rows: List[Dict[str, Any]]) -> None:
data = [
(
int(r["drw_no"]), str(r["drw_date"]),
int(r["n1"]), int(r["n2"]), int(r["n3"]),
int(r["n4"]), int(r["n5"]), int(r["n6"]),
int(r["bonus"])
) for r in rows
]
with _conn() as conn:
conn.executemany(
"""
INSERT INTO draws (drw_no, drw_date, n1, n2, n3, n4, n5, n6, bonus)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(drw_no) DO UPDATE SET
drw_date=excluded.drw_date,
n1=excluded.n1, n2=excluded.n2, n3=excluded.n3,
n4=excluded.n4, n5=excluded.n5, n6=excluded.n6,
bonus=excluded.bonus,
updated_at=datetime('now')
""",
data
)
def get_latest_draw() -> Optional[Dict[str, Any]]: def get_latest_draw() -> Optional[Dict[str, Any]]:
with _conn() as conn: with _conn() as conn:
r = conn.execute("SELECT * FROM draws ORDER BY drw_no DESC LIMIT 1").fetchone() r = conn.execute("SELECT * FROM draws ORDER BY drw_no DESC LIMIT 1").fetchone()

View File

@@ -10,7 +10,8 @@ from .db import (
update_recommendation, update_recommendation,
) )
from .recommender import recommend_numbers from .recommender import recommend_numbers
from .collector import sync_latest from .recommender import recommend_numbers
from .collector import sync_latest, sync_ensure_all
app = FastAPI() app = FastAPI()
scheduler = BackgroundScheduler(timezone=os.getenv("TZ", "Asia/Seoul")) scheduler = BackgroundScheduler(timezone=os.getenv("TZ", "Asia/Seoul"))
@@ -118,6 +119,10 @@ def admin_sync_latest():
@app.get("/api/lotto/stats") @app.get("/api/lotto/stats")
def api_stats(): def api_stats():
# 1. 데이터 완전성 보장 (없으면 가져옴)
sync_ensure_all(LATEST_URL, ALL_URL)
# 2. 전체 데이터 조회
draws = get_all_draw_numbers() draws = get_all_draw_numbers()
if not draws: if not draws:
raise HTTPException(status_code=404, detail="No data yet") raise HTTPException(status_code=404, detail="No data yet")