第 37 章

Pipeline、批量操作与性能优化

第37章 Pipeline、批量操作与性能优化

每一次 Redis 命令都经历:客户端发送 → 网络传输 → 服务端处理 → 网络返回 → 客户端接收。RTT(Round-Trip Time)往往是性能瓶颈,而非 Redis 本身的处理速度。本章系统讲解如何通过 Pipeline、批量命令和工具手段榨取极致性能。


1. Pipeline 原理与 RTT 节省

1.1 单命令模型的问题

假设网络 RTT = 1ms,Redis 处理速度 = 100,000 命令/秒:

单命令模式:
  CMD1: 发送(0.5ms) + 处理(0.01ms) + 接收(0.5ms) ≈ 1.01ms/命令
  → 实际吞吐上限 ≈ 990 命令/秒(被 RTT 卡死)

Pipeline 模式(批次=100):
  发送100条(0.5ms) + 处理100×0.01ms(1ms) + 接收100条(0.5ms) ≈ 2ms/100命令
  → 实际吞吐 ≈ 50,000 命令/秒(提升 50 倍)

1.2 Pipeline 使用

import redis

r = redis.Redis()

# 错误方式:逐条发送(N次RTT)
for i in range(1000):
    r.set(f"key:{i}", f"value:{i}")

# 正确方式:Pipeline(1次RTT内批量发送)
pipe = r.pipeline(transaction=False)  # transaction=False 不加 MULTI/EXEC
for i in range(1000):
    pipe.set(f"key:{i}", f"value:{i}")
results = pipe.execute()  # 此时才真正发送

# 带事务的 Pipeline(自动加 MULTI/EXEC)
with r.pipeline() as pipe:
    pipe.set("balance:1001", 100)
    pipe.incr("tx_count")
    pipe.execute()
// Java (Jedis)
try (Jedis jedis = pool.getResource()) {
    Pipeline pipe = jedis.pipelined();
    for (int i = 0; i < 10000; i++) {
        pipe.set("key:" + i, "value:" + i);
    }
    pipe.sync();  // 发送并等待所有结果
}

// Java (Lettuce,异步)
RedisAsyncCommands<String, String> async = connection.async();
List<RedisFuture<String>> futures = new ArrayList<>();
for (int i = 0; i < 10000; i++) {
    futures.add(async.set("key:" + i, "value:" + i));
}
LettuceFutures.awaitAll(5, TimeUnit.SECONDS, futures.toArray(new RedisFuture[0]));
// Go (go-redis)
pipe := rdb.Pipeline()
for i := 0; i < 1000; i++ {
    pipe.Set(ctx, fmt.Sprintf("key:%d", i), fmt.Sprintf("value:%d", i), 0)
}
_, err := pipe.Exec(ctx)

1.3 Pipeline 注意事项


2. MSET / MGET vs Pipeline

# MSET:单命令原子设置多个 key-value
MSET key1 val1 key2 val2 key3 val3   # O(N),N为键值对数

# MGET:单命令批量获取
MGET key1 key2 key3                   # 返回值列表,不存在的返回 nil

# MSETNX:仅当所有 key 都不存在时才设置(全部成功或全部失败)
MSETNX key1 val1 key2 val2            # 原子操作

2.1 与 Pipeline 的区别

维度 MSET/MGET Pipeline
原子性 是(单命令)
命令类型 仅 SET/GET 任意命令组合
Cluster 支持 受限(所有 key 必须在同一 slot) 受限(需手动按 slot 分组)
返回值顺序 有序对应 有序对应

2.2 Cluster 下的批量操作

# Cluster 模式下 MSET 跨 slot 会报错
# 解决方案1:使用 hash tag 强制同一 slot
r.mset({"{user}:1001": "Alice", "{user}:1002": "Bob"})  # {} 内容决定 slot

# 解决方案2:按 slot 分组发送 Pipeline
from redis.cluster import RedisCluster
from redis.cluster import ClusterNode

rc = RedisCluster(startup_nodes=[ClusterNode("localhost", 7000)])

def mset_cluster(mapping: dict):
    """Cluster 下安全的批量 SET"""
    pipe = rc.pipeline()
    for key, value in mapping.items():
        pipe.set(key, value)
    return pipe.execute()

3. SCAN 替代 KEYS

3.1 为什么禁用 KEYS

KEYS *           # 扫描所有 key,O(N),N=10万时 ≈ 10ms,N=100万时 ≈ 100ms
                 # 单线程模型:执行期间阻塞所有其他命令!

生产事故场景:DBA 在有 500 万 key 的实例上执行 KEYS *,导致服务阻塞 5 秒,QPS 跌零,超时报警全部触发。

3.2 SCAN 用法详解

