第 52 章

Plan-and-Execute:先规划再行动

第52章:Plan-and-Execute:先规划再行动

导语

ReAct 是一种"走一步看一步"的策略——每次行动后立即处理观察结果,再决定下一步。这种即时反应性使其擅长动态、不确定的任务。然而,当面对复杂的长任务时,ReAct 可能在细节中迷失,看不见全局。Plan-and-Execute(规划后执行)模式提供了另一条路:先由专门的规划器制定完整计划,再由执行器逐步落实。本章深入探讨这两种范式的本质区别,以及如何在 Hermes Agent 中实现 Plan-and-Execute。


52.1 与 ReAct 的核心区别

flowchart TD
    subgraph ReAct["ReAct 模式(实时决策)"]
        direction LR
        R1[任务] --> R2[Thought] --> R3[Action] --> R4[Observe]
        R4 --> R2
        R2 --> R5[答案]
    end
    
    subgraph PE["Plan-and-Execute 模式(计划驱动)"]
        direction TB
        P1[任务] --> P2[Planner LLM\n生成完整计划]
        P2 --> P3[步骤 1]
        P2 --> P4[步骤 2]
        P2 --> P5[步骤 3]
        P3 --> E1[Executor\n执行步骤1]
        P4 --> E2[Executor\n执行步骤2]
        P5 --> E3[Executor\n执行步骤3]
        E1 --> AGG[结果汇总]
        E2 --> AGG
        E3 --> AGG
        AGG --> ANS[最终答案]
    end

核心对比

维度 ReAct Plan-and-Execute
决策时机 每步实时决策 预先规划,然后逐步执行
任务类型适合 动态、探索性、短任务 结构化、长任务、多步骤
灵活性 高(随时调整) 低(计划确定后难以改变)
全局一致性 低(容易迷失) 高(始终有完整蓝图)
并行执行潜力 有限(依赖前序观察) 高(独立步骤可并行)
调试可视性 低(步骤隐含在循环中) 高(计划明确可见)
需要的 LLM 调用次数 每步一次 规划一次 + 每步一次

何时选择 Plan-and-Execute

适合 Plan-and-Execute 的场景:

不适合的场景:


52.2 两阶段设计:Planner LLM + Executor LLM

Plan-and-Execute 的核心创新是将规划智能执行智能分离:

Planner LLM:负责全局规划,接受任务描述,输出结构化的执行计划(步骤列表)。Planner 需要有强大的推理能力,但不需要了解工具的具体实现细节。

Executor LLM:负责执行单个步骤,接受步骤描述,使用工具完成具体操作,返回执行结果。Executor 是 ReAct 模式的缩小版,专注于单个子任务。

# plan_and_execute.py
"""
Plan-and-Execute Agent 完整实现
"""

import json
import asyncio
from typing import Optional
from dataclasses import dataclass, field
from openai import AsyncOpenAI

# ─── 数据结构 ──────────────────────────────────────────────────

@dataclass
class ExecutionStep:
    """执行计划中的单个步骤"""
    step_id: int
    title: str                  # 步骤标题
    description: str            # 详细描述
    depends_on: list[int] = field(default_factory=list)  # 依赖的前序步骤
    tool_hints: list[str] = field(default_factory=list)  # 建议使用的工具
    
    # 执行状态
    status: str = "pending"     # pending/running/success/failed
    result: Optional[str] = None
    error: Optional[str] = None


