← 返回 Skills 市场
fisa712

Knowledge Graph - Etl Pipeline Generator

作者 Muhammad Asif · GitHub ↗ · v1.0.0 · MIT-0
cross-platform ✓ 安全检测通过
38
总下载
0
收藏
1
当前安装
1
版本数
在 OpenClaw 中安装
/install etl-pipeline-generator
功能描述
Generate automated ETL pipelines for transforming and loading data into graph databases or knowledge graphs.
使用说明 (SKILL.md)

ETL Pipeline Generator

Design automated Extract-Transform-Load pipelines for knowledge graph construction.

This skill generates structured ETL pipelines that move raw data from various sources into graph-ready datasets or knowledge graph storage systems. It orchestrates data ingestion, transformation, and loading steps so that messy input data becomes structured graph data.

Quick Start

Use When

  • Building automated data ingestion workflows
  • Designing Extract-Transform-Load pipelines
  • Orchestrating multi-step data transformations
  • Creating repeatable data pipeline architectures
  • Integrating multiple data sources
  • Automating knowledge graph population
  • Building data quality and validation workflows

Inputs

  • Data source specifications (type, location, format)
  • Transformation requirements and rules
  • Target system definitions (database, format)
  • Data quality requirements
  • Pipeline scheduling and monitoring rules
  • Error handling and recovery strategies

Outputs

  • Pipeline configuration (YAML)
  • Directed Acyclic Graph (DAG) definitions
  • Python/Scala/SQL implementation scripts
  • Data flow diagrams
  • Pipeline documentation
  • Monitoring and alerting configurations

Example

Simple Customer Data Pipeline:

name: customer_graph_ingestion
description: Ingest customer data into knowledge graph

extract:
  source_type: csv
  source_path: data/customers.csv
  format: csv

transform:
  - normalize_entities
  - detect_relationships
  - validate_schema

load:
  target: neo4j
  database: customer_graph
  method: bulk_import

Generated Pipeline Flow:

Extract CSV → Normalize Entities → Detect Relationships → 
Validate Data → Load to Neo4j → Verify Load

ETL Pipeline Architecture

1. Extract Stage

Purpose: Retrieve raw data from external sources

Supported Sources:

  • CSV files
  • JSON files
  • REST APIs
  • Databases (SQL, NoSQL)
  • Text documents
  • Streaming sources (Kafka, Pub/Sub)
  • Data warehouses (Snowflake, BigQuery)

Configuration:

extract:
  source_type: csv|json|api|database|text|stream
  location: /path/to/file or endpoint
  format: format_specifier
  authentication: credentials (if needed)
  filtering: (optional) filter criteria

2. Transform Stage

Purpose: Convert raw data into graph-ready structures

Transformation Operations:

  • Entity Detection - Identify entity boundaries
  • Relationship Inference - Discover relationships
  • Schema Mapping - Map to target schema
  • Data Normalization - Standardize values
  • Type Conversion - Convert data types
  • Deduplication - Remove duplicates
  • Enrichment - Add contextual data
  • Validation - Check data quality
  • Filtering - Remove invalid records

Example Transform:

transform:
  operations:
    - normalize_entity_names
    - infer_relationships_from_columns
    - convert_dates_to_iso8601
    - deduplicate_entities
    - validate_required_fields
    - enrich_with_external_data

3. Load Stage

Purpose: Load transformed data into target system

Supported Targets:

  • Neo4j - Property graph database
  • RDF Triple Stores - SPARQL endpoint
  • ArangoDB - Multi-model database
  • TigerGraph - Graph database
  • Property Graph Engines - Generic property graphs
  • CSV/JSON - File output

Configuration:

load:
  target: neo4j|rdf|arangodb|tigergraph|property_graph|file
  connection_params:
    host: localhost
    port: 7687
    database: graph_database
  method: bulk_import|streaming|batch
  batch_size: 1000

Pipeline Stages Explained

Extract Stage Details

Data Source Connectors:

CSV Extractor
  - delimiter detection
  - header parsing
  - encoding handling
  - chunk processing

JSON Extractor
  - nested object handling
  - array flattening
  - schema inference
  - streaming JSON support

API Extractor
  - authentication handling
  - pagination support
  - rate limiting
  - response transformation

