← 返回 Skills 市场
ugpoor

DolphinDB 流式计算技能

作者 superStupidBear · GitHub ↗ · v1.4.0 · MIT-0
cross-platform ⚠ suspicious
171
总下载
0
收藏
1
当前安装
6
版本数
在 OpenClaw 中安装
/install dolphindb-streaming
功能描述
提供基于DolphinDB的金融场景流式计算能力,支持实时行情、因子计算、风控及订单簿等实时数据流处理。
使用说明 (SKILL.md)

DolphinDB 流式计算技能 v1.1.3

🚨 强制流程:使用前必须加载环境

无论在何种场景下调用此技能(单独运行或被引用),必须先执行环境检测:

# 方法 1: 在技能目录内运行(推荐)
cd ~/.jvs/.openclaw/workspace/skills/\x3Cskill-name>
source ../dolphindb-skills/scripts/dolphin_wrapper.sh

# 方法 2: 在任何位置运行(推荐)
source ~/.jvs/.openclaw/workspace/skills/dolphindb-skills/scripts/dolphin_global.sh

# 方法 3: 手动检测
python3 ~/.jvs/.openclaw/workspace/skills/dolphindb-skills/scripts/init_dolphindb_env.py

验证环境:

$DOLPHINDB_PYTHON_BIN -c "import dolphindb; print(dolphindb.__version__)"

# 或使用包装器命令
dolphin_python -c "import dolphindb; print(dolphindb.__version__)"

重要: 详见 dolphindb-skills/USAGE_GUIDE.md


⚠️ 前置依赖

本技能依赖 dolphindb-basic 技能,请先安装:

clawhub install dolphindb-basic

运行前初始化需确保 Python 环境有 DolphinDB SDK,详细方法可参见 dolphindb-skills 技能。

# 加载环境检测器(相对路径,技能安装后自动可用)
source ../dolphindb-skills/scripts/load_dolphindb_env.sh

# 查看环境信息
dolphin_env_info

# 验证 SDK 已安装
dolphin_python -c "import dolphindb; print('SDK 版本:', dolphindb.__version__)"

统一调用接口:

dolphin_python script.py    # 运行 Python 脚本
dolphin_pip install pkg     # 安装包

重要:所有 DolphinDB 脚本在 Python 中的调用方式

import dolphindb as ddb

# 1. 建立连接
s = ddb.session()
s.connect(host="localhost", port=8848, userid="admin", password="123456")

# 2. 执行 DolphinDB 脚本(所有数据库操作都通过 s.run())
s.run('''
    // DolphinDB 脚本
    share streamTable(10000:0, `time`sym`price`volume, 
        [TIMESTAMP,SYMBOL,DOUBLE,LONG]) as tickStream
''')

# 3. 关闭连接
s.close()

描述

提供 DolphinDB 流数据计算能力,专注于实时行情处理、实时因子计算、实时风控等金融场景的流式计算解决方案。

触发条件

当用户提到以下关键词时触发此技能:

  • "实时计算"、"流式计算"、"streaming"
  • "实时行情"、"tick 数据"、"逐笔数据"
  • "实时因子"、"实时指标"
  • "流数据表"、"streamTable"
  • "流计算引擎"、"reactor"
  • "实时风控"、"实时监控"
  • "消息订阅"、"subscribeTable"

能力范围

1. 流数据表管理

  • 创建流数据表
  • 流表持久化
  • 流表数据发布/订阅

2. 流计算引擎

  • 响应式状态引擎
  • 聚合引擎
  • 订单簿引擎
  • OHLC 引擎
  • 自定义流计算引擎

3. 实时因子计算

  • 分钟频实时因子
  • 高频实时因子
  • Level-2 实时指标
  • 资金流实时计算

4. 实时行情处理

  • 实时 K 线合成
  • 实时订单簿合成
  • 实时涨跌停监控
  • 实时涨幅榜计算

5. 实时风控

  • 实时持仓监控
  • 实时盈亏计算
  • 实时风险指标
  • 异常交易检测

使用示例

1. 创建流数据表

import dolphindb as ddb

s = ddb.session()
s.connect(host="localhost", port=8848, userid="admin", password="123456")

