第 24 章

窗口、Join 与 Exactly-Once 流处理

第24章:窗口、Join 与 Exactly-Once 流处理

导读:流处理中的窗口、Join 与精确一次如何实现?

本章核心问题:流处理中的窗口、Join 与精确一次如何实现?

读完本章你将理解


Level 1 · 你需要知道的(1-3年经验)

时间与窗口的本质问题

流处理与批处理最根本的区别之一,是数据没有终点。对于"过去 5 分钟内每个用户点击了多少次广告"这样的问题,批处理可以在某个时间点取一个快照来回答,但流处理必须在数据持续到来的过程中维护这个计数,并且精确地界定"5 分钟"的边界。

这就是**窗口(Window)**要解决的问题:把无限的事件流切成有限的时间切片,让聚合操作变得有意义。

但时间本身是危险的。Kafka Streams 区分三种时间:

Kafka Streams 默认使用消息的 Kafka 时间戳(事件时间或摄取时间,取决于 Producer 的配置),通过 TimestampExtractor 可以从消息体提取自定义时间戳:

// 从消息体 JSON 中提取事件时间戳(毫秒)
public class EventTimestampExtractor implements TimestampExtractor {
    @Override
    public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
        // 若提取失败,回退到分区时间(该分区最新处理的消息时间戳)
        if (record.value() instanceof MyEvent event) {
            return event.getEventTimestamp();
        }
        return partitionTime;
    }
}

// 在 Consumed 中注册
builder.stream("clicks", Consumed.with(Serdes.String(), eventSerde)
    .withTimestampExtractor(new EventTimestampExtractor()));

四种窗口类型

Tumbling Window(滚动窗口)

固定大小、不重叠的窗口。每条消息恰好属于一个窗口。适合"每 5 分钟的统计报表"场景。

时间轴: ---[0min----5min)---[5min---10min)---[10min---15min)---
事件:    a  b  c  d         e  f              g
窗口1(0-5):   a, b, c, d
窗口2(5-10):  e, f
窗口3(10-15): g
KStream<String, ClickEvent> clicks = builder.stream("clicks");

KTable<Windowed<String>, Long> clicksPerUser = clicks
    .groupByKey()
    .windowedBy(
        // Kafka Streams 3.x 推荐使用 ofSizeWithNoGrace(无宽限期)
        // 或 ofSizeAndGrace(明确指定宽限期)
        TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5))
    )
    .count(Materialized.as("tumbling-click-counts"));

// 输出时 key 是 Windowed<String>,包含 key + 窗口起止时间
clicksPerUser.toStream()
    .map((windowedKey, count) -> KeyValue.pair(
        String.format("%s@[%d-%d]", 
            windowedKey.key(),
            windowedKey.window().start(),
            windowedKey.window().end()),
        count
    ))
    .to("click-counts-output", Produced.with(Serdes.String(), Serdes.Long()));

Hopping Window(跳跃窗口)

固定大小、允许重叠的窗口。窗口大小(size)> 推进步长(advance),因此相邻窗口会重叠,同一条消息可能属于多个窗口。适合"最近 1 小时的滑动报表,每 10 分钟更新一次"场景。

窗口大小: 10min, 步长: 5min
---[0-10)---[5-15)---[10-20)---
事件: a(t=3), b(t=7), c(t=12)
窗口[0-10):  a, b
窗口[5-15):  b, c   ← b 同时属于两个窗口
窗口[10-20): c
KTable<Windowed<String>, Long> hoppingCounts = clicks
    .groupByKey()
    .windowedBy(
        TimeWindows.ofSizeAndGrace(
            Duration.ofMinutes(10),   // 窗口大小
            Duration.ofMinutes(2)     // 宽限期:允许最迟 2 分钟的迟到事件
        ).advanceBy(Duration.ofMinutes(5))  // 步长 5 分钟
    )
    .count(Materialized.as("hopping-click-counts"));

