Skip to content

Commit

Permalink
Address John's comments about separting conversion and validation checks
Browse files Browse the repository at this point in the history
  • Loading branch information
Naireen committed Dec 19, 2024
1 parent 948dfe6 commit 11bee27
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,26 +26,37 @@
import com.google.api.services.dataflow.model.OutlierStats;
import com.google.api.services.dataflow.model.PerStepNamespaceMetrics;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.stream.Stream;
import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySinkMetrics;
import org.apache.beam.sdk.metrics.LabeledMetricNameUtils;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.util.HistogramData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Converts metric updates to {@link PerStepNamespaceMetrics} protos. Currently we only support
* converting metrics from {@link BigQuerySinkMetrics} and from {@link KafkaSinkMetrics} with this
* converter.
*/
public class MetricsToPerStepNamespaceMetricsConverter {

private static final Logger LOG =
LoggerFactory.getLogger(MetricsToPerStepNamespaceMetricsConverter.class);

// Avoids to introduce mandatory kafka-io dependency to Dataflow worker
// keep in sync with org.apache.beam.sdk.io.kafka.KafkaSinkMetrics.METRICS_NAMESPACE
public static String KAFKA_SINK_METRICS_NAMESPACE = "KafkaSink";
private static String[] SUPPORTED_NAMESPACES = {
KAFKA_SINK_METRICS_NAMESPACE, BigQuerySinkMetrics.METRICS_NAMESPACE
};

private static Optional<LabeledMetricNameUtils.ParsedMetricName> getParsedMetricName(
MetricName metricName,
Expand All @@ -60,6 +71,23 @@ private static Optional<LabeledMetricNameUtils.ParsedMetricName> getParsedMetric
return parsedMetricName;
}

/**
* @param metricName The {@link MetricName} that represents this metric.
* @return boolean If the metric is from a supported namespace.
*/
private static boolean isNameSpaceSupported(MetricName metricName) {
boolean isValidNameSpace =
Stream.of(SUPPORTED_NAMESPACES).anyMatch(x -> x.equals(metricName.getNamespace()));
if (!isValidNameSpace) {
LOG.warn(
"Dropping metric {} since {} is not one of the supported namespaces: {}",
metricName,
metricName.getNamespace(),
Arrays.toString(SUPPORTED_NAMESPACES));
}
return isValidNameSpace;
}

/**
* @param metricName The {@link MetricName} that represents this counter.
* @param value The counter value.
Expand All @@ -71,9 +99,7 @@ private static Optional<MetricValue> convertCounterToMetricValue(
Long value,
Map<MetricName, LabeledMetricNameUtils.ParsedMetricName> parsedPerWorkerMetricsCache) {

if (value == 0
|| (!metricName.getNamespace().equals(BigQuerySinkMetrics.METRICS_NAMESPACE)
&& !metricName.getNamespace().equals(KAFKA_SINK_METRICS_NAMESPACE))) {
if (value == 0) {
return Optional.empty();
}

Expand All @@ -98,11 +124,6 @@ private static Optional<MetricValue> convertGaugeToMetricValue(
Long value,
Map<MetricName, LabeledMetricNameUtils.ParsedMetricName> parsedPerWorkerMetricsCache) {

if ((!metricName.getNamespace().equals(BigQuerySinkMetrics.METRICS_NAMESPACE)
&& !metricName.getNamespace().equals(KAFKA_SINK_METRICS_NAMESPACE))) {
return Optional.empty();
}

Optional<LabeledMetricNameUtils.ParsedMetricName> labeledName =
getParsedMetricName(metricName, parsedPerWorkerMetricsCache);
if (!labeledName.isPresent() || labeledName.get().getBaseName().isEmpty()) {
Expand Down Expand Up @@ -237,6 +258,11 @@ public static Collection<PerStepNamespaceMetrics> convert(
for (Entry<MetricName, Long> entry : counters.entrySet()) {
MetricName metricName = entry.getKey();

boolean validNameSpace = isNameSpaceSupported(metricName);
if (!validNameSpace) {
continue;
}

Optional<MetricValue> metricValue =
convertCounterToMetricValue(metricName, entry.getValue(), parsedPerWorkerMetricsCache);
if (!metricValue.isPresent()) {
Expand All @@ -259,6 +285,11 @@ public static Collection<PerStepNamespaceMetrics> convert(

for (Entry<MetricName, LockFreeHistogram.Snapshot> entry : histograms.entrySet()) {
MetricName metricName = entry.getKey();

boolean validNameSpace = isNameSpaceSupported(metricName);
if (!validNameSpace) {
continue;
}
Optional<MetricValue> metricValue =
convertHistogramToMetricValue(metricName, entry.getValue(), parsedPerWorkerMetricsCache);
if (!metricValue.isPresent()) {
Expand All @@ -281,8 +312,13 @@ public static Collection<PerStepNamespaceMetrics> convert(

for (Entry<MetricName, Long> entry : gauges.entrySet()) {
MetricName metricName = entry.getKey();
Optional<MetricValue> metricValue;
metricValue =

boolean validNameSpace = isNameSpaceSupported(metricName);
if (!validNameSpace) {
continue;
}

Optional<MetricValue> metricValue =
convertGaugeToMetricValue(metricName, entry.getValue(), parsedPerWorkerMetricsCache);
if (!metricValue.isPresent()) {
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,29 @@ public void testConvert_skipInvalidMetricNames() {
assertThat(parsedMetricNames.size(), equalTo(0));
}

@Test
public void testConvert_skipInvalidMetricNameSpaces() {
Map<MetricName, LabeledMetricNameUtils.ParsedMetricName> parsedMetricNames = new HashMap<>();

Map<MetricName, Long> counters = new HashMap<>();
Map<MetricName, Long> emptyGauges = new HashMap<MetricName, Long>();
MetricName invalidNameSpace1 = MetricName.named("Unsupported", "baseLabel");
counters.put(invalidNameSpace1, 5L);

Map<MetricName, LockFreeHistogram.Snapshot> histograms = new HashMap<>();
MetricName invalidNameSpace2 = MetricName.named("Unsupported", "****");
LockFreeHistogram nonEmptyLinearHistogram =
new LockFreeHistogram(invalidNameSpace2, lienarBuckets);
nonEmptyLinearHistogram.update(-5.0);
histograms.put(invalidNameSpace2, nonEmptyLinearHistogram.getSnapshotAndReset().get());

Collection<PerStepNamespaceMetrics> conversionResult =
MetricsToPerStepNamespaceMetricsConverter.convert(
"testStep", counters, emptyGauges, histograms, parsedMetricNames);
assertThat(conversionResult.size(), equalTo(0));
assertThat(parsedMetricNames.size(), equalTo(0));
}

@Test
public void testConvert_successfulConvertHistograms() {
Map<MetricName, LabeledMetricNameUtils.ParsedMetricName> parsedMetricNames = new HashMap<>();
Expand Down

0 comments on commit 11bee27

Please sign in to comment.