Dual Stream Architecture

Author: wpank · v1.0.0
Cross-platform · ✓ Security check passed
802 total downloads · 0 favorites · 3 current installs · 1 version
Install in OpenClaw: /install dual-stream-architecture
Description
Dual-stream event publishing combining Kafka for durability with Redis Pub/Sub for real-time delivery. Use when building event-driven systems needing both guaranteed delivery and low-latency updates. Triggers on dual stream, event publishing, Kafka Redis, real-time events, pub/sub, streaming architecture.
Usage (SKILL.md)

Dual-Stream Architecture

Publish every event to both Kafka (durability) and Redis Pub/Sub (real-time) for systems that need guaranteed delivery and instant updates. Kafka is written first and must succeed; the Redis notification follows as best-effort.

Installation

OpenClaw / Moltbot / Clawbot

npx clawhub@latest install dual-stream-architecture

When to Use

  • Event-driven systems needing both durability AND real-time
  • WebSocket/SSE backends that push live updates
  • Dashboards showing events as they happen
  • Systems where Kafka consumers lag but users expect instant updates

Core Pattern

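// Assumed imports: the calls below match github.com/segmentio/kafka-go and
// github.com/redis/go-redis/v9; the original does not name the client libraries.
import (
    "context"
    "encoding/json"
    "fmt"
    "log/slog"

    "github.com/redis/go-redis/v9"
    "github.com/segmentio/kafka-go"
)

type DualPublisher struct {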
    kafka  *kafka.Writer
    redis  *redis.Client
    logger *slog.Logger
}

func (p *DualPublisher) Publish(ctx context.Context, event Event) error {
    // 1. Kafka: Critical path - must succeed
    payload, err := json.Marshal(event)
    if err != nil {
        return fmt.Errorf("marshal event: %w", err)
    }
    err = p.kafka.WriteMessages(ctx, kafka.Message{
        Key:   []byte(event.SourceID),
        Value: payload,
    })
    if err != nil {
        return fmt.Errorf("kafka publish failed: %w", err)
    }

    // 2. Redis: Best-effort - don't fail the operation
    p.publishToRedis(ctx, event)

    return nil
}

func (p *DualPublisher) publishToRedis(ctx context.Context, event Event) {
    // Lightweight payload (full event in Kafka)
    notification := map[string]interface{}{
        "id":        event.ID,
        "type":      event.Type,
        "source_id": event.SourceID,
    }

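    payload, err := json.Marshal(notification)
    if err != nil {
        // Best-effort path: log and skip the notification
        p.logger.Warn("redis notification marshal failed", "error", err)
        return
    }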
    channel := fmt.Sprintf("events:%s:%s", event.SourceType, event.SourceID)

    // Fire and forget - log errors but don't propagate
    if err := p.redis.Publish(ctx, channel, payload).Err(); err != nil {
        p.logger.Warn("redis publish failed", "error", err)
    }
}

Architecture

┌──────────────┐     ┌─────────────────┐     ┌──────────────┐
│   Ingester   │────▶│  DualPublisher  │────▶│    Kafka     │──▶ Event Processor
│              │     │                 │     │  (durable)   │
└──────────────┘     │                 │     └──────────────┘
                     │                 │     ┌──────────────┐
                     │                 │────▶│ Redis PubSub │──▶ WebSocket Gateway
                     │                 │     │ (real-time)  │
                     └─────────────────┘     └──────────────┘

Channel Naming Convention

events:{source_type}:{source_id}

Examples:
- events:user:octocat      - Events for user octocat
- events:repo:owner/repo   - Events for a repository
- events:org:microsoft     - Events for an organization
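
The convention can be captured in a pair of helpers so publishers and subscribers build channel names the same way. This is a sketch only: ChannelName and ParseChannel are hypothetical names, and the parser assumes the source type never contains a colon (the source ID may, and may also contain slashes as in owner/repo).

```go
package main

import (
	"fmt"
	"strings"
)

// ChannelName builds a channel following events:{source_type}:{source_id}.
func ChannelName(sourceType, sourceID string) string {
	return fmt.Sprintf("events:%s:%s", sourceType, sourceID)
}

// ParseChannel splits a channel name back into its parts.
// It splits on the first two colons only, so the source ID is kept whole.
func ParseChannel(channel string) (sourceType, sourceID string, ok bool) {
	parts := strings.SplitN(channel, ":", 3)
	if len(parts) != 3 || parts[0] != "events" || parts[1] == "" || parts[2] == "" {
		return "", "", false
	}
	return parts[1], parts[2], true
}

func main() {
	ch := ChannelName("repo", "owner/repo")
	fmt.Println(ch)

	st, id, ok := ParseChannel(ch)
	fmt.Println(st, id, ok)
}
```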

Batch Publishing

For high throughput:

func (p *DualPublisher) PublishBatch(ctx context.Context, events []Event) error {
    // 1. Batch to Kafka
    messages := make([]kafka.Message, len(events))
    for i, event := range events {
        payload, err := json.Marshal(event)
        if err != nil {
            return fmt.Errorf("marshal event %v: %w", event.ID, err)
        }
        messages[i] = kafka.Message{
            Key:   []byte(event.SourceID),
            Value: payload,
        }
    }

    if err := p.kafka.WriteMessages(ctx, messages...); err != nil {
        return fmt.Errorf("kafka batch failed: %w", err)
    }

    // 2. Redis: Pipeline for efficiency
    pipe := p.redis.Pipeline()
    for _, event := range events {
        channel := fmt.Sprintf("events:%s:%s", event.SourceType, event.SourceID)
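        notification, err := json.Marshal(map[string]interface{}{
            "id":        event.ID,
            "type":      event.Type,
            "source_id": event.SourceID, // same fields as publishToRedis
        })
        if err != nil {
            // Best-effort path: skip this notification, keep the rest
            p.logger.Warn("redis notification marshal failed", "error", err)
            continue
        }
        pipe.Publish(ctx, channel, notification)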
    }
    
    if _, err := pipe.Exec(ctx); err != nil {
        p.logger.Warn("redis batch failed", "error", err)
    }

    return nil
}

Decision Tree

| Requirement | Stream | Why |
|---|---|---|
| Must not lose event | Kafka only | Ack required, replicated |
| User sees immediately | Redis only | Sub-ms delivery |
| Both durability + real-time | Dual stream | This pattern |
| High volume (>10k/sec) | Kafka, batch Redis | Redis can bottleneck |
| Many subscribers per channel | Redis + local fan-out | Don't hammer Redis |
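
The "Redis + local fan-out" option can be sketched as one Redis subscription per process that feeds an in-memory hub, which then delivers to every local client (for example, each WebSocket connection). The Hub type and its methods are hypothetical names, and the Redis subscription loop that would call Broadcast is omitted, so this shows only the in-process half under those assumptions.

```go
package main

import (
	"fmt"
	"sync"
)

// Hub fans one upstream message out to many in-process subscribers.
type Hub struct {
	mu   sync.RWMutex
	subs map[string]map[chan []byte]struct{} // channel name -> subscriber set
}

func NewHub() *Hub {
	return &Hub{subs: make(map[string]map[chan []byte]struct{})}
}

// Subscribe registers a local subscriber with a buffered channel and
// returns the receive side plus an unsubscribe function.
func (h *Hub) Subscribe(channel string, buf int) (<-chan []byte, func()) {
	ch := make(chan []byte, buf)
	h.mu.Lock()
	if h.subs[channel] == nil {
		h.subs[channel] = make(map[chan []byte]struct{})
	}
	h.subs[channel][ch] = struct{}{}
	h.mu.Unlock()

	return ch, func() {
		h.mu.Lock()
		delete(h.subs[channel], ch)
		h.mu.Unlock()
	}
}

// Broadcast delivers msg to every local subscriber of channel and returns
// how many received it. Subscribers with a full buffer are skipped, which
// matches the best-effort nature of the Redis path.
func (h *Hub) Broadcast(channel string, msg []byte) int {
	h.mu.RLock()
	defer h.mu.RUnlock()

	delivered := 0
	for ch := range h.subs[channel] {
		select {
		case ch <- msg:
			delivered++
		default: // slow consumer: drop rather than block
		}
	}
	return delivered
}

func main() {
	hub := NewHub()
	a, cancelA := hub.Subscribe("events:user:octocat", 8)
	defer cancelA()
	b, cancelB := hub.Subscribe("events:user:octocat", 8)
	defer cancelB()

	n := hub.Broadcast("events:user:octocat", []byte(`{"id":"evt-1"}`))
	fmt.Println("delivered to", n, "subscribers")
	fmt.Println(string(<-a), string(<-b))
}
```

With this shape, the number of Redis subscriptions grows with the number of processes rather than the number of connected clients.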

NEVER Do

  • NEVER fail on Redis errors — Redis is best-effort. Log and continue.
  • NEVER send full payload to Redis — Send IDs only, clients fetch from API.
  • NEVER create one Redis channel per event — Use source-level channels.
  • NEVER skip Kafka for "unimportant" events — All events go to Kafka for replay.
  • NEVER use Redis Pub/Sub for persistence — Messages are fire-and-forget.

Edge Cases

| Case | Solution |
|---|---|
| Redis down | Log warning, continue with Kafka only |
| Client connects mid-stream | Query API for recent events, then subscribe |
| High channel cardinality | Use wildcard patterns or aggregate channels |
| Kafka backpressure | Buffer in memory with timeout, fail if full |
| Need event replay | Consume from Kafka from offset, not Redis |
Security Recommendations
This is an architecture/implementation guide (no executable code bundled), and overall it is coherent with its purpose. Before installing or following the README examples:
  1. Verify the origin of any npx or GitHub URL used. Don't run install commands that fetch code from untrusted or unclear sources.
  2. Expect to provide Kafka and Redis connection configuration (host, port, TLS settings, credentials) in your application. The skill doesn't manage or request them.
  3. When you copy or install the skill manually, review the files you pull into your environment.
  4. If you intend to let an autonomous agent use this skill, it's low-risk because it doesn't request secrets, but still avoid granting the agent network/credential access to your Kafka/Redis clusters unless you trust it.
  5. If you need a packaged/official release, prefer installs from a verified registry or a trusted repository rather than an ad-hoc tree URL.
Functional Analysis
Type: OpenClaw Skill
Name: dual-stream-architecture
Version: 1.0.0
The skill bundle describes a dual-stream event publishing architecture using Kafka and Redis. The `SKILL.md` and `README.md` files provide documentation, code examples, and installation instructions. There is no evidence of prompt injection against the AI agent, nor any malicious code attempting data exfiltration, unauthorized execution, or persistence. The Go code snippets are illustrative and do not contain risky operations. Installation methods include `npx clawhub@latest install` and direct `cp -r` commands, which are standard for skill integration. While `npx add` from a GitHub URL is mentioned, it's not inherently malicious and likely a legitimate installation path within this ecosystem.
Capability Assessment
Purpose & Capability
The name/description match the included content: SKILL.md is a design/implementation guide for publishing to Kafka (durability) and Redis Pub/Sub (real-time). The code examples and guidance are coherent with the described purpose.
Instruction Scope
SKILL.md contains code snippets, architecture diagrams, and operational guidance only. It does not instruct the agent to read unrelated system files, exfiltrate data, or call external endpoints outside of normal installation guidance. The README contains manual copy/install suggestions (including paths under the user's home) but these are standard packaging instructions rather than runtime data collection.
Install Mechanism
There is no declared install spec in the skill metadata (instruction-only), which is low-risk. The README suggests using npx to fetch/install the package and includes a GitHub tree-style URL in an example; pulling code over npx or using non-standard URLs is a general supply-chain risk — verify the source before running those commands.
Credentials
The skill declares no required environment variables or credentials, which is appropriate for an instruction-only pattern. However, the examples assume Kafka and Redis clients (which in real deployments require connection strings / credentials). The skill does not declare or attempt to access credentials itself — you will need to supply those when you implement the pattern.
Persistence & Privilege
always:false and no config paths or installation hooks are declared. The skill does not request persistent presence or elevated privileges and does not modify other skills or global agent settings.
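How to Use
  1. Make sure OpenClaw is installed (locally or via Docker).
  2. Enter the install command in the chat box: /install dual-stream-architecture
  3. Once installed, invoke the Skill by name or trigger it with /dual-stream-architecture
  4. Provide the inputs described in the Skill's parameter documentation to get structured output.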
Version History
v1.0.0
Initial release of the dual-stream-architecture skill: publish events to Kafka and Redis Pub/Sub for both durability and real-time delivery.
  • Provides a Go implementation of simultaneous event publishing to Kafka (guaranteed delivery) and Redis Pub/Sub (sub-ms updates).
  • Includes usage recommendations, example code, channel conventions, and a decision tree for selecting stream strategies.
  • Outlines best practices (e.g., Redis as best-effort, channel naming, batching for throughput).
  • Documents critical edge cases and common pitfalls.
  • Links to related skills for real-time dashboards and service architectures.
Metadata
Slug dual-stream-architecture
Version 1.0.0
License
Total installs 3
Current installs 3
Version count 1
FAQ

What is Dual Stream Architecture?

Dual-stream event publishing combining Kafka for durability with Redis Pub/Sub for real-time delivery. Use when building event-driven systems needing both guaranteed delivery and low-latency updates. Triggers on dual stream, event publishing, Kafka Redis, real-time events, pub/sub, streaming architecture. It is an AI Agent Skill plugin for Claude Code / OpenClaw, with 802 total downloads to date.

How do I install Dual Stream Architecture?

Run the command "/install dual-stream-architecture" in the OpenClaw or Claude Code chat box to install it in one step; no additional configuration is required.

Is Dual Stream Architecture free?

Yes. Dual Stream Architecture is completely free (free and open source) to download, install, and use.

Which platforms does Dual Stream Architecture support?

Dual Stream Architecture is cross-platform and can be used in any environment where OpenClaw / Claude Code is deployed.

Who developed Dual Stream Architecture?

It is developed and maintained by wpank (@wpank). The current version is v1.0.0.
