Kafka Streams Architecture: Topology, Threads and State
Kafka Streams Is a Library, Not a Cluster
The single most important thing to understand about Kafka Streams is what it is not: it is not a standalone compute cluster like Apache Flink or Apache Spark Streaming. It is an ordinary Java library, a Maven dependency you add to your application. Your main() method starts it, and you get a distributed, stateful stream processor (with optional exactly-once semantics) running inside your JVM process. No cluster manager, no separate JobManager, no YARN or Mesos required.
This design choice reflects a deliberate philosophy: use Kafka itself as the coordination infrastructure. Partition assignment is handled by Kafka's consumer group protocol. Failure detection uses consumer group heartbeats. State persistence is backed by Kafka changelog topics. Elastic scaling means starting or stopping application instances; Kafka's rebalance redistributes partitions automatically. The entire system has no external dependencies beyond Kafka itself.
The trade-offs are real. There is no unified web UI for monitoring all jobs. There is no fine-grained resource quota enforcement. There is no cross-job scheduling. Kafka Streams deliberately leaves these concerns to the container orchestration layer (Kubernetes) and your existing observability stack (Prometheus, Grafana). Understanding this scope is what lets you choose Kafka Streams confidently for the problems it solves, and choose Flink or Spark when you genuinely need what they offer.
StreamsConfig and StreamsBuilder
Every Kafka Streams application begins with configuration:
Properties props = new Properties();
// Application ID doubles as the consumer group ID
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092");
// Default key/value serialization
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
// Offset commit interval for at-least-once processing
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
// Local directory for RocksDB state stores
props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/kafka-streams");
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
APPLICATION_ID_CONFIG is the most consequential configuration. It serves three purposes simultaneously: the Kafka consumer group ID (controlling which instances form a processing cluster), the prefix for all internal changelog topics (<app-id>-<store-name>-changelog), and the prefix for all internal repartition topics. Any JVM process configured with the same application.id automatically joins the same processing cluster.
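To make the naming convention concrete, here is a minimal sketch that derives internal topic names from an application ID, assuming the <app-id>-<store-name>-changelog and <app-id>-<name>-repartition patterns described above. InternalTopicNames is a hypothetical helper, not part of the Kafka Streams API.

```java
import java.util.List;

// Hypothetical helper mirroring how Kafka Streams prefixes internal topics
// with application.id. Not part of the Kafka Streams API.
final class InternalTopicNames {
    private InternalTopicNames() {}

    // Changelog topic backing a state store: <app-id>-<store-name>-changelog
    static String changelog(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    // Repartition topic inserted after a key change: <app-id>-<name>-repartition
    static String repartition(String applicationId, String baseName) {
        return applicationId + "-" + baseName + "-repartition";
    }

    public static void main(String[] args) {
        String appId = "wordcount-app";
        for (String topic : List.of(
                changelog(appId, "word-count-store"),
                repartition(appId, "word-count-store"))) {
            System.out.println(topic);
        }
    }
}
```

One practical consequence: changing application.id on an existing deployment creates a new consumer group and new, empty internal topics, so the application starts with empty state instead of picking up where the old ID left off.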
StreamsBuilder provides the high-level DSL for defining processing logic:
StreamsBuilder builder = new StreamsBuilder();
// Create a KStream from a Kafka topic
KStream<String, String> textLines = builder.stream("text-input");
// Define processing logic using the fluent DSL
KTable<String, Long> wordCounts = textLines
.flatMapValues(line -> Arrays.asList(line.toLowerCase().split("\\W+")))
.groupBy((key, word) -> word)
.count(Materialized.as("word-count-store"));
// Write results back to Kafka
wordCounts.toStream().to("wordcount-output",
Produced.with(Serdes.String(), Serdes.Long()));
// Build the topology DAG description (no threads are started yet)
Topology topology = builder.build();
// Print the topology graph, invaluable for debugging and documentation
System.out.println(topology.describe());
The Topology object returned by builder.build() is a pure description of the processing DAG. No threads are started and no Kafka connections are made. Printing topology.describe() yields a human-readable graph (abridged, with back-references omitted):
Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [text-input])
      --> KSTREAM-FLATMAPVALUES-0000000001
    Processor: KSTREAM-FLATMAPVALUES-0000000001 (stores: [])
      --> KSTREAM-KEY-SELECT-0000000002
    Processor: KSTREAM-KEY-SELECT-0000000002 (stores: [])
      --> KSTREAM-FILTER-0000000005
    Processor: KSTREAM-FILTER-0000000005 (stores: [])
      --> KSTREAM-SINK-0000000004
    Sink: KSTREAM-SINK-0000000004 (topic: word-count-store-repartition)
   Sub-topology: 1
    Source: KSTREAM-SOURCE-0000000006 (topics: [word-count-store-repartition])
      --> KSTREAM-AGGREGATE-0000000003
    Processor: KSTREAM-AGGREGATE-0000000003 (stores: [word-count-store])
      --> KTABLE-TOSTREAM-0000000007
    ...
The topology is split into two sub-topologies. This split is caused by the groupBy operation, which changes the record key. Because stateful aggregation requires that records with the same key always arrive at the same Task, Kafka Streams must write records through an internal repartition topic after the key change. The framework creates and manages this repartition topic automatically; in Kafka its physical name is prefixed with the application ID (here wordcount-app-word-count-store-repartition). Reading its name from topology.describe() helps diagnose unexpected repartition overhead.
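Why a key change forces a repartition becomes clear from how keys map to partitions. Kafka's default partitioning (which Kafka Streams also uses for repartition topics by default) takes a murmur2 hash of the serialized key modulo the partition count. The sketch below re-implements that hash in plain Java for illustration. Kafka's own copy lives in org.apache.kafka.common.utils.Utils, and KeyPartitioning is a hypothetical class name. A line of text sits in whatever text-input partition its producer chose, while each word extracted from it usually hashes to a different partition, so the record must be rewritten to reach the Task that owns that word.

```java
import java.nio.charset.StandardCharsets;

// Illustrative re-implementation of the murmur2-based key partitioning used by
// Kafka's default partitioner. Class and method names are hypothetical.
final class KeyPartitioning {
    private KeyPartitioning() {}

    // 32-bit murmur2 hash with the seed Kafka uses
    static int murmur2(byte[] data) {
        int length = data.length;
        int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix in the remaining 1-3 tail bytes (intentional fall-through)
        switch (length % 4) {
            case 3:
                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2:
                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Partition for a String key serialized as UTF-8, as StringSerializer does
    static int partitionFor(String key, int numPartitions) {
        byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
        return (murmur2(bytes) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        for (String word : new String[] {"kafka", "streams", "state"}) {
            System.out.printf("%s -> partition %d of 6%n", word, partitionFor(word, 6));
        }
    }
}
```

Because partitionFor depends only on the key bytes and the partition count, every instance computes the same owner for a given word. The same dependence on partition count is why adding partitions to a source topic of a running stateful application breaks the existing key-to-Task mapping.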
StreamThread: The Thread Is the Scheduling Unit
When you call streams.start(), Kafka Streams spawns num.stream.threads (default: 1) StreamThread instances. Each StreamThread is a Java thread that owns:
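- A dedicated KafkaConsumer for polling records from source partitions (plus a separate restore consumer for reading changelogs)
- A dedicated KafkaProducer for writing output records and changelog messages (under at-least-once and exactly_once_v2; the older exactly_once mode used one producer per Task)
- A set of StreamTask instances assigned to it by the partition assignor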
The StreamThread main loop is conceptually a perpetual poll loop:
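// Simplified StreamThread main loop (actual code in StreamThread.java;
// helper names such as taskFor() are illustrative, not the real internals)
while (isRunning()) {
    // 1. Poll Kafka for new records, handling any pending rebalance
    ConsumerRecords<byte[], byte[]> records = consumer.poll(pollTime);
    // 2. Route each partition's records to the Task that owns that partition
    for (TopicPartition partition : records.partitions()) {
        taskFor(partition).addRecords(partition, records.records(partition));
    }
    // 3. Drive each Task to process buffered records
    //    (one record per call here; the real loop runs a batch of iterations)
    for (StreamTask task : activeTasks) {
        task.process();
    }
    // 4. Once commit.interval.ms has elapsed: flush state stores (pushing
    //    cached updates downstream and into the changelog), flush the
    //    producer, and only then commit input offsets, so committed
    //    offsets never run ahead of durable state
    if (shouldCommit()) {
        for (StreamTask task : activeTasks) {
            task.flushState();
        }
        producer.flush();
        commitAll();
    }
}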
It is important to distinguish between multi-threading and multi-instance scaling. Setting num.stream.threads=4 in a single JVM creates 4 StreamThreads within one process, each processing different Tasks in parallel. They share JVM heap, GC pressure, and OS resources. Horizontal scaling means launching multiple separate JVM processes (Docker containers, Kubernetes pods) with the same application.id. Kafka's rebalance protocol redistributes partitions automatically across all participating instances.
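One consequence of the distinction above: total parallelism is capped by the number of Tasks, because each Task is processed by exactly one thread at a time. The sketch below spreads Tasks round-robin across every thread of every instance to show where idle threads come from. ThreadCapacity is a hypothetical class, and round-robin is a deliberate simplification of the real, sticky StreamsPartitionAssignor. Task IDs use the <subtopology>_<partition> form.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of Task-to-thread distribution. The real assignor is
// sticky and lag-aware; round-robin is used here only to show capacity limits.
final class ThreadCapacity {
    private ThreadCapacity() {}

    // Distribute Task IDs over (instances x threadsPerInstance) threads, round-robin
    static Map<String, List<String>> distribute(List<String> tasks,
                                                int instances, int threadsPerInstance) {
        Map<String, List<String>> byThread = new LinkedHashMap<>();
        for (int i = 0; i < instances; i++) {
            for (int t = 0; t < threadsPerInstance; t++) {
                byThread.put("instance-" + i + "/thread-" + t, new ArrayList<>());
            }
        }
        List<String> threads = new ArrayList<>(byThread.keySet());
        for (int n = 0; n < tasks.size(); n++) {
            byThread.get(threads.get(n % threads.size())).add(tasks.get(n));
        }
        return byThread;
    }

    // Threads that received no Task and therefore sit idle
    static long idleThreads(Map<String, List<String>> byThread) {
        return byThread.values().stream().filter(List::isEmpty).count();
    }

    public static void main(String[] args) {
        List<String> tasks = List.of("0_0", "0_1", "0_2", "0_3", "0_4", "0_5");
        Map<String, List<String>> plan = distribute(tasks, 2, 4);
        plan.forEach((thread, assigned) -> System.out.println(thread + " " + assigned));
        System.out.println("idle threads: " + idleThreads(plan));
    }
}
```

With six Tasks and eight threads across two instances, two threads receive nothing; adding threads or pods beyond the Task count adds no throughput.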
StreamTask: The Minimal Unit of Processing
StreamTask is arguably the most fundamental concept in Kafka Streams. Each Task corresponds to a partition group: a set of partitions that must be processed together. Tasks are created per sub-topology. For a sub-topology whose source topic has N partitions there are N Tasks, identified as <subtopology>_<partition> (0_0, 0_1, ...), so the two-sub-topology WordCount graph above gets Tasks for both text-input and the repartition topic. When a sub-topology reads several topics, as in a join, those topics must have the same partition count, and partition i of each is always processed by the same Task.
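A minimal sketch of the Task ID scheme, assuming text-input has 3 partitions. By default the repartition topic typically receives the same partition count, so sub-topology 1 also has 3 Tasks. TaskIds is a hypothetical helper; the real identifiers are TaskId objects that print in the same <subtopology>_<partition> form.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of Task creation: one Task per (sub-topology, partition).
// Names are hypothetical; this is not the Kafka Streams TaskId class.
final class TaskIds {
    private TaskIds() {}

    // partitionsPerSubtopology[i] = partition count of sub-topology i's source topics
    static List<String> taskIds(int[] partitionsPerSubtopology) {
        List<String> ids = new ArrayList<>();
        for (int sub = 0; sub < partitionsPerSubtopology.length; sub++) {
            for (int p = 0; p < partitionsPerSubtopology[sub]; p++) {
                ids.add(sub + "_" + p); // <subtopology>_<partition>
            }
        }
        return ids;
    }

    public static void main(String[] args) {
        // WordCount: text-input (sub-topology 0) and its repartition topic (sub-topology 1)
        System.out.println(taskIds(new int[] {3, 3})); // [0_0, 0_1, 0_2, 1_0, 1_1, 1_2]
    }
}
```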
Each Task owns:
- A private ProcessorContext tracking its processing position
- Its own state store instances (each Task has its own RocksDB instance, in its own subdirectory under state.dir)
- One input buffer (RecordQueue) per input partition, kept in offset order; when several partitions feed the same Task, the Task next processes whichever queue's head record has the smallest timestamp
// Simplified Task processing loop (from StreamTask.java)
public boolean process() {
// Select the record with the smallest timestamp across all input queues
// This implements time-ordered processing across partitions
StampedRecord record = partitionGroup.nextRecord();
if (record == null) return false;
// Inject record metadata into the ProcessorContext
processorContext.setCurrentNode(sourceNode);
processorContext.setRecordContext(record);
// Invoke the source processor, which triggers the full processing chain
sourceNode.process(record.key(), record.value());
return true;
}
The per-Task isolation of state stores is crucial. It means that concurrent Tasks on different threads never share a RocksDB instance, eliminating lock contention on the hot path. It also means state migration is simply a matter of replaying the Task's changelog partition on a new host.
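The timestamp-ordered selection that process() relies on can be modelled with plain queues. This sketch, built around a hypothetical TimestampMerge class, keeps one FIFO queue per input partition and always takes the head with the smallest timestamp. It ignores details of the real PartitionGroup such as max.task.idle.ms, which controls how long a Task waits when one of its partitions has no buffered data.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of a Task's PartitionGroup: one FIFO queue per input
// partition; the next record is the queue head with the smallest timestamp.
final class TimestampMerge {
    record Rec(String partition, long timestamp, String value) {}

    private final Map<String, Deque<Rec>> queues = new LinkedHashMap<>();

    void add(Rec r) {
        queues.computeIfAbsent(r.partition(), k -> new ArrayDeque<>()).addLast(r);
    }

    // Only queue heads are compared, so offset order within a partition is kept.
    // Returns null when every queue is empty.
    Rec next() {
        Deque<Rec> best = null;
        for (Deque<Rec> q : queues.values()) {
            if (!q.isEmpty() && (best == null
                    || q.peekFirst().timestamp() < best.peekFirst().timestamp())) {
                best = q;
            }
        }
        return best == null ? null : best.pollFirst();
    }

    static List<String> demo() {
        TimestampMerge group = new TimestampMerge();
        group.add(new Rec("text-input-0", 5L, "a1"));
        group.add(new Rec("text-input-0", 1L, "a2")); // earlier timestamp, later offset
        group.add(new Rec("text-input-1", 3L, "b1"));
        List<String> order = new ArrayList<>();
        for (Rec r = group.next(); r != null; r = group.next()) {
            order.add(r.value());
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [b1, a1, a2]
    }
}
```

Note that a2 still waits behind a1 despite its earlier timestamp: ordering across partitions follows timestamps, but ordering within a partition always follows offsets.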
StreamsPartitionAssignor: Custom Partition Assignment
Kafka Streams registers StreamsPartitionAssignor as a custom ConsumerPartitionAssignor. During a consumer group rebalance, the Group Leader runs this assignor's assign() method to determine which Tasks go to which instances.
StreamsPartitionAssignor is dramatically more sophisticated than Kafka's built-in RangeAssignor or CooperativeStickyAssignor. Its logic must satisfy several constraints simultaneously:
- Stickiness: Keep Tasks on the instance that already holds their state, avoiding costly state migration. The assignor reads each instance's current Task IDs and their latest changelog offsets from the JoinGroupRequest metadata and prefers assignments that require the least state catch-up.
- Standby placement: A Standby Task must never be on the same instance as its corresponding Active Task; otherwise a single instance failure would lose both the active state and the standby replica.
- Topology awareness: The assignor enforces co-partitioning (topics read by the same sub-topology, such as join inputs, must have matching partition counts) and creates the internal repartition and changelog topics with the partition counts those requirements imply.
Each Kafka Streams instance encodes its current Task ownership and changelog offsets into the metadata bytes of its JoinGroupRequest. The Group Leader's assignor reads this information and produces an assignment that minimizes state movement while satisfying all constraints.
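A toy version of the stickiness and standby-placement rules fits in a few lines. ToyAssignor below is hypothetical and omits what makes the real assignor hard: it ignores load balance, so it would happily put every Task on the single most caught-up instance. Since Kafka 2.6 (KIP-441), the real assignor instead balances Tasks and moves state gradually, using warmup replicas and follow-up probing rebalances.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy assignment model, NOT the StreamsPartitionAssignor algorithm:
// active -> instance with the lowest reported lag for the Task,
// standby -> next-best instance, which is never the active's instance.
final class ToyAssignor {
    record Placement(String active, String standby) {}

    // lagByInstance: instance -> (taskId -> records behind changelog end); absent = no state
    static Map<String, Placement> assign(List<String> tasks,
                                         Map<String, Map<String, Long>> lagByInstance) {
        Map<String, Placement> result = new LinkedHashMap<>();
        for (String task : tasks) {
            List<String> ranked = new ArrayList<>(lagByInstance.keySet());
            // Most caught-up first; missing state counts as infinitely behind;
            // ties broken by instance name for determinism
            ranked.sort(Comparator
                    .comparingLong((String inst) ->
                            lagByInstance.get(inst).getOrDefault(task, Long.MAX_VALUE))
                    .thenComparing(Comparator.<String>naturalOrder()));
            String active = ranked.get(0);
            String standby = ranked.size() > 1 ? ranked.get(1) : null;
            result.put(task, new Placement(active, standby));
        }
        return result;
    }

    static Placement demo() {
        Map<String, Map<String, Long>> lags = Map.of(
                "A", Map.of("0_0", 0L),      // holds fully caught-up state
                "B", Map.of("0_0", 500L),    // holds slightly stale state
                "C", Map.<String, Long>of()); // no local state for 0_0
        return assign(List.of("0_0"), lags).get("0_0");
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```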
Standby Tasks: The Secret to Fast Failover
Stateful stream processing has a fundamental availability problem: if the instance holding a state store crashes, the new instance that takes over must replay the entire changelog from the beginning to rebuild state. For a store with millions of keys and hours of history, this can take minutes. During that time, processing for those partitions is stalled.
Standby Tasks solve this problem. With num.standby.replicas=1, Kafka Streams maintains a shadow copy of each Active Task's state on a different instance. The Standby Task continuously consumes the changelog topic, keeping its local state synchronized with the Active Task โ typically lagging by only seconds.
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
When the Active Task's instance fails, the rebalance assignor detects which instances have Standby state and assigns the orphaned Task to the instance with the most up-to-date Standby state. That instance only needs to replay the changelog delta since its last Standby sync โ typically a few seconds of data. Failover time drops from minutes to seconds.
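The failover claim is easy to sanity-check with arithmetic: restoration time is roughly the number of changelog records to replay divided by restore throughput. The inputs in this sketch are hypothetical round numbers, not measurements. In practice you can observe restore progress with a StateRestoreListener registered through KafkaStreams#setGlobalStateRestoreListener.

```java
// Back-of-the-envelope failover estimate; all inputs are hypothetical.
final class RestoreEstimate {
    private RestoreEstimate() {}

    // Seconds needed to replay a number of changelog records at a given rate
    static double seconds(long recordsToReplay, long restoreRecordsPerSecond) {
        return (double) recordsToReplay / restoreRecordsPerSecond;
    }

    public static void main(String[] args) {
        long changelogRecords = 50_000_000L; // full compacted changelog (hypothetical)
        long standbyLag = 200_000L;          // records the Standby is behind (hypothetical)
        long restoreRate = 100_000L;         // restore throughput in records/s (hypothetical)
        System.out.printf("cold restore: %.0f s%n", seconds(changelogRecords, restoreRate)); // 500 s
        System.out.printf("from standby: %.0f s%n", seconds(standbyLag, restoreRate));       // 2 s
    }
}
```

With these assumed inputs, a cold restore takes 500 seconds, while a Standby that is 200,000 records behind catches up in 2 seconds.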
Standby Tasks consume resources: they read from changelog topics (network bandwidth and extra fetch load on the brokers hosting those partitions), and they maintain local RocksDB instances (disk and memory). The explicit trade-off is resource cost for availability. In production deployments processing critical business data, the cost is usually justified.
State Stores: In-Memory vs RocksDB
Kafka Streams offers two categories of state store:
In-Memory Stores use a HashMap or TreeMap as the underlying data structure. Lookups are very fast: no disk I/O, and no serialization overhead beyond what the Serde requires. The constraints are JVM heap size (large states cause GC pressure and potentially OutOfMemoryErrors) and the fact that all state is lost on JVM restart and must be rebuilt from the changelog. Use in-memory stores for small states (millions of entries, not billions) where latency is critical.
// Explicit in-memory store materialization. Passing an in-memory store
// supplier is what selects the in-memory implementation; Materialized.as()
// with only a store name would create the default persistent (RocksDB) store.
Materialized<String, Long, KeyValueStore<Bytes, byte[]>> inMemStore =
    Materialized.<String, Long>as(
            Stores.inMemoryKeyValueStore("my-in-memory-store"))
        .withKeySerde(Serdes.String())
        .withValueSerde(Serdes.Long())
        .withLoggingEnabled(Collections.emptyMap());
Persistent Stores (RocksDB) are the default for Kafka Streams. RocksDB is an open-source embedded key-value store from Facebook (now Meta), built on an LSM-tree (Log-Structured Merge-tree). Data is stored on local disk in SSTable files. Reads are served from the MemTable (the in-memory write buffer), the Block Cache (an in-memory cache of data blocks read from SSTables), or disk. RocksDB can handle state far larger than available JVM heap, up to terabytes if you have the disk.
RocksDB Tuning
Kafka Streams exposes RocksDB configuration through the RocksDBConfigSetter interface:
import java.util.Arrays;
import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Cache;
import org.rocksdb.CompressionType;
import org.rocksdb.LRUCache;
import org.rocksdb.Options;

public class ProductionRocksDBConfig implements RocksDBConfigSetter {
    // Block Cache: the single most impactful tuning parameter.
    // Kafka Streams' default is 50MB per store, almost always too small for production.
    // The cache holds data blocks read from SSTable files, so hot data
    // that fits in it is served at memory speed.
    // One static cache is shared by every store instance in this JVM. A cache
    // created inside setConfig() would be allocated once per store instance
    // (per Task, and per segment for windowed stores).
    // Size it from the container's native (off-heap) memory budget.
    private static final Cache SHARED_BLOCK_CACHE =
        new LRUCache(512 * 1024 * 1024L); // 512MB

    @Override
    public void setConfig(String storeName, Options options,
                          Map<String, Object> configs) {
        // Start from Kafka Streams' table config so its other defaults are kept
        BlockBasedTableConfig tableConfig =
            (BlockBasedTableConfig) options.tableFormatConfig();
        tableConfig.setBlockCache(SHARED_BLOCK_CACHE);
        // Block size: trade-off between random read efficiency and
        // sequential read overhead. 16KB is a good starting point.
        tableConfig.setBlockSize(16 * 1024L); // 16KB
        // Cache index and filter blocks alongside data blocks so their memory
        // is bounded by the cache; pin L0's blocks so they survive memory
        // pressure, since index misses cost additional disk reads.
        tableConfig.setCacheIndexAndFilterBlocks(true);
        tableConfig.setPinL0FilterAndIndexBlocksInCache(true);
        options.setTableFormatConfig(tableConfig);
        // Write Buffer (MemTable): new writes go here first.
        // When full, the MemTable is flushed to disk as an SSTable.
        // Kafka Streams default: 16MB. Under heavy write load, increase to
        // reduce flush frequency and compaction work.
        // Trade-off: each store instance can hold up to
        // writeBufferSize * maxWriteBufferNumber of native memory.
        options.setWriteBufferSize(64 * 1024 * 1024L); // 64MB
        options.setMaxWriteBufferNumber(3); // one active MemTable, the rest awaiting flush
        // Compression: Snappy offers a good speed/ratio balance.
        // LZ4 is faster with a slightly lower compression ratio.
        // ZSTD achieves the highest compression but uses more CPU.
        // Snappy is a reasonable default unless disk is the bottleneck.
        // (The per-level list below takes precedence for the levels it covers.)
        options.setCompressionType(CompressionType.SNAPPY_COMPRESSION);
        // Skip compression on L0 and L1, which are small and rewritten most
        // often; compress L2 and deeper levels with Snappy.
        options.setCompressionPerLevel(Arrays.asList(
            CompressionType.NO_COMPRESSION,     // Level 0
            CompressionType.NO_COMPRESSION,     // Level 1
            CompressionType.SNAPPY_COMPRESSION, // Level 2+
            CompressionType.SNAPPY_COMPRESSION,
            CompressionType.SNAPPY_COMPRESSION,
            CompressionType.SNAPPY_COMPRESSION,
            CompressionType.SNAPPY_COMPRESSION
        ));
        // Compaction triggers (RocksDB defaults: 4, 20, 36).
        // Raising them absorbs write bursts before writes are throttled,
        // at the cost of more L0 files to check on each read.
        options.setLevel0FileNumCompactionTrigger(10);
        options.setLevel0SlowdownWritesTrigger(20);
        options.setLevel0StopWritesTrigger(40);
        // Target size of Level 1; 256MB is the RocksDB default. Raise it in
        // step with a larger write buffer and L0 trigger.
        options.setMaxBytesForLevelBase(256 * 1024 * 1024L);
    }

    @Override
    public void close(String storeName, Options options) {
        // Do not close SHARED_BLOCK_CACHE here: every store instance shares it
    }
}
// Register the custom config
props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
ProductionRocksDBConfig.class);
The Block Cache deserves special emphasis. RocksDB's read path is MemTable → Block Cache → disk. If the data your queries access most frequently fits in the Block Cache, you get memory-speed reads regardless of total state size. The default 50MB cache is adequate for toy applications and completely inadequate for production workloads with even modest hot-data sizes. To tune this value, monitor the block-cache-data-hit-ratio metric, which Kafka Streams reports per RocksDB store when metrics.recording.level is set to DEBUG.
The Write Buffer affects write amplification and memory use. A write that hits a full MemTable triggers a flush to disk, which may trigger compaction, which reads and rewrites existing SSTable files. Larger write buffers amortize this cost over more writes, but each store instance can hold up to write_buffer_size × max_write_buffer_number of MemTable memory outside the JVM heap. Kafka Streams writes to RocksDB with the write-ahead log (WAL) disabled, because the changelog topic already provides durability.
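Because every store instance gets its own MemTables, RocksDB memory scales with the number of store instances on a host (Tasks × stores, and more for windowed stores, which keep one RocksDB instance per segment). The sketch below is a rough estimate that counts only block cache and write buffers. It assumes the values from ProductionRocksDBConfig and a hypothetical 12 store instances; RocksDbMemory is an illustrative helper.

```java
// Rough native-memory estimate for RocksDB in one Kafka Streams instance.
// Counts only block cache and MemTables; ignores iterators, pinned blocks, etc.
final class RocksDbMemory {
    static final long MB = 1024L * 1024L;

    private RocksDbMemory() {}

    // perStoreCache: block cache size if each store has its own cache (0 if shared)
    static long perStoreInstanceBytes(long perStoreCache, long writeBufferSize, int maxWriteBuffers) {
        return perStoreCache + writeBufferSize * maxWriteBuffers;
    }

    // Total across store instances, plus one shared cache if used (0 otherwise)
    static long totalBytes(int storeInstances, long perStoreBytes, long sharedCache) {
        return storeInstances * perStoreBytes + sharedCache;
    }

    public static void main(String[] args) {
        int storeInstances = 12; // hypothetical: 12 Tasks with one store each
        long unshared = totalBytes(storeInstances,
                perStoreInstanceBytes(512 * MB, 64 * MB, 3), 0);
        long shared = totalBytes(storeInstances,
                perStoreInstanceBytes(0, 64 * MB, 3), 512 * MB);
        System.out.println("per-store caches: " + unshared / MB + " MB"); // 8448 MB
        System.out.println("shared cache:     " + shared / MB + " MB");   // 2816 MB
    }
}
```

With an unshared 512MB cache per store, 12 store instances need roughly 8,448MB; one shared 512MB cache brings the estimate down to roughly 2,816MB. This is why a cache allocated per store inside setConfig() is a common cause of containers exceeding their memory limits.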
Changelog Topics: Durable State Persistence
Every state store (in-memory or RocksDB, unless explicitly disabled) has a corresponding changelog topic in Kafka:
<application-id>-<store-name>-changelog
This topic uses log compaction (cleanup.policy=compact). Kafka's log compaction process periodically scans segments and retains only the most recent value for each key, discarding older versions. This keeps the changelog topic size proportional to the current state size rather than the total historical write volume.
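The effect of compaction on a changelog can be modelled as "last write per key wins, tombstones delete". CompactionModel is a hypothetical sketch that applies this rule to a whole log at once. Real compaction runs in the background on closed segments only and keeps tombstones for delete.retention.ms before dropping them.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of log compaction: keep only the latest value per key, and drop
// keys whose latest record is a tombstone (null value).
final class CompactionModel {
    record Entry(String key, String value) {}

    private CompactionModel() {}

    static Map<String, String> compact(List<Entry> log) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (Entry e : log) {
            if (e.value() == null) {
                latest.remove(e.key());         // tombstone deletes the key
            } else {
                latest.put(e.key(), e.value()); // newer value supersedes older ones
            }
        }
        return latest;
    }

    static Map<String, String> demo() {
        return compact(List.of(
                new Entry("kafka", "1"),
                new Entry("streams", "1"),
                new Entry("kafka", "2"),
                new Entry("streams", null))); // e.g. a store delete() emits a tombstone
    }

    public static void main(String[] args) {
        System.out.println(demo()); // {kafka=2}
    }
}
```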
Every write to a state store produces a corresponding message to the changelog topic:
State store write: key="kafka", value=42
    ↓ (handed to the same StreamThread's producer; the send itself is asynchronous)
Changelog topic write: key="kafka", value=42 → offset 1234
This write-through pattern enables three critical capabilities:
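Fault recovery: When an instance restarts after a crash, Kafka Streams reads the Task's local .checkpoint file to find the last changelog offset reflected in the local store, then replays changelog messages from that point to rebuild state. Only the delta since the last checkpoint needs replaying, not the full history. (Under exactly-once processing, an unclean shutdown invalidates the local state, which is then wiped and restored from the beginning of the changelog.)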
Standby Task synchronization: Standby Tasks subscribe to changelog topics and apply the same writes as Active Tasks, maintaining a near-real-time replica of the state.
Task migration: When a Task moves to a new instance (scaling event or failover), the new instance replays the changelog from the beginning (or from its Standby checkpoint) to reconstruct state before resuming processing.
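All three capabilities rest on one mechanism: a local store plus an append-only log of its writes. The sketch below models that with an in-memory map and a List standing in for one changelog partition. LoggedStore and its methods are hypothetical and are not the Kafka Streams StateStore API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy write-through store: every put is applied locally and appended to a
// changelog; restore() rebuilds a store by replaying from a given offset.
final class LoggedStore {
    record Change(String key, Long value) {} // value == null is a tombstone

    private final Map<String, Long> local = new HashMap<>();
    private final List<Change> changelog; // stand-in for one changelog partition

    LoggedStore(List<Change> changelog) {
        this.changelog = changelog;
    }

    void put(String key, Long value) {
        Change c = new Change(key, value);
        apply(c);
        changelog.add(c); // offset = index in the list
    }

    private void apply(Change c) {
        if (c.value() == null) local.remove(c.key()); else local.put(c.key(), c.value());
    }

    long endOffset() { return changelog.size(); }

    Map<String, Long> contents() { return Map.copyOf(local); }

    // Start from local files (snapshot) and replay the changelog from fromOffset
    static LoggedStore restore(Map<String, Long> snapshot, List<Change> changelog, long fromOffset) {
        LoggedStore store = new LoggedStore(changelog);
        store.local.putAll(snapshot);
        for (long i = fromOffset; i < changelog.size(); i++) {
            store.apply(changelog.get((int) i));
        }
        return store;
    }

    static Map<String, Long> demo(boolean useCheckpoint) {
        List<Change> changelog = new ArrayList<>();
        LoggedStore store = new LoggedStore(changelog);
        store.put("kafka", 1L);
        store.put("streams", 1L);
        long checkpoint = store.endOffset();         // state flushed and checkpointed here
        Map<String, Long> onDisk = store.contents(); // local files as of the checkpoint
        store.put("kafka", 2L);                      // written after the checkpoint, then crash
        LoggedStore recovered = useCheckpoint
                ? restore(onDisk, changelog, checkpoint) // replay only the delta
                : restore(Map.of(), changelog, 0);       // cold restore from offset 0
        return recovered.contents();
    }

    public static void main(String[] args) {
        System.out.println(demo(true).equals(demo(false)));
    }
}
```

Both recovery paths end in the same state; the checkpointed path replays one record, while the cold path replays all three.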
Changelog writes are sent by the StreamThread's embedded producer in batches, decoupled from record processing latency. The overhead is primarily network bandwidth and Kafka storage. If your state can be rebuilt some other way, for example by reprocessing the source topics, you can disable logging, but understand the consequences:
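// Changelog disabled: if the local state directory is lost or the Task moves
// to another instance, the state cannot be restored from Kafka (and Standby
// Tasks have nothing to replicate)
Materialized<String, Long, KeyValueStore<Bytes, byte[]>> noLogStore =
    Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("no-log-store")
        .withLoggingDisabled();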
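You can also override topic-level configs for a store's changelog, such as min.insync.replicas and segment size. (retention.ms only takes effect for windowed stores, whose changelogs use cleanup.policy=compact,delete; a key-value store's changelog is compacted, never deleted by age.)
Materialized<String, Long, KeyValueStore<Bytes, byte[]>> tunedStore =
    Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("tuned-store")
        .withLoggingEnabled(Map.of(
            "min.insync.replicas", "2",
            // Smaller segments let compaction reach recent data sooner,
            // because the active segment is never compacted
            "segment.bytes", String.valueOf(100 * 1024 * 1024) // 100MB segments
        ));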
Complete WordCount Example
Here is a more complete WordCount implementation with an uncaught-exception handler, a state listener, graceful shutdown, and topology inspection:
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.*;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.CountDownLatch;
public class WordCountApp {
public static void main(String[] args) {
Properties config = buildConfig();
Topology topology = buildTopology();
// Always print topology before deploying โ confirms repartition topics
// and sub-topology splits match expectations
System.out.println("=== Topology Description ===");
System.out.println(topology.describe());
KafkaStreams streams = new KafkaStreams(topology, config);
// Thread-level exception handler (available since Kafka Streams 2.8).
// REPLACE_THREAD restarts the failed StreamThread without shutting down
// the entire application; a deterministic bug will crash each replacement too
streams.setUncaughtExceptionHandler(ex -> {
System.err.printf("StreamThread crashed: %s%n", ex.getMessage());
return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse
.REPLACE_THREAD;
});
// State transition listener for observability
streams.setStateListener((newState, oldState) -> {
System.out.printf("[%s] %s -> %s%n",
Thread.currentThread().getName(), oldState, newState);
});
// Graceful shutdown hook
CountDownLatch latch = new CountDownLatch(1);
Runtime.getRuntime().addShutdownHook(new Thread("shutdown-hook") {
@Override
public void run() {
System.out.println("Shutting down, flushing state...");
// Close with timeout: allows in-flight processing to complete
boolean closed = streams.close(Duration.ofSeconds(30));
if (!closed) {
System.err.println("Streams did not close cleanly within 30s");
}
latch.countDown();
}
});
streams.start();
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
static Properties buildConfig() {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-v2");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
"kafka1:9092,kafka2:9092,kafka3:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
props.put(StreamsConfig.STATE_DIR_CONFIG, "/data/kafka-streams");
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
// Custom RocksDB tuning
props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
ProductionRocksDBConfig.class);
return props;
}
static Topology buildTopology() {
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream(
"text-input",
Consumed.with(Serdes.String(), Serdes.String())
.withOffsetResetPolicy(Topology.AutoOffsetReset.EARLIEST)
);
KTable<String, Long> wordCounts = source
// flatMapValues preserves the key, avoiding an early repartition
.flatMapValues(line ->
Arrays.asList(line.toLowerCase().split("\\W+")))
.filter((key, word) -> !word.isEmpty())
// selectKey changes the key, so a repartition is inserted downstream
.selectKey((key, word) -> word)
// groupByKey: key is already the word, no further re-keying
.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
.count(Materialized
.<String, Long, KeyValueStore<Bytes, byte[]>>
as("word-count-store")
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.Long())
// Caching (on by default; enabled explicitly here) batches changelog
// writes, trading a small freshness delay for significantly fewer
// changelog messages and reduced Kafka write pressure
.withCachingEnabled()
.withLoggingEnabled(Map.of(
"min.insync.replicas", "2"
))
);
wordCounts.toStream()
.to("wordcount-output",
Produced.with(Serdes.String(), Serdes.Long()));
return builder.build();
}
}
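Before deploying, it helps to know what the topology should produce. The sketch below is a plain-Java model of the same per-line logic (split, drop empty tokens, count per word) with no Kafka involved; WordCountModel is hypothetical. To test the real topology without a broker, Kafka ships TopologyTestDriver in the kafka-streams-test-utils artifact.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of the WordCount topology's logic (no Kafka involved):
// flatMapValues(split) -> filter(non-empty) -> re-key by word -> count.
final class WordCountModel {
    private WordCountModel() {}

    static Map<String, Long> count(List<String> lines) {
        Map<String, Long> counts = new TreeMap<>();
        for (String line : lines) {
            Arrays.stream(line.toLowerCase().split("\\W+"))
                  .filter(word -> !word.isEmpty()) // same guard as the topology's filter
                  .forEach(word -> counts.merge(word, 1L, Long::sum));
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(count(List.of("Kafka Streams", "kafka, state!", "")));
    }
}
```

The model gives only final totals; the real topology emits a stream of updates (kafka=1, then kafka=2), some of which the record cache may coalesce before they reach wordcount-output.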
Interactive Queries: Reading State Without Kafka Round-Trips
Kafka Streams allows application code to read state stores directly, without routing queries through Kafka:
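// Query local state (only partitions assigned to this instance).
// Throws InvalidStateStoreException while the store is not queryable,
// e.g. during a rebalance or before the instance reaches RUNNING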
ReadOnlyKeyValueStore<String, Long> store = streams.store(
StoreQueryParameters.fromNameAndType(
"word-count-store",
QueryableStoreTypes.keyValueStore()
)
);
// Served locally (record cache or RocksDB), typically sub-millisecond;
// null if the key is absent here, including when another instance owns it
Long count = store.get("kafka");
// Determine which instance owns a given key (for routing in a microservice mesh)
KeyQueryMetadata metadata = streams.queryMetadataForKey(
"word-count-store",
"kafka",
Serdes.String().serializer()
);
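// metadata.activeHost() -> HostInfo(host, port) of the owning instance.
// This requires every instance to set application.server
// (StreamsConfig.APPLICATION_SERVER_CONFIG) to its own host:port.
// Your HTTP layer forwards the request there via internal RPC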
This pattern is the foundation for CQRS architectures built on Kafka Streams: the stream processor writes to state stores, while HTTP endpoints serve reads directly from those stores โ all within the same JVM process.
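The routing decision itself is simple once ownership metadata is available. In this sketch, QueryRouter and HostPort are hypothetical. The activeHostFor function stands in for a lookup built on KafkaStreams#queryMetadataForKey(...).activeHost(), with null modelling KeyQueryMetadata.NOT_AVAILABLE during a rebalance.

```java
import java.util.Map;
import java.util.function.Function;

// Hypothetical request router for interactive queries: serve locally,
// forward to the owning instance, or ask the caller to retry.
final class QueryRouter {
    record HostPort(String host, int port) {}

    private final HostPort self;
    // Stand-in for queryMetadataForKey(...).activeHost(); null = not available
    private final Function<String, HostPort> activeHostFor;

    QueryRouter(HostPort self, Function<String, HostPort> activeHostFor) {
        this.self = self;
        this.activeHostFor = activeHostFor;
    }

    String route(String key) {
        HostPort owner = activeHostFor.apply(key);
        if (owner == null) {
            return "retry"; // metadata unavailable, e.g. mid-rebalance
        }
        if (owner.equals(self)) {
            return "local"; // serve from this instance's state store
        }
        return "forward:" + owner.host() + ":" + owner.port(); // another instance owns the key
    }

    static QueryRouter demo() {
        HostPort a = new HostPort("streams-a", 7070);
        HostPort b = new HostPort("streams-b", 7070);
        Map<String, HostPort> owners = Map.of("kafka", a, "state", b);
        return new QueryRouter(a, owners::get);
    }

    public static void main(String[] args) {
        QueryRouter router = demo();
        for (String key : new String[] {"kafka", "state", "missing"}) {
            System.out.println(key + " -> " + router.route(key));
        }
    }
}
```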
When to Choose Kafka Streams
Kafka Streams excels when your team already operates Kafka and wants stream processing without adopting an additional system to run and monitor. It is the natural fit for microservice architectures where each service needs its own stateful processing logic, for Spring Boot applications that want to embed stream processing alongside REST endpoints, and for workloads where state fits comfortably on a single machine (from gigabytes to a few hundred gigabytes per Task).
When state grows into the terabyte range per Task, when you need unified resource management across dozens of jobs, or when you need more sophisticated out-of-order handling than Kafka Streams provides, Apache Flink becomes the better choice. Knowing Kafka Streams' architecture (how Tasks map to partitions, how RocksDB stores state, how changelogs enable recovery) is the prerequisite for making that call with confidence.