Skip to content

Commit

Permalink
Add connector metrics to EXPLAIN ANALYZE verbose output
Browse files Browse the repository at this point in the history
It should allow us to get quick feedback on a specific query
(without the need to "dig" into the QueryInfo JSON).
  • Loading branch information
rzeyde-varada authored and sopel39 committed Nov 25, 2021
1 parent b47e6b8 commit b11a241
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.trino.spi.metrics.Metrics;
import io.trino.sql.planner.plan.PlanNodeId;

import java.util.Map;
Expand All @@ -39,7 +40,8 @@ public HashCollisionPlanNodeStats(
DataSize planNodeOutputDataSize,
DataSize planNodeSpilledDataSize,
Map<String, OperatorInputStats> operatorInputStats,
Map<String, OperatorHashCollisionsStats> operatorHashCollisionsStats)
Map<String, OperatorHashCollisionsStats> operatorHashCollisionsStats,
Metrics metrics)
{
super(
planNodeId,
Expand All @@ -50,7 +52,9 @@ public HashCollisionPlanNodeStats(
planNodeOutputPositions,
planNodeOutputDataSize,
planNodeSpilledDataSize,
operatorInputStats);
operatorInputStats,
metrics,
Metrics.EMPTY);
this.operatorHashCollisionsStats = requireNonNull(operatorHashCollisionsStats, "operatorHashCollisionsStats is null");
}

Expand Down Expand Up @@ -104,6 +108,7 @@ public PlanNodeStats mergeWith(PlanNodeStats other)
merged.getPlanNodeOutputDataSize(),
merged.getPlanNodeSpilledDataSize(),
merged.operatorInputStats,
operatorHashCollisionsStats);
operatorHashCollisionsStats,
merged.getMetrics());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.trino.spi.Mergeable;
import io.trino.spi.metrics.Metrics;
import io.trino.sql.planner.plan.PlanNodeId;

import java.util.Map;
Expand All @@ -42,6 +43,8 @@ public class PlanNodeStats
private final long planNodeOutputPositions;
private final DataSize planNodeOutputDataSize;
private final DataSize planNodeSpilledDataSize;
private final Metrics metrics;
private final Metrics connectorMetrics;

protected final Map<String, OperatorInputStats> operatorInputStats;

Expand All @@ -54,7 +57,9 @@ public class PlanNodeStats
long planNodeOutputPositions,
DataSize planNodeOutputDataSize,
DataSize planNodeSpilledDataSize,
Map<String, OperatorInputStats> operatorInputStats)
Map<String, OperatorInputStats> operatorInputStats,
Metrics metrics,
Metrics connectorMetrics)
{
this.planNodeId = requireNonNull(planNodeId, "planNodeId is null");

Expand All @@ -67,6 +72,8 @@ public class PlanNodeStats
this.planNodeSpilledDataSize = requireNonNull(planNodeSpilledDataSize, "planNodeSpilledDataSize is null");

this.operatorInputStats = requireNonNull(operatorInputStats, "operatorInputStats is null");
this.metrics = requireNonNull(metrics, "metrics is null");
this.connectorMetrics = requireNonNull(connectorMetrics, "connectorMetrics is null");
}

private static double computedStdDev(double sumSquared, double sum, long n)
Expand Down Expand Up @@ -122,6 +129,16 @@ public DataSize getPlanNodeSpilledDataSize()
return planNodeSpilledDataSize;
}

public Metrics getMetrics()
{
return metrics;
}

public Metrics getConnectorMetrics()
{
return connectorMetrics;
}

public Map<String, Double> getOperatorInputPositionsAverages()
{
return operatorInputStats.entrySet().stream()
Expand Down Expand Up @@ -160,6 +177,8 @@ public PlanNodeStats mergeWith(PlanNodeStats other)
planNodeInputPositions, planNodeInputDataSize,
planNodeOutputPositions, planNodeOutputDataSize,
succinctBytes(this.planNodeSpilledDataSize.toBytes() + other.planNodeSpilledDataSize.toBytes()),
operatorInputStats);
operatorInputStats,
this.metrics.mergeWith(other.metrics),
this.connectorMetrics.mergeWith(other.connectorMetrics));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.trino.operator.PipelineStats;
import io.trino.operator.TaskStats;
import io.trino.operator.WindowInfo;
import io.trino.spi.metrics.Metrics;
import io.trino.sql.planner.plan.PlanNodeId;

import java.util.ArrayList;
Expand Down Expand Up @@ -82,6 +83,8 @@ private static List<PlanNodeStats> getPlanNodeStats(TaskStats taskStats)
Map<PlanNodeId, Map<String, OperatorInputStats>> operatorInputStats = new HashMap<>();
Map<PlanNodeId, Map<String, OperatorHashCollisionsStats>> operatorHashCollisionsStats = new HashMap<>();
Map<PlanNodeId, WindowOperatorStats> windowNodeStats = new HashMap<>();
Map<PlanNodeId, Metrics> metrics = new HashMap<>();
Map<PlanNodeId, Metrics> connectorMetrics = new HashMap<>();

for (PipelineStats pipelineStats : taskStats.getPipelines()) {
// Due to eventual consistently collected stats, these could be empty
Expand Down Expand Up @@ -121,6 +124,9 @@ private static List<PlanNodeStats> getPlanNodeStats(TaskStats taskStats)
operatorStats.getSumSquaredInputPositions())),
(map1, map2) -> mergeMaps(map1, map2, OperatorInputStats::merge));

metrics.merge(planNodeId, operatorStats.getMetrics(), Metrics::mergeWith);
connectorMetrics.merge(planNodeId, operatorStats.getConnectorMetrics(), Metrics::mergeWith);

