import os import json import logging from pathlib import Path from typing import Any, List from fastapi import FastAPI, HTTPException, Query from fastapi.responses import FileResponse from pydantic import BaseModel from PIL import Image from .db import init_db, get_photos_by_region, get_all_albums, set_album_cover, mark_thumb_done from .indexer import sync, _load_region_map_merged logger = logging.getLogger(__name__) app = FastAPI() # ----------------------------- # Env / Paths # ----------------------------- ROOT = Path(os.getenv("TRAVEL_ROOT", "/data/travel")).resolve() MEDIA_BASE = os.getenv("TRAVEL_MEDIA_BASE", "/media/travel") META_DIR = ROOT / "_meta" REGION_MAP_PATH = META_DIR / "region_map.json" REGIONS_GEOJSON_PATH = META_DIR / "regions.geojson" THUMB_ROOT = Path(os.getenv("TRAVEL_THUMB_ROOT", "/data/thumbs")).resolve() THUMB_SIZE = (480, 480) THUMB_ROOT.mkdir(parents=True, exist_ok=True) IMAGE_EXT = {".jpg", ".jpeg", ".png", ".webp"} # ----------------------------- # DB init # ----------------------------- init_db() # ----------------------------- # Helpers # ----------------------------- def _read_json(path: Path) -> Any: if not path.exists(): raise HTTPException(500, f"Missing required file: {path}") with open(path, "r", encoding="utf-8") as f: return json.load(f) def load_region_map() -> dict: return _load_region_map_merged(REGION_MAP_PATH, THUMB_ROOT) def load_regions_geojson() -> dict: return _read_json(REGIONS_GEOJSON_PATH) def _get_albums_for_region(region: str, region_map: dict) -> List[str]: if region not in region_map: raise HTTPException(400, "Unknown region") v = region_map[region] if isinstance(v, list): return v if isinstance(v, dict) and isinstance(v.get("albums"), list): return v["albums"] raise HTTPException(500, "Invalid region_map format") def _ensure_thumb_fallback(src: Path, album: str) -> Path: """온디맨드 썸네일 폴백 (sync 누락 분 대응).""" out = THUMB_ROOT / album / src.name if out.exists(): return out out.parent.mkdir(parents=True, exist_ok=True) tmp = out.with_name(out.stem + ".tmp" + out.suffix) try: with Image.open(src) as im: im.thumbnail(THUMB_SIZE) ext = out.suffix.lower() if ext in (".jpg", ".jpeg"): fmt = "JPEG" elif ext == ".png": fmt = "PNG" elif ext == ".webp": fmt = "WEBP" else: fmt = (im.format or "").upper() or "JPEG" im.save(tmp, format=fmt, quality=85, optimize=True) tmp.replace(out) mark_thumb_done(album, src.name) return out finally: try: if tmp.exists(): tmp.unlink() except Exception: pass # ----------------------------- # Models # ----------------------------- class CoverRequest(BaseModel): filename: str # ----------------------------- # Routes # ----------------------------- @app.get("/health") def health(): return {"status": "healthy", "service": "travel-proxy"} @app.get("/api/travel/regions") def regions(): geojson = load_regions_geojson() # 미분류 지역이 region_map에 있으면 GeoJSON에도 동적 추가 region_map = load_region_map() existing_ids = { f.get("properties", {}).get("id") for f in geojson.get("features", []) } if "미분류" in region_map and "미분류" not in existing_ids: geojson.setdefault("features", []).append({ "type": "Feature", "properties": {"id": "미분류", "name": "미분류"}, "geometry": None, }) return geojson @app.get("/api/travel/photos") def photos( region: str = Query(...), page: int = Query(1, ge=1), size: int = Query(20, ge=1, le=100), ): region_map = load_region_map() albums = _get_albums_for_region(region, region_map) result = get_photos_by_region(albums, page, size) # URL 조합 (DB에는 경로를 저장하지 않음) items = [] for row in result["items"]: items.append({ "album": row["album"], "file": row["filename"], "url": f"{MEDIA_BASE}/{row['album']}/{row['filename']}", "thumb": f"{MEDIA_BASE}/.thumb/{row['album']}/{row['filename']}", "mtime": row["mtime"], }) return { "region": region, "page": page, "size": size, "total": result["total"], "has_next": result["has_next"], "items": items, "matched_albums": result["matched_albums"], } @app.post("/api/travel/sync") def sync_endpoint(): result = sync( travel_root=ROOT, thumb_root=THUMB_ROOT, region_map_path=REGION_MAP_PATH, ) return result @app.get("/api/travel/albums") def albums_list(): rows = get_all_albums() result = [] for r in rows: cover = r["cover_filename"] result.append({ "album": r["album"], "count": r["count"], "cover_url": f"{MEDIA_BASE}/{r['album']}/{cover}", "cover_thumb": f"{MEDIA_BASE}/.thumb/{r['album']}/{cover}", }) return result @app.put("/api/travel/albums/{album}/cover") def set_cover(album: str, body: CoverRequest): ok = set_album_cover(album, body.filename) if not ok: raise HTTPException(404, f"Photo not found: {album}/{body.filename}") return { "album": album, "filename": body.filename, "cover_url": f"{MEDIA_BASE}/{album}/{body.filename}", "cover_thumb": f"{MEDIA_BASE}/.thumb/{album}/{body.filename}", } @app.get("/media/travel/.thumb/{album}/{filename}") def get_thumb(album: str, filename: str): if ".." in album or ".." in filename: raise HTTPException(400, "Invalid path") src = (ROOT / album / filename).resolve() if not str(src).startswith(str(ROOT)): raise HTTPException(403, "Access denied") if not src.exists() or not src.is_file(): raise HTTPException(404, "Source not found") p = _ensure_thumb_fallback(src, album) if not p.exists() or not p.is_file(): raise HTTPException(404, "Thumbnail not found") return FileResponse(str(p)) @app.get("/api/version") def version(): return {"version": os.getenv("APP_VERSION", "dev")}