第 49 章

调用 LLM API:流式响应与错误处理

调用 LLM API:流式响应与错误处理

2023 年 3 月,ChatGPT 的 API 向公众开放。三个月内,数以万计的开发者开始构建 LLM 驱动的应用。大多数人的第一个版本都有同一个问题:用户点击"生成",然后等待——8 秒、15 秒、有时 30 秒——白屏,然后文本突然全部出现。这种体验是反直觉的,因为 LLM 本身是逐 token 生成输出的。真正的问题是:开发者在等待完整响应后才发送给客户端

Go 是解决这个问题的理想语言。net/httphttp.Flusher 接口、bufio.Scanner 的流式读取、goroutine 的轻量并发——这些工具的组合,让 Go 能够以极低的延迟将 LLM 的每一个 token 实时转发给浏览器,同时优雅地处理超时、重试和并发控制。

本章将构建一个生产级的 LLM 客户端库,涵盖从 Anthropic Claude API 的基本调用到多模型抽象的完整体系。

Level 1 · 你需要知道的

LLM API 生态全景

当前主流的 LLM API 提供商及其 Go 生态:

Anthropic Claude:由前 OpenAI 研究员创立,以安全性和长上下文著称。Claude 3.5 Sonnet 在编码、分析和写作方面表现突出。API 基于 Messages 格式,支持 System 提示词、多轮对话、工具调用(Tool Use)和流式输出。

OpenAI GPT:最广泛使用的 LLM API,GPT-4o 是其旗舰模型。有官方的 Go SDK(github.com/openai/openai-go)。Chat Completions API 与 Claude 的设计思路相近,但细节不同。

Google Gemini:Google 的多模态模型,支持文本、图像、视频、音频输入。通过 google.golang.org/genai 访问。

本地模型(Ollama)ollama.com 让你在本地运行 Llama 3、Mistral 等开源模型。Ollama 提供 OpenAI 兼容 API,Go 代码几乎不需要改动。

为什么 Go 特别适合 LLM 服务

构建 LLM 驱动的服务面临独特的挑战,而 Go 的设计恰好是这些挑战的对立面:

高延迟与并发需求:LLM 调用通常需要 3-30 秒,远比普通 API 调用慢。在高流量下,大量请求会同时处于"等待 LLM 响应"状态。Go 的 goroutine(每个约 8KB 初始栈)让你能同时维持数万个并发 LLM 请求,而线程模型的语言在此场景下内存消耗会爆炸。

流式 I/O 处理:LLM 流式响应本质上是一个持续的字节流。Go 的 io.Reader 接口和 bufio 包是处理这类流的标准工具,标准库层面的支持意味着不需要引入额外依赖。

单二进制部署:LLM 服务需要部署到多个区域(降低延迟)、快速扩缩容(响应流量峰值)。Go 的单二进制部署在容器化和 Kubernetes 场景下优势极大。

context 传播:当用户关闭浏览器标签时,理应立即取消对 LLM API 的请求(节省 Token 费用)。Go 的 context.Context 机制让请求取消从浏览器一路传播到 LLM API 调用,链路清晰。

Token 经济学:理解成本

LLM API 按 Token 计费。一个 Token 大约是 3-4 个英文字符,或 1.5-2 个中文字符。

Claude 3.5 Sonnet(2024 年价格,仅供参考):
- 输入:$3 / 百万 token
- 输出:$15 / 百万 token

一个典型的"总结文章"请求:
- 文章内容:2000 token(输入)
- System 提示词:200 token(输入)
- 摘要输出:300 token(输出)
- 费用:(2200 × $3 + 300 × $15) / 1,000,000 ≈ $0.011

在服务 B2C 应用时,Token 成本管理至关重要:


Level 2 · 原理与机制

Anthropic Claude API:消息格式

Claude API 使用 Messages 格式,支持多轮对话:

POST https://api.anthropic.com/v1/messages
{
    "model": "claude-opus-4-5",
    "max_tokens": 2048,
    "temperature": 0.7,
    "system": "你是一个专业的 Go 语言教师,用简洁、准确的语言解答问题。",
    "messages": [
        {"role": "user", "content": "解释 Go 中的 channel 和 mutex 的使用场景区别"},
        {"role": "assistant", "content": "Channel 和 Mutex 都用于处理并发..."},
        {"role": "user", "content": "能给我一个具体的代码例子吗?"}
    ]
}

参数说明:

与 OpenAI 的差异:

SSE 流式响应:底层机制

Server-Sent Events(SSE)是 HTTP 长连接的一种简单协议。服务器保持连接打开,持续发送格式化的文本数据:

data: {"type":"content_block_delta","delta":{"type":"text_delta","text":"Hello"}}

data: {"type":"content_block_delta","delta":{"type":"text_delta","text":"!"}}

data: [DONE]

SSE 格式规范:

在 HTTP 层面,服务端必须设置:

Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive

客户端(包括 Go 的 net/http)收到这个响应后会保持连接,持续读取数据,直到连接关闭或读到流结束标记。

指数退避重试

LLM API 会遇到多种瞬时错误:

指数退避(Exponential Backoff) 是处理这类错误的标准策略:每次重试等待时间加倍,防止大量重试请求同时打到服务端(惊群效应):

第1次重试:等待 1s
第2次重试:等待 2s
第3次重试:等待 4s
第4次重试:等待 8s
...加上随机 jitter(±20%),防止多个实例同步重试

重试时必须区分:

并发控制:信号量模式

当多个 goroutine 并发调用 LLM API 时,需要限制最大并发数(否则可能触发速率限制)。Go 中最惯用的并发控制方式是带缓冲 channel 作信号量

// 容量为 10 的 channel 允许最多 10 个并发请求
sem := make(chan struct{}, 10)

// 获取信号量(如果已满则阻塞等待)
sem <- struct{}{}

// 执行操作
doWork()

// 释放信号量
<-sem

这个模式简洁且不需要额外依赖。在 LLM 客户端中,信号量通常设置为 API 提供商的并发限制(如 Anthropic 的默认限制是每个 API Key 5 个并发请求)。


Level 3 · 代码实践

完整的 Anthropic Claude 客户端

package llm

import (
    "bufio"
    "bytes"
    "context"
    "encoding/json"
    "fmt"
    "io"
    "math"
    "math/rand"
    "net/http"
    "strings"
    "time"
)

const anthropicAPIURL = "https://api.anthropic.com/v1/messages"
const anthropicVersion = "2023-06-01"

// Message 代表对话中的一条消息
type Message struct {
    Role    string `json:"role"`    // "user" 或 "assistant"
    Content string `json:"content"`
}

// Request 是发送给 Claude API 的请求体
type Request struct {
    Model     string    `json:"model"`
    MaxTokens int       `json:"max_tokens"`
    System    string    `json:"system,omitempty"`
    Messages  []Message `json:"messages"`
    Stream    bool      `json:"stream"`
    Temperature float64 `json:"temperature,omitempty"`
}

// Usage 记录 token 用量
type Usage struct {
    InputTokens  int `json:"input_tokens"`
    OutputTokens int `json:"output_tokens"`
}

// Response 是非流式响应的结构
type Response struct {
    ID      string `json:"id"`
    Content []struct {
        Type string `json:"type"`
        Text string `json:"text"`
    } `json:"content"`
    Usage Usage `json:"usage"`
    Model string `json:"model"`
}

// StreamEvent 代表流式响应中的一个事件
type StreamEvent struct {
    Type  string `json:"type"`
    Delta *struct {
        Type string `json:"type"`
        Text string `json:"text"`
    } `json:"delta,omitempty"`
    Usage *Usage `json:"usage,omitempty"`
    Error *struct {
        Type    string `json:"type"`
        Message string `json:"message"`
    } `json:"error,omitempty"`
}

// Client 是 Anthropic API 客户端
type Client struct {
    apiKey     string
    httpClient *http.Client
    semaphore  chan struct{} // 并发控制信号量
    maxRetries int
}

// NewClient 创建一个新的 Client
func NewClient(apiKey string, maxConcurrency, maxRetries int) *Client {
    return &Client{
        apiKey: apiKey,
        httpClient: &http.Client{
            Timeout: 5 * time.Minute, // LLM 请求可能很慢
        },
        semaphore:  make(chan struct{}, maxConcurrency),
        maxRetries: maxRetries,
    }
}

// Complete 发送非流式请求,返回完整响应
func (c *Client) Complete(ctx context.Context, req Request) (*Response, error) {
    req.Stream = false

    // 获取信号量
    select {
    case c.semaphore <- struct{}{}:
        defer func() { <-c.semaphore }()
    case <-ctx.Done():
        return nil, ctx.Err()
    }

    var resp *Response
    err := c.withRetry(ctx, func() error {
        var err error
        resp, err = c.doComplete(ctx, req)
        return err
    })
    return resp, err
}

