Chapter 26

Kafka Connect Deep Dive: Framework and Custom Connectors

The Problem Kafka Connect Solves

Before Kafka Connect existed, integrating external systems with Kafka was a bespoke engineering problem. Teams wrote Python scripts to poll databases, scheduled jobs to read CSV files, and maintained hand-crafted Kafka Producers. Every implementation reinvented the same wheel: connection management, offset tracking, error retry, backpressure, and parallelism control — all implemented differently, all with different failure modes, none reusable across teams.

Kafka Connect's design goal is a pluggable data integration framework where the generic problems of data movement are handled by the framework, and developers only implement the domain-specific data access logic for their particular external system. The abstraction hierarchy is deliberately clean:

Connector (configuration validation + task splitting strategy)
    ↓
Task (actual data movement execution)
    ↓
Converter (serialization/deserialization of data format)
    ↓
Kafka (the durable intermediary)

This separation means that error handling, retries, offset commits, and monitoring are implemented once in the framework and work identically across all connectors. A connector for MySQL, one for S3, and one for your custom REST API all share the same operational model.

Framework Core Components

Connector: Configuration Manager

The Connector class does not transfer any data. Its sole responsibilities are:

  1. Validate configuration parameters
  2. Decide how to partition work across parallel tasks given a maxTasks constraint
  3. Generate the per-task configuration map for each task
public class MyDatabaseSourceConnector extends SourceConnector {
    private Map<String, String> config;

    @Override
    public void start(Map<String, String> props) {
        this.config = props;
        // Validate config; optionally test connectivity
        // Do NOT maintain a persistent connection here — the Connector
        // is a coordinator, not a data pipeline
        validateConfig(props);
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        // Example: 10 tables to replicate, split across up to maxTasks tasks
        List<String> tables = Arrays.asList(config.get("tables").split(","));
        int numTasks = Math.min(maxTasks, tables.size());

        List<Map<String, String>> taskConfigs = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) {
            Map<String, String> taskConfig = new HashMap<>(config);
            // Assign a subset of tables to each task
            taskConfig.put("tables",
                String.join(",", tables.subList(
                    i * tables.size() / numTasks,
                    (i + 1) * tables.size() / numTasks
                ))
            );
            taskConfigs.add(taskConfig);
        }
        return taskConfigs;
    }

    @Override
    public Class<? extends Task> taskClass() {
        return MyDatabaseSourceTask.class;
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef()
            .define("db.url", ConfigDef.Type.STRING,
                    ConfigDef.Importance.HIGH,
                    "JDBC URL of the source database")
            .define("tables", ConfigDef.Type.STRING,
                    ConfigDef.Importance.HIGH,
                    "Comma-separated list of tables to replicate")
            .define("poll.interval.ms", ConfigDef.Type.INT, 1000,
                    ConfigDef.Importance.MEDIUM,
                    "Milliseconds between poll iterations");
    }

    @Override
    public String version() { return "1.0.0"; }

    @Override
    public void stop() { }
}

Task: The Data Mover

Source Tasks pull records from external systems and return SourceRecord lists to the framework. The framework handles writing those records to Kafka and committing offsets. Sink Tasks receive SinkRecord lists from the framework (already read from Kafka) and write them to the external system.

public class MyDatabaseSourceTask extends SourceTask {
    private Connection connection;
    private List<String> tables;
    private long lastProcessedId;

    @Override
    public void start(Map<String, String> props) {
        tables = Arrays.asList(props.get("tables").split(","));

        try {
            connection = DriverManager.getConnection(props.get("db.url"),
                props.get("db.user"), props.get("db.password"));
        } catch (SQLException e) {
            throw new ConnectException("Cannot connect to database", e);
        }

        // Restore the last processed position from the offset store
        // The framework automatically persists offsets to connect-offsets topic
        Map<String, Object> offset = context.offsetStorageReader()
            .offset(Map.of("table", tables.get(0)));
        lastProcessedId = (offset != null && offset.containsKey("last_id"))
            ? (Long) offset.get("last_id") : 0L;
    }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        List<SourceRecord> records = new ArrayList<>();

