第 54 章

多 Agent 协作:Orchestrator 模式

第54章:多 Agent 协作:Orchestrator 模式

导语

单个 Agent 即使能力强大,面对需要多领域专业知识的复杂任务时也会力不从心。就像企业中,CEO 负责战略决策,各部门专家负责具体执行——Orchestrator 模式将这个管理哲学引入 Agent 系统:一个主控 Agent(Orchestrator)负责理解任务全局、制定分发策略,多个专用 Worker Agent 各司其职,完成具体子任务。本章深入讲解 Orchestrator 模式的设计原理、Hermes 实现方式,以及一个完整的三 Agent 协同实战案例。


54.1 Orchestrator 设计哲学

核心思想:职责分离

Orchestrator 模式的本质是职责分离

这种分离带来几个关键优势:

  1. 专业化:每个 Worker 可以针对特定任务优化系统提示词和工具集
  2. 并行性:无依赖关系的子任务可以并行执行
  3. 可替换性:可以在不改变 Orchestrator 逻辑的情况下升级或替换 Worker
  4. 可观测性:Orchestrator 提供清晰的任务分解视图
flowchart TD
    User([用户]) --> Orch[Orchestrator\n主控 Agent]
    
    Orch --> |"任务A: 市场调研"| W1[Research Worker\n搜索专家]
    Orch --> |"任务B: 数据分析"| W2[Analysis Worker\n数据专家]
    Orch --> |"任务C: 撰写报告"| W3[Writing Worker\n写作专家]
    
    W1 --> |结果A| Orch
    W2 --> |结果B| Orch
    W3 --> |结果C| Orch
    
    Orch --> |"综合最终报告"| User
    
    W1 --> WebSearch[(网络搜索工具)]
    W2 --> CodeExec[(代码执行工具)]
    W3 --> DocTools[(文档工具)]

Hermes 中的独立性与通信限制

在 Hermes Agent 当前架构中,子 Agent 是相互隔离的

这既是限制,也是优势——隔离确保了子 Agent 不会相互干扰,每个子 Agent 可以独立扩展和测试。


54.2 任务分解策略

Orchestrator 的核心能力是任务分解:将复杂任务拆解为适合各个 Worker 的子任务。

分解维度

分解维度 说明 示例
领域专业性 按需要的专业知识分 法律问题 → 法律 Agent,技术问题 → 代码 Agent
工具依赖 按需要的工具集分 搜索任务 → 网络 Agent,计算任务 → 数据 Agent
并行性 无依赖关系的子任务并行 同时搜索多个主题
顺序依赖 需要前置结果的任务串行 先搜索事实,再基于事实写报告
数据规模 按数据量分片 100个URL分10组,每组10个
# orchestrator_agent.py
"""
Orchestrator 模式完整实现
一个主控 Agent 协调三个专用 Worker Agent
"""

import asyncio
import json
from typing import Optional
from dataclasses import dataclass, field
from openai import AsyncOpenAI


# ─── 数据结构 ──────────────────────────────────────────────────

@dataclass
class SubTask:
    """分配给 Worker 的子任务"""
    task_id: str
    worker_type: str        # 'research' / 'analysis' / 'writing'
    title: str
    description: str
    inputs: dict = field(default_factory=dict)    # 来自其他子任务的输入
    priority: int = 0
    
    # 执行状态
    status: str = "pending"
    result: Optional[str] = None
    error: Optional[str] = None


@dataclass
class OrchestrationPlan:
    """协调计划"""
    overall_goal: str
    subtasks: list[SubTask]
    
    def get_pending_tasks(self) -> list[SubTask]:
        return [t for t in self.subtasks if t.status == "pending"]
    
    def get_parallel_tasks(self) -> list[SubTask]:
        """获取可以并行执行的任务(无外部依赖)"""
        return [t for t in self.get_pending_tasks() if not t.inputs]
    
    def get_tasks_with_inputs_ready(self) -> list[SubTask]:
        """获取所有依赖输入都已就绪的任务"""
        completed = {t.task_id: t.result for t in self.subtasks if t.status == "success"}
        ready = []
        for task in self.get_pending_tasks():
            if all(dep_id in completed for dep_id in task.inputs.keys()):
                # 填充输入数据
                task.inputs = {dep_id: completed[dep_id] for dep_id in task.inputs.keys()}
                ready.append(task)
        return ready
    
    def is_complete(self) -> bool:
        return all(t.status in ("success", "failed") for t in self.subtasks)

