Chapter 28

Pub/Sub and Sharded Pub/Sub

Real-Time Messaging in Redis

28.1 What Pub/Sub Is (and Isn't)

Redis Pub/Sub implements the publisher-subscriber messaging pattern: publishers send messages to named channels without knowing who (if anyone) is listening; subscribers receive messages from channels they've subscribed to without knowing who sent them.

The defining characteristic, and the key limitation, is fire-and-forget delivery. When a message is published, Redis immediately fans it out to all currently connected subscribers, then discards it. There is no message queue, no persistence, no acknowledgment. A subscriber that disconnects for even a millisecond will miss messages published during that window.

Good fit for Pub/Sub:

Poor fit for Pub/Sub (use Redis Streams instead):


28.2 Standard Pub/Sub โ€” Internal Architecture

28.2.1 Server-Side Data Structures

Redis's Pub/Sub code (pubsub.c) works with two server-wide structures plus per-client tracking. The declarations below are a simplified view; the exact container types have changed across Redis versions.

/* Server global state (fields of struct redisServer) */
dict *pubsub_channels;
/* Maps: channel name (robj*) → linked list of subscriber clients */

list *pubsub_patterns;
/* Linked list of pubsubPattern structs for pattern subscribers */

/* Per-client state (fields of struct client) */
dict *pubsub_channels;
/* Channels this client has subscribed to (O(1) lookup) */

list *pubsub_patterns;
/* Pattern subscriptions for this client */

/* Pattern subscription descriptor */
typedef struct pubsubPattern {
    client *client;   /* The subscribing client */
    robj *pattern;    /* The glob pattern (e.g., "news.*") */
} pubsubPattern;
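The two-way bookkeeping can be modeled in a few lines of Python. This is a sketch under simplifying assumptions, not Redis's C implementation: sets stand in for the dict and list types, clients are plain string IDs, pattern subscriptions are left out, and the class and method names (PubSubIndex, subscribe, unsubscribe, disconnect) are hypothetical. It shows why the per-client index matters: when a client disconnects, its own set names every channel to clean up, so nothing has to scan all channels on the server.

```python
from collections import defaultdict


class PubSubIndex:
    """Toy model of the two-way Pub/Sub index (hypothetical names, not Redis's API)."""

    def __init__(self):
        # Server side: channel -> set of subscribed client IDs (like server.pubsub_channels)
        self.channels = defaultdict(set)
        # Client side: client ID -> set of its channels (like client->pubsub_channels)
        self.client_channels = defaultdict(set)

    def subscribe(self, client, *channels):
        """Return the client's subscription count after each channel (patterns ignored)."""
        counts = []
        for ch in channels:
            if ch not in self.client_channels[client]:  # re-subscribing is a no-op
                self.client_channels[client].add(ch)
                self.channels[ch].add(client)
            counts.append(len(self.client_channels[client]))
        return counts

    def unsubscribe(self, client, channel):
        self.client_channels[client].discard(channel)
        subscribers = self.channels.get(channel)
        if subscribers is not None:
            subscribers.discard(client)
            if not subscribers:  # empty channels are removed from the server index
                del self.channels[channel]

    def disconnect(self, client):
        # The per-client index names exactly the channels to clean up,
        # so no scan over every channel on the server is needed.
        for ch in list(self.client_channels.get(client, ())):
            self.unsubscribe(client, ch)
        self.client_channels.pop(client, None)


idx = PubSubIndex()
print(idx.subscribe("clientA", "news", "sports"))  # [1, 2]
idx.subscribe("clientB", "news")
idx.disconnect("clientA")
print(sorted(idx.channels))  # ['news']
```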

28.2.2 SUBSCRIBE Flow

# Client A subscribes to two channels
SUBSCRIBE news sports

Internal steps:

  1. Append clientA to server.pubsub_channels["news"] subscriber list
  2. Append clientA to server.pubsub_channels["sports"] subscriber list
  3. Add "news" and "sports" to clientA->pubsub_channels dict
  4. Send a subscribe confirmation response per channel
# Subscribe confirmation (RESP2 wire format), one frame per channel
*3\r\n
$9\r\nsubscribe\r\n    ← message type
$4\r\nnews\r\n         ← channel name
:1\r\n                 ← total subscriptions for this client
# A second frame follows for "sports", ending in :2
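The confirmation frame is ordinary RESP2, so a few lines of Python can reproduce it byte for byte. The helper names encode_bulk and encode_array are hypothetical, and the sketch covers only the RESP2 types used here (arrays, bulk strings, integers); RESP3 clients receive these frames as push messages instead. One detail worth seeing in code: the $ prefix is a byte count, not a character count.

```python
# Minimal RESP2 encoder sketch (hypothetical helper names)


def encode_bulk(value) -> bytes:
    # RESP2 bulk string: "$", byte length, CRLF, raw bytes, CRLF
    data = value.encode("utf-8") if isinstance(value, str) else value
    return b"$%d\r\n%s\r\n" % (len(data), data)


def encode_array(*items) -> bytes:
    # RESP2 array header, then each element (ints as RESP integers, the rest as bulk strings)
    parts = [b"*%d\r\n" % len(items)]
    for item in items:
        if isinstance(item, int):
            parts.append(b":%d\r\n" % item)
        else:
            parts.append(encode_bulk(item))
    return b"".join(parts)


# SUBSCRIBE news sports produces one confirmation frame per channel
print(encode_array("subscribe", "news", 1))  # b'*3\r\n$9\r\nsubscribe\r\n$4\r\nnews\r\n:1\r\n'
print(encode_array("subscribe", "sports", 2))
# The length prefix counts bytes, so non-ASCII payloads are longer than their character count
print(encode_bulk("café"))  # b'$5\r\ncaf\xc3\xa9\r\n'
```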

28.2.3 PUBLISH Flow

PUBLISH news "Breaking: Redis 8 released"

Internal steps:

  1. Look up server.pubsub_channels["news"] → subscriber list
  2. Iterate the list, calling addReplyPubsubMessage on each subscriber client
  3. Iterate server.pubsub_patterns, glob-match against "news" for each pattern
  4. Send pmessage to any matching pattern subscribers
  5. Return integer: total number of clients that received the message
# Message received by subscriber (RESP2 wire format)
*3\r\n
$7\r\nmessage\r\n                         ← message type
$4\r\nnews\r\n                            ← channel name
$26\r\nBreaking: Redis 8 released\r\n    ← payload (26 bytes)
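The PUBLISH steps can be modeled as a toy function. It is a sketch, not Redis's code: publish and its parameters are hypothetical, the deliver callback stands in for addReplyPubsubMessage, and Python's fnmatch.fnmatchcase approximates Redis's glob matcher (the two differ in details such as [^...] versus [!...] negation). Note that a client subscribed both to the channel and to a matching pattern receives the message twice and is counted twice.

```python
from fnmatch import fnmatchcase


def publish(channel, payload, channel_subs, pattern_subs, deliver):
    """Toy model of PUBLISH fan-out (hypothetical); returns the delivery count.

    channel_subs: dict of channel -> list of client IDs (SUBSCRIBE)
    pattern_subs: list of (client ID, glob pattern) pairs (PSUBSCRIBE)
    deliver:      callback(client, frame), standing in for addReplyPubsubMessage
    """
    receivers = 0
    # Steps 1-2: one dict lookup, then one delivery per exact subscriber
    for client in channel_subs.get(channel, []):
        deliver(client, ("message", channel, payload))
        receivers += 1
    # Steps 3-4: every pattern is tested, whether or not it matches
    for client, pattern in pattern_subs:
        if fnmatchcase(channel, pattern):  # approximation of Redis's glob rules
            deliver(client, ("pmessage", pattern, channel, payload))
            receivers += 1
    return receivers


log = []
count = publish(
    "news",
    "Breaking: Redis 8 released",
    {"news": ["A", "B"]},
    [("C", "news*"), ("A", "n?ws"), ("D", "sports.*")],
    lambda client, frame: log.append((client, frame[0])),
)
print(count)  # 4: A and B by channel, then C and A again by pattern
print(log)
```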

28.2.4 Pattern Subscriptions with PSUBSCRIBE

PSUBSCRIBE news.*          # Matches: news.cn, news.us, news.sports
PSUBSCRIBE user:[0-9]*     # Matches: user:1000, user:9999 (also user:1abc: one digit, then anything)
PSUBSCRIBE *               # Matches all channels

Pattern subscribers receive a four-element message (includes the matched pattern):

*4\r\n
$8\r\npmessage\r\n         ← message type (pmessage, not message)
$6\r\nnews.*\r\n           ← the matched pattern
$7\r\nnews.cn\r\n          ← the actual channel name
$16\r\nHello from China\r\n ← payload (16 bytes)

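Performance note: PUBLISH costs O(N + M), where N is the number of clients subscribed to the target channel and M is the total number of pattern subscriptions on the server. Every PUBLISH glob-matches every subscribed pattern against the channel name, whether or not any of them match, so thousands of pattern subscriptions slow down every publish. Avoid subscribing thousands of patterns.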
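Redis matches patterns with its own glob implementation (stringmatchlen in util.c), run once per pattern on every PUBLISH. The following simplified Python matcher is a hypothetical re-implementation of the common rules (*, ?, [...] classes with ^ negation and ranges, backslash escapes), not a port of the C code; edge cases such as unterminated brackets may behave differently. It also shows why user:[0-9]* is looser than it looks: the class checks one digit, and * accepts anything after it.

```python
# Simplified Redis-style glob matcher (hypothetical sketch, not a port of util.c)


def glob_match(pattern: str, text: str) -> bool:
    """Match *, ?, [abc], [^abc], [a-z] and backslash escapes."""
    p = i = 0
    star_p, star_i = -1, 0  # position of the last '*' and the text index it started at
    while i < len(text):
        if p < len(pattern) and pattern[p] == "*":
            star_p, star_i = p, i  # remember where to resume if a later token fails
            p += 1
            continue
        if p < len(pattern):
            ok, next_p = _match_token(pattern, p, text[i])
            if ok:
                p, i = next_p, i + 1
                continue
        if star_p >= 0:  # backtrack: let the last '*' absorb one more character
            star_i += 1
            p, i = star_p + 1, star_i
            continue
        return False
    while p < len(pattern) and pattern[p] == "*":  # trailing stars match the empty string
        p += 1
    return p == len(pattern)


def _match_token(pattern: str, p: int, ch: str):
    """Match one character against the token at pattern[p]; return (matched, next index)."""
    c = pattern[p]
    if c == "?":
        return True, p + 1
    if c == "\\" and p + 1 < len(pattern):
        return pattern[p + 1] == ch, p + 2
    if c == "[":
        j = p + 1
        negate = j < len(pattern) and pattern[j] == "^"
        if negate:
            j += 1
        found = False
        while j < len(pattern) and pattern[j] != "]":
            if pattern[j] == "\\" and j + 1 < len(pattern):
                found |= pattern[j + 1] == ch
                j += 2
            elif j + 2 < len(pattern) and pattern[j + 1] == "-" and pattern[j + 2] != "]":
                lo, hi = sorted((pattern[j], pattern[j + 2]))  # reversed ranges are swapped
                found |= lo <= ch <= hi
                j += 3
            else:
                found |= pattern[j] == ch
                j += 1
        return found != negate, min(j + 1, len(pattern))
    return c == ch, p + 1


for channel in ("user:1000", "user:1abc", "user:abc"):
    print(channel, glob_match("user:[0-9]*", channel))
```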

28.2.5 Connection State Restrictions

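In subscription mode (RESP2), a client can only issue SUBSCRIBE, SSUBSCRIBE, PSUBSCRIBE, UNSUBSCRIBE, SUNSUBSCRIBE, PUNSUBSCRIBE, PING, RESET, and QUIT.

Any other command is rejected with an error along the lines of: ERR Can't execute 'get': only (P|S)SUBSCRIBE / (P|S)UNSUBSCRIBE / PING / QUIT / RESET are allowed in this context (the exact wording varies by version). A RESP3 connection (opened with HELLO 3) does not have this restriction.

Implication: a subscribed RESP2 connection cannot be reused for ordinary commands. Production systems typically keep a dedicated Pub/Sub connection pool, separate from the command connection pool.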


28.3 Keyspace Notifications

28.3.1 Configuration

Keyspace notifications are Pub/Sub applied internally: Redis publishes events about its own key operations to special channels, allowing applications to subscribe to database events.

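# redis.conf (comments go on their own line; pick one setting)
# Disabled (default):
notify-keyspace-events ""
# All event classes except key-miss (m) and new-key (n):
notify-keyspace-events KEA

# Configuration flags (case-sensitive)
# K = Keyspace events (__keyspace@<db>__:<key>)
# E = Keyevent events (__keyevent@<db>__:<event>)
# g = Generic commands (DEL, EXPIRE, RENAME...)
# $ = String commands
# l = List commands
# s = Set commands
# h = Hash commands
# z = Sorted set (ZSet) commands
# t = Stream commands
# d = Module key type events
# x = Expired events (when a key expires)
# e = Evicted events (when a key is evicted under maxmemory)
# m = Key-miss events (a missing key is accessed; not part of A)
# n = New-key events (not part of A)
# A = Alias for g$lshztxed
# At least one of K or E must be present, otherwise nothing is delivered.
# Enable at runtime (no restart required)
CONFIG SET notify-keyspace-events KEA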

# Subscribe to all key expiration events in DB 0
SUBSCRIBE __keyevent@0__:expired

# Subscribe to all operations on a specific key
SUBSCRIBE __keyspace@0__:user:1000

# Subscribe to all SET commands in DB 0
SUBSCRIBE __keyevent@0__:set
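Because the flag letters are easy to mix up (e is evicted, E is keyevent), a small validator can catch mistakes before a CONFIG SET. The sketch below is hypothetical (parse_notify_flags and CLASS_FLAGS are illustrative names) and encodes the documented rules: A expands to g$lshztxed, m and n must be requested explicitly, and nothing is delivered unless K or E is present.

```python
# Hypothetical validator for notify-keyspace-events strings
CLASS_FLAGS = {
    "g": "generic", "$": "string", "l": "list", "s": "set", "h": "hash",
    "z": "zset", "t": "stream", "d": "module", "x": "expired",
    "e": "evicted", "m": "keymiss", "n": "new",
}
ALL_ALIAS = "g$lshztxed"  # what "A" expands to; m and n are not included


def parse_notify_flags(value: str):
    """Return (channel kinds, event classes) that a flag string actually delivers."""
    kinds, classes = set(), set()
    for flag in value:
        if flag == "K":
            kinds.add("keyspace")
        elif flag == "E":
            kinds.add("keyevent")
        elif flag == "A":
            classes.update(CLASS_FLAGS[f] for f in ALL_ALIAS)
        elif flag in CLASS_FLAGS:
            classes.add(CLASS_FLAGS[flag])
        else:
            raise ValueError(f"unknown notify-keyspace-events flag {flag!r}")
    if not kinds:  # without K or E nothing is published at all
        return set(), set()
    return kinds, classes


print(parse_notify_flags("Ex"))  # ({'keyevent'}, {'expired'})
```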

28.3.2 Keyspace vs. Keyevent Channels

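Channel type | Format                    | Message content                   | Use case
-------------+---------------------------+-----------------------------------+--------------------------------------------
keyspace     | __keyspace@<db>__:<key>   | Operation name ("set", "expired") | Track all changes to a specific key
keyevent     | __keyevent@<db>__:<event> | Key name                          | Track all keys affected by a specific event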
# Scenario: key "order:12345" expires

# Keyspace notification (who's watching this key?)
# Channel: __keyspace@0__:order:12345
# Message: "expired"

# Keyevent notification (who's watching expiration events?)
# Channel: __keyevent@0__:expired
# Message: "order:12345"
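The two channel families are mirror images, which a short Python sketch makes explicit. The helper names notifications_for and parse_channel are hypothetical; the channel formats are the ones described in this section. parse_channel is the kind of helper a subscriber using PSUBSCRIBE __keyspace@*__:* might use to recover the database and key from the channel name.

```python
import re


def notifications_for(db: int, key: str, event: str):
    """Hypothetical: (channel, message) pairs published for one event when K and E are enabled."""
    return [
        (f"__keyspace@{db}__:{key}", event),  # K: channel names the key, message is the event
        (f"__keyevent@{db}__:{event}", key),  # E: channel names the event, message is the key
    ]


_CHANNEL = re.compile(r"__(keyspace|keyevent)@(\d+)__:(.*)", re.DOTALL)


def parse_channel(channel: str):
    """Hypothetical: split a notification channel into (kind, db, key-or-event), else None."""
    m = _CHANNEL.fullmatch(channel)
    if m is None:
        return None
    kind, db, rest = m.groups()
    return kind, int(db), rest


for channel, message in notifications_for(0, "order:12345", "expired"):
    print(channel, "->", message, parse_channel(channel))
```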

28.3.3 Performance Impact

Keyspace notifications carry a measurable CPU cost:

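  1. Every write command calls notifyKeyspaceEvent() (key expiry and eviction do too); the call returns immediately unless that event class is enabled
  2. For an enabled class, it builds the notification channel name and publishes to it, which means a lookup in server.pubsub_channels plus a match against every pattern subscription
  3. That work happens even if no one is subscribed

Measured impact: enabling notify-keyspace-events KEA (all event classes) reduces throughput by 15–25% in write-heavy workloads.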

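Recommendation: Enable only the event types your application actually needs. For session expiration detection, Ex (keyevent + expired only) is sufficient if you subscribe to __keyevent@<db>__:expired; use Kx instead if you watch per-key __keyspace@<db>__:<key> channels. Either way, the expired event fires when Redis actually deletes the key (on access or during background expiry), which can lag behind the moment the TTL reaches zero.

CONFIG SET notify-keyspace-events Ex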

28.4 Pub/Sub in Redis Cluster โ€” The Broadcast Problem

28.4.1 How Standard PUBLISH Behaves in Cluster

In Redis Cluster, PUBLISH broadcasts to all nodes:

Redis Cluster with 3 primary nodes
┌──────────────┐   ┌──────────────┐   ┌──────────────┐
│   Node A     │   │   Node B     │   │   Node C     │
│ Slots 0-5460 │   │Slots 5461-   │   │Slots 10923-  │
│              │   │   10922      │   │   16383      │
└──────────────┘   └──────────────┘   └──────────────┘
        ↑
PUBLISH "news" "hello"
        ↓ broadcasts to all nodes
Node A → delivers to its local "news" subscribers
Node B → delivers to its local "news" subscribers
Node C → delivers to its local "news" subscribers

Problem: every PUBLISH is forwarded over the cluster bus to every other node, whether or not that node has any subscribers. In a 12-node cluster, each published message is copied 11 times internally. This doesn't scale.

Root cause: Standard Pub/Sub channels are not slot-addressed, so there's no way to route a publish to just one node. Redis falls back to broadcasting every message across the entire cluster bus.
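The scaling difference is easy to put into numbers. The function below is a back-of-the-envelope model with hypothetical names, assuming PUBLISH reaches every other node over the cluster bus while the sharded variant (SPUBLISH, covered in the next section) reaches only the other nodes of the shard that owns the channel. It counts cluster-bus copies and ignores delivery to the subscribers themselves; the 12-node layout is a made-up example.

```python
def bus_copies_per_message(total_nodes: int, shard_nodes: int, sharded: bool) -> int:
    """Hypothetical back-of-the-envelope count of cluster-bus copies per published message.

    total_nodes: every primary and replica in the cluster
    shard_nodes: primary + replicas of the shard that owns the channel's slot
    Assumes the message enters at a node of the owning shard, as SPUBLISH requires.
    """
    if sharded:
        return shard_nodes - 1  # SPUBLISH: the rest of the owning shard only
    return total_nodes - 1  # PUBLISH: every other node, subscribers or not


# Hypothetical layout: 12 nodes = 3 shards of 1 primary + 3 replicas
for rate in (1_000, 10_000):
    print(rate, "msg/s:",
          rate * bus_copies_per_message(12, 4, sharded=False), "bus msgs/s with PUBLISH,",
          rate * bus_copies_per_message(12, 4, sharded=True), "with SPUBLISH")
```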


28.5 Redis 7 Sharded Pub/Sub

28.5.1 Core Idea

Redis 7.0 (April 2022) introduced Sharded Pub/Sub with a clean solution: treat channels as slot-addressed resources. A channel's slot is computed with the same CRC16(channel) % 16384 function used for keys, hash tags included. SPUBLISH delivers the message only within the shard that owns that slot (its primary and replicas); there is no cluster-wide broadcast.

Network cost per message drops from O(N_nodes) to O(1) with respect to cluster size.

28.5.2 New Command Set

# Subscribe to a sharded channel
# The client must be connected to a node (primary or replica) of the shard
# owning the channel's slot; other nodes answer with a MOVED redirection
SSUBSCRIBE {user:1}:events

# Unsubscribe from a sharded channel
SUNSUBSCRIBE {user:1}:events

# Publish to a sharded channel
# Client library routes to the correct node automatically
SPUBLISH {user:1}:events "login"

# Inspect sharded Pub/Sub state
PUBSUB SHARDCHANNELS          # Active sharded channels on this node
PUBSUB SHARDCHANNELS {user:*   # Filtered by pattern (these channel names start with "{")
PUBSUB SHARDNUMSUB {user:1}:events {user:2}:events  # Subscriber counts

28.5.3 How the Routing Works

Step-by-step example: SPUBLISH {user:1}:events "login"

1. Compute slot: only the hash tag is hashed, so slot = CRC16("user:1") % 16384 (say 2222; illustrative value)
2. Cluster map: slot 2222 → Node A
3. Client library sends SPUBLISH to Node A directly
4. Node A looks up local subscribers to {user:1}:events
5. Node A delivers message to all local subscribers
6. Done. No messages sent to Node B or Node C.
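Step 1 can be reproduced outside Redis. Redis Cluster hashes with CRC16 in its XMODEM variant (polynomial 0x1021, initial value 0) and keeps the result modulo 16384. The sketch below, with hypothetical names crc16_xmodem and hash_slot, follows that rule, including the hash-tag case, so its results should agree with CLUSTER KEYSLOT; the check value 0x31C3 for "123456789" is the standard XMODEM test vector.

```python
# Hypothetical helpers mirroring Redis Cluster's slot computation


def crc16_xmodem(data: bytes) -> int:
    """CRC16 (XMODEM variant): polynomial 0x1021, initial value 0, no reflection."""
    crc = 0
    for byte in data:
        crc ^= byte << 8
        for _ in range(8):
            if crc & 0x8000:
                crc = ((crc << 1) ^ 0x1021) & 0xFFFF
            else:
                crc = (crc << 1) & 0xFFFF
    return crc


def hash_slot(name: str) -> int:
    """Slot for a key or sharded channel: hash only a non-empty {hash tag} if present."""
    data = name.encode("utf-8")
    start = data.find(b"{")
    if start != -1:
        end = data.find(b"}", start + 1)
        if end > start + 1:  # non-empty tag between the first '{' and the next '}'
            data = data[start + 1:end]
    return crc16_xmodem(data) % 16384


print(hash_slot("foo"))  # 12182
print(hash_slot("{user:1}:events") == hash_slot("user:1"))  # True
```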

28.5.4 Hash Tags Are Essential

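Hash tags matter whenever related channels should land on the same slot. A single SSUBSCRIBE naming several channels requires them all to hash to the same slot (otherwise the command fails with a CROSSSLOT error), and co-location lets one connection to one shard receive all of a user's or a room's traffic: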

# Good: all channels for user:1 use the same hash tag
# They will all be on the same slot / same node
SSUBSCRIBE {user:1}:chat
SSUBSCRIBE {user:1}:notifications
SSUBSCRIBE {user:1}:presence

# Good: all channels for room:100 co-located
SSUBSCRIBE {room:100}:messages
SSUBSCRIBE {room:100}:system-events

# Bad: no hash tag, so related channels could be on any node
SSUBSCRIBE user_1_events    # slot derived from the full name; not co-located with other user 1 channels

Different users (e.g., {user:1} and {user:2}) will usually land on different slots, and that's correct design. Messages for different users are processed on different nodes, providing natural isolation and load distribution.
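The hash-tag rule has a few edge cases that matter when naming channels: only the text between the first { and the next } is hashed, and only if that text is non-empty. A hypothetical helper makes the rule testable and groups channels into sets that can safely share one SSUBSCRIBE. Equal hashed parts guarantee the same slot; different hashed parts may still collide by chance, so the grouping is conservative.

```python
from collections import defaultdict


def hashed_part(name: str) -> str:
    """Hypothetical: the substring Redis Cluster hashes for a key or sharded channel."""
    start = name.find("{")
    if start == -1:
        return name
    end = name.find("}", start + 1)
    if end == -1 or end == start + 1:  # no closing brace, or an empty "{}"
        return name
    return name[start + 1:end]


def group_for_ssubscribe(channels):
    """Hypothetical: group channels with equal hashed parts (each group shares one slot)."""
    groups = defaultdict(list)
    for ch in channels:
        groups[hashed_part(ch)].append(ch)
    return dict(groups)


print(group_for_ssubscribe(["{user:1}:chat", "{user:1}:presence", "{room:100}:messages"]))
# {'user:1': ['{user:1}:chat', '{user:1}:presence'], 'room:100': ['{room:100}:messages']}
```

Each group can then be passed to a single SSUBSCRIBE call.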

28.5.5 Client Library Support

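# redis-py: sharded pub/sub in cluster mode (needs a redis-py release with
# sharded Pub/Sub support; check the version you have installed)
from redis.cluster import RedisCluster

rc = RedisCluster(host="redis-node1", port=6379, decode_responses=True)

# Handler invoked for each sharded message (type "smessage")
def message_handler(message):
    if message["type"] == "smessage":
        print(f"User 1 event: {message['data']}")

# Subscribe once, registering the handler for the channel
pubsub = rc.pubsub()
pubsub.ssubscribe(**{"{user:1}:events": message_handler})

# Receive messages in a background thread; sharded_pubsub=True makes the
# worker poll sharded messages (parameter available in recent redis-py releases)
thread = pubsub.run_in_thread(sleep_time=0.001, daemon=True, sharded_pubsub=True)

# Publish: routed to the shard that owns the channel's slot
rc.spublish("{user:1}:events", "user_logged_in")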
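// go-redis v9: cluster sharded pub/sub
package main

import (
    "context"
    "fmt"
    "time"

    "github.com/redis/go-redis/v9"
)

func main() {
    ctx := context.Background()
    rdb := redis.NewClusterClient(&redis.ClusterOptions{
        Addrs: []string{"node1:6379", "node2:6379", "node3:6379"},
    })
    defer rdb.Close()

    // Subscribe: SSUBSCRIBE goes to the node that owns the channel's slot
    pubsub := rdb.SSubscribe(ctx, "{user:1}:events")
    defer pubsub.Close()

    // Wait for the subscription confirmation before publishing
    if _, err := pubsub.Receive(ctx); err != nil {
        panic(err)
    }

    go func() {
        for msg := range pubsub.Channel() {
            fmt.Printf("Received: %s\n", msg.Payload)
        }
    }()

    // Publish
    if err := rdb.SPublish(ctx, "{user:1}:events", "user_logged_in").Err(); err != nil {
        panic(err)
    }
    time.Sleep(100 * time.Millisecond) // give the receiver goroutine time to print
}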

28.6 Pub/Sub vs. Streams โ€” Choosing the Right Tool

28.6.1 Feature Matrix

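Feature                  | Pub/Sub                 | Sharded Pub/Sub         | Redis Stream
-------------------------+-------------------------+-------------------------+---------------------------
Message persistence      | No                      | No                      | Yes
Offline message delivery | Lost                    | Lost                    | Retained (consumer groups)
Message ACK              | No                      | No                      | Yes (XACK)
Consumer groups          | No                      | No                      | Yes (XGROUP)
Cluster fan-out          | All nodes               | One shard (by slot)     | One shard (by slot)
Backpressure handling    | Disconnects slow client | Disconnects slow client | MAXLEN trimming
Latency                  | < 1ms                   | < 1ms                   | 1–5ms
Best for                 | Real-time notifications | Large-scale real-time   | Reliable message queue

Pub/Sub has no queue to trim: a subscriber that falls behind is disconnected once its output buffer exceeds client-output-buffer-limit pubsub.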

28.6.2 Decision Framework

Is at-least-once delivery required?
├── Yes → Redis Streams with Consumer Groups
└── No (real-time, best-effort)
    ├── Running in Redis Cluster?
    │   ├── Yes → Sharded Pub/Sub (SSUBSCRIBE/SPUBLISH; no pattern subscriptions)
    │   └── No → Standard Pub/Sub
    └── High number of pattern subscriptions (> 100)?
        → Be careful: PSUBSCRIBE at scale makes PUBLISH O(N_patterns)
        → Consider converting patterns to explicit channel subscriptions

28.7 Production Operations Guide

28.7.1 Connection Architecture

# Production pattern: dedicated Pub/Sub connections
import redis
import threading
from typing import Callable, Dict

class PubSubManager:
    """Manages dedicated Pub/Sub connections separate from command connections."""

    def __init__(self, redis_url: str):
        # Command connection (for PUBLISH)
        self._cmd = redis.from_url(redis_url)
        # Dedicated subscriber connection
        self._pubsub = self._cmd.pubsub()
        self._listener_thread = None
        self._handlers: Dict[str, Callable] = {}

    def subscribe(self, channel: str, handler: Callable):
        self._handlers[channel] = handler
        self._pubsub.subscribe(**{channel: self._dispatch})
        if not self._listener_thread:
            self._listener_thread = self._pubsub.run_in_thread(
                sleep_time=0.001,
                daemon=True
            )

    def _dispatch(self, message):
        # Only plain channel messages arrive here (this class has no pattern subscriptions)
        if message["type"] == "message":
            channel = message["channel"]
            if isinstance(channel, bytes):
                channel = channel.decode()
            handler = self._handlers.get(channel)
            if handler:
                handler(message["data"])

    def publish(self, channel: str, data: str) -> int:
        return self._cmd.publish(channel, data)

    def unsubscribe(self, *channels):
        self._pubsub.unsubscribe(*channels)
        for ch in channels:
            self._handlers.pop(ch, None)

    def close(self):
        if self._listener_thread:
            self._listener_thread.stop()
            self._listener_thread.join(timeout=1)  # let the worker exit before closing
        self._pubsub.close()
        self._cmd.close()

28.7.2 Keepalive Strategy

Pub/Sub connections are long-lived and at risk of silent termination by NAT devices, load balancers, or firewalls that time out idle connections:

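# redis.conf: server-side TCP keepalive (probe idle connections every 60 seconds)
tcp-keepalive 60
# Client-side PING: redis-py sends periodic health-check PINGs on Pub/Sub
# connections if the client is created with health_check_interval, e.g.
#   redis.from_url(redis_url, health_check_interval=30)
# For a manual ping:
pubsub.ping()
# The PONG arrives as a Pub/Sub message: {'type': 'pong', ..., 'data': b''}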

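# Reconnection handling (builds on PubSubManager above)
class ResilientPubSub(PubSubManager):
    def reconnect(self):
        # Stop the worker thread bound to the old connection
        if self._listener_thread:
            self._listener_thread.stop()
            self._listener_thread.join(timeout=1)
            self._listener_thread = None
        try:
            self._pubsub.close()
        except Exception:
            pass  # the old connection may already be dead
        self._pubsub = self._cmd.pubsub()
        # Re-subscribe to all previous channels; anything published while
        # disconnected is lost (fire-and-forget)
        for channel in self._handlers:
            self._pubsub.subscribe(**{channel: self._dispatch})
        if self._handlers:
            self._listener_thread = self._pubsub.run_in_thread(sleep_time=0.001, daemon=True)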

28.7.3 Monitoring Pub/Sub Health

# Which channels have active subscribers? (pattern subscribers are not counted)
PUBSUB CHANNELS
# Returns list of channel names

# Filter by pattern
PUBSUB CHANNELS user:*

# Subscriber count per channel
PUBSUB NUMSUB news sports chat
# Returns: news 45, sports 12, chat 8

# Number of active pattern subscriptions
PUBSUB NUMPAT
# Returns: 3

# Sharded pub/sub monitoring (Redis 7+)
PUBSUB SHARDCHANNELS
PUBSUB SHARDNUMSUB {user:1}:events {user:2}:events

# INFO stats
redis-cli INFO stats | grep pubsub
# pubsub_channels:5
# pubsub_patterns:2
# pubsubshard_channels:10  (Redis 7.0.3+)
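These counters are easy to turn into an automated check. In the sketch below, parse_info and pubsub_alerts are hypothetical helpers, the threshold of 100 patterns echoes the decision framework above, and the sample input is made up for illustration. redis-py's info("stats") already returns a dict, so parse_info is only needed for raw text such as redis-cli output.

```python
# Hypothetical monitoring helpers for Pub/Sub counters from INFO stats


def parse_info(text: str) -> dict:
    """Parse raw INFO output into a dict; section headers and blank lines are skipped."""
    fields = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or ":" not in line:
            continue
        key, value = line.split(":", 1)
        fields[key] = int(value) if value.isdigit() else value
    return fields


def pubsub_alerts(stats: dict, max_patterns: int = 100) -> list:
    """Flag conditions worth a look, given INFO stats as a dict."""
    alerts = []
    patterns = stats.get("pubsub_patterns", 0)
    if patterns > max_patterns:
        alerts.append(f"{patterns} pattern subscriptions: each PUBLISH matches against all of them")
    active = stats.get("pubsub_channels", 0) + stats.get("pubsubshard_channels", 0) + patterns
    if active == 0:
        alerts.append("no active subscriptions: anything published now reaches no one")
    return alerts


sample = "# Stats\r\npubsub_channels:5\r\npubsub_patterns:240\r\npubsubshard_channels:10\r\n"
stats = parse_info(sample)
print(stats)
print(pubsub_alerts(stats))
```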

28.7.4 Message Serialization

Pub/Sub payloads are byte strings. Choose a serialization format appropriate to your latency and debuggability needs:

import json
from dataclasses import dataclass, asdict

import msgpack  # third-party: pip install msgpack
import redis

r = redis.Redis()

@dataclass
class UserEvent:
    event_type: str
    user_id: int
    timestamp: float

event = UserEvent("login", 1000, 1716000000.0)

# JSON: human readable, larger payload
r.publish("user:events", json.dumps(asdict(event)))

# MessagePack: compact binary, usually smaller than JSON (savings depend on the data)
r.publish("user:events", msgpack.packb(asdict(event), use_bin_type=True))

# For simple string events (minimal overhead)
r.publish(f"user:{event.user_id}:events", f"{event.event_type}:{event.timestamp}")
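Whatever the format, publishers and subscribers must agree on it, so it helps to keep encoding and decoding side by side. The following standard-library sketch (encode_event and decode_event are hypothetical names) round-trips the UserEvent dataclass through compact JSON and accepts either bytes or str, since redis-py delivers bytes unless the client sets decode_responses=True.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class UserEvent:
    event_type: str
    user_id: int
    timestamp: float


def encode_event(event: UserEvent) -> bytes:
    # Hypothetical helper; compact separators trim a few bytes from every message
    return json.dumps(asdict(event), separators=(",", ":")).encode("utf-8")


def decode_event(payload) -> UserEvent:
    # Hypothetical helper; subscribers get bytes unless decode_responses=True is set
    if isinstance(payload, bytes):
        payload = payload.decode("utf-8")
    return UserEvent(**json.loads(payload))


wire = encode_event(UserEvent("login", 1000, 1716000000.0))
print(wire)  # b'{"event_type":"login","user_id":1000,"timestamp":1716000000.0}'
print(decode_event(wire))  # UserEvent(event_type='login', user_id=1000, timestamp=1716000000.0)
```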

28.8 Summary

Redis Pub/Sub delivers real-time, best-effort messaging with extremely low latency. Standard Pub/Sub excels in standalone and master-replica deployments. In Redis Cluster, standard PUBLISH suffers from O(N_nodes) broadcast cost; Redis 7's Sharded Pub/Sub solves this by routing channels to specific slots, reducing cluster communication to O(1).

Key takeaways:
