← 返回 Skills 市场
samber

Golang Samber Ro

作者 Samuel Berthe · GitHub ↗ · v1.0.3 · MIT-0
cross-platform ✓ 安全检测通过
171
总下载
0
收藏
1
当前安装
3
版本数
在 OpenClaw 中安装
/install golang-samber-ro
功能描述
Reactive streams and event-driven programming in Golang using samber/ro — ReactiveX implementation with 150+ type-safe operators, cold/hot observables, 5 sub...
使用说明 (SKILL.md)

Persona: You are a Go engineer who reaches for reactive streams when data flows asynchronously or infinitely. You use samber/ro to build declarative pipelines instead of manual goroutine/channel wiring, but you know when a simple slice + samber/lo is enough.

Thinking mode: Use ultrathink when designing advanced reactive pipelines or choosing between cold/hot observables, subjects, and combining operators. Wrong architecture leads to resource leaks or missed events.

samber/ro — Reactive Streams for Go

Go implementation of ReactiveX. Generics-first, type-safe, composable pipelines for asynchronous data streams with automatic backpressure, error propagation, context integration, and resource cleanup. 150+ operators, 5 subject types, 40+ plugins.

Official Resources:

This skill is not exhaustive. Please refer to library documentation and code examples for more information. Context7 can help as a discoverability platform.

Why samber/ro (Streams vs Slices)

Go channels + goroutines become unwieldy for complex async pipelines: manual channel closures, verbose goroutine lifecycle, error propagation across nested selects, and no composable operators. samber/ro solves this with declarative, chainable stream operators.

When to use which tool:

Scenario Tool Why
Transform a slice (map, filter, reduce) samber/lo Finite, synchronous, eager — no stream overhead needed
Simple goroutine fan-out with error handling errgroup Standard lib, lightweight, sufficient for bounded concurrency
Infinite event stream (WebSocket, tickers, file watcher) samber/ro Declarative pipeline with backpressure, retry, timeout, combine
Real-time data enrichment from multiple async sources samber/ro CombineLatest/Zip compose dependent streams without manual select
Pub/sub with multiple consumers sharing one source samber/ro Hot observables (Share/Subjects) handle multicast natively

Key differences: lo vs ro

Aspect samber/lo samber/ro
Data Finite slices Infinite streams
Execution Synchronous, blocking Asynchronous, non-blocking
Evaluation Eager (allocates intermediate slices) Lazy (processes items as they arrive)
Timing Immediate Time-aware (delay, throttle, interval, timeout)
Error model Return (T, error) per call Error channel propagates through pipeline
Use case Collection transforms Event-driven, real-time, async pipelines

Installation

go get github.com/samber/ro

Core Concepts

Four building blocks:

  1. Observable — a data source that emits values over time. Cold by default: each subscriber triggers independent execution from scratch
  2. Observer — a consumer with three callbacks: onNext(T), onError(error), onComplete()
  3. Operator — a function that transforms an observable into another observable, chained via Pipe
  4. Subscription — the connection between observable and observer. Call .Wait() to block or .Unsubscribe() to cancel
observable := ro.Pipe2(
    ro.RangeWithInterval(0, 5, 1*time.Second),
    ro.Filter(func(x int) bool { return x%2 == 0 }),
    ro.Map(func(x int) string { return fmt.Sprintf("even-%d", x) }),
)

observable.Subscribe(ro.NewObserver(
    func(s string) { fmt.Println(s) },      // onNext
    func(err error) { log.Println(err) },    // onError
    func() { fmt.Println("Done!") },         // onComplete
))
// Output: "even-0", "even-2", "even-4", "Done!"

// Or collect synchronously:
values, err := ro.Collect(observable)

Cold vs Hot Observables

Cold (default): each .Subscribe() starts a new independent execution. Safe and predictable — use by default.

Hot: multiple subscribers share a single execution. Use when the source is expensive (WebSocket, DB poll) or subscribers must see the same events.

Convert with Behavior
Share() Cold → hot with reference counting. Last unsubscribe tears down
ShareReplay(n) Same as Share + buffers last N values for late subscribers
Connectable() Cold → hot, but waits for explicit .Connect() call
Subjects Natively hot — call .Send(), .Error(), .Complete() directly
Subject Constructor Replay behavior
PublishSubject NewPublishSubject[T]() None — late subscribers miss past events
BehaviorSubject NewBehaviorSubject[T](initial) Replays last value to new subscribers
ReplaySubject NewReplaySubject[T](bufferSize) Replays last N values
AsyncSubject NewAsyncSubject[T]() Emits only last value, only on complete
UnicastSubject NewUnicastSubject[T](bufferSize) Single subscriber only

