第 50 章

构建 AI Agent:Tool Use 与多轮对话

构建 AI Agent:Tool Use 与多轮对话

L1:概念层——AI Agent 是什么,为什么 Tool Use 是关键

从自动化脚本到自主 Agent

在软件工程的历史上,自动化一直是提升效率的核心手段。但传统自动化有一个根本局限:它只能执行预先编写好的固定逻辑。当外部环境发生变化,或者需要处理未曾预料的情况时,传统脚本会失败或产生错误结果。

AI Agent 代表了一种范式转变。与固定脚本不同,Agent 能够:

  1. 感知(Perception):接收来自外部世界的输入——用户消息、文件内容、API 返回值、传感器数据
  2. 推理(Reasoning):基于感知到的信息,结合内部知识,制定行动计划
  3. 行动(Action):执行工具调用、写入文件、发送请求、与外部系统交互
  4. 反馈(Feedback):观察行动结果,将结果纳入下一轮推理

这个"感知→推理→行动→反馈"的循环,就是 Agent 的本质。与人类工程师处理复杂任务的方式惊人地相似:你拿到一个需求,思考怎么做,动手实现,看看结果,再调整。

为什么需要 Tool Use

大型语言模型(LLM)本身是一个强大的推理引擎,但它有几个内在局限:

知识截止日期:Claude 的训练数据有截止日期,它不知道今天发生了什么新闻,也无法告诉你最新的股票价格。

无法执行副作用:LLM 只能生成文本。它无法真正发送一封邮件,也无法读取你硬盘上的文件,更无法调用你公司内部的 API。

计算能力有限:对于精确的数学计算,比如计算 1234567 × 9876543,LLM 可能会犯错。它擅长推理,而不是精确计算。

Tool Use(工具调用) 正是解决这些问题的技术。通过工具调用,LLM 可以:

工具调用将 LLM 从一个"只会说话的顾问"变成了一个"能够动手实干的工程师"。这个改变是质变,而非量变。

Agent 的应用场景

理解了 Agent 的本质,我们来看一些典型的应用场景:

研究助手:给定一个研究课题,Agent 自动搜索相关论文、阅读关键文章、整合信息、生成结构化报告。这个过程可能需要数十次工具调用。

代码助手:理解代码库结构,定位 bug,编写修复代码,运行测试,直到测试通过。每一步都需要 Agent 自主决策。

数据分析 Agent:接受自然语言描述的数据分析需求,自动编写 SQL 查询、调用数据库、可视化结果、撰写分析报告。

客服 Agent:理解客户问题,查询订单系统,处理退款请求,发送确认邮件——整个流程无需人工介入。


L2:原理层——Anthropic Tool Use 协议与对话管理

Tool Use 协议详解

Anthropic 的 Tool Use 协议是一套精心设计的 JSON 规范,定义了 LLM 与外部工具之间的交互方式。理解这个协议的每一个细节,是构建健壮 Agent 的基础。

工具定义(Tool Definition)

每个工具由三个核心字段定义:

{
  "name": "search_web",
  "description": "Search the web for current information. Use this when you need up-to-date information that may not be in your training data. Returns a list of search results with titles, URLs, and snippets.",
  "input_schema": {
    "type": "object",
    "properties": {
      "query": {
        "type": "string",
        "description": "The search query to execute"
      },
      "num_results": {
        "type": "integer",
        "description": "Number of results to return (default: 5, max: 20)",
        "default": 5
      }
    },
    "required": ["query"]
  }
}

name 字段是工具的唯一标识符。命名应该清晰、描述性强,且对 LLM 友好。避免使用缩写或晦涩的名称。

description 字段至关重要,它直接影响 LLM 何时以及如何使用这个工具。描述应该:

input_schema 是标准的 JSON Schema,定义工具接受的参数。字段的 description 同样重要——LLM 依赖这些描述来正确填写参数。

Tool Use 内容块(tool_use Content Block)

当 Claude 决定调用某个工具时,它会在响应中返回一个 tool_use 类型的内容块:

{
  "type": "tool_use",
  "id": "toolu_01A09q90qw90lq917835lq9",
  "name": "search_web",
  "input": {
    "query": "Go programming language generics tutorial 2024",
    "num_results": 5
  }
}

id 字段是这次工具调用的唯一标识符,在返回工具结果时需要用到它。一次响应中可能包含多个 tool_use 块,代表并行工具调用——这是一个重要的性能优化点。

Tool Result 响应

执行完工具后,你需要将结果以特定格式返回给 Claude:

{
  "role": "user",
  "content": [
    {
      "type": "tool_result",
      "tool_use_id": "toolu_01A09q90qw90lq917835lq9",
      "content": "Search results:\n1. Go Generics Tutorial - go.dev/doc/tutorial/generics\n..."
    }
  ]
}

注意:tool_result 是作为 user 角色发送的,因为它是从"外部世界"返回给 Claude 的信息。

错误处理

工具执行失败时,应该返回带有 is_error: true 的结果:

{
  "type": "tool_result",
  "tool_use_id": "toolu_01A09q90qw90lq917835lq9",
  "is_error": true,
  "content": "Error: Network timeout after 30 seconds"
}

Claude 会根据错误信息决定如何继续——可能重试,可能尝试其他方法,可能向用户报告错误。

多轮对话管理

Agent 的核心是维护对话历史(conversation history)。每一轮交互都需要将完整的历史发送给 Claude,这样它才能理解当前的上下文。

对话历史的结构:

[系统提示]
  |
[用户消息 1]
[Claude 响应 1](可能包含 tool_use)
[工具结果 1](作为 user 消息)
[Claude 响应 2](可能包含更多 tool_use 或最终答案)
[工具结果 2]
...
[Claude 最终响应]

这个结构有几个重要的约束:

  1. 消息必须严格交替:user → assistant → user → assistant...
  2. 如果 assistant 消息包含 tool_use,下一条 user 消息必须包含对应的 tool_result
  3. 可以在一条 user 消息中包含多个 tool_result(对应并行调用)

Agent 主循环设计

Agent 的主循环是整个系统的核心控制流:

初始化(设置系统提示、工具列表)
↓
接收用户输入
↓
[主循环开始]
发送请求到 Claude API
↓
接收响应
↓
如果响应包含 tool_use:
  执行所有工具(可并行)
  将结果添加到历史
  继续主循环
↓
如果响应是 end_turn(最终答案):
  返回答案给用户
  等待下一个用户输入
[主循环结束]

循环终止条件:


L3:代码实践——构建完整 Go Agent

项目结构

agent/
├── main.go          # 程序入口
├── agent.go         # Agent 核心逻辑
├── tools.go         # 工具定义与实现
├── client.go        # Anthropic API 客户端
└── history.go       # 对话历史管理

客户端封装

// client.go
package agent

import (
    "bytes"
    "context"
    "encoding/json"
    "fmt"
    "net/http"
    "os"
)

const (
    AnthropicAPIURL = "https://api.anthropic.com/v1/messages"
    DefaultModel    = "claude-opus-4-5"
    MaxTokens       = 4096
)

type Client struct {
    apiKey     string
    httpClient *http.Client
    model      string
}

func NewClient() *Client {
    return &Client{
        apiKey:     os.Getenv("ANTHROPIC_API_KEY"),
        httpClient: &http.Client{},
        model:      DefaultModel,
    }
}

// MessageRequest 代表发送给 Claude 的完整请求
type MessageRequest struct {
    Model     string      `json:"model"`
    MaxTokens int         `json:"max_tokens"`
    System    string      `json:"system,omitempty"`
    Messages  []Message   `json:"messages"`
    Tools     []ToolDef   `json:"tools,omitempty"`
}

// Message 代表对话中的一条消息
type Message struct {
    Role    string        `json:"role"`
    Content []ContentBlock `json:"content"`
}

// ContentBlock 可以是文本、工具调用或工具结果
type ContentBlock struct {
    Type       string          `json:"type"`
    Text       string          `json:"text,omitempty"`
    ID         string          `json:"id,omitempty"`
    Name       string          `json:"name,omitempty"`
    Input      json.RawMessage `json:"input,omitempty"`
    ToolUseID  string          `json:"tool_use_id,omitempty"`
    Content    string          `json:"content,omitempty"`
    IsError    bool            `json:"is_error,omitempty"`
}

