
Data Source Audit

Author: datadrivenconstruction · GitHub · v2.1.0
Platforms: darwin, linux, win32 · ✓ Security check passed
Total downloads: 1903 · Favorites: 0 · Current installs: 9 · Versions: 2
Install in OpenClaw:
/install data-source-audit
Description
Comprehensive audit of all construction data sources and systems. Map data flows, identify silos, assess quality, and create an integration roadmap.
Usage (SKILL.md)

# Data Source Audit for Construction

## Overview

Perform comprehensive audits of construction data sources to identify silos, map data flows, assess quality, and plan integration strategies. Essential for digital transformation and data-driven construction initiatives.

## Business Case

Construction organizations typically have 10-50+ data sources:

- Project management systems
- Estimating software
- Scheduling tools
- Accounting/ERP systems
- BIM platforms
- Document management systems
- Field apps
- Spreadsheets

Note: This skill is vendor-agnostic and works with any data source. Product names mentioned elsewhere in examples are trademarks of their respective owners.

This skill helps:

- Discover all data sources
- Map data flows and dependencies
- Identify integration opportunities
- Prioritize data improvement efforts

## Technical Implementation

```python
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional, Set
from enum import Enum
from datetime import datetime
import pandas as pd
import json

class DataSourceType(Enum):
    DATABASE = "database"
    API = "api"
    FILE_SHARE = "file_share"
    CLOUD_APP = "cloud_app"
    SPREADSHEET = "spreadsheet"
    LEGACY_SYSTEM = "legacy_system"
    IOT_SENSOR = "iot_sensor"
    MANUAL_ENTRY = "manual_entry"

class DataDomain(Enum):
    COST = "cost"
    SCHEDULE = "schedule"
    BIM = "bim"
    DOCUMENT = "document"
    FIELD = "field"
    SAFETY = "safety"
    QUALITY = "quality"
    HR = "hr"
    ACCOUNTING = "accounting"
    PROCUREMENT = "procurement"

@dataclass
class DataSource:
    name: str
    source_type: DataSourceType
    domains: List[DataDomain]
    owner: str
    department: str
    description: str
    # Technical details
    technology: str
    location: str  # cloud, on-prem, hybrid
    access_method: str  # API, ODBC, file export, manual
    # Data characteristics
    update_frequency: str  # real-time, daily, weekly, monthly, ad-hoc
    data_volume: str  # small, medium, large
    retention_period: str
    # Quality metrics
    completeness_score: float = 0.0
    accuracy_score: float = 0.0
    timeliness_score: float = 0.0
    # Integration status
    integrations: List[str] = field(default_factory=list)
    is_master: bool = False  # Is this the master source for any entity?
    master_for: List[str] = field(default_factory=list)
    # Issues
    known_issues: List[str] = field(default_factory=list)
    # Metadata
    last_audit_date: Optional[datetime] = None
    audit_notes: str = ""

@dataclass
class DataFlow:
    source: str
    target: str
    flow_type: str  # push, pull, bidirectional, manual
    frequency: str
    entities: List[str]  # What data entities flow
    transformation: str  # none, simple, complex
    status: str  # active, planned, deprecated

@dataclass
class DataSilo:
    name: str
    sources: List[str]
    impact: str  # high, medium, low
    description: str
    resolution_options: List[str]

class DataSourceAuditor:
    """Audit and analyze construction data sources."""

    def __init__(self):
        self.sources: Dict[str, DataSource] = {}
        self.flows: List[DataFlow] = []
        self.silos: List[DataSilo] = []

    def add_source(self, source: DataSource):
        """Register a data source."""
        self.sources[source.name] = source

    def add_flow(self, flow: DataFlow):
        """Register a data flow between sources."""
        self.flows.append(flow)

    def discover_sources_from_survey(self, survey_responses: List[Dict]) -> List[DataSource]:
        """Create data sources from survey responses."""
        sources = []

        for response in survey_responses:
            source = DataSource(
                name=response['system_name'],
                source_type=DataSourceType(response['type']),
                domains=[DataDomain(d) for d in response['domains']],
                owner=response['owner'],
                department=response['department'],
                # The survey template does not ask for description, data volume,
                # or retention period, so fall back to defaults when they are absent
                description=response.get('description', ''),
                technology=response['technology'],
                location=response['location'],
                access_method=response['access_method'],
                update_frequency=response['update_frequency'],
                data_volume=response.get('data_volume', 'unknown'),
                retention_period=response.get('retention_period', 'unknown'),
            )
            sources.append(source)
            self.add_source(source)

        return sources

    def identify_silos(self) -> List[DataSilo]:
        """Identify data silos based on integration analysis."""
        silos = []

        # Find sources with no integrations
        isolated_sources = [
            name for name, source in self.sources.items()
            if not source.integrations and source.source_type != DataSourceType.MANUAL_ENTRY
        ]

        if isolated_sources:
            silos.append(DataSilo(
                name="Isolated Systems",
                sources=isolated_sources,
                impact="high",
                description="Systems with no integrations, requiring manual data transfer",
                resolution_options=[
                    "Implement API integration",
                    "Set up automated file exports",
                    "Migrate to integrated platform"
                ]
            ))

        # Find duplicate data domains without master
        domain_sources: Dict[DataDomain, List[str]] = {}
        for name, source in self.sources.items():
            for domain in source.domains:
                if domain not in domain_sources:
                    domain_sources[domain] = []
                domain_sources[domain].append(name)

        for domain, sources in domain_sources.items():
            if len(sources) > 1:
                # Check if any is designated master
                masters = [s for s in sources if self.sources[s].is_master]
                if not masters:
                    silos.append(DataSilo(
                        name=f"No Master for {domain.value}",
                        sources=sources,
                        impact="medium",
                        description=f"Multiple sources for {domain.value} data without designated master",
                        resolution_options=[
                            "Designate master data source",
                            "Implement MDM solution",
                            "Create data reconciliation process"
                        ]
                    ))

        # Find one-way flows that should be bidirectional
        flow_pairs = {}
        for flow in self.flows:
            key = tuple(sorted([flow.source, flow.target]))
            if key not in flow_pairs:
                flow_pairs[key] = []
            flow_pairs[key].append(flow)

        for (s1, s2), flows in flow_pairs.items():
            # Skip flows that reference systems not registered as sources
            if s1 not in self.sources or s2 not in self.sources:
                continue
            if len(flows) == 1 and flows[0].flow_type != 'bidirectional':
                # Check if bidirectional would make sense
                s1_domains = set(self.sources[s1].domains)
                s2_domains = set(self.sources[s2].domains)
                if s1_domains & s2_domains:  # Overlapping domains
                    # Report the declared direction, not the sorted pair order
                    flow = flows[0]
                    silos.append(DataSilo(
                        name=f"One-way flow: {flow.source} -> {flow.target}",
                        sources=[flow.source, flow.target],
                        impact="low",
                        description="Data flows one direction only between systems with overlapping domains",
                        resolution_options=[
                            "Evaluate need for bidirectional sync",
                            "Implement change data capture"
                        ]
                    ))

        self.silos = silos
        return silos

    def assess_source_quality(self, source_name: str, sample_data: pd.DataFrame) -> Dict[str, float]:
        """Assess data quality for a source based on sample data."""
        if source_name not in self.sources:
            raise ValueError(f"Unknown source: {source_name}")

        scores = {}

        # Completeness: % of non-null values (an empty sample counts as complete)
        if sample_data.size > 0:
            completeness = 1 - (sample_data.isnull().sum().sum() / sample_data.size)
        else:
            completeness = 1.0
        scores['completeness'] = float(completeness)

        # Uniqueness: % of distinct rows, compared across all columns
        if len(sample_data) > 0:
            uniqueness = len(sample_data.drop_duplicates()) / len(sample_data)
        else:
            uniqueness = 1.0
        scores['uniqueness'] = uniqueness

        # Validity: Basic format checks (simplified)
        validity_checks = 0
        total_checks = 0

        for col in sample_data.columns:
            col_name = str(col).lower()
            if 'date' in col_name:
                total_checks += 1
                try:
                    pd.to_datetime(sample_data[col], errors='raise')
                    validity_checks += 1
                except (ValueError, TypeError):
                    # Column could not be parsed as dates; counts as a failed check
                    pass
            if 'email' in col_name and len(sample_data) > 0:
                total_checks += 1
                # Cast to str so non-string columns do not break the .str accessor
                valid_emails = sample_data[col].astype(str).str.contains(r'@.*\.', na=False).sum()
                if valid_emails / len(sample_data) > 0.9:
                    validity_checks += 1

        scores['validity'] = validity_checks / total_checks if total_checks > 0 else 1.0

        # Update source with scores (validity is stored as the accuracy score)
        self.sources[source_name].completeness_score = scores['completeness']
        self.sources[source_name].accuracy_score = scores['validity']

        return scores

    def create_data_catalog(self) -> pd.DataFrame:
        """Create a data catalog from all sources."""
        catalog_entries = []

        for name, source in self.sources.items():
            entry = {
                'Source Name': name,
                'Type': source.source_type.value,
                'Domains': ', '.join(d.value for d in source.domains),
                'Owner': source.owner,
                'Department': source.department,
                'Technology': source.technology,
                'Location': source.location,
                'Access Method': source.access_method,
                'Update Frequency': source.update_frequency,
                'Data Volume': source.data_volume,
                'Integrations': len(source.integrations),
                'Is Master': 'Yes' if source.is_master else 'No',
                'Quality Score': (source.completeness_score + source.accuracy_score) / 2,
                'Known Issues': len(source.known_issues),
            }
            catalog_entries.append(entry)

        return pd.DataFrame(catalog_entries)

    def generate_integration_matrix(self) -> pd.DataFrame:
        """Generate integration matrix showing connections between sources."""
        source_names = list(self.sources.keys())
        matrix = pd.DataFrame(
            index=source_names,
            columns=source_names,
            data=''
        )

        # One symbol per flow type; manual transfers get 'M' so they are not
        # shown as bidirectional, and unrecognized types fall back to '?'
        symbols = {'push': '→', 'pull': '←', 'bidirectional': '↔', 'manual': 'M'}

        for flow in self.flows:
            if flow.source in source_names and flow.target in source_names:
                current = matrix.loc[flow.source, flow.target]
                symbol = symbols.get(flow.flow_type, '?')
                matrix.loc[flow.source, flow.target] = f"{current}{symbol}" if current else symbol

        return matrix

    def calculate_integration_score(self) -> Dict[str, float]:
        """Calculate overall integration score and breakdown."""
        if not self.sources:
            return {'overall': 0.0}

        scores = {}

        # Coverage: % of sources with at least one integration
        integrated = sum(1 for s in self.sources.values() if s.integrations)
        scores['coverage'] = integrated / len(self.sources)

        # Master data: mastered entities relative to the number of domains.
        # master_for holds free-text entity names rather than DataDomain values,
        # so the ratio is only a rough proxy and is capped at 1.0.
        domains_with_master = set()
        for source in self.sources.values():
            if source.is_master:
                domains_with_master.update(source.master_for)

        all_domains = set()
        for source in self.sources.values():
            all_domains.update(d.value for d in source.domains)

        scores['master_data'] = (
            min(1.0, len(domains_with_master) / len(all_domains)) if all_domains else 1.0
        )

        # Data quality average (sources that were never assessed are skipped)
        quality_scores = [
            (s.completeness_score + s.accuracy_score) / 2
            for s in self.sources.values()
            if s.completeness_score > 0 or s.accuracy_score > 0
        ]
        scores['quality'] = sum(quality_scores) / len(quality_scores) if quality_scores else 0.0

        # Silo impact: each high-impact silo reduces the score by 0.2, floored at 0
        high_impact_silos = sum(1 for s in self.silos if s.impact == 'high')
        scores['silo_risk'] = max(0.0, 1 - (high_impact_silos * 0.2))

        # Overall
        scores['overall'] = (
            scores['coverage'] * 0.3 +
            scores['master_data'] * 0.25 +
            scores['quality'] * 0.25 +
            scores['silo_risk'] * 0.2
        )

        return scores

    def generate_audit_report(self) -> str:
        """Generate comprehensive audit report."""
        report = ["# Data Source Audit Report", ""]
        report.append(f"**Audit Date:** {datetime.now().strftime('%Y-%m-%d')}")
        report.append(f"**Total Sources:** {len(self.sources)}")
        report.append(f"**Total Data Flows:** {len(self.flows)}")
        report.append("")

        # Integration Score
        scores = self.calculate_integration_score()
        report.append("## Integration Maturity Score")
        report.append(f"**Overall Score:** {scores['overall']:.1%}")
        report.append(f"- Coverage: {scores['coverage']:.1%}")
        report.append(f"- Master Data: {scores['master_data']:.1%}")
        report.append(f"- Data Quality: {scores['quality']:.1%}")
        report.append(f"- Silo Risk: {scores['silo_risk']:.1%}")
        report.append("")

        # Sources by Type
        report.append("## Sources by Type")
        by_type = {}
        for source in self.sources.values():
            t = source.source_type.value
            by_type[t] = by_type.get(t, 0) + 1
        for t, count in sorted(by_type.items(), key=lambda x: -x[1]):
            report.append(f"- {t}: {count}")
        report.append("")

        # Data Silos
        report.append("## Identified Data Silos")
        if self.silos:
            for silo in self.silos:
                report.append(f"\n### {silo.name}")
                report.append(f"**Impact:** {silo.impact}")
                report.append(f"**Sources:** {', '.join(silo.sources)}")
                report.append(f"**Description:** {silo.description}")
                report.append("**Resolution Options:**")
                for opt in silo.resolution_options:
                    report.append(f"- {opt}")
        else:
            report.append("No significant data silos identified.")
        report.append("")

        # Recommendations
        report.append("## Recommendations")
        recommendations = self._generate_recommendations()
        for i, rec in enumerate(recommendations, 1):
            report.append(f"{i}. {rec}")

        return "\n".join(report)

    def _generate_recommendations(self) -> List[str]:
        """Generate recommendations based on audit findings."""
        recommendations = []

        scores = self.calculate_integration_score()

        if scores['coverage'] < 0.7:
            recommendations.append(
                "Increase integration coverage - over 30% of systems are isolated. "
                "Prioritize connecting high-value data sources."
            )

        if scores['master_data'] < 0.5:
            recommendations.append(
                "Implement Master Data Management - designate authoritative sources "
                "for key entities (projects, vendors, employees, cost codes)."
            )

        if scores['quality'] < 0.7:
            recommendations.append(
                "Improve data quality - implement validation rules at data entry points "
                "and automated quality monitoring."
            )

        # Check for spreadsheet dependency
        spreadsheets = [s for s in self.sources.values()
                       if s.source_type == DataSourceType.SPREADSHEET]
        if len(spreadsheets) > 3:
            recommendations.append(
                f"Reduce spreadsheet dependency - {len(spreadsheets)} spreadsheet-based "
                "data sources identified. Migrate critical data to proper databases."
            )

        # Check for legacy systems
        legacy = [s for s in self.sources.values()
                 if s.source_type == DataSourceType.LEGACY_SYSTEM]
        if legacy:
            recommendations.append(
                f"Plan legacy system migration - {len(legacy)} legacy systems identified. "
                "Create modernization roadmap."
            )

        return recommendations
```
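
The weighting used by `calculate_integration_score` can be checked by hand. The snippet below is a minimal standalone sketch of that arithmetic, not part of the skill itself: the helper name `overall_integration_score` and the input values are hypothetical, and flooring the silo term at zero is an assumption of this sketch.

```python
def overall_integration_score(coverage: float, master_data: float,
                              quality: float, high_impact_silos: int) -> float:
    """Combine component scores with the 30/25/25/20 weighting."""
    # Each high-impact silo costs 0.2; the floor at 0 is an assumption here
    silo_risk = max(0.0, 1 - high_impact_silos * 0.2)
    return (coverage * 0.3
            + master_data * 0.25
            + quality * 0.25
            + silo_risk * 0.2)


# Hypothetical audit: half the systems integrated, 40% master-data coverage,
# average quality of 0.8, and one high-impact silo.
score = overall_integration_score(0.5, 0.4, 0.8, 1)
print(f"{score:.1%}")  # 0.15 + 0.10 + 0.20 + 0.16 -> 61.0%
```

Because coverage carries the largest weight, connecting isolated systems moves the overall score more than an equal improvement in any other component.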

## Quick Start

```python
# Initialize auditor
auditor = DataSourceAuditor()

# Add known sources
auditor.add_source(DataSource(
    name="Procore",
    source_type=DataSourceType.CLOUD_APP,
    domains=[DataDomain.DOCUMENT, DataDomain.FIELD, DataDomain.SCHEDULE],
    owner="Project Controls",
    department="Operations",
    description="Primary project management platform",
    technology="SaaS",
    location="cloud",
    access_method="API",
    update_frequency="real-time",
    data_volume="large",
    retention_period="7 years",
    integrations=["Sage 300", "Primavera P6"],
    is_master=True,
    master_for=["projects", "documents"]
))

auditor.add_source(DataSource(
    name="Sage 300",
    source_type=DataSourceType.DATABASE,
    domains=[DataDomain.COST, DataDomain.ACCOUNTING],
    owner="Finance",
    department="Accounting",
    description="ERP and job costing system",
    technology="SQL Server",
    location="on-prem",
    access_method="ODBC",
    update_frequency="daily",
    data_volume="medium",
    retention_period="10 years",
    is_master=True,
    master_for=["costs", "vendors", "invoices"]
))

# Add data flows
auditor.add_flow(DataFlow(
    source="Procore",
    target="Sage 300",
    flow_type="push",
    frequency="daily",
    entities=["change_orders", "budget_changes"],
    transformation="simple",
    status="active"
))

# Identify silos
silos = auditor.identify_silos()

# Generate report
report = auditor.generate_audit_report()
print(report)

# Create data catalog (writing .xlsx requires an Excel engine such as openpyxl)
catalog = auditor.create_data_catalog()
catalog.to_excel("data_catalog.xlsx", index=False)
```
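
Once flows are registered, a common follow-up question is which systems are affected when one source changes. The sketch below is one way to answer it with a breadth-first traversal over `(source, target)` pairs. It is not part of the skill: `downstream_systems` is a hypothetical helper, "Reporting Warehouse" is an invented system, and treating every flow as source-to-target regardless of `flow_type` is a simplifying assumption.

```python
from collections import deque
from typing import Dict, Iterable, List, Set, Tuple


def downstream_systems(flows: Iterable[Tuple[str, str]], start: str) -> Set[str]:
    """Return every system reachable from `start` by following flows."""
    # Build an adjacency list: source -> list of direct targets
    graph: Dict[str, List[str]] = {}
    for source, target in flows:
        graph.setdefault(source, []).append(target)

    seen: Set[str] = set()
    queue = deque([start])
    while queue:
        current = queue.popleft()
        for nxt in graph.get(current, []):
            if nxt != start and nxt not in seen:
                seen.add(nxt)
                queue.append(nxt)
    return seen


example_flows = [
    ("Procore", "Sage 300"),
    ("Sage 300", "Reporting Warehouse"),  # hypothetical system for illustration
    ("Primavera P6", "Procore"),
]
impacted = downstream_systems(example_flows, "Procore")
print(sorted(impacted))  # ['Reporting Warehouse', 'Sage 300']
```

With real audit data, the pairs could be derived from the registered flows, for example `[(f.source, f.target) for f in auditor.flows]`.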

## Survey Template

Use this survey to discover data sources across the organization:

```yaml
System Survey:
  - system_name: "What is the name of this system?"
  - type: "What type of system is it?"
    options: [database, api, file_share, cloud_app, spreadsheet, legacy_system]
  - domains: "What types of data does it contain?"
    options: [cost, schedule, bim, document, field, safety, quality, hr, accounting]
  - owner: "Who is the system owner?"
  - department: "Which department uses this system?"
  - technology: "What technology/platform is it built on?"
  - location: "Where is the system hosted?"
    options: [cloud, on-prem, hybrid]
  - access_method: "How can data be accessed?"
    options: [api, odbc, file_export, manual]
  - update_frequency: "How often is data updated?"
    options: [real-time, daily, weekly, monthly, ad-hoc]
  - integrations: "What other systems does it connect to?"
```
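
Survey answers are typed by people, so it may help to check them against the option lists above before turning them into `DataSource` objects. The following is a minimal sketch under that assumption. `validate_response`, `ALLOWED`, `ALLOWED_DOMAINS`, and `REQUIRED` are hypothetical names, the option sets are copied from the template, and the sample response is invented.

```python
from typing import Any, Dict, List

# Allowed options, copied from the survey template
ALLOWED = {
    "type": {"database", "api", "file_share", "cloud_app", "spreadsheet", "legacy_system"},
    "location": {"cloud", "on-prem", "hybrid"},
    "access_method": {"api", "odbc", "file_export", "manual"},
    "update_frequency": {"real-time", "daily", "weekly", "monthly", "ad-hoc"},
}
ALLOWED_DOMAINS = {"cost", "schedule", "bim", "document", "field",
                   "safety", "quality", "hr", "accounting"}
REQUIRED = ["system_name", "type", "domains", "owner", "department",
            "technology", "location", "access_method", "update_frequency"]


def validate_response(response: Dict[str, Any]) -> List[str]:
    """Return a list of problems; an empty list means the response is usable."""
    problems = []
    for key in REQUIRED:
        if not response.get(key):
            problems.append(f"missing: {key}")
    for key, allowed in ALLOWED.items():
        value = response.get(key)
        if value and value not in allowed:
            problems.append(f"invalid {key}: {value}")
    for domain in response.get("domains") or []:
        if domain not in ALLOWED_DOMAINS:
            problems.append(f"invalid domain: {domain}")
    return problems


# Invented response with two mistakes: a capitalized option and an unknown domain
response = {
    "system_name": "Estimating Workbook",
    "type": "spreadsheet",
    "domains": ["cost", "budget"],
    "owner": "Estimating",
    "department": "Preconstruction",
    "technology": "Excel",
    "location": "cloud",
    "access_method": "Manual",
    "update_frequency": "weekly",
}
problems = validate_response(response)
print(problems)  # ['invalid access_method: Manual', 'invalid domain: budget']
```

The check is case-sensitive on purpose, because the enum lookups in the implementation are case-sensitive as well; normalizing answers to lowercase before validation is a reasonable alternative.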

## Resources

- **DAMA DMBOK**: Data Management Body of Knowledge
- **Data Governance Frameworks**: DCAM, EDM Council
- **Integration Patterns**: Enterprise Integration Patterns book
Security Recommendations
This skill appears internally consistent for performing audits on construction data you provide. Before using it: (1) confirm you trust the skill source (homepage is provided but package owner is not verified); (2) avoid supplying production credentials unless strictly necessary — if you do provide API/database credentials, treat them as sensitive; (3) run any suggested Python snippets in an isolated environment if you plan to execute code locally; (4) if the agent requests additional files, verify those are intentionally supplied. If you need the skill to connect to live systems, prefer creating read‑only accounts and document exactly what access is granted.
Functional Analysis
Type: OpenClaw Skill · Name: data-source-audit · Version: 2.1.0
The OpenClaw AgentSkill 'data-source-audit' is designed for comprehensive data source auditing and report generation. The Python code in SKILL.md implements data modeling and analysis logic using standard libraries like pandas, without any evidence of malicious intent. The `claw.json` file explicitly declares 'filesystem' permission, which is justified by the skill's ability to export data catalogs to Excel files (e.g., `catalog.to_excel("data_catalog.xlsx", index=False)` as shown in SKILL.md). The instructions in SKILL.md and instructions.md are clear, align with the stated purpose, and do not contain any prompt injection attempts to subvert the agent for harmful actions. There are no signs of data exfiltration, unauthorized command execution, persistence mechanisms, or obfuscation.
Capability Assessment
Purpose & Capability
Name/description (data source audit) match the declared needs: SKILL.md includes Python code examples for parsing CSV/Excel/JSON and mapping flows, and claw.json grants filesystem permission so the skill can read user files. Requiring python3 and filesystem access is proportionate for local data processing.
Instruction Scope
Runtime instructions limit work to data provided by the user and reference the code in SKILL.md for processing. There are no instructions to access unrelated system paths, environment variables, or hardcoded external endpoints; the skill is instruction-only and does not itself include commands that exfiltrate data.
Install Mechanism
There is no install spec and no code files to download or execute beyond the Python examples embedded in SKILL.md. This minimizes the risk of arbitrary remote code install.
Credentials
The skill requests no environment variables or credentials. That is appropriate for an audit that operates on user‑supplied files; it does not ask for unrelated secrets (AWS keys, database passwords) in its metadata.
Persistence & Privilege
always is false and model invocation is standard. The only elevated capability is filesystem permission in claw.json, which is reasonable for a file‑processing audit skill and is consistent with the described behavior.
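How to Use
  1. Make sure OpenClaw is installed (locally or via Docker).
  2. Enter the install command in the chat box: /install data-source-audit
  3. Once installed, invoke the Skill by name or trigger it with /data-source-audit.
  4. Provide the inputs described in the Skill's parameter documentation to get structured output.
Version History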
v2.1.0
- Added a full technical implementation with Python dataclasses for data sources, flows, and silos.
- Introduced enums for DataSourceType and DataDomain for more structured classification.
- Included mechanisms for discovering sources from surveys and identifying data silos automatically.
- Enhanced business overview and technical documentation for audit, mapping, and quality assessment of construction data sources.
v1.0.0
Initial release of Data Source Audit skill for construction data ecosystems.
- Enables identification and documentation of all construction-related data sources and flows.
- Maps data flows, highlights data silos, and assesses quality metrics per source.
- Supports survey-based discovery of data systems.
- Suggests integration strategies and master data management opportunities.
- Includes core data classes for sources, flows, silos, and audit analysis.
Metadata
Slug data-source-audit
Version 2.1.0
License
Total installs 10
Current installs 9
Versions 2
FAQ

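What is Data Source Audit?

Comprehensive audit of all construction data sources and systems. Map data flows, identify silos, assess quality, and create integration roadmap. It is an AI Agent Skill plugin for Claude Code / OpenClaw, with 1903 total downloads so far.

How do I install Data Source Audit?

Run the command "/install data-source-audit" in the OpenClaw or Claude Code chat box to install it in one step; no additional configuration is needed.

Is Data Source Audit free?

Yes. Data Source Audit is completely free (free and open source) and can be freely downloaded, installed, and used.

Which platforms does Data Source Audit support?

Data Source Audit is cross-platform and runs in any environment where OpenClaw / Claude Code is deployed (darwin, linux, win32).

Who developed Data Source Audit?

It is developed and maintained by datadrivenconstruction (@datadrivenconstruction); the current version is v2.1.0.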
