Files
web-page-backend/insta-lab/app/main.py
gahusb 20514193e8 perf(infra): NAS CPU 중기 2건 + 1건 보류 (CHECK_POINT 🟡)
#6 insta-lab Chromium Browser Pool — Playwright/Chromium 인스턴스를
모듈 레벨에서 보관하고 매 슬레이트마다 reuse. 카드 10장 렌더의
launch 비용 (~3초/회)이 사라짐. startup/shutdown lifecycle hook 추가.
crashed/disconnected 시 lazy 재초기화.

#8 realestate-lab 수집 병렬화 — collect_all과 delete_old_completed가
서로 다른 데이터 영역이라 ThreadPoolExecutor(2)로 병렬. asyncio.gather
대신 thread executor를 쓴 이유는 BackgroundScheduler+동기 함수 환경
에서 자연스럽고 추가 의존성 없기 때문. 매칭은 일관성 유지로 순차.

#7 stock async — 보류. 재진단 결과 stock은 BackgroundScheduler 사용
중이라 main loop 블로킹 없음. fetch 4회는 network I/O wait가
대부분이라 to_thread도 의미 없음. 진짜 효과를 보려면 AsyncIOScheduler
전환 + aiohttp 병렬이라 큰 리팩토링. 박재오 판단 대기.

CHECK_POINT.md 갱신.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-18 10:42:43 +09:00

314 lines
11 KiB
Python

"""FastAPI entrypoint for insta-lab."""
import asyncio
import json
import logging
import os
from typing import Optional
from fastapi import FastAPI, HTTPException, BackgroundTasks, Body, Query
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from pydantic import BaseModel
from .config import (
CORS_ALLOW_ORIGINS, NAVER_CLIENT_ID, ANTHROPIC_API_KEY,
INSTA_DATA_PATH, DB_PATH, DEFAULT_CATEGORY_SEEDS, KEYWORDS_PER_CATEGORY,
INSTA_DEFAULT_THEME,
)
from . import db, news_collector, keyword_extractor, card_writer, card_renderer, trend_collector
logger = logging.getLogger(__name__)
app = FastAPI()

# CORS_ALLOW_ORIGINS is a comma-separated string of origins. Blank entries
# (empty setting, trailing comma, doubled comma) are dropped so the middleware
# is never configured with "" as an allowed origin.
app.add_middleware(
    CORSMiddleware,
    allow_origins=[
        origin.strip()
        for origin in CORS_ALLOW_ORIGINS.split(",")
        if origin.strip()
    ],
    allow_credentials=False,
    allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS", "PATCH"],
    allow_headers=["Content-Type"],
)
@app.on_event("startup")
async def on_startup():
    """Create the data directory, initialize the DB and start the browser."""
    # exist_ok=True makes this a no-op when the directory is already there.
    os.makedirs(INSTA_DATA_PATH, exist_ok=True)
    db.init_db()
    # Initialize the Chromium browser pool (CHECK_POINT mid-term item 6).
    await card_renderer.init_browser()
@app.on_event("shutdown")
async def on_shutdown():
    """Shut down the browser held by ``card_renderer`` on application exit."""
    await card_renderer.shutdown_browser()
@app.get("/health")
def health():
    """Liveness probe: always answers ``{"ok": true}``."""
    return dict(ok=True)
@app.get("/api/insta/status")
def status():
    """Report whether NAVER_CLIENT_ID and ANTHROPIC_API_KEY are set."""
    # Only truthiness is exposed; the configured values never leave the server.
    naver_configured = bool(NAVER_CLIENT_ID)
    anthropic_configured = bool(ANTHROPIC_API_KEY)
    return {
        "ok": True,
        "naver_api": naver_configured,
        "anthropic_api": anthropic_configured,
    }
# ── News ─────────────────────────────────────────────────────────
class CollectRequest(BaseModel):
    # Request body for POST /api/insta/news/collect.
    # categories: categories to collect; when omitted or empty the endpoint
    # falls back to every key of DEFAULT_CATEGORY_SEEDS.
    categories: Optional[list[str]] = None
