Chapter 11

Stream: Kafka-Style Message Stream Inside Redis

Chapter 11: Stream โ€” Kafka-Style Message Streams in Redis

11.1 Why Stream Exists

Before Stream arrived in Redis 5.0 (2018), engineers shoehorned message queues out of List or Sorted Set. That approach has three fundamental flaws:

Flaw 1: No Consumer Groups

BLPOP on a List lets only one consumer receive each message. Parallel consumption requires manual sharding โ€” fragile and operationally expensive.

Flaw 2: No Acknowledgment

LPOP removes the message atomically. If the consumer crashes before completing work, the message is gone. There is no re-delivery.

Flaw 3: No History Replay

A List is consumed destructively. New consumers cannot replay history, and audit trails are impossible.

Stream addresses all three deficiencies while maintaining Redis's characteristic memory efficiency through careful internal encoding.


11.2 Internal Storage Architecture

11.2.1 The Radix Tree Backbone

A Stream object is anchored by a Radix Tree (compressed prefix trie) implemented in rax.h and rax.c. The C struct looks like:

typedef struct stream {
    rax *rax;               /* the radix tree holding stream entries */
    uint64_t length;        /* number of elements in the stream */
    streamID last_id;       /* zero if there are no items */
    streamID first_id;      /* the first non-tombstoned entry (Redis 7.0+) */
    int64_t entries_added;  /* cumulative count of added entries */
    rax *cgroups;           /* Consumer groups dictionary */
} stream;

Lookup time in a Radix Tree is O(k) where k is the key length โ€” a fixed 16 bytes for a Stream Entry ID. This is independent of the total number of messages n, giving predictable latency regardless of stream depth.

11.2.2 Listpack Leaf Nodes

The Radix Tree does not store one leaf node per message. Instead, multiple consecutive messages are packed into a single listpack hanging off each leaf. This is the key to Stream's memory efficiency.

Radix Tree Node:
  shared key prefix โ†’ "1704067200000-"
  leaf data         โ†’ listpack [
    entry 0: [flags=STREAM_ITEM_FLAG_NONE][ms_delta=0][seq_delta=0]
             [num_fields=3][field="user_id"][val="1001"]
             [field="action"][val="place_order"]
             [field="amount"][val="299.00"]
             [lp-element-backlen]
    entry 1: [flags=STREAM_ITEM_FLAG_SAMEFIELDS][ms_delta=1][seq_delta=0]
             [val="1002"][val="cancel"][val="0"]
             [lp-element-backlen]
    ...
  ]

When consecutive entries share the same field names, the STREAM_ITEM_FLAG_SAMEFIELDS flag is set and field names are omitted โ€” only values are stored. This can cut per-message overhead by 50% or more in homogeneous streams.

Tuning parameters:

# Maximum entries per listpack node (default 100)
stream-node-max-entries 100

# Maximum bytes per listpack node (default 4096)
stream-node-max-bytes 4096

When either limit is exceeded, the listpack splits into a new Radix Tree node.

Memory comparison (1,000 messages, 4 fields each, ~10 bytes per value):

Approach Memory Notes
Hash per message ~500 KB each Hash has per-key overhead
Stream (listpack) ~80 KB field name dedup + integer encoding
Compression ratio 6.25ร— typical homogeneous stream

11.2.3 Entry ID Design

Format: <millisecond-timestamp>-<sequence-within-millisecond>
Example: 1704067200000-3
         ^^^^^^^^^^^^^  ^
         Unix epoch ms  4th entry in that millisecond (0-indexed)

Properties:

# Auto-generated ID
XADD orders * user_id 1001 action place_order amount 299.00
# Returns: "1704067200123-0"

# Manually specified ID (for migration)
XADD orders 1704067200000-0 user_id 999 action imported amount 150.00

11.3 Core Commands In Depth

11.3.1 XADD โ€” Appending Messages

# Basic append, auto-generated ID
XADD stream_name * field1 value1 field2 value2

# Approximate trim to ~10,000 entries (recommended for production)
# '~' means Redis may keep a few more entries to avoid splitting a listpack node
XADD orders MAXLEN ~ 10000 * user_id 1001 status shipped

