Skip to content

Commit

Permalink
Add DynamicFilter#isAwaitable method
Browse files Browse the repository at this point in the history
DynamicFilter#isAwaitable method can be used together with
DynamicFilter#isBlocked to wait for narrowed dynamic filters.
Some dynamic filters cannot be waited for (e.g for replicated joins).
Therefore existing DynamicFilter#isComplete method cannot be used
to determine if one can still wait for dynamic filter to be narrowed
(isComplete could return false while isBlocked would return completed
future immediatelly).
  • Loading branch information
sopel39 committed Sep 11, 2020
1 parent e7f8dd9 commit cb300dc
Show file tree
Hide file tree
Showing 9 changed files with 106 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ public TaskStatus process()
// 1. Completion of DynamicFilter
// 2. Timeout after waiting for the configured time
long timeLeft = dynamicFilteringProbeBlockingTimeoutMillis - stopwatch.elapsed(MILLISECONDS);
if (timeLeft > 0 && !dynamicFilter.isComplete()) {
if (timeLeft > 0 && dynamicFilter.isAwaitable()) {
future = toListenableFuture(dynamicFilter.isBlocked().orTimeout(timeLeft, MILLISECONDS));
return TaskStatus.continueOn(future);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,8 @@ public void testIncompleteDynamicFilterTimeout()
throws Exception
{
BackgroundHiveSplitLoader backgroundHiveSplitLoader = backgroundHiveSplitLoader(
new DynamicFilter() {
new DynamicFilter()
{
@Override
public CompletableFuture<?> isBlocked()
{
Expand All @@ -371,6 +372,12 @@ public boolean isComplete()
return false;
}

@Override
public boolean isAwaitable()
{
return true;
}

@Override
public TupleDomain<ColumnHandle> getCurrentPredicate()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@
import static io.airlift.concurrent.MoreFutures.unmodifiableFuture;
import static io.airlift.concurrent.MoreFutures.whenAnyComplete;
import static io.airlift.concurrent.Threads.daemonThreadsNamed;
import static io.prestosql.operator.JoinUtils.isBuildSideReplicated;
import static io.prestosql.spi.connector.DynamicFilter.EMPTY;
import static io.prestosql.spi.predicate.Domain.union;
import static io.prestosql.sql.DynamicFilters.extractDynamicFilters;
Expand Down Expand Up @@ -232,6 +231,13 @@ public boolean isComplete()
.allMatch(context.getDynamicFilterSummaries()::containsKey);
}

@Override
public boolean isAwaitable()
{
return lazyDynamicFilterFutures.stream()
.anyMatch(future -> !future.isDone());
}

@Override
public TupleDomain<ColumnHandle> getCurrentPredicate()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,12 @@ public synchronized boolean isComplete()
return futuresLeft == 0;
}

@Override
public synchronized boolean isAwaitable()
{
return futuresLeft > 0;
}

@Override
public synchronized TupleDomain<ColumnHandle> getCurrentPredicate()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ public void testDynamicFilter()

assertTrue(dynamicFilter.getCurrentPredicate().isAll());
assertFalse(dynamicFilter.isComplete());
assertTrue(dynamicFilter.isAwaitable());

// assert initial dynamic filtering stats
DynamicFiltersStats stats = dynamicFilterService.getDynamicFilteringStats(queryId, session);
Expand All @@ -192,6 +193,7 @@ public void testDynamicFilter()
// tuple domain from two tasks are needed for dynamic filter to be narrowed down
assertTrue(dynamicFilter.getCurrentPredicate().isAll());
assertFalse(dynamicFilter.isComplete());
assertTrue(dynamicFilter.isAwaitable());
assertFalse(blockedFuture.isDone());
assertEquals(dynamicFiltersStageSupplier.getRequestCount(), 1);

Expand All @@ -213,6 +215,7 @@ public void testDynamicFilter()

// there are still more dynamic filters to be collected
assertFalse(dynamicFilter.isComplete());
assertTrue(dynamicFilter.isAwaitable());
blockedFuture = dynamicFilter.isBlocked();
assertFalse(blockedFuture.isDone());

Expand All @@ -227,6 +230,7 @@ public void testDynamicFilter()
new TestingColumnHandle("probeColumnA"),
multipleValues(INTEGER, ImmutableList.of(1L, 2L)))));
assertFalse(dynamicFilter.isComplete());
assertTrue(dynamicFilter.isAwaitable());
assertFalse(blockedFuture.isDone());
assertEquals(dynamicFiltersStageSupplier.getRequestCount(), 3);

Expand All @@ -251,6 +255,7 @@ public void testDynamicFilter()

// there are still more dynamic filters to be collected for columns A and B
assertFalse(dynamicFilter.isComplete());
assertTrue(dynamicFilter.isAwaitable());
blockedFuture = dynamicFilter.isBlocked();
assertFalse(blockedFuture.isDone());

Expand All @@ -265,6 +270,7 @@ public void testDynamicFilter()
Symbol.from(df2), new TestingColumnHandle("probeColumnA")));

