Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use bulk merging of TDigestHistogram #17186

Merged
merged 1 commit into from
Apr 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import io.trino.sql.planner.plan.TableScanNode;
import io.trino.tracing.TrinoAttributes;
import io.trino.util.Failures;
import io.trino.util.Optionals;
import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
import org.joda.time.DateTime;

Expand Down Expand Up @@ -458,7 +457,7 @@ public StageInfo getStageInfo(Supplier<Iterable<TaskInfo>> taskInfosSupplier)
long failedInputBlockedTime = 0;

long bufferedDataSize = 0;
Optional<TDigestHistogram> outputBufferUtilization = Optional.empty();
ImmutableList.Builder<TDigestHistogram> bufferUtilizationHistograms = ImmutableList.builderWithExpectedSize(taskInfos.size());
long outputDataSize = 0;
long failedOutputDataSize = 0;
long outputPositions = 0;
Expand Down Expand Up @@ -535,7 +534,7 @@ public StageInfo getStageInfo(Supplier<Iterable<TaskInfo>> taskInfosSupplier)
inputBlockedTime += taskStats.getInputBlockedTime().roundTo(NANOSECONDS);

bufferedDataSize += taskInfo.getOutputBuffers().getTotalBufferedBytes();
outputBufferUtilization = Optionals.combine(outputBufferUtilization, taskInfo.getOutputBuffers().getUtilization(), TDigestHistogram::mergeWith);
taskInfo.getOutputBuffers().getUtilization().ifPresent(bufferUtilizationHistograms::add);
sopel39 marked this conversation as resolved.
Show resolved Hide resolved
outputDataSize += taskStats.getOutputDataSize().toBytes();
outputPositions += taskStats.getOutputPositions();

Expand Down Expand Up @@ -640,7 +639,7 @@ public StageInfo getStageInfo(Supplier<Iterable<TaskInfo>> taskInfosSupplier)
succinctDuration(inputBlockedTime, NANOSECONDS),
succinctDuration(failedInputBlockedTime, NANOSECONDS),
succinctBytes(bufferedDataSize),
outputBufferUtilization,
TDigestHistogram.merge(bufferUtilizationHistograms.build()),
succinctBytes(outputDataSize),
succinctBytes(failedOutputDataSize),
outputPositions,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,47 @@ public DirectExchangeClientStatus mergeWith(DirectExchangeClientStatus other)
requestDuration.mergeWith(other.requestDuration)); // this is correct as long as all clients have the same shape of histogram
}

@Override
public DirectExchangeClientStatus mergeWith(List<DirectExchangeClientStatus> others)
{
if (others.isEmpty()) {
return this;
}

long bufferedBytes = this.bufferedBytes;
long maxBufferedBytes = this.maxBufferedBytes;
long averageBytesPerRequest = this.averageBytesPerRequest;
long successfulRequestsCount = this.successfulRequestsCount;
int bufferedPages = this.bufferedPages;
int spilledPages = this.spilledPages;
long spilledBytes = this.spilledBytes;
boolean noMoreLocations = this.noMoreLocations;
ImmutableList.Builder<TDigestHistogram> requestDurations = ImmutableList.builderWithExpectedSize(others.size());
for (DirectExchangeClientStatus other : others) {
bufferedBytes = (bufferedBytes + other.bufferedBytes) / 2; // this is correct as long as all clients have the same buffer size (capacity)
sopel39 marked this conversation as resolved.
Show resolved Hide resolved
maxBufferedBytes = Math.max(maxBufferedBytes, other.maxBufferedBytes);
averageBytesPerRequest = mergeAvgs(averageBytesPerRequest, successfulRequestsCount, other.averageBytesPerRequest, other.successfulRequestsCount);
successfulRequestsCount = successfulRequestsCount + other.successfulRequestsCount;
bufferedPages = bufferedPages + other.bufferedPages;
spilledPages = spilledPages + other.spilledPages;
spilledBytes = spilledBytes + other.spilledBytes;
noMoreLocations = noMoreLocations && other.noMoreLocations; // if at least one has some locations, mergee has some too
requestDurations.add(other.requestDuration);
}

return new DirectExchangeClientStatus(
bufferedBytes,
maxBufferedBytes,
averageBytesPerRequest,
successfulRequestsCount,
bufferedPages,
spilledPages,
spilledBytes,
noMoreLocations,
ImmutableList.of(), // pageBufferClientStatuses may be long, so we don't want to combine the lists
TDigestHistogram.merge(requestDurations.build()).orElseThrow()); // this is correct as long as all clients have the same shape of histogram
}

private static long mergeAvgs(long value1, long count1, long value2, long count2)
{
if (count1 == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;

import java.util.List;
import java.util.Optional;

import static com.google.common.base.Preconditions.checkArgument;
Expand Down Expand Up @@ -485,6 +486,7 @@ public OperatorStats add(Iterable<OperatorStats> operators)
Optional<BlockedReason> blockedReason = this.blockedReason;

Mergeable<OperatorInfo> base = getMergeableInfoOrNull(info);
ImmutableList.Builder<OperatorInfo> operatorInfos = ImmutableList.builder();
for (OperatorStats operator : operators) {
checkArgument(operator.getOperatorId() == operatorId, "Expected operatorId to be %s but was %s", operatorId, operator.getOperatorId());
checkArgument(operator.getOperatorType().equals(operatorType), "Expected operatorType to be %s but was %s", operatorType, operator.getOperatorType());
Expand Down Expand Up @@ -538,7 +540,7 @@ public OperatorStats add(Iterable<OperatorStats> operators)
OperatorInfo info = operator.getInfo();
if (base != null && info != null) {
verify(base.getClass() == info.getClass(), "Cannot merge operator infos: %s and %s", base, info);
base = mergeInfo(base, info);
sopel39 marked this conversation as resolved.
Show resolved Hide resolved
operatorInfos.add(info);
}
}

Expand Down Expand Up @@ -592,7 +594,7 @@ public OperatorStats add(Iterable<OperatorStats> operators)

blockedReason,

(OperatorInfo) base);
(OperatorInfo) mergeInfos(base, operatorInfos.build()));
}

@SuppressWarnings("unchecked")
Expand All @@ -606,9 +608,12 @@ private static Mergeable<OperatorInfo> getMergeableInfoOrNull(OperatorInfo info)
}

@SuppressWarnings("unchecked")
private static <T> Mergeable<T> mergeInfo(Mergeable<T> base, T other)
private static <T> Mergeable<T> mergeInfos(Mergeable<T> base, List<T> others)
{
return (Mergeable<T>) base.mergeWith(other);
if (base == null) {
return null;
}
return (Mergeable<T>) base.mergeWith(others);
}

public OperatorStats summarize()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Base64;
import java.util.List;
import java.util.Locale;
import java.util.Optional;

import static com.google.common.base.MoreObjects.ToStringHelper;
import static com.google.common.base.MoreObjects.toStringHelper;
Expand Down Expand Up @@ -190,6 +191,15 @@ public String toString()
return helper.toString();
}

public static Optional<TDigestHistogram> merge(List<TDigestHistogram> histograms)
{
if (histograms.isEmpty()) {
return Optional.empty();
}

return Optional.of(histograms.get(0).mergeWith(histograms.subList(1, histograms.size())));
}

private static String formatDouble(double value)
{
return format(Locale.US, "%.2f", value);
Expand Down