Skip to content

Commit

Permalink
Priritize node processing for non-speculative tasks in EventDrivenFau…
Browse files Browse the repository at this point in the history
…ltTolerantQueryScheduler
  • Loading branch information
losipiuk committed May 16, 2023
1 parent 7377df5 commit f742394
Showing 1 changed file with 43 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -881,10 +881,35 @@ private StageId getStageId(PlanFragmentId fragmentId)

private void scheduleTasks()
{
long tasksWaitingForNode = preSchedulingTaskContexts.values().stream().filter(context -> !context.getNodeLease().getNode().isDone()).count();
long speculativeTasksWaitingForNode = preSchedulingTaskContexts.values().stream()
.filter(context -> !context.getNodeLease().getNode().isDone())
.filter(PreSchedulingTaskContext::isSpeculative)
.count();

long nonSpeculativeTasksWaitingForNode = preSchedulingTaskContexts.values().stream()
.filter(context -> !context.getNodeLease().getNode().isDone())
.filter(preSchedulingTaskContext -> !preSchedulingTaskContext.isSpeculative())
.count();

while (!schedulingQueue.isEmpty()) {
if (nonSpeculativeTasksWaitingForNode >= maxTasksWaitingForNode) {
break;
}

PrioritizedScheduledTask scheduledTask = schedulingQueue.peekOrThrow();

if (scheduledTask.isSpeculative() && nonSpeculativeTasksWaitingForNode > 0) {
// do not handle any speculative tasks if there are non-speculative waiting
break;
}

if (scheduledTask.isSpeculative() && speculativeTasksWaitingForNode >= maxTasksWaitingForNode) {
// too many speculative tasks waiting for node
break;
}

verify(schedulingQueue.pollOrThrow().equals(scheduledTask));

while (tasksWaitingForNode < maxTasksWaitingForNode && !schedulingQueue.isEmpty()) {
PrioritizedScheduledTask scheduledTask = schedulingQueue.pollOrThrow();
StageExecution stageExecution = getStageExecution(scheduledTask.task().stageId());
if (stageExecution.getState().isDone()) {
continue;
Expand All @@ -899,7 +924,13 @@ private void scheduleTasks()
NodeLease lease = nodeAllocator.acquire(nodeRequirements.get(), memoryRequirements.getRequiredMemory());
lease.getNode().addListener(() -> eventQueue.add(Event.WAKE_UP), queryExecutor);
preSchedulingTaskContexts.put(scheduledTask.task(), new PreSchedulingTaskContext(lease, scheduledTask.isSpeculative()));
tasksWaitingForNode++;

if (scheduledTask.isSpeculative()) {
speculativeTasksWaitingForNode++;
}
else {
nonSpeculativeTasksWaitingForNode++;
}
}
}

Expand Down Expand Up @@ -2056,6 +2087,14 @@ public PrioritizedScheduledTask pollOrThrow()
return prioritizedTask;
}

public PrioritizedScheduledTask peekOrThrow()
{
IndexedPriorityQueue.Prioritized<ScheduledTask> task = queue.peekPrioritized();
checkState(task != null, "queue is empty");
// negate priority to reverse operation we do in addOrUpdate
return new PrioritizedScheduledTask(task.getValue(), toIntExact(-task.getPriority()));
}

public void addOrUpdate(PrioritizedScheduledTask prioritizedTask)
{
IndexedPriorityQueue.Prioritized<ScheduledTask> previousTask = queue.getPrioritized(prioritizedTask.task());
Expand Down

0 comments on commit f742394

Please sign in to comment.