From 573a885bfaf044ca7daf8603d85c4c990a1ecbae Mon Sep 17 00:00:00 2001 From: Grant Nicholas Date: Thu, 30 May 2024 12:07:02 -0500 Subject: [PATCH] Explicitly configure executorService for S3 multipart uploads. Previously used the ForkJoinPool common pool, which is meant for CPU-bound operations --- lib/trino-filesystem-s3/pom.xml | 5 +++++ .../java/io/trino/filesystem/s3/S3FileSystem.java | 7 +++++-- .../io/trino/filesystem/s3/S3FileSystemFactory.java | 11 +++++++++-- .../java/io/trino/filesystem/s3/S3OutputFile.java | 7 +++++-- .../java/io/trino/filesystem/s3/S3OutputStream.java | 7 +++++-- 5 files changed, 29 insertions(+), 8 deletions(-) diff --git a/lib/trino-filesystem-s3/pom.xml b/lib/trino-filesystem-s3/pom.xml index 4aa0cc499bff..d7c145925d24 100644 --- a/lib/trino-filesystem-s3/pom.xml +++ b/lib/trino-filesystem-s3/pom.xml @@ -28,6 +28,11 @@ guice + + io.airlift + concurrent + + io.airlift configuration diff --git a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystem.java b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystem.java index 74a4b7aab00c..2d0deb5fb61e 100644 --- a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystem.java +++ b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystem.java @@ -41,6 +41,7 @@ import java.util.Map.Entry; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ExecutorService; import java.util.stream.Stream; import static com.google.common.collect.ImmutableSet.toImmutableSet; @@ -51,12 +52,14 @@ final class S3FileSystem implements TrinoFileSystem { + private final ExecutorService uploadExecutor; private final S3Client client; private final S3Context context; private final RequestPayer requestPayer; - public S3FileSystem(S3Client client, S3Context context) + public S3FileSystem(ExecutorService uploadExecutor, S3Client client, S3Context context) { + this.uploadExecutor = requireNonNull(uploadExecutor, "uploadExecutor is null"); 
this.client = requireNonNull(client, "client is null"); this.context = requireNonNull(context, "context is null"); this.requestPayer = context.requestPayer(); @@ -77,7 +80,7 @@ public TrinoInputFile newInputFile(Location location, long length) @Override public TrinoOutputFile newOutputFile(Location location) { - return new S3OutputFile(client, context, new S3Location(location)); + return new S3OutputFile(uploadExecutor, client, context, new S3Location(location)); } @Override diff --git a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystemFactory.java b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystemFactory.java index 80799122748c..2610c0f6ca66 100644 --- a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystemFactory.java +++ b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystemFactory.java @@ -38,18 +38,22 @@ import java.net.URI; import java.util.Optional; +import java.util.concurrent.ExecutorService; +import static io.airlift.concurrent.Threads.daemonThreadsNamed; import static io.trino.filesystem.s3.S3FileSystemConfig.RetryMode.getRetryMode; import static io.trino.filesystem.s3.S3FileSystemConstants.EXTRA_CREDENTIALS_ACCESS_KEY_PROPERTY; import static io.trino.filesystem.s3.S3FileSystemConstants.EXTRA_CREDENTIALS_SECRET_KEY_PROPERTY; import static io.trino.filesystem.s3.S3FileSystemConstants.EXTRA_CREDENTIALS_SESSION_TOKEN_PROPERTY; import static java.lang.Math.toIntExact; +import static java.util.concurrent.Executors.newCachedThreadPool; public final class S3FileSystemFactory implements TrinoFileSystemFactory { private final S3Client client; private final S3Context context; + private final ExecutorService uploadExecutor; @Inject public S3FileSystemFactory(OpenTelemetry openTelemetry, S3FileSystemConfig config) @@ -122,12 +126,15 @@ else if (config.getIamRole() != null) { config.getSseKmsKeyId(), Optional.empty(), config.getCannedAcl()); + + this.uploadExecutor = 
newCachedThreadPool(daemonThreadsNamed("s3-upload-%s")); } @PreDestroy public void destroy() { client.close(); + uploadExecutor.shutdownNow(); } @Override @@ -138,10 +145,10 @@ public TrinoFileSystem create(ConnectorIdentity identity) identity.getExtraCredentials().get(EXTRA_CREDENTIALS_ACCESS_KEY_PROPERTY), identity.getExtraCredentials().get(EXTRA_CREDENTIALS_SECRET_KEY_PROPERTY), identity.getExtraCredentials().get(EXTRA_CREDENTIALS_SESSION_TOKEN_PROPERTY))); - return new S3FileSystem(client, context.withCredentialsProviderOverride(credentialsProvider)); + return new S3FileSystem(uploadExecutor, client, context.withCredentialsProviderOverride(credentialsProvider)); } - return new S3FileSystem(client, context); + return new S3FileSystem(uploadExecutor, client, context); } private static Optional getStaticCredentialsProvider(S3FileSystemConfig config) diff --git a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3OutputFile.java b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3OutputFile.java index 27ef778c5072..58bbfe155d89 100644 --- a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3OutputFile.java +++ b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3OutputFile.java @@ -20,18 +20,21 @@ import java.io.IOException; import java.io.OutputStream; +import java.util.concurrent.ExecutorService; import static java.util.Objects.requireNonNull; final class S3OutputFile implements TrinoOutputFile { + private final ExecutorService uploadExecutor; private final S3Client client; private final S3Context context; private final S3Location location; - public S3OutputFile(S3Client client, S3Context context, S3Location location) + public S3OutputFile(ExecutorService uploadExecutor, S3Client client, S3Context context, S3Location location) { + this.uploadExecutor = requireNonNull(uploadExecutor, "uploadExecutor is null"); this.client = requireNonNull(client, "client is null"); this.context = requireNonNull(context, "context is 
null"); this.location = requireNonNull(location, "location is null"); @@ -50,7 +53,7 @@ public void createOrOverwrite(byte[] data) @Override public OutputStream create(AggregatedMemoryContext memoryContext) { - return new S3OutputStream(memoryContext, client, context, location); + return new S3OutputStream(memoryContext, uploadExecutor, client, context, location); } @Override diff --git a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3OutputStream.java b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3OutputStream.java index 20f951685e83..7791e15beaff 100644 --- a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3OutputStream.java +++ b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3OutputStream.java @@ -38,6 +38,7 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import static io.trino.filesystem.s3.S3FileSystemConfig.ObjectCannedAcl.getCannedAcl; @@ -55,6 +56,7 @@ final class S3OutputStream { private final List parts = new ArrayList<>(); private final LocalMemoryContext memoryContext; + private final ExecutorService uploadExecutor; private final S3Client client; private final S3Location location; private final S3Context context; @@ -79,9 +81,10 @@ final class S3OutputStream // Visibility is ensured by calling get() on inProgressUploadFuture. 
private Optional uploadId = Optional.empty(); - public S3OutputStream(AggregatedMemoryContext memoryContext, S3Client client, S3Context context, S3Location location) + public S3OutputStream(AggregatedMemoryContext memoryContext, ExecutorService uploadExecutor, S3Client client, S3Context context, S3Location location) { this.memoryContext = memoryContext.newLocalMemoryContext(S3OutputStream.class.getSimpleName()); + this.uploadExecutor = requireNonNull(uploadExecutor, "uploadExecutor is null"); this.client = requireNonNull(client, "client is null"); this.location = requireNonNull(location, "location is null"); this.context = requireNonNull(context, "context is null"); @@ -249,7 +252,7 @@ private void flushBuffer(boolean finished) throw e; } multipartUploadStarted = true; - inProgressUploadFuture = supplyAsync(() -> uploadPage(data, length)); + inProgressUploadFuture = supplyAsync(() -> uploadPage(data, length), uploadExecutor); } }