lettuce复数 lettuce

【lettuce复数 lettuce】lettuce-core版本: 5.1.7.RELEASE
先看一下Lettuce的基本使用方法,使用Lettuce大概分为如下几步:

  1. 基于Redis连接信息创建RedisClient
  2. 基于RedisClient创建StatefulRedisConnection
  3. 从Connection中获取Command,基于Command执行Redis命令操作 。
/** * @author xiaobing * @date 2019/12/20 */public class LettuceSimpleUse {private void testLettuce() throws ExecutionException, InterruptedException {//构建RedisClient对象,RedisClient包含了Redis的基本配置信息,可以基于RedisClient创建RedisConnectionRedisClient client = RedisClient.create("redis://localhost");//创建一个线程安全的StatefulRedisConnection,可以多线程并发对该connection操作,底层只有一个物理连接.StatefulRedisConnection<String, String> connection = client.connect();//获取SyncCommand 。Lettuce支持SyncCommand、AsyncCommands、ActiveCommand三种commandRedisStringCommands<String, String> sync = connection.sync();String value = https://www.doubo5.com/sync.get("key");System.out.println("get redis value with lettuce sync command, value is :" + value);//获取SyncCommand 。Lettuce支持SyncCommand、AsyncCommands、ActiveCommand三种commandRedisAsyncCommands async = connection.async();RedisFuture getFuture = async.get("key");value = getFuture.get();System.out.println("get redis value with lettuce async command, value is :" + value);}public static void main(String[] args) throws ExecutionException, InterruptedException {new LettuceSimpleUse().testLettuce();}}先看一张建立连接的时序图,有一个直观的印象 。

lettuce复数 lettuce

文章插图
lettuce源码--建立redis连接


RedisClient一个可扩展、线程安全的RedisClient,支持sync、async、reactor执行模式 。
RedisClient.create只是传入了一些配置信息,此时并没有创建连接 。
// 使用默认的ClientResourcepublic static RedisClient create(String uri) {LettuceAssert.notEmpty(uri, "URI must not be empty");return new RedisClient(null, RedisURI.create(uri));}// ClientResources中包含了一些配置和线程池信息,是一个比较重的资源,多个RedisClient可以共享同一个ClientResourceprotected RedisClient(ClientResources clientResources, RedisURI redisURI) {super(clientResources);assertNotNull(redisURI);this.redisURI = redisURI;setDefaultTimeout(redisURI.getTimeout()); }RedisClient.connnect可以看到connect方法有一些重载方法,默认的是用UTF8 String对key和value序列化,通过传入RedisCodec支持自定义的对Key和Value的序列化方式 。
public StatefulRedisConnection<String, String> connect() {return connect(newStringStringCodec());}public <K, V> StatefulRedisConnection<K, V> connect(RedisCodec<K, V> codec) {checkForRedisURI();//connectStandaloneAsync是异步创建connection,返回的是Future对象,通过getConnection转为同步操作return getConnection(connectStandaloneAsync(codec, this.redisURI, timeout));}//异步转同步操作protected <T> T getConnection(ConnectionFuture<T> connectionFuture) {try {return connectionFuture.get();} catch (InterruptedException e) {Thread.currentThread().interrupt();throw RedisConnectionException.create(connectionFuture.getRemoteAddress(), e);} catch (Exception e) {if (e instanceof ExecutionException) {throw RedisConnectionException.create(connectionFuture.getRemoteAddress(), e.getCause());}throw RedisConnectionException.create(connectionFuture.getRemoteAddress(), e);}}RedisClient.connectStandaloneAsyncprivate <K, V> ConnectionFuture<StatefulRedisConnection<K, V>> connectStandaloneAsync(RedisCodec<K, V> codec,RedisURI redisURI, Duration timeout) {assertNotNull(codec);checkValidRedisURI(redisURI);logger.debug("Trying to get a Redis connection for: " + redisURI);//创建一个有状态的EndPoint用于抽象底层channel的实现,DefaultEndpoint内部封装断线重连、重连后成功后回放连接失败期间的command 。同时封装了AT_MOST_ONCE、AT_LEAST_ONCE的可靠性实现(该逻辑是基于内存的,所以并不可靠) 。DefaultEndpoint endpoint = new DefaultEndpoint(clientOptions, clientResources);RedisChannelWriter writer = endpoint;//进一步封装,添加支持过期时间的执行命令if (CommandExpiryWriter.isSupported(clientOptions)) {writer = new CommandExpiryWriter(writer, clientOptions, clientResources);}//创建StatefulRedisConnectionImpl对象,StatefulRedisConnectionImpl对外提供RedisCommand对象,内部基于writer发送命令 。此时并没有真正的创建物理连接,该类本身是无状态、线程安全的 。StatefulRedisConnectionImpl<K, V> connection = newStatefulRedisConnection(writer, codec, timeout);//异步创建Redis物理连接,返回future对象 。后面可以看到future中返回的对象其实还是上面的connectionConnectionFuture<StatefulRedisConnection<K, V>> future = connectStatefulAsync(connection, codec, endpoint, redisURI,() -> new CommandHandler(clientOptions, clientResources, endpoint));future.whenComplete((channelHandler, throwable) -> {if (throwable != null) {connection.close();}});return future;}//StatefulRedisConnectionImpl的构造函数,此时已经创建了sync、async、reactive三种类型的RedisCommand 。基于RedisCodec对key和value序列化,通过write把命令真正的发出去 。public StatefulRedisConnectionImpl(RedisChannelWriter writer, RedisCodec<K, V>codec, Duration timeout) {super(writer, timeout);this.codec = codec;this.async = newRedisAsyncCommandsImpl();this.sync = newRedisSyncCommandsImpl();this.reactive = newRedisReactiveCommandsImpl();}

秒懂生活扩展阅读