Chapter 31

Cross-Cluster Replication and Disaster Recovery

Why Single-Cluster Deployments Hit a Wall

Production Kafka deployments quickly encounter scenarios that a single cluster cannot address:

Cross-cluster replication is the foundational technology for addressing all of these. This chapter focuses on MirrorMaker 2 (MM2), the Apache open-source replication engine built into Kafka since version 2.4, covering its architecture, three major deployment topologies, and the precise operational steps for disaster recovery failover.

MirrorMaker 2 Architecture

Built on Kafka Connect

MirrorMaker 1 (pre-Kafka 2.4) was a simple standalone process: multiple consumer threads reading from the source cluster, each acting as a producer writing to the destination cluster. Its fundamental problems were statelessness, inability to translate consumer group offsets, no topic metadata synchronization, and poor scalability.

MM2 is a complete architectural redesign built on the Kafka Connect framework:

By building on Kafka Connect's distributed mode, MM2 inherits horizontal scalability (add worker nodes to increase throughput), automatic task assignment and rebalancing, and built-in fault tolerance.

Topic Naming: Source Cluster Alias Prefix

MM2 prefixes destination topic names with the source cluster alias. This is not cosmetic โ€” it is a fundamental mechanism for preventing replication loops in bidirectional (Active-Active) setups:

MM2 recognizes that us-east.orders already originates from us-east and will not replicate it back, breaking any potential loop.

MM2 also automatically synchronizes:

Offset Translation Mechanism

Source cluster offsets and destination cluster offsets are independent. Source orders partition 0 at offset 100 may correspond to destination us-east.orders partition 0 at offset 95 (some messages may have been compacted or the destination started at a different point).

MM2 maintains this mapping through its Checkpoint mechanism:

Internal topic: us-east.checkpoints.internal (on destination cluster)
Record format:
  Key:   <consumer-group-id> | <topic> | <partition>
  Value: <source-offset> | <destination-offset> | <leader-epoch> | <metadata>

During failover, the RemoteClusterUtils API reads this mapping to translate consumer group offsets from source to destination equivalents, enabling consumers to resume from exactly where they left off.

Complete MM2 Deployment Configuration

Standalone Process Deployment

# mm2.properties โ€” MirrorMaker 2 configuration
# Define cluster aliases and their connection details
clusters = us-east, us-west

us-east.bootstrap.servers = kafka-east-1:9092,kafka-east-2:9092,kafka-east-3:9092
us-west.bootstrap.servers = kafka-west-1:9092,kafka-west-2:9092,kafka-west-3:9092

# Replication direction: us-east โ†’ us-west (Active-Passive)
us-east->us-west.enabled = true
us-west->us-east.enabled = false    # Disable for Active-Passive

# Topic filtering (regex)
us-east->us-west.topics = orders.*, payments.*, inventory.*
us-east->us-west.topics.blacklist = .*\.internal, __consumer_offsets

# Consumer group offset synchronization
us-east->us-west.groups = payment-consumer-group, order-processor-group
us-east->us-west.groups.blacklist = console-consumer-.*

# Enable checkpoint-based offset sync
us-east->us-west.sync.group.offsets.enabled = true
us-east->us-west.sync.group.offsets.interval.seconds = 60

# Heartbeat monitoring
us-east->us-west.emit.heartbeats.enabled = true
us-east->us-west.emit.heartbeats.interval.seconds = 5

# Replication factor for replicated topics on destination
us-east->us-west.replication.factor = 3

# Topic naming policy (DefaultReplicationPolicy adds source alias prefix)
# This is critical for Active-Active to prevent replication loops
replication.policy.class = org.apache.kafka.connect.mirror.DefaultReplicationPolicy

# MM2 internal topics (stored on destination cluster)
checkpoints.topic.replication.factor = 3
heartbeats.topic.replication.factor = 3
offset-syncs.topic.replication.factor = 3

