11 Commits

Author SHA1 Message Date
4224333219 refactor(agent-office/base): transition의 State 자동 로그 제거
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-28 02:47:41 +09:00
5613497367 feat(agent-office): /agents/{id}/logs 엔드포인트가 service /logs/recent 와 merge
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-28 02:46:44 +09:00
b25abea80a feat(agent-office/db): get_logs에서 State: 자동 로그 제외 + delete_old_logs(90일) 2026-05-28 02:45:10 +09:00
ed30790f22 feat(agent-office): fetch_service_logs 추가 (path_prefix 정규식 필터)
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-28 02:42:23 +09:00
1d723764b4 feat(agent-office): AGENT_CONTAINER_MAP 상수 추가 (Phase 1 lotto) 2026-05-28 02:40:18 +09:00
c0c4422c7c build(lotto): PYTHONPATH=_shared + json-file logging 추가 (Phase 1 PoC) 2026-05-28 02:38:47 +09:00
fe4d3912a5 feat(lotto): _shared/access_log install (Phase 1 PoC) 2026-05-28 02:37:19 +09:00
f461f05ac0 feat(_shared): access_log 공용 모듈 추가 (ring buffer + middleware + /logs/recent)
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-28 02:31:30 +09:00
dfd3b1bb17 docs(agent-office): docker logs 통합 타임라인 구현 계획
21개 task로 분해된 3-phase 실행 계획. Phase 1 (PoC, lotto 단일),
Phase 2 (4개 서비스 확장 + cleanup 스케줄러), Phase 3 (비즈니스 이벤트
보강). TDD 기반 task 구성, 모든 step 에 실제 코드/명령 포함.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-28 01:44:43 +09:00
809eec9b15 docs(agent-office): docker logs 통합 타임라인 설계 spec
agent-office LogTab에 각 서비스의 docker 로그(액세스 + 비즈니스 이벤트)를
통합 타임라인으로 노출하는 아키텍처를 정의. 5개 서비스 공용 _shared/access_log
모듈, ring buffer 기반 /logs/recent 엔드포인트, agent-office 측 merge 로직,
3단계 phase 분리 포함.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-28 01:23:21 +09:00
512ed59dcd fix(nginx): /api/tarot/ proxy_read/send_timeout 300s→600s
3-card Claude 해석 응답이 truncation reroll 발생 시 90초+ 걸려 DSM Reverse
Proxy 또는 docker nginx에서 504 가능성. 600s 안전 마진 (saju/music과 동일
수준).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-27 13:19:24 +09:00
17 changed files with 2453 additions and 9 deletions

1
_shared/__init__.py Normal file
View File

@@ -0,0 +1 @@
# empty

112
_shared/access_log.py Normal file
View File

@@ -0,0 +1,112 @@
"""각 lab 컨테이너에서 import 하는 공용 액세스/이벤트 로그 모듈.
사용법:
from _shared.access_log import install as install_access_log
install_access_log(app)
"""
from collections import deque
from datetime import datetime
from typing import Optional
import logging
import time
from fastapi import APIRouter, Request
from fastapi.applications import FastAPI
from starlette.middleware.base import BaseHTTPMiddleware
# 컨테이너당 최근 500개를 in-memory 로 유지. 재시작 시 휘발.
_BUFFER: deque = deque(maxlen=500)
EXCLUDED_PATHS = {
"/health", "/healthz", "/ping", "/favicon.ico",
"/docs", "/redoc", "/openapi.json", "/logs/recent",
}
EXCLUDED_PREFIXES = ("/static/",)
EXCLUDED_METHODS = {"OPTIONS", "HEAD"}
def _should_log(request: Request) -> bool:
if request.method in EXCLUDED_METHODS:
return False
path = request.url.path
if path in EXCLUDED_PATHS:
return False
if any(path.startswith(p) for p in EXCLUDED_PREFIXES):
return False
return True
class AccessLogMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request, call_next):
start = time.time()
response = await call_next(request)
if not _should_log(request):
return response
elapsed_ms = int((time.time() - start) * 1000)
status = response.status_code
if status < 400:
level = "info"
elif status < 500:
level = "warning"
else:
level = "error"
_BUFFER.append({
"ts": datetime.utcnow().isoformat() + "Z",
"level": level,
"source": "access",
"method": request.method,
"path": request.url.path,
"status": status,
"ms": elapsed_ms,
"message": f"{request.method} {request.url.path}{status} ({elapsed_ms}ms)",
})
return response
class BufferLogHandler(logging.Handler):
"""root logger 에 부착하면 모든 logger.info/warning/error 가 buffer 에 흐름."""
def emit(self, record: logging.LogRecord) -> None:
try:
_BUFFER.append({
"ts": datetime.utcfromtimestamp(record.created).isoformat() + "Z",
"level": record.levelname.lower(),
"source": "log",
"logger": record.name,
"message": record.getMessage(),
})
except Exception:
# buffer 에 못 넣는다고 서비스가 죽으면 안 됨
pass
router = APIRouter()
@router.get("/logs/recent")
def logs_recent(limit: int = 200, since: Optional[str] = None,
path_prefix: Optional[str] = None):
items = list(_BUFFER)
if since:
items = [x for x in items if x["ts"] > since]
if path_prefix:
items = [
x for x in items
if x["source"] == "log"
or x.get("path", "").startswith(path_prefix)
]
return {"logs": items[-limit:]}
def install(app: FastAPI, logger_root: str = "") -> None:
"""서비스 main.py 에서 호출하는 단일 설치 함수.
- AccessLogMiddleware 등록
- /logs/recent 라우터 등록
- root logger 에 BufferLogHandler 부착 (모든 child logger 자동 전파)
"""
app.add_middleware(AccessLogMiddleware)
app.include_router(router)
root = logging.getLogger(logger_root)
if not any(isinstance(h, BufferLogHandler) for h in root.handlers):
root.addHandler(BufferLogHandler())