// MessageResponse 是 Claude API 的响应
type MessageResponse struct {
    ID           string         `json:"id"`
    Type         string         `json:"type"`
    Role         string         `json:"role"`
    Content      []ContentBlock `json:"content"`
    Model        string         `json:"model"`
    StopReason   string         `json:"stop_reason"`
    Usage        Usage          `json:"usage"`
}

type Usage struct {
    InputTokens  int `json:"input_tokens"`
    OutputTokens int `json:"output_tokens"`
}

// ToolDef 定义一个工具
type ToolDef struct {
    Name        string          `json:"name"`
    Description string          `json:"description"`
    InputSchema json.RawMessage `json:"input_schema"`
}

func (c *Client) SendMessage(ctx context.Context, req MessageRequest) (*MessageResponse, error) {
    body, err := json.Marshal(req)
    if err != nil {
        return nil, fmt.Errorf("marshal request: %w", err)
    }

    httpReq, err := http.NewRequestWithContext(ctx, "POST", AnthropicAPIURL, bytes.NewReader(body))
    if err != nil {
        return nil, fmt.Errorf("create request: %w", err)
    }

    httpReq.Header.Set("Content-Type", "application/json")
    httpReq.Header.Set("x-api-key", c.apiKey)
    httpReq.Header.Set("anthropic-version", "2023-06-01")

    resp, err := c.httpClient.Do(httpReq)
    if err != nil {
        return nil, fmt.Errorf("send request: %w", err)
    }
    defer resp.Body.Close()

    if resp.StatusCode != http.StatusOK {
        var errResp struct {
            Error struct {
                Message string `json:"message"`
            } `json:"error"`
        }
        json.NewDecoder(resp.Body).Decode(&errResp)
        return nil, fmt.Errorf("API error %d: %s", resp.StatusCode, errResp.Error.Message)
    }

    var msgResp MessageResponse
    if err := json.NewDecoder(resp.Body).Decode(&msgResp); err != nil {
        return nil, fmt.Errorf("decode response: %w", err)
    }

    return &msgResp, nil
}

工具定义与实现

// tools.go
package agent

import (
    "context"
    "encoding/json"
    "fmt"
    "io"
    "net/http"
    "net/url"
    "os"
    "strings"
    "time"
)

// ToolHandler 是工具执行函数的类型
type ToolHandler func(ctx context.Context, input json.RawMessage) (string, error)

// ToolRegistry 管理所有工具
type ToolRegistry struct {
    defs     []ToolDef
    handlers map[string]ToolHandler
}

func NewToolRegistry() *ToolRegistry {
    return &ToolRegistry{
        handlers: make(map[string]ToolHandler),
    }
}

func (r *ToolRegistry) Register(def ToolDef, handler ToolHandler) {
    r.defs = append(r.defs, def)
    r.handlers[def.Name] = handler
}

func (r *ToolRegistry) Execute(ctx context.Context, name string, input json.RawMessage) (string, error) {
    handler, ok := r.handlers[name]
    if !ok {
        return "", fmt.Errorf("unknown tool: %s", name)
    }
    return handler(ctx, input)
}

func (r *ToolRegistry) Definitions() []ToolDef {
    return r.defs
}

// --- 工具实现 ---

// SearchWebInput 是 search_web 工具的输入
type SearchWebInput struct {
    Query      string `json:"query"`
    NumResults int    `json:"num_results"`
}

