diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/NodeSchedulerConfig.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/NodeSchedulerConfig.java
index 21abb8c1c070..f6479f3bbc37 100644
--- a/core/trino-main/src/main/java/io/trino/execution/scheduler/NodeSchedulerConfig.java
+++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/NodeSchedulerConfig.java
@@ -31,12 +31,18 @@ public enum NodeSchedulerPolicy
         UNIFORM, TOPOLOGY
     }
 
+    public enum SplitsBalancingPolicy
+    {
+        NODE, STAGE
+    }
+
     private int minCandidates = 10;
     private boolean includeCoordinator = true;
     private int maxSplitsPerNode = 100;
     private int maxPendingSplitsPerTask = 10;
     private NodeSchedulerPolicy nodeSchedulerPolicy = NodeSchedulerPolicy.UNIFORM;
     private boolean optimizedLocalScheduling = true;
+    private SplitsBalancingPolicy splitsBalancingPolicy = SplitsBalancingPolicy.STAGE;
     private int maxUnacknowledgedSplitsPerTask = 500;
 
     @NotNull
@@ -132,6 +138,20 @@ public NodeSchedulerConfig setMaxUnacknowledgedSplitsPerTask(int maxUnacknowledg
         return this;
     }
 
+    @NotNull
+    public SplitsBalancingPolicy getSplitsBalancingPolicy()
+    {
+        return splitsBalancingPolicy;
+    }
+
+    @Config("node-scheduler.splits-balancing-policy")
+    @ConfigDescription("Strategy for balancing new splits on worker nodes")
+    public NodeSchedulerConfig setSplitsBalancingPolicy(SplitsBalancingPolicy splitsBalancingPolicy)
+    {
+        this.splitsBalancingPolicy = splitsBalancingPolicy;
+        return this;
+    }
+
     public boolean getOptimizedLocalScheduling()
     {
         return optimizedLocalScheduling;
diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/UniformNodeSelector.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/UniformNodeSelector.java
index 73733206bbeb..cc6bcb0e9c2d 100644
--- a/core/trino-main/src/main/java/io/trino/execution/scheduler/UniformNodeSelector.java
+++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/UniformNodeSelector.java
@@ -16,6 +16,7 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Suppliers;
 import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.SetMultimap;
@@ -24,6 +25,7 @@
 import io.trino.execution.NodeTaskMap;
 import io.trino.execution.RemoteTask;
 import io.trino.execution.resourcegroups.IndexedPriorityQueue;
+import io.trino.execution.scheduler.NodeSchedulerConfig.SplitsBalancingPolicy;
 import io.trino.metadata.InternalNode;
 import io.trino.metadata.InternalNodeManager;
 import io.trino.metadata.Split;
@@ -31,6 +33,8 @@
 import io.trino.spi.SplitWeight;
 import io.trino.spi.TrinoException;
 
+import javax.annotation.Nullable;
+
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.Collection;
@@ -68,6 +72,7 @@ public class UniformNodeSelector
     private final long maxSplitsWeightPerNode;
     private final long maxPendingSplitsWeightPerTask;
     private final int maxUnacknowledgedSplitsPerTask;
+    private final SplitsBalancingPolicy splitsBalancingPolicy;
     private final boolean optimizedLocalScheduling;
 
     public UniformNodeSelector(
@@ -79,6 +84,7 @@ public UniformNodeSelector(
             long maxSplitsWeightPerNode,
             long maxPendingSplitsWeightPerTask,
             int maxUnacknowledgedSplitsPerTask,
+            SplitsBalancingPolicy splitsBalancingPolicy,
             boolean optimizedLocalScheduling)
     {
         this.nodeManager = requireNonNull(nodeManager, "nodeManager is null");
@@ -90,6 +96,7 @@ public UniformNodeSelector(
         this.maxPendingSplitsWeightPerTask = maxPendingSplitsWeightPerTask;
         this.maxUnacknowledgedSplitsPerTask = maxUnacknowledgedSplitsPerTask;
         checkArgument(maxUnacknowledgedSplitsPerTask > 0, "maxUnacknowledgedSplitsPerTask must be > 0, found: %s", maxUnacknowledgedSplitsPerTask);
+        this.splitsBalancingPolicy = requireNonNull(splitsBalancingPolicy, "splitsBalancingPolicy is null");
         this.optimizedLocalScheduling = optimizedLocalScheduling;
     }
 
@@ -171,21 +178,12 @@ public SplitPlacementResult computeAssignments(Set<Split> splits, List<RemoteTask> existingTasks)
+    @Nullable
+    private InternalNode chooseLeastBusyNode(NodeAssignmentStats assignmentStats, List<InternalNode> candidateNodes)
+    {
+        InternalNode chosenNode = null;
+        long minWeight = Long.MAX_VALUE;
+
+        List<InternalNode> freeNodes = getFreeNodesForStage(assignmentStats, candidateNodes);
+        switch (splitsBalancingPolicy) {
+            case STAGE:
+                for (InternalNode node : freeNodes) {
+                    long queuedWeight = assignmentStats.getQueuedSplitsWeightForStage(node);
+                    if (queuedWeight <= minWeight) {
+                        chosenNode = node;
+                        minWeight = queuedWeight;
+                    }
+                }
+                break;
+            case NODE:
+                for (InternalNode node : freeNodes) {
+                    long totalSplitsWeight = assignmentStats.getTotalSplitsWeight(node);
+                    if (totalSplitsWeight <= minWeight) {
+                        chosenNode = node;
+                        minWeight = totalSplitsWeight;
+                    }
+                }
+                break;
+            default:
+                throw new UnsupportedOperationException("Unsupported split balancing policy " + splitsBalancingPolicy);
+        }
+
+        return chosenNode;
+    }
+
+    private List<InternalNode> getFreeNodesForStage(NodeAssignmentStats assignmentStats, List<InternalNode> nodes)
+    {
+        ImmutableList.Builder<InternalNode> freeNodes = ImmutableList.builder();
+        for (InternalNode node : nodes) {
+            if (assignmentStats.getTotalSplitsWeight(node) < maxSplitsWeightPerNode && assignmentStats.getUnacknowledgedSplitCountForStage(node) < maxUnacknowledgedSplitsPerTask) {
+                freeNodes.add(node);
+            }
+        }
+        return freeNodes.build();
+    }
+
     /**
      * The method tries to make the distribution of splits more uniform. All nodes are arranged into a maxHeap and a minHeap
      * based on the number of splits that are assigned to them. Splits are redistributed, one at a time, from a maxNode to a
diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/UniformNodeSelectorFactory.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/UniformNodeSelectorFactory.java
index aecd01b08be3..8b6ee4488f55 100644
--- a/core/trino-main/src/main/java/io/trino/execution/scheduler/UniformNodeSelectorFactory.java
+++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/UniformNodeSelectorFactory.java
@@ -23,6 +23,7 @@
 import io.trino.Session;
 import io.trino.connector.CatalogName;
 import io.trino.execution.NodeTaskMap;
+import io.trino.execution.scheduler.NodeSchedulerConfig.SplitsBalancingPolicy;
 import io.trino.metadata.InternalNode;
 import io.trino.metadata.InternalNodeManager;
 import io.trino.spi.HostAddress;
@@ -59,6 +60,7 @@ public class UniformNodeSelectorFactory
     private final boolean includeCoordinator;
     private final long maxSplitsWeightPerNode;
     private final long maxPendingSplitsWeightPerTask;
+    private final SplitsBalancingPolicy splitsBalancingPolicy;
     private final boolean optimizedLocalScheduling;
     private final NodeTaskMap nodeTaskMap;
     private final Duration nodeMapMemoizationDuration;
@@ -86,6 +88,7 @@ public UniformNodeSelectorFactory(
         this.nodeManager = nodeManager;
         this.minCandidates = config.getMinCandidates();
         this.includeCoordinator = config.isIncludeCoordinator();
+        this.splitsBalancingPolicy = config.getSplitsBalancingPolicy();
         this.optimizedLocalScheduling = config.getOptimizedLocalScheduling();
         this.nodeTaskMap = requireNonNull(nodeTaskMap, "nodeTaskMap is null");
         int maxSplitsPerNode = config.getMaxSplitsPerNode();
@@ -122,6 +125,7 @@ public NodeSelector createNodeSelector(Session session, Optional<CatalogName> ca
                 maxSplitsWeightPerNode,
                 maxPendingSplitsWeightPerTask,
                 getMaxUnacknowledgedSplitsPerTask(session),
+                splitsBalancingPolicy,
                 optimizedLocalScheduling);
     }
 
diff --git a/core/trino-main/src/test/java/io/trino/execution/TestNodeSchedulerConfig.java b/core/trino-main/src/test/java/io/trino/execution/TestNodeSchedulerConfig.java
index 257bb2cec485..9181a7fe88f7 100644
--- a/core/trino-main/src/test/java/io/trino/execution/TestNodeSchedulerConfig.java
+++ b/core/trino-main/src/test/java/io/trino/execution/TestNodeSchedulerConfig.java
@@ -23,6 +23,7 @@
 import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults;
 import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults;
 import static io.trino.execution.scheduler.NodeSchedulerConfig.NodeSchedulerPolicy.UNIFORM;
+import static io.trino.execution.scheduler.NodeSchedulerConfig.SplitsBalancingPolicy.NODE;
 
 public class TestNodeSchedulerConfig
 {
@@ -36,6 +37,7 @@ public void testDefaults()
                 .setMaxPendingSplitsPerTask(10)
                 .setMaxUnacknowledgedSplitsPerTask(500)
                 .setIncludeCoordinator(true)
+                .setSplitsBalancingPolicy(NodeSchedulerConfig.SplitsBalancingPolicy.STAGE)
                 .setOptimizedLocalScheduling(true));
     }
 
@@ -49,6 +51,7 @@ public void testExplicitPropertyMappings()
                 .put("node-scheduler.max-pending-splits-per-task", "11")
                 .put("node-scheduler.max-splits-per-node", "101")
                 .put("node-scheduler.max-unacknowledged-splits-per-task", "501")
+                .put("node-scheduler.splits-balancing-policy", "node")
                 .put("node-scheduler.optimized-local-scheduling", "false")
                 .build();
 
@@ -59,6 +62,7 @@ public void testExplicitPropertyMappings()
                 .setMaxPendingSplitsPerTask(11)
                 .setMaxUnacknowledgedSplitsPerTask(501)
                 .setMinCandidates(11)
+                .setSplitsBalancingPolicy(NODE)
                 .setOptimizedLocalScheduling(false);
 
         assertFullMapping(properties, expected);
diff --git a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestSourcePartitionedScheduler.java b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestSourcePartitionedScheduler.java
index ea9fdd20526b..017e665ba2ef 100644
--- a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestSourcePartitionedScheduler.java
+++ b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestSourcePartitionedScheduler.java
@@ -31,6 +31,7 @@
 import io.trino.execution.StageId;
 import io.trino.execution.TableExecuteContextManager;
 import io.trino.execution.TableInfo;
+import io.trino.execution.scheduler.NodeSchedulerConfig.SplitsBalancingPolicy;
 import io.trino.failuredetector.NoOpFailureDetector;
 import io.trino.metadata.InMemoryNodeManager;
 import io.trino.metadata.InternalNode;
@@ -82,6 +83,8 @@
 import static com.google.common.base.Preconditions.checkArgument;
 import static io.airlift.concurrent.Threads.daemonThreadsNamed;
 import static io.trino.SessionTestUtils.TEST_SESSION;
+import static io.trino.execution.scheduler.NodeSchedulerConfig.SplitsBalancingPolicy.NODE;
+import static io.trino.execution.scheduler.NodeSchedulerConfig.SplitsBalancingPolicy.STAGE;
 import static io.trino.execution.scheduler.PipelinedStageExecution.createPipelinedStageExecution;
 import static io.trino.execution.scheduler.ScheduleResult.BlockedReason.SPLIT_QUEUES_FULL;
 import static io.trino.execution.scheduler.SourcePartitionedScheduler.newSourcePartitionedSchedulerAsStageScheduler;
@@ -152,7 +155,7 @@ public void testScheduleNoSplits()
         NodeTaskMap nodeTaskMap = new NodeTaskMap(finalizerService);
         StageExecution stage = createStageExecution(plan, nodeTaskMap);
 
-        StageScheduler scheduler = getSourcePartitionedScheduler(createFixedSplitSource(0, TestingSplit::createRemoteSplit), stage, nodeManager, nodeTaskMap, 1);
+        StageScheduler scheduler = getSourcePartitionedScheduler(createFixedSplitSource(0, TestingSplit::createRemoteSplit), stage, nodeManager, nodeTaskMap, 1, STAGE);
 
         ScheduleResult scheduleResult = scheduler.schedule();
 
@@ -169,7 +172,7 @@ public void testScheduleSplitsOneAtATime()
         NodeTaskMap nodeTaskMap = new NodeTaskMap(finalizerService);
         StageExecution stage = createStageExecution(plan, nodeTaskMap);
 
-        StageScheduler scheduler = getSourcePartitionedScheduler(createFixedSplitSource(60, TestingSplit::createRemoteSplit), stage, nodeManager, nodeTaskMap, 1);
+        StageScheduler scheduler = getSourcePartitionedScheduler(createFixedSplitSource(60, TestingSplit::createRemoteSplit), stage, nodeManager, nodeTaskMap, 1, STAGE);
 
         for (int i = 0; i < 60; i++) {
             ScheduleResult scheduleResult = scheduler.schedule();
@@ -207,7 +210,7 @@ public void testScheduleSplitsBatched()
         NodeTaskMap nodeTaskMap = new NodeTaskMap(finalizerService);
         StageExecution stage = createStageExecution(plan, nodeTaskMap);
 
-        StageScheduler scheduler = getSourcePartitionedScheduler(createFixedSplitSource(60, TestingSplit::createRemoteSplit), stage, nodeManager, nodeTaskMap, 7);
+        StageScheduler scheduler = getSourcePartitionedScheduler(createFixedSplitSource(60, TestingSplit::createRemoteSplit), stage, nodeManager, nodeTaskMap, 7, STAGE);
 
         for (int i = 0; i <= (60 / 7); i++) {
             ScheduleResult scheduleResult = scheduler.schedule();
@@ -245,7 +248,7 @@ public void testScheduleSplitsBlock()
         NodeTaskMap nodeTaskMap = new NodeTaskMap(finalizerService);
         StageExecution stage = createStageExecution(plan, nodeTaskMap);
 
-        StageScheduler scheduler = getSourcePartitionedScheduler(createFixedSplitSource(80, TestingSplit::createRemoteSplit), stage, nodeManager, nodeTaskMap, 1);
+        StageScheduler scheduler = getSourcePartitionedScheduler(createFixedSplitSource(80, TestingSplit::createRemoteSplit), stage, nodeManager, nodeTaskMap, 1, STAGE);
 
         // schedule first 60 splits, which will cause the scheduler to block
         for (int i = 0; i <= 60; i++) {
@@ -312,7 +315,7 @@ public void testScheduleSlowSplitSource()
         NodeTaskMap nodeTaskMap = new NodeTaskMap(finalizerService);
         StageExecution stage = createStageExecution(plan, nodeTaskMap);
 
-        StageScheduler scheduler = getSourcePartitionedScheduler(queuedSplitSource, stage, nodeManager, nodeTaskMap, 1);
+        StageScheduler scheduler = getSourcePartitionedScheduler(queuedSplitSource, stage, nodeManager, nodeTaskMap, 1, STAGE);
 
         // schedule with no splits - will block
         ScheduleResult scheduleResult = scheduler.schedule();
@@ -350,7 +353,7 @@ public void testNoNodes()
     }
 
     @Test
-    public void testBalancedSplitAssignment()
+    public void testWorkerBalancedSplitAssignment()
     {
         // use private node manager so we can add a node later
         InMemoryNodeManager nodeManager = new InMemoryNodeManager();
@@ -363,7 +366,7 @@ public void testBalancedSplitAssignment()
         // Schedule 15 splits - there are 3 nodes, each node should get 5 splits
         PlanFragment firstPlan = createFragment();
         StageExecution firstStage = createStageExecution(firstPlan, nodeTaskMap);
-        StageScheduler firstScheduler = getSourcePartitionedScheduler(createFixedSplitSource(15, TestingSplit::createRemoteSplit), firstStage, nodeManager, nodeTaskMap, 200);
+        StageScheduler firstScheduler = getSourcePartitionedScheduler(createFixedSplitSource(15, TestingSplit::createRemoteSplit), firstStage, nodeManager, nodeTaskMap, 200, NODE);
 
         ScheduleResult scheduleResult = firstScheduler.schedule();
         assertEffectivelyFinished(scheduleResult, firstScheduler);
@@ -382,7 +385,7 @@ public void testBalancedSplitAssignment()
         // Schedule 5 splits in another query. Since the new node does not have any splits, all 5 splits are assigned to the new node
         PlanFragment secondPlan = createFragment();
         StageExecution secondStage = createStageExecution(secondPlan, nodeTaskMap);
-        StageScheduler secondScheduler = getSourcePartitionedScheduler(createFixedSplitSource(5, TestingSplit::createRemoteSplit), secondStage, nodeManager, nodeTaskMap, 200);
+        StageScheduler secondScheduler = getSourcePartitionedScheduler(createFixedSplitSource(5, TestingSplit::createRemoteSplit), secondStage, nodeManager, nodeTaskMap, 200, NODE);
 
         scheduleResult = secondScheduler.schedule();
         assertEffectivelyFinished(scheduleResult, secondScheduler);
@@ -396,6 +399,71 @@ public void testBalancedSplitAssignment()
         secondStage.abort();
     }
 
+    @Test
+    public void testStageBalancedSplitAssignment()
+    {
+        // use private node manager so we can add a node later
+        InMemoryNodeManager nodeManager = new InMemoryNodeManager();
+        nodeManager.addNode(CONNECTOR_ID,
+                new InternalNode("other1", URI.create("http://127.0.0.1:11"), NodeVersion.UNKNOWN, false),
+                new InternalNode("other2", URI.create("http://127.0.0.1:12"), NodeVersion.UNKNOWN, false),
+                new InternalNode("other3", URI.create("http://127.0.0.1:13"), NodeVersion.UNKNOWN, false));
+        NodeTaskMap nodeTaskMap = new NodeTaskMap(finalizerService);
+
+        // Schedule 15 splits - there are 3 nodes, each node should get 5 splits
+        PlanFragment firstPlan = createFragment();
+        StageExecution firstStage = createStageExecution(firstPlan, nodeTaskMap);
+        QueuedSplitSource firstSplitSource = new QueuedSplitSource(TestingSplit::createRemoteSplit);
+        StageScheduler firstScheduler = getSourcePartitionedScheduler(firstSplitSource, firstStage, nodeManager, nodeTaskMap, 200, STAGE);
+        firstSplitSource.addSplits(15);
+
+        ScheduleResult scheduleResult = firstScheduler.schedule();
+        assertTrue(scheduleResult.getBlocked().isDone());
+        assertEquals(scheduleResult.getNewTasks().size(), 3);
+        assertEquals(firstStage.getAllTasks().size(), 3);
+        for (RemoteTask remoteTask : firstStage.getAllTasks()) {
+            PartitionedSplitsInfo splitsInfo = remoteTask.getPartitionedSplitsInfo();
+            assertEquals(splitsInfo.getCount(), 5);
+        }
+
+        // Add new node
+        InternalNode additionalNode = new InternalNode("other4", URI.create("http://127.0.0.1:14"), NodeVersion.UNKNOWN, false);
+        nodeManager.addNode(CONNECTOR_ID, additionalNode);
+
+        // Schedule 5 splits in first query. Since the new node does not have any splits, all 5 splits are assigned to the new node
+        firstSplitSource.addSplits(5);
+        firstSplitSource.close();
+        scheduleResult = firstScheduler.schedule();
+        assertEffectivelyFinished(scheduleResult, firstScheduler);
+        assertTrue(scheduleResult.getBlocked().isDone());
+        assertEquals(scheduleResult.getNewTasks().size(), 1);
+        assertEquals(firstStage.getAllTasks().size(), 4);
+        for (RemoteTask remoteTask : firstStage.getAllTasks()) {
+            PartitionedSplitsInfo splitsInfo = remoteTask.getPartitionedSplitsInfo();
+            assertEquals(splitsInfo.getCount(), 5);
+        }
+
+        // Add new node
+        InternalNode anotherAdditionalNode = new InternalNode("other5", URI.create("http://127.0.0.1:15"), NodeVersion.UNKNOWN, false);
+        nodeManager.addNode(CONNECTOR_ID, anotherAdditionalNode);
+
+        // Schedule 5 splits in another query. New query should be balanced across all nodes
+        PlanFragment secondPlan = createFragment();
+        StageExecution secondStage = createStageExecution(secondPlan, nodeTaskMap);
+        StageScheduler secondScheduler = getSourcePartitionedScheduler(createFixedSplitSource(5, TestingSplit::createRemoteSplit), secondStage, nodeManager, nodeTaskMap, 200, STAGE);
+
+        scheduleResult = secondScheduler.schedule();
+        assertEffectivelyFinished(scheduleResult, secondScheduler);
+        assertEquals(secondStage.getAllTasks().size(), 5);
+        for (RemoteTask remoteTask : secondStage.getAllTasks()) {
+            PartitionedSplitsInfo splitsInfo = remoteTask.getPartitionedSplitsInfo();
+            assertEquals(splitsInfo.getCount(), 1);
+        }
+
+        firstStage.abort();
+        secondStage.abort();
+    }
+
     @Test
     public void testNewTaskScheduledWhenChildStageBufferIsUnderutilized()
     {
@@ -553,13 +621,15 @@ private StageScheduler getSourcePartitionedScheduler(
             StageExecution stage,
             InternalNodeManager nodeManager,
             NodeTaskMap nodeTaskMap,
-            int splitBatchSize)
+            int splitBatchSize,
+            SplitsBalancingPolicy splitsBalancingPolicy)
     {
         NodeSchedulerConfig nodeSchedulerConfig = new NodeSchedulerConfig()
                 .setIncludeCoordinator(false)
                 .setMaxSplitsPerNode(20)
-                .setMaxPendingSplitsPerTask(0);
-        NodeScheduler nodeScheduler = new NodeScheduler(new UniformNodeSelectorFactory(nodeManager, nodeSchedulerConfig, nodeTaskMap));
+                .setMaxPendingSplitsPerTask(0)
+                .setSplitsBalancingPolicy(splitsBalancingPolicy);
+        NodeScheduler nodeScheduler = new NodeScheduler(new UniformNodeSelectorFactory(nodeManager, nodeSchedulerConfig, nodeTaskMap, new Duration(0, SECONDS)));
         SplitPlacementPolicy placementPolicy = new DynamicSplitPlacementPolicy(nodeScheduler.createNodeSelector(session, Optional.of(CONNECTOR_ID)), stage::getAllTasks);
         return newSourcePartitionedSchedulerAsStageScheduler(