From 1a27d2cee0c5ab226a50cc2dc53da4ad18cbde12 Mon Sep 17 00:00:00 2001
From: Kanishk Karanawat
Date: Sat, 21 Oct 2023 15:20:32 -0700
Subject: [PATCH] flush buffer during drain operation for requiresStableInput operator (#28554)

---
 .../runners/flink/FlinkPipelineOptions.java   |  8 ++
 .../wrappers/streaming/DoFnOperator.java      | 14 +++
 .../wrappers/streaming/DoFnOperatorTest.java  | 92 +++++++++++++++++++
 .../flink_java_pipeline_options.html          |  5 +
 .../flink_python_pipeline_options.html        |  5 +
 5 files changed, 124 insertions(+)

diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
index 1e01514fe8b6..650768c7b44b 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
@@ -320,6 +320,14 @@ public interface FlinkPipelineOptions
 
   void setFileInputSplitMaxSizeMB(Long fileInputSplitMaxSizeMB);
 
+  @Description(
+      "Allow drain operation for Flink pipelines that contain the RequiresStableInput operator. Note that at the time of draining, "
+          + "the RequiresStableInput contract might be violated if there are any processing-related failures in the DoFn operator.")
+  @Default.Boolean(false)
+  Boolean getEnableStableInputDrain();
+
+  void setEnableStableInputDrain(Boolean enableStableInputDrain);
+
   static FlinkPipelineOptions defaults() {
     return PipelineOptionsFactory.as(FlinkPipelineOptions.class);
   }
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 726ffb229188..0a9731da8b56 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -199,6 +199,12 @@ public class DoFnOperator
   /** If true, we must process elements only after a checkpoint is finished. */
   final boolean requiresStableInput;
 
+  /**
+   * If both requiresStableInput and this parameter are true, we must flush the buffer during the
+   * drain operation.
+   */
+  final boolean enableStableInputDrain;
+
   final int numConcurrentCheckpoints;
 
   private final boolean usesOnWindowExpiration;
@@ -323,6 +329,8 @@ public DoFnOperator(
               + Math.max(0, flinkOptions.getMinPauseBetweenCheckpoints()));
     }
 
+    this.enableStableInputDrain = flinkOptions.getEnableStableInputDrain();
+
     this.numConcurrentCheckpoints = flinkOptions.getNumConcurrentCheckpoints();
 
     this.finishBundleBeforeCheckpointing = flinkOptions.getFinishBundleBeforeCheckpointing();
@@ -626,6 +634,12 @@ void flushData() throws Exception {
     while (bundleStarted) {
       invokeFinishBundle();
     }
+    if (requiresStableInput && enableStableInputDrain) {
+      // Flush any buffered events here before draining the pipeline. Note that this is best-effort
+      // and the requiresStableInput contract might be violated in cases where buffer processing fails.
+      bufferingDoFnRunner.checkpointCompleted(Long.MAX_VALUE);
+      updateOutputWatermark();
+    }
     if (currentOutputWatermark < Long.MAX_VALUE) {
       throw new RuntimeException(
           String.format(
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java
index 722d32b309c4..17cc16cc76e0 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java
@@ -2015,6 +2015,98 @@ public void finishBundle(FinishBundleContext context) {
             WindowedValue.valueInGlobalWindow("finishBundle")));
   }
 
+  @Test
+  public void testExactlyOnceBufferingFlushDuringDrain() throws Exception {
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
+    options.setMaxBundleSize(2L);
+    options.setCheckpointingInterval(1L);
+    options.setEnableStableInputDrain(true);
+
+    TupleTag<String> outputTag = new TupleTag<>("main-output");
+    WindowedValue.ValueOnlyWindowedValueCoder<String> windowedValueCoder =
+        WindowedValue.getValueOnlyCoder(StringUtf8Coder.of());
+
+    numStartBundleCalled = 0;
+    DoFn<String, String> doFn =
+        new DoFn<String, String>() {
+          @StartBundle
+          public void startBundle(StartBundleContext context) {
+            numStartBundleCalled += 1;
+          }
+
+          @ProcessElement
+          // Use RequiresStableInput to force buffering of elements
+          @RequiresStableInput
+          public void processElement(ProcessContext context) {
+            context.output(context.element());
+          }
+
+          @FinishBundle
+          public void finishBundle(FinishBundleContext context) {
+            context.output(
+                "finishBundle", BoundedWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.INSTANCE);
+          }
+        };
+
+    DoFnOperator.MultiOutputOutputManagerFactory<String> outputManagerFactory =
+        new DoFnOperator.MultiOutputOutputManagerFactory<>(
+            outputTag,
+            WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE),
+            new SerializablePipelineOptions(options));
+
+    Supplier<DoFnOperator<String, String>> doFnOperatorSupplier =
+        () ->
+            new DoFnOperator<>(
+                doFn,
+                "stepName",
+                windowedValueCoder,
+                Collections.emptyMap(),
+                outputTag,
+                Collections.emptyList(),
+                outputManagerFactory,
+                WindowingStrategy.globalDefault(),
+                new HashMap<>(), /* side-input mapping */
+                Collections.emptyList(), /* side inputs */
+                options,
+                null,
+                null,
+                DoFnSchemaInformation.create(),
+                Collections.emptyMap());
+
+    DoFnOperator<String, String> doFnOperator = doFnOperatorSupplier.get();
+    OneInputStreamOperatorTestHarness<WindowedValue<String>, WindowedValue<String>> testHarness =
+        new OneInputStreamOperatorTestHarness<>(doFnOperator);
+
+    testHarness.open();
+
+    testHarness.processElement(new StreamRecord<>(WindowedValue.valueInGlobalWindow("a")));
+    testHarness.processElement(new StreamRecord<>(WindowedValue.valueInGlobalWindow("b")));
+
+    assertThat(Iterables.size(testHarness.getOutput()), is(0));
+    assertThat(numStartBundleCalled, is(0));
+
+    // Simulate pipeline drain scenario
+    OperatorSubtaskState backup = testHarness.snapshot(0, 0);
+    doFnOperator.flushData();
+
+    assertThat(numStartBundleCalled, is(1));
+    assertThat(
+        stripStreamRecordFromWindowedValue(testHarness.getOutput()),
+        contains(
+            WindowedValue.valueInGlobalWindow("a"),
+            WindowedValue.valueInGlobalWindow("b"),
+            WindowedValue.valueInGlobalWindow("finishBundle")));
+
+    doFnOperator = doFnOperatorSupplier.get();
+    testHarness = new OneInputStreamOperatorTestHarness<>(doFnOperator);
+    testHarness.open();
+
+    doFnOperator.notifyCheckpointComplete(0L);
+
+    assertThat(numStartBundleCalled, is(1));
+    assertThat(stripStreamRecordFromWindowedValue(testHarness.getOutput()), emptyIterable());
+  }
+
   @Test
   public void testExactlyOnceBufferingKeyed() throws Exception {
     FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
diff --git a/website/www/site/layouts/shortcodes/flink_java_pipeline_options.html b/website/www/site/layouts/shortcodes/flink_java_pipeline_options.html
index ba8b597aaeeb..87d69ee60fe3 100644
--- a/website/www/site/layouts/shortcodes/flink_java_pipeline_options.html
+++ b/website/www/site/layouts/shortcodes/flink_java_pipeline_options.html
@@ -52,6 +52,11 @@
   <td>Disable Beam metrics in Flink Runner</td>
   <td>Default: <code>false</code></td>
 </tr>
+<tr>
+  <td><code>enableStableInputDrain</code></td>
+  <td>Allow drain operation for Flink pipelines that contain the RequiresStableInput operator. Note that at the time of draining, the RequiresStableInput contract might be violated if there are any processing-related failures in the DoFn operator.</td>
+  <td>Default: <code>false</code></td>
+</tr>
 <tr>
   <td><code>executionModeForBatch</code></td>
   <td>Flink mode for data exchange of batch pipelines. Reference {@link org.apache.flink.api.common.ExecutionMode}. Set this to BATCH_FORCED if pipelines get blocked, see https://issues.apache.org/jira/browse/FLINK-10672</td>
diff --git a/website/www/site/layouts/shortcodes/flink_python_pipeline_options.html b/website/www/site/layouts/shortcodes/flink_python_pipeline_options.html
index 5293f35e6a1e..27ae27ad05a3 100644
--- a/website/www/site/layouts/shortcodes/flink_python_pipeline_options.html
+++ b/website/www/site/layouts/shortcodes/flink_python_pipeline_options.html
@@ -52,6 +52,11 @@
   <td>Disable Beam metrics in Flink Runner</td>
   <td>Default: <code>false</code></td>
 </tr>
+<tr>
+  <td><code>enable_stable_input_drain</code></td>
+  <td>Allow drain operation for Flink pipelines that contain the RequiresStableInput operator. Note that at the time of draining, the RequiresStableInput contract might be violated if there are any processing-related failures in the DoFn operator.</td>
+  <td>Default: <code>false</code></td>
+</tr>
 <tr>
   <td><code>execution_mode_for_batch</code></td>
   <td>Flink mode for data exchange of batch pipelines. Reference {@link org.apache.flink.api.common.ExecutionMode}. Set this to BATCH_FORCED if pipelines get blocked, see https://issues.apache.org/jira/browse/FLINK-10672</td>
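
Usage note (editorial, not part of the patch): the sketch below shows one way a user might enable the new option when constructing a Flink pipeline. The option class, setter, and runner names come from the Beam Flink runner API referenced in the diff above; the example class name and surrounding pipeline wiring are hypothetical.

    import org.apache.beam.runners.flink.FlinkPipelineOptions;
    import org.apache.beam.runners.flink.FlinkRunner;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    // Hypothetical example class; only the FlinkPipelineOptions calls reflect the patch above.
    public class StableInputDrainExample {
      public static void main(String[] args) {
        FlinkPipelineOptions options =
            PipelineOptionsFactory.fromArgs(args).withValidation().as(FlinkPipelineOptions.class);
        options.setRunner(FlinkRunner.class);
        // Opt in to best-effort flushing of @RequiresStableInput buffers when the pipeline is drained.
        options.setEnableStableInputDrain(true); // defaults to false

        Pipeline pipeline = Pipeline.create(options);
        // ... apply transforms here; the option only matters for DoFns annotated with @RequiresStableInput.
        pipeline.run();
      }
    }

Equivalently, the option should be settable from the command line (for example --enableStableInputDrain=true for Java, or enable_stable_input_drain for the portable Python runner), per the generated option tables added above.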