planNodeInputPositions.merge(planNodeId, operatorStats.getInputPositions(), Long::sum);
planNodeInputBytes.merge(planNodeId, operatorStats.getInputDataSize().toBytes(), Long::sum);
planNodeSpilledDataSize.merge(planNodeId, operatorStats.getSpilledDataSize().toBytes(), Long::sum);
Expand Down Expand Up @@ -195,7 +201,8 @@ private static List<PlanNodeStats> getPlanNodeStats(TaskStats taskStats)
succinctBytes(planNodeOutputBytes.getOrDefault(planNodeId, 0L)),
succinctBytes(planNodeSpilledDataSize.get(planNodeId)),
operatorInputStats.get(planNodeId),
operatorHashCollisionsStats.get(planNodeId));
operatorHashCollisionsStats.get(planNodeId),
metrics.get(planNodeId));
}
else if (windowNodeStats.containsKey(planNodeId)) {
nodeStats = new WindowPlanNodeStats(
Expand All @@ -208,7 +215,8 @@ else if (windowNodeStats.containsKey(planNodeId)) {
succinctBytes(planNodeOutputBytes.getOrDefault(planNodeId, 0L)),
succinctBytes(planNodeSpilledDataSize.get(planNodeId)),
operatorInputStats.get(planNodeId),
windowNodeStats.get(planNodeId));
windowNodeStats.get(planNodeId),
metrics.get(planNodeId));
}
else {
nodeStats = new PlanNodeStats(
Expand All @@ -220,7 +228,9 @@ else if (windowNodeStats.containsKey(planNodeId)) {
outputPositions,
succinctBytes(planNodeOutputBytes.getOrDefault(planNodeId, 0L)),
succinctBytes(planNodeSpilledDataSize.get(planNodeId)),
operatorInputStats.get(planNodeId));
operatorInputStats.get(planNodeId),
metrics.get(planNodeId),
connectorMetrics.get(planNodeId));
}

stats.add(nodeStats);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import io.trino.cost.PlanCostEstimate;
import io.trino.cost.PlanNodeStatsAndCostSummary;
import io.trino.cost.PlanNodeStatsEstimate;
import io.trino.spi.metrics.Metric;
import io.trino.spi.metrics.Metrics;
import io.trino.sql.planner.Symbol;
import io.trino.sql.planner.planprinter.NodeRepresentation.TypedSymbol;

Expand All @@ -27,6 +29,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Verify.verify;
Expand Down Expand Up @@ -135,6 +138,8 @@ private String printStats(PlanRepresentation plan, NodeRepresentation node)
}
output.append("\n");

printMetrics(output, "connector metrics:", nodeStats.getConnectorMetrics());
printMetrics(output, "metrics:", nodeStats.getMetrics());
printDistributions(output, nodeStats);
printCollisions(output, nodeStats);

Expand All @@ -145,6 +150,16 @@ private String printStats(PlanRepresentation plan, NodeRepresentation node)
return output.toString();
}

private void printMetrics(StringBuilder output, String label, Metrics metrics)
{
if (!verbose || metrics.getMetrics().isEmpty()) {
return;
}
output.append(label).append("\n");
Map<String, Metric<?>> sortedMap = new TreeMap<>(metrics.getMetrics());
sortedMap.forEach((name, metric) -> output.append(format(" '%s' = %s\n", name, metric)));
}

private void printDistributions(StringBuilder output, PlanNodeStats stats)
{
Map<String, Double> inputAverages = stats.getOperatorInputPositionsAverages();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.trino.spi.metrics.Metrics;
import io.trino.sql.planner.plan.PlanNodeId;

import java.util.Map;
Expand All @@ -34,7 +35,8 @@ public WindowPlanNodeStats(
DataSize planNodeOutputDataSize,
DataSize planNodeSpilledDataSize,
Map<String, OperatorInputStats> operatorInputStats,
WindowOperatorStats windowOperatorStats)
WindowOperatorStats windowOperatorStats,
Metrics metrics)
{
super(
planNodeId,
Expand All @@ -45,7 +47,9 @@ public WindowPlanNodeStats(
planNodeOutputPositions,
planNodeOutputDataSize,
planNodeSpilledDataSize,
operatorInputStats);
operatorInputStats,
metrics,
Metrics.EMPTY);
this.windowOperatorStats = windowOperatorStats;
}

Expand All @@ -69,6 +73,7 @@ public PlanNodeStats mergeWith(PlanNodeStats other)
merged.getPlanNodeOutputDataSize(),
merged.getPlanNodeSpilledDataSize(),
merged.operatorInputStats,
windowOperatorStats);
windowOperatorStats,
merged.getMetrics());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,22 @@ public void testCustomMetricsScanOnly()
assertThat(((Count<?>) metrics.getMetrics().get("finished")).getTotal()).isGreaterThan(0);
}

@Test
public void testExplainCustomMetricsScanOnly()
{
assertExplainAnalyze(
"EXPLAIN ANALYZE VERBOSE SELECT partkey FROM part",
"'rows' = LongCount\\{total=2000}");
}

@Test
public void testExplainCustomMetricsScanFilter()
{
assertExplainAnalyze(
"EXPLAIN ANALYZE VERBOSE SELECT partkey FROM part WHERE partkey % 1000 > 0",
"'rows' = LongCount\\{total=2000}");
}

private Metrics collectCustomMetrics(String sql)
{
DistributedQueryRunner runner = (DistributedQueryRunner) getQueryRunner();
Expand Down

0 comments on commit b11a241

Please sign in to comment.