View File

View File

@@ -0,0 +1,129 @@
import logging
import time
from fastapi import FastAPI
from fastapi.testclient import TestClient
from _shared.access_log import (
AccessLogMiddleware,
BufferLogHandler,
router as logs_router,
install,
_BUFFER,
)
def _reset_buffer():
_BUFFER.clear()
def test_access_middleware_records_request():
_reset_buffer()
app = FastAPI()
app.add_middleware(AccessLogMiddleware)
@app.get("/api/lotto/recommend")
def recommend():
return {"ok": True}
client = TestClient(app)
client.get("/api/lotto/recommend")
items = [x for x in _BUFFER if x["source"] == "access"]
assert len(items) == 1
assert items[0]["method"] == "GET"
assert items[0]["path"] == "/api/lotto/recommend"
assert items[0]["status"] == 200
assert items[0]["ms"] >= 0
def test_access_middleware_skips_health():
_reset_buffer()
app = FastAPI()
app.add_middleware(AccessLogMiddleware)
@app.get("/health")
def health():
return {"ok": True}
client = TestClient(app)
client.get("/health")
items = [x for x in _BUFFER if x["source"] == "access"]
assert items == []
def test_access_middleware_skips_options():
_reset_buffer()
app = FastAPI()
app.add_middleware(AccessLogMiddleware)
@app.get("/api/lotto/recommend")
def recommend():
return {"ok": True}
client = TestClient(app)
client.options("/api/lotto/recommend")
items = [x for x in _BUFFER if x["source"] == "access"]
assert items == []
def test_buffer_log_handler_captures_logger_info():
_reset_buffer()
root = logging.getLogger("")
handler = BufferLogHandler()
root.addHandler(handler)
try:
lg = logging.getLogger("lotto.test")
lg.setLevel(logging.INFO)
lg.info("뉴스 스크래핑 완료: 국내 12건")
finally:
root.removeHandler(handler)
items = [x for x in _BUFFER if x["source"] == "log"]
assert len(items) == 1
assert items[0]["message"] == "뉴스 스크래핑 완료: 국내 12건"
assert items[0]["level"] == "info"
assert items[0]["logger"] == "lotto.test"
def test_logs_recent_endpoint_returns_recent_items():
_reset_buffer()
app = FastAPI()
install(app)
@app.get("/api/lotto/recommend")
def recommend():
return {"ok": True}
client = TestClient(app)
client.get("/api/lotto/recommend")
client.get("/api/lotto/recommend")
client.get("/health") # 제외되어야 함
resp = client.get("/logs/recent")
assert resp.status_code == 200
logs = resp.json()["logs"]
access_items = [x for x in logs if x["source"] == "access"]
assert len(access_items) == 2
def test_logs_recent_with_since_filter():
_reset_buffer()
app = FastAPI()
install(app)
@app.get("/api/lotto/recommend")
def recommend():
return {"ok": True}
client = TestClient(app)
client.get("/api/lotto/recommend")
time.sleep(0.01)
cursor_resp = client.get("/logs/recent")
cursor_ts = cursor_resp.json()["logs"][-1]["ts"]
client.get("/api/lotto/recommend")
resp = client.get(f"/logs/recent?since={cursor_ts}")
items = [x for x in resp.json()["logs"] if x["source"] == "access"]
assert len(items) == 1

View File

@@ -1,8 +1,6 @@
import time
from typing import Optional
from ..db import add_log
VALID_STATES = ("idle", "working", "waiting", "reporting")
class BaseAgent:
@@ -29,8 +27,6 @@ class BaseAgent:
if new_state == "idle":
self._idle_since = time.time()
add_log(self.agent_id, f"State: {old} -> {new_state} ({detail})")
if self._ws_manager:
await self._ws_manager.send_agent_state(self.agent_id, new_state, detail, task_id)
if new_state == "working" and old != "working":

