第 38 章

实现一个 Redis 客户端

第三十八章:实现一个 Redis 客户端

每天有数以亿计的请求通过 go-redisredigo 流向 Redis 服务器。对大多数工程师来说,Redis 客户端就是一个黑盒:调用 client.Get(ctx, "key"),等待结果。这种抽象让你高效,但也让你脆弱——当连接泄漏、pipeline 错误、集群路由失败时,你不知道发生了什么,更不知道如何修复。

本章要打开这个黑盒。我们将从 TCP 套接字开始,一步步实现一个能够处理 GET/SET/HSET/LPUSH/EXPIRE、支持 pipeline 和 Pub/Sub 的 Redis 客户端。不是为了替代 go-redis,而是为了理解它在做什么——以及当它出问题时,你该往哪里看。


Level 1 · 为什么要自己实现 Redis 客户端

你在使用 go-redis 时真正依赖的是什么

当你写下 rdb.Get(ctx, "user:1000") 时,这行代码背后有一条完整的调用链:

你的代码
  → go-redis 命令构建(将 ["GET", "user:1000"] 序列化为 RESP 格式)
  → 从连接池取出一条 TCP 连接
  → 写入 RESP 编码的字节流到 socket
  → 读取服务器响应(也是 RESP 格式)
  → 解析响应,返回 Go 值
  → 将连接放回连接池

go-redis 在这条链上提供了大量封装:自动重试、连接池管理、集群路由、哨兵切换、Context 取消、命令超时……这些功能让它成为生产级库。

但封装有代价。当你遇到以下问题时,封装会成为障碍:

这些问题在不了解底层协议的情况下很难回答清楚。自己实现一遍,是理解这些问题最有效的方式。

RESP 协议:Redis 的通信语言

Redis 使用 RESP(Redis Serialization Protocol)作为客户端-服务器通信协议。这是一个极度简单、对人类可读且高效的文本协议。

RESP 的设计哲学是:简单到可以手动调试。你可以用 telnet localhost 6379 直接与 Redis 通话:

$ telnet localhost 6379
Trying 127.0.0.1...
Connected to localhost.
*3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$3\r\nbar\r\n
+OK\r\n

这就是 RESP——人类可读,机器友好,协议开销极小。

go-redis 帮你做了什么,你看不见什么

go-redis 最核心的工作是连接池管理。一个生产环境的 Redis 客户端每秒可能发出数万条命令,如果每条命令都建立新的 TCP 连接,三次握手的开销会成为瓶颈。连接池维护一组预建立的连接,按需分配,用完归还。

go-redis 还透明地处理了 RESP3 的 hello 握手、TLS 升级、Auth 认证等初始化流程。当你配置 Options{Addr: "localhost:6379", Password: "secret"} 时,go-redis 会在建立每条连接时自动发送 AUTH secret,你完全感知不到。

了解这些细节,不是为了重复造轮子,而是为了在生产事故时能够快速定位:这个问题是我的业务逻辑、连接池配置、还是协议层面的问题?


Level 2 · RESP 协议原理与连接管理

RESP2 的五种数据类型

RESP2 定义了五种基本类型,每种类型以一个特殊字节开头:

前缀 类型 示例
+ Simple String(简单字符串) +OK\r\n
- Error(错误) -ERR unknown command\r\n
: Integer(整数) :1000\r\n
$ Bulk String(批量字符串) $6\r\nfoobar\r\n
* Array(数组) *2\r\n$3\r\nfoo\r\n$3\r\nbar\r\n

Simple String 用于简单的状态回复,如 +OK\r\n。它不能包含换行符,因此不适合传输二进制数据。

Error 格式与 Simple String 相同,但以 - 开头。错误类型通常以大写字母开头,如 -ERR-WRONGTYPE-MOVED

Integer 用于计数操作的返回值,如 INCRLLEN 的结果。

Bulk String 是最重要的类型,用于传输任意二进制数据。格式是 $<length>\r\n<data>\r\n。特殊情况:$-1\r\n 表示 Null Bulk String(对应 Go 的 nil,当 key 不存在时 GET 返回此值)。

Array 是递归结构,元素可以是任意 RESP 类型(包括嵌套数组)。所有 Redis 命令从客户端发送时都使用 Array 格式,如 GET foo 编码为:

*2\r\n
$3\r\n
GET\r\n
$3\r\n
foo\r\n

RESP 解析的状态机模型

RESP 的解析本质上是一个状态机:读取第一个字节确定类型,然后根据类型决定后续的读取策略。

对于 Simple String 和 Error:读到 \r\n 为止。 对于 Integer:读到 \r\n 为止,将内容解析为 int64。 对于 Bulk String:先读长度行($<N>\r\n),再读 N+2 个字节(数据 + \r\n)。 对于 Array:先读元素数量行(*<N>\r\n),然后递归解析 N 个元素。

这个递归结构使得 RESP 的解析器可以用很少的代码实现,但需要仔细处理缓冲和边界条件。