assertTrue(dynamicFilterColumnA.isComplete());
assertFalse(dynamicFilterColumnA.isAwaitable());
assertTrue(dynamicFilterColumnA.isBlocked().isDone());
assertEquals(dynamicFilterColumnA.getCurrentPredicate(), TupleDomain.withColumnDomains(ImmutableMap.of(
new TestingColumnHandle("probeColumnA"),
Expand All @@ -281,6 +287,7 @@ public void testDynamicFilter()
new TestingColumnHandle("probeColumnA"),
singleValue(INTEGER, 2L))));
assertFalse(dynamicFilter.isComplete());
assertTrue(dynamicFilter.isAwaitable());
assertFalse(blockedFuture.isDone());
assertEquals(dynamicFiltersStageSupplier.getRequestCount(), 5);

Expand Down Expand Up @@ -313,6 +320,7 @@ filterId2, getExpectedDomainString(2L, 3L), 2, 0),
// all dynamic filters have been collected, no need for more requests
dynamicFilterService.collectDynamicFilters();
assertTrue(dynamicFilter.isComplete());
assertFalse(dynamicFilter.isAwaitable());
assertTrue(dynamicFilter.isBlocked().isDone());
assertEquals(dynamicFiltersStageSupplier.getRequestCount(), 6);
}
Expand Down Expand Up @@ -391,7 +399,6 @@ public void testReplicatedDynamicFilter()
ImmutableMap.of(
Symbol.from(df1), new TestingColumnHandle("probeColumnA")));
assertTrue(dynamicFilter.getCurrentPredicate().isAll());
assertFalse(dynamicFilter.isComplete());

// assert initial dynamic filtering stats
DynamicFiltersStats stats = dynamicFilterService.getDynamicFilteringStats(queryId, session);
Expand All @@ -401,6 +408,8 @@ public void testReplicatedDynamicFilter()
assertEquals(stats.getLazyDynamicFilters(), 0);

// replicated dynamic filters cannot be lazy due to replicated join task scheduling dependencies
assertFalse(dynamicFilter.isComplete());
assertFalse(dynamicFilter.isAwaitable());
assertTrue(dynamicFilter.isBlocked().isDone());

dynamicFiltersStageSupplier.storeSummary(
Expand All @@ -414,6 +423,7 @@ public void testReplicatedDynamicFilter()
new TestingColumnHandle("probeColumnA"),
singleValue(INTEGER, 1L))));
assertTrue(dynamicFilter.isComplete());
assertFalse(dynamicFilter.isAwaitable());
assertEquals(dynamicFiltersStageSupplier.getRequestCount(), 1);

stats = dynamicFilterService.getDynamicFilteringStats(queryId, session);
Expand Down Expand Up @@ -483,6 +493,42 @@ public void testDynamicFilterCancellation()
ImmutableMap.of(column, multipleValues(INTEGER, ImmutableList.of(1L, 2L)))));
}

