第 32 章

多语言客户端:redis-py、ioredis 与 go-redis

第32章 多语言客户端:redis-py、ioredis 与 go-redis

不同技术栈的团队在接入 Redis 时有不同的最佳实践。本章聚焦三个主流多语言客户端:Python 的 redis-py(4.x 系统级重构后支持 asyncio)、Node.js 的 ioredis(集群路由自动化)以及 Go 的 go-redis v9(泛型 API + Hook 拦截器)。深入讲解连接池、Pipeline、Cluster、脚本执行及可观测性接入。


32.1 redis-py(Python)

32.1.1 连接模型与连接池

redis-py 的同步客户端基于 Python socket 和线程锁。Redis 对象在多线程场景下通过 ConnectionPool 保证安全:每次执行命令时从池中借一个连接,命令完成后自动归还。

import redis
from redis.retry import Retry
from redis.backoff import ExponentialBackoff

# 基础连接(不推荐直接使用,无连接池)
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

# 生产配置(显式连接池)
pool = redis.ConnectionPool(
    host='localhost',
    port=6379,
    db=0,
    password='secret',
    max_connections=50,          # 最大连接数
    socket_connect_timeout=5,    # TCP 握手超时(秒)
    socket_timeout=3,            # 命令读写超时(秒)
    socket_keepalive=True,       # TCP keepalive(避免运营商断连)
    socket_keepalive_options={
        'TCP_KEEPIDLE': 30,
        'TCP_KEEPINTVL': 10,
        'TCP_KEEPCNT': 3,
    },
    decode_responses=True,       # 自动将 bytes decode 为 str
    retry_on_timeout=True,       # 超时后自动重试一次
    retry=Retry(ExponentialBackoff(), retries=3),
    health_check_interval=30,    # 每30秒检测空闲连接
)
r = redis.Redis(connection_pool=pool)

decode_responses 陷阱:默认 False,所有返回值为 bytes,需要手动 .decode('utf-8')。在混合 bytes/str 的项目中极易出现 TypeError,建议在项目初始化时统一设置 decode_responses=True

32.1.2 Pipeline

# transaction=False:不使用 MULTI/EXEC,仅减少 RTT
pipe = r.pipeline(transaction=False)
for i in range(1000):
    pipe.set(f'key:{i}', f'value:{i}')
    pipe.expire(f'key:{i}', 3600)
results = pipe.execute()
# results: ['OK', True, 'OK', True, ...] 每两个对应一个 key 的 set+expire

# transaction=True(默认):MULTI/EXEC 包裹,原子性
with r.pipeline() as pipe:
    while True:
        try:
            pipe.watch('account:1001')           # WATCH 键,若被修改则 DISCARD
            balance = int(pipe.get('account:1001'))
            if balance < 100:
                raise InsufficientFundsError()
            pipe.multi()                         # 开始 MULTI
            pipe.decrby('account:1001', 100)
            pipe.incrby('account:1002', 100)
            pipe.execute()                       # EXEC
            break
        except redis.WatchError:
            continue                             # 重试乐观锁

32.1.3 Lua 脚本

# 定义脚本(第一次调用自动注册到 Redis,后续用 SHA 调用)
ATOMIC_DEDUCT = r.register_script("""
local balance = tonumber(redis.call('get', KEYS[1]))
local amount  = tonumber(ARGV[1])
if balance == nil then
    return {err = 'key not found'}
end
if balance < amount then
    return {err = 'insufficient balance'}
end
redis.call('decrby', KEYS[1], amount)
return balance - amount
""")

new_balance = ATOMIC_DEDUCT(keys=['account:1001'], args=[100])

32.1.4 asyncio 客户端(redis-py 4.2+)

redis-py 4.2 开始将 aioredis 合并进主库,通过 redis.asyncio 提供原生 asyncio 支持:

import asyncio
import redis.asyncio as aioredis

