← Back to Skills Marketplace
samledger67-dotcom

Data Pipeline Agent

by samledger67-dotcom · GitHub ↗ · v98.0.1 · MIT-0
cross-platform ⚠ suspicious
279
Downloads
0
Stars
1
Active Installs
3
Versions
Install in OpenClaw
/install data-pipeline-agent
Description
ETL pipeline builder for business data — API extraction, data cleaning, transformation, and warehouse loading. Use when you need to move data between systems...
README (SKILL.md)

Data Pipeline Agent

Build, run, and monitor ETL (Extract → Transform → Load) pipelines for business data. Specializes in financial data flows, API integrations, and warehouse loading patterns for accounting and operations teams.

When to Use

  • Extracting data from APIs (QBO, Stripe, Salesforce, bank feeds, etc.)
  • Cleaning and normalizing messy spreadsheets or CSV exports
  • Merging data from multiple sources into one canonical dataset
  • Loading transformed data into databases, data warehouses, or Google Sheets
  • Scheduling recurring data syncs (daily GL pulls, weekly AR aging refresh, etc.)
  • Auditing data quality — detecting nulls, duplicates, type mismatches

When NOT to Use

  • Real-time streaming — use Kafka, Kinesis, or Pub/Sub for sub-second latency
  • Interactive dashboards — this agent outputs data; visualization belongs in BI tools
  • Raw SQL query optimization — use DBA tooling for query plans and indexes
  • One-off manual exports — if it happens once, just download the CSV
  • Transactional writes to client systems — read-only extraction only unless Irfan approves write access

Pipeline Patterns

Pattern 1: API Extract → Clean → CSV

# Extract from REST API, clean, output CSV
import requests, pandas as pd, json
from datetime import datetime, timedelta

def extract(api_url, headers, params=None):
    """Pull paginated JSON from any REST endpoint."""
    results = []
    while api_url:
        r = requests.get(api_url, headers=headers, params=params)
        r.raise_for_status()
        data = r.json()
        results.extend(data.get("data", data if isinstance(data, list) else [data]))
        api_url = data.get("next_page_url")  # pagination
        params = None  # only pass params on first call
    return results

def clean(records, rename_map=None, drop_nulls_on=None, date_cols=None):
    """Normalize, rename, parse dates, drop nulls."""
    df = pd.DataFrame(records)
    if rename_map:
        df = df.rename(columns=rename_map)
    if date_cols:
        for col in date_cols:
            df[col] = pd.to_datetime(df[col], errors="coerce")
    if drop_nulls_on:
        df = df.dropna(subset=drop_nulls_on)
    df = df.drop_duplicates()
    return df

def load_csv(df, output_path):
    df.to_csv(output_path, index=False)
    print(f"✅ Saved {len(df)} rows → {output_path}")

# Example: QBO Invoice Extract
HEADERS = {"Authorization": "Bearer \x3CTOKEN>", "Accept": "application/json"}
records = extract("https://quickbooks.api.intuit.com/v3/company/\x3CREALM>/query?query=SELECT * FROM Invoice", HEADERS)
df = clean(records, rename_map={"TxnDate": "invoice_date", "TotalAmt": "amount"}, date_cols=["invoice_date"])
load_csv(df, f"data/invoices_{datetime.today().date()}.csv")

Pattern 2: Multi-Source Merge

import pandas as pd

def merge_gl_with_bank(gl_path, bank_path, match_on="amount", date_tolerance_days=3):
    """
    Match GL entries to bank transactions.
    Flags unmatched rows for manual review.
    """
    gl = pd.read_csv(gl_path, parse_dates=["date"])
    bank = pd.read_csv(bank_path, parse_dates=["date"])

    # Merge on amount + date proximity
    merged = pd.merge_asof(
        gl.sort_values("date"),
        bank.sort_values("date"),
        on="date",
        by=match_on,
        tolerance=pd.Timedelta(days=date_tolerance_days),
        direction="nearest",
        suffixes=("_gl", "_bank")
    )

    unmatched_gl = gl[~gl.index.isin(merged.dropna(subset=["date_bank"]).index)]
    unmatched_bank = bank[~bank.index.isin(merged.dropna(subset=["date_gl"]).index)]

    print(f"✅ Matched: {len(merged.dropna())} | ⚠️ Unmatched GL: {len(unmatched_gl)} | Bank: {len(unmatched_bank)}")
    return merged, unmatched_gl, unmatched_bank

