Skip to content

Commit

Permalink
add counter stuff
Browse files Browse the repository at this point in the history
  • Loading branch information
Naireen committed Dec 18, 2024
1 parent e68a79c commit 2791b23
Show file tree
Hide file tree
Showing 13 changed files with 432 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -739,7 +739,7 @@ class BeamModulePlugin implements Plugin<Project> {
google_api_common : "com.google.api:api-common", // google_cloud_platform_libraries_bom sets version
google_api_services_bigquery : "com.google.apis:google-api-services-bigquery:v2-rev20240919-2.0.0", // [bomupgrader] sets version
google_api_services_cloudresourcemanager : "com.google.apis:google-api-services-cloudresourcemanager:v1-rev20240310-2.0.0", // [bomupgrader] sets version
google_api_services_dataflow : "com.google.apis:google-api-services-dataflow:v1b3-rev20240817-$google_clients_version",
google_api_services_dataflow : "com.google.apis:google-api-services-dataflow:v1b3-rev20241209-$google_clients_version",
google_api_services_healthcare : "com.google.apis:google-api-services-healthcare:v1-rev20240130-$google_clients_version",
google_api_services_pubsub : "com.google.apis:google-api-services-pubsub:v1-rev20220904-$google_clients_version",
google_api_services_storage : "com.google.apis:google-api-services-storage:v1-rev20240924-2.0.0", // [bomupgrader] sets version
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@ public Gauge getGauge(MetricName metricName) {
return getCurrentContainer().getGauge(metricName);
}

@Override
public Gauge getPerWorkerGauge(MetricName metricName) {
Gauge gauge = getCurrentContainer().getPerWorkerGauge(metricName);
return gauge;
}

@Override
public StringSet getStringSet(MetricName metricName) {
return getCurrentContainer().getStringSet(metricName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.google.api.services.dataflow.model.Base2Exponent;
import com.google.api.services.dataflow.model.BucketOptions;
import com.google.api.services.dataflow.model.DataflowGaugeValue;
import com.google.api.services.dataflow.model.DataflowHistogramValue;
import com.google.api.services.dataflow.model.Linear;
import com.google.api.services.dataflow.model.MetricValue;
Expand Down Expand Up @@ -86,6 +87,38 @@ private static Optional<MetricValue> convertCounterToMetricValue(
.setValueInt64(value));
}

/**
* @param metricName The {@link MetricName} that represents this counter.
* @param value The counter value.
* @return If the conversion succeeds, {@code MetricValue} that represents this counter. Otherwise
* returns an empty optional
*/
private static Optional<MetricValue> convertGaugeToMetricValue(
MetricName metricName,
Long value,
Map<MetricName, LabeledMetricNameUtils.ParsedMetricName> parsedPerWorkerMetricsCache) {

if ((!metricName.getNamespace().equals(BigQuerySinkMetrics.METRICS_NAMESPACE)
&& !metricName.getNamespace().equals(KafkaSinkMetrics.METRICS_NAMESPACE))) {
return Optional.empty();
}

Optional<LabeledMetricNameUtils.ParsedMetricName> labeledName =
getParsedMetricName(metricName, parsedPerWorkerMetricsCache);
if (!labeledName.isPresent() || labeledName.get().getBaseName().isEmpty()) {
return Optional.empty();
}

DataflowGaugeValue gauge_value = new DataflowGaugeValue();
gauge_value.setValue(value);

return Optional.of(
new MetricValue()
.setMetric(labeledName.get().getBaseName())
.setMetricLabels(labeledName.get().getMetricLabels())
.setValueGauge64(gauge_value));
}

/**
* Adds {@code outlierStats} to {@code outputHistogram} if {@code inputHistogram} has recorded
* overflow or underflow values.
Expand Down Expand Up @@ -196,6 +229,7 @@ private static Optional<MetricValue> convertHistogramToMetricValue(
public static Collection<PerStepNamespaceMetrics> convert(
String stepName,
Map<MetricName, Long> counters,
Map<MetricName, Long> gauges,
Map<MetricName, LockFreeHistogram.Snapshot> histograms,
Map<MetricName, LabeledMetricNameUtils.ParsedMetricName> parsedPerWorkerMetricsCache) {

Expand Down Expand Up @@ -245,6 +279,27 @@ public static Collection<PerStepNamespaceMetrics> convert(
stepNamespaceMetrics.getMetricValues().add(metricValue.get());
}

for (Entry<MetricName, Long> entry : gauges.entrySet()) {
MetricName metricName = entry.getKey();
Optional<MetricValue> metricValue;
metricValue =
convertGaugeToMetricValue(metricName, entry.getValue(), parsedPerWorkerMetricsCache);
if (!metricValue.isPresent()) {
continue;
}

PerStepNamespaceMetrics stepNamespaceMetrics =
metricsByNamespace.get(metricName.getNamespace());
if (stepNamespaceMetrics == null) {
stepNamespaceMetrics =
new PerStepNamespaceMetrics()
.setMetricValues(new ArrayList<>())
.setOriginalStep(stepName)
.setMetricsNamespace(metricName.getNamespace());
metricsByNamespace.put(metricName.getNamespace(), stepNamespaceMetrics);
}
stepNamespaceMetrics.getMetricValues().add(metricValue.get());
}
return metricsByNamespace.values();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
public class StreamingStepMetricsContainer implements MetricsContainer {

private final String stepName;

private static boolean enablePerWorkerMetrics = false;
Expand All @@ -69,6 +70,10 @@ public class StreamingStepMetricsContainer implements MetricsContainer {

private MetricsMap<MetricName, GaugeCell> gauges = new MetricsMap<>(GaugeCell::new);

// how to handle concurrency
private final ConcurrentHashMap<MetricName, GaugeCell> perWorkerGauges =
new ConcurrentHashMap<>();

private MetricsMap<MetricName, StringSetCell> stringSet = new MetricsMap<>(StringSetCell::new);

private MetricsMap<MetricName, DeltaDistributionCell> distributions =
Expand Down Expand Up @@ -163,6 +168,19 @@ public Gauge getGauge(MetricName metricName) {
return gauges.get(metricName);
}

@Override
public Gauge getPerWorkerGauge(MetricName metricName) {
if (!enablePerWorkerMetrics) {
return MetricsContainer.super.getPerWorkerGauge(metricName); // returns no op gauge
}
Gauge val = perWorkerGauges.get(metricName);
if (val != null) {
return val;
}

return perWorkerGauges.computeIfAbsent(metricName, name -> new GaugeCell(metricName));
}

@Override
public StringSet getStringSet(MetricName metricName) {
return stringSet.get(metricName);
Expand Down Expand Up @@ -330,11 +348,10 @@ private void deleteStaleCounters(
@VisibleForTesting
Iterable<PerStepNamespaceMetrics> extractPerWorkerMetricUpdates() {
ConcurrentHashMap<MetricName, Long> counters = new ConcurrentHashMap<MetricName, Long>();
ConcurrentHashMap<MetricName, Long> gauges = new ConcurrentHashMap<MetricName, Long>();
ConcurrentHashMap<MetricName, LockFreeHistogram.Snapshot> histograms =
new ConcurrentHashMap<MetricName, LockFreeHistogram.Snapshot>();
HashSet<MetricName> currentZeroValuedCounters = new HashSet<MetricName>();

// Extract metrics updates.
perWorkerCounters.forEach(
(k, v) -> {
Long val = v.getAndSet(0);
Expand All @@ -344,6 +361,13 @@ Iterable<PerStepNamespaceMetrics> extractPerWorkerMetricUpdates() {
}
counters.put(k, val);
});

perWorkerGauges.forEach(
(k, v) -> {
Long val = v.getCumulative().value();
gauges.put(k, val); // no special handing for zero, since that value is important
v.reset();
});
perWorkerHistograms.forEach(
(k, v) -> {
v.getSnapshotAndReset().ifPresent(snapshot -> histograms.put(k, snapshot));
Expand All @@ -352,7 +376,7 @@ Iterable<PerStepNamespaceMetrics> extractPerWorkerMetricUpdates() {
deleteStaleCounters(currentZeroValuedCounters, Instant.now(clock));

return MetricsToPerStepNamespaceMetricsConverter.convert(
stepName, counters, histograms, parsedPerWorkerMetricsCache);
stepName, counters, gauges, histograms, parsedPerWorkerMetricsCache);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,6 @@ private Optional<WorkerMessage> createWorkerMessageForPerWorkerMetrics() {

List<PerStepNamespaceMetrics> metrics = new ArrayList<>();
allStageInfo.get().forEach(s -> metrics.addAll(s.extractPerWorkerMetricValues()));

if (metrics.isEmpty()) {
return Optional.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import com.google.api.services.dataflow.model.Base2Exponent;
import com.google.api.services.dataflow.model.BucketOptions;
import com.google.api.services.dataflow.model.DataflowGaugeValue;
import com.google.api.services.dataflow.model.DataflowHistogramValue;
import com.google.api.services.dataflow.model.Linear;
import com.google.api.services.dataflow.model.MetricValue;
Expand All @@ -44,6 +45,7 @@

@RunWith(JUnit4.class)
public class MetricsToPerStepNamespaceMetricsConverterTest {

private static final HistogramData.BucketType lienarBuckets =
HistogramData.LinearBuckets.of(0, 10, 10);
private static final HistogramData.BucketType exponentialBuckets =
Expand Down Expand Up @@ -86,6 +88,7 @@ public void testConvert_successfulyConvertCounters() {
String step = "testStepName";
Map<MetricName, LockFreeHistogram.Snapshot> emptyHistograms = new HashMap<>();
Map<MetricName, Long> counters = new HashMap<MetricName, Long>();
Map<MetricName, Long> emptyGauges = new HashMap<MetricName, Long>();
Map<MetricName, LabeledMetricNameUtils.ParsedMetricName> parsedMetricNames = new HashMap<>();

MetricName bigQueryMetric1 = MetricName.named("BigQuerySink", "metric1");
Expand All @@ -99,7 +102,7 @@ public void testConvert_successfulyConvertCounters() {

Collection<PerStepNamespaceMetrics> conversionResult =
MetricsToPerStepNamespaceMetricsConverter.convert(
step, counters, emptyHistograms, parsedMetricNames);
step, counters, emptyGauges, emptyHistograms, parsedMetricNames);

MetricValue expectedVal1 =
new MetricValue().setMetric("metric1").setValueInt64(5L).setMetricLabels(new HashMap<>());
Expand Down Expand Up @@ -133,6 +136,7 @@ public void testConvert_skipInvalidMetricNames() {
Map<MetricName, LabeledMetricNameUtils.ParsedMetricName> parsedMetricNames = new HashMap<>();

Map<MetricName, Long> counters = new HashMap<>();
Map<MetricName, Long> emptyGauges = new HashMap<MetricName, Long>();
MetricName invalidName1 = MetricName.named("BigQuerySink", "**");
counters.put(invalidName1, 5L);

Expand All @@ -144,15 +148,15 @@ public void testConvert_skipInvalidMetricNames() {

Collection<PerStepNamespaceMetrics> conversionResult =
MetricsToPerStepNamespaceMetricsConverter.convert(
"testStep", counters, histograms, parsedMetricNames);
"testStep", counters, emptyGauges, histograms, parsedMetricNames);
assertThat(conversionResult.size(), equalTo(0));
assertThat(parsedMetricNames.size(), equalTo(0));
}

@Test
public void testConvert_successfulConvertHistograms() {
Map<MetricName, LabeledMetricNameUtils.ParsedMetricName> parsedMetricNames = new HashMap<>();

Map<MetricName, Long> emptyGauges = new HashMap<MetricName, Long>();
Map<MetricName, LockFreeHistogram.Snapshot> histograms = new HashMap<>();
MetricName bigQueryMetric1 = MetricName.named("BigQuerySink", "baseLabel");
MetricName bigQueryMetric2 =
Expand Down Expand Up @@ -181,7 +185,7 @@ public void testConvert_successfulConvertHistograms() {
Map<MetricName, Long> emptyCounters = new HashMap<>();
Collection<PerStepNamespaceMetrics> conversionResult =
MetricsToPerStepNamespaceMetricsConverter.convert(
step, emptyCounters, histograms, parsedMetricNames);
step, emptyCounters, emptyGauges, histograms, parsedMetricNames);

// Expected value 1
List<Long> bucketCounts1 = ImmutableList.of(0L, 1L, 1L, 1L);
Expand Down Expand Up @@ -271,7 +275,7 @@ public void testConvert_skipUnknownHistogramBucketType() {

Collection<PerStepNamespaceMetrics> conversionResult =
MetricsToPerStepNamespaceMetricsConverter.convert(
step, emptyCounters, histograms, parsedMetricNames);
step, emptyCounters, emptyCounters, histograms, parsedMetricNames);
assertThat(conversionResult.size(), equalTo(0));
assertThat(parsedMetricNames.size(), equalTo(0));
}
Expand All @@ -280,6 +284,7 @@ public void testConvert_skipUnknownHistogramBucketType() {
public void testConvert_convertCountersAndHistograms() {
String step = "testStep";
Map<MetricName, Long> counters = new HashMap<>();
Map<MetricName, Long> emptyGauges = new HashMap<>();
Map<MetricName, LockFreeHistogram.Snapshot> histograms = new HashMap<>();
Map<MetricName, LabeledMetricNameUtils.ParsedMetricName> parsedMetricNames = new HashMap<>();

Expand All @@ -293,7 +298,7 @@ public void testConvert_convertCountersAndHistograms() {

Collection<PerStepNamespaceMetrics> conversionResult =
MetricsToPerStepNamespaceMetricsConverter.convert(
step, counters, histograms, parsedMetricNames);
step, counters, emptyGauges, histograms, parsedMetricNames);

// Expected counter MetricValue
Map<String, String> counterLabelMap = new HashMap<>();
Expand Down Expand Up @@ -345,4 +350,77 @@ public void testConvert_convertCountersAndHistograms() {
parsedMetricNames,
IsMapContaining.hasEntry(histogramMetricName, parsedHistogramMetricName));
}

@Test
public void testConvert_successfulyConvertGauges() {
String step = "testStepName";
Map<MetricName, LockFreeHistogram.Snapshot> emptyHistograms = new HashMap<>();
Map<MetricName, Long> counters = new HashMap<MetricName, Long>();
Map<MetricName, Long> gauges = new HashMap<MetricName, Long>();
// convertCountersAndHistograms
Map<MetricName, LabeledMetricNameUtils.ParsedMetricName> parsedMetricNames = new HashMap<>();

MetricName KafkaMetric1 = MetricName.named("KafkaSink", "metric1");
MetricName KafkaMetric2 = MetricName.named("KafkaSink", "metric2*label1:val1;label2:val2;");
MetricName KafkaMetric3 = MetricName.named("KafkaSink", "metric3"); // ?

gauges.put(KafkaMetric1, 5L);
gauges.put(KafkaMetric2, 10L);
gauges.put(KafkaMetric3, 0L); // zero valued metric is still reported

Collection<PerStepNamespaceMetrics> conversionResult =
MetricsToPerStepNamespaceMetricsConverter.convert(
step, counters, gauges, emptyHistograms, parsedMetricNames);

DataflowGaugeValue gauge_value1 = new DataflowGaugeValue();
gauge_value1.setValue(5L);

DataflowGaugeValue gauge_value2 = new DataflowGaugeValue();
gauge_value2.setValue(10L);

DataflowGaugeValue gauge_value3 = new DataflowGaugeValue();
gauge_value3.setValue(0L); // zero valued

MetricValue expectedVal1 =
new MetricValue()
.setMetric("metric1")
.setValueGauge64(gauge_value1)
.setMetricLabels(new HashMap<>());

Map<String, String> val2LabelMap = new HashMap<>();
val2LabelMap.put("label1", "val1");
val2LabelMap.put("label2", "val2");
MetricValue expectedVal2 =
new MetricValue()
.setMetric("metric2")
.setValueGauge64(gauge_value2)
.setMetricLabels(val2LabelMap);

MetricValue expectedVal3 =
new MetricValue()
.setMetric("metric3")
.setValueGauge64(gauge_value3)
.setMetricLabels(new HashMap<>());

assertThat(conversionResult.size(), equalTo(1));
PerStepNamespaceMetrics perStepNamespaceMetrics = conversionResult.iterator().next();

assertThat(perStepNamespaceMetrics.getOriginalStep(), equalTo(step));
assertThat(perStepNamespaceMetrics.getMetricsNamespace(), equalTo("KafkaSink"));
assertThat(perStepNamespaceMetrics.getMetricValues().size(), equalTo(3));
assertThat(
perStepNamespaceMetrics.getMetricValues(),
containsInAnyOrder(expectedVal1, expectedVal2, expectedVal3));

LabeledMetricNameUtils.ParsedMetricName parsedKafkaMetric1 =
LabeledMetricNameUtils.parseMetricName(KafkaMetric1.getName()).get();
LabeledMetricNameUtils.ParsedMetricName parsedKafkaMetric2 =
LabeledMetricNameUtils.parseMetricName(KafkaMetric2.getName()).get();
LabeledMetricNameUtils.ParsedMetricName parsedKafkaMetric3 =
LabeledMetricNameUtils.parseMetricName(KafkaMetric3.getName()).get();
assertThat(parsedMetricNames.size(), equalTo(3));
assertThat(parsedMetricNames, IsMapContaining.hasEntry(KafkaMetric1, parsedKafkaMetric1));
assertThat(parsedMetricNames, IsMapContaining.hasEntry(KafkaMetric2, parsedKafkaMetric2));
assertThat(parsedMetricNames, IsMapContaining.hasEntry(KafkaMetric3, parsedKafkaMetric3));
}
}
Loading

0 comments on commit 2791b23

Please sign in to comment.