第 23 章

Kafka Streams 架构:拓扑、线程与状态

第23章:Kafka Streams 架构:拓扑、线程与状态

导读:Kafka Streams 如何实现分布式有状态流处理?

本章核心问题:Kafka Streams 如何实现分布式有状态流处理?

读完本章你将理解


Level 1 · 你需要知道的(1-3年经验)

Kafka Streams 是一个库,不是一个集群

理解 Kafka Streams 的第一步,是破除一个普遍的误解:它不是像 Apache Flink 或 Apache Spark 那样需要单独部署和运维的计算集群,而是一个普通的 Java 库(kafka-streams)。你的应用程序引入这个依赖,在 main() 方法里启动它,就得到了一个具备分布式、有状态、精确一次语义的流处理能力的 JVM 进程。

这个设计决策背后有深刻的工程哲学。传统流处理框架需要集群管理器(YARN、Mesos、Kubernetes)来调度计算资源,需要独立的 JobManager/Master 节点来协调任务,这引入了额外的运维复杂度。Kafka Streams 的设计者选择了另一条路:把 Kafka 本身作为协调基础设施

分区分配由 Kafka 的消费者组协议完成;故障检测由消费者组的心跳机制完成;状态的持久化由 Kafka 的 changelog topic 完成;弹性伸缩只需要启动或停止应用实例,Kafka 的 rebalance 会自动重新分配分区。整个系统没有任何额外的外部依赖,除了 Kafka 本身。

代价是什么?你失去了统一的 Web UI 来监控所有作业,失去了细粒度的资源配额控制,失去了跨作业的统一调度。这些是 Kafka Streams 有意不提供的,因为它认为这些职责属于容器编排层(Kubernetes)或你自己的监控体系(Prometheus + Grafana)。

StreamsConfig 与 StreamsBuilder

每个 Kafka Streams 应用从配置开始:

Properties props = new Properties();
// 应用程序 ID,同时也是消费者组 ID
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092");
// 默认的键/值序列化器
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
    Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
    Serdes.String().getClass());
// 提交间隔(影响 at-least-once 的 offset 提交频率)
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
// 状态存储目录(RocksDB 数据写在这里)
props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/kafka-streams");

APPLICATION_ID_CONFIG 是 Kafka Streams 中最关键的配置项之一。它扮演三个角色:消费者组 ID(用于 Kafka 的分区分配协议)、changelog topic 的前缀名(<app-id>-<store-name>-changelog)、以及内部 repartition topic 的前缀名。同一个 application.id 的所有实例会自动组成一个处理集群。

StreamsBuilder 是定义处理逻辑的 DSL 入口:

StreamsBuilder builder = new StreamsBuilder();

// 从 topic 创建一个 KStream
KStream<String, String> textLines = builder.stream("text-input");

// 定义处理逻辑
KTable<String, Long> wordCounts = textLines
    .flatMapValues(line -> Arrays.asList(line.toLowerCase().split("\\W+")))
    .groupBy((key, word) -> word)
    .count(Materialized.as("word-count-store"));

// 写回 Kafka
wordCounts.toStream().to("wordcount-output", 
    Produced.with(Serdes.String(), Serdes.Long()));

// 构建拓扑(此时不启动)
Topology topology = builder.build();
System.out.println(topology.describe()); // 打印拓扑图

builder.build() 返回的 Topology 对象是一个有向无环图(DAG)的描述,它此时还没有启动任何线程或连接任何 Kafka。topology.describe() 会输出类似下面的文本拓扑图:

Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [text-input])
      --> KSTREAM-FLATMAPVALUES-0000000001
    Processor: KSTREAM-FLATMAPVALUES-0000000001 (stores: [])
      --> KSTREAM-KEY-SELECT-0000000002
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-KEY-SELECT-0000000002 (stores: [])
      --> KSTREAM-FILTER-0000000003
      <-- KSTREAM-FLATMAPVALUES-0000000001
    ...
   Sub-topology: 1
    Source: KSTREAM-SOURCE-0000000008 (topics: [wordcount-app-word-count-store-repartition])
      --> KSTREAM-AGGREGATE-0000000006
    Processor: KSTREAM-AGGREGATE-0000000006 (stores: [word-count-store])
      --> KTABLE-TOSTREAM-0000000009

注意拓扑被分成了两个子拓扑(Sub-topology)。这是因为 groupBy 操作引入了重分区:为了保证相同的 key 进入同一个聚合任务,必须先把消息按新 key 写入一个 repartition topic,再从那个 topic 消费。这个 repartition topic 是 Kafka Streams 自动创建和管理的内部 topic。

Task:分区组的最小处理单元

StreamTask 是 Kafka Streams 中最核心的概念之一。每个 Task 对应一组分区——这组分区称为"分区组"(partition group),它们在处理拓扑中是共同进退的。

