diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java index 7aa4ba784cc7..3f4d9617bdd2 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java @@ -21,7 +21,6 @@ import org.apache.beam.runners.core.metrics.MetricsContainerStepMap; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.metrics.MetricGroup; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,6 +40,7 @@ public class FlinkMetricContainer extends FlinkMetricContainerBase { private final RuntimeContext runtimeContext; public FlinkMetricContainer(RuntimeContext runtimeContext) { + super(runtimeContext.getMetricGroup()); this.runtimeContext = runtimeContext; } @@ -65,8 +65,4 @@ public void registerMetricsForPipelineResult() { metricsAccumulator.add(metricsContainers); } - @Override - protected MetricGroup getMetricGroup() { - return runtimeContext.getMetricGroup(); - } } diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainerBase.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainerBase.java index 3a430bd6e368..a4a363689329 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainerBase.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainerBase.java @@ -56,15 +56,19 @@ abstract class FlinkMetricContainerBase { private final Map flinkCounterCache; private final Map flinkDistributionGaugeCache; private final Map flinkGaugeCache; + private final MetricGroup metricGroup; - public FlinkMetricContainerBase() { + public FlinkMetricContainerBase(MetricGroup metricGroup) { this.flinkCounterCache = new HashMap<>(); this.flinkDistributionGaugeCache = new HashMap<>(); this.flinkGaugeCache = new HashMap<>(); this.metricsContainers = new MetricsContainerStepMap(); + this.metricGroup = metricGroup; } - protected abstract MetricGroup getMetricGroup(); + public MetricGroup getMetricGroup() { + return metricGroup; + } public MetricsContainerImpl getMetricsContainer(String stepName) { return metricsContainers.getContainer(stepName); diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainerWithoutAccumulator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainerWithoutAccumulator.java index 00b1ea052e50..88d52273108a 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainerWithoutAccumulator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainerWithoutAccumulator.java @@ -28,14 +28,7 @@ * @see FlinkMetricContainer */ public class FlinkMetricContainerWithoutAccumulator extends FlinkMetricContainerBase { - private final MetricGroup metricGroup; - public FlinkMetricContainerWithoutAccumulator(MetricGroup metricGroup) { - this.metricGroup = metricGroup; - } - - @Override - protected MetricGroup getMetricGroup() { - return metricGroup; + super(metricGroup); } }