Chapter 54: Multi-Agent Collaboration: Orchestrator Pattern
Introduction
Even a highly capable single Agent struggles with tasks that require expertise across multiple domains. Just as a company has a CEO who makes strategic decisions while department experts handle execution, the Orchestrator pattern brings this management philosophy to Agent systems: one master Agent (the Orchestrator) understands the overall task and crafts a dispatch strategy, while multiple specialized Worker Agents each focus on their own subtasks. This chapter explores the Orchestrator pattern's design principles, its Hermes implementation, and a complete three-Agent collaboration case study.
54.1 Orchestrator Design Philosophy
Core Idea: Separation of Responsibilities
The Orchestrator pattern fundamentally separates concerns:
- Orchestrator: Knows "what to do" (What) and "who should do it" (Who), but not "how to do it" (How)
- Worker Agents: Know "how to do it" (How), focus on specific domains, don't worry about the overall task
This separation brings key advantages:
- Specialization: Each Worker can be optimized with tailored system prompts and tool sets
- Parallelism: Independent subtasks run concurrently
- Replaceability: Workers can be upgraded without changing Orchestrator logic
- Observability: The Orchestrator's plan makes the task decomposition explicit and easy to inspect
```mermaid
flowchart TD
    User([User]) --> Orch[Orchestrator<br/>Master Agent]
    Orch --> |"Task A: Research"| W1[Research Worker<br/>Search Expert]
    Orch --> |"Task B: Analysis"| W2[Analysis Worker<br/>Data Expert]
    Orch --> |"Task C: Writing"| W3[Writing Worker<br/>Content Expert]
    W1 --> |Result A| Orch
    W2 --> |Result B| Orch
    W3 --> |Result C| Orch
    Orch --> |"Final Report"| User
    W1 --> WebSearch[(Web Search)]
    W2 --> CodeExec[(Code Execution)]
    W3 --> DocTools[(Document Tools)]
```
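The replaceability property can be sketched with a structural interface: as long as every Worker exposes the same async execute method, the Orchestrator can swap one implementation for another without touching its dispatch logic. This is a minimal sketch, not Hermes code; Worker, StubResearchWorker, StubResearchWorkerV2, and Orchestrator are hypothetical names, and the stubs return canned strings instead of calling a model.

```python
import asyncio
from typing import Protocol


class Worker(Protocol):
    """Any object with an async execute(description) -> str method can act as a Worker."""

    async def execute(self, description: str) -> str: ...


class StubResearchWorker:
    # Hypothetical stand-in for an LLM-backed research Worker
    async def execute(self, description: str) -> str:
        return f"research notes on: {description}"


class StubResearchWorkerV2:
    # Hypothetical upgraded Worker: same interface, different internals
    async def execute(self, description: str) -> str:
        return f"[v2] cited research on: {description}"


class Orchestrator:
    def __init__(self, workers: dict[str, Worker]):
        # The Orchestrator only knows worker *types*, not how each Worker does its job
        self.workers = workers

    async def dispatch(self, worker_type: str, description: str) -> str:
        worker = self.workers.get(worker_type)
        if worker is None:
            raise ValueError(f"Unknown worker type: {worker_type}")
        return await worker.execute(description)


async def demo() -> tuple[str, str]:
    orch = Orchestrator({"research": StubResearchWorker()})
    before = await orch.dispatch("research", "AI coding assistants")
    # Upgrade the research Worker; the Orchestrator's dispatch code is unchanged
    orch.workers["research"] = StubResearchWorkerV2()
    after = await orch.dispatch("research", "AI coding assistants")
    return before, after


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because the interface is structural (a Protocol), an upgraded Worker does not need to inherit from anything; it only needs a matching execute method.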
Isolation and Communication Constraints in Hermes
In the current Hermes Agent architecture, sub-Agents are mutually isolated:
- Each sub-Agent has an independent context window
- No direct communication channel between sub-Agents
- All coordination happens via the Orchestrator through message passing
This is both a limitation and an advantage: isolation ensures sub-Agents don't interfere with each other, and each can be independently scaled and tested.
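The hub-and-spoke constraint can be sketched as follows, assuming a simple in-process design: each Worker owns a private context list, and the only way a message reaches another Worker is through the Orchestrator's route method, which also logs every message. Message, IsolatedWorker, and Hub are hypothetical names for illustration; they are not part of Hermes.

```python
from dataclasses import dataclass, field


@dataclass
class Message:
    sender: str
    recipient: str
    content: str


@dataclass
class IsolatedWorker:
    name: str
    # Private context: no other Worker holds a reference to this list
    context: list = field(default_factory=list)

    def receive(self, msg: Message) -> None:
        self.context.append(f"{msg.sender}: {msg.content}")


class Hub:
    """The Orchestrator as sole router: Workers never reference each other directly."""

    def __init__(self, workers: list):
        self.workers = {w.name: w for w in workers}
        self.log: list = []  # every message passes through here, which aids observability

    def route(self, msg: Message) -> None:
        if msg.recipient not in self.workers:
            raise KeyError(f"No such worker: {msg.recipient}")
        self.log.append(msg)
        self.workers[msg.recipient].receive(msg)


if __name__ == "__main__":
    hub = Hub([IsolatedWorker("research"), IsolatedWorker("analysis")])
    # Research needs input from Analysis, but can only ask through the hub
    hub.route(Message("research", "analysis", "Which metric should the comparison use?"))
    hub.route(Message("analysis", "research", "(answer produced by the Analysis Worker)"))
    for name, worker in hub.workers.items():
        print(name, worker.context)
```

Since every exchange is appended to hub.log, the Orchestrator keeps a complete record of inter-Worker traffic, which is the observability side of the isolation trade-off.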
54.2 Task Decomposition Strategies
| Decomposition Dimension | Description | Example |
|---|---|---|
| Domain expertise | Split by required knowledge | Legal question → Legal Agent, Tech question → Code Agent |
| Tool dependency | Split by required toolset | Search tasks → Web Agent, Compute tasks → Data Agent |
| Parallelism | Independent subtasks run concurrently | Simultaneously research multiple topics |
| Sequential dependency | Tasks requiring prior results run serially | Search facts first, then write based on facts |
| Data volume | Partition by data size | 100 URLs split into 10 groups of 10 |
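The parallelism and sequential-dependency rows can be combined into a schedule of execution waves: each wave holds the tasks whose prerequisites all finished in earlier waves, so tasks within one wave can run concurrently. The sketch below assumes dependencies are given as a mapping from task_id to its depends_on list (the same shape the Orchestrator's plan uses) and adds a small helper for the data-volume row. execution_waves and partition are hypothetical helper names.

```python
def execution_waves(deps: dict[str, list[str]]) -> list[list[str]]:
    """Group task ids into waves; each task depends only on tasks in earlier waves.

    deps maps task_id -> list of prerequisite task_ids (the depends_on field).
    Raises ValueError on unknown dependencies or dependency cycles.
    """
    for tid, reqs in deps.items():
        for req in reqs:
            if req not in deps:
                raise ValueError(f"{tid} depends on unknown task {req}")
    # Track the prerequisites each task is still waiting on
    remaining = {tid: set(reqs) for tid, reqs in deps.items()}
    waves = []
    while remaining:
        ready = sorted(tid for tid, reqs in remaining.items() if not reqs)
        if not ready:
            raise ValueError(f"Dependency cycle among: {sorted(remaining)}")
        waves.append(ready)
        for tid in ready:
            del remaining[tid]
        for reqs in remaining.values():
            reqs.difference_update(ready)
    return waves


def partition(items: list, size: int) -> list[list]:
    """Split items into consecutive groups of at most `size` (data-volume decomposition)."""
    return [items[i:i + size] for i in range(0, len(items), size)]


if __name__ == "__main__":
    plan = {
        "research_copilot": [],
        "research_cursor": [],
        "compare": ["research_copilot", "research_cursor"],
        "report": ["compare"],
    }
    print(execution_waves(plan))
    # -> [['research_copilot', 'research_cursor'], ['compare'], ['report']]
    urls = [f"https://example.com/page{i}" for i in range(100)]
    print(len(partition(urls, 10)))  # -> 10 groups of 10
```

Computing waves up front has one practical benefit: an unknown dependency or a cycle is rejected before any Worker is called, instead of surfacing later as tasks that never become ready.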
54.3 Complete Implementation
```python
# orchestrator_agent.py
import asyncio
import json
from dataclasses import dataclass, field
from typing import Optional

from openai import AsyncOpenAI


@dataclass
class SubTask:
    task_id: str
    worker_type: str
    title: str
    description: str
    # Keys are prerequisite task_ids; values are filled with their results at dispatch time
    inputs: dict = field(default_factory=dict)
    priority: int = 0  # parsed from the plan but not used for scheduling in this version
    status: str = "pending"  # "pending" | "success" | "failed"
    result: Optional[str] = None
    error: Optional[str] = None


@dataclass
class OrchestrationPlan:
    overall_goal: str
    subtasks: list

    def get_ready_tasks(self, completed: dict) -> list:
        # Ready = still pending and every prerequisite has a successful result
        return [
            t for t in self.subtasks
            if t.status == "pending" and all(dep in completed for dep in t.inputs)
        ]

    def is_complete(self) -> bool:
        return all(t.status in ("success", "failed") for t in self.subtasks)


class ResearchWorker:
    SYSTEM = """You are a professional information researcher.
Your responsibilities:
- Systematically search and collect relevant information
- Verify accuracy and timeliness of information
- Summarize key findings from multiple sources
- Provide concise, well-sourced research summaries
Output format:
1. Key findings (3-5 points)
2. Data support (specific numbers, dates, sources)
3. Information gaps (what couldn't be verified)"""

    def __init__(self, client: AsyncOpenAI, model: str):
        self.client = client
        self.model = model

    async def execute(self, task: SubTask) -> str:
        # Note: no search tool is wired in here; the model answers from its own knowledge
        context = ""
        if task.inputs:
            context = "\n\nAvailable prior results:\n" + json.dumps(task.inputs)
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": self.SYSTEM},
                {"role": "user", "content": f"Research task: {task.title}\n\nDetails: {task.description}{context}"},
            ],
            temperature=0.2,
            max_tokens=2048,
        )
        return response.choices[0].message.content or "Research complete"


class AnalysisWorker:
    SYSTEM = """You are a professional data analyst.
Your responsibilities:
- Process and analyze structured/unstructured data
- Identify trends, patterns, and anomalies
- Generate executable Python analysis code when needed
- Provide data-driven insights and conclusions
Output format:
1. Main findings (data-supported conclusions)
2. Analysis method used
3. Key metrics (specific numbers)
4. Limitations (data quality or sample issues)"""

    def __init__(self, client: AsyncOpenAI, model: str):
        self.client = client
        self.model = model

    async def execute(self, task: SubTask) -> str:
        # Note: no code-execution tool is attached in this version; code is only generated
        input_data = ""
        if task.inputs:
            input_data = "\n\nAvailable input data:\n" + json.dumps(task.inputs, indent=2)
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": self.SYSTEM},
                {"role": "user", "content": f"Analysis task: {task.title}\n\nRequirements: {task.description}{input_data}"},
            ],
            temperature=0.1,
            max_tokens=2048,
        )
        return response.choices[0].message.content or "Analysis complete"


class WritingWorker:
    SYSTEM = """You are a professional technical writer.
Your responsibilities:
- Transform research findings and data analysis into clear, readable content
- Maintain logical structure and smooth prose
- Ensure content accuracy and professionalism
- Adapt language style to target audience
Writing principles:
- Every argument must be supported by data or facts
- Use clear heading hierarchy to organize content
- Avoid empty filler phrases; deliver direct value"""

    def __init__(self, client: AsyncOpenAI, model: str):
        self.client = client
        self.model = model

    async def execute(self, task: SubTask) -> str:
        # Concatenate every non-empty upstream result as labeled source material
        input_content = "\n\n".join([
            f"=== Source {tid} ===\n{content}"
            for tid, content in task.inputs.items()
            if content
        ])
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": self.SYSTEM},
                {"role": "user", "content": f"Writing task: {task.title}\n\nRequirements: {task.description}\n\nSource material:\n{input_content}"},
            ],
            temperature=0.4,
            max_tokens=4096,
        )
        return response.choices[0].message.content or "Writing complete"


class OrchestratorAgent:
    ORCHESTRATOR_SYSTEM = """You are a task coordination expert (Orchestrator).
Analyze complex tasks and decompose them into subtasks for specialized Workers.
Available Worker types:
- research: Information search, fact-checking, data collection
- analysis: Data processing, statistical analysis, trend identification
- writing: Content creation, report writing, language optimization
Output JSON format:
{
  "subtasks": [
    {
      "task_id": "task_001",
      "worker_type": "research",
      "title": "Subtask title",
      "description": "Detailed task description",
      "depends_on": [],
      "priority": 1
    }
  ]
}
Principles:
1. Assign tasks to the most appropriate Worker type
2. Mark dependencies via depends_on (list task_ids of prerequisite tasks)
3. Tasks without dependencies can run in parallel
4. Descriptions must be specific enough for Workers to act without clarification"""

    def __init__(
        self,
        model: str = "NousResearch/Hermes-3-Llama-3.1-8B",
        base_url: str = "http://localhost:8000/v1",
        api_key: str = "not-needed",
    ):
        # One OpenAI-compatible client (e.g. a locally hosted model server) shared by all Workers
        self.client = AsyncOpenAI(base_url=base_url, api_key=api_key)
        self.model = model
        self.workers = {
            "research": ResearchWorker(self.client, model),
            "analysis": AnalysisWorker(self.client, model),
            "writing": WritingWorker(self.client, model),
        }

    async def _create_plan(self, task: str) -> OrchestrationPlan:
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": self.ORCHESTRATOR_SYSTEM},
                {"role": "user", "content": f"Create a Worker assignment plan for:\n\n{task}"},
            ],
            response_format={"type": "json_object"},
            temperature=0.2,
            max_tokens=2048,
        )
        # content can be None; fall back to an empty plan instead of crashing json.loads
        data = json.loads(response.choices[0].message.content or "{}")
        subtasks = [
            SubTask(
                task_id=s["task_id"],
                worker_type=s["worker_type"],
                title=s["title"],
                description=s["description"],
                # Pre-register dependencies as input slots; results are filled in at dispatch
                inputs={dep: None for dep in s.get("depends_on", [])},
                priority=s.get("priority", 0),
            )
            for s in data.get("subtasks", [])
        ]
        print(f"\n[Orchestrator] Created {len(subtasks)}-task plan:")
        for st in subtasks:
            deps = list(st.inputs.keys())
            parallel = "" if deps else " (parallel)"
            print(f"  [{st.task_id}] {st.worker_type}: {st.title}{parallel}")
        return OrchestrationPlan(overall_goal=task, subtasks=subtasks)

    async def _dispatch(self, subtask: SubTask, completed: dict) -> tuple:
        # Copy prerequisite results into the subtask: the only channel between Workers
        for dep_id in list(subtask.inputs.keys()):
            if dep_id in completed:
                subtask.inputs[dep_id] = completed[dep_id]
        worker = self.workers.get(subtask.worker_type)
        if not worker:
            raise ValueError(f"Unknown worker type: {subtask.worker_type}")
        print(f"  -> Dispatching to {subtask.worker_type}: {subtask.title}")
        result = await worker.execute(subtask)
        return subtask.task_id, result

    async def _synthesize(self, task: str, plan: OrchestrationPlan, completed: dict) -> str:
        results_str = "\n\n".join([
            f"### {st.title} [{st.worker_type}]\n{st.result}"
            for st in plan.subtasks if st.status == "success"
        ])
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": "Synthesize multiple Worker outputs into one complete, coherent final answer. Remove duplicates, resolve conflicts."},
                {"role": "user", "content": f"Original task: {task}\n\nWorker outputs:\n{results_str}"},
            ],
            temperature=0.3,
            max_tokens=4096,
        )
        return response.choices[0].message.content or ""

    async def run(self, task: str) -> dict:
        print(f"\n{'='*60}\nOrchestrator Agent\nTask: {task[:80]}\n{'='*60}")
        plan = await self._create_plan(task)
        completed = {}  # task_id -> result of every successful subtask
        while not plan.is_complete():
            ready = plan.get_ready_tasks(completed)
            if not ready:
                # Remaining tasks wait on a failed or unknown task and can never run
                for st in plan.subtasks:
                    if st.status == "pending":
                        st.status = "failed"
                        st.error = "Unsatisfied dependency"
                        print(f"  x Task {st.task_id} skipped: {st.error}")
                break
            print(f"\n[Orchestrator] Dispatching {len(ready)} tasks in parallel")
            # return_exceptions=True returns exceptions as results, so every outcome is recorded
            results = await asyncio.gather(
                *[self._dispatch(st, completed) for st in ready],
                return_exceptions=True,
            )
            for subtask, result in zip(ready, results):
                if isinstance(result, Exception):
                    subtask.status = "failed"
                    subtask.error = str(result)
                    print(f"  x Task {subtask.task_id} failed: {subtask.error}")
                else:
                    tid, tres = result
                    subtask.status = "success"
                    subtask.result = tres
                    completed[tid] = tres
                    print(f"  + Task {tid} done ({len(tres)} chars)")
        print(f"\n[Orchestrator] Synthesizing {len(completed)} results...")
        final = await self._synthesize(task, plan, completed)
        success = sum(1 for t in plan.subtasks if t.status == "success")
        failed = sum(1 for t in plan.subtasks if t.status == "failed")
        return {
            "success": failed == 0,
            "answer": final,
            "subtasks_total": len(plan.subtasks),
            "subtasks_succeeded": success,
            "subtasks_failed": failed,
        }


async def main():
    orchestrator = OrchestratorAgent()
    result = await orchestrator.run(
        "Generate a competitive analysis report for the 'AI coding assistant' market. "
        "Research GitHub Copilot, Cursor, Tabnine, and CodeWhisperer. "
        "Compare features, pricing, and market share. "
        "Produce a complete report with executive summary, competitive matrix, and strategic recommendations."
    )
    print(f"\nFinal Report (excerpt):\n{result['answer'][:1000]}...")
    print(f"\nCompleted {result['subtasks_succeeded']}/{result['subtasks_total']} subtasks")


if __name__ == "__main__":
    asyncio.run(main())
```
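_create_plan trusts whatever JSON the planner returns. A minimal validation pass, run before building SubTask objects, could check the fields that ORCHESTRATOR_SYSTEM asks for. validate_plan is a hypothetical helper, not part of the implementation above; it checks required fields, duplicate ids, worker types, and dependency references, but it does not detect multi-task cycles.

```python
import json

VALID_WORKERS = frozenset({"research", "analysis", "writing"})
REQUIRED_FIELDS = ("task_id", "worker_type", "title", "description")


def validate_plan(raw: str, valid_workers: frozenset = VALID_WORKERS) -> list[str]:
    """Return a list of problems in the planner's JSON output (an empty list means OK)."""
    try:
        data = json.loads(raw)
    except json.JSONDecodeError as exc:
        return [f"invalid JSON: {exc}"]
    subtasks = data.get("subtasks") if isinstance(data, dict) else None
    if not isinstance(subtasks, list) or not subtasks:
        return ["plan must contain a non-empty 'subtasks' list"]

    errors = []
    seen = set()
    # First pass: per-subtask fields, duplicate ids, known worker types
    for i, s in enumerate(subtasks):
        if not isinstance(s, dict):
            errors.append(f"subtask #{i} is not an object")
            continue
        missing = [f for f in REQUIRED_FIELDS if not s.get(f)]
        if missing:
            errors.append(f"subtask #{i} missing fields: {missing}")
        tid = s.get("task_id")
        if isinstance(tid, str):
            if tid in seen:
                errors.append(f"duplicate task_id: {tid}")
            seen.add(tid)
        if s.get("worker_type") not in valid_workers:
            errors.append(f"{tid}: unknown worker_type {s.get('worker_type')!r}")

    # Second pass: every dependency must name a task in this plan
    for s in subtasks:
        if not isinstance(s, dict):
            continue
        deps = s.get("depends_on", [])
        if not isinstance(deps, list):
            errors.append(f"{s.get('task_id')}: depends_on must be a list")
            continue
        for dep in deps:
            if not isinstance(dep, str) or dep not in seen:
                errors.append(f"{s.get('task_id')}: depends_on unknown task {dep!r}")
    return errors
```

If validate_plan returns errors, one option is to send them back to the planner and ask for a corrected plan; another is to fail fast before any Worker is called.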
54.4 Result Synthesis and Conflict Resolution
| Conflict Type | Example | Resolution Strategy |
|---|---|---|
| Factual conflict | Research says market size is $10B; Analysis says $8B | Prefer the source with citation |
| Conclusion conflict | Research says market saturated; Analysis says strong growth | Preserve both views, note sources |
| Format conflict | Different Workers use different units/formats | Normalize to a unified format |
| Content duplication | Multiple Workers described the same thing | Merge, keep the most detailed version |
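In the implementation above, conflict resolution is delegated to the LLM through the _synthesize prompt. When some Worker outputs are structured, a deterministic pre-pass could apply the table's rules before synthesis. The sketch assumes claims arrive as Claim records with a numeric value, a unit, and an optional citation; Claim, UNIT_FACTORS, resolve, and dedupe are hypothetical, and "longest text" is used as a rough proxy for "most detailed".

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Claim:
    worker: str                   # which Worker produced the claim
    key: str                      # what the claim is about, e.g. "market_size_usd"
    value: float
    unit: str                     # "M" (millions) or "B" (billions) in this sketch
    source: Optional[str] = None  # citation, if the Worker gave one


UNIT_FACTORS = {"M": 1e6, "B": 1e9}  # hypothetical unit table for this sketch


def normalize(claim: Claim) -> float:
    """Format conflict: convert every value to one base unit."""
    return claim.value * UNIT_FACTORS[claim.unit]


def resolve(claims: list[Claim]) -> dict:
    """Factual conflict: prefer cited claims; if disagreement remains, keep every view."""
    values = {normalize(c) for c in claims}
    if len(values) == 1:
        return {"status": "agreed", "value": values.pop(), "claims": claims}
    cited = [c for c in claims if c.source]
    cited_values = {normalize(c) for c in cited}
    if len(cited_values) == 1:
        return {"status": "resolved_by_citation", "value": cited_values.pop(), "claims": cited}
    # Conclusion-style conflict: no single cited answer, so preserve all views with origins
    return {"status": "unresolved", "value": None, "claims": claims}


def dedupe(sections: dict[str, list[str]]) -> dict[str, str]:
    """Content duplication: for each topic, keep the longest version."""
    return {topic: max(texts, key=len) for topic, texts in sections.items()}


if __name__ == "__main__":
    # Mirrors the table's factual-conflict row; "Report X" is a placeholder citation
    print(resolve([
        Claim("research", "market_size_usd", 10, "B", source="Report X"),
        Claim("analysis", "market_size_usd", 8, "B"),
    ]))
```

An "unresolved" result can be passed to the synthesis prompt with both claims and their Worker of origin, which matches the table's "preserve both views, note sources" rule.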
Summary
This chapter provided a thorough exploration of the Orchestrator pattern:
- Separation of responsibilities: The Orchestrator handles strategic dispatch while Workers handle specialized execution, so each focuses on what it does best.
- Task decomposition strategies: Split by domain expertise, tool dependency, parallelism, sequential dependency, and data volume.
- Worker specialization: The Research, Analysis, and Writing Workers each have a dedicated system prompt and sampling temperature; a production deployment would also give each its own tool set.
- Parallel execution: Independent subtasks are dispatched concurrently via asyncio.gather, which reduces wall-clock time when several subtasks do not depend on one another.
- Conflict resolution: During synthesis, the Orchestrator identifies and resolves factual conflicts and content duplication.
Review Questions
- How should the Orchestrator handle a Worker returning low-quality results? Retry directly, or ask another Worker to verify?
- If tasks require inter-Worker communication (e.g., Research Worker needs to ask Analysis Worker a specific question), how can this be achieved under the current isolated architecture?
- How do you validate the quality of the Orchestrator's dispatch plan? What types of plans can LLM planners get wrong?
- As the number of Workers grows (beyond 10), how do you keep the Orchestrator's system prompt concise without losing completeness?