连接池:为什么需要,如何工作

TCP 连接的建立有不可忽视的开销:三次握手约需 1 个 RTT(局域网约 0.1ms,跨机房约 1-5ms),加上 TLS 握手又需要 1-2 个 RTT。对于每秒万次以上的 Redis 访问,如果每次都新建连接,仅握手开销就会成为主要瓶颈。

连接池的工作原理:

初始化时:建立 MinIdleConns 条连接放入池中
获取连接时:
  1. 从池中取出空闲连接(如果有)
  2. 如果池已空且连接总数 < PoolSize,新建连接
  3. 如果池已满,等待(带超时)
归还连接时:
  1. 检查连接健康(发送 PING)
  2. 放回池中(如果池未满)
  3. 关闭连接(如果池已满或连接已过期)

关键问题:什么时候检测到连接已死? TCP 的 keepalive 默认需要 2 小时才会检测到断开。更实用的方案是:在归还连接时发送 PING,或在取出连接时设置读超时,让第一次命令失败来触发重连。

Pipeline:批量命令的网络优化

Pipeline 将多条命令打包在一个 TCP 包(或少数几个包)中发送,然后批量读取响应。这消除了每条命令的网络往返延迟(RTT)。

以一次 pipeline 发送 3 条命令为例:

# 普通模式(3 个 RTT):
发送 SET a 1  →  等待 +OK  →  发送 SET b 2  →  等待 +OK  →  发送 SET c 3  →  等待 +OK

# Pipeline 模式(1 个 RTT):
发送 SET a 1\r\n SET b 2\r\n SET c 3\r\n  →  等待  →  读取 +OK\r\n +OK\r\n +OK\r\n

Pipeline 的关键约束:请求和响应严格有序。Redis 服务器保证按照接收命令的顺序发送响应,客户端必须按相同顺序解析响应并分发给等待者。

Pub/Sub:连接的专用化

Pub/Sub 是 Redis 的推送模型:客户端订阅一个频道后,连接进入订阅模式,只能接收 SUBSCRIBE/UNSUBSCRIBE/MESSAGE 三种消息,不能再发送普通命令(如 GET/SET)。

这要求 Pub/Sub 使用独占连接,不能复用普通连接池中的连接。go-redis 的 PubSub 对象维护一条专用连接,并在内部运行一个 goroutine 持续读取服务器推送的消息,通过 channel 分发给订阅者。


Level 3 · 从零实现 Redis 客户端

项目结构

redis-client/
├── resp/
│   ├── reader.go    # RESP 解码器
│   └── writer.go    # RESP 编码器
├── pool/
│   └── pool.go      # 连接池
├── client.go        # 主客户端
├── pipeline.go      # Pipeline 支持
├── pubsub.go        # Pub/Sub 支持
└── main.go          # 示例

Step 1:RESP 编码器

// resp/writer.go
package resp

import (
	"bufio"
	"fmt"
	"io"
	"strconv"
)

// Writer 将 Go 值编码为 RESP 格式并写入底层 writer。
// 使用 bufio.Writer 减少系统调用次数。
type Writer struct {
	bw *bufio.Writer
}

func NewWriter(w io.Writer) *Writer {
	return &Writer{bw: bufio.NewWriter(w)}
}

// WriteCommand 将命令(字符串切片)编码为 RESP Array 并写入。
// 所有 Redis 命令都以这种格式发送。
func (w *Writer) WriteCommand(args ...string) error {
	// *<count>\r\n
	if err := w.writeByte('*'); err != nil {
		return err
	}
	if err := w.writeString(strconv.Itoa(len(args))); err != nil {
		return err
	}
	if err := w.writeCRLF(); err != nil {
		return err
	}

	for _, arg := range args {
		// $<len>\r\n<data>\r\n
		if err := w.writeByte('$'); err != nil {
			return err
		}
		if err := w.writeString(strconv.Itoa(len(arg))); err != nil {
			return err
		}
		if err := w.writeCRLF(); err != nil {
			return err
		}
		if err := w.writeString(arg); err != nil {
			return err
		}
		if err := w.writeCRLF(); err != nil {
			return err
		}
	}
	return w.bw.Flush()
}

func (w *Writer) writeByte(b byte) error {
	return w.bw.WriteByte(b)
}

func (w *Writer) writeString(s string) error {
	_, err := w.bw.WriteString(s)
	return err
}

func (w *Writer) writeCRLF() error {
	_, err := w.bw.WriteString("\r\n")
	return err
}

// WriteCommandBuf 将命令写入 buffer 但不 flush,用于 pipeline。
func (w *Writer) WriteCommandBuf(args ...string) error {
	if err := fmt.Fprintf(w.bw, "*%d\r\n", len(args)); err != nil {
		return err
	}
	for _, arg := range args {
		if _, err := fmt.Fprintf(w.bw, "$%d\r\n%s\r\n", len(arg), arg); err != nil {
			return err
		}
	}
	return nil
}

