第 31 章

Java 客户端:Jedis、Lettuce 与 Redisson

第31章 Java 客户端:Jedis、Lettuce 与 Redisson

Java 生态是 Redis 客户端最成熟的生态之一。三大主流客户端各有定位:Jedis 简单直接、Lettuce 异步高性能、Redisson 提供分布式对象抽象。本章从连接池实现原理开始,深入讲解每个客户端的设计思想、核心配置与生产调优要点。


31.1 Jedis:同步阻塞模型

31.1.1 连接池原理(GenericObjectPool)

Jedis 本身不是线程安全的——一个 Jedis 实例对应一条 TCP 连接,在多线程环境下必须通过连接池管理。JedisPool 内部持有一个 GenericObjectPool<Jedis>(来自 Apache Commons Pool2),其核心机制如下:

对象生命周期

关键配置参数

JedisPoolConfig config = new JedisPoolConfig();

// 连接数量控制
config.setMaxTotal(50);                        // 最大连接数(含借出+空闲)
config.setMaxIdle(20);                         // 最大空闲连接数
config.setMinIdle(5);                          // 最小空闲连接数(预热用)

// 等待策略
config.setMaxWait(Duration.ofMillis(3000));    // 等待可用连接的超时时间
config.setBlockWhenExhausted(true);            // 池耗尽时阻塞等待(false=立即抛异常)

// 连接健康检测
config.setTestOnBorrow(true);                  // 借出前 PING 检测
config.setTestOnReturn(false);                 // 归还时检测(生产一般关闭)
config.setTestWhileIdle(true);                 // 后台线程定时检测空闲连接
config.setTimeBetweenEvictionRuns(Duration.ofSeconds(30)); // 检测间隔
config.setMinEvictableIdleTime(Duration.ofMinutes(1));     // 空闲超过此时间则销毁

JedisPool pool = new JedisPool(config, "localhost", 6379, 2000, "password");

连接泄漏是最常见的生产故障。若业务代码取出连接后抛出异常但未捕获,连接不会归还,池中计数的"借出"连接不断增加,最终新请求全部超时阻塞。

正确用法必须使用 try-with-resources:

// 正确:自动归还
try (Jedis jedis = pool.getResource()) {
    jedis.set("key", "value");
    return jedis.get("key");
}

// 错误:可能泄漏
Jedis jedis = pool.getResource();
jedis.set("key", "value"); // 若此处抛异常,连接永远不归还
jedis.close();

31.1.2 Pipeline 批量优化

每个 Redis 命令默认需要一个 RTT(网络往返延迟)。本地环回约 0.05ms,同机房约 0.5ms,跨机房可达 5-50ms。Pipeline 将多个命令合并在一次网络写入中发送,服务端顺序执行后批量返回结果。

try (Jedis jedis = pool.getResource()) {
    Pipeline pipe = jedis.pipelined();

    // 批量写入1000个key
    for (int i = 0; i < 1000; i++) {
        pipe.set("key:" + i, "value:" + i);
        pipe.expire("key:" + i, 3600);
    }

    // 同步等待所有响应
    List<Object> results = pipe.syncAndReturnAll();

    // 检查每个命令的执行结果
    for (int i = 0; i < results.size(); i++) {
        Object result = results.get(i);
        if (result instanceof Exception e) {
            log.error("Command {} failed: {}", i, e.getMessage());
        }
    }
}

Pipeline 注意事项

31.1.3 Jedis Cluster(集群模式)

Set<HostAndPort> nodes = new HashSet<>();
nodes.add(new HostAndPort("10.0.0.1", 7000));
nodes.add(new HostAndPort("10.0.0.2", 7001));
nodes.add(new HostAndPort("10.0.0.3", 7002));

JedisCluster cluster = new JedisCluster(nodes, connectionTimeout, soTimeout,
    maxAttempts, "password", config);

// 使用方式与单节点一致,内部自动路由
cluster.set("user:1001", "Alice");
String value = cluster.get("user:1001");

JedisCluster 内部维护 slot → node 的路由表(共 16384 个槽),并为每个节点维护一个独立的 JedisPool。当返回 MOVED 重定向时,刷新路由表;当返回 ASK 重定向时(槽迁移中),只对本次请求跟随重定向。


31.2 Lettuce:异步多路复用模型

31.2.1 核心设计:基于 Netty

Lettuce 基于 Netty 的事件循环模型,一个 StatefulRedisConnection 对应一条 TCP 连接,但通过 Netty 的 pipeline 机制支持多个并发请求(命令队列化发送,响应按序匹配)。这意味着多线程可以共享同一连接,不需要连接池(也可选用连接池)。

