Explicitly configure executorService for S3 multipart uploads in native S3 filesystem #22209

Merged

Conversation

@grantatspothero (Contributor) commented May 30, 2024

Description

Currently the native S3 filesystem's S3OutputStream performs multipart uploads (MPU) using supplyAsync, which defaults to the ForkJoin common pool. That pool is meant for CPU-bound operations and shouldn't be used for I/O-bound tasks like S3 multipart uploads.
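For illustration, a minimal sketch of the change being described; the class and method names (UploadSketch, doUploadPart) are hypothetical and stand in for the actual S3OutputStream code:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class UploadSketch
{
    private final ExecutorService uploadExecutor = Executors.newCachedThreadPool();

    CompletableFuture<String> uploadPart(byte[] buffer)
    {
        // Without the second argument, supplyAsync runs on ForkJoinPool.commonPool(),
        // which is sized for CPU-bound work; passing an explicit executor keeps
        // I/O-bound multipart upload parts off the common pool.
        return CompletableFuture.supplyAsync(() -> doUploadPart(buffer), uploadExecutor);
    }

    private String doUploadPart(byte[] buffer)
    {
        return "etag"; // placeholder for the real UploadPart request
    }
}
```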

Additional context and related issues

Approach:

Considerations:

Release notes

( ) This is not user-visible or is docs only, and no release notes are required.
(x) Release notes are required. Please propose a release note for me.
( ) Release notes are required, with the following suggested text:

# Section
* Fix some things. ({issue}`issuenumber`)

@electrum (Member) left a comment


I don't see a reason to make this configurable. As you mentioned, the old implementation used a cached thread pool, and to my knowledge this has never been a problem for anyone.

The concurrency of uploads is naturally limited by the number of output streams, which should already be limited to a relatively small number.

@@ -120,12 +126,21 @@ public S3FileSystemFactory(OpenTelemetry openTelemetry, S3FileSystemConfig confi
config.getSseKmsKeyId(),
Optional.empty(),
config.getCannedAcl());

this.uploadExecutor = new ThreadPoolExecutor(
Member

This doesn't do what you expect. If you very carefully read the Javadoc for ThreadPoolExecutor, you'll see that this will result in tasks being rejected when the maximum pool size is reached. An easier way to see why this is true is that there is no queue to hold the non-runnable tasks, so the only possible behavior would be to block the caller or reject the task, and it doesn't block the caller.

If you spend a very long time thinking about ThreadPoolExecutor and reading the documentation many times, you may eventually realize that the cached and fixed variants are the only useful configurations. There's a reason that we don't use it directly in Trino.

If you want to limit concurrency without having a fixed number of threads, the alternative is to use BoundedExecutor, which we do use in various places. Once we can safely use virtual threads, all of this complexity goes away -- just use a Semaphore.
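For illustration, a small self-contained sketch (not code from this PR) of the rejection behavior described above, assuming the ThreadPoolExecutor was built with a bounded maximum pool size and a SynchronousQueue:

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class RejectionSketch
{
    public static void main(String[] args)
    {
        // Bounded maximum pool size with a SynchronousQueue: there is nowhere to park
        // a third task while two threads are busy, so execute() rejects it (the default
        // AbortPolicy throws RejectedExecutionException) rather than blocking the caller.
        ThreadPoolExecutor executor = new ThreadPoolExecutor(0, 2, 60, TimeUnit.SECONDS, new SynchronousQueue<>());
        try {
            for (int i = 0; i < 3; i++) {
                executor.execute(RejectionSketch::sleepBriefly);
            }
        }
        catch (RejectedExecutionException e) {
            System.out.println("third task rejected: " + e);
        }
        finally {
            executor.shutdownNow();
        }
    }

    private static void sleepBriefly()
    {
        try {
            Thread.sleep(1000);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

The BoundedExecutor alternative (io.airlift.concurrent.BoundedExecutor) instead wraps an unbounded executor, e.g. new BoundedExecutor(coreExecutor, maxThreads), queueing submissions internally so nothing is rejected while at most maxThreads tasks run concurrently.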

}

@PreDestroy
public void destroy()
{
client.close();
uploadExecutor.close();
@erichwang (Contributor) commented May 30, 2024

close() will block up to a day before interrupting tasks.

Since we probably want a shorter time frame, we should use MoreExecutors.shutdownAndAwaitTermination(), which does the same thing but lets us specify the timeout. I recommend 10s as the default we commonly use here.
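A minimal sketch of the suggested shutdown, assuming Guava's MoreExecutors is on the classpath; the wrapper class and the jakarta.annotation.PreDestroy import are assumptions standing in for the real S3FileSystemFactory:

```java
import com.google.common.util.concurrent.MoreExecutors;
import jakarta.annotation.PreDestroy;
import software.amazon.awssdk.services.s3.S3Client;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

class FactoryShutdownSketch
{
    private final S3Client client;
    private final ExecutorService uploadExecutor;

    FactoryShutdownSketch(S3Client client, ExecutorService uploadExecutor)
    {
        this.client = client;
        this.uploadExecutor = uploadExecutor;
    }

    @PreDestroy
    public void destroy()
    {
        client.close();
        // Orderly shutdown, then interrupt any uploads still running after 10 seconds,
        // instead of blocking for up to a day as ExecutorService.close() would.
        MoreExecutors.shutdownAndAwaitTermination(uploadExecutor, 10, TimeUnit.SECONDS);
    }
}
```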

Member

Yeah, close() was added for structured concurrency with virtual threads. The behavior makes sense for that use case, but not for server shutdown.

Contributor Author

Good catch. GcsFileSystemFactory also uses an executor service, so I followed its existing shutdown pattern: executorService.shutdownNow(), which interrupts each thread and returns without waiting for termination.

Member

@erichwang good point. By the way, this applies to quite a few existing thread pools. We will need static code analysis rules to cope with that.

> we should use MoreExecutors.shutdownAndAwaitTermination()

For connector/server shutdown, we should use shutdownNow() without delay, so shutdownAndAwaitTermination isn't exactly perfect.

@grantatspothero grantatspothero force-pushed the gn/nativeS3FileSystemUploadThreads branch from 498de32 to 59317d6 Compare May 31, 2024 14:55
@grantatspothero (Contributor Author) commented May 31, 2024

Potentially related failure: https://github.com/trinodb/trino/actions/runs/9320114775/job/25656271154?pr=22209

Error:  Tests run: 447, Failures: 0, Errors: 1, Skipped: 7, Time elapsed: 688.6 s <<< FAILURE! -- in io.trino.plugin.iceberg.TestIcebergMinioOrcConnectorTest
Error:  io.trino.plugin.iceberg.TestIcebergMinioOrcConnectorTest.testOptimizeTimePartitionedTable -- Time elapsed: 300.4 s <<< ERROR!
io.trino.testing.QueryFailedException: Failed to close manifest writer
	at io.trino.testing.AbstractTestingTrinoClient.execute(AbstractTestingTrinoClient.java:133)
	at io.trino.testing.DistributedQueryRunner.executeInternal(DistributedQueryRunner.java:554)
	at io.trino.testing.DistributedQueryRunner.executeWithPlan(DistributedQueryRunner.java:543)
	at io.trino.testing.QueryAssertions.assertDistributedUpdate(QueryAssertions.java:108)
	at io.trino.testing.QueryAssertions.assertUpdate(QueryAssertions.java:62)
	at io.trino.testing.AbstractTestQueryFramework.assertUpdate(AbstractTestQueryFramework.java:410)
	at io.trino.plugin.iceberg.BaseIcebergConnectorTest.testOptimizeTimePartitionedTable(BaseIcebergConnectorTest.java:5163)
	at io.trino.plugin.iceberg.BaseIcebergConnectorTest.testOptimizeTimePartitionedTable(BaseIcebergConnectorTest.java:5122)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at java.base/java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:194)
	at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:507)
	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1491)
	at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:2073)
	at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:2035)
	at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:187)
	Suppressed: java.lang.Exception: SQL: ALTER TABLE test_optimize_time_partitioned_timestamp6withtimezone_days EXECUTE optimize WHERE p >= DATE '2022-04-01'
		at io.trino.testing.DistributedQueryRunner.executeInternal(DistributedQueryRunner.java:561)
		... 13 more
Caused by: org.apache.iceberg.exceptions.RuntimeIOException: Failed to close manifest writer
	at org.apache.iceberg.ManifestFilterManager.filterManifestWithDeletedFiles(ManifestFilterManager.java:478)
	at org.apache.iceberg.ManifestFilterManager.filterManifest(ManifestFilterManager.java:319)
	at org.apache.iceberg.ManifestFilterManager.lambda$filterManifests$0(ManifestFilterManager.java:195)
	at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:413)
	at org.apache.iceberg.util.Tasks$Builder.access$300(Tasks.java:69)
	at org.apache.iceberg.util.Tasks$Builder$1.run(Tasks.java:315)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1570)
Caused by: java.io.IOException: Put failed for bucket [test-iceberg-orc-es96ln7l13] key [iceberg_data/tpch/test_optimize_time_partitioned_timestamp6withtimezone_days-e632d8e17b88486397d835c82bb43520/metadata/74e5fb09-cb29-46fe-880b-ebab49f00d1d-m2.avro]: software.amazon.awssdk.core.exception.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool
	at io.trino.filesystem.s3.S3OutputStream.flushBuffer(S3OutputStream.java:227)
	at io.trino.filesystem.s3.S3OutputStream.close(S3OutputStream.java:158)
	at com.google.common.io.CountingOutputStream.close(CountingOutputStream.java:69)
	at io.trino.plugin.iceberg.fileio.ForwardingOutputFile$CountingPositionOutputStream.close(ForwardingOutputFile.java:120)
	at java.base/java.io.FilterOutputStream.close(FilterOutputStream.java:190)
	at java.base/java.io.FilterOutputStream.close(FilterOutputStream.java:190)
	at org.apache.avro.file.DataFileWriter.close(DataFileWriter.java:461)
	at org.apache.iceberg.avro.AvroFileAppender.close(AvroFileAppender.java:94)
	at org.apache.iceberg.ManifestWriter.close(ManifestWriter.java:213)
	at org.apache.iceberg.ManifestFilterManager.filterManifestWithDeletedFiles(ManifestFilterManager.java:465)
	... 10 more
Caused by: software.amazon.awssdk.core.exception.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool
	at software.amazon.awssdk.core.exception.SdkClientException$BuilderImpl.build(SdkClientException.java:111)
	at software.amazon.awssdk.core.exception.SdkClientException.create(SdkClientException.java:47)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper.setLastException(RetryableStageHelper.java:223)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:83)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:36)
	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
	at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:56)
	at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:36)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.executeWithTimer(ApiCallTimeoutTrackingStage.java:80)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:60)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:42)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:50)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:32)
	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:37)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:26)
	at software.amazon.awssdk.core.internal.http.AmazonSyncHttpClient$RequestExecutionBuilderImpl.execute(AmazonSyncHttpClient.java:224)
	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.invoke(BaseSyncClientHandler.java:103)
	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.doExecute(BaseSyncClientHandler.java:173)
	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.lambda$execute$1(BaseSyncClientHandler.java:80)
	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.measureApiCallSuccess(BaseSyncClientHandler.java:182)
	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.execute(BaseSyncClientHandler.java:74)
	at software.amazon.awssdk.core.client.handler.SdkSyncClientHandler.execute(SdkSyncClientHandler.java:45)
	at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler.execute(AwsSyncClientHandler.java:53)
	at software.amazon.awssdk.services.s3.DefaultS3Client.putObject(DefaultS3Client.java:10220)
	at io.trino.filesystem.s3.S3OutputStream.flushBuffer(S3OutputStream.java:222)
	... 19 more
	Suppressed: software.amazon.awssdk.core.exception.SdkClientException: Request attempt 1 failure: Unable to execute HTTP request: Timeout waiting for connection from pool
	Suppressed: software.amazon.awssdk.core.exception.SdkClientException: Request attempt 2 failure: Unable to execute HTTP request: Timeout waiting for connection from pool
	Suppressed: software.amazon.awssdk.core.exception.SdkClientException: Request attempt 3 failure: Unable to execute HTTP request: Timeout waiting for connection from pool
	Suppressed: software.amazon.awssdk.core.exception.SdkClientException: Request attempt 4 failure: Unable to execute HTTP request: Timeout waiting for connection from pool
	Suppressed: software.amazon.awssdk.core.exception.SdkClientException: Request attempt 5 failure: Unable to execute HTTP request: Timeout waiting for connection from pool
	Suppressed: software.amazon.awssdk.core.exception.SdkClientException: Request attempt 6 failure: Unable to execute HTTP request: Timeout waiting for connection from pool
	Suppressed: software.amazon.awssdk.core.exception.SdkClientException: Request attempt 7 failure: Unable to execute HTTP request: Timeout waiting for connection from pool
	Suppressed: software.amazon.awssdk.core.exception.SdkClientException: Request attempt 8 failure: Unable to execute HTTP request: Timeout waiting for connection from pool
	Suppressed: software.amazon.awssdk.core.exception.SdkClientException: Request attempt 9 failure: Unable to execute HTTP request: Timeout waiting for connection from pool
	Suppressed: software.amazon.awssdk.core.exception.SdkClientException: Request attempt 10 failure: Unable to execute HTTP request: Timeout waiting for connection from pool
Caused by: org.apache.http.conn.ConnectionPoolTimeoutException: Timeout waiting for connection from pool
	at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.leaseConnection(PoolingHttpClientConnectionManager.java:316)
	at org.apache.http.impl.conn.PoolingHttpClientConnectionManager$1.get(PoolingHttpClientConnectionManager.java:282)
	at software.amazon.awssdk.http.apache.internal.conn.ClientConnectionRequestFactory$DelegatingConnectionRequest.get(ClientConnectionRequestFactory.java:92)
	at software.amazon.awssdk.http.apache.internal.conn.ClientConnectionRequestFactory$InstrumentedConnectionRequest.get(ClientConnectionRequestFactory.java:69)
	at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:190)
	at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
	at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
	at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
	at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
	at software.amazon.awssdk.http.apache.internal.impl.ApacheSdkHttpClient.execute(ApacheSdkHttpClient.java:72)
	at software.amazon.awssdk.http.apache.ApacheHttpClient.execute(ApacheHttpClient.java:254)
	at software.amazon.awssdk.http.apache.ApacheHttpClient.access$500(ApacheHttpClient.java:104)
	at software.amazon.awssdk.http.apache.ApacheHttpClient$1.call(ApacheHttpClient.java:231)
	at software.amazon.awssdk.http.apache.ApacheHttpClient$1.call(ApacheHttpClient.java:228)
	at software.amazon.awssdk.core.internal.util.MetricUtils.measureDurationUnsafe(MetricUtils.java:99)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.executeHttpRequest(MakeHttpRequestStage.java:79)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.execute(MakeHttpRequestStage.java:57)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.execute(MakeHttpRequestStage.java:40)
	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:72)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:42)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:78)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:40)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:55)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:39)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:81)
	... 42 more

@grantatspothero (Contributor Author) commented May 31, 2024

The S3 client is shared within the S3FileSystemFactory.

By allowing more concurrency with a cached thread pool (instead of the ForkJoin common pool), we could be exhausting the S3 client's max HTTP connections more easily, leading to the above error. I'm not sure this is actually the cause of that test failure, though.

The legacy TrinoS3FileSystem had a default max connections of 500: https://github.com/trinodb/trino/blob/master/lib/trino-hdfs/src/main/java/io/trino/hdfs/s3/HiveS3Config.java#L65

Currently, S3FileSystemConfig's s3.max-connections defaults to null, which falls back to the S3 client HTTP connection pool's default of 50:
https://github.com/aws/aws-sdk-java-v2/blob/master/http-client-spi/src/main/java/software/amazon/awssdk/http/SdkHttpConfigurationOption.java#L145

I think we should increase the default max connections to 500. WDYT @electrum?
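For reference, a minimal sketch (hypothetical wiring, not the actual S3FileSystemFactory code) of where that pool size lands when building the AWS SDK v2 sync client:

```java
import software.amazon.awssdk.http.apache.ApacheHttpClient;
import software.amazon.awssdk.services.s3.S3Client;

class ConnectionPoolSketch
{
    static S3Client buildClient(int maxConnections)
    {
        // maxConnections bounds the Apache HTTP connection pool shared by all requests
        // on this client; when every connection is leased by concurrent PutObject or
        // UploadPart calls, further requests wait and can eventually fail with
        // "Timeout waiting for connection from pool".
        return S3Client.builder()
                .httpClientBuilder(ApacheHttpClient.builder()
                        .maxConnections(maxConnections))
                .build();
    }
}
```

In Trino the value comes from the s3.max-connections catalog property; when it is unset, the SDK default of 50 applies.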

@electrum (Member)

The test failure in TestIcebergMinioOrcConnectorTest is due to the test setting s3.max-connections=2, which verifies that there are no connection leaks. Some parallelism combined with scheduling likely causes the timeout.

@electrum (Member)

Increasing the max connections sounds good to me.

@grantatspothero grantatspothero force-pushed the gn/nativeS3FileSystemUploadThreads branch 2 times, most recently from 776087c to 2e2a125 Compare May 31, 2024 19:00
Previously used the ForkJoin common pool, which is meant for CPU-bound operations.
Aligns the native S3FileSystem with the legacy TrinoS3FileSystem.
@grantatspothero grantatspothero force-pushed the gn/nativeS3FileSystemUploadThreads branch from 2e2a125 to c7eb64e Compare May 31, 2024 20:51
@erichwang (Contributor) left a comment


this seems fine to me

@@ -120,12 +124,15 @@ public S3FileSystemFactory(OpenTelemetry openTelemetry, S3FileSystemConfig confi
config.getSseKmsKeyId(),
Optional.empty(),
config.getCannedAcl());

this.uploadExecutor = newCachedThreadPool(daemonThreadsNamed("s3-upload-%s"));
Contributor

Since there are no constructor-provided args, you can inline this into the field declaration.
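A sketch of the inlined form being suggested, assuming static imports of Executors.newCachedThreadPool and Airlift's Threads.daemonThreadsNamed (the wrapper class is hypothetical):

```java
import static io.airlift.concurrent.Threads.daemonThreadsNamed;
import static java.util.concurrent.Executors.newCachedThreadPool;

import java.util.concurrent.ExecutorService;

class InlineFieldSketch
{
    // Initialize at the field declaration instead of in the constructor, since the
    // executor does not depend on any constructor-provided arguments.
    private final ExecutorService uploadExecutor = newCachedThreadPool(daemonThreadsNamed("s3-upload-%s"));
}
```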

@dain merged commit 3fe9a30 into trinodb:master Jun 12, 2024
59 checks passed
@github-actions bot added this to the 450 milestone Jun 12, 2024
@@ -90,7 +90,7 @@ public static software.amazon.awssdk.core.retry.RetryMode getRetryMode(RetryMode
private String sseKmsKeyId;
private DataSize streamingPartSize = DataSize.of(16, MEGABYTE);
private boolean requesterPays;
private Integer maxConnections;
private Integer maxConnections = 500;
Member

Let's have a comment explaining why this number. (I guess the reason is more than just "it's what we used to have in the legacy implementation"; otherwise we wouldn't make this change -- there are other things we used to have that we chose not to carry over.)
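One possible wording of such a comment on the field shown in the diff above, as a sketch only (the final wording is up to the author):

```java
// 500 matches the legacy TrinoS3FileSystem default and gives concurrent multipart
// uploads enough headroom to avoid "Timeout waiting for connection from pool" errors.
private Integer maxConnections = 500;
```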