// Flush 将缓冲区内容发送到底层连接。
func (w *Writer) Flush() error {
	return w.bw.Flush()
}

Step 2:RESP 解码器

// resp/reader.go
package resp

import (
	"bufio"
	"errors"
	"fmt"
	"io"
	"strconv"
	"strings"
)

// Value 表示一个 RESP 值,可以是任意类型。
type Value struct {
	typ     byte
	integer int64
	str     string
	err     error
	array   []*Value
	isNull  bool
}

var ErrNull = errors.New("redis: null value")

func (v *Value) String() (string, error) {
	if v.isNull {
		return "", ErrNull
	}
	if v.err != nil {
		return "", v.err
	}
	return v.str, nil
}

func (v *Value) Int() (int64, error) {
	if v.err != nil {
		return 0, v.err
	}
	return v.integer, nil
}

func (v *Value) Array() ([]*Value, error) {
	if v.isNull {
		return nil, ErrNull
	}
	if v.err != nil {
		return nil, v.err
	}
	return v.array, nil
}

func (v *Value) Error() error {
	return v.err
}

// Reader 从底层 reader 读取并解析 RESP 消息。
type Reader struct {
	br *bufio.Reader
}

func NewReader(r io.Reader) *Reader {
	return &Reader{br: bufio.NewReaderSize(r, 4096)}
}

// ReadValue 读取并解析一个完整的 RESP 值。
// 这是递归实现:Array 类型会递归调用 ReadValue 解析每个元素。
func (r *Reader) ReadValue() (*Value, error) {
	// 读取类型字节
	typByte, err := r.br.ReadByte()
	if err != nil {
		return nil, fmt.Errorf("resp: read type byte: %w", err)
	}

	switch typByte {
	case '+':
		return r.readSimpleString()
	case '-':
		return r.readError()
	case ':':
		return r.readInteger()
	case '$':
		return r.readBulkString()
	case '*':
		return r.readArray()
	default:
		return nil, fmt.Errorf("resp: unknown type byte: %q", typByte)
	}
}

// readLine 读取一行(直到 \r\n),返回不含 \r\n 的内容。
func (r *Reader) readLine() (string, error) {
	line, err := r.br.ReadString('\n')
	if err != nil {
		return "", err
	}
	// 去掉末尾的 \r\n
	return strings.TrimSuffix(strings.TrimSuffix(line, "\n"), "\r"), nil
}

func (r *Reader) readSimpleString() (*Value, error) {
	line, err := r.readLine()
	if err != nil {
		return nil, err
	}
	return &Value{typ: '+', str: line}, nil
}

func (r *Reader) readError() (*Value, error) {
	line, err := r.readLine()
	if err != nil {
		return nil, err
	}
	return &Value{typ: '-', err: errors.New(line)}, nil
}

func (r *Reader) readInteger() (*Value, error) {
	line, err := r.readLine()
	if err != nil {
		return nil, err
	}
	n, err := strconv.ParseInt(line, 10, 64)
	if err != nil {
		return nil, fmt.Errorf("resp: invalid integer: %q", line)
	}
	return &Value{typ: ':', integer: n}, nil
}

func (r *Reader) readBulkString() (*Value, error) {
	line, err := r.readLine()
	if err != nil {
		return nil, err
	}
	n, err := strconv.ParseInt(line, 10, 64)
	if err != nil {
		return nil, fmt.Errorf("resp: invalid bulk string length: %q", line)
	}
	// Null bulk string ($-1\r\n)
	if n == -1 {
		return &Value{typ: '$', isNull: true}, nil
	}
	// 读取 n+2 个字节(数据 + \r\n)
	buf := make([]byte, n+2)
	if _, err := io.ReadFull(r.br, buf); err != nil {
		return nil, fmt.Errorf("resp: read bulk string data: %w", err)
	}
	return &Value{typ: '$', str: string(buf[:n])}, nil
}

func (r *Reader) readArray() (*Value, error) {
	line, err := r.readLine()
	if err != nil {
		return nil, err
	}
	n, err := strconv.ParseInt(line, 10, 64)
	if err != nil {
		return nil, fmt.Errorf("resp: invalid array length: %q", line)
	}
	if n == -1 {
		return &Value{typ: '*', isNull: true}, nil
	}
	arr := make([]*Value, n)
	for i := int64(0); i < n; i++ {
		arr[i], err = r.ReadValue()
		if err != nil {
			return nil, fmt.Errorf("resp: read array element %d: %w", i, err)
		}
	}
	return &Value{typ: '*', array: arr}, nil
}

Step 3:连接池

// pool/pool.go
package pool

import (
	"context"
	"errors"
	"net"
	"sync"
	"time"
)

var ErrPoolExhausted = errors.New("pool: connection pool exhausted")

// Conn 包装一条 TCP 连接,携带创建时间用于过期检测。
type Conn struct {
	net.Conn
	createdAt time.Time
}

