← 返回 Skills 市场
cgxxxxxxxxxxxx

DataHub Unified Data Layer

作者 cgxxxxxxxxxxxx · GitHub ↗ · v1.0.0 · MIT-0
cross-platform ✓ 安全检测通过
68
总下载
0
收藏
0
当前安装
1
版本数
在 OpenClaw 中安装
/install datahub-unified
功能描述
DataHub统一数据获取层 — 发布-订阅架构整合14个数据源,支持行情/财报/公告/宏观/研报/IR数据获取,智能缓存+优先级路由+模式匹配,提供quick_quote/quick_financial/quick_macro等便捷方法。触发词:获取数据、查行情、查财报、查宏观数据、DataHub、数据订阅、发...
使用说明 (SKILL.md)

DataHub 统一数据获取层

版本: v1.0 状态: 生产就绪 代码位置: \x3CYOUR_DATAHUB_PATH>(请配置您的DataHub安装路径)


When to Activate

  • 用户需要获取金融数据(行情、财报、公告、宏观、研报、IR资讯)
  • 用户需要多数据源对比验证
  • 用户提到发布-订阅、数据中枢、DataHub
  • 用户需要批量获取数据(多只股票、多个指标)
  • 用户需要自然语言查询金融数据
  • 用户需要构建数据Pipeline
  • 触发词:获取数据、查行情、查财报、宏观经济、DataHub、数据订阅、发布订阅、数据源

快速开始

方式一:便捷函数(推荐)

import asyncio
import sys
sys.path.insert(0, "\x3CYOUR_DATAHUB_PATH>")

from datahub.production_startup import quick_quote, quick_financial, quick_macro, quick_query

async def main():
    # 1. 获取A股行情
    quote = await quick_quote("600519", market='cn')
    print(f"贵州茅台: {quote}")

    # 2. 获取港股行情
    quote_hk = await quick_quote("00656", market='hk')
    print(f"港股: {quote_hk}")

    # 3. 获取美股行情
    quote_us = await quick_quote("AAPL", market='us')
    print(f"Apple: {quote_us}")

    # 4. 获取财报数据
    financial = await quick_financial("600519", report_type='balance')
    print(f"资产负债表: {financial}")

    # 5. 获取宏观数据
    gdp = await quick_macro("GDP", source='fred')
    print(f"GDP数据: {gdp}")

    # 6. 自然语言查询
    result = await quick_query("贵州茅台股价")
    print(f"查询结果: {result}")

asyncio.run(main())

方式二:完整初始化

import asyncio
import sys
sys.path.insert(0, "\x3CYOUR_DATAHUB_PATH>")

from datahub import get_datahub, register_all_producers

async def main():
    # 初始化DataHub
    datahub = get_datahub()

    # 注册所有Producer(14个数据源)
    producers = register_all_producers(datahub)
    print(f"已注册 {len(producers)} 个Producer")

    # 订阅数据
    received = asyncio.Event()
    result = {}

    async def callback(data):
        result['data'] = data
        result['status'] = 'success'
        received.set()

    # 订阅主题
    datahub.subscribe("westock:cn:quote:600519", callback)

    # 请求刷新
    await datahub.request_refresh("westock:cn:quote:600519")

    # 等待数据
    await asyncio.wait_for(received.wait(), timeout=5.0)
    print(result)

asyncio.run(main())

方式三:生产环境管理器

import asyncio
import sys
sys.path.insert(0, "\x3CYOUR_DATAHUB_PATH>")

from datahub.production_startup import get_production

async def main():
    # 获取生产环境实例(自动初始化)
    prod = await get_production()

    # 使用便捷方法
    quote = await prod.get_quote("600519")
    financial = await prod.get_financial("600519", report_type='balance')

    # 获取统计信息
    stats = prod.get_stats()
    print(f"统计: {stats}")

asyncio.run(main())

主题命名规范

格式

{producer}:{data_type}:{market}:{code}
{producer}:{data_type}:{identifier}

示例

A股数据

  • westock:cn:quote:600519 — 腾讯自选股A股行情
  • a-stock-financial:balance:600519 — A股财报资产负债表
  • a-stock-financial:income:600519 — A股财报利润表
  • a-stock-financial:cashflow:600519 — A股财报现金流量表
  • cninfo:announcement:600519 — 巨潮资讯公告
  • akshare:quote:600519 — AkShare行情
  • neodata:query:贵州茅台股价 — 自然语言查询

港股数据

  • westock:hk:quote:00656 — 港股行情
  • hkex:annual:00656 — 港交所年报
  • irasia:news:00656 — IR Asia新闻

美股数据

  • yfinance:quote:AAPL — Yahoo Finance行情
  • sec:10k:AAPL — SEC 10-K文件
  • seekingalpha:rating:AAPL — Seeking Alpha评级

宏观数据

  • fred:series:GDP — FRED GDP数据
  • worldbank:indicator:NY.GDP.MKTP.CD — 世界银行GDP
  • imf:dataflow:CPI — IMF CPI数据
  • dbnomics:series:* — DBnomics数据

本地数据

  • wiki:entity:贵州茅台_600519.SH — 本地Wiki实体
  • wiki:concept:DCF估值 — 本地Wiki概念
  • wiki:raw:某报告.pdf — 本地Wiki原始文件

数据源优先级

优先级 Producer 数据源 类型 API需求
1 LocalWikiProducer 本地Wiki 本地
8 CninfoProducer 巨潮资讯 官方
9 SECEdgarProducer SEC EDGAR 官方
9 HKEXProducer 港交所披露易 官方
10 EastMoneyProducer 东方财富 在线 MX_APIKEY
11 THSProducer 同花顺问财 在线 IWENCAI_API_KEY
12 NeoDataProducer NeoData 在线
13 WeStockProducer 腾讯自选股 在线
15 AkShareProducer AkShare 开源
15 AStockFinancialProducer A股财报 开源
16 YFinanceProducer YFinance 开源
20 SeekingAlphaProducer Seeking Alpha 第三方
30 FREDProducer FRED 官方 FRED_API_KEY
31 DBnomicsProducer DBnomics 官方
32 WorldBankProducer 世界银行 官方
33 IMFProducer IMF 官方 IMF_COOKIES
40 AlphaVantageProducer Alpha Vantage 第三方 ALPHAVANTAGE_API_KEY
50 IRWebsiteProducer IR网站 IR
50 IRAsiaProducer IR Asia IR

优先级规则:数字越小优先级越高。本地数据 > 官方数据 > 在线数据 > 开源数据 > 第三方数据


核心功能

1. 发布-订阅机制

from datahub import get_datahub, register_all_producers

datahub = get_datahub()
register_all_producers(datahub)

# 精确订阅
def callback(data):
    print(f"收到数据: {data}")

datahub.subscribe("westock:cn:quote:600519", callback)

# 模式订阅(通配符)
datahub.subscribe_pattern("westock:cn:quote:*", callback)

# 请求刷新
await datahub.request_refresh("westock:cn:quote:600519")

2. 缓存系统

from datahub import get_preconfigured_cache

# 预配置缓存类型
cache = get_preconfigured_cache('quote')  # 5分钟TTL

# 缓存操作
await cache.set("600519", {"price": 1800})
data = await cache.get("600519")
stats = cache.stats()  # {'size': 1, 'max_size': 500, 'ttl': 300}

预配置缓存

类型 TTL 最大条目 适用场景
quote 5分钟 500 实时行情
financial 1小时 200 财务报表
announcement 30分钟 300 公司公告
macro 1天 100 宏观数据
research 2小时 200 研报评级
ir 1小时 200 IR资讯

3. 重试机制

from datahub import with_retry

@with_retry(max_retries=3, base_delay=1.0, max_delay=60.0)
async def fetch_data():
    # 自动重试(指数退避:1秒、2秒、4秒)
    return await api_call()

4. 性能监控

stats = datahub.get_stats()
print(f"总发布数: {stats['total_publishes']}")
print(f"总错误数: {stats['total_errors']}")
print(f"主题数: {stats['topics_count']}")
print(f"Producer数: {stats['producers_count']}")

常用场景

场景1:批量获取行情

import asyncio
from datahub.production_startup import get_production

async def batch_quotes(codes: list) -> dict:
    """批量获取行情"""
    prod = await get_production()
    results = {}

    tasks = []
    for code in codes:
        tasks.append(prod.get_quote(code, market='cn'))

    quotes = await asyncio.gather(*tasks)

    for code, quote in zip(codes, quotes):
        if quote['status'] == 'success':
            results[code] = quote['data']

    return results

# 用法
async def main():
    quotes = await batch_quotes(["600519", "000858", "601318"])
    for code, data in quotes.items():
        print(f"{code}: {data}")

asyncio.run(main())

场景2:多数据源验证

import asyncio
from datahub import get_datahub, register_all_producers

async def cross_verify_quote(code: str) -> dict:
    """多数据源验证行情"""
    datahub = get_datahub()
    register_all_producers(datahub)

    sources = {
        'westock': f"westock:cn:quote:{code}",
        'akshare': f"akshare:quote:{code}",
    }

    results = {}

    for source_name, topic in sources.items():
        received = asyncio.Event()
        result = {}

        async def callback(data):
            result['data'] = data
            result['status'] = 'success'
            received.set()

        datahub.subscribe(topic, callback)
        await datahub.request_refresh(topic)

        try:
            await asyncio.wait_for(received.wait(), timeout=5.0)
            results[source_name] = result.get('data')
        except asyncio.TimeoutError:
            results[source_name] = None

    return results

场景3:投研工作流集成

import asyncio
from datahub.production_startup import get_production

async def stock_analysis(code: str) -> dict:
    """个股全面分析"""
    prod = await get_production()

    # 并发获取多维度数据
    quote_task = prod.get_quote(code, market='cn')
    balance_task = prod.get_financial(code, report_type='balance')
    income_task = prod.get_financial(code, report_type='income')

    quote, balance, income = await asyncio.gather(
        quote_task, balance_task, income_task
    )

    return {
        'code': code,
        'quote': quote.get('data') if quote['status'] == 'success' else None,
        'balance': balance.get('data') if balance['status'] == 'success' else None,
        'income': income.get('data') if income['status'] == 'success' else None,
    }

场景4:自然语言查询

from datahub.production_startup import quick_query

async def main():
    # 自然语言查询
    result = await quick_query("贵州茅台股价")
    print(result)

    result = await quick_query("沪深300市盈率")
    print(result)

    result = await quick_query("中国GDP增长率")
    print(result)

asyncio.run(main())

场景5:宏观数据追踪

from datahub.production_startup import quick_macro

async def macro_dashboard() -> dict:
    """宏观经济仪表盘"""
    indicators = {
        'GDP': ('GDP', 'fred'),
        'CPI': ('CPIAUCSL', 'fred'),
        'Unemployment': ('UNRATE', 'fred'),
    }

    results = {}
    tasks = []

    for name, (code, source) in indicators.items():
        tasks.append(quick_macro(code, source))

    data = await asyncio.gather(*tasks)

    for (name, _), result in zip(indicators.items(), data):
        results[name] = result

    return results

API密钥配置

环境变量

# 同花顺问财
IWENCAI_API_KEY=your_key_here

# FRED美联储
FRED_API_KEY=your_key_here

# Alpha Vantage
ALPHAVANTAGE_API_KEY=your_key_here

# 东方财富妙想
MX_APIKEY=your_key_here

# IMF Cookies
IMF_COOKIES_BM_SV=your_cookie_here
IMF_COOKIES_AUTH=your_cookie_here
IMF_COOKIES_BM_S=your_cookie_here

加载配置

from dotenv import load_dotenv
load_dotenv()

from datahub import init_datahub, register_all_producers

datahub = init_datahub()
producers = register_all_producers(datahub)

错误处理

错误类型

错误类型 原因 处理方式
timeout 网络超时 自动重试
not_found 主题无匹配Producer 返回错误
api_error API密钥失效/配额耗尽 记录日志
parse_error 数据格式异常 记录日志

错误处理示例

from datahub.production_startup import quick_quote

async def safe_get_quote(code: str) -> dict:
    """安全的行情获取"""
    try:
        result = await quick_quote(code, market='cn')

        if result['status'] == 'success':
            return result['data']
        elif result['status'] == 'timeout':
            print(f"获取 {code} 超时")
            return None
        else:
            print(f"获取 {code} 失败: {result.get('error')}")
            return None

    except Exception as e:
        print(f"异常: {e}")
        return None

性能指标

操作 耗时 说明
DataHub初始化 \x3C10ms 单例模式
Producer注册 \x3C50ms 14个Producer
订阅主题 \x3C1ms 模式匹配
缓存命中 \x3C10ms LRU缓存
首次数据获取 0.1-2s 网络请求
批量获取(10只) 0.1-0.5s 并发请求

故障排查

Q: Producer未注册

# 检查Producer注册状态
from datahub import get_datahub, register_all_producers

datahub = get_datahub()
producers = register_all_producers(datahub)
print(f"已注册Producer: {len(producers)}")

Q: 主题匹配失败

# 检查主题匹配
from datahub import get_datahub, register_all_producers

datahub = get_datahub()
register_all_producers(datahub)

topic = "westock:cn:quote:600519"
matched = datahub.pattern_index.match(topic)
print(f"匹配到的Producer: {len(matched)}")

for producer in matched:
    print(f"  - {producer.__class__.__name__}")

Q: API密钥未配置

# 检查API配置
from datahub.registry import api_registry

for name, config in api_registry.apis.items():
    status = "✅" if config.enabled else "❌"
    key_status = "已配置" if config.api_key else "未配置"
    print(f"{status} {name}: {key_status}")

相关文档

  • datahub/PRODUCTION_GUIDE.md — 生产环境使用指南
  • datahub/examples_research_workflow.py — 投研工作流示例
  • datahub/quick_test.py — 核心功能测试脚本

DataHub已生产就绪,可投入实际投研工作流使用!

安全使用建议
Treat this as an incomplete review rather than a clean approval; rerun the scan where metadata.json and artifact/ can be read before installing.
能力标签
requires-sensitive-credentials
能力评估
Purpose & Capability
Workspace inspection failed before SKILL.md or artifact contents could be reviewed, so purpose-to-capability coherence could not be confirmed from artifact evidence.
Instruction Scope
Instruction scope could not be assessed because artifact text was unavailable.
Install Mechanism
Install mechanism could not be assessed because metadata and artifact files were not readable through the available sandbox.
Credentials
Environment access and proportionality could not be confirmed without artifact contents.
Persistence & Privilege
No artifact-backed evidence of persistence or privilege behavior was available to review.
如何使用
  1. 确保已安装 OpenClaw(本地或 Docker 部署)
  2. 在对话框中输入安装命令:/install datahub-unified
  3. 安装完成后,直接呼叫该 Skill 的名称或使用 /datahub-unified 触发
  4. 根据 Skill 的参数说明提供必要输入,即可获得结构化输出
版本历史
v1.0.0
DataHub统一数据获取层 v1.0.0 - 首次发布,整合14个数据源,统一提供行情、财报、公告、宏观数据等获取能力。 - 实现多数据源发布-订阅架构,支持优先级路由、智能缓存与快速便捷方法(如quick_quote等)。 - 支持自然语言查询、多数据源对比验证与数据Pipeline场景。 - 内置灵活主题命名体系及详细缓存、重试与性能监控机制,适合批量与自动化投研场景。 - 覆盖A股、港股、美股及全球宏观数据源,易于扩展与本地部署。
元数据
Slug datahub-unified
版本 1.0.0
许可证 MIT-0
累计安装 0
当前安装数 0
历史版本数 1
常见问题

DataHub Unified Data Layer 是什么?

DataHub统一数据获取层 — 发布-订阅架构整合14个数据源,支持行情/财报/公告/宏观/研报/IR数据获取,智能缓存+优先级路由+模式匹配,提供quick_quote/quick_financial/quick_macro等便捷方法。触发词:获取数据、查行情、查财报、查宏观数据、DataHub、数据订阅、发... 它是一个面向 Claude Code / OpenClaw 的 AI Agent Skill 插件,目前累计下载 68 次。

如何安装 DataHub Unified Data Layer?

在 OpenClaw 或 Claude Code 对话框中运行命令「/install datahub-unified」即可一键安装,无需额外配置。

DataHub Unified Data Layer 是免费的吗?

是的,DataHub Unified Data Layer 完全免费,采用 MIT-0 许可证,可自由下载、安装和使用。

DataHub Unified Data Layer 支持哪些平台?

DataHub Unified Data Layer 跨平台运行,可在任意部署了 OpenClaw / Claude Code 的环境中使用(cross-platform)。

谁开发了 DataHub Unified Data Layer?

由 cgxxxxxxxxxxxx(@cgxxxxxxxxxxxx)开发并维护,当前版本 v1.0.0。

💬 留言讨论