← 返回 Skills 市场
datadrivenconstruction

Data Lineage Tracker

作者 datadrivenconstruction · GitHub ↗ · v2.1.0
darwinlinuxwin32 ⚠ suspicious
3341
总下载
0
收藏
12
当前安装
2
版本数
在 OpenClaw 中安装
/install data-lineage-tracker
功能描述
Track data origin, transformations, and flow through construction systems. Essential for audit trails, compliance, and debugging data issues.
使用说明 (SKILL.md)

\r

Data Lineage Tracker for Construction\r

\r

Overview\r

\r Track the origin, transformations, and flow of construction data through systems. Provides audit trails for compliance, helps debug data issues, and ensures data governance.\r \r

Business Case\r

\r Construction projects require data accountability:\r

  • Audit Compliance: Know where every number came from\r
  • Issue Resolution: Trace data problems to their source\r
  • Change Impact: Understand what downstream systems are affected\r
  • Regulatory Requirements: Maintain data provenance for legal/insurance\r \r

Technical Implementation\r

\r

from dataclasses import dataclass, field\r
from typing import List, Dict, Any, Optional, Set\r
from datetime import datetime\r
from enum import Enum\r
import json\r
import hashlib\r
import uuid\r
\r
class TransformationType(Enum):\r
    EXTRACT = "extract"\r
    TRANSFORM = "transform"\r
    LOAD = "load"\r
    AGGREGATE = "aggregate"\r
    JOIN = "join"\r
    FILTER = "filter"\r
    CALCULATE = "calculate"\r
    MANUAL_EDIT = "manual_edit"\r
    IMPORT = "import"\r
    EXPORT = "export"\r
\r
@dataclass\r
class DataSource:\r
    id: str\r
    name: str\r
    system: str\r
    location: str\r
    owner: str\r
    created_at: datetime\r
\r
@dataclass\r
class TransformationStep:\r
    id: str\r
    transformation_type: TransformationType\r
    description: str\r
    input_entities: List[str]\r
    output_entities: List[str]\r
    logic: str  # SQL, Python, or description\r
    performed_by: str  # user or system\r
    performed_at: datetime\r
    parameters: Dict[str, Any] = field(default_factory=dict)\r
\r
@dataclass\r
class DataEntity:\r
    id: str\r
    name: str\r
    source_id: str\r
    entity_type: str  # table, file, field, record\r
    created_at: datetime\r
    version: int = 1\r
    checksum: Optional[str] = None\r
    parent_entities: List[str] = field(default_factory=list)\r
    metadata: Dict[str, Any] = field(default_factory=dict)\r
\r
@dataclass\r
class LineageRecord:\r
    id: str\r
    entity_id: str\r
    transformation_id: str\r
    upstream_entities: List[str]\r
    downstream_entities: List[str]\r
    recorded_at: datetime\r
\r
class ConstructionDataLineageTracker:\r
    """Track data lineage for construction data flows."""\r
\r
    def __init__(self, project_id: str):\r
        self.project_id = project_id\r
        self.sources: Dict[str, DataSource] = {}\r
        self.entities: Dict[str, DataEntity] = {}\r
        self.transformations: Dict[str, TransformationStep] = {}\r
        self.lineage_records: List[LineageRecord] = []\r
\r
    def register_source(self, name: str, system: str, location: str, owner: str) -> DataSource:\r
        """Register a new data source."""\r
        source = DataSource(\r
            id=f"SRC-{uuid.uuid4().hex[:8]}",\r
            name=name,\r
            system=system,\r
            location=location,\r
            owner=owner,\r
            created_at=datetime.now()\r
        )\r
        self.sources[source.id] = source\r
        return source\r
\r
    def register_entity(self, name: str, source_id: str, entity_type: str,\r
                       parent_entities: List[str] = None,\r
                       metadata: Dict = None) -> DataEntity:\r
        """Register a data entity (table, file, field)."""\r
        entity = DataEntity(\r
            id=f"ENT-{uuid.uuid4().hex[:8]}",\r
            name=name,\r
            source_id=source_id,\r
            entity_type=entity_type,\r
            created_at=datetime.now(),\r
            parent_entities=parent_entities or [],\r
            metadata=metadata or {}\r
        )\r
        self.entities[entity.id] = entity\r
        return entity\r