@dataclass
class ExecutionPlan:
    """完整执行计划"""
    goal: str
    steps: list[ExecutionStep]
    created_at: float = field(default_factory=lambda: __import__("time").time())
    
    def get_ready_steps(self) -> list[ExecutionStep]:
        """获取所有依赖已满足、可以执行的步骤"""
        completed_ids = {
            s.step_id for s in self.steps if s.status == "success"
        }
        return [
            s for s in self.steps
            if s.status == "pending" and all(dep in completed_ids for dep in s.depends_on)
        ]
    
    def is_complete(self) -> bool:
        return all(s.status in ("success", "failed") for s in self.steps)
    
    def has_failed(self) -> bool:
        return any(s.status == "failed" for s in self.steps)
    
    def summary(self) -> str:
        total = len(self.steps)
        success = sum(1 for s in self.steps if s.status == "success")
        failed = sum(1 for s in self.steps if s.status == "failed")
        pending = sum(1 for s in self.steps if s.status == "pending")
        return f"总步骤: {total}, 成功: {success}, 失败: {failed}, 待执行: {pending}"

52.3 完整实现

# ─── Planner LLM ──────────────────────────────────────────────

class Planner:
    """
    任务规划器:将复杂任务分解为有序执行步骤
    """
    
    PLANNER_SYSTEM = """你是一个任务规划专家。给定一个复杂任务,你需要将其分解为
清晰的、有序的执行步骤。

输出格式(JSON):
{
  "steps": [
    {
      "step_id": 1,
      "title": "步骤标题",
      "description": "详细操作描述,说明需要做什么、为什么",
      "depends_on": [],
      "tool_hints": ["web_search", "code_exec"]
    }
  ]
}

规划原则:
1. 每个步骤应该是可独立执行的原子操作
2. 明确标注步骤间的依赖关系
3. 可以并行执行的步骤不要设置依赖
4. 步骤描述要足够具体,执行者能够直接理解并操作
5. 步骤数量控制在 3-10 个,避免过度分解"""
    
    def __init__(self, client: AsyncOpenAI, model: str):
        self.client = client
        self.model = model
    
    async def create_plan(self, task: str, context: str = "") -> ExecutionPlan:
        """将任务分解为执行计划"""
        
        user_msg = f"任务:{task}"
        if context:
            user_msg += f"\n\n背景信息:{context}"
        
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": self.PLANNER_SYSTEM},
                {"role": "user", "content": user_msg}
            ],
            response_format={"type": "json_object"},
            temperature=0.2,
            max_tokens=2048
        )
        
        plan_data = json.loads(response.choices[0].message.content)
        
        steps = [
            ExecutionStep(
                step_id=s["step_id"],
                title=s["title"],
                description=s["description"],
                depends_on=s.get("depends_on", []),
                tool_hints=s.get("tool_hints", [])
            )
            for s in plan_data["steps"]
        ]
        
        print(f"\n[Planner] 为任务制定了 {len(steps)} 步计划:")
        for step in steps:
            deps = f" (依赖: {step.depends_on})" if step.depends_on else ""
            print(f"  步骤 {step.step_id}: {step.title}{deps}")
        
        return ExecutionPlan(goal=task, steps=steps)
    
    async def replan(
        self, 
        original_plan: ExecutionPlan,
        failed_step: ExecutionStep,
        completed_results: dict
    ) -> ExecutionPlan:
        """
        当某步骤失败时,基于已完成的工作重新规划
        """
        completed_summary = "\n".join([
            f"步骤 {sid}: {result[:200]}"
            for sid, result in completed_results.items()
        ])
        
        user_msg = f"""原始任务:{original_plan.goal}

已完成的步骤:
{completed_summary}

失败的步骤:
- 标题:{failed_step.title}
- 描述:{failed_step.description}
- 错误:{failed_step.error}

请基于已完成的工作,制定新的计划来完成剩余的目标。"""
        
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": self.PLANNER_SYSTEM},
                {"role": "user", "content": user_msg}
            ],
            response_format={"type": "json_object"},
            temperature=0.3,
            max_tokens=2048
        )
        
        plan_data = json.loads(response.choices[0].message.content)
        steps = [
            ExecutionStep(**s) for s in plan_data["steps"]
        ]
        
        print(f"\n[Planner] 重新规划:{len(steps)} 步新计划")
        return ExecutionPlan(goal=original_plan.goal, steps=steps)