# 创建共享流数据表
s.run('''
    share streamTable(10000:0, `time`sym`price`volume`bsFlag, 
        [TIMESTAMP,SYMBOL,DOUBLE,LONG,CHAR]) as tickStream
''')

# 创建持久化流表(带数据持久化)
s.run('''
    streamTable(
        windowSize=1000000,
        schema=table(1:0, `time`sym`price`volume, [TIMESTAMP,SYMBOL,DOUBLE,LONG]),
        persistenceDir="/data/stream/persistence",
        name=`persistentTickStream
    )
''')

s.close()

2. 响应式状态引擎

import dolphindb as ddb

s = ddb.session()
s.connect(host="localhost", port=8848, userid="admin", password="123456")

# 定义状态更新函数并创建引擎
s.run('''
    def updateState(state, newData){
        // 更新最新价格
        state[`lastPrice] = newData.price
        // 更新累计成交量
        state[`totalVolume] = coalesce(state[`totalVolume], 0) + newData.volume
        // 更新最高最低价
        state[`highPrice] = max(coalesce(state[`highPrice], 0), newData.price)
        state[`lowPrice] = min(coalesce(state[`lowPrice], DOUBLE_MAX), newData.price)
        return state
    }
    
    engine = createReactiveStateEngine(
        name=`stateEngine,
        streamTableNames=`tickStream,
        handler=updateState,
        keyColumn=`sym,
        initState=dict(`lastPrice`totalVolume`highPrice`lowPrice, 
                       [0.0,0L,0.0,DOUBLE_MAX])
    )
''')

s.close()

3. 实时 K 线合成引擎

import dolphindb as ddb

s = ddb.session()
s.connect(host="localhost", port=8848, userid="admin", password="123456")

# 创建 OHLC 流计算引擎
s.run('''
    ohlcEngine = createOHLEngine(
        name=`minuteOHLC,
        streamTableNames=`tickStream,
        timeColumn=`time,
        groupingColumn=`sym,
        freq=60000,  // 1 分钟 K 线
        metrics=`open`high`low`close`volume`turnover,
        outputTable=share(streamTable(10000:0, 
            `barTime`sym`open`high`low`close`volume`turnover,
            [TIMESTAMP,SYMBOL,DOUBLE,DOUBLE,DOUBLE,DOUBLE,LONG,DOUBLE]), `ohlcOutput)
    )
''')

# 订阅 K 线输出
s.run('''
    def processOHLC(data){
        // 处理 K 线数据(如发送到前端、存储等)
        print("New OHLC bar: " + data)
    }
    
    subscribeTable(
        tableName=`ohlcOutput,
        actionName=`processOHLC,
        offset=0,
        handler=processOHLC
    )
''')

s.close()

4. 实时订单簿引擎

import dolphindb as ddb

s = ddb.session()
s.connect(host="localhost", port=8848, userid="admin", password="123456")

# 创建订单簿引擎(基于逐笔数据合成)
s.run('''
    obEngine = createOrderBookEngine(
        name=`orderBookEngine,
        streamTableNames=`tickStream,
        timeColumn=`time,
        symColumn=`sym,
        priceColumn=`price,
        volumeColumn=`volume,
        bsFlagColumn=`bsFlag,  // 'B'=买,'S'=卖
        level=10,  // 10 档行情
        outputTable=share(streamTable(10000:0,
            `time`sym`bid1`bidSz1`ask1`askSz1`bid2`bidSz2`ask2`askSz2,
            [TIMESTAMP,SYMBOL,DOUBLE,LONG,DOUBLE,LONG,DOUBLE,LONG,DOUBLE,LONG]), `obOutput)
    )
''')

# 订阅订单簿更新
s.run('''
    def processOrderBook(ob){
        // 计算买卖压力指标
        bidPressure = sum(ob[`bidSz1], ob[`bidSz2]) 
        askPressure = sum(ob[`askSz1], ob[`askSz2])
        imbalance = (bidPressure - askPressure) / (bidPressure + askPressure)
        
        if abs(imbalance) > 0.5 {
            print("Large imbalance detected for " + ob.sym + ": " + imbalance)
        }
    }
    
    subscribeTable(
        tableName=`obOutput,
        actionName=`processOB,
        offset=0,
        handler=processOrderBook
    )
''')

s.close()

5. 实时因子计算引擎

import dolphindb as ddb

