diff --git a/docs/src/main/sphinx/connector/hive-s3.rst b/docs/src/main/sphinx/connector/hive-s3.rst index 5c1cdcec8281..028235f27b29 100644 --- a/docs/src/main/sphinx/connector/hive-s3.rst +++ b/docs/src/main/sphinx/connector/hive-s3.rst @@ -380,9 +380,9 @@ workload: * Your query filters out more than half of the original data set. * Your query filter predicates use columns that have a data type supported by Trino and S3 Select. - The ``TIMESTAMP``, ``REAL``, and ``DOUBLE`` data types are not supported by S3 - Select Pushdown. We recommend using the DECIMAL data type for numerical data. - For more information about supported data types for S3 Select, see the + The ``TIMESTAMP``, ``DECIMAL``, ``REAL``, and ``DOUBLE`` data types are not + supported by S3 Select Pushdown. For more information about supported data + types for S3 Select, see the `Data Types documentation `_. * Your network connection between Amazon S3 and the Amazon EMR cluster has good transfer speed and available bandwidth. Amazon S3 Select does not compress @@ -391,7 +391,7 @@ workload: Considerations and limitations ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -* Only objects stored in CSV and JSON format are supported. Objects can be uncompressed, +* Only objects stored in JSON format are supported. Objects can be uncompressed, or optionally compressed with gzip or bzip2. * The "AllowQuotedRecordDelimiters" property is not supported. If this property is specified, the query fails. @@ -416,6 +416,10 @@ pushed down to S3 Select. Changes in the Hive connector :ref:`performance tuning configuration properties ` are likely to impact S3 Select pushdown performance. +S3 Select can be enabled for TEXTFILE data using the +``hive.s3select-pushdown.experimental-textfile-pushdown-enabled`` configuration property; +however, this has been shown to produce incorrect results. For more information, see +`the GitHub issue `_. Understanding and tuning the maximum connections ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/docs/src/main/sphinx/connector/hive.rst b/docs/src/main/sphinx/connector/hive.rst index 6611185f8af5..78976414b509 100644 --- a/docs/src/main/sphinx/connector/hive.rst +++ b/docs/src/main/sphinx/connector/hive.rst @@ -232,7 +232,10 @@ Hive connector documentation. `Table Statistics <#table-statistics>`__ for details. - ``true`` * - ``hive.s3select-pushdown.enabled`` - - Enable query pushdown to AWS S3 Select service. + - Enable query pushdown to JSON files using the AWS S3 Select service. + - ``false`` + * - ``hive.s3select-pushdown.experimental-textfile-pushdown-enabled`` + - Enable query pushdown to TEXTFILE tables using the AWS S3 Select service. + - ``false``
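Taken together, the documentation changes above describe two catalog properties. A minimal sketch of a catalog file that opts into both behaviors (the metastore URI is a placeholder; only the two ``s3select-pushdown`` properties come from this change):

```properties
# etc/catalog/hive.properties (illustrative)
connector.name=hive
hive.metastore.uri=thrift://example.net:9083

# push filter predicates on JSON files down to the S3 Select service
hive.s3select-pushdown.enabled=true

# experimental TEXTFILE pushdown; documented above as known to
# produce incorrect results, hence disabled by default
hive.s3select-pushdown.experimental-textfile-pushdown-enabled=true
```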
* - ``hive.s3select-pushdown.max-connections`` - Maximum number of simultaneously open connections to S3 for diff --git a/lib/trino-filesystem-s3/pom.xml b/lib/trino-filesystem-s3/pom.xml index 4240c9b2b16d..d7129e8a9dc1 100644 --- a/lib/trino-filesystem-s3/pom.xml +++ b/lib/trino-filesystem-s3/pom.xml @@ -73,6 +73,11 @@ + <dependency> + <groupId>software.amazon.awssdk</groupId> + <artifactId>aws-crt-client</artifactId> + </dependency> + <dependency> <groupId>software.amazon.awssdk</groupId> <artifactId>auth</artifactId> diff --git a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystem.java b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystem.java index 48a74456ea4a..50ee92f9f54a 100644 --- a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystem.java +++ b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystem.java @@ -21,12 +21,15 @@ import io.trino.filesystem.TrinoInputFile; import io.trino.filesystem.TrinoOutputFile; import software.amazon.awssdk.core.exception.SdkException; +import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest; import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse; +import software.amazon.awssdk.services.s3.model.InputSerialization; import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; import software.amazon.awssdk.services.s3.model.ObjectIdentifier; +import software.amazon.awssdk.services.s3.model.OutputSerialization; import software.amazon.awssdk.services.s3.model.RequestPayer; import software.amazon.awssdk.services.s3.model.S3Error; import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Iterable; @@ -44,16 +47,18 @@ import static com.google.common.collect.Multimaps.toMultimap; import static java.util.Objects.requireNonNull; -final class S3FileSystem +public final class S3FileSystem implements TrinoFileSystem { private final S3Client client; + private final S3AsyncClient asyncClient; private final S3Context context; private final RequestPayer requestPayer; - public S3FileSystem(S3Client client, S3Context context) + public S3FileSystem(S3Client client, S3AsyncClient asyncClient, S3Context context) { this.client = requireNonNull(client, "client is null"); + this.asyncClient = requireNonNull(asyncClient, "asyncClient is null"); this.context = requireNonNull(context, "context is null"); this.requestPayer = context.requestPayer(); } @@ -70,6 +75,11 @@ public TrinoInputFile newInputFile(Location location, long length) return new S3InputFile(client, context, new S3Location(location), length); } + public TrinoInputFile newS3SelectInputFile(Location location, String query, boolean enableScanRange, InputSerialization inputSerialization, OutputSerialization outputSerialization) + { + return new S3SelectInputFile(client, asyncClient, context, new S3Location(location), query, enableScanRange, inputSerialization, outputSerialization); + } + @Override public TrinoOutputFile newOutputFile(Location location) { diff --git a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystemFactory.java b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystemFactory.java index 065ede41c99f..1b850b6cbc96 100644 --- a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystemFactory.java +++ b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystemFactory.java @@ -20,9 +20,15 @@ import jakarta.annotation.PreDestroy; import 
software.amazon.awssdk.auth.credentials.AwsBasicCredentials; import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.http.SdkHttpClient; import software.amazon.awssdk.http.apache.ApacheHttpClient; import software.amazon.awssdk.http.apache.ProxyConfiguration; +import software.amazon.awssdk.http.async.SdkAsyncHttpClient; +import software.amazon.awssdk.http.crt.AwsCrtAsyncHttpClient; import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.S3AsyncClientBuilder; +import software.amazon.awssdk.services.s3.S3BaseClientBuilder; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.S3ClientBuilder; import software.amazon.awssdk.services.sts.StsClient; @@ -38,13 +44,33 @@ public final class S3FileSystemFactory implements TrinoFileSystemFactory { private final S3Client client; + private final S3AsyncClient asyncClient; private final S3Context context; @Inject public S3FileSystemFactory(S3FileSystemConfig config) { S3ClientBuilder s3 = S3Client.builder(); + applyS3Properties(s3, config); + s3.httpClient(buildHttpClient(config)); + S3AsyncClientBuilder asyncS3 = S3AsyncClient.builder(); + applyS3Properties(asyncS3, config); + asyncS3.httpClient(buildAsyncHttpClient(config)); + + this.client = s3.build(); + this.asyncClient = asyncS3.build(); + + context = new S3Context( + toIntExact(config.getStreamingPartSize().toBytes()), + config.isRequesterPays(), + config.getSseType(), + config.getSseKmsKeyId()); + } + + private static void applyS3Properties(S3BaseClientBuilder<?, ?> s3, S3FileSystemConfig config) + { if ((config.getAwsAccessKey() != null) && (config.getAwsSecretKey() != null)) { s3.credentialsProvider(StaticCredentialsProvider.create( AwsBasicCredentials.create(config.getAwsAccessKey(), config.getAwsSecretKey()))); @@ -70,7 +96,10 @@ public S3FileSystemFactory(S3FileSystemConfig config) .asyncCredentialUpdateEnabled(true) .build()); } + } + private static SdkHttpClient buildHttpClient(S3FileSystemConfig config) + { ApacheHttpClient.Builder httpClient = ApacheHttpClient.builder() .maxConnections(config.getMaxConnections()); @@ -83,26 +112,34 @@ public S3FileSystemFactory(S3FileSystemConfig config) .build()); } - s3.httpClientBuilder(httpClient); + return httpClient.build(); + } - this.client = s3.build(); + private static SdkAsyncHttpClient buildAsyncHttpClient(S3FileSystemConfig config) + { + AwsCrtAsyncHttpClient.Builder httpClient = AwsCrtAsyncHttpClient.builder(); + if (config.getHttpProxy() != null) { + String scheme = config.isHttpProxySecure() ? 
"https" : "http"; + httpClient.proxyConfiguration(software.amazon.awssdk.http.crt.ProxyConfiguration.builder() + .scheme(scheme) + .host(config.getHttpProxy().getHost()) + .port(config.getHttpProxy().getPort()) + .build()); + } - context = new S3Context( - toIntExact(config.getStreamingPartSize().toBytes()), - config.isRequesterPays(), - config.getSseType(), - config.getSseKmsKeyId()); + return httpClient.build(); } @PreDestroy public void destroy() { client.close(); + asyncClient.close(); } @Override public TrinoFileSystem create(ConnectorIdentity identity) { - return new S3FileSystem(client, context); + return new S3FileSystem(client, asyncClient, context); } } diff --git a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3SelectInput.java b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3SelectInput.java new file mode 100644 index 000000000000..2f53b29e418f --- /dev/null +++ b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3SelectInput.java @@ -0,0 +1,269 @@ +package io.trino.filesystem.s3; + +import io.trino.filesystem.Location; +import io.trino.filesystem.TrinoInput; +import io.trino.spi.StandardErrorCode; +import io.trino.spi.TrinoException; +import software.amazon.awssdk.core.exception.AbortedException; +import software.amazon.awssdk.core.exception.SdkException; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.model.EndEvent; +import software.amazon.awssdk.services.s3.model.NoSuchKeyException; +import software.amazon.awssdk.services.s3.model.RecordsEvent; +import software.amazon.awssdk.services.s3.model.ScanRange; +import software.amazon.awssdk.services.s3.model.SelectObjectContentRequest; +import software.amazon.awssdk.services.s3.model.SelectObjectContentResponseHandler; + +import java.io.ByteArrayInputStream; +import java.io.EOFException; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.io.InterruptedIOException; +import java.io.SequenceInputStream; +import java.util.ArrayDeque; +import java.util.Enumeration; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Optional; +import java.util.Queue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import static java.util.Objects.checkFromIndexSize; +import static java.util.Objects.requireNonNull; + +public class S3SelectInput + implements TrinoInput +{ + private final Location location; + private final S3AsyncClient client; + private final SelectObjectContentRequest request; + private boolean closed; + + public S3SelectInput(Location location, S3AsyncClient client, SelectObjectContentRequest request) + { + this.location = requireNonNull(location, "location is null"); + this.client = requireNonNull(client, "client is null"); + this.request = requireNonNull(request, "request is null"); + } + + @Override + public void readFully(long position, byte[] buffer, int offset, int length) + throws IOException + { + ensureOpen(); + if (position < 0) { + throw new IOException("Negative seek offset"); + } + checkFromIndexSize(offset, length, buffer.length); + if (length == 0) { + return; + } + + ScanRange scanRange = ScanRange.builder() + .start(position) + .end(position + length - 1) + .build(); + SelectObjectContentRequest rangeRequest = request.toBuilder().scanRange(scanRange).build(); + + try (InputStream in = getObject(rangeRequest)) { + int n = readNBytes(in, buffer, offset, length); + if (n < length) { + 
throw new EOFException("Read %s of %s requested bytes: %s".formatted(n, length, location)); + } + } + } + + @Override + public int readTail(byte[] buffer, int offset, int length) + throws IOException + { + ensureOpen(); + checkFromIndexSize(offset, length, buffer.length); + if (length == 0) { + return 0; + } + + ScanRange scanRange = ScanRange.builder() + .end((long) length) + .build(); + SelectObjectContentRequest rangeRequest = request.toBuilder().scanRange(scanRange).build(); + + try (InputStream in = getObject(rangeRequest)) { + return readNBytes(in, buffer, offset, length); + } + } + + @Override + public void close() + { + closed = true; + } + + private void ensureOpen() + throws IOException + { + if (closed) { + throw new IOException("Input closed: " + location); + } + } + + private InputStream getObject(SelectObjectContentRequest request) + throws IOException + { + try { + EventStreamEnumeration eventStreamEnumeration = new EventStreamEnumeration(); + InputStream recordInputStream = new SequenceInputStream(eventStreamEnumeration); + + SelectObjectContentResponseHandler.Visitor visitor = new SelectObjectContentResponseHandler.Visitor() + { + @Override + public void visitRecords(RecordsEvent event) + { + eventStreamEnumeration.addEvent(new S3Event(event, false)); + } + + @Override + public void visitEnd(EndEvent event) + { + eventStreamEnumeration.addEvent(new S3Event(null, true)); + } + }; + + client.selectObjectContent(request, SelectObjectContentResponseHandler.builder().subscriber(visitor).build()); + return recordInputStream; + } + catch (NoSuchKeyException e) { + throw new FileNotFoundException(location.toString()); + } + catch (SdkException e) { + throw new IOException("Failed to open S3 file: " + location, e); + } + } + + private static int readNBytes(InputStream in, byte[] buffer, int offset, int length) + throws IOException + { + try { + return in.readNBytes(buffer, offset, length); + } + catch (AbortedException e) { + throw new InterruptedIOException(); + } + } + + /** + * These classes adapt the AWS SDK v2 event-stream API to the blocking InputStream shape that the + * SDK v1 client used to expose, presenting all incoming record events as a single stream. + */ + static class S3Event + { + final RecordsEvent event; + final boolean isEndEvent; + + public S3Event(RecordsEvent event, boolean isEndEvent) + { + this.event = event; + this.isEndEvent = isEndEvent; + } + } + + private static class EventStreamEnumeration + extends LazyLoadedIterator<InputStream> implements Enumeration<InputStream> + { + private boolean initialized; + private final BlockingQueue<S3Event> inputStreams; + + EventStreamEnumeration() + { + this.inputStreams = new LinkedBlockingQueue<>(); + } + + @Override + protected Optional<InputStream> getNext() + throws InterruptedException + { + if (!initialized) { + initialized = true; + // SequenceInputStream consumes the first element of the enumeration in its constructor, + // so hand it an empty stream to avoid blocking before any events have arrived + return Optional.of(new ByteArrayInputStream(new byte[0])); + } + + S3Event s3Event = inputStreams.take(); + if (s3Event.isEndEvent) { + return Optional.empty(); + } + return Optional.of(s3Event.event.payload().asInputStream()); + } + + public void addEvent(S3Event event) + { + this.inputStreams.add(event); + } + + @Override + public boolean hasMoreElements() + { + return super.hasNext(); + } + + @Override + public InputStream nextElement() + { + return super.next(); + } + } + + private abstract static class LazyLoadedIterator<T> + implements Iterator<T> + { + private final Queue<T> next = new ArrayDeque<>(); + private boolean isDone; + + @Override + public boolean hasNext() + { + advanceIfNeeded(); + return !isDone; + } + + @Override + public T next() + { + advanceIfNeeded(); + + if (isDone) { 
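+ // advanceIfNeeded() found no further events, so the iterator is exhausted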
+ throw new NoSuchElementException(); + } + + return next.poll(); + } + + @Override + public void remove() + { + throw new UnsupportedOperationException(); + } + + private void advanceIfNeeded() + { + if (!isDone && next.isEmpty()) { + try { + Optional<T> nextElement = getNext(); + nextElement.ifPresent(this.next::add); + this.isDone = this.next.isEmpty(); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new TrinoException(StandardErrorCode.GENERIC_INTERNAL_ERROR, "Interrupted while reading S3 Select event stream", e); + } + } + } + + /** + * Load the next available element, blocking if necessary. Returns {@link Optional#empty()} + * when there are no remaining events in the stream. + */ + protected abstract Optional<T> getNext() + throws InterruptedException; + } +} diff --git a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3SelectInputFile.java b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3SelectInputFile.java new file mode 100644 index 000000000000..444935900eb1 --- /dev/null +++ b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3SelectInputFile.java @@ -0,0 +1,166 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.filesystem.s3; + +import io.trino.filesystem.Location; +import io.trino.filesystem.TrinoInput; +import io.trino.filesystem.TrinoInputFile; +import io.trino.filesystem.TrinoInputStream; +import software.amazon.awssdk.core.exception.SdkException; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.ExpressionType; +import software.amazon.awssdk.services.s3.model.HeadObjectRequest; +import software.amazon.awssdk.services.s3.model.HeadObjectResponse; +import software.amazon.awssdk.services.s3.model.InputSerialization; +import software.amazon.awssdk.services.s3.model.NoSuchKeyException; +import software.amazon.awssdk.services.s3.model.OutputSerialization; +import software.amazon.awssdk.services.s3.model.RequestPayer; +import software.amazon.awssdk.services.s3.model.SelectObjectContentRequest; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.time.Instant; + +import static java.util.Objects.requireNonNull; + +public class S3SelectInputFile + implements TrinoInputFile +{ + private final S3Client s3; + private final S3AsyncClient asyncS3; + private final String query; + private final S3Location location; + private final boolean enableScanRange; + private final InputSerialization inputSerialization; + private final OutputSerialization outputSerialization; + private Long length; + private Instant lastModified; + private final RequestPayer requestPayer; + + public S3SelectInputFile( + S3Client s3, + S3AsyncClient asyncS3, + S3Context context, + S3Location location, + String query, + boolean enableScanRange, + InputSerialization inputSerialization, + OutputSerialization outputSerialization) + { + 
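// the synchronous client serves metadata (HEAD) requests, while the asynchronous client streams SelectObjectContent responses +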
this.s3 = requireNonNull(s3, "s3 is null"); + this.asyncS3 = requireNonNull(asyncS3, "asyncS3 is null"); + this.query = requireNonNull(query, "query is null"); + this.location = requireNonNull(location, "location is null"); + location.location().verifyValidFileLocation(); + this.requestPayer = context.requestPayer(); + this.enableScanRange = enableScanRange; + this.inputSerialization = requireNonNull(inputSerialization, "inputSerialization is null"); + this.outputSerialization = requireNonNull(outputSerialization, "outputSerialization is null"); + } + + @Override + public TrinoInput newInput() + { + return new S3SelectInput(location(), asyncS3, buildSelectObjectContentRequest()); + } + + @Override + public TrinoInputStream newStream() + { + // a plain GetObject stream would bypass the pushed-down filter, so fail fast instead of returning null + throw new UnsupportedOperationException("S3 Select input files do not support streaming reads"); + } + + @Override + public long length() + throws IOException + { + if ((length == null) && !headObject()) { + throw new FileNotFoundException(location.toString()); + } + return length; + } + + @Override + public Instant lastModified() + throws IOException + { + if ((lastModified == null) && !headObject()) { + throw new FileNotFoundException(location.toString()); + } + return lastModified; + } + + @Override + public boolean exists() + throws IOException + { + return headObject(); + } + + @Override + public Location location() + { + return location.location(); + } + + private SelectObjectContentRequest buildSelectObjectContentRequest() + { + SelectObjectContentRequest.Builder selectObjectContentRequest = SelectObjectContentRequest.builder() + .bucket(location.bucket()) + .key(location.key()) + .expression(query) + .expressionType(ExpressionType.SQL) + .inputSerialization(inputSerialization) + .outputSerialization(outputSerialization); + + if (enableScanRange) { +// ScanRange scanRange = ScanRange.builder() +// .start(start) +// .end(start + length) +// .build(); +// selectObjectContentRequest.scanRange(scanRange); + } + + return selectObjectContentRequest.build(); + } + + private boolean headObject() + throws IOException + { + HeadObjectRequest request = HeadObjectRequest.builder() + .requestPayer(requestPayer) + .bucket(location.bucket()) + .key(location.key()) + .build(); + + try { + HeadObjectResponse response = s3.headObject(request); + if (length == null) { + length = response.contentLength(); + } + if (lastModified == null) { + lastModified = response.lastModified(); + } + return true; + } + catch (NoSuchKeyException e) { + return false; + } + catch (SdkException e) { + throw new IOException("S3 HEAD request failed for file: " + location, e); + } + } +} diff --git a/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/line/LineReaderFactory.java b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/line/LineReaderFactory.java index 38590fabefd6..f1352a6df59f 100644 --- a/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/line/LineReaderFactory.java +++ b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/line/LineReaderFactory.java @@ -14,8 +14,10 @@ package io.trino.hive.formats.line; import io.trino.filesystem.TrinoInputFile; +import io.trino.hive.formats.compression.Codec; import java.io.IOException; +import java.util.Optional; public interface LineReaderFactory { @@ -28,6 +30,7 @@ LineReader createLineReader( long start, long length, int headerCount, - int footerCount) + int footerCount, + Optional<Codec> codec) throws IOException; } diff --git 
a/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/line/sequence/SequenceFileReaderFactory.java b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/line/sequence/SequenceFileReaderFactory.java index 95b942adf41c..8c5c54b94111 100644 --- a/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/line/sequence/SequenceFileReaderFactory.java +++ b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/line/sequence/SequenceFileReaderFactory.java @@ -14,12 +14,14 @@ package io.trino.hive.formats.line.sequence; import io.trino.filesystem.TrinoInputFile; +import io.trino.hive.formats.compression.Codec; import io.trino.hive.formats.line.FooterAwareLineReader; import io.trino.hive.formats.line.LineBuffer; import io.trino.hive.formats.line.LineReader; import io.trino.hive.formats.line.LineReaderFactory; import java.io.IOException; +import java.util.Optional; public class SequenceFileReaderFactory implements LineReaderFactory @@ -51,7 +53,8 @@ public LineReader createLineReader( long start, long length, int headerCount, - int footerCount) + int footerCount, + Optional<Codec> codec) throws IOException { LineReader lineReader = new SequenceFileReader(inputFile, start, length); diff --git a/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/line/text/TextLineReaderFactory.java b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/line/text/TextLineReaderFactory.java index 9d14de9c4833..80950c31e1a0 100644 --- a/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/line/text/TextLineReaderFactory.java +++ b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/line/text/TextLineReaderFactory.java @@ -15,7 +15,6 @@ import io.trino.filesystem.TrinoInputFile; import io.trino.hive.formats.compression.Codec; -import io.trino.hive.formats.compression.CompressionKind; import io.trino.hive.formats.line.FooterAwareLineReader; import io.trino.hive.formats.line.LineBuffer; import io.trino.hive.formats.line.LineReader; @@ -60,13 +59,12 @@ public LineReader createLineReader( long start, long length, int headerCount, - int footerCount) + int footerCount, + Optional<Codec> codec) throws IOException { InputStream inputStream = inputFile.newStream(); try { - Optional<Codec> codec = CompressionKind.forFile(inputFile.location().fileName()) - .map(CompressionKind::createCodec); if (codec.isPresent()) { checkArgument(start == 0, "Compressed files are not splittable"); // for compressed input, we do not know the length of the uncompressed text diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveConfig.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveConfig.java index 626cfadde197..ab85c5c7b8fb 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveConfig.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveConfig.java @@ -134,6 +134,7 @@ public class HiveConfig private boolean collectColumnStatisticsOnWrite = true; private boolean s3SelectPushdownEnabled; + private boolean s3SelectExperimentalPushdownEnabled; private int s3SelectPushdownMaxConnections = 500; private boolean isTemporaryStagingDirectoryEnabled = true; @@ -1006,13 +1007,26 @@ public boolean isS3SelectPushdownEnabled() } @Config("hive.s3select-pushdown.enabled") - @ConfigDescription("Enable query pushdown to AWS S3 Select service") + @ConfigDescription("Enable query pushdown to JSON files using the AWS S3 Select service") public HiveConfig setS3SelectPushdownEnabled(boolean s3SelectPushdownEnabled) { this.s3SelectPushdownEnabled = s3SelectPushdownEnabled; return this; }
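The getter and setter that follow complete the usual airlift config-bean pattern for the new flag. A small illustrative sketch of how the flag flows (the wrapper class name is hypothetical; `HiveConfig` and its accessors are from this diff):

```java
import io.trino.plugin.hive.HiveConfig;

class S3SelectConfigExample
{
    static boolean textfilePushdownEnabled()
    {
        // build the config bean directly, as a unit test would; in a server,
        // airlift populates it from the catalog properties shown in the docs hunk
        HiveConfig config = new HiveConfig()
                .setS3SelectPushdownEnabled(true)
                .setS3SelectExperimentalPushdownEnabled(true);

        // components capture the flag at construction time, as
        // S3SelectRecordCursorProvider does later in this change
        return config.isS3SelectExperimentalPushdownEnabled();
    }
}
```

+ 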
public boolean isS3SelectExperimentalPushdownEnabled() + { + return s3SelectExperimentalPushdownEnabled; + } + + @Config("hive.s3select-pushdown.experimental-textfile-pushdown-enabled") + @ConfigDescription("Enable query pushdown to TEXTFILE tables using the AWS S3 Select service") + public HiveConfig setS3SelectExperimentalPushdownEnabled(boolean s3SelectExperimentalPushdownEnabled) + { + this.s3SelectExperimentalPushdownEnabled = s3SelectExperimentalPushdownEnabled; + return this; + } + @Min(1) public int getS3SelectPushdownMaxConnections() { diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveModule.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveModule.java index 511f5d9dc365..6d4f41249404 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveModule.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveModule.java @@ -51,6 +51,7 @@ import io.trino.plugin.hive.parquet.ParquetReaderConfig; import io.trino.plugin.hive.parquet.ParquetWriterConfig; import io.trino.plugin.hive.rcfile.RcFilePageSourceFactory; +import io.trino.plugin.hive.s3select.S3SelectPageSourceFactory; import io.trino.plugin.hive.s3select.S3SelectRecordCursorProvider; import io.trino.plugin.hive.s3select.TrinoS3ClientFactory; import io.trino.spi.connector.ConnectorNodePartitioningProvider; @@ -131,6 +132,7 @@ public void configure(Binder binder) configBinder(binder).bindConfig(HiveFormatsConfig.class); Multibinder<HivePageSourceFactory> pageSourceFactoryBinder = newSetBinder(binder, HivePageSourceFactory.class); + pageSourceFactoryBinder.addBinding().to(S3SelectPageSourceFactory.class).in(Scopes.SINGLETON); pageSourceFactoryBinder.addBinding().to(CsvPageSourceFactory.class).in(Scopes.SINGLETON); pageSourceFactoryBinder.addBinding().to(JsonPageSourceFactory.class).in(Scopes.SINGLETON); pageSourceFactoryBinder.addBinding().to(OpenXJsonPageSourceFactory.class).in(Scopes.SINGLETON); diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSourceFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSourceFactory.java index 308fcb58b915..3c34383a7060 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSourceFactory.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSourceFactory.java @@ -37,5 +37,6 @@ Optional<ReaderPageSource> createPageSource( Optional<AcidInfo> acidInfo, OptionalInt bucketNumber, boolean originalFile, - AcidTransaction transaction); + AcidTransaction transaction, + boolean s3SelectPushdownEnabled); } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSourceProvider.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSourceProvider.java index cb172947aa1a..defee38855d8 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSourceProvider.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSourceProvider.java @@ -228,7 +228,8 @@ public static Optional<ConnectorPageSource> createHivePageSource( acidInfo, tableBucketNumber, originalFile, - transaction); + transaction, + s3SelectPushdownEnabled); if (readerWithProjections.isPresent()) { ConnectorPageSource pageSource = readerWithProjections.get().get(); diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/line/LinePageSourceFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/line/LinePageSourceFactory.java index af3e7dc738a2..ff3d3bf74c44 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/line/LinePageSourceFactory.java +++ 
b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/line/LinePageSourceFactory.java @@ -22,6 +22,8 @@ import io.trino.filesystem.TrinoFileSystemFactory; import io.trino.filesystem.TrinoInputFile; import io.trino.filesystem.memory.MemoryInputFile; +import io.trino.hive.formats.compression.Codec; +import io.trino.hive.formats.compression.CompressionKind; import io.trino.hive.formats.line.Column; import io.trino.hive.formats.line.LineDeserializer; import io.trino.hive.formats.line.LineDeserializerFactory; @@ -99,7 +101,8 @@ public Optional<ReaderPageSource> createPageSource( Optional<AcidInfo> acidInfo, OptionalInt bucketNumber, boolean originalFile, - AcidTransaction transaction) + AcidTransaction transaction, + boolean s3SelectPushdownEnabled) { if (!lineReaderFactory.getHiveOutputFormatClassName().equals(schema.getProperty(FILE_INPUT_FORMAT)) || !lineDeserializerFactory.getHiveSerDeClassNames().contains(getDeserializerClassName(schema)) || @@ -166,7 +169,9 @@ public Optional<ReaderPageSource> createPageSource( } try { - LineReader lineReader = lineReaderFactory.createLineReader(inputFile, start, length, headerCount, footerCount); + Optional<Codec> codec = CompressionKind.forFile(inputFile.location().fileName()) + .map(CompressionKind::createCodec); + LineReader lineReader = lineReaderFactory.createLineReader(inputFile, start, length, headerCount, footerCount, codec); LinePageSource pageSource = new LinePageSource(lineReader, lineDeserializer, lineReaderFactory.createLineBuffer(), path); return Optional.of(new ReaderPageSource(pageSource, readerProjections)); } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcPageSourceFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcPageSourceFactory.java index 28c61abf528c..8ee642e5dca3 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcPageSourceFactory.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcPageSourceFactory.java @@ -184,7 +184,8 @@ public Optional<ReaderPageSource> createPageSource( Optional<AcidInfo> acidInfo, OptionalInt bucketNumber, boolean originalFile, - AcidTransaction transaction) + AcidTransaction transaction, + boolean s3SelectPushdownEnabled) { if (!ORC_SERDE_CLASS.equals(getDeserializerClassName(schema))) { return Optional.empty(); } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java index 30b356e11aa6..9e3824160f2f 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java @@ -170,7 +170,8 @@ public Optional<ReaderPageSource> createPageSource( Optional<AcidInfo> acidInfo, OptionalInt bucketNumber, boolean originalFile, - AcidTransaction transaction) + AcidTransaction transaction, + boolean s3SelectPushdownEnabled) { if (!PARQUET_SERDE_CLASS_NAMES.contains(getDeserializerClassName(schema))) { return Optional.empty(); } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/rcfile/RcFilePageSourceFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/rcfile/RcFilePageSourceFactory.java index 2be057f69577..f1da024110c1 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/rcfile/RcFilePageSourceFactory.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/rcfile/RcFilePageSourceFactory.java @@ -107,7 +107,8 @@ public Optional<ReaderPageSource> createPageSource( Optional<AcidInfo> acidInfo, OptionalInt bucketNumber, boolean originalFile, - 
AcidTransaction transaction) + AcidTransaction transaction, + boolean s3SelectPushdownEnabled) { ColumnEncodingFactory columnEncodingFactory; String deserializerClassName = getDeserializerClassName(schema); diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/IonSqlQueryBuilder.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/IonSqlQueryBuilder.java index 2ecf45ddfd0a..53a1ca72288b 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/IonSqlQueryBuilder.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/IonSqlQueryBuilder.java @@ -22,9 +22,6 @@ import io.trino.spi.predicate.Domain; import io.trino.spi.predicate.Range; import io.trino.spi.predicate.TupleDomain; -import io.trino.spi.type.DecimalType; -import io.trino.spi.type.Decimals; -import io.trino.spi.type.Int128; import io.trino.spi.type.Type; import io.trino.spi.type.TypeManager; import io.trino.spi.type.VarcharType; @@ -149,7 +146,6 @@ private static boolean isSupported(Type type) validType.equals(TINYINT) || validType.equals(SMALLINT) || validType.equals(INTEGER) || - validType instanceof DecimalType || validType.equals(BOOLEAN) || validType.equals(DATE) || validType instanceof VarcharType; @@ -258,13 +254,7 @@ private static String valueToQuery(Type type, Object value) return "'" + FORMATTER.print(DAYS.toMillis((long) value)) + "'"; } if (type.equals(VarcharType.VARCHAR)) { - return "'" + ((Slice) value).toStringUtf8() + "'"; - } - if (type instanceof DecimalType decimalType) { - if (!decimalType.isShort()) { - return Decimals.toString((Int128) value, decimalType.getScale()); - } - return Decimals.toString((long) value, decimalType.getScale()); + return "'" + ((Slice) value).toStringUtf8().replace("'", "''") + "'"; } return "'" + ((Slice) value).toStringUtf8() + "'"; } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectInputStream.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectInputStream.java new file mode 100644 index 000000000000..19bbe256eff3 --- /dev/null +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectInputStream.java @@ -0,0 +1,163 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.hive.s3select; + +import com.amazonaws.services.s3.model.SelectObjectContentRequest; +import com.google.common.primitives.Ints; +import io.trino.filesystem.Location; +import io.trino.filesystem.TrinoInputStream; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; + +import static java.util.Objects.requireNonNull; + +public class S3SelectInputStream + extends TrinoInputStream +{ + private final Location location; + private final TrinoS3SelectClient client; + private final SelectObjectContentRequest request; + private final long length; + + private InputStream input; + private long position; + private boolean closed; + + public S3SelectInputStream(Location location, TrinoS3SelectClient client, SelectObjectContentRequest request, long length) + { + this.location = requireNonNull(location, "location is null"); + this.client = requireNonNull(client, "client is null"); + this.request = requireNonNull(request, "request is null"); + this.length = length; + + this.input = client.getRecordsContent(request); + } + + @Override + public int available() + throws IOException + { + ensureOpen(); + return Ints.saturatedCast(length - position); + } + + @Override + public long getPosition() + { + return position; + } + + @Override + public void seek(long position) + throws IOException + { + ensureOpen(); + if (position < 0) { + throw new IOException("Negative seek offset"); + } + if (position > length) { + throw new IOException("Cannot seek to %s. File size is %s: %s".formatted(position, length, location)); + } + + // for negative seek, reopen the file + if (position < this.position) { + input.close(); + // it is possible to seek backwards using the original file input stream, but this seems simpler + input = client.getRecordsContent(request); + this.position = 0; + } + + while (position > this.position) { + long skip = input.skip(position - this.position); + if (skip < 0) { + throw new IOException("Skip returned a negative size"); + } + + if (skip > 0) { + this.position += skip; + } + else { + if (input.read() == -1) { + // This should not happen unless the file size changed + throw new EOFException(); + } + this.position++; + } + } + + if (this.position != position) { + throw new IOException("Seek to %s failed. 
Current position is %s: %s".formatted(position, this.position, location)); + } + } + + @Override + public int read() + throws IOException + { + ensureOpen(); + int read = input.read(); + if (read != -1) { + position++; + } + return read; + } + + @Override + public int read(byte[] bytes, int offset, int length) + throws IOException + { + ensureOpen(); + int read = input.read(bytes, offset, length); + if (read > 0) { + position += read; + } + return read; + } + + @Override + public void close() + { + if (closed) { + return; + } + closed = true; + closeStream(); + } + + private void ensureOpen() + throws IOException + { + if (closed) { + throw new IOException("Input stream closed: " + location); + } + } + + private void closeStream() + { + if (input == null) { + return; + } + + try (var ignored = input) { + client.close(); + } + catch (IOException ignored) { + } + finally { + input = null; + } + } +} diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectLineRecordReaderProvider.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectLineRecordReaderProvider.java index 49221c398280..cb7bae2b0dbb 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectLineRecordReaderProvider.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectLineRecordReaderProvider.java @@ -15,12 +15,16 @@ import io.trino.plugin.hive.s3select.csv.S3SelectCsvRecordReader; import io.trino.plugin.hive.s3select.json.S3SelectJsonRecordReader; +import io.trino.spi.connector.ConnectorSession; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import java.util.Optional; import java.util.Properties; +import static io.trino.plugin.hive.HiveSessionProperties.isJsonNativeReaderEnabled; +import static io.trino.plugin.hive.HiveSessionProperties.isTextFileNativeReaderEnabled; + /** * Returns an S3SelectLineRecordReader based on the serDe class. It supports CSV and JSON formats, and * will not push down any other formats. @@ -30,6 +34,7 @@ public class S3SelectLineRecordReaderProvider private S3SelectLineRecordReaderProvider() {} public static Optional get(Configuration configuration, + ConnectorSession session, Path path, long start, long length, @@ -40,8 +45,14 @@ public static Optional get(Configuration configuration { switch (dataType) { case CSV: + if (isTextFileNativeReaderEnabled(session)) { + return Optional.empty(); + } return Optional.of(new S3SelectCsvRecordReader(configuration, path, start, length, schema, ionSqlQuery, s3ClientFactory)); case JSON: + if (isJsonNativeReaderEnabled(session)) { + return Optional.empty(); + } return Optional.of(new S3SelectJsonRecordReader(configuration, path, start, length, schema, ionSqlQuery, s3ClientFactory)); default: // return empty if data type is not returned by the serDeMapper or unrecognizable by the LineRecordReader diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectPageSourceFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectPageSourceFactory.java new file mode 100644 index 000000000000..1e36f2cd5eca --- /dev/null +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectPageSourceFactory.java @@ -0,0 +1,276 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.hive.s3select; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Maps; +import com.google.inject.Inject; +import io.trino.filesystem.Location; +import io.trino.filesystem.TrinoInputFile; +import io.trino.filesystem.s3.S3FileSystem; +import io.trino.filesystem.s3.S3FileSystemFactory; +import io.trino.hive.formats.compression.CompressionKind; +import io.trino.hive.formats.line.Column; +import io.trino.hive.formats.line.LineDeserializer; +import io.trino.hive.formats.line.LineDeserializerFactory; +import io.trino.hive.formats.line.LineReader; +import io.trino.hive.formats.line.json.JsonDeserializerFactory; +import io.trino.hive.formats.line.simple.SimpleDeserializerFactory; +import io.trino.hive.formats.line.text.TextLineReaderFactory; +import io.trino.plugin.hive.AcidInfo; +import io.trino.plugin.hive.HiveColumnHandle; +import io.trino.plugin.hive.HiveConfig; +import io.trino.plugin.hive.HiveErrorCode; +import io.trino.plugin.hive.HivePageSourceFactory; +import io.trino.plugin.hive.ReaderColumns; +import io.trino.plugin.hive.ReaderPageSource; +import io.trino.plugin.hive.acid.AcidTransaction; +import io.trino.plugin.hive.line.LinePageSource; +import io.trino.plugin.hive.s3select.csv.S3SelectCsvRecordReader; +import io.trino.spi.TrinoException; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.predicate.TupleDomain; +import io.trino.spi.type.TypeManager; +import software.amazon.awssdk.services.s3.model.CSVInput; +import software.amazon.awssdk.services.s3.model.CSVOutput; +import software.amazon.awssdk.services.s3.model.CompressionType; +import software.amazon.awssdk.services.s3.model.InputSerialization; +import software.amazon.awssdk.services.s3.model.JSONInput; +import software.amazon.awssdk.services.s3.model.JSONOutput; +import software.amazon.awssdk.services.s3.model.JSONType; +import software.amazon.awssdk.services.s3.model.OutputSerialization; + +import java.io.IOException; +import java.util.List; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.Properties; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.trino.hive.formats.line.LineDeserializer.EMPTY_LINE_DESERIALIZER; +import static io.trino.hive.thrift.metastore.hive_metastoreConstants.FILE_INPUT_FORMAT; +import static io.trino.plugin.hive.HivePageSourceProvider.projectBaseColumns; +import static io.trino.plugin.hive.HiveSessionProperties.isJsonNativeReaderEnabled; +import static io.trino.plugin.hive.HiveSessionProperties.isTextFileNativeReaderEnabled; +import static io.trino.plugin.hive.s3select.S3SelectUtils.hasFilters; +import static io.trino.plugin.hive.s3select.csv.S3SelectCsvRecordReader.COMMENTS_CHAR_STR; +import static io.trino.plugin.hive.s3select.csv.S3SelectCsvRecordReader.DEFAULT_FIELD_DELIMITER; +import static io.trino.plugin.hive.util.HiveUtil.getDeserializerClassName; +import static io.trino.plugin.hive.util.HiveUtil.getFooterCount; +import static 
io.trino.plugin.hive.util.HiveUtil.getHeaderCount; +import static io.trino.plugin.hive.util.SerdeConstants.ESCAPE_CHAR; +import static io.trino.plugin.hive.util.SerdeConstants.FIELD_DELIM; +import static io.trino.plugin.hive.util.SerdeConstants.LINE_DELIM; +import static io.trino.plugin.hive.util.SerdeConstants.QUOTE_CHAR; +import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; +import static java.lang.Math.toIntExact; +import static java.util.Objects.requireNonNull; + +public class S3SelectPageSourceFactory + implements HivePageSourceFactory +{ + private final TextLineReaderFactory lineReaderFactory; + private final TypeManager typeManager; + private final S3FileSystemFactory s3FileSystemFactory; + + @Inject + public S3SelectPageSourceFactory( + HiveConfig hiveConfig, + TypeManager typeManager, + S3FileSystemFactory s3FileSystemFactory) + { + this.lineReaderFactory = new TextLineReaderFactory(1024, 1024, toIntExact(hiveConfig.getTextMaxLineLength().toBytes())); + this.typeManager = requireNonNull(typeManager, "typeManager is null"); + this.s3FileSystemFactory = requireNonNull(s3FileSystemFactory, "s3FileSystemFactory is null"); + } + + @Override + public Optional<ReaderPageSource> createPageSource( + ConnectorSession session, + Location path, + long start, + long length, + long estimatedFileSize, + Properties schema, + List<HiveColumnHandle> columns, + TupleDomain<HiveColumnHandle> effectivePredicate, + Optional<AcidInfo> acidInfo, + OptionalInt bucketNumber, + boolean originalFile, + AcidTransaction transaction, + boolean s3SelectPushdownEnabled) + { + if (!s3SelectPushdownEnabled || acidInfo.isPresent()) { + return Optional.empty(); + } + + List<HiveColumnHandle> projectedReaderColumns = columns; + Optional<ReaderColumns> readerProjections = projectBaseColumns(columns); + if (readerProjections.isPresent()) { + projectedReaderColumns = readerProjections.get().get().stream() + .map(HiveColumnHandle.class::cast) + .collect(toImmutableList()); + } + + // Ignore predicates on partial columns for now. 
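+ // (the filter below keeps only base-column domains; dereferenced sub-fields cannot be named in an S3 Select query)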
+ effectivePredicate = effectivePredicate.filter((column, domain) -> column.isBaseColumn()); + + // Query is not going to filter any data, no need to use S3 Select + if (!hasFilters(schema, effectivePredicate, projectedReaderColumns)) { + return Optional.empty(); + } + + String serdeName = getDeserializerClassName(schema); + Optional<S3SelectDataType> s3SelectDataTypeOptional = S3SelectSerDeDataTypeMapper.getDataType(serdeName); + + if (s3SelectDataTypeOptional.isEmpty()) { + return Optional.empty(); + } + + S3SelectDataType s3SelectDataType = s3SelectDataTypeOptional.get(); + CompressionType compressionType = CompressionKind.forFile(path.fileName()) + .map(S3SelectPageSourceFactory::compressionType) + .orElse(CompressionType.NONE); + + LineDeserializerFactory lineDeserializerFactory; + InputSerialization inputSerialization; + OutputSerialization outputSerialization; + boolean enableScanRange; + switch (s3SelectDataType) { + case CSV -> { + // CSV is actually mapped to LazySimpleSerDe in S3SelectSerDeDataTypeMapper + lineDeserializerFactory = new SimpleDeserializerFactory(); + if (!isTextFileNativeReaderEnabled(session)) { + return Optional.empty(); + } + + String fieldDelimiter = schema.getProperty(FIELD_DELIM, DEFAULT_FIELD_DELIMITER); + String quoteChar = schema.getProperty(QUOTE_CHAR, null); + String escapeChar = schema.getProperty(ESCAPE_CHAR, null); + String recordDelimiter = schema.getProperty(LINE_DELIM, "\n"); + + CSVInput selectObjectCSVInputSerialization = CSVInput.builder() + .recordDelimiter(recordDelimiter) + .fieldDelimiter(fieldDelimiter) + .comments(COMMENTS_CHAR_STR) + .quoteCharacter(quoteChar) + .quoteEscapeCharacter(escapeChar) + .build(); + + inputSerialization = InputSerialization.builder() + .compressionType(compressionType) + .csv(selectObjectCSVInputSerialization) + .build(); + + CSVOutput selectObjectCSVOutputSerialization = CSVOutput.builder() + .recordDelimiter(recordDelimiter) + .fieldDelimiter(fieldDelimiter) + .quoteCharacter(quoteChar) + .quoteEscapeCharacter(escapeChar) + .build(); + outputSerialization = OutputSerialization.builder() + .csv(selectObjectCSVOutputSerialization) + .build(); + + // Scan ranges only work for CSV if AllowQuotedRecordDelimiter is disabled. + boolean isQuotedRecordDelimiterAllowed = Boolean.TRUE.equals( + inputSerialization.csv().allowQuotedRecordDelimiter()); + enableScanRange = CompressionType.NONE.equals(compressionType) && !isQuotedRecordDelimiterAllowed; + } + case JSON -> { + lineDeserializerFactory = new JsonDeserializerFactory(); + if (!isJsonNativeReaderEnabled(session)) { + return Optional.empty(); + } + + // JSONType.LINES is the only JSON format supported by the Hive JsonSerDe. 
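+ // (LINES input also keeps scan ranges usable; S3 Select does not support scan ranges for DOCUMENT-type JSON)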
+ JSONInput selectObjectJSONInputSerialization = JSONInput.builder() + .type(JSONType.LINES) + .build(); + + inputSerialization = InputSerialization.builder() + .compressionType(compressionType) + .json(selectObjectJSONInputSerialization) + .build(); + + JSONOutput selectObjectJSONOutputSerialization = JSONOutput.builder().build(); + outputSerialization = OutputSerialization.builder() + .json(selectObjectJSONOutputSerialization) + .build(); + + enableScanRange = CompressionType.NONE.equals(compressionType); + } + default -> throw new IllegalStateException("Unknown S3 Select data type: " + s3SelectDataType); + } + + if (!lineReaderFactory.getHiveOutputFormatClassName().equals(schema.getProperty(FILE_INPUT_FORMAT)) || + !lineDeserializerFactory.getHiveSerDeClassNames().contains(getDeserializerClassName(schema))) { + return Optional.empty(); + } + + // get header and footer count + int headerCount = getHeaderCount(schema); + if (headerCount > 1) { + checkArgument(start == 0, "Multiple header rows are not supported for a split file"); + } + int footerCount = getFooterCount(schema); + if (footerCount > 0) { + checkArgument(start == 0, "Footer not supported for a split file"); + } + + // create deserializer + LineDeserializer lineDeserializer = EMPTY_LINE_DESERIALIZER; + if (!columns.isEmpty()) { + ImmutableList.Builder<Column> deserializerColumns = ImmutableList.builder(); + for (int columnIndex = 0; columnIndex < projectedReaderColumns.size(); columnIndex++) { + HiveColumnHandle column = projectedReaderColumns.get(columnIndex); + deserializerColumns.add(new Column(column.getName(), column.getType(), columnIndex)); + } + lineDeserializer = lineDeserializerFactory.create(deserializerColumns.build(), Maps.fromProperties(schema)); + } + + Optional<String> nullCharacterEncoding = Optional.empty(); + if (s3SelectDataType == S3SelectDataType.CSV) { + nullCharacterEncoding = S3SelectCsvRecordReader.nullCharacterEncoding(schema); + } + IonSqlQueryBuilder queryBuilder = new IonSqlQueryBuilder(typeManager, s3SelectDataType, nullCharacterEncoding); + String ionSqlQuery = queryBuilder.buildSql(projectedReaderColumns, effectivePredicate); + + S3FileSystem fileSystem = (S3FileSystem) s3FileSystemFactory.create(session); + TrinoInputFile inputFile = fileSystem.newS3SelectInputFile( + path, + ionSqlQuery, + enableScanRange, + inputSerialization, + outputSerialization); + try { + LineReader lineReader = lineReaderFactory.createLineReader(inputFile, start, length, headerCount, footerCount, Optional.empty()); + LinePageSource pageSource = new LinePageSource(lineReader, lineDeserializer, lineReaderFactory.createLineBuffer(), path); + return Optional.of(new ReaderPageSource(pageSource, readerProjections)); + } + catch (IOException e) { + throw new TrinoException(HiveErrorCode.HIVE_FILESYSTEM_ERROR, "Failed to open S3 Select reader", e); + } + } + + private static CompressionType compressionType(CompressionKind compressionKind) + { + return switch (compressionKind) { + case GZIP -> CompressionType.GZIP; + case BZIP2 -> CompressionType.BZIP2; + default -> throw new TrinoException(NOT_SUPPORTED, "Compression extension not supported for S3 Select: " + compressionKind); + }; + } +} diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectPushdown.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectPushdown.java index da53346ea945..917f46a300a8 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectPushdown.java +++ 
b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectPushdown.java @@ -17,7 +17,6 @@ import io.trino.plugin.hive.metastore.Column; import io.trino.plugin.hive.metastore.Partition; import io.trino.plugin.hive.metastore.Table; -import io.trino.plugin.hive.type.DecimalTypeInfo; import io.trino.spi.connector.ConnectorSession; import org.apache.hadoop.io.compress.BZip2Codec; import org.apache.hadoop.io.compress.GzipCodec; @@ -50,7 +49,8 @@ public final class S3SelectPushdown private static final Set SUPPORTED_S3_PREFIXES = ImmutableSet.of("s3://", "s3a://", "s3n://"); /* - * Double and Real Types lose precision. Thus, they are not pushed down to S3. Please use Decimal Type if push down is desired. + * Double and Real Types lose precision. Thus, they are not pushed down to S3. + * Correctness problems have also been observed with Decimal columns. * * When S3 select support was added, Trino did not properly implement TIMESTAMP semantic. This was fixed in 2020, and TIMESTAMPS may be supportable now * (https://github.com/trinodb/trino/issues/10962). Pushing down timestamps to s3select maybe still be problematic due to ION SQL comparing timestamps @@ -63,7 +63,6 @@ public final class S3SelectPushdown "smallint", "bigint", "string", - "decimal", "date"); private S3SelectPushdown() {} @@ -138,12 +137,7 @@ private static boolean areColumnTypesSupported(List columns) } for (Column column : columns) { - String type = column.getType().getHiveTypeName().toString(); - if (column.getType().getTypeInfo() instanceof DecimalTypeInfo) { - // skip precision and scale when check decimal type - type = "decimal"; - } - if (!SUPPORTED_COLUMN_TYPES.contains(type)) { + if (!SUPPORTED_COLUMN_TYPES.contains(column.getType().getHiveTypeName().toString())) { return false; } } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectRecordCursorProvider.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectRecordCursorProvider.java index 16560502030f..057a775c3846 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectRecordCursorProvider.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectRecordCursorProvider.java @@ -14,15 +14,14 @@ package io.trino.plugin.hive.s3select; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; import com.google.inject.Inject; import io.trino.filesystem.Location; import io.trino.hdfs.HdfsEnvironment; import io.trino.plugin.hive.HiveColumnHandle; +import io.trino.plugin.hive.HiveConfig; import io.trino.plugin.hive.HiveRecordCursorProvider; import io.trino.plugin.hive.ReaderColumns; import io.trino.plugin.hive.s3select.csv.S3SelectCsvRecordReader; -import io.trino.plugin.hive.type.TypeInfo; import io.trino.spi.TrinoException; import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.RecordCursor; @@ -32,22 +31,16 @@ import org.apache.hadoop.fs.Path; import java.io.IOException; -import java.util.Arrays; import java.util.List; import java.util.Optional; import java.util.Properties; -import java.util.Set; -import java.util.function.Function; import static com.google.common.collect.ImmutableList.toImmutableList; -import static com.google.common.collect.ImmutableSet.toImmutableSet; import static io.trino.plugin.hive.HiveErrorCode.HIVE_FILESYSTEM_ERROR; import static io.trino.plugin.hive.HivePageSourceProvider.projectBaseColumns; -import static 
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectRecordCursorProvider.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectRecordCursorProvider.java
index 16560502030f..057a775c3846 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectRecordCursorProvider.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectRecordCursorProvider.java
@@ -14,15 +14,14 @@
 package io.trino.plugin.hive.s3select;

 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
 import com.google.inject.Inject;
 import io.trino.filesystem.Location;
 import io.trino.hdfs.HdfsEnvironment;
 import io.trino.plugin.hive.HiveColumnHandle;
+import io.trino.plugin.hive.HiveConfig;
 import io.trino.plugin.hive.HiveRecordCursorProvider;
 import io.trino.plugin.hive.ReaderColumns;
 import io.trino.plugin.hive.s3select.csv.S3SelectCsvRecordReader;
-import io.trino.plugin.hive.type.TypeInfo;
 import io.trino.spi.TrinoException;
 import io.trino.spi.connector.ConnectorSession;
 import io.trino.spi.connector.RecordCursor;
@@ -32,22 +31,16 @@
 import org.apache.hadoop.fs.Path;

 import java.io.IOException;
-import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
 import java.util.Properties;
-import java.util.Set;
-import java.util.function.Function;

 import static com.google.common.collect.ImmutableList.toImmutableList;
-import static com.google.common.collect.ImmutableSet.toImmutableSet;
 import static io.trino.plugin.hive.HiveErrorCode.HIVE_FILESYSTEM_ERROR;
 import static io.trino.plugin.hive.HivePageSourceProvider.projectBaseColumns;
-import static io.trino.plugin.hive.type.TypeInfoUtils.getTypeInfosFromTypeString;
+import static io.trino.plugin.hive.s3select.S3SelectDataType.CSV;
+import static io.trino.plugin.hive.s3select.S3SelectUtils.hasFilters;
 import static io.trino.plugin.hive.util.HiveUtil.getDeserializerClassName;
-import static io.trino.plugin.hive.util.SerdeConstants.COLUMN_NAME_DELIMITER;
-import static io.trino.plugin.hive.util.SerdeConstants.LIST_COLUMNS;
-import static io.trino.plugin.hive.util.SerdeConstants.LIST_COLUMN_TYPES;
 import static java.util.Objects.requireNonNull;

 public class S3SelectRecordCursorProvider
@@ -55,12 +48,14 @@ public class S3SelectRecordCursorProvider
 {
     private final HdfsEnvironment hdfsEnvironment;
     private final TrinoS3ClientFactory s3ClientFactory;
+    private final boolean experimentalPushdownEnabled;

     @Inject
-    public S3SelectRecordCursorProvider(HdfsEnvironment hdfsEnvironment, TrinoS3ClientFactory s3ClientFactory)
+    public S3SelectRecordCursorProvider(HdfsEnvironment hdfsEnvironment, TrinoS3ClientFactory s3ClientFactory, HiveConfig hiveConfig)
     {
         this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
         this.s3ClientFactory = requireNonNull(s3ClientFactory, "s3ClientFactory is null");
+        this.experimentalPushdownEnabled = hiveConfig.isS3SelectExperimentalPushdownEnabled();
     }

     @Override
@@ -106,15 +101,26 @@ public Optional<ReaderRecordCursorWithProjections> createRecordCursor(
         if (s3SelectDataTypeOptional.isPresent()) {
             S3SelectDataType s3SelectDataType = s3SelectDataTypeOptional.get();
+            if (s3SelectDataType == CSV && !experimentalPushdownEnabled) {
+                return Optional.empty();
+            }
             Optional<String> nullCharacterEncoding = Optional.empty();
-            if (s3SelectDataType == S3SelectDataType.CSV) {
+            if (s3SelectDataType == CSV) {
                 nullCharacterEncoding = S3SelectCsvRecordReader.nullCharacterEncoding(schema);
             }
             IonSqlQueryBuilder queryBuilder = new IonSqlQueryBuilder(typeManager, s3SelectDataType, nullCharacterEncoding);
             String ionSqlQuery = queryBuilder.buildSql(readerColumns, effectivePredicate);
-            Optional<S3SelectLineRecordReader> recordReader = S3SelectLineRecordReaderProvider.get(configuration, path, start, length, schema,
-                    ionSqlQuery, s3ClientFactory, s3SelectDataType);
+            Optional<S3SelectLineRecordReader> recordReader = S3SelectLineRecordReaderProvider.get(
+                    configuration,
+                    session,
+                    path,
+                    start,
+                    length,
+                    schema,
+                    ionSqlQuery,
+                    s3ClientFactory,
+                    s3SelectDataType);

             if (recordReader.isEmpty()) {
                 // S3 Select data type is not mapped to an S3SelectLineRecordReader
@@ -127,65 +133,4 @@ public Optional<ReaderRecordCursorWithProjections> createRecordCursor(
         // unsupported serdes
         return Optional.empty();
     }
-
-    private static boolean hasFilters(
-            Properties schema,
-            TupleDomain<HiveColumnHandle> effectivePredicate,
-            List<HiveColumnHandle> readerColumns)
-    {
-        //There are no effective predicates and readercolumns and columntypes are identical to schema
-        //means getting all data out of S3. We can use S3 GetObject instead of S3 SelectObjectContent in these cases.
-        if (effectivePredicate.isAll()) {
-            return !isEquivalentSchema(readerColumns, schema);
-        }
-        return true;
-    }
-
-    private static boolean isEquivalentSchema(List<HiveColumnHandle> readerColumns, Properties schema)
-    {
-        Set<String> projectedColumnNames = getColumnProperty(readerColumns, HiveColumnHandle::getName);
-        Set<String> projectedColumnTypes = getColumnProperty(readerColumns, column -> column.getHiveType().getTypeInfo().getTypeName());
-        return isEquivalentColumns(projectedColumnNames, schema) && isEquivalentColumnTypes(projectedColumnTypes, schema);
-    }
-
-    private static boolean isEquivalentColumns(Set<String> projectedColumnNames, Properties schema)
-    {
-        Set<String> columnNames;
-        String columnNameProperty = schema.getProperty(LIST_COLUMNS);
-        if (columnNameProperty.length() == 0) {
-            columnNames = ImmutableSet.of();
-        }
-        else {
-            String columnNameDelimiter = (String) schema.getOrDefault(COLUMN_NAME_DELIMITER, ",");
-            columnNames = Arrays.stream(columnNameProperty.split(columnNameDelimiter))
-                    .collect(toImmutableSet());
-        }
-        return projectedColumnNames.equals(columnNames);
-    }
-
-    private static boolean isEquivalentColumnTypes(Set<String> projectedColumnTypes, Properties schema)
-    {
-        String columnTypeProperty = schema.getProperty(LIST_COLUMN_TYPES);
-        Set<String> columnTypes;
-        if (columnTypeProperty.length() == 0) {
-            columnTypes = ImmutableSet.of();
-        }
-        else {
-            columnTypes = getTypeInfosFromTypeString(columnTypeProperty)
-                    .stream()
-                    .map(TypeInfo::getTypeName)
-                    .collect(toImmutableSet());
-        }
-        return projectedColumnTypes.equals(columnTypes);
-    }
-
-    private static Set<String> getColumnProperty(List<HiveColumnHandle> readerColumns, Function<HiveColumnHandle, String> mapper)
-    {
-        if (readerColumns.isEmpty()) {
-            return ImmutableSet.of();
-        }
-        return readerColumns.stream()
-                .map(mapper)
-                .collect(toImmutableSet());
-    }
 }
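The isS3SelectExperimentalPushdownEnabled() accessor consumed above lives in HiveConfig. A hedged sketch of that wiring, inferred only from the accessor name, the setter exercised in TestHiveConfig below, and the hive.s3select-pushdown.experimental-textfile-pushdown-enabled property name; the real HiveConfig defines many other properties:

    import io.airlift.configuration.Config;

    public class HiveConfigSketch
    {
        private boolean s3SelectExperimentalPushdownEnabled;

        public boolean isS3SelectExperimentalPushdownEnabled()
        {
            return s3SelectExperimentalPushdownEnabled;
        }

        @Config("hive.s3select-pushdown.experimental-textfile-pushdown-enabled")
        public HiveConfigSketch setS3SelectExperimentalPushdownEnabled(boolean s3SelectExperimentalPushdownEnabled)
        {
            this.s3SelectExperimentalPushdownEnabled = s3SelectExperimentalPushdownEnabled;
            return this;
        }
    }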
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectTrinoInput.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectTrinoInput.java
new file mode 100644
index 000000000000..6cb3f35a517b
--- /dev/null
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectTrinoInput.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.hive.s3select;
+
+import com.amazonaws.AbortedException;
+import com.amazonaws.services.s3.model.ScanRange;
+import com.amazonaws.services.s3.model.SelectObjectContentRequest;
+import io.trino.filesystem.Location;
+import io.trino.filesystem.TrinoInput;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
+
+import static java.util.Objects.checkFromIndexSize;
+import static java.util.Objects.requireNonNull;
+
+public class S3SelectTrinoInput
+        implements TrinoInput
+{
+    private final Location location;
+    private final TrinoS3SelectClient client;
+    private final SelectObjectContentRequest request;
+    private boolean closed;
+
+    public S3SelectTrinoInput(Location location, TrinoS3SelectClient client, SelectObjectContentRequest request)
+    {
+        this.location = requireNonNull(location, "location is null");
+        this.client = requireNonNull(client, "client is null");
+        this.request = requireNonNull(request, "request is null");
+    }
+
+    @Override
+    public void readFully(long position, byte[] buffer, int offset, int length)
+            throws IOException
+    {
+        ensureOpen();
+        if (position < 0) {
+            throw new IOException("Negative seek offset");
+        }
+        checkFromIndexSize(offset, length, buffer.length);
+        if (length == 0) {
+            return;
+        }
+
+        SelectObjectContentRequest contentRequest = request.withScanRange(new ScanRange()
+                .withStart(position)
+                .withEnd(position + length));
+        InputStream inputStream = client.getRecordsContent(contentRequest);
+        int n = readNBytes(inputStream, buffer, offset, length);
+        if (n < length) {
+            throw new EOFException("Read %s of %s requested bytes: %s".formatted(n, length, location));
+        }
+    }
+
+    @Override
+    public int readTail(byte[] buffer, int offset, int length)
+            throws IOException
+    {
+        ensureOpen();
+        checkFromIndexSize(offset, length, buffer.length);
+        if (length == 0) {
+            return 0;
+        }
+
+        long start = request.getScanRange().getStart();
+        SelectObjectContentRequest contentRequest = request.withScanRange(new ScanRange()
+                .withStart(start)
+                .withEnd(start + length));
+        return readNBytes(client.getRecordsContent(contentRequest), buffer, offset, length);
+    }
+
+    @Override
+    public void close()
+    {
+        closed = true;
+    }
+
+    private void ensureOpen()
+            throws IOException
+    {
+        if (closed) {
+            throw new IOException("Input closed: " + location);
+        }
+    }
+
+    private static int readNBytes(InputStream in, byte[] buffer, int offset, int length)
+            throws IOException
+    {
+        try {
+            return in.readNBytes(buffer, offset, length);
+        }
+        catch (AbortedException e) {
+            throw new InterruptedIOException();
+        }
+    }
+}
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectUtils.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectUtils.java
new file mode 100644
index 000000000000..418f78d93e63
--- /dev/null
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectUtils.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.hive.s3select;
+
+import com.google.common.collect.ImmutableSet;
+import io.trino.plugin.hive.HiveColumnHandle;
+import io.trino.plugin.hive.type.TypeInfo;
+import io.trino.spi.predicate.TupleDomain;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.function.Function;
+
+import static com.google.common.collect.ImmutableSet.toImmutableSet;
+import static io.trino.plugin.hive.type.TypeInfoUtils.getTypeInfosFromTypeString;
+import static io.trino.plugin.hive.util.SerdeConstants.COLUMN_NAME_DELIMITER;
+import static io.trino.plugin.hive.util.SerdeConstants.LIST_COLUMNS;
+import static io.trino.plugin.hive.util.SerdeConstants.LIST_COLUMN_TYPES;
+
+public final class S3SelectUtils
+{
+    private S3SelectUtils() {}
+
+    public static boolean hasFilters(
+            Properties schema,
+            TupleDomain<HiveColumnHandle> effectivePredicate,
+            List<HiveColumnHandle> readerColumns)
+    {
+        // If there are no effective predicates, and the reader columns and column types are
+        // identical to the schema, the query reads all data out of S3, so plain S3 GetObject
+        // can be used instead of S3 SelectObjectContent in these cases.
+        if (effectivePredicate.isAll()) {
+            return !isEquivalentSchema(readerColumns, schema);
+        }
+        return true;
+    }
+
+    private static boolean isEquivalentSchema(List<HiveColumnHandle> readerColumns, Properties schema)
+    {
+        Set<String> projectedColumnNames = getColumnProperty(readerColumns, HiveColumnHandle::getName);
+        Set<String> projectedColumnTypes = getColumnProperty(readerColumns, column -> column.getHiveType().getTypeInfo().getTypeName());
+        return isEquivalentColumns(projectedColumnNames, schema) && isEquivalentColumnTypes(projectedColumnTypes, schema);
+    }
+
+    private static boolean isEquivalentColumns(Set<String> projectedColumnNames, Properties schema)
+    {
+        Set<String> columnNames;
+        String columnNameProperty = schema.getProperty(LIST_COLUMNS);
+        if (columnNameProperty.length() == 0) {
+            columnNames = ImmutableSet.of();
+        }
+        else {
+            String columnNameDelimiter = (String) schema.getOrDefault(COLUMN_NAME_DELIMITER, ",");
+            columnNames = Arrays.stream(columnNameProperty.split(columnNameDelimiter))
+                    .collect(toImmutableSet());
+        }
+        return projectedColumnNames.equals(columnNames);
+    }
+
+    private static boolean isEquivalentColumnTypes(Set<String> projectedColumnTypes, Properties schema)
+    {
+        String columnTypeProperty = schema.getProperty(LIST_COLUMN_TYPES);
+        Set<String> columnTypes;
+        if (columnTypeProperty.length() == 0) {
+            columnTypes = ImmutableSet.of();
+        }
+        else {
+            columnTypes = getTypeInfosFromTypeString(columnTypeProperty)
+                    .stream()
+                    .map(TypeInfo::getTypeName)
+                    .collect(toImmutableSet());
+        }
+        return projectedColumnTypes.equals(columnTypes);
+    }
+
+    private static Set<String> getColumnProperty(List<HiveColumnHandle> readerColumns, Function<HiveColumnHandle, String> mapper)
+    {
+        if (readerColumns.isEmpty()) {
+            return ImmutableSet.of();
+        }
+        return readerColumns.stream()
+                .map(mapper)
+                .collect(toImmutableSet());
+    }
+}
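Because hasFilters decides between two different S3 APIs, a compact runnable restatement of its contract may help; the nested Column record here is a stand-in for the HiveColumnHandle/Properties plumbing above:

    import java.util.List;
    import java.util.Set;

    public class HasFiltersSketch
    {
        record Column(String name, String type) {}

        // Mirrors the logic above: only a predicate-free query that selects every
        // schema column (same names and types) can skip SelectObjectContent
        static boolean hasFilters(boolean predicateIsAll, List<Column> projected, List<Column> schema)
        {
            if (predicateIsAll) {
                return !Set.copyOf(projected).equals(Set.copyOf(schema));
            }
            return true;
        }

        public static void main(String[] args)
        {
            List<Column> schema = List.of(new Column("id", "int"), new Column("name", "string"));
            System.out.println(hasFilters(true, schema, schema));                 // false: plain GetObject suffices
            System.out.println(hasFilters(true, List.of(schema.get(0)), schema)); // true: projection narrows the columns
            System.out.println(hasFilters(false, schema, schema));                // true: a predicate exists
        }
    }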
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/TrinoS3ClientFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/TrinoS3ClientFactory.java
index 9a016c8e41ec..06879caf5730 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/TrinoS3ClientFactory.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/TrinoS3ClientFactory.java
@@ -90,7 +90,7 @@ public TrinoS3ClientFactory(HiveConfig config)
         this.defaultMaxConnections = config.getS3SelectPushdownMaxConnections();
     }

-    synchronized AmazonS3 getS3Client(Configuration config)
+    public synchronized AmazonS3 getS3Client(Configuration config)
     {
         if (s3Client == null) {
             s3Client = createS3Client(config);
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/csv/S3SelectCsvRecordReader.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/csv/S3SelectCsvRecordReader.java
index 89dc6d3c9102..310dbe56ae9e 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/csv/S3SelectCsvRecordReader.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/csv/S3SelectCsvRecordReader.java
@@ -42,8 +42,8 @@ public class S3SelectCsvRecordReader
      * TODO: Remove this proxy logic when S3Select API supports disabling of row level comments.
      */
-    private static final String COMMENTS_CHAR_STR = "\uFDD0";
-    private static final String DEFAULT_FIELD_DELIMITER = ",";
+    public static final String COMMENTS_CHAR_STR = "\uFDD0";
+    public static final String DEFAULT_FIELD_DELIMITER = ",";

     public S3SelectCsvRecordReader(
             Configuration configuration,
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseTestHiveOnDataLake.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseTestHiveOnDataLake.java
index 3b711f498d21..5237dfdcbd24 100644
--- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseTestHiveOnDataLake.java
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseTestHiveOnDataLake.java
@@ -110,6 +110,7 @@ protected QueryRunner createQueryRunner()
                         .put("hive.s3.streaming.part-size", HIVE_S3_STREAMING_PART_SIZE.toString())
                         // This is required to enable AWS Athena partition projection
                         .put("hive.partition-projection-enabled", "true")
+                        .put("hive.s3select-pushdown.experimental-textfile-pushdown-enabled", "true")
                         .buildOrThrow())
                 .build();
     }
@@ -1793,99 +1794,218 @@ public void testPartitionedTableExternalLocationOnTopOfTheBucket()
     }

     @Test(dataProvider = "s3SelectFileFormats")
-    public void testS3SelectPushdown(String tableProperties)
+    public void testS3SelectPushdownOnHiveReaders(String tableProperties)
+    {
+        testS3SelectPushdown(tableProperties, false);
+    }
+
+    @Test(dataProvider = "s3SelectFileFormats")
+    public void testS3SelectPushdownOnNativeReaders(String tableProperties)
+    {
+        testS3SelectPushdown(tableProperties, true);
+    }
+
+    private void testS3SelectPushdown(String tableProperties, boolean useNativeReaders)
     {
         Session usingAppendInserts = Session.builder(getSession())
                 .setCatalogSessionProperty("hive", "insert_existing_partitions_behavior", "APPEND")
                 .build();
         List<String> values = ImmutableList.of(
-                "1, true, 11, 111, 1111, 11111, 'one', 1.1, DATE '2020-01-01'",
-                "2, true, 22, 222, 2222, 22222, 'two', 2.2, DATE '2020-02-02'",
-                "3, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL",
-                "4, false, 44, 444, 4444, 44444, 'four', 4.4, DATE '2020-04-04'");
+                "1, true, 11, 111, 1111, 11111, 'one', DATE '2020-01-01'",
+                "2, true, 22, 222, 2222, 22222, 'two', DATE '2020-02-02'",
+                "3, NULL, NULL, NULL, NULL, NULL, NULL, NULL",
+                "4, false, 44, 444, 4444, 44444, 'four', DATE '2020-04-04'");
         try (TestTable table = new TestTable(
                 sql -> getQueryRunner().execute(usingAppendInserts, sql),
                 "hive.%s.test_s3_select_pushdown".formatted(HIVE_TEST_SCHEMA),
-                "(id INT, bool_t BOOLEAN, tiny_t TINYINT, small_t SMALLINT, int_t INT, big_t BIGINT, string_t VARCHAR, decimal_t DECIMAL(10, 5), date_t DATE) " +
+                "(id INT, bool_t BOOLEAN, tiny_t TINYINT, small_t SMALLINT, int_t INT, big_t BIGINT, string_t 
VARCHAR, date_t DATE) " + "WITH (" + tableProperties + ")", values)) { - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE bool_t = true", "VALUES 1, 2"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE bool_t = false", "VALUES 4"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE bool_t IS NULL", "VALUES 3"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE bool_t IS NOT NULL", "VALUES 1, 2, 4"); - - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE tiny_t = 22", "VALUES 2"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE tiny_t != 22", "VALUES 1, 4"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE tiny_t > 22", "VALUES 4"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE tiny_t >= 22", "VALUES 2, 4"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE tiny_t = 22 OR tiny_t = 44", "VALUES 2, 4"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE tiny_t IS NULL OR tiny_t >= 22", "VALUES 2, 3, 4"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE tiny_t IS NULL", "VALUES 3"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE tiny_t IS NOT NULL", "VALUES 1, 2, 4"); - - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE small_t = 222", "VALUES 2"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE small_t != 222", "VALUES 1, 4"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE small_t > 222", "VALUES 4"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE small_t >= 222", "VALUES 2, 4"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE small_t = 222 OR small_t = 444", "VALUES 2, 4"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE small_t IS NULL OR small_t >= 222", "VALUES 2, 3, 4"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE small_t IS NULL", "VALUES 3"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE small_t IS NOT NULL", "VALUES 1, 2, 4"); - - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE int_t = 2222", "VALUES 2"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE int_t != 2222", "VALUES 1, 4"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE int_t > 2222", "VALUES 4"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE int_t >= 2222", "VALUES 2, 4"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE int_t = 2222 OR int_t = 4444", "VALUES 2, 4"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE int_t IS NULL OR int_t >= 2222", "VALUES 2, 3, 4"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE int_t IS NULL", "VALUES 3"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE int_t IS NOT NULL", "VALUES 1, 2, 4"); - - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE big_t = 22222", "VALUES 2"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE big_t != 22222", "VALUES 1, 4"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE big_t > 22222", "VALUES 4"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE big_t >= 22222", "VALUES 2, 4"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE big_t = 22222 OR big_t = 44444", "VALUES 2, 4"); - assertS3SelectQuery("SELECT id FROM " + 
table.getName() + " WHERE big_t IS NULL OR big_t >= 22222", "VALUES 2, 3, 4"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE big_t IS NULL", "VALUES 3"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE big_t IS NOT NULL", "VALUES 1, 2, 4"); - - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE string_t = 'two'", "VALUES 2"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE string_t != 'two'", "VALUES 1, 4"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE string_t < 'two'", "VALUES 1, 4"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE string_t <= 'two'", "VALUES 1, 2, 4"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE string_t = 'two' OR string_t = 'four'", "VALUES 2, 4"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE string_t IS NULL OR string_t >= 'two'", "VALUES 2, 3"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE string_t IS NULL", "VALUES 3"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE string_t IS NOT NULL", "VALUES 1, 2, 4"); - - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE decimal_t = 2.2", "VALUES 2"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE decimal_t != 2.2", "VALUES 1, 4"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE decimal_t < 2.2", "VALUES 1"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE decimal_t <= 2.2", "VALUES 1, 2"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE decimal_t = 2.2 OR decimal_t = 4.4", "VALUES 2, 4"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE decimal_t IS NULL OR decimal_t >= 2.2", "VALUES 2, 3, 4"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE decimal_t IS NULL", "VALUES 3"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE decimal_t IS NOT NULL", "VALUES 1, 2, 4"); - - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE date_t = DATE '2020-02-02'", "VALUES 2"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE date_t != DATE '2020-02-02'", "VALUES 1, 4"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE date_t > DATE '2020-02-02'", "VALUES 4"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE date_t <= DATE '2020-02-02'", "VALUES 1, 2"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE date_t = DATE '2020-02-02' OR date_t = DATE '2020-04-04'", "VALUES 2, 4"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE date_t IS NULL OR date_t >= DATE '2020-02-02'", "VALUES 2, 3, 4"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE date_t IS NULL", "VALUES 3"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE date_t IS NOT NULL", "VALUES 1, 2, 4"); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE bool_t = true", "VALUES 1, 2", useNativeReaders); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE bool_t = false", "VALUES 4", useNativeReaders); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE bool_t IS NULL", "VALUES 3", useNativeReaders); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE bool_t IS NOT NULL", "VALUES 1, 2, 4", useNativeReaders); + + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE tiny_t = 22", "VALUES 2", useNativeReaders); + 
assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE tiny_t != 22", "VALUES 1, 4", useNativeReaders); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE tiny_t > 22", "VALUES 4", useNativeReaders); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE tiny_t >= 22", "VALUES 2, 4", useNativeReaders); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE tiny_t = 22 OR tiny_t = 44", "VALUES 2, 4", useNativeReaders); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE tiny_t IS NULL OR tiny_t >= 22", "VALUES 2, 3, 4", useNativeReaders); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE tiny_t IS NULL", "VALUES 3", useNativeReaders); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE tiny_t IS NOT NULL", "VALUES 1, 2, 4", useNativeReaders); + + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE small_t = 222", "VALUES 2", useNativeReaders); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE small_t != 222", "VALUES 1, 4", useNativeReaders); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE small_t > 222", "VALUES 4", useNativeReaders); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE small_t >= 222", "VALUES 2, 4", useNativeReaders); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE small_t = 222 OR small_t = 444", "VALUES 2, 4", useNativeReaders); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE small_t IS NULL OR small_t >= 222", "VALUES 2, 3, 4", useNativeReaders); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE small_t IS NULL", "VALUES 3", useNativeReaders); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE small_t IS NOT NULL", "VALUES 1, 2, 4", useNativeReaders); + + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE int_t = 2222", "VALUES 2", useNativeReaders); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE int_t != 2222", "VALUES 1, 4", useNativeReaders); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE int_t > 2222", "VALUES 4", useNativeReaders); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE int_t >= 2222", "VALUES 2, 4", useNativeReaders); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE int_t = 2222 OR int_t = 4444", "VALUES 2, 4", useNativeReaders); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE int_t IS NULL OR int_t >= 2222", "VALUES 2, 3, 4", useNativeReaders); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE int_t IS NULL", "VALUES 3", useNativeReaders); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE int_t IS NOT NULL", "VALUES 1, 2, 4", useNativeReaders); + + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE big_t = 22222", "VALUES 2", useNativeReaders); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE big_t != 22222", "VALUES 1, 4", useNativeReaders); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE big_t > 22222", "VALUES 4", useNativeReaders); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE big_t >= 22222", "VALUES 2, 4", useNativeReaders); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE big_t = 22222 OR big_t = 44444", "VALUES 2, 4", useNativeReaders); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE big_t IS NULL OR big_t >= 22222", "VALUES 2, 3, 4", 
useNativeReaders);
+            assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE big_t IS NULL", "VALUES 3", useNativeReaders);
+            assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE big_t IS NOT NULL", "VALUES 1, 2, 4", useNativeReaders);
+
+            assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE string_t = 'two'", "VALUES 2", useNativeReaders);
+            assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE string_t != 'two'", "VALUES 1, 4", useNativeReaders);
+            assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE string_t < 'two'", "VALUES 1, 4", useNativeReaders);
+            assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE string_t <= 'two'", "VALUES 1, 2, 4", useNativeReaders);
+            assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE string_t = 'two' OR string_t = 'four'", "VALUES 2, 4", useNativeReaders);
+            assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE string_t IS NULL OR string_t >= 'two'", "VALUES 2, 3", useNativeReaders);
+            assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE string_t IS NULL", "VALUES 3", useNativeReaders);
+            assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE string_t IS NOT NULL", "VALUES 1, 2, 4", useNativeReaders);
+
+            assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE date_t = DATE '2020-02-02'", "VALUES 2", useNativeReaders);
+            assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE date_t != DATE '2020-02-02'", "VALUES 1, 4", useNativeReaders);
+            assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE date_t > DATE '2020-02-02'", "VALUES 4", useNativeReaders);
+            assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE date_t <= DATE '2020-02-02'", "VALUES 1, 2", useNativeReaders);
+            assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE date_t = DATE '2020-02-02' OR date_t = DATE '2020-04-04'", "VALUES 2, 4", useNativeReaders);
+            assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE date_t IS NULL OR date_t >= DATE '2020-02-02'", "VALUES 2, 3, 4", useNativeReaders);
+            assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE date_t IS NULL", "VALUES 3", useNativeReaders);
+            assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE date_t IS NOT NULL", "VALUES 1, 2, 4", useNativeReaders);
+        }
+    }
+
+    @Test(dataProvider = "s3SelectFileFormats")
+    public void testS3SelectOnDecimalColumnIsDisabled(String tableProperties)
+    {
+        Session usingAppendInserts = Session.builder(getSession())
+                .setCatalogSessionProperty("hive", "insert_existing_partitions_behavior", "APPEND")
+                .build();
+        List<String> values = ImmutableList.of("1, 1.1", "2, 2.2", "3, NULL", "4, 4.4");
+        try (TestTable table = new TestTable(
+                sql -> getQueryRunner().execute(usingAppendInserts, sql),
+                "hive.%s.test_s3_select_pushdown".formatted(HIVE_TEST_SCHEMA),
+                "(id INT, decimal_t DECIMAL(10, 5)) WITH (" + tableProperties + ")",
+                values)) {
+            assertNoS3SelectPushdown("SELECT id FROM " + table.getName() + " WHERE decimal_t = 2.2", "VALUES 2");
+            assertNoS3SelectPushdown("SELECT id FROM " + table.getName() + " WHERE decimal_t != 2.2", "VALUES 1, 4");
+            assertNoS3SelectPushdown("SELECT id FROM " + table.getName() + " WHERE decimal_t < 2.2", "VALUES 1");
+            assertNoS3SelectPushdown("SELECT id FROM " + table.getName() + " WHERE decimal_t <= 2.2", "VALUES 1, 2");
+            assertNoS3SelectPushdown("SELECT id FROM " + table.getName() + " WHERE decimal_t = 2.2 OR decimal_t = 4.4", "VALUES 2, 4");
assertNoS3SelectPushdown("SELECT id FROM " + table.getName() + " WHERE decimal_t IS NULL OR decimal_t >= 2.2", "VALUES 2, 3, 4"); + assertNoS3SelectPushdown("SELECT id FROM " + table.getName() + " WHERE decimal_t IS NULL", "VALUES 3"); + assertNoS3SelectPushdown("SELECT id FROM " + table.getName() + " WHERE decimal_t IS NOT NULL", "VALUES 1, 2, 4"); + } + } + + @Test + public void testJsonS3SelectPushdownWithSpecialCharacters() + { + testJsonS3SelectPushdownWithSpecialCharacters(true); + testJsonS3SelectPushdownWithSpecialCharacters(false); + } + + private void testJsonS3SelectPushdownWithSpecialCharacters(boolean useNativeReaders) + { + Session usingAppendInserts = Session.builder(getSession()) + .setCatalogSessionProperty("hive", "insert_existing_partitions_behavior", "APPEND") + .build(); + + List specialCharacterValues = ImmutableList.of( + "1, 'a,comma'", + "2, 'a|pipe'", + "3, 'an''escaped quote'", + "4, 'a\"double quote'"); + try (TestTable table = new TestTable( + sql -> getQueryRunner().execute(usingAppendInserts, sql), + "hive.%s.test_s3_select_pushdown_special_characters".formatted(HIVE_TEST_SCHEMA), + "(id INT, string_t VARCHAR) WITH (format = 'JSON')", + specialCharacterValues)) { + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE string_t ='a,comma'", "VALUES 1", useNativeReaders); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE string_t ='a|pipe'", "VALUES 2", useNativeReaders); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE string_t ='an''escaped quote'", "VALUES 3", useNativeReaders); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE string_t ='a\"double quote'", "VALUES 4", useNativeReaders); } } - private void assertS3SelectQuery(@Language("SQL") String query, @Language("SQL") String expectedValues) + @Test + public void testS3SelectExperimentalPushdown() { + // Demonstrate correctness issues which have resulted in pushdown for TEXTFILE + // using CSV support in S3 Select being put behind a separate "experimental" flag. 

-    private void assertS3SelectQuery(@Language("SQL") String query, @Language("SQL") String expectedValues)
+    @Test
+    public void testS3SelectExperimentalPushdown()
+    {
+        // Demonstrate correctness issues which have resulted in pushdown for TEXTFILE
+        // using CSV support in S3 Select being put behind a separate "experimental" flag.
+        // TODO: https://github.com/trinodb/trino/issues/17775
+        Session usingAppendInserts = Session.builder(getSession())
+                .setCatalogSessionProperty("hive", "insert_existing_partitions_behavior", "APPEND")
+                .build();
+        List<String> values = ImmutableList.of(
+                "1, true, 11",
+                "2, true, 22",
+                "3, NULL, NULL",
+                "4, false, 44");
         Session withS3SelectPushdown = Session.builder(getSession())
                 .setCatalogSessionProperty("hive", "s3_select_pushdown_enabled", "true")
                 .setCatalogSessionProperty("hive", "json_native_reader_enabled", "false")
                 .setCatalogSessionProperty("hive", "text_file_native_reader_enabled", "false")
                 .build();
+        Session withoutS3SelectPushdown = Session.builder(getSession())
+                .setCatalogSessionProperty("hive", "json_native_reader_enabled", "false")
+                .setCatalogSessionProperty("hive", "text_file_native_reader_enabled", "false")
+                .build();
+
+        try (TestTable table = new TestTable(
+                sql -> getQueryRunner().execute(usingAppendInserts, sql),
+                "hive.%s.test_s3_select_pushdown_experimental_features".formatted(HIVE_TEST_SCHEMA),
+                "(id INT, bool_t BOOLEAN, int_t INT) WITH (format = 'TEXTFILE')",
+                values)) {
+            assertQuery(withoutS3SelectPushdown, "SELECT id FROM " + table.getName() + " WHERE int_t IS NULL", "VALUES 3");
+            assertThat(query(withS3SelectPushdown, "SELECT id FROM " + table.getName() + " WHERE int_t IS NULL")).returnsEmptyResult();
+
+            assertQueryFails(
+                    withS3SelectPushdown,
+                    "SELECT id FROM " + table.getName() + " WHERE bool_t = true",
+                    "S3 returned an error: Error casting:.*");
+        }
+
+        List<String> specialCharacterValues = ImmutableList.of(
+                "1, 'a,comma'",
+                "2, 'a|pipe'",
+                "3, 'an''escaped quote'",
+                "4, 'a~null encoding'");
+        try (TestTable table = new TestTable(
+                sql -> getQueryRunner().execute(usingAppendInserts, sql),
+                "hive.%s.test_s3_select_pushdown_special_characters".formatted(HIVE_TEST_SCHEMA),
+                "(id INT, string_t VARCHAR) WITH (format = 'TEXTFILE', textfile_field_separator=',', textfile_field_separator_escape='|', null_format='~')",
+                specialCharacterValues)) {
+            // These two should return a result, but incorrectly return nothing
+            String selectWithComma = "SELECT id FROM " + table.getName() + " WHERE string_t = 'a,comma'";
+            assertQuery(withoutS3SelectPushdown, selectWithComma, "VALUES 1");
+            assertThat(query(withS3SelectPushdown, selectWithComma)).returnsEmptyResult();
+
+            String selectWithPipe = "SELECT id FROM " + table.getName() + " WHERE string_t = 'a|pipe'";
+            assertQuery(withoutS3SelectPushdown, selectWithPipe, "VALUES 2");
+            assertThat(query(withS3SelectPushdown, selectWithPipe)).returnsEmptyResult();
+
+            // These two are actually correct
+            assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE string_t = 'an''escaped quote'", "VALUES 3", false);
+            assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE string_t = 'a~null encoding'", "VALUES 4", false);
+        }
+    }
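The assertNoS3SelectPushdown helper added below infers the absence of pushdown from query statistics: had S3 Select filtered rows server-side, the session with the flag enabled would report fewer physical input positions than the plain session. A toy restatement of that invariant with hypothetical numbers:

    public class PushdownDetectionSketch
    {
        public static void main(String[] args)
        {
            long positionsWithFlagEnabled = 4;  // hypothetical stats from the pushdown session
            long positionsWithFlagDisabled = 4; // hypothetical stats from the default session
            // Equal counts mean the flag changed nothing, i.e. S3 Select was not used
            boolean pushdownHappened = positionsWithFlagEnabled < positionsWithFlagDisabled;
            System.out.println(pushdownHappened ? "S3 Select filtered rows" : "no pushdown took place");
        }
    }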
String query, @Language("SQL") results -> assertEquals(results.getOnlyColumnAsSet(), expectedResult.getOnlyColumnAsSet())); } + private void assertNoS3SelectPushdown(@Language("SQL") String query, @Language("SQL") String expectedValues) + { + Session withS3SelectPushdown = Session.builder(getSession()) + .setCatalogSessionProperty("hive", "s3_select_pushdown_enabled", "true") + .setCatalogSessionProperty("hive", "json_native_reader_enabled", "false") + .setCatalogSessionProperty("hive", "text_file_native_reader_enabled", "false") + .build(); + + MaterializedResult expectedResult = computeActual(expectedValues); + assertQueryStats( + withS3SelectPushdown, + query, + statsWithPushdown -> { + long inputPositionsWithPushdown = statsWithPushdown.getPhysicalInputPositions(); + assertQueryStats( + getSession(), + query, + statsWithoutPushdown -> assertThat(statsWithoutPushdown.getPhysicalInputPositions()).isEqualTo(inputPositionsWithPushdown), + results -> assertEquals(results.getOnlyColumnAsSet(), expectedResult.getOnlyColumnAsSet())); + }, + results -> assertEquals(results.getOnlyColumnAsSet(), expectedResult.getOnlyColumnAsSet())); + } + @DataProvider public static Object[][] s3SelectFileFormats() { diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/HiveTestUtils.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/HiveTestUtils.java index ee58a859e08c..173d07217229 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/HiveTestUtils.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/HiveTestUtils.java @@ -142,7 +142,7 @@ public static ConnectorSession getHiveSession(HiveConfig hiveConfig) public static TestingConnectorSession getHiveSession(HiveConfig hiveConfig, OrcReaderConfig orcReaderConfig) { return TestingConnectorSession.builder() - .setPropertyMetadata(getHiveSessionProperties(hiveConfig, orcReaderConfig).getSessionProperties()) + .setPropertyMetadata(getHiveSessionProperties(hiveConfig, orcReaderConfig, new HiveFormatsConfig()).getSessionProperties()) .build(); } @@ -162,14 +162,14 @@ public static TestingConnectorSession getHiveSession(HiveConfig hiveConfig, Parq public static HiveSessionProperties getHiveSessionProperties(HiveConfig hiveConfig) { - return getHiveSessionProperties(hiveConfig, new OrcReaderConfig()); + return getHiveSessionProperties(hiveConfig, new OrcReaderConfig(), new HiveFormatsConfig()); } - public static HiveSessionProperties getHiveSessionProperties(HiveConfig hiveConfig, OrcReaderConfig orcReaderConfig) + public static HiveSessionProperties getHiveSessionProperties(HiveConfig hiveConfig, OrcReaderConfig orcReaderConfig, HiveFormatsConfig hiveFormatsConfig) { return new HiveSessionProperties( hiveConfig, - new HiveFormatsConfig(), + hiveFormatsConfig, orcReaderConfig, new OrcWriterConfig(), new ParquetReaderConfig(), @@ -217,7 +217,7 @@ public static Set getDefaultHivePageSourceFactories(HdfsE public static Set getDefaultHiveRecordCursorProviders(HiveConfig hiveConfig, HdfsEnvironment hdfsEnvironment) { - return ImmutableSet.of(new S3SelectRecordCursorProvider(hdfsEnvironment, new TrinoS3ClientFactory(hiveConfig))); + return ImmutableSet.of(new S3SelectRecordCursorProvider(hdfsEnvironment, new TrinoS3ClientFactory(hiveConfig), hiveConfig)); } public static Set getDefaultHiveFileWriterFactories(HiveConfig hiveConfig, HdfsEnvironment hdfsEnvironment) diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveConfig.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveConfig.java index 
0d06f9e792cf..de8264a4cbc8 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveConfig.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveConfig.java @@ -92,6 +92,7 @@ public void testDefaults() .setIgnoreCorruptedStatistics(false) .setCollectColumnStatisticsOnWrite(true) .setS3SelectPushdownEnabled(false) + .setS3SelectExperimentalPushdownEnabled(false) .setS3SelectPushdownMaxConnections(500) .setTemporaryStagingDirectoryEnabled(true) .setTemporaryStagingDirectoryPath("/tmp/presto-${USER}") @@ -178,6 +179,7 @@ public void testExplicitPropertyMappings() .put("hive.ignore-corrupted-statistics", "true") .put("hive.collect-column-statistics-on-write", "false") .put("hive.s3select-pushdown.enabled", "true") + .put("hive.s3select-pushdown.experimental-textfile-pushdown-enabled", "true") .put("hive.s3select-pushdown.max-connections", "1234") .put("hive.temporary-staging-directory-enabled", "false") .put("hive.temporary-staging-directory-path", "updated") @@ -261,6 +263,7 @@ public void testExplicitPropertyMappings() .setIgnoreCorruptedStatistics(true) .setCollectColumnStatisticsOnWrite(false) .setS3SelectPushdownEnabled(true) + .setS3SelectExperimentalPushdownEnabled(true) .setS3SelectPushdownMaxConnections(1234) .setTemporaryStagingDirectoryEnabled(false) .setTemporaryStagingDirectoryPath("updated") diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/benchmark/AbstractFileFormat.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/benchmark/AbstractFileFormat.java index d6afdddd1a54..5357defaa59f 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/benchmark/AbstractFileFormat.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/benchmark/AbstractFileFormat.java @@ -223,7 +223,8 @@ static ConnectorPageSource createPageSource( Optional.empty(), OptionalInt.empty(), false, - NO_ACID_TRANSACTION); + NO_ACID_TRANSACTION, + true); checkState(readerPageSourceWithProjections.isPresent(), "readerPageSourceWithProjections is not present"); checkState(readerPageSourceWithProjections.get().getReaderColumns().isEmpty(), "projection should not be required"); diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/orc/TestOrcPageSourceFactory.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/orc/TestOrcPageSourceFactory.java index bcdc09ae6b87..22b11a9132e5 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/orc/TestOrcPageSourceFactory.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/orc/TestOrcPageSourceFactory.java @@ -241,7 +241,8 @@ private static List readFile(Map columns, Optiona acidInfo, OptionalInt.empty(), false, - NO_ACID_TRANSACTION); + NO_ACID_TRANSACTION, + true); checkArgument(pageSourceWithProjections.isPresent()); checkArgument(pageSourceWithProjections.get().getReaderColumns().isEmpty(), diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/parquet/TestOnlyNulls.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/parquet/TestOnlyNulls.java index c432a3974f8b..1eb2e22582a1 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/parquet/TestOnlyNulls.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/parquet/TestOnlyNulls.java @@ -98,7 +98,8 @@ private static ConnectorPageSource createPageSource(File parquetFile, HiveColumn Optional.empty(), OptionalInt.empty(), false, - AcidTransaction.NO_ACID_TRANSACTION) + AcidTransaction.NO_ACID_TRANSACTION, + true) .orElseThrow() .get(); } diff --git 
a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/parquet/TestTimestampMicros.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/parquet/TestTimestampMicros.java
index 5d9f20ad13aa..acd8d349cc19 100644
--- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/parquet/TestTimestampMicros.java
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/parquet/TestTimestampMicros.java
@@ -124,7 +124,8 @@ private ConnectorPageSource createPageSource(ConnectorSession session, File parq
                 Optional.empty(),
                 OptionalInt.empty(),
                 false,
-                AcidTransaction.NO_ACID_TRANSACTION)
+                AcidTransaction.NO_ACID_TRANSACTION,
+                true)
                 .orElseThrow();

         pageSourceWithProjections.getReaderColumns().ifPresent(projections -> {
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/s3/TestMinioS3SelectQueries.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/s3/TestMinioS3SelectQueries.java
new file mode 100644
index 000000000000..104807959644
--- /dev/null
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/s3/TestMinioS3SelectQueries.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.hive.s3;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import io.airlift.units.DataSize;
+import io.trino.Session;
+import io.trino.plugin.hive.containers.HiveHadoop;
+import io.trino.plugin.hive.containers.HiveMinioDataLake;
+import io.trino.testing.AbstractTestQueryFramework;
+import io.trino.testing.QueryRunner;
+import io.trino.testing.sql.TestTable;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.List;
+
+import static io.airlift.units.DataSize.Unit.MEGABYTE;
+import static io.trino.testing.TestingNames.randomNameSuffix;
+import static java.lang.String.format;
+
+public class TestMinioS3SelectQueries
+        extends AbstractTestQueryFramework
+{
+    private static final String HIVE_TEST_SCHEMA = "hive_datalake";
+    private static final DataSize HIVE_S3_STREAMING_PART_SIZE = DataSize.of(5, MEGABYTE);
+
+    private String bucketName;
+
+    @Override
+    protected QueryRunner createQueryRunner()
+            throws Exception
+    {
+        this.bucketName = "test-hive-insert-overwrite-" + randomNameSuffix();
+        HiveMinioDataLake hiveMinioDataLake = closeAfterClass(new HiveMinioDataLake(bucketName, HiveHadoop.HIVE3_IMAGE));
+        hiveMinioDataLake.start();
+        return S3HiveQueryRunner.builder(hiveMinioDataLake)
+                .setHiveProperties(
+                        ImmutableMap.<String, String>builder()
+                                .put("hive.non-managed-table-writes-enabled", "true")
+                                .put("hive.metastore-cache-ttl", "1d")
+                                .put("hive.metastore-refresh-interval", "1d")
+                                .put("hive.s3.streaming.part-size", HIVE_S3_STREAMING_PART_SIZE.toString())
+                                .buildOrThrow())
+                .build();
+    }
+
+    @BeforeClass
+    public void setUp()
+    {
+        computeActual(format(
+                "CREATE SCHEMA hive.%1$s WITH (location='s3a://%2$s/%1$s')",
+                HIVE_TEST_SCHEMA,
+                bucketName));
+    }
+
+    @Test
+    public void testTextfileQueries()
+    {
+        // Demonstrate correctness issues which have resulted in pushdown for TEXTFILE
+        // using CSV support in S3 Select being put behind a separate "experimental" flag.
+        // TODO: https://github.com/trinodb/trino/issues/17775
+        // The experimental TEXTFILE property is left unset for this runner, so these reads
+        // are expected to fall back to the regular readers and return correct results.
+        List<String> values = ImmutableList.of(
+                "1, true, 11",
+                "2, true, 22",
+                "3, NULL, NULL",
+                "4, false, 44");
+        Session withS3SelectPushdown = Session.builder(getSession())
+                .setCatalogSessionProperty("hive", "s3_select_pushdown_enabled", "true")
+                .setCatalogSessionProperty("hive", "json_native_reader_enabled", "false")
+                .setCatalogSessionProperty("hive", "text_file_native_reader_enabled", "false")
+                .build();
+        Session withoutS3SelectPushdown = Session.builder(getSession())
+                .setCatalogSessionProperty("hive", "json_native_reader_enabled", "false")
+                .setCatalogSessionProperty("hive", "text_file_native_reader_enabled", "false")
+                .build();
+        try (TestTable table = new TestTable(
+                getQueryRunner()::execute,
+                "hive.%s.test_textfile_queries".formatted(HIVE_TEST_SCHEMA),
+                "(id INT, bool_t BOOLEAN, int_t INT) WITH (format = 'TEXTFILE')",
+                values)) {
+            assertQuery(withS3SelectPushdown, "SELECT id FROM " + table.getName() + " WHERE int_t IS NULL", "VALUES 3");
+            assertQuery(withS3SelectPushdown, "SELECT id FROM " + table.getName() + " WHERE bool_t = true", "VALUES 1, 2");
+        }
+
+        List<String> specialCharacterValues = ImmutableList.of(
+                "1, 'a,comma'",
+                "2, 'a|pipe'",
+                "3, 'an''escaped quote'",
+                "4, 'a~null encoding'");
+        try (TestTable table = new TestTable(
+                getQueryRunner()::execute,
+                "hive.%s.test_s3_select_pushdown_special_characters".formatted(HIVE_TEST_SCHEMA),
+                "(id INT, string_t VARCHAR) WITH (format = 'TEXTFILE', textfile_field_separator=',', textfile_field_separator_escape='|', null_format='~')",
+                specialCharacterValues)) {
+            String selectWithComma = "SELECT id FROM " + table.getName() + " WHERE string_t = 'a,comma'";
+            assertQuery(withoutS3SelectPushdown, selectWithComma, "VALUES 1");
+            assertQuery(withS3SelectPushdown, selectWithComma, "VALUES 1");
+
+            String selectWithPipe = "SELECT id FROM " + table.getName() + " WHERE string_t = 'a|pipe'";
+            assertQuery(withoutS3SelectPushdown, selectWithPipe, "VALUES 2");
+            assertQuery(withS3SelectPushdown, selectWithPipe, "VALUES 2");
+
+            String selectWithQuote = "SELECT id FROM " + table.getName() + " WHERE string_t = 'an''escaped quote'";
+            assertQuery(withoutS3SelectPushdown, selectWithQuote, "VALUES 3");
+            assertQuery(withS3SelectPushdown, selectWithQuote, "VALUES 3");
+
+            String selectWithNullFormatEncoding = "SELECT id FROM " + table.getName() + " WHERE string_t = 'a~null encoding'";
+            assertQuery(withoutS3SelectPushdown, selectWithNullFormatEncoding, "VALUES 4");
+            assertQuery(withS3SelectPushdown, selectWithNullFormatEncoding, "VALUES 4");
+        }
+    }
+}
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/s3/TestS3SelectQueries.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/s3/TestS3SelectQueries.java
index 93fbbf725c20..90d31febc109 100644
--- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/s3/TestS3SelectQueries.java
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/s3/TestS3SelectQueries.java
@@ -60,6 +60,7 @@ protected QueryRunner createQueryRunner()
         ImmutableMap.Builder<String, String> hiveProperties = ImmutableMap.builder();
         hiveProperties.put("hive.s3.endpoint", bucketEndpoint);
         hiveProperties.put("hive.non-managed-table-writes-enabled", "true");
+        hiveProperties.put("hive.s3select-pushdown.experimental-textfile-pushdown-enabled", "true");
         return HiveQueryRunner.builder()
                 .setHiveProperties(hiveProperties.buildOrThrow())
.setInitialTables(ImmutableList.of()) @@ -78,7 +79,18 @@ protected QueryRunner createQueryRunner() } @Test(dataProvider = "s3SelectFileFormats") - public void testS3SelectPushdown(String tableProperties) + public void testS3SelectPushdownOnHiveReaders(String tableProperties) + { + testS3SelectPushdown(tableProperties, false); + } + + @Test(dataProvider = "s3SelectFileFormats") + public void testS3SelectPushdownOnNativeReaders(String tableProperties) + { + testS3SelectPushdown(tableProperties, true); + } + + private void testS3SelectPushdown(String tableProperties, boolean useNativeReaders) { Session usingAppendInserts = Session.builder(getSession()) .setCatalogSessionProperty("hive", "insert_existing_partitions_behavior", "APPEND") @@ -93,73 +105,73 @@ public void testS3SelectPushdown(String tableProperties) "hive.%s.test_s3_select_pushdown".formatted(HiveQueryRunner.TPCH_SCHEMA), "(id INT, bool_t BOOLEAN, tiny_t TINYINT, small_t SMALLINT, int_t INT, big_t BIGINT, string_t VARCHAR, date_t DATE) " + "WITH (external_location = 's3://" + bucket + "/test_s3_select_pushdown/test_table_" + randomNameSuffix() + "', " + tableProperties + ")", values)) { - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE bool_t = true", "VALUES 1, 2"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE bool_t = false", "VALUES 4"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE bool_t IS NULL", "VALUES 3"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE bool_t IS NOT NULL", "VALUES 1, 2, 4"); - - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE tiny_t = 22", "VALUES 2"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE tiny_t != 22", "VALUES 1, 4"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE tiny_t > 22", "VALUES 4"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE tiny_t >= 22", "VALUES 2, 4"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE tiny_t = 22 OR tiny_t = 44", "VALUES 2, 4"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE tiny_t IS NULL OR tiny_t >= 22", "VALUES 2, 3, 4"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE tiny_t IS NULL", "VALUES 3"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE tiny_t IS NOT NULL", "VALUES 1, 2, 4"); - - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE small_t = 222", "VALUES 2"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE small_t != 222", "VALUES 1, 4"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE small_t > 222", "VALUES 4"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE small_t >= 222", "VALUES 2, 4"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE small_t = 222 OR small_t = 444", "VALUES 2, 4"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE small_t IS NULL OR small_t >= 222", "VALUES 2, 3, 4"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE small_t IS NULL", "VALUES 3"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE small_t IS NOT NULL", "VALUES 1, 2, 4"); - - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE int_t = 2222", "VALUES 2"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE int_t != 2222", "VALUES 1, 4"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE int_t > 2222", "VALUES 4"); - 
assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE int_t >= 2222", "VALUES 2, 4"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE int_t = 2222 OR int_t = 4444", "VALUES 2, 4"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE int_t IS NULL OR int_t >= 2222", "VALUES 2, 3, 4"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE int_t IS NULL", "VALUES 3"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE int_t IS NOT NULL", "VALUES 1, 2, 4"); - - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE big_t = 22222", "VALUES 2"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE big_t != 22222", "VALUES 1, 4"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE big_t > 22222", "VALUES 4"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE big_t >= 22222", "VALUES 2, 4"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE big_t = 22222 OR big_t = 44444", "VALUES 2, 4"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE big_t IS NULL OR big_t >= 22222", "VALUES 2, 3, 4"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE big_t IS NULL", "VALUES 3"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE big_t IS NOT NULL", "VALUES 1, 2, 4"); - - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE string_t = 'two'", "VALUES 2"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE string_t != 'two'", "VALUES 1, 4"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE string_t < 'two'", "VALUES 1, 4"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE string_t <= 'two'", "VALUES 1, 2, 4"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE string_t = 'two' OR string_t = ''", "VALUES 2, 4"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE string_t IS NULL OR string_t >= 'two'", "VALUES 2, 3"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE string_t IS NULL", "VALUES 3"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE string_t IS NOT NULL", "VALUES 1, 2, 4"); - - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE date_t = DATE '2020-02-02'", "VALUES 2"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE date_t != DATE '2020-02-02'", "VALUES 1, 4"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE date_t > DATE '2020-02-02'", "VALUES 4"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE date_t <= DATE '2020-02-02'", "VALUES 1, 2"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE date_t = DATE '2020-02-02' OR date_t = DATE '2020-04-04'", "VALUES 2, 4"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE date_t IS NULL OR date_t >= DATE '2020-02-02'", "VALUES 2, 3, 4"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE date_t IS NULL", "VALUES 3"); - assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE date_t IS NOT NULL", "VALUES 1, 2, 4"); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE bool_t = true", "VALUES 1, 2", useNativeReaders); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE bool_t = false", "VALUES 4", useNativeReaders); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE bool_t IS NULL", "VALUES 3", useNativeReaders); + 
assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE bool_t IS NOT NULL", "VALUES 1, 2, 4", useNativeReaders); + + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE tiny_t = 22", "VALUES 2", useNativeReaders); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE tiny_t != 22", "VALUES 1, 4", useNativeReaders); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE tiny_t > 22", "VALUES 4", useNativeReaders); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE tiny_t >= 22", "VALUES 2, 4", useNativeReaders); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE tiny_t = 22 OR tiny_t = 44", "VALUES 2, 4", useNativeReaders); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE tiny_t IS NULL OR tiny_t >= 22", "VALUES 2, 3, 4", useNativeReaders); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE tiny_t IS NULL", "VALUES 3", useNativeReaders); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE tiny_t IS NOT NULL", "VALUES 1, 2, 4", useNativeReaders); + + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE small_t = 222", "VALUES 2", useNativeReaders); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE small_t != 222", "VALUES 1, 4", useNativeReaders); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE small_t > 222", "VALUES 4", useNativeReaders); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE small_t >= 222", "VALUES 2, 4", useNativeReaders); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE small_t = 222 OR small_t = 444", "VALUES 2, 4", useNativeReaders); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE small_t IS NULL OR small_t >= 222", "VALUES 2, 3, 4", useNativeReaders); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE small_t IS NULL", "VALUES 3", useNativeReaders); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE small_t IS NOT NULL", "VALUES 1, 2, 4", useNativeReaders); + + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE int_t = 2222", "VALUES 2", useNativeReaders); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE int_t != 2222", "VALUES 1, 4", useNativeReaders); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE int_t > 2222", "VALUES 4", useNativeReaders); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE int_t >= 2222", "VALUES 2, 4", useNativeReaders); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE int_t = 2222 OR int_t = 4444", "VALUES 2, 4", useNativeReaders); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE int_t IS NULL OR int_t >= 2222", "VALUES 2, 3, 4", useNativeReaders); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE int_t IS NULL", "VALUES 3", useNativeReaders); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE int_t IS NOT NULL", "VALUES 1, 2, 4", useNativeReaders); + + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE big_t = 22222", "VALUES 2", useNativeReaders); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE big_t != 22222", "VALUES 1, 4", useNativeReaders); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE big_t > 22222", "VALUES 4", useNativeReaders); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE big_t >= 22222", "VALUES 2, 4", useNativeReaders); + 
assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE big_t = 22222 OR big_t = 44444", "VALUES 2, 4", useNativeReaders); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE big_t IS NULL OR big_t >= 22222", "VALUES 2, 3, 4", useNativeReaders); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE big_t IS NULL", "VALUES 3", useNativeReaders); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE big_t IS NOT NULL", "VALUES 1, 2, 4", useNativeReaders); + + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE string_t = 'two'", "VALUES 2", useNativeReaders); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE string_t != 'two'", "VALUES 1, 4", useNativeReaders); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE string_t < 'two'", "VALUES 1, 4", useNativeReaders); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE string_t <= 'two'", "VALUES 1, 2, 4", useNativeReaders); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE string_t = 'two' OR string_t = ''", "VALUES 2, 4", useNativeReaders); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE string_t IS NULL OR string_t >= 'two'", "VALUES 2, 3", useNativeReaders); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE string_t IS NULL", "VALUES 3", useNativeReaders); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE string_t IS NOT NULL", "VALUES 1, 2, 4", useNativeReaders); + + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE date_t = DATE '2020-02-02'", "VALUES 2", useNativeReaders); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE date_t != DATE '2020-02-02'", "VALUES 1, 4", useNativeReaders); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE date_t > DATE '2020-02-02'", "VALUES 4", useNativeReaders); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE date_t <= DATE '2020-02-02'", "VALUES 1, 2", useNativeReaders); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE date_t = DATE '2020-02-02' OR date_t = DATE '2020-04-04'", "VALUES 2, 4", useNativeReaders); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE date_t IS NULL OR date_t >= DATE '2020-02-02'", "VALUES 2, 3, 4", useNativeReaders); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE date_t IS NULL", "VALUES 3", useNativeReaders); + assertS3SelectQuery("SELECT id FROM " + table.getName() + " WHERE date_t IS NOT NULL", "VALUES 1, 2, 4", useNativeReaders); } } - private void assertS3SelectQuery(@Language("SQL") String query, @Language("SQL") String expectedValues) + private void assertS3SelectQuery(@Language("SQL") String query, @Language("SQL") String expectedValues, boolean useNativeReaders) { Session withS3SelectPushdown = Session.builder(getSession()) .setCatalogSessionProperty("hive", "s3_select_pushdown_enabled", "true") - .setCatalogSessionProperty("hive", "json_native_reader_enabled", "false") - .setCatalogSessionProperty("hive", "text_file_native_reader_enabled", "false") + .setCatalogSessionProperty("hive", "json_native_reader_enabled", Boolean.toString(useNativeReaders)) + .setCatalogSessionProperty("hive", "text_file_native_reader_enabled", Boolean.toString(useNativeReaders)) .build(); MaterializedResult expectedResult = computeActual(expectedValues); diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/s3select/TestIonSqlQueryBuilder.java 
b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/s3select/TestIonSqlQueryBuilder.java
index 56d1223173f6..bb2974310153 100644
--- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/s3select/TestIonSqlQueryBuilder.java
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/s3select/TestIonSqlQueryBuilder.java
@@ -15,12 +15,14 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import io.airlift.slice.Slices;
 import io.trino.plugin.hive.HiveColumnHandle;
 import io.trino.plugin.hive.HiveType;
 import io.trino.spi.predicate.Domain;
 import io.trino.spi.predicate.Range;
 import io.trino.spi.predicate.SortedRangeSet;
 import io.trino.spi.predicate.TupleDomain;
+import io.trino.spi.predicate.ValueSet;
 import io.trino.spi.type.DecimalType;
 import io.trino.spi.type.TypeManager;
 import io.trino.util.DateTimeUtils;
@@ -106,19 +108,11 @@ public void testDecimalColumns()
 // CSV
 IonSqlQueryBuilder queryBuilder = new IonSqlQueryBuilder(typeManager, S3SelectDataType.CSV, Optional.empty());
- assertEquals(queryBuilder.buildSql(columns, tupleDomain),
- "SELECT s._1, s._2, s._3 FROM S3Object s WHERE " +
- "(s._1 != '' AND s._1 < 50) AND " +
- "(s._2 != '' AND s._2 = 0.05) AND " +
- "((s._3 != '' AND s._3 >= 0.00 AND s._3 <= 0.02))");
+ assertEquals(queryBuilder.buildSql(columns, tupleDomain), "SELECT s._1, s._2, s._3 FROM S3Object s");
 
 // JSON
 queryBuilder = new IonSqlQueryBuilder(typeManager, S3SelectDataType.JSON, Optional.empty());
- assertEquals(queryBuilder.buildSql(columns, tupleDomain),
- "SELECT s.quantity, s.extendedprice, s.discount FROM S3Object s WHERE " +
- "(s.quantity IS NOT NULL AND s.quantity < 50) AND " +
- "(s.extendedprice IS NOT NULL AND s.extendedprice = 0.05) AND " +
- "((s.discount IS NOT NULL AND s.discount >= 0.00 AND s.discount <= 0.02))");
+ assertEquals(queryBuilder.buildSql(columns, tupleDomain), "SELECT s.quantity, s.extendedprice, s.discount FROM S3Object s");
 }
 
 @Test
@@ -160,4 +154,22 @@ public void testNotPushDoublePredicates()
 queryBuilder = new IonSqlQueryBuilder(TESTING_TYPE_MANAGER, S3SelectDataType.JSON, Optional.empty());
 assertEquals(queryBuilder.buildSql(columns, tupleDomain), "SELECT s.quantity, s.extendedprice, s.discount FROM S3Object s WHERE (s.quantity IS NOT NULL AND CAST(s.quantity AS INT) < 50)");
 }
+
+ @Test
+ public void testStringEscaping()
+ {
+ List<HiveColumnHandle> columns = ImmutableList.of(
+ createBaseColumn("string", 0, HIVE_STRING, VARCHAR, REGULAR, Optional.empty()));
+ TupleDomain<HiveColumnHandle> tupleDomain = withColumnDomains(ImmutableMap.of(
+ columns.get(0),
+ Domain.create(ValueSet.of(VARCHAR, Slices.utf8Slice("value with a ' quote")), false)));
+
+ // CSV
+ IonSqlQueryBuilder queryBuilder = new IonSqlQueryBuilder(TESTING_TYPE_MANAGER, S3SelectDataType.CSV, Optional.empty());
+ assertEquals(queryBuilder.buildSql(columns, tupleDomain), "SELECT s._1 FROM S3Object s WHERE (s._1 != '' AND s._1 = 'value with a '' quote')");
+
+ // JSON
+ queryBuilder = new IonSqlQueryBuilder(TESTING_TYPE_MANAGER, S3SelectDataType.JSON, Optional.empty());
+ assertEquals(queryBuilder.buildSql(columns, tupleDomain), "SELECT s.string FROM S3Object s WHERE (s.string IS NOT NULL AND s.string = 'value with a '' quote')");
+ }
 }
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/s3select/TestS3SelectRecordCursorProvider.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/s3select/TestS3SelectRecordCursorProvider.java
index fb5e17500d0f..cc4395cac0b3 100644
--- 
a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/s3select/TestS3SelectRecordCursorProvider.java
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/s3select/TestS3SelectRecordCursorProvider.java
@@ -19,13 +19,18 @@
 import io.trino.hadoop.ConfigurationInstantiator;
 import io.trino.plugin.hive.HiveColumnHandle;
 import io.trino.plugin.hive.HiveConfig;
+import io.trino.plugin.hive.HiveFormatsConfig;
 import io.trino.plugin.hive.HiveRecordCursorProvider.ReaderRecordCursorWithProjections;
 import io.trino.plugin.hive.TestBackgroundHiveSplitLoader.TestingHdfsEnvironment;
+import io.trino.plugin.hive.orc.OrcReaderConfig;
+import io.trino.spi.connector.ConnectorSession;
 import io.trino.spi.predicate.Domain;
 import io.trino.spi.predicate.Range;
 import io.trino.spi.predicate.SortedRangeSet;
 import io.trino.spi.predicate.TupleDomain;
+import io.trino.testing.TestingConnectorSession;
 import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+import org.apache.hive.hcatalog.data.JsonSerDe;
 import org.testng.annotations.Test;
 
 import java.util.ArrayList;
@@ -35,7 +40,8 @@
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static io.trino.plugin.hive.HiveTestUtils.SESSION;
+import static io.trino.plugin.hive.HiveTestUtils.getHiveSessionProperties;
 import static io.trino.plugin.hive.s3select.TestS3SelectRecordCursor.ARTICLE_COLUMN;
 import static io.trino.plugin.hive.s3select.TestS3SelectRecordCursor.AUTHOR_COLUMN;
 import static io.trino.plugin.hive.s3select.TestS3SelectRecordCursor.DATE_ARTICLE_COLUMN;
@@ -109,17 +115,64 @@ public void shouldNotReturnSelectRecordCursorWhenProjectionOrderIsDifferent()
 assertTrue(recordCursor.isEmpty());
 }
 
+ @Test
+ public void testDisableExperimentalFeatures()
+ {
+ List<HiveColumnHandle> readerColumns = new ArrayList<>();
+ TupleDomain<HiveColumnHandle> effectivePredicate = TupleDomain.all();
+ S3SelectRecordCursorProvider s3SelectRecordCursorProvider = new S3SelectRecordCursorProvider(
+ new TestingHdfsEnvironment(new ArrayList<>()),
+ new TrinoS3ClientFactory(new HiveConfig()),
+ new HiveConfig().setS3SelectExperimentalPushdownEnabled(false));
+
+ Optional<ReaderRecordCursorWithProjections> csvRecordCursor = s3SelectRecordCursorProvider.createRecordCursor(
+ ConfigurationInstantiator.newEmptyConfiguration(),
+ SESSION,
+ Location.of("s3://fakeBucket/fakeObject.gz"),
+ 0,
+ 10,
+ 10,
+ createTestingSchema(LazySimpleSerDe.class.getName()),
+ readerColumns,
+ effectivePredicate,
+ TESTING_TYPE_MANAGER,
+ true);
+ assertTrue(csvRecordCursor.isEmpty());
+
+ Optional<ReaderRecordCursorWithProjections> jsonRecordCursor = s3SelectRecordCursorProvider.createRecordCursor(
+ ConfigurationInstantiator.newEmptyConfiguration(),
+ SESSION,
+ Location.of("s3://fakeBucket/fakeObject.gz"),
+ 0,
+ 10,
+ 10,
+ createTestingSchema(JsonSerDe.class.getName()),
+ readerColumns,
+ effectivePredicate,
+ TESTING_TYPE_MANAGER,
+ true);
+ assertTrue(jsonRecordCursor.isPresent());
+ }
+
 private static Optional<ReaderRecordCursorWithProjections> getRecordCursor(TupleDomain<HiveColumnHandle> effectivePredicate,
 List<HiveColumnHandle> readerColumns,
 boolean s3SelectPushdownEnabled)
 {
+ HiveFormatsConfig withoutNativeReaders = new HiveFormatsConfig()
+ .setCsvNativeReaderEnabled(false)
+ .setJsonNativeReaderEnabled(false);
+ ConnectorSession session = TestingConnectorSession.builder()
+ .setPropertyMetadata(getHiveSessionProperties(new HiveConfig(), new OrcReaderConfig(), withoutNativeReaders).getSessionProperties())
+ .build();
+
 S3SelectRecordCursorProvider s3SelectRecordCursorProvider = new S3SelectRecordCursorProvider(
 new TestingHdfsEnvironment(new ArrayList<>()),
- new TrinoS3ClientFactory(new HiveConfig()));
+ new 
TrinoS3ClientFactory(new HiveConfig()),
+ new HiveConfig().setS3SelectExperimentalPushdownEnabled(true));
 
 return s3SelectRecordCursorProvider.createRecordCursor(
 ConfigurationInstantiator.newEmptyConfiguration(),
- SESSION,
+ session,
 Location.of("s3://fakeBucket/fakeObject.gz"),
 0,
 10,
@@ -132,6 +185,11 @@ private static Optional<ReaderRecordCursorWithProjections> getRecordCursor(Tuple
 }
 
 private static Properties createTestingSchema()
+ {
+ return createTestingSchema(LazySimpleSerDe.class.getName());
+ }
+
+ private static Properties createTestingSchema(String serdeClassName)
 {
 List<HiveColumnHandle> schemaColumns = getAllColumns();
 Properties schema = new Properties();
@@ -139,8 +197,7 @@ private static Properties createTestingSchema()
 String columnTypeNames = buildPropertyFromColumns(schemaColumns, column -> column.getHiveType().getTypeInfo().getTypeName());
 schema.setProperty(LIST_COLUMNS, columnNames);
 schema.setProperty(LIST_COLUMN_TYPES, columnTypeNames);
- String deserializerClassName = LazySimpleSerDe.class.getName();
- schema.setProperty(SERIALIZATION_LIB, deserializerClassName);
+ schema.setProperty(SERIALIZATION_LIB, serdeClassName);
 return schema;
 }
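
A note on the escaping rule that the new testStringEscaping cases pin down: in the SQL dialect S3 Select accepts, a literal single quote inside a string literal is written by doubling it. A minimal sketch of the rule, using a hypothetical helper name (the real logic lives in IonSqlQueryBuilder):

    // Hypothetical helper illustrating the quoting rule asserted above:
    // each embedded single quote is doubled, then the value is wrapped in quotes.
    static String toSqlStringLiteral(String value)
    {
        return "'" + value.replace("'", "''") + "'";
    }

Applied to the test input, toSqlStringLiteral("value with a ' quote") yields 'value with a '' quote', which is exactly the literal expected in both the CSV and JSON WHERE clauses.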
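The updated TestIonSqlQueryBuilder expectations also make the format-specific column addressing visible: CSV objects are referenced positionally (s._1, s._2, ...) while JSON objects are referenced by field name (s.quantity, ...), and DECIMAL predicates are no longer pushed down, which is why testDecimalColumns now expects a bare SELECT with no WHERE clause. A sketch of the addressing rule, with an illustrative method name (the actual mapping is internal to IonSqlQueryBuilder):

    // Illustrative only: CSV columns are positional and 1-based,
    // JSON columns are addressed by field name.
    static String columnReference(S3SelectDataType dataType, int ordinal, String name)
    {
        return dataType == S3SelectDataType.CSV
                ? "s._" + (ordinal + 1)
                : "s." + name;
    }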
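Finally, testDisableExperimentalFeatures documents the gating behavior of the record cursor provider: with setS3SelectExperimentalPushdownEnabled(false), a LazySimpleSerDe (CSV/TEXTFILE) object gets no S3 Select cursor while a JsonSerDe object still does. A condensed sketch of that dispatch, assuming the provider keys off the table's serde class (the method name here is illustrative, not the provider's API):

    // Illustrative gate mirroring the assertions above; the real check is
    // inside S3SelectRecordCursorProvider.createRecordCursor.
    static boolean isPushdownCandidate(String serdeClass, boolean experimentalTextfileEnabled)
    {
        if (serdeClass.equals(JsonSerDe.class.getName())) {
            return true; // JSON pushdown is not gated
        }
        if (serdeClass.equals(LazySimpleSerDe.class.getName())) {
            return experimentalTextfileEnabled; // TEXTFILE/CSV only behind the experimental flag
        }
        return false; // other serdes never qualify
    }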