# ─── Executor LLM ─────────────────────────────────────────────

class Executor:
    """
    步骤执行器:执行单个计划步骤
    本质上是一个小型 ReAct Agent,专注于单个子任务
    """
    
    EXECUTOR_SYSTEM = """你是一个精确的任务执行者。给定一个具体的执行步骤,
你需要使用可用的工具来完成这个步骤,并返回清晰的执行结果。

执行原则:
1. 专注于完成当前步骤,不要偏离目标
2. 如果工具调用失败,尝试替代方案
3. 执行结果要简洁、结构化,便于后续步骤使用
4. 如果步骤无法完成,明确说明原因"""
    
    def __init__(self, client: AsyncOpenAI, model: str, tools: list):
        self.client = client
        self.model = model
        self.tools = tools
        self.max_iterations = 8  # 每步最多 8 轮工具调用
    
    async def execute_step(
        self,
        step: ExecutionStep,
        context: dict,         # 前序步骤的结果
        plan_goal: str
    ) -> str:
        """执行单个步骤,返回执行结果"""
        
        # 构建步骤执行提示词
        context_str = ""
        if context:
            context_parts = []
            for step_id, result in context.items():
                context_parts.append(f"步骤 {step_id} 的结果: {result[:500]}")
            context_str = "\n\n前序步骤的结果:\n" + "\n".join(context_parts)
        
        user_msg = f"""整体目标:{plan_goal}

当前需要完成的步骤:
标题:{step.title}
描述:{step.description}
建议工具:{', '.join(step.tool_hints) or '根据需要选择'}
{context_str}

请完成这个步骤并返回结果。"""
        
        messages = [
            {"role": "system", "content": self.EXECUTOR_SYSTEM},
            {"role": "user", "content": user_msg}
        ]
        
        print(f"\n[Executor] 开始执行步骤 {step.step_id}: {step.title}")
        
        # 内部 ReAct 循环(专注于单步)
        for iteration in range(self.max_iterations):
            response = await self.client.chat.completions.create(
                model=self.model,
                messages=messages,
                tools=self.tools,
                tool_choice="auto",
                temperature=0.1,
                max_tokens=1024
            )
            
            msg = response.choices[0].message
            
            if msg.tool_calls:
                # 执行工具调用(简化示例)
                tool_results = []
                for tc in msg.tool_calls:
                    result = f"[工具 {tc.function.name} 执行结果]"  # 实际需调用真实工具
                    tool_results.append({
                        "role": "tool",
                        "tool_call_id": tc.id,
                        "content": result
                    })
                
                messages.append({
                    "role": "assistant",
                    "content": msg.content,
                    "tool_calls": [tc.model_dump() for tc in msg.tool_calls]
                })
                messages.extend(tool_results)
                
            else:
                # 步骤完成
                result = msg.content or "步骤完成,无明确输出。"
                print(f"[Executor] 步骤 {step.step_id} 完成:{result[:100]}...")
                return result
        
        raise RuntimeError(f"步骤 {step.step_id} 在 {self.max_iterations} 次迭代后未完成")


# ─── Plan-and-Execute 主控制器 ─────────────────────────────────

