第 30 章

安全加固:认证、授权、加密与审计

第30章:安全加固:认证、授权、加密与审计

导读:Kafka 安全如何覆盖认证、授权、加密与审计?

本章核心问题:Kafka 安全如何覆盖认证、授权、加密与审计?

读完本章你将理解


Level 1 · 你需要知道的(1-3年经验)

Kafka 安全的四个维度

Kafka 的安全体系由四个独立但相互关联的维度构成:

  1. 认证(Authentication):验证连接方身份——"你是谁?"
  2. 授权(Authorization):控制操作权限——"你能做什么?"
  3. 加密(Encryption):保护数据传输——"数据在传输中不被窃听"
  4. 审计(Audit):记录操作历史——"谁在什么时间做了什么"

这四个维度缺一不可。只做认证而不做授权,任何合法用户都能操作任意 Topic;只做传输加密而不做认证,攻击者可以用任意身份连接;只做访问控制而不做审计,安全事件发生后无从溯源。

本章按照部署的复杂度从低到高,逐一讲解每个维度的技术选型和实施细节。

SASL 认证机制详解

SASL(Simple Authentication and Security Layer)是 Kafka 支持的认证框架,包含五种具体机制:

PLAIN:最简单但不建议在生产使用

PLAIN 机制以明文形式传输用户名和密码(仅做 Base64 编码,非加密)。

# Broker 配置 (server.properties)
listeners=PLAINTEXT://0.0.0.0:9092,SASL_PLAINTEXT://0.0.0.0:9093
sasl.enabled.mechanisms=PLAIN
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
# JAAS 配置 (kafka_server_jaas.conf)
KafkaServer {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="admin"
  password="admin-secret"
  user_admin="admin-secret"
  user_producer="producer-secret"
  user_consumer="consumer-secret";
};

风险:用户名密码以明文传输。必须与 TLS 结合使用(使用 SASL_SSL 监听器而非 SASL_PLAINTEXT),否则网络监听可直接获取凭证。此外,所有用户凭证都写在 Broker 配置文件中,难以动态更新,不适合大规模用户管理。

SCRAM-SHA-256 / SCRAM-SHA-512:生产推荐的密码认证

SCRAM(Salted Challenge Response Authentication Mechanism)是对 PLAIN 的重大安全改进:

# 创建用户(凭证写入 ZooKeeper/KRaft,不需要重启 Broker)
kafka-configs.sh \
  --bootstrap-server kafka:9092 \
  --alter \
  --add-config 'SCRAM-SHA-256=[iterations=8192,password=producer-secret]' \
  --entity-type users \
  --entity-name kafka-producer

kafka-configs.sh \
  --bootstrap-server kafka:9092 \
  --alter \
  --add-config 'SCRAM-SHA-512=[iterations=8192,password=consumer-secret]' \
  --entity-type users \
  --entity-name kafka-consumer

# 查看用户列表
kafka-configs.sh \
  --bootstrap-server kafka:9092 \
  --describe \
  --entity-type users

# 删除用户
kafka-configs.sh \
  --bootstrap-server kafka:9092 \
  --alter \
  --delete-config 'SCRAM-SHA-256' \
  --entity-type users \
  --entity-name decommissioned-service

Broker 配置:

# server.properties
listeners=SASL_SSL://0.0.0.0:9092
sasl.enabled.mechanisms=SCRAM-SHA-256,SCRAM-SHA-512
security.inter.broker.protocol=SASL_SSL
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
ssl.keystore.location=/etc/kafka/ssl/kafka.server.keystore.jks
ssl.keystore.password=keystore-password
ssl.truststore.location=/etc/kafka/ssl/kafka.server.truststore.jks
ssl.truststore.password=truststore-password

客户端 JAAS 配置:

// Java 客户端(Producer/Consumer)
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "SCRAM-SHA-256");
props.put("sasl.jaas.config",
    "org.apache.kafka.common.security.scram.ScramLoginModule required " +
    "username='kafka-producer' " +
    "password='producer-secret';");

GSSAPI/Kerberos:企业 SSO 集成

Kerberos 是企业环境中成熟的集中式认证方案(Active Directory、MIT Kerberos),适合已有 Kerberos 基础设施的组织。

