Chapter 38

Build a Redis Client

Chapter 38: Build a Redis Client

Billions of requests flow through go-redis or redigo to Redis servers every day. For most engineers, a Redis client is a black box: call client.Get(ctx, "key"), wait for the result. That abstraction makes you productive โ€” but also fragile. When connections leak, pipelines misorder, or cluster routing fails, you don't know what happened or where to look.

This chapter opens that black box. Starting from a raw TCP socket, we will build a Redis client step by step: one that handles GET/SET/HSET/LPUSH/EXPIRE, supports pipelining, and implements Pub/Sub subscription. Not to replace go-redis, but to understand what it does โ€” and to know where to look when it goes wrong.


Level 1 ยท Why Build Your Own Redis Client

What You Are Really Depending On When Using go-redis

When you write rdb.Get(ctx, "user:1000"), there is a complete call chain behind that one line:

Your code
  โ†’ go-redis command construction (serializes ["GET", "user:1000"] to RESP format)
  โ†’ Acquires a TCP connection from the connection pool
  โ†’ Writes RESP-encoded bytes to the socket
  โ†’ Reads the server response (also RESP format)
  โ†’ Parses the response, returns a Go value
  โ†’ Returns the connection to the pool

go-redis provides extensive wrapping at every stage: automatic retries, connection pool management, cluster routing, sentinel failover, context cancellation, command timeouts... These features make it a production-grade library.

But abstraction has a cost. When you encounter the following problems, abstraction becomes an obstacle:

These questions are hard to answer clearly without understanding the underlying protocol. Implementing one yourself is the most effective way to understand them.

The RESP Protocol: Redis's Communication Language

Redis uses RESP (Redis Serialization Protocol) as its client-server communication protocol. It is an extremely simple, human-readable, and efficient text protocol.

RESP's design philosophy is: simple enough to debug by hand. You can talk directly to Redis using telnet localhost 6379:

$ telnet localhost 6379
Trying 127.0.0.1...
Connected to localhost.
*3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$3\r\nbar\r\n
+OK\r\n

That is RESP โ€” human-readable, machine-friendly, minimal protocol overhead.

What go-redis Does For You That You Cannot See

The most central thing go-redis does is connection pool management. A production Redis client may issue tens of thousands of commands per second. If every command established a new TCP connection, the three-way handshake overhead would become the bottleneck. The connection pool maintains a set of pre-established connections, allocates them on demand, and returns them when done.

go-redis also transparently handles the RESP3 HELLO handshake, TLS upgrade, and Auth authentication for every new connection. When you configure Options{Addr: "localhost:6379", Password: "secret"}, go-redis automatically sends AUTH secret when establishing each connection โ€” completely transparent to you.

Understanding these details is not about reinventing the wheel. It is about being able to quickly locate production issues: is this problem in my business logic, my connection pool configuration, or at the protocol level?


Level 2 ยท RESP Protocol Internals and Connection Management

The Five Data Types of RESP2

RESP2 defines five base types, each starting with a distinct prefix byte:

Prefix Type Example
+ Simple String +OK\r\n
- Error -ERR unknown command\r\n
: Integer :1000\r\n
$ Bulk String $6\r\nfoobar\r\n
* Array *2\r\n$3\r\nfoo\r\n$3\r\nbar\r\n

Simple String is used for simple status replies like +OK\r\n. It cannot contain newlines, so it is not suitable for binary data.

Error has the same format as Simple String but starts with -. Error types typically begin with uppercase, such as -ERR, -WRONGTYPE, -MOVED.

Integer is used for return values from counting operations like INCR and LLEN.