# Exact trim (slower โ€” must split listpack at precise boundary)
XADD orders MAXLEN 10000 * user_id 1001 status shipped

# Trim by minimum ID (remove entries older than this ID)
XADD orders MINID ~ 1704000000000-0 * user_id 1002 status delivered

# Partial update with NOMKSTREAM (don't create stream if absent)
XADD orders NOMKSTREAM * user_id 1003 status refunded

Time complexity: O(1) amortized. Occasional listpack splits are O(node-size) but infrequent.

11.3.2 Reading Messages

# Range read: '-' is smallest possible ID, '+' is largest
XRANGE orders - + COUNT 100

# Range read from a specific starting ID (exclusive lower bound trick: add -0)
XRANGE orders 1704067200000-0 + COUNT 50

# Reverse range
XREVRANGE orders + - COUNT 10

# Non-blocking read from multiple streams
# '0-0' means "from the very beginning"
XREAD COUNT 10 STREAMS orders payments 0-0 0-0

# Non-blocking read: only entries after a known ID
XREAD COUNT 10 STREAMS orders 1704067200000-0

# Blocking read: '$' means "only new entries from now on"
XREAD COUNT 10 BLOCK 0 STREAMS orders $

# Blocking read with timeout (2000 ms)
XREAD COUNT 10 BLOCK 2000 STREAMS orders $

11.3.3 Deletion and Trimming

# Delete a specific entry (logical deletion โ€” listpack entry remains until node GC)
XDEL orders 1704067200123-0

# Exact trim
XTRIM orders MAXLEN 5000

# Approximate trim (preferred โ€” avoids listpack node splits)
XTRIM orders MAXLEN ~ 5000

# Length query
XLEN orders

Important nuance: XDEL decrements stream->length and sets a tombstone flag in the listpack entry. The raw listpack bytes are not freed until every entry in that listpack node is deleted, at which point the entire Radix Tree node is removed. This means the physical memory footprint may temporarily exceed the logical entry count.


11.4 Consumer Groups โ€” Deep Dive

11.4.1 Group Management

# Create group starting at the tail ('$' = only future messages)
XGROUP CREATE orders order_processor $ MKSTREAM

# Create group starting from the beginning ('0' = all history)
XGROUP CREATE orders audit_log 0

# Reposition a group's delivery cursor
XGROUP SETID orders order_processor 1704067200000-0

# Remove a group (also deletes its PEL)
XGROUP DESTROY orders order_processor

# Explicitly add a consumer (also happens automatically on first XREADGROUP)
XGROUP CREATECONSUMER orders order_processor worker-1

# Remove a consumer (its PEL entries are reassigned to group-level PEL)
XGROUP DELCONSUMER orders order_processor worker-1

11.4.2 Consume, Acknowledge, and Reclaim

# Read up to 10 undelivered messages ('>' = entries not yet delivered to any consumer)
XREADGROUP GROUP order_processor worker-1 COUNT 10 STREAMS orders >

# Acknowledge processed messages
XACK orders order_processor 1704067200123-0 1704067200124-0

# Re-read your own pending messages (pass '0' instead of '>')
XREADGROUP GROUP order_processor worker-1 COUNT 10 STREAMS orders 0

# Inspect PEL: pending entries between '-' and '+', up to 10
XPENDING orders order_processor - + 10
# Returns per entry: [id, consumer_name, idle_ms, delivery_count]

# Transfer ownership of a timed-out entry (idle > 1 hour = 3,600,000 ms)
XCLAIM orders order_processor worker-2 3600000 1704067200123-0
# Optional flags: JUSTID (return only IDs), FORCE (claim even if not in PEL)

# Redis 7.0+: auto-claim all entries idle > 30 seconds, starting from '0-0'
XAUTOCLAIM orders order_processor worker-2 30000 0-0 COUNT 10
# Returns: [next-start-id, [[id, data], ...], [deleted-ids]]

11.4.3 PEL Internal Structure

The Pending Entry List is implemented as a Radix Tree mapping binary Entry IDs to streamNACK structs:

// From stream.h (simplified)
typedef struct streamCG {
    streamID last_id;        /* last delivered ID for this group */
    long long entries_read;  /* entries delivered (for lag calculation) */
    rax *pel;                /* group-level PEL: entry_id โ†’ streamNACK */
    rax *consumers;          /* consumers: name โ†’ streamConsumer */
} streamCG;

typedef struct streamConsumer {
    mstime_t seen_time;      /* last time the consumer was active */
    mstime_t active_time;    /* last time the consumer got a message */
    sds name;                /* consumer name */
    rax *pel;                /* consumer-level PEL: entry_id โ†’ streamNACK */
} streamConsumer;

typedef struct streamNACK {
    mstime_t delivery_time;  /* when this entry was last delivered */
    uint64_t delivery_count; /* total delivery attempts */
    streamConsumer *consumer; /* current owner */
} streamNACK;

Both the group-level PEL and the consumer-level PEL point to the same streamNACK object โ€” no duplication. The PEL key is the 16-byte big-endian binary representation of the Entry ID (8 bytes ms + 8 bytes seq).

11.4.4 Message Delivery State Machine

State transitions for a single message:

  [In Stream]
      โ”‚
      โ”‚  XREADGROUP GROUP g c COUNT 1 STREAMS s >
      โ–ผ
  [Pending / In PEL]  โ†โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ XCLAIM (ownership transfer)
      โ”‚                              โ”‚
      โ”‚  XACK                        โ”‚ idle > threshold
      โ–ผ                              โ”‚
  [Acknowledged / Removed from PEL] โ”‚
                                     โ–ผ
                              [Pending, new owner]

11.5 Monitoring and Observability

# Full stream introspection
XINFO STREAM orders FULL COUNT 5
# Key fields: length, radix-tree-keys, radix-tree-nodes,
#             last-generated-id, first-entry, last-entry

# Consumer group summary
XINFO GROUPS orders
# Key fields: name, consumers, pending, last-delivered-id,
#             entries-read, lag

# Per-consumer details
XINFO CONSUMERS orders order_processor
# Key fields: name, pending, idle, inactive

Lag metric (Redis 7.0+):

lag = stream.entries_added - group.entries_read

lag measures how many messages the group has not yet delivered. It is the primary SLO metric for stream consumers. A monotonically increasing lag indicates consumer throughput is below producer rate.

Example Prometheus-style alerting:

import redis

r = redis.Redis()

def check_stream_lag(stream, group, threshold=10000):
    info = r.xinfo_groups(stream)
    for g in info:
        if g['name'] == group:
            lag = g.get('lag', 0) or 0
            if lag > threshold:
                alert(f"Stream {stream} group {group} lag={lag} > {threshold}")

11.6 Production Example: Order Status Notifications

import redis
import time
import logging

logger = logging.getLogger(__name__)
r = redis.Redis(host='localhost', port=6379, decode_responses=True)

STREAM = 'stream:orders'
GROUP  = 'notification_service'
MAX_PENDING_IDLE_MS = 30_000  # 30 seconds

# Producer side (Order Service)
def emit_order_event(order_id: int, status: str, amount: float = 0.0) -> str:
    msg_id = r.xadd(
        STREAM,
        {
            'order_id': str(order_id),
            'status': status,
            'amount': str(amount),
            'emitted_at': str(int(time.time() * 1000)),
        },
        maxlen=200_000,
        approximate=True,
    )
    logger.info(f"Emitted {msg_id}: order={order_id} status={status}")
    return msg_id

# Consumer side (Notification Service)
def bootstrap(consumer_name: str):
    try:
        r.xgroup_create(STREAM, GROUP, '$', mkstream=True)
    except redis.ResponseError as e:
        if 'BUSYGROUP' not in str(e):
            raise

