Skip to content

Commit

Permalink
Batch merge running operator stats in PipelineContext
Browse files Browse the repository at this point in the history
  • Loading branch information
pettyjamesm authored and martint committed Nov 12, 2021
1 parent c9c711e commit a748831
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -443,9 +443,9 @@ public OperatorInfo getInfo()
return info;
}

public OperatorStats add(OperatorStats... operators)
public OperatorStats add(OperatorStats operatorStats)
{
return add(ImmutableList.copyOf(operators));
return add(ImmutableList.of(operatorStats));
}

public OperatorStats add(Iterable<OperatorStats> operators)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
import com.google.common.collect.ListMultimap;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.stats.CounterStat;
import io.airlift.stats.Distribution;
Expand All @@ -35,7 +35,6 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -385,7 +384,7 @@ public PipelineStats getPipelineStats()
List<DriverStats> drivers = new ArrayList<>();

TreeMap<Integer, OperatorStats> operatorSummaries = new TreeMap<>(this.operatorSummaries);
Multimap<Integer, OperatorStats> runningOperators = ArrayListMultimap.create();
ListMultimap<Integer, OperatorStats> runningOperators = ArrayListMultimap.create();
for (DriverContext driverContext : driverContexts) {
DriverStats driverStats = driverContext.getDriverStats();
drivers.add(driverStats);
Expand Down Expand Up @@ -422,15 +421,23 @@ public PipelineStats getPipelineStats()
}

// merge the running operator stats into the operator summary
for (Entry<Integer, OperatorStats> entry : runningOperators.entries()) {
OperatorStats current = operatorSummaries.get(entry.getKey());
if (current == null) {
current = entry.getValue();
for (Integer operatorId : runningOperators.keySet()) {
List<OperatorStats> runningStats = runningOperators.get(operatorId);
if (runningStats.isEmpty()) {
continue;
}
OperatorStats current = operatorSummaries.get(operatorId);
OperatorStats combined;
if (current != null) {
combined = current.add(runningStats);
}
else {
current = current.add(entry.getValue());
combined = runningStats.get(0);
if (runningStats.size() > 1) {
combined = combined.add(runningStats.subList(1, runningStats.size()));
}
}
operatorSummaries.put(entry.getKey(), current);
operatorSummaries.put(operatorId, combined);
}

Set<DriverStats> runningDriverStats = drivers.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.trino.operator;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.airlift.json.JsonCodec;
import io.airlift.units.DataSize;
Expand Down Expand Up @@ -196,7 +197,7 @@ public static void assertExpectedOperatorStats(OperatorStats actual)
@Test
public void testAdd()
{
OperatorStats actual = EXPECTED.add(EXPECTED, EXPECTED);
OperatorStats actual = EXPECTED.add(ImmutableList.of(EXPECTED, EXPECTED));

assertEquals(actual.getStageId(), 0);
assertEquals(actual.getOperatorId(), 41);
Expand Down Expand Up @@ -246,7 +247,7 @@ public void testAdd()
@Test
public void testAddMergeable()
{
OperatorStats actual = MERGEABLE.add(MERGEABLE, MERGEABLE);
OperatorStats actual = MERGEABLE.add(ImmutableList.of(MERGEABLE, MERGEABLE));

assertEquals(actual.getStageId(), 0);
assertEquals(actual.getOperatorId(), 41);
Expand Down

0 comments on commit a748831

Please sign in to comment.