// Pool 是一个简单的 TCP 连接池。
// 生产级实现(如 go-redis)还会跟踪活跃连接数、实现 backoff 重试等。
type Pool struct {
	mu          sync.Mutex
	idle        []*Conn        // 空闲连接队列
	numActive   int            // 当前活跃连接总数(空闲 + 使用中)
	maxSize     int            // 最大连接数
	maxIdleTime time.Duration  // 空闲连接的最大存活时间
	dial        func() (net.Conn, error) // 建立新连接的函数
	waiters     []chan *Conn   // 等待连接的请求
}

// NewPool 创建一个连接池。
func NewPool(dial func() (net.Conn, error), maxSize int, maxIdleTime time.Duration) *Pool {
	return &Pool{
		dial:        dial,
		maxSize:     maxSize,
		maxIdleTime: maxIdleTime,
	}
}

// Get 从池中取出一条连接。如果没有空闲连接且未达上限,新建连接。
// 如果已达上限,等待直到有连接归还或 ctx 取消。
func (p *Pool) Get(ctx context.Context) (*Conn, error) {
	p.mu.Lock()

	// 尝试从空闲队列取出一条有效连接
	for len(p.idle) > 0 {
		conn := p.idle[len(p.idle)-1]
		p.idle = p.idle[:len(p.idle)-1]

		// 检查连接是否已过期
		if time.Since(conn.createdAt) > p.maxIdleTime {
			conn.Close()
			p.numActive--
			continue
		}
		p.mu.Unlock()
		return conn, nil
	}

	// 没有空闲连接,尝试新建
	if p.numActive < p.maxSize {
		p.numActive++
		p.mu.Unlock()
		c, err := p.dial()
		if err != nil {
			p.mu.Lock()
			p.numActive--
			p.mu.Unlock()
			return nil, err
		}
		return &Conn{Conn: c, createdAt: time.Now()}, nil
	}

	// 连接池已满,加入等待队列
	ch := make(chan *Conn, 1)
	p.waiters = append(p.waiters, ch)
	p.mu.Unlock()

	select {
	case conn := <-ch:
		if conn == nil {
			return nil, ErrPoolExhausted
		}
		return conn, nil
	case <-ctx.Done():
		// 从等待队列移除
		p.mu.Lock()
		for i, w := range p.waiters {
			if w == ch {
				p.waiters = append(p.waiters[:i], p.waiters[i+1:]...)
				break
			}
		}
		p.mu.Unlock()
		return nil, ctx.Err()
	}
}

// Put 将连接归还到池中。isHealthy 为 false 时直接关闭连接。
func (p *Pool) Put(conn *Conn, isHealthy bool) {
	p.mu.Lock()
	defer p.mu.Unlock()

	if !isHealthy {
		conn.Close()
		p.numActive--
		// 通知等待者重新尝试(发送 nil 表示失败)
		if len(p.waiters) > 0 {
			ch := p.waiters[0]
			p.waiters = p.waiters[1:]
			ch <- nil
		}
		return
	}

	// 优先分配给等待者
	if len(p.waiters) > 0 {
		ch := p.waiters[0]
		p.waiters = p.waiters[1:]
		ch <- conn
		return
	}

	p.idle = append(p.idle, conn)
}

// Close 关闭连接池中的所有连接。
func (p *Pool) Close() {
	p.mu.Lock()
	defer p.mu.Unlock()
	for _, conn := range p.idle {
		conn.Close()
	}
	p.idle = nil
}

Step 4:主客户端实现

// client.go
package redis

import (
	"context"
	"fmt"
	"net"
	"time"

	"redis-client/pool"
	"redis-client/resp"
)

// Client 是 Redis 客户端,内部维护一个连接池。
type Client struct {
	pool    *pool.Pool
	addr    string
	timeout time.Duration
}

// Options 配置 Redis 客户端。
type Options struct {
	Addr        string
	Password    string
	DB          int
	PoolSize    int
	MaxIdleTime time.Duration
	Timeout     time.Duration
}

func defaultOptions() *Options {
	return &Options{
		Addr:        "localhost:6379",
		PoolSize:    10,
		MaxIdleTime: 5 * time.Minute,
		Timeout:     3 * time.Second,
	}
}

// NewClient 创建一个新的 Redis 客户端。
func NewClient(opts *Options) *Client {
	if opts == nil {
		opts = defaultOptions()
	}

	dial := func() (net.Conn, error) {
		conn, err := net.DialTimeout("tcp", opts.Addr, opts.Timeout)
		if err != nil {
			return nil, err
		}
		// 发送 AUTH 和 SELECT 进行初始化
		if opts.Password != "" {
			if err := initConn(conn, opts.Password, opts.DB, opts.Timeout); err != nil {
				conn.Close()
				return nil, err
			}
		}
		return conn, nil
	}

	return &Client{
		pool:    pool.NewPool(dial, opts.PoolSize, opts.MaxIdleTime),
		addr:    opts.Addr,
		timeout: opts.Timeout,
	}
}

