diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index ef934efa94be..9938affd0ba2 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -739,7 +739,7 @@ class BeamModulePlugin implements Plugin { google_api_common : "com.google.api:api-common", // google_cloud_platform_libraries_bom sets version google_api_services_bigquery : "com.google.apis:google-api-services-bigquery:v2-rev20240919-2.0.0", // [bomupgrader] sets version google_api_services_cloudresourcemanager : "com.google.apis:google-api-services-cloudresourcemanager:v1-rev20240310-2.0.0", // [bomupgrader] sets version - google_api_services_dataflow : "com.google.apis:google-api-services-dataflow:v1b3-rev20240817-$google_clients_version", + google_api_services_dataflow : "com.google.apis:google-api-services-dataflow:v1b3-rev20241209-$google_clients_version", google_api_services_healthcare : "com.google.apis:google-api-services-healthcare:v1-rev20240130-$google_clients_version", google_api_services_pubsub : "com.google.apis:google-api-services-pubsub:v1-rev20220904-$google_clients_version", google_api_services_storage : "com.google.apis:google-api-services-storage:v1-rev20240924-2.0.0", // [bomupgrader] sets version diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowMetricsContainer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowMetricsContainer.java index f9cd098edaa6..5bc6ecc64ea6 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowMetricsContainer.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowMetricsContainer.java @@ -74,6 +74,12 @@ public Gauge getGauge(MetricName metricName) { return getCurrentContainer().getGauge(metricName); } + @Override + public Gauge getPerWorkerGauge(MetricName metricName) { + Gauge gauge = getCurrentContainer().getPerWorkerGauge(metricName); + return gauge; + } + @Override public StringSet getStringSet(MetricName metricName) { return getCurrentContainer().getStringSet(metricName); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToPerStepNamespaceMetricsConverter.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToPerStepNamespaceMetricsConverter.java index 91baefa0be4c..ce3464f522df 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToPerStepNamespaceMetricsConverter.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToPerStepNamespaceMetricsConverter.java @@ -19,6 +19,7 @@ import com.google.api.services.dataflow.model.Base2Exponent; import com.google.api.services.dataflow.model.BucketOptions; +import com.google.api.services.dataflow.model.DataflowGaugeValue; import com.google.api.services.dataflow.model.DataflowHistogramValue; import com.google.api.services.dataflow.model.Linear; import com.google.api.services.dataflow.model.MetricValue; @@ -86,6 +87,38 @@ private static Optional convertCounterToMetricValue( .setValueInt64(value)); } + /** + * @param metricName The {@link MetricName} that represents this counter. + * @param value The counter value. + * @return If the conversion succeeds, {@code MetricValue} that represents this counter. Otherwise + * returns an empty optional + */ + private static Optional convertGaugeToMetricValue( + MetricName metricName, + Long value, + Map parsedPerWorkerMetricsCache) { + + if ((!metricName.getNamespace().equals(BigQuerySinkMetrics.METRICS_NAMESPACE) + && !metricName.getNamespace().equals(KAFKA_SINK_METRICS_NAMESPACE))) { + return Optional.empty(); + } + + Optional labeledName = + getParsedMetricName(metricName, parsedPerWorkerMetricsCache); + if (!labeledName.isPresent() || labeledName.get().getBaseName().isEmpty()) { + return Optional.empty(); + } + + DataflowGaugeValue gauge_value = new DataflowGaugeValue(); + gauge_value.setValue(value); + + return Optional.of( + new MetricValue() + .setMetric(labeledName.get().getBaseName()) + .setMetricLabels(labeledName.get().getMetricLabels()) + .setValueGauge64(gauge_value)); + } + /** * Adds {@code outlierStats} to {@code outputHistogram} if {@code inputHistogram} has recorded * overflow or underflow values. @@ -196,6 +229,7 @@ private static Optional convertHistogramToMetricValue( public static Collection convert( String stepName, Map counters, + Map gauges, Map histograms, Map parsedPerWorkerMetricsCache) { @@ -245,6 +279,27 @@ public static Collection convert( stepNamespaceMetrics.getMetricValues().add(metricValue.get()); } + for (Entry entry : gauges.entrySet()) { + MetricName metricName = entry.getKey(); + Optional metricValue; + metricValue = + convertGaugeToMetricValue(metricName, entry.getValue(), parsedPerWorkerMetricsCache); + if (!metricValue.isPresent()) { + continue; + } + + PerStepNamespaceMetrics stepNamespaceMetrics = + metricsByNamespace.get(metricName.getNamespace()); + if (stepNamespaceMetrics == null) { + stepNamespaceMetrics = + new PerStepNamespaceMetrics() + .setMetricValues(new ArrayList<>()) + .setOriginalStep(stepName) + .setMetricsNamespace(metricName.getNamespace()); + metricsByNamespace.put(metricName.getNamespace(), stepNamespaceMetrics); + } + stepNamespaceMetrics.getMetricValues().add(metricValue.get()); + } return metricsByNamespace.values(); } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java index 7cc0dc68f7e7..075ac54f441f 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java @@ -58,6 +58,7 @@ "nullness" // TODO(https://github.com/apache/beam/issues/20497) }) public class StreamingStepMetricsContainer implements MetricsContainer { + private final String stepName; private static boolean enablePerWorkerMetrics = false; @@ -69,6 +70,10 @@ public class StreamingStepMetricsContainer implements MetricsContainer { private MetricsMap gauges = new MetricsMap<>(GaugeCell::new); + // how to handle concurrency + private final ConcurrentHashMap perWorkerGauges = + new ConcurrentHashMap<>(); + private MetricsMap stringSet = new MetricsMap<>(StringSetCell::new); private MetricsMap distributions = @@ -163,6 +168,19 @@ public Gauge getGauge(MetricName metricName) { return gauges.get(metricName); } + @Override + public Gauge getPerWorkerGauge(MetricName metricName) { + if (!enablePerWorkerMetrics) { + return MetricsContainer.super.getPerWorkerGauge(metricName); // returns no op gauge + } + Gauge val = perWorkerGauges.get(metricName); + if (val != null) { + return val; + } + + return perWorkerGauges.computeIfAbsent(metricName, name -> new GaugeCell(metricName)); + } + @Override public StringSet getStringSet(MetricName metricName) { return stringSet.get(metricName); @@ -330,11 +348,10 @@ private void deleteStaleCounters( @VisibleForTesting Iterable extractPerWorkerMetricUpdates() { ConcurrentHashMap counters = new ConcurrentHashMap(); + ConcurrentHashMap gauges = new ConcurrentHashMap(); ConcurrentHashMap histograms = new ConcurrentHashMap(); HashSet currentZeroValuedCounters = new HashSet(); - - // Extract metrics updates. perWorkerCounters.forEach( (k, v) -> { Long val = v.getAndSet(0); @@ -344,6 +361,13 @@ Iterable extractPerWorkerMetricUpdates() { } counters.put(k, val); }); + + perWorkerGauges.forEach( + (k, v) -> { + Long val = v.getCumulative().value(); + gauges.put(k, val); // no special handing for zero, since that value is important + v.reset(); + }); perWorkerHistograms.forEach( (k, v) -> { v.getSnapshotAndReset().ifPresent(snapshot -> histograms.put(k, snapshot)); @@ -352,7 +376,7 @@ Iterable extractPerWorkerMetricUpdates() { deleteStaleCounters(currentZeroValuedCounters, Instant.now(clock)); return MetricsToPerStepNamespaceMetricsConverter.convert( - stepName, counters, histograms, parsedPerWorkerMetricsCache); + stepName, counters, gauges, histograms, parsedPerWorkerMetricsCache); } /** diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporter.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporter.java index 3557f0d193c5..571213635ecc 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporter.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporter.java @@ -358,7 +358,6 @@ private Optional createWorkerMessageForPerWorkerMetrics() { List metrics = new ArrayList<>(); allStageInfo.get().forEach(s -> metrics.addAll(s.extractPerWorkerMetricValues())); - if (metrics.isEmpty()) { return Optional.empty(); } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/MetricsToPerStepNamespaceMetricsConverterTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/MetricsToPerStepNamespaceMetricsConverterTest.java index f9a9bb42906b..7939db911245 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/MetricsToPerStepNamespaceMetricsConverterTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/MetricsToPerStepNamespaceMetricsConverterTest.java @@ -23,6 +23,7 @@ import com.google.api.services.dataflow.model.Base2Exponent; import com.google.api.services.dataflow.model.BucketOptions; +import com.google.api.services.dataflow.model.DataflowGaugeValue; import com.google.api.services.dataflow.model.DataflowHistogramValue; import com.google.api.services.dataflow.model.Linear; import com.google.api.services.dataflow.model.MetricValue; @@ -44,6 +45,7 @@ @RunWith(JUnit4.class) public class MetricsToPerStepNamespaceMetricsConverterTest { + private static final HistogramData.BucketType lienarBuckets = HistogramData.LinearBuckets.of(0, 10, 10); private static final HistogramData.BucketType exponentialBuckets = @@ -86,6 +88,7 @@ public void testConvert_successfulyConvertCounters() { String step = "testStepName"; Map emptyHistograms = new HashMap<>(); Map counters = new HashMap(); + Map emptyGauges = new HashMap(); Map parsedMetricNames = new HashMap<>(); MetricName bigQueryMetric1 = MetricName.named("BigQuerySink", "metric1"); @@ -99,7 +102,7 @@ public void testConvert_successfulyConvertCounters() { Collection conversionResult = MetricsToPerStepNamespaceMetricsConverter.convert( - step, counters, emptyHistograms, parsedMetricNames); + step, counters, emptyGauges, emptyHistograms, parsedMetricNames); MetricValue expectedVal1 = new MetricValue().setMetric("metric1").setValueInt64(5L).setMetricLabels(new HashMap<>()); @@ -133,6 +136,7 @@ public void testConvert_skipInvalidMetricNames() { Map parsedMetricNames = new HashMap<>(); Map counters = new HashMap<>(); + Map emptyGauges = new HashMap(); MetricName invalidName1 = MetricName.named("BigQuerySink", "**"); counters.put(invalidName1, 5L); @@ -144,7 +148,7 @@ public void testConvert_skipInvalidMetricNames() { Collection conversionResult = MetricsToPerStepNamespaceMetricsConverter.convert( - "testStep", counters, histograms, parsedMetricNames); + "testStep", counters, emptyGauges, histograms, parsedMetricNames); assertThat(conversionResult.size(), equalTo(0)); assertThat(parsedMetricNames.size(), equalTo(0)); } @@ -152,7 +156,7 @@ public void testConvert_skipInvalidMetricNames() { @Test public void testConvert_successfulConvertHistograms() { Map parsedMetricNames = new HashMap<>(); - + Map emptyGauges = new HashMap(); Map histograms = new HashMap<>(); MetricName bigQueryMetric1 = MetricName.named("BigQuerySink", "baseLabel"); MetricName bigQueryMetric2 = @@ -181,7 +185,7 @@ public void testConvert_successfulConvertHistograms() { Map emptyCounters = new HashMap<>(); Collection conversionResult = MetricsToPerStepNamespaceMetricsConverter.convert( - step, emptyCounters, histograms, parsedMetricNames); + step, emptyCounters, emptyGauges, histograms, parsedMetricNames); // Expected value 1 List bucketCounts1 = ImmutableList.of(0L, 1L, 1L, 1L); @@ -271,7 +275,7 @@ public void testConvert_skipUnknownHistogramBucketType() { Collection conversionResult = MetricsToPerStepNamespaceMetricsConverter.convert( - step, emptyCounters, histograms, parsedMetricNames); + step, emptyCounters, emptyCounters, histograms, parsedMetricNames); assertThat(conversionResult.size(), equalTo(0)); assertThat(parsedMetricNames.size(), equalTo(0)); } @@ -280,6 +284,7 @@ public void testConvert_skipUnknownHistogramBucketType() { public void testConvert_convertCountersAndHistograms() { String step = "testStep"; Map counters = new HashMap<>(); + Map emptyGauges = new HashMap<>(); Map histograms = new HashMap<>(); Map parsedMetricNames = new HashMap<>(); @@ -293,7 +298,7 @@ public void testConvert_convertCountersAndHistograms() { Collection conversionResult = MetricsToPerStepNamespaceMetricsConverter.convert( - step, counters, histograms, parsedMetricNames); + step, counters, emptyGauges, histograms, parsedMetricNames); // Expected counter MetricValue Map counterLabelMap = new HashMap<>(); @@ -345,4 +350,77 @@ public void testConvert_convertCountersAndHistograms() { parsedMetricNames, IsMapContaining.hasEntry(histogramMetricName, parsedHistogramMetricName)); } + + @Test + public void testConvert_successfulyConvertGauges() { + String step = "testStepName"; + Map emptyHistograms = new HashMap<>(); + Map counters = new HashMap(); + Map gauges = new HashMap(); + // convertCountersAndHistograms + Map parsedMetricNames = new HashMap<>(); + + MetricName KafkaMetric1 = MetricName.named("KafkaSink", "metric1"); + MetricName KafkaMetric2 = MetricName.named("KafkaSink", "metric2*label1:val1;label2:val2;"); + MetricName KafkaMetric3 = MetricName.named("KafkaSink", "metric3"); // ? + + gauges.put(KafkaMetric1, 5L); + gauges.put(KafkaMetric2, 10L); + gauges.put(KafkaMetric3, 0L); // zero valued metric is still reported + + Collection conversionResult = + MetricsToPerStepNamespaceMetricsConverter.convert( + step, counters, gauges, emptyHistograms, parsedMetricNames); + + DataflowGaugeValue gauge_value1 = new DataflowGaugeValue(); + gauge_value1.setValue(5L); + + DataflowGaugeValue gauge_value2 = new DataflowGaugeValue(); + gauge_value2.setValue(10L); + + DataflowGaugeValue gauge_value3 = new DataflowGaugeValue(); + gauge_value3.setValue(0L); // zero valued + + MetricValue expectedVal1 = + new MetricValue() + .setMetric("metric1") + .setValueGauge64(gauge_value1) + .setMetricLabels(new HashMap<>()); + + Map val2LabelMap = new HashMap<>(); + val2LabelMap.put("label1", "val1"); + val2LabelMap.put("label2", "val2"); + MetricValue expectedVal2 = + new MetricValue() + .setMetric("metric2") + .setValueGauge64(gauge_value2) + .setMetricLabels(val2LabelMap); + + MetricValue expectedVal3 = + new MetricValue() + .setMetric("metric3") + .setValueGauge64(gauge_value3) + .setMetricLabels(new HashMap<>()); + + assertThat(conversionResult.size(), equalTo(1)); + PerStepNamespaceMetrics perStepNamespaceMetrics = conversionResult.iterator().next(); + + assertThat(perStepNamespaceMetrics.getOriginalStep(), equalTo(step)); + assertThat(perStepNamespaceMetrics.getMetricsNamespace(), equalTo("KafkaSink")); + assertThat(perStepNamespaceMetrics.getMetricValues().size(), equalTo(3)); + assertThat( + perStepNamespaceMetrics.getMetricValues(), + containsInAnyOrder(expectedVal1, expectedVal2, expectedVal3)); + + LabeledMetricNameUtils.ParsedMetricName parsedKafkaMetric1 = + LabeledMetricNameUtils.parseMetricName(KafkaMetric1.getName()).get(); + LabeledMetricNameUtils.ParsedMetricName parsedKafkaMetric2 = + LabeledMetricNameUtils.parseMetricName(KafkaMetric2.getName()).get(); + LabeledMetricNameUtils.ParsedMetricName parsedKafkaMetric3 = + LabeledMetricNameUtils.parseMetricName(KafkaMetric3.getName()).get(); + assertThat(parsedMetricNames.size(), equalTo(3)); + assertThat(parsedMetricNames, IsMapContaining.hasEntry(KafkaMetric1, parsedKafkaMetric1)); + assertThat(parsedMetricNames, IsMapContaining.hasEntry(KafkaMetric2, parsedKafkaMetric2)); + assertThat(parsedMetricNames, IsMapContaining.hasEntry(KafkaMetric3, parsedKafkaMetric3)); + } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingGauge.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingGauge.java new file mode 100644 index 000000000000..5eb19d21d5f3 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingGauge.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.metrics; + +import java.io.Serializable; +import org.apache.beam.sdk.annotations.Internal; + +/** Implementation of {@link Gauge} that delegates to the instance for the current context. */ +@Internal +public class DelegatingGauge implements Metric, Gauge, Serializable { + private final MetricName name; + private final boolean processWideContainer; + private final boolean perWorkerGauge; + + /** + * Create a {@code DelegatingGauge} with {@code perWorkerGauge} and {@code processWideContainer} + * set to false. + * + * @param name Metric name for this metric. + */ + public DelegatingGauge(MetricName name) { + this(name, false, false); + } + + /** + * Create a {@code DelegatingGauge} with {@code perWorkerGauge} set to false. + * + * @param name Metric name for this metric. + * @param processWideContainer Whether this Gauge is stored in the ProcessWide container or the + * current thread's container. + */ + public DelegatingGauge(MetricName name, boolean processWideContainer) { + this(name, processWideContainer, false); + } + + /** + * @param name Metric name for this metric. + * @param processWideContainer Whether this gauge is stored in the ProcessWide container or the + * current thread's container. + * @param perWorkerGauge Whether this gauge refers to a perWorker metric or not. + */ + public DelegatingGauge(MetricName name, boolean processWideContainer, boolean perWorkerGauge) { + this.name = name; + this.processWideContainer = processWideContainer; + this.perWorkerGauge = perWorkerGauge; + } + + /** Set the gauge. */ + @Override + public void set(long n) { + MetricsContainer container = + this.processWideContainer + ? MetricsEnvironment.getProcessWideContainer() + : MetricsEnvironment.getCurrentContainer(); + if (container == null) { + return; + } + if (perWorkerGauge) { + container.getPerWorkerGauge(name).set(n); + } else { + container.getGauge(name).set(n); + } + } + + @Override + public MetricName getName() { + return name; + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java index 0c4766bb2c0b..2d07df1d899b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java @@ -75,6 +75,14 @@ default Histogram getPerWorkerHistogram( return NoOpHistogram.getInstance(); } + /** + * Return the {@link Gauge} that should be used for implementing the given per-worker {@code + * metricName} in this container. + */ + default Gauge getPerWorkerGauge(MetricName metricName) { + return NoOpGauge.getInstance(); + } + /** Return the cumulative values for any metrics in this container as MonitoringInfos. */ default Iterable getMonitoringInfos() { throw new RuntimeException("getMonitoringInfos is not implemented on this MetricsContainer."); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/NoOpGauge.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/NoOpGauge.java new file mode 100644 index 000000000000..39d3186bba66 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/NoOpGauge.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.metrics; + +/** + * A no-op implementation of Gauge. This class exists to provide a default if an implementation of + * MetricsContainer does not override a Gauge getter. + */ +public class NoOpGauge implements Gauge { + + private static final NoOpGauge singleton = new NoOpGauge(); + private static final MetricName name = MetricName.named(NoOpGauge.class, "singleton"); + + private NoOpGauge() {} + + @Override + public void set(long n) {} + + @Override + public MetricName getName() { + return name; + } + + public static NoOpGauge getInstance() { + return singleton; + } +} diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaMetrics.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaMetrics.java index 147a30dcdd1a..ee03627302d2 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaMetrics.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaMetrics.java @@ -23,7 +23,9 @@ import java.util.Map; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.beam.sdk.metrics.Gauge; import org.apache.beam.sdk.metrics.Histogram; +import org.apache.beam.sdk.metrics.MetricName; import org.apache.beam.sdk.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,6 +35,10 @@ public interface KafkaMetrics { void updateSuccessfulRpcMetrics(String topic, Duration elapsedTime); + // use atomic Long + void updateBacklogBytes(String topic, int partitionId, long backlog); + + // write to metrics container void updateKafkaMetrics(); /** No-op implementation of {@code KafkaResults}. */ @@ -42,6 +48,9 @@ private NoOpKafkaMetrics() {} @Override public void updateSuccessfulRpcMetrics(String topic, Duration elapsedTime) {} + @Override + public void updateBacklogBytes(String topic, int partitionId, long elapsedTime) {} + @Override public void updateKafkaMetrics() {} @@ -69,13 +78,21 @@ abstract class KafkaMetricsImpl implements KafkaMetrics { static HashMap latencyHistograms = new HashMap(); + // metric name abstract HashMap> perTopicRpcLatencies(); + // worry about concurrency? with atomic Gauge in + static HashMap backlogGauges = new HashMap(); + + abstract HashMap perTopicPartitionBacklogs(); + abstract AtomicBoolean isWritable(); public static KafkaMetricsImpl create() { return new AutoValue_KafkaMetrics_KafkaMetricsImpl( - new HashMap>(), new AtomicBoolean(true)); + new HashMap>(), + new HashMap(), + new AtomicBoolean(true)); } /** Record the rpc status and latency of a successful Kafka poll RPC call. */ @@ -93,6 +110,21 @@ public void updateSuccessfulRpcMetrics(String topic, Duration elapsedTime) { } } + /** + * @param topicName topicName + * @param partitionId partitionId for the topic Only included in the metric key if + * 'supportsMetricsDeletion' is enabled. + * @param backlog backlog for the topic Only included in the metric key if + * 'supportsMetricsDeletion' is enabled. + */ + @Override + public void updateBacklogBytes(String topicName, int partitionId, long backlog) { + if (isWritable().get()) { + String name = KafkaSinkMetrics.getMetricGaugeName(topicName, partitionId).getName(); + perTopicPartitionBacklogs().put(name, backlog); + } + } + /** Record rpc latency histogram metrics for all recorded topics. */ private void recordRpcLatencyMetrics() { for (Map.Entry> topicLatencies : @@ -114,6 +146,14 @@ private void recordRpcLatencyMetrics() { } } + private void recordBacklogBytes() { + for (Map.Entry backlogs : perTopicPartitionBacklogs().entrySet()) { + Gauge gauge = + KafkaSinkMetrics.createBacklogGauge(MetricName.named("KafkaSink", backlogs.getKey())); + gauge.set(backlogs.getValue()); + } + } + /** * Export all metrics recorded in this instance to the underlying {@code perWorkerMetrics} * containers. This function will only report metrics once per instance. Subsequent calls to @@ -125,6 +165,7 @@ public void updateKafkaMetrics() { LOG.warn("Updating stale Kafka metrics container"); return; } + recordBacklogBytes(); recordRpcLatencyMetrics(); } } diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSinkMetrics.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSinkMetrics.java index f71926f97d27..9ddbcf8f0ba6 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSinkMetrics.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSinkMetrics.java @@ -17,7 +17,9 @@ */ package org.apache.beam.sdk.io.kafka; +import org.apache.beam.sdk.metrics.DelegatingGauge; import org.apache.beam.sdk.metrics.DelegatingHistogram; +import org.apache.beam.sdk.metrics.Gauge; import org.apache.beam.sdk.metrics.Histogram; import org.apache.beam.sdk.metrics.LabeledMetricNameUtils; import org.apache.beam.sdk.metrics.MetricName; @@ -40,6 +42,7 @@ public class KafkaSinkMetrics { // Base Metric names private static final String RPC_LATENCY = "RpcLatency"; + private static final String ESTIAMTED_BACKLOG_SIZE = "EstimatedBacklogSize"; // Kafka Consumer Method names enum RpcMethod { @@ -49,6 +52,7 @@ enum RpcMethod { // Metric labels private static final String TOPIC_LABEL = "topic_name"; private static final String RPC_METHOD = "rpc_method"; + private static final String PARTITION_ID = "partition_id"; /** * Creates an Histogram metric to record RPC latency. Metric will have name. @@ -71,6 +75,48 @@ public static Histogram createRPCLatencyHistogram(RpcMethod method, String topic return new DelegatingHistogram(metricName, buckets, false, true); } + /** + * Creates an Histogram metric to record RPC latency. Metric will have name. + * + *

'EstimatedBacklogSize*topic_name:{topic};partitionId:{partitionId};' + * + * @param topic Kafka topic associated with this metric. + * @param partitionId partition id associated with this metric. + * @return Counter. + */ + public static Gauge createBacklogGauge(String topic, int partitionId) { + return new DelegatingGauge(getMetricGaugeName(topic, partitionId), false, true); + } + + /** + * Creates an Gauge metric to record per partition backlog. Metric will have name. + * + *

'EstimatedBacklogSize*topic_name:{topic};partition_id:{partitionId};' + * + * @param name MetricName for the KafkaSink. + * @return Counter. + */ + public static Gauge createBacklogGauge(MetricName name) { + return new DelegatingGauge(name, false, true); + } + + /** + * Creates an MetricName based on topic name and partition id. + * + *

'EstimatedBacklogSize*topic_name:{topic};partition_id:{partitionId};' + * + * @param topic Kafka topic associated with this metric. + * @param partitionId partition id associated with this metric. + * @return MetricName. + */ + public static MetricName getMetricGaugeName(String topic, int partitionId) { + LabeledMetricNameUtils.MetricNameBuilder nameBuilder = + LabeledMetricNameUtils.MetricNameBuilder.baseNameBuilder(ESTIAMTED_BACKLOG_SIZE); + nameBuilder.addLabel(PARTITION_ID, String.valueOf(partitionId)); + nameBuilder.addLabel(TOPIC_LABEL, topic); + return nameBuilder.build(METRICS_NAMESPACE); + } + /** * Returns a container to store metrics for Kafka metrics in Unbounded Readed. If these metrics * are disabled, then we return a no-op container. diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java index ab9e26b3b740..5755e7179e6d 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java @@ -231,8 +231,9 @@ public boolean advance() throws IOException { kafkaResults.updateKafkaMetrics(); return true; } else { // -- (b) + kafkaResults = KafkaSinkMetrics.kafkaMetrics(); nextBatch(); - + kafkaResults.updateKafkaMetrics(); if (!curBatch.hasNext()) { return false; } @@ -302,13 +303,11 @@ public Instant getCurrentTimestamp() throws NoSuchElementException { @Override public long getSplitBacklogBytes() { long backlogBytes = 0; - for (PartitionState p : partitionStates) { long pBacklog = p.approxBacklogInBytes(); if (pBacklog == UnboundedReader.BACKLOG_UNKNOWN) { return UnboundedReader.BACKLOG_UNKNOWN; } - backlogBytes += pBacklog; } return backlogBytes; @@ -455,6 +454,10 @@ private static class PartitionState { this.timestampPolicy = timestampPolicy; } + public TopicPartition topicPartition() { + return topicPartition; + } + // Update consumedOffset, avgRecordSize, and avgOffsetGap void recordConsumed(long offset, int size, long offsetGap) { nextOffset = offset + 1; @@ -672,6 +675,7 @@ private void nextBatch() throws IOException { partitionStates.forEach(p -> p.recordIter = records.records(p.topicPartition).iterator()); reportBacklog(); + reportBacklogMetrics(); // cycle through the partitions in order to interleave records from each. curBatch = Iterators.cycle(new ArrayList<>(partitionStates)); @@ -743,6 +747,16 @@ private void reportBacklog() { backlogElementsOfSplit.set(splitBacklogMessages); } + private void reportBacklogMetrics() { + for (PartitionState p : partitionStates) { + long pBacklog = p.approxBacklogInBytes(); + if (pBacklog != UnboundedReader.BACKLOG_UNKNOWN) { + kafkaResults.updateBacklogBytes( + p.topicPartition().topic(), p.topicPartition().partition(), pBacklog); + } + } + } + private long getSplitBacklogMessageCount() { long backlogCount = 0; diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaMetricsTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaMetricsTest.java index b84e143be773..e9339878d42f 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaMetricsTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaMetricsTest.java @@ -24,7 +24,9 @@ import java.time.Duration; import java.util.List; import java.util.concurrent.ConcurrentHashMap; +import org.apache.beam.runners.core.metrics.GaugeCell; import org.apache.beam.runners.core.metrics.MetricsContainerImpl; +import org.apache.beam.sdk.metrics.Gauge; import org.apache.beam.sdk.metrics.Histogram; import org.apache.beam.sdk.metrics.MetricName; import org.apache.beam.sdk.metrics.MetricsEnvironment; @@ -59,6 +61,9 @@ public static class TestMetricsContainer extends MetricsContainerImpl { perWorkerHistograms = new ConcurrentHashMap, TestHistogram>(); + public ConcurrentHashMap perWorkerGauges = + new ConcurrentHashMap(); + public TestMetricsContainer() { super("TestStep"); } @@ -70,9 +75,16 @@ public Histogram getPerWorkerHistogram( return perWorkerHistograms.get(KV.of(metricName, bucketType)); } + @Override + public Gauge getPerWorkerGauge(MetricName metricName) { + perWorkerGauges.computeIfAbsent(metricName, name -> new GaugeCell(metricName)); + return perWorkerGauges.get(metricName); + } + @Override public void reset() { perWorkerHistograms.clear(); + perWorkerGauges.clear(); } } @@ -83,10 +95,11 @@ public void testNoOpKafkaMetrics() throws Exception { KafkaMetrics results = KafkaMetrics.NoOpKafkaMetrics.getInstance(); results.updateSuccessfulRpcMetrics("test-topic", Duration.ofMillis(10)); - + results.updateBacklogBytes("test-topic", 0, 10); results.updateKafkaMetrics(); assertThat(testContainer.perWorkerHistograms.size(), equalTo(0)); + assertThat(testContainer.perWorkerGauges.size(), equalTo(0)); } @Test @@ -99,6 +112,7 @@ public void testKafkaRPCLatencyMetrics() throws Exception { KafkaMetrics results = KafkaSinkMetrics.kafkaMetrics(); results.updateSuccessfulRpcMetrics("test-topic", Duration.ofMillis(10)); + results.updateBacklogBytes("test-topic", 0, 10); results.updateKafkaMetrics(); // RpcLatency*rpc_method:POLL;topic_name:test-topic @@ -110,6 +124,11 @@ public void testKafkaRPCLatencyMetrics() throws Exception { assertThat( testContainer.perWorkerHistograms.get(KV.of(histogramName, bucketType)).values, containsInAnyOrder(Double.valueOf(10.0))); + + MetricName gaugeName = + MetricName.named("KafkaSink", "EstimatedBacklogSize*partition_id:0;topic_name:test-topic;"); + assertThat(testContainer.perWorkerGauges.size(), equalTo(1)); + assertThat(testContainer.perWorkerGauges.get(gaugeName).getCumulative().value(), equalTo(10L)); } @Test