Production Best Practices: Multi-Key Rotation, Exponential Backoff, Circuit Breaker Design and Reliability Architecture
Chapter 71: Production Error Handling: Retry Strategies, Fallback Plans, and SLA Guarantees
71.1 Claude API Error Classification
Reliable error handling is the foundation of service stability when integrating the Claude API in production. Claude API errors can be broadly divided into two categories: retryable errors and non-retryable errors. This classification is the prerequisite for designing retry strategies.
HTTP Status Code Overview
# Lookup table of Claude API HTTP status codes (source: Anthropic official
# documentation). Each entry carries a display name, whether a retry can
# help, a short description, and the recommended handling.
ERROR_CATEGORIES = {
    # ---- 4xx: client errors (mostly non-retryable) ----
    400: dict(
        name="Bad Request",
        retryable=False,
        description="Malformed request format or invalid parameters",
        action="Check request parameters and fix before resending",
    ),
    401: dict(
        name="Unauthorized",
        retryable=False,
        description="Invalid or missing API Key",
        action="Check API Key configuration",
    ),
    403: dict(
        name="Forbidden",
        retryable=False,
        description="No permission to access this resource",
        action="Check account permissions and usage policies",
    ),
    413: dict(
        name="Request Too Large",
        retryable=False,
        description="Request body exceeds size limit",
        action="Reduce input content or process in batches",
    ),
    422: dict(
        name="Unprocessable Entity",
        retryable=False,
        description="Parameter format correct but value invalid",
        action="Check specific error message and correct parameter values",
    ),
    # 429 is the one retryable 4xx here: back off first, then retry.
    429: dict(
        name="Too Many Requests",
        retryable=True,
        description="Rate limit exceeded (TPM or RPM)",
        action="Retry with exponential backoff, check retry-after header",
    ),
    # ---- 5xx: server errors (usually retryable) ----
    500: dict(
        name="Internal Server Error",
        retryable=True,
        description="Anthropic service internal error",
        action="Retry after brief wait; contact support if persistent",
    ),
    503: dict(
        name="Service Unavailable",
        retryable=True,
        description="Service temporarily unavailable (maintenance or overload)",
        action="Retry after longer wait",
    ),
    529: dict(
        name="Overloaded",
        retryable=True,
        description="API in high load state (Anthropic custom error code)",
        action="Retry with exponential backoff",
    ),
}
71.2 Exponential Backoff Retry Strategy
Basic Backoff Algorithm
Exponential backoff is the standard approach for handling rate limits and transient errors:
import anthropic
import time
import random
import logging
from typing import Optional
from dataclasses import dataclass
# Module-level logger, named after the importing module via __name__.
logger = logging.getLogger(__name__)
@dataclass
class RetryConfig:
    """Tunable parameters for exponential-backoff retries.

    Attributes are plain dataclass fields; ``retryable_status_codes`` is
    filled with ``{429, 500, 503, 529}`` when left as ``None``.
    """

    max_retries: int = 3            # Retries after the first attempt
    base_delay: float = 1.0         # Initial wait time (seconds)
    max_delay: float = 60.0         # Upper bound for any computed wait (seconds)
    exponential_base: float = 2.0   # Growth factor per attempt
    jitter: bool = True             # Whether to randomize the wait
    retryable_status_codes: Optional[set] = None

    def __post_init__(self):
        # Built per instance so that instances never share one mutable set.
        if self.retryable_status_codes is None:
            self.retryable_status_codes = {429, 500, 503, 529}

    def calculate_delay(self, attempt: int) -> float:
        """Return the wait in seconds before retrying after ``attempt``.

        The un-jittered wait is ``base_delay * exponential_base ** attempt``
        capped at ``max_delay``. With ``jitter`` enabled the result is drawn
        uniformly from 25% below that wait up to 25% above it, with the upper
        end cut off at ``max_delay`` so the cap is never exceeded.

        Args:
            attempt: Zero-based index of the attempt that just failed.

        Returns:
            A wait between ``0`` and ``max_delay`` seconds.
        """
        try:
            backoff = self.base_delay * (self.exponential_base ** attempt)
        except OverflowError:
            # Float exponentiation overflows for very large attempts; the
            # result would be capped anyway.
            backoff = self.max_delay
        delay = min(backoff, self.max_delay)

        if self.jitter:
            # Spread retries out to avoid a thundering herd. The upper bound
            # is limited to max_delay so jitter cannot push past the cap.
            spread = delay * 0.25
            lower = delay - spread
            upper = min(delay + spread, self.max_delay)
            delay = random.uniform(lower, upper)

        return min(max(0.0, delay), self.max_delay)
