← Back to Skills Marketplace
datadrivenconstruction

Data Silo Detection

win32 ✓ Security Clean
1024
Downloads
0
Stars
0
Active Installs
2
Versions
Install in OpenClaw
/install data-silo-detection
Description
Detect and map data silos in construction organizations. Identify disconnected data sources and integration opportunities
README (SKILL.md)

\r

Data Silo Detection\r

\r

Overview\r

\r Based on DDC methodology (Chapter 1.2), this skill detects and maps data silos in construction organizations, identifying disconnected data sources, duplicate data, and integration opportunities.\r \r Book Reference: "Технологии и системы управления в современном строительстве" / "Technologies and Management Systems in Modern Construction"\r \r

Quick Start\r

\r

from dataclasses import dataclass, field\r
from enum import Enum\r
from typing import List, Dict, Optional, Set, Tuple\r
from datetime import datetime\r
import json\r
from collections import defaultdict\r
\r
class DataDomain(Enum):\r
    """Construction data domains"""\r
    DESIGN = "design"\r
    COST = "cost"\r
    SCHEDULE = "schedule"\r
    QUALITY = "quality"\r
    SAFETY = "safety"\r
    PROCUREMENT = "procurement"\r
    SITE = "site"\r
    DOCUMENT = "document"\r
    FINANCIAL = "financial"\r
    HR = "hr"\r
\r
class SiloSeverity(Enum):\r
    """Severity level of data silo"""\r
    CRITICAL = "critical"      # Major business impact\r
    HIGH = "high"              # Significant inefficiency\r
    MEDIUM = "medium"          # Noticeable issues\r
    LOW = "low"                # Minor inconvenience\r
\r
class DataSourceType(Enum):\r
    """Types of data sources"""\r
    DATABASE = "database"\r
    SPREADSHEET = "spreadsheet"\r
    FILE_SHARE = "file_share"\r
    CLOUD_APP = "cloud_app"\r
    DESKTOP_APP = "desktop_app"\r
    PAPER = "paper"\r
    EMAIL = "email"\r
    PERSONAL = "personal"\r
\r
@dataclass\r
class DataSource:\r
    """Represents a data source in the organization"""\r
    id: str\r
    name: str\r
    type: DataSourceType\r
    domain: DataDomain\r
    owner: str\r
    department: str\r
    users: List[str]\r
    data_entities: List[str]\r
    connections: List[str] = field(default_factory=list)\r
    update_frequency: str = "unknown"\r
    access_level: str = "department"  # personal, department, organization\r
    has_api: bool = False\r
    last_modified: Optional[datetime] = None\r
\r
@dataclass\r
class DataSilo:\r
    """Detected data silo"""\r
    id: str\r
    sources: List[DataSource]\r
    domain: DataDomain\r
    severity: SiloSeverity\r
    issue_type: str\r
    description: str\r
    impact: str\r
    affected_users: int\r
    affected_processes: List[str]\r
    recommendations: List[str]\r
    estimated_cost: Optional[float] = None\r
\r
@dataclass\r
class DuplicateData:\r
    """Detected duplicate data across sources"""\r
    entity_name: str\r
    sources: List[str]\r
    discrepancy_rate: float  # 0-1\r
    master_source: Optional[str] = None\r
    issues: List[str] = field(default_factory=list)\r
\r
@dataclass\r
class SiloAnalysis:\r
    """Complete silo analysis results"""\r
    organization: str\r
    analysis_date: datetime\r
    total_sources: int\r
    silos_detected: List[DataSilo]\r
    duplicates: List[DuplicateData]\r
    connectivity_score: float\r
    data_flow_gaps: List[Dict]\r
    priority_actions: List[str]\r
    integration_roadmap: Dict\r
\r
\r
class DataSiloDetector:\r
    """\r
    Detect and analyze data silos in construction organizations.\r
    Based on DDC methodology Chapter 1.2.\r
    """\r
\r
    def __init__(self):\r
        self.domain_relationships = self._define_domain_relationships()\r
        self.critical_entities = self._define_critical_entities()\r
