/install flink-kafka-dual-write1
\r \r
Flink Kafka 双写任务生成\r
\r 按下面流程执行,默认服务对象是 bethune 仓库中的 Kafka 日志监控任务。\r \r
先做什么\r
\r 先确认用户给了哪些输入。若信息不全,只问最小必需项:\r \r
- 任务编号\r
- 参考任务,若用户说“参考任务34”则优先复用 34 的骨架\r
- Kafka topic\r
- module 过滤值\r
- 目标表名或业务名\r
- 消息字段结构,尤其是是否存在嵌套对象或列表展开字段\r \r 若用户已经给出“按任务34类似模式”,默认理解为:\r \r
- 单条消息通常产出一行,不按列表展开\r
- 保留 Kafka -> filter -> flatMap -> Hive -> StarRocks 的完整链路\r
- 沿用
parseAndFormatLogTime()、safe()、Hive 分区补齐、4 份 config 同步更新的处理方式\r \r 若任务更接近 35 或 36 这类列表展开模式,按列表展开规则处理。详细模式见references/bethune-patterns.md。\r \r
实现步骤\r
\r
- 先阅读参考任务及相关
MessageModel、Po、config.properties键位,确认命名和字段顺序。\r - 生成或更新 3 个 Java 文件:
MessageModel、Po、Job。\r - 同步更新 4 个配置文件:\r
src/main/resources/config.properties\rsrc/main/resources/dev/config.properties\rsrc/main/resources/product/config.properties\rsrc/main/resources/stage/config.properties\r
- 在仓库可编译时运行
mvn -DskipTests compile验证新增任务。\r - 向用户回传新增文件、配置键、是否编译通过;若用户需要,再补 Hive 和 StarRocks DDL。\r \r
必须遵守的约束\r
\r
- 保持
TableSchema、StarRocksSinkRowBuilder、toHiveRow()三处字段顺序完全一致。\r st永远放在输出首列;Hive 行末尾永远追加year、month、day。\rmodule过滤值写死在 Job 类常量里,不写入配置。\rlogTime统一走 A 方案:为空或解析失败都记录错误日志并丢弃。\rmessage为空直接丢弃。\rid优先取skyNetVo.getId(),为空时生成UUID。\rcnt通常固定为1。\r- 字符串字段优先通过
safe()兜底,数值字段保留原始数值类型。\r - 列表字段为
null或空集合时,整条消息直接丢弃。\r \r
命名规则\r
\r
MessageModel:src/main/java/com/ly/tms/po/carSupply/SkynetLog{BizName}MessageModel.java\rPo:src/main/java/com/ly/tms/po/carSupply/SkynetLog{BizName}Po.java\rJob:src/main/java/com/ly/tms/job/Bus_{BizName}_KafkaToStarRock_{任务编号}.java\r \r 配置键遵循 bethune 现有分组:\r \r- topic key:
kafka.bus.{biz}.topic\r - group key:
travel.car.{biz}.group\r - StarRocks key:
starrocks.fe.travel.common.{tableKey}\r - Hive key:
hive.hive_train_ops.{tableKey}\r \r
生成代码时的判断规则\r
\r
- 用户给的是顶层字段 + 少量嵌套对象:按 34 模式写单行输出。\r
- 用户给的是
datas、fullPriceList这类列表:按 35/36 模式在flatMap()中逐项展开。\r - JSON 字段名与 Java 字段名不一致时,在
MessageModel上补@JSONField(name = "...")。\r - 若参考任务里存在“嵌套字段优先,顶层字段兜底”的业务规则,保留该优先级,不要简化成单字段直取。\r \r
输出要求\r
\r 完成后至少说明:\r \r
- 新增或修改了哪些文件\r
- 新增了哪些配置键\r
- 本次任务属于“单行模式”还是“列表展开模式”\r
- 是否完成编译验证\r \r
参考资料\r
\r
读取 references/bethune-patterns.md 获取以下内容:\r
\r
- 任务 33/34/35/36 的差异\r
- 任务 34 的完整骨架摘要\r
parseAndFormatLogTime()与toHiveRow()的固定模板\r- 4 份配置文件中的插入分组位置\r cnt INT,\r traceid STRING,\r {其余字段按 PO 顺序}\r )\r PARTITIONED BY (year STRING, month STRING, day STRING)\r ROW FORMAT DELIMITED\r FIELDS TERMINATED BY '\001'\r STORED AS TEXTFILE;\r
\r
### StarRocks\r
```sql\r
CREATE TABLE TCTravelStreamData_db.{表名} (\r
st DATETIME,\r
apmtraceid VARCHAR(256),\r
id VARCHAR(256),\r
cnt INT,\r
traceid VARCHAR(256),\r
{其余字段:STRING→VARCHAR(512), INT→INT, DOUBLE→DOUBLE}\r
)\r
DUPLICATE KEY(st, apmtraceid)\r
DISTRIBUTED BY HASH(id) BUCKETS 8\r
PROPERTIES ("replication_num" = "3");\r
```\r
\r
---\r
\r
## 参考示例(已实现任务)\r
\r
| 任务 | 类名 | Topic | Module | List展开字段 | SR表名 |\r
|------|------|-------|--------|------------|--------|\r
| 33 | Bus_Search_Abtest_KafkaToStarRock_33 | skynet_log_Public_SFC_ABTest_Monitor | BUS_Public_SFC_ABTest_Monitor | 无 | bus_sfc_abtest_monitor |\r
| 34 | Bus_Search_ReplacePrice_KafkaToStarRock_34 | skynet_log_Public_SFC_Replace_Price_Monitor | BUS_Public_SFC_Replace_Price_Monitor | 无(ReferPriceBean嵌套) | bus_sfc_replace_price_monitor |\r
| 35 | Bus_Carpool_CalEnter_KafkaToStarRock_35 | skynet_log_3304590_CallEnter | BUS_PUBLIC_CARPOOL_PRICING_CallEnter | fullPriceList | bus_carpool_calenter_monitor |\r
| 36 | Bus_Metric_Collection_KafkaToStarRock_36 | skynet_log_3309435_bus_travelmetrics | BUS_METRIC_COLLECTION | datas | bus_metric_collection_monitor |\r
- Make sure OpenClaw is installed (local or Docker)
- Run the install command in chat:
/install flink-kafka-dual-write1 - After installation, invoke the skill by name or use
/flink-kafka-dual-write1 - Provide required inputs per the skill's parameter spec and get structured output
What is kafka写入hive和sr?
为 bethune 项目生成新的 Flink Kafka 到 Hive 和 StarRocks 双写监控任务,参考 Bus_Search_ReplacePrice_KafkaToStarRock_34 及相邻的 33/35/36 模式,自动产出 Job 类、MessageModel、PO、4 个 config.p... It is an AI Agent Skill for Claude Code / OpenClaw, with 231 downloads so far.
How do I install kafka写入hive和sr?
Run "/install flink-kafka-dual-write1" in the OpenClaw or Claude Code chat to install it in one step — no extra setup required.
Is kafka写入hive和sr free?
Yes, kafka写入hive和sr is completely free, licensed under MIT-0. You can download, install and use it at no cost.
Which platforms does kafka写入hive和sr support?
kafka写入hive和sr is cross-platform and runs anywhere OpenClaw / Claude Code is available (cross-platform).
Who created kafka写入hive和sr?
It is built and maintained by printsky (@printsky); the current version is v1.0.1.