Files
web-page-backend/docs/superpowers/plans/2026-05-28-agent-office-docker-logs.md
gahusb dfd3b1bb17 docs(agent-office): docker logs 통합 타임라인 구현 계획
21개 task로 분해된 3-phase 실행 계획. Phase 1 (PoC, lotto 단일),
Phase 2 (4개 서비스 확장 + cleanup 스케줄러), Phase 3 (비즈니스 이벤트
보강). TDD 기반 task 구성, 모든 step 에 실제 코드/명령 포함.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-28 01:44:43 +09:00

47 KiB
Raw Blame History

Agent Office — Docker 로그 통합 타임라인 구현 계획

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: agent-office LogTab에 5개 서비스 컨테이너의 의미 있는 docker 로그(액세스 로그 + 비즈니스 이벤트)를 통합 타임라인으로 표시하고, healthcheck/노이즈는 서버 측에서 차단한다.

Architecture: 공용 _shared/access_log.py 모듈에 ring buffer + AccessLogMiddleware + BufferLogHandler + /logs/recent 엔드포인트를 두고, 5개 서비스가 한 줄(install_access_log(app))로 적용한다. agent-office는 service_proxy.fetch_service_logs()로 polling해서 자체 agent_logs DB와 시간순 merge한 결과를 /api/agent-office/agents/{id}/logs로 반환한다.

Tech Stack: FastAPI, Starlette middleware, Python logging.Handler, collections.deque, httpx, APScheduler, React (LogTab), Docker Compose, SQLite.

Spec: web-backend/docs/superpowers/specs/2026-05-28-agent-office-docker-logs-design.md


File Structure

Creates

web-backend/_shared/__init__.py                 — 빈 패키지 선언
web-backend/_shared/access_log.py               — Ring buffer + Middleware + Handler + Router + install()
web-backend/_shared/tests/__init__.py
web-backend/_shared/tests/test_access_log.py    — 단위 테스트
web-backend/agent-office/tests/test_service_proxy_logs.py  — fetch_service_logs 테스트
web-backend/agent-office/tests/test_log_merge.py            — get_logs merge 테스트

Modifies

web-backend/docker-compose.yml                  — 5개 서비스 PYTHONPATH/volume/logging
web-backend/lotto/app/main.py                   — install_access_log(app)
web-backend/stock/app/main.py                   — install_access_log(app) + logger.info 3개
web-backend/music-lab/app/main.py               — install_access_log(app) + logger.info 2개
web-backend/insta-lab/app/main.py               — install_access_log(app) + logger.info 3개
web-backend/realestate-lab/app/main.py          — install_access_log(app)
web-backend/agent-office/app/config.py          — AGENT_CONTAINER_MAP 상수
web-backend/agent-office/app/service_proxy.py   — fetch_service_logs(agent_id, ...)
web-backend/agent-office/app/db.py              — get_logs NOT LIKE 'State: %' + delete_old_logs
web-backend/agent-office/app/main.py            — /logs 엔드포인트 merge
web-backend/agent-office/app/scheduler.py       — cleanup job
web-backend/agent-office/app/agents/base.py     — add_log('State: ...') 제거
web-ui/src/pages/agent-office/components/LogTab.jsx — source 뱃지 + access 메타데이터

각 파일은 단일 책임. 5개 서비스의 main.py는 install_access_log(app) 한 줄만 추가하므로 변경 최소.


Phase 1 — PoC (lotto 단일 서비스에 적용)

Task 1: _shared/access_log.py 모듈 + 단위 테스트

Files:

  • Create: web-backend/_shared/__init__.py

  • Create: web-backend/_shared/access_log.py

  • Create: web-backend/_shared/tests/__init__.py

  • Create: web-backend/_shared/tests/test_access_log.py

  • Step 1: _shared/__init__.py 빈 파일 생성

# empty
  • Step 2: _shared/tests/__init__.py 빈 파일 생성
# empty
  • Step 3: 실패하는 테스트 작성 (_shared/tests/test_access_log.py)
import logging
import time
from fastapi import FastAPI
from fastapi.testclient import TestClient

from _shared.access_log import (
    AccessLogMiddleware,
    BufferLogHandler,
    router as logs_router,
    install,
    _BUFFER,
)


def _reset_buffer():
    _BUFFER.clear()


def test_access_middleware_records_request():
    _reset_buffer()
    app = FastAPI()
    app.add_middleware(AccessLogMiddleware)

    @app.get("/api/lotto/recommend")
    def recommend():
        return {"ok": True}

    client = TestClient(app)
    client.get("/api/lotto/recommend")

    items = [x for x in _BUFFER if x["source"] == "access"]
    assert len(items) == 1
    assert items[0]["method"] == "GET"
    assert items[0]["path"] == "/api/lotto/recommend"
    assert items[0]["status"] == 200
    assert items[0]["ms"] >= 0


def test_access_middleware_skips_health():
    _reset_buffer()
    app = FastAPI()
    app.add_middleware(AccessLogMiddleware)

    @app.get("/health")
    def health():
        return {"ok": True}

    client = TestClient(app)
    client.get("/health")
    client.get("/healthz") if False else None  # health 만 노출되어 있음

    items = [x for x in _BUFFER if x["source"] == "access"]
    assert items == []


def test_access_middleware_skips_options():
    _reset_buffer()
    app = FastAPI()
    app.add_middleware(AccessLogMiddleware)

    @app.get("/api/lotto/recommend")
    def recommend():
        return {"ok": True}

    client = TestClient(app)
    client.options("/api/lotto/recommend")

    items = [x for x in _BUFFER if x["source"] == "access"]
    assert items == []


def test_buffer_log_handler_captures_logger_info():
    _reset_buffer()
    root = logging.getLogger("")
    handler = BufferLogHandler()
    root.addHandler(handler)
    try:
        lg = logging.getLogger("lotto.test")
        lg.setLevel(logging.INFO)
        lg.info("뉴스 스크래핑 완료: 국내 12건")
    finally:
        root.removeHandler(handler)

    items = [x for x in _BUFFER if x["source"] == "log"]
    assert len(items) == 1
    assert items[0]["message"] == "뉴스 스크래핑 완료: 국내 12건"
    assert items[0]["level"] == "info"
    assert items[0]["logger"] == "lotto.test"


