第 26 章

Kafka Connect 深入:框架原理与自定义 Connector

第26章:Kafka Connect 深入:框架原理与自定义 Connector

导读:Kafka Connect 如何标准化数据集成?

本章核心问题:Kafka Connect 如何标准化数据集成?

读完本章你将理解


Level 1 · 你需要知道的(1-3年经验)

框架核心组件

Connector:配置管理者

Connector 类不执行任何实际的数据传输,它的职责是:

  1. 验证配置参数的合法性
  2. 根据 maxTasks 参数决定如何分割工作(将数据源分成多个并行任务)
  3. 为每个 Task 生成各自的配置
// Connector 的两个核心方法
public class MyDatabaseSourceConnector extends SourceConnector {
    private Map<String, String> config;
    
    @Override
    public void start(Map<String, String> props) {
        this.config = props;
        // 验证配置,建立连接测试(不持久保持连接)
        validateConfig(props);
    }
    
    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        // 假设要读取 10 张表,最多 maxTasks 个并行任务
        List<String> tables = Arrays.asList(config.get("tables").split(","));
        int numTasks = Math.min(maxTasks, tables.size());
        
        // 把表列表均匀分配给每个 Task
        List<Map<String, String>> taskConfigs = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) {
            Map<String, String> taskConfig = new HashMap<>(config);
            // 每个 Task 负责一个子集的表
            taskConfig.put("tables", 
                String.join(",", tables.subList(
                    i * tables.size() / numTasks,
                    (i + 1) * tables.size() / numTasks
                ))
            );
            taskConfigs.add(taskConfig);
        }
        return taskConfigs;
    }
    
    @Override
    public Class<? extends Task> taskClass() {
        return MyDatabaseSourceTask.class;
    }
    
    @Override
    public ConfigDef config() {
        // 声明支持的配置项、类型、默认值和文档
        return new ConfigDef()
            .define("db.url", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, 
                    "JDBC URL of the source database")
            .define("tables", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
                    "Comma-separated list of tables to replicate")
            .define("poll.interval.ms", ConfigDef.Type.INT, 1000,
                    ConfigDef.Importance.MEDIUM, "Poll interval in milliseconds");
    }
}

Task:数据搬运工

Task 是真正执行数据传输的单元。Source Task 从外部系统拉取数据并生成 SourceRecord 列表;Sink Task 从 Kafka 消费记录并写入外部系统。

public class MyDatabaseSourceTask extends SourceTask {
    private Connection connection;
    private String tables;
    private long lastPollOffset;
    
    @Override
    public void start(Map<String, String> props) {
        tables = props.get("tables");
        connection = DriverManager.getConnection(props.get("db.url"));
        // 恢复上次的处理位置(Connect 框架自动保存和传入)
        Map<String, Object> offset = context.offsetStorageReader()
            .offset(sourcePartition());
        lastPollOffset = offset != null ? 
            (Long) offset.get("last_id") : 0L;
    }
    
    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        List<SourceRecord> records = new ArrayList<>();
        
        try (PreparedStatement stmt = connection.prepareStatement(
                "SELECT id, data, created_at FROM my_table " +
                "WHERE id > ? ORDER BY id LIMIT 1000")) {
            stmt.setLong(1, lastPollOffset);
            ResultSet rs = stmt.executeQuery();
            
            while (rs.next()) {
                long id = rs.getLong("id");
                String data = rs.getString("data");
                
                // Source partition:标识这条记录的来源(用于 offset 追踪)
                Map<String, String> sourcePartition = Map.of("table", "my_table");
                // Source offset:记录在源系统中的位置(框架用来恢复处理)
                Map<String, Object> sourceOffset = Map.of("last_id", id);
                
                records.add(new SourceRecord(
                    sourcePartition,          // 来源标识
                    sourceOffset,             // 当前处理位置
                    "my-kafka-topic",         // 目标 Kafka topic
                    null,                     // 分区(null = 让 Kafka 决定)
                    Schema.STRING_SCHEMA,     // key schema
                    String.valueOf(id),       // key 值
                    Schema.STRING_SCHEMA,     // value schema
                    data,                     // value 值
                    rs.getLong("created_at")  // 时间戳
                ));
                lastPollOffset = id;
            }
        } catch (SQLException e) {
            throw new ConnectException("Failed to poll database", e);
        }
        