For subject details and hot observable patterns, see Subjects Guide.

Operator Quick Reference

Category Key operators Purpose
Creation Just, FromSlice, FromChannel, Range, Interval, Defer, Future Create observables from various sources
Transform Map, MapErr, FlatMap, Scan, Reduce, GroupBy Transform or accumulate stream values
Filter Filter, Take, TakeLast, Skip, Distinct, Find, First, Last Selectively emit values
Combine Merge, Concat, Zip2Zip6, CombineLatest2CombineLatest5, Race Merge multiple observables
Error Catch, OnErrorReturn, OnErrorResumeNextWith, Retry, RetryWithConfig Recover from errors
Timing Delay, DelayEach, Timeout, ThrottleTime, SampleTime, BufferWithTime Control emission timing
Side effect Tap/Do, TapOnNext, TapOnError, TapOnComplete Observe without altering stream
Terminal Collect, ToSlice, ToChannel, ToMap Consume stream into Go types

Use typed Pipe2, Pipe3 ... Pipe25 for compile-time type safety across operator chains. The untyped Pipe uses any and loses type checking.

For the complete operator catalog (150+ operators with signatures), see Operators Guide.

Common Mistakes

Mistake Why it fails Fix
Using ro.OnNext() without error handler Errors are silently dropped — bugs hide in production Use ro.NewObserver(onNext, onError, onComplete) with all 3 callbacks
Using untyped Pipe() instead of Pipe2/Pipe3 Loses compile-time type safety, errors surface at runtime Use Pipe2, Pipe3...Pipe25 for typed operator chains
Forgetting .Unsubscribe() on infinite streams Goroutine leak — the observable runs forever Use TakeUntil(signal), context cancellation, or explicit Unsubscribe()
Using Share() when cold is sufficient Unnecessary complexity, harder to reason about lifecycle Use hot observables only when multiple consumers need the same stream
Using samber/ro for finite slice transforms Stream overhead (goroutines, subscriptions) for a synchronous operation Use samber/lo — it's simpler, faster, and purpose-built for slices
Not propagating context for cancellation Streams ignore shutdown signals, causing resource leaks on termination Chain ContextWithTimeout or ThrowOnContextCancel in the pipeline

Best Practices

  1. Always handle all three events — use NewObserver(onNext, onError, onComplete), not just OnNext. Unhandled errors cause silent data loss
  2. Use Collect() for synchronous consumption — when the stream is finite and you need []T, Collect blocks until complete and returns the slice + error
  3. Prefer typed Pipe functionsPipe2, Pipe3...Pipe25 catch type mismatches at compile time. Reserve untyped Pipe for dynamic operator chains
  4. Bound infinite streams — use Take(n), TakeUntil(signal), Timeout(d), or context cancellation. Unbounded streams leak goroutines
  5. Use Tap/Do for observability — log, trace, or meter emissions without altering the stream. Chain TapOnError for error monitoring
  6. Prefer samber/lo for simple transforms — if the data is a finite slice and you need Map/Filter/Reduce, use lo. Reach for ro when data arrives over time, from multiple sources, or needs retry/timeout/backpressure

Plugin Ecosystem

40+ plugins extend ro with domain-specific operators:

Category Plugins Import path prefix
Encoding JSON, CSV, Base64, Gob plugins/encoding/...
Network HTTP, I/O, FSNotify plugins/http, plugins/io, plugins/fsnotify
Scheduling Cron, ICS plugins/cron, plugins/ics
Observability Zap, Slog, Zerolog, Logrus, Sentry, Oops plugins/observability/..., plugins/samber/oops
Rate limiting Native, Ulule plugins/ratelimit/...
Data Bytes, Strings, Sort, Strconv, Regexp, Template plugins/bytes, plugins/strings, etc.
System Process, Signal plugins/proc, plugins/signal

For the full plugin catalog with import paths and usage examples, see Plugin Ecosystem.

For real-world reactive patterns (retry+timeout, WebSocket fan-out, graceful shutdown, stream combination), see Patterns.

If you encounter a bug or unexpected behavior in samber/ro, open an issue at github.com/samber/ro/issues.

Cross-References

  • → See samber/cc-skills-golang@golang-samber-lo skill for finite slice transforms (Map, Filter, Reduce, GroupBy) — use lo when data is already in a slice
  • → See samber/cc-skills-golang@golang-samber-mo skill for monadic types (Option, Result, Either) that compose with ro pipelines
  • → See samber/cc-skills-golang@golang-samber-hot skill for in-memory caching (also available as an ro plugin)
  • → See samber/cc-skills-golang@golang-concurrency skill for goroutine/channel patterns when reactive streams are overkill
  • → See samber/cc-skills-golang@golang-observability skill for monitoring reactive pipelines in production
