Multi-Language Clients: redis-py, ioredis and go-redis
Chapter 32: Multi-Language Clients — redis-py, ioredis, and go-redis
Three dominant Redis client libraries span the most popular server-side ecosystems: redis-py for Python (asyncio-native since 4.x), ioredis for Node.js (automatic cluster routing), and go-redis v9 for Go (generic API with hook middleware). This chapter covers connection pooling, pipelining, cluster operation, Lua scripting, and observability integration for each.
32.1 redis-py (Python)
32.1.1 Connection Model and Pool
redis-py's synchronous client uses Python sockets with threading locks. Under multi-threading, a ConnectionPool manages borrow/return lifecycle — each command borrows a connection, executes, then returns it automatically.
import redis
from redis.retry import Retry
from redis.backoff import ExponentialBackoff
# Production-grade pool configuration
pool = redis.ConnectionPool(
host='localhost',
port=6379,
db=0,
password='secret',
max_connections=50, # Hard cap on open sockets
socket_connect_timeout=5, # TCP handshake timeout (seconds)
socket_timeout=3, # Per-command read/write timeout
socket_keepalive=True, # Prevent carrier-level idle disconnects
socket_keepalive_options={
'TCP_KEEPIDLE': 30,
'TCP_KEEPINTVL': 10,
'TCP_KEEPCNT': 3,
},
decode_responses=True, # Decode bytes → str automatically
retry_on_timeout=True,
retry=Retry(ExponentialBackoff(), retries=3),
health_check_interval=30, # Validate idle connections every 30 s
)
r = redis.Redis(connection_pool=pool)
The decode_responses trap: the default is False, meaning every return value is bytes. Forgetting to decode is a frequent source of TypeError in mixed codebases. Set decode_responses=True project-wide at initialization time.
32.1.2 Pipeline
# transaction=False: no MULTI/EXEC, only RTT reduction
pipe = r.pipeline(transaction=False)
for i in range(1000):
pipe.set(f'key:{i}', f'value:{i}')
pipe.expire(f'key:{i}', 3600)
results = pipe.execute()
# ['OK', True, 'OK', True, ...] — two entries per key (set + expire)
# Optimistic locking with WATCH + MULTI/EXEC
def transfer(from_key: str, to_key: str, amount: int):
with r.pipeline() as pipe:
while True:
try:
pipe.watch(from_key, to_key)
balance = int(pipe.get(from_key) or 0)
if balance < amount:
raise ValueError('Insufficient funds')
pipe.multi()
pipe.decrby(from_key, amount)
pipe.incrby(to_key, amount)
pipe.execute()
break
except redis.WatchError:
continue # retry optimistic lock
32.1.3 Lua Scripts
# register_script caches the SHA1 hash; subsequent calls use EVALSHA
ATOMIC_DEDUCT = r.register_script("""
local balance = tonumber(redis.call('get', KEYS[1]))
local amount = tonumber(ARGV[1])
if balance == nil then
return redis.error_reply('key not found')
end
if balance < amount then
return redis.error_reply('insufficient balance')
end
redis.call('decrby', KEYS[1], amount)
return balance - amount
""")
new_balance = ATOMIC_DEDUCT(keys=['account:1001'], args=[100])
print(f'New balance: {new_balance}')
32.1.4 asyncio Client (redis-py 4.2+)
aioredis was merged into redis-py in version 4.2. The redis.asyncio module provides a drop-in async interface:
import asyncio
import redis.asyncio as aioredis
async def main():
pool = aioredis.ConnectionPool.from_url(
'redis://localhost:6379',
max_connections=20,
decode_responses=True,
)
r = aioredis.Redis(connection_pool=pool)
# Basic ops
await r.set('session:abc', 'user:1001', ex=3600)
val = await r.get('session:abc')
# Async pipeline
async with r.pipeline(transaction=False) as pipe:
for i in range(100):
pipe.set(f'key:{i}', f'val:{i}', ex=600)
results = await pipe.execute()
# Pub/Sub
pubsub = r.pubsub()
await pubsub.subscribe('orders:new')
async for message in pubsub.listen():
if message['type'] == 'message':
print(f"Order event: {message['data']}")
break
await r.aclose()
asyncio.run(main())
32.1.5 Cluster Mode
from redis.cluster import RedisCluster, ClusterNode
startup_nodes = [
ClusterNode('10.0.0.1', 7000),
ClusterNode('10.0.0.2', 7001),
]
rc = RedisCluster(
startup_nodes=startup_nodes,
decode_responses=True,
password='secret',
skip_full_coverage_check=True, # allow partial coverage (cross-DC deployments)
)
rc.set('user:1001', 'Alice')
rc.get('user:1001')
# Pipeline automatically groups commands by slot/node
pipe = rc.pipeline()
pipe.set('key1', 'val1')
pipe.set('key2', 'val2')
results = pipe.execute()
32.1.6 Common Pitfalls
| Problem | Root cause | Fix |
|---|---|---|
ConnectionError: Too many connections |
max_connections too low or leak |
Increase limit; use context manager |
ResponseError: WRONGTYPE |
Command used against wrong type | Check key type; flush stale keys |
UnicodeDecodeError |
Manual decode on wrongly-encoded bytes | Set decode_responses=True globally |
| Cluster pipeline failure | Non-cluster pipeline with cross-slot keys | Use RedisCluster.pipeline() |
READONLY error |
Write sent to replica | Verify routing; write only to primaries |
32.2 ioredis (Node.js)
32.2.1 Connection Configuration and Reconnection
const Redis = require('ioredis');
const redis = new Redis({
host: 'localhost',
port: 6379,
password: process.env.REDIS_PASSWORD,
db: 0,
connectTimeout: 5000, // TCP connect timeout (ms)
commandTimeout: 3000, // Per-command timeout (ms)
maxRetriesPerRequest: 3, // null = retry indefinitely
enableReadyCheck: true, // Wait for server LOADING to finish
// Exponential backoff reconnection
retryStrategy(times) {
if (times > 10) return null; // give up, emit 'error'
return Math.min(times * 100, 3000);
},
enableOfflineQueue: true, // Buffer commands while disconnected
offlineQueueMaxLen: 1000,
// TLS
tls: process.env.REDIS_TLS === 'true'
? { rejectUnauthorized: true, ca: fs.readFileSync('/path/to/ca.crt') }
: undefined,
});
redis.on('connect', () => console.log('Redis: connected'));
redis.on('ready', () => console.log('Redis: ready'));
redis.on('error', err => console.error('Redis error:', err.message));
redis.on('reconnecting', delay => console.log(`Redis: reconnecting in ${delay}ms`));
redis.on('close', () => console.log('Redis: connection closed'));
32.2.2 Pipeline and MULTI/EXEC
// Pipeline — batches RTT, not atomic
const pipeline = redis.pipeline();
pipeline.set('user:1001:name', 'Alice');
pipeline.set('user:1001:age', '25');
pipeline.expire('user:1001:name', 3600);
pipeline.expire('user:1001:age', 3600);
const results = await pipeline.exec();
// [[null, 'OK'], [null, 'OK'], [null, 1], [null, 1]]
// Format: [error, result] per command
// MULTI/EXEC — atomic
const txResult = await redis
.multi()
.decrby('account:from', 100)
.incrby('account:to', 100)
.exec();
// null if WATCH triggered, otherwise [[null, newFrom], [null, newTo]]
// Optimistic locking
async function transfer(from, to, amount) {
await redis.watch(from);
const balance = parseInt(await redis.get(from));
if (balance < amount) {
await redis.unwatch();
throw new Error('Insufficient funds');
}
const result = await redis
.multi()
.decrby(from, amount)
.incrby(to, amount)
.exec();
if (result === null) return transfer(from, to, amount); // WATCH fired, retry
return result;
}
32.2.3 Lua Scripts
// defineCommand caches the script SHA1 and uses EVALSHA automatically
redis.defineCommand('releaseLock', {
numberOfKeys: 1,
lua: `
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
end
return 0
`,
});
const released = await redis.releaseLock('lock:order:1001', 'token-uuid-xyz');
// 1 = released, 0 = token mismatch (not our lock)
// Rate limiter script (sliding window counter)
redis.defineCommand('rateLimit', {
numberOfKeys: 1,
lua: `
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
redis.call('zremrangebyscore', key, '-inf', now - window)
local count = redis.call('zcard', key)
if count < limit then
redis.call('zadd', key, now, now)
redis.call('pexpire', key, window)
return 1
end
return 0
`,
});
const allowed = await redis.rateLimit(
'rate:user:1001',
100, // max requests
60000, // window (ms)
Date.now()
);
32.2.4 Cluster Mode
const cluster = new Redis.Cluster(
[
{ host: '10.0.0.1', port: 7000 },
{ host: '10.0.0.2', port: 7001 },
{ host: '10.0.0.3', port: 7002 },
],
{
scaleReads: 'slave', // Route reads to replicas
maxRedirections: 16,
retryDelayOnFailover: 100,
retryDelayOnClusterDown: 100,
clusterRetryStrategy: times => Math.min(100 + times * 2, 2000),
redisOptions: {
password: process.env.REDIS_PASSWORD,
connectTimeout: 5000,
},
}
);
// ioredis automatically groups pipeline commands by node
const pipe = cluster.pipeline();
for (let i = 0; i < 200; i++) {
pipe.set(`key:${i}`, `val:${i}`, 'EX', 3600);
}
const results = await pipe.exec();
// Force co-location with hash tags
await cluster.set('{user:1001}:profile', JSON.stringify(profile));
await cluster.set('{user:1001}:settings', JSON.stringify(settings));
// Both keys share slot determined by {user:1001}
32.2.5 Pub/Sub
// Publisher — use a dedicated connection
const pub = new Redis({ host: 'localhost' });
setInterval(() => {
pub.publish('metrics:cpu', JSON.stringify({ value: process.cpuUsage() }));
}, 1000);
// Subscriber — cannot execute normal commands while subscribed
const sub = redis.duplicate(); // clone connection config
await sub.subscribe('metrics:cpu', 'metrics:memory');
sub.on('message', (channel, message) => {
console.log(`[${channel}]`, JSON.parse(message));
});
// Pattern subscribe
await sub.psubscribe('metrics:*');
sub.on('pmessage', (pattern, channel, message) => {
// handle all metrics channels
});
32.3 go-redis (Go)
32.3.1 Generic API and Connection Pool (v9)
go-redis v9 introduced a generic Cmd type system for type-safe return values. The built-in pool tracks idle connections, validates them via PING, and handles automatic reconnection.
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/redis/go-redis/v9"
)
func newClient() *redis.Client {
return redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "secret",
DB: 0,
// Pool
PoolSize: 20,
MinIdleConns: 5,
MaxIdleConns: 10,
ConnMaxIdleTime: 5 * time.Minute,
ConnMaxLifetime: 30 * time.Minute,
// Timeouts
DialTimeout: 5 * time.Second,
ReadTimeout: 3 * time.Second,
WriteTimeout: 3 * time.Second,
PoolTimeout: 4 * time.Second,
// Retry
MaxRetries: 3,
MinRetryBackoff: 8 * time.Millisecond,
MaxRetryBackoff: 512 * time.Millisecond,
})
}
func main() {
ctx := context.Background()
rdb := newClient()
defer rdb.Close()
if err := rdb.Ping(ctx).Err(); err != nil {
log.Fatalf("Redis unavailable: %v", err)
}
// Set with TTL
if err := rdb.Set(ctx, "session:abc", "user:1001", time.Hour).Err(); err != nil {
log.Fatal(err)
}
// Get — distinguish nil from error
val, err := rdb.Get(ctx, "session:abc").Result()
switch {
case err == redis.Nil:
fmt.Println("key does not exist")
case err != nil:
log.Fatalf("get error: %v", err)
default:
fmt.Println("session user:", val)
}
}
32.3.2 Pipeline and Transactions
// Pipelined helper — cleaner syntax
results, err := rdb.Pipelined(ctx, func(pipe redis.Pipeliner) error {
for i := 0; i < 100; i++ {
pipe.Set(ctx, fmt.Sprintf("key:%d", i), fmt.Sprintf("val:%d", i), time.Hour)
}
return nil
})
if err != nil {
log.Fatal(err)
}
_ = results
// TxPipelined — wraps in MULTI/EXEC
_, err = rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
pipe.DecrBy(ctx, "account:from", 100)
pipe.IncrBy(ctx, "account:to", 100)
return nil
})
// Optimistic lock with Watch
const maxRetries = 5
for i := 0; i < maxRetries; i++ {
err = rdb.Watch(ctx, func(tx *redis.Tx) error {
balance, err := tx.Get(ctx, "account:from").Int64()
if err != nil {
return err
}
if balance < 100 {
return fmt.Errorf("insufficient funds")
}
_, err = tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
pipe.DecrBy(ctx, "account:from", 100)
pipe.IncrBy(ctx, "account:to", 100)
return nil
})
return err
}, "account:from")
if err == nil {
break
}
if !errors.Is(err, redis.TxFailedErr) {
log.Fatal(err)
}
// TxFailedErr = WATCH key was modified; retry
}
32.3.3 Hook Middleware
The redis.Hook interface intercepts command execution — ideal for Prometheus metrics and OpenTelemetry tracing:
type MetricsHook struct {
duration *prometheus.HistogramVec
errors *prometheus.CounterVec
}
func NewMetricsHook(reg prometheus.Registerer) *MetricsHook {
h := &MetricsHook{
duration: prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: "redis_cmd_duration_seconds",
Help: "Redis command latency histogram",
Buckets: prometheus.ExponentialBuckets(0.0001, 2, 16),
}, []string{"cmd"}),
errors: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "redis_cmd_errors_total",
Help: "Redis command error count",
}, []string{"cmd"}),
}
reg.MustRegister(h.duration, h.errors)
return h
}
func (h *MetricsHook) DialHook(next redis.DialHook) redis.DialHook { return next }
func (h *MetricsHook) ProcessHook(next redis.ProcessHook) redis.ProcessHook {
return func(ctx context.Context, cmd redis.Cmder) error {
start := time.Now()
err := next(ctx, cmd)
h.duration.WithLabelValues(cmd.Name()).Observe(time.Since(start).Seconds())
if err != nil && err != redis.Nil {
h.errors.WithLabelValues(cmd.Name()).Inc()
}
return err
}
}
func (h *MetricsHook) ProcessPipelineHook(next redis.ProcessPipelineHook) redis.ProcessPipelineHook {
return func(ctx context.Context, cmds []redis.Cmder) error {
start := time.Now()
err := next(ctx, cmds)
h.duration.WithLabelValues("pipeline").Observe(time.Since(start).Seconds())
return err
}
}
// Register
rdb.AddHook(NewMetricsHook(prometheus.DefaultRegisterer))
32.3.4 Lua Scripts
var releaseLockScript = redis.NewScript(`
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
end
return 0
`)
func releaseLock(ctx context.Context, rdb *redis.Client, key, token string) (bool, error) {
n, err := releaseLockScript.Run(ctx, rdb, []string{key}, token).Int()
return n == 1, err
}
// Sliding window rate limiter
var rateLimitScript = redis.NewScript(`
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
redis.call('zremrangebyscore', key, '-inf', now - window)
local count = redis.call('zcard', key)
if count < limit then
redis.call('zadd', key, now, now)
redis.call('pexpire', key, window)
return 1
end
return 0
`)
32.3.5 Cluster and Sentinel
// Cluster
clusterClient := redis.NewClusterClient(&redis.ClusterOptions{
Addrs: []string{"10.0.0.1:7000", "10.0.0.2:7001", "10.0.0.3:7002"},
Password: "secret",
RouteByLatency: true,
PoolSize: 10,
MinIdleConns: 2,
ReadTimeout: 3 * time.Second,
WriteTimeout: 3 * time.Second,
})
// Sentinel (automatic failover)
sentinelClient := redis.NewFailoverClient(&redis.FailoverOptions{
MasterName: "mymaster",
SentinelAddrs: []string{"10.0.0.1:26379", "10.0.0.2:26379", "10.0.0.3:26379"},
SentinelPassword: "sentinel-pass",
Password: "redis-pass",
DB: 0,
PoolSize: 20,
MinIdleConns: 5,
})
32.3.6 Pool Monitoring
// Periodically log pool health
go func() {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for range ticker.C {
s := rdb.PoolStats()
hitRate := float64(s.Hits) / float64(s.Hits+s.Misses+1) * 100
log.Printf("Redis pool — total:%d idle:%d stale:%d hit:%.1f%% timeouts:%d",
s.TotalConns, s.IdleConns, s.StaleConns, hitRate, s.Timeouts)
}
}()
32.4 Cross-Client Comparison
| Dimension | redis-py | ioredis | go-redis |
|---|---|---|---|
| Async model | asyncio (4.2+) | Node.js event loop | goroutines |
| Cluster pipeline | Auto slot grouping | Auto node grouping | Auto node grouping |
| Script caching | register_script (SHA) |
defineCommand (SHA) |
redis.NewScript (SHA) |
| Hook middleware | None built-in | Custom middleware | redis.Hook interface |
| Type safety | Dynamic (Python) | Dynamic (JS) | Generics (Go 1.18+) |
| Testing mock | fakeredis-py | ioredis-mock | miniredis |
| Connection pool impl | Pure Python | C++ binding (hiredis optional) | Pure Go |
32.5 Production Tuning Recommendations
Python:
- Combine
redis.asynciowith FastAPI or aiohttp to avoid blocking the event loop. - Never call
r.delete(*r.keys('prefix:*'))in production —KEYSblocks the server. UseSCANwith batched deletes. - Set
health_check_interval=30to detect stale connections before they fail under load.
Node.js:
- Set
lazyConnect: truefor Lambda/serverless functions to defer TCP connection until the first command. - Subscriber connections must be duplicated (
redis.duplicate()) — they cannot run regular commands while subscribed. - In Serverless environments, store the client instance outside the handler function and reuse across invocations.
Go:
- A single
*redis.Clientis goroutine-safe and should be a global singleton per logical Redis endpoint. - Use
rdb.Conn()to obtain a dedicated connection for blocking commands (SUBSCRIBE,BLPOP,BRPOP). - Target pool hit rate above 95%; if below that, increase
PoolSizeor optimize command latency.