Task 数量由源 topic 的分区数决定。对于单个源 topic,Task 数 = 分区数。对于多个源 topic 的 join 操作,Task 数 = 分区数(两个 topic 必须有相同的分区数,且分区 i 与分区 i 对应)。

每个 Task 拥有:

// Task 处理一条记录的简化流程(StreamTask.java)
public boolean process() {
    // 从所有输入分区的缓冲队列中选一条时间戳最小的记录
    StampedRecord record = partitionGroup.nextRecord();
    if (record == null) return false;
    
    // 设置处理上下文(当前 record 的 offset、timestamp 等)
    processorContext.setCurrentNode(sourceNode);
    processorContext.setRecordContext(record);
    
    // 调用源处理器,触发拓扑中的处理链
    sourceNode.process(record.key(), record.value());
    return true;
}

状态存储:内存 vs RocksDB

Kafka Streams 提供两类状态存储:

内存存储(In-Memory Store):底层是 HashMapTreeMap,访问速度极快(纳秒级),但受限于 JVM 堆内存,GC 压力大,JVM 重启后状态丢失(需从 changelog 重建)。适合状态量小、对延迟极敏感的场景。

// 创建内存状态存储
Materialized<String, Long, KeyValueStore<Bytes, byte[]>> inMemoryStore =
    Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("my-store")
        .withKeySerde(Serdes.String())
        .withValueSerde(Serdes.Long())
        .withLoggingEnabled(Collections.emptyMap())
        .withCachingEnabled();

// 在聚合中使用
KTable<String, Long> counts = stream
    .groupByKey()
    .count(inMemoryStore);

持久化存储(RocksDB):这是 Kafka Streams 的默认选择。RocksDB 是 Facebook 开源的嵌入式 LSM-tree 键值存储,数据写在本地磁盘,通过内存映射(mmap)和 Block Cache 加速读取,能处理远超 JVM 堆大小的状态量。

RocksDB 调优

Kafka Streams 通过 RocksDBConfigSetter 接口暴露 RocksDB 的调优入口:

public class MyRocksDBConfig implements RocksDBConfigSetter {
    @Override
    public void setConfig(String storeName, Options options, 
                          Map<String, Object> configs) {
        // Block Cache:热数据的内存缓存
        // 默认 50MB,对于热数据密集的场景严重不足
        // 建议设置为可用内存的 30-50%
        BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
        tableConfig.setBlockCacheSize(512 * 1024 * 1024L); // 512MB
        tableConfig.setBlockSize(16 * 1024L);               // 16KB block
        tableConfig.setCacheIndexAndFilterBlocks(true);      // 索引也放缓存
        options.setTableFormatConfig(tableConfig);
        
        // Write Buffer(MemTable):写入先到这里,满了再 flush 到磁盘
        // 默认 16MB,高写入压力时可增大到 64-128MB
        options.setWriteBufferSize(64 * 1024 * 1024L);       // 64MB
        options.setMaxWriteBufferNumber(3);                   // 最多 3 个 MemTable
        
        // 压缩算法:Snappy 是速度与压缩率的平衡点
        // LZ4 更快但压缩率略低;ZSTD 压缩率最高但 CPU 消耗大
        options.setCompressionType(CompressionType.SNAPPY_COMPRESSION);
        
        // Level 0 到 Level 1 的 compaction 触发阈值
        options.setLevel0FileNumCompactionTrigger(10);
        options.setLevel0SlowdownWritesTrigger(20);
        options.setLevel0StopWritesTrigger(40);
    }
    
    @Override
    public void close(String storeName, Options options) {
        // 清理资源
    }
}

// 注册自定义 RocksDB 配置
props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, 
    MyRocksDBConfig.class);

Block Cache 是 RocksDB 调优中影响最大的参数。RocksDB 读取数据时,先查 MemTable(写缓冲),再查 Block Cache(磁盘块缓存),最后才做实际的磁盘 I/O。默认的 50MB Block Cache 在生产环境几乎必然不够,对于有数百万个 key 的聚合状态,建议从 512MB 起步并根据监控调整。

write_buffer_size 控制 MemTable 大小。写入先进 MemTable,满了之后会 flush 到磁盘成为 SSTable,触发后台 compaction。写入压力大时增大这个值可以减少 flush 频率,但会增加内存占用和崩溃恢复时间(WAL 重放更长)。

为什么选择 Kafka Streams

Kafka Streams 作为库的设计在特定场景下有无可替代的优势:微服务架构中每个服务需要独立的流处理逻辑、团队不想维护额外的计算集群、需要与 Spring Boot 等框架深度集成、状态规模在单机可管理范围内(几十 GB 到几百 GB)。

当你的状态规模达到 TB 级、需要跨多 Job 的统一资源调度、或者需要更复杂的窗口语义时,Apache Flink 会是更好的选择。理解 Kafka Streams 的架构本质,是在两者之间做出合理取舍的前提。


