第 40 章

A2A 协议:Agent 间通信标准

第40章:A2A 协议:Agent 间通信标准

当单个 Agent 能力不足以完成复杂任务时,多个专业化 Agent 的协作就成为必然。但 Agent 间如何"对话"?如何委托任务?如何交接中间结果?这正是 A2A(Agent-to-Agent)协议要解决的问题。A2A 是 Google 主导、多家公司联合制定的 Agent 间通信标准,与 Anthropic 主导的 MCP 形成互补——MCP 解决"工具调用",A2A 解决"Agent 委托"。本章深入解析 A2A 协议,并通过两个 Hermes 实例协作的完整实战展示其应用。


40.1 A2A 协议背景与设计理念

为什么需要 A2A?

MCP 解决了什么问题:
    Agent ──MCP──→ 工具(数据库、文件、API)
    ✓ 工具是无状态的"函数调用"
    ✓ 工具执行时间短(毫秒到秒)
    ✓ 工具不需要"思考"

MCP 没有解决的问题:
    Agent A ──?──→ Agent B(另一个 AI Agent)
    ✗ Agent B 可能需要数分钟完成任务
    ✗ Agent B 有自己的推理过程(不是简单函数)
    ✗ Agent B 可能进一步委托给 Agent C
    ✗ Agent A 无法知道 Agent B 的能力边界

A2A 解决的核心问题:
    "如何让不同厂商、不同框架的 AI Agent 互相理解并委托任务"

Google 的设计理念

A2A 由 Google 在 2025 年初提出,核心设计原则:

原则 含义
能力优先 Agent 通过 Agent Card 声明自己能做什么
异步为主 长时间任务通过流式/轮询获取进度
协议中立 基于 HTTP+JSON,不依赖特定 AI 框架
可发现 Agent 自动发现其他 Agent 的能力
安全 内置认证,支持企业级访问控制

A2A 与 MCP 的互补关系

┌─────────────────────────────────────────────────────────┐
│                  Agent 能力调用层次                        │
├─────────────────────────────────────────────────────────┤
│                                                         │
│    用户请求                                              │
│       ↓                                                │
│    Hermes Agent(编排者)                               │
│       │                                                │
│       ├── MCP ──→ 工具层                                │
│       │          (数据库、搜索、文件系统)               │
│       │          特点:同步、无状态、快速                  │
│       │                                                │
│       └── A2A ──→ Agent 层                              │
│                  (专家 Agent、子 Agent)                │
│                  特点:异步、有状态、可长时间运行          │
│                                                         │
└─────────────────────────────────────────────────────────┘

MCP(工具调用):
  我需要查询数据库 → query_db() → 返回结果
  类比:调用函数

A2A(Agent 委托):
  我需要写一份法律合规分析 → 委托给合规 Agent → 
  等待(可能需要5分钟)→ 获取完整分析报告
  类比:外包给专家

40.2 核心概念:Task、Artifact、Message

Task(任务)

Task 是 A2A 中最基本的工作单元,代表一项需要由目标 Agent 完成的工作。

{
  "id": "task-8f3a92b1-4c2d-4e5f-a123-456789abcdef",
  "status": {
    "state": "working",      // submitted | working | completed | failed | canceled
    "timestamp": "2025-01-15T09:30:00Z",
    "progress": 0.45         // 可选:进度 0.0-1.0
  },
  "sessionId": "session-xxx",
  "metadata": {
    "priority": "high",
    "deadline": "2025-01-15T10:00:00Z"
  }
}

Task 状态机:

submitted → working → completed
              ↓           ↓
            failed     (返回 Artifacts)
              ↓
           canceled

Artifact(产物)

Artifact 是 Task 执行完成后产生的输出,可以是文本、文件、结构化数据等。