def _seeds_for(category: str) -> list[str]:
    """Return the seed keywords for ``category``.

    The ``category_seeds`` prompt template, when present, is parsed as JSON
    and, if it is an object containing ``category``, its entry takes
    precedence. When the template is missing, empty, unparsable, not a JSON
    object, or has no entry for ``category``, the ``DEFAULT_CATEGORY_SEEDS``
    entry is returned instead (an empty list for unknown categories).

    A fresh list is always returned, so callers may mutate it freely.
    """
    override = db.get_prompt_template("category_seeds")
    raw = override.get("template") if override else None
    if raw:
        try:
            mapping = json.loads(raw)
            # Anything other than a JSON object cannot map categories to
            # seeds; fall through to the defaults without raising.
            if isinstance(mapping, dict) and category in mapping:
                return list(mapping[category])
        except Exception:
            # Best effort: a broken override must not stop collection, but it
            # should show up in the logs instead of being swallowed silently.
            logger.warning(
                "ignoring invalid category_seeds template for %r",
                category,
                exc_info=True,
            )
    return list(DEFAULT_CATEGORY_SEEDS.get(category, []))
async def _bg_collect(task_id: str, categories: list[str]):
    """Background task: collect news for each category and track progress.

    Categories without seed keywords are skipped. The per-category counts
    returned by ``news_collector.collect_for_category`` are summed and stored
    both in the success message and as the task's ``result_id``. Any exception
    is logged and recorded on the task as ``failed``; nothing is re-raised.
    """
    try:
        db.update_task(task_id, "processing", 10, "수집 중")
        total = 0
        for cat in categories:
            seeds = _seeds_for(cat)
            if not seeds:
                continue
            # collect_for_category is a synchronous call (its return value is
            # used directly). Calling it inline from this coroutine would
            # stall the event loop for its whole duration, so it runs in a
            # worker thread instead.
            total += await asyncio.to_thread(
                news_collector.collect_for_category, cat, seeds
            )
        db.update_task(task_id, "succeeded", 100, f"{total}건 수집", result_id=total)
    except Exception as e:
        logger.exception("collect failed")
        db.update_task(task_id, "failed", 0, "", error=str(e))
@app.post("/api/insta/news/collect")
def collect_news(req: CollectRequest, bg: BackgroundTasks):
    """Queue a news collection run and return its task id and categories."""
    # A missing or empty category list means "every default category".
    if req.categories:
        cats = req.categories
    else:
        cats = list(DEFAULT_CATEGORY_SEEDS.keys())
    tid = db.create_task("news_collect", {"categories": cats})
    bg.add_task(_bg_collect, tid, cats)
    return {"task_id": tid, "categories": cats}
@app.get("/api/insta/news/articles")
def list_articles(
    category: Optional[str] = None,
    days: int = Query(7, ge=1, le=90),
):
    """List stored news articles, filtered by ``category`` and ``days``."""
    # Both filters are passed straight through to the DB layer.
    articles = db.list_news_articles(category=category, days=days)
    return {"items": articles}
# ── Keywords ─────────────────────────────────────────────────────
class ExtractRequest(BaseModel):
    # Request body for POST /api/insta/keywords/extract.
    # categories: categories to extract keywords for; when omitted or empty
    # the endpoint substitutes every key of DEFAULT_CATEGORY_SEEDS.
    categories: Optional[list[str]] = None