class ClaudeClientWithRetry:
    """Synchronous Claude client wrapper that retries transient failures.

    Rate-limit errors, API status errors whose code is listed in
    ``RetryConfig.retryable_status_codes``, and connection/timeout errors are
    retried up to ``RetryConfig.max_retries`` times; everything else is
    re-raised immediately.
    """

    def __init__(self, api_key: str, retry_config: RetryConfig = None):
        # The SDK retries some errors on its own by default. Turn that off so
        # this wrapper's RetryConfig alone decides attempt count and delays;
        # otherwise the two retry loops multiply each other.
        self.client = anthropic.Anthropic(api_key=api_key, max_retries=0)
        self.config = retry_config or RetryConfig()

    def create_message(self, **kwargs) -> anthropic.types.Message:
        """Call ``messages.create`` with retries and return the response.

        Args:
            **kwargs: Forwarded unchanged to ``client.messages.create``.

        Returns:
            The response of the first successful attempt.

        Raises:
            anthropic.RateLimitError: Still rate limited after all retries.
            anthropic.APIStatusError: Non-retryable status, or a retryable
                one that persisted through all retries.
            anthropic.APIConnectionError: Connection/timeout failures that
                persisted through all retries.
        """
        # A negative max_retries still makes one attempt.
        max_retries = max(self.config.max_retries, 0)
        total_attempts = max_retries + 1

        for attempt in range(total_attempts):
            out_of_retries = attempt >= max_retries
            try:
                response = self.client.messages.create(**kwargs)
            except anthropic.RateLimitError as e:
                # Must come before APIStatusError: RateLimitError is handled
                # separately so the server's retry hint can be honored.
                if out_of_retries:
                    logger.error("Rate limit exceeded after %d retries", max_retries)
                    raise
                retry_after = self._get_retry_after(e)
                delay = (
                    retry_after
                    if retry_after is not None
                    else self.config.calculate_delay(attempt)
                )
                logger.warning(
                    "Rate limited (attempt %d/%d). Retrying in %.1fs",
                    attempt + 1, total_attempts, delay,
                )
            except anthropic.APIStatusError as e:
                if e.status_code not in self.config.retryable_status_codes:
                    logger.error("Non-retryable error %s: %s", e.status_code, e.message)
                    raise
                if out_of_retries:
                    logger.error("API error after %d retries: %s", max_retries, e)
                    raise
                delay = self.config.calculate_delay(attempt)
                logger.warning(
                    "API error %s (attempt %d). Retrying in %.1fs",
                    e.status_code, attempt + 1, delay,
                )
            except (anthropic.APIConnectionError, anthropic.APITimeoutError):
                if out_of_retries:
                    raise
                delay = self.config.calculate_delay(attempt)
                logger.warning(
                    "Connection error (attempt %d). Retrying in %.1fs",
                    attempt + 1, delay,
                )
            else:
                if attempt > 0:
                    logger.info("Request succeeded on attempt %d", attempt + 1)
                return response

            time.sleep(delay)

        # The final attempt always returns or re-raises, so this is a guard
        # against future edits rather than a reachable path.
        raise RuntimeError("Retry loop exited without a result")

    def _get_retry_after(self, error: anthropic.RateLimitError) -> Optional[float]:
        """Return the server-suggested wait in seconds, or ``None``.

        Reads ``retry-after`` (falling back to ``x-ratelimit-reset-requests``)
        from the error's response headers. Only a finite, positive number of
        seconds is accepted; a missing header, a non-numeric value, or a
        zero/negative/NaN/infinite value yields ``None`` so the caller uses
        its own backoff instead of passing an invalid value to ``sleep``.
        """
        try:
            headers = error.response.headers
            raw = headers.get('retry-after') or headers.get('x-ratelimit-reset-requests')
            seconds = float(raw) if raw else None
        except (AttributeError, TypeError, ValueError):
            # No response/headers on the error, or the header is not numeric.
            return None
        # The chained comparison is False for NaN and for +/-inf.
        if seconds is not None and 0 < seconds < float("inf"):
            return seconds
        return None
