第 31 章

跨集群复制与容灾架构

第31章:跨集群复制与容灾架构

导读:如何实现 Kafka 跨集群复制与容灾?

本章核心问题:如何实现 Kafka 跨集群复制与容灾?

读完本章你将理解


Level 1 · 你需要知道的(1-3年经验)

为什么单集群无法满足生产需求

生产级 Kafka 部署很快就会面临单集群无法解决的问题:

跨集群复制(Cross-Cluster Replication)是解决上述问题的核心技术。本章重点介绍 MirrorMaker 2(Apache 开源,Kafka 2.4+ 内置),以及三种主要的拓扑架构及其切换操作。

MirrorMaker 2 架构深解

基于 Kafka Connect 的设计哲学

MirrorMaker 1(Kafka 2.3 及以前)是一个简单的独立进程:启动多个 Consumer 消费源集群,再作为 Producer 写入目标集群。它的问题是:无状态、难以扩展、不支持消费者位移同步、Topic 元数据不自动同步。

MirrorMaker 2(MM2)从根本上重构了这一设计,基于 Kafka Connect 框架构建:

MM2 利用 Kafka Connect 的分布式模式,支持水平扩展(增加 Worker 节点)、任务自动分配和重平衡、内置的容错机制。

Topic 命名:源集群别名前缀

MM2 默认在目标集群的 Topic 名称前加上源集群别名(alias),这是一个重要的设计决策:

这个命名约定解决了双向复制(Active-Active)中的循环复制问题——us-east.orders 不会再被复制回 us-east 集群,因为 MM2 知道这个 Topic 已经来自 us-east

MM2 同时自动同步以下元数据:

Offset 翻译机制

源集群的 Offset 和目标集群的 Offset 是不同的。源集群 orders 分区 0 的 Offset 100,在目标集群 us-east.orders 分区 0 的 Offset 可能是 95(部分消息可能已经被压缩或丢弃)。

MM2 通过 Checkpoint 机制维护这个映射关系:

内部 Topic:us-east.checkpoints.internal(存储在目标集群)
格式:
  key: <group-id> + <topic> + <partition>
  value: <source-offset> + <target-offset> + <metadata>

当需要 Failover 时,可以使用 kafka-consumer-groups.sh--reset-offsets 或 MM2 提供的 MirrorClient API 来翻译 Offset。

三种拓扑架构

拓扑一:Active-Passive(主备架构)

场景:一个主数据中心(US-East)承载所有生产流量,一个灾备数据中心(US-West)保持热备份,主数据中心不可用时手动切换。

US-East (Primary)              US-West (Standby)
┌─────────────────────┐        ┌─────────────────────┐
│  Producers → Kafka  │──MM2──→│  Kafka (read-only)  │
│  Consumers ← Kafka  │        │  (no producers)     │
└─────────────────────┘        └─────────────────────┘

优点

缺点

配置

# mm2.properties - Active-Passive
us-east->us-west.enabled = true
us-west->us-east.enabled = false

拓扑二:Active-Active(双活架构)

场景:两个数据中心同时承载生产流量,互相复制数据,任何一个数据中心故障时,流量自动切换到另一个。

US-East (Active)               US-West (Active)
┌─────────────────────┐  MM2   ┌─────────────────────┐
│ Producers → Kafka   │───────→│ Kafka → Consumers   │
│ Consumers ← Kafka   │←───────│ Kafka ← Producers   │
└─────────────────────┘  MM2   └─────────────────────┘

双向复制配置

# mm2.properties - Active-Active(双向复制)
us-east->us-west.enabled = true
us-west->us-east.enabled = true

# 关键:Topic 前缀防止循环复制
# us-east 产生的 orders → us-west 复制为 us-east.orders
# us-east.orders 不会再被复制回 us-east(MM2 识别前缀)

Active-Active 的核心挑战:消息重复和冲突

在双活模式下,同一条业务事件可能在两个集群中被独立产生(例如,同一个用户在两个数据中心同时发起订单)。解决方案:

  1. 全局唯一 Message Key:使用 UUID 或包含来源标识符的复合 Key(如 us-east:order-12345),消费者去重。
  2. "Last Write Wins" 语义:对于状态更新类操作,基于时间戳选择最新值(需要可靠的时钟同步)。
  3. 业务层幂等:Consumer 端实现幂等处理,相同 Key 的重复消息不会产生重复副作用。

Active-Active 适用的业务场景

不适用 Active-Active 的场景

拓扑三:聚合架构(多 Edge → 中央)

场景:多个边缘集群(工厂、零售门店、IoT 网关)各自独立运行,将数据汇聚到中央集群进行统一存储、分析和处理。