class PlanAndExecuteAgent:
    """
    Plan-and-Execute Agent 主控制器
    
    支持:
    - 并行执行独立步骤
    - 失败重规划
    - 完整执行日志
    """
    
    def __init__(
        self,
        model: str = "NousResearch/Hermes-3-Llama-3.1-8B",
        base_url: str = "http://localhost:8000/v1",
        api_key: str = "not-needed",
        max_replan_attempts: int = 2
    ):
        self.client = AsyncOpenAI(base_url=base_url, api_key=api_key)
        self.planner = Planner(self.client, model)
        
        # 注册工具(格式与 ReAct 相同)
        tools_schema = self._build_tools_schema()
        self.executor = Executor(self.client, model, tools_schema)
        
        self.max_replan_attempts = max_replan_attempts
    
    def _build_tools_schema(self) -> list:
        return [
            {
                "type": "function",
                "function": {
                    "name": "web_search",
                    "description": "搜索互联网信息",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "query": {"type": "string"}
                        },
                        "required": ["query"]
                    }
                }
            },
            {
                "type": "function",
                "function": {
                    "name": "write_file",
                    "description": "将内容写入文件",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "filename": {"type": "string"},
                            "content": {"type": "string"}
                        },
                        "required": ["filename", "content"]
                    }
                }
            }
        ]
    
    async def run(self, task: str) -> dict:
        """
        执行完整的 Plan-and-Execute 流程
        """
        print(f"\n{'='*60}")
        print(f"Plan-and-Execute Agent")
        print(f"任务:{task}")
        print(f"{'='*60}")
        
        # 阶段1:规划
        plan = await self.planner.create_plan(task)
        
        completed_results = {}  # step_id -> result
        replan_count = 0
        
        # 阶段2:执行循环
        while not plan.is_complete():
            ready_steps = plan.get_ready_steps()
            
            if not ready_steps:
                print("[控制器] 无可执行步骤,检查依赖关系...")
                break
            
            print(f"\n[控制器] 当前可并行执行 {len(ready_steps)} 个步骤")
            
            # 并行执行所有就绪步骤
            tasks = [
                self._execute_with_error_handling(step, completed_results, plan.goal)
                for step in ready_steps
            ]
            results = await asyncio.gather(*tasks, return_exceptions=True)
            
            # 处理执行结果
            all_failed = True
            for step, result in zip(ready_steps, results):
                if isinstance(result, Exception):
                    step.status = "failed"
                    step.error = str(result)
                    print(f"[控制器] 步骤 {step.step_id} 失败:{step.error}")
                else:
                    step.status = "success"
                    step.result = result
                    completed_results[step.step_id] = result
                    all_failed = False
            
            # 如果有步骤失败,尝试重规划
            if plan.has_failed() and replan_count < self.max_replan_attempts:
                failed_step = next(s for s in plan.steps if s.status == "failed")
                print(f"\n[控制器] 触发重规划(第 {replan_count + 1} 次)")
                
                plan = await self.planner.replan(
                    original_plan=plan,
                    failed_step=failed_step,
                    completed_results=completed_results
                )
                replan_count += 1
            elif all_failed:
                break
        
        # 生成最终报告
        final_report = await self._synthesize_results(task, plan, completed_results)
        
        print(f"\n{'='*60}")
        print(f"执行完成!{plan.summary()}")
        print(f"{'='*60}")
        
        return {
            "success": not plan.has_failed(),
            "answer": final_report,
            "plan_summary": plan.summary(),
            "steps_completed": len(completed_results),
            "replan_count": replan_count
        }
    
    async def _execute_with_error_handling(
        self,
        step: ExecutionStep,
        context: dict,
        goal: str
    ) -> str:
        """带错误处理的步骤执行"""
        step.status = "running"
        try:
            return await self.executor.execute_step(step, context, goal)
        except Exception as e:
            raise RuntimeError(f"步骤执行失败: {str(e)}")
    
    async def _synthesize_results(
        self,
        task: str,
        plan: ExecutionPlan,
        results: dict
    ) -> str:
        """将所有步骤结果综合为最终答案"""
        
        results_str = "\n\n".join([
            f"步骤 {sid} 结果:\n{result}"
            for sid, result in results.items()
        ])
        
        response = await self.client.chat.completions.create(
            model=self.planner.model,
            messages=[
                {
                    "role": "system",
                    "content": "你是一个结果综合专家。将多个步骤的执行结果综合为完整、连贯的最终答案。"
                },
                {
                    "role": "user",
                    "content": f"原始任务:{task}\n\n各步骤执行结果:\n{results_str}\n\n请综合以上结果,给出完整的最终答案。"
                }
            ],
            temperature=0.3,
            max_tokens=2048
        )
        
        return response.choices[0].message.content