Async Retry Implementation
For high-concurrency scenarios, use the async version:
import asyncio
class AsyncClaudeClientWithRetry:
    """Asynchronous Claude client wrapper that retries transient failures.

    Retries rate-limit errors, API status errors whose code is listed in
    ``RetryConfig.retryable_status_codes``, and connection/timeout errors,
    sleeping with ``asyncio.sleep`` between attempts.
    """

    def __init__(self, api_key: str, retry_config: RetryConfig = None):
        # Disable the SDK's built-in retries so that RetryConfig alone
        # controls how many attempts are made and how long to wait.
        self.client = anthropic.AsyncAnthropic(api_key=api_key, max_retries=0)
        self.config = retry_config or RetryConfig()

    async def create_message(self, **kwargs) -> anthropic.types.Message:
        """Await ``messages.create`` with retries and return the response.

        Args:
            **kwargs: Forwarded unchanged to ``client.messages.create``.

        Raises:
            anthropic.RateLimitError: Still rate limited after all retries.
            anthropic.APIStatusError: Non-retryable status, or a retryable
                one that persisted through all retries.
            anthropic.APIConnectionError: Connection/timeout failures that
                persisted through all retries.
        """
        # A negative max_retries still makes one attempt.
        max_retries = max(self.config.max_retries, 0)

        for attempt in range(max_retries + 1):
            out_of_retries = attempt >= max_retries
            try:
                return await self.client.messages.create(**kwargs)
            except anthropic.RateLimitError as e:
                if out_of_retries:
                    raise
                retry_after = self._get_retry_after(e)
                delay = (
                    retry_after
                    if retry_after is not None
                    else self.config.calculate_delay(attempt)
                )
                logger.warning(f"Rate limited, waiting {delay:.1f}s (attempt {attempt + 1})")
            except anthropic.APIStatusError as e:
                if e.status_code not in self.config.retryable_status_codes:
                    raise
                if out_of_retries:
                    raise
                delay = self.config.calculate_delay(attempt)
                logger.warning(
                    f"API error {e.status_code} (attempt {attempt + 1}). "
                    f"Retrying in {delay:.1f}s"
                )
            except (anthropic.APIConnectionError, anthropic.APITimeoutError):
                if out_of_retries:
                    raise
                delay = self.config.calculate_delay(attempt)
                logger.warning(f"Connection error (attempt {attempt + 1}). Retrying in {delay:.1f}s")

            await asyncio.sleep(delay)

        # The final attempt always returns or re-raises; guard only.
        raise RuntimeError("Retry loop exited without a result")

    def _get_retry_after(self, error: anthropic.RateLimitError) -> Optional[float]:
        """Return the server-suggested wait in seconds, or ``None``.

        Reads ``retry-after`` (falling back to ``x-ratelimit-reset-requests``)
        from the error's response headers and accepts only a finite, positive
        number of seconds; anything else yields ``None`` so the caller falls
        back to its computed backoff.
        """
        try:
            headers = error.response.headers
            raw = headers.get('retry-after') or headers.get('x-ratelimit-reset-requests')
            seconds = float(raw) if raw else None
        except (AttributeError, TypeError, ValueError):
            return None
        # The chained comparison is False for NaN and for +/-inf.
        if seconds is not None and 0 < seconds < float("inf"):
            return seconds
        return None
