第 57 章

Agent 状态机设计

第57章:Agent 状态机设计

一个没有状态管理的 Agent,就像一个失忆的侦探——每次醒来都不知道自己在调查什么案子。状态机是 Agent 自主运行的骨架,决定了它在任何时刻"知道自己在做什么"。


57.1 为什么 Agent 需要状态机

现代 LLM Agent 并非单次问答的简单系统。它需要在多个工具调用、多轮对话、异步任务之间保持连贯性。没有明确的状态管理,Agent 会面临以下问题:

问题一:行为不可预测 Agent 可能在执行到一半时重新开始规划,或者在等待外部结果时继续发出新的工具调用。

问题二:错误无法隔离 单步工具失败会导致整个 Agent 崩溃,而非优雅地进入错误处理流程。

问题三:调试困难 没有状态记录,开发者无法复现问题,也无法判断 Agent 在崩溃前处于哪个阶段。

有限状态机(FSM)的价值:FSM 为 Agent 提供了一个正式的行为框架——在任意时刻,Agent 只处于一个确定的状态,状态之间的转换由明确的条件触发。这让 Agent 的行为从"黑盒魔法"变成"可审计的流程"。

Hermes Agent 的状态需求分析

Hermes Agent 作为自主执行框架,典型任务流程如下:

用户输入 → 理解任务 → 制定计划 → 逐步执行工具 → 处理结果 → 汇总输出

这个流程中存在多个需要状态追踪的关键节点:


57.2 有限状态机(FSM)核心概念

FSM 由以下要素构成:

要素 说明 Hermes 中的例子
状态(State) Agent 当前所处的阶段 PLANNING, EXECUTING
事件(Event) 触发状态转换的条件 tool_result_received, error_occurred
转换(Transition) 从一个状态到另一个状态的规则 EXECUTING → WAITING(工具调用后)
动作(Action) 进入/退出状态时执行的副作用 进入 ERROR 时写入 errors.log
守卫(Guard) 转换发生的前置条件 只有 plan 非空才能进入 EXECUTING

FSM vs 其他状态管理方案

方案对比:

FSM(有限状态机)
  优点:状态明确、易于测试、可视化
  缺点:状态数量多时图谱复杂

状态图(Statechart / SCXML)
  优点:支持嵌套状态、并发状态
  缺点:实现复杂

行为树(Behavior Tree)
  优点:模块化强、游戏 AI 常用
  缺点:状态共享困难

Petri Net
  优点:适合并发建模
  缺点:学习曲线陡峭

对于 Hermes Agent 这类顺序执行为主、偶有并发的场景,FSM 是最佳选择


57.3 Hermes Agent 状态定义

六大核心状态

┌─────────────────────────────────────────────────────┐
│                  IDLE(空闲)                         │
│  Agent 处于等待状态,资源占用最低                      │
│  触发条件:初始化完成 / 任务完成后重置                  │
└─────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────┐
│                 PLANNING(规划)                       │
│  LLM 正在分析任务,生成执行计划                        │
│  触发条件:收到用户输入 / 子任务被创建                  │
└─────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────┐
│                EXECUTING(执行)                       │
│  正在调用工具或执行代码                                │
│  触发条件:计划生成完毕,开始第一步                    │
└─────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────┐
│                 WAITING(等待)                        │
│  等待外部系统返回(工具结果/API响应/用户确认)          │
│  触发条件:工具调用已发出,等待响应                    │
└─────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────┐
│                  ERROR(错误)                         │
│  检测到异常,进行错误处理和日志记录                    │
│  触发条件:工具失败 / 超时 / 解析错误                  │
└─────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────┐
│                COMPLETED(完成)                       │
│  任务已完成,准备输出结果并重置                        │
│  触发条件:所有子任务完成 / 达成目标                   │
└─────────────────────────────────────────────────────┘

状态属性设计

每个状态应携带必要的上下文信息:

