Pi Agent 执行循环:状态机、11个生命周期事件与7层工具管道
第九章:Pi Agent 执行循环:状态机、11 个生命周期事件与 7 层工具管道
9.1 createAgentSession 完整参数详解
createAgentSession 是 Pi 框架的核心入口点。理解每个参数的作用,是理解整个执行循环的基础。
interface AgentSessionParams {
// ── 基础运行环境 ──
cwd: string; // 工作目录(Agent 执行 bash 命令的起点)
agentDir: string; // Agent 配置目录(存放 system prompt 等资源)
// ── 认证与存储 ──
authStorage: AuthStorage; // API 密钥存储(OS 密钥链)
// ── 模型配置 ──
modelRegistry: ModelRegistry; // 可用模型注册表
model: string; // 使用的模型 ID 或别名
thinkingLevel: "none" | "low" | "medium" | "high"; // 扩展思考深度
// ── 工具配置 ──
tools: Tool[]; // 基础工具列表(通常是 Pi 四工具)
customTools?: Tool[]; // 渠道特定的自定义工具
// ── 状态管理 ──
sessionManager: SessionManager; // 会话持久化(JSONL 存储)
settingsManager: SettingsManager; // 运行时设置(超时、限制等)
resourceLoader: ResourceLoader; // 资源热重载
// ── 事件与会话关联 ──
parentSessionId?: string; // 父会话 ID(用于子 Agent)
sessionId?: string; // 恢复已有会话时指定
// ── 生命周期控制 ──
abortSignal?: AbortSignal; // 外部中断信号
onEvent?: (event: AgentEvent) => void; // 事件回调
}
9.1.1 各参数的深层作用
cwd 和 agentDir 的区别:
cwd(工作目录):
/workspace/my-project/ ← bash 命令在这里执行
├── src/
├── package.json
└── ...
agentDir(Agent 配置目录):
~/.openclaw/agents/my-agent/ ← Agent 的"大脑配置"存放处
├── system-prompt.md ← System Prompt 模板
├── tools.yaml ← 工具配置
└── resources/ ← 其他静态资源
thinkingLevel 的实际影响:
"none" → 不使用 thinking 功能,最快,适合简单任务
"low" → 最多 1024 tokens 的内部推理,轻度复杂任务
"medium" → 最多 8192 tokens,适合中等复杂的工程问题
"high" → 最多 32768 tokens,适合复杂架构设计和难题调试
高 thinking level 会显著提升多步推理能力,但也增加延迟和成本。
9.2 11 个生命周期事件的完整状态机
9.2.1 状态机全览
┌─────────────────────────────────────────┐
│ AGENT 状态机 │
└─────────────────────────────────────────┘
创建会话
│
▼
① agent_start ──────────────────────────────────────────────────────┐
│ │
▼ │
② turn_start │
│ │
▼ │
③ message_start │
│ │
▼ │
④ text_delta (×N) ← 流式输出,重复触发 │
│ │
├────────────── 有工具调用 ──→ ⑤ tool_execution_start │
│ │ │
│ ▼ │
│ ⑥ tool_execution_update (×N) │
│ │ │
│ ▼ │
│ ⑦ tool_execution_end │
│ │ │
│ └──→ 返回 ③(继续生成) │
│ │
▼ │
⑧ message_end │
│ │
▼ │
⑨ turn_end │
│ │
├────── 有工具调用 & 未达终止条件 ──→ 返回 ② (新一轮) │
│ │
▼ │
⑩ agent_end ◄───────────────────────────────────────────────────────┘
│ (任何阶段的错误或中断都直接跳到 agent_end)
▼
⑪ error(如果有错误)
9.2.2 每个事件的详细说明
① agent_start
interface AgentStartEvent {
type: "agent_start";
sessionId: string;
model: string;
tools: string[]; // 工具名称列表
thinkingLevel: string;
resumedFrom?: string; // 如果是恢复会话,填写原会话 ID
timestamp: number;
}
Agent 开始执行的信号。此时所有参数已验证,工具管道已构建,但 LLM 调用尚未发生。
② turn_start
interface TurnStartEvent {
type: "turn_start";
turnId: string; // 本轮的唯一 ID
turnIndex: number; // 第几轮(0-based)
messages: Message[]; // 本轮携带的完整上下文
}
一次"轮次"从 LLM 调用开始到 LLM 停止生成(不产生工具调用,或产生所有工具调用后完成)为止。一个 Agent 执行通常包含多个轮次(用户消息 → 工具调用 → 工具结果 → 继续生成 → ...)。
③ message_start
interface MessageStartEvent {
type: "message_start";
messageId: string;
role: "assistant";
model: string;
usage: {
inputTokens: number; // 输入 token 数(基于上下文)
// outputTokens 在 message_end 时才确定
};
}
④ text_delta(流式文本,重复触发)
interface TextDeltaEvent {
type: "text_delta";
messageId: string;
delta: string; // 增量文本片段(通常 1-50 字符)
index: number; // 累计字符位置
}
每个 delta 应当立即渲染到 UI,实现打字机效果。
⑤ tool_execution_start
interface ToolExecutionStartEvent {
type: "tool_execution_start";
toolCallId: string; // 唯一标识此次工具调用
toolName: string; // 工具名称
input: Record<string, unknown>; // 工具参数(LLM 生成)
messageId: string; // 所属消息 ID
}
⑥ tool_execution_update(重复触发)
interface ToolExecutionUpdateEvent {
type: "tool_execution_update";
toolCallId: string;
updateType: "stdout" | "stderr" | "progress" | "partial-result";
content: string;
progressPercent?: number; // 0-100,如果工具支持进度报告
}
⑦ tool_execution_end
interface ToolExecutionEndEvent {
type: "tool_execution_end";
toolCallId: string;
success: boolean;
output: string; // 工具返回给 LLM 的完整输出
durationMs: number; // 执行耗时
error?: {
code: string;
message: string;
};
}
⑧ message_end
interface MessageEndEvent {
type: "message_end";
messageId: string;
stopReason: "end_turn" | "tool_use" | "max_tokens" | "stop_sequence";
usage: {
inputTokens: number;
outputTokens: number;
thinkingTokens?: number; // 如果使用了 thinking 功能
cacheReadTokens?: number; // 缓存命中的 token 数
};
}
⑨ turn_end
interface TurnEndEvent {
type: "turn_end";
turnId: string;
hasToolCalls: boolean;
shouldContinue: boolean; // 执行循环是否继续
terminationReason?: string; // 如果终止,说明原因
}
⑩ agent_end
interface AgentEndEvent {
type: "agent_end";
sessionId: string;
totalTurns: number;
totalTokens: number;
durationMs: number;
terminationReason:
| "no_tool_calls" // 正常完成:LLM 不产生工具调用
| "timeout_48h" // 48 小时超时
| "idle_timeout_120s" // LLM 空闲 120 秒
| "abort_signal" // 外部中断
| "gateway_disconnected" // Gateway 断连
| "error"; // 错误终止
}
⑪ error
interface ErrorEvent {
type: "error";
code: string;
message: string;
context?: Record<string, unknown>; // 错误发生时的上下文
recoverable: boolean; // 是否可以尝试恢复
}
9.3 7 层工具注入过滤管道
工具在传递给 LLM 之前,经过 7 层处理。这个管道在 createAgentSession 调用时构建,不是在每次工具调用时重建。
工具管道构建流程:
输入工具(params.tools + params.customTools)
│
▼
┌─────────────┐
│ Layer 1 │ Pi 基础工具
│ │ read / write / edit / bash
└─────┬───────┘
│
▼
┌─────────────┐
│ Layer 2 │ 沙箱感知替换
│ │ bash → sandboxed-bash
│ │ exec → sandboxed-exec(如果有)
└─────┬───────┘
│
▼
┌─────────────┐
│ Layer 3 │ OpenClaw 扩展工具
│ │ session.branch / session.navigate
│ │ gateway.reload / agent.interrupt
└─────┬───────┘
│
▼
┌─────────────┐
│ Layer 4 │ 渠道工具(Channel-specific)
│ │ discord.reply / slack.reaction
│ │ (由渠道 Binding 注入)
└─────┬───────┘
│
▼
┌─────────────┐
│ Layer 5 │ 策略过滤
│ │ 根据 Agent 权限过滤不允许的工具
│ │ 根据 scopes 限制可用工具集
└─────┬───────┘
│
▼
┌─────────────┐
│ Layer 6 │ Schema 规范化
│ │ 处理 Gemini/OpenAI 的 Schema 差异
│ │ 确保 JSON Schema 合规(additionalProperties 等)
└─────┬───────┘
│
▼
┌─────────────┐
│ Layer 7 │ AbortSignal 包装
│ │ 为每个工具调用注入取消信号
│ │ 连接到 Session 的生命周期管理
└─────┬───────┘
│
▼
最终工具列表(传递给 LLM 的 tools 参数)
9.3.1 各层详解
Layer 1:Pi 基础工具
这是工具管道的起点,包含 Pi 框架定义的四个核心工具:
const piBaseTools: Tool[] = [
{
name: "read",
description: "读取文件内容。支持读取部分内容(offset/limit)。",
inputSchema: {
type: "object",
properties: {
file_path: { type: "string", description: "绝对路径" },
offset: { type: "number", description: "起始行(0-based)" },
limit: { type: "number", description: "读取行数限制" }
},
required: ["file_path"]
},
execute: readFileImpl
},
// write, edit, bash 类似
];
Layer 2:沙箱感知替换
function applySandboxLayer(tools: Tool[], sandbox: Sandbox): Tool[] {
return tools.map(tool => {
if (tool.name === "bash") {
return {
...tool,
execute: (input) => sandbox.execute(input.command, {
cwd: sessionParams.cwd,
timeout: sandbox.config.timeout,
allowedPaths: sandbox.config.allowedPaths
})
};
}
return tool;
});
}
Layer 3:OpenClaw 扩展工具
const openclawExtensionTools: Tool[] = [
{
name: "session_branch",
description: "创建当前会话的分支,返回新的 sessionId",
execute: async () => {
return sessionManager.branchSession(currentSessionId, latestMessageId);
}
},
{
name: "agent_interrupt",
description: "请求中断当前 Agent 执行(优雅停止)",
execute: async () => {
abortController.abort("agent-requested");
return { message: "中断请求已发送" };
}
}
];
Layer 4:渠道工具
渠道工具由 Binding 层注入,允许 Agent 直接与消息渠道交互:
// Discord Binding 注入的渠道工具
const discordChannelTools: Tool[] = [
{
name: "discord_add_reaction",
description: "向触发此会话的 Discord 消息添加表情反应",
execute: async (input) => {
await discordClient.addReaction(
binding.sourceMessageId,
input.emoji
);
}
},
{
name: "discord_send_file",
description: "向当前 Discord 频道发送文件",
execute: async (input) => {
return discordClient.sendFile(binding.channelId, input.filePath);
}
}
];
Layer 5:策略过滤
function applyPolicyFilter(tools: Tool[], policy: AgentPolicy): Tool[] {
return tools.filter(tool => {
// 检查工具是否在允许列表中
if (policy.allowedTools && !policy.allowedTools.includes(tool.name)) {
return false;
}
// 检查所需 scope
if (tool.requiredScope && !policy.scopes.includes(tool.requiredScope)) {
return false;
}
return true;
});
}
Layer 6:Schema 规范化
不同 LLM Provider 对 JSON Schema 的支持存在差异:
function normalizeToolSchema(tool: Tool, provider: string): Tool {
const schema = structuredClone(tool.inputSchema);
if (provider === "gemini") {
// Gemini 不支持 $schema 字段
delete schema["$schema"];
// Gemini 不支持 additionalProperties: false
// 需要转换为等价的 Gemini 格式
removeAdditionalPropertiesFalse(schema);
}
if (provider === "openai") {
// OpenAI 要求 additionalProperties: false 在顶层
if (!schema.additionalProperties) {
schema.additionalProperties = false;
}
}
return { ...tool, inputSchema: schema };
}
Layer 7:AbortSignal 包装
function wrapWithAbortSignal(tools: Tool[], signal: AbortSignal): Tool[] {
return tools.map(tool => ({
...tool,
execute: async (input) => {
// 检查是否已经被中断
if (signal.aborted) {
throw new AbortError("Tool execution aborted before start");
}
// 执行工具,传入中断信号
return tool.execute(input, { signal });
}
}));
}
9.4 循环终止的 5 个条件
执行循环终止条件(任一条件满足即终止):
条件 1: 不产生工具调用(正常完成)
├── LLM 生成了完整的文本响应
└── 没有包含任何 tool_use block
→ terminationReason: "no_tool_calls"
→ 这是最常见的正常完成方式
条件 2: 48 小时超时(硬限制)
├── 从 agent_start 开始计时
└── 无论当前处于何种状态
→ terminationReason: "timeout_48h"
→ 防止永久运行的 zombie 会话
条件 3: LLM 空闲 120 秒(软超时)
├── LLM 响应超过 120 秒未产生任何 token
└── 通常表示 LLM Provider 服务异常
→ terminationReason: "idle_timeout_120s"
→ 与条件2 配合提供分层超时保护
条件 4: AbortSignal 触发
├── 用户主动中断(Ctrl+C)
├── steer 模式注入的中断
├── Gateway 收到 agent.interrupt 请求
└── 父 Agent 终止(子 Agent 的 AbortSignal 联动)
→ terminationReason: "abort_signal"
→ 最高优先级,立即生效(但等待当前工具调用完成)
条件 5: Gateway 断连
├── WebSocket 连接意外断开
└── Gateway 进程崩溃重启
→ terminationReason: "gateway_disconnected"
→ 会话状态已持久化,可以在新连接建立后恢复
9.5 事件桥接:subscribeEmbeddedPiSession
当 Pi Agent 以嵌入式运行时,其内部事件需要被翻译为 OpenClaw 的 Gateway 事件流,以便客户端(Web UI、CLI)能够订阅。
// 事件桥接的核心实现
function subscribeEmbeddedPiSession(
piSession: AgentSession,
gateway: Gateway,
gatewaySessionId: string
): Unsubscribe {
return piSession.onEvent((event: AgentEvent) => {
// 将 Pi 内部事件翻译为 Gateway WebSocket 事件
const gatewayEvent = translatePiEvent(event, gatewaySessionId);
if (gatewayEvent) {
gateway.broadcastToSession(gatewaySessionId, {
type: "event",
event: gatewayEvent.name,
payload: gatewayEvent.payload,
seq: gateway.nextSeq(),
stateVersion: gateway.currentStateVersion()
});
}
});
}
function translatePiEvent(event: AgentEvent, sessionId: string): GatewayEvent | null {
switch (event.type) {
case "text_delta":
return {
name: "session.message.delta",
payload: {
sessionId,
messageId: event.messageId,
delta: event.delta,
index: event.index
}
};
case "tool_execution_start":
return {
name: "session.tool.started",
payload: {
sessionId,
toolCallId: event.toolCallId,
toolName: event.toolName,
input: event.input
}
};
case "agent_end":
return {
name: "session.status.changed",
payload: {
sessionId,
status: "idle",
terminationReason: event.terminationReason
}
};
default:
return null; // 某些内部事件不需要广播给客户端
}
}
9.6 Per-session 队列序列化
如第七章所述,Session Lane 的串行设计防止同一 Session 的并发状态竞争。但在执行循环内部,还有另一层序列化机制:
class SessionExecutionQueue {
private queue: Promise<void> = Promise.resolve();
// 所有对此 Session 的操作都通过这个函数串行化
async enqueue<T>(operation: () => Promise<T>): Promise<T> {
let resolve!: (value: T) => void;
let reject!: (error: Error) => void;
const resultPromise = new Promise<T>((res, rej) => {
resolve = res;
reject = rej;
});
// 链式 Promise 确保串行执行
this.queue = this.queue.then(async () => {
try {
const result = await operation();
resolve(result);
} catch (error) {
reject(error as Error);
}
});
return resultPromise;
}
}
这种实现使用 Promise 链实现无锁的异步串行化,比传统互斥锁(mutex)更适合 JavaScript 的单线程事件循环模型。
9.7 Session 树形结构
每个 Session 携带 id 和 parentId,形成树形结构:
interface SessionNode {
id: string; // 全局唯一 ID
parentId: string | null; // null 表示根会话
branchPoint: string | null; // 分支点的消息 ID(null 表示根节点)
createdAt: number; // Unix 时间戳
lastActiveAt: number;
}
9.7.1 树形结构的实际应用
应用场景 1:子 Agent 追踪
sess_root(用户主会话)
│ parentId: null
├── sess_sub_1(子 Agent:分析 src/)
│ parentId: sess_root
├── sess_sub_2(子 Agent:分析 tests/)
│ parentId: sess_root
└── sess_sub_3(子 Agent:生成报告)
parentId: sess_root
应用场景 2:分支导航(探索不同方案)
sess_v1(初始方案 A)
│
└── sess_v2(在 msg_020 处分支,探索方案 B)
branchPoint: "msg_020"
│
└── sess_v3(在 msg_025 处再分支,探索方案 B 的变体)
branchPoint: "msg_025"
应用场景 3:用户在 TUI 中导航
# 列出所有 Session 树
openclaw sessions tree
# 切换到特定分支
openclaw sessions switch sess_v2
# 从 msg_015 处创建新分支
openclaw sessions branch --from msg_015
9.7.2 分支时的数据继承
async function branchSession(
parentSessionId: string,
fromMessageId: string
): Promise<Session> {
const parent = await sessionManager.loadSession(parentSessionId);
// 找到分支点之前的所有消息
const inheritedMessages = parent.transcript.filter(
msg => msg.timestamp <= parent.messages.find(m => m.id === fromMessageId)!.timestamp
);
// 创建新会话,携带继承的消息历史
const branchSession = await sessionManager.createSession({
parentId: parentSessionId,
branchPoint: fromMessageId,
initialMessages: inheritedMessages // 继承历史
});
return branchSession;
}
9.8 执行循环的完整代码骨架
async function runAgentLoop(params: AgentSessionParams): Promise<AgentEndEvent> {
const session = await sessionManager.createSession(params);
const tools = buildToolPipeline(params); // 构建 7 层管道
emit({ type: "agent_start", sessionId: session.id, ... });
let turnIndex = 0;
while (true) {
// 检查终止条件(条件 2 和 4)
if (params.abortSignal?.aborted) break;
if (Date.now() - session.createdAt > 48 * 60 * 60 * 1000) break;
emit({ type: "turn_start", turnId: newId(), turnIndex });
// 调用 LLM(带 120s 空闲超时检测)
const stream = params.modelRegistry.getModel(params.model).chat(
session.messages,
{ tools, signal: params.abortSignal }
);
let hasToolCalls = false;
for await (const chunk of withIdleTimeout(stream, 120_000)) {
// 处理流式事件
if (chunk.type === "text_delta") {
emit({ type: "text_delta", delta: chunk.delta, ... });
}
if (chunk.type === "tool_use") {
hasToolCalls = true;
emit({ type: "tool_execution_start", toolName: chunk.name, ... });
const result = await executeToolWithUpdates(chunk, tools);
emit({ type: "tool_execution_end", output: result, ... });
session.appendToolResult(chunk.id, result);
}
}
emit({ type: "turn_end", hasToolCalls, shouldContinue: hasToolCalls });
// 条件 1:不产生工具调用,正常退出
if (!hasToolCalls) break;
turnIndex++;
}
const endEvent: AgentEndEvent = {
type: "agent_end",
sessionId: session.id,
terminationReason: resolveTerminationReason(params.abortSignal),
...
};
emit(endEvent);
return endEvent;
}
本章小结
Pi Agent 执行循环的精密之处在于多个系统的协同:
- createAgentSession 的每个参数都有明确职责,共同构成 Agent 的完整运行环境
- 11 个生命周期事件 覆盖从启动到终止的全过程,提供精细的可观测性
- 7 层工具管道 在 Agent 启动时一次性构建,每层职责清晰:基础工具→沙箱→扩展→渠道→策略→规范化→取消信号
- 5 个终止条件 提供分层保护:正常完成、软超时、硬超时、外部中断、Gateway 断连
- subscribeEmbeddedPiSession 将 Pi 内部事件桥接为 Gateway WebSocket 事件,使客户端可以实时订阅
- per-session Promise 链 提供无锁的异步串行化,与 Session Lane 双重保障状态安全
- Session 树形结构 支持子 Agent 追踪和分支导航,
parentId是实现这一切的关键字段
下一章将探讨消息如何被路由到正确的 Agent——8 级优先级匹配算法的细节。