71.3 Circuit Breaker Pattern
Why Circuit Breakers Are Needed
A pure retry strategy has one fatal weakness: when the downstream service (the Claude API) is persistently unavailable, retries cause requests to accumulate. Large numbers of requests sit waiting to retry and consume connection-pool resources, which can ultimately cascade into failure of the entire application.
The circuit breaker pattern gracefully handles this with three states:
CLOSED state → Processes requests normally
↓ consecutive failures reach the threshold
OPEN state → Fails fast without sending requests
↓ after timeout, enters half-open
HALF_OPEN state → Sends a few probe requests
↓ enough successes → returns to CLOSED; any failure → returns to OPEN
import time
from enum import Enum
from dataclasses import dataclass
from typing import Optional
class CircuitState(Enum):
    """The three states a circuit breaker can be in."""

    # Requests are passed through normally.
    CLOSED = "closed"
    # Requests fail fast without being sent.
    OPEN = "open"
    # A limited number of probe requests are let through.
    HALF_OPEN = "half_open"
@dataclass
class CircuitBreakerConfig:
    """Thresholds and timing that drive circuit-breaker state changes."""

    # Number of failures that trips the breaker open.
    failure_threshold: int = 5
    # Number of successes in half-open needed to close again.
    success_threshold: int = 2
    # Seconds the breaker stays open before probing.
    timeout: float = 60.0
    # Upper limit on requests admitted while half-open.
    half_open_max_calls: int = 3