# server.properties - Kerberos 配置
sasl.enabled.mechanisms=GSSAPI
sasl.kerberos.service.name=kafka
# kafka_server_jaas.conf
KafkaServer {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/etc/security/keytabs/kafka.service.keytab"
  principal="kafka/[email protected]";
};

Kerberos 的主要挑战:

OAUTHBEARER:云原生 JWT 认证(Kafka 2.0+)

OAUTHBEARER 机制允许使用 JWT(JSON Web Token)进行认证,适合云原生环境和微服务架构,可与 Keycloak、Okta、Auth0 等标准 OAuth 2.0 / OIDC 提供商集成。

# server.properties
sasl.enabled.mechanisms=OAUTHBEARER
sasl.oauthbearer.jwks.endpoint.url=https://auth.company.com/.well-known/jwks.json
sasl.oauthbearer.expected.audience=kafka-cluster
sasl.oauthbearer.sub.claim.name=client_id

客户端配置:

props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "OAUTHBEARER");
props.put("sasl.oauthbearer.token.endpoint.url",
    "https://auth.company.com/oauth/token");
props.put("sasl.jaas.config",
    "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required " +
    "clientId='kafka-producer-service' " +
    "clientSecret='client-secret' " +
    "scope='kafka:write';");

OAUTHBEARER 的优势:

Delegation Token:高性能短期凭证

当大量微服务需要认证时,每个服务都需要管理独立的 Kerberos keytab 或 SCRAM 密码。Delegation Token 提供了一种轻量级替代方案:主凭证(master credential)认证后,生成有时效性的派生 Token,供短期使用。

# 生成 Delegation Token(需要先用 SCRAM 或 Kerberos 认证)
kafka-delegation-tokens.sh \
  --bootstrap-server kafka:9092 \
  --create \
  --max-life-time-period 86400000 \  # 24 小时(毫秒)
  --command-config admin-client.properties

# 续期 Token
kafka-delegation-tokens.sh \
  --bootstrap-server kafka:9092 \
  --renew \
  --renew-time-period 3600000 \     # 续期 1 小时
  --hmac <token-hmac> \
  --command-config admin-client.properties

# 查看所有 Token(只能看到自己的,除非是超级用户)
kafka-delegation-tokens.sh \
  --bootstrap-server kafka:9092 \
  --describe \
  --command-config admin-client.properties

ACL:授权控制

kafka-acls.sh 基本操作

Kafka 使用 ACL(Access Control List)控制 Principal(认证身份)对 Resource(资源)执行 Operation(操作)的权限。

资源类型:

操作类型:ReadWriteCreateDescribeAlterDeleteDescribeConfigsAlterConfigsClusterAction

# 授予 Producer 服务写入 orders Topic 的权限
kafka-acls.sh \
  --bootstrap-server kafka:9092 \
  --command-config admin-client.properties \
  --add \
  --allow-principal User:order-service \
  --operation Write \
  --operation Describe \
  --topic orders

# 授予 Consumer Group 读取权限
# 需要同时授予 Topic Read 权限和 Group Read 权限
kafka-acls.sh \
  --bootstrap-server kafka:9092 \
  --command-config admin-client.properties \
  --add \
  --allow-principal User:kafka-consumer \
  --operation Read \
  --operation Describe \
  --topic orders

kafka-acls.sh \
  --bootstrap-server kafka:9092 \
  --command-config admin-client.properties \
  --add \
  --allow-principal User:kafka-consumer \
  --operation Read \
  --group payment-consumer-group

# 使用前缀通配符(赋予对所有 orders- 开头的 Topic 的读权限)
kafka-acls.sh \
  --bootstrap-server kafka:9092 \
  --command-config admin-client.properties \
  --add \
  --allow-principal User:kafka-consumer \
  --operation Read \
  --topic 'orders-' \
  --resource-pattern-type prefixed

# 授予 Producer 使用特定事务 ID 的权限(事务 Producer 必须)
kafka-acls.sh \
  --bootstrap-server kafka:9092 \
  --command-config admin-client.properties \
  --add \
  --allow-principal User:order-service \
  --operation Write \
  --operation Describe \
  --transactional-id order-producer-txn

# 查看 Topic 的所有 ACL
kafka-acls.sh \
  --bootstrap-server kafka:9092 \
  --list \
  --topic orders

# 查看所有 ACL
kafka-acls.sh \
  --bootstrap-server kafka:9092 \
  --list