Factory-A ──MM2──┐
Factory-B ──MM2──┼──→ Central Cluster → Analytics
Factory-C ──MM2──┘

配置示例(中央集群接收来自多个边缘的数据):

# mm2.properties - 聚合模式
clusters = factory-a, factory-b, factory-c, central

factory-a.bootstrap.servers = factory-a-kafka:9092
factory-b.bootstrap.servers = factory-b-kafka:9092
factory-c.bootstrap.servers = factory-c-kafka:9092
central.bootstrap.servers = central-kafka-1:9092,central-kafka-2:9092

factory-a->central.enabled = true
factory-b->central.enabled = true
factory-c->central.enabled = true

# 中央集群不向边缘回写
central->factory-a.enabled = false
central->factory-b.enabled = false
central->factory-c.enabled = false

中央集群接收到的 Topic:

分析层可以从这三个 Topic 中消费,或者通过 Kafka Streams 将它们 merge 成一个统一流。

灾难恢复演练:像 Netflix 一样测试

未经测试的 DR 方案在真正需要时往往会失败。Netflix 的 Chaos Engineering 文化给了我们重要启示:定期主动引入故障,比等待真实故障时手忙脚乱要好得多

季度性 Failover 演练计划

演练计划(每季度执行一次):

1. 提前 1 周通知(选择低峰时段,如周六凌晨)
2. 演练范围:选择 1-2 个非核心 Consumer Group
3. 操作步骤:
   a. 停止选定的 Consumer Group(模拟应用故障)
   b. 等待 MM2 追赶(确认复制延迟归零)
   c. 执行 Offset 翻译
   d. 在备用集群重启 Consumer Group
   e. 验证消费位置正确(无消息丢失、无重复消费)
   f. 记录实际 RTO(从步骤 c 开始到步骤 e 完成)
4. 复盘会议:对比预期 RTO vs 实际 RTO,更新 Runbook

Chaos Engineering 自动化

#!/bin/bash
# dr-failover-test.sh - 自动化 DR 测试脚本

set -e

SOURCE_CLUSTER="kafka-east-1:9092"
DR_CLUSTER="kafka-west-1:9092"
CONSUMER_GROUP="payment-consumer-group-test"
TOPIC="us-east.orders"

echo "=== DR Failover Test $(date) ==="

# Step 1: 记录源集群的当前 Offset
echo "Step 1: Recording current offsets on source cluster..."
kafka-consumer-groups.sh \
  --bootstrap-server "$SOURCE_CLUSTER" \
  --group "$CONSUMER_GROUP" \
  --describe > /tmp/pre-failover-offsets.txt

# Step 2: 翻译 Offset
echo "Step 2: Translating offsets via MM2 checkpoints..."
# 调用 RemoteClusterUtils(此处简化为读取 checkpoint topic)
kafka-console-consumer.sh \
  --bootstrap-server "$DR_CLUSTER" \
  --topic us-east.checkpoints.internal \
  --from-beginning \
  --max-messages 1000 \
  | grep "$CONSUMER_GROUP" > /tmp/checkpoints.txt

# Step 3: 在 DR 集群提交翻译后的 Offset
echo "Step 3: Applying translated offsets to DR cluster..."
# (实际操作需要解析 checkpoints.txt 并执行 reset-offsets)

# Step 4: 验证消费位置
echo "Step 4: Verifying consumer group offsets on DR cluster..."
kafka-consumer-groups.sh \
  --bootstrap-server "$DR_CLUSTER" \
  --group "$CONSUMER_GROUP" \
  --describe

echo "=== Failover test completed ==="

Level 2 · 它是怎么运行的(3-5年经验)

Active-Passive Failover 操作手册

以下是 Active-Passive 架构下,主集群故障时的标准 Failover 操作流程:

步骤 1:确认主集群故障,停止 Producer

# 在主集群(US-East)确认无法连接
kafka-broker-api-versions.sh --bootstrap-server kafka-east-1:9092
# 如果连接失败,确认故障属实

# 通知应用层:停止向主集群写入(通过配置中心或特性开关)
# 等待所有 in-flight 请求超时(通常 30-60 秒)

步骤 2:等待 MM2 追赶

# 查看备用集群(US-West)中最新 heartbeat 的时间戳
# 如果 heartbeat 停止更新,说明主集群已完全不可用
kafka-console-consumer.sh \
  --bootstrap-server kafka-west-1:9092 \
  --topic us-east.heartbeats \
  --from-beginning \
  --property print.timestamp=true \
  | tail -5

