feat(_shared): access_log 공용 모듈 추가 (ring buffer + middleware + /logs/recent)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-05-28 02:31:30 +09:00
parent dfd3b1bb17
commit f461f05ac0
4 changed files with 242 additions and 0 deletions

1
_shared/__init__.py Normal file
View File

@@ -0,0 +1 @@
# empty

112
_shared/access_log.py Normal file
View File

@@ -0,0 +1,112 @@
"""각 lab 컨테이너에서 import 하는 공용 액세스/이벤트 로그 모듈.
사용법:
from _shared.access_log import install as install_access_log
install_access_log(app)
"""
from collections import deque
from datetime import datetime
from typing import Optional
import logging
import time
from fastapi import APIRouter, Request
from fastapi.applications import FastAPI
from starlette.middleware.base import BaseHTTPMiddleware
# 컨테이너당 최근 500개를 in-memory 로 유지. 재시작 시 휘발.
_BUFFER: deque = deque(maxlen=500)
EXCLUDED_PATHS = {
"/health", "/healthz", "/ping", "/favicon.ico",
"/docs", "/redoc", "/openapi.json", "/logs/recent",
}
EXCLUDED_PREFIXES = ("/static/",)
EXCLUDED_METHODS = {"OPTIONS", "HEAD"}
def _should_log(request: Request) -> bool:
if request.method in EXCLUDED_METHODS:
return False
path = request.url.path
if path in EXCLUDED_PATHS:
return False
if any(path.startswith(p) for p in EXCLUDED_PREFIXES):
return False
return True
class AccessLogMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request, call_next):
start = time.time()
response = await call_next(request)
if not _should_log(request):
return response
elapsed_ms = int((time.time() - start) * 1000)
status = response.status_code
if status < 400:
level = "info"
elif status < 500:
level = "warning"
else:
level = "error"
_BUFFER.append({
"ts": datetime.utcnow().isoformat() + "Z",
"level": level,
"source": "access",
"method": request.method,
"path": request.url.path,
"status": status,
"ms": elapsed_ms,
"message": f"{request.method} {request.url.path}{status} ({elapsed_ms}ms)",
})
return response
class BufferLogHandler(logging.Handler):
"""root logger 에 부착하면 모든 logger.info/warning/error 가 buffer 에 흐름."""
def emit(self, record: logging.LogRecord) -> None:
try:
_BUFFER.append({
"ts": datetime.utcfromtimestamp(record.created).isoformat() + "Z",
"level": record.levelname.lower(),
"source": "log",
"logger": record.name,
"message": record.getMessage(),
})
except Exception:
# buffer 에 못 넣는다고 서비스가 죽으면 안 됨
pass
router = APIRouter()
@router.get("/logs/recent")
def logs_recent(limit: int = 200, since: Optional[str] = None,
path_prefix: Optional[str] = None):
items = list(_BUFFER)
if since:
items = [x for x in items if x["ts"] > since]
if path_prefix:
items = [
x for x in items
if x["source"] == "log"
or x.get("path", "").startswith(path_prefix)
]
return {"logs": items[-limit:]}
def install(app: FastAPI, logger_root: str = "") -> None:
"""서비스 main.py 에서 호출하는 단일 설치 함수.
- AccessLogMiddleware 등록
- /logs/recent 라우터 등록
- root logger 에 BufferLogHandler 부착 (모든 child logger 자동 전파)
"""
app.add_middleware(AccessLogMiddleware)
app.include_router(router)
root = logging.getLogger(logger_root)
if not any(isinstance(h, BufferLogHandler) for h in root.handlers):
root.addHandler(BufferLogHandler())

View File

View File

@@ -0,0 +1,129 @@
import logging
import time
from fastapi import FastAPI
from fastapi.testclient import TestClient
from _shared.access_log import (
AccessLogMiddleware,
BufferLogHandler,
router as logs_router,
install,
_BUFFER,
)
def _reset_buffer():
_BUFFER.clear()
def test_access_middleware_records_request():
_reset_buffer()
app = FastAPI()
app.add_middleware(AccessLogMiddleware)
@app.get("/api/lotto/recommend")
def recommend():
return {"ok": True}
client = TestClient(app)
client.get("/api/lotto/recommend")
items = [x for x in _BUFFER if x["source"] == "access"]
assert len(items) == 1
assert items[0]["method"] == "GET"
assert items[0]["path"] == "/api/lotto/recommend"
assert items[0]["status"] == 200
assert items[0]["ms"] >= 0
def test_access_middleware_skips_health():
_reset_buffer()
app = FastAPI()
app.add_middleware(AccessLogMiddleware)
@app.get("/health")
def health():
return {"ok": True}
client = TestClient(app)
client.get("/health")
items = [x for x in _BUFFER if x["source"] == "access"]
assert items == []
def test_access_middleware_skips_options():
_reset_buffer()
app = FastAPI()
app.add_middleware(AccessLogMiddleware)
@app.get("/api/lotto/recommend")
def recommend():
return {"ok": True}
client = TestClient(app)
client.options("/api/lotto/recommend")
items = [x for x in _BUFFER if x["source"] == "access"]
assert items == []
def test_buffer_log_handler_captures_logger_info():
_reset_buffer()
root = logging.getLogger("")
handler = BufferLogHandler()
root.addHandler(handler)
try:
lg = logging.getLogger("lotto.test")
lg.setLevel(logging.INFO)
lg.info("뉴스 스크래핑 완료: 국내 12건")
finally:
root.removeHandler(handler)
items = [x for x in _BUFFER if x["source"] == "log"]
assert len(items) == 1
assert items[0]["message"] == "뉴스 스크래핑 완료: 국내 12건"
assert items[0]["level"] == "info"
assert items[0]["logger"] == "lotto.test"
def test_logs_recent_endpoint_returns_recent_items():
_reset_buffer()
app = FastAPI()
install(app)
@app.get("/api/lotto/recommend")
def recommend():
return {"ok": True}
client = TestClient(app)
client.get("/api/lotto/recommend")
client.get("/api/lotto/recommend")
client.get("/health") # 제외되어야 함
resp = client.get("/logs/recent")
assert resp.status_code == 200
logs = resp.json()["logs"]
access_items = [x for x in logs if x["source"] == "access"]
assert len(access_items) == 2
def test_logs_recent_with_since_filter():
_reset_buffer()
app = FastAPI()
install(app)
@app.get("/api/lotto/recommend")
def recommend():
return {"ok": True}
client = TestClient(app)
client.get("/api/lotto/recommend")
time.sleep(0.01)
cursor_resp = client.get("/logs/recent")
cursor_ts = cursor_resp.json()["logs"][-1]["ts"]
client.get("/api/lotto/recommend")
resp = client.get(f"/logs/recent?since={cursor_ts}")
items = [x for x in resp.json()["logs"] if x["source"] == "access"]
assert len(items) == 1