# 撤销权限
kafka-acls.sh \
  --bootstrap-server kafka:9092 \
  --command-config admin-client.properties \
  --remove \
  --allow-principal User:decommissioned-service \
  --operation Write \
  --topic orders

开启 ACL 必须先配置超级用户

ACL 默认不开启(allow.everyone.if.no.acl.found=true)。开启 ACL 前必须先配置至少一个超级用户,否则开启后无法管理 Kafka:

# server.properties
authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer

# 超级用户(绕过所有 ACL 检查,用于管理操作)
super.users=User:admin;User:kafka-admin

# 当资源无 ACL 时,默认拒绝所有操作(生产环境推荐)
allow.everyone.if.no.acl.found=false

网络分层:多 Listener 架构

生产环境推荐将不同类型的连接分配到不同的 Listener,并配以不同的安全策略:

# server.properties - 多 Listener 配置
listeners=\
  INTERNAL://0.0.0.0:9092,\
  EXTERNAL://0.0.0.0:9093,\
  REPLICATION://0.0.0.0:9094,\
  ADMIN://0.0.0.0:9095

listener.security.protocol.map=\
  INTERNAL:SASL_SSL,\
  EXTERNAL:SASL_SSL,\
  REPLICATION:SSL,\
  ADMIN:SASL_SSL

# 内部服务(微服务)使用 SCRAM
listener.name.internal.sasl.enabled.mechanisms=SCRAM-SHA-256

# 外部客户端使用 OAUTHBEARER(与公司 IAM 集成)
listener.name.external.sasl.enabled.mechanisms=OAUTHBEARER

# Broker 间复制只用 mTLS(无 SASL,性能更好)
security.inter.broker.protocol=SSL

# 管理工具连接(仅允许内网 IP)
listener.name.admin.sasl.enabled.mechanisms=SCRAM-SHA-512

# 向外暴露的地址(根据实际网络配置)
advertised.listeners=\
  INTERNAL://kafka-broker-1.internal:9092,\
  EXTERNAL://kafka-broker-1.company.com:9093,\
  REPLICATION://kafka-broker-1.internal:9094,\
  ADMIN://kafka-broker-1.internal:9095

防火墙规则对应:

审计日志:安全事件溯源

审计日志是安全合规的基础要求,必须记录所有认证和授权事件。

# log4j.properties - 审计日志配置
# 授权日志(每次 ACL 允许/拒绝事件)
log4j.logger.kafka.authorizer.logger=INFO, authorizerAppender
log4j.additivity.kafka.authorizer.logger=false

log4j.appender.authorizerAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.authorizerAppender.File=/var/log/kafka/kafka-authorizer.log
log4j.appender.authorizerAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.authorizerAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.authorizerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

# 认证日志(SASL 认证成功/失败)
log4j.logger.org.apache.kafka.common.network.SaslChannelBuilder=DEBUG, requestAppender
log4j.additivity.org.apache.kafka.common.network.SaslChannelBuilder=false

审计日志样例:

# ACL 授权拒绝(Write 操作被拒)
[2024-01-15 10:35:01,200] INFO Principal = User:anonymous is Denied 
Operation = Write from host = 10.0.0.100 on resource = Topic:LITERAL:orders 
for request = Produce with resourceRefCount = 1 (kafka.authorizer.logger)

# ACL 授权允许
[2024-01-15 10:35:05,300] INFO Principal = User:order-service is Allowed 
Operation = Write from host = 10.0.0.50 on resource = Topic:LITERAL:orders 
for request = Produce with resourceRefCount = 1 (kafka.authorizer.logger)

将审计日志集成到 SIEM(Security Information and Event Management)系统:

# 使用 Filebeat 将审计日志推送到 Elasticsearch/OpenSearch
# filebeat.yml
filebeat.inputs:
  - type: log
    paths:
      - /var/log/kafka/kafka-authorizer.log
    fields:
      service: kafka
      event_type: authorization
    fields_under_root: true
    multiline.pattern: '^\['
    multiline.negate: true
    multiline.match: after

output.elasticsearch:
  hosts: ["elasticsearch:9200"]
  index: "kafka-audit-%{+yyyy.MM.dd}"

生产安全检查清单(25 条)

