Skip to content

Commit

Permalink
Prioritize non-speculative tasks in BinPackingNodeAllocatorService
Browse files Browse the repository at this point in the history
  • Loading branch information
losipiuk committed May 16, 2023
1 parent f742394 commit 1eba1f3
Show file tree
Hide file tree
Showing 4 changed files with 192 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import javax.inject.Inject;

import java.time.Duration;
import java.util.Comparator;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
Expand Down Expand Up @@ -195,6 +196,15 @@ void refreshNodePoolMemoryInfos()

@VisibleForTesting
synchronized void processPendingAcquires()
{
processPendingAcquires(false);
boolean hasNonSpeculativePendingAcquires = pendingAcquires.stream().anyMatch(pendingAcquire -> !pendingAcquire.isSpeculative());
if (!hasNonSpeculativePendingAcquires) {
processPendingAcquires(true);
}
}

private void processPendingAcquires(boolean processSpeculative)
{
// synchronized only for sake manual triggering in test code. In production code it should only be called by single thread
Iterator<PendingAcquire> iterator = pendingAcquires.iterator();
Expand All @@ -204,7 +214,8 @@ synchronized void processPendingAcquires()
nodePoolMemoryInfos.get(),
fulfilledAcquires,
scheduleOnCoordinator,
taskRuntimeMemoryEstimationOverhead);
taskRuntimeMemoryEstimationOverhead,
!processSpeculative); // if we are processing non-speculative pending acquires we are ignoring speculative acquired ones

while (iterator.hasNext()) {
PendingAcquire pendingAcquire = iterator.next();
Expand All @@ -215,6 +226,10 @@ synchronized void processPendingAcquires()
continue;
}

if (pendingAcquire.isSpeculative() != processSpeculative) {
continue;
}

BinPackingSimulation.ReserveResult result = simulation.tryReserve(pendingAcquire);
switch (result.getStatus()) {
case RESERVED:
Expand Down Expand Up @@ -262,9 +277,9 @@ public NodeAllocator getNodeAllocator(Session session)
}

@Override
public NodeLease acquire(NodeRequirements nodeRequirements, DataSize memoryRequirement)
public NodeLease acquire(NodeRequirements nodeRequirements, DataSize memoryRequirement, boolean speculative)
{
BinPackingNodeLease nodeLease = new BinPackingNodeLease(memoryRequirement.toBytes());
BinPackingNodeLease nodeLease = new BinPackingNodeLease(memoryRequirement.toBytes(), speculative);
PendingAcquire pendingAcquire = new PendingAcquire(nodeRequirements, memoryRequirement, nodeLease, ticker);
pendingAcquires.add(pendingAcquire);
wakeupProcessPendingAcquires();
Expand Down Expand Up @@ -326,6 +341,11 @@ public void resetNoMatchingNodeFound()
{
noMatchingNodeStopwatch.reset();
}

public boolean isSpeculative()
{
return lease.isSpeculative();
}
}

private class BinPackingNodeLease
Expand All @@ -335,10 +355,12 @@ private class BinPackingNodeLease
private final AtomicBoolean released = new AtomicBoolean();
private final long memoryLease;
private final AtomicReference<TaskId> taskId = new AtomicReference<>();
private final AtomicBoolean speculative;

private BinPackingNodeLease(long memoryLease)
private BinPackingNodeLease(long memoryLease, boolean speculative)
{
this.memoryLease = memoryLease;
this.speculative = new AtomicBoolean(speculative);
}

@Override
Expand Down Expand Up @@ -370,6 +392,21 @@ public void attachTaskId(TaskId taskId)
}
}

@Override
public void setSpeculative(boolean speculative)
{
checkArgument(!speculative, "cannot make non-speculative task speculative");
boolean changed = this.speculative.compareAndSet(true, false);
if (changed) {
wakeupProcessPendingAcquires();
}
}

public boolean isSpeculative()
{
return speculative.get();
}

