From a748831f174046f6f6ae717aadc2455ba9d74dac Mon Sep 17 00:00:00 2001 From: James Petty Date: Wed, 20 Oct 2021 14:17:46 -0400 Subject: [PATCH] Batch merge running operator stats in PipelineContext --- .../java/io/trino/operator/OperatorStats.java | 4 +-- .../io/trino/operator/PipelineContext.java | 25 ++++++++++++------- .../io/trino/operator/TestOperatorStats.java | 5 ++-- 3 files changed, 21 insertions(+), 13 deletions(-) diff --git a/core/trino-main/src/main/java/io/trino/operator/OperatorStats.java b/core/trino-main/src/main/java/io/trino/operator/OperatorStats.java index 9bb88ea3224e..bd360910424c 100644 --- a/core/trino-main/src/main/java/io/trino/operator/OperatorStats.java +++ b/core/trino-main/src/main/java/io/trino/operator/OperatorStats.java @@ -443,9 +443,9 @@ public OperatorInfo getInfo() return info; } - public OperatorStats add(OperatorStats... operators) + public OperatorStats add(OperatorStats operatorStats) { - return add(ImmutableList.copyOf(operators)); + return add(ImmutableList.of(operatorStats)); } public OperatorStats add(Iterable operators) diff --git a/core/trino-main/src/main/java/io/trino/operator/PipelineContext.java b/core/trino-main/src/main/java/io/trino/operator/PipelineContext.java index 16eddcfc852d..ac56ff3b2412 100644 --- a/core/trino-main/src/main/java/io/trino/operator/PipelineContext.java +++ b/core/trino-main/src/main/java/io/trino/operator/PipelineContext.java @@ -17,7 +17,7 @@ import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Multimap; +import com.google.common.collect.ListMultimap; import com.google.common.util.concurrent.ListenableFuture; import io.airlift.stats.CounterStat; import io.airlift.stats.Distribution; @@ -35,7 +35,6 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; -import java.util.Map.Entry; import java.util.Set; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; @@ -385,7 +384,7 @@ public PipelineStats getPipelineStats() List drivers = new ArrayList<>(); TreeMap operatorSummaries = new TreeMap<>(this.operatorSummaries); - Multimap runningOperators = ArrayListMultimap.create(); + ListMultimap runningOperators = ArrayListMultimap.create(); for (DriverContext driverContext : driverContexts) { DriverStats driverStats = driverContext.getDriverStats(); drivers.add(driverStats); @@ -422,15 +421,23 @@ public PipelineStats getPipelineStats() } // merge the running operator stats into the operator summary - for (Entry entry : runningOperators.entries()) { - OperatorStats current = operatorSummaries.get(entry.getKey()); - if (current == null) { - current = entry.getValue(); + for (Integer operatorId : runningOperators.keySet()) { + List runningStats = runningOperators.get(operatorId); + if (runningStats.isEmpty()) { + continue; + } + OperatorStats current = operatorSummaries.get(operatorId); + OperatorStats combined; + if (current != null) { + combined = current.add(runningStats); } else { - current = current.add(entry.getValue()); + combined = runningStats.get(0); + if (runningStats.size() > 1) { + combined = combined.add(runningStats.subList(1, runningStats.size())); + } } - operatorSummaries.put(entry.getKey(), current); + operatorSummaries.put(operatorId, combined); } Set runningDriverStats = drivers.stream() diff --git a/core/trino-main/src/test/java/io/trino/operator/TestOperatorStats.java b/core/trino-main/src/test/java/io/trino/operator/TestOperatorStats.java index 5bba039377e8..bd48778a6802 100644 --- a/core/trino-main/src/test/java/io/trino/operator/TestOperatorStats.java +++ b/core/trino-main/src/test/java/io/trino/operator/TestOperatorStats.java @@ -13,6 +13,7 @@ */ package io.trino.operator; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import io.airlift.json.JsonCodec; import io.airlift.units.DataSize; @@ -196,7 +197,7 @@ public static void assertExpectedOperatorStats(OperatorStats actual) @Test public void testAdd() { - OperatorStats actual = EXPECTED.add(EXPECTED, EXPECTED); + OperatorStats actual = EXPECTED.add(ImmutableList.of(EXPECTED, EXPECTED)); assertEquals(actual.getStageId(), 0); assertEquals(actual.getOperatorId(), 41); @@ -246,7 +247,7 @@ public void testAdd() @Test public void testAddMergeable() { - OperatorStats actual = MERGEABLE.add(MERGEABLE, MERGEABLE); + OperatorStats actual = MERGEABLE.add(ImmutableList.of(MERGEABLE, MERGEABLE)); assertEquals(actual.getStageId(), 0); assertEquals(actual.getOperatorId(), 41);