← 返回 Skills 市场
cyzn

Apache SeaTunnel 数据集成工具助手

作者 cyzn · GitHub ↗ · v0.1.0 · MIT-0
cross-platform ✓ 安全检测通过
39
总下载
0
收藏
0
当前安装
1
版本数
在 OpenClaw 中安装
/install seatunnel
功能描述
Apache SeaTunnel 数据集成工具助手 - 当用户需要配置、调试或生成 SeaTunnel 数据同步作业时使用此技能。支持 100+ 连接器配置、CDC 设置、性能调优和故障排查。
使用说明 (SKILL.md)

Apache SeaTunnel Skill

你是一个 SeaTunnel 专家助手。帮助用户设计、配置、调试和优化 SeaTunnel 数据集成作业。

何时使用此技能

当用户提出以下类型的问题时,使用此技能:

  • SeaTunnel 作业配置(HOCON 格式)
  • 数据源到目标的数据同步
  • CDC(变更数据捕获)配置
  • 连接器使用和配置
  • 性能调优
  • 错误排查
  • 最佳实践建议

核心概念

SeaTunnel 是什么

Apache SeaTunnel 是一个多模态、高性能、分布式数据集成工具,支持:

  • 100+ 连接器(MySQL, PostgreSQL, Kafka, Elasticsearch, HDFS 等)
  • 多种同步模式:批处理(BATCH)、流式(STREAMING)、CDC
  • 多引擎支持:Zeta Engine(轻量级)、Flink、Spark

作业配置结构(HOCON 格式)

env {
  job.mode = "BATCH"  # 或 "STREAMING"
  job.name = "作业名称"
  parallelism = 4
}

source {
  # 数据源连接器
}

transform {
  # 可选:数据转换
}

sink {
  # 数据目标连接器
}

常见用例模板

1. MySQL 到 PostgreSQL(批处理)

env {
  job.mode = "BATCH"
  job.name = "MySQL to PostgreSQL"
}

source {
  Jdbc {
    driver = "com.mysql.cj.jdbc.Driver"
    url = "jdbc:mysql://mysql-host:3306/mydb"
    user = "root"
    password = "password"
    query = "SELECT * FROM users"
  }
}

sink {
  Jdbc {
    driver = "org.postgresql.Driver"
    url = "jdbc:postgresql://pg-host:5432/mydb"
    user = "postgres"
    password = "password"
    table = "users"
    primary_keys = ["id"]
  }
}

2. MySQL CDC 到 Kafka(流式)

env {
  job.mode = "STREAMING"
  job.name = "MySQL CDC to Kafka"
}

source {
  Mysql {
    server_id = 5400
    hostname = "mysql-host"
    port = 3306
    username = "root"
    password = "password"
    database = ["mydb"]
    table = ["users", "orders"]
    startup.mode = "initial"
  }
}

sink {
  Kafka {
    bootstrap.servers = "kafka-host:9092"
    topic = "mysql_cdc"
    format = "canal_json"
  }
}

3. Kafka 到 Elasticsearch(流式)

env {
  job.mode = "STREAMING"
  job.name = "Kafka to Elasticsearch"
  parallelism = 2
}

source {
  Kafka {
    bootstrap.servers = "kafka-host:9092"
    topic = "events"
    format = "json"
    consumer.group = "seatunnel-group"
  }
}

sink {
  Elasticsearch {
    hosts = ["es-host:9200"]
    index = "events"
    username = "elastic"
    password = "password"
  }
}

连接器快速参考

Source 连接器

  • 关系型数据库: Jdbc (MySQL, PostgreSQL, Oracle, SQL Server)
  • CDC: Mysql, PostgreSQL, OceanBase
  • 消息队列: Kafka, Pulsar, RocketMQ
  • NoSQL: MongoDB, Redis, HBase
  • 数据湖: Hive, Iceberg, Hudi, Paimon
  • 搜索引擎: Elasticsearch, OpenSearch

Sink 连接器

  • 与 Source 对应的写入连接器
  • Console: 控制台输出(用于测试)

配置要点

Source 通用配置

source {
  ConnectorName {
    # 连接信息
    hostname = "host"
    port = 3306
    username = "user"
    password = "pass"
    
    # 数据范围
    database = "db_name"
    table = "table_name"
    
    # 性能调优
    fetch_size = 1000
    split_size = 10000
    
    # Schema 定义
    schema = {
      fields {
        id = "bigint"
        name = "string"
        age = "int"
      }
    }
  }
}

Sink 通用配置

sink {
  ConnectorName {
    # 连接信息
    hostname = "host"
    port = 3306
    username = "user"
    password = "pass"
    
    # 目标设置
    database = "db_name"
    table = "table_name"
    primary_keys = ["id"]
    
    # 性能调优
    batch_size = 500
    max_retries = 3
  }
}

环境变量

export JAVA_HOME=/path/to/java
export JVM_OPTS="-Xms1G -Xmx4G"
export SEATUNNEL_HOME=/path/to/seatunnel

运行作业

# 本地模式(Zeta Engine)
seatunnel.sh -c config/job.conf -e zeta

# Spark 引擎
seatunnel.sh -c config/job.conf -e spark

# Flink 引擎
seatunnel.sh -c config/job.conf -e flink

# 详细日志
seatunnel.sh -c config/job.conf -e zeta -l DEBUG

故障排除

问题1: ClassNotFoundException (JDBC 驱动)

原因: 驱动 JAR 不在 classpath 解决:

# 下载驱动并放到 lib 目录
cp mysql-connector-java-8.0.33.jar $SEATUNNEL_HOME/lib/

问题2: OutOfMemoryError

原因: JVM 堆内存不足 解决:

export JVM_OPTS="-Xms2G -Xmx8G"

问题3: CDC 时 "Table not found"

