Kafka Connect Deep Dive: Framework and Custom Connectors
The Problem Kafka Connect Solves
Before Kafka Connect existed, integrating external systems with Kafka was a bespoke engineering problem. Teams wrote Python scripts to poll databases, scheduled jobs to read CSV files, and maintained hand-crafted Kafka Producers. Every implementation reinvented the same wheel: connection management, offset tracking, error retry, backpressure, and parallelism control — all implemented differently, all with different failure modes, none reusable across teams.
Kafka Connect's design goal is a pluggable data integration framework where the generic problems of data movement are handled by the framework, and developers only implement the domain-specific data access logic for their particular external system. The abstraction hierarchy is deliberately clean:
Connector (configuration validation + task splitting strategy)
↓
Task (actual data movement execution)
↓
Converter (serialization/deserialization of data format)
↓
Kafka (the durable intermediary)
This separation means that error handling, retries, offset commits, and monitoring are implemented once in the framework and work identically across all connectors. A connector for MySQL, one for S3, and one for your custom REST API all share the same operational model.
Framework Core Components
Connector: Configuration Manager
The Connector class does not transfer any data. Its sole responsibilities are:
- Validate configuration parameters
- Decide how to partition work across parallel tasks, given a maxTasks constraint
- Generate the per-task configuration map for each task
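```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;

public class MyDatabaseSourceConnector extends SourceConnector {
    private Map<String, String> config;

    @Override
    public void start(Map<String, String> props) {
        this.config = props;
        // Validate config; optionally test connectivity.
        // Do NOT maintain a persistent connection here: the Connector
        // is a coordinator, not a data pipeline.
        validateConfig(props);
    }

    private void validateConfig(Map<String, String> props) {
        // The framework already checks required fields against config()
        // when the connector is created; this adds a semantic check
        if (parseTables(props.get("tables")).isEmpty())
            throw new ConfigException("tables must name at least one table");
    }

    private static List<String> parseTables(String value) {
        if (value == null) return List.of();
        return Arrays.stream(value.split(","))
                .map(String::trim)
                .filter(t -> !t.isEmpty())
                .collect(Collectors.toList());
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        // Example: 10 tables to replicate, split across up to maxTasks tasks
        List<String> tables = parseTables(config.get("tables"));
        int numTasks = Math.min(maxTasks, tables.size());
        List<Map<String, String>> taskConfigs = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) {
            Map<String, String> taskConfig = new HashMap<>(config);
            // Assign a contiguous slice of tables to each task;
            // slice sizes differ by at most one
            int from = i * tables.size() / numTasks;
            int to = (i + 1) * tables.size() / numTasks;
            taskConfig.put("tables", String.join(",", tables.subList(from, to)));
            taskConfigs.add(taskConfig);
        }
        return taskConfigs;
    }

    @Override
    public Class<? extends Task> taskClass() {
        return MyDatabaseSourceTask.class;
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef()
                .define("db.url", ConfigDef.Type.STRING,
                        ConfigDef.Importance.HIGH,
                        "JDBC URL of the source database")
                .define("db.user", ConfigDef.Type.STRING,
                        ConfigDef.Importance.HIGH,
                        "Database user")
                .define("db.password", ConfigDef.Type.PASSWORD,
                        ConfigDef.Importance.HIGH,
                        "Database password")
                .define("tables", ConfigDef.Type.STRING,
                        ConfigDef.Importance.HIGH,
                        "Comma-separated list of tables to replicate")
                .define("poll.interval.ms", ConfigDef.Type.INT, 1000,
                        ConfigDef.Importance.MEDIUM,
                        "Milliseconds between poll iterations");
    }

    @Override
    public String version() { return "1.0.0"; }

    @Override
    public void stop() { }
}
```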
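The slicing arithmetic in taskConfigs() is easy to get wrong, so here it is pulled out as a stand-alone helper that can be tested without a Connect runtime. ContiguousSplit and split are illustrative names, not part of the Connect API; Connect's own ConnectorUtils.groupPartitions performs a similar contiguous grouping.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: the contiguous split used by taskConfigs(), in isolation. */
public class ContiguousSplit {
    public static <T> List<List<T>> split(List<T> items, int maxTasks) {
        if (maxTasks < 1) throw new IllegalArgumentException("maxTasks must be >= 1");
        // Never start more tasks than there are items, so no task gets an empty slice
        int numTasks = Math.min(maxTasks, items.size());
        List<List<T>> groups = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) {
            // Integer division spreads the remainder: sizes differ by at most one
            int from = i * items.size() / numTasks;
            int to = (i + 1) * items.size() / numTasks;
            groups.add(new ArrayList<>(items.subList(from, to)));
        }
        return groups;
    }
}
```

With 10 tables and maxTasks=3 the groups hold 3, 3 and 4 tables; with maxTasks=20 each of the 10 tables gets its own task.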
Task: The Data Mover
Source Tasks pull records from external systems and return SourceRecord lists to the framework. The framework handles writing those records to Kafka and committing offsets. Sink Tasks receive SinkRecord lists from the framework (already read from Kafka) and write them to the external system.
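```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

public class MyDatabaseSourceTask extends SourceTask {
    private Connection connection;
    private List<String> tables;
    private long pollIntervalMs;
    // Each table is its own source partition, so each needs its own position
    private final Map<String, Long> lastProcessedIds = new HashMap<>();

    @Override
    public void start(Map<String, String> props) {
        tables = Arrays.stream(props.get("tables").split(","))
                .map(String::trim)
                .collect(Collectors.toList());
        // Task props are the raw connector config: ConfigDef defaults are
        // not filled in, so supply the default here
        pollIntervalMs = Long.parseLong(props.getOrDefault("poll.interval.ms", "1000"));
        try {
            connection = DriverManager.getConnection(props.get("db.url"),
                    props.get("db.user"), props.get("db.password"));
        } catch (SQLException e) {
            throw new ConnectException("Cannot connect to database", e);
        }
        // Restore every table's last processed position from the offset store.
        // The framework persists offsets to the connect-offsets topic.
        for (String table : tables) {
            Map<String, Object> offset = context.offsetStorageReader()
                    .offset(Map.of("table", table));
            Object lastId = offset == null ? null : offset.get("last_id");
            lastProcessedIds.put(table, lastId == null ? 0L : ((Number) lastId).longValue());
        }
    }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        List<SourceRecord> records = new ArrayList<>();
        // Advance positions on a copy; publish them only if every query succeeds,
        // so a failed poll never skips rows that were not returned
        Map<String, Long> nextIds = new HashMap<>(lastProcessedIds);
        for (String table : tables) {
            Map<String, String> sourcePartition = Map.of("table", table);
            // Identifiers cannot be bound as JDBC parameters; table names come
            // from the connector config and must be trusted
            String sql = "SELECT id, payload, created_at FROM " + table
                    + " WHERE id > ? ORDER BY id LIMIT 500";
            try (PreparedStatement stmt = connection.prepareStatement(sql)) {
                stmt.setLong(1, nextIds.get(table));
                try (ResultSet rs = stmt.executeQuery()) {
                    while (rs.next()) {
                        long id = rs.getLong("id");
                        // sourceOffset tells the framework where we are in this table;
                        // on restart, start() reads it back via offsetStorageReader()
                        Map<String, Object> sourceOffset = Map.of("last_id", id);
                        records.add(new SourceRecord(
                                sourcePartition,
                                sourceOffset,
                                "db-" + table,            // target Kafka topic
                                null,                     // partition (null = let Kafka decide)
                                Schema.STRING_SCHEMA,
                                String.valueOf(id),       // Kafka record key
                                Schema.STRING_SCHEMA,
                                rs.getString("payload"),  // Kafka record value
                                rs.getLong("created_at")  // timestamp; assumes an epoch-millis column
                        ));
                        nextIds.put(table, id);
                    }
                }
            } catch (SQLException e) {
                // RetriableException: the framework logs it and calls poll() again.
                // A plain ConnectException would move the task to FAILED until
                // someone restarts it through the REST API.
                throw new RetriableException("Database poll failed for " + table, e);
            }
        }
        lastProcessedIds.putAll(nextIds);
        if (records.isEmpty()) {
            Thread.sleep(pollIntervalMs);
        }
        return records;
    }

    @Override
    public void stop() {
        try { if (connection != null) connection.close(); }
        catch (SQLException ignored) { }
    }

    @Override
    public String version() { return "1.0.0"; }
}
```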
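Because each table is a separate source partition, its position has to be saved and restored independently. The toy store below (ToyOffsetStore and Emitted are hypothetical names, not Connect classes) mimics what the framework does with sourcePartition/sourceOffset pairs: it keeps only the latest offset per partition, much as the compacted connect-offsets topic does, so a restarted task should ask for each partition separately through offsetStorageReader().

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy stand-in for the framework's offset store; not Connect's OffsetStorageReader. */
public class ToyOffsetStore {
    public record Emitted(Map<String, ?> partition, Map<String, ?> offset) {}

    // Keyed by the partition map's contents, like a compacted topic keyed by partition
    private final Map<Map<String, ?>, Map<String, ?>> latest = new HashMap<>();

    /** Records offsets in emission order; a later offset replaces an earlier one. */
    public void commit(List<Emitted> records) {
        for (Emitted r : records) latest.put(r.partition(), r.offset());
    }

    /** What a restarted task would read for one partition, or null if never committed. */
    public Map<String, ?> offset(Map<String, ?> partition) {
        return latest.get(partition);
    }
}
```

After committing last_id 10 and then 12 for orders, and 3 for users, a restart restores 12 for orders and 3 for users. A single shared counter would have resumed users at 12 and skipped rows.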
Converter: Data Format Translator
Converters serialize SourceRecord keys and values to bytes for Kafka, and deserialize bytes back into SinkRecord keys and values. Converters are configured at the Worker level (applying to all connectors unless overridden) and optionally at the connector level:
```json
{
  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.schema.registry.url": "http://schema-registry:8081",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://schema-registry:8081"
}
```
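Commonly used converters:
- JsonConverter: JSON with an optional embedded schema (ships with Apache Kafka)
- AvroConverter: Avro binary; requires Schema Registry (Confluent component)
- ProtobufConverter: Protocol Buffers; requires Schema Registry (Confluent component)
- ByteArrayConverter: raw bytes, no transformation (ships with Apache Kafka)
- StringConverter: UTF-8 strings (ships with Apache Kafka)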
The Avro and Protobuf converters store each schema in Schema Registry and reference it by ID in the wire format: a magic byte 0x00, a 4-byte big-endian schema ID, then the serialized payload (the Protobuf format also writes message indexes before the payload). This avoids repeating the schema in every message and lets Schema Registry enforce compatibility rules as schemas evolve.
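To make the framing concrete, the sketch below builds and parses the 5-byte header with java.nio.ByteBuffer, which is big-endian by default. WireFormat, frame and parse are hypothetical names; in practice the Confluent serializers and converters do this for you, and the sketch deliberately leaves the payload (including any Protobuf message indexes) undecoded.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Hypothetical helper: Schema Registry framing is magic byte, 4-byte schema ID, payload. */
public class WireFormat {
    public record Framed(int schemaId, byte[] payload) {}

    public static byte[] frame(int schemaId, byte[] payload) {
        return ByteBuffer.allocate(5 + payload.length)
                .put((byte) 0x00)      // magic byte
                .putInt(schemaId)      // big-endian schema ID
                .put(payload)
                .array();
    }

    public static Framed parse(byte[] message) {
        if (message == null || message.length < 5)
            throw new IllegalArgumentException("Too short for Schema Registry framing");
        ByteBuffer buf = ByteBuffer.wrap(message);
        byte magic = buf.get();
        if (magic != 0x00)
            throw new IllegalArgumentException("Unknown magic byte: " + magic);
        int schemaId = buf.getInt();
        // Everything after the header is left as-is: Avro bytes, or for
        // Protobuf the message-index list followed by the encoded message
        return new Framed(schemaId, Arrays.copyOfRange(message, 5, message.length));
    }
}
```

A consumer that sees a non-zero first byte is reading data that was not produced through Schema Registry, which is a common cause of deserialization errors when converters are mismatched.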
Standalone vs Distributed Mode
Standalone Mode
A single Worker process reads connector configurations from local properties files and stores source offsets in a local file (offset.storage.file.filename). There is no fault tolerance: if the process crashes, all connectors stop.
```shell
connect-standalone.sh connect-standalone.properties \
  source-connector.properties \
  sink-connector.properties
```
Standalone is appropriate for development and testing. Never use it for production — it provides no fault tolerance and no ability to scale tasks across machines.
Distributed Mode
Multiple Worker processes form a Connect cluster using Kafka's group membership protocol (the same coordination mechanism behind consumer groups). This is the only production-appropriate deployment mode.
Distributed mode uses three internal Kafka topics as its coordination database:
```properties
config.storage.topic = connect-configs
# Connector configurations and task configurations (compacted topic)
# Persists across restarts: Workers read this to restore all connectors

offset.storage.topic = connect-offsets
# Source connector offset tracking (compacted topic)
# Maps source partitions to their latest processed offset
# Separate from Kafka consumer offsets: not in __consumer_offsets

status.storage.topic = connect-statuses
# Connector and task state (compacted topic)
# Running/paused/failed status visible to all Workers in the cluster
```
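All three topics should use a replication factor of 3 and min.insync.replicas=2 in production (when Connect creates the topics itself, the Worker settings config.storage.replication.factor, offset.storage.replication.factor, and status.storage.replication.factor set the replication factor), and the config topic must have exactly one partition. Together they hold the cluster's persistent state: losing them means losing all connector configurations and source offsets.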
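Workers coordinate through Kafka's group membership protocol: every Worker with the same group.id joins one group. When Workers join or leave, the group rebalances and Task assignments are redistributed. Before Apache Kafka 2.3, Connect used eager rebalancing: every Worker stopped all of its connectors and tasks and then started whatever it was newly assigned, causing a cluster-wide processing gap. Apache Kafka 2.3 introduced incremental cooperative rebalancing (KIP-415), which became the default (connect.protocol=compatible in 2.3, sessioned from 2.4): only the tasks that actually move are stopped, and the tasks of a Worker that leaves are held back for up to scheduled.rebalance.max.delay.ms (5 minutes by default) in case it returns.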
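A toy model makes the difference between the two protocols countable. It is a deliberately simplified sketch, not Connect's IncrementalCooperativeAssignor: it only balances task counts across Workers, revoking the surplus from overloaded Workers, whereas the real assignor also places connectors and delays reassigning a departed Worker's tasks. All class and method names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model only: compares task movement under eager vs incremental rebalancing. */
public class RebalanceModel {

    /** Eager: every running task is revoked and restarted. */
    public static int eagerStops(Map<String, List<String>> assignment) {
        return assignment.values().stream().mapToInt(List::size).sum();
    }

    /** Incremental: keep tasks in place and move only what balancing requires. */
    public static Map<String, List<String>> incremental(Map<String, List<String>> current,
                                                        List<String> workers) {
        Map<String, List<String>> next = new TreeMap<>();
        for (String w : workers)
            next.put(w, new ArrayList<>(current.getOrDefault(w, List.of())));
        // Tasks of Workers that left the group need a new home
        Deque<String> toAssign = new ArrayDeque<>();
        current.forEach((w, tasks) -> { if (!next.containsKey(w)) toAssign.addAll(tasks); });

        // Balanced targets; the currently heaviest Workers get the extra slots
        int total = eagerStops(current);
        List<String> byLoad = new ArrayList<>(workers);
        byLoad.sort(Comparator.comparingInt((String w) -> next.get(w).size()).reversed());
        Map<String, Integer> target = new HashMap<>();
        for (int i = 0; i < byLoad.size(); i++)
            target.put(byLoad.get(i), total / workers.size() + (i < total % workers.size() ? 1 : 0));

        // Revoke only the surplus, then hand it to Workers below their target
        for (String w : workers) {
            List<String> tasks = next.get(w);
            while (tasks.size() > target.get(w)) toAssign.add(tasks.remove(tasks.size() - 1));
        }
        for (String w : workers) {
            List<String> tasks = next.get(w);
            while (tasks.size() < target.get(w)) tasks.add(toAssign.poll());
        }
        return next;
    }

    /** Tasks that now run on a different Worker than before. */
    public static int moved(Map<String, List<String>> before, Map<String, List<String>> after) {
        int moved = 0;
        for (Map.Entry<String, List<String>> e : after.entrySet())
            for (String task : e.getValue())
                if (!before.getOrDefault(e.getKey(), List.of()).contains(task)) moved++;
        return moved;
    }
}
```

Starting from three Workers running two tasks each and adding a fourth, eager rebalancing stops all six tasks, while the model moves a single task and leaves the other five running.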
REST API: Dynamic Connector Management
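In distributed mode, all management operations go through the REST API that every Worker exposes (port 8083 by default). Any Worker can accept a request; operations that change cluster state, such as creating or reconfiguring a connector, are forwarded internally to the leader Worker, so clients can target any Worker (or a load balancer placed in front of them):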
```shell
# List all connectors
curl http://connect-worker:8083/connectors

# Create a connector.
# Property names follow Debezium 1.x: Debezium 2.x renamed database.server.name
# to topic.prefix and database.history.kafka.* to schema.history.internal.kafka.*.
# The ${file:...} placeholder needs FileConfigProvider enabled on every Worker:
#   config.providers=file
#   config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider
curl -X POST http://connect-worker:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "debezium-mysql-orders",
    "config": {
      "connector.class": "io.debezium.connector.mysql.MySqlConnector",
      "database.hostname": "mysql.production.internal",
      "database.port": "3306",
      "database.user": "debezium",
      "database.password": "${file:/etc/secrets/mysql.properties:password}",
      "database.server.name": "production",
      "database.include.list": "shop",
      "table.include.list": "shop.orders,shop.order_items",
      "database.history.kafka.bootstrap.servers": "kafka:9092",
      "database.history.kafka.topic": "schema-changes.production",
      "value.converter": "io.confluent.connect.avro.AvroConverter",
      "value.converter.schema.registry.url": "http://schema-registry:8081",
      "transforms": "unwrap",
      "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
      "transforms.unwrap.drop.tombstones": "false"
    }
  }'

# Check connector and task status
curl http://connect-worker:8083/connectors/debezium-mysql-orders/status
# {"name":"debezium-mysql-orders","connector":{"state":"RUNNING","worker_id":"..."},
#  "tasks":[{"id":0,"state":"RUNNING","worker_id":"..."}],"type":"source"}

# Pause, resume, delete
curl -X PUT http://connect-worker:8083/connectors/debezium-mysql-orders/pause
curl -X PUT http://connect-worker:8083/connectors/debezium-mysql-orders/resume
curl -X DELETE http://connect-worker:8083/connectors/debezium-mysql-orders
```
Changing a connector's configuration means sending the complete new config map to PUT /connectors/{name}/config (the same endpoint creates the connector if it does not exist yet). The framework then stops the connector's tasks and starts new ones with the updated config; there is no in-place hot-reload.
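The same operations can be issued from Java with the JDK's built-in HTTP client (java.net.http, Java 11+). The sketch below only builds the requests, which makes paths and methods easy to check; ConnectRestRequests and its methods are hypothetical names, and connect-worker:8083 is the example host used above. Note that PUT /connectors/{name}/config takes the bare config map, not the {"name", "config"} wrapper that POST /connectors expects.

```java
import java.io.IOException;
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper that builds Kafka Connect REST requests with java.net.http. */
public class ConnectRestRequests {
    private final String baseUrl; // e.g. http://connect-worker:8083

    public ConnectRestRequests(String baseUrl) {
        this.baseUrl = baseUrl;
    }

    private URI connectorUri(String name, String suffix) {
        // Percent-encode the connector name for use as a path segment
        String encoded = URLEncoder.encode(name, StandardCharsets.UTF_8).replace("+", "%20");
        return URI.create(baseUrl + "/connectors/" + encoded + suffix);
    }

    /** Replaces the whole config map; Connect then restarts the connector's tasks. */
    public HttpRequest updateConfig(String name, String configMapJson) {
        return HttpRequest.newBuilder(connectorUri(name, "/config"))
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(configMapJson))
                .build();
    }

    public HttpRequest status(String name) {
        return HttpRequest.newBuilder(connectorUri(name, "/status")).GET().build();
    }

    public HttpRequest pause(String name) {
        return HttpRequest.newBuilder(connectorUri(name, "/pause"))
                .PUT(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    /** Sends a request and returns the body, failing on any non-2xx status. */
    public static String send(HttpClient client, HttpRequest request)
            throws IOException, InterruptedException {
        HttpResponse<String> resp = client.send(request, HttpResponse.BodyHandlers.ofString());
        if (resp.statusCode() / 100 != 2)
            throw new IOException("HTTP " + resp.statusCode() + ": " + resp.body());
        return resp.body();
    }
}
```

Passing a built request to send(HttpClient.newHttpClient(), request) performs the call against a live Worker.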
Debezium MySQL CDC Connector Deep Dive
CDC (Change Data Capture) is one of Kafka Connect's most powerful use cases. Debezium's MySQL connector reads the MySQL binary log (binlog) to capture every INSERT, UPDATE, and DELETE as a Kafka message.
How Binlog Reading Works
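Debezium registers itself as a MySQL replica, presenting a fake server ID (the database.server.id property) that must not clash with any real replica. MySQL streams binlog events to Debezium exactly as it would to a real replica. On first startup, Debezium performs an initial snapshot of the target tables: with the default locking mode it briefly holds a global read lock while it records the current binlog position and opens a consistent-snapshot transaction, then releases the lock and reads each table with SELECT inside that transaction. Incremental capture then starts from the recorded binlog position, so no change committed between the snapshot and the first streamed binlog event is missed.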
The binlog position is stored in the connect-offsets topic as {file: "mysql-bin.000042", pos: 1234567}, with an additional gtids entry when gtid_mode=ON. On restart, Debezium reads this offset and resumes from where it stopped.
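Resuming from a stored (file, pos) pair works because binlog coordinates on a single server are totally ordered: first by the numeric suffix of the file name, then by byte position. The hypothetical BinlogPosition record below (not a Debezium class) shows that ordering, for example to decide whether an event was already processed.

```java
/** Hypothetical helper: orders binlog coordinates such as mysql-bin.000042:1234567. */
public record BinlogPosition(String file, long pos) implements Comparable<BinlogPosition> {

    /** Binlog files share a base name and end in a zero-padded sequence number. */
    static long sequence(String file) {
        int dot = file.lastIndexOf('.');
        if (dot < 0) throw new IllegalArgumentException("No sequence suffix: " + file);
        return Long.parseLong(file.substring(dot + 1));
    }

    @Override
    public int compareTo(BinlogPosition other) {
        int byFile = Long.compare(sequence(file), sequence(other.file));
        return byFile != 0 ? byFile : Long.compare(pos, other.pos);
    }

    /** True if an event at this position is covered by the stored offset. */
    public boolean isAtOrBefore(BinlogPosition storedOffset) {
        return compareTo(storedOffset) <= 0;
    }
}
```

File names and positions are specific to one MySQL server, so after a failover to another server the same coordinates no longer identify the same event. GTIDs are unique across the replication topology, which is why they are preferred when gtid_mode=ON.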
Message Structure
Debezium produces messages with a before/after envelope containing the complete row state before and after each change:
```json
{
  "before": {
    "id": 123,
    "amount": 99.99,
    "status": "PENDING"
  },
  "after": {
    "id": 123,
    "amount": 99.99,
    "status": "COMPLETED"
  },
  "source": {
    "db": "shop",
    "table": "orders",
    "ts_ms": 1719820800000,
    "file": "mysql-bin.000042",
    "pos": 1234567,
    "gtid": "3E11FA47-71CA-11E1-9E33-C80AA9429562:23",
    "snapshot": "false"
  },
  "op": "u",
  "ts_ms": 1719820800150
}
```
op values: c (create/INSERT), u (UPDATE), d (DELETE), r (READ, during initial snapshot). DELETE operations produce a tombstone message (key with null value) after the delete event, signaling Kafka's log compaction to remove the key.
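Debezium's MySQL connector requires binlog_row_image=FULL, which is the MySQL default. With MINIMAL, the before image carries only the columns needed to identify the row (typically the primary key) and the after image only the columns that changed, so downstream consumers cannot reconstruct the full row state before or after the change.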
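On the consumer side, the op field and the before/after images are often all a downstream service needs. The hypothetical ChangeEvents helper below (not part of Debezium) maps op codes, treats a null record value as the tombstone, and diffs the two images to find changed columns; it assumes the envelope has already been parsed into maps.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical consumer-side helpers for Debezium change events. */
public class ChangeEvents {
    public enum Kind { INSERT, UPDATE, DELETE, SNAPSHOT_READ, TOMBSTONE }

    /** Maps the envelope's "op" field; pass null when the Kafka record value is null. */
    public static Kind classify(String op) {
        if (op == null) return Kind.TOMBSTONE; // key with null value, emitted after a delete
        switch (op) {
            case "c": return Kind.INSERT;
            case "u": return Kind.UPDATE;
            case "d": return Kind.DELETE;
            case "r": return Kind.SNAPSHOT_READ;
            default: throw new IllegalArgumentException("Unexpected op: " + op);
        }
    }

    /** Columns whose values differ between the before and after images. */
    public static Set<String> changedColumns(Map<String, ?> before, Map<String, ?> after) {
        Set<String> columns = new HashSet<>(before.keySet());
        columns.addAll(after.keySet());
        Set<String> changed = new TreeSet<>();
        for (String c : columns)
            if (!Objects.equals(before.get(c), after.get(c))) changed.add(c);
        return changed;
    }
}
```

For the example envelope the diff is just status. It also shows why the FULL row image matters: if before carried only the key column, as with MINIMAL, every non-key column would look changed.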
SMT (Single Message Transform) Chain
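SMTs are lightweight, stateless record-level transformations that the Connect framework applies in a configured order. In a source connector they run after the Task returns records and before the Converter serializes them; in a sink connector they run after the Converter deserializes records and before the Task's put() receives them. This removes the need for a separate processing layer for simple transformations.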
```json
{
  "transforms": "unwrap,maskPII,routeToUserTopic",

  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
  "transforms.unwrap.drop.tombstones": "false",
  "transforms.unwrap.delete.handling.mode": "rewrite",
  "transforms.unwrap.add.fields": "op,source.ts_ms",

  "transforms.maskPII.type": "org.apache.kafka.connect.transforms.MaskField$Value",
  "transforms.maskPII.fields": "email,phone,credit_card",
  "transforms.maskPII.replacement": "***REDACTED***",

  "transforms.routeToUserTopic.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.routeToUserTopic.regex": "production\\.(.+)\\.(.+)",
  "transforms.routeToUserTopic.replacement": "$1-$2"
}
```
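Conceptually, an SMT chain is function composition over single records, applied in the order named in the transforms property. The sketch below models that with java.util.function.UnaryOperator on a simplified record (SmtChainSketch and Rec are hypothetical; real SMTs implement org.apache.kafka.connect.transforms.Transformation), reproducing the masking and topic-routing steps of the configuration above while skipping the Debezium unwrap step.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.UnaryOperator;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Simplified model of an SMT chain: a record is only a topic plus a flat value map. */
public class SmtChainSketch {
    public record Rec(String topic, Map<String, Object> value) {}

    /** Like MaskField with a replacement: overwrite listed fields that are present. */
    public static UnaryOperator<Rec> maskFields(Set<String> fields, String replacement) {
        return r -> {
            Map<String, Object> v = new LinkedHashMap<>(r.value());
            for (String f : fields) if (v.containsKey(f)) v.put(f, replacement);
            return new Rec(r.topic(), v);
        };
    }

    /** Like RegexRouter: rename the topic only when the whole name matches. */
    public static UnaryOperator<Rec> regexRoute(String regex, String replacement) {
        Pattern pattern = Pattern.compile(regex);
        return r -> {
            Matcher m = pattern.matcher(r.topic());
            return m.matches() ? new Rec(m.replaceFirst(replacement), r.value()) : r;
        };
    }

    /** Apply transforms one record at a time, in the order listed in "transforms". */
    public static Rec applyChain(Rec record, List<UnaryOperator<Rec>> chain) {
        Rec current = record;
        for (UnaryOperator<Rec> t : chain) current = t.apply(current);
        return current;
    }
}
```

Applied to a record on production.shop.orders, the chain masks email and routes the record to shop-orders. Each step sees one record and keeps no state between calls.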
Common SMTs and their use cases:
- ExtractNewRecordState (Debezium): flattens the {before, after, op} envelope to just the after value, making downstream processing simpler
- MaskField: replaces field values with an empty value of the field's type, or with a fixed replacement if one is configured; essential for PII compliance when applied before data enters Kafka
- ReplaceField: include, exclude, or rename fields
- TimestampConverter: converts timestamp fields between Unix epoch milliseconds, formatted strings (via a SimpleDateFormat pattern), and Connect's Date/Time/Timestamp logical types (java.util.Date)
- RegexRouter: rewrites topic names using a regex; useful for routing CDC events to environment-specific topics
- ValueToKey: replaces the record key with fields copied from the record value; since the key determines the Kafka partition, this controls partition routing
SMTs are intentionally limited to stateless, single-record operations. Do not use them for joins, aggregations, or any logic requiring state or multiple records. Those operations belong in Kafka Streams or ksqlDB.
Custom Source Connector: Complete REST API Example
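```java
// RestApiSourceConnector.java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;

public class RestApiSourceConnector extends SourceConnector {
    public static final String BASE_URL_CONFIG = "rest.api.base.url";
    public static final String ENDPOINTS_CONFIG = "rest.api.endpoints";
    public static final String POLL_MS_CONFIG = "poll.interval.ms";
    public static final String TOPIC_PREFIX_CONFIG = "kafka.topic.prefix";

    private Map<String, String> config;

    @Override
    public void start(Map<String, String> props) {
        config = props;
        if (props.get(BASE_URL_CONFIG) == null)
            throw new ConfigException(BASE_URL_CONFIG + " is required");
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        String[] endpoints = config.get(ENDPOINTS_CONFIG).split(",");
        int numTasks = Math.min(maxTasks, endpoints.length);
        List<Map<String, String>> configs = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) {
            Map<String, String> taskConfig = new HashMap<>(config);
            // Round-robin: task i owns endpoints i, i + numTasks, i + 2 * numTasks, ...
            List<String> assigned = new ArrayList<>();
            for (int j = i; j < endpoints.length; j += numTasks)
                assigned.add(endpoints[j].trim());
            taskConfig.put(ENDPOINTS_CONFIG, String.join(",", assigned));
            configs.add(taskConfig);
        }
        return configs;
    }

    @Override
    public Class<? extends Task> taskClass() { return RestApiSourceTask.class; }

    @Override
    public ConfigDef config() {
        return new ConfigDef()
                .define(BASE_URL_CONFIG, Type.STRING, Importance.HIGH, "REST API base URL")
                .define(ENDPOINTS_CONFIG, Type.STRING, Importance.HIGH,
                        "Comma-separated API endpoint paths")
                .define(POLL_MS_CONFIG, Type.INT, 5000, Importance.MEDIUM,
                        "Poll interval in milliseconds")
                .define(TOPIC_PREFIX_CONFIG, Type.STRING, "", Importance.LOW,
                        "Prefix for destination Kafka topic names");
    }

    @Override
    public String version() { return "1.0.0"; }

    @Override
    public void stop() { }
}

// RestApiSourceTask.java
import java.io.IOException;
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RestApiSourceTask extends SourceTask {
    private static final Logger log = LoggerFactory.getLogger(RestApiSourceTask.class);

    private String baseUrl;
    private List<String> endpoints;
    private int pollIntervalMs;
    private String topicPrefix;
    private HttpClient httpClient;
    private ObjectMapper mapper;
    // Cursor per endpoint, kept in memory. Offsets are only flushed to
    // connect-offsets periodically, so reading them back on every poll()
    // would return a stale cursor and refetch pages already emitted.
    private final Map<String, String> cursors = new HashMap<>();

    @Override
    public void start(Map<String, String> props) {
        baseUrl = props.get(RestApiSourceConnector.BASE_URL_CONFIG);
        endpoints = Arrays.stream(props.get(RestApiSourceConnector.ENDPOINTS_CONFIG).split(","))
                .map(String::trim)
                .collect(Collectors.toList());
        pollIntervalMs = Integer.parseInt(
                props.getOrDefault(RestApiSourceConnector.POLL_MS_CONFIG, "5000"));
        topicPrefix = props.getOrDefault(RestApiSourceConnector.TOPIC_PREFIX_CONFIG, "");
        httpClient = HttpClient.newBuilder()
                .connectTimeout(Duration.ofSeconds(10))
                .build();
        mapper = new ObjectMapper();
        // Restore each endpoint's committed cursor once, at startup
        for (String endpoint : endpoints) {
            Map<String, Object> saved = context.offsetStorageReader()
                    .offset(Map.of("endpoint", endpoint));
            if (saved != null && saved.get("cursor") != null)
                cursors.put(endpoint, (String) saved.get("cursor"));
        }
    }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        List<SourceRecord> records = new ArrayList<>();
        for (String endpoint : endpoints) {
            Map<String, String> partition = Map.of("endpoint", endpoint);
            String cursor = cursors.get(endpoint);
            String url = baseUrl + endpoint + (cursor != null
                    ? "?after=" + URLEncoder.encode(cursor, StandardCharsets.UTF_8) : "");
            String topic = topicPrefix + endpoint.replaceAll("^/", "").replace("/", "-");
            try {
                HttpRequest req = HttpRequest.newBuilder()
                        .uri(URI.create(url))
                        .header("Accept", "application/json")
                        .timeout(Duration.ofSeconds(30))
                        .GET().build();
                HttpResponse<String> resp = httpClient.send(req,
                        HttpResponse.BodyHandlers.ofString());
                if (resp.statusCode() != 200) {
                    log.warn("HTTP {} from {}", resp.statusCode(), url);
                    continue;
                }
                JsonNode root = mapper.readTree(resp.body());
                // path() returns an empty MissingNode rather than null if "items" is absent
                JsonNode items = root.path("items");
                String nextCursor = root.path("next_cursor").asText(null);
                for (int k = 0; k < items.size(); k++) {
                    JsonNode item = items.get(k);
                    String id = item.get("id").asText();
                    boolean last = k == items.size() - 1;
                    // Only the page's last record advances the stored cursor; earlier
                    // records keep the cursor this page was fetched with. If offsets are
                    // committed for only part of the page before a crash, the restart
                    // refetches the page (duplicates) instead of skipping its tail (loss).
                    String offsetCursor = last ? (nextCursor != null ? nextCursor : id) : cursor;
                    records.add(new SourceRecord(
                            partition,
                            Collections.singletonMap("cursor", offsetCursor),
                            topic,
                            null,
                            Schema.STRING_SCHEMA, id,
                            Schema.STRING_SCHEMA, item.toString()
                    ));
                    if (last) cursors.put(endpoint, offsetCursor);
                }
            } catch (IOException e) {
                // Log and move on to the next endpoint. Throwing a ConnectException
                // here would fail the whole task, halting every endpoint it owns.
                log.error("Poll failed for {}: {}", endpoint, e.getMessage());
            }
        }
        if (records.isEmpty()) Thread.sleep(pollIntervalMs);
        return records;
    }

    @Override
    public void stop() { }

    @Override
    public String version() { return "1.0.0"; }
}
```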
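Which offset each record of a page carries decides what a restart does after a partial commit. The toy model below compares two policies: attaching the page's next cursor to every record, versus advancing the cursor only on the page's last record. The paginated API is simulated by a map from the cursor used to fetch a page to that page, with "" standing for the first request; CursorOffsetModel, Page and the cursor values are all hypothetical.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Toy model of cursor offsets for a paginated API (hypothetical API shape). */
public class CursorOffsetModel {
    public record Page(List<String> ids, String nextCursor) {}

    /** The offset each record of a page fetched with {@code cursor} carries. */
    public static List<String> offsets(Page page, String cursor, boolean onlyLastAdvances) {
        List<String> out = new ArrayList<>();
        for (int k = 0; k < page.ids().size(); k++) {
            boolean last = k == page.ids().size() - 1;
            out.add(onlyLastAdvances && !last ? cursor : page.nextCursor());
        }
        return out;
    }

    /**
     * IDs never delivered if only the first {@code committed} (>= 1) records of the
     * page were committed and the task restarts from the last committed offset.
     */
    public static List<String> lostAfterRestart(Map<String, Page> api, String cursor,
                                                int committed, boolean onlyLastAdvances) {
        Page page = api.get(cursor);
        String resumeFrom = offsets(page, cursor, onlyLastAdvances).get(committed - 1);
        Page refetched = api.get(resumeFrom);
        Set<String> redelivered = new HashSet<>();
        if (refetched != null) redelivered.addAll(refetched.ids());
        List<String> lost = new ArrayList<>();
        for (String id : page.ids().subList(committed, page.ids().size()))
            if (!redelivered.contains(id)) lost.add(id);
        return lost;
    }
}
```

With pages [a, b, c] and [d, e], committing only a and b under the first policy resumes at the second page and never delivers c; under the second policy the restart refetches the first page, redelivering a and b but losing nothing. Source connectors are at-least-once by default, so duplicates are the expected failure mode and loss is the bug.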
Error Handling: Dead Letter Queue
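By default (errors.tolerance=none), a Sink Connector's Task fails as soon as a record cannot be converted or transformed. In production, this is almost always the wrong behavior: one malformed record should not halt processing of thousands of good records. Note the scope of the settings below: they cover failures in the Converter and SMT stages, while failures inside the Task's put() reach the DLQ only when the connector reports them through the ErrantRecordReporter API (added in Apache Kafka 2.6). The dead letter queue itself is available only to sink connectors.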
Configure the dead letter queue (DLQ) pattern:
```json
{
  "errors.tolerance": "all",
  "errors.deadletterqueue.topic.name": "connect-dlq-my-sink",
  "errors.deadletterqueue.topic.replication.factor": "3",
  "errors.deadletterqueue.context.headers.enable": "true",
  "errors.log.enable": "true",
  "errors.log.include.messages": "true"
}
```
With this configuration:
- errors.tolerance=all: skip failed records instead of stopping the Task
- Failed records are written to the DLQ topic with their original key and value
- errors.deadletterqueue.context.headers.enable=true: DLQ messages carry error metadata in Kafka headers prefixed with __connect.errors., including __connect.errors.topic, __connect.errors.partition, __connect.errors.offset, __connect.errors.exception.class.name, __connect.errors.exception.message, and __connect.errors.exception.stacktrace
- errors.log.enable=true with errors.log.include.messages=true: failures are also written to the Worker log, which makes debugging possible without consuming the DLQ topic
The DLQ topic is a normal Kafka topic. You can consume it with any Kafka consumer to inspect and replay failed records after fixing the underlying issue.
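When triaging the DLQ, the context headers are usually the first thing to read. The hypothetical DlqTriage helper below (not a Kafka API) turns them into a one-line summary; it assumes each header value is UTF-8 text and takes the headers as a plain map so it runs without a Kafka client.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

/** Hypothetical helper: one-line summary of a DLQ record's context headers. */
public class DlqTriage {
    static final String PREFIX = "__connect.errors.";

    static String header(Map<String, byte[]> headers, String name) {
        byte[] v = headers.get(PREFIX + name);
        return v == null ? "?" : new String(v, StandardCharsets.UTF_8);
    }

    /** Formats as topic[partition]@offset ExceptionClass: message. */
    public static String summarize(Map<String, byte[]> headers) {
        return header(headers, "topic") + "[" + header(headers, "partition") + "]@"
                + header(headers, "offset") + " "
                + header(headers, "exception.class.name") + ": "
                + header(headers, "exception.message");
    }
}
```

With a KafkaConsumer, the map can be filled from record.headers(), whose Header entries expose key() and value() as a byte array; missing headers show up as "?" rather than failing the triage loop.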
Connector Monitoring via JMX
Connect exposes comprehensive JMX metrics for operational visibility:
```text
# Source Task throughput and latency
kafka.connect:type=source-task-metrics,connector=<name>,task=<id>
  source-record-poll-rate          # Records polled from source per second
  source-record-write-rate         # Records written to Kafka per second
  poll-batch-avg-time-ms           # Average poll() execution time
  source-record-active-count       # Records produced by the task but not yet fully written to Kafka

# Sink Task throughput and latency
kafka.connect:type=sink-task-metrics,connector=<name>,task=<id>
  sink-record-read-rate            # Records read from Kafka per second
  sink-record-send-rate            # Records passed to put() per second
  put-batch-avg-time-ms            # Average put() execution time
  sink-record-lag-max              # Max consumer lag across assigned partitions

# Worker health
kafka.connect:type=connect-worker-metrics
  connector-count                  # Connectors on this Worker
  task-count                       # Tasks on this Worker
  connector-startup-failure-total  # Connector starts that failed on this Worker

# Per-connector task health (reported by each Worker)
kafka.connect:type=connect-worker-metrics,connector=<name>
  connector-failed-task-count      # This connector's FAILED tasks on this Worker

# Rebalance health
kafka.connect:type=connect-worker-rebalance-metrics
  rebalance-avg-time-ms            # Average rebalance duration
```
Alert on connector-failed-task-count > 0, sink-record-lag-max growing over time, and rebalance-avg-time-ms spikes (which indicate cluster instability). These three metrics cover the most common Connect production issues: task failures, consumer lag buildup, and rebalance storms.
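The first alert condition can be checked with the JDK's own JMX classes (javax.management). The sketch assumes the Connect metrics are registered under the object names listed above and that the connector tag appears unquoted; FailedTaskCheck is a hypothetical name.

```java
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;
import javax.management.JMException;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;

/** Sketch: read connector-failed-task-count for every connector known to one Worker. */
public class FailedTaskCheck {
    static final String PATTERN = "kafka.connect:type=connect-worker-metrics,connector=*";

    public static Map<String, Double> failedTaskCounts(MBeanServerConnection conn) {
        try {
            Map<String, Double> counts = new TreeMap<>();
            // The value wildcard matches only beans whose keys are exactly type and connector
            for (ObjectName name : conn.queryNames(new ObjectName(PATTERN), null)) {
                Object value = conn.getAttribute(name, "connector-failed-task-count");
                counts.put(name.getKeyProperty("connector"), ((Number) value).doubleValue());
            }
            return counts;
        } catch (IOException | JMException e) {
            throw new IllegalStateException("JMX query failed", e);
        }
    }

    /** Connectors that should trigger an alert: any failed task at all. */
    public static List<String> alerting(Map<String, Double> counts) {
        return counts.entrySet().stream()
                .filter(e -> e.getValue() > 0)
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }
}
```

Inside the Worker JVM, ManagementFactory.getPlatformMBeanServer() provides the connection; from outside, JMXConnectorFactory.connect(new JMXServiceURL(...)).getMBeanServerConnection() does, provided the Worker was started with remote JMX enabled. Because the metric is reported per Worker, sum the counts from every Worker to get a cluster-wide figure.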
Kafka Connect is among the most underappreciated components in the Kafka ecosystem. Before writing a custom connector, check Confluent Hub — it lists hundreds of community and commercially supported connectors. When you do need to write one, keep the responsibilities separate: the Connector validates configuration and splits work; the Task accesses data; the framework handles everything else.