Chapter 30

Security: Authentication, Authorization, Encryption and Audit

The Four Dimensions of Kafka Security

Kafka's security model consists of four independent but interconnected dimensions:

  1. Authentication: Verifying the identity of connecting parties — "Who are you?"
  2. Authorization: Controlling what authenticated identities can do — "What are you allowed to do?"
  3. Encryption: Protecting data in transit — "Ensure data cannot be intercepted or tampered with"
  4. Audit: Recording what happened — "Who did what, and when?"

All four dimensions are required. Authentication without authorization means any valid identity can read and write any topic. Encryption without authentication means an attacker can connect with any claimed identity. Access control without audit means security incidents cannot be investigated after the fact.

This chapter covers each dimension in order of deployment complexity, giving the technical rationale and exact configuration for each.

SASL Authentication Mechanisms

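SASL (Simple Authentication and Security Layer) is the authentication framework Kafka uses. Kafka supports five SASL mechanisms (PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI, and OAUTHBEARER), ranging from trivially simple to enterprise-grade complex. Delegation tokens, covered last in this section, build on SCRAM.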

PLAIN: Simple but Risky Without TLS

PLAIN transmits the username and password as cleartext, separated by NUL bytes (RFC 4616). It provides no cryptographic protection of the password itself.

# server.properties
listeners=SASL_PLAINTEXT://0.0.0.0:9093
sasl.enabled.mechanisms=PLAIN
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN

# kafka_server_jaas.conf
KafkaServer {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="admin"
  password="admin-secret"
  user_admin="admin-secret"
  user_producer="producer-secret"
  user_consumer="consumer-secret";
};

Critical risk: Passwords are transmitted in cleartext over the wire. PLAIN must be combined with TLS (use SASL_SSL listeners, not the SASL_PLAINTEXT listener shown above), or an eavesdropper on the network can capture credentials directly. Additionally, with the default PlainLoginModule all credentials live in the broker's JAAS configuration file, so adding or removing a user requires a broker restart unless you plug in a custom server callback handler.

PLAIN is only acceptable as a temporary measure during initial setup or in isolated development environments with no sensitive data.
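To make the cleartext risk concrete, the following minimal sketch builds the SASL PLAIN client message as RFC 4616 defines it (authorization identity, NUL, username, NUL, password) and then recovers the credentials the way a network eavesdropper could. The function names are hypothetical and are not part of any Kafka client.

```python
def encode_plain(username: str, password: str, authzid: str = "") -> bytes:
    """Build the SASL PLAIN client message: authzid NUL authcid NUL passwd."""
    return b"\x00".join(s.encode("utf-8") for s in (authzid, username, password))


def sniff_plain(message: bytes) -> tuple:
    """What an eavesdropper does: split on NUL and read the credentials."""
    _authzid, username, password = message.split(b"\x00")
    return username.decode("utf-8"), password.decode("utf-8")


# Credentials taken from the JAAS example above
wire_bytes = encode_plain("producer", "producer-secret")
print(wire_bytes)               # the password is readable in the raw bytes
print(sniff_plain(wire_bytes))
```

Without TLS on the listener, these are the bytes that cross the network.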

SCRAM-SHA-256 / SCRAM-SHA-512: The Production Default

SCRAM (Salted Challenge Response Authentication Mechanism) is a substantial security improvement over PLAIN:

# Create a user (writes credentials to ZooKeeper/KRaft, no broker restart needed)
kafka-configs.sh \
  --bootstrap-server kafka:9092 \
  --command-config admin-client.properties \
  --alter \
  --add-config 'SCRAM-SHA-256=[iterations=8192,password=producer-secret]' \
  --entity-type users \
  --entity-name kafka-producer

kafka-configs.sh \
  --bootstrap-server kafka:9092 \
  --command-config admin-client.properties \
  --alter \
  --add-config 'SCRAM-SHA-512=[iterations=8192,password=consumer-secret]' \
  --entity-type users \
  --entity-name kafka-consumer

# List all users
kafka-configs.sh \
  --bootstrap-server kafka:9092 \
  --command-config admin-client.properties \
  --describe \
  --entity-type users

# Delete a user's SCRAM-SHA-256 credential (no restart needed; connections that are already authenticated stay open until they reconnect)
kafka-configs.sh \
  --bootstrap-server kafka:9092 \
  --command-config admin-client.properties \
  --alter \
  --delete-config 'SCRAM-SHA-256' \
  --entity-type users \
  --entity-name decommissioned-service
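Part of what makes SCRAM safer than PLAIN is that the server never stores the password. As a rough illustration, the sketch below derives the values that RFC 5802 says a server keeps (salt, iteration count, StoredKey, ServerKey) using only the standard library. It skips the SASLprep normalization step, and the function name is hypothetical; Kafka's own storage format is not reproduced here.

```python
import hashlib
import hmac
import os


def scram_credentials(password: str, salt: bytes, iterations: int) -> dict:
    """Derive the SCRAM-SHA-256 values a server stores (RFC 5802, section 3)."""
    # SaltedPassword = Hi(password, salt, i), which is PBKDF2 with HMAC-SHA-256
    salted = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, iterations)
    client_key = hmac.new(salted, b"Client Key", hashlib.sha256).digest()
    stored_key = hashlib.sha256(client_key).digest()   # verifies client proofs
    server_key = hmac.new(salted, b"Server Key", hashlib.sha256).digest()
    return {"salt": salt, "iterations": iterations,
            "stored_key": stored_key, "server_key": server_key}


# Same iteration count as the kafka-configs.sh example above
creds = scram_credentials("producer-secret", os.urandom(16), 8192)
print(creds["stored_key"].hex())
```

A higher iteration count makes each offline guess against a leaked StoredKey more expensive, which is the reason the `iterations` value is configurable.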

Broker configuration:

# server.properties
listeners=SASL_SSL://0.0.0.0:9092
sasl.enabled.mechanisms=SCRAM-SHA-256,SCRAM-SHA-512
security.inter.broker.protocol=SASL_SSL
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
ssl.keystore.location=/etc/kafka/ssl/kafka.server.keystore.jks
ssl.keystore.password=keystore-password
ssl.truststore.location=/etc/kafka/ssl/kafka.server.truststore.jks
ssl.truststore.password=truststore-password

Client JAAS configuration:

props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "SCRAM-SHA-256");
props.put("sasl.jaas.config",
    "org.apache.kafka.common.security.scram.ScramLoginModule required " +
    "username=\"kafka-producer\" " +
    "password=\"producer-secret\";");

GSSAPI/Kerberos: Enterprise SSO Integration

Kerberos is the established centralized authentication system in enterprise environments (Active Directory, MIT Kerberos). If your organization already operates a Kerberos infrastructure, Kafka integrates natively through the GSSAPI SASL mechanism.

# server.properties
sasl.enabled.mechanisms=GSSAPI
sasl.kerberos.service.name=kafka

# kafka_server_jaas.conf
KafkaServer {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/etc/security/keytabs/kafka.service.keytab"
  principal="kafka/[email protected]";
};

Kerberos operational challenges:

Kerberos is the right choice when your organization mandates SSO through Active Directory or an existing Kerberos realm. For greenfield deployments, SCRAM or OAUTHBEARER is typically easier to operate.

OAUTHBEARER: Cloud-Native JWT Authentication (Kafka 2.0+)

OAUTHBEARER allows Kafka clients to authenticate using JWT (JSON Web Token) access tokens, integrating with standard OAuth 2.0 / OIDC providers such as Keycloak, Okta, and Auth0.

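# server.properties (built-in JWT validation, Kafka 3.1+)
sasl.enabled.mechanisms=OAUTHBEARER
# Validator class name as shipped in Kafka 3.1 to 3.x; verify the package for your version
listener.name.<listener>.oauthbearer.sasl.server.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerValidatorCallbackHandler
sasl.oauthbearer.jwks.endpoint.url=https://auth.company.com/.well-known/jwks.json
sasl.oauthbearer.expected.audience=kafka-cluster
sasl.oauthbearer.sub.claim.name=client_id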
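The broker settings above map onto a handful of claim checks. The sketch below shows those checks in isolation, under loud assumptions: it does NOT verify the token signature against the JWKS endpoint (a real validator must), the helper names are hypothetical, and the demo token is unsigned and built only for illustration.

```python
import base64
import json


def decode_claims(token: str) -> dict:
    """Decode the payload (second segment) of a JWT WITHOUT verifying its signature."""
    payload = token.split(".")[1]
    payload += "=" * (-len(payload) % 4)   # restore stripped base64url padding
    return json.loads(base64.urlsafe_b64decode(payload))


def check_claims(claims: dict, expected_audience: str, principal_claim: str, now: float) -> str:
    """Return the principal name if the audience and expiry checks pass, else raise."""
    aud = claims.get("aud")
    audiences = aud if isinstance(aud, list) else [aud]
    if expected_audience not in audiences:
        raise ValueError("unexpected audience")
    if claims.get("exp", 0) <= now:
        raise ValueError("token expired")
    return claims[principal_claim]


def _b64(obj: dict) -> str:
    return base64.urlsafe_b64encode(json.dumps(obj).encode()).rstrip(b"=").decode()


# Hypothetical unsigned token, for demonstration only
demo_token = ".".join([
    _b64({"alg": "none"}),
    _b64({"aud": "kafka-cluster", "exp": 2_000_000_000,
          "client_id": "kafka-producer-service"}),
    "",
])
print(check_claims(decode_claims(demo_token), "kafka-cluster", "client_id",
                   now=1_700_000_000))
```

The value returned from the `client_id` claim is what ends up as the Kafka principal name, so ACLs must be written against it.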

Client configuration:

props.put("security.protocol", "SASL_SSL");
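props.put("sasl.mechanism", "OAUTHBEARER");
// Needed for the clientId/clientSecret flow; class name as shipped in
// Kafka 3.1 to 3.x, so verify the package for your client version
props.put("sasl.login.callback.handler.class",
    "org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler");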
props.put("sasl.oauthbearer.token.endpoint.url",
    "https://auth.company.com/oauth/token");
props.put("sasl.jaas.config",
    "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required " +
    "clientId='kafka-producer-service' " +
    "clientSecret='client-secret' " +
    "scope='kafka:write';");

OAUTHBEARER advantages:

Delegation Token: Lightweight Short-Term Credentials

When hundreds of microservices need to authenticate, managing individual SCRAM passwords or Kerberos keytabs for each becomes operationally burdensome. Delegation Tokens solve this by allowing a service to authenticate once with its master credential, then obtain short-lived tokens that can be distributed to workers or sub-processes.

# Generate a delegation token (authenticated with master credential)
kafka-delegation-tokens.sh \
  --bootstrap-server kafka:9092 \
  --command-config admin-client.properties \
  --create \
  --max-life-time-period 86400000    # 24 hours in milliseconds

# Renew an expiring token (extend by 1 hour)
# Note: a comment after a trailing backslash would break the line continuation
kafka-delegation-tokens.sh \
  --bootstrap-server kafka:9092 \
  --command-config admin-client.properties \
  --renew \
  --renew-time-period 3600000 \
  --hmac <token-hmac>

# List tokens (users can only see their own tokens unless they are super users)
kafka-delegation-tokens.sh \
  --bootstrap-server kafka:9092 \
  --command-config admin-client.properties \
  --describe

# Expire a token immediately (revocation)
kafka-delegation-tokens.sh \
  --bootstrap-server kafka:9092 \
  --command-config admin-client.properties \
  --expire \
  --expiry-time-period -1 \
  --hmac <token-hmac>
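The interaction between the maximum lifetime and renewals is easy to get wrong, so here is a small model of the arithmetic. It assumes that a renewal sets the new expiry to the earlier of "now plus the renew period" and the token's fixed maximum timestamp, which is how the commands above are expected to behave; the class and function names are hypothetical.

```python
from dataclasses import dataclass

HOUR_MS = 60 * 60 * 1000


@dataclass
class Token:
    issue_ms: int
    max_ms: int      # hard upper bound: issue time + max lifetime
    expiry_ms: int   # current expiry, pushed forward by renewals


def create(now_ms: int, max_lifetime_ms: int, initial_expiry_ms: int) -> Token:
    max_ms = now_ms + max_lifetime_ms
    return Token(now_ms, max_ms, min(now_ms + initial_expiry_ms, max_ms))


def renew(token: Token, now_ms: int, renew_period_ms: int) -> int:
    """Extend the expiry, never past max_ms; an expired token cannot be renewed."""
    if now_ms >= token.expiry_ms:
        raise ValueError("token already expired")
    token.expiry_ms = min(now_ms + renew_period_ms, token.max_ms)
    return token.expiry_ms


token = create(now_ms=0, max_lifetime_ms=24 * HOUR_MS, initial_expiry_ms=2 * HOUR_MS)
print(renew(token, now_ms=1 * HOUR_MS, renew_period_ms=HOUR_MS) // HOUR_MS)       # hours
print(renew(token, now_ms=1 * HOUR_MS, renew_period_ms=30 * HOUR_MS) // HOUR_MS)  # capped
```

The practical consequence is that workers holding a token need a path back to the master credential once the maximum lifetime is reached, because renewal alone cannot extend past it.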

SSL/TLS: Transport Encryption

Certificate Infrastructure

Each Kafka broker requires its own certificate. All brokers and clients share a common trust anchor (CA certificate). The following is the complete certificate generation workflow for a self-hosted CA:

# Step 1: Generate CA key and self-signed certificate
# (Replace with your organization's internal CA in production)
openssl req -new -x509 \
  -keyout ca-key.pem \
  -out ca-cert.pem \
  -days 3650 \
  -subj "/CN=Kafka-Internal-CA/O=Company/C=US" \
  -passout pass:ca-password

# Step 2: Create a keystore for each broker
keytool -keystore kafka-broker-1.keystore.jks \
  -alias kafka-broker-1 \
  -keyalg RSA \
  -keysize 2048 \
  -validity 365 \
  -genkey \
  -dname "CN=kafka-broker-1.internal,O=Company,C=US" \
  -storepass keystore-password \
  -keypass key-password

# Step 3: Generate CSR and sign with CA
keytool -keystore kafka-broker-1.keystore.jks \
  -alias kafka-broker-1 \
  -certreq \
  -file broker-1.csr \
  -storepass keystore-password

openssl x509 -req \
  -CA ca-cert.pem \
  -CAkey ca-key.pem \
  -in broker-1.csr \
  -out broker-1-signed.pem \
  -days 365 \
  -CAcreateserial \
  -passin pass:ca-password

# Step 4: Import CA cert and signed cert into keystore
keytool -keystore kafka-broker-1.keystore.jks \
  -alias CARoot \
  -import -file ca-cert.pem \
  -storepass keystore-password -noprompt

keytool -keystore kafka-broker-1.keystore.jks \
  -alias kafka-broker-1 \
  -import -file broker-1-signed.pem \
  -storepass keystore-password

# Step 5: Create a shared truststore with the CA certificate
keytool -keystore kafka.truststore.jks \
  -alias CARoot \
  -import -file ca-cert.pem \
  -storepass truststore-password -noprompt

Mutual TLS (mTLS) Configuration

mTLS requires clients to present their own certificates. The client validates the broker's certificate (standard TLS), and the broker validates the client's certificate (mutual TLS). This is the strongest transport-layer security configuration.

# server.properties — full mTLS configuration
ssl.keystore.location=/etc/kafka/ssl/kafka-broker-1.keystore.jks
ssl.keystore.password=keystore-password
ssl.key.password=key-password
ssl.truststore.location=/etc/kafka/ssl/kafka.truststore.jks
ssl.truststore.password=truststore-password

# REQUIRED = enforce client certificate validation (enables mTLS)
# REQUESTED = ask for client cert but allow connection without it
# NONE = standard one-way TLS only
ssl.client.auth=required

# Hostname verification — prevents man-in-the-middle attacks
# Must be set to "HTTPS" in production (not empty string)
ssl.endpoint.identification.algorithm=HTTPS

# Restrict TLS versions — disable TLS 1.0 and TLS 1.1
ssl.enabled.protocols=TLSv1.2,TLSv1.3
ssl.protocol=TLSv1.3

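# Strong cipher suites. The first three apply to TLS 1.3 only; the ECDHE-GCM
# suites keep TLS 1.2 clients working while TLSv1.2 stays in ssl.enabled.protocols
ssl.cipher.suites=TLS_AES_256_GCM_SHA384,TLS_CHACHA20_POLY1305_SHA256,TLS_AES_128_GCM_SHA256,TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256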
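For readers more familiar with non-JVM tooling, the client side of the same policy can be sketched with Python's standard `ssl` module. This only illustrates what the broker settings demand from a client (trusted CA, TLS 1.2 or newer, hostname verification, and a client certificate for mTLS); the function name and file arguments are assumptions, and this is not how the Java client is configured.

```python
import ssl


def build_client_context(cafile=None, certfile=None, keyfile=None) -> ssl.SSLContext:
    """Build a TLS client context matching the broker's mTLS policy."""
    # cafile=None falls back to the system trust store; pass ca-cert.pem in practice
    ctx = ssl.create_default_context(ssl.Purpose.SERVER_AUTH, cafile=cafile)
    ctx.minimum_version = ssl.TLSVersion.TLSv1_2   # counterpart of ssl.enabled.protocols
    ctx.verify_mode = ssl.CERT_REQUIRED            # always validate the broker certificate
    ctx.check_hostname = True                      # counterpart of endpoint identification
    if certfile:
        # Client certificate and key, required when the broker sets ssl.client.auth=required
        ctx.load_cert_chain(certfile=certfile, keyfile=keyfile)
    return ctx


ctx = build_client_context()
print(ctx.minimum_version, ctx.check_hostname)
```

Calling `build_client_context("ca-cert.pem", "client-cert.pem", "client-key.pem")` would load the files produced by a certificate workflow like the one above.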

TLS Performance Impact and Optimization

TLS encryption introduces measurable overhead:

Optimization strategies:

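1. Use TLS 1.3: Compared to TLS 1.2, TLS 1.3 reduces the full handshake from two round trips to one and removes legacy cipher suites (CBC-mode ciphers, RC4, static RSA key exchange). The protocol also defines 0-RTT resumption, but the JDK's TLS implementation does not support 0-RTT data, so JVM-based brokers and clients do not benefit from it.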

2. Verify AES-NI hardware acceleration:

# Check for AES-NI support
grep -o 'aes' /proc/cpuinfo | head -1
# Output "aes" confirms hardware AES acceleration is available

# Verify Java is using AES-NI (look for AES hardware provider in JVM)
java -XX:+PrintFlagsFinal -version 2>&1 | grep UseAESIntrinsics

3. Minimize connection churn: Each new TLS connection requires a full handshake. Kafka clients maintain persistent connections by default — ensure application code does not create and destroy Producer/Consumer instances on each request.

# Verify TLS version and cipher being negotiated
# (</dev/null closes stdin so s_client exits after the handshake)
openssl s_client \
  -connect kafka-broker-1:9092 \
  -tls1_3 \
  -cert client-cert.pem \
  -key client-key.pem \
  -CAfile ca-cert.pem \
  </dev/null 2>&1 | grep -E "(Protocol|Cipher|Verify)"

ACL-Based Authorization

kafka-acls.sh Operations Reference

Kafka ACLs define which Principal (authenticated identity) can perform which Operation on which Resource. Understanding the resource and operation model is essential before writing any ACL rules.

Resource types: Topic, Group (Consumer Group), Cluster, TransactionalId, DelegationToken

Operation types: Read, Write, Create, Describe, Alter, Delete, DescribeConfigs, AlterConfigs, ClusterAction, All

# Grant a producer service write access to a specific topic
kafka-acls.sh \
  --bootstrap-server kafka:9092 \
  --command-config admin-client.properties \
  --add \
  --allow-principal User:order-service \
  --operation Write \
  --operation Describe \
  --topic orders

# Grant a consumer group read access
# Note: BOTH a topic Read ACL AND a group Read ACL are required
kafka-acls.sh \
  --bootstrap-server kafka:9092 \
  --command-config admin-client.properties \
  --add \
  --allow-principal User:kafka-consumer \
  --operation Read \
  --operation Describe \
  --topic orders

kafka-acls.sh \
  --bootstrap-server kafka:9092 \
  --command-config admin-client.properties \
  --add \
  --allow-principal User:kafka-consumer \
  --operation Read \
  --group payment-consumer-group

# Prefix-based wildcard ACL (grant access to all topics starting with "orders-")
kafka-acls.sh \
  --bootstrap-server kafka:9092 \
  --command-config admin-client.properties \
  --add \
  --allow-principal User:kafka-consumer \
  --operation Read \
  --topic 'orders-' \
  --resource-pattern-type prefixed

# Grant a transactional producer permission to use its transactional ID
# (required for exactly-once producers)
kafka-acls.sh \
  --bootstrap-server kafka:9092 \
  --command-config admin-client.properties \
  --add \
  --allow-principal User:order-service \
  --operation Write \
  --operation Describe \
  --transactional-id order-producer-txn

# List ACLs for a specific topic
kafka-acls.sh \
  --bootstrap-server kafka:9092 \
  --command-config admin-client.properties \
  --list \
  --topic orders

# List all ACLs in the cluster
kafka-acls.sh \
  --bootstrap-server kafka:9092 \
  --command-config admin-client.properties \
  --list

# Revoke a permission
kafka-acls.sh \
  --bootstrap-server kafka:9092 \
  --command-config admin-client.properties \
  --remove \
  --allow-principal User:decommissioned-service \
  --operation Write \
  --topic orders
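The matching rules behind these commands can be summarized in a few lines. The model below is a deliberately simplified sketch: it covers literal and prefixed patterns and the rule that a matching Deny overrides any Allow, but it ignores host restrictions, wildcard principals, and implied operations (for example Read implying Describe). All names are hypothetical.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Acl:
    principal: str
    permission: str            # "Allow" or "Deny"
    operation: str             # e.g. "Read", "Write", "All"
    resource_type: str         # e.g. "Topic", "Group"
    name: str
    pattern: str = "literal"   # "literal" or "prefixed"

    def matches(self, principal, operation, resource_type, resource_name) -> bool:
        if self.resource_type != resource_type or self.principal != principal:
            return False
        if self.operation not in (operation, "All"):
            return False
        if self.pattern == "prefixed":
            return resource_name.startswith(self.name)
        return self.name in (resource_name, "*")


def authorize(acls, principal, operation, resource_type, resource_name) -> bool:
    hits = [a for a in acls if a.matches(principal, operation, resource_type, resource_name)]
    if any(a.permission == "Deny" for a in hits):
        return False                                  # Deny always wins
    return any(a.permission == "Allow" for a in hits)


acls = [
    Acl("User:order-service", "Allow", "Write", "Topic", "orders"),
    Acl("User:kafka-consumer", "Allow", "Read", "Topic", "orders-", "prefixed"),
    Acl("User:kafka-consumer", "Deny", "Read", "Topic", "orders-internal"),  # hypothetical
]
print(authorize(acls, "User:kafka-consumer", "Read", "Topic", "orders-eu"))
print(authorize(acls, "User:kafka-consumer", "Read", "Topic", "orders-internal"))
```

Note that the prefix `orders-` does not cover the topic `orders` itself, which is why the reference above grants the literal and the prefixed ACL separately.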

Enabling ACLs Safely: Super Users First

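ACL enforcement is disabled by default because authorizer.class.name is empty. Once an authorizer is configured, allow.everyone.if.no.acl.found defaults to false, so any request without a matching ACL is denied. Before enabling ACL enforcement, you must configure at least one super user; otherwise, enabling ACLs will lock you out of your own cluster.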

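# server.properties
# StandardAuthorizer is for KRaft clusters; ZooKeeper-based clusters use
# kafka.security.authorizer.AclAuthorizer instead
authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer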

# Super users bypass all ACL checks — use for administrative operations only
super.users=User:kafka-admin;User:monitoring-service

# Deny all operations on resources that have no ACL (production setting, and the default)
# Setting this to 'true' leaves every resource that has no ACL open to
# every principal; resources that do have ACLs are still enforced
allow.everyone.if.no.acl.found=false

A staged rollout approach:

  1. Deploy SCRAM credentials for all services
  2. Configure allow.everyone.if.no.acl.found=true (resources without ACLs stay open; a resource becomes restricted as soon as it gets its first ACL)
  3. Write all required ACLs
  4. Verify services can still operate via test deploys
  5. Switch allow.everyone.if.no.acl.found=false
  6. Monitor kafka.authorizer.logger for unexpected denials
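The effect of each rollout stage can be checked with a tiny decision model. This sketch assumes the fallback applies only to resources that have no ACL at all and that super users bypass every check; it uses exact (principal, operation) pairs and hypothetical names, so treat it as an aid for reasoning rather than a reimplementation of the authorizer.

```python
def is_allowed(principal, operation, resource, acls, super_users, allow_if_no_acl):
    """Decide a request; acls maps a resource to a set of (principal, operation)."""
    if principal in super_users:
        return True                     # super users bypass all ACL checks
    resource_acls = acls.get(resource, set())
    if not resource_acls:
        return allow_if_no_acl          # allow.everyone.if.no.acl.found
    return (principal, operation) in resource_acls


supers = {"User:kafka-admin"}
acls = {"Topic:orders": {("User:order-service", "Write")}}

# Stage 2 to 4: flag is true, so topics without ACLs still work
print(is_allowed("User:kafka-consumer", "Read", "Topic:payments", acls, supers, True))
# ...but a topic that already has an ACL rejects everyone not listed
print(is_allowed("User:kafka-consumer", "Read", "Topic:orders", acls, supers, True))
# Stage 5: flag is false, so topics without ACLs are now closed
print(is_allowed("User:kafka-consumer", "Read", "Topic:payments", acls, supers, False))
```

This is why step 3 has to be complete for a resource before its first ACL goes live: partially written ACLs on a topic can already deny traffic while the flag is still true.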

Multi-Listener Architecture: Network Segmentation

Separate connection types into distinct listeners with different security policies:

# server.properties — production multi-listener configuration
listeners=\
  INTERNAL://0.0.0.0:9092,\
  EXTERNAL://0.0.0.0:9093,\
  REPLICATION://0.0.0.0:9094,\
  ADMIN://0.0.0.0:9095

listener.security.protocol.map=\
  INTERNAL:SASL_SSL,\
  EXTERNAL:SASL_SSL,\
  REPLICATION:SSL,\
  ADMIN:SASL_SSL

# Internal microservices use SCRAM-SHA-256
listener.name.internal.sasl.enabled.mechanisms=SCRAM-SHA-256

# External clients (mobile apps, third-party integrations) use OAUTHBEARER
listener.name.external.sasl.enabled.mechanisms=OAUTHBEARER

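# Broker-to-broker replication: mTLS only (no SASL overhead)
# With named listeners, select the replication listener by name;
# security.inter.broker.protocol cannot be combined with inter.broker.listener.name
inter.broker.listener.name=REPLICATION
listener.name.replication.ssl.client.auth=required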

# Administrative CLI tools use the stronger SCRAM-SHA-512
listener.name.admin.sasl.enabled.mechanisms=SCRAM-SHA-512

# Advertised addresses for clients to connect to
advertised.listeners=\
  INTERNAL://kafka-broker-1.internal:9092,\
  EXTERNAL://kafka.company.com:9093,\
  REPLICATION://kafka-broker-1.internal:9094,\
  ADMIN://kafka-broker-1.internal:9095
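Multi-listener configurations fail in subtle ways when one of the three lists drifts out of sync. A pre-deployment check along the following lines can catch that; it is a sketch with hypothetical function names that parses the three property values as plain strings and flags unmapped, unadvertised, or unencrypted listeners.

```python
def parse_list(value: str) -> dict:
    """Split 'NAME:VALUE,...' or 'NAME://host:port,...' into a dict keyed by listener name."""
    result = {}
    for item in filter(None, (part.strip() for part in value.split(","))):
        name, _, rest = item.partition(":")
        result[name] = rest.removeprefix("//")
    return result


def check_listeners(listeners: str, protocol_map: str, advertised: str) -> list:
    problems = []
    protocols = parse_list(protocol_map)
    adv = parse_list(advertised)
    for name in parse_list(listeners):
        protocol = protocols.get(name)
        if protocol is None:
            problems.append(f"{name}: no entry in listener.security.protocol.map")
        elif protocol in ("PLAINTEXT", "SASL_PLAINTEXT"):
            problems.append(f"{name}: unencrypted protocol {protocol}")
        if name not in adv:
            problems.append(f"{name}: missing from advertised.listeners")
    return problems


print(check_listeners(
    "INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093,REPLICATION://0.0.0.0:9094,ADMIN://0.0.0.0:9095",
    "INTERNAL:SASL_SSL,EXTERNAL:SASL_SSL,REPLICATION:SSL,ADMIN:SASL_SSL",
    "INTERNAL://kafka-broker-1.internal:9092,EXTERNAL://kafka.company.com:9093,"
    "REPLICATION://kafka-broker-1.internal:9094,ADMIN://kafka-broker-1.internal:9095",
))
```

An empty list means the configuration above is internally consistent; each string in a non-empty result names the listener and the problem.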

Corresponding firewall rules:

Audit Logging: Security Event Trail

Audit logging is a baseline compliance requirement. Every authentication event and every authorization decision must be recorded.

# log4j.properties — configure dedicated audit appenders

# Authorization audit: INFO records denials only; allowed operations are
# logged at DEBUG, so use DEBUG here if every decision must be captured
log4j.logger.kafka.authorizer.logger=INFO, authorizerAppender
log4j.additivity.kafka.authorizer.logger=false

log4j.appender.authorizerAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.authorizerAppender.File=/var/log/kafka/kafka-authorizer.log
log4j.appender.authorizerAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.authorizerAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.authorizerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

Sample audit log entries:

# Authorization denied
[2024-01-15 10:35:01,200] INFO Principal = User:ANONYMOUS is Denied
Operation = Write from host = 10.0.0.100 on resource = Topic:LITERAL:orders
for request = Produce with resourceRefCount = 1 (kafka.authorizer.logger)

# Authorization allowed (emitted only when the logger is set to DEBUG)
[2024-01-15 10:35:05,300] DEBUG Principal = User:order-service is Allowed
Operation = Write from host = 10.0.0.50 on resource = Topic:LITERAL:orders
for request = Produce with resourceRefCount = 1 (kafka.authorizer.logger)
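Before these entries are useful in a SIEM they usually need to be split into fields. The sketch below parses one entry in the format shown above with a regular expression; the pattern was written against these sample lines only, so treat it as an assumption to validate against real broker output. The sample entry in the code is hypothetical.

```python
import re

AUDIT_RE = re.compile(
    r"\[(?P<timestamp>[^\]]+)\] (?P<level>\w+) "
    r"Principal = (?P<principal>\S+) is (?P<decision>Allowed|Denied) "
    r"Operation = (?P<operation>\w+) from host = (?P<host>\S+) "
    r"on resource = (?P<resource>\S+) for request = (?P<request>\w+)"
)


def parse_audit(entry: str):
    """Return the fields of one audit entry as a dict, or None if it does not match."""
    line = " ".join(entry.split())   # collapse a wrapped entry into a single line
    match = AUDIT_RE.search(line)
    return match.groupdict() if match else None


# Hypothetical entry in the same format as the samples above
sample = """[2024-01-15 10:35:01,200] INFO Principal = User:kafka-consumer is Denied
Operation = Write from host = 10.0.0.100 on resource = Topic:LITERAL:orders
for request = Produce with resourceRefCount = 1 (kafka.authorizer.logger)"""

event = parse_audit(sample)
print(event["principal"], event["decision"], event["operation"], event["resource"])
```

The same field names can be reused in an ingest pipeline so that dashboards can group denials by principal or by resource.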

Ship audit logs to a SIEM system using Filebeat:

# filebeat.yml
filebeat.inputs:
  - type: log
    paths:
      - /var/log/kafka/kafka-authorizer.log
    fields:
      service: kafka
      event_type: authorization
      environment: production
    fields_under_root: true
    multiline.pattern: '^\['
    multiline.negate: true
    multiline.match: after

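# A custom index name needs matching template settings; with ILM enabled,
# Filebeat ignores the index setting
setup.ilm.enabled: false
setup.template.name: "kafka-audit"
setup.template.pattern: "kafka-audit-*"

output.elasticsearch:
  hosts: ["elasticsearch:9200"]
  index: "kafka-audit-%{+yyyy.MM.dd}"
  pipeline: "kafka-audit-enrichment"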

Certificate Rotation Automation

Manual SSL certificate management is one of the most common causes of production outages (see Chapter 32, incident 8). Certificate expiry that brings down a production cluster is entirely preventable with automation.

On Kubernetes: cert-manager

apiVersion: cert-manager.io/v1
kind: Certificate
metadata:
  name: kafka-broker-cert
  namespace: kafka
spec:
  secretName: kafka-broker-tls
  duration: 8760h      # 365 days
  renewBefore: 720h    # Begin renewal 30 days before expiry
  commonName: kafka-broker.kafka.svc.cluster.local
  dnsNames:
    - kafka-broker
    - kafka-broker.kafka
    - kafka-broker.kafka.svc
    - kafka-broker.kafka.svc.cluster.local
  issuerRef:
    name: internal-ca-issuer
    kind: ClusterIssuer

On bare metal / VMs: HashiCorp Vault PKI

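# Configure Vault PKI secrets engine
vault secrets enable pki
vault secrets tune -max-lease-ttl=87600h pki

# Generate the root CA that signs broker certificates
# (skip if this mount already has an issuing CA configured)
vault write pki/root/generate/internal \
  common_name="Kafka-Internal-CA" \
  ttl=87600h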

# Create a Kafka broker role
vault write pki/roles/kafka-broker \
  allowed_domains="internal" \
  allow_subdomains=true \
  max_ttl="8760h" \
  key_type="rsa" \
  key_bits=2048

# Issue a certificate (call this in broker startup script)
# The response contains the private key, so restrict permissions before writing it
umask 077
vault write -format=json pki/issue/kafka-broker \
  common_name="kafka-broker-1.internal" \
  ttl="8760h" \
  > /etc/kafka/ssl/broker-cert.json

# Extract cert and key from Vault response, then remove the combined file
jq -r '.data.certificate' /etc/kafka/ssl/broker-cert.json > /etc/kafka/ssl/broker.pem
jq -r '.data.private_key' /etc/kafka/ssl/broker-cert.json > /etc/kafka/ssl/broker-key.pem
rm -f /etc/kafka/ssl/broker-cert.json
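If the startup script is written in Python rather than shell, the extraction step might look like the sketch below. It reads the `data.certificate` and `data.private_key` fields used above and creates each output file with mode 0600 so the private key is never briefly readable by other users. The function names are hypothetical, and the demo uses placeholder strings instead of a real Vault response.

```python
import json
import os
import tempfile


def write_secret(path: str, content: str) -> None:
    """Create the file with mode 0600 from the start, then write the content."""
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w") as handle:
        handle.write(content + "\n")
    os.chmod(path, 0o600)   # also tighten a pre-existing file


def unpack_vault_response(response_json: str, ssl_dir: str) -> tuple:
    """Write the certificate and private key from a pki/issue response to ssl_dir."""
    data = json.loads(response_json)["data"]
    cert_path = os.path.join(ssl_dir, "broker.pem")
    key_path = os.path.join(ssl_dir, "broker-key.pem")
    write_secret(cert_path, data["certificate"])
    write_secret(key_path, data["private_key"])
    return cert_path, key_path


# Demo with placeholder values in a temporary directory
fake_response = json.dumps({"data": {"certificate": "CERT-PEM", "private_key": "KEY-PEM"}})
with tempfile.TemporaryDirectory() as demo_dir:
    print(unpack_vault_response(fake_response, demo_dir))
```

In a real deployment `ssl_dir` would be `/etc/kafka/ssl` and `response_json` the output of the `vault write -format=json` call.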

Prometheus alert for certificate expiry:

- alert: KafkaBrokerCertExpiringIn30Days
  expr: |
    (ssl_certificate_expiry_seconds{job="kafka-brokers"} - time()) / 86400 < 30
  for: 1h
  labels:
    severity: warning
  annotations:
    summary: "Kafka broker SSL certificate expiring on {{ $labels.instance }}"
    description: |
      Certificate expires in {{ printf "%.0f" $value }} days.
      Initiate certificate rotation immediately to avoid a production incident.

- alert: KafkaBrokerCertExpiringIn7Days
  expr: |
    (ssl_certificate_expiry_seconds{job="kafka-brokers"} - time()) / 86400 < 7
  for: 1h
  labels:
    severity: critical
    page: "true"
  annotations:
    summary: "CRITICAL: Kafka broker SSL certificate expiring in less than 7 days"
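The arithmetic behind the two alert thresholds, and how they relate to the cert-manager renewal window shown earlier, can be sketched as follows. The function names are hypothetical; the thresholds (30 and 7 days) and the durations (8760h, 720h) come from the configuration above.

```python
SECONDS_PER_DAY = 86400


def days_remaining(not_after_epoch: float, now_epoch: float) -> float:
    """Same expression as the alert rule: (expiry - now) / 86400."""
    return (not_after_epoch - now_epoch) / SECONDS_PER_DAY


def severity(not_after_epoch: float, now_epoch: float) -> str:
    days = days_remaining(not_after_epoch, now_epoch)
    if days < 7:
        return "critical"
    if days < 30:
        return "warning"
    return "ok"


def renewal_day(duration_hours: float, renew_before_hours: float) -> float:
    """Day of the certificate's life on which automated renewal starts."""
    return (duration_hours - renew_before_hours) / 24


now = 1_700_000_000
print(severity(now + 40 * SECONDS_PER_DAY, now))   # well outside both thresholds
print(severity(now + 20 * SECONDS_PER_DAY, now))
print(severity(now + 3 * SECONDS_PER_DAY, now))
print(renewal_day(8760, 720))
```

With a 720h renewal window, automated renewal begins at the same 30-day mark as the warning alert, so a warning that stays active is a signal that automated renewal has not succeeded.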

Production Security Checklist (25 Items)

#  | Check | Priority | How to Verify
1  | All listeners use SASL_SSL; PLAINTEXT disabled | P0 | kafka-broker-api-versions.sh --bootstrap-server host:9092
2  | ssl.client.auth=required (mTLS enforced) | P0 | Check server.properties
3  | TLS 1.2 and 1.3 only; TLS 1.0/1.1 disabled | P0 | openssl s_client -connect host:9092
4  | ACLs enabled; allow.everyone.if.no.acl.found=false | P0 | Check server.properties
5  | Super users configured; known only to administrators | P0 | Check server.properties; audit access
6  | Audit log enabled and shipped to SIEM | P0 | Check log4j.properties and Filebeat
7  | Certificate expiry monitoring and auto-renewal active | P0 | Check Prometheus alerts
8  | Broker-to-broker replication uses dedicated listener | P1 | Check listeners config
9  | ZooKeeper/KRaft authentication enabled | P1 | Check zoo.cfg / KRaft config
10 | Producers/consumers follow least-privilege ACL model | P1 | kafka-acls.sh --list
11 | Transactional producers have TransactionalId ACLs | P1 | Check ACLs
12 | Different CAs for dev, staging, and production | P1 | Verify certificate issuer
13 | Keystore/key files have 600 permissions | P1 | ls -la /etc/kafka/ssl/
14 | Keystore passwords stored in a secrets manager | P1 | Check secrets source in startup scripts
15 | JMX port is not exposed or is TLS-protected | P1 | netstat -tlnp | grep 9999
16 | Clients enable hostname verification | P2 | Check ssl.endpoint.identification.algorithm
17 | Using SCRAM-SHA-512 over SCRAM-SHA-256 where possible | P2 | Check sasl.mechanism config
18 | Delegation Token max lifetime is reasonable (≤ 24h) | P2 | Check token policy
19 | Cluster Describe ACL restricted to authorized admin tools | P2 | Check Cluster ACLs
20 | Log files do not contain plaintext credentials | P2 | grep -rn 'password' /var/log/kafka/
21 | Network firewall allows only required ports | P2 | nmap -p 9092-9095 kafka-broker
22 | Kafka Connect workers use a dedicated service account | P2 | Check Connect worker config
23 | Schema Registry authentication enabled | P2 | Check Schema Registry config
24 | ACL review scheduled quarterly (revoke unused permissions) | P3 | Calendar + kafka-acls.sh --list
25 | Annual penetration test and security audit | P3 | External security assessment schedule

Summary: Defense in Depth

There is no single silver bullet in Kafka security. Production-grade security requires defense in depth: multiple independent security layers (authentication, authorization, encryption, and audit), each of which limits the blast radius when another layer is compromised or misconfigured.

When one layer fails, the others limit the damage. This is the architecture that lets you sleep through the night.
