第 52 章
Plan-and-Execute:先规划再行动
第52章:Plan-and-Execute:先规划再行动
导语
ReAct 是一种"走一步看一步"的策略——每次行动后立即处理观察结果,再决定下一步。这种即时反应性使其擅长动态、不确定的任务。然而,当面对复杂的长任务时,ReAct 可能在细节中迷失,看不见全局。Plan-and-Execute(规划后执行)模式提供了另一条路:先由专门的规划器制定完整计划,再由执行器逐步落实。本章深入探讨这两种范式的本质区别,以及如何在 Hermes Agent 中实现 Plan-and-Execute。
52.1 与 ReAct 的核心区别
flowchart TD
subgraph ReAct["ReAct 模式(实时决策)"]
direction LR
R1[任务] --> R2[Thought] --> R3[Action] --> R4[Observe]
R4 --> R2
R2 --> R5[答案]
end
subgraph PE["Plan-and-Execute 模式(计划驱动)"]
direction TB
P1[任务] --> P2[Planner LLM\n生成完整计划]
P2 --> P3[步骤 1]
P2 --> P4[步骤 2]
P2 --> P5[步骤 3]
P3 --> E1[Executor\n执行步骤1]
P4 --> E2[Executor\n执行步骤2]
P5 --> E3[Executor\n执行步骤3]
E1 --> AGG[结果汇总]
E2 --> AGG
E3 --> AGG
AGG --> ANS[最终答案]
end
核心对比
| 维度 | ReAct | Plan-and-Execute |
|---|---|---|
| 决策时机 | 每步实时决策 | 预先规划,然后逐步执行 |
| 任务类型适合 | 动态、探索性、短任务 | 结构化、长任务、多步骤 |
| 灵活性 | 高(随时调整) | 低(计划确定后难以改变) |
| 全局一致性 | 低(容易迷失) | 高(始终有完整蓝图) |
| 并行执行潜力 | 有限(依赖前序观察) | 高(独立步骤可并行) |
| 调试可视性 | 低(步骤隐含在循环中) | 高(计划明确可见) |
| 需要的 LLM 调用次数 | 每步一次 | 规划一次 + 每步一次 |
何时选择 Plan-and-Execute
适合 Plan-and-Execute 的场景:
- 竞争分析报告(需要查询多个来源、整合数据、撰写报告)
- 代码重构(分析现有代码 → 制定修改计划 → 逐文件修改)
- 研究综述(确定主题 → 分配子主题 → 并行搜索 → 汇总)
- 批量数据处理(制定处理流程 → 批量执行)
不适合的场景:
- 需要根据中间结果随时调整方向的探索性任务
- 工具调用结果高度不可预测的场景
- 简单的单步问答
52.2 两阶段设计:Planner LLM + Executor LLM
Plan-and-Execute 的核心创新是将规划智能和执行智能分离:
Planner LLM:负责全局规划,接受任务描述,输出结构化的执行计划(步骤列表)。Planner 需要有强大的推理能力,但不需要了解工具的具体实现细节。
Executor LLM:负责执行单个步骤,接受步骤描述,使用工具完成具体操作,返回执行结果。Executor 是 ReAct 模式的缩小版,专注于单个子任务。
# plan_and_execute.py
"""
Plan-and-Execute Agent 完整实现
"""
import json
import asyncio
from typing import Optional
from dataclasses import dataclass, field
from openai import AsyncOpenAI
# ─── 数据结构 ──────────────────────────────────────────────────
@dataclass
class ExecutionStep:
"""执行计划中的单个步骤"""
step_id: int
title: str # 步骤标题
description: str # 详细描述
depends_on: list[int] = field(default_factory=list) # 依赖的前序步骤
tool_hints: list[str] = field(default_factory=list) # 建议使用的工具
# 执行状态
status: str = "pending" # pending/running/success/failed
result: Optional[str] = None
error: Optional[str] = None
@dataclass
class ExecutionPlan:
"""完整执行计划"""
goal: str
steps: list[ExecutionStep]
created_at: float = field(default_factory=lambda: __import__("time").time())
def get_ready_steps(self) -> list[ExecutionStep]:
"""获取所有依赖已满足、可以执行的步骤"""
completed_ids = {
s.step_id for s in self.steps if s.status == "success"
}
return [
s for s in self.steps
if s.status == "pending" and all(dep in completed_ids for dep in s.depends_on)
]
def is_complete(self) -> bool:
return all(s.status in ("success", "failed") for s in self.steps)
def has_failed(self) -> bool:
return any(s.status == "failed" for s in self.steps)
def summary(self) -> str:
total = len(self.steps)
success = sum(1 for s in self.steps if s.status == "success")
failed = sum(1 for s in self.steps if s.status == "failed")
pending = sum(1 for s in self.steps if s.status == "pending")
return f"总步骤: {total}, 成功: {success}, 失败: {failed}, 待执行: {pending}"
52.3 完整实现
# ─── Planner LLM ──────────────────────────────────────────────
class Planner:
"""
任务规划器:将复杂任务分解为有序执行步骤
"""
PLANNER_SYSTEM = """你是一个任务规划专家。给定一个复杂任务,你需要将其分解为
清晰的、有序的执行步骤。
输出格式(JSON):
{
"steps": [
{
"step_id": 1,
"title": "步骤标题",
"description": "详细操作描述,说明需要做什么、为什么",
"depends_on": [],
"tool_hints": ["web_search", "code_exec"]
}
]
}
规划原则:
1. 每个步骤应该是可独立执行的原子操作
2. 明确标注步骤间的依赖关系
3. 可以并行执行的步骤不要设置依赖
4. 步骤描述要足够具体,执行者能够直接理解并操作
5. 步骤数量控制在 3-10 个,避免过度分解"""
def __init__(self, client: AsyncOpenAI, model: str):
self.client = client
self.model = model
async def create_plan(self, task: str, context: str = "") -> ExecutionPlan:
"""将任务分解为执行计划"""
user_msg = f"任务:{task}"
if context:
user_msg += f"\n\n背景信息:{context}"
response = await self.client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": self.PLANNER_SYSTEM},
{"role": "user", "content": user_msg}
],
response_format={"type": "json_object"},
temperature=0.2,
max_tokens=2048
)
plan_data = json.loads(response.choices[0].message.content)
steps = [
ExecutionStep(
step_id=s["step_id"],
title=s["title"],
description=s["description"],
depends_on=s.get("depends_on", []),
tool_hints=s.get("tool_hints", [])
)
for s in plan_data["steps"]
]
print(f"\n[Planner] 为任务制定了 {len(steps)} 步计划:")
for step in steps:
deps = f" (依赖: {step.depends_on})" if step.depends_on else ""
print(f" 步骤 {step.step_id}: {step.title}{deps}")
return ExecutionPlan(goal=task, steps=steps)
async def replan(
self,
original_plan: ExecutionPlan,
failed_step: ExecutionStep,
completed_results: dict
) -> ExecutionPlan:
"""
当某步骤失败时,基于已完成的工作重新规划
"""
completed_summary = "\n".join([
f"步骤 {sid}: {result[:200]}"
for sid, result in completed_results.items()
])
user_msg = f"""原始任务:{original_plan.goal}
已完成的步骤:
{completed_summary}
失败的步骤:
- 标题:{failed_step.title}
- 描述:{failed_step.description}
- 错误:{failed_step.error}
请基于已完成的工作,制定新的计划来完成剩余的目标。"""
response = await self.client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": self.PLANNER_SYSTEM},
{"role": "user", "content": user_msg}
],
response_format={"type": "json_object"},
temperature=0.3,
max_tokens=2048
)
plan_data = json.loads(response.choices[0].message.content)
steps = [
ExecutionStep(**s) for s in plan_data["steps"]
]
print(f"\n[Planner] 重新规划:{len(steps)} 步新计划")
return ExecutionPlan(goal=original_plan.goal, steps=steps)
# ─── Executor LLM ─────────────────────────────────────────────
class Executor:
"""
步骤执行器:执行单个计划步骤
本质上是一个小型 ReAct Agent,专注于单个子任务
"""
EXECUTOR_SYSTEM = """你是一个精确的任务执行者。给定一个具体的执行步骤,
你需要使用可用的工具来完成这个步骤,并返回清晰的执行结果。
执行原则:
1. 专注于完成当前步骤,不要偏离目标
2. 如果工具调用失败,尝试替代方案
3. 执行结果要简洁、结构化,便于后续步骤使用
4. 如果步骤无法完成,明确说明原因"""
def __init__(self, client: AsyncOpenAI, model: str, tools: list):
self.client = client
self.model = model
self.tools = tools
self.max_iterations = 8 # 每步最多 8 轮工具调用
async def execute_step(
self,
step: ExecutionStep,
context: dict, # 前序步骤的结果
plan_goal: str
) -> str:
"""执行单个步骤,返回执行结果"""
# 构建步骤执行提示词
context_str = ""
if context:
context_parts = []
for step_id, result in context.items():
context_parts.append(f"步骤 {step_id} 的结果: {result[:500]}")
context_str = "\n\n前序步骤的结果:\n" + "\n".join(context_parts)
user_msg = f"""整体目标:{plan_goal}
当前需要完成的步骤:
标题:{step.title}
描述:{step.description}
建议工具:{', '.join(step.tool_hints) or '根据需要选择'}
{context_str}
请完成这个步骤并返回结果。"""
messages = [
{"role": "system", "content": self.EXECUTOR_SYSTEM},
{"role": "user", "content": user_msg}
]
print(f"\n[Executor] 开始执行步骤 {step.step_id}: {step.title}")
# 内部 ReAct 循环(专注于单步)
for iteration in range(self.max_iterations):
response = await self.client.chat.completions.create(
model=self.model,
messages=messages,
tools=self.tools,
tool_choice="auto",
temperature=0.1,
max_tokens=1024
)
msg = response.choices[0].message
if msg.tool_calls:
# 执行工具调用(简化示例)
tool_results = []
for tc in msg.tool_calls:
result = f"[工具 {tc.function.name} 执行结果]" # 实际需调用真实工具
tool_results.append({
"role": "tool",
"tool_call_id": tc.id,
"content": result
})
messages.append({
"role": "assistant",
"content": msg.content,
"tool_calls": [tc.model_dump() for tc in msg.tool_calls]
})
messages.extend(tool_results)
else:
# 步骤完成
result = msg.content or "步骤完成,无明确输出。"
print(f"[Executor] 步骤 {step.step_id} 完成:{result[:100]}...")
return result
raise RuntimeError(f"步骤 {step.step_id} 在 {self.max_iterations} 次迭代后未完成")
# ─── Plan-and-Execute 主控制器 ─────────────────────────────────
class PlanAndExecuteAgent:
"""
Plan-and-Execute Agent 主控制器
支持:
- 并行执行独立步骤
- 失败重规划
- 完整执行日志
"""
def __init__(
self,
model: str = "NousResearch/Hermes-3-Llama-3.1-8B",
base_url: str = "http://localhost:8000/v1",
api_key: str = "not-needed",
max_replan_attempts: int = 2
):
self.client = AsyncOpenAI(base_url=base_url, api_key=api_key)
self.planner = Planner(self.client, model)
# 注册工具(格式与 ReAct 相同)
tools_schema = self._build_tools_schema()
self.executor = Executor(self.client, model, tools_schema)
self.max_replan_attempts = max_replan_attempts
def _build_tools_schema(self) -> list:
return [
{
"type": "function",
"function": {
"name": "web_search",
"description": "搜索互联网信息",
"parameters": {
"type": "object",
"properties": {
"query": {"type": "string"}
},
"required": ["query"]
}
}
},
{
"type": "function",
"function": {
"name": "write_file",
"description": "将内容写入文件",
"parameters": {
"type": "object",
"properties": {
"filename": {"type": "string"},
"content": {"type": "string"}
},
"required": ["filename", "content"]
}
}
}
]
async def run(self, task: str) -> dict:
"""
执行完整的 Plan-and-Execute 流程
"""
print(f"\n{'='*60}")
print(f"Plan-and-Execute Agent")
print(f"任务:{task}")
print(f"{'='*60}")
# 阶段1:规划
plan = await self.planner.create_plan(task)
completed_results = {} # step_id -> result
replan_count = 0
# 阶段2:执行循环
while not plan.is_complete():
ready_steps = plan.get_ready_steps()
if not ready_steps:
print("[控制器] 无可执行步骤,检查依赖关系...")
break
print(f"\n[控制器] 当前可并行执行 {len(ready_steps)} 个步骤")
# 并行执行所有就绪步骤
tasks = [
self._execute_with_error_handling(step, completed_results, plan.goal)
for step in ready_steps
]
results = await asyncio.gather(*tasks, return_exceptions=True)
# 处理执行结果
all_failed = True
for step, result in zip(ready_steps, results):
if isinstance(result, Exception):
step.status = "failed"
step.error = str(result)
print(f"[控制器] 步骤 {step.step_id} 失败:{step.error}")
else:
step.status = "success"
step.result = result
completed_results[step.step_id] = result
all_failed = False
# 如果有步骤失败,尝试重规划
if plan.has_failed() and replan_count < self.max_replan_attempts:
failed_step = next(s for s in plan.steps if s.status == "failed")
print(f"\n[控制器] 触发重规划(第 {replan_count + 1} 次)")
plan = await self.planner.replan(
original_plan=plan,
failed_step=failed_step,
completed_results=completed_results
)
replan_count += 1
elif all_failed:
break
# 生成最终报告
final_report = await self._synthesize_results(task, plan, completed_results)
print(f"\n{'='*60}")
print(f"执行完成!{plan.summary()}")
print(f"{'='*60}")
return {
"success": not plan.has_failed(),
"answer": final_report,
"plan_summary": plan.summary(),
"steps_completed": len(completed_results),
"replan_count": replan_count
}
async def _execute_with_error_handling(
self,
step: ExecutionStep,
context: dict,
goal: str
) -> str:
"""带错误处理的步骤执行"""
step.status = "running"
try:
return await self.executor.execute_step(step, context, goal)
except Exception as e:
raise RuntimeError(f"步骤执行失败: {str(e)}")
async def _synthesize_results(
self,
task: str,
plan: ExecutionPlan,
results: dict
) -> str:
"""将所有步骤结果综合为最终答案"""
results_str = "\n\n".join([
f"步骤 {sid} 结果:\n{result}"
for sid, result in results.items()
])
response = await self.client.chat.completions.create(
model=self.planner.model,
messages=[
{
"role": "system",
"content": "你是一个结果综合专家。将多个步骤的执行结果综合为完整、连贯的最终答案。"
},
{
"role": "user",
"content": f"原始任务:{task}\n\n各步骤执行结果:\n{results_str}\n\n请综合以上结果,给出完整的最终答案。"
}
],
temperature=0.3,
max_tokens=2048
)
return response.choices[0].message.content
# ─── 使用示例 ──────────────────────────────────────────────────
async def main():
agent = PlanAndExecuteAgent(
model="NousResearch/Hermes-3-Llama-3.1-8B",
base_url="http://localhost:8000/v1",
max_replan_attempts=2
)
result = await agent.run(
"撰写一篇关于'2024年AI大模型发展趋势'的3000字研究报告,"
"要求包含:主要模型对比、技术突破、商业化进展、未来展望四个部分"
)
print("\n最终报告:")
print(result["answer"])
if __name__ == "__main__":
asyncio.run(main())
52.4 规划失败的处理策略
当计划中的步骤失败时,Plan-and-Execute 有三种应对策略:
策略一:跳过失败步骤
适用于:可选步骤、失败不影响后续的情况
async def skip_and_continue(plan: ExecutionPlan, failed_step: ExecutionStep):
"""标记失败步骤为跳过,继续执行其他步骤"""
failed_step.status = "skipped"
# 移除对失败步骤的依赖
for step in plan.steps:
if failed_step.step_id in step.depends_on:
step.depends_on.remove(failed_step.step_id)
策略二:重试(带退避)
async def retry_with_backoff(
executor: Executor,
step: ExecutionStep,
context: dict,
goal: str,
max_retries: int = 3
) -> str:
for attempt in range(max_retries):
try:
return await executor.execute_step(step, context, goal)
except Exception as e:
if attempt == max_retries - 1:
raise
wait_time = 2 ** attempt # 指数退避:1s, 2s, 4s
print(f" 步骤失败,{wait_time}s 后重试...")
await asyncio.sleep(wait_time)
策略三:全量重规划(已在主实现中展示)
适用于:核心步骤失败、需要根据失败信息调整整体策略
规划失败处理决策流
flowchart TD
F[步骤失败] --> Q1{是否关键步骤?}
Q1 -->|是| Q2{已重试次数?}
Q1 -->|否| SKIP[跳过,继续]
Q2 -->|< 3次| RETRY[重试]
Q2 -->|>= 3次| Q3{重规划次数?}
Q3 -->|< 2次| REPLAN[触发重规划]
Q3 -->|>= 2次| FAIL[任务失败,报告原因]
RETRY --> F
小结
本章系统讲解了 Plan-and-Execute 模式的设计哲学与实现:
- 与 ReAct 的本质区别:Plan-and-Execute 先全局规划再逐步执行,适合长任务;ReAct 即时决策,适合动态任务。
- 两阶段分工:Planner LLM 负责全局视角,Executor LLM 专注单步执行,职责分离使每个组件更专注。
- 并行执行:无依赖关系的步骤可并行执行,
asyncio.gather实现高效并发。 - 重规划机制:失败不是终点,而是触发重规划的信号。基于已完成工作的重规划比从零开始更高效。
- 三种失败策略:跳过(可选步骤)、重试(暂时性故障)、重规划(系统性问题),根据步骤重要性选择。
思考题
- 在什么情况下,Plan-and-Execute 的"预先规划"反而是劣势?(提示:考虑信息不完整的场景)
- 如何设计 Planner,使其生成的计划步骤粒度合适?太粗(步骤太少)和太细(步骤太多)各有什么问题?
- 如果不同步骤使用不同的 LLM(如 Planner 用 70B,Executor 用 8B),应该如何设计步骤描述使 8B 模型也能准确执行?
- Plan-and-Execute 的重规划需要消耗额外 LLM 调用。如何估算重规划的成本,并决定是否值得重规划?