s = ddb.session()
s.connect(host="localhost", port=8848, userid="admin", password="123456")

# 定义实时因子计算函数并创建引擎
s.run('''
    def calcRealtimeFactors(tickData){
        return select 
            sym,
            // VWAP(成交量加权均价)
            wap(price, volume) as vwap,
            // 价格动量
            price / last(price, 10) - 1 as momentum,
            // 成交量比率
            sum(volume) / mavg(sum(volume), 20) as volumeRatio,
            // 价格波动
            std(price, 60) / last(price) as volatility,
            // 买卖压力
            sum(iif(bsFlag=='B', volume, 0)) / sum(volume) as buyRatio
        from tickData
        group by sym, time_bar(60000, time) as minute
    }
    
    factorEngine = createStreamEngine(
        name=`realtimeFactorEngine,
        handler=calcRealtimeFactors,
        streamTableNames=`tickStream,
        windowSize=1000,
        timeColumn=`time,
        groupingColumn=`sym,
        outputTable=share(streamTable(10000:0,
            `minute`sym`vwap`momentum`volumeRatio`volatility`buyRatio,
            [TIMESTAMP,SYMBOL,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE]), `factorOutput)
    )
''')

s.close()

6. 实时资金流计算

import dolphindb as ddb

s = ddb.session()
s.connect(host="localhost", port=8848, userid="admin", password="123456")

# 实时计算分钟资金流
s.run('''
    def calcCapitalFlow(tickData){
        // 定义主买主卖判断
        update tickData set 
            isBuy = iif(
                bsFlag=='B' or 
                (price > prev(price, 1) and volume > prev(volume, 1)), 
                1, 0
            ),
            isSell = iif(
                bsFlag=='S' or 
                (price \x3C prev(price, 1) and volume > prev(volume, 1)), 
                1, 0
            )
        
        // 计算资金流
        return select 
            sym,
            sum(iif(isBuy, price*volume, 0)) as buyFlow,
            sum(iif(isSell, price*volume, 0)) as sellFlow,
            sum(price*volume) as totalFlow,
            (sum(iif(isBuy, price*volume, 0)) - sum(iif(isSell, price*volume, 0))) 
                / sum(price*volume) as netFlowRatio
        from tickData
        group by sym, time_bar(60000, time) as minute
    }
    
    capitalFlowEngine = createStreamEngine(
        name=`capitalFlowEngine,
        handler=calcCapitalFlow,
        streamTableNames=`tickStream,
        windowSize=1000,
        timeColumn=`time,
        groupingColumn=`sym
    )
''')

s.close()

7. 实时涨幅榜计算

import dolphindb as ddb

s = ddb.session()
s.connect(host="localhost", port=8848, userid="admin", password="123456")

# 实时计算涨幅榜
s.run('''
    def calcPriceChangeRank(tickData){
        // 获取昨日收盘价(从维度表或缓存中)
        prevClose = loadPrevClose()
        
        return select 
            top 50 sym,
            last(price) as lastPrice,
            last(price) / prevClose[sym] - 1 as changePct,
            sum(volume) as volume,
            sum(price*volume) as turnover
        from tickData
        where time >= today() + 09:30:00m
        group by sym
        order by changePct desc
        limit 50
    }
    
    rankEngine = createStreamEngine(
        name=`priceChangeRankEngine,
        handler=calcPriceChangeRank,
        streamTableNames=`tickStream,
        windowSize=10000,
        timeColumn=`time,
        triggeringPattern='interval',
        triggeringInterval=5000  // 每 5 秒更新一次
    )
''')

s.close()

8. 实时风控监控

import dolphindb as ddb

s = ddb.session()
s.connect(host="localhost", port=8848, userid="admin", password="123456")

# 实时持仓监控
s.run('''
    def realtimeRiskMonitor(positionData, marketData){
        // 计算实时盈亏
        update positionData set 
            marketValue = position * marketData.price,
            pnl = (marketData.price - avgCost) * position,
            pnlRatio = (marketData.price - avgCost) / avgCost
        
        // 风险检查
        riskAlerts = select 
            sym,
            pnl,
            pnlRatio,
            marketValue,
            iif(abs(pnlRatio) > 0.05, "WARNING", "OK") as status
        from positionData
        where abs(pnlRatio) > 0.02  // 盈亏超过 2% 触发检查
        
        if size(riskAlerts) > 0 {
            sendAlert(riskAlerts)  // 发送风控警报
        }
        
        return positionData
    }
    
    riskEngine = createStreamEngine(
        name=`riskMonitorEngine,
        handler=realtimeRiskMonitor,
        streamTableNames=[`positionStream, `marketStream],
        windowSize=1000,
        timeColumn=`time
    )
''')