\r
    def _define_domain_relationships(self) -> Dict[DataDomain, List[DataDomain]]:\r
        """Define expected relationships between domains"""\r
        return {\r
            DataDomain.DESIGN: [\r
                DataDomain.COST, DataDomain.SCHEDULE,\r
                DataDomain.PROCUREMENT, DataDomain.QUALITY\r
            ],\r
            DataDomain.COST: [\r
                DataDomain.DESIGN, DataDomain.SCHEDULE,\r
                DataDomain.FINANCIAL, DataDomain.PROCUREMENT\r
            ],\r
            DataDomain.SCHEDULE: [\r
                DataDomain.DESIGN, DataDomain.COST,\r
                DataDomain.SITE, DataDomain.HR\r
            ],\r
            DataDomain.PROCUREMENT: [\r
                DataDomain.COST, DataDomain.DESIGN,\r
                DataDomain.SITE, DataDomain.FINANCIAL\r
            ],\r
            DataDomain.SITE: [\r
                DataDomain.SCHEDULE, DataDomain.SAFETY,\r
                DataDomain.QUALITY, DataDomain.HR\r
            ],\r
            DataDomain.QUALITY: [\r
                DataDomain.DESIGN, DataDomain.SITE,\r
                DataDomain.DOCUMENT\r
            ],\r
            DataDomain.SAFETY: [\r
                DataDomain.SITE, DataDomain.HR,\r
                DataDomain.DOCUMENT\r
            ],\r
            DataDomain.FINANCIAL: [\r
                DataDomain.COST, DataDomain.PROCUREMENT,\r
                DataDomain.HR\r
            ]\r
        }\r
\r
    def _define_critical_entities(self) -> Dict[str, List[DataDomain]]:\r
        """Define entities that should be shared across domains"""\r
        return {\r
            "project": [DataDomain.DESIGN, DataDomain.COST, DataDomain.SCHEDULE],\r
            "budget": [DataDomain.COST, DataDomain.FINANCIAL, DataDomain.PROCUREMENT],\r
            "schedule": [DataDomain.SCHEDULE, DataDomain.SITE, DataDomain.PROCUREMENT],\r
            "material": [DataDomain.DESIGN, DataDomain.COST, DataDomain.PROCUREMENT],\r
            "labor": [DataDomain.HR, DataDomain.COST, DataDomain.SCHEDULE],\r
            "subcontractor": [DataDomain.PROCUREMENT, DataDomain.COST, DataDomain.SCHEDULE],\r
            "rfi": [DataDomain.DESIGN, DataDomain.DOCUMENT, DataDomain.SITE],\r
            "change_order": [DataDomain.COST, DataDomain.DESIGN, DataDomain.SCHEDULE]\r
        }\r
\r
    def detect_silos(\r
        self,\r
        organization: str,\r
        data_sources: List[DataSource],\r
        process_flows: Optional[List[Dict]] = None\r
    ) -> SiloAnalysis:\r
        """\r
        Detect data silos in the organization.\r
\r
        Args:\r
            organization: Organization name\r
            data_sources: List of data sources to analyze\r
            process_flows: Optional business process flows\r
\r
        Returns:\r
            Complete silo analysis\r
        """\r
        # Build connectivity graph\r
        connectivity = self._build_connectivity_graph(data_sources)\r
\r
        # Detect isolated sources\r
        isolated_silos = self._detect_isolated_sources(\r
            data_sources, connectivity\r
        )\r
\r
        # Detect domain silos\r
        domain_silos = self._detect_domain_silos(data_sources)\r
\r
        # Detect duplicate data\r
        duplicates = self._detect_duplicates(data_sources)\r
\r
        # Detect data flow gaps\r
        flow_gaps = self._detect_flow_gaps(\r
            data_sources, process_flows\r
        )\r
\r
        # Calculate connectivity score\r
        connectivity_score = self._calculate_connectivity_score(\r
            data_sources, connectivity\r
        )\r
\r
        # Combine all silos\r
        all_silos = isolated_silos + domain_silos\r
\r
        # Prioritize silos\r
        prioritized_silos = self._prioritize_silos(all_silos)\r
\r
        # Generate priority actions\r
        priority_actions = self._generate_priority_actions(\r
            prioritized_silos, duplicates\r
        )\r
\r
        # Create integration roadmap\r
        roadmap = self._create_integration_roadmap(\r
            prioritized_silos, flow_gaps\r
        )\r
\r
        return SiloAnalysis(\r
            organization=organization,\r
            analysis_date=datetime.now(),\r
            total_sources=len(data_sources),\r
            silos_detected=prioritized_silos,\r
            duplicates=duplicates,\r
            connectivity_score=connectivity_score,\r
            data_flow_gaps=flow_gaps,\r
            priority_actions=priority_actions,\r
            integration_roadmap=roadmap\r
        )\r
\r
    def _build_connectivity_graph(\r
        self,\r
        sources: List[DataSource]\r
    ) -> Dict[str, Set[str]]:\r
        """Build graph of source connections"""\r
        graph = defaultdict(set)\r
\r
        for source in sources:\r
            for connection in source.connections:\r
                graph[source.id].add(connection)\r
                graph[connection].add(source.id)\r
\r
        return graph\r
\r
    def _detect_isolated_sources(\r
        self,\r
        sources: List[DataSource],\r
        connectivity: Dict[str, Set[str]]\r
    ) -> List[DataSilo]:\r
        """Detect sources with no connections"""\r
        silos = []\r
\r
        for source in sources:\r
            connections = len(connectivity.get(source.id, set()))\r
\r
            if connections == 0:\r
                severity = SiloSeverity.CRITICAL if source.domain in [\r
                    DataDomain.COST, DataDomain.SCHEDULE\r
                ] else SiloSeverity.HIGH\r
\r
                silos.append(DataSilo(\r
                    id=f"isolated_{source.id}",\r
                    sources=[source],\r
                    domain=source.domain,\r
                    severity=severity,\r
                    issue_type="isolated_source",\r
                    description=f"{source.name} has no connections to other systems",\r
                    impact="Data must be manually transferred, risking errors and delays",\r
                    affected_users=len(source.users),\r
                    affected_processes=self._get_affected_processes(source.domain),\r
                    recommendations=[\r
                        f"Connect {source.name} via API or ETL to related systems",\r
                        "Establish data synchronization schedule",\r
                        "Define master data source for shared entities"\r
                    ]\r
                ))\r
            elif connections == 1 and source.access_level == "personal":\r
                silos.append(DataSilo(\r
                    id=f"personal_{source.id}",\r
                    sources=[source],\r
                    domain=source.domain,\r
                    severity=SiloSeverity.MEDIUM,\r
                    issue_type="personal_silo",\r
                    description=f"{source.name} is a personal data store with limited access",\r
                    impact="Data not accessible to team, knowledge loss risk",\r
                    affected_users=1,\r
                    affected_processes=self._get_affected_processes(source.domain),\r
                    recommendations=[\r
                        "Move data to shared organizational repository",\r
                        "Implement access controls instead of isolation",\r
                        "Document data structure and usage"\r
                    ]\r
                ))\r
\r
        return silos\r
\r
    def _detect_domain_silos(\r
        self,\r
        sources: List[DataSource]\r
    ) -> List[DataSilo]:\r
        """Detect silos between domains that should be connected"""\r
        silos = []\r
\r
        # Group sources by domain\r
        domain_sources = defaultdict(list)\r
        for source in sources:\r
            domain_sources[source.domain].append(source)\r
\r
        # Check for missing domain connections\r
        for domain, related_domains in self.domain_relationships.items():\r
            domain_srcs = domain_sources.get(domain, [])\r
\r
            for related in related_domains:\r
                related_srcs = domain_sources.get(related, [])\r
\r
                if domain_srcs and related_srcs:\r
                    # Check if any connections exist between domains\r
                    has_connection = False\r
                    for src in domain_srcs:\r
                        for rel_src in related_srcs:\r
                            if rel_src.id in src.connections:\r
                                has_connection = True\r
                                break\r
\r
                    if not has_connection:\r
                        silos.append(DataSilo(\r
                            id=f"domain_gap_{domain.value}_{related.value}",\r
                            sources=domain_srcs + related_srcs,\r
                            domain=domain,\r
                            severity=SiloSeverity.HIGH,\r
                            issue_type="domain_disconnect",\r
                            description=f"No data flow between {domain.value} and {related.value}",\r
                            impact="Related information not synchronized, decision delays",\r
                            affected_users=sum(len(s.users) for s in domain_srcs + related_srcs),\r
                            affected_processes=self._get_affected_processes(domain) +\r
                                              self._get_affected_processes(related),\r
                            recommendations=[\r
                                f"Establish integration between {domain.value} and {related.value} systems",\r
                                "Define shared data entities and master sources",\r
                                "Implement automated data synchronization"\r
                            ]\r
                        ))\r
