Chapter 28: Pub/Sub and Sharded Pub/Sub — Real-Time Messaging in Redis
28.1 What Pub/Sub Is (and Isn't)
Redis Pub/Sub implements the publisher-subscriber messaging pattern: publishers send messages to named channels without knowing who (if anyone) is listening; subscribers receive messages from channels they've subscribed to without knowing who sent them.
The defining characteristic—and the key limitation—is fire-and-forget delivery. When a message is published, Redis immediately fans it out to all currently connected subscribers, then discards it. There is no message queue, no persistence, no acknowledgment. A subscriber that disconnects for even a millisecond will miss messages published during that window.
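The delivery rule can be made concrete with a minimal in-process sketch. It is plain Python with no Redis involved, and FireAndForgetBroker is a hypothetical name. publish hands the payload to whoever is subscribed at that instant and keeps nothing, so a subscriber that arrives later never sees earlier messages.

```python
from collections import defaultdict
from typing import Callable, DefaultDict, List

class FireAndForgetBroker:
    """Hypothetical toy model of Pub/Sub delivery semantics (not Redis code)."""

    def __init__(self) -> None:
        self._subs: DefaultDict[str, List[Callable[[str], None]]] = defaultdict(list)

    def subscribe(self, channel: str, callback: Callable[[str], None]) -> None:
        self._subs[channel].append(callback)

    def publish(self, channel: str, payload: str) -> int:
        # Deliver to the subscribers present right now, then forget the message.
        receivers = self._subs.get(channel, [])
        for callback in receivers:
            callback(payload)
        return len(receivers)  # like PUBLISH, report how many received it

broker = FireAndForgetBroker()
early, late = [], []
broker.subscribe("news", early.append)
broker.publish("news", "first")    # only `early` is listening
broker.subscribe("news", late.append)
broker.publish("news", "second")   # both are listening now
print(early, late)  # ['first', 'second'] ['second']
```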
Good fit for Pub/Sub:
- Real-time online notifications (user A messages user B, who is currently connected)
- Configuration change broadcasts (propagate cache invalidation across application nodes)
- Lightweight event bus between services (metrics events, audit events)
- Live leaderboard refreshes, monitoring alerts
Poor fit for Pub/Sub (use Redis Streams instead):
- Messages must survive subscriber downtime
- Processing is slow and a backlog may accumulate
- At-least-once or exactly-once delivery is required
- Message acknowledgment (ACK) is needed
28.2 Standard Pub/Sub — Internal Architecture
28.2.1 Server-Side Data Structures
Redis tracks Pub/Sub state in two global structures plus per-client bookkeeping (implemented in pubsub.c). The layout below is the classic one from older releases; newer versions keep pattern subscriptions in a dict keyed by pattern and add separate structures for sharded channels:
/* Server global state (fields of struct redisServer) */
dict *pubsub_channels;  /* channel name (robj*) -> linked list of subscriber clients */
list *pubsub_patterns;  /* linked list of pubsubPattern structs for pattern subscribers */
/* Per-client state (fields of struct client) */
dict *pubsub_channels;  /* channels this client has subscribed to (O(1) lookup) */
list *pubsub_patterns;  /* this client's pattern subscriptions */
/* Pattern subscription descriptor */
typedef struct pubsubPattern {
    client *client;     /* the subscribing client */
    robj *pattern;      /* the glob pattern, e.g. "news.*" */
} pubsubPattern;
28.2.2 SUBSCRIBE Flow
# Client A subscribes to two channels
SUBSCRIBE news sports
Internal steps:
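- Add "news" and "sports" to the clientA->pubsub_channels dict; a channel already present is skipped, so repeating SUBSCRIBE never creates a duplicate entry
- Append clientA to the server.pubsub_channels["news"] subscriber list (creating the list on first use)
- Append clientA to the server.pubsub_channels["sports"] subscriber list
- Send one subscribe confirmation per channel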
# Subscribe confirmation (RESP wire format)
*3\r\n
$9\r\nsubscribe\r\n ← message type
$4\r\nnews\r\n ← channel name
:1\r\n ← total subscriptions for this client
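The SUBSCRIBE bookkeeping can be modeled in a few lines. This is a simplified sketch, not Redis source. Client and SubscriptionState are hypothetical names: the per-client set stands in for clientA->pubsub_channels, the dict stands in for server.pubsub_channels, and the reported count covers channels only (Redis also counts pattern subscriptions).

```python
from typing import Dict, List, Set, Tuple

class Client:
    def __init__(self, name: str) -> None:
        self.name = name
        self.pubsub_channels: Set[str] = set()  # stands in for client->pubsub_channels

class SubscriptionState:
    def __init__(self) -> None:
        # Stands in for server.pubsub_channels: channel -> subscriber list
        self.pubsub_channels: Dict[str, List[Client]] = {}

    def subscribe(self, client: Client, *channels: str) -> List[Tuple[str, str, int]]:
        replies = []
        for channel in channels:
            # O(1) membership check on the client side prevents duplicate entries
            if channel not in client.pubsub_channels:
                client.pubsub_channels.add(channel)
                self.pubsub_channels.setdefault(channel, []).append(client)
            # One confirmation per channel: (type, channel, running count)
            replies.append(("subscribe", channel, len(client.pubsub_channels)))
        return replies

state = SubscriptionState()
client_a = Client("A")
print(state.subscribe(client_a, "news", "sports"))
# [('subscribe', 'news', 1), ('subscribe', 'sports', 2)]
print(state.subscribe(client_a, "news"))
# [('subscribe', 'news', 2)]
```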
28.2.3 PUBLISH Flow
PUBLISH news "Breaking: Redis 8 released"
Internal steps:
- Look up server.pubsub_channels["news"] to get its subscriber list
- Iterate the list, calling addReplyPubsubMessage on each subscriber client
- Iterate server.pubsub_patterns, glob-matching "news" against each pattern
- Send a pmessage to every matching pattern subscriber
- Return an integer: the total number of clients that received the message
# Message received by subscriber (RESP wire format)
*3\r\n
$7\r\nmessage\r\n ← message type
$4\r\nnews\r\n ← channel name
$26\r\nBreaking: Redis 8 released\r\n ← payload
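Both replies shown so far are plain RESP2 arrays, so a small encoder reproduces them byte for byte. encode_resp_array is a hypothetical helper that handles only bulk strings and integers; RESP3 connections receive these as push frames (>), which this sketch does not cover. Bulk-string lengths count bytes, not characters.

```python
from typing import List, Union

def encode_resp_array(items: List[Union[str, int]]) -> bytes:
    """Encode a flat RESP2 array whose elements are bulk strings or integers."""
    parts = [f"*{len(items)}\r\n".encode()]
    for item in items:
        if isinstance(item, int):
            parts.append(f":{item}\r\n".encode())
        else:
            data = item.encode("utf-8")
            # The $ prefix carries the byte length of the payload
            parts.append(b"$" + str(len(data)).encode() + b"\r\n" + data + b"\r\n")
    return b"".join(parts)

confirm = encode_resp_array(["subscribe", "news", 1])
message = encode_resp_array(["message", "news", "Breaking: Redis 8 released"])
print(message)
# b'*3\r\n$7\r\nmessage\r\n$4\r\nnews\r\n$26\r\nBreaking: Redis 8 released\r\n'
```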
28.2.4 Pattern Subscriptions with PSUBSCRIBE
PSUBSCRIBE news.* # Matches: news.cn, news.us, news.sports
PSUBSCRIBE user:[0-9]* # Matches: user:1000, user:9999
PSUBSCRIBE * # Matches all channels
Pattern subscribers receive a four-element message (includes the matched pattern):
*4\r\n
$8\r\npmessage\r\n ← message type (pmessage, not message)
$6\r\nnews.*\r\n ← the matched pattern
$7\r\nnews.cn\r\n ← the actual channel name
$16\r\nHello from China\r\n ← payload
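Performance note: PUBLISH checks the channel against every pattern subscription in server.pubsub_patterns, so its cost is O(N + M), where N is the number of clients subscribed to the channel and M is the total number of pattern subscriptions across all clients. Every PUBLISH pays the M term, even when no pattern matches. Avoid accumulating thousands of pattern subscriptions.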
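The O(N + M) cost shows up clearly in a toy model of PUBLISH: direct subscribers come from one dictionary lookup, but every pattern subscription is glob-tested on every publish. The sketch uses Python's fnmatch.fnmatchcase as a stand-in for Redis's own glob matcher. The two agree for *, ? and simple [...] classes, but fnmatch negates a class with [!...] where Redis uses [^...]. publish_cost is a hypothetical name.

```python
from fnmatch import fnmatchcase
from typing import Dict, List, Tuple

def publish_cost(channel: str,
                 channel_subs: Dict[str, List[str]],
                 pattern_subs: List[Tuple[str, str]]) -> Tuple[int, int]:
    """Return (receivers, glob_checks) for one PUBLISH in this toy model."""
    receivers = len(channel_subs.get(channel, []))  # N: one dict lookup
    glob_checks = 0
    for pattern, _client in pattern_subs:           # M: every pattern, every time
        glob_checks += 1
        if fnmatchcase(channel, pattern):
            receivers += 1                          # would receive a pmessage
    return receivers, glob_checks

channel_subs = {"news.cn": ["c1", "c2"]}
pattern_subs = [("news.*", "c3"), ("user:[0-9]*", "c4"), ("*", "c5")]
print(publish_cost("news.cn", channel_subs, pattern_subs))  # (4, 3)
print(publish_cost("weather", channel_subs, pattern_subs))  # (1, 3)
```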
28.2.5 Connection State Restrictions
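In subscription mode (RESP2 connections), a client can only issue:
- SUBSCRIBE / UNSUBSCRIBE
- PSUBSCRIBE / PUNSUBSCRIBE
- SSUBSCRIBE / SUNSUBSCRIBE (Redis 7+)
- PING (for keepalive)
- QUIT
- RESET (Redis 6.2+, exits subscription mode)
All other commands are rejected. In Redis 7 the error reads: ERR Can't execute '<command>': only (P|S)SUBSCRIBE / (P|S)UNSUBSCRIBE / PING / QUIT / RESET are allowed in this context
A connection that has switched to RESP3 (HELLO 3) is not restricted and may mix regular commands with its subscriptions.
Implication: a subscribed RESP2 connection cannot be reused for ordinary commands. Production systems need a dedicated Pub/Sub connection pool, separate from the command connection pool.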
28.3 Keyspace Notifications
28.3.1 Configuration
Keyspace notifications are Pub/Sub applied internally: Redis publishes events about its own key operations to special channels, allowing applications to subscribe to database events.
# redis.conf
notify-keyspace-events "" # Disabled (default)
notify-keyspace-events KEA # Enable all events
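# Configuration flags
# K = Keyspace events (__keyspace@<db>__:<key>)
# E = Keyevent events (__keyevent@<db>__:<event>)
#     At least one of K or E must be set, or no events are delivered
# g = Generic commands (DEL, EXPIRE, RENAME...)
# $ = String commands
# l = List commands
# s = Set commands
# h = Hash commands
# z = ZSet commands
# t = Stream commands
# d = Module key type events
# x = Expired events (when a key expires)
# e = Evicted events (when a key is evicted by maxmemory)
# m = Key-miss events (not included in A)
# n = New key events (not included in A)
# A = Alias for g$lshztxed (all of the above except m and n)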
# Enable at runtime — no restart required
CONFIG SET notify-keyspace-events KEA
# Subscribe to all key expiration events in DB 0
SUBSCRIBE __keyevent@0__:expired
# Subscribe to all operations on a specific key
SUBSCRIBE __keyspace@0__:user:1000
# Subscribe to all SET commands in DB 0
SUBSCRIBE __keyevent@0__:set
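The flag string is easy to misread, so a small parser helps when reviewing configs. This is a sketch based on the flag list above, not Redis's own parser. parse_notify_flags and delivers_events are hypothetical names, and some flags (such as m and n) only exist in newer Redis releases.

```python
EVENT_CLASSES = set("g$lshztxedmn")   # per-class flags from the list above
A_ALIAS = set("g$lshztxed")           # what 'A' expands to (excludes m and n)

def parse_notify_flags(spec: str) -> set:
    """Expand a notify-keyspace-events string into individual flags."""
    flags = set()
    for ch in spec:
        if ch == "A":
            flags |= A_ALIAS
        elif ch in ("K", "E") or ch in EVENT_CLASSES:
            flags.add(ch)
        else:
            raise ValueError(f"unknown notify-keyspace-events flag: {ch!r}")
    return flags

def delivers_events(spec: str) -> bool:
    """True if the setting can publish anything at all."""
    flags = parse_notify_flags(spec)
    # Needs a channel type (K and/or E) and at least one event class.
    return bool(flags & {"K", "E"}) and bool(flags & EVENT_CLASSES)

print(sorted(parse_notify_flags("Kx")))  # ['K', 'x']
print(delivers_events("A"))              # False: neither K nor E is set
print(delivers_events("KEA"))            # True
```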
28.3.2 Keyspace vs. Keyevent Channels
| Channel type | Format | Message content | Use case |
|---|---|---|---|
| keyspace | __keyspace@<db>__:<key> | Operation name ("set", "expired") | Track all changes to a specific key |
| keyevent | __keyevent@<db>__:<event> | Key name | Track all keys affected by a specific event |
# Scenario: key "order:12345" expires
# Keyspace notification (who's watching this key?)
# Channel: __keyspace@0__:order:12345
# Message: "expired"
# Keyevent notification (who's watching expiration events?)
# Channel: __keyevent@0__:expired
# Message: "order:12345"
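The two channel families are mirror images, which a short sketch makes explicit. notification_messages and parse_channel are hypothetical helpers: the first lists the (channel, message) pairs implied by the table above for one event, and the second splits a received channel name back into its parts. That is handy in a handler subscribed with a pattern such as __keyspace@0__:session:*.

```python
import re
from typing import List, Tuple

def notification_messages(db: int, key: str, event: str,
                          keyspace: bool, keyevent: bool) -> List[Tuple[str, str]]:
    """(channel, message) pairs for one event, following the table above."""
    pairs = []
    if keyspace:  # K: channel names the key, message names the event
        pairs.append((f"__keyspace@{db}__:{key}", event))
    if keyevent:  # E: channel names the event, message names the key
        pairs.append((f"__keyevent@{db}__:{event}", key))
    return pairs

_CHANNEL = re.compile(r"__(keyspace|keyevent)@(\d+)__:(.*)", re.DOTALL)

def parse_channel(channel: str) -> Tuple[str, int, str]:
    """Split a notification channel into (kind, db, key or event name)."""
    match = _CHANNEL.fullmatch(channel)
    if match is None:
        raise ValueError(f"not a keyspace/keyevent channel: {channel!r}")
    return match.group(1), int(match.group(2)), match.group(3)

print(notification_messages(0, "order:12345", "expired", True, True))
# [('__keyspace@0__:order:12345', 'expired'), ('__keyevent@0__:expired', 'order:12345')]
print(parse_channel("__keyspace@0__:order:12345"))
# ('keyspace', 0, 'order:12345')
```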
28.3.3 Performance Impact
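Keyspace notifications carry a measurable CPU cost:
- Every write command calls notifyKeyspaceEvent(), which returns immediately if that event class is disabled
- For each enabled class, Redis builds the channel name(s) and publishes through the normal Pub/Sub path: a lookup in server.pubsub_channels plus a check against every pattern subscription
- That work happens even if no one is subscribed
Measured impact: enabling all events (notify-keyspace-events KEA) reduces throughput by 15–25% in write-heavy workloads.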
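Recommendation: Enable only the event types your application actually needs. For session expiration detection, Kx (keyspace + expired only) is sufficient when you watch per-key channels such as __keyspace@0__:session:*; use Ex instead if you subscribe to the single __keyevent@0__:expired channel. Expired events fire when Redis actually deletes the key (on access or during active expiry), which can lag behind the TTL deadline: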
CONFIG SET notify-keyspace-events Kx
28.4 Pub/Sub in Redis Cluster — The Broadcast Problem
28.4.1 How Standard PUBLISH Behaves in Cluster
In Redis Cluster, PUBLISH broadcasts to all nodes:
Redis Cluster with 3 primary nodes
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Node A │ │ Node B │ │ Node C │
│ Slots 0-5460 │ │Slots 5461- │ │Slots 10923- │
│ │ │ 10922 │ │ 16383 │
└──────────────┘ └──────────────┘ └──────────────┘
↑
PUBLISH "news" "hello"
↓ broadcasts to all nodes
Node A → delivers to its local "news" subscribers
Node B → delivers to its local "news" subscribers
Node C → delivers to its local "news" subscribers
Problem: every PUBLISH is forwarded over the cluster bus to every other node, replicas included, whether or not those nodes have subscribers. In a 12-node cluster each published message is copied to 11 other nodes, so internal traffic grows linearly with cluster size. This doesn't scale.
Root cause: Standard Pub/Sub channels are not slot-addressed, so Redis cannot tell which node holds the subscribers and must broadcast every message cluster-wide.
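A back-of-the-envelope count makes the scaling problem concrete. The sketch assumes that a standard PUBLISH is forwarded from the receiving node to every other node, and that a sharded SPUBLISH (see 28.5) is forwarded only from the owning primary to its own replicas. bus_messages is a hypothetical helper; it counts cluster-bus copies only, not deliveries to subscribers.

```python
def bus_messages(msgs_per_sec: int, primaries: int, replicas_per_primary: int) -> dict:
    """Cluster-bus copies per second for PUBLISH vs. SPUBLISH (rough model)."""
    total_nodes = primaries * (1 + replicas_per_primary)
    return {
        # PUBLISH: the receiving node forwards to every other node in the cluster
        "publish": msgs_per_sec * (total_nodes - 1),
        # SPUBLISH: the owning primary forwards only to its own replicas
        "spublish": msgs_per_sec * replicas_per_primary,
    }

# 6 primaries with 1 replica each = 12 nodes
print(bus_messages(1000, primaries=6, replicas_per_primary=1))
# {'publish': 11000, 'spublish': 1000}
```

The gap widens as the cluster grows: adding shards raises the PUBLISH figure but leaves the SPUBLISH figure unchanged.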
28.5 Redis 7 Sharded Pub/Sub
28.5.1 Core Idea
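Redis 7.0 (April 2022) introduced Sharded Pub/Sub with a clean solution: treat channels as slot-addressed resources. A channel's slot is computed with the same CRC16(channel) % 16384 function used for keys, hash tags included. SPUBLISH routes the message only to the shard responsible for that slot: the owning primary delivers it locally and forwards it to its own replicas. There is no cluster-wide broadcast.
Network cost per message drops from O(N_nodes) to O(1) with respect to cluster size; it depends only on the number of replicas in the owning shard.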
28.5.2 New Command Set
# Subscribe to a sharded channel
# Client must be connected to a node (primary or replica) of the shard that owns the channel's slot; all channels in one SSUBSCRIBE call must hash to the same slot
SSUBSCRIBE {user:1}:events
# Unsubscribe from a sharded channel
SUNSUBSCRIBE {user:1}:events
# Publish to a sharded channel
# Client library routes to the correct node automatically
SPUBLISH {user:1}:events "login"
# Inspect sharded Pub/Sub state
PUBSUB SHARDCHANNELS # Active sharded channels on this node
PUBSUB SHARDCHANNELS {user:* # Filtered by glob pattern (matches {user:1}:events)
PUBSUB SHARDNUMSUB {user:1}:events {user:2}:events # Subscriber counts
28.5.3 How the Routing Works
Step-by-step example: SPUBLISH {user:1}:events "login"
1. Compute slot: CRC16("user:1") % 16384 = 2222 (example value; with a hash tag, only the text inside {} is hashed)
2. Cluster map: slot 2222 → Node A
3. Client library sends SPUBLISH to Node A directly
4. Node A looks up local subscribers to {user:1}:events
5. Node A delivers the message to all local subscribers and forwards it to its own replicas, which deliver it to subscribers connected there
6. Done. No messages are sent to Node B or Node C.
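The slot computation in step 1 is small enough to sketch directly. The code follows the Redis Cluster specification: the CRC16 XMODEM variant, reduced to 16384 slots, with the first non-empty {...} hash tag taking precedence. key_slot, owner_of and the RANGES table are hypothetical; the ranges are copied from the three-node diagram in 28.4.1. Real clients learn slot ownership from CLUSTER SHARDS or CLUSTER SLOTS rather than a hard-coded table.

```python
from bisect import bisect_right

def crc16_xmodem(data: bytes) -> int:
    """CRC16 (XMODEM: poly 0x1021, init 0), the variant Redis Cluster uses."""
    crc = 0
    for byte in data:
        crc ^= byte << 8
        for _ in range(8):
            crc = ((crc << 1) ^ 0x1021) if crc & 0x8000 else (crc << 1)
            crc &= 0xFFFF
    return crc

def key_slot(name: str) -> int:
    """Slot for a key or shard channel, honoring the first non-empty {...} tag."""
    data = name.encode("utf-8")
    start = data.find(b"{")
    if start != -1:
        end = data.find(b"}", start + 1)
        if end > start + 1:          # "{}" (empty tag) hashes the whole name
            data = data[start + 1:end]
    return crc16_xmodem(data) % 16384

# Slot ranges from the three-node diagram in 28.4.1 (hypothetical cluster)
RANGES = [(0, "Node A"), (5461, "Node B"), (10923, "Node C")]

def owner_of(slot: int) -> str:
    starts = [start for start, _ in RANGES]
    return RANGES[bisect_right(starts, slot) - 1][1]

slot = key_slot("{user:1}:events")
print(slot == key_slot("{user:1}:chat") == key_slot("user:1"))  # True
print(owner_of(slot))
```

Because only "user:1" is hashed, every {user:1}:* channel lands on the same slot and therefore the same node, which is what 28.5.4 relies on.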
28.5.4 Hash Tags Are Essential
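Hash tags decide which channels share a slot, and they matter whenever related channels should live together: a single SSUBSCRIBE call may only name channels in the same slot, and co-located channels can be served over one connection to one node: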
# Good: all channels for user:1 use the same hash tag
# They will all be on the same slot / same node
SSUBSCRIBE {user:1}:chat
SSUBSCRIBE {user:1}:notifications
SSUBSCRIBE {user:1}:presence
# Good: all channels for room:100 co-located
SSUBSCRIBE {room:100}:messages
SSUBSCRIBE {room:100}:system-events
# Bad: no hash tag — unrelated channels could be on any node
SSUBSCRIBE user_1_events # slot comes from the full name, unrelated to user 1's other channels
Different users (e.g., {user:1} and {user:2}) will intentionally land on different slots—that's correct design. Messages for different users are processed on different nodes, providing natural isolation and load distribution.
28.5.5 Client Library Support
# redis-py (a release with sharded Pub/Sub support): sharded pub/sub in cluster mode
from redis.cluster import ClusterNode, RedisCluster
rc = RedisCluster(
    startup_nodes=[ClusterNode("redis-node1", 6379)],
    decode_responses=True
)
# Handler for incoming messages ("smessage" is the sharded message type)
def message_handler(message):
    if message["type"] == "smessage":
        print(f"User 1 event: {message['data']}")
# Subscribe with the handler attached, then receive messages in a thread
pubsub = rc.pubsub()
pubsub.ssubscribe(**{"{user:1}:events": message_handler})
thread = pubsub.run_in_thread(sleep_time=0.001, daemon=True)
# Publish (routed to the node that owns the channel's slot)
rc.spublish("{user:1}:events", "user_logged_in")
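// go-redis v9 — cluster sharded pub/sub
package main
import (
	"context"
	"fmt"
	"time"
	"github.com/redis/go-redis/v9"
)
func main() {
	rdb := redis.NewClusterClient(&redis.ClusterOptions{
		Addrs: []string{"node1:6379", "node2:6379", "node3:6379"},
	})
	defer rdb.Close()
	ctx := context.Background()
	// Subscribe (the client sends SSUBSCRIBE to the shard owning the slot)
	pubsub := rdb.SSubscribe(ctx, "{user:1}:events")
	defer pubsub.Close()
	go func() {
		for msg := range pubsub.Channel() {
			fmt.Printf("Received: %s\n", msg.Payload)
		}
	}()
	// Publish
	if err := rdb.SPublish(ctx, "{user:1}:events", "user_logged_in").Err(); err != nil {
		panic(err)
	}
	// Keep the demo alive long enough for the goroutine to print
	time.Sleep(time.Second)
}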
28.6 Pub/Sub vs. Streams — Choosing the Right Tool
28.6.1 Feature Matrix
| Feature | Pub/Sub | Sharded Pub/Sub | Redis Stream |
|---|---|---|---|
| Message persistence | No | No | Yes |
| Offline message delivery | Lost | Lost | Retained (consumer groups) |
| Message ACK | No | No | Yes (XACK) |
| Consumer groups | No | No | Yes (XGROUP) |
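| Cluster broadcast | All nodes | One shard (by slot) | One shard (by slot) |
| Backpressure handling | None (slow subscribers are disconnected via client-output-buffer-limit) | None (same output-buffer limit) | MAXLEN / MINID trimming |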
| Latency | < 1ms | < 1ms | 1–5ms |
| Best for | Real-time notifications | Large-scale real-time | Reliable message queue |
28.6.2 Decision Framework
Is at-least-once delivery required?
├── Yes → Redis Streams with Consumer Groups
└── No (real-time, best-effort)
├── Running in Redis Cluster?
│ ├── Yes → Sharded Pub/Sub (SSUBSCRIBE/SPUBLISH)
│ └── No → Standard Pub/Sub
└── High number of pattern subscriptions (> 100)?
→ Be careful: PSUBSCRIBE at scale makes PUBLISH O(N_patterns)
→ Consider converting patterns to explicit channel subscriptions
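The decision tree translates directly into a function, which can serve as executable documentation in design reviews. choose_messaging is a hypothetical helper that encodes only the branches above. One constraint the tree leaves implicit: Sharded Pub/Sub has no pattern variant (there is no sharded counterpart to PSUBSCRIBE), so the pattern warning mainly concerns standard Pub/Sub.

```python
def choose_messaging(need_at_least_once: bool, in_cluster: bool,
                     pattern_subscriptions: int = 0) -> str:
    """Encode the decision tree above (hypothetical helper)."""
    if need_at_least_once:
        return "Redis Streams with consumer groups"
    choice = "Sharded Pub/Sub (SSUBSCRIBE/SPUBLISH)" if in_cluster else "Standard Pub/Sub"
    if pattern_subscriptions > 100:
        # Every PUBLISH glob-tests every pattern subscription
        choice += "; review PSUBSCRIBE usage, consider explicit channels"
    return choice

print(choose_messaging(need_at_least_once=False, in_cluster=True))
# Sharded Pub/Sub (SSUBSCRIBE/SPUBLISH)
```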
28.7 Production Operations Guide
28.7.1 Connection Architecture
# Production pattern: dedicated Pub/Sub connections
import redis
from typing import Callable, Dict
class PubSubManager:
    """Manages dedicated Pub/Sub connections separate from command connections."""
    def __init__(self, redis_url: str):
        # Command client (for PUBLISH and ordinary commands)
        self._cmd = redis.from_url(redis_url)
        # PubSub object: checks out its own dedicated connection on first subscribe
        self._pubsub = self._cmd.pubsub()
        self._listener_thread = None
        self._handlers: Dict[str, Callable] = {}
    def subscribe(self, channel: str, handler: Callable):
        self._handlers[channel] = handler
        self._pubsub.subscribe(**{channel: self._dispatch})
        if not self._listener_thread:
            # run_in_thread requires every subscription to have a handler
            self._listener_thread = self._pubsub.run_in_thread(
                sleep_time=0.001,
                daemon=True
            )
    def _dispatch(self, message):
        if message["type"] in ("message", "pmessage"):
            channel = message["channel"]
            if isinstance(channel, bytes):
                channel = channel.decode()
            handler = self._handlers.get(channel)
            if handler:
                handler(message["data"])
    def publish(self, channel: str, data: str) -> int:
        return self._cmd.publish(channel, data)
    def unsubscribe(self, *channels):
        self._pubsub.unsubscribe(*channels)
        for ch in channels:
            self._handlers.pop(ch, None)
    def close(self):
        if self._listener_thread:
            self._listener_thread.stop()
            self._listener_thread.join(timeout=1.0)
        self._pubsub.close()
        self._cmd.close()
28.7.2 Keepalive Strategy
Pub/Sub connections are long-lived and at risk of silent termination by NAT devices, load balancers, or firewalls that time out idle connections:
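# Server-side TCP keepalive (redis.conf; default is 300)
tcp-keepalive 60 # Send TCP keepalive probes on idle client connections about every 60 seconds
# Client-side PING: redis-py sends a PING on an idle Pub/Sub connection when the
# client is created with health_check_interval (seconds), e.g.
# r = redis.from_url(url, health_check_interval=30)
# For a manual ping:
pubsub.ping()
# The PONG arrives through get_message() / the listener as a message of type 'pong'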
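# Reconnection handling (builds on PubSubManager from 28.7.1)
class ResilientPubSub(PubSubManager):
    def reconnect(self):
        # Stop the listener thread bound to the old connection
        if self._listener_thread:
            self._listener_thread.stop()
            self._listener_thread = None
        try:
            self._pubsub.close()
        except Exception:
            pass  # the old connection may already be broken
        self._pubsub = self._cmd.pubsub()
        # Re-subscribe to all previous channels, then restart the listener
        for channel in self._handlers:
            self._pubsub.subscribe(**{channel: self._dispatch})
        if self._handlers:
            self._listener_thread = self._pubsub.run_in_thread(
                sleep_time=0.001, daemon=True
            )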
28.7.3 Monitoring Pub/Sub Health
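# Which channels have at least one subscriber? (pattern subscriptions not included)
# In Redis Cluster, PUBSUB reports only the state of the node you ask
PUBSUB CHANNELS
# Returns list of channel names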
# Filter by pattern
PUBSUB CHANNELS user:*
# Subscriber count per channel
PUBSUB NUMSUB news sports chat
# Returns: news 45, sports 12, chat 8
# Number of active pattern subscriptions
PUBSUB NUMPAT
# Returns: 3
# Sharded pub/sub monitoring (Redis 7+)
PUBSUB SHARDCHANNELS
PUBSUB SHARDNUMSUB {user:1}:events {user:2}:events
# INFO stats
redis-cli INFO stats | grep pubsub
# pubsub_channels:5
# pubsub_patterns:2
# pubsubshard_channels:10 (Redis 7+)
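When INFO output arrives as raw text (for example captured from redis-cli), a few lines of parsing turn it into numbers a monitoring job can alert on. parse_pubsub_info is a hypothetical helper and the sample text is made up for illustration. With redis-py, r.info("stats") already returns a parsed dict, so this is only needed for raw output.

```python
def parse_pubsub_info(info_text: str) -> dict:
    """Collect pubsub_* / pubsubshard_* counters from raw INFO text."""
    counters = {}
    for raw in info_text.splitlines():
        line = raw.strip()
        # Skip blank lines and section headers such as "# Stats"
        if not line or line.startswith("#") or ":" not in line:
            continue
        name, _, value = line.partition(":")
        if name.startswith("pubsub"):
            counters[name] = int(value)
    return counters

sample = ("# Stats\r\ntotal_commands_processed:1024\r\n"
          "pubsub_channels:5\r\npubsub_patterns:2\r\npubsubshard_channels:10\r\n")
print(parse_pubsub_info(sample))
# {'pubsub_channels': 5, 'pubsub_patterns': 2, 'pubsubshard_channels': 10}
```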
28.7.4 Message Serialization
Pub/Sub payloads are byte strings. Choose a serialization format appropriate to your latency and debuggability needs:
import json
import msgpack  # third-party package: pip install msgpack
import redis
from dataclasses import dataclass, asdict
r = redis.Redis(host="localhost", port=6379)
@dataclass
class UserEvent:
    event_type: str
    user_id: int
    timestamp: float
# JSON — human readable, larger payload
event = UserEvent("login", 1000, 1716000000.0)
r.publish("user:events", json.dumps(asdict(event)))
# MessagePack — compact binary, smaller than JSON (the saving depends on the payload)
r.publish("user:events", msgpack.packb(asdict(event), use_bin_type=True))
# For simple string events (minimal overhead)
r.publish(f"user:{event.user_id}:events", f"login:{event.timestamp}")
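On the receiving side the payload has to be turned back into an event. This is a minimal sketch for the JSON variant, assuming the UserEvent dataclass shown above (redefined here so the block runs on its own). encode_user_event and decode_user_event are hypothetical helpers. Decoding accepts both bytes and str because redis-py returns bytes unless the client was created with decode_responses=True.

```python
import json
from dataclasses import asdict, dataclass
from typing import Union

@dataclass
class UserEvent:
    event_type: str
    user_id: int
    timestamp: float

def encode_user_event(event: UserEvent) -> str:
    return json.dumps(asdict(event))

def decode_user_event(payload: Union[bytes, str]) -> UserEvent:
    if isinstance(payload, bytes):
        payload = payload.decode("utf-8")
    # Unknown or missing fields raise TypeError instead of passing silently
    return UserEvent(**json.loads(payload))

wire = encode_user_event(UserEvent("login", 1000, 1716000000.0))
print(decode_user_event(wire.encode()))
# UserEvent(event_type='login', user_id=1000, timestamp=1716000000.0)
```

Failing loudly on unexpected fields is a deliberate choice here: a publisher that changes its schema is caught at the first message rather than producing half-filled events.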
28.8 Summary
Redis Pub/Sub delivers real-time, best-effort messaging with extremely low latency. Standard Pub/Sub excels in standalone and master-replica deployments. In Redis Cluster, standard PUBLISH suffers from O(N_nodes) broadcast cost—Redis 7's Sharded Pub/Sub solves this by routing channels to specific slots, reducing cluster communication to O(1).
Key takeaways:
- Pub/Sub is fire-and-forget; use Streams when you need durability or ACK
- Keyspace notifications are Pub/Sub under the hood; enable selectively to avoid CPU overhead
- In Cluster, prefer SSUBSCRIBE/SPUBLISH over SUBSCRIBE/PUBLISH for efficiency
- Hash tags ({...}) ensure related channels co-locate on the same node
- Always maintain a dedicated subscriber connection, separate from the command connection
- Implement PING-based keepalive and reconnection logic for production robustness