← 返回 Skills 市场
DataHub Unified Data Layer
作者
cgxxxxxxxxxxxx
· GitHub ↗
· v1.0.0
· MIT-0
68
总下载
0
收藏
0
当前安装
1
版本数
在 OpenClaw 中安装
/install datahub-unified
功能描述
DataHub统一数据获取层 — 发布-订阅架构整合14个数据源,支持行情/财报/公告/宏观/研报/IR数据获取,智能缓存+优先级路由+模式匹配,提供quick_quote/quick_financial/quick_macro等便捷方法。触发词:获取数据、查行情、查财报、查宏观数据、DataHub、数据订阅、发...
使用说明 (SKILL.md)
DataHub 统一数据获取层
版本: v1.0
状态: 生产就绪
代码位置: \x3CYOUR_DATAHUB_PATH>(请配置您的DataHub安装路径)
When to Activate
- 用户需要获取金融数据(行情、财报、公告、宏观、研报、IR资讯)
- 用户需要多数据源对比验证
- 用户提到发布-订阅、数据中枢、DataHub
- 用户需要批量获取数据(多只股票、多个指标)
- 用户需要自然语言查询金融数据
- 用户需要构建数据Pipeline
- 触发词:获取数据、查行情、查财报、宏观经济、DataHub、数据订阅、发布订阅、数据源
快速开始
方式一:便捷函数(推荐)
import asyncio
import sys
sys.path.insert(0, "\x3CYOUR_DATAHUB_PATH>")
from datahub.production_startup import quick_quote, quick_financial, quick_macro, quick_query
async def main():
# 1. 获取A股行情
quote = await quick_quote("600519", market='cn')
print(f"贵州茅台: {quote}")
# 2. 获取港股行情
quote_hk = await quick_quote("00656", market='hk')
print(f"港股: {quote_hk}")
# 3. 获取美股行情
quote_us = await quick_quote("AAPL", market='us')
print(f"Apple: {quote_us}")
# 4. 获取财报数据
financial = await quick_financial("600519", report_type='balance')
print(f"资产负债表: {financial}")
# 5. 获取宏观数据
gdp = await quick_macro("GDP", source='fred')
print(f"GDP数据: {gdp}")
# 6. 自然语言查询
result = await quick_query("贵州茅台股价")
print(f"查询结果: {result}")
asyncio.run(main())
方式二:完整初始化
import asyncio
import sys
sys.path.insert(0, "\x3CYOUR_DATAHUB_PATH>")
from datahub import get_datahub, register_all_producers
async def main():
# 初始化DataHub
datahub = get_datahub()
# 注册所有Producer(14个数据源)
producers = register_all_producers(datahub)
print(f"已注册 {len(producers)} 个Producer")
# 订阅数据
received = asyncio.Event()
result = {}
async def callback(data):
result['data'] = data
result['status'] = 'success'
received.set()
# 订阅主题
datahub.subscribe("westock:cn:quote:600519", callback)
# 请求刷新
await datahub.request_refresh("westock:cn:quote:600519")
# 等待数据
await asyncio.wait_for(received.wait(), timeout=5.0)
print(result)
asyncio.run(main())
方式三:生产环境管理器
import asyncio
import sys
sys.path.insert(0, "\x3CYOUR_DATAHUB_PATH>")
from datahub.production_startup import get_production
async def main():
# 获取生产环境实例(自动初始化)
prod = await get_production()
# 使用便捷方法
quote = await prod.get_quote("600519")
financial = await prod.get_financial("600519", report_type='balance')
# 获取统计信息
stats = prod.get_stats()
print(f"统计: {stats}")
asyncio.run(main())
主题命名规范
格式
{producer}:{data_type}:{market}:{code}
{producer}:{data_type}:{identifier}
示例
A股数据:
westock:cn:quote:600519— 腾讯自选股A股行情a-stock-financial:balance:600519— A股财报资产负债表a-stock-financial:income:600519— A股财报利润表a-stock-financial:cashflow:600519— A股财报现金流量表cninfo:announcement:600519— 巨潮资讯公告akshare:quote:600519— AkShare行情neodata:query:贵州茅台股价— 自然语言查询
港股数据:
westock:hk:quote:00656— 港股行情hkex:annual:00656— 港交所年报irasia:news:00656— IR Asia新闻
美股数据:
yfinance:quote:AAPL— Yahoo Finance行情sec:10k:AAPL— SEC 10-K文件seekingalpha:rating:AAPL— Seeking Alpha评级
宏观数据:
fred:series:GDP— FRED GDP数据worldbank:indicator:NY.GDP.MKTP.CD— 世界银行GDPimf:dataflow:CPI— IMF CPI数据dbnomics:series:*— DBnomics数据
本地数据:
wiki:entity:贵州茅台_600519.SH— 本地Wiki实体wiki:concept:DCF估值— 本地Wiki概念wiki:raw:某报告.pdf— 本地Wiki原始文件
数据源优先级
| 优先级 | Producer | 数据源 | 类型 | API需求 |
|---|---|---|---|---|
| 1 | LocalWikiProducer | 本地Wiki | 本地 | 无 |
| 8 | CninfoProducer | 巨潮资讯 | 官方 | 无 |
| 9 | SECEdgarProducer | SEC EDGAR | 官方 | 无 |
| 9 | HKEXProducer | 港交所披露易 | 官方 | 无 |
| 10 | EastMoneyProducer | 东方财富 | 在线 | MX_APIKEY |
| 11 | THSProducer | 同花顺问财 | 在线 | IWENCAI_API_KEY |
| 12 | NeoDataProducer | NeoData | 在线 | 无 |
| 13 | WeStockProducer | 腾讯自选股 | 在线 | 无 |
| 15 | AkShareProducer | AkShare | 开源 | 无 |
| 15 | AStockFinancialProducer | A股财报 | 开源 | 无 |
| 16 | YFinanceProducer | YFinance | 开源 | 无 |
| 20 | SeekingAlphaProducer | Seeking Alpha | 第三方 | 无 |
| 30 | FREDProducer | FRED | 官方 | FRED_API_KEY |
| 31 | DBnomicsProducer | DBnomics | 官方 | 无 |
| 32 | WorldBankProducer | 世界银行 | 官方 | 无 |
| 33 | IMFProducer | IMF | 官方 | IMF_COOKIES |
| 40 | AlphaVantageProducer | Alpha Vantage | 第三方 | ALPHAVANTAGE_API_KEY |
| 50 | IRWebsiteProducer | IR网站 | IR | 无 |
| 50 | IRAsiaProducer | IR Asia | IR | 无 |
优先级规则:数字越小优先级越高。本地数据 > 官方数据 > 在线数据 > 开源数据 > 第三方数据
核心功能
1. 发布-订阅机制
from datahub import get_datahub, register_all_producers
datahub = get_datahub()
register_all_producers(datahub)
# 精确订阅
def callback(data):
print(f"收到数据: {data}")
datahub.subscribe("westock:cn:quote:600519", callback)
# 模式订阅(通配符)
datahub.subscribe_pattern("westock:cn:quote:*", callback)
# 请求刷新
await datahub.request_refresh("westock:cn:quote:600519")
2. 缓存系统
from datahub import get_preconfigured_cache
# 预配置缓存类型
cache = get_preconfigured_cache('quote') # 5分钟TTL
# 缓存操作
await cache.set("600519", {"price": 1800})
data = await cache.get("600519")
stats = cache.stats() # {'size': 1, 'max_size': 500, 'ttl': 300}
预配置缓存:
| 类型 | TTL | 最大条目 | 适用场景 |
|---|---|---|---|
quote |
5分钟 | 500 | 实时行情 |
financial |
1小时 | 200 | 财务报表 |
announcement |
30分钟 | 300 | 公司公告 |
macro |
1天 | 100 | 宏观数据 |
research |
2小时 | 200 | 研报评级 |
ir |
1小时 | 200 | IR资讯 |
3. 重试机制
from datahub import with_retry
@with_retry(max_retries=3, base_delay=1.0, max_delay=60.0)
async def fetch_data():
# 自动重试(指数退避:1秒、2秒、4秒)
return await api_call()
4. 性能监控
stats = datahub.get_stats()
print(f"总发布数: {stats['total_publishes']}")
print(f"总错误数: {stats['total_errors']}")
print(f"主题数: {stats['topics_count']}")
print(f"Producer数: {stats['producers_count']}")
常用场景
场景1:批量获取行情
import asyncio
from datahub.production_startup import get_production
async def batch_quotes(codes: list) -> dict:
"""批量获取行情"""
prod = await get_production()
results = {}
tasks = []
for code in codes:
tasks.append(prod.get_quote(code, market='cn'))
quotes = await asyncio.gather(*tasks)
for code, quote in zip(codes, quotes):
if quote['status'] == 'success':
results[code] = quote['data']
return results
# 用法
async def main():
quotes = await batch_quotes(["600519", "000858", "601318"])
for code, data in quotes.items():
print(f"{code}: {data}")
asyncio.run(main())
场景2:多数据源验证
import asyncio
from datahub import get_datahub, register_all_producers
async def cross_verify_quote(code: str) -> dict:
"""多数据源验证行情"""
datahub = get_datahub()
register_all_producers(datahub)
sources = {
'westock': f"westock:cn:quote:{code}",
'akshare': f"akshare:quote:{code}",
}
results = {}
for source_name, topic in sources.items():
received = asyncio.Event()
result = {}
async def callback(data):
result['data'] = data
result['status'] = 'success'
received.set()
datahub.subscribe(topic, callback)
await datahub.request_refresh(topic)
try:
await asyncio.wait_for(received.wait(), timeout=5.0)
results[source_name] = result.get('data')
except asyncio.TimeoutError:
results[source_name] = None
return results
场景3:投研工作流集成
import asyncio
from datahub.production_startup import get_production
async def stock_analysis(code: str) -> dict:
"""个股全面分析"""
prod = await get_production()
# 并发获取多维度数据
quote_task = prod.get_quote(code, market='cn')
balance_task = prod.get_financial(code, report_type='balance')
income_task = prod.get_financial(code, report_type='income')
quote, balance, income = await asyncio.gather(
quote_task, balance_task, income_task
)
return {
'code': code,
'quote': quote.get('data') if quote['status'] == 'success' else None,
'balance': balance.get('data') if balance['status'] == 'success' else None,
'income': income.get('data') if income['status'] == 'success' else None,
}
场景4:自然语言查询
from datahub.production_startup import quick_query
async def main():
# 自然语言查询
result = await quick_query("贵州茅台股价")
print(result)
result = await quick_query("沪深300市盈率")
print(result)
result = await quick_query("中国GDP增长率")
print(result)
asyncio.run(main())
场景5:宏观数据追踪
from datahub.production_startup import quick_macro
async def macro_dashboard() -> dict:
"""宏观经济仪表盘"""
indicators = {
'GDP': ('GDP', 'fred'),
'CPI': ('CPIAUCSL', 'fred'),
'Unemployment': ('UNRATE', 'fred'),
}
results = {}
tasks = []
for name, (code, source) in indicators.items():
tasks.append(quick_macro(code, source))
data = await asyncio.gather(*tasks)
for (name, _), result in zip(indicators.items(), data):
results[name] = result
return results
API密钥配置
环境变量
# 同花顺问财
IWENCAI_API_KEY=your_key_here
# FRED美联储
FRED_API_KEY=your_key_here
# Alpha Vantage
ALPHAVANTAGE_API_KEY=your_key_here
# 东方财富妙想
MX_APIKEY=your_key_here
# IMF Cookies
IMF_COOKIES_BM_SV=your_cookie_here
IMF_COOKIES_AUTH=your_cookie_here
IMF_COOKIES_BM_S=your_cookie_here
加载配置
from dotenv import load_dotenv
load_dotenv()
from datahub import init_datahub, register_all_producers
datahub = init_datahub()
producers = register_all_producers(datahub)
错误处理
错误类型
| 错误类型 | 原因 | 处理方式 |
|---|---|---|
timeout |
网络超时 | 自动重试 |
not_found |
主题无匹配Producer | 返回错误 |
api_error |
API密钥失效/配额耗尽 | 记录日志 |
parse_error |
数据格式异常 | 记录日志 |
错误处理示例
from datahub.production_startup import quick_quote
async def safe_get_quote(code: str) -> dict:
"""安全的行情获取"""
try:
result = await quick_quote(code, market='cn')
if result['status'] == 'success':
return result['data']
elif result['status'] == 'timeout':
print(f"获取 {code} 超时")
return None
else:
print(f"获取 {code} 失败: {result.get('error')}")
return None
except Exception as e:
print(f"异常: {e}")
return None
性能指标
| 操作 | 耗时 | 说明 |
|---|---|---|
| DataHub初始化 | \x3C10ms | 单例模式 |
| Producer注册 | \x3C50ms | 14个Producer |
| 订阅主题 | \x3C1ms | 模式匹配 |
| 缓存命中 | \x3C10ms | LRU缓存 |
| 首次数据获取 | 0.1-2s | 网络请求 |
| 批量获取(10只) | 0.1-0.5s | 并发请求 |
故障排查
Q: Producer未注册
# 检查Producer注册状态
from datahub import get_datahub, register_all_producers
datahub = get_datahub()
producers = register_all_producers(datahub)
print(f"已注册Producer: {len(producers)}")
Q: 主题匹配失败
# 检查主题匹配
from datahub import get_datahub, register_all_producers
datahub = get_datahub()
register_all_producers(datahub)
topic = "westock:cn:quote:600519"
matched = datahub.pattern_index.match(topic)
print(f"匹配到的Producer: {len(matched)}")
for producer in matched:
print(f" - {producer.__class__.__name__}")
Q: API密钥未配置
# 检查API配置
from datahub.registry import api_registry
for name, config in api_registry.apis.items():
status = "✅" if config.enabled else "❌"
key_status = "已配置" if config.api_key else "未配置"
print(f"{status} {name}: {key_status}")
相关文档
datahub/PRODUCTION_GUIDE.md— 生产环境使用指南datahub/examples_research_workflow.py— 投研工作流示例datahub/quick_test.py— 核心功能测试脚本
DataHub已生产就绪,可投入实际投研工作流使用!
安全使用建议
Treat this as an incomplete review rather than a clean approval; rerun the scan where metadata.json and artifact/ can be read before installing.
能力标签
能力评估
Purpose & Capability
Workspace inspection failed before SKILL.md or artifact contents could be reviewed, so purpose-to-capability coherence could not be confirmed from artifact evidence.
Instruction Scope
Instruction scope could not be assessed because artifact text was unavailable.
Install Mechanism
Install mechanism could not be assessed because metadata and artifact files were not readable through the available sandbox.
Credentials
Environment access and proportionality could not be confirmed without artifact contents.
Persistence & Privilege
No artifact-backed evidence of persistence or privilege behavior was available to review.
如何使用
- 确保已安装 OpenClaw(本地或 Docker 部署)
- 在对话框中输入安装命令:
/install datahub-unified - 安装完成后,直接呼叫该 Skill 的名称或使用
/datahub-unified触发 - 根据 Skill 的参数说明提供必要输入,即可获得结构化输出
版本历史
v1.0.0
DataHub统一数据获取层 v1.0.0
- 首次发布,整合14个数据源,统一提供行情、财报、公告、宏观数据等获取能力。
- 实现多数据源发布-订阅架构,支持优先级路由、智能缓存与快速便捷方法(如quick_quote等)。
- 支持自然语言查询、多数据源对比验证与数据Pipeline场景。
- 内置灵活主题命名体系及详细缓存、重试与性能监控机制,适合批量与自动化投研场景。
- 覆盖A股、港股、美股及全球宏观数据源,易于扩展与本地部署。
元数据
常见问题
DataHub Unified Data Layer 是什么?
DataHub统一数据获取层 — 发布-订阅架构整合14个数据源,支持行情/财报/公告/宏观/研报/IR数据获取,智能缓存+优先级路由+模式匹配,提供quick_quote/quick_financial/quick_macro等便捷方法。触发词:获取数据、查行情、查财报、查宏观数据、DataHub、数据订阅、发... 它是一个面向 Claude Code / OpenClaw 的 AI Agent Skill 插件,目前累计下载 68 次。
如何安装 DataHub Unified Data Layer?
在 OpenClaw 或 Claude Code 对话框中运行命令「/install datahub-unified」即可一键安装,无需额外配置。
DataHub Unified Data Layer 是免费的吗?
是的,DataHub Unified Data Layer 完全免费,采用 MIT-0 许可证,可自由下载、安装和使用。
DataHub Unified Data Layer 支持哪些平台?
DataHub Unified Data Layer 跨平台运行,可在任意部署了 OpenClaw / Claude Code 的环境中使用(cross-platform)。
谁开发了 DataHub Unified Data Layer?
由 cgxxxxxxxxxxxx(@cgxxxxxxxxxxxx)开发并维护,当前版本 v1.0.0。
推荐 Skills