diff --git a/core/trino-main/src/main/java/io/trino/execution/StageStateMachine.java b/core/trino-main/src/main/java/io/trino/execution/StageStateMachine.java index d4d479d092ce..5d5d87d3aad1 100644 --- a/core/trino-main/src/main/java/io/trino/execution/StageStateMachine.java +++ b/core/trino-main/src/main/java/io/trino/execution/StageStateMachine.java @@ -35,7 +35,6 @@ import io.trino.sql.planner.plan.TableScanNode; import io.trino.tracing.TrinoAttributes; import io.trino.util.Failures; -import io.trino.util.Optionals; import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap; import org.joda.time.DateTime; @@ -458,7 +457,7 @@ public StageInfo getStageInfo(Supplier> taskInfosSupplier) long failedInputBlockedTime = 0; long bufferedDataSize = 0; - Optional outputBufferUtilization = Optional.empty(); + ImmutableList.Builder bufferUtilizationHistograms = ImmutableList.builderWithExpectedSize(taskInfos.size()); long outputDataSize = 0; long failedOutputDataSize = 0; long outputPositions = 0; @@ -535,7 +534,7 @@ public StageInfo getStageInfo(Supplier> taskInfosSupplier) inputBlockedTime += taskStats.getInputBlockedTime().roundTo(NANOSECONDS); bufferedDataSize += taskInfo.getOutputBuffers().getTotalBufferedBytes(); - outputBufferUtilization = Optionals.combine(outputBufferUtilization, taskInfo.getOutputBuffers().getUtilization(), TDigestHistogram::mergeWith); + taskInfo.getOutputBuffers().getUtilization().ifPresent(bufferUtilizationHistograms::add); outputDataSize += taskStats.getOutputDataSize().toBytes(); outputPositions += taskStats.getOutputPositions(); @@ -640,7 +639,7 @@ public StageInfo getStageInfo(Supplier> taskInfosSupplier) succinctDuration(inputBlockedTime, NANOSECONDS), succinctDuration(failedInputBlockedTime, NANOSECONDS), succinctBytes(bufferedDataSize), - outputBufferUtilization, + TDigestHistogram.merge(bufferUtilizationHistograms.build()), succinctBytes(outputDataSize), succinctBytes(failedOutputDataSize), outputPositions, diff --git a/core/trino-main/src/main/java/io/trino/operator/DirectExchangeClientStatus.java b/core/trino-main/src/main/java/io/trino/operator/DirectExchangeClientStatus.java index 4d1bf5efd568..ea95c7586a03 100644 --- a/core/trino-main/src/main/java/io/trino/operator/DirectExchangeClientStatus.java +++ b/core/trino-main/src/main/java/io/trino/operator/DirectExchangeClientStatus.java @@ -162,6 +162,47 @@ public DirectExchangeClientStatus mergeWith(DirectExchangeClientStatus other) requestDuration.mergeWith(other.requestDuration)); // this is correct as long as all clients have the same shape of histogram } + @Override + public DirectExchangeClientStatus mergeWith(List others) + { + if (others.isEmpty()) { + return this; + } + + long bufferedBytes = this.bufferedBytes; + long maxBufferedBytes = this.maxBufferedBytes; + long averageBytesPerRequest = this.averageBytesPerRequest; + long successfulRequestsCount = this.successfulRequestsCount; + int bufferedPages = this.bufferedPages; + int spilledPages = this.spilledPages; + long spilledBytes = this.spilledBytes; + boolean noMoreLocations = this.noMoreLocations; + ImmutableList.Builder requestDurations = ImmutableList.builderWithExpectedSize(others.size()); + for (DirectExchangeClientStatus other : others) { + bufferedBytes = (bufferedBytes + other.bufferedBytes) / 2; // this is correct as long as all clients have the same buffer size (capacity) + maxBufferedBytes = Math.max(maxBufferedBytes, other.maxBufferedBytes); + averageBytesPerRequest = mergeAvgs(averageBytesPerRequest, successfulRequestsCount, other.averageBytesPerRequest, other.successfulRequestsCount); + successfulRequestsCount = successfulRequestsCount + other.successfulRequestsCount; + bufferedPages = bufferedPages + other.bufferedPages; + spilledPages = spilledPages + other.spilledPages; + spilledBytes = spilledBytes + other.spilledBytes; + noMoreLocations = noMoreLocations && other.noMoreLocations; // if at least one has some locations, mergee has some too + requestDurations.add(other.requestDuration); + } + + return new DirectExchangeClientStatus( + bufferedBytes, + maxBufferedBytes, + averageBytesPerRequest, + successfulRequestsCount, + bufferedPages, + spilledPages, + spilledBytes, + noMoreLocations, + ImmutableList.of(), // pageBufferClientStatuses may be long, so we don't want to combine the lists + TDigestHistogram.merge(requestDurations.build()).orElseThrow()); // this is correct as long as all clients have the same shape of histogram + } + private static long mergeAvgs(long value1, long count1, long value2, long count2) { if (count1 == 0) { diff --git a/core/trino-main/src/main/java/io/trino/operator/OperatorStats.java b/core/trino-main/src/main/java/io/trino/operator/OperatorStats.java index 2908b0ea3929..f5f09ab08168 100644 --- a/core/trino-main/src/main/java/io/trino/operator/OperatorStats.java +++ b/core/trino-main/src/main/java/io/trino/operator/OperatorStats.java @@ -25,6 +25,7 @@ import javax.annotation.Nullable; import javax.annotation.concurrent.Immutable; +import java.util.List; import java.util.Optional; import static com.google.common.base.Preconditions.checkArgument; @@ -485,6 +486,7 @@ public OperatorStats add(Iterable operators) Optional blockedReason = this.blockedReason; Mergeable base = getMergeableInfoOrNull(info); + ImmutableList.Builder operatorInfos = ImmutableList.builder(); for (OperatorStats operator : operators) { checkArgument(operator.getOperatorId() == operatorId, "Expected operatorId to be %s but was %s", operatorId, operator.getOperatorId()); checkArgument(operator.getOperatorType().equals(operatorType), "Expected operatorType to be %s but was %s", operatorType, operator.getOperatorType()); @@ -538,7 +540,7 @@ public OperatorStats add(Iterable operators) OperatorInfo info = operator.getInfo(); if (base != null && info != null) { verify(base.getClass() == info.getClass(), "Cannot merge operator infos: %s and %s", base, info); - base = mergeInfo(base, info); + operatorInfos.add(info); } } @@ -592,7 +594,7 @@ public OperatorStats add(Iterable operators) blockedReason, - (OperatorInfo) base); + (OperatorInfo) mergeInfos(base, operatorInfos.build())); } @SuppressWarnings("unchecked") @@ -606,9 +608,12 @@ private static Mergeable getMergeableInfoOrNull(OperatorInfo info) } @SuppressWarnings("unchecked") - private static Mergeable mergeInfo(Mergeable base, T other) + private static Mergeable mergeInfos(Mergeable base, List others) { - return (Mergeable) base.mergeWith(other); + if (base == null) { + return null; + } + return (Mergeable) base.mergeWith(others); } public OperatorStats summarize() diff --git a/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/metrics/TDigestHistogram.java b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/metrics/TDigestHistogram.java index 81872c8be2f9..a7b2be702930 100644 --- a/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/metrics/TDigestHistogram.java +++ b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/metrics/TDigestHistogram.java @@ -26,6 +26,7 @@ import java.util.Base64; import java.util.List; import java.util.Locale; +import java.util.Optional; import static com.google.common.base.MoreObjects.ToStringHelper; import static com.google.common.base.MoreObjects.toStringHelper; @@ -190,6 +191,15 @@ public String toString() return helper.toString(); } + public static Optional merge(List histograms) + { + if (histograms.isEmpty()) { + return Optional.empty(); + } + + return Optional.of(histograms.get(0).mergeWith(histograms.subList(1, histograms.size()))); + } + private static String formatDouble(double value) { return format(Locale.US, "%.2f", value);