class CircuitBreaker:
    """Asyncio circuit breaker with CLOSED, OPEN and HALF_OPEN states.

    CLOSED passes calls through and counts failures; a success resets the
    count. Reaching ``failure_threshold`` opens the breaker. OPEN rejects
    calls with ``CircuitBreakerOpenError`` until ``timeout`` seconds have
    passed since the last failure, then the next call moves it to HALF_OPEN.
    HALF_OPEN admits probe calls: ``success_threshold`` successes close the
    breaker, any failure re-opens it.
    """

    def __init__(self, config: CircuitBreakerConfig = None):
        self.config = config or CircuitBreakerConfig()
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        self.last_failure_time: Optional[float] = None
        # Number of HALF_OPEN probes currently admitted and not yet finished.
        self.half_open_calls = 0
        self._lock = asyncio.Lock()

    async def call(self, func, *args, **kwargs):
        """Run ``func(*args, **kwargs)`` under the breaker and return its result.

        ``func`` may be a coroutine function or a plain callable; an
        awaitable result is awaited, any other result is returned as is.
        A plain callable runs inline and blocks the event loop meanwhile.

        Raises:
            CircuitBreakerOpenError: The breaker is OPEN and the timeout has
                not elapsed, or all HALF_OPEN probe slots are in use.
            Exception: Whatever ``func`` raises, after it has been recorded
                as a failure.
        """
        import inspect

        holds_probe_slot = False
        async with self._lock:
            if self.state == CircuitState.OPEN:
                if not self._should_attempt_reset():
                    remaining = max(
                        0.0,
                        self.config.timeout - (time.time() - self.last_failure_time),
                    )
                    raise CircuitBreakerOpenError(
                        f"Circuit breaker is OPEN. Will try again in {remaining:.0f}s"
                    )
                self.state = CircuitState.HALF_OPEN
                self.half_open_calls = 0
                self.success_count = 0
                logger.info("Circuit breaker: OPEN → HALF_OPEN")

            if self.state == CircuitState.HALF_OPEN:
                if self.half_open_calls >= self.config.half_open_max_calls:
                    raise CircuitBreakerOpenError("Too many calls in HALF_OPEN state")
                self.half_open_calls += 1
                holds_probe_slot = True

        try:
            result = func(*args, **kwargs)
            # Await only when there is something to await, so a synchronous
            # callable is not turned into a spurious TypeError failure.
            if inspect.isawaitable(result):
                result = await result
            await self._on_success()
            return result
        except Exception as e:
            await self._on_failure(e)
            raise
        finally:
            # Always hand the probe slot back. Cancellation is not an
            # Exception, so without this a cancelled probe would keep its
            # slot forever and HALF_OPEN could never be left again.
            if holds_probe_slot:
                async with self._lock:
                    if self.half_open_calls > 0:
                        self.half_open_calls -= 1

    async def _on_success(self):
        """Record a success: count toward closing, or reset the failure count."""
        async with self._lock:
            if self.state == CircuitState.HALF_OPEN:
                self.success_count += 1
                if self.success_count >= self.config.success_threshold:
                    self.state = CircuitState.CLOSED
                    self.failure_count = 0
                    logger.info("Circuit breaker: HALF_OPEN → CLOSED (recovered)")
            elif self.state == CircuitState.CLOSED:
                self.failure_count = 0

    async def _on_failure(self, error: Exception):
        """Record a failure and open the breaker when warranted."""
        async with self._lock:
            self.failure_count += 1
            self.last_failure_time = time.time()
            if self.state == CircuitState.HALF_OPEN:
                self.state = CircuitState.OPEN
                logger.warning("Circuit breaker: HALF_OPEN → OPEN (probe failed)")
            elif (self.state == CircuitState.CLOSED and
                    self.failure_count >= self.config.failure_threshold):
                self.state = CircuitState.OPEN
                logger.warning(f"Circuit breaker: CLOSED → OPEN ({self.failure_count} failures)")

    def _should_attempt_reset(self) -> bool:
        """Return True once ``timeout`` seconds have passed since the last failure."""
        return (self.last_failure_time is not None and
                time.time() - self.last_failure_time >= self.config.timeout)
class CircuitBreakerOpenError(Exception):
    """Raised when the circuit breaker rejects a call instead of running it."""
71.4 Fallback Strategy Design
Fallback Strategy Hierarchy
When the Claude API is unavailable, robust fallback plans should serve users gracefully rather than presenting raw errors:
from enum import Enum
class FallbackStrategy(Enum):
    """Ways to degrade when the Claude API cannot serve a request."""

    # Serve a previously cached response.
    CACHED_RESPONSE = "cached_response"
    # Route the request to a simpler or local model.
    SIMPLER_MODEL = "simpler_model"
    # Answer with a preset template.
    TEMPLATE_RESPONSE = "template_response"
    # Put the request on a queue to be processed later.
    QUEUE_FOR_LATER = "queue_for_later"
    # Return a user-friendly error message.
    GRACEFUL_ERROR = "graceful_error"
