diff --git a/core/trino-main/src/main/java/io/trino/event/QueryMonitor.java b/core/trino-main/src/main/java/io/trino/event/QueryMonitor.java index 91f105bc2633..2bfcc7962901 100644 --- a/core/trino-main/src/main/java/io/trino/event/QueryMonitor.java +++ b/core/trino-main/src/main/java/io/trino/event/QueryMonitor.java @@ -58,6 +58,7 @@ import io.trino.spi.eventlistener.QueryOutputMetadata; import io.trino.spi.eventlistener.QueryStatistics; import io.trino.spi.eventlistener.StageCpuDistribution; +import io.trino.spi.eventlistener.StageOutputBufferUtilization; import io.trino.spi.metrics.Metrics; import io.trino.spi.resourcegroups.QueryType; import io.trino.spi.resourcegroups.ResourceGroupId; @@ -76,6 +77,7 @@ import javax.inject.Inject; +import java.time.Duration; import java.util.Collection; import java.util.LinkedHashMap; import java.util.List; @@ -220,6 +222,7 @@ public void queryImmediateFailureEvent(BasicQueryInfo queryInfo, ExecutionFailur true, ImmutableList.of(), ImmutableList.of(), + ImmutableList.of(), Optional.empty()), createQueryContext( queryInfo.getSession(), @@ -324,6 +327,7 @@ private QueryStatistics createQueryStatistics(QueryInfo queryInfo) queryStats.getCompletedDrivers(), queryInfo.isFinalQueryInfo(), getCpuDistributions(queryInfo), + getStageOutputBufferUtilizations(queryInfo), operatorSummaries.build(), serializedPlanNodeStatsAndCosts); } @@ -703,6 +707,44 @@ private static StageCpuDistribution computeCpuDistribution(StageInfo stageInfo) snapshot.getTotal() / snapshot.getCount()); } + private static List getStageOutputBufferUtilizations(QueryInfo queryInfo) + { + if (queryInfo.getOutputStage().isEmpty()) { + return ImmutableList.of(); + } + + ImmutableList.Builder builder = ImmutableList.builder(); + populateStageOutputBufferUtilization(queryInfo.getOutputStage().get(), builder); + + return builder.build(); + } + + private static void populateStageOutputBufferUtilization(StageInfo stageInfo, ImmutableList.Builder utilizations) + { + stageInfo.getStageStats().getOutputBufferUtilization() + .ifPresent(utilization -> { + utilizations.add(new StageOutputBufferUtilization( + stageInfo.getStageId().getId(), + stageInfo.getTasks().size(), + // scale ratio to percentages + utilization.getP01() * 100, + utilization.getP05() * 100, + utilization.getP10() * 100, + utilization.getP25() * 100, + utilization.getP50() * 100, + utilization.getP75() * 100, + utilization.getP90() * 100, + utilization.getP95() * 100, + utilization.getP99() * 100, + utilization.getMin() * 100, + utilization.getMax() * 100, + Duration.ofNanos(utilization.getTotal()))); + }); + for (StageInfo subStage : stageInfo.getSubStages()) { + populateStageOutputBufferUtilization(subStage, utilizations); + } + } + private static class FragmentNode { private final PlanFragmentId fragmentId; diff --git a/core/trino-spi/pom.xml b/core/trino-spi/pom.xml index 057923cd27c0..9fa906c5b04d 100644 --- a/core/trino-spi/pom.xml +++ b/core/trino-spi/pom.xml @@ -183,7 +183,8 @@ java.method.numberOfParametersChanged - method void io.trino.spi.eventlistener.QueryStatistics::<init>(java.time.Duration, java.time.Duration, java.time.Duration, java.time.Duration, java.util.Optional<java.time.Duration>, java.util.Optional<java.time.Duration>, java.util.Optional<java.time.Duration>, java.util.Optional<java.time.Duration>, java.util.Optional<java.time.Duration>, java.util.Optional<java.time.Duration>, java.util.Optional<java.time.Duration>, java.util.Optional<java.time.Duration>, java.util.Optional<java.time.Duration>, java.util.Optional<java.time.Duration>, long, long, long, long, long, long, long, long, long, long, long, long, long, long, long, double, double, java.util.List<io.trino.spi.eventlistener.StageGcStatistics>, int, boolean, java.util.List<io.trino.spi.eventlistener.StageCpuDistribution>, java.util.List<java.util.Optional<io.trino.spi.metrics.Distribution<?>>>, java.util.List<java.lang.String>, java.util.Optional<java.lang.String>) + method void io.trino.spi.eventlistener.QueryStatistics::<init>(java.time.Duration, java.time.Duration, java.time.Duration, java.time.Duration, java.util.Optional<java.time.Duration>, java.util.Optional<java.time.Duration>, java.util.Optional<java.time.Duration>, java.util.Optional<java.time.Duration>, java.util.Optional<java.time.Duration>, java.util.Optional<java.time.Duration>, java.util.Optional<java.time.Duration>, java.util.Optional<java.time.Duration>, java.util.Optional<java.time.Duration>, java.util.Optional<java.time.Duration>, long, long, long, long, long, long, long, long, long, long, long, long, long, long, long, double, double, java.util.List<io.trino.spi.eventlistener.StageGcStatistics>, int, boolean, java.util.List<io.trino.spi.eventlistener.StageCpuDistribution>, java.util.List<java.util.Optional<io.trino.spi.metrics.Distribution<?>>>, java.util.List<java.lang.String>, java.util.Optional<java.lang.String>) + method void io.trino.spi.eventlistener.QueryStatistics::<init>(java.time.Duration, java.time.Duration, java.time.Duration, java.time.Duration, java.util.Optional<java.time.Duration>, java.util.Optional<java.time.Duration>, java.util.Optional<java.time.Duration>, java.util.Optional<java.time.Duration>, java.util.Optional<java.time.Duration>, java.util.Optional<java.time.Duration>, java.util.Optional<java.time.Duration>, java.util.Optional<java.time.Duration>, java.util.Optional<java.time.Duration>, java.util.Optional<java.time.Duration>, long, long, long, long, long, long, long, long, long, long, long, long, long, long, long, double, double, java.util.List<io.trino.spi.eventlistener.StageGcStatistics>, int, boolean, java.util.List<io.trino.spi.eventlistener.StageCpuDistribution>, java.util.List<java.lang.String>, java.util.Optional<java.lang.String>) @@ -191,6 +192,13 @@ java.method.removed method java.util.List<java.util.Optional<io.trino.spi.metrics.Distribution<?>>> io.trino.spi.eventlistener.QueryStatistics::getStageOutputBufferUtilizationDistribution() + + true + java.method.numberOfParametersChanged + method void io.trino.spi.eventlistener.QueryStatistics::<init>(java.time.Duration, java.time.Duration, java.time.Duration, java.time.Duration, java.util.Optional<java.time.Duration>, java.util.Optional<java.time.Duration>, java.util.Optional<java.time.Duration>, java.util.Optional<java.time.Duration>, java.util.Optional<java.time.Duration>, java.util.Optional<java.time.Duration>, java.util.Optional<java.time.Duration>, java.util.Optional<java.time.Duration>, java.util.Optional<java.time.Duration>, java.util.Optional<java.time.Duration>, long, long, long, long, long, long, long, long, long, long, long, long, long, long, long, double, double, java.util.List<io.trino.spi.eventlistener.StageGcStatistics>, int, boolean, java.util.List<io.trino.spi.eventlistener.StageCpuDistribution>, java.util.List<java.lang.String>, java.util.Optional<java.lang.String>) + method void io.trino.spi.eventlistener.QueryStatistics::<init>(java.time.Duration, java.time.Duration, java.time.Duration, java.time.Duration, java.util.Optional<java.time.Duration>, java.util.Optional<java.time.Duration>, java.util.Optional<java.time.Duration>, java.util.Optional<java.time.Duration>, java.util.Optional<java.time.Duration>, java.util.Optional<java.time.Duration>, java.util.Optional<java.time.Duration>, java.util.Optional<java.time.Duration>, java.util.Optional<java.time.Duration>, java.util.Optional<java.time.Duration>, long, long, long, long, long, long, long, long, long, long, long, long, long, long, long, double, double, java.util.List<io.trino.spi.eventlistener.StageGcStatistics>, int, boolean, java.util.List<io.trino.spi.eventlistener.StageCpuDistribution>, java.util.List<io.trino.spi.eventlistener.StageOutputBufferUtilization>, java.util.List<java.lang.String>, java.util.Optional<java.lang.String>) + + diff --git a/core/trino-spi/src/main/java/io/trino/spi/eventlistener/QueryStatistics.java b/core/trino-spi/src/main/java/io/trino/spi/eventlistener/QueryStatistics.java index 240c3b0ef7eb..5577986cf247 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/eventlistener/QueryStatistics.java +++ b/core/trino-spi/src/main/java/io/trino/spi/eventlistener/QueryStatistics.java @@ -67,6 +67,7 @@ public class QueryStatistics private final boolean complete; private final List cpuTimeDistribution; + private final List outputBufferUtilization; /** * Operator summaries serialized to JSON. Serialization format and structure @@ -116,6 +117,7 @@ public QueryStatistics( int completedSplits, boolean complete, List cpuTimeDistribution, + List outputBufferUtilization, List operatorSummaries, Optional planNodeStatsAndCosts) { @@ -154,6 +156,7 @@ public QueryStatistics( this.completedSplits = completedSplits; this.complete = complete; this.cpuTimeDistribution = requireNonNull(cpuTimeDistribution, "cpuTimeDistribution is null"); + this.outputBufferUtilization = requireNonNull(outputBufferUtilization, "outputBufferUtilization is null"); this.operatorSummaries = requireNonNull(operatorSummaries, "operatorSummaries is null"); this.planNodeStatsAndCosts = requireNonNull(planNodeStatsAndCosts, "planNodeStatsAndCosts is null"); } @@ -368,6 +371,12 @@ public List getCpuTimeDistribution() return cpuTimeDistribution; } + @JsonProperty + public List getOutputBufferUtilization() + { + return outputBufferUtilization; + } + @JsonProperty public List getOperatorSummaries() { diff --git a/core/trino-spi/src/main/java/io/trino/spi/eventlistener/StageOutputBufferUtilization.java b/core/trino-spi/src/main/java/io/trino/spi/eventlistener/StageOutputBufferUtilization.java new file mode 100644 index 000000000000..fbd168c0c620 --- /dev/null +++ b/core/trino-spi/src/main/java/io/trino/spi/eventlistener/StageOutputBufferUtilization.java @@ -0,0 +1,159 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.spi.eventlistener; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.time.Duration; + +import static java.util.Objects.requireNonNull; + +/** + * This class is JSON serializable for convenience and serialization compatibility is not guaranteed across versions. + */ +public class StageOutputBufferUtilization +{ + private final int stageId; + private final int tasks; + private final double p01; + private final double p05; + private final double p10; + private final double p25; + private final double p50; + private final double p75; + private final double p90; + private final double p95; + private final double p99; + private final double min; + private final double max; + private final Duration duration; + + @JsonCreator + public StageOutputBufferUtilization( + int stageId, + int tasks, + double p01, + double p05, + double p10, + double p25, + double p50, + double p75, + double p90, + double p95, + double p99, + double min, + double max, + Duration duration) + { + this.stageId = stageId; + this.tasks = tasks; + this.p01 = p01; + this.p05 = p05; + this.p10 = p10; + this.p25 = p25; + this.p50 = p50; + this.p75 = p75; + this.p90 = p90; + this.p95 = p95; + this.p99 = p99; + this.min = min; + this.max = max; + this.duration = requireNonNull(duration, "duration is null"); + } + + @JsonProperty + public int getStageId() + { + return stageId; + } + + @JsonProperty + public int getTasks() + { + return tasks; + } + + @JsonProperty + public double getP01() + { + return p01; + } + + @JsonProperty + public double getP05() + { + return p05; + } + + @JsonProperty + public double getP10() + { + return p10; + } + + @JsonProperty + public double getP25() + { + return p25; + } + + @JsonProperty + public double getP50() + { + return p50; + } + + @JsonProperty + public double getP75() + { + return p75; + } + + @JsonProperty + public double getP90() + { + return p90; + } + + @JsonProperty + public double getP95() + { + return p95; + } + + @JsonProperty + public double getP99() + { + return p99; + } + + @JsonProperty + public double getMin() + { + return min; + } + + @JsonProperty + public double getMax() + { + return max; + } + + @JsonProperty + public Duration getDuration() + { + return duration; + } +} diff --git a/plugin/trino-http-event-listener/src/test/java/io/trino/plugin/httpquery/TestHttpEventListener.java b/plugin/trino-http-event-listener/src/test/java/io/trino/plugin/httpquery/TestHttpEventListener.java index c44ad7b07a2c..6f270ff080a7 100644 --- a/plugin/trino-http-event-listener/src/test/java/io/trino/plugin/httpquery/TestHttpEventListener.java +++ b/plugin/trino-http-event-listener/src/test/java/io/trino/plugin/httpquery/TestHttpEventListener.java @@ -27,6 +27,7 @@ import io.trino.spi.eventlistener.QueryStatistics; import io.trino.spi.eventlistener.SplitCompletedEvent; import io.trino.spi.eventlistener.SplitStatistics; +import io.trino.spi.eventlistener.StageOutputBufferUtilization; import io.trino.spi.resourcegroups.QueryType; import io.trino.spi.resourcegroups.ResourceGroupId; import io.trino.spi.session.ResourceEstimates; @@ -179,6 +180,7 @@ public class TestHttpEventListener 0, true, Collections.emptyList(), + List.of(new StageOutputBufferUtilization(0, 10, 0.1, 0.5, 0.10, 0.25, 0.50, 0.75, 0.90, 0.95, 0.99, 0.0, 1.0, Duration.ofSeconds(1234))), Collections.emptyList(), Optional.empty()); diff --git a/testing/trino-tests/src/test/java/io/trino/execution/TestEventListenerWithSplits.java b/testing/trino-tests/src/test/java/io/trino/execution/TestEventListenerWithSplits.java index 8ec8d05875e7..18c8ae9ceea8 100644 --- a/testing/trino-tests/src/test/java/io/trino/execution/TestEventListenerWithSplits.java +++ b/testing/trino-tests/src/test/java/io/trino/execution/TestEventListenerWithSplits.java @@ -183,6 +183,7 @@ public void testSplitsForNormalQuery() assertTrue(statistics.getWallTime().getSeconds() >= 0); assertTrue(statistics.getCpuTimeDistribution().size() > 0); assertTrue(statistics.getOperatorSummaries().size() > 0); + assertTrue(statistics.getOutputBufferUtilization().size() > 0); } @Test