// SearchWebHandler 实现网络搜索(使用 DuckDuckGo Instant Answer API)
func SearchWebHandler(ctx context.Context, input json.RawMessage) (string, error) {
    var req SearchWebInput
    if err := json.Unmarshal(input, &req); err != nil {
        return "", fmt.Errorf("invalid input: %w", err)
    }
    if req.NumResults == 0 {
        req.NumResults = 5
    }

    // 使用 DuckDuckGo Instant Answer API(免费,无需 API key)
    apiURL := fmt.Sprintf("https://api.duckduckgo.com/?q=%s&format=json&no_html=1&skip_disambig=1",
        url.QueryEscape(req.Query))

    httpReq, err := http.NewRequestWithContext(ctx, "GET", apiURL, nil)
    if err != nil {
        return "", err
    }
    httpReq.Header.Set("User-Agent", "GoAgent/1.0")

    client := &http.Client{Timeout: 10 * time.Second}
    resp, err := client.Do(httpReq)
    if err != nil {
        return "", fmt.Errorf("search request failed: %w", err)
    }
    defer resp.Body.Close()

    var result struct {
        Abstract       string `json:"Abstract"`
        AbstractSource string `json:"AbstractSource"`
        AbstractURL    string `json:"AbstractURL"`
        RelatedTopics  []struct {
            Text     string `json:"Text"`
            FirstURL string `json:"FirstURL"`
        } `json:"RelatedTopics"`
    }

    if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
        return "", fmt.Errorf("decode response: %w", err)
    }

    var sb strings.Builder
    sb.WriteString(fmt.Sprintf("Search results for: %s\n\n", req.Query))

    if result.Abstract != "" {
        sb.WriteString(fmt.Sprintf("Summary: %s\nSource: %s\nURL: %s\n\n",
            result.Abstract, result.AbstractSource, result.AbstractURL))
    }

    count := 0
    for _, topic := range result.RelatedTopics {
        if count >= req.NumResults {
            break
        }
        if topic.Text != "" {
            sb.WriteString(fmt.Sprintf("%d. %s\n   URL: %s\n\n", count+1, topic.Text, topic.FirstURL))
            count++
        }
    }

    if sb.Len() == 0 {
        return fmt.Sprintf("No results found for query: %s", req.Query), nil
    }

    return sb.String(), nil
}

// ReadFileInput 是 read_file 工具的输入
type ReadFileInput struct {
    Path      string `json:"path"`
    MaxLines  int    `json:"max_lines"`
}

// ReadFileHandler 读取本地文件
func ReadFileHandler(ctx context.Context, input json.RawMessage) (string, error) {
    var req ReadFileInput
    if err := json.Unmarshal(input, &req); err != nil {
        return "", fmt.Errorf("invalid input: %w", err)
    }
    if req.MaxLines == 0 {
        req.MaxLines = 500
    }

    // 安全检查:防止路径遍历
    if strings.Contains(req.Path, "..") {
        return "", fmt.Errorf("path traversal not allowed")
    }

    f, err := os.Open(req.Path)
    if err != nil {
        return "", fmt.Errorf("open file: %w", err)
    }
    defer f.Close()

    content, err := io.ReadAll(io.LimitReader(f, 1<<20)) // 最多读 1MB
    if err != nil {
        return "", fmt.Errorf("read file: %w", err)
    }

    lines := strings.Split(string(content), "\n")
    if len(lines) > req.MaxLines {
        lines = lines[:req.MaxLines]
        lines = append(lines, fmt.Sprintf("\n... (truncated, showing first %d lines)", req.MaxLines))
    }

    return strings.Join(lines, "\n"), nil
}

// ExecuteCodeInput 是 execute_code 工具的输入
type ExecuteCodeInput struct {
    Language string `json:"language"`
    Code     string `json:"code"`
}

// ExecuteCodeHandler 在沙盒中执行代码(此处为简化示例)
func ExecuteCodeHandler(ctx context.Context, input json.RawMessage) (string, error) {
    var req ExecuteCodeInput
    if err := json.Unmarshal(input, &req); err != nil {
        return "", fmt.Errorf("invalid input: %w", err)
    }

    // 实际生产环境应使用 Docker 沙盒或 gVisor
    // 这里返回模拟结果用于演示
    return fmt.Sprintf("[Code execution sandbox]\nLanguage: %s\nCode length: %d chars\n"+
        "Note: In production, this would execute in an isolated container.",
        req.Language, len(req.Code)), nil
}

// SendEmailInput 是 send_email 工具的输入
type SendEmailInput struct {
    To      string `json:"to"`
    Subject string `json:"subject"`
    Body    string `json:"body"`
}

