From f0eae7c0c812121018b18513cb63edb9f6dc7378 Mon Sep 17 00:00:00 2001 From: Roman Zeyde Date: Thu, 25 Jul 2019 08:48:11 +0300 Subject: [PATCH] Add local dynamic filtering support to HivePageSourceProvider Also, run distributed Hive join tests with dynamic filtering. --- .../plugin/hive/HivePageSourceProvider.java | 8 +- ...ibutedJoinQueriesWithDynamicFiltering.java | 122 ++++++++++++++++++ 2 files changed, 129 insertions(+), 1 deletion(-) create mode 100644 presto-hive/src/test/java/io/prestosql/plugin/hive/TestHiveDistributedJoinQueriesWithDynamicFiltering.java diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/HivePageSourceProvider.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/HivePageSourceProvider.java index 119412834dfa..985750d87cd5 100644 --- a/presto-hive/src/main/java/io/prestosql/plugin/hive/HivePageSourceProvider.java +++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/HivePageSourceProvider.java @@ -83,6 +83,12 @@ public HivePageSourceProvider( @Override public ConnectorPageSource createPageSource(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorSplit split, ConnectorTableHandle table, List columns) + { + return createPageSource(transaction, session, split, table, columns, TupleDomain.all()); + } + + @Override + public ConnectorPageSource createPageSource(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorSplit split, ConnectorTableHandle table, List columns, TupleDomain dynamicFilter) { HiveTableHandle hiveTable = (HiveTableHandle) table; @@ -107,7 +113,7 @@ public ConnectorPageSource createPageSource(ConnectorTransactionHandle transacti hiveSplit.getFileSize(), hiveSplit.getFileModifiedTime(), hiveSplit.getSchema(), - hiveTable.getCompactEffectivePredicate(), + hiveTable.getCompactEffectivePredicate().intersect(dynamicFilter.transform(HiveColumnHandle.class::cast)), hiveColumns, hiveSplit.getPartitionKeys(), hiveStorageTimeZone, diff --git a/presto-hive/src/test/java/io/prestosql/plugin/hive/TestHiveDistributedJoinQueriesWithDynamicFiltering.java b/presto-hive/src/test/java/io/prestosql/plugin/hive/TestHiveDistributedJoinQueriesWithDynamicFiltering.java new file mode 100644 index 000000000000..dd31c6a65588 --- /dev/null +++ b/presto-hive/src/test/java/io/prestosql/plugin/hive/TestHiveDistributedJoinQueriesWithDynamicFiltering.java @@ -0,0 +1,122 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.prestosql.plugin.hive; + +import com.google.common.collect.MoreCollectors; +import io.prestosql.Session; +import io.prestosql.operator.OperatorStats; +import io.prestosql.spi.QueryId; +import io.prestosql.sql.analyzer.FeaturesConfig; +import io.prestosql.sql.planner.Plan; +import io.prestosql.sql.planner.optimizations.PlanNodeSearcher; +import io.prestosql.sql.planner.plan.FilterNode; +import io.prestosql.sql.planner.plan.PlanNodeId; +import io.prestosql.sql.planner.plan.ProjectNode; +import io.prestosql.sql.planner.plan.TableScanNode; +import io.prestosql.testing.MaterializedResult; +import io.prestosql.tests.AbstractTestJoinQueries; +import io.prestosql.tests.DistributedQueryRunner; +import io.prestosql.tests.ResultWithQueryId; +import org.testng.annotations.Test; + +import static io.airlift.testing.Assertions.assertGreaterThan; +import static io.airlift.testing.Assertions.assertLessThanOrEqual; +import static io.airlift.tpch.TpchTable.getTables; +import static io.prestosql.SystemSessionProperties.ENABLE_DYNAMIC_FILTERING; +import static io.prestosql.SystemSessionProperties.JOIN_DISTRIBUTION_TYPE; +import static io.prestosql.plugin.hive.HiveQueryRunner.createQueryRunner; +import static org.testng.Assert.assertEquals; + +public class TestHiveDistributedJoinQueriesWithDynamicFiltering + extends AbstractTestJoinQueries +{ + public TestHiveDistributedJoinQueriesWithDynamicFiltering() + { + super(() -> createQueryRunner(getTables())); + } + + @Override + protected Session getSession() + { + return Session.builder(super.getSession()) + .setSystemProperty(ENABLE_DYNAMIC_FILTERING, "true") + .build(); + } + + @Test + public void testJoinWithEmptyBuildSide() + { + Session session = Session.builder(getSession()) + .setSystemProperty(JOIN_DISTRIBUTION_TYPE, FeaturesConfig.JoinDistributionType.BROADCAST.name()) + .build(); + DistributedQueryRunner runner = (DistributedQueryRunner) getQueryRunner(); + ResultWithQueryId result = runner.executeWithQueryId( + session, + "SELECT * FROM lineitem JOIN orders ON lineitem.orderkey = orders.orderkey AND orders.totalprice = 123.4567"); + assertEquals(result.getResult().getRowCount(), 0); + + OperatorStats probeStats = searchScanFilterAndProjectOperatorStats(result.getQueryId(), "tpch:lineitem"); + // Probe-side is not scanned at all, due to dynamic filtering: + assertEquals(probeStats.getInputPositions(), 0L); + } + + @Test + public void testJoinWithSelectiveBuildSide() + { + Session session = Session.builder(getSession()) + .setSystemProperty(JOIN_DISTRIBUTION_TYPE, FeaturesConfig.JoinDistributionType.BROADCAST.name()) + .build(); + DistributedQueryRunner runner = (DistributedQueryRunner) getQueryRunner(); + ResultWithQueryId result = runner.executeWithQueryId( + session, + "SELECT * FROM lineitem JOIN orders ON lineitem.orderkey = orders.orderkey AND orders.custkey = 1"); + assertGreaterThan(result.getResult().getRowCount(), 0); + + OperatorStats probeStats = searchScanFilterAndProjectOperatorStats(result.getQueryId(), "tpch:lineitem"); + // Probe side may be partially scanned, depending on the drivers' scheduling: + assertLessThanOrEqual(probeStats.getInputPositions(), countRows("lineitem")); + } + + private OperatorStats searchScanFilterAndProjectOperatorStats(QueryId queryId, String tableName) + { + DistributedQueryRunner runner = (DistributedQueryRunner) getQueryRunner(); + Plan plan = runner.getQueryPlan(queryId); + PlanNodeId nodeId = PlanNodeSearcher.searchFrom(plan.getRoot()) + .where(node -> { + if (!(node instanceof ProjectNode)) { + return false; + } + ProjectNode projectNode = (ProjectNode) node; + FilterNode filterNode = (FilterNode) projectNode.getSource(); + TableScanNode tableScanNode = (TableScanNode) filterNode.getSource(); + return tableName.equals(tableScanNode.getTable().getConnectorHandle().toString()); + }) + .findOnlyElement() + .getId(); + return runner.getCoordinator() + .getQueryManager() + .getFullQueryInfo(queryId) + .getQueryStats() + .getOperatorSummaries() + .stream() + .filter(summary -> nodeId.equals(summary.getPlanNodeId())) + .collect(MoreCollectors.onlyElement()); + } + + private Long countRows(String tableName) + { + MaterializedResult result = getQueryRunner().execute("SELECT COUNT() FROM " + tableName); + return (Long) result.getOnlyValue(); + } +}