Windows, Joins and Exactly-Once Stream Processing
The Fundamental Problem of Time
Stream processing differs from batch processing in one essential way: the data never ends. To answer a question like "how many times did each user click an ad in the last five minutes," a batch job takes a snapshot at some point in time. A stream processor must maintain that count as events continuously arrive, while precisely defining what "five minutes" means.
This is the problem windows solve: slicing an infinite event stream into finite time buckets so that aggregations become meaningful and bounded.
Time itself is treacherous in distributed systems. Kafka Streams distinguishes three notions of time:
- Event time: When the event actually occurred, recorded in the message payload or in the Kafka record's timestamp field. The most semantically accurate, but events can arrive late.
- Ingestion time: When the message was written to Kafka (the broker-assigned timestamp). Slightly later than event time, but always monotonically increasing per partition.
- Processing time: The wall-clock time when Kafka Streams processes the record. Easiest to implement but semantically weakest — processing delays skew window boundaries.
Kafka Streams uses the Kafka record timestamp by default and exposes TimestampExtractor to pull custom timestamps from the message body:
public class EventTimestampExtractor implements TimestampExtractor {
@Override
public long extract(ConsumerRecord<Object, Object> record,
long partitionTime) {
// partitionTime: the largest timestamp seen so far on this partition
// Fall back to partitionTime if extraction fails, rather than -1
if (record.value() instanceof MyEvent event) {
return event.getEventTimestampMs();
}
return partitionTime;
}
}
builder.stream("clicks",
Consumed.with(Serdes.String(), eventSerde)
.withTimestampExtractor(new EventTimestampExtractor()));
Four Window Types
Tumbling Windows: Fixed, Non-Overlapping
A tumbling window has a fixed size and no overlap. Every record belongs to exactly one window. Window boundaries are aligned to the Unix epoch (1970-01-01T00:00:00Z), not to the time the application started, so five-minute windows always begin at :00, :05, :10 and so on in UTC. This is the right choice for "generate a report every five minutes" use cases.
Timeline: ---[0min----5min)---[5min---10min)---[10min---15min)---
Events: a b c d e f g
Window 1 (0-5): a, b, c, d
Window 2 (5-10): e, f
Window 3 (10-15): g
KStream<String, ClickEvent> clicks = builder.stream("clicks");
KTable<Windowed<String>, Long> clicksPerUser = clicks
.groupByKey()
.windowedBy(
// ofSizeWithNoGrace: window closes immediately when stream time
// advances past the window end. No late records accepted.
// ofSizeAndGrace: explicit grace period for late arrivals.
TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5))
)
.count(Materialized.as("tumbling-click-counts"));
// Output key is Windowed<String>, containing the key plus window boundaries
clicksPerUser.toStream()
.map((windowedKey, count) -> KeyValue.pair(
String.format("%s@[%d-%d]",
windowedKey.key(),
windowedKey.window().start(),
windowedKey.window().end()),
count
))
.to("click-counts-output", Produced.with(Serdes.String(), Serdes.Long()));
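The epoch alignment described above comes down to simple modular arithmetic. The following is a minimal plain-Java sketch of that arithmetic, not Kafka Streams code; the class and method names are hypothetical, and it assumes non-negative timestamps in milliseconds.

```java
import java.time.Duration;

/** Plain-Java model of epoch-aligned tumbling window assignment (illustrative only). */
final class TumblingWindowMath {

    /** Returns the inclusive start of the window that contains timestampMs. */
    static long windowStart(long timestampMs, Duration size) {
        long sizeMs = size.toMillis();
        // Windows are aligned to epoch 0, so every start is a multiple of the size.
        return timestampMs - (timestampMs % sizeMs);
    }

    /** Returns the exclusive end of the window that contains timestampMs. */
    static long windowEnd(long timestampMs, Duration size) {
        return windowStart(timestampMs, size) + size.toMillis();
    }

    public static void main(String[] args) {
        Duration fiveMinutes = Duration.ofMinutes(5);
        long sevenMinutes = Duration.ofMinutes(7).toMillis();
        System.out.printf("[%d, %d)%n",
            windowStart(sevenMinutes, fiveMinutes),
            windowEnd(sevenMinutes, fiveMinutes));
    }
}
```

A record stamped exactly on a boundary (for example at minute 5) falls into the window that starts there, because window ends are exclusive.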
Hopping Windows: Fixed, Overlapping
A hopping window has a fixed size and a fixed advance interval where size > advance, so consecutive windows overlap. The same record can belong to multiple windows. This is appropriate for "compute a one-hour rolling metric, refreshed every ten minutes."
Window size: 10min, advance: 5min
---[0-10)---[5-15)---[10-20)---
Events: a(t=3), b(t=7), c(t=12)
Window[0-10): a, b
Window[5-15): b, c ← b belongs to two windows
Window[10-20): c
KTable<Windowed<String>, Long> hoppingCounts = clicks
.groupByKey()
.windowedBy(
TimeWindows.ofSizeAndGrace(
Duration.ofMinutes(10), // window size
Duration.ofMinutes(2) // grace period for late records
).advanceBy(Duration.ofMinutes(5)) // slide interval
)
.count(Materialized.as("hopping-click-counts"));
The memory cost scales with the overlap ratio: each record is written to size / advance windows (two in this example). Wide overlaps with large windows can multiply state store write amplification significantly — a 60-minute window with a 1-minute advance writes each record to 60 windows. Monitor state store size and changelog write rates when using high-overlap configurations.
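To make the write amplification concrete, here is a small plain-Java sketch that lists every hopping window a single timestamp falls into. It is a model of the assignment rule under the assumption of epoch-aligned windows and non-negative timestamps, not the library's implementation; `HoppingWindowMath` and `windowStartsFor` are hypothetical names.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of hopping window assignment (illustrative only). */
final class HoppingWindowMath {

    /** Returns the start of every window [start, start + size) that contains timestampMs. */
    static List<Long> windowStartsFor(long timestampMs, Duration size, Duration advance) {
        long sizeMs = size.toMillis();
        long advanceMs = advance.toMillis();
        // Earliest epoch-aligned start whose window still covers the timestamp.
        long start = (Math.max(0L, timestampMs - sizeMs + advanceMs) / advanceMs) * advanceMs;
        List<Long> starts = new ArrayList<>();
        for (; start <= timestampMs; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long t = Duration.ofMinutes(7).toMillis();
        // Size 10 min, advance 5 min: the record at minute 7 lands in two windows.
        System.out.println(windowStartsFor(t, Duration.ofMinutes(10), Duration.ofMinutes(5)));
    }
}
```

The size of the returned list is the number of state store writes one record triggers, which is why the size/advance ratio is worth checking before deploying.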
Sliding Windows: Record-Timestamp-Proximity Based
Sliding windows are fundamentally different from tumbling and hopping windows, which use absolute time boundaries. A sliding window groups all records whose timestamps fall within a specified duration of each other. Window boundaries are not predetermined: they are derived from the timestamps of the records themselves, not from the epoch or from when records happen to be processed.
This is appropriate for detecting bursts of activity: "find all users who performed more than 10 actions within any five-minute span."
KTable<Windowed<String>, Long> slidingCounts = clicks
.groupByKey()
.windowedBy(
// All records within 5 minutes of each other are grouped together
SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5))
)
.count(Materialized.as("sliding-click-counts"));
Because window boundaries are determined by record timestamps rather than fixed epochs, the number of distinct windows depends on the data: a new window comes into existence whenever a record enters or drops out of the time span. Sparse data produces few windows; bursty data can produce many overlapping windows.
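The burst-detection question ("more than N actions within any five-minute span") can be modelled without fixed window boundaries at all. The sketch below is a plain-Java illustration of that idea, not how `SlidingWindows` is implemented; `BurstDetector` is a hypothetical class, and it assumes timestamps arrive in order per key and that the time difference is inclusive.

```java
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Plain-Java model of timestamp-proximity grouping for burst detection (illustrative only). */
final class BurstDetector {
    private final long spanMs;
    private final int threshold;
    private final Map<String, Deque<Long>> recentByKey = new HashMap<>();

    BurstDetector(Duration span, int threshold) {
        this.spanMs = span.toMillis();
        this.threshold = threshold;
    }

    /** Records one event; returns true if the key now has more than threshold events within the span. */
    boolean record(String key, long timestampMs) {
        Deque<Long> recent = recentByKey.computeIfAbsent(key, k -> new ArrayDeque<>());
        // Evict events that are more than spanMs older than the new one.
        while (!recent.isEmpty() && timestampMs - recent.peekFirst() > spanMs) {
            recent.pollFirst();
        }
        recent.addLast(timestampMs);
        return recent.size() > threshold;
    }

    public static void main(String[] args) {
        BurstDetector detector = new BurstDetector(Duration.ofMinutes(5), 2);
        long minute = Duration.ofMinutes(1).toMillis();
        for (long t = 0; t <= 2; t++) {
            System.out.println("t=" + t + "min burst=" + detector.record("user-1", t * minute));
        }
    }
}
```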
Session Windows: Activity-Gap Based
Session windows have variable sizes determined by gaps in activity. Records are grouped into the same session when their timestamp difference is smaller than the inactivity gap. A new session starts when the gap exceeds the threshold.
Inactivity gap: 5 min
Events: a(t=0), b(t=2), c(t=4), d(t=12), e(t=14)
Session 1: a, b, c (max gap: 2 min < 5 min)
Session 2: d, e (gap c→d: 8 min > 5 min → new session)
KTable<Windowed<String>, Long> sessionCounts = clicks
.groupByKey()
.windowedBy(
SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(5))
)
.count(Materialized.as("session-click-counts"));
Session windows are the most expensive to implement internally. When a new record arrives, Kafka Streams must find all existing sessions within the inactivity gap distance of the new record's timestamp, merge them into a single session if they exist, and update the merged session's value. This can involve reading and writing multiple existing state store entries per record, making session windows significantly more write-intensive than tumbling windows. For high-throughput streams, profile state store write rates before choosing session windows in production.
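The merge step is easier to see in a toy model. The following plain-Java sketch keeps sessions for a single key in a list and merges every session that lies within the inactivity gap of a new record. It is not the Kafka Streams implementation: `SessionMerger` and `Session` are hypothetical names, and treating the gap as inclusive is an assumption of the sketch.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Plain-Java model of session merging for one key (illustrative only). */
final class SessionMerger {
    record Session(long start, long end, long count) {}

    private final long gapMs;
    private final List<Session> sessions = new ArrayList<>();

    SessionMerger(Duration gap) {
        this.gapMs = gap.toMillis();
    }

    /** Adds one record and merges every existing session within the gap of its timestamp. */
    void add(long timestampMs) {
        long start = timestampMs;
        long end = timestampMs;
        long count = 1;
        List<Session> kept = new ArrayList<>();
        for (Session s : sessions) {
            boolean withinGap = s.end() + gapMs >= timestampMs && s.start() - gapMs <= timestampMs;
            if (withinGap) {
                // Absorb the existing session: one read and one delete in a real store.
                start = Math.min(start, s.start());
                end = Math.max(end, s.end());
                count += s.count();
            } else {
                kept.add(s);
            }
        }
        kept.add(new Session(start, end, count));
        kept.sort(Comparator.comparingLong(Session::start));
        sessions.clear();
        sessions.addAll(kept);
    }

    List<Session> sessions() {
        return List.copyOf(sessions);
    }

    public static void main(String[] args) {
        SessionMerger merger = new SessionMerger(Duration.ofMinutes(5));
        for (long minute : new long[] {0, 2, 4, 12, 14}) {
            merger.add(Duration.ofMinutes(minute).toMillis());
        }
        System.out.println(merger.sessions());
    }
}
```

Feeding in a late record at minute 8 after the five events above bridges both sessions and collapses them into one, which is exactly the multi-entry read and rewrite that makes session windows expensive.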
Grace Period: Handling Late Records
In production systems, events routinely arrive after their event timestamps suggest they should have. Mobile clients buffer events during connectivity loss and upload them in bulk. Clocks on different hosts drift apart. Log shipping pipelines introduce processing delays. These late records have timestamps earlier than the current stream time, which is the highest timestamp the application has observed so far.
Without accommodation, a record arriving after its window has closed is simply discarded — the window has already been finalized. Grace period gives each window a buffer after its nominal close time during which late records are still accepted and included in the window's aggregation:
TimeWindows.ofSizeAndGrace(
Duration.ofMinutes(5), // window size
Duration.ofMinutes(2) // grace period: accept late records for 2 more minutes
// after the window's nominal close time
)
Records arriving after the grace period expires are dropped and never reach the aggregation. The number of dropped records is tracked by the dropped-records-total metric in the stream-task-metrics group. Monitor this metric in production. A sustained increase indicates either that your grace period is shorter than your actual late-arrival distribution, or that upstream systems are experiencing significant delays.
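The acceptance rule can be sketched in a few lines of plain Java. This is a model for tumbling windows only, with hypothetical names (`GraceModel`, `offer`); the exact boundary condition (a window stays open while window end plus grace is greater than stream time) is an assumption of the sketch.

```java
import java.time.Duration;

/** Plain-Java model of grace-period handling for tumbling windows (illustrative only). */
final class GraceModel {
    private final long sizeMs;
    private final long graceMs;
    private long streamTimeMs = Long.MIN_VALUE; // highest timestamp observed so far
    private long dropped = 0;

    GraceModel(Duration size, Duration grace) {
        this.sizeMs = size.toMillis();
        this.graceMs = grace.toMillis();
    }

    /** Returns true if the record is still accepted into its window. */
    boolean offer(long timestampMs) {
        // Stream time only moves forward; late records do not rewind it.
        streamTimeMs = Math.max(streamTimeMs, timestampMs);
        long windowEnd = timestampMs - (timestampMs % sizeMs) + sizeMs;
        boolean open = windowEnd + graceMs > streamTimeMs;
        if (!open) {
            dropped++;
        }
        return open;
    }

    long droppedRecords() {
        return dropped;
    }

    public static void main(String[] args) {
        GraceModel model = new GraceModel(Duration.ofMinutes(5), Duration.ofMinutes(2));
        long minute = Duration.ofMinutes(1).toMillis();
        System.out.println(model.offer(6 * minute)); // advances stream time to minute 6
        System.out.println(model.offer(4 * minute)); // late, but window [0,5) is still in grace
        System.out.println(model.offer(7 * minute)); // advances stream time to minute 7
        System.out.println(model.offer(4 * minute)); // late, and grace for [0,5) has expired
    }
}
```

Note that lateness is judged against stream time, not the wall clock: if no new records arrive, stream time does not advance and windows stay open.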
For records that must not be lost even if they arrive extremely late, implement a dead letter queue pattern:
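// Branch based on whether the record is within an acceptable lateness bound.
// split() replaces branch(), which is deprecated since Kafka Streams 2.8.
// Keys of the returned map are the Named prefix plus each Branched name.
Map<String, KStream<String, ClickEvent>> branches = clicks
    .split(Named.as("branch-"))
    .branch((key, event) -> isWithinAcceptableBound(event), Branched.as("normal")) // normal processing
    .defaultBranch(Branched.as("dlq")); // everything else goes to the DLQ
KStream<String, ClickEvent> normalFlow = branches.get("branch-normal");
KStream<String, ClickEvent> dlqFlow = branches.get("branch-dlq");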
dlqFlow.to("clicks-dead-letter-queue",
Produced.with(Serdes.String(), eventSerde));
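The isWithinAcceptableBound helper is left undefined above. One possible shape, as a hedged sketch: compare the event timestamp against a clock and a maximum lateness. Everything here is an assumption for illustration, including the `LatenessBound` class, measuring lateness against the wall clock rather than stream time, and taking a raw timestamp instead of a ClickEvent.

```java
import java.time.Clock;
import java.time.Duration;

/** Hypothetical lateness check for routing records to a dead letter queue (illustrative only). */
final class LatenessBound {
    private final Clock clock;
    private final Duration maxLateness;

    LatenessBound(Clock clock, Duration maxLateness) {
        this.clock = clock;
        this.maxLateness = maxLateness;
    }

    /** True when the event timestamp is no more than maxLateness behind the clock. */
    boolean isWithinAcceptableBound(long eventTimestampMs) {
        long latenessMs = clock.millis() - eventTimestampMs;
        return latenessMs <= maxLateness.toMillis();
    }

    public static void main(String[] args) {
        LatenessBound bound = new LatenessBound(Clock.systemUTC(), Duration.ofMinutes(10));
        long oneHourAgo = System.currentTimeMillis() - Duration.ofHours(1).toMillis();
        System.out.println(bound.isWithinAcceptableBound(oneHourAgo));
    }
}
```

Injecting the Clock keeps the predicate testable with a fixed time source.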
Three Join Types and Their Semantics
KStream-KStream Join: Windowed Stream-Stream Join
Joining two unbounded streams requires a time window constraint. Without it, the state store on each side would grow forever — for every record on the left, you would need to check every record ever seen on the right. The window constrains the join: record A and record B are joined only if they share the same key and their timestamps differ by at most the window duration.
KStream<String, OrderEvent> orders = builder.stream("orders");
KStream<String, PaymentEvent> payments = builder.stream("payments");
// Join orders with payments: same key, timestamps within ±10 minutes
KStream<String, EnrichedOrder> enriched = orders.join(
payments,
// ValueJoiner: how to combine the two sides
(order, payment) -> new EnrichedOrder(order, payment),
// JoinWindows: the time constraint
JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(10)),
StreamJoined.with(Serdes.String(), orderSerde, paymentSerde)
);
Internally, Kafka Streams maintains a windowed state store for each side of the join. When a record arrives on the left, it queries the right state store for records within the window, emits matches, and stores itself for future right-side matches. When a right-side record arrives, it does the symmetric operation.
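The symmetric lookup-then-store behaviour can be modelled in plain Java. The sketch below is an inner join only and uses unbounded in-memory lists where the real stores are windowed and expire old entries; `WindowedJoinModel` and `Rec` are hypothetical names.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of a symmetric windowed inner join (illustrative only). */
final class WindowedJoinModel {
    record Rec(String key, String value, long timestampMs) {}

    private final long windowMs;
    private final List<Rec> leftStore = new ArrayList<>();
    private final List<Rec> rightStore = new ArrayList<>();
    private final List<String> output = new ArrayList<>();

    WindowedJoinModel(Duration window) {
        this.windowMs = window.toMillis();
    }

    /** Left arrival: probe the right store, emit matches, then store for future right arrivals. */
    void onLeft(Rec left) {
        for (Rec right : rightStore) {
            if (matches(left, right)) {
                output.add(left.value() + "+" + right.value());
            }
        }
        leftStore.add(left);
    }

    /** Right arrival: the mirror image of onLeft. */
    void onRight(Rec right) {
        for (Rec left : leftStore) {
            if (matches(left, right)) {
                output.add(left.value() + "+" + right.value());
            }
        }
        rightStore.add(right);
    }

    private boolean matches(Rec a, Rec b) {
        return a.key().equals(b.key())
            && Math.abs(a.timestampMs() - b.timestampMs()) <= windowMs;
    }

    List<String> output() {
        return List.copyOf(output);
    }

    public static void main(String[] args) {
        WindowedJoinModel join = new WindowedJoinModel(Duration.ofMinutes(10));
        join.onLeft(new Rec("o1", "order-1", 0L));
        join.onRight(new Rec("o1", "payment-1", Duration.ofMinutes(5).toMillis()));
        System.out.println(join.output());
    }
}
```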
Three join variants are available with distinct semantics:
- join (inner): Only emits when both sides have a matching record within the window. Records without a match are simply not output.
- leftJoin (left outer): Emits a joined result as soon as a right-side match is found within the window. If no match arrives, the left record is emitted with a null right value. As of Kafka Streams 3.1 that null-padded result is emitted only after the window (plus grace period) has closed; earlier versions emitted it immediately on arrival and then emitted again when a match showed up.
- outerJoin (full outer): Behaves like leftJoin on both sides: an unmatched record from either stream is eventually emitted with null on the side that never arrived.
KStream-KTable Join: Enrich a Stream Against a Table
Joining a stream against a table is the Kafka Streams equivalent of a database lookup against a dimension table. Each stream record is enriched with the current value from the table at the time the record is processed. No time window is needed — the table always represents its latest state.
KStream<String, ClickEvent> clicks = builder.stream("clicks");
// KTable backed by the user-profiles topic — always holds latest profile per user
KTable<String, UserProfile> userProfiles = builder.table("user-profiles");
KStream<String, EnrichedClick> enrichedClicks = clicks.join(
userProfiles,
(click, profile) -> new EnrichedClick(click, profile),
Joined.with(Serdes.String(), clickSerde, profileSerde)
);
The critical behavioral distinction: when the user-profiles table is updated, that update is immediately reflected in joins for new stream records. However, already-processed stream records are not retroactively re-joined. If a user's profile changed between two click events, the first click was enriched with the old profile and the second click is enriched with the new profile.
KStream-KTable join is non-blocking: when a click record arrives, if the user profile table has no entry for that key, a leftJoin emits immediately with a null profile, and an inner join emits nothing. Unlike KStream-KStream join, there is no waiting for the other side within a time window.
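A toy model makes the "lookup at processing time" semantics concrete. The sketch below uses a HashMap as a stand-in for the table and records both the inner-join and left-join outputs side by side; `StreamTableJoinModel` and its methods are hypothetical names, not Kafka Streams API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of a stream-table join (illustrative only). */
final class StreamTableJoinModel {
    private final Map<String, String> table = new HashMap<>();
    private final List<String> innerOutput = new ArrayList<>();
    private final List<String> leftOutput = new ArrayList<>();

    /** A table update only changes state; it never triggers output. */
    void onTableUpdate(String key, String profile) {
        table.put(key, profile);
    }

    /** A stream record is joined against whatever the table holds right now. */
    void onStreamRecord(String key, String click) {
        String profile = table.get(key);
        leftOutput.add(click + "|" + profile); // left join: emits even when profile is null
        if (profile != null) {
            innerOutput.add(click + "|" + profile); // inner join: emits only on a match
        }
    }

    List<String> innerOutput() {
        return List.copyOf(innerOutput);
    }

    List<String> leftOutput() {
        return List.copyOf(leftOutput);
    }

    public static void main(String[] args) {
        StreamTableJoinModel join = new StreamTableJoinModel();
        join.onTableUpdate("u1", "profile-v1");
        join.onStreamRecord("u1", "click-1");
        join.onTableUpdate("u1", "profile-v2"); // does not re-join click-1
        join.onStreamRecord("u1", "click-2");
        System.out.println(join.innerOutput());
    }
}
```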
KTable-KTable Join: Join Two Materialized Tables
When both sides are KTables, the join is re-evaluated whenever either side updates. The output is a new KTable representing the join of the two tables' current values.
KTable<String, UserProfile> profiles = builder.table("user-profiles");
KTable<String, UserSettings> settings = builder.table("user-settings");
// When either table updates, recompute the join with the other table's current value
KTable<String, EnrichedUser> enrichedUsers = profiles.join(
settings,
(profile, setting) -> new EnrichedUser(profile, setting),
Materialized.as("enriched-users")
);
Internally, each Task maintains local state stores for both KTables. When a profiles update arrives, Kafka Streams looks up the current settings value for the same key and emits an updated join result. When a settings update arrives, it looks up the current profiles value and emits again. The output enrichedUsers KTable accumulates these updates and has its own changelog topic.
KTable-KTable join is semantically equivalent to joining two database materialized views where either side can trigger a re-evaluation. It is useful for maintaining denormalized views of data that changes on both sides.
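In contrast to the stream-table case, here an update on either side produces output. A minimal plain-Java model of the inner-join case, with hypothetical names and without tombstone (delete) handling, might look like this:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of a table-table inner join (illustrative only, no deletes). */
final class TableTableJoinModel {
    private final Map<String, String> profiles = new HashMap<>();
    private final Map<String, String> settings = new HashMap<>();
    private final Map<String, String> joined = new HashMap<>();
    private final List<String> changelog = new ArrayList<>();

    void onProfile(String key, String profile) {
        profiles.put(key, profile);
        recompute(key);
    }

    void onSetting(String key, String setting) {
        settings.put(key, setting);
        recompute(key);
    }

    /** Re-evaluates the join for one key using the current value on both sides. */
    private void recompute(String key) {
        String profile = profiles.get(key);
        String setting = settings.get(key);
        if (profile != null && setting != null) {
            String result = profile + "+" + setting;
            joined.put(key, result);
            changelog.add(key + "=" + result); // every change to the result table is logged
        }
    }

    String current(String key) {
        return joined.get(key);
    }

    List<String> changelog() {
        return List.copyOf(changelog);
    }

    public static void main(String[] args) {
        TableTableJoinModel join = new TableTableJoinModel();
        join.onProfile("u1", "p1");  // no output yet: settings side is empty
        join.onSetting("u1", "s1");  // first join result
        join.onProfile("u1", "p2");  // result is recomputed with the current setting
        System.out.println(join.changelog());
    }
}
```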
Exactly-Once v2: End-to-End Exactly-Once Semantics
Why Exactly-Once Is Difficult
At-least-once processing is straightforward: process a record, write the output, commit the offset. If the application crashes before committing the offset, the record is reprocessed on restart, possibly producing duplicate output. For idempotent operations (for example, overwriting a key with the same value), this is acceptable. For non-idempotent operations such as incrementing a counter, it is not.
At-most-once is also simple: commit the offset before processing. If the application crashes during processing, the record is skipped — no duplicates but potential data loss.
Exactly-once requires that even under any failure scenario, each record's processing effects appear exactly once. This demands making "offset commit" and "output write" a single atomic operation. Kafka's transaction API makes this possible.
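The difference between the two commit disciplines can be simulated in plain Java. In the sketch below, which is a toy model and not Kafka code, a single crash is injected after the output for one record has been produced but before its offset is committed. `DeliveryModel` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

/** Toy simulation of at-least-once versus transactional commits (illustrative only). */
final class DeliveryModel {

    /** Output is visible as soon as it is written; the offset is committed afterwards. */
    static List<String> atLeastOnce(List<String> input, int crashAt) {
        List<String> output = new ArrayList<>();
        int committedOffset = 0;
        boolean crashed = false;
        while (committedOffset < input.size()) {
            int i = committedOffset;
            output.add(input.get(i).toUpperCase(Locale.ROOT));
            if (i == crashAt && !crashed) {
                crashed = true;
                continue; // crash before the offset commit: restart from committedOffset
            }
            committedOffset = i + 1;
        }
        return output;
    }

    /** Output is staged and becomes visible only together with the offset commit. */
    static List<String> transactional(List<String> input, int crashAt) {
        List<String> output = new ArrayList<>();
        int committedOffset = 0;
        boolean crashed = false;
        while (committedOffset < input.size()) {
            int i = committedOffset;
            String staged = input.get(i).toUpperCase(Locale.ROOT);
            if (i == crashAt && !crashed) {
                crashed = true;
                continue; // crash aborts the transaction: staged output is discarded
            }
            output.add(staged);      // commit: output and offset advance atomically
            committedOffset = i + 1;
        }
        return output;
    }

    public static void main(String[] args) {
        List<String> input = List.of("a", "b", "c");
        System.out.println("at-least-once: " + atLeastOnce(input, 1));
        System.out.println("transactional: " + transactional(input, 1));
    }
}
```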
How EOS v2 Works
The Kafka Streams transaction cycle looks like this:
// Pseudocode for one StreamThread commit cycle under EOS v2
producer.beginTransaction();
// Process records accumulated since last commit
for (ConsumerRecord record : pendingRecords) {
processingTopology.process(record); // writes to state stores
}
// Flush all output records produced during this cycle
producer.flush();
// Atomically commit: output records become visible AND offsets advance
// sendOffsetsToTransaction links the consumer group's offset commit
// to the producer transaction — they succeed or fail together
producer.sendOffsetsToTransaction(
consumer.currentOffsets(),
consumer.groupMetadata() // includes generation ID for zombie fencing
);
producer.commitTransaction();
// At this point: output is visible AND offsets are committed. Atomic.
EOS v1 created one transactional producer per Task. With many Tasks (e.g., 100 partitions = 100 Tasks = 100 producers), each producer requires its own PID (Producer ID) registered with the broker, and the broker must maintain idempotency state per PID. This created significant coordination overhead.
EOS v2 (Kafka 2.5+) uses one transactional producer per StreamThread, not per Task. If you have 4 StreamThreads, you have 4 producers regardless of the Task count. The consumer's groupMetadata(), which includes the consumer group's generation ID and is passed to sendOffsetsToTransaction, is the key mechanism for zombie fencing: if an old producer (from a crashed instance) tries to commit a transaction after a new instance has taken over the partition, the broker recognizes the stale generation ID and rejects the transaction.
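// Enable EOS v2 (requires Kafka Broker >= 2.5). The EXACTLY_ONCE_V2 constant
// exists from Kafka Streams 3.0; versions 2.6-2.8 call it EXACTLY_ONCE_BETA.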
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
StreamsConfig.EXACTLY_ONCE_V2);
// Tune commit interval: larger values amortize transaction overhead
// but extend the reprocessing window after a crash
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); // default: 100ms for EOS
Performance Comparison
Exactly-once guarantees come with measurable overhead. Benchmark results vary by hardware, topic configuration, and message size, but typical observations:
| Configuration | Relative Throughput | Latency | Best For |
|---|---|---|---|
| at_least_once | 100% (baseline) | Lowest | Idempotent operations, deduplication at consumer |
| exactly_once_v2 | 80–90% | Slightly higher | Financial transactions, billing, deduplication requirements |
| exactly_once (v1) | 70–80% | Highest | Legacy broker compatibility only |
The throughput reduction in EOS v2 comes primarily from:
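- Transaction overhead per commit: each commit cycle needs extra broker round-trips to register the written partitions with the transaction coordinator, send the offsets, and commit the transaction (beginTransaction itself is a local call). At the default EOS commit interval of 100ms this cost is paid ten times per second; if a cycle spends around 20ms on it, that is roughly 20% overhead.
- Reduced batching: Transactions must be committed before their results are visible, which limits opportunistic batching.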
Increasing commit.interval.ms from 100ms to 1000ms cuts the number of transactions per second tenfold. The cost is a longer reprocessing window if the application crashes (up to 1000ms of records may be reprocessed) and higher end-to-end latency, because read_committed consumers see output only after the transaction commits. For most production use cases, 500–1000ms is a good balance.
The Scope Boundary of EOS
Kafka Streams EOS guarantees Kafka-to-Kafka exactly-once processing. Records from an input topic are processed with their effects appearing exactly once in output topics and changelog topics. This eliminates the most common kinds of duplicates: double-counted aggregations, repeated event-driven notifications, and repeated KTable updates.
External system writes (database inserts, REST API calls, S3 file writes) are outside the Kafka transaction boundary. If your process() handler writes to a database and then the application crashes before committing the Kafka transaction, the database write has already happened but the Kafka offset is not committed. On restart, the record is reprocessed and the database is written again: you have a duplicate in the external system despite EOS being enabled in Kafka Streams.
Solving this requires additional mechanisms:
- Idempotent writes: Database upsert with a business-unique key derived from the Kafka record (topic + partition + offset)
- Transactional outbox pattern: Write to a Kafka topic atomically (covered by Kafka transactions), then let a separate service propagate to the external system
- Two-phase commit: Technically possible but impractical due to performance and complexity cost
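The idempotent-write option is the simplest of the three to sketch. Below, a HashMap stands in for a database table with a unique key, and the key is built from the record's topic, partition, and offset so that a replayed record overwrites its own row instead of creating a new one. `IdempotentSink` and its methods are hypothetical names; a real implementation would issue an upsert statement against the actual database.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java model of an idempotent external write keyed by record coordinates (illustrative only). */
final class IdempotentSink {
    // Stand-in for a table with a unique constraint on the record id.
    private final Map<String, String> rows = new HashMap<>();

    /** Builds a key that is stable across reprocessing of the same record. */
    static String recordId(String topic, int partition, long offset) {
        return topic + "-" + partition + "-" + offset;
    }

    /** Upserts the payload; returns true for a first write and false for a replay. */
    boolean upsert(String topic, int partition, long offset, String payload) {
        return rows.put(recordId(topic, partition, offset), payload) == null;
    }

    int rowCount() {
        return rows.size();
    }

    public static void main(String[] args) {
        IdempotentSink sink = new IdempotentSink();
        sink.upsert("clicks", 0, 42L, "user-1 clicked");
        sink.upsert("clicks", 0, 42L, "user-1 clicked"); // replay after a crash
        System.out.println("rows: " + sink.rowCount());
    }
}
```

This works only when reprocessing produces the same write for the same input record; handlers that fan out several external writes per record need a key that also distinguishes those writes.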
Understanding this boundary is what allows you to make accurate guarantees to your users. Kafka Streams EOS is not a silver bullet for all consistency requirements — it is a precisely scoped guarantee that eliminates a specific and common class of duplicates in Kafka-native architectures.