第 23 章

Cluster Gossip 源码:节点状态机与消息传播

第23章 Cluster Gossip 源码:节点状态机与消息传播

Redis Cluster 使用 Gossip 协议实现去中心化的节点发现和故障检测。本章深入 cluster.c 的 Gossip 实现,分析节点状态机转换、PING/PONG 消息构造、故障广播和槽信息传播的完整源码流程。


23.1 集群数据结构总览

clusterState:集群全局状态

// cluster.h
typedef struct clusterState {
    clusterNode *myself;        // 指向自身节点的指针
    uint64_t currentEpoch;      // 当前纪元(全局单调递增)
    int state;                  // CLUSTER_OK 或 CLUSTER_FAIL
    int size;                   // 集群中有槽的主节点数量
    dict *nodes;                // name → clusterNode* 映射(所有已知节点)
    dict *nodes_black_list;     // 黑名单(防止重复加入)
    clusterNode *migrating_slots_to[CLUSTER_SLOTS];   // 正在迁出的槽
    clusterNode *importing_slots_from[CLUSTER_SLOTS]; // 正在迁入的槽
    clusterNode *slots[CLUSTER_SLOTS]; // 槽 → 主节点映射(16384个)
    uint64_t slots_keys_count[CLUSTER_SLOTS]; // 每个槽的 key 数量
    rax *slots_to_keys;         // 槽 → key 列表(Radix Tree)
    // 故障转移状态
    mstime_t failover_auth_time;    // 上次发起选举的时间
    int failover_auth_count;        // 收到的选票数
    int failover_auth_sent;         // 是否已发起过选举
    int failover_auth_rank;         // 当前从库的排名
    uint64_t failover_auth_epoch;   // 选举纪元
    int cant_failover_reason;       // 无法故障转移的原因码
    // 统计
    long long stats_bus_messages_sent[CLUSTERMSG_TYPE_COUNT];
    long long stats_bus_messages_received[CLUSTERMSG_TYPE_COUNT];
} clusterState;

clusterNode:节点描述符

typedef struct clusterNode {
    mstime_t ctime;             // 节点创建时间
    char name[CLUSTER_NAMELEN]; // 节点名(40字节十六进制 ID)
    int flags;                  // 节点标志位(见下方状态机)
    uint64_t configEpoch;       // 节点的配置纪元(槽所有权版本号)
    unsigned char slots[CLUSTER_SLOTS/8]; // 该节点负责的槽位图(2KB)
    int numslots;               // 负责的槽数量
    int numslaves;              // 从节点数量
    struct clusterNode **slaves; // 从节点数组
    struct clusterNode *slaveof; // 如果是从节点,指向主节点
    unsigned long long repl_offset; // 复制偏移量(从节点迁移决策用)
    mstime_t ping_sent;         // 上次发送 PING 的时间
    mstime_t pong_received;     // 上次收到 PONG 的时间
    mstime_t data_received;     // 上次收到任何数据的时间
    mstime_t fail_time;         // 被标记为 FAIL 的时间
    mstime_t voted_time;        // 上次投票的时间
    mstime_t repl_offset_time;  // repl_offset 更新时间
    mstime_t orphaned_time;     // 成为孤立主节点的时间
    long long repl_offset;      // 已知的复制偏移量
    char ip[NET_IP_STR_LEN];    // 节点 IP
    int port;                   // 节点端口(客户端端口)
    int pport;                  // 节点 TLS 端口
    int cport;                  // 集群通信端口(port + 10000)
    clusterLink *link;          // TCP 连接(用于发送 cluster bus 消息)
    list *fail_reports;         // 报告该节点 PFAIL 的节点列表
} clusterNode;

节点标志位(flags 字段)