Database Extractor
  - SQL query execution
  - connection pooling
  - partition handling
  - transaction management

Transform Stage Details

Data Processing Operations:

Normalization
  - case normalization
  - whitespace trimming
  - special character handling
  - URL encoding

Type Conversion
  - string → integer/float
  - string → datetime
  - string → boolean
  - safe type conversions

Deduplication
  - exact match detection
  - fuzzy matching
  - identity resolution
  - merge strategies

Validation
  - required field checks
  - data type validation
  - range validation
  - pattern matching

Load Stage Details

Data Loading Methods:

Bulk Import
  - CSV nodes/edges files
  - Batch processing
  - High throughput
  - Transactional rollback

Streaming Load
  - Real-time ingestion
  - Event-based
  - Lower latency
  - Connection streaming

Batch Load
  - Scheduled runs
  - Configurable batch sizes
  - Progress tracking
  - Failure recovery

Pipeline Execution Modes

1. Synchronous Execution

Sequential execution of all stages
Immediate feedback on success/failure
Best for: Small to medium datasets

2. Asynchronous Execution

Non-blocking pipeline execution
Async progress monitoring
Best for: Large datasets, scheduled runs

3. Streaming Mode

Continuous data flow
Real-time processing
Best for: Live data feeds, event streams

4. Scheduled Execution

Cron-based or scheduled triggers
Repeatable, idempotent operations
Best for: Regular data refreshes

Pipeline Monitoring and Observability

Monitoring Capabilities

  • Step execution times
  • Data volume metrics (rows in/out)
  • Error tracking and logging
  • Data quality metrics
  • Performance profiling

Alerting

alerts:
  - condition: execution_time > 3600s
    action: notify_admin
  - condition: error_count > 10
    action: pause_pipeline
  - condition: data_quality_score \x3C 0.9
    action: log_warning

Error Handling and Recovery

Error Handling Strategies

Fail Fast: Stop on first error
Fail Safe: Continue with logging
Retry: Exponential backoff retry
Skip: Skip failed records, continue
Dead Letter: Route to error queue

Recovery Mechanisms

Checkpoint/Resume: Save state, resume from checkpoint
Rollback: Undo on failure
Compensation: Execute cleanup actions
Retry with Backoff: Automatic retry logic

Output Formats

YAML Pipeline Configuration

name: pipeline_name
description: pipeline description

extract:
  source_type: csv
  location: data/file.csv

transform:
  operations:
    - normalize_entities
    - validate_data

load:
  target: neo4j
  database: my_graph

DAG (Directed Acyclic Graph)

Extract(csv) → Parse → Normalize → 
Validate → Deduplicate → Load(Neo4j) → Verify

Python Implementation

def pipeline_execution():
    data = extract_from_csv('data.csv')
    data = normalize_entities(data)
    data = validate_data(data)
    load_to_neo4j(data)
    return verify_load()

Execution Metrics

{
  "pipeline_id": "customer_ingestion_001",
  "start_time": "2024-04-09T10:00:00Z",
  "end_time": "2024-04-09T10:05:30Z",
  "duration_seconds": 330,
  "stages": [
    {"name": "extract", "duration": 45, "records": 10000},
    {"name": "transform", "duration": 120, "records": 9900},
    {"name": "load", "duration": 150, "records": 9900}
  ],
  "status": "SUCCESS"
}

Execution Steps

  1. Validate Pipeline Configuration – Check all parameters
  2. Initialize Extractors – Set up data source connections
  3. Execute Extract – Retrieve raw data
  4. Initialize Transformers – Prepare transformation engines
  5. Execute Transform – Apply transformation operations
  6. Validate Transformed Data – Check data quality
  7. Initialize Loaders – Set up target connections
  8. Execute Load – Load data to target system
  9. Verify Load – Confirm successful loading
  10. Generate Metrics – Collect execution statistics

Integration with Other Skills

The ETL Pipeline Generator orchestrates these skills:

  • API Ingestion Connectors → Extract from APIs
  • CSV Graph Loader Generator → Transform and load CSVs
  • JSON to Triples Converter → Transform JSON to RDF
  • Text Entity Relation Extractor → Extract from text
  • Graph Schema Validation → Validate loaded data
  • Graph Constraint Generator → Apply constraints