// SendEmailHandler 发送邮件
func SendEmailHandler(ctx context.Context, input json.RawMessage) (string, error) {
    var req SendEmailInput
    if err := json.Unmarshal(input, &req); err != nil {
        return "", fmt.Errorf("invalid input: %w", err)
    }

    // 在实际应用中,这里会调用 SMTP 或邮件服务 API
    return fmt.Sprintf("Email sent successfully to %s with subject: %s", req.To, req.Subject), nil
}

// BuildDefaultRegistry 创建包含默认工具的注册表
func BuildDefaultRegistry() *ToolRegistry {
    registry := NewToolRegistry()

    registry.Register(ToolDef{
        Name:        "search_web",
        Description: "Search the web for current information. Use when you need up-to-date facts, news, or technical documentation not in your training data.",
        InputSchema: json.RawMessage(`{
            "type": "object",
            "properties": {
                "query": {"type": "string", "description": "The search query"},
                "num_results": {"type": "integer", "description": "Number of results (default: 5, max: 20)"}
            },
            "required": ["query"]
        }`),
    }, SearchWebHandler)

    registry.Register(ToolDef{
        Name:        "read_file",
        Description: "Read the contents of a local file. Use to access code files, configuration, or documents on disk.",
        InputSchema: json.RawMessage(`{
            "type": "object",
            "properties": {
                "path": {"type": "string", "description": "Absolute path to the file"},
                "max_lines": {"type": "integer", "description": "Maximum number of lines to read (default: 500)"}
            },
            "required": ["path"]
        }`),
    }, ReadFileHandler)

    registry.Register(ToolDef{
        Name:        "execute_code",
        Description: "Execute code in a sandboxed environment and return the output. Supports Python, Go, JavaScript.",
        InputSchema: json.RawMessage(`{
            "type": "object",
            "properties": {
                "language": {"type": "string", "enum": ["python", "go", "javascript"]},
                "code": {"type": "string", "description": "The code to execute"}
            },
            "required": ["language", "code"]
        }`),
    }, ExecuteCodeHandler)

    registry.Register(ToolDef{
        Name:        "send_email",
        Description: "Send an email to a recipient. Use when the user asks to send an email or when a report needs to be delivered.",
        InputSchema: json.RawMessage(`{
            "type": "object",
            "properties": {
                "to": {"type": "string", "description": "Recipient email address"},
                "subject": {"type": "string", "description": "Email subject"},
                "body": {"type": "string", "description": "Email body (supports markdown)"}
            },
            "required": ["to", "subject", "body"]
        }`),
    }, SendEmailHandler)

    return registry
}

Agent 核心逻辑

// agent.go
package agent

import (
    "context"
    "encoding/json"
    "fmt"
    "sync"
)

const (
    MaxIterations = 20
    SystemPrompt  = `You are a helpful research assistant with access to tools for searching the web, reading files, executing code, and sending emails.

When given a task:
1. Break it down into steps
2. Use tools to gather information and take actions
3. Synthesize results into a clear, well-structured response
4. If asked to write a report, use markdown formatting

Always explain what you're doing and why before using a tool.`
)

// Agent 封装了完整的 Agent 逻辑
type Agent struct {
    client   *Client
    registry *ToolRegistry
    history  []Message
    mu       sync.Mutex

    // 统计信息
    totalInputTokens  int
    totalOutputTokens int
    iterationCount    int
}

func NewAgent(client *Client, registry *ToolRegistry) *Agent {
    return &Agent{
        client:   client,
        registry: registry,
    }
}

