Kafka Connect 深入:框架原理与自定义 Connector
第26章:Kafka Connect 深入:框架原理与自定义 Connector
导读:Kafka Connect 如何标准化数据集成?
本章核心问题:Kafka Connect 如何标准化数据集成?
读完本章你将理解:
- Source/Sink Connector 架构
- Worker/Task/Converter 分层
- 自定义 Connector 开发
- 分布式容错与扩展
Level 1 · 你需要知道的(1-3年经验)
框架核心组件
Connector:配置管理者
Connector 类不执行任何实际的数据传输,它的职责是:
- 验证配置参数的合法性
- 根据
maxTasks参数决定如何分割工作(将数据源分成多个并行任务) - 为每个 Task 生成各自的配置
// Connector 的两个核心方法
public class MyDatabaseSourceConnector extends SourceConnector {
private Map<String, String> config;
@Override
public void start(Map<String, String> props) {
this.config = props;
// 验证配置,建立连接测试(不持久保持连接)
validateConfig(props);
}
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
// 假设要读取 10 张表,最多 maxTasks 个并行任务
List<String> tables = Arrays.asList(config.get("tables").split(","));
int numTasks = Math.min(maxTasks, tables.size());
// 把表列表均匀分配给每个 Task
List<Map<String, String>> taskConfigs = new ArrayList<>();
for (int i = 0; i < numTasks; i++) {
Map<String, String> taskConfig = new HashMap<>(config);
// 每个 Task 负责一个子集的表
taskConfig.put("tables",
String.join(",", tables.subList(
i * tables.size() / numTasks,
(i + 1) * tables.size() / numTasks
))
);
taskConfigs.add(taskConfig);
}
return taskConfigs;
}
@Override
public Class<? extends Task> taskClass() {
return MyDatabaseSourceTask.class;
}
@Override
public ConfigDef config() {
// 声明支持的配置项、类型、默认值和文档
return new ConfigDef()
.define("db.url", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
"JDBC URL of the source database")
.define("tables", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
"Comma-separated list of tables to replicate")
.define("poll.interval.ms", ConfigDef.Type.INT, 1000,
ConfigDef.Importance.MEDIUM, "Poll interval in milliseconds");
}
}
Task:数据搬运工
Task 是真正执行数据传输的单元。Source Task 从外部系统拉取数据并生成 SourceRecord 列表;Sink Task 从 Kafka 消费记录并写入外部系统。
public class MyDatabaseSourceTask extends SourceTask {
private Connection connection;
private String tables;
private long lastPollOffset;
@Override
public void start(Map<String, String> props) {
tables = props.get("tables");
connection = DriverManager.getConnection(props.get("db.url"));
// 恢复上次的处理位置(Connect 框架自动保存和传入)
Map<String, Object> offset = context.offsetStorageReader()
.offset(sourcePartition());
lastPollOffset = offset != null ?
(Long) offset.get("last_id") : 0L;
}
@Override
public List<SourceRecord> poll() throws InterruptedException {
List<SourceRecord> records = new ArrayList<>();
try (PreparedStatement stmt = connection.prepareStatement(
"SELECT id, data, created_at FROM my_table " +
"WHERE id > ? ORDER BY id LIMIT 1000")) {
stmt.setLong(1, lastPollOffset);
ResultSet rs = stmt.executeQuery();
while (rs.next()) {
long id = rs.getLong("id");
String data = rs.getString("data");
// Source partition:标识这条记录的来源(用于 offset 追踪)
Map<String, String> sourcePartition = Map.of("table", "my_table");
// Source offset:记录在源系统中的位置(框架用来恢复处理)
Map<String, Object> sourceOffset = Map.of("last_id", id);
records.add(new SourceRecord(
sourcePartition, // 来源标识
sourceOffset, // 当前处理位置
"my-kafka-topic", // 目标 Kafka topic
null, // 分区(null = 让 Kafka 决定)
Schema.STRING_SCHEMA, // key schema
String.valueOf(id), // key 值
Schema.STRING_SCHEMA, // value schema
data, // value 值
rs.getLong("created_at") // 时间戳
));
lastPollOffset = id;
}
} catch (SQLException e) {
throw new ConnectException("Failed to poll database", e);
}
if (records.isEmpty()) {
// 没有新数据时等待 poll interval,避免空转浪费 CPU
Thread.sleep(1000);
}
return records;
}
@Override
public void stop() {
try { connection.close(); } catch (SQLException ignored) {}
}
private Map<String, String> sourcePartition() {
return Map.of("table", "my_table");
}
}
Converter:数据格式的翻译官
Converter 负责在 Kafka 的二进制字节和 Java 对象(SourceRecord / SinkRecord)之间做转换。框架内置了几个常用 Converter:
JsonConverter:JSON 格式,可选带/不带 schemaAvroConverter:Avro 二进制格式,需要配合 Schema Registry 使用ProtobufConverter:Protocol Buffers 格式,需要 Schema RegistryByteArrayConverter:原始字节,不做任何转换StringConverter:UTF-8 字符串
Converter 在 Worker 级别配置(影响所有 Connector),也可以在 Connector 级别覆盖:
{
"connector.class": "...",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081"
}
REST API:动态管理 Connector
分布式模式下,所有 Connect 管理操作通过 REST API 完成:
# 列出所有 Connector
curl http://connect-worker:8083/connectors
# 创建 Connector
curl -X POST http://connect-worker:8083/connectors \
-H "Content-Type: application/json" \
-d '{
"name": "mysql-orders-source",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz_password",
"database.server.name": "production",
"database.include.list": "orders",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "schema-changes.orders",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081"
}
}'
# 查询 Connector 状态
curl http://connect-worker:8083/connectors/mysql-orders-source/status
# 暂停 / 恢复 Connector
curl -X PUT http://connect-worker:8083/connectors/mysql-orders-source/pause
curl -X PUT http://connect-worker:8083/connectors/mysql-orders-source/resume
# 删除 Connector(停止后删除配置,offset 保留)
curl -X DELETE http://connect-worker:8083/connectors/mysql-orders-source
Debezium MySQL CDC Connector 深入
CDC(Change Data Capture)是 Kafka Connect 最重要的使用场景之一。Debezium MySQL Connector 通过读取 MySQL 的 binlog 来捕获数据变更,将 INSERT/UPDATE/DELETE 操作转换为 Kafka 消息。
Binlog 读取原理
Debezium 将自己伪装成 MySQL Replica,向 MySQL 主库注册为一个从库,然后从指定的 binlog 位置(file + position)开始接收 binlog events。初次启动时,Debezium 先对目标表做一次全量快照(SELECT *),然后再从快照时对应的 binlog 位置开始增量捕获,保证数据无遗漏。
消息结构
Debezium 产生的每条消息包含变更前后的完整行数据:
{
"before": {
"id": 123,
"amount": 99.99,
"status": "PENDING"
},
"after": {
"id": 123,
"amount": 99.99,
"status": "COMPLETED"
},
"source": {
"db": "orders",
"table": "orders",
"ts_ms": 1719820800000,
"file": "mysql-bin.000042",
"pos": 1234567,
"gtid": "3E11FA47-71CA-11E1-9E33-C80AA9429562:23"
},
"op": "u", // c=CREATE, u=UPDATE, d=DELETE, r=READ(snapshot)
"ts_ms": 1719820800100
}
before 字段需要 MySQL 开启 binlog_row_image=FULL(默认),否则 UPDATE 只有 after。
SMT(Single Message Transform)链
SMT 是 Kafka Connect 提供的消息级别的轻量转换机制,在数据从 Source 写入 Kafka(或从 Kafka 写入 Sink)时执行,无需单独的处理层:
{
"transforms": "unwrap,maskSensitive,routeByStatus",
"transforms.unwrap.type":
"io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.delete.handling.mode": "rewrite",
"transforms.maskSensitive.type":
"org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.maskSensitive.fields": "credit_card_number,ssn",
"transforms.maskSensitive.replacement": "***MASKED***",
"transforms.routeByStatus.type":
"org.apache.kafka.connect.transforms.ValueToKey",
"transforms.routeByStatus.fields": "status"
}
常用 SMT:
ExtractNewRecordState(Debezium 专用):从 Debezium 的{before, after, op}结构提取after字段,简化下游处理MaskField:用固定字符串覆盖指定字段(脱敏)ReplaceField:重命名、包含或排除字段TimestampConverter:转换时间戳格式HoistField:将整个消息值包装到某个字段下ValueToKey:用 value 中的字段覆盖 key(用于 topic 分区路由)
SMT 是轻量的,但不适合复杂逻辑(如 Join、聚合、条件分支)。复杂转换应该在 Kafka Streams 或 ksqlDB 中完成。
自定义 Source Connector 完整实现
以下是一个读取 REST API 数据的 Source Connector 完整示例:
// Connector 类:配置和任务分割
public class RestApiSourceConnector extends SourceConnector {
public static final String BASE_URL_CONFIG = "rest.api.url";
public static final String ENDPOINT_CONFIG = "endpoints";
public static final String POLL_INTERVAL_CONFIG = "poll.interval.ms";
public static final String TOPIC_PREFIX_CONFIG = "topic.prefix";
private Map<String, String> config;
@Override
public void start(Map<String, String> props) {
config = props;
// 验证 REST API 可达性
String url = props.get(BASE_URL_CONFIG);
if (url == null || url.isBlank()) {
throw new ConfigException(BASE_URL_CONFIG, url, "Must not be blank");
}
}
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
String[] endpoints = config.get(ENDPOINT_CONFIG).split(",");
int numTasks = Math.min(maxTasks, endpoints.length);
List<Map<String, String>> taskConfigs = new ArrayList<>();
for (int i = 0; i < numTasks; i++) {
// 每个 Task 负责一组 endpoints
Map<String, String> taskConfig = new HashMap<>(config);
List<String> endpointSlice = new ArrayList<>();
for (int j = i; j < endpoints.length; j += numTasks) {
endpointSlice.add(endpoints[j].trim());
}
taskConfig.put(ENDPOINT_CONFIG, String.join(",", endpointSlice));
taskConfigs.add(taskConfig);
}
return taskConfigs;
}
@Override
public Class<? extends Task> taskClass() {
return RestApiSourceTask.class;
}
@Override
public ConfigDef config() {
return new ConfigDef()
.define(BASE_URL_CONFIG, ConfigDef.Type.STRING,
ConfigDef.Importance.HIGH, "Base URL of the REST API")
.define(ENDPOINT_CONFIG, ConfigDef.Type.STRING,
ConfigDef.Importance.HIGH, "Comma-separated API endpoints")
.define(POLL_INTERVAL_CONFIG, ConfigDef.Type.INT, 5000,
ConfigDef.Importance.MEDIUM, "Poll interval in ms")
.define(TOPIC_PREFIX_CONFIG, ConfigDef.Type.STRING, "",
ConfigDef.Importance.LOW, "Prefix for Kafka topic names");
}
@Override
public String version() {
return "1.0.0";
}
@Override
public void stop() { }
}
// Task 类:实际执行数据拉取
public class RestApiSourceTask extends SourceTask {
private String baseUrl;
private List<String> endpoints;
private int pollIntervalMs;
private String topicPrefix;
private HttpClient httpClient;
private ObjectMapper objectMapper;
@Override
public void start(Map<String, String> props) {
baseUrl = props.get(RestApiSourceConnector.BASE_URL_CONFIG);
endpoints = Arrays.asList(
props.get(RestApiSourceConnector.ENDPOINT_CONFIG).split(","));
pollIntervalMs = Integer.parseInt(
props.get(RestApiSourceConnector.POLL_INTERVAL_CONFIG));
topicPrefix = props.getOrDefault(
RestApiSourceConnector.TOPIC_PREFIX_CONFIG, "");
httpClient = HttpClient.newBuilder()
.connectTimeout(Duration.ofSeconds(10))
.build();
objectMapper = new ObjectMapper();
}
@Override
public List<SourceRecord> poll() throws InterruptedException {
List<SourceRecord> records = new ArrayList<>();
for (String endpoint : endpoints) {
Map<String, String> partition = Map.of("endpoint", endpoint);
// 读取该 endpoint 上次处理到的 cursor
Map<String, Object> storedOffset = context
.offsetStorageReader()
.offset(partition);
String cursor = storedOffset != null ?
(String) storedOffset.get("cursor") : null;
try {
String url = baseUrl + endpoint +
(cursor != null ? "?after=" + cursor : "");
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(url))
.header("Accept", "application/json")
.timeout(Duration.ofSeconds(30))
.GET()
.build();
HttpResponse<String> response = httpClient.send(
request, HttpResponse.BodyHandlers.ofString());
if (response.statusCode() != 200) {
// 非致命错误:记录日志,跳过本次轮询
log.warn("Got HTTP {} for {}", response.statusCode(), url);
continue;
}
JsonNode root = objectMapper.readTree(response.body());
JsonNode items = root.get("items");
String nextCursor = root.path("next_cursor").asText(null);
for (JsonNode item : items) {
String id = item.get("id").asText();
Map<String, Object> offset = new HashMap<>();
offset.put("cursor", nextCursor != null ? nextCursor : id);
records.add(new SourceRecord(
partition,
offset,
topicPrefix + endpoint.replace("/", "-").substring(1),
null,
Schema.STRING_SCHEMA,
id,
Schema.STRING_SCHEMA,
item.toString()
));
}
} catch (IOException e) {
// 网络错误:不抛出异常(会导致 Task 重启),而是记录并继续
log.error("Failed to poll endpoint {}: {}", endpoint, e.getMessage());
}
}
if (records.isEmpty()) {
Thread.sleep(pollIntervalMs);
}
return records;
}
@Override
public void stop() {
// HttpClient 无需显式关闭
}
@Override
public String version() {
return "1.0.0";
}
}
Level 2 · 它是怎么运行的(3-5年经验)
本章的内部原理内容已整合到 Level 1 和 Level 3 中,请结合阅读。
Level 3 · 规范怎么定义的(资深)
错误处理:死信队列
Sink Connector 在写入目标系统时,可能因为数据格式错误、目标系统限制等原因处理失败。默认行为是 Task 失败并需要人工干预。生产环境中推荐配置死信队列(DLQ):
{
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "connect-dlq",
"errors.deadletterqueue.topic.replication.factor": 3,
"errors.deadletterqueue.context.headers.enable": true,
"errors.log.enable": true,
"errors.log.include.messages": true
}
errors.tolerance=all:遇到错误时跳过该记录(而不是停止 Task)errors.deadletterqueue.topic.name:失败的记录写入此 topicerrors.deadletterqueue.context.headers.enable:在 DLQ 消息的 header 中附加错误信息(错误类型、栈轨迹、原始 topic/partition/offset)
Level 4 · 边界与陷阱(所有人)
Connect 框架的设计哲学
在 Kafka Connect 出现之前,将外部系统的数据移入或移出 Kafka 是每个团队各自为战的工程问题:有人写 Python 脚本轮询数据库,有人用定时任务读 CSV 文件,有人维护手写的 Kafka Producer。这些方案都有相同的缺陷:缺乏统一的错误处理、监控、重试和流量控制机制,也难以在团队间复用。
Kafka Connect 的设计目标是提供一个可插拔的数据集成框架,让数据移动的通用问题(连接管理、offset 追踪、错误重试、并行度控制)由框架负责,开发者只需要关注特定系统的数据访问逻辑。这个框架的核心抽象层次非常清晰:
Connector(配置 + 任务分割策略)
↓
Task(实际的数据移动执行单元)
↓
Converter(数据格式序列化/反序列化)
↓
Kafka(中间的持久化层)
独立模式 vs 分布式模式
独立模式(Standalone Mode)
单进程运行,配置存储在本地文件,offset 也存储在本地文件。适合开发和测试:
connect-standalone.sh connect-standalone.properties \
my-source-connector.properties \
my-sink-connector.properties
独立模式的 Connector 不具备容错性——进程崩溃,所有 Connector 停止,必须手动重启。
分布式模式(Distributed Mode)
多个 Worker 进程组成一个 Connect 集群,通过 Kafka 的消费者组协议协调。这是生产环境的唯一推荐选择。
分布式模式的三个关键内部 topic:
config.storage.topic = connect-configs # Connector 和 Task 的配置
offset.storage.topic = connect-offsets # 每个 Task 的处理进度(Source offset)
status.storage.topic = connect-statuses # Connector 和 Task 的状态
这三个 topic 是高度压缩和高复制因子(建议 replication.factor=3)的。它们是 Connect 集群的"数据库"——Worker 重启后从这里恢复配置和 offset。
Worker 之间的协调使用 Kafka 消费者组协议(Group ID = group.id 配置项)。当 Worker 加入或离开时,触发 rebalance,Task 在存活的 Worker 间重新分配。Connect 的 rebalance 要求所有 Task 停止并重新分配,这在 Worker 数量变化时会造成短暂的处理中断(Eager Rebalance)。Connect 3.x 开始支持 Incremental Cooperative Rebalance,减少中断时间。
监控:JMX 指标
Connect 通过 JMX 暴露丰富的运行时指标,关键指标包括:
# Connector 级别
kafka.connect:type=connector-metrics,connector=<name>
- connector-total-task-count # 总 Task 数
- connector-running-task-count # 运行中 Task 数
- connector-failed-task-count # 失败 Task 数
# Task 级别(Source Task)
kafka.connect:type=source-task-metrics,connector=<name>,task=<id>
- source-record-poll-rate # 每秒从源拉取的记录数
- source-record-write-rate # 每秒写入 Kafka 的记录数
- poll-batch-avg-time-ms # 单次 poll() 调用的平均耗时
# Task 级别(Sink Task)
kafka.connect:type=sink-task-metrics,connector=<name>,task=<id>
- sink-record-read-rate # 每秒从 Kafka 读取的记录数
- sink-record-send-rate # 每秒成功写入目标系统的记录数
- put-batch-avg-time-ms # 单次批量写入的平均耗时
# Worker 级别
kafka.connect:type=connect-worker-metrics
- connector-count # Worker 上的 Connector 数
- task-count # Worker 上的 Task 数
- connector-startup-success-rate # Connector 启动成功率
Kafka Connect 是 Kafka 生态系统中最被低估的组件之一。当你面临"如何把数据搬进 Kafka"或"如何从 Kafka 写入目标系统"的问题时,先看看是否有现成的 Connector(Confluent Hub 收录了几百个),再考虑自己实现。自己实现时,牢记 Connector 只管任务分割、Task 只管数据访问、错误处理交给框架,是写出健壮 Connect 插件的基本原则。