第 25 章

ksqlDB:用 SQL 做实时流处理

第25章:ksqlDB:用 SQL 做实时流处理

导读:ksqlDB 如何用 SQL 实现实时流处理?

本章核心问题:ksqlDB 如何用 SQL 实现实时流处理?

读完本章你将理解


Level 1 · 你需要知道的(1-3年经验)

两种部署模式

交互式模式(Interactive Mode)

ksqlDB 服务器暴露 REST API 和 WebSocket 端点,接受动态提交的 SQL 语句。KSQL CLI 工具通过 REST API 与服务器交互:

# 启动 KSQL CLI
ksql http://ksqldb-server:8088

# 或通过 REST API 提交语句
curl -X POST http://ksqldb-server:8088/ksql \
  -H "Content-Type: application/vnd.ksql.v1+json" \
  -d '{"ksql": "SHOW STREAMS;", "streamsProperties": {}}'

交互式模式支持运行时动态创建、修改和删除流和表,适合开发调试和快速原型验证。

无头模式(Headless Mode)

ksqlDB 服务器在启动时加载一个预定义的 SQL 文件(queries.sql),执行其中的所有语句并持续运行,不暴露 REST API(或只暴露有限的监控端点)。适合生产环境的 CI/CD 部署:SQL 文件受版本控制,通过流水线部署和回滚。

# 无头模式启动
ksql-server-start /etc/ksqldb/ksqldb-server.properties \
  --queries-file /etc/ksqldb/queries.sql

无头模式的查询是不可变的:要修改查询,需要更新 SQL 文件并重启服务(类似于修改应用程序配置)。这对生产部署是有利的——避免了运行时的意外变更。

STREAM vs TABLE:流与表的本质差异

ksqlDB 中最重要的语义区分是 STREAMTABLE

STREAM(流) 是事件的无界序列。每条消息都是一个独立的事件,Kafka topic 的完整历史都是流的一部分。流没有"当前状态"——它记录的是发生过的所有事情。

TABLE(表) 是每个 key 最新值的物化视图。它消费一个 Kafka topic(通常是经过聚合或经过 compaction 的 topic),对每个 key 只保留最新的值。表有"当前状态"——类似数据库表的语义。

-- 创建流:声明 orders topic 的消息格式
-- KAFKA_TOPIC: 底层 Kafka topic 名称
-- VALUE_FORMAT: 消息序列化格式(AVRO/JSON/PROTOBUF)
CREATE STREAM orders (
    id        INT,
    amount    DECIMAL(10, 2),
    user_id   INT,
    status    VARCHAR,
    created_at BIGINT  -- 事件时间戳(毫秒)
) WITH (
    KAFKA_TOPIC   = 'orders',
    VALUE_FORMAT  = 'AVRO',
    TIMESTAMP     = 'created_at'  -- 使用 created_at 作为事件时间
);

-- 创建表:对 orders 流按 user_id 聚合
-- EMIT CHANGES 表示这是一个持续更新的物化视图
CREATE TABLE user_totals AS
    SELECT
        user_id,
        SUM(amount)    AS total_amount,
        COUNT(*)       AS order_count,
        MAX(created_at) AS last_order_time
    FROM orders
    WHERE status = 'COMPLETED'
    GROUP BY user_id
EMIT CHANGES;

上面的 CREATE TABLE ... AS SELECT 语句在后台创建了一个持续运行的 Kafka Streams 聚合拓扑,结果写入一个名为 USER_TOTALS 的 Kafka topic(ksqlDB 自动创建),同时也可以通过 Pull Query 低延迟查询。

DDL 详解与最佳实践

CREATE STREAM FROM Kafka Topic

-- 映射已有 topic,不改变 topic 内容
CREATE STREAM raw_clicks (
    user_id   VARCHAR KEY,   -- KEY 关键字声明 Kafka 消息的 key
    url       VARCHAR,
    referrer  VARCHAR,
    device    VARCHAR,
    ts        BIGINT
) WITH (
    KAFKA_TOPIC  = 'raw-clicks',
    VALUE_FORMAT = 'JSON',
    PARTITIONS   = 12        -- 如果 topic 不存在,ksqlDB 会创建它
);

-- 流转换:从 raw_clicks 派生,过滤移动端点击
-- 这会创建一个新的 Kafka topic 'mobile-clicks'
CREATE STREAM mobile_clicks AS
    SELECT
        user_id,
        url,
        TIMESTAMPTOSTRING(ts, 'yyyy-MM-dd HH:mm:ss') AS click_time
    FROM raw_clicks
    WHERE device = 'mobile'
EMIT CHANGES;

窗口聚合