54.3 专用 Worker Agent 实现

# ─── Worker Agent 基类 ───────────────────────────────────────

class WorkerAgent:
    """Worker Agent 基类,专注于特定领域的任务执行"""
    
    SYSTEM_PROMPT = """你是一个专业的任务执行 Agent。专注完成分配的子任务,
提供清晰、结构化的输出供主控 Agent 使用。"""
    
    def __init__(
        self,
        client: AsyncOpenAI,
        model: str,
        tools: list = None,
        max_steps: int = 8
    ):
        self.client = client
        self.model = model
        self.tools = tools or []
        self.max_steps = max_steps
    
    async def execute(self, task: SubTask) -> str:
        """执行子任务,返回结果字符串"""
        raise NotImplementedError


# ─── 研究型 Worker ────────────────────────────────────────────

class ResearchWorker(WorkerAgent):
    """
    研究专家 Worker
    专长:网络搜索、信息收集、事实核查
    工具:web_search, webpage_fetch
    """
    
    SYSTEM_PROMPT = """你是一个专业的信息研究员。
你的职责:
- 系统性搜索和收集相关信息
- 验证信息的准确性和时效性
- 汇总多个来源的关键发现
- 提供简洁、有据可查的研究摘要

输出格式:
1. 关键发现(3-5条)
2. 数据支持(具体数字、日期、来源)
3. 信息缺口(哪些方面未找到可靠资料)"""
    
    async def execute(self, task: SubTask) -> str:
        context = ""
        if task.inputs:
            context = "\n\n可参考的前序结果:\n" + json.dumps(task.inputs, ensure_ascii=False)
        
        messages = [
            {"role": "system", "content": self.SYSTEM_PROMPT},
            {
                "role": "user",
                "content": f"""研究任务:{task.title}

任务详情:{task.description}
{context}

请进行系统性研究并提供完整的研究报告。"""
            }
        ]
        
        # 内部 ReAct 循环(专注于搜索)
        for _ in range(self.max_steps):
            response = await self.client.chat.completions.create(
                model=self.model,
                messages=messages,
                tools=self.tools if self.tools else None,
                tool_choice="auto" if self.tools else None,
                temperature=0.2,
                max_tokens=2048
            )
            
            msg = response.choices[0].message
            
            if not msg.tool_calls:
                return msg.content or "研究完成,无明确输出"
            
            # 处理工具调用(简化示例)
            tool_results = [
                {"role": "tool", "tool_call_id": tc.id, "content": f"[{tc.function.name} 搜索结果]"}
                for tc in msg.tool_calls
            ]
            messages.append({"role": "assistant", "content": msg.content, 
                           "tool_calls": [tc.model_dump() for tc in msg.tool_calls]})
            messages.extend(tool_results)
        
        return "研究完成(达到最大步骤数)"


# ─── 分析型 Worker ────────────────────────────────────────────

class AnalysisWorker(WorkerAgent):
    """
    数据分析专家 Worker
    专长:数据处理、统计分析、趋势识别、可视化
    工具:code_exec, data_query
    """
    
    SYSTEM_PROMPT = """你是一个专业的数据分析师。
你的职责:
- 处理和分析结构化/非结构化数据
- 识别趋势、模式和异常
- 生成可执行的 Python 分析代码
- 提供基于数据的洞察和结论

输出格式:
1. 主要发现(数据支撑的结论)
2. 分析方法(使用了什么分析方法)
3. 关键指标(具体数字)
4. 局限性(数据质量或样本问题)"""
    
    async def execute(self, task: SubTask) -> str:
        # 将输入数据融入分析任务
        input_data = ""
        if task.inputs:
            input_data = "\n\n可用的输入数据:\n" + json.dumps(task.inputs, ensure_ascii=False, indent=2)
        
        messages = [
            {"role": "system", "content": self.SYSTEM_PROMPT},
            {
                "role": "user",
                "content": f"""分析任务:{task.title}

任务要求:{task.description}
{input_data}

请进行深入分析并提供数据驱动的洞察。"""
            }
        ]
        
        for _ in range(self.max_steps):
            response = await self.client.chat.completions.create(
                model=self.model,
                messages=messages,
                tools=self.tools if self.tools else None,
                tool_choice="auto" if self.tools else None,
                temperature=0.1,
                max_tokens=2048
            )
            
            msg = response.choices[0].message
            
            if not msg.tool_calls:
                return msg.content or "分析完成"
            
            tool_results = [
                {"role": "tool", "tool_call_id": tc.id, "content": f"[代码执行结果]"}
                for tc in msg.tool_calls
            ]
            messages.append({"role": "assistant", "content": msg.content,
                           "tool_calls": [tc.model_dump() for tc in msg.tool_calls]})
            messages.extend(tool_results)
        
        return "分析完成(达到最大步骤数)"


