Chapter 37

Pipeline, Batch Operations and Performance Optimization

Chapter 37: Pipeline, Bulk Operations, and Performance Optimization

Every Redis command travels through a full round-trip: client sends → network → server processes → network → client receives. The RTT (Round-Trip Time) is almost always the bottleneck — not Redis's processing speed. This chapter covers Pipeline, bulk commands, SCAN patterns, mass insertion, and benchmark methodology to extract maximum throughput.


1. Pipeline: Principle and RTT Savings

1.1 The Problem with One-at-a-Time Commands

Assume network RTT = 1 ms, Redis processing speed = 100,000 commands/sec:

Single-command model:
  CMD: send (0.5 ms) + process (0.01 ms) + receive (0.5 ms) ≈ 1.01 ms per command
  → Effective throughput ceiling: ~990 commands/sec  (RTT-bound, not CPU-bound)

Pipeline model (batch size = 100):
  Send 100 commands (0.5 ms) + process 100×0.01 ms (1 ms) + receive (0.5 ms) ≈ 2 ms / 100 commands
  → Effective throughput: ~50,000 commands/sec  (50× improvement)

1.2 Pipeline Usage

import redis

r = redis.Redis()

# Wrong: N round-trips
for i in range(1000):
    r.set(f"key:{i}", f"value:{i}")

# Right: 1 round-trip for all 1000 commands
pipe = r.pipeline(transaction=False)    # transaction=False omits MULTI/EXEC wrapper
for i in range(1000):
    pipe.set(f"key:{i}", f"value:{i}")
results = pipe.execute()

# Transactional pipeline (wraps with MULTI/EXEC)
with r.pipeline() as pipe:
    pipe.set("balance:1001", 500)
    pipe.incr("tx_count")
    pipe.execute()
// Java — Jedis
try (Jedis jedis = pool.getResource()) {
    Pipeline pipe = jedis.pipelined();
    for (int i = 0; i < 10_000; i++) {
        pipe.set("key:" + i, "value:" + i);
    }
    pipe.sync();   // flush and wait for all responses
}

// Java — Lettuce (async, non-blocking)
RedisAsyncCommands<String, String> async = connection.async();
List<RedisFuture<String>> futures = new ArrayList<>();
for (int i = 0; i < 10_000; i++) {
    futures.add(async.set("key:" + i, "value:" + i));
}
LettuceFutures.awaitAll(5, TimeUnit.SECONDS, futures.toArray(new RedisFuture[0]));
// Go — go-redis
pipe := rdb.Pipeline()
for i := 0; i < 1000; i++ {
    pipe.Set(ctx, fmt.Sprintf("key:%d", i), fmt.Sprintf("value:%d", i), 0)
}
_, err := pipe.Exec(ctx)

1.3 Pipeline Caveats


2. MSET / MGET vs Pipeline

# MSET: atomically set multiple key-value pairs in one command
MSET key1 val1 key2 val2 key3 val3     # O(N)

# MGET: retrieve multiple values in one command
MGET key1 key2 key3                     # returns ordered list; missing keys → nil

# MSETNX: set all keys only if none of them already exist (atomic all-or-nothing)
MSETNX key1 val1 key2 val2

2.1 Comparison Table

Dimension MSET / MGET Pipeline
Atomicity Yes (single command) No
Command variety Only SET/GET Any command mix
Redis Cluster Restricted (all keys must hash to same slot) Restricted (manual slot grouping)
Return value order Matches input order Matches execution order

2.2 Bulk Operations in Cluster Mode

# MSET across different hash slots fails in Cluster mode
# Solution 1: force the same slot using hash tags {}
r.mset({"{user}:1001": "Alice", "{user}:1002": "Bob"})   # {} content determines slot

# Solution 2: group by slot and pipeline each group
from redis.cluster import RedisCluster, ClusterNode

rc = RedisCluster(startup_nodes=[ClusterNode("localhost", 7000)])

def mset_cluster(mapping: dict):
    """Safe bulk SET in Cluster mode — groups by slot internally."""
    pipe = rc.pipeline()
    for key, value in mapping.items():
        pipe.set(key, value)
    return pipe.execute()   # redis-py cluster pipeline handles slot routing

3. SCAN vs KEYS

3.1 Why KEYS Is Banned in Production

KEYS *    # O(N) full scan; with 10M keys, this blocks the server for ~1 second
          # Redis is single-threaded: every other command waits during KEYS execution

Real incident: A DBA ran KEYS * on a 5 million key instance. The server blocked for ~5 seconds. QPS dropped to zero, thousands of timeouts fired, and cascading failures hit downstream services.

3.2 SCAN in Depth

# Syntax: SCAN cursor [MATCH pattern] [COUNT count] [TYPE type]
SCAN 0 MATCH user:* COUNT 100
# Returns: [new_cursor, [key1, key2, ...]]
# When new_cursor = 0, the full keyspace has been traversed