Pattern 3: Data Quality Audit

import pandas as pd

def audit_dataset(df, required_cols=None, expected_types=None):
    """
    Run data quality checks. Returns a report dict.
    """
    report = {
        "row_count": len(df),
        "duplicate_rows": int(df.duplicated().sum()),
        "null_summary": df.isnull().sum().to_dict(),
        "issues": []
    }

    if required_cols:
        missing = [c for c in required_cols if c not in df.columns]
        if missing:
            report["issues"].append(f"Missing required columns: {missing}")

    if expected_types:
        for col, dtype in expected_types.items():
            if col in df.columns and not pd.api.types.is_dtype_equal(df[col].dtype, dtype):
                report["issues"].append(f"{col}: expected {dtype}, got {df[col].dtype}")

    # Flag columns with >20% nulls
    for col, nulls in report["null_summary"].items():
        pct = nulls / len(df) * 100
        if pct > 20:
            report["issues"].append(f"{col}: {pct:.1f}% null — review required")

    return report

# Usage
df = pd.read_csv("data/ar_aging.csv")
report = audit_dataset(
    df,
    required_cols=["customer_id", "invoice_date", "amount", "due_date"],
    expected_types={"amount": "float64", "customer_id": "object"}
)
print(report)

Pattern 4: Scheduled Cron Pipeline

#!/bin/bash
# daily-gl-sync.sh — run via cron or OpenClaw cron tool
# Extracts GL, cleans, loads to SQLite, notifies on error

set -euo pipefail
LOG="logs/gl-sync-$(date +%Y-%m-%d).log"
mkdir -p logs data

echo "[$(date)] Starting GL sync..." | tee -a "$LOG"

python3 pipelines/gl_extract.py >> "$LOG" 2>&1 && \
python3 pipelines/gl_clean.py >> "$LOG" 2>&1 && \
python3 pipelines/gl_load.py >> "$LOG" 2>&1 && \
echo "[$(date)] ✅ GL sync complete" | tee -a "$LOG" || \
echo "[$(date)] ❌ GL sync FAILED — check $LOG" | tee -a "$LOG"

Pattern 5: Load to SQLite / PostgreSQL

import pandas as pd
import sqlite3

def load_to_sqlite(df, db_path, table_name, if_exists="replace"):
    """
    Load DataFrame to SQLite. Use if_exists='append' for incremental loads.
    """
    conn = sqlite3.connect(db_path)
    df.to_sql(table_name, conn, if_exists=if_exists, index=False)
    conn.close()
    print(f"✅ Loaded {len(df)} rows → {db_path}::{table_name}")

# PostgreSQL version (requires psycopg2 + sqlalchemy)
from sqlalchemy import create_engine

def load_to_postgres(df, conn_str, table_name, schema="public", if_exists="replace"):
    engine = create_engine(conn_str)
    df.to_sql(table_name, engine, schema=schema, if_exists=if_exists, index=False)
    print(f"✅ Loaded {len(df)} rows → {schema}.{table_name}")

Common Business Pipelines

AR Aging Refresh Pipeline

1. Extract: QBO Invoices API → raw JSON
2. Transform: Calculate days_outstanding, aging_bucket (0-30, 31-60, 61-90, 90+)
3. Enrich: Join with customer contact data
4. Load: Google Sheets "AR Aging" tab + SQLite archive
5. Alert: Flag invoices >60 days for follow-up queue

Bank Feed Reconciliation Pipeline

