
Prioritize non-speculative tasks in scheduler and node allocator #17465

Merged: 3 commits, May 16, 2023
@@ -44,16 +44,15 @@
import javax.inject.Inject;

import java.time.Duration;
import java.util.Comparator;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.Semaphore;
@@ -66,10 +65,8 @@

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.Sets.newConcurrentHashSet;
import static io.airlift.concurrent.MoreFutures.getFutureValue;
import static io.airlift.concurrent.Threads.daemonThreadsNamed;
import static io.trino.SystemSessionProperties.getFaultTolerantExecutionTaskMemoryEstimationQuantile;
import static io.trino.SystemSessionProperties.getFaultTolerantExecutionTaskMemoryGrowthFactor;
@@ -104,7 +101,6 @@ public class BinPackingNodeAllocatorService
private final DataSize taskRuntimeMemoryEstimationOverhead;
private final Ticker ticker;

private final ConcurrentMap<String, Long> allocatedMemory = new ConcurrentHashMap<>();
private final Deque<PendingAcquire> pendingAcquires = new ConcurrentLinkedDeque<>();
private final Set<BinPackingNodeLease> fulfilledAcquires = newConcurrentHashSet();
private final Duration allowedNoMatchingNodePeriod;
@@ -200,6 +196,15 @@ void refreshNodePoolMemoryInfos()

@VisibleForTesting
synchronized void processPendingAcquires()
{
processPendingAcquires(false);
boolean hasNonSpeculativePendingAcquires = pendingAcquires.stream().anyMatch(pendingAcquire -> !pendingAcquire.isSpeculative());
if (!hasNonSpeculativePendingAcquires) {
processPendingAcquires(true);
}
}
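
The split above implements a two-pass policy: non-speculative pending acquires are processed first, and speculative ones only get a turn when no non-speculative acquire is still waiting. A minimal, self-contained sketch of the same idea (PendingRequest and tryReserve are illustrative stand-ins, not Trino classes):

import java.util.ArrayDeque;
import java.util.Deque;

class TwoPassAcquireSketch
{
    record PendingRequest(String id, boolean speculative) {}

    private final Deque<PendingRequest> pending = new ArrayDeque<>();

    void process()
    {
        // pass 1: only non-speculative requests may reserve nodes
        processPass(false);
        // pass 2: speculative requests run only if no non-speculative request is still waiting
        boolean nonSpeculativeStillWaiting = pending.stream().anyMatch(request -> !request.speculative());
        if (!nonSpeculativeStillWaiting) {
            processPass(true);
        }
    }

    private void processPass(boolean speculative)
    {
        // remove every request of the selected kind that the (stubbed) reservation accepts
        pending.removeIf(request -> request.speculative() == speculative && tryReserve(request));
    }

    private boolean tryReserve(PendingRequest request)
    {
        return true; // stand-in for the bin-packing simulation
    }
}
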

private void processPendingAcquires(boolean processSpeculative)
{
// synchronized only for the sake of manual triggering in test code; in production it is only called from a single thread
Iterator<PendingAcquire> iterator = pendingAcquires.iterator();
@@ -208,9 +213,9 @@ synchronized void processPendingAcquires()
nodeManager.getActiveNodesSnapshot(),
nodePoolMemoryInfos.get(),
fulfilledAcquires,
allocatedMemory,
scheduleOnCoordinator,
taskRuntimeMemoryEstimationOverhead);
taskRuntimeMemoryEstimationOverhead,
!processSpeculative); // when processing non-speculative pending acquires, ignore memory held by speculative ones

while (iterator.hasNext()) {
PendingAcquire pendingAcquire = iterator.next();
@@ -221,17 +226,18 @@
continue;
}

if (pendingAcquire.isSpeculative() != processSpeculative) {
continue;
}

BinPackingSimulation.ReserveResult result = simulation.tryReserve(pendingAcquire);
switch (result.getStatus()) {
case RESERVED:
InternalNode reservedNode = result.getNode().orElseThrow();
fulfilledAcquires.add(pendingAcquire.getLease());
updateAllocatedMemory(reservedNode, pendingAcquire.getMemoryLease());
pendingAcquire.getFuture().set(reservedNode);
if (pendingAcquire.getFuture().isCancelled()) {
// completing the future was unsuccessful - the request was cancelled in the meantime
pendingAcquire.getLease().deallocateMemory(reservedNode);

fulfilledAcquires.remove(pendingAcquire.getLease());

// run once again when we are done
@@ -271,9 +277,9 @@ public NodeAllocator getNodeAllocator(Session session)
}

@Override
public NodeLease acquire(NodeRequirements nodeRequirements, DataSize memoryRequirement)
public NodeLease acquire(NodeRequirements nodeRequirements, DataSize memoryRequirement, boolean speculative)
{
BinPackingNodeLease nodeLease = new BinPackingNodeLease(memoryRequirement.toBytes());
BinPackingNodeLease nodeLease = new BinPackingNodeLease(memoryRequirement.toBytes(), speculative);
PendingAcquire pendingAcquire = new PendingAcquire(nodeRequirements, memoryRequirement, nodeLease, ticker);
pendingAcquires.add(pendingAcquire);
wakeupProcessPendingAcquires();
@@ -288,20 +294,6 @@ public void close()
// and that can be done before all leases are yet returned from running (soon to be failed) tasks.
}

private void updateAllocatedMemory(InternalNode node, long delta)
{
allocatedMemory.compute(
node.getNodeIdentifier(),
(key, oldValue) -> {
verify(delta > 0 || (oldValue != null && oldValue >= -delta), "tried to release more than allocated (%s vs %s) for node %s", -delta, oldValue, key);
long newValue = oldValue == null ? delta : oldValue + delta;
if (newValue == 0) {
return null; // delete
}
return newValue;
});
}

private static class PendingAcquire
{
private final NodeRequirements nodeRequirements;
@@ -349,20 +341,26 @@ public void resetNoMatchingNodeFound()
{
noMatchingNodeStopwatch.reset();
}

public boolean isSpeculative()
{
return lease.isSpeculative();
}
}

private class BinPackingNodeLease
implements NodeAllocator.NodeLease
{
private final SettableFuture<InternalNode> node = SettableFuture.create();
private final AtomicBoolean released = new AtomicBoolean();
private final AtomicBoolean memoryDeallocated = new AtomicBoolean();
private final long memoryLease;
private final AtomicReference<TaskId> taskId = new AtomicReference<>();
private final AtomicBoolean speculative;

private BinPackingNodeLease(long memoryLease)
private BinPackingNodeLease(long memoryLease, boolean speculative)
{
this.memoryLease = memoryLease;
this.speculative = new AtomicBoolean(speculative);
}

@Override
@@ -394,6 +392,21 @@ public void attachTaskId(TaskId taskId)
}
}

@Override
public void setSpeculative(boolean speculative)
{
checkArgument(!speculative, "cannot make non-speculative task speculative");
boolean changed = this.speculative.compareAndSet(true, false);
if (changed) {
wakeupProcessPendingAcquires();
}
}
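
setSpeculative only permits the true-to-false transition (a task may be promoted from speculative to non-speculative, never demoted), and the compareAndSet ensures the wake-up fires at most once. A tiny stand-alone sketch of that one-way latch, with illustrative names:

import java.util.concurrent.atomic.AtomicBoolean;

class SpeculativeFlagSketch
{
    private final AtomicBoolean speculative = new AtomicBoolean(true);

    // promotion is one-way; calling this repeatedly triggers the callback only once
    void promoteToNonSpeculative(Runnable onPromoted)
    {
        if (speculative.compareAndSet(true, false)) {
            onPromoted.run(); // e.g. wake up pending-acquire processing
        }
    }

    boolean isSpeculative()
    {
        return speculative.get();
    }
}
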

public boolean isSpeculative()
{
return speculative.get();
}

public Optional<TaskId> getAttachedTaskId()
{
return Optional.ofNullable(this.taskId.get());
@@ -410,7 +423,6 @@ public void release()
if (released.compareAndSet(false, true)) {
node.cancel(true);
if (node.isDone() && !node.isCancelled()) {
deallocateMemory(getFutureValue(node));
checkState(fulfilledAcquires.remove(this), "node lease %s not found in fulfilledAcquires %s", this, fulfilledAcquires);
wakeupProcessPendingAcquires();
}
@@ -419,21 +431,16 @@
throw new IllegalStateException("Node " + node + " already released");
}
}

public void deallocateMemory(InternalNode node)
{
if (memoryDeallocated.compareAndSet(false, true)) {
updateAllocatedMemory(node, -memoryLease);
}
}
}

private static class BinPackingSimulation
{
private final NodesSnapshot nodesSnapshot;
private final List<InternalNode> allNodesSorted;
private final boolean ignoreAcquiredSpeculative;
private final Map<String, Long> nodesRemainingMemory;
private final Map<String, Long> nodesRemainingMemoryRuntimeAdjusted;
private final Map<String, Long> speculativeMemoryReserved;

private final Map<String, MemoryPoolInfo> nodeMemoryPoolInfos;
private final boolean scheduleOnCoordinator;
@@ -442,20 +449,20 @@ public BinPackingSimulation(
NodesSnapshot nodesSnapshot,
Map<String, MemoryPoolInfo> nodeMemoryPoolInfos,
Set<BinPackingNodeLease> fulfilledAcquires,
Map<String, Long> preReservedMemory,
boolean scheduleOnCoordinator,
DataSize taskRuntimeMemoryEstimationOverhead)
DataSize taskRuntimeMemoryEstimationOverhead,
boolean ignoreAcquiredSpeculative)
{
this.nodesSnapshot = requireNonNull(nodesSnapshot, "nodesSnapshot is null");
// use same node ordering for each simulation
this.allNodesSorted = nodesSnapshot.getAllNodes().stream()
.sorted(comparing(InternalNode::getNodeIdentifier))
.collect(toImmutableList());
this.ignoreAcquiredSpeculative = ignoreAcquiredSpeculative;

requireNonNull(nodeMemoryPoolInfos, "nodeMemoryPoolInfos is null");
this.nodeMemoryPoolInfos = ImmutableMap.copyOf(nodeMemoryPoolInfos);

requireNonNull(preReservedMemory, "preReservedMemory is null");
this.scheduleOnCoordinator = scheduleOnCoordinator;

Map<String, Map<String, Long>> realtimeTasksMemoryPerNode = new HashMap<>();
@@ -468,10 +475,26 @@ public BinPackingSimulation(
realtimeTasksMemoryPerNode.put(node.getNodeIdentifier(), memoryPoolInfo.getTaskMemoryReservations());
}

Map<String, Long> preReservedMemory = new HashMap<>();
SetMultimap<String, BinPackingNodeLease> fulfilledAcquiresByNode = HashMultimap.create();
for (BinPackingNodeLease fulfilledAcquire : fulfilledAcquires) {
if (ignoreAcquiredSpeculative && fulfilledAcquire.isSpeculative()) {
continue;
}
InternalNode node = fulfilledAcquire.getAssignedNode();
fulfilledAcquiresByNode.put(node.getNodeIdentifier(), fulfilledAcquire);
preReservedMemory.compute(node.getNodeIdentifier(), (key, prev) -> (prev == null ? 0L : prev) + fulfilledAcquire.getMemoryLease());
}

speculativeMemoryReserved = new HashMap<>();
if (ignoreAcquiredSpeculative) {
for (BinPackingNodeLease fulfilledAcquire : fulfilledAcquires) {
if (!fulfilledAcquire.isSpeculative()) {
continue;
}
InternalNode node = fulfilledAcquire.getAssignedNode();
speculativeMemoryReserved.compute(node.getNodeIdentifier(), (key, prev) -> (prev == null ? 0L : prev) + fulfilledAcquire.getMemoryLease());
}
}
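
The two loops above simply bucket fulfilled leases by node: leases that still count against a node go into preReservedMemory, while speculatively held memory is tracked separately (only when it is being ignored) so it can later break ties between otherwise equal nodes. A stripped-down sketch of that aggregation, using a made-up Lease record in place of BinPackingNodeLease:

import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ReservationAggregationSketch
{
    record Lease(String nodeId, long memoryBytes, boolean speculative) {}

    // memory that must be treated as already taken on each node
    static Map<String, Long> preReservedByNode(List<Lease> fulfilled, boolean ignoreSpeculative)
    {
        Map<String, Long> reserved = new HashMap<>();
        for (Lease lease : fulfilled) {
            if (ignoreSpeculative && lease.speculative()) {
                continue; // speculative memory is accounted for separately below
            }
            reserved.merge(lease.nodeId(), lease.memoryBytes(), Long::sum);
        }
        return reserved;
    }

    // speculative memory per node, used only as a tie-breaker when picking a node
    static Map<String, Long> speculativeByNode(List<Lease> fulfilled)
    {
        Map<String, Long> reserved = new HashMap<>();
        for (Lease lease : fulfilled) {
            if (lease.speculative()) {
                reserved.merge(lease.nodeId(), lease.memoryBytes(), Long::sum);
            }
        }
        return reserved;
    }
}
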

nodesRemainingMemory = new HashMap<>();
@@ -537,8 +560,12 @@ public ReserveResult tryReserve(PendingAcquire acquire)
return ReserveResult.NONE_MATCHING;
}

Comparator<InternalNode> comparator = comparing(node -> nodesRemainingMemoryRuntimeAdjusted.get(node.getNodeIdentifier()));
if (ignoreAcquiredSpeculative) {
comparator = resolveTiesWithSpeculativeMemory(comparator);
}
InternalNode selectedNode = candidates.stream()
.max(comparing(node -> nodesRemainingMemoryRuntimeAdjusted.get(node.getNodeIdentifier())))
.max(comparator)
.orElseThrow();

if (nodesRemainingMemoryRuntimeAdjusted.get(selectedNode.getNodeIdentifier()) >= acquire.getMemoryLease() || isNodeEmpty(selectedNode.getNodeIdentifier())) {
@@ -556,13 +583,22 @@ public ReserveResult tryReserve(PendingAcquire acquire)
// If the selected node cannot be used right now, select the best one ignoring runtime memory usage and reserve space there
// for later use. This is important for the algorithm's liveness: if we did not reserve space for a task which
// is too big to be scheduled right now, it could be starved by smaller tasks coming later.
Comparator<InternalNode> fallbackComparator = comparing(node -> nodesRemainingMemory.get(node.getNodeIdentifier()));
if (ignoreAcquiredSpeculative) {
fallbackComparator = resolveTiesWithSpeculativeMemory(fallbackComparator);
}
InternalNode fallbackNode = candidates.stream()
.max(comparing(node -> nodesRemainingMemory.get(node.getNodeIdentifier())))
.max(fallbackComparator)
.orElseThrow();
subtractFromRemainingMemory(fallbackNode.getNodeIdentifier(), acquire.getMemoryLease());
return ReserveResult.NOT_ENOUGH_RESOURCES_NOW;
}
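
The comment above describes the anti-starvation rule: even when the chosen fallback node cannot fit the task right now, its remaining memory is decremented so that smaller tasks arriving later cannot keep claiming the space the big task is waiting for. A tiny sketch of that effect (node names and sizes are made up):

import java.util.HashMap;
import java.util.Map;

class FallbackReservationSketch
{
    public static void main(String[] args)
    {
        Map<String, Long> remaining = new HashMap<>(Map.of("node-a", 40L));

        long bigTask = 100L;
        // not schedulable now, but reserve the space anyway so smaller tasks cannot starve it
        remaining.merge("node-a", -bigTask, Long::sum); // node-a is now at -60

        long smallTask = 30L;
        boolean smallTaskFits = remaining.get("node-a") >= smallTask;
        System.out.println(smallTaskFits); // false: the space is held for the waiting big task
    }
}
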

private Comparator<InternalNode> resolveTiesWithSpeculativeMemory(Comparator<InternalNode> comparator)
{
return comparator.thenComparing(node -> -speculativeMemoryReserved.getOrDefault(node.getNodeIdentifier(), 0L));
}
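
resolveTiesWithSpeculativeMemory keeps the primary ordering (the node with the most remaining memory wins under max()) and, among equally free nodes, prefers the one holding less speculatively reserved memory; the negation is what turns "less is better" into "larger key wins". A hedged, standalone illustration with made-up node names:

import java.util.Comparator;
import java.util.Map;
import java.util.stream.Stream;

class TieBreakSketch
{
    public static void main(String[] args)
    {
        Map<String, Long> remainingMemory = Map.of("node-a", 100L, "node-b", 100L);
        Map<String, Long> speculativeReserved = Map.of("node-a", 30L); // node-b holds no speculative memory

        Comparator<String> comparator = Comparator.<String, Long>comparing(remainingMemory::get)
                // under max(), larger keys win; negating prefers the node with LESS speculative memory
                .thenComparing(node -> -speculativeReserved.getOrDefault(node, 0L));

        String selected = Stream.of("node-a", "node-b").max(comparator).orElseThrow();
        System.out.println(selected); // node-b: equal free memory, fewer speculative reservations
    }
}

In the real allocator the candidates are InternalNode instances and the maps are keyed by node identifier, but the ordering logic is the same.
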

private void subtractFromRemainingMemory(String nodeIdentifier, long memoryLease)
{
nodesRemainingMemoryRuntimeAdjusted.compute(
@@ -881,10 +881,35 @@ private StageId getStageId(PlanFragmentId fragmentId)

private void scheduleTasks()
{
long tasksWaitingForNode = preSchedulingTaskContexts.values().stream().filter(context -> !context.getNodeLease().getNode().isDone()).count();
long speculativeTasksWaitingForNode = preSchedulingTaskContexts.values().stream()
.filter(context -> !context.getNodeLease().getNode().isDone())
.filter(PreSchedulingTaskContext::isSpeculative)
.count();

long nonSpeculativeTasksWaitingForNode = preSchedulingTaskContexts.values().stream()
.filter(context -> !context.getNodeLease().getNode().isDone())
.filter(preSchedulingTaskContext -> !preSchedulingTaskContext.isSpeculative())
.count();

while (!schedulingQueue.isEmpty()) {
if (nonSpeculativeTasksWaitingForNode >= maxTasksWaitingForNode) {
break;
}

PrioritizedScheduledTask scheduledTask = schedulingQueue.peekOrThrow();

if (scheduledTask.isSpeculative() && nonSpeculativeTasksWaitingForNode > 0) {
// do not schedule any speculative tasks while non-speculative tasks are waiting for a node
break;
}

if (scheduledTask.isSpeculative() && speculativeTasksWaitingForNode >= maxTasksWaitingForNode) {
// too many speculative tasks waiting for node
break;
}

verify(schedulingQueue.pollOrThrow().equals(scheduledTask));

while (tasksWaitingForNode < maxTasksWaitingForNode && !schedulingQueue.isEmpty()) {
PrioritizedScheduledTask scheduledTask = schedulingQueue.pollOrThrow();
StageExecution stageExecution = getStageExecution(scheduledTask.task().stageId());
if (stageExecution.getState().isDone()) {
continue;
@@ -896,10 +921,16 @@ private void scheduleTasks()
continue;
}
MemoryRequirements memoryRequirements = stageExecution.getMemoryRequirements(partitionId);
NodeLease lease = nodeAllocator.acquire(nodeRequirements.get(), memoryRequirements.getRequiredMemory());
NodeLease lease = nodeAllocator.acquire(nodeRequirements.get(), memoryRequirements.getRequiredMemory(), scheduledTask.isSpeculative());
lease.getNode().addListener(() -> eventQueue.add(Event.WAKE_UP), queryExecutor);
preSchedulingTaskContexts.put(scheduledTask.task(), new PreSchedulingTaskContext(lease, scheduledTask.isSpeculative()));
tasksWaitingForNode++;

if (scheduledTask.isSpeculative()) {
speculativeTasksWaitingForNode++;
}
else {
nonSpeculativeTasksWaitingForNode++;
}
}
}
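
The reworked loop in scheduleTasks admits tasks from the queue under three rules: stop once maxTasksWaitingForNode non-speculative tasks are waiting, never start a speculative task while any non-speculative task is waiting, and cap speculative tasks at the same limit. A compact sketch of just that gating decision (types and names here are invented for illustration):

class SchedulerAdmissionSketch
{
    enum Decision { ADMIT, STOP }

    static Decision admit(
            boolean taskIsSpeculative,
            long nonSpeculativeWaiting,
            long speculativeWaiting,
            int maxTasksWaitingForNode)
    {
        if (nonSpeculativeWaiting >= maxTasksWaitingForNode) {
            return Decision.STOP; // the non-speculative pipeline is already full
        }
        if (taskIsSpeculative && nonSpeculativeWaiting > 0) {
            return Decision.STOP; // never queue speculative work behind waiting non-speculative tasks
        }
        if (taskIsSpeculative && speculativeWaiting >= maxTasksWaitingForNode) {
            return Decision.STOP; // too many speculative tasks already waiting for a node
        }
        return Decision.ADMIT;
    }
}
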

@@ -1115,7 +1146,9 @@ public void onSplitAssignment(SplitAssignmentEvent event)
PreSchedulingTaskContext context = preSchedulingTaskContexts.get(prioritizedTask.task());
if (context != null) {
// task is already waiting for node or for sink instance handle
context.setSpeculative(prioritizedTask.isSpeculative()); // update speculative flag
// update speculative flag
context.setSpeculative(prioritizedTask.isSpeculative());
context.getNodeLease().setSpeculative(prioritizedTask.isSpeculative());
return;
}
schedulingQueue.addOrUpdate(prioritizedTask);
@@ -2056,6 +2089,14 @@ public PrioritizedScheduledTask pollOrThrow()
return prioritizedTask;
}

public PrioritizedScheduledTask peekOrThrow()
{
IndexedPriorityQueue.Prioritized<ScheduledTask> task = queue.peekPrioritized();
checkState(task != null, "queue is empty");
// negate priority to reverse the negation applied in addOrUpdate
return new PrioritizedScheduledTask(task.getValue(), toIntExact(-task.getPriority()));
}

Review comment (Member): Can we change IndexedPriorityQueue to sort things in reversed order? The fact that we need to always negate priority here feels really error-prone.

Reply (Member Author): Yeah, we can - will add a commit.

Reply (Member Author): I will send a separate PR.
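
The negation the reviewer flags is the usual trick for flipping a queue's ordering: the priority is negated when a task is inserted in addOrUpdate, so it has to be negated again when read back, here in both pollOrThrow and peekOrThrow. A toy illustration of the pattern and why it is fragile, using java.util.PriorityQueue rather than Trino's IndexedPriorityQueue:

import java.util.Comparator;
import java.util.PriorityQueue;

class NegatedPrioritySketch
{
    record Entry(String task, int negatedPriority) {}

    public static void main(String[] args)
    {
        // PriorityQueue is min-first; storing negated priorities makes it behave as "highest priority first"
        PriorityQueue<Entry> queue = new PriorityQueue<>(Comparator.comparingInt(Entry::negatedPriority));
        queue.add(new Entry("low-priority", -1));
        queue.add(new Entry("high-priority", -10));

        Entry head = queue.peek();
        // every read site has to remember to undo the negation...
        System.out.println(head.task() + " priority=" + (-head.negatedPriority())); // prints: high-priority priority=10
        // ...forgetting the minus sign silently reports -10, which is the error-prone part under discussion
    }
}
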

public void addOrUpdate(PrioritizedScheduledTask prioritizedTask)
{
IndexedPriorityQueue.Prioritized<ScheduledTask> previousTask = queue.getPrioritized(prioritizedTask.task());
@@ -29,7 +29,7 @@ public interface NodeAllocator
*
* It is obligatory for the calling party to release all the leases they obtained via {@link NodeLease#release()}.
*/
NodeLease acquire(NodeRequirements nodeRequirements, DataSize memoryRequirement);
NodeLease acquire(NodeRequirements nodeRequirements, DataSize memoryRequirement, boolean speculative);

@Override
void close();
@@ -40,6 +40,8 @@ interface NodeLease

default void attachTaskId(TaskId taskId) {}

void setSpeculative(boolean speculative);

void release();
}
}
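
Taken together, the interface change means a caller now states up front whether a lease is for a speculative task and can later promote it with setSpeculative(false). A hedged usage sketch; the nested interfaces below are minimal stand-ins for the real Trino types (which take NodeRequirements and DataSize), kept only so the example is self-contained:

class NodeAllocatorUsageSketch
{
    interface NodeLease
    {
        void setSpeculative(boolean speculative);
        void release();
    }

    interface NodeAllocator
    {
        NodeLease acquire(Object nodeRequirements, long memoryBytes, boolean speculative);
    }

    static void runSpeculativeThenPromote(NodeAllocator allocator, Object requirements)
    {
        // declare the task as speculative up front; the allocator may serve it only after non-speculative work
        NodeLease lease = allocator.acquire(requirements, 512L << 20, true);
        try {
            // once the task is known to be required, promote it so it competes as non-speculative
            lease.setSpeculative(false);
        }
        finally {
            // callers are obliged to release every lease they obtain
            lease.release();
        }
    }
}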