内存成本:一条消息会被写入 size/advance 个窗口(此例为 2 个)。窗口重叠越多,状态存储和 changelog 的写入量越大。

Sliding Window(滑动窗口)

基于记录时间戳的相对距离划分窗口,与 Tumbling/Hopping 的基于绝对时间边界不同。一个 Sliding Window 包含所有时间戳差值在指定范围内的记录对。这意味着窗口的数量不是预定义的,而是随数据动态产生的。

适合"计算每个用户在任意 5 分钟区间内的最大操作频率"这类需要感知记录间距的场景。

KTable<Windowed<String>, Long> slidingCounts = clicks
    .groupByKey()
    .windowedBy(
        // 所有时间戳在 5 分钟以内的记录归为一组
        SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5))
    )
    .count(Materialized.as("sliding-click-counts"));

Session Window(会话窗口)

基于活动间隔划分窗口,窗口大小不固定。当两条记录的时间戳差值小于 inactivity gap 时,它们属于同一会话;超过 gap 则开启新会话。适合用户行为分析中的"会话"概念。

inactivity gap: 5min
事件: a(t=0), b(t=2), c(t=4), d(t=12), e(t=14)
会话1: a, b, c  (最大间隔 2min < 5min)
会话2: d, e     (c→d 间隔 8min > 5min,开新会话)
KTable<Windowed<String>, Long> sessionCounts = clicks
    .groupByKey()
    .windowedBy(
        SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(5))
    )
    .count(Materialized.as("session-click-counts"));

Session Window 的内部实现最为复杂:新记录到来时,需要查找并合并时间上相邻的现有会话,可能触发多个会话的合并(merge),也可能创建新会话。这使得 Session Window 的状态存储写入量远高于 Tumbling Window。

宽限期(Grace Period):处理迟到事件

现实世界中,事件时间戳小于当前流式时间的消息时常发生——移动网络抖动、日志批量上传、跨时区系统的时钟偏差。这些**迟到事件(late records)**如果不加处理,会被分配到已关闭的窗口而被忽略。

宽限期(Grace Period)给窗口一个"等待时间":窗口关闭后,仍然接受在宽限期内到达的迟到事件,并更新窗口的聚合结果(触发一次新的 KTable 更新)。

TimeWindows.ofSizeAndGrace(
    Duration.ofMinutes(5),   // 窗口大小
    Duration.ofMinutes(2)    // 宽限期:窗口关闭后再等 2 分钟
)

超过宽限期的迟到事件会被直接丢弃(dropped)。丢弃的事件数量可以通过 stream-metrics 中的 dropped-records-total 监控指标追踪。在生产环境中,你应该监控这个指标——如果它持续增长,说明你的 grace period 设置不足以覆盖实际的迟到分布,或者上游存在较严重的延迟问题。

如果需要对超出宽限期的记录进行特殊处理(而非简单丢弃),可以实现**死信队列(Dead Letter Queue)**模式:

// 在 flatMap 中,根据时间戳决定路由到正常处理流还是 DLQ topic
KStream<String, ClickEvent>[] branches = clicks
    .branch(
        (key, event) -> isWithinGrace(event),     // 正常处理
        (key, event) -> true                       // 超时 → DLQ
    );

branches[1].to("clicks-dlq", Produced.with(Serdes.String(), eventSerde));

三种 Join 类型与语义

KStream-KStream Join(流-流 Join)

两个无界流之间的 Join,必须有时间窗口约束,否则状态会无限增长。语义:记录 A 和记录 B 被 Join,当且仅当它们有相同的 key,且时间戳差值在指定窗口内。

KStream<String, OrderEvent> orders = builder.stream("orders");
KStream<String, PaymentEvent> payments = builder.stream("payments");