from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional, List, Dict, Any
from enum import Enum

class AgentState(Enum):
    IDLE = "idle"
    PLANNING = "planning"
    EXECUTING = "executing"
    WAITING = "waiting"
    ERROR = "error"
    COMPLETED = "completed"

@dataclass
class StateContext:
    """状态上下文:记录当前状态的关键信息"""
    state: AgentState
    entered_at: datetime = field(default_factory=datetime.now)
    
    # PLANNING 状态数据
    task_description: Optional[str] = None
    plan_steps: List[str] = field(default_factory=list)
    
    # EXECUTING 状态数据
    current_step_index: int = 0
    current_tool: Optional[str] = None
    tool_call_id: Optional[str] = None
    
    # WAITING 状态数据
    waiting_for: Optional[str] = None
    timeout_seconds: int = 30
    
    # ERROR 状态数据
    error_type: Optional[str] = None
    error_message: Optional[str] = None
    retry_count: int = 0
    max_retries: int = 3
    
    # COMPLETED 状态数据
    result: Optional[Any] = None
    total_tokens_used: int = 0
    
    # 通用元数据
    session_id: str = ""
    metadata: Dict[str, Any] = field(default_factory=dict)
    
    def time_in_state(self) -> float:
        """计算在当前状态停留的秒数"""
        return (datetime.now() - self.entered_at).total_seconds()

57.4 状态转换规则与触发条件

完整状态转换图

                    user_input
          ┌─────────────────────────────┐
          │                             ▼
        IDLE ──────────────────────► PLANNING
          ▲                             │
          │                    plan_generated
          │                             ▼
     task_reset               EXECUTING ◄─────────────┐
          │                       │    │               │
          │              tool_called   plan_revised    │
          │                       ▼    │               │
          └─────── COMPLETED   WAITING │    next_step  │
                       ▲          │    │               │
                       │  result_received              │
                       │          │               PLANNING
                       │          └──────────────────► │
                       │                               │
                    ERROR ◄──────────────────────────┘
                       │    error_occurred
                       │
                    (retry or abort)

转换规则表

当前状态 事件 守卫条件 下一状态 动作
IDLE user_input task 非空 PLANNING 初始化上下文
PLANNING plan_generated plan 非空 EXECUTING 记录计划到 agent.log
PLANNING error_occurred - ERROR 写入 errors.log
EXECUTING tool_called tool 已注册 WAITING 记录工具调用 ID
EXECUTING all_steps_done - COMPLETED 汇总结果
EXECUTING error_occurred - ERROR 写入 errors.log
WAITING result_received - EXECUTING 注入结果到上下文
WAITING timeout retry < max ERROR 记录超时
WAITING user_clarification - PLANNING 重新规划
ERROR retry_triggered retry < max EXECUTING 重置工具调用
ERROR abort_triggered retry >= max IDLE 清理资源
COMPLETED task_reset - IDLE 清理会话

57.5 Python 实现:使用 transitions 库

安装依赖

pip install transitions transitions[diagrams]

完整实现

import logging
import json
from datetime import datetime
from typing import Optional, Callable, Any
from transitions import Machine, State
from transitions.extensions import GraphMachine

# 配置日志
agent_logger = logging.getLogger('agent')
error_logger = logging.getLogger('errors')

