第 57 章
Agent 状态机设计
第57章:Agent 状态机设计
一个没有状态管理的 Agent,就像一个失忆的侦探——每次醒来都不知道自己在调查什么案子。状态机是 Agent 自主运行的骨架,决定了它在任何时刻"知道自己在做什么"。
57.1 为什么 Agent 需要状态机
现代 LLM Agent 并非单次问答的简单系统。它需要在多个工具调用、多轮对话、异步任务之间保持连贯性。没有明确的状态管理,Agent 会面临以下问题:
问题一:行为不可预测 Agent 可能在执行到一半时重新开始规划,或者在等待外部结果时继续发出新的工具调用。
问题二:错误无法隔离 单步工具失败会导致整个 Agent 崩溃,而非优雅地进入错误处理流程。
问题三:调试困难 没有状态记录,开发者无法复现问题,也无法判断 Agent 在崩溃前处于哪个阶段。
有限状态机(FSM)的价值:FSM 为 Agent 提供了一个正式的行为框架——在任意时刻,Agent 只处于一个确定的状态,状态之间的转换由明确的条件触发。这让 Agent 的行为从"黑盒魔法"变成"可审计的流程"。
Hermes Agent 的状态需求分析
Hermes Agent 作为自主执行框架,典型任务流程如下:
用户输入 → 理解任务 → 制定计划 → 逐步执行工具 → 处理结果 → 汇总输出
这个流程中存在多个需要状态追踪的关键节点:
- 正在规划时,不应响应新的用户输入
- 工具调用返回前,应处于等待状态
- 错误发生时,应进入错误处理而非继续执行
- 完成后,资源应被释放
57.2 有限状态机(FSM)核心概念
FSM 由以下要素构成:
| 要素 | 说明 | Hermes 中的例子 |
|---|---|---|
| 状态(State) | Agent 当前所处的阶段 | PLANNING, EXECUTING |
| 事件(Event) | 触发状态转换的条件 | tool_result_received, error_occurred |
| 转换(Transition) | 从一个状态到另一个状态的规则 | EXECUTING → WAITING(工具调用后) |
| 动作(Action) | 进入/退出状态时执行的副作用 | 进入 ERROR 时写入 errors.log |
| 守卫(Guard) | 转换发生的前置条件 | 只有 plan 非空才能进入 EXECUTING |
FSM vs 其他状态管理方案
方案对比:
FSM(有限状态机)
优点:状态明确、易于测试、可视化
缺点:状态数量多时图谱复杂
状态图(Statechart / SCXML)
优点:支持嵌套状态、并发状态
缺点:实现复杂
行为树(Behavior Tree)
优点:模块化强、游戏 AI 常用
缺点:状态共享困难
Petri Net
优点:适合并发建模
缺点:学习曲线陡峭
对于 Hermes Agent 这类顺序执行为主、偶有并发的场景,FSM 是最佳选择。
57.3 Hermes Agent 状态定义
六大核心状态
┌─────────────────────────────────────────────────────┐
│ IDLE(空闲) │
│ Agent 处于等待状态,资源占用最低 │
│ 触发条件:初始化完成 / 任务完成后重置 │
└─────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────┐
│ PLANNING(规划) │
│ LLM 正在分析任务,生成执行计划 │
│ 触发条件:收到用户输入 / 子任务被创建 │
└─────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────┐
│ EXECUTING(执行) │
│ 正在调用工具或执行代码 │
│ 触发条件:计划生成完毕,开始第一步 │
└─────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────┐
│ WAITING(等待) │
│ 等待外部系统返回(工具结果/API响应/用户确认) │
│ 触发条件:工具调用已发出,等待响应 │
└─────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────┐
│ ERROR(错误) │
│ 检测到异常,进行错误处理和日志记录 │
│ 触发条件:工具失败 / 超时 / 解析错误 │
└─────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────┐
│ COMPLETED(完成) │
│ 任务已完成,准备输出结果并重置 │
│ 触发条件:所有子任务完成 / 达成目标 │
└─────────────────────────────────────────────────────┘
状态属性设计
每个状态应携带必要的上下文信息:
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional, List, Dict, Any
from enum import Enum
class AgentState(Enum):
IDLE = "idle"
PLANNING = "planning"
EXECUTING = "executing"
WAITING = "waiting"
ERROR = "error"
COMPLETED = "completed"
@dataclass
class StateContext:
"""状态上下文:记录当前状态的关键信息"""
state: AgentState
entered_at: datetime = field(default_factory=datetime.now)
# PLANNING 状态数据
task_description: Optional[str] = None
plan_steps: List[str] = field(default_factory=list)
# EXECUTING 状态数据
current_step_index: int = 0
current_tool: Optional[str] = None
tool_call_id: Optional[str] = None
# WAITING 状态数据
waiting_for: Optional[str] = None
timeout_seconds: int = 30
# ERROR 状态数据
error_type: Optional[str] = None
error_message: Optional[str] = None
retry_count: int = 0
max_retries: int = 3
# COMPLETED 状态数据
result: Optional[Any] = None
total_tokens_used: int = 0
# 通用元数据
session_id: str = ""
metadata: Dict[str, Any] = field(default_factory=dict)
def time_in_state(self) -> float:
"""计算在当前状态停留的秒数"""
return (datetime.now() - self.entered_at).total_seconds()
57.4 状态转换规则与触发条件
完整状态转换图
user_input
┌─────────────────────────────┐
│ ▼
IDLE ──────────────────────► PLANNING
▲ │
│ plan_generated
│ ▼
task_reset EXECUTING ◄─────────────┐
│ │ │ │
│ tool_called plan_revised │
│ ▼ │ │
└─────── COMPLETED WAITING │ next_step │
▲ │ │ │
│ result_received │
│ │ PLANNING
│ └──────────────────► │
│ │
ERROR ◄──────────────────────────┘
│ error_occurred
│
(retry or abort)
转换规则表
| 当前状态 | 事件 | 守卫条件 | 下一状态 | 动作 |
|---|---|---|---|---|
| IDLE | user_input | task 非空 | PLANNING | 初始化上下文 |
| PLANNING | plan_generated | plan 非空 | EXECUTING | 记录计划到 agent.log |
| PLANNING | error_occurred | - | ERROR | 写入 errors.log |
| EXECUTING | tool_called | tool 已注册 | WAITING | 记录工具调用 ID |
| EXECUTING | all_steps_done | - | COMPLETED | 汇总结果 |
| EXECUTING | error_occurred | - | ERROR | 写入 errors.log |
| WAITING | result_received | - | EXECUTING | 注入结果到上下文 |
| WAITING | timeout | retry < max | ERROR | 记录超时 |
| WAITING | user_clarification | - | PLANNING | 重新规划 |
| ERROR | retry_triggered | retry < max | EXECUTING | 重置工具调用 |
| ERROR | abort_triggered | retry >= max | IDLE | 清理资源 |
| COMPLETED | task_reset | - | IDLE | 清理会话 |
57.5 Python 实现:使用 transitions 库
安装依赖
pip install transitions transitions[diagrams]
完整实现
import logging
import json
from datetime import datetime
from typing import Optional, Callable, Any
from transitions import Machine, State
from transitions.extensions import GraphMachine
# 配置日志
agent_logger = logging.getLogger('agent')
error_logger = logging.getLogger('errors')
class HermesAgentFSM:
"""
Hermes Agent 有限状态机
使用 transitions 库实现状态管理
"""
# 定义所有状态
STATES = [
State('idle', on_enter='on_enter_idle', on_exit='on_exit_idle'),
State('planning', on_enter='on_enter_planning', on_exit='on_exit_planning'),
State('executing', on_enter='on_enter_executing', on_exit='on_exit_executing'),
State('waiting', on_enter='on_enter_waiting', on_exit='on_exit_waiting'),
State('error', on_enter='on_enter_error', on_exit='on_exit_error'),
State('completed', on_enter='on_enter_completed', on_exit='on_exit_completed'),
]
# 定义状态转换规则
TRANSITIONS = [
# IDLE → PLANNING
{
'trigger': 'receive_task',
'source': 'idle',
'dest': 'planning',
'conditions': ['is_task_valid'],
'before': 'log_transition',
'after': 'initialize_context'
},
# PLANNING → EXECUTING
{
'trigger': 'plan_ready',
'source': 'planning',
'dest': 'executing',
'conditions': ['has_valid_plan'],
'before': 'log_transition'
},
# PLANNING → ERROR
{
'trigger': 'planning_failed',
'source': 'planning',
'dest': 'error',
'before': 'log_transition'
},
# EXECUTING → WAITING
{
'trigger': 'call_tool',
'source': 'executing',
'dest': 'waiting',
'conditions': ['is_tool_registered'],
'before': ['log_transition', 'record_tool_call']
},
# EXECUTING → COMPLETED
{
'trigger': 'finish_execution',
'source': 'executing',
'dest': 'completed',
'before': 'log_transition',
'after': 'compile_results'
},
# EXECUTING → ERROR
{
'trigger': 'execution_error',
'source': 'executing',
'dest': 'error',
'before': 'log_transition'
},
# WAITING → EXECUTING
{
'trigger': 'tool_result_received',
'source': 'waiting',
'dest': 'executing',
'before': ['log_transition', 'inject_result']
},
# WAITING → PLANNING(需要重新规划)
{
'trigger': 'needs_replan',
'source': 'waiting',
'dest': 'planning',
'before': 'log_transition'
},
# WAITING → ERROR(超时)
{
'trigger': 'timeout',
'source': 'waiting',
'dest': 'error',
'before': 'log_transition'
},
# ERROR → EXECUTING(重试)
{
'trigger': 'retry',
'source': 'error',
'dest': 'executing',
'conditions': ['can_retry'],
'before': ['log_transition', 'increment_retry']
},
# ERROR → IDLE(放弃)
{
'trigger': 'abort',
'source': 'error',
'dest': 'idle',
'before': 'log_transition',
'after': 'cleanup_resources'
},
# COMPLETED → IDLE
{
'trigger': 'reset',
'source': 'completed',
'dest': 'idle',
'before': 'log_transition',
'after': 'cleanup_resources'
},
]
def __init__(self, session_id: str, max_retries: int = 3):
self.session_id = session_id
self.max_retries = max_retries
self.retry_count = 0
self.task_description = None
self.plan_steps = []
self.current_step = 0
self.current_tool = None
self.last_result = None
self.final_result = None
self.registered_tools = set()
self.state_history = []
self.entered_at = datetime.now()
# 初始化状态机
self.machine = Machine(
model=self,
states=self.STATES,
transitions=self.TRANSITIONS,
initial='idle',
auto_transitions=False,
send_event=True # 将 EventData 传给回调
)
# ==================== 守卫条件 ====================
def is_task_valid(self, event) -> bool:
task = event.kwargs.get('task', '')
return bool(task and task.strip())
def has_valid_plan(self, event) -> bool:
return len(self.plan_steps) > 0
def is_tool_registered(self, event) -> bool:
tool_name = event.kwargs.get('tool_name', '')
return tool_name in self.registered_tools
def can_retry(self, event) -> bool:
return self.retry_count < self.max_retries
# ==================== 动作回调 ====================
def log_transition(self, event):
"""记录所有状态转换到 agent.log"""
msg = {
'timestamp': datetime.now().isoformat(),
'session_id': self.session_id,
'trigger': event.event.name,
'from_state': event.transition.source,
'to_state': event.transition.dest,
'kwargs': event.kwargs
}
agent_logger.info(json.dumps(msg, ensure_ascii=False))
self.state_history.append(msg)
def initialize_context(self, event):
self.task_description = event.kwargs.get('task')
self.retry_count = 0
self.entered_at = datetime.now()
agent_logger.info(f"[{self.session_id}] Task initialized: {self.task_description[:100]}")
def record_tool_call(self, event):
self.current_tool = event.kwargs.get('tool_name')
agent_logger.info(f"[{self.session_id}] Tool called: {self.current_tool}")
def inject_result(self, event):
self.last_result = event.kwargs.get('result')
agent_logger.info(f"[{self.session_id}] Tool result received for: {self.current_tool}")
def compile_results(self, event):
self.final_result = event.kwargs.get('result')
agent_logger.info(f"[{self.session_id}] Task completed. Result compiled.")
def increment_retry(self, event):
self.retry_count += 1
agent_logger.warning(f"[{self.session_id}] Retry attempt {self.retry_count}/{self.max_retries}")
def cleanup_resources(self, event):
self.plan_steps = []
self.current_step = 0
self.current_tool = None
agent_logger.info(f"[{self.session_id}] Resources cleaned up.")
# ==================== 状态进入/退出回调 ====================
def on_enter_idle(self, event):
self.entered_at = datetime.now()
def on_exit_idle(self, event):
duration = (datetime.now() - self.entered_at).total_seconds()
agent_logger.debug(f"[{self.session_id}] Left IDLE after {duration:.2f}s")
def on_enter_planning(self, event):
self.entered_at = datetime.now()
agent_logger.info(f"[{self.session_id}] Entering PLANNING state")
def on_exit_planning(self, event):
pass
def on_enter_executing(self, event):
self.entered_at = datetime.now()
def on_exit_executing(self, event):
pass
def on_enter_waiting(self, event):
self.entered_at = datetime.now()
agent_logger.info(f"[{self.session_id}] Waiting for tool: {self.current_tool}")
def on_exit_waiting(self, event):
duration = (datetime.now() - self.entered_at).total_seconds()
agent_logger.info(f"[{self.session_id}] Wait duration: {duration:.2f}s")
def on_enter_error(self, event):
error_msg = event.kwargs.get('error', 'Unknown error')
log_entry = {
'timestamp': datetime.now().isoformat(),
'session_id': self.session_id,
'state_when_error': event.transition.source,
'error': error_msg,
'retry_count': self.retry_count,
'current_tool': self.current_tool
}
error_logger.error(json.dumps(log_entry, ensure_ascii=False))
def on_exit_error(self, event):
pass
def on_enter_completed(self, event):
agent_logger.info(f"[{self.session_id}] Task COMPLETED successfully")
def on_exit_completed(self, event):
pass
# ==================== 工具注册 ====================
def register_tool(self, tool_name: str):
self.registered_tools.add(tool_name)
def get_state_summary(self) -> dict:
return {
'session_id': self.session_id,
'current_state': self.state,
'retry_count': self.retry_count,
'current_step': self.current_step,
'total_steps': len(self.plan_steps),
'state_history_count': len(self.state_history)
}
使用示例
import asyncio
import logging
# 配置日志输出
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logging.getLogger('agent').setLevel(logging.INFO)
logging.getLogger('errors').setLevel(logging.ERROR)
async def run_hermes_agent(task: str):
"""模拟 Hermes Agent 完整执行流程"""
agent = HermesAgentFSM(session_id="sess_001", max_retries=3)
agent.register_tool("web_search")
agent.register_tool("code_executor")
agent.register_tool("file_writer")
print(f"初始状态: {agent.state}")
# 1. 接收任务 → PLANNING
agent.receive_task(task=task)
print(f"收到任务后: {agent.state}")
# 2. 模拟 LLM 生成计划
await asyncio.sleep(0.1) # 模拟 LLM 延迟
agent.plan_steps = ["搜索相关资料", "分析结果", "生成报告"]
agent.plan_ready()
print(f"规划完成后: {agent.state}")
# 3. 执行第一步:调用工具
agent.call_tool(tool_name="web_search", query="Hermes Agent FSM")
print(f"调用工具后: {agent.state}")
# 4. 等待工具结果
await asyncio.sleep(0.2) # 模拟工具执行时间
agent.tool_result_received(result={"found": True, "data": "..."})
print(f"收到结果后: {agent.state}")
# 5. 完成执行
agent.finish_execution(result="任务完成,报告已生成")
print(f"执行完成后: {agent.state}")
# 6. 重置状态
agent.reset()
print(f"重置后: {agent.state}")
print("\n状态摘要:", agent.get_state_summary())
# 运行
asyncio.run(run_hermes_agent("分析 Hermes Agent 的架构特点"))
57.6 状态机调试技巧
技巧一:可视化状态图
from transitions.extensions import GraphMachine
class DebugHermesAgentFSM(HermesAgentFSM):
"""支持可视化输出的调试版本"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# 替换为 GraphMachine 以支持图形输出
self.machine = GraphMachine(
model=self,
states=self.STATES,
transitions=self.TRANSITIONS,
initial='idle',
auto_transitions=False,
show_conditions=True,
show_state_attributes=True
)
def export_diagram(self, filename: str = "agent_fsm.png"):
self.machine.get_graph().draw(filename, prog='dot')
print(f"状态图已保存到 {filename}")
# 使用
debug_agent = DebugHermesAgentFSM(session_id="debug_001")
debug_agent.export_diagram("hermes_fsm.png")
技巧二:状态历史回放
def replay_session(session_id: str, log_file: str = "agent.log"):
"""从日志文件重放 Agent 执行历史"""
import json
events = []
with open(log_file, 'r') as f:
for line in f:
try:
entry = json.loads(line)
if entry.get('session_id') == session_id:
events.append(entry)
except json.JSONDecodeError:
continue
print(f"\n=== Session {session_id} 状态转换历史 ===")
for i, event in enumerate(events):
print(f"[{i+1}] {event['timestamp']}")
print(f" {event['from_state']} --[{event['trigger']}]--> {event['to_state']}")
if event.get('kwargs'):
print(f" 参数: {event['kwargs']}")
# 检测异常模式
state_counts = {}
for event in events:
dest = event['to_state']
state_counts[dest] = state_counts.get(dest, 0) + 1
if state_counts.get('error', 0) > 2:
print(f"\n⚠️ 警告:检测到 {state_counts['error']} 次错误状态,请检查 errors.log")
if state_counts.get('waiting', 0) > 10:
print(f"\n⚠️ 警告:等待次数过多({state_counts['waiting']}),可能存在工具超时问题")
技巧三:状态超时监控
import asyncio
from contextlib import asynccontextmanager
class TimeoutMonitor:
"""监控 Agent 在各状态的停留时间"""
STATE_TIMEOUTS = {
'planning': 30, # 规划最多 30 秒
'executing': 60, # 执行最多 60 秒
'waiting': 45, # 等待最多 45 秒
}
def __init__(self, agent: HermesAgentFSM):
self.agent = agent
self._monitoring = False
async def monitor(self):
self._monitoring = True
while self._monitoring:
await asyncio.sleep(5) # 每5秒检查一次
state = self.agent.state
timeout = self.STATE_TIMEOUTS.get(state)
if timeout:
time_in_state = (datetime.now() - self.agent.entered_at).total_seconds()
if time_in_state > timeout:
error_logger.warning(
f"State timeout: {state} has been active for "
f"{time_in_state:.1f}s (limit: {timeout}s)"
)
# 触发超时转换
if state == 'waiting':
self.agent.timeout(error=f"Tool timeout after {timeout}s")
def stop(self):
self._monitoring = False
技巧四:单元测试状态机
import pytest
class TestHermesAgentFSM:
def setup_method(self):
self.agent = HermesAgentFSM(session_id="test_001")
self.agent.register_tool("web_search")
def test_initial_state(self):
assert self.agent.state == 'idle'
def test_valid_task_triggers_planning(self):
self.agent.receive_task(task="分析市场数据")
assert self.agent.state == 'planning'
def test_empty_task_rejected(self):
with pytest.raises(Exception):
self.agent.receive_task(task="")
def test_full_happy_path(self):
self.agent.receive_task(task="执行测试任务")
self.agent.plan_steps = ["step1"]
self.agent.plan_ready()
self.agent.call_tool(tool_name="web_search")
self.agent.tool_result_received(result="ok")
self.agent.finish_execution(result="done")
assert self.agent.state == 'completed'
def test_retry_mechanism(self):
self.agent.receive_task(task="测试重试")
self.agent.plan_steps = ["step1"]
self.agent.plan_ready()
self.agent.execution_error(error="tool_failed")
assert self.agent.state == 'error'
assert self.agent.retry_count == 0
self.agent.retry()
assert self.agent.state == 'executing'
assert self.agent.retry_count == 1
def test_max_retry_exceeded(self):
agent = HermesAgentFSM(session_id="test_002", max_retries=1)
agent.register_tool("web_search")
agent.receive_task(task="测试最大重试")
agent.plan_steps = ["step1"]
agent.plan_ready()
agent.execution_error(error="tool_failed")
agent.retry() # retry_count = 1,达到上限
agent.execution_error(error="tool_failed_again")
# 现在不能再重试
with pytest.raises(Exception):
agent.retry()
assert agent.state == 'error'
本章小结
本章系统介绍了 Hermes Agent 的状态机设计:
- 为何需要 FSM:Agent 需要状态机来保证行为可预测、错误可隔离、过程可审计
- 六大状态:IDLE → PLANNING → EXECUTING → WAITING → ERROR/COMPLETED 构成完整生命周期
- 转换规则:每个转换都有明确的触发事件、守卫条件和副作用动作
- Python 实现:使用
transitions库实现了功能完整的 FSM,包含日志记录、错误处理和重试机制 - 调试工具:可视化图表、历史回放、超时监控和单元测试构成完整调试工具链
思考题
- 当 Agent 需要同时处理多个并发任务时,如何扩展 FSM 支持并发状态?(提示:研究 Statechart 的并行区域)
- 如果工具调用失败但 Agent 判断该步骤可跳过,如何设计"跳过"状态转换?
- 在分布式场景下,多个 Agent 实例共享一个状态机时,如何保证状态一致性?
- 如何将状态机与 Hermes Agent 的 MEMORY.md 结合,实现跨会话的状态持久化?