"""SQLite-backed index of travel photos, thumbnails state and album covers."""

import os
import sqlite3
from contextlib import contextmanager
from typing import Any, Dict, Iterable, Iterator, List, Optional, Set

DB_PATH = os.getenv("TRAVEL_DB_PATH", "/data/thumbs/travel.db")


def _conn() -> sqlite3.Connection:
    """Open a new connection to ``DB_PATH`` configured for WAL and long waits.

    The parent directory of ``DB_PATH`` is created when it is missing. The
    caller owns the returned connection and is responsible for closing it.
    """
    db_dir = os.path.dirname(DB_PATH)
    # A bare filename has an empty dirname; os.makedirs("") would raise.
    if db_dir:
        os.makedirs(db_dir, exist_ok=True)
    conn = sqlite3.connect(DB_PATH, timeout=120.0)
    conn.row_factory = sqlite3.Row
    conn.execute("PRAGMA journal_mode=WAL")
    conn.execute("PRAGMA busy_timeout=120000")
    return conn


@contextmanager
def _session() -> Iterator[sqlite3.Connection]:
    """Yield a connection; commit on success, roll back on error, always close.

    ``with connection:`` on its own only ends the transaction and leaves the
    connection open, so the close is done explicitly here.
    """
    conn = _conn()
    try:
        with conn:
            yield conn
    finally:
        conn.close()


def _apply_upsert(
    conn: sqlite3.Connection,
    album: str,
    filename: str,
    mtime: float,
    known_mtime: Optional[float],
) -> str:
    """Insert or refresh one photo row given the mtime currently stored.

    ``known_mtime`` is ``None`` when the photo has no row yet. A changed mtime
    resets ``has_thumb`` to 0. Returns 'added' | 'updated' | 'unchanged'.
    """
    if known_mtime is None:
        conn.execute(
            "INSERT INTO photos (album, filename, mtime, has_thumb) VALUES (?, ?, ?, 0)",
            (album, filename, mtime),
        )
        return "added"
    if known_mtime != mtime:
        conn.execute(
            "UPDATE photos SET mtime = ?, has_thumb = 0 WHERE album = ? AND filename = ?",
            (mtime, album, filename),
        )
        return "updated"
    return "unchanged"


def _prune_missing(
    conn: sqlite3.Connection, album: str, existing_filenames: Iterable[str]
) -> int:
    """Delete rows of ``album`` whose filename is not in ``existing_filenames``.

    A cover entry pointing at a deleted photo is removed as well. Returns the
    number of photo filenames removed.
    """
    rows = conn.execute(
        "SELECT filename FROM photos WHERE album = ?", (album,)
    ).fetchall()
    stale = {row["filename"] for row in rows}.difference(existing_filenames)
    if not stale:
        return 0
    # One fixed two-parameter statement per filename: unlike a single
    # "IN (?, ?, ...)" list, this cannot exceed SQLite's bound-variable limit.
    params = [(album, name) for name in stale]
    conn.executemany(
        "DELETE FROM photos WHERE album = ? AND filename = ?", params
    )
    conn.executemany(
        "DELETE FROM album_covers WHERE album = ? AND filename = ?", params
    )
    return len(stale)


