diff --git a/src/main/java/io/lettuce/core/RedisHandshake.java b/src/main/java/io/lettuce/core/RedisHandshake.java index 80dc62096..bf5ad7e4b 100644 --- a/src/main/java/io/lettuce/core/RedisHandshake.java +++ b/src/main/java/io/lettuce/core/RedisHandshake.java @@ -134,8 +134,12 @@ private CompletionStage tryHandshakeResp3(Channel channel) { handshake.completeExceptionally(throwable); } } else { - onHelloResponse(settings); - handshake.complete(null); + try { + onHelloResponse(settings); + handshake.complete(null); + } catch (RuntimeException e) { + handshake.completeExceptionally(e); + } } }); diff --git a/src/test/java/io/lettuce/core/RedisHandshakeUnitTests.java b/src/test/java/io/lettuce/core/RedisHandshakeUnitTests.java index b01fd810f..7bfd18760 100644 --- a/src/test/java/io/lettuce/core/RedisHandshakeUnitTests.java +++ b/src/test/java/io/lettuce/core/RedisHandshakeUnitTests.java @@ -1,7 +1,7 @@ package io.lettuce.core; -import static io.lettuce.TestTags.UNIT_TEST; -import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static io.lettuce.TestTags.*; +import static java.util.concurrent.TimeUnit.*; import static org.assertj.core.api.Assertions.*; import java.nio.ByteBuffer; @@ -13,12 +13,12 @@ import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; +import reactor.core.publisher.Mono; +import reactor.core.publisher.Sinks; import io.lettuce.core.output.CommandOutput; import io.lettuce.core.protocol.AsyncCommand; import io.lettuce.core.protocol.ProtocolVersion; import io.netty.channel.embedded.EmbeddedChannel; -import reactor.core.publisher.Mono; -import reactor.core.publisher.Sinks; /** * Unit tests for {@link RedisHandshake}. @@ -110,6 +110,23 @@ void handshakeFireAndForgetPostHandshake() { assertThat(handshakeInit.toCompletableFuture().isCompletedExceptionally()).isFalse(); } + @Test + void handshakeWithInvalidResponseShouldPropagateException() { + + EmbeddedChannel channel = new EmbeddedChannel(true, false); + + ConnectionState state = new ConnectionState(); + state.setCredentialsProvider(new StaticCredentialsProvider(null, null)); + RedisHandshake handshake = new RedisHandshake(null, false, state); + CompletionStage handshakeInit = handshake.initialize(channel); + + AsyncCommand> hello = channel.readOutbound(); + helloStringIdResponse(hello.getOutput()); + hello.complete(); + + assertThat(handshakeInit.toCompletableFuture().isCompletedExceptionally()).isTrue(); + } + @Test void handshakeDelayedCredentialProvider() { @@ -176,6 +193,22 @@ private static void helloResponse(CommandOutput> output) { + + output.multiMap(8); + output.set(ByteBuffer.wrap("id".getBytes())); + output.set(ByteBuffer.wrap("1".getBytes())); + + output.set(ByteBuffer.wrap("mode".getBytes())); + output.set(ByteBuffer.wrap("master".getBytes())); + + output.set(ByteBuffer.wrap("role".getBytes())); + output.set(ByteBuffer.wrap("master".getBytes())); + + output.set(ByteBuffer.wrap("version".getBytes())); + output.set(ByteBuffer.wrap("1.2.3".getBytes())); + } + static class DelayedRedisCredentialsProvider implements RedisCredentialsProvider { private final Sinks.One credentialsSink = Sinks.one();