# SCAN cursor [MATCH pattern] [COUNT count] [TYPE type]
SCAN 0 MATCH user:* COUNT 100
# 返回:[新游标, [key1, key2, ...]]
# 游标 = 0 表示遍历完成(完整游历)

# 示例:完整遍历
SCAN 0 MATCH cache:* COUNT 200
→ [12345, ["cache:a", "cache:b", ...]]  # 继续
SCAN 12345 MATCH cache:* COUNT 200
→ [67890, ["cache:c", ...]]              # 继续
SCAN 67890 MATCH cache:* COUNT 200
→ [0, ["cache:z"]]                       # 游标归零,完成

COUNT 是提示,非精确值:Redis 按 COUNT 为参考扫描一批 bucket,实际返回数量可能多可少,取决于 key 分布。

3.3 Python 封装

def scan_all_keys(pattern: str = "*", count: int = 100):
    """安全的全量 key 扫描,生成器模式"""
    cursor = 0
    while True:
        cursor, keys = r.scan(cursor, match=pattern, count=count)
        for key in keys:
            yield key.decode()
        if cursor == 0:
            break

def delete_by_pattern(pattern: str, batch_size: int = 500):
    """批量删除匹配 key(生产可用)"""
    keys_to_delete = []
    for key in scan_all_keys(pattern):
        keys_to_delete.append(key)
        if len(keys_to_delete) >= batch_size:
            r.delete(*keys_to_delete)
            keys_to_delete = []
    if keys_to_delete:
        r.delete(*keys_to_delete)

3.4 HSCAN / SSCAN / ZSCAN

# HSCAN:遍历 Hash 的 field-value 对
HSCAN myhash 0 MATCH field:* COUNT 50
# 返回:[游标, [field1, value1, field2, value2, ...]]

# SSCAN:遍历 Set 成员
SSCAN myset 0 COUNT 100

# ZSCAN:遍历 ZSet(成员 + 分数)
ZSCAN myzset 0 MATCH user:* COUNT 100
# 返回:[游标, [member1, score1, member2, score2, ...]]
def scan_hash_fields(hash_key: str, pattern: str = "*"):
    """分批遍历大 Hash 的所有 field"""
    cursor = 0
    while True:
        cursor, data = r.hscan(hash_key, cursor, match=pattern, count=200)
        for field, value in data.items():
            yield field.decode(), value.decode()
        if cursor == 0:
            break

4. 批量写入最佳实践

4.1 小批量(< 1000 条):Pipeline

见上文 Pipeline 章节,推荐批次 100–500。

4.2 大批量(百万条):redis-cli --pipe(Mass Insertion)

Redis 协议(RESP)格式预生成,通过管道一次性灌入:

# 生成 RESP 格式数据
python3 << 'EOF'
import sys

def gen_resp(*args):
    cmd = f"*{len(args)}\r\n"
    for arg in args:
        arg = str(arg)
        cmd += f"${len(arg)}\r\n{arg}\r\n"
    return cmd

for i in range(1_000_000):
    sys.stdout.write(gen_resp("SET", f"key:{i}", f"value:{i}"))
EOF | redis-cli --pipe

# 输出示例:
# All data transferred. Waiting for the last reply...
# Last reply received from server.
# errors: 0, replies: 1000000
# 或使用 awk 直接生成(更快)
awk 'BEGIN{
    for(i=1; i<=1000000; i++){
        printf "*3\r\n$3\r\nSET\r\n$%d\r\nkey:%d\r\n$%d\r\nvalue:%d\r\n",
               length("key:"i), i, length("value:"i), i
    }
}' | redis-cli --pipe

4.3 Cluster 大批量:按 slot 分组

from redis.cluster import RedisCluster, ClusterNode
from redis.crc import key_slot

def bulk_insert_cluster(data: dict, batch_size: int = 500):
    """Cluster 下按 slot 分组批量插入"""
    rc = RedisCluster(startup_nodes=[ClusterNode("localhost", 7000)])
    
    # 按 slot 分组
    slot_groups = {}
    for key, value in data.items():
        slot = key_slot(key)
        slot_groups.setdefault(slot, {})[key] = value
    
    # 每个 slot 组单独 Pipeline
    for slot, group in slot_groups.items():
        pipe = rc.pipeline()
        for key, value in group.items():
            pipe.set(key, value)
        pipe.execute()

5. OBJECT ENCODING 内存优化

Redis 对不同大小的数据使用不同内部编码,影响内存和速度:

OBJECT ENCODING mykey

# String 编码
SET num 12345       → int(整数直接存储,极省内存)
SET short "hello"   → embstr(≤44字节,连续内存)
SET long "..."      → raw(>44字节,SDS动态字符串)

# Hash 编码(listpack vs hashtable)
HSET small f1 v1 f2 v2  → listpack(field数≤128且每个value≤64字节)
# 超过阈值自动转为 hashtable

