Skip to content

Commit

Permalink
Add MetricsContainer support to the Flink sources. #28609
Browse files Browse the repository at this point in the history
Co-authored-by: Jiangjie Qin <[email protected]>
  • Loading branch information
tvalentyn and Jiangjie Qin authored Oct 24, 2023
2 parents bc06581 + 36ba537 commit 8a2ed4d
Showing 18 changed files with 347 additions and 183 deletions.
Original file line number Diff line number Diff line change
@@ -33,6 +33,7 @@ public class SourceTestCompat {
public static class TestMetricGroup
extends UnregisteredMetricGroups.UnregisteredOperatorMetricGroup {
public final Map<String, Gauge<?>> registeredGauge = new HashMap<>();
public final Map<String, Counter> registeredCounter = new HashMap<>();
public final Counter numRecordsInCounter = new SimpleCounter();

@Override
@@ -41,6 +42,18 @@ public <T, GaugeT extends Gauge<T>> GaugeT gauge(String name, GaugeT gauge) {
return gauge;
}

@Override
public Counter counter(String name) {
  // OperatorIOMetricGroup registers some IO metrics from its constructor. At that point the
  // construction of this class has not finished yet, so registeredCounter may still be null;
  // in that case the call is simply handed to the parent class.
  if (registeredCounter == null) {
    return super.counter(name);
  }
  // Otherwise remember the counter under its name, creating it via the parent on first request.
  return registeredCounter.computeIfAbsent(name, key -> super.counter(key));
}

@Override
public OperatorIOMetricGroup getIOMetricGroup() {
return new OperatorIOMetricGroup(this) {
Original file line number Diff line number Diff line change
@@ -34,6 +34,7 @@ public class SourceTestCompat {
public static class TestMetricGroup extends UnregisteredMetricsGroup
implements SourceReaderMetricGroup {
public final Map<String, Gauge<?>> registeredGauge = new HashMap<>();
public final Map<String, Counter> registeredCounter = new HashMap<>();
public final Counter numRecordsInCounter = new SimpleCounter();

@Override
@@ -52,6 +53,18 @@ public <T, GaugeT extends Gauge<T>> GaugeT gauge(String name, GaugeT gauge) {
return gauge;
}

@Override
public Counter counter(String name) {
  // OperatorIOMetricGroup registers some IO metrics from its constructor. At that point the
  // construction of this class has not finished yet, so registeredCounter may still be null;
  // in that case the call is simply handed to the parent class.
  if (registeredCounter == null) {
    return super.counter(name);
  }
  // Otherwise remember the counter under its name, creating it via the parent on first request.
  return registeredCounter.computeIfAbsent(name, key -> super.counter(key));
}

@Override
public Counter getNumRecordsInErrorsCounter() {
return new SimpleCounter();
Original file line number Diff line number Diff line change
@@ -18,11 +18,11 @@
package org.apache.beam.runners.flink;

import static org.apache.beam.runners.core.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults;
import static org.apache.beam.runners.flink.metrics.FlinkMetricContainer.ACCUMULATOR_NAME;

import java.util.Collections;
import java.util.Map;
import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.MetricResults;
import org.joda.time.Duration;
@@ -80,6 +80,6 @@ public MetricResults metrics() {
}

/**
 * Fetches the entry stored in {@code accumulators} under the metrics accumulator name and casts
 * it to a {@link MetricsContainerStepMap}.
 */
MetricsContainerStepMap getMetricsContainerStepMap() {
// NOTE(review): the two return statements below look like the before/after lines of a diff
// rendered without +/- markers (qualified vs. statically imported ACCUMULATOR_NAME); only one
// of them can remain in compilable code -- confirm against the actual file.
return (MetricsContainerStepMap) accumulators.get(FlinkMetricContainer.ACCUMULATOR_NAME);
return (MetricsContainerStepMap) accumulators.get(ACCUMULATOR_NAME);
}
}
Original file line number Diff line number Diff line change
@@ -17,29 +17,10 @@
*/
package org.apache.beam.runners.flink.metrics;

import static org.apache.beam.runners.core.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
import org.apache.beam.sdk.metrics.DistributionResult;
import org.apache.beam.sdk.metrics.GaugeResult;
import org.apache.beam.sdk.metrics.MetricKey;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -52,31 +33,15 @@
* which have a defined end. They are not essential during execution because metrics will also be
* reported using the configured metrics reporter.
*/
public class FlinkMetricContainer {

public class FlinkMetricContainer extends FlinkMetricContainerBase {
public static final String ACCUMULATOR_NAME = "__metricscontainers";

private static final Logger LOG = LoggerFactory.getLogger(FlinkMetricContainer.class);

private static final String METRIC_KEY_SEPARATOR =
GlobalConfiguration.loadConfiguration().getString(MetricOptions.SCOPE_DELIMITER);

private final MetricsContainerStepMap metricsContainers;
private final RuntimeContext runtimeContext;
private final Map<String, Counter> flinkCounterCache;
private final Map<String, FlinkDistributionGauge> flinkDistributionGaugeCache;
private final Map<String, FlinkGauge> flinkGaugeCache;

/**
 * Creates a metric container for the given Flink runtime context.
 *
 * <p>Hands the context's metric group to the superclass constructor, keeps a reference to the
 * context, and starts with empty Flink metric caches and a fresh {@link MetricsContainerStepMap}.
 *
 * @param runtimeContext Flink runtime context whose metric group is passed to the superclass
 */
public FlinkMetricContainer(RuntimeContext runtimeContext) {
super(runtimeContext.getMetricGroup());
this.runtimeContext = runtimeContext;
this.flinkCounterCache = new HashMap<>();
this.flinkDistributionGaugeCache = new HashMap<>();
this.flinkGaugeCache = new HashMap<>();
this.metricsContainers = new MetricsContainerStepMap();
}

/**
 * Looks up the metrics container of a step in {@code metricsContainers}.
 *
 * @param stepName name of the step whose container is requested
 * @return the container that {@code metricsContainers} provides for {@code stepName}
 */
public MetricsContainerImpl getMetricsContainer(String stepName) {
  final MetricsContainerImpl stepContainer = metricsContainers.getContainer(stepName);
  return stepContainer;
}

/**
@@ -99,125 +64,4 @@ public void registerMetricsForPipelineResult() {
}
metricsAccumulator.add(metricsContainers);
}

/**
 * Applies the given {@link MonitoringInfo}s to the metrics container of {@code stepName} and then
 * forwards that step's latest values to Flink's internal metrics framework.
 *
 * @param stepName name of the step whose container receives the update
 * @param monitoringInfos monitoring infos to apply to the step's container
 */
public void updateMetrics(String stepName, List<MonitoringInfo> monitoringInfos) {
  MetricsContainerImpl stepContainer = getMetricsContainer(stepName);
  stepContainer.update(monitoringInfos);
  // Mirror the freshly updated values into the Flink-side metrics.
  updateMetrics(stepName);
}

/**
 * Pushes the latest attempted metrics of a single step into the Flink metrics held by this
 * container: counters, distribution gauges and gauges.
 *
 * @param stepName name of the step whose metrics are queried and forwarded
 */
void updateMetrics(String stepName) {
  MetricResults attemptedResults = asAttemptedOnlyMetricResults(metricsContainers);
  MetricsFilter stepFilter = MetricsFilter.builder().addStep(stepName).build();
  MetricQueryResults stepResults = attemptedResults.queryMetrics(stepFilter);

  updateCounters(stepResults.getCounters());
  updateDistributions(stepResults.getDistributions());
  updateGauge(stepResults.getGauges());
}

/**
 * Brings the Flink counters in line with the attempted values of the given Beam counter results,
 * registering a Flink counter on the runtime context's metric group the first time a name is seen.
 *
 * @param counters Beam counter results to mirror into Flink counters
 */
private void updateCounters(Iterable<MetricResult<Long>> counters) {
  counters.forEach(
      result -> {
        String flinkName = getFlinkMetricNameString(result.getKey());
        Long attempted = result.getAttempted();

        Counter flinkCounter =
            flinkCounterCache.computeIfAbsent(
                flinkName, name -> runtimeContext.getMetricGroup().counter(name));

        // Beam counters arrive pre-aggregated, so only the difference to the value Flink
        // currently holds is added.
        flinkCounter.inc(attempted - flinkCounter.getCount());
      });
}

/**
 * Mirrors the attempted values of the given Beam distribution results into Flink gauges. A gauge
 * is registered and cached the first time its name is seen; afterwards the cached gauge is
 * updated in place.
 *
 * @param distributions Beam distribution results to mirror into Flink gauges
 */
private void updateDistributions(Iterable<MetricResult<DistributionResult>> distributions) {
  for (MetricResult<DistributionResult> result : distributions) {
    String flinkName = getFlinkMetricNameString(result.getKey());
    DistributionResult attempted = result.getAttempted();

    FlinkDistributionGauge cached = flinkDistributionGaugeCache.get(flinkName);
    if (cached != null) {
      cached.update(attempted);
      continue;
    }

    // First sighting of this metric: register a gauge seeded with the current value.
    FlinkDistributionGauge registered =
        runtimeContext.getMetricGroup().gauge(flinkName, new FlinkDistributionGauge(attempted));
    flinkDistributionGaugeCache.put(flinkName, registered);
  }
}

/**
 * Mirrors the attempted values of the given Beam gauge results into Flink gauges. A gauge is
 * registered and cached the first time its name is seen; afterwards the cached gauge is updated
 * in place.
 *
 * @param gauges Beam gauge results to mirror into Flink gauges
 */
private void updateGauge(Iterable<MetricResult<GaugeResult>> gauges) {
  for (MetricResult<GaugeResult> result : gauges) {
    String flinkName = getFlinkMetricNameString(result.getKey());
    GaugeResult attempted = result.getAttempted();

    FlinkGauge cached = flinkGaugeCache.get(flinkName);
    if (cached != null) {
      cached.update(attempted);
      continue;
    }

    // First sighting of this metric: register a gauge seeded with the current value.
    FlinkGauge registered =
        runtimeContext.getMetricGroup().gauge(flinkName, new FlinkGauge(attempted));
    flinkGaugeCache.put(flinkName, registered);
  }
}

/**
 * Builds the name under which a Beam metric is registered with Flink: the metric's namespace and
 * name joined by {@code METRIC_KEY_SEPARATOR}.
 *
 * @param metricKey key of the Beam metric
 * @return the namespace, the separator and the metric name, concatenated in that order
 */
@VisibleForTesting
static String getFlinkMetricNameString(MetricKey metricKey) {
  // Only the MetricName takes part in the result. The step name is left out on purpose: it is
  // already part of the operator name passed to the Flink MetricGroup that receives the metric.
  final MetricName beamName = metricKey.metricName();
  final String namespace = beamName.getNamespace();
  final String name = beamName.getName();
  return namespace + METRIC_KEY_SEPARATOR + name;
}

/**
 * Flink {@link Gauge} that reports the most recently stored {@link DistributionResult}.
 *
 * <p>The value is supplied at construction time and replaced wholesale by {@link #update}.
 */
public static class FlinkDistributionGauge implements Gauge<DistributionResult> {

  DistributionResult data;

  /** Creates a gauge that initially reports {@code initial}. */
  FlinkDistributionGauge(DistributionResult initial) {
    this.data = initial;
  }

  /** Returns the distribution result stored last. */
  @Override
  public DistributionResult getValue() {
    return this.data;
  }

  /** Replaces the reported distribution result with {@code latest}. */
  void update(DistributionResult latest) {
    this.data = latest;
  }
}

/**
 * Flink {@link Gauge} that reports the value of the most recently stored {@link GaugeResult}.
 *
 * <p>The result is supplied at construction time and replaced wholesale by {@link #update}.
 */
public static class FlinkGauge implements Gauge<Long> {

  GaugeResult data;

  /** Creates a gauge backed initially by {@code initial}. */
  FlinkGauge(GaugeResult initial) {
    this.data = initial;
  }

  /** Returns the value of the gauge result stored last. */
  @Override
  public Long getValue() {
    return this.data.getValue();
  }

  /** Replaces the backing gauge result with {@code latest}. */
  void update(GaugeResult latest) {
    this.data = latest;
  }
}
}
Loading

0 comments on commit 8a2ed4d

Please sign in to comment.