Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merging back PR #2882 #2899

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/next-release/bugfix-nettynioclient-0fb07b2.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"category": "Netty NIO HTTP Client",
"contributor": "",
"type": "bugfix",
"description": "Ensure initial channel used for protocol detection is released before re-acquiring"
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.PROTOCOL_FUTURE;
import static software.amazon.awssdk.http.nio.netty.internal.utils.NettyUtils.doInEventLoop;
import static software.amazon.awssdk.http.nio.netty.internal.utils.NettyUtils.runOrPropagate;

import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
Expand Down Expand Up @@ -140,7 +141,7 @@ private void completeProtocolConfiguration(Channel newChannel, Protocol protocol
closeAndRelease(newChannel, new IllegalStateException("Pool closed"));
} else {
try {
protocolImplPromise.setSuccess(configureProtocol(newChannel, protocol));
configureProtocol(newChannel, protocol);
} catch (Throwable e) {
closeAndRelease(newChannel, e);
}
Expand All @@ -154,7 +155,7 @@ private void closeAndRelease(Channel newChannel, Throwable e) {
protocolImplPromise.setFailure(e);
}

private ChannelPool configureProtocol(Channel newChannel, Protocol protocol) {
private void configureProtocol(Channel newChannel, Protocol protocol) {
if (Protocol.HTTP1_1 == protocol) {
// For HTTP/1.1 we use a traditional channel pool without multiplexing
SdkChannelPool idleConnectionMetricChannelPool = new IdleConnectionCountingChannelPool(eventLoop, delegatePool);
Expand All @@ -180,8 +181,10 @@ private ChannelPool configureProtocol(Channel newChannel, Protocol protocol) {
.build();
}
// Give the channel back so it can be acquired again by protocolImpl
delegatePool.release(newChannel);
return protocolImpl;
// Await the release completion to ensure we do not unnecessarily acquire a second channel
delegatePool.release(newChannel).addListener(runOrPropagate(protocolImplPromise, () -> {
protocolImplPromise.trySuccess(protocolImpl);
}));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLParameters;
Expand Down Expand Up @@ -221,4 +222,48 @@ private static void configureSslEngine(SSLEngine sslEngine) {
sslParameters.setEndpointIdentificationAlgorithm("HTTPS");
sslEngine.setSSLParameters(sslParameters);
}

/**
* Create a {@link GenericFutureListener} that will propagate any failures or cancellations to the provided {@link Promise},
* or invoke the provided {@link Consumer} with the result of a successful operation completion. This is useful for chaining
* together multiple futures that may depend upon each other but that may not have the same return type.
* <p>
* Note that if you do not need the value returned by a successful completion (or if it returns {@link Void}) you may use
* {@link #runOrPropagate(Promise, Runnable)} instead.
*
* @param destination the Promise to notify upon failure or cancellation
* @param onSuccess the Consumer to invoke upon success
*/
public static <T> GenericFutureListener<Future<T>> consumeOrPropagate(Promise<?> destination, Consumer<T> onSuccess) {
return f -> {
if (f.isSuccess()) {
T result = f.getNow();
onSuccess.accept(result);
} else if (f.isCancelled()) {
destination.cancel(false);
} else {
destination.tryFailure(f.cause());
}
};
}

/**
* Create a {@link GenericFutureListener} that will propagate any failures or cancellations to the provided {@link Promise},
* or invoke the provided {@link Runnable} upon successful operation completion. This is useful for chaining together multiple
* futures that may depend upon each other but that may not have the same return type.
*
* @param destination the Promise to notify upon failure or cancellation
* @param onSuccess the Runnable to invoke upon success
*/
public static <T> GenericFutureListener<Future<T>> runOrPropagate(Promise<?> destination, Runnable onSuccess) {
return f -> {
if (f.isSuccess()) {
onSuccess.run();
} else if (f.isCancelled()) {
destination.cancel(false);
} else {
destination.tryFailure(f.cause());
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package software.amazon.awssdk.http.nio.netty.internal.http2;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -241,7 +242,9 @@ public void incompleteProtocolFutureDelaysMetricsDelegationAndForwardsSuccessFor

public void incompleteProtocolFutureDelaysMetricsDelegationAndForwardsSuccessForProtocol(Protocol protocol) throws Exception {
Promise<Channel> acquirePromise = eventLoopGroup.next().newPromise();
Promise<Void> releasePromise = eventLoopGroup.next().newPromise();
when(mockDelegatePool.acquire()).thenReturn(acquirePromise);
when(mockDelegatePool.release(any(Channel.class))).thenReturn(releasePromise);

// startConnection
httpOrHttp2ChannelPool.acquire();
Expand All @@ -258,6 +261,7 @@ public void incompleteProtocolFutureDelaysMetricsDelegationAndForwardsSuccessFor
eventLoopGroup.register(channel);
channel.attr(PROTOCOL_FUTURE).set(CompletableFuture.completedFuture(protocol));
acquirePromise.setSuccess(channel);
releasePromise.setSuccess(null);

metricsFuture.join();
MetricCollection metrics = metricCollector.collect();
Expand All @@ -267,4 +271,31 @@ public void incompleteProtocolFutureDelaysMetricsDelegationAndForwardsSuccessFor
assertThat(metrics.metricValues(HttpMetric.AVAILABLE_CONCURRENCY).get(0)).isBetween(0, 1);
assertThat(metrics.metricValues(HttpMetric.LEASED_CONCURRENCY).get(0)).isBetween(0, 1);
}

@Test(timeout = 5_000)
public void protocolFutureAwaitsReleaseFuture() throws Exception {
Promise<Channel> delegateAcquirePromise = eventLoopGroup.next().newPromise();
Promise<Void> releasePromise = eventLoopGroup.next().newPromise();
when(mockDelegatePool.acquire()).thenReturn(delegateAcquirePromise);
when(mockDelegatePool.release(any(Channel.class))).thenReturn(releasePromise);

MockChannel channel = new MockChannel();
eventLoopGroup.register(channel);
channel.attr(PROTOCOL_FUTURE).set(CompletableFuture.completedFuture(Protocol.HTTP1_1));

// Acquire a new connection and save the returned future
Future<Channel> acquireFuture = httpOrHttp2ChannelPool.acquire();

// Return a successful connection from the delegate pool
delegateAcquirePromise.setSuccess(channel);

// The returned future should not complete until the release completes
assertThat(acquireFuture.isDone()).isFalse();

// Complete the release
releasePromise.setSuccess(null);

// Assert the returned future completes (within the test timeout)
acquireFuture.await();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,39 @@
import static org.mockito.Mockito.when;

import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.Promise;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.net.ssl.SSLEngine;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import software.amazon.awssdk.http.nio.netty.internal.MockChannel;

public class NettyUtilsTest {

private static EventLoopGroup eventLoopGroup;

@BeforeClass
public static void setup() {
eventLoopGroup = new NioEventLoopGroup(1);
}

@AfterClass
public static void teardown() throws InterruptedException {
eventLoopGroup.shutdownGracefully().await();
}

@Test
public void testGetOrCreateAttributeKey_calledTwiceWithSameName_returnsSameInstance() {
String attr = "NettyUtilsTest.Foo";
Expand Down Expand Up @@ -77,4 +99,98 @@ public void doInEventLoop_notInEventLoop_submits() {
NettyUtils.doInEventLoop(mockExecutor, () -> {});
verify(mockExecutor).submit(any(Runnable.class));
}

@Test
public void runOrPropagate_success_runs() throws Exception {
Promise<String> destination = eventLoopGroup.next().newPromise();
AtomicBoolean reference = new AtomicBoolean();

GenericFutureListener<Future<Void>> listener =
NettyUtils.runOrPropagate(destination, () -> reference.set(true));

Promise<Void> source = eventLoopGroup.next().newPromise();
source.setSuccess(null);
listener.operationComplete(source);

assertThat(reference.get()).isTrue();
}

@Test
public void runOrPropagate_exception_propagates() throws Exception {
Promise<String> destination = eventLoopGroup.next().newPromise();

GenericFutureListener<Future<Void>> listener =
NettyUtils.runOrPropagate(destination, () -> {
});

Promise<Void> source = eventLoopGroup.next().newPromise();
source.setFailure(new RuntimeException("Intentional exception for testing purposes"));
listener.operationComplete(source);

assertThat(destination.cause())
.isInstanceOf(RuntimeException.class)
.hasMessage("Intentional exception for testing purposes");
}

@Test
public void runOrPropagate_cancel_propagates() throws Exception {
Promise<String> destination = eventLoopGroup.next().newPromise();

GenericFutureListener<Future<Void>> listener =
NettyUtils.runOrPropagate(destination, () -> {
});

Promise<Void> source = eventLoopGroup.next().newPromise();
source.cancel(false);
listener.operationComplete(source);

assertThat(destination.isCancelled()).isTrue();
}

@Test
public void consumeOrPropagate_success_consumes() throws Exception {
Promise<String> destination = eventLoopGroup.next().newPromise();
AtomicReference<String> reference = new AtomicReference<>();

GenericFutureListener<Future<String>> listener =
NettyUtils.consumeOrPropagate(destination, reference::set);

Promise<String> source = eventLoopGroup.next().newPromise();
source.setSuccess("test");
listener.operationComplete(source);

assertThat(reference.get()).isEqualTo("test");
}

@Test
public void consumeOrPropagate_exception_propagates() throws Exception {
Promise<String> destination = eventLoopGroup.next().newPromise();

GenericFutureListener<Future<String>> listener =
NettyUtils.consumeOrPropagate(destination, s -> {
});

Promise<String> source = eventLoopGroup.next().newPromise();
source.setFailure(new RuntimeException("Intentional exception for testing purposes"));
listener.operationComplete(source);

assertThat(destination.cause())
.isInstanceOf(RuntimeException.class)
.hasMessage("Intentional exception for testing purposes");
}

@Test
public void consumeOrPropagate_cancel_propagates() throws Exception {
Promise<String> destination = eventLoopGroup.next().newPromise();

GenericFutureListener<Future<String>> listener =
NettyUtils.consumeOrPropagate(destination, s -> {
});

Promise<String> source = eventLoopGroup.next().newPromise();
source.cancel(false);
listener.operationComplete(source);

assertThat(destination.isCancelled()).isTrue();
}
}