Chapter 32

Multi-Language Clients: redis-py, ioredis and go-redis

Chapter 32: Multi-Language Clients — redis-py, ioredis, and go-redis

Three dominant Redis client libraries span the most popular server-side ecosystems: redis-py for Python (asyncio-native since 4.x), ioredis for Node.js (automatic cluster routing), and go-redis v9 for Go (generic API with hook middleware). This chapter covers connection pooling, pipelining, cluster operation, Lua scripting, and observability integration for each.


32.1 redis-py (Python)

32.1.1 Connection Model and Pool

redis-py's synchronous client uses Python sockets with threading locks. Under multi-threading, a ConnectionPool manages borrow/return lifecycle — each command borrows a connection, executes, then returns it automatically.

import redis
from redis.retry import Retry
from redis.backoff import ExponentialBackoff

# Production-grade pool configuration
pool = redis.ConnectionPool(
    host='localhost',
    port=6379,
    db=0,
    password='secret',
    max_connections=50,          # Hard cap on open sockets
    socket_connect_timeout=5,    # TCP handshake timeout (seconds)
    socket_timeout=3,            # Per-command read/write timeout
    socket_keepalive=True,       # Prevent carrier-level idle disconnects
    socket_keepalive_options={
        'TCP_KEEPIDLE': 30,
        'TCP_KEEPINTVL': 10,
        'TCP_KEEPCNT': 3,
    },
    decode_responses=True,       # Decode bytes → str automatically
    retry_on_timeout=True,
    retry=Retry(ExponentialBackoff(), retries=3),
    health_check_interval=30,    # Validate idle connections every 30 s
)
r = redis.Redis(connection_pool=pool)

The decode_responses trap: the default is False, meaning every return value is bytes. Forgetting to decode is a frequent source of TypeError in mixed codebases. Set decode_responses=True project-wide at initialization time.

32.1.2 Pipeline

# transaction=False: no MULTI/EXEC, only RTT reduction
pipe = r.pipeline(transaction=False)
for i in range(1000):
    pipe.set(f'key:{i}', f'value:{i}')
    pipe.expire(f'key:{i}', 3600)
results = pipe.execute()
# ['OK', True, 'OK', True, ...] — two entries per key (set + expire)

# Optimistic locking with WATCH + MULTI/EXEC
def transfer(from_key: str, to_key: str, amount: int):
    with r.pipeline() as pipe:
        while True:
            try:
                pipe.watch(from_key, to_key)
                balance = int(pipe.get(from_key) or 0)
                if balance < amount:
                    raise ValueError('Insufficient funds')
                pipe.multi()
                pipe.decrby(from_key, amount)
                pipe.incrby(to_key, amount)
                pipe.execute()
                break
            except redis.WatchError:
                continue  # retry optimistic lock

32.1.3 Lua Scripts

# register_script caches the SHA1 hash; subsequent calls use EVALSHA
ATOMIC_DEDUCT = r.register_script("""
local balance = tonumber(redis.call('get', KEYS[1]))
local amount  = tonumber(ARGV[1])
if balance == nil then
    return redis.error_reply('key not found')
end
if balance < amount then
    return redis.error_reply('insufficient balance')
end
redis.call('decrby', KEYS[1], amount)
return balance - amount
""")

new_balance = ATOMIC_DEDUCT(keys=['account:1001'], args=[100])
print(f'New balance: {new_balance}')

32.1.4 asyncio Client (redis-py 4.2+)

aioredis was merged into redis-py in version 4.2. The redis.asyncio module provides a drop-in async interface:

import asyncio
import redis.asyncio as aioredis