class ClaudeFallbackManager:
    """Produces a degraded response when the primary Claude call is unavailable.

    Args:
        cache_client: Object with an awaitable ``get(key)``.
        queue_client: Object with an awaitable ``enqueue(params)``.
        fallback_model_client: Optional object with an awaitable
            ``create(**params)``; required for ``SIMPLER_MODEL``.
        fallback_model_name: Model ID substituted into the request when the
            fallback model is used.
    """

    def __init__(
        self,
        cache_client,
        queue_client,
        fallback_model_client=None,
        fallback_model_name: str = "claude-3-5-haiku-latest",
    ):
        self.cache = cache_client
        self.queue = queue_client
        self.fallback_model = fallback_model_client
        self.fallback_model_name = fallback_model_name

    async def handle_with_fallback(
        self,
        request_params: dict,
        strategy: FallbackStrategy = FallbackStrategy.CACHED_RESPONSE
    ) -> dict:
        """Apply ``strategy`` and return a response dict marked as degraded.

        Only the selected strategy is attempted. If it cannot produce a
        result (cache miss, no fallback client, or the backend raised), the
        generic "temporarily unavailable" error dict is returned; failures of
        the cache, fallback model, or queue are logged rather than raised.

        Returns:
            A dict with at least ``response``, ``source`` and ``degraded``.
        """
        cache_key = self._compute_cache_key(request_params)

        # Strategy 1: cached response
        if strategy == FallbackStrategy.CACHED_RESPONSE:
            try:
                cached = await self.cache.get(cache_key)
            except Exception as e:
                # A broken cache must not turn the fallback into a crash.
                logger.error(f"Cache lookup failed during fallback: {e}")
                cached = None
            if cached:
                logger.info("Serving cached response due to API unavailability")
                return {
                    "response": cached,
                    "source": "cache",
                    "degraded": True,
                    "note": "This response may not be fully current"
                }

        # Strategy 2: switch to the fallback model
        if strategy == FallbackStrategy.SIMPLER_MODEL and self.fallback_model:
            try:
                fallback_params = {**request_params, "model": self.fallback_model_name}
                response = await self.fallback_model.create(**fallback_params)
                return {
                    "response": response,
                    "source": "fallback_model",
                    "degraded": True,
                    "note": "Using fallback model due to primary service unavailability"
                }
            except Exception as e:
                logger.error(f"Fallback model also failed: {e}")

        # Strategy 3: queue for later
        if strategy == FallbackStrategy.QUEUE_FOR_LATER:
            try:
                job_id = await self.queue.enqueue(request_params)
            except Exception as e:
                logger.error(f"Queueing request for later failed: {e}")
            else:
                return {
                    "response": None,
                    "source": "queued",
                    "job_id": job_id,
                    "degraded": True,
                    "note": "Request queued for processing. You'll be notified when complete."
                }

        # Final fallback: graceful error
        return {
            "response": None,
            "source": "error",
            "degraded": True,
            "error": "AI service is temporarily unavailable. Please try again later."
        }

    def _compute_cache_key(self, request_params: dict) -> str:
        """Return an MD5 hex digest of the request's model, messages and system.

        Other request parameters do not affect the key. The digest is used
        only as a lookup key, not for security.
        """
        import hashlib
        import json

        normalized = {
            "model": request_params.get("model"),
            "messages": request_params.get("messages"),
            "system": request_params.get("system")
        }
        return hashlib.md5(
            json.dumps(normalized, sort_keys=True).encode()
        ).hexdigest()
71.5 SLA Guarantees and Monitoring
SLA Metric Definitions
For production-grade Claude API integration, define and monitor these SLA metrics:
from dataclasses import dataclass
@dataclass
class SLATargets:
    """Target values that the measured service levels are compared against."""

    # Minimum availability, in percent.
    availability_pct: float = 99.5
    # P99 latency ceiling: 10 seconds, in milliseconds.
    p99_latency_ms: float = 10000
    # P95 latency ceiling: 5 seconds, in milliseconds.
    p95_latency_ms: float = 5000
    # Error rate ceiling, in percent.
    error_rate_pct: float = 1.0
    # Maximum number of consecutive failures.
    max_consecutive_failures: int = 3