        if (records.isEmpty()) {
            // 没有新数据时等待 poll interval,避免空转浪费 CPU
            Thread.sleep(1000);
        }
        return records;
    }
    
    @Override
    public void stop() {
        try { connection.close(); } catch (SQLException ignored) {}
    }
    
    private Map<String, String> sourcePartition() {
        return Map.of("table", "my_table");
    }
}

Converter:数据格式的翻译官

Converter 负责在 Kafka 的二进制字节和 Java 对象(SourceRecord / SinkRecord)之间做转换。框架内置了几个常用 Converter:

Converter 在 Worker 级别配置(影响所有 Connector),也可以在 Connector 级别覆盖:

{
  "connector.class": "...",
  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.schema.registry.url": "http://schema-registry:8081",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://schema-registry:8081"
}

REST API:动态管理 Connector

分布式模式下,所有 Connect 管理操作通过 REST API 完成:

# 列出所有 Connector
curl http://connect-worker:8083/connectors

# 创建 Connector
curl -X POST http://connect-worker:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "mysql-orders-source",
    "config": {
      "connector.class": "io.debezium.connector.mysql.MySqlConnector",
      "database.hostname": "mysql",
      "database.port": "3306",
      "database.user": "debezium",
      "database.password": "dbz_password",
      "database.server.name": "production",
      "database.include.list": "orders",
      "database.history.kafka.bootstrap.servers": "kafka:9092",
      "database.history.kafka.topic": "schema-changes.orders",
      "value.converter": "io.confluent.connect.avro.AvroConverter",
      "value.converter.schema.registry.url": "http://schema-registry:8081"
    }
  }'

# 查询 Connector 状态
curl http://connect-worker:8083/connectors/mysql-orders-source/status

# 暂停 / 恢复 Connector
curl -X PUT http://connect-worker:8083/connectors/mysql-orders-source/pause
curl -X PUT http://connect-worker:8083/connectors/mysql-orders-source/resume

# 删除 Connector(停止后删除配置,offset 保留)
curl -X DELETE http://connect-worker:8083/connectors/mysql-orders-source

Debezium MySQL CDC Connector 深入

CDC(Change Data Capture)是 Kafka Connect 最重要的使用场景之一。Debezium MySQL Connector 通过读取 MySQL 的 binlog 来捕获数据变更,将 INSERT/UPDATE/DELETE 操作转换为 Kafka 消息。

Binlog 读取原理

Debezium 将自己伪装成 MySQL Replica,向 MySQL 主库注册为一个从库,然后从指定的 binlog 位置(file + position)开始接收 binlog events。初次启动时,Debezium 先对目标表做一次全量快照(SELECT *),然后再从快照时对应的 binlog 位置开始增量捕获,保证数据无遗漏。

消息结构

Debezium 产生的每条消息包含变更前后的完整行数据:

{
  "before": {
    "id": 123,
    "amount": 99.99,
    "status": "PENDING"
  },
  "after": {
    "id": 123,
    "amount": 99.99,
    "status": "COMPLETED"
  },
  "source": {
    "db": "orders",
    "table": "orders",
    "ts_ms": 1719820800000,
    "file": "mysql-bin.000042",
    "pos": 1234567,
    "gtid": "3E11FA47-71CA-11E1-9E33-C80AA9429562:23"
  },
  "op": "u",      // c=CREATE, u=UPDATE, d=DELETE, r=READ(snapshot)
  "ts_ms": 1719820800100
}

before 字段需要 MySQL 开启 binlog_row_image=FULL(默认),否则 UPDATE 只有 after

