11 Commits

Author SHA1 Message Date
4224333219 refactor(agent-office/base): transition의 State 자동 로그 제거
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-28 02:47:41 +09:00
5613497367 feat(agent-office): /agents/{id}/logs 엔드포인트가 service /logs/recent 와 merge
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-28 02:46:44 +09:00
b25abea80a feat(agent-office/db): get_logs에서 State: 자동 로그 제외 + delete_old_logs(90일) 2026-05-28 02:45:10 +09:00
ed30790f22 feat(agent-office): fetch_service_logs 추가 (path_prefix 정규식 필터)
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-28 02:42:23 +09:00
1d723764b4 feat(agent-office): AGENT_CONTAINER_MAP 상수 추가 (Phase 1 lotto) 2026-05-28 02:40:18 +09:00
c0c4422c7c build(lotto): PYTHONPATH=_shared + json-file logging 추가 (Phase 1 PoC) 2026-05-28 02:38:47 +09:00
fe4d3912a5 feat(lotto): _shared/access_log install (Phase 1 PoC) 2026-05-28 02:37:19 +09:00
f461f05ac0 feat(_shared): access_log 공용 모듈 추가 (ring buffer + middleware + /logs/recent)
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-28 02:31:30 +09:00
dfd3b1bb17 docs(agent-office): docker logs 통합 타임라인 구현 계획
21개 task로 분해된 3-phase 실행 계획. Phase 1 (PoC, lotto 단일),
Phase 2 (4개 서비스 확장 + cleanup 스케줄러), Phase 3 (비즈니스 이벤트
보강). TDD 기반 task 구성, 모든 step 에 실제 코드/명령 포함.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-28 01:44:43 +09:00
809eec9b15 docs(agent-office): docker logs 통합 타임라인 설계 spec
agent-office LogTab에 각 서비스의 docker 로그(액세스 + 비즈니스 이벤트)를
통합 타임라인으로 노출하는 아키텍처를 정의. 5개 서비스 공용 _shared/access_log
모듈, ring buffer 기반 /logs/recent 엔드포인트, agent-office 측 merge 로직,
3단계 phase 분리 포함.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-28 01:23:21 +09:00
512ed59dcd fix(nginx): /api/tarot/ proxy_read/send_timeout 300s→600s
3-card Claude 해석 응답이 truncation reroll 발생 시 90초+ 걸려 DSM Reverse
Proxy 또는 docker nginx에서 504 가능성. 600s 안전 마진 (saju/music과 동일
수준).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-27 13:19:24 +09:00
17 changed files with 2453 additions and 9 deletions

1
_shared/__init__.py Normal file
View File

@@ -0,0 +1 @@
# empty

112
_shared/access_log.py Normal file
View File

@@ -0,0 +1,112 @@
"""각 lab 컨테이너에서 import 하는 공용 액세스/이벤트 로그 모듈.
사용법:
from _shared.access_log import install as install_access_log
install_access_log(app)
"""
from collections import deque
from datetime import datetime
from typing import Optional
import logging
import time
from fastapi import APIRouter, Request
from fastapi.applications import FastAPI
from starlette.middleware.base import BaseHTTPMiddleware
# 컨테이너당 최근 500개를 in-memory 로 유지. 재시작 시 휘발.
_BUFFER: deque = deque(maxlen=500)
EXCLUDED_PATHS = {
"/health", "/healthz", "/ping", "/favicon.ico",
"/docs", "/redoc", "/openapi.json", "/logs/recent",
}
EXCLUDED_PREFIXES = ("/static/",)
EXCLUDED_METHODS = {"OPTIONS", "HEAD"}
def _should_log(request: Request) -> bool:
if request.method in EXCLUDED_METHODS:
return False
path = request.url.path
if path in EXCLUDED_PATHS:
return False
if any(path.startswith(p) for p in EXCLUDED_PREFIXES):
return False
return True
class AccessLogMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request, call_next):
start = time.time()
response = await call_next(request)
if not _should_log(request):
return response
elapsed_ms = int((time.time() - start) * 1000)
status = response.status_code
if status < 400:
level = "info"
elif status < 500:
level = "warning"
else:
level = "error"
_BUFFER.append({
"ts": datetime.utcnow().isoformat() + "Z",
"level": level,
"source": "access",
"method": request.method,
"path": request.url.path,
"status": status,
"ms": elapsed_ms,
"message": f"{request.method} {request.url.path}{status} ({elapsed_ms}ms)",
})
return response
class BufferLogHandler(logging.Handler):
"""root logger 에 부착하면 모든 logger.info/warning/error 가 buffer 에 흐름."""
def emit(self, record: logging.LogRecord) -> None:
try:
_BUFFER.append({
"ts": datetime.utcfromtimestamp(record.created).isoformat() + "Z",
"level": record.levelname.lower(),
"source": "log",
"logger": record.name,
"message": record.getMessage(),
})
except Exception:
# buffer 에 못 넣는다고 서비스가 죽으면 안 됨
pass
router = APIRouter()
@router.get("/logs/recent")
def logs_recent(limit: int = 200, since: Optional[str] = None,
path_prefix: Optional[str] = None):
items = list(_BUFFER)
if since:
items = [x for x in items if x["ts"] > since]
if path_prefix:
items = [
x for x in items
if x["source"] == "log"
or x.get("path", "").startswith(path_prefix)
]
return {"logs": items[-limit:]}
def install(app: FastAPI, logger_root: str = "") -> None:
"""서비스 main.py 에서 호출하는 단일 설치 함수.
- AccessLogMiddleware 등록
- /logs/recent 라우터 등록
- root logger 에 BufferLogHandler 부착 (모든 child logger 자동 전파)
"""
app.add_middleware(AccessLogMiddleware)
app.include_router(router)
root = logging.getLogger(logger_root)
if not any(isinstance(h, BufferLogHandler) for h in root.handlers):
root.addHandler(BufferLogHandler())