# ZSet 编码
ZADD small 1 a 2 b      → listpack(元素数≤128且value≤64字节)
# 超过阈值转 skiplist

# Set 编码
SADD intset 1 2 3       → intset(全整数且元素≤512)
SADD mixed 1 a 2        → listpack 或 hashtable

# List 编码
LPUSH mylist a b c      → listpack(元素数≤128且每个≤64字节)
# 超过阈值转 quicklist
# 查看编码阈值配置
CONFIG GET hash-max-listpack-entries   # 默认 128
CONFIG GET hash-max-listpack-value     # 默认 64
CONFIG GET zset-max-listpack-entries   # 默认 128
CONFIG GET zset-max-listpack-value     # 默认 64
CONFIG GET set-max-intset-entries      # 默认 512
CONFIG GET list-max-listpack-size      # 默认 128

内存优化技巧


6. redis-benchmark 方法论

6.1 基础用法

# 基础测试:50并发,10万请求,测试 SET 和 GET
redis-benchmark -h 127.0.0.1 -p 6379 -c 50 -n 100000 -t set,get

# Pipeline 测试(每批16条)
redis-benchmark -c 50 -n 100000 -t set -P 16

# 指定 value 大小(默认3字节,生产应与实际一致)
redis-benchmark -c 50 -n 100000 -t set -d 512

# 输出 CSV(便于对比分析)
redis-benchmark -c 50 -n 100000 -t set,get --csv

# Cluster 模式
redis-benchmark --cluster -c 50 -n 100000 -t set,get

# 测试自定义命令
redis-benchmark -c 50 -n 100000 \
    eval "return redis.call('set', KEYS[1], ARGV[1])" 1 mykey myvalue

6.2 结果解读

====== SET ======
  100000 requests completed in 1.23 seconds
  50 parallel clients
  3 bytes payload
  keep alive: 1

99.00% <= 1 milliseconds       ← P99 延迟
99.90% <= 2 milliseconds       ← P999 延迟
100.00% <= 3 milliseconds      ← 最大延迟

81300.81 requests per second   ← QPS

6.3 常见测试误区

误区 正确做法
value 使用默认 3 字节 与生产 value 大小一致(通常 100B–10KB)
并发数设 1 与生产客户端并发数对齐
在 Redis 服务器本机测试 从客户端机器测试(网络 RTT 是关键变量)
只看平均值 关注 P99/P999 延迟
不预热就测试 先跑 1 万请求预热,再开始计时
忽略 CPU 使用率 同时监控 redis-server CPU,避免 CPU 成为瓶颈

6.4 Keyspace notifications 对基准测试的影响

# 如果开启了 keyspace notifications,会显著降低性能
# 基准测试前检查并临时关闭
CONFIG GET notify-keyspace-events
CONFIG SET notify-keyspace-events ""   # 关闭

7. 连接池优化

# Python (redis-py) 连接池配置
pool = redis.ConnectionPool(
    host="localhost",
    port=6379,
    max_connections=50,        # 最大连接数,与并发线程数匹配
    socket_timeout=1.0,        # 读超时(秒)
    socket_connect_timeout=1.0, # 连接超时
    retry_on_timeout=True,
    health_check_interval=30,  # 定期检测连接健康
)
r = redis.Redis(connection_pool=pool)
// Java (Jedis Pool)
JedisPoolConfig config = new JedisPoolConfig();
config.setMaxTotal(50);               // 最大连接数
config.setMaxIdle(20);                // 最大空闲连接
config.setMinIdle(5);                 // 最小空闲连接
config.setMaxWaitMillis(1000);        // 获取连接最大等待时间
config.setTestOnBorrow(false);        // 借用时不测试(高并发下禁用)
config.setTestWhileIdle(true);        // 空闲时定期测试

JedisPool pool = new JedisPool(config, "localhost", 6379, 2000, "password");

8. 性能问题排查流程

1. 确认慢在哪里
   redis-cli --latency-history -i 1   # 实时延迟监控
   redis-cli --stat                    # 实时统计(QPS/内存/连接数)

2. 查看慢查询日志
   SLOWLOG GET 10                      # 最近10条慢命令
   SLOWLOG RESET

3. 确认是否有大 key
   redis-cli --bigkeys                 # 扫描大 key

4. 确认热 key
   redis-cli --hotkeys                 # 需要 LFU 淘汰策略

5. 查看客户端连接
   CLIENT LIST                         # 所有客户端详情
   INFO clients                        # 连接数统计

6. 查看内存碎片率
   INFO memory | grep mem_fragmentation_ratio
   # > 1.5 建议重启或 MEMORY PURGE(Redis 4.0+)

本章总结

本章评分
4.7  / 5  (3 评分)

💬 留言讨论