Chapter 49

Calling LLM APIs: Streaming and Error Handling

In March 2023, the ChatGPT API opened to the public. Within three months, tens of thousands of developers had started building LLM-powered applications, and most first versions shared the same problem: the user clicked "Generate," stared at a blank screen for 8, 15, sometimes 30 seconds, and then all the text appeared at once. The wait was self-inflicted. The model generates its output token by token; the developers were simply waiting for the complete response before sending anything to the client.

Go is well suited to solving this problem. The http.Flusher interface from net/http, bufio.Scanner's incremental reads, and lightweight goroutines together let a Go service forward each LLM token to the browser as soon as it arrives, while still handling timeouts, retries, and concurrency limits.

This chapter builds a production-grade LLM client library, covering everything from basic Anthropic Claude API calls to a full multi-model abstraction layer.

Level 1 · What You Need to Know

The LLM API Landscape

The major LLM API providers and their Go ecosystem:

Anthropic Claude: Founded by former OpenAI researchers, renowned for safety and long context windows. Claude 3.5 Sonnet excels at coding, analysis, and writing. The API uses a Messages format supporting system prompts, multi-turn conversations, tool use, and streaming output.

OpenAI GPT: The most widely used LLM API; GPT-4o is its flagship model. There is an official Go SDK (github.com/openai/openai-go). The Chat Completions API shares design goals with Claude's but differs in details.

Google Gemini: Google's multimodal model supporting text, image, video, and audio input. Accessed via google.golang.org/genai.

Local Models (Ollama): ollama.com lets you run Llama 3, Mistral, and other open-source models locally. Ollama exposes an OpenAI-compatible API, so Go code written against that API usually needs only a different base URL and model name.

Why Go Is Exceptional for LLM Services

Building LLM-powered services poses unique challenges that Go's design directly addresses:

High latency and concurrency demands: LLM calls typically take 3-30 seconds, far longer than conventional API calls, so under high traffic many requests sit in the "waiting for the LLM" state at the same time. A goroutine starts with a 2KB stack that grows on demand, which lets a Go service keep tens of thousands of LLM requests in flight. A thread-per-request design pays for a full OS thread per waiting request, typically with a megabyte or more of reserved stack, and runs out of memory much sooner.

Streaming I/O processing: An LLM streaming response is fundamentally a continuous byte stream. Go's io.Reader interface and bufio package are the standard tools for this type of stream; standard library support means no extra dependencies.

Single binary deployment: LLM services need multi-region deployment (to reduce latency) and rapid scaling (to handle traffic spikes). Go's single-binary deployment is a major advantage in containerized and Kubernetes environments.

Context propagation: When a user closes their browser tab, the outstanding LLM API request should be cancelled immediately to avoid paying for tokens nobody will read. Go's context.Context carries that cancellation from the incoming HTTP request through to the outbound LLM API call.

Token Economics: Understanding Cost

LLM APIs bill by the token. One token is roughly 3-4 English characters, or 1.5-2 CJK characters; the exact ratio depends on the model's tokenizer.

Claude 3.5 Sonnet (2024 pricing, for reference):
- Input:  $3 / million tokens
- Output: $15 / million tokens

A typical "summarize article" request:
- Article content:   2000 tokens (input)
- System prompt:      200 tokens (input)
- Summary output:     300 tokens (output)
- Cost: (2200 × $3 + 300 × $15) / 1,000,000 ≈ $0.011

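Token cost management is critical when serving B2C applications, because per-request costs multiply with traffic: at about $0.011 per request, one million summaries a day cost roughly $11,000 a day.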


Level 2 · Principles and Mechanisms

Anthropic Claude API: Message Format

The Claude API uses a Messages format supporting multi-turn conversations:

POST https://api.anthropic.com/v1/messages
{
    "model": "claude-opus-4-5",
    "max_tokens": 2048,
    "temperature": 0.7,
    "system": "You are a professional Go programming teacher. Answer questions concisely and accurately.",
    "messages": [
        {"role": "user", "content": "Explain the difference between channels and mutexes in Go"},
        {"role": "assistant", "content": "Channels and mutexes both handle concurrency..."},
        {"role": "user", "content": "Can you give me a concrete code example?"}
    ]
}

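Parameter reference:

- model: the ID of the model to call.
- max_tokens: required; the maximum number of tokens to generate. If the limit is reached, stop_reason is "max_tokens".
- temperature: sampling randomness from 0.0 to 1.0 (default 1.0). Lower values give more focused, repeatable output.
- system: the system prompt, passed as a top-level field.
- messages: the conversation so far, as alternating user and assistant turns.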

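Differences from OpenAI:

- The system prompt is a top-level system field, not a message with role "system".
- max_tokens is required.
- Authentication uses an x-api-key header plus an anthropic-version header instead of Authorization: Bearer.
- The response carries an array of typed content blocks (text, tool_use) rather than choices[0].message.content.
- Streaming events are typed (message_start, content_block_delta, message_stop, and so on), and there is no data: [DONE] sentinel.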

SSE Streaming Responses: The Underlying Mechanism

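Server-Sent Events (SSE) is a simple protocol over persistent HTTP connections. The server keeps the connection open and continuously sends formatted text data. An abridged Claude stream (message_start, content_block_start, and other events omitted) looks like this:

event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hello"}}

event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"!"}}

event: message_stop
data: {"type":"message_stop"}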

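SSE format rules:

- Each line has the form field: value; the spec defines the fields event, data, id, and retry.
- A blank line ends an event and dispatches it.
- If an event has several data lines, their values are joined with newlines.
- Lines beginning with a colon are comments, often sent as keep-alive heartbeats.
- Claude's stream names each event with an event: line, repeats the name as "type" in the JSON payload, and ends with a message_stop event. OpenAI's stream instead ends with the literal line data: [DONE].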

At the HTTP level, the server must set:

Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive

On the client side, Go's net/http returns from Do as soon as the response headers arrive; resp.Body is an io.Reader that yields bytes as the server sends them. Your code reads it incrementally until it sees the terminal event or the server closes the connection.

Exponential Backoff Retry

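LLM APIs encounter several kinds of transient errors:

- 429 Too Many Requests: a rate limit was exceeded.
- 529 Overloaded: Anthropic-specific; the API is temporarily overloaded.
- 500, 502, 503: server-side failures.
- Network errors: connection resets, timeouts, and streams cut off mid-response.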

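Exponential backoff is the standard strategy for these errors: each retry doubles the wait time, giving an overloaded server progressively more room to recover. Random jitter is added so that clients that failed at the same moment don't all retry in lockstep (a "thundering herd"):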

Retry 1: wait 1s
Retry 2: wait 2s
Retry 3: wait 4s
Retry 4: wait 8s
...plus random jitter (±20%) to prevent multiple instances retrying in sync

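Retries must distinguish between:

- Retryable errors: 429, 529, other 5xx responses, and network errors. A later attempt may succeed.
- Non-retryable errors: 400 (invalid request), 401 (invalid API key), 403 (permission denied), 404 (unknown model or endpoint), 413 (request too large). Retrying these only wastes time and quota.
- Cancellation: when the caller's context is cancelled, stop immediately instead of retrying.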

Concurrency Control: The Semaphore Pattern

When multiple goroutines call LLM APIs concurrently, you need to cap the maximum concurrency (otherwise you may trigger rate limits). The most idiomatic Go approach is a buffered channel as a semaphore:

// A channel with capacity 10 allows at most 10 concurrent requests
sem := make(chan struct{}, 10)

// Acquire semaphore (blocks if full)
sem <- struct{}{}

// Do work
doWork()

// Release semaphore
<-sem

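This pattern is clean and requires no extra dependencies. Size the semaphore from your provider's limits and your own load testing. Anthropic, for example, expresses rate limits as requests and tokens per minute per organization, varying by usage tier, rather than as a fixed number of concurrent requests, so the right concurrency depends on how long your typical request runs.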


Level 3 · Code Practice

Complete Anthropic Claude Client

package llm

import (
    "bufio"
    "bytes"
    "context"
    "encoding/json"
    "errors"
    "fmt"
    "io"
    "log"
    "math"
    "math/rand"
    "net/http"
    "strings"
    "time"
)

const anthropicAPIURL = "https://api.anthropic.com/v1/messages"
const anthropicVersion = "2023-06-01"

// Message represents a single message in the conversation
type Message struct {
    Role    string `json:"role"`    // "user" or "assistant"
    Content string `json:"content"`
}

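// Request is the body sent to the Claude API
type Request struct {
    Model       string    `json:"model"`
    MaxTokens   int       `json:"max_tokens"`
    System      string    `json:"system,omitempty"`
    Messages    []Message `json:"messages"`
    Stream      bool      `json:"stream"`
    // No omitempty: with it, a Temperature of 0 would be dropped from the JSON
    // and the API default (1.0) would apply. As written, an unset field sends 0.
    Temperature float64 `json:"temperature"`
}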

// Usage records token consumption
type Usage struct {
    InputTokens  int `json:"input_tokens"`
    OutputTokens int `json:"output_tokens"`
}

// Response is the structure of a non-streaming response
type Response struct {
    ID      string `json:"id"`
    Content []struct {
        Type string `json:"type"`
        Text string `json:"text"`
    } `json:"content"`
    Usage      Usage  `json:"usage"`
    Model      string `json:"model"`
    StopReason string `json:"stop_reason"`
}

// StreamEvent represents a single event in the streaming response
type StreamEvent struct {
    Type  string `json:"type"`
    Delta *struct {
        Type string `json:"type"`
        Text string `json:"text"`
    } `json:"delta,omitempty"`
    Usage *Usage `json:"usage,omitempty"`
    Error *struct {
        Type    string `json:"type"`
        Message string `json:"message"`
    } `json:"error,omitempty"`
}

// Client is the Anthropic API client
type Client struct {
    apiKey     string
    httpClient *http.Client
    semaphore  chan struct{} // concurrency control semaphore
    maxRetries int
}

// NewClient creates a new Client
func NewClient(apiKey string, maxConcurrency, maxRetries int) *Client {
    return &Client{
        apiKey: apiKey,
        httpClient: &http.Client{
            Timeout: 5 * time.Minute, // covers the whole exchange, including reading a streamed body; use ctx for per-call deadlines
        },
        semaphore:  make(chan struct{}, maxConcurrency),
        maxRetries: maxRetries,
    }
}

// Complete sends a non-streaming request and returns the full response
func (c *Client) Complete(ctx context.Context, req Request) (*Response, error) {
    req.Stream = false

    // Acquire semaphore
    select {
    case c.semaphore <- struct{}{}:
        defer func() { <-c.semaphore }()
    case <-ctx.Done():
        return nil, ctx.Err()
    }

    var resp *Response
    err := c.withRetry(ctx, func() error {
        var err error
        resp, err = c.doComplete(ctx, req)
        return err
    })
    return resp, err
}

func (c *Client) doComplete(ctx context.Context, req Request) (*Response, error) {
    body, err := json.Marshal(req)
    if err != nil {
        return nil, fmt.Errorf("marshal request: %w", err)
    }

    httpReq, err := http.NewRequestWithContext(ctx, "POST", anthropicAPIURL, bytes.NewReader(body))
    if err != nil {
        return nil, fmt.Errorf("create request: %w", err)
    }

    c.setHeaders(httpReq)

    httpResp, err := c.httpClient.Do(httpReq)
    if err != nil {
        return nil, fmt.Errorf("send request: %w", err)
    }
    defer httpResp.Body.Close()

    if err := c.checkStatusCode(httpResp); err != nil {
        return nil, err
    }

    var resp Response
    if err := json.NewDecoder(httpResp.Body).Decode(&resp); err != nil {
        return nil, fmt.Errorf("decode response: %w", err)
    }
    return &resp, nil
}

// StreamComplete sends a streaming request and delivers each text fragment on textCh.
// textCh is closed when the stream ends; errCh then holds at most one error.
func (c *Client) StreamComplete(ctx context.Context, req Request) (<-chan string, <-chan error) {
    textCh := make(chan string, 100) // absorbs bursts; a slow consumer still applies backpressure
    errCh := make(chan error, 1)

    req.Stream = true

    go func() {
        defer close(textCh)
        defer close(errCh)

        // Acquire semaphore
        select {
        case c.semaphore <- struct{}{}:
            defer func() { <-c.semaphore }()
        case <-ctx.Done():
            errCh <- ctx.Err()
            return
        }

        err := c.withRetry(ctx, func() error {
            sent, err := c.doStream(ctx, req, textCh)
            if err != nil && sent > 0 && ctx.Err() == nil {
                // Fragments already reached the consumer. A retry would replay the
                // response from the beginning and duplicate text, so give up instead.
                return &APIError{Message: "stream interrupted after partial output: " + err.Error()}
            }
            return err
        })
        if err != nil {
            errCh <- err
        }
    }()

    return textCh, errCh
}

// doStream performs one streaming attempt and reports how many fragments it sent.
func (c *Client) doStream(ctx context.Context, req Request, textCh chan<- string) (int, error) {
    body, err := json.Marshal(req)
    if err != nil {
        return 0, fmt.Errorf("marshal request: %w", err)
    }

    httpReq, err := http.NewRequestWithContext(ctx, "POST", anthropicAPIURL, bytes.NewReader(body))
    if err != nil {
        return 0, fmt.Errorf("create request: %w", err)
    }

    c.setHeaders(httpReq)

    httpResp, err := c.httpClient.Do(httpReq)
    if err != nil {
        return 0, fmt.Errorf("send request: %w", err)
    }
    defer httpResp.Body.Close()

    if err := c.checkStatusCode(httpResp); err != nil {
        return 0, err
    }

    // Parse the SSE stream line by line
    scanner := bufio.NewScanner(httpResp.Body)
    // The default maximum line length is 64KB; allow up to 1MB so an unusually
    // long data line doesn't abort the stream with bufio.ErrTooLong.
    scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)

    sent := 0
    for scanner.Scan() {
        // Only data lines carry payloads. event:, comment, and blank lines are
        // skipped because each JSON payload repeats its event type.
        data, ok := strings.CutPrefix(scanner.Text(), "data: ")
        if !ok {
            continue
        }

        var event StreamEvent
        if err := json.Unmarshal([]byte(data), &event); err != nil {
            continue // tolerate payloads we can't parse
        }

        switch event.Type {
        case "error":
            if event.Error != nil {
                return sent, fmt.Errorf("stream error: %s: %s", event.Error.Type, event.Error.Message)
            }
        case "message_stop":
            return sent, nil // normal end of stream
        case "content_block_delta":
            if event.Delta != nil && event.Delta.Type == "text_delta" {
                select {
                case textCh <- event.Delta.Text:
                    sent++
                case <-ctx.Done():
                    return sent, ctx.Err()
                }
            }
        }
    }

    if err := scanner.Err(); err != nil {
        return sent, fmt.Errorf("scan stream: %w", err)
    }
    // The connection closed without message_stop: the stream was cut short.
    return sent, fmt.Errorf("stream ended before message_stop: %w", io.ErrUnexpectedEOF)
}

func (c *Client) setHeaders(req *http.Request) {
    req.Header.Set("Content-Type", "application/json")
    req.Header.Set("x-api-key", c.apiKey)
    req.Header.Set("anthropic-version", anthropicVersion)
}

// APIError represents an API error with retry policy information
type APIError struct {
    StatusCode int
    Message    string
    Retryable  bool
}

func (e *APIError) Error() string {
    return fmt.Sprintf("API error %d: %s", e.StatusCode, e.Message)
}

func (c *Client) checkStatusCode(resp *http.Response) error {
    if resp.StatusCode == http.StatusOK {
        return nil
    }

    body, _ := io.ReadAll(io.LimitReader(resp.Body, 64<<10)) // error bodies are small; cap the read

    retryable := resp.StatusCode == 429 ||
        resp.StatusCode == 529 ||
        resp.StatusCode >= 500

    return &APIError{
        StatusCode: resp.StatusCode,
        Message:    string(body),
        Retryable:  retryable,
    }
}

// withRetry retries fn with exponential backoff (1s, 2s, 4s, ... capped at 60s)
// and ±20% uniform jitter.
func (c *Client) withRetry(ctx context.Context, fn func() error) error {
    const maxBackoff = 60 * time.Second
    var lastErr error

    for attempt := 0; attempt <= c.maxRetries; attempt++ {
        if attempt > 0 {
            // Clamp the exponent so large attempt counts can't overflow Duration.
            exp := math.Min(float64(attempt-1), 6)
            base := time.Duration(math.Pow(2, exp)) * time.Second
            if base > maxBackoff {
                base = maxBackoff
            }
            // jitter is uniform in [-base/5, +base/5]
            jitter := time.Duration(rand.Int63n(int64(base)*2/5+1)) - base/5

            timer := time.NewTimer(base + jitter)
            select {
            case <-timer.C:
            case <-ctx.Done():
                timer.Stop()
                return ctx.Err()
            }
        }

        lastErr = fn()
        if lastErr == nil {
            return nil
        }
        if ctx.Err() != nil {
            return ctx.Err() // cancelled: never retry
        }

        // Non-retryable API errors (400, 401, ...) are returned immediately
        var apiErr *APIError
        if errors.As(lastErr, &apiErr) && !apiErr.Retryable {
            return lastErr
        }

        if attempt < c.maxRetries {
            log.Printf("attempt %d failed: %v, retrying...", attempt+1, lastErr)
        }
    }

    return fmt.Errorf("max retries exceeded: %w", lastErr)
}

Proxying LLM Streaming Responses to the Browser

package handler

import (
    "encoding/json"
    "fmt"
    "net/http"

    // Hypothetical module paths; substitute your own.
    "example.com/app/llm"
    "example.com/app/middleware"
)

type CompletionRequest struct {
    Messages []llm.Message `json:"messages"`
    System   string        `json:"system"`
}

// StreamHandler forwards LLM output to the browser as SSE in real time.
// Handler (defined elsewhere) holds llmClient *llm.Client and userRateLimiter.
func (h *Handler) StreamHandler(w http.ResponseWriter, r *http.Request) {
    // Check streaming support before committing to an SSE response
    flusher, ok := w.(http.Flusher)
    if !ok {
        http.Error(w, "streaming not supported", http.StatusInternalServerError)
        return
    }

    var req CompletionRequest
    if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
        http.Error(w, "invalid request", http.StatusBadRequest)
        return
    }

    // JWT middleware has already validated auth; get user ID for rate limiting
    claims, ok := middleware.GetClaims(r.Context())
    if !ok {
        http.Error(w, "unauthorized", http.StatusUnauthorized)
        return
    }

    // Per-user rate limiting to prevent excessive token consumption
    if !h.userRateLimiter.Allow(claims.UserID) {
        http.Error(w, "rate limit exceeded", http.StatusTooManyRequests)
        return
    }

    // Set SSE response headers
    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")
    w.Header().Set("Connection", "keep-alive")
    w.Header().Set("Access-Control-Allow-Origin", "*")

    llmReq := llm.Request{
        Model:     "claude-opus-4-5",
        MaxTokens: 2048,
        System:    req.System,
        Messages:  req.Messages,
    }

    textCh, errCh := h.llmClient.StreamComplete(r.Context(), llmReq)

    // Forward each fragment as soon as it arrives. StreamComplete closes textCh
    // when the stream ends for any reason. If the client disconnects, r.Context()
    // is cancelled, which aborts the upstream request and stops token consumption.
    for text := range textCh {
        data, _ := json.Marshal(map[string]string{"text": text})
        fmt.Fprintf(w, "data: %s\n\n", data)
        flusher.Flush() // Push immediately without waiting for buffer to fill
    }

    // textCh is drained, so errCh now holds the final result (nil on success).
    // Selecting on both channels at once could pick the closed errCh while
    // text is still buffered, dropping output.
    if err := <-errCh; err != nil {
        if r.Context().Err() != nil {
            return // client already gone; nothing to report
        }
        errData, _ := json.Marshal(map[string]string{"error": err.Error()})
        fmt.Fprintf(w, "event: error\ndata: %s\n\n", errData)
        flusher.Flush()
        return
    }

    fmt.Fprint(w, "data: [DONE]\n\n")
    flusher.Flush()
}

The browser's EventSource API only issues GET requests, so the JavaScript client reads the POST response body directly with fetch:

async function streamCompletion(messages) {
    const response = await fetch('/api/stream', {
        method: 'POST',
        headers: {'Content-Type': 'application/json'},
        body: JSON.stringify({messages}),
    });
    if (!response.ok) {
        throw new Error(`HTTP ${response.status}`); // rate limit, auth failure, etc.
    }

    const reader = response.body.getReader();
    const decoder = new TextDecoder();
    let buffer = '';

    while (true) {
        const {done, value} = await reader.read();
        if (done) break;

        buffer += decoder.decode(value, {stream: true});
        const lines = buffer.split('\n');
        buffer = lines.pop(); // Keep incomplete last line

        for (const line of lines) {
            if (!line.startsWith('data: ')) continue;
            const data = line.slice(6);
            if (data === '[DONE]') return;

            let parsed;
            try {
                parsed = JSON.parse(data);
            } catch {
                continue; // Skip malformed lines
            }
            if (parsed.error) throw new Error(parsed.error); // Server sent an error event
            appendToUI(parsed.text); // Show text as it arrives
        }
    }
}

Redis Caching for Common Responses

For repeated identical requests, caching responses reduces latency and cost. The key below is a hash of the full request, so only exact repeats hit the cache:

package cache

import (
    "context"
    "crypto/sha256"
    "encoding/hex"
    "encoding/json"
    "log"
    "time"

    "github.com/redis/go-redis/v9"

    "example.com/app/llm" // hypothetical module path; substitute your own
)

type LLMCache struct {
    rdb *redis.Client
    ttl time.Duration
}

func NewLLMCache(rdb *redis.Client, ttl time.Duration) *LLMCache {
    return &LLMCache{rdb: rdb, ttl: ttl}
}

// cacheKey hashes the full request (model, prompts, parameters). json.Marshal
// emits struct fields in declaration order, so the key is stable.
func (c *LLMCache) cacheKey(req llm.Request) string {
    data, _ := json.Marshal(req)
    hash := sha256.Sum256(data)
    return "llm:cache:" + hex.EncodeToString(hash[:])
}

// Get attempts to retrieve a cached response. Only temperature-0 requests are
// cached: even these are not guaranteed to be fully deterministic, but their
// outputs are consistent enough to reuse.
func (c *LLMCache) Get(ctx context.Context, req llm.Request) (*llm.Response, bool) {
    if req.Temperature > 0 {
        return nil, false
    }

    key := c.cacheKey(req)
    data, err := c.rdb.Get(ctx, key).Bytes()
    if err != nil {
        return nil, false // redis.Nil (miss) or a Redis error: fall through to the LLM
    }

    var resp llm.Response
    if err := json.Unmarshal(data, &resp); err != nil {
        return nil, false
    }
    return &resp, true
}

// Set stores a response in the cache
func (c *LLMCache) Set(ctx context.Context, req llm.Request, resp *llm.Response) error {
    if req.Temperature > 0 {
        return nil // Don't cache non-deterministic responses
    }

    key := c.cacheKey(req)
    data, err := json.Marshal(resp)
    if err != nil {
        return err
    }
    return c.rdb.Set(ctx, key, data, c.ttl).Err()
}

// CachedClient wraps the LLM client with a caching layer
type CachedClient struct {
    client *llm.Client
    cache  *LLMCache
}

func (c *CachedClient) Complete(ctx context.Context, req llm.Request) (*llm.Response, error) {
    // Check cache first
    if cached, ok := c.cache.Get(ctx, req); ok {
        return cached, nil
    }

    // Cache miss: call the LLM
    resp, err := c.client.Complete(ctx, req)
    if err != nil {
        return nil, err
    }

    // Store asynchronously so the main request isn't blocked. A fresh context
    // is used because ctx may be cancelled as soon as this method returns.
    go func() {
        cacheCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
        defer cancel()
        if err := c.cache.Set(cacheCtx, req, resp); err != nil {
            log.Printf("llm cache set failed: %v", err)
        }
    }()

    return resp, nil
}

Concurrent LLM Requests

package batch

import (
    "context"
    "fmt"
    "sync"

    "example.com/app/llm" // hypothetical module path; substitute your own
)

type BatchResult struct {
    Index    int
    Response *llm.Response
    Error    error
}

// BatchComplete runs the requests concurrently and returns results in their original order.
// It starts one goroutine per request; the client's semaphore caps how many are in flight.
func BatchComplete(ctx context.Context, client *llm.Client, requests []llm.Request) []BatchResult {
    results := make([]BatchResult, len(requests))
    var wg sync.WaitGroup

    for i, req := range requests {
        wg.Add(1)
        go func(idx int, r llm.Request) {
            defer wg.Done()
            resp, err := client.Complete(ctx, r)
            results[idx] = BatchResult{
                Index:    idx,
                Response: resp,
                Error:    err,
            }
        }(i, req)
    }

    wg.Wait()
    return results
}

// summarizeArticles generates summaries for all articles concurrently
func summarizeArticles(ctx context.Context, client *llm.Client, articles []string) ([]string, error) {
    requests := make([]llm.Request, len(articles))
    for i, article := range articles {
        requests[i] = llm.Request{
            Model:     "claude-haiku-4-5", // Use faster, cheaper model for batch processing
            MaxTokens: 200,
            System:    "Summarize the following article in 50 words or fewer.",
            Messages: []llm.Message{
                {Role: "user", Content: article},
            },
        }
    }

    results := BatchComplete(ctx, client, requests)

    summaries := make([]string, len(articles))
    for _, result := range results {
        if result.Error != nil {
            return nil, fmt.Errorf("article %d failed: %w", result.Index, result.Error)
        }
        if len(result.Response.Content) > 0 {
            summaries[result.Index] = result.Response.Content[0].Text
        }
    }
    return summaries, nil
}

Level 4 · Advanced and Edge Cases

Multi-Model Unified Abstraction

When your application needs to support multiple LLM providers simultaneously, defining a unified interface is essential:

package llm

import (
    "context"
    "fmt"
    "log"
)

// Provider is the unified interface for all LLM providers
type Provider interface {
    // Complete sends a synchronous request
    Complete(ctx context.Context, req UnifiedRequest) (*UnifiedResponse, error)
    // Stream sends a streaming request
    Stream(ctx context.Context, req UnifiedRequest) (<-chan string, <-chan error)
    // Name returns the provider name (for logging and monitoring)
    Name() string
}

// UnifiedRequest is a cross-provider request format
type UnifiedRequest struct {
    Model       string
    MaxTokens   int
    Temperature float64
    System      string
    Messages    []Message
}

// UnifiedResponse is the unified response format
type UnifiedResponse struct {
    Content      string
    InputTokens  int
    OutputTokens int
    Model        string
    Provider     string
}

// AnthropicProvider implements the Provider interface
type AnthropicProvider struct {
    client *Client
}

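func (p *AnthropicProvider) Name() string { return "anthropic" }

// Stream adapts the unified request to the Anthropic client's streaming call.
// Without this method, AnthropicProvider would not satisfy the Provider interface.
func (p *AnthropicProvider) Stream(ctx context.Context, req UnifiedRequest) (<-chan string, <-chan error) {
    return p.client.StreamComplete(ctx, Request{
        Model:       req.Model,
        MaxTokens:   req.MaxTokens,
        Temperature: req.Temperature,
        System:      req.System,
        Messages:    req.Messages,
    })
}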

func (p *AnthropicProvider) Complete(ctx context.Context, req UnifiedRequest) (*UnifiedResponse, error) {
    anthropicReq := Request{
        Model:       req.Model,
        MaxTokens:   req.MaxTokens,
        Temperature: req.Temperature,
        System:      req.System,
        Messages:    req.Messages,
    }

    resp, err := p.client.Complete(ctx, anthropicReq)
    if err != nil {
        return nil, err
    }

    content := ""
    if len(resp.Content) > 0 {
        content = resp.Content[0].Text
    }

    return &UnifiedResponse{
        Content:      content,
        InputTokens:  resp.Usage.InputTokens,
        OutputTokens: resp.Usage.OutputTokens,
        Model:        resp.Model,
        Provider:     "anthropic",
    }, nil
}

// Router tries providers in priority order, failing over to the next one on error.
// UnifiedRequest.Model is provider-specific, so a production router would map a
// logical model name to each provider's own model ID before calling it.
type Router struct {
    providers map[string]Provider
    fallback  []string // Provider names in priority order
}

func NewRouter(providers ...Provider) *Router {
    r := &Router{providers: make(map[string]Provider)}
    for _, p := range providers {
        r.providers[p.Name()] = p
        r.fallback = append(r.fallback, p.Name())
    }
    return r
}

// Complete tries each provider in turn and returns the first success
func (r *Router) Complete(ctx context.Context, req UnifiedRequest) (*UnifiedResponse, error) {
    var lastErr error
    for _, providerName := range r.fallback {
        // Stop if the caller has gone away; trying more providers would only waste tokens
        if err := ctx.Err(); err != nil {
            return nil, err
        }
        provider := r.providers[providerName]
        resp, err := provider.Complete(ctx, req)
        if err == nil {
            return resp, nil
        }
        lastErr = err
        log.Printf("provider %s failed: %v, trying next...", providerName, err)
    }
    return nil, fmt.Errorf("all providers failed: %w", lastErr)
}

Tool Use (Function Calling)

Modern LLMs support tool use: the model emits structured tool call requests, the application executes the tools, and the results are sent back to the model in the next turn:

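package tools

import (
    "context"
    "encoding/json"
    "fmt"

    "example.com/app/llm" // hypothetical module path; substitute your own
)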

// Tool defines a tool that can be called by the LLM
type Tool struct {
    Name        string          `json:"name"`
    Description string          `json:"description"`
    InputSchema json.RawMessage `json:"input_schema"`
}

var availableTools = []Tool{
    {
        Name:        "get_weather",
        Description: "Get the current weather for a specified city",
        InputSchema: json.RawMessage(`{
            "type": "object",
            "properties": {
                "city": {"type": "string", "description": "City name"},
                "unit": {"type": "string", "enum": ["celsius", "fahrenheit"]}
            },
            "required": ["city"]
        }`),
    },
    {
        Name:        "search_database",
        Description: "Search the product database",
        InputSchema: json.RawMessage(`{
            "type": "object",
            "properties": {
                "query": {"type": "string"},
                "limit": {"type": "integer", "default": 10}
            },
            "required": ["query"]
        }`),
    },
}

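// ToolCallLoop implements the complete tool use cycle. This is a sketch: it
// assumes llm.Request has a Tools field of this Tool type, llm.Message.Content
// can hold content blocks as well as strings, and executeToolCalls (not shown)
// runs each tool_use block and returns the matching tool_result blocks.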
func ToolCallLoop(ctx context.Context, client *llm.Client, userMessage string) (string, error) {
    messages := []llm.Message{
        {Role: "user", Content: userMessage},
    }

    // Maximum 10 iterations to prevent infinite loops
    for i := 0; i < 10; i++ {
        req := llm.Request{
            Model:     "claude-opus-4-5",
            MaxTokens: 4096,
            Messages:  messages,
            Tools:     availableTools,
        }

        resp, err := client.Complete(ctx, req)
        if err != nil {
            return "", err
        }

        if resp.StopReason == "tool_use" {
            // Execute all tool calls
            toolResults := executeToolCalls(ctx, resp.Content)

            // Append tool results to conversation history
            messages = append(messages,
                llm.Message{Role: "assistant", Content: resp.Content}, // model's tool call request
                llm.Message{Role: "user", Content: toolResults},       // tool execution results
            )
            continue
        }

        // Model returned a final text response
        if len(resp.Content) > 0 {
            return resp.Content[0].Text, nil
        }
    }

    return "", fmt.Errorf("tool call loop exceeded maximum iterations")
}

Prompt Templating with text/template

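package prompt

import (
    "bytes"
    "context"
    "errors"
    "fmt"
    "text/template"

    "example.com/app/llm" // hypothetical module path; substitute your own
)

// Completer is satisfied by *llm.Client and by test doubles such as a
// recording/replaying client.
type Completer interface {
    Complete(ctx context.Context, req llm.Request) (*llm.Response, error)
}

// PromptTemplate manages prompt templates
type PromptTemplate struct {
    tmpl *template.Template
}

func NewPromptTemplate(name, tmplStr string) (*PromptTemplate, error) {
    // missingkey=error turns a misspelled map key into a render error
    // instead of the literal text "<no value>" in the prompt.
    tmpl, err := template.New(name).Option("missingkey=error").Parse(tmplStr)
    if err != nil {
        return nil, fmt.Errorf("parse template %s: %w", name, err)
    }
    return &PromptTemplate{tmpl: tmpl}, nil
}

func (p *PromptTemplate) Render(data any) (string, error) {
    var buf bytes.Buffer
    if err := p.tmpl.Execute(&buf, data); err != nil {
        return "", fmt.Errorf("render template: %w", err)
    }
    return buf.String(), nil
}

// The {{- markers trim the preceding newline, so the requirements render as a
// tight list with no blank lines between items.
var articleSummaryTemplate = `Please summarize the following {{.Language}} article in {{.MaxWords}} words or fewer.

Requirements:
{{- range .Requirements}}
- {{.}}
{{- end}}

Article content:
{{.Content}}`

// summaryTemplate is parsed once at startup, so a malformed template fails fast.
var summaryTemplate = mustTemplate("summary", articleSummaryTemplate)

func mustTemplate(name, tmplStr string) *PromptTemplate {
    t, err := NewPromptTemplate(name, tmplStr)
    if err != nil {
        panic(err)
    }
    return t
}

func SummarizeArticle(ctx context.Context, client Completer, article string) (string, error) {
    prompt, err := summaryTemplate.Render(map[string]any{
        "Language": "English",
        "MaxWords": 100,
        "Requirements": []string{
            "Preserve core arguments",
            "Use objective tone",
            "Exclude subjective evaluation",
        },
        "Content": article,
    })
    if err != nil {
        return "", err
    }

    resp, err := client.Complete(ctx, llm.Request{
        Model:     "claude-haiku-4-5",
        MaxTokens: 300,
        Messages:  []llm.Message{{Role: "user", Content: prompt}},
    })
    if err != nil {
        return "", err
    }
    if len(resp.Content) == 0 {
        return "", errors.New("empty response from model")
    }
    return resp.Content[0].Text, nil
}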

Testing LLM Calls with Recorded Fixtures

Testing LLM calls faces two challenges: (1) real calls have latency and cost; (2) responses are non-deterministic, making assertions difficult. A practical solution is the record/replay pattern: record real responses to fixture files once, then replay them in routine test runs:

// fixtures_test.go lives in package prompt so the test can call SummarizeArticle.
// For RecordingClient to be accepted, SummarizeArticle must take an interface such
// as Complete(context.Context, llm.Request) (*llm.Response, error) rather than
// *llm.Client.
package prompt

import (
    "context"
    "crypto/sha256"
    "encoding/hex"
    "encoding/json"
    "fmt"
    "os"
    "path/filepath"
    "testing"

    "example.com/app/llm" // hypothetical module path; substitute your own
)

// RecordingClient wraps the real client to record request/response pairs to disk
type RecordingClient struct {
    real       *llm.Client
    fixtureDir string
    record     bool // true=record mode, false=replay mode
}

func (c *RecordingClient) Complete(ctx context.Context, req llm.Request) (*llm.Response, error) {
    fixturePath := c.fixturePath(req)

    if !c.record {
        // Replay mode: read the recorded response from disk
        data, err := os.ReadFile(fixturePath)
        if err != nil {
            return nil, fmt.Errorf("fixture not found at %s (rerun with RECORD_FIXTURES=true): %w",
                fixturePath, err)
        }
        var resp llm.Response
        if err := json.Unmarshal(data, &resp); err != nil {
            return nil, fmt.Errorf("decode fixture %s: %w", fixturePath, err)
        }
        return &resp, nil
    }

    // Record mode: call the real API and save the response
    resp, err := c.real.Complete(ctx, req)
    if err != nil {
        return nil, err
    }

    data, err := json.MarshalIndent(resp, "", "  ")
    if err != nil {
        return nil, err
    }
    if err := os.MkdirAll(filepath.Dir(fixturePath), 0o755); err != nil {
        return nil, err
    }
    if err := os.WriteFile(fixturePath, data, 0o644); err != nil {
        return nil, fmt.Errorf("write fixture: %w", err)
    }

    return resp, nil
}

// fixturePath derives the file name from a hash of the request, so changing a
// prompt or parameter requires re-recording.
func (c *RecordingClient) fixturePath(req llm.Request) string {
    data, _ := json.Marshal(req)
    hash := sha256.Sum256(data)
    return filepath.Join(c.fixtureDir, hex.EncodeToString(hash[:16])+".json")
}

// TestSummarize uses a fixture for deterministic testing
func TestSummarize(t *testing.T) {
    record := os.Getenv("RECORD_FIXTURES") == "true"

    client := &RecordingClient{
        real:       llm.NewClient(os.Getenv("ANTHROPIC_API_KEY"), 5, 3),
        fixtureDir: "testdata/fixtures",
        record:     record,
    }

    result, err := SummarizeArticle(context.Background(), client, "Go was created by Google...")
    if err != nil {
        t.Fatal(err)
    }

    if len(result) == 0 {
        t.Error("expected non-empty summary")
    }
}

Record the fixtures once, then replay them on every later run:

# Record fixtures (needed only the first time, or whenever a prompt changes)
RECORD_FIXTURES=true ANTHROPIC_API_KEY=sk-... go test ./...

# Everyday runs: replay the recorded fixtures, with no API key, no cost, and no network latency
go test ./...

Cost Tracking Middleware

package cost

import (
    "context"
    "database/sql"
    "log"
    "sync/atomic"
    "time"

    "example.com/yourapp/llm" // hypothetical import path for the chapter's llm package
)

// CostTracker tracks token usage and estimated cost for each LLM call
type CostTracker struct {
    db                *sql.DB
    totalInputTokens  atomic.Int64
    totalOutputTokens atomic.Int64
}

// ModelPricing stores prices in USD per million tokens. Prices change over time;
// verify them against the provider's current pricing page before relying on them.
var ModelPricing = map[string]struct{ Input, Output float64 }{
    "claude-opus-4-5":   {Input: 5.0, Output: 25.0},
    "claude-sonnet-4-5": {Input: 3.0, Output: 15.0},
    "claude-haiku-4-5":  {Input: 1.0, Output: 5.0},
}

func (t *CostTracker) TrackUsage(ctx context.Context, model string, usage llm.Usage, userID int64) {
    t.totalInputTokens.Add(int64(usage.InputTokens))
    t.totalOutputTokens.Add(int64(usage.OutputTokens))

    pricing, ok := ModelPricing[model]
    if !ok {
        return
    }

    cost := float64(usage.InputTokens)/1e6*pricing.Input +
        float64(usage.OutputTokens)/1e6*pricing.Output

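    // Write to the database asynchronously so the main request isn't blocked.
    // The request's ctx is typically canceled as soon as the handler returns,
    // which would abort this insert, so detach from its cancellation
    // (context.WithoutCancel, Go 1.21+) and give the write its own timeout.
    dbCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 5*time.Second)
    go func() {
        defer cancel()
        // "?" placeholders suit MySQL and SQLite drivers; PostgreSQL drivers expect $1, $2, and so on.
        _, err := t.db.ExecContext(dbCtx,
            `INSERT INTO llm_usage (user_id, model, input_tokens, output_tokens, cost_usd, created_at)
             VALUES (?, ?, ?, ?, ?, ?)`,
            userID, model, usage.InputTokens, usage.OutputTokens, cost, time.Now(),
        )
        if err != nil {
            log.Printf("failed to track LLM usage: %v", err)
        }
    }()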
}

// GetStats returns the token totals this tracker has accumulated in memory since it
// was created; the llm_usage table holds the durable per-call history.
func (t *CostTracker) GetStats() (inputTokens, outputTokens int64) {
    return t.totalInputTokens.Load(), t.totalOutputTokens.Load()
}
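TrackUsage prices input and output tokens separately, at per-million-token rates. As a worked example at the Sonnet rates in the table, a call with 1,200 input tokens and 450 output tokens costs (1,200 / 10^6) × $3 + (450 / 10^6) × $15 = $0.0036 + $0.00675 = $0.01035. The sketch below isolates that formula so it can be unit-tested without a database; estimateCost and Price are hypothetical names, and the rates are illustrative.

```go
package main

import "fmt"

// Price holds USD per million tokens, the same units as the ModelPricing table.
type Price struct{ Input, Output float64 }

// estimateCost converts token counts to USD using per-million-token rates.
func estimateCost(p Price, inputTokens, outputTokens int) float64 {
	return float64(inputTokens)/1e6*p.Input + float64(outputTokens)/1e6*p.Output
}

func main() {
	sonnet := Price{Input: 3.0, Output: 15.0} // illustrative rates; check current pricing
	fmt.Printf("$%.5f\n", estimateCost(sonnet, 1200, 450))
}
```

Floating point is adequate for dashboards and alerts; for invoicing you might prefer an integer unit such as micro-dollars, so that summing many rows does not accumulate rounding error.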

Token Budget Management

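For applications that must enforce per-user token limits, keep the counters in Redis and update them with a Lua script. Redis executes a script atomically, so the limit check and the deduction cannot interleave across concurrent requests: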

package budget

import (
    "context"
    "errors"
    "fmt"
    "time"

    "github.com/redis/go-redis/v9"
)

var ErrBudgetExceeded = errors.New("token budget exceeded")

type BudgetManager struct {
    redis        *redis.Client
    dailyLimit   int64 // tokens per day per user
    monthlyLimit int64 // tokens per month per user
}

// budgetScript checks both limits and increments both counters as a single
// atomic step inside Redis. Defining it once at package level computes its
// SHA1 once; Script.Run tries EVALSHA first and falls back to EVAL if Redis
// has not cached the script yet.
var budgetScript = redis.NewScript(`
    local day_key = KEYS[1]
    local month_key = KEYS[2]
    local tokens = tonumber(ARGV[1])
    local daily_limit = tonumber(ARGV[2])
    local monthly_limit = tonumber(ARGV[3])

    local day_usage = tonumber(redis.call('GET', day_key) or '0')
    local month_usage = tonumber(redis.call('GET', month_key) or '0')

    if day_usage + tokens > daily_limit then
        return -1
    end
    if month_usage + tokens > monthly_limit then
        return -2
    end

    redis.call('INCRBY', day_key, tokens)
    redis.call('EXPIRE', day_key, 86400)  -- 24 hours TTL
    redis.call('INCRBY', month_key, tokens)
    redis.call('EXPIRE', month_key, 2678400)  -- 31 days TTL
    return 0
`)

// CheckAndDeduct atomically checks budget and deducts tokens.
// Returns ErrBudgetExceeded if the user has insufficient budget.
func (b *BudgetManager) CheckAndDeduct(ctx context.Context, userID int64, tokens int64) error {
    // Read the clock once so both keys describe the same instant, even at a month boundary.
    now := time.Now()
    dayKey := fmt.Sprintf("budget:daily:%d:%s", userID, now.Format("2006-01-02"))
    monthKey := fmt.Sprintf("budget:monthly:%d:%s", userID, now.Format("2006-01"))

    result, err := budgetScript.Run(ctx, b.redis,
        []string{dayKey, monthKey},
        tokens, b.dailyLimit, b.monthlyLimit,
    ).Int64()
    if err != nil {
        return fmt.Errorf("budget check: %w", err)
    }

    if result == -1 {
        return fmt.Errorf("%w: daily limit of %d tokens reached", ErrBudgetExceeded, b.dailyLimit)
    }
    if result == -2 {
        return fmt.Errorf("%w: monthly limit of %d tokens reached", ErrBudgetExceeded, b.monthlyLimit)
    }

    return nil
}

LLM API integration is one of the most exciting engineering challenges today: you must handle slow network I/O, streaming protocol parsing, concurrency control, cost management, and unpredictable model behavior—and these happen to be exactly the scenarios Go excels at. The client library built in this chapter covers the core requirements of a production LLM service: reliable retry logic, low-latency streaming proxying, multi-model abstraction, and a testable architecture. The next step is integrating these components into real product features. Remember: LLMs are tools, not magic—correct engineering practices turn them into reliable services, not random black boxes.