\r
    def calculate_checksum(self, data: Any) -> str:\r
        """Calculate checksum for data verification."""\r
        if isinstance(data, str):\r
            content = data\r
        else:\r
            content = json.dumps(data, sort_keys=True, default=str)\r
        return hashlib.sha256(content.encode()).hexdigest()[:16]\r
\r
    def record_transformation(self,\r
                             transformation_type: TransformationType,\r
                             description: str,\r
                             input_entities: List[str],\r
                             output_entities: List[str],\r
                             logic: str,\r
                             performed_by: str,\r
                             parameters: Dict = None) -> TransformationStep:\r
        """Record a data transformation."""\r
        transformation = TransformationStep(\r
            id=f"TRF-{uuid.uuid4().hex[:8]}",\r
            transformation_type=transformation_type,\r
            description=description,\r
            input_entities=input_entities,\r
            output_entities=output_entities,\r
            logic=logic,\r
            performed_by=performed_by,\r
            performed_at=datetime.now(),\r
            parameters=parameters or {}\r
        )\r
        self.transformations[transformation.id] = transformation\r
\r
        # Create lineage records\r
        for output_id in output_entities:\r
            record = LineageRecord(\r
                id=f"LIN-{uuid.uuid4().hex[:8]}",\r
                entity_id=output_id,\r
                transformation_id=transformation.id,\r
                upstream_entities=input_entities,\r
                downstream_entities=[],\r
                recorded_at=datetime.now()\r
            )\r
            self.lineage_records.append(record)\r
\r
            # Update downstream references for input entities\r
            for input_id in input_entities:\r
                for existing_record in self.lineage_records:\r
                    if existing_record.entity_id == input_id:\r
                        existing_record.downstream_entities.append(output_id)\r
\r
        return transformation\r
\r
    def trace_upstream(self, entity_id: str, depth: int = None) -> List[Dict]:\r
        """Trace all upstream sources of an entity."""\r
        visited = set()\r
        lineage = []\r
\r
        def trace(eid: str, current_depth: int):\r
            if eid in visited:\r
                return\r
            if depth is not None and current_depth > depth:\r
                return\r
\r
            visited.add(eid)\r
\r
            entity = self.entities.get(eid)\r
            if not entity:\r
                return\r
\r
            # Find transformations that produced this entity\r
            for record in self.lineage_records:\r
                if record.entity_id == eid:\r
                    transformation = self.transformations.get(record.transformation_id)\r
                    if transformation:\r
                        lineage.append({\r
                            'entity': entity.name,\r
                            'entity_id': eid,\r
                            'depth': current_depth,\r
                            'transformation': transformation.description,\r
                            'transformation_type': transformation.transformation_type.value,\r
                            'performed_at': transformation.performed_at.isoformat(),\r
                            'performed_by': transformation.performed_by,\r
                            'upstream': record.upstream_entities\r
                        })\r
\r
                        for upstream_id in record.upstream_entities:\r
                            trace(upstream_id, current_depth + 1)\r
\r
        trace(entity_id, 0)\r
        return sorted(lineage, key=lambda x: x['depth'])\r
\r
    def trace_downstream(self, entity_id: str, depth: int = None) -> List[Dict]:\r
        """Trace all downstream dependencies of an entity."""\r
        visited = set()\r
        dependencies = []\r
\r
        def trace(eid: str, current_depth: int):\r
            if eid in visited:\r
                return\r
            if depth is not None and current_depth > depth:\r
                return\r
\r
            visited.add(eid)\r
\r
            entity = self.entities.get(eid)\r
            if not entity:\r
                return\r
\r
            # Find entities that use this entity\r
            for record in self.lineage_records:\r
                if eid in record.upstream_entities:\r
                    transformation = self.transformations.get(record.transformation_id)\r
                    if transformation:\r
                        dependencies.append({\r
                            'entity': self.entities[record.entity_id].name if record.entity_id in self.entities else record.entity_id,\r
                            'entity_id': record.entity_id,\r
                            'depth': current_depth,\r
                            'transformation': transformation.description,\r
                            'transformation_type': transformation.transformation_type.value\r
                        })\r
\r
                        trace(record.entity_id, current_depth + 1)\r
\r
        trace(entity_id, 0)\r
        return sorted(dependencies, key=lambda x: x['depth'])\r
\r
    def get_entity_history(self, entity_id: str) -> List[Dict]:\r
        """Get complete history of changes to an entity."""\r
        history = []\r