async def main():
    pool = aioredis.ConnectionPool.from_url(
        'redis://localhost:6379',
        max_connections=20,
        decode_responses=True,
    )
    r = aioredis.Redis(connection_pool=pool)

    # Basic ops
    await r.set('session:abc', 'user:1001', ex=3600)
    val = await r.get('session:abc')

    # Async pipeline
    async with r.pipeline(transaction=False) as pipe:
        for i in range(100):
            pipe.set(f'key:{i}', f'val:{i}', ex=600)
        results = await pipe.execute()

    # Pub/Sub
    pubsub = r.pubsub()
    await pubsub.subscribe('orders:new')
    async for message in pubsub.listen():
        if message['type'] == 'message':
            print(f"Order event: {message['data']}")
            break

    await r.aclose()

asyncio.run(main())

32.1.5 Cluster Mode

from redis.cluster import RedisCluster, ClusterNode

startup_nodes = [
    ClusterNode('10.0.0.1', 7000),
    ClusterNode('10.0.0.2', 7001),
]

rc = RedisCluster(
    startup_nodes=startup_nodes,
    decode_responses=True,
    password='secret',
    skip_full_coverage_check=True,  # allow partial coverage (cross-DC deployments)
)

rc.set('user:1001', 'Alice')
rc.get('user:1001')

# Pipeline automatically groups commands by slot/node
pipe = rc.pipeline()
pipe.set('key1', 'val1')
pipe.set('key2', 'val2')
results = pipe.execute()

32.1.6 Common Pitfalls

Problem Root cause Fix
ConnectionError: Too many connections max_connections too low or leak Increase limit; use context manager
ResponseError: WRONGTYPE Command used against wrong type Check key type; flush stale keys
UnicodeDecodeError Manual decode on wrongly-encoded bytes Set decode_responses=True globally
Cluster pipeline failure Non-cluster pipeline with cross-slot keys Use RedisCluster.pipeline()
READONLY error Write sent to replica Verify routing; write only to primaries

32.2 ioredis (Node.js)

32.2.1 Connection Configuration and Reconnection

const Redis = require('ioredis');

const redis = new Redis({
    host: 'localhost',
    port: 6379,
    password: process.env.REDIS_PASSWORD,
    db: 0,
    connectTimeout: 5000,        // TCP connect timeout (ms)
    commandTimeout: 3000,        // Per-command timeout (ms)
    maxRetriesPerRequest: 3,     // null = retry indefinitely
    enableReadyCheck: true,      // Wait for server LOADING to finish

    // Exponential backoff reconnection
    retryStrategy(times) {
        if (times > 10) return null; // give up, emit 'error'
        return Math.min(times * 100, 3000);
    },

    enableOfflineQueue: true,    // Buffer commands while disconnected
    offlineQueueMaxLen: 1000,

    // TLS
    tls: process.env.REDIS_TLS === 'true'
        ? { rejectUnauthorized: true, ca: fs.readFileSync('/path/to/ca.crt') }
        : undefined,
});

redis.on('connect',      ()    => console.log('Redis: connected'));
redis.on('ready',        ()    => console.log('Redis: ready'));
redis.on('error',        err   => console.error('Redis error:', err.message));
redis.on('reconnecting', delay => console.log(`Redis: reconnecting in ${delay}ms`));
redis.on('close',        ()    => console.log('Redis: connection closed'));

32.2.2 Pipeline and MULTI/EXEC

// Pipeline — batches RTT, not atomic
const pipeline = redis.pipeline();
pipeline.set('user:1001:name', 'Alice');
pipeline.set('user:1001:age', '25');
pipeline.expire('user:1001:name', 3600);
pipeline.expire('user:1001:age', 3600);
const results = await pipeline.exec();
// [[null, 'OK'], [null, 'OK'], [null, 1], [null, 1]]
// Format: [error, result] per command

// MULTI/EXEC — atomic
const txResult = await redis
    .multi()
    .decrby('account:from', 100)
    .incrby('account:to', 100)
    .exec();
// null if WATCH triggered, otherwise [[null, newFrom], [null, newTo]]

