第 32 章
多语言客户端:redis-py、ioredis 与 go-redis
第32章 多语言客户端:redis-py、ioredis 与 go-redis
不同技术栈的团队在接入 Redis 时有不同的最佳实践。本章聚焦三个主流多语言客户端:Python 的 redis-py(4.x 系统级重构后支持 asyncio)、Node.js 的 ioredis(集群路由自动化)以及 Go 的 go-redis v9(泛型 API + Hook 拦截器)。深入讲解连接池、Pipeline、Cluster、脚本执行及可观测性接入。
32.1 redis-py(Python)
32.1.1 连接模型与连接池
redis-py 的同步客户端基于 Python socket 和线程锁。Redis 对象在多线程场景下通过 ConnectionPool 保证安全:每次执行命令时从池中借一个连接,命令完成后自动归还。
import redis
from redis.retry import Retry
from redis.backoff import ExponentialBackoff
# 基础连接(不推荐直接使用,无连接池)
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
# 生产配置(显式连接池)
pool = redis.ConnectionPool(
host='localhost',
port=6379,
db=0,
password='secret',
max_connections=50, # 最大连接数
socket_connect_timeout=5, # TCP 握手超时(秒)
socket_timeout=3, # 命令读写超时(秒)
socket_keepalive=True, # TCP keepalive(避免运营商断连)
socket_keepalive_options={
'TCP_KEEPIDLE': 30,
'TCP_KEEPINTVL': 10,
'TCP_KEEPCNT': 3,
},
decode_responses=True, # 自动将 bytes decode 为 str
retry_on_timeout=True, # 超时后自动重试一次
retry=Retry(ExponentialBackoff(), retries=3),
health_check_interval=30, # 每30秒检测空闲连接
)
r = redis.Redis(connection_pool=pool)
decode_responses 陷阱:默认 False,所有返回值为 bytes,需要手动 .decode('utf-8')。在混合 bytes/str 的项目中极易出现 TypeError,建议在项目初始化时统一设置 decode_responses=True。
32.1.2 Pipeline
# transaction=False:不使用 MULTI/EXEC,仅减少 RTT
pipe = r.pipeline(transaction=False)
for i in range(1000):
pipe.set(f'key:{i}', f'value:{i}')
pipe.expire(f'key:{i}', 3600)
results = pipe.execute()
# results: ['OK', True, 'OK', True, ...] 每两个对应一个 key 的 set+expire
# transaction=True(默认):MULTI/EXEC 包裹,原子性
with r.pipeline() as pipe:
while True:
try:
pipe.watch('account:1001') # WATCH 键,若被修改则 DISCARD
balance = int(pipe.get('account:1001'))
if balance < 100:
raise InsufficientFundsError()
pipe.multi() # 开始 MULTI
pipe.decrby('account:1001', 100)
pipe.incrby('account:1002', 100)
pipe.execute() # EXEC
break
except redis.WatchError:
continue # 重试乐观锁
32.1.3 Lua 脚本
# 定义脚本(第一次调用自动注册到 Redis,后续用 SHA 调用)
ATOMIC_DEDUCT = r.register_script("""
local balance = tonumber(redis.call('get', KEYS[1]))
local amount = tonumber(ARGV[1])
if balance == nil then
return {err = 'key not found'}
end
if balance < amount then
return {err = 'insufficient balance'}
end
redis.call('decrby', KEYS[1], amount)
return balance - amount
""")
new_balance = ATOMIC_DEDUCT(keys=['account:1001'], args=[100])
32.1.4 asyncio 客户端(redis-py 4.2+)
redis-py 4.2 开始将 aioredis 合并进主库,通过 redis.asyncio 提供原生 asyncio 支持:
import asyncio
import redis.asyncio as aioredis
async def main():
# 创建连接池(asyncio 版)
pool = aioredis.ConnectionPool.from_url(
'redis://localhost:6379',
max_connections=20,
decode_responses=True,
)
r = aioredis.Redis(connection_pool=pool)
# 基本操作
await r.set('key', 'value', ex=3600)
val = await r.get('key')
# 异步 Pipeline
async with r.pipeline(transaction=False) as pipe:
for i in range(100):
pipe.set(f'key:{i}', f'val:{i}')
results = await pipe.execute()
# 发布订阅
pubsub = r.pubsub()
await pubsub.subscribe('channel:news')
async for message in pubsub.listen():
if message['type'] == 'message':
print(f"Received: {message['data']}")
await r.aclose()
asyncio.run(main())
32.1.5 Cluster 模式
from redis.cluster import RedisCluster, ClusterNode
# 只需提供部分启动节点,客户端自动发现其余节点
startup_nodes = [
ClusterNode('10.0.0.1', 7000),
ClusterNode('10.0.0.2', 7001),
]
rc = RedisCluster(
startup_nodes=startup_nodes,
decode_responses=True,
password='secret',
skip_full_coverage_check=True, # 跨机房部署时节点可能不完全在线
)
rc.set('user:1001', 'Alice')
rc.get('user:1001')
# Cluster pipeline:redis-py 自动按 slot 分组
pipe = rc.pipeline()
pipe.set('key1', 'val1') # slot A
pipe.set('key2', 'val2') # slot B(自动路由到不同节点)
results = pipe.execute()
32.1.6 常见坑总结
| 问题 | 原因 | 解决 |
|---|---|---|
ConnectionError: Too many connections |
max_connections 不足或连接未归还 |
调大上限;使用 contextmanager |
ResponseError: WRONGTYPE |
对错误类型执行命令(string vs hash) | 检查 key 类型,清理历史脏数据 |
UnicodeDecodeError |
decode_responses=False 时手动解码错误编码 |
统一设置 decode_responses=True |
| Cluster 下 Pipeline 报错 | key 哈希到多个 slot 但使用非 Cluster pipeline | 改用 RedisCluster.pipeline() |
READONLY 错误 |
向 Cluster 从节点发送写命令 | 检查路由逻辑;只向主节点写 |
32.2 ioredis(Node.js)
32.2.1 连接配置与重连策略
const Redis = require('ioredis');
const redis = new Redis({
host: 'localhost',
port: 6379,
password: process.env.REDIS_PASSWORD,
db: 0,
connectTimeout: 5000, // TCP连接超时(ms)
commandTimeout: 3000, // 单个命令超时(ms)
maxRetriesPerRequest: 3, // 每个请求最多重试次数(null=无限)
enableReadyCheck: true, // 连接建立后先确认服务器已就绪
// 指数退避重连策略
retryStrategy(times) {
if (times > 10) return null; // 超过10次停止重连,触发 error 事件
return Math.min(times * 100, 3000); // 等待 100ms, 200ms, ..., 3000ms
},
// 连接断开时是否缓存命令(等重连后执行)
enableOfflineQueue: true,
offlineQueueMaxLen: 1000, // 最多缓存1000条命令
// TLS/SSL
tls: process.env.REDIS_TLS === 'true' ? {
rejectUnauthorized: true,
ca: fs.readFileSync('/path/to/ca.crt'),
} : undefined,
});
redis.on('connect', () => console.log('Redis connected'));
redis.on('error', err => console.error('Redis error:', err));
redis.on('reconnecting', delay => console.log(`Reconnecting in ${delay}ms`));
32.2.2 Pipeline 与 Multi-Exec
// Pipeline(仅减少 RTT,非原子)
const pipeline = redis.pipeline();
pipeline.set('user:1001:name', 'Alice');
pipeline.set('user:1001:age', '25');
pipeline.expire('user:1001:name', 3600);
pipeline.expire('user:1001:age', 3600);
pipeline.hset('user:1001', 'name', 'Alice', 'age', '25');
const results = await pipeline.exec();
// results: [[null, 'OK'], [null, 'OK'], [null, 1], [null, 1], [null, 2]]
// 每个元素: [error, result]
// MULTI/EXEC(原子性)
const multi = redis.multi();
multi.decrby('account:1001', 100);
multi.incrby('account:1002', 100);
const txResults = await multi.exec();
// 乐观锁(WATCH + MULTI/EXEC)
async function transfer(from, to, amount) {
const watchResult = await redis.watch(from, to);
const fromBalance = parseInt(await redis.get(from));
if (fromBalance < amount) throw new Error('Insufficient funds');
const result = await redis
.multi()
.decrby(from, amount)
.incrby(to, amount)
.exec();
if (result === null) {
// WATCH 触发(被其他客户端修改),重试
return transfer(from, to, amount);
}
return result;
}
32.2.3 Lua 脚本
// 定义并缓存脚本(EVALSHA)
const unlockScript = redis.defineCommand('unlock', {
numberOfKeys: 1,
lua: `
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
else
return 0
end
`,
});
// 使用自定义命令
const result = await redis.unlock('lock:order:1001', 'token-uuid-abc');
console.log(result); // 1 = 成功释放, 0 = 不是自己的锁
// 原始 eval(不缓存)
const remaining = await redis.eval(
`
local ttl = redis.call('pttl', KEYS[1])
if ttl < 0 then return 0 end
return ttl
`,
1, // numkeys
'session:abc' // KEYS[1]
);
32.2.4 Cluster 模式
const cluster = new Redis.Cluster(
[
{ host: '10.0.0.1', port: 7000 },
{ host: '10.0.0.2', port: 7001 },
{ host: '10.0.0.3', port: 7002 },
],
{
scaleReads: 'slave', // 读操作路由到从节点(负载均衡)
maxRedirections: 16, // MOVED/ASK 最大重定向次数
retryDelayOnFailover: 100, // 故障转移后重试延迟(ms)
retryDelayOnClusterDown: 100, // 集群下线时重试延迟
clusterRetryStrategy(times) {
return Math.min(100 + times * 2, 2000);
},
redisOptions: {
password: process.env.REDIS_PASSWORD,
connectTimeout: 5000,
},
}
);
// Cluster Pipeline:ioredis 自动按节点分组,并行发送
const pipe = cluster.pipeline();
for (let i = 0; i < 100; i++) {
pipe.set(`key:${i}`, `val:${i}`);
}
const results = await pipe.exec();
// 强制路由到同一节点:使用 hash tag {}
await cluster.set('{user:1001}:profile', JSON.stringify(profile));
await cluster.set('{user:1001}:sessions', JSON.stringify(sessions));
// 两个 key 因 {user:1001} 相同,必然在同一 slot
32.2.5 发布订阅
// 发布者
const publisher = new Redis({ host: 'localhost' });
await publisher.publish('orders:new', JSON.stringify({ orderId: 1001, amount: 299 }));
// 订阅者(专用连接,不能执行普通命令)
const subscriber = new Redis({ host: 'localhost' });
await subscriber.subscribe('orders:new', 'orders:cancelled');
subscriber.on('message', (channel, message) => {
const data = JSON.parse(message);
console.log(`Channel: ${channel}, Order: ${data.orderId}`);
});
// 模式订阅
await subscriber.psubscribe('orders:*');
subscriber.on('pmessage', (pattern, channel, message) => {
console.log(`Pattern: ${pattern}, Channel: ${channel}`);
});
32.3 go-redis(Go)
32.3.1 泛型 API 与连接池(v9)
go-redis v9 引入了 Cmd 泛型包装,命令返回值类型更安全。连接池基于 go-redis 内置实现(非 sync.Pool),具备完整的 idle 检测与重连逻辑。
package main
import (
"context"
"time"
"github.com/redis/go-redis/v9"
)
func newRedisClient() *redis.Client {
return redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "secret",
DB: 0,
// 连接池
PoolSize: 20, // 最大连接数
MinIdleConns: 5, // 最小空闲连接
MaxIdleConns: 10, // 最大空闲连接
ConnMaxIdleTime: 5 * time.Minute, // 空闲超过此时间则销毁
ConnMaxLifetime: 30 * time.Minute, // 连接最长存活时间(防内存泄漏)
// 超时
DialTimeout: 5 * time.Second,
ReadTimeout: 3 * time.Second,
WriteTimeout: 3 * time.Second,
PoolTimeout: 4 * time.Second, // 等待连接池可用连接的超时
// 重试
MaxRetries: 3,
MinRetryBackoff: 8 * time.Millisecond,
MaxRetryBackoff: 512 * time.Millisecond,
})
}
func main() {
ctx := context.Background()
rdb := newRedisClient()
defer rdb.Close()
// 测试连接
if err := rdb.Ping(ctx).Err(); err != nil {
panic(err)
}
// 基本操作
rdb.Set(ctx, "user:1001", "Alice", 1*time.Hour)
val, err := rdb.Get(ctx, "user:1001").Result()
if err == redis.Nil {
// key 不存在
} else if err != nil {
// 其他错误
}
_ = val
}
32.3.2 Pipeline 与事务
// Pipeline(无 MULTI/EXEC)
pipe := rdb.Pipeline()
setCmd := pipe.Set(ctx, "key1", "val1", time.Hour)
getCmd := pipe.Get(ctx, "key1")
incrCmd := pipe.Incr(ctx, "counter")
_, err := pipe.Exec(ctx)
if err != nil && err != redis.Nil {
log.Fatal(err)
}
fmt.Println(setCmd.Val()) // "OK"
fmt.Println(getCmd.Val()) // "val1"
fmt.Println(incrCmd.Val()) // 1
// 使用 Pipelined 简化
results, err := rdb.Pipelined(ctx, func(pipe redis.Pipeliner) error {
for i := 0; i < 100; i++ {
pipe.Set(ctx, fmt.Sprintf("key:%d", i), fmt.Sprintf("val:%d", i), time.Hour)
}
return nil
})
_ = results
// TxPipeline(MULTI/EXEC 包裹,原子性)
_, err = rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
pipe.DecrBy(ctx, "account:1001", 100)
pipe.IncrBy(ctx, "account:1002", 100)
return nil
})
32.3.3 Hook 拦截器(监控与链路追踪)
go-redis v9 提供 redis.Hook 接口,可在命令执行前后注入自定义逻辑,常用于 Prometheus 指标采集和 OpenTelemetry 追踪。
type MetricsHook struct {
latency *prometheus.HistogramVec
}
func NewMetricsHook() *MetricsHook {
return &MetricsHook{
latency: prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: "redis_command_duration_seconds",
Help: "Redis command execution duration",
Buckets: prometheus.ExponentialBuckets(0.0001, 2, 16),
}, []string{"cmd", "status"}),
}
}
func (h *MetricsHook) DialHook(next redis.DialHook) redis.DialHook {
return next
}
func (h *MetricsHook) ProcessHook(next redis.ProcessHook) redis.ProcessHook {
return func(ctx context.Context, cmd redis.Cmder) error {
start := time.Now()
err := next(ctx, cmd)
status := "ok"
if err != nil && err != redis.Nil {
status = "error"
}
h.latency.WithLabelValues(cmd.Name(), status).
Observe(time.Since(start).Seconds())
return err
}
}
func (h *MetricsHook) ProcessPipelineHook(next redis.ProcessPipelineHook) redis.ProcessPipelineHook {
return func(ctx context.Context, cmds []redis.Cmder) error {
start := time.Now()
err := next(ctx, cmds)
h.latency.WithLabelValues("pipeline", "ok").
Observe(time.Since(start).Seconds())
return err
}
}
// 注册 Hook
rdb.AddHook(NewMetricsHook())
32.3.4 Lua 脚本
// 定义脚本(首次执行自动选择 EVALSHA 或 EVAL)
var unlockScript = redis.NewScript(`
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
`)
func releaseLock(ctx context.Context, rdb *redis.Client, key, token string) (bool, error) {
result, err := unlockScript.Run(ctx, rdb, []string{key}, token).Int()
if err != nil {
return false, err
}
return result == 1, nil
}
// 原子 GETSET 用于缓存更新
var getOrSetScript = redis.NewScript(`
local val = redis.call("get", KEYS[1])
if val ~= false then
return val
end
redis.call("set", KEYS[1], ARGV[1])
redis.call("pexpire", KEYS[1], ARGV[2])
return ARGV[1]
`)
32.3.5 Cluster 与 Sentinel
// Cluster 客户端
clusterClient := redis.NewClusterClient(&redis.ClusterOptions{
Addrs: []string{"10.0.0.1:7000", "10.0.0.2:7001", "10.0.0.3:7002"},
Password: "secret",
ReadOnly: false,
RouteByLatency: true, // 路由读到延迟最低的节点
RouteRandomly: false,
PoolSize: 10,
MinIdleConns: 2,
})
// Sentinel 客户端(哨兵模式)
sentinelClient := redis.NewFailoverClient(&redis.FailoverOptions{
MasterName: "mymaster",
SentinelAddrs: []string{"10.0.0.1:26379", "10.0.0.2:26379", "10.0.0.3:26379"},
SentinelPassword: "sentinel-pass",
Password: "redis-pass",
DB: 0,
PoolSize: 20,
})
32.3.6 Context 与超时控制
go-redis 所有命令接受 context.Context,可通过 Context 控制单个命令超时:
// 单命令超时
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()
val, err := rdb.Get(ctx, "key").Result()
if errors.Is(err, context.DeadlineExceeded) {
log.Warn("Redis get timed out")
}
// 传播链路追踪(OpenTelemetry)
ctx = otel.GetTracer("redis").Start(ctx, "redis.get")
val, err = rdb.Get(ctx, "key").Result()
32.4 三客户端横向对比
| 维度 | redis-py | ioredis | go-redis |
|---|---|---|---|
| 连接模型 | 同步/asyncio | 异步(EventLoop) | goroutine + 连接池 |
| Cluster Pipeline | 自动按 slot 分组 | 自动按节点分组 | 自动按节点分组 |
| Lua 脚本缓存 | register_script(SHA 复用) |
defineCommand(SHA 复用) |
redis.NewScript(SHA 复用) |
| Hook/中间件 | 无官方 Hook | 自定义 middleware | redis.Hook 接口 |
| 序列化 | 手动(json/pickle) | 手动(JSON.stringify) | 手动(encoding/json) |
| 测试工具 | fakeredis | ioredis-mock | miniredis |
32.5 生产调优建议
Python:
- 使用
redis.asyncio配合 FastAPI/aiohttp,避免阻塞事件循环 - 连接池
max_connections设为 CPU核数 × 2,asyncio 场景通常更小 - 生产中禁止使用
r.delete(*r.keys('prefix:*'))(KEYS 阻塞 + 大量删除),改用 SCAN 分批
Node.js:
- ioredis 的
lazyConnect: true延迟到第一次命令时建立连接,适合 Lambda 等冷启动场景 - 订阅者连接不能执行普通命令,需要单独创建
subscriber = redis.duplicate() - 在 Serverless 环境中注意连接复用,避免每次函数调用创建新连接
Go:
- go-redis 连接池是 goroutine 安全的,一个
*redis.Client全局共享 - 使用
rdb.Conn()获取独占连接(如需SUBSCRIBE、BLPOP等阻塞命令) - 善用
PoolStats()监控池状态:HitRate = Hits / (Hits + Misses)应高于 95%
// 定期打印连接池统计
go func() {
ticker := time.NewTicker(30 * time.Second)
for range ticker.C {
stats := rdb.PoolStats()
log.Infof("Redis pool: total=%d idle=%d stale=%d hits=%d misses=%d timeouts=%d",
stats.TotalConns, stats.IdleConns, stats.StaleConns,
stats.Hits, stats.Misses, stats.Timeouts)
}
}()