// 创建客户端(全局单例,持有 Netty EventLoopGroup)
RedisClient client = RedisClient.create(
    RedisURI.builder()
        .withHost("localhost")
        .withPort(6379)
        .withPassword("password".toCharArray())
        .withTimeout(Duration.ofSeconds(5))
        .build()
);

// 创建连接(线程安全,可共享)
StatefulRedisConnection<String, String> connection = client.connect();

31.2.2 三种命令接口

同步接口(阻塞等待结果,最易用):

RedisCommands<String, String> sync = connection.sync();
sync.set("key", "value");
String val = sync.get("key");

异步接口(非阻塞,基于 CompletableFuture):

RedisAsyncCommands<String, String> async = connection.async();

RedisFuture<String> future = async.get("key");

// 链式处理
future
    .thenApply(v -> v != null ? v.toUpperCase() : "DEFAULT")
    .thenAccept(System.out::println)
    .exceptionally(ex -> { log.error("Redis error", ex); return null; });

// 等待多个命令
RedisFuture<String> f1 = async.get("key1");
RedisFuture<String> f2 = async.get("key2");
LettuceFutures.awaitAll(5, TimeUnit.SECONDS, f1, f2);

响应式接口(基于 Project Reactor,与 Spring WebFlux 天然集成):

RedisReactiveCommands<String, String> reactive = connection.reactive();

// 单值
Mono<String> mono = reactive.get("key");
mono.subscribe(System.out::println);

// 流式处理(SCAN等命令)
reactive.keys("user:*")
    .flatMap(key -> reactive.get(key))
    .filter(v -> v != null && v.startsWith("active"))
    .collectList()
    .subscribe(list -> log.info("Active users: {}", list.size()));

31.2.3 连接池配置(当需要隔离时)

GenericObjectPoolConfig<StatefulRedisConnection<String, String>> poolConfig =
    new GenericObjectPoolConfig<>();
poolConfig.setMaxTotal(16);
poolConfig.setMaxIdle(8);
poolConfig.setMinIdle(2);

ConnectionPoolSupport<StatefulRedisConnection<String, String>> pool =
    ConnectionPoolSupport.createGenericObjectPool(client::connect, poolConfig);

try (StatefulRedisConnection<String, String> conn = pool.borrowObject()) {
    RedisCommands<String, String> cmd = conn.sync();
    cmd.set("key", "value");
}

31.2.4 客户端选项调优

ClientOptions options = ClientOptions.builder()
    .disconnectedBehavior(ClientOptions.DisconnectedBehavior.REJECT_COMMANDS) // 断连时拒绝命令(不排队)
    .autoReconnect(true)                  // 自动重连
    .pingBeforeActivateConnection(true)   // 激活连接前先 PING
    .socketOptions(SocketOptions.builder()
        .connectTimeout(Duration.ofSeconds(5))
        .keepAlive(true)                  // TCP keepalive
        .build())
    .timeoutOptions(TimeoutOptions.enabled(Duration.ofSeconds(3))) // 命令超时
    .build();

client.setOptions(options);

31.2.5 Jedis vs Lettuce 对比

维度 Jedis Lettuce
线程安全 非线程安全(需连接池) 线程安全(单连接多路复用)
编程模型 同步阻塞 同步 / 异步 / 响应式
连接数 高(每线程一个连接) 低(共享连接)
底层框架 直接 Socket Netty
Spring Boot 默认 Boot 1.x 默认 Boot 2.x+ 默认
适用场景 简单项目、脚本工具 高并发、微服务、响应式系统

31.3 Redisson:分布式对象层

31.3.1 设计理念

Redisson 在 Lettuce/Netty 之上提供了一层高级抽象,将 Redis 的各种数据结构封装为 Java 的分布式对象(实现 java.util.concurrent 接口),让开发者无需关心底层命令细节。

Config config = new Config();
config.useSingleServer()
    .setAddress("redis://localhost:6379")
    .setPassword("password")
    .setConnectionPoolSize(10)
    .setConnectionMinimumIdleSize(2)
    .setTimeout(3000)
    .setRetryAttempts(3)
    .setRetryInterval(1500);

RedissonClient redisson = Redisson.create(config);

31.3.2 分布式锁与 WatchDog 机制

// 基本分布式锁
RLock lock = redisson.getLock("order:lock:1001");

// 方式1:永不超时(WatchDog 自动续期)
lock.lock();
try {
    // 业务逻辑,WatchDog 每10s续期一次,保证锁不过期
} finally {
    lock.unlock();
}

// 方式2:指定超时(不启动 WatchDog)
lock.lock(30, TimeUnit.SECONDS);

// 方式3:尝试获取(非阻塞)
boolean acquired = lock.tryLock(3, 30, TimeUnit.SECONDS);
if (acquired) {
    try { /* 业务 */ } finally { lock.unlock(); }
}

