Files
web-page-backend/agent-office/app/agents/base.py
gahusb de8adaeadd refactor(agent-office): drop the random idle→break→idle cycle
The pixel-office game UI is gone, so simulating coffee-break /
nap / walk states no longer serves any purpose. Remove:
- scheduler's _check_idle_breaks job (no more 60s idle scan)
- BaseAgent.check_idle_break() and _break_until field
- 'break' from VALID_STATES and from transition() branches
- IDLE_BREAK_THRESHOLD / BREAK_DURATION_MIN / BREAK_DURATION_MAX
  config knobs
- 'idle/break' guard in each agent's on_schedule (now just 'idle')

Agents now sit in 'idle' between scheduled jobs and explicit
commands. Display reads 'Idle' instead of churning between idle
and break.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-18 08:44:50 +09:00

61 lines
1.9 KiB
Python

import time
from typing import Optional
from ..db import add_log
VALID_STATES = ("idle", "working", "waiting", "reporting")
class BaseAgent:
agent_id: str = ""
display_name: str = ""
state: str = "idle"
state_detail: str = ""
_idle_since: float = 0.0
_ws_manager = None
def __init__(self):
self._idle_since = time.time()
def set_ws_manager(self, manager):
self._ws_manager = manager
async def transition(self, new_state: str, detail: str = "", task_id: str = None) -> None:
if new_state not in VALID_STATES:
return
old = self.state
self.state = new_state
self.state_detail = detail
if new_state == "idle":
self._idle_since = time.time()
add_log(self.agent_id, f"State: {old} -> {new_state} ({detail})")
if self._ws_manager:
await self._ws_manager.send_agent_state(self.agent_id, new_state, detail, task_id)
if new_state == "working" and old != "working":
await self._ws_manager.send_notification(
self.agent_id, "task_assigned", task_id, detail or "새 작업 시작"
)
elif new_state == "idle" and old in ("working", "reporting"):
await self._ws_manager.send_notification(
self.agent_id, "task_completed", task_id, detail or "작업 완료"
)
async def on_schedule(self) -> None:
raise NotImplementedError
async def on_command(self, command: str, params: dict) -> dict:
raise NotImplementedError
async def on_approval(self, task_id: str, approved: bool, feedback: str = "") -> None:
raise NotImplementedError
async def get_status(self) -> dict:
return {
"agent_id": self.agent_id,
"display_name": self.display_name,
"state": self.state,
"detail": self.state_detail,
}