第 54 章
多 Agent 协作:Orchestrator 模式
第54章:多 Agent 协作:Orchestrator 模式
导语
单个 Agent 即使能力强大,面对需要多领域专业知识的复杂任务时也会力不从心。就像企业中,CEO 负责战略决策,各部门专家负责具体执行——Orchestrator 模式将这个管理哲学引入 Agent 系统:一个主控 Agent(Orchestrator)负责理解任务全局、制定分发策略,多个专用 Worker Agent 各司其职,完成具体子任务。本章深入讲解 Orchestrator 模式的设计原理、Hermes 实现方式,以及一个完整的三 Agent 协同实战案例。
54.1 Orchestrator 设计哲学
核心思想:职责分离
Orchestrator 模式的本质是职责分离:
- Orchestrator:了解"做什么"(What)和"谁来做"(Who),不负责"怎么做"(How)
- Worker Agent:了解"怎么做"(How),专注于特定领域,不关心全局任务
这种分离带来几个关键优势:
- 专业化:每个 Worker 可以针对特定任务优化系统提示词和工具集
- 并行性:无依赖关系的子任务可以并行执行
- 可替换性:可以在不改变 Orchestrator 逻辑的情况下升级或替换 Worker
- 可观测性:Orchestrator 提供清晰的任务分解视图
flowchart TD
User([用户]) --> Orch[Orchestrator\n主控 Agent]
Orch --> |"任务A: 市场调研"| W1[Research Worker\n搜索专家]
Orch --> |"任务B: 数据分析"| W2[Analysis Worker\n数据专家]
Orch --> |"任务C: 撰写报告"| W3[Writing Worker\n写作专家]
W1 --> |结果A| Orch
W2 --> |结果B| Orch
W3 --> |结果C| Orch
Orch --> |"综合最终报告"| User
W1 --> WebSearch[(网络搜索工具)]
W2 --> CodeExec[(代码执行工具)]
W3 --> DocTools[(文档工具)]
Hermes 中的独立性与通信限制
在 Hermes Agent 当前架构中,子 Agent 是相互隔离的:
- 每个子 Agent 有独立的上下文窗口
- 子 Agent 之间没有直接通信通道
- 所有协调由 Orchestrator 通过消息传递完成
这既是限制,也是优势——隔离确保了子 Agent 不会相互干扰,每个子 Agent 可以独立扩展和测试。
54.2 任务分解策略
Orchestrator 的核心能力是任务分解:将复杂任务拆解为适合各个 Worker 的子任务。
分解维度
| 分解维度 | 说明 | 示例 |
|---|---|---|
| 领域专业性 | 按需要的专业知识分 | 法律问题 → 法律 Agent,技术问题 → 代码 Agent |
| 工具依赖 | 按需要的工具集分 | 搜索任务 → 网络 Agent,计算任务 → 数据 Agent |
| 并行性 | 无依赖关系的子任务并行 | 同时搜索多个主题 |
| 顺序依赖 | 需要前置结果的任务串行 | 先搜索事实,再基于事实写报告 |
| 数据规模 | 按数据量分片 | 100个URL分10组,每组10个 |
# orchestrator_agent.py
"""
Orchestrator 模式完整实现
一个主控 Agent 协调三个专用 Worker Agent
"""
import asyncio
import json
from typing import Optional
from dataclasses import dataclass, field
from openai import AsyncOpenAI
# ─── 数据结构 ──────────────────────────────────────────────────
@dataclass
class SubTask:
"""分配给 Worker 的子任务"""
task_id: str
worker_type: str # 'research' / 'analysis' / 'writing'
title: str
description: str
inputs: dict = field(default_factory=dict) # 来自其他子任务的输入
priority: int = 0
# 执行状态
status: str = "pending"
result: Optional[str] = None
error: Optional[str] = None
@dataclass
class OrchestrationPlan:
"""协调计划"""
overall_goal: str
subtasks: list[SubTask]
def get_pending_tasks(self) -> list[SubTask]:
return [t for t in self.subtasks if t.status == "pending"]
def get_parallel_tasks(self) -> list[SubTask]:
"""获取可以并行执行的任务(无外部依赖)"""
return [t for t in self.get_pending_tasks() if not t.inputs]
def get_tasks_with_inputs_ready(self) -> list[SubTask]:
"""获取所有依赖输入都已就绪的任务"""
completed = {t.task_id: t.result for t in self.subtasks if t.status == "success"}
ready = []
for task in self.get_pending_tasks():
if all(dep_id in completed for dep_id in task.inputs.keys()):
# 填充输入数据
task.inputs = {dep_id: completed[dep_id] for dep_id in task.inputs.keys()}
ready.append(task)
return ready
def is_complete(self) -> bool:
return all(t.status in ("success", "failed") for t in self.subtasks)
54.3 专用 Worker Agent 实现
# ─── Worker Agent 基类 ───────────────────────────────────────
class WorkerAgent:
"""Worker Agent 基类,专注于特定领域的任务执行"""
SYSTEM_PROMPT = """你是一个专业的任务执行 Agent。专注完成分配的子任务,
提供清晰、结构化的输出供主控 Agent 使用。"""
def __init__(
self,
client: AsyncOpenAI,
model: str,
tools: list = None,
max_steps: int = 8
):
self.client = client
self.model = model
self.tools = tools or []
self.max_steps = max_steps
async def execute(self, task: SubTask) -> str:
"""执行子任务,返回结果字符串"""
raise NotImplementedError
# ─── 研究型 Worker ────────────────────────────────────────────
class ResearchWorker(WorkerAgent):
"""
研究专家 Worker
专长:网络搜索、信息收集、事实核查
工具:web_search, webpage_fetch
"""
SYSTEM_PROMPT = """你是一个专业的信息研究员。
你的职责:
- 系统性搜索和收集相关信息
- 验证信息的准确性和时效性
- 汇总多个来源的关键发现
- 提供简洁、有据可查的研究摘要
输出格式:
1. 关键发现(3-5条)
2. 数据支持(具体数字、日期、来源)
3. 信息缺口(哪些方面未找到可靠资料)"""
async def execute(self, task: SubTask) -> str:
context = ""
if task.inputs:
context = "\n\n可参考的前序结果:\n" + json.dumps(task.inputs, ensure_ascii=False)
messages = [
{"role": "system", "content": self.SYSTEM_PROMPT},
{
"role": "user",
"content": f"""研究任务:{task.title}
任务详情:{task.description}
{context}
请进行系统性研究并提供完整的研究报告。"""
}
]
# 内部 ReAct 循环(专注于搜索)
for _ in range(self.max_steps):
response = await self.client.chat.completions.create(
model=self.model,
messages=messages,
tools=self.tools if self.tools else None,
tool_choice="auto" if self.tools else None,
temperature=0.2,
max_tokens=2048
)
msg = response.choices[0].message
if not msg.tool_calls:
return msg.content or "研究完成,无明确输出"
# 处理工具调用(简化示例)
tool_results = [
{"role": "tool", "tool_call_id": tc.id, "content": f"[{tc.function.name} 搜索结果]"}
for tc in msg.tool_calls
]
messages.append({"role": "assistant", "content": msg.content,
"tool_calls": [tc.model_dump() for tc in msg.tool_calls]})
messages.extend(tool_results)
return "研究完成(达到最大步骤数)"
# ─── 分析型 Worker ────────────────────────────────────────────
class AnalysisWorker(WorkerAgent):
"""
数据分析专家 Worker
专长:数据处理、统计分析、趋势识别、可视化
工具:code_exec, data_query
"""
SYSTEM_PROMPT = """你是一个专业的数据分析师。
你的职责:
- 处理和分析结构化/非结构化数据
- 识别趋势、模式和异常
- 生成可执行的 Python 分析代码
- 提供基于数据的洞察和结论
输出格式:
1. 主要发现(数据支撑的结论)
2. 分析方法(使用了什么分析方法)
3. 关键指标(具体数字)
4. 局限性(数据质量或样本问题)"""
async def execute(self, task: SubTask) -> str:
# 将输入数据融入分析任务
input_data = ""
if task.inputs:
input_data = "\n\n可用的输入数据:\n" + json.dumps(task.inputs, ensure_ascii=False, indent=2)
messages = [
{"role": "system", "content": self.SYSTEM_PROMPT},
{
"role": "user",
"content": f"""分析任务:{task.title}
任务要求:{task.description}
{input_data}
请进行深入分析并提供数据驱动的洞察。"""
}
]
for _ in range(self.max_steps):
response = await self.client.chat.completions.create(
model=self.model,
messages=messages,
tools=self.tools if self.tools else None,
tool_choice="auto" if self.tools else None,
temperature=0.1,
max_tokens=2048
)
msg = response.choices[0].message
if not msg.tool_calls:
return msg.content or "分析完成"
tool_results = [
{"role": "tool", "tool_call_id": tc.id, "content": f"[代码执行结果]"}
for tc in msg.tool_calls
]
messages.append({"role": "assistant", "content": msg.content,
"tool_calls": [tc.model_dump() for tc in msg.tool_calls]})
messages.extend(tool_results)
return "分析完成(达到最大步骤数)"
# ─── 写作型 Worker ────────────────────────────────────────────
class WritingWorker(WorkerAgent):
"""
专业写作 Worker
专长:结构化写作、内容组织、语言优化
工具:(较少使用,主要依赖 LLM 能力)
"""
SYSTEM_PROMPT = """你是一个专业的技术作家和内容创作者。
你的职责:
- 将研究发现和数据分析转化为清晰易读的内容
- 保持逻辑结构清晰、行文流畅
- 确保内容的准确性和专业性
- 针对目标读者调整语言风格
写作原则:
- 每个论点都要有数据或事实支撑
- 使用清晰的标题层级组织内容
- 避免空洞的套话,直接呈现价值"""
async def execute(self, task: SubTask) -> str:
# 整合所有输入内容
input_content = ""
for task_id, content in task.inputs.items():
input_content += f"\n\n=== 来源 {task_id} ===\n{content}"
messages = [
{"role": "system", "content": self.SYSTEM_PROMPT},
{
"role": "user",
"content": f"""写作任务:{task.title}
任务要求:{task.description}
可用素材:
{input_content}
请基于以上素材,创作高质量的内容。"""
}
]
response = await self.client.chat.completions.create(
model=self.model,
messages=messages,
temperature=0.4, # 写作任务允许更高创造性
max_tokens=4096
)
return response.choices[0].message.content or "写作完成"
# ─── Orchestrator 主控 Agent ──────────────────────────────────
class OrchestratorAgent:
"""
Orchestrator 主控 Agent
职责:
1. 理解任务全局
2. 制定分配策略(哪个任务给哪个 Worker)
3. 管理并行执行
4. 汇总结果并处理冲突
5. 生成最终输出
"""
ORCHESTRATOR_SYSTEM = """你是一个任务协调专家(Orchestrator)。
你的职责是分析复杂任务,将其分解为适合专业 Worker 的子任务。
可用的 Worker 类型:
- research: 擅长信息搜索、事实核查、资料收集
- analysis: 擅长数据处理、统计分析、趋势识别
- writing: 擅长内容撰写、报告创作、语言优化
输出格式(JSON):
{
"subtasks": [
{
"task_id": "task_001",
"worker_type": "research",
"title": "子任务标题",
"description": "详细任务描述",
"depends_on": [],
"priority": 1
}
]
}
原则:
1. 合理分配任务给最适合的 Worker
2. 标注任务间的依赖关系(depends_on 填写前置任务的 task_id)
3. 无依赖关系的任务不要添加 depends_on(可并行执行)
4. 任务描述要足够具体"""
def __init__(
self,
model: str = "NousResearch/Hermes-3-Llama-3.1-8B",
base_url: str = "http://localhost:8000/v1",
api_key: str = "not-needed"
):
self.client = AsyncOpenAI(base_url=base_url, api_key=api_key)
self.model = model
# 初始化 Worker Pool
self.workers = {
"research": ResearchWorker(self.client, model),
"analysis": AnalysisWorker(self.client, model),
"writing": WritingWorker(self.client, model)
}
async def _create_plan(self, task: str) -> OrchestrationPlan:
"""制定任务分配计划"""
response = await self.client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": self.ORCHESTRATOR_SYSTEM},
{"role": "user", "content": f"请为以下任务制定 Worker 分配计划:\n\n{task}"}
],
response_format={"type": "json_object"},
temperature=0.2,
max_tokens=2048
)
plan_data = json.loads(response.choices[0].message.content)
subtasks = []
for st_data in plan_data["subtasks"]:
# 将 depends_on 转换为 inputs 格式
inputs = {dep_id: None for dep_id in st_data.get("depends_on", [])}
subtasks.append(SubTask(
task_id=st_data["task_id"],
worker_type=st_data["worker_type"],
title=st_data["title"],
description=st_data["description"],
inputs=inputs,
priority=st_data.get("priority", 0)
))
print(f"\n[Orchestrator] 制定了 {len(subtasks)} 个子任务:")
for st in subtasks:
deps = list(st.inputs.keys())
print(f" [{st.task_id}] {st.worker_type} Worker: {st.title}"
+ (f" (依赖: {deps})" if deps else "(可并行)"))
return OrchestrationPlan(overall_goal=task, subtasks=subtasks)
async def _dispatch_task(self, subtask: SubTask, completed: dict) -> tuple[str, str]:
"""将子任务分配给对应 Worker,返回 (task_id, result)"""
# 填充依赖输入
for dep_id in list(subtask.inputs.keys()):
if dep_id in completed:
subtask.inputs[dep_id] = completed[dep_id]
worker = self.workers.get(subtask.worker_type)
if worker is None:
raise ValueError(f"未知 Worker 类型: {subtask.worker_type}")
print(f" → 分配给 {subtask.worker_type} Worker: {subtask.title}")
result = await worker.execute(subtask)
return subtask.task_id, result
async def _synthesize(self, task: str, plan: OrchestrationPlan, completed: dict) -> str:
"""汇总所有 Worker 的结果,生成最终输出"""
results_str = ""
for subtask in plan.subtasks:
if subtask.status == "success":
results_str += f"\n\n### {subtask.title} [{subtask.worker_type}]\n{subtask.result}"
response = await self.client.chat.completions.create(
model=self.model,
messages=[
{
"role": "system",
"content": "你是一个结果综合专家。将多个专业 Worker 的工作成果整合为一个完整、一致的最终输出。"
},
{
"role": "user",
"content": f"""原始任务:{task}
各 Worker 的工作成果:
{results_str}
请将以上成果综合为完整的最终答案。确保内容连贯、去除重复、解决冲突。"""
}
],
temperature=0.3,
max_tokens=4096
)
return response.choices[0].message.content
async def run(self, task: str) -> dict:
"""
执行完整的 Orchestrator 协调流程
"""
print(f"\n{'='*60}")
print(f"Orchestrator Agent")
print(f"任务:{task[:80]}...")
print(f"{'='*60}")
# 阶段1:制定计划
plan = await self._create_plan(task)
completed = {} # task_id -> result
# 阶段2:执行循环
while not plan.is_complete():
# 找出当前可执行的任务(无依赖或依赖已满足)
ready = [
t for t in plan.subtasks
if t.status == "pending" and all(
dep in completed for dep in t.inputs.keys()
)
]
if not ready:
print("[Orchestrator] 无可执行任务,检查依赖...")
break
print(f"\n[Orchestrator] 并行分发 {len(ready)} 个子任务")
# 并行执行所有就绪任务
dispatch_tasks = [
self._dispatch_task(st, completed) for st in ready
]
results = await asyncio.gather(*dispatch_tasks, return_exceptions=True)
# 处理结果
for subtask, result in zip(ready, results):
if isinstance(result, Exception):
subtask.status = "failed"
subtask.error = str(result)
print(f" ✗ 任务 {subtask.task_id} 失败:{subtask.error}")
else:
task_id, task_result = result
subtask.status = "success"
subtask.result = task_result
completed[task_id] = task_result
print(f" ✓ 任务 {task_id} 完成({len(task_result)} 字符)")
# 阶段3:结果汇总
print(f"\n[Orchestrator] 开始汇总 {len(completed)} 个子任务的结果...")
final_output = await self._synthesize(task, plan, completed)
success_count = sum(1 for t in plan.subtasks if t.status == "success")
failed_count = sum(1 for t in plan.subtasks if t.status == "failed")
print(f"\n[Orchestrator] 完成!成功: {success_count}, 失败: {failed_count}")
return {
"success": failed_count == 0,
"answer": final_output,
"subtasks_total": len(plan.subtasks),
"subtasks_succeeded": success_count,
"subtasks_failed": failed_count,
"worker_results": completed
}
# ─── 实战:竞争分析报告 ────────────────────────────────────────
async def main():
"""
实战示例:让三个专用 Worker 协作生成竞争分析报告
- Research Worker:调研竞争对手信息
- Analysis Worker:分析市场数据
- Writing Worker:撰写完整报告
"""
orchestrator = OrchestratorAgent(
model="NousResearch/Hermes-3-Llama-3.1-8B",
base_url="http://localhost:8000/v1"
)
task = """
生成一份关于"AI 编程助手"市场的竞争分析报告,要求:
1. 调研 GitHub Copilot、Cursor、Tabnine、CodeWhisperer 四个主要竞品
2. 比较各产品的功能特性、定价策略、市场份额
3. 分析市场趋势和增长机会
4. 撰写完整的竞争分析报告(包含执行摘要、竞品矩阵、战略建议)
"""
result = await orchestrator.run(task)
print(f"\n{'='*60}")
print(f"最终报告(节选):")
print(f"{'='*60}")
print(result["answer"][:1000])
print("...")
print(f"\n总计完成 {result['subtasks_succeeded']}/{result['subtasks_total']} 个子任务")
if __name__ == "__main__":
asyncio.run(main())
54.4 结果汇总与冲突解决
当多个 Worker 的输出存在冲突时,Orchestrator 需要处理:
冲突类型
| 冲突类型 | 示例 | 解决策略 |
|---|---|---|
| 事实冲突 | Research 说市场规模 10B,Analysis 说 8B | 以有来源引用的为准 |
| 结论冲突 | Research 认为市场饱和,Analysis 认为增长强劲 | 两种观点都保留,注明来源 |
| 格式冲突 | 不同 Worker 使用不同单位或格式 | 标准化为统一格式 |
| 内容重复 | 多个 Worker 描述了相同内容 | 合并,保留最详细的版本 |
async def resolve_conflicts(
client: AsyncOpenAI,
model: str,
task: str,
worker_outputs: dict
) -> str:
"""使用 LLM 解决 Worker 输出间的冲突"""
outputs_str = "\n\n".join([
f"[{worker}]: {output[:500]}"
for worker, output in worker_outputs.items()
])
response = await client.chat.completions.create(
model=model,
messages=[
{
"role": "system",
"content": """分析多个来源的输出,识别并解决以下类型的冲突:
1. 事实冲突:以有明确来源的信息为准
2. 结论冲突:保留多方观点,注明来源
3. 内容重复:合并,保留最完整的版本
生成统一、一致的最终输出。"""
},
{
"role": "user",
"content": f"任务:{task}\n\n各来源输出:\n{outputs_str}"
}
],
temperature=0.2,
max_tokens=3000
)
return response.choices[0].message.content
小结
本章深入讲解了 Orchestrator 模式的设计与实现:
- 职责分离:Orchestrator 负责战略分配,Worker 负责专业执行,各司其职。
- 任务分解策略:按领域专业性、工具依赖、并行性和顺序依赖四个维度分解任务。
- Worker 专业化:Research Worker(搜索)、Analysis Worker(数据)、Writing Worker(写作)各有专属系统提示词和工具集。
- 并行执行:无依赖关系的子任务通过
asyncio.gather并行分发,显著提升效率。 - 冲突解决:Orchestrator 在汇总阶段识别并解决 Worker 输出间的事实冲突和内容重复。
思考题
- Orchestrator 如何处理一个 Worker 返回低质量结果的情况?应该直接重试还是请另一个 Worker 验证?
- 如果任务需要 Worker 之间的中间通信(如 Research Worker 需要问 Analysis Worker 一个具体问题),在当前隔离架构下如何实现?
- 对于 Orchestrator 制定的分配计划,应该如何验证其合理性?LLM 制定的计划可能存在什么问题?
- 当 Worker 数量增多时(超过10个),Orchestrator 的系统提示词如何保持简洁而不失完整?