// Optimistic locking
async function transfer(from, to, amount) {
    await redis.watch(from);
    const balance = parseInt(await redis.get(from));
    if (balance < amount) {
        await redis.unwatch();
        throw new Error('Insufficient funds');
    }
    const result = await redis
        .multi()
        .decrby(from, amount)
        .incrby(to, amount)
        .exec();
    if (result === null) return transfer(from, to, amount); // WATCH fired, retry
    return result;
}

32.2.3 Lua Scripts

// defineCommand caches the script SHA1 and uses EVALSHA automatically
redis.defineCommand('releaseLock', {
    numberOfKeys: 1,
    lua: `
        if redis.call('get', KEYS[1]) == ARGV[1] then
            return redis.call('del', KEYS[1])
        end
        return 0
    `,
});

const released = await redis.releaseLock('lock:order:1001', 'token-uuid-xyz');
// 1 = released, 0 = token mismatch (not our lock)

// Rate limiter script (sliding window counter)
redis.defineCommand('rateLimit', {
    numberOfKeys: 1,
    lua: `
        local key    = KEYS[1]
        local limit  = tonumber(ARGV[1])
        local window = tonumber(ARGV[2])
        local now    = tonumber(ARGV[3])

        redis.call('zremrangebyscore', key, '-inf', now - window)
        local count = redis.call('zcard', key)
        if count < limit then
            redis.call('zadd', key, now, now)
            redis.call('pexpire', key, window)
            return 1
        end
        return 0
    `,
});

const allowed = await redis.rateLimit(
    'rate:user:1001',
    100,                    // max requests
    60000,                  // window (ms)
    Date.now()
);

32.2.4 Cluster Mode

const cluster = new Redis.Cluster(
    [
        { host: '10.0.0.1', port: 7000 },
        { host: '10.0.0.2', port: 7001 },
        { host: '10.0.0.3', port: 7002 },
    ],
    {
        scaleReads: 'slave',           // Route reads to replicas
        maxRedirections: 16,
        retryDelayOnFailover: 100,
        retryDelayOnClusterDown: 100,
        clusterRetryStrategy: times => Math.min(100 + times * 2, 2000),
        redisOptions: {
            password: process.env.REDIS_PASSWORD,
            connectTimeout: 5000,
        },
    }
);

// ioredis automatically groups pipeline commands by node
const pipe = cluster.pipeline();
for (let i = 0; i < 200; i++) {
    pipe.set(`key:${i}`, `val:${i}`, 'EX', 3600);
}
const results = await pipe.exec();

// Force co-location with hash tags
await cluster.set('{user:1001}:profile',  JSON.stringify(profile));
await cluster.set('{user:1001}:settings', JSON.stringify(settings));
// Both keys share slot determined by {user:1001}

32.2.5 Pub/Sub

// Publisher — use a dedicated connection
const pub = new Redis({ host: 'localhost' });
setInterval(() => {
    pub.publish('metrics:cpu', JSON.stringify({ value: process.cpuUsage() }));
}, 1000);

// Subscriber — cannot execute normal commands while subscribed
const sub = redis.duplicate(); // clone connection config
await sub.subscribe('metrics:cpu', 'metrics:memory');
sub.on('message', (channel, message) => {
    console.log(`[${channel}]`, JSON.parse(message));
});

// Pattern subscribe
await sub.psubscribe('metrics:*');
sub.on('pmessage', (pattern, channel, message) => {
    // handle all metrics channels
});

32.3 go-redis (Go)

32.3.1 Generic API and Connection Pool (v9)

go-redis v9 introduced a generic Cmd type system for type-safe return values. The built-in pool tracks idle connections, validates them via PING, and handles automatic reconnection.

package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "github.com/redis/go-redis/v9"
)