Bulk String is the most important type, used to transmit arbitrary binary data. The format is $<length>\r\n<data>\r\n. Special case: $-1\r\n represents a Null Bulk String (corresponds to Go's nil, returned by GET when a key does not exist).

Array is a recursive structure whose elements can be any RESP type (including nested arrays). All Redis commands are sent from clients in Array format; for example, GET foo is encoded as:

*2\r\n
$3\r\n
GET\r\n
$3\r\n
foo\r\n

The State Machine Model for RESP Parsing

Parsing RESP is fundamentally a state machine: read the first byte to determine the type, then decide the subsequent reading strategy based on that type.

For Simple String and Error: read until \r\n. For Integer: read until \r\n, parse content as int64. For Bulk String: read the length line ($<N>\r\n), then read N+2 bytes (data + \r\n). For Array: read the element count line (*<N>\r\n), then recursively parse N elements.

This recursive structure means a RESP parser can be implemented with very little code, but requires careful handling of buffering and boundary conditions.

Connection Pools: Why They Are Needed and How They Work

Establishing a TCP connection has non-negligible overhead: the three-way handshake takes roughly 1 RTT (about 0.1ms on LAN, 1-5ms cross-datacenter), and TLS handshake adds another 1-2 RTTs. For Redis access rates above ten thousand per second, if each call established a new connection, the handshake overhead alone would become the primary bottleneck.

How a connection pool works:

On initialization: establish MinIdleConns connections and place them in the pool
On Get:
  1. Take an idle connection from the pool (if available)
  2. If pool is empty and total connections < PoolSize, create new connection
  3. If pool is full, wait (with timeout)
On Put:
  1. Check connection health (send PING)
  2. Return to pool (if pool is not full)
  3. Close connection (if pool is full or connection has expired)

Key question: when is a dead connection detected? TCP keepalive defaults to 2 hours before detecting a disconnect. A more practical approach: send PING when returning a connection, or set a read deadline when taking a connection out, letting the first command failure trigger reconnection.

Pipeline: Network Optimization for Batch Commands

Pipeline packs multiple commands into a single TCP packet (or a few packets) and sends them all at once, then reads all responses in bulk. This eliminates the network round-trip latency (RTT) for each individual command.

Example of pipelining 3 commands:

# Normal mode (3 RTTs):
Send SET a 1  โ†’  Wait for +OK  โ†’  Send SET b 2  โ†’  Wait for +OK  โ†’  Send SET c 3  โ†’  Wait for +OK

# Pipeline mode (1 RTT):
Send SET a 1\r\n SET b 2\r\n SET c 3\r\n  โ†’  Wait  โ†’  Read +OK\r\n +OK\r\n +OK\r\n

The critical constraint of pipeline: requests and responses are strictly ordered. Redis guarantees sending responses in the order commands were received; the client must parse responses in the same order and dispatch them to their respective waiters.

Pub/Sub: Dedicated Connections

Pub/Sub is Redis's push model: after a client subscribes to a channel, the connection enters subscription mode. It can only receive SUBSCRIBE/UNSUBSCRIBE/MESSAGE type messages and can no longer send ordinary commands like GET/SET.

This requires Pub/Sub to use an exclusive connection that cannot be shared with connections in the normal pool. go-redis's PubSub object maintains a dedicated connection and internally runs a goroutine that continuously reads server-pushed messages, dispatching them to subscribers via channels.


Level 3 ยท Building a Redis Client from Scratch

Project Structure

redis-client/
โ”œโ”€โ”€ resp/
โ”‚   โ”œโ”€โ”€ reader.go    # RESP decoder
โ”‚   โ””โ”€โ”€ writer.go    # RESP encoder
โ”œโ”€โ”€ pool/
โ”‚   โ””โ”€โ”€ pool.go      # Connection pool
โ”œโ”€โ”€ client.go        # Main client
โ”œโ”€โ”€ pipeline.go      # Pipeline support
โ”œโ”€โ”€ pubsub.go        # Pub/Sub support
โ””โ”€โ”€ main.go          # Usage example

Step 1: RESP Encoder

// resp/writer.go
package resp

import (
	"bufio"
	"fmt"
	"io"
	"strconv"
)

// Writer encodes Go values into RESP format and writes to the underlying writer.
// Uses bufio.Writer to reduce system call frequency.
type Writer struct {
	bw *bufio.Writer
}

func NewWriter(w io.Writer) *Writer {
	return &Writer{bw: bufio.NewWriter(w)}
}

// WriteCommand encodes a command (slice of strings) as a RESP Array and writes it.
// All Redis commands are sent in this format.
func (w *Writer) WriteCommand(args ...string) error {
	// *<count>\r\n
	if err := w.writeByte('*'); err != nil {
		return err
	}
	if err := w.writeString(strconv.Itoa(len(args))); err != nil {
		return err
	}
	if err := w.writeCRLF(); err != nil {
		return err
	}

	for _, arg := range args {
		// $<len>\r\n<data>\r\n
		if err := w.writeByte('$'); err != nil {
			return err
		}
		if err := w.writeString(strconv.Itoa(len(arg))); err != nil {
			return err
		}
		if err := w.writeCRLF(); err != nil {
			return err
		}
		if err := w.writeString(arg); err != nil {
			return err
		}
		if err := w.writeCRLF(); err != nil {
			return err
		}
	}
	return w.bw.Flush()
}

func (w *Writer) writeByte(b byte) error   { return w.bw.WriteByte(b) }
func (w *Writer) writeString(s string) error { _, err := w.bw.WriteString(s); return err }
func (w *Writer) writeCRLF() error          { _, err := w.bw.WriteString("\r\n"); return err }

// WriteCommandBuf writes a command to the buffer without flushing, for pipeline use.
func (w *Writer) WriteCommandBuf(args ...string) error {
	if err := fmt.Fprintf(w.bw, "*%d\r\n", len(args)); err != nil {
		return err
	}
	for _, arg := range args {
		if _, err := fmt.Fprintf(w.bw, "$%d\r\n%s\r\n", len(arg), arg); err != nil {
			return err
		}
	}
	return nil
}

// Flush sends the buffer contents to the underlying connection.
func (w *Writer) Flush() error { return w.bw.Flush() }

Step 2: RESP Decoder

// resp/reader.go
package resp

import (
	"bufio"
	"errors"
	"fmt"
	"io"
	"strconv"
	"strings"
)

// Value represents a RESP value of any type.
type Value struct {
	typ     byte
	integer int64
	str     string
	err     error
	array   []*Value
	isNull  bool
}

var ErrNull = errors.New("redis: null value")

func (v *Value) String() (string, error) {
	if v.isNull { return "", ErrNull }
	if v.err != nil { return "", v.err }
	return v.str, nil
}

func (v *Value) Int() (int64, error) {
	if v.err != nil { return 0, v.err }
	return v.integer, nil
}

func (v *Value) Array() ([]*Value, error) {
	if v.isNull { return nil, ErrNull }
	if v.err != nil { return nil, v.err }
	return v.array, nil
}

func (v *Value) Error() error { return v.err }

// Reader reads and parses RESP messages from the underlying reader.
type Reader struct {
	br *bufio.Reader
}

func NewReader(r io.Reader) *Reader {
	return &Reader{br: bufio.NewReaderSize(r, 4096)}
}

// ReadValue reads and parses one complete RESP value.
// This is a recursive implementation: Array types recursively call ReadValue for each element.
func (r *Reader) ReadValue() (*Value, error) {
	typByte, err := r.br.ReadByte()
	if err != nil {
		return nil, fmt.Errorf("resp: read type byte: %w", err)
	}
	switch typByte {
	case '+': return r.readSimpleString()
	case '-': return r.readError()
	case ':': return r.readInteger()
	case '$': return r.readBulkString()
	case '*': return r.readArray()
	default:
		return nil, fmt.Errorf("resp: unknown type byte: %q", typByte)
	}
}

func (r *Reader) readLine() (string, error) {
	line, err := r.br.ReadString('\n')
	if err != nil { return "", err }
	return strings.TrimSuffix(strings.TrimSuffix(line, "\n"), "\r"), nil
}

func (r *Reader) readSimpleString() (*Value, error) {
	line, err := r.readLine()
	if err != nil { return nil, err }
	return &Value{typ: '+', str: line}, nil
}

func (r *Reader) readError() (*Value, error) {
	line, err := r.readLine()
	if err != nil { return nil, err }
	return &Value{typ: '-', err: errors.New(line)}, nil
}

func (r *Reader) readInteger() (*Value, error) {
	line, err := r.readLine()
	if err != nil { return nil, err }
	n, err := strconv.ParseInt(line, 10, 64)
	if err != nil { return nil, fmt.Errorf("resp: invalid integer: %q", line) }
	return &Value{typ: ':', integer: n}, nil
}

func (r *Reader) readBulkString() (*Value, error) {
	line, err := r.readLine()
	if err != nil { return nil, err }
	n, err := strconv.ParseInt(line, 10, 64)
	if err != nil { return nil, fmt.Errorf("resp: invalid bulk string length: %q", line) }
	if n == -1 { return &Value{typ: '$', isNull: true}, nil }
	buf := make([]byte, n+2)
	if _, err := io.ReadFull(r.br, buf); err != nil {
		return nil, fmt.Errorf("resp: read bulk string data: %w", err)
	}
	return &Value{typ: '$', str: string(buf[:n])}, nil
}

func (r *Reader) readArray() (*Value, error) {
	line, err := r.readLine()
	if err != nil { return nil, err }
	n, err := strconv.ParseInt(line, 10, 64)
	if err != nil { return nil, fmt.Errorf("resp: invalid array length: %q", line) }
	if n == -1 { return &Value{typ: '*', isNull: true}, nil }
	arr := make([]*Value, n)
	for i := int64(0); i < n; i++ {
		arr[i], err = r.ReadValue()
		if err != nil { return nil, fmt.Errorf("resp: read array element %d: %w", i, err) }
	}
	return &Value{typ: '*', array: arr}, nil
}

Step 3: Connection Pool

// pool/pool.go
package pool

import (
	"context"
	"errors"
	"net"
	"sync"
	"time"
)

var ErrPoolExhausted = errors.New("pool: connection pool exhausted")

// Conn wraps a TCP connection with a creation timestamp for expiry detection.
type Conn struct {
	net.Conn
	createdAt time.Time
}

// Pool is a simple TCP connection pool.
type Pool struct {
	mu          sync.Mutex
	idle        []*Conn
	numActive   int
	maxSize     int
	maxIdleTime time.Duration
	dial        func() (net.Conn, error)
	waiters     []chan *Conn
}

func NewPool(dial func() (net.Conn, error), maxSize int, maxIdleTime time.Duration) *Pool {
	return &Pool{dial: dial, maxSize: maxSize, maxIdleTime: maxIdleTime}
}

// Get acquires a connection from the pool. Creates a new one if none are idle and
// the pool is not full. Blocks until a connection becomes available or ctx is cancelled.
func (p *Pool) Get(ctx context.Context) (*Conn, error) {
	p.mu.Lock()

	for len(p.idle) > 0 {
		conn := p.idle[len(p.idle)-1]
		p.idle = p.idle[:len(p.idle)-1]
		if time.Since(conn.createdAt) > p.maxIdleTime {
			conn.Close()
			p.numActive--
			continue
		}
		p.mu.Unlock()
		return conn, nil
	}

	if p.numActive < p.maxSize {
		p.numActive++
		p.mu.Unlock()
		c, err := p.dial()
		if err != nil {
			p.mu.Lock()
			p.numActive--
			p.mu.Unlock()
			return nil, err
		}
		return &Conn{Conn: c, createdAt: time.Now()}, nil
	}

	ch := make(chan *Conn, 1)
	p.waiters = append(p.waiters, ch)
	p.mu.Unlock()

	select {
	case conn := <-ch:
		if conn == nil { return nil, ErrPoolExhausted }
		return conn, nil
	case <-ctx.Done():
		p.mu.Lock()
		for i, w := range p.waiters {
			if w == ch {
				p.waiters = append(p.waiters[:i], p.waiters[i+1:]...)
				break
			}
		}
		p.mu.Unlock()
		return nil, ctx.Err()
	}
}

// Put returns a connection to the pool. If isHealthy is false, the connection is closed.
func (p *Pool) Put(conn *Conn, isHealthy bool) {
	p.mu.Lock()
	defer p.mu.Unlock()

	if !isHealthy {
		conn.Close()
		p.numActive--
		if len(p.waiters) > 0 {
			ch := p.waiters[0]
			p.waiters = p.waiters[1:]
			ch <- nil
		}
		return
	}

	if len(p.waiters) > 0 {
		ch := p.waiters[0]
		p.waiters = p.waiters[1:]
		ch <- conn
		return
	}
	p.idle = append(p.idle, conn)
}

func (p *Pool) Close() {
	p.mu.Lock()
	defer p.mu.Unlock()
	for _, conn := range p.idle { conn.Close() }
	p.idle = nil
}

Step 4: Main Client

// client.go
package redis

import (
	"context"
	"fmt"
	"net"
	"time"

	"redis-client/pool"
	"redis-client/resp"
)

type Client struct {
	pool    *pool.Pool
	addr    string
	timeout time.Duration
}

type Options struct {
	Addr        string
	Password    string
	DB          int
	PoolSize    int
	MaxIdleTime time.Duration
	Timeout     time.Duration
}

func NewClient(opts *Options) *Client {
	if opts == nil {
		opts = &Options{
			Addr: "localhost:6379", PoolSize: 10,
			MaxIdleTime: 5 * time.Minute, Timeout: 3 * time.Second,
		}
	}
	dial := func() (net.Conn, error) {
		conn, err := net.DialTimeout("tcp", opts.Addr, opts.Timeout)
		if err != nil { return nil, err }
		if opts.Password != "" {
			if err := initConn(conn, opts.Password, opts.DB, opts.Timeout); err != nil {
				conn.Close()
				return nil, err
			}
		}
		return conn, nil
	}
	return &Client{
		pool:    pool.NewPool(dial, opts.PoolSize, opts.MaxIdleTime),
		addr:    opts.Addr,
		timeout: opts.Timeout,
	}
}

func initConn(conn net.Conn, password string, db int, timeout time.Duration) error {
	conn.SetDeadline(time.Now().Add(timeout))
	defer conn.SetDeadline(time.Time{})
	w := resp.NewWriter(conn)
	r := resp.NewReader(conn)
	if err := w.WriteCommand("AUTH", password); err != nil { return err }
	v, err := r.ReadValue()
	if err != nil { return err }
	if v.Error() != nil { return v.Error() }
	if db != 0 {
		if err := w.WriteCommand("SELECT", fmt.Sprintf("%d", db)); err != nil { return err }
		v, err = r.ReadValue()
		if err != nil { return err }
		if v.Error() != nil { return v.Error() }
	}
	return nil
}

// do executes a single command: acquire connection โ†’ send โ†’ read โ†’ return.
func (c *Client) do(ctx context.Context, args ...string) (*resp.Value, error) {
	conn, err := c.pool.Get(ctx)
	if err != nil { return nil, fmt.Errorf("redis: get connection: %w", err) }
	conn.SetDeadline(time.Now().Add(c.timeout))
	defer conn.SetDeadline(time.Time{})
	w := resp.NewWriter(conn)
	r := resp.NewReader(conn)
	if err := w.WriteCommand(args...); err != nil {
		c.pool.Put(conn, false)
		return nil, fmt.Errorf("redis: write command: %w", err)
	}
	v, err := r.ReadValue()
	if err != nil {
		c.pool.Put(conn, false)
		return nil, fmt.Errorf("redis: read response: %w", err)
	}
	c.pool.Put(conn, true)
	return v, nil
}

func (c *Client) Get(ctx context.Context, key string) (string, error) {
	v, err := c.do(ctx, "GET", key)
	if err != nil { return "", err }
	return v.String()
}

func (c *Client) Set(ctx context.Context, key, value string) error {
	v, err := c.do(ctx, "SET", key, value)
	if err != nil { return err }
	return v.Error()
}

func (c *Client) SetEX(ctx context.Context, key, value string, ttl time.Duration) error {
	seconds := fmt.Sprintf("%d", int(ttl.Seconds()))
	v, err := c.do(ctx, "SET", key, value, "EX", seconds)
	if err != nil { return err }
	return v.Error()
}

func (c *Client) HSet(ctx context.Context, key string, fields map[string]string) (int64, error) {
	args := []string{"HSET", key}
	for k, val := range fields { args = append(args, k, val) }
	v, err := c.do(ctx, args...)
	if err != nil { return 0, err }
	return v.Int()
}

func (c *Client) LPush(ctx context.Context, key string, values ...string) (int64, error) {
	args := append([]string{"LPUSH", key}, values...)
	v, err := c.do(ctx, args...)
	if err != nil { return 0, err }
	return v.Int()
}

func (c *Client) Expire(ctx context.Context, key string, ttl time.Duration) (bool, error) {
	v, err := c.do(ctx, "EXPIRE", key, fmt.Sprintf("%d", int(ttl.Seconds())))
	if err != nil { return false, err }
	n, err := v.Int()
	return n == 1, err
}

func (c *Client) Close() { c.pool.Close() }

Step 5: Pipeline

// pipeline.go
package redis

import (
	"context"
	"fmt"
	"time"

	"redis-client/resp"
)

type PipelineResult struct {
	Value *resp.Value
	Err   error
}

// Pipeline accumulates commands and executes them all at once in Exec.
// Not goroutine-safe; do not share a Pipeline across goroutines.
type Pipeline struct {
	client   *Client
	commands [][]string
}

func (c *Client) NewPipeline() *Pipeline {
	return &Pipeline{client: c}
}

func (p *Pipeline) Set(key, value string) *Pipeline {
	p.commands = append(p.commands, []string{"SET", key, value})
	return p
}

func (p *Pipeline) Get(key string) *Pipeline {
	p.commands = append(p.commands, []string{"GET", key})
	return p
}

func (p *Pipeline) Incr(key string) *Pipeline {
	p.commands = append(p.commands, []string{"INCR", key})
	return p
}

// Exec sends all accumulated commands in one shot and returns results in order.
//
// Critical design: after writing all commands, read responses in order.
// Even if one command returns an error (-WRONGTYPE), continue reading subsequent responses โ€”
// otherwise the response stream goes out of sync.
func (p *Pipeline) Exec(ctx context.Context) ([]PipelineResult, error) {
	if len(p.commands) == 0 { return nil, nil }

	conn, err := p.client.pool.Get(ctx)
	if err != nil { return nil, fmt.Errorf("pipeline: get connection: %w", err) }

	conn.SetDeadline(time.Now().Add(p.client.timeout * time.Duration(len(p.commands))))
	defer conn.SetDeadline(time.Time{})

	w := resp.NewWriter(conn)
	r := resp.NewReader(conn)

	// Phase 1: buffer all commands
	for _, cmd := range p.commands {
		if err := w.WriteCommandBuf(cmd...); err != nil {
			p.client.pool.Put(conn, false)
			return nil, fmt.Errorf("pipeline: write command %v: %w", cmd, err)
		}
	}
	// Single flush sends everything in one TCP segment (or as few as possible)
	if err := w.Flush(); err != nil {
		p.client.pool.Put(conn, false)
		return nil, fmt.Errorf("pipeline: flush: %w", err)
	}

	// Phase 2: read all responses in order
	results := make([]PipelineResult, len(p.commands))
	for i := range p.commands {
		v, err := r.ReadValue()
		results[i] = PipelineResult{Value: v, Err: err}
		if err != nil {
			// Network error: cannot continue reading. Mark connection unhealthy.
			p.client.pool.Put(conn, false)
			for j := i + 1; j < len(p.commands); j++ {
				results[j] = PipelineResult{Err: fmt.Errorf("pipeline: aborted at index %d", i)}
			}
			return results, err
		}
	}

	p.client.pool.Put(conn, true)
	p.commands = p.commands[:0]
	return results, nil
}

Step 6: Pub/Sub

// pubsub.go
package redis

import (
	"context"
	"fmt"
	"net"
	"time"

	"redis-client/resp"
)

type Message struct {
	Channel string
	Payload string
}

// PubSub manages a dedicated Pub/Sub connection.
// In subscription mode the connection can only receive push messages โ€”
// it cannot send ordinary commands.
type PubSub struct {
	conn   net.Conn
	writer *resp.Writer
	reader *resp.Reader
	msgCh  chan *Message
	errCh  chan error
	done   chan struct{}
}

func (c *Client) NewPubSub() (*PubSub, error) {
	conn, err := net.DialTimeout("tcp", c.addr, c.timeout)
	if err != nil { return nil, fmt.Errorf("pubsub: dial: %w", err) }
	return &PubSub{
		conn:   conn,
		writer: resp.NewWriter(conn),
		reader: resp.NewReader(conn),
		msgCh:  make(chan *Message, 100),
		errCh:  make(chan error, 1),
		done:   make(chan struct{}),
	}, nil
}

// Subscribe subscribes to one or more channels and starts a background goroutine
// to continuously read pushed messages.
func (ps *PubSub) Subscribe(ctx context.Context, channels ...string) error {
	args := append([]string{"SUBSCRIBE"}, channels...)
	if err := ps.writer.WriteCommand(args...); err != nil {
		return fmt.Errorf("pubsub: subscribe: %w", err)
	}
	// Read subscription acknowledgements
	// Each SUBSCRIBE returns a [subscribe, channel, count] three-element array
	for range channels {
		v, err := ps.reader.ReadValue()
		if err != nil { return fmt.Errorf("pubsub: read ack: %w", err) }
		arr, err := v.Array()
		if err != nil { return err }
		msgType, _ := arr[0].String()
		if msgType != "subscribe" {
			return fmt.Errorf("pubsub: unexpected ack type: %s", msgType)
		}
	}
	go ps.readLoop(ctx)
	return nil
}

func (ps *PubSub) readLoop(ctx context.Context) {
	defer close(ps.done)
	for {
		ps.conn.SetReadDeadline(time.Now().Add(100 * time.Millisecond))
		v, err := ps.reader.ReadValue()
		if err != nil {
			select {
			case <-ctx.Done():
				return
			default:
				if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
					select {
					case <-ctx.Done():
						return
					default:
						continue
					}
				}
				ps.errCh <- err
				return
			}
		}
		arr, err := v.Array()
		if err != nil || len(arr) < 3 { continue }
		msgType, _ := arr[0].String()
		if msgType != "message" { continue }
		channel, _ := arr[1].String()
		payload, _ := arr[2].String()
		select {
		case ps.msgCh <- &Message{Channel: channel, Payload: payload}:
		case <-ctx.Done():
			return
		}
	}
}