# 检查复制 Lag(目标集群是否已追上源集群的最后已知位置)
kafka-jmx.sh \
  --object-name "kafka.connect.mirror:type=MirrorSourceConnector" \
  --attributes record-lag

步骤 3:翻译 Consumer Group Offset

这是 Failover 中最关键的技术步骤。Consumer 在源集群的 Offset 不能直接用于目标集群——必须通过 MM2 的 Checkpoint 数据进行翻译。

使用 MM2 提供的 RemoteClusterUtils(编程方式):

import org.apache.kafka.connect.mirror.RemoteClusterUtils;

Map<String, Object> config = new HashMap<>();
config.put("bootstrap.servers", "kafka-west-1:9092");

// 将 Consumer Group 在 us-east 集群的 Offset 翻译到 us-west 集群的等效 Offset
Map<TopicPartition, OffsetAndMetadata> translatedOffsets =
    RemoteClusterUtils.translateOffsets(
        config,
        "us-east",                  // 源集群别名
        "payment-consumer-group",   // 要翻译的 Consumer Group
        Duration.ofSeconds(30)      // 超时时间
    );

// 将翻译后的 Offset 提交到目标集群
AdminClient adminClient = AdminClient.create(config);
adminClient.alterConsumerGroupOffsets(
    "payment-consumer-group",
    translatedOffsets
).all().get();

命令行方式(适合快速操作):

# 1. 从 Checkpoint Topic 读取最新的 Offset 映射
kafka-console-consumer.sh \
  --bootstrap-server kafka-west-1:9092 \
  --topic us-east.checkpoints.internal \
  --property print.key=true \
  --property print.value=true \
  --from-beginning \
  | grep "payment-consumer-group"

# 2. 手动重置 Offset 到对应位置
# (根据上一步的输出确定目标 Offset)
kafka-consumer-groups.sh \
  --bootstrap-server kafka-west-1:9092 \
  --group payment-consumer-group \
  --reset-offsets \
  --topic us-east.orders:0:95 \  # 分区 0 重置到 Offset 95
  --execute

步骤 4:将 Producer 和 Consumer 重定向到备用集群

# 更新应用配置(通过 Kubernetes ConfigMap 或配置中心)
# 将 bootstrap.servers 从 kafka-east-1:9092 改为 kafka-west-1:9092

# 验证 Consumer 已从正确位置开始消费
kafka-consumer-groups.sh \
  --bootstrap-server kafka-west-1:9092 \
  --group payment-consumer-group \
  --describe

RPO 和 RTO 分析

Confluent Cluster Linking vs MirrorMaker 2

Confluent Platform 和 Confluent Cloud 提供了 Cluster Linking 功能,作为 MM2 的商业替代方案:

特性 MirrorMaker 2 Confluent Cluster Linking
开源 是(Apache 2.0) 否(Confluent 商业版)
复制延迟 秒级到分钟级 亚秒级(通常 < 1s)
实现机制 Consumer + Producer 直接从 Leader 读取(类似内部副本同步机制)
Offset 保留 需要翻译 保持原始 Offset(无需翻译!)
Consumer Group 同步 通过 Checkpoint,延迟分钟级 自动,毫秒级
Topic 命名 默认加前缀 可选,支持透明镜像
适用场景 所有 Kafka 环境 Confluent Cloud 或 Confluent Platform

Cluster Linking 的 Offset 保留能力彻底消除了 Failover 时的 Offset 翻译步骤,大幅降低了 Failover 的复杂性和 RTO。代价是需要 Confluent 商业订阅。

小结:容灾能力的成本模型

选择容灾架构时,需要在以下维度之间做权衡:

维度 Active-Passive Active-Active 聚合架构
RPO 秒级到分钟级 接近零(双写) N/A(单向)
RTO 5-15 分钟(手动) 接近零(自动) N/A
成本 中(备用集群闲置) 高(双倍资源) 低(边缘轻量)
复杂度 高(需处理冲突)
适用场景 通用 DR 极高可用性要求 边缘数据收集

没有一种架构适合所有场景。关键业务(支付、核心交易)值得投入 Active-Active 的复杂性换取接近零的 RTO;一般业务使用 Active-Passive 在成本和可靠性之间取得平衡;数据采集场景使用聚合架构最为合适。

最重要的是:无论选择哪种架构,都必须定期演练 Failover,并将演练结果记录和改进到 Runbook 中。从未演练的容灾方案,在真正的灾难发生时等于没有。


Level 3 · 规范怎么定义的(资深)

完整部署配置

MM2 作为独立进程部署

# mm2.properties - MirrorMaker 2 配置文件
# 定义集群别名和连接信息
clusters = us-east, us-west

