Skip to content

Commit

Permalink
Use bulk merging of TDigestHistogram
Browse files Browse the repository at this point in the history
  • Loading branch information
raunaqmorarka committed Apr 24, 2023
1 parent a179619 commit e1cba30
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import io.trino.sql.planner.plan.TableScanNode;
import io.trino.tracing.TrinoAttributes;
import io.trino.util.Failures;
import io.trino.util.Optionals;
import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
import org.joda.time.DateTime;

Expand Down Expand Up @@ -458,7 +457,7 @@ public StageInfo getStageInfo(Supplier<Iterable<TaskInfo>> taskInfosSupplier)
long failedInputBlockedTime = 0;

long bufferedDataSize = 0;
Optional<TDigestHistogram> outputBufferUtilization = Optional.empty();
ImmutableList.Builder<TDigestHistogram> bufferUtilizationHistograms = ImmutableList.builderWithExpectedSize(taskInfos.size());
long outputDataSize = 0;
long failedOutputDataSize = 0;
long outputPositions = 0;
Expand Down Expand Up @@ -535,7 +534,7 @@ public StageInfo getStageInfo(Supplier<Iterable<TaskInfo>> taskInfosSupplier)
inputBlockedTime += taskStats.getInputBlockedTime().roundTo(NANOSECONDS);

bufferedDataSize += taskInfo.getOutputBuffers().getTotalBufferedBytes();
outputBufferUtilization = Optionals.combine(outputBufferUtilization, taskInfo.getOutputBuffers().getUtilization(), TDigestHistogram::mergeWith);
taskInfo.getOutputBuffers().getUtilization().ifPresent(bufferUtilizationHistograms::add);
outputDataSize += taskStats.getOutputDataSize().toBytes();
outputPositions += taskStats.getOutputPositions();

Expand Down Expand Up @@ -640,7 +639,7 @@ public StageInfo getStageInfo(Supplier<Iterable<TaskInfo>> taskInfosSupplier)
succinctDuration(inputBlockedTime, NANOSECONDS),
succinctDuration(failedInputBlockedTime, NANOSECONDS),
succinctBytes(bufferedDataSize),
outputBufferUtilization,
TDigestHistogram.merge(bufferUtilizationHistograms.build()),
succinctBytes(outputDataSize),
succinctBytes(failedOutputDataSize),
outputPositions,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,47 @@ public DirectExchangeClientStatus mergeWith(DirectExchangeClientStatus other)
requestDuration.mergeWith(other.requestDuration)); // this is correct as long as all clients have the same shape of histogram
}

@Override
public DirectExchangeClientStatus mergeWith(List<DirectExchangeClientStatus> others)
{
if (others.isEmpty()) {
return this;
}

long bufferedBytes = this.bufferedBytes;
long maxBufferedBytes = this.maxBufferedBytes;
long averageBytesPerRequest = this.averageBytesPerRequest;
long successfulRequestsCount = this.successfulRequestsCount;
int bufferedPages = this.bufferedPages;
int spilledPages = this.spilledPages;
long spilledBytes = this.spilledBytes;
boolean noMoreLocations = this.noMoreLocations;
ImmutableList.Builder<TDigestHistogram> requestDurations = ImmutableList.builderWithExpectedSize(others.size());
for (DirectExchangeClientStatus other : others) {
bufferedBytes = (bufferedBytes + other.bufferedBytes) / 2; // this is correct as long as all clients have the same buffer size (capacity)
maxBufferedBytes = Math.max(maxBufferedBytes, other.maxBufferedBytes);
averageBytesPerRequest = mergeAvgs(averageBytesPerRequest, successfulRequestsCount, other.averageBytesPerRequest, other.successfulRequestsCount);
successfulRequestsCount = successfulRequestsCount + other.successfulRequestsCount;
bufferedPages = bufferedPages + other.bufferedPages;
spilledPages = spilledPages + other.spilledPages;
spilledBytes = spilledBytes + other.spilledBytes;
noMoreLocations = noMoreLocations && other.noMoreLocations; // if at least one has some locations, mergee has some too
requestDurations.add(other.requestDuration);
}

return new DirectExchangeClientStatus(
bufferedBytes,
maxBufferedBytes,
averageBytesPerRequest,
successfulRequestsCount,
bufferedPages,
spilledPages,
spilledBytes,
noMoreLocations,
ImmutableList.of(), // pageBufferClientStatuses may be long, so we don't want to combine the lists
TDigestHistogram.merge(requestDurations.build()).orElseThrow()); // this is correct as long as all clients have the same shape of histogram
}

private static long mergeAvgs(long value1, long count1, long value2, long count2)
{
if (count1 == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;

import java.util.List;
import java.util.Optional;

import static com.google.common.base.Preconditions.checkArgument;
Expand Down Expand Up @@ -485,6 +486,7 @@ public OperatorStats add(Iterable<OperatorStats> operators)
Optional<BlockedReason> blockedReason = this.blockedReason;

Mergeable<OperatorInfo> base = getMergeableInfoOrNull(info);
ImmutableList.Builder<OperatorInfo> operatorInfos = ImmutableList.builder();
for (OperatorStats operator : operators) {
checkArgument(operator.getOperatorId() == operatorId, "Expected operatorId to be %s but was %s", operatorId, operator.getOperatorId());
checkArgument(operator.getOperatorType().equals(operatorType), "Expected operatorType to be %s but was %s", operatorType, operator.getOperatorType());
Expand Down Expand Up @@ -538,7 +540,7 @@ public OperatorStats add(Iterable<OperatorStats> operators)
OperatorInfo info = operator.getInfo();
if (base != null && info != null) {
verify(base.getClass() == info.getClass(), "Cannot merge operator infos: %s and %s", base, info);
base = mergeInfo(base, info);
operatorInfos.add(info);
}
}

Expand Down Expand Up @@ -592,7 +594,7 @@ public OperatorStats add(Iterable<OperatorStats> operators)

blockedReason,

(OperatorInfo) base);
(OperatorInfo) mergeInfos(base, operatorInfos.build()));
}

@SuppressWarnings("unchecked")
Expand All @@ -606,9 +608,12 @@ private static Mergeable<OperatorInfo> getMergeableInfoOrNull(OperatorInfo info)
}

@SuppressWarnings("unchecked")
private static <T> Mergeable<T> mergeInfo(Mergeable<T> base, T other)
private static <T> Mergeable<T> mergeInfos(Mergeable<T> base, List<T> others)
{
return (Mergeable<T>) base.mergeWith(other);
if (base == null) {
return null;
}
return (Mergeable<T>) base.mergeWith(others);
}

public OperatorStats summarize()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Base64;
import java.util.List;
import java.util.Locale;
import java.util.Optional;

import static com.google.common.base.MoreObjects.ToStringHelper;
import static com.google.common.base.MoreObjects.toStringHelper;
Expand Down Expand Up @@ -190,6 +191,15 @@ public String toString()
return helper.toString();
}

public static Optional<TDigestHistogram> merge(List<TDigestHistogram> histograms)
{
if (histograms.isEmpty()) {
return Optional.empty();
}

return Optional.of(histograms.get(0).mergeWith(histograms.subList(1, histograms.size())));
}

private static String formatDouble(double value)
{
return format(Locale.US, "%.2f", value);
Expand Down

0 comments on commit e1cba30

Please sign in to comment.