Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Close the underlying crt connection and stream properly if the stream… #4835

Merged
merged 2 commits into from
Jan 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/next-release/bugfix-AWSCRTHTTPClient-32100be.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "bugfix",
"category": "AWS CRT HTTP Client",
"contributor": "",
"description": "Fixed the issue in the AWS CRT sync HTTP client where the connection was left open after the stream was aborted."
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.http.crt.internal.response;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.http.Abortable;
import software.amazon.awssdk.utils.async.InputStreamSubscriber;

/**
* Wrapper of {@link InputStreamSubscriber} that also implements {@link Abortable} and closes the underlying connections when
* {@link #close()} or {@link #abort()} is invoked.
*/
@SdkInternalApi
public final class AbortableInputStreamSubscriber extends InputStream implements Subscriber<ByteBuffer>, Abortable {

private final InputStreamSubscriber delegate;
private final Runnable closeConnection;

public AbortableInputStreamSubscriber(Runnable onClose, InputStreamSubscriber inputStreamSubscriber) {
this.delegate = inputStreamSubscriber;
this.closeConnection = onClose;
}

@Override
public void abort() {
close();
}

@Override
public int read() throws IOException {
return delegate.read();
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
return delegate.read(b, off, len);
}

@Override
public int read(byte[] b) throws IOException {
return delegate.read(b);
}

@Override
public void onSubscribe(Subscription s) {
delegate.onSubscribe(s);
}

@Override
public void onNext(ByteBuffer byteBuffer) {
delegate.onNext(byteBuffer);
}

@Override
public void onError(Throwable t) {
delegate.onError(t);
}

@Override
public void onComplete() {
delegate.onComplete();
}

@Override
public void close() {
closeConnection.run();
delegate.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ private void onSuccessfulResponseComplete(HttpStream stream) {

private void handlePublisherError(HttpStream stream, Throwable failure) {
failResponseHandlerAndFuture(stream, failure);
responseHandlerHelper.releaseConnection(stream);
responseHandlerHelper.closeConnection(stream);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed this from release to close because @TingDaoK pointed to me that the connection should be shut down instead of releasing back to the pool in this case.

}

private void onFailedResponseComplete(HttpStream stream, HttpException error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import software.amazon.awssdk.http.SdkHttpFullResponse;
import software.amazon.awssdk.http.SdkHttpResponse;
import software.amazon.awssdk.http.crt.AwsCrtHttpClient;
import software.amazon.awssdk.utils.Logger;
import software.amazon.awssdk.utils.async.InputStreamSubscriber;
import software.amazon.awssdk.utils.async.SimplePublisher;

Expand All @@ -39,8 +40,8 @@
*/
@SdkInternalApi
public final class InputStreamAdaptingHttpStreamResponseHandler implements HttpStreamResponseHandler {

private volatile InputStreamSubscriber inputStreamSubscriber;
private static final Logger log = Logger.loggerFor(InputStreamAdaptingHttpStreamResponseHandler.class);
private volatile AbortableInputStreamSubscriber inputStreamSubscriber;
private final SimplePublisher<ByteBuffer> simplePublisher = new SimplePublisher<>();

private final CompletableFuture<SdkHttpFullResponse> requestCompletionFuture;
Expand All @@ -66,12 +67,19 @@ public void onResponseHeaders(HttpStream stream, int responseStatusCode, int blo
}
responseBuilder.statusCode(responseStatusCode);
}

// Propagate cancellation
requestCompletionFuture.exceptionally(t -> {
responseHandlerHelper.closeConnection(stream);
return null;
});
}

@Override
public int onResponseBody(HttpStream stream, byte[] bodyBytesIn) {
if (inputStreamSubscriber == null) {
inputStreamSubscriber = new InputStreamSubscriber();
inputStreamSubscriber = new AbortableInputStreamSubscriber(() -> responseHandlerHelper.closeConnection(stream),
new InputStreamSubscriber());
simplePublisher.subscribe(inputStreamSubscriber);
// For response with a payload, we need to complete the future here to allow downstream to retrieve the data from
// the stream directly.
Expand All @@ -88,7 +96,9 @@ public int onResponseBody(HttpStream stream, byte[] bodyBytesIn) {

writeFuture.whenComplete((result, failure) -> {
if (failure != null) {
failFutureAndReleaseConnection(stream, failure);
log.debug(() -> "The subscriber failed to receive the data, closing the connection and failing the future",
failure);
failFutureAndCloseConnection(stream, failure);
return;
}

Expand All @@ -111,11 +121,6 @@ public void onResponseComplete(HttpStream stream, int errorCode) {
}
}

private void failFutureAndReleaseConnection(HttpStream stream, Throwable failure) {
requestCompletionFuture.completeExceptionally(failure);
responseHandlerHelper.releaseConnection(stream);
}

private void failFutureAndCloseConnection(HttpStream stream, Throwable failure) {
requestCompletionFuture.completeExceptionally(failure);
responseHandlerHelper.closeConnection(stream);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.http.crt.internal;

import static org.mockito.Mockito.verify;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import software.amazon.awssdk.http.crt.internal.response.AbortableInputStreamSubscriber;
import software.amazon.awssdk.utils.async.InputStreamSubscriber;

@ExtendWith(MockitoExtension.class)
public class AbortableInputStreamSubscriberTest {

private AbortableInputStreamSubscriber abortableInputStreamSubscriber;

@Mock
private Runnable onClose;

@BeforeEach
void setUp() {
abortableInputStreamSubscriber = new AbortableInputStreamSubscriber(onClose, new InputStreamSubscriber());
}

@Test
void close_shouldInvokeOnClose() {
abortableInputStreamSubscriber.close();
verify(onClose).run();
}

@Test
void abort_shouldInvokeOnClose() {
abortableInputStreamSubscriber.abort();
verify(onClose).run();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ public abstract class BaseHttpStreamResponseHandlerTest {
CompletableFuture requestFuture;

@Mock
private HttpStream httpStream;
HttpStream httpStream;

private HttpStreamResponseHandler responseHandler;
HttpStreamResponseHandler responseHandler;

abstract HttpStreamResponseHandler responseHandler();

Expand Down Expand Up @@ -113,7 +113,7 @@ void streamClosed_shouldNotIncreaseStreamWindow() throws InterruptedException {
verify(httpStream, never()).incrementWindow(anyInt());
}

private static HttpHeader[] getHttpHeaders() {
static HttpHeader[] getHttpHeaders() {
HttpHeader[] httpHeaders = new HttpHeader[1];
httpHeaders[0] = new HttpHeader("Content-Length", "1");
return httpHeaders;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,29 @@

package software.amazon.awssdk.http.crt.internal;

import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.core.http.HttpResponseHandler;
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
import software.amazon.awssdk.core.internal.http.async.AsyncResponseHandler;
import software.amazon.awssdk.crt.http.HttpHeader;
import software.amazon.awssdk.crt.http.HttpHeaderBlock;
import software.amazon.awssdk.crt.http.HttpStreamResponseHandler;
import software.amazon.awssdk.http.SdkHttpFullResponse;
import software.amazon.awssdk.http.SdkHttpResponse;
import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler;
import software.amazon.awssdk.http.crt.internal.response.CrtResponseAdapter;
import software.amazon.awssdk.http.crt.internal.response.InputStreamAdaptingHttpStreamResponseHandler;
Expand All @@ -37,4 +52,62 @@ HttpStreamResponseHandler responseHandler() {
responseHandler.prepare();
return CrtResponseAdapter.toCrtResponseHandler(crtConn, requestFuture, responseHandler);
}

@Test
void publisherFailedToDeliverEvents_shouldShutDownConnection() {
SdkAsyncHttpResponseHandler responseHandler = new TestAsyncHttpResponseHandler();

HttpStreamResponseHandler crtResponseHandler = CrtResponseAdapter.toCrtResponseHandler(crtConn, requestFuture, responseHandler);
HttpHeader[] httpHeaders = getHttpHeaders();
crtResponseHandler.onResponseHeaders(httpStream, 200, HttpHeaderBlock.MAIN.getValue(),
httpHeaders);
crtResponseHandler.onResponseHeadersDone(httpStream, 0);
crtResponseHandler.onResponseBody(httpStream, "{}".getBytes(StandardCharsets.UTF_8));

crtResponseHandler.onResponseComplete(httpStream, 0);
assertThatThrownBy(() -> requestFuture.join()).isInstanceOf(CancellationException.class).hasMessageContaining(
"subscription has been cancelled");
verify(crtConn).shutdown();
verify(crtConn).close();
verify(httpStream).close();
}

private static class TestAsyncHttpResponseHandler implements SdkAsyncHttpResponseHandler {

@Override
public void onHeaders(SdkHttpResponse headers) {
}

@Override
public void onStream(Publisher<ByteBuffer> stream) {
stream.subscribe(new Subscriber<ByteBuffer>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription s) {
subscription = s;
s.request(1);
}

@Override
public void onNext(ByteBuffer byteBuffer) {
subscription.cancel();
}

@Override
public void onError(Throwable t) {

}

@Override
public void onComplete() {

}
});
}

@Override
public void onError(Throwable error) {

}
}
}
Loading
Loading