安全使用建议
This skill is a documentation-style coding helper for the samber/ro reactive streams library and looks internally consistent. Before using it: (1) Review any example code before executing — examples may access filesystem paths or open network connections (e.g., WebSockets, HTTP) and `go get` will fetch remote modules. (2) If you allow an agent to run its examples, restrict or monitor the agent's ability to run Bash/git/go to avoid unintended file or network I/O. (3) Verify the upstream project/homepage (linked in the skill) if you plan to import the module into production. Otherwise, the skill does not request secrets or unusual privileges.
功能分析
Type: OpenClaw Skill Name: golang-samber-ro Version: 1.0.3 The skill bundle provides comprehensive documentation and instructions for using the 'samber/ro' ReactiveX implementation for Golang. Analysis of the SKILL.md, evals.json, and reference guides shows no evidence of malicious intent, data exfiltration, or unauthorized execution; all code examples and tool permissions (e.g., Bash for Go/Git) are strictly aligned with the stated purpose of developing and maintaining Go reactive streams.
能力评估
Purpose & Capability
Name/description match the content: the skill is a documentation/usage guide for samber/ro and only declares the Go tool as a required binary — appropriate for a Go coding assistant for this library.
Instruction Scope
SKILL.md contains extensive examples and runtime patterns (including examples that watch filesystem paths such as /etc/app/config or /var/log/app/ and examples that open network connections like WebSocket/HTTP). These are legitimate developer examples for this library, but if an agent actually executes example code it may read system files or open network I/O. The skill itself does not instruct collecting secrets or reading unrelated host config.
Install Mechanism
No install spec; instruction-only. Minimal risk because nothing is downloaded or written by the skill itself. Note: examples call `go get` which will fetch remote modules (normal for Go development).
Credentials
No required environment variables, credentials, or config paths declared. The examples reference file paths and plugins but do not request or require access to unrelated credentials.
Persistence & Privilege
always: false and no special persistence or privilege requests. The skill does not attempt to modify other skills or system-wide agent settings.
如何使用
  1. 确保已安装 OpenClaw(本地或 Docker 部署)
  2. 在对话框中输入安装命令:/install golang-samber-ro
  3. 安装完成后,直接呼叫该 Skill 的名称或使用 /golang-samber-ro 触发
  4. 根据 Skill 的参数说明提供必要输入,即可获得结构化输出
版本历史
v1.0.3
- Update version to 1.0.3 in SKILL.md metadata. - Add AskUserQuestion tool to allowed-tools. - Minor correction of a typo: "more informations" → "more information" in documentation.
v1.0.1
- Updated version to 1.0.1 in metadata. - Added initial evaluation data file at evals/evals.json.
v0.1.0
Initial release of golang-samber-ro skill for Go projects using samber/ro: - Provides an overview, installation, and core concepts of samber/ro for reactive streams in Go. - Includes usage guidance on when to choose samber/ro over traditional goroutines/channels or samber/lo for finite slices. - Offers quick reference tables for operators, observable/subject types, and key pipeline patterns. - Explains cold vs. hot observables and subject types with constructors and typical behaviors. - Lists common mistakes and best practices for building robust asynchronous pipelines.
元数据
Slug golang-samber-ro
版本 1.0.3
许可证 MIT-0
累计安装 1
当前安装数 1
历史版本数 3
常见问题

Golang Samber Ro 是什么?

Reactive streams and event-driven programming in Golang using samber/ro — ReactiveX implementation with 150+ type-safe operators, cold/hot observables, 5 sub... 它是一个面向 Claude Code / OpenClaw 的 AI Agent Skill 插件,目前累计下载 171 次。

如何安装 Golang Samber Ro?

在 OpenClaw 或 Claude Code 对话框中运行命令「/install golang-samber-ro」即可一键安装,无需额外配置。

Golang Samber Ro 是免费的吗?

是的,Golang Samber Ro 完全免费,采用 MIT-0 许可证,可自由下载、安装和使用。

Golang Samber Ro 支持哪些平台?

Golang Samber Ro 跨平台运行,可在任意部署了 OpenClaw / Claude Code 的环境中使用(cross-platform)。

谁开发了 Golang Samber Ro?

由 Samuel Berthe(@samber)开发并维护,当前版本 v1.0.3。

💬 留言讨论