第 40 章
A2A 协议:Agent 间通信标准
第40章:A2A 协议:Agent 间通信标准
当单个 Agent 能力不足以完成复杂任务时,多个专业化 Agent 的协作就成为必然。但 Agent 间如何"对话"?如何委托任务?如何交接中间结果?这正是 A2A(Agent-to-Agent)协议要解决的问题。A2A 是 Google 主导、多家公司联合制定的 Agent 间通信标准,与 Anthropic 主导的 MCP 形成互补——MCP 解决"工具调用",A2A 解决"Agent 委托"。本章深入解析 A2A 协议,并通过两个 Hermes 实例协作的完整实战展示其应用。
40.1 A2A 协议背景与设计理念
为什么需要 A2A?
MCP 解决了什么问题:
Agent ──MCP──→ 工具(数据库、文件、API)
✓ 工具是无状态的"函数调用"
✓ 工具执行时间短(毫秒到秒)
✓ 工具不需要"思考"
MCP 没有解决的问题:
Agent A ──?──→ Agent B(另一个 AI Agent)
✗ Agent B 可能需要数分钟完成任务
✗ Agent B 有自己的推理过程(不是简单函数)
✗ Agent B 可能进一步委托给 Agent C
✗ Agent A 无法知道 Agent B 的能力边界
A2A 解决的核心问题:
"如何让不同厂商、不同框架的 AI Agent 互相理解并委托任务"
Google 的设计理念
A2A 由 Google 在 2025 年初提出,核心设计原则:
| 原则 | 含义 |
|---|---|
| 能力优先 | Agent 通过 Agent Card 声明自己能做什么 |
| 异步为主 | 长时间任务通过流式/轮询获取进度 |
| 协议中立 | 基于 HTTP+JSON,不依赖特定 AI 框架 |
| 可发现 | Agent 自动发现其他 Agent 的能力 |
| 安全 | 内置认证,支持企业级访问控制 |
A2A 与 MCP 的互补关系
┌─────────────────────────────────────────────────────────┐
│ Agent 能力调用层次 │
├─────────────────────────────────────────────────────────┤
│ │
│ 用户请求 │
│ ↓ │
│ Hermes Agent(编排者) │
│ │ │
│ ├── MCP ──→ 工具层 │
│ │ (数据库、搜索、文件系统) │
│ │ 特点:同步、无状态、快速 │
│ │ │
│ └── A2A ──→ Agent 层 │
│ (专家 Agent、子 Agent) │
│ 特点:异步、有状态、可长时间运行 │
│ │
└─────────────────────────────────────────────────────────┘
MCP(工具调用):
我需要查询数据库 → query_db() → 返回结果
类比:调用函数
A2A(Agent 委托):
我需要写一份法律合规分析 → 委托给合规 Agent →
等待(可能需要5分钟)→ 获取完整分析报告
类比:外包给专家
40.2 核心概念:Task、Artifact、Message
Task(任务)
Task 是 A2A 中最基本的工作单元,代表一项需要由目标 Agent 完成的工作。
{
"id": "task-8f3a92b1-4c2d-4e5f-a123-456789abcdef",
"status": {
"state": "working", // submitted | working | completed | failed | canceled
"timestamp": "2025-01-15T09:30:00Z",
"progress": 0.45 // 可选:进度 0.0-1.0
},
"sessionId": "session-xxx",
"metadata": {
"priority": "high",
"deadline": "2025-01-15T10:00:00Z"
}
}
Task 状态机:
submitted → working → completed
↓ ↓
failed (返回 Artifacts)
↓
canceled
Artifact(产物)
Artifact 是 Task 执行完成后产生的输出,可以是文本、文件、结构化数据等。
{
"name": "compliance_report",
"description": "合规分析报告",
"parts": [
{
"type": "text",
"text": "## 合规分析报告\n\n根据您提供的合同内容...",
"metadata": {
"language": "zh-CN",
"word_count": 2847
}
},
{
"type": "file",
"file": {
"name": "compliance_report.pdf",
"mimeType": "application/pdf",
"bytes": "JVBERi0xLjQK..." // Base64 编码
}
}
],
"metadata": {
"created_at": "2025-01-15T09:35:00Z",
"confidence": 0.92
}
}
Message(消息)
Message 是 Client 和 Agent 之间来回传递的通信单元,支持多媒体内容。
{
"role": "user", // user | agent
"parts": [
{
"type": "text",
"text": "请分析以下合同的合规风险"
},
{
"type": "file",
"file": {
"name": "contract.pdf",
"mimeType": "application/pdf",
"uri": "https://storage.example.com/contracts/2025/001.pdf"
}
}
],
"metadata": {
"timestamp": "2025-01-15T09:28:00Z"
}
}
40.3 Agent Card 规范
Agent Card 是 A2A 中最重要的发现机制——每个 Agent 在其固定地址 /.well-known/agent.json 发布自己的能力声明。
完整 Agent Card 示例
{
"$schema": "https://a2aprotocol.ai/schema/agent-card.json",
"name": "Legal Compliance Analyzer",
"description": "专注于商业合同的法律合规性分析,覆盖中国、美国和欧盟法规。可识别潜在法律风险,提供修改建议。",
"version": "2.1.0",
"url": "https://legal-agent.example.com",
"provider": {
"organization": "LegalTech Corp",
"url": "https://legaltech.example.com",
"contact": "[email protected]"
},
"capabilities": {
"streaming": true, // 支持流式响应
"pushNotifications": true, // 支持 Webhook 推送
"stateTransitionHistory": true // 支持查询状态历史
},
"skills": [
{
"id": "contract-compliance",
"name": "合同合规分析",
"description": "分析商业合同的法律合规性,识别潜在风险条款",
"tags": ["legal", "compliance", "contract", "risk-analysis"],
"examples": [
"分析这份采购合同是否符合中国《合同法》要求",
"识别这份劳动合同中的不平等条款"
],
"inputModes": ["text", "file"],
"outputModes": ["text", "file"]
},
{
"id": "gdpr-check",
"name": "GDPR 合规检查",
"description": "检查数据处理协议是否符合欧盟 GDPR 要求",
"tags": ["gdpr", "data-privacy", "eu-compliance"],
"inputModes": ["text"],
"outputModes": ["text"]
}
],
"defaultInputModes": ["text", "file"],
"defaultOutputModes": ["text", "file"],
"authentication": {
"schemes": ["bearer", "oauth2"],
"oauth2": {
"authorizationUrl": "https://auth.legaltech.example.com/oauth/authorize",
"tokenUrl": "https://auth.legaltech.example.com/oauth/token",
"scopes": ["a2a:tasks:create", "a2a:tasks:read"]
}
}
}
Agent Card 的发现机制
# hermes/a2a/discovery.py
import aiohttp
from typing import Dict, Optional
import json
class AgentDiscovery:
"""A2A Agent 发现服务"""
WELL_KNOWN_PATH = "/.well-known/agent.json"
def __init__(self):
self._cache: Dict[str, Dict] = {}
async def discover(self, agent_url: str) -> Optional[Dict]:
"""
从指定 URL 发现 Agent 的能力
Args:
agent_url: Agent 的基础 URL(如 https://legal-agent.example.com)
Returns:
Agent Card 字典,或 None(如果无法发现)
"""
if agent_url in self._cache:
return self._cache[agent_url]
card_url = agent_url.rstrip("/") + self.WELL_KNOWN_PATH
async with aiohttp.ClientSession() as session:
try:
async with session.get(card_url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
if resp.status == 200:
card = await resp.json()
self._cache[agent_url] = card
return card
except Exception as e:
print(f"无法发现 Agent: {agent_url}: {e}")
return None
def can_handle(self, agent_card: Dict, task_description: str) -> float:
"""
评估 Agent 处理特定任务的能力(0.0-1.0)
简单实现:基于关键词匹配
生产实现:使用向量相似度或 LLM 评估
"""
description = task_description.lower()
score = 0.0
for skill in agent_card.get("skills", []):
skill_tags = skill.get("tags", [])
skill_desc = skill.get("description", "").lower()
skill_name = skill.get("name", "").lower()
# 标签匹配
for tag in skill_tags:
if tag.lower() in description:
score += 0.3
# 描述关键词匹配
words = description.split()
for word in words:
if word in skill_desc or word in skill_name:
score += 0.1
return min(score, 1.0)
40.4 A2A 协议 API 规范
核心 API 端点
POST /tasks 创建新任务
GET /tasks/{id} 获取任务状态
PUT /tasks/{id}/cancel 取消任务
GET /tasks/{id}/artifacts 获取任务产物
POST /tasks/{id}/messages 发送消息(多轮对话)
GET /tasks/{id}/messages 获取消息历史
# 流式响应
GET /tasks/{id}/stream SSE 流式获取任务进度
# Agent 发现
GET /.well-known/agent.json 获取 Agent Card
创建任务的完整请求/响应
// POST /tasks
{
"message": {
"role": "user",
"parts": [
{
"type": "text",
"text": "请分析以下合同的合规风险,重点关注数据保护和知识产权条款"
},
{
"type": "file",
"file": {
"name": "service_agreement.pdf",
"mimeType": "application/pdf",
"bytes": "JVBERi0xLjQK..."
}
}
]
},
"sessionId": "session-12345",
"metadata": {
"priority": "high",
"callback_url": "https://hermes.myapp.com/a2a/callbacks"
}
}
// 响应 202 Accepted
{
"id": "task-8f3a92b1-4c2d-4e5f-a123-456789abcdef",
"status": {
"state": "submitted",
"timestamp": "2025-01-15T09:28:00Z"
},
"metadata": {
"estimated_completion": "2025-01-15T09:33:00Z"
}
}
40.5 Hermes 的 A2A 支持现状
Hermes 1.7.x A2A 支持矩阵
| 功能 | 支持状态 | 版本 | 备注 |
|---|---|---|---|
| Agent Card 发布 | 完全支持 | 1.7.0 | 自动生成 |
| 发送 A2A 任务 | 完全支持 | 1.7.0 | 作为 Client |
| 接收 A2A 任务 | 完全支持 | 1.7.0 | 作为 Server |
| 流式任务进度 | 完全支持 | 1.7.1 | SSE |
| Webhook 推送 | 完全支持 | 1.7.2 | 需配置回调 URL |
| 多轮对话任务 | 实验性支持 | 1.7.3 | Beta |
| 跨组织认证 | 部分支持 | 1.7.x | 仅 Bearer Token |
| 任务优先级 | 计划中 | 1.8.0 | - |
Hermes 中配置 A2A
# hermes_config.yaml
a2a:
# 作为 A2A Server(接受其他 Agent 委托)
server:
enabled: true
url: "https://hermes.myapp.com"
# 发布的技能
skills:
- id: "research-and-summarize"
name: "研究与摘要"
description: "对给定主题进行深度网络研究并生成摘要报告"
tags: ["research", "summarization", "information-retrieval"]
# 认证配置
auth:
required: true
schemes: ["bearer"]
trusted_issuers:
- "https://auth.partner-company.com"
# 作为 A2A Client(委托其他 Agent)
client:
known_agents:
- name: "legal-agent"
url: "https://legal-agent.partner.com"
auth:
type: "bearer"
token_env: "LEGAL_AGENT_TOKEN"
- name: "translation-agent"
url: "https://translate.service.com"
auth:
type: "oauth2"
client_id_env: "TRANSLATE_CLIENT_ID"
client_secret_env: "TRANSLATE_CLIENT_SECRET"
40.6 实战:两个 Hermes 实例通过 A2A 协作
场景描述
用户请求:
"请帮我分析这份英文合同的合规风险,并翻译成中文"
Hermes 主实例(编排者)的任务分解:
1. 委托 Legal Agent(法律合规分析)
2. 委托 Translation Agent(翻译)
3. 合并两个结果,返回给用户
Hermes A2A Client 实现
# hermes/a2a/client.py
import asyncio
import aiohttp
import json
from typing import Any, Dict, Optional, AsyncGenerator
import logging
logger = logging.getLogger(__name__)
class A2AClient:
"""Hermes A2A 客户端"""
def __init__(self, agent_url: str, auth_token: str):
self.agent_url = agent_url.rstrip("/")
self.auth_token = auth_token
@property
def headers(self) -> Dict:
return {
"Authorization": f"Bearer {self.auth_token}",
"Content-Type": "application/json"
}
async def create_task(
self,
text: str,
files: list = None,
session_id: str = None,
callback_url: str = None
) -> Dict:
"""创建 A2A 任务"""
parts = [{"type": "text", "text": text}]
if files:
for f in files:
parts.append({"type": "file", "file": f})
payload = {
"message": {"role": "user", "parts": parts}
}
if session_id:
payload["sessionId"] = session_id
if callback_url:
payload["metadata"] = {"callback_url": callback_url}
async with aiohttp.ClientSession(headers=self.headers) as session:
async with session.post(
f"{self.agent_url}/tasks",
json=payload
) as resp:
resp.raise_for_status()
return await resp.json()
async def wait_for_completion(
self,
task_id: str,
poll_interval: float = 2.0,
timeout: float = 300.0
) -> Dict:
"""
轮询等待任务完成
对于支持流式的 Agent,建议使用 stream_task_progress()
"""
import time
start = time.time()
while time.time() - start < timeout:
task = await self.get_task(task_id)
state = task["status"]["state"]
if state == "completed":
# 获取产物
artifacts = await self.get_artifacts(task_id)
task["artifacts"] = artifacts
return task
elif state == "failed":
raise A2ATaskError(
f"任务失败: {task_id}",
task.get("status", {}).get("message", "Unknown error")
)
elif state == "canceled":
raise A2ATaskError(f"任务被取消: {task_id}")
# 还在执行中,等待后重试
progress = task["status"].get("progress", 0)
logger.info(f"任务 {task_id[:8]}... 进度: {progress*100:.0f}%")
await asyncio.sleep(poll_interval)
raise TimeoutError(f"任务 {task_id} 超时(>{timeout}s)")
async def stream_task_progress(
self,
task_id: str
) -> AsyncGenerator[Dict, None]:
"""通过 SSE 流式获取任务进度"""
async with aiohttp.ClientSession(headers=self.headers) as session:
async with session.get(f"{self.agent_url}/tasks/{task_id}/stream") as resp:
resp.raise_for_status()
async for line in resp.content:
line = line.decode().strip()
if line.startswith("data: "):
data = json.loads(line[6:])
yield data
if data.get("type") in ("completed", "failed"):
break
async def get_task(self, task_id: str) -> Dict:
async with aiohttp.ClientSession(headers=self.headers) as session:
async with session.get(f"{self.agent_url}/tasks/{task_id}") as resp:
resp.raise_for_status()
return await resp.json()
async def get_artifacts(self, task_id: str) -> list:
async with aiohttp.ClientSession(headers=self.headers) as session:
async with session.get(f"{self.agent_url}/tasks/{task_id}/artifacts") as resp:
resp.raise_for_status()
return await resp.json()
async def cancel_task(self, task_id: str):
async with aiohttp.ClientSession(headers=self.headers) as session:
async with session.put(f"{self.agent_url}/tasks/{task_id}/cancel") as resp:
resp.raise_for_status()
class A2ATaskError(Exception):
def __init__(self, message: str, detail: str = None):
super().__init__(message)
self.detail = detail
完整的协作编排
# orchestrator_demo.py
import asyncio
import os
from hermes.a2a.client import A2AClient
async def analyze_contract_with_collaboration(
contract_text: str,
contract_file_bytes: bytes = None
) -> Dict:
"""
使用两个专业 Agent 协作完成合同分析+翻译任务
"""
# 创建两个 Agent 的客户端
legal_client = A2AClient(
agent_url="https://legal-agent.partner.com",
auth_token=os.environ["LEGAL_AGENT_TOKEN"]
)
translation_client = A2AClient(
agent_url="https://hermes-translation.myapp.com", # 另一个 Hermes 实例
auth_token=os.environ["TRANSLATION_AGENT_TOKEN"]
)
# 准备文件(如果有)
files = []
if contract_file_bytes:
files = [{
"name": "contract.pdf",
"mimeType": "application/pdf",
"bytes": __import__("base64").b64encode(contract_file_bytes).decode()
}]
# 步骤 1:同时发起两个任务(法律分析和翻译可以并行开始)
print("发起并行任务:法律分析 + 翻译...")
legal_task, translation_task = await asyncio.gather(
legal_client.create_task(
text=f"请分析以下合同的合规风险,重点关注数据保护和知识产权条款:\n\n{contract_text}",
files=files
),
translation_client.create_task(
text=f"请将以下英文合同翻译成中文(专业法律翻译):\n\n{contract_text}"
)
)
legal_task_id = legal_task["id"]
translation_task_id = translation_task["id"]
print(f"法律分析任务 ID: {legal_task_id[:8]}...")
print(f"翻译任务 ID: {translation_task_id[:8]}...")
# 步骤 2:并行等待两个任务完成(使用流式进度)
async def wait_with_streaming(client: A2AClient, task_id: str, name: str):
async for event in client.stream_task_progress(task_id):
state = event.get("status", {}).get("state", "")
progress = event.get("status", {}).get("progress", 0)
print(f"[{name}] 进度: {progress*100:.0f}% ({state})")
if state == "completed":
return await client.get_task(task_id)
elif state == "failed":
raise Exception(f"{name} 任务失败")
legal_result, translation_result = await asyncio.gather(
wait_with_streaming(legal_client, legal_task_id, "法律分析"),
wait_with_streaming(translation_client, translation_task_id, "翻译")
)
# 步骤 3:提取并合并结果
def extract_text_from_artifacts(task_result: Dict) -> str:
for artifact in task_result.get("artifacts", []):
for part in artifact.get("parts", []):
if part["type"] == "text":
return part["text"]
return ""
legal_analysis = extract_text_from_artifacts(legal_result)
chinese_translation = extract_text_from_artifacts(translation_result)
# 步骤 4:组合最终报告
final_report = f"""
# 合同分析综合报告
## 中文译文
{chinese_translation}
---
## 合规风险分析
{legal_analysis}
---
*本报告由 Legal Agent 和 Translation Agent 协作生成*
*生成时间:{__import__("datetime").datetime.now().isoformat()}*
"""
return {
"report": final_report,
"legal_task_id": legal_task_id,
"translation_task_id": translation_task_id
}
# Hermes Skill 封装
class ContractAnalysisSkill:
"""将 A2A 协作封装为 Hermes Skill"""
async def execute(self, inputs: Dict) -> Dict:
contract_text = inputs.get("contract_text", "")
try:
result = await analyze_contract_with_collaboration(
contract_text=contract_text
)
return {
"success": True,
"report": result["report"],
"metadata": {
"legal_task_id": result["legal_task_id"],
"translation_task_id": result["translation_task_id"]
}
}
except Exception as e:
return {
"success": False,
"error": str(e),
"report": None
}
# 主函数测试
if __name__ == "__main__":
import asyncio
sample_contract = """
SERVICE AGREEMENT
This Service Agreement ("Agreement") is entered into as of January 1, 2025,
by and between TechCorp Inc. ("Service Provider") and ClientCo Ltd. ("Client").
1. SERVICES: Service Provider agrees to provide software development services...
2. DATA PROTECTION: All client data shall be processed in accordance with GDPR...
3. INTELLECTUAL PROPERTY: All work product created hereunder shall be...
"""
result = asyncio.run(analyze_contract_with_collaboration(sample_contract))
print(result["report"])
40.7 A2A vs MCP 深度对比
| 维度 | MCP | A2A |
|---|---|---|
| 主导方 | Anthropic | Google(多方联合) |
| 设计目标 | Agent 调用外部工具 | Agent 委托其他 Agent |
| 执行时间 | 秒级(同步为主) | 分钟到小时(异步为主) |
| 状态管理 | 无状态(单次调用) | 有状态(Task 生命周期) |
| 多轮交互 | 不支持 | 支持(/tasks/{id}/messages) |
| 流式输出 | 有限支持 | 原生 SSE 支持 |
| 能力声明 | Tool/Resource/Prompt | Agent Card + Skill |
| 认证 | 协议层无规定 | 内置 Bearer/OAuth2 |
| 互补性 | 工具层 | Agent 委托层 |
本章小结
A2A 协议填补了多 Agent 协作的空白,与 MCP 共同构成完整的 Agent 能力调用体系:
- 设计背景:MCP 解决工具调用,A2A 解决 Agent 委托——两者定位不同,互为补充
- 核心概念:Task(工作单元)、Artifact(输出产物)、Message(通信消息)三位一体
- Agent Card:
/.well-known/agent.json是 Agent 能力的"名片",支持自动发现 - 异步为主:A2A 原生支持长时间任务、流式进度、Webhook 回调
- Hermes 支持:1.7.x 版本已完整支持 A2A Client 和 Server 两种角色
- 实战:两个 Hermes 实例并行协作,完成法律分析+翻译的复杂任务
思考题
- A2A 的 Agent Card 目前是静态声明的(
/.well-known/agent.json)。如果 Agent 的能力会随时间动态变化(例如根据负载动态关闭某些 Skill),Agent Card 应该如何设计? - 在 A2A 多跳委托(A→B→C→D)的场景中,如何追踪完整的调用链?如何实现分布式 Tracing?
- 如果 Agent B 在处理委托任务时需要反向询问 Agent A 一些问题(澄清需求),A2A 的多轮对话机制是否够用?有什么局限性?
- 设计一个"A2A 任务市场":Agent 可以发布需要其他 Agent 帮助的任务,其他 Agent 可以投标承接。这需要在 A2A 协议基础上增加哪些机制?