Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose stage buffer utilization distribution to event listener #14638

Merged
merged 2 commits into from
Oct 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions core/trino-main/src/main/java/io/trino/event/QueryMonitor.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import io.trino.spi.eventlistener.QueryOutputMetadata;
import io.trino.spi.eventlistener.QueryStatistics;
import io.trino.spi.eventlistener.StageCpuDistribution;
import io.trino.spi.eventlistener.StageOutputBufferUtilization;
import io.trino.spi.metrics.Metrics;
import io.trino.spi.resourcegroups.QueryType;
import io.trino.spi.resourcegroups.ResourceGroupId;
Expand All @@ -76,6 +77,7 @@

import javax.inject.Inject;

import java.time.Duration;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
Expand Down Expand Up @@ -220,6 +222,7 @@ public void queryImmediateFailureEvent(BasicQueryInfo queryInfo, ExecutionFailur
true,
ImmutableList.of(),
ImmutableList.of(),
ImmutableList.of(),
Optional.empty()),
createQueryContext(
queryInfo.getSession(),
Expand Down Expand Up @@ -324,6 +327,7 @@ private QueryStatistics createQueryStatistics(QueryInfo queryInfo)
queryStats.getCompletedDrivers(),
queryInfo.isFinalQueryInfo(),
getCpuDistributions(queryInfo),
getStageOutputBufferUtilizations(queryInfo),
operatorSummaries.build(),
serializedPlanNodeStatsAndCosts);
}
Expand Down Expand Up @@ -703,6 +707,44 @@ private static StageCpuDistribution computeCpuDistribution(StageInfo stageInfo)
snapshot.getTotal() / snapshot.getCount());
}

/**
 * Collects the output buffer utilization entries for the query's stage tree.
 *
 * @param queryInfo query whose output stage (if any) is the root of the walk
 * @return an immutable list of entries, empty when the query has no output stage
 */
private static List<StageOutputBufferUtilization> getStageOutputBufferUtilizations(QueryInfo queryInfo)
{
    ImmutableList.Builder<StageOutputBufferUtilization> collected = ImmutableList.builder();

    // Without an output stage nothing is added and the built list is simply empty.
    queryInfo.getOutputStage()
            .ifPresent(rootStage -> populateStageOutputBufferUtilization(rootStage, collected));

    return collected.build();
}

/**
 * Appends the utilization entry for {@code stageInfo} (when its stats carry one)
 * and then recurses into every sub-stage, so entries are emitted parent-first.
 *
 * @param stageInfo stage to visit
 * @param utilizations builder receiving one entry per stage that reports utilization
 */
private static void populateStageOutputBufferUtilization(StageInfo stageInfo, ImmutableList.Builder<StageOutputBufferUtilization> utilizations)
{
    stageInfo.getStageStats().getOutputBufferUtilization()
            .map(distribution -> new StageOutputBufferUtilization(
                    stageInfo.getStageId().getId(),
                    stageInfo.getTasks().size(),
                    // scale ratio to percentages
                    distribution.getP01() * 100,
                    distribution.getP05() * 100,
                    distribution.getP10() * 100,
                    distribution.getP25() * 100,
                    distribution.getP50() * 100,
                    distribution.getP75() * 100,
                    distribution.getP90() * 100,
                    distribution.getP95() * 100,
                    distribution.getP99() * 100,
                    distribution.getMin() * 100,
                    distribution.getMax() * 100,
                    Duration.ofNanos(distribution.getTotal())))
            .ifPresent(utilizations::add);

    // Children are visited only after the current stage has been recorded.
    stageInfo.getSubStages()
            .forEach(child -> populateStageOutputBufferUtilization(child, utilizations));
}