public Optional<TaskId> getAttachedTaskId()
{
return Optional.ofNullable(this.taskId.get());
Expand Down Expand Up @@ -400,8 +437,10 @@ private static class BinPackingSimulation
{
private final NodesSnapshot nodesSnapshot;
private final List<InternalNode> allNodesSorted;
private final boolean ignoreAcquiredSpeculative;
private final Map<String, Long> nodesRemainingMemory;
private final Map<String, Long> nodesRemainingMemoryRuntimeAdjusted;
private final Map<String, Long> speculativeMemoryReserved;

private final Map<String, MemoryPoolInfo> nodeMemoryPoolInfos;
private final boolean scheduleOnCoordinator;
Expand All @@ -411,13 +450,15 @@ public BinPackingSimulation(
Map<String, MemoryPoolInfo> nodeMemoryPoolInfos,
Set<BinPackingNodeLease> fulfilledAcquires,
boolean scheduleOnCoordinator,
DataSize taskRuntimeMemoryEstimationOverhead)
DataSize taskRuntimeMemoryEstimationOverhead,
boolean ignoreAcquiredSpeculative)
{
this.nodesSnapshot = requireNonNull(nodesSnapshot, "nodesSnapshot is null");
// use same node ordering for each simulation
this.allNodesSorted = nodesSnapshot.getAllNodes().stream()
.sorted(comparing(InternalNode::getNodeIdentifier))
.collect(toImmutableList());
this.ignoreAcquiredSpeculative = ignoreAcquiredSpeculative;

requireNonNull(nodeMemoryPoolInfos, "nodeMemoryPoolInfos is null");
this.nodeMemoryPoolInfos = ImmutableMap.copyOf(nodeMemoryPoolInfos);
Expand All @@ -437,11 +478,25 @@ public BinPackingSimulation(
Map<String, Long> preReservedMemory = new HashMap<>();
SetMultimap<String, BinPackingNodeLease> fulfilledAcquiresByNode = HashMultimap.create();
for (BinPackingNodeLease fulfilledAcquire : fulfilledAcquires) {
if (ignoreAcquiredSpeculative && fulfilledAcquire.isSpeculative()) {
continue;
}
InternalNode node = fulfilledAcquire.getAssignedNode();
fulfilledAcquiresByNode.put(node.getNodeIdentifier(), fulfilledAcquire);
preReservedMemory.compute(node.getNodeIdentifier(), (key, prev) -> (prev == null ? 0L : prev) + fulfilledAcquire.getMemoryLease());
}

speculativeMemoryReserved = new HashMap<>();
if (ignoreAcquiredSpeculative) {
for (BinPackingNodeLease fulfilledAcquire : fulfilledAcquires) {
if (!fulfilledAcquire.isSpeculative()) {
continue;
}
InternalNode node = fulfilledAcquire.getAssignedNode();
speculativeMemoryReserved.compute(node.getNodeIdentifier(), (key, prev) -> (prev == null ? 0L : prev) + fulfilledAcquire.getMemoryLease());
}
}

nodesRemainingMemory = new HashMap<>();
for (InternalNode node : nodesSnapshot.getAllNodes()) {
MemoryPoolInfo memoryPoolInfo = nodeMemoryPoolInfos.get(node.getNodeIdentifier());
Expand Down Expand Up @@ -505,8 +560,12 @@ public ReserveResult tryReserve(PendingAcquire acquire)
return ReserveResult.NONE_MATCHING;
}

Comparator<InternalNode> comparator = comparing(node -> nodesRemainingMemoryRuntimeAdjusted.get(node.getNodeIdentifier()));
if (ignoreAcquiredSpeculative) {
comparator = resolveTiesWithSpeculativeMemory(comparator);
}
InternalNode selectedNode = candidates.stream()
.max(comparing(node -> nodesRemainingMemoryRuntimeAdjusted.get(node.getNodeIdentifier())))
.max(comparator)
.orElseThrow();

if (nodesRemainingMemoryRuntimeAdjusted.get(selectedNode.getNodeIdentifier()) >= acquire.getMemoryLease() || isNodeEmpty(selectedNode.getNodeIdentifier())) {
Expand All @@ -524,13 +583,22 @@ public ReserveResult tryReserve(PendingAcquire acquire)
// If selected node cannot be used right now, select best one ignoring runtime memory usage and reserve space there
// for later use. This is important from algorithm liveliness perspective. If we did not reserve space for a task which
// is too big to be scheduled right now, it could be starved by smaller tasks coming later.
Comparator<InternalNode> fallbackComparator = comparing(node -> nodesRemainingMemory.get(node.getNodeIdentifier()));
if (ignoreAcquiredSpeculative) {
fallbackComparator = resolveTiesWithSpeculativeMemory(fallbackComparator);
}
InternalNode fallbackNode = candidates.stream()
.max(comparing(node -> nodesRemainingMemory.get(node.getNodeIdentifier())))
.max(fallbackComparator)
.orElseThrow();
subtractFromRemainingMemory(fallbackNode.getNodeIdentifier(), acquire.getMemoryLease());
return ReserveResult.NOT_ENOUGH_RESOURCES_NOW;
}

private Comparator<InternalNode> resolveTiesWithSpeculativeMemory(Comparator<InternalNode> comparator)
{
return comparator.thenComparing(node -> -speculativeMemoryReserved.getOrDefault(node.getNodeIdentifier(), 0L));
}

private void subtractFromRemainingMemory(String nodeIdentifier, long memoryLease)
{
nodesRemainingMemoryRuntimeAdjusted.compute(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -921,7 +921,7 @@ private void scheduleTasks()
continue;
}
MemoryRequirements memoryRequirements = stageExecution.getMemoryRequirements(partitionId);
NodeLease lease = nodeAllocator.acquire(nodeRequirements.get(), memoryRequirements.getRequiredMemory());
NodeLease lease = nodeAllocator.acquire(nodeRequirements.get(), memoryRequirements.getRequiredMemory(), scheduledTask.isSpeculative());
lease.getNode().addListener(() -> eventQueue.add(Event.WAKE_UP), queryExecutor);
preSchedulingTaskContexts.put(scheduledTask.task(), new PreSchedulingTaskContext(lease, scheduledTask.isSpeculative()));

Expand Down Expand Up @@ -1146,7 +1146,9 @@ public void onSplitAssignment(SplitAssignmentEvent event)
PreSchedulingTaskContext context = preSchedulingTaskContexts.get(prioritizedTask.task());
if (context != null) {
// task is already waiting for node or for sink instance handle
context.setSpeculative(prioritizedTask.isSpeculative()); // update speculative flag
// update speculative flag
context.setSpeculative(prioritizedTask.isSpeculative());
context.getNodeLease().setSpeculative(prioritizedTask.isSpeculative());
return;
}
schedulingQueue.addOrUpdate(prioritizedTask);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public interface NodeAllocator
*
* It is obligatory for the calling party to release all the leases they obtained via {@link NodeLease#release()}.
*/
NodeLease acquire(NodeRequirements nodeRequirements, DataSize memoryRequirement);
NodeLease acquire(NodeRequirements nodeRequirements, DataSize memoryRequirement, boolean speculative);

@Override
void close();
Expand All @@ -40,6 +40,8 @@ interface NodeLease

default void attachTaskId(TaskId taskId) {}

void setSpeculative(boolean speculative);

void release();
}
}
Loading

0 comments on commit 1eba1f3

Please sign in to comment.