s.close()

9. 多引擎协同工作

import dolphindb as ddb

s = ddb.session()
s.connect(host="localhost", port=8848, userid="admin", password="123456")

# 构建完整的实时计算流水线
s.run('''
    // 1. 原始 tick 数据流
    share streamTable(100000:0, `time`sym`price`volume`bsFlag, 
        [TIMESTAMP,SYMBOL,DOUBLE,LONG,CHAR]) as tickStream
    
    // 2. OHLC 引擎
    ohlcEngine = createOHLEngine(
        name=`ohlcEngine,
        streamTableNames=`tickStream,
        freq=60000,
        metrics=`open`high`low`close`volume
    )
    
    // 3. 因子计算引擎(基于 OHLC)
    factorEngine = createStreamEngine(
        name=`factorEngine,
        handler=calcFactors,
        streamTableNames=`ohlcOutput,
        windowSize=100
    )
    
    // 4. 信号生成引擎
    signalEngine = createStreamEngine(
        name=`signalEngine,
        handler=generateSignals,
        streamTableNames=`factorOutput,
        windowSize=10
    )
    
    // 5. 执行引擎
    execEngine = createStreamEngine(
        name=`execEngine,
        handler=executeOrders,
        streamTableNames=`signalOutput,
        windowSize=10
    )
''')

s.close()

流计算引擎类型

1. 响应式状态引擎(ReactiveStateEngine)

  • 维护每个 key 的状态
  • 适用于累计计算、最新值跟踪

2. 聚合引擎(AggregationEngine)

  • 时间窗口聚合
  • 适用于 K 线合成、统计指标

3. 订单簿引擎(OrderBookEngine)

  • 基于逐笔合成订单簿
  • 适用于 Level-2 行情处理

4. OHLC 引擎

  • 专门用于 K 线合成
  • 高性能、低延迟

5. 自定义流引擎(StreamEngine)

  • 自定义处理逻辑
  • 最灵活

性能优化

窗口大小

  • 根据数据频率和内存调整
  • tick 数据:1000-10000
  • 分钟数据:100-1000

触发模式

  • row: 每行触发(最低延迟)
  • batch: 批量触发(更高吞吐)
  • interval: 定时触发(可控频率)

数据持久化

  • 启用 persistenceDir 防止数据丢失
  • 配置适当的 windowSize 避免内存溢出

参考文档


相关技能

  • dolphindb-skills: 技能套件索引(含环境检测)
  • dolphindb-basic: DolphinDB 基础 CRUD 操作
  • dolphindb-quant-finance: 量化金融场景
  • dolphindb-docker: Docker 容器化部署