// 关联 key 相同、时间戳在 ±10 分钟内的订单和支付
KStream<String, EnrichedOrder> enriched = orders.join(
    payments,
    // ValueJoiner:如何合并两侧的值
    (order, payment) -> new EnrichedOrder(order, payment),
    // JoinWindows:时间窗口
    JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(10)),
    StreamJoined.with(Serdes.String(), orderSerde, paymentSerde)
);

实现原理:Kafka Streams 为两侧的流各维护一个时间窗口状态存储。新记录到来时,在另一侧的状态存储中查找时间窗口内的匹配记录,产出 Join 结果;同时将自己存入本侧状态存储,等待另一侧的匹配记录。

Join 有三种变体:

KStream-KTable Join(流-表 Join)

将流中的每条记录与表中的最新值 Join,不需要时间窗口。语义类似数据库的"实时维度表查询":每条流记录到来时,用它的 key 去查表,返回表中最新的对应值。

KStream<String, ClickEvent> clicks = builder.stream("clicks");
// KTable:用户配置表(用户 ID → 用户配置)
KTable<String, UserProfile> userProfiles = builder.table("user-profiles");

// 每次 click 到来,用 user_id 查最新的用户配置
KStream<String, EnrichedClick> enrichedClicks = clicks.join(
    userProfiles,
    (click, profile) -> new EnrichedClick(click, profile),
    Joined.with(Serdes.String(), clickSerde, profileSerde)
);

关键特性:KTable 的更新(来自 user-profiles topic 的消息)是实时的,新的流记录会使用最新的表值。但已处理的流记录不会因为表更新而重新计算——这是 KStream-KTable Join 与 KTable-KTable Join 的核心区别。

KStream-KTable Join 是非等待的:流记录到来时,如果表中没有对应的 key,leftJoin 会输出 null 值,join(inner)则不输出任何结果。这与 KStream-KStream Join 不同,后者会等待窗口时间内的另一侧匹配。

KTable-KTable Join(表-表 Join)

两个 KTable 之间的 Join。当任一侧更新时,重新计算与另一侧当前值的 Join 结果,产出新的 KTable。语义等价于数据库中两张物化视图的 Join。

KTable<String, UserProfile> profiles = builder.table("user-profiles");
KTable<String, UserSettings> settings = builder.table("user-settings");

// 任一侧更新时,用最新值重新计算 Join
KTable<String, EnrichedUser> enrichedUsers = profiles.join(
    settings,
    (profile, setting) -> new EnrichedUser(profile, setting),
    Materialized.as("enriched-users")
);

实现原理:Kafka Streams 在每个 Task 中维护两个 KTable 的完整状态副本。当 profiles 更新时,用新的 profile 值加上当前的 settings 值计算 Join;当 settings 更新时,用新的 settings 值加上当前的 profiles 值计算 Join。输出是一个新的 KTable,也会有对应的 changelog topic。


Level 2 · 它是怎么运行的(3-5年经验)

Exactly-Once v2:端到端精确一次语义

为什么精确一次是困难的

At-least-once 处理很容易实现:处理完消息、写出结果之后提交 offset。如果在提交 offset 之前崩溃,重启后从上次 offset 重新处理,结果可能重复写出。

At-most-once 也容易:提交 offset 之后再处理消息。如果在处理过程中崩溃,那条消息被跳过,不重复但有丢失。

Exactly-once(精确一次)要求:即使在任何时间点崩溃,每条消息的处理效果恰好出现一次。这需要把"消费 offset 提交"和"生产输出写入"变成一个原子操作。

EOS v2 的实现机制

Kafka 的 Exactly-Once Semantics(EOS)通过事务 API 实现:producer.beginTransaction() → 处理消息 → producer.sendOffsetsToTransaction()producer.commitTransaction()。整个过程是原子的:要么全部成功(输出写入 + offset 提交),要么全部回滚。

EOS v1(Kafka Streams 早期版本)为每个 Task 创建一个独立的事务性 Producer。N 个 Task = N 个 Producer = N 个 PID(Producer ID),Broker 需要为每个 PID 维护幂等性状态,带来了显著的内存和协调开销。