func (c *Client) doComplete(ctx context.Context, req Request) (*Response, error) {
    body, err := json.Marshal(req)
    if err != nil {
        return nil, fmt.Errorf("marshal request: %w", err)
    }

    httpReq, err := http.NewRequestWithContext(ctx, "POST", anthropicAPIURL, bytes.NewReader(body))
    if err != nil {
        return nil, fmt.Errorf("create request: %w", err)
    }

    c.setHeaders(httpReq)

    httpResp, err := c.httpClient.Do(httpReq)
    if err != nil {
        return nil, fmt.Errorf("send request: %w", err)
    }
    defer httpResp.Body.Close()

    if err := c.checkStatusCode(httpResp); err != nil {
        return nil, err
    }

    var resp Response
    if err := json.NewDecoder(httpResp.Body).Decode(&resp); err != nil {
        return nil, fmt.Errorf("decode response: %w", err)
    }
    return &resp, nil
}

// StreamComplete 发送流式请求,通过 channel 返回每个文本片段
func (c *Client) StreamComplete(ctx context.Context, req Request) (<-chan string, <-chan error) {
    textCh := make(chan string, 100)   // 缓冲,防止背压
    errCh := make(chan error, 1)

    req.Stream = true

    go func() {
        defer close(textCh)
        defer close(errCh)

        // 获取信号量
        select {
        case c.semaphore <- struct{}{}:
            defer func() { <-c.semaphore }()
        case <-ctx.Done():
            errCh <- ctx.Err()
            return
        }

        err := c.withRetry(ctx, func() error {
            return c.doStream(ctx, req, textCh)
        })
        if err != nil {
            errCh <- err
        }
    }()

    return textCh, errCh
}

func (c *Client) doStream(ctx context.Context, req Request, textCh chan<- string) error {
    body, err := json.Marshal(req)
    if err != nil {
        return fmt.Errorf("marshal request: %w", err)
    }

    httpReq, err := http.NewRequestWithContext(ctx, "POST", anthropicAPIURL, bytes.NewReader(body))
    if err != nil {
        return fmt.Errorf("create request: %w", err)
    }

    c.setHeaders(httpReq)

    httpResp, err := c.httpClient.Do(httpReq)
    if err != nil {
        return fmt.Errorf("send request: %w", err)
    }
    defer httpResp.Body.Close()

    if err := c.checkStatusCode(httpResp); err != nil {
        return err
    }

    // 逐行解析 SSE 流
    scanner := bufio.NewScanner(httpResp.Body)
    // 增大缓冲区,防止长行截断
    scanner.Buffer(make([]byte, 64*1024), 64*1024)

    for scanner.Scan() {
        line := scanner.Text()

        // SSE 格式:以 "data: " 开头
        if !strings.HasPrefix(line, "data: ") {
            continue
        }

        data := strings.TrimPrefix(line, "data: ")

        // 流结束标记
        if data == "[DONE]" {
            return nil
        }

        var event StreamEvent
        if err := json.Unmarshal([]byte(data), &event); err != nil {
            // 某些行可能是心跳或其他格式,跳过
            continue
        }

        // 处理错误事件
        if event.Type == "error" && event.Error != nil {
            return fmt.Errorf("stream error: %s: %s", event.Error.Type, event.Error.Message)
        }

        // 提取文本内容
        if event.Type == "content_block_delta" &&
            event.Delta != nil &&
            event.Delta.Type == "text_delta" {
            select {
            case textCh <- event.Delta.Text:
            case <-ctx.Done():
                return ctx.Err()
            }
        }
    }

    if err := scanner.Err(); err != nil {
        return fmt.Errorf("scan stream: %w", err)
    }
    return nil
}

func (c *Client) setHeaders(req *http.Request) {
    req.Header.Set("Content-Type", "application/json")
    req.Header.Set("x-api-key", c.apiKey)
    req.Header.Set("anthropic-version", anthropicVersion)
}

// APIError 代表一个可区分重试策略的 API 错误
type APIError struct {
    StatusCode int
    Message    string
    Retryable  bool
}

func (e *APIError) Error() string {
    return fmt.Sprintf("API error %d: %s", e.StatusCode, e.Message)
}