View File

@@ -38,3 +38,17 @@ LOTTO_DIGEST_HOUR = int(os.getenv("LOTTO_DIGEST_HOUR", "9"))
LOTTO_DIGEST_MIN = int(os.getenv("LOTTO_DIGEST_MIN", "25"))
LOTTO_THROTTLE_HOURS = int(os.getenv("LOTTO_THROTTLE_HOURS", "6"))
LOTTO_URGENT_DAILY_MAX = int(os.getenv("LOTTO_URGENT_DAILY_MAX", "3"))
import re as _re
# 에이전트 → (container_host, port, path_prefix_regex)
# path_prefix_regex: lotto 컨테이너에 personal/blog/todo 도 같이 있어
# /api/lotto 만 골라내기 위한 정규식. business log (source='log') 는 모두 통과.
AGENT_CONTAINER_MAP: dict[str, tuple[str, int, _re.Pattern]] = {
"lotto": ("lotto", 8000, _re.compile(r"^/api/lotto")),
# Phase 2 에서 추가:
# "stock": ("stock", 8000, _re.compile(r"^/api/(stock|trade|portfolio)")),
# "music": ("music-lab", 8000, _re.compile(r"^/api/music")),
# "insta": ("insta-lab", 8000, _re.compile(r"^/api/insta")),
# "realestate": ("realestate-lab", 8000, _re.compile(r"^/api/realestate")),
}

View File

@@ -321,7 +321,13 @@ def add_log(agent_id: str, message: str, level: str = "info", task_id: str = Non
def get_logs(agent_id: str, limit: int = 50) -> List[Dict[str, Any]]:
with _conn() as conn:
rows = conn.execute(
"SELECT * FROM agent_logs WHERE agent_id=? ORDER BY created_at DESC LIMIT ?",
"""
SELECT * FROM agent_logs
WHERE agent_id = ?
AND message NOT LIKE 'State: %'
ORDER BY created_at DESC
LIMIT ?
""",
(agent_id, limit),
).fetchall()
return [
@@ -332,6 +338,7 @@ def get_logs(agent_id: str, limit: int = 50) -> List[Dict[str, Any]]:
"level": r["level"],
"message": r["message"],
"created_at": r["created_at"],
"source": "agent",
}
for r in rows
]
@@ -588,6 +595,20 @@ def get_activity_feed(limit: int = 50, offset: int = 0) -> dict:
return {"items": items, "total": total}
import datetime as _dt
def delete_old_logs(days: int = 90) -> int:
"""retention 정책: N일 이전 agent_logs 삭제. 매일 03:00 스케줄러가 호출."""
cutoff = (_dt.datetime.utcnow() - _dt.timedelta(days=days)).isoformat()
with _conn() as conn:
c = conn.execute(
"DELETE FROM agent_logs WHERE created_at < ?",
(cutoff,),
)
return c.rowcount
# ── youtube_research_jobs CRUD ────────────────────────────────────────────────
def add_youtube_research_job(countries: list) -> int:

View File

