Schema Registry and Data Contracts
What Happens Without Schema Registry
Consider a common scenario: the orders team decides to rename user_id to customer_id and change amount from an integer (cents) to a double (dollars). They update the Producer, deploy it, and go home. Downstream — the fraud service, the analytics service, the notification service, all deployed independently — immediately start throwing deserialization exceptions. On-call engineers scramble to identify the cause. Three services are down before anyone figures out that a schema change was deployed without coordination.
This is the cost of implicit contracts. Kafka messages are bytes. The Kafka broker does not know or care about message format. The format agreement lives only in code — Producer code and Consumer code — and when they diverge, Kafka has no mechanism to detect or prevent the failure.
Schema Registry makes message format an explicit, versioned, compatibility-constrained shared contract. Producers must register a schema before writing messages. Consumers retrieve the schema by ID when deserializing. Any schema change that violates the configured compatibility rules is rejected at registration time — before code is deployed, before any consumer can break.
Confluent Schema Registry Internal Architecture
Schema Registry is a standalone HTTP service. It stores all schemas in a Kafka topic named _schemas — there is no relational database, no embedded file system, no external state store:
_schemas topic (compacted, replication.factor=3)
key: JSON identifying the record (key type, subject name, schema version)
value: schema definition and metadata (JSON: subject, version, schema ID, schema text)
Each key identifies a single (subject, version) pair, so log compaction, which keeps only the latest record per key, retains every registered version indefinitely. Registering v2 never overwrites v1, which matters because old consumers may still need v1 to deserialize historical messages. On startup, a Schema Registry instance replays _schemas to build its in-memory cache. If the instance crashes and restarts, it simply replays the topic again: all durable state lives in Kafka, so the process itself is stateless.
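The replay step can be modeled in a few lines of Java. This is a minimal sketch assuming a simplified log in which each record is keyed by (subject, version) and a null value acts as a tombstone; SchemaKey, LogRecord, and replay are hypothetical names, and the real key and value are JSON documents with more fields. Replaying with last-write-wins per key mirrors exactly what compaction preserves.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Simplified model of rebuilding the in-memory cache by replaying a compacted log. */
final class SchemasLogReplay {

    /** Hypothetical key: one distinct key per (subject, version) pair. */
    record SchemaKey(String subject, int version) {}

    /** Hypothetical log record; a null schema stands for a tombstone. */
    record LogRecord(SchemaKey key, String schema) {}

    /** Replays records in offset order; the last record per key wins, as after compaction. */
    static Map<SchemaKey, String> replay(List<LogRecord> log) {
        Map<SchemaKey, String> cache = new LinkedHashMap<>();
        for (LogRecord r : log) {
            if (r.schema() == null) {
                cache.remove(r.key());          // tombstone: the key disappears
            } else {
                cache.put(r.key(), r.schema()); // a newer value for the same key replaces the old one
            }
        }
        return cache;
    }
}
```

Because v1 and v2 of a subject have different keys, a later version never replaces an earlier one; only a later record with the same key (such as a deletion marker) does.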
In a multi-instance Schema Registry cluster, instances use a Kafka-based leader election protocol. Only the leader (called the master in older documentation) accepts write requests such as schema registration. Follower instances forward write requests to the leader and serve read requests directly from their local cache. If the leader fails, a new leader is elected from the remaining leader-eligible instances.
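The read/write split can be illustrated with a toy Java model, assuming a single JVM in which direct method calls replace HTTP forwarding. RegistryNode, follow, and applyFromLog are hypothetical names, and replication through _schemas is reduced to an explicit applyFromLog call.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Toy model of leader/follower roles: one node assigns IDs, every node serves reads locally. */
final class RegistryNode {
    private RegistryNode leader;                        // null means "this node is the leader"
    private final Map<Integer, String> localCache = new HashMap<>();
    private int nextId = 1;                             // only used on the leader

    void follow(RegistryNode leader) { this.leader = leader; }

    boolean isLeader() { return leader == null; }

    /** Writes: a follower forwards to the leader; only the leader assigns schema IDs. */
    int register(String schema) {
        if (!isLeader()) {
            return leader.register(schema);
        }
        int id = nextId++;
        // Simplification: the real leader writes to _schemas and caches the entry when it is consumed back
        localCache.put(id, schema);
        return id;
    }

    /** Stands in for a node consuming a new record from _schemas. */
    void applyFromLog(int id, String schema) { localCache.put(id, schema); }

    /** Reads: answered from this node's own cache, never forwarded. */
    Optional<String> schemaById(int id) { return Optional.ofNullable(localCache.get(id)); }
}
```

A follower therefore sees a newly registered schema only after it has consumed the corresponding _schemas record, while reads never leave the node.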
This design has an important consequence: Schema Registry's availability is coupled to Kafka's availability. If Kafka is down, Schema Registry can still serve cached reads but cannot register new schemas. For most use cases this is acceptable, because schema registration is a deployment-time operation, not part of the per-message critical path.
Wire Format: The Physical Message Layout
Serializers that integrate with Schema Registry (KafkaAvroSerializer, KafkaProtobufSerializer, KafkaJsonSchemaSerializer) write messages in a specific format:
Byte 0: 0x00 (magic byte — identifies Schema Registry wire format)
Bytes 1-4: Schema ID (big-endian int32)
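Bytes 5+: Serialized payload (Avro binary / Protobuf binary / JSON); for Protobuf, a varint-encoded list of message indexes precedes the binary payload to identify which message type in the .proto file was written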
The Deserializer reads the 5-byte header, extracts the schema ID, fetches the schema from Schema Registry via GET /schemas/ids/{id}, and uses that schema to decode the remaining bytes. The schema ID is a registry-wide unique integer assigned by the Registry, distinct from the subject version number (which counts 1, 2, 3... within each subject). Registering an identical schema under a second subject reuses the existing ID.
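The framing itself can be sketched with java.nio.ByteBuffer, which is big-endian by default. WireFormat and its methods are illustrative names rather than the Confluent serializer classes, and the sketch ignores the extra message-index bytes that the Protobuf serializer writes after the ID.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Sketch of the 5-byte framing: magic byte + big-endian schema ID + payload. */
final class WireFormat {
    static final byte MAGIC_BYTE = 0x0;
    static final int HEADER_SIZE = 1 + Integer.BYTES;   // 5 bytes

    /** Prepends the header to an already-serialized payload. */
    static byte[] frame(int schemaId, byte[] payload) {
        return ByteBuffer.allocate(HEADER_SIZE + payload.length)
                .put(MAGIC_BYTE)
                .putInt(schemaId)        // big-endian int32 (ByteBuffer's default order)
                .put(payload)
                .array();
    }

    /** Reads the schema ID, rejecting data that is not in this wire format. */
    static int schemaId(byte[] message) {
        if (message.length < HEADER_SIZE || message[0] != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte or truncated message");
        }
        return ByteBuffer.wrap(message, 1, Integer.BYTES).getInt();
    }

    /** Returns the bytes after the header (the Avro/Protobuf/JSON payload). */
    static byte[] payload(byte[] message) {
        return Arrays.copyOfRange(message, HEADER_SIZE, message.length);
    }
}
```

A deserializer following this layout would call schemaId first, resolve the schema, and then decode payload with it.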
The critical design decision: the message does not contain the schema itself, only a 4-byte reference. Compare this to self-describing formats such as JSON, which repeats field names in every message, or Avro container files, which embed the full schema. In high-throughput Kafka systems processing hundreds of thousands of messages per second, removing redundant schema bytes from every message measurably reduces network bandwidth and storage cost. Avro binary payloads are often 30-70% smaller than equivalent JSON, although the exact saving depends on field names and value types.
Deserializers cache schemas locally. A deserializer instance contacts the Registry only the first time it encounters a given schema ID; subsequent messages carrying the same ID are resolved from the local cache with no network overhead.
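A minimal sketch of such a cache, assuming the registry fetch is supplied as a function: the IntFunction stands in for the HTTP call to GET /schemas/ids/{id}, and SchemaIdCache is a hypothetical name. ConcurrentHashMap.computeIfAbsent runs the fetch only on a miss, which keeps lookups thread-safe without explicit locking.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.IntFunction;

/** Per-deserializer cache: each schema ID is fetched from the registry at most once. */
final class SchemaIdCache {
    private final Map<Integer, String> byId = new ConcurrentHashMap<>();
    private final IntFunction<String> fetcher;   // hypothetical stand-in for GET /schemas/ids/{id}

    SchemaIdCache(IntFunction<String> fetcher) { this.fetcher = fetcher; }

    String schemaFor(int schemaId) {
        // The fetcher runs only when the ID is not cached yet
        return byId.computeIfAbsent(schemaId, fetcher::apply);
    }
}
```

Since schema IDs are immutable, entries never need invalidation, which is why a plain unbounded map is a reasonable design here.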
Subject Naming Strategies
Schemas are organized in the Registry under subjects — namespaces that group schema versions together. A subject's history is the sequence of schema versions (v1, v2, v3...) registered under that name. The subject naming strategy determines how subject names are generated.
TopicNameStrategy (Default)
Subject name = <topic-name>-key or <topic-name>-value.
Topic: orders
Key schema subject: orders-key
Value schema subject: orders-value
Advantages: Simple and intuitive. Each topic has one pair of subjects, and all values in a topic share a single evolution history.
Disadvantages: If the same Avro record type (OrderEvent) is used in multiple topics, each topic has an independent schema evolution history. Schema changes must be registered separately per topic, creating drift risk.
RecordNameStrategy
Subject name = the fully qualified Avro record name (namespace.RecordName).
Avro record: com.example.OrderEvent
Subject: com.example.OrderEvent
Advantages: Schema is decoupled from topic. The same record type shared across multiple topics has a single, unified evolution history.
Disadvantages: A single topic can now contain multiple record types, so consumers must dispatch on the record type (for example, the writer schema's full name) to select the correct processing path.
TopicRecordNameStrategy
Subject name = <topic>-<namespace.RecordName>. Combines per-topic isolation with per-record-type isolation. Appropriate when a single topic carries multiple message types (e.g., a CDC topic containing changes from multiple tables, each with its own record type).
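The three naming rules can be written as plain Java functions. This sketch only reproduces the naming rule; SubjectNames is a hypothetical helper, while the real strategy classes live in the io.confluent.kafka.serializers.subject package and receive the topic, a key/value flag, and the schema.

```java
/** The three subject naming rules as plain functions (a sketch, not the Confluent classes). */
final class SubjectNames {

    /** TopicNameStrategy: <topic>-key or <topic>-value. */
    static String topicName(String topic, boolean isKey) {
        return topic + (isKey ? "-key" : "-value");
    }

    /** RecordNameStrategy: the fully qualified record name, independent of the topic. */
    static String recordName(String recordFullName) {
        return recordFullName;
    }

    /** TopicRecordNameStrategy: <topic>-<namespace.RecordName>. */
    static String topicRecordName(String topic, String recordFullName) {
        return topic + "-" + recordFullName;
    }
}
```

For the orders topic and com.example.OrderEvent, these yield orders-value, com.example.OrderEvent, and orders-com.example.OrderEvent respectively.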
Configure the naming strategy on both Producer and Consumer:
// Producer
props.put("value.subject.name.strategy",
"io.confluent.kafka.serializers.subject.RecordNameStrategy");
// Consumer — must match the Producer's strategy
props.put("value.subject.name.strategy",
"io.confluent.kafka.serializers.subject.RecordNameStrategy");
Four Compatibility Levels
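Schema Registry's core guarantee is that compatibility rules are enforced at registration time, and the compatibility level determines which schema changes are permitted. The four base levels are described below. BACKWARD, FORWARD, and FULL each also have a _TRANSITIVE variant (for example, BACKWARD_TRANSITIVE) that checks a new schema against every earlier version in the subject rather than only the latest one.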
BACKWARD (Default)
New schema can read data written by old schema. A Consumer using the new schema can process messages produced by the old schema.
Permitted changes:
- Delete a field (the Consumer's new schema has no such field, so Avro skips that value when reading old messages)
- Add an optional field with a default value (old messages lack this field; the default value is used)
Prohibited changes:
- Add a required field with no default (old messages lack the field, and the new schema has no default to fill in when reading them)
- Change a field's type incompatibly (e.g., int→string; Avro does permit promotions such as int→long)
- Rename a field without an alias (equivalent to deleting the old field and adding a new one with no default)
Operational meaning: Deploy Consumers before Producers. In a rolling deployment, upgrade all Consumer instances first (they can read both old and new format), then deploy the Producer. There is no window during which a Consumer processes a message it cannot parse.
FORWARD
Old schema can read data written by new schema. A Consumer using the old schema can process messages produced by the new schema.
Permitted changes:
- Add fields (old Consumer ignores unknown fields)
- Delete a field that had a default value (new messages lack the field; old schema uses the default)
Operational meaning: Deploy Producers before Consumers. New Producers start writing new-format messages; old Consumers can still read them. Appropriate when Consumer upgrades are slow or decoupled.
FULL
Both BACKWARD and FORWARD simultaneously. The strictest of the four base levels (FULL_TRANSITIVE, which applies the check against every earlier version, is stricter still).
Permitted changes:
- Add an optional field with a default value
- Delete an optional field that has a default value
This level is appropriate for high-stability schemas where any structural surprise must be prevented — financial data, compliance records, inter-team API contracts.
NONE
No compatibility checking. Any schema change is accepted. Appropriate only during initial development when you are iterating rapidly. Never use in production.
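The rules above can be approximated with a small model that treats a schema as a flat list of fields (name, type, whether it has a default). CompatibilityModel is hypothetical and deliberately simplified: it ignores nested records, unions, aliases, and the transitive variants, and the real checks run inside Schema Registry. BACKWARD asks whether the new schema can read old data, FORWARD swaps reader and writer, and FULL requires both.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

/** Flat-record approximation of Avro compatibility checks (illustrative only). */
final class CompatibilityModel {

    record Field(String name, String type, boolean hasDefault) {}

    // Writer-type -> reader-type promotions permitted by Avro schema resolution
    private static final Set<String> PROMOTIONS = Set.of(
            "int->long", "int->float", "int->double",
            "long->float", "long->double", "float->double",
            "string->bytes", "bytes->string");

    /** True if a reader using readerFields can decode data written with writerFields. */
    static boolean canRead(List<Field> readerFields, List<Field> writerFields) {
        Map<String, Field> writer = writerFields.stream()
                .collect(Collectors.toMap(Field::name, Function.identity()));
        for (Field r : readerFields) {
            Field w = writer.get(r.name());
            if (w == null) {
                if (!r.hasDefault()) {
                    return false;   // field missing from the data and no default to fill in
                }
            } else if (!w.type().equals(r.type())
                    && !PROMOTIONS.contains(w.type() + "->" + r.type())) {
                return false;       // incompatible type change
            }
        }
        return true;                // fields only the writer has are skipped
    }

    /** BACKWARD: the new schema (reader) can read data written with the old schema. */
    static boolean backward(List<Field> oldSchema, List<Field> newSchema) {
        return canRead(newSchema, oldSchema);
    }

    /** FORWARD: the old schema (reader) can read data written with the new schema. */
    static boolean forward(List<Field> oldSchema, List<Field> newSchema) {
        return canRead(oldSchema, newSchema);
    }

    /** FULL: both directions hold. */
    static boolean full(List<Field> oldSchema, List<Field> newSchema) {
        return backward(oldSchema, newSchema) && forward(oldSchema, newSchema);
    }
}
```

Under this model, adding a required field passes FORWARD but fails BACKWARD, and deleting a field without a default passes BACKWARD but fails FORWARD, consistent with the lists above.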
Configure compatibility per subject:
# Set compatibility for a specific subject
curl -X PUT http://schema-registry:8081/config/orders-value \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{"compatibility": "FULL"}'
# Read current compatibility
curl http://schema-registry:8081/config/orders-value
# Test compatibility without registering (dry-run); verbose=true adds failure details
curl -X POST \
"http://schema-registry:8081/compatibility/subjects/orders-value/versions/latest?verbose=true" \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{"schema": "{\"type\":\"record\",\"name\":\"OrderEvent\",...}"}'
# Response: {"is_compatible": true} or {"is_compatible": false, "messages": [...]}
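The same dry-run check can be issued from Java with the JDK's java.net.http client (Java 11+). The sketch below only builds the request and interprets the response; CompatibilityRequest and its methods are illustrative names, and the JSON handling is deliberately minimal (a real client would use a JSON library to escape the schema and parse the reply).

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Builds and interprets the dry-run compatibility check (illustrative helper). */
final class CompatibilityRequest {

    /** Embeds the schema JSON as a string value; escapes only backslashes, quotes, and newlines. */
    static String requestBody(String schemaJson) {
        String escaped = schemaJson
                .replace("\\", "\\\\")
                .replace("\"", "\\\"")
                .replace("\n", "\\n");
        return "{\"schema\": \"" + escaped + "\"}";
    }

    static HttpRequest build(String registryUrl, String subject, String schemaJson) {
        return HttpRequest.newBuilder()
                .uri(URI.create(registryUrl + "/compatibility/subjects/" + subject
                        + "/versions/latest?verbose=true"))
                .header("Content-Type", "application/vnd.schemaregistry.v1+json")
                .POST(HttpRequest.BodyPublishers.ofString(requestBody(schemaJson)))
                .build();
    }

    /** Crude response check; a JSON library would be more robust. */
    static boolean isCompatible(String responseBody) {
        return responseBody.replace(" ", "").contains("\"is_compatible\":true");
    }
}
```

Sending the request with HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()) and passing the response body to isCompatible gives a simple pass/fail gate.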
Avro Schema Evolution Examples
Safe Changes
// Version 1: initial schema
{
"type": "record",
"name": "OrderEvent",
"namespace": "com.example",
"fields": [
{"name": "id", "type": "int"},
{"name": "amount", "type": "double"},
{"name": "user_id", "type": "int"}
]
}
// Version 2: safe additions (BACKWARD compatible)
{
"type": "record",
"name": "OrderEvent",
"namespace": "com.example",
"fields": [
{"name": "id", "type": "int"},
{"name": "amount", "type": "double"},
{"name": "user_id", "type": "int"},
// New optional field with default — old messages lack this field,
// Avro uses the default value when reading them
{"name": "currency", "type": "string", "default": "CNY"},
// Nullable field: union type [null, T] with default null
// Old messages without this field will have discount = null
{"name": "discount", "type": ["null", "double"], "default": null}
]
}
Breaking Changes (Rejected by BACKWARD Compatibility)
// Version 3: breaking changes — ALL of these would be rejected
// REJECTED: Adding a required field with no default
// Old messages don't have this field → Avro cannot satisfy the required constraint
{"name": "merchant_id", "type": "int"} // missing "default" key
// REJECTED: Changing a field type
// Old messages have amount as double → cannot parse as string
{"name": "amount", "type": "string"}
// REJECTED: Renaming a field (= delete old + add new)
// New field "customer_id" has no default, and old messages only carry "user_id" → BACKWARD check fails
{"name": "customer_id", "type": "int"} // was "user_id"
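// CORRECT way to rename under BACKWARD: use Avro aliases
// (an old reader without the alias still cannot find "user_id", so FORWARD would fail)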
// The alias tells the Avro reader to map the old field name to the new one
{"name": "customer_id", "type": "int", "aliases": ["user_id"]} // safe
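What the reader does with these schemas at runtime can be modeled for a single flat record: match each reader field by name, then by alias, and otherwise fall back to its default. RecordResolution and ReaderField are hypothetical names, records are plain maps, and real Avro resolution additionally handles type promotion, unions, and nested types.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Simplified reader-side resolution for one flat record (illustrative only). */
final class RecordResolution {

    /** Reader field: name, aliases, and an optional default (hasDefault distinguishes a null default). */
    record ReaderField(String name, List<String> aliases, boolean hasDefault, Object defaultValue) {}

    static Map<String, Object> resolve(Map<String, Object> written, List<ReaderField> readerFields) {
        Map<String, Object> result = new LinkedHashMap<>();
        for (ReaderField f : readerFields) {
            String source = written.containsKey(f.name()) ? f.name() : null;
            for (String alias : f.aliases()) {
                if (source == null && written.containsKey(alias)) {
                    source = alias;                         // old name found via alias
                }
            }
            if (source != null) {
                result.put(f.name(), written.get(source));
            } else if (f.hasDefault()) {
                result.put(f.name(), f.defaultValue());     // absent from old data: use the default
            } else {
                throw new IllegalStateException("No value and no default for " + f.name());
            }
        }
        return result;  // written fields the reader does not declare are dropped
    }
}
```

Reading a v1 message with a reader that declares customer_id (alias user_id), currency (default "CNY"), and discount (default null) fills every field, while a required merchant_id with no default fails, mirroring the rejected change above.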
Complete Avro Producer and Consumer Example
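import com.example.OrderEvent;  // class generated from OrderEvent.avsc by the Avro Maven Plugin
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
// The statements below run inside a method that declares "throws Exception",
// because producer.send(...).get() throws checked exceptions
// Full Avro schema as a string (alternatively, use generated classes
// from .avsc files via the Avro Maven Plugin)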
String schemaJson = """
{
"type": "record",
"name": "OrderEvent",
"namespace": "com.example",
"fields": [
{"name": "id", "type": "int"},
{"name": "amount", "type": "double"},
{"name": "user_id", "type": "int"},
{"name": "currency", "type": "string", "default": "CNY"},
{"name": "discount", "type": ["null", "double"], "default": null}
]
}
""";
// --- Producer ---
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
KafkaAvroSerializer.class);
producerProps.put("schema.registry.url", "http://schema-registry:8081");
// In production, set auto.register.schemas=false
// Let CI/CD pipeline register schemas explicitly via the Maven plugin
// to enforce compatibility checks in the build process, not at runtime
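producerProps.put("auto.register.schemas", false);
// With auto-registration off, serialize using the latest version already registered
// under the subject (orders-value) instead of looking up each record's own schema
producerProps.put("use.latest.version", true);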
producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
Schema schema = new Schema.Parser().parse(schemaJson);
try (KafkaProducer<String, GenericRecord> producer =
new KafkaProducer<>(producerProps)) {
GenericRecord order = new GenericData.Record(schema);
order.put("id", 12345);
order.put("amount", 299.99d);
order.put("user_id", 67890);
order.put("currency", "CNY");
order.put("discount", null);
ProducerRecord<String, GenericRecord> record =
new ProducerRecord<>("orders", "12345", order);
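// Blocking on the returned Future keeps the example simple; in production,
// prefer the asynchronous send(record, callback) overload
RecordMetadata meta = producer.send(record).get();
// log is assumed to be an SLF4J Logger declared in the enclosing class
log.info("Sent to {}-{} @ offset {}",
meta.topic(), meta.partition(), meta.offset());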
}
// --- Consumer ---
Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "order-risk-service");
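consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Offsets are committed explicitly with commitSync() after each batch
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);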
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
KafkaAvroDeserializer.class);
consumerProps.put("schema.registry.url", "http://schema-registry:8081");
// specific.avro.reader=true: deserialize into generated Java class (OrderEvent)
// specific.avro.reader=false (default): deserialize into GenericRecord
consumerProps.put("specific.avro.reader", true);
try (KafkaConsumer<String, OrderEvent> consumer =
new KafkaConsumer<>(consumerProps)) {
consumer.subscribe(List.of("orders"));
while (!Thread.currentThread().isInterrupted()) {
ConsumerRecords<String, OrderEvent> records =
consumer.poll(Duration.ofMillis(500));
for (ConsumerRecord<String, OrderEvent> record : records) {
OrderEvent event = record.value();
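// event is an instance of the OrderEvent class generated from the .avsc file
// discount may be null because the field is a ["null", "double"] union
Double discount = event.getDiscount();
// processOrder is application-specific business logic (not shown)
processOrder(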
event.getId(),
event.getAmount(),
event.getCurrency(),
discount
);
}
consumer.commitSync();
}
}
Data Contracts: Schema as an API Between Teams
Schema Registry is not just a technical component — it is a governance mechanism for inter-team communication. It transforms message format from an implicit agreement ("we all know the format because we read each other's code") into an explicit contract with versioning, compatibility rules, and automated enforcement.
The data contract pattern establishes clear ownership:
- The Producer team owns the schema. They define it, evolve it, and ensure changes follow compatibility rules.
- The Consumer teams declare a dependency on a specific schema version. A proposed schema change that would break their ability to read the data fails the compatibility check in the Producer's CI/CD pipeline, so it is caught before it reaches them.
- The Schema Registry is the registry of record — the single source of truth for what the current and historical schemas are.
Schema Registry in CI/CD
The kafka-schema-registry-maven-plugin enables schema compatibility checks as a build-time gate:
<plugin>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-registry-maven-plugin</artifactId>
<version>7.6.0</version>
<configuration>
<schemaRegistryUrls>
<param>http://schema-registry.staging:8081</param>
</schemaRegistryUrls>
<!-- Map subject name to local schema file -->
<subjects>
<orders-value>src/main/avro/OrderEvent.avsc</orders-value>
<orders-key>src/main/avro/OrderKey.avsc</orders-key>
</subjects>
</configuration>
<executions>
<!-- Run compatibility check during verify (before merge) -->
<execution>
<id>check-compatibility</id>
<phase>verify</phase>
<goals>
<goal>test-compatibility</goal>
</goals>
</execution>
<!-- Register schemas only during actual deployment -->
<execution>
<id>register-schemas</id>
<phase>deploy</phase>
<goals>
<goal>register</goal>
</goals>
</execution>
</executions>
</plugin>
With this configuration:
- A developer modifies OrderEvent.avsc (e.g., adds a required field without a default).
- They open a pull request, and the CI pipeline runs mvn verify.
- The test-compatibility goal calls POST /compatibility/subjects/orders-value/versions/latest with the new schema.
- Schema Registry returns {"is_compatible": false} because the change violates BACKWARD compatibility.
- mvn verify fails, so the PR cannot be merged.
- The developer must fix the schema change (add a default value) before the PR can land.
This shifts the compatibility failure from production runtime (Consumer crashes at 3am) to development time (PR build fails in minutes). The cost of fixing a breaking schema change drops from an incident to a code review comment.
Protobuf and JSON Schema Support
Schema Registry supports three schema formats, each with distinct trade-offs:
Apache Avro: A compact binary format. The schema is registered in the Registry and referenced by ID rather than embedded in messages. Avro's schema evolution model is based on field-name matching, with aliases for renames and union types for optional fields. It is well suited to high-throughput Kafka pipelines.
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
KafkaAvroSerializer.class);
Protocol Buffers: Binary size comparable to Avro. Schema evolution is based on field numbers rather than field names: adding new fields with new numbers, and reserving (never reusing) the numbers of deleted fields, keeps old and new readers compatible in both directions. Protobuf's broad language support (C++, Go, Python, Rust, Swift) makes it preferable in polyglot environments.
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
KafkaProtobufSerializer.class);
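Why field numbers carry the compatibility story is visible on the wire: each Protobuf field is prefixed by a tag equal to (field_number << 3) | wire_type, so a reader matches fields by number and can skip numbers it does not recognize. The hand-rolled ProtoTags class below is a sketch that only handles varint fields (wire type 0); it is not protobuf-java, and a real parser uses the wire type to skip unknown fields of any kind.

```java
import java.io.ByteArrayOutputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/** Minimal sketch of Protobuf's field-number-based encoding (varint fields only). */
final class ProtoTags {
    static final int WIRE_VARINT = 0;

    static void writeVarint(ByteArrayOutputStream out, long value) {
        while ((value & ~0x7FL) != 0) {
            out.write((int) ((value & 0x7F) | 0x80));   // 7 payload bits + continuation bit
            value >>>= 7;
        }
        out.write((int) value);
    }

    static long readVarint(byte[] data, int[] pos) {
        long result = 0;
        int shift = 0;
        while (true) {
            byte b = data[pos[0]++];
            result |= (long) (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return result;
            }
            shift += 7;
        }
    }

    /** Encodes field_number -> value pairs; the tag carries the number, never the name. */
    static byte[] encode(Map<Integer, Long> fields) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        fields.forEach((number, value) -> {
            writeVarint(out, (number.longValue() << 3) | WIRE_VARINT);
            writeVarint(out, value);
        });
        return out.toByteArray();
    }

    /** Keeps only field numbers the reader knows; unknown numbers are skipped. */
    static Map<Integer, Long> decode(byte[] data, Set<Integer> knownFields) {
        Map<Integer, Long> result = new HashMap<>();
        int[] pos = {0};
        while (pos[0] < data.length) {
            int number = (int) (readVarint(data, pos) >>> 3);
            long value = readVarint(data, pos);   // every field in this sketch is a varint
            if (knownFields.contains(number)) {
                result.put(number, value);
            }
        }
        return result;
    }
}
```

An old reader that only knows field 1 decodes the same bytes as a new reader that also knows field 3. Reusing a deleted number for a new field is unsafe for the same reason: old readers would decode the new data under the old field's meaning.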
JSON Schema: Human-readable JSON with structural constraints defined by JSON Schema Draft 7. Appropriate when consumers are non-JVM services (Node.js, Python) that prefer JSON, or when human readability of raw Kafka messages is a priority. The trade-off is message size: because field names and values are written as text in every message, JSON is commonly 3-5x larger than equivalent Avro or Protobuf, which is significant at high throughput.
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
KafkaJsonSchemaSerializer.class);
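To make the size difference concrete, the sketch below hand-encodes the OrderEvent record from the Avro examples using Avro's binary rules (zigzag varints for int, 8-byte little-endian doubles, length-prefixed UTF-8 strings, a branch index for unions) and compares it with compact JSON. PayloadSize is a hypothetical helper, not the Avro library; real code would use Avro's BinaryEncoder. For the sample record (id 12345, amount 299.99, user_id 67890, currency CNY, no discount) it produces 19 bytes of Avro versus 77 bytes of JSON, before the 5-byte Schema Registry header.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

/** Hand-rolled Avro binary encoding of OrderEvent, for comparing its size with JSON. */
final class PayloadSize {

    static void writeZigZagVarint(ByteArrayOutputStream out, long n) {
        long z = (n << 1) ^ (n >> 63);          // zigzag: small magnitudes stay short
        while ((z & ~0x7FL) != 0) {
            out.write((int) ((z & 0x7F) | 0x80));
            z >>>= 7;
        }
        out.write((int) z);
    }

    static byte[] littleEndianDouble(double d) {
        return ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putDouble(d).array();
    }

    /** Fields in schema order: id, amount, user_id, currency, discount. */
    static byte[] avro(int id, double amount, int userId, String currency, Double discount) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeZigZagVarint(out, id);
        out.writeBytes(littleEndianDouble(amount));
        writeZigZagVarint(out, userId);
        byte[] utf8 = currency.getBytes(StandardCharsets.UTF_8);
        writeZigZagVarint(out, utf8.length);    // strings are length-prefixed
        out.writeBytes(utf8);
        if (discount == null) {
            writeZigZagVarint(out, 0);          // union branch 0 = "null", no value bytes
        } else {
            writeZigZagVarint(out, 1);          // union branch 1 = "double"
            out.writeBytes(littleEndianDouble(discount));
        }
        return out.toByteArray();
    }

    static byte[] json(int id, double amount, int userId, String currency, Double discount) {
        String s = "{\"id\":" + id + ",\"amount\":" + amount + ",\"user_id\":" + userId
                + ",\"currency\":\"" + currency + "\",\"discount\":" + discount + "}";
        return s.getBytes(StandardCharsets.UTF_8);
    }
}
```

Most of the JSON bytes here are field names, so the ratio depends heavily on naming and value types; treat a single record as an illustration rather than a benchmark.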
Format selection guidance: choose Avro for pure JVM ecosystems and maximum compactness; choose Protobuf for polyglot environments or when generated code quality matters; choose JSON Schema when message readability or non-JVM consumer interoperability is a priority. In all three cases, Schema Registry provides the same subject-based versioning and compatibility enforcement.
At scale (hundreds of thousands of messages per second), the difference between Avro and JSON is not just storage and bandwidth. Avro binary decoding is generally faster than JSON parsing, which matters when Consumer throughput is the bottleneck. Teams that start with JSON often migrate to Avro when they hit throughput ceilings. Starting with Avro and Schema Registry from the beginning avoids that migration cost entirely.
Schema Registry should be treated as a required infrastructure component, not an optional enhancement. Without it, schema changes in a Kafka-based system are a recurring source of production incidents. With it, schema changes become an engineered process with automated safety checks, clear ownership, and predictable consequences. The operational investment in running Schema Registry — which is modest, given its stateless architecture — is paid back the first time a breaking schema change is caught in CI before it reaches production.