← Back to Skills Marketplace
alvisdunlop

data-lineage-tracker

by AlvisDunlop · GitHub ↗ · v1.0.0 · MIT-0
darwinlinuxwin32 ⚠ suspicious
78
Downloads
0
Stars
0
Active Installs
1
Versions
Install in OpenClaw
/install abe-data-lineage-tracker
Description
Track data origin, transformations, and flow through construction systems. Essential for audit trails, compliance, and debugging data issues.
README (SKILL.md)

Data Lineage Tracker for Construction

Overview

Track the origin, transformations, and flow of construction data through systems. Provides audit trails for compliance, helps debug data issues, and ensures data governance.

AI-powered analysis is routed through SkillBoss API Hub (https://api.heybossai.com/v1/pilot) — no separate AI provider keys required.

Business Case

Construction projects require data accountability:

  • Audit Compliance: Know where every number came from
  • Issue Resolution: Trace data problems to their source
  • Change Impact: Understand what downstream systems are affected
  • Regulatory Requirements: Maintain data provenance for legal/insurance

Technical Implementation

import requests, os
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional, Set
from datetime import datetime
from enum import Enum
import json
import hashlib
import uuid

SKILLBOSS_API_KEY = os.environ["SKILLBOSS_API_KEY"]
API_BASE = "https://api.heybossai.com/v1"

def pilot(body: dict) -> dict:
    r = requests.post(
        f"{API_BASE}/pilot",
        headers={"Authorization": f"Bearer {SKILLBOSS_API_KEY}", "Content-Type": "application/json"},
        json=body,
        timeout=60,
    )
    return r.json()

class TransformationType(Enum):
    EXTRACT = "extract"
    TRANSFORM = "transform"
    LOAD = "load"
    AGGREGATE = "aggregate"
    JOIN = "join"
    FILTER = "filter"
    CALCULATE = "calculate"
    MANUAL_EDIT = "manual_edit"
    IMPORT = "import"
    EXPORT = "export"

@dataclass
class DataSource:
    id: str
    name: str
    system: str
    location: str
    owner: str
    created_at: datetime

@dataclass
class TransformationStep:
    id: str
    transformation_type: TransformationType
    description: str
    input_entities: List[str]
    output_entities: List[str]
    logic: str  # SQL, Python, or description
    performed_by: str  # user or system
    performed_at: datetime
    parameters: Dict[str, Any] = field(default_factory=dict)

@dataclass
class DataEntity:
    id: str
    name: str
    source_id: str
    entity_type: str  # table, file, field, record
    created_at: datetime
    version: int = 1
    checksum: Optional[str] = None
    parent_entities: List[str] = field(default_factory=list)
    metadata: Dict[str, Any] = field(default_factory=dict)

@dataclass
class LineageRecord:
    id: str
    entity_id: str
    transformation_id: str
    upstream_entities: List[str]
    downstream_entities: List[str]
    recorded_at: datetime

class ConstructionDataLineageTracker:
    """Track data lineage for construction data flows."""

    def __init__(self, project_id: str):
        self.project_id = project_id
        self.sources: Dict[str, DataSource] = {}
        self.entities: Dict[str, DataEntity] = {}
        self.transformations: Dict[str, TransformationStep] = {}
        self.lineage_records: List[LineageRecord] = []

    def register_source(self, name: str, system: str, location: str, owner: str) -> DataSource:
        """Register a new data source."""
        source = DataSource(
            id=f"SRC-{uuid.uuid4().hex[:8]}",
            name=name,
            system=system,
            location=location,
            owner=owner,
            created_at=datetime.now()
        )
        self.sources[source.id] = source
        return source

    def register_entity(self, name: str, source_id: str, entity_type: str,
                       parent_entities: List[str] = None,
                       metadata: Dict = None) -> DataEntity:
        """Register a data entity (table, file, field)."""
        entity = DataEntity(
            id=f"ENT-{uuid.uuid4().hex[:8]}",
            name=name,
            source_id=source_id,
            entity_type=entity_type,
            created_at=datetime.now(),
            parent_entities=parent_entities or [],
            metadata=metadata or {}
        )
        self.entities[entity.id] = entity
        return entity

    def calculate_checksum(self, data: Any) -> str:
        """Calculate checksum for data verification."""
        if isinstance(data, str):
            content = data
        else:
            content = json.dumps(data, sort_keys=True, default=str)
        return hashlib.sha256(content.encode()).hexdigest()[:16]

    def record_transformation(self,
                             transformation_type: TransformationType,
                             description: str,
                             input_entities: List[str],
                             output_entities: List[str],
                             logic: str,
                             performed_by: str,
                             parameters: Dict = None) -> TransformationStep:
        """Record a data transformation."""
        transformation = TransformationStep(
            id=f"TRF-{uuid.uuid4().hex[:8]}",
            transformation_type=transformation_type,
            description=description,
            input_entities=input_entities,
            output_entities=output_entities,
            logic=logic,
            performed_by=performed_by,
            performed_at=datetime.now(),
            parameters=parameters or {}
        )
        self.transformations[transformation.id] = transformation

        # Create lineage records
        for output_id in output_entities:
            record = LineageRecord(
                id=f"LIN-{uuid.uuid4().hex[:8]}",
                entity_id=output_id,
                transformation_id=transformation.id,
                upstream_entities=input_entities,
                downstream_entities=[],
                recorded_at=datetime.now()
            )
            self.lineage_records.append(record)

            # Update downstream references for input entities
            for input_id in input_entities:
                for existing_record in self.lineage_records:
                    if existing_record.entity_id == input_id:
                        existing_record.downstream_entities.append(output_id)

        return transformation

    def trace_upstream(self, entity_id: str, depth: int = None) -> List[Dict]:
        """Trace all upstream sources of an entity."""
        visited = set()
        lineage = []

        def trace(eid: str, current_depth: int):
            if eid in visited:
                return
            if depth is not None and current_depth > depth:
                return

            visited.add(eid)

            entity = self.entities.get(eid)
            if not entity:
                return

            # Find transformations that produced this entity
            for record in self.lineage_records:
                if record.entity_id == eid:
                    transformation = self.transformations.get(record.transformation_id)
                    if transformation:
                        lineage.append({
                            'entity': entity.name,
                            'entity_id': eid,
                            'depth': current_depth,
                            'transformation': transformation.description,
                            'transformation_type': transformation.transformation_type.value,
                            'performed_at': transformation.performed_at.isoformat(),
                            'performed_by': transformation.performed_by,
                            'upstream': record.upstream_entities
                        })

                        for upstream_id in record.upstream_entities:
                            trace(upstream_id, current_depth + 1)

        trace(entity_id, 0)
        return sorted(lineage, key=lambda x: x['depth'])

    def trace_downstream(self, entity_id: str, depth: int = None) -> List[Dict]:
        """Trace all downstream dependencies of an entity."""
        visited = set()
        dependencies = []

        def trace(eid: str, current_depth: int):
            if eid in visited:
                return
            if depth is not None and current_depth > depth:
                return

            visited.add(eid)

            entity = self.entities.get(eid)
            if not entity:
                return

            # Find entities that use this entity
            for record in self.lineage_records:
                if eid in record.upstream_entities:
                    transformation = self.transformations.get(record.transformation_id)
                    if transformation:
                        dependencies.append({
                            'entity': self.entities[record.entity_id].name if record.entity_id in self.entities else record.entity_id,
                            'entity_id': record.entity_id,
                            'depth': current_depth,
                            'transformation': transformation.description,
                            'transformation_type': transformation.transformation_type.value
                        })

                        trace(record.entity_id, current_depth + 1)

        trace(entity_id, 0)
        return sorted(dependencies, key=lambda x: x['depth'])

    def get_entity_history(self, entity_id: str) -> List[Dict]:
        """Get complete history of changes to an entity."""
        history = []

        for record in self.lineage_records:
            if record.entity_id == entity_id:
                transformation = self.transformations.get(record.transformation_id)
                if transformation:
                    history.append({
                        'timestamp': transformation.performed_at.isoformat(),
                        'action': transformation.transformation_type.value,
                        'description': transformation.description,
                        'performed_by': transformation.performed_by,
                        'inputs': [
                            self.entities[eid].name if eid in self.entities else eid
                            for eid in record.upstream_entities
                        ]
                    })

        return sorted(history, key=lambda x: x['timestamp'])

    def impact_analysis(self, entity_id: str) -> Dict:
        """Analyze impact of changes to an entity."""
        downstream = self.trace_downstream(entity_id)

        impact = {
            'entity': self.entities[entity_id].name if entity_id in self.entities else entity_id,
            'total_affected': len(downstream),
            'affected_by_depth': {},
            'affected_entities': downstream
        }

        for dep in downstream:
            depth = dep['depth']
            impact['affected_by_depth'][depth] = impact['affected_by_depth'].get(depth, 0) + 1

        return impact

    def validate_lineage(self) -> List[str]:
        """Validate lineage for completeness and consistency."""
        issues = []

        # Check for orphan entities (no source or transformation)
        for eid, entity in self.entities.items():
            has_lineage = any(r.entity_id == eid for r in self.lineage_records)
            if not has_lineage and entity.entity_type != 'source':
                issues.append(f"Entity '{entity.name}' has no lineage record")

        # Check for broken references
        all_entity_ids = set(self.entities.keys())
        for record in self.lineage_records:
            for upstream_id in record.upstream_entities:
                if upstream_id not in all_entity_ids:
                    issues.append(f"Lineage references unknown entity: {upstream_id}")

        # Check for circular dependencies
        for eid in self.entities:
            upstream = set()
            to_check = [eid]
            while to_check:
                current = to_check.pop()
                if current in upstream:
                    issues.append(f"Circular dependency detected involving entity: {self.entities[eid].name}")
                    break
                upstream.add(current)
                for record in self.lineage_records:
                    if record.entity_id == current:
                        to_check.extend(record.upstream_entities)

        return issues

    def generate_lineage_graph(self, entity_id: str) -> str:
        """Generate Mermaid diagram of lineage."""
        lines = ["```mermaid", "graph LR"]

        upstream = self.trace_upstream(entity_id, depth=5)
        downstream = self.trace_downstream(entity_id, depth=5)

        # Add nodes
        added_nodes = set()
        for item in upstream + downstream:
            node_id = item['entity_id'].replace('-', '_')
            if node_id not in added_nodes:
                entity = self.entities.get(item['entity_id'])
                name = entity.name if entity else item['entity_id']
                lines.append(f"    {node_id}[{name}]")
                added_nodes.add(node_id)

        # Add target node
        target_node = entity_id.replace('-', '_')
        if target_node not in added_nodes:
            entity = self.entities.get(entity_id)
            name = entity.name if entity else entity_id
            lines.append(f"    {target_node}[{name}]:::target")

        # Add edges
        for item in upstream:
            for upstream_id in item.get('upstream', []):
                from_node = upstream_id.replace('-', '_')
                to_node = item['entity_id'].replace('-', '_')
                lines.append(f"    {from_node} --> {to_node}")

        for item in downstream:
            from_node = entity_id.replace('-', '_')
            to_node = item['entity_id'].replace('-', '_')
            if to_node != from_node:
                lines.append(f"    {from_node} --> {to_node}")

        lines.append("    classDef target fill:#f96")
        lines.append("```")

        return "\
".join(lines)

    def ai_analyze_lineage(self, context: str) -> str:
        """Use SkillBoss API Hub to generate AI analysis of lineage data."""
        result = pilot({
            "type": "chat",
            "inputs": {
                "messages": [{"role": "user", "content": context}]
            },
            "prefer": "balanced"
        })
        return result["result"]["choices"][0]["message"]["content"]

    def export_lineage(self) -> Dict:
        """Export complete lineage data."""
        return {
            'project_id': self.project_id,
            'exported_at': datetime.now().isoformat(),
            'sources': {k: {
                'id': v.id,
                'name': v.name,
                'system': v.system,
                'location': v.location,
                'owner': v.owner
            } for k, v in self.sources.items()},
            'entities': {k: {
                'id': v.id,
                'name': v.name,
                'source_id': v.source_id,
                'entity_type': v.entity_type,
                'parent_entities': v.parent_entities
            } for k, v in self.entities.items()},
            'transformations': {k: {
                'id': v.id,
                'type': v.transformation_type.value,
                'description': v.description,
                'input_entities': v.input_entities,
                'output_entities': v.output_entities,
                'performed_by': v.performed_by,
                'performed_at': v.performed_at.isoformat()
            } for k, v in self.transformations.items()},
            'lineage_records': [{
                'id': r.id,
                'entity_id': r.entity_id,
                'transformation_id': r.transformation_id,
                'upstream_entities': r.upstream_entities
            } for r in self.lineage_records]
        }

    def generate_report(self) -> str:
        """Generate lineage report."""
        lines = [f"# Data Lineage Report: {self.project_id}", ""]
        lines.append(f"**Generated:** {datetime.now().strftime('%Y-%m-%d %H:%M')}")
        lines.append(f"**Sources:** {len(self.sources)}")
        lines.append(f"**Entities:** {len(self.entities)}")
        lines.append(f"**Transformations:** {len(self.transformations)}")
        lines.append("")

        # Sources
        lines.append("## Data Sources")
        for source in self.sources.values():
            lines.append(f"- **{source.name}** ({source.system})")
            lines.append(f"  - Location: {source.location}")
            lines.append(f"  - Owner: {source.owner}")
        lines.append("")

        # Validation
        issues = self.validate_lineage()
        if issues:
            lines.append("## Lineage Issues")
            for issue in issues:
                lines.append(f"- {issue}")
            lines.append("")

        # Transformation summary
        lines.append("## Transformation Summary")
        type_counts = {}
        for t in self.transformations.values():
            type_counts[t.transformation_type.value] = type_counts.get(t.transformation_type.value, 0) + 1
        for t_type, count in sorted(type_counts.items()):
            lines.append(f"- {t_type}: {count}")

        return "\
".join(lines)

Quick Start

import os, requests

SKILLBOSS_API_KEY = os.environ["SKILLBOSS_API_KEY"]
API_BASE = "https://api.heybossai.com/v1"

def pilot(body: dict) -> dict:
    r = requests.post(
        f"{API_BASE}/pilot",
        headers={"Authorization": f"Bearer {SKILLBOSS_API_KEY}", "Content-Type": "application/json"},
        json=body,
        timeout=60,
    )
    return r.json()

# Initialize tracker
tracker = ConstructionDataLineageTracker("PROJECT-001")

# Register sources
procore = tracker.register_source("Procore", "SaaS", "cloud", "PM Team")
sage = tracker.register_source("Sage 300", "Database", "on-prem", "Finance")

# Register entities
budget = tracker.register_entity("Project Budget", procore.id, "table")
costs = tracker.register_entity("Job Costs", sage.id, "table")
report = tracker.register_entity("Cost Variance Report", procore.id, "file")

# Record transformation
tracker.record_transformation(
    transformation_type=TransformationType.JOIN,
    description="Join budget and actual costs for variance calculation",
    input_entities=[budget.id, costs.id],
    output_entities=[report.id],
    logic="SELECT b.*, c.actual, (b.budget - c.actual) as variance FROM budget b JOIN costs c ON b.cost_code = c.cost_code",
    performed_by="ETL Pipeline"
)

# Trace lineage
upstream = tracker.trace_upstream(report.id)
print("Upstream lineage:", upstream)

# Generate graph
print(tracker.generate_lineage_graph(report.id))

# AI analysis via SkillBoss API Hub
lineage_data = tracker.export_lineage()
analysis_prompt = f"Analyze this construction data lineage and identify potential data quality risks:\
{lineage_data}"
result = pilot({
    "type": "chat",
    "inputs": {
        "messages": [{"role": "user", "content": analysis_prompt}]
    },
    "prefer": "balanced"
})
analysis = result["result"]["choices"][0]["message"]["content"]
print(analysis)

Resources

  • Data Governance: DAMA DMBOK lineage guidelines
  • Audit Requirements: SOX, ISO compliance
  • AI Integration: SkillBoss API Hub (https://api.heybossai.com/v1/pilot) — unified routing for all AI analysis
Usage Guidance
Before installing, verify and ask the publisher to resolve the metadata inconsistencies (registry lists no env vars, but SKILL.md requires SKILLBOSS_API_KEY). Understand that the skill will (a) read user-provided files (claw.json requests filesystem permission) and (b) send analysis payloads to https://api.heybossai.com using SKILLBOSS_API_KEY. Do not use this skill with sensitive or regulated data until you confirm the SkillBoss service's data handling, retention, and ownership policies and have control over the SKILLBOSS_API_KEY. Prefer that the publisher explicitly document the required env vars, what exact data is transmitted, and provide a privacy/security statement or option to run analysis entirely locally if needed.
Capability Analysis
Type: OpenClaw Skill Name: abe-data-lineage-tracker Version: 1.0.0 The skill provides a legitimate and well-documented implementation of a data lineage tracker for construction projects. The Python code in SKILL.md defines structured classes for managing data sources, entities, and transformations, including features like Mermaid graph generation and impact analysis. While the skill requests filesystem permissions and communicates with an external API (api.heybossai.com), these capabilities are explicitly aligned with its stated purpose of processing data files and providing AI-powered analysis. No indicators of data exfiltration, credential theft, or malicious prompt injection were identified.
Capability Tags
requires-sensitive-credentials
Capability Assessment
Purpose & Capability
The skill's stated purpose (tracking data lineage) aligns with the included Python code and the need to read project files and perform analysis. However, SKILL.md declares a required SKILLBOSS_API_KEY for calling https://api.heybossai.com/v1/pilot, while the registry metadata at the top of the package claims no required env vars. The presence of a SkillBoss model in claw.json is consistent with the code, but the public metadata omission is an incoherence.
Instruction Scope
Runtime instructions and code explicitly post analysis payloads to an external API (api.heybossai.com) using SKILLBOSS_API_KEY. The skill also expects to accept file paths and has filesystem permission in claw.json, meaning user-provided files (potentially containing sensitive data) may be read locally and transmitted externally. This behavior is consistent with the described AI-assisted analysis but is not clearly disclosed in the top-level registry metadata and has privacy implications.
Install Mechanism
There is no install spec and no code files to execute beyond instructions embedded in SKILL.md; this instruction-only design means nothing new is downloaded at install time (low install risk).
Credentials
SKILL.md requires a single secret env var (SKILLBOSS_API_KEY) used to authenticate to an external API — reasonable for an AI-hosted analysis path but inconsistent with the package metadata (which lists no required env vars). A required secret that is not declared at the registry level is a transparency issue and increases risk because the user may not realize an external credential is needed or that their data will be sent off-host.
Persistence & Privilege
The skill does not request always:true and is user-invocable (normal). claw.json requests filesystem permission which is appropriate for reading user-provided files but means the skill can access local files the user points it at; it does not appear to modify other skills or system-wide settings.
How to Use
  1. Make sure OpenClaw is installed (local or Docker)
  2. Run the install command in chat: /install abe-data-lineage-tracker
  3. After installation, invoke the skill by name or use /abe-data-lineage-tracker
  4. Provide required inputs per the skill's parameter spec and get structured output
Version History
v1.0.0
Initial release of data-lineage-tracker. - Enables tracking of data origin, transformations, and flow within construction systems. - Supports entity registration, transformation logging, and lineage tracing for audit trails and compliance. - Integrates with SkillBoss API Hub for AI-powered analysis, requiring only a single API key. - Provides tools for debugging data issues and ensuring data governance in construction projects.
Metadata
Slug abe-data-lineage-tracker
Version 1.0.0
License MIT-0
All-time Installs 0
Active Installs 0
Total Versions 1
Frequently Asked Questions

What is data-lineage-tracker?

Track data origin, transformations, and flow through construction systems. Essential for audit trails, compliance, and debugging data issues. It is an AI Agent Skill for Claude Code / OpenClaw, with 78 downloads so far.

How do I install data-lineage-tracker?

Run "/install abe-data-lineage-tracker" in the OpenClaw or Claude Code chat to install it in one step — no extra setup required.

Is data-lineage-tracker free?

Yes, data-lineage-tracker is completely free, licensed under MIT-0. You can download, install and use it at no cost.

Which platforms does data-lineage-tracker support?

data-lineage-tracker is cross-platform and runs anywhere OpenClaw / Claude Code is available (darwin, linux, win32).

Who created data-lineage-tracker?

It is built and maintained by AlvisDunlop (@alvisdunlop); the current version is v1.0.0.

💬 Comments