SMT(Single Message Transform)链

SMT 是 Kafka Connect 提供的消息级别的轻量转换机制,在数据从 Source 写入 Kafka(或从 Kafka 写入 Sink)时执行,无需单独的处理层:

{
  "transforms": "unwrap,maskSensitive,routeByStatus",
  
  "transforms.unwrap.type": 
      "io.debezium.transforms.ExtractNewRecordState",
  "transforms.unwrap.drop.tombstones": "false",
  "transforms.unwrap.delete.handling.mode": "rewrite",
  
  "transforms.maskSensitive.type": 
      "org.apache.kafka.connect.transforms.MaskField$Value",
  "transforms.maskSensitive.fields": "credit_card_number,ssn",
  "transforms.maskSensitive.replacement": "***MASKED***",
  
  "transforms.routeByStatus.type": 
      "org.apache.kafka.connect.transforms.ValueToKey",
  "transforms.routeByStatus.fields": "status"
}

常用 SMT:

SMT 是轻量的,但不适合复杂逻辑(如 Join、聚合、条件分支)。复杂转换应该在 Kafka Streams 或 ksqlDB 中完成。

自定义 Source Connector 完整实现

以下是一个读取 REST API 数据的 Source Connector 完整示例:

// Connector 类:配置和任务分割
public class RestApiSourceConnector extends SourceConnector {
    public static final String BASE_URL_CONFIG = "rest.api.url";
    public static final String ENDPOINT_CONFIG = "endpoints";
    public static final String POLL_INTERVAL_CONFIG = "poll.interval.ms";
    public static final String TOPIC_PREFIX_CONFIG = "topic.prefix";
    
    private Map<String, String> config;
    
    @Override
    public void start(Map<String, String> props) {
        config = props;
        // 验证 REST API 可达性
        String url = props.get(BASE_URL_CONFIG);
        if (url == null || url.isBlank()) {
            throw new ConfigException(BASE_URL_CONFIG, url, "Must not be blank");
        }
    }
    
    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        String[] endpoints = config.get(ENDPOINT_CONFIG).split(",");
        int numTasks = Math.min(maxTasks, endpoints.length);
        List<Map<String, String>> taskConfigs = new ArrayList<>();
        
        for (int i = 0; i < numTasks; i++) {
            // 每个 Task 负责一组 endpoints
            Map<String, String> taskConfig = new HashMap<>(config);
            List<String> endpointSlice = new ArrayList<>();
            for (int j = i; j < endpoints.length; j += numTasks) {
                endpointSlice.add(endpoints[j].trim());
            }
            taskConfig.put(ENDPOINT_CONFIG, String.join(",", endpointSlice));
            taskConfigs.add(taskConfig);
        }
        return taskConfigs;
    }
    
    @Override
    public Class<? extends Task> taskClass() {
        return RestApiSourceTask.class;
    }
    
    @Override
    public ConfigDef config() {
        return new ConfigDef()
            .define(BASE_URL_CONFIG, ConfigDef.Type.STRING, 
                    ConfigDef.Importance.HIGH, "Base URL of the REST API")
            .define(ENDPOINT_CONFIG, ConfigDef.Type.STRING,
                    ConfigDef.Importance.HIGH, "Comma-separated API endpoints")
            .define(POLL_INTERVAL_CONFIG, ConfigDef.Type.INT, 5000,
                    ConfigDef.Importance.MEDIUM, "Poll interval in ms")
            .define(TOPIC_PREFIX_CONFIG, ConfigDef.Type.STRING, "",
                    ConfigDef.Importance.LOW, "Prefix for Kafka topic names");
    }
    
    @Override
    public String version() {
        return "1.0.0";
    }
    
    @Override
    public void stop() { }
}

// Task 类:实际执行数据拉取
public class RestApiSourceTask extends SourceTask {
    private String baseUrl;
    private List<String> endpoints;
    private int pollIntervalMs;
    private String topicPrefix;
    private HttpClient httpClient;
    private ObjectMapper objectMapper;
    