Level 2 · 它是怎么运行的(3-5年经验)

StreamThread:线程即调度单元

KafkaStreams 实例启动后,内部会创建 num.stream.threads(默认 1)个 StreamThread。每个 StreamThread 是一个独立的 Java 线程,它:

  1. 内部维护一个普通的 KafkaConsumer 用于拉取消息
  2. 维护一个 KafkaProducer 用于写出结果和 changelog
  3. 管理一组分配给它的 StreamTask

StreamThread 的主循环极为简洁,本质上是一个永不停止的 poll 循环:

// 简化的 StreamThread 主循环(实际代码在 StreamThread.java)
while (isRunning()) {
    // 1. 调用 consumer.poll() 拉取消息(含 rebalance 处理)
    ConsumerRecords<byte[], byte[]> records = consumer.poll(pollTime);
    
    // 2. 把记录分发给对应的 Task
    for (StreamTask task : activeTasks) {
        task.addRecords(task.inputPartition(), records);
    }
    
    // 3. 让每个 Task 处理消息
    for (StreamTask task : activeTasks) {
        task.process();
    }
    
    // 4. 按时间间隔提交 offset 和状态
    if (shouldCommit()) {
        commitAll();
    }
    
    // 5. 推送本地状态到 changelog topic
    for (StreamTask task : activeTasks) {
        task.flushState();
    }
}

多线程不等于多实例。在单个 JVM 进程中设置 num.stream.threads=4,会创建 4 个 StreamThread,每个 thread 持有不同分区的 Task,它们并行处理但共享同一个进程的内存和 JVM 资源。水平扩展则是启动多个进程(每个进程都配置相同的 application.id),Kafka 的 rebalance 会自动在所有进程间重新分配分区。

StreamsPartitionAssignor:自定义分配策略

Kafka Streams 注册了一个自定义的 ConsumerPartitionAssignor 实现:StreamsPartitionAssignor。在 Kafka 消费者组的 rebalance 过程中,Group Leader(第一个加入组的消费者实例)会运行这个 Assignor 的 assign() 方法来决定分区分配。

StreamsPartitionAssignor 的分配逻辑远比默认的 RangeAssignor 复杂,它需要考虑:

  1. Sticky 分配:尽可能让 Task 留在当前持有它的实例上,避免状态迁移(状态在哪个机器上就尽量把 Task 分给那台机器)
  2. Standby Task 放置:Standby Task 不能和对应的 Active Task 在同一个实例上
  3. 拓扑感知:多个子拓扑的 Task 按子拓扑关系整体分配

每个实例在加入消费者组时,会在 JoinGroupRequest 的 metadata 中附带自己当前持有的 Task 信息(Task ID → 状态存储的最新 changelog offset)。Group Leader 的 Assignor 利用这些信息做出最优分配决策。


Level 3 · 规范怎么定义的(资深)

Changelog Topic:状态的持久化保险

每个状态存储(无论内存还是 RocksDB)在默认情况下都有对应的 changelog topic,命名格式为:

<application-id>-<store-name>-changelog

这个 topic 是**紧凑型(compacted)**的,Kafka 的 Log Compaction 会保留每个 key 的最新值并删除旧版本,使 changelog topic 的大小与当前状态的大小成正比,而不是与历史写入量成正比。

状态存储的每次写操作,都会同步地写一条消息到 changelog topic:

状态存储写入: key="kafka", value=42
    ↓ 同步
Changelog topic 写入: key="kafka", value=42 (offset=1234)

这意味着:

  1. 故障恢复:实例崩溃重启后,Kafka Streams 从 changelog topic 的头部开始重放,重建状态存储,直到追上最新 offset
  2. Standby Task 同步:Standby Task 持续消费 changelog,保持近实时同步
  3. 状态迁移:Task 迁移到新实例时,新实例重放 changelog 重建状态

changelog 写入是异步批量的(通过内置的 producer),不会显著影响处理延迟,但会增加网络带宽和 Kafka 存储压力。对于超大状态(TB 级),可以考虑关闭 changelog 并接受更长的恢复时间:

// 关闭 changelog(不推荐用于生产)
Materialized.as("my-store").withLoggingDisabled()

Level 4 · 边界与陷阱(所有人)

Standby Task:快速故障转移的秘密

有状态的流处理面临一个根本问题:如果持有状态的实例崩溃了,新接管分区的实例需要从 changelog topic 重放所有历史消息才能重建状态,这可能需要几分钟甚至更长时间。在这段时间内,该分区的处理是停滞的。

Standby Task 解决了这个问题。配置 num.standby.replicas=1,Kafka Streams 会在其他实例上为每个 Active Task 维护一个 Standby Task。Standby Task 持续消费 changelog topic 的消息,保持状态与 Active Task 接近同步(通常滞后秒级)。