async def main():
    # 创建连接池(asyncio 版)
    pool = aioredis.ConnectionPool.from_url(
        'redis://localhost:6379',
        max_connections=20,
        decode_responses=True,
    )
    r = aioredis.Redis(connection_pool=pool)

    # 基本操作
    await r.set('key', 'value', ex=3600)
    val = await r.get('key')

    # 异步 Pipeline
    async with r.pipeline(transaction=False) as pipe:
        for i in range(100):
            pipe.set(f'key:{i}', f'val:{i}')
        results = await pipe.execute()

    # 发布订阅
    pubsub = r.pubsub()
    await pubsub.subscribe('channel:news')
    async for message in pubsub.listen():
        if message['type'] == 'message':
            print(f"Received: {message['data']}")

    await r.aclose()

asyncio.run(main())

32.1.5 Cluster 模式

from redis.cluster import RedisCluster, ClusterNode

# 只需提供部分启动节点,客户端自动发现其余节点
startup_nodes = [
    ClusterNode('10.0.0.1', 7000),
    ClusterNode('10.0.0.2', 7001),
]

rc = RedisCluster(
    startup_nodes=startup_nodes,
    decode_responses=True,
    password='secret',
    skip_full_coverage_check=True,  # 跨机房部署时节点可能不完全在线
)

rc.set('user:1001', 'Alice')
rc.get('user:1001')

# Cluster pipeline:redis-py 自动按 slot 分组
pipe = rc.pipeline()
pipe.set('key1', 'val1')  # slot A
pipe.set('key2', 'val2')  # slot B(自动路由到不同节点)
results = pipe.execute()

32.1.6 常见坑总结

问题 原因 解决
ConnectionError: Too many connections max_connections 不足或连接未归还 调大上限;使用 contextmanager
ResponseError: WRONGTYPE 对错误类型执行命令(string vs hash) 检查 key 类型,清理历史脏数据
UnicodeDecodeError decode_responses=False 时手动解码错误编码 统一设置 decode_responses=True
Cluster 下 Pipeline 报错 key 哈希到多个 slot 但使用非 Cluster pipeline 改用 RedisCluster.pipeline()
READONLY 错误 向 Cluster 从节点发送写命令 检查路由逻辑;只向主节点写

32.2 ioredis(Node.js)

32.2.1 连接配置与重连策略

const Redis = require('ioredis');

const redis = new Redis({
    host: 'localhost',
    port: 6379,
    password: process.env.REDIS_PASSWORD,
    db: 0,
    connectTimeout: 5000,        // TCP连接超时(ms)
    commandTimeout: 3000,        // 单个命令超时(ms)
    maxRetriesPerRequest: 3,     // 每个请求最多重试次数(null=无限)
    enableReadyCheck: true,      // 连接建立后先确认服务器已就绪

    // 指数退避重连策略
    retryStrategy(times) {
        if (times > 10) return null; // 超过10次停止重连,触发 error 事件
        return Math.min(times * 100, 3000); // 等待 100ms, 200ms, ..., 3000ms
    },

    // 连接断开时是否缓存命令(等重连后执行)
    enableOfflineQueue: true,
    offlineQueueMaxLen: 1000,    // 最多缓存1000条命令

    // TLS/SSL
    tls: process.env.REDIS_TLS === 'true' ? {
        rejectUnauthorized: true,
        ca: fs.readFileSync('/path/to/ca.crt'),
    } : undefined,
});

redis.on('connect', () => console.log('Redis connected'));
redis.on('error', err => console.error('Redis error:', err));
redis.on('reconnecting', delay => console.log(`Reconnecting in ${delay}ms`));

32.2.2 Pipeline 与 Multi-Exec

// Pipeline(仅减少 RTT,非原子)
const pipeline = redis.pipeline();
pipeline.set('user:1001:name', 'Alice');
pipeline.set('user:1001:age', '25');
pipeline.expire('user:1001:name', 3600);
pipeline.expire('user:1001:age', 3600);
pipeline.hset('user:1001', 'name', 'Alice', 'age', '25');

