From 1eba1f372077d448e4e9b03a281cd3b7e0673302 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Osipiuk?= Date: Tue, 9 May 2023 18:13:37 +0200 Subject: [PATCH] Prioritize non-speculative tasks in BinPackingNodeAllocatorService --- .../BinPackingNodeAllocatorService.java | 82 ++++++++- ...ventDrivenFaultTolerantQueryScheduler.java | 6 +- .../execution/scheduler/NodeAllocator.java | 4 +- .../TestBinPackingNodeAllocator.java | 165 ++++++++++++------ 4 files changed, 192 insertions(+), 65 deletions(-) diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/BinPackingNodeAllocatorService.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/BinPackingNodeAllocatorService.java index 077d87d7e13..9f3fba8ff42 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/BinPackingNodeAllocatorService.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/BinPackingNodeAllocatorService.java @@ -44,6 +44,7 @@ import javax.inject.Inject; import java.time.Duration; +import java.util.Comparator; import java.util.Deque; import java.util.HashMap; import java.util.Iterator; @@ -195,6 +196,15 @@ void refreshNodePoolMemoryInfos() @VisibleForTesting synchronized void processPendingAcquires() + { + processPendingAcquires(false); + boolean hasNonSpeculativePendingAcquires = pendingAcquires.stream().anyMatch(pendingAcquire -> !pendingAcquire.isSpeculative()); + if (!hasNonSpeculativePendingAcquires) { + processPendingAcquires(true); + } + } + + private void processPendingAcquires(boolean processSpeculative) { // synchronized only for sake manual triggering in test code. 
In production code it should only be called by single thread Iterator iterator = pendingAcquires.iterator(); @@ -204,7 +214,8 @@ synchronized void processPendingAcquires() nodePoolMemoryInfos.get(), fulfilledAcquires, scheduleOnCoordinator, - taskRuntimeMemoryEstimationOverhead); + taskRuntimeMemoryEstimationOverhead, + !processSpeculative); // if we are processing non-speculative pending acquires we are ignoring speculative acquired ones while (iterator.hasNext()) { PendingAcquire pendingAcquire = iterator.next(); @@ -215,6 +226,10 @@ synchronized void processPendingAcquires() continue; } + if (pendingAcquire.isSpeculative() != processSpeculative) { + continue; + } + BinPackingSimulation.ReserveResult result = simulation.tryReserve(pendingAcquire); switch (result.getStatus()) { case RESERVED: @@ -262,9 +277,9 @@ public NodeAllocator getNodeAllocator(Session session) } @Override - public NodeLease acquire(NodeRequirements nodeRequirements, DataSize memoryRequirement) + public NodeLease acquire(NodeRequirements nodeRequirements, DataSize memoryRequirement, boolean speculative) { - BinPackingNodeLease nodeLease = new BinPackingNodeLease(memoryRequirement.toBytes()); + BinPackingNodeLease nodeLease = new BinPackingNodeLease(memoryRequirement.toBytes(), speculative); PendingAcquire pendingAcquire = new PendingAcquire(nodeRequirements, memoryRequirement, nodeLease, ticker); pendingAcquires.add(pendingAcquire); wakeupProcessPendingAcquires(); @@ -326,6 +341,11 @@ public void resetNoMatchingNodeFound() { noMatchingNodeStopwatch.reset(); } + + public boolean isSpeculative() + { + return lease.isSpeculative(); + } } private class BinPackingNodeLease @@ -335,10 +355,12 @@ private class BinPackingNodeLease private final AtomicBoolean released = new AtomicBoolean(); private final long memoryLease; private final AtomicReference taskId = new AtomicReference<>(); + private final AtomicBoolean speculative; - private BinPackingNodeLease(long memoryLease) + private 
BinPackingNodeLease(long memoryLease, boolean speculative) { this.memoryLease = memoryLease; + this.speculative = new AtomicBoolean(speculative); } @Override @@ -370,6 +392,21 @@ public void attachTaskId(TaskId taskId) } } + @Override + public void setSpeculative(boolean speculative) + { + checkArgument(!speculative || this.speculative.get(), "cannot make non-speculative task speculative"); // re-asserting "speculative" on a still-speculative lease is a no-op, not an error + boolean changed = !speculative && this.speculative.compareAndSet(true, false); // only a real speculative -> non-speculative flip needs a wakeup + if (changed) { + wakeupProcessPendingAcquires(); + } + } + + public boolean isSpeculative() + { + return speculative.get(); + } + public Optional getAttachedTaskId() { return Optional.ofNullable(this.taskId.get()); @@ -400,8 +437,10 @@ private static class BinPackingSimulation { private final NodesSnapshot nodesSnapshot; private final List allNodesSorted; + private final boolean ignoreAcquiredSpeculative; private final Map nodesRemainingMemory; private final Map nodesRemainingMemoryRuntimeAdjusted; + private final Map speculativeMemoryReserved; private final Map nodeMemoryPoolInfos; private final boolean scheduleOnCoordinator; @@ -411,13 +450,15 @@ public BinPackingSimulation( Map nodeMemoryPoolInfos, Set fulfilledAcquires, boolean scheduleOnCoordinator, - DataSize taskRuntimeMemoryEstimationOverhead) + DataSize taskRuntimeMemoryEstimationOverhead, + boolean ignoreAcquiredSpeculative) { this.nodesSnapshot = requireNonNull(nodesSnapshot, "nodesSnapshot is null"); // use same node ordering for each simulation this.allNodesSorted = nodesSnapshot.getAllNodes().stream() .sorted(comparing(InternalNode::getNodeIdentifier)) .collect(toImmutableList()); + this.ignoreAcquiredSpeculative = ignoreAcquiredSpeculative; requireNonNull(nodeMemoryPoolInfos, "nodeMemoryPoolInfos is null"); this.nodeMemoryPoolInfos = ImmutableMap.copyOf(nodeMemoryPoolInfos); @@ -437,11 +478,25 @@ public BinPackingSimulation( Map preReservedMemory = new HashMap<>(); SetMultimap fulfilledAcquiresByNode = HashMultimap.create(); for (BinPackingNodeLease fulfilledAcquire : 
fulfilledAcquires) { + if (ignoreAcquiredSpeculative && fulfilledAcquire.isSpeculative()) { + continue; + } InternalNode node = fulfilledAcquire.getAssignedNode(); fulfilledAcquiresByNode.put(node.getNodeIdentifier(), fulfilledAcquire); preReservedMemory.compute(node.getNodeIdentifier(), (key, prev) -> (prev == null ? 0L : prev) + fulfilledAcquire.getMemoryLease()); } + speculativeMemoryReserved = new HashMap<>(); + if (ignoreAcquiredSpeculative) { + for (BinPackingNodeLease fulfilledAcquire : fulfilledAcquires) { + if (!fulfilledAcquire.isSpeculative()) { + continue; + } + InternalNode node = fulfilledAcquire.getAssignedNode(); + speculativeMemoryReserved.compute(node.getNodeIdentifier(), (key, prev) -> (prev == null ? 0L : prev) + fulfilledAcquire.getMemoryLease()); + } + } + nodesRemainingMemory = new HashMap<>(); for (InternalNode node : nodesSnapshot.getAllNodes()) { MemoryPoolInfo memoryPoolInfo = nodeMemoryPoolInfos.get(node.getNodeIdentifier()); @@ -505,8 +560,12 @@ public ReserveResult tryReserve(PendingAcquire acquire) return ReserveResult.NONE_MATCHING; } + Comparator comparator = comparing(node -> nodesRemainingMemoryRuntimeAdjusted.get(node.getNodeIdentifier())); + if (ignoreAcquiredSpeculative) { + comparator = resolveTiesWithSpeculativeMemory(comparator); + } InternalNode selectedNode = candidates.stream() - .max(comparing(node -> nodesRemainingMemoryRuntimeAdjusted.get(node.getNodeIdentifier()))) + .max(comparator) .orElseThrow(); if (nodesRemainingMemoryRuntimeAdjusted.get(selectedNode.getNodeIdentifier()) >= acquire.getMemoryLease() || isNodeEmpty(selectedNode.getNodeIdentifier())) { @@ -524,13 +583,22 @@ public ReserveResult tryReserve(PendingAcquire acquire) // If selected node cannot be used right now, select best one ignoring runtime memory usage and reserve space there // for later use. This is important from algorithm liveliness perspective. 
If we did not reserve space for a task which // is too big to be scheduled right now, it could be starved by smaller tasks coming later. + Comparator fallbackComparator = comparing(node -> nodesRemainingMemory.get(node.getNodeIdentifier())); + if (ignoreAcquiredSpeculative) { + fallbackComparator = resolveTiesWithSpeculativeMemory(fallbackComparator); + } InternalNode fallbackNode = candidates.stream() - .max(comparing(node -> nodesRemainingMemory.get(node.getNodeIdentifier()))) + .max(fallbackComparator) .orElseThrow(); subtractFromRemainingMemory(fallbackNode.getNodeIdentifier(), acquire.getMemoryLease()); return ReserveResult.NOT_ENOUGH_RESOURCES_NOW; } + private Comparator resolveTiesWithSpeculativeMemory(Comparator comparator) + { + return comparator.thenComparing(node -> -speculativeMemoryReserved.getOrDefault(node.getNodeIdentifier(), 0L)); + } + private void subtractFromRemainingMemory(String nodeIdentifier, long memoryLease) { nodesRemainingMemoryRuntimeAdjusted.compute( diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenFaultTolerantQueryScheduler.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenFaultTolerantQueryScheduler.java index 5796a502265..a832aeea5bc 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenFaultTolerantQueryScheduler.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenFaultTolerantQueryScheduler.java @@ -921,7 +921,7 @@ private void scheduleTasks() continue; } MemoryRequirements memoryRequirements = stageExecution.getMemoryRequirements(partitionId); - NodeLease lease = nodeAllocator.acquire(nodeRequirements.get(), memoryRequirements.getRequiredMemory()); + NodeLease lease = nodeAllocator.acquire(nodeRequirements.get(), memoryRequirements.getRequiredMemory(), scheduledTask.isSpeculative()); lease.getNode().addListener(() -> eventQueue.add(Event.WAKE_UP), queryExecutor); 
preSchedulingTaskContexts.put(scheduledTask.task(), new PreSchedulingTaskContext(lease, scheduledTask.isSpeculative())); @@ -1146,7 +1146,9 @@ public void onSplitAssignment(SplitAssignmentEvent event) PreSchedulingTaskContext context = preSchedulingTaskContexts.get(prioritizedTask.task()); if (context != null) { // task is already waiting for node or for sink instance handle - context.setSpeculative(prioritizedTask.isSpeculative()); // update speculative flag + // update speculative flag + context.setSpeculative(prioritizedTask.isSpeculative()); + context.getNodeLease().setSpeculative(prioritizedTask.isSpeculative()); return; } schedulingQueue.addOrUpdate(prioritizedTask); diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/NodeAllocator.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/NodeAllocator.java index 6c105daf20c..dfd889c1842 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/NodeAllocator.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/NodeAllocator.java @@ -29,7 +29,7 @@ public interface NodeAllocator * * It is obligatory for the calling party to release all the leases they obtained via {@link NodeLease#release()}. 
*/ - NodeLease acquire(NodeRequirements nodeRequirements, DataSize memoryRequirement); + NodeLease acquire(NodeRequirements nodeRequirements, DataSize memoryRequirement, boolean speculative); @Override void close(); @@ -40,6 +40,8 @@ interface NodeLease default void attachTaskId(TaskId taskId) {} + void setSpeculative(boolean speculative); + void release(); } } diff --git a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestBinPackingNodeAllocator.java b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestBinPackingNodeAllocator.java index 06b2d27f730..ab1e8c716c3 100644 --- a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestBinPackingNodeAllocator.java +++ b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestBinPackingNodeAllocator.java @@ -150,19 +150,19 @@ public void testAllocateSimple() try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(SESSION)) { // first two allocations should not block - NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertAcquired(acquire1, NODE_1); - NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertAcquired(acquire2, NODE_2); // same for subsequent two allocation (each task requires 32GB and we have 2 nodes with 64GB each) - NodeAllocator.NodeLease acquire3 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire3 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertAcquired(acquire3, NODE_1); - NodeAllocator.NodeLease acquire4 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire4 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); 
assertAcquired(acquire4, NODE_2); // 5th allocation should block - NodeAllocator.NodeLease acquire5 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire5 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertNotAcquired(acquire5); // release acquire2 which uses @@ -174,7 +174,7 @@ public void testAllocateSimple() }); // try to acquire one more node (should block) - NodeAllocator.NodeLease acquire6 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire6 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertNotAcquired(acquire6); // add new node @@ -198,25 +198,25 @@ public void testAllocateDifferentSizes() setupNodeAllocatorService(nodeManager); try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(SESSION)) { - NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertAcquired(acquire1, NODE_1); - NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertAcquired(acquire2, NODE_2); - NodeAllocator.NodeLease acquire3 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE)); + NodeAllocator.NodeLease acquire3 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE), false); assertAcquired(acquire3, NODE_1); - NodeAllocator.NodeLease acquire4 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE)); + NodeAllocator.NodeLease acquire4 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE), false); assertAcquired(acquire4, NODE_2); - NodeAllocator.NodeLease acquire5 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE)); + NodeAllocator.NodeLease acquire5 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE), 
false); assertAcquired(acquire5, NODE_1); - NodeAllocator.NodeLease acquire6 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE)); + NodeAllocator.NodeLease acquire6 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE), false); assertAcquired(acquire6, NODE_2); // each of the nodes is filled in with 32+16+16 // try allocate 32 and 16 - NodeAllocator.NodeLease acquire7 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire7 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertNotAcquired(acquire7); - NodeAllocator.NodeLease acquire8 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE)); + NodeAllocator.NodeLease acquire8 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE), false); assertNotAcquired(acquire8); // free 16MB on NODE_1; @@ -244,25 +244,25 @@ public void testAllocateDifferentSizesOpportunisticAcquisition() setupNodeAllocatorService(nodeManager); try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(SESSION)) { - NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertAcquired(acquire1, NODE_1); - NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertAcquired(acquire2, NODE_2); - NodeAllocator.NodeLease acquire3 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE)); + NodeAllocator.NodeLease acquire3 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE), false); assertAcquired(acquire3, NODE_1); - NodeAllocator.NodeLease acquire4 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE)); + NodeAllocator.NodeLease acquire4 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE), false); assertAcquired(acquire4, NODE_2); - 
NodeAllocator.NodeLease acquire5 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE)); + NodeAllocator.NodeLease acquire5 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE), false); assertAcquired(acquire5, NODE_1); - NodeAllocator.NodeLease acquire6 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE)); + NodeAllocator.NodeLease acquire6 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE), false); assertAcquired(acquire6, NODE_2); // each of the nodes is filled in with 32+16+16 // try to allocate 32 and 16 - NodeAllocator.NodeLease acquire7 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire7 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertNotAcquired(acquire7); - NodeAllocator.NodeLease acquire8 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE)); + NodeAllocator.NodeLease acquire8 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE), false); assertNotAcquired(acquire8); // free 32MB on NODE_2; @@ -285,15 +285,15 @@ public void testAllocateReleaseBeforeAcquired() try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(SESSION)) { // first two allocations should not block - NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertAcquired(acquire1, NODE_1); - NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertAcquired(acquire2, NODE_1); // another two should block - NodeAllocator.NodeLease acquire3 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire3 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertNotAcquired(acquire3); - NodeAllocator.NodeLease acquire4 = 
nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire4 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertNotAcquired(acquire4); // releasing a blocked one should not unblock anything @@ -314,7 +314,7 @@ public void testNoMatchingNodeAvailable() try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(SESSION)) { // request a node with specific catalog (not present) - NodeAllocator.NodeLease acquireNoMatching = nodeAllocator.acquire(REQ_CATALOG_1, DataSize.of(64, GIGABYTE)); + NodeAllocator.NodeLease acquireNoMatching = nodeAllocator.acquire(REQ_CATALOG_1, DataSize.of(64, GIGABYTE), false); assertNotAcquired(acquireNoMatching); ticker.increment(59, TimeUnit.SECONDS); // still below timeout nodeAllocatorService.processPendingAcquires(); @@ -328,11 +328,11 @@ public void testNoMatchingNodeAvailable() nodeManager.addNodes(NODE_2); // we should be able to acquire the node now - NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_CATALOG_1, DataSize.of(64, GIGABYTE)); + NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_CATALOG_1, DataSize.of(64, GIGABYTE), false); assertAcquired(acquire1, NODE_2); // acquiring one more should block (only one acquire fits a node as we request 64GB) - NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_CATALOG_1, DataSize.of(64, GIGABYTE)); + NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_CATALOG_1, DataSize.of(64, GIGABYTE), false); assertNotAcquired(acquire2); // remove node with catalog @@ -360,8 +360,8 @@ public void testNoMatchingNodeAvailableTimeoutReset() try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(SESSION)) { // request a node with specific catalog (not present) - NodeAllocator.NodeLease acquireNoMatching1 = nodeAllocator.acquire(REQ_CATALOG_1, DataSize.of(64, GIGABYTE)); - NodeAllocator.NodeLease acquireNoMatching2 = nodeAllocator.acquire(REQ_CATALOG_1, DataSize.of(64, GIGABYTE)); 
+ NodeAllocator.NodeLease acquireNoMatching1 = nodeAllocator.acquire(REQ_CATALOG_1, DataSize.of(64, GIGABYTE), false); + NodeAllocator.NodeLease acquireNoMatching2 = nodeAllocator.acquire(REQ_CATALOG_1, DataSize.of(64, GIGABYTE), false); assertNotAcquired(acquireNoMatching1); assertNotAcquired(acquireNoMatching2); @@ -407,7 +407,7 @@ public void testRemoveAcquiredNode() setupNodeAllocatorService(nodeManager); try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(SESSION)) { - NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertAcquired(acquire1, NODE_1); // remove acquired node @@ -426,17 +426,17 @@ public void testAllocateNodeWithAddressRequirements() setupNodeAllocatorService(nodeManager); try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(SESSION)) { - NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_NODE_2, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_NODE_2, DataSize.of(32, GIGABYTE), false); assertAcquired(acquire1, NODE_2); - NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_NODE_2, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_NODE_2, DataSize.of(32, GIGABYTE), false); assertAcquired(acquire2, NODE_2); - NodeAllocator.NodeLease acquire3 = nodeAllocator.acquire(REQ_NODE_2, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire3 = nodeAllocator.acquire(REQ_NODE_2, DataSize.of(32, GIGABYTE), false); // no more place on NODE_2 assertNotAcquired(acquire3); // requests for other node are still good - NodeAllocator.NodeLease acquire4 = nodeAllocator.acquire(REQ_NODE_1, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire4 = nodeAllocator.acquire(REQ_NODE_1, DataSize.of(32, GIGABYTE), false); assertAcquired(acquire4, NODE_1); // release some space 
on NODE_2 @@ -454,7 +454,7 @@ public void testAllocateNotEnoughRuntimeMemory() try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(SESSION)) { // first allocation is fine - NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertAcquired(acquire1, NODE_1); acquire1.attachTaskId(taskId(1)); @@ -465,27 +465,27 @@ public void testAllocateNotEnoughRuntimeMemory() nodeAllocatorService.refreshNodePoolMemoryInfos(); // second allocation of 32GB should go to another node - NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertAcquired(acquire2, NODE_2); acquire2.attachTaskId(taskId(2)); // third allocation of 32GB should also use NODE_2 as there is not enough runtime memory on NODE_1 // second allocation of 32GB should go to another node - NodeAllocator.NodeLease acquire3 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire3 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertAcquired(acquire3, NODE_2); acquire3.attachTaskId(taskId(3)); // fourth allocation of 16 should fit on NODE_1 - NodeAllocator.NodeLease acquire4 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE)); + NodeAllocator.NodeLease acquire4 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE), false); assertAcquired(acquire4, NODE_1); acquire4.attachTaskId(taskId(4)); // fifth allocation of 16 should no longer fit on NODE_1. 
There is 16GB unreserved but only 15GB taking runtime usage into account - NodeAllocator.NodeLease acquire5 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE)); + NodeAllocator.NodeLease acquire5 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE), false); assertNotAcquired(acquire5); // even tiny allocations should not fit now - NodeAllocator.NodeLease acquire6 = nodeAllocator.acquire(REQ_NONE, DataSize.of(1, GIGABYTE)); + NodeAllocator.NodeLease acquire6 = nodeAllocator.acquire(REQ_NONE, DataSize.of(1, GIGABYTE), false); assertNotAcquired(acquire6); // if memory usage decreases on NODE_1 the pending 16GB allocation should complete @@ -511,7 +511,7 @@ public void testAllocateRuntimeMemoryDiscrepancies() // test when global memory usage on node is greater than per task usage try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(SESSION)) { // first allocation is fine - NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertAcquired(acquire1, NODE_1); acquire1.attachTaskId(taskId(1)); @@ -522,7 +522,7 @@ public void testAllocateRuntimeMemoryDiscrepancies() nodeAllocatorService.refreshNodePoolMemoryInfos(); // global (greater) memory usage should take precedence - NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertNotAcquired(acquire2); } @@ -530,7 +530,7 @@ public void testAllocateRuntimeMemoryDiscrepancies() // test when global memory usage on node is smaller than per task usage try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(SESSION)) { // first allocation is fine - NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire1 = 
nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertAcquired(acquire1, NODE_1); acquire1.attachTaskId(taskId(1)); @@ -541,7 +541,7 @@ public void testAllocateRuntimeMemoryDiscrepancies() nodeAllocatorService.refreshNodePoolMemoryInfos(); // per-task (greater) memory usage should take precedence - NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertNotAcquired(acquire2); } @@ -549,7 +549,7 @@ public void testAllocateRuntimeMemoryDiscrepancies() // test when per-task memory usage not present at all try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(SESSION)) { // first allocation is fine - NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertAcquired(acquire1, NODE_1); acquire1.attachTaskId(taskId(1)); @@ -558,7 +558,7 @@ public void testAllocateRuntimeMemoryDiscrepancies() nodeAllocatorService.refreshNodePoolMemoryInfos(); // global memory usage should be used (not per-task usage) - NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertNotAcquired(acquire2); } } @@ -572,10 +572,10 @@ public void testSpaceReservedOnPrimaryNodeIfNoNodeWithEnoughRuntimeMemoryAvailab // test when global memory usage on node is greater than per task usage try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(SESSION)) { // reserve 32GB on NODE_1 and 16GB on NODE_2 - NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); 
assertAcquired(acquire1, NODE_1); acquire1.attachTaskId(taskId(1)); - NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE)); + NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE), false); assertAcquired(acquire2, NODE_2); acquire2.attachTaskId(taskId(2)); @@ -591,11 +591,11 @@ public void testSpaceReservedOnPrimaryNodeIfNoNodeWithEnoughRuntimeMemoryAvailab // try to allocate 32GB task // it will not fit on neither of nodes. space should be reserved on NODE_2 as it has more memory available // when you do not take runtime memory into account - NodeAllocator.NodeLease acquire3 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire3 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertNotAcquired(acquire3); // to check that is the case try to allocate 20GB; NODE_1 should be picked - NodeAllocator.NodeLease acquire4 = nodeAllocator.acquire(REQ_NONE, DataSize.of(20, GIGABYTE)); + NodeAllocator.NodeLease acquire4 = nodeAllocator.acquire(REQ_NONE, DataSize.of(20, GIGABYTE), false); assertAcquired(acquire4, NODE_1); acquire4.attachTaskId(taskId(2)); } @@ -610,7 +610,7 @@ public void testAllocateWithRuntimeMemoryEstimateOverhead() // test when global memory usage on node is greater than per task usage try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(SESSION)) { // allocated 32GB - NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertAcquired(acquire1, NODE_1); acquire1.attachTaskId(taskId(1)); @@ -621,7 +621,7 @@ public void testAllocateWithRuntimeMemoryEstimateOverhead() nodeAllocatorService.refreshNodePoolMemoryInfos(); // including overhead node runtime usage is 30+4 = 34GB so another 32GB task will not fit - NodeAllocator.NodeLease acquire2 = 
nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertNotAcquired(acquire2); // decrease runtime usage to 28GB @@ -645,12 +645,67 @@ public void testStressAcquireRelease() try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(SESSION)) { for (int i = 0; i < 10_000_000; ++i) { - NodeAllocator.NodeLease lease = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease lease = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); lease.release(); } } } + @Test(timeOut = TEST_TIMEOUT) + public void testAllocateSpeculative() + { + InMemoryNodeManager nodeManager = new InMemoryNodeManager(NODE_1, NODE_2); + setupNodeAllocatorService(nodeManager); + + try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(SESSION)) { + // allocate two speculative tasks + NodeAllocator.NodeLease acquireSpeculative1 = nodeAllocator.acquire(REQ_NONE, DataSize.of(64, GIGABYTE), true); + assertAcquired(acquireSpeculative1, NODE_1); + NodeAllocator.NodeLease acquireSpeculative2 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), true); + assertAcquired(acquireSpeculative2, NODE_2); + + // non-speculative tasks should still get node + NodeAllocator.NodeLease acquireNonSpeculative1 = nodeAllocator.acquire(REQ_NONE, DataSize.of(64, GIGABYTE), false); + assertAcquired(acquireNonSpeculative1, NODE_2); + NodeAllocator.NodeLease acquireNonSpeculative2 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); + assertAcquired(acquireNonSpeculative2, NODE_1); + + // new speculative task will not fit (even tiny one) + NodeAllocator.NodeLease acquireSpeculative3 = nodeAllocator.acquire(REQ_NONE, DataSize.of(1, GIGABYTE), true); + assertNotAcquired(acquireSpeculative3); + + // if you switch it to non-speculative it will schedule + acquireSpeculative3.setSpeculative(false); + 
assertAcquired(acquireSpeculative3, NODE_1); + + // release all speculative tasks + acquireSpeculative1.release(); + acquireSpeculative2.release(); + acquireSpeculative3.release(); + + // we have 32G free on NODE_1 now + NodeAllocator.NodeLease acquireNonSpeculative4 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); + assertAcquired(acquireNonSpeculative4, NODE_1); + + // no place for speculative task + NodeAllocator.NodeLease acquireSpeculative4 = nodeAllocator.acquire(REQ_NONE, DataSize.of(1, GIGABYTE), true); + assertNotAcquired(acquireSpeculative4); + + // no place for another non-speculative task + NodeAllocator.NodeLease acquireNonSpeculative5 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); + assertNotAcquired(acquireNonSpeculative5); + + // release acquireNonSpeculative4 - a non-speculative task should be scheduled before speculative one + acquireNonSpeculative4.release(); + assertAcquired(acquireNonSpeculative5); + assertNotAcquired(acquireSpeculative4); + + // on subsequent release speculative task will get node + acquireNonSpeculative5.release(); + assertAcquired(acquireSpeculative4); + } + } + private TaskId taskId(int partition) { return new TaskId(new StageId("test_query", 0), partition, 0);