Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose futureCompletionExecutor on S3 CRT client #4880

Merged
merged 3 commits into from
Feb 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "feature",
"category": "AWS CRT-based S3 Client",
"contributor": "",
"description": "Allow users to configure future completion executor on the AWS CRT-based S3 client via `S3CrtAsyncClientBuilder#futureCompletionExecutor`. See [#4879](https://github.com/aws/aws-sdk-java-v2/issues/4879)"
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

import java.net.URI;
import java.nio.file.Path;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.function.Consumer;
import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
Expand Down Expand Up @@ -281,6 +284,30 @@ default S3CrtAsyncClientBuilder retryConfiguration(Consumer<S3CrtRetryConfigurat
*/
S3CrtAsyncClientBuilder thresholdInBytes(Long thresholdInBytes);

/**
* Configure the {@link Executor} that should be used to complete the {@link CompletableFuture} that is returned by the async
* service client. By default, this is a dedicated, per-client {@link ThreadPoolExecutor} that is managed by the SDK.
* <p>
* The configured {@link Executor} will be invoked by the async HTTP client's I/O threads (e.g., EventLoops), which must be
* reserved for non-blocking behavior. Blocking an I/O thread can cause severe performance degradation, including across
* multiple clients, as clients are configured, by default, to share a single I/O thread pool (e.g., EventLoopGroup).
* <p>
* You should typically only want to customize the future-completion {@link Executor} for a few possible reasons:
* <ol>
* <li>You want more fine-grained control over the {@link ThreadPoolExecutor} used, such as configuring the pool size
* or sharing a single pool between multiple clients.
* <li>You want to add instrumentation (i.e., metrics) around how the {@link Executor} is used.
* <li>You know, for certain, that all of your {@link CompletableFuture} usage is strictly non-blocking, and you wish to
* remove the minor overhead incurred by using a separate thread. In this case, you can use
* {@code Runnable::run} to execute the future-completion directly from within the I/O thread.
* </ol>
*
* @param futureCompletionExecutor the executor
* @return an instance of this builder.
*/
S3CrtAsyncClientBuilder futureCompletionExecutor(Executor futureCompletionExecutor);


@Override
S3AsyncClient build();
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package software.amazon.awssdk.services.s3.internal.crt;

import static software.amazon.awssdk.auth.signer.AwsSignerExecutionAttribute.SERVICE_SIGNING_NAME;
import static software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR;
import static software.amazon.awssdk.core.interceptor.SdkInternalExecutionAttribute.AUTH_SCHEMES;
import static software.amazon.awssdk.core.interceptor.SdkInternalExecutionAttribute.SDK_HTTP_EXECUTION_ATTRIBUTES;
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.HTTP_CHECKSUM;
Expand All @@ -30,6 +31,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.annotations.SdkTestInternalApi;
import software.amazon.awssdk.auth.signer.AwsSignerExecutionAttribute;
Expand All @@ -56,6 +58,7 @@
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.DelegatingS3AsyncClient;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
import software.amazon.awssdk.services.s3.S3Configuration;
import software.amazon.awssdk.services.s3.S3CrtAsyncClientBuilder;
import software.amazon.awssdk.services.s3.crt.S3CrtHttpConfiguration;
Expand Down Expand Up @@ -120,22 +123,30 @@ private static S3AsyncClient initializeS3AsyncClient(DefaultS3CrtClientBuilder b
builder.executionInterceptors.forEach(overrideConfigurationBuilder::addExecutionInterceptor);
}

return S3AsyncClient.builder()
// Disable checksum for streaming operations, it is handled in CRT. Checksumming for non-streaming
// operations is still handled in HttpChecksumStage
.serviceConfiguration(S3Configuration.builder()
.checksumValidationEnabled(false)
.build())
.region(builder.region)
.endpointOverride(builder.endpointOverride)
.credentialsProvider(builder.credentialsProvider)
.overrideConfiguration(overrideConfigurationBuilder.build())
.accelerate(builder.accelerate)
.forcePathStyle(builder.forcePathStyle)
.crossRegionAccessEnabled(builder.crossRegionAccessEnabled)
.putAuthScheme(new CrtS3ExpressNoOpAuthScheme())
.httpClientBuilder(initializeS3CrtAsyncHttpClient(builder))
.build();
S3AsyncClientBuilder s3AsyncClientBuilder =
S3AsyncClient.builder()
// Disable checksum for streaming operations, it is handled in
// CRT. Checksumming for non-streaming
// operations is still handled in HttpChecksumStage
.serviceConfiguration(S3Configuration.builder()
.checksumValidationEnabled(false)
.build())
.region(builder.region)
.endpointOverride(builder.endpointOverride)
.credentialsProvider(builder.credentialsProvider)
.overrideConfiguration(overrideConfigurationBuilder.build())
.accelerate(builder.accelerate)
.forcePathStyle(builder.forcePathStyle)
.crossRegionAccessEnabled(builder.crossRegionAccessEnabled)
.putAuthScheme(new CrtS3ExpressNoOpAuthScheme())
.httpClientBuilder(initializeS3CrtAsyncHttpClient(builder));


if (builder.futureCompletionExecutor != null) {
s3AsyncClientBuilder.asyncConfiguration(b -> b.advancedOption(FUTURE_COMPLETION_EXECUTOR,
builder.futureCompletionExecutor));
}
return s3AsyncClientBuilder.build();
}

