Skip to content

Commit

Permalink
Add MetricsContainer support to the Flink sources.
Browse files Browse the repository at this point in the history
  • Loading branch information
Jiangjie Qin authored and minxhe committed Nov 25, 2024
1 parent 47aeeba commit 6f9c14b
Show file tree
Hide file tree
Showing 17 changed files with 340 additions and 181 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public class SourceTestCompat {
public static class TestMetricGroup extends UnregisteredMetricsGroup
implements SourceReaderMetricGroup {
public final Map<String, Gauge<?>> registeredGauge = new HashMap<>();
public final Map<String, Counter> registeredCounter = new HashMap<>();
public final Counter numRecordsInCounter = new SimpleCounter();

@Override
Expand All @@ -52,6 +53,18 @@ public <T, GaugeT extends Gauge<T>> GaugeT gauge(String name, GaugeT gauge) {
return gauge;
}

@Override
public Counter counter(String name) {
// The OperatorIOMetricsGroup will register some IO metrics in the constructor.
// At that time, the construction of this class has not finihsed yet, so we
// need to delegate the call to the parent class.
if (registeredCounter != null) {
return registeredCounter.computeIfAbsent(name, ignored -> super.counter(name));
} else {
return super.counter(name);
}
}

@Override
public Counter getNumRecordsInErrorsCounter() {
return new SimpleCounter();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@
package org.apache.beam.runners.flink;

import static org.apache.beam.runners.core.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults;
import static org.apache.beam.runners.flink.metrics.FlinkMetricContainer.ACCUMULATOR_NAME;

import java.util.Collections;
import java.util.Map;
import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.MetricResults;
import org.joda.time.Duration;
Expand Down Expand Up @@ -80,6 +80,6 @@ public MetricResults metrics() {
}

MetricsContainerStepMap getMetricsContainerStepMap() {
return (MetricsContainerStepMap) accumulators.get(FlinkMetricContainer.ACCUMULATOR_NAME);
return (MetricsContainerStepMap) accumulators.get(ACCUMULATOR_NAME);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,11 @@
*/
package org.apache.beam.runners.flink.metrics;

import static org.apache.beam.runners.core.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
import org.apache.beam.sdk.metrics.DistributionResult;
import org.apache.beam.sdk.metrics.GaugeResult;
import org.apache.beam.sdk.metrics.MetricKey;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -52,31 +34,14 @@
* which have a defined end. They are not essential during execution because metrics will also be
* reported using the configured metrics reporter.
*/
public class FlinkMetricContainer {

public class FlinkMetricContainer extends FlinkMetricContainerBase {
public static final String ACCUMULATOR_NAME = "__metricscontainers";

private static final Logger LOG = LoggerFactory.getLogger(FlinkMetricContainer.class);

private static final String METRIC_KEY_SEPARATOR =
GlobalConfiguration.loadConfiguration().getString(MetricOptions.SCOPE_DELIMITER);

private final MetricsContainerStepMap metricsContainers;
private final RuntimeContext runtimeContext;
private final Map<String, Counter> flinkCounterCache;
private final Map<String, FlinkDistributionGauge> flinkDistributionGaugeCache;
private final Map<String, FlinkGauge> flinkGaugeCache;

public FlinkMetricContainer(RuntimeContext runtimeContext) {
this.runtimeContext = runtimeContext;
this.flinkCounterCache = new HashMap<>();
this.flinkDistributionGaugeCache = new HashMap<>();
this.flinkGaugeCache = new HashMap<>();
this.metricsContainers = new MetricsContainerStepMap();
}

public MetricsContainerImpl getMetricsContainer(String stepName) {
return metricsContainers.getContainer(stepName);
}

/**
Expand All @@ -100,124 +65,8 @@ public void registerMetricsForPipelineResult() {
metricsAccumulator.add(metricsContainers);
}

/**
* Update this container with metrics from the passed {@link MonitoringInfo}s, and send updates
* along to Flink's internal metrics framework.
*/
public void updateMetrics(String stepName, List<MonitoringInfo> monitoringInfos) {
getMetricsContainer(stepName).update(monitoringInfos);
updateMetrics(stepName);
}

/**
* Update Flink's internal metrics ({@link this#flinkCounterCache}) with the latest metrics for a
* given step.
*/
void updateMetrics(String stepName) {
MetricResults metricResults = asAttemptedOnlyMetricResults(metricsContainers);
MetricQueryResults metricQueryResults =
metricResults.queryMetrics(MetricsFilter.builder().addStep(stepName).build());
updateCounters(metricQueryResults.getCounters());
updateDistributions(metricQueryResults.getDistributions());
updateGauge(metricQueryResults.getGauges());
}

private void updateCounters(Iterable<MetricResult<Long>> counters) {
for (MetricResult<Long> metricResult : counters) {
String flinkMetricName = getFlinkMetricNameString(metricResult.getKey());

Long update = metricResult.getAttempted();

// update flink metric
Counter counter =
flinkCounterCache.computeIfAbsent(
flinkMetricName, n -> runtimeContext.getMetricGroup().counter(n));
// Beam counters are already pre-aggregated, just update with the current value here
counter.inc(update - counter.getCount());
}
}

private void updateDistributions(Iterable<MetricResult<DistributionResult>> distributions) {
for (MetricResult<DistributionResult> metricResult : distributions) {
String flinkMetricName = getFlinkMetricNameString(metricResult.getKey());

DistributionResult update = metricResult.getAttempted();

// update flink metric
FlinkDistributionGauge gauge = flinkDistributionGaugeCache.get(flinkMetricName);
if (gauge == null) {
gauge =
runtimeContext
.getMetricGroup()
.gauge(flinkMetricName, new FlinkDistributionGauge(update));
flinkDistributionGaugeCache.put(flinkMetricName, gauge);
} else {
gauge.update(update);
}
}
}

private void updateGauge(Iterable<MetricResult<GaugeResult>> gauges) {
for (MetricResult<GaugeResult> metricResult : gauges) {
String flinkMetricName = getFlinkMetricNameString(metricResult.getKey());

GaugeResult update = metricResult.getAttempted();

// update flink metric
FlinkGauge gauge = flinkGaugeCache.get(flinkMetricName);
if (gauge == null) {
gauge = runtimeContext.getMetricGroup().gauge(flinkMetricName, new FlinkGauge(update));
flinkGaugeCache.put(flinkMetricName, gauge);
} else {
gauge.update(update);
}
}
}

@VisibleForTesting
static String getFlinkMetricNameString(MetricKey metricKey) {
MetricName metricName = metricKey.metricName();
// We use only the MetricName here, the step name is already contained
// in the operator name which is passed to Flink's MetricGroup to which
// the metric with the following name will be added.
return metricName.getNamespace() + METRIC_KEY_SEPARATOR + metricName.getName();
}

/** Flink {@link Gauge} for {@link DistributionResult}. */
public static class FlinkDistributionGauge implements Gauge<DistributionResult> {

DistributionResult data;

FlinkDistributionGauge(DistributionResult data) {
this.data = data;
}

void update(DistributionResult data) {
this.data = data;
}

@Override
public DistributionResult getValue() {
return data;
}
}

/** Flink {@link Gauge} for {@link GaugeResult}. */
public static class FlinkGauge implements Gauge<Long> {

GaugeResult data;

FlinkGauge(GaugeResult data) {
this.data = data;
}

void update(GaugeResult update) {
this.data = update;
}

@Override
public Long getValue() {
return data.getValue();
}
@Override
protected MetricGroup getMetricGroup() {
return runtimeContext.getMetricGroup();
}
}
Loading

0 comments on commit 6f9c14b

Please sign in to comment.