@Test
public void testIsAwaitable()
{
DynamicFilterService dynamicFilterService = new DynamicFilterService(new FeaturesConfig());
DynamicFilterId filterId1 = new DynamicFilterId("df1");
DynamicFilterId filterId2 = new DynamicFilterId("df2");
Expression symbol = new Symbol("symbol").toSymbolReference();
ColumnHandle handle = new TestingColumnHandle("probeColumnA");
QueryId queryId = new QueryId("query");
StageId stageId = new StageId(queryId, 1);

TestDynamicFiltersStageSupplier dynamicFiltersStageSupplier = new TestDynamicFiltersStageSupplier(SCHEDULING);
dynamicFiltersStageSupplier.addTasks(ImmutableList.of(new TaskId(stageId, 0)));

dynamicFilterService.registerQuery(
queryId,
dynamicFiltersStageSupplier,
ImmutableSet.of(filterId1, filterId2),
ImmutableSet.of(filterId1),
ImmutableSet.of());

DynamicFilter dynamicFilter1 = dynamicFilterService.createDynamicFilter(
queryId,
ImmutableList.of(new DynamicFilters.Descriptor(filterId1, symbol)),
ImmutableMap.of(Symbol.from(symbol), handle));

DynamicFilter dynamicFilter2 = dynamicFilterService.createDynamicFilter(
queryId,
ImmutableList.of(new DynamicFilters.Descriptor(filterId2, symbol)),
ImmutableMap.of(Symbol.from(symbol), handle));

assertTrue(dynamicFilter1.isAwaitable());
// non lazy dynamic filters are marked as non-awaitable
assertFalse(dynamicFilter2.isAwaitable());
}

@Test
public void testMultipleColumnMapping()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public void testSingle()
// Filter is blocked and not completed.
CompletableFuture<?> isBlocked = filter.isBlocked();
assertFalse(filter.isComplete());
assertTrue(filter.isAwaitable());
assertFalse(isBlocked.isDone());
assertEquals(filter.getCurrentPredicate(), TupleDomain.all());

Expand All @@ -59,6 +60,7 @@ public void testSingle()

// Unblocked and completed.
assertTrue(filter.isComplete());
assertFalse(filter.isAwaitable());
assertTrue(isBlocked.isDone());
assertEquals(filter.getCurrentPredicate(), TupleDomain.withColumnDomains(ImmutableMap.of(column, domain)));
}
Expand Down Expand Up @@ -116,6 +118,7 @@ public void testMultipleProbeColumns()
// Filter is blocked and not completed.
CompletableFuture<?> isBlocked = filter.isBlocked();
assertFalse(filter.isComplete());
assertTrue(filter.isAwaitable());
assertFalse(isBlocked.isDone());
assertEquals(filter.getCurrentPredicate(), TupleDomain.all());

Expand All @@ -124,6 +127,7 @@ public void testMultipleProbeColumns()

// Unblocked and completed.
assertTrue(filter.isComplete());
assertFalse(filter.isAwaitable());
assertTrue(isBlocked.isDone());
assertEquals(filter.getCurrentPredicate(), TupleDomain.withColumnDomains(ImmutableMap.of(column1, domain, column2, domain)));
}
Expand All @@ -149,6 +153,7 @@ public void testMultipleBuildColumnsSingleProbeColumn()
// Filter is blocking and not completed.
CompletableFuture<?> isBlocked = filter.isBlocked();
assertFalse(filter.isComplete());
assertTrue(filter.isAwaitable());
assertFalse(isBlocked.isDone());
assertEquals(filter.getCurrentPredicate(), TupleDomain.all());

Expand All @@ -157,6 +162,7 @@ public void testMultipleBuildColumnsSingleProbeColumn()

// Unblocked, but not completed.
assertFalse(filter.isComplete());
assertTrue(filter.isAwaitable());
assertTrue(isBlocked.isDone());
assertEquals(filter.getCurrentPredicate(), TupleDomain.withColumnDomains(
ImmutableMap.of(column, Domain.multipleValues(BIGINT, ImmutableList.of(1L, 2L, 3L)))));
Expand All @@ -165,12 +171,14 @@ public void testMultipleBuildColumnsSingleProbeColumn()
isBlocked = filter.isBlocked();
assertFalse(isBlocked.isDone());
assertFalse(filter.isComplete());
assertTrue(filter.isAwaitable());

collector.collectDynamicFilterDomains(
ImmutableMap.of(filter2, Domain.multipleValues(BIGINT, ImmutableList.of(2L, 3L, 4L))));

// Unblocked and completed.
assertTrue(filter.isComplete());
assertFalse(filter.isAwaitable());
assertTrue(isBlocked.isDone());
assertEquals(filter.getCurrentPredicate(), TupleDomain.withColumnDomains(
ImmutableMap.of(column, Domain.multipleValues(BIGINT, ImmutableList.of(2L, 3L)))));
Expand All @@ -195,6 +203,7 @@ public void testUnusedDynamicFilter()
// Filter is blocking and not completed.
CompletableFuture<?> isBlocked = filter.isBlocked();
assertFalse(filter.isComplete());
assertTrue(filter.isAwaitable());
assertFalse(isBlocked.isDone());
assertEquals(filter.getCurrentPredicate(), TupleDomain.all());