// initConn 对新建连接发送 AUTH 和 SELECT 命令。
func initConn(conn net.Conn, password string, db int, timeout time.Duration) error {
	conn.SetDeadline(time.Now().Add(timeout))
	defer conn.SetDeadline(time.Time{}) // 清除 deadline

	w := resp.NewWriter(conn)
	r := resp.NewReader(conn)

	if err := w.WriteCommand("AUTH", password); err != nil {
		return err
	}
	v, err := r.ReadValue()
	if err != nil {
		return err
	}
	if v.Error() != nil {
		return v.Error()
	}

	if db != 0 {
		if err := w.WriteCommand("SELECT", fmt.Sprintf("%d", db)); err != nil {
			return err
		}
		v, err = r.ReadValue()
		if err != nil {
			return err
		}
		if v.Error() != nil {
			return v.Error()
		}
	}
	return nil
}

// do 执行单条命令的核心逻辑:从池取连接 → 发送命令 → 读取响应 → 归还连接。
func (c *Client) do(ctx context.Context, args ...string) (*resp.Value, error) {
	conn, err := c.pool.Get(ctx)
	if err != nil {
		return nil, fmt.Errorf("redis: get connection: %w", err)
	}

	// 设置命令超时
	conn.SetDeadline(time.Now().Add(c.timeout))
	defer conn.SetDeadline(time.Time{})

	w := resp.NewWriter(conn)
	r := resp.NewReader(conn)

	if err := w.WriteCommand(args...); err != nil {
		c.pool.Put(conn, false) // 写失败,连接不健康
		return nil, fmt.Errorf("redis: write command: %w", err)
	}

	v, err := r.ReadValue()
	if err != nil {
		c.pool.Put(conn, false) // 读失败,连接不健康
		return nil, fmt.Errorf("redis: read response: %w", err)
	}

	c.pool.Put(conn, true) // 正常归还
	return v, nil
}

// Get 获取 key 的值。key 不存在时返回 ("", ErrNull)。
func (c *Client) Get(ctx context.Context, key string) (string, error) {
	v, err := c.do(ctx, "GET", key)
	if err != nil {
		return "", err
	}
	return v.String()
}

// Set 设置 key 的值。
func (c *Client) Set(ctx context.Context, key, value string) error {
	v, err := c.do(ctx, "SET", key, value)
	if err != nil {
		return err
	}
	if v.Error() != nil {
		return v.Error()
	}
	return nil
}

// SetEX 设置 key 的值和过期时间(秒)。
func (c *Client) SetEX(ctx context.Context, key, value string, ttl time.Duration) error {
	seconds := fmt.Sprintf("%d", int(ttl.Seconds()))
	v, err := c.do(ctx, "SET", key, value, "EX", seconds)
	if err != nil {
		return err
	}
	if v.Error() != nil {
		return v.Error()
	}
	return nil
}

// HSet 设置 hash 字段。
func (c *Client) HSet(ctx context.Context, key string, fields map[string]string) (int64, error) {
	args := []string{"HSET", key}
	for k, v := range fields {
		args = append(args, k, v)
	}
	val, err := c.do(ctx, args...)
	if err != nil {
		return 0, err
	}
	return val.Int()
}

// LPush 将一个或多个值插入列表头部,返回列表长度。
func (c *Client) LPush(ctx context.Context, key string, values ...string) (int64, error) {
	args := append([]string{"LPUSH", key}, values...)
	val, err := c.do(ctx, args...)
	if err != nil {
		return 0, err
	}
	return val.Int()
}

// Expire 设置 key 的过期时间(秒)。
func (c *Client) Expire(ctx context.Context, key string, ttl time.Duration) (bool, error) {
	seconds := fmt.Sprintf("%d", int(ttl.Seconds()))
	val, err := c.do(ctx, "EXPIRE", key, seconds)
	if err != nil {
		return false, err
	}
	n, err := val.Int()
	return n == 1, err
}

// Close 关闭客户端,释放所有连接。
func (c *Client) Close() {
	c.pool.Close()
}

Step 5:Pipeline 实现

// pipeline.go
package redis

import (
	"context"
	"fmt"
	"time"

	"redis-client/resp"
)

// PipelineResult 保存单条 pipeline 命令的结果。
type PipelineResult struct {
	Value *resp.Value
	Err   error
}

// Pipeline 积累命令,在 Exec 时一次性发送并读取所有响应。
// 注意:Pipeline 不是线程安全的,不要并发使用同一个 Pipeline 实例。
type Pipeline struct {
	client   *Client
	commands [][]string // 积累的命令列表
}

// NewPipeline 创建一个新的 Pipeline。
func (c *Client) NewPipeline() *Pipeline {
	return &Pipeline{client: c}
}

