From f9a67d14be9ee6faf3d5b475fd7e7027a987d5f5 Mon Sep 17 00:00:00 2001 From: Raunaq Morarka Date: Fri, 4 Sep 2020 18:15:24 +0530 Subject: [PATCH 1/2] Move searchScanFilterAndProjectOperatorStats --- .../hive/TestHiveDistributedJoinQueries.java | 34 ------------------- .../testing/AbstractTestQueryFramework.java | 34 +++++++++++++++++++ 2 files changed, 34 insertions(+), 34 deletions(-) diff --git a/presto-hive/src/test/java/io/prestosql/plugin/hive/TestHiveDistributedJoinQueries.java b/presto-hive/src/test/java/io/prestosql/plugin/hive/TestHiveDistributedJoinQueries.java index c21123c0bf08..af1ce9619c37 100644 --- a/presto-hive/src/test/java/io/prestosql/plugin/hive/TestHiveDistributedJoinQueries.java +++ b/presto-hive/src/test/java/io/prestosql/plugin/hive/TestHiveDistributedJoinQueries.java @@ -13,17 +13,9 @@ */ package io.prestosql.plugin.hive; -import com.google.common.collect.MoreCollectors; import io.prestosql.Session; import io.prestosql.operator.OperatorStats; -import io.prestosql.spi.QueryId; import io.prestosql.sql.analyzer.FeaturesConfig; -import io.prestosql.sql.planner.Plan; -import io.prestosql.sql.planner.optimizations.PlanNodeSearcher; -import io.prestosql.sql.planner.plan.FilterNode; -import io.prestosql.sql.planner.plan.PlanNodeId; -import io.prestosql.sql.planner.plan.ProjectNode; -import io.prestosql.sql.planner.plan.TableScanNode; import io.prestosql.testing.AbstractTestJoinQueries; import io.prestosql.testing.DistributedQueryRunner; import io.prestosql.testing.MaterializedResult; @@ -77,30 +69,4 @@ public void testJoinWithEmptyBuildSide() assertEquals(probeStats.getInputPositions(), 0L); assertEquals(probeStats.getDynamicFilterSplitsProcessed(), probeStats.getTotalDrivers()); } - - private OperatorStats searchScanFilterAndProjectOperatorStats(QueryId queryId, String tableName) - { - DistributedQueryRunner runner = (DistributedQueryRunner) getQueryRunner(); - Plan plan = runner.getQueryPlan(queryId); - PlanNodeId nodeId = PlanNodeSearcher.searchFrom(plan.getRoot()) - .where(node -> { - if (!(node instanceof ProjectNode)) { - return false; - } - ProjectNode projectNode = (ProjectNode) node; - FilterNode filterNode = (FilterNode) projectNode.getSource(); - TableScanNode tableScanNode = (TableScanNode) filterNode.getSource(); - return tableName.equals(tableScanNode.getTable().getConnectorHandle().toString()); - }) - .findOnlyElement() - .getId(); - return runner.getCoordinator() - .getQueryManager() - .getFullQueryInfo(queryId) - .getQueryStats() - .getOperatorSummaries() - .stream() - .filter(summary -> nodeId.equals(summary.getPlanNodeId())) - .collect(MoreCollectors.onlyElement()); - } } diff --git a/presto-testing/src/main/java/io/prestosql/testing/AbstractTestQueryFramework.java b/presto-testing/src/main/java/io/prestosql/testing/AbstractTestQueryFramework.java index 9fa1d004a4f5..749436877dc7 100644 --- a/presto-testing/src/main/java/io/prestosql/testing/AbstractTestQueryFramework.java +++ b/presto-testing/src/main/java/io/prestosql/testing/AbstractTestQueryFramework.java @@ -15,6 +15,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.MoreCollectors; import io.airlift.units.Duration; import io.prestosql.Session; import io.prestosql.cost.CostCalculator; @@ -26,6 +27,8 @@ import io.prestosql.execution.TaskManagerConfig; import io.prestosql.execution.warnings.WarningCollector; import io.prestosql.metadata.Metadata; +import io.prestosql.operator.OperatorStats; +import io.prestosql.spi.QueryId; import io.prestosql.spi.security.AccessDeniedException; import io.prestosql.spi.type.Type; import io.prestosql.sql.analyzer.FeaturesConfig; @@ -37,7 +40,12 @@ import io.prestosql.sql.planner.PlanOptimizers; import io.prestosql.sql.planner.RuleStatsRecorder; import io.prestosql.sql.planner.TypeAnalyzer; +import io.prestosql.sql.planner.optimizations.PlanNodeSearcher; import io.prestosql.sql.planner.optimizations.PlanOptimizer; +import io.prestosql.sql.planner.plan.FilterNode; +import io.prestosql.sql.planner.plan.PlanNodeId; +import io.prestosql.sql.planner.plan.ProjectNode; +import io.prestosql.sql.planner.plan.TableScanNode; import io.prestosql.sql.query.QueryAssertions.QueryAssert; import io.prestosql.sql.tree.ExplainType; import io.prestosql.testing.TestingAccessControlManager.TestingPrivilege; @@ -418,4 +426,30 @@ protected Session noJoinReordering(JoinDistributionType distributionType) .setSystemProperty(JOIN_DISTRIBUTION_TYPE, distributionType.name()) .build(); } + + protected OperatorStats searchScanFilterAndProjectOperatorStats(QueryId queryId, String tableName) + { + DistributedQueryRunner runner = (DistributedQueryRunner) getQueryRunner(); + Plan plan = runner.getQueryPlan(queryId); + PlanNodeId nodeId = PlanNodeSearcher.searchFrom(plan.getRoot()) + .where(node -> { + if (!(node instanceof ProjectNode)) { + return false; + } + ProjectNode projectNode = (ProjectNode) node; + FilterNode filterNode = (FilterNode) projectNode.getSource(); + TableScanNode tableScanNode = (TableScanNode) filterNode.getSource(); + return tableName.equals(tableScanNode.getTable().getConnectorHandle().toString()); + }) + .findOnlyElement() + .getId(); + return runner.getCoordinator() + .getQueryManager() + .getFullQueryInfo(queryId) + .getQueryStats() + .getOperatorSummaries() + .stream() + .filter(summary -> nodeId.equals(summary.getPlanNodeId())) + .collect(MoreCollectors.onlyElement()); + } } From b001728df0e045fb18b4b877da9f184b875e3333 Mon Sep 17 00:00:00 2001 From: Raunaq Morarka Date: Thu, 27 Aug 2020 01:04:29 +0530 Subject: [PATCH 2/2] Support lazy dynamic filtering in hive connector --- .../hive/BackgroundHiveSplitLoader.java | 27 +- .../io/prestosql/plugin/hive/HiveConfig.java | 16 ++ .../plugin/hive/HiveSessionProperties.java | 13 + .../plugin/hive/HiveSplitManager.java | 4 +- .../hive/TestBackgroundHiveSplitLoader.java | 90 ++++++- .../prestosql/plugin/hive/TestHiveConfig.java | 7 +- .../hive/TestHiveDynamicPartitionPruning.java | 239 ++++++++++++++++++ .../testing/AbstractTestQueryFramework.java | 5 +- 8 files changed, 388 insertions(+), 13 deletions(-) create mode 100644 presto-hive/src/test/java/io/prestosql/plugin/hive/TestHiveDynamicPartitionPruning.java diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/BackgroundHiveSplitLoader.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/BackgroundHiveSplitLoader.java index 21deb80cb2e7..185d880a2c4b 100644 --- a/presto-hive/src/main/java/io/prestosql/plugin/hive/BackgroundHiveSplitLoader.java +++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/BackgroundHiveSplitLoader.java @@ -14,6 +14,7 @@ package io.prestosql.plugin.hive; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Stopwatch; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterators; @@ -21,6 +22,7 @@ import com.google.common.collect.Streams; import com.google.common.io.CharStreams; import com.google.common.util.concurrent.ListenableFuture; +import io.airlift.units.Duration; import io.prestosql.plugin.hive.HdfsEnvironment.HdfsContext; import io.prestosql.plugin.hive.HiveSplit.BucketConversion; import io.prestosql.plugin.hive.metastore.Column; @@ -35,6 +37,7 @@ import io.prestosql.spi.PrestoException; import io.prestosql.spi.connector.ColumnHandle; import io.prestosql.spi.connector.ConnectorSession; +import io.prestosql.spi.connector.DynamicFilter; import io.prestosql.spi.predicate.TupleDomain; import io.prestosql.spi.type.TypeManager; import org.apache.hadoop.conf.Configuration; @@ -74,7 +77,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.BooleanSupplier; import java.util.function.IntPredicate; -import java.util.function.Supplier; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -82,6 +84,7 @@ import static com.google.common.base.Preconditions.checkState; import static com.google.common.util.concurrent.Futures.immediateFuture; import static io.airlift.concurrent.MoreFutures.addExceptionCallback; +import static io.airlift.concurrent.MoreFutures.toListenableFuture; import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_BAD_DATA; import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_FILESYSTEM_ERROR; import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_INVALID_BUCKET_FILES; @@ -109,6 +112,7 @@ import static java.lang.String.format; import static java.util.Collections.max; import static java.util.Objects.requireNonNull; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.hadoop.hive.common.FileUtils.HIDDEN_FILES_PATH_FILTER; public class BackgroundHiveSplitLoader @@ -129,7 +133,8 @@ public class BackgroundHiveSplitLoader private final Table table; private final TupleDomain compactEffectivePredicate; - private final Supplier> dynamicFilterSupplier; + private final DynamicFilter dynamicFilter; + private final long dynamicFilteringProbeBlockingTimeoutMillis; private final TypeManager typeManager; private final Optional tableBucketInfo; private final HdfsEnvironment hdfsEnvironment; @@ -163,13 +168,15 @@ public class BackgroundHiveSplitLoader private final ReadWriteLock taskExecutionLock = new ReentrantReadWriteLock(); private HiveSplitSource hiveSplitSource; + private Stopwatch stopwatch; private volatile boolean stopped; public BackgroundHiveSplitLoader( Table table, Iterable partitions, TupleDomain compactEffectivePredicate, - Supplier> dynamicFilterSupplier, + DynamicFilter dynamicFilter, + Duration dynamicFilteringProbeBlockingTimeout, TypeManager typeManager, Optional tableBucketInfo, ConnectorSession session, @@ -184,7 +191,8 @@ public BackgroundHiveSplitLoader( { this.table = table; this.compactEffectivePredicate = compactEffectivePredicate; - this.dynamicFilterSupplier = dynamicFilterSupplier; + this.dynamicFilter = dynamicFilter; + this.dynamicFilteringProbeBlockingTimeoutMillis = dynamicFilteringProbeBlockingTimeout.toMillis(); this.typeManager = typeManager; this.tableBucketInfo = tableBucketInfo; this.loaderConcurrency = loaderConcurrency; @@ -204,6 +212,7 @@ public BackgroundHiveSplitLoader( public void start(HiveSplitSource splitSource) { this.hiveSplitSource = splitSource; + this.stopwatch = Stopwatch.createStarted(); for (int i = 0; i < loaderConcurrency; i++) { ListenableFuture future = ResumableTasks.submit(executor, new HiveSplitLoaderTask()); addExceptionCallback(future, hiveSplitSource::fail); // best effort; hiveSplitSource could be already completed @@ -227,6 +236,14 @@ public TaskStatus process() return TaskStatus.finished(); } ListenableFuture future; + // Block until one of below conditions is met: + // 1. Completion of DynamicFilter + // 2. Timeout after waiting for the configured time + long timeLeft = dynamicFilteringProbeBlockingTimeoutMillis - stopwatch.elapsed(MILLISECONDS); + if (timeLeft > 0 && !dynamicFilter.isComplete()) { + future = toListenableFuture(dynamicFilter.isBlocked().orTimeout(timeLeft, MILLISECONDS)); + return TaskStatus.continueOn(future); + } taskExecutionLock.readLock().lock(); try { future = loadSplits(); @@ -325,7 +342,7 @@ private ListenableFuture loadPartition(HivePartitionMetadata partition) TupleDomain effectivePredicate = compactEffectivePredicate.transform(HiveColumnHandle.class::cast); List partitionColumns = getPartitionKeyColumnHandles(table, typeManager); - BooleanSupplier partitionMatchSupplier = () -> partitionMatches(partitionColumns, dynamicFilterSupplier.get(), hivePartition); + BooleanSupplier partitionMatchSupplier = () -> partitionMatches(partitionColumns, dynamicFilter.getCurrentPredicate(), hivePartition); if (!partitionMatchSupplier.getAsBoolean()) { // Avoid listing files and creating splits from a partition if it has been pruned due to dynamic filters return COMPLETED_FUTURE; diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveConfig.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveConfig.java index f09549325ac2..1f7d0214412b 100644 --- a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveConfig.java +++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveConfig.java @@ -133,6 +133,8 @@ public class HiveConfig private boolean projectionPushdownEnabled = true; + private Duration dynamicFilteringProbeBlockingTimeout = new Duration(0, MINUTES); + public int getMaxInitialSplits() { return maxInitialSplits; @@ -964,4 +966,18 @@ public HiveConfig setProjectionPushdownEnabled(boolean projectionPushdownEnabled this.projectionPushdownEnabled = projectionPushdownEnabled; return this; } + + @NotNull + public Duration getDynamicFilteringProbeBlockingTimeout() + { + return dynamicFilteringProbeBlockingTimeout; + } + + @Config("hive.dynamic-filtering-probe-blocking-timeout") + @ConfigDescription("Duration to wait for completion of dynamic filters during split generation for probe side table") + public HiveConfig setDynamicFilteringProbeBlockingTimeout(Duration dynamicFilteringProbeBlockingTimeout) + { + this.dynamicFilteringProbeBlockingTimeout = dynamicFilteringProbeBlockingTimeout; + return this; + } } diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveSessionProperties.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveSessionProperties.java index c7cfeed4adf2..8427d6eeda10 100644 --- a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveSessionProperties.java +++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveSessionProperties.java @@ -15,6 +15,7 @@ import com.google.common.collect.ImmutableList; import io.airlift.units.DataSize; +import io.airlift.units.Duration; import io.prestosql.orc.OrcWriteValidation.OrcWriteValidationMode; import io.prestosql.plugin.hive.orc.OrcReaderConfig; import io.prestosql.plugin.hive.orc.OrcWriterConfig; @@ -31,6 +32,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static io.prestosql.plugin.base.session.PropertyMetadataUtil.dataSizeProperty; +import static io.prestosql.plugin.base.session.PropertyMetadataUtil.durationProperty; import static io.prestosql.plugin.hive.HiveSessionProperties.InsertExistingPartitionsBehavior.APPEND; import static io.prestosql.plugin.hive.HiveSessionProperties.InsertExistingPartitionsBehavior.ERROR; import static io.prestosql.spi.StandardErrorCode.INVALID_SESSION_PROPERTY; @@ -91,6 +93,7 @@ public final class HiveSessionProperties private static final String QUERY_PARTITION_FILTER_REQUIRED = "query_partition_filter_required"; private static final String PROJECTION_PUSHDOWN_ENABLED = "projection_pushdown_enabled"; private static final String PARQUET_OPTIMIZED_WRITER_ENABLED = "parquet_optimized_writer_enabled"; + private static final String DYNAMIC_FILTERING_PROBE_BLOCKING_TIMEOUT = "dynamic_filtering_probe_blocking_timeout"; private final List> sessionProperties; @@ -369,6 +372,11 @@ public HiveSessionProperties( PARQUET_OPTIMIZED_WRITER_ENABLED, "Experimental: Enable optimized writer", parquetWriterConfig.isParquetOptimizedWriterEnabled(), + false), + durationProperty( + DYNAMIC_FILTERING_PROBE_BLOCKING_TIMEOUT, + "Duration to wait for completion of dynamic filters during split generation for probe side table", + hiveConfig.getDynamicFilteringProbeBlockingTimeout(), false)); } @@ -635,4 +643,9 @@ public static boolean isParquetOptimizedWriterEnabled(ConnectorSession session) { return session.getProperty(PARQUET_OPTIMIZED_WRITER_ENABLED, Boolean.class); } + + public static Duration getDynamicFilteringProbeBlockingTimeout(ConnectorSession session) + { + return session.getProperty(DYNAMIC_FILTERING_PROBE_BLOCKING_TIMEOUT, Duration.class); + } } diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveSplitManager.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveSplitManager.java index 7919bb45956b..0a9ce86bc2eb 100644 --- a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveSplitManager.java +++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveSplitManager.java @@ -65,6 +65,7 @@ import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_PARTITION_DROPPED_DURING_QUERY; import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_PARTITION_SCHEMA_MISMATCH; import static io.prestosql.plugin.hive.HivePartition.UNPARTITIONED_ID; +import static io.prestosql.plugin.hive.HiveSessionProperties.getDynamicFilteringProbeBlockingTimeout; import static io.prestosql.plugin.hive.HiveSessionProperties.isIgnoreAbsentPartitions; import static io.prestosql.plugin.hive.HiveSessionProperties.isPartitionUseColumnNames; import static io.prestosql.plugin.hive.TableToPartitionMapping.mapColumnsByIndex; @@ -219,7 +220,8 @@ public ConnectorSplitSource getSplits( table, hivePartitions, hiveTable.getCompactEffectivePredicate(), - dynamicFilter::getCurrentPredicate, + dynamicFilter, + getDynamicFilteringProbeBlockingTimeout(session), typeManager, createBucketSplitInfo(bucketHandle, bucketFilter), session, diff --git a/presto-hive/src/test/java/io/prestosql/plugin/hive/TestBackgroundHiveSplitLoader.java b/presto-hive/src/test/java/io/prestosql/plugin/hive/TestBackgroundHiveSplitLoader.java index 20a907b80dcf..4f63ba30cab7 100644 --- a/presto-hive/src/test/java/io/prestosql/plugin/hive/TestBackgroundHiveSplitLoader.java +++ b/presto-hive/src/test/java/io/prestosql/plugin/hive/TestBackgroundHiveSplitLoader.java @@ -30,7 +30,9 @@ import io.prestosql.plugin.hive.metastore.Table; import io.prestosql.plugin.hive.util.HiveBucketing.HiveBucketFilter; import io.prestosql.spi.PrestoException; +import io.prestosql.spi.connector.ColumnHandle; import io.prestosql.spi.connector.ConnectorSession; +import io.prestosql.spi.connector.DynamicFilter; import io.prestosql.spi.connector.SchemaTableName; import io.prestosql.spi.predicate.Domain; import io.prestosql.spi.predicate.TupleDomain; @@ -68,6 +70,7 @@ import java.util.Map; import java.util.Optional; import java.util.OptionalInt; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -343,6 +346,45 @@ public void testNoHangIfPartitionIsOffline() assertThrows(RuntimeException.class, hiveSplitSource::isFinished); } + @Test(timeOut = 30_000) + public void testIncompleteDynamicFilterTimeout() + throws Exception + { + BackgroundHiveSplitLoader backgroundHiveSplitLoader = backgroundHiveSplitLoader( + new DynamicFilter() { + @Override + public CompletableFuture isBlocked() + { + return CompletableFuture.runAsync(() -> { + try { + TimeUnit.HOURS.sleep(1); + } + catch (InterruptedException e) { + throw new IllegalStateException(e); + } + }); + } + + @Override + public boolean isComplete() + { + return false; + } + + @Override + public TupleDomain getCurrentPredicate() + { + return TupleDomain.all(); + } + }, + Duration.valueOf("1s")); + HiveSplitSource hiveSplitSource = hiveSplitSource(backgroundHiveSplitLoader); + backgroundHiveSplitLoader.start(hiveSplitSource); + + assertEquals(drain(hiveSplitSource).size(), 2); + assertTrue(hiveSplitSource.isFinished()); + } + @Test public void testCachedDirectoryLister() throws Exception @@ -454,7 +496,8 @@ public HivePartitionMetadata next() } }, TupleDomain.all(), - TupleDomain::all, + DynamicFilter.EMPTY, + Duration.valueOf("0s"), TYPE_MANAGER, createBucketSplitInfo(Optional.empty(), Optional.empty()), SESSION, @@ -703,6 +746,21 @@ private static List drainSplits(HiveSplitSource source) return splits.build(); } + private static BackgroundHiveSplitLoader backgroundHiveSplitLoader( + DynamicFilter dynamicFilter, + Duration dynamicFilteringProbeBlockingTimeoutMillis) + { + return backgroundHiveSplitLoader( + new TestingHdfsEnvironment(TEST_FILES), + TupleDomain.all(), + dynamicFilter, + dynamicFilteringProbeBlockingTimeoutMillis, + Optional.empty(), + SIMPLE_TABLE, + Optional.empty(), + Optional.empty()); + } + private static BackgroundHiveSplitLoader backgroundHiveSplitLoader( List files, TupleDomain tupleDomain) @@ -755,6 +813,27 @@ private static BackgroundHiveSplitLoader backgroundHiveSplitLoader( Table table, Optional bucketHandle, Optional validWriteIds) + { + return backgroundHiveSplitLoader( + hdfsEnvironment, + compactEffectivePredicate, + DynamicFilter.EMPTY, + Duration.valueOf("0s"), + hiveBucketFilter, + table, + bucketHandle, + validWriteIds); + } + + private static BackgroundHiveSplitLoader backgroundHiveSplitLoader( + HdfsEnvironment hdfsEnvironment, + TupleDomain compactEffectivePredicate, + DynamicFilter dynamicFilter, + Duration dynamicFilteringProbeBlockingTimeout, + Optional hiveBucketFilter, + Table table, + Optional bucketHandle, + Optional validWriteIds) { List hivePartitionMetadatas = ImmutableList.of( @@ -767,7 +846,8 @@ private static BackgroundHiveSplitLoader backgroundHiveSplitLoader( table, hivePartitionMetadatas, compactEffectivePredicate, - TupleDomain::all, + dynamicFilter, + dynamicFilteringProbeBlockingTimeout, TYPE_MANAGER, createBucketSplitInfo(bucketHandle, hiveBucketFilter), SESSION, @@ -796,7 +876,8 @@ private static BackgroundHiveSplitLoader backgroundHiveSplitLoader(List result = runner.executeWithQueryId( + getSession(), + "SELECT * FROM partitioned_lineitem JOIN supplier ON partitioned_lineitem.suppkey = supplier.suppkey AND supplier.name = 'abc'"); + assertEquals(result.getResult().getRowCount(), 0); + + OperatorStats probeStats = searchScanFilterAndProjectOperatorStats(result.getQueryId(), "tpch:" + PARTITIONED_LINEITEM); + assertEquals(probeStats.getInputPositions(), 0L); + assertEquals(probeStats.getDynamicFilterSplitsProcessed(), 0L); + + DynamicFiltersStats dynamicFiltersStats = getDynamicFilteringStats(result.getQueryId()); + assertEquals(dynamicFiltersStats.getTotalDynamicFilters(), 1L); + assertEquals(dynamicFiltersStats.getLazyDynamicFilters(), 1L); + assertEquals(dynamicFiltersStats.getReplicatedDynamicFilters(), 0L); + assertEquals(dynamicFiltersStats.getDynamicFiltersCompleted(), 1L); + + DynamicFilterDomainStats domainStats = getOnlyElement(dynamicFiltersStats.getDynamicFilterDomainStats()); + assertEquals(domainStats.getSimplifiedDomain(), none(INTEGER).toString(getSession().toConnectorSession())); + assertEquals(domainStats.getDiscreteValuesCount(), 0); + assertEquals(domainStats.getRangeCount(), 0); + } + + @Test(timeOut = 30_000) + public void testJoinWithSelectiveBuildSide() + { + DistributedQueryRunner runner = (DistributedQueryRunner) getQueryRunner(); + ResultWithQueryId result = runner.executeWithQueryId( + getSession(), + "SELECT * FROM partitioned_lineitem JOIN supplier ON partitioned_lineitem.suppkey = supplier.suppkey " + + "AND supplier.name = 'Supplier#000000001'"); + assertGreaterThan(result.getResult().getRowCount(), 0); + + OperatorStats probeStats = searchScanFilterAndProjectOperatorStats(result.getQueryId(), "tpch:" + PARTITIONED_LINEITEM); + // Probe-side is partially scanned + assertEquals(probeStats.getInputPositions(), 615); + assertEquals(probeStats.getDynamicFilterSplitsProcessed(), 0L); + + DynamicFiltersStats dynamicFiltersStats = getDynamicFilteringStats(result.getQueryId()); + assertEquals(dynamicFiltersStats.getTotalDynamicFilters(), 1L); + assertEquals(dynamicFiltersStats.getLazyDynamicFilters(), 1L); + assertEquals(dynamicFiltersStats.getReplicatedDynamicFilters(), 0L); + assertEquals(dynamicFiltersStats.getDynamicFiltersCompleted(), 1L); + + DynamicFilterDomainStats domainStats = getOnlyElement(dynamicFiltersStats.getDynamicFilterDomainStats()); + assertEquals(domainStats.getSimplifiedDomain(), singleValue(INTEGER, 1L).toString(getSession().toConnectorSession())); + assertEquals(domainStats.getDiscreteValuesCount(), 0); + assertEquals(domainStats.getRangeCount(), 1); + } + + @Test(timeOut = 30_000) + public void testJoinWithNonSelectiveBuildSide() + { + DistributedQueryRunner runner = (DistributedQueryRunner) getQueryRunner(); + ResultWithQueryId result = runner.executeWithQueryId( + getSession(), + "SELECT * FROM partitioned_lineitem JOIN supplier ON partitioned_lineitem.suppkey = supplier.suppkey"); + assertEquals(result.getResult().getRowCount(), LINEITEM_COUNT); + + OperatorStats probeStats = searchScanFilterAndProjectOperatorStats(result.getQueryId(), "tpch:" + PARTITIONED_LINEITEM); + // Probe-side is fully scanned + assertEquals(probeStats.getInputPositions(), LINEITEM_COUNT); + assertEquals(probeStats.getDynamicFilterSplitsProcessed(), 0L); + + DynamicFiltersStats dynamicFiltersStats = getDynamicFilteringStats(result.getQueryId()); + assertEquals(dynamicFiltersStats.getTotalDynamicFilters(), 1L); + assertEquals(dynamicFiltersStats.getLazyDynamicFilters(), 1L); + assertEquals(dynamicFiltersStats.getReplicatedDynamicFilters(), 0L); + assertEquals(dynamicFiltersStats.getDynamicFiltersCompleted(), 1L); + + DynamicFilterDomainStats domainStats = getOnlyElement(dynamicFiltersStats.getDynamicFilterDomainStats()); + assertEquals(domainStats.getSimplifiedDomain(), Domain.create(ValueSet.ofRanges( + range(INTEGER, 1L, true, 100L, true)), false) + .toString(getSession().toConnectorSession())); + assertEquals(domainStats.getDiscreteValuesCount(), 0); + assertEquals(domainStats.getRangeCount(), 1); + } + + @Test(timeOut = 30_000) + public void testJoinLargeBuildSideNoDynamicFiltering() + { + DistributedQueryRunner runner = (DistributedQueryRunner) getQueryRunner(); + ResultWithQueryId result = runner.executeWithQueryId( + getSession(), + "SELECT * FROM partitioned_lineitem JOIN orders ON partitioned_lineitem.orderkey = orders.orderkey"); + assertEquals(result.getResult().getRowCount(), LINEITEM_COUNT); + + OperatorStats probeStats = searchScanFilterAndProjectOperatorStats(result.getQueryId(), "tpch:" + PARTITIONED_LINEITEM); + // Probe-side is fully scanned because the build-side is too large for dynamic filtering: + assertEquals(probeStats.getInputPositions(), LINEITEM_COUNT); + assertEquals(probeStats.getDynamicFilterSplitsProcessed(), 0L); + + DynamicFiltersStats dynamicFiltersStats = getDynamicFilteringStats(result.getQueryId()); + assertEquals(dynamicFiltersStats.getTotalDynamicFilters(), 1L); + assertEquals(dynamicFiltersStats.getLazyDynamicFilters(), 1L); + assertEquals(dynamicFiltersStats.getReplicatedDynamicFilters(), 0L); + assertEquals(dynamicFiltersStats.getDynamicFiltersCompleted(), 1L); + + DynamicFilterDomainStats domainStats = getOnlyElement(dynamicFiltersStats.getDynamicFilterDomainStats()); + assertEquals(domainStats.getSimplifiedDomain(), Domain.all(INTEGER).toString(getSession().toConnectorSession())); + assertEquals(domainStats.getDiscreteValuesCount(), 0); + assertEquals(domainStats.getRangeCount(), 1); + } + + @Test(timeOut = 30_000) + public void testJoinWithMultipleDynamicFiltersOnProbe() + { + DistributedQueryRunner runner = (DistributedQueryRunner) getQueryRunner(); + // supplier names Supplier#000000001 and Supplier#000000002 match suppkey 1 and 2 + ResultWithQueryId result = runner.executeWithQueryId( + getSession(), + "SELECT * FROM (" + + "SELECT supplier.suppkey FROM " + + "partitioned_lineitem JOIN tpch.tiny.supplier ON partitioned_lineitem.suppkey = supplier.suppkey AND supplier.name IN ('Supplier#000000001', 'Supplier#000000002')" + + ") t JOIN supplier ON t.suppkey = supplier.suppkey AND supplier.suppkey IN (2, 3)"); + assertGreaterThan(result.getResult().getRowCount(), 0); + + OperatorStats probeStats = searchScanFilterAndProjectOperatorStats(result.getQueryId(), "tpch:" + PARTITIONED_LINEITEM); + // Probe-side is partially scanned + assertEquals(probeStats.getInputPositions(), 558L); + assertEquals(probeStats.getDynamicFilterSplitsProcessed(), 0L); + + DynamicFiltersStats dynamicFiltersStats = getDynamicFilteringStats(result.getQueryId()); + assertEquals(dynamicFiltersStats.getTotalDynamicFilters(), 2L); + assertEquals(dynamicFiltersStats.getLazyDynamicFilters(), 2L); + assertEquals(dynamicFiltersStats.getReplicatedDynamicFilters(), 0L); + assertEquals(dynamicFiltersStats.getDynamicFiltersCompleted(), 2); + + List domainStats = dynamicFiltersStats.getDynamicFilterDomainStats(); + assertEquals(domainStats.size(), 2); + domainStats.forEach(stats -> { + assertGreaterThanOrEqual(stats.getRangeCount(), 1); + assertEquals(stats.getDiscreteValuesCount(), 0); + }); + } + + private DynamicFiltersStats getDynamicFilteringStats(QueryId queryId) + { + DistributedQueryRunner runner = (DistributedQueryRunner) getQueryRunner(); + return runner.getCoordinator() + .getQueryManager() + .getFullQueryInfo(queryId) + .getQueryStats() + .getDynamicFiltersStats(); + } +} diff --git a/presto-testing/src/main/java/io/prestosql/testing/AbstractTestQueryFramework.java b/presto-testing/src/main/java/io/prestosql/testing/AbstractTestQueryFramework.java index 749436877dc7..12ea51e5335a 100644 --- a/presto-testing/src/main/java/io/prestosql/testing/AbstractTestQueryFramework.java +++ b/presto-testing/src/main/java/io/prestosql/testing/AbstractTestQueryFramework.java @@ -437,6 +437,9 @@ protected OperatorStats searchScanFilterAndProjectOperatorStats(QueryId queryId, return false; } ProjectNode projectNode = (ProjectNode) node; + if (!(projectNode.getSource() instanceof FilterNode)) { + return false; + } FilterNode filterNode = (FilterNode) projectNode.getSource(); TableScanNode tableScanNode = (TableScanNode) filterNode.getSource(); return tableName.equals(tableScanNode.getTable().getConnectorHandle().toString()); @@ -449,7 +452,7 @@ protected OperatorStats searchScanFilterAndProjectOperatorStats(QueryId queryId, .getQueryStats() .getOperatorSummaries() .stream() - .filter(summary -> nodeId.equals(summary.getPlanNodeId())) + .filter(summary -> nodeId.equals(summary.getPlanNodeId()) && summary.getOperatorType().equals("ScanFilterAndProjectOperator")) .collect(MoreCollectors.onlyElement()); } }