class HermesAgentFSM:
    """
    Hermes Agent 有限状态机
    使用 transitions 库实现状态管理
    """
    
    # 定义所有状态
    STATES = [
        State('idle', on_enter='on_enter_idle', on_exit='on_exit_idle'),
        State('planning', on_enter='on_enter_planning', on_exit='on_exit_planning'),
        State('executing', on_enter='on_enter_executing', on_exit='on_exit_executing'),
        State('waiting', on_enter='on_enter_waiting', on_exit='on_exit_waiting'),
        State('error', on_enter='on_enter_error', on_exit='on_exit_error'),
        State('completed', on_enter='on_enter_completed', on_exit='on_exit_completed'),
    ]
    
    # 定义状态转换规则
    TRANSITIONS = [
        # IDLE → PLANNING
        {
            'trigger': 'receive_task',
            'source': 'idle',
            'dest': 'planning',
            'conditions': ['is_task_valid'],
            'before': 'log_transition',
            'after': 'initialize_context'
        },
        # PLANNING → EXECUTING
        {
            'trigger': 'plan_ready',
            'source': 'planning',
            'dest': 'executing',
            'conditions': ['has_valid_plan'],
            'before': 'log_transition'
        },
        # PLANNING → ERROR
        {
            'trigger': 'planning_failed',
            'source': 'planning',
            'dest': 'error',
            'before': 'log_transition'
        },
        # EXECUTING → WAITING
        {
            'trigger': 'call_tool',
            'source': 'executing',
            'dest': 'waiting',
            'conditions': ['is_tool_registered'],
            'before': ['log_transition', 'record_tool_call']
        },
        # EXECUTING → COMPLETED
        {
            'trigger': 'finish_execution',
            'source': 'executing',
            'dest': 'completed',
            'before': 'log_transition',
            'after': 'compile_results'
        },
        # EXECUTING → ERROR
        {
            'trigger': 'execution_error',
            'source': 'executing',
            'dest': 'error',
            'before': 'log_transition'
        },
        # WAITING → EXECUTING
        {
            'trigger': 'tool_result_received',
            'source': 'waiting',
            'dest': 'executing',
            'before': ['log_transition', 'inject_result']
        },
        # WAITING → PLANNING(需要重新规划)
        {
            'trigger': 'needs_replan',
            'source': 'waiting',
            'dest': 'planning',
            'before': 'log_transition'
        },
        # WAITING → ERROR(超时)
        {
            'trigger': 'timeout',
            'source': 'waiting',
            'dest': 'error',
            'before': 'log_transition'
        },
        # ERROR → EXECUTING(重试)
        {
            'trigger': 'retry',
            'source': 'error',
            'dest': 'executing',
            'conditions': ['can_retry'],
            'before': ['log_transition', 'increment_retry']
        },
        # ERROR → IDLE(放弃)
        {
            'trigger': 'abort',
            'source': 'error',
            'dest': 'idle',
            'before': 'log_transition',
            'after': 'cleanup_resources'
        },
        # COMPLETED → IDLE
        {
            'trigger': 'reset',
            'source': 'completed',
            'dest': 'idle',
            'before': 'log_transition',
            'after': 'cleanup_resources'
        },
    ]
    
    def __init__(self, session_id: str, max_retries: int = 3):
        self.session_id = session_id
        self.max_retries = max_retries
        self.retry_count = 0
        self.task_description = None
        self.plan_steps = []
        self.current_step = 0
        self.current_tool = None
        self.last_result = None
        self.final_result = None
        self.registered_tools = set()
        self.state_history = []
        self.entered_at = datetime.now()
        
        # 初始化状态机
        self.machine = Machine(
            model=self,
            states=self.STATES,
            transitions=self.TRANSITIONS,
            initial='idle',
            auto_transitions=False,
            send_event=True  # 将 EventData 传给回调
        )
    
    # ==================== 守卫条件 ====================
    
    def is_task_valid(self, event) -> bool:
        task = event.kwargs.get('task', '')
        return bool(task and task.strip())
    
    def has_valid_plan(self, event) -> bool:
        return len(self.plan_steps) > 0
    
    def is_tool_registered(self, event) -> bool:
        tool_name = event.kwargs.get('tool_name', '')
        return tool_name in self.registered_tools
    
    def can_retry(self, event) -> bool:
        return self.retry_count < self.max_retries
    
    # ==================== 动作回调 ====================
    
    def log_transition(self, event):
        """记录所有状态转换到 agent.log"""
        msg = {
            'timestamp': datetime.now().isoformat(),
            'session_id': self.session_id,
            'trigger': event.event.name,
            'from_state': event.transition.source,
            'to_state': event.transition.dest,
            'kwargs': event.kwargs
        }
        agent_logger.info(json.dumps(msg, ensure_ascii=False))
        self.state_history.append(msg)
    
    def initialize_context(self, event):
        self.task_description = event.kwargs.get('task')
        self.retry_count = 0
        self.entered_at = datetime.now()
        agent_logger.info(f"[{self.session_id}] Task initialized: {self.task_description[:100]}")
    
    def record_tool_call(self, event):
        self.current_tool = event.kwargs.get('tool_name')
        agent_logger.info(f"[{self.session_id}] Tool called: {self.current_tool}")
    
    def inject_result(self, event):
        self.last_result = event.kwargs.get('result')
        agent_logger.info(f"[{self.session_id}] Tool result received for: {self.current_tool}")
    
    def compile_results(self, event):
        self.final_result = event.kwargs.get('result')
        agent_logger.info(f"[{self.session_id}] Task completed. Result compiled.")
    
    def increment_retry(self, event):
        self.retry_count += 1
        agent_logger.warning(f"[{self.session_id}] Retry attempt {self.retry_count}/{self.max_retries}")
    
    def cleanup_resources(self, event):
        self.plan_steps = []
        self.current_step = 0
        self.current_tool = None
        agent_logger.info(f"[{self.session_id}] Resources cleaned up.")
    
    # ==================== 状态进入/退出回调 ====================
    
    def on_enter_idle(self, event):
        self.entered_at = datetime.now()
    
    def on_exit_idle(self, event):
        duration = (datetime.now() - self.entered_at).total_seconds()
        agent_logger.debug(f"[{self.session_id}] Left IDLE after {duration:.2f}s")
    
    def on_enter_planning(self, event):
        self.entered_at = datetime.now()
        agent_logger.info(f"[{self.session_id}] Entering PLANNING state")
    
    def on_exit_planning(self, event):
        pass
    
    def on_enter_executing(self, event):
        self.entered_at = datetime.now()
    
    def on_exit_executing(self, event):
        pass
    
    def on_enter_waiting(self, event):
        self.entered_at = datetime.now()
        agent_logger.info(f"[{self.session_id}] Waiting for tool: {self.current_tool}")
    
    def on_exit_waiting(self, event):
        duration = (datetime.now() - self.entered_at).total_seconds()
        agent_logger.info(f"[{self.session_id}] Wait duration: {duration:.2f}s")
    
    def on_enter_error(self, event):
        error_msg = event.kwargs.get('error', 'Unknown error')
        log_entry = {
            'timestamp': datetime.now().isoformat(),
            'session_id': self.session_id,
            'state_when_error': event.transition.source,
            'error': error_msg,
            'retry_count': self.retry_count,
            'current_tool': self.current_tool
        }
        error_logger.error(json.dumps(log_entry, ensure_ascii=False))
    
    def on_exit_error(self, event):
        pass
    
    def on_enter_completed(self, event):
        agent_logger.info(f"[{self.session_id}] Task COMPLETED successfully")
    
    def on_exit_completed(self, event):
        pass
    
    # ==================== 工具注册 ====================
    
    def register_tool(self, tool_name: str):
        self.registered_tools.add(tool_name)
    
    def get_state_summary(self) -> dict:
        return {
            'session_id': self.session_id,
            'current_state': self.state,
            'retry_count': self.retry_count,
            'current_step': self.current_step,
            'total_steps': len(self.plan_steps),
            'state_history_count': len(self.state_history)
        }