当 Active Task 所在实例崩溃时,Assignor 会优先将该 Task 分配给拥有最新 Standby 状态的实例,该实例只需要重放少量增量 changelog 就能恢复处理,故障转移时间从分钟级降到秒级。

// 配置 1 个 standby 副本
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);

Standby Task 不处理主数据流(不消费源 topic),只消费对应的 changelog topic。它消耗额外的内存和磁盘(存储状态副本),以及 changelog topic 的消费带宽,这是用资源换可用性的经典权衡。

完整 WordCount 示例

以下是一个生产可用的完整 WordCount 示例,包含错误处理和监控:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.*;

import java.time.Duration;
import java.util.*;
import java.util.concurrent.CountDownLatch;

public class WordCountApp {
    
    public static void main(String[] args) {
        Properties props = buildConfig();
        Topology topology = buildTopology();
        
        // 打印拓扑结构(部署前的重要调试手段)
        System.out.println("=== Topology ===");
        System.out.println(topology.describe());
        
        KafkaStreams streams = new KafkaStreams(topology, props);
        
        // 注册全局异常处理器(Kafka Streams 3.x API)
        streams.setUncaughtExceptionHandler(exception -> {
            System.err.println("Uncaught exception: " + exception.getMessage());
            // REPLACE_THREAD: 重启崩溃的 StreamThread,不终止整个应用
            return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse
                .REPLACE_THREAD;
        });
        
        // 注册状态监听器
        streams.setStateListener((newState, oldState) -> {
            System.out.printf("State transition: %s → %s%n", oldState, newState);
            if (newState == KafkaStreams.State.ERROR) {
                System.err.println("Application entered ERROR state!");
            }
        });
        
        // 优雅停止
        CountDownLatch latch = new CountDownLatch(1);
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            // 等待最多 10 秒让当前处理完成
            streams.close(Duration.ofSeconds(10));
            latch.countDown();
        }));
        
        // 启动(非阻塞,后台线程开始处理)
        streams.start();
        
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    
    static Properties buildConfig() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-v1");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
            Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
            Serdes.String().getClass());
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
        props.put(StreamsConfig.STATE_DIR_CONFIG, "/data/kafka-streams");
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
        props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
        props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
        return props;
    }
    
    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        
        KStream<String, String> textLines = builder.stream(
            "text-input",
            Consumed.with(Serdes.String(), Serdes.String())
                .withOffsetResetPolicy(Topology.AutoOffsetReset.EARLIEST)
        );
        
        KTable<String, Long> wordCounts = textLines
            // flatMapValues 不改变 key,避免不必要的重分区
            .flatMapValues(line -> Arrays.asList(
                line.toLowerCase().split("\\W+")
            ))
            // 过滤空字符串
            .filter((key, word) -> !word.isEmpty())
            // selectKey 改变 key 为单词本身,触发下游重分区
            .selectKey((key, word) -> word)
            // groupByKey(此时 key 已是单词,直接分组)
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            // count 聚合,结果持久化到 RocksDB
            .count(Materialized
                .<String, Long, KeyValueStore<Bytes, byte[]>>as("word-count-store")
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.Long())
                .withCachingEnabled()   // 启用写缓冲,减少 changelog 写入量
                .withLoggingEnabled(Map.of(
                    "min.insync.replicas", "2",
                    "retention.ms", String.valueOf(7 * 24 * 3600 * 1000L)
                ))
            );
        
        // KTable 转 KStream 再输出
        wordCounts.toStream().to(
            "wordcount-output",
            Produced.with(Serdes.String(), Serdes.Long())
        );
        
        return builder.build();
    }
}

交互式查询:从外部读取状态

Kafka Streams 支持交互式查询,允许应用程序的其他部分(甚至 HTTP 服务)直接读取状态存储,而无需经过 Kafka:

// 查询本地状态(只能查询当前实例持有的分区的状态)
ReadOnlyKeyValueStore<String, Long> store = streams.store(
    StoreQueryParameters.fromNameAndType(
        "word-count-store",
        QueryableStoreTypes.keyValueStore()
    )
);

Long count = store.get("kafka"); // 直接从 RocksDB 读取,毫秒级延迟

// 全局查询(需要 RPC 路由到正确实例)
KeyQueryMetadata metadata = streams.queryMetadataForKey(
    "word-count-store", "kafka", Serdes.String().serializer()
);
// metadata 包含持有该 key 的实例的 host:port
// 你的代码需要用 HTTP/gRPC 转发到那个实例

这是 Kafka Streams 构建 CQRS(命令查询职责分离)架构的基础:流处理写入状态,HTTP 服务读取状态,两者使用同一个 JVM 进程。

本章评分
4.8  / 5  (6 评分)

💬 留言讨论