diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/BackgroundHiveSplitLoader.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/BackgroundHiveSplitLoader.java index c4efb05ccbf8..d88559509e63 100644 --- a/presto-hive/src/main/java/io/prestosql/plugin/hive/BackgroundHiveSplitLoader.java +++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/BackgroundHiveSplitLoader.java @@ -240,7 +240,7 @@ public TaskStatus process() // 1. Completion of DynamicFilter // 2. Timeout after waiting for the configured time long timeLeft = dynamicFilteringProbeBlockingTimeoutMillis - stopwatch.elapsed(MILLISECONDS); - if (timeLeft > 0 && !dynamicFilter.isComplete()) { + if (timeLeft > 0 && dynamicFilter.isAwaitable()) { future = toListenableFuture(dynamicFilter.isBlocked().orTimeout(timeLeft, MILLISECONDS)); return TaskStatus.continueOn(future); } diff --git a/presto-hive/src/test/java/io/prestosql/plugin/hive/TestBackgroundHiveSplitLoader.java b/presto-hive/src/test/java/io/prestosql/plugin/hive/TestBackgroundHiveSplitLoader.java index 4f63ba30cab7..fcbe5751b637 100644 --- a/presto-hive/src/test/java/io/prestosql/plugin/hive/TestBackgroundHiveSplitLoader.java +++ b/presto-hive/src/test/java/io/prestosql/plugin/hive/TestBackgroundHiveSplitLoader.java @@ -351,7 +351,8 @@ public void testIncompleteDynamicFilterTimeout() throws Exception { BackgroundHiveSplitLoader backgroundHiveSplitLoader = backgroundHiveSplitLoader( - new DynamicFilter() { + new DynamicFilter() + { @Override public CompletableFuture isBlocked() { @@ -371,6 +372,12 @@ public boolean isComplete() return false; } + @Override + public boolean isAwaitable() + { + return true; + } + @Override public TupleDomain getCurrentPredicate() { diff --git a/presto-main/src/main/java/io/prestosql/server/DynamicFilterService.java b/presto-main/src/main/java/io/prestosql/server/DynamicFilterService.java index 1a303511c960..b280ebe741a2 100644 --- a/presto-main/src/main/java/io/prestosql/server/DynamicFilterService.java +++ b/presto-main/src/main/java/io/prestosql/server/DynamicFilterService.java @@ -72,7 +72,6 @@ import static io.airlift.concurrent.MoreFutures.unmodifiableFuture; import static io.airlift.concurrent.MoreFutures.whenAnyComplete; import static io.airlift.concurrent.Threads.daemonThreadsNamed; -import static io.prestosql.operator.JoinUtils.isBuildSideReplicated; import static io.prestosql.spi.connector.DynamicFilter.EMPTY; import static io.prestosql.spi.predicate.Domain.union; import static io.prestosql.sql.DynamicFilters.extractDynamicFilters; @@ -232,6 +231,13 @@ public boolean isComplete() .allMatch(context.getDynamicFilterSummaries()::containsKey); } + @Override + public boolean isAwaitable() + { + return lazyDynamicFilterFutures.stream() + .anyMatch(future -> !future.isDone()); + } + @Override public TupleDomain getCurrentPredicate() { diff --git a/presto-main/src/main/java/io/prestosql/sql/planner/LocalDynamicFiltersCollector.java b/presto-main/src/main/java/io/prestosql/sql/planner/LocalDynamicFiltersCollector.java index c27a16ede52c..daac8223d5ff 100644 --- a/presto-main/src/main/java/io/prestosql/sql/planner/LocalDynamicFiltersCollector.java +++ b/presto-main/src/main/java/io/prestosql/sql/planner/LocalDynamicFiltersCollector.java @@ -155,6 +155,12 @@ public synchronized boolean isComplete() return futuresLeft == 0; } + @Override + public synchronized boolean isAwaitable() + { + return futuresLeft > 0; + } + @Override public synchronized TupleDomain getCurrentPredicate() { diff --git a/presto-main/src/test/java/io/prestosql/server/TestDynamicFilterService.java b/presto-main/src/test/java/io/prestosql/server/TestDynamicFilterService.java index 80b2fa9d9d67..da31fbe7c19f 100644 --- a/presto-main/src/test/java/io/prestosql/server/TestDynamicFilterService.java +++ b/presto-main/src/test/java/io/prestosql/server/TestDynamicFilterService.java @@ -171,6 +171,7 @@ public void testDynamicFilter() assertTrue(dynamicFilter.getCurrentPredicate().isAll()); assertFalse(dynamicFilter.isComplete()); + assertTrue(dynamicFilter.isAwaitable()); // assert initial dynamic filtering stats DynamicFiltersStats stats = dynamicFilterService.getDynamicFilteringStats(queryId, session); @@ -192,6 +193,7 @@ public void testDynamicFilter() // tuple domain from two tasks are needed for dynamic filter to be narrowed down assertTrue(dynamicFilter.getCurrentPredicate().isAll()); assertFalse(dynamicFilter.isComplete()); + assertTrue(dynamicFilter.isAwaitable()); assertFalse(blockedFuture.isDone()); assertEquals(dynamicFiltersStageSupplier.getRequestCount(), 1); @@ -213,6 +215,7 @@ public void testDynamicFilter() // there are still more dynamic filters to be collected assertFalse(dynamicFilter.isComplete()); + assertTrue(dynamicFilter.isAwaitable()); blockedFuture = dynamicFilter.isBlocked(); assertFalse(blockedFuture.isDone()); @@ -227,6 +230,7 @@ public void testDynamicFilter() new TestingColumnHandle("probeColumnA"), multipleValues(INTEGER, ImmutableList.of(1L, 2L))))); assertFalse(dynamicFilter.isComplete()); + assertTrue(dynamicFilter.isAwaitable()); assertFalse(blockedFuture.isDone()); assertEquals(dynamicFiltersStageSupplier.getRequestCount(), 3); @@ -251,6 +255,7 @@ public void testDynamicFilter() // there are still more dynamic filters to be collected for columns A and B assertFalse(dynamicFilter.isComplete()); + assertTrue(dynamicFilter.isAwaitable()); blockedFuture = dynamicFilter.isBlocked(); assertFalse(blockedFuture.isDone()); @@ -265,6 +270,7 @@ public void testDynamicFilter() Symbol.from(df2), new TestingColumnHandle("probeColumnA"))); assertTrue(dynamicFilterColumnA.isComplete()); + assertFalse(dynamicFilterColumnA.isAwaitable()); assertTrue(dynamicFilterColumnA.isBlocked().isDone()); assertEquals(dynamicFilterColumnA.getCurrentPredicate(), TupleDomain.withColumnDomains(ImmutableMap.of( new TestingColumnHandle("probeColumnA"), @@ -281,6 +287,7 @@ public void testDynamicFilter() new TestingColumnHandle("probeColumnA"), singleValue(INTEGER, 2L)))); assertFalse(dynamicFilter.isComplete()); + assertTrue(dynamicFilter.isAwaitable()); assertFalse(blockedFuture.isDone()); assertEquals(dynamicFiltersStageSupplier.getRequestCount(), 5); @@ -313,6 +320,7 @@ filterId2, getExpectedDomainString(2L, 3L), 2, 0), // all dynamic filters have been collected, no need for more requests dynamicFilterService.collectDynamicFilters(); assertTrue(dynamicFilter.isComplete()); + assertFalse(dynamicFilter.isAwaitable()); assertTrue(dynamicFilter.isBlocked().isDone()); assertEquals(dynamicFiltersStageSupplier.getRequestCount(), 6); } @@ -391,7 +399,6 @@ public void testReplicatedDynamicFilter() ImmutableMap.of( Symbol.from(df1), new TestingColumnHandle("probeColumnA"))); assertTrue(dynamicFilter.getCurrentPredicate().isAll()); - assertFalse(dynamicFilter.isComplete()); // assert initial dynamic filtering stats DynamicFiltersStats stats = dynamicFilterService.getDynamicFilteringStats(queryId, session); @@ -401,6 +408,8 @@ public void testReplicatedDynamicFilter() assertEquals(stats.getLazyDynamicFilters(), 0); // replicated dynamic filters cannot be lazy due to replicated join task scheduling dependencies + assertFalse(dynamicFilter.isComplete()); + assertFalse(dynamicFilter.isAwaitable()); assertTrue(dynamicFilter.isBlocked().isDone()); dynamicFiltersStageSupplier.storeSummary( @@ -414,6 +423,7 @@ public void testReplicatedDynamicFilter() new TestingColumnHandle("probeColumnA"), singleValue(INTEGER, 1L)))); assertTrue(dynamicFilter.isComplete()); + assertFalse(dynamicFilter.isAwaitable()); assertEquals(dynamicFiltersStageSupplier.getRequestCount(), 1); stats = dynamicFilterService.getDynamicFilteringStats(queryId, session); @@ -483,6 +493,42 @@ public void testDynamicFilterCancellation() ImmutableMap.of(column, multipleValues(INTEGER, ImmutableList.of(1L, 2L))))); } + @Test + public void testIsAwaitable() + { + DynamicFilterService dynamicFilterService = new DynamicFilterService(new FeaturesConfig()); + DynamicFilterId filterId1 = new DynamicFilterId("df1"); + DynamicFilterId filterId2 = new DynamicFilterId("df2"); + Expression symbol = new Symbol("symbol").toSymbolReference(); + ColumnHandle handle = new TestingColumnHandle("probeColumnA"); + QueryId queryId = new QueryId("query"); + StageId stageId = new StageId(queryId, 1); + + TestDynamicFiltersStageSupplier dynamicFiltersStageSupplier = new TestDynamicFiltersStageSupplier(SCHEDULING); + dynamicFiltersStageSupplier.addTasks(ImmutableList.of(new TaskId(stageId, 0))); + + dynamicFilterService.registerQuery( + queryId, + dynamicFiltersStageSupplier, + ImmutableSet.of(filterId1, filterId2), + ImmutableSet.of(filterId1), + ImmutableSet.of()); + + DynamicFilter dynamicFilter1 = dynamicFilterService.createDynamicFilter( + queryId, + ImmutableList.of(new DynamicFilters.Descriptor(filterId1, symbol)), + ImmutableMap.of(Symbol.from(symbol), handle)); + + DynamicFilter dynamicFilter2 = dynamicFilterService.createDynamicFilter( + queryId, + ImmutableList.of(new DynamicFilters.Descriptor(filterId2, symbol)), + ImmutableMap.of(Symbol.from(symbol), handle)); + + assertTrue(dynamicFilter1.isAwaitable()); + // non lazy dynamic filters are marked as non-awaitable + assertFalse(dynamicFilter2.isAwaitable()); + } + @Test public void testMultipleColumnMapping() { diff --git a/presto-main/src/test/java/io/prestosql/sql/planner/TestLocalDynamicFiltersCollector.java b/presto-main/src/test/java/io/prestosql/sql/planner/TestLocalDynamicFiltersCollector.java index 858c56b53b89..3d8f9e73030e 100644 --- a/presto-main/src/test/java/io/prestosql/sql/planner/TestLocalDynamicFiltersCollector.java +++ b/presto-main/src/test/java/io/prestosql/sql/planner/TestLocalDynamicFiltersCollector.java @@ -51,6 +51,7 @@ public void testSingle() // Filter is blocked and not completed. CompletableFuture isBlocked = filter.isBlocked(); assertFalse(filter.isComplete()); + assertTrue(filter.isAwaitable()); assertFalse(isBlocked.isDone()); assertEquals(filter.getCurrentPredicate(), TupleDomain.all()); @@ -59,6 +60,7 @@ public void testSingle() // Unblocked and completed. assertTrue(filter.isComplete()); + assertFalse(filter.isAwaitable()); assertTrue(isBlocked.isDone()); assertEquals(filter.getCurrentPredicate(), TupleDomain.withColumnDomains(ImmutableMap.of(column, domain))); } @@ -116,6 +118,7 @@ public void testMultipleProbeColumns() // Filter is blocked and not completed. CompletableFuture isBlocked = filter.isBlocked(); assertFalse(filter.isComplete()); + assertTrue(filter.isAwaitable()); assertFalse(isBlocked.isDone()); assertEquals(filter.getCurrentPredicate(), TupleDomain.all()); @@ -124,6 +127,7 @@ public void testMultipleProbeColumns() // Unblocked and completed. assertTrue(filter.isComplete()); + assertFalse(filter.isAwaitable()); assertTrue(isBlocked.isDone()); assertEquals(filter.getCurrentPredicate(), TupleDomain.withColumnDomains(ImmutableMap.of(column1, domain, column2, domain))); } @@ -149,6 +153,7 @@ public void testMultipleBuildColumnsSingleProbeColumn() // Filter is blocking and not completed. CompletableFuture isBlocked = filter.isBlocked(); assertFalse(filter.isComplete()); + assertTrue(filter.isAwaitable()); assertFalse(isBlocked.isDone()); assertEquals(filter.getCurrentPredicate(), TupleDomain.all()); @@ -157,6 +162,7 @@ public void testMultipleBuildColumnsSingleProbeColumn() // Unblocked, but not completed. assertFalse(filter.isComplete()); + assertTrue(filter.isAwaitable()); assertTrue(isBlocked.isDone()); assertEquals(filter.getCurrentPredicate(), TupleDomain.withColumnDomains( ImmutableMap.of(column, Domain.multipleValues(BIGINT, ImmutableList.of(1L, 2L, 3L))))); @@ -165,12 +171,14 @@ public void testMultipleBuildColumnsSingleProbeColumn() isBlocked = filter.isBlocked(); assertFalse(isBlocked.isDone()); assertFalse(filter.isComplete()); + assertTrue(filter.isAwaitable()); collector.collectDynamicFilterDomains( ImmutableMap.of(filter2, Domain.multipleValues(BIGINT, ImmutableList.of(2L, 3L, 4L)))); // Unblocked and completed. assertTrue(filter.isComplete()); + assertFalse(filter.isAwaitable()); assertTrue(isBlocked.isDone()); assertEquals(filter.getCurrentPredicate(), TupleDomain.withColumnDomains( ImmutableMap.of(column, Domain.multipleValues(BIGINT, ImmutableList.of(2L, 3L))))); @@ -195,6 +203,7 @@ public void testUnusedDynamicFilter() // Filter is blocking and not completed. CompletableFuture isBlocked = filter.isBlocked(); assertFalse(filter.isComplete()); + assertTrue(filter.isAwaitable()); assertFalse(isBlocked.isDone()); assertEquals(filter.getCurrentPredicate(), TupleDomain.all()); @@ -209,6 +218,7 @@ public void testUnusedDynamicFilter() // Unblocked and completed. assertTrue(filter.isComplete()); + assertFalse(filter.isAwaitable()); assertTrue(isBlocked.isDone()); assertEquals(filter.getCurrentPredicate(), TupleDomain.withColumnDomains(ImmutableMap.of(usedColumn, Domain.singleValue(BIGINT, 2L)))); } @@ -235,6 +245,7 @@ public void testUnregisteredDynamicFilter() // Filter is blocked and not completed. CompletableFuture isBlocked = filter.isBlocked(); assertFalse(filter.isComplete()); + assertTrue(filter.isAwaitable()); assertFalse(isBlocked.isDone()); assertEquals(filter.getCurrentPredicate(), TupleDomain.all()); @@ -242,6 +253,7 @@ public void testUnregisteredDynamicFilter() // Unblocked and completed (don't wait for filter2) assertTrue(filter.isComplete()); + assertFalse(filter.isAwaitable()); assertTrue(isBlocked.isDone()); assertEquals(filter.getCurrentPredicate(), TupleDomain.withColumnDomains(ImmutableMap.of(registeredColumn, Domain.singleValue(BIGINT, 2L)))); } diff --git a/presto-memory/src/main/java/io/prestosql/plugin/memory/MemoryPageSourceProvider.java b/presto-memory/src/main/java/io/prestosql/plugin/memory/MemoryPageSourceProvider.java index c9a02ff99738..4494c3de7596 100644 --- a/presto-memory/src/main/java/io/prestosql/plugin/memory/MemoryPageSourceProvider.java +++ b/presto-memory/src/main/java/io/prestosql/plugin/memory/MemoryPageSourceProvider.java @@ -119,7 +119,7 @@ public boolean isFinished() @Override public Page getNextPage() { - if (enableLazyDynamicFiltering && !dynamicFilter.isComplete()) { + if (enableLazyDynamicFiltering && dynamicFilter.isAwaitable()) { return null; } TupleDomain predicate = dynamicFilter.getCurrentPredicate(); diff --git a/presto-spi/src/main/java/io/prestosql/spi/connector/DynamicFilter.java b/presto-spi/src/main/java/io/prestosql/spi/connector/DynamicFilter.java index 4bbef8ed4170..fdcc92aedcce 100644 --- a/presto-spi/src/main/java/io/prestosql/spi/connector/DynamicFilter.java +++ b/presto-spi/src/main/java/io/prestosql/spi/connector/DynamicFilter.java @@ -35,6 +35,12 @@ public boolean isComplete() return true; } + @Override + public boolean isAwaitable() + { + return false; + } + @Override public TupleDomain getCurrentPredicate() { @@ -43,7 +49,9 @@ public TupleDomain getCurrentPredicate() }; /** - * Block until dynamic filter is narrowed down. + * Returned a future, which blocks until dynamic filter is narrowed down. Future + * completes immediately if filter cannot be narrowed down more or filter + * cannot be waited for (consult result of {@link DynamicFilter#isAwaitable()} method). * Dynamic filter might be narrowed down multiple times during query runtime. */ CompletableFuture isBlocked(); @@ -53,5 +61,12 @@ public TupleDomain getCurrentPredicate() */ boolean isComplete(); + /** + * Returns true if dynamic filter can be narrowed down more and + * {@link DynamicFilter#isBlocked()} method can be used to wait for + * narrowed filter. + */ + boolean isAwaitable(); + TupleDomain getCurrentPredicate(); } diff --git a/presto-tests/src/test/java/io/prestosql/execution/TestLazyCoordinatorDynamicFiltering.java b/presto-tests/src/test/java/io/prestosql/execution/TestLazyCoordinatorDynamicFiltering.java index 008237deb7af..1fe453c22bb1 100644 --- a/presto-tests/src/test/java/io/prestosql/execution/TestLazyCoordinatorDynamicFiltering.java +++ b/presto-tests/src/test/java/io/prestosql/execution/TestLazyCoordinatorDynamicFiltering.java @@ -211,17 +211,15 @@ public ConnectorSplitSource getSplits( @Override public CompletableFuture getNextBatch(ConnectorPartitionHandle partitionHandle, int maxSize) { - CompletableFuture blocked = dynamicFilter.isBlocked(); - - if (blocked.isDone()) { - splitProduced.set(true); - return completedFuture(new ConnectorSplitBatch(ImmutableList.of(new EmptySplit(new CatalogName("test"))), isFinished())); + if (dynamicFilter.isAwaitable()) { + return dynamicFilter.isBlocked().thenApply(ignored -> { + // yield until dynamic filter is fully loaded + return new ConnectorSplitBatch(ImmutableList.of(), false); + }); } - return blocked.thenApply(ignored -> { - // yield until dynamic filter is fully loaded - return new ConnectorSplitBatch(ImmutableList.of(), false); - }); + splitProduced.set(true); + return completedFuture(new ConnectorSplitBatch(ImmutableList.of(new EmptySplit(new CatalogName("test"))), isFinished())); } @Override @@ -232,7 +230,7 @@ public void close() @Override public boolean isFinished() { - if (!dynamicFilter.isComplete() || !splitProduced.get()) { + if (!splitProduced.get()) { return false; }