async def _bg_extract(task_id: str, categories: Optional[list[str]] = None):
    """Background task: extract trending keywords using category weights.

    Weights come from the stored preferences unless ``categories`` is given,
    in which case exactly those categories are used with equal weight. The
    overall keyword limit is ``KEYWORDS_PER_CATEGORY`` times the number of
    categories with a positive weight (at least one). Any exception is logged
    and recorded on the task as ``failed``; nothing is re-raised.
    """
    try:
        db.update_task(task_id, "processing", 10, "추출 중")
        prefs_rows = db.get_preferences()
        weights = {p["category"]: p["weight"] for p in prefs_rows}
        if categories:
            # Only when the caller names categories explicitly: override the
            # preferences with equal weights over that subset.
            weights = {c: 1.0 for c in categories}
        active = sum(1 for w in weights.values() if w > 0)
        total = KEYWORDS_PER_CATEGORY * max(1, active)
        # extract_with_weights is invoked without await in the original, so it
        # is presumably synchronous; running it in a worker thread keeps this
        # coroutine from stalling the event loop while it works.
        await asyncio.to_thread(
            keyword_extractor.extract_with_weights, weights, total_limit=total
        )
        db.update_task(task_id, "succeeded", 100, "완료", result_id=0)
    except Exception as e:
        logger.exception("extract failed")
        db.update_task(task_id, "failed", 0, "", error=str(e))
@app.post("/api/insta/keywords/extract")
def extract_keywords(req: ExtractRequest, bg: BackgroundTasks):
    """Queue a keyword extraction run and return its task id and categories."""
    # NOTE(review): `cats` is never empty here, so _bg_extract always takes
    # its explicit-category branch and the stored preference weights are not
    # used for requests made through this endpoint. Confirm that is intended.
    if req.categories:
        cats = req.categories
    else:
        cats = list(DEFAULT_CATEGORY_SEEDS.keys())
    tid = db.create_task("keyword_extract", {"categories": cats})
    bg.add_task(_bg_extract, tid, cats)
    return {"task_id": tid, "categories": cats}
@app.get("/api/insta/keywords")
def list_keywords(
    category: Optional[str] = None,
    used: Optional[bool] = None,
    source: Optional[str] = None,
):
    """List keywords, or external trends when ``source`` is given."""
    if not source:
        keywords = db.list_trending_keywords(category=category, used=used)
        return {"items": keywords}
    # With a source the trends table is queried over a fixed 30-day window;
    # the `used` filter is not applied on this path.
    trends = db.list_trends(source=source, category=category, days=30)
    return {"items": trends}
# ── Slates ───────────────────────────────────────────────────────
class SlateRequest(BaseModel):
    # Request body for POST /api/insta/slates.
    # keyword / category: passed to card_writer.write_slate.
    keyword: str
    category: str
    # keyword_id: when truthy, that keyword is marked as used once the slate
    # has been rendered.
    keyword_id: Optional[int] = None
async def _bg_create_slate(task_id: str, keyword: str, category: str, keyword_id: Optional[int]):
    """Background task: write the copy for a slate, render it, finish the task.

    Steps: generate the slate via ``card_writer.write_slate`` (which returns
    the slate id), render it with the default theme's ``card.html.j2``
    template, flag the slate as ``rendered`` and, when ``keyword_id`` is
    truthy, mark that keyword as used. The slate id is stored as the task's
    ``result_id``. Any exception is logged and recorded on the task as
    ``failed``; nothing is re-raised.
    """
    try:
        db.update_task(task_id, "processing", 30, "카피 생성 중")
        # write_slate is a synchronous call (its return value is used
        # directly). It runs in a worker thread so the event loop stays
        # responsive while the copy is being produced.
        sid = await asyncio.to_thread(
            card_writer.write_slate, keyword=keyword, category=category
        )
        db.update_task(task_id, "processing", 70, "카드 렌더 중")
        await card_renderer.render_slate(sid, template=f"{INSTA_DEFAULT_THEME}/card.html.j2")
        db.update_slate_status(sid, "rendered")
        if keyword_id:
            db.mark_keyword_used(keyword_id)
        db.update_task(task_id, "succeeded", 100, "완료", result_id=sid)
    except Exception as e:
        logger.exception("create slate failed")
        db.update_task(task_id, "failed", 0, "", error=str(e))
@app.post("/api/insta/slates")
def create_slate(req: SlateRequest, bg: BackgroundTasks):
    """Queue creation of a card slate and return the tracking task id."""
    # The full request body is stored with the task record.
    params = req.dict()
    tid = db.create_task("slate_create", params)
    bg.add_task(
        _bg_create_slate,
        tid,
        req.keyword,
        req.category,
        req.keyword_id,
    )
    return {"task_id": tid}
