Chapter 23
Cluster Gossip Source: Node State Machine and Message Propagation
Redis Cluster uses a Gossip protocol for decentralized node discovery and failure detection. This chapter walks through the Gossip implementation in cluster.c at the source-code level: node state machine transitions, PING/PONG message construction, failure broadcasting, and slot information propagation.
23.1 Cluster Data Structure Overview
clusterState: Global Cluster State
// cluster.h
typedef struct clusterState {
clusterNode *myself; // Pointer to this node's own descriptor
uint64_t currentEpoch; // Current epoch (globally monotonically increasing)
int state; // CLUSTER_OK or CLUSTER_FAIL
int size; // Number of master nodes with assigned slots
dict *nodes; // name → clusterNode* mapping (all known nodes)
dict *nodes_black_list; // Blacklist (prevents readmitting recently removed nodes)
clusterNode *migrating_slots_to[CLUSTER_SLOTS]; // Slots being migrated out
clusterNode *importing_slots_from[CLUSTER_SLOTS]; // Slots being imported
clusterNode *slots[CLUSTER_SLOTS]; // Slot → master node mapping (16384 entries)
uint64_t slots_keys_count[CLUSTER_SLOTS]; // Key count per slot
rax *slots_to_keys; // Slot → key list (Radix tree)
// Failover state
mstime_t failover_auth_time; // Time of last election initiation
int failover_auth_count; // Votes received so far
int failover_auth_sent; // Whether election request was sent
int failover_auth_rank; // This slave's rank among candidates
uint64_t failover_auth_epoch; // Epoch of the ongoing election
int cant_failover_reason; // Reason code for why failover cannot proceed
// Statistics
long long stats_bus_messages_sent[CLUSTERMSG_TYPE_COUNT];
long long stats_bus_messages_received[CLUSTERMSG_TYPE_COUNT];
} clusterState;
clusterNode: Node Descriptor
typedef struct clusterNode {
mstime_t ctime; // Node creation time
char name[CLUSTER_NAMELEN]; // 40-byte hex node ID
int flags; // Node flag bits (see state machine section)
uint64_t configEpoch; // Config epoch (version number of slot ownership)
unsigned char slots[CLUSTER_SLOTS/8]; // Bitmap of slots this node serves (2KB)
int numslots; // Number of slots served
int numslaves; // Number of replica nodes
struct clusterNode **slaves; // Array of replica node pointers
struct clusterNode *slaveof; // If replica, points to the master
unsigned long long repl_offset; // Replication offset (for replica migration decisions)
mstime_t ping_sent; // Timestamp of last PING sent to this node
mstime_t pong_received; // Timestamp of last PONG received from this node
mstime_t data_received; // Timestamp of last data received from this node
mstime_t fail_time; // Timestamp when FAIL flag was set
mstime_t voted_time; // Timestamp of last vote cast
mstime_t repl_offset_time; // Timestamp when repl_offset was last updated
mstime_t orphaned_time; // Timestamp when node became an orphaned master
char ip[NET_IP_STR_LEN]; // Node IP address
int port; // Client port
int pport; // Plaintext client port (only used when port is the TLS port)
int cport; // Cluster bus port (port + 10000 by default)
clusterLink *link; // TCP connection for cluster bus messages
list *fail_reports; // Nodes that reported this node as PFAIL
} clusterNode;
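The slots bitmap in clusterNode packs one bit per hash slot, which is why 16384 slots fit in 2048 bytes. The following is a minimal standalone sketch of that encoding; the slot_owner type and the slot_bitmap_* helper names are hypothetical and are not the helpers used in cluster.c.

```c
#include <assert.h>
#include <string.h>

#define CLUSTER_SLOTS 16384

/* Hypothetical stand-in for the slots/numslots fields of clusterNode. */
typedef struct {
    unsigned char slots[CLUSTER_SLOTS / 8]; /* one bit per slot, 2048 bytes */
    int numslots;                           /* number of bits currently set */
} slot_owner;

/* Return 1 if the bit for `slot` is set, 0 otherwise. */
static int slot_bitmap_test(const slot_owner *n, int slot) {
    assert(slot >= 0 && slot < CLUSTER_SLOTS);
    return (n->slots[slot >> 3] >> (slot & 7)) & 1;
}

/* Set the bit for `slot`; numslots only changes on a 0 -> 1 transition. */
static void slot_bitmap_set(slot_owner *n, int slot) {
    if (slot_bitmap_test(n, slot)) return;
    n->slots[slot >> 3] |= (unsigned char)(1 << (slot & 7));
    n->numslots++;
}

/* Clear the bit for `slot`; numslots only changes on a 1 -> 0 transition. */
static void slot_bitmap_clear(slot_owner *n, int slot) {
    if (!slot_bitmap_test(n, slot)) return;
    n->slots[slot >> 3] &= (unsigned char)~(1 << (slot & 7));
    n->numslots--;
}
```

Byte index slot >> 3 and bit index slot & 7 are the usual way to address a packed bitmap; keeping numslots in step with the bitmap avoids rescanning 2 KB whenever the count is needed.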
Node Flag Bits
#define CLUSTER_NODE_MASTER (1<<0) // This node is a master
#define CLUSTER_NODE_SLAVE (1<<1) // This node is a replica
#define CLUSTER_NODE_PFAIL (1<<2) // Possible failure (this node's local assessment)
#define CLUSTER_NODE_FAIL (1<<3) // Confirmed failure (agreed by quorum)
#define CLUSTER_NODE_MYSELF (1<<4) // Represents ourself
#define CLUSTER_NODE_HANDSHAKE (1<<5) // Handshake in progress
#define CLUSTER_NODE_NOADDR (1<<6) // Address unknown
#define CLUSTER_NODE_MEET (1<<7) // Should send MEET on next iteration
#define CLUSTER_NODE_MIGRATE_TO (1<<8) // This node is a slot migration target
#define CLUSTER_NODE_NOFAILOVER (1<<9) // Auto-failover disabled (slave-no-failover)
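Because the flags are independent bits, a node can carry several at once (for example MYSELF and MASTER), and checks are plain bit tests. The sketch below shows one way to test and print a flag word. The helper names are hypothetical, and the text labels are modeled on what CLUSTER NODES prints ("fail?" for PFAIL); treat them as illustrative.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

#define CLUSTER_NODE_MASTER    (1<<0)
#define CLUSTER_NODE_SLAVE     (1<<1)
#define CLUSTER_NODE_PFAIL     (1<<2)
#define CLUSTER_NODE_FAIL      (1<<3)
#define CLUSTER_NODE_MYSELF    (1<<4)
#define CLUSTER_NODE_HANDSHAKE (1<<5)
#define CLUSTER_NODE_NOADDR    (1<<6)

/* Hypothetical helper: a master that is neither suspected nor confirmed failed. */
static int is_healthy_master(int flags) {
    return (flags & CLUSTER_NODE_MASTER) &&
           !(flags & (CLUSTER_NODE_PFAIL | CLUSTER_NODE_FAIL));
}

/* Write a comma-separated list of the flags set in `flags` into `buf`.
 * Output is silently cut short if `buf` is too small. */
static void flags_to_string(int flags, char *buf, size_t len) {
    static const struct { int flag; const char *name; } table[] = {
        {CLUSTER_NODE_MYSELF, "myself"},   {CLUSTER_NODE_MASTER, "master"},
        {CLUSTER_NODE_SLAVE, "slave"},     {CLUSTER_NODE_PFAIL, "fail?"},
        {CLUSTER_NODE_FAIL, "fail"},       {CLUSTER_NODE_HANDSHAKE, "handshake"},
        {CLUSTER_NODE_NOADDR, "noaddr"},
    };
    size_t used = 0;
    assert(buf != NULL && len > 0);
    buf[0] = '\0';
    for (size_t i = 0; i < sizeof(table) / sizeof(table[0]); i++) {
        if (!(flags & table[i].flag)) continue;
        int n = snprintf(buf + used, len - used, "%s%s",
                         used ? "," : "", table[i].name);
        if (n < 0 || (size_t)n >= len - used) return; /* truncated */
        used += (size_t)n;
    }
}
```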
23.2 clusterCron: The Gossip Heartbeat Timer
clusterCron is called by serverCron every 100ms and is the core driver of the Gossip protocol:
void clusterCron(void) {
static long long iteration = 0;
iteration++; // Used to stagger subtasks at different frequencies
// ─── Task 1: Handle timed-out nodes ───
mstime_t now = mstime();
mstime_t handshake_timeout = server.cluster_node_timeout;
if (handshake_timeout < 1000) handshake_timeout = 1000;
dictIterator *di = dictGetSafeIterator(server.cluster->nodes);
dictEntry *de;
int orphaned_masters = 0, max_slaves = 0, this_slaves = 0;
while ((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);
if (node->flags & (CLUSTER_NODE_MYSELF | CLUSTER_NODE_NOADDR)) continue;
// Remove nodes stuck in handshake timeout
if (nodeInHandshake(node) &&
now - node->ctime > handshake_timeout) {
clusterDelNode(node);
continue;
}
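// Ensure we have a connection. Greet with MEET only if the node is
// flagged for it; otherwise open with a plain PING
if (node->link == NULL) {
clusterLink *link = createClusterLink(node);
node->link = link; // Remember the link so it is not recreated next tick
clusterSendPing(link, node->flags & CLUSTER_NODE_MEET ?
CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);
continue;
}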
// ─── PFAIL detection ───
// No PING in flight and the last PONG is older than timeout/2 → send PING
if (node->ping_sent == 0 &&
(now - node->pong_received) > server.cluster_node_timeout / 2) {
clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
continue;
}
// PING has been outstanding for longer than the full timeout → PFAIL
if (node->ping_sent != 0 &&
(now - node->ping_sent) > server.cluster_node_timeout &&
!(node->flags & (CLUSTER_NODE_PFAIL | CLUSTER_NODE_FAIL))) {
node->flags |= CLUSTER_NODE_PFAIL; // Local suspicion only
}
// Orphaned master detection (for replica migration)
if (nodeIsMaster(node) && node->numslots > 0) {
if (node->numslaves == 0) orphaned_masters++;
if (node->numslaves > max_slaves) max_slaves = node->numslaves;
if (myself->slaveof == node) this_slaves = node->numslaves;
}
}
dictReleaseIterator(di);
// ─── Task 2: Random PING to ensure Gossip propagation ───
// At least once per second, PING a random node (even if not timed out)
if (!(iteration % 10)) {
clusterNode *minpong_node = NULL;
mstime_t minpong = 0;
for (int j = 0; j < 5; j++) {
de = dictGetRandomKey(server.cluster->nodes);
clusterNode *this = dictGetVal(de);
if (this->link == NULL || this->ping_sent != 0) continue;
if (this->flags & (CLUSTER_NODE_MYSELF | CLUSTER_NODE_HANDSHAKE)) continue;
if (minpong == 0 || this->pong_received < minpong) {
minpong = this->pong_received;
minpong_node = this;
}
}
if (minpong_node) {
clusterSendPing(minpong_node->link, CLUSTERMSG_TYPE_PING);
}
}
// ─── Task 3: Replica migration for orphaned masters ───
// Only a replica whose master has the most replicas is a migration candidate
if (nodeIsSlave(myself) &&
orphaned_masters && max_slaves >= 2 && this_slaves == max_slaves) {
clusterHandleSlaveMigration(max_slaves);
}
// ─── Task 4: Check whether this slave should initiate failover ───
if (nodeIsSlave(myself)) {
clusterHandleSlaveFailover();
}
// ─── Task 5: Recompute the overall cluster state (CLUSTER_OK / CLUSTER_FAIL) ───
clusterUpdateState();
}
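The random-PING task above samples a handful of nodes and pings the one heard from least recently. The selection step can be isolated as a pure function, sketched below. The peer struct and pick_ping_target are hypothetical names, and the sketch tracks the best index instead of using minpong == 0 as a sentinel.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical, reduced view of the clusterNode fields used for selection. */
typedef struct {
    long long pong_received; /* ms timestamp of the last PONG */
    long long ping_sent;     /* non-zero while a PING is in flight */
    int has_link;            /* 1 if a cluster bus link exists */
} peer;

/* Among the sampled peers, return the index of the eligible peer with the
 * oldest pong_received, or -1 if no sampled peer is eligible. A peer is
 * eligible when it has a link and no PING is currently in flight. */
static int pick_ping_target(const peer *sample, size_t n) {
    int best = -1;
    assert(sample != NULL || n == 0);
    for (size_t i = 0; i < n; i++) {
        if (!sample[i].has_link || sample[i].ping_sent != 0) continue;
        if (best == -1 ||
            sample[i].pong_received < sample[best].pong_received) {
            best = (int)i;
        }
    }
    return best;
}
```

Sampling a few nodes and choosing the stalest one biases traffic toward peers whose state is most out of date without scanning the whole node table on every tick.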
23.3 Building a PING Message: clusterSendPing
void clusterSendPing(clusterLink *link, int type) {
// 1. Determine how many Gossip entries to include
// Target: max(cluster_size / 10, 3) entries, plus all PFAIL nodes
int freshnodes = dictSize(server.cluster->nodes) - 2;
int wanted = floor(dictSize(server.cluster->nodes) / 10);
if (wanted < 3) wanted = 3;
if (wanted > freshnodes) wanted = freshnodes;
int pfail_wanted = server.cluster->stats_pfail_nodes;
// 2. Compute message size
int totlen = sizeof(clusterMsg) - sizeof(union clusterMsgData);
totlen += sizeof(clusterMsgDataGossip) * (wanted + pfail_wanted);
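// Never allocate less than a full clusterMsg: the header is initialized
// as a whole, so a smaller buffer would be overrun
if (totlen < (int)sizeof(clusterMsg)) totlen = sizeof(clusterMsg);
clusterMsg *hdr = zcalloc(totlen); // Zero-filled allocation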
// 3. Fill in the message header
clusterBuildMessageHdr(hdr, type);
// 4. Select and fill Gossip entries (random nodes)
dictEntry *de;
int gossipcount = 0;
int maxiterations = wanted * 3; // Bound the loop: random picks may repeat
while (freshnodes > 0 && gossipcount < wanted && maxiterations--) {
de = dictGetRandomKey(server.cluster->nodes);
clusterNode *this = dictGetVal(de);
if (this == myself) continue;
// PFAIL nodes are added in step 5; skipping them here avoids duplicates
if (this->flags & CLUSTER_NODE_PFAIL) continue;
if (this->flags & (CLUSTER_NODE_HANDSHAKE | CLUSTER_NODE_NOADDR)) continue;
// No duplicates: skip nodes already present in this message
if (clusterNodeIsInGossipSection(hdr, gossipcount, this)) continue;
clusterMsgDataGossip *gossip =
&(hdr->data.ping.gossip[gossipcount]);
memcpy(gossip->nodename, this->name, CLUSTER_NAMELEN);
gossip->ping_sent = htonl(this->ping_sent / 1000);
gossip->pong_received = htonl(this->pong_received / 1000);
memcpy(gossip->ip, this->ip, sizeof(this->ip));
gossip->port = htons(this->port);
gossip->cport = htons(this->cport);
gossip->flags = htons(this->flags);
freshnodes--;
gossipcount++;
}
// 5. Force-include PFAIL nodes (ensure failure info propagates)
if (pfail_wanted) {
dictIterator *di = dictGetSafeIterator(server.cluster->nodes);
while ((de = dictNext(di)) != NULL && pfail_wanted > 0) {
clusterNode *this = dictGetVal(de);
if (this->flags & CLUSTER_NODE_PFAIL) {
// Fill the gossip entry for this pfail node
// ... (same structure as above)
pfail_wanted--;
gossipcount++;
}
}
dictReleaseIterator(di);
}
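// Recompute the length from the entries actually written, which may be
// fewer than the number the buffer was sized for
totlen = sizeof(clusterMsg) - sizeof(union clusterMsgData);
totlen += sizeof(clusterMsgDataGossip) * gossipcount;
hdr->count = htons(gossipcount);
hdr->totlen = htonl(totlen);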
clusterSendMessage(link, (unsigned char*)hdr, totlen);
zfree(hdr);
}
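The sizing rule in step 1 is easy to check in isolation. The sketch below reproduces the arithmetic as a standalone function; gossip_wanted is a hypothetical name, and the final clamp to zero is an added guard for one- and two-node tables rather than something taken from the listing.

```c
#include <assert.h>

/* Number of random gossip entries to attach to a PING, given how many
 * nodes are in the local node table (ourselves included). */
static int gossip_wanted(int known_nodes) {
    assert(known_nodes >= 0);
    int freshnodes = known_nodes - 2; /* exclude ourselves and the receiver */
    int wanted = known_nodes / 10;    /* integer division, same as floor() */
    if (wanted < 3) wanted = 3;
    if (wanted > freshnodes) wanted = freshnodes;
    if (wanted < 0) wanted = 0;       /* added guard for tiny clusters */
    return wanted;
}
```

For a 30-node cluster this yields 3 entries per PING and for a 100-node cluster 10, so the gossip payload grows linearly with cluster size while the fixed header stays constant.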
The clusterMsg Header
typedef struct {
char sig[4]; // "RCmb" (Redis Cluster message bus)
uint32_t totlen; // Total message length in bytes
uint16_t ver; // Protocol version (currently 1)
uint16_t port; // Sender's client port
uint16_t type; // Message type (PING/PONG/MEET/FAIL/...)
uint16_t count; // Number of gossip entries
uint64_t currentEpoch; // Sender's current epoch
uint64_t configEpoch; // Sender's config epoch (for masters)
uint64_t offset; // Replication offset (for slaves)
char sender[CLUSTER_NAMELEN]; // Sender's node name (40-char hex ID)
unsigned char myslots[CLUSTER_SLOTS/8]; // Sender's slot bitmap (2KB)
char slaveof[CLUSTER_NAMELEN]; // Master's name if this is a replica
char myip[NET_IP_STR_LEN]; // Sender's IP
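char notused1[34]; // Reserved bytes (exact size varies between versions)
uint16_t pport; // Sender's plaintext port, used when the base port is TLS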
uint16_t cport; // Cluster bus port
uint16_t flags; // Sender's flags
unsigned char state; // Cluster state (CLUSTER_OK/CLUSTER_FAIL)
unsigned char mflags[3]; // Message flags
union clusterMsgData data; // Message body (varies by type)
} clusterMsg;
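Since sig and totlen are the first two fields of the header, a receiver can validate a frame and learn its length from the first 8 bytes alone. The following sketch assumes that layout (4-byte signature followed by a big-endian 32-bit length, as in the struct above); be32 and bus_frame_length are hypothetical helpers, not functions from cluster.c.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Decode a 32-bit big-endian (network order) integer from 4 bytes. */
static uint32_t be32(const unsigned char *p) {
    return ((uint32_t)p[0] << 24) | ((uint32_t)p[1] << 16) |
           ((uint32_t)p[2] << 8)  |  (uint32_t)p[3];
}

/* Return the total length declared by a cluster bus frame, or 0 if fewer
 * than 8 bytes are available or the signature is not "RCmb". */
static uint32_t bus_frame_length(const unsigned char *buf, size_t len) {
    assert(buf != NULL);
    if (len < 8) return 0;
    if (memcmp(buf, "RCmb", 4) != 0) return 0;
    return be32(buf + 4);
}
```

A reader built this way can wait until the declared number of bytes has arrived before interpreting the rest of the message; a real implementation would also bound the declared length.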
23.4 Receiving PING/PONG: clusterProcessGossipSection
int clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
uint16_t count = ntohs(hdr->count);
clusterMsgDataGossip *g = (clusterMsgDataGossip*) hdr->data.ping.gossip;
clusterNode *sender = link->node
? link->node
: clusterLookupNode(hdr->sender);
while (count--) {
uint16_t flags = ntohs(g->flags);
// 1. Look up the node locally
clusterNode *node = clusterLookupNode(g->nodename);
if (node) {
if (sender && nodeIsMaster(sender) && node != myself) {
if (flags & (CLUSTER_NODE_FAIL | CLUSTER_NODE_PFAIL)) {
// 2. This Gossip entry reports the node as failing
// Add to fail_reports list
if (clusterNodeAddFailureReport(node, sender)) {
serverLog(LL_VERBOSE,
"Node %.40s reported node %.40s as not reachable.",
sender->name, node->name);
}
// 3. Check whether we now have quorum to mark FAIL
markNodeAsFailingIfNeeded(node);
} else {
// Gossip says node is reachable → clear sender's fail report
if (clusterNodeDelFailureReport(node, sender)) {
serverLog(LL_VERBOSE,
"Node %.40s reported node %.40s is back online.",
sender->name, node->name);
}
}
}
} else {
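// 4. Unknown node: start a handshake, but only when the gossip comes
// from a node we already know, the entry carries an address, and the
// node is not blacklisted
if (sender &&
!(flags & CLUSTER_NODE_NOADDR) &&
!clusterBlacklistExists(g->nodename)) {
clusterStartHandshake(g->ip, ntohs(g->port), ntohs(g->cport));
}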
}
g++;
}
return 1;
}
markNodeAsFailingIfNeeded: FAIL Determination Logic
void markNodeAsFailingIfNeeded(clusterNode *node) {
int failures;
int needed_quorum = (server.cluster->size / 2) + 1;
// Must be in PFAIL state from our perspective
if (!nodeTimedOut(node)) return;
// Already in FAIL state
if (nodeFailed(node)) return;
// Count valid (non-expired) failure reports
failures = clusterNodeFailureReportsCount(node);
// Our own vote counts if we are a master
if (nodeIsMaster(myself)) failures++;
if (failures < needed_quorum) return; // Not enough agreement yet
serverLog(LL_NOTICE,
"Marking node %.40s as failing (quorum reached).", node->name);
// Transition: PFAIL → FAIL
node->flags &= ~CLUSTER_NODE_PFAIL;
node->flags |= CLUSTER_NODE_FAIL;
node->fail_time = mstime();
// Broadcast FAIL immediately (don't wait for next PING round)
if (nodeIsMaster(myself)) clusterSendFail(node->name);
clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_SAVE_CONFIG);
}
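The quorum test combines two ideas: only recent failure reports count, and the local node adds its own vote when it is a master. A minimal standalone sketch of both follows. The failure_report type and function names are hypothetical, and the validity window is passed in as a parameter because its exact value is not shown in the listing.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical record: when a given master last reported the node as failing. */
typedef struct {
    long long time_ms;
} failure_report;

/* Count reports that are no older than `validity_ms`. */
static int valid_reports(const failure_report *r, size_t n,
                         long long now_ms, long long validity_ms) {
    int count = 0;
    assert(r != NULL || n == 0);
    for (size_t i = 0; i < n; i++) {
        if (now_ms - r[i].time_ms <= validity_ms) count++;
    }
    return count;
}

/* Return 1 if the reports (plus our own vote when we are a master) reach
 * a majority of the masters that serve slots. */
static int quorum_reached(int masters_with_slots, int reports, int i_am_master) {
    assert(masters_with_slots >= 0 && reports >= 0);
    int needed = masters_with_slots / 2 + 1;
    return reports + (i_am_master ? 1 : 0) >= needed;
}
```

With 5 slot-serving masters the threshold is 3, so a master that suspects a node needs 2 fresh reports from its peers before promoting PFAIL to FAIL.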
23.5 The Node State Machine in Detail
State Transition Diagram
┌──────────────────────────────────────┐
│ NORMAL │
│ (no PFAIL or FAIL flag set) │
└────────────────┬─────────────────────┘
│
This node exceeds cluster-node-timeout
without any response from target node
│
▼
┌──────────────────────────────────────┐
│ PFAIL │
│ (CLUSTER_NODE_PFAIL bit set) │
│ Local assessment only, not broadcast │
└────────────────┬─────────────────────┘
│
fail_reports >= quorum
(majority of masters agree)
│
▼
┌──────────────────────────────────────┐
│ FAIL │
│ (CLUSTER_NODE_FAIL bit set) │
│ FAIL message broadcast to all nodes │
│ Triggers replica election │
└────────────────┬─────────────────────┘
│
┌────────────────────────┼───────────────────────┐
│ │ │
Failover completes Auto-timeout clears Manual CLUSTER RESET
New master broadcasts FAIL flag (admin operation)
UPDATE message (FAIL age > timeout*2)
│ │
▼ ▼
NORMAL NORMAL
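The diagram can be read as a small transition function over three health states. The sketch below encodes it with hypothetical enum and function names. Two transitions are assumptions added for completeness and are not drawn in the diagram: a PONG clearing PFAIL, and a received FAIL message moving a node straight to FAIL.

```c
#include <assert.h>

typedef enum { NODE_NORMAL, NODE_PFAIL, NODE_FAIL } node_health;

typedef enum {
    EV_PING_TIMEOUT,      /* no reply within cluster-node-timeout */
    EV_PONG_RECEIVED,     /* the node answered again (assumed transition) */
    EV_QUORUM_REACHED,    /* a majority of masters reported the node */
    EV_FAIL_MSG_RECEIVED, /* another node broadcast FAIL (assumed transition) */
    EV_FAIL_CLEARED       /* failover done, FAIL expired, or manual reset */
} node_event;

/* Return the next health state; events that do not apply leave it unchanged. */
static node_health next_health(node_health s, node_event e) {
    switch (s) {
    case NODE_NORMAL:
        if (e == EV_PING_TIMEOUT) return NODE_PFAIL;
        if (e == EV_FAIL_MSG_RECEIVED) return NODE_FAIL;
        return s;
    case NODE_PFAIL:
        if (e == EV_PONG_RECEIVED) return NODE_NORMAL;
        if (e == EV_QUORUM_REACHED || e == EV_FAIL_MSG_RECEIVED) return NODE_FAIL;
        return s;
    case NODE_FAIL:
        if (e == EV_FAIL_CLEARED) return NODE_NORMAL;
        return s;
    }
    assert(0 && "unreachable");
    return s;
}
```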
Automatic FAIL State Recovery
// In clearNodeFailureIfNeeded(), which runs when a packet arrives from a
// node that is currently flagged FAIL
if (nodeFailed(node)) {
mstime_t now = mstime();
// If the node is a master whose FAIL flag has outlived the undo window
// and nobody has taken over its slots, clear the flag to allow retry
if (nodeIsMaster(node) && node->numslots > 0 &&
(now - node->fail_time) >
(server.cluster_node_timeout * CLUSTER_FAIL_UNDO_TIME_MULT)) {
node->flags &= ~CLUSTER_NODE_FAIL;
clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE |
CLUSTER_TODO_SAVE_CONFIG);
serverLog(LL_NOTICE,
"Clear FAIL state for node %.40s: "
"is reachable again and nobody is serving its slots after some time.",
node->name);
}
}
23.6 Failover: Replica Election
void clusterHandleSlaveFailover(void) {
mstime_t now = mstime();
// Prerequisites
if (nodeIsMaster(myself)) return;
if (myself->slaveof == NULL) return;
if (!nodeFailed(myself->slaveof)) return; // Master must be in FAIL state
// Calculate the election delay so that replicas do not all start at once
// Replicas with a larger repl_offset (more data) get shorter delays
mstime_t auth_timeout = server.cluster_node_timeout * 2;
mstime_t auth_retry_time = auth_timeout * 2;
if (server.cluster->failover_auth_time == 0) {
// First time: schedule the election
server.cluster->failover_auth_time = now +
500 + // Fixed 500ms wait (time for FAIL message to propagate)
random() % 500; // Random jitter to reduce simultaneous elections
// Priority delay based on replication offset rank
// Rank 0 = most up-to-date = shortest delay
server.cluster->failover_auth_rank = clusterGetSlaveRank();
server.cluster->failover_auth_time +=
server.cluster->failover_auth_rank * 1000;
return;
}
// Not yet time
if (now < server.cluster->failover_auth_time) return;
// Election timed out without enough votes: reset for retry
if (now > server.cluster->failover_auth_time + auth_retry_time) {
server.cluster->failover_auth_time = 0;
server.cluster->failover_auth_count = 0; // Discard votes from the failed attempt
server.cluster->failover_auth_sent = 0;
return;
}
// Ask for votes: FAILOVER_AUTH_REQUEST goes to every known node, but
// only masters reply with a vote
if (!server.cluster->failover_auth_sent) {
server.cluster->currentEpoch++;
server.cluster->failover_auth_epoch = server.cluster->currentEpoch;
serverLog(LL_WARNING,
"Starting a failover election for epoch %llu.",
(unsigned long long)server.cluster->currentEpoch);
clusterRequestFailoverAuth(); // Builds and broadcasts the request
server.cluster->failover_auth_sent = 1;
return;
}
// Check if we have won (collected quorum of votes)
if (server.cluster->failover_auth_count >=
server.cluster->size / 2 + 1) {
serverLog(LL_WARNING,
"Failover election won: I'm the new master.");
clusterFailoverReplaceYourMaster();
}
}
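The scheduling arithmetic in the first-time branch can be written as a pure function. In the sketch below the jitter is passed in explicitly so the result is deterministic; election_start_ms is a hypothetical name, and the constants are the ones shown in the listing above.

```c
#include <assert.h>

/* Time (ms) at which a replica may start its election.
 * rank 0 is the most up-to-date replica; jitter_ms stands in for
 * random() % 500 and must lie in [0, 500). */
static long long election_start_ms(long long now_ms, int rank, int jitter_ms) {
    assert(rank >= 0);
    assert(jitter_ms >= 0 && jitter_ms < 500);
    return now_ms
         + 500                      /* fixed wait for FAIL to propagate */
         + jitter_ms                /* desynchronizes equal-rank replicas */
         + (long long)rank * 1000;  /* one extra second per rank step */
}
```

A rank-0 replica therefore starts between 500 ms and just under 1 s after it notices the failure, while a rank-2 replica waits at least 2.5 s, which usually lets the most up-to-date replica collect its votes first.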
How Master Nodes Cast Votes
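// Handling FAILOVER_AUTH_REQUEST on a master node (simplified: in
// cluster.c the eligibility checks live in clusterSendFailoverAuthIfNeeded,
// and clusterSendFailoverAuth only builds and sends the ACK)
void clusterSendFailoverAuth(clusterNode *node) {
clusterMsg buf[1];
clusterMsg *hdr = (clusterMsg*) buf;
uint32_t totlen = sizeof(clusterMsg) - sizeof(union clusterMsgData);
// Verify eligibility to vote:
// 1. We are a master
// 2. We haven't voted for this epoch yet
// 3. The requesting slave's master is in FAIL state
// (the check that the slave's slot claims are current is omitted here)
if (!nodeIsMaster(myself)) return;
if (server.cluster->lastVoteEpoch == server.cluster->currentEpoch) return;
if (node->slaveof == NULL || !nodeFailed(node->slaveof)) return;
if (node->link == NULL) return;
// Cast vote (send FAILOVER_AUTH_ACK)
server.cluster->lastVoteEpoch = server.cluster->currentEpoch;
node->slaveof->voted_time = mstime(); // Recorded on the failed master
clusterBuildMessageHdr(hdr, CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK);
hdr->totlen = htonl(totlen); // The ACK is header-only
clusterSendMessage(node->link, (unsigned char*)hdr, totlen);
serverLog(LL_WARNING,
"Sending FAILOVER_AUTH_ACK to %.40s", node->name);
}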
23.7 Slot Information Propagation: UPDATE Messages
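After a failover completes, the new master advertises its slots in the headers of its own PING/PONG messages. UPDATE messages cover the remaining case: when a node notices that a peer still claims slots under a stale config epoch, it sends that peer an UPDATE describing the current owner so the peer can correct its routing table: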
// Send UPDATE message to a specific node
void clusterSendUpdate(clusterLink *link, clusterNode *node) {
clusterMsg buf[1];
clusterMsg *hdr = (clusterMsg*) buf;
clusterBuildMessageHdr(hdr, CLUSTERMSG_TYPE_UPDATE);
memcpy(hdr->data.update.nodecfg.nodename, node->name, CLUSTER_NAMELEN);
hdr->data.update.nodecfg.configEpoch = htonu64(node->configEpoch);
memcpy(hdr->data.update.nodecfg.slots, node->slots, sizeof(node->slots));
clusterSendMessage(link, (unsigned char*)hdr, ntohl(hdr->totlen));
}
// Process incoming UPDATE message
void clusterProcessUpdateMessage(clusterMsg *hdr, clusterLink *link) {
clusterNode *node = clusterLookupNode(hdr->data.update.nodecfg.nodename);
uint64_t reportedConfigEpoch =
ntohu64(hdr->data.update.nodecfg.configEpoch);
// Ignore updates about nodes we do not know (avoids a NULL dereference)
if (node == NULL) return;
// Only accept updates with a higher epoch (prevents stale messages from
// overriding newer state)
if (reportedConfigEpoch <= node->configEpoch) return;
// Update local routing table: reassign the affected slots
clusterUpdateSlotsConfigWith(node, reportedConfigEpoch,
hdr->data.update.nodecfg.slots);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG |
CLUSTER_TODO_UPDATE_STATE |
CLUSTER_TODO_FSYNC_CONFIG);
}
23.8 Complete Message Type Reference
// cluster.h — all Gossip bus message types
#define CLUSTERMSG_TYPE_PING 0 // PING (with Gossip payload)
#define CLUSTERMSG_TYPE_PONG 1 // PONG (response to PING/MEET)
#define CLUSTERMSG_TYPE_MEET 2 // MEET (node joining the cluster)
#define CLUSTERMSG_TYPE_FAIL 3 // FAIL (failure broadcast)
#define CLUSTERMSG_TYPE_PUBLISH 4 // PUBLISH (Pub/Sub in cluster mode)
#define CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST 5 // Replica requests votes
#define CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK 6 // Master casts a vote
#define CLUSTERMSG_TYPE_UPDATE 7 // Config update (routing table change)
#define CLUSTERMSG_TYPE_MFSTART 8 // Manual failover start
#define CLUSTERMSG_TYPE_MODULE 9 // Module-defined message
#define CLUSTERMSG_TYPE_PUBLISHSHARD 10 // Sharded Pub/Sub (Redis 7.0)
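Because the type codes are small consecutive integers, they can index arrays directly, which is how the per-type counters in clusterState (stats_bus_messages_sent and stats_bus_messages_received) are laid out. The sketch below shows that pattern with a name table. The helper names are hypothetical, and the short labels are illustrative rather than quoted from the Redis source.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define CLUSTERMSG_TYPE_COUNT 11 /* types 0..10 from the list above */

/* Map a message type code to a short label; "unknown" if out of range. */
static const char *msg_type_name(int type) {
    static const char *names[CLUSTERMSG_TYPE_COUNT] = {
        "ping", "pong", "meet", "fail", "publish",
        "auth-req", "auth-ack", "update", "mfstart", "module", "publishshard"
    };
    if (type < 0 || type >= CLUSTERMSG_TYPE_COUNT) return "unknown";
    return names[type];
}

/* Hypothetical per-type counter block, indexed by message type. */
typedef struct {
    long long sent[CLUSTERMSG_TYPE_COUNT];
} bus_stats;

/* Count one sent message; out-of-range types are ignored. */
static void count_sent(bus_stats *s, int type) {
    assert(s != NULL);
    if (type >= 0 && type < CLUSTERMSG_TYPE_COUNT) s->sent[type]++;
}
```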
23.9 Cluster Operations Reference
# View cluster health
redis-cli -h 127.0.0.1 -p 7001 cluster info
# List all known nodes
redis-cli -h 127.0.0.1 -p 7001 cluster nodes
# View slot distribution
redis-cli -h 127.0.0.1 -p 7001 cluster slots
# Initiate manual failover (run on the target replica)
redis-cli -h 127.0.0.1 -p 7002 cluster failover
# Force failover without coordinating with the master (for when the master is unreachable)
redis-cli -h 127.0.0.1 -p 7002 cluster failover force
# Add a new node to the cluster
redis-cli -h 127.0.0.1 -p 7001 cluster meet 127.0.0.1 7006
# Reset a node (dangerous — removes all slot assignments)
redis-cli -h 127.0.0.1 -p 7006 cluster reset hard
# Force the node to save its cluster configuration (nodes.conf) to disk
redis-cli -h 127.0.0.1 -p 7001 cluster saveconfig
Gossip Performance Parameters
| Parameter | Default | Description |
|---|---|---|
| cluster-node-timeout | 15000ms | PFAIL timeout threshold |
| cluster-announce-ip | (none) | IP to announce to the cluster (for NAT) |
| cluster-announce-port | 0 | Client port to announce |
| cluster-announce-bus-port | 0 | Cluster bus port to announce |
| cluster-migration-barrier | 1 | Minimum replicas a master must keep before donating one |
| cluster-allow-reads-when-down | no | Allow read commands when cluster is in FAIL state |
| cluster-link-sendbuf-limit | 0 | Per-link send buffer size limit (0 = unlimited) |
Measured Gossip Overhead
- PING message size (30-node cluster, 3 gossip entries): roughly 2.5 KB, since the header alone carries the 2 KB slot bitmap and each gossip entry adds about 100 bytes
- PING frequency per node: about 1 PING per cluster-node-timeout/2 to each peer it has not heard from within that window, plus 1 random PING per second
- Failure detection latency (rough estimate): cluster-node-timeout/2 (idle time before a PING is sent) + cluster-node-timeout (unanswered PING, after which PFAIL is set) + ~500 ms (election delay) ≈ 23 seconds with default settings; collecting a quorum of failure reports through gossip adds to this
- Total election time (typical): 5–10 seconds with a 15-second timeout setting
- Gossip convergence (information spread to all N nodes): O(log N) rounds × 100 ms ≈ 1–3 seconds for typical 100-node clusters
Chapter Summary
- clusterState and clusterNode fully represent cluster state; the 16384-entry slots array is the routing core
- clusterCron (every 100ms) drives Gossip: timeout detection, random PING, orphaned master handling, slave failover checks
- PING messages carry Gossip entries for max(cluster_size/10, 3) random nodes, with PFAIL nodes always included
- The PFAIL → FAIL transition requires quorum (majority of masters agree), preventing false positives from network partitions
- FAIL messages are broadcast immediately upon confirmation rather than deferred to the next PING round
- Replica election uses FAILOVER_AUTH_REQUEST/ACK messages; the replica with the largest replication offset wins by getting a shorter election delay
- UPDATE messages propagate new routing information whenever the config epoch increases; receivers only accept updates with a strictly larger epoch
- The entire Gossip system is fully decentralized with no single point of coordination; information propagation time for N nodes is O(log N)