Expand All @@ -209,6 +218,7 @@ public void testUnusedDynamicFilter()

// Unblocked and completed.
assertTrue(filter.isComplete());
assertFalse(filter.isAwaitable());
assertTrue(isBlocked.isDone());
assertEquals(filter.getCurrentPredicate(), TupleDomain.withColumnDomains(ImmutableMap.of(usedColumn, Domain.singleValue(BIGINT, 2L))));
}
Expand All @@ -235,13 +245,15 @@ public void testUnregisteredDynamicFilter()
// Filter is blocked and not completed.
CompletableFuture<?> isBlocked = filter.isBlocked();
assertFalse(filter.isComplete());
assertTrue(filter.isAwaitable());
assertFalse(isBlocked.isDone());
assertEquals(filter.getCurrentPredicate(), TupleDomain.all());

collector.collectDynamicFilterDomains(ImmutableMap.of(registeredFilterId, Domain.singleValue(BIGINT, 2L)));

// Unblocked and completed (don't wait for filter2)
assertTrue(filter.isComplete());
assertFalse(filter.isAwaitable());
assertTrue(isBlocked.isDone());
assertEquals(filter.getCurrentPredicate(), TupleDomain.withColumnDomains(ImmutableMap.of(registeredColumn, Domain.singleValue(BIGINT, 2L))));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public boolean isFinished()
@Override
public Page getNextPage()
{
if (enableLazyDynamicFiltering && !dynamicFilter.isComplete()) {
if (enableLazyDynamicFiltering && dynamicFilter.isAwaitable()) {
return null;
}
TupleDomain<ColumnHandle> predicate = dynamicFilter.getCurrentPredicate();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ public boolean isComplete()
return true;
}

@Override
public boolean isAwaitable()
{
return false;
}

@Override
public TupleDomain<ColumnHandle> getCurrentPredicate()
{
Expand All @@ -43,7 +49,9 @@ public TupleDomain<ColumnHandle> getCurrentPredicate()
};

/**
* Block until dynamic filter is narrowed down.
* Returned a future, which blocks until dynamic filter is narrowed down. Future
* completes immediately if filter cannot be narrowed down more or filter
* cannot be waited for (consult result of {@link DynamicFilter#isAwaitable()} method).
* Dynamic filter might be narrowed down multiple times during query runtime.
*/
CompletableFuture<?> isBlocked();
Expand All @@ -53,5 +61,12 @@ public TupleDomain<ColumnHandle> getCurrentPredicate()
*/
boolean isComplete();

/**
* Returns true if dynamic filter can be narrowed down more and
* {@link DynamicFilter#isBlocked()} method can be used to wait for
* narrowed filter.
*/
boolean isAwaitable();

TupleDomain<ColumnHandle> getCurrentPredicate();
}
Original file line number Diff line number Diff line change
Expand Up @@ -211,17 +211,15 @@ public ConnectorSplitSource getSplits(
@Override
public CompletableFuture<ConnectorSplitBatch> getNextBatch(ConnectorPartitionHandle partitionHandle, int maxSize)
{
CompletableFuture<?> blocked = dynamicFilter.isBlocked();

if (blocked.isDone()) {
splitProduced.set(true);
return completedFuture(new ConnectorSplitBatch(ImmutableList.of(new EmptySplit(new CatalogName("test"))), isFinished()));
if (dynamicFilter.isAwaitable()) {
return dynamicFilter.isBlocked().thenApply(ignored -> {
// yield until dynamic filter is fully loaded
return new ConnectorSplitBatch(ImmutableList.of(), false);
});
}

return blocked.thenApply(ignored -> {
// yield until dynamic filter is fully loaded
return new ConnectorSplitBatch(ImmutableList.of(), false);
});
splitProduced.set(true);
return completedFuture(new ConnectorSplitBatch(ImmutableList.of(new EmptySplit(new CatalogName("test"))), isFinished()));
}

@Override
Expand All @@ -232,7 +230,7 @@ public void close()
@Override
public boolean isFinished()
{
if (!dynamicFilter.isComplete() || !splitProduced.get()) {
if (!splitProduced.get()) {
return false;
}

Expand Down

0 comments on commit cb300dc

Please sign in to comment.