us-east.bootstrap.servers = kafka-east-1:9092,kafka-east-2:9092,kafka-east-3:9092
us-west.bootstrap.servers = kafka-west-1:9092,kafka-west-2:9092,kafka-west-3:9092

# 复制方向:us-east 到 us-west(Active-Passive 模式)
us-east->us-west.enabled = true
us-west->us-east.enabled = false    # Active-Passive 下禁用反向复制

# Topic 过滤规则(正则表达式)
us-east->us-west.topics = orders.*, payments.*, inventory.*
us-east->us-west.topics.blacklist = .*\.internal, __consumer_offsets

# Consumer Group Offset 同步
us-east->us-west.groups = payment-consumer-group, order-processor-group
us-east->us-west.groups.blacklist = console-consumer-.*

# 同步 Consumer Group Offset(启用 Checkpoint)
us-east->us-west.sync.group.offsets.enabled = true
us-east->us-west.sync.group.offsets.interval.seconds = 60

# Heartbeat(用于监控复制延迟)
us-east->us-west.emit.heartbeats.enabled = true
us-east->us-west.emit.heartbeats.interval.seconds = 5

# 复制因子(目标集群)
us-east->us-west.replication.factor = 3

# 主题名称转换(是否保留源集群前缀)
# true: 目标 Topic 命名为 us-east.orders(推荐,避免循环复制)
# false: 目标 Topic 命名为 orders(危险!Active-Active 模式会导致无限循环)
replication.policy.class = org.apache.kafka.connect.mirror.DefaultReplicationPolicy

# MirrorMaker 2 自身元数据存储位置(目标集群)
checkpoints.topic.replication.factor = 3
heartbeats.topic.replication.factor = 3
offset-syncs.topic.replication.factor = 3

# Connect Worker 配置
# 分布式模式:允许多个 Worker 节点共享负载
config.storage.topic = mm2-configs
offset.storage.topic = mm2-offsets
status.storage.topic = mm2-status
config.storage.replication.factor = 3
offset.storage.replication.factor = 3
status.storage.replication.factor = 3

启动 MM2:

connect-mirror-maker.sh mm2.properties

在 Kubernetes 上部署 MM2

# kubernetes/mm2-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: mirrormaker2
  namespace: kafka
spec:
  replicas: 3    # 3 个 Worker 节点,负载均衡
  selector:
    matchLabels:
      app: mirrormaker2
  template:
    metadata:
      labels:
        app: mirrormaker2
    spec:
      containers:
        - name: mirrormaker2
          image: confluentinc/cp-kafka-connect:7.6.0
          command:
            - /bin/bash
            - -c
            - connect-mirror-maker.sh /etc/mm2/mm2.properties
          env:
            - name: KAFKA_HEAP_OPTS
              value: "-Xms512m -Xmx1g"
          volumeMounts:
            - name: mm2-config
              mountPath: /etc/mm2
          resources:
            requests:
              cpu: "1"
              memory: "2Gi"
            limits:
              cpu: "2"
              memory: "4Gi"
      volumes:
        - name: mm2-config
          configMap:
            name: mm2-config

监控复制延迟:

# 查看 heartbeat topic,检查最新 heartbeat 时间
kafka-console-consumer.sh \
  --bootstrap-server kafka-west-1:9092 \
  --topic us-east.heartbeats \
  --from-beginning \
  --max-messages 10

# 通过 JMX 查看 MM2 复制延迟(毫秒)
kafka-jmx.sh \
  --object-name "kafka.connect.mirror:type=MirrorSourceConnector,target=us-west,topic=orders,partition=0" \
  --attributes replication-latency-ms-avg

Level 4 · 边界与陷阱(所有人)

以下是与"跨集群复制与容灾架构"相关的常见边界问题和生产陷阱:

陷阱一:忽略默认配置的隐含假设。 许多 Kafka 参数的默认值是为通用场景设计的,在特定业务场景下可能导致性能问题或数据风险。上线前务必逐一审查与本章主题相关的配置项,确认其是否符合业务需求。

陷阱二:缺乏端到端的验证。 理论理解和生产实践之间往往存在差距。建议在预发布环境中构建完整的测试场景,覆盖正常路径和异常路径(如节点宕机、网络分区、磁盘满),验证系统行为是否符合预期。

陷阱三:监控盲区。 本章涉及的机制如果缺乏对应的监控指标和告警规则,问题往往在造成业务影响后才被发现。建议将关键指标纳入监控体系,设置合理的告警阈值。

本章评分
4.6  / 5  (3 评分)

💬 留言讨论