// 公平锁(按请求顺序排队)
RLock fairLock = redisson.getFairLock("fairLock");

// 联锁(同时锁多个资源)
RLock lock1 = redisson.getLock("lock1");
RLock lock2 = redisson.getLock("lock2");
RedissonMultiLock multiLock = new RedissonMultiLock(lock1, lock2);
multiLock.lock();

WatchDog 续期机制:Redisson 获取锁(不指定 leaseTime)后,在内部 Netty 时间轮中注册一个定时任务,每 leaseTime/3(默认 10 秒)执行一次 PEXPIRE,将锁的过期时间重置为 leaseTime(默认 30 秒)。业务调用 unlock() 时取消该任务。若进程宕机,定时任务消失,锁自然在 30 秒后过期,避免死锁。

31.3.3 其他分布式对象

// 分布式 Map(自动 Java 序列化/反序列化)
RMap<String, User> userMap = redisson.getMap("users");
userMap.put("user:1001", new User("Alice", 25));
User user = userMap.get("user:1001");
userMap.fastPut("user:1002", new User("Bob", 30)); // 不返回旧值,性能更好

// 带本地缓存的 Map(L1+L2 双层缓存)
LocalCachedMapOptions<String, User> cacheOptions = LocalCachedMapOptions.<String, User>defaults()
    .cacheSize(1000)
    .timeToLive(Duration.ofMinutes(10))
    .syncStrategy(LocalCachedMapOptions.SyncStrategy.INVALIDATE);
RLocalCachedMap<String, User> cachedMap = redisson.getLocalCachedMap("users", cacheOptions);

// 分布式 Queue
RQueue<String> queue = redisson.getQueue("tasks");
queue.offer("task:1");
String task = queue.poll();

// 延迟队列
RDelayedQueue<String> delayedQueue = redisson.getDelayedQueue(queue);
delayedQueue.offer("delayed-task", 10, TimeUnit.MINUTES); // 10分钟后进入主队列

// 布隆过滤器
RBloomFilter<Long> bloomFilter = redisson.getBloomFilter("user_ids");
bloomFilter.tryInit(10_000_000L, 0.001); // 1000万元素,0.1% 误判率
bloomFilter.add(1001L);
boolean exists = bloomFilter.contains(9999L);

// 分布式限速器(令牌桶)
RRateLimiter rateLimiter = redisson.getRateLimiter("api:limiter");
rateLimiter.trySetRate(RateType.OVERALL, 100, 1, RateIntervalUnit.SECONDS);
boolean allowed = rateLimiter.tryAcquire(1);

// 分布式信号量
RSemaphore semaphore = redisson.getSemaphore("database:connections");
semaphore.trySetPermits(10); // 最多10个并发
semaphore.acquire();
try { /* 访问数据库 */ } finally { semaphore.release(); }

31.3.4 集群配置

Config config = new Config();
config.useClusterServers()
    .addNodeAddress(
        "redis://10.0.0.1:7000",
        "redis://10.0.0.2:7001",
        "redis://10.0.0.3:7002"
    )
    .setReadMode(ReadMode.SLAVE)          // 读操作路由到从节点
    .setLoadBalancer(new RoundRobinLoadBalancer())
    .setScanInterval(2000)                // 集群拓扑扫描间隔(ms)
    .setMasterConnectionPoolSize(10)
    .setSlaveConnectionPoolSize(10);

31.4 生产选型建议

场景 推荐客户端 原因
Spring Boot 普通业务 Lettuce(Spring Data Redis) 已集成,异步能力强
需要分布式锁/限流 Redisson 开箱即用,WatchDog 续期
简单脚本/工具 Jedis API 直观,依赖少
响应式/WebFlux Lettuce Reactive 原生 Reactor 支持
高频小对象存储 Redisson RMap + 本地缓存 减少网络开销

Spring Boot 集成最佳实践

spring:
  data:
    redis:
      host: localhost
      port: 6379
      password: secret
      lettuce:
        pool:
          max-active: 16
          max-idle: 8
          min-idle: 2
          max-wait: 3000ms
        shutdown-timeout: 200ms
      connect-timeout: 5000ms
      timeout: 3000ms
@Configuration
public class RedisConfig {
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(factory);
        template.setKeySerializer(new StringRedisSerializer());
        template.setValueSerializer(new GenericJackson2JsonRedisSerializer());
        template.setHashKeySerializer(new StringRedisSerializer());
        template.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());
        template.afterPropertiesSet();
        return template;
    }
}

31.5 常见问题排查

问题1:JedisConnectionException: Could not get a resource from the pool

问题2:Lettuce 命令超时但连接未断

问题3:Redisson WatchDog 不工作

问题4:序列化问题导致数据乱码

本章评分
4.6  / 5  (3 评分)

💬 留言讨论