private static class FragmentNode
{
private final PlanFragmentId fragmentId;
Expand Down
10 changes: 9 additions & 1 deletion core/trino-spi/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -183,14 +183,22 @@
<!-- Any exclusions below can be deleted after each release -->
<item>
<code>java.method.numberOfParametersChanged</code>
<old>method void io.trino.spi.eventlistener.QueryStatistics::&lt;init&gt;(java.time.Duration, java.time.Duration, java.time.Duration, java.time.Duration, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, long, long, long, long, long, long, long, long, long, long, long, long, long, long, long, double, double, java.util.List&lt;io.trino.spi.eventlistener.StageGcStatistics&gt;, int, boolean, java.util.List&lt;io.trino.spi.eventlistener.StageCpuDistribution&gt;, java.util.List&lt;java.util.Optional&lt;io.trino.spi.metrics.Distribution&lt;?&gt;&gt;&gt;, java.util.List&lt;java.lang.String&gt;, java.util.Optional&lt;java.lang.String&gt;)</old>
<old>method void io.trino.spi.eventlistener.QueryStatistics::&lt;init&gt;(java.time.Duration, java.time.Duration, java.time.Duration, java.time.Duration, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, long, long, long, long, long, long, long, long, long, long, long, long, long, long, long, double, double, java.util.List&lt;io.trino.spi.eventlistener.StageGcStatistics&gt;, int, boolean, java.util.List&lt;io.trino.spi.eventlistener.StageCpuDistribution&gt;, java.util.List&lt;java.util.Optional&lt;io.trino.spi.metrics.Distribution&lt;?&gt;&gt;&gt;, java.util.List&lt;java.lang.String&gt;, java.util.Optional&lt;java.lang.String&gt;)
</old>
<new>method void io.trino.spi.eventlistener.QueryStatistics::&lt;init&gt;(java.time.Duration, java.time.Duration, java.time.Duration, java.time.Duration, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, long, long, long, long, long, long, long, long, long, long, long, long, long, long, long, double, double, java.util.List&lt;io.trino.spi.eventlistener.StageGcStatistics&gt;, int, boolean, java.util.List&lt;io.trino.spi.eventlistener.StageCpuDistribution&gt;, java.util.List&lt;java.lang.String&gt;, java.util.Optional&lt;java.lang.String&gt;)</new>
</item>
<item>
<ignore>true</ignore>
<code>java.method.removed</code>
<old>method java.util.List&lt;java.util.Optional&lt;io.trino.spi.metrics.Distribution&lt;?&gt;&gt;&gt; io.trino.spi.eventlistener.QueryStatistics::getStageOutputBufferUtilizationDistribution()</old>
</item>
<item>
<ignore>true</ignore>
<code>java.method.numberOfParametersChanged</code>
<old>method void io.trino.spi.eventlistener.QueryStatistics::&lt;init&gt;(java.time.Duration, java.time.Duration, java.time.Duration, java.time.Duration, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, long, long, long, long, long, long, long, long, long, long, long, long, long, long, long, double, double, java.util.List&lt;io.trino.spi.eventlistener.StageGcStatistics&gt;, int, boolean, java.util.List&lt;io.trino.spi.eventlistener.StageCpuDistribution&gt;, java.util.List&lt;java.lang.String&gt;, java.util.Optional&lt;java.lang.String&gt;)</old>
<new>method void io.trino.spi.eventlistener.QueryStatistics::&lt;init&gt;(java.time.Duration, java.time.Duration, java.time.Duration, java.time.Duration, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, long, long, long, long, long, long, long, long, long, long, long, long, long, long, long, double, double, java.util.List&lt;io.trino.spi.eventlistener.StageGcStatistics&gt;, int, boolean, java.util.List&lt;io.trino.spi.eventlistener.StageCpuDistribution&gt;, java.util.List&lt;io.trino.spi.eventlistener.StageOutputBufferUtilization&gt;, java.util.List&lt;java.lang.String&gt;, java.util.Optional&lt;java.lang.String&gt;)
</new>
</item>
</differences>
</revapi.differences>
</analysisConfiguration>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public class QueryStatistics
private final boolean complete;

private final List<StageCpuDistribution> cpuTimeDistribution;
private final List<StageOutputBufferUtilization> outputBufferUtilization;

/**
* Operator summaries serialized to JSON. Serialization format and structure
Expand Down Expand Up @@ -116,6 +117,7 @@ public QueryStatistics(
int completedSplits,
boolean complete,
List<StageCpuDistribution> cpuTimeDistribution,
List<StageOutputBufferUtilization> outputBufferUtilization,
List<String> operatorSummaries,
Optional<String> planNodeStatsAndCosts)
{
Expand Down Expand Up @@ -154,6 +156,7 @@ public QueryStatistics(
this.completedSplits = completedSplits;
this.complete = complete;
this.cpuTimeDistribution = requireNonNull(cpuTimeDistribution, "cpuTimeDistribution is null");
this.outputBufferUtilization = requireNonNull(outputBufferUtilization, "outputBufferUtilization is null");
this.operatorSummaries = requireNonNull(operatorSummaries, "operatorSummaries is null");
this.planNodeStatsAndCosts = requireNonNull(planNodeStatsAndCosts, "planNodeStatsAndCosts is null");
}
Expand Down Expand Up @@ -368,6 +371,12 @@ public List<StageCpuDistribution> getCpuTimeDistribution()
return cpuTimeDistribution;
}

/**
 * Returns the output buffer utilization entries that were passed to the constructor.
 * The list is handed back as stored, without copying.
 */
@JsonProperty
public List<StageOutputBufferUtilization> getOutputBufferUtilization()
{
    return this.outputBufferUtilization;
}

@JsonProperty
public List<String> getOperatorSummaries()
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.spi.eventlistener;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.time.Duration;

import static java.util.Objects.requireNonNull;

/**
* This class is JSON serializable for convenience and serialization compatibility is not guaranteed across versions.
* <p>
* Value holder describing the distribution of output buffer utilization for a single stage:
* a stage id, a task count, nine percentile points (p01 through p99), the minimum and maximum,
* and a duration. All fields are final and are set once in the constructor.
* <p>
* NOTE(review): the engine-side producer (QueryMonitor) multiplies each utilization ratio by 100
* before constructing this object, so values appear intended as percentages; the HTTP event
* listener test fixture passes raw fractions instead - confirm the intended unit.
*/
public class StageOutputBufferUtilization
{
// Identifier of the stage this distribution belongs to.
private final int stageId;
// Task count for the stage; QueryMonitor passes the size of the stage's task list.
private final int tasks;
// Percentile points of the utilization distribution (see unit note on the class).
private final double p01;
private final double p05;
private final double p10;
private final double p25;
private final double p50;
private final double p75;
private final double p90;
private final double p95;
private final double p99;
// Smallest and largest observed utilization values.
private final double min;
private final double max;
// Never null. QueryMonitor builds this from the distribution's total, interpreted as nanoseconds;
// presumably the time span the distribution covers - TODO confirm.
private final Duration duration;

/**
* Creates a utilization record. Numeric arguments are stored as given, without range checks.
*
* @throws NullPointerException if {@code duration} is null
*/
@JsonCreator
public StageOutputBufferUtilization(
int stageId,
int tasks,
double p01,
double p05,
double p10,
double p25,
double p50,
double p75,
double p90,
double p95,
double p99,
double min,
double max,
Duration duration)
{
this.stageId = stageId;
this.tasks = tasks;
this.p01 = p01;
this.p05 = p05;
this.p10 = p10;
this.p25 = p25;
this.p50 = p50;
this.p75 = p75;
this.p90 = p90;
this.p95 = p95;
this.p99 = p99;
this.min = min;
this.max = max;
this.duration = requireNonNull(duration, "duration is null");
}

/**
* Returns the id of the stage this record describes.
*/
@JsonProperty
public int getStageId()
{
return stageId;
}

/**
* Returns the task count recorded for the stage.
*/
@JsonProperty
public int getTasks()
{
return tasks;
}

// Percentile accessors: each returns the value supplied to the constructor unchanged.

@JsonProperty
public double getP01()
{
return p01;
}

@JsonProperty
public double getP05()
{
return p05;
}

@JsonProperty
public double getP10()
{
return p10;
}

@JsonProperty
public double getP25()
{
return p25;
}

@JsonProperty
public double getP50()
{
return p50;
}

@JsonProperty
public double getP75()
{
return p75;
}

@JsonProperty
public double getP90()
{
return p90;
}

@JsonProperty
public double getP95()
{
return p95;
}

@JsonProperty
public double getP99()
{
return p99;
}

/**
* Returns the minimum utilization value supplied to the constructor.
*/
@JsonProperty
public double getMin()
{
return min;
}

/**
* Returns the maximum utilization value supplied to the constructor.
*/
@JsonProperty
public double getMax()
{
return max;
}

/**
* Returns the duration supplied to the constructor; never null.
*/
@JsonProperty
public Duration getDuration()
{
return duration;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.trino.spi.eventlistener.QueryStatistics;
import io.trino.spi.eventlistener.SplitCompletedEvent;
import io.trino.spi.eventlistener.SplitStatistics;
import io.trino.spi.eventlistener.StageOutputBufferUtilization;
import io.trino.spi.resourcegroups.QueryType;
import io.trino.spi.resourcegroups.ResourceGroupId;
import io.trino.spi.session.ResourceEstimates;
Expand Down Expand Up @@ -179,6 +180,7 @@ public class TestHttpEventListener
0,
true,
Collections.emptyList(),
List.of(new StageOutputBufferUtilization(0, 10, 0.1, 0.5, 0.10, 0.25, 0.50, 0.75, 0.90, 0.95, 0.99, 0.0, 1.0, Duration.ofSeconds(1234))),
Collections.emptyList(),
Optional.empty());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ public void testSplitsForNormalQuery()
assertTrue(statistics.getWallTime().getSeconds() >= 0);
assertTrue(statistics.getCpuTimeDistribution().size() > 0);
assertTrue(statistics.getOperatorSummaries().size() > 0);
assertTrue(statistics.getOutputBufferUtilization().size() > 0);
}

@Test
Expand Down