Chapter 25

ksqlDB: Real-Time Stream Processing with SQL

What ksqlDB Is

ksqlDB is a SQL layer built on top of Kafka Streams. It maps your Kafka topics to queryable "streams" and "tables," letting you filter, aggregate, join, and window your real-time data using familiar SQL syntax, with no Java code required. Every persistent query you submit (a CREATE STREAM ... AS SELECT or CREATE TABLE ... AS SELECT statement) is compiled by ksqlDB's query engine into a Kafka Streams topology, which runs continuously in the background.

Understanding ksqlDB as "a SQL compiler for Kafka Streams" rather than a separate stream processing engine is fundamental to using it correctly. Its performance characteristics are essentially those of Kafka Streams: state store I/O, Kafka network bandwidth, and JVM garbage collection are the usual bottlenecks. Its tuning levers are the same too: RocksDB configuration, num.stream.threads (passed through as ksql.streams.num.stream.threads), and horizontal scaling by adding ksqlDB server instances that share the same ksql.service.id.

In interactive mode, ksqlDB records every statement that defines a stream, table, or query in an internal Kafka topic called the command topic (_confluent-ksql-<service.id>_command_topic, where <service.id> is the server's ksql.service.id setting). When a ksqlDB server restarts, it replays this topic to restore all running queries automatically, a self-healing property that mirrors how Kafka Streams uses Kafka for coordination.
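The restart behavior can be pictured as replaying an ordered log. The Python sketch below is a toy model under stated assumptions, not ksqlDB's implementation: a plain list of SQL strings stands in for the command topic, QueryRegistry and replay are hypothetical names, and the regex only recognizes CREATE and DROP of streams and tables. Re-applying the same statements in the same order always rebuilds the same catalog, which is what makes the restart self-healing.

```python
import re
from dataclasses import dataclass, field

# Hypothetical, simplified parser: CREATE [OR REPLACE] [SOURCE] STREAM|TABLE [IF NOT EXISTS] name
# and DROP STREAM|TABLE [IF EXISTS] name. Any other statement is ignored in this toy model.
_STATEMENT = re.compile(
    r"\s*(CREATE|DROP)\s+(?:OR\s+REPLACE\s+)?(?:SOURCE\s+)?(?:STREAM|TABLE)\s+"
    r"(?:IF\s+(?:NOT\s+)?EXISTS\s+)?([A-Za-z_][A-Za-z0-9_]*)",
    re.IGNORECASE,
)


@dataclass
class QueryRegistry:
    """Hypothetical stand-in for the server's catalog: source name -> defining statement."""
    sources: dict = field(default_factory=dict)

    def apply(self, statement: str) -> None:
        match = _STATEMENT.match(statement)
        if match is None:
            return
        verb, name = match.group(1).upper(), match.group(2).upper()  # unquoted names are upper-cased
        if verb == "CREATE":
            self.sources[name] = statement
        else:
            self.sources.pop(name, None)


def replay(command_log: list) -> QueryRegistry:
    """Rebuild the catalog by re-applying every logged statement in its original order."""
    registry = QueryRegistry()
    for statement in command_log:
        registry.apply(statement)
    return registry


log = [
    "CREATE STREAM orders (id INT) WITH (KAFKA_TOPIC='orders', VALUE_FORMAT='AVRO');",
    "CREATE TABLE user_totals AS SELECT user_id, COUNT(*) FROM orders GROUP BY user_id EMIT CHANGES;",
    "CREATE STREAM big_orders AS SELECT * FROM orders WHERE amount > 100 EMIT CHANGES;",
    "DROP STREAM big_orders;",
]
print(sorted(replay(log).sources))  # ['ORDERS', 'USER_TOTALS']
```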

Two Deployment Modes

Interactive Mode

In interactive mode, the ksqlDB server exposes a REST API and WebSocket endpoints that accept dynamically submitted SQL statements. The ksqlDB CLI connects to these endpoints:

# Connect to ksqlDB server via CLI
ksql http://ksqldb-server:8088

# Or submit statements directly via REST
curl -X POST http://ksqldb-server:8088/ksql \
  -H "Content-Type: application/vnd.ksql.v1+json" \
  -d '{"ksql": "SHOW STREAMS;", "streamsProperties": {}}'

Interactive mode is appropriate for development, debugging, and interactive data exploration. Queries can be created, modified, and dropped at runtime. The cost is that changes are not version-controlled unless you explicitly track them.

Headless Mode

In headless mode, the ksqlDB server loads a pre-defined SQL file at startup (--queries-file), executes all statements, and runs continuously. The REST API is either disabled or restricted to monitoring endpoints.

# Start in headless mode
ksql-server-start /etc/ksqldb/ksqldb-server.properties \
  --queries-file /etc/ksqldb/production-queries.sql

Headless mode is designed for production CI/CD deployments. The SQL file is version-controlled, deployed through your pipeline, and rolled back by reverting the file and redeploying. Queries are immutable at runtime: modifying a query requires updating the file and restarting the server, just as changing application code requires a new deployment. This constraint is a feature in production: it prevents ad-hoc changes that bypass review processes.

STREAM vs TABLE: The Core Semantic Distinction

The most important conceptual choice in ksqlDB is whether to declare a topic as a STREAM or a TABLE.

A STREAM is an unbounded sequence of events. Every message in the topic is an independent event, and the complete history of the topic (within its retention period) is part of the stream. Streams have no concept of "current state"; they are a record of everything that ever happened.

A TABLE is a materialized view of the latest value per key. It consumes a Kafka topic and maintains, for each key, only the most recent value. Tables have current state โ€” semantically similar to a database table.
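Reading the same keyed records both ways makes the distinction concrete. The following Python sketch is only an illustration (the record layout and function names are hypothetical): read as a stream, every record survives; read as a table, each key keeps only its latest value, and a record with a null value (a tombstone) removes the key.

```python
from typing import Dict, List, Optional, Tuple

Record = Tuple[str, Optional[dict]]  # (key, value); a None value is a tombstone


def read_as_stream(records: List[Record]) -> List[Record]:
    """STREAM semantics: every record is an independent event, so all of them are kept."""
    return list(records)


def read_as_table(records: List[Record]) -> Dict[str, dict]:
    """TABLE semantics: only the latest value per key survives; a tombstone removes the key."""
    table: Dict[str, dict] = {}
    for key, value in records:
        if value is None:
            table.pop(key, None)
        else:
            table[key] = value
    return table


records = [
    ("user-1", {"country": "DE"}),
    ("user-2", {"country": "FR"}),
    ("user-1", {"country": "US"}),  # newer value for user-1
    ("user-2", None),               # tombstone: user-2 is deleted
]
print(len(read_as_stream(records)))  # 4
print(read_as_table(records))        # {'user-1': {'country': 'US'}}
```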

-- STREAM declaration: map the orders topic's message schema
-- Every new order event becomes a row in this stream
CREATE STREAM orders (
    id         INT,
    amount     DECIMAL(10, 2),
    user_id    INT,
    status     VARCHAR,
    created_at BIGINT   -- event timestamp in milliseconds
) WITH (
    KAFKA_TOPIC  = 'orders',
    VALUE_FORMAT = 'AVRO',
    TIMESTAMP    = 'created_at'   -- use created_at as event time
);

-- TABLE: aggregate orders stream into per-user totals
-- This creates a continuously running Kafka Streams topology
-- Results are written to a new Kafka topic (USER_TOTALS)
-- and materialized for Pull Query access
CREATE TABLE user_totals AS
    SELECT
        user_id,
        SUM(amount)     AS total_amount,
        COUNT(*)        AS order_count,
        MAX(created_at) AS last_order_time
    FROM orders
    WHERE status = 'COMPLETED'
    GROUP BY user_id
EMIT CHANGES;

The CREATE TABLE ... AS SELECT statement creates a persistent query that runs until it is explicitly terminated, continuously updating the USER_TOTALS table as new COMPLETED orders arrive. The underlying Kafka Streams topology has a RocksDB state store for the aggregation and a changelog topic for fault tolerance.
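The record-by-record behavior of this persistent query can be modeled in a few lines of Python. This is a sketch under simplifying assumptions, not code that ksqlDB generates: orders are plain dicts, a dict stands in for the RocksDB state store, the function name is hypothetical, and record caching (which in a real deployment can coalesce several updates for the same key into one output row) is ignored.

```python
from decimal import Decimal


def user_totals_changelog(orders):
    """Model of the user_totals query: WHERE status = 'COMPLETED', GROUP BY user_id,
    SUM(amount), COUNT(*), MAX(created_at). Yields (user_id, row) for every update,
    i.e. the changelog that would be written to the USER_TOTALS topic."""
    state = {}  # stands in for the RocksDB state store
    for order in orders:
        if order["status"] != "COMPLETED":
            continue  # the WHERE filter runs before the aggregation
        user_id = order["user_id"]
        prev = state.get(user_id)
        if prev is None:
            row = {"total_amount": order["amount"], "order_count": 1,
                   "last_order_time": order["created_at"]}
        else:
            row = {"total_amount": prev["total_amount"] + order["amount"],
                   "order_count": prev["order_count"] + 1,
                   "last_order_time": max(prev["last_order_time"], order["created_at"])}
        state[user_id] = row
        yield user_id, row


orders = [
    {"id": 1, "amount": Decimal("10.00"), "user_id": 42, "status": "COMPLETED", "created_at": 1_000},
    {"id": 2, "amount": Decimal("99.99"), "user_id": 42, "status": "PENDING",   "created_at": 2_000},
    {"id": 3, "amount": Decimal("2.50"),  "user_id": 42, "status": "COMPLETED", "created_at": 900},
]
for user_id, row in user_totals_changelog(orders):
    print(user_id, row)
# 42 {'total_amount': Decimal('10.00'), 'order_count': 1, 'last_order_time': 1000}
# 42 {'total_amount': Decimal('12.50'), 'order_count': 2, 'last_order_time': 1000}
```

The third order is processed last but carries an earlier created_at, so last_order_time stays at 1000: MAX tracks the largest event time seen, not the most recently processed order.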

DDL in Depth

Streams from Existing Topics

-- Map an existing topic without modifying it
CREATE STREAM raw_clicks (
    user_id  VARCHAR KEY,   -- KEY marks this as the Kafka message key
    url      VARCHAR,
    referrer VARCHAR,
    device   VARCHAR,
    ts       BIGINT
) WITH (
    KAFKA_TOPIC  = 'raw-clicks',
    VALUE_FORMAT = 'JSON',
    PARTITIONS   = 12    -- if topic doesn't exist, ksqlDB creates it
);

-- Derived stream: filter to mobile clicks only
-- Creates a new Kafka topic 'MOBILE_CLICKS' automatically
CREATE STREAM mobile_clicks AS
    SELECT
        user_id,
        url,
        TIMESTAMPTOSTRING(ts, 'yyyy-MM-dd HH:mm:ss', 'UTC') AS click_time
    FROM raw_clicks
    WHERE device = 'mobile'
EMIT CHANGES;

Windowed Aggregations

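ksqlDB supports three window types for aggregations, each expressed through SQL syntax: tumbling, hopping, and session windows (windowed stream-stream joins use a separate WITHIN clause):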

-- Tumbling window: click count per user per 5-minute bucket
CREATE TABLE clicks_per_user_5min AS
    SELECT
        user_id,
        COUNT(*)    AS click_count,
        WINDOWSTART AS window_start,   -- built-in: window start epoch ms
        WINDOWEND   AS window_end      -- built-in: window end epoch ms
    FROM raw_clicks
        WINDOW TUMBLING (SIZE 5 MINUTES, GRACE PERIOD 1 MINUTE)
    GROUP BY user_id
EMIT CHANGES;

-- Hopping window: 1-hour window, advancing every 15 minutes
CREATE TABLE hourly_click_trend AS
    SELECT
        user_id,
        COUNT(*) AS click_count
    FROM raw_clicks
        WINDOW HOPPING (SIZE 1 HOUR, ADVANCE BY 15 MINUTES,
                        GRACE PERIOD 5 MINUTES)
    GROUP BY user_id
EMIT CHANGES;

-- Session window: group activity with up to 30 minutes of inactivity
CREATE TABLE user_sessions AS
    SELECT
        user_id,
        COUNT(*)    AS actions_in_session,
        WINDOWSTART AS session_start,
        WINDOWEND   AS session_end
    FROM raw_clicks
        WINDOW SESSION (30 MINUTES, GRACE PERIOD 5 MINUTES)
    GROUP BY user_id
EMIT CHANGES;
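The window arithmetic behind these three clauses is small enough to write out. The Python functions below sketch the assignment rules as Kafka Streams applies them: epoch-aligned tumbling and hopping windows, sessions that keep growing while gaps stay within the inactivity limit, and windows that close once stream time reaches window end plus grace. The function names are hypothetical and all timestamps are epoch milliseconds.

```python
MINUTE = 60_000  # window arithmetic in epoch milliseconds


def tumbling_window(ts, size):
    """The single [start, end) window of length `size` that contains event time ts."""
    start = ts - ts % size
    return (start, start + size)


def hopping_windows(ts, size, advance):
    """Every [start, end) window of length `size`, starting every `advance` ms, that contains ts."""
    latest_start = ts - ts % advance
    starts = range(latest_start, ts - size, -advance)  # walk back while the window still covers ts
    return sorted((s, s + size) for s in starts if s >= 0)


def session_windows(timestamps, gap):
    """Split one key's event times into sessions separated by more than `gap` of inactivity.
    Each session is (first event time, last event time)."""
    sessions = []
    for ts in sorted(timestamps):
        if sessions and ts - sessions[-1][1] <= gap:
            sessions[-1] = (sessions[-1][0], ts)  # within the gap: extend the current session
        else:
            sessions.append((ts, ts))             # gap exceeded: start a new session
    return sessions


def window_still_open(window_end, grace, stream_time):
    """A window keeps accepting late records until stream time reaches window_end + grace."""
    return stream_time < window_end + grace


print(tumbling_window(7 * MINUTE, 5 * MINUTE))                          # (300000, 600000)
print(len(hopping_windows(100 * MINUTE, 60 * MINUTE, 15 * MINUTE)))     # 4
print(session_windows([0, 10 * MINUTE, 45 * MINUTE], 30 * MINUTE))      # [(0, 600000), (2700000, 2700000)]
print(window_still_open(10 * MINUTE, 1 * MINUTE, 10 * MINUTE + 30_000))  # True
```

With SIZE 1 HOUR and ADVANCE BY 15 MINUTES, each click is therefore counted in four overlapping hopping windows.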

Push Queries vs Pull Queries

This is ksqlDB's most distinctive feature relative to traditional databases: two fundamentally different query execution modes.

Push Queries: Continuous Streaming Results

A push query runs indefinitely, streaming results to the client as they change. The server maintains an open connection and pushes new result rows whenever the underlying stream or table updates. Push queries are identified by the EMIT CHANGES clause.

-- Push query: stream real-time click counts as they update
SELECT user_id, click_count, window_start
FROM clicks_per_user_5min
EMIT CHANGES;

Push queries are appropriate for real-time dashboards, WebSocket feeds, alerting systems, and any use case where you want to react to changes as they happen. The client receives a stream of JSON objects over a persistent HTTP connection:

# Push query via REST API: the connection stays open and results stream in
curl -N -X POST http://ksqldb-server:8088/query-stream \
  -H "Accept: application/vnd.ksqlapi.delimited.v1" \
  -d '{
    "sql": "SELECT user_id, click_count FROM clicks_per_user_5min EMIT CHANGES;",
    "properties": {}
  }'

# Streaming response (newline-delimited JSON): one header line, then one array per row
# {"queryId":"...","columnNames":["USER_ID","CLICK_COUNT"],"columnTypes":["STRING","BIGINT"]}
# ["user-123",47]
# ["user-456",12]
# ... continues indefinitely

Pull Queries: Point-in-Time Lookups

A pull query executes once, returns the current state of a materialized table, and closes. The semantics are identical to a traditional database SELECT: request, response, done. Because a key lookup reads directly from a local state store, latency is typically in the low milliseconds.

-- Pull query: look up a specific user's current total (no EMIT CHANGES)
SELECT user_id, total_amount, order_count
FROM user_totals
WHERE user_id = 42;

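Pull queries are designed for materialized tables, such as those produced by CREATE TABLE ... AS SELECT aggregations, where ksqlDB can look a key up directly in a state store. Recent ksqlDB versions also accept pull queries against streams, but these scan the underlying topic instead of performing a keyed lookup, so they are no substitute for a materialized table when low latency matters.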

ksqlDB uses Kafka Streams' Interactive Queries mechanism to serve pull queries. If the requested key belongs to a partition held by a different ksqlDB server instance, the receiving instance automatically routes the query via internal HTTP to the correct instance and returns the result. This is completely transparent to the client.

# Pull query via REST
curl -X POST http://ksqldb-server:8088/query \
  -H "Content-Type: application/vnd.ksql.v1+json" \
  -d '{
    "ksql": "SELECT user_id, total_amount FROM user_totals WHERE user_id = 42;",
    "streamsProperties": {}
  }'
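The routing decision described above comes down to which partition a key hashes to. Kafka Streams exposes this through its metadata API (KafkaStreams#queryMetadataForKey); the Python sketch below reproduces only the arithmetic, assuming Kafka's default partitioner (murmur2 over the serialized key) and an INT key serialized as four big-endian bytes. The partition-to-server map, the host names, and route_pull_query are hypothetical.

```python
import struct

MASK32 = 0xFFFFFFFF


def murmur2(data: bytes) -> int:
    """Port of Kafka's Utils.murmur2 (used by the default partitioner), as an unsigned 32-bit int."""
    m, r = 0x5BD1E995, 24
    h = (0x9747B28C ^ len(data)) & MASK32
    n_blocks = len(data) // 4
    for i in range(n_blocks):
        k = int.from_bytes(data[4 * i:4 * i + 4], "little")
        k = (k * m) & MASK32
        k ^= k >> r
        k = (k * m) & MASK32
        h = ((h * m) & MASK32) ^ k
    tail = data[4 * n_blocks:]
    if len(tail) == 3:
        h ^= tail[2] << 16
    if len(tail) >= 2:
        h ^= tail[1] << 8
    if len(tail) >= 1:
        h ^= tail[0]
        h = (h * m) & MASK32
    h ^= h >> 13
    h = (h * m) & MASK32
    h ^= h >> 15
    return h


def partition_for(key_bytes: bytes, num_partitions: int) -> int:
    """Kafka's default partitioner for keyed records: toPositive(murmur2(key)) % partitions."""
    return (murmur2(key_bytes) & 0x7FFFFFFF) % num_partitions


def route_pull_query(user_id: int, num_partitions: int, owners: dict, local_host: str) -> str:
    """Hypothetical routing step: answer locally or name the peer that owns the key's partition.
    Assumes an INT key serialized as 4 big-endian bytes (Kafka's IntegerSerializer)."""
    partition = partition_for(struct.pack(">i", user_id), num_partitions)
    owner = owners[partition]
    return "local" if owner == local_host else owner


# Hypothetical two-node cluster owning alternate partitions of a 12-partition table
owners = {p: ("http://ksqldb-a:8088" if p % 2 == 0 else "http://ksqldb-b:8088") for p in range(12)}
print(route_pull_query(42, 12, owners, local_host="http://ksqldb-a:8088"))
```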

Joins in ksqlDB

Stream-Stream Join

-- Join orders and payments within a 10-minute window
CREATE STREAM order_payments AS
    SELECT
        o.id              AS order_id,
        o.amount          AS order_amount,
        p.payment_id,
        p.method          AS payment_method,
        p.processed_at    AS payment_time
    FROM orders o
        INNER JOIN payments p
            WITHIN 10 MINUTES GRACE PERIOD 2 MINUTES
            ON o.id = p.order_id
EMIT CHANGES;

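The WITHIN 10 MINUTES GRACE PERIOD 2 MINUTES clause maps to Kafka Streams' JoinWindows.ofTimeDifferenceAndGrace(Duration.ofMinutes(10), Duration.ofMinutes(2)). Records on both sides are buffered in state stores for the window duration plus the grace period. A match is emitted when a record finds a record with the same join key on the other side whose timestamp is no more than 10 minutes before or after its own; records that arrive after the window and grace period have closed are dropped.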
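The matching rule can be modeled directly. The Python sketch below is a simplified illustration, not Kafka Streams' join implementation: events arrive in a single hypothetical list tagged left or right, buffered records are never evicted, and the late-record cutoff (stream time past ts + window + grace) approximates the real retention logic.

```python
MINUTE = 60_000


def windowed_inner_join(events, within_ms, grace_ms):
    """Simplified model of INNER JOIN ... WITHIN <within> GRACE PERIOD <grace>.

    events: arrival-ordered (side, key, ts, value) tuples; side is "left" or "right".
    Returns (key, left_value, right_value) for each pair of same-key records whose
    timestamps differ by at most within_ms.
    """
    buffers = {"left": [], "right": []}  # stand-ins for the two join state stores
    stream_time = None
    matches = []
    for side, key, ts, value in events:
        stream_time = ts if stream_time is None else max(stream_time, ts)
        if ts + within_ms + grace_ms < stream_time:
            continue  # arrived after the window and grace period closed: dropped
        other = "right" if side == "left" else "left"
        for other_key, other_ts, other_value in buffers[other]:
            if other_key == key and abs(ts - other_ts) <= within_ms:
                pair = (value, other_value) if side == "left" else (other_value, value)
                matches.append((key,) + pair)
        buffers[side].append((key, ts, value))
    return matches


events = [
    ("left", "o1", 0, "order-1"),
    ("right", "o1", 5 * MINUTE, "payment-1"),   # 5 minutes later: joins
    ("left", "o2", 1 * MINUTE, "order-2"),
    ("right", "o2", 20 * MINUTE, "payment-2"),  # 19 minutes apart: outside WITHIN 10 MINUTES
    ("left", "o3", 2 * MINUTE, "order-3"),      # stream time is already 20 minutes: dropped
]
print(windowed_inner_join(events, 10 * MINUTE, 2 * MINUTE))
# [('o1', 'order-1', 'payment-1')]
```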

Stream-Table Join: Dimension Enrichment

-- Create a dimension table for users
CREATE TABLE users (
    user_id  INT PRIMARY KEY,
    name     VARCHAR,
    country  VARCHAR,
    tier     VARCHAR
) WITH (
    KAFKA_TOPIC  = 'users',
    VALUE_FORMAT = 'AVRO'
);

-- Enrich each order with user information at processing time
CREATE STREAM enriched_orders AS
    SELECT
        o.id          AS order_id,
        o.amount,
        o.status,
        u.name        AS user_name,
        u.country     AS user_country,
        u.tier        AS user_tier
    FROM orders o
        LEFT JOIN users u ON o.user_id = u.user_id
EMIT CHANGES;

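The LEFT JOIN ensures orders are still emitted even if the user record hasn't arrived yet (the user fields will be null). Each order is joined against the users table as it stands when the order is processed: if a user updates their country between two orders, the first order gets the old country and the second gets the new one. Updates to the users table on their own emit nothing. Because users is keyed by user_id, ksqlDB repartitions the orders stream by user_id before the join, and both sides must have the same number of partitions.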
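The timing behavior of a stream-table join is easy to get wrong, so here it is as a small Python model (the event tuples and the enrich_orders name are hypothetical). Each order is looked up against the table as it stands when the order is processed; a table update changes state but produces no output of its own.

```python
def enrich_orders(events):
    """events: arrival-ordered ("user", user_id, row_or_None) and ("order", order_dict) items.
    Models orders LEFT JOIN users: each order is joined against the table as it stands
    at that moment; table updates change state but emit nothing."""
    users = {}   # latest row per user_id (the table side)
    output = []  # the enriched_orders stream
    for event in events:
        if event[0] == "user":
            _, user_id, row = event
            if row is None:
                users.pop(user_id, None)  # tombstone
            else:
                users[user_id] = row
        else:
            order = event[1]
            user = users.get(order["user_id"])  # None when no user row exists (LEFT JOIN)
            output.append({
                "order_id": order["id"],
                "user_country": user["country"] if user else None,
            })
    return output


events = [
    ("order", {"id": 1, "user_id": 42}),             # user not known yet
    ("user", 42, {"name": "Ana", "country": "DE"}),
    ("order", {"id": 2, "user_id": 42}),
    ("user", 42, {"name": "Ana", "country": "US"}),  # update: no output by itself
    ("order", {"id": 3, "user_id": 42}),
]
for row in enrich_orders(events):
    print(row)
# {'order_id': 1, 'user_country': None}
# {'order_id': 2, 'user_country': 'DE'}
# {'order_id': 3, 'user_country': 'US'}
```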

Table-Table Join

-- Join two tables: recomputes when either side updates
CREATE TABLE enriched_user_profiles AS
    SELECT
        b.user_id,
        b.name,
        b.email,
        s.preferred_currency,
        s.notification_enabled,
        s.theme
    FROM user_basic_info b
        INNER JOIN user_settings s ON b.user_id = s.user_id
EMIT CHANGES;
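Unlike a stream-table join, a table-table join produces output when either side changes. A minimal Python model, assuming the joined row is simply the two rows merged and ignoring record caching (the side labels and the function name are hypothetical): both tables are kept as latest-value maps, an update on either side re-emits the joined row for that key, and a deletion retracts it with a null.

```python
def table_table_inner_join(updates):
    """updates: arrival-ordered (side, key, row_or_None); side "b" is user_basic_info,
    side "s" is user_settings. Returns the changelog of the joined table as
    (key, merged_row) updates and (key, None) retractions."""
    basic, settings = {}, {}
    joined_keys = set()  # keys that currently have a join result
    changelog = []
    for side, key, row in updates:
        table = basic if side == "b" else settings
        if row is None:
            table.pop(key, None)
        else:
            table[key] = row
        if key in basic and key in settings:
            changelog.append((key, {**basic[key], **settings[key]}))  # recompute on either side
            joined_keys.add(key)
        elif key in joined_keys:
            changelog.append((key, None))  # one side is gone: retract the previous result
            joined_keys.discard(key)
    return changelog


updates = [
    ("b", 1, {"name": "Ana"}),     # no settings yet: no output
    ("s", 1, {"theme": "dark"}),   # both sides present: emit
    ("s", 1, {"theme": "light"}),  # settings change: re-emit
    ("b", 1, None),                # basic info deleted: retract
]
for change in table_table_inner_join(updates):
    print(change)
# (1, {'name': 'Ana', 'theme': 'dark'})
# (1, {'name': 'Ana', 'theme': 'light'})
# (1, None)
```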

ksqlDB + Kafka Connect Integration

ksqlDB provides a SQL interface for managing Kafka Connect connectors, enabling end-to-end pipeline management in one place:

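-- Create a Debezium MySQL CDC source connector.
-- Property names follow Debezium 1.x; Debezium 2.x renames database.server.name to
-- topic.prefix and database.history.kafka.* to schema.history.internal.kafka.*.
-- database.server.id must be unique among the MySQL server's replication clients
-- (184054 is an arbitrary example). ${file:...} requires a FileConfigProvider on the worker.
-- The transforms flatten the Debezium change envelope, then drop the internal_notes field
-- (ReplaceField's exclude option replaced the deprecated blacklist option).
CREATE SOURCE CONNECTOR debezium_orders WITH (
    'connector.class'                          = 'io.debezium.connector.mysql.MySqlConnector',
    'database.hostname'                        = 'mysql.internal',
    'database.port'                            = '3306',
    'database.user'                            = 'debezium',
    'database.password'                        = '${file:/etc/secrets/mysql.properties:password}',
    'database.server.id'                       = '184054',
    'database.server.name'                     = 'production',
    'table.include.list'                       = 'shop.orders,shop.order_items',
    'database.history.kafka.bootstrap.servers' = 'kafka:9092',
    'database.history.kafka.topic'             = 'dbhistory.production',
    'transforms'                               = 'unwrap,dropNotes',
    'transforms.unwrap.type'                   = 'io.debezium.transforms.ExtractNewRecordState',
    'transforms.dropNotes.type'                = 'org.apache.kafka.connect.transforms.ReplaceField$Value',
    'transforms.dropNotes.exclude'             = 'internal_notes'
);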

-- Check connector status
DESCRIBE CONNECTOR debezium_orders;
-- Output: connector status, task count, task states, error details

-- Create a sink connector to write processed results to Elasticsearch
CREATE SINK CONNECTOR es_user_totals WITH (
    'connector.class'    = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
    'connection.url'     = 'http://elasticsearch:9200',
    'topics'             = 'USER_TOTALS',
    'key.ignore'         = 'false',
    'schema.ignore'      = 'false',
    'type.name'          = '_doc'
);

This enables a complete, SQL-managed pipeline: MySQL → Kafka (via Debezium) → ksqlDB aggregations → Elasticsearch (via a sink connector), with all components configured and monitored through a single SQL interface.

Choose ksqlDB When

ksqlDB shines for rapid prototyping and simple transformations. Non-Java developers (BI analysts, data engineers with SQL backgrounds) can build and deploy real-time transformations without learning Kafka Streams APIs. It is also the right choice when you want Kafka Connect management integrated with your stream processing definitions.

Limitations to be aware of: complex custom logic still requires Java UDFs; debugging is harder because SQL abstracts away the underlying topology; and performance tuning options are more limited than direct Kafka Streams usage.

Choose Kafka Streams When

Choose Kafka Streams for complex stateful logic that does not map naturally to SQL (state machines, recursive patterns, custom aggregations), when you need fine-grained control over RocksDB and thread configuration, when the processor is part of a microservice that also serves HTTP traffic, or when per-record latency is a hard requirement and you need to hand-tune the topology, serialization, and state access that ksqlDB would otherwise generate for you.

ksqlDB and Kafka Streams vs Apache Flink

Dimension                    ksqlDB / Kafka Streams        Apache Flink
State scale per task         Single machine (GB range)     Distributed (TB range)
Out-of-order handling        Grace period (basic)          Watermarks + complex trigger strategies
Exactly-once for sinks       Kafka-to-Kafka only           Any transactional external system
Operational overhead         Low (no extra cluster)        High (JobManager + TaskManager cluster)
SQL completeness             ksqlDB dialect, limited       Near ANSI-standard Flink SQL
Batch + stream unification   Not supported                 Complete support

Where Flink outperforms ksqlDB concretely:

Flink's watermark system provides per-source configurable lateness tolerance with multiple built-in strategies (bounded out-of-orderness, monotonous timestamps, custom). ksqlDB's GRACE PERIOD is a simpler approximation. For data with high and variable lateness, Flink's watermark semantics give more precise control.

Flink's exactly-once guarantees extend to external sinks such as JDBC databases (via XA transactions), HDFS, and Iceberg, provided the sink takes part in Flink's two-phase commit protocol (for example, by implementing TwoPhaseCommitSinkFunction). ksqlDB's exactly-once is Kafka-to-Kafka only. For pipelines that must produce exactly-once to a database, Flink is the appropriate choice.

Where ksqlDB outperforms Flink concretely:

Deployment simplicity is ksqlDB's decisive advantage. A ksqlDB cluster is a set of JVM processes with no additional infrastructure. A Flink cluster requires a JobManager (with HA configuration for production), TaskManagers with appropriate memory and slot configuration, a checkpoint storage backend (S3, HDFS), and typically a Kubernetes operator or YARN integration. For teams that want stream processing without adding a new operational system, that simplicity is a substantial advantage.

Pull queries have no direct counterpart in Flink SQL, which does not offer low-latency point lookups against a running job's materialized state. If your application needs to serve low-latency queries against streaming-computed state (CQRS pattern), ksqlDB's pull queries are purpose-built for this.

In practice, ksqlDB and Kafka Streams coexist comfortably in the same Kafka ecosystem. ksqlDB handles simple pipelines and business-user queries; Kafka Streams handles complex core logic; both exchange data through Kafka topics. Choosing between them is not usually an either/or decision: it is a matter of matching the right tool to each specific processing requirement within the same overall architecture.
