
Dual Stream Architecture

by wpank · GitHub ↗ · v1.0.0
cross-platform ✓ Security Clean
Downloads: 802 · Stars: 0 · Active Installs: 3 · Versions: 1
Install in OpenClaw
/install dual-stream-architecture
Description
Dual-stream event publishing combining Kafka for durability with Redis Pub/Sub for real-time delivery. Use when building event-driven systems needing both guaranteed delivery and low-latency updates. Triggers on dual stream, event publishing, Kafka Redis, real-time events, pub/sub, streaming architecture.
README (SKILL.md)

Dual-Stream Architecture

Publish each event to both Kafka (durability) and Redis Pub/Sub (real-time) for systems that need guaranteed delivery as well as instant updates. Kafka is written first and must succeed; the Redis notification is best-effort.

Installation

OpenClaw / Moltbot / Clawbot

npx clawhub@latest install dual-stream-architecture

When to Use

  • Event-driven systems needing both durability AND real-time
  • WebSocket/SSE backends that push live updates
  • Dashboards showing events as they happen
  • Systems where Kafka consumers lag but users expect instant updates

Core Pattern

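import (
    "context"
    "encoding/json"
    "fmt"
    "log/slog"

    // Assumed client libraries: the kafka.Writer and redis.Client calls
    // used below match the APIs of these two packages.
    "github.com/redis/go-redis/v9"
    "github.com/segmentio/kafka-go"
)

// DualPublisher writes every event to Kafka and mirrors a lightweight
// notification to Redis Pub/Sub.
type DualPublisher struct {
    kafka  *kafka.Writer
    redis  *redis.Client
    logger *slog.Logger
}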

// Publish writes the event to Kafka, then notifies Redis subscribers.
func (p *DualPublisher) Publish(ctx context.Context, event Event) error {
    // 1. Kafka: Critical path - must succeed
    payload, err := json.Marshal(event)
    if err != nil {
        return fmt.Errorf("marshal event: %w", err)
    }
    err = p.kafka.WriteMessages(ctx, kafka.Message{
        Key:   []byte(event.SourceID),
        Value: payload,
    })
    if err != nil {
        return fmt.Errorf("kafka publish failed: %w", err)
    }

    // 2. Redis: Best-effort - don't fail the operation
    p.publishToRedis(ctx, event)

    return nil
}

// publishToRedis sends a small notification; failures are logged, never returned.
func (p *DualPublisher) publishToRedis(ctx context.Context, event Event) {
    // Lightweight payload (full event in Kafka)
    notification := map[string]interface{}{
        "id":        event.ID,
        "type":      event.Type,
        "source_id": event.SourceID,
    }

    payload, err := json.Marshal(notification)
    if err != nil {
        p.logger.Warn("redis notification marshal failed", "error", err)
        return
    }
    channel := fmt.Sprintf("events:%s:%s", event.SourceType, event.SourceID)

    // Fire and forget - log errors but don't propagate
    if err := p.redis.Publish(ctx, channel, payload).Err(); err != nil {
        p.logger.Warn("redis publish failed", "error", err)
    }
}
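The snippets above reference an Event type without defining it. A minimal sketch of what it might look like follows, assuming all identifiers are strings; the Payload field, the JSON tags, and the newNotification helper are hypothetical and not part of the original skill.

```go
package main

import "encoding/json"

// Event is a hypothetical shape for the events published above.
// Only ID, Type, SourceType and SourceID are used by the original snippets;
// Payload is an assumed field for the event body stored in Kafka.
type Event struct {
    ID         string          `json:"id"`
    Type       string          `json:"type"`
    SourceType string          `json:"source_type"`
    SourceID   string          `json:"source_id"`
    Payload    json.RawMessage `json:"payload,omitempty"`
}

// notification is the lightweight message sent over Redis Pub/Sub.
type notification struct {
    ID       string `json:"id"`
    Type     string `json:"type"`
    SourceID string `json:"source_id"`
}

// newNotification builds the Redis payload for an event, leaving the
// full event body to Kafka.
func newNotification(e Event) ([]byte, error) {
    return json.Marshal(notification{ID: e.ID, Type: e.Type, SourceID: e.SourceID})
}
```

A typed struct is used here instead of map[string]interface{} so the field set is checked at compile time; either approach produces equivalent JSON for subscribers.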

Architecture

┌──────────────┐     ┌─────────────────┐     ┌──────────────┐
│   Ingester   │────▶│  DualPublisher  │────▶│    Kafka     │──▶ Event Processor
│              │     │                 │     │  (durable)   │
└──────────────┘     │                 │     └──────────────┘
                     │                 │     ┌──────────────┐
                     │                 │────▶│ Redis PubSub │──▶ WebSocket Gateway
                     │                 │     │ (real-time)  │
                     └─────────────────┘     └──────────────┘

Channel Naming Convention

events:{source_type}:{source_id}

Examples:
- events:user:octocat      - Events for user octocat
- events:repo:owner/repo   - Events for a repository
- events:org:microsoft     - Events for an organization
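The convention can be captured in a pair of helpers so publishers and subscribers agree on the format. This is a sketch only; channelName and parseChannel are hypothetical names. Parsing splits into at most three parts so that source IDs such as owner/repo, or IDs that themselves contain a colon, come back intact.

```go
package main

import (
    "fmt"
    "strings"
)

// channelName builds a channel following events:{source_type}:{source_id}.
func channelName(sourceType, sourceID string) string {
    return fmt.Sprintf("events:%s:%s", sourceType, sourceID)
}

// parseChannel is the inverse of channelName. It splits on the first two
// colons only, so everything after them is treated as the source ID.
func parseChannel(channel string) (sourceType, sourceID string, err error) {
    parts := strings.SplitN(channel, ":", 3)
    if len(parts) != 3 || parts[0] != "events" || parts[1] == "" || parts[2] == "" {
        return "", "", fmt.Errorf("invalid channel name %q", channel)
    }
    return parts[1], parts[2], nil
}
```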

Batch Publishing

For high throughput:

// PublishBatch writes all events to Kafka in one call, then pipelines
// the Redis notifications.
func (p *DualPublisher) PublishBatch(ctx context.Context, events []Event) error {
    // 1. Batch to Kafka
    messages := make([]kafka.Message, len(events))
    for i, event := range events {
        payload, err := json.Marshal(event)
        if err != nil {
            return fmt.Errorf("marshal event %s: %w", event.ID, err)
        }
        messages[i] = kafka.Message{
            Key:   []byte(event.SourceID),
            Value: payload,
        }
    }

    if err := p.kafka.WriteMessages(ctx, messages...); err != nil {
        return fmt.Errorf("kafka batch failed: %w", err)
    }

    // 2. Redis: Pipeline for efficiency
    pipe := p.redis.Pipeline()
    for _, event := range events {
        channel := fmt.Sprintf("events:%s:%s", event.SourceType, event.SourceID)
        notification, err := json.Marshal(map[string]interface{}{
            "id":   event.ID,
            "type": event.Type,
        })
        if err != nil {
            // Best-effort: skip this notification, keep the rest of the batch
            p.logger.Warn("redis notification marshal failed", "error", err)
            continue
        }
        pipe.Publish(ctx, channel, notification)
    }

    if _, err := pipe.Exec(ctx); err != nil {
        p.logger.Warn("redis batch failed", "error", err)
    }

    return nil
}
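When a caller holds more events than should go into a single Kafka write or Redis pipeline, one option is to split them into fixed-size batches first. The sketch below assumes that approach; the chunk helper and any particular batch size are hypothetical and would need tuning against the actual brokers.

```go
package main

// chunk splits items into consecutive slices of at most size elements.
// The returned slices share the backing array of items.
func chunk[T any](items []T, size int) [][]T {
    if size <= 0 {
        return nil
    }
    var batches [][]T
    for start := 0; start < len(items); start += size {
        end := start + size
        if end > len(items) {
            end = len(items)
        }
        batches = append(batches, items[start:end])
    }
    return batches
}
```

Each batch returned by chunk(events, n) could then be passed to PublishBatch in turn, stopping at the first Kafka error.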

Decision Tree

Requirement                  | Stream                | Why
-----------------------------|-----------------------|-------------------------
Must not lose event          | Kafka only            | Ack required, replicated
User sees immediately        | Redis only            | Sub-ms delivery
Both durability + real-time  | Dual stream           | This pattern
High volume (>10k/sec)       | Kafka, batch Redis    | Redis can bottleneck
Many subscribers per channel | Redis + local fan-out | Don't hammer Redis

NEVER Do

  • NEVER fail on Redis errors — Redis is best-effort. Log and continue.
  • NEVER send full payload to Redis — Send IDs only, clients fetch from API.
  • NEVER create one Redis channel per event — Use source-level channels.
  • NEVER skip Kafka for "unimportant" events — All events go to Kafka for replay.
  • NEVER use Redis Pub/Sub for persistence — Messages are fire-and-forget.
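The "send IDs only" rule implies a matching step on the receiving side: decode the notification, then load the full event by ID. The following is a rough sketch of that step; incomingNotification, fetchFunc, and handleNotification are hypothetical names, and fetchFunc stands in for whatever API call the application actually uses.

```go
package main

import (
    "encoding/json"
    "fmt"
)

// incomingNotification mirrors the lightweight payload published to Redis.
type incomingNotification struct {
    ID       string `json:"id"`
    Type     string `json:"type"`
    SourceID string `json:"source_id"`
}

// fetchFunc is a hypothetical lookup of the full event by ID,
// for example an HTTP call to the application's API.
type fetchFunc func(id string) ([]byte, error)

// handleNotification decodes a Redis message and fetches the full event.
func handleNotification(raw []byte, fetch fetchFunc) ([]byte, error) {
    var n incomingNotification
    if err := json.Unmarshal(raw, &n); err != nil {
        return nil, fmt.Errorf("decode notification: %w", err)
    }
    if n.ID == "" {
        return nil, fmt.Errorf("notification has no id")
    }
    return fetch(n.ID)
}
```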

Edge Cases

Case                       | Solution
---------------------------|---------------------------------------------
Redis down                 | Log warning, continue with Kafka only
Client connects mid-stream | Query API for recent events, then subscribe
High channel cardinality   | Use wildcard patterns or aggregate channels
Kafka backpressure         | Buffer in memory with timeout, fail if full
Need event replay          | Consume from Kafka from offset, not Redis
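The "Kafka backpressure" row could be implemented with a bounded in-memory queue that callers block on for a limited time. This is a sketch under that assumption, not the skill's own implementation; boundedBuffer, enqueue, and errBufferFull are hypothetical names, and capacity and timeout would need tuning.

```go
package main

import (
    "errors"
    "time"
)

// errBufferFull is returned when no slot frees up before the timeout.
var errBufferFull = errors.New("publish buffer full")

// boundedBuffer holds messages waiting to be written to Kafka.
type boundedBuffer struct {
    queue   chan []byte
    timeout time.Duration
}

func newBoundedBuffer(capacity int, timeout time.Duration) *boundedBuffer {
    return &boundedBuffer{queue: make(chan []byte, capacity), timeout: timeout}
}

// enqueue waits up to the configured timeout for space in the buffer and
// fails with errBufferFull if none becomes available.
func (b *boundedBuffer) enqueue(msg []byte) error {
    timer := time.NewTimer(b.timeout)
    defer timer.Stop()
    select {
    case b.queue <- msg:
        return nil
    case <-timer.C:
        return errBufferFull
    }
}
```

A separate goroutine would drain queue into the Kafka writer; when it falls behind for longer than the timeout, callers get an error instead of unbounded memory growth.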
Usage Guidance
This is an architecture/implementation guide (no executable code bundled), and overall it is coherent with its purpose. Before installing or following the README examples:
  1. Verify the origin of any npx or GitHub URL used; don't run install commands that fetch code from untrusted or unclear sources.
  2. Expect to provide Kafka and Redis connection configuration (host, port, TLS settings, credentials) in your application; the skill doesn't manage or request them.
  3. When you copy or install the skill manually, review the files you pull into your environment.
  4. If you intend to let an autonomous agent use this skill, it's low-risk because it doesn't request secrets, but still avoid granting the agent network/credential access to your Kafka/Redis clusters unless you trust it.
  5. If you need a packaged/official release, prefer installs from a verified registry or a trusted repository rather than an ad-hoc tree URL.
Capability Analysis
Type: OpenClaw Skill
Name: dual-stream-architecture
Version: 1.0.0

The skill bundle describes a dual-stream event publishing architecture using Kafka and Redis. The `SKILL.md` and `README.md` files provide documentation, code examples, and installation instructions. There is no evidence of prompt injection against the AI agent, nor any malicious code attempting data exfiltration, unauthorized execution, or persistence. The Go code snippets are illustrative and do not contain risky operations. Installation methods include `npx clawhub@latest install` and direct `cp -r` commands, which are standard for skill integration. While `npx add` from a GitHub URL is mentioned, it's not inherently malicious and likely a legitimate installation path within this ecosystem.
Capability Assessment
Purpose & Capability
The name/description match the included content: SKILL.md is a design/implementation guide for publishing to Kafka (durability) and Redis Pub/Sub (real-time). The code examples and guidance are coherent with the described purpose.
Instruction Scope
SKILL.md contains code snippets, architecture diagrams, and operational guidance only. It does not instruct the agent to read unrelated system files, exfiltrate data, or call external endpoints outside of normal installation guidance. The README contains manual copy/install suggestions (including paths under the user's home) but these are standard packaging instructions rather than runtime data collection.
Install Mechanism
There is no declared install spec in the skill metadata (instruction-only), which is low-risk. The README suggests using npx to fetch/install the package and includes a GitHub tree-style URL in an example; pulling code over npx or using non-standard URLs is a general supply-chain risk — verify the source before running those commands.
Credentials
The skill declares no required environment variables or credentials, which is appropriate for an instruction-only pattern. However, the examples assume Kafka and Redis clients (which in real deployments require connection strings / credentials). The skill does not declare or attempt to access credentials itself — you will need to supply those when you implement the pattern.
Persistence & Privilege
always:false and no config paths or installation hooks are declared. The skill does not request persistent presence or elevated privileges and does not modify other skills or global agent settings.
How to Use
  1. Make sure OpenClaw is installed (local or Docker)
  2. Run the install command in chat: /install dual-stream-architecture
  3. After installation, invoke the skill by name or use /dual-stream-architecture
  4. Provide required inputs per the skill's parameter spec and get structured output
Version History
v1.0.0
Initial release of the dual-stream-architecture skill: publish events to Kafka and Redis Pub/Sub for both durability and real-time delivery.
  • Provides a Go implementation of simultaneous event publishing to Kafka (guaranteed delivery) and Redis Pub/Sub (sub-ms updates).
  • Includes usage recommendations, example code, channel conventions, and a decision tree for selecting stream strategies.
  • Outlines best practices (e.g., Redis as best-effort, channel naming, batching for throughput).
  • Documents critical edge cases and common pitfalls.
  • Links to related skills for real-time dashboards and service architectures.
Metadata
Slug dual-stream-architecture
Version 1.0.0
License
All-time Installs 3
Active Installs 3
Total Versions 1
Frequently Asked Questions

What is Dual Stream Architecture?

Dual-stream event publishing combining Kafka for durability with Redis Pub/Sub for real-time delivery. Use when building event-driven systems needing both guaranteed delivery and low-latency updates. Triggers on dual stream, event publishing, Kafka Redis, real-time events, pub/sub, streaming architecture. It is an AI Agent Skill for Claude Code / OpenClaw, with 802 downloads so far.

How do I install Dual Stream Architecture?

Run "/install dual-stream-architecture" in the OpenClaw or Claude Code chat to install it in one step — no extra setup required.

Is Dual Stream Architecture free?

Yes, Dual Stream Architecture is completely free (open-source). You can download, install and use it at no cost.

Which platforms does Dual Stream Architecture support?

Dual Stream Architecture is cross-platform and runs anywhere OpenClaw / Claude Code is available.

Who created Dual Stream Architecture?

It is built and maintained by wpank (@wpank); the current version is v1.0.0.