@@ -116,8 +116,18 @@ def agent_tasks(
return {"tasks": tasks_list, "items": tasks_list}
@app.get("/api/agent-office/agents/{agent_id}/logs")
def agent_logs(agent_id: str, limit: int = 50):
return {"logs": get_logs(agent_id, limit)}
async def agent_logs(agent_id: str, limit: int = 50):
from .service_proxy import fetch_service_logs
agent_items = get_logs(agent_id, limit=limit)
service_items = await fetch_service_logs(agent_id, limit=limit)
def _sort_key(x):
# agent_logs: created_at, service: ts
return x.get("ts") or x.get("created_at") or ""
merged = sorted(agent_items + service_items, key=_sort_key, reverse=True)
return {"logs": merged[:limit]}
@app.get("/api/agent-office/tasks/pending")
def pending_tasks():

View File

@@ -1,8 +1,11 @@
import httpx
import logging
from typing import Any, Dict, List, Optional
from .config import STOCK_URL, MUSIC_LAB_URL, INSTA_LAB_URL, REALESTATE_LAB_URL
logger = logging.getLogger(__name__)
_client = httpx.AsyncClient(timeout=30.0)
async def fetch_stock_news(limit: int = 10, category: str = None) -> List[Dict[str, Any]]:
@@ -394,3 +397,38 @@ async def lotto_evolver_evaluate() -> Dict[str, Any]:
resp = await client.post(f"{LOTTO_BACKEND_URL}/api/lotto/evolver/evaluate-now")
resp.raise_for_status()
return resp.json()
from .config import AGENT_CONTAINER_MAP
async def fetch_service_logs(
agent_id: str,
since: Optional[str] = None,
limit: int = 200,
) -> List[Dict[str, Any]]:
"""해당 에이전트가 가리키는 컨테이너의 /logs/recent 를 호출해서
path_prefix 정규식으로 필터한 결과를 반환.
네트워크 실패 시 빈 리스트를 반환하고 warning 만 남김 (LogTab 이 죽지 않게).
"""
mapping = AGENT_CONTAINER_MAP.get(agent_id)
if not mapping:
return []
host, port, path_re = mapping
url = f"http://{host}:{port}/logs/recent"
params: Dict[str, Any] = {"limit": limit}
if since:
params["since"] = since
try:
async with httpx.AsyncClient(timeout=3.0) as client:
resp = await client.get(url, params=params)
data = resp.json().get("logs", [])
except Exception as e:
logger.warning("fetch_service_logs(%s) 실패: %s", agent_id, e)
return []
return [
x for x in data
if x.get("source") == "log"
or path_re.match(x.get("path", "") or "")
]

View File

@@ -93,6 +93,41 @@ def test_telegram_state():
print(" [PASS] test_telegram_state")
def test_get_logs_excludes_state_messages():
init_db()
add_log("stock", "State: idle -> working (큐레이션 시작)")
add_log("stock", "뉴스 12건 스크랩 완료")
add_log("stock", "State: working -> idle ()")
logs = get_logs("stock", limit=10)
messages = [x["message"] for x in logs]
assert "뉴스 12건 스크랩 완료" in messages
assert not any(m.startswith("State: ") for m in messages)
def test_delete_old_logs_removes_beyond_retention():
import datetime as _dt
from app.db import delete_old_logs, _conn
init_db()
add_log("stock", "오래된 로그")
# 강제로 200일 전으로 옮김
cutoff = (_dt.datetime.utcnow() - _dt.timedelta(days=200)).isoformat()
with _conn() as conn:
conn.execute(
"UPDATE agent_logs SET created_at = ? WHERE message = '오래된 로그'",
(cutoff,),
)
add_log("stock", "최근 로그")
deleted = delete_old_logs(days=90)
assert deleted >= 1
msgs = [x["message"] for x in get_logs("stock", limit=20)]
assert "최근 로그" in msgs
assert "오래된 로그" not in msgs
if __name__ == "__main__":
test_init_and_seed()
test_agent_config_update()

View File

@@ -0,0 +1,47 @@
import pytest
import respx
import httpx
from fastapi.testclient import TestClient
from app.main import app
from app.db import add_log, _conn
@pytest.fixture(autouse=True)
def _clean_logs():
with _conn() as conn:
conn.execute("DELETE FROM agent_logs WHERE agent_id = 'lotto'")
yield
@respx.mock
def test_agent_logs_endpoint_merges_db_and_service_logs():
add_log("lotto", "큐레이션 완료: #1234 conf=0.78")
respx.get("http://lotto:8000/logs/recent").mock(
return_value=httpx.Response(200, json={
"logs": [
{"ts": "2026-05-28T10:00:00Z", "source": "access",
"method": "GET", "path": "/api/lotto/latest",
"status": 200, "ms": 8,
"message": "GET /api/lotto/latest → 200 (8ms)"},
{"ts": "2026-05-28T10:00:02Z", "source": "log",
"logger": "lotto", "level": "info",
"message": "성과 통계 캐시 갱신"},
]
})
)
client = TestClient(app)
resp = client.get("/api/agent-office/agents/lotto/logs?limit=20")
assert resp.status_code == 200
logs = resp.json()["logs"]
sources = {x["source"] for x in logs}
assert "agent" in sources
assert "access" in sources
assert "log" in sources
messages = [x["message"] for x in logs]
assert any("큐레이션 완료" in m for m in messages)
assert any("성과 통계 캐시 갱신" in m for m in messages)
assert any("/api/lotto/latest" in m for m in messages)

View File

@@ -0,0 +1,53 @@
import pytest
import respx
import httpx
from app.service_proxy import fetch_service_logs
@pytest.mark.asyncio
@respx.mock
async def test_fetch_service_logs_filters_by_path_prefix():
# lotto 컨테이너 응답: lotto + personal 섞임
respx.get("http://lotto:8000/logs/recent").mock(
return_value=httpx.Response(200, json={
"logs": [
{"ts": "2026-05-28T10:00:00Z", "source": "access",
"method": "GET", "path": "/api/lotto/recommend",
"status": 200, "ms": 12,
"message": "GET /api/lotto/recommend → 200 (12ms)"},
{"ts": "2026-05-28T10:00:01Z", "source": "access",
"method": "GET", "path": "/api/blog/posts",
"status": 200, "ms": 5,
"message": "GET /api/blog/posts → 200 (5ms)"},
{"ts": "2026-05-28T10:00:02Z", "source": "log",
"logger": "lotto", "level": "info",
"message": "성과 통계 캐시 갱신"},
]
})
)
result = await fetch_service_logs("lotto", limit=50)
# lotto path 와 모든 log 이벤트만 통과
paths = [x.get("path") for x in result]
assert "/api/lotto/recommend" in paths
assert "/api/blog/posts" not in paths
# 비즈니스 로그도 포함
assert any(x["source"] == "log" and x["message"] == "성과 통계 캐시 갱신"
for x in result)
@pytest.mark.asyncio
async def test_fetch_service_logs_unknown_agent_returns_empty():
result = await fetch_service_logs("nonexistent", limit=50)
assert result == []
@pytest.mark.asyncio
@respx.mock
async def test_fetch_service_logs_handles_connection_error():
respx.get("http://lotto:8000/logs/recent").mock(
side_effect=httpx.ConnectError("connection refused")
)
result = await fetch_service_logs("lotto", limit=50)
assert result == []

View File

@@ -14,8 +14,15 @@ services:
- TZ=${TZ:-Asia/Seoul}
- LOTTO_ALL_URL=${LOTTO_ALL_URL:-https://smok95.github.io/lotto/results/all.json}
- LOTTO_LATEST_URL=${LOTTO_LATEST_URL:-https://smok95.github.io/lotto/results/latest.json}
- PYTHONPATH=/app:/shared
volumes:
- ${RUNTIME_PATH}/data:/app/data
- ./_shared:/shared:ro
logging:
driver: "json-file"
options:
max-size: "10m"
max-file: "3"
healthcheck:
test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"]
interval: 60s

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,362 @@
# Agent Office — Docker 로그 기반 통합 타임라인 설계
> 작성일: 2026-05-28
> 대상: web-backend (5개 lab + agent-office) + web-ui (LogTab)
## 배경
`/agent-office` 의 각 에이전트 상세 패널에 노출되는 **로그 탭** 이 현재는 의미가 빈약하다.
- 노출 소스는 `agent-office` 의 자체 SQLite `agent_logs` 테이블 한 곳뿐.
- `base.py BaseAgent.transition()` 가 매번 `State: idle -> working ({detail})` 형식 자동 로그를 기록 — 사용자가 실제로 무슨 일이 일어났는지 파악하기 어려운 노이즈가 다수.
- 각 에이전트가 실제로 호출하는 외부 서비스 컨테이너 (lotto / stock / music-lab / insta-lab / realestate-lab) 의 docker stdout 은 LogTab 에 한 줄도 흐르지 않는다.
따라서 LogTab 에서는 “이 에이전트가 어떤 API 를 불러서 어떤 응답을 받았는지” “외부 서비스에서 어떤 비즈니스 이벤트가 발생했는지” 가 보이지 않는다.
## 목표
1. 각 에이전트 LogTab 에 **해당 서비스 컨테이너의 의미 있는 docker 로그** 를 흘려보낸다.
2. healthcheck / static / OPTIONS 같은 노이즈 로그는 **서버 측에서 미리 차단** 한다.
3. API 호출 한 줄 (`POST /api/lotto/recommend → 200 142ms`) 과 비즈니스 이벤트 (`수집 완료: new=12, total=340`) 양쪽 모두 표시한다.
4. 에이전트 내부 동작 로그 (`agent_logs` DB) 와 서비스 로그를 **한 화면에 시간순으로 통합** 한다.
5. `State: idle -> working` 형식 자동 transition 로그는 제거한다.
## 비목표
- 실시간 WebSocket push (지금은 5초 폴링이면 충분).
- 컨테이너 외부 (NAS 호스트, Windows AI 서버) 로그 수집.
- 로그 검색 / 필터 UI (당장은 단순 시간순 표시).
- 다른 lab (image-lab / tarot-lab / saju-lab / packs-lab / video-lab) 은 1차 범위에서 제외 — 5개 활성 에이전트가 가리키는 5개 컨테이너만 다룬다.
## 결정사항 요약
| 항목 | 결정 |
|---|---|
| 수집 방식 | 각 서비스가 `/logs/recent` 엔드포인트 노출 + agent-office 가 polling |
| 표시 방식 | 통합 타임라인 (agent 로그 + service 로그 시간순 merge) |
| 로그 범위 | 액세스 로그 (healthcheck 제외) + 비즈니스 이벤트 (logger.info/warning/error) |
| ring buffer 크기 | 컨테이너당 500개, in-memory deque |
| docker logs retention | `max-size 10m × max-file 3` = 서비스당 30MB |
| agent_logs DB retention | **90일** (매일 03:00 cleanup) |
| state 자동 로그 | 제거 (`base.py BaseAgent.transition()``add_log("State: ...")`) |
| 자동 수집 메커니즘 | Python `logging.Handler` 를 BufferLogHandler 로 등록 — 기존 logger.info/warning/error 호출이 자동으로 ring buffer 에 흐름 |
## 아키텍처
```
┌─────────────────────────────────────────────────────────────┐
│ web-ui (LogTab) │
│ ─ GET /api/agent-office/agents/{id}/logs?limit=N │
│ ─ 5초 폴링 (기존 refreshTrigger 흐름 재활용) │
│ ─ source 뱃지 표시 (access | log | agent) │
└─────────────────────────────────────────────────────────────┘
│ 통합 타임라인 (시간순 merge)
┌─────────────────────────────────────────────────────────────┐
│ agent-office │
│ - get_merged_logs(agent_id, limit) = │
│ agent_logs (state 로그 제외) │
│ + service_proxy.fetch_logs(container, path_prefix) │
│ → ts 기준 정렬 → 최근 N개 │
│ - 매핑: AGENT_CONTAINER_MAP │
│ stock → ("stock", "/api/(stock|trade|portfolio)") │
│ music → ("music-lab", "/api/music") │
│ insta → ("insta-lab", "/api/insta") │
│ realestate → ("realestate-lab", "/api/realestate") │
│ lotto → ("lotto-backend", "/api/lotto") │
└─────────────────────────────────────────────────────────────┘
│ GET http://{container}:{port}/logs/recent
│ ?since=ISO&limit=N&path_prefix=...
│ (내부 docker 네트워크 only, nginx public 라우팅 X)
┌─────────────────────────────────────────────────────────────┐
│ 각 서비스 컨테이너 (5개) │
│ 공용 모듈 _shared/access_log.py: │
│ - LogBuffer: collections.deque(maxlen=500) │
│ - AccessLogMiddleware: 모든 요청 후 한 줄 기록 │
│ 제외: /health /healthz /ping /favicon /docs /redoc │
│ /openapi.json /logs/recent OPTIONS HEAD │
│ - BufferLogHandler: logger.info/warning/error 자동 캡처 │
│ - /logs/recent 라우터 │
└─────────────────────────────────────────────────────────────┘
```
## 공용 모듈 — `web-backend/_shared/access_log.py`
```python
from collections import deque
from datetime import datetime
from fastapi import APIRouter, Request
from starlette.middleware.base import BaseHTTPMiddleware
import logging
import time
_BUFFER = deque(maxlen=500)
EXCLUDED_PATHS = {"/health", "/healthz", "/ping", "/favicon.ico",
"/docs", "/redoc", "/openapi.json", "/logs/recent"}
EXCLUDED_PREFIXES = ("/static/",)
EXCLUDED_METHODS = {"OPTIONS", "HEAD"}
def _should_log(request: Request) -> bool:
if request.method in EXCLUDED_METHODS:
return False
path = request.url.path
if path in EXCLUDED_PATHS:
return False
if any(path.startswith(p) for p in EXCLUDED_PREFIXES):
return False
return True
class AccessLogMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request, call_next):
start = time.time()
response = await call_next(request)
if not _should_log(request):
return response
elapsed_ms = int((time.time() - start) * 1000)
status = response.status_code
_BUFFER.append({
"ts": datetime.utcnow().isoformat() + "Z",
"level": "info" if status < 400 else
"warning" if status < 500 else "error",
"source": "access",
"method": request.method,
"path": request.url.path,
"status": status,
"ms": elapsed_ms,
"message": f"{request.method} {request.url.path}{status} ({elapsed_ms}ms)",
})
return response
class BufferLogHandler(logging.Handler):
def emit(self, record: logging.LogRecord) -> None:
try:
_BUFFER.append({
"ts": datetime.utcfromtimestamp(record.created).isoformat() + "Z",
"level": record.levelname.lower(),
"source": "log",
"logger": record.name,
"message": record.getMessage(),
})
except Exception:
pass
router = APIRouter()
@router.get("/logs/recent")
def logs_recent(limit: int = 200, since: str | None = None,
path_prefix: str | None = None):
items = list(_BUFFER)
if since:
items = [x for x in items if x["ts"] > since]
if path_prefix:
items = [x for x in items
if x["source"] == "log" or x.get("path", "").startswith(path_prefix)]
return {"logs": items[-limit:]}
def install(app, logger_root: str = ""):
"""서비스 main.py 가 호출하는 단일 설치 함수."""
app.add_middleware(AccessLogMiddleware)
app.include_router(router)
logging.getLogger(logger_root).addHandler(BufferLogHandler())
```
### 각 서비스 main.py 적용
```python
from _shared.access_log import install as install_access_log
install_access_log(app)
```
## docker-compose 변경
5개 서비스 (`lotto-backend`, `stock`, `music-lab`, `insta-lab`, `realestate-lab`) 에 동일 패턴 추가:
```yaml
environment:
- PYTHONPATH=/app:/shared
volumes:
- ../_shared:/shared:ro
logging:
driver: "json-file"
options:
max-size: "10m"
max-file: "3"
```
`/logs/recent`**nginx default.conf 의 public location 블록에 추가하지 않는다**. 내부 docker 네트워크에서 `http://{container_name}:{port}/logs/recent` 로만 접근.
## agent-office 측 변경
### `app/constants.py`
```python
AGENT_CONTAINER_MAP = {
"stock": ("stock", 8000, r"^/api/(stock|trade|portfolio)"),
"music": ("music-lab", 8000, r"^/api/music"),
"insta": ("insta-lab", 8000, r"^/api/insta"),
"realestate": ("realestate-lab", 8000, r"^/api/realestate"),
"lotto": ("lotto-backend", 8000, r"^/api/lotto"),
}
```
### `app/service_proxy.py`
```python
async def fetch_service_logs(agent_id: str, since: str | None = None,
limit: int = 200) -> list[dict]:
mapping = AGENT_CONTAINER_MAP.get(agent_id)
if not mapping:
return []
host, port, path_re = mapping
url = f"http://{host}:{port}/logs/recent"
params = {"limit": limit}
if since:
params["since"] = since
try:
async with httpx.AsyncClient(timeout=3.0) as client:
resp = await client.get(url, params=params)
data = resp.json().get("logs", [])
except Exception as e:
logger.warning("fetch_service_logs(%s) 실패: %s", agent_id, e)
return []
# path_prefix 필터: access 로그만 path_re 검증
return [x for x in data if x["source"] == "log"
or re.match(path_re, x.get("path", ""))]
```
### `app/db.py`
```python
def get_logs(agent_id: str, limit: int = 50) -> list[dict]:
# 'State: ...' 자동 로그 제외 (사용자 요청)
rows = conn.execute("""
SELECT * FROM agent_logs
WHERE agent_id=?
AND message NOT LIKE 'State: %'
ORDER BY created_at DESC LIMIT ?
""", (agent_id, limit)).fetchall()
return [...]
def delete_old_logs(days: int = 90) -> int:
cutoff = (datetime.utcnow() - timedelta(days=days)).isoformat()
with _conn() as conn:
c = conn.execute("DELETE FROM agent_logs WHERE created_at < ?", (cutoff,))
return c.rowcount
```
### `app/main.py`
```python
@app.get("/api/agent-office/agents/{agent_id}/logs")
async def agent_logs(agent_id: str, limit: int = 50):
agent_items = get_logs(agent_id, limit=limit)
service_items = await fetch_service_logs(agent_id, limit=limit)
merged = sorted(agent_items + service_items,
key=lambda x: x.get("ts") or x.get("created_at"),
reverse=True)[:limit]
return {"logs": merged}
```
### `app/agents/base.py`
```python
async def transition(self, new_state, detail="", task_id=None):
# add_log(... "State: ...") 호출 삭제 — 사용자 요청
...
# ws_manager 알림은 유지
```
### `app/scheduler.py`
```python
scheduler.add_job(
lambda: delete_old_logs(days=90),
CronTrigger(hour=3, minute=0),
id="cleanup_old_logs",
)
```
## web-ui 측 변경
### `src/pages/agent-office/components/LogTab.jsx`
- log row schema 가 두 가지로 늘어남: agent_logs `{level, message, created_at}` vs service `{ts, level, source, method, path, status, ms, message}`.
- source 뱃지를 추가로 표시: `[ACCESS]` / `[LOG]` / `[AGENT]`.
- access 로그는 method + path + status + ms 를 보조 라인으로 표시.
색상 가이드:
- `source=access` 청록 (#5eead4)
- `source=log` 파랑 (#60a5fa)
- `level=warning` 노랑 (#fbbf24)
- `level=error` 빨강 (#ef4444)
- `source=agent` (agent_logs) 회색 (#9ca3af)
## Phase 분리
대규모 변경이라 단일 PR 위험. 3단계로 나눠 진행.
### Phase 1 — PoC (가장 우선)
1. `web-backend/_shared/access_log.py` 신설.
2. `web-backend/lotto/app/main.py` 한 곳에만 `install_access_log(app)` 추가.
3. `web-backend/docker-compose.yml``lotto-backend` 서비스에 PYTHONPATH + volume + logging 추가.
4. `agent-office``service_proxy.fetch_service_logs()` + `AGENT_CONTAINER_MAP` (lotto 만) + `get_logs(agent_id)` merge.
5. `LogTab.jsx` 가 source 뱃지를 표시하도록 확장.
6. base.py `State: ...` 자동 로그 제거 + `db.get_logs()` NOT LIKE 필터 추가.
검증: `/agent-office` 에서 lotto 에이전트 선택 → LogTab 에 `POST /api/lotto/...` 한 줄과 기존 logger.info 출력이 같이 보이는지.
### Phase 2 — 4개 서비스 확장
1. stock / music-lab / insta-lab / realestate-lab 의 `main.py``install_access_log(app)` 추가.
2. docker-compose 4개 서비스 동일 패턴 적용.
3. `AGENT_CONTAINER_MAP` 에 4개 매핑 추가.
4. `delete_old_logs` cleanup job 등록.
검증: 5개 에이전트 모두 LogTab 에서 의미 있는 로그 노출.
### Phase 3 — 비즈니스 이벤트 보강
디자인 4/5 의 "추가 권장" 표 항목들을 `logger.info(...)` 한 줄씩 추가. 약 1015줄.
- stock: Order 응답, AI Coach 호출, 스크리너 결과
- music-lab: 생성 시작/완료
- insta-lab: 키워드 추출 완료, 슬레이트 생성 완료, 발행 결과
- lotto-backend: AI 큐레이터 호출/응답, 점수 계산 완료
## 알려진 위험과 완화
| 위험 | 완화 |
|---|---|
| `/logs/recent` 가 외부로 노출되면 access pattern + 내부 동작 노출 | nginx public location 에 등재하지 않음 + 내부 docker 네트워크만 |
| 각 서비스의 logger 가 propagate 설정이 달라 BufferLogHandler 에 안 흐를 가능성 | `install()` 에서 `logging.getLogger("")` (root) 에 핸들러 등록 — 모든 child logger 가 자동 전파 |
| BufferLogHandler 의 `emit()` 가 다른 핸들러의 포맷팅에 영향 | `Handler.emit` 만 override, formatter 사용 안 함 |
| ring buffer 가 0.5초당 수십 건 트래픽으로 가득 차서 30초 분량밖에 안 남음 | 500개는 평소 트래픽 기준 1시간 이상 보관. 모니터링하다 부족하면 1000 으로 상향 |
| `lotto-backend` 컨테이너의 personal/blog/todo API 가 lotto 에이전트 로그에 섞임 | `AGENT_CONTAINER_MAP` 의 path_prefix 정규식으로 `/api/lotto` 만 매칭 — 다른 prefix 는 자연스럽게 필터 |
| docker-compose volume `../_shared:/shared:ro` 가 NAS 운영 환경에서 경로 차이로 깨질 가능성 | repo 의 상대경로 (`../_shared`) 는 NAS 의 `/volume1/docker/webpage/backend/_shared` 와 동일 구조로 git pull 됨. Gitea webhook 으로 push 되는 경로에 `_shared/` 디렉토리도 함께 포함됨을 deployer rsync 시 검증 |
## 변경 파일 요약
```
■ 신설
web-backend/_shared/__init__.py
web-backend/_shared/access_log.py
■ web-backend
lotto/app/main.py + install_access_log + 추가 logger.info 34개 (Phase 3)
stock/app/main.py + install_access_log + 추가 logger.info 3개 (Phase 3)
music-lab/app/main.py + install_access_log + 추가 logger.info 2개 (Phase 3)
insta-lab/app/main.py + install_access_log + 추가 logger.info 3개 (Phase 3)
realestate-lab/app/main.py + install_access_log (Phase 3 추가 없음)
docker-compose.yml 5개 서비스 PYTHONPATH/volume/logging 추가
■ web-backend/agent-office
app/service_proxy.py + fetch_service_logs(agent_id, ...)
app/main.py agent_logs 엔드포인트가 merge 사용
app/db.py + delete_old_logs + get_logs NOT LIKE 'State: %'
app/scheduler.py + 매일 03:00 cleanup job
app/agents/base.py transition() 의 add_log('State: ...') 제거
app/constants.py + AGENT_CONTAINER_MAP
■ web-ui
src/pages/agent-office/components/LogTab.jsx
source 뱃지 + access 로그 method/status/ms 표시
```

View File

@@ -5,6 +5,7 @@ from typing import Optional, List, Dict, Any, Tuple
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from apscheduler.schedulers.background import BackgroundScheduler
from _shared.access_log import install as install_access_log
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(name)s] %(levelname)s %(message)s")
logger = logging.getLogger("lotto-backend")
@@ -49,6 +50,7 @@ from .routers import review as review_router
from .jobs.grade_weekly_review import run_for_latest as grade_run_for_latest
app = FastAPI()
install_access_log(app)
app.include_router(curator_router.router)
app.include_router(briefing_router.router)
app.include_router(review_router.router)

View File

@@ -370,6 +370,7 @@ server {
}
# tarot-lab API (agent-office에서 분리)
# Claude Sonnet 3-card 해석은 30~90초 + truncation reroll 시 추가. 600s 안전 마진.
location /api/tarot/ {
resolver 127.0.0.11 valid=10s;
set $tarot_backend tarot-lab:8000;
@@ -378,8 +379,8 @@ server {
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_read_timeout 300s;
proxy_send_timeout 300s;
proxy_read_timeout 600s;
proxy_send_timeout 600s;
proxy_connect_timeout 60s;
proxy_pass http://$tarot_backend$request_uri;
}