        for (String table : tables) {
            Map<String, String> sourcePartition = Map.of("table", table);

            try (PreparedStatement stmt = connection.prepareStatement(
                    "SELECT id, payload, created_at FROM " + table +
                    " WHERE id > ? ORDER BY id LIMIT 500")) {
                stmt.setLong(1, lastProcessedId);
                ResultSet rs = stmt.executeQuery();

                while (rs.next()) {
                    long id = rs.getLong("id");
                    // sourceOffset tells the framework where we are in the source
                    // On restart, the framework passes this back in start()
                    Map<String, Object> sourceOffset = Map.of("last_id", id);

                    records.add(new SourceRecord(
                        sourcePartition,
                        sourceOffset,
                        "db-" + table,           // target Kafka topic
                        null,                    // partition (null = let Kafka decide)
                        Schema.STRING_SCHEMA,
                        String.valueOf(id),      // Kafka record key
                        Schema.STRING_SCHEMA,
                        rs.getString("payload"), // Kafka record value
                        rs.getLong("created_at") // timestamp
                    ));
                    lastProcessedId = id;
                }
            } catch (SQLException e) {
                // Throw ConnectException to signal a retryable error
                // The framework will restart the task after a backoff delay
                throw new ConnectException("Database poll failed for " + table, e);
            }
        }

        if (records.isEmpty()) {
            Thread.sleep(Integer.parseInt(
                context.configs().get("poll.interval.ms")));
        }
        return records;
    }

    @Override
    public void stop() {
        try { if (connection != null) connection.close(); }
        catch (SQLException ignored) { }
    }

    @Override
    public String version() { return "1.0.0"; }
}

Converter: Data Format Translator

Converters serialize SourceRecord values to bytes for Kafka (and deserialize bytes back to SinkRecord values). Converters are configured at the Worker level (applying to all connectors unless overridden) and optionally at the connector level:

{
  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.schema.registry.url": "http://schema-registry:8081",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://schema-registry:8081"
}

Built-in converters:

The Avro and Protobuf converters store the schema in Schema Registry and reference it by ID in the wire format (magic byte 0x00 + 4-byte schema ID + serialized payload). This eliminates schema repetition in every message and enables schema evolution enforcement.

Standalone vs Distributed Mode

Standalone Mode

A single Worker process reads connector configurations from local properties files. Offsets are stored locally. No fault tolerance — if the process crashes, all connectors stop.

connect-standalone.sh connect-standalone.properties \
  source-connector.properties \
  sink-connector.properties

Standalone is appropriate for development and testing. Never use it for production — it provides no fault tolerance and no ability to scale tasks across machines.

Distributed Mode

Multiple Worker processes form a Connect cluster using Kafka's consumer group protocol. This is the only production-appropriate deployment mode.

Distributed mode uses three internal Kafka topics as its coordination database:

config.storage.topic  = connect-configs
  # Connector configurations and task configurations (compacted topic)
  # Persists across restarts — Workers read this to restore all connectors

offset.storage.topic  = connect-offsets
  # Source connector offset tracking (compacted topic)
  # Maps source partitions to their latest processed offset
  # Separate from Kafka consumer offsets — not in __consumer_offsets

status.storage.topic  = connect-statuses
  # Connector and task state (compacted topic)
  # Running/paused/failed status visible to all Workers in the cluster

All three topics should use replication.factor=3 and min.insync.replicas=2 in production. They are the cluster's persistent state — losing them means losing all connector configurations and offsets.

Workers coordinate using the Kafka consumer group protocol (configured via group.id). When Workers join or leave, Kafka triggers a rebalance and redistributes Task assignments. Connect 2.x uses Eager Rebalance (all tasks stop, then all restart with new assignments), which causes a brief processing gap during rebalance. Connect 3.x introduced Incremental Cooperative Rebalance, where only the tasks being moved are paused, significantly reducing disruption.

REST API: Dynamic Connector Management

In distributed mode, all management operations go through a REST API on any Worker node (the REST API is load-balanced across Workers):

# List all connectors
curl http://connect-worker:8083/connectors