    @Override
    public void start(Map<String, String> props) {
        baseUrl = props.get(RestApiSourceConnector.BASE_URL_CONFIG);
        endpoints = Arrays.asList(
            props.get(RestApiSourceConnector.ENDPOINT_CONFIG).split(","));
        pollIntervalMs = Integer.parseInt(
            props.get(RestApiSourceConnector.POLL_INTERVAL_CONFIG));
        topicPrefix = props.getOrDefault(
            RestApiSourceConnector.TOPIC_PREFIX_CONFIG, "");
        
        httpClient = HttpClient.newBuilder()
            .connectTimeout(Duration.ofSeconds(10))
            .build();
        objectMapper = new ObjectMapper();
    }
    
    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        List<SourceRecord> records = new ArrayList<>();
        
        for (String endpoint : endpoints) {
            Map<String, String> partition = Map.of("endpoint", endpoint);
            // 读取该 endpoint 上次处理到的 cursor
            Map<String, Object> storedOffset = context
                .offsetStorageReader()
                .offset(partition);
            String cursor = storedOffset != null ? 
                (String) storedOffset.get("cursor") : null;
            
            try {
                String url = baseUrl + endpoint + 
                    (cursor != null ? "?after=" + cursor : "");
                
                HttpRequest request = HttpRequest.newBuilder()
                    .uri(URI.create(url))
                    .header("Accept", "application/json")
                    .timeout(Duration.ofSeconds(30))
                    .GET()
                    .build();
                
                HttpResponse<String> response = httpClient.send(
                    request, HttpResponse.BodyHandlers.ofString());
                
                if (response.statusCode() != 200) {
                    // 非致命错误:记录日志,跳过本次轮询
                    log.warn("Got HTTP {} for {}", response.statusCode(), url);
                    continue;
                }
                
                JsonNode root = objectMapper.readTree(response.body());
                JsonNode items = root.get("items");
                String nextCursor = root.path("next_cursor").asText(null);
                
                for (JsonNode item : items) {
                    String id = item.get("id").asText();
                    Map<String, Object> offset = new HashMap<>();
                    offset.put("cursor", nextCursor != null ? nextCursor : id);
                    
                    records.add(new SourceRecord(
                        partition,
                        offset,
                        topicPrefix + endpoint.replace("/", "-").substring(1),
                        null,
                        Schema.STRING_SCHEMA,
                        id,
                        Schema.STRING_SCHEMA,
                        item.toString()
                    ));
                }
            } catch (IOException e) {
                // 网络错误:不抛出异常(会导致 Task 重启),而是记录并继续
                log.error("Failed to poll endpoint {}: {}", endpoint, e.getMessage());
            }
        }
        
        if (records.isEmpty()) {
            Thread.sleep(pollIntervalMs);
        }
        return records;
    }
    
    @Override
    public void stop() {
        // HttpClient 无需显式关闭
    }
    
    @Override
    public String version() {
        return "1.0.0";
    }
}

Level 2 · 它是怎么运行的(3-5年经验)

本章的内部原理内容已整合到 Level 1 和 Level 3 中,请结合阅读。


Level 3 · 规范怎么定义的(资深)

错误处理:死信队列

Sink Connector 在写入目标系统时,可能因为数据格式错误、目标系统限制等原因处理失败。默认行为是 Task 失败并需要人工干预。生产环境中推荐配置死信队列(DLQ):

{
  "errors.tolerance": "all",
  "errors.deadletterqueue.topic.name": "connect-dlq",
  "errors.deadletterqueue.topic.replication.factor": 3,
  "errors.deadletterqueue.context.headers.enable": true,
  "errors.log.enable": true,
  "errors.log.include.messages": true
}

Level 4 · 边界与陷阱(所有人)

Connect 框架的设计哲学