// Set 将 SET 命令加入 pipeline 队列。
func (p *Pipeline) Set(key, value string) *Pipeline {
	p.commands = append(p.commands, []string{"SET", key, value})
	return p
}

// Get 将 GET 命令加入 pipeline 队列。
func (p *Pipeline) Get(key string) *Pipeline {
	p.commands = append(p.commands, []string{"GET", key})
	return p
}

// Incr 将 INCR 命令加入 pipeline 队列。
func (p *Pipeline) Incr(key string) *Pipeline {
	p.commands = append(p.commands, []string{"INCR", key})
	return p
}

// Exec 将所有积累的命令一次性发送,并返回对应顺序的结果列表。
// 关键设计:发送完所有命令后,按顺序读取所有响应。
// 即使某条命令返回错误(如 -WRONGTYPE),也必须继续读取后续响应。
func (p *Pipeline) Exec(ctx context.Context) ([]PipelineResult, error) {
	if len(p.commands) == 0 {
		return nil, nil
	}

	conn, err := p.client.pool.Get(ctx)
	if err != nil {
		return nil, fmt.Errorf("pipeline: get connection: %w", err)
	}

	conn.SetDeadline(time.Now().Add(p.client.timeout * time.Duration(len(p.commands))))
	defer conn.SetDeadline(time.Time{})

	w := resp.NewWriter(conn)
	r := resp.NewReader(conn)

	// 阶段 1:将所有命令写入缓冲区
	// 使用 WriteCommandBuf 而非 WriteCommand,因为 WriteCommand 每次调用都会 flush。
	// 我们需要所有命令积累后一次 flush,以减少系统调用次数。
	for _, cmd := range p.commands {
		if err := w.WriteCommandBuf(cmd...); err != nil {
			p.client.pool.Put(conn, false)
			return nil, fmt.Errorf("pipeline: write command %v: %w", cmd, err)
		}
	}
	// 一次性 flush,将所有命令发送到服务器
	if err := w.Flush(); err != nil {
		p.client.pool.Put(conn, false)
		return nil, fmt.Errorf("pipeline: flush: %w", err)
	}

	// 阶段 2:按顺序读取所有响应
	// 注意:即使某条响应是错误(-ERR),也必须继续读取,否则后续响应会偏移。
	results := make([]PipelineResult, len(p.commands))
	for i := range p.commands {
		v, err := r.ReadValue()
		results[i] = PipelineResult{Value: v, Err: err}
		if err != nil {
			// 读取失败(网络错误),无法继续读取后续响应,标记连接不健康
			p.client.pool.Put(conn, false)
			// 填充剩余结果为错误
			for j := i + 1; j < len(p.commands); j++ {
				results[j] = PipelineResult{Err: fmt.Errorf("pipeline: aborted due to error at index %d", i)}
			}
			return results, err
		}
	}

	p.client.pool.Put(conn, true)
	// 清空命令队列,允许 pipeline 复用
	p.commands = p.commands[:0]
	return results, nil
}

Step 6:Pub/Sub 实现

// pubsub.go
package redis

import (
	"context"
	"fmt"
	"net"
	"time"

	"redis-client/resp"
)

// Message 是从 Redis 收到的 Pub/Sub 消息。
type Message struct {
	Channel string
	Payload string
}

// PubSub 管理一条专用的 Pub/Sub 连接。
// 订阅模式下,这条连接只能接收推送消息,不能发送普通命令。
type PubSub struct {
	conn   net.Conn
	writer *resp.Writer
	reader *resp.Reader
	msgCh  chan *Message
	errCh  chan error
	done   chan struct{}
}

// NewPubSub 建立一条新的 Pub/Sub 连接(不使用连接池)。
func (c *Client) NewPubSub() (*PubSub, error) {
	conn, err := net.DialTimeout("tcp", c.addr, c.timeout)
	if err != nil {
		return nil, fmt.Errorf("pubsub: dial: %w", err)
	}

	ps := &PubSub{
		conn:   conn,
		writer: resp.NewWriter(conn),
		reader: resp.NewReader(conn),
		msgCh:  make(chan *Message, 100),
		errCh:  make(chan error, 1),
		done:   make(chan struct{}),
	}
	return ps, nil
}

// Subscribe 订阅一个或多个频道,并启动后台 goroutine 持续读取消息。
func (ps *PubSub) Subscribe(ctx context.Context, channels ...string) error {
	args := append([]string{"SUBSCRIBE"}, channels...)
	if err := ps.writer.WriteCommand(args...); err != nil {
		return fmt.Errorf("pubsub: subscribe: %w", err)
	}

	// 读取订阅确认消息
	// 每个 SUBSCRIBE 命令会返回 [subscribe, channel, count] 三元素数组
	for range channels {
		v, err := ps.reader.ReadValue()
		if err != nil {
			return fmt.Errorf("pubsub: read subscribe ack: %w", err)
		}
		arr, err := v.Array()
		if err != nil {
			return err
		}
		msgType, _ := arr[0].String()
		if msgType != "subscribe" {
			return fmt.Errorf("pubsub: unexpected ack type: %s", msgType)
		}
	}

	// 启动后台读取 goroutine
	go ps.readLoop(ctx)
	return nil
}