func (c *Client) checkStatusCode(resp *http.Response) error {
    if resp.StatusCode == http.StatusOK {
        return nil
    }

    body, _ := io.ReadAll(resp.Body)

    retryable := resp.StatusCode == 429 ||
        resp.StatusCode == 529 ||
        resp.StatusCode >= 500

    return &APIError{
        StatusCode: resp.StatusCode,
        Message:    string(body),
        Retryable:  retryable,
    }
}

// withRetry 使用指数退避重试,带 jitter
func (c *Client) withRetry(ctx context.Context, fn func() error) error {
    var lastErr error

    for attempt := 0; attempt <= c.maxRetries; attempt++ {
        if attempt > 0 {
            // 计算等待时间:base * 2^(attempt-1) + jitter
            base := time.Duration(math.Pow(2, float64(attempt-1))) * time.Second
            jitter := time.Duration(rand.Int63n(int64(base / 5)))
            waitTime := base + jitter

            // 最长等待 60 秒
            if waitTime > 60*time.Second {
                waitTime = 60 * time.Second
            }

            select {
            case <-time.After(waitTime):
            case <-ctx.Done():
                return ctx.Err()
            }
        }

        lastErr = fn()
        if lastErr == nil {
            return nil
        }

        // 判断是否可重试
        var apiErr *APIError
        if errors.As(lastErr, &apiErr) && !apiErr.Retryable {
            return lastErr // 不可重试错误,立即返回
        }

        if attempt < c.maxRetries {
            // 记录重试日志
            log.Printf("attempt %d failed: %v, retrying...", attempt+1, lastErr)
        }
    }

    return fmt.Errorf("max retries exceeded: %w", lastErr)
}

将 LLM 流式响应代理给浏览器

package handler

import (
    "encoding/json"
    "fmt"
    "net/http"
)

type CompletionRequest struct {
    Messages []llm.Message `json:"messages"`
    System   string        `json:"system"`
}

// StreamHandler 将 LLM 的 SSE 响应实时转发给浏览器
func (h *Handler) StreamHandler(w http.ResponseWriter, r *http.Request) {
    // 解析请求
    var req CompletionRequest
    if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
        http.Error(w, "invalid request", http.StatusBadRequest)
        return
    }

    // 验证用户(JWT 中间件已验证,此处获取用户 ID 用于限速)
    claims, ok := middleware.GetClaims(r.Context())
    if !ok {
        http.Error(w, "unauthorized", http.StatusUnauthorized)
        return
    }

    // 用户级别的速率限制(防止单个用户消耗过多 Token)
    if !h.userRateLimiter.Allow(claims.UserID) {
        http.Error(w, "rate limit exceeded", http.StatusTooManyRequests)
        return
    }

    // 设置 SSE 响应头
    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")
    w.Header().Set("Connection", "keep-alive")
    // 允许跨域(如果需要)
    w.Header().Set("Access-Control-Allow-Origin", "*")

    // 获取 http.Flusher,用于立即将数据发送给客户端
    flusher, ok := w.(http.Flusher)
    if !ok {
        http.Error(w, "streaming not supported", http.StatusInternalServerError)
        return
    }

    // 发送流式请求
    llmReq := llm.Request{
        Model:     "claude-opus-4-5",
        MaxTokens: 2048,
        System:    req.System,
        Messages:  req.Messages,
    }

    textCh, errCh := h.llmClient.StreamComplete(r.Context(), llmReq)

    // 将每个 token 立即发送给浏览器
    for {
        select {
        case text, ok := <-textCh:
            if !ok {
                // channel 关闭,流式传输完成
                // 发送结束事件
                fmt.Fprintf(w, "data: [DONE]\n\n")
                flusher.Flush()
                return
            }

            // 将 token 包装成 SSE 格式发送
            data, _ := json.Marshal(map[string]string{"text": text})
            fmt.Fprintf(w, "data: %s\n\n", data)
            flusher.Flush() // 立即推送给客户端,不等缓冲区满

        case err := <-errCh:
            if err != nil {
                // 发送错误事件给客户端
                errData, _ := json.Marshal(map[string]string{"error": err.Error()})
                fmt.Fprintf(w, "event: error\ndata: %s\n\n", errData)
                flusher.Flush()
            }
            return

        case <-r.Context().Done():
            // 客户端断开连接(关闭浏览器标签等)
            // context 取消会传播到 LLM 请求,停止消耗 Token
            return
        }
    }
}

前端消费 SSE 的 JavaScript 代码:

const eventSource = new EventSource('/api/stream', {
    // POST 请求需要用 fetch + ReadableStream,不能直接用 EventSource
});