@app.get("/api/insta/slates")
def list_slates(limit: int = Query(50, ge=1, le=500)):
    """List card slates, returning at most ``limit`` entries (1-500)."""
    slates = db.list_card_slates(limit=limit)
    return {"items": slates}
@app.get("/api/insta/slates/{slate_id}")
def get_slate(slate_id: int):
    """Return one slate with its assets and JSON copy fields decoded."""
    slate = db.get_card_slate(slate_id)
    if not slate:
        raise HTTPException(404, "slate not found")
    slate["assets"] = db.list_card_assets(slate_id)
    # These fields may hold JSON text; decode them when possible and leave
    # the raw string in place when decoding fails.
    for field in ("cover_copy", "body_copies", "cta_copy", "hashtags"):
        raw = slate.get(field)
        if not isinstance(raw, str):
            continue
        try:
            slate[field] = json.loads(raw)
        except Exception:
            pass
    return slate
async def _bg_render(task_id: str, slate_id: int):
    """Background task: re-render an existing slate with the default theme.

    On success the slate is flagged ``rendered`` and the task stores the slate
    id as its ``result_id``. Any exception is logged and recorded on the task
    as ``failed``; nothing is re-raised.
    """
    try:
        db.update_task(task_id, "processing", 30, "재렌더 중")
        template_name = f"{INSTA_DEFAULT_THEME}/card.html.j2"
        await card_renderer.render_slate(slate_id, template=template_name)
        db.update_slate_status(slate_id, "rendered")
        db.update_task(task_id, "succeeded", 100, "완료", result_id=slate_id)
    except Exception as exc:
        logger.exception("render failed")
        db.update_task(task_id, "failed", 0, "", error=str(exc))
@app.post("/api/insta/slates/{slate_id}/render")
def render_slate_endpoint(slate_id: int, bg: BackgroundTasks):
    """Queue a re-render of an existing slate; 404 if the slate is unknown."""
    slate = db.get_card_slate(slate_id)
    if not slate:
        raise HTTPException(404, "slate not found")
    tid = db.create_task("slate_render", {"slate_id": slate_id})
    bg.add_task(_bg_render, tid, slate_id)
    return {"task_id": tid}
@app.get("/api/insta/slates/{slate_id}/assets/{page}")
def get_asset(slate_id: int, page: int):
    """Serve the PNG for one page of a slate.

    Responds 400 when ``page`` is outside 1..10 and 404 when the slate has no
    asset for that page or the recorded file no longer exists on disk.
    """
    if not (1 <= page <= 10):
        raise HTTPException(400, "page must be 1..10")
    assets = db.list_card_assets(slate_id)
    match = next((a for a in assets if a["page_index"] == page), None)
    # A DB row whose file has vanished is treated as missing. Without this
    # check FileResponse fails while sending and the client gets a 500.
    if not match or not os.path.isfile(match["file_path"]):
        raise HTTPException(404, "asset not found")
    return FileResponse(match["file_path"], media_type="image/png")
@app.delete("/api/insta/slates/{slate_id}")
def delete_slate(slate_id: int):
    """Delete a slate and its asset files; 404 if the slate is unknown.

    File removal is best effort: a failure never aborts the deletion of the
    slate itself.
    """
    if not db.get_card_slate(slate_id):
        raise HTTPException(404)
    for asset in db.list_card_assets(slate_id):
        path = asset["file_path"]
        try:
            os.unlink(path)
        except FileNotFoundError:
            # Already gone, which is the state we want.
            pass
        except OSError:
            # Still best effort, but leave a trace so leftover files
            # (permissions, busy file, ...) can be found later.
            logger.warning("could not remove asset file %s", path, exc_info=True)
    db.delete_card_slate(slate_id)
    return {"ok": True}
# ── Tasks ────────────────────────────────────────────────────────
@app.get("/api/insta/tasks/{task_id}")
def get_task_status(task_id: str):
    """Return the stored record for a background task; 404 if unknown."""
    task = db.get_task(task_id)
    if task:
        return task
    raise HTTPException(404)