const results = await pipeline.exec();
// results: [[null, 'OK'], [null, 'OK'], [null, 1], [null, 1], [null, 2]]
// 每个元素: [error, result]

// MULTI/EXEC(原子性)
const multi = redis.multi();
multi.decrby('account:1001', 100);
multi.incrby('account:1002', 100);
const txResults = await multi.exec();

// 乐观锁(WATCH + MULTI/EXEC)
async function transfer(from, to, amount) {
    const watchResult = await redis.watch(from, to);
    const fromBalance = parseInt(await redis.get(from));
    if (fromBalance < amount) throw new Error('Insufficient funds');

    const result = await redis
        .multi()
        .decrby(from, amount)
        .incrby(to, amount)
        .exec();

    if (result === null) {
        // WATCH 触发(被其他客户端修改),重试
        return transfer(from, to, amount);
    }
    return result;
}

32.2.3 Lua 脚本

// 定义并缓存脚本(EVALSHA)
const unlockScript = redis.defineCommand('unlock', {
    numberOfKeys: 1,
    lua: `
        if redis.call('get', KEYS[1]) == ARGV[1] then
            return redis.call('del', KEYS[1])
        else
            return 0
        end
    `,
});

// 使用自定义命令
const result = await redis.unlock('lock:order:1001', 'token-uuid-abc');
console.log(result); // 1 = 成功释放, 0 = 不是自己的锁

// 原始 eval(不缓存)
const remaining = await redis.eval(
    `
    local ttl = redis.call('pttl', KEYS[1])
    if ttl < 0 then return 0 end
    return ttl
    `,
    1,           // numkeys
    'session:abc' // KEYS[1]
);

32.2.4 Cluster 模式

const cluster = new Redis.Cluster(
    [
        { host: '10.0.0.1', port: 7000 },
        { host: '10.0.0.2', port: 7001 },
        { host: '10.0.0.3', port: 7002 },
    ],
    {
        scaleReads: 'slave',           // 读操作路由到从节点(负载均衡)
        maxRedirections: 16,           // MOVED/ASK 最大重定向次数
        retryDelayOnFailover: 100,     // 故障转移后重试延迟(ms)
        retryDelayOnClusterDown: 100,  // 集群下线时重试延迟

        clusterRetryStrategy(times) {
            return Math.min(100 + times * 2, 2000);
        },

        redisOptions: {
            password: process.env.REDIS_PASSWORD,
            connectTimeout: 5000,
        },
    }
);

// Cluster Pipeline:ioredis 自动按节点分组,并行发送
const pipe = cluster.pipeline();
for (let i = 0; i < 100; i++) {
    pipe.set(`key:${i}`, `val:${i}`);
}
const results = await pipe.exec();

// 强制路由到同一节点:使用 hash tag {}
await cluster.set('{user:1001}:profile', JSON.stringify(profile));
await cluster.set('{user:1001}:sessions', JSON.stringify(sessions));
// 两个 key 因 {user:1001} 相同,必然在同一 slot

32.2.5 发布订阅

// 发布者
const publisher = new Redis({ host: 'localhost' });
await publisher.publish('orders:new', JSON.stringify({ orderId: 1001, amount: 299 }));

// 订阅者(专用连接,不能执行普通命令)
const subscriber = new Redis({ host: 'localhost' });
await subscriber.subscribe('orders:new', 'orders:cancelled');

subscriber.on('message', (channel, message) => {
    const data = JSON.parse(message);
    console.log(`Channel: ${channel}, Order: ${data.orderId}`);
});

// 模式订阅
await subscriber.psubscribe('orders:*');
subscriber.on('pmessage', (pattern, channel, message) => {
    console.log(`Pattern: ${pattern}, Channel: ${channel}`);
});

32.3 go-redis(Go)

32.3.1 泛型 API 与连接池(v9)

go-redis v9 引入了 Cmd 泛型包装,命令返回值类型更安全。连接池基于 go-redis 内置实现(非 sync.Pool),具备完整的 idle 检测与重连逻辑。

