Skip to content

Commit

Permalink
Avoid intermediate collection creation in TaskStats creation
Browse files Browse the repository at this point in the history
  • Loading branch information
pettyjamesm authored and martint committed Nov 12, 2021
1 parent 1535550 commit cfd0c24
Showing 1 changed file with 12 additions and 10 deletions.
22 changes: 12 additions & 10 deletions core/trino-main/src/main/java/io/trino/operator/TaskContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import java.util.concurrent.atomic.AtomicReference;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.collect.Iterables.transform;
import static com.google.common.collect.Sets.newConcurrentHashSet;
import static io.airlift.units.DataSize.succinctBytes;
Expand Down Expand Up @@ -449,10 +448,20 @@ public TaskStats getTaskStats()

long physicalWrittenDataSize = 0;

boolean hasRunningPipelines = false;
boolean runningPipelinesFullyBlocked = true;
ImmutableSet.Builder<BlockedReason> blockedReasons = ImmutableSet.builder();

for (PipelineStats pipeline : pipelineStats) {
if (pipeline.getLastEndTime() != null) {
lastExecutionEndTime = max(pipeline.getLastEndTime().getMillis(), lastExecutionEndTime);
}
if (pipeline.getRunningDrivers() > 0 || pipeline.getRunningPartitionedDrivers() > 0 || pipeline.getBlockedDrivers() > 0) {
// pipeline is running
hasRunningPipelines = true;
runningPipelinesFullyBlocked &= pipeline.isFullyBlocked();
blockedReasons.addAll(pipeline.getBlockedReasons());
}

totalDrivers += pipeline.getTotalDrivers();
queuedDrivers += pipeline.getQueuedDrivers();
Expand Down Expand Up @@ -525,14 +534,7 @@ public TaskStats getTaskStats()
lastSystemMemoryReservation = systemMemory;
}

Set<PipelineStats> runningPipelineStats = pipelineStats.stream()
.filter(pipeline -> pipeline.getRunningDrivers() > 0 || pipeline.getRunningPartitionedDrivers() > 0 || pipeline.getBlockedDrivers() > 0)
.collect(toImmutableSet());
ImmutableSet<BlockedReason> blockedReasons = runningPipelineStats.stream()
.flatMap(pipeline -> pipeline.getBlockedReasons().stream())
.collect(toImmutableSet());

boolean fullyBlocked = !runningPipelineStats.isEmpty() && runningPipelineStats.stream().allMatch(PipelineStats::isFullyBlocked);
boolean fullyBlocked = hasRunningPipelines && runningPipelinesFullyBlocked;

return new TaskStats(
taskStateMachine.getCreatedTime(),
Expand Down Expand Up @@ -560,7 +562,7 @@ public TaskStats getTaskStats()
new Duration(totalCpuTime, NANOSECONDS).convertToMostSuccinctTimeUnit(),
new Duration(totalBlockedTime, NANOSECONDS).convertToMostSuccinctTimeUnit(),
fullyBlocked && (runningDrivers > 0 || runningPartitionedDrivers > 0),
blockedReasons,
blockedReasons.build(),
succinctBytes(physicalInputDataSize),
physicalInputPositions,
new Duration(physicalInputReadTime, NANOSECONDS).convertToMostSuccinctTimeUnit(),
Expand Down

0 comments on commit cfd0c24

Please sign in to comment.