View File

View File

@@ -0,0 +1,129 @@
import logging
import time
from fastapi import FastAPI
from fastapi.testclient import TestClient
from _shared.access_log import (
AccessLogMiddleware,
BufferLogHandler,
router as logs_router,
install,
_BUFFER,
)
def _reset_buffer():
_BUFFER.clear()
def test_access_middleware_records_request():
_reset_buffer()
app = FastAPI()
app.add_middleware(AccessLogMiddleware)
@app.get("/api/lotto/recommend")
def recommend():
return {"ok": True}
client = TestClient(app)
client.get("/api/lotto/recommend")
items = [x for x in _BUFFER if x["source"] == "access"]
assert len(items) == 1
assert items[0]["method"] == "GET"
assert items[0]["path"] == "/api/lotto/recommend"
assert items[0]["status"] == 200
assert items[0]["ms"] >= 0
def test_access_middleware_skips_health():
_reset_buffer()
app = FastAPI()
app.add_middleware(AccessLogMiddleware)
@app.get("/health")
def health():
return {"ok": True}
client = TestClient(app)
client.get("/health")
items = [x for x in _BUFFER if x["source"] == "access"]
assert items == []
def test_access_middleware_skips_options():
_reset_buffer()
app = FastAPI()
app.add_middleware(AccessLogMiddleware)
@app.get("/api/lotto/recommend")
def recommend():
return {"ok": True}
client = TestClient(app)
client.options("/api/lotto/recommend")
items = [x for x in _BUFFER if x["source"] == "access"]
assert items == []
def test_buffer_log_handler_captures_logger_info():
_reset_buffer()
root = logging.getLogger("")
handler = BufferLogHandler()
root.addHandler(handler)
try:
lg = logging.getLogger("lotto.test")
lg.setLevel(logging.INFO)
lg.info("뉴스 스크래핑 완료: 국내 12건")
finally:
root.removeHandler(handler)
items = [x for x in _BUFFER if x["source"] == "log"]
assert len(items) == 1
assert items[0]["message"] == "뉴스 스크래핑 완료: 국내 12건"
assert items[0]["level"] == "info"
assert items[0]["logger"] == "lotto.test"
def test_logs_recent_endpoint_returns_recent_items():
_reset_buffer()
app = FastAPI()
install(app)
@app.get("/api/lotto/recommend")
def recommend():
return {"ok": True}
client = TestClient(app)
client.get("/api/lotto/recommend")
client.get("/api/lotto/recommend")
client.get("/health") # 제외되어야 함
resp = client.get("/logs/recent")
assert resp.status_code == 200
logs = resp.json()["logs"]
access_items = [x for x in logs if x["source"] == "access"]
assert len(access_items) == 2
def test_logs_recent_with_since_filter():
_reset_buffer()
app = FastAPI()
install(app)
@app.get("/api/lotto/recommend")
def recommend():
return {"ok": True}
client = TestClient(app)
client.get("/api/lotto/recommend")
time.sleep(0.01)
cursor_resp = client.get("/logs/recent")
cursor_ts = cursor_resp.json()["logs"][-1]["ts"]
client.get("/api/lotto/recommend")
resp = client.get(f"/logs/recent?since={cursor_ts}")
items = [x for x in resp.json()["logs"] if x["source"] == "access"]
assert len(items) == 1

View File

@@ -1,8 +1,6 @@
import time import time
from typing import Optional from typing import Optional
from ..db import add_log
VALID_STATES = ("idle", "working", "waiting", "reporting") VALID_STATES = ("idle", "working", "waiting", "reporting")
class BaseAgent: class BaseAgent:
@@ -29,8 +27,6 @@ class BaseAgent:
if new_state == "idle": if new_state == "idle":
self._idle_since = time.time() self._idle_since = time.time()
add_log(self.agent_id, f"State: {old} -> {new_state} ({detail})")
if self._ws_manager: if self._ws_manager:
await self._ws_manager.send_agent_state(self.agent_id, new_state, detail, task_id) await self._ws_manager.send_agent_state(self.agent_id, new_state, detail, task_id)
if new_state == "working" and old != "working": if new_state == "working" and old != "working":

View File