package main

import (
    "context"
    "time"

    "github.com/redis/go-redis/v9"
)

func newRedisClient() *redis.Client {
    return redis.NewClient(&redis.Options{
        Addr:     "localhost:6379",
        Password: "secret",
        DB:       0,

        // 连接池
        PoolSize:        20,            // 最大连接数
        MinIdleConns:    5,             // 最小空闲连接
        MaxIdleConns:    10,            // 最大空闲连接
        ConnMaxIdleTime: 5 * time.Minute,  // 空闲超过此时间则销毁
        ConnMaxLifetime: 30 * time.Minute, // 连接最长存活时间(防内存泄漏)

        // 超时
        DialTimeout:  5 * time.Second,
        ReadTimeout:  3 * time.Second,
        WriteTimeout: 3 * time.Second,
        PoolTimeout:  4 * time.Second,  // 等待连接池可用连接的超时

        // 重试
        MaxRetries:      3,
        MinRetryBackoff: 8 * time.Millisecond,
        MaxRetryBackoff: 512 * time.Millisecond,
    })
}

func main() {
    ctx := context.Background()
    rdb := newRedisClient()
    defer rdb.Close()

    // 测试连接
    if err := rdb.Ping(ctx).Err(); err != nil {
        panic(err)
    }

    // 基本操作
    rdb.Set(ctx, "user:1001", "Alice", 1*time.Hour)
    val, err := rdb.Get(ctx, "user:1001").Result()
    if err == redis.Nil {
        // key 不存在
    } else if err != nil {
        // 其他错误
    }
    _ = val
}

32.3.2 Pipeline 与事务

// Pipeline(无 MULTI/EXEC)
pipe := rdb.Pipeline()
setCmd := pipe.Set(ctx, "key1", "val1", time.Hour)
getCmd := pipe.Get(ctx, "key1")
incrCmd := pipe.Incr(ctx, "counter")

_, err := pipe.Exec(ctx)
if err != nil && err != redis.Nil {
    log.Fatal(err)
}
fmt.Println(setCmd.Val()) // "OK"
fmt.Println(getCmd.Val()) // "val1"
fmt.Println(incrCmd.Val()) // 1

// 使用 Pipelined 简化
results, err := rdb.Pipelined(ctx, func(pipe redis.Pipeliner) error {
    for i := 0; i < 100; i++ {
        pipe.Set(ctx, fmt.Sprintf("key:%d", i), fmt.Sprintf("val:%d", i), time.Hour)
    }
    return nil
})
_ = results

// TxPipeline(MULTI/EXEC 包裹,原子性)
_, err = rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
    pipe.DecrBy(ctx, "account:1001", 100)
    pipe.IncrBy(ctx, "account:1002", 100)
    return nil
})

32.3.3 Hook 拦截器(监控与链路追踪)

go-redis v9 提供 redis.Hook 接口,可在命令执行前后注入自定义逻辑,常用于 Prometheus 指标采集和 OpenTelemetry 追踪。

type MetricsHook struct {
    latency *prometheus.HistogramVec
}

func NewMetricsHook() *MetricsHook {
    return &MetricsHook{
        latency: prometheus.NewHistogramVec(prometheus.HistogramOpts{
            Name:    "redis_command_duration_seconds",
            Help:    "Redis command execution duration",
            Buckets: prometheus.ExponentialBuckets(0.0001, 2, 16),
        }, []string{"cmd", "status"}),
    }
}

func (h *MetricsHook) DialHook(next redis.DialHook) redis.DialHook {
    return next
}

func (h *MetricsHook) ProcessHook(next redis.ProcessHook) redis.ProcessHook {
    return func(ctx context.Context, cmd redis.Cmder) error {
        start := time.Now()
        err := next(ctx, cmd)
        status := "ok"
        if err != nil && err != redis.Nil {
            status = "error"
        }
        h.latency.WithLabelValues(cmd.Name(), status).
            Observe(time.Since(start).Seconds())
        return err
    }
}