#define CLUSTER_NODE_MASTER      (1<<0)  // 主节点
#define CLUSTER_NODE_SLAVE       (1<<1)  // 从节点
#define CLUSTER_NODE_PFAIL       (1<<2)  // 疑似故障(自己判断超时)
#define CLUSTER_NODE_FAIL        (1<<3)  // 确认故障(多节点同意)
#define CLUSTER_NODE_MYSELF      (1<<4)  // 代表自身
#define CLUSTER_NODE_HANDSHAKE   (1<<5)  // 正在进行握手
#define CLUSTER_NODE_NOADDR      (1<<6)  // 地址未知
#define CLUSTER_NODE_MEET        (1<<7)  // 需要发送 MEET
#define CLUSTER_NODE_MIGRATE_TO  (1<<8)  // 是槽迁移的目标
#define CLUSTER_NODE_NOFAILOVER  (1<<9)  // 禁止自动故障转移(slave-no-failover)

23.2 clusterCron:Gossip 心跳定时器

clusterCron 每 100ms 由 serverCron 触发一次,是 Gossip 协议的核心驱动函数:

void clusterCron(void) {
    static long long iteration = 0;
    iteration++;  // 用于控制不同频率的子任务

    // ─── 任务1:处理超时节点 ───
    mstime_t now = mstime();
    mstime_t handshake_timeout = server.cluster_node_timeout;
    if (handshake_timeout < 1000) handshake_timeout = 1000;

    dictIterator *di = dictGetSafeIterator(server.cluster->nodes);
    dictEntry *de;
    int orphaned_masters = 0, max_slaves = 0, this_slaves = 0;

    while ((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);
        if (node->flags & (CLUSTER_NODE_MYSELF | CLUSTER_NODE_NOADDR)) continue;

        // 握手超时:删除节点
        if (nodeInHandshake(node) &&
            now - node->ctime > handshake_timeout) {
            clusterDelNode(node);
            continue;
        }

        // 确保已连接
        if (node->link == NULL) {
            clusterLink *link = createClusterLink(node);
            clusterSendPing(link, CLUSTERMSG_TYPE_MEET);
            continue;
        }

        // ─── PFAIL 检测 ───
        // 超过 cluster-node-timeout 未收到 PONG → 标记 PFAIL
        if (node->ping_sent == 0 &&
            (now - node->pong_received) > server.cluster_node_timeout) {
            clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
            continue;
        }

        if (node->ping_sent != 0 &&
            (now - node->ping_sent) > server.cluster_node_timeout / 2) {
            // PING 超过 timeout/2 未收到 PONG → PFAIL
            clusterSetNodeAsFailing(node);  // 设置 CLUSTER_NODE_PFAIL
        }

        // 孤立主节点检测(用于从节点迁移)
        if (nodeIsMaster(node) && node->numslots > 0) {
            if (node->numslaves == 0) {
                orphaned_masters++;
            }
            if (node->numslaves > max_slaves) max_slaves = node->numslaves;
            if (myself->slaveof == node)    this_slaves = node->numslaves;
        }
    }
    dictReleaseIterator(di);

    // ─── 任务2:随机选节点发送 PING(保证 Gossip 传播)───
    // 每秒至少向一个随机节点发 PING
    if (!(iteration % 10)) {
        int j;
        for (j = 0; j < 5; j++) {
            de = dictGetRandomKey(server.cluster->nodes);
            clusterNode *this = dictGetVal(de);
            if (this->link == NULL || this->ping_sent != 0) continue;
            if (this->flags & (CLUSTER_NODE_MYSELF | CLUSTER_NODE_HANDSHAKE)) continue;
            // 选出最久未 PING 的随机节点
            if (minpong == 0 || this->pong_received < minpong) {
                minpong = this->pong_received;
                minpong_node = this;
            }
        }
        if (minpong_node) {
            clusterSendPing(minpong_node->link, CLUSTERMSG_TYPE_PING);
        }
    }

    // ─── 任务3:孤立主节点的从节点迁移 ───
    if (orphaned_masters && max_slaves >= 2 && this_slaves == max_slaves) {
        clusterHandleSlaveMigration(max_slaves, this_slaves);
    }

    // ─── 任务4:从节点检查是否需要发起故障转移 ───
    if (nodeIsSlave(myself)) {
        clusterHandleSlaveFailover();
    }

    // ─── 任务5:更新自身状态(slots、repl_offset)───
    clusterUpdateState();
}