func newClient() *redis.Client {
    return redis.NewClient(&redis.Options{
        Addr:     "localhost:6379",
        Password: "secret",
        DB:       0,

        // Pool
        PoolSize:        20,
        MinIdleConns:    5,
        MaxIdleConns:    10,
        ConnMaxIdleTime: 5 * time.Minute,
        ConnMaxLifetime: 30 * time.Minute,

        // Timeouts
        DialTimeout:  5 * time.Second,
        ReadTimeout:  3 * time.Second,
        WriteTimeout: 3 * time.Second,
        PoolTimeout:  4 * time.Second,

        // Retry
        MaxRetries:      3,
        MinRetryBackoff: 8 * time.Millisecond,
        MaxRetryBackoff: 512 * time.Millisecond,
    })
}

func main() {
    ctx := context.Background()
    rdb := newClient()
    defer rdb.Close()

    if err := rdb.Ping(ctx).Err(); err != nil {
        log.Fatalf("Redis unavailable: %v", err)
    }

    // Set with TTL
    if err := rdb.Set(ctx, "session:abc", "user:1001", time.Hour).Err(); err != nil {
        log.Fatal(err)
    }

    // Get — distinguish nil from error
    val, err := rdb.Get(ctx, "session:abc").Result()
    switch {
    case err == redis.Nil:
        fmt.Println("key does not exist")
    case err != nil:
        log.Fatalf("get error: %v", err)
    default:
        fmt.Println("session user:", val)
    }
}

32.3.2 Pipeline and Transactions

// Pipelined helper — cleaner syntax
results, err := rdb.Pipelined(ctx, func(pipe redis.Pipeliner) error {
    for i := 0; i < 100; i++ {
        pipe.Set(ctx, fmt.Sprintf("key:%d", i), fmt.Sprintf("val:%d", i), time.Hour)
    }
    return nil
})
if err != nil {
    log.Fatal(err)
}
_ = results

// TxPipelined — wraps in MULTI/EXEC
_, err = rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
    pipe.DecrBy(ctx, "account:from", 100)
    pipe.IncrBy(ctx, "account:to",   100)
    return nil
})

// Optimistic lock with Watch
const maxRetries = 5
for i := 0; i < maxRetries; i++ {
    err = rdb.Watch(ctx, func(tx *redis.Tx) error {
        balance, err := tx.Get(ctx, "account:from").Int64()
        if err != nil {
            return err
        }
        if balance < 100 {
            return fmt.Errorf("insufficient funds")
        }
        _, err = tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
            pipe.DecrBy(ctx, "account:from", 100)
            pipe.IncrBy(ctx, "account:to",   100)
            return nil
        })
        return err
    }, "account:from")
    if err == nil {
        break
    }
    if !errors.Is(err, redis.TxFailedErr) {
        log.Fatal(err)
    }
    // TxFailedErr = WATCH key was modified; retry
}

32.3.3 Hook Middleware

The redis.Hook interface intercepts command execution — ideal for Prometheus metrics and OpenTelemetry tracing:

type MetricsHook struct {
    duration *prometheus.HistogramVec
    errors   *prometheus.CounterVec
}

func NewMetricsHook(reg prometheus.Registerer) *MetricsHook {
    h := &MetricsHook{
        duration: prometheus.NewHistogramVec(prometheus.HistogramOpts{
            Name:    "redis_cmd_duration_seconds",
            Help:    "Redis command latency histogram",
            Buckets: prometheus.ExponentialBuckets(0.0001, 2, 16),
        }, []string{"cmd"}),
        errors: prometheus.NewCounterVec(prometheus.CounterOpts{
            Name: "redis_cmd_errors_total",
            Help: "Redis command error count",
        }, []string{"cmd"}),
    }
    reg.MustRegister(h.duration, h.errors)
    return h
}

func (h *MetricsHook) DialHook(next redis.DialHook) redis.DialHook { return next }

