第 9 章

Pi Agent 执行循环:状态机、11个生命周期事件与7层工具管道

第九章:Pi Agent 执行循环:状态机、11 个生命周期事件与 7 层工具管道

9.1 createAgentSession 完整参数详解

createAgentSession 是 Pi 框架的核心入口点。理解每个参数的作用,是理解整个执行循环的基础。

interface AgentSessionParams {
  // ── 基础运行环境 ──
  cwd: string;                    // 工作目录(Agent 执行 bash 命令的起点)
  agentDir: string;               // Agent 配置目录(存放 system prompt 等资源)
  
  // ── 认证与存储 ──
  authStorage: AuthStorage;       // API 密钥存储(OS 密钥链)
  
  // ── 模型配置 ──
  modelRegistry: ModelRegistry;   // 可用模型注册表
  model: string;                  // 使用的模型 ID 或别名
  thinkingLevel: "none" | "low" | "medium" | "high";  // 扩展思考深度
  
  // ── 工具配置 ──
  tools: Tool[];                  // 基础工具列表(通常是 Pi 四工具)
  customTools?: Tool[];           // 渠道特定的自定义工具
  
  // ── 状态管理 ──
  sessionManager: SessionManager; // 会话持久化(JSONL 存储)
  settingsManager: SettingsManager; // 运行时设置(超时、限制等)
  resourceLoader: ResourceLoader; // 资源热重载
  
  // ── 事件与会话关联 ──
  parentSessionId?: string;       // 父会话 ID(用于子 Agent)
  sessionId?: string;             // 恢复已有会话时指定
  
  // ── 生命周期控制 ──
  abortSignal?: AbortSignal;      // 外部中断信号
  onEvent?: (event: AgentEvent) => void; // 事件回调
}

9.1.1 各参数的深层作用

cwd 和 agentDir 的区别:

cwd(工作目录):
  /workspace/my-project/       ← bash 命令在这里执行
  ├── src/
  ├── package.json
  └── ...

agentDir(Agent 配置目录):
  ~/.openclaw/agents/my-agent/  ← Agent 的"大脑配置"存放处
  ├── system-prompt.md          ← System Prompt 模板
  ├── tools.yaml                ← 工具配置
  └── resources/                ← 其他静态资源

thinkingLevel 的实际影响:

"none"   → 不使用 thinking 功能,最快,适合简单任务
"low"    → 最多 1024 tokens 的内部推理,轻度复杂任务
"medium" → 最多 8192 tokens,适合中等复杂的工程问题
"high"   → 最多 32768 tokens,适合复杂架构设计和难题调试

高 thinking level 会显著提升多步推理能力,但也增加延迟和成本。


9.2 11 个生命周期事件的完整状态机

9.2.1 状态机全览

                    ┌─────────────────────────────────────────┐
                    │              AGENT 状态机                │
                    └─────────────────────────────────────────┘

  创建会话
     │
     ▼
① agent_start ──────────────────────────────────────────────────────┐
     │                                                               │
     ▼                                                               │
② turn_start                                                         │
     │                                                               │
     ▼                                                               │
③ message_start                                                       │
     │                                                               │
     ▼                                                               │
④ text_delta (×N)  ← 流式输出,重复触发                              │
     │                                                               │
     ├────────────── 有工具调用 ──→ ⑤ tool_execution_start           │
     │                                      │                       │
     │                                      ▼                       │
     │                             ⑥ tool_execution_update (×N)     │
     │                                      │                       │
     │                                      ▼                       │
     │                             ⑦ tool_execution_end             │
     │                                      │                       │
     │                                      └──→ 返回 ③(继续生成)  │
     │                                                               │
     ▼                                                               │
⑧ message_end                                                         │
     │                                                               │
     ▼                                                               │
⑨ turn_end                                                           │
     │                                                               │
     ├────── 有工具调用 & 未达终止条件 ──→ 返回 ② (新一轮)           │
     │                                                               │
     ▼                                                               │
⑩ agent_end ◄───────────────────────────────────────────────────────┘
     │            (任何阶段的错误或中断都直接跳到 agent_end)
     ▼
⑪ error(如果有错误)

9.2.2 每个事件的详细说明

① agent_start

interface AgentStartEvent {
  type: "agent_start";
  sessionId: string;
  model: string;
  tools: string[];           // 工具名称列表
  thinkingLevel: string;
  resumedFrom?: string;      // 如果是恢复会话,填写原会话 ID
  timestamp: number;
}

Agent 开始执行的信号。此时所有参数已验证,工具管道已构建,但 LLM 调用尚未发生。

② turn_start

