Data Pipeline
by pagoda111king · v1.1.0 · MIT-0
59 downloads · 0 stars · 1 active install · 2 versions
Install in OpenClaw
/install data-pipeline
Description
Lightweight ETL pipeline orchestrator with lifecycle hooks, bundle execution, stage retry, timeout control, and built-in transformers. 26 tests, 100% pass rate.
README (SKILL.md)
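data-pipeline · Data Processing Pipeline Engine
Composable pipelines for data transformation, validation, and analysis. Process data the way you would assemble building blocks.
When to Use
When the user mentions: data cleaning, data transformation, ETL, data validation, data grouping, data aggregation, pipeline processing, batch data processing, array processing
Quick Start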
const { Pipeline, Transformers, Validators, PipelineFactory } = require('data-pipeline/src/pipeline');
// Create a cleaning pipeline
const pipeline = new Pipeline();
pipeline
  .addStage('filter', Transformers.filter(x => x.age >= 18))
  .addStage('pick', Transformers.pick(['name', 'email']))
  .addStage('sort', Transformers.sort('name', 'asc'));
const result = await pipeline.run(users);
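To make the expected shape of `result` concrete, here is a plain-JavaScript equivalent of the three stages above. It is only a sketch: it assumes each stage receives the whole array and returns a new array, and the sample `users` data is invented for illustration.

```javascript
// Plain-array equivalent of the filter -> pick -> sort stages.
// Assumption: each stage maps an array to a new array.
const users = [
  { name: 'Chen', email: 'chen@example.com', age: 31 },
  { name: 'Ana', email: 'ana@example.com', age: 17 },
  { name: 'Bo', email: 'bo@example.com', age: 24 },
];

// filter: keep adults only
const adults = users.filter((u) => u.age >= 18);
// pick: keep only the name and email fields
const picked = adults.map(({ name, email }) => ({ name, email }));
// sort: ascending by name, without mutating the previous array
const sorted = [...picked].sort((a, b) => a.name.localeCompare(b.name));

console.log(sorted);
```

Under these assumptions the result contains Bo and Chen, each reduced to `name` and `email`.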
Core API
Pipeline
const pipeline = new Pipeline({ strict: true, context: { key: 'value' } });
// Add stages
pipeline.addStage(name, asyncFn, { retryCount: 0, retryDelay: 100, timeout: 30000 });
pipeline.addStages([{ name, fn, options }]);
// Manage stages
pipeline.insertBefore(target, name, fn, options);
pipeline.insertAfter(target, name, fn, options);
pipeline.removeStage(name);
pipeline.toggleStage(name, enabled);
// Execute
const result = await pipeline.execute(data); // returns { data, metadata }
const output = await pipeline.run(data); // returns only the data
// Metrics
const metrics = pipeline.getMetrics();
pipeline.resetMetrics();
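The `retryCount`, `retryDelay`, and `timeout` options suggest a retry loop with a per-attempt time limit. The sketch below shows one common way to build that. It is not the library's implementation: `sleep`, `withTimeout`, and `runStage` are hypothetical helpers, and the fixed delay between attempts is an assumption.

```javascript
// Hypothetical helpers, not the library's code: run one async stage with
// a per-attempt timeout and a fixed delay between retries.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

function withTimeout(promise, ms) {
  let timer;
  const timeout = new Promise((_, reject) => {
    timer = setTimeout(() => reject(new Error(`Timed out after ${ms} ms`)), ms);
  });
  // Clear the timer either way so it does not keep the process alive.
  return Promise.race([promise, timeout]).finally(() => clearTimeout(timer));
}

async function runStage(fn, data, { retryCount = 0, retryDelay = 100, timeout = 30000 } = {}) {
  let lastError;
  // One initial attempt plus up to `retryCount` retries.
  for (let attempt = 0; attempt <= retryCount; attempt += 1) {
    try {
      // Promise.resolve().then(...) also captures synchronous throws.
      return await withTimeout(Promise.resolve().then(() => fn(data)), timeout);
    } catch (err) {
      lastError = err;
      if (attempt < retryCount) await sleep(retryDelay);
    }
  }
  throw lastError;
}

// Usage: a stage that fails twice, then succeeds on the third attempt.
let calls = 0;
const flaky = async (rows) => {
  calls += 1;
  if (calls < 3) throw new Error('transient failure');
  return rows.map((n) => n * 2);
};

const demo = runStage(flaky, [1, 2, 3], { retryCount: 2, retryDelay: 10, timeout: 1000 });
demo.then((out) => console.log(out, `after ${calls} calls`));
```

With `retryCount: 2` the stage gets three attempts in total, so the third call succeeds and the doubled array is returned.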
Built-in Transformers
| Transformer | Description | Example |
|---|---|---|
| `filter(fn)` | Filter | `Transformers.filter(x => x.active)` |
| `map(fn)` | Map | `Transformers.map(x => x.name)` |
| `reduce(fn, init)` | Reduce | `Transformers.reduce((a,b) => a+b, 0)` |
| `groupBy(key)` | Group | `Transformers.groupBy('dept')` |
| `sort(key, order)` | Sort | `Transformers.sort('age', 'desc')` |
| `dedup(key)` | Deduplicate | `Transformers.dedup('id')` |
| `flatten(depth)` | Flatten | `Transformers.flatten(2)` |
| `paginate(page, size)` | Paginate | `Transformers.paginate(1, 10)` |
| `limit(n)` | Limit | `Transformers.limit(5)` |
| `pick(fields)` | Select fields | `Transformers.pick(['name', 'age'])` |
| `rename(map)` | Rename fields | `Transformers.rename({old: 'new'})` |
| `merge(key, ...sources)` | Merge | `Transformers.merge('id', extras)` |
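Each transformer in the table is called with its configuration and yields something usable as a stage. The sketch below shows how three of them could be written as higher-order functions. These are hypothetical stand-ins, not the library's source, and the 1-based page numbering is an assumption based on the `paginate(1, 10)` example.

```javascript
// Hypothetical stand-ins for three transformers from the table.
// Assumption: each factory returns a function that takes the whole array.
const groupBy = (key) => (rows) =>
  rows.reduce((groups, row) => {
    (groups[row[key]] = groups[row[key]] || []).push(row);
    return groups;
  }, {});

const dedup = (key) => (rows) => {
  const seen = new Set();
  // Keep the first row for each distinct key value.
  return rows.filter((row) => {
    if (seen.has(row[key])) return false;
    seen.add(row[key]);
    return true;
  });
};

// Assumption: pages are 1-based.
const paginate = (page, size) => (rows) => rows.slice((page - 1) * size, page * size);

const rows = [
  { id: 1, dept: 'eng' },
  { id: 2, dept: 'ops' },
  { id: 1, dept: 'eng' },
  { id: 3, dept: 'eng' },
];

const unique = dedup('id')(rows);
console.log(unique.length);
console.log(groupBy('dept')(unique));
console.log(paginate(2, 2)(unique));
```

Separating configuration from data this way is what lets one configured transformer be reused across several pipelines.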
Validators
const schema = {
name: { required: true, type: 'string', minLength: 1 },
age: { type: 'number', min: 0, max: 150 },
email: { pattern: /^[^\s@]+@[^\s@]+\.[^\s@]+$/ },
role: { enum: ['admin', 'user'] },
password: { validate: (v) => v.length >= 8 ? true : 'Too short' }
};
const validator = Validators.schema(schema);
const result = validator(data);
// { valid: boolean, errors: [...], totalItems, validItems }
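For readers who want to see how a report of the form `{ valid, errors, totalItems, validItems }` can be produced, here is a minimal sketch. It is not the library's validator: `schemaValidator` is a hypothetical name, only the `required`, `type`, `min`, `max`, and `enum` rules are covered, and the shape of each error entry is an assumption.

```javascript
// Hypothetical sketch of a schema validator, not the library's code.
// Covers a subset of the rules shown above: required, type, min, max, enum.
function schemaValidator(schema) {
  return (items) => {
    const errors = [];
    let validItems = 0;
    items.forEach((item, index) => {
      const before = errors.length;
      for (const [field, rule] of Object.entries(schema)) {
        const value = item[field];
        if (value === undefined || value === null) {
          if (rule.required) errors.push({ index, field, message: 'required' });
          continue; // absent field: no further checks apply
        }
        if (rule.type && typeof value !== rule.type) {
          errors.push({ index, field, message: `expected ${rule.type}` });
          continue;
        }
        if (rule.min !== undefined && value < rule.min) errors.push({ index, field, message: `below ${rule.min}` });
        if (rule.max !== undefined && value > rule.max) errors.push({ index, field, message: `above ${rule.max}` });
        if (rule.enum && !rule.enum.includes(value)) errors.push({ index, field, message: 'not in enum' });
      }
      // An item is valid when it added no new errors.
      if (errors.length === before) validItems += 1;
    });
    return { valid: errors.length === 0, errors, totalItems: items.length, validItems };
  };
}

const validate = schemaValidator({
  name: { required: true, type: 'string' },
  age: { type: 'number', min: 0, max: 150 },
  role: { enum: ['admin', 'user'] },
});

const report = validate([
  { name: 'Ana', age: 30, role: 'admin' },
  { age: 200, role: 'guest' },
]);
console.log(report.valid, report.validItems, report.errors.length);
```

The second item fails three rules (missing `name`, `age` above the maximum, `role` outside the enum), so the report is invalid with one valid item out of two.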
Factory Functions
// ETL pipeline
const etl = PipelineFactory.createETL(extract, transforms, load);
// Data-cleaning pipeline
const cleaner = PipelineFactory.createCleaner(schema, { defaultField: 'value' });
// Data-analysis pipeline
const analyzer = PipelineFactory.createAnalyzer('groupKey', {
  avgVal: vals => vals.reduce((a,b) => a+b, 0) / vals.length,
  maxVal: vals => Math.max(...vals)
});
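The analyzer example passes aggregators that each receive an array `vals`. The sketch below illustrates the "group, then aggregate" idea behind that signature. It is not `PipelineFactory.createAnalyzer`: the `analyze` function and its `valueField` parameter are invented here, since the source does not say which field supplies `vals`.

```javascript
// Hypothetical sketch of "group, then aggregate"; not the library's code.
// Assumption: `valueField` (a name invented here) selects the numbers
// handed to each aggregator.
function analyze(rows, groupKey, valueField, aggregators) {
  const buckets = new Map();
  for (const row of rows) {
    if (!buckets.has(row[groupKey])) buckets.set(row[groupKey], []);
    buckets.get(row[groupKey]).push(row[valueField]);
  }
  // One summary object per group, in first-seen order.
  return [...buckets].map(([group, vals]) => {
    const summary = { [groupKey]: group };
    for (const [name, fn] of Object.entries(aggregators)) summary[name] = fn(vals);
    return summary;
  });
}

const sales = [
  { region: 'north', amount: 10 },
  { region: 'south', amount: 40 },
  { region: 'north', amount: 30 },
];

const summary = analyze(sales, 'region', 'amount', {
  avgVal: (vals) => vals.reduce((a, b) => a + b, 0) / vals.length,
  maxVal: (vals) => Math.max(...vals),
});
console.log(summary);
```

For the sample data, `north` averages 20 with a maximum of 30, and `south` has 40 for both.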
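Use Cases
- Data cleaning: validate → deduplicate → fill defaults → trim strings
- ETL flow: extract → transform (map/filter/reduce) → load
- Data analysis: group → aggregate → sort → paginate
- Data validation: batch-validate arrays of objects and return a detailed error report
- API data processing: merge multiple data sources → rename fields → select output fields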
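The ETL flow in the list above (extract → transform → load) can be sketched as three roles run in sequence. This is an illustration only, not `PipelineFactory.createETL`: `runETL` is a hypothetical function and the in-memory `store` stands in for a real load target.

```javascript
// Hypothetical ETL composition: extract -> transforms -> load, run in order.
// Not the library's code; each role is modelled as a (possibly async) function.
async function runETL(extract, transforms, load) {
  let data = await extract();
  for (const transform of transforms) {
    data = await transform(data); // each step sees the previous step's output
  }
  return load(data);
}

const store = []; // in-memory stand-in for a database or file

const etlDemo = runETL(
  async () => [' a ', 'b', ' a ', ''],
  [
    (rows) => rows.map((s) => s.trim()),        // trim strings
    (rows) => rows.filter((s) => s.length > 0), // drop empty values
    (rows) => [...new Set(rows)],               // deduplicate
  ],
  async (rows) => {
    store.push(...rows);
    return rows.length;
  },
);

etlDemo.then((count) => console.log(count, store));
```

The run loads two rows, `'a'` and `'b'`, after trimming, filtering, and deduplication.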
Error Handling
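// Assumption: PipelineError is exported by the same module as Pipeline.
const { PipelineError } = require('data-pipeline/src/pipeline');
try {
  const result = await pipeline.execute(data);
} catch (err) {
  if (err instanceof PipelineError) {
    console.log('Failed at:', err.failedStage);
    console.log('Partial data:', err.lastData);
    console.log('Stage results:', err.stageResults);
  }
}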
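The fields read in the handler (`failedStage`, `lastData`, `stageResults`) imply that the runner records progress as it goes. Below is a minimal sketch of how such an error could be produced. It is not the library's `PipelineError`: `StageFailure` and `runStages` are hypothetical names, and the `{ name, ok }` entries are an assumed shape for stage results.

```javascript
// Hypothetical sketch, not the library's code: a sequential runner that
// wraps a stage failure in an error carrying the fields read above.
class StageFailure extends Error {
  constructor(message, { failedStage, lastData, stageResults }) {
    super(message);
    this.name = 'StageFailure';
    this.failedStage = failedStage;
    this.lastData = lastData;
    this.stageResults = stageResults;
  }
}

async function runStages(stages, input) {
  let data = input;
  const stageResults = [];
  for (const { name, fn } of stages) {
    try {
      data = await fn(data);
      stageResults.push({ name, ok: true });
    } catch (err) {
      stageResults.push({ name, ok: false });
      // `data` still holds the output of the last stage that succeeded.
      throw new StageFailure(`Stage "${name}" failed: ${err.message}`, {
        failedStage: name,
        lastData: data,
        stageResults,
      });
    }
  }
  return data;
}

const failing = runStages(
  [
    { name: 'double', fn: (xs) => xs.map((x) => x * 2) },
    { name: 'explode', fn: () => { throw new Error('boom'); } },
  ],
  [1, 2],
).catch((err) => {
  console.log(err.failedStage, err.lastData);
  return err;
});
```

Keeping the last good data on the error lets a caller inspect or persist partial progress instead of restarting from the raw input.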
Performance Metrics
const metrics = pipeline.getMetrics();
// {
// pipeline: { totalRuns, totalErrors, avgTime },
// stages: [{ name, calls, errors, avgTime }, ...]
// }
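The per-stage entries (`calls`, `errors`, `avgTime`) can be collected by wrapping each stage function. The sketch below shows the idea for synchronous stages only. It is not the library's implementation: `instrument` is a hypothetical helper, and measuring with `Date.now()` in milliseconds is an assumption.

```javascript
// Hypothetical metrics wrapper, not the library's code: counts calls and
// errors for a synchronous stage and tracks average wall-clock time.
function instrument(name, fn) {
  const stats = { calls: 0, errors: 0, totalTime: 0 };
  const wrapped = (data) => {
    const start = Date.now();
    stats.calls += 1;
    try {
      return fn(data);
    } catch (err) {
      stats.errors += 1;
      throw err; // metrics must not swallow the failure
    } finally {
      stats.totalTime += Date.now() - start;
    }
  };
  wrapped.getMetrics = () => ({
    name,
    calls: stats.calls,
    errors: stats.errors,
    avgTime: stats.calls ? stats.totalTime / stats.calls : 0,
  });
  return wrapped;
}

// Usage: one successful call and one failing call.
const parse = instrument('parse', (text) => JSON.parse(text));
parse('{"ok":true}');
try {
  parse('not json');
} catch (err) {
  // The error is counted by the wrapper and rethrown to the caller.
}
console.log(parse.getMetrics());
```

After the two calls the wrapper reports two calls and one error; the average time depends on the machine.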
Usage Guidance
This package appears internally coherent for a JavaScript data-pipeline library and doesn't ask for secrets or external installs. However: (1) the package has no homepage or source provenance—treat it as unvetted third-party code; (2) review src/pipeline.js and any included tests yourself before running in production; (3) when using the library, avoid passing untrusted stage functions or callbacks (onStageComplete) that could make network requests or access local files; (4) run it in a sandbox or ephemeral environment if you must execute it before doing a deeper audit; and (5) if you need stronger assurance, ask the author for a repository link or provenance information.
Capability Analysis
Type: OpenClaw Skill
Name: data-pipeline
Version: 1.1.0
The bundle provides a legitimate and well-structured data processing pipeline engine for ETL, validation, and transformation tasks. Analysis of 'src/pipeline.js' and 'SKILL.md' shows standard implementation of data manipulation logic (map, filter, reduce, schema validation) without any high-risk capabilities such as shell execution, network requests, or file system access. No evidence of malicious intent, data exfiltration, or prompt injection was found.
Capability Assessment
Purpose & Capability
The name/description (ETL/pipeline engine) match the included src/pipeline.js, examples, and tests. APIs declared in SKILL.md (Pipeline, Transformers, Validators, PipelineFactory) correspond to the implementation. There are no extra credentials, binaries, or config paths requested that would be unrelated to a data pipeline.
Instruction Scope
SKILL.md contains usage examples and runtime instructions that only reference the local library (require('data-pipeline/src/pipeline')) and typical pipeline operations. It does not instruct the agent to read system files, env vars, or post data to external endpoints. Note: the pipeline API supports user-provided callbacks (onStageComplete) and custom stage functions; those user-supplied functions can perform I/O or network requests if the caller provides them, so data exfiltration is possible only via user code, not the library itself.
Install Mechanism
No install spec; this is instruction-and-source-only. The repository includes source files and a package-lock.json that references npm registry packages (devDependencies). There are no download URLs, extract steps, or non-standard install behaviors in the bundle.
Credentials
The skill declares no required environment variables or credentials. The runtime code does not reference environment variables or sensitive config paths in the provided snippets.
Persistence & Privilege
always is false and model invocation is allowed (the platform default). The skill does not request permanent/privileged presence nor modify other skills or agent-wide configs based on the provided files.
How to Use
- Make sure OpenClaw is installed (local or Docker)
- Run the install command in chat: /install data-pipeline
- After installation, invoke the skill by name or use /data-pipeline
- Provide required inputs per the skill's parameter spec and get structured output
Version History
v1.1.0
Pipeline stability improvements. All 58 tests pass. Fixed Jest hang with --detectOpenHandles.
v1.0.0
Initial release: ETL pipeline orchestrator with lifecycle hooks, bundle execution, retry/timeout, built-in transformers. 26 tests.
Frequently Asked Questions
What is Data Pipeline?
Lightweight ETL pipeline orchestrator with lifecycle hooks, bundle execution, stage retry, timeout control, and built-in transformers. 26 tests, 100% pass rate. It is an AI Agent Skill for Claude Code / OpenClaw, with 59 downloads so far.
How do I install Data Pipeline?
Run "/install data-pipeline" in the OpenClaw or Claude Code chat to install it in one step — no extra setup required.
Is Data Pipeline free?
Yes, Data Pipeline is completely free, licensed under MIT-0. You can download, install and use it at no cost.
Which platforms does Data Pipeline support?
Data Pipeline is cross-platform and runs anywhere OpenClaw or Claude Code is available.
Who created Data Pipeline?
It is built and maintained by pagoda111king (@pagoda111king); the current version is v1.1.0.