diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToPerStepNamespaceMetricsConverter.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToPerStepNamespaceMetricsConverter.java index ce3464f522df..83ab9434a7d3 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToPerStepNamespaceMetricsConverter.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToPerStepNamespaceMetricsConverter.java @@ -26,16 +26,20 @@ import com.google.api.services.dataflow.model.OutlierStats; import com.google.api.services.dataflow.model.PerStepNamespaceMetrics; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Optional; +import java.util.stream.Stream; import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySinkMetrics; import org.apache.beam.sdk.metrics.LabeledMetricNameUtils; import org.apache.beam.sdk.metrics.MetricName; import org.apache.beam.sdk.util.HistogramData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Converts metric updates to {@link PerStepNamespaceMetrics} protos. Currently we only support @@ -43,9 +47,16 @@ * converter. */ public class MetricsToPerStepNamespaceMetricsConverter { + + private static final Logger LOG = + LoggerFactory.getLogger(MetricsToPerStepNamespaceMetricsConverter.class); + // Avoids to introduce mandatory kafka-io dependency to Dataflow worker // keep in sync with org.apache.beam.sdk.io.kafka.KafkaSinkMetrics.METRICS_NAMESPACE public static String KAFKA_SINK_METRICS_NAMESPACE = "KafkaSink"; + private static String[] SUPPORTED_NAMESPACES = { + KAFKA_SINK_METRICS_NAMESPACE, BigQuerySinkMetrics.METRICS_NAMESPACE + }; private static Optional getParsedMetricName( MetricName metricName, @@ -60,6 +71,23 @@ private static Optional getParsedMetric return parsedMetricName; } + /** + * @param metricName The {@link MetricName} that represents this metric. + * @return boolean If the metric is from a supported namespace. + */ + private static boolean isNameSpaceSupported(MetricName metricName) { + boolean isValidNameSpace = + Stream.of(SUPPORTED_NAMESPACES).anyMatch(x -> x.equals(metricName.getNamespace())); + if (!isValidNameSpace) { + LOG.warn( + "Dropping metric {} since {} is not one of the supported namespaces: {}", + metricName, + metricName.getNamespace(), + Arrays.toString(SUPPORTED_NAMESPACES)); + } + return isValidNameSpace; + } + /** * @param metricName The {@link MetricName} that represents this counter. * @param value The counter value. @@ -71,9 +99,7 @@ private static Optional convertCounterToMetricValue( Long value, Map parsedPerWorkerMetricsCache) { - if (value == 0 - || (!metricName.getNamespace().equals(BigQuerySinkMetrics.METRICS_NAMESPACE) - && !metricName.getNamespace().equals(KAFKA_SINK_METRICS_NAMESPACE))) { + if (value == 0) { return Optional.empty(); } @@ -98,11 +124,6 @@ private static Optional convertGaugeToMetricValue( Long value, Map parsedPerWorkerMetricsCache) { - if ((!metricName.getNamespace().equals(BigQuerySinkMetrics.METRICS_NAMESPACE) - && !metricName.getNamespace().equals(KAFKA_SINK_METRICS_NAMESPACE))) { - return Optional.empty(); - } - Optional labeledName = getParsedMetricName(metricName, parsedPerWorkerMetricsCache); if (!labeledName.isPresent() || labeledName.get().getBaseName().isEmpty()) { @@ -237,6 +258,11 @@ public static Collection convert( for (Entry entry : counters.entrySet()) { MetricName metricName = entry.getKey(); + boolean validNameSpace = isNameSpaceSupported(metricName); + if (!validNameSpace) { + continue; + } + Optional metricValue = convertCounterToMetricValue(metricName, entry.getValue(), parsedPerWorkerMetricsCache); if (!metricValue.isPresent()) { @@ -259,6 +285,11 @@ public static Collection convert( for (Entry entry : histograms.entrySet()) { MetricName metricName = entry.getKey(); + + boolean validNameSpace = isNameSpaceSupported(metricName); + if (!validNameSpace) { + continue; + } Optional metricValue = convertHistogramToMetricValue(metricName, entry.getValue(), parsedPerWorkerMetricsCache); if (!metricValue.isPresent()) { @@ -281,8 +312,13 @@ public static Collection convert( for (Entry entry : gauges.entrySet()) { MetricName metricName = entry.getKey(); - Optional metricValue; - metricValue = + + boolean validNameSpace = isNameSpaceSupported(metricName); + if (!validNameSpace) { + continue; + } + + Optional metricValue = convertGaugeToMetricValue(metricName, entry.getValue(), parsedPerWorkerMetricsCache); if (!metricValue.isPresent()) { continue; diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/MetricsToPerStepNamespaceMetricsConverterTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/MetricsToPerStepNamespaceMetricsConverterTest.java index 3161b961cf9e..331f3005bf41 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/MetricsToPerStepNamespaceMetricsConverterTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/MetricsToPerStepNamespaceMetricsConverterTest.java @@ -152,6 +152,29 @@ public void testConvert_skipInvalidMetricNames() { assertThat(parsedMetricNames.size(), equalTo(0)); } + @Test + public void testConvert_skipInvalidMetricNameSpaces() { + Map parsedMetricNames = new HashMap<>(); + + Map counters = new HashMap<>(); + Map emptyGauges = new HashMap(); + MetricName invalidNameSpace1 = MetricName.named("Unsupported", "baseLabel"); + counters.put(invalidNameSpace1, 5L); + + Map histograms = new HashMap<>(); + MetricName invalidNameSpace2 = MetricName.named("Unsupported", "****"); + LockFreeHistogram nonEmptyLinearHistogram = + new LockFreeHistogram(invalidNameSpace2, lienarBuckets); + nonEmptyLinearHistogram.update(-5.0); + histograms.put(invalidNameSpace2, nonEmptyLinearHistogram.getSnapshotAndReset().get()); + + Collection conversionResult = + MetricsToPerStepNamespaceMetricsConverter.convert( + "testStep", counters, emptyGauges, histograms, parsedMetricNames); + assertThat(conversionResult.size(), equalTo(0)); + assertThat(parsedMetricNames.size(), equalTo(0)); + } + @Test public void testConvert_successfulConvertHistograms() { Map parsedMetricNames = new HashMap<>();