interface TurnStartEvent {
  type: "turn_start";
  turnId: string;            // 本轮的唯一 ID
  turnIndex: number;         // 第几轮(0-based)
  messages: Message[];       // 本轮携带的完整上下文
}

一次"轮次"从 LLM 调用开始到 LLM 停止生成(不产生工具调用,或产生所有工具调用后完成)为止。一个 Agent 执行通常包含多个轮次(用户消息 → 工具调用 → 工具结果 → 继续生成 → ...)。

③ message_start

interface MessageStartEvent {
  type: "message_start";
  messageId: string;
  role: "assistant";
  model: string;
  usage: {
    inputTokens: number;   // 输入 token 数(基于上下文)
    // outputTokens 在 message_end 时才确定
  };
}

④ text_delta(流式文本,重复触发)

interface TextDeltaEvent {
  type: "text_delta";
  messageId: string;
  delta: string;             // 增量文本片段(通常 1-50 字符)
  index: number;             // 累计字符位置
}

每个 delta 应当立即渲染到 UI,实现打字机效果。

⑤ tool_execution_start

interface ToolExecutionStartEvent {
  type: "tool_execution_start";
  toolCallId: string;        // 唯一标识此次工具调用
  toolName: string;          // 工具名称
  input: Record<string, unknown>;  // 工具参数(LLM 生成)
  messageId: string;         // 所属消息 ID
}

⑥ tool_execution_update(重复触发)

interface ToolExecutionUpdateEvent {
  type: "tool_execution_update";
  toolCallId: string;
  updateType: "stdout" | "stderr" | "progress" | "partial-result";
  content: string;
  progressPercent?: number;  // 0-100,如果工具支持进度报告
}

⑦ tool_execution_end

interface ToolExecutionEndEvent {
  type: "tool_execution_end";
  toolCallId: string;
  success: boolean;
  output: string;            // 工具返回给 LLM 的完整输出
  durationMs: number;        // 执行耗时
  error?: {
    code: string;
    message: string;
  };
}

⑧ message_end

interface MessageEndEvent {
  type: "message_end";
  messageId: string;
  stopReason: "end_turn" | "tool_use" | "max_tokens" | "stop_sequence";
  usage: {
    inputTokens: number;
    outputTokens: number;
    thinkingTokens?: number;  // 如果使用了 thinking 功能
    cacheReadTokens?: number; // 缓存命中的 token 数
  };
}

⑨ turn_end

interface TurnEndEvent {
  type: "turn_end";
  turnId: string;
  hasToolCalls: boolean;
  shouldContinue: boolean;   // 执行循环是否继续
  terminationReason?: string; // 如果终止,说明原因
}

⑩ agent_end

interface AgentEndEvent {
  type: "agent_end";
  sessionId: string;
  totalTurns: number;
  totalTokens: number;
  durationMs: number;
  terminationReason: 
    | "no_tool_calls"         // 正常完成:LLM 不产生工具调用
    | "timeout_48h"           // 48 小时超时
    | "idle_timeout_120s"     // LLM 空闲 120 秒
    | "abort_signal"          // 外部中断
    | "gateway_disconnected"  // Gateway 断连
    | "error";                // 错误终止
}

⑪ error

interface ErrorEvent {
  type: "error";
  code: string;
  message: string;
  context?: Record<string, unknown>;  // 错误发生时的上下文
  recoverable: boolean;               // 是否可以尝试恢复
}

9.3 7 层工具注入过滤管道

工具在传递给 LLM 之前,经过 7 层处理。这个管道在 createAgentSession 调用时构建,不是在每次工具调用时重建。

工具管道构建流程:

输入工具(params.tools + params.customTools)
          │
          ▼
    ┌─────────────┐
    │  Layer 1    │  Pi 基础工具
    │             │  read / write / edit / bash
    └─────┬───────┘
          │
          ▼
    ┌─────────────┐
    │  Layer 2    │  沙箱感知替换
    │             │  bash → sandboxed-bash
    │             │  exec → sandboxed-exec(如果有)
    └─────┬───────┘
          │
          ▼
    ┌─────────────┐
    │  Layer 3    │  OpenClaw 扩展工具
    │             │  session.branch / session.navigate
    │             │  gateway.reload / agent.interrupt
    └─────┬───────┘
          │
          ▼
    ┌─────────────┐
    │  Layer 4    │  渠道工具(Channel-specific)
    │             │  discord.reply / slack.reaction
    │             │  (由渠道 Binding 注入)
    └─────┬───────┘
          │
          ▼
    ┌─────────────┐
    │  Layer 5    │  策略过滤
    │             │  根据 Agent 权限过滤不允许的工具
    │             │  根据 scopes 限制可用工具集
    └─────┬───────┘
          │
          ▼
    ┌─────────────┐
    │  Layer 6    │  Schema 规范化
    │             │  处理 Gemini/OpenAI 的 Schema 差异
    │             │  确保 JSON Schema 合规(additionalProperties 等)
    └─────┬───────┘
          │
          ▼
    ┌─────────────┐
    │  Layer 7    │  AbortSignal 包装
    │             │  为每个工具调用注入取消信号
    │             │  连接到 Session 的生命周期管理
    └─────┬───────┘
          │
          ▼
