Add test for connector pushdown rules in Iceberg #11695

Merged 1 commit on Apr 4, 2022.

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.trino.plugin.iceberg.optimizer;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.trino.Session;
import io.trino.connector.CatalogName;
import io.trino.cost.ScalarStatsCalculator;
import io.trino.metadata.TableHandle;
import io.trino.plugin.hive.HdfsConfig;
import io.trino.plugin.hive.HdfsConfiguration;
import io.trino.plugin.hive.HdfsConfigurationInitializer;
import io.trino.plugin.hive.HdfsEnvironment;
import io.trino.plugin.hive.HiveHdfsConfiguration;
import io.trino.plugin.hive.HiveTransactionHandle;
import io.trino.plugin.hive.NodeVersion;
import io.trino.plugin.hive.authentication.NoHdfsAuthentication;
import io.trino.plugin.hive.metastore.Database;
import io.trino.plugin.hive.metastore.HiveMetastore;
import io.trino.plugin.hive.metastore.MetastoreConfig;
import io.trino.plugin.hive.metastore.file.FileHiveMetastore;
import io.trino.plugin.hive.metastore.file.FileHiveMetastoreConfig;
import io.trino.plugin.iceberg.ColumnIdentity;
import io.trino.plugin.iceberg.IcebergColumnHandle;
import io.trino.plugin.iceberg.IcebergTableHandle;
import io.trino.plugin.iceberg.TestingIcebergConnectorFactory;
import io.trino.spi.predicate.Domain;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.security.PrincipalType;
import io.trino.spi.type.RowType;
import io.trino.spi.type.Type;
import io.trino.sql.planner.Symbol;
import io.trino.sql.planner.iterative.rule.PruneTableScanColumns;
import io.trino.sql.planner.iterative.rule.PushPredicateIntoTableScan;
import io.trino.sql.planner.iterative.rule.PushProjectionIntoTableScan;
import io.trino.sql.planner.iterative.rule.test.BaseRuleTest;
import io.trino.sql.planner.iterative.rule.test.PlanBuilder;
import io.trino.sql.planner.plan.Assignments;
import io.trino.sql.tree.ArithmeticBinaryExpression;
import io.trino.sql.tree.ArithmeticUnaryExpression;
import io.trino.sql.tree.Expression;
import io.trino.sql.tree.LongLiteral;
import io.trino.sql.tree.SubscriptExpression;
import io.trino.sql.tree.SymbolReference;
import io.trino.testing.LocalQueryRunner;
import org.testng.annotations.AfterClass;
import org.testng.annotations.Test;

import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.util.Optional;

import static com.google.common.io.MoreFiles.deleteRecursively;
import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE;
import static com.google.inject.util.Modules.EMPTY_MODULE;
import static io.trino.plugin.iceberg.ColumnIdentity.TypeCategory.STRUCT;
import static io.trino.plugin.iceberg.ColumnIdentity.primitiveColumnIdentity;
import static io.trino.plugin.iceberg.TableType.DATA;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.IntegerType.INTEGER;
import static io.trino.spi.type.RowType.field;
import static io.trino.sql.planner.assertions.PlanMatchPattern.expression;
import static io.trino.sql.planner.assertions.PlanMatchPattern.filter;
import static io.trino.sql.planner.assertions.PlanMatchPattern.project;
import static io.trino.sql.planner.assertions.PlanMatchPattern.strictProject;
import static io.trino.sql.planner.assertions.PlanMatchPattern.tableScan;
import static io.trino.sql.tree.ArithmeticBinaryExpression.Operator.ADD;
import static io.trino.sql.tree.ArithmeticUnaryExpression.Sign.MINUS;
import static io.trino.testing.TestingSession.testSessionBuilder;
import static java.lang.String.format;
import static java.util.Arrays.asList;

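/**
 * Tests that the connector pushdown rules (projection pushdown, dereference pushdown,
 * predicate pushdown and column pruning) rewrite table scans over Iceberg tables by
 * folding the pushed-down information into the {@link IcebergTableHandle}.
 */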
public class TestConnectorPushdownRulesWithIceberg
        extends BaseRuleTest
{
    private static final String ICEBERG_CATALOG_NAME = "iceberg";
    private static final String SCHEMA_NAME = "test_schema";

    private static final Type ROW_TYPE = RowType.from(asList(field("a", BIGINT), field("b", BIGINT)));

    private File baseDir;
    private HiveMetastore metastore;

    private static final Session ICEBERG_SESSION = testSessionBuilder()
            .setCatalog(ICEBERG_CATALOG_NAME)
            .setSchema(SCHEMA_NAME)
            .build();

    @Override
    protected Optional<LocalQueryRunner> createLocalQueryRunner()
    {
        try {
            baseDir = Files.createTempDirectory("metastore").toFile();
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        HdfsConfig config = new HdfsConfig();
        HdfsConfiguration configuration = new HiveHdfsConfiguration(new HdfsConfigurationInitializer(config), ImmutableSet.of());
        HdfsEnvironment environment = new HdfsEnvironment(configuration, config, new NoHdfsAuthentication());

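        // Back the Iceberg catalog with a file-based Hive metastore rooted in the temporary directory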
        metastore = new FileHiveMetastore(
                new NodeVersion("testversion"),
                environment,
                new MetastoreConfig(),
                new FileHiveMetastoreConfig()
                        .setCatalogDirectory(baseDir.toURI().toString())
                        .setMetastoreUser("test"));
        Database database = Database.builder()
                .setDatabaseName(SCHEMA_NAME)
                .setOwnerName(Optional.of("public"))
                .setOwnerType(Optional.of(PrincipalType.ROLE))
                .build();

        metastore.createDatabase(database);

        LocalQueryRunner queryRunner = LocalQueryRunner.create(ICEBERG_SESSION);

        queryRunner.createCatalog(
                ICEBERG_CATALOG_NAME,
                new TestingIcebergConnectorFactory(Optional.of(metastore), Optional.empty(), EMPTY_MODULE),
                ImmutableMap.of());

        return Optional.of(queryRunner);
    }

    @Test
    public void testProjectionPushdown()
    {
        String tableName = "projection_test";
        PushProjectionIntoTableScan pushProjectionIntoTableScan = new PushProjectionIntoTableScan(
                tester().getPlannerContext(),
                tester().getTypeAnalyzer(),
                new ScalarStatsCalculator(tester().getPlannerContext(), tester().getTypeAnalyzer()));

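        // Create a table with a single row-typed column; the WHERE false clause keeps it empty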
        tester().getQueryRunner().execute(format(
                "CREATE TABLE %s (struct_of_int) AS " +
                        "SELECT cast(row(5, 6) as row(a bigint, b bigint)) as struct_of_int where false",
                tableName));

        Type baseType = ROW_TYPE;

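        // Handle projecting only the nested field "a" of the struct column (dereference path [1], producing a BIGINT)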
        IcebergColumnHandle partialColumn = new IcebergColumnHandle(
                new ColumnIdentity(3, "struct_of_int", STRUCT, ImmutableList.of(primitiveColumnIdentity(1, "a"), primitiveColumnIdentity(2, "b"))),
                baseType,
                ImmutableList.of(1),
                BIGINT,
                Optional.empty());

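        // Base table handle pinned at snapshot id 1, with no predicates and no projected columns yet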
        IcebergTableHandle icebergTable = new IcebergTableHandle(SCHEMA_NAME, tableName, DATA, Optional.of(1L), TupleDomain.all(), TupleDomain.all(), ImmutableSet.of(), Optional.empty());
        TableHandle table = new TableHandle(new CatalogName(ICEBERG_CATALOG_NAME), icebergTable, new HiveTransactionHandle(false));

        IcebergColumnHandle fullColumn = partialColumn.getBaseColumn();

        // Test projected columns pushdown to IcebergTableHandle in case of full column references
        tester().assertThat(pushProjectionIntoTableScan)
                .on(p ->
                        p.project(
                                Assignments.of(p.symbol("struct_of_int", baseType), p.symbol("struct_of_int", baseType).toSymbolReference()),
                                p.tableScan(
                                        table,
                                        ImmutableList.of(p.symbol("struct_of_int", baseType)),
                                        ImmutableMap.of(p.symbol("struct_of_int", baseType), fullColumn))))
                .matches(
                        project(
                                ImmutableMap.of("expr", expression("col")),
                                tableScan(
                                        icebergTable.withProjectedColumns(ImmutableSet.of(fullColumn))::equals,
                                        TupleDomain.all(),
                                        ImmutableMap.of("col", fullColumn::equals))));

        // Rule should return Optional.empty after projected ColumnHandles have been added to IcebergTableHandle
        tester().assertThat(pushProjectionIntoTableScan)
                .on(p ->
                        p.project(
                                Assignments.of(p.symbol("struct_of_int", baseType), p.symbol("struct_of_int", baseType).toSymbolReference()),
                                p.tableScan(
                                        new TableHandle(
                                                new CatalogName(ICEBERG_CATALOG_NAME),
                                                icebergTable.withProjectedColumns(ImmutableSet.of(fullColumn)),
                                                new HiveTransactionHandle(false)),
                                        ImmutableList.of(p.symbol("struct_of_int", baseType)),
                                        ImmutableMap.of(p.symbol("struct_of_int", baseType), fullColumn))))
                .doesNotFire();

        // Test dereference pushdown
        tester().assertThat(pushProjectionIntoTableScan)
                .on(p ->
                        p.project(
                                Assignments.of(
                                        p.symbol("expr_deref", BIGINT), new SubscriptExpression(p.symbol("struct_of_int", baseType).toSymbolReference(), new LongLiteral("1"))),
                                p.tableScan(
                                        table,
                                        ImmutableList.of(p.symbol("struct_of_int", baseType)),
                                        ImmutableMap.of(p.symbol("struct_of_int", baseType), fullColumn))))
                .matches(project(
                        ImmutableMap.of("expr_deref", expression(new SymbolReference("struct_of_int#a"))),
                        tableScan(
                                icebergTable.withProjectedColumns(ImmutableSet.of(partialColumn))::equals,
                                TupleDomain.all(),
                                ImmutableMap.of("struct_of_int#a", partialColumn::equals))));

        metastore.dropTable(SCHEMA_NAME, tableName, true);
    }

    @Test
    public void testPredicatePushdown()
    {
        String tableName = "predicate_test";
        tester().getQueryRunner().execute(format("CREATE TABLE %s (a, b) AS SELECT 5, 6", tableName));

        PushPredicateIntoTableScan pushPredicateIntoTableScan = new PushPredicateIntoTableScan(tester().getPlannerContext(), tester().getTypeAnalyzer());

        IcebergTableHandle icebergTable = new IcebergTableHandle(SCHEMA_NAME, tableName, DATA, Optional.of(1L), TupleDomain.all(), TupleDomain.all(), ImmutableSet.of(), Optional.empty());
        TableHandle table = new TableHandle(new CatalogName(ICEBERG_CATALOG_NAME), icebergTable, new HiveTransactionHandle(false));

        IcebergColumnHandle column = new IcebergColumnHandle(primitiveColumnIdentity(1, "a"), INTEGER, ImmutableList.of(), INTEGER, Optional.empty());

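        // The predicate a = 5 moves into the table handle's unenforced predicate, so the filter node stays in the plan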
        tester().assertThat(pushPredicateIntoTableScan)
                .on(p ->
                        p.filter(
                                PlanBuilder.expression("a = 5"),
                                p.tableScan(
                                        table,
                                        ImmutableList.of(p.symbol("a", INTEGER)),
                                        ImmutableMap.of(p.symbol("a", INTEGER), column))))
                .matches(filter(
                        "a = 5",
                        tableScan(
                                tableHandle -> ((IcebergTableHandle) tableHandle).getUnenforcedPredicate().getDomains().get()
                                        .equals(ImmutableMap.of(column, Domain.singleValue(INTEGER, 5L))),
                                TupleDomain.all(),
                                ImmutableMap.of("a", column::equals))));

        metastore.dropTable(SCHEMA_NAME, tableName, true);
    }

    @Test
    public void testColumnPruningProjectionPushdown()
    {
        String tableName = "column_pruning_projection_test";
        tester().getQueryRunner().execute(format("CREATE TABLE %s (a, b) AS SELECT 5, 6", tableName));

        PruneTableScanColumns pruneTableScanColumns = new PruneTableScanColumns(tester().getMetadata());

        IcebergTableHandle icebergTable = new IcebergTableHandle(SCHEMA_NAME, tableName, DATA, Optional.empty(), TupleDomain.all(), TupleDomain.all(), ImmutableSet.of(), Optional.empty());
        TableHandle table = new TableHandle(new CatalogName(ICEBERG_CATALOG_NAME), icebergTable, new HiveTransactionHandle(false));

        IcebergColumnHandle columnA = new IcebergColumnHandle(primitiveColumnIdentity(0, "a"), INTEGER, ImmutableList.of(), INTEGER, Optional.empty());
        IcebergColumnHandle columnB = new IcebergColumnHandle(primitiveColumnIdentity(1, "b"), INTEGER, ImmutableList.of(), INTEGER, Optional.empty());

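        // Only column "a" is referenced by the projection, so column "b" should be pruned from the scan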
        tester().assertThat(pruneTableScanColumns)
                .on(p -> {
                    Symbol symbolA = p.symbol("a", INTEGER);
                    Symbol symbolB = p.symbol("b", INTEGER);
                    return p.project(
                            Assignments.of(p.symbol("x"), symbolA.toSymbolReference()),
                            p.tableScan(
                                    table,
                                    ImmutableList.of(symbolA, symbolB),
                                    ImmutableMap.of(
                                            symbolA, columnA,
                                            symbolB, columnB)));
                })
                .matches(
                        strictProject(
                                ImmutableMap.of("expr", expression("COLA")),
                                tableScan(
                                        icebergTable.withProjectedColumns(ImmutableSet.of(columnA))::equals,
                                        TupleDomain.all(),
                                        ImmutableMap.of("COLA", columnA::equals))));

        metastore.dropTable(SCHEMA_NAME, tableName, true);
    }

    @Test
    public void testPushdownWithDuplicateExpressions()
    {
        String tableName = "duplicate_expressions";
        tester().getQueryRunner().execute(format(
                "CREATE TABLE %s (struct_of_bigint, just_bigint) AS SELECT cast(row(5, 6) AS row(a bigint, b bigint)) AS struct_of_int, 5 AS just_bigint WHERE false",
                tableName));

        PushProjectionIntoTableScan pushProjectionIntoTableScan = new PushProjectionIntoTableScan(
                tester().getPlannerContext(),
                tester().getTypeAnalyzer(),
                new ScalarStatsCalculator(tester().getPlannerContext(), tester().getTypeAnalyzer()));

        IcebergTableHandle icebergTable = new IcebergTableHandle(SCHEMA_NAME, tableName, DATA, Optional.of(1L), TupleDomain.all(), TupleDomain.all(), ImmutableSet.of(), Optional.empty());
        TableHandle table = new TableHandle(new CatalogName(ICEBERG_CATALOG_NAME), icebergTable, new HiveTransactionHandle(false));

        IcebergColumnHandle bigintColumn = new IcebergColumnHandle(primitiveColumnIdentity(1, "just_bigint"), BIGINT, ImmutableList.of(), BIGINT, Optional.empty());
        IcebergColumnHandle partialColumn = new IcebergColumnHandle(
                new ColumnIdentity(3, "struct_of_bigint", STRUCT, ImmutableList.of(primitiveColumnIdentity(1, "a"), primitiveColumnIdentity(2, "b"))),
                ROW_TYPE,
                ImmutableList.of(1),
                BIGINT,
                Optional.empty());

        // Test projection pushdown with duplicate column references
        tester().assertThat(pushProjectionIntoTableScan)
                .on(p -> {
                    SymbolReference column = p.symbol("just_bigint", BIGINT).toSymbolReference();
                    Expression negation = new ArithmeticUnaryExpression(MINUS, column);
                    return p.project(
                            Assignments.of(
                                    // The column reference is part of both the assignments
                                    p.symbol("column_ref", BIGINT), column,
                                    p.symbol("negated_column_ref", BIGINT), negation),
                            p.tableScan(
                                    table,
                                    ImmutableList.of(p.symbol("just_bigint", BIGINT)),
                                    ImmutableMap.of(p.symbol("just_bigint", BIGINT), bigintColumn)));
                })
                .matches(project(
                        ImmutableMap.of(
                                "column_ref", expression("just_bigint_0"),
                                "negated_column_ref", expression("- just_bigint_0")),
                        tableScan(
                                icebergTable.withProjectedColumns(ImmutableSet.of(bigintColumn))::equals,
                                TupleDomain.all(),
                                ImmutableMap.of("just_bigint_0", bigintColumn::equals))));

        // Test dereference pushdown
        tester().assertThat(pushProjectionIntoTableScan)
                .on(p -> {
                    SubscriptExpression subscript = new SubscriptExpression(p.symbol("struct_of_bigint", ROW_TYPE).toSymbolReference(), new LongLiteral("1"));
                    Expression sum = new ArithmeticBinaryExpression(ADD, subscript, new LongLiteral("2"));
                    return p.project(
                            Assignments.of(
                                    // The subscript expression instance is part of both the assignments
                                    p.symbol("expr_deref", BIGINT), subscript,
                                    p.symbol("expr_deref_2", BIGINT), sum),
                            p.tableScan(
                                    table,
                                    ImmutableList.of(p.symbol("struct_of_bigint", ROW_TYPE)),
                                    ImmutableMap.of(p.symbol("struct_of_bigint", ROW_TYPE), partialColumn.getBaseColumn())));
                })
                .matches(project(
                        ImmutableMap.of(
                                "expr_deref", expression(new SymbolReference("struct_of_bigint#a")),
                                "expr_deref_2", expression(new ArithmeticBinaryExpression(ADD, new SymbolReference("struct_of_bigint#a"), new LongLiteral("2")))),
                        tableScan(
                                icebergTable.withProjectedColumns(ImmutableSet.of(partialColumn))::equals,
                                TupleDomain.all(),
                                ImmutableMap.of("struct_of_bigint#a", partialColumn::equals))));

        metastore.dropTable(SCHEMA_NAME, tableName, true);
    }

    @AfterClass(alwaysRun = true)
    public void cleanup()
            throws IOException
    {
        if (baseDir != null) {
            deleteRecursively(baseDir.toPath(), ALLOW_INSECURE);
        }
    }
}