func (ps *PubSub) Channel() <-chan *Message { return ps.msgCh }
func (ps *PubSub) ErrChan() <-chan error    { return ps.errCh }
func (ps *PubSub) Close() error             { <-ps.done; return ps.conn.Close() }

Usage Example

func main() {
	ctx := context.Background()
	client := redis.NewClient(&redis.Options{Addr: "localhost:6379", PoolSize: 20})
	defer client.Close()

	// Basic commands
	client.Set(ctx, "name", "gopher")
	val, _ := client.Get(ctx, "name")
	fmt.Println("GET name:", val) // GET name: gopher

	// Pipeline: 4 commands, 1 RTT
	pipe := client.NewPipeline()
	pipe.Set("k1", "v1").Set("k2", "v2").Get("k1").Get("k2")
	results, _ := pipe.Exec(ctx)
	for i, r := range results {
		if r.Err == nil {
			s, _ := r.Value.String()
			fmt.Printf("result[%d]: %q\n", i, s)
		}
	}

	// Pub/Sub
	ps, _ := client.NewPubSub()
	subCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()
	ps.Subscribe(subCtx, "news")
	go func() {
		time.Sleep(100 * time.Millisecond)
		client.do(ctx, "PUBLISH", "news", "Hello, Gopher!")
	}()
	select {
	case msg := <-ps.Channel():
		fmt.Printf("Received on %s: %s\n", msg.Channel, msg.Payload)
	case <-subCtx.Done():
		fmt.Println("Timeout")
	}
}