{
  "name": "compliance_report",
  "description": "合规分析报告",
  "parts": [
    {
      "type": "text",
      "text": "## 合规分析报告\n\n根据您提供的合同内容...",
      "metadata": {
        "language": "zh-CN",
        "word_count": 2847
      }
    },
    {
      "type": "file",
      "file": {
        "name": "compliance_report.pdf",
        "mimeType": "application/pdf",
        "bytes": "JVBERi0xLjQK..."    // Base64 编码
      }
    }
  ],
  "metadata": {
    "created_at": "2025-01-15T09:35:00Z",
    "confidence": 0.92
  }
}

Message(消息)

Message 是 Client 和 Agent 之间来回传递的通信单元,支持多媒体内容。

{
  "role": "user",    // user | agent
  "parts": [
    {
      "type": "text",
      "text": "请分析以下合同的合规风险"
    },
    {
      "type": "file",
      "file": {
        "name": "contract.pdf",
        "mimeType": "application/pdf",
        "uri": "https://storage.example.com/contracts/2025/001.pdf"
      }
    }
  ],
  "metadata": {
    "timestamp": "2025-01-15T09:28:00Z"
  }
}

40.3 Agent Card 规范

Agent Card 是 A2A 中最重要的发现机制——每个 Agent 在其固定地址 /.well-known/agent.json 发布自己的能力声明。

完整 Agent Card 示例

{
  "$schema": "https://a2aprotocol.ai/schema/agent-card.json",
  "name": "Legal Compliance Analyzer",
  "description": "专注于商业合同的法律合规性分析,覆盖中国、美国和欧盟法规。可识别潜在法律风险,提供修改建议。",
  "version": "2.1.0",
  "url": "https://legal-agent.example.com",
  
  "provider": {
    "organization": "LegalTech Corp",
    "url": "https://legaltech.example.com",
    "contact": "[email protected]"
  },
  
  "capabilities": {
    "streaming": true,           // 支持流式响应
    "pushNotifications": true,   // 支持 Webhook 推送
    "stateTransitionHistory": true  // 支持查询状态历史
  },
  
  "skills": [
    {
      "id": "contract-compliance",
      "name": "合同合规分析",
      "description": "分析商业合同的法律合规性,识别潜在风险条款",
      "tags": ["legal", "compliance", "contract", "risk-analysis"],
      "examples": [
        "分析这份采购合同是否符合中国《合同法》要求",
        "识别这份劳动合同中的不平等条款"
      ],
      "inputModes": ["text", "file"],
      "outputModes": ["text", "file"]
    },
    {
      "id": "gdpr-check",
      "name": "GDPR 合规检查",
      "description": "检查数据处理协议是否符合欧盟 GDPR 要求",
      "tags": ["gdpr", "data-privacy", "eu-compliance"],
      "inputModes": ["text"],
      "outputModes": ["text"]
    }
  ],
  
  "defaultInputModes": ["text", "file"],
  "defaultOutputModes": ["text", "file"],
  
  "authentication": {
    "schemes": ["bearer", "oauth2"],
    "oauth2": {
      "authorizationUrl": "https://auth.legaltech.example.com/oauth/authorize",
      "tokenUrl": "https://auth.legaltech.example.com/oauth/token",
      "scopes": ["a2a:tasks:create", "a2a:tasks:read"]
    }
  }
}

Agent Card 的发现机制

# hermes/a2a/discovery.py
import aiohttp
from typing import Dict, Optional
import json

