Chapter 52

Plan-and-Execute: Plan First, Then Act

Introduction

ReAct is a "step-by-step, see as you go" strategy: after each action, the agent processes the resulting observation before deciding on the next step. This reactivity makes it well suited to dynamic, uncertain tasks. On complex, long-horizon tasks, however, ReAct can get lost in details and lose sight of the big picture. Plan-and-Execute offers an alternative: a dedicated planner first generates a complete plan, and an executor then works through it step by step. This chapter explores the fundamental differences between the two paradigms and shows how to implement Plan-and-Execute in Hermes Agent.


52.1 Core Differences from ReAct

flowchart TD
    subgraph ReAct["ReAct (Reactive)"]
        direction LR
        R1[Task] --> R2[Thought] --> R3[Action] --> R4[Observe]
        R4 --> R2
        R2 --> R5[Answer]
    end
    
    subgraph PE["Plan-and-Execute (Proactive)"]
        direction TB
        P1[Task] --> P2[Planner LLM\nGenerates complete plan]
        P2 --> P3[Step 1]
        P2 --> P4[Step 2]
        P2 --> P5[Step 3]
        P3 --> E1[Executor\nStep 1]
        P4 --> E2[Executor\nStep 2]
        P5 --> E3[Executor\nStep 3]
        E1 --> AGG[Synthesize]
        E2 --> AGG
        E3 --> AGG
        AGG --> ANS[Final Answer]
    end

Core Comparison

| Dimension | ReAct | Plan-and-Execute |
|---|---|---|
| Decision timing | Real-time, per step | Upfront plan, then execution in dependency order |
| Best task type | Dynamic, exploratory, short tasks | Structured, long-horizon, multi-step tasks |
| Flexibility | High (adjust anytime) | Lower (plan is hard to change mid-flight) |
| Global coherence | Lower (can lose the thread) | High (always has the full blueprint) |
| Parallelism potential | Limited (each step depends on prior observations) | High (independent steps run in parallel) |
| Debug visibility | Low (steps implicit in the loop) | High (plan explicitly visible) |
| LLM calls | One per step | One for the plan, at least one per step, plus one for synthesis |
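
The "Parallelism potential" row can be made concrete with a small sketch. Assuming a plan is represented as a mapping from step id to the ids it depends on (a simplification chosen for illustration, not the chapter's data model), the hypothetical helper `execution_waves` groups steps into waves that could run concurrently:

```python
def execution_waves(deps: dict[int, list[int]]) -> list[list[int]]:
    """Group step ids into waves; each wave depends only on earlier waves."""
    unknown = {d for ds in deps.values() for d in ds} - set(deps)
    if unknown:
        raise ValueError(f"unknown dependencies: {sorted(unknown)}")
    done: set[int] = set()
    waves: list[list[int]] = []
    while len(done) < len(deps):
        # A step is runnable once every one of its dependencies has finished.
        wave = sorted(
            s for s, ds in deps.items()
            if s not in done and all(d in done for d in ds)
        )
        if not wave:
            raise ValueError("dependency cycle detected")
        waves.append(wave)
        done.update(wave)
    return waves


# Steps 1 and 2 are independent; 3 needs both; 4 needs 3.
plan = {1: [], 2: [], 3: [1, 2], 4: [3]}
print(execution_waves(plan))  # [[1, 2], [3], [4]]
```

A ReAct loop would handle the same four steps strictly one after another, because each decision waits for the previous observation.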

When to Choose Plan-and-Execute

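Good fit: structured, long-horizon, multi-step tasks, especially those with independent steps that can run in parallel or where an explicit, inspectable plan helps debugging.

Poor fit: dynamic, exploratory, or short tasks, where each next step depends on what the previous observation reveals and a fixed plan would need to change mid-flight.
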

52.2 Two-Phase Design: Planner LLM + Executor LLM

The core innovation of Plan-and-Execute is separating planning intelligence from execution intelligence:

# plan_and_execute.py

import json
import asyncio
from typing import Optional
from dataclasses import dataclass, field
from openai import AsyncOpenAI


@dataclass
class ExecutionStep:
    step_id: int
    title: str
    description: str
    depends_on: list = field(default_factory=list)
    tool_hints: list = field(default_factory=list)
    status: str = "pending"   # pending/running/success/failed/skipped
    result: Optional[str] = None
    error: Optional[str] = None


@dataclass
class ExecutionPlan:
    goal: str
    steps: list
    
    def get_ready_steps(self) -> list:
        completed_ids = {s.step_id for s in self.steps if s.status == "success"}
        return [
            s for s in self.steps
            if s.status == "pending" and all(dep in completed_ids for dep in s.depends_on)
        ]
    
    def is_complete(self) -> bool:
        return all(s.status in ("success", "failed", "skipped") for s in self.steps)
    
    def has_failed(self) -> bool:
        return any(s.status == "failed" for s in self.steps)


class Planner:
    SYSTEM = """You are a task planning expert. Given a complex task, decompose it 
into clear, ordered execution steps.

Output JSON format:
{
  "steps": [
    {
      "step_id": 1,
      "title": "Step title",
      "description": "Detailed description: what to do and why",
      "depends_on": [],
      "tool_hints": ["web_search", "write_file"]
    }
  ]
}

Planning principles:
1. Each step should be an independently executable atomic operation
2. Clearly mark dependencies between steps
3. Steps that can run in parallel should have no dependency between them
4. Descriptions must be specific enough for an executor to act without clarification
5. Keep steps between 3-10; avoid over-decomposition"""
    
    def __init__(self, client: AsyncOpenAI, model: str):
        self.client = client
        self.model = model
    
    async def create_plan(self, task: str) -> ExecutionPlan:
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": self.SYSTEM},
                {"role": "user", "content": f"Task: {task}"}
            ],
            response_format={"type": "json_object"},
            temperature=0.2,
            max_tokens=2048
        )
        
        data = json.loads(response.choices[0].message.content)
        steps = [ExecutionStep(**s) for s in data["steps"]]
        
        print(f"\n[Planner] Created {len(steps)}-step plan:")
        for step in steps:
            deps = f" (depends on: {step.depends_on})" if step.depends_on else ""
            print(f"  Step {step.step_id}: {step.title}{deps}")
        
        return ExecutionPlan(goal=task, steps=steps)
    
    async def replan(
        self,
        original_plan: ExecutionPlan,
        failed_step: ExecutionStep,
        completed_results: dict
    ) -> ExecutionPlan:
        completed_summary = "\n".join([
            f"Step {sid}: {result[:200]}" for sid, result in completed_results.items()
        ])
        
        user_msg = f"""Original task: {original_plan.goal}

Completed steps:
{completed_summary}

Failed step:
- Title: {failed_step.title}
- Description: {failed_step.description}
- Error: {failed_step.error}

Based on completed work, create a new plan to finish the remaining goal."""
        
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": self.SYSTEM},
                {"role": "user", "content": user_msg}
            ],
            response_format={"type": "json_object"},
            temperature=0.3,
            max_tokens=2048
        )
        
        data = json.loads(response.choices[0].message.content)
        steps = [ExecutionStep(**s) for s in data["steps"]]
        print(f"\n[Planner] Replanned: {len(steps)}-step new plan")
        return ExecutionPlan(goal=original_plan.goal, steps=steps)


class Executor:
    SYSTEM = """You are a precise task executor. Given a specific step, use available 
tools to complete it and return a clear result.

Rules:
1. Stay focused on the current step
2. If a tool fails, try an alternative approach
3. Return concise, structured results usable by subsequent steps
4. If the step cannot be completed, explain why clearly"""
    
    def __init__(self, client: AsyncOpenAI, model: str, tools: list):
        self.client = client
        self.model = model
        self.tools = tools
        self.max_iterations = 8
    
    async def execute_step(self, step: ExecutionStep, context: dict, goal: str) -> str:
        context_str = ""
        if context:
            parts = [f"Step {sid} result: {result[:400]}" for sid, result in context.items()]
            context_str = "\n\nPrevious step results:\n" + "\n".join(parts)
        
        user_msg = f"""Overall goal: {goal}

Current step to complete:
Title: {step.title}
Description: {step.description}
Suggested tools: {', '.join(step.tool_hints) or 'choose as needed'}
{context_str}

Please complete this step and return the result."""
        
        messages = [
            {"role": "system", "content": self.SYSTEM},
            {"role": "user", "content": user_msg}
        ]
        
        print(f"\n[Executor] Starting step {step.step_id}: {step.title}")
        
        for iteration in range(self.max_iterations):
            response = await self.client.chat.completions.create(
                model=self.model,
                messages=messages,
                tools=self.tools,
                tool_choice="auto",
                temperature=0.1,
                max_tokens=1024
            )
            
            msg = response.choices[0].message
            
            if msg.tool_calls:
                tool_results = []
                for tc in msg.tool_calls:
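                    # Placeholder: a real executor would call the tool named
                    # tc.function.name with the JSON string tc.function.arguments.
                    result = f"[{tc.function.name} result placeholder]"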
                    tool_results.append({
                        "role": "tool",
                        "tool_call_id": tc.id,
                        "content": result
                    })
                
                messages.append({
                    "role": "assistant",
                    "content": msg.content,
                    "tool_calls": [tc.model_dump() for tc in msg.tool_calls]
                })
                messages.extend(tool_results)
            else:
                result = msg.content or "Step completed."
                print(f"[Executor] Step {step.step_id} done: {result[:100]}...")
                return result
        
        raise RuntimeError(f"Step {step.step_id} did not complete within {self.max_iterations} iterations")


class PlanAndExecuteAgent:
    """Plan-and-Execute Agent with parallel execution and replanning."""
    
    def __init__(
        self,
        model: str = "NousResearch/Hermes-3-Llama-3.1-8B",
        base_url: str = "http://localhost:8000/v1",
        api_key: str = "not-needed",
        max_replan_attempts: int = 2
    ):
        self.client = AsyncOpenAI(base_url=base_url, api_key=api_key)
        self.planner = Planner(self.client, model)
        tools = [{"type": "function", "function": {
            "name": "web_search",
            "description": "Search the internet",
            "parameters": {"type": "object", "properties": {"query": {"type": "string"}}, "required": ["query"]}
        }}]
        self.executor = Executor(self.client, model, tools)
        self.max_replan_attempts = max_replan_attempts
    
    async def run(self, task: str) -> dict:
        print(f"\n{'='*60}\nPlan-and-Execute Agent\nTask: {task}\n{'='*60}")
        
        plan = await self.planner.create_plan(task)
        completed_results = {}
        replan_count = 0
        
        while not plan.is_complete():
            ready_steps = plan.get_ready_steps()
            if not ready_steps:
                break
            
            print(f"\n[Controller] Executing {len(ready_steps)} steps in parallel")
            
            tasks = [
                self._execute_with_error_handling(step, completed_results, plan.goal)
                for step in ready_steps
            ]
            results = await asyncio.gather(*tasks, return_exceptions=True)
            
            for step, result in zip(ready_steps, results):
                if isinstance(result, Exception):
                    step.status = "failed"
                    step.error = str(result)
                    print(f"[Controller] Step {step.step_id} failed: {step.error}")
                else:
                    step.status = "success"
                    step.result = result
                    completed_results[step.step_id] = result
            
            if plan.has_failed() and replan_count < self.max_replan_attempts:
                failed_step = next(s for s in plan.steps if s.status == "failed")
                print(f"\n[Controller] Triggering replan (attempt {replan_count + 1})")
                plan = await self.planner.replan(plan, failed_step, completed_results)
                replan_count += 1
        
        final = await self._synthesize(task, completed_results)
        total = len(plan.steps)
        success = sum(1 for s in plan.steps if s.status == "success")
        
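        return {
            # Steps left pending (blocked by a failed or missing dependency)
            # also count as an unsuccessful run.
            "success": plan.is_complete() and not plan.has_failed(),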
            "answer": final,
            "steps_total": total,
            "steps_completed": success,
            "replan_count": replan_count
        }
    
    async def _execute_with_error_handling(self, step, context, goal) -> str:
        step.status = "running"
        try:
            return await self.executor.execute_step(step, context, goal)
        except Exception as e:
            # Chain the original exception so its traceback is preserved.
            raise RuntimeError(str(e)) from e
    
    async def _synthesize(self, task: str, results: dict) -> str:
        results_str = "\n\n".join([f"Step {sid}:\n{r}" for sid, r in results.items()])
        response = await self.client.chat.completions.create(
            model=self.planner.model,
            messages=[
                {"role": "system", "content": "Synthesize multiple step results into a complete, coherent final answer."},
                {"role": "user", "content": f"Task: {task}\n\nStep results:\n{results_str}\n\nProvide the complete final answer."}
            ],
            temperature=0.3,
            max_tokens=2048
        )
        return response.choices[0].message.content
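
The executor above returns a placeholder string for every tool call. One way to wire in real tools is a name-to-function registry, sketched below. `TOOL_REGISTRY`, `dispatch_tool_call`, and the stand-in `web_search` body are hypothetical names introduced here for illustration; they are not part of the original implementation.

```python
import json
from typing import Callable


def web_search(query: str) -> str:
    # Stand-in body (assumption): a real tool would call a search backend.
    return f"results for: {query}"


# Hypothetical registry mapping tool names to Python callables.
TOOL_REGISTRY: dict[str, Callable[..., str]] = {"web_search": web_search}


def dispatch_tool_call(name: str, arguments_json: str) -> str:
    """Run one tool call and always return a string the model can read."""
    tool = TOOL_REGISTRY.get(name)
    if tool is None:
        return f"Error: unknown tool '{name}'"
    try:
        kwargs = json.loads(arguments_json or "{}")
        return str(tool(**kwargs))
    except Exception as exc:
        # Report the failure to the model instead of crashing the loop.
        return f"Error: {name} failed: {exc}"


print(dispatch_tool_call("web_search", '{"query": "plan and execute"}'))
print(dispatch_tool_call("delete_file", "{}"))
```

Inside `execute_step`, the placeholder assignment could then become `result = dispatch_tool_call(tc.function.name, tc.function.arguments)`. Returning errors as strings lets the model follow rule 2 of the executor prompt and try an alternative approach.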

52.3 Failure Handling Strategies

Strategy 1: Skip Optional Steps

async def skip_and_continue(plan: ExecutionPlan, failed_step: ExecutionStep):
    failed_step.status = "skipped"
    for step in plan.steps:
        if failed_step.step_id in step.depends_on:
            step.depends_on.remove(failed_step.step_id)
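
To see why the dependency has to be removed, here is a minimal sketch using a trimmed stand-in for `ExecutionStep` (the `Step` class and the `skip_step` and `ready` helpers are illustrative names that mirror `skip_and_continue` and `get_ready_steps`). Before the skip, the dependent step is blocked; afterwards it becomes ready.

```python
from dataclasses import dataclass, field


@dataclass
class Step:  # trimmed stand-in for ExecutionStep
    step_id: int
    depends_on: list = field(default_factory=list)
    status: str = "pending"


def skip_step(steps: list, failed: Step) -> None:
    """Mark a step as skipped and unblock the steps that waited on it."""
    failed.status = "skipped"
    for s in steps:
        if failed.step_id in s.depends_on:
            s.depends_on.remove(failed.step_id)


def ready(steps: list) -> list:
    """Ids of pending steps whose dependencies have all succeeded."""
    done = {s.step_id for s in steps if s.status == "success"}
    return [
        s.step_id for s in steps
        if s.status == "pending" and all(d in done for d in s.depends_on)
    ]


steps = [Step(1, status="success"), Step(2, status="failed"), Step(3, depends_on=[1, 2])]
print(ready(steps))  # [] because step 3 still waits on the failed step 2
skip_step(steps, steps[1])
print(ready(steps))  # [3]
```

The dependent step now runs without the skipped step's result, so this strategy is only safe when that result is genuinely optional.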

Strategy 2: Retry with Backoff

async def retry_with_backoff(executor, step, context, goal, max_retries=3) -> str:
    for attempt in range(max_retries):
        try:
            return await executor.execute_step(step, context, goal)
        except Exception:
            if attempt == max_retries - 1:
                raise
            await asyncio.sleep(2 ** attempt)  # 1s, then 2s; no sleep after the final attempt
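
A slightly more general variant is sketched below, assuming a capped delay and optional random jitter are wanted (both are additions for illustration, as are the names `backoff_delays`, `retry`, and `flaky`). It also makes the schedule explicit: with three attempts there are only two sleeps.

```python
import asyncio
import random


def backoff_delays(max_retries: int, base: float = 1.0, cap: float = 30.0) -> list[float]:
    """Delays slept between attempts: one fewer than the number of attempts."""
    return [min(cap, base * 2 ** attempt) for attempt in range(max_retries - 1)]


async def retry(op, max_retries: int = 3, base: float = 1.0, jitter: float = 0.0):
    """Await op() up to max_retries times, sleeping between failed attempts."""
    delays = backoff_delays(max_retries, base)
    for attempt in range(max_retries):
        try:
            return await op()
        except Exception:
            if attempt == max_retries - 1:
                raise
            # Jitter spreads out retries when many steps fail at the same time.
            await asyncio.sleep(delays[attempt] + random.uniform(0, jitter))


async def demo():
    calls = {"n": 0}

    async def flaky():
        # Fails twice with a transient error, then succeeds.
        calls["n"] += 1
        if calls["n"] < 3:
            raise ConnectionError("transient")
        return "ok"

    result = await retry(flaky, max_retries=3, base=0.01)
    return result, calls["n"]


print(backoff_delays(3))    # [1.0, 2.0]
print(asyncio.run(demo()))  # ('ok', 3)
```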

Strategy 3: Full Replan (shown in main implementation)

Best when a critical step fails and the overall strategy needs adjustment based on failure information.

Failure Decision Flow

flowchart TD
    F[Step Failed] --> Q1{Critical step?}
    Q1 -->|No| SKIP[Skip, continue]
    Q1 -->|Yes| Q2{Retry count?}
    Q2 -->|< 3| RETRY[Retry with backoff]
    Q2 -->|>= 3| Q3{Replan count?}
    Q3 -->|< 2| REPLAN[Trigger replan]
    Q3 -->|>= 2| FAIL[Fail with report]
    RETRY --> F
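
The decision flow above can be written as a small pure function. This is a sketch: the `Action` enum and the `decide` signature are illustrative names, and the default limits of 3 retries and 2 replans are taken from the flowchart.

```python
from enum import Enum


class Action(Enum):
    SKIP = "skip"
    RETRY = "retry"
    REPLAN = "replan"
    FAIL = "fail"


def decide(
    critical: bool,
    retries: int,
    replans: int,
    max_retries: int = 3,
    max_replans: int = 2,
) -> Action:
    """Pick a failure-handling action, following the flowchart's branches."""
    if not critical:
        return Action.SKIP      # non-critical step: skip and continue
    if retries < max_retries:
        return Action.RETRY     # critical step with retry budget left
    if replans < max_replans:
        return Action.REPLAN    # retries exhausted: ask the planner again
    return Action.FAIL          # everything exhausted: fail with a report


print(decide(critical=False, retries=0, replans=0))  # Action.SKIP
print(decide(critical=True, retries=1, replans=0))   # Action.RETRY
print(decide(critical=True, retries=3, replans=1))   # Action.REPLAN
print(decide(critical=True, retries=3, replans=2))   # Action.FAIL
```

Keeping the policy in one function makes the thresholds easy to tune and to unit-test separately from the agent loop.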

Summary

This chapter systematically explored the design philosophy and implementation of Plan-and-Execute:

  1. Core difference from ReAct: Plan-and-Execute plans globally first and then executes methodically, which suits long-horizon tasks. ReAct decides reactively at each step, which suits dynamic tasks.
  2. Two-phase separation: The Planner LLM maintains the global perspective; the Executor LLM focuses on single-step execution. Each component can therefore be prompted, and if desired sized, for its own job.
  3. Parallel execution: Steps whose dependencies are satisfied run concurrently via asyncio.gather, which shortens wall-clock time when a plan contains independent steps.
  4. Replanning: A failure triggers a new plan that builds on the results of completed steps instead of starting from scratch.
  5. Three failure strategies: Skip (optional steps), retry with backoff (transient failures), and full replan (systemic issues), chosen based on step criticality.

Review Questions

  1. In what scenarios does Plan-and-Execute's "upfront planning" become a disadvantage? (Hint: consider information-incomplete scenarios.)
  2. How do you design the Planner to generate steps at appropriate granularity? What are the problems with too-coarse (few steps) vs. too-fine (many steps) plans?
  3. If different steps use different LLMs (Planner uses 70B, Executor uses 8B), how should step descriptions be crafted so the 8B model can execute accurately?
  4. Replanning consumes additional LLM calls. How do you estimate replanning cost and decide whether it's worth triggering?