\r
        return silos\r
\r
    def _detect_duplicates(\r
        self,\r
        sources: List[DataSource]\r
    ) -> List[DuplicateData]:\r
        """Detect duplicate data across sources"""\r
        duplicates = []\r
\r
        # Map entities to sources\r
        entity_sources = defaultdict(list)\r
        for source in sources:\r
            for entity in source.data_entities:\r
                entity_sources[entity].append(source.id)\r
\r
        # Find duplicates\r
        for entity, source_ids in entity_sources.items():\r
            if len(source_ids) > 1:\r
                # Check if it's a critical entity\r
                is_critical = entity.lower() in self.critical_entities\r
\r
                duplicate = DuplicateData(\r
                    entity_name=entity,\r
                    sources=source_ids,\r
                    discrepancy_rate=0.0,  # Would need actual data to calculate\r
                    issues=[]\r
                )\r
\r
                if is_critical and len(source_ids) > 2:\r
                    duplicate.issues.append(\r
                        "Critical entity duplicated in multiple systems"\r
                    )\r
\r
                if not any(s for s in sources if s.id in source_ids and "master" in s.name.lower()):\r
                    duplicate.issues.append("No clear master source defined")\r
\r
                duplicates.append(duplicate)\r
\r
        return duplicates\r
\r
    def _detect_flow_gaps(\r
        self,\r
        sources: List[DataSource],\r
        process_flows: Optional[List[Dict]]\r
    ) -> List[Dict]:\r
        """Detect gaps in expected data flows"""\r
        gaps = []\r
\r
        # Check critical entity coverage\r
        for entity, required_domains in self.critical_entities.items():\r
            entity_domains = set()\r
            for source in sources:\r
                if entity in [e.lower() for e in source.data_entities]:\r
                    entity_domains.add(source.domain)\r
\r
            missing = set(required_domains) - entity_domains\r
            if missing:\r
                gaps.append({\r
                    "entity": entity,\r
                    "missing_domains": [d.value for d in missing],\r
                    "impact": f"{entity} data not available in {len(missing)} domains"\r
                })\r
\r
        return gaps\r
\r
    def _calculate_connectivity_score(\r
        self,\r
        sources: List[DataSource],\r
        connectivity: Dict[str, Set[str]]\r
    ) -> float:\r
        """Calculate overall connectivity score"""\r
        if not sources:\r
            return 0.0\r
\r
        # Calculate average connections per source\r
        total_connections = sum(len(conns) for conns in connectivity.values())\r
        avg_connections = total_connections / len(sources)\r
\r
        # Ideal connections per source\r
        ideal_connections = 3\r
\r
        # Score based on average connections\r
        connection_score = min(1.0, avg_connections / ideal_connections)\r
\r
        # Penalize for isolated sources\r
        isolated = sum(1 for s in sources if s.id not in connectivity or not connectivity[s.id])\r
        isolation_penalty = isolated / len(sources)\r
\r
        # API availability bonus\r
        api_count = sum(1 for s in sources if s.has_api)\r
        api_bonus = (api_count / len(sources)) * 0.2\r
\r
        return max(0, min(1.0, connection_score - isolation_penalty + api_bonus))\r
\r
    def _get_affected_processes(self, domain: DataDomain) -> List[str]:\r
        """Get business processes affected by domain"""\r
        process_map = {\r
            DataDomain.DESIGN: ["Design Review", "RFI Processing", "Drawing Distribution"],\r
            DataDomain.COST: ["Budgeting", "Cost Tracking", "Invoice Processing"],\r
            DataDomain.SCHEDULE: ["Planning", "Progress Tracking", "Resource Allocation"],\r
            DataDomain.PROCUREMENT: ["Vendor Selection", "Purchase Orders", "Material Tracking"],\r
            DataDomain.SITE: ["Daily Reports", "Progress Photos", "Issue Management"],\r
            DataDomain.QUALITY: ["Inspections", "Defect Tracking", "Compliance"],\r
            DataDomain.SAFETY: ["Incident Reporting", "Safety Inspections", "Training"],\r
            DataDomain.FINANCIAL: ["Billing", "Payments", "Financial Reporting"],\r
            DataDomain.HR: ["Timekeeping", "Resource Management", "Certifications"]\r
        }\r
        return process_map.get(domain, [])\r