# Full traversal example:
SCAN 0 MATCH cache:* COUNT 200
→ [12345, ["cache:a", "cache:b", ...]]   # cursor != 0, continue
SCAN 12345 MATCH cache:* COUNT 200
→ [67890, ["cache:c", ...]]
SCAN 67890 MATCH cache:* COUNT 200
→ [0, ["cache:z"]]                       # cursor = 0, done

Important: COUNT is a hint, not an exact limit. Redis scans approximately that many hash buckets; actual key count returned may differ due to key distribution.

3.3 Python Generator Wrapper

def scan_all_keys(pattern: str = "*", count: int = 100):
    """Generator — yields keys one at a time, never blocking."""
    cursor = 0
    while True:
        cursor, keys = r.scan(cursor, match=pattern, count=count)
        for key in keys:
            yield key.decode()
        if cursor == 0:
            break

def delete_by_pattern(pattern: str, batch_size: int = 500):
    """Production-safe bulk delete matching a pattern."""
    batch = []
    for key in scan_all_keys(pattern):
        batch.append(key)
        if len(batch) >= batch_size:
            r.delete(*batch)
            batch = []
    if batch:
        r.delete(*batch)

3.4 HSCAN / SSCAN / ZSCAN

# HSCAN: iterate Hash field-value pairs
HSCAN myhash 0 MATCH field:* COUNT 50
# Returns: [cursor, [field1, value1, field2, value2, ...]]

# SSCAN: iterate Set members
SSCAN myset 0 COUNT 100

# ZSCAN: iterate Sorted Set members with scores
ZSCAN myzset 0 MATCH user:* COUNT 100
# Returns: [cursor, [member1, score1, member2, score2, ...]]
def scan_hash_fields(hash_key: str, pattern: str = "*"):
    """Incrementally iterate a large Hash without blocking."""
    cursor = 0
    while True:
        cursor, data = r.hscan(hash_key, cursor, match=pattern, count=200)
        for field, value in data.items():
            yield field.decode(), value.decode()
        if cursor == 0:
            break

4. Bulk Write Best Practices

4.1 Small Batches (< 1,000 records): Pipeline

See Section 1 above. Optimal batch: 100–500 commands.

4.2 Large Batches (Millions of Records): redis-cli --pipe

The Mass Insertion protocol pre-formats commands in RESP wire format and streams them to Redis without handshaking on each command:

# Generate RESP-formatted data in Python
python3 << 'EOF'
import sys

def gen_resp(*args):
    cmd = f"*{len(args)}\r\n"
    for arg in args:
        arg = str(arg)
        cmd += f"${len(arg)}\r\n{arg}\r\n"
    return cmd

for i in range(1_000_000):
    sys.stdout.write(gen_resp("SET", f"key:{i}", f"value:{i}"))
EOF | redis-cli --pipe

# Sample output:
# All data transferred. Waiting for the last reply...
# Last reply received from server.
# errors: 0, replies: 1000000
# Pure awk (much faster generation than Python for simple patterns)
awk 'BEGIN{
    for (i = 1; i <= 1000000; i++) {
        key   = "key:" i
        value = "value:" i
        printf "*3\r\n$3\r\nSET\r\n$%d\r\n%s\r\n$%d\r\n%s\r\n",
               length(key), key, length(value), value
    }
}' | redis-cli --pipe

4.3 Cluster Bulk Insert: Slot Grouping

from redis.cluster import RedisCluster, ClusterNode
from redis.commands.cluster import ClusterCommands
from redis.crc import key_slot

def bulk_insert_cluster(data: dict, batch_size: int = 500):
    rc = RedisCluster(startup_nodes=[ClusterNode("localhost", 7000)])
    
    # Group keys by slot
    slot_groups: dict[int, dict] = {}
    for key, value in data.items():
        slot = key_slot(key)
        slot_groups.setdefault(slot, {})[key] = value
    
    # Pipeline per slot group
    for slot, group in slot_groups.items():
        pipe = rc.pipeline()
        for key, value in group.items():
            pipe.set(key, value)
        pipe.execute()

5. OBJECT ENCODING and Memory Optimization

Redis selects compact internal encodings for small collections; knowing the thresholds lets you tune memory usage.

OBJECT ENCODING mykey

# String encodings
SET count 42          # → int (integer stored directly, very low overhead)
SET msg "hello"       # → embstr (≤44 bytes, contiguous allocation)
SET blob "..."        # → raw (>44 bytes, dynamic SDS)

# Hash encodings
HSET h f1 v1 f2 v2   # → listpack (≤128 fields AND each value ≤64 bytes)
                      # → hashtable (once either threshold is exceeded)

# ZSet encodings
ZADD z 1.0 a 2.0 b   # → listpack (≤128 members AND each member ≤64 bytes)
                      # → skiplist

# Set encodings
SADD s 1 2 3          # → intset (all integers AND ≤512 members)
SADD s 1 "a"          # → listpack or hashtable

