Skip to content

Commit

Permalink
Propagate initialization failures from ChannelInitializer #2475
Browse files Browse the repository at this point in the history
We try to extract the init failure on a best-effort basis to reveal the underlying cause for a connect failure.
  • Loading branch information
mp911de committed Aug 7, 2023
1 parent 6a141b3 commit 8e425f5
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 13 deletions.
26 changes: 17 additions & 9 deletions src/main/java/io/lettuce/core/AbstractRedisClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Mono;
import io.lettuce.core.event.command.CommandListener;
import io.lettuce.core.event.connection.ConnectEvent;
import io.lettuce.core.event.connection.ConnectionCreatedEvent;
Expand Down Expand Up @@ -61,6 +60,7 @@
import io.netty.util.concurrent.Future;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import reactor.core.publisher.Mono;

/**
* Base Redis client. This class holds the netty infrastructure, {@link ClientOptions} and the basic connection procedure. This
Expand Down Expand Up @@ -112,8 +112,8 @@ public abstract class AbstractRedisClient implements AutoCloseable {
/**
* Create a new instance with client resources.
*
* @param clientResources the client resources. If {@code null}, the client will create a new dedicated instance of
* client resources and keep track of them.
* @param clientResources the client resources. If {@code null}, the client will create a new dedicated instance of client
* resources and keep track of them.
*/
protected AbstractRedisClient(ClientResources clientResources) {

Expand Down Expand Up @@ -395,8 +395,7 @@ protected <K, V, T extends RedisChannelHandler<K, V>> ConnectionFuture<T> initia

String uriString = connectionBuilder.getRedisURI().toString();

EventRecorder.getInstance().record(
new ConnectionCreatedEvent(uriString, connectionBuilder.endpoint().getId()));
EventRecorder.getInstance().record(new ConnectionCreatedEvent(uriString, connectionBuilder.endpoint().getId()));
EventRecorder.RecordableEvent event = EventRecorder.getInstance()
.start(new ConnectEvent(uriString, connectionBuilder.endpoint().getId()));

Expand Down Expand Up @@ -439,15 +438,24 @@ private void initializeChannelAsync0(ConnectionBuilder connectionBuilder, Comple

connectFuture.addListener(future -> {

Channel channel = connectFuture.channel();
if (!future.isSuccess()) {

logger.debug("Connecting to Redis at {}: {}", redisAddress, future.cause());
Throwable cause = future.cause();
Throwable detail = channel.attr(ConnectionBuilder.INIT_FAILURE).get();

if (detail != null) {
detail.addSuppressed(cause);
cause = detail;
}

logger.debug("Connecting to Redis at {}: {}", redisAddress, cause);
connectionBuilder.endpoint().initialState();
channelReadyFuture.completeExceptionally(future.cause());
channelReadyFuture.completeExceptionally(cause);
return;
}

RedisHandshakeHandler handshakeHandler = connectFuture.channel().pipeline().get(RedisHandshakeHandler.class);
RedisHandshakeHandler handshakeHandler = channel.pipeline().get(RedisHandshakeHandler.class);

if (handshakeHandler == null) {
channelReadyFuture.completeExceptionally(new IllegalStateException("RedisHandshakeHandler not registered"));
Expand All @@ -461,7 +469,7 @@ private void initializeChannelAsync0(ConnectionBuilder connectionBuilder, Comple
logger.debug("Connecting to Redis at {}: Success", redisAddress);
RedisChannelHandler<?, ?> connection = connectionBuilder.connection();
connection.registerCloseables(closeableResources, connection);
channelReadyFuture.complete(connectFuture.channel());
channelReadyFuture.complete(channel);
return;
}

Expand Down
13 changes: 11 additions & 2 deletions src/main/java/io/lettuce/core/ConnectionBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
import java.util.function.Function;
import java.util.function.Supplier;

import jdk.net.ExtendedSocketOptions;
import reactor.core.publisher.Mono;
import io.lettuce.core.internal.LettuceAssert;
import io.lettuce.core.protocol.CommandEncoder;
import io.lettuce.core.protocol.CommandHandler;
Expand All @@ -41,6 +39,7 @@
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
Expand All @@ -49,6 +48,8 @@
import io.netty.util.AttributeKey;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import jdk.net.ExtendedSocketOptions;
import reactor.core.publisher.Mono;

/**
* Connection builder for connections. This class is part of the internal API.
Expand All @@ -61,6 +62,8 @@ public class ConnectionBuilder {

public static final AttributeKey<String> REDIS_URI = AttributeKey.valueOf("RedisURI");

public static final AttributeKey<Throwable> INIT_FAILURE = AttributeKey.valueOf("ConnectionBuilder.INIT_FAILURE");

private Mono<SocketAddress> socketAddressSupplier;

private ConnectionEvents connectionEvents;
Expand Down Expand Up @@ -332,6 +335,12 @@ private void doInitialize(Channel channel) {
clientResources.nettyCustomizer().afterChannelInitialized(channel);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.channel().attr(INIT_FAILURE).set(cause);
super.exceptionCaught(ctx, cause);
}

}

/**
Expand Down
10 changes: 8 additions & 2 deletions src/main/java/io/lettuce/core/SslConnectionBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
Expand All @@ -50,7 +51,6 @@ public class SslConnectionBuilder extends ConnectionBuilder {

private RedisURI redisURI;


public SslConnectionBuilder ssl(RedisURI redisURI) {
this.redisURI = redisURI;
return this;
Expand All @@ -67,7 +67,7 @@ public static SslConnectionBuilder sslConnectionBuilder() {
* @return
* @since 6.0.8
*/
public static boolean isSslChannelInitializer(ChannelHandler handler){
public static boolean isSslChannelInitializer(ChannelHandler handler) {
return handler instanceof SslChannelInitializer;
}

Expand Down Expand Up @@ -181,6 +181,12 @@ private SSLEngine initializeSSLEngine(ByteBufAllocator alloc) throws IOException
return sslEngine;
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.channel().attr(INIT_FAILURE).set(cause);
super.exceptionCaught(ctx, cause);
}

}

}
24 changes: 24 additions & 0 deletions src/test/java/io/lettuce/core/ClientIntegrationTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.net.SocketAddress;
import java.time.Duration;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicInteger;

import javax.enterprise.inject.New;
Expand All @@ -31,10 +32,12 @@
import io.lettuce.core.api.async.RedisAsyncCommands;
import io.lettuce.core.api.sync.RedisCommands;
import io.lettuce.core.resource.ClientResources;
import io.lettuce.core.resource.NettyCustomizer;
import io.lettuce.test.Delay;
import io.lettuce.test.LettuceExtension;
import io.lettuce.test.Wait;
import io.lettuce.test.resource.FastShutdown;
import io.netty.channel.Channel;

/**
* @author Will Glozer
Expand All @@ -44,6 +47,7 @@
class ClientIntegrationTests extends TestSupport {

private final RedisClient client;

private final RedisCommands<String, String> redis;

@Inject
Expand All @@ -61,6 +65,25 @@ void close(@New StatefulRedisConnection<String, String> connection) {
assertThatThrownBy(() -> connection.sync().get(key)).isInstanceOf(RedisException.class);
}

@Test
void propagatesChannelInitFailure() {

ClientResources handshakeFailure = ClientResources.builder().nettyCustomizer(new NettyCustomizer() {

@Override
public void afterChannelInitialized(Channel channel) {
throw new NoSuchElementException();
}

}).build();
RedisURI uri = RedisURI.create(host, port);
RedisClient customClient = RedisClient.create(handshakeFailure, uri);
assertThatException().isThrownBy(customClient::connect).withRootCauseInstanceOf(NoSuchElementException.class);

FastShutdown.shutdown(customClient);
FastShutdown.shutdown(handshakeFailure);
}

@Test
void statefulConnectionFromSync() {
assertThat(redis.getStatefulConnection().sync()).isSameAs(redis);
Expand Down Expand Up @@ -265,4 +288,5 @@ void pubSubConnectionShouldSetClientName() {

connection.close();
}

}

0 comments on commit 8e425f5

Please sign in to comment.