def process_one_batch(consumer_name: str):
    # Step 1: try to fetch new messages
    results = r.xreadgroup(
        GROUP, consumer_name,
        {STREAM: '>'},
        count=20,
        block=2000,
    )

    if results:
        for _stream, messages in results:
            for msg_id, data in messages:
                handle_message(consumer_name, msg_id, data)
        return

    # Step 2: no new messages โ€” reclaim stale PEL entries
    next_id, claimed, deleted = r.xautoclaim(
        STREAM, GROUP, consumer_name,
        MAX_PENDING_IDLE_MS, '0-0', count=20
    )
    for msg_id, data in claimed:
        handle_message(consumer_name, msg_id, data)

def handle_message(consumer_name: str, msg_id: str, data: dict):
    try:
        logger.info(f"[{consumer_name}] Processing {msg_id}: {data}")
        send_notification(data)
        r.xack(STREAM, GROUP, msg_id)
    except Exception:
        logger.exception(f"Failed to process {msg_id} โ€” will retry on next claim")
        # Do NOT ack; entry stays in PEL for retry

def send_notification(data: dict):
    # Placeholder: send push/SMS/email based on order status
    pass

def run_consumer(consumer_name: str):
    bootstrap(consumer_name)
    while True:
        try:
            process_one_batch(consumer_name)
        except Exception:
            logger.exception("Consumer error")
            time.sleep(1)

11.7 Redis Stream vs Apache Kafka

Dimension Redis Stream Apache Kafka
Storage Memory (optional persistence) Disk (sequential write)
Partitioning None (single node per Stream) Topic โ†’ multiple Partitions
Peak throughput ~350K msg/s single node ~1M+ msg/s with partitions
Message retention Trimmed by MAXLEN/MINID Time/size-based, default 7 days
Consumer groups Yes, PEL-based Yes, offset-based
History replay Yes (XRANGE by ID) Yes (seek to offset)
Ordering guarantee Total order within Stream Per-partition order
Deployment Single binary, trivial ZooKeeper/KRaft + Broker cluster
Message schema Free-form fields Optional Schema Registry
Replication Redis replication Rack-aware partition replication

Decision guide:


11.8 Performance Benchmarks

Test environment: Redis 7.2, bare metal, 8-core CPU @ 3.5 GHz, 32 GB RAM.

# XADD throughput (50 concurrent connections, 5 fields per message)
redis-benchmark -t xadd -n 1000000 -c 50 --threads 4
# Result: ~350,000 ops/sec

# XRANGE (COUNT 100 per call) โ€” throughput in terms of entries/sec
# ~8,000 calls/sec ร— 100 entries = 800,000 entries/sec read

# Memory: 1,000,000 messages, 5 fields, ~10 bytes per value
# Measured: ~120 MB with default listpack settings
# Compared: ~800 MB with Hash-per-message approach

Listpack tuning impact:

stream-node-max-entries Memory (1M msgs) XRANGE latency (p99)
10 240 MB 0.8 ms
100 (default) 120 MB 1.2 ms
500 105 MB 4.1 ms

Larger nodes save memory but increase per-node scan cost. 100 is a good default.


11.9 Common Pitfalls and Best Practices

Pitfall 1: Unbounded stream growth

Always set MAXLEN ~ N on XADD or schedule XTRIM if retention is not needed. A stream without trimming will grow until Redis OOMs.

Pitfall 2: PEL accumulation

Dead consumers leave their PEL entries forever. Monitor with XPENDING and implement auto-reclaim using XAUTOCLAIM on startup and periodically during idle periods.

Pitfall 3: Using $ when you need history

Creating a consumer group with $ means it will miss all messages published before the group was created. Use 0 for full history replay.

Pitfall 4: Not handling partial XREADGROUP results

XREADGROUP ... > returns at most COUNT entries. Always loop until you get empty results before treating the queue as drained.

Pitfall 5: High delivery_count as a dead letter signal

pending = r.xpending_range(STREAM, GROUP, '-', '+', 100)
dead_letters = [p for p in pending if p['times_delivered'] > 5]
for dl in dead_letters:
    r.xmove(STREAM, 'stream:dead_letters', dl['message_id'])  # custom logic
    r.xack(STREAM, GROUP, dl['message_id'])
Rate this chapter
4.8  / 5  (34 ratings)

๐Ÿ’ฌ Comments