Chapter 11: Stream — Kafka-Style Message Streams in Redis
11.1 Why Stream Exists
Before Stream arrived in Redis 5.0 (2018), engineers built message queues on top of List or Sorted Set. That approach has three fundamental flaws:
Flaw 1: No Consumer Groups
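A message popped with BLPOP is delivered to exactly one client. Several workers can compete for messages on the same List, but independent services cannot each receive every message unless you maintain a separate List per service and shard manually, which is fragile and operationally expensive.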
Flaw 2: No Acknowledgment
LPOP removes the message atomically. If the consumer crashes before completing work, the message is gone. There is no re-delivery.
Flaw 3: No History Replay
A List is consumed destructively. New consumers cannot replay history, and audit trails are impossible.
Stream addresses all three deficiencies while maintaining Redis's characteristic memory efficiency through careful internal encoding.
11.2 Internal Storage Architecture
11.2.1 The Radix Tree Backbone
A Stream object is anchored by a Radix Tree (compressed prefix trie) implemented in rax.h and rax.c. The C struct looks like:
typedef struct stream {
    rax *rax;               /* the radix tree holding stream entries */
    uint64_t length;        /* number of elements in the stream */
    streamID last_id;       /* zero if there are no items */
    streamID first_id;      /* the first non-tombstoned entry (Redis 7.0+) */
    uint64_t entries_added; /* cumulative count of added entries */
    rax *cgroups;           /* Consumer groups dictionary */
} stream;
Lookup time in a Radix Tree is O(k) where k is the key length — a fixed 16 bytes for a Stream Entry ID. This is independent of the total number of messages n, giving predictable latency regardless of stream depth.
11.2.2 Listpack Leaf Nodes
The Radix Tree does not store one leaf node per message. Instead, multiple consecutive messages are packed into a single listpack hanging off each leaf. This is the key to Stream's memory efficiency.
Radix Tree Node:
  shared key prefix → "1704067200000-"
  leaf data → listpack [
    entry 0: [flags=STREAM_ITEM_FLAG_NONE][ms_delta=0][seq_delta=0]
             [num_fields=3][field="user_id"][val="1001"]
             [field="action"][val="place_order"]
             [field="amount"][val="299.00"]
             [lp-element-backlen]
    entry 1: [flags=STREAM_ITEM_FLAG_SAMEFIELDS][ms_delta=1][seq_delta=0]
             [val="1002"][val="cancel"][val="0"]
             [lp-element-backlen]
    ...
  ]
When an entry has the same field names as the reference entry of its listpack node (the first entry written to that node), the STREAM_ITEM_FLAG_SAMEFIELDS flag is set and field names are omitted; only values are stored. This can cut per-message overhead by 50% or more in homogeneous streams.
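The layout above can be sketched in a few lines of Python. This is only a toy model of the idea, not the Redis encoder: `encode_node` is a hypothetical helper, the flag values are illustrative, and each entry is represented as a plain Python list rather than packed bytes.

```python
from typing import Dict, List, Tuple

# Flag names follow the layout shown above; the numeric values are
# illustrative assumptions for this sketch.
FLAG_NONE = 0
FLAG_SAMEFIELDS = 2


def encode_node(entries: List[Tuple[int, int, Dict[str, str]]]) -> List[list]:
    """Encode (ms, seq, fields) tuples into a listpack-like list of records.

    The first entry is the reference: its ID is the base for the deltas and
    its field names are the ones later entries may reuse.
    """
    if not entries:
        return []
    base_ms, base_seq, first_fields = entries[0]
    reference_names = list(first_fields)
    encoded = []
    for index, (ms, seq, fields) in enumerate(entries):
        deltas = [ms - base_ms, seq - base_seq]
        if index > 0 and list(fields) == reference_names:
            # Same field names as the reference entry: store values only.
            record = [FLAG_SAMEFIELDS] + deltas + list(fields.values())
        else:
            # Store the field count followed by name/value pairs.
            pairs = [item for pair in fields.items() for item in pair]
            record = [FLAG_NONE] + deltas + [len(fields)] + pairs
        encoded.append(record)
    return encoded
```

With the two entries from the diagram, the second record carries three values and no field names, which is where the saving comes from.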
Tuning parameters:
# Maximum entries per listpack node (default 100)
stream-node-max-entries 100
# Maximum bytes per listpack node (default 4096)
stream-node-max-bytes 4096
When either limit is reached, the existing listpack is not split; the next appended entry starts a fresh listpack under a new Radix Tree key.
Memory comparison (1,000 messages, 4 fields each, ~10 bytes per value):
| Approach | Memory | Notes |
|---|---|---|
| Hash per message | ~500 KB | each Hash has per-key overhead |
| Stream (listpack) | ~80 KB | field name dedup + integer encoding |
| Compression ratio | 6.25× | typical homogeneous stream |
11.2.3 Entry ID Design
Format: <millisecond-timestamp>-<sequence-within-millisecond>
Example: 1704067200000-3
  1704067200000 → Unix epoch time in milliseconds
  3             → sequence number: the 4th entry in that millisecond (0-indexed)
Properties:
- Strictly monotonically increasing: guarantees global ordering
- Automatic generation: * tells Redis to auto-assign an ID using the current clock
- Manual specification: supply an explicit ID for data migration or testing
- Clock regression handling: if system clock goes backward, Redis keeps the last known ms timestamp and increments only the sequence
# Auto-generated ID
XADD orders * user_id 1001 action place_order amount 299.00
# Returns: "1704067200123-0"
# Manually specified ID (for migration); it must be greater than the stream's
# current last ID, otherwise XADD returns an error
XADD orders 1704067200000-0 user_id 999 action imported amount 150.00
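The auto-generation and clock-regression rules listed under Properties can be expressed as a small pure function. This is a sketch of the rule as described in this section, not the Redis source; `next_stream_id` and `format_id` are hypothetical names, and sequence overflow is ignored.

```python
from typing import Tuple

StreamID = Tuple[int, int]  # (milliseconds, sequence)


def next_stream_id(last_id: StreamID, now_ms: int) -> StreamID:
    """Return the ID that '*' would produce, given the last ID and the clock."""
    last_ms, last_seq = last_id
    if now_ms > last_ms:
        # The clock moved forward: new millisecond, sequence restarts at 0.
        return (now_ms, 0)
    # Same millisecond, or the clock went backward: keep the last known
    # millisecond and bump the sequence so IDs stay strictly increasing.
    return (last_ms, last_seq + 1)


def format_id(stream_id: StreamID) -> str:
    """Render an ID in the '<ms>-<seq>' text form used by the commands."""
    ms, seq = stream_id
    return f"{ms}-{seq}"
```

Because the function never returns an ID smaller than or equal to `last_id`, ordering is preserved even when the wall clock is adjusted backward.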
11.3 Core Commands In Depth
11.3.1 XADD — Appending Messages
# Basic append, auto-generated ID
XADD stream_name * field1 value1 field2 value2
# Approximate trim to ~10,000 entries (recommended for production)
# '~' means Redis may keep a few more entries to avoid splitting a listpack node
XADD orders MAXLEN ~ 10000 * user_id 1001 status shipped
# Exact trim (slower — must split listpack at precise boundary)
XADD orders MAXLEN 10000 * user_id 1001 status shipped
# Trim by minimum ID (remove entries older than this ID)
XADD orders MINID ~ 1704000000000-0 * user_id 1002 status delivered
# Append only if the stream already exists (NOMKSTREAM: don't create the stream if absent)
XADD orders NOMKSTREAM * user_id 1003 status refunded
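Time complexity: O(1) for the append itself. When a trimming option is given, eviction adds a cost proportional to the number of entries removed; with ~ whole listpack nodes are dropped at once, so that cost is paid infrequently.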
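The difference between approximate and exact trimming is easier to see on a toy model in which a stream is a list of nodes and each node is a list of entry IDs. This is a simplified sketch under that assumption; `trim_approx` and `trim_exact` are hypothetical helpers, and real Redis marks entries inside a node as deleted rather than slicing the node.

```python
from typing import List


def trim_approx(nodes: List[List[str]], maxlen: int) -> List[List[str]]:
    """MAXLEN ~ N: drop whole head nodes while at least N entries remain."""
    nodes = [list(node) for node in nodes]
    length = sum(len(node) for node in nodes)
    while nodes and length - len(nodes[0]) >= maxlen:
        length -= len(nodes[0])
        nodes.pop(0)
    return nodes


def trim_exact(nodes: List[List[str]], maxlen: int) -> List[List[str]]:
    """MAXLEN N: drop whole nodes first, then cut into the head node."""
    nodes = trim_approx(nodes, maxlen)
    excess = sum(len(node) for node in nodes) - maxlen
    if excess > 0:
        # The extra work that the approximate form avoids.
        nodes[0] = nodes[0][excess:]
    return nodes
```

With nodes of sizes 3, 3, and 2 and a limit of 4, the approximate form keeps 5 entries (it cannot drop the second node without going below 4), while the exact form removes one more entry from inside that node.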
11.3.2 Reading Messages
# Range read: '-' is smallest possible ID, '+' is largest
XRANGE orders - + COUNT 100
# Range read from a specific starting ID (bounds are inclusive; prefix an ID with '(' for an exclusive bound, Redis 6.2+)
XRANGE orders 1704067200000-0 + COUNT 50
# Reverse range
XREVRANGE orders + - COUNT 10
# Non-blocking read from multiple streams
# '0-0' means "from the very beginning"
XREAD COUNT 10 STREAMS orders payments 0-0 0-0
# Non-blocking read: only entries after a known ID
XREAD COUNT 10 STREAMS orders 1704067200000-0
# Blocking read: '$' means "only new entries from now on"
XREAD COUNT 10 BLOCK 0 STREAMS orders $
# Blocking read with timeout (2000 ms)
XREAD COUNT 10 BLOCK 2000 STREAMS orders $
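A common use of XRANGE with COUNT is paging through a whole stream. The sketch below assumes a redis-py style client created with decode_responses=True (so IDs are str) and Redis 6.2+ for the '(' exclusive-bound prefix; `iter_stream` is a hypothetical helper name.

```python
from typing import Any, Dict, Iterator, Tuple


def iter_stream(
    client: Any, stream: str, page_size: int = 100
) -> Iterator[Tuple[str, Dict[str, str]]]:
    """Yield every entry of `stream`, fetching `page_size` entries per XRANGE."""
    lower = "-"  # smallest possible ID
    while True:
        page = client.xrange(stream, min=lower, max="+", count=page_size)
        if not page:
            return
        for entry_id, fields in page:
            yield entry_id, fields
        # '(' makes the lower bound exclusive, so the last entry of this
        # page is not returned again by the next call.
        lower = "(" + page[-1][0]
```

Usage would look like `for entry_id, fields in iter_stream(r, 'orders'): ...`, where `r` is an existing client. Each call resumes from the last ID seen, so the loop stays correct while producers keep appending.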
11.3.3 Deletion and Trimming
# Delete a specific entry (logical deletion — listpack entry remains until node GC)
XDEL orders 1704067200123-0
# Exact trim
XTRIM orders MAXLEN 5000
# Approximate trim (preferred — avoids listpack node splits)
XTRIM orders MAXLEN ~ 5000
# Length query
XLEN orders
Important nuance: XDEL decrements stream->length and sets a deleted flag (a tombstone) on the listpack entry. The entry's bytes are not freed until every entry in that listpack node has been deleted, at which point the entire Radix Tree node is removed. As a result, a stream with many scattered deletions can occupy noticeably more memory than XLEN suggests.
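The tombstone behavior can be illustrated with a toy model. The classes below are hypothetical and only mimic the bookkeeping described above: a logical length that drops on every delete, and physical slots that are released only when a whole node is empty.

```python
from typing import List


class Node:
    """One listpack-like node: entries are flagged as deleted, not removed."""

    def __init__(self, entry_ids: List[str]):
        self.deleted = {entry_id: False for entry_id in entry_ids}

    def live_count(self) -> int:
        return sum(1 for is_deleted in self.deleted.values() if not is_deleted)


class TinyStream:
    def __init__(self, nodes: List[List[str]]):
        self.nodes = [Node(ids) for ids in nodes]
        self.length = sum(len(ids) for ids in nodes)  # logical entry count

    def xdel(self, entry_id: str) -> int:
        """Tombstone one entry; free the node once all its entries are gone."""
        for node in self.nodes:
            if entry_id in node.deleted and not node.deleted[entry_id]:
                node.deleted[entry_id] = True
                self.length -= 1
                if node.live_count() == 0:
                    self.nodes.remove(node)
                return 1
        return 0

    def physical_slots(self) -> int:
        """Slots still held in memory, including tombstoned ones."""
        return sum(len(node.deleted) for node in self.nodes)
```

After deleting one of two entries in a node, `length` drops by one while `physical_slots()` is unchanged; only deleting the second entry releases the node.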
11.4 Consumer Groups — Deep Dive
11.4.1 Group Management
# Create group starting at the tail ('$' = only future messages)
XGROUP CREATE orders order_processor $ MKSTREAM
# Create group starting from the beginning ('0' = all history)
XGROUP CREATE orders audit_log 0
# Reposition a group's delivery cursor
XGROUP SETID orders order_processor 1704067200000-0
# Remove a group (also deletes its PEL)
XGROUP DESTROY orders order_processor
# Explicitly add a consumer (also happens automatically on first XREADGROUP)
XGROUP CREATECONSUMER orders order_processor worker-1
# Remove a consumer (its pending entries are dropped from the PEL and become unclaimable, so claim or ack them first)
XGROUP DELCONSUMER orders order_processor worker-1
11.4.2 Consume, Acknowledge, and Reclaim
# Read up to 10 undelivered messages ('>' = entries not yet delivered to any consumer in this group)
XREADGROUP GROUP order_processor worker-1 COUNT 10 STREAMS orders >
# Acknowledge processed messages
XACK orders order_processor 1704067200123-0 1704067200124-0
# Re-read your own pending messages (pass '0' instead of '>')
XREADGROUP GROUP order_processor worker-1 COUNT 10 STREAMS orders 0
# Inspect PEL: pending entries between '-' and '+', up to 10
XPENDING orders order_processor - + 10
# Returns per entry: [id, consumer_name, idle_ms, delivery_count]
# Transfer ownership of a timed-out entry (idle > 1 hour = 3,600,000 ms)
XCLAIM orders order_processor worker-2 3600000 1704067200123-0
# Optional flags: JUSTID (return only IDs), FORCE (claim even if not in PEL)
# Redis 6.2+: auto-claim up to 10 entries idle > 30 seconds, scanning the PEL from '0-0'
XAUTOCLAIM orders order_processor worker-2 30000 0-0 COUNT 10
# Returns: [next-start-id, [[id, data], ...], [deleted-ids]] (the deleted-ids element was added in Redis 7.0)
11.4.3 PEL Internal Structure
The Pending Entry List is implemented as a Radix Tree mapping binary Entry IDs to streamNACK structs:
// From stream.h (simplified)
typedef struct streamCG {
    streamID last_id;        /* last delivered ID for this group */
    long long entries_read;  /* entries delivered (for lag calculation) */
    rax *pel;                /* group-level PEL: entry_id → streamNACK */
    rax *consumers;          /* consumers: name → streamConsumer */
} streamCG;
typedef struct streamConsumer {
    mstime_t seen_time;      /* last time the consumer attempted a read or claim */
    mstime_t active_time;    /* last time a read or claim actually returned entries */
    sds name;                /* consumer name */
    rax *pel;                /* consumer-level PEL: entry_id → streamNACK */
} streamConsumer;
typedef struct streamNACK {
    mstime_t delivery_time;     /* when this entry was last delivered */
    uint64_t delivery_count;    /* total delivery attempts */
    streamConsumer *consumer;   /* current owner */
} streamNACK;
Both the group-level PEL and the consumer-level PEL point to the same streamNACK object — no duplication. The PEL key is the 16-byte big-endian binary representation of the Entry ID (8 bytes ms + 8 bytes seq).
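The key format is easy to reproduce with Python's struct module. The helpers below are hypothetical names used for illustration; they show why a big-endian encoding is convenient: byte-wise ordering of the keys matches numeric ordering of the IDs, which is what an ordered tree walk relies on.

```python
import struct


def encode_stream_id(entry_id: str) -> bytes:
    """Pack '<ms>-<seq>' into 16 bytes: two unsigned 64-bit big-endian ints."""
    ms, seq = (int(part) for part in entry_id.split("-"))
    return struct.pack(">QQ", ms, seq)


def decode_stream_id(key: bytes) -> str:
    """Inverse of encode_stream_id."""
    ms, seq = struct.unpack(">QQ", key)
    return f"{ms}-{seq}"
```

Sorting the text IDs as strings would place "10-0" before "2-0"; sorting by the encoded key yields the correct numeric order.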
11.4.4 Message Delivery State Machine
State transitions for a single message:
[In Stream]
  │
  │  XREADGROUP GROUP g c COUNT 1 STREAMS s >
  ▼
[Pending / In PEL]
  │
  ├── XACK ──► [Acknowledged / Removed from PEL]
  │
  └── idle > threshold, XCLAIM (ownership transfer) ──► [Pending, new owner]
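These transitions can be modelled in memory to make the bookkeeping concrete. The sketch below is an assumption-laden toy: `GroupModel` and `Nack` are hypothetical names, time is passed in explicitly, and only the three transitions from the diagram are covered.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class Nack:
    consumer: str        # current owner
    delivery_time: int   # ms timestamp of the last delivery
    delivery_count: int  # total delivery attempts


class GroupModel:
    def __init__(self, entry_ids: List[str]):
        self.undelivered = list(entry_ids)  # entries '>' would still return
        self.pel: Dict[str, Nack] = {}

    def read_new(self, consumer: str, now_ms: int, count: int = 1) -> List[str]:
        """XREADGROUP ... >: deliver new entries and record them in the PEL."""
        batch = self.undelivered[:count]
        self.undelivered = self.undelivered[count:]
        for entry_id in batch:
            self.pel[entry_id] = Nack(consumer, now_ms, 1)
        return batch

    def ack(self, entry_id: str) -> int:
        """XACK: remove the entry from the PEL; returns 1 if it was pending."""
        return 1 if self.pel.pop(entry_id, None) is not None else 0

    def claim(self, entry_id: str, new_consumer: str,
              min_idle_ms: int, now_ms: int) -> bool:
        """XCLAIM: transfer ownership if the entry has been idle long enough."""
        nack = self.pel.get(entry_id)
        if nack is None or now_ms - nack.delivery_time < min_idle_ms:
            return False
        nack.consumer = new_consumer
        nack.delivery_time = now_ms
        nack.delivery_count += 1
        return True
```

A claim resets the idle clock and increments the delivery count, which is why a high delivery count is a useful signal that an entry keeps failing.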
11.5 Monitoring and Observability
# Full stream introspection
XINFO STREAM orders FULL COUNT 5
# Key fields: length, radix-tree-keys, radix-tree-nodes,
# last-generated-id, entries, groups
# (without FULL, the reply carries first-entry and last-entry instead of the entries list)
# Consumer group summary
XINFO GROUPS orders
# Key fields: name, consumers, pending, last-delivered-id,
# entries-read, lag
# Per-consumer details
XINFO CONSUMERS orders order_processor
# Key fields: name, pending, idle, inactive
Lag metric (Redis 7.0+):
lag = stream.entries_added - group.entries_read
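lag estimates how many entries in the stream have not yet been delivered to the group's consumers. It is the primary SLO metric for stream consumers: a lag that keeps growing means consumer throughput is below the producer rate. Redis reports lag as null when it cannot determine the value, so alerting code has to handle a missing value.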
Example Prometheus-style alerting:
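import logging
import redis

# decode_responses=True so that group names come back as str, not bytes
r = redis.Redis(decode_responses=True)

def alert(message):
    # Placeholder: forward to your alerting system
    logging.warning(message)

def check_stream_lag(stream, group, threshold=10000):
    for g in r.xinfo_groups(stream):
        if g['name'] == group:
            lag = g.get('lag') or 0  # lag is None when Redis cannot compute it
            if lag > threshold:
                alert(f"Stream {stream} group {group} lag={lag} > {threshold}")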
11.6 Production Example: Order Status Notifications
import redis
import time
import logging

logger = logging.getLogger(__name__)
r = redis.Redis(host='localhost', port=6379, decode_responses=True)

STREAM = 'stream:orders'
GROUP = 'notification_service'
MAX_PENDING_IDLE_MS = 30_000  # 30 seconds


# Producer side (Order Service)
def emit_order_event(order_id: int, status: str, amount: float = 0.0) -> str:
    msg_id = r.xadd(
        STREAM,
        {
            'order_id': str(order_id),
            'status': status,
            'amount': str(amount),
            'emitted_at': str(int(time.time() * 1000)),
        },
        maxlen=200_000,
        approximate=True,
    )
    logger.info(f"Emitted {msg_id}: order={order_id} status={status}")
    return msg_id


# Consumer side (Notification Service)
def bootstrap(consumer_name: str):
    # Create the group if needed; BUSYGROUP means it already exists
    try:
        r.xgroup_create(STREAM, GROUP, '$', mkstream=True)
    except redis.ResponseError as e:
        if 'BUSYGROUP' not in str(e):
            raise


def process_one_batch(consumer_name: str):
    # Step 1: try to fetch new messages
    results = r.xreadgroup(
        GROUP, consumer_name,
        {STREAM: '>'},
        count=20,
        block=2000,
    )
    if results:
        for _stream, messages in results:
            for msg_id, data in messages:
                handle_message(consumer_name, msg_id, data)
        return

    # Step 2: no new messages, so reclaim stale PEL entries
    reply = r.xautoclaim(
        STREAM, GROUP, consumer_name,
        MAX_PENDING_IDLE_MS, '0-0', count=20
    )
    # reply[0] is the next start ID, reply[1] the claimed entries;
    # Redis 7.0+ appends a third element with deleted IDs
    for msg_id, data in reply[1]:
        if msg_id is None:
            continue  # entry no longer exists in the stream
        handle_message(consumer_name, msg_id, data)


def handle_message(consumer_name: str, msg_id: str, data: dict):
    try:
        logger.info(f"[{consumer_name}] Processing {msg_id}: {data}")
        send_notification(data)
        r.xack(STREAM, GROUP, msg_id)
    except Exception:
        logger.exception(f"Failed to process {msg_id}; will retry on next claim")
        # Do NOT ack; entry stays in PEL for retry


def send_notification(data: dict):
    # Placeholder: send push/SMS/email based on order status
    pass


def run_consumer(consumer_name: str):
    bootstrap(consumer_name)
    while True:
        try:
            process_one_batch(consumer_name)
        except Exception:
            logger.exception("Consumer error")
            time.sleep(1)
11.7 Redis Stream vs Apache Kafka
| Dimension | Redis Stream | Apache Kafka |
|---|---|---|
| Storage | Memory (optional persistence) | Disk (sequential write) |
| Partitioning | None (single node per Stream) | Topic → multiple Partitions |
| Peak throughput | ~350K msg/s single node | ~1M+ msg/s with partitions |
| Message retention | Trimmed by MAXLEN/MINID | Time/size-based, default 7 days |
| Consumer groups | Yes, PEL-based | Yes, offset-based |
| History replay | Yes (XRANGE by ID) | Yes (seek to offset) |
| Ordering guarantee | Total order within Stream | Per-partition order |
| Deployment | Single binary, trivial | ZooKeeper/KRaft + Broker cluster |
| Message schema | Free-form fields | Optional Schema Registry |
| Replication | Redis replication | Rack-aware partition replication |
Decision guide:
- Already running Redis, message volume comfortably below the ~350K msg/s single-node ceiling, team size < 5 → Redis Stream
- Need durable disk storage for regulatory compliance → Kafka
- Need cross-datacenter replication with MirrorMaker → Kafka
- Need Kafka Connect or ksqlDB ecosystem → Kafka
- Sub-millisecond end-to-end latency required → Redis Stream (memory-resident)
11.8 Performance Benchmarks
Test environment: Redis 7.2, bare metal, 8-core CPU @ 3.5 GHz, 32 GB RAM.
# XADD throughput (50 concurrent connections, 5 fields per message)
redis-benchmark -t xadd -n 1000000 -c 50 --threads 4
# Result: ~350,000 ops/sec
# XRANGE (COUNT 100 per call) — throughput in terms of entries/sec
# ~8,000 calls/sec × 100 entries = 800,000 entries/sec read
# Memory: 1,000,000 messages, 5 fields, ~10 bytes per value
# Measured: ~120 MB with default listpack settings
# Compared: ~800 MB with Hash-per-message approach
Listpack tuning impact:
| stream-node-max-entries | Memory (1M msgs) | XRANGE latency (p99) |
|---|---|---|
| 10 | 240 MB | 0.8 ms |
| 100 (default) | 120 MB | 1.2 ms |
| 500 | 105 MB | 4.1 ms |
Larger nodes save memory but increase per-node scan cost. 100 is a good default.
11.9 Common Pitfalls and Best Practices
Pitfall 1: Unbounded stream growth
Always set MAXLEN ~ N on XADD, or schedule XTRIM, unless you genuinely need unbounded retention. A stream that is never trimmed grows until Redis runs out of memory.
Pitfall 2: PEL accumulation
Entries owned by a dead consumer stay in the PEL indefinitely. Monitor with XPENDING and implement auto-reclaim using XAUTOCLAIM on startup and periodically during idle periods.
Pitfall 3: Using $ when you need history
Creating a consumer group with $ means it will miss all messages published before the group was created. Use 0 for full history replay.
Pitfall 4: Not handling partial XREADGROUP results
XREADGROUP ... > returns at most COUNT entries. Always loop until you get empty results before treating the queue as drained.
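A drain loop along these lines might look as follows. It is a minimal sketch, assuming a redis-py style client whose xreadgroup and xack methods behave as in the production example; `drain_new_messages` and `handler` are hypothetical names, and error handling is left out.

```python
from typing import Any, Callable, Dict


def drain_new_messages(
    client: Any,
    stream: str,
    group: str,
    consumer: str,
    handler: Callable[[str, Dict[str, str]], None],
    batch_size: int = 20,
) -> int:
    """Read with '>' until an empty reply comes back; return entries handled."""
    processed = 0
    while True:
        reply = client.xreadgroup(group, consumer, {stream: ">"}, count=batch_size)
        # reply is a list of [stream_name, [(entry_id, fields), ...]] items.
        messages = [m for _name, entries in (reply or []) for m in entries]
        if not messages:
            return processed  # only an empty reply means the queue is drained
        for entry_id, fields in messages:
            handler(entry_id, fields)
            client.xack(stream, group, entry_id)
            processed += 1
```

A full batch says nothing about whether more entries are waiting, so the loop only stops on an empty reply rather than on a short one.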
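Pitfall 5: Ignoring a high delivery_count
An entry that has been delivered many times without being acknowledged is usually a poison message. Treat a high delivery count as a dead-letter signal: copy the entry to a dead-letter stream, then acknowledge it in the original group.
pending = r.xpending_range(STREAM, GROUP, '-', '+', 100)
dead_letters = [p for p in pending if p['times_delivered'] > 5]
for dl in dead_letters:
    msg_id = dl['message_id']
    # Redis has no "move" command for streams: copy the entry, then ack it
    entries = r.xrange(STREAM, min=msg_id, max=msg_id)
    if entries:
        r.xadd('stream:dead_letters', {**entries[0][1], 'original_id': msg_id})
    r.xack(STREAM, GROUP, msg_id)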