Skip to content

Commit

Permalink
Fix compilation and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
rohitsinha54 committed Jul 6, 2024
1 parent b733f96 commit 44a3901
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricsSink;
import org.apache.beam.sdk.metrics.StringSetResult;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.joda.time.Instant;

/** Test class to be used as a input to {@link MetricsSink} implementations tests. */
Expand Down Expand Up @@ -71,4 +73,13 @@ public List<MetricResult<GaugeResult>> getGauges() {
GaugeResult.create(100L, new Instant(345862800L)),
GaugeResult.create(120L, new Instant(345862800L)));
}

@Override
public Iterable<MetricResult<StringSetResult>> getStringSets() {
return makeResults(
"s3",
"n3",
StringSetResult.create(ImmutableSet.of("ab")),
StringSetResult.create(ImmutableSet.of("cd")));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,9 @@ public void testWriteMetricsWithCommittedSupported() throws Exception {
+ "\"namespace\":\"ns1\"},\"step\":\"s2\"}],\"gauges\":[{\"attempted\":{\"timestamp\":"
+ "\"1970-01-05T00:04:22.800Z\",\"value\":120},\"committed\":{\"timestamp\":"
+ "\"1970-01-05T00:04:22.800Z\",\"value\":100},\"name\":{\"name\":\"n3\",\"namespace\":"
+ "\"ns1\"},\"step\":\"s3\"}]}";
+ "\"ns1\"},\"step\":\"s3\"}],\"stringSets\":[{\"attempted\":{\"stringSet\":[\"cd"
+ "\"]},\"committed\":{\"stringSet\":[\"ab\"]},\"name\":{\"name\":\"n3\","
+ "\"namespace\":\"ns1\"},\"step\":\"s3\"}]}";
assertEquals("Wrong number of messages sent to HTTP server", 1, messages.size());
assertEquals("Wrong messages sent to HTTP server", expected, messages.get(0));
}
Expand All @@ -114,7 +116,8 @@ public void testWriteMetricsWithCommittedUnSupported() throws Exception {
+ "{\"count\":4,\"max\":9,\"mean\":6.25,\"min\":3,\"sum\":25},\"name\":{\"name\":\"n2\""
+ ",\"namespace\":\"ns1\"},\"step\":\"s2\"}],\"gauges\":[{\"attempted\":{\"timestamp\":"
+ "\"1970-01-05T00:04:22.800Z\",\"value\":120},\"name\":{\"name\":\"n3\",\"namespace\":"
+ "\"ns1\"},\"step\":\"s3\"}]}";
+ "\"ns1\"},\"step\":\"s3\"}],\"stringSets\":[{\"attempted\":{\"stringSet\":[\"cd\"]},"
+ "\"name\":{\"name\":\"n3\",\"namespace\":\"ns1\"},\"step\":\"s3\"}]}";
assertEquals("Wrong number of messages sent to HTTP server", 1, messages.size());
assertEquals("Wrong messages sent to HTTP server", expected, messages.get(0));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.sdk.metrics.StringSetResult;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;

Expand Down Expand Up @@ -84,6 +85,11 @@ public Iterable<MetricResult<DistributionResult>> getDistributions() {
public Iterable<MetricResult<GaugeResult>> getGauges() {
return Collections.emptyList();
}

@Override
public Iterable<MetricResult<StringSetResult>> getStringSets() {
return Collections.emptyList();
}
};
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.beam.runners.core.metrics.GaugeData;
import org.apache.beam.runners.core.metrics.MetricUpdates;
import org.apache.beam.runners.core.metrics.MetricUpdates.MetricUpdate;
import org.apache.beam.runners.core.metrics.StringSetData;
import org.apache.beam.sdk.metrics.DistributionResult;
import org.apache.beam.sdk.metrics.GaugeResult;
import org.apache.beam.sdk.metrics.MetricFiltering;
Expand All @@ -33,6 +34,7 @@
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.sdk.metrics.StringSetResult;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicate;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.FluentIterable;
import org.checkerframework.checker.nullness.qual.Nullable;
Expand All @@ -52,6 +54,9 @@ public class JetMetricResults extends MetricResults {
@GuardedBy("this")
private final Gauges gauges = new Gauges();

@GuardedBy("this")
private final StringSets stringSet = new StringSets();

@GuardedBy("this")
private IMap<String, MetricUpdates> metricsAccumulator;

Expand All @@ -70,18 +75,21 @@ public synchronized MetricQueryResults queryMetrics(@Nullable MetricsFilter filt
updateLocalMetrics(metricsAccumulator);
}
return new QueryResults(
counters.filter(filter), distributions.filter(filter), gauges.filter(filter));
counters.filter(filter), distributions.filter(filter), gauges.filter(filter),
stringSet.filter(filter));
}

private synchronized void updateLocalMetrics(IMap<String, MetricUpdates> metricsAccumulator) {
counters.clear();
distributions.clear();
gauges.clear();
stringSet.clear();

for (MetricUpdates metricUpdates : metricsAccumulator.values()) {
counters.merge(metricUpdates.counterUpdates());
distributions.merge(metricUpdates.distributionUpdates());
gauges.merge(metricUpdates.gaugeUpdates());
stringSet.merge(metricUpdates.stringSetUpdates());
}
}

Expand All @@ -93,14 +101,17 @@ private static class QueryResults extends MetricQueryResults {
private final Iterable<MetricResult<Long>> counters;
private final Iterable<MetricResult<DistributionResult>> distributions;
private final Iterable<MetricResult<GaugeResult>> gauges;
private final Iterable<MetricResult<StringSetResult>> stringSets;

private QueryResults(
Iterable<MetricResult<Long>> counters,
Iterable<MetricResult<DistributionResult>> distributions,
Iterable<MetricResult<GaugeResult>> gauges) {
Iterable<MetricResult<GaugeResult>> gauges,
Iterable<MetricResult<StringSetResult>> stringSets) {
this.counters = counters;
this.distributions = distributions;
this.gauges = gauges;
this.stringSets = stringSets;
}

@Override
Expand All @@ -117,6 +128,11 @@ public Iterable<MetricResult<DistributionResult>> getDistributions() {
public Iterable<MetricResult<GaugeResult>> getGauges() {
return gauges;
}

@Override
public Iterable<MetricResult<StringSetResult>> getStringSets() {
return stringSets;
}
}

private static class Counters {
Expand Down Expand Up @@ -212,4 +228,35 @@ private MetricResult<GaugeResult> toUpdateResult(Map.Entry<MetricKey, GaugeData>
return MetricResult.create(key, gaugeResult, gaugeResult);
}
}

private static class StringSets {

private final Map<MetricKey, StringSetData> stringSets = new HashMap<>();

void merge(Iterable<MetricUpdate<StringSetData>> updates) {
for (MetricUpdate<StringSetData> update : updates) {
MetricKey key = update.getKey();
StringSetData oldStringSet = stringSets.getOrDefault(key, StringSetData.empty());
StringSetData updatedStringSet = update.getUpdate().combine(oldStringSet);
stringSets.put(key, updatedStringSet);
}
}

void clear() {
stringSets.clear();
}

Iterable<MetricResult<StringSetResult>> filter(MetricsFilter filter) {
return FluentIterable.from(stringSets.entrySet())
.filter(matchesFilter(filter))
.transform(this::toUpdateResult)
.toList();
}

private MetricResult<StringSetResult> toUpdateResult(Map.Entry<MetricKey, StringSetData> entry) {
MetricKey key = entry.getKey();
StringSetResult stringSetResult = entry.getValue().extractResult();
return MetricResult.create(key, stringSetResult, stringSetResult);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Gauge;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
Expand All @@ -41,6 +42,7 @@
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.sdk.metrics.StringSetResult;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;

@SuppressWarnings({
Expand All @@ -53,14 +55,17 @@ public class PortableMetrics extends MetricResults {
private Iterable<MetricResult<Long>> counters;
private Iterable<MetricResult<DistributionResult>> distributions;
private Iterable<MetricResult<GaugeResult>> gauges;
private Iterable<MetricResult<StringSetResult>> stringSets;

private PortableMetrics(
Iterable<MetricResult<Long>> counters,
Iterable<MetricResult<DistributionResult>> distributions,
Iterable<MetricResult<GaugeResult>> gauges) {
Iterable<MetricResult<GaugeResult>> gauges,
Iterable<MetricResult<StringSetResult>> stringSets) {
this.counters = counters;
this.distributions = distributions;
this.gauges = gauges;
this.stringSets = stringSets;
}

public static PortableMetrics of(JobApi.MetricResults jobMetrics) {
Expand All @@ -75,7 +80,9 @@ public MetricQueryResults queryMetrics(MetricsFilter filter) {
Iterables.filter(
this.distributions,
(distribution) -> MetricFiltering.matches(filter, distribution.getKey())),
Iterables.filter(this.gauges, (gauge) -> MetricFiltering.matches(filter, gauge.getKey())));
Iterables.filter(this.gauges, (gauge) -> MetricFiltering.matches(filter, gauge.getKey())),
Iterables.filter(this.stringSets, (stringSet) -> MetricFiltering.matches(filter,
stringSet.getKey())));
}

private static PortableMetrics convertMonitoringInfosToMetricResults(
Expand All @@ -89,7 +96,12 @@ private static PortableMetrics convertMonitoringInfosToMetricResults(
extractDistributionMetricsFromJobMetrics(monitoringInfoList);
Iterable<MetricResult<GaugeResult>> gaugesFromMetrics =
extractGaugeMetricsFromJobMetrics(monitoringInfoList);
return new PortableMetrics(countersFromJobMetrics, distributionsFromMetrics, gaugesFromMetrics);
//TODO: Need more work here for portable metrics. Need to introduce a new urn/encoder/decoder
// and more for set of string.
Iterable<MetricResult<StringSetResult>> stringSetFromMetrics =
extractStringSetMetricsFromJobMetrics();
return new PortableMetrics(countersFromJobMetrics, distributionsFromMetrics,
gaugesFromMetrics, stringSetFromMetrics);
}

private static Iterable<MetricResult<DistributionResult>>
Expand Down Expand Up @@ -123,6 +135,12 @@ private static MetricResult<GaugeResult> convertGaugeMonitoringInfoToGauge(
return MetricResult.create(key, false, result);
}

private static Iterable<MetricResult<StringSetResult>> extractStringSetMetricsFromJobMetrics() {
MetricKey metricKey = MetricKey.create("not-supported",
MetricName.named("not-supported", "not-supported"));
return Collections.singleton(MetricResult.create(metricKey, false, StringSetResult.empty()));
}

private static MetricResult<DistributionResult> convertDistributionMonitoringInfoToDistribution(
MetricsApi.MonitoringInfo monitoringInfo) {
Map<String, String> labelsMap = monitoringInfo.getLabelsMap();
Expand Down

0 comments on commit 44a3901

Please sign in to comment.