1. Extract: Bank API (Plaid/CSV export) + QBO GL
2. Transform: Normalize dates, amounts, memo fields
3. Match: Fuzzy join on amount + date (±3 days tolerance)
4. Flag: Unmatched transactions → manual review CSV
5. Load: Reconciliation log → SQLite + email summary

Payroll → GL Mapping Pipeline

1. Extract: Payroll system CSV export (Gusto, ADP, etc.)
2. Transform: Map payroll codes → GL account numbers
3. Validate: Totals match payroll register
4. Load: Journal entry template → QBO batch import format
5. Archive: Raw + transformed files in dated folder

Pipeline Design Checklist

Before building any pipeline:

  • Idempotency — Can the pipeline re-run without duplicating data?
  • Error handling — What happens if the API is down? Partial load?
  • Logging — Is every step logged with timestamps?
  • Data quality — Are nulls, duplicates, and type mismatches caught?
  • Reversibility — Can the load be rolled back if something goes wrong?
  • Rate limits — Does the source API have call limits? Add retry logic.
  • Secrets — Are API keys in env vars, not hardcoded?
  • Schedule — How often does this run? Who monitors it?

Data Cleaning Quick Reference

Problem Solution
Mixed date formats pd.to_datetime(col, infer_datetime_format=True)
Currency strings ("$1,234.56") col.str.replace(r'[$,]', '', regex=True).astype(float)
Duplicate rows df.drop_duplicates(subset=['id'])
Null amounts df['amount'].fillna(0) or df.dropna(subset=['amount'])
Inconsistent casing df['name'].str.strip().str.title()
Leading/trailing spaces df.apply(lambda x: x.str.strip() if x.dtype == 'object' else x)
Outlier detection df[df['amount'].between(df['amount'].quantile(.01), df['amount'].quantile(.99))]

Scheduling with OpenClaw Cron

# Daily GL sync at 6 AM CST
Schedule: cron "0 6 * * *" tz=America/Chicago
Payload: agentTurn — "Run the daily GL sync pipeline in ~/workspace/pipelines/"
Delivery: announce to Telegram on completion or failure

Dependencies

Install Python data stack:

pip install pandas requests sqlalchemy psycopg2-binary openpyxl xlrd
# For Google Sheets
pip install gspread gspread-dataframe google-auth
# For Plaid bank feeds
pip install plaid-python

File Organization

workspace/
  pipelines/
    gl_extract.py
    gl_clean.py
    gl_load.py
    ar_aging.py
    bank_reconcile.py
  data/
    raw/          ← API responses, CSV imports (never edited)
    processed/    ← cleaned, transformed data
    archive/      ← date-stamped historical snapshots
  logs/
    pipeline-YYYY-MM-DD.log
  scripts/
    run-daily-pipelines.sh

Safety Rules

  1. Extract is always read-only. Never write to source systems during extraction.
  2. Archive raw data before any transformation — keep the original.
  3. Validate row counts before and after each transformation step.
  4. Test on sample data (10-100 rows) before running full pipeline.
  5. Client system writes require Irfan approval — QBO, bank APIs, payroll systems are extract-only by default.
  6. Never hardcode credentials — use environment variables or 1Password CLI.