// Run 处理一次用户请求,返回最终响应
func (a *Agent) Run(ctx context.Context, userMessage string) (string, error) {
    a.mu.Lock()
    defer a.mu.Unlock()

    // 将用户消息加入历史
    a.history = append(a.history, Message{
        Role: "user",
        Content: []ContentBlock{
            {Type: "text", Text: userMessage},
        },
    })

    // 主循环
    for iteration := 0; iteration < MaxIterations; iteration++ {
        a.iterationCount++

        // 发送请求
        req := MessageRequest{
            Model:     DefaultModel,
            MaxTokens: MaxTokens,
            System:    SystemPrompt,
            Messages:  a.history,
            Tools:     a.registry.Definitions(),
        }

        resp, err := a.client.SendMessage(ctx, req)
        if err != nil {
            return "", fmt.Errorf("iteration %d: %w", iteration, err)
        }

        // 更新 token 统计
        a.totalInputTokens += resp.Usage.InputTokens
        a.totalOutputTokens += resp.Usage.OutputTokens

        // 将 Claude 的响应加入历史
        a.history = append(a.history, Message{
            Role:    "assistant",
            Content: resp.Content,
        })

        // 如果没有工具调用,返回最终答案
        if resp.StopReason == "end_turn" || !hasToolUse(resp.Content) {
            return extractText(resp.Content), nil
        }

        // 执行所有工具调用(并行)
        toolResults, err := a.executeTools(ctx, resp.Content)
        if err != nil {
            return "", fmt.Errorf("execute tools: %w", err)
        }

        // 将工具结果作为 user 消息加入历史
        a.history = append(a.history, Message{
            Role:    "user",
            Content: toolResults,
        })
    }

    return "", fmt.Errorf("exceeded maximum iterations (%d)", MaxIterations)
}

// executeTools 并行执行所有工具调用
func (a *Agent) executeTools(ctx context.Context, blocks []ContentBlock) ([]ContentBlock, error) {
    var toolUses []ContentBlock
    for _, block := range blocks {
        if block.Type == "tool_use" {
            toolUses = append(toolUses, block)
        }
    }

    results := make([]ContentBlock, len(toolUses))
    var wg sync.WaitGroup
    var mu sync.Mutex
    var firstErr error

    for i, toolUse := range toolUses {
        wg.Add(1)
        go func(idx int, tu ContentBlock) {
            defer wg.Done()

            output, err := a.registry.Execute(ctx, tu.Name, tu.Input)

            result := ContentBlock{
                Type:      "tool_result",
                ToolUseID: tu.ID,
            }

            if err != nil {
                result.IsError = true
                result.Content = fmt.Sprintf("Error: %s", err.Error())
                mu.Lock()
                if firstErr == nil {
                    firstErr = err
                }
                mu.Unlock()
            } else {
                result.Content = output
            }

            results[idx] = result
        }(i, toolUse)
    }

    wg.Wait()
    return results, nil
}

// Stats 返回 Agent 的统计信息
func (a *Agent) Stats() map[string]int {
    return map[string]int{
        "total_input_tokens":  a.totalInputTokens,
        "total_output_tokens": a.totalOutputTokens,
        "iterations":          a.iterationCount,
        "history_length":      len(a.history),
    }
}

// Reset 清除对话历史(开始新对话)
func (a *Agent) Reset() {
    a.mu.Lock()
    defer a.mu.Unlock()
    a.history = nil
    a.totalInputTokens = 0
    a.totalOutputTokens = 0
    a.iterationCount = 0
}

func hasToolUse(blocks []ContentBlock) bool {
    for _, b := range blocks {
        if b.Type == "tool_use" {
            return true
        }
    }
    return false
}

func extractText(blocks []ContentBlock) string {
    var parts []string
    for _, b := range blocks {
        if b.Type == "text" && b.Text != "" {
            parts = append(parts, b.Text)
        }
    }
    return joinStrings(parts, "\n")
}

func joinStrings(parts []string, sep string) string {
    result := ""
    for i, p := range parts {
        if i > 0 {
            result += sep
        }
        result += p
    }
    return result
}

主程序:研究 Agent 示例

// main.go
package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "github.com/yourorg/agent"
)

func main() {
    client := agent.NewClient()
    registry := agent.BuildDefaultRegistry()
    a := agent.NewAgent(client, registry)

    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
    defer cancel()

    // 研究任务:搜索信息并生成报告
    task := `Please research the current state of Go generics (as of 2024) and write a comprehensive report covering:
1. Key features and syntax
2. Common use cases and patterns
3. Performance implications
4. Community adoption status

Search the web for recent information, then compile a structured markdown report.
Finally, send the report to [email protected] with subject "Go Generics Report 2024".`

    fmt.Println("Starting research agent...")
    fmt.Println("Task:", task)
    fmt.Println(strings.Repeat("-", 60))

    result, err := a.Run(ctx, task)
    if err != nil {
        log.Fatalf("Agent error: %v", err)
    }

    fmt.Println("\nFinal Result:")
    fmt.Println(result)

    stats := a.Stats()
    fmt.Printf("\nStats: %d iterations, %d input tokens, %d output tokens\n",
        stats["iterations"], stats["total_input_tokens"], stats["total_output_tokens"])
}

