第 23 章
Cluster Gossip 源码:节点状态机与消息传播
第23章 Cluster Gossip 源码:节点状态机与消息传播
Redis Cluster 使用 Gossip 协议实现去中心化的节点发现和故障检测。本章深入 cluster.c 的 Gossip 实现,分析节点状态机转换、PING/PONG 消息构造、故障广播和槽信息传播的完整源码流程。
23.1 集群数据结构总览
clusterState:集群全局状态
// cluster.h
typedef struct clusterState {
clusterNode *myself; // 指向自身节点的指针
uint64_t currentEpoch; // 当前纪元(全局单调递增)
int state; // CLUSTER_OK 或 CLUSTER_FAIL
int size; // 集群中有槽的主节点数量
dict *nodes; // name → clusterNode* 映射(所有已知节点)
dict *nodes_black_list; // 黑名单(防止重复加入)
clusterNode *migrating_slots_to[CLUSTER_SLOTS]; // 正在迁出的槽
clusterNode *importing_slots_from[CLUSTER_SLOTS]; // 正在迁入的槽
clusterNode *slots[CLUSTER_SLOTS]; // 槽 → 主节点映射(16384个)
uint64_t slots_keys_count[CLUSTER_SLOTS]; // 每个槽的 key 数量
rax *slots_to_keys; // 槽 → key 列表(Radix Tree)
// 故障转移状态
mstime_t failover_auth_time; // 上次发起选举的时间
int failover_auth_count; // 收到的选票数
int failover_auth_sent; // 是否已发起过选举
int failover_auth_rank; // 当前从库的排名
uint64_t failover_auth_epoch; // 选举纪元
int cant_failover_reason; // 无法故障转移的原因码
// 统计
long long stats_bus_messages_sent[CLUSTERMSG_TYPE_COUNT];
long long stats_bus_messages_received[CLUSTERMSG_TYPE_COUNT];
} clusterState;
clusterNode:节点描述符
typedef struct clusterNode {
mstime_t ctime; // 节点创建时间
char name[CLUSTER_NAMELEN]; // 节点名(40字节十六进制 ID)
int flags; // 节点标志位(见下方状态机)
uint64_t configEpoch; // 节点的配置纪元(槽所有权版本号)
unsigned char slots[CLUSTER_SLOTS/8]; // 该节点负责的槽位图(2KB)
int numslots; // 负责的槽数量
int numslaves; // 从节点数量
struct clusterNode **slaves; // 从节点数组
struct clusterNode *slaveof; // 如果是从节点,指向主节点
unsigned long long repl_offset; // 复制偏移量(从节点迁移决策用)
mstime_t ping_sent; // 上次发送 PING 的时间
mstime_t pong_received; // 上次收到 PONG 的时间
mstime_t data_received; // 上次收到任何数据的时间
mstime_t fail_time; // 被标记为 FAIL 的时间
mstime_t voted_time; // 上次投票的时间
mstime_t repl_offset_time; // repl_offset 更新时间
mstime_t orphaned_time; // 成为孤立主节点的时间
long long repl_offset; // 已知的复制偏移量
char ip[NET_IP_STR_LEN]; // 节点 IP
int port; // 节点端口(客户端端口)
int pport; // 节点 TLS 端口
int cport; // 集群通信端口(port + 10000)
clusterLink *link; // TCP 连接(用于发送 cluster bus 消息)
list *fail_reports; // 报告该节点 PFAIL 的节点列表
} clusterNode;
节点标志位(flags 字段)
#define CLUSTER_NODE_MASTER (1<<0) // 主节点
#define CLUSTER_NODE_SLAVE (1<<1) // 从节点
#define CLUSTER_NODE_PFAIL (1<<2) // 疑似故障(自己判断超时)
#define CLUSTER_NODE_FAIL (1<<3) // 确认故障(多节点同意)
#define CLUSTER_NODE_MYSELF (1<<4) // 代表自身
#define CLUSTER_NODE_HANDSHAKE (1<<5) // 正在进行握手
#define CLUSTER_NODE_NOADDR (1<<6) // 地址未知
#define CLUSTER_NODE_MEET (1<<7) // 需要发送 MEET
#define CLUSTER_NODE_MIGRATE_TO (1<<8) // 是槽迁移的目标
#define CLUSTER_NODE_NOFAILOVER (1<<9) // 禁止自动故障转移(slave-no-failover)
23.2 clusterCron:Gossip 心跳定时器
clusterCron 每 100ms 由 serverCron 触发一次,是 Gossip 协议的核心驱动函数:
void clusterCron(void) {
static long long iteration = 0;
iteration++; // 用于控制不同频率的子任务
// ─── 任务1:处理超时节点 ───
mstime_t now = mstime();
mstime_t handshake_timeout = server.cluster_node_timeout;
if (handshake_timeout < 1000) handshake_timeout = 1000;
dictIterator *di = dictGetSafeIterator(server.cluster->nodes);
dictEntry *de;
int orphaned_masters = 0, max_slaves = 0, this_slaves = 0;
while ((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);
if (node->flags & (CLUSTER_NODE_MYSELF | CLUSTER_NODE_NOADDR)) continue;
// 握手超时:删除节点
if (nodeInHandshake(node) &&
now - node->ctime > handshake_timeout) {
clusterDelNode(node);
continue;
}
// 确保已连接
if (node->link == NULL) {
clusterLink *link = createClusterLink(node);
clusterSendPing(link, CLUSTERMSG_TYPE_MEET);
continue;
}
// ─── PFAIL 检测 ───
// 超过 cluster-node-timeout 未收到 PONG → 标记 PFAIL
if (node->ping_sent == 0 &&
(now - node->pong_received) > server.cluster_node_timeout) {
clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
continue;
}
if (node->ping_sent != 0 &&
(now - node->ping_sent) > server.cluster_node_timeout / 2) {
// PING 超过 timeout/2 未收到 PONG → PFAIL
clusterSetNodeAsFailing(node); // 设置 CLUSTER_NODE_PFAIL
}
// 孤立主节点检测(用于从节点迁移)
if (nodeIsMaster(node) && node->numslots > 0) {
if (node->numslaves == 0) {
orphaned_masters++;
}
if (node->numslaves > max_slaves) max_slaves = node->numslaves;
if (myself->slaveof == node) this_slaves = node->numslaves;
}
}
dictReleaseIterator(di);
// ─── 任务2:随机选节点发送 PING(保证 Gossip 传播)───
// 每秒至少向一个随机节点发 PING
if (!(iteration % 10)) {
int j;
for (j = 0; j < 5; j++) {
de = dictGetRandomKey(server.cluster->nodes);
clusterNode *this = dictGetVal(de);
if (this->link == NULL || this->ping_sent != 0) continue;
if (this->flags & (CLUSTER_NODE_MYSELF | CLUSTER_NODE_HANDSHAKE)) continue;
// 选出最久未 PING 的随机节点
if (minpong == 0 || this->pong_received < minpong) {
minpong = this->pong_received;
minpong_node = this;
}
}
if (minpong_node) {
clusterSendPing(minpong_node->link, CLUSTERMSG_TYPE_PING);
}
}
// ─── 任务3:孤立主节点的从节点迁移 ───
if (orphaned_masters && max_slaves >= 2 && this_slaves == max_slaves) {
clusterHandleSlaveMigration(max_slaves, this_slaves);
}
// ─── 任务4:从节点检查是否需要发起故障转移 ───
if (nodeIsSlave(myself)) {
clusterHandleSlaveFailover();
}
// ─── 任务5:更新自身状态(slots、repl_offset)───
clusterUpdateState();
}
23.3 PING 消息构造:clusterSendPing
void clusterSendPing(clusterLink *link, int type) {
// 1. 确定 Gossip 节点数量
// 携带 min(1/10 * cluster_size, 3 + pfail_count) 个节点信息
int freshnodes = dictSize(server.cluster->nodes) - 2;
int wanted = floor(dictSize(server.cluster->nodes) / 10);
if (wanted < 3) wanted = 3;
if (wanted > freshnodes) wanted = freshnodes;
// pfail 节点强制加入(传播故障信息)
int pfail_wanted = server.cluster->stats_pfail_nodes;
// 2. 计算消息体大小
int totlen = sizeof(clusterMsg) - sizeof(union clusterMsgData);
totlen += sizeof(clusterMsgDataGossip) * (wanted + pfail_wanted);
clusterMsg *hdr = zmalloc(totlen);
memset(hdr, 0, sizeof(*hdr));
// 3. 填写消息头
clusterBuildMessageHdr(hdr, type);
// 4. 填写 Gossip 节点信息
int gossipcount = 0;
while (gossipcount < wanted) {
// 随机选节点(排除自身和已选中的)
de = dictGetRandomKey(server.cluster->nodes);
clusterNode *this = dictGetVal(de);
if (this == myself) continue;
if (this->flags & CLUSTER_NODE_HANDSHAKE) continue;
// 避免重复选择
if (this->flags & CLUSTER_NODE_SELECTED) continue;
// 填写 gossip 信息
clusterMsgDataGossip *gossip =
&(hdr->data.ping.gossip[gossipcount]);
memcpy(gossip->nodename, this->name, CLUSTER_NAMELEN);
gossip->ping_sent = htonl(this->ping_sent / 1000);
gossip->pong_received = htonl(this->pong_received / 1000);
memcpy(gossip->ip, this->ip, sizeof(this->ip));
gossip->port = htons(this->port);
gossip->cport = htons(this->cport);
gossip->flags = htons(this->flags);
gossip->notused1 = 0;
this->flags |= CLUSTER_NODE_SELECTED;
gossipcount++;
}
// 5. 强制加入 PFAIL 节点(确保故障信息传播)
if (pfail_wanted) {
dictIterator *di = dictGetSafeIterator(server.cluster->nodes);
while ((de = dictNext(di)) != NULL && pfail_wanted > 0) {
clusterNode *this = dictGetVal(de);
if (this->flags & CLUSTER_NODE_PFAIL) {
// 填写 pfail 节点的 gossip 信息
// ...
pfail_wanted--;
gossipcount++;
}
}
dictReleaseIterator(di);
}
hdr->count = htons(gossipcount);
hdr->totlen = htonl(totlen);
clusterSendMessage(link, (unsigned char*)hdr, totlen);
zfree(hdr);
}
clusterMsg 消息头
typedef struct {
char sig[4]; // "RCmb"(Redis Cluster message bus)
uint32_t totlen; // 消息总长度
uint16_t ver; // 协议版本(当前为1)
uint16_t port; // 发送方端口
uint16_t type; // 消息类型(PING/PONG/MEET/FAIL/...)
uint16_t count; // gossip 节点数量
uint64_t currentEpoch; // 发送方当前纪元
uint64_t configEpoch; // 发送方配置纪元(主节点)
uint64_t offset; // 复制偏移量(从节点)
char sender[CLUSTER_NAMELEN]; // 发送方节点名
unsigned char myslots[CLUSTER_SLOTS/8]; // 发送方负责的槽位图(2KB)
char slaveof[CLUSTER_NAMELEN]; // 如果是从节点,主节点名
char myip[NET_IP_STR_LEN]; // 发送方 IP
char notused1[34];
uint16_t pport; // TLS 端口
uint16_t cport; // 集群总线端口
uint16_t flags; // 发送方 flags
unsigned char state; // 集群状态(CLUSTER_OK/CLUSTER_FAIL)
unsigned char mflags[3]; // 消息标志位
union clusterMsgData data; // 消息体
} clusterMsg;
23.4 接收 PING/PONG:clusterProcessGossipSection
int clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
uint16_t count = ntohs(hdr->count);
clusterMsgDataGossip *g = (clusterMsgDataGossip*) hdr->data.ping.gossip;
clusterNode *sender = link->node ? link->node : clusterLookupNode(hdr->sender);
while (count--) {
uint16_t flags = ntohs(g->flags);
sds ci = clusterGetMessageFlagsString(flags);
// 1. 查找本地是否已知该节点
clusterNode *node = clusterLookupNode(g->nodename);
if (node) {
// 2. 已知节点:更新时间戳
if (sender && nodeIsMaster(sender) && node != myself) {
if (flags & (CLUSTER_NODE_FAIL | CLUSTER_NODE_PFAIL)) {
// 3. Gossip 报告该节点故障 → 添加到 fail_reports
if (clusterNodeAddFailureReport(node, sender)) {
serverLog(LL_VERBOSE,
"Node %.40s reported node %.40s as not reachable.",
sender->name, node->name);
}
// 4. 检查是否满足 FAIL 判定条件
markNodeAsFailingIfNeeded(node);
} else {
// 节点报告为正常 → 清除 fail_reports 中该 sender 的记录
if (clusterNodeDelFailureReport(node, sender)) {
serverLog(LL_VERBOSE,
"Node %.40s reported node %.40s is back online.",
sender->name, node->name);
}
}
}
// 如果 gossip 说该节点可达但我们标记为 PFAIL → 清除 PFAIL
if (!(flags & (CLUSTER_NODE_FAIL | CLUSTER_NODE_PFAIL)) &&
node->numfailreports) {
// 清除可能过期的故障报告
}
} else {
// 5. 未知节点:尝试加入集群(需要 MEET 或已有足够信任)
if (!(flags & CLUSTER_NODE_NOADDR) && !clusterBlacklistExists(g->nodename)) {
clusterStartHandshake(g->ip, ntohs(g->port), ntohs(g->cport));
}
}
sdsfree(ci);
g++;
}
return 1;
}
markNodeAsFailingIfNeeded:FAIL 判定逻辑
void markNodeAsFailingIfNeeded(clusterNode *node) {
int failures;
int needed_quorum = (server.cluster->size / 2) + 1;
// 不是 PFAIL 则跳过(我自己也必须认为它 PFAIL)
if (!nodeTimedOut(node)) return;
// 已经是 FAIL 则跳过
if (nodeFailed(node)) return;
// 统计有效的故障报告数(排除过期报告)
failures = clusterNodeFailureReportsCount(node);
// 自己也算一票
if (nodeIsMaster(myself)) failures++;
// 未达到 quorum 则不能标记 FAIL
if (failures < needed_quorum) return;
serverLog(LL_NOTICE,
"Marking node %.40s as failing (quorum reached).", node->name);
// 标记 FAIL(清除 PFAIL,设置 FAIL + fail_time)
node->flags &= ~CLUSTER_NODE_PFAIL;
node->flags |= CLUSTER_NODE_FAIL;
node->fail_time = mstime();
// 立即广播 FAIL 消息(不等下次 PING 轮次)
if (nodeIsMaster(myself)) clusterSendFail(node->name);
clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_SAVE_CONFIG);
}
23.5 节点状态机详解
状态转换图
┌──────────────────────────────────────┐
│ NORMAL │
│ (flags 不含 PFAIL/FAIL) │
└────────────────┬─────────────────────┘
│
自己超过 cluster-node-timeout
未收到该节点任何响应
│
▼
┌──────────────────────────────────────┐
│ PFAIL │
│ (CLUSTER_NODE_PFAIL 标志位) │
│ 仅自己认为,未广播 │
└────────────────┬─────────────────────┘
│
fail_reports >= quorum
(多个主节点同意)
│
▼
┌──────────────────────────────────────┐
│ FAIL │
│ (CLUSTER_NODE_FAIL 标志位) │
│ 广播 FAIL 消息给所有节点 │
│ 触发从节点发起选举 │
└────────────────┬─────────────────────┘
│
┌────────────────────────┼───────────────────────┐
│ │ │
故障转移成功 超时自动清除 手动 CLUSTER RESET
新主节点广播 (FAIL 持续超过 (管理员操作)
UPDATE 消息 cluster-node-timeout * 2)
│ │
▼ ▼
NORMAL NORMAL
FAIL 状态自动恢复
// clusterCron 中:FAIL 状态超时后自动恢复
if (nodeFailed(node)) {
mstime_t now = mstime();
// 如果 FAIL 超过 2 * cluster_node_timeout + 4 * cluster_node_timeout 还没恢复
// 并且节点有从节点(说明集群有冗余),则自动清除 FAIL
if (now - node->fail_time >
(server.cluster_node_timeout * CLUSTER_FAIL_EXPIRATION_FACTOR)) {
node->flags &= ~CLUSTER_NODE_FAIL;
clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_SAVE_CONFIG);
serverLog(LL_NOTICE,
"Clear FAIL state for node %.40s: "
"is reachable again and nobody is serving its slots after some time.",
node->name);
}
}
23.6 故障转移:从节点选举
void clusterHandleSlaveFailover(void) {
mstime_t now = mstime();
// 条件检查
if (nodeIsMaster(myself)) return;
if (myself->slaveof == NULL) return;
if (!nodeFailed(myself->slaveof)) return; // 主节点必须是 FAIL 状态
// 计算选举延迟(避免多个从节点同时发起)
// 复制偏移量越大的从节点,延迟越小(优先成为新主节点)
mstime_t auth_timeout = server.cluster_node_timeout * 2;
mstime_t auth_retry_time = auth_timeout * 2;
if (server.cluster->failover_auth_time == 0) {
// 首次发起:设置选举时间(加上随机延迟防止分裂脑)
server.cluster->failover_auth_time = now +
500 + // 固定等待500ms(等待 FAIL 消息传播)
random() % 500; // 额外随机延迟
// 基于复制偏移量的优先级延迟
// rank 越小(数据越新),延迟越小
server.cluster->failover_auth_rank = clusterGetSlaveRank();
server.cluster->failover_auth_time +=
server.cluster->failover_auth_rank * 1000;
return;
}
// 等待选举时机
if (now < server.cluster->failover_auth_time) return;
// 超时未收到足够选票,重试
if (now > server.cluster->failover_auth_time + auth_retry_time) {
server.cluster->failover_auth_time = 0;
server.cluster->failover_auth_sent = 0;
return;
}
// 发送 FAILOVER_AUTH_REQUEST 给所有主节点
if (!server.cluster->failover_auth_sent) {
server.cluster->currentEpoch++;
server.cluster->failover_auth_epoch = server.cluster->currentEpoch;
serverLog(LL_WARNING, "Starting a failover election for epoch %llu.",
(unsigned long long)server.cluster->currentEpoch);
clusterBroadcastMessage(NULL, CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST);
server.cluster->failover_auth_sent = 1;
return;
}
// 检查是否收到足够票数(quorum)
if (server.cluster->failover_auth_count >= server.cluster->size / 2 + 1) {
serverLog(LL_WARNING,
"Failover election won: I'm the new master.");
// 成为新主节点
clusterFailoverReplaceYourMaster();
}
}
23.7 槽信息传播:UPDATE 消息
故障转移完成后,新主节点需要通知集群更新路由表:
// 发送 UPDATE 消息
void clusterSendUpdate(clusterLink *link, clusterNode *node) {
clusterMsg buf[1];
clusterMsg *hdr = (clusterMsg*) buf;
clusterBuildMessageHdr(hdr, CLUSTERMSG_TYPE_UPDATE);
memcpy(hdr->data.update.nodecfg.nodename, node->name, CLUSTER_NAMELEN);
hdr->data.update.nodecfg.configEpoch = htonu64(node->configEpoch);
memcpy(hdr->data.update.nodecfg.slots, node->slots, sizeof(node->slots));
clusterSendMessage(link, (unsigned char*)hdr,
ntohl(hdr->totlen));
}
// 接收 UPDATE 消息处理
void clusterProcessUpdateMessage(clusterMsg *hdr, clusterLink *link) {
clusterNode *node = clusterLookupNode(hdr->data.update.nodecfg.nodename);
uint64_t reportedConfigEpoch =
ntohu64(hdr->data.update.nodecfg.configEpoch);
// 只接受纪元更大的更新(防止旧消息覆盖新状态)
if (reportedConfigEpoch <= node->configEpoch) return;
// 更新本地路由表:将相关槽分配给新主节点
clusterUpdateSlotsConfigWith(node, reportedConfigEpoch,
hdr->data.update.nodecfg.slots);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG |
CLUSTER_TODO_UPDATE_STATE |
CLUSTER_TODO_FSYNC_CONFIG);
}
23.8 集群消息类型完整列表
// cluster.h
#define CLUSTERMSG_TYPE_PING 0 // PING(带 Gossip 信息)
#define CLUSTERMSG_TYPE_PONG 1 // PONG(对 PING/MEET 的响应)
#define CLUSTERMSG_TYPE_MEET 2 // MEET(新节点加入)
#define CLUSTERMSG_TYPE_FAIL 3 // FAIL(节点故障广播)
#define CLUSTERMSG_TYPE_PUBLISH 4 // PUBLISH(集群模式的 Pub/Sub)
#define CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST 5 // 从节点请求投票
#define CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK 6 // 主节点确认投票
#define CLUSTERMSG_TYPE_UPDATE 7 // 配置更新(路由表变化)
#define CLUSTERMSG_TYPE_MFSTART 8 // 手动故障转移开始
#define CLUSTERMSG_TYPE_MODULE 9 // 模块消息
#define CLUSTERMSG_TYPE_PUBLISHSHARD 10 // 分片 Pub/Sub(Redis 7.0)
23.9 集群运维常用命令
# 查看集群状态
redis-cli -h 127.0.0.1 -p 7001 cluster info
# 查看所有节点
redis-cli -h 127.0.0.1 -p 7001 cluster nodes
# 查看槽分布
redis-cli -h 127.0.0.1 -p 7001 cluster slots
# 手动故障转移(在从节点上执行)
redis-cli -h 127.0.0.1 -p 7002 cluster failover
# 手动故障转移(强制,即使主节点存活)
redis-cli -h 127.0.0.1 -p 7002 cluster failover force
# 查看 meet 操作
redis-cli -h 127.0.0.1 -p 7001 cluster meet 127.0.0.1 7006
# 重置节点(谨慎!)
redis-cli -h 127.0.0.1 -p 7006 cluster reset hard
集群 Gossip 性能参数
| 参数 | 默认值 | 说明 |
|---|---|---|
| cluster-node-timeout | 15000ms | PFAIL 超时阈值 |
| cluster-announce-ip | 无 | 对外宣告的 IP(NAT 场景) |
| cluster-announce-port | 0 | 对外宣告的端口 |
| cluster-announce-bus-port | 0 | 对外宣告的集群总线端口 |
| cluster-migration-barrier | 1 | 从节点迁移前,主节点最少保留的从节点数 |
| cluster-allow-reads-when-down | no | 集群不可用时是否允许读操作 |
| cluster-link-sendbuf-limit | 0 | 集群消息发送缓冲区限制 |
本章小结
clusterState和clusterNode是集群状态的完整表示,slots数组(16384项)是路由核心clusterCron(每100ms)驱动 Gossip:超时检测、随机 PING、孤立主节点处理、从节点故障转移检查- PING 消息携带随机节点的 Gossip 信息(最多 max(cluster_size/10, 3) 个),PFAIL 节点优先加入
- PFAIL → FAIL 需要 quorum(过半主节点同意),避免网络分区误判
- FAIL 消息一旦确认立即广播,不等 PING 轮次
- 故障转移选举通过
FAILOVER_AUTH_REQUEST/ACK消息进行,复制偏移量最大的从节点优先胜出 UPDATE消息在配置纪元增大时传播新路由信息,接收方更新本地slots路由表- 整个 Gossip 系统无中心节点,完全去中心化,N个节点的信息传播时间复杂度为 O(log N)