23.3 PING 消息构造:clusterSendPing

void clusterSendPing(clusterLink *link, int type) {
    // 1. 确定 Gossip 节点数量
    //    携带 min(1/10 * cluster_size, 3 + pfail_count) 个节点信息
    int freshnodes = dictSize(server.cluster->nodes) - 2;
    int wanted = floor(dictSize(server.cluster->nodes) / 10);
    if (wanted < 3) wanted = 3;
    if (wanted > freshnodes) wanted = freshnodes;

    // pfail 节点强制加入(传播故障信息)
    int pfail_wanted = server.cluster->stats_pfail_nodes;

    // 2. 计算消息体大小
    int totlen = sizeof(clusterMsg) - sizeof(union clusterMsgData);
    totlen += sizeof(clusterMsgDataGossip) * (wanted + pfail_wanted);

    clusterMsg *hdr = zmalloc(totlen);
    memset(hdr, 0, sizeof(*hdr));

    // 3. 填写消息头
    clusterBuildMessageHdr(hdr, type);

    // 4. 填写 Gossip 节点信息
    int gossipcount = 0;
    while (gossipcount < wanted) {
        // 随机选节点(排除自身和已选中的)
        de = dictGetRandomKey(server.cluster->nodes);
        clusterNode *this = dictGetVal(de);
        if (this == myself) continue;
        if (this->flags & CLUSTER_NODE_HANDSHAKE) continue;
        // 避免重复选择
        if (this->flags & CLUSTER_NODE_SELECTED) continue;

        // 填写 gossip 信息
        clusterMsgDataGossip *gossip =
            &(hdr->data.ping.gossip[gossipcount]);
        memcpy(gossip->nodename, this->name, CLUSTER_NAMELEN);
        gossip->ping_sent = htonl(this->ping_sent / 1000);
        gossip->pong_received = htonl(this->pong_received / 1000);
        memcpy(gossip->ip, this->ip, sizeof(this->ip));
        gossip->port = htons(this->port);
        gossip->cport = htons(this->cport);
        gossip->flags = htons(this->flags);
        gossip->notused1 = 0;

        this->flags |= CLUSTER_NODE_SELECTED;
        gossipcount++;
    }

    // 5. 强制加入 PFAIL 节点(确保故障信息传播)
    if (pfail_wanted) {
        dictIterator *di = dictGetSafeIterator(server.cluster->nodes);
        while ((de = dictNext(di)) != NULL && pfail_wanted > 0) {
            clusterNode *this = dictGetVal(de);
            if (this->flags & CLUSTER_NODE_PFAIL) {
                // 填写 pfail 节点的 gossip 信息
                // ...
                pfail_wanted--;
                gossipcount++;
            }
        }
        dictReleaseIterator(di);
    }

    hdr->count = htons(gossipcount);
    hdr->totlen = htonl(totlen);
    clusterSendMessage(link, (unsigned char*)hdr, totlen);
    zfree(hdr);
}

clusterMsg 消息头

typedef struct {
    char sig[4];                    // "RCmb"(Redis Cluster message bus)
    uint32_t totlen;                // 消息总长度
    uint16_t ver;                   // 协议版本(当前为1)
    uint16_t port;                  // 发送方端口
    uint16_t type;                  // 消息类型(PING/PONG/MEET/FAIL/...)
    uint16_t count;                 // gossip 节点数量
    uint64_t currentEpoch;          // 发送方当前纪元
    uint64_t configEpoch;           // 发送方配置纪元(主节点)
    uint64_t offset;                // 复制偏移量(从节点)
    char sender[CLUSTER_NAMELEN];   // 发送方节点名
    unsigned char myslots[CLUSTER_SLOTS/8]; // 发送方负责的槽位图(2KB)
    char slaveof[CLUSTER_NAMELEN];  // 如果是从节点,主节点名
    char myip[NET_IP_STR_LEN];      // 发送方 IP
    char notused1[34];
    uint16_t pport;                 // TLS 端口
    uint16_t cport;                 // 集群总线端口
    uint16_t flags;                 // 发送方 flags
    unsigned char state;            // 集群状态(CLUSTER_OK/CLUSTER_FAIL)
    unsigned char mflags[3];        // 消息标志位
    union clusterMsgData data;      // 消息体
} clusterMsg;

