diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectRecordCursorProvider.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectRecordCursorProvider.java
index 8b06009ecd93..97f290f6df0d 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectRecordCursorProvider.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectRecordCursorProvider.java
@@ -24,19 +24,29 @@ import io.trino.spi.type.TypeManager;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 
 import javax.inject.Inject;
 
 import java.io.IOException;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import static io.trino.plugin.hive.HiveErrorCode.HIVE_FILESYSTEM_ERROR;
 import static io.trino.plugin.hive.HivePageSourceProvider.projectBaseColumns;
 import static io.trino.plugin.hive.util.HiveUtil.getDeserializerClassName;
+import static java.util.Arrays.asList;
 import static java.util.Objects.requireNonNull;
 import static java.util.stream.Collectors.toUnmodifiableList;
+import static org.apache.hadoop.hive.serde.serdeConstants.COLUMN_NAME_DELIMITER;
+import static org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMNS;
+import static org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMN_TYPES;
 
 public class S3SelectRecordCursorProvider
         implements HiveRecordCursorProvider
 {
@@ -80,17 +90,20 @@ public Optional<ReaderRecordCursorWithProjections> createRecordCursor(
         // Ignore predicates on partial columns for now.
         effectivePredicate = effectivePredicate.filter((column, domain) -> column.isBaseColumn());
 
+        List<HiveColumnHandle> readerColumns = projectedReaderColumns
+                .map(readColumns -> readColumns.get().stream().map(HiveColumnHandle.class::cast).collect(toUnmodifiableList()))
+                .orElse(columns);
+        // Query is not going to filter any data, no need to use S3 Select
+        if (!hasFilters(schema, effectivePredicate, readerColumns)) {
+            return Optional.empty();
+        }
+
         String serdeName = getDeserializerClassName(schema);
         Optional<S3SelectDataType> s3SelectDataTypeOptional = S3SelectSerDeDataTypeMapper.getDataType(serdeName);
 
         if (s3SelectDataTypeOptional.isPresent()) {
             S3SelectDataType s3SelectDataType = s3SelectDataTypeOptional.get();
 
-            List<HiveColumnHandle> readerColumns = projectedReaderColumns
-                    .map(ReaderColumns::get)
-                    .map(readColumns -> readColumns.stream().map(HiveColumnHandle.class::cast).collect(toUnmodifiableList()))
-                    .orElse(columns);
-
             IonSqlQueryBuilder queryBuilder = new IonSqlQueryBuilder(typeManager, s3SelectDataType);
             String ionSqlQuery = queryBuilder.buildSql(readerColumns, effectivePredicate);
             Optional<S3SelectLineRecordReader> recordReader = S3SelectLineRecordReaderProvider.get(configuration, path, start, length, schema,
@@ -109,4 +122,64 @@ public Optional<ReaderRecordCursorWithProjections> createRecordCursor(
             return Optional.empty();
         }
     }
+
+    private boolean hasFilters(
+            Properties schema,
+            TupleDomain<HiveColumnHandle> effectivePredicate,
+            List<HiveColumnHandle> readerColumns)
+    {
+        // When there is no effective predicate and the reader columns and column types match the schema,
+        // the query reads all data out of S3, so we can use S3 GetObject instead of S3 SelectObjectContent.
+        if (effectivePredicate.isAll()) {
+            return !isEquivalentSchema(readerColumns, schema);
+        }
+        return true;
+    }
+
+    private boolean isEquivalentSchema(List<HiveColumnHandle> readerColumns, Properties schema)
+    {
+        Set<String> projectedColumnNames = getColumnProperty(readerColumns, HiveColumnHandle::getName);
+        Set<String> projectedColumnTypes = getColumnProperty(readerColumns, column -> column.getHiveType().getTypeInfo().getTypeName());
+        return isEquivalentColumns(projectedColumnNames, schema) && isEquivalentColumnTypes(projectedColumnTypes, schema);
+    }
+
+    private boolean isEquivalentColumns(Set<String> projectedColumnNames, Properties schema)
+    {
+        Set<String> columnNames;
+        String columnNameProperty = schema.getProperty(LIST_COLUMNS);
+        if (columnNameProperty.length() == 0) {
+            columnNames = new HashSet<>();
+        }
+        else {
+            final String columnNameDelimiter = (String) schema.getOrDefault(COLUMN_NAME_DELIMITER, ",");
+            columnNames = new HashSet<>(asList(columnNameProperty.split(columnNameDelimiter)));
+        }
+        return projectedColumnNames.equals(columnNames);
+    }
+
+    private boolean isEquivalentColumnTypes(Set<String> projectedColumnTypes, Properties schema)
+    {
+        String columnTypeProperty = schema.getProperty(LIST_COLUMN_TYPES);
+        Set<String> columnTypes;
+        if (columnTypeProperty.length() == 0) {
+            columnTypes = new HashSet<>();
+        }
+        else {
+            columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty)
+                    .stream()
+                    .map(TypeInfo::getTypeName)
+                    .collect(Collectors.toSet());
+        }
+        return projectedColumnTypes.equals(columnTypes);
+    }
+
+    private Set<String> getColumnProperty(List<HiveColumnHandle> readerColumns, Function<HiveColumnHandle, String> mapper)
+    {
+        if (readerColumns == null || readerColumns.isEmpty()) {
+            return new HashSet<>();
+        }
+        return readerColumns.stream()
+                .map(mapper)
+                .collect(Collectors.toSet());
+    }
 }
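Reviewer note, not part of the patch: the heart of the change is that hasFilters() only consults the schema when the predicate is TupleDomain.all(), and the comparison it then performs is set-based. Below is a minimal, self-contained sketch of that semantics. SchemaEquivalenceSketch and readsWholeSchema are hypothetical names, the hard-coded "columns" key stands in for serdeConstants.LIST_COLUMNS, and the real code additionally compares Hive type names and honors COLUMN_NAME_DELIMITER.

    import java.util.List;
    import java.util.Properties;
    import java.util.Set;

    // Hypothetical simplification of the new schema-equivalence short circuit.
    public class SchemaEquivalenceSketch
    {
        // Mirrors isEquivalentColumns(): parse the schema's column list and compare as sets.
        static boolean readsWholeSchema(List<String> projectedColumns, Properties schema)
        {
            Set<String> schemaColumns = Set.of(schema.getProperty("columns").split(","));
            return Set.copyOf(projectedColumns).equals(schemaColumns);
        }

        public static void main(String[] args)
        {
            Properties schema = new Properties();
            schema.setProperty("columns", "article,author,date_pub,quantity");

            // Whole schema projected: nothing to filter, so S3 Select is skipped
            // in favor of a plain S3 GetObject.
            System.out.println(readsWholeSchema(List.of("article", "author", "date_pub", "quantity"), schema)); // true

            // Same columns, different order: sets ignore order, so this still skips S3 Select.
            System.out.println(readsWholeSchema(List.of("quantity", "article", "author", "date_pub"), schema)); // true

            // Narrower projection: S3 Select can prune columns server-side, so it stays enabled.
            System.out.println(readsWholeSchema(List.of("quantity"), schema)); // false
        }
    }

In the patch itself, this set comparison runs only when effectivePredicate.isAll(); any non-trivial predicate makes hasFilters() return true immediately.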
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/s3select/TestS3SelectRecordCursor.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/s3select/TestS3SelectRecordCursor.java
index a2c0e5a92046..fe37df05f2bf 100644
--- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/s3select/TestS3SelectRecordCursor.java
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/s3select/TestS3SelectRecordCursor.java
@@ -45,10 +45,10 @@ public class TestS3SelectRecordCursor
 {
     private static final String LAZY_SERDE_CLASS_NAME = LazySimpleSerDe.class.getName();
 
-    private static final HiveColumnHandle ARTICLE_COLUMN = createBaseColumn("article", 1, HIVE_STRING, VARCHAR, REGULAR, Optional.empty());
-    private static final HiveColumnHandle AUTHOR_COLUMN = createBaseColumn("author", 1, HIVE_STRING, VARCHAR, REGULAR, Optional.empty());
-    private static final HiveColumnHandle DATE_ARTICLE_COLUMN = createBaseColumn("date_pub", 1, HIVE_INT, DATE, REGULAR, Optional.empty());
-    private static final HiveColumnHandle QUANTITY_COLUMN = createBaseColumn("quantity", 1, HIVE_INT, INTEGER, REGULAR, Optional.empty());
+    protected static final HiveColumnHandle ARTICLE_COLUMN = createBaseColumn("article", 1, HIVE_STRING, VARCHAR, REGULAR, Optional.empty());
+    protected static final HiveColumnHandle AUTHOR_COLUMN = createBaseColumn("author", 1, HIVE_STRING, VARCHAR, REGULAR, Optional.empty());
+    protected static final HiveColumnHandle DATE_ARTICLE_COLUMN = createBaseColumn("date_pub", 1, HIVE_INT, DATE, REGULAR, Optional.empty());
+    protected static final HiveColumnHandle QUANTITY_COLUMN = createBaseColumn("quantity", 1, HIVE_INT, INTEGER, REGULAR, Optional.empty());
     private static final HiveColumnHandle[] DEFAULT_TEST_COLUMNS = {ARTICLE_COLUMN, AUTHOR_COLUMN, DATE_ARTICLE_COLUMN, QUANTITY_COLUMN};
 
     @Test
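Reviewer note: the four column-handle constants above widen from private to protected so that the new TestS3SelectRecordCursorProvider, added below in the same package, can reuse them through static imports.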
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/s3select/TestS3SelectRecordCursorProvider.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/s3select/TestS3SelectRecordCursorProvider.java
new file mode 100644
index 000000000000..410072921a9d
--- /dev/null
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/s3select/TestS3SelectRecordCursorProvider.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.hive.s3select;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import io.trino.hadoop.ConfigurationInstantiator;
+import io.trino.plugin.hive.HdfsEnvironment;
+import io.trino.plugin.hive.HiveColumnHandle;
+import io.trino.plugin.hive.HiveConfig;
+import io.trino.plugin.hive.HiveRecordCursorProvider;
+import io.trino.plugin.hive.TestBackgroundHiveSplitLoader.TestingHdfsEnvironment;
+import io.trino.spi.predicate.Domain;
+import io.trino.spi.predicate.Range;
+import io.trino.spi.predicate.SortedRangeSet;
+import io.trino.spi.predicate.TupleDomain;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static io.trino.plugin.hive.HiveTestUtils.SESSION;
+import static io.trino.plugin.hive.s3select.TestS3SelectRecordCursor.ARTICLE_COLUMN;
+import static io.trino.plugin.hive.s3select.TestS3SelectRecordCursor.AUTHOR_COLUMN;
+import static io.trino.plugin.hive.s3select.TestS3SelectRecordCursor.DATE_ARTICLE_COLUMN;
+import static io.trino.plugin.hive.s3select.TestS3SelectRecordCursor.QUANTITY_COLUMN;
+import static io.trino.spi.predicate.TupleDomain.withColumnDomains;
+import static io.trino.spi.type.BigintType.BIGINT;
+import static io.trino.type.InternalTypeManager.TESTING_TYPE_MANAGER;
+import static org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMNS;
+import static org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMN_TYPES;
+import static org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_LIB;
+import static org.testng.Assert.assertTrue;
+
+public class TestS3SelectRecordCursorProvider
+{
+    private static final Configuration CONFIGURATION = ConfigurationInstantiator.newEmptyConfiguration();
+    private static final Path PATH = new Path("s3://fakeBucket/fakeObject.gz");
+    private static final long START = 0;
+    private static final long LENGTH = 10;
+    private static final long FILESIZE = 10;
+    private static final HdfsEnvironment HDFS_ENVIRONMENT = new TestingHdfsEnvironment(new ArrayList<>());
+    private static final S3SelectRecordCursorProvider S3_SELECT_RECORD_CURSOR_PROVIDER = new S3SelectRecordCursorProvider(HDFS_ENVIRONMENT, new TrinoS3ClientFactory(new HiveConfig()));
+    private static boolean s3SelectPushdownEnabled = true;
+    private static final List<HiveColumnHandle> SCHEMA_COLUMNS = ImmutableList.of(ARTICLE_COLUMN, AUTHOR_COLUMN, DATE_ARTICLE_COLUMN, QUANTITY_COLUMN);
+    private static final Properties SCHEMA = createTestingSchema();
+
+    @Test
+    public void shouldReturnSelectRecordCursor()
+    {
+        List<HiveColumnHandle> columnHandleList = new ArrayList<>();
+        s3SelectPushdownEnabled = true;
+        TupleDomain<HiveColumnHandle> effectivePredicate = TupleDomain.all();
+        Optional<HiveRecordCursorProvider.ReaderRecordCursorWithProjections> recordCursor =
+                S3_SELECT_RECORD_CURSOR_PROVIDER.createRecordCursor(
+                        CONFIGURATION, SESSION, PATH, START, LENGTH, FILESIZE, SCHEMA, columnHandleList, effectivePredicate, TESTING_TYPE_MANAGER, s3SelectPushdownEnabled);
+        assertTrue(recordCursor.isPresent());
+    }
+
+    @Test
+    public void shouldReturnSelectRecordCursorWhenEffectivePredicateExists()
+    {
+        s3SelectPushdownEnabled = true;
+        TupleDomain<HiveColumnHandle> effectivePredicate = withColumnDomains(ImmutableMap.of(QUANTITY_COLUMN,
+                Domain.create(SortedRangeSet.copyOf(BIGINT, ImmutableList.of(Range.equal(BIGINT, 3L))), false)));
+        Optional<HiveRecordCursorProvider.ReaderRecordCursorWithProjections> recordCursor =
+                S3_SELECT_RECORD_CURSOR_PROVIDER.createRecordCursor(
+                        CONFIGURATION, SESSION, PATH, START, LENGTH, FILESIZE, SCHEMA, SCHEMA_COLUMNS, effectivePredicate, TESTING_TYPE_MANAGER, s3SelectPushdownEnabled);
+        assertTrue(recordCursor.isPresent());
+    }
+
+    @Test
+    public void shouldReturnSelectRecordCursorWhenProjectionExists()
+    {
+        s3SelectPushdownEnabled = true;
+        TupleDomain<HiveColumnHandle> effectivePredicate = TupleDomain.all();
+        final List<HiveColumnHandle> readerColumns = ImmutableList.of(QUANTITY_COLUMN, AUTHOR_COLUMN, ARTICLE_COLUMN);
+        Optional<HiveRecordCursorProvider.ReaderRecordCursorWithProjections> recordCursor =
+                S3_SELECT_RECORD_CURSOR_PROVIDER.createRecordCursor(
+                        CONFIGURATION, SESSION, PATH, START, LENGTH, FILESIZE, SCHEMA, readerColumns, effectivePredicate, TESTING_TYPE_MANAGER, s3SelectPushdownEnabled);
+        assertTrue(recordCursor.isPresent());
+    }
+
+    @Test
+    public void shouldNotReturnSelectRecordCursorWhenPushdownIsDisabled()
+    {
+        s3SelectPushdownEnabled = false;
+        List<HiveColumnHandle> columnHandleList = new ArrayList<>();
+        TupleDomain<HiveColumnHandle> effectivePredicate = TupleDomain.all();
+        Optional<HiveRecordCursorProvider.ReaderRecordCursorWithProjections> recordCursor =
+                S3_SELECT_RECORD_CURSOR_PROVIDER.createRecordCursor(
+                        CONFIGURATION, SESSION, PATH, START, LENGTH, FILESIZE, SCHEMA, columnHandleList, effectivePredicate, TESTING_TYPE_MANAGER, s3SelectPushdownEnabled);
+        assertTrue(recordCursor.isEmpty());
+    }
+
+    @Test
+    public void shouldNotReturnSelectRecordCursorWhenQueryIsNotFiltering()
+    {
+        s3SelectPushdownEnabled = true;
+        TupleDomain<HiveColumnHandle> effectivePredicate = TupleDomain.all();
+        Optional<HiveRecordCursorProvider.ReaderRecordCursorWithProjections> recordCursor =
+                S3_SELECT_RECORD_CURSOR_PROVIDER.createRecordCursor(
+                        CONFIGURATION, SESSION, PATH, START, LENGTH, FILESIZE, SCHEMA, SCHEMA_COLUMNS, effectivePredicate, TESTING_TYPE_MANAGER, s3SelectPushdownEnabled);
+        assertTrue(recordCursor.isEmpty());
+    }
+
+    @Test
+    public void shouldNotReturnSelectRecordCursorWhenProjectionOrderIsDifferent()
+    {
+        s3SelectPushdownEnabled = true;
+        TupleDomain<HiveColumnHandle> effectivePredicate = TupleDomain.all();
+        final List<HiveColumnHandle> readerColumns = ImmutableList.of(DATE_ARTICLE_COLUMN, QUANTITY_COLUMN, ARTICLE_COLUMN, AUTHOR_COLUMN);
+        Optional<HiveRecordCursorProvider.ReaderRecordCursorWithProjections> recordCursor =
+                S3_SELECT_RECORD_CURSOR_PROVIDER.createRecordCursor(
+                        CONFIGURATION, SESSION, PATH, START, LENGTH, FILESIZE, SCHEMA, readerColumns, effectivePredicate, TESTING_TYPE_MANAGER, s3SelectPushdownEnabled);
+        assertTrue(recordCursor.isEmpty());
+    }
+
+    private static Properties createTestingSchema()
+    {
+        Properties schema = new Properties();
+        String columnNames = buildPropertyFromColumns(SCHEMA_COLUMNS, HiveColumnHandle::getName);
+        String columnTypeNames = buildPropertyFromColumns(SCHEMA_COLUMNS, column -> column.getHiveType().getTypeInfo().getTypeName());
+        schema.setProperty(LIST_COLUMNS, columnNames);
+        schema.setProperty(LIST_COLUMN_TYPES, columnTypeNames);
+        String deserializerClassName = LazySimpleSerDe.class.getName();
+        schema.setProperty(SERIALIZATION_LIB, deserializerClassName);
+        return schema;
+    }
+
+    private static String buildPropertyFromColumns(List<HiveColumnHandle> columns, Function<HiveColumnHandle, String> mapper)
+    {
+        if (columns == null || columns.isEmpty()) {
+            return "";
+        }
+        return columns.stream()
+                .map(mapper)
+                .collect(Collectors.joining(","));
+    }
+}
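Reviewer note, not part of the patch: read together, the tests pin down which query shapes still use S3 Select. As a hedged summary, assuming a hypothetical CSV-backed Hive table articles(article, author, date_pub, quantity) on S3 that mirrors the fixtures above:

    -- predicate present: S3 Select pushdown
    SELECT * FROM articles WHERE quantity = 3;
    -- narrower projection, no predicate: S3 Select pushdown
    SELECT quantity, author, article FROM articles;
    -- whole schema, no predicate: plain S3 GetObject
    SELECT * FROM articles;
    -- whole schema merely reordered, no predicate: plain S3 GetObject
    SELECT date_pub, quantity, article, author FROM articles;

And as shouldNotReturnSelectRecordCursorWhenPushdownIsDisabled shows, with s3SelectPushdownEnabled set to false the provider returns Optional.empty() regardless of query shape.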