diff --git a/.changes/next-release/bugfix-AmazonS3-6386be9.json b/.changes/next-release/bugfix-AmazonS3-6386be9.json new file mode 100644 index 000000000000..93af13e80825 --- /dev/null +++ b/.changes/next-release/bugfix-AmazonS3-6386be9.json @@ -0,0 +1,6 @@ +{ + "type": "bugfix", + "category": "Amazon S3", + "contributor": "", + "description": "Fixed an issue in S3 multipart client that could cause `BlockingInputStreamAsyncRequestBody#writeInputStream` to get stuck if any of the multipart request fails. See [#4801](https://github.com/aws/aws-sdk-java-v2/issues/4801)" +} diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingPublisher.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingPublisher.java index 6d8d18a14754..12278cf84dca 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingPublisher.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingPublisher.java @@ -203,6 +203,7 @@ public void onError(Throwable t) { private void sendCurrentBody(AsyncRequestBody body) { downstreamPublisher.send(body).exceptionally(t -> { downstreamPublisher.error(t); + upstreamSubscription.cancel(); return null; }); } diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTest.java index d2e06f28492a..6f116ca2667c 100644 --- a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTest.java +++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTest.java @@ -16,6 +16,7 @@ package software.amazon.awssdk.core.internal.async; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Assertions.fail; import static org.assertj.core.api.AssertionsForClassTypes.assertThat; import static software.amazon.awssdk.core.internal.async.SplittingPublisherTestUtils.verifyIndividualAsyncRequestBody; import static software.amazon.awssdk.utils.FunctionalUtils.invokeSafely; @@ -146,6 +147,20 @@ public Optional contentLength() { } + @Test + void downStreamFailed_shouldPropagateCancellation() { + CompletableFuture future = new CompletableFuture<>(); + TestAsyncRequestBody asyncRequestBody = new TestAsyncRequestBody(); + SplittingPublisher splittingPublisher = new SplittingPublisher(asyncRequestBody, AsyncRequestBodySplitConfiguration.builder() + .chunkSizeInBytes((long) CHUNK_SIZE) + .bufferSizeInBytes(10L) + .build()); + + assertThatThrownBy(() -> splittingPublisher.subscribe(requestBody -> { + throw new RuntimeException("foobar"); + }).get(5, TimeUnit.SECONDS)).hasMessageContaining("foobar"); + assertThat(asyncRequestBody.cancelled).isTrue(); + } private static void verifySplitContent(AsyncRequestBody asyncRequestBody, int chunkSize) throws Exception { SplittingPublisher splittingPublisher = new SplittingPublisher(asyncRequestBody, diff --git a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/S3MultipartClientPutObjectWiremockTest.java b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/S3MultipartClientPutObjectWiremockTest.java new file mode 100644 index 000000000000..19bf3988e3ec --- /dev/null +++ b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/S3MultipartClientPutObjectWiremockTest.java @@ -0,0 +1,121 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.services.s3.internal.multipart; + +import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; +import static com.github.tomakehurst.wiremock.client.WireMock.anyUrl; +import static com.github.tomakehurst.wiremock.client.WireMock.delete; +import static com.github.tomakehurst.wiremock.client.WireMock.post; +import static com.github.tomakehurst.wiremock.client.WireMock.put; +import static com.github.tomakehurst.wiremock.client.WireMock.stubFor; +import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import com.github.tomakehurst.wiremock.junit5.WireMockRuntimeInfo; +import com.github.tomakehurst.wiremock.junit5.WireMockTest; +import java.io.InputStream; +import java.net.URI; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.core.async.AsyncRequestBody; +import software.amazon.awssdk.core.async.BlockingInputStreamAsyncRequestBody; +import software.amazon.awssdk.http.crt.AwsCrtAsyncHttpClient; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.model.PutObjectResponse; +import software.amazon.awssdk.services.s3.model.S3Exception; + +@WireMockTest +@Timeout(10) +public class S3MultipartClientPutObjectWiremockTest { + + private static final String BUCKET = "Example-Bucket"; + private static final String KEY = "Example-Object"; + private static final String CREATE_MULTIPART_PAYLOAD = "\n" + + " string\n" + + " string\n" + + " string\n" + + ""; + private S3AsyncClient s3AsyncClient; + + @BeforeEach + public void setup(WireMockRuntimeInfo wiremock) { + stubPutObjectCalls(); + s3AsyncClient = S3AsyncClient.builder() + .region(Region.US_EAST_1) + .endpointOverride(URI.create("http://localhost:" + wiremock.getHttpPort())) + .credentialsProvider( + StaticCredentialsProvider.create(AwsBasicCredentials.create("key", "secret"))) + .multipartEnabled(true) + .multipartConfiguration(b -> b.minimumPartSizeInBytes(10L).apiCallBufferSizeInBytes(10L)) + .httpClientBuilder(AwsCrtAsyncHttpClient.builder()) + .build(); + } + + private void stubPutObjectCalls() { + stubFor(post(anyUrl()).willReturn(aResponse().withStatus(200).withBody(CREATE_MULTIPART_PAYLOAD))); + stubFor(put(anyUrl()).willReturn(aResponse().withStatus(404))); + stubFor(put(urlEqualTo("/Example-Bucket/Example-Object?partNumber=1&uploadId=string")).willReturn(aResponse().withStatus(200))); + stubFor(delete(anyUrl()).willReturn(aResponse().withStatus(200))); + } + + // https://github.com/aws/aws-sdk-java-v2/issues/4801 + @Test + void uploadWithUnknownContentLength_onePartFails_shouldCancelUpstream() { + BlockingInputStreamAsyncRequestBody blockingInputStreamAsyncRequestBody = AsyncRequestBody.forBlockingInputStream(null); + CompletableFuture putObjectResponse = s3AsyncClient.putObject( + r -> r.bucket(BUCKET).key(KEY), blockingInputStreamAsyncRequestBody); + + assertThatThrownBy(() -> { + try (InputStream inputStream = createUnlimitedInputStream()) { + blockingInputStreamAsyncRequestBody.writeInputStream(inputStream); + } + }).isInstanceOf(CancellationException.class); + + assertThatThrownBy(() -> putObjectResponse.join()).hasRootCauseInstanceOf(S3Exception.class); + } + + @Test + void uploadWithKnownContentLength_onePartFails_shouldCancelUpstream() { + BlockingInputStreamAsyncRequestBody blockingInputStreamAsyncRequestBody = + AsyncRequestBody.forBlockingInputStream(1024L * 20); // must be larger than the buffer used in + // InputStreamConsumingPublisher to trigger the error + CompletableFuture putObjectResponse = s3AsyncClient.putObject( + r -> r.bucket(BUCKET).key(KEY), blockingInputStreamAsyncRequestBody); + + assertThatThrownBy(() -> { + try (InputStream inputStream = createUnlimitedInputStream()) { + blockingInputStreamAsyncRequestBody.writeInputStream(inputStream); + } + }).isInstanceOf(CancellationException.class); + + assertThatThrownBy(() -> putObjectResponse.join()).hasRootCauseInstanceOf(S3Exception.class); + } + + private InputStream createUnlimitedInputStream() { + return new InputStream() { + @Override + public int read() { + return 1; + } + }; + } +}