diff --git a/core/trino-main/src/main/java/io/trino/execution/resourcegroups/IndexedPriorityQueue.java b/core/trino-main/src/main/java/io/trino/execution/resourcegroups/IndexedPriorityQueue.java index 97b49e58860..34b3e068075 100644 --- a/core/trino-main/src/main/java/io/trino/execution/resourcegroups/IndexedPriorityQueue.java +++ b/core/trino-main/src/main/java/io/trino/execution/resourcegroups/IndexedPriorityQueue.java @@ -89,6 +89,16 @@ public E poll() return entry.getValue(); } + public Prioritized getPrioritized(E element) + { + Entry entry = index.get(element); + if (entry == null) { + return null; + } + + return new Prioritized<>(entry.getValue(), entry.getPriority()); + } + public Prioritized pollPrioritized() { Entry entry = pollEntry(); diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenFaultTolerantQueryScheduler.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenFaultTolerantQueryScheduler.java index f71219c9905..b4733a6e120 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenFaultTolerantQueryScheduler.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenFaultTolerantQueryScheduler.java @@ -2049,22 +2049,35 @@ public PrioritizedScheduledTask pollOrThrow() { IndexedPriorityQueue.Prioritized task = queue.pollPrioritized(); checkState(task != null, "queue is empty"); - if (nonSpeculativeTaskCount > 0) { - // non speculative tasks are always pooled first + PrioritizedScheduledTask prioritizedTask = getPrioritizedTask(task); + if (!prioritizedTask.isSpeculative()) { nonSpeculativeTaskCount--; } - // negate priority to reverse operation we do in addOrUpdate - return new PrioritizedScheduledTask(task.getValue(), toIntExact(-task.getPriority())); + return prioritizedTask; } public void addOrUpdate(PrioritizedScheduledTask prioritizedTask) { - if (!prioritizedTask.isSpeculative()) { + IndexedPriorityQueue.Prioritized previousTask = queue.getPrioritized(prioritizedTask.task()); + PrioritizedScheduledTask previousPrioritizedTask = null; + if (previousTask != null) { + previousPrioritizedTask = getPrioritizedTask(previousTask); + } + + if (!prioritizedTask.isSpeculative() && (previousPrioritizedTask == null || previousPrioritizedTask.isSpeculative())) { + // number of non-speculative tasks increased nonSpeculativeTaskCount++; } - // using negative priority here as will return entries with lowest pririty first and here we use bigger number for tasks with lower priority + + // using negative priority here as will return entries with the lowest priority first and here we use bigger number for tasks with lower priority queue.addOrUpdate(prioritizedTask.task(), -prioritizedTask.priority()); } + + private static PrioritizedScheduledTask getPrioritizedTask(IndexedPriorityQueue.Prioritized task) + { + // negate priority to reverse operation we do in addOrUpdate + return new PrioritizedScheduledTask(task.getValue(), toIntExact(-task.getPriority())); + } } private static class SchedulingDelayer