diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/BinPackingNodeAllocatorService.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/BinPackingNodeAllocatorService.java index 60bdabb6ece..9f3fba8ff42 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/BinPackingNodeAllocatorService.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/BinPackingNodeAllocatorService.java @@ -44,6 +44,7 @@ import javax.inject.Inject; import java.time.Duration; +import java.util.Comparator; import java.util.Deque; import java.util.HashMap; import java.util.Iterator; @@ -51,9 +52,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.Semaphore; @@ -66,10 +65,8 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; -import static com.google.common.base.Verify.verify; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.Sets.newConcurrentHashSet; -import static io.airlift.concurrent.MoreFutures.getFutureValue; import static io.airlift.concurrent.Threads.daemonThreadsNamed; import static io.trino.SystemSessionProperties.getFaultTolerantExecutionTaskMemoryEstimationQuantile; import static io.trino.SystemSessionProperties.getFaultTolerantExecutionTaskMemoryGrowthFactor; @@ -104,7 +101,6 @@ public class BinPackingNodeAllocatorService private final DataSize taskRuntimeMemoryEstimationOverhead; private final Ticker ticker; - private final ConcurrentMap allocatedMemory = new ConcurrentHashMap<>(); private final Deque pendingAcquires = new ConcurrentLinkedDeque<>(); private final Set fulfilledAcquires = newConcurrentHashSet(); private final Duration allowedNoMatchingNodePeriod; @@ -200,6 +196,15 @@ void refreshNodePoolMemoryInfos() @VisibleForTesting synchronized void processPendingAcquires() + { + processPendingAcquires(false); + boolean hasNonSpeculativePendingAcquires = pendingAcquires.stream().anyMatch(pendingAcquire -> !pendingAcquire.isSpeculative()); + if (!hasNonSpeculativePendingAcquires) { + processPendingAcquires(true); + } + } + + private void processPendingAcquires(boolean processSpeculative) { // synchronized only for sake manual triggering in test code. In production code it should only be called by single thread Iterator iterator = pendingAcquires.iterator(); @@ -208,9 +213,9 @@ synchronized void processPendingAcquires() nodeManager.getActiveNodesSnapshot(), nodePoolMemoryInfos.get(), fulfilledAcquires, - allocatedMemory, scheduleOnCoordinator, - taskRuntimeMemoryEstimationOverhead); + taskRuntimeMemoryEstimationOverhead, + !processSpeculative); // if we are processing non-speculative pending acquires we are ignoring speculative acquired ones while (iterator.hasNext()) { PendingAcquire pendingAcquire = iterator.next(); @@ -221,17 +226,18 @@ synchronized void processPendingAcquires() continue; } + if (pendingAcquire.isSpeculative() != processSpeculative) { + continue; + } + BinPackingSimulation.ReserveResult result = simulation.tryReserve(pendingAcquire); switch (result.getStatus()) { case RESERVED: InternalNode reservedNode = result.getNode().orElseThrow(); fulfilledAcquires.add(pendingAcquire.getLease()); - updateAllocatedMemory(reservedNode, pendingAcquire.getMemoryLease()); pendingAcquire.getFuture().set(reservedNode); if (pendingAcquire.getFuture().isCancelled()) { // completing future was unsuccessful - request was cancelled in the meantime - pendingAcquire.getLease().deallocateMemory(reservedNode); - fulfilledAcquires.remove(pendingAcquire.getLease()); // run once again when we are done @@ -271,9 +277,9 @@ public NodeAllocator getNodeAllocator(Session session) } @Override - public NodeLease acquire(NodeRequirements nodeRequirements, DataSize memoryRequirement) + public NodeLease acquire(NodeRequirements nodeRequirements, DataSize memoryRequirement, boolean speculative) { - BinPackingNodeLease nodeLease = new BinPackingNodeLease(memoryRequirement.toBytes()); + BinPackingNodeLease nodeLease = new BinPackingNodeLease(memoryRequirement.toBytes(), speculative); PendingAcquire pendingAcquire = new PendingAcquire(nodeRequirements, memoryRequirement, nodeLease, ticker); pendingAcquires.add(pendingAcquire); wakeupProcessPendingAcquires(); @@ -288,20 +294,6 @@ public void close() // and that can be done before all leases are yet returned from running (soon to be failed) tasks. } - private void updateAllocatedMemory(InternalNode node, long delta) - { - allocatedMemory.compute( - node.getNodeIdentifier(), - (key, oldValue) -> { - verify(delta > 0 || (oldValue != null && oldValue >= -delta), "tried to release more than allocated (%s vs %s) for node %s", -delta, oldValue, key); - long newValue = oldValue == null ? delta : oldValue + delta; - if (newValue == 0) { - return null; // delete - } - return newValue; - }); - } - private static class PendingAcquire { private final NodeRequirements nodeRequirements; @@ -349,6 +341,11 @@ public void resetNoMatchingNodeFound() { noMatchingNodeStopwatch.reset(); } + + public boolean isSpeculative() + { + return lease.isSpeculative(); + } } private class BinPackingNodeLease @@ -356,13 +353,14 @@ private class BinPackingNodeLease { private final SettableFuture node = SettableFuture.create(); private final AtomicBoolean released = new AtomicBoolean(); - private final AtomicBoolean memoryDeallocated = new AtomicBoolean(); private final long memoryLease; private final AtomicReference taskId = new AtomicReference<>(); + private final AtomicBoolean speculative; - private BinPackingNodeLease(long memoryLease) + private BinPackingNodeLease(long memoryLease, boolean speculative) { this.memoryLease = memoryLease; + this.speculative = new AtomicBoolean(speculative); } @Override @@ -394,6 +392,21 @@ public void attachTaskId(TaskId taskId) } } + @Override + public void setSpeculative(boolean speculative) + { + checkArgument(!speculative, "cannot make non-speculative task speculative"); + boolean changed = this.speculative.compareAndSet(true, false); + if (changed) { + wakeupProcessPendingAcquires(); + } + } + + public boolean isSpeculative() + { + return speculative.get(); + } + public Optional getAttachedTaskId() { return Optional.ofNullable(this.taskId.get()); @@ -410,7 +423,6 @@ public void release() if (released.compareAndSet(false, true)) { node.cancel(true); if (node.isDone() && !node.isCancelled()) { - deallocateMemory(getFutureValue(node)); checkState(fulfilledAcquires.remove(this), "node lease %s not found in fulfilledAcquires %s", this, fulfilledAcquires); wakeupProcessPendingAcquires(); } @@ -419,21 +431,16 @@ public void release() throw new IllegalStateException("Node " + node + " already released"); } } - - public void deallocateMemory(InternalNode node) - { - if (memoryDeallocated.compareAndSet(false, true)) { - updateAllocatedMemory(node, -memoryLease); - } - } } private static class BinPackingSimulation { private final NodesSnapshot nodesSnapshot; private final List allNodesSorted; + private final boolean ignoreAcquiredSpeculative; private final Map nodesRemainingMemory; private final Map nodesRemainingMemoryRuntimeAdjusted; + private final Map speculativeMemoryReserved; private final Map nodeMemoryPoolInfos; private final boolean scheduleOnCoordinator; @@ -442,20 +449,20 @@ public BinPackingSimulation( NodesSnapshot nodesSnapshot, Map nodeMemoryPoolInfos, Set fulfilledAcquires, - Map preReservedMemory, boolean scheduleOnCoordinator, - DataSize taskRuntimeMemoryEstimationOverhead) + DataSize taskRuntimeMemoryEstimationOverhead, + boolean ignoreAcquiredSpeculative) { this.nodesSnapshot = requireNonNull(nodesSnapshot, "nodesSnapshot is null"); // use same node ordering for each simulation this.allNodesSorted = nodesSnapshot.getAllNodes().stream() .sorted(comparing(InternalNode::getNodeIdentifier)) .collect(toImmutableList()); + this.ignoreAcquiredSpeculative = ignoreAcquiredSpeculative; requireNonNull(nodeMemoryPoolInfos, "nodeMemoryPoolInfos is null"); this.nodeMemoryPoolInfos = ImmutableMap.copyOf(nodeMemoryPoolInfos); - requireNonNull(preReservedMemory, "preReservedMemory is null"); this.scheduleOnCoordinator = scheduleOnCoordinator; Map> realtimeTasksMemoryPerNode = new HashMap<>(); @@ -468,10 +475,26 @@ public BinPackingSimulation( realtimeTasksMemoryPerNode.put(node.getNodeIdentifier(), memoryPoolInfo.getTaskMemoryReservations()); } + Map preReservedMemory = new HashMap<>(); SetMultimap fulfilledAcquiresByNode = HashMultimap.create(); for (BinPackingNodeLease fulfilledAcquire : fulfilledAcquires) { + if (ignoreAcquiredSpeculative && fulfilledAcquire.isSpeculative()) { + continue; + } InternalNode node = fulfilledAcquire.getAssignedNode(); fulfilledAcquiresByNode.put(node.getNodeIdentifier(), fulfilledAcquire); + preReservedMemory.compute(node.getNodeIdentifier(), (key, prev) -> (prev == null ? 0L : prev) + fulfilledAcquire.getMemoryLease()); + } + + speculativeMemoryReserved = new HashMap<>(); + if (ignoreAcquiredSpeculative) { + for (BinPackingNodeLease fulfilledAcquire : fulfilledAcquires) { + if (!fulfilledAcquire.isSpeculative()) { + continue; + } + InternalNode node = fulfilledAcquire.getAssignedNode(); + speculativeMemoryReserved.compute(node.getNodeIdentifier(), (key, prev) -> (prev == null ? 0L : prev) + fulfilledAcquire.getMemoryLease()); + } } nodesRemainingMemory = new HashMap<>(); @@ -537,8 +560,12 @@ public ReserveResult tryReserve(PendingAcquire acquire) return ReserveResult.NONE_MATCHING; } + Comparator comparator = comparing(node -> nodesRemainingMemoryRuntimeAdjusted.get(node.getNodeIdentifier())); + if (ignoreAcquiredSpeculative) { + comparator = resolveTiesWithSpeculativeMemory(comparator); + } InternalNode selectedNode = candidates.stream() - .max(comparing(node -> nodesRemainingMemoryRuntimeAdjusted.get(node.getNodeIdentifier()))) + .max(comparator) .orElseThrow(); if (nodesRemainingMemoryRuntimeAdjusted.get(selectedNode.getNodeIdentifier()) >= acquire.getMemoryLease() || isNodeEmpty(selectedNode.getNodeIdentifier())) { @@ -556,13 +583,22 @@ public ReserveResult tryReserve(PendingAcquire acquire) // If selected node cannot be used right now, select best one ignoring runtime memory usage and reserve space there // for later use. This is important from algorithm liveliness perspective. If we did not reserve space for a task which // is too big to be scheduled right now, it could be starved by smaller tasks coming later. + Comparator fallbackComparator = comparing(node -> nodesRemainingMemory.get(node.getNodeIdentifier())); + if (ignoreAcquiredSpeculative) { + fallbackComparator = resolveTiesWithSpeculativeMemory(fallbackComparator); + } InternalNode fallbackNode = candidates.stream() - .max(comparing(node -> nodesRemainingMemory.get(node.getNodeIdentifier()))) + .max(fallbackComparator) .orElseThrow(); subtractFromRemainingMemory(fallbackNode.getNodeIdentifier(), acquire.getMemoryLease()); return ReserveResult.NOT_ENOUGH_RESOURCES_NOW; } + private Comparator resolveTiesWithSpeculativeMemory(Comparator comparator) + { + return comparator.thenComparing(node -> -speculativeMemoryReserved.getOrDefault(node.getNodeIdentifier(), 0L)); + } + private void subtractFromRemainingMemory(String nodeIdentifier, long memoryLease) { nodesRemainingMemoryRuntimeAdjusted.compute( diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenFaultTolerantQueryScheduler.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenFaultTolerantQueryScheduler.java index b4733a6e120..a832aeea5bc 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenFaultTolerantQueryScheduler.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenFaultTolerantQueryScheduler.java @@ -881,10 +881,35 @@ private StageId getStageId(PlanFragmentId fragmentId) private void scheduleTasks() { - long tasksWaitingForNode = preSchedulingTaskContexts.values().stream().filter(context -> !context.getNodeLease().getNode().isDone()).count(); + long speculativeTasksWaitingForNode = preSchedulingTaskContexts.values().stream() + .filter(context -> !context.getNodeLease().getNode().isDone()) + .filter(PreSchedulingTaskContext::isSpeculative) + .count(); + + long nonSpeculativeTasksWaitingForNode = preSchedulingTaskContexts.values().stream() + .filter(context -> !context.getNodeLease().getNode().isDone()) + .filter(preSchedulingTaskContext -> !preSchedulingTaskContext.isSpeculative()) + .count(); + + while (!schedulingQueue.isEmpty()) { + if (nonSpeculativeTasksWaitingForNode >= maxTasksWaitingForNode) { + break; + } + + PrioritizedScheduledTask scheduledTask = schedulingQueue.peekOrThrow(); + + if (scheduledTask.isSpeculative() && nonSpeculativeTasksWaitingForNode > 0) { + // do not handle any speculative tasks if there are non-speculative waiting + break; + } + + if (scheduledTask.isSpeculative() && speculativeTasksWaitingForNode >= maxTasksWaitingForNode) { + // too many speculative tasks waiting for node + break; + } + + verify(schedulingQueue.pollOrThrow().equals(scheduledTask)); - while (tasksWaitingForNode < maxTasksWaitingForNode && !schedulingQueue.isEmpty()) { - PrioritizedScheduledTask scheduledTask = schedulingQueue.pollOrThrow(); StageExecution stageExecution = getStageExecution(scheduledTask.task().stageId()); if (stageExecution.getState().isDone()) { continue; @@ -896,10 +921,16 @@ private void scheduleTasks() continue; } MemoryRequirements memoryRequirements = stageExecution.getMemoryRequirements(partitionId); - NodeLease lease = nodeAllocator.acquire(nodeRequirements.get(), memoryRequirements.getRequiredMemory()); + NodeLease lease = nodeAllocator.acquire(nodeRequirements.get(), memoryRequirements.getRequiredMemory(), scheduledTask.isSpeculative()); lease.getNode().addListener(() -> eventQueue.add(Event.WAKE_UP), queryExecutor); preSchedulingTaskContexts.put(scheduledTask.task(), new PreSchedulingTaskContext(lease, scheduledTask.isSpeculative())); - tasksWaitingForNode++; + + if (scheduledTask.isSpeculative()) { + speculativeTasksWaitingForNode++; + } + else { + nonSpeculativeTasksWaitingForNode++; + } } } @@ -1115,7 +1146,9 @@ public void onSplitAssignment(SplitAssignmentEvent event) PreSchedulingTaskContext context = preSchedulingTaskContexts.get(prioritizedTask.task()); if (context != null) { // task is already waiting for node or for sink instance handle - context.setSpeculative(prioritizedTask.isSpeculative()); // update speculative flag + // update speculative flag + context.setSpeculative(prioritizedTask.isSpeculative()); + context.getNodeLease().setSpeculative(prioritizedTask.isSpeculative()); return; } schedulingQueue.addOrUpdate(prioritizedTask); @@ -2056,6 +2089,14 @@ public PrioritizedScheduledTask pollOrThrow() return prioritizedTask; } + public PrioritizedScheduledTask peekOrThrow() + { + IndexedPriorityQueue.Prioritized task = queue.peekPrioritized(); + checkState(task != null, "queue is empty"); + // negate priority to reverse operation we do in addOrUpdate + return new PrioritizedScheduledTask(task.getValue(), toIntExact(-task.getPriority())); + } + public void addOrUpdate(PrioritizedScheduledTask prioritizedTask) { IndexedPriorityQueue.Prioritized previousTask = queue.getPrioritized(prioritizedTask.task()); diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/NodeAllocator.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/NodeAllocator.java index 6c105daf20c..dfd889c1842 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/NodeAllocator.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/NodeAllocator.java @@ -29,7 +29,7 @@ public interface NodeAllocator * * It is obligatory for the calling party to release all the leases they obtained via {@link NodeLease#release()}. */ - NodeLease acquire(NodeRequirements nodeRequirements, DataSize memoryRequirement); + NodeLease acquire(NodeRequirements nodeRequirements, DataSize memoryRequirement, boolean speculative); @Override void close(); @@ -40,6 +40,8 @@ interface NodeLease default void attachTaskId(TaskId taskId) {} + void setSpeculative(boolean speculative); + void release(); } } diff --git a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestBinPackingNodeAllocator.java b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestBinPackingNodeAllocator.java index 0e1b4d8ce30..ed354db1387 100644 --- a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestBinPackingNodeAllocator.java +++ b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestBinPackingNodeAllocator.java @@ -150,19 +150,19 @@ public void testAllocateSimple() try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(SESSION)) { // first two allocations should not block - NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertAcquired(acquire1, NODE_1); - NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertAcquired(acquire2, NODE_2); // same for subsequent two allocation (each task requires 32GB and we have 2 nodes with 64GB each) - NodeAllocator.NodeLease acquire3 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire3 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertAcquired(acquire3, NODE_1); - NodeAllocator.NodeLease acquire4 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire4 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertAcquired(acquire4, NODE_2); // 5th allocation should block - NodeAllocator.NodeLease acquire5 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire5 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertNotAcquired(acquire5); // release acquire2 which uses @@ -174,7 +174,7 @@ public void testAllocateSimple() }); // try to acquire one more node (should block) - NodeAllocator.NodeLease acquire6 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire6 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertNotAcquired(acquire6); // add new node @@ -198,25 +198,25 @@ public void testAllocateDifferentSizes() setupNodeAllocatorService(nodeManager); try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(SESSION)) { - NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertAcquired(acquire1, NODE_1); - NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertAcquired(acquire2, NODE_2); - NodeAllocator.NodeLease acquire3 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE)); + NodeAllocator.NodeLease acquire3 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE), false); assertAcquired(acquire3, NODE_1); - NodeAllocator.NodeLease acquire4 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE)); + NodeAllocator.NodeLease acquire4 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE), false); assertAcquired(acquire4, NODE_2); - NodeAllocator.NodeLease acquire5 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE)); + NodeAllocator.NodeLease acquire5 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE), false); assertAcquired(acquire5, NODE_1); - NodeAllocator.NodeLease acquire6 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE)); + NodeAllocator.NodeLease acquire6 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE), false); assertAcquired(acquire6, NODE_2); // each of the nodes is filled in with 32+16+16 // try allocate 32 and 16 - NodeAllocator.NodeLease acquire7 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire7 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertNotAcquired(acquire7); - NodeAllocator.NodeLease acquire8 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE)); + NodeAllocator.NodeLease acquire8 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE), false); assertNotAcquired(acquire8); // free 16MB on NODE_1; @@ -244,25 +244,25 @@ public void testAllocateDifferentSizesOpportunisticAcquisition() setupNodeAllocatorService(nodeManager); try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(SESSION)) { - NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertAcquired(acquire1, NODE_1); - NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertAcquired(acquire2, NODE_2); - NodeAllocator.NodeLease acquire3 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE)); + NodeAllocator.NodeLease acquire3 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE), false); assertAcquired(acquire3, NODE_1); - NodeAllocator.NodeLease acquire4 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE)); + NodeAllocator.NodeLease acquire4 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE), false); assertAcquired(acquire4, NODE_2); - NodeAllocator.NodeLease acquire5 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE)); + NodeAllocator.NodeLease acquire5 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE), false); assertAcquired(acquire5, NODE_1); - NodeAllocator.NodeLease acquire6 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE)); + NodeAllocator.NodeLease acquire6 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE), false); assertAcquired(acquire6, NODE_2); // each of the nodes is filled in with 32+16+16 // try to allocate 32 and 16 - NodeAllocator.NodeLease acquire7 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire7 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertNotAcquired(acquire7); - NodeAllocator.NodeLease acquire8 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE)); + NodeAllocator.NodeLease acquire8 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE), false); assertNotAcquired(acquire8); // free 32MB on NODE_2; @@ -285,15 +285,15 @@ public void testAllocateReleaseBeforeAcquired() try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(SESSION)) { // first two allocations should not block - NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertAcquired(acquire1, NODE_1); - NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertAcquired(acquire2, NODE_1); // another two should block - NodeAllocator.NodeLease acquire3 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire3 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertNotAcquired(acquire3); - NodeAllocator.NodeLease acquire4 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire4 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertNotAcquired(acquire4); // releasing a blocked one should not unblock anything @@ -314,7 +314,7 @@ public void testNoMatchingNodeAvailable() try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(SESSION)) { // request a node with specific catalog (not present) - NodeAllocator.NodeLease acquireNoMatching = nodeAllocator.acquire(REQ_CATALOG_1, DataSize.of(64, GIGABYTE)); + NodeAllocator.NodeLease acquireNoMatching = nodeAllocator.acquire(REQ_CATALOG_1, DataSize.of(64, GIGABYTE), false); assertNotAcquired(acquireNoMatching); ticker.increment(59, TimeUnit.SECONDS); // still below timeout nodeAllocatorService.processPendingAcquires(); @@ -328,11 +328,11 @@ public void testNoMatchingNodeAvailable() nodeManager.addNodes(NODE_2); // we should be able to acquire the node now - NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_CATALOG_1, DataSize.of(64, GIGABYTE)); + NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_CATALOG_1, DataSize.of(64, GIGABYTE), false); assertAcquired(acquire1, NODE_2); // acquiring one more should block (only one acquire fits a node as we request 64GB) - NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_CATALOG_1, DataSize.of(64, GIGABYTE)); + NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_CATALOG_1, DataSize.of(64, GIGABYTE), false); assertNotAcquired(acquire2); // remove node with catalog @@ -360,8 +360,8 @@ public void testNoMatchingNodeAvailableTimeoutReset() try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(SESSION)) { // request a node with specific catalog (not present) - NodeAllocator.NodeLease acquireNoMatching1 = nodeAllocator.acquire(REQ_CATALOG_1, DataSize.of(64, GIGABYTE)); - NodeAllocator.NodeLease acquireNoMatching2 = nodeAllocator.acquire(REQ_CATALOG_1, DataSize.of(64, GIGABYTE)); + NodeAllocator.NodeLease acquireNoMatching1 = nodeAllocator.acquire(REQ_CATALOG_1, DataSize.of(64, GIGABYTE), false); + NodeAllocator.NodeLease acquireNoMatching2 = nodeAllocator.acquire(REQ_CATALOG_1, DataSize.of(64, GIGABYTE), false); assertNotAcquired(acquireNoMatching1); assertNotAcquired(acquireNoMatching2); @@ -407,7 +407,7 @@ public void testRemoveAcquiredNode() setupNodeAllocatorService(nodeManager); try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(SESSION)) { - NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertAcquired(acquire1, NODE_1); // remove acquired node @@ -426,17 +426,17 @@ public void testAllocateNodeWithAddressRequirements() setupNodeAllocatorService(nodeManager); try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(SESSION)) { - NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_NODE_2, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_NODE_2, DataSize.of(32, GIGABYTE), false); assertAcquired(acquire1, NODE_2); - NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_NODE_2, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_NODE_2, DataSize.of(32, GIGABYTE), false); assertAcquired(acquire2, NODE_2); - NodeAllocator.NodeLease acquire3 = nodeAllocator.acquire(REQ_NODE_2, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire3 = nodeAllocator.acquire(REQ_NODE_2, DataSize.of(32, GIGABYTE), false); // no more place on NODE_2 assertNotAcquired(acquire3); // requests for other node are still good - NodeAllocator.NodeLease acquire4 = nodeAllocator.acquire(REQ_NODE_1, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire4 = nodeAllocator.acquire(REQ_NODE_1, DataSize.of(32, GIGABYTE), false); assertAcquired(acquire4, NODE_1); // release some space on NODE_2 @@ -454,7 +454,7 @@ public void testAllocateNotEnoughRuntimeMemory() try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(SESSION)) { // first allocation is fine - NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertAcquired(acquire1, NODE_1); acquire1.attachTaskId(taskId(1)); @@ -465,27 +465,27 @@ public void testAllocateNotEnoughRuntimeMemory() nodeAllocatorService.refreshNodePoolMemoryInfos(); // second allocation of 32GB should go to another node - NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertAcquired(acquire2, NODE_2); acquire2.attachTaskId(taskId(2)); // third allocation of 32GB should also use NODE_2 as there is not enough runtime memory on NODE_1 // second allocation of 32GB should go to another node - NodeAllocator.NodeLease acquire3 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire3 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertAcquired(acquire3, NODE_2); acquire3.attachTaskId(taskId(3)); // fourth allocation of 16 should fit on NODE_1 - NodeAllocator.NodeLease acquire4 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE)); + NodeAllocator.NodeLease acquire4 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE), false); assertAcquired(acquire4, NODE_1); acquire4.attachTaskId(taskId(4)); // fifth allocation of 16 should no longer fit on NODE_1. There is 16GB unreserved but only 15GB taking runtime usage into account - NodeAllocator.NodeLease acquire5 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE)); + NodeAllocator.NodeLease acquire5 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE), false); assertNotAcquired(acquire5); // even tiny allocations should not fit now - NodeAllocator.NodeLease acquire6 = nodeAllocator.acquire(REQ_NONE, DataSize.of(1, GIGABYTE)); + NodeAllocator.NodeLease acquire6 = nodeAllocator.acquire(REQ_NONE, DataSize.of(1, GIGABYTE), false); assertNotAcquired(acquire6); // if memory usage decreases on NODE_1 the pending 16GB allocation should complete @@ -511,7 +511,7 @@ public void testAllocateRuntimeMemoryDiscrepancies() // test when global memory usage on node is greater than per task usage try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(SESSION)) { // first allocation is fine - NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertAcquired(acquire1, NODE_1); acquire1.attachTaskId(taskId(1)); @@ -522,7 +522,7 @@ public void testAllocateRuntimeMemoryDiscrepancies() nodeAllocatorService.refreshNodePoolMemoryInfos(); // global (greater) memory usage should take precedence - NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertNotAcquired(acquire2); } @@ -530,7 +530,7 @@ public void testAllocateRuntimeMemoryDiscrepancies() // test when global memory usage on node is smaller than per task usage try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(SESSION)) { // first allocation is fine - NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertAcquired(acquire1, NODE_1); acquire1.attachTaskId(taskId(1)); @@ -541,7 +541,7 @@ public void testAllocateRuntimeMemoryDiscrepancies() nodeAllocatorService.refreshNodePoolMemoryInfos(); // per-task (greater) memory usage should take precedence - NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertNotAcquired(acquire2); } @@ -549,7 +549,7 @@ public void testAllocateRuntimeMemoryDiscrepancies() // test when per-task memory usage not present at all try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(SESSION)) { // first allocation is fine - NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertAcquired(acquire1, NODE_1); acquire1.attachTaskId(taskId(1)); @@ -558,7 +558,7 @@ public void testAllocateRuntimeMemoryDiscrepancies() nodeAllocatorService.refreshNodePoolMemoryInfos(); // global memory usage should be used (not per-task usage) - NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertNotAcquired(acquire2); } } @@ -572,10 +572,10 @@ public void testSpaceReservedOnPrimaryNodeIfNoNodeWithEnoughRuntimeMemoryAvailab // test when global memory usage on node is greater than per task usage try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(SESSION)) { // reserve 32GB on NODE_1 and 16GB on NODE_2 - NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertAcquired(acquire1, NODE_1); acquire1.attachTaskId(taskId(1)); - NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE)); + NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_NONE, DataSize.of(16, GIGABYTE), false); assertAcquired(acquire2, NODE_2); acquire2.attachTaskId(taskId(2)); @@ -591,11 +591,11 @@ public void testSpaceReservedOnPrimaryNodeIfNoNodeWithEnoughRuntimeMemoryAvailab // try to allocate 32GB task // it will not fit on neither of nodes. space should be reserved on NODE_2 as it has more memory available // when you do not take runtime memory into account - NodeAllocator.NodeLease acquire3 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire3 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertNotAcquired(acquire3); // to check that is the case try to allocate 20GB; NODE_1 should be picked - NodeAllocator.NodeLease acquire4 = nodeAllocator.acquire(REQ_NONE, DataSize.of(20, GIGABYTE)); + NodeAllocator.NodeLease acquire4 = nodeAllocator.acquire(REQ_NONE, DataSize.of(20, GIGABYTE), false); assertAcquired(acquire4, NODE_1); acquire4.attachTaskId(taskId(2)); } @@ -610,7 +610,7 @@ public void testAllocateWithRuntimeMemoryEstimateOverhead() // test when global memory usage on node is greater than per task usage try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(SESSION)) { // allocated 32GB - NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertAcquired(acquire1, NODE_1); acquire1.attachTaskId(taskId(1)); @@ -621,7 +621,7 @@ public void testAllocateWithRuntimeMemoryEstimateOverhead() nodeAllocatorService.refreshNodePoolMemoryInfos(); // including overhead node runtime usage is 30+4 = 34GB so another 32GB task will not fit - NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); assertNotAcquired(acquire2); // decrease runtime usage to 28GB @@ -645,12 +645,67 @@ public void testStressAcquireRelease() try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(SESSION)) { for (int i = 0; i < 10_000_000; ++i) { - NodeAllocator.NodeLease lease = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE)); + NodeAllocator.NodeLease lease = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); lease.release(); } } } + @Test(timeOut = TEST_TIMEOUT) + public void testAllocateSpeculative() + { + InMemoryNodeManager nodeManager = new InMemoryNodeManager(NODE_1, NODE_2); + setupNodeAllocatorService(nodeManager); + + try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(SESSION)) { + // allocate two speculative tasks + NodeAllocator.NodeLease acquireSpeculative1 = nodeAllocator.acquire(REQ_NONE, DataSize.of(64, GIGABYTE), true); + assertAcquired(acquireSpeculative1, NODE_1); + NodeAllocator.NodeLease acquireSpeculative2 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), true); + assertAcquired(acquireSpeculative2, NODE_2); + + // non-speculative tasks should still get node + NodeAllocator.NodeLease acquireNonSpeculative1 = nodeAllocator.acquire(REQ_NONE, DataSize.of(64, GIGABYTE), false); + assertAcquired(acquireNonSpeculative1, NODE_2); + NodeAllocator.NodeLease acquireNonSpeculative2 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); + assertAcquired(acquireNonSpeculative2, NODE_1); + + // new speculative task will not fit (even tiny one) + NodeAllocator.NodeLease acquireSpeculative3 = nodeAllocator.acquire(REQ_NONE, DataSize.of(1, GIGABYTE), true); + assertNotAcquired(acquireSpeculative3); + + // if you switch it to non-speculative it will schedule + acquireSpeculative3.setSpeculative(false); + assertAcquired(acquireSpeculative3, NODE_1); + + // release all speculative tasks + acquireSpeculative1.release(); + acquireSpeculative2.release(); + acquireSpeculative3.release(); + + // we have 32G free on NODE_1 now + NodeAllocator.NodeLease acquireNonSpeculative4 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); + assertAcquired(acquireNonSpeculative4, NODE_1); + + // no place for speculative task + NodeAllocator.NodeLease acquireSpeculative4 = nodeAllocator.acquire(REQ_NONE, DataSize.of(1, GIGABYTE), true); + assertNotAcquired(acquireSpeculative4); + + // no place for another non-speculative task + NodeAllocator.NodeLease acquireNonSpeculative5 = nodeAllocator.acquire(REQ_NONE, DataSize.of(32, GIGABYTE), false); + assertNotAcquired(acquireNonSpeculative5); + + // release acquireNonSpeculative4 - a non-speculative task should be scheduled before speculative one + acquireNonSpeculative4.release(); + assertAcquired(acquireNonSpeculative5); + assertNotAcquired(acquireSpeculative4); + + // on subsequent release speculative task will get node + acquireNonSpeculative5.release(); + assertAcquired(acquireSpeculative4); + } + } + private TaskId taskId(int partition) { return new TaskId(new StageId("test_query", 0), partition, 0);