samledger67-dotcom

Data Pipeline Agent

Author: samledger67-dotcom · GitHub · v98.0.1 · MIT-0
cross-platform ⚠ suspicious
Total downloads: 279
Favorites: 0
Current installs: 1
Versions: 3
Install in OpenClaw
/install data-pipeline-agent
Description
ETL pipeline builder for business data — API extraction, data cleaning, transformation, and warehouse loading. Use when you need to move data between systems...
Usage (SKILL.md)

Data Pipeline Agent

Build, run, and monitor ETL (Extract → Transform → Load) pipelines for business data. Specializes in financial data flows, API integrations, and warehouse loading patterns for accounting and operations teams.

When to Use

  • Extracting data from APIs (QBO, Stripe, Salesforce, bank feeds, etc.)
  • Cleaning and normalizing messy spreadsheets or CSV exports
  • Merging data from multiple sources into one canonical dataset
  • Loading transformed data into databases, data warehouses, or Google Sheets
  • Scheduling recurring data syncs (daily GL pulls, weekly AR aging refresh, etc.)
  • Auditing data quality — detecting nulls, duplicates, type mismatches

When NOT to Use

  • Real-time streaming — use Kafka, Kinesis, or Pub/Sub for sub-second latency
  • Interactive dashboards — this agent outputs data; visualization belongs in BI tools
  • Raw SQL query optimization — use DBA tooling for query plans and indexes
  • One-off manual exports — if it happens once, just download the CSV
  • Transactional writes to client systems — read-only extraction only unless Irfan approves write access

Pipeline Patterns

Pattern 1: API Extract → Clean → CSV

# Extract from REST API, clean, output CSV
from datetime import datetime

import pandas as pd
import requests

def extract(api_url, headers, params=None):
    """Pull paginated JSON from a REST endpoint.

    Handles a bare JSON array or a {"data": [...], "next_page_url": ...} envelope.
    """
    results = []
    while api_url:
        r = requests.get(api_url, headers=headers, params=params, timeout=30)
        r.raise_for_status()
        data = r.json()
        if isinstance(data, list):
            # Bare array: there is no envelope, so no pagination link to follow
            results.extend(data)
            break
        page = data.get("data", [data])
        results.extend(page if isinstance(page, list) else [page])
        api_url = data.get("next_page_url")  # pagination
        params = None  # only pass params on first call
    return results

def clean(records, rename_map=None, drop_nulls_on=None, date_cols=None):
    """Normalize, rename, parse dates, drop nulls."""
    df = pd.DataFrame(records)
    if rename_map:
        df = df.rename(columns=rename_map)
    if date_cols:
        for col in date_cols:
            df[col] = pd.to_datetime(df[col], errors="coerce")
    if drop_nulls_on:
        df = df.dropna(subset=drop_nulls_on)
    df = df.drop_duplicates()
    return df

def load_csv(df, output_path):
    df.to_csv(output_path, index=False)
    print(f"✅ Saved {len(df)} rows → {output_path}")

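# Example: QBO Invoice Extract
# QBO wraps query results as {"QueryResponse": {"Invoice": [...]}}, so unwrap that envelope before cleaning.
HEADERS = {"Authorization": "Bearer <TOKEN>", "Accept": "application/json"}
response = extract("https://quickbooks.api.intuit.com/v3/company/<REALM>/query?query=SELECT * FROM Invoice", HEADERS)
records = response[0].get("QueryResponse", {}).get("Invoice", []) if response else []
df = clean(records, rename_map={"TxnDate": "invoice_date", "TotalAmt": "amount"}, date_cols=["invoice_date"])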
load_csv(df, f"data/invoices_{datetime.today().date()}.csv")
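The extract function above makes one attempt per page, so a rate-limit response or a dropped connection aborts the whole run. One possible remedy is a small retry wrapper with exponential backoff. This is a minimal sketch, not part of the original skill: `with_retry` and `flaky_fetch` are hypothetical names, the delays are illustrative, and real code would likely catch `requests.exceptions.RequestException` rather than every exception.

```python
import time

def with_retry(fetch, max_attempts=4, base_delay=1.0, sleep=time.sleep):
    """Call fetch(); on failure wait base_delay * 2**attempt, then try again."""
    for attempt in range(max_attempts):
        try:
            return fetch()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            sleep(base_delay * 2 ** attempt)

# Stand-in for a network call: fails twice, then succeeds
calls = {"n": 0}

def flaky_fetch():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("simulated rate limit or timeout")
    return {"data": [1, 2, 3]}

delays = []  # record the waits instead of actually sleeping
result = with_retry(flaky_fetch, sleep=delays.append)
```

In a pipeline, the `requests.get` call inside `extract` could be passed in as `fetch` via a lambda, leaving the default `sleep=time.sleep` in place.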

Pattern 2: Multi-Source Merge

import pandas as pd

def merge_gl_with_bank(gl_path, bank_path, match_on="amount", date_tolerance_days=3):
    """
    Match GL entries to bank transactions.
    Flags unmatched rows for manual review.
    """
    gl = pd.read_csv(gl_path, parse_dates=["date"]).sort_values("date").reset_index(drop=True)
    bank = pd.read_csv(bank_path, parse_dates=["date"]).sort_values("date").reset_index(drop=True)

    # Tag rows so matches can be traced back; merge_asof keeps only the left "date" column
    gl["gl_row"] = gl.index
    bank["bank_row"] = bank.index
    bank["bank_date"] = bank["date"]

    # Merge on amount + date proximity (one bank row may match more than one GL row)
    merged = pd.merge_asof(
        gl,
        bank,
        on="date",
        by=match_on,
        tolerance=pd.Timedelta(days=date_tolerance_days),
        direction="nearest",
        suffixes=("_gl", "_bank")
    )

    matched = merged.dropna(subset=["bank_row"])
    unmatched_gl = gl[~gl["gl_row"].isin(matched["gl_row"])]
    unmatched_bank = bank[~bank["bank_row"].isin(matched["bank_row"])]

    print(f"✅ Matched: {len(matched)} | ⚠️ Unmatched GL: {len(unmatched_gl)} | Bank: {len(unmatched_bank)}")
    return merged, unmatched_gl, unmatched_bank

Pattern 3: Data Quality Audit

import pandas as pd

def audit_dataset(df, required_cols=None, expected_types=None):
    """
    Run data quality checks. Returns a report dict.
    """
    report = {
        "row_count": len(df),
        "duplicate_rows": int(df.duplicated().sum()),
        "null_summary": df.isnull().sum().to_dict(),
        "issues": []
    }

    if required_cols:
        missing = [c for c in required_cols if c not in df.columns]
        if missing:
            report["issues"].append(f"Missing required columns: {missing}")

    if expected_types:
        for col, dtype in expected_types.items():
            if col in df.columns and not pd.api.types.is_dtype_equal(df[col].dtype, dtype):
                report["issues"].append(f"{col}: expected {dtype}, got {df[col].dtype}")

    # Flag columns with >20% nulls (skipped for an empty frame to avoid dividing by zero)
    if len(df):
        for col, nulls in report["null_summary"].items():
            pct = nulls / len(df) * 100
            if pct > 20:
                report["issues"].append(f"{col}: {pct:.1f}% null; review required")

    return report

# Usage
df = pd.read_csv("data/ar_aging.csv")
report = audit_dataset(
    df,
    required_cols=["customer_id", "invoice_date", "amount", "due_date"],
    expected_types={"amount": "float64", "customer_id": "object"}
)
print(report)

Pattern 4: Scheduled Cron Pipeline

#!/bin/bash
# daily-gl-sync.sh — run via cron or OpenClaw cron tool
# Extracts GL, cleans, loads to SQLite, notifies on error

set -euo pipefail
LOG="logs/gl-sync-$(date +%Y-%m-%d).log"
mkdir -p logs data

echo "[$(date)] Starting GL sync..." | tee -a "$LOG"

if python3 pipelines/gl_extract.py >> "$LOG" 2>&1 && \
   python3 pipelines/gl_clean.py >> "$LOG" 2>&1 && \
   python3 pipelines/gl_load.py >> "$LOG" 2>&1; then
    echo "[$(date)] ✅ GL sync complete" | tee -a "$LOG"
else
    echo "[$(date)] ❌ GL sync FAILED, check $LOG" | tee -a "$LOG"
    exit 1  # non-zero exit so cron or the caller can detect the failure
fi

Pattern 5: Load to SQLite / PostgreSQL

import pandas as pd
import sqlite3

def load_to_sqlite(df, db_path, table_name, if_exists="replace"):
    """
    Load DataFrame to SQLite. Use if_exists='append' for incremental loads.
    """
    conn = sqlite3.connect(db_path)
    try:
        df.to_sql(table_name, conn, if_exists=if_exists, index=False)
    finally:
        conn.close()  # release the file handle even if the write fails
    print(f"✅ Loaded {len(df)} rows → {db_path}::{table_name}")

# PostgreSQL version (requires psycopg2 + sqlalchemy)
from sqlalchemy import create_engine

def load_to_postgres(df, conn_str, table_name, schema="public", if_exists="replace"):
    engine = create_engine(conn_str)
    df.to_sql(table_name, engine, schema=schema, if_exists=if_exists, index=False)
    print(f"✅ Loaded {len(df)} rows → {schema}.{table_name}")

Common Business Pipelines

AR Aging Refresh Pipeline

1. Extract: QBO Invoices API → raw JSON
2. Transform: Calculate days_outstanding, aging_bucket (0-30, 31-60, 61-90, 90+)
3. Enrich: Join with customer contact data
4. Load: Google Sheets "AR Aging" tab + SQLite archive
5. Alert: Flag invoices >60 days for follow-up queue
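
The transform and alert steps above can be sketched in plain Python. The function names are hypothetical, and measuring age from the due date (rather than the invoice date) is an assumption; the steps as written do not say which date anchors the calculation.

```python
from datetime import date

def days_outstanding(due_date: date, as_of: date) -> int:
    """Days past the due date, floored at zero for invoices not yet due."""
    return max((as_of - due_date).days, 0)

def aging_bucket(days: int) -> str:
    """Map a day count onto the buckets 0-30, 31-60, 61-90, 90+."""
    if days <= 30:
        return "0-30"
    if days <= 60:
        return "31-60"
    if days <= 90:
        return "61-90"
    return "90+"

# Illustrative data only
as_of = date(2024, 3, 31)
invoices = [
    {"id": "INV-1", "due_date": date(2024, 3, 15)},
    {"id": "INV-2", "due_date": date(2024, 1, 20)},
    {"id": "INV-3", "due_date": date(2023, 11, 1)},
]
for inv in invoices:
    inv["days_outstanding"] = days_outstanding(inv["due_date"], as_of)
    inv["aging_bucket"] = aging_bucket(inv["days_outstanding"])

# Step 5: anything more than 60 days out goes to the follow-up queue
follow_up = [inv["id"] for inv in invoices if inv["days_outstanding"] > 60]
```

With a DataFrame, the same bucketing could be applied as `df["days_outstanding"].map(aging_bucket)`.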

Bank Feed Reconciliation Pipeline

1. Extract: Bank API (Plaid/CSV export) + QBO GL
2. Transform: Normalize dates, amounts, memo fields
3. Match: Fuzzy join on amount + date (±3 days tolerance)
4. Flag: Unmatched transactions → manual review CSV
5. Load: Reconciliation log → SQLite + email summary
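
The normalization in step 2 decides whether the match in step 3 finds anything, so it helps to pin it down. The helpers below are a sketch under assumptions: the two accepted date formats, the parenthesized-negative convention, and all function names are illustrative and not taken from the skill.

```python
import re
from datetime import datetime
from decimal import Decimal

DATE_FORMATS = ("%Y-%m-%d", "%m/%d/%Y")  # assumed input formats

def normalize_amount(raw: str) -> Decimal:
    """Parse '$1,234.56' as 1234.56 and accounting-style '(45.00)' as -45.00."""
    text = raw.strip()
    negative = text.startswith("(") and text.endswith(")")
    value = Decimal(re.sub(r"[$,()\s]", "", text))
    return -value if negative else value

def normalize_date(raw: str) -> str:
    """Return an ISO date string, trying each accepted format in turn."""
    for fmt in DATE_FORMATS:
        try:
            return datetime.strptime(raw.strip(), fmt).date().isoformat()
        except ValueError:
            continue
    raise ValueError(f"Unrecognized date format: {raw!r}")

def normalize_memo(raw: str) -> str:
    """Collapse whitespace and upper-case so memos compare consistently."""
    return re.sub(r"\s+", " ", raw).strip().upper()
```

Using `Decimal` instead of `float` keeps cent-level amounts exact, which matters when rows are matched on equality of amount.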

Payroll → GL Mapping Pipeline

1. Extract: Payroll system CSV export (Gusto, ADP, etc.)
2. Transform: Map payroll codes → GL account numbers
3. Validate: Totals match payroll register
4. Load: Journal entry template → QBO batch import format
5. Archive: Raw + transformed files in dated folder
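
Steps 2 and 3 above (map, then validate) might look like the following. The payroll codes, GL account numbers, and function names are all hypothetical; a real mapping depends on the client's chart of accounts.

```python
from collections import defaultdict
from decimal import Decimal

# Hypothetical payroll-code to GL-account map
GL_MAP = {"REG": "6000", "OT": "6010", "FICA_ER": "6100"}

def map_payroll_to_gl(rows, gl_map):
    """Sum payroll amounts per GL account; collect rows whose code has no mapping."""
    totals = defaultdict(Decimal)
    unmapped = []
    for row in rows:
        account = gl_map.get(row["code"])
        if account is None:
            unmapped.append(row)
            continue
        totals[account] += Decimal(row["amount"])
    return dict(totals), unmapped

def validate_totals(gl_totals, register_total):
    """True when the mapped GL totals tie out to the payroll register total."""
    return sum(gl_totals.values(), Decimal("0")) == Decimal(register_total)

# Illustrative rows, as they might come from a payroll CSV export
rows = [
    {"code": "REG", "amount": "4200.00"},
    {"code": "OT", "amount": "315.50"},
    {"code": "REG", "amount": "3800.00"},
    {"code": "FICA_ER", "amount": "636.14"},
]
gl_totals, unmapped = map_payroll_to_gl(rows, GL_MAP)
balanced = validate_totals(gl_totals, "8951.64")
```

Unmapped rows are returned rather than silently dropped, so a new payroll code shows up as a validation failure instead of a quiet shortfall.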

Pipeline Design Checklist

Before building any pipeline:

  • Idempotency — Can the pipeline re-run without duplicating data?
  • Error handling — What happens if the API is down? Partial load?
  • Logging — Is every step logged with timestamps?
  • Data quality — Are nulls, duplicates, and type mismatches caught?
  • Reversibility — Can the load be rolled back if something goes wrong?
  • Rate limits — Does the source API have call limits? Add retry logic.
  • Secrets — Are API keys in env vars, not hardcoded?
  • Schedule — How often does this run? Who monitors it?
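
To make the idempotency item concrete, here is one way a load step could be re-run safely: key the table on a natural identifier and replace on conflict. This is a sketch, not the skill's own loader; the `invoices` table, its columns, and `upsert_invoices` are hypothetical.

```python
import sqlite3

def upsert_invoices(conn, rows):
    """Insert rows keyed on invoice_id; a re-run overwrites instead of duplicating."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS invoices ("
        "invoice_id TEXT PRIMARY KEY, amount REAL, invoice_date TEXT)"
    )
    conn.executemany(
        "INSERT OR REPLACE INTO invoices (invoice_id, amount, invoice_date) "
        "VALUES (:invoice_id, :amount, :invoice_date)",
        rows,
    )
    conn.commit()

conn = sqlite3.connect(":memory:")
batch = [
    {"invoice_id": "INV-1", "amount": 120.0, "invoice_date": "2024-03-01"},
    {"invoice_id": "INV-2", "amount": 75.5, "invoice_date": "2024-03-02"},
]
upsert_invoices(conn, batch)
upsert_invoices(conn, batch)  # second run simulates a retry after a failure
row_count = conn.execute("SELECT COUNT(*) FROM invoices").fetchone()[0]
conn.close()
```

By contrast, `df.to_sql(..., if_exists="append")` with no key would double the rows on a re-run.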

Data Cleaning Quick Reference

Problem | Solution
Mixed date formats | pd.to_datetime(col, format="mixed") (pandas 2.0+; infer_datetime_format is deprecated)
Currency strings ("$1,234.56") | col.str.replace(r'[$,]', '', regex=True).astype(float)
Duplicate rows | df.drop_duplicates(subset=['id'])
Null amounts | df['amount'].fillna(0) or df.dropna(subset=['amount'])
Inconsistent casing | df['name'].str.strip().str.title()
Leading/trailing spaces | df.apply(lambda x: x.str.strip() if x.dtype == 'object' else x)
Outlier filtering (keep 1st to 99th percentile) | df[df['amount'].between(df['amount'].quantile(.01), df['amount'].quantile(.99))]

Scheduling with OpenClaw Cron

# Daily GL sync at 6 AM CST
Schedule: cron "0 6 * * *" tz=America/Chicago
Payload: agentTurn — "Run the daily GL sync pipeline in ~/workspace/pipelines/"
Delivery: announce to Telegram on completion or failure

Dependencies

Install Python data stack:

pip install pandas requests sqlalchemy psycopg2-binary openpyxl xlrd
# For Google Sheets
pip install gspread gspread-dataframe google-auth
# For Plaid bank feeds
pip install plaid-python

File Organization

workspace/
  pipelines/
    gl_extract.py
    gl_clean.py
    gl_load.py
    ar_aging.py
    bank_reconcile.py
  data/
    raw/          ← API responses, CSV imports (never edited)
    processed/    ← cleaned, transformed data
    archive/      ← date-stamped historical snapshots
  logs/
    pipeline-YYYY-MM-DD.log
  scripts/
    run-daily-pipelines.sh

Safety Rules

  1. Extract is always read-only. Never write to source systems during extraction.
  2. Archive raw data before any transformation — keep the original.
  3. Validate row counts before and after each transformation step.
  4. Test on sample data (10-100 rows) before running full pipeline.
  5. Client system writes require Irfan approval — QBO, bank APIs, payroll systems are extract-only by default.
  6. Never hardcode credentials — use environment variables or 1Password CLI.
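
Rules 3 and 6 lend themselves to small guard functions. The sketch below is illustrative only: `QBO_ACCESS_TOKEN` is a hypothetical variable name, and the 5% drop threshold is an arbitrary default, not a recommendation from the skill.

```python
import os

def require_env(name, environ=os.environ):
    """Return a non-empty environment variable or raise with a clear message."""
    value = environ.get(name, "")
    if not value:
        raise RuntimeError(f"Missing required environment variable: {name}")
    return value

def build_headers(environ=os.environ):
    """Build request headers from the environment instead of a hardcoded token."""
    # QBO_ACCESS_TOKEN is a hypothetical variable name chosen for this sketch
    token = require_env("QBO_ACCESS_TOKEN", environ)
    return {"Authorization": f"Bearer {token}", "Accept": "application/json"}

def check_row_counts(step, before, after, max_drop_pct=5.0):
    """Raise if a transformation step dropped more rows than the allowed percentage."""
    dropped = before - after
    drop_pct = (dropped / before * 100) if before else 0.0
    if drop_pct > max_drop_pct:
        raise ValueError(f"{step}: dropped {dropped} of {before} rows ({drop_pct:.1f}%)")
    return drop_pct

# A plain dict stands in for os.environ so the example runs anywhere
headers = build_headers({"QBO_ACCESS_TOKEN": "example-token"})
drop_pct = check_row_counts("dedupe", before=200, after=196)
```

Calling `check_row_counts` after each transformation turns an unexpected loss of rows into a hard failure rather than a silently smaller output file.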
Security Recommendations
This skill is an instruction-only ETL template and largely does what its description says, but exercise caution before using it in production:

  1. Don't paste real API keys, DB passwords, or service account tokens into chat. Use a secrets manager, or supply read-only, scoped credentials only when you run the code locally.
  2. Inspect the full SKILL.md (the provided file appears truncated and contains a broken shell line like "mkdir -") and any scripts it would run; fix and understand shell commands before executing them.
  3. Test with dummy or sandbox accounts and read-only database replicas.
  4. Ask the publisher for provenance (homepage, source repo, maintainer contact). The absence of a homepage and the unknown source lower trust.
  5. If you plan to let the agent run autonomously, limit its network and credential access and prefer ephemeral, least-privilege tokens.

If you need help reviewing the missing/truncated parts or converting examples into a secure runbook, provide the full SKILL.md and intended runtime environment and I can help.
Functional Analysis
Type: OpenClaw Skill
Name: data-pipeline-agent
Version: 98.0.1

The skill bundle provides legitimate ETL (Extract, Transform, Load) templates and instructions for an AI agent to automate data pipelines. The Python and Bash snippets in SKILL.md follow standard data engineering practices, including pagination handling, data validation, and the use of environment variables for secrets. While the _meta.json contains a future timestamp (2026) and an unusually high version number (98.0.1), the actual content is transparent, lacks obfuscation, and includes explicit safety rules regarding read-only access and credential management.
Capability Assessment
Purpose & Capability
Name/description match the SKILL.md content: the instructions are ETL templates for API extraction, cleaning, merging, auditing, and scheduled runs. However, the skill claims integrations with many external services (QuickBooks, Stripe, Salesforce, Google Sheets, warehouses) yet declares no required environment variables, credentials, or authentication guidance. That is a proportionality/metadata gap: a legitimate ETL skill normally documents how to provide API keys, OAuth tokens, or DB connection strings.
Instruction Scope
SKILL.md contains concrete example code (requests, pandas, CSV reads/writes, cron/bash) which stay within ETL scope. The examples reference local paths (data/*.csv, logs/) and API calls with Authorization headers — no explicit instructions to read unrelated system secrets or arbitrary host files are present. However the document is truncated and contains an obviously broken/partial shell command ("mkdir -") and ellipses, indicating the shipped instructions are incomplete/sloppy. The examples also implicitly require secret tokens but do not instruct secure handling, increasing risk that users will paste secrets into chat or code.
Install Mechanism
Instruction-only skill with no install spec and no code files — the lowest-risk install profile. Nothing is written to disk by an installer and there are no obscure download URLs or package installs baked into the skill itself.
Credentials
The skill declares no required env vars or primary credential despite explicitly showing usage patterns that require API tokens and DB credentials. This is a mismatch: the skill will in practice need secrets to access third-party APIs and warehouses. Because the skill lacks guidance on credential handling, an inexperienced user might share keys insecurely (e.g., in chat) or grant overly broad credentials. No config-path requirements are listed, but absence of declared secrets is surprising given the stated integrations.
Persistence & Privilege
always is false and there is no install-time persistence. Model invocation is allowed (the platform default) which means the agent could call the skill autonomously, but that is expected and not by itself a red flag here. The skill does not request modification of other skills or global agent settings.
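How to Use
  1. Make sure OpenClaw is installed (locally or via Docker)
  2. Enter the install command in the chat box: /install data-pipeline-agent
  3. Once installed, invoke the Skill by name or trigger it with /data-pipeline-agent
  4. Provide the required inputs described in the Skill's parameter documentation to get structured output
Version History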
v98.0.1
Corrected display name
v98.0.0
probe
v1.0.0
Initial release
Metadata
Slug data-pipeline-agent
Version 98.0.1
License MIT-0
Total installs 1
Current installs 1
Version count 3
FAQ

What is Data Pipeline Agent?

ETL pipeline builder for business data — API extraction, data cleaning, transformation, and warehouse loading. Use when you need to move data between systems... It is an AI Agent Skill plugin for Claude Code / OpenClaw, with 279 total downloads to date.

How do I install Data Pipeline Agent?

Run the command "/install data-pipeline-agent" in the OpenClaw or Claude Code chat box to install it in one step; no additional configuration is required.

Is Data Pipeline Agent free?

Yes. Data Pipeline Agent is completely free under the MIT-0 license and may be freely downloaded, installed, and used.

Which platforms does Data Pipeline Agent support?

Data Pipeline Agent is cross-platform and can be used in any environment where OpenClaw / Claude Code is deployed.

Who developed Data Pipeline Agent?

It is developed and maintained by samledger67-dotcom (@samledger67-dotcom); the current version is v98.0.1.
