Skip to content

Commit

Permalink
Support for lazy dynamic filters for replicated joins
Browse files Browse the repository at this point in the history
  • Loading branch information
sopel39 committed Sep 15, 2020
1 parent 0fb16ab commit 22e5d39
Show file tree
Hide file tree
Showing 11 changed files with 301 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,8 @@ private void planDistribution(PlanRoot plan)
rootOutputBuffers,
nodeTaskMap,
executionPolicy,
schedulerStats);
schedulerStats,
dynamicFilterService);

queryScheduler.set(scheduler);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.prestosql.metadata.InternalNode;
import io.prestosql.metadata.Split;
import io.prestosql.operator.StageExecutionDescriptor;
import io.prestosql.server.DynamicFilterService;
import io.prestosql.spi.connector.ConnectorPartitionHandle;
import io.prestosql.split.SplitSource;
import io.prestosql.sql.planner.plan.PlanNodeId;
Expand Down Expand Up @@ -73,7 +74,8 @@ public FixedSourcePartitionedScheduler(
int splitBatchSize,
OptionalInt concurrentLifespansPerTask,
NodeSelector nodeSelector,
List<ConnectorPartitionHandle> partitionHandles)
List<ConnectorPartitionHandle> partitionHandles,
DynamicFilterService dynamicFilterService)
{
requireNonNull(stage, "stage is null");
requireNonNull(splitSources, "splitSources is null");
Expand Down Expand Up @@ -116,6 +118,7 @@ public FixedSourcePartitionedScheduler(
splitPlacementPolicy,
Math.max(splitBatchSize / concurrentLifespans, 1),
groupedExecutionForScanNode,
dynamicFilterService,
() -> true);

if (stageExecutionDescriptor.isStageGroupedExecution() && !groupedExecutionForScanNode) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.prestosql.execution.scheduler.FixedSourcePartitionedScheduler.BucketedSplitPlacementPolicy;
import io.prestosql.metadata.InternalNode;
import io.prestosql.metadata.Split;
import io.prestosql.server.DynamicFilterService;
import io.prestosql.spi.connector.ConnectorPartitionHandle;
import io.prestosql.split.EmptySplit;
import io.prestosql.split.SplitSource;
Expand All @@ -40,6 +41,7 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.BooleanSupplier;

import static com.google.common.base.Preconditions.checkArgument;
Expand Down Expand Up @@ -91,6 +93,7 @@ private enum State
private final int splitBatchSize;
private final PlanNodeId partitionedNode;
private final boolean groupedExecution;
private final DynamicFilterService dynamicFilterService;
private final BooleanSupplier anySourceTaskBlocked;

private final Map<Lifespan, ScheduleGroup> scheduleGroups = new HashMap<>();
Expand All @@ -106,12 +109,14 @@ private SourcePartitionedScheduler(
SplitPlacementPolicy splitPlacementPolicy,
int splitBatchSize,
boolean groupedExecution,
DynamicFilterService dynamicFilterService,
BooleanSupplier anySourceTaskBlocked)
{
this.stage = requireNonNull(stage, "stage is null");
this.partitionedNode = requireNonNull(partitionedNode, "partitionedNode is null");
this.splitSource = requireNonNull(splitSource, "splitSource is null");
this.splitPlacementPolicy = requireNonNull(splitPlacementPolicy, "splitPlacementPolicy is null");
this.dynamicFilterService = requireNonNull(dynamicFilterService, "dynamicFilterService is null");
this.anySourceTaskBlocked = requireNonNull(anySourceTaskBlocked, "anySourceTaskBlocked is null");

checkArgument(splitBatchSize > 0, "splitBatchSize must be at least one");
Expand All @@ -138,6 +143,7 @@ public static StageScheduler newSourcePartitionedSchedulerAsStageScheduler(
SplitSource splitSource,
SplitPlacementPolicy splitPlacementPolicy,
int splitBatchSize,
DynamicFilterService dynamicFilterService,
BooleanSupplier anySourceTaskBlocked)
{
SourcePartitionedScheduler sourcePartitionedScheduler = new SourcePartitionedScheduler(
Expand All @@ -147,6 +153,7 @@ public static StageScheduler newSourcePartitionedSchedulerAsStageScheduler(
splitPlacementPolicy,
splitBatchSize,
false,
dynamicFilterService,
anySourceTaskBlocked);
sourcePartitionedScheduler.startLifespan(Lifespan.taskWide(), NOT_PARTITIONED);
sourcePartitionedScheduler.noMoreLifespans();
Expand Down Expand Up @@ -187,6 +194,7 @@ public static SourceScheduler newSourcePartitionedSchedulerAsSourceScheduler(
SplitPlacementPolicy splitPlacementPolicy,
int splitBatchSize,
boolean groupedExecution,
DynamicFilterService dynamicFilterService,
BooleanSupplier anySourceTaskBlocked)
{
return new SourcePartitionedScheduler(
Expand All @@ -196,6 +204,7 @@ public static SourceScheduler newSourcePartitionedSchedulerAsSourceScheduler(
splitPlacementPolicy,
splitBatchSize,
groupedExecution,
dynamicFilterService,
anySourceTaskBlocked);
}

Expand Down Expand Up @@ -366,10 +375,24 @@ else if (pendingSplits.isEmpty()) {
return new ScheduleResult(false, overallNewTasks.build(), overallSplitAssignmentCount);
}

if (anyBlockedOnNextSplitBatch
&& stage.getScheduledNodes().isEmpty()
&& dynamicFilterService.isCollectingTaskNeeded(stage.getStageId().getQueryId(), stage.getFragment())) {
// schedule a task for collecting dynamic filters in case probe split generator is waiting for them
overallNewTasks.addAll(createTaskOnRandomNode());
}

boolean anySourceTaskBlocked = this.anySourceTaskBlocked.getAsBoolean();
if (anySourceTaskBlocked) {
// Dynamic filters might not be collected due to build side source tasks being blocked on full buffer.
// In such case probe split generation that is waiting for dynamic filters should be unblocked to prevent deadlock.
dynamicFilterService.unblockStageDynamicFilters(stage.getStageId().getQueryId(), stage.getFragment());
}

if (groupedExecution) {
overallNewTasks.addAll(finalizeTaskCreationIfNecessary());
}
else if (anyBlockedOnPlacements && anySourceTaskBlocked.getAsBoolean()) {
else if (anyBlockedOnPlacements && anySourceTaskBlocked) {
// In a broadcast join, output buffers of the tasks in build source stage have to
// hold onto all data produced before probe side task scheduling finishes,
// even if the data is acknowledged by all known consumers. This is because
Expand Down Expand Up @@ -478,6 +501,15 @@ private Set<RemoteTask> assignSplits(Multimap<InternalNode, Split> splitAssignme
return newTasks.build();
}

private Set<RemoteTask> createTaskOnRandomNode()
{
checkState(stage.getScheduledNodes().isEmpty(), "Stage task is already scheduled on node");
List<InternalNode> allNodes = splitPlacementPolicy.allNodes();
checkState(allNodes.size() > 0, "No nodes available");
InternalNode node = allNodes.get(ThreadLocalRandom.current().nextInt(0, allNodes.size()));
return stage.scheduleSplits(node, ImmutableMultimap.of(), ImmutableMultimap.of());
}

private Set<RemoteTask> finalizeTaskCreationIfNecessary()
{
// only lock down tasks if there is a sub stage that could block waiting for this stage to create all tasks
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import io.prestosql.execution.buffer.OutputBuffers.OutputBufferId;
import io.prestosql.failuredetector.FailureDetector;
import io.prestosql.metadata.InternalNode;
import io.prestosql.server.DynamicFilterService;
import io.prestosql.server.DynamicFilterService.StageDynamicFilters;
import io.prestosql.spi.PrestoException;
import io.prestosql.spi.connector.ConnectorPartitionHandle;
Expand Down Expand Up @@ -120,6 +121,7 @@ public class SqlQueryScheduler
private final Map<StageId, StageLinkage> stageLinkages;
private final SplitSchedulerStats schedulerStats;
private final boolean summarizeTaskInfo;
private final DynamicFilterService dynamicFilterService;
private final AtomicBoolean started = new AtomicBoolean();

public static SqlQueryScheduler createSqlQueryScheduler(
Expand All @@ -137,7 +139,8 @@ public static SqlQueryScheduler createSqlQueryScheduler(
OutputBuffers rootOutputBuffers,
NodeTaskMap nodeTaskMap,
ExecutionPolicy executionPolicy,
SplitSchedulerStats schedulerStats)
SplitSchedulerStats schedulerStats,
DynamicFilterService dynamicFilterService)
{
SqlQueryScheduler sqlQueryScheduler = new SqlQueryScheduler(
queryStateMachine,
Expand All @@ -154,7 +157,8 @@ public static SqlQueryScheduler createSqlQueryScheduler(
rootOutputBuffers,
nodeTaskMap,
executionPolicy,
schedulerStats);
schedulerStats,
dynamicFilterService);
sqlQueryScheduler.initialize();
return sqlQueryScheduler;
}
Expand All @@ -174,12 +178,14 @@ private SqlQueryScheduler(
OutputBuffers rootOutputBuffers,
NodeTaskMap nodeTaskMap,
ExecutionPolicy executionPolicy,
SplitSchedulerStats schedulerStats)
SplitSchedulerStats schedulerStats,
DynamicFilterService dynamicFilterService)
{
this.queryStateMachine = requireNonNull(queryStateMachine, "queryStateMachine is null");
this.executionPolicy = requireNonNull(executionPolicy, "schedulerPolicyFactory is null");
this.schedulerStats = requireNonNull(schedulerStats, "schedulerStats is null");
this.summarizeTaskInfo = summarizeTaskInfo;
this.dynamicFilterService = requireNonNull(dynamicFilterService, "dynamicFilterService is null");

// todo come up with a better way to build this, or eliminate this map
ImmutableMap.Builder<StageId, StageScheduler> stageSchedulers = ImmutableMap.builder();
Expand Down Expand Up @@ -356,6 +362,7 @@ private List<SqlStageExecution> createStages(
splitSource,
placementPolicy,
splitBatchSize,
dynamicFilterService,
() -> childStages.stream().anyMatch(SqlStageExecution::isAnyTaskBlocked)));
}
else if (partitioningHandle.equals(SCALED_WRITER_DISTRIBUTION)) {
Expand Down Expand Up @@ -433,7 +440,8 @@ else if (partitioningHandle.equals(SCALED_WRITER_DISTRIBUTION)) {
splitBatchSize,
getConcurrentLifespansPerNode(session),
nodeScheduler.createNodeSelector(catalogName),
connectorPartitionHandles));
connectorPartitionHandles,
dynamicFilterService));
}
else {
// all sources are remote
Expand Down
15 changes: 13 additions & 2 deletions presto-main/src/main/java/io/prestosql/operator/JoinUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.prestosql.sql.planner.plan.JoinNode;
import io.prestosql.sql.planner.plan.PlanNode;
import io.prestosql.sql.planner.plan.ProjectNode;
import io.prestosql.sql.planner.plan.RemoteSourceNode;
import io.prestosql.sql.planner.plan.SemiJoinNode;
import io.prestosql.util.MorePredicates;

Expand Down Expand Up @@ -67,14 +68,14 @@ public static boolean isBuildSideReplicated(PlanNode node)
.recurseOnlyWhen(
MorePredicates.<PlanNode>isInstanceOfAny(ProjectNode.class)
.or(JoinUtils::isLocalRepartitionExchange))
.where(JoinUtils::isRemoteReplicatedExchange)
.where(joinNode -> isRemoteReplicatedExchange(joinNode) || isRemoteReplicatedSourceNode(joinNode))
.matches();
}
return PlanNodeSearcher.searchFrom(((SemiJoinNode) node).getFilteringSource())
.recurseOnlyWhen(
MorePredicates.<PlanNode>isInstanceOfAny(ProjectNode.class)
.or(JoinUtils::isLocalGatherExchange))
.where(JoinUtils::isRemoteReplicatedExchange)
.where(joinNode -> isRemoteReplicatedExchange(joinNode) || isRemoteReplicatedSourceNode(joinNode))
.matches();
}

Expand All @@ -88,6 +89,16 @@ private static boolean isRemoteReplicatedExchange(PlanNode node)
return exchangeNode.getScope() == REMOTE && exchangeNode.getType() == REPLICATE;
}

private static boolean isRemoteReplicatedSourceNode(PlanNode node)
{
if (!(node instanceof RemoteSourceNode)) {
return false;
}

RemoteSourceNode remoteSourceNode = (RemoteSourceNode) node;
return remoteSourceNode.getExchangeType() == REPLICATE;
}

private static boolean isLocalRepartitionExchange(PlanNode node)
{
if (!(node instanceof ExchangeNode)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.collect.Sets.difference;
import static com.google.common.collect.Sets.intersection;
import static com.google.common.collect.Sets.union;
import static io.airlift.concurrent.MoreFutures.toCompletableFuture;
import static io.airlift.concurrent.MoreFutures.unmodifiableFuture;
import static io.airlift.concurrent.MoreFutures.whenAnyComplete;
Expand All @@ -76,6 +78,7 @@
import static io.prestosql.spi.predicate.Domain.union;
import static io.prestosql.sql.DynamicFilters.extractDynamicFilters;
import static io.prestosql.sql.planner.ExpressionExtractor.extractExpressions;
import static io.prestosql.sql.planner.SystemPartitioningHandle.SOURCE_DISTRIBUTION;
import static io.prestosql.util.MorePredicates.isInstanceOfAny;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
Expand Down Expand Up @@ -131,7 +134,7 @@ public void registerQuery(SqlQueryExecution sqlQueryExecution, SubPlan fragmente
}

@VisibleForTesting
void registerQuery(
public void registerQuery(
QueryId queryId,
Supplier<List<StageDynamicFilters>> stageDynamicFiltersSupplier,
Set<DynamicFilterId> dynamicFilters,
Expand Down Expand Up @@ -189,6 +192,41 @@ public void removeQuery(QueryId queryId)
dynamicFilterContexts.remove(queryId);
}

/**
* Dynamic filters are collected in same stage as the join operator. This can result in deadlock
* for source stage joins and connectors that wait for dynamic filters before generating splits
* (probe splits might be blocked on dynamic filters which require at least one probe task in order to be collected).
* To overcome this issue an initial task is created for source stages running broadcast join operator.
* This task allows for dynamic filters collection without any probe side splits being scheduled.
*/
public boolean isCollectingTaskNeeded(QueryId queryId, PlanFragment plan)
{
DynamicFilterContext context = dynamicFilterContexts.get(queryId);
if (context == null) {
// query has been removed or not registered (e.g dynamic filtering is disabled)
return false;
}

return !getSourceStageInnerLazyDynamicFilters(plan).isEmpty();
}

/**
* Join build source tasks might become blocked waiting for join stage to collect build data.
* In such case dynamic filters must be unblocked (and probe split generation resumed) for
* source stage containing joins to allow build source tasks to flush data and complete.
*/
public void unblockStageDynamicFilters(QueryId queryId, PlanFragment plan)
{
DynamicFilterContext context = dynamicFilterContexts.get(queryId);
if (context == null) {
// query has been removed or not registered (e.g dynamic filtering is disabled)
return;
}

getSourceStageInnerLazyDynamicFilters(plan).forEach(filter ->
requireNonNull(context.getLazyDynamicFilters().get(filter), "Future not found").set(null));
}

public DynamicFilter createDynamicFilter(QueryId queryId, List<DynamicFilters.Descriptor> dynamicFilterDescriptors, Map<Symbol, ColumnHandle> columnHandles)
{
Multimap<DynamicFilterId, ColumnHandle> sourceColumnHandles = extractSourceColumnHandles(dynamicFilterDescriptors, columnHandles);
Expand Down Expand Up @@ -327,8 +365,28 @@ private static Multimap<DynamicFilterId, ColumnHandle> extractSourceColumnHandle

private static Set<DynamicFilterId> getLazyDynamicFilters(PlanFragment plan)
{
// lazy dynamic filters cannot be consumed by the same stage where they are produced as it would result in query deadlock
return difference(getProducedDynamicFilters(plan.getRoot()), getConsumedDynamicFilters(plan.getRoot()));
// To prevent deadlock dynamic filter can be lazy only when:
// 1. it's consumed by different stage from where it's produced
// 2. or it's produced by replicated join in source stage. In such case an extra
// task is created that will collect dynamic filter and prevent deadlock.
Set<DynamicFilterId> interStageDynamicFilters = difference(getProducedDynamicFilters(plan.getRoot()), getConsumedDynamicFilters(plan.getRoot()));
return ImmutableSet.copyOf(union(interStageDynamicFilters, getSourceStageInnerLazyDynamicFilters(plan)));
}

@VisibleForTesting
static Set<DynamicFilterId> getSourceStageInnerLazyDynamicFilters(PlanFragment plan)
{
if (!plan.getPartitioning().equals(SOURCE_DISTRIBUTION)) {
// Only non-fixed source stages can have (replicated) lazy dynamic filters that are
// produced and consumed within stage. This is because for such stages an extra
// dynamic filtering collecting task can be added.
return ImmutableSet.of();
}

PlanNode planNode = plan.getRoot();
Set<DynamicFilterId> innerStageDynamicFilters = intersection(getProducedDynamicFilters(planNode), getConsumedDynamicFilters(planNode));
Set<DynamicFilterId> replicatedDynamicFilters = getReplicatedDynamicFilters(planNode);
return ImmutableSet.copyOf(intersection(innerStageDynamicFilters, replicatedDynamicFilters));
}

private static Set<DynamicFilterId> getReplicatedDynamicFilters(PlanNode planNode)
Expand Down Expand Up @@ -581,10 +639,7 @@ private void addDynamicFilters(Map<DynamicFilterId, Domain> newDynamicFilters)
{
newDynamicFilters.forEach((filter, domain) -> {
dynamicFilterSummaries.put(filter, domain);
SettableFuture<?> future = lazyDynamicFilters.get(filter);
if (future != null) {
checkState(future.set(null), "Same future set twice");
}
Optional.ofNullable(lazyDynamicFilters.get(filter)).ifPresent(future -> future.set(null));
});

// stop collecting dynamic filters for query when all dynamic filters have been collected
Expand Down
Loading

0 comments on commit 22e5d39

Please sign in to comment.