\r
        for record in self.lineage_records:\r
            if record.entity_id == entity_id:\r
                transformation = self.transformations.get(record.transformation_id)\r
                if transformation:\r
                    history.append({\r
                        'timestamp': transformation.performed_at.isoformat(),\r
                        'action': transformation.transformation_type.value,\r
                        'description': transformation.description,\r
                        'performed_by': transformation.performed_by,\r
                        'inputs': [\r
                            self.entities[eid].name if eid in self.entities else eid\r
                            for eid in record.upstream_entities\r
                        ]\r
                    })\r
\r
        return sorted(history, key=lambda x: x['timestamp'])\r
\r
    def impact_analysis(self, entity_id: str) -> Dict:\r
        """Analyze impact of changes to an entity."""\r
        downstream = self.trace_downstream(entity_id)\r
\r
        impact = {\r
            'entity': self.entities[entity_id].name if entity_id in self.entities else entity_id,\r
            'total_affected': len(downstream),\r
            'affected_by_depth': {},\r
            'affected_entities': downstream\r
        }\r
\r
        for dep in downstream:\r
            depth = dep['depth']\r
            impact['affected_by_depth'][depth] = impact['affected_by_depth'].get(depth, 0) + 1\r
\r
        return impact\r
\r
    def validate_lineage(self) -> List[str]:\r
        """Validate lineage for completeness and consistency."""\r
        issues = []\r
\r
        # Check for orphan entities (no source or transformation)\r
        for eid, entity in self.entities.items():\r
            has_lineage = any(r.entity_id == eid for r in self.lineage_records)\r
            if not has_lineage and entity.entity_type != 'source':\r
                issues.append(f"Entity '{entity.name}' has no lineage record")\r
\r
        # Check for broken references\r
        all_entity_ids = set(self.entities.keys())\r
        for record in self.lineage_records:\r
            for upstream_id in record.upstream_entities:\r
                if upstream_id not in all_entity_ids:\r
                    issues.append(f"Lineage references unknown entity: {upstream_id}")\r
\r
        # Check for circular dependencies\r
        for eid in self.entities:\r
            upstream = set()\r
            to_check = [eid]\r
            while to_check:\r
                current = to_check.pop()\r
                if current in upstream:\r
                    issues.append(f"Circular dependency detected involving entity: {self.entities[eid].name}")\r
                    break\r
                upstream.add(current)\r
                for record in self.lineage_records:\r
                    if record.entity_id == current:\r
                        to_check.extend(record.upstream_entities)\r
\r
        return issues\r
\r
    def generate_lineage_graph(self, entity_id: str) -> str:\r
        """Generate Mermaid diagram of lineage."""\r
        lines = ["```mermaid", "graph LR"]\r
\r
        upstream = self.trace_upstream(entity_id, depth=5)\r
        downstream = self.trace_downstream(entity_id, depth=5)\r
\r
        # Add nodes\r
        added_nodes = set()\r
        for item in upstream + downstream:\r
            node_id = item['entity_id'].replace('-', '_')\r
            if node_id not in added_nodes:\r
                entity = self.entities.get(item['entity_id'])\r
                name = entity.name if entity else item['entity_id']\r
                lines.append(f"    {node_id}[{name}]")\r
                added_nodes.add(node_id)\r
\r
        # Add target node\r
        target_node = entity_id.replace('-', '_')\r
        if target_node not in added_nodes:\r
            entity = self.entities.get(entity_id)\r
            name = entity.name if entity else entity_id\r
            lines.append(f"    {target_node}[{name}]:::target")\r
\r
        # Add edges\r
        for item in upstream:\r
            for upstream_id in item.get('upstream', []):\r
                from_node = upstream_id.replace('-', '_')\r
                to_node = item['entity_id'].replace('-', '_')\r
                lines.append(f"    {from_node} --> {to_node}")\r
\r
        for item in downstream:\r
            from_node = entity_id.replace('-', '_')\r
            to_node = item['entity_id'].replace('-', '_')\r
            if to_node != from_node:\r
                lines.append(f"    {from_node} --> {to_node}")\r
\r
        lines.append("    classDef target fill:#f96")\r
        lines.append("```")\r