def test_logs_recent_endpoint_returns_recent_items():
    _reset_buffer()
    app = FastAPI()
    install(app)

    @app.get("/api/lotto/recommend")
    def recommend():
        return {"ok": True}

    client = TestClient(app)
    client.get("/api/lotto/recommend")
    client.get("/api/lotto/recommend")
    client.get("/health")  # 제외되어야 함

    resp = client.get("/logs/recent")
    assert resp.status_code == 200
    logs = resp.json()["logs"]
    access_items = [x for x in logs if x["source"] == "access"]
    assert len(access_items) == 2


def test_logs_recent_with_since_filter():
    _reset_buffer()
    app = FastAPI()
    install(app)

    @app.get("/api/lotto/recommend")
    def recommend():
        return {"ok": True}

    client = TestClient(app)
    client.get("/api/lotto/recommend")
    time.sleep(0.01)
    cursor_resp = client.get("/logs/recent")
    cursor_ts = cursor_resp.json()["logs"][-1]["ts"]
    client.get("/api/lotto/recommend")

    resp = client.get(f"/logs/recent?since={cursor_ts}")
    items = [x for x in resp.json()["logs"] if x["source"] == "access"]
    assert len(items) == 1
  • Step 4: 테스트 실행해서 실패 확인

Run:

cd web-backend && python -m pytest _shared/tests/test_access_log.py -v

Expected: FAIL — ModuleNotFoundError: No module named '_shared.access_log'

  • Step 5: _shared/access_log.py 구현
"""각 lab 컨테이너에서 import 하는 공용 액세스/이벤트 로그 모듈.

사용법:
    from _shared.access_log import install as install_access_log
    install_access_log(app)
"""
from collections import deque
from datetime import datetime
from typing import Optional
import logging
import time

from fastapi import APIRouter, Request
from fastapi.applications import FastAPI
from starlette.middleware.base import BaseHTTPMiddleware

# 컨테이너당 최근 500개를 in-memory 로 유지. 재시작 시 휘발.
_BUFFER: deque = deque(maxlen=500)

EXCLUDED_PATHS = {
    "/health", "/healthz", "/ping", "/favicon.ico",
    "/docs", "/redoc", "/openapi.json", "/logs/recent",
}
EXCLUDED_PREFIXES = ("/static/",)
EXCLUDED_METHODS = {"OPTIONS", "HEAD"}


def _should_log(request: Request) -> bool:
    if request.method in EXCLUDED_METHODS:
        return False
    path = request.url.path
    if path in EXCLUDED_PATHS:
        return False
    if any(path.startswith(p) for p in EXCLUDED_PREFIXES):
        return False
    return True


class AccessLogMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request, call_next):
        start = time.time()
        response = await call_next(request)
        if not _should_log(request):
            return response
        elapsed_ms = int((time.time() - start) * 1000)
        status = response.status_code
        if status < 400:
            level = "info"
        elif status < 500:
            level = "warning"
        else:
            level = "error"
        _BUFFER.append({
            "ts": datetime.utcnow().isoformat() + "Z",
            "level": level,
            "source": "access",
            "method": request.method,
            "path": request.url.path,
            "status": status,
            "ms": elapsed_ms,
            "message": f"{request.method} {request.url.path}{status} ({elapsed_ms}ms)",
        })
        return response


class BufferLogHandler(logging.Handler):
    """root logger 에 부착하면 모든 logger.info/warning/error 가 buffer 에 흐름."""

    def emit(self, record: logging.LogRecord) -> None:
        try:
            _BUFFER.append({
                "ts": datetime.utcfromtimestamp(record.created).isoformat() + "Z",
                "level": record.levelname.lower(),
                "source": "log",
                "logger": record.name,
                "message": record.getMessage(),
            })
        except Exception:
            # buffer 에 못 넣는다고 서비스가 죽으면 안 됨
            pass


router = APIRouter()


@router.get("/logs/recent")
def logs_recent(limit: int = 200, since: Optional[str] = None,
                path_prefix: Optional[str] = None):
    items = list(_BUFFER)
    if since:
        items = [x for x in items if x["ts"] > since]
    if path_prefix:
        items = [
            x for x in items
            if x["source"] == "log"
            or x.get("path", "").startswith(path_prefix)
        ]
    return {"logs": items[-limit:]}


def install(app: FastAPI, logger_root: str = "") -> None:
    """서비스 main.py 에서 호출하는 단일 설치 함수.

    - AccessLogMiddleware 등록
    - /logs/recent 라우터 등록
    - root logger 에 BufferLogHandler 부착 (모든 child logger 자동 전파)
    """
    app.add_middleware(AccessLogMiddleware)
    app.include_router(router)
    root = logging.getLogger(logger_root)
    if not any(isinstance(h, BufferLogHandler) for h in root.handlers):
        root.addHandler(BufferLogHandler())
  • Step 6: 테스트 통과 확인

Run:

cd web-backend && python -m pytest _shared/tests/test_access_log.py -v

Expected: PASS — 6 passed.

  • Step 7: 커밋
git add _shared/
git commit -m "feat(_shared): access_log 공용 모듈 추가 (ring buffer + middleware + /logs/recent)"

Task 2: lotto/app/main.pyinstall_access_log(app) 추가

Files:

  • Modify: web-backend/lotto/app/main.py

  • Step 1: lotto/app/main.py 상단 import 부분 확인

Run:

head -30 web-backend/lotto/app/main.py
  • Step 2: app = FastAPI(...) 호출 직후 위치에 install_access_log 호출 추가

lotto/app/main.py 의 FastAPI 인스턴스 생성 직후 (CORS middleware 추가 자리쯤) 에 다음 두 줄 삽입:

from _shared.access_log import install as install_access_log
install_access_log(app)
  • Step 3: 커밋
git add lotto/app/main.py
git commit -m "feat(lotto): _shared/access_log install (Phase 1 PoC)"

Task 3: docker-compose.yml lotto 서비스에 PYTHONPATH + volume + logging 추가

Files:

  • Modify: web-backend/docker-compose.yml (lotto 서비스 블록만)

  • Step 1: 현재 lotto 서비스 블록 확인

위에서 확인된 라인 (3-23). environment, volumes 섹션을 확장.

  • Step 2: lotto 서비스에 3가지 변경 적용

