Skip to content

Commit

Permalink
Allow exposing custom metrics from the connector
Browse files Browse the repository at this point in the history
  • Loading branch information
rzeyde-varada authored and martint committed Jul 2, 2021
1 parent d159c6c commit 38305b7
Show file tree
Hide file tree
Showing 16 changed files with 228 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.trino.operator.OperationTimer.OperationTiming;
import io.trino.spi.Page;
import io.trino.spi.TrinoException;
import io.trino.spi.metrics.Metrics;
import io.trino.sql.planner.plan.PlanNodeId;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -83,6 +84,7 @@ public class OperatorContext
private final CounterStat outputPositions = new CounterStat();

private final AtomicLong dynamicFilterSplitsProcessed = new AtomicLong();
private final AtomicReference<Metrics> metrics = new AtomicReference<>(Metrics.EMPTY); // this is not incremental, but gets overwritten by the latest value.

private final AtomicLong physicalWrittenDataSize = new AtomicLong();

Expand Down Expand Up @@ -219,6 +221,16 @@ public void recordDynamicFilterSplitProcessed(long dynamicFilterSplits)
dynamicFilterSplitsProcessed.getAndAdd(dynamicFilterSplits);
}

/**
* Overwrites the metrics with the latest one.
*
* @param metrics Latest operator's metrics.
*/
public void setLatestMetrics(Metrics metrics)
{
this.metrics.set(metrics);
}

