From 71e02ce0eb63f0fe1a362fb4dd75447e2a01dfe4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Osipiuk?= Date: Tue, 9 May 2023 18:31:40 +0200 Subject: [PATCH 1/3] Remove allocatedMemory field The allocatedMemory memory map can be easily computed based on fulfilledAcquires in BinPackingSimulation constructor. It does not add significant amount of computation as we are iterating fulfilledAcquires there anyway. Removal of allocatedMemory simplify state of BinPackingNodeAllocatorService making it less prone to concurrency related issues. --- .../BinPackingNodeAllocatorService.java | 36 ++----------------- 1 file changed, 2 insertions(+), 34 deletions(-) diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/BinPackingNodeAllocatorService.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/BinPackingNodeAllocatorService.java index 60bdabb6ece3..077d87d7e13d 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/BinPackingNodeAllocatorService.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/BinPackingNodeAllocatorService.java @@ -51,9 +51,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.Semaphore; @@ -66,10 +64,8 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; -import static com.google.common.base.Verify.verify; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.Sets.newConcurrentHashSet; -import static io.airlift.concurrent.MoreFutures.getFutureValue; import static io.airlift.concurrent.Threads.daemonThreadsNamed; import static io.trino.SystemSessionProperties.getFaultTolerantExecutionTaskMemoryEstimationQuantile; import static io.trino.SystemSessionProperties.getFaultTolerantExecutionTaskMemoryGrowthFactor; @@ -104,7 +100,6 @@ public class BinPackingNodeAllocatorService private final DataSize taskRuntimeMemoryEstimationOverhead; private final Ticker ticker; - private final ConcurrentMap allocatedMemory = new ConcurrentHashMap<>(); private final Deque pendingAcquires = new ConcurrentLinkedDeque<>(); private final Set fulfilledAcquires = newConcurrentHashSet(); private final Duration allowedNoMatchingNodePeriod; @@ -208,7 +203,6 @@ synchronized void processPendingAcquires() nodeManager.getActiveNodesSnapshot(), nodePoolMemoryInfos.get(), fulfilledAcquires, - allocatedMemory, scheduleOnCoordinator, taskRuntimeMemoryEstimationOverhead); @@ -226,12 +220,9 @@ synchronized void processPendingAcquires() case RESERVED: InternalNode reservedNode = result.getNode().orElseThrow(); fulfilledAcquires.add(pendingAcquire.getLease()); - updateAllocatedMemory(reservedNode, pendingAcquire.getMemoryLease()); pendingAcquire.getFuture().set(reservedNode); if (pendingAcquire.getFuture().isCancelled()) { // completing future was unsuccessful - request was cancelled in the meantime - pendingAcquire.getLease().deallocateMemory(reservedNode); - fulfilledAcquires.remove(pendingAcquire.getLease()); // run once again when we are done @@ -288,20 +279,6 @@ public void close() // and that can be done before all leases are yet returned from running (soon to be failed) tasks. } - private void updateAllocatedMemory(InternalNode node, long delta) - { - allocatedMemory.compute( - node.getNodeIdentifier(), - (key, oldValue) -> { - verify(delta > 0 || (oldValue != null && oldValue >= -delta), "tried to release more than allocated (%s vs %s) for node %s", -delta, oldValue, key); - long newValue = oldValue == null ? delta : oldValue + delta; - if (newValue == 0) { - return null; // delete - } - return newValue; - }); - } - private static class PendingAcquire { private final NodeRequirements nodeRequirements; @@ -356,7 +333,6 @@ private class BinPackingNodeLease { private final SettableFuture node = SettableFuture.create(); private final AtomicBoolean released = new AtomicBoolean(); - private final AtomicBoolean memoryDeallocated = new AtomicBoolean(); private final long memoryLease; private final AtomicReference taskId = new AtomicReference<>(); @@ -410,7 +386,6 @@ public void release() if (released.compareAndSet(false, true)) { node.cancel(true); if (node.isDone() && !node.isCancelled()) { - deallocateMemory(getFutureValue(node)); checkState(fulfilledAcquires.remove(this), "node lease %s not found in fulfilledAcquires %s", this, fulfilledAcquires); wakeupProcessPendingAcquires(); } @@ -419,13 +394,6 @@ public void release() throw new IllegalStateException("Node " + node + " already released"); } } - - public void deallocateMemory(InternalNode node) - { - if (memoryDeallocated.compareAndSet(false, true)) { - updateAllocatedMemory(node, -memoryLease); - } - } } private static class BinPackingSimulation @@ -442,7 +410,6 @@ public BinPackingSimulation( NodesSnapshot nodesSnapshot, Map nodeMemoryPoolInfos, Set fulfilledAcquires, - Map preReservedMemory, boolean scheduleOnCoordinator, DataSize taskRuntimeMemoryEstimationOverhead) { @@ -455,7 +422,6 @@ public BinPackingSimulation( requireNonNull(nodeMemoryPoolInfos, "nodeMemoryPoolInfos is null"); this.nodeMemoryPoolInfos = ImmutableMap.copyOf(nodeMemoryPoolInfos); - requireNonNull(preReservedMemory, "preReservedMemory is null"); this.scheduleOnCoordinator = scheduleOnCoordinator; Map> realtimeTasksMemoryPerNode = new HashMap<>(); @@ -468,10 +434,12 @@ public BinPackingSimulation( realtimeTasksMemoryPerNode.put(node.getNodeIdentifier(), memoryPoolInfo.getTaskMemoryReservations()); } + Map preReservedMemory = new HashMap<>(); SetMultimap fulfilledAcquiresByNode = HashMultimap.create(); for (BinPackingNodeLease fulfilledAcquire : fulfilledAcquires) { InternalNode node = fulfilledAcquire.getAssignedNode(); fulfilledAcquiresByNode.put(node.getNodeIdentifier(), fulfilledAcquire); + preReservedMemory.compute(node.getNodeIdentifier(), (key, prev) -> (prev == null ? 0L : prev) + fulfilledAcquire.getMemoryLease()); } nodesRemainingMemory = new HashMap<>(); From 4e446b5fb18fb052d170b592e4109e13f93c10ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Osipiuk?= Date: Tue, 2 May 2023 15:57:25 +0200 Subject: [PATCH 2/3] Priritize node processing for non-speculative tasks in EventDrivenFaultTolerantQueryScheduler --- ...ventDrivenFaultTolerantQueryScheduler.java | 47 +++++++++++++++++-- 1 file changed, 43 insertions(+), 4 deletions(-) diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenFaultTolerantQueryScheduler.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenFaultTolerantQueryScheduler.java index b4733a6e120d..5796a5022658 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenFaultTolerantQueryScheduler.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenFaultTolerantQueryScheduler.java @@ -881,10 +881,35 @@ private StageId getStageId(PlanFragmentId fragmentId) private void scheduleTasks() { - long tasksWaitingForNode = preSchedulingTaskContexts.values().stream().filter(context -> !context.getNodeLease().getNode().isDone()).count(); + long speculativeTasksWaitingForNode = preSchedulingTaskContexts.values().stream() + .filter(context -> !context.getNodeLease().getNode().isDone()) + .filter(PreSchedulingTaskContext::isSpeculative) + .count(); + + long nonSpeculativeTasksWaitingForNode = preSchedulingTaskContexts.values().stream() + .filter(context -> !context.getNodeLease().getNode().isDone()) + .filter(preSchedulingTaskContext -> !preSchedulingTaskContext.isSpeculative()) + .count(); + + while (!schedulingQueue.isEmpty()) { + if (nonSpeculativeTasksWaitingForNode >= maxTasksWaitingForNode) { + break; + } + + PrioritizedScheduledTask scheduledTask = schedulingQueue.peekOrThrow(); + + if (scheduledTask.isSpeculative() && nonSpeculativeTasksWaitingForNode > 0) { + // do not handle any speculative tasks if there are non-speculative waiting + break; + } + + if (scheduledTask.isSpeculative() && speculativeTasksWaitingForNode >= maxTasksWaitingForNode) { + // too many speculative tasks waiting for node + break; + } + + verify(schedulingQueue.pollOrThrow().equals(scheduledTask)); - while (tasksWaitingForNode < maxTasksWaitingForNode && !schedulingQueue.isEmpty()) { - PrioritizedScheduledTask scheduledTask = schedulingQueue.pollOrThrow(); StageExecution stageExecution = getStageExecution(scheduledTask.task().stageId()); if (stageExecution.getState().isDone()) { continue; @@ -899,7 +924,13 @@ private void scheduleTasks() NodeLease lease = nodeAllocator.acquire(nodeRequirements.get(), memoryRequirements.getRequiredMemory()); lease.getNode().addListener(() -> eventQueue.add(Event.WAKE_UP), queryExecutor); preSchedulingTaskContexts.put(scheduledTask.task(), new PreSchedulingTaskContext(lease, scheduledTask.isSpeculative())); - tasksWaitingForNode++; + + if (scheduledTask.isSpeculative()) { + speculativeTasksWaitingForNode++; + } + else { + nonSpeculativeTasksWaitingForNode++; + } } } @@ -2056,6 +2087,14 @@ public PrioritizedScheduledTask pollOrThrow() return prioritizedTask; } + public PrioritizedScheduledTask peekOrThrow() + { + IndexedPriorityQueue.Prioritized task = queue.peekPrioritized(); + checkState(task != null, "queue is empty"); + // negate priority to reverse operation we do in addOrUpdate + return new PrioritizedScheduledTask(task.getValue(), toIntExact(-task.getPriority())); + } + public void addOrUpdate(PrioritizedScheduledTask prioritizedTask) { IndexedPriorityQueue.Prioritized previousTask = queue.getPrioritized(prioritizedTask.task()); From dd474e8457bd74e7518609881f0b3fb385f1d049 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Osipiuk?= Date: Tue, 9 May 2023 18:13:37 +0200 Subject: [PATCH 3/3] Prioritize non-speculative tasks in BinPackingNodeAllocatorService --- .../BinPackingNodeAllocatorService.java | 82 ++++++++- ...ventDrivenFaultTolerantQueryScheduler.java | 6 +- .../execution/scheduler/NodeAllocator.java | 4 +- .../TestBinPackingNodeAllocator.java | 165 ++++++++++++------ 4 files changed, 192 insertions(+), 65 deletions(-) diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/BinPackingNodeAllocatorService.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/BinPackingNodeAllocatorService.java index 077d87d7e13d..9f3fba8ff427 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/BinPackingNodeAllocatorService.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/BinPackingNodeAllocatorService.java @@ -44,6 +44,7 @@ import javax.inject.Inject; import java.time.Duration; +import java.util.Comparator; import java.util.Deque; import java.util.HashMap; import java.util.Iterator; @@ -195,6 +196,15 @@ void refreshNodePoolMemoryInfos() @VisibleForTesting synchronized void processPendingAcquires() + { + processPendingAcquires(false); + boolean hasNonSpeculativePendingAcquires = pendingAcquires.stream().anyMatch(pendingAcquire -> !pendingAcquire.isSpeculative()); + if (!hasNonSpeculativePendingAcquires) { + processPendingAcquires(true); + } + } + + private void processPendingAcquires(boolean processSpeculative) { // synchronized only for sake manual triggering in test code. In production code it should only be called by single thread Iterator iterator = pendingAcquires.iterator(); @@ -204,7 +214,8 @@ synchronized void processPendingAcquires() nodePoolMemoryInfos.get(), fulfilledAcquires, scheduleOnCoordinator, - taskRuntimeMemoryEstimationOverhead); + taskRuntimeMemoryEstimationOverhead, + !processSpeculative); // if we are processing non-speculative pending acquires we are ignoring speculative acquired ones while (iterator.hasNext()) { PendingAcquire pendingAcquire = iterator.next(); @@ -215,6 +226,10 @@ synchronized void processPendingAcquires() continue; } + if (pendingAcquire.isSpeculative() != processSpeculative) { + continue; + } + BinPackingSimulation.ReserveResult result = simulation.tryReserve(pendingAcquire); switch (result.getStatus()) { case RESERVED: @@ -262,9 +277,9 @@ public NodeAllocator getNodeAllocator(Session session) } @Override - public NodeLease acquire(NodeRequirements nodeRequirements, DataSize memoryRequirement) + public NodeLease acquire(NodeRequirements nodeRequirements, DataSize memoryRequirement, boolean speculative) { - BinPackingNodeLease nodeLease = new BinPackingNodeLease(memoryRequirement.toBytes()); + BinPackingNodeLease nodeLease = new BinPackingNodeLease(memoryRequirement.toBytes(), speculative); PendingAcquire pendingAcquire = new PendingAcquire(nodeRequirements, memoryRequirement, nodeLease, ticker); pendingAcquires.add(pendingAcquire); wakeupProcessPendingAcquires(); @@ -326,6 +341,11 @@ public void resetNoMatchingNodeFound() { noMatchingNodeStopwatch.reset(); } + + public boolean isSpeculative() + { + return lease.isSpeculative(); + } } private class BinPackingNodeLease @@ -335,10 +355,12 @@ private class BinPackingNodeLease private final AtomicBoolean released = new AtomicBoolean(); private final long memoryLease; private final AtomicReference taskId = new AtomicReference<>(); + private final AtomicBoolean speculative; - private BinPackingNodeLease(long memoryLease) + private BinPackingNodeLease(long memoryLease, boolean speculative) { this.memoryLease = memoryLease; + this.speculative = new AtomicBoolean(speculative); } @Override @@ -370,6 +392,21 @@ public void attachTaskId(TaskId taskId) } } + @Override + public void setSpeculative(boolean speculative) + { + checkArgument(!speculative, "cannot make non-speculative task speculative"); + boolean changed = this.speculative.compareAndSet(true, false); + if (changed) { + wakeupProcessPendingAcquires(); + } + } + + public boolean isSpeculative() + { + return speculative.get(); + } + public Optional getAttachedTaskId() { return Optional.ofNullable(this.taskId.get()); @@ -400,8 +437,10 @@ private static class BinPackingSimulation { private final NodesSnapshot nodesSnapshot; private final List allNodesSorted; + private final boolean ignoreAcquiredSpeculative; private final Map nodesRemainingMemory; private final Map nodesRemainingMemoryRuntimeAdjusted; + private final Map speculativeMemoryReserved; private final Map nodeMemoryPoolInfos; private final boolean scheduleOnCoordinator; @@ -411,13 +450,15 @@ public BinPackingSimulation( Map nodeMemoryPoolInfos, Set fulfilledAcquires, boolean scheduleOnCoordinator, - DataSize taskRuntimeMemoryEstimationOverhead) + DataSize taskRuntimeMemoryEstimationOverhead, + boolean ignoreAcquiredSpeculative) { this.nodesSnapshot = requireNonNull(nodesSnapshot, "nodesSnapshot is null"); // use same node ordering for each simulation this.allNodesSorted = nodesSnapshot.getAllNodes().stream() .sorted(comparing(InternalNode::getNodeIdentifier)) .collect(toImmutableList()); + this.ignoreAcquiredSpeculative = ignoreAcquiredSpeculative; requireNonNull(nodeMemoryPoolInfos, "nodeMemoryPoolInfos is null"); this.nodeMemoryPoolInfos = ImmutableMap.copyOf(nodeMemoryPoolInfos); @@ -437,11 +478,25 @@ public BinPackingSimulation( Map preReservedMemory = new HashMap<>(); SetMultimap fulfilledAcquiresByNode = HashMultimap.create(); for (BinPackingNodeLease fulfilledAcquire : fulfilledAcquires) { + if (ignoreAcquiredSpeculative && fulfilledAcquire.isSpeculative()) { + continue; + } InternalNode node = fulfilledAcquire.getAssignedNode(); fulfilledAcquiresByNode.put(node.getNodeIdentifier(), fulfilledAcquire); preReservedMemory.compute(node.getNodeIdentifier(), (key, prev) -> (prev == null ? 0L : prev) + fulfilledAcquire.getMemoryLease()); } + speculativeMemoryReserved = new HashMap<>(); + if (ignoreAcquiredSpeculative) { + for (BinPackingNodeLease fulfilledAcquire : fulfilledAcquires) { + if (!fulfilledAcquire.isSpeculative()) { + continue; + } + InternalNode node = fulfilledAcquire.getAssignedNode(); + speculativeMemoryReserved.compute(node.getNodeIdentifier(), (key, prev) -> (prev == null ? 0L : prev) + fulfilledAcquire.getMemoryLease()); + } + } + nodesRemainingMemory = new HashMap<>(); for (InternalNode node : nodesSnapshot.getAllNodes()) { MemoryPoolInfo memoryPoolInfo = nodeMemoryPoolInfos.get(node.getNodeIdentifier()); @@ -505,8 +560,12 @@ public ReserveResult tryReserve(PendingAcquire acquire) return ReserveResult.NONE_MATCHING; } + Comparator comparator = comparing(node -> nodesRemainingMemoryRuntimeAdjusted.get(node.getNodeIdentifier())); + if (ignoreAcquiredSpeculative) { + comparator = resolveTiesWithSpeculativeMemory(comparator); + } InternalNode selectedNode = candidates.stream() - .max(comparing(node -> nodesRemainingMemoryRuntimeAdjusted.get(node.getNodeIdentifier()))) + .max(comparator) .orElseThrow(); if (nodesRemainingMemoryRuntimeAdjusted.get(selectedNode.getNodeIdentifier()) >= acquire.getMemoryLease() || isNodeEmpty(selectedNode.getNodeIdentifier())) { @@ -524,13 +583,22 @@ public ReserveResult tryReserve(PendingAcquire acquire) // If selected node cannot be used right now, select best one ignoring runtime memory usage and reserve space there // for later use. This is important from algorithm liveliness perspective. If we did not reserve space for a task which // is too big to be scheduled right now, it could be starved by smaller tasks coming later. + Comparator fallbackComparator = comparing(node -> nodesRemainingMemory.get(node.getNodeIdentifier())); + if (ignoreAcquiredSpeculative) { + fallbackComparator = resolveTiesWithSpeculativeMemory(fallbackComparator); + } InternalNode fallbackNode = candidates.stream() - .max(comparing(node -> nodesRemainingMemory.get(node.getNodeIdentifier()))) + .max(fallbackComparator) .orElseThrow(); subtractFromRemainingMemory(fallbackNode.getNodeIdentifier(), acquire.getMemoryLease()); return ReserveResult.NOT_ENOUGH_RESOURCES_NOW; } + private Comparator resolveTiesWithSpeculativeMemory(Comparator comparator) + { + return comparator.thenComparing(node -> -speculativeMemoryReserved.getOrDefault(node.getNodeIdentifier(), 0L)); + } + private void subtractFromRemainingMemory(String nodeIdentifier, long memoryLease) { nodesRemainingMemoryRuntimeAdjusted.compute( diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenFaultTolerantQueryScheduler.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenFaultTolerantQueryScheduler.java index 5796a5022658..a832aeea5bc1 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenFaultTolerantQueryScheduler.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenFaultTolerantQueryScheduler.java @@ -921,7 +921,7 @@ private void scheduleTasks() continue; } MemoryRequirements memoryRequirements = stageExecution.getMemoryRequirements(partitionId); - NodeLease lease = nodeAllocator.acquire(nodeRequirements.get(), memoryRequirements.getRequiredMemory()); + NodeLease lease = nodeAllocator.acquire(nodeRequirements.get(), memoryRequirements.getRequiredMemory(), scheduledTask.isSpeculative()); lease.getNode().addListener(() -> eventQueue.add(Event.WAKE_UP), queryExecutor); preSchedulingTaskContexts.put(scheduledTask.task(), new PreSchedulingTaskContext(lease, scheduledTask.isSpeculative())); @@ -1146,7 +1146,9 @@ public void onSplitAssignment(SplitAssignmentEvent event) PreSchedulingTaskContext context = preSchedulingTaskContexts.get(prioritizedTask.task()); if (context != null) { // task is already waiting for node or for sink instance handle - context.setSpeculative(prioritizedTask.isSpeculative()); // update speculative flag + // update speculative flag + context.setSpeculative(prioritizedTask.isSpeculative()); + context.getNodeLease().setSpeculative(prioritizedTask.isSpeculative()); return; } schedulingQueue.addOrUpdate(prioritizedTask); diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/NodeAllocator.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/NodeAllocator.java index 6c105daf20ce..dfd889c18428 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/NodeAllocator.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/NodeAllocator.java @@ -29,7 +29,7 @@ public interface NodeAllocator * * It is obligatory for the calling party to release all the leases they obtained via {@link NodeLease#release()}. */ - NodeLease acquire(NodeRequirements nodeRequirements, DataSize memoryRequirement); + NodeLease acquire(NodeRequirements nodeRequirements, DataSize memoryRequirement, boolean speculative); @Override void close(); @@ -40,6 +40,8 @@ interface NodeLease default void attachTaskId(TaskId taskId) {} + void setSpeculative(boolean speculative); + void release(); } } diff --git a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestBinPackingNodeAllocator.java b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestBinPackingNodeAllocator.java index 0e1b4d8ce304..ed354db13877 100644 --- a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestBinPackingNodeAllocator.java +++ b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestBinPackingNodeAllocator.java @@ -150,19 +150,19 @@ public void testAllocateSimple() try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(SESSION)) { // first two allocations should not block - NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertAcquired(acquire1, NODE_1); - NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertAcquired(acquire2, NODE_2); // same for subsequent two allocation (each task requires 32GB and we have 2 nodes with 64GB each) - NodeAllocator.NodeLease acquire3 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire3 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertAcquired(acquire3, NODE_1); - NodeAllocator.NodeLease acquire4 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire4 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertAcquired(acquire4, NODE_2); // 5th allocation should block - NodeAllocator.NodeLease acquire5 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire5 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertNotAcquired(acquire5); // release acquire2 which uses @@ -174,7 +174,7 @@ public void testAllocateSimple() }); // try to acquire one more node (should block) - NodeAllocator.NodeLease acquire6 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire6 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertNotAcquired(acquire6); // add new node @@ -198,25 +198,25 @@ public void testAllocateDifferentSizes() setupNodeAllocatorService(nodeManager); try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(SESSION)) { - NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertAcquired(acquire1, NODE_1); - NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertAcquired(acquire2, NODE_2); - NodeAllocator.NodeLease acquire3 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE)); + NodeAllocator.NodeLease acquire3 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE), false); assertAcquired(acquire3, NODE_1); - NodeAllocator.NodeLease acquire4 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE)); + NodeAllocator.NodeLease acquire4 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE), false); assertAcquired(acquire4, NODE_2); - NodeAllocator.NodeLease acquire5 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE)); + NodeAllocator.NodeLease acquire5 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE), false); assertAcquired(acquire5, NODE_1); - NodeAllocator.NodeLease acquire6 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE)); + NodeAllocator.NodeLease acquire6 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE), false); assertAcquired(acquire6, NODE_2); // each of the nodes is filled in with 32+16+16 // try allocate 32 and 16 - NodeAllocator.NodeLease acquire7 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire7 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertNotAcquired(acquire7); - NodeAllocator.NodeLease acquire8 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE)); + NodeAllocator.NodeLease acquire8 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE), false); assertNotAcquired(acquire8); // free 16MB on NODE_1; @@ -244,25 +244,25 @@ public void testAllocateDifferentSizesOpportunisticAcquisition() setupNodeAllocatorService(nodeManager); try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(SESSION)) { - NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertAcquired(acquire1, NODE_1); - NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertAcquired(acquire2, NODE_2); - NodeAllocator.NodeLease acquire3 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE)); + NodeAllocator.NodeLease acquire3 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE), false); assertAcquired(acquire3, NODE_1); - NodeAllocator.NodeLease acquire4 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE)); + NodeAllocator.NodeLease acquire4 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE), false); assertAcquired(acquire4, NODE_2); - NodeAllocator.NodeLease acquire5 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE)); + NodeAllocator.NodeLease acquire5 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE), false); assertAcquired(acquire5, NODE_1); - NodeAllocator.NodeLease acquire6 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE)); + NodeAllocator.NodeLease acquire6 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE), false); assertAcquired(acquire6, NODE_2); // each of the nodes is filled in with 32+16+16 // try to allocate 32 and 16 - NodeAllocator.NodeLease acquire7 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire7 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertNotAcquired(acquire7); - NodeAllocator.NodeLease acquire8 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE)); + NodeAllocator.NodeLease acquire8 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE), false); assertNotAcquired(acquire8); // free 32MB on NODE_2; @@ -285,15 +285,15 @@ public void testAllocateReleaseBeforeAcquired() try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(SESSION)) { // first two allocations should not block - NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertAcquired(acquire1, NODE_1); - NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertAcquired(acquire2, NODE_1); // another two should block - NodeAllocator.NodeLease acquire3 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire3 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertNotAcquired(acquire3); - NodeAllocator.NodeLease acquire4 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire4 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertNotAcquired(acquire4); // releasing a blocked one should not unblock anything @@ -314,7 +314,7 @@ public void testNoMatchingNodeAvailable() try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(SESSION)) { // request a node with specific catalog (not present) - NodeAllocator.NodeLease acquireNoMatching = nodeAllocator.acquire(REQ_CATALOG_1, DataSize.of(64, GIGABYTE)); + NodeAllocator.NodeLease acquireNoMatching = nodeAllocator.acquire(REQ_CATALOG_1, DataSize.of(64, GIGABYTE), false); assertNotAcquired(acquireNoMatching); ticker.increment(59, TimeUnit.SECONDS); // still below timeout nodeAllocatorService.processPendingAcquires(); @@ -328,11 +328,11 @@ public void testNoMatchingNodeAvailable() nodeManager.addNodes(NODE_2); // we should be able to acquire the node now - NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_CATALOG_1, DataSize.of(64, GIGABYTE)); + NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_CATALOG_1, DataSize.of(64, GIGABYTE), false); assertAcquired(acquire1, NODE_2); // acquiring one more should block (only one acquire fits a node as we request 64GB) - NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_CATALOG_1, DataSize.of(64, GIGABYTE)); + NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_CATALOG_1, DataSize.of(64, GIGABYTE), false); assertNotAcquired(acquire2); // remove node with catalog @@ -360,8 +360,8 @@ public void testNoMatchingNodeAvailableTimeoutReset() try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(SESSION)) { // request a node with specific catalog (not present) - NodeAllocator.NodeLease acquireNoMatching1 = nodeAllocator.acquire(REQ_CATALOG_1, DataSize.of(64, GIGABYTE)); - NodeAllocator.NodeLease acquireNoMatching2 = nodeAllocator.acquire(REQ_CATALOG_1, DataSize.of(64, GIGABYTE)); + NodeAllocator.NodeLease acquireNoMatching1 = nodeAllocator.acquire(REQ_CATALOG_1, DataSize.of(64, GIGABYTE), false); + NodeAllocator.NodeLease acquireNoMatching2 = nodeAllocator.acquire(REQ_CATALOG_1, DataSize.of(64, GIGABYTE), false); assertNotAcquired(acquireNoMatching1); assertNotAcquired(acquireNoMatching2); @@ -407,7 +407,7 @@ public void testRemoveAcquiredNode() setupNodeAllocatorService(nodeManager); try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(SESSION)) { - NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertAcquired(acquire1, NODE_1); // remove acquired node @@ -426,17 +426,17 @@ public void testAllocateNodeWithAddressRequirements() setupNodeAllocatorService(nodeManager); try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(SESSION)) { - NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_NODE_2, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_NODE_2, DataSize.of(32, GIGABYTE), false); assertAcquired(acquire1, NODE_2); - NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_NODE_2, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_NODE_2, DataSize.of(32, GIGABYTE), false); assertAcquired(acquire2, NODE_2); - NodeAllocator.NodeLease acquire3 = nodeAllocator.acquire(REQ_NODE_2, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire3 = nodeAllocator.acquire(REQ_NODE_2, DataSize.of(32, GIGABYTE), false); // no more place on NODE_2 assertNotAcquired(acquire3); // requests for other node are still good - NodeAllocator.NodeLease acquire4 = nodeAllocator.acquire(REQ_NODE_1, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire4 = nodeAllocator.acquire(REQ_NODE_1, DataSize.of(32, GIGABYTE), false); assertAcquired(acquire4, NODE_1); // release some space on NODE_2 @@ -454,7 +454,7 @@ public void testAllocateNotEnoughRuntimeMemory() try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(SESSION)) { // first allocation is fine - NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertAcquired(acquire1, NODE_1); acquire1.attachTaskId(taskId(1)); @@ -465,27 +465,27 @@ public void testAllocateNotEnoughRuntimeMemory() nodeAllocatorService.refreshNodePoolMemoryInfos(); // second allocation of 32GB should go to another node - NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertAcquired(acquire2, NODE_2); acquire2.attachTaskId(taskId(2)); // third allocation of 32GB should also use NODE_2 as there is not enough runtime memory on NODE_1 // second allocation of 32GB should go to another node - NodeAllocator.NodeLease acquire3 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire3 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertAcquired(acquire3, NODE_2); acquire3.attachTaskId(taskId(3)); // fourth allocation of 16 should fit on NODE_1 - NodeAllocator.NodeLease acquire4 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE)); + NodeAllocator.NodeLease acquire4 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE), false); assertAcquired(acquire4, NODE_1); acquire4.attachTaskId(taskId(4)); // fifth allocation of 16 should no longer fit on NODE_1. There is 16GB unreserved but only 15GB taking runtime usage into account - NodeAllocator.NodeLease acquire5 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE)); + NodeAllocator.NodeLease acquire5 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE), false); assertNotAcquired(acquire5); // even tiny allocations should not fit now - NodeAllocator.NodeLease acquire6 = nodeAllocator.acquire(REQ_NONE, DataSize.of(1, GIGABYTE)); + NodeAllocator.NodeLease acquire6 = nodeAllocator.acquire(REQ_NONE, DataSize.of(1, GIGABYTE), false); assertNotAcquired(acquire6); // if memory usage decreases on NODE_1 the pending 16GB allocation should complete @@ -511,7 +511,7 @@ public void testAllocateRuntimeMemoryDiscrepancies() // test when global memory usage on node is greater than per task usage try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(SESSION)) { // first allocation is fine - NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertAcquired(acquire1, NODE_1); acquire1.attachTaskId(taskId(1)); @@ -522,7 +522,7 @@ public void testAllocateRuntimeMemoryDiscrepancies() nodeAllocatorService.refreshNodePoolMemoryInfos(); // global (greater) memory usage should take precedence - NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertNotAcquired(acquire2); } @@ -530,7 +530,7 @@ public void testAllocateRuntimeMemoryDiscrepancies() // test when global memory usage on node is smaller than per task usage try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(SESSION)) { // first allocation is fine - NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertAcquired(acquire1, NODE_1); acquire1.attachTaskId(taskId(1)); @@ -541,7 +541,7 @@ public void testAllocateRuntimeMemoryDiscrepancies() nodeAllocatorService.refreshNodePoolMemoryInfos(); // per-task (greater) memory usage should take precedence - NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertNotAcquired(acquire2); } @@ -549,7 +549,7 @@ public void testAllocateRuntimeMemoryDiscrepancies() // test when per-task memory usage not present at all try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(SESSION)) { // first allocation is fine - NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertAcquired(acquire1, NODE_1); acquire1.attachTaskId(taskId(1)); @@ -558,7 +558,7 @@ public void testAllocateRuntimeMemoryDiscrepancies() nodeAllocatorService.refreshNodePoolMemoryInfos(); // global memory usage should be used (not per-task usage) - NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertNotAcquired(acquire2); } } @@ -572,10 +572,10 @@ public void testSpaceReservedOnPrimaryNodeIfNoNodeWithEnoughRuntimeMemoryAvailab // test when global memory usage on node is greater than per task usage try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(SESSION)) { // reserve 32GB on NODE_1 and 16GB on NODE_2 - NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertAcquired(acquire1, NODE_1); acquire1.attachTaskId(taskId(1)); - NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE)); + NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE), false); assertAcquired(acquire2, NODE_2); acquire2.attachTaskId(taskId(2)); @@ -591,11 +591,11 @@ public void testSpaceReservedOnPrimaryNodeIfNoNodeWithEnoughRuntimeMemoryAvailab // try to allocate 32GB task // it will not fit on neither of nodes. space should be reserved on NODE_2 as it has more memory available // when you do not take runtime memory into account - NodeAllocator.NodeLease acquire3 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire3 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertNotAcquired(acquire3); // to check that is the case try to allocate 20GB; NODE_1 should be picked - NodeAllocator.NodeLease acquire4 = nodeAllocator.acquire(REQ_NONE, DataSize.of(20, GIGABYTE)); + NodeAllocator.NodeLease acquire4 = nodeAllocator.acquire(REQ_NONE, DataSize.of(20, GIGABYTE), false); assertAcquired(acquire4, NODE_1); acquire4.attachTaskId(taskId(2)); } @@ -610,7 +610,7 @@ public void testAllocateWithRuntimeMemoryEstimateOverhead() // test when global memory usage on node is greater than per task usage try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(SESSION)) { // allocated 32GB - NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertAcquired(acquire1, NODE_1); acquire1.attachTaskId(taskId(1)); @@ -621,7 +621,7 @@ public void testAllocateWithRuntimeMemoryEstimateOverhead() nodeAllocatorService.refreshNodePoolMemoryInfos(); // including overhead node runtime usage is 30+4 = 34GB so another 32GB task will not fit - NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertNotAcquired(acquire2); // decrease runtime usage to 28GB @@ -645,12 +645,67 @@ public void testStressAcquireRelease() try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(SESSION)) { for (int i = 0; i < 10_000_000; ++i) { - NodeAllocator.NodeLease lease = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease lease = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); lease.release(); } } } + @Test(timeOut = TEST_TIMEOUT) + public void testAllocateSpeculative() + { + InMemoryNodeManager nodeManager = new InMemoryNodeManager(NODE_1, NODE_2); + setupNodeAllocatorService(nodeManager); + + try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(SESSION)) { + // allocate two speculative tasks + NodeAllocator.NodeLease acquireSpeculative1 = nodeAllocator.acquire(REQ_NONE, DataSize.of(64, GIGABYTE), true); + assertAcquired(acquireSpeculative1, NODE_1); + NodeAllocator.NodeLease acquireSpeculative2 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), true); + assertAcquired(acquireSpeculative2, NODE_2); + + // non-speculative tasks should still get node + NodeAllocator.NodeLease acquireNonSpeculative1 = nodeAllocator.acquire(REQ_NONE, DataSize.of(64, GIGABYTE), false); + assertAcquired(acquireNonSpeculative1, NODE_2); + NodeAllocator.NodeLease acquireNonSpeculative2 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); + assertAcquired(acquireNonSpeculative2, NODE_1); + + // new speculative task will not fit (even tiny one) + NodeAllocator.NodeLease acquireSpeculative3 = nodeAllocator.acquire(REQ_NONE, DataSize.of(1, GIGABYTE), true); + assertNotAcquired(acquireSpeculative3); + + // if you switch it to non-speculative it will schedule + acquireSpeculative3.setSpeculative(false); + assertAcquired(acquireSpeculative3, NODE_1); + + // release all speculative tasks + acquireSpeculative1.release(); + acquireSpeculative2.release(); + acquireSpeculative3.release(); + + // we have 32G free on NODE_1 now + NodeAllocator.NodeLease acquireNonSpeculative4 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); + assertAcquired(acquireNonSpeculative4, NODE_1); + + // no place for speculative task + NodeAllocator.NodeLease acquireSpeculative4 = nodeAllocator.acquire(REQ_NONE, DataSize.of(1, GIGABYTE), true); + assertNotAcquired(acquireSpeculative4); + + // no place for another non-speculative task + NodeAllocator.NodeLease acquireNonSpeculative5 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); + assertNotAcquired(acquireNonSpeculative5); + + // release acquireNonSpeculative4 - a non-speculative task should be scheduled before speculative one + acquireNonSpeculative4.release(); + assertAcquired(acquireNonSpeculative5); + assertNotAcquired(acquireSpeculative4); + + // on subsequent release speculative task will get node + acquireNonSpeculative5.release(); + assertAcquired(acquireSpeculative4); + } + } + private TaskId taskId(int partition) { return new TaskId(new StageId("test_query", 0), partition, 0);