# ─── 写作型 Worker ────────────────────────────────────────────

class WritingWorker(WorkerAgent):
    """
    专业写作 Worker
    专长:结构化写作、内容组织、语言优化
    工具:(较少使用,主要依赖 LLM 能力)
    """
    
    SYSTEM_PROMPT = """你是一个专业的技术作家和内容创作者。
你的职责:
- 将研究发现和数据分析转化为清晰易读的内容
- 保持逻辑结构清晰、行文流畅
- 确保内容的准确性和专业性
- 针对目标读者调整语言风格

写作原则:
- 每个论点都要有数据或事实支撑
- 使用清晰的标题层级组织内容
- 避免空洞的套话,直接呈现价值"""
    
    async def execute(self, task: SubTask) -> str:
        # 整合所有输入内容
        input_content = ""
        for task_id, content in task.inputs.items():
            input_content += f"\n\n=== 来源 {task_id} ===\n{content}"
        
        messages = [
            {"role": "system", "content": self.SYSTEM_PROMPT},
            {
                "role": "user",
                "content": f"""写作任务:{task.title}

任务要求:{task.description}

可用素材:
{input_content}

请基于以上素材,创作高质量的内容。"""
            }
        ]
        
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=messages,
            temperature=0.4,    # 写作任务允许更高创造性
            max_tokens=4096
        )
        
        return response.choices[0].message.content or "写作完成"


# ─── Orchestrator 主控 Agent ──────────────────────────────────

