import asyncio
import random
import time
from typing import Optional

from ..config import IDLE_BREAK_THRESHOLD, BREAK_DURATION_MIN, BREAK_DURATION_MAX
from ..db import add_log

# The only states transition() accepts; any other value is ignored.
VALID_STATES = ("idle", "working", "waiting", "reporting", "break")


class BaseAgent:
    """Base class for agents driven by a small state machine.

    Tracks the current state, writes every state change to the log via
    ``add_log`` and, when a websocket manager has been attached, forwards
    the change to it. Subclasses are expected to override ``on_schedule``,
    ``on_command`` and ``on_approval``; the versions here raise
    ``NotImplementedError``.
    """

    agent_id: str = ""
    display_name: str = ""
    state: str = "idle"
    state_detail: str = ""
    # time.time() timestamp of the most recent switch to "idle".
    _idle_since: float = 0.0
    # time.time() timestamp after which the current break is over.
    _break_until: float = 0.0
    # Attached through set_ws_manager(); None means nothing is broadcast.
    _ws_manager = None

    def __init__(self):
        """Start the idle timer at construction time."""
        self._idle_since = time.time()

    def set_ws_manager(self, manager) -> None:
        """Attach the manager object that state changes are forwarded to."""
        self._ws_manager = manager

    async def transition(
        self, new_state: str, detail: str = "", task_id: Optional[str] = None
    ) -> None:
        """Move the agent to ``new_state``, log it and broadcast it.

        Args:
            new_state: One of ``VALID_STATES``. Any other value is ignored
                and the call returns without changing anything.
            detail: Free-form description stored in ``state_detail`` and
                included in the log line and the broadcast messages.
            task_id: Task identifier passed through to the manager's
                state and notification messages; may be ``None``.
        """
        # Unknown states are dropped silently, as in the original contract.
        if new_state not in VALID_STATES:
            return

        old = self.state
        self.state = new_state
        self.state_detail = detail

        if new_state == "idle":
            # Restart the idle timer used by check_idle_break().
            self._idle_since = time.time()
        elif new_state == "break":
            duration = random.randint(BREAK_DURATION_MIN, BREAK_DURATION_MAX)
            self._break_until = time.time() + duration

        add_log(self.agent_id, f"State: {old} -> {new_state} ({detail})")

        manager = self._ws_manager
        if not manager:
            return

        await manager.send_agent_state(self.agent_id, new_state, detail, task_id)

        if new_state == "working" and old != "working":
            await manager.send_notification(
                self.agent_id, "task_assigned", task_id, detail or "새 작업 시작"
            )
        elif new_state == "idle" and old in ("working", "reporting"):
            await manager.send_notification(
                self.agent_id, "task_completed", task_id, detail or "작업 완료"
            )

        if new_state == "break":
            await manager.send_agent_move(self.agent_id, "break_room")
        elif old == "break":
            # Leaving a break for ANY other state returns the agent to its
            # desk. Previously only break -> idle did, so an agent pulled
            # straight from a break into work was never moved back.
            await manager.send_agent_move(self.agent_id, "desk")

    async def check_idle_break(self) -> None:
        """Start or end a break depending on elapsed time.

        While idle for longer than ``IDLE_BREAK_THRESHOLD`` seconds, each
        call has a 50% chance of sending the agent on a break with a
        randomly chosen label. While on a break, the agent returns to idle
        once ``_break_until`` has passed.
        """
        now = time.time()
        if self.state == "idle" and (now - self._idle_since) > IDLE_BREAK_THRESHOLD:
            # Coin flip so agents do not all go on break on the same tick.
            if random.random() < 0.5:
                break_type = random.choice(["커피 타임", "잠깐 산책", "졸고 있음"])
                await self.transition("break", break_type)
        elif self.state == "break" and now > self._break_until:
            await self.transition("idle", "휴식 완료")

    async def on_schedule(self) -> None:
        """Hook for scheduled work; must be overridden by subclasses."""
        raise NotImplementedError

    async def on_command(self, command: str, params: dict) -> dict:
        """Hook for handling a command; must be overridden by subclasses."""
        raise NotImplementedError

    async def on_approval(self, task_id: str, approved: bool, feedback: str = "") -> None:
        """Hook for an approval decision; must be overridden by subclasses."""
        raise NotImplementedError

    async def get_status(self) -> dict:
        """Return the agent's identity and current state as a dict."""
        return {
            "agent_id": self.agent_id,
            "display_name": self.display_name,
            "state": self.state,
            "detail": self.state_detail,
        }