序号 检查项 优先级 验证命令
1 所有 Listener 使用 SASL_SSL,禁用明文 PLAINTEXT P0 kafka-broker-api-versions.sh --bootstrap-server host:9092
2 ssl.client.auth=required 开启 mTLS P0 检查 server.properties
3 TLS 版本仅允许 1.2 和 1.3 P0 openssl s_client -connect host:9092
4 启用 ACL(allow.everyone.if.no.acl.found=false P0 检查 server.properties
5 配置超级用户并仅限管理员知晓 P0 检查 server.properties
6 审计日志已启用并推送到 SIEM P0 检查 log4j.properties
7 证书有效期监控和自动续签 P0 检查 Prometheus 告警
8 Broker 间通信使用独立 Listener 和证书 P1 检查 listeners 配置
9 ZooKeeper/KRaft 启用认证 P1 检查 zoo.cfg / KRaft 配置
10 Producer/Consumer 使用最小权限原则 P1 kafka-acls.sh --list
11 事务 Producer 配置 TransactionalId ACL P1 检查 ACL
12 不同环境(dev/staging/prod)使用不同 CA P1 检查证书颁发机构
13 密钥文件权限设置为 600(仅 Kafka 用户可读) P1 ls -la /etc/kafka/ssl/
14 Keystore 密码使用 Vault 等密钥管理系统存储 P1 检查密钥来源
15 禁用 JMX 开放端口(或加密 JMX) P1 `netstat -tlnp
16 客户端启用主机名验证 P2 检查 ssl.endpoint.identification.algorithm
17 使用 SCRAM-SHA-512 而非 SCRAM-SHA-256(更强) P2 检查 sasl.mechanism
18 Delegation Token 最大生命周期设置合理 P2 检查 Token 策略
19 限制 kafka-consumer-groups.sh 的使用人员 P2 检查 Cluster Describe ACL
20 日志中不包含明文密码(检查 log4j 配置) P2 grep -r 'password' /var/log/kafka/
21 网络防火墙仅开放必要端口 P2 nmap -p 9092-9095 kafka-broker
22 Kafka Connect 使用独立的 Service Account P2 检查 Connect Worker 配置
23 Schema Registry 启用认证 P2 检查 Schema Registry 配置
24 定期审查 ACL(删除无用权限) P3 kafka-acls.sh --list 定期审查
25 渗透测试和安全审计(每年至少一次) P3 外部安全评估

小结:纵深防御的安全策略

Kafka 安全没有单一的银弹。生产环境的安全策略必须是纵深防御(Defense in Depth)

每一层的失效都不会直接导致全面沦陷。这才是真正的生产级安全。


Level 2 · 它是怎么运行的(3-5年经验)

SSL/TLS:传输层加密

证书体系设计

Kafka SSL 部署需要为每个 Broker 生成独立的证书,并配置统一的信任链。以下是完整的证书生成流程:

# 1. 生成 CA 私钥和自签名证书(企业环境用内部 CA 替代)
openssl req -new -x509 -keyout ca-key.pem -out ca-cert.pem \
  -days 3650 \
  -subj "/CN=Kafka-CA/O=Company/C=CN" \
  -passout pass:ca-password

# 2. 为每个 Broker 生成 Keystore
keytool -keystore kafka-broker-1.keystore.jks \
  -alias kafka-broker-1 \
  -keyalg RSA \
  -keysize 2048 \
  -validity 365 \
  -genkey \
  -dname "CN=kafka-broker-1.internal,O=Company,C=CN" \
  -storepass keystore-password

# 3. 生成 CSR 并用 CA 签名
keytool -keystore kafka-broker-1.keystore.jks \
  -alias kafka-broker-1 \
  -certreq \
  -file broker-1.csr \
  -storepass keystore-password

openssl x509 -req \
  -CA ca-cert.pem \
  -CAkey ca-key.pem \
  -in broker-1.csr \
  -out broker-1-signed.pem \
  -days 365 \
  -CAcreateserial \
  -passin pass:ca-password

# 4. 将 CA 证书和签名证书导入 Keystore
keytool -keystore kafka-broker-1.keystore.jks \
  -alias CARoot \
  -import \
  -file ca-cert.pem \
  -storepass keystore-password \
  -noprompt

keytool -keystore kafka-broker-1.keystore.jks \
  -alias kafka-broker-1 \
  -import \
  -file broker-1-signed.pem \
  -storepass keystore-password

# 5. 创建 Truststore(包含 CA 证书)
keytool -keystore kafka.truststore.jks \
  -alias CARoot \
  -import \
  -file ca-cert.pem \
  -storepass truststore-password \
  -noprompt

mTLS(双向 TLS)配置

mTLS 要求客户端也提供证书,Broker 验证客户端身份,是最严格的传输层安全配置:

# server.properties - mTLS 配置
ssl.keystore.location=/etc/kafka/ssl/kafka-broker-1.keystore.jks
ssl.keystore.password=keystore-password
ssl.key.password=key-password
ssl.truststore.location=/etc/kafka/ssl/kafka.truststore.jks
ssl.truststore.password=truststore-password

# REQUIRED: 要求客户端提供证书(启用 mTLS)
ssl.client.auth=required

# 启用主机名验证(防止中间人攻击,生产环境必须开启)
ssl.endpoint.identification.algorithm=HTTPS

# TLS 版本限制(禁用 TLS 1.0/1.1)
ssl.enabled.protocols=TLSv1.2,TLSv1.3
ssl.protocol=TLSv1.3

# 指定强密码套件(TLS 1.3 自动使用最强套件)
ssl.cipher.suites=TLS_AES_256_GCM_SHA384,TLS_CHACHA20_POLY1305_SHA256,TLS_AES_128_GCM_SHA256

TLS 性能影响与优化

TLS 加密会带来额外开销:

优化措施:

  1. 使用 TLS 1.3:与 TLS 1.2 相比,TLS 1.3 减少一次握手 RTT(1-RTT vs 2-RTT),并且支持 0-RTT 恢复(对长连接价值有限,但对频繁重连的客户端有帮助)。

  2. 启用 AES-NI 加速:验证 CPU 支持 AES-NI:

    grep -o 'aes' /proc/cpuinfo | head -1
    # 如果输出 "aes",表示支持 AES-NI 硬件加速
    
  3. 连接复用:避免频繁创建新连接(每次握手都有开销)。Kafka 客户端默认复用连接,确保应用代码不要频繁创建 Producer/Consumer 实例。


Level 3 · 规范怎么定义的(资深)

本章的协议与规范细节已融入上述 Level 中。如需深入了解,请参阅 Kafka 官方协议文档和对应 KIP 提案。


Level 4 · 边界与陷阱(所有人)

证书自动化轮换

手动管理 SSL 证书是生产事故的高频根源(见第 32 章案例 8)。以下是基于 cert-manager(Kubernetes 环境)的自动化方案:

# Kubernetes CertificateRequest - 自动轮换 Kafka Broker 证书
apiVersion: cert-manager.io/v1
kind: Certificate
metadata:
  name: kafka-broker-cert
  namespace: kafka
spec:
  secretName: kafka-broker-tls
  duration: 8760h    # 365 天
  renewBefore: 720h  # 提前 30 天续期
  commonName: kafka-broker.kafka.svc.cluster.local
  dnsNames:
    - kafka-broker
    - kafka-broker.kafka
    - kafka-broker.kafka.svc
    - kafka-broker.kafka.svc.cluster.local
  issuerRef:
    name: internal-ca-issuer
    kind: ClusterIssuer

非 Kubernetes 环境使用 HashiCorp Vault PKI:

# 配置 Vault PKI,有效期 365 天,提前 30 天告警
vault write pki/config/urls \
  issuing_certificates="https://vault:8200/v1/pki/ca" \
  crl_distribution_points="https://vault:8200/v1/pki/crl"

# 创建证书 Role
vault write pki/roles/kafka-broker \
  allowed_domains="internal" \
  allow_subdomains=true \
  max_ttl="8760h"

# 签发证书(在 Broker 启动脚本中调用)
vault write pki/issue/kafka-broker \
  common_name="kafka-broker-1.internal" \
  ttl="8760h"

证书到期前 30 天告警(Prometheus 规则):

- alert: KafkaBrokerCertExpiringInThirtyDays
  expr: |
    (ssl_certificate_expiry_seconds{job="kafka-brokers"} - time()) / 86400 < 30
  labels:
    severity: warning
  annotations:
    summary: "Kafka Broker 证书将在 {{ $value | humanizeDuration }} 后过期"
    description: "{{ $labels.instance }} 的 SSL 证书即将到期,请立即启动证书轮换流程。"
本章评分
4.7  / 5  (3 评分)

💬 留言讨论