Skip to content

Commit

Permalink
Make logic for nonSpeculativeTaskCount more explicit
Browse files Browse the repository at this point in the history
Previously there was a brittle assumption that addOrUpdate is called
with just once for non-speculative task and for speculative task we get
exactly two calls
 - add with speculative priority and with
 - update with non-speculative priority

 This PR makes logic more explicit and relaxes assumptions
  • Loading branch information
losipiuk committed May 11, 2023
1 parent 16aba82 commit f32875c
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,16 @@ public E poll()
return entry.getValue();
}

public Prioritized<E> getPrioritized(E element)
{
Entry<E> entry = index.get(element);
if (entry == null) {
return null;
}

return new Prioritized<>(entry.getValue(), entry.getPriority());
}

public Prioritized<E> pollPrioritized()
{
Entry<E> entry = pollEntry();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2049,22 +2049,35 @@ public PrioritizedScheduledTask pollOrThrow()
{
IndexedPriorityQueue.Prioritized<ScheduledTask> task = queue.pollPrioritized();
checkState(task != null, "queue is empty");
if (nonSpeculativeTaskCount > 0) {
// non speculative tasks are always pooled first
PrioritizedScheduledTask prioritizedTask = getPrioritizedTask(task);
if (!prioritizedTask.isSpeculative()) {
nonSpeculativeTaskCount--;
}
// negate priority to reverse operation we do in addOrUpdate
return new PrioritizedScheduledTask(task.getValue(), toIntExact(-task.getPriority()));
return prioritizedTask;
}

public void addOrUpdate(PrioritizedScheduledTask prioritizedTask)
{
if (!prioritizedTask.isSpeculative()) {
IndexedPriorityQueue.Prioritized<ScheduledTask> previousTask = queue.getPrioritized(prioritizedTask.task());
PrioritizedScheduledTask previousPrioritizedTask = null;
if (previousTask != null) {
previousPrioritizedTask = getPrioritizedTask(previousTask);
}

if (!prioritizedTask.isSpeculative() && (previousPrioritizedTask == null || previousPrioritizedTask.isSpeculative())) {
// number of non-speculative tasks increased
nonSpeculativeTaskCount++;
}
// using negative priority here as will return entries with lowest pririty first and here we use bigger number for tasks with lower priority

// using negative priority here as will return entries with the lowest priority first and here we use bigger number for tasks with lower priority
queue.addOrUpdate(prioritizedTask.task(), -prioritizedTask.priority());
}

private static PrioritizedScheduledTask getPrioritizedTask(IndexedPriorityQueue.Prioritized<ScheduledTask> task)
{
// negate priority to reverse operation we do in addOrUpdate
return new PrioritizedScheduledTask(task.getValue(), toIntExact(-task.getPriority()));
}
}

private static class SchedulingDelayer
Expand Down

0 comments on commit f32875c

Please sign in to comment.