- main.py에 basicConfig(level=INFO) 추가 — 기존엔 기본 WARNING이라 info 로그 무시됨 - indexer: 앨범별 변경사항 로그, 썸네일 100개 단위 진행률 로그 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
237 lines
8.0 KiB
Python
237 lines
8.0 KiB
Python
import os
|
|
import time
|
|
import json
|
|
import logging
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Set
|
|
|
|
from PIL import Image
|
|
|
|
from . import db
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# File suffixes treated as images; compared against the lower-cased suffix.
IMAGE_EXT = {".jpg", ".jpeg", ".png", ".webp"}
# Bounding box in pixels handed to Image.thumbnail() when creating thumbnails.
THUMB_SIZE = (480, 480)
|
|
|
|
|
|
def _scan_folder(folder: Path) -> List[Dict[str, Any]]:
|
|
"""폴더 내 이미지 파일 목록 수집 (os.scandir)."""
|
|
if not folder.exists():
|
|
return []
|
|
items = []
|
|
with os.scandir(folder) as entries:
|
|
for entry in entries:
|
|
if entry.is_file() and Path(entry.name).suffix.lower() in IMAGE_EXT:
|
|
try:
|
|
mtime = entry.stat().st_mtime
|
|
except OSError as e:
|
|
logger.warning("Cannot stat %s: %s", entry.path, e)
|
|
continue
|
|
items.append({
|
|
"filename": entry.name,
|
|
"mtime": mtime,
|
|
})
|
|
return items
|
|
|
|
|
|
def _generate_thumb(src: Path, dest: Path) -> bool:
    """Create a thumbnail of ``src`` at ``dest``, bounded by ``THUMB_SIZE``.

    The image is first written to a sibling ``<stem>.tmp<suffix>`` file and
    then moved over ``dest`` with ``Path.replace``, so a partially written
    thumbnail is not left at the final path.

    Args:
        src: Path of the original image.
        dest: Path of the thumbnail; its suffix selects the output format
            (falling back to the source image's format, then JPEG).

    Returns:
        True when the thumbnail was written, False on any failure. Failures
        are logged as warnings and never raised.
    """
    tmp = dest.with_name(dest.stem + ".tmp" + dest.suffix)
    suffix_formats = {
        ".jpg": "JPEG",
        ".jpeg": "JPEG",
        ".png": "PNG",
        ".webp": "WEBP",
    }
    # Modes the JPEG encoder can write directly; anything else (RGBA, P,
    # LA, ...) has to be converted first or save() raises.
    jpeg_modes = {"1", "L", "RGB", "RGBX", "CMYK", "YCbCr"}
    try:
        # Inside the try so a directory-creation error is reported as a
        # failed thumbnail instead of escaping to the caller.
        dest.parent.mkdir(parents=True, exist_ok=True)
        with Image.open(src) as im:
            im.thumbnail(THUMB_SIZE)
            fmt = suffix_formats.get(dest.suffix.lower())
            if fmt is None:
                fmt = (im.format or "").upper() or "JPEG"
            out = im
            if fmt == "JPEG" and im.mode not in jpeg_modes:
                out = im.convert("RGB")
            out.save(tmp, format=fmt, quality=85, optimize=True)
        tmp.replace(dest)
        return True
    except Exception as e:
        logger.warning("Thumb generation failed: %s → %s", src, e)
        # Best-effort removal of the partial temp file.
        try:
            if tmp.exists():
                tmp.unlink()
        except OSError as cleanup_err:
            logger.debug("Could not remove temp thumb %s: %s", tmp, cleanup_err)
        return False
|
|
|
|
|
|
def _load_region_map_merged(region_map_path: Path, thumb_root: Path) -> dict:
|
|
"""
|
|
원본 region_map.json + RW 오버라이드 파일 병합.
|
|
원본은 RO 마운트이므로 신규 폴더는 thumb_root/region_map_extra.json에 저장.
|
|
"""
|
|
try:
|
|
with open(region_map_path, "r", encoding="utf-8") as f:
|
|
region_map = json.load(f)
|
|
except (FileNotFoundError, json.JSONDecodeError) as e:
|
|
logger.error("Failed to load region_map: %s", e)
|
|
raise
|
|
|
|
# 오버라이드 파일 병합
|
|
extra_path = thumb_root / "region_map_extra.json"
|
|
if extra_path.exists():
|
|
try:
|
|
with open(extra_path, "r", encoding="utf-8") as f:
|
|
extra = json.load(f)
|
|
for k, v in extra.items():
|
|
if k in region_map:
|
|
# 기존 지역에 앨범 추가 (중복 방지)
|
|
existing = region_map[k]
|
|
if isinstance(existing, list) and isinstance(v, list):
|
|
merged = list(dict.fromkeys(existing + v))
|
|
region_map[k] = merged
|
|
else:
|
|
region_map[k] = v
|
|
except (json.JSONDecodeError, OSError) as e:
|
|
logger.warning("Failed to load region_map_extra: %s", e)
|
|
|
|
return region_map
|
|
|
|
|
|
def _save_extra_albums(thumb_root: Path, new_folders: List[str]) -> None:
|
|
"""신규 발견 폴더를 RW 영역의 오버라이드 파일에 저장."""
|
|
extra_path = thumb_root / "region_map_extra.json"
|
|
extra = {}
|
|
if extra_path.exists():
|
|
try:
|
|
with open(extra_path, "r", encoding="utf-8") as f:
|
|
extra = json.load(f)
|
|
except (json.JSONDecodeError, OSError):
|
|
extra = {}
|
|
|
|
uncategorized = extra.get("미분류", [])
|
|
existing = set(uncategorized)
|
|
added = [f for f in new_folders if f not in existing]
|
|
if added:
|
|
uncategorized.extend(added)
|
|
extra["미분류"] = uncategorized
|
|
try:
|
|
with open(extra_path, "w", encoding="utf-8") as f:
|
|
json.dump(extra, f, ensure_ascii=False, indent=2)
|
|
logger.info("Saved %d new folders to region_map_extra.json → 미분류", len(added))
|
|
except OSError as e:
|
|
logger.warning("Cannot write region_map_extra: %s", e)
|
|
|
|
|
|
def _folder_has_images(folder: Path) -> bool:
    """Return True if ``folder`` directly contains at least one image file."""
    # The with-block closes the scandir iterator even though any() stops at
    # the first match and leaves it unexhausted.
    with os.scandir(folder) as entries:
        return any(
            f.is_file() and Path(f.name).suffix.lower() in IMAGE_EXT
            for f in entries
        )


