Chapter 37: Pipeline, Bulk Operations, and Performance Optimization
Every Redis command travels through a full round-trip: client sends → network → server processes → network → client receives. For clients that reach Redis over a network, the round-trip time (RTT) usually dominates per-command latency, not Redis's processing speed. This chapter covers Pipeline, bulk commands, SCAN patterns, mass insertion, and benchmark methodology for getting the most throughput out of a Redis deployment.
1. Pipeline: Principle and RTT Savings
1.1 The Problem with One-at-a-Time Commands
Assume network RTT = 1 ms, Redis processing speed = 100,000 commands/sec:
Single-command model:
CMD: send (0.5 ms) + process (0.01 ms) + receive (0.5 ms) ≈ 1.01 ms per command
→ Effective throughput ceiling: ~990 commands/sec (RTT-bound, not CPU-bound)
Pipeline model (batch size = 100):
Send 100 commands (0.5 ms) + process 100×0.01 ms (1 ms) + receive (0.5 ms) ≈ 2 ms / 100 commands
→ Effective throughput: ~50,000 commands/sec (50× improvement)
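The arithmetic above generalizes to any batch size. The following is a minimal sketch, assuming the same simplified model (one full RTT per batch plus a fixed per-command processing cost); the function name `effective_throughput` is illustrative and not part of any library.

```python
def effective_throughput(rtt_ms: float, per_cmd_ms: float, batch_size: int = 1) -> float:
    """Commands per second when `batch_size` commands share one round-trip."""
    # One RTT is paid per batch; processing cost is paid per command.
    batch_time_ms = rtt_ms + batch_size * per_cmd_ms
    return batch_size / batch_time_ms * 1000.0


if __name__ == "__main__":
    # Same assumptions as the text: RTT = 1 ms, 0.01 ms of processing per command.
    for n in (1, 10, 100, 1000):
        print(f"batch={n:>4}: {effective_throughput(1.0, 0.01, n):,.0f} cmds/sec")
```

Under this model the gains flatten as batches grow: going from 100 to 1,000 commands per batch raises throughput from 50,000 to roughly 90,900 commands/sec, because server processing time starts to dominate the round-trip.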
1.2 Pipeline Usage
import redis
r = redis.Redis()
# Wrong: N round-trips
for i in range(1000):
    r.set(f"key:{i}", f"value:{i}")
# Right: 1 round-trip for all 1000 commands
pipe = r.pipeline(transaction=False) # transaction=False omits MULTI/EXEC wrapper
for i in range(1000):
    pipe.set(f"key:{i}", f"value:{i}")
results = pipe.execute()
# Transactional pipeline (wraps with MULTI/EXEC)
with r.pipeline() as pipe:
    pipe.set("balance:1001", 500)
    pipe.incr("tx_count")
    pipe.execute()
// Java — Jedis
try (Jedis jedis = pool.getResource()) {
    Pipeline pipe = jedis.pipelined();
    for (int i = 0; i < 10_000; i++) {
        pipe.set("key:" + i, "value:" + i);
    }
    pipe.sync(); // flush and wait for all responses
}
// Java — Lettuce (async, non-blocking)
RedisAsyncCommands<String, String> async = connection.async();
List<RedisFuture<String>> futures = new ArrayList<>();
for (int i = 0; i < 10_000; i++) {
    futures.add(async.set("key:" + i, "value:" + i));
}
LettuceFutures.awaitAll(5, TimeUnit.SECONDS, futures.toArray(new RedisFuture[0]));
// Go — go-redis
pipe := rdb.Pipeline()
for i := 0; i < 1000; i++ {
    pipe.Set(ctx, fmt.Sprintf("key:%d", i), fmt.Sprintf("value:%d", i), 0)
}
_, err := pipe.Exec(ctx)
1.3 Pipeline Caveats
- Not atomic: if a command in the middle fails, the others still execute; use MULTI/EXEC for atomicity.
- No inter-command dependencies: later commands in the pipeline cannot use return values from earlier ones.
- No WATCH inside a plain pipeline: use the transactional pipeline form instead.
- Optimal batch size: 100–1,000 commands. Larger batches increase client-side buffer memory and server output buffer pressure.
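The batch-size and non-atomicity caveats can be handled together by splitting a large write into fixed-size pipelines and inspecting each reply. The sketch below assumes `client` is a redis-py `redis.Redis` instance; `chunked` and `pipelined_set` are hypothetical helper names introduced here for illustration.

```python
from itertools import islice
from typing import Iterable, Iterator, List, Tuple


def chunked(items: Iterable, size: int) -> Iterator[List]:
    """Yield successive lists of at most `size` items."""
    it = iter(items)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk


def pipelined_set(client, pairs: Iterable[Tuple[str, str]], batch_size: int = 500) -> int:
    """SET every (key, value) pair, one non-transactional pipeline per batch.

    `client` is assumed to be a redis-py `redis.Redis` instance.
    Returns the number of commands that failed.
    """
    failed = 0
    for batch in chunked(pairs, batch_size):
        pipe = client.pipeline(transaction=False)
        for key, value in batch:
            pipe.set(key, value)
        # raise_on_error=False returns exceptions in place instead of raising,
        # so one failed command does not hide the replies of the others.
        results = pipe.execute(raise_on_error=False)
        failed += sum(isinstance(r, Exception) for r in results)
    return failed
```

A call such as `pipelined_set(r, ((f"key:{i}", f"value:{i}") for i in range(100_000)))` would then issue 200 round-trips of 500 commands each instead of 100,000 single round-trips.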
2. MSET / MGET vs Pipeline
# MSET: atomically set multiple key-value pairs in one command
MSET key1 val1 key2 val2 key3 val3 # O(N)
# MGET: retrieve multiple values in one command
MGET key1 key2 key3 # returns ordered list; missing keys → nil
# MSETNX: set all keys only if none of them already exist (atomic all-or-nothing)
MSETNX key1 val1 key2 val2
2.1 Comparison Table
| Dimension | MSET / MGET | Pipeline |
|---|---|---|
| Atomicity | Yes (single command) | No |
| Command variety | Only SET/GET | Any command mix |
| Redis Cluster | Restricted (all keys must hash to same slot) | Restricted (manual slot grouping) |
| Return value order | Matches input order | Matches execution order |
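Because MGET returns one reply per requested key in input order, the replies can be zipped straight back onto the keys. This is a small sketch assuming `client` is a redis-py `redis.Redis` instance; the helper name `mget_as_dict` is made up for this example.

```python
from typing import Dict, Optional, Sequence


def mget_as_dict(client, keys: Sequence[str]) -> Dict[str, Optional[bytes]]:
    """Pair each requested key with its value; missing keys map to None."""
    values = client.mget(keys)  # one reply per key, in request order
    return dict(zip(keys, values))
```

With the default redis-py settings the values come back as `bytes`, and a key that does not exist shows up as `None` rather than being dropped from the result.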
2.2 Bulk Operations in Cluster Mode
# MSET across different hash slots fails in Cluster mode
# Solution 1: force the same slot using hash tags {}
r.mset({"{user}:1001": "Alice", "{user}:1002": "Bob"}) # {} content determines slot
# Solution 2: group by slot and pipeline each group
from redis.cluster import RedisCluster, ClusterNode
rc = RedisCluster(startup_nodes=[ClusterNode("localhost", 7000)])
def mset_cluster(mapping: dict):
    """Safe bulk SET in Cluster mode — groups by slot internally."""
    pipe = rc.pipeline()
    for key, value in mapping.items():
        pipe.set(key, value)
    return pipe.execute() # redis-py cluster pipeline handles slot routing
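The hash-tag rule behind Solution 1 can be checked offline, without a running cluster. The sketch below reimplements the slot calculation from the Redis Cluster specification (CRC16/XMODEM modulo 16384, hashing only the text inside the first non-empty `{...}`); `hash_slot` is a name chosen for this example, not a redis-py API.

```python
from binascii import crc_hqx

HASH_SLOTS = 16384


def hash_slot(key: str) -> int:
    """Return the Cluster hash slot for `key`, honoring {hash tags}."""
    data = key.encode("utf-8")
    start = data.find(b"{")
    if start != -1:
        end = data.find(b"}", start + 1)
        # Only a non-empty tag counts; with "{}" the whole key is hashed.
        if end != -1 and end != start + 1:
            data = data[start + 1:end]
    # crc_hqx with an initial value of 0 is the CRC16 variant Redis uses.
    return crc_hqx(data, 0) % HASH_SLOTS


if __name__ == "__main__":
    print(hash_slot("{user}:1001") == hash_slot("{user}:1002"))  # same tag, same slot
```

Keys that share a tag always land in the same slot, which is why the MSET in Solution 1 succeeds; the trade-off is that a heavily used tag concentrates load on a single node.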
3. SCAN vs KEYS
3.1 Why KEYS Is Banned in Production
KEYS * # O(N) full scan; with 10M keys, this blocks the server for ~1 second
# Redis executes commands on a single thread: every other command waits while KEYS runs
Real incident: A DBA ran KEYS * on a 5 million key instance. The server blocked for ~5 seconds. QPS dropped to zero, thousands of timeouts fired, and cascading failures hit downstream services.
3.2 SCAN in Depth
# Syntax: SCAN cursor [MATCH pattern] [COUNT count] [TYPE type]
SCAN 0 MATCH user:* COUNT 100
# Returns: [new_cursor, [key1, key2, ...]]
# When new_cursor = 0, the full keyspace has been traversed
# Full traversal example:
SCAN 0 MATCH cache:* COUNT 200
→ [12345, ["cache:a", "cache:b", ...]] # cursor != 0, continue
SCAN 12345 MATCH cache:* COUNT 200
→ [67890, ["cache:c", ...]]
SCAN 67890 MATCH cache:* COUNT 200
→ [0, ["cache:z"]] # cursor = 0, done
Important: COUNT is a hint, not an exact limit. Redis scans approximately that many hash buckets; actual key count returned may differ due to key distribution.
3.3 Python Generator Wrapper
def scan_all_keys(pattern: str = "*", count: int = 100):
    """Generator — yields keys one at a time, never blocking."""
    cursor = 0
    while True:
        cursor, keys = r.scan(cursor, match=pattern, count=count)
        for key in keys:
            yield key.decode()
        if cursor == 0:
            break
def delete_by_pattern(pattern: str, batch_size: int = 500):
    """Production-safe bulk delete matching a pattern."""
    batch = []
    for key in scan_all_keys(pattern):
        batch.append(key)
        if len(batch) >= batch_size:
            r.delete(*batch)
            batch = []
    if batch:
        r.delete(*batch)
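A variant of the same idea uses two things redis-py already provides: `scan_iter`, which wraps the cursor loop, and `unlink`, which issues UNLINK (Redis 4.0+) so the server frees memory in the background instead of inside the command. This is a sketch assuming `client` is a redis-py `redis.Redis` instance; `unlink_by_pattern` is a hypothetical helper name.

```python
def unlink_by_pattern(client, pattern: str, batch_size: int = 500) -> int:
    """Remove keys matching `pattern` in batches using SCAN + UNLINK.

    `client` is assumed to be a redis-py `redis.Redis` instance.
    Returns the number of keys the server reported as removed.
    """
    removed = 0
    batch = []
    for key in client.scan_iter(match=pattern, count=batch_size):
        batch.append(key)
        if len(batch) >= batch_size:
            removed += client.unlink(*batch)
            batch = []
    if batch:  # flush the final partial batch
        removed += client.unlink(*batch)
    return removed
```

Since SCAN may return the same key more than once during a traversal, the returned total counts keys actually removed by the server rather than keys seen by the iterator.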
3.4 HSCAN / SSCAN / ZSCAN
# HSCAN: iterate Hash field-value pairs
HSCAN myhash 0 MATCH field:* COUNT 50
# Returns: [cursor, [field1, value1, field2, value2, ...]]
# SSCAN: iterate Set members
SSCAN myset 0 COUNT 100
# ZSCAN: iterate Sorted Set members with scores
ZSCAN myzset 0 MATCH user:* COUNT 100
# Returns: [cursor, [member1, score1, member2, score2, ...]]
def scan_hash_fields(hash_key: str, pattern: str = "*"):
    """Incrementally iterate a large Hash without blocking."""
    cursor = 0
    while True:
        cursor, data = r.hscan(hash_key, cursor, match=pattern, count=200)
        for field, value in data.items():
            yield field.decode(), value.decode()
        if cursor == 0:
            break
4. Bulk Write Best Practices
4.1 Small Batches (< 1,000 records): Pipeline
See Section 1 above. Optimal batch: 100–500 commands.
4.2 Large Batches (Millions of Records): redis-cli --pipe
The Mass Insertion protocol pre-formats commands in RESP wire format and streams them to Redis without handshaking on each command:
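# Generate RESP-formatted data in Python and stream it into redis-cli.
# The pipe goes on the heredoc's first line; the closing EOF must stand alone.
python3 << 'EOF' | redis-cli --pipe
import sys
def gen_resp(*args):
    cmd = f"*{len(args)}\r\n"
    for arg in args:
        arg = str(arg)
        # RESP lengths are byte counts; len() is only correct for ASCII data
        cmd += f"${len(arg)}\r\n{arg}\r\n"
    return cmd
for i in range(1_000_000):
    sys.stdout.write(gen_resp("SET", f"key:{i}", f"value:{i}"))
EOF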
# Sample output:
# All data transferred. Waiting for the last reply...
# Last reply received from server.
# errors: 0, replies: 1000000
# Pure awk (much faster generation than Python for simple patterns)
awk 'BEGIN{
    for (i = 1; i <= 1000000; i++) {
        key = "key:" i
        value = "value:" i
        printf "*3\r\n$3\r\nSET\r\n$%d\r\n%s\r\n$%d\r\n%s\r\n",
            length(key), key, length(value), value
    }
}' | redis-cli --pipe
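Both generators above compute lengths in characters, which matches the byte length only for ASCII data. For keys or values that may contain non-ASCII text, a byte-oriented encoder is safer. The following is a minimal sketch; `encode_resp` is a name chosen for this example.

```python
def encode_resp(*args) -> bytes:
    """Encode one command as a RESP array of bulk strings."""
    parts = [b"*%d\r\n" % len(args)]
    for arg in args:
        data = arg if isinstance(arg, bytes) else str(arg).encode("utf-8")
        # The length prefix is the byte length, not the character count.
        parts.append(b"$%d\r\n%s\r\n" % (len(data), data))
    return b"".join(parts)


if __name__ == "__main__":
    import sys
    # Write raw bytes so no newline translation or re-encoding happens.
    for i in range(3):
        sys.stdout.buffer.write(encode_resp("SET", f"key:{i}", f"value:{i}"))
```

Writing through `sys.stdout.buffer` keeps the `\r\n` terminators intact on every platform, which matters because `redis-cli --pipe` expects the exact wire format.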
4.3 Cluster Bulk Insert: Slot Grouping
from redis.cluster import RedisCluster, ClusterNode
from redis.crc import key_slot
def bulk_insert_cluster(data: dict, batch_size: int = 500):
    rc = RedisCluster(startup_nodes=[ClusterNode("localhost", 7000)])
    # Group keys by slot (key_slot expects bytes, so str keys are encoded first)
    slot_groups: dict[int, dict] = {}
    for key, value in data.items():
        slot = key_slot(key.encode())
        slot_groups.setdefault(slot, {})[key] = value
    # Pipeline per slot group, flushing every batch_size commands
    for slot, group in slot_groups.items():
        pipe = rc.pipeline()
        for n, (key, value) in enumerate(group.items(), start=1):
            pipe.set(key, value)
            if n % batch_size == 0:
                pipe.execute()
        pipe.execute() # send whatever remains in the final partial batch
5. OBJECT ENCODING and Memory Optimization
Redis selects compact internal encodings for small collections; knowing the thresholds lets you tune memory usage.
OBJECT ENCODING mykey
# String encodings
SET count 42 # → int (integer stored directly, very low overhead)
SET msg "hello" # → embstr (≤44 bytes, contiguous allocation)
SET blob "..." # → raw (>44 bytes, dynamic SDS)
# Hash encodings
HSET h f1 v1 f2 v2 # → listpack (≤512 fields AND each field/value ≤64 bytes)
# → hashtable (once either threshold is exceeded)
# ZSet encodings
ZADD z 1.0 a 2.0 b # → listpack (≤128 members AND each member ≤64 bytes)
# → skiplist
# Set encodings
SADD s 1 2 3 # → intset (all integers AND ≤512 members)
SADD s 1 "a" # → listpack (Redis 7.2+, small sets) or hashtable
# List encodings
LPUSH l a b c # → listpack (Redis 7.2+, while the list fits in a single node)
# → quicklist (larger lists; all lists before Redis 7.2)
# Inspect and tune encoding thresholds
CONFIG GET hash-max-listpack-entries # default: 512
CONFIG GET hash-max-listpack-value # default: 64 bytes
CONFIG GET zset-max-listpack-entries # default: 128
CONFIG GET zset-max-listpack-value # default: 64 bytes
CONFIG GET set-max-intset-entries # default: 512
CONFIG GET list-max-listpack-size # default: -2 (at most 8 KB per node)
Optimization pattern: Store user data as Hash fields (many small Hashes) rather than as many individual String keys. This exploits listpack encoding and dramatically reduces per-key metadata overhead.
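One common way to apply this pattern is to bucket records by id so that each Hash holds a bounded number of fields. The sketch below assumes numeric user ids; the key prefix `user:bucket:` and the bucket size of 100 are assumptions made for illustration, chosen to stay below the listpack entry threshold.

```python
from typing import Tuple


def bucket_location(user_id: int, bucket_size: int = 100) -> Tuple[str, str]:
    """Map a numeric id to (hash_key, field) so each Hash stays small."""
    # Integer division groups consecutive ids into the same Hash.
    return f"user:bucket:{user_id // bucket_size}", str(user_id)


# Usage with a redis-py client `r` (not executed here):
#   key, field = bucket_location(1001)
#   r.hset(key, field, "Alice")
#   r.hget(key, field)
```

Each stored value must also stay under the per-value size threshold for the Hash to remain listpack-encoded, and per-record TTLs are lost because expiry applies to the whole Hash key.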
6. redis-benchmark Methodology
6.1 Basic Usage
# 50 concurrent clients, 100k requests, test SET and GET
redis-benchmark -h 127.0.0.1 -p 6379 -c 50 -n 100000 -t set,get
# Pipeline batch of 16
redis-benchmark -c 50 -n 100000 -t set -P 16
# Match real-world value size (default is only 3 bytes)
redis-benchmark -c 50 -n 100000 -t set -d 512
# CSV output for comparison and charting
redis-benchmark -c 50 -n 100000 -t set,get --csv
# Cluster mode
redis-benchmark --cluster -c 50 -n 100000 -t set,get
# Test a custom Lua script
redis-benchmark -c 50 -n 100000 \
eval "return redis.call('set', KEYS[1], ARGV[1])" 1 bench:key bench:val
6.2 Reading Results
====== SET ======
100000 requests completed in 1.23 seconds
50 parallel clients
512 bytes payload
keep alive: 1
99.00% <= 1 milliseconds ← P99 latency: 99% of requests under 1 ms
99.90% <= 2 milliseconds ← P999
100.00% <= 3 milliseconds ← worst case observed
81300.81 requests per second ← throughput
6.3 Common Benchmarking Mistakes
| Mistake | Correct Approach |
|---|---|
| Use default 3-byte value | Match production value size (typically 100 B – 10 KB) |
| Concurrency = 1 | Match the number of concurrent connections in production |
| Test on the Redis server itself | Test from a client machine (RTT is the variable to measure) |
| Look only at mean latency | Focus on P99 / P999 — mean hides tail latency |
| Skip warm-up | Run ~10k requests first, then start timing |
| Ignore CPU | Monitor redis-server CPU simultaneously; avoid CPU being the bottleneck |
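The "mean hides tail latency" row is easy to demonstrate with numbers. The sketch below uses synthetic latencies (invented for this example, not measured) and a nearest-rank percentile; `percentile` is a helper defined here, not a redis-benchmark feature.

```python
import math
from statistics import mean


def percentile(samples, pct: float) -> float:
    """Nearest-rank percentile: the smallest sample with pct% of data at or below it."""
    ordered = sorted(samples)
    rank = max(1, math.ceil(pct / 100.0 * len(ordered)))
    return ordered[rank - 1]


# Synthetic latencies in ms: 985 fast requests and 15 slow ones.
latencies = [0.5] * 985 + [40.0] * 15

if __name__ == "__main__":
    print(f"mean = {mean(latencies):.2f} ms")
    print(f"P50  = {percentile(latencies, 50)} ms")
    print(f"P99  = {percentile(latencies, 99)} ms")
```

Here the mean is about 1.09 ms and the median is 0.5 ms, yet P99 is 40 ms: 1.5% of requests are slow enough to trip a typical client timeout while the average still looks healthy.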
6.4 Disable Keyspace Notifications Before Benchmarking
# Keyspace notifications add meaningful overhead
CONFIG GET notify-keyspace-events
CONFIG SET notify-keyspace-events "" # disable during benchmark
7. Connection Pool Tuning
# Python (redis-py)
pool = redis.ConnectionPool(
    host="localhost",
    port=6379,
    max_connections=50, # should match application thread/coroutine count
    socket_timeout=1.0,
    socket_connect_timeout=1.0,
    retry_on_timeout=True,
    health_check_interval=30, # periodic connection health checks
)
r = redis.Redis(connection_pool=pool)
// Java (Jedis)
JedisPoolConfig cfg = new JedisPoolConfig();
cfg.setMaxTotal(50);
cfg.setMaxIdle(20);
cfg.setMinIdle(5);
cfg.setMaxWaitMillis(1000);
cfg.setTestOnBorrow(false); // disable for high-throughput paths
cfg.setTestWhileIdle(true); // health-check idle connections
JedisPool pool = new JedisPool(cfg, "localhost", 6379, 2000, "password");
// Go (go-redis)
rdb := redis.NewClient(&redis.Options{
    Addr: "localhost:6379",
    PoolSize: 50,
    MinIdleConns: 5,
    DialTimeout: time.Second,
    ReadTimeout: time.Second,
    WriteTimeout: time.Second,
})
8. Performance Troubleshooting Workflow
When Redis becomes slow, follow this sequence:
1. Measure real-time latency
redis-cli --latency-history -i 1 # latency sample every 1 second
redis-cli --stat # live stats (QPS, memory, connections)
2. Check the slowlog
SLOWLOG GET 20
SLOWLOG RESET
3. Identify big keys
redis-cli --bigkeys -i 0.1 # throttled scan (100 SCAN cmds then 0.1s sleep)
4. Identify hot keys
redis-cli --hotkeys # requires LFU eviction policy
5. Inspect clients
CLIENT LIST # per-connection details
INFO clients # aggregate client stats
6. Check memory fragmentation
redis-cli INFO memory | grep mem_fragmentation_ratio
# > 1.5: consider activedefrag or a planned restart
7. Check persistence load
INFO persistence # is bgsave / AOF rewrite running?
8. Check replication backlog
INFO replication # is repl_backlog_histlen near repl_backlog_size?
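Steps 6 to 8 can be folded into a small script that inspects a parsed INFO reply. The sketch below assumes the dictionary shape returned by redis-py's `r.info()`; the function name `triage_info` and the 90% backlog threshold are assumptions for illustration, while the 1.5 fragmentation threshold comes from step 6.

```python
from typing import Dict, List


def triage_info(info: Dict[str, float]) -> List[str]:
    """Return human-readable warnings derived from a parsed INFO reply."""
    warnings = []
    if info.get("mem_fragmentation_ratio", 0) > 1.5:
        warnings.append("high memory fragmentation")
    if info.get("rdb_bgsave_in_progress") or info.get("aof_rewrite_in_progress"):
        warnings.append("persistence job running")
    size = info.get("repl_backlog_size", 0)
    # 0.9 is an assumed cut-off for "near full"; tune it for your workload.
    if size and info.get("repl_backlog_histlen", 0) >= 0.9 * size:
        warnings.append("replication backlog nearly full")
    return warnings


# Usage with a redis-py client `r` (not executed here):
#   for warning in triage_info(r.info()):
#       print(warning)
```

Keeping the checks as a pure function over a dictionary makes them easy to unit-test and to run against saved INFO snapshots from an incident.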
Chapter Summary
- Pipeline eliminates RTT overhead; batch size 100–500 is the sweet spot.
- MSET/MGET are atomic and clean for non-Cluster use; in Cluster, slot-group your keys.
- KEYS * is unconditionally banned in production; replace it with SCAN.
- Mass insertion of millions of records: use redis-cli --pipe with RESP pre-formatted data.
- OBJECT ENCODING reveals the internal representation; keeping collections within listpack thresholds saves significant memory.
- Benchmark with realistic value sizes, matching concurrency, and from a remote client machine.