class OrchestratorAgent:
    """
    Orchestrator 主控 Agent
    
    职责:
    1. 理解任务全局
    2. 制定分配策略(哪个任务给哪个 Worker)
    3. 管理并行执行
    4. 汇总结果并处理冲突
    5. 生成最终输出
    """
    
    ORCHESTRATOR_SYSTEM = """你是一个任务协调专家(Orchestrator)。
你的职责是分析复杂任务,将其分解为适合专业 Worker 的子任务。

可用的 Worker 类型:
- research: 擅长信息搜索、事实核查、资料收集
- analysis: 擅长数据处理、统计分析、趋势识别
- writing: 擅长内容撰写、报告创作、语言优化

输出格式(JSON):
{
  "subtasks": [
    {
      "task_id": "task_001",
      "worker_type": "research",
      "title": "子任务标题",
      "description": "详细任务描述",
      "depends_on": [],
      "priority": 1
    }
  ]
}

原则:
1. 合理分配任务给最适合的 Worker
2. 标注任务间的依赖关系(depends_on 填写前置任务的 task_id)
3. 无依赖关系的任务不要添加 depends_on(可并行执行)
4. 任务描述要足够具体"""
    
    def __init__(
        self,
        model: str = "NousResearch/Hermes-3-Llama-3.1-8B",
        base_url: str = "http://localhost:8000/v1",
        api_key: str = "not-needed"
    ):
        self.client = AsyncOpenAI(base_url=base_url, api_key=api_key)
        self.model = model
        
        # 初始化 Worker Pool
        self.workers = {
            "research": ResearchWorker(self.client, model),
            "analysis": AnalysisWorker(self.client, model),
            "writing": WritingWorker(self.client, model)
        }
    
    async def _create_plan(self, task: str) -> OrchestrationPlan:
        """制定任务分配计划"""
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": self.ORCHESTRATOR_SYSTEM},
                {"role": "user", "content": f"请为以下任务制定 Worker 分配计划:\n\n{task}"}
            ],
            response_format={"type": "json_object"},
            temperature=0.2,
            max_tokens=2048
        )
        
        plan_data = json.loads(response.choices[0].message.content)
        subtasks = []
        
        for st_data in plan_data["subtasks"]:
            # 将 depends_on 转换为 inputs 格式
            inputs = {dep_id: None for dep_id in st_data.get("depends_on", [])}
            
            subtasks.append(SubTask(
                task_id=st_data["task_id"],
                worker_type=st_data["worker_type"],
                title=st_data["title"],
                description=st_data["description"],
                inputs=inputs,
                priority=st_data.get("priority", 0)
            ))
        
        print(f"\n[Orchestrator] 制定了 {len(subtasks)} 个子任务:")
        for st in subtasks:
            deps = list(st.inputs.keys())
            print(f"  [{st.task_id}] {st.worker_type} Worker: {st.title}"
                  + (f" (依赖: {deps})" if deps else "(可并行)"))
        
        return OrchestrationPlan(overall_goal=task, subtasks=subtasks)
    
    async def _dispatch_task(self, subtask: SubTask, completed: dict) -> tuple[str, str]:
        """将子任务分配给对应 Worker,返回 (task_id, result)"""
        # 填充依赖输入
        for dep_id in list(subtask.inputs.keys()):
            if dep_id in completed:
                subtask.inputs[dep_id] = completed[dep_id]
        
        worker = self.workers.get(subtask.worker_type)
        if worker is None:
            raise ValueError(f"未知 Worker 类型: {subtask.worker_type}")
        
        print(f"  → 分配给 {subtask.worker_type} Worker: {subtask.title}")
        result = await worker.execute(subtask)
        return subtask.task_id, result
    
    async def _synthesize(self, task: str, plan: OrchestrationPlan, completed: dict) -> str:
        """汇总所有 Worker 的结果,生成最终输出"""
        results_str = ""
        for subtask in plan.subtasks:
            if subtask.status == "success":
                results_str += f"\n\n### {subtask.title} [{subtask.worker_type}]\n{subtask.result}"
        
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[
                {
                    "role": "system",
                    "content": "你是一个结果综合专家。将多个专业 Worker 的工作成果整合为一个完整、一致的最终输出。"
                },
                {
                    "role": "user",
                    "content": f"""原始任务:{task}

各 Worker 的工作成果:
{results_str}

请将以上成果综合为完整的最终答案。确保内容连贯、去除重复、解决冲突。"""
                }
            ],
            temperature=0.3,
            max_tokens=4096
        )
        
        return response.choices[0].message.content
    
    async def run(self, task: str) -> dict:
        """
        执行完整的 Orchestrator 协调流程
        """
        print(f"\n{'='*60}")
        print(f"Orchestrator Agent")
        print(f"任务:{task[:80]}...")
        print(f"{'='*60}")
        
        # 阶段1:制定计划
        plan = await self._create_plan(task)
        completed = {}  # task_id -> result
        
        # 阶段2:执行循环
        while not plan.is_complete():
            # 找出当前可执行的任务(无依赖或依赖已满足)
            ready = [
                t for t in plan.subtasks
                if t.status == "pending" and all(
                    dep in completed for dep in t.inputs.keys()
                )
            ]
            
            if not ready:
                print("[Orchestrator] 无可执行任务,检查依赖...")
                break
            
            print(f"\n[Orchestrator] 并行分发 {len(ready)} 个子任务")
            
            # 并行执行所有就绪任务
            dispatch_tasks = [
                self._dispatch_task(st, completed) for st in ready
            ]
            results = await asyncio.gather(*dispatch_tasks, return_exceptions=True)
            
            # 处理结果
            for subtask, result in zip(ready, results):
                if isinstance(result, Exception):
                    subtask.status = "failed"
                    subtask.error = str(result)
                    print(f"  ✗ 任务 {subtask.task_id} 失败:{subtask.error}")
                else:
                    task_id, task_result = result
                    subtask.status = "success"
                    subtask.result = task_result
                    completed[task_id] = task_result
                    print(f"  ✓ 任务 {task_id} 完成({len(task_result)} 字符)")
        
        # 阶段3:结果汇总
        print(f"\n[Orchestrator] 开始汇总 {len(completed)} 个子任务的结果...")
        final_output = await self._synthesize(task, plan, completed)
        
        success_count = sum(1 for t in plan.subtasks if t.status == "success")
        failed_count = sum(1 for t in plan.subtasks if t.status == "failed")
        
        print(f"\n[Orchestrator] 完成!成功: {success_count}, 失败: {failed_count}")
        
        return {
            "success": failed_count == 0,
            "answer": final_output,
            "subtasks_total": len(plan.subtasks),
            "subtasks_succeeded": success_count,
            "subtasks_failed": failed_count,
            "worker_results": completed
        }