安全使用建议
This skill appears to be a DolphinDB streaming helper, but before installing or sourcing anything you should: (1) inspect the scripts it tells you to source (~/.jvs/.openclaw/workspace/skills/dolphindb-skills/scripts/* and ../dolphindb-skills) — do not 'source' them blind because sourcing runs arbitrary shell code; (2) verify the origin and contents of the 'dolphindb-basic' skill and any wrapper scripts (dolphin_wrapper.sh, dolphin_global.sh, init_dolphindb_env.py); (3) ensure $DOLPHINDB_PYTHON_BIN and any wrapper binaries are what you expect and not malicious shims; (4) run initial tests in an isolated environment or container and avoid using real credentials until you confirm behavior; and (5) request the skill author/source or a full package that includes the referenced scripts and a declared dependency list so you can review them — if the referenced scripts are provided in-package or come from a reputable repository and the environment variables are declared, the risk would be lower.
功能分析
Type: OpenClaw Skill Name: dolphindb-streaming Version: 1.4.0 The skill provides legitimate instructions and code examples for real-time streaming calculations using DolphinDB. It includes standard setup procedures, such as sourcing environment scripts from a dependency skill (dolphindb-skills) and using the DolphinDB Python SDK. No evidence of malicious intent, data exfiltration, or unauthorized execution was found in SKILL.md or _meta.json.
能力评估
Purpose & Capability
Name/description and the SKILL.md content are consistent: the skill focuses on DolphinDB stream processing and shows example APIs and engines that match that purpose. However, the skill claims no required env/config but depends heavily on an external 'dolphindb-basic' skill and a particular workspace layout (~/.jvs/.openclaw/workspace/skills/dolphindb-skills), which is not declared in the registry metadata.
Instruction Scope
The instructions mandate sourcing shell scripts and running a Python init script from ~/.jvs/.openclaw/workspace/skills/dolphindb-skills/scripts (relative paths outside this skill). Sourcing those scripts will execute arbitrary shell code in the user's environment. The doc also references $DOLPHINDB_PYTHON_BIN and wrapper commands (dolphin_python) that are not declared by the skill, and tells the agent to install/require another skill (clawhub install dolphindb-basic). These operations expand scope beyond the skill's own files and give external code execution rights.
Install Mechanism
There is no install spec or code included (instruction-only), which minimizes direct disk writes by this package. However, the runtime instructions instruct users/agents to run shell 'source' and python scripts from a specific workspace location and to install another skill via 'clawhub', so the effective install/runtime footprint depends on those external artifacts rather than this skill bundle.
Credentials
Registry metadata lists no required env vars or credentials, but the instructions reference $DOLPHINDB_PYTHON_BIN, wrapper commands, and recommend connecting to DolphinDB with host/user/password examples. The skill therefore implicitly requires environment setup and potentially user credentials, yet these are not declared — an inconsistency that could hide credential usage or exfiltration paths if the sourced scripts perform network actions.
Persistence & Privilege
The skill does not request always:true and is user-invocable only. It does instruct installing or sourcing external scripts, but it does not itself demand persistent system-wide privileges or modify other skills' configs in the package as provided.
如何使用
  1. 确保已安装 OpenClaw(本地或 Docker 部署)
  2. 在对话框中输入安装命令:/install dolphindb-streaming
  3. 安装完成后,直接呼叫该 Skill 的名称或使用 /dolphindb-streaming 触发
  4. 根据 Skill 的参数说明提供必要输入,即可获得结构化输出
版本历史
v1.4.0
v1.4.0:添加强制环境检测流程,支持全局包装器调用
v1.1.3
环境检测脚本路径修复,添加 dolphindb-basic 依赖提示
v1.1.2
- 增加了 Python 环境依赖和初始化说明,需首先检测并配置 Python DolphinDB SDK 环境。 - 所有 DolphinDB 脚本用 Python 示例重写,统一通过 Python SDK 调用。 - 新增了如何通过 Python SDK 建立连接、执行脚本和关闭连接的标准用法。 - 其余技能功能与原有版本保持一致,提升了多语言适配性和易用性。
v1.1.1
v1.1.1 - 集成环境自动检测
v1.1.0
Fix: Replace local file paths with official DolphinDB documentation links
v1.0.0
Initial release: Streaming computation for real-time factor calculation and market data processing
元数据
Slug dolphindb-streaming
版本 1.4.0
许可证 MIT-0
累计安装 1
当前安装数 1
历史版本数 6
常见问题

DolphinDB 流式计算技能 是什么?

提供基于DolphinDB的金融场景流式计算能力,支持实时行情、因子计算、风控及订单簿等实时数据流处理。 它是一个面向 Claude Code / OpenClaw 的 AI Agent Skill 插件,目前累计下载 171 次。

如何安装 DolphinDB 流式计算技能?

在 OpenClaw 或 Claude Code 对话框中运行命令「/install dolphindb-streaming」即可一键安装,无需额外配置。

DolphinDB 流式计算技能 是免费的吗?

是的,DolphinDB 流式计算技能 完全免费,采用 MIT-0 许可证,可自由下载、安装和使用。

DolphinDB 流式计算技能 支持哪些平台?

DolphinDB 流式计算技能 跨平台运行,可在任意部署了 OpenClaw / Claude Code 的环境中使用(cross-platform)。

谁开发了 DolphinDB 流式计算技能?

由 superStupidBear(@ugpoor)开发并维护,当前版本 v1.4.0。

💬 留言讨论