\r
        return "\
".join(lines)\r
\r
    def export_lineage(self) -> Dict:\r
        """Export complete lineage data."""\r
        return {\r
            'project_id': self.project_id,\r
            'exported_at': datetime.now().isoformat(),\r
            'sources': {k: {\r
                'id': v.id,\r
                'name': v.name,\r
                'system': v.system,\r
                'location': v.location,\r
                'owner': v.owner\r
            } for k, v in self.sources.items()},\r
            'entities': {k: {\r
                'id': v.id,\r
                'name': v.name,\r
                'source_id': v.source_id,\r
                'entity_type': v.entity_type,\r
                'parent_entities': v.parent_entities\r
            } for k, v in self.entities.items()},\r
            'transformations': {k: {\r
                'id': v.id,\r
                'type': v.transformation_type.value,\r
                'description': v.description,\r
                'input_entities': v.input_entities,\r
                'output_entities': v.output_entities,\r
                'performed_by': v.performed_by,\r
                'performed_at': v.performed_at.isoformat()\r
            } for k, v in self.transformations.items()},\r
            'lineage_records': [{\r
                'id': r.id,\r
                'entity_id': r.entity_id,\r
                'transformation_id': r.transformation_id,\r
                'upstream_entities': r.upstream_entities\r
            } for r in self.lineage_records]\r
        }\r
\r
    def generate_report(self) -> str:\r
        """Generate lineage report."""\r
        lines = [f"# Data Lineage Report: {self.project_id}", ""]\r
        lines.append(f"**Generated:** {datetime.now().strftime('%Y-%m-%d %H:%M')}")\r
        lines.append(f"**Sources:** {len(self.sources)}")\r
        lines.append(f"**Entities:** {len(self.entities)}")\r
        lines.append(f"**Transformations:** {len(self.transformations)}")\r
        lines.append("")\r
\r
        # Sources\r
        lines.append("## Data Sources")\r
        for source in self.sources.values():\r
            lines.append(f"- **{source.name}** ({source.system})")\r
            lines.append(f"  - Location: {source.location}")\r
            lines.append(f"  - Owner: {source.owner}")\r
        lines.append("")\r
\r
        # Validation\r
        issues = self.validate_lineage()\r
        if issues:\r
            lines.append("## Lineage Issues")\r
            for issue in issues:\r
                lines.append(f"- ⚠️ {issue}")\r
            lines.append("")\r
\r
        # Transformation summary\r
        lines.append("## Transformation Summary")\r
        type_counts = {}\r
        for t in self.transformations.values():\r
            type_counts[t.transformation_type.value] = type_counts.get(t.transformation_type.value, 0) + 1\r
        for t_type, count in sorted(type_counts.items()):\r
            lines.append(f"- {t_type}: {count}")\r