# Kafka Connect distributed mode storage topics
config.storage.topic = mm2-configs
offset.storage.topic = mm2-offsets
status.storage.topic = mm2-status
config.storage.replication.factor = 3
offset.storage.replication.factor = 3
status.storage.replication.factor = 3

Start MM2:

connect-mirror-maker.sh mm2.properties

Kubernetes Deployment

# kubernetes/mm2-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: mirrormaker2
  namespace: kafka
spec:
  replicas: 3    # 3 workers for load distribution and fault tolerance
  selector:
    matchLabels:
      app: mirrormaker2
  template:
    metadata:
      labels:
        app: mirrormaker2
    spec:
      containers:
        - name: mirrormaker2
          image: apache/kafka:3.7.0
          command:
            - connect-mirror-maker.sh
            - /etc/mm2/mm2.properties
          env:
            - name: KAFKA_HEAP_OPTS
              value: "-Xms512m -Xmx1g"
          volumeMounts:
            - name: mm2-config
              mountPath: /etc/mm2
              readOnly: true
          resources:
            requests:
              cpu: "1"
              memory: "2Gi"
            limits:
              cpu: "2"
              memory: "4Gi"
          livenessProbe:
            httpGet:
              path: /connectors
              port: 8083
            initialDelaySeconds: 60
            periodSeconds: 30
      volumes:
        - name: mm2-config
          configMap:
            name: mm2-config

Monitoring replication lag:

# Check that heartbeats are being received on the destination cluster
kafka-console-consumer.sh \
  --bootstrap-server kafka-west-1:9092 \
  --topic us-east.heartbeats \
  --property print.timestamp=true \
  --max-messages 5

# Check MM2 replication lag via JMX
kafka-jmx.sh \
  --object-name "kafka.connect.mirror:type=MirrorSourceConnector,target=us-west,topic=orders,partition=0" \
  --attributes replication-latency-ms-avg,record-lag

Three Deployment Topologies

Topology 1: Active-Passive (Hot Standby)

Scenario: One primary data center (US-East) serves all production traffic. A standby data center (US-West) maintains a continuously updated replica. When the primary fails, traffic is manually redirected to the standby.

US-East (Primary)              US-West (Standby)
โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”        โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚  Producers โ†’ Kafka  โ”‚โ”€โ”€MM2โ”€โ”€โ†’โ”‚  Kafka (replica)    โ”‚
โ”‚  Consumers โ† Kafka  โ”‚        โ”‚  Ready to activate  โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜        โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜

Advantages:

Disadvantages:

Configuration:

# Active-Passive is the default direction setting
us-east->us-west.enabled = true
us-west->us-east.enabled = false

Topology 2: Active-Active (Bidirectional)

Scenario: Both data centers simultaneously serve production traffic. Producers in each region write to their local cluster; MM2 replicates data bidirectionally so consumers in either region can access all data.

US-East (Active)               US-West (Active)
โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”  MM2   โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚ Producers โ†’ Kafka   โ”‚โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ†’โ”‚ Kafka โ†’ Consumers   โ”‚
โ”‚ Consumers โ† Kafka   โ”‚โ†โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”‚ Kafka โ† Producers   โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜  MM2   โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜

Bidirectional configuration:

us-east->us-west.enabled = true
us-west->us-east.enabled = true

# The alias prefix is essential here:
# Producer writes orders to us-east โ†’ MM2 replicates as us-east.orders on us-west
# Producer writes orders to us-west โ†’ MM2 replicates as us-west.orders on us-east
# us-east.orders on us-west is NOT replicated back to us-east (loop prevented)

The central challenge: message duplicates and conflicts

In Active-Active mode, the same business event can originate in both clusters independently. A user submitting an order while connected to the US-East cluster generates one orders record; a different user submitting an order via US-West generates another. After replication, both clusters contain both records โ€” which is correct. The challenge is when the same user action could reach both clusters (network routing failure, client retry to a different endpoint).