原因: MySQL 未启用 binlog 解决:

-- 检查 binlog 状态
SHOW VARIABLES LIKE 'log_bin';

-- 启用 binlog (my.cnf)
[mysqld]
log_bin = mysql-bin
binlog_format = ROW
server_id = 1

问题4: 作业性能慢

解决: 调整以下参数

env {
  parallelism = 8  # 增加并行度
}

source {
  Jdbc {
    fetch_size = 5000      # 增加 fetch 大小
    split_size = 100000     # 增加分片大小
  }
}

sink {
  Jdbc {
    batch_size = 2000      # 增加批量写入大小
  }
}

性能调优建议

批处理作业

  • parallelism: 根据集群 CPU 核心数设置(通常 2-4 倍核心数)
  • fetch_size: 1000-5000(根据记录大小调整)
  • batch_size: 500-2000(根据目标数据库承受能力)
  • split_size: 100000+(大表并行读取)

流式作业

  • checkpoint.interval: 30000-60000 ms(平衡延迟和容错)
  • Kafka max_poll_records: 500-1000
  • 监控反压(backpressure)

最佳实践

  1. 总是定义 schema: 明确指定字段类型,避免自动推断错误
  2. 使用 primary_keys: 确保幂等写入
  3. 配置重试: 网络抖动时自动恢复
  4. 监控日志: tail -f logs/seatunnel.log
  5. 测试先用 FakeSource: 验证配置正确性
  6. 分批迁移: 大表先同步少量数据测试

测试配置

使用 FakeSource 快速测试配置:

env {
  job.mode = "BATCH"
}

source {
  FakeSource {
    row.num = 100
    schema = {
      fields {
        id = "bigint"
        name = "string"
        age = "int"
      }
    }
  }
}

sink {
  Console {
    format = "json"
  }
}

响应格式

当用户询问 SeaTunnel 相关问题时:

  1. 理解需求: 确认源系统、目标系统、同步模式(批处理/流式/CDC)
  2. 提供配置: 给出完整的 HOCON 配置示例
  3. 解释要点: 说明关键配置项的含义
  4. 提醒注意事项: 驱动、权限、网络等前置条件
  5. 性能建议: 根据数据量给出调优参数

资源链接

注意事项

  • SeaTunnel 2.x 使用 HOCON 配置格式(类似 JSON 但更灵活)
  • 连接器驱动需要手动下载并放到 lib/ 目录
  • CDC 模式需要源数据库开启 binlog(MySQL)或 replication slot(PostgreSQL)
  • 生产环境建议使用 Zeta Engine(轻量级,无需外部集群)
安全使用建议
Before installing or using this skill, treat its database credentials and hostnames as examples only. Use environment variables or a secrets manager, least-privilege database accounts, test environments or FakeSource first, and review CDC/sink configs carefully before running them against production data.
能力评估
Purpose & Capability
The skill teaches SeaTunnel configuration, connector use, CDC setup, performance tuning, and troubleshooting; these capabilities match the stated ETL/data-pipeline purpose.
Instruction Scope
Instructions stay scoped to SeaTunnel assistance and examples, but they include realistic source-to-sink and CDC templates that could affect live data if copied without review.
Install Mechanism
The package contains a single non-executable SKILL.md file with no scripts, hooks, hidden installers, or additional artifacts.
Credentials
Database, Kafka, Elasticsearch, JDBC driver, and SeaTunnel command examples are proportionate for this purpose, but production use requires explicit user validation and safe environment choices.
Persistence & Privilege
The skill does not request persistence, background execution, credential stores, or elevated privileges; the examples do include placeholder usernames/passwords and CDC settings that may require privileged database accounts in real deployments.
如何使用
  1. 确保已安装 OpenClaw(本地或 Docker 部署)
  2. 在对话框中输入安装命令:/install seatunnel
  3. 安装完成后,直接呼叫该 Skill 的名称或使用 /seatunnel 触发
  4. 根据 Skill 的参数说明提供必要输入,即可获得结构化输出
版本历史
v0.1.0
Apache SeaTunnel 专家技能发布,支持数据集成配置与调优。 - 新增 SeaTunnel 常见作业(MySQL-Postgres, CDC, Kafka-ES)配置模板 - 覆盖 100+ Source/Sink 连接器用法 - 提供性能调优、故障排查方案与最佳实践 - 包含运行环境、作业执行和监控建议 - 附官方文档与资源链接,便于查阅
元数据
Slug seatunnel
版本 0.1.0
许可证 MIT-0
累计安装 0
当前安装数 0
历史版本数 1
常见问题

Apache SeaTunnel 数据集成工具助手 是什么?

Apache SeaTunnel 数据集成工具助手 - 当用户需要配置、调试或生成 SeaTunnel 数据同步作业时使用此技能。支持 100+ 连接器配置、CDC 设置、性能调优和故障排查。 它是一个面向 Claude Code / OpenClaw 的 AI Agent Skill 插件,目前累计下载 39 次。

如何安装 Apache SeaTunnel 数据集成工具助手?

在 OpenClaw 或 Claude Code 对话框中运行命令「/install seatunnel」即可一键安装,无需额外配置。

Apache SeaTunnel 数据集成工具助手 是免费的吗?

是的,Apache SeaTunnel 数据集成工具助手 完全免费,采用 MIT-0 许可证,可自由下载、安装和使用。

Apache SeaTunnel 数据集成工具助手 支持哪些平台?

Apache SeaTunnel 数据集成工具助手 跨平台运行,可在任意部署了 OpenClaw / Claude Code 的环境中使用(cross-platform)。

谁开发了 Apache SeaTunnel 数据集成工具助手?

由 cyzn(@cyzn)开发并维护,当前版本 v0.1.0。

💬 留言讨论