diff --git a/runners/extensions-java/metrics/src/test/java/org/apache/beam/runners/extensions/metrics/CustomMetricQueryResults.java b/runners/extensions-java/metrics/src/test/java/org/apache/beam/runners/extensions/metrics/CustomMetricQueryResults.java index a9cea996680b..96c0374067cf 100644 --- a/runners/extensions-java/metrics/src/test/java/org/apache/beam/runners/extensions/metrics/CustomMetricQueryResults.java +++ b/runners/extensions-java/metrics/src/test/java/org/apache/beam/runners/extensions/metrics/CustomMetricQueryResults.java @@ -26,6 +26,8 @@ import org.apache.beam.sdk.metrics.MetricQueryResults; import org.apache.beam.sdk.metrics.MetricResult; import org.apache.beam.sdk.metrics.MetricsSink; +import org.apache.beam.sdk.metrics.StringSetResult; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; import org.joda.time.Instant; /** Test class to be used as a input to {@link MetricsSink} implementations tests. */ @@ -71,4 +73,13 @@ public List> getGauges() { GaugeResult.create(100L, new Instant(345862800L)), GaugeResult.create(120L, new Instant(345862800L))); } + + @Override + public Iterable> getStringSets() { + return makeResults( + "s3", + "n3", + StringSetResult.create(ImmutableSet.of("ab")), + StringSetResult.create(ImmutableSet.of("cd"))); + } } diff --git a/runners/extensions-java/metrics/src/test/java/org/apache/beam/runners/extensions/metrics/MetricsHttpSinkTest.java b/runners/extensions-java/metrics/src/test/java/org/apache/beam/runners/extensions/metrics/MetricsHttpSinkTest.java index afbe77bdb885..10e9481d271b 100644 --- a/runners/extensions-java/metrics/src/test/java/org/apache/beam/runners/extensions/metrics/MetricsHttpSinkTest.java +++ b/runners/extensions-java/metrics/src/test/java/org/apache/beam/runners/extensions/metrics/MetricsHttpSinkTest.java @@ -94,7 +94,9 @@ public void testWriteMetricsWithCommittedSupported() throws Exception { + "\"namespace\":\"ns1\"},\"step\":\"s2\"}],\"gauges\":[{\"attempted\":{\"timestamp\":" + "\"1970-01-05T00:04:22.800Z\",\"value\":120},\"committed\":{\"timestamp\":" + "\"1970-01-05T00:04:22.800Z\",\"value\":100},\"name\":{\"name\":\"n3\",\"namespace\":" - + "\"ns1\"},\"step\":\"s3\"}]}"; + + "\"ns1\"},\"step\":\"s3\"}],\"stringSets\":[{\"attempted\":{\"stringSet\":[\"cd" + + "\"]},\"committed\":{\"stringSet\":[\"ab\"]},\"name\":{\"name\":\"n3\"," + + "\"namespace\":\"ns1\"},\"step\":\"s3\"}]}"; assertEquals("Wrong number of messages sent to HTTP server", 1, messages.size()); assertEquals("Wrong messages sent to HTTP server", expected, messages.get(0)); } @@ -114,7 +116,8 @@ public void testWriteMetricsWithCommittedUnSupported() throws Exception { + "{\"count\":4,\"max\":9,\"mean\":6.25,\"min\":3,\"sum\":25},\"name\":{\"name\":\"n2\"" + ",\"namespace\":\"ns1\"},\"step\":\"s2\"}],\"gauges\":[{\"attempted\":{\"timestamp\":" + "\"1970-01-05T00:04:22.800Z\",\"value\":120},\"name\":{\"name\":\"n3\",\"namespace\":" - + "\"ns1\"},\"step\":\"s3\"}]}"; + + "\"ns1\"},\"step\":\"s3\"}],\"stringSets\":[{\"attempted\":{\"stringSet\":[\"cd\"]}," + + "\"name\":{\"name\":\"n3\",\"namespace\":\"ns1\"},\"step\":\"s3\"}]}"; assertEquals("Wrong number of messages sent to HTTP server", 1, messages.size()); assertEquals("Wrong messages sent to HTTP server", expected, messages.get(0)); } diff --git a/runners/jet/src/main/java/org/apache/beam/runners/jet/FailedRunningPipelineResults.java b/runners/jet/src/main/java/org/apache/beam/runners/jet/FailedRunningPipelineResults.java index b6dae10da6bc..67cf3280a83c 100644 --- a/runners/jet/src/main/java/org/apache/beam/runners/jet/FailedRunningPipelineResults.java +++ b/runners/jet/src/main/java/org/apache/beam/runners/jet/FailedRunningPipelineResults.java @@ -25,6 +25,7 @@ import org.apache.beam.sdk.metrics.MetricResult; import org.apache.beam.sdk.metrics.MetricResults; import org.apache.beam.sdk.metrics.MetricsFilter; +import org.apache.beam.sdk.metrics.StringSetResult; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; @@ -84,6 +85,11 @@ public Iterable> getDistributions() { public Iterable> getGauges() { return Collections.emptyList(); } + + @Override + public Iterable> getStringSets() { + return Collections.emptyList(); + } }; } }; diff --git a/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/JetMetricResults.java b/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/JetMetricResults.java index 8e28f3fda0e8..10749f854dc4 100644 --- a/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/JetMetricResults.java +++ b/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/JetMetricResults.java @@ -25,6 +25,7 @@ import org.apache.beam.runners.core.metrics.GaugeData; import org.apache.beam.runners.core.metrics.MetricUpdates; import org.apache.beam.runners.core.metrics.MetricUpdates.MetricUpdate; +import org.apache.beam.runners.core.metrics.StringSetData; import org.apache.beam.sdk.metrics.DistributionResult; import org.apache.beam.sdk.metrics.GaugeResult; import org.apache.beam.sdk.metrics.MetricFiltering; @@ -33,6 +34,7 @@ import org.apache.beam.sdk.metrics.MetricResult; import org.apache.beam.sdk.metrics.MetricResults; import org.apache.beam.sdk.metrics.MetricsFilter; +import org.apache.beam.sdk.metrics.StringSetResult; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicate; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.FluentIterable; import org.checkerframework.checker.nullness.qual.Nullable; @@ -52,6 +54,9 @@ public class JetMetricResults extends MetricResults { @GuardedBy("this") private final Gauges gauges = new Gauges(); + @GuardedBy("this") + private final StringSets stringSet = new StringSets(); + @GuardedBy("this") private IMap metricsAccumulator; @@ -70,18 +75,21 @@ public synchronized MetricQueryResults queryMetrics(@Nullable MetricsFilter filt updateLocalMetrics(metricsAccumulator); } return new QueryResults( - counters.filter(filter), distributions.filter(filter), gauges.filter(filter)); + counters.filter(filter), distributions.filter(filter), gauges.filter(filter), + stringSet.filter(filter)); } private synchronized void updateLocalMetrics(IMap metricsAccumulator) { counters.clear(); distributions.clear(); gauges.clear(); + stringSet.clear(); for (MetricUpdates metricUpdates : metricsAccumulator.values()) { counters.merge(metricUpdates.counterUpdates()); distributions.merge(metricUpdates.distributionUpdates()); gauges.merge(metricUpdates.gaugeUpdates()); + stringSet.merge(metricUpdates.stringSetUpdates()); } } @@ -93,14 +101,17 @@ private static class QueryResults extends MetricQueryResults { private final Iterable> counters; private final Iterable> distributions; private final Iterable> gauges; + private final Iterable> stringSets; private QueryResults( Iterable> counters, Iterable> distributions, - Iterable> gauges) { + Iterable> gauges, + Iterable> stringSets) { this.counters = counters; this.distributions = distributions; this.gauges = gauges; + this.stringSets = stringSets; } @Override @@ -117,6 +128,11 @@ public Iterable> getDistributions() { public Iterable> getGauges() { return gauges; } + + @Override + public Iterable> getStringSets() { + return stringSets; + } } private static class Counters { @@ -212,4 +228,35 @@ private MetricResult toUpdateResult(Map.Entry return MetricResult.create(key, gaugeResult, gaugeResult); } } + + private static class StringSets { + + private final Map stringSets = new HashMap<>(); + + void merge(Iterable> updates) { + for (MetricUpdate update : updates) { + MetricKey key = update.getKey(); + StringSetData oldStringSet = stringSets.getOrDefault(key, StringSetData.empty()); + StringSetData updatedStringSet = update.getUpdate().combine(oldStringSet); + stringSets.put(key, updatedStringSet); + } + } + + void clear() { + stringSets.clear(); + } + + Iterable> filter(MetricsFilter filter) { + return FluentIterable.from(stringSets.entrySet()) + .filter(matchesFilter(filter)) + .transform(this::toUpdateResult) + .toList(); + } + + private MetricResult toUpdateResult(Map.Entry entry) { + MetricKey key = entry.getKey(); + StringSetResult stringSetResult = entry.getValue().extractResult(); + return MetricResult.create(key, stringSetResult, stringSetResult); + } + } } diff --git a/runners/portability/java/src/main/java/org/apache/beam/runners/portability/PortableMetrics.java b/runners/portability/java/src/main/java/org/apache/beam/runners/portability/PortableMetrics.java index fc94e408bfd3..72c6361d15a9 100644 --- a/runners/portability/java/src/main/java/org/apache/beam/runners/portability/PortableMetrics.java +++ b/runners/portability/java/src/main/java/org/apache/beam/runners/portability/PortableMetrics.java @@ -25,6 +25,7 @@ import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Gauge; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -41,6 +42,7 @@ import org.apache.beam.sdk.metrics.MetricResult; import org.apache.beam.sdk.metrics.MetricResults; import org.apache.beam.sdk.metrics.MetricsFilter; +import org.apache.beam.sdk.metrics.StringSetResult; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; @SuppressWarnings({ @@ -53,14 +55,17 @@ public class PortableMetrics extends MetricResults { private Iterable> counters; private Iterable> distributions; private Iterable> gauges; + private Iterable> stringSets; private PortableMetrics( Iterable> counters, Iterable> distributions, - Iterable> gauges) { + Iterable> gauges, + Iterable> stringSets) { this.counters = counters; this.distributions = distributions; this.gauges = gauges; + this.stringSets = stringSets; } public static PortableMetrics of(JobApi.MetricResults jobMetrics) { @@ -75,7 +80,9 @@ public MetricQueryResults queryMetrics(MetricsFilter filter) { Iterables.filter( this.distributions, (distribution) -> MetricFiltering.matches(filter, distribution.getKey())), - Iterables.filter(this.gauges, (gauge) -> MetricFiltering.matches(filter, gauge.getKey()))); + Iterables.filter(this.gauges, (gauge) -> MetricFiltering.matches(filter, gauge.getKey())), + Iterables.filter(this.stringSets, (stringSet) -> MetricFiltering.matches(filter, + stringSet.getKey()))); } private static PortableMetrics convertMonitoringInfosToMetricResults( @@ -89,7 +96,12 @@ private static PortableMetrics convertMonitoringInfosToMetricResults( extractDistributionMetricsFromJobMetrics(monitoringInfoList); Iterable> gaugesFromMetrics = extractGaugeMetricsFromJobMetrics(monitoringInfoList); - return new PortableMetrics(countersFromJobMetrics, distributionsFromMetrics, gaugesFromMetrics); + //TODO: Need more work here for portable metrics. Need to introduce a new urn/encoder/decoder + // and more for set of string. + Iterable> stringSetFromMetrics = + extractStringSetMetricsFromJobMetrics(); + return new PortableMetrics(countersFromJobMetrics, distributionsFromMetrics, + gaugesFromMetrics, stringSetFromMetrics); } private static Iterable> @@ -123,6 +135,12 @@ private static MetricResult convertGaugeMonitoringInfoToGauge( return MetricResult.create(key, false, result); } + private static Iterable> extractStringSetMetricsFromJobMetrics() { + MetricKey metricKey = MetricKey.create("not-supported", + MetricName.named("not-supported", "not-supported")); + return Collections.singleton(MetricResult.create(metricKey, false, StringSetResult.empty())); + } + private static MetricResult convertDistributionMonitoringInfoToDistribution( MetricsApi.MonitoringInfo monitoringInfo) { Map labelsMap = monitoringInfo.getLabelsMap();