Commit 573a885

Explicitly configure ExecutorService for S3 multipart uploads

Previously the uploads ran on the ForkJoinPool common pool, which is meant for CPU-bound operations rather than blocking I/O.

grantatspothero authored and dain committed Jun 12, 2024
1 parent 0f210c0 commit 573a885
Showing 5 changed files with 29 additions and 8 deletions.
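
For context, here is a minimal, self-contained sketch (not code from this commit; the class name, the uploadPart helper, and the payload are illustrative) of the difference between the one-argument CompletableFuture.supplyAsync, which schedules work on ForkJoinPool.commonPool(), and the two-argument overload that takes a dedicated executor, which is what this commit switches the multipart uploads to:

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class UploadExecutorSketch
    {
        // Hypothetical stand-in for uploading one multipart part: it blocks on network I/O.
        private static String uploadPart(byte[] data)
        {
            return "etag-" + data.length;
        }

        public static void main(String[] args)
                throws Exception
        {
            byte[] part = new byte[8 * 1024 * 1024];

            // Before this commit: no executor argument, so the task runs on
            // ForkJoinPool.commonPool(), which is sized for CPU-bound work (roughly one
            // thread per core). Parking blocking uploads there can starve every other
            // user of the shared common pool.
            Future<String> onCommonPool = CompletableFuture.supplyAsync(() -> uploadPart(part));

            // After this commit: the blocking work runs on a dedicated executor. The commit
            // uses a cached pool of daemon threads named "s3-upload-%s"; a plain cached
            // pool is used here to keep the sketch dependency-free.
            ExecutorService uploadExecutor = Executors.newCachedThreadPool();
            Future<String> onUploadPool = CompletableFuture.supplyAsync(() -> uploadPart(part), uploadExecutor);

            System.out.println(onCommonPool.get());
            System.out.println(onUploadPool.get());
            uploadExecutor.shutdownNow();
        }
    }

The diff below threads the executor from S3FileSystemFactory through S3FileSystem and S3OutputFile down to S3OutputStream, where the supplyAsync call is made.
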
5 changes: 5 additions & 0 deletions lib/trino-filesystem-s3/pom.xml
@@ -28,6 +28,11 @@
<artifactId>guice</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>concurrent</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>configuration</artifactId>
S3FileSystem.java
@@ -41,6 +41,7 @@
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.stream.Stream;

import static com.google.common.collect.ImmutableSet.toImmutableSet;
@@ -51,12 +52,14 @@
final class S3FileSystem
implements TrinoFileSystem
{
private final ExecutorService uploadExecutor;
private final S3Client client;
private final S3Context context;
private final RequestPayer requestPayer;

public S3FileSystem(S3Client client, S3Context context)
public S3FileSystem(ExecutorService uploadExecutor, S3Client client, S3Context context)
{
this.uploadExecutor = requireNonNull(uploadExecutor, "uploadExecutor is null");
this.client = requireNonNull(client, "client is null");
this.context = requireNonNull(context, "context is null");
this.requestPayer = context.requestPayer();
@@ -77,7 +80,7 @@ public TrinoInputFile newInputFile(Location location, long length)
@Override
public TrinoOutputFile newOutputFile(Location location)
{
return new S3OutputFile(client, context, new S3Location(location));
return new S3OutputFile(uploadExecutor, client, context, new S3Location(location));
}

@Override
S3FileSystemFactory.java
@@ -38,18 +38,22 @@

import java.net.URI;
import java.util.Optional;
import java.util.concurrent.ExecutorService;

import static io.airlift.concurrent.Threads.daemonThreadsNamed;
import static io.trino.filesystem.s3.S3FileSystemConfig.RetryMode.getRetryMode;
import static io.trino.filesystem.s3.S3FileSystemConstants.EXTRA_CREDENTIALS_ACCESS_KEY_PROPERTY;
import static io.trino.filesystem.s3.S3FileSystemConstants.EXTRA_CREDENTIALS_SECRET_KEY_PROPERTY;
import static io.trino.filesystem.s3.S3FileSystemConstants.EXTRA_CREDENTIALS_SESSION_TOKEN_PROPERTY;
import static java.lang.Math.toIntExact;
import static java.util.concurrent.Executors.newCachedThreadPool;

public final class S3FileSystemFactory
implements TrinoFileSystemFactory
{
private final S3Client client;
private final S3Context context;
private final ExecutorService uploadExecutor;

@Inject
public S3FileSystemFactory(OpenTelemetry openTelemetry, S3FileSystemConfig config)
@@ -122,12 +126,15 @@ else if (config.getIamRole() != null) {
config.getSseKmsKeyId(),
Optional.empty(),
config.getCannedAcl());

this.uploadExecutor = newCachedThreadPool(daemonThreadsNamed("s3-upload-%s"));
}

@PreDestroy
public void destroy()
{
client.close();
uploadExecutor.shutdownNow();
}

@Override
@@ -138,10 +145,10 @@ public TrinoFileSystem create(ConnectorIdentity identity)
identity.getExtraCredentials().get(EXTRA_CREDENTIALS_ACCESS_KEY_PROPERTY),
identity.getExtraCredentials().get(EXTRA_CREDENTIALS_SECRET_KEY_PROPERTY),
identity.getExtraCredentials().get(EXTRA_CREDENTIALS_SESSION_TOKEN_PROPERTY)));
return new S3FileSystem(client, context.withCredentialsProviderOverride(credentialsProvider));
return new S3FileSystem(uploadExecutor, client, context.withCredentialsProviderOverride(credentialsProvider));
}

return new S3FileSystem(client, context);
return new S3FileSystem(uploadExecutor, client, context);
}

private static Optional<StaticCredentialsProvider> getStaticCredentialsProvider(S3FileSystemConfig config)
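
A note on the lifecycle shown in the factory above: a cached pool grows with concurrent uploads and reclaims idle threads, and daemon threads never block JVM exit. Below is a short sketch of that ownership pattern under stated assumptions (the class name is illustrative, and the @PreDestroy import is assumed to be jakarta.annotation, which is not visible in this diff); daemonThreadsNamed comes from the io.airlift.concurrent dependency added in the pom.xml change.

    import jakarta.annotation.PreDestroy;

    import java.util.concurrent.ExecutorService;

    import static io.airlift.concurrent.Threads.daemonThreadsNamed;
    import static java.util.concurrent.Executors.newCachedThreadPool;

    // Sketch: one cached pool of named daemon threads per factory, shared by every
    // file system the factory creates, and shut down when the factory is destroyed.
    public final class UploadExecutorOwner
    {
        private final ExecutorService uploadExecutor = newCachedThreadPool(daemonThreadsNamed("s3-upload-%s"));

        public ExecutorService uploadExecutor()
        {
            return uploadExecutor;
        }

        @PreDestroy
        public void destroy()
        {
            uploadExecutor.shutdownNow();
        }
    }
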
S3OutputFile.java
@@ -20,18 +20,21 @@

import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.ExecutorService;

import static java.util.Objects.requireNonNull;

final class S3OutputFile
implements TrinoOutputFile
{
private final ExecutorService uploadExecutor;
private final S3Client client;
private final S3Context context;
private final S3Location location;

public S3OutputFile(S3Client client, S3Context context, S3Location location)
public S3OutputFile(ExecutorService uploadExecutor, S3Client client, S3Context context, S3Location location)
{
this.uploadExecutor = requireNonNull(uploadExecutor, "uploadExecutor is null");
this.client = requireNonNull(client, "client is null");
this.context = requireNonNull(context, "context is null");
this.location = requireNonNull(location, "location is null");
@@ -50,7 +53,7 @@ public void createOrOverwrite(byte[] data)
@Override
public OutputStream create(AggregatedMemoryContext memoryContext)
{
return new S3OutputStream(memoryContext, client, context, location);
return new S3OutputStream(memoryContext, uploadExecutor, client, context, location);
}

@Override
S3OutputStream.java
@@ -38,6 +38,7 @@
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

import static io.trino.filesystem.s3.S3FileSystemConfig.ObjectCannedAcl.getCannedAcl;
Expand All @@ -55,6 +56,7 @@ final class S3OutputStream
{
private final List<CompletedPart> parts = new ArrayList<>();
private final LocalMemoryContext memoryContext;
private final ExecutorService uploadExecutor;
private final S3Client client;
private final S3Location location;
private final S3Context context;
@@ -79,9 +81,10 @@ final class S3OutputStream
// Visibility is ensured by calling get() on inProgressUploadFuture.
private Optional<String> uploadId = Optional.empty();

public S3OutputStream(AggregatedMemoryContext memoryContext, S3Client client, S3Context context, S3Location location)
public S3OutputStream(AggregatedMemoryContext memoryContext, ExecutorService uploadExecutor, S3Client client, S3Context context, S3Location location)
{
this.memoryContext = memoryContext.newLocalMemoryContext(S3OutputStream.class.getSimpleName());
this.uploadExecutor = requireNonNull(uploadExecutor, "uploadExecutor is null");
this.client = requireNonNull(client, "client is null");
this.location = requireNonNull(location, "location is null");
this.context = requireNonNull(context, "context is null");
@@ -249,7 +252,7 @@ private void flushBuffer(boolean finished)
throw e;
}
multipartUploadStarted = true;
inProgressUploadFuture = supplyAsync(() -> uploadPage(data, length));
inProgressUploadFuture = supplyAsync(() -> uploadPage(data, length), uploadExecutor);
}
}

