调用 LLM API:流式响应与错误处理
调用 LLM API:流式响应与错误处理
2023 年 3 月,ChatGPT 的 API 向公众开放。三个月内,数以万计的开发者开始构建 LLM 驱动的应用。大多数人的第一个版本都有同一个问题:用户点击"生成",然后等待——8 秒、15 秒、有时 30 秒——白屏,然后文本突然全部出现。这种体验是反直觉的,因为 LLM 本身是逐 token 生成输出的。真正的问题是:开发者在等待完整响应后才发送给客户端。
Go 是解决这个问题的理想语言。net/http 的 http.Flusher 接口、bufio.Scanner 的流式读取、goroutine 的轻量并发——这些工具的组合,让 Go 能够以极低的延迟将 LLM 的每一个 token 实时转发给浏览器,同时优雅地处理超时、重试和并发控制。
本章将构建一个生产级的 LLM 客户端库,涵盖从 Anthropic Claude API 的基本调用到多模型抽象的完整体系。
Level 1 · 你需要知道的
LLM API 生态全景
当前主流的 LLM API 提供商及其 Go 生态:
Anthropic Claude:由前 OpenAI 研究员创立,以安全性和长上下文著称。Claude 3.5 Sonnet 在编码、分析和写作方面表现突出。API 基于 Messages 格式,支持 System 提示词、多轮对话、工具调用(Tool Use)和流式输出。
OpenAI GPT:最广泛使用的 LLM API,GPT-4o 是其旗舰模型。有官方的 Go SDK(github.com/openai/openai-go)。Chat Completions API 与 Claude 的设计思路相近,但细节不同。
Google Gemini:Google 的多模态模型,支持文本、图像、视频、音频输入。通过 google.golang.org/genai 访问。
本地模型(Ollama):ollama.com 让你在本地运行 Llama 3、Mistral 等开源模型。Ollama 提供 OpenAI 兼容 API,Go 代码几乎不需要改动。
为什么 Go 特别适合 LLM 服务
构建 LLM 驱动的服务面临独特的挑战,而 Go 的设计恰好是这些挑战的对立面:
高延迟与并发需求:LLM 调用通常需要 3-30 秒,远比普通 API 调用慢。在高流量下,大量请求会同时处于"等待 LLM 响应"状态。Go 的 goroutine(每个约 8KB 初始栈)让你能同时维持数万个并发 LLM 请求,而线程模型的语言在此场景下内存消耗会爆炸。
流式 I/O 处理:LLM 流式响应本质上是一个持续的字节流。Go 的 io.Reader 接口和 bufio 包是处理这类流的标准工具,标准库层面的支持意味着不需要引入额外依赖。
单二进制部署:LLM 服务需要部署到多个区域(降低延迟)、快速扩缩容(响应流量峰值)。Go 的单二进制部署在容器化和 Kubernetes 场景下优势极大。
context 传播:当用户关闭浏览器标签时,理应立即取消对 LLM API 的请求(节省 Token 费用)。Go 的 context.Context 机制让请求取消从浏览器一路传播到 LLM API 调用,链路清晰。
Token 经济学:理解成本
LLM API 按 Token 计费。一个 Token 大约是 3-4 个英文字符,或 1.5-2 个中文字符。
Claude 3.5 Sonnet(2024 年价格,仅供参考):
- 输入:$3 / 百万 token
- 输出:$15 / 百万 token
一个典型的"总结文章"请求:
- 文章内容:2000 token(输入)
- System 提示词:200 token(输入)
- 摘要输出:300 token(输出)
- 费用:(2200 × $3 + 300 × $15) / 1,000,000 ≈ $0.011
在服务 B2C 应用时,Token 成本管理至关重要:
- 设置合理的
max_tokens:防止模型无限生成(节省输出 Token) - 缓存 System 提示词:Anthropic 支持 Prompt Caching,重复的 System 提示词只计费一次
- 记录每请求 Token 用量:用于成本归因和用量告警
Level 2 · 原理与机制
Anthropic Claude API:消息格式
Claude API 使用 Messages 格式,支持多轮对话:
POST https://api.anthropic.com/v1/messages
{
"model": "claude-opus-4-5",
"max_tokens": 2048,
"temperature": 0.7,
"system": "你是一个专业的 Go 语言教师,用简洁、准确的语言解答问题。",
"messages": [
{"role": "user", "content": "解释 Go 中的 channel 和 mutex 的使用场景区别"},
{"role": "assistant", "content": "Channel 和 Mutex 都用于处理并发..."},
{"role": "user", "content": "能给我一个具体的代码例子吗?"}
]
}
参数说明:
model:模型 ID。不同模型有速度/质量/价格的权衡max_tokens:最大输出 Token 数。必须显式设置,Claude 不像某些 API 有默认值temperature(0-1):控制随机性。0 = 确定性输出(适合代码生成),1 = 最大创意(适合写作)system:System 提示词,设置 AI 的角色和行为规则,独立于对话历史
与 OpenAI 的差异:
- Claude 将
system作为独立字段,不是 messages 中 role=system 的消息 - Claude 不支持
functions(旧格式),使用tools(工具调用) - Claude 的流式响应使用 SSE(Server-Sent Events)格式,事件类型比 OpenAI 更丰富
SSE 流式响应:底层机制
Server-Sent Events(SSE)是 HTTP 长连接的一种简单协议。服务器保持连接打开,持续发送格式化的文本数据:
data: {"type":"content_block_delta","delta":{"type":"text_delta","text":"Hello"}}
data: {"type":"content_block_delta","delta":{"type":"text_delta","text":"!"}}
data: [DONE]
SSE 格式规范:
- 每个事件以一个或多个
field: value行开头 - 事件之间用空行分隔
data:字段包含事件数据event:字段指定事件类型(Claude API 用得更多)[DONE]标记流结束
在 HTTP 层面,服务端必须设置:
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
客户端(包括 Go 的 net/http)收到这个响应后会保持连接,持续读取数据,直到连接关闭或读到流结束标记。
指数退避重试
LLM API 会遇到多种瞬时错误:
429 Too Many Requests:超过速率限制,需要等待529 Overloaded(Anthropic 特有):API 过载500/502/503:服务端临时故障
指数退避(Exponential Backoff) 是处理这类错误的标准策略:每次重试等待时间加倍,防止大量重试请求同时打到服务端(惊群效应):
第1次重试:等待 1s
第2次重试:等待 2s
第3次重试:等待 4s
第4次重试:等待 8s
...加上随机 jitter(±20%),防止多个实例同步重试
重试时必须区分:
- 可重试错误:429、529、500、502、503、网络超时
- 不可重试错误:400(请求格式错误)、401(认证失败)、404(模型不存在)
并发控制:信号量模式
当多个 goroutine 并发调用 LLM API 时,需要限制最大并发数(否则可能触发速率限制)。Go 中最惯用的并发控制方式是带缓冲 channel 作信号量:
// 容量为 10 的 channel 允许最多 10 个并发请求
sem := make(chan struct{}, 10)
// 获取信号量(如果已满则阻塞等待)
sem <- struct{}{}
// 执行操作
doWork()
// 释放信号量
<-sem
这个模式简洁且不需要额外依赖。在 LLM 客户端中,信号量通常设置为 API 提供商的并发限制(如 Anthropic 的默认限制是每个 API Key 5 个并发请求)。
Level 3 · 代码实践
完整的 Anthropic Claude 客户端
package llm
import (
"bufio"
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"math"
"math/rand"
"net/http"
"strings"
"time"
)
const anthropicAPIURL = "https://api.anthropic.com/v1/messages"
const anthropicVersion = "2023-06-01"
// Message 代表对话中的一条消息
type Message struct {
Role string `json:"role"` // "user" 或 "assistant"
Content string `json:"content"`
}
// Request 是发送给 Claude API 的请求体
type Request struct {
Model string `json:"model"`
MaxTokens int `json:"max_tokens"`
System string `json:"system,omitempty"`
Messages []Message `json:"messages"`
Stream bool `json:"stream"`
Temperature float64 `json:"temperature,omitempty"`
}
// Usage 记录 token 用量
type Usage struct {
InputTokens int `json:"input_tokens"`
OutputTokens int `json:"output_tokens"`
}
// Response 是非流式响应的结构
type Response struct {
ID string `json:"id"`
Content []struct {
Type string `json:"type"`
Text string `json:"text"`
} `json:"content"`
Usage Usage `json:"usage"`
Model string `json:"model"`
}
// StreamEvent 代表流式响应中的一个事件
type StreamEvent struct {
Type string `json:"type"`
Delta *struct {
Type string `json:"type"`
Text string `json:"text"`
} `json:"delta,omitempty"`
Usage *Usage `json:"usage,omitempty"`
Error *struct {
Type string `json:"type"`
Message string `json:"message"`
} `json:"error,omitempty"`
}
// Client 是 Anthropic API 客户端
type Client struct {
apiKey string
httpClient *http.Client
semaphore chan struct{} // 并发控制信号量
maxRetries int
}
// NewClient 创建一个新的 Client
func NewClient(apiKey string, maxConcurrency, maxRetries int) *Client {
return &Client{
apiKey: apiKey,
httpClient: &http.Client{
Timeout: 5 * time.Minute, // LLM 请求可能很慢
},
semaphore: make(chan struct{}, maxConcurrency),
maxRetries: maxRetries,
}
}
// Complete 发送非流式请求,返回完整响应
func (c *Client) Complete(ctx context.Context, req Request) (*Response, error) {
req.Stream = false
// 获取信号量
select {
case c.semaphore <- struct{}{}:
defer func() { <-c.semaphore }()
case <-ctx.Done():
return nil, ctx.Err()
}
var resp *Response
err := c.withRetry(ctx, func() error {
var err error
resp, err = c.doComplete(ctx, req)
return err
})
return resp, err
}
func (c *Client) doComplete(ctx context.Context, req Request) (*Response, error) {
body, err := json.Marshal(req)
if err != nil {
return nil, fmt.Errorf("marshal request: %w", err)
}
httpReq, err := http.NewRequestWithContext(ctx, "POST", anthropicAPIURL, bytes.NewReader(body))
if err != nil {
return nil, fmt.Errorf("create request: %w", err)
}
c.setHeaders(httpReq)
httpResp, err := c.httpClient.Do(httpReq)
if err != nil {
return nil, fmt.Errorf("send request: %w", err)
}
defer httpResp.Body.Close()
if err := c.checkStatusCode(httpResp); err != nil {
return nil, err
}
var resp Response
if err := json.NewDecoder(httpResp.Body).Decode(&resp); err != nil {
return nil, fmt.Errorf("decode response: %w", err)
}
return &resp, nil
}
// StreamComplete 发送流式请求,通过 channel 返回每个文本片段
func (c *Client) StreamComplete(ctx context.Context, req Request) (<-chan string, <-chan error) {
textCh := make(chan string, 100) // 缓冲,防止背压
errCh := make(chan error, 1)
req.Stream = true
go func() {
defer close(textCh)
defer close(errCh)
// 获取信号量
select {
case c.semaphore <- struct{}{}:
defer func() { <-c.semaphore }()
case <-ctx.Done():
errCh <- ctx.Err()
return
}
err := c.withRetry(ctx, func() error {
return c.doStream(ctx, req, textCh)
})
if err != nil {
errCh <- err
}
}()
return textCh, errCh
}
func (c *Client) doStream(ctx context.Context, req Request, textCh chan<- string) error {
body, err := json.Marshal(req)
if err != nil {
return fmt.Errorf("marshal request: %w", err)
}
httpReq, err := http.NewRequestWithContext(ctx, "POST", anthropicAPIURL, bytes.NewReader(body))
if err != nil {
return fmt.Errorf("create request: %w", err)
}
c.setHeaders(httpReq)
httpResp, err := c.httpClient.Do(httpReq)
if err != nil {
return fmt.Errorf("send request: %w", err)
}
defer httpResp.Body.Close()
if err := c.checkStatusCode(httpResp); err != nil {
return err
}
// 逐行解析 SSE 流
scanner := bufio.NewScanner(httpResp.Body)
// 增大缓冲区,防止长行截断
scanner.Buffer(make([]byte, 64*1024), 64*1024)
for scanner.Scan() {
line := scanner.Text()
// SSE 格式:以 "data: " 开头
if !strings.HasPrefix(line, "data: ") {
continue
}
data := strings.TrimPrefix(line, "data: ")
// 流结束标记
if data == "[DONE]" {
return nil
}
var event StreamEvent
if err := json.Unmarshal([]byte(data), &event); err != nil {
// 某些行可能是心跳或其他格式,跳过
continue
}
// 处理错误事件
if event.Type == "error" && event.Error != nil {
return fmt.Errorf("stream error: %s: %s", event.Error.Type, event.Error.Message)
}
// 提取文本内容
if event.Type == "content_block_delta" &&
event.Delta != nil &&
event.Delta.Type == "text_delta" {
select {
case textCh <- event.Delta.Text:
case <-ctx.Done():
return ctx.Err()
}
}
}
if err := scanner.Err(); err != nil {
return fmt.Errorf("scan stream: %w", err)
}
return nil
}
func (c *Client) setHeaders(req *http.Request) {
req.Header.Set("Content-Type", "application/json")
req.Header.Set("x-api-key", c.apiKey)
req.Header.Set("anthropic-version", anthropicVersion)
}
// APIError 代表一个可区分重试策略的 API 错误
type APIError struct {
StatusCode int
Message string
Retryable bool
}
func (e *APIError) Error() string {
return fmt.Sprintf("API error %d: %s", e.StatusCode, e.Message)
}
func (c *Client) checkStatusCode(resp *http.Response) error {
if resp.StatusCode == http.StatusOK {
return nil
}
body, _ := io.ReadAll(resp.Body)
retryable := resp.StatusCode == 429 ||
resp.StatusCode == 529 ||
resp.StatusCode >= 500
return &APIError{
StatusCode: resp.StatusCode,
Message: string(body),
Retryable: retryable,
}
}
// withRetry 使用指数退避重试,带 jitter
func (c *Client) withRetry(ctx context.Context, fn func() error) error {
var lastErr error
for attempt := 0; attempt <= c.maxRetries; attempt++ {
if attempt > 0 {
// 计算等待时间:base * 2^(attempt-1) + jitter
base := time.Duration(math.Pow(2, float64(attempt-1))) * time.Second
jitter := time.Duration(rand.Int63n(int64(base / 5)))
waitTime := base + jitter
// 最长等待 60 秒
if waitTime > 60*time.Second {
waitTime = 60 * time.Second
}
select {
case <-time.After(waitTime):
case <-ctx.Done():
return ctx.Err()
}
}
lastErr = fn()
if lastErr == nil {
return nil
}
// 判断是否可重试
var apiErr *APIError
if errors.As(lastErr, &apiErr) && !apiErr.Retryable {
return lastErr // 不可重试错误,立即返回
}
if attempt < c.maxRetries {
// 记录重试日志
log.Printf("attempt %d failed: %v, retrying...", attempt+1, lastErr)
}
}
return fmt.Errorf("max retries exceeded: %w", lastErr)
}
将 LLM 流式响应代理给浏览器
package handler
import (
"encoding/json"
"fmt"
"net/http"
)
type CompletionRequest struct {
Messages []llm.Message `json:"messages"`
System string `json:"system"`
}
// StreamHandler 将 LLM 的 SSE 响应实时转发给浏览器
func (h *Handler) StreamHandler(w http.ResponseWriter, r *http.Request) {
// 解析请求
var req CompletionRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, "invalid request", http.StatusBadRequest)
return
}
// 验证用户(JWT 中间件已验证,此处获取用户 ID 用于限速)
claims, ok := middleware.GetClaims(r.Context())
if !ok {
http.Error(w, "unauthorized", http.StatusUnauthorized)
return
}
// 用户级别的速率限制(防止单个用户消耗过多 Token)
if !h.userRateLimiter.Allow(claims.UserID) {
http.Error(w, "rate limit exceeded", http.StatusTooManyRequests)
return
}
// 设置 SSE 响应头
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
// 允许跨域(如果需要)
w.Header().Set("Access-Control-Allow-Origin", "*")
// 获取 http.Flusher,用于立即将数据发送给客户端
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "streaming not supported", http.StatusInternalServerError)
return
}
// 发送流式请求
llmReq := llm.Request{
Model: "claude-opus-4-5",
MaxTokens: 2048,
System: req.System,
Messages: req.Messages,
}
textCh, errCh := h.llmClient.StreamComplete(r.Context(), llmReq)
// 将每个 token 立即发送给浏览器
for {
select {
case text, ok := <-textCh:
if !ok {
// channel 关闭,流式传输完成
// 发送结束事件
fmt.Fprintf(w, "data: [DONE]\n\n")
flusher.Flush()
return
}
// 将 token 包装成 SSE 格式发送
data, _ := json.Marshal(map[string]string{"text": text})
fmt.Fprintf(w, "data: %s\n\n", data)
flusher.Flush() // 立即推送给客户端,不等缓冲区满
case err := <-errCh:
if err != nil {
// 发送错误事件给客户端
errData, _ := json.Marshal(map[string]string{"error": err.Error()})
fmt.Fprintf(w, "event: error\ndata: %s\n\n", errData)
flusher.Flush()
}
return
case <-r.Context().Done():
// 客户端断开连接(关闭浏览器标签等)
// context 取消会传播到 LLM 请求,停止消耗 Token
return
}
}
}
前端消费 SSE 的 JavaScript 代码:
const eventSource = new EventSource('/api/stream', {
// POST 请求需要用 fetch + ReadableStream,不能直接用 EventSource
});
// 或者用 fetch API(更灵活):
async function streamCompletion(messages) {
const response = await fetch('/api/stream', {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify({messages}),
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
while (true) {
const {done, value} = await reader.read();
if (done) break;
buffer += decoder.decode(value, {stream: true});
const lines = buffer.split('\n');
buffer = lines.pop(); // 保留不完整的行
for (const line of lines) {
if (line.startsWith('data: ')) {
const data = line.slice(6);
if (data === '[DONE]') return;
try {
const parsed = JSON.parse(data);
appendToUI(parsed.text); // 实时显示文本
} catch {}
}
}
}
}
Redis 缓存常见响应
对于相同或相似的提示词,可以缓存响应以减少延迟和成本:
package cache
import (
"context"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"fmt"
"time"
"github.com/redis/go-redis/v9"
)
type LLMCache struct {
redis *redis.Client
ttl time.Duration
}
func NewLLMCache(redis *redis.Client, ttl time.Duration) *LLMCache {
return &LLMCache{redis: redis, ttl: ttl}
}
// cacheKey 基于请求内容生成缓存键
// 注意:只对确定性请求(temperature=0)缓存,随机性请求不缓存
func (c *LLMCache) cacheKey(req llm.Request) string {
data, _ := json.Marshal(req)
hash := sha256.Sum256(data)
return "llm:cache:" + hex.EncodeToString(hash[:])
}
// Get 尝试从缓存获取响应
func (c *LLMCache) Get(ctx context.Context, req llm.Request) (*llm.Response, bool) {
// 只缓存确定性请求
if req.Temperature > 0 {
return nil, false
}
key := c.cacheKey(req)
data, err := c.redis.Get(ctx, key).Bytes()
if err != nil {
return nil, false
}
var resp llm.Response
if err := json.Unmarshal(data, &resp); err != nil {
return nil, false
}
return &resp, true
}
// Set 将响应存入缓存
func (c *LLMCache) Set(ctx context.Context, req llm.Request, resp *llm.Response) error {
if req.Temperature > 0 {
return nil // 不缓存非确定性响应
}
key := c.cacheKey(req)
data, err := json.Marshal(resp)
if err != nil {
return err
}
return c.redis.Set(ctx, key, data, c.ttl).Err()
}
// CachedClient 包装 LLM 客户端,添加缓存层
type CachedClient struct {
client *llm.Client
cache *LLMCache
}
func (c *CachedClient) Complete(ctx context.Context, req llm.Request) (*llm.Response, error) {
// 先查缓存
if cached, ok := c.cache.Get(ctx, req); ok {
return cached, nil
}
// 缓存未命中,调用 LLM
resp, err := c.client.Complete(ctx, req)
if err != nil {
return nil, err
}
// 将结果存入缓存(异步,不阻塞主请求)
go func() {
cacheCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
c.cache.Set(cacheCtx, req, resp)
}()
return resp, nil
}
并发 LLM 请求
package batch
import (
"context"
"sync"
)
type BatchResult struct {
Index int
Response *llm.Response
Error error
}
// BatchComplete 并发执行多个 LLM 请求,按原始顺序返回结果
func BatchComplete(ctx context.Context, client *llm.Client, requests []llm.Request) []BatchResult {
results := make([]BatchResult, len(requests))
var wg sync.WaitGroup
for i, req := range requests {
wg.Add(1)
go func(idx int, r llm.Request) {
defer wg.Done()
resp, err := client.Complete(ctx, r)
results[idx] = BatchResult{
Index: idx,
Response: resp,
Error: err,
}
}(i, req)
}
wg.Wait()
return results
}
// 实际使用:为 10 篇文章分别生成摘要
func summarizeArticles(ctx context.Context, client *llm.Client, articles []string) ([]string, error) {
requests := make([]llm.Request, len(articles))
for i, article := range articles {
requests[i] = llm.Request{
Model: "claude-haiku-4-5", // 用更快更便宜的模型做批处理
MaxTokens: 200,
System: "用 50 字以内总结以下文章。",
Messages: []llm.Message{
{Role: "user", Content: article},
},
}
}
results := BatchComplete(ctx, client, requests)
summaries := make([]string, len(articles))
for _, result := range results {
if result.Error != nil {
return nil, fmt.Errorf("article %d failed: %w", result.Index, result.Error)
}
if len(result.Response.Content) > 0 {
summaries[result.Index] = result.Response.Content[0].Text
}
}
return summaries, nil
}
Level 4 · 进阶与边界
多模型统一抽象
当你的应用需要同时支持多个 LLM 提供商时,定义统一接口是关键:
package llm
import "context"
// Provider 是所有 LLM 提供商的统一接口
type Provider interface {
// Complete 发送同步请求
Complete(ctx context.Context, req UnifiedRequest) (*UnifiedResponse, error)
// Stream 发送流式请求
Stream(ctx context.Context, req UnifiedRequest) (<-chan string, <-chan error)
// Name 返回提供商名称(用于日志和监控)
Name() string
}
// UnifiedRequest 是跨提供商的统一请求格式
type UnifiedRequest struct {
Model string
MaxTokens int
Temperature float64
System string
Messages []Message
}
// UnifiedResponse 是统一响应格式
type UnifiedResponse struct {
Content string
InputTokens int
OutputTokens int
Model string
Provider string
}
// AnthropicProvider 实现 Provider 接口
type AnthropicProvider struct {
client *Client
}
func (p *AnthropicProvider) Name() string { return "anthropic" }
func (p *AnthropicProvider) Complete(ctx context.Context, req UnifiedRequest) (*UnifiedResponse, error) {
anthropicReq := Request{
Model: req.Model,
MaxTokens: req.MaxTokens,
Temperature: req.Temperature,
System: req.System,
Messages: req.Messages,
}
resp, err := p.client.Complete(ctx, anthropicReq)
if err != nil {
return nil, err
}
content := ""
if len(resp.Content) > 0 {
content = resp.Content[0].Text
}
return &UnifiedResponse{
Content: content,
InputTokens: resp.Usage.InputTokens,
OutputTokens: resp.Usage.OutputTokens,
Model: resp.Model,
Provider: "anthropic",
}, nil
}
// OpenAIProvider 同样实现 Provider 接口
type OpenAIProvider struct {
client *openai.Client
}
func (p *OpenAIProvider) Name() string { return "openai" }
func (p *OpenAIProvider) Complete(ctx context.Context, req UnifiedRequest) (*UnifiedResponse, error) {
// 转换为 OpenAI 格式...
openaiMessages := convertMessages(req)
resp, err := p.client.Chat.Completions.New(ctx, openai.ChatCompletionNewParams{
Model: openai.F(req.Model),
MaxTokens: openai.F(int64(req.MaxTokens)),
Messages: openai.F(openaiMessages),
})
// ... 转换响应
}
// Router 实现模型路由(故障转移、负载均衡)
type Router struct {
providers map[string]Provider
fallback []string // 按优先级排列的 provider 名称
}
func NewRouter(providers ...Provider) *Router {
r := &Router{providers: make(map[string]Provider)}
for _, p := range providers {
r.providers[p.Name()] = p
r.fallback = append(r.fallback, p.Name())
}
return r
}
// Complete 带故障转移的请求:主 provider 失败时尝试备用
func (r *Router) Complete(ctx context.Context, req UnifiedRequest) (*UnifiedResponse, error) {
var lastErr error
for _, providerName := range r.fallback {
provider := r.providers[providerName]
resp, err := provider.Complete(ctx, req)
if err == nil {
return resp, nil
}
lastErr = err
log.Printf("provider %s failed: %v, trying next...", providerName, err)
}
return nil, fmt.Errorf("all providers failed: %w", lastErr)
}
工具调用(Function Calling / Tool Use)
现代 LLM 支持"工具调用":模型生成结构化的工具调用请求,应用执行工具并将结果返回给模型:
package tools
// Tool 定义可以被 LLM 调用的工具
type Tool struct {
Name string `json:"name"`
Description string `json:"description"`
InputSchema json.RawMessage `json:"input_schema"`
}
// 定义工具集
var availableTools = []Tool{
{
Name: "get_weather",
Description: "获取指定城市的当前天气",
InputSchema: json.RawMessage(`{
"type": "object",
"properties": {
"city": {"type": "string", "description": "城市名称"},
"unit": {"type": "string", "enum": ["celsius", "fahrenheit"]}
},
"required": ["city"]
}`),
},
{
Name: "search_database",
Description: "在产品数据库中搜索",
InputSchema: json.RawMessage(`{
"type": "object",
"properties": {
"query": {"type": "string"},
"limit": {"type": "integer", "default": 10}
},
"required": ["query"]
}`),
},
}
// ToolCallLoop 实现工具调用的完整循环
func ToolCallLoop(ctx context.Context, client *llm.Client, userMessage string) (string, error) {
messages := []llm.Message{
{Role: "user", Content: userMessage},
}
// 最多循环 10 次,防止无限循环
for i := 0; i < 10; i++ {
req := llm.Request{
Model: "claude-opus-4-5",
MaxTokens: 4096,
Messages: messages,
Tools: availableTools, // 告诉模型可用的工具
}
resp, err := client.Complete(ctx, req)
if err != nil {
return "", err
}
// 检查是否是工具调用
if resp.StopReason == "tool_use" {
// 执行所有工具调用
toolResults := executeToolCalls(ctx, resp.Content)
// 将工具结果追加到对话历史
messages = append(messages,
llm.Message{Role: "assistant", Content: resp.Content}, // 模型的工具调用请求
llm.Message{Role: "user", Content: toolResults}, // 工具执行结果
)
continue
}
// 模型返回了最终文本响应
if len(resp.Content) > 0 {
return resp.Content[0].Text, nil
}
}
return "", fmt.Errorf("tool call loop exceeded maximum iterations")
}
Prompt 模板:text/template
package prompt
import (
"bytes"
"text/template"
)
// PromptTemplate 管理 prompt 模板
type PromptTemplate struct {
tmpl *template.Template
}
func NewPromptTemplate(name, tmplStr string) (*PromptTemplate, error) {
tmpl, err := template.New(name).Parse(tmplStr)
if err != nil {
return nil, fmt.Errorf("parse template %s: %w", name, err)
}
return &PromptTemplate{tmpl: tmpl}, nil
}
func (p *PromptTemplate) Render(data any) (string, error) {
var buf bytes.Buffer
if err := p.tmpl.Execute(&buf, data); err != nil {
return "", fmt.Errorf("render template: %w", err)
}
return buf.String(), nil
}
// 使用示例
var articleSummaryTemplate = `请将以下{{.Language}}文章总结为{{.MaxWords}}字以内的摘要。
要求:
{{range .Requirements}}
- {{.}}
{{end}}
文章内容:
{{.Content}}`
func SummarizeArticle(ctx context.Context, client *llm.Client, article string) (string, error) {
tmpl, _ := NewPromptTemplate("summary", articleSummaryTemplate)
prompt, err := tmpl.Render(map[string]any{
"Language": "中文",
"MaxWords": 100,
"Requirements": []string{
"保留核心观点",
"使用客观语气",
"不要包含主观评价",
},
"Content": article,
})
if err != nil {
return "", err
}
resp, err := client.Complete(ctx, llm.Request{
Model: "claude-haiku-4-5",
MaxTokens: 300,
Messages: []llm.Message{{Role: "user", Content: prompt}},
})
if err != nil {
return "", err
}
return resp.Content[0].Text, nil
}
用录制的 Fixture 测试 LLM 调用
LLM 调用的测试面临两个挑战:1) 真实调用有延迟和成本;2) 响应不是确定性的,断言困难。解决方案是**录制-回放(Record-Replay)**模式:
package testing
import (
"encoding/json"
"os"
"path/filepath"
"testing"
)
// RecordingClient 包装真实客户端,录制请求/响应到文件
type RecordingClient struct {
real *llm.Client
fixtureDir string
record bool // true=录制模式,false=回放模式
}
func (c *RecordingClient) Complete(ctx context.Context, req llm.Request) (*llm.Response, error) {
fixturePath := c.fixturePath(req)
if !c.record {
// 回放模式:从文件读取录制的响应
data, err := os.ReadFile(fixturePath)
if err != nil {
return nil, fmt.Errorf("fixture not found at %s (run with -record to record): %w",
fixturePath, err)
}
var resp llm.Response
if err := json.Unmarshal(data, &resp); err != nil {
return nil, err
}
return &resp, nil
}
// 录制模式:调用真实 API,保存响应
resp, err := c.real.Complete(ctx, req)
if err != nil {
return nil, err
}
data, _ := json.MarshalIndent(resp, "", " ")
os.MkdirAll(filepath.Dir(fixturePath), 0755)
os.WriteFile(fixturePath, data, 0644)
return resp, nil
}
func (c *RecordingClient) fixturePath(req llm.Request) string {
hash := sha256.Sum256(mustMarshal(req))
return filepath.Join(c.fixtureDir, hex.EncodeToString(hash[:16])+".json")
}
// TestSummarize 使用 fixture 的测试
func TestSummarize(t *testing.T) {
record := os.Getenv("RECORD_FIXTURES") == "true"
client := &RecordingClient{
real: llm.NewClient(os.Getenv("ANTHROPIC_API_KEY"), 5, 3),
fixtureDir: "testdata/fixtures",
record: record,
}
result, err := SummarizeArticle(context.Background(), client, "Go 语言由 Google 开发...")
if err != nil {
t.Fatal(err)
}
if len(result) == 0 {
t.Error("expected non-empty summary")
}
// 更多断言...
}
# 录制 fixture(只需要在首次运行或 prompt 变更时执行)
RECORD_FIXTURES=true ANTHROPIC_API_KEY=sk-... go test ./...
# 日常运行:使用录制的 fixture,无需 API Key,无费用,极快
go test ./...
成本追踪中间件
package cost
import (
"context"
"sync/atomic"
"time"
)
// CostTracker 追踪每次 LLM 调用的 Token 用量和估算费用
type CostTracker struct {
db *sql.DB
// 原子计数器用于实时统计(不需要锁)
totalInputTokens atomic.Int64
totalOutputTokens atomic.Int64
}
// ModelPricing 每百万 token 的价格(美元)
var ModelPricing = map[string]struct{ Input, Output float64 }{
"claude-opus-4-5": {Input: 15.0, Output: 75.0},
"claude-sonnet-4-5": {Input: 3.0, Output: 15.0},
"claude-haiku-4-5": {Input: 0.25, Output: 1.25},
}
func (t *CostTracker) TrackUsage(ctx context.Context, model string, usage llm.Usage, userID int64) {
t.totalInputTokens.Add(int64(usage.InputTokens))
t.totalOutputTokens.Add(int64(usage.OutputTokens))
pricing, ok := ModelPricing[model]
if !ok {
return
}
cost := float64(usage.InputTokens)/1e6*pricing.Input +
float64(usage.OutputTokens)/1e6*pricing.Output
// 异步写入数据库,不阻塞主请求
go func() {
_, err := t.db.ExecContext(ctx,
`INSERT INTO llm_usage (user_id, model, input_tokens, output_tokens, cost_usd, created_at)
VALUES (?, ?, ?, ?, ?, ?)`,
userID, model, usage.InputTokens, usage.OutputTokens, cost, time.Now(),
)
if err != nil {
log.Printf("failed to track LLM usage: %v", err)
}
}()
}
// GetStats 返回当前会话的使用统计
func (t *CostTracker) GetStats() (inputTokens, outputTokens int64) {
return t.totalInputTokens.Load(), t.totalOutputTokens.Load()
}
LLM API 集成是当前最激动人心的工程挑战之一:你需要处理慢速网络 I/O、流式协议解析、并发控制、成本管理和不可预测的模型行为——而这些恰好是 Go 最擅长的场景。本章构建的客户端库涵盖了生产级 LLM 服务的核心需求:可靠的重试逻辑、低延迟的流式代理、多模型抽象和可测试的架构。下一步是将这些组件集成到实际的产品功能中,记住:LLM 是工具,不是魔法——正确的工程实践让它成为可靠的服务,而不是随机的黑盒。