def init_db() -> None:
    """Create the ``photos`` and ``album_covers`` tables and index if absent."""
    with _session() as conn:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS photos (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                album TEXT NOT NULL,
                filename TEXT NOT NULL,
                mtime REAL NOT NULL,
                has_thumb INTEGER DEFAULT 0,
                indexed_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
                UNIQUE(album, filename)
            )
        """)
        conn.execute("CREATE INDEX IF NOT EXISTS idx_photos_album ON photos(album)")
        conn.execute("""
            CREATE TABLE IF NOT EXISTS album_covers (
                album TEXT PRIMARY KEY,
                filename TEXT NOT NULL,
                updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
            )
        """)


def get_photos_by_region(albums: List[str], page: int, size: int) -> Dict[str, Any]:
    """Return one page of the photos that belong to the given albums.

    Args:
        albums: Album names making up the region.
        page: 1-based page number; values below 1 are treated as the first page.
        size: Page size, passed to SQL ``LIMIT``.

    Returns:
        A dict with ``items`` (album/filename/mtime dicts ordered by album then
        filename), ``total`` (photo count over all matched albums),
        ``has_next`` and ``matched_albums`` (per-album counts for albums that
        have at least one photo).
    """
    if not albums:
        return {"items": [], "total": 0, "has_next": False, "matched_albums": []}

    placeholders = ",".join("?" for _ in albums)
    # SQLite already treats a negative OFFSET as 0; clamping here keeps the
    # has_next computation consistent with the rows actually returned.
    offset = max((page - 1) * size, 0)

    with _session() as conn:
        count_rows = conn.execute(
            f"SELECT album, COUNT(*) as cnt FROM photos WHERE album IN ({placeholders}) GROUP BY album",
            albums,
        ).fetchall()
        item_rows = conn.execute(
            f"""SELECT album, filename, mtime FROM photos
                WHERE album IN ({placeholders})
                ORDER BY album, filename
                LIMIT ? OFFSET ?""",
            [*albums, size, offset],
        ).fetchall()

    total = sum(row["cnt"] for row in count_rows)
    return {
        "items": [dict(row) for row in item_rows],
        "total": total,
        "has_next": (offset + size) < total,
        "matched_albums": [
            {"album": row["album"], "count": row["cnt"]} for row in count_rows
        ],
    }


def get_all_albums() -> List[Dict[str, Any]]:
    """Return every album with its photo count and cover filename.

    The cover is the explicitly assigned one when present, otherwise the
    smallest filename in the album.
    """
    with _session() as conn:
        rows = conn.execute("""
            SELECT p.album, COUNT(*) as count,
                   COALESCE(c.filename, MIN(p.filename)) as cover_filename
            FROM photos p
            LEFT JOIN album_covers c ON p.album = c.album
            GROUP BY p.album
            ORDER BY p.album
        """).fetchall()
    return [dict(row) for row in rows]


def set_album_cover(album: str, filename: str) -> bool:
    """Assign ``filename`` as the cover of ``album``.

    Returns True when the photo exists (cover stored or replaced), False when
    no such photo is indexed (nothing is written).
    """
    with _session() as conn:
        photo = conn.execute(
            "SELECT 1 FROM photos WHERE album = ? AND filename = ?",
            (album, filename),
        ).fetchone()
        if photo is None:
            return False
        conn.execute(
            """INSERT INTO album_covers (album, filename, updated_at)
               VALUES (?, ?, strftime('%Y-%m-%dT%H:%M:%fZ','now'))
               ON CONFLICT(album) DO UPDATE SET filename = excluded.filename,
                                                updated_at = excluded.updated_at""",
            (album, filename),
        )
        return True


def get_album_cover(album: str) -> Optional[str]:
    """Return the explicitly assigned cover filename of ``album``, or None."""
    with _session() as conn:
        row = conn.execute(
            "SELECT filename FROM album_covers WHERE album = ?",
            (album,),
        ).fetchone()
    return row["filename"] if row else None


def upsert_photo(album: str, filename: str, mtime: float) -> str:
    """Insert or refresh a single photo row.

    Returns 'added' | 'updated' | 'unchanged'. An update (changed mtime) also
    resets ``has_thumb`` so the thumbnail gets regenerated.
    """
    with _session() as conn:
        row = conn.execute(
            "SELECT mtime FROM photos WHERE album = ? AND filename = ?",
            (album, filename),
        ).fetchone()
        known_mtime = row["mtime"] if row is not None else None
        return _apply_upsert(conn, album, filename, mtime, known_mtime)


def remove_missing_photos(album: str, existing_filenames: Set[str]) -> int:
    """Delete indexed photos of ``album`` that are not in ``existing_filenames``.

    Returns the number of photos removed.
    """
    with _session() as conn:
        return _prune_missing(conn, album, existing_filenames)


def get_photos_without_thumb() -> List[Dict[str, str]]:
    """Return album/filename pairs of photos whose ``has_thumb`` flag is 0."""
    with _session() as conn:
        rows = conn.execute(
            "SELECT album, filename FROM photos WHERE has_thumb = 0"
        ).fetchall()
    return [dict(row) for row in rows]


def mark_thumb_done(album: str, filename: str) -> None:
    """Set ``has_thumb`` to 1 for one photo."""
    with _session() as conn:
        conn.execute(
            "UPDATE photos SET has_thumb = 1 WHERE album = ? AND filename = ?",
            (album, filename),
        )


def batch_sync_album(
    album: str, items: List[Dict[str, Any]], existing_filenames: Set[str]
) -> Dict[str, int]:
    """Synchronise one album in a single connection and transaction.

    Args:
        album: Album name.
        items: Dicts with ``filename`` and ``mtime`` keys to upsert.
        existing_filenames: Filenames to keep; any other indexed photo of the
            album is deleted after the upserts.

    Returns:
        Counts under the keys ``added``, ``updated`` and ``removed``.
    """
    counts = {"added": 0, "updated": 0, "unchanged": 0}
    with _session() as conn:
        # Load the album's stored mtimes once instead of one SELECT per item.
        known = {
            row["filename"]: row["mtime"]
            for row in conn.execute(
                "SELECT filename, mtime FROM photos WHERE album = ?", (album,)
            )
        }
        for item in items:
            name, mtime = item["filename"], item["mtime"]
            outcome = _apply_upsert(conn, album, name, mtime, known.get(name))
            counts[outcome] += 1
            # Keep the snapshot current so a repeated filename in ``items``
            # is compared against what was just written.
            known[name] = mtime

        removed = _prune_missing(conn, album, existing_filenames)

    return {"added": counts["added"], "updated": counts["updated"], "removed": removed}


def batch_mark_thumbs_done(items: List[Dict[str, str]]) -> None:
    """Set ``has_thumb`` to 1 for every album/filename dict in ``items``."""
    if not items:
        return
    with _session() as conn:
        conn.executemany(
            "UPDATE photos SET has_thumb = 1 WHERE album = ? AND filename = ?",
            [(item["album"], item["filename"]) for item in items],
        )