# Create a connector
curl -X POST http://connect-worker:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "debezium-mysql-orders",
    "config": {
      "connector.class": "io.debezium.connector.mysql.MySqlConnector",
      "database.hostname": "mysql.production.internal",
      "database.port": "3306",
      "database.user": "debezium",
      "database.password": "${file:/etc/secrets/mysql.properties:password}",
      "database.server.name": "production",
      "database.include.list": "shop",
      "table.include.list": "shop.orders,shop.order_items",
      "database.history.kafka.bootstrap.servers": "kafka:9092",
      "database.history.kafka.topic": "schema-changes.production",
      "value.converter": "io.confluent.connect.avro.AvroConverter",
      "value.converter.schema.registry.url": "http://schema-registry:8081",
      "transforms": "unwrap",
      "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
      "transforms.unwrap.drop.tombstones": "false"
    }
  }'

# Check connector and task status
curl http://connect-worker:8083/connectors/debezium-mysql-orders/status
# {"name":"debezium-mysql-orders","connector":{"state":"RUNNING","worker_id":"..."},
#  "tasks":[{"id":0,"state":"RUNNING","worker_id":"..."}],"type":"source"}

# Pause, resume, delete
curl -X PUT http://connect-worker:8083/connectors/debezium-mysql-orders/pause
curl -X PUT http://connect-worker:8083/connectors/debezium-mysql-orders/resume
curl -X DELETE http://connect-worker:8083/connectors/debezium-mysql-orders

Connector configuration changes require a PUT to the config endpoint — this triggers a Task restart (the connector stops its tasks and starts new ones with the updated config). There is no in-place hot-reload.

Debezium MySQL CDC Connector Deep Dive

CDC (Change Data Capture) is one of Kafka Connect's most powerful use cases. Debezium's MySQL connector reads the MySQL binary log (binlog) to capture every INSERT, UPDATE, and DELETE as a Kafka message.

How Binlog Reading Works

Debezium registers itself as a MySQL replica, presenting a fake server ID. MySQL streams binlog events to Debezium exactly as it would to a real replica. On first startup, Debezium performs an initial snapshot of all target tables (using SELECT * with a consistent lock), then begins incremental capture from the binlog position at the time of the snapshot — ensuring no events are missed between the snapshot and the first binlog event.

The binlog position is stored in the connect-offsets topic as {file: "mysql-bin.000042", pos: 1234567} (or as a GTID if gtid_mode=ON). On restart, Debezium reads this offset and resumes from where it stopped.

Message Structure

Debezium produces messages with a before/after envelope containing the complete row state before and after each change:

{
  "before": {
    "id": 123,
    "amount": 99.99,
    "status": "PENDING"
  },
  "after": {
    "id": 123,
    "amount": 99.99,
    "status": "COMPLETED"
  },
  "source": {
    "db": "shop",
    "table": "orders",
    "ts_ms": 1719820800000,
    "file": "mysql-bin.000042",
    "pos": 1234567,
    "gtid": "3E11FA47-71CA-11E1-9E33-C80AA9429562:23",
    "snapshot": "false"
  },
  "op": "u",
  "ts_ms": 1719820800150
}

op values: c (create/INSERT), u (UPDATE), d (DELETE), r (READ, during initial snapshot). DELETE operations produce a tombstone message (key with null value) after the delete event, signaling Kafka's log compaction to remove the key.

The before field requires binlog_row_image=FULL (the MySQL default). With MINIMAL, UPDATE events only contain changed columns in before, which can make downstream consumers unable to reconstruct the full pre-change state.

SMT (Single Message Transform) Chain

SMTs are lightweight, stateless record-level transformations applied by the Connect framework in a pipeline. They execute between data reading (Source) or before data writing (Sink), eliminating the need for a separate processing layer for simple transformations.