// 或者用 fetch API(更灵活):
async function streamCompletion(messages) {
    const response = await fetch('/api/stream', {
        method: 'POST',
        headers: {'Content-Type': 'application/json'},
        body: JSON.stringify({messages}),
    });

    const reader = response.body.getReader();
    const decoder = new TextDecoder();
    let buffer = '';

    while (true) {
        const {done, value} = await reader.read();
        if (done) break;

        buffer += decoder.decode(value, {stream: true});
        const lines = buffer.split('\n');
        buffer = lines.pop(); // 保留不完整的行

        for (const line of lines) {
            if (line.startsWith('data: ')) {
                const data = line.slice(6);
                if (data === '[DONE]') return;
                try {
                    const parsed = JSON.parse(data);
                    appendToUI(parsed.text); // 实时显示文本
                } catch {}
            }
        }
    }
}

Redis 缓存常见响应

对于相同或相似的提示词,可以缓存响应以减少延迟和成本:

package cache

import (
    "context"
    "crypto/sha256"
    "encoding/hex"
    "encoding/json"
    "fmt"
    "time"

    "github.com/redis/go-redis/v9"
)

type LLMCache struct {
    redis *redis.Client
    ttl   time.Duration
}

func NewLLMCache(redis *redis.Client, ttl time.Duration) *LLMCache {
    return &LLMCache{redis: redis, ttl: ttl}
}

// cacheKey 基于请求内容生成缓存键
// 注意:只对确定性请求(temperature=0)缓存,随机性请求不缓存
func (c *LLMCache) cacheKey(req llm.Request) string {
    data, _ := json.Marshal(req)
    hash := sha256.Sum256(data)
    return "llm:cache:" + hex.EncodeToString(hash[:])
}

// Get 尝试从缓存获取响应
func (c *LLMCache) Get(ctx context.Context, req llm.Request) (*llm.Response, bool) {
    // 只缓存确定性请求
    if req.Temperature > 0 {
        return nil, false
    }

    key := c.cacheKey(req)
    data, err := c.redis.Get(ctx, key).Bytes()
    if err != nil {
        return nil, false
    }

    var resp llm.Response
    if err := json.Unmarshal(data, &resp); err != nil {
        return nil, false
    }
    return &resp, true
}

// Set 将响应存入缓存
func (c *LLMCache) Set(ctx context.Context, req llm.Request, resp *llm.Response) error {
    if req.Temperature > 0 {
        return nil // 不缓存非确定性响应
    }

    key := c.cacheKey(req)
    data, err := json.Marshal(resp)
    if err != nil {
        return err
    }
    return c.redis.Set(ctx, key, data, c.ttl).Err()
}

// CachedClient 包装 LLM 客户端,添加缓存层
type CachedClient struct {
    client *llm.Client
    cache  *LLMCache
}

func (c *CachedClient) Complete(ctx context.Context, req llm.Request) (*llm.Response, error) {
    // 先查缓存
    if cached, ok := c.cache.Get(ctx, req); ok {
        return cached, nil
    }

    // 缓存未命中,调用 LLM
    resp, err := c.client.Complete(ctx, req)
    if err != nil {
        return nil, err
    }

    // 将结果存入缓存(异步,不阻塞主请求)
    go func() {
        cacheCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
        defer cancel()
        c.cache.Set(cacheCtx, req, resp)
    }()

    return resp, nil
}

并发 LLM 请求

package batch

import (
    "context"
    "sync"
)

type BatchResult struct {
    Index    int
    Response *llm.Response
    Error    error
}

// BatchComplete 并发执行多个 LLM 请求,按原始顺序返回结果
func BatchComplete(ctx context.Context, client *llm.Client, requests []llm.Request) []BatchResult {
    results := make([]BatchResult, len(requests))
    var wg sync.WaitGroup

    for i, req := range requests {
        wg.Add(1)
        go func(idx int, r llm.Request) {
            defer wg.Done()
            resp, err := client.Complete(ctx, r)
            results[idx] = BatchResult{
                Index:    idx,
                Response: resp,
                Error:    err,
            }
        }(i, req)
    }

    wg.Wait()
    return results
}

