#20812 handle @RequiresStableInput in portable flink (#22889)
* Handle @RequiresStableInput in portable flink (#20812)

* [runners-flink] Remove unnecessary dependency on flink-annotations

* Fix @RequiresStableInput for portable Flink (#20812)

 Fix FlinkRequiresStableInputTest flakiness (#21333)

* Flink: Tests for stateful stable DoFns (#20812)

* Enable commit for kafka flink portable test

* Apply suggestions from code review

Co-authored-by: Lukasz Cwik <[email protected]>

* Add callback to BufferingDoFnRunner for flushing SDK harness results

* Revert changes in website

Co-authored-by: Lukasz Cwik <[email protected]>
je-ik and lukecwik authored Jan 23, 2023
1 parent 93ca2fe commit b1aba9d
Showing 14 changed files with 537 additions and 226 deletions.
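For context, @RequiresStableInput is the Beam DoFn annotation this commit teaches the portable Flink runner to honor: elements delivered to an annotated @ProcessElement method must not change if the bundle is retried, which the streaming runner guarantees here by buffering input until a checkpoint completes. A minimal sketch of a DoFn requesting the guarantee (the class name is illustrative, not part of this commit):

import org.apache.beam.sdk.transforms.DoFn;

// Minimal sketch; StableInputFn is an illustrative name, not code from this commit.
class StableInputFn extends DoFn<String, String> {

  @RequiresStableInput
  @ProcessElement
  public void processElement(@Element String element, OutputReceiver<String> out) {
    // The runner must replay exactly the same element on retry (e.g. by buffering it
    // until a checkpoint completes), so side effects performed here are not
    // re-executed with different data.
    out.output(element);
  }
}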
1 change: 0 additions & 1 deletion runners/flink/flink_runner.gradle
@@ -208,7 +208,6 @@ dependencies {
implementation project(":sdks:java:fn-execution")
implementation library.java.jackson_databind
runtimeOnly library.java.jackson_jaxb_annotations
implementation "org.apache.flink:flink-annotations:$flink_version"
examplesJavaIntegrationTest project(project.path)
examplesJavaIntegrationTest project(":examples:java")
examplesJavaIntegrationTest project(path: ":examples:java", configuration: "testRuntimeMigration")
@@ -53,6 +53,21 @@ public static boolean requiresTimeSortedInput(
return requiresTimeSortedInput;
}

public static boolean requiresStableInput(RunnerApi.ExecutableStagePayload payload) {

return payload.getComponents().getTransformsMap().values().stream()
.filter(t -> t.getSpec().getUrn().equals(PTransformTranslation.PAR_DO_TRANSFORM_URN))
.anyMatch(
t -> {
try {
return RunnerApi.ParDoPayload.parseFrom(t.getSpec().getPayload())
.getRequiresStableInput();
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
}
});
}

/** Do not construct. */
private FlinkPortableRunnerUtils() {}
}
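A hedged usage sketch of the new utility: a stage translator could consult it to decide whether a stage's runner needs the stable-input buffering added later in this commit. The helper method and the streaming flag below are illustrative, not code from this commit.

// Illustrative helper, not part of this commit.
static boolean needsStableInputBuffering(
    RunnerApi.ExecutableStagePayload payload, boolean streaming) {
  // Buffering until checkpoint completion only matters in streaming mode; the check
  // itself just inspects the ParDo payloads of the executable stage as shown above.
  return streaming && FlinkPortableRunnerUtils.requiresStableInput(payload);
}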
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.flink.translation.utils;

import java.util.concurrent.locks.Lock;

public class Locker implements AutoCloseable {

public static Locker locked(Lock lock) {
Locker locker = new Locker(lock);
lock.lock();
return locker;
}

private final Lock lock;

Locker(Lock lock) {
this.lock = lock;
}

@Override
public void close() {
lock.unlock();
}
}
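A short usage sketch of Locker with try-with-resources (the surrounding class and lock field are illustrative; Locker itself is the class added above, in org.apache.beam.runners.flink.translation.utils):

import java.util.concurrent.locks.ReentrantLock;

// Illustrative usage; LockerExample is not part of this commit.
class LockerExample {

  private final ReentrantLock bufferLock = new ReentrantLock();

  void runWithLock(Runnable action) {
    // locked() acquires the lock and close() releases it, so the unlock happens
    // even if the action throws.
    try (Locker ignored = Locker.locked(bufferLock)) {
      action.run();
    }
  }
}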
@@ -88,11 +88,11 @@
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapState;
@@ -177,15 +177,15 @@ public class DoFnOperator<InputT, OutputT>

protected final String stepName;

private final Coder<WindowedValue<InputT>> windowedInputCoder;
final Coder<WindowedValue<InputT>> windowedInputCoder;

private final Map<TupleTag<?>, Coder<?>> outputCoders;
final Map<TupleTag<?>, Coder<?>> outputCoders;

protected final Coder<?> keyCoder;
final Coder<?> keyCoder;

final KeySelector<WindowedValue<InputT>, ?> keySelector;

private final TimerInternals.TimerDataCoderV2 timerCoder;
final TimerInternals.TimerDataCoderV2 timerCoder;

/** Max number of elements to include in a bundle. */
private final long maxBundleSize;
@@ -197,7 +197,9 @@ public class DoFnOperator<InputT, OutputT>
private final Map<String, PCollectionView<?>> sideInputMapping;

/** If true, we must process elements only after a checkpoint is finished. */
private final boolean requiresStableInput;
final boolean requiresStableInput;

final int numConcurrentCheckpoints;

private final boolean usesOnWindowExpiration;

@@ -301,10 +303,8 @@ public DoFnOperator(
this.doFnSchemaInformation = doFnSchemaInformation;
this.sideInputMapping = sideInputMapping;

this.requiresStableInput =
// WindowDoFnOperator does not use a DoFn
doFn != null
&& DoFnSignatures.getSignature(doFn.getClass()).processElement().requiresStableInput();
this.requiresStableInput = isRequiresStableInput(doFn);

this.usesOnWindowExpiration =
doFn != null && DoFnSignatures.getSignature(doFn.getClass()).onWindowExpiration() != null;

@@ -323,9 +323,22 @@ public DoFnOperator(
+ Math.max(0, flinkOptions.getMinPauseBetweenCheckpoints()));
}

this.numConcurrentCheckpoints = flinkOptions.getNumConcurrentCheckpoints();

this.finishBundleBeforeCheckpointing = flinkOptions.getFinishBundleBeforeCheckpointing();
}

private boolean isRequiresStableInput(DoFn<InputT, OutputT> doFn) {
// WindowDoFnOperator does not use a DoFn
return doFn != null
&& DoFnSignatures.getSignature(doFn.getClass()).processElement().requiresStableInput();
}

@VisibleForTesting
boolean getRequiresStableInput() {
return requiresStableInput;
}

// allow overriding this in WindowDoFnOperator because this one dynamically creates
// the DoFn
protected DoFn<InputT, OutputT> getDoFn() {
@@ -490,21 +503,8 @@ public void open() throws Exception {
doFnSchemaInformation,
sideInputMapping);

if (requiresStableInput) {
// put this in front of the root FnRunner before any additional wrappers
doFnRunner =
bufferingDoFnRunner =
BufferingDoFnRunner.create(
doFnRunner,
"stable-input-buffer",
windowedInputCoder,
windowingStrategy.getWindowFn().windowCoder(),
getOperatorStateBackend(),
getKeyedStateBackend(),
options.getNumConcurrentCheckpoints(),
serializedOptions);
}
doFnRunner = createWrappingDoFnRunner(doFnRunner, stepContext);
doFnRunner =
createBufferingDoFnRunnerIfNeeded(createWrappingDoFnRunner(doFnRunner, stepContext));
earlyBindStateIfNeeded();

if (!options.getDisableMetrics()) {
@@ -545,6 +545,36 @@ public void open() throws Exception {
pendingFinalizations = new LinkedHashMap<>();
}

DoFnRunner<InputT, OutputT> createBufferingDoFnRunnerIfNeeded(
DoFnRunner<InputT, OutputT> wrappedRunner) throws Exception {

if (requiresStableInput) {
// put this in front of the root FnRunner before any additional wrappers
return this.bufferingDoFnRunner =
BufferingDoFnRunner.create(
wrappedRunner,
"stable-input-buffer",
windowedInputCoder,
windowingStrategy.getWindowFn().windowCoder(),
getOperatorStateBackend(),
getBufferingKeyedStateBackend(),
numConcurrentCheckpoints,
serializedOptions);
}
return wrappedRunner;
}

/**
* Retrieve the keyed state backend that should be used to buffer elements for
* {@code @RequiresStableInput} functionality. By default this is the operator's keyed state
* backend, but it can be overridden in {@link ExecutableStageDoFnOperator}.
*
* @return the keyed backend to use for element buffering
*/
<K> @Nullable KeyedStateBackend<K> getBufferingKeyedStateBackend() {
return getKeyedStateBackend();
}

private void earlyBindStateIfNeeded() throws IllegalArgumentException, IllegalAccessException {
if (keyCoder != null) {
if (doFn != null) {
@@ -598,7 +628,9 @@ void flushData() throws Exception {
}
if (currentOutputWatermark < Long.MAX_VALUE) {
throw new RuntimeException(
"There are still watermark holds. Watermark held at " + currentOutputWatermark);
String.format(
"There are still watermark holds left when terminating operator %s Watermark held %d",
getOperatorName(), currentOutputWatermark));
}

// sanity check: these should have been flushed out by +Inf watermarks
@@ -617,7 +649,12 @@ void flushData() throws Exception {

public long getEffectiveInputWatermark() {
// hold back by the pushed back values waiting for side inputs
return Math.min(pushedBackWatermark, currentInputWatermark);
long combinedPushedBackWatermark = pushedBackWatermark;
if (requiresStableInput) {
combinedPushedBackWatermark =
Math.min(combinedPushedBackWatermark, bufferingDoFnRunner.getOutputWatermarkHold());
}
return Math.min(combinedPushedBackWatermark, currentInputWatermark);
}

public long getCurrentOutputWatermark() {
@@ -760,8 +797,8 @@ public long applyInputWatermarkHold(long inputWatermark) {
}

/**
* Allows to apply a hold to the output watermark before it is send out. By default, just passes
* the potential output watermark through which will make it the new output watermark.
* Allows applying a hold to the output watermark before it is sent out. Used to apply a hold
* on the output watermark for delayed (asynchronous or buffered) processing.
*
* @param currentOutputWatermark the current output watermark
* @param potentialOutputWatermark The potential new output watermark which can be adjusted, if
@@ -797,7 +834,7 @@ private void maybeEmitWatermark(long watermark) {
return;
}

LOG.debug("Emitting watermark {}", watermark);
LOG.debug("Emitting watermark {} from {}", watermark, getOperatorName());
currentOutputWatermark = watermark;
output.emitWatermark(new Watermark(watermark));

@@ -902,7 +939,7 @@ protected void scheduleForCurrentProcessingTime(ProcessingTimeCallbackCompat cal
timeService.registerTimer(timeService.getCurrentProcessingTime(), callback);
}

private void updateOutputWatermark() {
void updateOutputWatermark() {
try {
processInputWatermark(false);
} catch (Exception ex) {
@@ -1005,6 +1042,7 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
// We can now release all buffered data which was held back for
// @RequiresStableInput guarantees.
bufferingDoFnRunner.checkpointCompleted(checkpointId);
updateOutputWatermark();
}

List<InMemoryBundleFinalizer.Finalization> finalizations =