Skip to content

Commit

Permalink
Fixed an issue in S3 multipart client where `BlockingInputStreamAsync…
Browse files Browse the repository at this point in the history
…RequestBody#writeInputStream` could get stuck if any of the multipart request fails.
  • Loading branch information
zoewangg committed Feb 27, 2024
1 parent 44bf0d9 commit 258a3e1
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 1 deletion.
6 changes: 6 additions & 0 deletions .changes/next-release/bugfix-AmazonS3-6386be9.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "bugfix",
"category": "Amazon S3",
"contributor": "",
"description": "Fixed an issue in S3 multipart client that could cause `BlockingInputStreamAsyncRequestBody#writeInputStream` to get stuck if any of the multipart request fails. See [#4801](https://github.com/aws/aws-sdk-java-v2/issues/4801)"
}
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ public void onError(Throwable t) {

private void sendCurrentBody(AsyncRequestBody body) {
downstreamPublisher.send(body).exceptionally(t -> {
downstreamPublisher.error(t);
upstreamSubscription.cancel();
return null;
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package software.amazon.awssdk.core.internal.async;

import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assertions.fail;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static software.amazon.awssdk.core.internal.async.SplittingPublisherTestUtils.verifyIndividualAsyncRequestBody;
import static software.amazon.awssdk.utils.FunctionalUtils.invokeSafely;
Expand Down Expand Up @@ -146,6 +147,20 @@ public Optional<Long> contentLength() {

}

@Test
void downStreamFailed_shouldPropagateCancellation() {
CompletableFuture<Void> future = new CompletableFuture<>();
TestAsyncRequestBody asyncRequestBody = new TestAsyncRequestBody();
SplittingPublisher splittingPublisher = new SplittingPublisher(asyncRequestBody, AsyncRequestBodySplitConfiguration.builder()
.chunkSizeInBytes((long) CHUNK_SIZE)
.bufferSizeInBytes(10L)
.build());

assertThatThrownBy(() -> splittingPublisher.subscribe(requestBody -> {
throw new RuntimeException("foobar");
}).get(5, TimeUnit.SECONDS)).hasMessageContaining("foobar");
assertThat(asyncRequestBody.cancelled).isTrue();
}

private static void verifySplitContent(AsyncRequestBody asyncRequestBody, int chunkSize) throws Exception {
SplittingPublisher splittingPublisher = new SplittingPublisher(asyncRequestBody,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.services.s3.internal.multipart;

import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
import static com.github.tomakehurst.wiremock.client.WireMock.anyUrl;
import static com.github.tomakehurst.wiremock.client.WireMock.delete;
import static com.github.tomakehurst.wiremock.client.WireMock.post;
import static com.github.tomakehurst.wiremock.client.WireMock.put;
import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import com.github.tomakehurst.wiremock.junit5.WireMockRuntimeInfo;
import com.github.tomakehurst.wiremock.junit5.WireMockTest;
import java.io.InputStream;
import java.net.URI;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.async.BlockingInputStreamAsyncRequestBody;
import software.amazon.awssdk.http.crt.AwsCrtAsyncHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
import software.amazon.awssdk.services.s3.model.S3Exception;

@WireMockTest
@Timeout(10)
public class S3MultipartClientPutObjectWiremockTest {

private static final String BUCKET = "Example-Bucket";
private static final String KEY = "Example-Object";
private static final String CREATE_MULTIPART_PAYLOAD = "<InitiateMultipartUploadResult>\n"
+ " <Bucket>string</Bucket>\n"
+ " <Key>string</Key>\n"
+ " <UploadId>string</UploadId>\n"
+ "</InitiateMultipartUploadResult>";
private S3AsyncClient s3AsyncClient;

@BeforeEach
public void setup(WireMockRuntimeInfo wiremock) {
stubPutObjectCalls();
s3AsyncClient = S3AsyncClient.builder()
.region(Region.US_EAST_1)
.endpointOverride(URI.create("http://localhost:" + wiremock.getHttpPort()))
.credentialsProvider(
StaticCredentialsProvider.create(AwsBasicCredentials.create("key", "secret")))
.multipartEnabled(true)
.multipartConfiguration(b -> b.minimumPartSizeInBytes(10L).apiCallBufferSizeInBytes(10L))
.httpClientBuilder(AwsCrtAsyncHttpClient.builder())
.build();
}

private void stubPutObjectCalls() {
stubFor(post(anyUrl()).willReturn(aResponse().withStatus(200).withBody(CREATE_MULTIPART_PAYLOAD)));
stubFor(put(anyUrl()).willReturn(aResponse().withStatus(404)));
stubFor(put(urlEqualTo("/Example-Bucket/Example-Object?partNumber=1&uploadId=string")).willReturn(aResponse().withStatus(200)));
stubFor(delete(anyUrl()).willReturn(aResponse().withStatus(200)));
}

// https://github.com/aws/aws-sdk-java-v2/issues/4801
@Test
void uploadWithUnknownContentLength_onePartFails_shouldCancelUpstream() {
BlockingInputStreamAsyncRequestBody blockingInputStreamAsyncRequestBody = AsyncRequestBody.forBlockingInputStream(null);
CompletableFuture<PutObjectResponse> putObjectResponse = s3AsyncClient.putObject(
r -> r.bucket(BUCKET).key(KEY), blockingInputStreamAsyncRequestBody);

assertThatThrownBy(() -> {
try (InputStream inputStream = createUnlimitedInputStream()) {
blockingInputStreamAsyncRequestBody.writeInputStream(inputStream);
}
}).isInstanceOf(CancellationException.class);

assertThatThrownBy(() -> putObjectResponse.join()).hasRootCauseInstanceOf(S3Exception.class);
}

@Test
void uploadWithKnownContentLength_onePartFails_shouldCancelUpstream() {
BlockingInputStreamAsyncRequestBody blockingInputStreamAsyncRequestBody =
AsyncRequestBody.forBlockingInputStream(1024L * 20); // must be larger than the buffer used in
// InputStreamConsumingPublisher to trigger the error
CompletableFuture<PutObjectResponse> putObjectResponse = s3AsyncClient.putObject(
r -> r.bucket(BUCKET).key(KEY), blockingInputStreamAsyncRequestBody);

assertThatThrownBy(() -> {
try (InputStream inputStream = createUnlimitedInputStream()) {
blockingInputStreamAsyncRequestBody.writeInputStream(inputStream);
}
}).isInstanceOf(CancellationException.class);

assertThatThrownBy(() -> putObjectResponse.join()).hasRootCauseInstanceOf(S3Exception.class);
}

private InputStream createUnlimitedInputStream() {
return new InputStream() {
@Override
public int read() {
return 1;
}
};
}
}

0 comments on commit 258a3e1

Please sign in to comment.