使用示例

import asyncio
import logging

# 配置日志输出
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logging.getLogger('agent').setLevel(logging.INFO)
logging.getLogger('errors').setLevel(logging.ERROR)

async def run_hermes_agent(task: str):
    """模拟 Hermes Agent 完整执行流程"""
    
    agent = HermesAgentFSM(session_id="sess_001", max_retries=3)
    agent.register_tool("web_search")
    agent.register_tool("code_executor")
    agent.register_tool("file_writer")
    
    print(f"初始状态: {agent.state}")
    
    # 1. 接收任务 → PLANNING
    agent.receive_task(task=task)
    print(f"收到任务后: {agent.state}")
    
    # 2. 模拟 LLM 生成计划
    await asyncio.sleep(0.1)  # 模拟 LLM 延迟
    agent.plan_steps = ["搜索相关资料", "分析结果", "生成报告"]
    agent.plan_ready()
    print(f"规划完成后: {agent.state}")
    
    # 3. 执行第一步:调用工具
    agent.call_tool(tool_name="web_search", query="Hermes Agent FSM")
    print(f"调用工具后: {agent.state}")
    
    # 4. 等待工具结果
    await asyncio.sleep(0.2)  # 模拟工具执行时间
    agent.tool_result_received(result={"found": True, "data": "..."})
    print(f"收到结果后: {agent.state}")
    
    # 5. 完成执行
    agent.finish_execution(result="任务完成,报告已生成")
    print(f"执行完成后: {agent.state}")
    
    # 6. 重置状态
    agent.reset()
    print(f"重置后: {agent.state}")
    
    print("\n状态摘要:", agent.get_state_summary())