class AgentDiscovery:
    """A2A Agent 发现服务"""
    
    WELL_KNOWN_PATH = "/.well-known/agent.json"
    
    def __init__(self):
        self._cache: Dict[str, Dict] = {}
    
    async def discover(self, agent_url: str) -> Optional[Dict]:
        """
        从指定 URL 发现 Agent 的能力
        
        Args:
            agent_url: Agent 的基础 URL(如 https://legal-agent.example.com)
        
        Returns:
            Agent Card 字典,或 None(如果无法发现)
        """
        if agent_url in self._cache:
            return self._cache[agent_url]
        
        card_url = agent_url.rstrip("/") + self.WELL_KNOWN_PATH
        
        async with aiohttp.ClientSession() as session:
            try:
                async with session.get(card_url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
                    if resp.status == 200:
                        card = await resp.json()
                        self._cache[agent_url] = card
                        return card
            except Exception as e:
                print(f"无法发现 Agent: {agent_url}: {e}")
        
        return None
    
    def can_handle(self, agent_card: Dict, task_description: str) -> float:
        """
        评估 Agent 处理特定任务的能力(0.0-1.0)
        
        简单实现:基于关键词匹配
        生产实现:使用向量相似度或 LLM 评估
        """
        description = task_description.lower()
        score = 0.0
        
        for skill in agent_card.get("skills", []):
            skill_tags = skill.get("tags", [])
            skill_desc = skill.get("description", "").lower()
            skill_name = skill.get("name", "").lower()
            
            # 标签匹配
            for tag in skill_tags:
                if tag.lower() in description:
                    score += 0.3
            
            # 描述关键词匹配
            words = description.split()
            for word in words:
                if word in skill_desc or word in skill_name:
                    score += 0.1
        
        return min(score, 1.0)

40.4 A2A 协议 API 规范

核心 API 端点

POST   /tasks                      创建新任务
GET    /tasks/{id}                 获取任务状态
PUT    /tasks/{id}/cancel          取消任务
GET    /tasks/{id}/artifacts       获取任务产物
POST   /tasks/{id}/messages        发送消息(多轮对话)
GET    /tasks/{id}/messages        获取消息历史

# 流式响应
GET    /tasks/{id}/stream          SSE 流式获取任务进度

# Agent 发现
GET    /.well-known/agent.json     获取 Agent Card

创建任务的完整请求/响应

// POST /tasks
{
  "message": {
    "role": "user",
    "parts": [
      {
        "type": "text",
        "text": "请分析以下合同的合规风险,重点关注数据保护和知识产权条款"
      },
      {
        "type": "file",
        "file": {
          "name": "service_agreement.pdf",
          "mimeType": "application/pdf",
          "bytes": "JVBERi0xLjQK..."
        }
      }
    ]
  },
  "sessionId": "session-12345",
  "metadata": {
    "priority": "high",
    "callback_url": "https://hermes.myapp.com/a2a/callbacks"
  }
}

// 响应 202 Accepted
{
  "id": "task-8f3a92b1-4c2d-4e5f-a123-456789abcdef",
  "status": {
    "state": "submitted",
    "timestamp": "2025-01-15T09:28:00Z"
  },
  "metadata": {
    "estimated_completion": "2025-01-15T09:33:00Z"
  }
}

40.5 Hermes 的 A2A 支持现状

Hermes 1.7.x A2A 支持矩阵

功能 支持状态 版本 备注
Agent Card 发布 完全支持 1.7.0 自动生成
发送 A2A 任务 完全支持 1.7.0 作为 Client
接收 A2A 任务 完全支持 1.7.0 作为 Server
流式任务进度 完全支持 1.7.1 SSE
Webhook 推送 完全支持 1.7.2 需配置回调 URL
多轮对话任务 实验性支持 1.7.3 Beta
跨组织认证 部分支持 1.7.x 仅 Bearer Token
任务优先级 计划中 1.8.0 -

Hermes 中配置 A2A

# hermes_config.yaml
a2a:
  # 作为 A2A Server(接受其他 Agent 委托)
  server:
    enabled: true
    url: "https://hermes.myapp.com"
    
    # 发布的技能
    skills:
      - id: "research-and-summarize"
        name: "研究与摘要"
        description: "对给定主题进行深度网络研究并生成摘要报告"
        tags: ["research", "summarization", "information-retrieval"]
    
    # 认证配置
    auth:
      required: true
      schemes: ["bearer"]
      trusted_issuers:
        - "https://auth.partner-company.com"
  
  # 作为 A2A Client(委托其他 Agent)
  client:
    known_agents:
      - name: "legal-agent"
        url: "https://legal-agent.partner.com"
        auth:
          type: "bearer"
          token_env: "LEGAL_AGENT_TOKEN"
      
      - name: "translation-agent"
        url: "https://translate.service.com"
        auth:
          type: "oauth2"
          client_id_env: "TRANSLATE_CLIENT_ID"
          client_secret_env: "TRANSLATE_CLIENT_SECRET"

40.6 实战:两个 Hermes 实例通过 A2A 协作

场景描述

用户请求:
"请帮我分析这份英文合同的合规风险,并翻译成中文"

Hermes 主实例(编排者)的任务分解:
1. 委托 Legal Agent(法律合规分析)
2. 委托 Translation Agent(翻译)
3. 合并两个结果,返回给用户

Hermes A2A Client 实现

# hermes/a2a/client.py
import asyncio
import aiohttp
import json
from typing import Any, Dict, Optional, AsyncGenerator
import logging

logger = logging.getLogger(__name__)

class A2AClient:
    """Hermes A2A 客户端"""
    
    def __init__(self, agent_url: str, auth_token: str):
        self.agent_url = agent_url.rstrip("/")
        self.auth_token = auth_token
    
    @property
    def headers(self) -> Dict:
        return {
            "Authorization": f"Bearer {self.auth_token}",
            "Content-Type": "application/json"
        }
    
    async def create_task(
        self,
        text: str,
        files: list = None,
        session_id: str = None,
        callback_url: str = None
    ) -> Dict:
        """创建 A2A 任务"""
        parts = [{"type": "text", "text": text}]
        
        if files:
            for f in files:
                parts.append({"type": "file", "file": f})
        
        payload = {
            "message": {"role": "user", "parts": parts}
        }
        if session_id:
            payload["sessionId"] = session_id
        if callback_url:
            payload["metadata"] = {"callback_url": callback_url}
        
        async with aiohttp.ClientSession(headers=self.headers) as session:
            async with session.post(
                f"{self.agent_url}/tasks",
                json=payload
            ) as resp:
                resp.raise_for_status()
                return await resp.json()
    
    async def wait_for_completion(
        self,
        task_id: str,
        poll_interval: float = 2.0,
        timeout: float = 300.0
    ) -> Dict:
        """
        轮询等待任务完成
        
        对于支持流式的 Agent,建议使用 stream_task_progress()
        """
        import time
        start = time.time()
        
        while time.time() - start < timeout:
            task = await self.get_task(task_id)
            state = task["status"]["state"]
            
            if state == "completed":
                # 获取产物
                artifacts = await self.get_artifacts(task_id)
                task["artifacts"] = artifacts
                return task
            
            elif state == "failed":
                raise A2ATaskError(
                    f"任务失败: {task_id}",
                    task.get("status", {}).get("message", "Unknown error")
                )
            
            elif state == "canceled":
                raise A2ATaskError(f"任务被取消: {task_id}")
            
            # 还在执行中,等待后重试
            progress = task["status"].get("progress", 0)
            logger.info(f"任务 {task_id[:8]}... 进度: {progress*100:.0f}%")
            await asyncio.sleep(poll_interval)
        
        raise TimeoutError(f"任务 {task_id} 超时(>{timeout}s)")
    
    async def stream_task_progress(
        self,
        task_id: str
    ) -> AsyncGenerator[Dict, None]:
        """通过 SSE 流式获取任务进度"""
        async with aiohttp.ClientSession(headers=self.headers) as session:
            async with session.get(f"{self.agent_url}/tasks/{task_id}/stream") as resp:
                resp.raise_for_status()
                async for line in resp.content:
                    line = line.decode().strip()
                    if line.startswith("data: "):
                        data = json.loads(line[6:])
                        yield data
                        if data.get("type") in ("completed", "failed"):
                            break
    
    async def get_task(self, task_id: str) -> Dict:
        async with aiohttp.ClientSession(headers=self.headers) as session:
            async with session.get(f"{self.agent_url}/tasks/{task_id}") as resp:
                resp.raise_for_status()
                return await resp.json()
    
    async def get_artifacts(self, task_id: str) -> list:
        async with aiohttp.ClientSession(headers=self.headers) as session:
            async with session.get(f"{self.agent_url}/tasks/{task_id}/artifacts") as resp:
                resp.raise_for_status()
                return await resp.json()
    
    async def cancel_task(self, task_id: str):
        async with aiohttp.ClientSession(headers=self.headers) as session:
            async with session.put(f"{self.agent_url}/tasks/{task_id}/cancel") as resp:
                resp.raise_for_status()


class A2ATaskError(Exception):
    def __init__(self, message: str, detail: str = None):
        super().__init__(message)
        self.detail = detail

完整的协作编排

# orchestrator_demo.py
import asyncio
import os
from hermes.a2a.client import A2AClient

async def analyze_contract_with_collaboration(
    contract_text: str,
    contract_file_bytes: bytes = None
) -> Dict:
    """
    使用两个专业 Agent 协作完成合同分析+翻译任务
    """
    
    # 创建两个 Agent 的客户端
    legal_client = A2AClient(
        agent_url="https://legal-agent.partner.com",
        auth_token=os.environ["LEGAL_AGENT_TOKEN"]
    )
    
    translation_client = A2AClient(
        agent_url="https://hermes-translation.myapp.com",  # 另一个 Hermes 实例
        auth_token=os.environ["TRANSLATION_AGENT_TOKEN"]
    )
    
    # 准备文件(如果有)
    files = []
    if contract_file_bytes:
        files = [{
            "name": "contract.pdf",
            "mimeType": "application/pdf",
            "bytes": __import__("base64").b64encode(contract_file_bytes).decode()
        }]
    
    # 步骤 1:同时发起两个任务(法律分析和翻译可以并行开始)
    print("发起并行任务:法律分析 + 翻译...")
    
    legal_task, translation_task = await asyncio.gather(
        legal_client.create_task(
            text=f"请分析以下合同的合规风险,重点关注数据保护和知识产权条款:\n\n{contract_text}",
            files=files
        ),
        translation_client.create_task(
            text=f"请将以下英文合同翻译成中文(专业法律翻译):\n\n{contract_text}"
        )
    )
    
    legal_task_id = legal_task["id"]
    translation_task_id = translation_task["id"]
    
    print(f"法律分析任务 ID: {legal_task_id[:8]}...")
    print(f"翻译任务 ID: {translation_task_id[:8]}...")
    
    # 步骤 2:并行等待两个任务完成(使用流式进度)
    async def wait_with_streaming(client: A2AClient, task_id: str, name: str):
        async for event in client.stream_task_progress(task_id):
            state = event.get("status", {}).get("state", "")
            progress = event.get("status", {}).get("progress", 0)
            print(f"[{name}] 进度: {progress*100:.0f}% ({state})")
            
            if state == "completed":
                return await client.get_task(task_id)
            elif state == "failed":
                raise Exception(f"{name} 任务失败")
    
    legal_result, translation_result = await asyncio.gather(
        wait_with_streaming(legal_client, legal_task_id, "法律分析"),
        wait_with_streaming(translation_client, translation_task_id, "翻译")
    )
    
    # 步骤 3:提取并合并结果
    def extract_text_from_artifacts(task_result: Dict) -> str:
        for artifact in task_result.get("artifacts", []):
            for part in artifact.get("parts", []):
                if part["type"] == "text":
                    return part["text"]
        return ""
    
    legal_analysis = extract_text_from_artifacts(legal_result)
    chinese_translation = extract_text_from_artifacts(translation_result)
    
    # 步骤 4:组合最终报告
    final_report = f"""
# 合同分析综合报告

## 中文译文

{chinese_translation}

---

## 合规风险分析

{legal_analysis}

---
*本报告由 Legal Agent 和 Translation Agent 协作生成*
*生成时间:{__import__("datetime").datetime.now().isoformat()}*
"""
    
    return {
        "report": final_report,
        "legal_task_id": legal_task_id,
        "translation_task_id": translation_task_id
    }


# Hermes Skill 封装
class ContractAnalysisSkill:
    """将 A2A 协作封装为 Hermes Skill"""
    
    async def execute(self, inputs: Dict) -> Dict:
        contract_text = inputs.get("contract_text", "")
        
        try:
            result = await analyze_contract_with_collaboration(
                contract_text=contract_text
            )
            return {
                "success": True,
                "report": result["report"],
                "metadata": {
                    "legal_task_id": result["legal_task_id"],
                    "translation_task_id": result["translation_task_id"]
                }
            }
        except Exception as e:
            return {
                "success": False,
                "error": str(e),
                "report": None
            }


# 主函数测试
if __name__ == "__main__":
    import asyncio
    
    sample_contract = """
    SERVICE AGREEMENT
    
    This Service Agreement ("Agreement") is entered into as of January 1, 2025,
    by and between TechCorp Inc. ("Service Provider") and ClientCo Ltd. ("Client").
    
    1. SERVICES: Service Provider agrees to provide software development services...
    2. DATA PROTECTION: All client data shall be processed in accordance with GDPR...
    3. INTELLECTUAL PROPERTY: All work product created hereunder shall be...
    """
    
    result = asyncio.run(analyze_contract_with_collaboration(sample_contract))
    print(result["report"])

40.7 A2A vs MCP 深度对比

维度 MCP A2A
主导方 Anthropic Google(多方联合)
设计目标 Agent 调用外部工具 Agent 委托其他 Agent
执行时间 秒级(同步为主) 分钟到小时(异步为主)
状态管理 无状态(单次调用) 有状态(Task 生命周期)
多轮交互 不支持 支持(/tasks/{id}/messages)
流式输出 有限支持 原生 SSE 支持
能力声明 Tool/Resource/Prompt Agent Card + Skill
认证 协议层无规定 内置 Bearer/OAuth2
互补性 工具层 Agent 委托层

本章小结

A2A 协议填补了多 Agent 协作的空白,与 MCP 共同构成完整的 Agent 能力调用体系:

  1. 设计背景:MCP 解决工具调用,A2A 解决 Agent 委托——两者定位不同,互为补充
  2. 核心概念:Task(工作单元)、Artifact(输出产物)、Message(通信消息)三位一体
  3. Agent Card/.well-known/agent.json 是 Agent 能力的"名片",支持自动发现
  4. 异步为主:A2A 原生支持长时间任务、流式进度、Webhook 回调
  5. Hermes 支持:1.7.x 版本已完整支持 A2A Client 和 Server 两种角色
  6. 实战:两个 Hermes 实例并行协作,完成法律分析+翻译的复杂任务

思考题

  1. A2A 的 Agent Card 目前是静态声明的(/.well-known/agent.json)。如果 Agent 的能力会随时间动态变化(例如根据负载动态关闭某些 Skill),Agent Card 应该如何设计?
  2. 在 A2A 多跳委托(A→B→C→D)的场景中,如何追踪完整的调用链?如何实现分布式 Tracing?
  3. 如果 Agent B 在处理委托任务时需要反向询问 Agent A 一些问题(澄清需求),A2A 的多轮对话机制是否够用?有什么局限性?
  4. 设计一个"A2A 任务市场":Agent 可以发布需要其他 Agent 帮助的任务,其他 Agent 可以投标承接。这需要在 A2A 协议基础上增加哪些机制?
本章评分
4.5  / 5  (3 评分)

💬 留言讨论