Resolution strategies:

  1. Globally unique message keys: Use UUIDs or composite keys that include a region identifier (e.g., us-east:order-12345). Consumers deduplicate by key.
  2. Last-write-wins semantics: For state updates, include a timestamp in the message and use it to select the most recent value. Requires reliable clock synchronization (NTP within 100ms).
  3. Idempotent consumers: Implement consumer logic that is safe to replay โ€” the same message applied twice produces the same result as applying it once.

Active-Active is appropriate for:

Active-Active is not appropriate for:

Topology 3: Aggregation (Many Edge โ†’ Central)

Scenario: Many edge clusters operate independently at the data source (factory floor, retail stores, IoT gateways). Their data flows unidirectionally to a central cluster for unified storage, analytics, and processing.

Factory-A โ”€โ”€MM2โ”€โ”€โ”
Factory-B โ”€โ”€MM2โ”€โ”€โ”ผโ”€โ”€โ†’ Central Cluster โ†’ Data Lake / Analytics
Factory-C โ”€โ”€MM2โ”€โ”€โ”˜

Configuration for the central collector:

# mm2.properties โ€” aggregation topology
clusters = factory-a, factory-b, factory-c, central

factory-a.bootstrap.servers = factory-a-kafka:9092
factory-b.bootstrap.servers = factory-b-kafka:9092
factory-c.bootstrap.servers = factory-c-kafka:9092
central.bootstrap.servers = central-kafka-1:9092,central-kafka-2:9092,central-kafka-3:9092

factory-a->central.enabled = true
factory-b->central.enabled = true
factory-c->central.enabled = true

# No data flows from central back to edge clusters
central->factory-a.enabled = false
central->factory-b.enabled = false
central->factory-c.enabled = false

# Replicate sensor data from all factories
factory-a->central.topics = sensor-data, production-events
factory-b->central.topics = sensor-data, production-events
factory-c->central.topics = sensor-data, production-events

The central cluster receives topics named:

A Kafka Streams application can merge these into a unified stream, or the analytics layer can consume each separately.

Active-Passive Failover Runbook

The following is the standard operational procedure for failing over from a failed primary cluster to the standby cluster.

Step 1: Confirm Primary Failure; Stop Producers

# Attempt to connect to the primary cluster
kafka-broker-api-versions.sh \
  --bootstrap-server kafka-east-1:9092 \
  --command-config client.properties
# If this times out or fails, the cluster is unreachable

# Trigger circuit breaker / feature flag in application configuration
# This stops producers from attempting to write to the primary
# Allow in-flight requests to time out (typically 30-60 seconds)
echo "Primary cluster confirmed unreachable at $(date)"

Step 2: Verify MM2 Has Caught Up

# Check the most recent heartbeat timestamp on the standby cluster
# If heartbeats stopped updating, the primary was completely unavailable
# during that window โ€” records after the last heartbeat may be lost
kafka-console-consumer.sh \
  --bootstrap-server kafka-west-1:9092 \
  --topic us-east.heartbeats \
  --property print.timestamp=true \
  --from-beginning \
  --max-messages 20 | tail -5

# Check record lag on the MM2 connectors
# Zero lag means the standby has caught up to the last record received from primary
kafka-jmx.sh \
  --object-name "kafka.connect.mirror:type=MirrorSourceConnector" \
  --attributes record-lag

Step 3: Translate Consumer Group Offsets

This is the most technically precise step. Consumer offsets from the source cluster cannot be used directly on the destination cluster โ€” they must be translated using the mapping stored in the MM2 checkpoint topic.

Programmatic translation using RemoteClusterUtils (recommended for automation):

import org.apache.kafka.connect.mirror.RemoteClusterUtils;

Map<String, Object> drClusterConfig = new HashMap<>();
drClusterConfig.put("bootstrap.servers", "kafka-west-1:9092");
drClusterConfig.put("security.protocol", "SASL_SSL");
// ... additional auth config