// 实际使用:为 10 篇文章分别生成摘要
func summarizeArticles(ctx context.Context, client *llm.Client, articles []string) ([]string, error) {
    requests := make([]llm.Request, len(articles))
    for i, article := range articles {
        requests[i] = llm.Request{
            Model:     "claude-haiku-4-5", // 用更快更便宜的模型做批处理
            MaxTokens: 200,
            System:    "用 50 字以内总结以下文章。",
            Messages: []llm.Message{
                {Role: "user", Content: article},
            },
        }
    }

    results := BatchComplete(ctx, client, requests)

    summaries := make([]string, len(articles))
    for _, result := range results {
        if result.Error != nil {
            return nil, fmt.Errorf("article %d failed: %w", result.Index, result.Error)
        }
        if len(result.Response.Content) > 0 {
            summaries[result.Index] = result.Response.Content[0].Text
        }
    }
    return summaries, nil
}

Level 4 · 进阶与边界

多模型统一抽象

当你的应用需要同时支持多个 LLM 提供商时,定义统一接口是关键:

package llm

import "context"

// Provider 是所有 LLM 提供商的统一接口
type Provider interface {
    // Complete 发送同步请求
    Complete(ctx context.Context, req UnifiedRequest) (*UnifiedResponse, error)
    // Stream 发送流式请求
    Stream(ctx context.Context, req UnifiedRequest) (<-chan string, <-chan error)
    // Name 返回提供商名称(用于日志和监控)
    Name() string
}

// UnifiedRequest 是跨提供商的统一请求格式
type UnifiedRequest struct {
    Model       string
    MaxTokens   int
    Temperature float64
    System      string
    Messages    []Message
}

// UnifiedResponse 是统一响应格式
type UnifiedResponse struct {
    Content      string
    InputTokens  int
    OutputTokens int
    Model        string
    Provider     string
}

// AnthropicProvider 实现 Provider 接口
type AnthropicProvider struct {
    client *Client
}

func (p *AnthropicProvider) Name() string { return "anthropic" }

func (p *AnthropicProvider) Complete(ctx context.Context, req UnifiedRequest) (*UnifiedResponse, error) {
    anthropicReq := Request{
        Model:       req.Model,
        MaxTokens:   req.MaxTokens,
        Temperature: req.Temperature,
        System:      req.System,
        Messages:    req.Messages,
    }

    resp, err := p.client.Complete(ctx, anthropicReq)
    if err != nil {
        return nil, err
    }

    content := ""
    if len(resp.Content) > 0 {
        content = resp.Content[0].Text
    }

    return &UnifiedResponse{
        Content:      content,
        InputTokens:  resp.Usage.InputTokens,
        OutputTokens: resp.Usage.OutputTokens,
        Model:        resp.Model,
        Provider:     "anthropic",
    }, nil
}

// OpenAIProvider 同样实现 Provider 接口
type OpenAIProvider struct {
    client *openai.Client
}

func (p *OpenAIProvider) Name() string { return "openai" }

func (p *OpenAIProvider) Complete(ctx context.Context, req UnifiedRequest) (*UnifiedResponse, error) {
    // 转换为 OpenAI 格式...
    openaiMessages := convertMessages(req)
    resp, err := p.client.Chat.Completions.New(ctx, openai.ChatCompletionNewParams{
        Model:     openai.F(req.Model),
        MaxTokens: openai.F(int64(req.MaxTokens)),
        Messages:  openai.F(openaiMessages),
    })
    // ... 转换响应
}

// Router 实现模型路由(故障转移、负载均衡)
type Router struct {
    providers map[string]Provider
    fallback  []string // 按优先级排列的 provider 名称
}

func NewRouter(providers ...Provider) *Router {
    r := &Router{providers: make(map[string]Provider)}
    for _, p := range providers {
        r.providers[p.Name()] = p
        r.fallback = append(r.fallback, p.Name())
    }
    return r
}

// Complete 带故障转移的请求:主 provider 失败时尝试备用
func (r *Router) Complete(ctx context.Context, req UnifiedRequest) (*UnifiedResponse, error) {
    var lastErr error
    for _, providerName := range r.fallback {
        provider := r.providers[providerName]
        resp, err := provider.Complete(ctx, req)
        if err == nil {
            return resp, nil
        }
        lastErr = err
        log.Printf("provider %s failed: %v, trying next...", providerName, err)
    }
    return nil, fmt.Errorf("all providers failed: %w", lastErr)
}

工具调用(Function Calling / Tool Use)

现代 LLM 支持"工具调用":模型生成结构化的工具调用请求,应用执行工具并将结果返回给模型:

package tools

// Tool 定义可以被 LLM 调用的工具
type Tool struct {
    Name        string          `json:"name"`
    Description string          `json:"description"`
    InputSchema json.RawMessage `json:"input_schema"`
}