在 Kafka Connect 出现之前,将外部系统的数据移入或移出 Kafka 是每个团队各自为战的工程问题:有人写 Python 脚本轮询数据库,有人用定时任务读 CSV 文件,有人维护手写的 Kafka Producer。这些方案都有相同的缺陷:缺乏统一的错误处理、监控、重试和流量控制机制,也难以在团队间复用。

Kafka Connect 的设计目标是提供一个可插拔的数据集成框架,让数据移动的通用问题(连接管理、offset 追踪、错误重试、并行度控制)由框架负责,开发者只需要关注特定系统的数据访问逻辑。这个框架的核心抽象层次非常清晰:

Connector(配置 + 任务分割策略)
    ↓
Task(实际的数据移动执行单元)
    ↓
Converter(数据格式序列化/反序列化)
    ↓
Kafka(中间的持久化层)

独立模式 vs 分布式模式

独立模式(Standalone Mode)

单进程运行,配置存储在本地文件,offset 也存储在本地文件。适合开发和测试:

connect-standalone.sh connect-standalone.properties \
  my-source-connector.properties \
  my-sink-connector.properties

独立模式的 Connector 不具备容错性——进程崩溃,所有 Connector 停止,必须手动重启。

分布式模式(Distributed Mode)

多个 Worker 进程组成一个 Connect 集群,通过 Kafka 的消费者组协议协调。这是生产环境的唯一推荐选择。

分布式模式的三个关键内部 topic:

config.storage.topic  = connect-configs    # Connector 和 Task 的配置
offset.storage.topic  = connect-offsets    # 每个 Task 的处理进度(Source offset)
status.storage.topic  = connect-statuses   # Connector 和 Task 的状态

这三个 topic 是高度压缩和高复制因子(建议 replication.factor=3)的。它们是 Connect 集群的"数据库"——Worker 重启后从这里恢复配置和 offset。

Worker 之间的协调使用 Kafka 消费者组协议(Group ID = group.id 配置项)。当 Worker 加入或离开时,触发 rebalance,Task 在存活的 Worker 间重新分配。Connect 的 rebalance 要求所有 Task 停止并重新分配,这在 Worker 数量变化时会造成短暂的处理中断(Eager Rebalance)。Connect 3.x 开始支持 Incremental Cooperative Rebalance,减少中断时间。

监控:JMX 指标

Connect 通过 JMX 暴露丰富的运行时指标,关键指标包括:

# Connector 级别
kafka.connect:type=connector-metrics,connector=<name>
    - connector-total-task-count     # 总 Task 数
    - connector-running-task-count   # 运行中 Task 数
    - connector-failed-task-count    # 失败 Task 数

# Task 级别(Source Task)
kafka.connect:type=source-task-metrics,connector=<name>,task=<id>
    - source-record-poll-rate        # 每秒从源拉取的记录数
    - source-record-write-rate       # 每秒写入 Kafka 的记录数
    - poll-batch-avg-time-ms         # 单次 poll() 调用的平均耗时

# Task 级别(Sink Task)
kafka.connect:type=sink-task-metrics,connector=<name>,task=<id>
    - sink-record-read-rate          # 每秒从 Kafka 读取的记录数
    - sink-record-send-rate          # 每秒成功写入目标系统的记录数
    - put-batch-avg-time-ms          # 单次批量写入的平均耗时

# Worker 级别
kafka.connect:type=connect-worker-metrics
    - connector-count                # Worker 上的 Connector 数
    - task-count                     # Worker 上的 Task 数
    - connector-startup-success-rate # Connector 启动成功率

Kafka Connect 是 Kafka 生态系统中最被低估的组件之一。当你面临"如何把数据搬进 Kafka"或"如何从 Kafka 写入目标系统"的问题时,先看看是否有现成的 Connector(Confluent Hub 收录了几百个),再考虑自己实现。自己实现时,牢记 Connector 只管任务分割、Task 只管数据访问、错误处理交给框架,是写出健壮 Connect 插件的基本原则。

本章评分
4.6  / 5  (4 评分)

💬 留言讨论