\r
    def _prioritize_silos(\r
        self,\r
        silos: List[DataSilo]\r
    ) -> List[DataSilo]:\r
        """Prioritize silos by severity and impact"""\r
        severity_order = {\r
            SiloSeverity.CRITICAL: 0,\r
            SiloSeverity.HIGH: 1,\r
            SiloSeverity.MEDIUM: 2,\r
            SiloSeverity.LOW: 3\r
        }\r
\r
        return sorted(\r
            silos,\r
            key=lambda s: (severity_order[s.severity], -s.affected_users)\r
        )\r
\r
    def _generate_priority_actions(\r
        self,\r
        silos: List[DataSilo],\r
        duplicates: List[DuplicateData]\r
    ) -> List[str]:\r
        """Generate prioritized action items"""\r
        actions = []\r
\r
        # Critical silos first\r
        critical_silos = [s for s in silos if s.severity == SiloSeverity.CRITICAL]\r
        for silo in critical_silos[:3]:\r
            actions.append(f"URGENT: {silo.recommendations[0]}")\r
\r
        # Duplicate data issues\r
        critical_dups = [d for d in duplicates if d.issues]\r
        for dup in critical_dups[:2]:\r
            actions.append(\r
                f"Define master source for '{dup.entity_name}' "\r
                f"(currently in {len(dup.sources)} sources)"\r
            )\r
\r
        # High priority silos\r
        high_silos = [s for s in silos if s.severity == SiloSeverity.HIGH]\r
        for silo in high_silos[:3]:\r
            if silo.recommendations:\r
                actions.append(silo.recommendations[0])\r
\r
        return actions[:10]\r
\r
    def _create_integration_roadmap(\r
        self,\r
        silos: List[DataSilo],\r
        gaps: List[Dict]\r
    ) -> Dict:\r
        """Create phased integration roadmap"""\r
        roadmap = {\r
            "Phase 1 - Quick Wins (0-3 months)": [],\r
            "Phase 2 - Core Integration (3-6 months)": [],\r
            "Phase 3 - Advanced Integration (6-12 months)": [],\r
            "Phase 4 - Optimization (12+ months)": []\r
        }\r
\r
        # Phase 1: Address personal silos and easy integrations\r
        for silo in silos:\r
            if silo.issue_type == "personal_silo":\r
                roadmap["Phase 1 - Quick Wins (0-3 months)"].append(\r
                    f"Migrate {silo.sources[0].name} to shared repository"\r
                )\r
\r
        # Phase 2: Core domain integrations\r
        domain_gaps = [s for s in silos if s.issue_type == "domain_disconnect"]\r
        for silo in domain_gaps[:3]:\r
            roadmap["Phase 2 - Core Integration (3-6 months)"].append(\r
                silo.recommendations[0] if silo.recommendations else silo.description\r
            )\r
\r
        # Phase 3: Critical entity master data\r
        roadmap["Phase 3 - Advanced Integration (6-12 months)"].extend([\r
            "Implement master data management for shared entities",\r
            "Deploy integration middleware/ESB",\r
            "Establish data governance policies"\r
        ])\r
\r
        # Phase 4: Optimization\r
        roadmap["Phase 4 - Optimization (12+ months)"].extend([\r
            "Implement real-time data synchronization",\r
            "Deploy integration monitoring and alerting",\r
            "Continuous improvement based on metrics"\r
        ])\r
\r
        return roadmap\r
\r
    def generate_report(self, analysis: SiloAnalysis) -> str:\r
        """Generate silo analysis report"""\r
        report = f"""\r
# Data Silo Analysis Report\r
## {analysis.organization}\r
\r
**Analysis Date:** {analysis.analysis_date.strftime('%Y-%m-%d')}\r
**Data Sources Analyzed:** {analysis.total_sources}\r
**Connectivity Score:** {analysis.connectivity_score:.0%}\r
\r
## Executive Summary\r
\r
Detected **{len(analysis.silos_detected)}** data silos and **{len(analysis.duplicates)}** duplicate data issues.\r
\r
### Silos by Severity\r
"""\r
        severity_counts = defaultdict(int)\r
        for silo in analysis.silos_detected:\r
            severity_counts[silo.severity.value] += 1\r
