Chapter 52: Plan-and-Execute: Plan First, Then Act
Introduction
ReAct is a "step-by-step, see as you go" strategy: the agent processes each observation immediately and only then decides the next step. This reactivity suits dynamic, uncertain tasks. On complex long-horizon tasks, however, ReAct can get lost in details and lose sight of the overall goal. Plan-and-Execute offers an alternative: a dedicated planner first generates a complete plan, and an executor then carries it out step by step. This chapter compares the two paradigms and shows how to implement Plan-and-Execute in Hermes Agent.
52.1 Core Differences from ReAct
flowchart TD
subgraph ReAct["ReAct (Reactive)"]
direction LR
R1[Task] --> R2[Thought] --> R3[Action] --> R4[Observe]
R4 --> R2
R2 --> R5[Answer]
end
subgraph PE["Plan-and-Execute (Proactive)"]
direction TB
P1[Task] --> P2[Planner LLM\nGenerates complete plan]
P2 --> P3[Step 1]
P2 --> P4[Step 2]
P2 --> P5[Step 3]
P3 --> E1[Executor\nStep 1]
P4 --> E2[Executor\nStep 2]
P5 --> E3[Executor\nStep 3]
E1 --> AGG[Synthesize]
E2 --> AGG
E3 --> AGG
AGG --> ANS[Final Answer]
end
Core Comparison
| Dimension | ReAct | Plan-and-Execute |
|---|---|---|
| Decision timing | Real-time per step | Upfront plan, then execution in dependency order |
| Best task type | Dynamic, exploratory, short tasks | Structured, long-horizon, multi-step |
| Flexibility | High (adjust anytime) | Lower (plan hard to change mid-flight) |
| Global coherence | Lower (can lose the thread) | High (always has full blueprint) |
| Parallelism potential | Limited (depends on prior observations) | High (independent steps run in parallel) |
| Debug visibility | Low (steps implicit in loop) | High (plan explicitly visible) |
| LLM calls | One per step | One for the plan, at least one per step, plus one to synthesize |
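The LLM-call row can be made concrete with a little arithmetic. The sketch below is an illustration only: the function name and its parameters are hypothetical, and it assumes an executor that may loop a few times per step and a final synthesis call, as in the implementation later in this chapter.

```python
def plan_and_execute_calls(num_steps: int,
                           executor_calls_per_step: int = 1,
                           replans: int = 0,
                           synthesis: bool = True) -> int:
    """Rough count of LLM calls for one Plan-and-Execute run (illustrative)."""
    planning = 1 + replans                       # initial plan + one call per replan
    execution = num_steps * executor_calls_per_step
    return planning + execution + (1 if synthesis else 0)


# Five steps, each needing one tool call plus one final message (2 calls per step)
print(plan_and_execute_calls(5, executor_calls_per_step=2))  # 12
```

Here `num_steps` counts every step actually executed, including steps introduced by a replan.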
When to Choose Plan-and-Execute
Good fit:
- Competitive analysis reports (query multiple sources, integrate data, write report)
- Code refactoring (analyze codebase → make change plan → modify file by file)
- Research surveys (define topics → assign subtopics → parallel search → synthesize)
- Batch data processing (design pipeline → execute at scale)
Poor fit:
- Exploratory tasks requiring continuous direction changes based on intermediate results
- Scenarios where tool results are highly unpredictable
- Simple single-step Q&A
52.2 Two-Phase Design: Planner LLM + Executor LLM
The core innovation of Plan-and-Execute is separating planning intelligence from execution intelligence:
- Planner LLM: Responsible for global planning. Accepts a task description, outputs a structured execution plan (list of steps). Needs strong reasoning, but doesn't need to know tool implementation details.
- Executor LLM: Responsible for executing individual steps. Receives a step description, uses tools to complete specific operations, returns results. It's essentially a mini-ReAct Agent focused on a single subtask.
# plan_and_execute.py
import json
import asyncio
from typing import Optional
from dataclasses import dataclass, field
from openai import AsyncOpenAI
@dataclass
class ExecutionStep:
    step_id: int
    title: str
    description: str
    depends_on: list = field(default_factory=list)
    tool_hints: list = field(default_factory=list)
    status: str = "pending"  # pending/running/success/failed/skipped
    result: Optional[str] = None
    error: Optional[str] = None


@dataclass
class ExecutionPlan:
    goal: str
    steps: list

    def get_ready_steps(self) -> list:
        completed_ids = {s.step_id for s in self.steps if s.status == "success"}
        return [
            s for s in self.steps
            if s.status == "pending" and all(dep in completed_ids for dep in s.depends_on)
        ]

    def is_complete(self) -> bool:
        return all(s.status in ("success", "failed", "skipped") for s in self.steps)

    def has_failed(self) -> bool:
        return any(s.status == "failed" for s in self.steps)
class Planner:
    SYSTEM = """You are a task planning expert. Given a complex task, decompose it
into clear, ordered execution steps.

Output JSON format:
{
  "steps": [
    {
      "step_id": 1,
      "title": "Step title",
      "description": "Detailed description: what to do and why",
      "depends_on": [],
      "tool_hints": ["web_search", "write_file"]
    }
  ]
}

Planning principles:
1. Each step should be an independently executable atomic operation
2. Clearly mark dependencies between steps
3. Steps that can run in parallel should have no dependency between them
4. Descriptions must be specific enough for an executor to act without clarification
5. Keep steps between 3-10; avoid over-decomposition"""

    def __init__(self, client: AsyncOpenAI, model: str):
        self.client = client
        self.model = model

    async def create_plan(self, task: str) -> ExecutionPlan:
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": self.SYSTEM},
                {"role": "user", "content": f"Task: {task}"}
            ],
            response_format={"type": "json_object"},
            temperature=0.2,
            max_tokens=2048
        )
        # Note: ExecutionStep(**s) raises TypeError if the model emits unexpected keys.
        data = json.loads(response.choices[0].message.content)
        steps = [ExecutionStep(**s) for s in data["steps"]]

        print(f"\n[Planner] Created {len(steps)}-step plan:")
        for step in steps:
            deps = f" (depends on: {step.depends_on})" if step.depends_on else ""
            print(f"  Step {step.step_id}: {step.title}{deps}")
        return ExecutionPlan(goal=task, steps=steps)

    async def replan(
        self,
        original_plan: ExecutionPlan,
        failed_step: ExecutionStep,
        completed_results: dict
    ) -> ExecutionPlan:
        completed_summary = "\n".join([
            f"Step {sid}: {result[:200]}" for sid, result in completed_results.items()
        ])
        user_msg = f"""Original task: {original_plan.goal}

Completed steps:
{completed_summary}

Failed step:
- Title: {failed_step.title}
- Description: {failed_step.description}
- Error: {failed_step.error}

Based on completed work, create a new plan to finish the remaining goal."""
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": self.SYSTEM},
                {"role": "user", "content": user_msg}
            ],
            response_format={"type": "json_object"},
            temperature=0.3,
            max_tokens=2048
        )
        data = json.loads(response.choices[0].message.content)
        steps = [ExecutionStep(**s) for s in data["steps"]]

        # Offset the new step ids so they cannot collide with ids from the old
        # plan: earlier results are still keyed by step_id in completed_results.
        # This assumes depends_on in the new plan refers only to new steps.
        offset = max((s.step_id for s in original_plan.steps), default=0)
        for s in steps:
            s.step_id += offset
            s.depends_on = [dep + offset for dep in s.depends_on]

        print(f"\n[Planner] Replanned: {len(steps)}-step new plan")
        return ExecutionPlan(goal=original_plan.goal, steps=steps)
class Executor:
    SYSTEM = """You are a precise task executor. Given a specific step, use available
tools to complete it and return a clear result.

Rules:
1. Stay focused on the current step
2. If a tool fails, try an alternative approach
3. Return concise, structured results usable by subsequent steps
4. If the step cannot be completed, explain why clearly"""

    def __init__(self, client: AsyncOpenAI, model: str, tools: list):
        self.client = client
        self.model = model
        self.tools = tools
        self.max_iterations = 8

    async def execute_step(self, step: ExecutionStep, context: dict, goal: str) -> str:
        context_str = ""
        if context:
            parts = [f"Step {sid} result: {result[:400]}" for sid, result in context.items()]
            context_str = "\n\nPrevious step results:\n" + "\n".join(parts)

        user_msg = f"""Overall goal: {goal}

Current step to complete:
Title: {step.title}
Description: {step.description}
Suggested tools: {', '.join(step.tool_hints) or 'choose as needed'}
{context_str}

Please complete this step and return the result."""
        messages = [
            {"role": "system", "content": self.SYSTEM},
            {"role": "user", "content": user_msg}
        ]
        print(f"\n[Executor] Starting step {step.step_id}: {step.title}")

        # Mini-ReAct loop: call the model until it answers without tool calls.
        for iteration in range(self.max_iterations):
            response = await self.client.chat.completions.create(
                model=self.model,
                messages=messages,
                tools=self.tools,
                tool_choice="auto",
                temperature=0.1,
                max_tokens=1024
            )
            msg = response.choices[0].message
            if msg.tool_calls:
                tool_results = []
                for tc in msg.tool_calls:
                    # Placeholder: no tool is actually run here. A real executor
                    # would dispatch on tc.function.name and tc.function.arguments.
                    result = f"[{tc.function.name} result placeholder]"
                    tool_results.append({
                        "role": "tool",
                        "tool_call_id": tc.id,
                        "content": result
                    })
                # The assistant message must precede its tool results.
                messages.append({
                    "role": "assistant",
                    "content": msg.content,
                    "tool_calls": [tc.model_dump() for tc in msg.tool_calls]
                })
                messages.extend(tool_results)
            else:
                result = msg.content or "Step completed."
                print(f"[Executor] Step {step.step_id} done: {result[:100]}...")
                return result

        raise RuntimeError(f"Step {step.step_id} did not complete within {self.max_iterations} iterations")
class PlanAndExecuteAgent:
    """Plan-and-Execute Agent with parallel execution and replanning."""

    def __init__(
        self,
        model: str = "NousResearch/Hermes-3-Llama-3.1-8B",
        base_url: str = "http://localhost:8000/v1",
        api_key: str = "not-needed",
        max_replan_attempts: int = 2
    ):
        self.client = AsyncOpenAI(base_url=base_url, api_key=api_key)
        self.planner = Planner(self.client, model)
        tools = [{"type": "function", "function": {
            "name": "web_search",
            "description": "Search the internet",
            "parameters": {"type": "object", "properties": {"query": {"type": "string"}}, "required": ["query"]}
        }}]
        self.executor = Executor(self.client, model, tools)
        self.max_replan_attempts = max_replan_attempts

    async def run(self, task: str) -> dict:
        print(f"\n{'='*60}\nPlan-and-Execute Agent\nTask: {task}\n{'='*60}")
        plan = await self.planner.create_plan(task)
        completed_results = {}
        replan_count = 0

        while not plan.is_complete():
            ready_steps = plan.get_ready_steps()
            if not ready_steps:
                # Remaining steps are blocked (failed dependency or bad plan).
                break

            print(f"\n[Controller] Executing {len(ready_steps)} steps in parallel")
            tasks = [
                self._execute_with_error_handling(step, completed_results, plan.goal)
                for step in ready_steps
            ]
            # return_exceptions=True: one failing step does not cancel the others.
            results = await asyncio.gather(*tasks, return_exceptions=True)

            for step, result in zip(ready_steps, results):
                if isinstance(result, Exception):
                    step.status = "failed"
                    step.error = str(result)
                    print(f"[Controller] Step {step.step_id} failed: {step.error}")
                else:
                    step.status = "success"
                    step.result = result
                    completed_results[step.step_id] = result

            if plan.has_failed() and replan_count < self.max_replan_attempts:
                failed_step = next(s for s in plan.steps if s.status == "failed")
                print(f"\n[Controller] Triggering replan (attempt {replan_count + 1})")
                plan = await self.planner.replan(plan, failed_step, completed_results)
                replan_count += 1

        final = await self._synthesize(task, completed_results)
        total = len(plan.steps)
        success = sum(1 for s in plan.steps if s.status == "success")
        return {
            # Blocked (still pending) steps also count as an unsuccessful run.
            "success": plan.is_complete() and not plan.has_failed(),
            "answer": final,
            "steps_total": total,
            "steps_completed": success,
            "replan_count": replan_count
        }

    async def _execute_with_error_handling(self, step, context, goal) -> str:
        step.status = "running"
        try:
            return await self.executor.execute_step(step, context, goal)
        except Exception as e:
            # Chain the original exception so its traceback is preserved.
            raise RuntimeError(str(e)) from e

    async def _synthesize(self, task: str, results: dict) -> str:
        results_str = "\n\n".join([f"Step {sid}:\n{r}" for sid, r in results.items()])
        response = await self.client.chat.completions.create(
            model=self.planner.model,
            messages=[
                {"role": "system", "content": "Synthesize multiple step results into a complete, coherent final answer."},
                {"role": "user", "content": f"Task: {task}\n\nStep results:\n{results_str}\n\nProvide the complete final answer."}
            ],
            temperature=0.3,
            max_tokens=2048
        )
        return response.choices[0].message.content
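The controller loop above leaves quietly through `break` when no step is ready, so a plan whose dependencies are cyclic or point at non-existent steps would end without any error. One possible safeguard is to check the dependency graph before executing. The sketch below is not part of the chapter's implementation: `execution_waves` is a hypothetical helper that takes a plain mapping of step id to dependency ids and groups the steps into the batches the controller would run in parallel.

```python
from typing import Dict, List


def execution_waves(dependencies: Dict[int, List[int]]) -> List[List[int]]:
    """Group step ids into waves; each wave depends only on earlier waves.

    Raises ValueError for unknown dependency ids or dependency cycles.
    """
    known = set(dependencies)
    for step_id, deps in dependencies.items():
        unknown = set(deps) - known
        if unknown:
            raise ValueError(f"Step {step_id} depends on unknown steps: {sorted(unknown)}")

    done = set()
    waves = []
    while len(done) < len(dependencies):
        # A step is ready once all of its dependencies have been scheduled.
        ready = sorted(
            sid for sid, deps in dependencies.items()
            if sid not in done and all(d in done for d in deps)
        )
        if not ready:
            raise ValueError(f"Dependency cycle among steps: {sorted(known - done)}")
        waves.append(ready)
        done.update(ready)
    return waves


plan = {1: [], 2: [], 3: [1, 2], 4: [3]}
print(execution_waves(plan))  # [[1, 2], [3], [4]]
```

With the chapter's classes, the mapping could be built as `{s.step_id: s.depends_on for s in plan.steps}` right after planning, and a `ValueError` could be treated as a reason to ask the planner again.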
52.3 Failure Handling Strategies
Strategy 1: Skip Optional Steps
async def skip_and_continue(plan: ExecutionPlan, failed_step: ExecutionStep):
    failed_step.status = "skipped"
    for step in plan.steps:
        if failed_step.step_id in step.depends_on:
            step.depends_on.remove(failed_step.step_id)
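The strategy is meant for optional steps, but `ExecutionStep` as defined earlier has no field that marks a step as optional. As one possible sketch, assuming a hypothetical `optional` flag and a trimmed stand-in `Step` class (neither is part of the chapter's code), skipping could be gated like this:

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Step:  # trimmed stand-in for ExecutionStep
    step_id: int
    depends_on: List[int] = field(default_factory=list)
    optional: bool = False  # hypothetical flag, not in the chapter's ExecutionStep
    status: str = "pending"


def skip_if_optional(steps: List[Step], failed: Step) -> bool:
    """Skip `failed` and unblock its dependents, but only if it is optional."""
    if not failed.optional:
        return False  # caller should retry or replan instead
    failed.status = "skipped"
    for step in steps:
        if failed.step_id in step.depends_on:
            step.depends_on.remove(failed.step_id)
    return True


steps = [Step(1), Step(2, optional=True, status="failed"), Step(3, depends_on=[1, 2])]
print(skip_if_optional(steps, steps[1]), steps[1].status, steps[2].depends_on)
# True skipped [1]
```

Dependents of a skipped step run without its output, so their descriptions need to tolerate missing context.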
Strategy 2: Retry with Backoff
async def retry_with_backoff(executor, step, context, goal, max_retries=3) -> str:
    for attempt in range(max_retries):
        try:
            return await executor.execute_step(step, context, goal)
        except Exception:
            if attempt == max_retries - 1:
                raise  # out of attempts: propagate the last error
            # Waits 1s, then 2s; the final attempt re-raises instead of sleeping.
            await asyncio.sleep(2 ** attempt)
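To see the delay schedule without waiting, the same pattern can be exercised against a deliberately flaky coroutine. This is a self-contained sketch: `retry_async`, `flaky`, and the injectable `sleep` parameter are hypothetical names introduced for the demonstration, not part of the chapter's code.

```python
import asyncio


async def retry_async(fn, max_retries=3, base_delay=1.0, sleep=asyncio.sleep):
    """Await fn() until it succeeds or the attempts run out."""
    for attempt in range(max_retries):
        try:
            return await fn()
        except Exception:
            if attempt == max_retries - 1:
                raise
            await sleep(base_delay * 2 ** attempt)  # exponential backoff


async def demo():
    calls = {"n": 0}
    delays = []

    async def flaky():
        # Fails on the first two calls, succeeds on the third.
        calls["n"] += 1
        if calls["n"] < 3:
            raise ConnectionError("transient failure")
        return "ok"

    async def fake_sleep(seconds):
        delays.append(seconds)  # record the delay instead of waiting

    result = await retry_async(flaky, sleep=fake_sleep)
    return result, calls["n"], delays


print(asyncio.run(demo()))  # ('ok', 3, [1.0, 2.0])
```

With three attempts there are only two waits, because the last failure is re-raised immediately.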
Strategy 3: Full Replan (shown in main implementation)
Best when a critical step fails and the overall strategy needs adjustment based on failure information.
Failure Decision Flow
flowchart TD
F[Step Failed] --> Q1{Critical step?}
Q1 -->|No| SKIP[Skip, continue]
Q1 -->|Yes| Q2{Retry count?}
Q2 -->|< 3| RETRY[Retry with backoff]
Q2 -->|>= 3| Q3{Replan count?}
Q3 -->|< 2| REPLAN[Trigger replan]
Q3 -->|>= 2| FAIL[Fail with report]
RETRY --> F
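The decision flow above can also be written as a small pure function, which makes the thresholds easy to test. This is a sketch: the function name, the `critical` argument, and the string labels are assumptions made for illustration, and the limits of 3 retries and 2 replans are taken from the diagram.

```python
def next_failure_action(critical: bool, retries: int, replans: int,
                        max_retries: int = 3, max_replans: int = 2) -> str:
    """Return 'skip', 'retry', 'replan', or 'fail' for a failed step."""
    if not critical:
        return "skip"      # non-critical step: skip and continue
    if retries < max_retries:
        return "retry"     # retry with backoff
    if replans < max_replans:
        return "replan"    # retries exhausted: ask the planner for a new plan
    return "fail"          # retries and replans exhausted: fail with report


print(next_failure_action(critical=True, retries=3, replans=0))  # replan
```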
Summary
This chapter systematically explored the design philosophy and implementation of Plan-and-Execute:
- Core difference from ReAct: Plan-and-Execute plans globally first then executes methodically—ideal for long-horizon tasks. ReAct decides reactively per step—ideal for dynamic tasks.
- Two-phase separation: The Planner LLM maintains the global perspective; the Executor LLM focuses on single-step execution. This separation lets each component be prompted, and if desired sized, for its own job.
- Parallel execution: Independent steps run concurrently via asyncio.gather, which shortens wall-clock time when the plan contains steps with no mutual dependencies.
- Replanning: A failure triggers replanning informed by the completed work, so finished steps do not have to be redone from scratch.
- Three failure strategies: Skip (optional steps), retry with backoff (transient failures), full replan (systemic issues)—chosen based on step criticality.
Review Questions
- In what scenarios does Plan-and-Execute's "upfront planning" become a disadvantage? (Hint: consider information-incomplete scenarios.)
- How do you design the Planner to generate steps at appropriate granularity? What are the problems with too-coarse (few steps) vs. too-fine (many steps) plans?
- If different steps use different LLMs (Planner uses 70B, Executor uses 8B), how should step descriptions be crafted so the 8B model can execute accurately?
- Replanning consumes additional LLM calls. How do you estimate replanning cost and decide whether it's worth triggering?