L4:进阶——多 Agent 编排、记忆与评估框架

多 Agent 编排(Subagent Delegation)

单个 Agent 的能力是有限的。对于复杂任务,我们可以将工作分解给多个专门化的子 Agent:

// orchestrator.go - 多 Agent 编排器

type OrchestratorAgent struct {
    *Agent
    subagents map[string]*Agent
}

// DelegateToSubagent 将任务委派给子 Agent
func (o *OrchestratorAgent) DelegateToSubagent(ctx context.Context, name, task string) (string, error) {
    subagent, ok := o.subagents[name]
    if !ok {
        return "", fmt.Errorf("subagent %s not found", name)
    }
    // 子 Agent 有自己独立的对话历史和工具集
    subagent.Reset()
    return subagent.Run(ctx, task)
}

// 编排器可以定义一个 delegate_task 工具
func buildDelegateTool(orchestrator *OrchestratorAgent) ToolHandler {
    return func(ctx context.Context, input json.RawMessage) (string, error) {
        var req struct {
            AgentName string `json:"agent_name"`
            Task      string `json:"task"`
        }
        if err := json.Unmarshal(input, &req); err != nil {
            return "", err
        }
        return orchestrator.DelegateToSubagent(ctx, req.AgentName, req.Task)
    }
}

ReAct 模式(Reasoning + Acting)

ReAct 是一种让 Agent 在行动前先显式推理的模式,通过系统提示强制实现:

const ReActSystemPrompt = `You are a ReAct agent. For each step, follow this format:

Thought: [Analyze the current situation and decide what to do next]
Action: [The tool to call and why]
Observation: [What you learned from the tool result]

Repeat until you have enough information to provide a final answer.

Final Answer: [Your comprehensive response]`

这种模式使 Agent 的推理过程可见、可审计,也更容易调试。

Agent 记忆与向量搜索

长期运行的 Agent 需要记忆机制。结合向量数据库实现语义记忆:

type AgentMemory struct {
    embedder  EmbeddingClient
    store     VectorStore
    maxTokens int
}

func (m *AgentMemory) Store(ctx context.Context, content string) error {
    embedding, err := m.embedder.Embed(ctx, content)
    if err != nil {
        return err
    }
    return m.store.Insert(ctx, embedding, content)
}

func (m *AgentMemory) Recall(ctx context.Context, query string, k int) ([]string, error) {
    queryEmbedding, err := m.embedder.Embed(ctx, query)
    if err != nil {
        return nil, err
    }
    results, err := m.store.Search(ctx, queryEmbedding, k)
    if err != nil {
        return nil, err
    }
    var memories []string
    for _, r := range results {
        memories = append(memories, r.Content)
    }
    return memories, nil
}

// 在 Agent 的系统提示中注入相关记忆
func (a *Agent) buildSystemPromptWithMemory(ctx context.Context, query string) (string, error) {
    memories, err := a.memory.Recall(ctx, query, 5)
    if err != nil {
        return BaseSystemPrompt, nil // 记忆失败时降级
    }
    if len(memories) == 0 {
        return BaseSystemPrompt, nil
    }
    memoryContext := "Relevant memories from past conversations:\n"
    for _, m := range memories {
        memoryContext += "- " + m + "\n"
    }
    return BaseSystemPrompt + "\n\n" + memoryContext, nil
}

成本控制策略

在生产环境中,Agent 的成本控制至关重要:

type CostController struct {
    maxIterations   int
    maxInputTokens  int
    maxOutputTokens int
    // 根据任务复杂度选择模型
    modelSelector func(estimatedComplexity int) string
}