Recommended Libraries

  • Orchestration: Apache Airflow, Prefect, Dagster
  • ETL Processing: Apache Spark, Pandas, Polars
  • Data Validation: Great Expectations, Pandera
  • Graph Loaders: neo4j, rdflib, networkx
  • Scheduling: APScheduler, Luigi, Celery
  • Monitoring: Prometheus, Grafana, DataDog

Best Practices

Idempotency – Pipelines can be rerun safely ✓ Data Validation – Validate at each stage ✓ Error Handling – Graceful error management ✓ Monitoring – Track pipeline execution ✓ Documentation – Document data lineage ✓ Testing – Test with sample data first ✓ Version Control – Track pipeline changes ✓ Scalability – Design for data growth ✓ Security – Secure credentials and data ✓ Performance – Optimize transformation logic

References

See pipeline-patterns.md for detailed ETL pipeline patterns and example-pipelines.md for complete real-world pipeline examples.


Version: 1.0.0

安全使用建议
Install only if you are comfortable reviewing generated ETL code before use. Use staging data first, avoid embedding secrets in configs, use least-privilege credentials, confirm production writes or deletes explicitly, and keep backups or rollback plans for target databases.
能力评估
Purpose & Capability
The documented purpose is generating ETL pipeline configs, DAGs, scripts, and patterns for graph/knowledge-graph loading; API, database, file, bulk-load, streaming, and update/delete examples fit that purpose.
Instruction Scope
The instructions are broad ETL design guidance and include credentials, database queries, bulk writes, scheduled/streaming modes, and a hard-delete pattern; these are disclosed and purpose-aligned, but production-data guardrails are sparse.
Install Mechanism
No install-time execution, package dependencies, obfuscated setup, or suspicious static-scan findings were present; VirusTotal telemetry was clean.
Credentials
The included Python utility mostly generates configs/scripts and can read local CSV/JSON when explicitly executed; examples show external APIs and databases as ETL templates rather than hidden runtime behavior.
Persistence & Privilege
No privilege escalation, background persistence, credential harvesting, or autonomous long-running worker is installed; scheduled and continuous modes are described as generated pipeline patterns.
如何使用
  1. 确保已安装 OpenClaw(本地或 Docker 部署)
  2. 在对话框中输入安装命令:/install etl-pipeline-generator
  3. 安装完成后,直接呼叫该 Skill 的名称或使用 /etl-pipeline-generator 触发
  4. 根据 Skill 的参数说明提供必要输入,即可获得结构化输出
版本历史
v1.0.0
- Initial release of ETL Pipeline Generator skill. - Automates the creation of ETL pipelines for transforming and loading data into graph databases or knowledge graphs. - Supports multiple data source types (CSV, JSON, APIs, databases, streams) and targets (Neo4j, RDF triple stores, ArangoDB, TigerGraph, file outputs). - Generates pipeline configurations, DAGs, implementation scripts, data flow diagrams, and monitoring settings. - Includes robust features for data quality, error handling, recovery, and pipeline observability. - Supports multiple execution modes: synchronous, asynchronous, streaming, and scheduled.
元数据
Slug etl-pipeline-generator
版本 1.0.0
许可证 MIT-0
累计安装 1
当前安装数 1
历史版本数 1
常见问题

Knowledge Graph - Etl Pipeline Generator 是什么?

Generate automated ETL pipelines for transforming and loading data into graph databases or knowledge graphs. 它是一个面向 Claude Code / OpenClaw 的 AI Agent Skill 插件,目前累计下载 38 次。

如何安装 Knowledge Graph - Etl Pipeline Generator?

在 OpenClaw 或 Claude Code 对话框中运行命令「/install etl-pipeline-generator」即可一键安装,无需额外配置。

Knowledge Graph - Etl Pipeline Generator 是免费的吗?

是的,Knowledge Graph - Etl Pipeline Generator 完全免费,采用 MIT-0 许可证,可自由下载、安装和使用。

Knowledge Graph - Etl Pipeline Generator 支持哪些平台?

Knowledge Graph - Etl Pipeline Generator 跨平台运行,可在任意部署了 OpenClaw / Claude Code 的环境中使用(cross-platform)。

谁开发了 Knowledge Graph - Etl Pipeline Generator?

由 Muhammad Asif(@fisa712)开发并维护,当前版本 v1.0.0。

💬 留言讨论