func (h *MetricsHook) ProcessPipelineHook(next redis.ProcessPipelineHook) redis.ProcessPipelineHook {
    return func(ctx context.Context, cmds []redis.Cmder) error {
        start := time.Now()
        err := next(ctx, cmds)
        h.latency.WithLabelValues("pipeline", "ok").
            Observe(time.Since(start).Seconds())
        return err
    }
}

// 注册 Hook
rdb.AddHook(NewMetricsHook())

32.3.4 Lua 脚本

// 定义脚本(首次执行自动选择 EVALSHA 或 EVAL)
var unlockScript = redis.NewScript(`
    if redis.call("get", KEYS[1]) == ARGV[1] then
        return redis.call("del", KEYS[1])
    else
        return 0
    end
`)

func releaseLock(ctx context.Context, rdb *redis.Client, key, token string) (bool, error) {
    result, err := unlockScript.Run(ctx, rdb, []string{key}, token).Int()
    if err != nil {
        return false, err
    }
    return result == 1, nil
}

// 原子 GETSET 用于缓存更新
var getOrSetScript = redis.NewScript(`
    local val = redis.call("get", KEYS[1])
    if val ~= false then
        return val
    end
    redis.call("set", KEYS[1], ARGV[1])
    redis.call("pexpire", KEYS[1], ARGV[2])
    return ARGV[1]
`)

32.3.5 Cluster 与 Sentinel

// Cluster 客户端
clusterClient := redis.NewClusterClient(&redis.ClusterOptions{
    Addrs:          []string{"10.0.0.1:7000", "10.0.0.2:7001", "10.0.0.3:7002"},
    Password:       "secret",
    ReadOnly:       false,
    RouteByLatency: true,  // 路由读到延迟最低的节点
    RouteRandomly:  false,
    PoolSize:       10,
    MinIdleConns:   2,
})

// Sentinel 客户端(哨兵模式)
sentinelClient := redis.NewFailoverClient(&redis.FailoverOptions{
    MasterName:       "mymaster",
    SentinelAddrs:    []string{"10.0.0.1:26379", "10.0.0.2:26379", "10.0.0.3:26379"},
    SentinelPassword: "sentinel-pass",
    Password:         "redis-pass",
    DB:               0,
    PoolSize:         20,
})

32.3.6 Context 与超时控制

go-redis 所有命令接受 context.Context,可通过 Context 控制单个命令超时:

// 单命令超时
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()

val, err := rdb.Get(ctx, "key").Result()
if errors.Is(err, context.DeadlineExceeded) {
    log.Warn("Redis get timed out")
}

// 传播链路追踪(OpenTelemetry)
ctx = otel.GetTracer("redis").Start(ctx, "redis.get")
val, err = rdb.Get(ctx, "key").Result()

32.4 三客户端横向对比

维度 redis-py ioredis go-redis
连接模型 同步/asyncio 异步(EventLoop) goroutine + 连接池
Cluster Pipeline 自动按 slot 分组 自动按节点分组 自动按节点分组
Lua 脚本缓存 register_script(SHA 复用) defineCommand(SHA 复用) redis.NewScript(SHA 复用)
Hook/中间件 无官方 Hook 自定义 middleware redis.Hook 接口
序列化 手动(json/pickle) 手动(JSON.stringify) 手动(encoding/json)
测试工具 fakeredis ioredis-mock miniredis

32.5 生产调优建议

Python

Node.js

Go

// 定期打印连接池统计
go func() {
    ticker := time.NewTicker(30 * time.Second)
    for range ticker.C {
        stats := rdb.PoolStats()
        log.Infof("Redis pool: total=%d idle=%d stale=%d hits=%d misses=%d timeouts=%d",
            stats.TotalConns, stats.IdleConns, stats.StaleConns,
            stats.Hits, stats.Misses, stats.Timeouts)
    }
}()
本章评分
4.8  / 5  (3 评分)

💬 留言讨论