# ─── 实战:竞争分析报告 ────────────────────────────────────────

async def main():
    """
    实战示例:让三个专用 Worker 协作生成竞争分析报告
    - Research Worker:调研竞争对手信息
    - Analysis Worker:分析市场数据
    - Writing Worker:撰写完整报告
    """
    
    orchestrator = OrchestratorAgent(
        model="NousResearch/Hermes-3-Llama-3.1-8B",
        base_url="http://localhost:8000/v1"
    )
    
    task = """
    生成一份关于"AI 编程助手"市场的竞争分析报告,要求:
    1. 调研 GitHub Copilot、Cursor、Tabnine、CodeWhisperer 四个主要竞品
    2. 比较各产品的功能特性、定价策略、市场份额
    3. 分析市场趋势和增长机会
    4. 撰写完整的竞争分析报告(包含执行摘要、竞品矩阵、战略建议)
    """
    
    result = await orchestrator.run(task)
    
    print(f"\n{'='*60}")
    print(f"最终报告(节选):")
    print(f"{'='*60}")
    print(result["answer"][:1000])
    print("...")
    print(f"\n总计完成 {result['subtasks_succeeded']}/{result['subtasks_total']} 个子任务")


if __name__ == "__main__":
    asyncio.run(main())

54.4 结果汇总与冲突解决

当多个 Worker 的输出存在冲突时,Orchestrator 需要处理:

冲突类型

冲突类型 示例 解决策略
事实冲突 Research 说市场规模 10B,Analysis 说 8B 以有来源引用的为准
结论冲突 Research 认为市场饱和,Analysis 认为增长强劲 两种观点都保留,注明来源
格式冲突 不同 Worker 使用不同单位或格式 标准化为统一格式
内容重复 多个 Worker 描述了相同内容 合并,保留最详细的版本
async def resolve_conflicts(
    client: AsyncOpenAI,
    model: str,
    task: str,
    worker_outputs: dict
) -> str:
    """使用 LLM 解决 Worker 输出间的冲突"""
    
    outputs_str = "\n\n".join([
        f"[{worker}]: {output[:500]}"
        for worker, output in worker_outputs.items()
    ])
    
    response = await client.chat.completions.create(
        model=model,
        messages=[
            {
                "role": "system",
                "content": """分析多个来源的输出,识别并解决以下类型的冲突:
1. 事实冲突:以有明确来源的信息为准
2. 结论冲突:保留多方观点,注明来源
3. 内容重复:合并,保留最完整的版本
生成统一、一致的最终输出。"""
            },
            {
                "role": "user",
                "content": f"任务:{task}\n\n各来源输出:\n{outputs_str}"
            }
        ],
        temperature=0.2,
        max_tokens=3000
    )
    
    return response.choices[0].message.content

小结

本章深入讲解了 Orchestrator 模式的设计与实现:

  1. 职责分离:Orchestrator 负责战略分配,Worker 负责专业执行,各司其职。
  2. 任务分解策略:按领域专业性、工具依赖、并行性和顺序依赖四个维度分解任务。
  3. Worker 专业化:Research Worker(搜索)、Analysis Worker(数据)、Writing Worker(写作)各有专属系统提示词和工具集。
  4. 并行执行:无依赖关系的子任务通过 asyncio.gather 并行分发,显著提升效率。
  5. 冲突解决:Orchestrator 在汇总阶段识别并解决 Worker 输出间的事实冲突和内容重复。

思考题

  1. Orchestrator 如何处理一个 Worker 返回低质量结果的情况?应该直接重试还是请另一个 Worker 验证?
  2. 如果任务需要 Worker 之间的中间通信(如 Research Worker 需要问 Analysis Worker 一个具体问题),在当前隔离架构下如何实现?
  3. 对于 Orchestrator 制定的分配计划,应该如何验证其合理性?LLM 制定的计划可能存在什么问题?
  4. 当 Worker 数量增多时(超过10个),Orchestrator 的系统提示词如何保持简洁而不失完整?
本章评分
4.7  / 5  (3 评分)

💬 留言讨论