Knowledge Graph - Etl Pipeline Generator
/install etl-pipeline-generator
ETL Pipeline Generator
Design automated Extract-Transform-Load pipelines for knowledge graph construction.
This skill generates structured ETL pipelines that move raw data from various sources into graph-ready datasets or knowledge graph storage systems. It orchestrates data ingestion, transformation, and loading steps so that messy input data becomes structured graph data.
Quick Start
Use When
- Building automated data ingestion workflows
- Designing Extract-Transform-Load pipelines
- Orchestrating multi-step data transformations
- Creating repeatable data pipeline architectures
- Integrating multiple data sources
- Automating knowledge graph population
- Building data quality and validation workflows
Inputs
- Data source specifications (type, location, format)
- Transformation requirements and rules
- Target system definitions (database, format)
- Data quality requirements
- Pipeline scheduling and monitoring rules
- Error handling and recovery strategies
Outputs
- Pipeline configuration (YAML)
- Directed Acyclic Graph (DAG) definitions
- Python/Scala/SQL implementation scripts
- Data flow diagrams
- Pipeline documentation
- Monitoring and alerting configurations
Example
Simple Customer Data Pipeline:
name: customer_graph_ingestion
description: Ingest customer data into knowledge graph
extract:
source_type: csv
source_path: data/customers.csv
format: csv
transform:
- normalize_entities
- detect_relationships
- validate_schema
load:
target: neo4j
database: customer_graph
method: bulk_import
Generated Pipeline Flow:
Extract CSV → Normalize Entities → Detect Relationships →
Validate Data → Load to Neo4j → Verify Load
ETL Pipeline Architecture
1. Extract Stage
Purpose: Retrieve raw data from external sources
Supported Sources:
- CSV files
- JSON files
- REST APIs
- Databases (SQL, NoSQL)
- Text documents
- Streaming sources (Kafka, Pub/Sub)
- Data warehouses (Snowflake, BigQuery)
Configuration:
extract:
source_type: csv|json|api|database|text|stream
location: /path/to/file or endpoint
format: format_specifier
authentication: credentials (if needed)
filtering: (optional) filter criteria
2. Transform Stage
Purpose: Convert raw data into graph-ready structures
Transformation Operations:
- Entity Detection - Identify entity boundaries
- Relationship Inference - Discover relationships
- Schema Mapping - Map to target schema
- Data Normalization - Standardize values
- Type Conversion - Convert data types
- Deduplication - Remove duplicates
- Enrichment - Add contextual data
- Validation - Check data quality
- Filtering - Remove invalid records
Example Transform:
transform:
operations:
- normalize_entity_names
- infer_relationships_from_columns
- convert_dates_to_iso8601
- deduplicate_entities
- validate_required_fields
- enrich_with_external_data
3. Load Stage
Purpose: Load transformed data into target system
Supported Targets:
- Neo4j - Property graph database
- RDF Triple Stores - SPARQL endpoint
- ArangoDB - Multi-model database
- TigerGraph - Graph database
- Property Graph Engines - Generic property graphs
- CSV/JSON - File output
Configuration:
load:
target: neo4j|rdf|arangodb|tigergraph|property_graph|file
connection_params:
host: localhost
port: 7687
database: graph_database
method: bulk_import|streaming|batch
batch_size: 1000
Pipeline Stages Explained
Extract Stage Details
Data Source Connectors:
CSV Extractor
- delimiter detection
- header parsing
- encoding handling
- chunk processing
JSON Extractor
- nested object handling
- array flattening
- schema inference
- streaming JSON support
API Extractor
- authentication handling
- pagination support
- rate limiting
- response transformation
Database Extractor
- SQL query execution
- connection pooling
- partition handling
- transaction management
Transform Stage Details
Data Processing Operations:
Normalization
- case normalization
- whitespace trimming
- special character handling
- URL encoding
Type Conversion
- string → integer/float
- string → datetime
- string → boolean
- safe type conversions
Deduplication
- exact match detection
- fuzzy matching
- identity resolution
- merge strategies
Validation
- required field checks
- data type validation
- range validation
- pattern matching
Load Stage Details
Data Loading Methods:
Bulk Import
- CSV nodes/edges files
- Batch processing
- High throughput
- Transactional rollback
Streaming Load
- Real-time ingestion
- Event-based
- Lower latency
- Connection streaming
Batch Load
- Scheduled runs
- Configurable batch sizes
- Progress tracking
- Failure recovery
Pipeline Execution Modes
1. Synchronous Execution
Sequential execution of all stages
Immediate feedback on success/failure
Best for: Small to medium datasets
2. Asynchronous Execution
Non-blocking pipeline execution
Async progress monitoring
Best for: Large datasets, scheduled runs
3. Streaming Mode
Continuous data flow
Real-time processing
Best for: Live data feeds, event streams
4. Scheduled Execution
Cron-based or scheduled triggers
Repeatable, idempotent operations
Best for: Regular data refreshes
Pipeline Monitoring and Observability
Monitoring Capabilities
- Step execution times
- Data volume metrics (rows in/out)
- Error tracking and logging
- Data quality metrics
- Performance profiling
Alerting
alerts:
- condition: execution_time > 3600s
action: notify_admin
- condition: error_count > 10
action: pause_pipeline
- condition: data_quality_score \x3C 0.9
action: log_warning
Error Handling and Recovery
Error Handling Strategies
Fail Fast: Stop on first error
Fail Safe: Continue with logging
Retry: Exponential backoff retry
Skip: Skip failed records, continue
Dead Letter: Route to error queue
Recovery Mechanisms
Checkpoint/Resume: Save state, resume from checkpoint
Rollback: Undo on failure
Compensation: Execute cleanup actions
Retry with Backoff: Automatic retry logic
Output Formats
YAML Pipeline Configuration
name: pipeline_name
description: pipeline description
extract:
source_type: csv
location: data/file.csv
transform:
operations:
- normalize_entities
- validate_data
load:
target: neo4j
database: my_graph
DAG (Directed Acyclic Graph)
Extract(csv) → Parse → Normalize →
Validate → Deduplicate → Load(Neo4j) → Verify
Python Implementation
def pipeline_execution():
data = extract_from_csv('data.csv')
data = normalize_entities(data)
data = validate_data(data)
load_to_neo4j(data)
return verify_load()
Execution Metrics
{
"pipeline_id": "customer_ingestion_001",
"start_time": "2024-04-09T10:00:00Z",
"end_time": "2024-04-09T10:05:30Z",
"duration_seconds": 330,
"stages": [
{"name": "extract", "duration": 45, "records": 10000},
{"name": "transform", "duration": 120, "records": 9900},
{"name": "load", "duration": 150, "records": 9900}
],
"status": "SUCCESS"
}
Execution Steps
- Validate Pipeline Configuration – Check all parameters
- Initialize Extractors – Set up data source connections
- Execute Extract – Retrieve raw data
- Initialize Transformers – Prepare transformation engines
- Execute Transform – Apply transformation operations
- Validate Transformed Data – Check data quality
- Initialize Loaders – Set up target connections
- Execute Load – Load data to target system
- Verify Load – Confirm successful loading
- Generate Metrics – Collect execution statistics
Integration with Other Skills
The ETL Pipeline Generator orchestrates these skills:
- API Ingestion Connectors → Extract from APIs
- CSV Graph Loader Generator → Transform and load CSVs
- JSON to Triples Converter → Transform JSON to RDF
- Text Entity Relation Extractor → Extract from text
- Graph Schema Validation → Validate loaded data
- Graph Constraint Generator → Apply constraints
Recommended Libraries
- Orchestration: Apache Airflow, Prefect, Dagster
- ETL Processing: Apache Spark, Pandas, Polars
- Data Validation: Great Expectations, Pandera
- Graph Loaders: neo4j, rdflib, networkx
- Scheduling: APScheduler, Luigi, Celery
- Monitoring: Prometheus, Grafana, DataDog
Best Practices
✓ Idempotency – Pipelines can be rerun safely ✓ Data Validation – Validate at each stage ✓ Error Handling – Graceful error management ✓ Monitoring – Track pipeline execution ✓ Documentation – Document data lineage ✓ Testing – Test with sample data first ✓ Version Control – Track pipeline changes ✓ Scalability – Design for data growth ✓ Security – Secure credentials and data ✓ Performance – Optimize transformation logic
References
See pipeline-patterns.md for detailed ETL pipeline patterns and example-pipelines.md for complete real-world pipeline examples.
Version: 1.0.0
- 确保已安装 OpenClaw(本地或 Docker 部署)
- 在对话框中输入安装命令:
/install etl-pipeline-generator - 安装完成后,直接呼叫该 Skill 的名称或使用
/etl-pipeline-generator触发 - 根据 Skill 的参数说明提供必要输入,即可获得结构化输出
Knowledge Graph - Etl Pipeline Generator 是什么?
Generate automated ETL pipelines for transforming and loading data into graph databases or knowledge graphs. 它是一个面向 Claude Code / OpenClaw 的 AI Agent Skill 插件,目前累计下载 38 次。
如何安装 Knowledge Graph - Etl Pipeline Generator?
在 OpenClaw 或 Claude Code 对话框中运行命令「/install etl-pipeline-generator」即可一键安装,无需额外配置。
Knowledge Graph - Etl Pipeline Generator 是免费的吗?
是的,Knowledge Graph - Etl Pipeline Generator 完全免费,采用 MIT-0 许可证,可自由下载、安装和使用。
Knowledge Graph - Etl Pipeline Generator 支持哪些平台?
Knowledge Graph - Etl Pipeline Generator 跨平台运行,可在任意部署了 OpenClaw / Claude Code 的环境中使用(cross-platform)。
谁开发了 Knowledge Graph - Etl Pipeline Generator?
由 Muhammad Asif(@fisa712)开发并维护,当前版本 v1.0.0。