docker-compose.ymllotto: 서비스 블록에서 다음 변경:

  lotto:
    build:
      context: ./lotto
      args:
        APP_VERSION: ${APP_VERSION:-dev}
    container_name: lotto
    restart: unless-stopped
    ports:
      - "18000:8000"
    environment:
      - TZ=${TZ:-Asia/Seoul}
      - LOTTO_ALL_URL=${LOTTO_ALL_URL:-https://smok95.github.io/lotto/results/all.json}
      - LOTTO_LATEST_URL=${LOTTO_LATEST_URL:-https://smok95.github.io/lotto/results/latest.json}
      - PYTHONPATH=/app:/shared        # NEW
    volumes:
      - ${RUNTIME_PATH}/data:/app/data
      - ./_shared:/shared:ro            # NEW
    logging:                            # NEW
      driver: "json-file"
      options:
        max-size: "10m"
        max-file: "3"
    healthcheck:
      test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"]
      interval: 60s
      timeout: 5s
      retries: 3
  • Step 3: docker-compose 문법 검증 (NAS에서만 docker 구동 가능하므로 lint만)

Run (로컬에서):

cd web-backend && python -c "import yaml; yaml.safe_load(open('docker-compose.yml'))"

Expected: 에러 없음.

  • Step 4: 커밋
git add docker-compose.yml
git commit -m "build(lotto): PYTHONPATH=_shared + json-file logging 추가 (Phase 1 PoC)"

Task 4: agent-office/app/config.pyAGENT_CONTAINER_MAP 추가

Files:

  • Modify: web-backend/agent-office/app/config.py

  • Step 1: 현재 config.py 확인

Run:

cat web-backend/agent-office/app/config.py | head -40
  • Step 2: 파일 끝에 매핑 상수 추가

agent-office/app/config.py 끝에 다음 추가:

import re as _re

# 에이전트 → (container_host, port, path_prefix_regex)
# path_prefix_regex: lotto-backend 컨테이너에 personal/blog/todo 도 같이 있어
# /api/lotto 만 골라내기 위한 정규식. business log (source='log') 는 모두 통과.
AGENT_CONTAINER_MAP: dict[str, tuple[str, int, _re.Pattern]] = {
    "lotto": ("lotto", 8000, _re.compile(r"^/api/lotto")),
    # Phase 2 에서 추가:
    # "stock":      ("stock",          8000, _re.compile(r"^/api/(stock|trade|portfolio)")),
    # "music":      ("music-lab",      8000, _re.compile(r"^/api/music")),
    # "insta":      ("insta-lab",      8000, _re.compile(r"^/api/insta")),
    # "realestate": ("realestate-lab", 8000, _re.compile(r"^/api/realestate")),
}
  • Step 3: 커밋
git add agent-office/app/config.py
git commit -m "feat(agent-office): AGENT_CONTAINER_MAP 상수 추가 (Phase 1 lotto)"

Task 5: agent-office/app/service_proxy.pyfetch_service_logs + 테스트

Files:

  • Modify: web-backend/agent-office/app/service_proxy.py

  • Create: web-backend/agent-office/tests/test_service_proxy_logs.py

  • Step 1: 실패하는 테스트 작성

agent-office/tests/test_service_proxy_logs.py:

import pytest
import respx
import httpx

from app.service_proxy import fetch_service_logs


@pytest.mark.asyncio
@respx.mock
async def test_fetch_service_logs_filters_by_path_prefix():
    # lotto 컨테이너 응답: lotto + personal 섞임
    respx.get("http://lotto:8000/logs/recent").mock(
        return_value=httpx.Response(200, json={
            "logs": [
                {"ts": "2026-05-28T10:00:00Z", "source": "access",
                 "method": "GET", "path": "/api/lotto/recommend",
                 "status": 200, "ms": 12,
                 "message": "GET /api/lotto/recommend → 200 (12ms)"},
                {"ts": "2026-05-28T10:00:01Z", "source": "access",
                 "method": "GET", "path": "/api/blog/posts",
                 "status": 200, "ms": 5,
                 "message": "GET /api/blog/posts → 200 (5ms)"},
                {"ts": "2026-05-28T10:00:02Z", "source": "log",
                 "logger": "lotto", "level": "info",
                 "message": "성과 통계 캐시 갱신"},
            ]
        })
    )

    result = await fetch_service_logs("lotto", limit=50)
    # lotto path 와 모든 log 이벤트만 통과
    paths = [x.get("path") for x in result]
    assert "/api/lotto/recommend" in paths
    assert "/api/blog/posts" not in paths
    # 비즈니스 로그도 포함
    assert any(x["source"] == "log" and x["message"] == "성과 통계 캐시 갱신"
               for x in result)


@pytest.mark.asyncio
async def test_fetch_service_logs_unknown_agent_returns_empty():
    result = await fetch_service_logs("nonexistent", limit=50)
    assert result == []


@pytest.mark.asyncio
@respx.mock
async def test_fetch_service_logs_handles_connection_error():
    respx.get("http://lotto:8000/logs/recent").mock(
        side_effect=httpx.ConnectError("connection refused")
    )
    result = await fetch_service_logs("lotto", limit=50)
    assert result == []
  • Step 2: 테스트 실행해서 실패 확인

Run:

cd web-backend/agent-office && python -m pytest tests/test_service_proxy_logs.py -v

Expected: FAIL — ImportError: cannot import name 'fetch_service_logs'

respx 미설치 시:

pip install respx pytest-asyncio
  • Step 3: service_proxy.py 끝에 함수 추가
# 파일 상단에 logging import 추가
import logging
logger = logging.getLogger(__name__)

# 파일 하단에 추가:

from .config import AGENT_CONTAINER_MAP


async def fetch_service_logs(
    agent_id: str,
    since: Optional[str] = None,
    limit: int = 200,
) -> List[Dict[str, Any]]:
    """해당 에이전트가 가리키는 컨테이너의 /logs/recent 를 호출해서
    path_prefix 정규식으로 필터한 결과를 반환.

    네트워크 실패 시 빈 리스트를 반환하고 warning 만 남김 (LogTab 이 죽지 않게).
    """
    mapping = AGENT_CONTAINER_MAP.get(agent_id)
    if not mapping:
        return []
    host, port, path_re = mapping
    url = f"http://{host}:{port}/logs/recent"
    params: Dict[str, Any] = {"limit": limit}
    if since:
        params["since"] = since
    try:
        async with httpx.AsyncClient(timeout=3.0) as client:
            resp = await client.get(url, params=params)
            data = resp.json().get("logs", [])
    except Exception as e:
        logger.warning("fetch_service_logs(%s) 실패: %s", agent_id, e)
        return []
    return [
        x for x in data
        if x.get("source") == "log"
        or path_re.match(x.get("path", "") or "")
    ]
  • Step 4: 테스트 통과 확인

Run:

cd web-backend/agent-office && python -m pytest tests/test_service_proxy_logs.py -v

Expected: PASS — 3 passed.

  • Step 5: 커밋
git add agent-office/app/service_proxy.py agent-office/tests/test_service_proxy_logs.py
git commit -m "feat(agent-office): fetch_service_logs 추가 (path_prefix 정규식 필터)"

Task 6: agent-office/app/db.py get_logs 필터 + delete_old_logs + 테스트

Files:

  • Modify: web-backend/agent-office/app/db.py

  • Modify: web-backend/agent-office/app/test_db.py

  • Step 1: 실패하는 테스트 추가

agent-office/app/test_db.py 끝에 추가:

def test_get_logs_excludes_state_messages():
    add_log("stock", "State: idle -> working (큐레이션 시작)")
    add_log("stock", "뉴스 12건 스크랩 완료")
    add_log("stock", "State: working -> idle ()")

    logs = get_logs("stock", limit=10)
    messages = [x["message"] for x in logs]
    assert "뉴스 12건 스크랩 완료" in messages
    assert not any(m.startswith("State: ") for m in messages)


def test_delete_old_logs_removes_beyond_retention():
    import datetime as _dt
    from app.db import delete_old_logs, _conn

    add_log("stock", "오래된 로그")
    # 강제로 200일 전으로 옮김
    cutoff = (_dt.datetime.utcnow() - _dt.timedelta(days=200)).isoformat()
    with _conn() as conn:
        conn.execute(
            "UPDATE agent_logs SET created_at = ? WHERE message = '오래된 로그'",
            (cutoff,),
        )

    add_log("stock", "최근 로그")
    deleted = delete_old_logs(days=90)
    assert deleted >= 1

    msgs = [x["message"] for x in get_logs("stock", limit=20)]
    assert "최근 로그" in msgs
    assert "오래된 로그" not in msgs
  • Step 2: 테스트 실행해서 실패 확인

Run:

cd web-backend/agent-office && python -m pytest app/test_db.py -v -k "state or old_logs"

Expected: FAIL — get_logs 가 State 포함, delete_old_logs 미정의.

  • Step 3: db.py 수정 — get_logs 에 NOT LIKE 필터 추가

db.pyget_logs 함수 SQL 을:

def get_logs(agent_id: str, limit: int = 50) -> List[Dict[str, Any]]:
    with _conn() as conn:
        rows = conn.execute(
            """
            SELECT * FROM agent_logs
            WHERE agent_id = ?
              AND message NOT LIKE 'State: %'
            ORDER BY created_at DESC
            LIMIT ?
            """,
            (agent_id, limit),
        ).fetchall()
    return [
        {
            "id": r["id"],
            "agent_id": r["agent_id"],
            "task_id": r["task_id"],
            "level": r["level"],
            "message": r["message"],
            "created_at": r["created_at"],
            "source": "agent",
        }
        for r in rows
    ]

(source: "agent" 필드도 추가 — 통합 타임라인에서 source 구분용)

  • Step 4: db.py 끝에 delete_old_logs 함수 추가
import datetime as _dt


def delete_old_logs(days: int = 90) -> int:
    """retention 정책: 90일 이전 agent_logs 삭제. 매일 03:00 스케줄러가 호출."""
    cutoff = (_dt.datetime.utcnow() - _dt.timedelta(days=days)).isoformat()
    with _conn() as conn:
        c = conn.execute(
            "DELETE FROM agent_logs WHERE created_at < ?",
            (cutoff,),
        )
        return c.rowcount
  • Step 5: 테스트 통과 확인

Run:

cd web-backend/agent-office && python -m pytest app/test_db.py -v

Expected: PASS — 기존 + 추가 2개 모두 통과.

  • Step 6: 커밋
git add agent-office/app/db.py agent-office/app/test_db.py
git commit -m "feat(agent-office/db): get_logs에서 State: 자동 로그 제외 + delete_old_logs(90일)"

Task 7: agent-office/app/main.py agent_logs 엔드포인트 merge + 테스트

Files:

  • Modify: web-backend/agent-office/app/main.py

  • Create: web-backend/agent-office/tests/test_log_merge.py

  • Step 1: 실패하는 테스트 작성

agent-office/tests/test_log_merge.py:

import pytest
import respx
import httpx
from fastapi.testclient import TestClient

from app.main import app
from app.db import add_log, _conn


@pytest.fixture(autouse=True)
def _clean_logs():
    with _conn() as conn:
        conn.execute("DELETE FROM agent_logs WHERE agent_id = 'lotto'")
    yield


@respx.mock
def test_agent_logs_endpoint_merges_db_and_service_logs():
    add_log("lotto", "큐레이션 완료: #1234 conf=0.78")
    respx.get("http://lotto:8000/logs/recent").mock(
        return_value=httpx.Response(200, json={
            "logs": [
                {"ts": "2026-05-28T10:00:00Z", "source": "access",
                 "method": "GET", "path": "/api/lotto/latest",
                 "status": 200, "ms": 8,
                 "message": "GET /api/lotto/latest → 200 (8ms)"},
                {"ts": "2026-05-28T10:00:02Z", "source": "log",
                 "logger": "lotto", "level": "info",
                 "message": "성과 통계 캐시 갱신"},
            ]
        })
    )

    client = TestClient(app)
    resp = client.get("/api/agent-office/agents/lotto/logs?limit=20")
    assert resp.status_code == 200
    logs = resp.json()["logs"]

    sources = {x["source"] for x in logs}
    assert "agent" in sources
    assert "access" in sources
    assert "log" in sources

    messages = [x["message"] for x in logs]
    assert any("큐레이션 완료" in m for m in messages)
    assert any("성과 통계 캐시 갱신" in m for m in messages)
    assert any("/api/lotto/latest" in m for m in messages)
  • Step 2: 테스트 실행해서 실패 확인

Run:

cd web-backend/agent-office && python -m pytest tests/test_log_merge.py -v

Expected: FAIL — 현재 엔드포인트는 service 로그를 fetch 하지 않음.

  • Step 3: agent-office/app/main.pyagent_logs 엔드포인트 교체

기존 (line 118-120):

@app.get("/api/agent-office/agents/{agent_id}/logs")
def agent_logs(agent_id: str, limit: int = 50):
    return {"logs": get_logs(agent_id, limit)}

→ 다음으로 교체:

@app.get("/api/agent-office/agents/{agent_id}/logs")
async def agent_logs(agent_id: str, limit: int = 50):
    from .service_proxy import fetch_service_logs

    agent_items = get_logs(agent_id, limit=limit)
    service_items = await fetch_service_logs(agent_id, limit=limit)

    def _sort_key(x):
        # agent_logs: created_at, service: ts
        return x.get("ts") or x.get("created_at") or ""

    merged = sorted(agent_items + service_items, key=_sort_key, reverse=True)
    return {"logs": merged[:limit]}
  • Step 4: 테스트 통과 확인

Run:

cd web-backend/agent-office && python -m pytest tests/test_log_merge.py -v

Expected: PASS.

  • Step 5: 커밋
git add agent-office/app/main.py agent-office/tests/test_log_merge.py
git commit -m "feat(agent-office): /agents/{id}/logs 엔드포인트가 service /logs/recent 와 merge"

Task 8: agent-office/app/agents/base.py transition 의 add_log 제거

Files:

  • Modify: web-backend/agent-office/app/agents/base.py

  • Step 1: 현재 transition 메서드 확인

base.pytransition() 메서드 (line 22-43) 의 32번 라인:

add_log(self.agent_id, f"State: {old} -> {new_state} ({detail})")
  • Step 2: 해당 라인 삭제

add_log(self.agent_id, f"State: ...") 라인 1줄과 위쪽 빈 줄 1줄을 제거. 또한 파일 상단의 from ..db import add_log 도 다른 곳에서 더 이상 쓰지 않으면 제거.

(주의: db.py 의 add_log 자체는 다른 모듈이 계속 쓰므로 함수는 유지)

import time
from typing import Optional

VALID_STATES = ("idle", "working", "waiting", "reporting")

class BaseAgent:
    agent_id: str = ""
    display_name: str = ""
    state: str = "idle"
    state_detail: str = ""
    _idle_since: float = 0.0
    _ws_manager = None

    def __init__(self):
        self._idle_since = time.time()

    def set_ws_manager(self, manager):
        self._ws_manager = manager

    async def transition(self, new_state: str, detail: str = "", task_id: str = None) -> None:
        if new_state not in VALID_STATES:
            return
        old = self.state
        self.state = new_state
        self.state_detail = detail

        if new_state == "idle":
            self._idle_since = time.time()

        if self._ws_manager:
            await self._ws_manager.send_agent_state(self.agent_id, new_state, detail, task_id)
            if new_state == "working" and old != "working":
                await self._ws_manager.send_notification(
                    self.agent_id, "task_assigned", task_id, detail or "새 작업 시작"
                )
            elif new_state == "idle" and old in ("working", "reporting"):
                await self._ws_manager.send_notification(
                    self.agent_id, "task_completed", task_id, detail or "작업 완료"
                )
    # ... 나머지 메서드는 그대로
  • Step 3: 기존 agent_logs DB 의 state 로그 일괄 삭제 (선택, 일회성)

운영 DB 에 누적된 State: ... 행을 한 번 정리.

Run (NAS 에서 또는 로컬 DB 에서):

sqlite3 path/to/agent_office.db "DELETE FROM agent_logs WHERE message LIKE 'State: %'"

(이 작업은 plan 진행자가 NAS 접근권한 있을 때 직접 수행. CI 에서는 생략.)

  • Step 4: 기존 테스트가 깨지지 않는지 전체 확인

Run:

cd web-backend/agent-office && python -m pytest -v

Expected: 기존 모든 테스트 통과.

  • Step 5: 커밋
git add agent-office/app/agents/base.py
git commit -m "refactor(agent-office/base): transition의 State 자동 로그 제거"

Task 9: web-ui/src/pages/agent-office/components/LogTab.jsx source 뱃지 + access 메타데이터

Files:

  • Modify: web-ui/src/pages/agent-office/components/LogTab.jsx

  • Step 1: 새 LogTab.jsx 작성

web-ui/src/pages/agent-office/components/LogTab.jsx 전체를 다음으로 교체:

// src/pages/agent-office/components/LogTab.jsx
import { useState, useEffect, useRef } from 'react';
import { getAgentLogs } from '../../../api';

const LEVEL_STYLE = {
  info:    { color: '#60a5fa' },
  warning: { color: '#fbbf24' },
  error:   { color: '#ef4444' },
};

const SOURCE_STYLE = {
  agent:  { color: '#9ca3af', label: 'AGENT' },
  access: { color: '#5eead4', label: 'ACCESS' },
  log:    { color: '#a78bfa', label: 'LOG' },
};

function formatTime(iso) {
  if (!iso) return '';
  return new Date(iso).toLocaleTimeString('ko-KR', {
    hour: '2-digit', minute: '2-digit', second: '2-digit',
  });
}

export default function LogTab({ agentId, refreshTrigger }) {
  const [logs, setLogs] = useState([]);
  const scrollRef = useRef(null);

  useEffect(() => {
    let cancelled = false;
    const fetchLogs = () => {
      getAgentLogs(agentId, 100).then(data => {
        if (cancelled) return;
        setLogs(Array.isArray(data) ? data : (data?.logs || []));
      }).catch(() => {});
    };
    fetchLogs();
    const interval = setInterval(fetchLogs, 5000);  // 5초 폴링
    return () => { cancelled = true; clearInterval(interval); };
  }, [agentId, refreshTrigger]);

  useEffect(() => {
    if (scrollRef.current) {
      scrollRef.current.scrollTop = scrollRef.current.scrollHeight;
    }
  }, [logs]);

  return (
    <div className="ao-log-tab" ref={scrollRef}>
      {logs.length === 0 && <div className="ao-empty">No logs yet</div>}
      {logs.map((log, i) => {
        const source = log.source || 'agent';
        const sourceMeta = SOURCE_STYLE[source] || SOURCE_STYLE.agent;
        const levelStyle = LEVEL_STYLE[log.level] || LEVEL_STYLE.info;
        const time = formatTime(log.ts || log.created_at);
        return (
          <div key={log.id || `${source}-${i}-${time}`} className="ao-log-item">
            <span className="ao-log-time">{time}</span>
            <span className="ao-log-source" style={{ color: sourceMeta.color }}>
              [{sourceMeta.label}]
            </span>
            <span className="ao-log-level" style={levelStyle}>[{log.level}]</span>
            <span className="ao-log-msg">{log.message}</span>
            {source === 'access' && (
              <span className="ao-log-meta">
                {' '}({log.status} · {log.ms}ms)
              </span>
            )}
          </div>
        );
      })}
    </div>
  );
}
  • Step 2: CSS 가 새 클래스 (ao-log-source, ao-log-meta)를 무난히 표시하는지 확인

web-ui/src/pages/agent-office/AgentOffice.css 의 기존 .ao-log-item 영역에 다음을 추가 (없으면):

.ao-log-source {
  margin-left: 6px;
  font-size: 0.75em;
  font-weight: 600;
  letter-spacing: 0.5px;
}

.ao-log-meta {
  color: #6b7280;
  font-size: 0.85em;
}
  • Step 3: 개발 서버에서 시각 확인

Run:

cd web-ui && npm run dev

브라우저: http://localhost:3007/agent-office → lotto 에이전트 카드 클릭 → 로그 탭에서 [AGENT]/[ACCESS]/[LOG] 뱃지가 보이는지 + access 로그에 status/ms 가 표시되는지.

(주의: 백엔드가 NAS 에서 돌고 있어야 하며, /api 가 NAS 로 프록시되므로 lotto 컨테이너에 Phase 1 변경이 배포된 후 검증 가능. 그 전에는 빈 화면 또는 기존 agent_logs 만 보임.)

  • Step 4: 커밋 (web-ui repo)
cd web-ui
git add src/pages/agent-office/components/LogTab.jsx src/pages/agent-office/AgentOffice.css
git commit -m "feat(agent-office/LogTab): source 뱃지 + access 메타데이터 표시 + 5초 폴링"

Task 10: Phase 1 통합 검증

  • Step 1: 5개 backend 변경을 NAS 에 배포

web-backend 의 변경 사항 (Task 1~8) push:

cd web-backend && git push

Gitea Webhook → deployer 가 rsync + docker compose up 으로 lotto 컨테이너 재시작.

  • Step 2: 컨테이너 내부 /shared 마운트 확인

NAS SSH:

docker exec lotto ls /shared/_shared/access_log.py
docker exec lotto python -c "import _shared.access_log; print(_shared.access_log.__file__)"

Expected: 파일 보이고 import 성공.

  • Step 3: lotto 컨테이너 내부에서 /logs/recent 응답 확인
docker exec lotto curl -s http://localhost:8000/logs/recent | head -c 500

Expected: {"logs": [...]} JSON (이미 healthcheck 외 요청이 있었다면 access 로그 보임).

  • Step 4: agent-office 컨테이너에서 lotto 호출 가능한지 확인
docker exec agent-office curl -s http://lotto:8000/logs/recent | head -c 500

Expected: 위와 동일 JSON.

  • Step 5: web-ui (NAS frontend) 배포 + 시각 검증
cd web-ui && npm run release:nas

브라우저: https://gahusb.synology.me/agent-office → lotto 에이전트 → 로그 탭에서:

  • [AGENT] 뱃지 + 기존 agent 로그 (단, State: ... 항목은 더 이상 안 보여야 함)

  • [ACCESS] 뱃지 + /api/lotto/... 요청 (status, ms 표시)

  • [LOG] 뱃지 + lotto 컨테이너의 logger.info 출력 (예: "성과 통계 캐시 갱신")

  • 시간순 정렬

  • Step 6: Phase 1 종료 — Phase 2 진입 결정

LogTab 화면 캡처 또는 사용자 confirmation 후 Phase 2 진행.


Phase 2 — 나머지 4개 서비스 확장

Task 11: stock 서비스 적용

Files:

  • Modify: web-backend/stock/app/main.py

  • Modify: web-backend/docker-compose.yml (stock 서비스 블록)

  • Modify: web-backend/agent-office/app/config.py

  • Step 1: stock/app/main.py 의 FastAPI 인스턴스 직후에 추가

from _shared.access_log import install as install_access_log
install_access_log(app)
  • Step 2: docker-compose.yml 의 stock 서비스에 환경/볼륨/로깅 3가지 추가 (Task 3 과 동일 패턴)
  stock:
    ...
    environment:
      ...
      - PYTHONPATH=/app:/shared        # NEW
    volumes:
      - ${RUNTIME_PATH}/data/stock:/app/data
      - ./_shared:/shared:ro            # NEW
    logging:                            # NEW
      driver: "json-file"
      options:
        max-size: "10m"
        max-file: "3"
    healthcheck:
      ...
  • Step 3: agent-office/app/config.pyAGENT_CONTAINER_MAP 에 stock 추가
AGENT_CONTAINER_MAP = {
    "lotto":      ("lotto", 8000, _re.compile(r"^/api/lotto")),
    "stock":      ("stock", 8000, _re.compile(r"^/api/(stock|trade|portfolio)")),  # NEW
}
  • Step 4: 커밋
git add stock/app/main.py docker-compose.yml agent-office/app/config.py
git commit -m "feat(stock): _shared/access_log 적용 + AGENT_CONTAINER_MAP 매핑"

Task 12: music-lab 서비스 적용

Files:

  • Modify: web-backend/music-lab/app/main.py

  • Modify: web-backend/docker-compose.yml (music-lab 서비스 블록)

  • Modify: web-backend/agent-office/app/config.py

  • Step 1: music-lab/app/main.py 의 FastAPI 인스턴스 직후에 추가

from _shared.access_log import install as install_access_log
install_access_log(app)
  • Step 2: docker-compose.yml 의 music-lab 서비스에 동일 패턴 추가
  music-lab:
    ...
    environment:
      ...
      - PYTHONPATH=/app:/shared
    volumes:
      ...
      - ./_shared:/shared:ro
    logging:
      driver: "json-file"
      options:
        max-size: "10m"
        max-file: "3"
  • Step 3: AGENT_CONTAINER_MAP 에 music 추가
    "music": ("music-lab", 8000, _re.compile(r"^/api/music")),  # NEW
  • Step 4: 커밋
git add music-lab/app/main.py docker-compose.yml agent-office/app/config.py
git commit -m "feat(music-lab): _shared/access_log 적용 + AGENT_CONTAINER_MAP 매핑"

Task 13: insta-lab 서비스 적용

Files:

  • Modify: web-backend/insta-lab/app/main.py

  • Modify: web-backend/docker-compose.yml (insta-lab 서비스 블록)

  • Modify: web-backend/agent-office/app/config.py

  • Step 1: insta-lab/app/main.py 에 install 추가

from _shared.access_log import install as install_access_log
install_access_log(app)
  • Step 2: docker-compose.yml insta-lab 블록에 동일 패턴 추가
  insta-lab:
    ...
    environment:
      ...
      - PYTHONPATH=/app:/shared
    volumes:
      ...
      - ./_shared:/shared:ro
    logging:
      driver: "json-file"
      options:
        max-size: "10m"
        max-file: "3"
  • Step 3: AGENT_CONTAINER_MAP 에 insta 추가
    "insta": ("insta-lab", 8000, _re.compile(r"^/api/insta")),  # NEW
  • Step 4: 커밋
git add insta-lab/app/main.py docker-compose.yml agent-office/app/config.py
git commit -m "feat(insta-lab): _shared/access_log 적용 + AGENT_CONTAINER_MAP 매핑"

Task 14: realestate-lab 서비스 적용

Files:

  • Modify: web-backend/realestate-lab/app/main.py

  • Modify: web-backend/docker-compose.yml (realestate-lab 서비스 블록)

  • Modify: web-backend/agent-office/app/config.py

  • Step 1: realestate-lab/app/main.py 에 install 추가

from _shared.access_log import install as install_access_log
install_access_log(app)
  • Step 2: docker-compose.yml realestate-lab 블록에 동일 패턴 추가
  realestate-lab:
    ...
    environment:
      ...
      - PYTHONPATH=/app:/shared
    volumes:
      ...
      - ./_shared:/shared:ro
    logging:
      driver: "json-file"
      options:
        max-size: "10m"
        max-file: "3"
  • Step 3: AGENT_CONTAINER_MAP 에 realestate 추가
    "realestate": ("realestate-lab", 8000, _re.compile(r"^/api/realestate")),  # NEW
  • Step 4: 커밋
git add realestate-lab/app/main.py docker-compose.yml agent-office/app/config.py
git commit -m "feat(realestate-lab): _shared/access_log 적용 + AGENT_CONTAINER_MAP 매핑"

Task 15: cleanup 스케줄러 등록

Files:

  • Modify: web-backend/agent-office/app/scheduler.py

  • Step 1: scheduler.py 끝에 import 추가

from apscheduler.triggers.cron import CronTrigger
from .db import delete_old_logs
  • Step 2: scheduler.add_job(...) 가 모여있는 영역에 다음 추가

(scheduler.add_job(_run_stock_schedule, ...) 같은 등록부 근처에)

def _cleanup_old_logs():
    n = delete_old_logs(days=90)
    if n:
        import logging
        logging.getLogger(__name__).info("delete_old_logs: %d rows removed", n)


scheduler.add_job(
    _cleanup_old_logs,
    CronTrigger(hour=3, minute=0, timezone="Asia/Seoul"),
    id="cleanup_old_logs",
    replace_existing=True,
)
  • Step 3: 커밋
git add agent-office/app/scheduler.py
git commit -m "feat(agent-office/scheduler): 매일 03:00 agent_logs 90일 retention cleanup"

Task 16: Phase 2 통합 검증

  • Step 1: web-backend push → NAS 자동 배포
cd web-backend && git push
  • Step 2: 5개 컨테이너 모두 /logs/recent 응답 확인
for svc in lotto stock music-lab insta-lab realestate-lab; do
  echo "=== $svc ==="
  docker exec $svc curl -s http://localhost:8000/logs/recent | head -c 200
  echo
done

Expected: 5개 모두 {"logs": [...]} JSON 응답.

  • Step 3: 5개 에이전트 모두 LogTab 에서 통합 표시 확인

브라우저에서 5개 에이전트 각각 클릭 → 모두 [AGENT]/[ACCESS]/[LOG] 통합 표시.

  • Step 4: cleanup 스케줄러 등록 확인
docker logs agent-office 2>&1 | grep "cleanup_old_logs" | tail -3

Expected: 스케줄러 시작 로그에 cleanup_old_logs 등록.


Phase 3 — 비즈니스 이벤트 보강 (logger.info 추가)

Task 17: stock 비즈니스 이벤트 보강

Files:

  • Modify: web-backend/stock/app/main.py

  • Step 1: order 응답 로깅 추가

stock/app/main.py:250 근처의 /api/trade/order 핸들러:

기존:

    logger.info(f"Order Request: {req.action} {req.ticker} x{req.quantity}")

뒤에 응답 로깅 추가:

        resp = await client.post(...)
        if resp.status_code == 200:
            data = resp.json()
            logger.info(
                "Order Response: %s %s x%s%s (체결가=%s)",
                req.action, req.ticker, req.quantity,
                data.get("status"), data.get("filled_price"),
            )
        else:
            logger.error(f"Order Error: {resp.status_code}")
  • Step 2: AI Coach 호출 로깅 추가

main.py:297 근처 AI Coach 핸들러 시작 부분에:

    logger.info("AI Coach 호출: tickers=%s", tickers[:5])

응답 처리 끝에:

    logger.info("AI Coach 응답: tokens_in=%s tokens_out=%s", usage.get("input"), usage.get("output"))
  • Step 3: 스크리너 호출 로깅 추가

screener 엔드포인트 핸들러 끝에:

    logger.info("Screener 결과: 종목 %d개 선정 (asof=%s)", len(picks), asof)
  • Step 4: 커밋
git add stock/app/main.py
git commit -m "feat(stock): 비즈니스 이벤트 로그 보강 (Order 응답, AI Coach, Screener)"

Task 18: music-lab 비즈니스 이벤트 보강

Files:

  • Modify: web-backend/music-lab/app/main.py

  • Step 1: 음악 생성 시작 로깅

/api/music/generate 핸들러 시작 부분에:

    logger.info("음악 생성 시작: title=%s genre=%s duration=%ss", title, genre, duration_sec)
  • Step 2: 음악 생성 완료 로깅

생성 성공 분기에 (백엔드 callback 또는 polling 완료 시점):

    logger.info("음악 생성 완료: track_id=%s duration=%ss", track_id, duration_sec)
  • Step 3: 커밋
git add music-lab/app/main.py
git commit -m "feat(music-lab): 음악 생성 시작/완료 비즈니스 로그 보강"

Task 19: insta-lab 비즈니스 이벤트 보강

Files:

  • Modify: web-backend/insta-lab/app/main.py

  • Modify: web-backend/insta-lab/app/keyword_extractor.py (또는 동등 모듈)

  • Step 1: 키워드 추출 완료 로깅

keyword_extractor.py 의 추출 함수 끝부분:

    logger.info("키워드 추출 완료: %d개", len(keywords))
  • Step 2: 슬레이트 생성 완료 로깅

/api/insta/slates POST 핸들러 응답 직전:

    logger.info("슬레이트 생성 완료: id=%s 카드=%d", slate.id, len(slate.cards))
  • Step 3: 인스타 발행 결과 로깅

slate render 또는 publish 핸들러 응답 분기:

    logger.info("인스타 발행 결과: slate=%s status=%s", slate_id, status)
  • Step 4: 커밋
git add insta-lab/
git commit -m "feat(insta-lab): 키워드/슬레이트/발행 비즈니스 로그 보강"

Task 20: lotto-backend 비즈니스 이벤트 보강

Files:

  • Modify: web-backend/lotto/app/main.py

  • Modify: web-backend/lotto/app/curator/pipeline.py (또는 AI 큐레이터 호출부)

  • Step 1: AI 큐레이터 호출/응답 로깅

큐레이터 파이프라인 호출부:

    logger.info("AI 큐레이터 호출: model=%s draw=%s", model, draw_no)
    ...
    logger.info(
        "AI 큐레이터 응답: tokens_in=%s tokens_out=%s confidence=%s",
        usage.get("input"), usage.get("output"), result.get("confidence"),
    )
  • Step 2: 점수 계산 완료 로깅

추천 알고리즘 호출 끝부분:

    logger.info("점수 계산 완료: top_picks=%s mean_score=%.3f", picks[:3], mean_score)
  • Step 3: 커밋
git add lotto/
git commit -m "feat(lotto): AI 큐레이터 호출/점수 계산 비즈니스 로그 보강"

Task 21: 최종 검증

  • Step 1: 전체 변경 NAS 배포
cd web-backend && git push
  • Step 2: 각 에이전트 LogTab 에서 비즈니스 이벤트 노출 확인

브라우저에서 5개 에이전트 각각 클릭 후 24시간 모니터링 (또는 즉시 수동 트리거):

  • stock → 시장 시간이면 Order/AI Coach 로그 보임

  • music → /api/music/generate 요청 시 "음악 생성 시작/완료" 보임

  • insta → 매일 새벽 수집 시 "키워드 추출 완료", "슬레이트 생성 완료"

  • realestate → 매일 수집 사이클에 "수집 완료: new=N, total=N" (기존 로그)

  • lotto → 주간 큐레이션 시 "AI 큐레이터 호출/응답", "점수 계산 완료"

  • Step 3: healthcheck 노이즈 차단 확인

docker exec lotto curl -s http://localhost:8000/logs/recent | python -c "
import sys, json
logs = json.load(sys.stdin)['logs']
print('total:', len(logs))
print('health 포함:', any('/health' in (x.get('path') or '') for x in logs))
"

Expected: health 포함: False

  • Step 4: agent_logs 의 'State:' 행이 더 이상 쌓이지 않는지 확인
docker exec agent-office sqlite3 /app/data/agent_office.db \
  "SELECT COUNT(*) FROM agent_logs WHERE message LIKE 'State: %' AND created_at > date('now', '-1 day')"

Expected: 0

  • Step 5: cleanup 스케줄러가 실제로 동작했는지 (1일 이상 경과 후)
docker logs agent-office 2>&1 | grep "delete_old_logs" | tail -5

Expected: 매일 03:00 직후 delete_old_logs: N rows removed 로그.


Self-Review 결과

1. Spec coverage:

  • 5개 서비스 /logs/recent 엔드포인트 → Task 1, 2, 1114
  • AccessLogMiddleware + BufferLogHandler + install() → Task 1
  • ring buffer maxlen=500 → Task 1 (_BUFFER = deque(maxlen=500))
  • healthcheck/static/OPTIONS 제외 → Task 1 (EXCLUDED_*)
  • agent-office fetch_service_logs + AGENT_CONTAINER_MAP → Task 4, 5, 1114
  • get_logs NOT LIKE 'State: %' → Task 6
  • delete_old_logs 90일 → Task 6, scheduler 등록 Task 15
  • base.py State: ... 자동 로그 제거 → Task 8
  • LogTab source 뱃지 + access 메타데이터 → Task 9
  • docker-compose 5개 서비스 PYTHONPATH/volume/logging → Task 3, 1114
  • 비즈니스 이벤트 보강 → Phase 3 (Task 1720)

2. Placeholder scan:

  • "TBD" / "TODO" / "fill in details" 등 없음.
  • 모든 step 에 실제 코드 또는 실제 명령 포함.

3. Type consistency:

  • fetch_service_logs(agent_id, since, limit) Task 5 정의 → Task 7 에서 동일 시그니처 호출. OK.
  • delete_old_logs(days=90) Task 6 정의 → Task 15 에서 동일 시그니처 호출. OK.
  • install(app) Task 1 정의 → Task 2, 1114 에서 동일 호출. OK.
  • log row schema 4가지 (agent, access, log, 그리고 missing source → fallback 'agent') 모두 Task 9 LogTab 에서 처리. OK.

실행 옵션 (이 plan 완료 후)

이 plan 은 21개 task. 다음 두 가지 옵션 중 선택:

1. Subagent-Driven (recommended) — task 마다 fresh subagent 디스패치, task 간 review checkpoint, 빠른 iteration.

2. Inline Execution — 이 세션에서 task 들을 batch checkpoint 로 실행.