func (h *MetricsHook) ProcessHook(next redis.ProcessHook) redis.ProcessHook {
    return func(ctx context.Context, cmd redis.Cmder) error {
        start := time.Now()
        err := next(ctx, cmd)
        h.duration.WithLabelValues(cmd.Name()).Observe(time.Since(start).Seconds())
        if err != nil && err != redis.Nil {
            h.errors.WithLabelValues(cmd.Name()).Inc()
        }
        return err
    }
}

func (h *MetricsHook) ProcessPipelineHook(next redis.ProcessPipelineHook) redis.ProcessPipelineHook {
    return func(ctx context.Context, cmds []redis.Cmder) error {
        start := time.Now()
        err := next(ctx, cmds)
        h.duration.WithLabelValues("pipeline").Observe(time.Since(start).Seconds())
        return err
    }
}

// Register
rdb.AddHook(NewMetricsHook(prometheus.DefaultRegisterer))

32.3.4 Lua Scripts

var releaseLockScript = redis.NewScript(`
    if redis.call("get", KEYS[1]) == ARGV[1] then
        return redis.call("del", KEYS[1])
    end
    return 0
`)

func releaseLock(ctx context.Context, rdb *redis.Client, key, token string) (bool, error) {
    n, err := releaseLockScript.Run(ctx, rdb, []string{key}, token).Int()
    return n == 1, err
}

// Sliding window rate limiter
var rateLimitScript = redis.NewScript(`
    local key    = KEYS[1]
    local limit  = tonumber(ARGV[1])
    local window = tonumber(ARGV[2])
    local now    = tonumber(ARGV[3])

    redis.call('zremrangebyscore', key, '-inf', now - window)
    local count = redis.call('zcard', key)
    if count < limit then
        redis.call('zadd', key, now, now)
        redis.call('pexpire', key, window)
        return 1
    end
    return 0
`)

32.3.5 Cluster and Sentinel

// Cluster
clusterClient := redis.NewClusterClient(&redis.ClusterOptions{
    Addrs:          []string{"10.0.0.1:7000", "10.0.0.2:7001", "10.0.0.3:7002"},
    Password:       "secret",
    RouteByLatency: true,
    PoolSize:       10,
    MinIdleConns:   2,
    ReadTimeout:    3 * time.Second,
    WriteTimeout:   3 * time.Second,
})

// Sentinel (automatic failover)
sentinelClient := redis.NewFailoverClient(&redis.FailoverOptions{
    MasterName:       "mymaster",
    SentinelAddrs:    []string{"10.0.0.1:26379", "10.0.0.2:26379", "10.0.0.3:26379"},
    SentinelPassword: "sentinel-pass",
    Password:         "redis-pass",
    DB:               0,
    PoolSize:         20,
    MinIdleConns:     5,
})

32.3.6 Pool Monitoring

// Periodically log pool health
go func() {
    ticker := time.NewTicker(30 * time.Second)
    defer ticker.Stop()
    for range ticker.C {
        s := rdb.PoolStats()
        hitRate := float64(s.Hits) / float64(s.Hits+s.Misses+1) * 100
        log.Printf("Redis pool — total:%d idle:%d stale:%d hit:%.1f%% timeouts:%d",
            s.TotalConns, s.IdleConns, s.StaleConns, hitRate, s.Timeouts)
    }
}()

32.4 Cross-Client Comparison

Dimension redis-py ioredis go-redis
Async model asyncio (4.2+) Node.js event loop goroutines
Cluster pipeline Auto slot grouping Auto node grouping Auto node grouping
Script caching register_script (SHA) defineCommand (SHA) redis.NewScript (SHA)
Hook middleware None built-in Custom middleware redis.Hook interface
Type safety Dynamic (Python) Dynamic (JS) Generics (Go 1.18+)
Testing mock fakeredis-py ioredis-mock miniredis
Connection pool impl Pure Python C++ binding (hiredis optional) Pure Go

32.5 Production Tuning Recommendations

Python:

Node.js:

Go:

Rate this chapter
4.8  / 5  (3 ratings)

💬 Comments