-- 5 分钟滚动窗口聚合:每个用户在每 5 分钟内的点击数
CREATE TABLE clicks_per_user_5min AS
    SELECT
        user_id,
        COUNT(*) AS click_count,
        WINDOWSTART AS window_start,  -- 内置变量:窗口开始时间
        WINDOWEND   AS window_end     -- 内置变量:窗口结束时间
    FROM raw_clicks
        WINDOW TUMBLING (SIZE 5 MINUTES, GRACE PERIOD 1 MINUTE)
    GROUP BY user_id
EMIT CHANGES;

-- 1 小时跳跃窗口,步长 15 分钟
CREATE TABLE hourly_clicks AS
    SELECT user_id, COUNT(*) AS cnt
    FROM raw_clicks
        WINDOW HOPPING (SIZE 1 HOUR, ADVANCE BY 15 MINUTES)
    GROUP BY user_id
EMIT CHANGES;

-- 会话窗口:30 分钟无活动则开启新会话
CREATE TABLE user_sessions AS
    SELECT user_id, COUNT(*) AS actions_in_session
    FROM raw_clicks
        WINDOW SESSION (30 MINUTES)
    GROUP BY user_id
EMIT CHANGES;

Join 操作

Stream-Stream Join

-- 关联 orders 和 payments 流,时间窗口 10 分钟
CREATE STREAM order_payments AS
    SELECT
        o.id         AS order_id,
        o.amount     AS order_amount,
        p.payment_id AS payment_id,
        p.method     AS payment_method
    FROM orders o
        INNER JOIN payments p
            WITHIN 10 MINUTES
            ON o.id = p.order_id
EMIT CHANGES;

Stream-Table Join(维度表查询)

-- 创建用户维度表
CREATE TABLE users (
    user_id   INT PRIMARY KEY,
    name      VARCHAR,
    country   VARCHAR
) WITH (
    KAFKA_TOPIC  = 'users',
    VALUE_FORMAT = 'AVRO'
);

-- 用订单流关联用户表
CREATE STREAM enriched_orders AS
    SELECT
        o.id         AS order_id,
        o.amount,
        u.name       AS user_name,
        u.country    AS user_country
    FROM orders o
        LEFT JOIN users u ON o.user_id = u.user_id
EMIT CHANGES;

Table-Table Join

-- 两张维度表的 Join,任一侧更新时重新计算
CREATE TABLE user_profiles AS
    SELECT
        p.user_id,
        p.name,
        s.preferred_currency,
        s.notification_enabled
    FROM user_basic_info p
        INNER JOIN user_settings s ON p.user_id = s.user_id
EMIT CHANGES;

ksqlDB 与 Kafka Connect 集成

ksqlDB 内置了对 Kafka Connect 的管理支持,可以通过 ksqlDB 的 SQL 接口直接创建和管理 Connector:

-- 创建 Source Connector(从 MySQL 读取数据到 Kafka)
CREATE SOURCE CONNECTOR mysql_orders_source WITH (
    'connector.class'                   = 'io.debezium.connector.mysql.MySqlConnector',
    'database.hostname'                 = 'mysql',
    'database.port'                     = '3306',
    'database.user'                     = 'debezium',
    'database.password'                 = 'dbz',
    'database.server.name'              = 'mydb',
    'table.include.list'                = 'mydb.orders',
    'database.history.kafka.topic'      = 'schema-changes.mydb',
    'database.history.kafka.bootstrap.servers' = 'kafka:9092'
);

-- 检查 Connector 状态
DESCRIBE CONNECTOR mysql_orders_source;

-- 创建 Sink Connector(从 Kafka 写入 Elasticsearch)
CREATE SINK CONNECTOR es_user_totals_sink WITH (
    'connector.class'                     = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
    'connection.url'                      = 'http://elasticsearch:9200',
    'topics'                              = 'USER_TOTALS',
    'type.name'                           = '_doc',
    'key.ignore'                          = 'false',
    'schema.ignore'                       = 'false'
);

这种集成使得构建完整的数据管道变得简单:MySQL → Kafka(Debezium)→ ksqlDB 实时处理 → Elasticsearch(展示),全部通过 SQL 界面管理。

选择哪个工具取决于需求的具体特征:

选择 ksqlDB 当:

ksqlDB 的限制:复杂的自定义 UDF(User-Defined Function)仍需 Java;调试困难(SQL 隐藏了底层 Kafka Streams 的复杂性);性能调优选项比直接使用 Kafka Streams 更有限。

选择 Kafka Streams 当:

维度 Kafka Streams/ksqlDB Apache Flink
状态规模 单机(几十到几百 GB) 分布式(TB 级)
乱序处理 基本支持(grace period) 高级(watermark + 复杂触发策略)
Sink Exactly-Once Kafka-to-Kafka 任意外部系统(需 connector 支持)
运维复杂度 低(无额外集群) 高(需要运维 Flink 集群)
SQL 功能 ksqlDB 语法有限 Flink SQL 接近标准 SQL
批流统一 不支持 完整支持