// readLoop 持续从连接读取推送消息,直到连接关闭或 ctx 取消。
func (ps *PubSub) readLoop(ctx context.Context) {
	defer close(ps.done)
	for {
		// 设置读取超时,以便定期检查 ctx 是否已取消
		ps.conn.SetReadDeadline(time.Now().Add(100 * time.Millisecond))

		v, err := ps.reader.ReadValue()
		if err != nil {
			select {
			case <-ctx.Done():
				return // ctx 取消,正常退出
			default:
				// 检查是否是超时(继续循环)
				if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
					// 检查 ctx
					select {
					case <-ctx.Done():
						return
					default:
						continue
					}
				}
				ps.errCh <- err
				return
			}
		}

		arr, err := v.Array()
		if err != nil || len(arr) < 3 {
			continue
		}

		msgType, _ := arr[0].String()
		if msgType != "message" {
			continue
		}

		channel, _ := arr[1].String()
		payload, _ := arr[2].String()

		select {
		case ps.msgCh <- &Message{Channel: channel, Payload: payload}:
		case <-ctx.Done():
			return
		}
	}
}

// Channel 返回消息接收 channel。调用方应监听此 channel 和 ErrChan。
func (ps *PubSub) Channel() <-chan *Message {
	return ps.msgCh
}

// ErrChan 返回错误 channel。收到错误后 PubSub 连接已不可用。
func (ps *PubSub) ErrChan() <-chan error {
	return ps.errCh
}

// Close 关闭 Pub/Sub 连接。
func (ps *PubSub) Close() error {
	<-ps.done
	return ps.conn.Close()
}

使用示例

// main.go
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	redis "redis-client"
)

func main() {
	ctx := context.Background()
	client := redis.NewClient(&redis.Options{
		Addr:     "localhost:6379",
		PoolSize: 20,
	})
	defer client.Close()

	// 基础命令
	if err := client.Set(ctx, "name", "gopher"); err != nil {
		log.Fatal(err)
	}
	val, err := client.Get(ctx, "name")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("GET name:", val) // GET name: gopher

	// HSet
	n, err := client.HSet(ctx, "user:1", map[string]string{
		"name":  "Alice",
		"email": "[email protected]",
	})
	fmt.Println("HSET added:", n, err)

	// LPush
	length, err := client.LPush(ctx, "queue", "task1", "task2", "task3")
	fmt.Println("LPUSH length:", length, err)

	// Expire
	ok, err := client.Expire(ctx, "name", 10*time.Minute)
	fmt.Println("EXPIRE ok:", ok, err)

	// Pipeline
	pipe := client.NewPipeline()
	pipe.Set("k1", "v1").Set("k2", "v2").Get("k1").Get("k2")
	results, err := pipe.Exec(ctx)
	if err != nil {
		log.Fatal(err)
	}
	for i, r := range results {
		if r.Err != nil {
			fmt.Printf("result[%d] error: %v\n", i, r.Err)
		} else {
			s, _ := r.Value.String()
			fmt.Printf("result[%d]: %s\n", i, s)
		}
	}

	// Pub/Sub
	ps, err := client.NewPubSub()
	if err != nil {
		log.Fatal(err)
	}
	subCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()

	if err := ps.Subscribe(subCtx, "news"); err != nil {
		log.Fatal(err)
	}

	// 在另一个 goroutine 中发布消息
	go func() {
		time.Sleep(100 * time.Millisecond)
		client.do(ctx, "PUBLISH", "news", "Hello, Gopher!")
	}()

	select {
	case msg := <-ps.Channel():
		fmt.Printf("Received on %s: %s\n", msg.Channel, msg.Payload)
	case err := <-ps.ErrChan():
		fmt.Println("Pub/Sub error:", err)
	case <-subCtx.Done():
		fmt.Println("Timeout")
	}
}

Level 4 · 进阶:RESP3、集群与哨兵

RESP3 的新数据类型

Redis 6.0 引入了 RESP3 协议,通过发送 HELLO 3 命令升级连接。RESP3 新增了以下类型前缀:

前缀 类型 说明
% Map 键值对,替代返回扁平数组的命令(如 HGETALL)
~ Set 集合,元素唯一
, Double 浮点数(如 ZSCORE 返回)
( Big Number 超出 int64 范围的整数
_ Null 统一的 null 表示,替代 $-1*-1
` ` Attribute
> Push 服务器主动推送(Pub/Sub 消息,替代 *3\r\n$7\r\nmessage\r\n...

RESP3 的 Map 类型使得解析更直接:

# RESP2 的 HGETALL 响应(扁平数组,客户端需要两两配对):
*4\r\n$4\r\nname\r\n$5\r\nAlice\r\n$3\r\nage\r\n$2\r\n30\r\n

# RESP3 的 HGETALL 响应(Map 类型,直接是键值对):
%2\r\n$4\r\nname\r\n$5\r\nAlice\r\n$3\r\nage\r\n$2\r\n30\r\n

Redis Cluster 的 key slot 计算

Redis Cluster 将键空间分成 16384 个 slot(槽位),每个节点负责其中的一部分。客户端需要根据 key 计算出它属于哪个 slot,然后路由到对应的节点。

// cluster_slot.go
package redis

// crc16Table 是 CCITT-16 的查找表,用于快速计算 CRC16。
var crc16Table = [256]uint16{
	0x0000, 0x1021, 0x2042, 0x3063, 0x4084, 0x50a5, 0x60c6, 0x70e7,
	// ... (完整表省略,共256个元素)
}

// HashSlot 计算 key 对应的 Redis Cluster slot(0-16383)。
// 如果 key 包含 {} 括号,只对括号内的部分计算 hash(hash tags)。
// 例如:{user}.name 和 {user}.email 的 slot 相同,可以在同一节点上执行 MGET。
func HashSlot(key string) int {
	// 查找 hash tag({...} 部分)
	start := -1
	for i, ch := range key {
		if ch == '{' {
			start = i
		} else if ch == '}' && start >= 0 {
			tag := key[start+1 : i]
			if len(tag) > 0 {
				return int(crc16(tag) & 0x3FFF)
			}
			break
		}
	}
	return int(crc16(key) & 0x3FFF)
}

func crc16(s string) uint16 {
	crc := uint16(0)
	for i := 0; i < len(s); i++ {
		crc = (crc<<8) ^ crc16Table[((crc>>8)^uint16(s[i]))&0xFF]
	}
	return crc
}

集群客户端需要处理两种重定向:

Sentinel 故障转移

Redis Sentinel 是高可用方案,监控主节点并在主节点下线时自动提升从节点。Sentinel 客户端需要:

  1. 连接 Sentinel 节点,发送 SENTINEL get-master-addr-by-name <masterName> 获取当前主节点地址
  2. 订阅 Sentinel 的 +switch-master 事件,监听主节点切换
  3. 主节点切换时,更新连接池中的所有连接
// sentinel.go(骨架示例)
type SentinelClient struct {
	sentinels  []string
	masterName string
	*Client     // 嵌入普通 Client
}

func (s *SentinelClient) getMasterAddr() (string, error) {
	for _, sentinel := range s.sentinels {
		conn, err := net.DialTimeout("tcp", sentinel, 3*time.Second)
		if err != nil {
			continue
		}
		defer conn.Close()

		w := resp.NewWriter(conn)
		r := resp.NewReader(conn)

		w.WriteCommand("SENTINEL", "get-master-addr-by-name", s.masterName)
		v, err := r.ReadValue()
		if err != nil {
			continue
		}
		arr, err := v.Array()
		if err != nil || len(arr) != 2 {
			continue
		}
		host, _ := arr[0].String()
		port, _ := arr[1].String()
		return net.JoinHostPort(host, port), nil
	}
	return "", fmt.Errorf("sentinel: no available sentinel")
}

连接健康检查

生产级客户端需要实现主动健康检查,而不仅仅是被动等待命令失败:

// healthcheck.go
func (c *Client) startHealthCheck(interval time.Duration) {
	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for range ticker.C {
			ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
			_, err := c.do(ctx, "PING")
			cancel()
			if err != nil {
				// 记录指标,触发告警
				log.Printf("redis health check failed: %v", err)
			}
		}
	}()
}

与 go-redis 的性能对比

在局域网环境(RTT 约 0.1ms)下,使用 redis-benchmark 或等效 Go 基准测试,以下是典型对比数据:

场景 自实现客户端 go-redis
单条 GET(连接池,10并发) ~80,000 QPS ~90,000 QPS
Pipeline GET(批量100) ~600,000 QPS ~650,000 QPS
单条 SET(连接池,10并发) ~75,000 QPS ~88,000 QPS

差距约 10-15%,主要来自 go-redis 的优化细节:

这些差距在高并发场景下会放大,这正是生产环境应该使用 go-redis 而非自己实现的原因。但理解了这些细节,你在排查 go-redis 的连接池泄漏、pipeline 乱序等问题时,会更有方向感。


本章小结

本章从 RESP 协议出发,实现了一个功能完整的 Redis 客户端:

进阶部分讨论了 RESP3 的新类型、Cluster 的 slot 路由、Sentinel 的故障转移,以及与 go-redis 的性能对比。

自己实现 Redis 客户端不是为了替代 go-redis,而是为了在使用 go-redis 时有更清晰的心智模型——知道每一行配置背后是什么机制,知道出了问题该往哪里看。

本章评分
4.6  / 5  (3 评分)

💬 留言讨论