@@ -38,3 +38,17 @@ LOTTO_DIGEST_HOUR = int(os.getenv("LOTTO_DIGEST_HOUR", "9"))
LOTTO_DIGEST_MIN = int(os.getenv("LOTTO_DIGEST_MIN", "25")) LOTTO_DIGEST_MIN = int(os.getenv("LOTTO_DIGEST_MIN", "25"))
LOTTO_THROTTLE_HOURS = int(os.getenv("LOTTO_THROTTLE_HOURS", "6")) LOTTO_THROTTLE_HOURS = int(os.getenv("LOTTO_THROTTLE_HOURS", "6"))
LOTTO_URGENT_DAILY_MAX = int(os.getenv("LOTTO_URGENT_DAILY_MAX", "3")) LOTTO_URGENT_DAILY_MAX = int(os.getenv("LOTTO_URGENT_DAILY_MAX", "3"))
import re as _re
# 에이전트 → (container_host, port, path_prefix_regex)
# path_prefix_regex: lotto 컨테이너에 personal/blog/todo 도 같이 있어
# /api/lotto 만 골라내기 위한 정규식. business log (source='log') 는 모두 통과.
AGENT_CONTAINER_MAP: dict[str, tuple[str, int, _re.Pattern]] = {
"lotto": ("lotto", 8000, _re.compile(r"^/api/lotto")),
# Phase 2 에서 추가:
# "stock": ("stock", 8000, _re.compile(r"^/api/(stock|trade|portfolio)")),
# "music": ("music-lab", 8000, _re.compile(r"^/api/music")),
# "insta": ("insta-lab", 8000, _re.compile(r"^/api/insta")),
# "realestate": ("realestate-lab", 8000, _re.compile(r"^/api/realestate")),
}

View File

@@ -321,7 +321,13 @@ def add_log(agent_id: str, message: str, level: str = "info", task_id: str = Non
def get_logs(agent_id: str, limit: int = 50) -> List[Dict[str, Any]]: def get_logs(agent_id: str, limit: int = 50) -> List[Dict[str, Any]]:
with _conn() as conn: with _conn() as conn:
rows = conn.execute( rows = conn.execute(
"SELECT * FROM agent_logs WHERE agent_id=? ORDER BY created_at DESC LIMIT ?", """
SELECT * FROM agent_logs
WHERE agent_id = ?
AND message NOT LIKE 'State: %'
ORDER BY created_at DESC
LIMIT ?
""",
(agent_id, limit), (agent_id, limit),
).fetchall() ).fetchall()
return [ return [
@@ -332,6 +338,7 @@ def get_logs(agent_id: str, limit: int = 50) -> List[Dict[str, Any]]:
"level": r["level"], "level": r["level"],
"message": r["message"], "message": r["message"],
"created_at": r["created_at"], "created_at": r["created_at"],
"source": "agent",
} }
for r in rows for r in rows
] ]
@@ -588,6 +595,20 @@ def get_activity_feed(limit: int = 50, offset: int = 0) -> dict:
return {"items": items, "total": total} return {"items": items, "total": total}
import datetime as _dt
def delete_old_logs(days: int = 90) -> int:
"""retention 정책: N일 이전 agent_logs 삭제. 매일 03:00 스케줄러가 호출."""
cutoff = (_dt.datetime.utcnow() - _dt.timedelta(days=days)).isoformat()
with _conn() as conn:
c = conn.execute(
"DELETE FROM agent_logs WHERE created_at < ?",
(cutoff,),
)
return c.rowcount
# ── youtube_research_jobs CRUD ──────────────────────────────────────────────── # ── youtube_research_jobs CRUD ────────────────────────────────────────────────
def add_youtube_research_job(countries: list) -> int: def add_youtube_research_job(countries: list) -> int:

View File

