Chapter 54

Multi-Agent Collaboration: Orchestrator Pattern

Introduction

Even a highly capable single Agent struggles with tasks that require expertise across multiple domains. A company has a CEO who makes strategic decisions while department experts handle execution. The Orchestrator pattern brings this management philosophy to Agent systems: one master Agent (the Orchestrator) understands the overall task and decides how to dispatch it, while multiple specialized Worker Agents each focus on a specific subtask. This chapter explores the Orchestrator pattern's design principles, a Hermes-based implementation, and a complete three-Agent collaboration case study.


54.1 Orchestrator Design Philosophy

Core Idea: Separation of Responsibilities

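The Orchestrator pattern fundamentally separates concerns. The Orchestrator understands the overall task, decomposes it into subtasks, dispatches them, and synthesizes the results. Each Worker executes only the specialized subtasks assigned to it.

This separation brings four key advantages: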

  1. Specialization: Each Worker can be optimized with tailored system prompts and tool sets
  2. Parallelism: Independent subtasks run concurrently
  3. Replaceability: Workers can be upgraded without changing Orchestrator logic
  4. Observability: The Orchestrator's plan gives a clear view of how the task was decomposed

flowchart TD
    User([User]) --> Orch[Orchestrator\nMaster Agent]
    
    Orch --> |"Task A: Research"| W1[Research Worker\nSearch Expert]
    Orch --> |"Task B: Analysis"| W2[Analysis Worker\nData Expert]
    Orch --> |"Task C: Writing"| W3[Writing Worker\nContent Expert]
    
    W1 --> |Result A| Orch
    W2 --> |Result B| Orch
    W3 --> |Result C| Orch
    
    Orch --> |"Final Report"| User
    
    W1 --> WebSearch[(Web Search)]
    W2 --> CodeExec[(Code Execution)]
    W3 --> DocTools[(Document Tools)]
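
The replaceability advantage depends on every Worker exposing the same call interface. With a shared interface, the Orchestrator can look Workers up by type name without knowing their concrete classes. The implementation in 54.3 follows the same idea, since every Worker class there has an `execute(task)` coroutine.

The sketch below expresses that contract with `typing.Protocol`. The `Worker` protocol, the two stub classes, and the `orchestrate` helper are hypothetical. They return canned strings instead of calling a model or a search tool.

```python
import asyncio
from typing import Dict, List, Protocol


class Worker(Protocol):
    """Hypothetical shared interface: anything with this coroutine can be a Worker."""

    async def execute(self, title: str, description: str) -> str: ...


class KeywordResearchWorker:
    """Stand-in for a search-backed Worker (no real search is performed)."""

    async def execute(self, title: str, description: str) -> str:
        return f"[research v1] notes on {title}"


class CachedResearchWorker:
    """An 'upgraded' replacement that keeps the same interface."""

    def __init__(self) -> None:
        self.cache: Dict[str, str] = {}

    async def execute(self, title: str, description: str) -> str:
        if title not in self.cache:
            self.cache[title] = f"[research v2] notes on {title}"
        return self.cache[title]


async def orchestrate(workers: Dict[str, Worker], title: str) -> str:
    # The Orchestrator only looks Workers up by type name; it never
    # refers to a concrete Worker class.
    return await workers["research"].execute(title, "collect facts")


async def main() -> List[str]:
    outputs = []
    for impl in (KeywordResearchWorker(), CachedResearchWorker()):
        registry: Dict[str, Worker] = {"research": impl}
        outputs.append(await orchestrate(registry, "AI coding assistants"))
    return outputs


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Swapping one research implementation for the other changes only the registry entry. The `orchestrate` function is untouched.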

Isolation and Communication Constraints in Hermes

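In the current Hermes Agent architecture, sub-Agents are mutually isolated: a Worker cannot call or message another Worker directly.

This is both a limitation and an advantage. Isolation prevents sub-Agents from interfering with each other and lets each be scaled and tested independently. The cost is that any collaboration between Workers must be mediated by the Orchestrator.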
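
In practice, isolation means a Worker sees only what the Orchestrator explicitly hands it. The chapter's `_dispatch` method does this by filling `subtask.inputs` from completed results.

The helper below is a hypothetical sketch of that routing step. It returns only the declared dependencies' outputs, or `None` if any of them has not finished yet. The task IDs and result strings are illustrative.

```python
from typing import Dict, List, Optional


def route_inputs(depends_on: List[str], completed: Dict[str, str]) -> Optional[Dict[str, str]]:
    """Return only the results a subtask explicitly depends on.

    Returns None if any dependency has not finished, signalling that the
    subtask is not ready to run yet.
    """
    if any(dep not in completed for dep in depends_on):
        return None
    # A new dict holding just this subtask's inputs: the Worker never gets
    # the Orchestrator's full result store, so it cannot read other Workers' output.
    return {dep: completed[dep] for dep in depends_on}


completed = {
    "task_001": "Research notes on Copilot pricing",
    "task_002": "Research notes on Cursor pricing",
}

print(route_inputs(["task_001"], completed))
print(route_inputs(["task_001", "task_003"], completed))
```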


54.2 Task Decomposition Strategies

| Decomposition dimension | Description | Example |
|---|---|---|
| Domain expertise | Split by required knowledge | Legal question → Legal Agent; tech question → Code Agent |
| Tool dependency | Split by required toolset | Search tasks → Web Agent; compute tasks → Data Agent |
| Parallelism | Run independent subtasks concurrently | Research multiple topics simultaneously |
| Sequential dependency | Run tasks that need prior results serially | Search for facts first, then write based on them |
| Data volume | Partition by data size | Split 100 URLs into 10 groups of 10 |
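
The parallelism and sequential-dependency dimensions together determine an execution order. Tasks whose dependencies are all satisfied form a "wave" that can run concurrently, and waves run one after another.

The sketch below computes those waves from a `depends_on` mapping using a simple level-by-level topological ordering. It also shows the data-volume split of 100 URLs into 10 chunks. The function names, task IDs, and URLs are illustrative assumptions.

```python
from typing import Dict, List, Set


def execution_waves(depends_on: Dict[str, List[str]]) -> List[List[str]]:
    """Group task IDs into waves; each task depends only on tasks in earlier waves.

    Tasks in the same wave are independent and can be dispatched in parallel;
    the waves themselves must run in order. Raises ValueError on a cycle or on
    a dependency that names an unknown task.
    """
    for tid, deps in depends_on.items():
        for dep in deps:
            if dep not in depends_on:
                raise ValueError(f"{tid} depends on unknown task {dep}")
    done: Set[str] = set()
    waves: List[List[str]] = []
    remaining = dict(depends_on)
    while remaining:
        # Every task whose dependencies have all finished joins this wave.
        wave = sorted(t for t, deps in remaining.items() if all(d in done for d in deps))
        if not wave:
            raise ValueError(f"Dependency cycle among: {sorted(remaining)}")
        waves.append(wave)
        done.update(wave)
        for t in wave:
            del remaining[t]
    return waves


def partition(items: List[str], size: int) -> List[List[str]]:
    """Split a large input into fixed-size chunks, one subtask per chunk."""
    return [items[i:i + size] for i in range(0, len(items), size)]


plan = {
    "research_copilot": [],
    "research_cursor": [],
    "analysis_pricing": ["research_copilot", "research_cursor"],
    "write_report": ["analysis_pricing"],
}
print(execution_waves(plan))
# [['research_copilot', 'research_cursor'], ['analysis_pricing'], ['write_report']]

urls = [f"https://example.com/page/{i}" for i in range(100)]
chunks = partition(urls, 10)
print(len(chunks), [len(c) for c in chunks][:3])
```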

54.3 Complete Implementation

# orchestrator_agent.py
import asyncio
import json
from typing import Optional
from dataclasses import dataclass, field
from openai import AsyncOpenAI


@dataclass
class SubTask:
    task_id: str
    worker_type: str
    title: str
    description: str
    inputs: dict = field(default_factory=dict)
    priority: int = 0
    status: str = "pending"
    result: Optional[str] = None
    error: Optional[str] = None


@dataclass
class OrchestrationPlan:
    overall_goal: str
    subtasks: list
    
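    def get_ready_tasks(self, completed: dict) -> list:
        # A task's `inputs` keys are the task_ids it depends on, so a pending
        # task is ready once every one of them appears in `completed`.
        return [
            t for t in self.subtasks
            if t.status == "pending" and all(dep in completed for dep in t.inputs.keys())
        ]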
    
    def is_complete(self) -> bool:
        return all(t.status in ("success", "failed") for t in self.subtasks)


class ResearchWorker:
    SYSTEM = """You are a professional information researcher.
Your responsibilities:
- Systematically search and collect relevant information
- Verify accuracy and timeliness of information
- Summarize key findings from multiple sources
- Provide concise, well-sourced research summaries

Output format:
1. Key findings (3-5 points)
2. Data support (specific numbers, dates, sources)
3. Information gaps (what couldn't be verified)"""
    
    def __init__(self, client: AsyncOpenAI, model: str):
        self.client = client
        self.model = model
    
    async def execute(self, task: SubTask) -> str:
        context = ""
        if task.inputs:
            context = "\n\nAvailable prior results:\n" + json.dumps(task.inputs)
        
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": self.SYSTEM},
                {"role": "user", "content": f"Research task: {task.title}\n\nDetails: {task.description}{context}"}
            ],
            temperature=0.2,
            max_tokens=2048
        )
        return response.choices[0].message.content or "Research complete"


class AnalysisWorker:
    SYSTEM = """You are a professional data analyst.
Your responsibilities:
- Process and analyze structured/unstructured data
- Identify trends, patterns, and anomalies
- Generate executable Python analysis code when needed
- Provide data-driven insights and conclusions

Output format:
1. Main findings (data-supported conclusions)
2. Analysis method used
3. Key metrics (specific numbers)
4. Limitations (data quality or sample issues)"""
    
    def __init__(self, client: AsyncOpenAI, model: str):
        self.client = client
        self.model = model
    
    async def execute(self, task: SubTask) -> str:
        input_data = ""
        if task.inputs:
            input_data = "\n\nAvailable input data:\n" + json.dumps(task.inputs, indent=2)
        
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": self.SYSTEM},
                {"role": "user", "content": f"Analysis task: {task.title}\n\nRequirements: {task.description}{input_data}"}
            ],
            temperature=0.1,
            max_tokens=2048
        )
        return response.choices[0].message.content or "Analysis complete"


class WritingWorker:
    SYSTEM = """You are a professional technical writer.
Your responsibilities:
- Transform research findings and data analysis into clear, readable content
- Maintain logical structure and smooth prose
- Ensure content accuracy and professionalism
- Adapt language style to target audience

Writing principles:
- Every argument must be supported by data or facts
- Use clear heading hierarchy to organize content
- Avoid empty filler phrases; deliver direct value"""
    
    def __init__(self, client: AsyncOpenAI, model: str):
        self.client = client
        self.model = model
    
    async def execute(self, task: SubTask) -> str:
        input_content = "\n\n".join([
            f"=== Source {tid} ===\n{content}"
            for tid, content in task.inputs.items()
            if content
        ])
        
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": self.SYSTEM},
                {"role": "user", "content": f"Writing task: {task.title}\n\nRequirements: {task.description}\n\nSource material:\n{input_content}"}
            ],
            temperature=0.4,
            max_tokens=4096
        )
        return response.choices[0].message.content or "Writing complete"


class OrchestratorAgent:
    ORCHESTRATOR_SYSTEM = """You are a task coordination expert (Orchestrator).
Analyze complex tasks and decompose them into subtasks for specialized Workers.

Available Worker types:
- research: Information search, fact-checking, data collection
- analysis: Data processing, statistical analysis, trend identification
- writing: Content creation, report writing, language optimization

Output JSON format:
{
  "subtasks": [
    {
      "task_id": "task_001",
      "worker_type": "research",
      "title": "Subtask title",
      "description": "Detailed task description",
      "depends_on": [],
      "priority": 1
    }
  ]
}

Principles:
1. Assign tasks to the most appropriate Worker type
2. Mark dependencies via depends_on (list task_ids of prerequisite tasks)
3. Tasks without dependencies can run in parallel
4. Descriptions must be specific enough for Workers to act without clarification"""
    
    def __init__(
        self,
        model: str = "NousResearch/Hermes-3-Llama-3.1-8B",
        base_url: str = "http://localhost:8000/v1",
        api_key: str = "not-needed"
    ):
        self.client = AsyncOpenAI(base_url=base_url, api_key=api_key)
        self.model = model
        self.workers = {
            "research": ResearchWorker(self.client, model),
            "analysis": AnalysisWorker(self.client, model),
            "writing": WritingWorker(self.client, model)
        }
    
    async def _create_plan(self, task: str) -> OrchestrationPlan:
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": self.ORCHESTRATOR_SYSTEM},
                {"role": "user", "content": f"Create a Worker assignment plan for:\n\n{task}"}
            ],
            response_format={"type": "json_object"},
            temperature=0.2,
            max_tokens=2048
        )
        
        data = json.loads(response.choices[0].message.content)
        subtasks = [
            SubTask(
                task_id=s["task_id"],
                worker_type=s["worker_type"],
                title=s["title"],
                description=s["description"],
                inputs={dep: None for dep in s.get("depends_on", [])},
                priority=s.get("priority", 0)
            )
            for s in data["subtasks"]
        ]
        
        print(f"\n[Orchestrator] Created {len(subtasks)}-task plan:")
        for st in subtasks:
            deps = list(st.inputs.keys())
            parallel = "" if deps else " (parallel)"
            print(f"  [{st.task_id}] {st.worker_type}: {st.title}{parallel}")
        
        return OrchestrationPlan(overall_goal=task, subtasks=subtasks)
    
    async def _dispatch(self, subtask: SubTask, completed: dict) -> tuple:
        for dep_id in list(subtask.inputs.keys()):
            if dep_id in completed:
                subtask.inputs[dep_id] = completed[dep_id]
        
        worker = self.workers.get(subtask.worker_type)
        if not worker:
            raise ValueError(f"Unknown worker type: {subtask.worker_type}")
        
        print(f"  → Dispatching to {subtask.worker_type}: {subtask.title}")
        result = await worker.execute(subtask)
        return subtask.task_id, result
    
    async def _synthesize(self, task: str, plan: OrchestrationPlan, completed: dict) -> str:
        results_str = "\n\n".join([
            f"### {st.title} [{st.worker_type}]\n{st.result}"
            for st in plan.subtasks if st.status == "success"
        ])
        
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": "Synthesize multiple Worker outputs into one complete, coherent final answer. Remove duplicates, resolve conflicts."},
                {"role": "user", "content": f"Original task: {task}\n\nWorker outputs:\n{results_str}"}
            ],
            temperature=0.3,
            max_tokens=4096
        )
        # content can be None; return "" so callers can always slice the answer
        return response.choices[0].message.content or ""
    
    async def run(self, task: str) -> dict:
        print(f"\n{'='*60}\nOrchestrator Agent\nTask: {task[:80]}\n{'='*60}")
        
        plan = await self._create_plan(task)
        completed = {}
        
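        while not plan.is_complete():
            ready = plan.get_ready_tasks(completed)
            if not ready:
                # Nothing can run: the remaining tasks depend on a failed task or on a
                # task_id the plan never defined. Mark them failed so they are counted
                # in the summary instead of being silently dropped.
                for st in plan.subtasks:
                    if st.status == "pending":
                        st.status = "failed"
                        st.error = "Unmet dependency (failed or unknown task_id)"
                        print(f"  x Task {st.task_id} skipped: {st.error}")
                break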
            
            print(f"\n[Orchestrator] Dispatching {len(ready)} tasks in parallel")
            
            results = await asyncio.gather(
                *[self._dispatch(st, completed) for st in ready],
                return_exceptions=True
            )
            
            for subtask, result in zip(ready, results):
                if isinstance(result, Exception):
                    subtask.status = "failed"
                    subtask.error = str(result)
                    print(f"  x Task {subtask.task_id} failed: {subtask.error}")
                else:
                    tid, tres = result
                    subtask.status = "success"
                    subtask.result = tres
                    completed[tid] = tres
                    print(f"  + Task {tid} done ({len(tres)} chars)")
        
        print(f"\n[Orchestrator] Synthesizing {len(completed)} results...")
        final = await self._synthesize(task, plan, completed)
        
        success = sum(1 for t in plan.subtasks if t.status == "success")
        failed = sum(1 for t in plan.subtasks if t.status == "failed")
        
        return {
            "success": failed == 0,
            "answer": final,
            "subtasks_total": len(plan.subtasks),
            "subtasks_succeeded": success,
            "subtasks_failed": failed
        }


async def main():
    orchestrator = OrchestratorAgent()
    result = await orchestrator.run(
        "Generate a competitive analysis report for the 'AI coding assistant' market. "
        "Research GitHub Copilot, Cursor, Tabnine, and CodeWhisperer. "
        "Compare features, pricing, and market share. "
        "Produce a complete report with executive summary, competitive matrix, and strategic recommendations."
    )
    print(f"\nFinal Report (excerpt):\n{result['answer'][:1000]}...")
    print(f"\nCompleted {result['subtasks_succeeded']}/{result['subtasks_total']} subtasks")


if __name__ == "__main__":
    asyncio.run(main())
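
`_create_plan` passes the model's JSON straight into `SubTask` objects. A malformed plan therefore surfaces either as a `KeyError` or as tasks that never become ready.

A structural check before execution catches these problems early. The sketch below is a hypothetical `validate_plan` helper, not part of the implementation above. It assumes the same JSON schema that `ORCHESTRATOR_SYSTEM` requests. One option, also an assumption, is to re-prompt the planner with the returned problems whenever the list is non-empty.

```python
import json
from typing import List, Set

REQUIRED_KEYS = {"task_id", "worker_type", "title", "description"}


def validate_plan(raw_json: str, worker_types: Set[str]) -> List[str]:
    """Return structural problems in a planner's JSON output (empty list if none).

    Checks: parseable JSON, a non-empty 'subtasks' list, required keys,
    known worker types, unique task IDs, and dependencies that refer to
    tasks in the same plan. Cycles longer than a self-dependency are not
    detected here.
    """
    try:
        data = json.loads(raw_json)
    except json.JSONDecodeError as exc:
        return [f"invalid JSON: {exc.msg}"]
    subtasks = data.get("subtasks") if isinstance(data, dict) else None
    if not isinstance(subtasks, list) or not subtasks:
        return ["missing or empty 'subtasks' list"]

    problems: List[str] = []
    seen: Set[str] = set()
    # First pass: per-task checks, collecting every task_id.
    for i, s in enumerate(subtasks):
        if not isinstance(s, dict):
            problems.append(f"subtask #{i} is not an object")
            continue
        missing = REQUIRED_KEYS - set(s)
        if missing:
            problems.append(f"subtask #{i} missing keys: {sorted(missing)}")
        tid = s.get("task_id")
        if tid in seen:
            problems.append(f"duplicate task_id: {tid}")
        seen.add(tid)
        if s.get("worker_type") not in worker_types:
            problems.append(f"{tid}: unknown worker_type {s.get('worker_type')!r}")
    # Second pass: dependencies can only be checked once all IDs are known.
    for s in subtasks:
        if not isinstance(s, dict):
            continue
        tid = s.get("task_id")
        for dep in s.get("depends_on") or []:
            if dep == tid:
                problems.append(f"{tid}: depends on itself")
            elif dep not in seen:
                problems.append(f"{tid}: depends on unknown task {dep!r}")
    return problems


workers = {"research", "analysis", "writing"}
good = json.dumps({"subtasks": [
    {"task_id": "t1", "worker_type": "research", "title": "A", "description": "Collect facts"},
    {"task_id": "t2", "worker_type": "writing", "title": "B", "description": "Write report",
     "depends_on": ["t1"]},
]})
bad = json.dumps({"subtasks": [
    {"task_id": "t1", "worker_type": "coding", "title": "A", "description": "Collect facts",
     "depends_on": ["t9"]},
]})
print(validate_plan(good, workers))  # []
print(validate_plan(bad, workers))
```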

54.4 Result Synthesis and Conflict Resolution

| Conflict type | Example | Resolution strategy |
|---|---|---|
| Factual conflict | Research says market size is $10B; Analysis says $8B | Prefer the claim backed by a cited source |
| Conclusion conflict | Research says the market is saturated; Analysis says it shows strong growth | Preserve both views and note their sources |
| Format conflict | Different Workers use different units or formats | Normalize to a single format |
| Content duplication | Multiple Workers describe the same thing | Merge, keeping the most detailed version |
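
Two of these strategies are mechanical enough to apply in code before the synthesis prompt runs. The sketch below assumes Worker outputs have already been parsed into structured pieces. The chapter's Workers do not do this; they return free text. The `Claim` type, `resolve_factual`, `merge_duplicates`, and the sample data are hypothetical, and text length is only a rough stand-in for "most detailed".

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple


@dataclass
class Claim:
    """One factual claim pulled from a Worker's output (hypothetical structure)."""
    worker: str
    value: str
    citation: Optional[str] = None


def resolve_factual(claims: List[Claim]) -> List[Claim]:
    """Factual conflict: prefer claims that carry a citation.

    If all cited claims agree on one value, return a single claim for it.
    If cited claims still disagree (or nothing is cited), return every
    candidate so the report can present both views with their sources.
    """
    cited = [c for c in claims if c.citation]
    if cited and len({c.value for c in cited}) == 1:
        return [cited[0]]
    return cited if cited else claims


def merge_duplicates(sections: List[Tuple[str, str]]) -> Dict[str, str]:
    """Content duplication: for sections on the same topic, keep the longest text.

    Topics are compared case-insensitively after trimming whitespace.
    """
    merged: Dict[str, str] = {}
    for topic, text in sections:
        key = topic.strip().lower()
        if key not in merged or len(text) > len(merged[key]):
            merged[key] = text
    return merged


claims = [
    Claim("research", "$10B", citation="industry report"),
    Claim("analysis", "$8B"),
]
print(resolve_factual(claims))
# [Claim(worker='research', value='$10B', citation='industry report')]

sections = [
    ("Tool A pricing", "Has a free tier."),
    ("tool a pricing", "Has a free tier and paid plans."),
]
print(merge_duplicates(sections))
# {'tool a pricing': 'Has a free tier and paid plans.'}
```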

Summary

This chapter provided a thorough exploration of the Orchestrator pattern:

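  1. Separation of responsibilities: The Orchestrator handles strategic dispatch and Workers handle specialized execution, each focused on what it does best.
  2. Task decomposition strategies: Split by domain expertise, tool dependency, parallelism, sequential dependency, and data volume.
  3. Worker specialization: The Research, Analysis, and Writing Workers each have a dedicated system prompt and temperature. The per-Worker tools shown in the architecture diagram (web search, code execution, document tools) are not wired into this minimal implementation.
  4. Parallel execution: Independent subtasks are dispatched concurrently with asyncio.gather. When the model server can handle concurrent requests, each round of ready tasks takes roughly as long as its slowest subtask rather than the sum of all of them.
  5. Conflict resolution: During synthesis, the Orchestrator is prompted to resolve factual conflicts and merge duplicated content.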

Review Questions

  1. How should the Orchestrator handle a Worker returning low-quality results? Retry directly, or ask another Worker to verify?
  2. If tasks require inter-Worker communication (e.g., Research Worker needs to ask Analysis Worker a specific question), how can this be achieved under the current isolated architecture?
  3. How do you validate the quality of the Orchestrator's dispatch plan? What types of plans can LLM planners get wrong?
  4. As the number of Workers grows (beyond 10), how do you keep the Orchestrator's system prompt concise without losing completeness?