diff --git a/runners/flink/1.14/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/SourceTestCompat.java b/runners/flink/1.14/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/SourceTestCompat.java index 62b16eedca0b..8cda1341fd22 100644 --- a/runners/flink/1.14/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/SourceTestCompat.java +++ b/runners/flink/1.14/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/SourceTestCompat.java @@ -34,6 +34,7 @@ public class SourceTestCompat { public static class TestMetricGroup extends UnregisteredMetricsGroup implements SourceReaderMetricGroup { public final Map> registeredGauge = new HashMap<>(); + public final Map registeredCounter = new HashMap<>(); public final Counter numRecordsInCounter = new SimpleCounter(); @Override @@ -52,6 +53,18 @@ public > GaugeT gauge(String name, GaugeT gauge) { return gauge; } + @Override + public Counter counter(String name) { + // The OperatorIOMetricsGroup will register some IO metrics in the constructor. + // At that time, the construction of this class has not finihsed yet, so we + // need to delegate the call to the parent class. + if (registeredCounter != null) { + return registeredCounter.computeIfAbsent(name, ignored -> super.counter(name)); + } else { + return super.counter(name); + } + } + @Override public Counter getNumRecordsInErrorsCounter() { return new SimpleCounter(); diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java index 885571a7ee77..d892049bce4b 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java @@ -18,11 +18,11 @@ package org.apache.beam.runners.flink; import static org.apache.beam.runners.core.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults; +import static org.apache.beam.runners.flink.metrics.FlinkMetricContainer.ACCUMULATOR_NAME; import java.util.Collections; import java.util.Map; import org.apache.beam.runners.core.metrics.MetricsContainerStepMap; -import org.apache.beam.runners.flink.metrics.FlinkMetricContainer; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.metrics.MetricResults; import org.joda.time.Duration; @@ -80,6 +80,6 @@ public MetricResults metrics() { } MetricsContainerStepMap getMetricsContainerStepMap() { - return (MetricsContainerStepMap) accumulators.get(FlinkMetricContainer.ACCUMULATOR_NAME); + return (MetricsContainerStepMap) accumulators.get(ACCUMULATOR_NAME); } } diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java index e389be5f1537..7aa4ba784cc7 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java @@ -17,29 +17,11 @@ */ package org.apache.beam.runners.flink.metrics; -import static org.apache.beam.runners.core.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo; import org.apache.beam.runners.core.metrics.MetricsContainerImpl; import org.apache.beam.runners.core.metrics.MetricsContainerStepMap; -import org.apache.beam.sdk.metrics.DistributionResult; -import org.apache.beam.sdk.metrics.GaugeResult; -import org.apache.beam.sdk.metrics.MetricKey; -import org.apache.beam.sdk.metrics.MetricName; -import org.apache.beam.sdk.metrics.MetricQueryResults; -import org.apache.beam.sdk.metrics.MetricResult; -import org.apache.beam.sdk.metrics.MetricResults; -import org.apache.beam.sdk.metrics.MetricsFilter; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.configuration.GlobalConfiguration; -import org.apache.flink.configuration.MetricOptions; -import org.apache.flink.metrics.Counter; -import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.MetricGroup; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,31 +34,14 @@ * which have a defined end. They are not essential during execution because metrics will also be * reported using the configured metrics reporter. */ -public class FlinkMetricContainer { - +public class FlinkMetricContainer extends FlinkMetricContainerBase { public static final String ACCUMULATOR_NAME = "__metricscontainers"; - private static final Logger LOG = LoggerFactory.getLogger(FlinkMetricContainer.class); - private static final String METRIC_KEY_SEPARATOR = - GlobalConfiguration.loadConfiguration().getString(MetricOptions.SCOPE_DELIMITER); - - private final MetricsContainerStepMap metricsContainers; private final RuntimeContext runtimeContext; - private final Map flinkCounterCache; - private final Map flinkDistributionGaugeCache; - private final Map flinkGaugeCache; public FlinkMetricContainer(RuntimeContext runtimeContext) { this.runtimeContext = runtimeContext; - this.flinkCounterCache = new HashMap<>(); - this.flinkDistributionGaugeCache = new HashMap<>(); - this.flinkGaugeCache = new HashMap<>(); - this.metricsContainers = new MetricsContainerStepMap(); - } - - public MetricsContainerImpl getMetricsContainer(String stepName) { - return metricsContainers.getContainer(stepName); } /** @@ -100,124 +65,8 @@ public void registerMetricsForPipelineResult() { metricsAccumulator.add(metricsContainers); } - /** - * Update this container with metrics from the passed {@link MonitoringInfo}s, and send updates - * along to Flink's internal metrics framework. - */ - public void updateMetrics(String stepName, List monitoringInfos) { - getMetricsContainer(stepName).update(monitoringInfos); - updateMetrics(stepName); - } - - /** - * Update Flink's internal metrics ({@link this#flinkCounterCache}) with the latest metrics for a - * given step. - */ - void updateMetrics(String stepName) { - MetricResults metricResults = asAttemptedOnlyMetricResults(metricsContainers); - MetricQueryResults metricQueryResults = - metricResults.queryMetrics(MetricsFilter.builder().addStep(stepName).build()); - updateCounters(metricQueryResults.getCounters()); - updateDistributions(metricQueryResults.getDistributions()); - updateGauge(metricQueryResults.getGauges()); - } - - private void updateCounters(Iterable> counters) { - for (MetricResult metricResult : counters) { - String flinkMetricName = getFlinkMetricNameString(metricResult.getKey()); - - Long update = metricResult.getAttempted(); - - // update flink metric - Counter counter = - flinkCounterCache.computeIfAbsent( - flinkMetricName, n -> runtimeContext.getMetricGroup().counter(n)); - // Beam counters are already pre-aggregated, just update with the current value here - counter.inc(update - counter.getCount()); - } - } - - private void updateDistributions(Iterable> distributions) { - for (MetricResult metricResult : distributions) { - String flinkMetricName = getFlinkMetricNameString(metricResult.getKey()); - - DistributionResult update = metricResult.getAttempted(); - - // update flink metric - FlinkDistributionGauge gauge = flinkDistributionGaugeCache.get(flinkMetricName); - if (gauge == null) { - gauge = - runtimeContext - .getMetricGroup() - .gauge(flinkMetricName, new FlinkDistributionGauge(update)); - flinkDistributionGaugeCache.put(flinkMetricName, gauge); - } else { - gauge.update(update); - } - } - } - - private void updateGauge(Iterable> gauges) { - for (MetricResult metricResult : gauges) { - String flinkMetricName = getFlinkMetricNameString(metricResult.getKey()); - - GaugeResult update = metricResult.getAttempted(); - - // update flink metric - FlinkGauge gauge = flinkGaugeCache.get(flinkMetricName); - if (gauge == null) { - gauge = runtimeContext.getMetricGroup().gauge(flinkMetricName, new FlinkGauge(update)); - flinkGaugeCache.put(flinkMetricName, gauge); - } else { - gauge.update(update); - } - } - } - - @VisibleForTesting - static String getFlinkMetricNameString(MetricKey metricKey) { - MetricName metricName = metricKey.metricName(); - // We use only the MetricName here, the step name is already contained - // in the operator name which is passed to Flink's MetricGroup to which - // the metric with the following name will be added. - return metricName.getNamespace() + METRIC_KEY_SEPARATOR + metricName.getName(); - } - - /** Flink {@link Gauge} for {@link DistributionResult}. */ - public static class FlinkDistributionGauge implements Gauge { - - DistributionResult data; - - FlinkDistributionGauge(DistributionResult data) { - this.data = data; - } - - void update(DistributionResult data) { - this.data = data; - } - - @Override - public DistributionResult getValue() { - return data; - } - } - - /** Flink {@link Gauge} for {@link GaugeResult}. */ - public static class FlinkGauge implements Gauge { - - GaugeResult data; - - FlinkGauge(GaugeResult data) { - this.data = data; - } - - void update(GaugeResult update) { - this.data = update; - } - - @Override - public Long getValue() { - return data.getValue(); - } + @Override + protected MetricGroup getMetricGroup() { + return runtimeContext.getMetricGroup(); } } diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainerBase.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainerBase.java new file mode 100644 index 000000000000..3a430bd6e368 --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainerBase.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.metrics; + +import static org.apache.beam.runners.core.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.beam.model.pipeline.v1.MetricsApi; +import org.apache.beam.runners.core.metrics.MetricsContainerImpl; +import org.apache.beam.runners.core.metrics.MetricsContainerStepMap; +import org.apache.beam.sdk.metrics.DistributionResult; +import org.apache.beam.sdk.metrics.GaugeResult; +import org.apache.beam.sdk.metrics.MetricKey; +import org.apache.beam.sdk.metrics.MetricName; +import org.apache.beam.sdk.metrics.MetricQueryResults; +import org.apache.beam.sdk.metrics.MetricResult; +import org.apache.beam.sdk.metrics.MetricResults; +import org.apache.beam.sdk.metrics.MetricsFilter; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.MetricOptions; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.MetricGroup; + +/** + * The base helper class for holding a {@link MetricsContainerImpl} and forwarding Beam metrics to + * Flink accumulators and metrics. The two subclasses of this base class are {@link + * FlinkMetricContainer} and {@link FlinkMetricContainerWithoutAccumulator}. The former is used when + * {@link org.apache.flink.api.common.functions.RuntimeContext Flink RuntimeContext} is available. + * The latter is used otherwise. + */ +abstract class FlinkMetricContainerBase { + + private static final String METRIC_KEY_SEPARATOR = + GlobalConfiguration.loadConfiguration().getString(MetricOptions.SCOPE_DELIMITER); + + protected final MetricsContainerStepMap metricsContainers; + private final Map flinkCounterCache; + private final Map flinkDistributionGaugeCache; + private final Map flinkGaugeCache; + + public FlinkMetricContainerBase() { + this.flinkCounterCache = new HashMap<>(); + this.flinkDistributionGaugeCache = new HashMap<>(); + this.flinkGaugeCache = new HashMap<>(); + this.metricsContainers = new MetricsContainerStepMap(); + } + + protected abstract MetricGroup getMetricGroup(); + + public MetricsContainerImpl getMetricsContainer(String stepName) { + return metricsContainers.getContainer(stepName); + } + + /** + * Update this container with metrics from the passed {@link MetricsApi.MonitoringInfo}s, and send + * updates along to Flink's internal metrics framework. + */ + public void updateMetrics(String stepName, List monitoringInfos) { + getMetricsContainer(stepName).update(monitoringInfos); + updateMetrics(stepName); + } + + /** + * Update Flink's internal metrics ({@link this#flinkCounterCache}) with the latest metrics for a + * given step. + */ + void updateMetrics(String stepName) { + MetricResults metricResults = asAttemptedOnlyMetricResults(metricsContainers); + MetricQueryResults metricQueryResults = + metricResults.queryMetrics(MetricsFilter.builder().addStep(stepName).build()); + updateCounters(metricQueryResults.getCounters()); + updateDistributions(metricQueryResults.getDistributions()); + updateGauge(metricQueryResults.getGauges()); + } + + private void updateCounters(Iterable> counters) { + for (MetricResult metricResult : counters) { + String flinkMetricName = getFlinkMetricNameString(metricResult.getKey()); + + Long update = metricResult.getAttempted(); + + // update flink metric + Counter counter = + flinkCounterCache.computeIfAbsent(flinkMetricName, n -> getMetricGroup().counter(n)); + // Beam counters are already pre-aggregated, just update with the current value here + counter.inc(update - counter.getCount()); + } + } + + private void updateDistributions(Iterable> distributions) { + for (MetricResult metricResult : distributions) { + String flinkMetricName = getFlinkMetricNameString(metricResult.getKey()); + + DistributionResult update = metricResult.getAttempted(); + + // update flink metric + FlinkDistributionGauge gauge = flinkDistributionGaugeCache.get(flinkMetricName); + if (gauge == null) { + gauge = getMetricGroup().gauge(flinkMetricName, new FlinkDistributionGauge(update)); + flinkDistributionGaugeCache.put(flinkMetricName, gauge); + } else { + gauge.update(update); + } + } + } + + private void updateGauge(Iterable> gauges) { + for (MetricResult metricResult : gauges) { + String flinkMetricName = getFlinkMetricNameString(metricResult.getKey()); + + GaugeResult update = metricResult.getAttempted(); + + // update flink metric + FlinkGauge gauge = flinkGaugeCache.get(flinkMetricName); + if (gauge == null) { + gauge = getMetricGroup().gauge(flinkMetricName, new FlinkGauge(update)); + flinkGaugeCache.put(flinkMetricName, gauge); + } else { + gauge.update(update); + } + } + } + + @VisibleForTesting + static String getFlinkMetricNameString(MetricKey metricKey) { + MetricName metricName = metricKey.metricName(); + // We use only the MetricName here, the step name is already contained + // in the operator name which is passed to Flink's MetricGroup to which + // the metric with the following name will be added. + return metricName.getNamespace() + METRIC_KEY_SEPARATOR + metricName.getName(); + } + + /** Flink {@link Gauge} for {@link DistributionResult}. */ + public static class FlinkDistributionGauge implements Gauge { + + DistributionResult data; + + FlinkDistributionGauge(DistributionResult data) { + this.data = data; + } + + void update(DistributionResult data) { + this.data = data; + } + + @Override + public DistributionResult getValue() { + return data; + } + } + + /** Flink {@link Gauge} for {@link GaugeResult}. */ + public static class FlinkGauge implements Gauge { + + GaugeResult data; + + FlinkGauge(GaugeResult data) { + this.data = data; + } + + void update(GaugeResult update) { + this.data = update; + } + + @Override + public Long getValue() { + return data.getValue(); + } + } +} diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainerWithoutAccumulator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainerWithoutAccumulator.java new file mode 100644 index 000000000000..00b1ea052e50 --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainerWithoutAccumulator.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.metrics; + +import org.apache.flink.metrics.MetricGroup; + +/** + * The base helper class for holding a {@link + * org.apache.beam.runners.core.metrics.MetricsContainerImpl MetricsContainerImpl} and forwarding + * Beam metrics to Flink accumulators and metrics. This class is used when {@link + * org.apache.flink.api.common.functions.RuntimeContext Flink RuntimeContext} is not available. + * + * @see FlinkMetricContainer + */ +public class FlinkMetricContainerWithoutAccumulator extends FlinkMetricContainerBase { + private final MetricGroup metricGroup; + + public FlinkMetricContainerWithoutAccumulator(MetricGroup metricGroup) { + this.metricGroup = metricGroup; + } + + @Override + protected MetricGroup getMetricGroup() { + return metricGroup; + } +} diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/ReaderInvocationUtil.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/ReaderInvocationUtil.java index 736a2dd9da59..60b84e63263f 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/ReaderInvocationUtil.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/ReaderInvocationUtil.java @@ -33,11 +33,11 @@ public class ReaderInvocationUtil> { private final String stepName; - private final FlinkMetricContainer container; + private final FlinkMetricContainerBase container; private final Boolean enableMetrics; public ReaderInvocationUtil( - String stepName, PipelineOptions options, FlinkMetricContainer container) { + String stepName, PipelineOptions options, FlinkMetricContainerBase container) { FlinkPipelineOptions flinkPipelineOptions = options.as(FlinkPipelineOptions.class); this.stepName = stepName; this.enableMetrics = !flinkPipelineOptions.getDisableMetrics(); diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSource.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSource.java index c001b263340c..0b9fdd9dcd7c 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSource.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSource.java @@ -44,6 +44,8 @@ */ public abstract class FlinkSource implements Source, Map>>> { + + protected final String stepName; protected final org.apache.beam.sdk.io.Source beamSource; protected final Boundedness boundedness; protected final SerializablePipelineOptions serializablePipelineOptions; @@ -53,18 +55,20 @@ public abstract class FlinkSource // ----------------- public static methods to construct sources -------------------- public static FlinkBoundedSource bounded( + String stepName, BoundedSource boundedSource, SerializablePipelineOptions serializablePipelineOptions, int numSplits) { return new FlinkBoundedSource<>( - boundedSource, serializablePipelineOptions, Boundedness.BOUNDED, numSplits); + stepName, boundedSource, serializablePipelineOptions, Boundedness.BOUNDED, numSplits); } public static FlinkUnboundedSource unbounded( + String stepName, UnboundedSource source, SerializablePipelineOptions serializablePipelineOptions, int numSplits) { - return new FlinkUnboundedSource<>(source, serializablePipelineOptions, numSplits); + return new FlinkUnboundedSource<>(stepName, source, serializablePipelineOptions, numSplits); } public static FlinkBoundedSource unboundedImpulse(long shutdownSourceAfterIdleMs) { @@ -77,6 +81,7 @@ public static FlinkBoundedSource unboundedImpulse(long shutdownSourceAft // BeamImpulseSource will be discarded after the impulse emission. So the streaming // job won't see another impulse after failover. return new FlinkBoundedSource<>( + "Impulse", new BeamImpulseSource(), new SerializablePipelineOptions(flinkPipelineOptions), Boundedness.CONTINUOUS_UNBOUNDED, @@ -86,6 +91,7 @@ record -> Watermark.MAX_WATERMARK.getTimestamp()); public static FlinkBoundedSource boundedImpulse() { return new FlinkBoundedSource<>( + "Impulse", new BeamImpulseSource(), new SerializablePipelineOptions(FlinkPipelineOptions.defaults()), Boundedness.BOUNDED, @@ -96,10 +102,12 @@ record -> Watermark.MAX_WATERMARK.getTimestamp()); // ------ Common implementations for both bounded and unbounded source --------- protected FlinkSource( + String stepName, org.apache.beam.sdk.io.Source beamSource, SerializablePipelineOptions serializablePipelineOptions, Boundedness boundedness, int numSplits) { + this.stepName = stepName; this.beamSource = beamSource; this.serializablePipelineOptions = serializablePipelineOptions; this.boundedness = boundedness; diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderBase.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderBase.java index 689c7d831333..d4b3005c86c9 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderBase.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderBase.java @@ -39,6 +39,8 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.apache.beam.runners.flink.FlinkPipelineOptions; +import org.apache.beam.runners.flink.metrics.FlinkMetricContainerWithoutAccumulator; +import org.apache.beam.runners.flink.metrics.ReaderInvocationUtil; import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.compat.FlinkSourceCompat; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.io.BoundedSource; @@ -87,6 +89,7 @@ public abstract class FlinkSourceReaderBase protected final SourceReaderContext context; private final ScheduledExecutorService executor; + protected final ReaderInvocationUtil> invocationUtil; protected final Counter numRecordsInCounter; protected final long idleTimeoutMs; private final CompletableFuture idleTimeoutFuture; @@ -96,10 +99,12 @@ public abstract class FlinkSourceReaderBase private boolean noMoreSplits; protected FlinkSourceReaderBase( + String stepName, SourceReaderContext context, PipelineOptions pipelineOptions, @Nullable Function timestampExtractor) { this( + stepName, Executors.newSingleThreadScheduledExecutor( r -> new Thread(r, "FlinkSource-Executor-Thread-" + context.getIndexOfSubtask())), context, @@ -108,6 +113,7 @@ protected FlinkSourceReaderBase( } protected FlinkSourceReaderBase( + String stepName, ScheduledExecutorService executor, SourceReaderContext context, PipelineOptions pipelineOptions, @@ -124,6 +130,9 @@ protected FlinkSourceReaderBase( this.waitingForSplitChangeFuture = new CompletableFuture<>(); this.idleTimeoutCountingDown = false; this.numRecordsInCounter = FlinkSourceCompat.getNumRecordsInCounter(context); + FlinkMetricContainerWithoutAccumulator metricsContainer = + new FlinkMetricContainerWithoutAccumulator(context.metricGroup()); + this.invocationUtil = new ReaderInvocationUtil<>(stepName, pipelineOptions, metricsContainer); } @Override @@ -366,10 +375,10 @@ public SourceOutput getAndMaybeCreateSplitOutput(ReaderOutput public boolean startOrAdvance() throws IOException { if (started) { - return reader.advance(); + return invocationUtil.invokeAdvance(reader); } else { started = true; - return reader.start(); + return invocationUtil.invokeStart(reader); } } diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/FlinkBoundedSource.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/FlinkBoundedSource.java index c2bd904dcc60..ab9a6cc03cd5 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/FlinkBoundedSource.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/FlinkBoundedSource.java @@ -41,20 +41,22 @@ public class FlinkBoundedSource extends FlinkSource> { protected final @Nullable TimestampExtractor> timestampExtractor; public FlinkBoundedSource( + String stepName, BoundedSource beamSource, SerializablePipelineOptions serializablePipelineOptions, Boundedness boundedness, int numSplits) { - this(beamSource, serializablePipelineOptions, boundedness, numSplits, null); + this(stepName, beamSource, serializablePipelineOptions, boundedness, numSplits, null); } public FlinkBoundedSource( + String stepName, BoundedSource beamSource, SerializablePipelineOptions serializablePipelineOptions, Boundedness boundedness, int numSplits, @Nullable TimestampExtractor> timestampExtractor) { - super(beamSource, serializablePipelineOptions, boundedness, numSplits); + super(stepName, beamSource, serializablePipelineOptions, boundedness, numSplits); this.timestampExtractor = timestampExtractor; } @@ -62,6 +64,6 @@ public FlinkBoundedSource( public SourceReader, FlinkSourceSplit> createReader( SourceReaderContext readerContext) throws Exception { return new FlinkBoundedSourceReader<>( - readerContext, serializablePipelineOptions.get(), timestampExtractor); + stepName, readerContext, serializablePipelineOptions.get(), timestampExtractor); } } diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/FlinkBoundedSourceReader.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/FlinkBoundedSourceReader.java index 9cea73f6a4a3..86d2d45f5eac 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/FlinkBoundedSourceReader.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/FlinkBoundedSourceReader.java @@ -54,20 +54,22 @@ public class FlinkBoundedSourceReader extends FlinkSourceReaderBase, Long> timestampExtractor) { - super(context, pipelineOptions, timestampExtractor); + super(stepName, context, pipelineOptions, timestampExtractor); currentSplitId = -1; } @VisibleForTesting protected FlinkBoundedSourceReader( + String stepName, SourceReaderContext context, PipelineOptions pipelineOptions, ScheduledExecutorService executor, @Nullable Function, Long> timestampExtractor) { - super(executor, context, pipelineOptions, timestampExtractor); + super(stepName, executor, context, pipelineOptions, timestampExtractor); currentSplitId = -1; } @@ -105,7 +107,7 @@ public InputStatus pollNext(ReaderOutput> output) throws Except // If the advance() invocation throws exception here, the job will just fail over and read // everything again from // the beginning. So the failover granularity is the entire Flink job. - if (!tempCurrentReader.advance()) { + if (!invocationUtil.invokeAdvance(tempCurrentReader)) { finishSplit(currentSplitId); currentReader = null; currentSplitId = -1; @@ -133,7 +135,7 @@ private boolean moveToNextNonEmptyReader() throws IOException { Optional readerAndOutput; while ((readerAndOutput = createAndTrackNextReader()).isPresent()) { ReaderAndOutput rao = readerAndOutput.get(); - if (rao.reader.start()) { + if (invocationUtil.invokeStart(rao.reader)) { currentSplitId = Integer.parseInt(rao.splitId); currentReader = rao.reader; return true; diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSource.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSource.java index b40492201700..8ef2edfa606e 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSource.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSource.java @@ -40,18 +40,25 @@ public class FlinkUnboundedSource extends FlinkSource beamSource, SerializablePipelineOptions serializablePipelineOptions, int numSplits) { - this(beamSource, serializablePipelineOptions, numSplits, null); + this(stepName, beamSource, serializablePipelineOptions, numSplits, null); } public FlinkUnboundedSource( + String stepName, UnboundedSource beamSource, SerializablePipelineOptions serializablePipelineOptions, int numSplits, @Nullable TimestampExtractor>> timestampExtractor) { - super(beamSource, serializablePipelineOptions, Boundedness.CONTINUOUS_UNBOUNDED, numSplits); + super( + stepName, + beamSource, + serializablePipelineOptions, + Boundedness.CONTINUOUS_UNBOUNDED, + numSplits); this.timestampExtractor = timestampExtractor; } @@ -59,6 +66,6 @@ public FlinkUnboundedSource( public SourceReader>, FlinkSourceSplit> createReader( SourceReaderContext readerContext) throws Exception { return new FlinkUnboundedSourceReader<>( - readerContext, serializablePipelineOptions.get(), timestampExtractor); + stepName, readerContext, serializablePipelineOptions.get(), timestampExtractor); } } diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReader.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReader.java index 3c596360efd7..4faaaac95804 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReader.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReader.java @@ -69,10 +69,11 @@ public class FlinkUnboundedSourceReader private volatile boolean shouldEmitWatermark; public FlinkUnboundedSourceReader( + String stepName, SourceReaderContext context, PipelineOptions pipelineOptions, @Nullable Function>, Long> timestampExtractor) { - super(context, pipelineOptions, timestampExtractor); + super(stepName, context, pipelineOptions, timestampExtractor); this.readers = new ArrayList<>(); this.dataAvailableFutureRef = new AtomicReference<>(DUMMY_FUTURE); this.currentReaderIndex = 0; @@ -80,11 +81,12 @@ public FlinkUnboundedSourceReader( @VisibleForTesting protected FlinkUnboundedSourceReader( + String stepName, SourceReaderContext context, PipelineOptions pipelineOptions, ScheduledExecutorService executor, @Nullable Function>, Long> timestampExtractor) { - super(executor, context, pipelineOptions, timestampExtractor); + super(stepName, executor, context, pipelineOptions, timestampExtractor); this.readers = new ArrayList<>(); this.dataAvailableFutureRef = new AtomicReference<>(DUMMY_FUTURE); this.currentReaderIndex = 0; diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainerTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainerTest.java index d0efaa64332b..930ad88b0dcb 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainerTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainerTest.java @@ -37,7 +37,7 @@ import org.apache.beam.runners.core.metrics.MonitoringInfoConstants; import org.apache.beam.runners.core.metrics.MonitoringInfoMetricName; import org.apache.beam.runners.core.metrics.SimpleMonitoringInfoBuilder; -import org.apache.beam.runners.flink.metrics.FlinkMetricContainer.FlinkDistributionGauge; +import org.apache.beam.runners.flink.metrics.FlinkMetricContainerBase.FlinkDistributionGauge; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Distribution; import org.apache.beam.sdk.metrics.DistributionResult; diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/TestCountingSource.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/TestCountingSource.java index 5c54ce4c44e1..3af9062ba9b4 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/TestCountingSource.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/TestCountingSource.java @@ -29,6 +29,8 @@ import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.KV; @@ -198,6 +200,10 @@ public boolean requiresDeduping() { */ public class CountingSourceReader extends UnboundedReader> implements TestReader { + public static final String ADVANCE_COUNTER_NAMESPACE = "testNameSpace"; + public static final String ADVANCE_COUNTER_NAME = "advanceCounter"; + private final Counter advanceCounter = + Metrics.counter(ADVANCE_COUNTER_NAMESPACE, ADVANCE_COUNTER_NAME); private int current; private boolean closed; @@ -213,6 +219,7 @@ public boolean start() { @Override public boolean advance() { + advanceCounter.inc(); if (current >= numMessagesPerShard - 1 || haltEmission) { return false; } diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderTestBase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderTestBase.java index dcab3aff0f5b..462a1ba0153d 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderTestBase.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderTestBase.java @@ -19,6 +19,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Mockito.when; @@ -32,6 +33,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.function.Function; import javax.annotation.Nullable; +import org.apache.beam.runners.flink.translation.wrappers.streaming.io.TestCountingSource; import org.apache.beam.sdk.io.Source; import org.apache.beam.sdk.values.KV; import org.apache.flink.api.common.eventtime.Watermark; @@ -40,6 +42,7 @@ import org.apache.flink.api.connector.source.SourceReader; import org.apache.flink.api.connector.source.SourceReaderContext; import org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService; +import org.apache.flink.metrics.Counter; import org.junit.Test; import org.mockito.Mockito; @@ -194,6 +197,31 @@ public void testNumBytesInMetrics() throws Exception { assertEquals(numRecordsPerSplit * numSplits, testMetricGroup.numRecordsInCounter.getCount()); } + @Test + public void testMetricsContainer() throws Exception { + ManuallyTriggeredScheduledExecutorService executor = + new ManuallyTriggeredScheduledExecutorService(); + SourceTestCompat.TestMetricGroup testMetricGroup = new SourceTestCompat.TestMetricGroup(); + try (SourceReader>> reader = + createReader(executor, 0L, null, testMetricGroup)) { + reader.start(); + + List>> splits = createSplits(2, 10, 0); + reader.addSplits(splits); + RecordsValidatingOutput validatingOutput = new RecordsValidatingOutput(splits); + + // Need to poll once to create all the readers. + reader.pollNext(validatingOutput); + Counter advanceCounter = + testMetricGroup.registeredCounter.get( + TestCountingSource.CountingSourceReader.ADVANCE_COUNTER_NAMESPACE + + "." + + TestCountingSource.CountingSourceReader.ADVANCE_COUNTER_NAME); + assertNotNull(advanceCounter); + assertTrue("The reader should have advanced.", advanceCounter.getCount() > 0); + } + } + // --------------- abstract methods --------------- protected abstract KV getKVPairs(OutputT record); diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/FlinkBoundedSourceReaderTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/FlinkBoundedSourceReaderTest.java index 6303a729652a..84cb2a72ddaf 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/FlinkBoundedSourceReaderTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/FlinkBoundedSourceReaderTest.java @@ -138,9 +138,10 @@ protected FlinkBoundedSourceReader> createReader( SourceReaderContext mockContext = createSourceReaderContext(testMetricGroup); if (executor != null) { return new FlinkBoundedSourceReader<>( - mockContext, pipelineOptions, executor, timestampExtractor); + "FlinkBoundedSource", mockContext, pipelineOptions, executor, timestampExtractor); } else { - return new FlinkBoundedSourceReader<>(mockContext, pipelineOptions, timestampExtractor); + return new FlinkBoundedSourceReader<>( + "FlinkBoundedSource", mockContext, pipelineOptions, timestampExtractor); } } } diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReaderTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReaderTest.java index f420bd8900ff..b7cba373cf75 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReaderTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReaderTest.java @@ -303,9 +303,10 @@ protected FlinkUnboundedSourceReader> createReader( SourceReaderContext mockContext = createSourceReaderContext(metricGroup); if (executor != null) { return new FlinkUnboundedSourceReader<>( - mockContext, pipelineOptions, executor, timestampExtractor); + "FlinkUnboundedReader", mockContext, pipelineOptions, executor, timestampExtractor); } else { - return new FlinkUnboundedSourceReader<>(mockContext, pipelineOptions, timestampExtractor); + return new FlinkUnboundedSourceReader<>( + "FlinkUnboundedReader", mockContext, pipelineOptions, timestampExtractor); } }