Security: Authentication, Authorization, Encryption and Audit
The Four Dimensions of Kafka Security
Kafka's security model consists of four independent but interconnected dimensions:
- Authentication: Verifying the identity of connecting parties ("Who are you?")
- Authorization: Controlling what authenticated identities can do ("What are you allowed to do?")
- Encryption: Protecting data in transit so it cannot be intercepted or tampered with
- Audit: Recording what happened ("Who did what, and when?")
All four dimensions are required. Authentication without authorization means any valid identity can read and write any topic. Encryption without authentication means an attacker can connect with any claimed identity. Access control without audit means security incidents cannot be investigated after the fact.
This chapter covers each dimension in turn, with the technical rationale and the exact configuration for each.
SASL Authentication Mechanisms
SASL (Simple Authentication and Security Layer) is the authentication framework Kafka uses. Kafka supports five SASL mechanisms (PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI and OAUTHBEARER), ranging from trivially simple to enterprise-grade complex.
PLAIN: Simple but Risky Without TLS
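PLAIN sends the username and password in cleartext. Per RFC 4616, the client message is simply an optional authorization identity, the username and the password, separated by NUL bytes. It provides no cryptographic protection of the password itself.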
# server.properties
listeners=SASL_PLAINTEXT://0.0.0.0:9093
sasl.enabled.mechanisms=PLAIN
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
# kafka_server_jaas.conf
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret"
user_admin="admin-secret"
user_producer="producer-secret"
user_consumer="consumer-secret";
};
Critical risk: passwords cross the wire in cleartext. PLAIN must be combined with TLS (use SASL_SSL listeners, not SASL_PLAINTEXT), or an eavesdropper on the network can capture credentials directly. In addition, with the JAAS-file setup above every credential is stored in the broker configuration file. Adding or removing a user therefore requires editing that file and restarting the broker.
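To make the risk concrete, here is a minimal sketch of the client message that SASL/PLAIN puts on the wire, following RFC 4616: optional authorization identity, NUL, username, NUL, password, all UTF-8 encoded. PlainSaslMessage is a hypothetical class that only illustrates the byte layout. Decoding needs nothing but a split on the NUL byte.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper showing the RFC 4616 SASL/PLAIN message layout. */
final class PlainSaslMessage {
    private PlainSaslMessage() {}

    /** Client side: [authzid] NUL username NUL password, UTF-8 encoded (authzid left empty). */
    static byte[] encode(String username, String password) {
        return ("\0" + username + "\0" + password).getBytes(StandardCharsets.UTF_8);
    }

    /** Eavesdropper side: no key is needed, splitting on NUL recovers every field. */
    static String[] decode(byte[] captured) {
        // limit -1 keeps the leading empty authzid field
        return new String(captured, StandardCharsets.UTF_8).split("\0", -1);
    }
}
```

No key is involved at any step, which is why a PLAIN exchange must only ever travel inside TLS.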
PLAIN is only acceptable as a temporary measure during initial setup or in isolated development environments with no sensitive data.
SCRAM-SHA-256 / SCRAM-SHA-512: The Production Default
SCRAM (Salted Challenge Response Authentication Mechanism) is a substantial security improvement over PLAIN:
- Salted hash: the server stores only salted, hashed key material and never sees the plaintext password
- Challenge-response protocol: every exchange is bound to fresh nonces, which prevents credential replay attacks
- Credentials stored in ZooKeeper/KRaft: supports online user management without broker restarts
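These properties follow from how SCRAM derives its proof (RFC 5802, with SHA-256 per RFC 7677). The sketch below is a simplified illustration, not Kafka's implementation:

- ScramSketch is a hypothetical class.
- SASLprep normalization, message parsing and the ServerKey/ServerSignature half of the exchange are omitted.
- The authMessage parameter stands in for the real concatenation of the three handshake messages, which includes both the client and server nonces.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.MessageDigest;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

/** Simplified SCRAM-SHA-256 key math (RFC 5802, RFC 7677). Illustration only. */
final class ScramSketch {
    private ScramSketch() {}

    static byte[] hmac(byte[] key, byte[] data) throws GeneralSecurityException {
        Mac mac = Mac.getInstance("HmacSHA256");
        mac.init(new SecretKeySpec(key, "HmacSHA256"));
        return mac.doFinal(data);
    }

    static byte[] sha256(byte[] data) throws GeneralSecurityException {
        return MessageDigest.getInstance("SHA-256").digest(data);
    }

    static byte[] xor(byte[] a, byte[] b) {
        byte[] out = new byte[a.length];
        for (int i = 0; i < a.length; i++) out[i] = (byte) (a[i] ^ b[i]);
        return out;
    }

    /** Hi() from RFC 5802: PBKDF2-HMAC-SHA-256 producing a single 32-byte block. */
    static byte[] hi(byte[] password, byte[] salt, int iterations) throws GeneralSecurityException {
        byte[] u = hmac(password, ByteBuffer.allocate(salt.length + 4).put(salt).putInt(1).array());
        byte[] result = u.clone();
        for (int i = 1; i < iterations; i++) {
            u = hmac(password, u);
            result = xor(result, u);
        }
        return result;
    }

    static byte[] clientKey(String password, byte[] salt, int iterations) throws GeneralSecurityException {
        byte[] saltedPassword = hi(password.getBytes(StandardCharsets.UTF_8), salt, iterations);
        return hmac(saltedPassword, "Client Key".getBytes(StandardCharsets.UTF_8));
    }

    /** What the broker keeps per user (with salt, iterations, ServerKey): StoredKey = H(ClientKey). */
    static byte[] storedKey(String password, byte[] salt, int iterations) throws GeneralSecurityException {
        return sha256(clientKey(password, salt, iterations));
    }

    /** Client: ClientProof = ClientKey XOR HMAC(StoredKey, AuthMessage). */
    static byte[] clientProof(String password, byte[] salt, int iterations, String authMessage)
            throws GeneralSecurityException {
        byte[] clientKey = clientKey(password, salt, iterations);
        byte[] signature = hmac(sha256(clientKey), authMessage.getBytes(StandardCharsets.UTF_8));
        return xor(clientKey, signature);
    }

    /** Server: undo the XOR with its own signature and compare H(recovered key) with StoredKey. */
    static boolean verify(byte[] storedKey, String authMessage, byte[] proof) throws GeneralSecurityException {
        byte[] signature = hmac(storedKey, authMessage.getBytes(StandardCharsets.UTF_8));
        return MessageDigest.isEqual(sha256(xor(proof, signature)), storedKey);
    }
}
```

The server nonce changes on every login, so a proof captured from one session does not verify in another. StoredKey, the value the broker keeps, cannot be turned back into the ClientKey needed to build a proof.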
# Create a user (writes credentials to ZooKeeper/KRaft, no broker restart needed)
kafka-configs.sh \
--bootstrap-server kafka:9092 \
--command-config admin-client.properties \
--alter \
--add-config 'SCRAM-SHA-256=[iterations=8192,password=producer-secret]' \
--entity-type users \
--entity-name kafka-producer
kafka-configs.sh \
--bootstrap-server kafka:9092 \
--command-config admin-client.properties \
--alter \
--add-config 'SCRAM-SHA-512=[iterations=8192,password=consumer-secret]' \
--entity-type users \
--entity-name kafka-consumer
# List all users
kafka-configs.sh \
--bootstrap-server kafka:9092 \
--command-config admin-client.properties \
--describe \
--entity-type users
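# Delete a user's SCRAM-SHA-256 credential (no restart needed; repeat for SCRAM-SHA-512 if set).
# New logins fail immediately; connections already open stay authenticated until they
# reconnect or connections.max.reauth.ms forces re-authentication.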
kafka-configs.sh \
--bootstrap-server kafka:9092 \
--command-config admin-client.properties \
--alter \
--delete-config 'SCRAM-SHA-256' \
--entity-type users \
--entity-name decommissioned-service
Broker configuration:
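# server.properties
# (SASL listeners also need a KafkaServer JAAS entry; for SCRAM it uses ScramLoginModule
# with the username/password the broker itself presents for inter-broker traffic)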
listeners=SASL_SSL://0.0.0.0:9092
sasl.enabled.mechanisms=SCRAM-SHA-256,SCRAM-SHA-512
security.inter.broker.protocol=SASL_SSL
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
ssl.keystore.location=/etc/kafka/ssl/kafka.server.keystore.jks
ssl.keystore.password=keystore-password
ssl.truststore.location=/etc/kafka/ssl/kafka.server.truststore.jks
ssl.truststore.password=truststore-password
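Client configuration (Java):
Properties props = new Properties(); // java.util.Properties
props.put("bootstrap.servers", "kafka:9092");
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "SCRAM-SHA-256");
props.put("sasl.jaas.config",
"org.apache.kafka.common.security.scram.ScramLoginModule required " +
"username=\"kafka-producer\" " +
"password=\"producer-secret\";");
// Trust the CA that signed the broker certificates (example path and password)
props.put("ssl.truststore.location", "/etc/kafka/ssl/kafka.client.truststore.jks");
props.put("ssl.truststore.password", "truststore-password");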
GSSAPI/Kerberos: Enterprise SSO Integration
Kerberos is the established centralized authentication system in enterprise environments (Active Directory, MIT Kerberos). If your organization already operates a Kerberos infrastructure, Kafka integrates natively through the GSSAPI SASL mechanism.
# server.properties
sasl.enabled.mechanisms=GSSAPI
sasl.kerberos.service.name=kafka
# kafka_server_jaas.conf
KafkaServer {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/etc/security/keytabs/kafka.service.keytab"
principal="kafka/<broker-fqdn>@<REALM>";
};
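Kerberos principals have the form primary/instance@REALM. Clients' sasl.kerberos.service.name must match the primary of the broker principal (kafka in the configuration above). By default, Kafka maps principals of the form {username}/{hostname}@{REALM} to {username} for ACL purposes.

A short parser makes the pieces explicit. KerberosPrincipalName is a hypothetical helper for illustration only.

```java
/** Hypothetical value type for a Kerberos principal of the form primary[/instance]@REALM. */
final class KerberosPrincipalName {
    final String primary;   // service or user name, e.g. "kafka"
    final String instance;  // usually the host for service principals; null if absent
    final String realm;     // e.g. "EXAMPLE.COM"

    private KerberosPrincipalName(String primary, String instance, String realm) {
        this.primary = primary;
        this.instance = instance;
        this.realm = realm;
    }

    static KerberosPrincipalName parse(String principal) {
        int at = principal.lastIndexOf('@');
        if (at <= 0 || at == principal.length() - 1) {
            throw new IllegalArgumentException("expected primary[/instance]@REALM: " + principal);
        }
        String name = principal.substring(0, at);
        String realm = principal.substring(at + 1);
        int slash = name.indexOf('/');
        if (slash < 0) {
            return new KerberosPrincipalName(name, null, realm); // user principal, no instance
        }
        return new KerberosPrincipalName(name.substring(0, slash), name.substring(slash + 1), realm);
    }
}
```

For example, parsing the made-up principal kafka/kafka-broker-1.internal@EXAMPLE.COM yields:

- primary: kafka
- instance: kafka-broker-1.internal
- realm: EXAMPLE.COM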
Kerberos operational challenges:
- Complexity: Requires maintaining a KDC, properly managing keytabs and principals for every broker and client
- Clock synchronization: All nodes must keep clock skew under 5 minutes (Kerberos's default maximum tolerated skew, set by clockskew in krb5.conf)
- Debugging difficulty: Kerberos error messages are notoriously cryptic; diagnosis requires specialist knowledge
Kerberos is the right choice when your organization mandates SSO through Active Directory or an existing Kerberos realm. For greenfield deployments, SCRAM or OAUTHBEARER is typically easier to operate.
OAUTHBEARER: Cloud-Native JWT Authentication (mechanism since Kafka 2.0; built-in OIDC token validation since 3.1)
OAUTHBEARER allows Kafka clients to authenticate using JWT (JSON Web Token) access tokens, integrating with standard OAuth 2.0 / OIDC providers such as Keycloak, Okta, and Auth0.
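# server.properties: built-in OIDC/JWT validation (KIP-768, Kafka 3.1+)
listeners=SASL_SSL://0.0.0.0:9092
sasl.enabled.mechanisms=OAUTHBEARER
# Secured validator; without it the broker uses the unsecured, development-only default
# (newer releases may also expose this class without the ".secured" package segment)
listener.name.sasl_ssl.oauthbearer.sasl.server.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerValidatorCallbackHandler
listener.name.sasl_ssl.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
sasl.oauthbearer.jwks.endpoint.url=https://auth.company.com/.well-known/jwks.json
sasl.oauthbearer.expected.audience=kafka-cluster
sasl.oauthbearer.sub.claim.name=client_id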
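Conceptually, a secured validator performs a handful of checks on every token:

- Verify the signature with a key published at the JWKS URL.
- Reject expired tokens and tokens issued for another audience.
- Read the principal from the configured claim.

The sketch below reproduces those checks with only the JDK. It is an illustration resting on several assumptions, not Kafka's validator. JwtCheckSketch is hypothetical. It assumes RS256 tokens and takes the public key directly instead of fetching the JWKS. It also ignores the header (including alg) and uses a naive regular expression that only handles flat string or integer claims (a string aud, not an array).

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.PrivateKey;
import java.security.PublicKey;
import java.security.Signature;
import java.util.Base64;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical sketch of RS256 JWT checks; not Kafka's OAuthBearerValidatorCallbackHandler. */
final class JwtCheckSketch {
    private JwtCheckSketch() {}

    /** Naive lookup of a flat string or integer claim; a real validator uses a JSON parser. */
    static String claim(String json, String name) {
        Matcher m = Pattern.compile("\"" + Pattern.quote(name) + "\"\\s*:\\s*(?:\"([^\"]*)\"|(\\d+))").matcher(json);
        if (!m.find()) return null;
        return m.group(1) != null ? m.group(1) : m.group(2);
    }

    /** Returns the principal if signature, expiry and audience all check out, else null. */
    static String validate(String jwt, PublicKey key, String expectedAudience,
                           String principalClaim, long nowEpochSeconds) throws GeneralSecurityException {
        String[] parts = jwt.split("\\.");
        if (parts.length != 3) return null;
        Signature verifier = Signature.getInstance("SHA256withRSA"); // RS256
        verifier.initVerify(key);
        verifier.update((parts[0] + "." + parts[1]).getBytes(StandardCharsets.US_ASCII));
        if (!verifier.verify(Base64.getUrlDecoder().decode(parts[2]))) return null;
        String payload = new String(Base64.getUrlDecoder().decode(parts[1]), StandardCharsets.UTF_8);
        String exp = claim(payload, "exp");
        if (exp == null || Long.parseLong(exp) <= nowEpochSeconds) return null; // expired
        if (!expectedAudience.equals(claim(payload, "aud"))) return null;       // other audience
        return claim(payload, principalClaim); // e.g. "client_id", as in sasl.oauthbearer.sub.claim.name
    }

    /** Test helper: builds an RS256 token the way an identity provider would. */
    static String sign(String payloadJson, PrivateKey key) throws GeneralSecurityException {
        Base64.Encoder enc = Base64.getUrlEncoder().withoutPadding();
        String header = enc.encodeToString("{\"alg\":\"RS256\",\"typ\":\"JWT\"}".getBytes(StandardCharsets.UTF_8));
        String body = enc.encodeToString(payloadJson.getBytes(StandardCharsets.UTF_8));
        Signature signer = Signature.getInstance("SHA256withRSA");
        signer.initSign(key);
        signer.update((header + "." + body).getBytes(StandardCharsets.US_ASCII));
        return header + "." + body + "." + enc.encodeToString(signer.sign());
    }
}
```

The sign helper exists only so the sketch can be exercised without an identity provider. In production the provider signs tokens and the broker fetches the provider's public keys.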
Client configuration:
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "OAUTHBEARER");
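props.put("sasl.oauthbearer.token.endpoint.url",
"https://auth.company.com/oauth/token");
// KIP-768 login handler that fetches tokens from the endpoint above (Kafka 3.1+;
// newer releases may also expose it without the ".secured" package segment)
props.put("sasl.login.callback.handler.class",
"org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler");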
props.put("sasl.jaas.config",
"org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required " +
"clientId='kafka-producer-service' " +
"clientSecret='client-secret' " +
"scope='kafka:write';");
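On the client side, the login handler obtains tokens with the OAuth 2.0 client credentials grant (RFC 6749, section 4.4). This is an HTTP POST to the token endpoint carrying the client ID and secret. The sketch below only builds such a request with java.net.http (Java 11+) so its shape is visible.

ClientCredentialsRequest is hypothetical. Sending the credentials as HTTP Basic authentication is an assumption; the sketch form-encodes them first, as RFC 6749 section 2.3.1 describes. Some providers expect client_id and client_secret as form fields instead.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

/** Hypothetical builder for an OAuth 2.0 client-credentials token request. */
final class ClientCredentialsRequest {
    private ClientCredentialsRequest() {}

    /** RFC 6749 section 2.3.1: form-encode id and secret, then Base64 "id:secret". */
    static String basicAuthHeader(String clientId, String clientSecret) {
        String pair = URLEncoder.encode(clientId, StandardCharsets.UTF_8) + ":"
                + URLEncoder.encode(clientSecret, StandardCharsets.UTF_8);
        return "Basic " + Base64.getEncoder().encodeToString(pair.getBytes(StandardCharsets.UTF_8));
    }

    /** Form body for the grant; the scope is URL-encoded (":" becomes "%3A"). */
    static String formBody(String scope) {
        return "grant_type=client_credentials&scope=" + URLEncoder.encode(scope, StandardCharsets.UTF_8);
    }

    static HttpRequest build(String tokenEndpoint, String clientId, String clientSecret, String scope) {
        return HttpRequest.newBuilder(URI.create(tokenEndpoint))
                .header("Content-Type", "application/x-www-form-urlencoded")
                .header("Authorization", basicAuthHeader(clientId, clientSecret))
                .POST(HttpRequest.BodyPublishers.ofString(formBody(scope)))
                .build();
    }
}
```

Sending it with HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()) would return a JSON document. Its access_token field is the JWT the client presents to the broker.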
OAUTHBEARER advantages:
- Short-lived tokens: Access tokens typically expire within about an hour (the lifetime is set by your identity provider), so a compromised token has a bounded blast radius.
- Unified identity management: Integrates with your organization's existing IAM system, giving you one place to manage all service identities
- No broker-side credential storage: Credentials live in the OAuth provider, not in ZooKeeper or broker config files
Delegation Token: Lightweight Short-Term Credentials
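When hundreds of microservices need to authenticate, managing individual SCRAM passwords or Kerberos keytabs for each becomes operationally burdensome. Delegation tokens solve this by letting a service authenticate once with its master credential and then obtain short-lived tokens that can be distributed to workers or sub-processes. Tokens are not a separate SASL mechanism: clients present them over SCRAM. Every broker must also be configured with the same delegation.token.secret.key before tokens can be created.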
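A worker presents a delegation token over SCRAM using three JAAS fields:

- username: the token ID
- password: the token HMAC
- tokenauth="true": tells the broker to check delegation tokens instead of regular SCRAM user credentials

A minimal sketch of the resulting client properties follows. DelegationTokenClientConfig is a hypothetical helper, and SCRAM-SHA-256 is assumed to be enabled on the listener. The token ID and HMAC come from the output of kafka-delegation-tokens.sh --create.

```java
import java.util.Properties;

/** Hypothetical helper: client properties for authenticating with a delegation token. */
final class DelegationTokenClientConfig {
    private DelegationTokenClientConfig() {}

    static Properties forToken(String bootstrapServers, String tokenId, String tokenHmac) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("security.protocol", "SASL_SSL");
        // Delegation tokens ride on SCRAM; the mechanism must be enabled on the broker listener
        props.put("sasl.mechanism", "SCRAM-SHA-256");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.scram.ScramLoginModule required "
                        + "username=\"" + tokenId + "\" "      // token ID
                        + "password=\"" + tokenHmac + "\" "    // token HMAC
                        + "tokenauth=\"true\";");              // authenticate as a token, not a user
        return props;
    }
}
```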
# Generate a delegation token (authenticated with master credential)
kafka-delegation-tokens.sh \
--bootstrap-server kafka:9092 \
--command-config admin-client.properties \
--create \
--max-life-time-period 86400000 # 24 hours in milliseconds
# Renew a token: new expiry = now + 1 hour (3600000 ms), capped at the token's max lifetime
kafka-delegation-tokens.sh \
--bootstrap-server kafka:9092 \
--command-config admin-client.properties \
--renew \
--renew-time-period 3600000 \
--hmac <token-hmac>
# List tokens (regular users see only tokens they own or are a renewer of; super users see all)
kafka-delegation-tokens.sh \
--bootstrap-server kafka:9092 \
--command-config admin-client.properties \
--describe
# Expire a token immediately (revocation)
kafka-delegation-tokens.sh \
--bootstrap-server kafka:9092 \
--command-config admin-client.properties \
--expire \
--expiry-time-period -1 \
--hmac <token-hmac>
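The timestamps behind these commands can be modeled in a few lines. This sketch follows the rules as we understand Kafka to apply them; it is not the broker's code, and DelegationTokenTimeline is a hypothetical name.

- Hard limit: the token's issue time plus the smaller of the requested max lifetime and the broker's delegation.token.max.lifetime.ms (7 days by default).
- First expiry: taken from delegation.token.expiry.time.ms (24 hours by default).
- Renewal: sets the expiry to now plus the renew period, never past the hard limit.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical model of delegation token timestamps (not Kafka's token manager code). */
final class DelegationTokenTimeline {
    final Instant maxTimestamp;  // hard limit; renewals never move the expiry past it
    Instant expiryTimestamp;     // current expiry; renewals move it forward

    DelegationTokenTimeline(Instant issued, Duration requestedMaxLife,
                            Duration brokerMaxLifetime, Duration brokerExpiryTime) {
        // A non-positive request falls back to the broker limit (delegation.token.max.lifetime.ms)
        Duration maxLife = requestedMaxLife.isNegative() || requestedMaxLife.isZero()
                ? brokerMaxLifetime
                : min(requestedMaxLife, brokerMaxLifetime);
        this.maxTimestamp = issued.plus(maxLife);
        // The initial expiry comes from delegation.token.expiry.time.ms, capped at the hard limit
        this.expiryTimestamp = min(issued.plus(brokerExpiryTime), maxTimestamp);
    }

    /** --renew: new expiry = now + period, capped at maxTimestamp; expired tokens cannot be renewed. */
    void renew(Instant now, Duration renewPeriod) {
        if (!isValidAt(now)) {
            throw new IllegalStateException("token already expired at " + expiryTimestamp);
        }
        expiryTimestamp = min(now.plus(renewPeriod), maxTimestamp);
    }

    boolean isValidAt(Instant t) {
        return t.isBefore(expiryTimestamp);
    }

    private static Duration min(Duration a, Duration b) { return a.compareTo(b) <= 0 ? a : b; }

    private static Instant min(Instant a, Instant b) { return a.isBefore(b) ? a : b; }
}
```

Consider a token created with --max-life-time-period 86400000. A one-hour renewal 23.5 hours after issuance only moves the expiry to the 24-hour limit. Any renewal attempted after the token has expired is rejected.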
SSL/TLS: Transport Encryption
Certificate Infrastructure
Each Kafka broker requires its own certificate. All brokers and clients share a common trust anchor (CA certificate). The following is the complete certificate generation workflow for a self-hosted CA:
# Step 1: Generate CA key and self-signed certificate
# (Replace with your organization's internal CA in production)
openssl req -new -x509 \
-keyout ca-key.pem \
-out ca-cert.pem \
-days 3650 \
-subj "/CN=Kafka-Internal-CA/O=Company/C=US" \
-passout pass:ca-password
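# Step 2: Create a keystore for each broker
# (-storetype JKS keeps the separate -keypass; Java 9+ defaults to PKCS12, which
# ignores -keypass and reuses the store password for the key)
keytool -keystore kafka-broker-1.keystore.jks \
-storetype JKS \
-alias kafka-broker-1 \
-keyalg RSA \
-keysize 2048 \
-validity 365 \
-genkey \
-dname "CN=kafka-broker-1.internal,O=Company,C=US" \
-storepass keystore-password \
-keypass key-password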
# Step 3: Generate CSR and sign with CA
keytool -keystore kafka-broker-1.keystore.jks \
-alias kafka-broker-1 \
-certreq \
-file broker-1.csr \
-storepass keystore-password
openssl x509 -req \
-CA ca-cert.pem \
-CAkey ca-key.pem \
-in broker-1.csr \
-out broker-1-signed.pem \
-days 365 \
-CAcreateserial \
-passin pass:ca-password
# Step 4: Import CA cert and signed cert into keystore
keytool -keystore kafka-broker-1.keystore.jks \
-alias CARoot \
-import -file ca-cert.pem \
-storepass keystore-password -noprompt
keytool -keystore kafka-broker-1.keystore.jks \
-alias kafka-broker-1 \
-import -file broker-1-signed.pem \
-storepass keystore-password
# Step 5: Create a shared truststore with the CA certificate
keytool -keystore kafka.truststore.jks \
-alias CARoot \
-import -file ca-cert.pem \
-storepass truststore-password -noprompt
Mutual TLS (mTLS) Configuration
mTLS requires clients to present their own certificates as well. The client validates the broker's certificate (standard TLS), and the broker validates the client's certificate (mutual TLS). This is the strongest transport-layer security configuration.
# server.properties: full mTLS configuration
ssl.keystore.location=/etc/kafka/ssl/kafka-broker-1.keystore.jks
ssl.keystore.password=keystore-password
ssl.key.password=key-password
ssl.truststore.location=/etc/kafka/ssl/kafka.truststore.jks
ssl.truststore.password=truststore-password
# required = enforce client certificate validation (enables mTLS)
# requested = ask for a client certificate but allow connections without one
# none = standard one-way TLS only
ssl.client.auth=required
# Hostname verification: prevents man-in-the-middle attacks. HTTPS is the default
# since Kafka 2.0; an empty value disables it, so never clear it in production.
# On the broker it applies to the broker's own outbound (inter-broker) connections.
ssl.endpoint.identification.algorithm=HTTPS
# Restrict TLS versions โ disable TLS 1.0 and TLS 1.1
ssl.enabled.protocols=TLSv1.2,TLSv1.3
ssl.protocol=TLSv1.3
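# Cipher suites: the TLS 1.3 suites plus ECDHE/AES-GCM suites for TLS 1.2 clients.
# Listing only TLS 1.3 suites would leave TLS 1.2 with nothing to negotiate, and
# every listed suite must be supported by the JVM (ChaCha20 needs a recent JDK).
ssl.cipher.suites=TLS_AES_256_GCM_SHA384,TLS_CHACHA20_POLY1305_SHA256,TLS_AES_128_GCM_SHA256,TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256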
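With ssl.client.auth=required, ACLs see a principal derived from the client certificate. By default this is the certificate's full subject DN in RFC 2253 form, such as User:CN=order-service,O=Company,C=US. The broker setting ssl.principal.mapping.rules can shorten that, for example RULE:^CN=(.*?),O=.*$/$1/,DEFAULT to keep only the CN.

The sketch below mimics that single rule with java.util.regex. SslPrincipalSketch is hypothetical and is not Kafka's rule engine, which also supports multiple rules and L/U case flags.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.security.auth.x500.X500Principal;

/** Hypothetical stand-in for the mapping rule RULE:^CN=(.*?),O=.*$/$1/ followed by DEFAULT. */
final class SslPrincipalSketch {
    private SslPrincipalSketch() {}

    private static final Pattern CN_RULE = Pattern.compile("^CN=(.*?),O=.*$");

    static String principalFor(X500Principal subject) {
        String dn = subject.getName();           // RFC 2253 form, e.g. "CN=order-service,O=Company,C=US"
        Matcher m = CN_RULE.matcher(dn);
        return m.matches() ? m.replaceAll("$1")  // rule matched: keep only the CN
                           : dn;                 // DEFAULT: keep the full DN
    }
}
```

The rule turns CN=order-service,O=Company,C=US into order-service, so ACLs can name User:order-service. A DN without an O attribute directly after the CN falls through to DEFAULT and keeps its full form.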
TLS Performance Impact and Optimization
TLS encryption introduces measurable overhead:
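- Throughput reduction (20-30% is a commonly cited range): part of the cost is AES-GCM encryption CPU time, which modern CPUs with AES-NI hardware instructions largely offset. The rest comes from the broker losing zero-copy (sendfile) transfers, because data must be encrypted in user space before it is sent.
- Connection setup latency of roughly 2-3 ms per new connection: the TLS handshake requires additional round trips and key-exchange computation.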
Optimization strategies:
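1. Use TLS 1.3: compared with TLS 1.2, a full TLS 1.3 handshake takes one round trip instead of two and drops the legacy cipher suites that cost more CPU. TLS 1.3 also defines 0-RTT resumption, but Java's TLS implementation does not support 0-RTT early data, so Kafka clients do not benefit from it.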
2. Verify AES-NI hardware acceleration:
# Check for AES-NI support
grep -m1 -ow 'aes' /proc/cpuinfo
# Output "aes" confirms hardware AES acceleration is available
# Verify the JVM uses AES intrinsics (the UseAESIntrinsics flag should show "= true")
java -XX:+PrintFlagsFinal -version 2>&1 | grep UseAESIntrinsics
3. Minimize connection churn: each new TLS connection requires a full handshake. Kafka clients maintain persistent connections by default, so make sure application code does not create and destroy Producer/Consumer instances on each request.
# Verify TLS version and cipher being negotiated
openssl s_client \
-connect kafka-broker-1:9092 \
-tls1_3 \
-cert client-cert.pem \
-key client-key.pem \
-CAfile ca-cert.pem \
</dev/null 2>&1 | grep -E "(Protocol|Cipher|Verify)"
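Kafka's TLS runs on the JVM's JSSE provider. Every value in ssl.enabled.protocols and ssl.cipher.suites must therefore be something the broker's and clients' JVMs actually support (TLS 1.3 requires Java 11 or a recent Java 8 update). TlsSupportCheck is a hypothetical JDK-only helper that reports configured entries the local JVM does not support.

```java
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLParameters;

/** Hypothetical helper: which configured TLS protocols or cipher suites does this JVM lack? */
final class TlsSupportCheck {
    private TlsSupportCheck() {}

    /** Supported (not merely enabled) protocols and suites of the JVM's default SSLContext. */
    static SSLParameters jvmSupport() throws NoSuchAlgorithmException {
        return SSLContext.getDefault().getSupportedSSLParameters();
    }

    /** Entries of a comma-separated config value (e.g. ssl.cipher.suites) missing from supported. */
    static List<String> unsupported(String configuredCsv, String[] supported) {
        Set<String> available = new HashSet<>(Arrays.asList(supported));
        List<String> missing = new ArrayList<>();
        for (String entry : configuredCsv.split(",")) {
            String name = entry.trim();
            if (!name.isEmpty() && !available.contains(name)) {
                missing.add(name);
            }
        }
        return missing;
    }
}
```

For example, pass the ssl.cipher.suites value from the mTLS configuration together with jvmSupport().getCipherSuites(). The result lists any suite this JVM cannot negotiate; an empty list is the goal.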
ACL-Based Authorization
kafka-acls.sh Operations Reference
Kafka ACLs define which Principal (authenticated identity) can perform which Operation on which Resource. Understanding the resource and operation model is essential before writing any ACL rules.
Resource types: Topic, Group (Consumer Group), Cluster, TransactionalId, DelegationToken, User
Operation types: Read, Write, Create, Delete, Alter, Describe, ClusterAction, DescribeConfigs, AlterConfigs, IdempotentWrite, CreateTokens, DescribeTokens, All
# Grant a producer service write access to a specific topic
kafka-acls.sh \
--bootstrap-server kafka:9092 \
--command-config admin-client.properties \
--add \
--allow-principal User:order-service \
--operation Write \
--operation Describe \
--topic orders
# Grant a consumer group read access
# Note: BOTH a topic Read ACL AND a group Read ACL are required
kafka-acls.sh \
--bootstrap-server kafka:9092 \
--command-config admin-client.properties \
--add \
--allow-principal User:kafka-consumer \
--operation Read \
--operation Describe \
--topic orders
kafka-acls.sh \
--bootstrap-server kafka:9092 \
--command-config admin-client.properties \
--add \
--allow-principal User:kafka-consumer \
--operation Read \
--group payment-consumer-group
# Prefixed ACL (grants Read on every topic whose name starts with "orders-")
kafka-acls.sh \
--bootstrap-server kafka:9092 \
--command-config admin-client.properties \
--add \
--allow-principal User:kafka-consumer \
--operation Read \
--topic 'orders-' \
--resource-pattern-type prefixed
# Grant a transactional producer permission to use its transactional ID
# (required for exactly-once producers)
kafka-acls.sh \
--bootstrap-server kafka:9092 \
--command-config admin-client.properties \
--add \
--allow-principal User:order-service \
--operation Write \
--operation Describe \
--transactional-id order-producer-txn
# List ACLs for a specific topic
kafka-acls.sh \
--bootstrap-server kafka:9092 \
--command-config admin-client.properties \
--list \
--topic orders
# List all ACLs in the cluster
kafka-acls.sh \
--bootstrap-server kafka:9092 \
--command-config admin-client.properties \
--list
# Revoke a permission
kafka-acls.sh \
--bootstrap-server kafka:9092 \
--command-config admin-client.properties \
--remove \
--allow-principal User:decommissioned-service \
--operation Write \
--topic orders
Enabling ACLs Safely: Super Users First
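By default no authorizer is configured, so ACLs are not enforced at all and any client that can connect may do anything. Once you set authorizer.class.name, Kafka denies every request that no ACL allows (allow.everyone.if.no.acl.found defaults to false). Configure at least one super user before enabling the authorizer; otherwise you will lock yourself out of your own cluster.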
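# server.properties
# StandardAuthorizer is for KRaft clusters; ZooKeeper-based clusters use
# kafka.security.authorizer.AclAuthorizer instead
authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer
# Super users bypass all ACL checks: use them for administrative operations only,
# and include the principals that brokers and controllers authenticate as
super.users=User:kafka-admin;User:monitoring-service
# Deny access to resources that have no ACL at all (this is also the default).
# With 'true', any resource without an ACL is open to every principal, including
# unauthenticated ones; resources that do have ACLs are still enforced.
allow.everyone.if.no.acl.found=false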
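The order in which these settings are applied is what makes the super-user step mandatory. AclModel below is a simplified, hypothetical model of the decision for a single resource, applied in this order:

1. Super users are allowed immediately.
2. Any matching DENY rejects the request.
3. Any matching ALLOW accepts it.
4. Only when the resource has no ACLs at all does the allow.everyone.if.no.acl.found fallback apply.

The model omits hosts, resource types and most implied operations. It keeps only the rule that Read, Write, Delete or Alter also grant Describe.

```java
import java.util.List;
import java.util.Set;

/** Simplified, hypothetical model of Kafka's ACL decision for one topic-like resource. */
final class AclModel {
    private AclModel() {}

    enum Permission { ALLOW, DENY }

    enum PatternType { LITERAL, PREFIXED }

    static final class Acl {
        final String principal;     // "User:name" or the wildcard "User:*"
        final String operation;     // "Read", "Write", "Describe", ..., or "All"
        final String resourceName;  // literal name, prefix, or "*" (LITERAL only)
        final PatternType patternType;
        final Permission permission;

        Acl(String principal, String operation, String resourceName,
            PatternType patternType, Permission permission) {
            this.principal = principal;
            this.operation = operation;
            this.resourceName = resourceName;
            this.patternType = patternType;
            this.permission = permission;
        }

        boolean matchesResource(String name) {
            if (patternType == PatternType.PREFIXED) return name.startsWith(resourceName);
            return resourceName.equals("*") || resourceName.equals(name);
        }

        boolean matchesPrincipal(String p) {
            return principal.equals("User:*") || principal.equals(p);
        }
    }

    // An ALLOW on any of these operations also grants Describe
    private static final Set<String> IMPLIES_DESCRIBE = Set.of("Read", "Write", "Delete", "Alter");

    static boolean authorize(String principal, String operation, String resource, List<Acl> acls,
                             Set<String> superUsers, boolean allowEveryoneIfNoAclFound) {
        if (superUsers.contains(principal)) return true;      // 1. super users bypass ACLs
        boolean resourceHasAcls = false;
        boolean allowed = false;
        for (Acl acl : acls) {
            if (!acl.matchesResource(resource)) continue;
            resourceHasAcls = true;                             // any principal's ACL counts here
            if (!acl.matchesPrincipal(principal)) continue;
            boolean sameOp = acl.operation.equals(operation) || acl.operation.equals("All");
            if (acl.permission == Permission.DENY) {
                if (sameOp) return false;                       // 2. a matching DENY always wins
            } else if (sameOp || (operation.equals("Describe") && IMPLIES_DESCRIBE.contains(acl.operation))) {
                allowed = true;                                 // 3. otherwise one ALLOW is enough
            }
        }
        // 4. the fallback only applies when the resource has no ACLs at all
        return resourceHasAcls ? allowed : allowEveryoneIfNoAclFound;
    }
}
```

A consumer has to pass this check twice, once for the topic and once for its consumer group. That is why the earlier examples create both ACLs.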
A staged rollout approach:
1. Deploy SCRAM credentials for all services
2. Enable the authorizer with allow.everyone.if.no.acl.found=true, so resources without ACLs stay open (any resource that receives an ACL is enforced from that moment, so grant every principal it needs in one pass)
3. Write all required ACLs
4. Verify services can still operate via test deploys
5. Switch to allow.everyone.if.no.acl.found=false
6. Monitor the authorization logger for unexpected denials
Multi-Listener Architecture: Network Segmentation
Separate connection types into distinct listeners with different security policies:
# server.properties โ production multi-listener configuration
listeners=\
INTERNAL://0.0.0.0:9092,\
EXTERNAL://0.0.0.0:9093,\
REPLICATION://0.0.0.0:9094,\
ADMIN://0.0.0.0:9095
listener.security.protocol.map=\
INTERNAL:SASL_SSL,\
EXTERNAL:SASL_SSL,\
REPLICATION:SSL,\
ADMIN:SASL_SSL
# Internal microservices use SCRAM-SHA-256
listener.name.internal.sasl.enabled.mechanisms=SCRAM-SHA-256
# External clients (mobile apps, third-party integrations) use OAUTHBEARER
listener.name.external.sasl.enabled.mechanisms=OAUTHBEARER
# Broker-to-broker replication: mTLS only (no SASL overhead)
listener.name.replication.ssl.client.auth=required
inter.broker.listener.name=REPLICATION
# Administrative CLI tools use the stronger SCRAM-SHA-512
listener.name.admin.sasl.enabled.mechanisms=SCRAM-SHA-512
# Advertised addresses for clients to connect to
advertised.listeners=\
INTERNAL://kafka-broker-1.internal:9092,\
EXTERNAL://kafka.company.com:9093,\
REPLICATION://kafka-broker-1.internal:9094,\
ADMIN://kafka-broker-1.internal:9095
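Two rules in this layout are easy to get wrong:

- Every name in listeners needs an entry in listener.security.protocol.map.
- The replication listener has to be named with inter.broker.listener.name. Setting only security.inter.broker.protocol=SSL makes the broker look for a listener literally named SSL, and Kafka validates the inter-broker listener against advertised.listeners at startup.

ListenerConfigCheck below is a hypothetical, JDK-only pre-deployment check for both rules. It expects the already-joined property values (without the backslash line continuations) and is not Kafka's own validation code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

/** Hypothetical pre-deployment check for multi-listener broker settings. */
final class ListenerConfigCheck {
    private ListenerConfigCheck() {}

    /** "NAME://host:port,..." to listener names (Kafka treats names case-insensitively). */
    static List<String> listenerNames(String listeners) {
        List<String> names = new ArrayList<>();
        for (String entry : listeners.split(",")) {
            String trimmed = entry.trim();
            int sep = trimmed.indexOf("://");
            if (sep <= 0) throw new IllegalArgumentException("bad listener: " + trimmed);
            names.add(trimmed.substring(0, sep).toUpperCase(Locale.ROOT));
        }
        return names;
    }

    /** "NAME:PROTOCOL,..." to an ordered map of listener name to security protocol. */
    static Map<String, String> protocolMap(String value) {
        Map<String, String> map = new LinkedHashMap<>();
        for (String entry : value.split(",")) {
            String[] parts = entry.trim().split(":", 2);
            if (parts.length != 2) throw new IllegalArgumentException("bad map entry: " + entry);
            map.put(parts[0].trim().toUpperCase(Locale.ROOT), parts[1].trim());
        }
        return map;
    }

    /** Returns human-readable problems; an empty list means both rules hold. */
    static List<String> problems(String listeners, String protocolMapValue, String interBrokerListenerName) {
        List<String> names = listenerNames(listeners);
        Map<String, String> map = protocolMap(protocolMapValue);
        List<String> problems = new ArrayList<>();
        for (String name : names) {
            if (!map.containsKey(name)) {
                problems.add("listener " + name + " has no entry in listener.security.protocol.map");
            }
        }
        String interBroker = interBrokerListenerName.toUpperCase(Locale.ROOT);
        if (!names.contains(interBroker)) {
            problems.add("inter-broker listener " + interBroker + " is not defined in listeners");
        }
        return problems;
    }
}
```

For the configuration above, problems(listeners, map, "REPLICATION") returns an empty list. Passing "SSL" as the inter-broker listener name reports it as undefined.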
Corresponding firewall rules:
- Port 9092 (INTERNAL): Allow only the internal VPC CIDR
- Port 9093 (EXTERNAL): Allow specific external IP ranges
- Port 9094 (REPLICATION): Allow only broker node IPs
- Port 9095 (ADMIN): Allow only operations bastion host IPs
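Each of these rules boils down to a membership test: is the source address inside one of the CIDR blocks allowed for that port? The JDK-only sketch below shows that test. ListenerFirewallSketch is hypothetical and expects IP literals, because InetAddress.getByName skips DNS only for literals. It illustrates the matching logic and does not replace the firewall.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of port-to-CIDR allow rules; expects IP literals, not hostnames. */
final class ListenerFirewallSketch {
    private ListenerFirewallSketch() {}

    /** True if address falls inside cidr (e.g. "10.0.0.0/16"); IPv4 and IPv6 never match each other. */
    static boolean inCidr(String address, String cidr) throws UnknownHostException {
        String[] parts = cidr.split("/");
        byte[] network = InetAddress.getByName(parts[0]).getAddress();
        byte[] candidate = InetAddress.getByName(address).getAddress();
        if (network.length != candidate.length) return false;
        int prefix = Integer.parseInt(parts[1]);
        for (int i = 0; i < network.length; i++) {
            int bits = Math.max(0, Math.min(8, prefix - 8 * i));   // prefix bits inside this byte
            int mask = bits == 0 ? 0 : (0xFF << (8 - bits)) & 0xFF;
            if ((network[i] & mask) != (candidate[i] & mask)) return false;
        }
        return true;
    }

    /** policy maps a listener port to the source ranges allowed to reach it. */
    static boolean allowed(Map<Integer, List<String>> policy, int port, String source) throws UnknownHostException {
        for (String cidr : policy.getOrDefault(port, List.of())) {
            if (inCidr(source, cidr)) return true;
        }
        return false;   // unlisted ports and addresses are denied
    }
}
```

For example, take a policy mapping 9092 to a placeholder VPC range 10.0.0.0/16 and 9095 to a single bastion address 10.0.200.5/32. Then allowed(policy, 9092, "10.0.3.4") is true, while the same address is rejected on 9095.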
Audit Logging: Security Event Trail
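Audit logging is a baseline compliance requirement: every authentication event and every authorization decision should be recorded. Kafka's authorizer logger covers authorization decisions. Failed authentication attempts are logged by the broker itself, so the main server log needs to be collected as well.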
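# log4j.properties (log4j 1.x syntax, as used by Kafka 3.x): dedicated audit appender
# Authorization audit: denials are logged at INFO but allowed decisions only at DEBUG,
# so DEBUG is needed to record every allow/deny decision (expect high volume)
log4j.logger.kafka.authorizer.logger=DEBUG, authorizerAppender
log4j.additivity.kafka.authorizer.logger=false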
log4j.appender.authorizerAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.authorizerAppender.File=/var/log/kafka/kafka-authorizer.log
log4j.appender.authorizerAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.authorizerAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.authorizerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
Sample audit log entries:
# Authorization denied
[2024-01-15 10:35:01,200] INFO Principal = User:anonymous is Denied
Operation = Write from host = 10.0.0.100 on resource = Topic:LITERAL:orders
for request = Produce with resourceRefCount = 1 (kafka.authorizer.logger)
# Authorization allowed
[2024-01-15 10:35:05,300] DEBUG Principal = User:order-service is Allowed
Operation = Write from host = 10.0.0.50 on resource = Topic:LITERAL:orders
for request = Produce with resourceRefCount = 1 (kafka.authorizer.logger)
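An ingest pipeline, such as the kafka-audit-enrichment pipeline referenced below, would typically start by splitting each authorizer entry into fields. Here is a JDK-only sketch of that step using a regular expression. AuthorizerLogParser and its field names are hypothetical. The sketch assumes each entry sits on a single line in the layout shown above; the samples are wrapped here only for readability.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical parser for single-line kafka.authorizer.logger entries. */
final class AuthorizerLogParser {
    private AuthorizerLogParser() {}

    private static final Pattern LINE = Pattern.compile(
            "^\\[(?<timestamp>[^\\]]+)\\] (?<level>\\w+) Principal = (?<principal>\\S+) "
            + "is (?<decision>Allowed|Denied) Operation = (?<operation>\\S+) from host = (?<host>\\S+) "
            + "on resource = (?<resource>\\S+) for request = (?<request>\\S+)");

    private static final String[] FIELDS =
            {"timestamp", "level", "principal", "decision", "operation", "host", "resource", "request"};

    /** Returns the extracted fields, or empty if the line is not an authorizer entry. */
    static Optional<Map<String, String>> parse(String line) {
        Matcher m = LINE.matcher(line);
        if (!m.find()) return Optional.empty();
        Map<String, String> fields = new LinkedHashMap<>();
        for (String name : FIELDS) {
            fields.put(name, m.group(name));
        }
        return Optional.of(fields);
    }
}
```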
Ship audit logs to a SIEM system using Filebeat:
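# filebeat.yml
filebeat.inputs:
  - type: log
    paths:
      - /var/log/kafka/kafka-authorizer.log
    fields:
      service: kafka
      event_type: authorization
      environment: production
    fields_under_root: true
    multiline.pattern: '^\['
    multiline.negate: true
    multiline.match: after

output.elasticsearch:
  hosts: ["elasticsearch:9200"]
  index: "kafka-audit-%{+yyyy.MM.dd}"
  pipeline: "kafka-audit-enrichment"

# A custom index name also requires a matching template name and pattern;
# on 7.x, ILM must be disabled or it overrides the index setting
setup.template.name: "kafka-audit"
setup.template.pattern: "kafka-audit-*"
setup.ilm.enabled: false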
Certificate Rotation Automation
Manual SSL certificate management is one of the most common causes of production outages (see Chapter 32, incident 8). Certificate expiry that brings down a production cluster is entirely preventable with automation.
On Kubernetes: cert-manager
apiVersion: cert-manager.io/v1
kind: Certificate
metadata:
  name: kafka-broker-cert
  namespace: kafka
spec:
  secretName: kafka-broker-tls
  duration: 8760h # 365 days
  renewBefore: 720h # Begin renewal 30 days before expiry
  commonName: kafka-broker.kafka.svc.cluster.local
  dnsNames:
    - kafka-broker
    - kafka-broker.kafka
    - kafka-broker.kafka.svc
    - kafka-broker.kafka.svc.cluster.local
  issuerRef:
    name: internal-ca-issuer
    kind: ClusterIssuer
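cert-manager starts renewal once the time left until expiry drops to renewBefore. If renewBefore is omitted, cert-manager's documented default is one third of the certificate's lifetime. A minimal java.time sketch of that arithmetic (RenewalSchedule is a hypothetical name) helps line the schedule up with the expiry alerts.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper reproducing cert-manager's renewal-time arithmetic. */
final class RenewalSchedule {
    private RenewalSchedule() {}

    /** When renewal should begin; a null renewBefore means "one third of the lifetime". */
    static Instant renewalStart(Instant notBefore, Instant notAfter, Duration renewBefore) {
        Duration lifetime = Duration.between(notBefore, notAfter);
        Duration before = (renewBefore != null) ? renewBefore : lifetime.dividedBy(3);
        return notAfter.minus(before);
    }
}
```

With duration 8760h and renewBefore 720h, renewalStart returns a point 8040 hours (335 days) after issuance; without renewBefore it would be 5840 hours. The 30-day warning rule further down becomes true at that same 335-day point. With its for: 1h clause, the warning effectively fires only if the monitored certificate has not been replaced within an hour of renewal starting.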
On bare metal / VMs: HashiCorp Vault PKI
# Configure Vault PKI secrets engine
vault secrets enable pki
vault secrets tune -max-lease-ttl=87600h pki
# Generate an internal root CA (or import your organization's CA instead)
vault write -field=certificate pki/root/generate/internal \
common_name="Kafka-Internal-CA" \
ttl=87600h > ca-cert.pem
# Create a Kafka broker role
vault write pki/roles/kafka-broker \
allowed_domains="internal" \
allow_subdomains=true \
max_ttl="8760h" \
key_type="rsa" \
key_bits=2048
# Issue a certificate (call this in broker startup script)
# umask 077 keeps the private key unreadable by other local users
umask 077
vault write -format=json pki/issue/kafka-broker \
common_name="kafka-broker-1.internal" \
ttl="8760h" \
> /tmp/broker-cert.json
# Extract cert and key from the Vault response, then remove the temporary file
jq -r '.data.certificate' /tmp/broker-cert.json > /etc/kafka/ssl/broker.pem
jq -r '.data.private_key' /tmp/broker-cert.json > /etc/kafka/ssl/broker-key.pem
rm -f /tmp/broker-cert.json
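Prometheus alerts for certificate expiry. The metric name depends on your exporter: ssl_certificate_expiry_seconds is assumed here to be the certificate's expiry time as a Unix timestamp (blackbox_exporter, for example, exposes probe_ssl_earliest_cert_expiry):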
- alert: KafkaBrokerCertExpiringIn30Days
  expr: |
    (ssl_certificate_expiry_seconds{job="kafka-brokers"} - time()) / 86400 < 30
  for: 1h
  labels:
    severity: warning
  annotations:
    summary: "Kafka broker SSL certificate expiring on {{ $labels.instance }}"
    description: |
      Certificate expires in {{ printf "%.0f" $value }} days.
      Initiate certificate rotation immediately to avoid a production incident.
- alert: KafkaBrokerCertExpiringIn7Days
  expr: |
    (ssl_certificate_expiry_seconds{job="kafka-brokers"} - time()) / 86400 < 7
  for: 1h
  labels:
    severity: critical
    page: "true"
  annotations:
    summary: "CRITICAL: Kafka broker SSL certificate expiring in less than 7 days"
Production Security Checklist (25 Items)
| # | Check | Priority | How to Verify |
|---|---|---|---|
| 1 | All client listeners use SASL_SSL (replication may use SSL with mTLS); PLAINTEXT disabled | P0 | kafka-broker-api-versions.sh --bootstrap-server host:9092 without security settings must fail |
| 2 | ssl.client.auth=required (mTLS enforced) | P0 | Check server.properties |
| 3 | TLS 1.2 and 1.3 only; TLS 1.0/1.1 disabled | P0 | openssl s_client -connect host:9092 -tls1_1 must fail the handshake |
| 4 | ACLs enabled; allow.everyone.if.no.acl.found=false | P0 | Check server.properties |
| 5 | Super users configured; known only to administrators | P0 | Check server.properties; audit access |
| 6 | Audit log enabled and shipped to SIEM | P0 | Check log4j.properties and Filebeat |
| 7 | Certificate expiry monitoring and auto-renewal active | P0 | Check Prometheus alerts |
| 8 | Broker-to-broker replication uses dedicated listener | P1 | Check listeners config |
| 9 | ZooKeeper/KRaft authentication enabled | P1 | Check zoo.cfg / KRaft config |
| 10 | Producers/consumers follow least-privilege ACL model | P1 | kafka-acls.sh --list |
| 11 | Transactional producers have TransactionalId ACLs | P1 | Check ACLs |
| 12 | Different CAs for dev, staging, and production | P1 | Verify certificate issuer |
| 13 | Keystore/key files have 600 permissions | P1 | ls -la /etc/kafka/ssl/ |
| 14 | Keystore passwords stored in a secrets manager | P1 | Check secrets source in startup scripts |
| 15 | JMX port is not exposed or is TLS-protected | P1 | netstat -tlnp \| grep 9999 |
| 16 | Clients enable hostname verification | P2 | Check ssl.endpoint.identification.algorithm |
| 17 | Using SCRAM-SHA-512 over SCRAM-SHA-256 where possible | P2 | Check sasl.mechanism config |
| 18 | Delegation token max lifetime is reasonable (≤ 24h) | P2 | Check delegation.token.max.lifetime.ms (default 7 days) |
| 19 | Cluster Describe ACL restricted to authorized admin tools | P2 | Check Cluster ACLs |
| 20 | Log files do not contain plaintext credentials | P2 | grep -rn 'password' /var/log/kafka/ |
| 21 | Network firewall allows only required ports | P2 | nmap -p 9092-9095 kafka-broker |
| 22 | Kafka Connect workers use a dedicated service account | P2 | Check Connect worker config |
| 23 | Schema Registry authentication enabled | P2 | Check Schema Registry config |
| 24 | ACL review scheduled quarterly (revoke unused permissions) | P3 | Calendar + kafka-acls.sh --list |
| 25 | Annual penetration test and security audit | P3 | External security assessment schedule |
Summary: Defense in Depth
There is no single silver bullet in Kafka security. Production-grade security requires defense in depth, that is, multiple independent security layers, each of which limits the blast radius when another layer is compromised or misconfigured:
- Network layer: Firewall restricts access to only required ports; internal and external listeners are separated
- Transport layer: TLS 1.3 encryption for all connections; mTLS ensures both sides present valid certificates
- Authentication layer: SCRAM or OAUTHBEARER, never PLAIN without TLS; credentials managed in a secrets system
- Authorization layer: Least-privilege ACLs, so each service has exactly the permissions it needs and nothing more
- Audit layer: Every authentication and authorization event recorded, centrally stored, tamper-evident
When one layer fails, the others limit the damage. This is the architecture that lets you sleep through the night.