Data Pipeline
Author: pagoda111king · GitHub · v1.1.0 · MIT-0
Total downloads: 59 · Favorites: 0 · Current installs: 1 · Versions: 2
Install in OpenClaw
/install data-pipeline
Description
Lightweight ETL pipeline orchestrator with lifecycle hooks, bundle execution, stage retry, timeout control, and built-in transformers. 26 tests, 100% pass rate.
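Usage (SKILL.md)
data-pipeline · Data processing pipeline engine
Composable pipelines for data transformation, validation, and analysis. Process data the way you snap building blocks together.
When to use
When the user mentions: data cleaning, data transformation, ETL, data validation, data grouping, data aggregation, pipeline processing, batch data processing, array processing
Quick start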
const { Pipeline, Transformers, Validators, PipelineFactory } = require('data-pipeline/src/pipeline');
// Create a cleaning pipeline
const pipeline = new Pipeline();
pipeline
  .addStage('filter', Transformers.filter(x => x.age >= 18))
  .addStage('pick', Transformers.pick(['name', 'email']))
  .addStage('sort', Transformers.sort('name', 'asc'));
// `users` is your input array; `await` must run inside an async function (or an ES module)
const result = await pipeline.run(users);
Core API
Pipeline
const pipeline = new Pipeline({ strict: true, context: { key: 'value' } });
// Add stages
pipeline.addStage(name, asyncFn, { retryCount: 0, retryDelay: 100, timeout: 30000 });
pipeline.addStages([{ name, fn, options }]);
// Manage stages
pipeline.insertBefore(target, name, fn, options);
pipeline.insertAfter(target, name, fn, options);
pipeline.removeStage(name);
pipeline.toggleStage(name, enabled);
// Execute
const result = await pipeline.execute(data); // returns { data, metadata }
const output = await pipeline.run(data); // returns only the data
// Metrics
const metrics = pipeline.getMetrics();
pipeline.resetMetrics();
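The source does not describe the `retryCount`, `retryDelay`, and `timeout` options beyond their names and defaults. As a rough illustration of what such options commonly mean, the sketch below wraps a single async stage function with a timeout and a retry loop. `withTimeout` and `runStageWithRetry` are hypothetical helpers written for this example; they are not part of data-pipeline, and the library's actual semantics may differ.

```javascript
// Hypothetical helpers: NOT part of data-pipeline. They only illustrate one
// common meaning of retryCount / retryDelay / timeout for an async stage.

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Reject if `promise` does not settle within `ms` milliseconds.
function withTimeout(promise, ms) {
  let timer;
  const timeout = new Promise((_, reject) => {
    timer = setTimeout(() => reject(new Error(`Stage timed out after ${ms} ms`)), ms);
  });
  // Clear the timer either way so it cannot keep the process alive.
  return Promise.race([promise, timeout]).finally(() => clearTimeout(timer));
}

// Run `fn(data)`; on failure, retry up to `retryCount` more times.
async function runStageWithRetry(fn, data, { retryCount = 0, retryDelay = 100, timeout = 30000 } = {}) {
  let lastError;
  for (let attempt = 0; attempt <= retryCount; attempt++) {
    try {
      return await withTimeout(Promise.resolve().then(() => fn(data)), timeout);
    } catch (err) {
      lastError = err;
      if (attempt < retryCount) await sleep(retryDelay);
    }
  }
  throw lastError;
}

// Example: a flaky stage that fails twice, then succeeds on the third call.
let calls = 0;
const flaky = async (items) => {
  calls += 1;
  if (calls < 3) throw new Error('temporary failure');
  return items.map((x) => x * 2);
};

const retryDemo = runStageWithRetry(flaky, [1, 2, 3], { retryCount: 2, retryDelay: 10, timeout: 1000 });
retryDemo.then((out) => console.log(out, 'after', calls, 'calls'));
```

Clearing the timer in `finally` is a deliberate choice in this sketch: a pending `setTimeout` would otherwise keep the Node.js process (or a test runner) alive after the stage has already finished.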
Built-in transformers
| Transformer | Description | Example |
|---|---|---|
| filter(fn) | Filter | Transformers.filter(x => x.active) |
| map(fn) | Map | Transformers.map(x => x.name) |
| reduce(fn, init) | Reduce | Transformers.reduce((a,b) => a+b, 0) |
| groupBy(key) | Group | Transformers.groupBy('dept') |
| sort(key, order) | Sort | Transformers.sort('age', 'desc') |
| dedup(key) | Deduplicate | Transformers.dedup('id') |
| flatten(depth) | Flatten | Transformers.flatten(2) |
| paginate(page, size) | Paginate | Transformers.paginate(1, 10) |
| limit(n) | Limit | Transformers.limit(5) |
| pick(fields) | Pick fields | Transformers.pick(['name', 'age']) |
| rename(map) | Rename | Transformers.rename({old: 'new'}) |
| merge(key, ...sources) | Merge | Transformers.merge('id', extras) |
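To make the one-word descriptions in the table more concrete, here is a plain-JavaScript sketch of what three of these transformers plausibly do. It assumes each factory returns a function that takes an array; the return shapes and the 1-based page numbering are assumptions, and the real `Transformers.*` functions may behave differently.

```javascript
// Plain-JavaScript sketches, NOT the library's code.
// Assumption: each factory returns a function that takes an array.

// Group items into an object keyed by item[key].
const groupBy = (key) => (items) =>
  items.reduce((groups, item) => {
    (groups[item[key]] = groups[item[key]] || []).push(item);
    return groups;
  }, {});

// Keep the first item seen for each distinct item[key].
const dedup = (key) => (items) => {
  const seen = new Set();
  return items.filter((item) => {
    if (seen.has(item[key])) return false;
    seen.add(item[key]);
    return true;
  });
};

// Return one page of results; pages are assumed to be 1-based.
const paginate = (page, size) => (items) =>
  items.slice((page - 1) * size, page * size);

const rows = [
  { id: 1, dept: 'eng' },
  { id: 2, dept: 'ops' },
  { id: 1, dept: 'eng' }, // duplicate id, dropped by dedup
  { id: 3, dept: 'eng' },
];

const unique = dedup('id')(rows);
const byDept = groupBy('dept')(unique);
const firstPage = paginate(1, 2)(unique);
console.log(unique.length, Object.keys(byDept), firstPage.map((r) => r.id));
```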
Validators
const schema = {
  name: { required: true, type: 'string', minLength: 1 },
  age: { type: 'number', min: 0, max: 150 },
  email: { pattern: /^[^\s@]+@[^\s@]+\.[^\s@]+$/ },
  role: { enum: ['admin', 'user'] },
  password: { validate: (v) => v.length >= 8 ? true : 'Too short' }
};
const validator = Validators.schema(schema);
const result = validator(data);
// { valid: boolean, errors: [...], totalItems, validItems }
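The rule names in the schema above (`required`, `type`, `minLength`, `min`, `max`, `pattern`, `enum`, `validate`) suggest how each field is checked. The following is a minimal sketch of such a validator, assuming it receives an array of objects and returns the `{ valid, errors, totalItems, validItems }` shape shown in the comment. `makeSchemaValidator`, the error message wording, and the per-item error layout are hypothetical; this is not the library's implementation.

```javascript
// Hypothetical re-implementation for illustration; NOT the library's code.
// Assumption: the validator receives an array of objects and reports
// per-item errors using the rule names from the schema example.
function makeSchemaValidator(schema) {
  return (items) => {
    const errors = [];
    let validItems = 0;
    items.forEach((item, index) => {
      const itemErrors = [];
      for (const [field, rule] of Object.entries(schema)) {
        const value = item[field];
        if (value === undefined || value === null) {
          if (rule.required) itemErrors.push(`${field} is required`);
          continue; // skip the remaining checks for absent fields
        }
        if (rule.type && typeof value !== rule.type) itemErrors.push(`${field} must be a ${rule.type}`);
        if (rule.minLength !== undefined && value.length < rule.minLength) itemErrors.push(`${field} is too short`);
        if (rule.min !== undefined && value < rule.min) itemErrors.push(`${field} is below ${rule.min}`);
        if (rule.max !== undefined && value > rule.max) itemErrors.push(`${field} is above ${rule.max}`);
        if (rule.pattern && !rule.pattern.test(String(value))) itemErrors.push(`${field} has an invalid format`);
        if (rule.enum && !rule.enum.includes(value)) itemErrors.push(`${field} must be one of ${rule.enum.join(', ')}`);
        if (rule.validate) {
          const verdict = rule.validate(value); // true, or an error message
          if (verdict !== true) itemErrors.push(`${field}: ${verdict}`);
        }
      }
      if (itemErrors.length === 0) validItems += 1;
      else errors.push({ index, errors: itemErrors });
    });
    return { valid: errors.length === 0, errors, totalItems: items.length, validItems };
  };
}

const validate = makeSchemaValidator({
  name: { required: true, type: 'string', minLength: 1 },
  age: { type: 'number', min: 0, max: 150 },
  role: { enum: ['admin', 'user'] },
});

const report = validate([
  { name: 'Ada', age: 36, role: 'admin' }, // passes every rule
  { age: 200, role: 'guest' },             // missing name, age too high, bad role
]);
console.log(JSON.stringify(report, null, 2));
```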
Factory functions
// ETL pipeline
const etl = PipelineFactory.createETL(extract, transforms, load);
// Data-cleaning pipeline
const cleaner = PipelineFactory.createCleaner(schema, { defaultField: 'value' });
// Data-analysis pipeline
const analyzer = PipelineFactory.createAnalyzer('groupKey', {
  avgVal: vals => vals.reduce((a,b) => a+b, 0) / vals.length,
  maxVal: vals => Math.max(...vals)
});
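The source does not spell out what `extract`, `transforms`, and `load` look like. One plausible reading of the ETL factory is a simple composition, sketched below under these assumptions: `extract` takes no arguments and returns the data, `transforms` is an array of functions applied in order, and `load` receives the final data. `composeETL` is a hypothetical stand-in, not `PipelineFactory.createETL`.

```javascript
// Hypothetical sketch of an extract -> transform -> load composition.
// NOT the library's createETL; argument shapes here are assumptions.
function composeETL(extract, transforms, load) {
  return async function run() {
    let data = await extract();
    for (const transform of transforms) {
      data = await transform(data); // each step may be sync or async
    }
    await load(data);
    return data;
  };
}

const sink = []; // stands in for a database or file target

const etlRun = composeETL(
  async () => [{ name: ' Ada ', active: true }, { name: 'Bob', active: false }],
  [
    (rows) => rows.filter((r) => r.active),
    (rows) => rows.map((r) => ({ ...r, name: r.name.trim() })),
  ],
  async (rows) => { sink.push(...rows); }
);

const etlDone = etlRun().then((rows) => {
  console.log(rows);
  return rows;
});
```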
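Use cases
- Data cleaning: validate → deduplicate → fill default values → trim strings
- ETL flow: extract → transform (map/filter/reduce) → load
- Data analysis: group → aggregate → sort → paginate
- Data validation: validate an array of objects in bulk and return a detailed error report
- API data processing: merge multiple data sources → rename fields → pick output fields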
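The "group → aggregate → sort → paginate" flow from the list above can be sketched in plain JavaScript as follows. The `analyze` helper and the `dept` / `salary` fields are made up for this example, and it assumes each aggregator receives the array of numeric values collected for one group, which the source does not confirm.

```javascript
// Plain-JavaScript sketch of "group -> aggregate -> sort -> paginate".
// The helper and field names are hypothetical, not part of data-pipeline.
function analyze(rows, groupKey, valueKey, aggregators) {
  // Group: collect the values of `valueKey` for each distinct group.
  const groups = new Map();
  for (const row of rows) {
    if (!groups.has(row[groupKey])) groups.set(row[groupKey], []);
    groups.get(row[groupKey]).push(row[valueKey]);
  }
  // Aggregate: run every aggregator over each group's values.
  return [...groups].map(([group, vals]) => {
    const summary = { group };
    for (const [name, fn] of Object.entries(aggregators)) summary[name] = fn(vals);
    return summary;
  });
}

const staff = [
  { dept: 'eng', salary: 120 },
  { dept: 'eng', salary: 100 },
  { dept: 'ops', salary: 90 },
];

const summary = analyze(staff, 'dept', 'salary', {
  avgVal: (vals) => vals.reduce((a, b) => a + b, 0) / vals.length,
  maxVal: (vals) => Math.max(...vals),
})
  .sort((a, b) => b.avgVal - a.avgVal) // sort: highest average first
  .slice(0, 10);                        // paginate: first page of 10

console.log(summary);
```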
Error handling
// PipelineError must be in scope; the require line in Quick start does not import it
try {
  const result = await pipeline.execute(data);
} catch (err) {
  if (err instanceof PipelineError) {
    console.log('Failed at:', err.failedStage);
    console.log('Partial data:', err.lastData);
    console.log('Stage results:', err.stageResults);
  }
}
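To show how an error can carry the three properties read above, here is a minimal sketch of a stage runner that throws a custom error on failure. `StageFailure` and `runStages` are hypothetical names for this illustration; the shape of `stageResults` entries in particular is an assumption, and the library's `PipelineError` may be populated differently.

```javascript
// Hypothetical error class and runner, written only to illustrate how an
// error could carry failedStage / lastData / stageResults. Not library code.
class StageFailure extends Error {
  constructor(message, { failedStage, lastData, stageResults }) {
    super(message);
    this.name = 'StageFailure';
    this.failedStage = failedStage;
    this.lastData = lastData;
    this.stageResults = stageResults;
  }
}

async function runStages(stages, input) {
  let data = input;
  const stageResults = [];
  for (const { name, fn } of stages) {
    try {
      data = await fn(data);
      stageResults.push({ name, ok: true });
    } catch (err) {
      stageResults.push({ name, ok: false, error: err.message });
      // `data` still holds the output of the last stage that succeeded.
      throw new StageFailure(`Stage "${name}" failed: ${err.message}`, {
        failedStage: name,
        lastData: data,
        stageResults,
      });
    }
  }
  return data;
}

const failureDemo = runStages(
  [
    { name: 'double', fn: (xs) => xs.map((x) => x * 2) },
    { name: 'explode', fn: () => { throw new Error('boom'); } },
  ],
  [1, 2]
).catch((err) => {
  console.log(err.failedStage, err.lastData);
  return err;
});
```

Keeping the partial data on the error lets a caller log or resume from the last good state instead of rerunning every stage.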
Performance metrics
const metrics = pipeline.getMetrics();
// {
// pipeline: { totalRuns, totalErrors, avgTime },
// stages: [{ name, calls, errors, avgTime }, ...]
// }
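For readers wondering how per-stage figures such as `calls`, `errors`, and `avgTime` can be gathered, the sketch below wraps a stage function with a small counter and timer. `createStageMetrics` is a hypothetical helper; the field names mirror the shape shown above, but how the library actually computes them is not documented in the source.

```javascript
// Hypothetical metrics collector for a single stage; NOT the library's code.
const { performance } = require('node:perf_hooks');

function createStageMetrics(name) {
  const m = { name, calls: 0, errors: 0, totalTime: 0 };
  return {
    // Wrap a stage function so that every call is counted and timed.
    wrap: (fn) => async (data) => {
      const start = performance.now();
      m.calls += 1;
      try {
        return await fn(data);
      } catch (err) {
        m.errors += 1;
        throw err;
      } finally {
        m.totalTime += performance.now() - start; // runs on success and failure
      }
    },
    // avgTime is the mean wall-clock time per call, in milliseconds.
    snapshot: () => ({
      name: m.name,
      calls: m.calls,
      errors: m.errors,
      avgTime: m.calls === 0 ? 0 : m.totalTime / m.calls,
    }),
  };
}

const pickMetrics = createStageMetrics('pick');
const timedPick = pickMetrics.wrap((rows) => {
  if (!Array.isArray(rows)) throw new TypeError('expected an array');
  return rows.map((r) => r.name);
});

const metricsDemo = (async () => {
  await timedPick([{ name: 'Ada' }]);
  await timedPick(null).catch(() => {}); // counted as an error
  return pickMetrics.snapshot();
})();
metricsDemo.then((snap) => console.log(snap));
```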
Security recommendations
This package appears internally coherent for a JavaScript data-pipeline library and doesn't ask for secrets or external installs. However: (1) the package has no homepage or source provenance—treat it as unvetted third-party code; (2) review src/pipeline.js and any included tests yourself before running in production; (3) when using the library, avoid passing untrusted stage functions or callbacks (onStageComplete) that could make network requests or access local files; (4) run it in a sandbox or ephemeral environment if you must execute it before doing a deeper audit; and (5) if you need stronger assurance, ask the author for a repository link or provenance information.
Functional analysis
Type: OpenClaw Skill
Name: data-pipeline
Version: 1.1.0
The bundle provides a legitimate and well-structured data processing pipeline engine for ETL, validation, and transformation tasks. Analysis of 'src/pipeline.js' and 'SKILL.md' shows standard implementation of data manipulation logic (map, filter, reduce, schema validation) without any high-risk capabilities such as shell execution, network requests, or file system access. No evidence of malicious intent, data exfiltration, or prompt injection was found.
Capability assessment
Purpose & Capability
The name/description (ETL/pipeline engine) match the included src/pipeline.js, examples, and tests. APIs declared in SKILL.md (Pipeline, Transformers, Validators, PipelineFactory) correspond to the implementation. There are no extra credentials, binaries, or config paths requested that would be unrelated to a data pipeline.
Instruction Scope
SKILL.md contains usage examples and runtime instructions that only reference the local library (require('data-pipeline/src/pipeline')) and typical pipeline operations. It does not instruct the agent to read system files, env vars, or post data to external endpoints. Note: the pipeline API supports user-provided callbacks (onStageComplete) and custom stage functions; those user-supplied functions can perform I/O or network requests if the caller provides them, so data exfiltration is possible only via user code, not the library itself.
Install Mechanism
No install spec; this is instruction-and-source-only. The repository includes source files and a package-lock.json that references npm registry packages (devDependencies). There are no download URLs, extract steps, or non-standard install behaviors in the bundle.
Credentials
The skill declares no required environment variables or credentials. The runtime code does not reference environment variables or sensitive config paths in the provided snippets.
Persistence & Privilege
always is false and model invocation is allowed (the platform default). The skill does not request permanent/privileged presence nor modify other skills or agent-wide configs based on the provided files.
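How to use
- Make sure OpenClaw is installed (locally or via Docker)
- Enter the install command in the chat box: /install data-pipeline
- Once installed, invoke the Skill by name or trigger it with /data-pipeline
- Provide the required inputs according to the Skill's parameter description to get structured output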
Version history
v1.1.0
v1.1.0: Pipeline stability improvements. All 58 tests pass. Fixed Jest hang with --detectOpenHandles.
v1.0.0
Initial release: ETL pipeline orchestrator with lifecycle hooks, bundle execution, retry/timeout, built-in transformers. 26 tests.
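FAQ
What is Data Pipeline?
Lightweight ETL pipeline orchestrator with lifecycle hooks, bundle execution, stage retry, timeout control, and built-in transformers. 26 tests, 100% pass rate. It is an AI Agent Skill plugin for Claude Code / OpenClaw, with 59 total downloads to date.
How do I install Data Pipeline?
Run the command "/install data-pipeline" in the OpenClaw or Claude Code chat box to install it in one step; no extra configuration is needed.
Is Data Pipeline free?
Yes. Data Pipeline is completely free and released under the MIT-0 license; you can download, install, and use it freely.
Which platforms does Data Pipeline support?
Data Pipeline is cross-platform and can be used in any environment where OpenClaw / Claude Code is deployed.
Who developed Data Pipeline?
It is developed and maintained by pagoda111king (@pagoda111king); the current version is v1.1.0.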