@@ -116,8 +116,18 @@ def agent_tasks(
return {"tasks": tasks_list, "items": tasks_list} return {"tasks": tasks_list, "items": tasks_list}
@app.get("/api/agent-office/agents/{agent_id}/logs") @app.get("/api/agent-office/agents/{agent_id}/logs")
def agent_logs(agent_id: str, limit: int = 50): async def agent_logs(agent_id: str, limit: int = 50):
return {"logs": get_logs(agent_id, limit)} from .service_proxy import fetch_service_logs
agent_items = get_logs(agent_id, limit=limit)
service_items = await fetch_service_logs(agent_id, limit=limit)
def _sort_key(x):
# agent_logs: created_at, service: ts
return x.get("ts") or x.get("created_at") or ""
merged = sorted(agent_items + service_items, key=_sort_key, reverse=True)
return {"logs": merged[:limit]}
@app.get("/api/agent-office/tasks/pending") @app.get("/api/agent-office/tasks/pending")
def pending_tasks(): def pending_tasks():

View File

@@ -1,8 +1,11 @@
import httpx import httpx
import logging
from typing import Any, Dict, List, Optional from typing import Any, Dict, List, Optional
from .config import STOCK_URL, MUSIC_LAB_URL, INSTA_LAB_URL, REALESTATE_LAB_URL from .config import STOCK_URL, MUSIC_LAB_URL, INSTA_LAB_URL, REALESTATE_LAB_URL
logger = logging.getLogger(__name__)
_client = httpx.AsyncClient(timeout=30.0) _client = httpx.AsyncClient(timeout=30.0)
async def fetch_stock_news(limit: int = 10, category: str = None) -> List[Dict[str, Any]]: async def fetch_stock_news(limit: int = 10, category: str = None) -> List[Dict[str, Any]]:
@@ -394,3 +397,38 @@ async def lotto_evolver_evaluate() -> Dict[str, Any]:
resp = await client.post(f"{LOTTO_BACKEND_URL}/api/lotto/evolver/evaluate-now") resp = await client.post(f"{LOTTO_BACKEND_URL}/api/lotto/evolver/evaluate-now")
resp.raise_for_status() resp.raise_for_status()
return resp.json() return resp.json()
from .config import AGENT_CONTAINER_MAP
async def fetch_service_logs(
agent_id: str,
since: Optional[str] = None,
limit: int = 200,
) -> List[Dict[str, Any]]:
"""해당 에이전트가 가리키는 컨테이너의 /logs/recent 를 호출해서
path_prefix 정규식으로 필터한 결과를 반환.
네트워크 실패 시 빈 리스트를 반환하고 warning 만 남김 (LogTab 이 죽지 않게).
"""
mapping = AGENT_CONTAINER_MAP.get(agent_id)
if not mapping:
return []
host, port, path_re = mapping
url = f"http://{host}:{port}/logs/recent"
params: Dict[str, Any] = {"limit": limit}
if since:
params["since"] = since
try:
async with httpx.AsyncClient(timeout=3.0) as client:
resp = await client.get(url, params=params)
data = resp.json().get("logs", [])
except Exception as e:
logger.warning("fetch_service_logs(%s) 실패: %s", agent_id, e)
return []
return [
x for x in data
if x.get("source") == "log"
or path_re.match(x.get("path", "") or "")
]

View File

@@ -93,6 +93,41 @@ def test_telegram_state():
print(" [PASS] test_telegram_state") print(" [PASS] test_telegram_state")
def test_get_logs_excludes_state_messages():
init_db()
add_log("stock", "State: idle -> working (큐레이션 시작)")
add_log("stock", "뉴스 12건 스크랩 완료")
add_log("stock", "State: working -> idle ()")
logs = get_logs("stock", limit=10)
messages = [x["message"] for x in logs]
assert "뉴스 12건 스크랩 완료" in messages
assert not any(m.startswith("State: ") for m in messages)
def test_delete_old_logs_removes_beyond_retention():
import datetime as _dt
from app.db import delete_old_logs, _conn
init_db()
add_log("stock", "오래된 로그")
# 강제로 200일 전으로 옮김
cutoff = (_dt.datetime.utcnow() - _dt.timedelta(days=200)).isoformat()
with _conn() as conn:
conn.execute(
"UPDATE agent_logs SET created_at = ? WHERE message = '오래된 로그'",
(cutoff,),
)
add_log("stock", "최근 로그")
deleted = delete_old_logs(days=90)
assert deleted >= 1
msgs = [x["message"] for x in get_logs("stock", limit=20)]
assert "최근 로그" in msgs
assert "오래된 로그" not in msgs
if __name__ == "__main__": if __name__ == "__main__":
test_init_and_seed() test_init_and_seed()
test_agent_config_update() test_agent_config_update()

View File

@@ -0,0 +1,47 @@
import pytest
import respx
import httpx
from fastapi.testclient import TestClient
from app.main import app
from app.db import add_log, _conn
@pytest.fixture(autouse=True)
def _clean_logs():
with _conn() as conn:
conn.execute("DELETE FROM agent_logs WHERE agent_id = 'lotto'")
yield
@respx.mock
def test_agent_logs_endpoint_merges_db_and_service_logs():
add_log("lotto", "큐레이션 완료: #1234 conf=0.78")
respx.get("http://lotto:8000/logs/recent").mock(
return_value=httpx.Response(200, json={
"logs": [
{"ts": "2026-05-28T10:00:00Z", "source": "access",
"method": "GET", "path": "/api/lotto/latest",
"status": 200, "ms": 8,
"message": "GET /api/lotto/latest → 200 (8ms)"},
{"ts": "2026-05-28T10:00:02Z", "source": "log",
"logger": "lotto", "level": "info",
"message": "성과 통계 캐시 갱신"},
]
})
)
client = TestClient(app)
resp = client.get("/api/agent-office/agents/lotto/logs?limit=20")
assert resp.status_code == 200
logs = resp.json()["logs"]
sources = {x["source"] for x in logs}
assert "agent" in sources
assert "access" in sources
assert "log" in sources
messages = [x["message"] for x in logs]
assert any("큐레이션 완료" in m for m in messages)
assert any("성과 통계 캐시 갱신" in m for m in messages)
assert any("/api/lotto/latest" in m for m in messages)

View File

@@ -0,0 +1,53 @@
import pytest
import respx
import httpx
from app.service_proxy import fetch_service_logs
@pytest.mark.asyncio
@respx.mock
async def test_fetch_service_logs_filters_by_path_prefix():
# lotto 컨테이너 응답: lotto + personal 섞임
respx.get("http://lotto:8000/logs/recent").mock(
return_value=httpx.Response(200, json={
"logs": [
{"ts": "2026-05-28T10:00:00Z", "source": "access",
"method": "GET", "path": "/api/lotto/recommend",
"status": 200, "ms": 12,
"message": "GET /api/lotto/recommend → 200 (12ms)"},
{"ts": "2026-05-28T10:00:01Z", "source": "access",
"method": "GET", "path": "/api/blog/posts",
"status": 200, "ms": 5,
"message": "GET /api/blog/posts → 200 (5ms)"},
{"ts": "2026-05-28T10:00:02Z", "source": "log",
"logger": "lotto", "level": "info",
"message": "성과 통계 캐시 갱신"},
]
})
)
result = await fetch_service_logs("lotto", limit=50)
# lotto path 와 모든 log 이벤트만 통과
paths = [x.get("path") for x in result]
assert "/api/lotto/recommend" in paths
assert "/api/blog/posts" not in paths
# 비즈니스 로그도 포함
assert any(x["source"] == "log" and x["message"] == "성과 통계 캐시 갱신"
for x in result)
@pytest.mark.asyncio
async def test_fetch_service_logs_unknown_agent_returns_empty():
result = await fetch_service_logs("nonexistent", limit=50)
assert result == []
@pytest.mark.asyncio
@respx.mock
async def test_fetch_service_logs_handles_connection_error():
respx.get("http://lotto:8000/logs/recent").mock(
side_effect=httpx.ConnectError("connection refused")
)
result = await fetch_service_logs("lotto", limit=50)
assert result == []

View File

@@ -14,8 +14,15 @@ services:
- TZ=${TZ:-Asia/Seoul} - TZ=${TZ:-Asia/Seoul}
- LOTTO_ALL_URL=${LOTTO_ALL_URL:-https://smok95.github.io/lotto/results/all.json} - LOTTO_ALL_URL=${LOTTO_ALL_URL:-https://smok95.github.io/lotto/results/all.json}
- LOTTO_LATEST_URL=${LOTTO_LATEST_URL:-https://smok95.github.io/lotto/results/latest.json} - LOTTO_LATEST_URL=${LOTTO_LATEST_URL:-https://smok95.github.io/lotto/results/latest.json}
- PYTHONPATH=/app:/shared
volumes: volumes:
- ${RUNTIME_PATH}/data:/app/data - ${RUNTIME_PATH}/data:/app/data
- ./_shared:/shared:ro
logging:
driver: "json-file"
options:
max-size: "10m"
max-file: "3"
healthcheck: healthcheck:
test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"] test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"]
interval: 60s interval: 60s

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,362 @@
# Agent Office — Docker 로그 기반 통합 타임라인 설계
> 작성일: 2026-05-28
> 대상: web-backend (5개 lab + agent-office) + web-ui (LogTab)
## 배경
`/agent-office` 의 각 에이전트 상세 패널에 노출되는 **로그 탭** 이 현재는 의미가 빈약하다.
- 노출 소스는 `agent-office` 의 자체 SQLite `agent_logs` 테이블 한 곳뿐.
- `base.py BaseAgent.transition()` 가 매번 `State: idle -> working ({detail})` 형식 자동 로그를 기록 — 사용자가 실제로 무슨 일이 일어났는지 파악하기 어려운 노이즈가 다수.
- 각 에이전트가 실제로 호출하는 외부 서비스 컨테이너 (lotto / stock / music-lab / insta-lab / realestate-lab) 의 docker stdout 은 LogTab 에 한 줄도 흐르지 않는다.
따라서 LogTab 에서는 “이 에이전트가 어떤 API 를 불러서 어떤 응답을 받았는지” “외부 서비스에서 어떤 비즈니스 이벤트가 발생했는지” 가 보이지 않는다.
## 목표
1. 각 에이전트 LogTab 에 **해당 서비스 컨테이너의 의미 있는 docker 로그** 를 흘려보낸다.
2. healthcheck / static / OPTIONS 같은 노이즈 로그는 **서버 측에서 미리 차단** 한다.
3. API 호출 한 줄 (`POST /api/lotto/recommend → 200 142ms`) 과 비즈니스 이벤트 (`수집 완료: new=12, total=340`) 양쪽 모두 표시한다.
4. 에이전트 내부 동작 로그 (`agent_logs` DB) 와 서비스 로그를 **한 화면에 시간순으로 통합** 한다.
5. `State: idle -> working` 형식 자동 transition 로그는 제거한다.
## 비목표
- 실시간 WebSocket push (지금은 5초 폴링이면 충분).
- 컨테이너 외부 (NAS 호스트, Windows AI 서버) 로그 수집.
- 로그 검색 / 필터 UI (당장은 단순 시간순 표시).
- 다른 lab (image-lab / tarot-lab / saju-lab / packs-lab / video-lab) 은 1차 범위에서 제외 — 5개 활성 에이전트가 가리키는 5개 컨테이너만 다룬다.
## 결정사항 요약
| 항목 | 결정 |
|---|---|
| 수집 방식 | 각 서비스가 `/logs/recent` 엔드포인트 노출 + agent-office 가 polling |
| 표시 방식 | 통합 타임라인 (agent 로그 + service 로그 시간순 merge) |
| 로그 범위 | 액세스 로그 (healthcheck 제외) + 비즈니스 이벤트 (logger.info/warning/error) |
| ring buffer 크기 | 컨테이너당 500개, in-memory deque |
| docker logs retention | `max-size 10m × max-file 3` = 서비스당 30MB |
| agent_logs DB retention | **90일** (매일 03:00 cleanup) |
| state 자동 로그 | 제거 (`base.py BaseAgent.transition()``add_log("State: ...")`) |
| 자동 수집 메커니즘 | Python `logging.Handler` 를 BufferLogHandler 로 등록 — 기존 logger.info/warning/error 호출이 자동으로 ring buffer 에 흐름 |
## 아키텍처
```
┌─────────────────────────────────────────────────────────────┐
│ web-ui (LogTab) │
│ ─ GET /api/agent-office/agents/{id}/logs?limit=N │
│ ─ 5초 폴링 (기존 refreshTrigger 흐름 재활용) │
│ ─ source 뱃지 표시 (access | log | agent) │
└─────────────────────────────────────────────────────────────┘
│ 통합 타임라인 (시간순 merge)
┌─────────────────────────────────────────────────────────────┐
│ agent-office │
│ - get_merged_logs(agent_id, limit) = │
│ agent_logs (state 로그 제외) │
│ + service_proxy.fetch_logs(container, path_prefix) │
│ → ts 기준 정렬 → 최근 N개 │
│ - 매핑: AGENT_CONTAINER_MAP │
│ stock → ("stock", "/api/(stock|trade|portfolio)") │
│ music → ("music-lab", "/api/music") │
│ insta → ("insta-lab", "/api/insta") │
│ realestate → ("realestate-lab", "/api/realestate") │
│ lotto → ("lotto-backend", "/api/lotto") │
└─────────────────────────────────────────────────────────────┘
│ GET http://{container}:{port}/logs/recent
│ ?since=ISO&limit=N&path_prefix=...
│ (내부 docker 네트워크 only, nginx public 라우팅 X)
┌─────────────────────────────────────────────────────────────┐
│ 각 서비스 컨테이너 (5개) │
│ 공용 모듈 _shared/access_log.py: │
│ - LogBuffer: collections.deque(maxlen=500) │
│ - AccessLogMiddleware: 모든 요청 후 한 줄 기록 │
│ 제외: /health /healthz /ping /favicon /docs /redoc │
│ /openapi.json /logs/recent OPTIONS HEAD │
│ - BufferLogHandler: logger.info/warning/error 자동 캡처 │
│ - /logs/recent 라우터 │
└─────────────────────────────────────────────────────────────┘
```
## 공용 모듈 — `web-backend/_shared/access_log.py`
```python
from collections import deque
from datetime import datetime
from fastapi import APIRouter, Request
from starlette.middleware.base import BaseHTTPMiddleware
import logging
import time
_BUFFER = deque(maxlen=500)
EXCLUDED_PATHS = {"/health", "/healthz", "/ping", "/favicon.ico",
"/docs", "/redoc", "/openapi.json", "/logs/recent"}
EXCLUDED_PREFIXES = ("/static/",)
EXCLUDED_METHODS = {"OPTIONS", "HEAD"}
def _should_log(request: Request) -> bool:
if request.method in EXCLUDED_METHODS:
return False
path = request.url.path
if path in EXCLUDED_PATHS:
return False
if any(path.startswith(p) for p in EXCLUDED_PREFIXES):
return False
return True
class AccessLogMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request, call_next):
start = time.time()
response = await call_next(request)
if not _should_log(request):
return response
elapsed_ms = int((time.time() - start) * 1000)
status = response.status_code
_BUFFER.append({
"ts": datetime.utcnow().isoformat() + "Z",
"level": "info" if status < 400 else
"warning" if status < 500 else "error",
"source": "access",
"method": request.method,
"path": request.url.path,
"status": status,
"ms": elapsed_ms,
"message": f"{request.method} {request.url.path}{status} ({elapsed_ms}ms)",
})
return response
class BufferLogHandler(logging.Handler):
def emit(self, record: logging.LogRecord) -> None:
try:
_BUFFER.append({
"ts": datetime.utcfromtimestamp(record.created).isoformat() + "Z",
"level": record.levelname.lower(),
"source": "log",
"logger": record.name,
"message": record.getMessage(),
})
except Exception:
pass
router = APIRouter()
@router.get("/logs/recent")
def logs_recent(limit: int = 200, since: str | None = None,
path_prefix: str | None = None):
items = list(_BUFFER)
if since:
items = [x for x in items if x["ts"] > since]
if path_prefix:
items = [x for x in items
if x["source"] == "log" or x.get("path", "").startswith(path_prefix)]
return {"logs": items[-limit:]}
def install(app, logger_root: str = ""):
"""서비스 main.py 가 호출하는 단일 설치 함수."""
app.add_middleware(AccessLogMiddleware)
app.include_router(router)
logging.getLogger(logger_root).addHandler(BufferLogHandler())
```
### 각 서비스 main.py 적용
```python
from _shared.access_log import install as install_access_log
install_access_log(app)
```
## docker-compose 변경
5개 서비스 (`lotto-backend`, `stock`, `music-lab`, `insta-lab`, `realestate-lab`) 에 동일 패턴 추가:
```yaml
environment:
- PYTHONPATH=/app:/shared
volumes:
- ../_shared:/shared:ro
logging:
driver: "json-file"
options:
max-size: "10m"
max-file: "3"
```
`/logs/recent`**nginx default.conf 의 public location 블록에 추가하지 않는다**. 내부 docker 네트워크에서 `http://{container_name}:{port}/logs/recent` 로만 접근.
## agent-office 측 변경
### `app/constants.py`
```python
AGENT_CONTAINER_MAP = {
"stock": ("stock", 8000, r"^/api/(stock|trade|portfolio)"),
"music": ("music-lab", 8000, r"^/api/music"),
"insta": ("insta-lab", 8000, r"^/api/insta"),
"realestate": ("realestate-lab", 8000, r"^/api/realestate"),
"lotto": ("lotto-backend", 8000, r"^/api/lotto"),
}
```
### `app/service_proxy.py`
```python
async def fetch_service_logs(agent_id: str, since: str | None = None,
limit: int = 200) -> list[dict]:
mapping = AGENT_CONTAINER_MAP.get(agent_id)
if not mapping:
return []
host, port, path_re = mapping
url = f"http://{host}:{port}/logs/recent"
params = {"limit": limit}
if since:
params["since"] = since
try:
async with httpx.AsyncClient(timeout=3.0) as client:
resp = await client.get(url, params=params)
data = resp.json().get("logs", [])
except Exception as e:
logger.warning("fetch_service_logs(%s) 실패: %s", agent_id, e)
return []
# path_prefix 필터: access 로그만 path_re 검증
return [x for x in data if x["source"] == "log"
or re.match(path_re, x.get("path", ""))]
```
### `app/db.py`
```python
def get_logs(agent_id: str, limit: int = 50) -> list[dict]:
# 'State: ...' 자동 로그 제외 (사용자 요청)
rows = conn.execute("""
SELECT * FROM agent_logs
WHERE agent_id=?
AND message NOT LIKE 'State: %'
ORDER BY created_at DESC LIMIT ?
""", (agent_id, limit)).fetchall()
return [...]
def delete_old_logs(days: int = 90) -> int:
cutoff = (datetime.utcnow() - timedelta(days=days)).isoformat()
with _conn() as conn:
c = conn.execute("DELETE FROM agent_logs WHERE created_at < ?", (cutoff,))
return c.rowcount
```
### `app/main.py`
```python
@app.get("/api/agent-office/agents/{agent_id}/logs")
async def agent_logs(agent_id: str, limit: int = 50):
agent_items = get_logs(agent_id, limit=limit)
service_items = await fetch_service_logs(agent_id, limit=limit)
merged = sorted(agent_items + service_items,
key=lambda x: x.get("ts") or x.get("created_at"),
reverse=True)[:limit]
return {"logs": merged}
```
### `app/agents/base.py`
```python
async def transition(self, new_state, detail="", task_id=None):
# add_log(... "State: ...") 호출 삭제 — 사용자 요청
...
# ws_manager 알림은 유지
```
### `app/scheduler.py`
```python
scheduler.add_job(
lambda: delete_old_logs(days=90),
CronTrigger(hour=3, minute=0),
id="cleanup_old_logs",
)
```
## web-ui 측 변경
### `src/pages/agent-office/components/LogTab.jsx`
- log row schema 가 두 가지로 늘어남: agent_logs `{level, message, created_at}` vs service `{ts, level, source, method, path, status, ms, message}`.
- source 뱃지를 추가로 표시: `[ACCESS]` / `[LOG]` / `[AGENT]`.
- access 로그는 method + path + status + ms 를 보조 라인으로 표시.
색상 가이드:
- `source=access` 청록 (#5eead4)
- `source=log` 파랑 (#60a5fa)
- `level=warning` 노랑 (#fbbf24)
- `level=error` 빨강 (#ef4444)
- `source=agent` (agent_logs) 회색 (#9ca3af)
## Phase 분리
대규모 변경이라 단일 PR 위험. 3단계로 나눠 진행.
### Phase 1 — PoC (가장 우선)
1. `web-backend/_shared/access_log.py` 신설.
2. `web-backend/lotto/app/main.py` 한 곳에만 `install_access_log(app)` 추가.
3. `web-backend/docker-compose.yml``lotto-backend` 서비스에 PYTHONPATH + volume + logging 추가.
4. `agent-office``service_proxy.fetch_service_logs()` + `AGENT_CONTAINER_MAP` (lotto 만) + `get_logs(agent_id)` merge.
5. `LogTab.jsx` 가 source 뱃지를 표시하도록 확장.
6. base.py `State: ...` 자동 로그 제거 + `db.get_logs()` NOT LIKE 필터 추가.
검증: `/agent-office` 에서 lotto 에이전트 선택 → LogTab 에 `POST /api/lotto/...` 한 줄과 기존 logger.info 출력이 같이 보이는지.
### Phase 2 — 4개 서비스 확장
1. stock / music-lab / insta-lab / realestate-lab 의 `main.py``install_access_log(app)` 추가.
2. docker-compose 4개 서비스 동일 패턴 적용.
3. `AGENT_CONTAINER_MAP` 에 4개 매핑 추가.
4. `delete_old_logs` cleanup job 등록.
검증: 5개 에이전트 모두 LogTab 에서 의미 있는 로그 노출.
### Phase 3 — 비즈니스 이벤트 보강
디자인 4/5 의 "추가 권장" 표 항목들을 `logger.info(...)` 한 줄씩 추가. 약 1015줄.
- stock: Order 응답, AI Coach 호출, 스크리너 결과
- music-lab: 생성 시작/완료
- insta-lab: 키워드 추출 완료, 슬레이트 생성 완료, 발행 결과
- lotto-backend: AI 큐레이터 호출/응답, 점수 계산 완료
## 알려진 위험과 완화
| 위험 | 완화 |
|---|---|
| `/logs/recent` 가 외부로 노출되면 access pattern + 내부 동작 노출 | nginx public location 에 등재하지 않음 + 내부 docker 네트워크만 |
| 각 서비스의 logger 가 propagate 설정이 달라 BufferLogHandler 에 안 흐를 가능성 | `install()` 에서 `logging.getLogger("")` (root) 에 핸들러 등록 — 모든 child logger 가 자동 전파 |
| BufferLogHandler 의 `emit()` 가 다른 핸들러의 포맷팅에 영향 | `Handler.emit` 만 override, formatter 사용 안 함 |
| ring buffer 가 0.5초당 수십 건 트래픽으로 가득 차서 30초 분량밖에 안 남음 | 500개는 평소 트래픽 기준 1시간 이상 보관. 모니터링하다 부족하면 1000 으로 상향 |
| `lotto-backend` 컨테이너의 personal/blog/todo API 가 lotto 에이전트 로그에 섞임 | `AGENT_CONTAINER_MAP` 의 path_prefix 정규식으로 `/api/lotto` 만 매칭 — 다른 prefix 는 자연스럽게 필터 |
| docker-compose volume `../_shared:/shared:ro` 가 NAS 운영 환경에서 경로 차이로 깨질 가능성 | repo 의 상대경로 (`../_shared`) 는 NAS 의 `/volume1/docker/webpage/backend/_shared` 와 동일 구조로 git pull 됨. Gitea webhook 으로 push 되는 경로에 `_shared/` 디렉토리도 함께 포함됨을 deployer rsync 시 검증 |
## 변경 파일 요약
```
■ 신설
web-backend/_shared/__init__.py
web-backend/_shared/access_log.py
■ web-backend
lotto/app/main.py + install_access_log + 추가 logger.info 34개 (Phase 3)
stock/app/main.py + install_access_log + 추가 logger.info 3개 (Phase 3)
music-lab/app/main.py + install_access_log + 추가 logger.info 2개 (Phase 3)
insta-lab/app/main.py + install_access_log + 추가 logger.info 3개 (Phase 3)
realestate-lab/app/main.py + install_access_log (Phase 3 추가 없음)
docker-compose.yml 5개 서비스 PYTHONPATH/volume/logging 추가
■ web-backend/agent-office
app/service_proxy.py + fetch_service_logs(agent_id, ...)
app/main.py agent_logs 엔드포인트가 merge 사용
app/db.py + delete_old_logs + get_logs NOT LIKE 'State: %'
app/scheduler.py + 매일 03:00 cleanup job
app/agents/base.py transition() 의 add_log('State: ...') 제거
app/constants.py + AGENT_CONTAINER_MAP
■ web-ui
src/pages/agent-office/components/LogTab.jsx
source 뱃지 + access 로그 method/status/ms 표시
```

View File

@@ -5,6 +5,7 @@ from typing import Optional, List, Dict, Any, Tuple
from fastapi import FastAPI, HTTPException from fastapi import FastAPI, HTTPException
from pydantic import BaseModel from pydantic import BaseModel
from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.schedulers.background import BackgroundScheduler
from _shared.access_log import install as install_access_log
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(name)s] %(levelname)s %(message)s") logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(name)s] %(levelname)s %(message)s")
logger = logging.getLogger("lotto-backend") logger = logging.getLogger("lotto-backend")
@@ -49,6 +50,7 @@ from .routers import review as review_router
from .jobs.grade_weekly_review import run_for_latest as grade_run_for_latest from .jobs.grade_weekly_review import run_for_latest as grade_run_for_latest
app = FastAPI() app = FastAPI()
install_access_log(app)
app.include_router(curator_router.router) app.include_router(curator_router.router)
app.include_router(briefing_router.router) app.include_router(briefing_router.router)
app.include_router(review_router.router) app.include_router(review_router.router)

View File

@@ -370,6 +370,7 @@ server {
} }
# tarot-lab API (agent-office에서 분리) # tarot-lab API (agent-office에서 분리)
# Claude Sonnet 3-card 해석은 30~90초 + truncation reroll 시 추가. 600s 안전 마진.
location /api/tarot/ { location /api/tarot/ {
resolver 127.0.0.11 valid=10s; resolver 127.0.0.11 valid=10s;
set $tarot_backend tarot-lab:8000; set $tarot_backend tarot-lab:8000;
@@ -378,8 +379,8 @@ server {
proxy_set_header Host $host; proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_read_timeout 300s; proxy_read_timeout 600s;
proxy_send_timeout 300s; proxy_send_timeout 600s;
proxy_connect_timeout 60s; proxy_connect_timeout 60s;
proxy_pass http://$tarot_backend$request_uri; proxy_pass http://$tarot_backend$request_uri;
} }