第 29 章

CDC 数据管道

CDC 变更数据捕获

CDC(Change Data Capture)通过监听数据库的变更日志,将每一行数据的增删改操作以事件流的形式实时传播到下游系统,是构建实时数据管道、异构数据同步、微服务解耦的核心基础设施。

什么是 CDC

传统的数据同步方式(定时全量/增量轮询)存在明显缺陷:延迟高、对源库压力大、难以捕获 DELETE 操作。CDC 通过解析事务日志,彻底解决了这些问题。

方式 延迟 能否捕获 DELETE 源库压力 复杂度
全量定时轮询 分钟级 不能 高(全表扫描)
时间戳增量轮询 秒级 不能
触发器方式 毫秒级 高(写放大)
CDC(日志解析) 毫秒级 极低

MySQL Binlog 原理

CDC 的核心是 MySQL 的 Binary Log(binlog),它记录了所有修改数据库状态的 SQL 语句或行级别变更事件。

Binlog 配置要求

# my.cnf 必须配置
server_id       = 1          # 主从复制/CDC 必须唯一
log_bin         = /var/log/mysql/mysql-bin.log
binlog_format   = ROW        # CDC 必须使用 ROW 格式
binlog_row_image = FULL      # 记录完整的前后镜像(默认)
expire_logs_days = 7         # 保留天数

# 验证
SHOW VARIABLES LIKE 'binlog_format';
SHOW VARIABLES LIKE 'binlog_row_image';
SHOW MASTER STATUS;

为什么必须是 ROW 格式?

STATEMENT 格式只记录 SQL 语句,不确定性函数(NOW()、UUID())会导致下游数据不一致;ROW 格式记录每行的实际变更,是 CDC 的必要条件。

Binlog 事件结构


Binlog File
├── Format_desc_event  (文件头,包含 MySQL 版本)
├── Previous_gtids_event (GTID 集合)
├── Gtid_event  (gtid: server_uuid:seq_no)
├── Query_event (BEGIN)
├── Table_map_event (表元数据: db, table, column types)
├── Write_rows_event  (INSERT: 新行数据)
│   Update_rows_event (UPDATE: 旧行 + 新行)
│   Delete_rows_event (DELETE: 旧行数据)
└── Xid_event  (COMMIT + xid)

GTID 模式

生产环境强烈建议开启 GTID(Global Transaction ID),它为每个事务赋予全局唯一 ID,使 CDC 工具能精确断点续传,避免数据重复或丢失。

gtid_mode                = ON
enforce_gtid_consistency = ON

-- 查看当前 GTID 进度
SHOW MASTER STATUS\G
-- Executed_Gtid_Set: server_uuid:1-12345

Debezium 实战

Debezium 是 Red Hat 开源的 CDC 平台,基于 Kafka Connect 构建,是目前使用最广泛的 CDC 框架。


MySQL Binlog → Debezium MySQL Connector → Kafka Topic → Consumer
                      (Kafka Connect)
                          ↓
              Topic: dbserver.mydb.orders
              每条消息: {before, after, op, ts_ms, source}

部署 Debezium MySQL Connector

# 1. 为 Debezium 创建专用 MySQL 账号
CREATE USER 'debezium'@'%' IDENTIFIED BY 'StrongPass!123';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE,
      REPLICATION CLIENT ON *.* TO 'debezium'@'%';
FLUSH PRIVILEGES;

# 2. 注册 Connector(通过 REST API)
curl -X POST http://localhost:8083/connectors \
  -H 'Content-Type: application/json' \
  -d '{
    "name": "mysql-connector",
    "config": {
      "connector.class": "io.debezium.connector.mysql.MySqlConnector",
      "tasks.max": "1",
      "database.hostname": "mysql-host",
      "database.port": "3306",
      "database.user": "debezium",
      "database.password": "StrongPass!123",
      "database.server.id": "184054",
      "topic.prefix": "dbserver",
      "database.include.list": "mydb",
      "table.include.list": "mydb.orders,mydb.inventory",
      "database.history.kafka.bootstrap.servers": "kafka:9092",
      "database.history.kafka.topic": "schema-changes.mydb",
      "include.schema.changes": "true",
      "snapshot.mode": "initial",
      "heartbeat.interval.ms": "10000"
    }
  }'