// 定义工具集
var availableTools = []Tool{
    {
        Name:        "get_weather",
        Description: "获取指定城市的当前天气",
        InputSchema: json.RawMessage(`{
            "type": "object",
            "properties": {
                "city": {"type": "string", "description": "城市名称"},
                "unit": {"type": "string", "enum": ["celsius", "fahrenheit"]}
            },
            "required": ["city"]
        }`),
    },
    {
        Name:        "search_database",
        Description: "在产品数据库中搜索",
        InputSchema: json.RawMessage(`{
            "type": "object",
            "properties": {
                "query": {"type": "string"},
                "limit": {"type": "integer", "default": 10}
            },
            "required": ["query"]
        }`),
    },
}

// ToolCallLoop 实现工具调用的完整循环
func ToolCallLoop(ctx context.Context, client *llm.Client, userMessage string) (string, error) {
    messages := []llm.Message{
        {Role: "user", Content: userMessage},
    }

    // 最多循环 10 次,防止无限循环
    for i := 0; i < 10; i++ {
        req := llm.Request{
            Model:     "claude-opus-4-5",
            MaxTokens: 4096,
            Messages:  messages,
            Tools:     availableTools, // 告诉模型可用的工具
        }

        resp, err := client.Complete(ctx, req)
        if err != nil {
            return "", err
        }

        // 检查是否是工具调用
        if resp.StopReason == "tool_use" {
            // 执行所有工具调用
            toolResults := executeToolCalls(ctx, resp.Content)

            // 将工具结果追加到对话历史
            messages = append(messages,
                llm.Message{Role: "assistant", Content: resp.Content},     // 模型的工具调用请求
                llm.Message{Role: "user", Content: toolResults},           // 工具执行结果
            )
            continue
        }

        // 模型返回了最终文本响应
        if len(resp.Content) > 0 {
            return resp.Content[0].Text, nil
        }
    }

    return "", fmt.Errorf("tool call loop exceeded maximum iterations")
}

Prompt 模板:text/template

package prompt

import (
    "bytes"
    "text/template"
)

// PromptTemplate 管理 prompt 模板
type PromptTemplate struct {
    tmpl *template.Template
}

func NewPromptTemplate(name, tmplStr string) (*PromptTemplate, error) {
    tmpl, err := template.New(name).Parse(tmplStr)
    if err != nil {
        return nil, fmt.Errorf("parse template %s: %w", name, err)
    }
    return &PromptTemplate{tmpl: tmpl}, nil
}

func (p *PromptTemplate) Render(data any) (string, error) {
    var buf bytes.Buffer
    if err := p.tmpl.Execute(&buf, data); err != nil {
        return "", fmt.Errorf("render template: %w", err)
    }
    return buf.String(), nil
}

// 使用示例
var articleSummaryTemplate = `请将以下{{.Language}}文章总结为{{.MaxWords}}字以内的摘要。

要求:
{{range .Requirements}}
- {{.}}
{{end}}

文章内容:
{{.Content}}`

func SummarizeArticle(ctx context.Context, client *llm.Client, article string) (string, error) {
    tmpl, _ := NewPromptTemplate("summary", articleSummaryTemplate)
    prompt, err := tmpl.Render(map[string]any{
        "Language": "中文",
        "MaxWords": 100,
        "Requirements": []string{
            "保留核心观点",
            "使用客观语气",
            "不要包含主观评价",
        },
        "Content": article,
    })
    if err != nil {
        return "", err
    }

    resp, err := client.Complete(ctx, llm.Request{
        Model:     "claude-haiku-4-5",
        MaxTokens: 300,
        Messages:  []llm.Message{{Role: "user", Content: prompt}},
    })
    if err != nil {
        return "", err
    }
    return resp.Content[0].Text, nil
}

用录制的 Fixture 测试 LLM 调用

LLM 调用的测试面临两个挑战:1) 真实调用有延迟和成本;2) 响应不是确定性的,断言困难。解决方案是**录制-回放(Record-Replay)**模式:

package testing

import (
    "encoding/json"
    "os"
    "path/filepath"
    "testing"
)

// RecordingClient 包装真实客户端,录制请求/响应到文件
type RecordingClient struct {
    real      *llm.Client
    fixtureDir string
    record    bool // true=录制模式,false=回放模式
}

