CDC 数据管道
CDC 变更数据捕获
CDC(Change Data Capture)通过监听数据库的变更日志,将每一行数据的增删改操作以事件流的形式实时传播到下游系统,是构建实时数据管道、异构数据同步、微服务解耦的核心基础设施。
什么是 CDC
传统的数据同步方式(定时全量/增量轮询)存在明显缺陷:延迟高、对源库压力大、难以捕获 DELETE 操作。CDC 通过解析事务日志,彻底解决了这些问题。
| 方式 | 延迟 | 能否捕获 DELETE | 源库压力 | 复杂度 |
|---|---|---|---|---|
| 全量定时轮询 | 分钟级 | 不能 | 高(全表扫描) | 低 |
| 时间戳增量轮询 | 秒级 | 不能 | 中 | 低 |
| 触发器方式 | 毫秒级 | 能 | 高(写放大) | 高 |
| CDC(日志解析) | 毫秒级 | 能 | 极低 | 中 |
MySQL Binlog 原理
CDC 的核心是 MySQL 的 Binary Log(binlog),它记录了所有修改数据库状态的 SQL 语句或行级别变更事件。
Binlog 配置要求
# my.cnf 必须配置
server_id = 1 # 主从复制/CDC 必须唯一
log_bin = /var/log/mysql/mysql-bin.log
binlog_format = ROW # CDC 必须使用 ROW 格式
binlog_row_image = FULL # 记录完整的前后镜像(默认)
expire_logs_days = 7 # 保留天数
# 验证
SHOW VARIABLES LIKE 'binlog_format';
SHOW VARIABLES LIKE 'binlog_row_image';
SHOW MASTER STATUS;
为什么必须是 ROW 格式?
STATEMENT 格式只记录 SQL 语句,不确定性函数(NOW()、UUID())会导致下游数据不一致;ROW 格式记录每行的实际变更,是 CDC 的必要条件。
Binlog 事件结构
Binlog File
├── Format_desc_event (文件头,包含 MySQL 版本)
├── Previous_gtids_event (GTID 集合)
├── Gtid_event (gtid: server_uuid:seq_no)
├── Query_event (BEGIN)
├── Table_map_event (表元数据: db, table, column types)
├── Write_rows_event (INSERT: 新行数据)
│ Update_rows_event (UPDATE: 旧行 + 新行)
│ Delete_rows_event (DELETE: 旧行数据)
└── Xid_event (COMMIT + xid)
GTID 模式
生产环境强烈建议开启 GTID(Global Transaction ID),它为每个事务赋予全局唯一 ID,使 CDC 工具能精确断点续传,避免数据重复或丢失。
gtid_mode = ON
enforce_gtid_consistency = ON
-- 查看当前 GTID 进度
SHOW MASTER STATUS\G
-- Executed_Gtid_Set: server_uuid:1-12345
Debezium 实战
Debezium 是 Red Hat 开源的 CDC 平台,基于 Kafka Connect 构建,是目前使用最广泛的 CDC 框架。
MySQL Binlog → Debezium MySQL Connector → Kafka Topic → Consumer
(Kafka Connect)
↓
Topic: dbserver.mydb.orders
每条消息: {before, after, op, ts_ms, source}
部署 Debezium MySQL Connector
# 1. 为 Debezium 创建专用 MySQL 账号
CREATE USER 'debezium'@'%' IDENTIFIED BY 'StrongPass!123';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE,
REPLICATION CLIENT ON *.* TO 'debezium'@'%';
FLUSH PRIVILEGES;
# 2. 注册 Connector(通过 REST API)
curl -X POST http://localhost:8083/connectors \
-H 'Content-Type: application/json' \
-d '{
"name": "mysql-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "mysql-host",
"database.port": "3306",
"database.user": "debezium",
"database.password": "StrongPass!123",
"database.server.id": "184054",
"topic.prefix": "dbserver",
"database.include.list": "mydb",
"table.include.list": "mydb.orders,mydb.inventory",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "schema-changes.mydb",
"include.schema.changes": "true",
"snapshot.mode": "initial",
"heartbeat.interval.ms": "10000"
}
}'
消息格式解析
{
"before": {"id": 1001, "qty": 10, "status": "NEW"},
"after": {"id": 1001, "qty": 5, "status": "FILLED"},
"op": "u", // c=INSERT, u=UPDATE, d=DELETE, r=READ(snapshot)
"ts_ms": 1700000000000,
"source": {
"version": "2.4.0.Final",
"connector": "mysql",
"db": "mydb",
"table": "orders",
"server_id": 1,
"gtid": "server-uuid:12345",
"file": "mysql-bin.000003",
"pos": 154,
"row": 0,
"snapshot": false
}
}
重要配置项说明
| 配置项 | 推荐值 | 说明 |
|---|---|---|
| snapshot.mode | initial | 首次运行做全量快照,之后增量 |
| snapshot.isolation.mode | repeatable_read | 快照隔离级别 |
| decimal.handling.mode | string | 避免 DECIMAL 精度丢失 |
| time.precision.mode | adaptive_time_microseconds | 时间精度 |
| tombstones.on.delete | true | DELETE 后发送 null 消息(Log Compaction 用) |
| heartbeat.interval.ms | 10000 | 低流量时维持 binlog 位置 |
Canal 实战
Canal 是阿里巴巴开源的 MySQL Binlog 订阅组件,在国内互联网企业广泛使用,轻量、高性能,适合需要定制化的场景。
MySQL Master ←──── Canal Server (伪装成从库)
↓
Canal Client / MQ Adapter
/ \
RocketMQ Kafka/Redis
Canal Server 配置
# conf/canal.properties
canal.serverMode = kafka # tcp | kafka | rocketMQ | rabbitMQ
# kafka 模式配置
canal.mq.servers = kafka:9092
canal.mq.topic = canal-topic
canal.mq.partition = 0
# conf/example/instance.properties
canal.instance.master.address = mysql-host:3306
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal_pass
canal.instance.filter.regex = mydb\\..* # 监听所有表
# 启动
./bin/startup.sh
Canal Client 消费(Java)
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress("canal-server", 11111), "example", "", "");
connector.connect();
connector.subscribe("mydb\\.orders");
while (true) {
Message message = connector.getWithoutAck(100); // 批量拉取
long batchId = message.getId();
for (CanalEntry.Entry entry : message.getEntries()) {
if (entry.getEntryType() != EntryType.ROWDATA) continue;
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
for (RowData rowData : rowChange.getRowDatasList()) {
if (rowChange.getEventType() == EventType.UPDATE) {
// rowData.getBeforeColumnsList() → 变更前
// rowData.getAfterColumnsList() → 变更后
processUpdate(rowData);
}
}
}
connector.ack(batchId); // 确认消费
}
Kafka 全链路数据管道
MySQL → Debezium/Canal → Kafka → Stream Processing → Sink
Flink / Spark
↓
┌──────────────────────┐
│ Elasticsearch (搜索) │
│ ClickHouse (OLAP) │
│ Redis (缓存失效) │
│ 另一个 MySQL (同步) │
└──────────────────────┘
Flink CDC 消费 Kafka
-- Flink SQL 直接消费 Debezium 格式
CREATE TABLE orders_cdc (
id BIGINT,
qty INT,
status STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'dbserver.mydb.orders',
'properties.bootstrap.servers' = 'kafka:9092',
'format' = 'debezium-json',
'scan.startup.mode'= 'earliest-offset'
);
-- 实时同步到 ClickHouse
INSERT INTO orders_olap SELECT * FROM orders_cdc;
用 CDC 做缓存失效(Go)
// 消费 Kafka 中的 CDC 事件,精准删除缓存
func consumeCDCEvents(consumer *kafka.Consumer, rdb *redis.Client) {
for {
msg, _ := consumer.ReadMessage(-1)
var event CDCEvent
json.Unmarshal(msg.Value, &event)
if event.Table == "products" && (event.Op == "u" || event.Op == "d") {
key := fmt.Sprintf("product:%d", event.After.ID)
rdb.Del(context.Background(), key)
log.Printf("cache invalidated: %s", key)
}
consumer.CommitMessage(msg)
}
}
典型应用场景
①
异构数据库同步 MySQL → Elasticsearch 全文搜索索引实时同步,用户搜索商品时走 ES,写操作走 MySQL,CDC 保持数据一致。
②
缓存精准失效 通过 CDC 监听数据库变更,精准删除对应 Redis 缓存 Key,彻底解决缓存与数据库不一致问题,比应用层主动删除更可靠。
③
数据仓库实时入库 MySQL → Kafka → Flink → ClickHouse/Doris,实现 T+0 的实时报表,替代原来 T+1 的 ETL 批处理流程。
④
微服务事件溯源 Outbox Pattern:业务写数据库时顺带写 outbox 表,CDC 捕获 outbox 变更发布到消息队列,实现事务性消息投递,无需分布式事务。
⑤
零停机数据库迁移 双写迁移阶段,用 CDC 保证新旧库数据同步,验证一致性后切流,迁移期间服务不停机。
一致性保障
At-Least-Once vs Exactly-Once
CDC 管道默认提供 At-Least-Once 语义(故障恢复后可能重发)。要实现 Exactly-Once 需要下游做幂等处理。
-- 幂等消费示例:使用 binlog position 或 GTID 去重
CREATE TABLE cdc_offset (
consumer_id VARCHAR(64) PRIMARY KEY,
gtid_set TEXT,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
-- 消费前检查,消费后更新(在同一事务中)
BEGIN;
SELECT gtid_set FROM cdc_offset WHERE consumer_id = 'my-consumer' FOR UPDATE;
-- 处理事件...
INSERT INTO target_table ...;
UPDATE cdc_offset SET gtid_set = ? WHERE consumer_id = 'my-consumer';
COMMIT;
Schema 演进处理
DDL 变更是 CDC 最大的挑战:加列、删列、改类型都可能导致消费者解析失败。使用 Schema Registry(Confluent Schema Registry + Avro)可以管理 Schema 版本,下游消费者能优雅处理字段变更。
运维要点
| 问题 | 排查/处理方法 |
|---|---|
| Connector 延迟升高 | 查看 kafka-lag,检查 MySQL binlog 写入速率,增加 Connector 批量大小 |
| binlog 被提前清理 | 增大 expire_logs_days,或配置 binlog_expire_logs_seconds;设置 heartbeat 保持位置 |
| 快照超时/OOM | 使用 chunk 模式快照(Debezium 2.x);限制 snapshot.fetch.size |
| 重复消费 | 检查 ack 机制,确认下游幂等处理,核对 offset 提交时机 |
| 主从切换后断连 | Debezium 支持 GTID 自动重连;Canal 需要手动更新 master 地址 |
监控指标
-- 查看 Debezium Connector 状态
GET /connectors/mysql-connector/status
-- 关键 JMX 指标
debezium.mysql:type=connector-metrics,context=snapshot → SnapshotCompleted
debezium.mysql:type=connector-metrics,context=streaming → MilliSecondsBehindSource
debezium.mysql:type=connector-metrics,context=streaming → TotalNumberOfEventsSeen
-- Canal 监控
canal_instance_parser_fakeCanalLogPosition (位点延迟)
canal_instance_store_put_seq (写入序号)
canal_instance_store_get_seq (消费序号)
最佳实践:生产环境 CDC 应从从库(Replica)读取 binlog 而非主库,既不增加主库压力,又能提供独立的故障隔离点。确保从库的 log_slave_updates=ON,让从库自己也写 binlog。