diff --git a/core/trino-main/src/main/java/io/trino/operator/TaskContext.java b/core/trino-main/src/main/java/io/trino/operator/TaskContext.java index 58259ab39266..cb66ff328968 100644 --- a/core/trino-main/src/main/java/io/trino/operator/TaskContext.java +++ b/core/trino-main/src/main/java/io/trino/operator/TaskContext.java @@ -52,7 +52,6 @@ import java.util.concurrent.atomic.AtomicReference; import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.collect.ImmutableSet.toImmutableSet; import static com.google.common.collect.Iterables.transform; import static com.google.common.collect.Sets.newConcurrentHashSet; import static io.airlift.units.DataSize.succinctBytes; @@ -449,10 +448,20 @@ public TaskStats getTaskStats() long physicalWrittenDataSize = 0; + boolean hasRunningPipelines = false; + boolean runningPipelinesFullyBlocked = true; + ImmutableSet.Builder blockedReasons = ImmutableSet.builder(); + for (PipelineStats pipeline : pipelineStats) { if (pipeline.getLastEndTime() != null) { lastExecutionEndTime = max(pipeline.getLastEndTime().getMillis(), lastExecutionEndTime); } + if (pipeline.getRunningDrivers() > 0 || pipeline.getRunningPartitionedDrivers() > 0 || pipeline.getBlockedDrivers() > 0) { + // pipeline is running + hasRunningPipelines = true; + runningPipelinesFullyBlocked &= pipeline.isFullyBlocked(); + blockedReasons.addAll(pipeline.getBlockedReasons()); + } totalDrivers += pipeline.getTotalDrivers(); queuedDrivers += pipeline.getQueuedDrivers(); @@ -525,14 +534,7 @@ public TaskStats getTaskStats() lastSystemMemoryReservation = systemMemory; } - Set runningPipelineStats = pipelineStats.stream() - .filter(pipeline -> pipeline.getRunningDrivers() > 0 || pipeline.getRunningPartitionedDrivers() > 0 || pipeline.getBlockedDrivers() > 0) - .collect(toImmutableSet()); - ImmutableSet blockedReasons = runningPipelineStats.stream() - .flatMap(pipeline -> pipeline.getBlockedReasons().stream()) - .collect(toImmutableSet()); - - boolean fullyBlocked = !runningPipelineStats.isEmpty() && runningPipelineStats.stream().allMatch(PipelineStats::isFullyBlocked); + boolean fullyBlocked = hasRunningPipelines && runningPipelinesFullyBlocked; return new TaskStats( taskStateMachine.getCreatedTime(), @@ -560,7 +562,7 @@ public TaskStats getTaskStats() new Duration(totalCpuTime, NANOSECONDS).convertToMostSuccinctTimeUnit(), new Duration(totalBlockedTime, NANOSECONDS).convertToMostSuccinctTimeUnit(), fullyBlocked && (runningDrivers > 0 || runningPartitionedDrivers > 0), - blockedReasons, + blockedReasons.build(), succinctBytes(physicalInputDataSize), physicalInputPositions, new Duration(physicalInputReadTime, NANOSECONDS).convertToMostSuccinctTimeUnit(),