Usage Guidance
This skill is an instruction-only ETL template and largely does what its description says, but exercise caution before using it in production: 1) Don’t paste real API keys, DB passwords, or service account tokens into chat — use a secrets manager or supply read-only, scoped credentials only when you run the code locally. 2) Inspect the full SKILL.md (the provided file appears truncated and contains a broken shell line like "mkdir -") and any scripts it would run; fix/understand shell commands before executing. 3) Test with dummy or sandbox accounts and read-only database replicas. 4) Ask the publisher for provenance (homepage, source repo, maintainer contact) — absence of a homepage and unknown source lowers trust. 5) If you plan to let the agent run autonomously, limit its network and credential access and prefer ephemeral, least-privilege tokens. If you need help reviewing the missing/truncated parts or converting examples into a secure runbook, provide the full SKILL.md and intended runtime environment and I can help.
Capability Analysis
Type: OpenClaw Skill Name: data-pipeline-agent Version: 98.0.1 The skill bundle provides legitimate ETL (Extract, Transform, Load) templates and instructions for an AI agent to automate data pipelines. The Python and Bash snippets in SKILL.md follow standard data engineering practices, including pagination handling, data validation, and the use of environment variables for secrets. While the _meta.json contains a future timestamp (2026) and an unusually high version number (98.0.1), the actual content is transparent, lacks obfuscation, and includes explicit safety rules regarding read-only access and credential management.
Capability Assessment
Purpose & Capability
Name/description match the SKILL.md content: the instructions are ETL templates for API extraction, cleaning, merging, auditing, and scheduled runs. However, the skill claims integrations with many external services (QuickBooks, Stripe, Salesforce, Google Sheets, warehouses) yet declares no required environment variables, credentials, or authentication guidance. That is a proportionality/metadata gap: a legitimate ETL skill normally documents how to provide API keys, OAuth tokens, or DB connection strings.
Instruction Scope
SKILL.md contains concrete example code (requests, pandas, CSV reads/writes, cron/bash) which stay within ETL scope. The examples reference local paths (data/*.csv, logs/) and API calls with Authorization headers — no explicit instructions to read unrelated system secrets or arbitrary host files are present. However the document is truncated and contains an obviously broken/partial shell command ("mkdir -") and ellipses, indicating the shipped instructions are incomplete/sloppy. The examples also implicitly require secret tokens but do not instruct secure handling, increasing risk that users will paste secrets into chat or code.
Install Mechanism
Instruction-only skill with no install spec and no code files — the lowest-risk install profile. Nothing is written to disk by an installer and there are no obscure download URLs or package installs baked into the skill itself.
Credentials
The skill declares no required env vars or primary credential despite explicitly showing usage patterns that require API tokens and DB credentials. This is a mismatch: the skill will in practice need secrets to access third-party APIs and warehouses. Because the skill lacks guidance on credential handling, an inexperienced user might share keys insecurely (e.g., in chat) or grant overly broad credentials. No config-path requirements are listed, but absence of declared secrets is surprising given the stated integrations.
Persistence & Privilege
always is false and there is no install-time persistence. Model invocation is allowed (the platform default) which means the agent could call the skill autonomously, but that is expected and not by itself a red flag here. The skill does not request modification of other skills or global agent settings.
How to Use
  1. Make sure OpenClaw is installed (local or Docker)
  2. Run the install command in chat: /install data-pipeline-agent
  3. After installation, invoke the skill by name or use /data-pipeline-agent
  4. Provide required inputs per the skill's parameter spec and get structured output
Version History
v98.0.1
Corrected display name
v98.0.0
probe
v1.0.0
Initial release
Metadata
Slug data-pipeline-agent
Version 98.0.1
License MIT-0
All-time Installs 1
Active Installs 1
Total Versions 3
Frequently Asked Questions

What is Data Pipeline Agent?

ETL pipeline builder for business data — API extraction, data cleaning, transformation, and warehouse loading. Use when you need to move data between systems... It is an AI Agent Skill for Claude Code / OpenClaw, with 279 downloads so far.

How do I install Data Pipeline Agent?

Run "/install data-pipeline-agent" in the OpenClaw or Claude Code chat to install it in one step — no extra setup required.

Is Data Pipeline Agent free?

Yes, Data Pipeline Agent is completely free, licensed under MIT-0. You can download, install and use it at no cost.

Which platforms does Data Pipeline Agent support?

Data Pipeline Agent is cross-platform and runs anywhere OpenClaw / Claude Code is available (cross-platform).

Who created Data Pipeline Agent?

It is built and maintained by samledger67-dotcom (@samledger67-dotcom); the current version is v98.0.1.

💬 Comments