\r
        return "\
".join(lines)\r
```\r
\r
## Quick Start\r
\r
```python\r
# Initialize tracker\r
tracker = ConstructionDataLineageTracker("PROJECT-001")\r
\r
# Register sources\r
procore = tracker.register_source("Procore", "SaaS", "cloud", "PM Team")\r
sage = tracker.register_source("Sage 300", "Database", "on-prem", "Finance")\r
\r
# Register entities\r
budget = tracker.register_entity("Project Budget", procore.id, "table")\r
costs = tracker.register_entity("Job Costs", sage.id, "table")\r
report = tracker.register_entity("Cost Variance Report", procore.id, "file")\r
\r
# Record transformation\r
tracker.record_transformation(\r
    transformation_type=TransformationType.JOIN,\r
    description="Join budget and actual costs for variance calculation",\r
    input_entities=[budget.id, costs.id],\r
    output_entities=[report.id],\r
    logic="SELECT b.*, c.actual, (b.budget - c.actual) as variance FROM budget b JOIN costs c ON b.cost_code = c.cost_code",\r
    performed_by="ETL Pipeline"\r
)\r
\r
# Trace lineage\r
upstream = tracker.trace_upstream(report.id)\r
print("Upstream lineage:", upstream)\r
\r
# Generate graph\r
print(tracker.generate_lineage_graph(report.id))\r
\r
# Export for audit\r
lineage_data = tracker.export_lineage()\r
```\r
\r
## Resources\r
\r
- **Data Governance**: DAMA DMBOK lineage guidelines\r
- **Audit Requirements**: SOX, ISO compliance\r
安全使用建议
This skill appears to do what it claims (track and record data lineage for construction data) and does not ask for credentials or external installs. Before installing: 1) Confirm you trust the publisher/homepage (source is listed as 'unknown'); 2) Be mindful that the skill requests filesystem access (it will read files you point it to) — avoid giving it access to sensitive system files or credentials; 3) Note the small metadata inconsistencies (claw.json shows version 2.0.0 while registry lists 2.1.0); consider testing the skill in a sandbox or with non-sensitive sample data first; 4) If you plan to let the agent run the skill autonomously, restrict its scope (only allow access to project folders) and monitor activity; 5) If you need stronger assurance, request the full SKILL.md and instructions be reviewed for any hidden network calls or explicit upload steps (none are present in the files reviewed).
功能分析
Type: OpenClaw Skill Name: data-lineage-tracker Version: 2.1.0 The skill's core Python logic in SKILL.md is benign, implementing an in-memory data lineage tracker without file system, network, or shell operations. However, the claw.json manifest declares 'filesystem' permissions which are not utilized by the provided Python code. This over-permission creates a potential vulnerability, as an AI agent, if prompted, could attempt file system operations despite the skill's current implementation not requiring it. There is no evidence of intentional malicious behavior like data exfiltration or persistence.
能力评估
Purpose & Capability
Name/description (data-lineage for construction) align with the provided Python-based classes and methods in SKILL.md. Requiring python3 is reasonable for the included Python examples and for processing CSV/Excel/JSON data. The manifest's 'filesystem' permission is consistent with reading user-provided files.
Instruction Scope
Instructions focus on processing data supplied by the user (CSV/Excel/JSON or provided file paths) and reference the Python code in SKILL.md. This stays within the stated purpose. However, the skill will read files from the filesystem (claw.json lists filesystem permission) — the instructions rely on user-supplied file paths but do not technically prevent reading other files if the agent is given broad discretion. No instructions are present that access external endpoints or request unrelated credentials.
Install Mechanism
Instruction-only skill with no install spec or runtime downloads. That lowers risk: nothing is written to disk by an installer. The python3 binary requirement is proportional to the provided code samples.
Credentials
The skill does not request environment variables, secrets, or external API keys. No unexpected credentials or config paths are declared. The lack of required env vars is coherent for an offline/local data processing helper.
Persistence & Privilege
The skill is not always-enabled and is user-invocable (normal). The manifest declares a 'filesystem' permission, which is reasonable for a file-processing tool but does increase blast radius: if the agent is allowed to run the skill autonomously, it could access files on disk. There is no evidence the skill modifies other skills or system-wide settings.
如何使用
  1. 确保已安装 OpenClaw(本地或 Docker 部署)
  2. 在对话框中输入安装命令:/install data-lineage-tracker
  3. 安装完成后,直接呼叫该 Skill 的名称或使用 /data-lineage-tracker 触发
  4. 根据 Skill 的参数说明提供必要输入,即可获得结构化输出
版本历史
v2.1.0
Version 2.1.0 of data-lineage-tracker - Introduces detailed documentation in SKILL.md, including business case, technical overview, and code snippets. - Extensively describes the skill’s purpose, focusing on tracking data origin, transformations, and flow for improved compliance and debugging in construction. - Outlines main use cases: audit trails, issue resolution, change impact analysis, and meeting regulatory requirements. - Provides example Python classes and methods for implementing data lineage tracking, covering data sources, transformations, and lineage record-keeping.
v1.0.0
Initial release of Data Lineage Tracker for construction data systems. - Enables tracking of data origin, transformations, and flow across systems. - Supports audit trails and compliance through recorded data provenance. - Assists with debugging and impact analysis of data issues. - Includes core classes for sources, entities, transformations, and lineage records. - Provides upstream and downstream tracing functions for data entities.
元数据
Slug data-lineage-tracker
版本 2.1.0
许可证
累计安装 14
当前安装数 12
历史版本数 2
常见问题

Data Lineage Tracker 是什么?

Track data origin, transformations, and flow through construction systems. Essential for audit trails, compliance, and debugging data issues. 它是一个面向 Claude Code / OpenClaw 的 AI Agent Skill 插件,目前累计下载 3341 次。

如何安装 Data Lineage Tracker?

在 OpenClaw 或 Claude Code 对话框中运行命令「/install data-lineage-tracker」即可一键安装,无需额外配置。

Data Lineage Tracker 是免费的吗?

是的,Data Lineage Tracker 完全免费(开源免费),可自由下载、安装和使用。

Data Lineage Tracker 支持哪些平台?

Data Lineage Tracker 跨平台运行,可在任意部署了 OpenClaw / Claude Code 的环境中使用(darwin, linux, win32)。

谁开发了 Data Lineage Tracker?

由 datadrivenconstruction(@datadrivenconstruction)开发并维护,当前版本 v2.1.0。

💬 留言讨论