# 运行
asyncio.run(run_hermes_agent("分析 Hermes Agent 的架构特点"))

57.6 状态机调试技巧

技巧一:可视化状态图

from transitions.extensions import GraphMachine

class DebugHermesAgentFSM(HermesAgentFSM):
    """支持可视化输出的调试版本"""
    
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # 替换为 GraphMachine 以支持图形输出
        self.machine = GraphMachine(
            model=self,
            states=self.STATES,
            transitions=self.TRANSITIONS,
            initial='idle',
            auto_transitions=False,
            show_conditions=True,
            show_state_attributes=True
        )
    
    def export_diagram(self, filename: str = "agent_fsm.png"):
        self.machine.get_graph().draw(filename, prog='dot')
        print(f"状态图已保存到 {filename}")

# 使用
debug_agent = DebugHermesAgentFSM(session_id="debug_001")
debug_agent.export_diagram("hermes_fsm.png")

技巧二:状态历史回放

def replay_session(session_id: str, log_file: str = "agent.log"):
    """从日志文件重放 Agent 执行历史"""
    import json
    
    events = []
    with open(log_file, 'r') as f:
        for line in f:
            try:
                entry = json.loads(line)
                if entry.get('session_id') == session_id:
                    events.append(entry)
            except json.JSONDecodeError:
                continue
    
    print(f"\n=== Session {session_id} 状态转换历史 ===")
    for i, event in enumerate(events):
        print(f"[{i+1}] {event['timestamp']}")
        print(f"     {event['from_state']} --[{event['trigger']}]--> {event['to_state']}")
        if event.get('kwargs'):
            print(f"     参数: {event['kwargs']}")
    
    # 检测异常模式
    state_counts = {}
    for event in events:
        dest = event['to_state']
        state_counts[dest] = state_counts.get(dest, 0) + 1
    
    if state_counts.get('error', 0) > 2:
        print(f"\n⚠️  警告:检测到 {state_counts['error']} 次错误状态,请检查 errors.log")
    
    if state_counts.get('waiting', 0) > 10:
        print(f"\n⚠️  警告:等待次数过多({state_counts['waiting']}),可能存在工具超时问题")

技巧三:状态超时监控

import asyncio
from contextlib import asynccontextmanager

