第 37 章
Pipeline、批量操作与性能优化
第37章 Pipeline、批量操作与性能优化
每一次 Redis 命令都经历:客户端发送 → 网络传输 → 服务端处理 → 网络返回 → 客户端接收。RTT(Round-Trip Time)往往是性能瓶颈,而非 Redis 本身的处理速度。本章系统讲解如何通过 Pipeline、批量命令和工具手段榨取极致性能。
1. Pipeline 原理与 RTT 节省
1.1 单命令模型的问题
假设网络 RTT = 1ms,Redis 处理速度 = 100,000 命令/秒:
单命令模式:
CMD1: 发送(0.5ms) + 处理(0.01ms) + 接收(0.5ms) ≈ 1.01ms/命令
→ 实际吞吐上限 ≈ 990 命令/秒(被 RTT 卡死)
Pipeline 模式(批次=100):
发送100条(0.5ms) + 处理100×0.01ms(1ms) + 接收100条(0.5ms) ≈ 2ms/100命令
→ 实际吞吐 ≈ 50,000 命令/秒(提升 50 倍)
1.2 Pipeline 使用
import redis
r = redis.Redis()
# 错误方式:逐条发送(N次RTT)
for i in range(1000):
r.set(f"key:{i}", f"value:{i}")
# 正确方式:Pipeline(1次RTT内批量发送)
pipe = r.pipeline(transaction=False) # transaction=False 不加 MULTI/EXEC
for i in range(1000):
pipe.set(f"key:{i}", f"value:{i}")
results = pipe.execute() # 此时才真正发送
# 带事务的 Pipeline(自动加 MULTI/EXEC)
with r.pipeline() as pipe:
pipe.set("balance:1001", 100)
pipe.incr("tx_count")
pipe.execute()
// Java (Jedis)
try (Jedis jedis = pool.getResource()) {
Pipeline pipe = jedis.pipelined();
for (int i = 0; i < 10000; i++) {
pipe.set("key:" + i, "value:" + i);
}
pipe.sync(); // 发送并等待所有结果
}
// Java (Lettuce,异步)
RedisAsyncCommands<String, String> async = connection.async();
List<RedisFuture<String>> futures = new ArrayList<>();
for (int i = 0; i < 10000; i++) {
futures.add(async.set("key:" + i, "value:" + i));
}
LettuceFutures.awaitAll(5, TimeUnit.SECONDS, futures.toArray(new RedisFuture[0]));
// Go (go-redis)
pipe := rdb.Pipeline()
for i := 0; i < 1000; i++ {
pipe.Set(ctx, fmt.Sprintf("key:%d", i), fmt.Sprintf("value:%d", i), 0)
}
_, err := pipe.Exec(ctx)
1.3 Pipeline 注意事项
- Pipeline 不是原子的:中间命令失败不回滚,其他命令继续执行
- Pipeline 命令在服务端排队执行,后面的命令不能依赖前面的返回值
- Pipeline 内不能使用
WATCH(需要用事务模式) - 批次大小建议 100–1000,过大会占用客户端内存和服务端输出缓冲
2. MSET / MGET vs Pipeline
# MSET:单命令原子设置多个 key-value
MSET key1 val1 key2 val2 key3 val3 # O(N),N为键值对数
# MGET:单命令批量获取
MGET key1 key2 key3 # 返回值列表,不存在的返回 nil
# MSETNX:仅当所有 key 都不存在时才设置(全部成功或全部失败)
MSETNX key1 val1 key2 val2 # 原子操作
2.1 与 Pipeline 的区别
| 维度 | MSET/MGET | Pipeline |
|---|---|---|
| 原子性 | 是(单命令) | 否 |
| 命令类型 | 仅 SET/GET | 任意命令组合 |
| Cluster 支持 | 受限(所有 key 必须在同一 slot) | 受限(需手动按 slot 分组) |
| 返回值顺序 | 有序对应 | 有序对应 |
2.2 Cluster 下的批量操作
# Cluster 模式下 MSET 跨 slot 会报错
# 解决方案1:使用 hash tag 强制同一 slot
r.mset({"{user}:1001": "Alice", "{user}:1002": "Bob"}) # {} 内容决定 slot
# 解决方案2:按 slot 分组发送 Pipeline
from redis.cluster import RedisCluster
from redis.cluster import ClusterNode
rc = RedisCluster(startup_nodes=[ClusterNode("localhost", 7000)])
def mset_cluster(mapping: dict):
"""Cluster 下安全的批量 SET"""
pipe = rc.pipeline()
for key, value in mapping.items():
pipe.set(key, value)
return pipe.execute()
3. SCAN 替代 KEYS
3.1 为什么禁用 KEYS
KEYS * # 扫描所有 key,O(N),N=10万时 ≈ 10ms,N=100万时 ≈ 100ms
# 单线程模型:执行期间阻塞所有其他命令!
生产事故场景:DBA 在有 500 万 key 的实例上执行 KEYS *,导致服务阻塞 5 秒,QPS 跌零,超时报警全部触发。
3.2 SCAN 用法详解
# SCAN cursor [MATCH pattern] [COUNT count] [TYPE type]
SCAN 0 MATCH user:* COUNT 100
# 返回:[新游标, [key1, key2, ...]]
# 游标 = 0 表示遍历完成(完整游历)
# 示例:完整遍历
SCAN 0 MATCH cache:* COUNT 200
→ [12345, ["cache:a", "cache:b", ...]] # 继续
SCAN 12345 MATCH cache:* COUNT 200
→ [67890, ["cache:c", ...]] # 继续
SCAN 67890 MATCH cache:* COUNT 200
→ [0, ["cache:z"]] # 游标归零,完成
COUNT 是提示,非精确值:Redis 按 COUNT 为参考扫描一批 bucket,实际返回数量可能多可少,取决于 key 分布。
3.3 Python 封装
def scan_all_keys(pattern: str = "*", count: int = 100):
"""安全的全量 key 扫描,生成器模式"""
cursor = 0
while True:
cursor, keys = r.scan(cursor, match=pattern, count=count)
for key in keys:
yield key.decode()
if cursor == 0:
break
def delete_by_pattern(pattern: str, batch_size: int = 500):
"""批量删除匹配 key(生产可用)"""
keys_to_delete = []
for key in scan_all_keys(pattern):
keys_to_delete.append(key)
if len(keys_to_delete) >= batch_size:
r.delete(*keys_to_delete)
keys_to_delete = []
if keys_to_delete:
r.delete(*keys_to_delete)
3.4 HSCAN / SSCAN / ZSCAN
# HSCAN:遍历 Hash 的 field-value 对
HSCAN myhash 0 MATCH field:* COUNT 50
# 返回:[游标, [field1, value1, field2, value2, ...]]
# SSCAN:遍历 Set 成员
SSCAN myset 0 COUNT 100
# ZSCAN:遍历 ZSet(成员 + 分数)
ZSCAN myzset 0 MATCH user:* COUNT 100
# 返回:[游标, [member1, score1, member2, score2, ...]]
def scan_hash_fields(hash_key: str, pattern: str = "*"):
"""分批遍历大 Hash 的所有 field"""
cursor = 0
while True:
cursor, data = r.hscan(hash_key, cursor, match=pattern, count=200)
for field, value in data.items():
yield field.decode(), value.decode()
if cursor == 0:
break
4. 批量写入最佳实践
4.1 小批量(< 1000 条):Pipeline
见上文 Pipeline 章节,推荐批次 100–500。
4.2 大批量(百万条):redis-cli --pipe(Mass Insertion)
Redis 协议(RESP)格式预生成,通过管道一次性灌入:
# 生成 RESP 格式数据
python3 << 'EOF'
import sys
def gen_resp(*args):
cmd = f"*{len(args)}\r\n"
for arg in args:
arg = str(arg)
cmd += f"${len(arg)}\r\n{arg}\r\n"
return cmd
for i in range(1_000_000):
sys.stdout.write(gen_resp("SET", f"key:{i}", f"value:{i}"))
EOF | redis-cli --pipe
# 输出示例:
# All data transferred. Waiting for the last reply...
# Last reply received from server.
# errors: 0, replies: 1000000
# 或使用 awk 直接生成(更快)
awk 'BEGIN{
for(i=1; i<=1000000; i++){
printf "*3\r\n$3\r\nSET\r\n$%d\r\nkey:%d\r\n$%d\r\nvalue:%d\r\n",
length("key:"i), i, length("value:"i), i
}
}' | redis-cli --pipe
4.3 Cluster 大批量:按 slot 分组
from redis.cluster import RedisCluster, ClusterNode
from redis.crc import key_slot
def bulk_insert_cluster(data: dict, batch_size: int = 500):
"""Cluster 下按 slot 分组批量插入"""
rc = RedisCluster(startup_nodes=[ClusterNode("localhost", 7000)])
# 按 slot 分组
slot_groups = {}
for key, value in data.items():
slot = key_slot(key)
slot_groups.setdefault(slot, {})[key] = value
# 每个 slot 组单独 Pipeline
for slot, group in slot_groups.items():
pipe = rc.pipeline()
for key, value in group.items():
pipe.set(key, value)
pipe.execute()
5. OBJECT ENCODING 内存优化
Redis 对不同大小的数据使用不同内部编码,影响内存和速度:
OBJECT ENCODING mykey
# String 编码
SET num 12345 → int(整数直接存储,极省内存)
SET short "hello" → embstr(≤44字节,连续内存)
SET long "..." → raw(>44字节,SDS动态字符串)
# Hash 编码(listpack vs hashtable)
HSET small f1 v1 f2 v2 → listpack(field数≤128且每个value≤64字节)
# 超过阈值自动转为 hashtable
# ZSet 编码
ZADD small 1 a 2 b → listpack(元素数≤128且value≤64字节)
# 超过阈值转 skiplist
# Set 编码
SADD intset 1 2 3 → intset(全整数且元素≤512)
SADD mixed 1 a 2 → listpack 或 hashtable
# List 编码
LPUSH mylist a b c → listpack(元素数≤128且每个≤64字节)
# 超过阈值转 quicklist
# 查看编码阈值配置
CONFIG GET hash-max-listpack-entries # 默认 128
CONFIG GET hash-max-listpack-value # 默认 64
CONFIG GET zset-max-listpack-entries # 默认 128
CONFIG GET zset-max-listpack-value # 默认 64
CONFIG GET set-max-intset-entries # 默认 512
CONFIG GET list-max-listpack-size # 默认 128
内存优化技巧:
- 保持 Hash/ZSet/Set 的元素数量在阈值内,享受 listpack 紧凑存储
- 大量小 Hash 可以存储用户数据代替 String(减少 key 数量和元数据开销)
6. redis-benchmark 方法论
6.1 基础用法
# 基础测试:50并发,10万请求,测试 SET 和 GET
redis-benchmark -h 127.0.0.1 -p 6379 -c 50 -n 100000 -t set,get
# Pipeline 测试(每批16条)
redis-benchmark -c 50 -n 100000 -t set -P 16
# 指定 value 大小(默认3字节,生产应与实际一致)
redis-benchmark -c 50 -n 100000 -t set -d 512
# 输出 CSV(便于对比分析)
redis-benchmark -c 50 -n 100000 -t set,get --csv
# Cluster 模式
redis-benchmark --cluster -c 50 -n 100000 -t set,get
# 测试自定义命令
redis-benchmark -c 50 -n 100000 \
eval "return redis.call('set', KEYS[1], ARGV[1])" 1 mykey myvalue
6.2 结果解读
====== SET ======
100000 requests completed in 1.23 seconds
50 parallel clients
3 bytes payload
keep alive: 1
99.00% <= 1 milliseconds ← P99 延迟
99.90% <= 2 milliseconds ← P999 延迟
100.00% <= 3 milliseconds ← 最大延迟
81300.81 requests per second ← QPS
6.3 常见测试误区
| 误区 | 正确做法 |
|---|---|
| value 使用默认 3 字节 | 与生产 value 大小一致(通常 100B–10KB) |
| 并发数设 1 | 与生产客户端并发数对齐 |
| 在 Redis 服务器本机测试 | 从客户端机器测试(网络 RTT 是关键变量) |
| 只看平均值 | 关注 P99/P999 延迟 |
| 不预热就测试 | 先跑 1 万请求预热,再开始计时 |
| 忽略 CPU 使用率 | 同时监控 redis-server CPU,避免 CPU 成为瓶颈 |
6.4 Keyspace notifications 对基准测试的影响
# 如果开启了 keyspace notifications,会显著降低性能
# 基准测试前检查并临时关闭
CONFIG GET notify-keyspace-events
CONFIG SET notify-keyspace-events "" # 关闭
7. 连接池优化
# Python (redis-py) 连接池配置
pool = redis.ConnectionPool(
host="localhost",
port=6379,
max_connections=50, # 最大连接数,与并发线程数匹配
socket_timeout=1.0, # 读超时(秒)
socket_connect_timeout=1.0, # 连接超时
retry_on_timeout=True,
health_check_interval=30, # 定期检测连接健康
)
r = redis.Redis(connection_pool=pool)
// Java (Jedis Pool)
JedisPoolConfig config = new JedisPoolConfig();
config.setMaxTotal(50); // 最大连接数
config.setMaxIdle(20); // 最大空闲连接
config.setMinIdle(5); // 最小空闲连接
config.setMaxWaitMillis(1000); // 获取连接最大等待时间
config.setTestOnBorrow(false); // 借用时不测试(高并发下禁用)
config.setTestWhileIdle(true); // 空闲时定期测试
JedisPool pool = new JedisPool(config, "localhost", 6379, 2000, "password");
8. 性能问题排查流程
1. 确认慢在哪里
redis-cli --latency-history -i 1 # 实时延迟监控
redis-cli --stat # 实时统计(QPS/内存/连接数)
2. 查看慢查询日志
SLOWLOG GET 10 # 最近10条慢命令
SLOWLOG RESET
3. 确认是否有大 key
redis-cli --bigkeys # 扫描大 key
4. 确认热 key
redis-cli --hotkeys # 需要 LFU 淘汰策略
5. 查看客户端连接
CLIENT LIST # 所有客户端详情
INFO clients # 连接数统计
6. 查看内存碎片率
INFO memory | grep mem_fragmentation_ratio
# > 1.5 建议重启或 MEMORY PURGE(Redis 4.0+)
本章总结
- Pipeline 是解决 RTT 瓶颈的首选手段,批次 100–500 最优
- MSET/MGET 在非 Cluster 场景下简单原子,Cluster 下注意 slot 限制
- 生产环境严禁
KEYS *,统一改用SCAN - 百万级批量写入用
redis-cli --pipe(Mass Insertion 协议) OBJECT ENCODING指导 listpack→hashtable 阈值调优- benchmark 测试要模拟真实 value 大小、并发数和网络 RTT