最终工具列表(传递给 LLM 的 tools 参数)

9.3.1 各层详解

Layer 1:Pi 基础工具

这是工具管道的起点,包含 Pi 框架定义的四个核心工具:

const piBaseTools: Tool[] = [
  {
    name: "read",
    description: "读取文件内容。支持读取部分内容(offset/limit)。",
    inputSchema: {
      type: "object",
      properties: {
        file_path: { type: "string", description: "绝对路径" },
        offset: { type: "number", description: "起始行(0-based)" },
        limit: { type: "number", description: "读取行数限制" }
      },
      required: ["file_path"]
    },
    execute: readFileImpl
  },
  // write, edit, bash 类似
];

Layer 2:沙箱感知替换

function applySandboxLayer(tools: Tool[], sandbox: Sandbox): Tool[] {
  return tools.map(tool => {
    if (tool.name === "bash") {
      return {
        ...tool,
        execute: (input) => sandbox.execute(input.command, {
          cwd: sessionParams.cwd,
          timeout: sandbox.config.timeout,
          allowedPaths: sandbox.config.allowedPaths
        })
      };
    }
    return tool;
  });
}

Layer 3:OpenClaw 扩展工具

const openclawExtensionTools: Tool[] = [
  {
    name: "session_branch",
    description: "创建当前会话的分支,返回新的 sessionId",
    execute: async () => {
      return sessionManager.branchSession(currentSessionId, latestMessageId);
    }
  },
  {
    name: "agent_interrupt",
    description: "请求中断当前 Agent 执行(优雅停止)",
    execute: async () => {
      abortController.abort("agent-requested");
      return { message: "中断请求已发送" };
    }
  }
];

Layer 4:渠道工具

渠道工具由 Binding 层注入,允许 Agent 直接与消息渠道交互:

// Discord Binding 注入的渠道工具
const discordChannelTools: Tool[] = [
  {
    name: "discord_add_reaction",
    description: "向触发此会话的 Discord 消息添加表情反应",
    execute: async (input) => {
      await discordClient.addReaction(
        binding.sourceMessageId,
        input.emoji
      );
    }
  },
  {
    name: "discord_send_file",
    description: "向当前 Discord 频道发送文件",
    execute: async (input) => {
      return discordClient.sendFile(binding.channelId, input.filePath);
    }
  }
];

Layer 5:策略过滤

function applyPolicyFilter(tools: Tool[], policy: AgentPolicy): Tool[] {
  return tools.filter(tool => {
    // 检查工具是否在允许列表中
    if (policy.allowedTools && !policy.allowedTools.includes(tool.name)) {
      return false;
    }
    // 检查所需 scope
    if (tool.requiredScope && !policy.scopes.includes(tool.requiredScope)) {
      return false;
    }
    return true;
  });
}

Layer 6:Schema 规范化

不同 LLM Provider 对 JSON Schema 的支持存在差异:

function normalizeToolSchema(tool: Tool, provider: string): Tool {
  const schema = structuredClone(tool.inputSchema);
  
  if (provider === "gemini") {
    // Gemini 不支持 $schema 字段
    delete schema["$schema"];
    // Gemini 不支持 additionalProperties: false
    // 需要转换为等价的 Gemini 格式
    removeAdditionalPropertiesFalse(schema);
  }
  
  if (provider === "openai") {
    // OpenAI 要求 additionalProperties: false 在顶层
    if (!schema.additionalProperties) {
      schema.additionalProperties = false;
    }
  }
  
  return { ...tool, inputSchema: schema };
}

Layer 7:AbortSignal 包装

function wrapWithAbortSignal(tools: Tool[], signal: AbortSignal): Tool[] {
  return tools.map(tool => ({
    ...tool,
    execute: async (input) => {
      // 检查是否已经被中断
      if (signal.aborted) {
        throw new AbortError("Tool execution aborted before start");
      }
      
      // 执行工具,传入中断信号
      return tool.execute(input, { signal });
    }
  }));
}