Level 4 ยท Advanced: RESP3, Cluster, and Sentinel

RESP3 New Types

Redis 6.0 introduced RESP3, activated by sending HELLO 3. New type prefixes include:

Prefix Type Notes
% Map Key-value pairs, replaces flat-array responses (e.g., HGETALL)
~ Set Unordered unique elements
, Double Floating point (e.g., ZSCORE return)
( Big Number Integers beyond int64 range
_ Null Unified null, replaces $-1 and *-1
| Attribute Command metadata (e.g., cache invalidation hints)
> Push Server-initiated push (Pub/Sub messages, replaces *3 message arrays)

RESP3 Maps make parsing more direct. Instead of receiving a flat array of interleaved keys and values from HGETALL (which the client must pair up two at a time), you receive a typed map where the structure is explicit in the wire format.

Redis Cluster Key Slot Computation

Redis Cluster divides the keyspace into 16384 slots; each node owns a subset. Clients must compute which slot a key belongs to and route to the correct node:

// HashSlot computes the Redis Cluster slot (0-16383) for a key.
// If the key contains curly braces {}, only the content inside is hashed (hash tags).
// Example: {user}.name and {user}.email hash to the same slot, enabling MGET across them.
func HashSlot(key string) int {
	start := -1
	for i, ch := range key {
		if ch == '{' {
			start = i
		} else if ch == '}' && start >= 0 {
			tag := key[start+1 : i]
			if len(tag) > 0 { return int(crc16(tag) & 0x3FFF) }
			break
		}
	}
	return int(crc16(key) & 0x3FFF)
}

Cluster clients must handle two redirect types:

Sentinel Failover Client

Redis Sentinel monitors the primary and automatically promotes a replica when the primary fails. A sentinel-aware client must:

  1. Connect to a Sentinel node and send SENTINEL get-master-addr-by-name <masterName> to discover the current primary address.
  2. Subscribe to the +switch-master event channel to receive failover notifications.
  3. On failover, drain and rebuild the connection pool pointing to the new primary.
type SentinelClient struct {
	sentinels  []string
	masterName string
	*Client
}

func (s *SentinelClient) getMasterAddr() (string, error) {
	for _, sentinel := range s.sentinels {
		conn, err := net.DialTimeout("tcp", sentinel, 3*time.Second)
		if err != nil { continue }
		defer conn.Close()
		w := resp.NewWriter(conn)
		r := resp.NewReader(conn)
		w.WriteCommand("SENTINEL", "get-master-addr-by-name", s.masterName)
		v, err := r.ReadValue()
		if err != nil { continue }
		arr, err := v.Array()
		if err != nil || len(arr) != 2 { continue }
		host, _ := arr[0].String()
		port, _ := arr[1].String()
		return net.JoinHostPort(host, port), nil
	}
	return "", fmt.Errorf("sentinel: no available sentinel node")
}

Connection Health Checks

Production clients need proactive health checks, not just passive detection on command failure:

func (c *Client) startHealthCheck(interval time.Duration) {
	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for range ticker.C {
			ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
			_, err := c.do(ctx, "PING")
			cancel()
			if err != nil {
				log.Printf("redis health check failed: %v", err)
			}
		}
	}()
}

Performance Comparison with go-redis

In a LAN environment (RTT ~0.1ms), typical results:

Scenario Our Client go-redis
Single GET (pool, 10 concurrent) ~80,000 QPS ~90,000 QPS
Pipeline GET (batch of 100) ~600,000 QPS ~650,000 QPS
Single SET (pool, 10 concurrent) ~75,000 QPS ~88,000 QPS

The ~10-15% gap comes from go-redis optimizations:

This gap widens at high concurrency โ€” which is exactly why production systems should use go-redis rather than a hand-rolled client. But understanding these details gives you a clear mental model for diagnosing connection pool leaks, pipeline misorderings, and cluster routing failures when go-redis does something unexpected.


Chapter Summary

This chapter started from the RESP wire protocol and built a fully functional Redis client:

The advanced section covered RESP3 new types, Cluster slot routing with CRC16 hash tags, Sentinel failover mechanics, and a performance comparison with go-redis.

Building a Redis client is not about replacing go-redis. It is about having a clear mental model when using go-redis โ€” knowing what mechanism drives every configuration knob, and knowing where to look when something goes wrong.

Rate this chapter
4.6  / 5  (3 ratings)

๐Ÿ’ฌ Comments