private static S3CrtAsyncHttpClient.Builder initializeS3CrtAsyncHttpClient(DefaultS3CrtClientBuilder builder) {
Expand Down Expand Up @@ -186,6 +197,7 @@ public static final class DefaultS3CrtClientBuilder implements S3CrtAsyncClientB
private S3CrtRetryConfiguration retryConfiguration;
private boolean crossRegionAccessEnabled;
private Long thresholdInBytes;
private Executor futureCompletionExecutor;

@Override
public S3CrtAsyncClientBuilder credentialsProvider(
Expand Down Expand Up @@ -281,6 +293,12 @@ public S3CrtAsyncClientBuilder thresholdInBytes(Long thresholdInBytes) {
return this;
}

@Override
public S3CrtAsyncClientBuilder futureCompletionExecutor(Executor futureCompletionExecutor) {
this.futureCompletionExecutor = futureCompletionExecutor;
return this;
}

@Override
public S3CrtAsyncClient build() {
return new DefaultS3CrtAsyncClient(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,23 @@
import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
import static com.github.tomakehurst.wiremock.client.WireMock.any;
import static com.github.tomakehurst.wiremock.client.WireMock.anyUrl;
import static com.github.tomakehurst.wiremock.client.WireMock.head;
import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.verify;

import com.github.tomakehurst.wiremock.client.WireMock;
import com.github.tomakehurst.wiremock.junit5.WireMockRuntimeInfo;
import com.github.tomakehurst.wiremock.junit5.WireMockTest;
import java.net.URI;
import java.util.concurrent.Executor;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.crt.CrtResource;
Expand All @@ -42,7 +49,21 @@
@WireMockTest
public class S3CrtClientWiremockTest {

private static final String LOCATION = "http://Example-Bucket.s3.amazonaws.com/Example-Object";
private static final String BUCKET = "Example-Bucket";
private static final String KEY = "Example-Object";
private static final String E_TAG = "\"3858f62230ac3c915f300c664312c11f-9\"";
private static final String XML_RESPONSE_BODY = String.format(
"<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n"
+ "<CompleteMultipartUploadResult xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\n"
+ "<Location>%s</Location>\n"
+ "<Bucket>%s</Bucket>\n"
+ "<Key>%s</Key>\n"
+ "<ETag>%s</ETag>\n"
+ "</CompleteMultipartUploadResult>", LOCATION, BUCKET, KEY, E_TAG);
private S3AsyncClient s3AsyncClient;
private S3AsyncClient clientWithCustomExecutor;
private SpyableExecutor mockExecutor;

@BeforeAll
public static void setUpBeforeAll() {
Expand All @@ -68,27 +89,43 @@ public void tearDown() {

@Test
public void completeMultipartUpload_completeResponse() {
String location = "http://Example-Bucket.s3.amazonaws.com/Example-Object";
String bucket = "Example-Bucket";
String key = "Example-Object";
String eTag = "\"3858f62230ac3c915f300c664312c11f-9\"";
String xmlResponseBody = String.format(
"<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n"
+ "<CompleteMultipartUploadResult xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\n"
+ "<Location>%s</Location>\n"
+ "<Bucket>%s</Bucket>\n"
+ "<Key>%s</Key>\n"
+ "<ETag>%s</ETag>\n"
+ "</CompleteMultipartUploadResult>", location, bucket, key, eTag);

stubFor(any(anyUrl()).willReturn(aResponse().withStatus(200).withBody(xmlResponseBody)));
stubFor(any(anyUrl()).willReturn(aResponse().withStatus(200).withBody(XML_RESPONSE_BODY)));

CompleteMultipartUploadResponse response = s3AsyncClient.completeMultipartUpload(
r -> r.bucket(bucket).key(key).uploadId("upload-id")).join();
r -> r.bucket(BUCKET).key(KEY).uploadId("upload-id")).join();

assertThat(response.location()).isEqualTo(location);
assertThat(response.bucket()).isEqualTo(bucket);
assertThat(response.key()).isEqualTo(key);
assertThat(response.eTag()).isEqualTo(eTag);
assertThat(response.location()).isEqualTo(LOCATION);
assertThat(response.bucket()).isEqualTo(BUCKET);
assertThat(response.key()).isEqualTo(KEY);
assertThat(response.eTag()).isEqualTo(E_TAG);
}

@Test
void overrideResponseCompletionExecutor_shouldCompleteWithCustomExecutor(WireMockRuntimeInfo wiremock) {

mockExecutor = Mockito.spy(new SpyableExecutor());

try (S3AsyncClient s3AsyncClient = S3AsyncClient.crtBuilder()
.region(Region.US_EAST_1)
.endpointOverride(URI.create("http://localhost:" + wiremock.getHttpPort()))
.futureCompletionExecutor(mockExecutor)
.credentialsProvider(
StaticCredentialsProvider.create(AwsBasicCredentials.create("key",
"secret")))
.build()) {
stubFor(any(anyUrl()).willReturn(aResponse().withStatus(200).withBody(XML_RESPONSE_BODY)));

CompleteMultipartUploadResponse response = s3AsyncClient.completeMultipartUpload(
r -> r.bucket(BUCKET).key(KEY).uploadId("upload-id")).join();

verify(mockExecutor).execute(any(Runnable.class));
}
}

private static class SpyableExecutor implements Executor {
@Override
public void execute(Runnable command) {
command.run();
}
}
}
Loading