Chapter 23

Cluster Gossip Source: Node State Machine and Message Propagation

Chapter 23 — Cluster Gossip Source Code: Node State Machine and Message Propagation

Redis Cluster uses a Gossip protocol for decentralized node discovery and failure detection. This chapter walks through the Gossip implementation in cluster.c at the source-code level: node state machine transitions, PING/PONG message construction, failure broadcasting, and slot information propagation.


23.1 Cluster Data Structure Overview

clusterState: Global Cluster State

// cluster.h
/*
 * Process-wide view of the cluster as seen by this node (abridged).
 *
 * Besides the fields listed in the original excerpt, this version declares
 * lastVoteEpoch and stats_pfail_nodes, which the functions later in this
 * chapter (clusterSendFailoverAuth, clusterSendPing) read and write.
 */
typedef struct clusterState {
    clusterNode *myself;        // Pointer to this node's own descriptor
    uint64_t currentEpoch;      // Current epoch (globally monotonically increasing)
    int state;                  // CLUSTER_OK or CLUSTER_FAIL
    int size;                   // Number of master nodes with assigned slots
    dict *nodes;                // name → clusterNode* mapping (all known nodes)
    dict *nodes_black_list;     // Blacklist (prevents readmitting recently removed nodes)
    clusterNode *migrating_slots_to[CLUSTER_SLOTS];   // Slots being migrated out
    clusterNode *importing_slots_from[CLUSTER_SLOTS]; // Slots being imported
    clusterNode *slots[CLUSTER_SLOTS]; // Slot → master node mapping (16384 entries)
    uint64_t slots_keys_count[CLUSTER_SLOTS]; // Key count per slot
    rax *slots_to_keys;         // Slot → key list (Radix tree)
    // Failover state
    mstime_t failover_auth_time;    // Scheduled start time of the election (0 = none scheduled)
    int failover_auth_count;        // Votes received so far
    int failover_auth_sent;         // Whether election request was sent
    int failover_auth_rank;         // This slave's rank among candidates
    uint64_t failover_auth_epoch;   // Epoch of the ongoing election
    int cant_failover_reason;       // Reason code for why failover cannot proceed
    uint64_t lastVoteEpoch;         // Epoch in which this node last granted a failover vote
    // Statistics
    long long stats_bus_messages_sent[CLUSTERMSG_TYPE_COUNT];
    long long stats_bus_messages_received[CLUSTERMSG_TYPE_COUNT];
    long long stats_pfail_nodes;    // Number of nodes currently flagged PFAIL
                                    // (maintained outside this excerpt — TODO confirm where)
} clusterState;

clusterNode: Node Descriptor

/*
 * Descriptor for one cluster member as tracked by the local node (abridged).
 * One instance exists per known node, including the local node itself.
 */
typedef struct clusterNode {
    mstime_t ctime;             // Node object creation time; clusterCron compares it against the handshake timeout
    char name[CLUSTER_NAMELEN]; // Node ID (logged with %.40s, i.e. 40 characters)
    int flags;                  // Bitwise OR of CLUSTER_NODE_* flags
    uint64_t configEpoch;       // Config epoch (version number of slot ownership)
    unsigned char slots[CLUSTER_SLOTS/8]; // Bitmap of slots this node serves, one bit per slot
    int numslots;               // Number of slots served
    int numslaves;              // Number of replica nodes
    struct clusterNode **slaves; // Array of replica node pointers
    struct clusterNode *slaveof; // If replica, points to the master; NULL otherwise
    unsigned long long repl_offset; // Last known replication offset of this node
    mstime_t ping_sent;         // Time of the outstanding PING; 0 means no PING is pending (clusterCron tests for 0)
    mstime_t pong_received;     // Timestamp of last PONG received from this node
    mstime_t data_received;     // Timestamp of last data received from this node
    mstime_t fail_time;         // Timestamp when FAIL flag was set
    mstime_t voted_time;        // Timestamp of the last vote granted to this node
    mstime_t repl_offset_time;  // Timestamp when repl_offset was last updated
    mstime_t orphaned_time;     // Timestamp when node became an orphaned master
    char ip[NET_IP_STR_LEN];    // Node IP address
    int port;                   // Client port
    int pport;                  // Secondary client port (TLS per the original text — TODO confirm for your Redis version)
    int cport;                  // Cluster bus port (by default port + 10000)
    clusterLink *link;          // Connection used for cluster bus messages; NULL when not connected
    list *fail_reports;         // Failure reports received from other nodes about this node
} clusterNode;

Node Flag Bits

/* Bit masks stored in clusterNode.flags (one bit per property). */
#define CLUSTER_NODE_MASTER      0x0001  /* Node is a master */
#define CLUSTER_NODE_SLAVE       0x0002  /* Node is a replica */
#define CLUSTER_NODE_PFAIL       0x0004  /* Suspected failure: local assessment only */
#define CLUSTER_NODE_FAIL        0x0008  /* Confirmed failure: a quorum agreed */
#define CLUSTER_NODE_MYSELF      0x0010  /* Descriptor of the local node */
#define CLUSTER_NODE_HANDSHAKE   0x0020  /* Handshake still in progress */
#define CLUSTER_NODE_NOADDR      0x0040  /* Address of the node is unknown */
#define CLUSTER_NODE_MEET        0x0080  /* Send MEET (not PING) on the next contact */
#define CLUSTER_NODE_MIGRATE_TO  0x0100  /* Described here as a slot migration target.
                                          * NOTE(review): upstream Redis documents this bit as
                                          * "master eligible for replica migration" — confirm. */
#define CLUSTER_NODE_NOFAILOVER  0x0200  /* Replica must not attempt automatic failover */

23.2 clusterCron: The Gossip Heartbeat Timer

clusterCron is called by serverCron every 100ms and is the core driver of the Gossip protocol:

void clusterCron(void) {
    static long long iteration = 0;
    iteration++;  // Used to stagger subtasks at different frequencies

    // ─── Task 1: Handle timed-out nodes ───
    mstime_t now = mstime();
    mstime_t handshake_timeout = server.cluster_node_timeout;
    if (handshake_timeout < 1000) handshake_timeout = 1000;

    dictIterator *di = dictGetSafeIterator(server.cluster->nodes);
    dictEntry *de;
    int orphaned_masters = 0, max_slaves = 0, this_slaves = 0;

    while ((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);
        if (node->flags & (CLUSTER_NODE_MYSELF | CLUSTER_NODE_NOADDR)) continue;

        // Remove nodes stuck in handshake timeout
        if (nodeInHandshake(node) &&
            now - node->ctime > handshake_timeout) {
            clusterDelNode(node);
            continue;
        }

        // Ensure we have a connection; send MEET if not
        if (node->link == NULL) {
            clusterLink *link = createClusterLink(node);
            clusterSendPing(link, CLUSTERMSG_TYPE_MEET);
            continue;
        }

        // ─── PFAIL detection ───
        // Not yet PING'd but pong_received is stale → send PING
        if (node->ping_sent == 0 &&
            (now - node->pong_received) > server.cluster_node_timeout) {
            clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
            continue;
        }

        // PING has been outstanding for > timeout/2 without a PONG → PFAIL
        if (node->ping_sent != 0 &&
            (now - node->ping_sent) > server.cluster_node_timeout / 2) {
            clusterSetNodeAsFailing(node);  // Sets CLUSTER_NODE_PFAIL
        }

        // Orphaned master detection (for replica migration)
        if (nodeIsMaster(node) && node->numslots > 0) {
            if (node->numslaves == 0) orphaned_masters++;
            if (node->numslaves > max_slaves) max_slaves = node->numslaves;
            if (myself->slaveof == node) this_slaves = node->numslaves;
        }
    }
    dictReleaseIterator(di);

    // ─── Task 2: Random PING to ensure Gossip propagation ───
    // At least once per second, PING a random node (even if not timed out)
    if (!(iteration % 10)) {
        clusterNode *minpong_node = NULL;
        mstime_t minpong = 0;
        for (int j = 0; j < 5; j++) {
            de = dictGetRandomKey(server.cluster->nodes);
            clusterNode *this = dictGetVal(de);
            if (this->link == NULL || this->ping_sent != 0) continue;
            if (this->flags & (CLUSTER_NODE_MYSELF | CLUSTER_NODE_HANDSHAKE)) continue;
            if (minpong == 0 || this->pong_received < minpong) {
                minpong = this->pong_received;
                minpong_node = this;
            }
        }
        if (minpong_node) {
            clusterSendPing(minpong_node->link, CLUSTERMSG_TYPE_PING);
        }
    }

    // ─── Task 3: Replica migration for orphaned masters ───
    if (orphaned_masters && max_slaves >= 2 && this_slaves == max_slaves) {
        clusterHandleSlaveMigration(max_slaves, this_slaves);
    }

    // ─── Task 4: Check whether this slave should initiate failover ───
    if (nodeIsSlave(myself)) {
        clusterHandleSlaveFailover();
    }

    // ─── Task 5: Update our own state (slots, repl_offset) ───
    clusterUpdateState();
}

23.3 Building a PING Message: clusterSendPing

void clusterSendPing(clusterLink *link, int type) {
    // 1. Determine how many Gossip entries to include
    //    Target: max(cluster_size / 10, 3) entries, plus all PFAIL nodes
    int freshnodes = dictSize(server.cluster->nodes) - 2;
    int wanted = floor(dictSize(server.cluster->nodes) / 10);
    if (wanted < 3) wanted = 3;
    if (wanted > freshnodes) wanted = freshnodes;

    int pfail_wanted = server.cluster->stats_pfail_nodes;

    // 2. Compute message size
    int totlen = sizeof(clusterMsg) - sizeof(union clusterMsgData);
    totlen += sizeof(clusterMsgDataGossip) * (wanted + pfail_wanted);

    clusterMsg *hdr = zmalloc(totlen);
    memset(hdr, 0, sizeof(*hdr));

    // 3. Fill in the message header
    clusterBuildMessageHdr(hdr, type);

    // 4. Select and fill Gossip entries (random nodes)
    int gossipcount = 0;
    while (gossipcount < wanted) {
        de = dictGetRandomKey(server.cluster->nodes);
        clusterNode *this = dictGetVal(de);
        if (this == myself) continue;
        if (this->flags & CLUSTER_NODE_HANDSHAKE) continue;
        if (this->flags & CLUSTER_NODE_SELECTED) continue;  // No duplicates

        clusterMsgDataGossip *gossip =
            &(hdr->data.ping.gossip[gossipcount]);
        memcpy(gossip->nodename, this->name, CLUSTER_NAMELEN);
        gossip->ping_sent     = htonl(this->ping_sent / 1000);
        gossip->pong_received = htonl(this->pong_received / 1000);
        memcpy(gossip->ip, this->ip, sizeof(this->ip));
        gossip->port  = htons(this->port);
        gossip->cport = htons(this->cport);
        gossip->flags = htons(this->flags);

        this->flags |= CLUSTER_NODE_SELECTED;
        gossipcount++;
    }

    // 5. Force-include PFAIL nodes (ensure failure info propagates)
    if (pfail_wanted) {
        dictIterator *di = dictGetSafeIterator(server.cluster->nodes);
        while ((de = dictNext(di)) != NULL && pfail_wanted > 0) {
            clusterNode *this = dictGetVal(de);
            if (this->flags & CLUSTER_NODE_PFAIL) {
                // Fill the gossip entry for this pfail node
                // ... (same structure as above)
                pfail_wanted--;
                gossipcount++;
            }
        }
        dictReleaseIterator(di);
    }

    hdr->count  = htons(gossipcount);
    hdr->totlen = htonl(totlen);
    clusterSendMessage(link, (unsigned char*)hdr, totlen);
    zfree(hdr);
}

The clusterMsg Header

/*
 * Header of every cluster bus message, followed by a type-specific body.
 * This struct is sent on the wire, so field order and sizes must not change.
 * The senders in this chapter convert count and totlen with htons()/htonl().
 *
 * NOTE(review): upstream Redis shrank the reserved area when it added pport
 * so that the header size stayed constant; notused1[34] together with pport
 * looks like a mix of two versions — confirm against the Redis release you
 * are documenting.
 */
typedef struct {
    char sig[4];                    // "RCmb" (Redis Cluster message bus)
    uint32_t totlen;                // Total message length in bytes
    uint16_t ver;                   // Protocol version (currently 1)
    uint16_t port;                  // Sender's client port
    uint16_t type;                  // Message type (one of CLUSTERMSG_TYPE_*)
    uint16_t count;                 // Number of gossip entries (PING/PONG/MEET)
    uint64_t currentEpoch;          // Sender's current epoch
    uint64_t configEpoch;           // Sender's config epoch (for masters)
    uint64_t offset;                // Sender's replication offset
    char sender[CLUSTER_NAMELEN];   // Sender's node name (40-char hex ID)
    unsigned char myslots[CLUSTER_SLOTS/8]; // Sender's slot bitmap, one bit per slot
    char slaveof[CLUSTER_NAMELEN];  // Master's name if this is a replica
    char myip[NET_IP_STR_LEN];      // Sender's IP
    char notused1[34];              // Reserved bytes (see NOTE above)
    uint16_t pport;                 // Secondary client port (TLS per the original text)
    uint16_t cport;                 // Cluster bus port
    uint16_t flags;                 // Sender's CLUSTER_NODE_* flags
    unsigned char state;            // Cluster state (CLUSTER_OK/CLUSTER_FAIL)
    unsigned char mflags[3];        // Message flags
    union clusterMsgData data;      // Message body (varies by type)
} clusterMsg;

23.4 Receiving PING/PONG: clusterProcessGossipSection

/*
 * Processes the gossip entries carried by a received PING/PONG/MEET.
 *
 * For every entry about a node we already know, a report from a master
 * sender either records or clears that sender's failure report, and may
 * promote the node from PFAIL to FAIL. For an entry about an unknown node,
 * a handshake is started — but only when the sender itself is a known node,
 * so that a stranger's gossip cannot pull us into another cluster.
 *
 * hdr:  received message; hdr->count entries are read from its gossip array.
 * link: link the message arrived on (link->node may be NULL).
 * Returns 1.
 */
int clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
    uint16_t count = ntohs(hdr->count);
    clusterMsgDataGossip *g = (clusterMsgDataGossip*) hdr->data.ping.gossip;
    // Prefer the node bound to the link; otherwise resolve by name.
    // May still be NULL if the sender is not known to us.
    clusterNode *sender = link->node
                        ? link->node
                        : clusterLookupNode(hdr->sender);

    while (count--) {
        uint16_t flags = ntohs(g->flags);

        // 1. Look up the gossiped node locally
        clusterNode *node = clusterLookupNode(g->nodename);

        if (node) {
            // Only masters' opinions count, and never about ourselves.
            if (sender && nodeIsMaster(sender) && node != myself) {
                if (flags & (CLUSTER_NODE_FAIL | CLUSTER_NODE_PFAIL)) {
                    // 2. Sender considers the node failing: record its report
                    if (clusterNodeAddFailureReport(node, sender)) {
                        serverLog(LL_VERBOSE,
                            "Node %.40s reported node %.40s as not reachable.",
                            sender->name, node->name);
                    }
                    // 3. Check whether we now have quorum to mark FAIL
                    markNodeAsFailingIfNeeded(node);
                } else {
                    // Sender considers the node reachable: drop its report
                    if (clusterNodeDelFailureReport(node, sender)) {
                        serverLog(LL_VERBOSE,
                            "Node %.40s reported node %.40s is back online.",
                            sender->name, node->name);
                    }
                }
            }
        } else {
            // 4. Unknown node: start a handshake, but only on the word of a
            //    sender we already know, and only if the node has an address
            //    and is not blacklisted.
            if (sender &&
                !(flags & CLUSTER_NODE_NOADDR) &&
                !clusterBlacklistExists(g->nodename)) {
                clusterStartHandshake(g->ip, ntohs(g->port), ntohs(g->cport));
            }
        }
        g++;
    }
    return 1;
}

markNodeAsFailingIfNeeded: FAIL Determination Logic

/*
 * Promotes `node` from PFAIL to FAIL when enough masters agree.
 *
 * Nothing happens unless we ourselves consider the node timed out and it is
 * not already FAIL. Agreement is the number of failure reports held for the
 * node, plus one for ourselves when we are a master; it must reach
 * (cluster size / 2) + 1. On promotion the FAIL time is recorded, a master
 * broadcasts FAIL right away, and a state update plus config save are
 * scheduled.
 */
void markNodeAsFailingIfNeeded(clusterNode *node) {
    // Candidate only if locally timed out and not yet confirmed failed.
    if (!nodeTimedOut(node) || nodeFailed(node)) return;

    const int quorum = (server.cluster->size / 2) + 1;

    // Reports from other masters, plus our own opinion if we are a master.
    int votes = clusterNodeFailureReportsCount(node);
    if (nodeIsMaster(myself)) votes += 1;

    if (votes < quorum) return;

    serverLog(LL_NOTICE,
        "Marking node %.40s as failing (quorum reached).", node->name);

    // PFAIL → FAIL in one step.
    node->flags = (node->flags & ~CLUSTER_NODE_PFAIL) | CLUSTER_NODE_FAIL;
    node->fail_time = mstime();

    // Tell everyone now instead of waiting for gossip to carry the news.
    if (nodeIsMaster(myself)) {
        clusterSendFail(node->name);
    }
    clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_SAVE_CONFIG);
}

23.5 The Node State Machine in Detail

State Transition Diagram

                   ┌──────────────────────────────────────┐
                   │              NORMAL                    │
                   │  (no PFAIL or FAIL flag set)          │
                   └────────────────┬─────────────────────┘
                                    │
                     This node exceeds cluster-node-timeout
                     without any response from target node
                                    │
                                    ▼
                   ┌──────────────────────────────────────┐
                   │              PFAIL                    │
                   │  (CLUSTER_NODE_PFAIL bit set)         │
                   │  Local view; also gossiped in PINGs    │
                   └────────────────┬─────────────────────┘
                                    │
                     fail_reports >= quorum
                     (majority of masters agree)
                                    │
                                    ▼
                   ┌──────────────────────────────────────┐
                   │              FAIL                     │
                   │  (CLUSTER_NODE_FAIL bit set)          │
                   │  FAIL message broadcast to all nodes  │
                   │  Triggers replica election            │
                   └────────────────┬─────────────────────┘
                                    │
           ┌────────────────────────┼───────────────────────┐
           │                        │                       │
    Failover completes         Auto-timeout clears     Manual CLUSTER RESET
    New master broadcasts      FAIL flag               (admin operation)
    UPDATE message             (FAIL age > timeout*2)
           │                        │
           ▼                        ▼
       NORMAL                   NORMAL

Automatic FAIL State Recovery

// Fragment shown as part of clusterCron: drop a FAIL flag that has been set
// for longer than cluster_node_timeout * CLUSTER_FAIL_EXPIRATION_FACTOR.
// `node` is the clusterNode being examined by the enclosing code.
// NOTE(review): the log text speaks of reachability and slot ownership, but
// this condition tests only the age of the FAIL flag — confirm intent.
if (nodeFailed(node) &&
    mstime() - node->fail_time >
        (server.cluster_node_timeout * CLUSTER_FAIL_EXPIRATION_FACTOR)) {
    // Window elapsed: clear the flag so the node can be retried.
    node->flags &= ~CLUSTER_NODE_FAIL;
    clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE |
                         CLUSTER_TODO_SAVE_CONFIG);
    serverLog(LL_NOTICE,
        "Clear FAIL state for node %.40s: "
        "is reachable again and nobody is serving its slots after some time.",
        node->name);
}

23.6 Failover: Replica Election

/*
 * Replica-side failover driver, called on each clusterCron tick while this
 * node is a replica (simplified excerpt).
 *
 * It moves through these steps across successive calls:
 *   1. schedule an election (fixed delay + jitter + rank-based delay),
 *      starting from a clean vote count;
 *   2. wait until the scheduled time;
 *   3. give up and reschedule if the election window has expired;
 *   4. bump the epoch and broadcast the vote request (once);
 *   5. take over as master once a majority of masters has voted for us.
 */
void clusterHandleSlaveFailover(void) {
    mstime_t now = mstime();

    // Prerequisites: we are a replica whose master is confirmed FAIL
    if (nodeIsMaster(myself)) return;
    if (myself->slaveof == NULL) return;
    if (!nodeFailed(myself->slaveof)) return;

    // An election may run for auth_retry_time before it is abandoned.
    mstime_t auth_timeout = server.cluster_node_timeout * 2;
    mstime_t auth_retry_time = auth_timeout * 2;

    if (server.cluster->failover_auth_time == 0) {
        // Schedule a new election
        server.cluster->failover_auth_time = now +
            500 +           // Fixed 500ms wait (time for FAIL message to propagate)
            random() % 500; // Random jitter to reduce simultaneous elections

        // Votes collected for a previous, abandoned election must not count
        // towards this one, and the request must be sent afresh.
        server.cluster->failover_auth_count = 0;
        server.cluster->failover_auth_sent = 0;

        // Rank-based delay: rank 0 = most up-to-date replica = no extra
        // wait; each rank below adds one second.
        server.cluster->failover_auth_rank = clusterGetSlaveRank();
        server.cluster->failover_auth_time +=
            server.cluster->failover_auth_rank * 1000;
        return;
    }

    // Not yet time
    if (now < server.cluster->failover_auth_time) return;

    // Election window expired without enough votes — reset for retry
    if (now > server.cluster->failover_auth_time + auth_retry_time) {
        server.cluster->failover_auth_time = 0;
        server.cluster->failover_auth_sent = 0;
        return;
    }

    // Ask all masters for their vote (only once per election)
    if (!server.cluster->failover_auth_sent) {
        server.cluster->currentEpoch++;
        server.cluster->failover_auth_epoch = server.cluster->currentEpoch;
        serverLog(LL_WARNING,
            "Starting a failover election for epoch %llu.",
            (unsigned long long)server.cluster->currentEpoch);
        clusterBroadcastMessage(NULL, CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST);
        server.cluster->failover_auth_sent = 1;
        return;
    }

    // Won: a majority of masters voted for us
    if (server.cluster->failover_auth_count >=
        server.cluster->size / 2 + 1) {
        serverLog(LL_WARNING,
            "Failover election won: I'm the new master.");
        clusterFailoverReplaceYourMaster();
    }
}

How Master Nodes Cast Votes

// Handling FAILOVER_AUTH_REQUEST on a master node
/*
 * Grants our failover vote to replica `node` by sending FAILOVER_AUTH_ACK,
 * provided that:
 *   1. we are a master;
 *   2. we have not already voted in the current epoch;
 *   3. `node` is a replica whose master we consider FAIL;
 *   4. we have a link to `node` to send the vote on.
 * The vote is recorded only after all checks pass, so a vote that cannot be
 * delivered is not used up.
 *
 * Checks that need the content of the request message (for example the
 * requester's epochs and claimed slots) are not part of this excerpt.
 */
void clusterSendFailoverAuth(clusterNode *node) {
    clusterMsg buf[1];
    clusterMsg *hdr = (clusterMsg*) buf;

    if (!nodeIsMaster(myself)) return;
    if (server.cluster->lastVoteEpoch == server.cluster->currentEpoch) return;

    // Only a replica of a failed master may be voted for.
    if (!nodeIsSlave(node) || node->slaveof == NULL ||
        !nodeFailed(node->slaveof)) return;

    // Nothing to send on.
    if (node->link == NULL) return;

    // Cast vote (send FAILOVER_AUTH_ACK)
    server.cluster->lastVoteEpoch = server.cluster->currentEpoch;
    node->voted_time = mstime();
    clusterBuildMessageHdr(hdr, CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK);
    clusterSendMessage(node->link, (unsigned char*)hdr, ntohl(hdr->totlen));
    serverLog(LL_WARNING,
        "Sending FAILOVER_AUTH_ACK to %.40s", node->name);
}

23.7 Slot Information Propagation: UPDATE Messages

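After a failover completes, every node's routing table must converge on the new slot ownership. When a node notices that a peer is still advertising a stale configuration for some slots, it sends that peer an UPDATE message carrying the configuration of the node that currently owns them: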

// Send UPDATE message to a specific node
/*
 * Sends an UPDATE message over `link` describing `node`'s current
 * configuration: its name, config epoch and slot bitmap.
 *
 * link: destination link; the call is a no-op when it is NULL (the peer is
 *       not connected at the moment).
 * node: node whose configuration is advertised.
 */
void clusterSendUpdate(clusterLink *link, clusterNode *node) {
    clusterMsg buf[1];
    clusterMsg *hdr = (clusterMsg*) buf;

    if (link == NULL) return;

    clusterBuildMessageHdr(hdr, CLUSTERMSG_TYPE_UPDATE);
    memcpy(hdr->data.update.nodecfg.nodename, node->name, CLUSTER_NAMELEN);
    hdr->data.update.nodecfg.configEpoch = htonu64(node->configEpoch);
    memcpy(hdr->data.update.nodecfg.slots, node->slots, sizeof(node->slots));
    // Length is the one the header builder stored, converted back to host order.
    clusterSendMessage(link, (unsigned char*)hdr, ntohl(hdr->totlen));
}

// Process incoming UPDATE message
/*
 * Applies a received UPDATE message to the local routing table.
 *
 * The message names a node and carries that node's config epoch and slot
 * bitmap. It is ignored when the node is unknown to us or when the reported
 * epoch is not newer than the one we already hold. Otherwise the new epoch
 * is stored, the slots are reassigned, and a config save (with fsync) and
 * state update are scheduled.
 *
 * hdr:  received UPDATE message.
 * link: link the message arrived on (unused here).
 */
void clusterProcessUpdateMessage(clusterMsg *hdr, clusterLink *link) {
    clusterNode *node = clusterLookupNode(hdr->data.update.nodecfg.nodename);
    uint64_t reportedConfigEpoch =
        ntohu64(hdr->data.update.nodecfg.configEpoch);

    // The update is about a node we have never heard of: nothing to apply.
    if (node == NULL) return;

    // Only accept updates with a higher epoch (prevents stale messages from
    // overriding newer state)
    if (reportedConfigEpoch <= node->configEpoch) return;

    // Remember the accepted epoch; otherwise the check above would keep
    // letting the same (or an older-but-still-higher) update through.
    node->configEpoch = reportedConfigEpoch;

    // Update local routing table: reassign the affected slots
    clusterUpdateSlotsConfigWith(node, reportedConfigEpoch,
                                 hdr->data.update.nodecfg.slots);

    clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG |
                         CLUSTER_TODO_UPDATE_STATE |
                         CLUSTER_TODO_FSYNC_CONFIG);
}

23.8 Complete Message Type Reference

// cluster.h — all Gossip bus message types
/* Cluster bus message type codes (value of clusterMsg.type). */
#define CLUSTERMSG_TYPE_PING                  0   /* Heartbeat carrying a gossip section */
#define CLUSTERMSG_TYPE_PONG                  1   /* Reply to PING or MEET */
#define CLUSTERMSG_TYPE_MEET                  2   /* Introduces a node to the cluster */
#define CLUSTERMSG_TYPE_FAIL                  3   /* Announces that a node is FAIL */
#define CLUSTERMSG_TYPE_PUBLISH               4   /* Pub/Sub message forwarded across the cluster */
#define CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST 5   /* Replica asks masters for their vote */
#define CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK     6   /* Master grants its vote */
#define CLUSTERMSG_TYPE_UPDATE                7   /* Corrects another node's slot configuration */
#define CLUSTERMSG_TYPE_MFSTART               8   /* Starts a manual failover */
#define CLUSTERMSG_TYPE_MODULE                9   /* Payload defined by a module */
#define CLUSTERMSG_TYPE_PUBLISHSHARD          10  /* Sharded Pub/Sub (Redis 7.0) */

23.9 Cluster Operations Reference

# View cluster health (state, slot coverage, epochs, bus message counters)
redis-cli -h 127.0.0.1 -p 7001 cluster info

# List all known nodes with their IDs, addresses, flags and slots
redis-cli -h 127.0.0.1 -p 7001 cluster nodes

# View slot distribution
redis-cli -h 127.0.0.1 -p 7001 cluster slots

# Initiate manual failover (run on the target replica); coordinated with the
# master, so this is the form to use for planned maintenance
redis-cli -h 127.0.0.1 -p 7002 cluster failover

# Force failover without the master's cooperation (for when the master is
# unreachable and cannot take part in the handshake)
redis-cli -h 127.0.0.1 -p 7002 cluster failover force

# Add a new node to the cluster
redis-cli -h 127.0.0.1 -p 7001 cluster meet 127.0.0.1 7006

# Reset a node (dangerous — removes all slot assignments)
redis-cli -h 127.0.0.1 -p 7006 cluster reset hard

# Force the node to write its cluster configuration (nodes.conf) to disk
redis-cli -h 127.0.0.1 -p 7001 cluster saveconfig

Gossip Performance Parameters

| Parameter | Default | Description |
|---|---|---|
| cluster-node-timeout | 15000 ms | Time without a reply after which a peer is flagged PFAIL |
| cluster-announce-ip | (none) | IP to announce to the cluster (for NAT) |
| cluster-announce-port | 0 | Client port to announce |
| cluster-announce-bus-port | 0 | Cluster bus port to announce |
| cluster-migration-barrier | 1 | Minimum replicas a master must keep before donating one |
| cluster-allow-reads-when-down | no | Allow read commands when cluster is in FAIL state |
| cluster-link-sendbuf-limit | 0 | Per-link send buffer size limit (0 = unlimited) |

Measured Gossip Overhead


Chapter Summary

Rate this chapter
4.8  / 5  (7 ratings)

💬 Comments