// 动态选择模型:简单任务用 Haiku,复杂任务用 Opus
func adaptiveModelSelector(estimatedComplexity int) string {
    switch {
    case estimatedComplexity < 3:
        return "claude-haiku-4-5" // 快速、便宜
    case estimatedComplexity < 7:
        return "claude-sonnet-4-5" // 平衡
    default:
        return "claude-opus-4-5" // 最强
    }
}

// 令牌预算跟踪
type TokenBudget struct {
    MaxTotal int
    Used     int
}

func (b *TokenBudget) Remaining() int {
    return b.MaxTotal - b.Used
}

func (b *TokenBudget) Consume(tokens int) error {
    if b.Used+tokens > b.MaxTotal {
        return fmt.Errorf("token budget exceeded: used %d, limit %d", b.Used+tokens, b.MaxTotal)
    }
    b.Used += tokens
    return nil
}

Agent 评估框架

Agent 的评估比单次 LLM 调用更复杂,需要考虑:

type AgentEvaluator struct {
    testCases []AgentTestCase
}

type AgentTestCase struct {
    Name           string
    UserMessage    string
    ExpectedTools  []string  // 期望调用的工具集合
    EvalFn         func(result string) bool  // 评估最终结果的函数
    MaxIterations  int
    MaxCost        float64
}

func (e *AgentEvaluator) RunBenchmark(ctx context.Context, agentFactory func() *Agent) BenchmarkResult {
    var results []TestResult
    for _, tc := range e.testCases {
        a := agentFactory()
        start := time.Now()
        result, err := a.Run(ctx, tc.UserMessage)
        duration := time.Since(start)

        stats := a.Stats()
        passed := err == nil && tc.EvalFn(result)

        results = append(results, TestResult{
            Name:       tc.Name,
            Passed:     passed,
            Duration:   duration,
            Iterations: stats["iterations"],
            Tokens:     stats["total_input_tokens"] + stats["total_output_tokens"],
        })
    }
    return BenchmarkResult{Tests: results}
}

沙盒代码执行

生产环境中的代码执行必须在隔离环境中进行:

// 使用 Docker 实现真正的沙盒执行
func DockerCodeExecutor(ctx context.Context, input json.RawMessage) (string, error) {
    var req ExecuteCodeInput
    if err := json.Unmarshal(input, &req); err != nil {
        return "", err
    }

    // Docker 命令:资源限制、网络隔离、只读文件系统
    cmd := exec.CommandContext(ctx, "docker", "run",
        "--rm",                    // 执行后删除容器
        "--network=none",          // 无网络访问
        "--memory=256m",           // 内存限制
        "--cpus=0.5",              // CPU 限制
        "--read-only",             // 只读文件系统
        "--tmpfs=/tmp:size=64m",   // 临时目录
        "-i",
        fmt.Sprintf("code-sandbox-%s:latest", req.Language),
    )

    cmd.Stdin = strings.NewReader(req.Code)
    output, err := cmd.CombinedOutput()
    if err != nil {
        return "", fmt.Errorf("execution failed: %s\nOutput: %s", err, output)
    }

    // 截断过长的输出
    if len(output) > 10000 {
        output = append(output[:10000], []byte("\n... (output truncated)")...)
    }

    return string(output), nil
}

小结

构建一个生产级别的 Go AI Agent 需要深入理解:

  1. 协议层:Anthropic Tool Use 的 JSON 规范,消息结构,stop_reason 处理
  2. 并发执行:利用 Go 的 goroutine 并行执行多个工具调用,大幅降低延迟
  3. 历史管理:正确维护对话历史,处理 token 限制,实现摘要压缩
  4. 安全性:工具执行的沙盒隔离,路径遍历防护,资源限制
  5. 可观测性:迭代次数、token 消耗、工具调用日志、错误追踪
  6. 成本控制:动态模型选择、token 预算、最大迭代限制

Agent 技术仍在快速演进,但上述核心原则在未来相当长一段时间内都将保持有效。掌握这些原则,你就掌握了构建任意复杂 AI 应用的能力。

本章评分
4.6  / 5  (3 评分)

💬 留言讨论