Chapter 57: Agent State Machine Design
An Agent without state management is like an amnesiac detective—waking up each time with no memory of what case it was investigating. The state machine is the skeleton of autonomous Agent operation, determining what the Agent "knows it's doing" at any moment.
57.1 Why Agents Need State Machines
Modern LLM Agents are far more complex than simple question-answer systems. They must maintain coherence across multiple tool calls, multi-turn conversations, and asynchronous tasks. Without explicit state management, Agents face several critical problems:
Problem 1: Unpredictable Behavior. An Agent might restart planning mid-execution, or issue new tool calls while waiting for previous ones to return.
Problem 2: Non-isolated Errors. A single tool failure causes the entire Agent to crash, rather than gracefully entering an error-handling flow.
Problem 3: Poor Debuggability. Without state records, developers cannot reproduce issues or determine which phase the Agent was in when it failed.
The FSM Value Proposition: A Finite State Machine gives an Agent a formal behavioral framework—at any moment, the Agent exists in exactly one deterministic state, with transitions triggered by explicit conditions. This transforms Agent behavior from "black box magic" into "auditable process."
Hermes Agent State Requirements
Hermes Agent, as an autonomous execution framework, follows this typical task flow:
User Input → Understand Task → Create Plan → Execute Tools Stepwise → Process Results → Aggregate Output
This flow contains multiple critical nodes requiring state tracking:
- During planning, new user input should not be processed
- Before tool results return, the Agent should be in a waiting state
- When errors occur, it should enter error handling rather than continue execution
- Upon completion, resources should be released
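The requirements above boil down to one rule: each state accepts only certain events and ignores the rest. The following is a minimal sketch of that rule, not Hermes Agent code. `ACCEPTED_EVENTS` and `accepts` are hypothetical names, and the state and event names are illustrative.

```python
from typing import Dict, Set

# Hypothetical mapping from each state to the events it is willing to handle.
# The names are illustrative assumptions, not part of any Hermes Agent API.
ACCEPTED_EVENTS: Dict[str, Set[str]] = {
    "idle": {"user_input"},
    "planning": {"plan_generated", "error_occurred"},
    "executing": {"tool_called", "all_steps_done", "error_occurred"},
    "waiting": {"result_received", "timeout"},
    "error": {"retry_triggered", "abort_triggered"},
    "completed": {"task_reset"},
}


def accepts(state: str, event: str) -> bool:
    """Return True if `event` may be processed while the agent is in `state`."""
    return event in ACCEPTED_EVENTS.get(state, set())
```

With this mapping, `accepts("planning", "user_input")` is false, so new input arriving during planning is not processed, and `accepts("waiting", "tool_called")` is false, so no second tool call is issued while a result is pending.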
57.2 FSM Core Concepts
An FSM consists of the following elements:
| Element | Description | Hermes Example |
|---|---|---|
| State | Current phase of the Agent | PLANNING, EXECUTING |
| Event | Condition triggering a state change | tool_result_received, error_occurred |
| Transition | Rule governing state changes | EXECUTING → WAITING (after tool call) |
| Action | Side effect on state entry/exit | Write to errors.log on entering ERROR |
| Guard | Precondition for a transition | Only enter EXECUTING if plan is non-empty |
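To make the five elements concrete, here is a small sketch in plain Python that models one table row as a `Transition` record with an optional guard and action. The names `Transition`, `fire`, `has_plan`, and `log_plan` are hypothetical and chosen only for illustration; they are not taken from Hermes Agent or from any library.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List

Context = Dict[str, Any]


def always(ctx: Context) -> bool:
    """Default guard: the transition is always allowed."""
    return True


def no_action(ctx: Context) -> None:
    """Default action: no side effect."""
    return None


@dataclass(frozen=True)
class Transition:
    """One FSM rule: source state, event, destination, guard, and action."""
    source: str
    event: str
    dest: str
    guard: Callable[[Context], bool] = always
    action: Callable[[Context], None] = no_action


def fire(state: str, event: str, ctx: Context, table: List[Transition]) -> str:
    """Return the next state; stay in `state` if nothing matches or the guard fails."""
    for t in table:
        if t.source == state and t.event == event and t.guard(ctx):
            t.action(ctx)  # side effect runs only when the transition is taken
            return t.dest
    return state


def has_plan(ctx: Context) -> bool:
    """Guard: only enter EXECUTING if the plan is non-empty."""
    return bool(ctx.get("plan_steps"))


def log_plan(ctx: Context) -> None:
    """Action: record that the plan was accepted."""
    ctx.setdefault("log", []).append("plan accepted")


TABLE = [Transition("planning", "plan_generated", "executing",
                    guard=has_plan, action=log_plan)]
```

Calling `fire("planning", "plan_generated", {"plan_steps": []}, TABLE)` leaves the machine in `planning` because the guard rejects an empty plan. With a non-empty plan the same call returns `executing` and the action runs once.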
FSM vs. Alternative Approaches
Hermes Agent executes mostly sequentially, with only occasional concurrency. A flat FSM fits that profile well: it gives clear state boundaries, is easy to test, and maps directly onto a diagram, without the added complexity of statecharts or behavior trees.
57.3 State Definitions
The Six Core States
| State | Purpose | Entry Condition |
|---|---|---|
| IDLE | Awaiting input, minimal resource usage | Initialization / task reset |
| PLANNING | LLM analyzing task, generating execution plan | User input received |
| EXECUTING | Actively calling tools or running code | Plan generation complete |
| WAITING | Awaiting external response (tool/API/user) | Tool call dispatched |
| ERROR | Handling exception, logging failure | Tool failure / timeout / parse error |
| COMPLETED | Task done, preparing output and reset | All steps finished |
State Context Data Model
```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional, List, Dict, Any
from enum import Enum


class AgentState(Enum):
    IDLE = "idle"
    PLANNING = "planning"
    EXECUTING = "executing"
    WAITING = "waiting"
    ERROR = "error"
    COMPLETED = "completed"


@dataclass
class StateContext:
    """State context: records key information about the current state"""
    state: AgentState
    entered_at: datetime = field(default_factory=datetime.now)

    # PLANNING state data
    task_description: Optional[str] = None
    plan_steps: List[str] = field(default_factory=list)

    # EXECUTING state data
    current_step_index: int = 0
    current_tool: Optional[str] = None

    # WAITING state data
    waiting_for: Optional[str] = None
    timeout_seconds: int = 30

    # ERROR state data
    error_type: Optional[str] = None
    error_message: Optional[str] = None
    retry_count: int = 0
    max_retries: int = 3

    # COMPLETED state data
    result: Optional[Any] = None
    total_tokens_used: int = 0
    session_id: str = ""

    def time_in_state(self) -> float:
        """Seconds elapsed since the current state was entered."""
        return (datetime.now() - self.entered_at).total_seconds()
```
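The `entered_at` and `timeout_seconds` fields are enough to detect a stalled WAITING state. The helper below is a sketch of that check. `has_timed_out` is a hypothetical function, not part of the data model above, and it takes an optional `now` argument only so the check can be tested without waiting in real time.

```python
from datetime import datetime, timedelta
from typing import Optional


def has_timed_out(entered_at: datetime, timeout_seconds: float,
                  now: Optional[datetime] = None) -> bool:
    """Return True once more than `timeout_seconds` have passed since `entered_at`."""
    now = now or datetime.now()
    return (now - entered_at).total_seconds() > timeout_seconds


# Example with fixed timestamps: 31 seconds have elapsed against a 30-second limit.
start = datetime(2024, 1, 1, 12, 0, 0)
expired = has_timed_out(start, 30, now=start + timedelta(seconds=31))
```

A polling loop or scheduler could call such a check periodically and raise the timeout event when it returns true.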
57.4 Transition Rules and Trigger Conditions
State Transition Diagram
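```text
              user_input
  IDLE ───────────────────────► PLANNING ◄──── plan_revised ────┐
  ▲ ▲                              │                            │
  │ │ task_reset                   │ plan_generated             │
  │ │                              ▼                            │
  │ │           all_steps_done              tool_called ►       │
  │ └─ COMPLETED ◄──────────── EXECUTING ◄─────────────────► WAITING
  │                               │ ▲     ◄ result_received     │
  │                error_occurred │ │ retry                     │
  │                               ▼ │                           │
  └────────── abort ──────────── ERROR ◄─────── timeout ────────┘
```
PLANNING also moves to ERROR on error_occurred; that edge is left out of the drawing to avoid crossing lines. The retry and abort edges correspond to the retry_triggered and abort_triggered events in the transition table.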
Transition Table
| From State | Event | Guard | To State | Action |
|---|---|---|---|---|
| IDLE | user_input | task not empty | PLANNING | Initialize context |
| PLANNING | plan_generated | plan not empty | EXECUTING | Log plan to agent.log |
| PLANNING | error_occurred | — | ERROR | Write to errors.log |
| EXECUTING | tool_called | tool registered | WAITING | Record tool call ID |
| EXECUTING | all_steps_done | — | COMPLETED | Compile results |
| EXECUTING | error_occurred | — | ERROR | Write to errors.log |
| WAITING | result_received | — | EXECUTING | Inject result into context |
| WAITING | timeout | — | ERROR | Log timeout |
| WAITING | plan_revised | — | PLANNING | Log transition |
| ERROR | retry_triggered | retry < max | EXECUTING | Reset tool call |
| ERROR | abort_triggered | retry >= max | IDLE | Clean resources |
| COMPLETED | task_reset | — | IDLE | Clean session |
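A transition table is easy to sanity-check mechanically. The sketch below restates the From State, Event, and To State columns as tuples (guards omitted) and checks two properties: every state is reachable from IDLE, and no state is a dead end. `ROWS`, `reachable_states`, and `dead_end_states` are hypothetical names introduced for this illustration.

```python
from collections import deque
from typing import Dict, Iterable, List, Set, Tuple

Row = Tuple[str, str, str]  # (from_state, event, to_state)

ROWS: List[Row] = [
    ("IDLE", "user_input", "PLANNING"),
    ("PLANNING", "plan_generated", "EXECUTING"),
    ("PLANNING", "error_occurred", "ERROR"),
    ("EXECUTING", "tool_called", "WAITING"),
    ("EXECUTING", "all_steps_done", "COMPLETED"),
    ("EXECUTING", "error_occurred", "ERROR"),
    ("WAITING", "result_received", "EXECUTING"),
    ("WAITING", "timeout", "ERROR"),
    ("WAITING", "plan_revised", "PLANNING"),  # replanning edge from the diagram
    ("ERROR", "retry_triggered", "EXECUTING"),
    ("ERROR", "abort_triggered", "IDLE"),
    ("COMPLETED", "task_reset", "IDLE"),
]


def reachable_states(rows: Iterable[Row], start: str = "IDLE") -> Set[str]:
    """Breadth-first search over the transition graph, starting at `start`."""
    graph: Dict[str, Set[str]] = {}
    for src, _event, dst in rows:
        graph.setdefault(src, set()).add(dst)
    seen, queue = {start}, deque([start])
    while queue:
        for nxt in graph.get(queue.popleft(), set()):
            if nxt not in seen:
                seen.add(nxt)
                queue.append(nxt)
    return seen


def dead_end_states(rows: Iterable[Row]) -> Set[str]:
    """States that appear as a destination but have no outgoing transition."""
    rows = list(rows)
    sources = {src for src, _event, _dst in rows}
    destinations = {dst for _src, _event, dst in rows}
    return destinations - sources
```

For these rows, `reachable_states(ROWS)` contains all six states and `dead_end_states(ROWS)` is empty, so the Agent can always get back to IDLE.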
57.5 Python Implementation with the transitions Library
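```bash
# The diagrams extra installs transitions plus its graph-drawing dependency.
# Quoting keeps shells such as zsh from expanding the brackets.
pip install "transitions[diagrams]"
```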
```python
import logging
import json
from datetime import datetime
from transitions import Machine, State

agent_logger = logging.getLogger('agent')
error_logger = logging.getLogger('errors')


class HermesAgentFSM:
    """
    Hermes Agent Finite State Machine
    Implements state management using the transitions library
    """

    STATES = [
        State('idle', on_enter='on_enter_idle'),
        State('planning', on_enter='on_enter_planning'),
        State('executing', on_enter='on_enter_executing'),
        State('waiting', on_enter='on_enter_waiting', on_exit='on_exit_waiting'),
        State('error', on_enter='on_enter_error'),
        State('completed', on_enter='on_enter_completed'),
    ]

    TRANSITIONS = [
        {'trigger': 'receive_task', 'source': 'idle', 'dest': 'planning',
         'conditions': ['is_task_valid'], 'before': 'log_transition', 'after': 'initialize_context'},
        {'trigger': 'plan_ready', 'source': 'planning', 'dest': 'executing',
         'conditions': ['has_valid_plan'], 'before': 'log_transition'},
        {'trigger': 'planning_failed', 'source': 'planning', 'dest': 'error',
         'before': 'log_transition'},
        {'trigger': 'call_tool', 'source': 'executing', 'dest': 'waiting',
         'conditions': ['is_tool_registered'], 'before': ['log_transition', 'record_tool_call']},
        {'trigger': 'finish_execution', 'source': 'executing', 'dest': 'completed',
         'before': 'log_transition', 'after': 'compile_results'},
        {'trigger': 'execution_error', 'source': 'executing', 'dest': 'error',
         'before': 'log_transition'},
        {'trigger': 'tool_result_received', 'source': 'waiting', 'dest': 'executing',
         'before': ['log_transition', 'inject_result']},
        {'trigger': 'needs_replan', 'source': 'waiting', 'dest': 'planning',
         'before': 'log_transition'},
        {'trigger': 'timeout', 'source': 'waiting', 'dest': 'error',
         'before': 'log_transition'},
        {'trigger': 'retry', 'source': 'error', 'dest': 'executing',
         'conditions': ['can_retry'], 'before': ['log_transition', 'increment_retry']},
        {'trigger': 'abort', 'source': 'error', 'dest': 'idle',
         'before': 'log_transition', 'after': 'cleanup_resources'},
        {'trigger': 'reset', 'source': 'completed', 'dest': 'idle',
         'before': 'log_transition', 'after': 'cleanup_resources'},
    ]

    def __init__(self, session_id: str, max_retries: int = 3):
        self.session_id = session_id
        self.max_retries = max_retries
        self.retry_count = 0
        self.task_description = None
        self.plan_steps = []
        self.current_step = 0
        self.current_tool = None
        self.last_result = None
        self.final_result = None
        self.registered_tools = set()
        self.state_history = []
        self.entered_at = datetime.now()

        # send_event=True passes a single EventData object to every callback
        self.machine = Machine(
            model=self,
            states=self.STATES,
            transitions=self.TRANSITIONS,
            initial='idle',
            auto_transitions=False,
            send_event=True
        )

    # Guard conditions
    def is_task_valid(self, event) -> bool:
        task = event.kwargs.get('task', '')
        return bool(task and task.strip())

    def has_valid_plan(self, event) -> bool:
        return len(self.plan_steps) > 0

    def is_tool_registered(self, event) -> bool:
        return event.kwargs.get('tool_name', '') in self.registered_tools

    def can_retry(self, event) -> bool:
        return self.retry_count < self.max_retries

    # Action callbacks
    def log_transition(self, event):
        msg = {
            'timestamp': datetime.now().isoformat(),
            'session_id': self.session_id,
            'trigger': event.event.name,
            'from_state': event.transition.source,
            'to_state': event.transition.dest,
        }
        agent_logger.info(json.dumps(msg))
        self.state_history.append(msg)

    def initialize_context(self, event):
        self.task_description = event.kwargs.get('task')
        self.retry_count = 0
        self.entered_at = datetime.now()

    def record_tool_call(self, event):
        self.current_tool = event.kwargs.get('tool_name')

    def inject_result(self, event):
        self.last_result = event.kwargs.get('result')

    def compile_results(self, event):
        self.final_result = event.kwargs.get('result')

    def increment_retry(self, event):
        self.retry_count += 1

    def cleanup_resources(self, event):
        self.plan_steps = []
        self.current_step = 0
        self.current_tool = None

    # State entry/exit callbacks
    def on_enter_idle(self, event):
        self.entered_at = datetime.now()

    def on_enter_planning(self, event):
        self.entered_at = datetime.now()
        agent_logger.info(f"[{self.session_id}] Entering PLANNING state")

    def on_enter_executing(self, event):
        self.entered_at = datetime.now()

    def on_enter_waiting(self, event):
        self.entered_at = datetime.now()
        agent_logger.info(f"[{self.session_id}] Waiting for: {self.current_tool}")

    def on_exit_waiting(self, event):
        duration = (datetime.now() - self.entered_at).total_seconds()
        agent_logger.info(f"[{self.session_id}] Wait duration: {duration:.2f}s")

    def on_enter_error(self, event):
        error_msg = event.kwargs.get('error', 'Unknown error')
        log_entry = {
            'timestamp': datetime.now().isoformat(),
            'session_id': self.session_id,
            'state_when_error': event.transition.source,
            'error': error_msg,
            'retry_count': self.retry_count,
        }
        error_logger.error(json.dumps(log_entry))

    def on_enter_completed(self, event):
        agent_logger.info(f"[{self.session_id}] Task COMPLETED successfully")

    def register_tool(self, tool_name: str):
        self.registered_tools.add(tool_name)

    def get_state_summary(self) -> dict:
        return {
            'session_id': self.session_id,
            'current_state': self.state,
            'retry_count': self.retry_count,
            'total_transitions': len(self.state_history)
        }
```
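The class obtains loggers named `agent` and `errors` but attaches no handlers, so nothing reaches `agent.log` or `errors.log` until the application configures them. The following sketch shows one way to do that with the standard `logging` module. `configure_agent_logging` is a hypothetical helper; the file names follow the ones used in this chapter.

```python
import logging


def configure_agent_logging(agent_path: str = "agent.log",
                            error_path: str = "errors.log") -> None:
    """Attach one file handler per logger so each record is a bare message line."""
    targets = (
        ("agent", agent_path, logging.INFO),
        ("errors", error_path, logging.ERROR),
    )
    for name, path, level in targets:
        logger = logging.getLogger(name)
        logger.setLevel(level)
        logger.propagate = False  # keep records out of the root logger's handlers
        # Skip if a file handler is already attached, so repeated calls are harmless
        if not any(isinstance(h, logging.FileHandler) for h in logger.handlers):
            handler = logging.FileHandler(path, encoding="utf-8")
            # Message-only format keeps each JSON transition record parseable per line
            handler.setFormatter(logging.Formatter("%(message)s"))
            logger.addHandler(handler)
```

Call it once at startup, before the first transition fires. Writing only the message keeps every JSON record on its own line, which is what a line-by-line log reader expects.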
Usage Example
```python
import asyncio


async def run_hermes_agent(task: str):
    agent = HermesAgentFSM(session_id="sess_001", max_retries=3)
    agent.register_tool("web_search")
    agent.register_tool("code_executor")

    # IDLE → PLANNING
    agent.receive_task(task=task)

    # Simulate LLM planning
    await asyncio.sleep(0.1)
    agent.plan_steps = ["Search sources", "Analyze data", "Generate report"]
    agent.plan_ready()

    # EXECUTING → WAITING → EXECUTING
    agent.call_tool(tool_name="web_search", query="Hermes Agent FSM")
    await asyncio.sleep(0.2)
    agent.tool_result_received(result={"status": "ok", "data": "..."})

    # EXECUTING → COMPLETED
    agent.finish_execution(result="Report generated successfully")

    # COMPLETED → IDLE
    agent.reset()

    print("Final state:", agent.state)
    print("Summary:", agent.get_state_summary())


asyncio.run(run_hermes_agent("Analyze Hermes Agent architecture"))
```
57.6 Debugging Techniques
Technique 1: Visualize the State Graph
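```python
from transitions.extensions import GraphMachine


class DebugHermesAgentFSM(HermesAgentFSM):
    """Adds diagram export without rebinding the live state machine."""

    def export_diagram(self, filename: str = "agent_fsm.png"):
        # Build the GraphMachine on a separate placeholder model. Binding a second
        # machine to `self` would be skipped, because the trigger methods already
        # exist on the model after HermesAgentFSM.__init__ has run.
        class _DiagramModel:
            pass

        model = _DiagramModel()
        GraphMachine(
            model=model,
            states=self.STATES,
            transitions=self.TRANSITIONS,
            initial=self.state,  # mark the agent's current state in the drawing
            auto_transitions=False,
            show_conditions=True
        )
        # get_graph() is bound to the model, not the machine; drawing needs Graphviz
        model.get_graph().draw(filename, prog='dot')
        print(f"State diagram saved to {filename}")
```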
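When the Graphviz Python bindings are not installed, the same transition list can still be turned into DOT text with the standard library alone and rendered later, for example with the Graphviz command `dot -Tpng agent_fsm.dot -o agent_fsm.png`. The function below is a sketch under that assumption. `transitions_to_dot` is a hypothetical helper that only reads the `trigger`, `source`, and `dest` keys of each transition dict.

```python
from typing import Any, Dict, Iterable


def transitions_to_dot(transitions: Iterable[Dict[str, Any]],
                       name: str = "agent_fsm") -> str:
    """Render transition dicts (trigger/source/dest keys) as Graphviz DOT text."""
    lines = [f"digraph {name} {{", "  rankdir=LR;"]
    for t in transitions:
        # A source may be a single state name or a list of state names
        src_field = t["source"]
        sources = src_field if isinstance(src_field, (list, tuple)) else [src_field]
        for src in sources:
            lines.append(f'  "{src}" -> "{t["dest"]}" [label="{t["trigger"]}"];')
    lines.append("}")
    return "\n".join(lines)


# Example with a single transition dict in the same shape as TRANSITIONS
example_dot = transitions_to_dot(
    [{"trigger": "receive_task", "source": "idle", "dest": "planning"}]
)
```

Passing `HermesAgentFSM.TRANSITIONS` produces one labelled edge per transition; guards and callbacks are not shown in this simplified output.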
Technique 2: Session Replay from Logs
```python
import json


def replay_session(session_id: str, log_file: str = "agent.log"):
    """Replay Agent execution history from log file"""
    events = []
    with open(log_file, 'r') as f:
        for line in f:
            try:
                entry = json.loads(line)
                if entry.get('session_id') == session_id:
                    events.append(entry)
            except json.JSONDecodeError:
                # Plain-text log lines are not transition records; skip them
                continue

    print(f"\n=== Session {session_id} Transition History ===")
    for i, event in enumerate(events):
        print(f"[{i+1}] {event['timestamp']}")
        print(f"    {event['from_state']} --[{event['trigger']}]--> {event['to_state']}")

    # Detect anomalous patterns
    state_counts = {}
    for event in events:
        dest = event['to_state']
        state_counts[dest] = state_counts.get(dest, 0) + 1

    if state_counts.get('error', 0) > 2:
        print(f"\nWARNING: {state_counts['error']} error states detected. Check errors.log")
    if state_counts.get('waiting', 0) > 10:
        print(f"\nWARNING: Excessive wait transitions ({state_counts['waiting']}). Tool timeout suspected.")
```
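Beyond counting states, a replay can also check each logged transition against the set of transitions the design allows. The sketch below assumes log lines in the JSON shape written by `log_transition` (`trigger`, `from_state`, `to_state`). `find_illegal_transitions` is a hypothetical helper, not part of the chapter's implementation.

```python
import json
from typing import Any, Dict, Iterable, List, Set, Tuple


def find_illegal_transitions(
    log_lines: Iterable[str],
    allowed: Set[Tuple[str, str, str]],
) -> List[Dict[str, Any]]:
    """Return logged transitions whose (from_state, trigger, to_state) is not allowed."""
    illegal = []
    for line in log_lines:
        try:
            entry = json.loads(line)
        except json.JSONDecodeError:
            continue  # skip plain-text log lines
        if not isinstance(entry, dict) or "trigger" not in entry:
            continue  # valid JSON, but not a transition record
        key = (entry.get("from_state"), entry["trigger"], entry.get("to_state"))
        if key not in allowed:
            illegal.append(entry)
    return illegal
```

The `allowed` set can be derived from the transition definitions, for example `{(t['source'], t['trigger'], t['dest']) for t in HermesAgentFSM.TRANSITIONS}`. A non-empty result points to a log written by a different version of the machine or to state being modified outside the FSM.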
Technique 3: Unit Testing the State Machine
```python
import pytest

# Note: when a guard condition fails, the transitions library does not raise.
# The trigger call returns False and the state is left unchanged.


class TestHermesAgentFSM:

    def setup_method(self):
        self.agent = HermesAgentFSM(session_id="test_001")
        self.agent.register_tool("web_search")

    def test_initial_state_is_idle(self):
        assert self.agent.state == 'idle'

    def test_valid_task_triggers_planning(self):
        self.agent.receive_task(task="Analyze market data")
        assert self.agent.state == 'planning'

    def test_empty_task_is_rejected(self):
        # The is_task_valid guard blocks the transition without raising
        assert self.agent.receive_task(task="") is False
        assert self.agent.state == 'idle'

    def test_happy_path(self):
        self.agent.receive_task(task="Test task")
        self.agent.plan_steps = ["step1"]
        self.agent.plan_ready()
        self.agent.call_tool(tool_name="web_search")
        self.agent.tool_result_received(result="ok")
        self.agent.finish_execution(result="done")
        assert self.agent.state == 'completed'

    def test_retry_mechanism(self):
        self.agent.receive_task(task="Test retry")
        self.agent.plan_steps = ["step1"]
        self.agent.plan_ready()
        self.agent.execution_error(error="tool_failed")
        assert self.agent.state == 'error'
        self.agent.retry()
        assert self.agent.state == 'executing'
        assert self.agent.retry_count == 1

    def test_max_retry_blocks_further_retry(self):
        agent = HermesAgentFSM(session_id="test_002", max_retries=1)
        agent.register_tool("web_search")
        agent.receive_task(task="Test max retry")
        agent.plan_steps = ["step1"]
        agent.plan_ready()
        agent.execution_error(error="failed")
        agent.retry()  # retry_count = 1 = max_retries
        agent.execution_error(error="failed_again")
        # The can_retry guard blocks the second retry; the agent stays in ERROR
        assert agent.retry() is False
        assert agent.state == 'error'
```
Chapter Summary
This chapter systematically covered Hermes Agent state machine design:
- Why FSM: Agents need state machines for predictable behavior, isolated error handling, and full auditability
- Six Core States: IDLE → PLANNING → EXECUTING → WAITING → ERROR/COMPLETED form the complete lifecycle
- Transition Rules: Every transition has explicit trigger events, guard conditions, and side-effect actions
- Python Implementation: The transitions library enables a fully functional FSM with logging, error handling, and retry logic
- Debugging Tools: Visualization, session replay, timeout monitoring, and unit tests form a complete debugging toolkit
Review Questions
- When an Agent needs to handle multiple concurrent tasks simultaneously, how would you extend the FSM to support parallel states? (Hint: Research Statechart parallel regions)
- If a tool call fails but the Agent determines the step can be skipped, how would you design a "skip" state transition?
- In a distributed scenario where multiple Agent instances share one state machine, how do you ensure state consistency?
- How can you combine the state machine with Hermes Agent's MEMORY.md to achieve cross-session state persistence?