9.4 循环终止的 5 个条件

执行循环终止条件(任一条件满足即终止):

条件 1: 不产生工具调用(正常完成)
  ├── LLM 生成了完整的文本响应
  └── 没有包含任何 tool_use block
  → terminationReason: "no_tool_calls"
  → 这是最常见的正常完成方式

条件 2: 48 小时超时(硬限制)
  ├── 从 agent_start 开始计时
  └── 无论当前处于何种状态
  → terminationReason: "timeout_48h"
  → 防止永久运行的 zombie 会话

条件 3: LLM 空闲 120 秒(软超时)
  ├── LLM 响应超过 120 秒未产生任何 token
  └── 通常表示 LLM Provider 服务异常
  → terminationReason: "idle_timeout_120s"
  → 与条件2 配合提供分层超时保护

条件 4: AbortSignal 触发
  ├── 用户主动中断(Ctrl+C)
  ├── steer 模式注入的中断
  ├── Gateway 收到 agent.interrupt 请求
  └── 父 Agent 终止(子 Agent 的 AbortSignal 联动)
  → terminationReason: "abort_signal"
  → 最高优先级,立即生效(但等待当前工具调用完成)

条件 5: Gateway 断连
  ├── WebSocket 连接意外断开
  └── Gateway 进程崩溃重启
  → terminationReason: "gateway_disconnected"
  → 会话状态已持久化,可以在新连接建立后恢复

9.5 事件桥接:subscribeEmbeddedPiSession

当 Pi Agent 以嵌入式运行时,其内部事件需要被翻译为 OpenClaw 的 Gateway 事件流,以便客户端(Web UI、CLI)能够订阅。

// 事件桥接的核心实现
function subscribeEmbeddedPiSession(
  piSession: AgentSession,
  gateway: Gateway,
  gatewaySessionId: string
): Unsubscribe {
  
  return piSession.onEvent((event: AgentEvent) => {
    // 将 Pi 内部事件翻译为 Gateway WebSocket 事件
    const gatewayEvent = translatePiEvent(event, gatewaySessionId);
    
    if (gatewayEvent) {
      gateway.broadcastToSession(gatewaySessionId, {
        type: "event",
        event: gatewayEvent.name,
        payload: gatewayEvent.payload,
        seq: gateway.nextSeq(),
        stateVersion: gateway.currentStateVersion()
      });
    }
  });
}

function translatePiEvent(event: AgentEvent, sessionId: string): GatewayEvent | null {
  switch (event.type) {
    case "text_delta":
      return {
        name: "session.message.delta",
        payload: {
          sessionId,
          messageId: event.messageId,
          delta: event.delta,
          index: event.index
        }
      };
    
    case "tool_execution_start":
      return {
        name: "session.tool.started",
        payload: {
          sessionId,
          toolCallId: event.toolCallId,
          toolName: event.toolName,
          input: event.input
        }
      };
    
    case "agent_end":
      return {
        name: "session.status.changed",
        payload: {
          sessionId,
          status: "idle",
          terminationReason: event.terminationReason
        }
      };
    
    default:
      return null;  // 某些内部事件不需要广播给客户端
  }
}

9.6 Per-session 队列序列化

如第七章所述,Session Lane 的串行设计防止同一 Session 的并发状态竞争。但在执行循环内部,还有另一层序列化机制:

class SessionExecutionQueue {
  private queue: Promise<void> = Promise.resolve();
  
  // 所有对此 Session 的操作都通过这个函数串行化
  async enqueue<T>(operation: () => Promise<T>): Promise<T> {
    let resolve!: (value: T) => void;
    let reject!: (error: Error) => void;
    
    const resultPromise = new Promise<T>((res, rej) => {
      resolve = res;
      reject = rej;
    });
    
    // 链式 Promise 确保串行执行
    this.queue = this.queue.then(async () => {
      try {
        const result = await operation();
        resolve(result);
      } catch (error) {
        reject(error as Error);
      }
    });
    
    return resultPromise;
  }
}

这种实现使用 Promise 链实现无锁的异步串行化,比传统互斥锁(mutex)更适合 JavaScript 的单线程事件循环模型。


9.7 Session 树形结构

每个 Session 携带 idparentId,形成树形结构:

interface SessionNode {
  id: string;               // 全局唯一 ID
  parentId: string | null;  // null 表示根会话
  branchPoint: string | null; // 分支点的消息 ID(null 表示根节点)
  createdAt: number;        // Unix 时间戳
  lastActiveAt: number;
}

9.7.1 树形结构的实际应用

