Skip to content

Commit

Permalink
Merge pull request trinodb#3 from fuatbasik/disable-select-all-squash
Browse files Browse the repository at this point in the history
Disable S3 Select pushdown when the query does not filter data
  • Loading branch information
fuatbasik authored Aug 2, 2022
2 parents 933f698 + 22c8222 commit 204d898
Show file tree
Hide file tree
Showing 3 changed files with 240 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,29 @@
import io.trino.spi.type.TypeManager;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;

import javax.inject.Inject;

import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

import static io.trino.plugin.hive.HiveErrorCode.HIVE_FILESYSTEM_ERROR;
import static io.trino.plugin.hive.HivePageSourceProvider.projectBaseColumns;
import static io.trino.plugin.hive.util.HiveUtil.getDeserializerClassName;
import static java.util.Arrays.asList;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toUnmodifiableList;
import static org.apache.hadoop.hive.serde.serdeConstants.COLUMN_NAME_DELIMITER;
import static org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMNS;
import static org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMN_TYPES;

public class S3SelectRecordCursorProvider
implements HiveRecordCursorProvider
Expand Down Expand Up @@ -80,17 +90,20 @@ public Optional<ReaderRecordCursorWithProjections> createRecordCursor(
// Ignore predicates on partial columns for now.
effectivePredicate = effectivePredicate.filter((column, domain) -> column.isBaseColumn());

List<HiveColumnHandle> readerColumns = projectedReaderColumns
.map(readColumns -> readColumns.get().stream().map(HiveColumnHandle.class::cast).collect(toUnmodifiableList()))
.orElse(columns);
// Query is not going to filter any data, no need to use S3 Select
if (!hasFilters(schema, effectivePredicate, readerColumns)) {
return Optional.empty();
}

String serdeName = getDeserializerClassName(schema);
Optional<S3SelectDataType> s3SelectDataTypeOptional = S3SelectSerDeDataTypeMapper.getDataType(serdeName);

if (s3SelectDataTypeOptional.isPresent()) {
S3SelectDataType s3SelectDataType = s3SelectDataTypeOptional.get();

List<HiveColumnHandle> readerColumns = projectedReaderColumns
.map(ReaderColumns::get)
.map(readColumns -> readColumns.stream().map(HiveColumnHandle.class::cast).collect(toUnmodifiableList()))
.orElse(columns);

IonSqlQueryBuilder queryBuilder = new IonSqlQueryBuilder(typeManager, s3SelectDataType);
String ionSqlQuery = queryBuilder.buildSql(readerColumns, effectivePredicate);
Optional<S3SelectLineRecordReader> recordReader = S3SelectLineRecordReaderProvider.get(configuration, path, start, length, schema,
Expand All @@ -109,4 +122,64 @@ public Optional<ReaderRecordCursorWithProjections> createRecordCursor(
return Optional.empty();
}
}

/**
 * Decides whether the query restricts the data read from the object in any way.
 *
 * <p>When the predicate is unconstrained and the reader columns and their types are
 * the same as those declared in the table schema, the whole object would be returned,
 * so S3 GetObject can be used instead of S3 SelectObjectContent.
 *
 * @param schema table schema properties holding the declared column names and types
 * @param effectivePredicate predicate that would be pushed down to S3 Select
 * @param readerColumns columns the reader is asked to produce
 * @return {@code true} if a predicate is present or the projection differs from the schema
 */
private boolean hasFilters(
        Properties schema,
        TupleDomain<HiveColumnHandle> effectivePredicate,
        List<HiveColumnHandle> readerColumns)
{
    // Any constraint on the predicate counts as filtering.
    if (!effectivePredicate.isAll()) {
        return true;
    }
    // No predicate: only a projection that differs from the schema narrows the result.
    boolean projectsWholeSchema = isEquivalentSchema(readerColumns, schema);
    return !projectsWholeSchema;
}

/**
 * Checks whether the reader columns match the table schema, comparing the set of
 * column names and the set of column type names independently.
 *
 * @param readerColumns columns the reader is asked to produce
 * @param schema table schema properties holding the declared column names and types
 * @return {@code true} if both the name set and the type-name set equal the schema's
 */
private boolean isEquivalentSchema(List<HiveColumnHandle> readerColumns, Properties schema)
{
    Set<String> names = getColumnProperty(readerColumns, HiveColumnHandle::getName);
    Set<String> typeNames = getColumnProperty(readerColumns, column -> column.getHiveType().getTypeInfo().getTypeName());
    if (!isEquivalentColumns(names, schema)) {
        return false;
    }
    return isEquivalentColumnTypes(typeNames, schema);
}

/**
 * Compares the projected column names with the column names declared in the schema.
 *
 * <p>The schema's column list is read from {@code LIST_COLUMNS} and split on the
 * delimiter stored under {@code COLUMN_NAME_DELIMITER} (a comma when absent).
 * A missing or empty column list is treated as a schema with no columns.
 *
 * @param projectedColumnNames names of the columns the reader is asked to produce
 * @param schema table schema properties
 * @return {@code true} if both name sets contain exactly the same elements
 */
private boolean isEquivalentColumns(Set<String> projectedColumnNames, Properties schema)
{
    // getProperty with a default never returns null, so a schema without a column list
    // no longer fails with a NullPointerException.
    String columnNameProperty = schema.getProperty(LIST_COLUMNS, "");
    if (columnNameProperty.isEmpty()) {
        return projectedColumnNames.isEmpty();
    }

    // getProperty (unlike getOrDefault plus a cast) also consults the Properties defaults
    // and cannot throw ClassCastException for a non-String value.
    String columnNameDelimiter = schema.getProperty(COLUMN_NAME_DELIMITER, ",");
    // NOTE(review): String.split interprets the delimiter as a regular expression;
    // confirm that delimiters containing regex metacharacters are not expected here.
    Set<String> columnNames = new HashSet<>(asList(columnNameProperty.split(columnNameDelimiter)));
    return projectedColumnNames.equals(columnNames);
}

/**
 * Compares the projected column type names with the type names declared in the schema.
 *
 * <p>The schema's type string is read from {@code LIST_COLUMN_TYPES} and parsed with
 * {@link TypeInfoUtils#getTypeInfosFromTypeString(String)}. A missing or empty type
 * string is treated as a schema with no column types.
 *
 * @param projectedColumnTypes type names of the columns the reader is asked to produce
 * @param schema table schema properties
 * @return {@code true} if both type-name sets contain exactly the same elements
 */
private boolean isEquivalentColumnTypes(Set<String> projectedColumnTypes, Properties schema)
{
    // getProperty with a default never returns null, so a schema without a type list
    // no longer fails with a NullPointerException.
    String columnTypeProperty = schema.getProperty(LIST_COLUMN_TYPES, "");
    if (columnTypeProperty.isEmpty()) {
        return projectedColumnTypes.isEmpty();
    }

    Set<String> columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty)
            .stream()
            .map(TypeInfo::getTypeName)
            .collect(Collectors.toSet());
    return projectedColumnTypes.equals(columnTypes);
}

/**
 * Collects one string property of every reader column into a set.
 *
 * @param readerColumns columns to read the property from; {@code null} or empty yields an empty set
 * @param mapper extracts the property value from a column
 * @return the distinct property values
 */
private Set<String> getColumnProperty(List<HiveColumnHandle> readerColumns, Function<HiveColumnHandle, String> mapper)
{
    Set<String> values = new HashSet<>();
    if (readerColumns != null) {
        for (HiveColumnHandle column : readerColumns) {
            values.add(mapper.apply(column));
        }
    }
    return values;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ public class TestS3SelectRecordCursor
{
private static final String LAZY_SERDE_CLASS_NAME = LazySimpleSerDe.class.getName();

private static final HiveColumnHandle ARTICLE_COLUMN = createBaseColumn("article", 1, HIVE_STRING, VARCHAR, REGULAR, Optional.empty());
private static final HiveColumnHandle AUTHOR_COLUMN = createBaseColumn("author", 1, HIVE_STRING, VARCHAR, REGULAR, Optional.empty());
private static final HiveColumnHandle DATE_ARTICLE_COLUMN = createBaseColumn("date_pub", 1, HIVE_INT, DATE, REGULAR, Optional.empty());
private static final HiveColumnHandle QUANTITY_COLUMN = createBaseColumn("quantity", 1, HIVE_INT, INTEGER, REGULAR, Optional.empty());
protected static final HiveColumnHandle ARTICLE_COLUMN = createBaseColumn("article", 1, HIVE_STRING, VARCHAR, REGULAR, Optional.empty());
protected static final HiveColumnHandle AUTHOR_COLUMN = createBaseColumn("author", 1, HIVE_STRING, VARCHAR, REGULAR, Optional.empty());
protected static final HiveColumnHandle DATE_ARTICLE_COLUMN = createBaseColumn("date_pub", 1, HIVE_INT, DATE, REGULAR, Optional.empty());
protected static final HiveColumnHandle QUANTITY_COLUMN = createBaseColumn("quantity", 1, HIVE_INT, INTEGER, REGULAR, Optional.empty());
private static final HiveColumnHandle[] DEFAULT_TEST_COLUMNS = {ARTICLE_COLUMN, AUTHOR_COLUMN, DATE_ARTICLE_COLUMN, QUANTITY_COLUMN};

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.hive.s3select;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.trino.hadoop.ConfigurationInstantiator;
import io.trino.plugin.hive.HdfsEnvironment;
import io.trino.plugin.hive.HiveColumnHandle;
import io.trino.plugin.hive.HiveConfig;
import io.trino.plugin.hive.HiveRecordCursorProvider;
import io.trino.plugin.hive.TestBackgroundHiveSplitLoader.TestingHdfsEnvironment;
import io.trino.spi.predicate.Domain;
import io.trino.spi.predicate.Range;
import io.trino.spi.predicate.SortedRangeSet;
import io.trino.spi.predicate.TupleDomain;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
import org.testng.annotations.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.function.Function;
import java.util.stream.Collectors;

import static io.trino.plugin.hive.HiveTestUtils.SESSION;
import static io.trino.plugin.hive.s3select.TestS3SelectRecordCursor.ARTICLE_COLUMN;
import static io.trino.plugin.hive.s3select.TestS3SelectRecordCursor.AUTHOR_COLUMN;
import static io.trino.plugin.hive.s3select.TestS3SelectRecordCursor.DATE_ARTICLE_COLUMN;
import static io.trino.plugin.hive.s3select.TestS3SelectRecordCursor.QUANTITY_COLUMN;
import static io.trino.spi.predicate.TupleDomain.withColumnDomains;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.type.InternalTypeManager.TESTING_TYPE_MANAGER;
import static org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMNS;
import static org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMN_TYPES;
import static org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_LIB;
import static org.testng.Assert.assertTrue;

public class TestS3SelectRecordCursorProvider
{
private static final Configuration CONFIGURATION = ConfigurationInstantiator.newEmptyConfiguration();
private static final Path PATH = new Path("s3://fakeBucket/fakeObject.gz");
private static final long START = 0;
private static final long LENGTH = 10;
private static final long FILESIZE = 10;
private static final HdfsEnvironment HDFS_ENVIRONMENT = new TestingHdfsEnvironment(new ArrayList<>());
private static final S3SelectRecordCursorProvider S3_SELECT_RECORD_CURSOR_PROVIDER = new S3SelectRecordCursorProvider(HDFS_ENVIRONMENT, new TrinoS3ClientFactory(new HiveConfig()));
private static boolean s3SelectPushdownEnabled = true;
private static final List<HiveColumnHandle> SCHEMA_COLUMNS = ImmutableList.of(ARTICLE_COLUMN, AUTHOR_COLUMN, DATE_ARTICLE_COLUMN, QUANTITY_COLUMN);
private static final Properties SCHEMA = createTestingSchema();

@Test
public void shouldReturnSelectRecordCursor()
{
List<HiveColumnHandle> columnHandleList = new ArrayList<>();
s3SelectPushdownEnabled = true;
TupleDomain<HiveColumnHandle> effectivePredicate = TupleDomain.all();
Optional<HiveRecordCursorProvider.ReaderRecordCursorWithProjections> recordCursor =
S3_SELECT_RECORD_CURSOR_PROVIDER.createRecordCursor(
CONFIGURATION, SESSION, PATH, START, LENGTH, FILESIZE, SCHEMA, columnHandleList, effectivePredicate, TESTING_TYPE_MANAGER, s3SelectPushdownEnabled);
assertTrue(recordCursor.isPresent());
}

@Test
public void shouldReturnSelectRecordCursorWhenEffectivePredicateExists()
{
s3SelectPushdownEnabled = true;
TupleDomain<HiveColumnHandle> effectivePredicate = withColumnDomains(ImmutableMap.of(QUANTITY_COLUMN,
Domain.create(SortedRangeSet.copyOf(BIGINT, ImmutableList.of(Range.equal(BIGINT, 3L))), false)));
Optional<HiveRecordCursorProvider.ReaderRecordCursorWithProjections> recordCursor =
S3_SELECT_RECORD_CURSOR_PROVIDER.createRecordCursor(
CONFIGURATION, SESSION, PATH, START, LENGTH, FILESIZE, SCHEMA, SCHEMA_COLUMNS, effectivePredicate, TESTING_TYPE_MANAGER, s3SelectPushdownEnabled);
assertTrue(recordCursor.isPresent());
}

@Test
public void shouldReturnSelectRecordCursorWhenProjectionExists()
{
s3SelectPushdownEnabled = true;
TupleDomain<HiveColumnHandle> effectivePredicate = TupleDomain.all();
final List<HiveColumnHandle> readerColumns = ImmutableList.of(QUANTITY_COLUMN, AUTHOR_COLUMN, ARTICLE_COLUMN);
Optional<HiveRecordCursorProvider.ReaderRecordCursorWithProjections> recordCursor =
S3_SELECT_RECORD_CURSOR_PROVIDER.createRecordCursor(
CONFIGURATION, SESSION, PATH, START, LENGTH, FILESIZE, SCHEMA, readerColumns, effectivePredicate, TESTING_TYPE_MANAGER, s3SelectPushdownEnabled);
assertTrue(recordCursor.isPresent());
}

@Test
public void shouldNotReturnSelectRecordCursorWhenPushdownIsDisabled()
{
s3SelectPushdownEnabled = false;
List<HiveColumnHandle> columnHandleList = new ArrayList<>();
TupleDomain<HiveColumnHandle> effectivePredicate = TupleDomain.all();
Optional<HiveRecordCursorProvider.ReaderRecordCursorWithProjections> recordCursor =
S3_SELECT_RECORD_CURSOR_PROVIDER.createRecordCursor(
CONFIGURATION, SESSION, PATH, START, LENGTH, FILESIZE, SCHEMA, columnHandleList, effectivePredicate, TESTING_TYPE_MANAGER, s3SelectPushdownEnabled);
assertTrue(recordCursor.isEmpty());
}

@Test
public void shouldNotReturnSelectRecordCursorWhenQueryIsNotFiltering()
{
s3SelectPushdownEnabled = true;
TupleDomain<HiveColumnHandle> effectivePredicate = TupleDomain.all();
Optional<HiveRecordCursorProvider.ReaderRecordCursorWithProjections> recordCursor =
S3_SELECT_RECORD_CURSOR_PROVIDER.createRecordCursor(
CONFIGURATION, SESSION, PATH, START, LENGTH, FILESIZE, SCHEMA, SCHEMA_COLUMNS, effectivePredicate, TESTING_TYPE_MANAGER, s3SelectPushdownEnabled);
assertTrue(recordCursor.isEmpty());
}

@Test
public void shouldNotReturnSelectRecordCursorWhenProjectionOrderIsDifferent()
{
s3SelectPushdownEnabled = true;
TupleDomain<HiveColumnHandle> effectivePredicate = TupleDomain.all();
final List<HiveColumnHandle> readerColumns = ImmutableList.of(DATE_ARTICLE_COLUMN, QUANTITY_COLUMN, ARTICLE_COLUMN, AUTHOR_COLUMN);
Optional<HiveRecordCursorProvider.ReaderRecordCursorWithProjections> recordCursor =
S3_SELECT_RECORD_CURSOR_PROVIDER.createRecordCursor(
CONFIGURATION, SESSION, PATH, START, LENGTH, FILESIZE, SCHEMA, readerColumns, effectivePredicate, TESTING_TYPE_MANAGER, s3SelectPushdownEnabled);
assertTrue(recordCursor.isEmpty());
}

private static Properties createTestingSchema()
{
Properties schema = new Properties();
String columnNames = buildPropertyFromColumns(SCHEMA_COLUMNS, HiveColumnHandle::getName);
String columnTypeNames = buildPropertyFromColumns(SCHEMA_COLUMNS, column -> column.getHiveType().getTypeInfo().getTypeName());
schema.setProperty(LIST_COLUMNS, columnNames);
schema.setProperty(LIST_COLUMN_TYPES, columnTypeNames);
String deserializerClassName = LazySimpleSerDe.class.getName();
schema.setProperty(SERIALIZATION_LIB, deserializerClassName);
return schema;
}

private static String buildPropertyFromColumns(List<HiveColumnHandle> columns, Function<HiveColumnHandle, String> mapper)
{
if (columns == null || columns.isEmpty()) {
return "";
}
return columns.stream()
.map(mapper)
.collect(Collectors.joining(","));
}
}

0 comments on commit 204d898

Please sign in to comment.