Kafka Streams 架构:拓扑、线程与状态
第23章:Kafka Streams 架构:拓扑、线程与状态
导读:Kafka Streams 如何实现分布式有状态流处理?
本章核心问题:Kafka Streams 如何实现分布式有状态流处理?
读完本章你将理解:
- 拓扑构建与任务分配
- StreamThread 线程模型
- StateStore 与 Changelog Topic
- Standby Replicas 快速恢复
Level 1 · 你需要知道的(1-3年经验)
Kafka Streams 是一个库,不是一个集群
理解 Kafka Streams 的第一步,是破除一个普遍的误解:它不是像 Apache Flink 或 Apache Spark 那样需要单独部署和运维的计算集群,而是一个普通的 Java 库(kafka-streams)。你的应用程序引入这个依赖,在 main() 方法里启动它,就得到了一个具备分布式、有状态、精确一次语义的流处理能力的 JVM 进程。
这个设计决策背后有深刻的工程哲学。传统流处理框架需要集群管理器(YARN、Mesos、Kubernetes)来调度计算资源,需要独立的 JobManager/Master 节点来协调任务,这引入了额外的运维复杂度。Kafka Streams 的设计者选择了另一条路:把 Kafka 本身作为协调基础设施。
分区分配由 Kafka 的消费者组协议完成;故障检测由消费者组的心跳机制完成;状态的持久化由 Kafka 的 changelog topic 完成;弹性伸缩只需要启动或停止应用实例,Kafka 的 rebalance 会自动重新分配分区。整个系统没有任何额外的外部依赖,除了 Kafka 本身。
代价是什么?你失去了统一的 Web UI 来监控所有作业,失去了细粒度的资源配额控制,失去了跨作业的统一调度。这些是 Kafka Streams 有意不提供的,因为它认为这些职责属于容器编排层(Kubernetes)或你自己的监控体系(Prometheus + Grafana)。
StreamsConfig 与 StreamsBuilder
每个 Kafka Streams 应用从配置开始:
Properties props = new Properties();
// 应用程序 ID,同时也是消费者组 ID
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092");
// 默认的键/值序列化器
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
// 提交间隔(影响 at-least-once 的 offset 提交频率)
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
// 状态存储目录(RocksDB 数据写在这里)
props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/kafka-streams");
APPLICATION_ID_CONFIG 是 Kafka Streams 中最关键的配置项之一。它扮演三个角色:消费者组 ID(用于 Kafka 的分区分配协议)、changelog topic 的前缀名(<app-id>-<store-name>-changelog)、以及内部 repartition topic 的前缀名。同一个 application.id 的所有实例会自动组成一个处理集群。
StreamsBuilder 是定义处理逻辑的 DSL 入口:
StreamsBuilder builder = new StreamsBuilder();
// 从 topic 创建一个 KStream
KStream<String, String> textLines = builder.stream("text-input");
// 定义处理逻辑
KTable<String, Long> wordCounts = textLines
.flatMapValues(line -> Arrays.asList(line.toLowerCase().split("\\W+")))
.groupBy((key, word) -> word)
.count(Materialized.as("word-count-store"));
// 写回 Kafka
wordCounts.toStream().to("wordcount-output",
Produced.with(Serdes.String(), Serdes.Long()));
// 构建拓扑(此时不启动)
Topology topology = builder.build();
System.out.println(topology.describe()); // 打印拓扑图
builder.build() 返回的 Topology 对象是一个有向无环图(DAG)的描述,它此时还没有启动任何线程或连接任何 Kafka。topology.describe() 会输出类似下面的文本拓扑图:
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [text-input])
--> KSTREAM-FLATMAPVALUES-0000000001
Processor: KSTREAM-FLATMAPVALUES-0000000001 (stores: [])
--> KSTREAM-KEY-SELECT-0000000002
<-- KSTREAM-SOURCE-0000000000
Processor: KSTREAM-KEY-SELECT-0000000002 (stores: [])
--> KSTREAM-FILTER-0000000003
<-- KSTREAM-FLATMAPVALUES-0000000001
...
Sub-topology: 1
Source: KSTREAM-SOURCE-0000000008 (topics: [wordcount-app-word-count-store-repartition])
--> KSTREAM-AGGREGATE-0000000006
Processor: KSTREAM-AGGREGATE-0000000006 (stores: [word-count-store])
--> KTABLE-TOSTREAM-0000000009
注意拓扑被分成了两个子拓扑(Sub-topology)。这是因为 groupBy 操作引入了重分区:为了保证相同的 key 进入同一个聚合任务,必须先把消息按新 key 写入一个 repartition topic,再从那个 topic 消费。这个 repartition topic 是 Kafka Streams 自动创建和管理的内部 topic。
Task:分区组的最小处理单元
StreamTask 是 Kafka Streams 中最核心的概念之一。每个 Task 对应一组分区——这组分区称为"分区组"(partition group),它们在处理拓扑中是共同进退的。
Task 数量由源 topic 的分区数决定。对于单个源 topic,Task 数 = 分区数。对于多个源 topic 的 join 操作,Task 数 = 分区数(两个 topic 必须有相同的分区数,且分区 i 与分区 i 对应)。
每个 Task 拥有:
- 一个独立的
ProcessorContext,追踪该 Task 的处理进度 - 属于该 Task 的状态存储实例(每个 Task 有自己的 RocksDB 实例)
- 输入缓冲区(
RecordQueue),按时间戳排序,决定处理顺序
// Task 处理一条记录的简化流程(StreamTask.java)
public boolean process() {
// 从所有输入分区的缓冲队列中选一条时间戳最小的记录
StampedRecord record = partitionGroup.nextRecord();
if (record == null) return false;
// 设置处理上下文(当前 record 的 offset、timestamp 等)
processorContext.setCurrentNode(sourceNode);
processorContext.setRecordContext(record);
// 调用源处理器,触发拓扑中的处理链
sourceNode.process(record.key(), record.value());
return true;
}
状态存储:内存 vs RocksDB
Kafka Streams 提供两类状态存储:
内存存储(In-Memory Store):底层是 HashMap 或 TreeMap,访问速度极快(纳秒级),但受限于 JVM 堆内存,GC 压力大,JVM 重启后状态丢失(需从 changelog 重建)。适合状态量小、对延迟极敏感的场景。
// 创建内存状态存储
Materialized<String, Long, KeyValueStore<Bytes, byte[]>> inMemoryStore =
Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("my-store")
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.Long())
.withLoggingEnabled(Collections.emptyMap())
.withCachingEnabled();
// 在聚合中使用
KTable<String, Long> counts = stream
.groupByKey()
.count(inMemoryStore);
持久化存储(RocksDB):这是 Kafka Streams 的默认选择。RocksDB 是 Facebook 开源的嵌入式 LSM-tree 键值存储,数据写在本地磁盘,通过内存映射(mmap)和 Block Cache 加速读取,能处理远超 JVM 堆大小的状态量。
RocksDB 调优
Kafka Streams 通过 RocksDBConfigSetter 接口暴露 RocksDB 的调优入口:
public class MyRocksDBConfig implements RocksDBConfigSetter {
@Override
public void setConfig(String storeName, Options options,
Map<String, Object> configs) {
// Block Cache:热数据的内存缓存
// 默认 50MB,对于热数据密集的场景严重不足
// 建议设置为可用内存的 30-50%
BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
tableConfig.setBlockCacheSize(512 * 1024 * 1024L); // 512MB
tableConfig.setBlockSize(16 * 1024L); // 16KB block
tableConfig.setCacheIndexAndFilterBlocks(true); // 索引也放缓存
options.setTableFormatConfig(tableConfig);
// Write Buffer(MemTable):写入先到这里,满了再 flush 到磁盘
// 默认 16MB,高写入压力时可增大到 64-128MB
options.setWriteBufferSize(64 * 1024 * 1024L); // 64MB
options.setMaxWriteBufferNumber(3); // 最多 3 个 MemTable
// 压缩算法:Snappy 是速度与压缩率的平衡点
// LZ4 更快但压缩率略低;ZSTD 压缩率最高但 CPU 消耗大
options.setCompressionType(CompressionType.SNAPPY_COMPRESSION);
// Level 0 到 Level 1 的 compaction 触发阈值
options.setLevel0FileNumCompactionTrigger(10);
options.setLevel0SlowdownWritesTrigger(20);
options.setLevel0StopWritesTrigger(40);
}
@Override
public void close(String storeName, Options options) {
// 清理资源
}
}
// 注册自定义 RocksDB 配置
props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
MyRocksDBConfig.class);
Block Cache 是 RocksDB 调优中影响最大的参数。RocksDB 读取数据时,先查 MemTable(写缓冲),再查 Block Cache(磁盘块缓存),最后才做实际的磁盘 I/O。默认的 50MB Block Cache 在生产环境几乎必然不够,对于有数百万个 key 的聚合状态,建议从 512MB 起步并根据监控调整。
write_buffer_size 控制 MemTable 大小。写入先进 MemTable,满了之后会 flush 到磁盘成为 SSTable,触发后台 compaction。写入压力大时增大这个值可以减少 flush 频率,但会增加内存占用和崩溃恢复时间(WAL 重放更长)。
为什么选择 Kafka Streams
Kafka Streams 作为库的设计在特定场景下有无可替代的优势:微服务架构中每个服务需要独立的流处理逻辑、团队不想维护额外的计算集群、需要与 Spring Boot 等框架深度集成、状态规模在单机可管理范围内(几十 GB 到几百 GB)。
当你的状态规模达到 TB 级、需要跨多 Job 的统一资源调度、或者需要更复杂的窗口语义时,Apache Flink 会是更好的选择。理解 Kafka Streams 的架构本质,是在两者之间做出合理取舍的前提。
Level 2 · 它是怎么运行的(3-5年经验)
StreamThread:线程即调度单元
KafkaStreams 实例启动后,内部会创建 num.stream.threads(默认 1)个 StreamThread。每个 StreamThread 是一个独立的 Java 线程,它:
- 内部维护一个普通的
KafkaConsumer用于拉取消息 - 维护一个
KafkaProducer用于写出结果和 changelog - 管理一组分配给它的
StreamTask
StreamThread 的主循环极为简洁,本质上是一个永不停止的 poll 循环:
// 简化的 StreamThread 主循环(实际代码在 StreamThread.java)
while (isRunning()) {
// 1. 调用 consumer.poll() 拉取消息(含 rebalance 处理)
ConsumerRecords<byte[], byte[]> records = consumer.poll(pollTime);
// 2. 把记录分发给对应的 Task
for (StreamTask task : activeTasks) {
task.addRecords(task.inputPartition(), records);
}
// 3. 让每个 Task 处理消息
for (StreamTask task : activeTasks) {
task.process();
}
// 4. 按时间间隔提交 offset 和状态
if (shouldCommit()) {
commitAll();
}
// 5. 推送本地状态到 changelog topic
for (StreamTask task : activeTasks) {
task.flushState();
}
}
多线程不等于多实例。在单个 JVM 进程中设置 num.stream.threads=4,会创建 4 个 StreamThread,每个 thread 持有不同分区的 Task,它们并行处理但共享同一个进程的内存和 JVM 资源。水平扩展则是启动多个进程(每个进程都配置相同的 application.id),Kafka 的 rebalance 会自动在所有进程间重新分配分区。
StreamsPartitionAssignor:自定义分配策略
Kafka Streams 注册了一个自定义的 ConsumerPartitionAssignor 实现:StreamsPartitionAssignor。在 Kafka 消费者组的 rebalance 过程中,Group Leader(第一个加入组的消费者实例)会运行这个 Assignor 的 assign() 方法来决定分区分配。
StreamsPartitionAssignor 的分配逻辑远比默认的 RangeAssignor 复杂,它需要考虑:
- Sticky 分配:尽可能让 Task 留在当前持有它的实例上,避免状态迁移(状态在哪个机器上就尽量把 Task 分给那台机器)
- Standby Task 放置:Standby Task 不能和对应的 Active Task 在同一个实例上
- 拓扑感知:多个子拓扑的 Task 按子拓扑关系整体分配
每个实例在加入消费者组时,会在 JoinGroupRequest 的 metadata 中附带自己当前持有的 Task 信息(Task ID → 状态存储的最新 changelog offset)。Group Leader 的 Assignor 利用这些信息做出最优分配决策。
Level 3 · 规范怎么定义的(资深)
Changelog Topic:状态的持久化保险
每个状态存储(无论内存还是 RocksDB)在默认情况下都有对应的 changelog topic,命名格式为:
<application-id>-<store-name>-changelog
这个 topic 是**紧凑型(compacted)**的,Kafka 的 Log Compaction 会保留每个 key 的最新值并删除旧版本,使 changelog topic 的大小与当前状态的大小成正比,而不是与历史写入量成正比。
状态存储的每次写操作,都会同步地写一条消息到 changelog topic:
状态存储写入: key="kafka", value=42
↓ 同步
Changelog topic 写入: key="kafka", value=42 (offset=1234)
这意味着:
- 故障恢复:实例崩溃重启后,Kafka Streams 从 changelog topic 的头部开始重放,重建状态存储,直到追上最新 offset
- Standby Task 同步:Standby Task 持续消费 changelog,保持近实时同步
- 状态迁移:Task 迁移到新实例时,新实例重放 changelog 重建状态
changelog 写入是异步批量的(通过内置的 producer),不会显著影响处理延迟,但会增加网络带宽和 Kafka 存储压力。对于超大状态(TB 级),可以考虑关闭 changelog 并接受更长的恢复时间:
// 关闭 changelog(不推荐用于生产)
Materialized.as("my-store").withLoggingDisabled()
Level 4 · 边界与陷阱(所有人)
Standby Task:快速故障转移的秘密
有状态的流处理面临一个根本问题:如果持有状态的实例崩溃了,新接管分区的实例需要从 changelog topic 重放所有历史消息才能重建状态,这可能需要几分钟甚至更长时间。在这段时间内,该分区的处理是停滞的。
Standby Task 解决了这个问题。配置 num.standby.replicas=1,Kafka Streams 会在其他实例上为每个 Active Task 维护一个 Standby Task。Standby Task 持续消费 changelog topic 的消息,保持状态与 Active Task 接近同步(通常滞后秒级)。
当 Active Task 所在实例崩溃时,Assignor 会优先将该 Task 分配给拥有最新 Standby 状态的实例,该实例只需要重放少量增量 changelog 就能恢复处理,故障转移时间从分钟级降到秒级。
// 配置 1 个 standby 副本
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
Standby Task 不处理主数据流(不消费源 topic),只消费对应的 changelog topic。它消耗额外的内存和磁盘(存储状态副本),以及 changelog topic 的消费带宽,这是用资源换可用性的经典权衡。
完整 WordCount 示例
以下是一个生产可用的完整 WordCount 示例,包含错误处理和监控:
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.*;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.CountDownLatch;
public class WordCountApp {
public static void main(String[] args) {
Properties props = buildConfig();
Topology topology = buildTopology();
// 打印拓扑结构(部署前的重要调试手段)
System.out.println("=== Topology ===");
System.out.println(topology.describe());
KafkaStreams streams = new KafkaStreams(topology, props);
// 注册全局异常处理器(Kafka Streams 3.x API)
streams.setUncaughtExceptionHandler(exception -> {
System.err.println("Uncaught exception: " + exception.getMessage());
// REPLACE_THREAD: 重启崩溃的 StreamThread,不终止整个应用
return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse
.REPLACE_THREAD;
});
// 注册状态监听器
streams.setStateListener((newState, oldState) -> {
System.out.printf("State transition: %s → %s%n", oldState, newState);
if (newState == KafkaStreams.State.ERROR) {
System.err.println("Application entered ERROR state!");
}
});
// 优雅停止
CountDownLatch latch = new CountDownLatch(1);
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
// 等待最多 10 秒让当前处理完成
streams.close(Duration.ofSeconds(10));
latch.countDown();
}));
// 启动(非阻塞,后台线程开始处理)
streams.start();
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
static Properties buildConfig() {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-v1");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
props.put(StreamsConfig.STATE_DIR_CONFIG, "/data/kafka-streams");
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
return props;
}
static Topology buildTopology() {
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines = builder.stream(
"text-input",
Consumed.with(Serdes.String(), Serdes.String())
.withOffsetResetPolicy(Topology.AutoOffsetReset.EARLIEST)
);
KTable<String, Long> wordCounts = textLines
// flatMapValues 不改变 key,避免不必要的重分区
.flatMapValues(line -> Arrays.asList(
line.toLowerCase().split("\\W+")
))
// 过滤空字符串
.filter((key, word) -> !word.isEmpty())
// selectKey 改变 key 为单词本身,触发下游重分区
.selectKey((key, word) -> word)
// groupByKey(此时 key 已是单词,直接分组)
.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
// count 聚合,结果持久化到 RocksDB
.count(Materialized
.<String, Long, KeyValueStore<Bytes, byte[]>>as("word-count-store")
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.Long())
.withCachingEnabled() // 启用写缓冲,减少 changelog 写入量
.withLoggingEnabled(Map.of(
"min.insync.replicas", "2",
"retention.ms", String.valueOf(7 * 24 * 3600 * 1000L)
))
);
// KTable 转 KStream 再输出
wordCounts.toStream().to(
"wordcount-output",
Produced.with(Serdes.String(), Serdes.Long())
);
return builder.build();
}
}
交互式查询:从外部读取状态
Kafka Streams 支持交互式查询,允许应用程序的其他部分(甚至 HTTP 服务)直接读取状态存储,而无需经过 Kafka:
// 查询本地状态(只能查询当前实例持有的分区的状态)
ReadOnlyKeyValueStore<String, Long> store = streams.store(
StoreQueryParameters.fromNameAndType(
"word-count-store",
QueryableStoreTypes.keyValueStore()
)
);
Long count = store.get("kafka"); // 直接从 RocksDB 读取,毫秒级延迟
// 全局查询(需要 RPC 路由到正确实例)
KeyQueryMetadata metadata = streams.queryMetadataForKey(
"word-count-store", "kafka", Serdes.String().serializer()
);
// metadata 包含持有该 key 的实例的 host:port
// 你的代码需要用 HTTP/gRPC 转发到那个实例
这是 Kafka Streams 构建 CQRS(命令查询职责分离)架构的基础:流处理写入状态,HTTP 服务读取状态,两者使用同一个 JVM 进程。