lettuce复数 lettuce( 二 )

RedisClient.connectStatefulAsyncprivate <K, V, S> ConnectionFuture<S> connectStatefulAsync(StatefulRedisConnectionImpl<K, V> connection,RedisCodec<K, V> codec, Endpoint endpoint,RedisURI redisURI, Supplier<CommandHandler> commandHandlerSupplier) {//构建ConnectionBuidler,通过ConnectionBuilder来创建connectionConnectionBuilder connectionBuilder;if (redisURI.isSsl()) {SslConnectionBuilder sslConnectionBuilder = SslConnectionBuilder.sslConnectionBuilder();sslConnectionBuilder.ssl(redisURI);connectionBuilder = sslConnectionBuilder;} else {connectionBuilder = ConnectionBuilder.connectionBuilder();}//填充StatefulRedisConnectionImplconnectionBuilder.connection(connection);//控制RedisClient行为的一些配置参数connectionBuilder.clientOptions(clientOptions);//ClientResource包含了一些EventLoopGroup信息connectionBuilder.clientResources(clientResources);//配置commandHandlerSupplier,这个commandHandler很重要,是实现StatefulRedisConnectionImpl线程安全的关键,后面会详细讲 。connectionBuilder.commandHandler(commandHandlerSupplier).endpoint(endpoint);//connectionBuilder填充Bootstrap等更多的信息//getSocketAddressSupplier是根据redisURI获取真正的Redis连接信息,如:sentinel模式下,需要从sentinel获取到真实的redis连接地址connectionBuilder(getSocketAddressSupplier(redisURI), connectionBuilder, redisURI);//配置netty的channeltypechannelType(connectionBuilder, redisURI);if (clientOptions.isPingBeforeActivateConnection()) {if (hasPassword(redisURI)) {connectionBuilder.enableAuthPingBeforeConnect();} else {connectionBuilder.enablePingBeforeConnect();}}//初始化channel,在这一步才真正的异步的去创建物理连接ConnectionFuture<RedisChannelHandler<K, V>> future = initializeChannelAsync(connectionBuilder);ConnectionFuture<?> sync = future;if (!clientOptions.isPingBeforeActivateConnection() && hasPassword(redisURI)) {//连接成功之后发送auth命令,做密码的验证sync = sync.thenCompose(channelHandler -> {CommandArgs<K, V> args = new CommandArgs<>(codec).add(redisURI.getPassword());return connection.async().dispatch(CommandType.AUTH, new StatusOutput<>(codec), args);});}//设置clientName,从Redis服务端执行client list可以看到clientnameif (LettuceStrings.isNotEmpty(redisURI.getClientName())) {sync = sync.thenApply(channelHandler -> {connection.setClientName(redisURI.getClientName());return channelHandler;});}//选择dbif (redisURI.getDatabase() != 0) {sync = sync.thenCompose(channelHandler -> {CommandArgs<K, V> args = new CommandArgs<>(codec).add(redisURI.getDatabase());return connection.async().dispatch(CommandType.SELECT, new StatusOutput<>(codec), args);});}//返回connection对象return sync.thenApply(channelHandler -> (S) connection);}RedisClient.connectionBuilder//为ConnectionBuidler填充更多的信息,如Bootstrap、channelGroupprotected void connectionBuilder(Mono<SocketAddress> socketAddressSupplier, ConnectionBuilder connectionBuilder,RedisURI redisURI) {//创建Netty客户端的Bootstrap对象Bootstrap redisBootstrap = new Bootstrap();//Bootstrap的一些配置参数,具体可以参考Netty的相关书籍(Netty权威指南)redisBootstrap.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);redisBootstrap.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);redisBootstrap.option(ChannelOption.ALLOCATOR, BUF_ALLOCATOR);SocketOptions socketOptions = getOptions().getSocketOptions();redisBootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,Math.toIntExact(socketOptions.getConnectTimeout().toMillis()));if (LettuceStrings.isEmpty(redisURI.getSocket())) {//keepAlive参数,默认为trueredisBootstrap.option(ChannelOption.SO_KEEPALIVE, socketOptions.isKeepAlive());//tcp_nodelay参数,默认为trueredisBootstrap.option(ChannelOption.TCP_NODELAY, socketOptions.isTcpNoDelay());}connectionBuilder.timeout(redisURI.getTimeout());connectionBuilder.password(redisURI.getPassword());//把构建出来的bootStrap对象赋值给connectionBuidler,由connectionBuilder创建连接connectionBuilder.bootstrap(redisBootstrap);//Netty的相关参数配置,待研究connectionBuilder.channelGroup(channels).connectionEvents(connectionEvents).timer(timer);//配置socket地址提供者connectionBuilder.socketAddressSupplier(socketAddressSupplier);}RedisClient.initializeChannelAsync//初始化redis连接,返回ChannelFuture对象protected <K, V, T extends RedisChannelHandler<K, V>> ConnectionFuture<T> initializeChannelAsync(ConnectionBuilder connectionBuilder) {Mono<SocketAddress> socketAddressSupplier = connectionBuilder.socketAddress();if (clientResources.eventExecutorGroup().isShuttingDown()) {throw new IllegalStateException("Cannot connect, Event executor group is terminated.");}//创建socketAddressFuture 对象,当socketAddressSupplier异步获取SocketAddress成功之后会把SocketAddress数据放入该对象中CompletableFuture<SocketAddress> socketAddressFuture = new CompletableFuture<>();//创建channelReadyFuture,当连接建立成功之后会把Channel对象放入该对象中CompletableFuture<Channel> channelReadyFuture = new CompletableFuture<>();//配置获取SocketAddress异步操作之后的操作://1. 把SocketAddress对象放入socketAddressFuture中//2. 基于SocketAddress调用initializeChannelAsync0方法真正去建立连接socketAddressSupplier.doOnError(socketAddressFuture::completeExceptionally).doOnNext(socketAddressFuture::complete).subscribe(redisAddress -> {if (channelReadyFuture.isCancelled()) {return;}//异步建立真正的连接,如果建立成功会把生产的Channel对象放入channelReadyFuture中initializeChannelAsync0(connectionBuilder, channelReadyFuture, redisAddress);}, channelReadyFuture::completeExceptionally);//建立连接成功之后返回的还是connectionBuilder的connection对象,即StatefulRedisConnectionImplreturn new DefaultConnectionFuture<>(socketAddressFuture, channelReadyFuture.thenApply(channel -> (T) connectionBuilder.connection()));}

秒懂生活扩展阅读