Calling LLM APIs: Streaming and Error Handling
In March 2023, the ChatGPT API opened to the public. Within three months, tens of thousands of developers began building LLM-powered applications. Most first versions shared the same problem: the user clicked "Generate," then stared at a blank screen for 8 seconds, 15 seconds, sometimes 30, until all the text appeared at once. The experience was counterintuitive, because the LLM itself generates output token by token. The real problem was that developers waited for the complete response before sending anything to the client.
Go is well suited to this problem. The http.Flusher interface from net/http, streaming reads with bufio.Scanner, and lightweight goroutine concurrency together let Go forward every LLM token to the browser as it arrives, with minimal added latency, while handling timeouts, retries, and concurrency limits gracefully.
This chapter builds a production-grade LLM client library, covering everything from basic Anthropic Claude API calls to a full multi-model abstraction layer.
Level 1 · What You Need to Know
The LLM API Landscape
The major LLM API providers and their Go ecosystem:
Anthropic Claude: Founded by former OpenAI researchers, renowned for safety and long context windows. Claude 3.5 Sonnet excels at coding, analysis, and writing. The API uses a Messages format supporting system prompts, multi-turn conversations, tool use, and streaming output.
OpenAI GPT: The most widely used LLM API; GPT-4o is its flagship model. There is an official Go SDK (github.com/openai/openai-go). The Chat Completions API shares design goals with Claude's but differs in details.
Google Gemini: Google's multimodal model supporting text, image, video, and audio input. Accessed via google.golang.org/genai.
Local Models (Ollama): ollama.com lets you run Llama 3, Mistral, and other open-source models locally. Ollama exposes an OpenAI-compatible API; Go code requires almost no changes to switch.
Why Go Is Exceptional for LLM Services
Building LLM-powered services poses unique challenges that Go's design directly addresses:
High latency and concurrency demands: LLM calls typically take 3-30 seconds, far slower than conventional API calls. Under high traffic, many requests are simultaneously in the "waiting for LLM response" state. Go's goroutines (each starting with a stack of roughly 2 KB that grows on demand) let you maintain tens of thousands of concurrent LLM requests, whereas a thread-per-request design would consume far more memory in this scenario.
Streaming I/O processing: An LLM streaming response is fundamentally a continuous byte stream. Go's io.Reader interface and bufio package are the standard tools for this type of stream; standard library support means no extra dependencies.
Single binary deployment: LLM services need multi-region deployment (to reduce latency) and rapid scaling (to handle traffic spikes). Go's single-binary deployment is a major advantage in containerized and Kubernetes environments.
Context propagation: When a user closes their browser tab, the outstanding LLM API request should be immediately cancelled (saving token costs). Go's context.Context mechanism propagates request cancellation from the browser all the way through to the LLM API call, cleanly.
Token Economics: Understanding Cost
LLM APIs bill by the token. One token is approximately 3-4 English characters, or 1.5-2 CJK characters.
Claude 3.5 Sonnet (2024 pricing, for reference):
- Input: $3 / million tokens
- Output: $15 / million tokens
A typical "summarize article" request:
- Article content: 2000 tokens (input)
- System prompt: 200 tokens (input)
- Summary output: 300 tokens (output)
- Cost: (2200 × $3 + 300 × $15) / 1,000,000 ≈ $0.011
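The arithmetic above can be expressed as a small helper. This is a sketch only: the Pricing type and EstimateCost function are illustrative names, and the prices are the reference figures quoted in the text, not values to rely on.

```go
package main

import "fmt"

// Pricing holds USD prices per million tokens.
type Pricing struct {
	InputPerMTok  float64
	OutputPerMTok float64
}

// EstimateCost returns the estimated USD cost of a single request.
func EstimateCost(p Pricing, inputTokens, outputTokens int) float64 {
	return float64(inputTokens)/1e6*p.InputPerMTok +
		float64(outputTokens)/1e6*p.OutputPerMTok
}

func main() {
	sonnet := Pricing{InputPerMTok: 3, OutputPerMTok: 15}
	// 2000 article tokens + 200 system prompt tokens in, 300 tokens out.
	cost := EstimateCost(sonnet, 2000+200, 300)
	fmt.Printf("$%.4f per request\n", cost) // prints "$0.0111 per request"
	fmt.Printf("$%.2f per 100,000 requests\n", cost*100000)
}
```

Multiplying by expected request volume, as in the last line, is usually what turns a negligible per-request figure into a budget item.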
Token cost management is critical when serving B2C applications:
- Set a reasonable max_tokens: Prevents unbounded generation (saves output tokens)
- Cache system prompts: Anthropic supports Prompt Caching, which bills repeated system prompts at a reduced rate on cache hits
- Log per-request token usage: For cost attribution and usage alerting
Level 2 · Principles and Mechanisms
Anthropic Claude API: Message Format
The Claude API uses a Messages format supporting multi-turn conversations:
POST https://api.anthropic.com/v1/messages
{
"model": "claude-opus-4-5",
"max_tokens": 2048,
"temperature": 0.7,
"system": "You are a professional Go programming teacher. Answer questions concisely and accurately.",
"messages": [
{"role": "user", "content": "Explain the difference between channels and mutexes in Go"},
{"role": "assistant", "content": "Channels and mutexes both handle concurrency..."},
{"role": "user", "content": "Can you give me a concrete code example?"}
]
}
Parameter reference:
- model: Model ID. Different models trade off speed, quality, and price.
- max_tokens: Maximum output token count. Required on every request; unlike some APIs, Claude has no default.
- temperature (0-1): Controls randomness. Values near 0 give the most repeatable output (good for code generation), though not strictly deterministic; values near 1 give more varied output (good for creative writing).
- system: System prompt setting the AI's role and behavior rules, separate from the conversation history.
Differences from OpenAI:
- Claude uses system as an independent field, not a role: system message inside the messages array
- Claude does not support functions (old format); it uses tools (tool use)
- Claude's streaming response uses SSE (Server-Sent Events) format with richer event types than OpenAI
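The first difference matters when porting OpenAI-style conversation history. A minimal sketch of the conversion, assuming a simplified Message type and a hypothetical SplitSystem helper (neither is part of any SDK):

```go
package main

import (
	"fmt"
	"strings"
)

// Message is a simplified chat message used only for this illustration.
type Message struct {
	Role    string
	Content string
}

// SplitSystem pulls OpenAI-style "system" messages out of the history so they
// can be sent in Claude's top-level system field. Multiple system messages
// are joined with a blank line.
func SplitSystem(msgs []Message) (system string, rest []Message) {
	var parts []string
	for _, m := range msgs {
		if m.Role == "system" {
			parts = append(parts, m.Content)
			continue
		}
		rest = append(rest, m)
	}
	return strings.Join(parts, "\n\n"), rest
}

func main() {
	openAIStyle := []Message{
		{Role: "system", Content: "You are a Go teacher."},
		{Role: "user", Content: "What is a channel?"},
	}
	system, rest := SplitSystem(openAIStyle)
	fmt.Println(system)    // prints "You are a Go teacher."
	fmt.Println(len(rest)) // prints 1
}
```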
SSE Streaming Responses: The Underlying Mechanism
Server-Sent Events (SSE) is a simple protocol over persistent HTTP connections. The server keeps the connection open and continuously sends formatted text data:
event: content_block_delta
data: {"type":"content_block_delta","delta":{"type":"text_delta","text":"Hello"}}

event: content_block_delta
data: {"type":"content_block_delta","delta":{"type":"text_delta","text":"!"}}

event: message_stop
data: {"type":"message_stop"}

SSE format rules:
- Each event consists of one or more field: value lines
- Events are separated by blank lines
- The data: field contains the event payload
- The event: field specifies the event type (used more extensively in Claude's API)
- Claude's stream ends with a message_stop event; OpenAI-compatible APIs instead send a final data: [DONE] line
At the HTTP level, the server must set:
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
When Go's net/http client receives this response, it holds the connection open and reads data continuously until the connection closes or the end marker is received.
Exponential Backoff Retry
LLM APIs encounter multiple types of transient errors:
- 429 Too Many Requests: Rate limit exceeded, must wait
- 529 Overloaded (Anthropic-specific): API overloaded
- 500/502/503: Transient server-side failures
Exponential backoff is the standard strategy for these errors: each retry doubles the wait time, preventing a thundering herd of simultaneous retries hitting the server:
Retry 1: wait 1s
Retry 2: wait 2s
Retry 3: wait 4s
Retry 4: wait 8s
...plus random jitter (±20%) to prevent multiple instances retrying in sync
Retries must distinguish between:
- Retryable errors: 429, 529, 500, 502, 503, network timeouts
- Non-retryable errors: 400 (malformed request), 401 (authentication failure), 404 (model not found)
Concurrency Control: The Semaphore Pattern
When multiple goroutines call LLM APIs concurrently, you need to cap the maximum concurrency (otherwise you may trigger rate limits). The most idiomatic Go approach is a buffered channel as a semaphore:
// A channel with capacity 10 allows at most 10 concurrent requests
sem := make(chan struct{}, 10)
// Acquire semaphore (blocks if full)
sem <- struct{}{}
// Do work
doWork()
// Release semaphore
<-sem
This pattern is clean and requires no extra dependencies. In an LLM client, size the semaphore to stay within the provider's rate limits. These limits vary by provider and account tier, so check the limits that apply to your API key rather than hard-coding a number.
Level 3 · Code Practice
Complete Anthropic Claude Client
package llm
import (
"bufio"
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"math"
"math/rand"
"net/http"
"strings"
"time"
)
const anthropicAPIURL = "https://api.anthropic.com/v1/messages"
const anthropicVersion = "2023-06-01"
// Message represents a single message in the conversation
type Message struct {
Role string `json:"role"` // "user" or "assistant"
Content string `json:"content"`
}
// Request is the body sent to the Claude API
type Request struct {
Model string `json:"model"`
MaxTokens int `json:"max_tokens"`
System string `json:"system,omitempty"`
Messages []Message `json:"messages"`
Stream bool `json:"stream"`
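// Caveat: with omitempty, a Temperature of 0 is dropped from the JSON and the API default applies.
// Use *float64 if you need to send an explicit 0.
Temperature float64 `json:"temperature,omitempty"`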
}
// Usage records token consumption
type Usage struct {
InputTokens int `json:"input_tokens"`
OutputTokens int `json:"output_tokens"`
}
// Response is the structure of a non-streaming response
type Response struct {
ID string `json:"id"`
Content []struct {
Type string `json:"type"`
Text string `json:"text"`
} `json:"content"`
Usage Usage `json:"usage"`
Model string `json:"model"`
StopReason string `json:"stop_reason"`
}
// StreamEvent represents a single event in the streaming response
type StreamEvent struct {
Type string `json:"type"`
Delta *struct {
Type string `json:"type"`
Text string `json:"text"`
} `json:"delta,omitempty"`
Usage *Usage `json:"usage,omitempty"`
Error *struct {
Type string `json:"type"`
Message string `json:"message"`
} `json:"error,omitempty"`
}
// Client is the Anthropic API client
type Client struct {
apiKey string
httpClient *http.Client
semaphore chan struct{} // concurrency control semaphore
maxRetries int
}
// NewClient creates a new Client
func NewClient(apiKey string, maxConcurrency, maxRetries int) *Client {
return &Client{
apiKey: apiKey,
httpClient: &http.Client{
Timeout: 5 * time.Minute, // LLM requests can be slow
},
semaphore: make(chan struct{}, maxConcurrency),
maxRetries: maxRetries,
}
}
// Complete sends a non-streaming request and returns the full response
func (c *Client) Complete(ctx context.Context, req Request) (*Response, error) {
req.Stream = false
// Acquire semaphore
select {
case c.semaphore <- struct{}{}:
defer func() { <-c.semaphore }()
case <-ctx.Done():
return nil, ctx.Err()
}
var resp *Response
err := c.withRetry(ctx, func() error {
var err error
resp, err = c.doComplete(ctx, req)
return err
})
return resp, err
}
func (c *Client) doComplete(ctx context.Context, req Request) (*Response, error) {
body, err := json.Marshal(req)
if err != nil {
return nil, fmt.Errorf("marshal request: %w", err)
}
httpReq, err := http.NewRequestWithContext(ctx, "POST", anthropicAPIURL, bytes.NewReader(body))
if err != nil {
return nil, fmt.Errorf("create request: %w", err)
}
c.setHeaders(httpReq)
httpResp, err := c.httpClient.Do(httpReq)
if err != nil {
return nil, fmt.Errorf("send request: %w", err)
}
defer httpResp.Body.Close()
if err := c.checkStatusCode(httpResp); err != nil {
return nil, err
}
var resp Response
if err := json.NewDecoder(httpResp.Body).Decode(&resp); err != nil {
return nil, fmt.Errorf("decode response: %w", err)
}
return &resp, nil
}
// StreamComplete sends a streaming request and returns each text fragment via channel
func (c *Client) StreamComplete(ctx context.Context, req Request) (<-chan string, <-chan error) {
textCh := make(chan string, 100) // buffered to absorb bursts when the consumer is briefly slower than the stream
errCh := make(chan error, 1)
req.Stream = true
go func() {
defer close(textCh)
defer close(errCh)
// Acquire semaphore
select {
case c.semaphore <- struct{}{}:
defer func() { <-c.semaphore }()
case <-ctx.Done():
errCh <- ctx.Err()
return
}
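// Caveat: a retry restarts the stream from the beginning. If a failure occurs after
// some fragments were already delivered, the consumer receives them again, so
// production code should only retry before the first fragment is emitted.
err := c.withRetry(ctx, func() error {
return c.doStream(ctx, req, textCh)
})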
if err != nil {
errCh <- err
}
}()
return textCh, errCh
}
func (c *Client) doStream(ctx context.Context, req Request, textCh chan<- string) error {
body, err := json.Marshal(req)
if err != nil {
return fmt.Errorf("marshal request: %w", err)
}
httpReq, err := http.NewRequestWithContext(ctx, "POST", anthropicAPIURL, bytes.NewReader(body))
if err != nil {
return fmt.Errorf("create request: %w", err)
}
c.setHeaders(httpReq)
httpResp, err := c.httpClient.Do(httpReq)
if err != nil {
return fmt.Errorf("send request: %w", err)
}
defer httpResp.Body.Close()
if err := c.checkStatusCode(httpResp); err != nil {
return err
}
// Parse the SSE stream line by line
scanner := bufio.NewScanner(httpResp.Body)
// Raise the per-line limit: the Scanner default (64 KB) fails with bufio.ErrTooLong on longer lines
scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)
for scanner.Scan() {
line := scanner.Text()
// SSE format: lines start with "data: "
if !strings.HasPrefix(line, "data: ") {
continue
}
data := strings.TrimPrefix(line, "data: ")
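// End-of-stream marker sent by OpenAI-compatible APIs; Claude's stream ends with a message_stop event and a closed connection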
if data == "[DONE]" {
return nil
}
var event StreamEvent
if err := json.Unmarshal([]byte(data), &event); err != nil {
// Some lines may be heartbeats or have other formats; skip them
continue
}
// Handle error events
if event.Type == "error" && event.Error != nil {
return fmt.Errorf("stream error: %s: %s", event.Error.Type, event.Error.Message)
}
// Extract text content
if event.Type == "content_block_delta" &&
event.Delta != nil &&
event.Delta.Type == "text_delta" {
select {
case textCh <- event.Delta.Text:
case <-ctx.Done():
return ctx.Err()
}
}
}
if err := scanner.Err(); err != nil {
return fmt.Errorf("scan stream: %w", err)
}
return nil
}
func (c *Client) setHeaders(req *http.Request) {
req.Header.Set("Content-Type", "application/json")
req.Header.Set("x-api-key", c.apiKey)
req.Header.Set("anthropic-version", anthropicVersion)
}
// APIError represents an API error with retry policy information
type APIError struct {
StatusCode int
Message string
Retryable bool
}
func (e *APIError) Error() string {
return fmt.Sprintf("API error %d: %s", e.StatusCode, e.Message)
}
func (c *Client) checkStatusCode(resp *http.Response) error {
if resp.StatusCode == http.StatusOK {
return nil
}
body, _ := io.ReadAll(resp.Body)
retryable := resp.StatusCode == 429 ||
resp.StatusCode == 529 ||
resp.StatusCode >= 500
return &APIError{
StatusCode: resp.StatusCode,
Message: string(body),
Retryable: retryable,
}
}
// withRetry retries with exponential backoff and jitter
func (c *Client) withRetry(ctx context.Context, fn func() error) error {
var lastErr error
for attempt := 0; attempt <= c.maxRetries; attempt++ {
if attempt > 0 {
// base * 2^(attempt-1), plus up to 20% random jitter
base := time.Duration(math.Pow(2, float64(attempt-1))) * time.Second
jitter := time.Duration(rand.Int63n(int64(base / 5)))
waitTime := base + jitter
if waitTime > 60*time.Second {
waitTime = 60 * time.Second
}
select {
case <-time.After(waitTime):
case <-ctx.Done():
return ctx.Err()
}
}
lastErr = fn()
if lastErr == nil {
return nil
}
// Check if retryable
var apiErr *APIError
if errors.As(lastErr, &apiErr) && !apiErr.Retryable {
return lastErr // Non-retryable error: return immediately
}
if attempt < c.maxRetries {
log.Printf("attempt %d failed: %v, retrying...", attempt+1, lastErr)
}
}
return fmt.Errorf("max retries exceeded: %w", lastErr)
}
Proxying LLM Streaming Responses to the Browser
package handler
import (
"encoding/json"
"fmt"
"net/http"
)
type CompletionRequest struct {
Messages []llm.Message `json:"messages"`
System string `json:"system"`
}
// StreamHandler forwards LLM SSE responses to the browser in real time
func (h *Handler) StreamHandler(w http.ResponseWriter, r *http.Request) {
var req CompletionRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, "invalid request", http.StatusBadRequest)
return
}
// JWT middleware has already validated auth; get user ID for rate limiting
claims, ok := middleware.GetClaims(r.Context())
if !ok {
http.Error(w, "unauthorized", http.StatusUnauthorized)
return
}
// Per-user rate limiting to prevent excessive token consumption
if !h.userRateLimiter.Allow(claims.UserID) {
http.Error(w, "rate limit exceeded", http.StatusTooManyRequests)
return
}
// Set SSE response headers
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Access-Control-Allow-Origin", "*")
// Obtain http.Flusher to immediately push data to the client
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "streaming not supported", http.StatusInternalServerError)
return
}
llmReq := llm.Request{
Model: "claude-opus-4-5",
MaxTokens: 2048,
System: req.System,
Messages: req.Messages,
}
textCh, errCh := h.llmClient.StreamComplete(r.Context(), llmReq)
// Forward each token to the browser immediately
for {
select {
case text, ok := <-textCh:
if !ok {
// Channel closed: streaming complete
fmt.Fprintf(w, "data: [DONE]\n\n")
flusher.Flush()
return
}
data, _ := json.Marshal(map[string]string{"text": text})
fmt.Fprintf(w, "data: %s\n\n", data)
flusher.Flush() // Push immediately without waiting for buffer to fill
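case err, ok := <-errCh:
if ok && err != nil {
errData, _ := json.Marshal(map[string]string{"error": err.Error()})
fmt.Fprintf(w, "event: error\ndata: %s\n\n", errData)
flusher.Flush()
return
}
// errCh was closed without an error. It closes before textCh, so stop selecting
// on it (a nil channel is never ready) and keep draining the buffered text.
errCh = nil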
case <-r.Context().Done():
// Client disconnected (closed browser tab, etc.)
// Context cancellation propagates to the LLM request, stopping token consumption
return
}
}
}
JavaScript client consuming the SSE stream:
async function streamCompletion(messages) {
const response = await fetch('/api/stream', {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify({messages}),
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
while (true) {
const {done, value} = await reader.read();
if (done) break;
buffer += decoder.decode(value, {stream: true});
const lines = buffer.split('\n');
buffer = lines.pop(); // Keep incomplete last line
for (const line of lines) {
if (line.startsWith('data: ')) {
const data = line.slice(6);
if (data === '[DONE]') return;
try {
const parsed = JSON.parse(data);
appendToUI(parsed.text); // Show text as it arrives
} catch {}
}
}
}
}
Redis Caching for Common Responses
For identical or similar prompts, caching responses reduces latency and cost:
package cache
import (
"context"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"time"
"github.com/redis/go-redis/v9"
)
type LLMCache struct {
redis *redis.Client
ttl time.Duration
}
func NewLLMCache(redis *redis.Client, ttl time.Duration) *LLMCache {
return &LLMCache{redis: redis, ttl: ttl}
}
// cacheKey generates a cache key from the request content.
// Only cache deterministic requests (temperature=0).
func (c *LLMCache) cacheKey(req llm.Request) string {
data, _ := json.Marshal(req)
hash := sha256.Sum256(data)
return "llm:cache:" + hex.EncodeToString(hash[:])
}
// Get attempts to retrieve a cached response
func (c *LLMCache) Get(ctx context.Context, req llm.Request) (*llm.Response, bool) {
if req.Temperature > 0 {
return nil, false
}
key := c.cacheKey(req)
data, err := c.redis.Get(ctx, key).Bytes()
if err != nil {
return nil, false
}
var resp llm.Response
if err := json.Unmarshal(data, &resp); err != nil {
return nil, false
}
return &resp, true
}
// Set stores a response in the cache
func (c *LLMCache) Set(ctx context.Context, req llm.Request, resp *llm.Response) error {
if req.Temperature > 0 {
return nil // Don't cache non-deterministic responses
}
key := c.cacheKey(req)
data, err := json.Marshal(resp)
if err != nil {
return err
}
return c.redis.Set(ctx, key, data, c.ttl).Err()
}
// CachedClient wraps the LLM client with a caching layer
type CachedClient struct {
client *llm.Client
cache *LLMCache
}
func (c *CachedClient) Complete(ctx context.Context, req llm.Request) (*llm.Response, error) {
// Check cache first
if cached, ok := c.cache.Get(ctx, req); ok {
return cached, nil
}
// Cache miss: call the LLM
resp, err := c.client.Complete(ctx, req)
if err != nil {
return nil, err
}
// Store asynchronously so the main request isn't blocked
go func() {
cacheCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
c.cache.Set(cacheCtx, req, resp)
}()
return resp, nil
}
Concurrent LLM Requests
package batch
import (
"context"
"fmt"
"sync"
)
type BatchResult struct {
Index int
Response *llm.Response
Error error
}
// BatchComplete concurrently executes multiple LLM requests and returns results in original order
func BatchComplete(ctx context.Context, client *llm.Client, requests []llm.Request) []BatchResult {
results := make([]BatchResult, len(requests))
var wg sync.WaitGroup
for i, req := range requests {
wg.Add(1)
go func(idx int, r llm.Request) {
defer wg.Done()
resp, err := client.Complete(ctx, r)
results[idx] = BatchResult{
Index: idx,
Response: resp,
Error: err,
}
}(i, req)
}
wg.Wait()
return results
}
// summarizeArticles generates summaries for all articles concurrently; the client's semaphore caps how many requests run at once
func summarizeArticles(ctx context.Context, client *llm.Client, articles []string) ([]string, error) {
requests := make([]llm.Request, len(articles))
for i, article := range articles {
requests[i] = llm.Request{
Model: "claude-haiku-4-5", // Use faster, cheaper model for batch processing
MaxTokens: 200,
System: "Summarize the following article in 50 words or fewer.",
Messages: []llm.Message{
{Role: "user", Content: article},
},
}
}
results := BatchComplete(ctx, client, requests)
summaries := make([]string, len(articles))
for _, result := range results {
if result.Error != nil {
return nil, fmt.Errorf("article %d failed: %w", result.Index, result.Error)
}
if len(result.Response.Content) > 0 {
summaries[result.Index] = result.Response.Content[0].Text
}
}
return summaries, nil
}
Level 4 · Advanced and Edge Cases
Multi-Model Unified Abstraction
When your application needs to support multiple LLM providers simultaneously, defining a unified interface is essential:
package llm
import (
"context"
"fmt"
"log"
)
// Provider is the unified interface for all LLM providers
type Provider interface {
// Complete sends a synchronous request
Complete(ctx context.Context, req UnifiedRequest) (*UnifiedResponse, error)
// Stream sends a streaming request
Stream(ctx context.Context, req UnifiedRequest) (<-chan string, <-chan error)
// Name returns the provider name (for logging and monitoring)
Name() string
}
// UnifiedRequest is a cross-provider request format
type UnifiedRequest struct {
Model string
MaxTokens int
Temperature float64
System string
Messages []Message
}
// UnifiedResponse is the unified response format
type UnifiedResponse struct {
Content string
InputTokens int
OutputTokens int
Model string
Provider string
}
// AnthropicProvider implements the Provider interface
type AnthropicProvider struct {
client *Client
}
func (p *AnthropicProvider) Name() string { return "anthropic" }
func (p *AnthropicProvider) Complete(ctx context.Context, req UnifiedRequest) (*UnifiedResponse, error) {
anthropicReq := Request{
Model: req.Model,
MaxTokens: req.MaxTokens,
Temperature: req.Temperature,
System: req.System,
Messages: req.Messages,
}
resp, err := p.client.Complete(ctx, anthropicReq)
if err != nil {
return nil, err
}
content := ""
if len(resp.Content) > 0 {
content = resp.Content[0].Text
}
return &UnifiedResponse{
Content: content,
InputTokens: resp.Usage.InputTokens,
OutputTokens: resp.Usage.OutputTokens,
Model: resp.Model,
Provider: "anthropic",
}, nil
}
// Router implements provider routing with ordered failover
type Router struct {
providers map[string]Provider
fallback []string // Provider names in priority order
}
func NewRouter(providers ...Provider) *Router {
r := &Router{providers: make(map[string]Provider)}
for _, p := range providers {
r.providers[p.Name()] = p
r.fallback = append(r.fallback, p.Name())
}
return r
}
// Complete routes with failover: tries backup providers if the primary fails
func (r *Router) Complete(ctx context.Context, req UnifiedRequest) (*UnifiedResponse, error) {
var lastErr error
for _, providerName := range r.fallback {
provider := r.providers[providerName]
resp, err := provider.Complete(ctx, req)
if err == nil {
return resp, nil
}
lastErr = err
log.Printf("provider %s failed: %v, trying next...", providerName, err)
}
return nil, fmt.Errorf("all providers failed: %w", lastErr)
}
Tool Use (Function Calling)
Modern LLMs support tool use: the model generates structured tool call requests, the application executes the tools, and returns results to the model:
package tools
import (
"context"
"encoding/json"
"fmt"
)
// Tool defines a tool that can be called by the LLM
type Tool struct {
Name string `json:"name"`
Description string `json:"description"`
InputSchema json.RawMessage `json:"input_schema"`
}
var availableTools = []Tool{
{
Name: "get_weather",
Description: "Get the current weather for a specified city",
InputSchema: json.RawMessage(`{
"type": "object",
"properties": {
"city": {"type": "string", "description": "City name"},
"unit": {"type": "string", "enum": ["celsius", "fahrenheit"]}
},
"required": ["city"]
}`),
},
{
Name: "search_database",
Description: "Search the product database",
InputSchema: json.RawMessage(`{
"type": "object",
"properties": {
"query": {"type": "string"},
"limit": {"type": "integer", "default": 10}
},
"required": ["query"]
}`),
},
}
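// ToolCallLoop implements the complete tool use cycle.
// Note: this sketch assumes llm.Request has been extended with a Tools field, that
// llm.Message.Content can carry structured content blocks rather than a plain string,
// and that an executeToolCalls helper exists; none of these are defined above.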
func ToolCallLoop(ctx context.Context, client *llm.Client, userMessage string) (string, error) {
messages := []llm.Message{
{Role: "user", Content: userMessage},
}
// Maximum 10 iterations to prevent infinite loops
for i := 0; i < 10; i++ {
req := llm.Request{
Model: "claude-opus-4-5",
MaxTokens: 4096,
Messages: messages,
Tools: availableTools,
}
resp, err := client.Complete(ctx, req)
if err != nil {
return "", err
}
if resp.StopReason == "tool_use" {
// Execute all tool calls
toolResults := executeToolCalls(ctx, resp.Content)
// Append tool results to conversation history
messages = append(messages,
llm.Message{Role: "assistant", Content: resp.Content}, // model's tool call request
llm.Message{Role: "user", Content: toolResults}, // tool execution results
)
continue
}
// Model returned a final text response
if len(resp.Content) > 0 {
return resp.Content[0].Text, nil
}
}
return "", fmt.Errorf("tool call loop exceeded maximum iterations")
}
Prompt Templating with text/template
package prompt
import (
"bytes"
"context"
"fmt"
"text/template"
)
// PromptTemplate manages prompt templates
type PromptTemplate struct {
tmpl *template.Template
}
func NewPromptTemplate(name, tmplStr string) (*PromptTemplate, error) {
tmpl, err := template.New(name).Parse(tmplStr)
if err != nil {
return nil, fmt.Errorf("parse template %s: %w", name, err)
}
return &PromptTemplate{tmpl: tmpl}, nil
}
func (p *PromptTemplate) Render(data any) (string, error) {
var buf bytes.Buffer
if err := p.tmpl.Execute(&buf, data); err != nil {
return "", fmt.Errorf("render template: %w", err)
}
return buf.String(), nil
}
var articleSummaryTemplate = `Please summarize the following {{.Language}} article in {{.MaxWords}} words or fewer.
Requirements:
{{range .Requirements}}
- {{.}}
{{end}}
Article content:
{{.Content}}`
func SummarizeArticle(ctx context.Context, client *llm.Client, article string) (string, error) {
tmpl, err := NewPromptTemplate("summary", articleSummaryTemplate)
if err != nil {
return "", err
}
prompt, err := tmpl.Render(map[string]any{
"Language": "English",
"MaxWords": 100,
"Requirements": []string{
"Preserve core arguments",
"Use objective tone",
"Exclude subjective evaluation",
},
"Content": article,
})
if err != nil {
return "", err
}
resp, err := client.Complete(ctx, llm.Request{
Model: "claude-haiku-4-5",
MaxTokens: 300,
Messages: []llm.Message{{Role: "user", Content: prompt}},
})
if err != nil {
return "", err
}
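// Guard against an empty content array to avoid an index-out-of-range panic
if len(resp.Content) == 0 {
return "", fmt.Errorf("empty response content")
}
return resp.Content[0].Text, nil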
}
Testing LLM Calls with Recorded Fixtures
Testing LLM calls faces two challenges: (1) real calls have latency and cost; (2) responses are non-deterministic, making assertions difficult. The solution is a Record-Replay pattern:
package llmtest // named to avoid clashing with the standard library "testing" package imported below
import (
"context"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"fmt"
"os"
"path/filepath"
"testing"
)
// RecordingClient wraps the real client to record request/response pairs to disk
type RecordingClient struct {
real *llm.Client
fixtureDir string
record bool // true=record mode, false=replay mode
}
func (c *RecordingClient) Complete(ctx context.Context, req llm.Request) (*llm.Response, error) {
fixturePath := c.fixturePath(req)
if !c.record {
// Replay mode: read the recorded response from disk
data, err := os.ReadFile(fixturePath)
if err != nil {
return nil, fmt.Errorf("fixture not found at %s (run with -record to record): %w",
fixturePath, err)
}
var resp llm.Response
if err := json.Unmarshal(data, &resp); err != nil {
return nil, err
}
return &resp, nil
}
// Record mode: call the real API and save the response
resp, err := c.real.Complete(ctx, req)
if err != nil {
return nil, err
}
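data, err := json.MarshalIndent(resp, "", " ")
if err != nil {
return nil, fmt.Errorf("marshal fixture: %w", err)
}
if err := os.MkdirAll(filepath.Dir(fixturePath), 0o755); err != nil {
return nil, fmt.Errorf("create fixture dir: %w", err)
}
if err := os.WriteFile(fixturePath, data, 0o644); err != nil {
return nil, fmt.Errorf("write fixture: %w", err)
}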
return resp, nil
}
func (c *RecordingClient) fixturePath(req llm.Request) string {
data, _ := json.Marshal(req)
hash := sha256.Sum256(data)
return filepath.Join(c.fixtureDir, hex.EncodeToString(hash[:16])+".json")
}
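// TestSummarize uses a fixture for deterministic testing.
// Note: this assumes SummarizeArticle accepts an interface with a Complete method
// (satisfied by both *llm.Client and *RecordingClient) rather than a concrete *llm.Client.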
func TestSummarize(t *testing.T) {
record := os.Getenv("RECORD_FIXTURES") == "true"
client := &RecordingClient{
real: llm.NewClient(os.Getenv("ANTHROPIC_API_KEY"), 5, 3),
fixtureDir: "testdata/fixtures",
record: record,
}
result, err := SummarizeArticle(context.Background(), client, "Go was created by Google...")
if err != nil {
t.Fatal(err)
}
if len(result) == 0 {
t.Error("expected non-empty summary")
}
}
# Record fixtures (only needed the first time or when prompts change)
RECORD_FIXTURES=true ANTHROPIC_API_KEY=sk-... go test ./...
# Daily runs: use recorded fixtures — no API key, no cost, extremely fast
go test ./...
Cost Tracking Middleware
package cost
import (
"context"
"database/sql"
"log"
"sync/atomic"
"time"
)
// CostTracker tracks token usage and estimated cost for each LLM call
type CostTracker struct {
db *sql.DB
totalInputTokens atomic.Int64
totalOutputTokens atomic.Int64
}
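// ModelPricing stores price per million tokens in USD.
// The figures below are illustrative; check the provider's current price list before relying on them.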
var ModelPricing = map[string]struct{ Input, Output float64 }{
"claude-opus-4-5": {Input: 15.0, Output: 75.0},
"claude-sonnet-4-5": {Input: 3.0, Output: 15.0},
"claude-haiku-4-5": {Input: 0.25, Output: 1.25},
}
func (t *CostTracker) TrackUsage(ctx context.Context, model string, usage llm.Usage, userID int64) {
t.totalInputTokens.Add(int64(usage.InputTokens))
t.totalOutputTokens.Add(int64(usage.OutputTokens))
pricing, ok := ModelPricing[model]
if !ok {
return
}
cost := float64(usage.InputTokens)/1e6*pricing.Input +
float64(usage.OutputTokens)/1e6*pricing.Output
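// Write to the database asynchronously to avoid blocking the main request.
// The request context is cancelled once the handler returns, so detach from it
// (context.WithoutCancel, Go 1.21+) and bound the insert with its own timeout.
go func() {
dbCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 5*time.Second)
defer cancel()
_, err := t.db.ExecContext(dbCtx,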
`INSERT INTO llm_usage (user_id, model, input_tokens, output_tokens, cost_usd, created_at)
VALUES (?, ?, ?, ?, ?, ?)`,
userID, model, usage.InputTokens, usage.OutputTokens, cost, time.Now(),
)
if err != nil {
log.Printf("failed to track LLM usage: %v", err)
}
}()
}
// GetStats returns usage statistics for the current session
func (t *CostTracker) GetStats() (inputTokens, outputTokens int64) {
return t.totalInputTokens.Load(), t.totalOutputTokens.Load()
}
Token Budget Management
For applications that must enforce per-user token limits, implement budget management with Redis atomic operations:
package budget
import (
"context"
"errors"
"fmt"
"time"
"github.com/redis/go-redis/v9"
)
var ErrBudgetExceeded = errors.New("token budget exceeded")
type BudgetManager struct {
redis *redis.Client
dailyLimit int64 // tokens per day per user
monthlyLimit int64 // tokens per month per user
}
// CheckAndDeduct atomically checks budget and deducts tokens.
// Returns ErrBudgetExceeded if the user has insufficient budget.
func (b *BudgetManager) CheckAndDeduct(ctx context.Context, userID int64, tokens int64) error {
dayKey := fmt.Sprintf("budget:daily:%d:%s", userID, time.Now().Format("2006-01-02"))
monthKey := fmt.Sprintf("budget:monthly:%d:%s", userID, time.Now().Format("2006-01"))
// Use a Lua script for atomicity: check and increment in a single operation
script := redis.NewScript(`
local day_key = KEYS[1]
local month_key = KEYS[2]
local tokens = tonumber(ARGV[1])
local daily_limit = tonumber(ARGV[2])
local monthly_limit = tonumber(ARGV[3])
local day_usage = tonumber(redis.call('GET', day_key) or '0')
local month_usage = tonumber(redis.call('GET', month_key) or '0')
if day_usage + tokens > daily_limit then
return -1
end
if month_usage + tokens > monthly_limit then
return -2
end
redis.call('INCRBY', day_key, tokens)
redis.call('EXPIRE', day_key, 86400) -- 24 hours TTL
redis.call('INCRBY', month_key, tokens)
redis.call('EXPIRE', month_key, 2678400) -- 31 days TTL
return 0
`)
result, err := script.Run(ctx, b.redis,
[]string{dayKey, monthKey},
tokens, b.dailyLimit, b.monthlyLimit,
).Int64()
if err != nil {
return fmt.Errorf("budget check: %w", err)
}
if result == -1 {
return fmt.Errorf("%w: daily limit of %d tokens reached", ErrBudgetExceeded, b.dailyLimit)
}
if result == -2 {
return fmt.Errorf("%w: monthly limit of %d tokens reached", ErrBudgetExceeded, b.monthlyLimit)
}
return nil
}
LLM API integration is one of the most exciting engineering challenges today: you must handle slow network I/O, streaming protocol parsing, concurrency control, cost management, and unpredictable model behavior—and these happen to be exactly the scenarios Go excels at. The client library built in this chapter covers the core requirements of a production LLM service: reliable retry logic, low-latency streaming proxying, multi-model abstraction, and a testable architecture. The next step is integrating these components into real product features. Remember: LLMs are tools, not magic—correct engineering practices turn them into reliable services, not random black boxes.