23.4 接收 PING/PONG:clusterProcessGossipSection

int clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
    uint16_t count = ntohs(hdr->count);
    clusterMsgDataGossip *g = (clusterMsgDataGossip*) hdr->data.ping.gossip;
    clusterNode *sender = link->node ? link->node : clusterLookupNode(hdr->sender);

    while (count--) {
        uint16_t flags = ntohs(g->flags);
        sds ci = clusterGetMessageFlagsString(flags);

        // 1. 查找本地是否已知该节点
        clusterNode *node = clusterLookupNode(g->nodename);

        if (node) {
            // 2. 已知节点:更新时间戳
            if (sender && nodeIsMaster(sender) && node != myself) {
                if (flags & (CLUSTER_NODE_FAIL | CLUSTER_NODE_PFAIL)) {
                    // 3. Gossip 报告该节点故障 → 添加到 fail_reports
                    if (clusterNodeAddFailureReport(node, sender)) {
                        serverLog(LL_VERBOSE,
                            "Node %.40s reported node %.40s as not reachable.",
                            sender->name, node->name);
                    }
                    // 4. 检查是否满足 FAIL 判定条件
                    markNodeAsFailingIfNeeded(node);
                } else {
                    // 节点报告为正常 → 清除 fail_reports 中该 sender 的记录
                    if (clusterNodeDelFailureReport(node, sender)) {
                        serverLog(LL_VERBOSE,
                            "Node %.40s reported node %.40s is back online.",
                            sender->name, node->name);
                    }
                }
            }

            // 如果 gossip 说该节点可达但我们标记为 PFAIL → 清除 PFAIL
            if (!(flags & (CLUSTER_NODE_FAIL | CLUSTER_NODE_PFAIL)) &&
                node->numfailreports) {
                // 清除可能过期的故障报告
            }
        } else {
            // 5. 未知节点:尝试加入集群(需要 MEET 或已有足够信任)
            if (!(flags & CLUSTER_NODE_NOADDR) && !clusterBlacklistExists(g->nodename)) {
                clusterStartHandshake(g->ip, ntohs(g->port), ntohs(g->cport));
            }
        }

        sdsfree(ci);
        g++;
    }
    return 1;
}

markNodeAsFailingIfNeeded:FAIL 判定逻辑

void markNodeAsFailingIfNeeded(clusterNode *node) {
    int failures;
    int needed_quorum = (server.cluster->size / 2) + 1;

    // 不是 PFAIL 则跳过(我自己也必须认为它 PFAIL)
    if (!nodeTimedOut(node)) return;
    // 已经是 FAIL 则跳过
    if (nodeFailed(node)) return;

    // 统计有效的故障报告数(排除过期报告)
    failures = clusterNodeFailureReportsCount(node);

    // 自己也算一票
    if (nodeIsMaster(myself)) failures++;

    // 未达到 quorum 则不能标记 FAIL
    if (failures < needed_quorum) return;

    serverLog(LL_NOTICE,
        "Marking node %.40s as failing (quorum reached).", node->name);

    // 标记 FAIL(清除 PFAIL,设置 FAIL + fail_time)
    node->flags &= ~CLUSTER_NODE_PFAIL;
    node->flags |= CLUSTER_NODE_FAIL;
    node->fail_time = mstime();

    // 立即广播 FAIL 消息(不等下次 PING 轮次)
    if (nodeIsMaster(myself)) clusterSendFail(node->name);
    clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_SAVE_CONFIG);
}

23.5 节点状态机详解