class SLAMonitor:
    """Tracks request outcomes over a sliding time window and checks SLA targets.

    Args:
        targets: Object exposing ``availability_pct``, ``p99_latency_ms``,
            ``p95_latency_ms`` and ``error_rate_pct``.
        window_minutes: Length of the sliding window in minutes.
    """

    def __init__(self, targets: "SLATargets", window_minutes: int = 60):
        self.targets = targets
        self.window_minutes = window_minutes
        self.requests = []

    def record_request(self, latency_ms: float, success: bool, error_code: str = None):
        """Append one request outcome and drop samples older than the window."""
        now = time.time()
        self._prune(now)
        self.requests.append({
            "timestamp": now,
            "latency_ms": latency_ms,
            "success": success,
            "error_code": error_code
        })

    def get_current_sla(self) -> dict:
        """Return current metrics, per-target violations, and overall health.

        Samples older than the window are discarded first, so the figures
        reflect the window even when nothing was recorded recently. Latency
        percentiles are computed over successful requests only and are 0
        when there are none.

        Returns:
            ``{"status": "no_data"}`` when the window is empty; otherwise a
            dict with ``availability_pct``, ``error_rate_pct``,
            ``p95_latency_ms``, ``p99_latency_ms``, ``sample_size``,
            ``violations`` and ``is_healthy``.
        """
        self._prune(time.time())
        if not self.requests:
            return {"status": "no_data"}

        total = len(self.requests)
        successes = sum(1 for r in self.requests if r["success"])
        errors = total - successes

        latencies = sorted(r["latency_ms"] for r in self.requests if r["success"])
        p95_latency = self._percentile(latencies, 95)
        p99_latency = self._percentile(latencies, 99)

        availability = successes / total * 100
        error_rate = errors / total * 100

        sla_status = {
            "availability_pct": round(availability, 2),
            "error_rate_pct": round(error_rate, 2),
            "p95_latency_ms": round(p95_latency, 1),
            "p99_latency_ms": round(p99_latency, 1),
            "sample_size": total,
        }
        sla_status["violations"] = {
            "availability": availability < self.targets.availability_pct,
            "p99_latency": p99_latency > self.targets.p99_latency_ms,
            "p95_latency": p95_latency > self.targets.p95_latency_ms,
            "error_rate": error_rate > self.targets.error_rate_pct
        }
        sla_status["is_healthy"] = not any(sla_status["violations"].values())
        return sla_status

    def _prune(self, now: float) -> None:
        """Keep only samples whose timestamp falls inside the window ending at ``now``."""
        window_start = now - (self.window_minutes * 60)
        self.requests = [r for r in self.requests if r["timestamp"] > window_start]

    @staticmethod
    def _percentile(sorted_values: list, pct: int) -> float:
        """Return the nearest-rank ``pct``-th percentile of ascending ``sorted_values``.

        Returns 0 for an empty list. Integer arithmetic is used for the rank
        (``ceil(n * pct / 100)``) to avoid float rounding at the boundaries.
        """
        if not sorted_values:
            return 0
        rank = -(-len(sorted_values) * pct // 100)
        return sorted_values[max(rank, 1) - 1]
Timeout Configuration Best Practices
# Timeout and token presets per usage scenario. Timeouts are in seconds.
TIMEOUT_CONFIGS = {
    # Chat-style traffic.
    "interactive": dict(
        description="Real-time interactive scenario (chatbot)",
        connect_timeout=5.0,
        read_timeout=30.0,
        max_tokens=1024,
    ),
    # Offline or bulk jobs.
    "batch_processing": dict(
        description="Batch processing scenario (document analysis)",
        connect_timeout=10.0,
        read_timeout=300.0,
        max_tokens=4096,
    ),
    # Streamed output.
    "streaming": dict(
        description="Streaming output scenario",
        connect_timeout=5.0,
        read_timeout=120.0,
        max_tokens=8192,
    ),
}
def create_client_with_timeout(scenario: str) -> anthropic.Anthropic:
    """Build a client whose timeouts come from the ``TIMEOUT_CONFIGS`` preset.

    An unrecognized ``scenario`` uses the ``"interactive"`` preset. Connect
    and read timeouts come from the preset; write and pool timeouts are
    fixed at 10 and 5 seconds.
    """
    default_preset = TIMEOUT_CONFIGS["interactive"]
    preset = TIMEOUT_CONFIGS[scenario] if scenario in TIMEOUT_CONFIGS else default_preset
    timeout = anthropic.Timeout(
        connect=preset["connect_timeout"],
        read=preset["read_timeout"],
        write=10.0,
        pool=5.0,
    )
    return anthropic.Anthropic(timeout=timeout)
71.6 Complete Production Error Handling Architecture
class ProductionClaudeService:
    """
    Production-grade Claude service wrapper.
    Integrates retry, circuit breaker, fallback, and SLA monitoring.

    ``config`` must provide ``"api_key"``, ``"cache"`` and ``"queue"``.
    """

    def __init__(self, config: dict):
        self.retry_client = ClaudeClientWithRetry(
            api_key=config["api_key"],
            retry_config=RetryConfig(max_retries=3, base_delay=1.0, max_delay=30.0)
        )
        self.circuit_breaker = CircuitBreaker(
            CircuitBreakerConfig(failure_threshold=5, timeout=60.0)
        )
        self.fallback_manager = ClaudeFallbackManager(
            cache_client=config["cache"],
            queue_client=config["queue"]
        )
        self.sla_monitor = SLAMonitor(targets=SLATargets(), window_minutes=60)

    async def _create_message(self, **kwargs):
        """Run the blocking retry client in the default executor and await it.

        The retry client is synchronous and sleeps between attempts, so
        calling it directly would stall the event loop, and its plain return
        value cannot be awaited by the circuit breaker.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None, lambda: self.retry_client.create_message(**kwargs)
        )

    def _record_failure(self, started: float, error_code: str) -> None:
        """Record a failed request with its elapsed time in milliseconds."""
        latency = (time.monotonic() - started) * 1000
        self.sla_monitor.record_request(latency, success=False, error_code=error_code)

    async def complete(self, **kwargs) -> dict:
        """Send a request through the breaker and degrade gracefully on failure.

        Args:
            **kwargs: Forwarded to the retry client's ``create_message``.

        Returns:
            ``{"response": ..., "degraded": False}`` on success, otherwise
            the dict produced by the fallback manager. Rate-limit failures
            are queued for later; an open breaker, other API status errors,
            and connection/timeout errors use the default fallback strategy.
        """
        # Monotonic clock: elapsed time is unaffected by wall-clock changes.
        started = time.monotonic()
        try:
            response = await self.circuit_breaker.call(
                self._create_message,
                **kwargs
            )
            latency = (time.monotonic() - started) * 1000
            self.sla_monitor.record_request(latency, success=True)
            return {"response": response, "degraded": False}
        except CircuitBreakerOpenError:
            logger.warning("Circuit breaker open, using fallback")
            return await self.fallback_manager.handle_with_fallback(kwargs)
        except anthropic.RateLimitError:
            self._record_failure(started, "429")
            return await self.fallback_manager.handle_with_fallback(
                kwargs, strategy=FallbackStrategy.QUEUE_FOR_LATER
            )
        except anthropic.APIStatusError as e:
            self._record_failure(started, str(e.status_code))
            return await self.fallback_manager.handle_with_fallback(kwargs)
        except anthropic.APIConnectionError:
            # Network and timeout failures carry no status code; without this
            # branch they would reach the caller as raw exceptions.
            self._record_failure(started, "connection_error")
            return await self.fallback_manager.handle_with_fallback(kwargs)
Summary
Production error handling is where the engineering maturity of a Claude API integration becomes most visible. Core elements include: correctly distinguishing retryable from non-retryable errors; implementing exponential backoff with jitter to handle rate limits; deploying circuit breakers to prevent cascading failures; preparing multi-tier fallback plans to ensure graceful degradation rather than complete outages; and continuously monitoring SLA metrics for proactive operations.
The investment in building this system pays off completely when production incidents occur — whether Anthropic service instability or sudden traffic spikes. The stability of user experience and the composure of engineers receiving 3 AM alerts both depend on whether these engineering safeguards are in place.