class TimeoutMonitor:
    """监控 Agent 在各状态的停留时间"""
    
    STATE_TIMEOUTS = {
        'planning': 30,    # 规划最多 30 秒
        'executing': 60,   # 执行最多 60 秒
        'waiting': 45,     # 等待最多 45 秒
    }
    
    def __init__(self, agent: HermesAgentFSM):
        self.agent = agent
        self._monitoring = False
    
    async def monitor(self):
        self._monitoring = True
        while self._monitoring:
            await asyncio.sleep(5)  # 每5秒检查一次
            state = self.agent.state
            timeout = self.STATE_TIMEOUTS.get(state)
            
            if timeout:
                time_in_state = (datetime.now() - self.agent.entered_at).total_seconds()
                if time_in_state > timeout:
                    error_logger.warning(
                        f"State timeout: {state} has been active for "
                        f"{time_in_state:.1f}s (limit: {timeout}s)"
                    )
                    # 触发超时转换
                    if state == 'waiting':
                        self.agent.timeout(error=f"Tool timeout after {timeout}s")
    
    def stop(self):
        self._monitoring = False

技巧四:单元测试状态机

import pytest

class TestHermesAgentFSM:
    
    def setup_method(self):
        self.agent = HermesAgentFSM(session_id="test_001")
        self.agent.register_tool("web_search")
    
    def test_initial_state(self):
        assert self.agent.state == 'idle'
    
    def test_valid_task_triggers_planning(self):
        self.agent.receive_task(task="分析市场数据")
        assert self.agent.state == 'planning'
    
    def test_empty_task_rejected(self):
        with pytest.raises(Exception):
            self.agent.receive_task(task="")
    
    def test_full_happy_path(self):
        self.agent.receive_task(task="执行测试任务")
        self.agent.plan_steps = ["step1"]
        self.agent.plan_ready()
        self.agent.call_tool(tool_name="web_search")
        self.agent.tool_result_received(result="ok")
        self.agent.finish_execution(result="done")
        assert self.agent.state == 'completed'
    
    def test_retry_mechanism(self):
        self.agent.receive_task(task="测试重试")
        self.agent.plan_steps = ["step1"]
        self.agent.plan_ready()
        self.agent.execution_error(error="tool_failed")
        assert self.agent.state == 'error'
        assert self.agent.retry_count == 0
        
        self.agent.retry()
        assert self.agent.state == 'executing'
        assert self.agent.retry_count == 1
    
    def test_max_retry_exceeded(self):
        agent = HermesAgentFSM(session_id="test_002", max_retries=1)
        agent.register_tool("web_search")
        agent.receive_task(task="测试最大重试")
        agent.plan_steps = ["step1"]
        agent.plan_ready()
        agent.execution_error(error="tool_failed")
        agent.retry()  # retry_count = 1,达到上限
        agent.execution_error(error="tool_failed_again")
        
        # 现在不能再重试
        with pytest.raises(Exception):
            agent.retry()
        assert agent.state == 'error'

本章小结

本章系统介绍了 Hermes Agent 的状态机设计:

  1. 为何需要 FSM:Agent 需要状态机来保证行为可预测、错误可隔离、过程可审计
  2. 六大状态:IDLE → PLANNING → EXECUTING → WAITING → ERROR/COMPLETED 构成完整生命周期
  3. 转换规则:每个转换都有明确的触发事件、守卫条件和副作用动作
  4. Python 实现:使用 transitions 库实现了功能完整的 FSM,包含日志记录、错误处理和重试机制
  5. 调试工具:可视化图表、历史回放、超时监控和单元测试构成完整调试工具链

思考题

  1. 当 Agent 需要同时处理多个并发任务时,如何扩展 FSM 支持并发状态?(提示:研究 Statechart 的并行区域)
  2. 如果工具调用失败但 Agent 判断该步骤可跳过,如何设计"跳过"状态转换?
  3. 在分布式场景下,多个 Agent 实例共享一个状态机时,如何保证状态一致性?
  4. 如何将状态机与 Hermes Agent 的 MEMORY.md 结合,实现跨会话的状态持久化?
本章评分
4.7  / 5  (3 评分)

💬 留言讨论