消息格式解析

{
  "before": {"id": 1001, "qty": 10, "status": "NEW"},
  "after":  {"id": 1001, "qty": 5,  "status": "FILLED"},
  "op":     "u",   // c=INSERT, u=UPDATE, d=DELETE, r=READ(snapshot)
  "ts_ms":  1700000000000,
  "source": {
    "version": "2.4.0.Final",
    "connector": "mysql",
    "db": "mydb",
    "table": "orders",
    "server_id": 1,
    "gtid": "server-uuid:12345",
    "file": "mysql-bin.000003",
    "pos": 154,
    "row": 0,
    "snapshot": false
  }
}

重要配置项说明

配置项 推荐值 说明
snapshot.mode initial 首次运行做全量快照,之后增量
snapshot.isolation.mode repeatable_read 快照隔离级别
decimal.handling.mode string 避免 DECIMAL 精度丢失
time.precision.mode adaptive_time_microseconds 时间精度
tombstones.on.delete true DELETE 后发送 null 消息(Log Compaction 用)
heartbeat.interval.ms 10000 低流量时维持 binlog 位置

Canal 实战

Canal 是阿里巴巴开源的 MySQL Binlog 订阅组件,在国内互联网企业广泛使用,轻量、高性能,适合需要定制化的场景。


MySQL Master ←──── Canal Server (伪装成从库)
                        ↓
              Canal Client / MQ Adapter
                   /           \
          RocketMQ           Kafka/Redis

Canal Server 配置

# conf/canal.properties
canal.serverMode = kafka   # tcp | kafka | rocketMQ | rabbitMQ

# kafka 模式配置
canal.mq.servers = kafka:9092
canal.mq.topic   = canal-topic
canal.mq.partition = 0

# conf/example/instance.properties
canal.instance.master.address = mysql-host:3306
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal_pass
canal.instance.filter.regex = mydb\\..*   # 监听所有表

# 启动
./bin/startup.sh

Canal Client 消费(Java)

CanalConnector connector = CanalConnectors.newSingleConnector(
    new InetSocketAddress("canal-server", 11111), "example", "", "");
connector.connect();
connector.subscribe("mydb\\.orders");

while (true) {
    Message message = connector.getWithoutAck(100); // 批量拉取
    long batchId = message.getId();
    for (CanalEntry.Entry entry : message.getEntries()) {
        if (entry.getEntryType() != EntryType.ROWDATA) continue;
        RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
        for (RowData rowData : rowChange.getRowDatasList()) {
            if (rowChange.getEventType() == EventType.UPDATE) {
                // rowData.getBeforeColumnsList() → 变更前
                // rowData.getAfterColumnsList()  → 变更后
                processUpdate(rowData);
            }
        }
    }
    connector.ack(batchId); // 确认消费
}

Kafka 全链路数据管道


MySQL → Debezium/Canal → Kafka → Stream Processing → Sink
                                    Flink / Spark
                                         ↓
                              ┌──────────────────────┐
                              │  Elasticsearch (搜索) │
                              │  ClickHouse (OLAP)    │
                              │  Redis (缓存失效)     │
                              │  另一个 MySQL (同步)  │
                              └──────────────────────┘

-- Flink SQL 直接消费 Debezium 格式
CREATE TABLE orders_cdc (
    id        BIGINT,
    qty       INT,
    status    STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector'        = 'kafka',
    'topic'            = 'dbserver.mydb.orders',
    'properties.bootstrap.servers' = 'kafka:9092',
    'format'           = 'debezium-json',
    'scan.startup.mode'= 'earliest-offset'
);

-- 实时同步到 ClickHouse
INSERT INTO orders_olap SELECT * FROM orders_cdc;

用 CDC 做缓存失效(Go)