EOS v2(Kafka 2.5+,Kafka Streams 通过 exactly_once_v2 启用)改为每个 StreamThread 使用一个事务性 Producer

// 启用 EOS v2(推荐,需要 Kafka Broker >= 2.5)
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
    StreamsConfig.EXACTLY_ONCE_V2);

// EOS v1(兼容旧版本 Broker,不推荐新部署使用)
// props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
//     StreamsConfig.EXACTLY_ONCE);

EOS v2 的事务流程:

StreamThread 的每次 commit 循环:
1. producer.beginTransaction()
2. 处理 N 条消息(调用处理拓扑)
3. producer.send(output_records)         // 发送所有输出消息
4. producer.sendOffsetsToTransaction(    // 原子地提交消费 offset
       consumer.currentOffsets(),
       consumer.groupMetadata()          // v2 用 groupMetadata 替代 groupId
   )
5. producer.commitTransaction()          // 事务提交,输出可见

groupMetadata() 是 EOS v2 的关键改进。它将消费者组的 generation ID 包含在事务中,使 Broker 能检测到 "zombie 消费者"(实例已崩溃但事务未完成的旧实例)的事务并拒绝它,防止"幽灵写入"。

性能影响

Exactly-once 的语义保证有性能代价:

配置 典型吞吐量 延迟 适用场景
at_least_once 基准(100%) 最低 幂等操作,允许重复
exactly_once_v2 80-90% 略高(事务提交开销) 金融交易、计费系统
exactly_once(v1) 70-80% 最高(多 Producer 协调) 遗留系统兼容

10-20% 的吞吐量损失来自事务协调开销:beginTransactioncommitTransaction 各需要一次 Broker 往返,每次提交周期(commit.interval.ms,默认 100ms)至少两次额外的网络请求。适当增大 commit.interval.ms(如 1000ms)可以摊薄这个开销,代价是更长的恢复窗口(崩溃后可能重新处理最多 commit.interval.ms 内的消息)。

EOS 的范围边界

必须明确:Kafka Streams 的 EOS 保证的是 Kafka-to-Kafka 的精确一次——消息从输入 topic 到输出 topic 的处理效果精确一次。

如果你的处理逻辑涉及外部系统(写数据库、调用 REST API),这些操作不在 Kafka 事务范围内。要实现端到端的 exactly-once,需要额外的机制:

理解这个边界是正确评估 EOS 价值的前提。Kafka Streams 的 EOS 保证的是"流拓扑内部"的精确一次,而非"全系统"的精确一次。这已经足够解决绝大多数实际问题:重复的聚合计数、重复的事件驱动通知、重复的 KTable 更新。


Level 3 · 规范怎么定义的(资深)

本章的协议与规范细节已融入上述 Level 中。如需深入了解,请参阅 Kafka 官方协议文档和对应 KIP 提案。


Level 4 · 边界与陷阱(所有人)

以下是与"窗口、Join 与 Exactly-Once 流处理"相关的常见边界问题和生产陷阱:

陷阱一:忽略默认配置的隐含假设。 许多 Kafka 参数的默认值是为通用场景设计的,在特定业务场景下可能导致性能问题或数据风险。上线前务必逐一审查与本章主题相关的配置项,确认其是否符合业务需求。

陷阱二:缺乏端到端的验证。 理论理解和生产实践之间往往存在差距。建议在预发布环境中构建完整的测试场景,覆盖正常路径和异常路径(如节点宕机、网络分区、磁盘满),验证系统行为是否符合预期。

陷阱三:监控盲区。 本章涉及的机制如果缺乏对应的监控指标和告警规则,问题往往在造成业务影响后才被发现。建议将关键指标纳入监控体系,设置合理的告警阈值。

本章评分
4.6  / 5  (5 评分)

💬 留言讨论