Dual-Stream Architecture
Publish events to Kafka (durability) and Redis Pub/Sub (real-time) simultaneously for systems needing both guaranteed delivery and instant updates.
Installation
OpenClaw / Moltbot / Clawbot
npx clawhub@latest install dual-stream-architecture
When to Use
- Event-driven systems needing both durability AND real-time
- WebSocket/SSE backends that push live updates
- Dashboards showing events as they happen
- Systems where Kafka consumers lag but users expect instant updates
Core Pattern
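// Assumed imports: the calls below match github.com/segmentio/kafka-go
// and github.com/redis/go-redis/v9; adjust to the clients you use.
// Event is the application's own event type, defined elsewhere.
import (
	"context"
	"encoding/json"
	"fmt"
	"log/slog"

	"github.com/redis/go-redis/v9"
	"github.com/segmentio/kafka-go"
)

type DualPublisher struct {
	kafka  *kafka.Writer
	redis  *redis.Client
	logger *slog.Logger
}

func (p *DualPublisher) Publish(ctx context.Context, event Event) error {
	// 1. Kafka: Critical path - must succeed
	payload, err := json.Marshal(event)
	if err != nil {
		return fmt.Errorf("marshal event: %w", err)
	}
	err = p.kafka.WriteMessages(ctx, kafka.Message{
		Key:   []byte(event.SourceID), // same source -> same partition, preserving order
		Value: payload,
	})
	if err != nil {
		return fmt.Errorf("kafka publish failed: %w", err)
	}
	// 2. Redis: Best-effort - don't fail the operation
	p.publishToRedis(ctx, event)
	return nil
}

func (p *DualPublisher) publishToRedis(ctx context.Context, event Event) {
	// Lightweight payload (full event in Kafka)
	notification := map[string]interface{}{
		"id":        event.ID,
		"type":      event.Type,
		"source_id": event.SourceID,
	}
	payload, err := json.Marshal(notification)
	if err != nil {
		p.logger.Warn("redis notification marshal failed", "error", err)
		return
	}
	channel := fmt.Sprintf("events:%s:%s", event.SourceType, event.SourceID)
	// Fire and forget - log errors but don't propagate
	if err := p.redis.Publish(ctx, channel, payload).Err(); err != nil {
		p.logger.Warn("redis publish failed", "error", err)
	}
}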
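The split between the critical path and the best-effort path can be exercised without a broker. The sketch below is not the skill's own code: it replaces the Kafka writer and Redis client with two hypothetical interfaces (`DurableLog`, `Notifier`) and in-memory fakes, and the `Event` fields are assumed from how the pattern uses them. It shows that a notification failure is only reported, while a durable-write failure aborts the publish before any notification is attempted.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Event carries only the fields the pattern relies on (assumed shape).
type Event struct {
	ID, Type, SourceType, SourceID string
}

// DurableLog and Notifier are hypothetical stand-ins for the Kafka
// writer and the Redis client.
type DurableLog interface {
	Write(ctx context.Context, key string, e Event) error
}

type Notifier interface {
	Notify(ctx context.Context, channel string, e Event) error
}

type Publisher struct {
	Log      DurableLog
	Notifier Notifier
	Warn     func(msg string, err error) // receives swallowed errors
}

func (p *Publisher) Publish(ctx context.Context, e Event) error {
	// Critical path: a durable-write failure aborts the publish.
	if err := p.Log.Write(ctx, e.SourceID, e); err != nil {
		return fmt.Errorf("durable publish failed: %w", err)
	}
	// Best effort: a notification failure is reported, never returned.
	channel := fmt.Sprintf("events:%s:%s", e.SourceType, e.SourceID)
	if err := p.Notifier.Notify(ctx, channel, e); err != nil {
		p.Warn("notify failed", err)
	}
	return nil
}

// In-memory fakes that fail on demand.
type fakeLog struct{ err error }

func (f *fakeLog) Write(context.Context, string, Event) error { return f.err }

type fakeNotifier struct {
	err      error
	channels []string // channels that Notify was called with
}

func (f *fakeNotifier) Notify(_ context.Context, ch string, _ Event) error {
	f.channels = append(f.channels, ch)
	return f.err
}

var errBackendDown = errors.New("backend down")

// runDemo publishes one event against fakes configured to fail as requested.
func runDemo(logErr, notifyErr error) (warnings, attempts int, err error) {
	n := &fakeNotifier{err: notifyErr}
	p := &Publisher{
		Log:      &fakeLog{err: logErr},
		Notifier: n,
		Warn:     func(string, error) { warnings++ },
	}
	err = p.Publish(context.Background(),
		Event{ID: "1", Type: "push", SourceType: "user", SourceID: "octocat"})
	return warnings, len(n.channels), err
}

func main() {
	fmt.Println(runDemo(nil, errBackendDown)) // 1 1 <nil>
	fmt.Println(runDemo(errBackendDown, nil)) // 0 0 durable publish failed: backend down
}
```

Hiding both backends behind small interfaces is one way to unit-test the ordering guarantee; the real clients can be wrapped in adapters that satisfy them.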
Architecture
┌──────────────┐     ┌─────────────────┐     ┌──────────────┐
│   Ingester   │────▶│  DualPublisher  │────▶│    Kafka     │──▶ Event Processor
│              │     │                 │     │  (durable)   │
└──────────────┘     │                 │     └──────────────┘
                     │                 │     ┌──────────────┐
                     │                 │────▶│ Redis PubSub │──▶ WebSocket Gateway
                     │                 │     │ (real-time)  │
                     └─────────────────┘     └──────────────┘
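On the Redis branch of the diagram, the WebSocket gateway receives only the lightweight notification, not the full event. A minimal sketch of decoding that message is shown here. The `Notification` struct and `DecodeNotification` function are illustrative names, and the field types are assumed to be strings because the original `Event` definition is not shown.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// Notification mirrors the keys published to Redis: id, type, source_id.
// String field types are an assumption.
type Notification struct {
	ID       string `json:"id"`
	Type     string `json:"type"`
	SourceID string `json:"source_id"`
}

// DecodeNotification parses a raw Pub/Sub message body and rejects
// messages without an id, since clients need it to fetch the full event.
func DecodeNotification(raw []byte) (Notification, error) {
	var n Notification
	if err := json.Unmarshal(raw, &n); err != nil {
		return Notification{}, fmt.Errorf("decode notification: %w", err)
	}
	if n.ID == "" {
		return Notification{}, errors.New("decode notification: missing id")
	}
	return n, nil
}

func main() {
	raw := []byte(`{"id":"42","type":"push","source_id":"octocat"}`)
	n, err := DecodeNotification(raw)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%+v\n", n) // {ID:42 Type:push SourceID:octocat}
}
```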
Channel Naming Convention
events:{source_type}:{source_id}
Examples:
- events:user:octocat - Events for user octocat
- events:repo:owner/repo - Events for a repository
- events:org:microsoft - Events for an organization
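Keeping the naming convention in one pair of helpers avoids scattering `fmt.Sprintf` calls across publishers and subscribers. The sketch below uses hypothetical helper names (`ChannelName`, `ParseChannel`) and assumes the source type itself never contains a colon; source IDs such as `owner/repo` pass through unchanged.

```go
package main

import (
	"fmt"
	"strings"
)

// ChannelName builds "events:{source_type}:{source_id}".
func ChannelName(sourceType, sourceID string) string {
	return fmt.Sprintf("events:%s:%s", sourceType, sourceID)
}

// ParseChannel splits a channel name back into its parts. SplitN with a
// limit of 3 keeps any further colons inside the source ID.
func ParseChannel(channel string) (sourceType, sourceID string, ok bool) {
	parts := strings.SplitN(channel, ":", 3)
	if len(parts) != 3 || parts[0] != "events" || parts[1] == "" || parts[2] == "" {
		return "", "", false
	}
	return parts[1], parts[2], true
}

func main() {
	ch := ChannelName("repo", "owner/repo")
	st, id, ok := ParseChannel(ch)
	fmt.Println(ch, st, id, ok) // events:repo:owner/repo repo owner/repo true
}
```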
Batch Publishing
For high throughput:
func (p *DualPublisher) PublishBatch(ctx context.Context, events []Event) error {
	// 1. Batch to Kafka
	messages := make([]kafka.Message, len(events))
	for i, event := range events {
		payload, err := json.Marshal(event)
		if err != nil {
			return fmt.Errorf("marshal event %d: %w", i, err)
		}
		messages[i] = kafka.Message{
			Key:   []byte(event.SourceID),
			Value: payload,
		}
	}
	if err := p.kafka.WriteMessages(ctx, messages...); err != nil {
		return fmt.Errorf("kafka batch failed: %w", err)
	}
	// 2. Redis: Pipeline for efficiency (one round trip for all notifications)
	pipe := p.redis.Pipeline()
	for _, event := range events {
		channel := fmt.Sprintf("events:%s:%s", event.SourceType, event.SourceID)
		notification, err := json.Marshal(map[string]interface{}{
			"id":   event.ID,
			"type": event.Type,
		})
		if err != nil {
			// Best-effort: skip this notification, the event is already in Kafka
			p.logger.Warn("redis notification marshal failed", "error", err)
			continue
		}
		pipe.Publish(ctx, channel, notification)
	}
	if _, err := pipe.Exec(ctx); err != nil {
		p.logger.Warn("redis batch failed", "error", err)
	}
	return nil
}
Decision Tree
| Requirement | Stream | Why |
|---|---|---|
| Must not lose event | Kafka only | Ack required, replicated |
| User sees immediately | Redis only | Pushed to connected subscribers at once, typically within milliseconds; not persisted |
| Both durability + real-time | Dual stream | This pattern |
| High volume (>10k/sec) | Kafka, batch Redis | Redis can bottleneck |
| Many subscribers per channel | Redis + local fan-out | Don't hammer Redis |
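The "Redis + local fan-out" row can be sketched as an in-process hub: the gateway holds one upstream subscription per channel and copies each message to every locally attached client. The `Hub` type and its methods below are hypothetical names, and the Redis subscription itself is left out; the `first` and `last` return values only signal when the caller would open or close it.

```go
package main

import (
	"fmt"
	"sync"
)

// Hub fans one upstream message stream out to many local subscribers.
type Hub struct {
	mu   sync.Mutex
	subs map[string]map[chan []byte]struct{} // channel name -> local clients
}

func NewHub() *Hub {
	return &Hub{subs: make(map[string]map[chan []byte]struct{})}
}

// Subscribe registers a local client. first is true when the caller
// should open the upstream subscription for this channel.
func (h *Hub) Subscribe(channel string, buf int) (c chan []byte, first bool) {
	h.mu.Lock()
	defer h.mu.Unlock()
	set, ok := h.subs[channel]
	if !ok {
		set = make(map[chan []byte]struct{})
		h.subs[channel] = set
	}
	c = make(chan []byte, buf)
	set[c] = struct{}{}
	return c, !ok
}

// Unsubscribe removes a client. last is true when no local clients
// remain and the upstream subscription can be closed.
func (h *Hub) Unsubscribe(channel string, c chan []byte) (last bool) {
	h.mu.Lock()
	defer h.mu.Unlock()
	set := h.subs[channel]
	if _, member := set[c]; !member {
		return false
	}
	delete(set, c)
	close(c)
	if len(set) == 0 {
		delete(h.subs, channel)
		return true
	}
	return false
}

// Broadcast copies msg to every local client and returns how many got it.
// Clients with full buffers are skipped instead of blocking the hub.
func (h *Hub) Broadcast(channel string, msg []byte) (delivered int) {
	h.mu.Lock()
	defer h.mu.Unlock()
	for c := range h.subs[channel] {
		select {
		case c <- msg:
			delivered++
		default: // slow client: drop, consistent with fire-and-forget
		}
	}
	return delivered
}

func main() {
	h := NewHub()
	a, firstA := h.Subscribe("events:user:octocat", 1)
	b, firstB := h.Subscribe("events:user:octocat", 1)
	n := h.Broadcast("events:user:octocat", []byte(`{"id":"1"}`))
	fmt.Println(firstA, firstB, n, string(<-a), string(<-b))
	// true false 2 {"id":"1"} {"id":"1"}
}
```

Dropping messages for slow clients is a design choice in this sketch; since the events remain in Kafka, a client that falls behind can recover through the API.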
Related Skills
- Meta-skill: ai/skills/meta/realtime-dashboard/ — Complete realtime dashboard guide
- websocket-hub-patterns — WebSocket gateway
- backend/service-layer-architecture — Service integration
NEVER Do
- NEVER fail on Redis errors — Redis is best-effort. Log and continue.
- NEVER send full payload to Redis — Send IDs only, clients fetch from API.
- NEVER create one Redis channel per event — Use source-level channels.
- NEVER skip Kafka for "unimportant" events — All events go to Kafka for replay.
- NEVER use Redis Pub/Sub for persistence — Messages are fire-and-forget.
Edge Cases
| Case | Solution |
|---|---|
| Redis down | Log warning, continue with Kafka only |
| Client connects mid-stream | Query API for recent events, then subscribe |
| High channel cardinality | Subscribe by pattern (Redis PSUBSCRIBE) or aggregate into coarser channels |
| Kafka backpressure | Buffer in memory with timeout, fail if full |
| Need event replay | Consume from Kafka from offset, not Redis |
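The "Kafka backpressure" row (buffer in memory with a timeout, fail if full) can be sketched with a bounded channel. The names below (`Buffer`, `Enqueue`, `ErrBufferFull`) are hypothetical; a background goroutine reading from `Drain` and writing to Kafka is assumed but not shown.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// ErrBufferFull is returned when no space frees up within the timeout.
var ErrBufferFull = errors.New("publish buffer full")

// Buffer is a bounded in-memory queue placed in front of the Kafka writer.
type Buffer struct {
	queue chan []byte
}

func NewBuffer(capacity int) *Buffer {
	return &Buffer{queue: make(chan []byte, capacity)}
}

// Enqueue waits up to timeout for free space, then fails so the caller
// can surface the backpressure instead of growing memory without bound.
func (b *Buffer) Enqueue(ctx context.Context, msg []byte, timeout time.Duration) error {
	select { // fast path: space is available right now
	case b.queue <- msg:
		return nil
	default:
	}
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case b.queue <- msg:
		return nil
	case <-timer.C:
		return ErrBufferFull
	case <-ctx.Done():
		return ctx.Err()
	}
}

// Drain exposes the queue to the goroutine that writes to Kafka.
func (b *Buffer) Drain() <-chan []byte { return b.queue }

// tryFill enqueues n messages with no consumer attached and reports how
// many were accepted before the buffer rejected one.
func tryFill(capacity, n int) (accepted int, err error) {
	b := NewBuffer(capacity)
	for i := 0; i < n; i++ {
		if err := b.Enqueue(context.Background(), []byte("event"), 10*time.Millisecond); err != nil {
			return accepted, err
		}
		accepted++
	}
	return accepted, nil
}

func main() {
	fmt.Println(tryFill(2, 3)) // 2 publish buffer full
}
```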
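- Make sure OpenClaw is installed (locally or via Docker).
- Enter the install command in the chat box: /install dual-stream-architecture
- Once installed, invoke the Skill by name or trigger it with /dual-stream-architecture.
- Provide the inputs described in the Skill's parameter notes to get structured output.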
What is Dual Stream Architecture?
Dual-stream event publishing combining Kafka for durability with Redis Pub/Sub for real-time delivery. Use when building event-driven systems needing both guaranteed delivery and low-latency updates. Triggers on dual stream, event publishing, Kafka Redis, real-time events, pub/sub, streaming architecture. It is an AI Agent Skill plugin for Claude Code / OpenClaw, with 802 downloads to date.
How do I install Dual Stream Architecture?
Run the command "/install dual-stream-architecture" in the OpenClaw or Claude Code chat box for a one-step install; no extra configuration is needed.
Is Dual Stream Architecture free?
Yes. Dual Stream Architecture is completely free (free and open source) and can be downloaded, installed, and used freely.
Which platforms does Dual Stream Architecture support?
Dual Stream Architecture is cross-platform and runs in any environment where OpenClaw / Claude Code is deployed.
Who develops Dual Stream Architecture?
It is developed and maintained by wpank (@wpank); the current version is v1.0.0.