Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prioritize non-speculative tasks in scheduler and node allocator #17465

Merged
merged 3 commits into from
May 16, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import javax.inject.Inject;

import java.time.Duration;
import java.util.Comparator;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
Expand Down Expand Up @@ -195,6 +196,15 @@ void refreshNodePoolMemoryInfos()

/**
 * Runs one round of pending-acquire processing, giving non-speculative requests priority.
 * <p>
 * Non-speculative requests are always processed first. Speculative requests are processed
 * afterwards only if, once the first pass is done, no non-speculative request is still pending.
 */
@VisibleForTesting
synchronized void processPendingAcquires()
{
    // First pass: non-speculative requests only.
    processPendingAcquires(false);

    // Second pass runs only when every request still pending is speculative
    // (trivially true when nothing is pending at all).
    if (pendingAcquires.stream().allMatch(PendingAcquire::isSpeculative)) {
        processPendingAcquires(true);
    }
}

private void processPendingAcquires(boolean processSpeculative)
{
// synchronized only for the sake of manual triggering in test code. In production code it should only be called by a single thread
Iterator<PendingAcquire> iterator = pendingAcquires.iterator();
Expand All @@ -204,7 +214,8 @@ synchronized void processPendingAcquires()
nodePoolMemoryInfos.get(),
fulfilledAcquires,
scheduleOnCoordinator,
taskRuntimeMemoryEstimationOverhead);
taskRuntimeMemoryEstimationOverhead,
!processSpeculative); // if we are processing non-speculative pending acquires we are ignoring speculative acquired ones

while (iterator.hasNext()) {
PendingAcquire pendingAcquire = iterator.next();
Expand All @@ -215,6 +226,10 @@ synchronized void processPendingAcquires()
continue;
}

if (pendingAcquire.isSpeculative() != processSpeculative) {
continue;
}

BinPackingSimulation.ReserveResult result = simulation.tryReserve(pendingAcquire);
switch (result.getStatus()) {
case RESERVED:
Expand Down Expand Up @@ -262,9 +277,9 @@ public NodeAllocator getNodeAllocator(Session session)
}

@Override
public NodeLease acquire(NodeRequirements nodeRequirements, DataSize memoryRequirement)
public NodeLease acquire(NodeRequirements nodeRequirements, DataSize memoryRequirement, boolean speculative)
{
BinPackingNodeLease nodeLease = new BinPackingNodeLease(memoryRequirement.toBytes());
BinPackingNodeLease nodeLease = new BinPackingNodeLease(memoryRequirement.toBytes(), speculative);
PendingAcquire pendingAcquire = new PendingAcquire(nodeRequirements, memoryRequirement, nodeLease, ticker);
pendingAcquires.add(pendingAcquire);
wakeupProcessPendingAcquires();
Expand Down Expand Up @@ -326,6 +341,11 @@ public void resetNoMatchingNodeFound()
{
noMatchingNodeStopwatch.reset();
}

/**
 * Reports whether this pending request is speculative, as currently reported by its lease.
 */
public boolean isSpeculative()
{
    boolean leaseIsSpeculative = this.lease.isSpeculative();
    return leaseIsSpeculative;
}
}

private class BinPackingNodeLease
Expand All @@ -335,10 +355,12 @@ private class BinPackingNodeLease
private final AtomicBoolean released = new AtomicBoolean();
private final long memoryLease;
private final AtomicReference<TaskId> taskId = new AtomicReference<>();
private final AtomicBoolean speculative;

private BinPackingNodeLease(long memoryLease)
private BinPackingNodeLease(long memoryLease, boolean speculative)
{
this.memoryLease = memoryLease;
this.speculative = new AtomicBoolean(speculative);
}

@Override
Expand Down Expand Up @@ -370,6 +392,21 @@ public void attachTaskId(TaskId taskId)
}
}

/**
 * Updates the speculative flag of this lease.
 * <p>
 * The only state change supported is speculative -> non-speculative; when that change happens
 * pending-acquire processing is woken up. Passing {@code true} while the lease is still
 * speculative is a no-op.
 *
 * @param speculative the requested value of the flag
 * @throws IllegalArgumentException if {@code speculative} is {@code true} but the lease is already non-speculative
 */