# ─── 使用示例 ──────────────────────────────────────────────────

async def main():
    agent = PlanAndExecuteAgent(
        model="NousResearch/Hermes-3-Llama-3.1-8B",
        base_url="http://localhost:8000/v1",
        max_replan_attempts=2
    )
    
    result = await agent.run(
        "撰写一篇关于'2024年AI大模型发展趋势'的3000字研究报告,"
        "要求包含:主要模型对比、技术突破、商业化进展、未来展望四个部分"
    )
    
    print("\n最终报告:")
    print(result["answer"])


if __name__ == "__main__":
    asyncio.run(main())

52.4 规划失败的处理策略

当计划中的步骤失败时,Plan-and-Execute 有三种应对策略:

策略一:跳过失败步骤

适用于:可选步骤、失败不影响后续的情况

async def skip_and_continue(plan: ExecutionPlan, failed_step: ExecutionStep):
    """标记失败步骤为跳过,继续执行其他步骤"""
    failed_step.status = "skipped"
    # 移除对失败步骤的依赖
    for step in plan.steps:
        if failed_step.step_id in step.depends_on:
            step.depends_on.remove(failed_step.step_id)

策略二:重试(带退避)

async def retry_with_backoff(
    executor: Executor,
    step: ExecutionStep,
    context: dict,
    goal: str,
    max_retries: int = 3
) -> str:
    for attempt in range(max_retries):
        try:
            return await executor.execute_step(step, context, goal)
        except Exception as e:
            if attempt == max_retries - 1:
                raise
            wait_time = 2 ** attempt  # 指数退避:1s, 2s, 4s
            print(f"  步骤失败,{wait_time}s 后重试...")
            await asyncio.sleep(wait_time)

策略三:全量重规划(已在主实现中展示)

适用于:核心步骤失败、需要根据失败信息调整整体策略

规划失败处理决策流

flowchart TD
    F[步骤失败] --> Q1{是否关键步骤?}
    Q1 -->|是| Q2{已重试次数?}
    Q1 -->|否| SKIP[跳过,继续]
    Q2 -->|< 3次| RETRY[重试]
    Q2 -->|>= 3次| Q3{重规划次数?}
    Q3 -->|< 2次| REPLAN[触发重规划]
    Q3 -->|>= 2次| FAIL[任务失败,报告原因]
    RETRY --> F

小结

本章系统讲解了 Plan-and-Execute 模式的设计哲学与实现:

  1. 与 ReAct 的本质区别:Plan-and-Execute 先全局规划再逐步执行,适合长任务;ReAct 即时决策,适合动态任务。
  2. 两阶段分工:Planner LLM 负责全局视角,Executor LLM 专注单步执行,职责分离使每个组件更专注。
  3. 并行执行:无依赖关系的步骤可并行执行,asyncio.gather 实现高效并发。
  4. 重规划机制:失败不是终点,而是触发重规划的信号。基于已完成工作的重规划比从零开始更高效。
  5. 三种失败策略:跳过(可选步骤)、重试(暂时性故障)、重规划(系统性问题),根据步骤重要性选择。

思考题

  1. 在什么情况下,Plan-and-Execute 的"预先规划"反而是劣势?(提示:考虑信息不完整的场景)
  2. 如何设计 Planner,使其生成的计划步骤粒度合适?太粗(步骤太少)和太细(步骤太多)各有什么问题?
  3. 如果不同步骤使用不同的 LLM(如 Planner 用 70B,Executor 用 8B),应该如何设计步骤描述使 8B 模型也能准确执行?
  4. Plan-and-Execute 的重规划需要消耗额外 LLM 调用。如何估算重规划的成本,并决定是否值得重规划?
本章评分
4.5  / 5  (3 评分)

💬 留言讨论