// Translate the consumer group's source offsets to destination equivalents
Map<TopicPartition, OffsetAndMetadata> translatedOffsets =
    RemoteClusterUtils.translateOffsets(
        drClusterConfig,
        "us-east",                  // Source cluster alias (matches mm2.properties)
        "payment-consumer-group",   // Consumer group to translate
        Duration.ofSeconds(30)      // Timeout for reading checkpoint topic
    );

System.out.println("Translated offsets: " + translatedOffsets);

// Commit the translated offsets to the DR cluster
try (AdminClient adminClient = AdminClient.create(drClusterConfig)) {
    adminClient.alterConsumerGroupOffsets(
        "payment-consumer-group",
        translatedOffsets
    ).all().get(30, TimeUnit.SECONDS);
    System.out.println("Offsets committed successfully to DR cluster");
}

Command-line translation for manual failovers:

# Read the latest checkpoint for the consumer group
kafka-console-consumer.sh \
  --bootstrap-server kafka-west-1:9092 \
  --topic us-east.checkpoints.internal \
  --property print.key=true \
  --property print.value=true \
  --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
  --from-beginning \
  --max-messages 10000 \
  | grep "payment-consumer-group" \
  | tail -20

# Apply the translated offsets (adjust partition:offset pairs based on checkpoint output)
kafka-consumer-groups.sh \
  --bootstrap-server kafka-west-1:9092 \
  --group payment-consumer-group \
  --topic us-east.orders:0:95 \    # Partition 0 โ†’ offset 95 (from checkpoint)
  --reset-offsets \
  --execute

Step 4: Redirect Producers and Consumers to DR Cluster

# Update application configuration
# Change bootstrap.servers from kafka-east-1:9092 to kafka-west-1:9092
# This can be done via:
#   - Kubernetes ConfigMap + rolling restart
#   - Configuration management system (Consul, etcd)
#   - Environment variable update + pod restart

# Verify consumers are running on the DR cluster from the correct offsets
kafka-consumer-groups.sh \
  --bootstrap-server kafka-west-1:9092 \
  --group payment-consumer-group \
  --describe

# Monitor for any lag increase in the first minutes after failover
watch -n 10 "kafka-consumer-groups.sh \
  --bootstrap-server kafka-west-1:9092 \
  --group payment-consumer-group \
  --describe"

RPO and RTO Analysis

RPO (Recovery Point Objective โ€” maximum data loss):

RTO (Recovery Time Objective โ€” time to restore service):

Confluent Cluster Linking vs MirrorMaker 2

Feature MirrorMaker 2 Confluent Cluster Linking
License Apache 2.0 (open source) Confluent commercial only
Replication latency Seconds to minutes Sub-second (typically < 1s)
Implementation Consumer + Producer Reads directly from leader (like an internal follower)
Offset preservation Requires translation via checkpoints Original offsets preserved โ€” no translation needed
Consumer group sync Via checkpoint connector, minutes of delay Automatic, millisecond-level
Topic naming Default: adds source alias prefix Optional; supports transparent mirroring
Availability Any Kafka environment Confluent Cloud or Confluent Platform

Cluster Linking's offset preservation is transformative for failover operations. The offset translation step โ€” the most complex and error-prone part of MM2-based failover โ€” is completely eliminated. Consumers simply reconnect to the destination cluster and continue from the same offsets. The cost is a Confluent commercial subscription.

For organizations running on Confluent Cloud, Cluster Linking is the clear choice. For open-source Kafka deployments, MM2 remains the standard.

Disaster Recovery Testing: Quarterly Failover Drills

An untested DR plan is not a DR plan โ€” it is a document that creates false confidence. The Netflix approach to resilience engineering offers the right mindset: proactively inject failures before they happen spontaneously, during controlled conditions, so you can learn and improve.

Quarterly Failover Drill Template

Quarterly DR Drill Checklist