def _discover_new_folders(travel_root: Path, known: Set[str]) -> List[str]:
    """List sub-folders of ``travel_root`` that hold images and are not in ``known``.

    Folders whose name starts with ``_`` are skipped. A sub-folder that
    cannot be read is logged and skipped; a failure to list ``travel_root``
    itself is logged and ends the search with whatever was found so far.
    """
    found = []
    try:
        with os.scandir(travel_root) as entries:
            for entry in entries:
                if not entry.is_dir() or entry.name.startswith("_"):
                    continue
                if entry.name in known:
                    continue
                # Only folders containing at least one image qualify.
                try:
                    has_images = _folder_has_images(travel_root / entry.name)
                except OSError as e:
                    logger.warning("Cannot scan folder %s: %s", entry.path, e)
                    continue
                if has_images:
                    found.append(entry.name)
                    logger.info("Discovered new album folder: %s", entry.name)
    except OSError as e:
        logger.warning("Failed to scan travel_root for new folders: %s", e)
    return found


def sync(
    travel_root: Path,
    thumb_root: Path,
    region_map_path: Path,
) -> Dict[str, Any]:
    """Scan album folders, synchronise the DB and generate missing thumbnails.

    Steps:
      1. Load the region map merged with its override file and collect all
         album names; add image-bearing sub-folders of ``travel_root`` that
         are not listed yet and persist them to the override file.
      2. Scan every album folder and pass the result to
         ``db.batch_sync_album``.
      3. Generate a thumbnail for every photo returned by
         ``db.get_photos_without_thumb`` and report the successful ones to
         ``db.batch_mark_thumbs_done``.

    Args:
        travel_root: Directory containing one sub-folder per album.
        thumb_root: Directory for thumbnails and the override file.
        region_map_path: Path of ``region_map.json``.

    Returns:
        {"added": int, "updated": int, "removed": int,
         "thumbs_generated": int, "discovered": int, "duration_sec": float}

    Raises:
        FileNotFoundError, json.JSONDecodeError: propagated from loading the
            region map.
    """
    start = time.time()

    # 1. region_map.json merged with the override file.
    region_map = _load_region_map_merged(region_map_path, thumb_root)

    all_albums: Set[str] = set()
    for v in region_map.values():
        if isinstance(v, list):
            all_albums.update(v)
        elif isinstance(v, dict) and isinstance(v.get("albums"), list):
            all_albums.update(v["albums"])

    # 1-b. Auto-discover folders on disk that the map does not list yet.
    new_folders = _discover_new_folders(travel_root, all_albums)
    discovered = len(new_folders)
    all_albums.update(new_folders)

    # Persist discovered folders to the read-write override file.
    if new_folders:
        _save_extra_albums(thumb_root, new_folders)

    # 2. Scan each album folder and batch-sync it into the DB.
    added = 0
    updated = 0
    removed = 0
    sorted_albums = sorted(all_albums)
    total_albums = len(sorted_albums)

    logger.info("Syncing %d albums...", total_albums)
    for i, album in enumerate(sorted_albums, 1):
        folder = travel_root / album
        items = _scan_folder(folder)
        existing_filenames = {item["filename"] for item in items}

        result = db.batch_sync_album(album, items, existing_filenames)
        added += result["added"]
        updated += result["updated"]
        removed += result["removed"]

        # Only albums whose file set changed get a per-album log line.
        if result["added"] or result["removed"]:
            logger.info(
                "[%d/%d] %s: +%d/-%d (%d files)",
                i, total_albums, album, result["added"], result["removed"], len(items),
            )

    logger.info("DB sync done: added=%d updated=%d removed=%d", added, updated, removed)

    # 3. Generate thumbnails for photos that do not have one yet.
    no_thumb = db.get_photos_without_thumb()
    thumbs_generated = 0
    thumb_done_batch = []

    if no_thumb:
        logger.info("Generating %d thumbnails...", len(no_thumb))

    for idx, photo in enumerate(no_thumb, 1):
        src = travel_root / photo["album"] / photo["filename"]
        dest = thumb_root / photo["album"] / photo["filename"]
        if _generate_thumb(src, dest):
            thumb_done_batch.append(photo)
            thumbs_generated += 1
        # Progress line every 100 photos processed (successful or not).
        if idx % 100 == 0:
            logger.info("Thumbnails: %d/%d done", idx, len(no_thumb))

    db.batch_mark_thumbs_done(thumb_done_batch)

    duration = round(time.time() - start, 2)
    logger.info(
        "Sync complete: added=%d updated=%d removed=%d thumbs=%d discovered=%d duration=%.2fs",
        added, updated, removed, thumbs_generated, discovered, duration,
    )

    return {
        "added": added,
        "updated": updated,
        "removed": removed,
        "thumbs_generated": thumbs_generated,
        "discovered": discovered,
        "duration_sec": duration,
    }
|