Consider stage balancing only in UniformNodeSelector
Previously, new stage splits were balanced against all splits
running on worker nodes (including splits from other stages and
queries). This led to non-deterministic scheduling where, in
certain situations, a stage with long-running splits could end up
fully scheduled on a subset of worker nodes. This PR makes
UniformNodeSelector consider stage balancing only on candidate
nodes that still have available split queue capacity.
sopel39 committed Jan 24, 2022
1 parent 08cd12e commit b1e66fc
Showing 5 changed files with 163 additions and 23 deletions.
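Note on configuration: the new behavior is controlled by the node-scheduler.splits-balancing-policy property added in this commit (values stage and node, default stage). A one-line sketch for a coordinator's config.properties, assuming you want the node policy, which roughly corresponds to the previous behavior of balancing against a node's total split weight:

node-scheduler.splits-balancing-policy=node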
NodeSchedulerConfig.java
@@ -31,12 +31,18 @@ public enum NodeSchedulerPolicy
        UNIFORM, TOPOLOGY
    }

    public enum SplitsBalancingPolicy
    {
        NODE, STAGE
    }

    private int minCandidates = 10;
    private boolean includeCoordinator = true;
    private int maxSplitsPerNode = 100;
    private int maxPendingSplitsPerTask = 10;
    private NodeSchedulerPolicy nodeSchedulerPolicy = NodeSchedulerPolicy.UNIFORM;
    private boolean optimizedLocalScheduling = true;
    private SplitsBalancingPolicy splitsBalancingPolicy = SplitsBalancingPolicy.STAGE;
    private int maxUnacknowledgedSplitsPerTask = 500;

    @NotNull
@@ -132,6 +138,20 @@ public NodeSchedulerConfig setMaxUnacknowledgedSplitsPerTask(int maxUnacknowledg
        return this;
    }

    @NotNull
    public SplitsBalancingPolicy getSplitsBalancingPolicy()
    {
        return splitsBalancingPolicy;
    }

    @Config("node-scheduler.splits-balancing-policy")
    @ConfigDescription("Strategy for balancing new splits on worker nodes")
    public NodeSchedulerConfig setSplitsBalancingPolicy(SplitsBalancingPolicy splitsBalancingPolicy)
    {
        this.splitsBalancingPolicy = splitsBalancingPolicy;
        return this;
    }

    public boolean getOptimizedLocalScheduling()
    {
        return optimizedLocalScheduling;
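For reference, a small sketch of how the new getter/setter pair on NodeSchedulerConfig behaves. The accessor and enum names come from the diff above; the wrapper class and main method are illustrative only:

import io.trino.execution.scheduler.NodeSchedulerConfig;
import io.trino.execution.scheduler.NodeSchedulerConfig.SplitsBalancingPolicy;

public class SplitsBalancingPolicyExample
{
    public static void main(String[] args)
    {
        // STAGE is the default declared in NodeSchedulerConfig above
        NodeSchedulerConfig config = new NodeSchedulerConfig();
        System.out.println(config.getSplitsBalancingPolicy()); // prints STAGE

        // the setter returns this, so it chains like the other config setters
        config.setSplitsBalancingPolicy(SplitsBalancingPolicy.NODE);
        System.out.println(config.getSplitsBalancingPolicy()); // prints NODE
    }
}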
UniformNodeSelector.java
@@ -16,6 +16,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Suppliers;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
import com.google.common.collect.SetMultimap;
@@ -24,13 +25,16 @@
import io.trino.execution.NodeTaskMap;
import io.trino.execution.RemoteTask;
import io.trino.execution.resourcegroups.IndexedPriorityQueue;
import io.trino.execution.scheduler.NodeSchedulerConfig.SplitsBalancingPolicy;
import io.trino.metadata.InternalNode;
import io.trino.metadata.InternalNodeManager;
import io.trino.metadata.Split;
import io.trino.spi.HostAddress;
import io.trino.spi.SplitWeight;
import io.trino.spi.TrinoException;

import javax.annotation.Nullable;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collection;
@@ -68,6 +72,7 @@ public class UniformNodeSelector
    private final long maxSplitsWeightPerNode;
    private final long maxPendingSplitsWeightPerTask;
    private final int maxUnacknowledgedSplitsPerTask;
    private final SplitsBalancingPolicy splitsBalancingPolicy;
    private final boolean optimizedLocalScheduling;

    public UniformNodeSelector(
@@ -79,6 +84,7 @@ public UniformNodeSelector(
            long maxSplitsWeightPerNode,
            long maxPendingSplitsWeightPerTask,
            int maxUnacknowledgedSplitsPerTask,
            SplitsBalancingPolicy splitsBalancingPolicy,
            boolean optimizedLocalScheduling)
    {
        this.nodeManager = requireNonNull(nodeManager, "nodeManager is null");
@@ -90,6 +96,7 @@
        this.maxPendingSplitsWeightPerTask = maxPendingSplitsWeightPerTask;
        this.maxUnacknowledgedSplitsPerTask = maxUnacknowledgedSplitsPerTask;
        checkArgument(maxUnacknowledgedSplitsPerTask > 0, "maxUnacknowledgedSplitsPerTask must be > 0, found: %s", maxUnacknowledgedSplitsPerTask);
        this.splitsBalancingPolicy = requireNonNull(splitsBalancingPolicy, "splitsBalancingPolicy is null");
        this.optimizedLocalScheduling = optimizedLocalScheduling;
    }

@@ -171,21 +178,12 @@ public SplitPlacementResult computeAssignments(Set<Split> splits, List<RemoteTas
                throw new TrinoException(NO_NODES_AVAILABLE, "No nodes available to run query");
            }

            InternalNode chosenNode = null;
            long minWeight = Long.MAX_VALUE;

            for (InternalNode node : candidateNodes) {
                long totalSplitsWeight = assignmentStats.getTotalSplitsWeight(node);
                if (totalSplitsWeight < minWeight && totalSplitsWeight < maxSplitsWeightPerNode && assignmentStats.getUnacknowledgedSplitCountForStage(node) < maxUnacknowledgedSplitsPerTask) {
                    chosenNode = node;
                    minWeight = totalSplitsWeight;
                }
            }
            InternalNode chosenNode = chooseNodeForSplit(assignmentStats, candidateNodes);
            if (chosenNode == null) {
                // minWeight is guaranteed to be MAX_VALUE at this line
                long minWeight = Long.MAX_VALUE;
                for (InternalNode node : candidateNodes) {
                    long queuedWeight = assignmentStats.getQueuedSplitsWeightForStage(node);
                    if (queuedWeight < minWeight && queuedWeight < maxPendingSplitsWeightPerTask && assignmentStats.getUnacknowledgedSplitCountForStage(node) < maxUnacknowledgedSplitsPerTask) {
                    if (queuedWeight <= minWeight && queuedWeight < maxPendingSplitsWeightPerTask && assignmentStats.getUnacknowledgedSplitCountForStage(node) < maxUnacknowledgedSplitsPerTask) {
                        chosenNode = node;
                        minWeight = queuedWeight;
                    }
@@ -226,6 +224,50 @@ public SplitPlacementResult computeAssignments(Set<Split> splits, List<RemoteTas
        return selectDistributionNodes(nodeMap.get().get(), nodeTaskMap, maxSplitsWeightPerNode, maxPendingSplitsWeightPerTask, maxUnacknowledgedSplitsPerTask, splits, existingTasks, bucketNodeMap);
    }

    @Nullable
    private InternalNode chooseNodeForSplit(NodeAssignmentStats assignmentStats, List<InternalNode> candidateNodes)
    {
        InternalNode chosenNode = null;
        long minWeight = Long.MAX_VALUE;

        List<InternalNode> freeNodes = getFreeNodesForStage(assignmentStats, candidateNodes);
        switch (splitsBalancingPolicy) {
            case STAGE:
                for (InternalNode node : freeNodes) {
                    long queuedWeight = assignmentStats.getQueuedSplitsWeightForStage(node);
                    if (queuedWeight <= minWeight) {
                        chosenNode = node;
                        minWeight = queuedWeight;
                    }
                }
                break;
            case NODE:
                for (InternalNode node : freeNodes) {
                    long totalSplitsWeight = assignmentStats.getTotalSplitsWeight(node);
                    if (totalSplitsWeight <= minWeight) {
                        chosenNode = node;
                        minWeight = totalSplitsWeight;
                    }
                }
                break;
            default:
                throw new UnsupportedOperationException("Unsupported split balancing policy " + splitsBalancingPolicy);
        }

        return chosenNode;
    }

    private List<InternalNode> getFreeNodesForStage(NodeAssignmentStats assignmentStats, List<InternalNode> nodes)
    {
        ImmutableList.Builder<InternalNode> freeNodes = ImmutableList.builder();
        for (InternalNode node : nodes) {
            if (assignmentStats.getTotalSplitsWeight(node) < maxSplitsWeightPerNode && assignmentStats.getUnacknowledgedSplitCountForStage(node) < maxUnacknowledgedSplitsPerTask) {
                freeNodes.add(node);
            }
        }
        return freeNodes.build();
    }

    /**
     * The method tries to make the distribution of splits more uniform. All nodes are arranged into a maxHeap and a minHeap
     * based on the number of splits that are assigned to them. Splits are redistributed, one at a time, from a maxNode to a
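To make the selection order above concrete, here is a simplified, self-contained sketch of the same two-step idea: first filter out nodes that are already at capacity, then balance only within the current stage. The Worker record, thresholds, and method names below are stand-ins for illustration; they are not Trino classes or API.

import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for NodeAssignmentStats plus InternalNode; illustration only.
record Worker(String name, long totalWeight, long stageQueuedWeight, int unacknowledged) {}

public class StageBalancingSketch
{
    static final long MAX_WEIGHT_PER_NODE = 100;
    static final int MAX_UNACKNOWLEDGED = 500;

    // Step 1: keep only nodes with spare capacity ("free" nodes).
    static List<Worker> freeNodes(List<Worker> candidates)
    {
        List<Worker> free = new ArrayList<>();
        for (Worker worker : candidates) {
            if (worker.totalWeight() < MAX_WEIGHT_PER_NODE && worker.unacknowledged() < MAX_UNACKNOWLEDGED) {
                free.add(worker);
            }
        }
        return free;
    }

    // Step 2 (STAGE policy): among free nodes, pick the one with the least work queued for this stage.
    static Worker chooseForStage(List<Worker> candidates)
    {
        Worker chosen = null;
        long minWeight = Long.MAX_VALUE;
        for (Worker worker : freeNodes(candidates)) {
            if (worker.stageQueuedWeight() <= minWeight) {
                chosen = worker;
                minWeight = worker.stageQueuedWeight();
            }
        }
        return chosen; // may be null if no node has spare capacity
    }

    public static void main(String[] args)
    {
        List<Worker> candidates = List.of(
                new Worker("worker-1", 90, 40, 10),   // busy overall, little stage work
                new Worker("worker-2", 20, 60, 10),   // idle overall, lots of stage work
                new Worker("worker-3", 150, 0, 10));  // over the node limit, filtered out
        // The STAGE policy prefers worker-1 despite its higher total load.
        System.out.println(chooseForStage(candidates).name());
    }
}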
UniformNodeSelectorFactory.java
@@ -22,6 +22,7 @@
import io.trino.Session;
import io.trino.connector.CatalogName;
import io.trino.execution.NodeTaskMap;
import io.trino.execution.scheduler.NodeSchedulerConfig.SplitsBalancingPolicy;
import io.trino.metadata.InternalNode;
import io.trino.metadata.InternalNodeManager;
import io.trino.plugin.base.cache.NonEvictableCache;
@@ -61,6 +62,7 @@ public class UniformNodeSelectorFactory
    private final boolean includeCoordinator;
    private final long maxSplitsWeightPerNode;
    private final long maxPendingSplitsWeightPerTask;
    private final SplitsBalancingPolicy splitsBalancingPolicy;
    private final boolean optimizedLocalScheduling;
    private final NodeTaskMap nodeTaskMap;
    private final Duration nodeMapMemoizationDuration;
@@ -88,6 +90,7 @@ public UniformNodeSelectorFactory(
        this.nodeManager = nodeManager;
        this.minCandidates = config.getMinCandidates();
        this.includeCoordinator = config.isIncludeCoordinator();
        this.splitsBalancingPolicy = config.getSplitsBalancingPolicy();
        this.optimizedLocalScheduling = config.getOptimizedLocalScheduling();
        this.nodeTaskMap = requireNonNull(nodeTaskMap, "nodeTaskMap is null");
        int maxSplitsPerNode = config.getMaxSplitsPerNode();
@@ -124,6 +127,7 @@ public NodeSelector createNodeSelector(Session session, Optional<CatalogName> ca
                maxSplitsWeightPerNode,
                maxPendingSplitsWeightPerTask,
                getMaxUnacknowledgedSplitsPerTask(session),
                splitsBalancingPolicy,
                optimizedLocalScheduling);
    }

TestNodeSchedulerConfig.java
@@ -23,6 +23,7 @@
import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults;
import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults;
import static io.trino.execution.scheduler.NodeSchedulerConfig.NodeSchedulerPolicy.UNIFORM;
import static io.trino.execution.scheduler.NodeSchedulerConfig.SplitsBalancingPolicy.NODE;

public class TestNodeSchedulerConfig
{
@@ -36,6 +37,7 @@ public void testDefaults()
                .setMaxPendingSplitsPerTask(10)
                .setMaxUnacknowledgedSplitsPerTask(500)
                .setIncludeCoordinator(true)
                .setSplitsBalancingPolicy(NodeSchedulerConfig.SplitsBalancingPolicy.STAGE)
                .setOptimizedLocalScheduling(true));
    }

@@ -49,6 +51,7 @@ public void testExplicitPropertyMappings()
                .put("node-scheduler.max-pending-splits-per-task", "11")
                .put("node-scheduler.max-splits-per-node", "101")
                .put("node-scheduler.max-unacknowledged-splits-per-task", "501")
                .put("node-scheduler.splits-balancing-policy", "node")
                .put("node-scheduler.optimized-local-scheduling", "false")
                .buildOrThrow();

Expand All @@ -59,6 +62,7 @@ public void testExplicitPropertyMappings()
.setMaxPendingSplitsPerTask(11)
.setMaxUnacknowledgedSplitsPerTask(501)
.setMinCandidates(11)
.setSplitsBalancingPolicy(NODE)
.setOptimizedLocalScheduling(false);

assertFullMapping(properties, expected);