From 38305b7c5f4d790eaf7684792e9adc20725bf4dd Mon Sep 17 00:00:00 2001 From: Roman Zeyde Date: Thu, 12 Nov 2020 18:30:46 +0200 Subject: [PATCH] Allow exposing custom metrics from the connector --- .../io/trino/operator/OperatorContext.java | 13 ++++ .../java/io/trino/operator/OperatorStats.java | 14 ++++ .../ScanFilterAndProjectOperator.java | 10 +++ .../io/trino/operator/TableScanOperator.java | 3 +- .../TableScanWorkProcessorOperator.java | 7 ++ .../WorkProcessorPipelineSourceOperator.java | 7 ++ .../operator/WorkProcessorSourceOperator.java | 3 + .../WorkProcessorSourceOperatorAdapter.java | 5 ++ .../io/trino/execution/TestQueryStats.java | 4 + .../io/trino/operator/TestOperatorStats.java | 5 ++ ...stWorkProcessorPipelineSourceOperator.java | 7 ++ .../spi/connector/ConnectorPageSource.java | 11 +++ .../main/java/io/trino/spi/metrics/Count.java | 20 +++++ .../io/trino/spi/metrics/Distribution.java | 22 ++++++ .../java/io/trino/spi/metrics/Metric.java | 23 ++++++ .../java/io/trino/spi/metrics/Metrics.java | 75 +++++++++++++++++++ 16 files changed, 228 insertions(+), 1 deletion(-) create mode 100644 core/trino-spi/src/main/java/io/trino/spi/metrics/Count.java create mode 100644 core/trino-spi/src/main/java/io/trino/spi/metrics/Distribution.java create mode 100644 core/trino-spi/src/main/java/io/trino/spi/metrics/Metric.java create mode 100644 core/trino-spi/src/main/java/io/trino/spi/metrics/Metrics.java diff --git a/core/trino-main/src/main/java/io/trino/operator/OperatorContext.java b/core/trino-main/src/main/java/io/trino/operator/OperatorContext.java index e8fca5f13e06..c055fc6a7e94 100644 --- a/core/trino-main/src/main/java/io/trino/operator/OperatorContext.java +++ b/core/trino-main/src/main/java/io/trino/operator/OperatorContext.java @@ -28,6 +28,7 @@ import io.trino.operator.OperationTimer.OperationTiming; import io.trino.spi.Page; import io.trino.spi.TrinoException; +import io.trino.spi.metrics.Metrics; import io.trino.sql.planner.plan.PlanNodeId; import javax.annotation.Nullable; @@ -83,6 +84,7 @@ public class OperatorContext private final CounterStat outputPositions = new CounterStat(); private final AtomicLong dynamicFilterSplitsProcessed = new AtomicLong(); + private final AtomicReference metrics = new AtomicReference<>(Metrics.EMPTY); // this is not incremental, but gets overwritten by the latest value. private final AtomicLong physicalWrittenDataSize = new AtomicLong(); @@ -219,6 +221,16 @@ public void recordDynamicFilterSplitProcessed(long dynamicFilterSplits) dynamicFilterSplitsProcessed.getAndAdd(dynamicFilterSplits); } + /** + * Overwrites the metrics with the latest one. + * + * @param metrics Latest operator's metrics. + */ + public void setLatestMetrics(Metrics metrics) + { + this.metrics.set(metrics); + } + public void recordPhysicalWrittenData(long sizeInBytes) { physicalWrittenDataSize.getAndAdd(sizeInBytes); @@ -531,6 +543,7 @@ public OperatorStats getOperatorStats() outputPositions.getTotalCount(), dynamicFilterSplitsProcessed.get(), + metrics.get(), succinctBytes(physicalWrittenDataSize.get()), diff --git a/core/trino-main/src/main/java/io/trino/operator/OperatorStats.java b/core/trino-main/src/main/java/io/trino/operator/OperatorStats.java index 9d975c15721b..ebf5a2df5cd5 100644 --- a/core/trino-main/src/main/java/io/trino/operator/OperatorStats.java +++ b/core/trino-main/src/main/java/io/trino/operator/OperatorStats.java @@ -19,6 +19,7 @@ import io.airlift.units.DataSize; import io.airlift.units.Duration; import io.trino.spi.Mergeable; +import io.trino.spi.metrics.Metrics; import io.trino.sql.planner.plan.PlanNodeId; import javax.annotation.Nullable; @@ -63,6 +64,7 @@ public class OperatorStats private final long outputPositions; private final long dynamicFilterSplitsProcessed; + private final Metrics metrics; private final DataSize physicalWrittenDataSize; @@ -115,6 +117,7 @@ public OperatorStats( @JsonProperty("outputPositions") long outputPositions, @JsonProperty("dynamicFilterSplitsProcessed") long dynamicFilterSplitsProcessed, + @JsonProperty("metrics") Metrics metrics, @JsonProperty("physicalWrittenDataSize") DataSize physicalWrittenDataSize, @@ -169,6 +172,7 @@ public OperatorStats( this.outputPositions = outputPositions; this.dynamicFilterSplitsProcessed = dynamicFilterSplitsProcessed; + this.metrics = requireNonNull(metrics); this.physicalWrittenDataSize = requireNonNull(physicalWrittenDataSize, "physicalWrittenDataSize is null"); @@ -332,6 +336,12 @@ public long getDynamicFilterSplitsProcessed() return dynamicFilterSplitsProcessed; } + @JsonProperty + public Metrics getMetrics() + { + return metrics; + } + @JsonProperty public DataSize getPhysicalWrittenDataSize() { @@ -451,6 +461,7 @@ public OperatorStats add(Iterable operators) long outputPositions = this.outputPositions; long dynamicFilterSplitsProcessed = this.dynamicFilterSplitsProcessed; + Metrics.Accumulator metricsAccumulator = Metrics.accumulator().add(this.getMetrics()); long physicalWrittenDataSize = this.physicalWrittenDataSize.toBytes(); @@ -498,6 +509,7 @@ public OperatorStats add(Iterable operators) outputPositions += operator.getOutputPositions(); dynamicFilterSplitsProcessed += operator.getDynamicFilterSplitsProcessed(); + metricsAccumulator.add(operator.getMetrics()); physicalWrittenDataSize += operator.getPhysicalWrittenDataSize().toBytes(); @@ -557,6 +569,7 @@ public OperatorStats add(Iterable operators) outputPositions, dynamicFilterSplitsProcessed, + metricsAccumulator.get(), succinctBytes(physicalWrittenDataSize), @@ -623,6 +636,7 @@ public OperatorStats summarize() outputDataSize, outputPositions, dynamicFilterSplitsProcessed, + metrics, physicalWrittenDataSize, blockedWall, finishCalls, diff --git a/core/trino-main/src/main/java/io/trino/operator/ScanFilterAndProjectOperator.java b/core/trino-main/src/main/java/io/trino/operator/ScanFilterAndProjectOperator.java index db77f4e63026..d0b3a93f5b78 100644 --- a/core/trino-main/src/main/java/io/trino/operator/ScanFilterAndProjectOperator.java +++ b/core/trino-main/src/main/java/io/trino/operator/ScanFilterAndProjectOperator.java @@ -39,6 +39,7 @@ import io.trino.spi.connector.RecordCursor; import io.trino.spi.connector.RecordPageSource; import io.trino.spi.connector.UpdatablePageSource; +import io.trino.spi.metrics.Metrics; import io.trino.spi.type.Type; import io.trino.split.EmptySplit; import io.trino.split.PageSourceProvider; @@ -79,6 +80,7 @@ public class ScanFilterAndProjectOperator private long physicalBytes; private long readTimeNanos; private long dynamicFilterSplitsProcessed; + private Metrics metrics = Metrics.EMPTY; private ScanFilterAndProjectOperator( Session session, @@ -160,6 +162,12 @@ public long getDynamicFilterSplitsProcessed() return dynamicFilterSplitsProcessed; } + @Override + public Metrics getConnectorMetrics() + { + return metrics; + } + @Override public WorkProcessor getOutputPages() { @@ -172,6 +180,7 @@ public void close() if (pageSource != null) { try { pageSource.close(); + metrics = pageSource.getMetrics(); } catch (IOException e) { throw new UncheckedIOException(e); @@ -394,6 +403,7 @@ public ProcessState process() processedPositions += page.getPositionCount(); physicalBytes = pageSource.getCompletedBytes(); readTimeNanos = pageSource.getReadTimeNanos(); + metrics = pageSource.getMetrics(); return ProcessState.ofResult(page); } diff --git a/core/trino-main/src/main/java/io/trino/operator/TableScanOperator.java b/core/trino-main/src/main/java/io/trino/operator/TableScanOperator.java index 168737f90833..67e323ae81db 100644 --- a/core/trino-main/src/main/java/io/trino/operator/TableScanOperator.java +++ b/core/trino-main/src/main/java/io/trino/operator/TableScanOperator.java @@ -247,6 +247,7 @@ public void finish() throw new UncheckedIOException(e); } systemMemoryContext.setBytes(source.getSystemMemoryUsage()); + operatorContext.setLatestMetrics(source.getMetrics()); } } @@ -322,7 +323,7 @@ public Page getOutput() // updating system memory usage should happen after page is loaded. systemMemoryContext.setBytes(source.getSystemMemoryUsage()); - + operatorContext.setLatestMetrics(source.getMetrics()); return page; } } diff --git a/core/trino-main/src/main/java/io/trino/operator/TableScanWorkProcessorOperator.java b/core/trino-main/src/main/java/io/trino/operator/TableScanWorkProcessorOperator.java index 3b3247a5cbca..7728323875db 100644 --- a/core/trino-main/src/main/java/io/trino/operator/TableScanWorkProcessorOperator.java +++ b/core/trino-main/src/main/java/io/trino/operator/TableScanWorkProcessorOperator.java @@ -32,6 +32,7 @@ import io.trino.spi.connector.DynamicFilter; import io.trino.spi.connector.EmptyPageSource; import io.trino.spi.connector.UpdatablePageSource; +import io.trino.spi.metrics.Metrics; import io.trino.split.EmptySplit; import io.trino.split.PageSourceProvider; @@ -117,6 +118,12 @@ public long getDynamicFilterSplitsProcessed() return splitToPages.getDynamicFilterSplitsProcessed(); } + @Override + public Metrics getConnectorMetrics() + { + return splitToPages.source.getMetrics(); + } + @Override public Duration getReadTime() { diff --git a/core/trino-main/src/main/java/io/trino/operator/WorkProcessorPipelineSourceOperator.java b/core/trino-main/src/main/java/io/trino/operator/WorkProcessorPipelineSourceOperator.java index 280df7d31a7d..580bd1defb66 100644 --- a/core/trino-main/src/main/java/io/trino/operator/WorkProcessorPipelineSourceOperator.java +++ b/core/trino-main/src/main/java/io/trino/operator/WorkProcessorPipelineSourceOperator.java @@ -29,6 +29,7 @@ import io.trino.operator.WorkProcessor.ProcessState; import io.trino.spi.Page; import io.trino.spi.connector.UpdatablePageSource; +import io.trino.spi.metrics.Metrics; import io.trino.spi.type.Type; import io.trino.sql.planner.LocalExecutionPlanner.OperatorFactoryWithTypes; import io.trino.sql.planner.plan.PlanNodeId; @@ -39,6 +40,7 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import static com.google.common.base.Preconditions.checkState; @@ -232,11 +234,14 @@ private void workProcessorOperatorStateMonitor(WorkProcessor.ProcessState long deltaReadTimeNanos = deltaAndSet(context.readTimeNanos, sourceOperator.getReadTime().roundTo(NANOSECONDS)); long deltaDynamicFilterSplitsProcessed = deltaAndSet(context.dynamicFilterSplitsProcessed, sourceOperator.getDynamicFilterSplitsProcessed()); + Metrics metrics = sourceOperator.getConnectorMetrics(); + context.connectorMetrics.set(metrics); operatorContext.recordPhysicalInputWithTiming(deltaPhysicalInputDataSize, deltaPhysicalInputPositions, deltaReadTimeNanos); operatorContext.recordNetworkInput(deltaInternalNetworkInputDataSize, deltaInternalNetworkInputPositions); operatorContext.recordProcessedInput(deltaInputDataSize, deltaInputPositions); operatorContext.recordDynamicFilterSplitProcessed(deltaDynamicFilterSplitsProcessed); + operatorContext.setLatestMetrics(metrics); } if (state.getType() == FINISHED) { @@ -337,6 +342,7 @@ private List getNestedOperatorStats() context.outputPositions.get(), context.dynamicFilterSplitsProcessed.get(), + context.connectorMetrics.get(), DataSize.ofBytes(0), @@ -676,6 +682,7 @@ private static class WorkProcessorOperatorContext final AtomicLong outputPositions = new AtomicLong(); final AtomicLong dynamicFilterSplitsProcessed = new AtomicLong(); + final AtomicReference connectorMetrics = new AtomicReference<>(Metrics.EMPTY); final AtomicLong peakUserMemoryReservation = new AtomicLong(); final AtomicLong peakSystemMemoryReservation = new AtomicLong(); diff --git a/core/trino-main/src/main/java/io/trino/operator/WorkProcessorSourceOperator.java b/core/trino-main/src/main/java/io/trino/operator/WorkProcessorSourceOperator.java index 5dc2d733b015..42860ee9422a 100644 --- a/core/trino-main/src/main/java/io/trino/operator/WorkProcessorSourceOperator.java +++ b/core/trino-main/src/main/java/io/trino/operator/WorkProcessorSourceOperator.java @@ -16,6 +16,7 @@ import io.airlift.units.DataSize; import io.airlift.units.Duration; import io.trino.spi.connector.UpdatablePageSource; +import io.trino.spi.metrics.Metrics; import java.util.Optional; import java.util.function.Supplier; @@ -67,4 +68,6 @@ default long getDynamicFilterSplitsProcessed() { return 0; } + + Metrics getConnectorMetrics(); } diff --git a/core/trino-main/src/main/java/io/trino/operator/WorkProcessorSourceOperatorAdapter.java b/core/trino-main/src/main/java/io/trino/operator/WorkProcessorSourceOperatorAdapter.java index 5cec3c6c562f..4c534b1e706a 100644 --- a/core/trino-main/src/main/java/io/trino/operator/WorkProcessorSourceOperatorAdapter.java +++ b/core/trino-main/src/main/java/io/trino/operator/WorkProcessorSourceOperatorAdapter.java @@ -21,6 +21,7 @@ import io.trino.metadata.Split; import io.trino.spi.Page; import io.trino.spi.connector.UpdatablePageSource; +import io.trino.spi.metrics.Metrics; import io.trino.sql.planner.plan.PlanNodeId; import java.util.ArrayList; @@ -175,6 +176,7 @@ public boolean isFinished() public void close() throws Exception { + operatorContext.setLatestMetrics(sourceOperator.getConnectorMetrics()); sourceOperator.close(); } @@ -191,6 +193,7 @@ private void updateOperatorStats() long currentInputPositions = sourceOperator.getInputPositions(); long currentDynamicFilterSplitsProcessed = sourceOperator.getDynamicFilterSplitsProcessed(); + Metrics currentMetrics = sourceOperator.getConnectorMetrics(); if (currentPhysicalInputBytes != previousPhysicalInputBytes || currentPhysicalInputPositions != previousPhysicalInputPositions @@ -229,6 +232,8 @@ private void updateOperatorStats() operatorContext.recordDynamicFilterSplitProcessed(currentDynamicFilterSplitsProcessed - previousDynamicFilterSplitsProcessed); previousDynamicFilterSplitsProcessed = currentDynamicFilterSplitsProcessed; } + + operatorContext.setLatestMetrics(currentMetrics); } private static class SplitBuffer diff --git a/core/trino-main/src/test/java/io/trino/execution/TestQueryStats.java b/core/trino-main/src/test/java/io/trino/execution/TestQueryStats.java index 34ff5221d532..38571bc8f2c4 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestQueryStats.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestQueryStats.java @@ -22,6 +22,7 @@ import io.trino.operator.OperatorStats; import io.trino.operator.TableWriterOperator; import io.trino.spi.eventlistener.StageGcStatistics; +import io.trino.spi.metrics.Metrics; import io.trino.sql.planner.plan.PlanNodeId; import org.joda.time.DateTime; import org.testng.annotations.Test; @@ -62,6 +63,7 @@ public class TestQueryStats succinctBytes(116L), 117L, 1833, + Metrics.EMPTY, succinctBytes(118L), new Duration(119, NANOSECONDS), 120L, @@ -101,6 +103,7 @@ public class TestQueryStats succinctBytes(216L), 217L, 2833, + Metrics.EMPTY, succinctBytes(218L), new Duration(219, NANOSECONDS), 220L, @@ -140,6 +143,7 @@ public class TestQueryStats succinctBytes(316L), 317L, 3833, + Metrics.EMPTY, succinctBytes(318L), new Duration(319, NANOSECONDS), 320L, diff --git a/core/trino-main/src/test/java/io/trino/operator/TestOperatorStats.java b/core/trino-main/src/test/java/io/trino/operator/TestOperatorStats.java index 8e9747c04687..e23e0549c8b0 100644 --- a/core/trino-main/src/test/java/io/trino/operator/TestOperatorStats.java +++ b/core/trino-main/src/test/java/io/trino/operator/TestOperatorStats.java @@ -13,11 +13,13 @@ */ package io.trino.operator; +import com.google.common.collect.ImmutableMap; import io.airlift.json.JsonCodec; import io.airlift.units.DataSize; import io.airlift.units.Duration; import io.trino.connector.CatalogName; import io.trino.operator.PartitionedOutputOperator.PartitionedOutputInfo; +import io.trino.spi.metrics.Metrics; import io.trino.sql.planner.plan.PlanNodeId; import org.testng.annotations.Test; @@ -59,6 +61,7 @@ public class TestOperatorStats DataSize.ofBytes(12), 13, 533, + Metrics.EMPTY, DataSize.ofBytes(14), @@ -106,6 +109,7 @@ public class TestOperatorStats DataSize.ofBytes(12), 13, 533, + Metrics.EMPTY, DataSize.ofBytes(14), @@ -163,6 +167,7 @@ public static void assertExpectedOperatorStats(OperatorStats actual) assertEquals(actual.getOutputPositions(), 13); assertEquals(actual.getDynamicFilterSplitsProcessed(), 533); + assertEquals(actual.getMetrics().getMetrics(), ImmutableMap.of()); assertEquals(actual.getPhysicalWrittenDataSize(), DataSize.ofBytes(14)); diff --git a/core/trino-main/src/test/java/io/trino/operator/TestWorkProcessorPipelineSourceOperator.java b/core/trino-main/src/test/java/io/trino/operator/TestWorkProcessorPipelineSourceOperator.java index 34f7e9bf5fe1..67d1f2243b29 100644 --- a/core/trino-main/src/test/java/io/trino/operator/TestWorkProcessorPipelineSourceOperator.java +++ b/core/trino-main/src/test/java/io/trino/operator/TestWorkProcessorPipelineSourceOperator.java @@ -26,6 +26,7 @@ import io.trino.operator.WorkProcessorAssertion.Transform; import io.trino.spi.Page; import io.trino.spi.connector.UpdatablePageSource; +import io.trino.spi.metrics.Metrics; import io.trino.sql.planner.LocalExecutionPlanner.OperatorFactoryWithTypes; import io.trino.sql.planner.plan.PlanNodeId; import org.testng.annotations.AfterClass; @@ -392,6 +393,12 @@ public Duration getReadTime() return new Duration(7, NANOSECONDS); } + @Override + public Metrics getConnectorMetrics() + { + return Metrics.EMPTY; + } + @Override public WorkProcessor getOutputPages() { diff --git a/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorPageSource.java b/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorPageSource.java index 5b5149965baf..f16a72a2428c 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorPageSource.java +++ b/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorPageSource.java @@ -14,6 +14,7 @@ package io.trino.spi.connector; import io.trino.spi.Page; +import io.trino.spi.metrics.Metrics; import java.io.Closeable; import java.io.IOException; @@ -70,4 +71,14 @@ default CompletableFuture isBlocked() { return NOT_BLOCKED; } + + /** + * Returns the connector's metrics, mapping a metric ID to its latest value. + * Each call must return an immutable snapshot of available metrics. + * Same ID metrics are merged across all tasks and exposed via OperatorStats. + */ + default Metrics getMetrics() + { + return Metrics.EMPTY; + } } diff --git a/core/trino-spi/src/main/java/io/trino/spi/metrics/Count.java b/core/trino-spi/src/main/java/io/trino/spi/metrics/Count.java new file mode 100644 index 000000000000..9655073c5e37 --- /dev/null +++ b/core/trino-spi/src/main/java/io/trino/spi/metrics/Count.java @@ -0,0 +1,20 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.spi.metrics; + +public interface Count + extends Metric +{ + long getTotal(); +} diff --git a/core/trino-spi/src/main/java/io/trino/spi/metrics/Distribution.java b/core/trino-spi/src/main/java/io/trino/spi/metrics/Distribution.java new file mode 100644 index 000000000000..e66700548cce --- /dev/null +++ b/core/trino-spi/src/main/java/io/trino/spi/metrics/Distribution.java @@ -0,0 +1,22 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.spi.metrics; + +public interface Distribution + extends Metric +{ + long getTotal(); + + double getPercentile(double percentile); +} diff --git a/core/trino-spi/src/main/java/io/trino/spi/metrics/Metric.java b/core/trino-spi/src/main/java/io/trino/spi/metrics/Metric.java new file mode 100644 index 000000000000..a0024b26c637 --- /dev/null +++ b/core/trino-spi/src/main/java/io/trino/spi/metrics/Metric.java @@ -0,0 +1,23 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.spi.metrics; + +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import io.trino.spi.Mergeable; + +@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS) +public interface Metric + extends Mergeable +{ +} diff --git a/core/trino-spi/src/main/java/io/trino/spi/metrics/Metrics.java b/core/trino-spi/src/main/java/io/trino/spi/metrics/Metrics.java new file mode 100644 index 000000000000..38317d244d7c --- /dev/null +++ b/core/trino-spi/src/main/java/io/trino/spi/metrics/Metrics.java @@ -0,0 +1,75 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.spi.metrics; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonValue; +import io.trino.spi.Mergeable; + +import java.util.HashMap; +import java.util.Map; + +import static java.util.Objects.requireNonNull; + +public class Metrics + implements Mergeable +{ + public static final Metrics EMPTY = new Metrics(Map.of()); + + private final Map metrics; + + @JsonCreator + public Metrics(Map metrics) + { + this.metrics = Map.copyOf(requireNonNull(metrics, "metrics is null")); + } + + @JsonValue + public Map getMetrics() + { + return metrics; + } + + @Override + public Metrics mergeWith(Metrics other) + { + return accumulator().add(this).add(other).get(); + } + + public static Accumulator accumulator() + { + return new Accumulator(); + } + + public static class Accumulator + { + private final Map merged = new HashMap<>(); + + private Accumulator() + { + } + + public Accumulator add(Metrics metrics) + { + metrics.getMetrics().forEach((key, value) -> + merged.merge(key, value, (left, right) -> (Metric) left.mergeWith(right))); + return this; + } + + public Metrics get() + { + return new Metrics(merged); + } + } +}