\r
        for severity in ["critical", "high", "medium", "low"]:\r
            count = severity_counts.get(severity, 0)\r
            if count > 0:\r
                report += f"- **{severity.title()}**: {count}\
"\r
\r
        report += "\
## Priority Actions\
\
"\r
        for i, action in enumerate(analysis.priority_actions, 1):\r
            report += f"{i}. {action}\
"\r
\r
        report += "\
## Detected Silos\
\
"\r
        for silo in analysis.silos_detected[:5]:\r
            report += f"""\r
### {silo.id}\r
- **Type:** {silo.issue_type}\r
- **Severity:** {silo.severity.value}\r
- **Impact:** {silo.impact}\r
- **Affected Users:** {silo.affected_users}\r
"""\r
\r
        report += "\
## Integration Roadmap\
"\r
        for phase, items in analysis.integration_roadmap.items():\r
            report += f"\
### {phase}\
"\r
            for item in items:\r
                report += f"- {item}\
"\r
\r
        return report\r
```\r
\r
## Common Use Cases\r
\r
### Detect Data Silos\r
\r
```python\r
detector = DataSiloDetector()\r
\r
# Define data sources\r
sources = [\r
    DataSource(\r
        id="revit",\r
        name="Revit Models",\r
        type=DataSourceType.DESKTOP_APP,\r
        domain=DataDomain.DESIGN,\r
        owner="Design Team",\r
        department="Engineering",\r
        users=["architect1", "engineer1", "engineer2"],\r
        data_entities=["building_model", "drawings", "schedules"],\r
        connections=["navisworks"],\r
        has_api=True\r
    ),\r
    DataSource(\r
        id="excel_estimates",\r
        name="Excel Cost Estimates",\r
        type=DataSourceType.SPREADSHEET,\r
        domain=DataDomain.COST,\r
        owner="Estimator",\r
        department="Pre-construction",\r
        users=["estimator1"],\r
        data_entities=["costs", "quantities", "labor_rates"],\r
        connections=[],  # No connections - silo!\r
        access_level="personal"\r
    ),\r
    DataSource(\r
        id="procore",\r
        name="Procore",\r
        type=DataSourceType.CLOUD_APP,\r
        domain=DataDomain.SITE,\r
        owner="Project Manager",\r
        department="Operations",\r
        users=["pm1", "pm2", "super1"],\r
        data_entities=["daily_reports", "photos", "punch_list"],\r
        connections=["primavera"],\r
        has_api=True\r
    )\r
]\r
\r
analysis = detector.detect_silos(\r
    organization="ABC Construction",\r
    data_sources=sources\r
)\r
\r
print(f"Silos detected: {len(analysis.silos_detected)}")\r
print(f"Connectivity score: {analysis.connectivity_score:.0%}")\r
```\r
\r
### Generate Silo Report\r
\r
```python\r
report = detector.generate_report(analysis)\r
print(report)\r
\r
# Save to file\r
with open("silo_report.md", "w") as f:\r
    f.write(report)\r
```\r
\r
### View Priority Actions\r
\r
```python\r
print("Priority Actions:")\r
for i, action in enumerate(analysis.priority_actions, 1):\r
    print(f"{i}. {action}")\r
\r
print("\
Integration Roadmap:")\r
for phase, items in analysis.integration_roadmap.items():\r
    print(f"\
{phase}:")\r
    for item in items:\r
        print(f"  - {item}")\r
