From 53ba7d91c60dc647e6d52f658a4c5550a16a8dd8 Mon Sep 17 00:00:00 2001
From: John Viegas <70235430+joviegas@users.noreply.github.com>
Date: Tue, 7 Dec 2021 12:25:36 -0800
Subject: [PATCH] Revert "Revert "[netty-nio-client] Ensure initial channel
used for protocol detection is released before re-acquiring (#2882)" (#2895)"
This reverts commit d3cc7b874b9de1ccffea59decad213bb99907325.
---
.../bugfix-nettynioclient-0fb07b2.json | 6 +
.../http2/HttpOrHttp2ChannelPool.java | 11 +-
.../nio/netty/internal/utils/NettyUtils.java | 45 +++++++
.../http2/HttpOrHttp2ChannelPoolTest.java | 31 +++++
.../netty/internal/utils/NettyUtilsTest.java | 116 ++++++++++++++++++
5 files changed, 205 insertions(+), 4 deletions(-)
create mode 100644 .changes/next-release/bugfix-nettynioclient-0fb07b2.json
diff --git a/.changes/next-release/bugfix-nettynioclient-0fb07b2.json b/.changes/next-release/bugfix-nettynioclient-0fb07b2.json
new file mode 100644
index 000000000000..c4248032acc9
--- /dev/null
+++ b/.changes/next-release/bugfix-nettynioclient-0fb07b2.json
@@ -0,0 +1,6 @@
+{
+ "category": "Netty NIO HTTP Client",
+ "contributor": "",
+ "type": "bugfix",
+ "description": "Ensure initial channel used for protocol detection is released before re-acquiring"
+}
diff --git a/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/http2/HttpOrHttp2ChannelPool.java b/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/http2/HttpOrHttp2ChannelPool.java
index e10a612689ec..5815cab683c6 100644
--- a/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/http2/HttpOrHttp2ChannelPool.java
+++ b/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/http2/HttpOrHttp2ChannelPool.java
@@ -17,6 +17,7 @@
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.PROTOCOL_FUTURE;
import static software.amazon.awssdk.http.nio.netty.internal.utils.NettyUtils.doInEventLoop;
+import static software.amazon.awssdk.http.nio.netty.internal.utils.NettyUtils.runOrPropagate;
import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
@@ -140,7 +141,7 @@ private void completeProtocolConfiguration(Channel newChannel, Protocol protocol
closeAndRelease(newChannel, new IllegalStateException("Pool closed"));
} else {
try {
- protocolImplPromise.setSuccess(configureProtocol(newChannel, protocol));
+ configureProtocol(newChannel, protocol);
} catch (Throwable e) {
closeAndRelease(newChannel, e);
}
@@ -154,7 +155,7 @@ private void closeAndRelease(Channel newChannel, Throwable e) {
protocolImplPromise.setFailure(e);
}
- private ChannelPool configureProtocol(Channel newChannel, Protocol protocol) {
+ private void configureProtocol(Channel newChannel, Protocol protocol) {
if (Protocol.HTTP1_1 == protocol) {
// For HTTP/1.1 we use a traditional channel pool without multiplexing
SdkChannelPool idleConnectionMetricChannelPool = new IdleConnectionCountingChannelPool(eventLoop, delegatePool);
@@ -180,8 +181,10 @@ private ChannelPool configureProtocol(Channel newChannel, Protocol protocol) {
.build();
}
// Give the channel back so it can be acquired again by protocolImpl
- delegatePool.release(newChannel);
- return protocolImpl;
+ // Await the release completion to ensure we do not unnecessarily acquire a second channel
+ delegatePool.release(newChannel).addListener(runOrPropagate(protocolImplPromise, () -> {
+ protocolImplPromise.trySuccess(protocolImpl);
+ }));
}
@Override
diff --git a/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/utils/NettyUtils.java b/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/utils/NettyUtils.java
index cae4e5a538c6..14c40fd71a38 100644
--- a/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/utils/NettyUtils.java
+++ b/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/utils/NettyUtils.java
@@ -29,6 +29,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
+import java.util.function.Consumer;
import java.util.function.Function;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLParameters;
@@ -221,4 +222,48 @@ private static void configureSslEngine(SSLEngine sslEngine) {
sslParameters.setEndpointIdentificationAlgorithm("HTTPS");
sslEngine.setSSLParameters(sslParameters);
}
+
+ /**
+ * Create a {@link GenericFutureListener} that will propagate any failures or cancellations to the provided {@link Promise},
+ * or invoke the provided {@link Consumer} with the result of a successful operation completion. This is useful for chaining
+ * together multiple futures that may depend upon each other but that may not have the same return type.
+ *
+ * Note that if you do not need the value returned by a successful completion (or if it returns {@link Void}) you may use
+ * {@link #runOrPropagate(Promise, Runnable)} instead.
+ *
+ * @param destination the Promise to notify upon failure or cancellation
+ * @param onSuccess the Consumer to invoke upon success
+ */
+ public static GenericFutureListener> consumeOrPropagate(Promise> destination, Consumer onSuccess) {
+ return f -> {
+ if (f.isSuccess()) {
+ T result = f.getNow();
+ onSuccess.accept(result);
+ } else if (f.isCancelled()) {
+ destination.cancel(false);
+ } else {
+ destination.tryFailure(f.cause());
+ }
+ };
+ }
+
+ /**
+ * Create a {@link GenericFutureListener} that will propagate any failures or cancellations to the provided {@link Promise},
+ * or invoke the provided {@link Runnable} upon successful operation completion. This is useful for chaining together multiple
+ * futures that may depend upon each other but that may not have the same return type.
+ *
+ * @param destination the Promise to notify upon failure or cancellation
+ * @param onSuccess the Runnable to invoke upon success
+ */
+ public static GenericFutureListener> runOrPropagate(Promise> destination, Runnable onSuccess) {
+ return f -> {
+ if (f.isSuccess()) {
+ onSuccess.run();
+ } else if (f.isCancelled()) {
+ destination.cancel(false);
+ } else {
+ destination.tryFailure(f.cause());
+ }
+ };
+ }
}
diff --git a/http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/internal/http2/HttpOrHttp2ChannelPoolTest.java b/http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/internal/http2/HttpOrHttp2ChannelPoolTest.java
index 170bce1e17f6..8b524d7d3bbf 100644
--- a/http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/internal/http2/HttpOrHttp2ChannelPoolTest.java
+++ b/http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/internal/http2/HttpOrHttp2ChannelPoolTest.java
@@ -16,6 +16,7 @@
package software.amazon.awssdk.http.nio.netty.internal.http2;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -241,7 +242,9 @@ public void incompleteProtocolFutureDelaysMetricsDelegationAndForwardsSuccessFor
public void incompleteProtocolFutureDelaysMetricsDelegationAndForwardsSuccessForProtocol(Protocol protocol) throws Exception {
Promise acquirePromise = eventLoopGroup.next().newPromise();
+ Promise releasePromise = eventLoopGroup.next().newPromise();
when(mockDelegatePool.acquire()).thenReturn(acquirePromise);
+ when(mockDelegatePool.release(any(Channel.class))).thenReturn(releasePromise);
// startConnection
httpOrHttp2ChannelPool.acquire();
@@ -258,6 +261,7 @@ public void incompleteProtocolFutureDelaysMetricsDelegationAndForwardsSuccessFor
eventLoopGroup.register(channel);
channel.attr(PROTOCOL_FUTURE).set(CompletableFuture.completedFuture(protocol));
acquirePromise.setSuccess(channel);
+ releasePromise.setSuccess(null);
metricsFuture.join();
MetricCollection metrics = metricCollector.collect();
@@ -267,4 +271,31 @@ public void incompleteProtocolFutureDelaysMetricsDelegationAndForwardsSuccessFor
assertThat(metrics.metricValues(HttpMetric.AVAILABLE_CONCURRENCY).get(0)).isBetween(0, 1);
assertThat(metrics.metricValues(HttpMetric.LEASED_CONCURRENCY).get(0)).isBetween(0, 1);
}
+
+ @Test(timeout = 5_000)
+ public void protocolFutureAwaitsReleaseFuture() throws Exception {
+ Promise delegateAcquirePromise = eventLoopGroup.next().newPromise();
+ Promise releasePromise = eventLoopGroup.next().newPromise();
+ when(mockDelegatePool.acquire()).thenReturn(delegateAcquirePromise);
+ when(mockDelegatePool.release(any(Channel.class))).thenReturn(releasePromise);
+
+ MockChannel channel = new MockChannel();
+ eventLoopGroup.register(channel);
+ channel.attr(PROTOCOL_FUTURE).set(CompletableFuture.completedFuture(Protocol.HTTP1_1));
+
+ // Acquire a new connection and save the returned future
+ Future acquireFuture = httpOrHttp2ChannelPool.acquire();
+
+ // Return a successful connection from the delegate pool
+ delegateAcquirePromise.setSuccess(channel);
+
+ // The returned future should not complete until the release completes
+ assertThat(acquireFuture.isDone()).isFalse();
+
+ // Complete the release
+ releasePromise.setSuccess(null);
+
+ // Assert the returned future completes (within the test timeout)
+ acquireFuture.await();
+ }
}
diff --git a/http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/internal/utils/NettyUtilsTest.java b/http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/internal/utils/NettyUtilsTest.java
index 3b27dd1c78e8..2b3c5040bbd3 100644
--- a/http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/internal/utils/NettyUtilsTest.java
+++ b/http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/internal/utils/NettyUtilsTest.java
@@ -23,17 +23,39 @@
import static org.mockito.Mockito.when;
import io.netty.channel.Channel;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.EventExecutor;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
+import io.netty.util.concurrent.Promise;
import java.time.Duration;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import javax.net.ssl.SSLEngine;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
import org.junit.Test;
import software.amazon.awssdk.http.nio.netty.internal.MockChannel;
public class NettyUtilsTest {
+
+ private static EventLoopGroup eventLoopGroup;
+
+ @BeforeClass
+ public static void setup() {
+ eventLoopGroup = new NioEventLoopGroup(1);
+ }
+
+ @AfterClass
+ public static void teardown() throws InterruptedException {
+ eventLoopGroup.shutdownGracefully().await();
+ }
+
@Test
public void testGetOrCreateAttributeKey_calledTwiceWithSameName_returnsSameInstance() {
String attr = "NettyUtilsTest.Foo";
@@ -77,4 +99,98 @@ public void doInEventLoop_notInEventLoop_submits() {
NettyUtils.doInEventLoop(mockExecutor, () -> {});
verify(mockExecutor).submit(any(Runnable.class));
}
+
+ @Test
+ public void runOrPropagate_success_runs() throws Exception {
+ Promise destination = eventLoopGroup.next().newPromise();
+ AtomicBoolean reference = new AtomicBoolean();
+
+ GenericFutureListener> listener =
+ NettyUtils.runOrPropagate(destination, () -> reference.set(true));
+
+ Promise source = eventLoopGroup.next().newPromise();
+ source.setSuccess(null);
+ listener.operationComplete(source);
+
+ assertThat(reference.get()).isTrue();
+ }
+
+ @Test
+ public void runOrPropagate_exception_propagates() throws Exception {
+ Promise destination = eventLoopGroup.next().newPromise();
+
+ GenericFutureListener> listener =
+ NettyUtils.runOrPropagate(destination, () -> {
+ });
+
+ Promise source = eventLoopGroup.next().newPromise();
+ source.setFailure(new RuntimeException("Intentional exception for testing purposes"));
+ listener.operationComplete(source);
+
+ assertThat(destination.cause())
+ .isInstanceOf(RuntimeException.class)
+ .hasMessage("Intentional exception for testing purposes");
+ }
+
+ @Test
+ public void runOrPropagate_cancel_propagates() throws Exception {
+ Promise destination = eventLoopGroup.next().newPromise();
+
+ GenericFutureListener> listener =
+ NettyUtils.runOrPropagate(destination, () -> {
+ });
+
+ Promise source = eventLoopGroup.next().newPromise();
+ source.cancel(false);
+ listener.operationComplete(source);
+
+ assertThat(destination.isCancelled()).isTrue();
+ }
+
+ @Test
+ public void consumeOrPropagate_success_consumes() throws Exception {
+ Promise destination = eventLoopGroup.next().newPromise();
+ AtomicReference reference = new AtomicReference<>();
+
+ GenericFutureListener> listener =
+ NettyUtils.consumeOrPropagate(destination, reference::set);
+
+ Promise source = eventLoopGroup.next().newPromise();
+ source.setSuccess("test");
+ listener.operationComplete(source);
+
+ assertThat(reference.get()).isEqualTo("test");
+ }
+
+ @Test
+ public void consumeOrPropagate_exception_propagates() throws Exception {
+ Promise destination = eventLoopGroup.next().newPromise();
+
+ GenericFutureListener> listener =
+ NettyUtils.consumeOrPropagate(destination, s -> {
+ });
+
+ Promise source = eventLoopGroup.next().newPromise();
+ source.setFailure(new RuntimeException("Intentional exception for testing purposes"));
+ listener.operationComplete(source);
+
+ assertThat(destination.cause())
+ .isInstanceOf(RuntimeException.class)
+ .hasMessage("Intentional exception for testing purposes");
+ }
+
+ @Test
+ public void consumeOrPropagate_cancel_propagates() throws Exception {
+ Promise destination = eventLoopGroup.next().newPromise();
+
+ GenericFutureListener> listener =
+ NettyUtils.consumeOrPropagate(destination, s -> {
+ });
+
+ Promise source = eventLoopGroup.next().newPromise();
+ source.cancel(false);
+ listener.operationComplete(source);
+
+ assertThat(destination.isCancelled()).isTrue();
+ }
}