应用场景 1:子 Agent 追踪

sess_root(用户主会话)
    │ parentId: null
    ├── sess_sub_1(子 Agent:分析 src/)
    │       parentId: sess_root
    ├── sess_sub_2(子 Agent:分析 tests/)
    │       parentId: sess_root
    └── sess_sub_3(子 Agent:生成报告)
            parentId: sess_root

应用场景 2:分支导航(探索不同方案)

sess_v1(初始方案 A)
    │
    └── sess_v2(在 msg_020 处分支,探索方案 B)
            branchPoint: "msg_020"
            │
            └── sess_v3(在 msg_025 处再分支,探索方案 B 的变体)
                    branchPoint: "msg_025"

应用场景 3:用户在 TUI 中导航

# 列出所有 Session 树
openclaw sessions tree

# 切换到特定分支
openclaw sessions switch sess_v2

# 从 msg_015 处创建新分支
openclaw sessions branch --from msg_015

9.7.2 分支时的数据继承

async function branchSession(
  parentSessionId: string,
  fromMessageId: string
): Promise<Session> {
  const parent = await sessionManager.loadSession(parentSessionId);
  
  // 找到分支点之前的所有消息
  const inheritedMessages = parent.transcript.filter(
    msg => msg.timestamp <= parent.messages.find(m => m.id === fromMessageId)!.timestamp
  );
  
  // 创建新会话,携带继承的消息历史
  const branchSession = await sessionManager.createSession({
    parentId: parentSessionId,
    branchPoint: fromMessageId,
    initialMessages: inheritedMessages  // 继承历史
  });
  
  return branchSession;
}

9.8 执行循环的完整代码骨架

async function runAgentLoop(params: AgentSessionParams): Promise<AgentEndEvent> {
  const session = await sessionManager.createSession(params);
  const tools = buildToolPipeline(params);  // 构建 7 层管道
  
  emit({ type: "agent_start", sessionId: session.id, ... });
  
  let turnIndex = 0;
  
  while (true) {
    // 检查终止条件(条件 2 和 4)
    if (params.abortSignal?.aborted) break;
    if (Date.now() - session.createdAt > 48 * 60 * 60 * 1000) break;
    
    emit({ type: "turn_start", turnId: newId(), turnIndex });
    
    // 调用 LLM(带 120s 空闲超时检测)
    const stream = params.modelRegistry.getModel(params.model).chat(
      session.messages,
      { tools, signal: params.abortSignal }
    );
    
    let hasToolCalls = false;
    
    for await (const chunk of withIdleTimeout(stream, 120_000)) {
      // 处理流式事件
      if (chunk.type === "text_delta") {
        emit({ type: "text_delta", delta: chunk.delta, ... });
      }
      
      if (chunk.type === "tool_use") {
        hasToolCalls = true;
        emit({ type: "tool_execution_start", toolName: chunk.name, ... });
        
        const result = await executeToolWithUpdates(chunk, tools);
        
        emit({ type: "tool_execution_end", output: result, ... });
        session.appendToolResult(chunk.id, result);
      }
    }
    
    emit({ type: "turn_end", hasToolCalls, shouldContinue: hasToolCalls });
    
    // 条件 1:不产生工具调用,正常退出
    if (!hasToolCalls) break;
    
    turnIndex++;
  }
  
  const endEvent: AgentEndEvent = {
    type: "agent_end",
    sessionId: session.id,
    terminationReason: resolveTerminationReason(params.abortSignal),
    ...
  };
  
  emit(endEvent);
  return endEvent;
}

本章小结

Pi Agent 执行循环的精密之处在于多个系统的协同:

  1. createAgentSession 的每个参数都有明确职责,共同构成 Agent 的完整运行环境
  2. 11 个生命周期事件 覆盖从启动到终止的全过程,提供精细的可观测性
  3. 7 层工具管道 在 Agent 启动时一次性构建,每层职责清晰:基础工具→沙箱→扩展→渠道→策略→规范化→取消信号
  4. 5 个终止条件 提供分层保护:正常完成、软超时、硬超时、外部中断、Gateway 断连
  5. subscribeEmbeddedPiSession 将 Pi 内部事件桥接为 Gateway WebSocket 事件,使客户端可以实时订阅
  6. per-session Promise 链 提供无锁的异步串行化,与 Session Lane 双重保障状态安全
  7. Session 树形结构 支持子 Agent 追踪和分支导航,parentId 是实现这一切的关键字段

下一章将探讨消息如何被路由到正确的 Agent——8 级优先级匹配算法的细节。

本章评分
4.7  / 5  (44 评分)

💬 留言讨论