Consider stage balancing only in UniformNodeSelector #10660

Merged · 1 commit · Jan 24, 2022
NodeSchedulerConfig.java
@@ -31,12 +31,18 @@ public enum NodeSchedulerPolicy
UNIFORM, TOPOLOGY
}

public enum SplitsBalancingPolicy
{
NODE, STAGE
}

private int minCandidates = 10;
private boolean includeCoordinator = true;
private int maxSplitsPerNode = 100;
private int maxPendingSplitsPerTask = 10;
private NodeSchedulerPolicy nodeSchedulerPolicy = NodeSchedulerPolicy.UNIFORM;
private boolean optimizedLocalScheduling = true;
private SplitsBalancingPolicy splitsBalancingPolicy = SplitsBalancingPolicy.STAGE;
private int maxUnacknowledgedSplitsPerTask = 500;

@NotNull
@@ -132,6 +138,20 @@ public NodeSchedulerConfig setMaxUnacknowledgedSplitsPerTask(int maxUnacknowledg
return this;
}

@NotNull
public SplitsBalancingPolicy getSplitsBalancingPolicy()
{
return splitsBalancingPolicy;
}

@Config("node-scheduler.splits-balancing-policy")
@ConfigDescription("Strategy for balancing new splits on worker nodes")
public NodeSchedulerConfig setSplitsBalancingPolicy(SplitsBalancingPolicy splitsBalancingPolicy)
{
this.splitsBalancingPolicy = splitsBalancingPolicy;
return this;
}

public boolean getOptimizedLocalScheduling()
{
return optimizedLocalScheduling;
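
The new property added above can be set cluster-wide; a minimal configuration sketch, assuming the usual etc/config.properties location (the default, per the diff, is stage, and the test below uses the lowercase spelling node):

    # etc/config.properties (hypothetical excerpt)
    # Balance new splits by each node's total assigned weight instead of the current stage's queue.
    node-scheduler.splits-balancing-policy=node
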
UniformNodeSelector.java
@@ -16,6 +16,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Suppliers;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
import com.google.common.collect.SetMultimap;
@@ -24,13 +25,16 @@
import io.trino.execution.NodeTaskMap;
import io.trino.execution.RemoteTask;
import io.trino.execution.resourcegroups.IndexedPriorityQueue;
import io.trino.execution.scheduler.NodeSchedulerConfig.SplitsBalancingPolicy;
import io.trino.metadata.InternalNode;
import io.trino.metadata.InternalNodeManager;
import io.trino.metadata.Split;
import io.trino.spi.HostAddress;
import io.trino.spi.SplitWeight;
import io.trino.spi.TrinoException;

import javax.annotation.Nullable;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collection;
@@ -68,6 +72,7 @@ public class UniformNodeSelector
private final long maxSplitsWeightPerNode;
private final long maxPendingSplitsWeightPerTask;
private final int maxUnacknowledgedSplitsPerTask;
private final SplitsBalancingPolicy splitsBalancingPolicy;
private final boolean optimizedLocalScheduling;

public UniformNodeSelector(
@@ -79,6 +84,7 @@ public UniformNodeSelector(
long maxSplitsWeightPerNode,
long maxPendingSplitsWeightPerTask,
int maxUnacknowledgedSplitsPerTask,
SplitsBalancingPolicy splitsBalancingPolicy,
boolean optimizedLocalScheduling)
{
this.nodeManager = requireNonNull(nodeManager, "nodeManager is null");
@@ -90,6 +96,7 @@
this.maxPendingSplitsWeightPerTask = maxPendingSplitsWeightPerTask;
this.maxUnacknowledgedSplitsPerTask = maxUnacknowledgedSplitsPerTask;
checkArgument(maxUnacknowledgedSplitsPerTask > 0, "maxUnacknowledgedSplitsPerTask must be > 0, found: %s", maxUnacknowledgedSplitsPerTask);
this.splitsBalancingPolicy = requireNonNull(splitsBalancingPolicy, "splitsBalancingPolicy is null");
this.optimizedLocalScheduling = optimizedLocalScheduling;
}

@@ -171,21 +178,12 @@ public SplitPlacementResult computeAssignments(Set<Split> splits, List<RemoteTas
throw new TrinoException(NO_NODES_AVAILABLE, "No nodes available to run query");
}

InternalNode chosenNode = null;
long minWeight = Long.MAX_VALUE;

for (InternalNode node : candidateNodes) {
long totalSplitsWeight = assignmentStats.getTotalSplitsWeight(node);
if (totalSplitsWeight < minWeight && totalSplitsWeight < maxSplitsWeightPerNode && assignmentStats.getUnacknowledgedSplitCountForStage(node) < maxUnacknowledgedSplitsPerTask) {
chosenNode = node;
minWeight = totalSplitsWeight;
}
}
InternalNode chosenNode = chooseNodeForSplit(assignmentStats, candidateNodes);
if (chosenNode == null) {
// minWeight is guaranteed to be MAX_VALUE at this line
long minWeight = Long.MAX_VALUE;
for (InternalNode node : candidateNodes) {
long queuedWeight = assignmentStats.getQueuedSplitsWeightForStage(node);
if (queuedWeight < minWeight && queuedWeight < maxPendingSplitsWeightPerTask && assignmentStats.getUnacknowledgedSplitCountForStage(node) < maxUnacknowledgedSplitsPerTask) {
if (queuedWeight <= minWeight && queuedWeight < maxPendingSplitsWeightPerTask && assignmentStats.getUnacknowledgedSplitCountForStage(node) < maxUnacknowledgedSplitsPerTask) {
chosenNode = node;
minWeight = queuedWeight;
}
@@ -226,6 +224,50 @@ public SplitPlacementResult computeAssignments(Set<Split> splits, List<RemoteTas
return selectDistributionNodes(nodeMap.get().get(), nodeTaskMap, maxSplitsWeightPerNode, maxPendingSplitsWeightPerTask, maxUnacknowledgedSplitsPerTask, splits, existingTasks, bucketNodeMap);
}

@Nullable
private InternalNode chooseNodeForSplit(NodeAssignmentStats assignmentStats, List<InternalNode> candidateNodes)
{
InternalNode chosenNode = null;
long minWeight = Long.MAX_VALUE;

List<InternalNode> freeNodes = getFreeNodesForStage(assignmentStats, candidateNodes);
switch (splitsBalancingPolicy) {
case STAGE:
for (InternalNode node : freeNodes) {
long queuedWeight = assignmentStats.getQueuedSplitsWeightForStage(node);
if (queuedWeight <= minWeight) {
chosenNode = node;
minWeight = queuedWeight;
}
}
break;
case NODE:
for (InternalNode node : freeNodes) {
long totalSplitsWeight = assignmentStats.getTotalSplitsWeight(node);
if (totalSplitsWeight <= minWeight) {
chosenNode = node;
minWeight = totalSplitsWeight;
}
}
break;
default:
throw new UnsupportedOperationException("Unsupported split balancing policy " + splitsBalancingPolicy);
}

return chosenNode;
}

private List<InternalNode> getFreeNodesForStage(NodeAssignmentStats assignmentStats, List<InternalNode> nodes)
{
ImmutableList.Builder<InternalNode> freeNodes = ImmutableList.builder();
for (InternalNode node : nodes) {
if (assignmentStats.getTotalSplitsWeight(node) < maxSplitsWeightPerNode && assignmentStats.getUnacknowledgedSplitCountForStage(node) < maxUnacknowledgedSplitsPerTask) {
freeNodes.add(node);
}
}
return freeNodes.build();
}

/**
* The method tries to make the distribution of splits more uniform. All nodes are arranged into a maxHeap and a minHeap
* based on the number of splits that are assigned to them. Splits are redistributed, one at a time, from a maxNode to a
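
To illustrate what the STAGE and NODE branches of chooseNodeForSplit above select, here is a self-contained sketch; the Candidate record, the worker names, and the weight values are hypothetical stand-ins for NodeAssignmentStats, not Trino code:

    import java.util.List;

    public class SplitsBalancingSketch
    {
        // Hypothetical stand-in for the per-node statistics UniformNodeSelector reads from NodeAssignmentStats.
        record Candidate(String node, long queuedWeightForStage, long totalSplitsWeight) {}

        enum SplitsBalancingPolicy { NODE, STAGE }

        // Mirrors the structure of chooseNodeForSplit: among "free" nodes, pick the one
        // with the lowest weight for the configured policy.
        static Candidate choose(List<Candidate> freeNodes, SplitsBalancingPolicy policy)
        {
            Candidate chosen = null;
            long minWeight = Long.MAX_VALUE;
            for (Candidate candidate : freeNodes) {
                long weight;
                switch (policy) {
                    case STAGE:
                        weight = candidate.queuedWeightForStage(); // balance the current stage's queued work
                        break;
                    case NODE:
                        weight = candidate.totalSplitsWeight();    // balance total work across all queries on the node
                        break;
                    default:
                        throw new UnsupportedOperationException("Unsupported split balancing policy " + policy);
                }
                if (weight <= minWeight) {
                    chosen = candidate;
                    minWeight = weight;
                }
            }
            return chosen;
        }

        public static void main(String[] args)
        {
            List<Candidate> nodes = List.of(
                    new Candidate("worker-1", 40, 900),    // heavier stage queue, lighter node overall
                    new Candidate("worker-2", 10, 1200));  // lighter stage queue, heavier node overall
            System.out.println(choose(nodes, SplitsBalancingPolicy.STAGE).node()); // prints worker-2
            System.out.println(choose(nodes, SplitsBalancingPolicy.NODE).node());  // prints worker-1
        }
    }

The two policies only diverge when a node is lightly loaded for the current stage but heavily loaded overall (or vice versa), which is exactly the case the example constructs.
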
UniformNodeSelectorFactory.java
@@ -23,6 +23,7 @@
import io.trino.Session;
import io.trino.connector.CatalogName;
import io.trino.execution.NodeTaskMap;
import io.trino.execution.scheduler.NodeSchedulerConfig.SplitsBalancingPolicy;
import io.trino.metadata.InternalNode;
import io.trino.metadata.InternalNodeManager;
import io.trino.spi.HostAddress;
@@ -59,6 +60,7 @@ public class UniformNodeSelectorFactory
private final boolean includeCoordinator;
private final long maxSplitsWeightPerNode;
private final long maxPendingSplitsWeightPerTask;
private final SplitsBalancingPolicy splitsBalancingPolicy;
private final boolean optimizedLocalScheduling;
private final NodeTaskMap nodeTaskMap;
private final Duration nodeMapMemoizationDuration;
@@ -86,6 +88,7 @@ public UniformNodeSelectorFactory(
this.nodeManager = nodeManager;
this.minCandidates = config.getMinCandidates();
this.includeCoordinator = config.isIncludeCoordinator();
this.splitsBalancingPolicy = config.getSplitsBalancingPolicy();
this.optimizedLocalScheduling = config.getOptimizedLocalScheduling();
this.nodeTaskMap = requireNonNull(nodeTaskMap, "nodeTaskMap is null");
int maxSplitsPerNode = config.getMaxSplitsPerNode();
@@ -122,6 +125,7 @@ public NodeSelector createNodeSelector(Session session, Optional<CatalogName> ca
maxSplitsWeightPerNode,
maxPendingSplitsWeightPerTask,
getMaxUnacknowledgedSplitsPerTask(session),
splitsBalancingPolicy,
optimizedLocalScheduling);
}

TestNodeSchedulerConfig.java
@@ -23,6 +23,7 @@
import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults;
import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults;
import static io.trino.execution.scheduler.NodeSchedulerConfig.NodeSchedulerPolicy.UNIFORM;
import static io.trino.execution.scheduler.NodeSchedulerConfig.SplitsBalancingPolicy.NODE;

public class TestNodeSchedulerConfig
{
@@ -36,6 +37,7 @@ public void testDefaults()
.setMaxPendingSplitsPerTask(10)
.setMaxUnacknowledgedSplitsPerTask(500)
.setIncludeCoordinator(true)
.setSplitsBalancingPolicy(NodeSchedulerConfig.SplitsBalancingPolicy.STAGE)
.setOptimizedLocalScheduling(true));
}

@@ -49,6 +51,7 @@ public void testExplicitPropertyMappings()
.put("node-scheduler.max-pending-splits-per-task", "11")
.put("node-scheduler.max-splits-per-node", "101")
.put("node-scheduler.max-unacknowledged-splits-per-task", "501")
.put("node-scheduler.splits-balancing-policy", "node")
.put("node-scheduler.optimized-local-scheduling", "false")
.build();

@@ -59,6 +62,7 @@ public void testExplicitPropertyMappings()
.setMaxPendingSplitsPerTask(11)
.setMaxUnacknowledgedSplitsPerTask(501)
.setMinCandidates(11)
.setSplitsBalancingPolicy(NODE)
.setOptimizedLocalScheduling(false);

assertFullMapping(properties, expected);