Pre-drill (1 week before):
  โ–ก Select drill window (off-peak: Saturday 02:00-05:00 local)
  โ–ก Notify stakeholders (on-call, product team, SRE)
  โ–ก Define scope (which consumer groups will participate)
  โ–ก Confirm MM2 replication is healthy and lag is near zero
  โ–ก Verify DR cluster has sufficient capacity

During drill:
  โ–ก Record start time
  โ–ก Stop selected consumer group on primary
  โ–ก Wait for MM2 to catch up (verify via heartbeats + record-lag)
  โ–ก Execute offset translation (time this step)
  โ–ก Start consumer group on DR cluster
  โ–ก Verify: no messages skipped, no messages duplicated
  โ–ก Record actual RTO (from consumer group stop to verified DR consumption)

Post-drill:
  โ–ก Fail back to primary cluster (reverse the procedure)
  โ–ก Verify primary consumer group resumes correctly
  โ–ก Document: expected RTO vs actual RTO, any friction encountered
  โ–ก Update runbook with lessons learned

Automated Failover Validation Script

#!/bin/bash
# dr-validation.sh โ€” Run after every failover to validate correctness

set -e

DR_CLUSTER="${1:?Usage: dr-validation.sh <dr-bootstrap-servers> <consumer-group>}"
CONSUMER_GROUP="${2:?}"

echo "=== DR Failover Validation ==="
echo "Cluster: $DR_CLUSTER"
echo "Consumer Group: $CONSUMER_GROUP"
echo "Time: $(date -u)"
echo ""

# 1. Verify consumer group is present and active on DR cluster
echo "--- Consumer Group Status ---"
kafka-consumer-groups.sh \
  --bootstrap-server "$DR_CLUSTER" \
  --group "$CONSUMER_GROUP" \
  --describe

# 2. Check that lag is not exploding (consumer is making progress)
echo ""
echo "--- Lag Trend (checking twice, 30s apart) ---"
lag1=$(kafka-consumer-groups.sh \
  --bootstrap-server "$DR_CLUSTER" \
  --group "$CONSUMER_GROUP" \
  --describe 2>/dev/null \
  | awk 'NR>1 {sum += $5} END {print sum}')
echo "Lag at T+0: $lag1"

sleep 30

lag2=$(kafka-consumer-groups.sh \
  --bootstrap-server "$DR_CLUSTER" \
  --group "$CONSUMER_GROUP" \
  --describe 2>/dev/null \
  | awk 'NR>1 {sum += $5} END {print sum}')
echo "Lag at T+30s: $lag2"

if [ "$lag2" -lt "$lag1" ]; then
  echo "PASS: Lag is decreasing โ€” consumer is catching up"
elif [ "$lag2" -eq "$lag1" ]; then
  echo "WARN: Lag is stable โ€” consumer may be keeping pace but not catching up"
else
  echo "FAIL: Lag is increasing โ€” consumer is falling behind on DR cluster"
  exit 1
fi

echo ""
echo "=== Validation complete ==="

Summary: Choosing Your Disaster Recovery Architecture

The right cross-cluster replication topology depends on the trade-offs your business can accept:

Dimension Active-Passive Active-Active Aggregation
RPO Seconds to minutes Near zero (dual-write) N/A (unidirectional)
RTO 5-30 minutes (manual) Near zero (automatic) N/A
Cost Medium (idle standby) High (double resources) Low (lightweight edges)
Operational complexity Low High (conflict resolution) Medium
Best for General DR Extreme availability Edge data collection

No single architecture fits all scenarios. Business-critical systems (payments, core transactions) justify the complexity of Active-Active for near-zero RTO. Standard production services achieve a good cost/reliability balance with Active-Passive. Edge data collection is a natural fit for aggregation topology.

The most important principle, regardless of architecture choice: test your failover procedure regularly under realistic conditions, record the actual RTO achieved, and iteratively improve the runbook until the failover is routine. A DR plan that has never been executed will fail at the worst possible moment.

Rate this chapter
4.6  / 5  (3 ratings)

๐Ÿ’ฌ Comments