// 消费 Kafka 中的 CDC 事件,精准删除缓存
func consumeCDCEvents(consumer *kafka.Consumer, rdb *redis.Client) {
    for {
        msg, _ := consumer.ReadMessage(-1)
        var event CDCEvent
        json.Unmarshal(msg.Value, &event)

        if event.Table == "products" && (event.Op == "u" || event.Op == "d") {
            key := fmt.Sprintf("product:%d", event.After.ID)
            rdb.Del(context.Background(), key)
            log.Printf("cache invalidated: %s", key)
        }
        consumer.CommitMessage(msg)
    }
}

典型应用场景

异构数据库同步 MySQL → Elasticsearch 全文搜索索引实时同步,用户搜索商品时走 ES,写操作走 MySQL,CDC 保持数据一致。

缓存精准失效 通过 CDC 监听数据库变更,精准删除对应 Redis 缓存 Key,彻底解决缓存与数据库不一致问题,比应用层主动删除更可靠。

数据仓库实时入库 MySQL → Kafka → Flink → ClickHouse/Doris,实现 T+0 的实时报表,替代原来 T+1 的 ETL 批处理流程。

微服务事件溯源 Outbox Pattern:业务写数据库时顺带写 outbox 表,CDC 捕获 outbox 变更发布到消息队列,实现事务性消息投递,无需分布式事务。

零停机数据库迁移 双写迁移阶段,用 CDC 保证新旧库数据同步,验证一致性后切流,迁移期间服务不停机。

一致性保障

At-Least-Once vs Exactly-Once

CDC 管道默认提供 At-Least-Once 语义(故障恢复后可能重发)。要实现 Exactly-Once 需要下游做幂等处理。

-- 幂等消费示例:使用 binlog position 或 GTID 去重
CREATE TABLE cdc_offset (
    consumer_id VARCHAR(64) PRIMARY KEY,
    gtid_set    TEXT,
    updated_at  TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);

-- 消费前检查,消费后更新(在同一事务中)
BEGIN;
SELECT gtid_set FROM cdc_offset WHERE consumer_id = 'my-consumer' FOR UPDATE;
-- 处理事件...
INSERT INTO target_table ...;
UPDATE cdc_offset SET gtid_set = ? WHERE consumer_id = 'my-consumer';
COMMIT;

Schema 演进处理

DDL 变更是 CDC 最大的挑战:加列、删列、改类型都可能导致消费者解析失败。使用 Schema Registry(Confluent Schema Registry + Avro)可以管理 Schema 版本,下游消费者能优雅处理字段变更。

运维要点

问题 排查/处理方法
Connector 延迟升高 查看 kafka-lag,检查 MySQL binlog 写入速率,增加 Connector 批量大小
binlog 被提前清理 增大 expire_logs_days,或配置 binlog_expire_logs_seconds;设置 heartbeat 保持位置
快照超时/OOM 使用 chunk 模式快照(Debezium 2.x);限制 snapshot.fetch.size
重复消费 检查 ack 机制,确认下游幂等处理,核对 offset 提交时机
主从切换后断连 Debezium 支持 GTID 自动重连;Canal 需要手动更新 master 地址

监控指标

-- 查看 Debezium Connector 状态
GET /connectors/mysql-connector/status

-- 关键 JMX 指标
debezium.mysql:type=connector-metrics,context=snapshot  → SnapshotCompleted
debezium.mysql:type=connector-metrics,context=streaming → MilliSecondsBehindSource
debezium.mysql:type=connector-metrics,context=streaming → TotalNumberOfEventsSeen

-- Canal 监控
canal_instance_parser_fakeCanalLogPosition (位点延迟)
canal_instance_store_put_seq  (写入序号)
canal_instance_store_get_seq  (消费序号)

最佳实践:生产环境 CDC 应从从库(Replica)读取 binlog 而非主库,既不增加主库压力,又能提供独立的故障隔离点。确保从库的 log_slave_updates=ON,让从库自己也写 binlog。

本章评分
4.9  / 5  (3 评分)

💬 留言讨论