{
  "transforms": "unwrap,maskPII,routeToUserTopic",

  "transforms.unwrap.type":
      "io.debezium.transforms.ExtractNewRecordState",
  "transforms.unwrap.drop.tombstones": "false",
  "transforms.unwrap.delete.handling.mode": "rewrite",
  "transforms.unwrap.add.fields": "op,source.ts_ms",

  "transforms.maskPII.type":
      "org.apache.kafka.connect.transforms.MaskField$Value",
  "transforms.maskPII.fields": "email,phone,credit_card",
  "transforms.maskPII.replacement": "***REDACTED***",

  "transforms.routeToUserTopic.type":
      "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.routeToUserTopic.regex": "production\\.(.+)\\.(.+)",
  "transforms.routeToUserTopic.replacement": "$1-$2"
}

Common SMTs and their use cases:

SMTs are intentionally limited to stateless, single-record operations. Do not use them for joins, aggregations, or any logic requiring state or multiple records. Those operations belong in Kafka Streams or ksqlDB.

Custom Source Connector: Complete REST API Example

// Connector
public class RestApiSourceConnector extends SourceConnector {
    public static final String BASE_URL_CONFIG    = "rest.api.base.url";
    public static final String ENDPOINTS_CONFIG   = "rest.api.endpoints";
    public static final String POLL_MS_CONFIG     = "poll.interval.ms";
    public static final String TOPIC_PREFIX_CONFIG = "kafka.topic.prefix";

    private Map<String, String> config;

    @Override
    public void start(Map<String, String> props) {
        config = props;
        if (props.get(BASE_URL_CONFIG) == null)
            throw new ConfigException(BASE_URL_CONFIG + " is required");
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        String[] endpoints = config.get(ENDPOINTS_CONFIG).split(",");
        int numTasks = Math.min(maxTasks, endpoints.length);
        List<Map<String, String>> configs = new ArrayList<>();

        for (int i = 0; i < numTasks; i++) {
            Map<String, String> taskConfig = new HashMap<>(config);
            List<String> assigned = new ArrayList<>();
            for (int j = i; j < endpoints.length; j += numTasks)
                assigned.add(endpoints[j].trim());
            taskConfig.put(ENDPOINTS_CONFIG, String.join(",", assigned));
            configs.add(taskConfig);
        }
        return configs;
    }

    @Override
    public Class<? extends Task> taskClass() { return RestApiSourceTask.class; }

    @Override
    public ConfigDef config() {
        return new ConfigDef()
            .define(BASE_URL_CONFIG, Type.STRING, Importance.HIGH, "REST API base URL")
            .define(ENDPOINTS_CONFIG, Type.STRING, Importance.HIGH,
                    "Comma-separated API endpoint paths")
            .define(POLL_MS_CONFIG, Type.INT, 5000, Importance.MEDIUM,
                    "Poll interval in milliseconds")
            .define(TOPIC_PREFIX_CONFIG, Type.STRING, "", Importance.LOW,
                    "Prefix for destination Kafka topic names");
    }

    @Override
    public String version() { return "1.0.0"; }

    @Override
    public void stop() { }
}

// Task
public class RestApiSourceTask extends SourceTask {
    private String baseUrl;
    private List<String> endpoints;
    private int pollIntervalMs;
    private String topicPrefix;
    private HttpClient httpClient;
    private ObjectMapper mapper;

    @Override
    public void start(Map<String, String> props) {
        baseUrl       = props.get(RestApiSourceConnector.BASE_URL_CONFIG);
        endpoints     = Arrays.asList(
            props.get(RestApiSourceConnector.ENDPOINTS_CONFIG).split(","));
        pollIntervalMs = Integer.parseInt(
            props.getOrDefault(RestApiSourceConnector.POLL_MS_CONFIG, "5000"));
        topicPrefix   = props.getOrDefault(
            RestApiSourceConnector.TOPIC_PREFIX_CONFIG, "");
        httpClient    = HttpClient.newBuilder()
            .connectTimeout(Duration.ofSeconds(10))
            .build();
        mapper        = new ObjectMapper();
    }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        List<SourceRecord> records = new ArrayList<>();