状态转换图

                   ┌──────────────────────────────────────┐
                   │              NORMAL                    │
                   │  (flags 不含 PFAIL/FAIL)              │
                   └────────────────┬─────────────────────┘
                                    │
                     自己超过 cluster-node-timeout
                     未收到该节点任何响应
                                    │
                                    ▼
                   ┌──────────────────────────────────────┐
                   │              PFAIL                    │
                   │  (CLUSTER_NODE_PFAIL 标志位)          │
                   │  仅自己认为,未广播                    │
                   └────────────────┬─────────────────────┘
                                    │
                     fail_reports >= quorum
                     (多个主节点同意)
                                    │
                                    ▼
                   ┌──────────────────────────────────────┐
                   │              FAIL                     │
                   │  (CLUSTER_NODE_FAIL 标志位)           │
                   │  广播 FAIL 消息给所有节点             │
                   │  触发从节点发起选举                    │
                   └────────────────┬─────────────────────┘
                                    │
           ┌────────────────────────┼───────────────────────┐
           │                        │                       │
    故障转移成功               超时自动清除               手动 CLUSTER RESET
    新主节点广播                (FAIL 持续超过             (管理员操作)
    UPDATE 消息                 cluster-node-timeout * 2)
           │                        │
           ▼                        ▼
       NORMAL                   NORMAL

FAIL 状态自动恢复

// clusterCron 中:FAIL 状态超时后自动恢复
if (nodeFailed(node)) {
    mstime_t now = mstime();
    // 如果 FAIL 超过 2 * cluster_node_timeout + 4 * cluster_node_timeout 还没恢复
    // 并且节点有从节点(说明集群有冗余),则自动清除 FAIL
    if (now - node->fail_time >
        (server.cluster_node_timeout * CLUSTER_FAIL_EXPIRATION_FACTOR)) {
        node->flags &= ~CLUSTER_NODE_FAIL;
        clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_SAVE_CONFIG);
        serverLog(LL_NOTICE,
            "Clear FAIL state for node %.40s: "
            "is reachable again and nobody is serving its slots after some time.",
            node->name);
    }
}

23.6 故障转移:从节点选举

void clusterHandleSlaveFailover(void) {
    mstime_t now = mstime();

    // 条件检查
    if (nodeIsMaster(myself)) return;
    if (myself->slaveof == NULL) return;
    if (!nodeFailed(myself->slaveof)) return;  // 主节点必须是 FAIL 状态

    // 计算选举延迟(避免多个从节点同时发起)
    // 复制偏移量越大的从节点,延迟越小(优先成为新主节点)
    mstime_t auth_timeout = server.cluster_node_timeout * 2;
    mstime_t auth_retry_time = auth_timeout * 2;

    if (server.cluster->failover_auth_time == 0) {
        // 首次发起:设置选举时间(加上随机延迟防止分裂脑)
        server.cluster->failover_auth_time = now +
            500 +  // 固定等待500ms(等待 FAIL 消息传播)
            random() % 500;  // 额外随机延迟

        // 基于复制偏移量的优先级延迟
        // rank 越小(数据越新),延迟越小
        server.cluster->failover_auth_rank = clusterGetSlaveRank();
        server.cluster->failover_auth_time +=
            server.cluster->failover_auth_rank * 1000;
        return;
    }

    // 等待选举时机
    if (now < server.cluster->failover_auth_time) return;

    // 超时未收到足够选票,重试
    if (now > server.cluster->failover_auth_time + auth_retry_time) {
        server.cluster->failover_auth_time = 0;
        server.cluster->failover_auth_sent = 0;
        return;
    }

    // 发送 FAILOVER_AUTH_REQUEST 给所有主节点
    if (!server.cluster->failover_auth_sent) {
        server.cluster->currentEpoch++;
        server.cluster->failover_auth_epoch = server.cluster->currentEpoch;
        serverLog(LL_WARNING, "Starting a failover election for epoch %llu.",
            (unsigned long long)server.cluster->currentEpoch);
        clusterBroadcastMessage(NULL, CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST);
        server.cluster->failover_auth_sent = 1;
        return;
    }

    // 检查是否收到足够票数(quorum)
    if (server.cluster->failover_auth_count >= server.cluster->size / 2 + 1) {
        serverLog(LL_WARNING,
            "Failover election won: I'm the new master.");
        // 成为新主节点
        clusterFailoverReplaceYourMaster();
    }
}

23.7 槽信息传播:UPDATE 消息