@Override
public void setSpeculative(boolean speculative)
{
    // Reject only the actual non-speculative -> speculative transition. A redundant
    // setSpeculative(true) on a lease which is still speculative changes nothing and must not fail.
    checkArgument(!speculative || this.speculative.get(), "cannot make non-speculative task speculative");
    if (speculative) {
        // flag already has the requested value
        return;
    }
    boolean changed = this.speculative.compareAndSet(true, false);
    if (changed) {
        // the lease just became non-speculative; trigger re-processing of pending acquires
        wakeupProcessPendingAcquires();
    }
}

/**
 * Returns the current value of this lease's speculative flag.
 */
public boolean isSpeculative()
{
    boolean current = this.speculative.get();
    return current;
}

public Optional<TaskId> getAttachedTaskId()
{
return Optional.ofNullable(this.taskId.get());
Expand Down Expand Up @@ -400,8 +437,10 @@ private static class BinPackingSimulation
{
private final NodesSnapshot nodesSnapshot;
private final List<InternalNode> allNodesSorted;
private final boolean ignoreAcquiredSpeculative;
private final Map<String, Long> nodesRemainingMemory;
private final Map<String, Long> nodesRemainingMemoryRuntimeAdjusted;
private final Map<String, Long> speculativeMemoryReserved;

private final Map<String, MemoryPoolInfo> nodeMemoryPoolInfos;
private final boolean scheduleOnCoordinator;
Expand All @@ -411,13 +450,15 @@ public BinPackingSimulation(
Map<String, MemoryPoolInfo> nodeMemoryPoolInfos,
Set<BinPackingNodeLease> fulfilledAcquires,
boolean scheduleOnCoordinator,
DataSize taskRuntimeMemoryEstimationOverhead)
DataSize taskRuntimeMemoryEstimationOverhead,
boolean ignoreAcquiredSpeculative)
{
this.nodesSnapshot = requireNonNull(nodesSnapshot, "nodesSnapshot is null");
// use same node ordering for each simulation
this.allNodesSorted = nodesSnapshot.getAllNodes().stream()
.sorted(comparing(InternalNode::getNodeIdentifier))
.collect(toImmutableList());
this.ignoreAcquiredSpeculative = ignoreAcquiredSpeculative;

requireNonNull(nodeMemoryPoolInfos, "nodeMemoryPoolInfos is null");
this.nodeMemoryPoolInfos = ImmutableMap.copyOf(nodeMemoryPoolInfos);
Expand All @@ -437,11 +478,25 @@ public BinPackingSimulation(
Map<String, Long> preReservedMemory = new HashMap<>();
SetMultimap<String, BinPackingNodeLease> fulfilledAcquiresByNode = HashMultimap.create();
for (BinPackingNodeLease fulfilledAcquire : fulfilledAcquires) {
if (ignoreAcquiredSpeculative && fulfilledAcquire.isSpeculative()) {
continue;
}
InternalNode node = fulfilledAcquire.getAssignedNode();
fulfilledAcquiresByNode.put(node.getNodeIdentifier(), fulfilledAcquire);
preReservedMemory.compute(node.getNodeIdentifier(), (key, prev) -> (prev == null ? 0L : prev) + fulfilledAcquire.getMemoryLease());
}

speculativeMemoryReserved = new HashMap<>();
if (ignoreAcquiredSpeculative) {
for (BinPackingNodeLease fulfilledAcquire : fulfilledAcquires) {
if (!fulfilledAcquire.isSpeculative()) {
continue;
}
InternalNode node = fulfilledAcquire.getAssignedNode();
speculativeMemoryReserved.compute(node.getNodeIdentifier(), (key, prev) -> (prev == null ? 0L : prev) + fulfilledAcquire.getMemoryLease());
}
}

nodesRemainingMemory = new HashMap<>();
for (InternalNode node : nodesSnapshot.getAllNodes()) {
MemoryPoolInfo memoryPoolInfo = nodeMemoryPoolInfos.get(node.getNodeIdentifier());
Expand Down Expand Up @@ -505,8 +560,12 @@ public ReserveResult tryReserve(PendingAcquire acquire)
return ReserveResult.NONE_MATCHING;
}

Comparator<InternalNode> comparator = comparing(node -> nodesRemainingMemoryRuntimeAdjusted.get(node.getNodeIdentifier()));
if (ignoreAcquiredSpeculative) {
comparator = resolveTiesWithSpeculativeMemory(comparator);
}
InternalNode selectedNode = candidates.stream()
.max(comparing(node -> nodesRemainingMemoryRuntimeAdjusted.get(node.getNodeIdentifier())))
.max(comparator)
.orElseThrow();

if (nodesRemainingMemoryRuntimeAdjusted.get(selectedNode.getNodeIdentifier()) >= acquire.getMemoryLease() || isNodeEmpty(selectedNode.getNodeIdentifier())) {
Expand All @@ -524,13 +583,22 @@ public ReserveResult tryReserve(PendingAcquire acquire)
// If selected node cannot be used right now, select best one ignoring runtime memory usage and reserve space there
// for later use. This is important from algorithm liveliness perspective. If we did not reserve space for a task which
// is too big to be scheduled right now, it could be starved by smaller tasks coming later.
Comparator<InternalNode> fallbackComparator = comparing(node -> nodesRemainingMemory.get(node.getNodeIdentifier()));
if (ignoreAcquiredSpeculative) {
fallbackComparator = resolveTiesWithSpeculativeMemory(fallbackComparator);
}
InternalNode fallbackNode = candidates.stream()
.max(comparing(node -> nodesRemainingMemory.get(node.getNodeIdentifier())))
.max(fallbackComparator)
.orElseThrow();
subtractFromRemainingMemory(fallbackNode.getNodeIdentifier(), acquire.getMemoryLease());
return ReserveResult.NOT_ENOUGH_RESOURCES_NOW;
}

/**
 * Extends the given comparator with a tie-breaker based on speculative memory reserved per node.
 * <p>
 * When {@code comparator} considers two nodes equal, the node with less memory recorded in
 * {@code speculativeMemoryReserved} compares as greater. Nodes without an entry count as 0.
 */
private Comparator<InternalNode> resolveTiesWithSpeculativeMemory(Comparator<InternalNode> comparator)
{
    // Compare negated reservations so that a smaller reservation yields a larger key.
    Comparator<InternalNode> preferLessSpeculativeMemory = (left, right) -> Long.compare(
            -speculativeMemoryReserved.getOrDefault(left.getNodeIdentifier(), 0L),
            -speculativeMemoryReserved.getOrDefault(right.getNodeIdentifier(), 0L));
    return comparator.thenComparing(preferLessSpeculativeMemory);
}

private void subtractFromRemainingMemory(String nodeIdentifier, long memoryLease)
{
nodesRemainingMemoryRuntimeAdjusted.compute(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -921,7 +921,7 @@ private void scheduleTasks()
continue;
}
MemoryRequirements memoryRequirements = stageExecution.getMemoryRequirements(partitionId);
NodeLease lease = nodeAllocator.acquire(nodeRequirements.get(), memoryRequirements.getRequiredMemory());
NodeLease lease = nodeAllocator.acquire(nodeRequirements.get(), memoryRequirements.getRequiredMemory(), scheduledTask.isSpeculative());
lease.getNode().addListener(() -> eventQueue.add(Event.WAKE_UP), queryExecutor);
preSchedulingTaskContexts.put(scheduledTask.task(), new PreSchedulingTaskContext(lease, scheduledTask.isSpeculative()));

Expand Down Expand Up @@ -1146,7 +1146,9 @@ public void onSplitAssignment(SplitAssignmentEvent event)
PreSchedulingTaskContext context = preSchedulingTaskContexts.get(prioritizedTask.task());
if (context != null) {
// task is already waiting for node or for sink instance handle
context.setSpeculative(prioritizedTask.isSpeculative()); // update speculative flag
// update speculative flag
context.setSpeculative(prioritizedTask.isSpeculative());
context.getNodeLease().setSpeculative(prioritizedTask.isSpeculative());
return;
}
schedulingQueue.addOrUpdate(prioritizedTask);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public interface NodeAllocator
*
* It is obligatory for the calling party to release all the leases they obtained via {@link NodeLease#release()}.
*/
NodeLease acquire(NodeRequirements nodeRequirements, DataSize memoryRequirement);
NodeLease acquire(NodeRequirements nodeRequirements, DataSize memoryRequirement, boolean speculative);

@Override
void close();
Expand All @@ -40,6 +40,8 @@ interface NodeLease

default void attachTaskId(TaskId taskId) {}

void setSpeculative(boolean speculative);

void release();
}
}
Loading