func (c *RecordingClient) Complete(ctx context.Context, req llm.Request) (*llm.Response, error) {
    fixturePath := c.fixturePath(req)

    if !c.record {
        // 回放模式:从文件读取录制的响应
        data, err := os.ReadFile(fixturePath)
        if err != nil {
            return nil, fmt.Errorf("fixture not found at %s (run with -record to record): %w",
                fixturePath, err)
        }
        var resp llm.Response
        if err := json.Unmarshal(data, &resp); err != nil {
            return nil, err
        }
        return &resp, nil
    }

    // 录制模式:调用真实 API,保存响应
    resp, err := c.real.Complete(ctx, req)
    if err != nil {
        return nil, err
    }

    data, _ := json.MarshalIndent(resp, "", "  ")
    os.MkdirAll(filepath.Dir(fixturePath), 0755)
    os.WriteFile(fixturePath, data, 0644)

    return resp, nil
}

func (c *RecordingClient) fixturePath(req llm.Request) string {
    hash := sha256.Sum256(mustMarshal(req))
    return filepath.Join(c.fixtureDir, hex.EncodeToString(hash[:16])+".json")
}

// TestSummarize 使用 fixture 的测试
func TestSummarize(t *testing.T) {
    record := os.Getenv("RECORD_FIXTURES") == "true"

    client := &RecordingClient{
        real:       llm.NewClient(os.Getenv("ANTHROPIC_API_KEY"), 5, 3),
        fixtureDir: "testdata/fixtures",
        record:     record,
    }

    result, err := SummarizeArticle(context.Background(), client, "Go 语言由 Google 开发...")
    if err != nil {
        t.Fatal(err)
    }

    if len(result) == 0 {
        t.Error("expected non-empty summary")
    }
    // 更多断言...
}
# 录制 fixture(只需要在首次运行或 prompt 变更时执行)
RECORD_FIXTURES=true ANTHROPIC_API_KEY=sk-... go test ./...

# 日常运行:使用录制的 fixture,无需 API Key,无费用,极快
go test ./...

成本追踪中间件

package cost

import (
    "context"
    "sync/atomic"
    "time"
)

// CostTracker 追踪每次 LLM 调用的 Token 用量和估算费用
type CostTracker struct {
    db     *sql.DB
    // 原子计数器用于实时统计(不需要锁)
    totalInputTokens  atomic.Int64
    totalOutputTokens atomic.Int64
}

// ModelPricing 每百万 token 的价格(美元)
var ModelPricing = map[string]struct{ Input, Output float64 }{
    "claude-opus-4-5":   {Input: 15.0, Output: 75.0},
    "claude-sonnet-4-5": {Input: 3.0, Output: 15.0},
    "claude-haiku-4-5":  {Input: 0.25, Output: 1.25},
}

func (t *CostTracker) TrackUsage(ctx context.Context, model string, usage llm.Usage, userID int64) {
    t.totalInputTokens.Add(int64(usage.InputTokens))
    t.totalOutputTokens.Add(int64(usage.OutputTokens))

    pricing, ok := ModelPricing[model]
    if !ok {
        return
    }

    cost := float64(usage.InputTokens)/1e6*pricing.Input +
        float64(usage.OutputTokens)/1e6*pricing.Output

    // 异步写入数据库,不阻塞主请求
    go func() {
        _, err := t.db.ExecContext(ctx,
            `INSERT INTO llm_usage (user_id, model, input_tokens, output_tokens, cost_usd, created_at)
             VALUES (?, ?, ?, ?, ?, ?)`,
            userID, model, usage.InputTokens, usage.OutputTokens, cost, time.Now(),
        )
        if err != nil {
            log.Printf("failed to track LLM usage: %v", err)
        }
    }()
}

// GetStats 返回当前会话的使用统计
func (t *CostTracker) GetStats() (inputTokens, outputTokens int64) {
    return t.totalInputTokens.Load(), t.totalOutputTokens.Load()
}

LLM API 集成是当前最激动人心的工程挑战之一:你需要处理慢速网络 I/O、流式协议解析、并发控制、成本管理和不可预测的模型行为——而这些恰好是 Go 最擅长的场景。本章构建的客户端库涵盖了生产级 LLM 服务的核心需求:可靠的重试逻辑、低延迟的流式代理、多模型抽象和可测试的架构。下一步是将这些组件集成到实际的产品功能中,记住:LLM 是工具,不是魔法——正确的工程实践让它成为可靠的服务,而不是随机的黑盒。

本章评分
4.7  / 5  (3 评分)

💬 留言讨论