故障转移完成后,新主节点需要通知集群更新路由表:

// 发送 UPDATE 消息
void clusterSendUpdate(clusterLink *link, clusterNode *node) {
    clusterMsg buf[1];
    clusterMsg *hdr = (clusterMsg*) buf;

    clusterBuildMessageHdr(hdr, CLUSTERMSG_TYPE_UPDATE);
    memcpy(hdr->data.update.nodecfg.nodename, node->name, CLUSTER_NAMELEN);
    hdr->data.update.nodecfg.configEpoch = htonu64(node->configEpoch);
    memcpy(hdr->data.update.nodecfg.slots, node->slots, sizeof(node->slots));
    clusterSendMessage(link, (unsigned char*)hdr,
                       ntohl(hdr->totlen));
}

// 接收 UPDATE 消息处理
void clusterProcessUpdateMessage(clusterMsg *hdr, clusterLink *link) {
    clusterNode *node = clusterLookupNode(hdr->data.update.nodecfg.nodename);
    uint64_t reportedConfigEpoch =
        ntohu64(hdr->data.update.nodecfg.configEpoch);

    // 只接受纪元更大的更新(防止旧消息覆盖新状态)
    if (reportedConfigEpoch <= node->configEpoch) return;

    // 更新本地路由表:将相关槽分配给新主节点
    clusterUpdateSlotsConfigWith(node, reportedConfigEpoch,
                                 hdr->data.update.nodecfg.slots);

    clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG |
                         CLUSTER_TODO_UPDATE_STATE |
                         CLUSTER_TODO_FSYNC_CONFIG);
}

23.8 集群消息类型完整列表

// cluster.h
#define CLUSTERMSG_TYPE_PING           0  // PING(带 Gossip 信息)
#define CLUSTERMSG_TYPE_PONG           1  // PONG(对 PING/MEET 的响应)
#define CLUSTERMSG_TYPE_MEET           2  // MEET(新节点加入)
#define CLUSTERMSG_TYPE_FAIL           3  // FAIL(节点故障广播)
#define CLUSTERMSG_TYPE_PUBLISH        4  // PUBLISH(集群模式的 Pub/Sub)
#define CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST 5  // 从节点请求投票
#define CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK    6   // 主节点确认投票
#define CLUSTERMSG_TYPE_UPDATE         7  // 配置更新(路由表变化)
#define CLUSTERMSG_TYPE_MFSTART        8  // 手动故障转移开始
#define CLUSTERMSG_TYPE_MODULE         9  // 模块消息
#define CLUSTERMSG_TYPE_PUBLISHSHARD   10 // 分片 Pub/Sub(Redis 7.0)

23.9 集群运维常用命令

# 查看集群状态
redis-cli -h 127.0.0.1 -p 7001 cluster info

# 查看所有节点
redis-cli -h 127.0.0.1 -p 7001 cluster nodes

# 查看槽分布
redis-cli -h 127.0.0.1 -p 7001 cluster slots

# 手动故障转移(在从节点上执行)
redis-cli -h 127.0.0.1 -p 7002 cluster failover

# 手动故障转移(强制,即使主节点存活)
redis-cli -h 127.0.0.1 -p 7002 cluster failover force

# 查看 meet 操作
redis-cli -h 127.0.0.1 -p 7001 cluster meet 127.0.0.1 7006

# 重置节点(谨慎!)
redis-cli -h 127.0.0.1 -p 7006 cluster reset hard

集群 Gossip 性能参数

参数 默认值 说明
cluster-node-timeout 15000ms PFAIL 超时阈值
cluster-announce-ip 对外宣告的 IP(NAT 场景)
cluster-announce-port 0 对外宣告的端口
cluster-announce-bus-port 0 对外宣告的集群总线端口
cluster-migration-barrier 1 从节点迁移前,主节点最少保留的从节点数
cluster-allow-reads-when-down no 集群不可用时是否允许读操作
cluster-link-sendbuf-limit 0 集群消息发送缓冲区限制

本章小结

本章评分
4.8  / 5  (7 评分)

💬 留言讨论