        for (String endpoint : endpoints) {
            Map<String, String> partition = Map.of("endpoint", endpoint);
            Map<String, Object> savedOffset = context.offsetStorageReader()
                .offset(partition);
            String cursor = savedOffset != null
                ? (String) savedOffset.get("cursor") : null;

            String url = baseUrl + endpoint +
                (cursor != null ? "?after=" + URLEncoder.encode(cursor,
                    StandardCharsets.UTF_8) : "");

            try {
                HttpRequest req = HttpRequest.newBuilder()
                    .uri(URI.create(url))
                    .header("Accept", "application/json")
                    .timeout(Duration.ofSeconds(30))
                    .GET().build();

                HttpResponse<String> resp = httpClient.send(req,
                    HttpResponse.BodyHandlers.ofString());

                if (resp.statusCode() != 200) {
                    log.warn("HTTP {} from {}", resp.statusCode(), url);
                    continue;
                }

                JsonNode root       = mapper.readTree(resp.body());
                JsonNode items      = root.get("items");
                String  nextCursor  = root.path("next_cursor").asText(null);

                for (JsonNode item : items) {
                    String id = item.get("id").asText();
                    Map<String, Object> newOffset = new HashMap<>();
                    newOffset.put("cursor", nextCursor != null ? nextCursor : id);

                    String topic = topicPrefix +
                        endpoint.replaceAll("^/", "").replace("/", "-");

                    records.add(new SourceRecord(
                        partition, newOffset,
                        topic,
                        null,
                        Schema.STRING_SCHEMA, id,
                        Schema.STRING_SCHEMA, item.toString()
                    ));
                }
            } catch (IOException e) {
                // Log and continue — do not throw, which would restart the task
                // and lose progress on other endpoints in this task
                log.error("Poll failed for {}: {}", endpoint, e.getMessage());
            }
        }

        if (records.isEmpty()) Thread.sleep(pollIntervalMs);
        return records;
    }

    @Override
    public void stop() { }

    @Override
    public String version() { return "1.0.0"; }
}

Error Handling: Dead Letter Queue

By default, a Sink Connector stops its Task when a record cannot be written to the destination system. In production, this is almost always the wrong behavior — one malformed record should not halt processing of thousands of good records.

Configure the dead letter queue (DLQ) pattern:

{
  "errors.tolerance": "all",
  "errors.deadletterqueue.topic.name": "connect-dlq-my-sink",
  "errors.deadletterqueue.topic.replication.factor": 3,
  "errors.deadletterqueue.context.headers.enable": true,
  "errors.log.enable": true,
  "errors.log.include.messages": true
}

With this configuration:

The DLQ topic is a normal Kafka topic. You can consume it with any Kafka consumer to inspect and replay failed records after fixing the underlying issue.

Connector Monitoring via JMX

Connect exposes comprehensive JMX metrics for operational visibility:

# Source Task throughput and latency
kafka.connect:type=source-task-metrics,connector=<name>,task=<id>
    source-record-poll-rate       # Records polled from source per second
    source-record-write-rate      # Records written to Kafka per second
    poll-batch-avg-time-ms        # Average poll() execution time
    source-record-active-count    # Records polled but not yet committed

# Sink Task throughput and latency
kafka.connect:type=sink-task-metrics,connector=<name>,task=<id>
    sink-record-read-rate         # Records read from Kafka per second
    sink-record-send-rate         # Records successfully written to sink per second
    put-batch-avg-time-ms         # Average put() execution time
    sink-record-lag-max           # Max consumer lag across assigned partitions

# Worker health
kafka.connect:type=connect-worker-metrics
    connector-count               # Connectors on this Worker
    task-count                    # Tasks on this Worker
    connector-startup-failure-rate # Rate of failed connector starts
    rebalance-avg-time-ms         # Average rebalance duration

Alert on connector-failed-task-count > 0, sink-record-lag-max growing over time, and rebalance-avg-time-ms spikes (which indicate cluster instability). These three metrics cover the most common Connect production issues: task failures, consumer lag buildup, and rebalance storms.

Kafka Connect is among the most underappreciated components in the Kafka ecosystem. Before writing a custom connector, check Confluent Hub — it lists hundreds of community and commercially supported connectors. When you do need to write one, keep the responsibilities separate: the Connector validates configuration and splits work; the Task accesses data; the framework handles everything else.

Rate this chapter
4.6  / 5  (4 ratings)

💬 Comments