# ── Prompt Templates ─────────────────────────────────────────────
class TemplateBody(BaseModel):
    # Request body for PUT /api/insta/templates/prompts/{name}.
    # template: the template text stored under the given name.
    template: str
    # description: stored alongside the template; empty when not supplied.
    description: str = ""
@app.get("/api/insta/templates/prompts/{name}")
def get_prompt(name: str):
    """Return the prompt template stored under ``name``; 404 if absent."""
    template = db.get_prompt_template(name)
    if template:
        return template
    raise HTTPException(404)
@app.put("/api/insta/templates/prompts/{name}")
def upsert_prompt(name: str, body: TemplateBody):
    """Create or replace a prompt template and return the stored record."""
    db.upsert_prompt_template(name, body.template, body.description)
    # Read the row back so the response reflects what the DB now holds.
    stored = db.get_prompt_template(name)
    return stored
# ── Trends ───────────────────────────────────────────────────────
class TrendsCollectRequest(BaseModel):
    # Request body for POST /api/insta/trends/collect.
    # categories: categories to collect trends for; when omitted or empty the
    # endpoint falls back to every key of DEFAULT_CATEGORY_SEEDS.
    categories: Optional[list[str]] = None
async def _bg_collect_trends(task_id: str, categories: list[str]):
    """Background task: collect external trends and record the outcome.

    ``trend_collector.collect_all`` returns a mapping of counts; its
    ``naver_popular`` and ``youtube_trending`` entries go into the success
    message and the sum of all values becomes the task's ``result_id``. Any
    exception is logged and recorded on the task as ``failed``; nothing is
    re-raised.
    """
    try:
        db.update_task(task_id, "processing", 10, "외부 트렌드 수집 중")
        # collect_all is a synchronous call (its result is indexed directly).
        # It runs in a worker thread so the event loop is not held up while
        # the collection is in progress.
        result = await asyncio.to_thread(trend_collector.collect_all, categories)
        msg = f"naver:{result['naver_popular']}, youtube:{result['youtube_trending']}"
        db.update_task(task_id, "succeeded", 100, msg, result_id=sum(result.values()))
    except Exception as e:
        logger.exception("trends collect failed")
        db.update_task(task_id, "failed", 0, "", error=str(e))
@app.post("/api/insta/trends/collect")
def collect_trends(req: TrendsCollectRequest, bg: BackgroundTasks):
    """Queue an external trend collection run and return its task id."""
    # A missing or empty category list means "every default category".
    if req.categories:
        cats = req.categories
    else:
        cats = list(DEFAULT_CATEGORY_SEEDS.keys())
    tid = db.create_task("trends_collect", {"categories": cats})
    bg.add_task(_bg_collect_trends, tid, cats)
    return {"task_id": tid, "categories": cats}
@app.get("/api/insta/trends")
def list_trends_endpoint(
    source: Optional[str] = None,
    category: Optional[str] = None,
    days: int = Query(1, ge=1, le=90),
):
    """List collected trends, filtered by source, category and ``days``."""
    # All three filters are passed straight through to the DB layer.
    trends = db.list_trends(source=source, category=category, days=days)
    return {"items": trends}
# ── Preferences ──────────────────────────────────────────────────
class PreferencesBody(BaseModel):
    # Request body for PUT /api/insta/preferences.
    # categories: mapping handed to db.upsert_preferences; presumably
    # category name -> weight (verify against the DB layer).
    categories: dict[str, float]
@app.get("/api/insta/preferences")
def get_preferences_endpoint():
    """Return the stored category preferences."""
    preferences = db.get_preferences()
    return {"categories": preferences}
@app.put("/api/insta/preferences")
def put_preferences_endpoint(body: PreferencesBody):
    """Store category preferences and return the resulting set."""
    db.upsert_preferences(body.categories)
    # Read back from the DB so the response reflects what was persisted.
    preferences = db.get_preferences()
    return {"categories": preferences}