public void recordPhysicalWrittenData(long sizeInBytes)
{
physicalWrittenDataSize.getAndAdd(sizeInBytes);
Expand Down Expand Up @@ -531,6 +543,7 @@ public OperatorStats getOperatorStats()
outputPositions.getTotalCount(),

dynamicFilterSplitsProcessed.get(),
metrics.get(),

succinctBytes(physicalWrittenDataSize.get()),

Expand Down
14 changes: 14 additions & 0 deletions core/trino-main/src/main/java/io/trino/operator/OperatorStats.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.trino.spi.Mergeable;
import io.trino.spi.metrics.Metrics;
import io.trino.sql.planner.plan.PlanNodeId;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -63,6 +64,7 @@ public class OperatorStats
private final long outputPositions;

private final long dynamicFilterSplitsProcessed;
private final Metrics metrics;

private final DataSize physicalWrittenDataSize;

Expand Down Expand Up @@ -115,6 +117,7 @@ public OperatorStats(
@JsonProperty("outputPositions") long outputPositions,

@JsonProperty("dynamicFilterSplitsProcessed") long dynamicFilterSplitsProcessed,
@JsonProperty("metrics") Metrics metrics,

@JsonProperty("physicalWrittenDataSize") DataSize physicalWrittenDataSize,

Expand Down Expand Up @@ -169,6 +172,7 @@ public OperatorStats(
this.outputPositions = outputPositions;

this.dynamicFilterSplitsProcessed = dynamicFilterSplitsProcessed;
this.metrics = requireNonNull(metrics);

this.physicalWrittenDataSize = requireNonNull(physicalWrittenDataSize, "physicalWrittenDataSize is null");

Expand Down Expand Up @@ -332,6 +336,12 @@ public long getDynamicFilterSplitsProcessed()
return dynamicFilterSplitsProcessed;
}

@JsonProperty
public Metrics getMetrics()
{
return metrics;
}

@JsonProperty
public DataSize getPhysicalWrittenDataSize()
{
Expand Down Expand Up @@ -451,6 +461,7 @@ public OperatorStats add(Iterable<OperatorStats> operators)
long outputPositions = this.outputPositions;

long dynamicFilterSplitsProcessed = this.dynamicFilterSplitsProcessed;
Metrics.Accumulator metricsAccumulator = Metrics.accumulator().add(this.getMetrics());

long physicalWrittenDataSize = this.physicalWrittenDataSize.toBytes();

Expand Down Expand Up @@ -498,6 +509,7 @@ public OperatorStats add(Iterable<OperatorStats> operators)
outputPositions += operator.getOutputPositions();

dynamicFilterSplitsProcessed += operator.getDynamicFilterSplitsProcessed();
metricsAccumulator.add(operator.getMetrics());

physicalWrittenDataSize += operator.getPhysicalWrittenDataSize().toBytes();

Expand Down Expand Up @@ -557,6 +569,7 @@ public OperatorStats add(Iterable<OperatorStats> operators)
outputPositions,

dynamicFilterSplitsProcessed,
metricsAccumulator.get(),

succinctBytes(physicalWrittenDataSize),

Expand Down Expand Up @@ -623,6 +636,7 @@ public OperatorStats summarize()
outputDataSize,
outputPositions,
dynamicFilterSplitsProcessed,
metrics,
physicalWrittenDataSize,
blockedWall,
finishCalls,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import io.trino.spi.connector.RecordCursor;
import io.trino.spi.connector.RecordPageSource;
import io.trino.spi.connector.UpdatablePageSource;
import io.trino.spi.metrics.Metrics;
import io.trino.spi.type.Type;
import io.trino.split.EmptySplit;
import io.trino.split.PageSourceProvider;
Expand Down Expand Up @@ -79,6 +80,7 @@ public class ScanFilterAndProjectOperator
private long physicalBytes;
private long readTimeNanos;
private long dynamicFilterSplitsProcessed;
private Metrics metrics = Metrics.EMPTY;

private ScanFilterAndProjectOperator(
Session session,
Expand Down Expand Up @@ -160,6 +162,12 @@ public long getDynamicFilterSplitsProcessed()
return dynamicFilterSplitsProcessed;
}

@Override
public Metrics getConnectorMetrics()
{
return metrics;
}

@Override
public WorkProcessor<Page> getOutputPages()
{
Expand All @@ -172,6 +180,7 @@ public void close()
if (pageSource != null) {
try {
pageSource.close();
metrics = pageSource.getMetrics();
}
catch (IOException e) {
throw new UncheckedIOException(e);
Expand Down Expand Up @@ -394,6 +403,7 @@ public ProcessState<Page> process()
processedPositions += page.getPositionCount();
physicalBytes = pageSource.getCompletedBytes();
readTimeNanos = pageSource.getReadTimeNanos();
metrics = pageSource.getMetrics();

return ProcessState.ofResult(page);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ public void finish()
throw new UncheckedIOException(e);
}
systemMemoryContext.setBytes(source.getSystemMemoryUsage());
operatorContext.setLatestMetrics(source.getMetrics());
}
}

Expand Down Expand Up @@ -322,7 +323,7 @@ public Page getOutput()

// updating system memory usage should happen after page is loaded.
systemMemoryContext.setBytes(source.getSystemMemoryUsage());

operatorContext.setLatestMetrics(source.getMetrics());
return page;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.trino.spi.connector.DynamicFilter;
import io.trino.spi.connector.EmptyPageSource;
import io.trino.spi.connector.UpdatablePageSource;
import io.trino.spi.metrics.Metrics;
import io.trino.split.EmptySplit;
import io.trino.split.PageSourceProvider;

Expand Down Expand Up @@ -117,6 +118,12 @@ public long getDynamicFilterSplitsProcessed()
return splitToPages.getDynamicFilterSplitsProcessed();
}

@Override
public Metrics getConnectorMetrics()
{
return splitToPages.source.getMetrics();
}

@Override
public Duration getReadTime()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.trino.operator.WorkProcessor.ProcessState;
import io.trino.spi.Page;
import io.trino.spi.connector.UpdatablePageSource;
import io.trino.spi.metrics.Metrics;
import io.trino.spi.type.Type;
import io.trino.sql.planner.LocalExecutionPlanner.OperatorFactoryWithTypes;
import io.trino.sql.planner.plan.PlanNodeId;
Expand All @@ -39,6 +40,7 @@
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

import static com.google.common.base.Preconditions.checkState;
Expand Down Expand Up @@ -232,11 +234,14 @@ private void workProcessorOperatorStateMonitor(WorkProcessor.ProcessState<Page>
long deltaReadTimeNanos = deltaAndSet(context.readTimeNanos, sourceOperator.getReadTime().roundTo(NANOSECONDS));

long deltaDynamicFilterSplitsProcessed = deltaAndSet(context.dynamicFilterSplitsProcessed, sourceOperator.getDynamicFilterSplitsProcessed());
Metrics metrics = sourceOperator.getConnectorMetrics();
context.connectorMetrics.set(metrics);

operatorContext.recordPhysicalInputWithTiming(deltaPhysicalInputDataSize, deltaPhysicalInputPositions, deltaReadTimeNanos);
operatorContext.recordNetworkInput(deltaInternalNetworkInputDataSize, deltaInternalNetworkInputPositions);
operatorContext.recordProcessedInput(deltaInputDataSize, deltaInputPositions);
operatorContext.recordDynamicFilterSplitProcessed(deltaDynamicFilterSplitsProcessed);
operatorContext.setLatestMetrics(metrics);
}

if (state.getType() == FINISHED) {
Expand Down Expand Up @@ -337,6 +342,7 @@ private List<OperatorStats> getNestedOperatorStats()
context.outputPositions.get(),

context.dynamicFilterSplitsProcessed.get(),
context.connectorMetrics.get(),

DataSize.ofBytes(0),

Expand Down Expand Up @@ -676,6 +682,7 @@ private static class WorkProcessorOperatorContext
final AtomicLong outputPositions = new AtomicLong();

final AtomicLong dynamicFilterSplitsProcessed = new AtomicLong();
final AtomicReference<Metrics> connectorMetrics = new AtomicReference<>(Metrics.EMPTY);

final AtomicLong peakUserMemoryReservation = new AtomicLong();
final AtomicLong peakSystemMemoryReservation = new AtomicLong();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.trino.spi.connector.UpdatablePageSource;
import io.trino.spi.metrics.Metrics;

import java.util.Optional;
import java.util.function.Supplier;
Expand Down Expand Up @@ -67,4 +68,6 @@ default long getDynamicFilterSplitsProcessed()
{
return 0;
}

Metrics getConnectorMetrics();
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.trino.metadata.Split;
import io.trino.spi.Page;
import io.trino.spi.connector.UpdatablePageSource;
import io.trino.spi.metrics.Metrics;
import io.trino.sql.planner.plan.PlanNodeId;

import java.util.ArrayList;
Expand Down Expand Up @@ -175,6 +176,7 @@ public boolean isFinished()
public void close()
throws Exception
{
operatorContext.setLatestMetrics(sourceOperator.getConnectorMetrics());
sourceOperator.close();
}

Expand All @@ -191,6 +193,7 @@ private void updateOperatorStats()
long currentInputPositions = sourceOperator.getInputPositions();

long currentDynamicFilterSplitsProcessed = sourceOperator.getDynamicFilterSplitsProcessed();
Metrics currentMetrics = sourceOperator.getConnectorMetrics();

if (currentPhysicalInputBytes != previousPhysicalInputBytes
|| currentPhysicalInputPositions != previousPhysicalInputPositions
Expand Down Expand Up @@ -229,6 +232,8 @@ private void updateOperatorStats()
operatorContext.recordDynamicFilterSplitProcessed(currentDynamicFilterSplitsProcessed - previousDynamicFilterSplitsProcessed);
previousDynamicFilterSplitsProcessed = currentDynamicFilterSplitsProcessed;
}

operatorContext.setLatestMetrics(currentMetrics);
}

private static class SplitBuffer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.trino.operator.OperatorStats;
import io.trino.operator.TableWriterOperator;
import io.trino.spi.eventlistener.StageGcStatistics;
import io.trino.spi.metrics.Metrics;
import io.trino.sql.planner.plan.PlanNodeId;
import org.joda.time.DateTime;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -62,6 +63,7 @@ public class TestQueryStats
succinctBytes(116L),
117L,
1833,
Metrics.EMPTY,
succinctBytes(118L),
new Duration(119, NANOSECONDS),
120L,
Expand Down Expand Up @@ -101,6 +103,7 @@ public class TestQueryStats
succinctBytes(216L),
217L,
2833,
Metrics.EMPTY,
succinctBytes(218L),
new Duration(219, NANOSECONDS),
220L,
Expand Down Expand Up @@ -140,6 +143,7 @@ public class TestQueryStats
succinctBytes(316L),
317L,
3833,
Metrics.EMPTY,
succinctBytes(318L),
new Duration(319, NANOSECONDS),
320L,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@
*/
package io.trino.operator;

import com.google.common.collect.ImmutableMap;
import io.airlift.json.JsonCodec;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.trino.connector.CatalogName;
import io.trino.operator.PartitionedOutputOperator.PartitionedOutputInfo;
import io.trino.spi.metrics.Metrics;
import io.trino.sql.planner.plan.PlanNodeId;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -59,6 +61,7 @@ public class TestOperatorStats
DataSize.ofBytes(12),
13,
533,
Metrics.EMPTY,

DataSize.ofBytes(14),

Expand Down Expand Up @@ -106,6 +109,7 @@ public class TestOperatorStats
DataSize.ofBytes(12),
13,
533,
Metrics.EMPTY,

DataSize.ofBytes(14),

Expand Down Expand Up @@ -163,6 +167,7 @@ public static void assertExpectedOperatorStats(OperatorStats actual)
assertEquals(actual.getOutputPositions(), 13);

assertEquals(actual.getDynamicFilterSplitsProcessed(), 533);
assertEquals(actual.getMetrics().getMetrics(), ImmutableMap.of());

assertEquals(actual.getPhysicalWrittenDataSize(), DataSize.ofBytes(14));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.trino.operator.WorkProcessorAssertion.Transform;
import io.trino.spi.Page;
import io.trino.spi.connector.UpdatablePageSource;
import io.trino.spi.metrics.Metrics;
import io.trino.sql.planner.LocalExecutionPlanner.OperatorFactoryWithTypes;
import io.trino.sql.planner.plan.PlanNodeId;
import org.testng.annotations.AfterClass;
Expand Down Expand Up @@ -392,6 +393,12 @@ public Duration getReadTime()
return new Duration(7, NANOSECONDS);
}

@Override
public Metrics getConnectorMetrics()
{
return Metrics.EMPTY;
}

@Override
public WorkProcessor<Page> getOutputPages()
{
Expand Down
Loading

0 comments on commit 38305b7

Please sign in to comment.