Flink SQL 相比 ksqlDB 的具体优势

  1. 对乱序数据有更精细的 Watermark 策略控制(可以为不同的源流设置不同的 Watermark 延迟)
  2. Sink 端的 Exactly-Once 保证适用于任何支持事务的外部系统(JDBC、HDFS、HBase),而不仅限于 Kafka
  3. 更丰富的窗口语义,包括累积窗口(Cumulate Window)和不同触发模式
  4. Flink SQL 更接近 ANSI SQL 标准,学习曲线对有数据库背景的工程师更平滑

ksqlDB 相比 Flink SQL 的优势

  1. 部署极简——ksqlDB 服务器是一个 JAR 包,无需 Flink JobManager/TaskManager 集群
  2. Pull Query 提供了 Kafka Streams 交互式查询的 SQL 界面,Flink 没有等价能力
  3. 与 Kafka Connect 的原生集成管理,Flink 需要额外的连接器配置
  4. 社区和企业支持(Confluent Cloud)更成熟

实际项目中,ksqlDB 和 Kafka Streams 在同一个 Kafka 集群中共存是完全正常的:ksqlDB 处理简单的数据管道和业务用户查询,Kafka Streams 处理复杂的核心业务逻辑,两者通过 Kafka topic 交换数据。


Level 2 · 它是怎么运行的(3-5年经验)

ksqlDB 是什么

ksqlDB 是构建在 Kafka Streams 之上的 SQL 层。它把你的 Kafka topic 映射为可查询的"流"或"表",让你用熟悉的 SQL 语法对实时数据做过滤、聚合、Join 和窗口计算,而不必写任何 Java 代码。

从架构上看,ksqlDB 服务器是一个运行 Kafka Streams 应用的 JVM 进程。你提交的每一条 SQL 语句都被 ksqlDB 的查询引擎编译成一个 Kafka Streams 拓扑,然后在后台持续运行。ksqlDB 存储这些查询的定义到 Kafka 的内部 topic 中,保证服务器重启后查询自动恢复。

理解 ksqlDB 的本质是"Kafka Streams 的 SQL 编译器",而不是一个独立的流处理引擎,这对于诊断问题和规划容量至关重要:ksqlDB 的性能瓶颈与 Kafka Streams 相同(状态存储 I/O、Kafka 网络带宽、JVM GC),优化手段也相同(RocksDB 调优、增加 num.stream.threads、水平扩展 ksqlDB 服务器实例)。


Level 3 · 规范怎么定义的(资深)

本章的协议与规范细节已融入上述 Level 中。如需深入了解,请参阅 Kafka 官方协议文档和对应 KIP 提案。


Level 4 · 边界与陷阱(所有人)

推送查询 vs 拉取查询

这是 ksqlDB 最独特的查询语义分类,与传统数据库完全不同。

推送查询(Push Query)

推送查询会持续向客户端推送结果,直到客户端断开连接。它订阅的是一个持续变化的数据流,适合实时数据推送(WebSocket、Server-Sent Events)、监控大屏、流数据订阅。

-- 推送查询:持续输出每分钟的点击统计(带 EMIT CHANGES)
SELECT user_id, click_count, window_start
FROM clicks_per_user_5min
EMIT CHANGES;

通过 REST API 以流式方式获取推送查询结果:

curl -N -X POST http://ksqldb-server:8088/query-stream \
  -H "Content-Type: application/vnd.ksql.v1+json" \
  -d '{
    "sql": "SELECT user_id, click_count FROM clicks_per_user_5min EMIT CHANGES;",
    "properties": {}
  }'
# 响应是持续的 newline-delimited JSON 流

拉取查询(Pull Query)

拉取查询是一次性的点时查询,语义与传统 SQL 相同:查询物化表当前的状态,返回结果,连接关闭。延迟通常在毫秒级(取决于路由是否需要跨 ksqlDB 实例)。

-- 拉取查询:查询特定用户的当前累计消费(不带 EMIT CHANGES)
SELECT user_id, total_amount, order_count
FROM user_totals
WHERE user_id = 42;

拉取查询只能针对物化表CREATE TABLE AS SELECT ... GROUP BY),不能针对流或未聚合的表。这是因为拉取查询需要访问点时快照,而流没有当前状态的概念。

ksqlDB 使用 Kafka Streams 的交互式查询(Interactive Queries)机制服务拉取查询:如果被查询的 key 不在当前服务器持有的分区上,ksqlDB 会自动将查询路由(通过内部 HTTP 调用)到持有该分区的服务器实例。这对客户端完全透明。

本章评分
4.7  / 5  (5 评分)

💬 留言讨论