Compare commits
11 Commits
4ee4a1ae7d
...
4224333219
| Author | SHA1 | Date | |
|---|---|---|---|
| 4224333219 | |||
| 5613497367 | |||
| b25abea80a | |||
| ed30790f22 | |||
| 1d723764b4 | |||
| c0c4422c7c | |||
| fe4d3912a5 | |||
| f461f05ac0 | |||
| dfd3b1bb17 | |||
| 809eec9b15 | |||
| 512ed59dcd |
1
_shared/__init__.py
Normal file
1
_shared/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
# empty
|
||||
112
_shared/access_log.py
Normal file
112
_shared/access_log.py
Normal file
@@ -0,0 +1,112 @@
|
||||
"""각 lab 컨테이너에서 import 하는 공용 액세스/이벤트 로그 모듈.
|
||||
|
||||
사용법:
|
||||
from _shared.access_log import install as install_access_log
|
||||
install_access_log(app)
|
||||
"""
|
||||
from collections import deque
|
||||
from datetime import datetime
|
||||
from typing import Optional
|
||||
import logging
|
||||
import time
|
||||
|
||||
from fastapi import APIRouter, Request
|
||||
from fastapi.applications import FastAPI
|
||||
from starlette.middleware.base import BaseHTTPMiddleware
|
||||
|
||||
# 컨테이너당 최근 500개를 in-memory 로 유지. 재시작 시 휘발.
|
||||
_BUFFER: deque = deque(maxlen=500)
|
||||
|
||||
EXCLUDED_PATHS = {
|
||||
"/health", "/healthz", "/ping", "/favicon.ico",
|
||||
"/docs", "/redoc", "/openapi.json", "/logs/recent",
|
||||
}
|
||||
EXCLUDED_PREFIXES = ("/static/",)
|
||||
EXCLUDED_METHODS = {"OPTIONS", "HEAD"}
|
||||
|
||||
|
||||
def _should_log(request: Request) -> bool:
|
||||
if request.method in EXCLUDED_METHODS:
|
||||
return False
|
||||
path = request.url.path
|
||||
if path in EXCLUDED_PATHS:
|
||||
return False
|
||||
if any(path.startswith(p) for p in EXCLUDED_PREFIXES):
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
class AccessLogMiddleware(BaseHTTPMiddleware):
|
||||
async def dispatch(self, request, call_next):
|
||||
start = time.time()
|
||||
response = await call_next(request)
|
||||
if not _should_log(request):
|
||||
return response
|
||||
elapsed_ms = int((time.time() - start) * 1000)
|
||||
status = response.status_code
|
||||
if status < 400:
|
||||
level = "info"
|
||||
elif status < 500:
|
||||
level = "warning"
|
||||
else:
|
||||
level = "error"
|
||||
_BUFFER.append({
|
||||
"ts": datetime.utcnow().isoformat() + "Z",
|
||||
"level": level,
|
||||
"source": "access",
|
||||
"method": request.method,
|
||||
"path": request.url.path,
|
||||
"status": status,
|
||||
"ms": elapsed_ms,
|
||||
"message": f"{request.method} {request.url.path} → {status} ({elapsed_ms}ms)",
|
||||
})
|
||||
return response
|
||||
|
||||
|
||||
class BufferLogHandler(logging.Handler):
|
||||
"""root logger 에 부착하면 모든 logger.info/warning/error 가 buffer 에 흐름."""
|
||||
|
||||
def emit(self, record: logging.LogRecord) -> None:
|
||||
try:
|
||||
_BUFFER.append({
|
||||
"ts": datetime.utcfromtimestamp(record.created).isoformat() + "Z",
|
||||
"level": record.levelname.lower(),
|
||||
"source": "log",
|
||||
"logger": record.name,
|
||||
"message": record.getMessage(),
|
||||
})
|
||||
except Exception:
|
||||
# buffer 에 못 넣는다고 서비스가 죽으면 안 됨
|
||||
pass
|
||||
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
|
||||
@router.get("/logs/recent")
|
||||
def logs_recent(limit: int = 200, since: Optional[str] = None,
|
||||
path_prefix: Optional[str] = None):
|
||||
items = list(_BUFFER)
|
||||
if since:
|
||||
items = [x for x in items if x["ts"] > since]
|
||||
if path_prefix:
|
||||
items = [
|
||||
x for x in items
|
||||
if x["source"] == "log"
|
||||
or x.get("path", "").startswith(path_prefix)
|
||||
]
|
||||
return {"logs": items[-limit:]}
|
||||
|
||||
|
||||
def install(app: FastAPI, logger_root: str = "") -> None:
|
||||
"""서비스 main.py 에서 호출하는 단일 설치 함수.
|
||||
|
||||
- AccessLogMiddleware 등록
|
||||
- /logs/recent 라우터 등록
|
||||
- root logger 에 BufferLogHandler 부착 (모든 child logger 자동 전파)
|
||||
"""
|
||||
app.add_middleware(AccessLogMiddleware)
|
||||
app.include_router(router)
|
||||
root = logging.getLogger(logger_root)
|
||||
if not any(isinstance(h, BufferLogHandler) for h in root.handlers):
|
||||
root.addHandler(BufferLogHandler())
|
||||
0
_shared/tests/__init__.py
Normal file
0
_shared/tests/__init__.py
Normal file
129
_shared/tests/test_access_log.py
Normal file
129
_shared/tests/test_access_log.py
Normal file
@@ -0,0 +1,129 @@
|
||||
import logging
|
||||
import time
|
||||
from fastapi import FastAPI
|
||||
from fastapi.testclient import TestClient
|
||||
|
||||
from _shared.access_log import (
|
||||
AccessLogMiddleware,
|
||||
BufferLogHandler,
|
||||
router as logs_router,
|
||||
install,
|
||||
_BUFFER,
|
||||
)
|
||||
|
||||
|
||||
def _reset_buffer():
|
||||
_BUFFER.clear()
|
||||
|
||||
|
||||
def test_access_middleware_records_request():
|
||||
_reset_buffer()
|
||||
app = FastAPI()
|
||||
app.add_middleware(AccessLogMiddleware)
|
||||
|
||||
@app.get("/api/lotto/recommend")
|
||||
def recommend():
|
||||
return {"ok": True}
|
||||
|
||||
client = TestClient(app)
|
||||
client.get("/api/lotto/recommend")
|
||||
|
||||
items = [x for x in _BUFFER if x["source"] == "access"]
|
||||
assert len(items) == 1
|
||||
assert items[0]["method"] == "GET"
|
||||
assert items[0]["path"] == "/api/lotto/recommend"
|
||||
assert items[0]["status"] == 200
|
||||
assert items[0]["ms"] >= 0
|
||||
|
||||
|
||||
def test_access_middleware_skips_health():
|
||||
_reset_buffer()
|
||||
app = FastAPI()
|
||||
app.add_middleware(AccessLogMiddleware)
|
||||
|
||||
@app.get("/health")
|
||||
def health():
|
||||
return {"ok": True}
|
||||
|
||||
client = TestClient(app)
|
||||
client.get("/health")
|
||||
|
||||
items = [x for x in _BUFFER if x["source"] == "access"]
|
||||
assert items == []
|
||||
|
||||
|
||||
def test_access_middleware_skips_options():
|
||||
_reset_buffer()
|
||||
app = FastAPI()
|
||||
app.add_middleware(AccessLogMiddleware)
|
||||
|
||||
@app.get("/api/lotto/recommend")
|
||||
def recommend():
|
||||
return {"ok": True}
|
||||
|
||||
client = TestClient(app)
|
||||
client.options("/api/lotto/recommend")
|
||||
|
||||
items = [x for x in _BUFFER if x["source"] == "access"]
|
||||
assert items == []
|
||||
|
||||
|
||||
def test_buffer_log_handler_captures_logger_info():
|
||||
_reset_buffer()
|
||||
root = logging.getLogger("")
|
||||
handler = BufferLogHandler()
|
||||
root.addHandler(handler)
|
||||
try:
|
||||
lg = logging.getLogger("lotto.test")
|
||||
lg.setLevel(logging.INFO)
|
||||
lg.info("뉴스 스크래핑 완료: 국내 12건")
|
||||
finally:
|
||||
root.removeHandler(handler)
|
||||
|
||||
items = [x for x in _BUFFER if x["source"] == "log"]
|
||||
assert len(items) == 1
|
||||
assert items[0]["message"] == "뉴스 스크래핑 완료: 국내 12건"
|
||||
assert items[0]["level"] == "info"
|
||||
assert items[0]["logger"] == "lotto.test"
|
||||
|
||||
|
||||
def test_logs_recent_endpoint_returns_recent_items():
|
||||
_reset_buffer()
|
||||
app = FastAPI()
|
||||
install(app)
|
||||
|
||||
@app.get("/api/lotto/recommend")
|
||||
def recommend():
|
||||
return {"ok": True}
|
||||
|
||||
client = TestClient(app)
|
||||
client.get("/api/lotto/recommend")
|
||||
client.get("/api/lotto/recommend")
|
||||
client.get("/health") # 제외되어야 함
|
||||
|
||||
resp = client.get("/logs/recent")
|
||||
assert resp.status_code == 200
|
||||
logs = resp.json()["logs"]
|
||||
access_items = [x for x in logs if x["source"] == "access"]
|
||||
assert len(access_items) == 2
|
||||
|
||||
|
||||
def test_logs_recent_with_since_filter():
|
||||
_reset_buffer()
|
||||
app = FastAPI()
|
||||
install(app)
|
||||
|
||||
@app.get("/api/lotto/recommend")
|
||||
def recommend():
|
||||
return {"ok": True}
|
||||
|
||||
client = TestClient(app)
|
||||
client.get("/api/lotto/recommend")
|
||||
time.sleep(0.01)
|
||||
cursor_resp = client.get("/logs/recent")
|
||||
cursor_ts = cursor_resp.json()["logs"][-1]["ts"]
|
||||
client.get("/api/lotto/recommend")
|
||||
|
||||
resp = client.get(f"/logs/recent?since={cursor_ts}")
|
||||
items = [x for x in resp.json()["logs"] if x["source"] == "access"]
|
||||
assert len(items) == 1
|
||||
@@ -1,8 +1,6 @@
|
||||
import time
|
||||
from typing import Optional
|
||||
|
||||
from ..db import add_log
|
||||
|
||||
VALID_STATES = ("idle", "working", "waiting", "reporting")
|
||||
|
||||
class BaseAgent:
|
||||
@@ -29,8 +27,6 @@ class BaseAgent:
|
||||
if new_state == "idle":
|
||||
self._idle_since = time.time()
|
||||
|
||||
add_log(self.agent_id, f"State: {old} -> {new_state} ({detail})")
|
||||
|
||||
if self._ws_manager:
|
||||
await self._ws_manager.send_agent_state(self.agent_id, new_state, detail, task_id)
|
||||
if new_state == "working" and old != "working":
|
||||
|
||||
@@ -38,3 +38,17 @@ LOTTO_DIGEST_HOUR = int(os.getenv("LOTTO_DIGEST_HOUR", "9"))
|
||||
LOTTO_DIGEST_MIN = int(os.getenv("LOTTO_DIGEST_MIN", "25"))
|
||||
LOTTO_THROTTLE_HOURS = int(os.getenv("LOTTO_THROTTLE_HOURS", "6"))
|
||||
LOTTO_URGENT_DAILY_MAX = int(os.getenv("LOTTO_URGENT_DAILY_MAX", "3"))
|
||||
|
||||
import re as _re
|
||||
|
||||
# 에이전트 → (container_host, port, path_prefix_regex)
|
||||
# path_prefix_regex: lotto 컨테이너에 personal/blog/todo 도 같이 있어
|
||||
# /api/lotto 만 골라내기 위한 정규식. business log (source='log') 는 모두 통과.
|
||||
AGENT_CONTAINER_MAP: dict[str, tuple[str, int, _re.Pattern]] = {
|
||||
"lotto": ("lotto", 8000, _re.compile(r"^/api/lotto")),
|
||||
# Phase 2 에서 추가:
|
||||
# "stock": ("stock", 8000, _re.compile(r"^/api/(stock|trade|portfolio)")),
|
||||
# "music": ("music-lab", 8000, _re.compile(r"^/api/music")),
|
||||
# "insta": ("insta-lab", 8000, _re.compile(r"^/api/insta")),
|
||||
# "realestate": ("realestate-lab", 8000, _re.compile(r"^/api/realestate")),
|
||||
}
|
||||
|
||||
@@ -321,7 +321,13 @@ def add_log(agent_id: str, message: str, level: str = "info", task_id: str = Non
|
||||
def get_logs(agent_id: str, limit: int = 50) -> List[Dict[str, Any]]:
|
||||
with _conn() as conn:
|
||||
rows = conn.execute(
|
||||
"SELECT * FROM agent_logs WHERE agent_id=? ORDER BY created_at DESC LIMIT ?",
|
||||
"""
|
||||
SELECT * FROM agent_logs
|
||||
WHERE agent_id = ?
|
||||
AND message NOT LIKE 'State: %'
|
||||
ORDER BY created_at DESC
|
||||
LIMIT ?
|
||||
""",
|
||||
(agent_id, limit),
|
||||
).fetchall()
|
||||
return [
|
||||
@@ -332,6 +338,7 @@ def get_logs(agent_id: str, limit: int = 50) -> List[Dict[str, Any]]:
|
||||
"level": r["level"],
|
||||
"message": r["message"],
|
||||
"created_at": r["created_at"],
|
||||
"source": "agent",
|
||||
}
|
||||
for r in rows
|
||||
]
|
||||
@@ -588,6 +595,20 @@ def get_activity_feed(limit: int = 50, offset: int = 0) -> dict:
|
||||
return {"items": items, "total": total}
|
||||
|
||||
|
||||
import datetime as _dt
|
||||
|
||||
|
||||
def delete_old_logs(days: int = 90) -> int:
|
||||
"""retention 정책: N일 이전 agent_logs 삭제. 매일 03:00 스케줄러가 호출."""
|
||||
cutoff = (_dt.datetime.utcnow() - _dt.timedelta(days=days)).isoformat()
|
||||
with _conn() as conn:
|
||||
c = conn.execute(
|
||||
"DELETE FROM agent_logs WHERE created_at < ?",
|
||||
(cutoff,),
|
||||
)
|
||||
return c.rowcount
|
||||
|
||||
|
||||
# ── youtube_research_jobs CRUD ────────────────────────────────────────────────
|
||||
|
||||
def add_youtube_research_job(countries: list) -> int:
|
||||
|
||||
@@ -116,8 +116,18 @@ def agent_tasks(
|
||||
return {"tasks": tasks_list, "items": tasks_list}
|
||||
|
||||
@app.get("/api/agent-office/agents/{agent_id}/logs")
|
||||
def agent_logs(agent_id: str, limit: int = 50):
|
||||
return {"logs": get_logs(agent_id, limit)}
|
||||
async def agent_logs(agent_id: str, limit: int = 50):
|
||||
from .service_proxy import fetch_service_logs
|
||||
|
||||
agent_items = get_logs(agent_id, limit=limit)
|
||||
service_items = await fetch_service_logs(agent_id, limit=limit)
|
||||
|
||||
def _sort_key(x):
|
||||
# agent_logs: created_at, service: ts
|
||||
return x.get("ts") or x.get("created_at") or ""
|
||||
|
||||
merged = sorted(agent_items + service_items, key=_sort_key, reverse=True)
|
||||
return {"logs": merged[:limit]}
|
||||
|
||||
@app.get("/api/agent-office/tasks/pending")
|
||||
def pending_tasks():
|
||||
|
||||
@@ -1,8 +1,11 @@
|
||||
import httpx
|
||||
import logging
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
from .config import STOCK_URL, MUSIC_LAB_URL, INSTA_LAB_URL, REALESTATE_LAB_URL
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_client = httpx.AsyncClient(timeout=30.0)
|
||||
|
||||
async def fetch_stock_news(limit: int = 10, category: str = None) -> List[Dict[str, Any]]:
|
||||
@@ -394,3 +397,38 @@ async def lotto_evolver_evaluate() -> Dict[str, Any]:
|
||||
resp = await client.post(f"{LOTTO_BACKEND_URL}/api/lotto/evolver/evaluate-now")
|
||||
resp.raise_for_status()
|
||||
return resp.json()
|
||||
|
||||
|
||||
from .config import AGENT_CONTAINER_MAP
|
||||
|
||||
|
||||
async def fetch_service_logs(
|
||||
agent_id: str,
|
||||
since: Optional[str] = None,
|
||||
limit: int = 200,
|
||||
) -> List[Dict[str, Any]]:
|
||||
"""해당 에이전트가 가리키는 컨테이너의 /logs/recent 를 호출해서
|
||||
path_prefix 정규식으로 필터한 결과를 반환.
|
||||
|
||||
네트워크 실패 시 빈 리스트를 반환하고 warning 만 남김 (LogTab 이 죽지 않게).
|
||||
"""
|
||||
mapping = AGENT_CONTAINER_MAP.get(agent_id)
|
||||
if not mapping:
|
||||
return []
|
||||
host, port, path_re = mapping
|
||||
url = f"http://{host}:{port}/logs/recent"
|
||||
params: Dict[str, Any] = {"limit": limit}
|
||||
if since:
|
||||
params["since"] = since
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=3.0) as client:
|
||||
resp = await client.get(url, params=params)
|
||||
data = resp.json().get("logs", [])
|
||||
except Exception as e:
|
||||
logger.warning("fetch_service_logs(%s) 실패: %s", agent_id, e)
|
||||
return []
|
||||
return [
|
||||
x for x in data
|
||||
if x.get("source") == "log"
|
||||
or path_re.match(x.get("path", "") or "")
|
||||
]
|
||||
|
||||
@@ -93,6 +93,41 @@ def test_telegram_state():
|
||||
print(" [PASS] test_telegram_state")
|
||||
|
||||
|
||||
def test_get_logs_excludes_state_messages():
|
||||
init_db()
|
||||
add_log("stock", "State: idle -> working (큐레이션 시작)")
|
||||
add_log("stock", "뉴스 12건 스크랩 완료")
|
||||
add_log("stock", "State: working -> idle ()")
|
||||
|
||||
logs = get_logs("stock", limit=10)
|
||||
messages = [x["message"] for x in logs]
|
||||
assert "뉴스 12건 스크랩 완료" in messages
|
||||
assert not any(m.startswith("State: ") for m in messages)
|
||||
|
||||
|
||||
def test_delete_old_logs_removes_beyond_retention():
|
||||
import datetime as _dt
|
||||
from app.db import delete_old_logs, _conn
|
||||
|
||||
init_db()
|
||||
add_log("stock", "오래된 로그")
|
||||
# 강제로 200일 전으로 옮김
|
||||
cutoff = (_dt.datetime.utcnow() - _dt.timedelta(days=200)).isoformat()
|
||||
with _conn() as conn:
|
||||
conn.execute(
|
||||
"UPDATE agent_logs SET created_at = ? WHERE message = '오래된 로그'",
|
||||
(cutoff,),
|
||||
)
|
||||
|
||||
add_log("stock", "최근 로그")
|
||||
deleted = delete_old_logs(days=90)
|
||||
assert deleted >= 1
|
||||
|
||||
msgs = [x["message"] for x in get_logs("stock", limit=20)]
|
||||
assert "최근 로그" in msgs
|
||||
assert "오래된 로그" not in msgs
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
test_init_and_seed()
|
||||
test_agent_config_update()
|
||||
|
||||
47
agent-office/tests/test_log_merge.py
Normal file
47
agent-office/tests/test_log_merge.py
Normal file
@@ -0,0 +1,47 @@
|
||||
import pytest
|
||||
import respx
|
||||
import httpx
|
||||
from fastapi.testclient import TestClient
|
||||
|
||||
from app.main import app
|
||||
from app.db import add_log, _conn
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _clean_logs():
|
||||
with _conn() as conn:
|
||||
conn.execute("DELETE FROM agent_logs WHERE agent_id = 'lotto'")
|
||||
yield
|
||||
|
||||
|
||||
@respx.mock
|
||||
def test_agent_logs_endpoint_merges_db_and_service_logs():
|
||||
add_log("lotto", "큐레이션 완료: #1234 conf=0.78")
|
||||
respx.get("http://lotto:8000/logs/recent").mock(
|
||||
return_value=httpx.Response(200, json={
|
||||
"logs": [
|
||||
{"ts": "2026-05-28T10:00:00Z", "source": "access",
|
||||
"method": "GET", "path": "/api/lotto/latest",
|
||||
"status": 200, "ms": 8,
|
||||
"message": "GET /api/lotto/latest → 200 (8ms)"},
|
||||
{"ts": "2026-05-28T10:00:02Z", "source": "log",
|
||||
"logger": "lotto", "level": "info",
|
||||
"message": "성과 통계 캐시 갱신"},
|
||||
]
|
||||
})
|
||||
)
|
||||
|
||||
client = TestClient(app)
|
||||
resp = client.get("/api/agent-office/agents/lotto/logs?limit=20")
|
||||
assert resp.status_code == 200
|
||||
logs = resp.json()["logs"]
|
||||
|
||||
sources = {x["source"] for x in logs}
|
||||
assert "agent" in sources
|
||||
assert "access" in sources
|
||||
assert "log" in sources
|
||||
|
||||
messages = [x["message"] for x in logs]
|
||||
assert any("큐레이션 완료" in m for m in messages)
|
||||
assert any("성과 통계 캐시 갱신" in m for m in messages)
|
||||
assert any("/api/lotto/latest" in m for m in messages)
|
||||
53
agent-office/tests/test_service_proxy_logs.py
Normal file
53
agent-office/tests/test_service_proxy_logs.py
Normal file
@@ -0,0 +1,53 @@
|
||||
import pytest
|
||||
import respx
|
||||
import httpx
|
||||
|
||||
from app.service_proxy import fetch_service_logs
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@respx.mock
|
||||
async def test_fetch_service_logs_filters_by_path_prefix():
|
||||
# lotto 컨테이너 응답: lotto + personal 섞임
|
||||
respx.get("http://lotto:8000/logs/recent").mock(
|
||||
return_value=httpx.Response(200, json={
|
||||
"logs": [
|
||||
{"ts": "2026-05-28T10:00:00Z", "source": "access",
|
||||
"method": "GET", "path": "/api/lotto/recommend",
|
||||
"status": 200, "ms": 12,
|
||||
"message": "GET /api/lotto/recommend → 200 (12ms)"},
|
||||
{"ts": "2026-05-28T10:00:01Z", "source": "access",
|
||||
"method": "GET", "path": "/api/blog/posts",
|
||||
"status": 200, "ms": 5,
|
||||
"message": "GET /api/blog/posts → 200 (5ms)"},
|
||||
{"ts": "2026-05-28T10:00:02Z", "source": "log",
|
||||
"logger": "lotto", "level": "info",
|
||||
"message": "성과 통계 캐시 갱신"},
|
||||
]
|
||||
})
|
||||
)
|
||||
|
||||
result = await fetch_service_logs("lotto", limit=50)
|
||||
# lotto path 와 모든 log 이벤트만 통과
|
||||
paths = [x.get("path") for x in result]
|
||||
assert "/api/lotto/recommend" in paths
|
||||
assert "/api/blog/posts" not in paths
|
||||
# 비즈니스 로그도 포함
|
||||
assert any(x["source"] == "log" and x["message"] == "성과 통계 캐시 갱신"
|
||||
for x in result)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_fetch_service_logs_unknown_agent_returns_empty():
|
||||
result = await fetch_service_logs("nonexistent", limit=50)
|
||||
assert result == []
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@respx.mock
|
||||
async def test_fetch_service_logs_handles_connection_error():
|
||||
respx.get("http://lotto:8000/logs/recent").mock(
|
||||
side_effect=httpx.ConnectError("connection refused")
|
||||
)
|
||||
result = await fetch_service_logs("lotto", limit=50)
|
||||
assert result == []
|
||||
@@ -14,8 +14,15 @@ services:
|
||||
- TZ=${TZ:-Asia/Seoul}
|
||||
- LOTTO_ALL_URL=${LOTTO_ALL_URL:-https://smok95.github.io/lotto/results/all.json}
|
||||
- LOTTO_LATEST_URL=${LOTTO_LATEST_URL:-https://smok95.github.io/lotto/results/latest.json}
|
||||
- PYTHONPATH=/app:/shared
|
||||
volumes:
|
||||
- ${RUNTIME_PATH}/data:/app/data
|
||||
- ./_shared:/shared:ro
|
||||
logging:
|
||||
driver: "json-file"
|
||||
options:
|
||||
max-size: "10m"
|
||||
max-file: "3"
|
||||
healthcheck:
|
||||
test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"]
|
||||
interval: 60s
|
||||
|
||||
1616
docs/superpowers/plans/2026-05-28-agent-office-docker-logs.md
Normal file
1616
docs/superpowers/plans/2026-05-28-agent-office-docker-logs.md
Normal file
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,362 @@
|
||||
# Agent Office — Docker 로그 기반 통합 타임라인 설계
|
||||
|
||||
> 작성일: 2026-05-28
|
||||
> 대상: web-backend (5개 lab + agent-office) + web-ui (LogTab)
|
||||
|
||||
## 배경
|
||||
|
||||
`/agent-office` 의 각 에이전트 상세 패널에 노출되는 **로그 탭** 이 현재는 의미가 빈약하다.
|
||||
- 노출 소스는 `agent-office` 의 자체 SQLite `agent_logs` 테이블 한 곳뿐.
|
||||
- `base.py BaseAgent.transition()` 가 매번 `State: idle -> working ({detail})` 형식 자동 로그를 기록 — 사용자가 실제로 무슨 일이 일어났는지 파악하기 어려운 노이즈가 다수.
|
||||
- 각 에이전트가 실제로 호출하는 외부 서비스 컨테이너 (lotto / stock / music-lab / insta-lab / realestate-lab) 의 docker stdout 은 LogTab 에 한 줄도 흐르지 않는다.
|
||||
|
||||
따라서 LogTab 에서는 “이 에이전트가 어떤 API 를 불러서 어떤 응답을 받았는지” “외부 서비스에서 어떤 비즈니스 이벤트가 발생했는지” 가 보이지 않는다.
|
||||
|
||||
## 목표
|
||||
|
||||
1. 각 에이전트 LogTab 에 **해당 서비스 컨테이너의 의미 있는 docker 로그** 를 흘려보낸다.
|
||||
2. healthcheck / static / OPTIONS 같은 노이즈 로그는 **서버 측에서 미리 차단** 한다.
|
||||
3. API 호출 한 줄 (`POST /api/lotto/recommend → 200 142ms`) 과 비즈니스 이벤트 (`수집 완료: new=12, total=340`) 양쪽 모두 표시한다.
|
||||
4. 에이전트 내부 동작 로그 (`agent_logs` DB) 와 서비스 로그를 **한 화면에 시간순으로 통합** 한다.
|
||||
5. `State: idle -> working` 형식 자동 transition 로그는 제거한다.
|
||||
|
||||
## 비목표
|
||||
|
||||
- 실시간 WebSocket push (지금은 5초 폴링이면 충분).
|
||||
- 컨테이너 외부 (NAS 호스트, Windows AI 서버) 로그 수집.
|
||||
- 로그 검색 / 필터 UI (당장은 단순 시간순 표시).
|
||||
- 다른 lab (image-lab / tarot-lab / saju-lab / packs-lab / video-lab) 은 1차 범위에서 제외 — 5개 활성 에이전트가 가리키는 5개 컨테이너만 다룬다.
|
||||
|
||||
## 결정사항 요약
|
||||
|
||||
| 항목 | 결정 |
|
||||
|---|---|
|
||||
| 수집 방식 | 각 서비스가 `/logs/recent` 엔드포인트 노출 + agent-office 가 polling |
|
||||
| 표시 방식 | 통합 타임라인 (agent 로그 + service 로그 시간순 merge) |
|
||||
| 로그 범위 | 액세스 로그 (healthcheck 제외) + 비즈니스 이벤트 (logger.info/warning/error) |
|
||||
| ring buffer 크기 | 컨테이너당 500개, in-memory deque |
|
||||
| docker logs retention | `max-size 10m × max-file 3` = 서비스당 30MB |
|
||||
| agent_logs DB retention | **90일** (매일 03:00 cleanup) |
|
||||
| state 자동 로그 | 제거 (`base.py BaseAgent.transition()` 의 `add_log("State: ...")`) |
|
||||
| 자동 수집 메커니즘 | Python `logging.Handler` 를 BufferLogHandler 로 등록 — 기존 logger.info/warning/error 호출이 자동으로 ring buffer 에 흐름 |
|
||||
|
||||
## 아키텍처
|
||||
|
||||
```
|
||||
┌─────────────────────────────────────────────────────────────┐
|
||||
│ web-ui (LogTab) │
|
||||
│ ─ GET /api/agent-office/agents/{id}/logs?limit=N │
|
||||
│ ─ 5초 폴링 (기존 refreshTrigger 흐름 재활용) │
|
||||
│ ─ source 뱃지 표시 (access | log | agent) │
|
||||
└─────────────────────────────────────────────────────────────┘
|
||||
▲
|
||||
│ 통합 타임라인 (시간순 merge)
|
||||
│
|
||||
┌─────────────────────────────────────────────────────────────┐
|
||||
│ agent-office │
|
||||
│ - get_merged_logs(agent_id, limit) = │
|
||||
│ agent_logs (state 로그 제외) │
|
||||
│ + service_proxy.fetch_logs(container, path_prefix) │
|
||||
│ → ts 기준 정렬 → 최근 N개 │
|
||||
│ - 매핑: AGENT_CONTAINER_MAP │
|
||||
│ stock → ("stock", "/api/(stock|trade|portfolio)") │
|
||||
│ music → ("music-lab", "/api/music") │
|
||||
│ insta → ("insta-lab", "/api/insta") │
|
||||
│ realestate → ("realestate-lab", "/api/realestate") │
|
||||
│ lotto → ("lotto-backend", "/api/lotto") │
|
||||
└─────────────────────────────────────────────────────────────┘
|
||||
▲
|
||||
│ GET http://{container}:{port}/logs/recent
|
||||
│ ?since=ISO&limit=N&path_prefix=...
|
||||
│ (내부 docker 네트워크 only, nginx public 라우팅 X)
|
||||
│
|
||||
┌─────────────────────────────────────────────────────────────┐
|
||||
│ 각 서비스 컨테이너 (5개) │
|
||||
│ 공용 모듈 _shared/access_log.py: │
|
||||
│ - LogBuffer: collections.deque(maxlen=500) │
|
||||
│ - AccessLogMiddleware: 모든 요청 후 한 줄 기록 │
|
||||
│ 제외: /health /healthz /ping /favicon /docs /redoc │
|
||||
│ /openapi.json /logs/recent OPTIONS HEAD │
|
||||
│ - BufferLogHandler: logger.info/warning/error 자동 캡처 │
|
||||
│ - /logs/recent 라우터 │
|
||||
└─────────────────────────────────────────────────────────────┘
|
||||
```
|
||||
|
||||
## 공용 모듈 — `web-backend/_shared/access_log.py`
|
||||
|
||||
```python
|
||||
from collections import deque
|
||||
from datetime import datetime
|
||||
from fastapi import APIRouter, Request
|
||||
from starlette.middleware.base import BaseHTTPMiddleware
|
||||
import logging
|
||||
import time
|
||||
|
||||
_BUFFER = deque(maxlen=500)
|
||||
|
||||
EXCLUDED_PATHS = {"/health", "/healthz", "/ping", "/favicon.ico",
|
||||
"/docs", "/redoc", "/openapi.json", "/logs/recent"}
|
||||
EXCLUDED_PREFIXES = ("/static/",)
|
||||
EXCLUDED_METHODS = {"OPTIONS", "HEAD"}
|
||||
|
||||
|
||||
def _should_log(request: Request) -> bool:
|
||||
if request.method in EXCLUDED_METHODS:
|
||||
return False
|
||||
path = request.url.path
|
||||
if path in EXCLUDED_PATHS:
|
||||
return False
|
||||
if any(path.startswith(p) for p in EXCLUDED_PREFIXES):
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
class AccessLogMiddleware(BaseHTTPMiddleware):
|
||||
async def dispatch(self, request, call_next):
|
||||
start = time.time()
|
||||
response = await call_next(request)
|
||||
if not _should_log(request):
|
||||
return response
|
||||
elapsed_ms = int((time.time() - start) * 1000)
|
||||
status = response.status_code
|
||||
_BUFFER.append({
|
||||
"ts": datetime.utcnow().isoformat() + "Z",
|
||||
"level": "info" if status < 400 else
|
||||
"warning" if status < 500 else "error",
|
||||
"source": "access",
|
||||
"method": request.method,
|
||||
"path": request.url.path,
|
||||
"status": status,
|
||||
"ms": elapsed_ms,
|
||||
"message": f"{request.method} {request.url.path} → {status} ({elapsed_ms}ms)",
|
||||
})
|
||||
return response
|
||||
|
||||
|
||||
class BufferLogHandler(logging.Handler):
|
||||
def emit(self, record: logging.LogRecord) -> None:
|
||||
try:
|
||||
_BUFFER.append({
|
||||
"ts": datetime.utcfromtimestamp(record.created).isoformat() + "Z",
|
||||
"level": record.levelname.lower(),
|
||||
"source": "log",
|
||||
"logger": record.name,
|
||||
"message": record.getMessage(),
|
||||
})
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
|
||||
@router.get("/logs/recent")
|
||||
def logs_recent(limit: int = 200, since: str | None = None,
|
||||
path_prefix: str | None = None):
|
||||
items = list(_BUFFER)
|
||||
if since:
|
||||
items = [x for x in items if x["ts"] > since]
|
||||
if path_prefix:
|
||||
items = [x for x in items
|
||||
if x["source"] == "log" or x.get("path", "").startswith(path_prefix)]
|
||||
return {"logs": items[-limit:]}
|
||||
|
||||
|
||||
def install(app, logger_root: str = ""):
|
||||
"""서비스 main.py 가 호출하는 단일 설치 함수."""
|
||||
app.add_middleware(AccessLogMiddleware)
|
||||
app.include_router(router)
|
||||
logging.getLogger(logger_root).addHandler(BufferLogHandler())
|
||||
```
|
||||
|
||||
### 각 서비스 main.py 적용
|
||||
|
||||
```python
|
||||
from _shared.access_log import install as install_access_log
|
||||
install_access_log(app)
|
||||
```
|
||||
|
||||
## docker-compose 변경
|
||||
|
||||
5개 서비스 (`lotto-backend`, `stock`, `music-lab`, `insta-lab`, `realestate-lab`) 에 동일 패턴 추가:
|
||||
|
||||
```yaml
|
||||
environment:
|
||||
- PYTHONPATH=/app:/shared
|
||||
volumes:
|
||||
- ../_shared:/shared:ro
|
||||
logging:
|
||||
driver: "json-file"
|
||||
options:
|
||||
max-size: "10m"
|
||||
max-file: "3"
|
||||
```
|
||||
|
||||
`/logs/recent` 는 **nginx default.conf 의 public location 블록에 추가하지 않는다**. 내부 docker 네트워크에서 `http://{container_name}:{port}/logs/recent` 로만 접근.
|
||||
|
||||
## agent-office 측 변경
|
||||
|
||||
### `app/constants.py`
|
||||
```python
|
||||
AGENT_CONTAINER_MAP = {
|
||||
"stock": ("stock", 8000, r"^/api/(stock|trade|portfolio)"),
|
||||
"music": ("music-lab", 8000, r"^/api/music"),
|
||||
"insta": ("insta-lab", 8000, r"^/api/insta"),
|
||||
"realestate": ("realestate-lab", 8000, r"^/api/realestate"),
|
||||
"lotto": ("lotto-backend", 8000, r"^/api/lotto"),
|
||||
}
|
||||
```
|
||||
|
||||
### `app/service_proxy.py`
|
||||
```python
|
||||
async def fetch_service_logs(agent_id: str, since: str | None = None,
|
||||
limit: int = 200) -> list[dict]:
|
||||
mapping = AGENT_CONTAINER_MAP.get(agent_id)
|
||||
if not mapping:
|
||||
return []
|
||||
host, port, path_re = mapping
|
||||
url = f"http://{host}:{port}/logs/recent"
|
||||
params = {"limit": limit}
|
||||
if since:
|
||||
params["since"] = since
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=3.0) as client:
|
||||
resp = await client.get(url, params=params)
|
||||
data = resp.json().get("logs", [])
|
||||
except Exception as e:
|
||||
logger.warning("fetch_service_logs(%s) 실패: %s", agent_id, e)
|
||||
return []
|
||||
# path_prefix 필터: access 로그만 path_re 검증
|
||||
return [x for x in data if x["source"] == "log"
|
||||
or re.match(path_re, x.get("path", ""))]
|
||||
```
|
||||
|
||||
### `app/db.py`
|
||||
```python
|
||||
def get_logs(agent_id: str, limit: int = 50) -> list[dict]:
|
||||
# 'State: ...' 자동 로그 제외 (사용자 요청)
|
||||
rows = conn.execute("""
|
||||
SELECT * FROM agent_logs
|
||||
WHERE agent_id=?
|
||||
AND message NOT LIKE 'State: %'
|
||||
ORDER BY created_at DESC LIMIT ?
|
||||
""", (agent_id, limit)).fetchall()
|
||||
return [...]
|
||||
|
||||
def delete_old_logs(days: int = 90) -> int:
|
||||
cutoff = (datetime.utcnow() - timedelta(days=days)).isoformat()
|
||||
with _conn() as conn:
|
||||
c = conn.execute("DELETE FROM agent_logs WHERE created_at < ?", (cutoff,))
|
||||
return c.rowcount
|
||||
```
|
||||
|
||||
### `app/main.py`
|
||||
```python
|
||||
@app.get("/api/agent-office/agents/{agent_id}/logs")
|
||||
async def agent_logs(agent_id: str, limit: int = 50):
|
||||
agent_items = get_logs(agent_id, limit=limit)
|
||||
service_items = await fetch_service_logs(agent_id, limit=limit)
|
||||
merged = sorted(agent_items + service_items,
|
||||
key=lambda x: x.get("ts") or x.get("created_at"),
|
||||
reverse=True)[:limit]
|
||||
return {"logs": merged}
|
||||
```
|
||||
|
||||
### `app/agents/base.py`
|
||||
```python
|
||||
async def transition(self, new_state, detail="", task_id=None):
|
||||
# add_log(... "State: ...") 호출 삭제 — 사용자 요청
|
||||
...
|
||||
# ws_manager 알림은 유지
|
||||
```
|
||||
|
||||
### `app/scheduler.py`
|
||||
```python
|
||||
scheduler.add_job(
|
||||
lambda: delete_old_logs(days=90),
|
||||
CronTrigger(hour=3, minute=0),
|
||||
id="cleanup_old_logs",
|
||||
)
|
||||
```
|
||||
|
||||
## web-ui 측 변경
|
||||
|
||||
### `src/pages/agent-office/components/LogTab.jsx`
|
||||
- log row schema 가 두 가지로 늘어남: agent_logs `{level, message, created_at}` vs service `{ts, level, source, method, path, status, ms, message}`.
|
||||
- source 뱃지를 추가로 표시: `[ACCESS]` / `[LOG]` / `[AGENT]`.
|
||||
- access 로그는 method + path + status + ms 를 보조 라인으로 표시.
|
||||
|
||||
색상 가이드:
|
||||
- `source=access` 청록 (#5eead4)
|
||||
- `source=log` 파랑 (#60a5fa)
|
||||
- `level=warning` 노랑 (#fbbf24)
|
||||
- `level=error` 빨강 (#ef4444)
|
||||
- `source=agent` (agent_logs) 회색 (#9ca3af)
|
||||
|
||||
## Phase 분리
|
||||
|
||||
대규모 변경이라 단일 PR 위험. 3단계로 나눠 진행.
|
||||
|
||||
### Phase 1 — PoC (가장 우선)
|
||||
1. `web-backend/_shared/access_log.py` 신설.
|
||||
2. `web-backend/lotto/app/main.py` 한 곳에만 `install_access_log(app)` 추가.
|
||||
3. `web-backend/docker-compose.yml` 의 `lotto-backend` 서비스에 PYTHONPATH + volume + logging 추가.
|
||||
4. `agent-office` 측 `service_proxy.fetch_service_logs()` + `AGENT_CONTAINER_MAP` (lotto 만) + `get_logs(agent_id)` merge.
|
||||
5. `LogTab.jsx` 가 source 뱃지를 표시하도록 확장.
|
||||
6. base.py `State: ...` 자동 로그 제거 + `db.get_logs()` NOT LIKE 필터 추가.
|
||||
|
||||
검증: `/agent-office` 에서 lotto 에이전트 선택 → LogTab 에 `POST /api/lotto/...` 한 줄과 기존 logger.info 출력이 같이 보이는지.
|
||||
|
||||
### Phase 2 — 4개 서비스 확장
|
||||
1. stock / music-lab / insta-lab / realestate-lab 의 `main.py` 에 `install_access_log(app)` 추가.
|
||||
2. docker-compose 4개 서비스 동일 패턴 적용.
|
||||
3. `AGENT_CONTAINER_MAP` 에 4개 매핑 추가.
|
||||
4. `delete_old_logs` cleanup job 등록.
|
||||
|
||||
검증: 5개 에이전트 모두 LogTab 에서 의미 있는 로그 노출.
|
||||
|
||||
### Phase 3 — 비즈니스 이벤트 보강
|
||||
디자인 4/5 의 "추가 권장" 표 항목들을 `logger.info(...)` 한 줄씩 추가. 약 10–15줄.
|
||||
- stock: Order 응답, AI Coach 호출, 스크리너 결과
|
||||
- music-lab: 생성 시작/완료
|
||||
- insta-lab: 키워드 추출 완료, 슬레이트 생성 완료, 발행 결과
|
||||
- lotto-backend: AI 큐레이터 호출/응답, 점수 계산 완료
|
||||
|
||||
## 알려진 위험과 완화
|
||||
|
||||
| 위험 | 완화 |
|
||||
|---|---|
|
||||
| `/logs/recent` 가 외부로 노출되면 access pattern + 내부 동작 노출 | nginx public location 에 등재하지 않음 + 내부 docker 네트워크만 |
|
||||
| 각 서비스의 logger 가 propagate 설정이 달라 BufferLogHandler 에 안 흐를 가능성 | `install()` 에서 `logging.getLogger("")` (root) 에 핸들러 등록 — 모든 child logger 가 자동 전파 |
|
||||
| BufferLogHandler 의 `emit()` 가 다른 핸들러의 포맷팅에 영향 | `Handler.emit` 만 override, formatter 사용 안 함 |
|
||||
| ring buffer 가 0.5초당 수십 건 트래픽으로 가득 차서 30초 분량밖에 안 남음 | 500개는 평소 트래픽 기준 1시간 이상 보관. 모니터링하다 부족하면 1000 으로 상향 |
|
||||
| `lotto-backend` 컨테이너의 personal/blog/todo API 가 lotto 에이전트 로그에 섞임 | `AGENT_CONTAINER_MAP` 의 path_prefix 정규식으로 `/api/lotto` 만 매칭 — 다른 prefix 는 자연스럽게 필터 |
|
||||
| docker-compose volume `../_shared:/shared:ro` 가 NAS 운영 환경에서 경로 차이로 깨질 가능성 | repo 의 상대경로 (`../_shared`) 는 NAS 의 `/volume1/docker/webpage/backend/_shared` 와 동일 구조로 git pull 됨. Gitea webhook 으로 push 되는 경로에 `_shared/` 디렉토리도 함께 포함됨을 deployer rsync 시 검증 |
|
||||
|
||||
## 변경 파일 요약
|
||||
|
||||
```
|
||||
■ 신설
|
||||
web-backend/_shared/__init__.py
|
||||
web-backend/_shared/access_log.py
|
||||
|
||||
■ web-backend
|
||||
lotto/app/main.py + install_access_log + 추가 logger.info 3–4개 (Phase 3)
|
||||
stock/app/main.py + install_access_log + 추가 logger.info 3개 (Phase 3)
|
||||
music-lab/app/main.py + install_access_log + 추가 logger.info 2개 (Phase 3)
|
||||
insta-lab/app/main.py + install_access_log + 추가 logger.info 3개 (Phase 3)
|
||||
realestate-lab/app/main.py + install_access_log (Phase 3 추가 없음)
|
||||
docker-compose.yml 5개 서비스 PYTHONPATH/volume/logging 추가
|
||||
|
||||
■ web-backend/agent-office
|
||||
app/service_proxy.py + fetch_service_logs(agent_id, ...)
|
||||
app/main.py agent_logs 엔드포인트가 merge 사용
|
||||
app/db.py + delete_old_logs + get_logs NOT LIKE 'State: %'
|
||||
app/scheduler.py + 매일 03:00 cleanup job
|
||||
app/agents/base.py transition() 의 add_log('State: ...') 제거
|
||||
app/constants.py + AGENT_CONTAINER_MAP
|
||||
|
||||
■ web-ui
|
||||
src/pages/agent-office/components/LogTab.jsx
|
||||
source 뱃지 + access 로그 method/status/ms 표시
|
||||
```
|
||||
@@ -5,6 +5,7 @@ from typing import Optional, List, Dict, Any, Tuple
|
||||
from fastapi import FastAPI, HTTPException
|
||||
from pydantic import BaseModel
|
||||
from apscheduler.schedulers.background import BackgroundScheduler
|
||||
from _shared.access_log import install as install_access_log
|
||||
|
||||
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(name)s] %(levelname)s %(message)s")
|
||||
logger = logging.getLogger("lotto-backend")
|
||||
@@ -49,6 +50,7 @@ from .routers import review as review_router
|
||||
from .jobs.grade_weekly_review import run_for_latest as grade_run_for_latest
|
||||
|
||||
app = FastAPI()
|
||||
install_access_log(app)
|
||||
app.include_router(curator_router.router)
|
||||
app.include_router(briefing_router.router)
|
||||
app.include_router(review_router.router)
|
||||
|
||||
@@ -370,6 +370,7 @@ server {
|
||||
}
|
||||
|
||||
# tarot-lab API (agent-office에서 분리)
|
||||
# Claude Sonnet 3-card 해석은 30~90초 + truncation reroll 시 추가. 600s 안전 마진.
|
||||
location /api/tarot/ {
|
||||
resolver 127.0.0.11 valid=10s;
|
||||
set $tarot_backend tarot-lab:8000;
|
||||
@@ -378,8 +379,8 @@ server {
|
||||
proxy_set_header Host $host;
|
||||
proxy_set_header X-Real-IP $remote_addr;
|
||||
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
|
||||
proxy_read_timeout 300s;
|
||||
proxy_send_timeout 300s;
|
||||
proxy_read_timeout 600s;
|
||||
proxy_send_timeout 600s;
|
||||
proxy_connect_timeout 60s;
|
||||
proxy_pass http://$tarot_backend$request_uri;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user