Support S3 Select on native readers #17522

Closed
12 changes: 8 additions & 4 deletions docs/src/main/sphinx/connector/hive-s3.rst
@@ -380,9 +380,9 @@ workload:
* Your query filters out more than half of the original data set.
* Your query filter predicates use columns that have a data type supported by
Trino and S3 Select.
The ``TIMESTAMP``, ``REAL``, and ``DOUBLE`` data types are not supported by S3
Select Pushdown. We recommend using the DECIMAL data type for numerical data.
For more information about supported data types for S3 Select, see the
The ``TIMESTAMP``, ``DECIMAL``, ``REAL``, and ``DOUBLE`` data types are not
supported by S3 Select Pushdown. For more information about supported data
types for S3 Select, see the
`Data Types documentation <https://docs.aws.amazon.com/AmazonS3/latest/dev/s3-glacier-select-sql-reference-data-types.html>`_.
* Your network connection between Amazon S3 and the Amazon EMR cluster has good
transfer speed and available bandwidth. Amazon S3 Select does not compress
@@ -391,7 +391,7 @@
Considerations and limitations
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

* Only objects stored in CSV and JSON format are supported. Objects can be uncompressed,
* Only objects stored in JSON format are supported. Objects can be uncompressed,
or optionally compressed with gzip or bzip2.
* The "AllowQuotedRecordDelimiters" property is not supported. If this property
is specified, the query fails.
@@ -416,6 +416,10 @@
pushed down to S3 Select. Changes in the Hive connector :ref:`performance tuning
configuration properties <hive-performance-tuning-configuration>` are likely to impact
S3 Select pushdown performance.

S3 Select can be enabled for TEXTFILE data using the
``hive.s3select-pushdown.experimental-textfile-pushdown-enabled`` configuration
property; however, this has been shown to produce incorrect results. For more
information, see the `GitHub issue <https://github.com/trinodb/trino/issues/17775>`_.

Understanding and tuning the maximum connections
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
5 changes: 4 additions & 1 deletion docs/src/main/sphinx/connector/hive.rst
@@ -232,7 +232,10 @@ Hive connector documentation.
`Table Statistics <#table-statistics>`__ for details.
- ``true``
* - ``hive.s3select-pushdown.enabled``
- Enable query pushdown to AWS S3 Select service.
- Enable query pushdown to JSON files using the AWS S3 Select service.
- ``false``
* - ``hive.s3select-pushdown.experimental-textfile-pushdown-enabled``
- Enable query pushdown to TEXTFILE tables using the AWS S3 Select service.
- ``false``
* - ``hive.s3select-pushdown.max-connections``
- Maximum number of simultaneously open connections to S3 for
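The two pushdown properties documented above live in the Hive catalog configuration. As an illustrative sketch only (assuming a catalog file named ``hive.properties``; the metastore URI is a placeholder):

```properties
# etc/catalog/hive.properties (illustrative)
connector.name=hive
hive.metastore.uri=thrift://example.net:9083

# Push eligible predicates on JSON objects down to S3 Select:
hive.s3select-pushdown.enabled=true

# Experimental; the docs change in this PR warns it can produce
# incorrect results, so it is left commented out here:
# hive.s3select-pushdown.experimental-textfile-pushdown-enabled=true
```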
5 changes: 5 additions & 0 deletions lib/trino-filesystem-s3/pom.xml
@@ -73,6 +73,11 @@
</exclusions>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>aws-crt-client</artifactId>
Review comment (Member):

We don't want to use AWS CRT since it's in C and thus goes through JNI

</dependency>
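The review comment above objects to the CRT-based client because it crosses the JNI boundary. The usual pure-Java alternative in AWS SDK v2 is the Netty-based async HTTP client — a hypothetical replacement dependency (not what this PR ships):

```xml
<!-- Hypothetical alternative to aws-crt-client: the pure-Java
     Netty async HTTP client, avoiding the JNI boundary the
     reviewer flags. Version is managed by the AWS SDK v2 BOM. -->
<dependency>
    <groupId>software.amazon.awssdk</groupId>
    <artifactId>netty-nio-client</artifactId>
</dependency>
```

Swapping it in would also mean replacing `AwsCrtAsyncHttpClient.builder()` with `NettyNioAsyncHttpClient.builder()` in the factory below.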

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>auth</artifactId>
@@ -21,12 +21,15 @@
import io.trino.filesystem.TrinoInputFile;
import io.trino.filesystem.TrinoOutputFile;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse;
import software.amazon.awssdk.services.s3.model.InputSerialization;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import software.amazon.awssdk.services.s3.model.OutputSerialization;
import software.amazon.awssdk.services.s3.model.RequestPayer;
import software.amazon.awssdk.services.s3.model.S3Error;
import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Iterable;
@@ -44,16 +47,18 @@
import static com.google.common.collect.Multimaps.toMultimap;
import static java.util.Objects.requireNonNull;

final class S3FileSystem
public final class S3FileSystem
Review comment (Member):

Need to make the constructor package private with this change

implements TrinoFileSystem
{
private final S3Client client;
private final S3AsyncClient asyncClient;
private final S3Context context;
private final RequestPayer requestPayer;

public S3FileSystem(S3Client client, S3Context context)
public S3FileSystem(S3Client client, S3AsyncClient asyncClient, S3Context context)
{
this.client = requireNonNull(client, "client is null");
this.asyncClient = requireNonNull(asyncClient, "asyncClient is null");
this.context = requireNonNull(context, "context is null");
this.requestPayer = context.requestPayer();
}
@@ -70,6 +75,11 @@ public TrinoInputFile newInputFile(Location location, long length)
return new S3InputFile(client, context, new S3Location(location), length);
}

public TrinoInputFile newS3SelectInputFile(Location location, String query, boolean enableScanRange, InputSerialization inputSerialization, OutputSerialization outputSerialization)
{
return new S3SelectInputFile(client, asyncClient, context, new S3Location(location), query, enableScanRange, inputSerialization, outputSerialization);
}
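The `query` argument to `newS3SelectInputFile` is an S3 Select SQL string of the form `SELECT ... FROM S3Object s ...`. As a self-contained illustration of that query shape, here is a hypothetical helper — not part of this PR or of Trino — that assembles such a string from a projection list and an optional filter expression:

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper (illustration only): builds an S3 Select SQL
// query string like the one passed to newS3SelectInputFile above.
final class S3SelectQueryBuilder
{
    private S3SelectQueryBuilder() {}

    static String buildQuery(List<String> columns, String filter)
    {
        // Project either all columns or the named ones, through the "s" alias.
        String projection = columns.isEmpty()
                ? "*"
                : columns.stream().map(c -> "s." + c).collect(Collectors.joining(", "));
        StringBuilder query = new StringBuilder("SELECT ").append(projection).append(" FROM S3Object s");
        if (filter != null && !filter.isBlank()) {
            query.append(" WHERE ").append(filter);
        }
        return query.toString();
    }
}
```

For example, `buildQuery(List.of("name", "age"), "s.age > 30")` yields `SELECT s.name, s.age FROM S3Object s WHERE s.age > 30`.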

@Override
public TrinoOutputFile newOutputFile(Location location)
{
@@ -20,9 +20,15 @@
import jakarta.annotation.PreDestroy;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.http.SdkHttpClient;
import software.amazon.awssdk.http.apache.ApacheHttpClient;
import software.amazon.awssdk.http.apache.ProxyConfiguration;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import software.amazon.awssdk.http.crt.AwsCrtAsyncHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
import software.amazon.awssdk.services.s3.S3BaseClientBuilder;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3ClientBuilder;
import software.amazon.awssdk.services.sts.StsClient;
@@ -38,13 +44,33 @@ public final class S3FileSystemFactory
implements TrinoFileSystemFactory
{
private final S3Client client;
private final S3AsyncClient asyncClient;
private final S3Context context;

@Inject
public S3FileSystemFactory(S3FileSystemConfig config)
{
S3ClientBuilder s3 = S3Client.builder();
applyS3Properties(s3, config);
s3.httpClient(buildHttpClient(config));

S3AsyncClientBuilder asyncS3 = S3AsyncClient.builder();
applyS3Properties(asyncS3, config);
asyncS3.httpClient(buildAsyncHttpClient(config));

this.client = s3.build();
this.asyncClient = asyncS3.build();

context = new S3Context(
toIntExact(config.getStreamingPartSize().toBytes()),
config.isRequesterPays(),
config.getSseType(),
config.getSseKmsKeyId());

Review comment (Member):

Remove trailing blank line

}

private static void applyS3Properties(S3BaseClientBuilder<?, ?> s3, S3FileSystemConfig config)
{
if ((config.getAwsAccessKey() != null) && (config.getAwsSecretKey() != null)) {
s3.credentialsProvider(StaticCredentialsProvider.create(
AwsBasicCredentials.create(config.getAwsAccessKey(), config.getAwsSecretKey())));
@@ -70,7 +96,10 @@ public S3FileSystemFactory(S3FileSystemConfig config)
.asyncCredentialUpdateEnabled(true)
.build());
}
}

private static SdkHttpClient buildHttpClient(S3FileSystemConfig config)
{
ApacheHttpClient.Builder httpClient = ApacheHttpClient.builder()
.maxConnections(config.getMaxConnections());

@@ -83,26 +112,34 @@ public S3FileSystemFactory(S3FileSystemConfig config)
.build());
}

s3.httpClientBuilder(httpClient);
return httpClient.build();
}

this.client = s3.build();
private static SdkAsyncHttpClient buildAsyncHttpClient(S3FileSystemConfig config)
{
AwsCrtAsyncHttpClient.Builder httpClient = AwsCrtAsyncHttpClient.builder();
if (config.getHttpProxy() != null) {
String scheme = config.isHttpProxySecure() ? "https" : "http";
httpClient.proxyConfiguration(software.amazon.awssdk.http.crt.ProxyConfiguration.builder()
.scheme(scheme)
.host(config.getHttpProxy().getHost())
.port(config.getHttpProxy().getPort())
.build());
}

context = new S3Context(
toIntExact(config.getStreamingPartSize().toBytes()),
config.isRequesterPays(),
config.getSseType(),
config.getSseKmsKeyId());
return httpClient.build();
}

@PreDestroy
public void destroy()
{
client.close();
asyncClient.close();
}

@Override
public TrinoFileSystem create(ConnectorIdentity identity)
{
return new S3FileSystem(client, context);
return new S3FileSystem(client, asyncClient, context);
}
}