```\r
\r
## Quick Reference\r
\r
| Component | Purpose |\r
|-----------|---------|\r
| `DataSiloDetector` | Main detection engine |\r
| `DataSource` | Data source definition |\r
| `DataSilo` | Detected silo with details |\r
| `DuplicateData` | Duplicate data detection |\r
| `SiloAnalysis` | Complete analysis results |\r
| `SiloSeverity` | Severity classification |\r
\r
## Resources\r
\r
- **Book**: "Data-Driven Construction" by Artem Boiko, Chapter 1.2\r
- **Website**: https://datadrivenconstruction.io\r
\r
## Next Steps\r
\r
- Use [erp-integration-analysis](../erp-integration-analysis/SKILL.md) for system integration\r
- Use [data-evolution-analysis](../../Chapter-1.1/data-evolution-analysis/SKILL.md) for maturity assessment\r
- Use [etl-pipeline](../../Chapter-4.2/etl-pipeline/SKILL.md) to connect silos\r
Usage Guidance
This skill appears internally consistent with its stated purpose. Before installing or running it: (1) verify the origin (homepage and author) if you care about provenance; (2) run it in a sandbox or with non-sensitive sample data first — the skill reads files, so avoid supplying documents that contain credentials or personal data; (3) confirm you are comfortable granting filesystem access (the skill needs to read user-supplied files); (4) note the win32 OS restriction in metadata — if you plan to run on macOS/Linux, validate compatibility; (5) if you want network-enabled analysis (e.g., querying APIs), ensure explicit instructions/consent are added, since the current materials do not show external calls.
Capability Analysis
Type: OpenClaw Skill Name: data-silo-detection Version: 2.1.0 The skill bundle is benign. The Python code in SKILL.md defines data structures and logic for detecting data silos, generating reports, and suggesting integration roadmaps using only standard libraries. The `filesystem` permission in claw.json is justified by the `generate_report` function writing a markdown report to a file. There are no indicators of data exfiltration, malicious execution, persistence, or prompt injection attempts in any of the provided files. All instructions and code align with the stated purpose of data silo detection in construction organizations.
Capability Assessment
Purpose & Capability
Name/description (data silo detection) align with the content: SKILL.md contains Python classes and methods to analyze user-provided data sources. The claw.json permission for filesystem access is consistent with reading user files. Minor oddity: the skill is restricted to win32 in metadata despite Python code being cross-platform; this is likely a packaging/metadata choice rather than a functional inconsistency.
Instruction Scope
instructions.md and SKILL.md focus on processing user-supplied project data (CSV/Excel/JSON/direct input), validating inputs, analyzing connectivity, duplicates, and producing reports. There are no instructions in the provided material to read unrelated system files, pull environment variables, or transmit data to external endpoints.
Install Mechanism
No install specification or code files to fetch — the skill is instruction-only and requires python3 to be present. This minimizes the risk of hidden third-party downloads or arbitrary code installation.
Credentials
The skill declares no required environment variables or credentials. The only declared permission in claw.json is filesystem access, which is proportionate for a tool that reads user-provided files. There are no requests for unrelated cloud or API credentials.
Persistence & Privilege
The skill is not set always:true and does not request special persistent privileges. It is user-invocable and can be invoked autonomously by the agent by default (normal), but it does not attempt to modify other skills or agent-wide configuration in the materials provided.
How to Use
  1. Make sure OpenClaw is installed (local or Docker)
  2. Run the install command in chat: /install data-silo-detection
  3. After installation, invoke the skill by name or use /data-silo-detection
  4. Provide required inputs per the skill's parameter spec and get structured output
Version History
v2.1.0
- Added detailed domain models and Enum classes for data sources, silos, duplication, and analysis in construction organizations. - Introduced organization-wide detection of data silos, duplicate data, and integration gaps with customizable severity and recommendations. - Implemented comprehensive APIs for detecting isolated data sources, domain silos, duplicate data, flow gaps, and generating integration roadmaps. - Enhanced connectivity scoring and prioritization of actions to improve data integration opportunities. - Documentation now references the DDC methodology and relevant literature for domain alignment.
v1.0.0
Initial release of Data Silo Detection skill. - Detects and maps data silos in construction organizations following the DDC methodology (Chapter 1.2). - Identifies disconnected data sources, duplicate data, and opportunities for integration. - Provides severity levels for silos and detailed analysis including impacted users/processes and recommendations. - Calculates organizational data connectivity score and highlights key data flow gaps. - Generates prioritized actions and a suggested integration roadmap.
Metadata
Slug data-silo-detection
Version 2.1.0
License
All-time Installs 0
Active Installs 0
Total Versions 2
Frequently Asked Questions

What is Data Silo Detection?

Detect and map data silos in construction organizations. Identify disconnected data sources and integration opportunities. It is an AI Agent Skill for Claude Code / OpenClaw, with 1024 downloads so far.

How do I install Data Silo Detection?

Run "/install data-silo-detection" in the OpenClaw or Claude Code chat to install it in one step — no extra setup required.

Is Data Silo Detection free?

Yes, Data Silo Detection is completely free (open-source). You can download, install and use it at no cost.

Which platforms does Data Silo Detection support?

Data Silo Detection is cross-platform and runs anywhere OpenClaw / Claude Code is available (win32).

Who created Data Silo Detection?

It is built and maintained by datadrivenconstruction (@datadrivenconstruction); the current version is v2.1.0.

💬 Comments