From 40d8992f097a5704ae1e546fafac799f4c42b2ed Mon Sep 17 00:00:00 2001
From: Karol Sobczak
Date: Tue, 18 Jan 2022 13:41:44 +0100
Subject: [PATCH] Consider stage balancing only in UniformNodeSelector

Previously, new splits for a stage were balanced against all splits
already running on the worker nodes, including splits from other stages
and queries. This leads to non-deterministic scheduling: in certain
situations a stage with long-running splits might be scheduled entirely
on a subset of the worker nodes. This PR makes UniformNodeSelector
consider stage balancing only on candidate nodes that have sufficient
room in their split queues.
---
 .../scheduler/NodeSchedulerConfig.java        | 20 ++++
 .../scheduler/UniformNodeSelector.java        | 66 ++++++++++---
 .../scheduler/UniformNodeSelectorFactory.java |  4 +
 .../execution/TestNodeSchedulerConfig.java    |  4 +
 .../TestSourcePartitionedScheduler.java       | 92 ++++++++++++++++---
 5 files changed, 163 insertions(+), 23 deletions(-)

diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/NodeSchedulerConfig.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/NodeSchedulerConfig.java
index 21abb8c1c070..f6479f3bbc37 100644
--- a/core/trino-main/src/main/java/io/trino/execution/scheduler/NodeSchedulerConfig.java
+++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/NodeSchedulerConfig.java
@@ -31,12 +31,18 @@ public enum NodeSchedulerPolicy
         UNIFORM, TOPOLOGY
     }
 
+    public enum SplitsBalancingPolicy
+    {
+        NODE, STAGE
+    }
+
     private int minCandidates = 10;
     private boolean includeCoordinator = true;
     private int maxSplitsPerNode = 100;
     private int maxPendingSplitsPerTask = 10;
     private NodeSchedulerPolicy nodeSchedulerPolicy = NodeSchedulerPolicy.UNIFORM;
     private boolean optimizedLocalScheduling = true;
+    private SplitsBalancingPolicy splitsBalancingPolicy = SplitsBalancingPolicy.STAGE;
     private int maxUnacknowledgedSplitsPerTask = 500;
 
     @NotNull
@@ -132,6 +138,20 @@ public NodeSchedulerConfig setMaxUnacknowledgedSplitsPerTask(int maxUnacknowledg
         return this;
     }
 
+    @NotNull
+    public SplitsBalancingPolicy getSplitsBalancingPolicy()
+    {
+        return splitsBalancingPolicy;
+    }
+
+    @Config("node-scheduler.splits-balancing-policy")
+    @ConfigDescription("Strategy for balancing new splits on worker nodes")
+    public NodeSchedulerConfig setSplitsBalancingPolicy(SplitsBalancingPolicy splitsBalancingPolicy)
+    {
+        this.splitsBalancingPolicy = splitsBalancingPolicy;
+        return this;
+    }
+
     public boolean getOptimizedLocalScheduling()
     {
         return optimizedLocalScheduling;
diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/UniformNodeSelector.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/UniformNodeSelector.java
index 73733206bbeb..cc6bcb0e9c2d 100644
--- a/core/trino-main/src/main/java/io/trino/execution/scheduler/UniformNodeSelector.java
+++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/UniformNodeSelector.java
@@ -16,6 +16,7 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Suppliers;
 import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.SetMultimap;
@@ -24,6 +25,7 @@
 import io.trino.execution.NodeTaskMap;
 import io.trino.execution.RemoteTask;
 import io.trino.execution.resourcegroups.IndexedPriorityQueue;
+import io.trino.execution.scheduler.NodeSchedulerConfig.SplitsBalancingPolicy;
 import io.trino.metadata.InternalNode;
 import io.trino.metadata.InternalNodeManager;
 import io.trino.metadata.Split;
@@ -31,6 +33,8 @@
 import io.trino.spi.SplitWeight;
 import io.trino.spi.TrinoException;
 
+import javax.annotation.Nullable;
+
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.Collection;
@@ -68,6 +72,7 @@ public class UniformNodeSelector
     private final long maxSplitsWeightPerNode;
     private final long maxPendingSplitsWeightPerTask;
     private final int maxUnacknowledgedSplitsPerTask;
+    private final SplitsBalancingPolicy splitsBalancingPolicy;
     private final boolean optimizedLocalScheduling;
 
     public UniformNodeSelector(
@@ -79,6 +84,7 @@ public UniformNodeSelector(
             long maxSplitsWeightPerNode,
             long maxPendingSplitsWeightPerTask,
             int maxUnacknowledgedSplitsPerTask,
+            SplitsBalancingPolicy splitsBalancingPolicy,
             boolean optimizedLocalScheduling)
     {
         this.nodeManager = requireNonNull(nodeManager, "nodeManager is null");
@@ -90,6 +96,7 @@ public UniformNodeSelector(
         this.maxPendingSplitsWeightPerTask = maxPendingSplitsWeightPerTask;
         this.maxUnacknowledgedSplitsPerTask = maxUnacknowledgedSplitsPerTask;
         checkArgument(maxUnacknowledgedSplitsPerTask > 0, "maxUnacknowledgedSplitsPerTask must be > 0, found: %s", maxUnacknowledgedSplitsPerTask);
+        this.splitsBalancingPolicy = requireNonNull(splitsBalancingPolicy, "splitsBalancingPolicy is null");
         this.optimizedLocalScheduling = optimizedLocalScheduling;
     }
@@ -171,21 +178,12 @@ public SplitPlacementResult computeAssignments(Set splits, List splits, List candidateNodes)
+    {
+        InternalNode chosenNode = null;
+        long minWeight = Long.MAX_VALUE;
+
+        List<InternalNode> freeNodes = getFreeNodesForStage(assignmentStats, candidateNodes);
+        switch (splitsBalancingPolicy) {
+            case STAGE:
+                for (InternalNode node : freeNodes) {
+                    long queuedWeight = assignmentStats.getQueuedSplitsWeightForStage(node);
+                    if (queuedWeight <= minWeight) {
+                        chosenNode = node;
+                        minWeight = queuedWeight;
+                    }
+                }
+                break;
+            case NODE:
+                for (InternalNode node : freeNodes) {
+                    long totalSplitsWeight = assignmentStats.getTotalSplitsWeight(node);
+                    if (totalSplitsWeight <= minWeight) {
+                        chosenNode = node;
+                        minWeight = totalSplitsWeight;
+                    }
+                }
+                break;
+            default:
+                throw new UnsupportedOperationException("Unsupported split balancing policy " + splitsBalancingPolicy);
+        }
+
+        return chosenNode;
+    }
+
+    private List<InternalNode> getFreeNodesForStage(NodeAssignmentStats assignmentStats, List<InternalNode> nodes)
+    {
+        ImmutableList.Builder<InternalNode> freeNodes = ImmutableList.builder();
+        for (InternalNode node : nodes) {
+            if (assignmentStats.getTotalSplitsWeight(node) < maxSplitsWeightPerNode && assignmentStats.getUnacknowledgedSplitCountForStage(node) < maxUnacknowledgedSplitsPerTask) {
+                freeNodes.add(node);
+            }
+        }
+        return freeNodes.build();
+    }
+
     /**
      * The method tries to make the distribution of splits more uniform. All nodes are arranged into a maxHeap and a minHeap
      * based on the number of splits that are assigned to them. Splits are redistributed, one at a time, from a maxNode to a
diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/UniformNodeSelectorFactory.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/UniformNodeSelectorFactory.java
index aecd01b08be3..8b6ee4488f55 100644
--- a/core/trino-main/src/main/java/io/trino/execution/scheduler/UniformNodeSelectorFactory.java
+++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/UniformNodeSelectorFactory.java
@@ -23,6 +23,7 @@
 import io.trino.Session;
 import io.trino.connector.CatalogName;
 import io.trino.execution.NodeTaskMap;
+import io.trino.execution.scheduler.NodeSchedulerConfig.SplitsBalancingPolicy;
 import io.trino.metadata.InternalNode;
 import io.trino.metadata.InternalNodeManager;
 import io.trino.spi.HostAddress;
@@ -59,6 +60,7 @@ public class UniformNodeSelectorFactory
     private final boolean includeCoordinator;
     private final long maxSplitsWeightPerNode;
     private final long maxPendingSplitsWeightPerTask;
+    private final SplitsBalancingPolicy splitsBalancingPolicy;
     private final boolean optimizedLocalScheduling;
     private final NodeTaskMap nodeTaskMap;
     private final Duration nodeMapMemoizationDuration;
@@ -86,6 +88,7 @@ public UniformNodeSelectorFactory(
         this.nodeManager = nodeManager;
         this.minCandidates = config.getMinCandidates();
         this.includeCoordinator = config.isIncludeCoordinator();
+        this.splitsBalancingPolicy = config.getSplitsBalancingPolicy();
         this.optimizedLocalScheduling = config.getOptimizedLocalScheduling();
         this.nodeTaskMap = requireNonNull(nodeTaskMap, "nodeTaskMap is null");
         int maxSplitsPerNode = config.getMaxSplitsPerNode();
@@ -122,6 +125,7 @@ public NodeSelector createNodeSelector(Session session, Optional ca
                 maxSplitsWeightPerNode,
                 maxPendingSplitsWeightPerTask,
                 getMaxUnacknowledgedSplitsPerTask(session),
+                splitsBalancingPolicy,
                 optimizedLocalScheduling);
     }
diff --git a/core/trino-main/src/test/java/io/trino/execution/TestNodeSchedulerConfig.java b/core/trino-main/src/test/java/io/trino/execution/TestNodeSchedulerConfig.java
index 257bb2cec485..9181a7fe88f7 100644
--- a/core/trino-main/src/test/java/io/trino/execution/TestNodeSchedulerConfig.java
+++ b/core/trino-main/src/test/java/io/trino/execution/TestNodeSchedulerConfig.java
@@ -23,6 +23,7 @@
 import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults;
 import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults;
 import static io.trino.execution.scheduler.NodeSchedulerConfig.NodeSchedulerPolicy.UNIFORM;
+import static io.trino.execution.scheduler.NodeSchedulerConfig.SplitsBalancingPolicy.NODE;
 
 public class TestNodeSchedulerConfig
 {
@@ -36,6 +37,7 @@ public void testDefaults()
                 .setMaxPendingSplitsPerTask(10)
                 .setMaxUnacknowledgedSplitsPerTask(500)
                 .setIncludeCoordinator(true)
+                .setSplitsBalancingPolicy(NodeSchedulerConfig.SplitsBalancingPolicy.STAGE)
                 .setOptimizedLocalScheduling(true));
     }
@@ -49,6 +51,7 @@ public void testExplicitPropertyMappings()
                 .put("node-scheduler.max-pending-splits-per-task", "11")
                 .put("node-scheduler.max-splits-per-node", "101")
                 .put("node-scheduler.max-unacknowledged-splits-per-task", "501")
+                .put("node-scheduler.splits-balancing-policy", "node")
                 .put("node-scheduler.optimized-local-scheduling", "false")
                 .build();
@@ -59,6 +62,7 @@ public void testExplicitPropertyMappings()
                 .setMaxPendingSplitsPerTask(11)
                 .setMaxUnacknowledgedSplitsPerTask(501)
                 .setMinCandidates(11)
+                .setSplitsBalancingPolicy(NODE)
                 .setOptimizedLocalScheduling(false);
 
         assertFullMapping(properties, expected);
diff --git a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestSourcePartitionedScheduler.java b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestSourcePartitionedScheduler.java
index ea9fdd20526b..017e665ba2ef 100644
--- a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestSourcePartitionedScheduler.java
+++ b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestSourcePartitionedScheduler.java
@@ -31,6 +31,7 @@
 import io.trino.execution.StageId;
 import io.trino.execution.TableExecuteContextManager;
 import io.trino.execution.TableInfo;
+import io.trino.execution.scheduler.NodeSchedulerConfig.SplitsBalancingPolicy;
 import io.trino.failuredetector.NoOpFailureDetector;
 import io.trino.metadata.InMemoryNodeManager;
 import io.trino.metadata.InternalNode;
@@ -82,6 +83,8 @@
 import static com.google.common.base.Preconditions.checkArgument;
 import static io.airlift.concurrent.Threads.daemonThreadsNamed;
 import static io.trino.SessionTestUtils.TEST_SESSION;
+import static io.trino.execution.scheduler.NodeSchedulerConfig.SplitsBalancingPolicy.NODE;
+import static io.trino.execution.scheduler.NodeSchedulerConfig.SplitsBalancingPolicy.STAGE;
 import static io.trino.execution.scheduler.PipelinedStageExecution.createPipelinedStageExecution;
 import static io.trino.execution.scheduler.ScheduleResult.BlockedReason.SPLIT_QUEUES_FULL;
 import static io.trino.execution.scheduler.SourcePartitionedScheduler.newSourcePartitionedSchedulerAsStageScheduler;
@@ -152,7 +155,7 @@ public void testScheduleNoSplits()
         NodeTaskMap nodeTaskMap = new NodeTaskMap(finalizerService);
         StageExecution stage = createStageExecution(plan, nodeTaskMap);
 
-        StageScheduler scheduler = getSourcePartitionedScheduler(createFixedSplitSource(0, TestingSplit::createRemoteSplit), stage, nodeManager, nodeTaskMap, 1);
+        StageScheduler scheduler = getSourcePartitionedScheduler(createFixedSplitSource(0, TestingSplit::createRemoteSplit), stage, nodeManager, nodeTaskMap, 1, STAGE);
 
         ScheduleResult scheduleResult = scheduler.schedule();
@@ -169,7 +172,7 @@ public void testScheduleSplitsOneAtATime()
         NodeTaskMap nodeTaskMap = new NodeTaskMap(finalizerService);
         StageExecution stage = createStageExecution(plan, nodeTaskMap);
 
-        StageScheduler scheduler = getSourcePartitionedScheduler(createFixedSplitSource(60, TestingSplit::createRemoteSplit), stage, nodeManager, nodeTaskMap, 1);
+        StageScheduler scheduler = getSourcePartitionedScheduler(createFixedSplitSource(60, TestingSplit::createRemoteSplit), stage, nodeManager, nodeTaskMap, 1, STAGE);
 
         for (int i = 0; i < 60; i++) {
             ScheduleResult scheduleResult = scheduler.schedule();
@@ -207,7 +210,7 @@ public void testScheduleSplitsBatched()
         NodeTaskMap nodeTaskMap = new NodeTaskMap(finalizerService);
         StageExecution stage = createStageExecution(plan, nodeTaskMap);
 
-        StageScheduler scheduler = getSourcePartitionedScheduler(createFixedSplitSource(60, TestingSplit::createRemoteSplit), stage, nodeManager, nodeTaskMap, 7);
+        StageScheduler scheduler = getSourcePartitionedScheduler(createFixedSplitSource(60, TestingSplit::createRemoteSplit), stage, nodeManager, nodeTaskMap, 7, STAGE);
 
         for (int i = 0; i <= (60 / 7); i++) {
             ScheduleResult scheduleResult = scheduler.schedule();
@@ -245,7 +248,7 @@ public void testScheduleSplitsBlock()
         NodeTaskMap nodeTaskMap = new NodeTaskMap(finalizerService);
         StageExecution stage = createStageExecution(plan, nodeTaskMap);
 
-        StageScheduler scheduler = getSourcePartitionedScheduler(createFixedSplitSource(80, TestingSplit::createRemoteSplit), stage, nodeManager, nodeTaskMap, 1);
+        StageScheduler scheduler = getSourcePartitionedScheduler(createFixedSplitSource(80, TestingSplit::createRemoteSplit), stage, nodeManager, nodeTaskMap, 1, STAGE);
 
         // schedule first 60 splits, which will cause the scheduler to block
         for (int i = 0; i <= 60; i++) {
@@ -312,7 +315,7 @@ public void testScheduleSlowSplitSource()
         NodeTaskMap nodeTaskMap = new NodeTaskMap(finalizerService);
         StageExecution stage = createStageExecution(plan, nodeTaskMap);
 
-        StageScheduler scheduler = getSourcePartitionedScheduler(queuedSplitSource, stage, nodeManager, nodeTaskMap, 1);
+        StageScheduler scheduler = getSourcePartitionedScheduler(queuedSplitSource, stage, nodeManager, nodeTaskMap, 1, STAGE);
 
         // schedule with no splits - will block
         ScheduleResult scheduleResult = scheduler.schedule();
@@ -350,7 +353,7 @@ public void testNoNodes()
     }
 
     @Test
-    public void testBalancedSplitAssignment()
+    public void testWorkerBalancedSplitAssignment()
     {
         // use private node manager so we can add a node later
         InMemoryNodeManager nodeManager = new InMemoryNodeManager();
@@ -363,7 +366,7 @@ public void testBalancedSplitAssignment()
         // Schedule 15 splits - there are 3 nodes, each node should get 5 splits
         PlanFragment firstPlan = createFragment();
         StageExecution firstStage = createStageExecution(firstPlan, nodeTaskMap);
-        StageScheduler firstScheduler = getSourcePartitionedScheduler(createFixedSplitSource(15, TestingSplit::createRemoteSplit), firstStage, nodeManager, nodeTaskMap, 200);
+        StageScheduler firstScheduler = getSourcePartitionedScheduler(createFixedSplitSource(15, TestingSplit::createRemoteSplit), firstStage, nodeManager, nodeTaskMap, 200, NODE);
 
         ScheduleResult scheduleResult = firstScheduler.schedule();
         assertEffectivelyFinished(scheduleResult, firstScheduler);
@@ -382,7 +385,7 @@ public void testBalancedSplitAssignment()
         // Schedule 5 splits in another query. Since the new node does not have any splits, all 5 splits are assigned to the new node
         PlanFragment secondPlan = createFragment();
         StageExecution secondStage = createStageExecution(secondPlan, nodeTaskMap);
-        StageScheduler secondScheduler = getSourcePartitionedScheduler(createFixedSplitSource(5, TestingSplit::createRemoteSplit), secondStage, nodeManager, nodeTaskMap, 200);
+        StageScheduler secondScheduler = getSourcePartitionedScheduler(createFixedSplitSource(5, TestingSplit::createRemoteSplit), secondStage, nodeManager, nodeTaskMap, 200, NODE);
 
         scheduleResult = secondScheduler.schedule();
         assertEffectivelyFinished(scheduleResult, secondScheduler);
@@ -396,6 +399,71 @@ public void testBalancedSplitAssignment()
         secondStage.abort();
     }
 
+    @Test
+    public void testStageBalancedSplitAssignment()
+    {
+        // use private node manager so we can add a node later
+        InMemoryNodeManager nodeManager = new InMemoryNodeManager();
+        nodeManager.addNode(CONNECTOR_ID,
+                new InternalNode("other1", URI.create("http://127.0.0.1:11"), NodeVersion.UNKNOWN, false),
+                new InternalNode("other2", URI.create("http://127.0.0.1:12"), NodeVersion.UNKNOWN, false),
+                new InternalNode("other3", URI.create("http://127.0.0.1:13"), NodeVersion.UNKNOWN, false));
+        NodeTaskMap nodeTaskMap = new NodeTaskMap(finalizerService);
+
+        // Schedule 15 splits - there are 3 nodes, each node should get 5 splits
+        PlanFragment firstPlan = createFragment();
+        StageExecution firstStage = createStageExecution(firstPlan, nodeTaskMap);
+        QueuedSplitSource firstSplitSource = new QueuedSplitSource(TestingSplit::createRemoteSplit);
+        StageScheduler firstScheduler = getSourcePartitionedScheduler(firstSplitSource, firstStage, nodeManager, nodeTaskMap, 200, STAGE);
+        firstSplitSource.addSplits(15);
+
+        ScheduleResult scheduleResult = firstScheduler.schedule();
+        assertTrue(scheduleResult.getBlocked().isDone());
+        assertEquals(scheduleResult.getNewTasks().size(), 3);
+        assertEquals(firstStage.getAllTasks().size(), 3);
+        for (RemoteTask remoteTask : firstStage.getAllTasks()) {
+            PartitionedSplitsInfo splitsInfo = remoteTask.getPartitionedSplitsInfo();
+            assertEquals(splitsInfo.getCount(), 5);
+        }
+
+        // Add new node
+        InternalNode additionalNode = new InternalNode("other4", URI.create("http://127.0.0.1:14"), NodeVersion.UNKNOWN, false);
+        nodeManager.addNode(CONNECTOR_ID, additionalNode);
+
+        // Schedule 5 splits in first query. Since the new node does not have any splits, all 5 splits are assigned to the new node
+        firstSplitSource.addSplits(5);
+        firstSplitSource.close();
+        scheduleResult = firstScheduler.schedule();
+        assertEffectivelyFinished(scheduleResult, firstScheduler);
+        assertTrue(scheduleResult.getBlocked().isDone());
+        assertEquals(scheduleResult.getNewTasks().size(), 1);
+        assertEquals(firstStage.getAllTasks().size(), 4);
+        for (RemoteTask remoteTask : firstStage.getAllTasks()) {
+            PartitionedSplitsInfo splitsInfo = remoteTask.getPartitionedSplitsInfo();
+            assertEquals(splitsInfo.getCount(), 5);
+        }
+
+        // Add new node
+        InternalNode anotherAdditionalNode = new InternalNode("other5", URI.create("http://127.0.0.1:15"), NodeVersion.UNKNOWN, false);
+        nodeManager.addNode(CONNECTOR_ID, anotherAdditionalNode);
+
+        // Schedule 5 splits in another query. New query should be balanced across all nodes
+        PlanFragment secondPlan = createFragment();
+        StageExecution secondStage = createStageExecution(secondPlan, nodeTaskMap);
+        StageScheduler secondScheduler = getSourcePartitionedScheduler(createFixedSplitSource(5, TestingSplit::createRemoteSplit), secondStage, nodeManager, nodeTaskMap, 200, STAGE);
+
+        scheduleResult = secondScheduler.schedule();
+        assertEffectivelyFinished(scheduleResult, secondScheduler);
+        assertEquals(secondStage.getAllTasks().size(), 5);
+        for (RemoteTask remoteTask : secondStage.getAllTasks()) {
+            PartitionedSplitsInfo splitsInfo = remoteTask.getPartitionedSplitsInfo();
+            assertEquals(splitsInfo.getCount(), 1);
+        }
+
+        firstStage.abort();
+        secondStage.abort();
+    }
+
     @Test
     public void testNewTaskScheduledWhenChildStageBufferIsUnderutilized()
     {
@@ -553,13 +621,15 @@ private StageScheduler getSourcePartitionedScheduler(
             StageExecution stage,
             InternalNodeManager nodeManager,
             NodeTaskMap nodeTaskMap,
-            int splitBatchSize)
+            int splitBatchSize,
+            SplitsBalancingPolicy splitsBalancingPolicy)
     {
         NodeSchedulerConfig nodeSchedulerConfig = new NodeSchedulerConfig()
                 .setIncludeCoordinator(false)
                 .setMaxSplitsPerNode(20)
-                .setMaxPendingSplitsPerTask(0);
-        NodeScheduler nodeScheduler = new NodeScheduler(new UniformNodeSelectorFactory(nodeManager, nodeSchedulerConfig, nodeTaskMap));
+                .setMaxPendingSplitsPerTask(0)
+                .setSplitsBalancingPolicy(splitsBalancingPolicy);
+        NodeScheduler nodeScheduler = new NodeScheduler(new UniformNodeSelectorFactory(nodeManager, nodeSchedulerConfig, nodeTaskMap, new Duration(0, SECONDS)));
         SplitPlacementPolicy placementPolicy = new DynamicSplitPlacementPolicy(nodeScheduler.createNodeSelector(session, Optional.of(CONNECTOR_ID)), stage::getAllTasks);
 
         return newSourcePartitionedSchedulerAsStageScheduler(
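
The standalone sketch below restates the selection rule that the new chooseNodeForSplit() follows, so the difference between the two policies is easier to see outside of Trino's scheduler classes. It is an illustration only, not code from this patch: the String node names, the weight maps, and the example numbers are hypothetical stand-ins for InternalNode and NodeAssignmentStats; only the selection logic mirrors the method added above.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Sketch of the split-balancing rule introduced by this patch.
public class SplitsBalancingSketch
{
    enum SplitsBalancingPolicy { NODE, STAGE }

    static String chooseNodeForSplit(
            SplitsBalancingPolicy policy,
            List<String> candidateNodes,
            Map<String, Long> totalSplitsWeight,       // everything queued on the worker (all queries)
            Map<String, Long> stageQueuedSplitsWeight, // queued on the worker for this stage only
            long maxSplitsWeightPerNode)
    {
        // Only nodes whose overall queue still has room are considered ("free" nodes),
        // mirroring getFreeNodesForStage() in the patch.
        List<String> freeNodes = new ArrayList<>();
        for (String node : candidateNodes) {
            if (totalSplitsWeight.getOrDefault(node, 0L) < maxSplitsWeightPerNode) {
                freeNodes.add(node);
            }
        }

        String chosenNode = null;
        long minWeight = Long.MAX_VALUE;
        for (String node : freeNodes) {
            // STAGE balances against this stage's queued work only,
            // NODE against everything queued on the worker.
            long weight = policy == SplitsBalancingPolicy.STAGE
                    ? stageQueuedSplitsWeight.getOrDefault(node, 0L)
                    : totalSplitsWeight.getOrDefault(node, 0L);
            if (weight <= minWeight) {
                chosenNode = node;
                minWeight = weight;
            }
        }
        return chosenNode;
    }

    public static void main(String[] args)
    {
        List<String> nodes = List.of("worker-a", "worker-b");
        // worker-a is busier overall, but this stage has queued more work on worker-b.
        Map<String, Long> total = Map.of("worker-a", 80L, "worker-b", 30L);
        Map<String, Long> stage = Map.of("worker-a", 0L, "worker-b", 10L);
        System.out.println(chooseNodeForSplit(SplitsBalancingPolicy.STAGE, nodes, total, stage, 100)); // worker-a
        System.out.println(chooseNodeForSplit(SplitsBalancingPolicy.NODE, nodes, total, stage, 100));  // worker-b
    }
}

In a running cluster the behavior would be chosen through the node-scheduler.splits-balancing-policy configuration property added by this patch (values node and stage), with stage as the default.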