Skip to content

Commit

Permalink
Reduce number of TDigestHistogram allocations
Browse files Browse the repository at this point in the history
On coordinator operators stats from all tasks
will be merged. It does make sense to perform
merging as bulk operation.
  • Loading branch information
sopel39 committed Mar 22, 2023
1 parent c001664 commit a228741
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 8 deletions.
16 changes: 16 additions & 0 deletions core/trino-spi/src/main/java/io/trino/spi/Mergeable.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
*/
package io.trino.spi;

import java.util.List;

public interface Mergeable<T>
{
/**
Expand All @@ -21,4 +23,18 @@ public interface Mergeable<T>
* @throws NullPointerException if other is null
*/
T mergeWith(T other);

@SuppressWarnings("unchecked")
default T mergeWith(List<T> others)
{
if (others.isEmpty()) {
return (T) this;
}

Mergeable<T> result = this;
for (T other : others) {
result = (Mergeable<T>) result.mergeWith(other);
}
return (T) result;
}
}
27 changes: 19 additions & 8 deletions core/trino-spi/src/main/java/io/trino/spi/metrics/Metrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
import io.trino.spi.Mergeable;
import io.trino.spi.Unstable;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.StringJoiner;
Expand Down Expand Up @@ -85,31 +87,40 @@ public String toString()

public static class Accumulator
{
private final Map<String, Metric<?>> merged = new HashMap<>();
private final Map<String, List<Metric<?>>> groupedMetrics = new HashMap<>();

private Accumulator()
{
}

public Accumulator add(Metrics metrics)
public Accumulator add(List<Metrics> metricsList)
{
metrics.getMetrics().forEach((key, value) ->
merged.merge(key, value, Accumulator::merge));
metricsList.forEach(this::add);
return this;
}

@SuppressWarnings({"rawtypes", "unchecked"})
private static Metric<?> merge(Metric<?> a, Metric<?> b)
public Accumulator add(Metrics metrics)
{
return (Metric<?>) ((Metric) a).mergeWith(b);
metrics.getMetrics().forEach((key, value) ->
groupedMetrics.computeIfAbsent(key, ignored -> new ArrayList<>()).add(value));
return this;
}

public Metrics get()
{
if (merged.isEmpty()) {
if (groupedMetrics.isEmpty()) {
return EMPTY;
}

Map<String, Metric<?>> merged = new HashMap<>();
groupedMetrics.forEach((key, values) -> merged.put(key, merge(values.get(0), values.subList(1, values.size()))));
return new Metrics(merged);
}

@SuppressWarnings({"rawtypes", "unchecked"})
private static Metric<?> merge(Metric<?> a, List<Metric<?>> b)
{
return (Metric<?>) ((Metric) a).mergeWith(b);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.trino.spi.metrics.Distribution;

import java.util.Base64;
import java.util.List;
import java.util.Locale;

import static com.google.common.base.MoreObjects.ToStringHelper;
Expand Down Expand Up @@ -70,6 +71,20 @@ public TDigestHistogram mergeWith(TDigestHistogram other)
return new TDigestHistogram(result);
}

@Override
public TDigestHistogram mergeWith(List<TDigestHistogram> others)
{
if (others.isEmpty()) {
return this;
}

TDigest result = getDigest();
for (TDigestHistogram other : others) {
other.mergeTo(result);
}
return new TDigestHistogram(result);
}

private synchronized void mergeTo(TDigest digest)
{
digest.mergeWith(this.digest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Arrays;
import java.util.Map;

import static io.trino.spi.metrics.Metrics.accumulator;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static org.assertj.core.api.Assertions.assertThat;

Expand Down Expand Up @@ -110,6 +111,13 @@ public void testFailIncompatibleTypes()
merge(m1, m2);
}

@Test
public void testReduceSingleMetrics()
{
Metrics metrics = new Metrics(ImmutableMap.of("a", new LongCount(0)));
assertThat(accumulator().add(metrics).get()).isEqualTo(metrics);
}

private static Metrics merge(Metrics... metrics)
{
return Arrays.stream(metrics).reduce(Metrics.EMPTY, Metrics::mergeWith);
Expand Down

0 comments on commit a228741

Please sign in to comment.