# List encodings
LPUSH l a b c         # → listpack (≤128 elements AND each ≤64 bytes)
                      # → quicklist
# Inspect and tune encoding thresholds
CONFIG GET hash-max-listpack-entries    # default: 128
CONFIG GET hash-max-listpack-value      # default: 64 bytes
CONFIG GET zset-max-listpack-entries    # default: 128
CONFIG GET zset-max-listpack-value      # default: 64 bytes
CONFIG GET set-max-intset-entries       # default: 512
CONFIG GET list-max-listpack-size       # default: 128

Optimization pattern: Store user data as Hash fields (many small Hashes) rather than as many individual String keys. This exploits listpack encoding and dramatically reduces per-key metadata overhead.


6. redis-benchmark Methodology

6.1 Basic Usage

# 50 concurrent clients, 100k requests, test SET and GET
redis-benchmark -h 127.0.0.1 -p 6379 -c 50 -n 100000 -t set,get

# Pipeline batch of 16
redis-benchmark -c 50 -n 100000 -t set -P 16

# Match real-world value size (default is only 3 bytes)
redis-benchmark -c 50 -n 100000 -t set -d 512

# CSV output for comparison and charting
redis-benchmark -c 50 -n 100000 -t set,get --csv

# Cluster mode
redis-benchmark --cluster -c 50 -n 100000 -t set,get

# Test a custom Lua script
redis-benchmark -c 50 -n 100000 \
    eval "return redis.call('set', KEYS[1], ARGV[1])" 1 bench:key bench:val

6.2 Reading Results

====== SET ======
  100000 requests completed in 1.23 seconds
  50 parallel clients
  512 bytes payload
  keep alive: 1

99.00% <= 1 milliseconds      ← P99 latency: 99% of requests under 1 ms
99.90% <= 2 milliseconds      ← P999
100.00% <= 3 milliseconds     ← worst case observed

81300.81 requests per second  ← throughput

6.3 Common Benchmarking Mistakes

Mistake Correct Approach
Use default 3-byte value Match production value size (typically 100 B – 10 KB)
Concurrency = 1 Match the number of concurrent connections in production
Test on the Redis server itself Test from a client machine (RTT is the variable to measure)
Look only at mean latency Focus on P99 / P999 — mean hides tail latency
Skip warm-up Run ~10k requests first, then start timing
Ignore CPU Monitor redis-server CPU simultaneously; avoid CPU being the bottleneck

6.4 Disable Keyspace Notifications Before Benchmarking

# Keyspace notifications add meaningful overhead
CONFIG GET notify-keyspace-events
CONFIG SET notify-keyspace-events ""    # disable during benchmark

7. Connection Pool Tuning

# Python (redis-py)
pool = redis.ConnectionPool(
    host="localhost",
    port=6379,
    max_connections=50,             # should match application thread/coroutine count
    socket_timeout=1.0,
    socket_connect_timeout=1.0,
    retry_on_timeout=True,
    health_check_interval=30,       # periodic connection health checks
)
r = redis.Redis(connection_pool=pool)
// Java (Jedis)
JedisPoolConfig cfg = new JedisPoolConfig();
cfg.setMaxTotal(50);
cfg.setMaxIdle(20);
cfg.setMinIdle(5);
cfg.setMaxWaitMillis(1000);
cfg.setTestOnBorrow(false);        // disable for high-throughput paths
cfg.setTestWhileIdle(true);        // health-check idle connections

JedisPool pool = new JedisPool(cfg, "localhost", 6379, 2000, "password");
// Go (go-redis)
rdb := redis.NewClient(&redis.Options{
    Addr:         "localhost:6379",
    PoolSize:     50,
    MinIdleConns: 5,
    DialTimeout:  time.Second,
    ReadTimeout:  time.Second,
    WriteTimeout: time.Second,
})

8. Performance Troubleshooting Workflow

When Redis becomes slow, follow this sequence:

1. Measure real-time latency
   redis-cli --latency-history -i 1      # latency sample every 1 second
   redis-cli --stat                       # live stats (QPS, memory, connections)

2. Check the slowlog
   SLOWLOG GET 20
   SLOWLOG RESET

3. Identify big keys
   redis-cli --bigkeys -i 0.1            # throttled scan (100 SCAN cmds then 0.1s sleep)

4. Identify hot keys
   redis-cli --hotkeys                   # requires LFU eviction policy

5. Inspect clients
   CLIENT LIST                           # per-connection details
   INFO clients                          # aggregate client stats

6. Check memory fragmentation
   INFO memory | grep mem_fragmentation_ratio
   # > 1.5: consider activedefrag or a planned restart

7. Check persistence load
   INFO persistence                      # is bgsave / AOF rewrite running?

8. Check replication backlog
   INFO replication                      # is repl_backlog_histlen near repl_backlog_size?

Chapter Summary

Rate this chapter
4.7  / 5  (3 ratings)

💬 Comments