Skip to content

Commit

Permalink
Fix @RequiresStableInput for portable Flink (apache#20812)
Browse files Browse the repository at this point in the history
 Fix FlinkRequiresStableInputTest flakiness (apache#21333)
  • Loading branch information
je-ik committed Jan 3, 2023
1 parent a438567 commit 80a0a5e
Show file tree
Hide file tree
Showing 8 changed files with 143 additions and 128 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,8 @@ public class DoFnOperator<InputT, OutputT>
/** If true, we must process elements only after a checkpoint is finished. */
private final boolean requiresStableInput;

private final int numConcurrentCheckpoints;

private final boolean usesOnWindowExpiration;

private final boolean finishBundleBeforeCheckpointing;
Expand Down Expand Up @@ -321,6 +323,8 @@ public DoFnOperator(
+ Math.max(0, flinkOptions.getMinPauseBetweenCheckpoints()));
}

this.numConcurrentCheckpoints = flinkOptions.getNumConcurrentCheckpoints();

this.finishBundleBeforeCheckpointing = flinkOptions.getFinishBundleBeforeCheckpointing();
}

Expand Down Expand Up @@ -499,21 +503,8 @@ public void open() throws Exception {
doFnSchemaInformation,
sideInputMapping);

doFnRunner = createWrappingDoFnRunner(doFnRunner, stepContext);
if (requiresStableInput) {
// put this in front of the root FnRunner before any additional wrappers
doFnRunner =
bufferingDoFnRunner =
BufferingDoFnRunner.create(
doFnRunner,
"stable-input-buffer",
windowedInputCoder,
windowingStrategy.getWindowFn().windowCoder(),
getOperatorStateBackend(),
getKeyedStateBackend(),
options.getNumConcurrentCheckpoints(),
serializedOptions);
}
doFnRunner =
createBufferingDoFnRunnerIfNeeded(createWrappingDoFnRunner(doFnRunner, stepContext));
earlyBindStateIfNeeded();

if (!options.getDisableMetrics()) {
Expand Down Expand Up @@ -554,6 +545,36 @@ public void open() throws Exception {
pendingFinalizations = new LinkedHashMap<>();
}

private DoFnRunner<InputT, OutputT> createBufferingDoFnRunnerIfNeeded(
DoFnRunner<InputT, OutputT> wrappedRunner) throws Exception {

if (requiresStableInput) {
// put this in front of the root FnRunner before any additional wrappers
return this.bufferingDoFnRunner =
BufferingDoFnRunner.create(
wrappedRunner,
"stable-input-buffer",
windowedInputCoder,
windowingStrategy.getWindowFn().windowCoder(),
getOperatorStateBackend(),
getBufferingKeyedStateBackend(),
numConcurrentCheckpoints,
serializedOptions);
}
return wrappedRunner;
}

/**
* Retrieve a keyed state backend that should be used to buffer elements for {@link @{code @}
* RequiresStableInput} functionality. By default this is the default keyed backend, but can be
* override in @{link ExecutableStageDoFnOperator}.
*
* @return the keyed backend to use for element buffering
*/
<K> @Nullable KeyedStateBackend<K> getBufferingKeyedStateBackend() {
return getKeyedStateBackend();
}

private void earlyBindStateIfNeeded() throws IllegalArgumentException, IllegalAccessException {
if (keyCoder != null) {
if (doFn != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,13 @@ public ExecutableStageDoFnOperator(
windowedInputCoder);
}

@Override
<K> @Nullable KeyedStateBackend<K> getBufferingKeyedStateBackend() {
// do not use keyed backend for buffering if we do not process stateful DoFn
// ExecutableStage uses keyed backend by default
return isStateful ? super.getKeyedStateBackend() : null;
}

@Override
protected Lock getLockToAcquireForStateAccessDuringBundles() {
return stateBackendLock;
Expand Down Expand Up @@ -1058,7 +1065,7 @@ public DoFn<InputT, OutputT> getFn() {
}

private DoFnRunner<InputT, OutputT> ensureStateDoFnRunner(
SdkHarnessDoFnRunner<InputT, OutputT> sdkHarnessRunner,
DoFnRunner<InputT, OutputT> sdkHarnessRunner,
RunnerApi.ExecutableStagePayload payload,
StepContext stepContext) {

Expand Down Expand Up @@ -1106,7 +1113,9 @@ public void processElement(WindowedValue<InputT> input) {
@SuppressWarnings({"unchecked", "rawtypes"})
final ByteBuffer key =
FlinkKeyUtils.encodeKey(((KV) input.getValue()).getKey(), (Coder) keyCoder);
getKeyedStateBackend().setCurrentKey(key);
if (getKeyedStateBackend() != null) {
getKeyedStateBackend().setCurrentKey(key);
}
super.processElement(input);
}
}
Expand Down
Loading

0 comments on commit 80a0a5e

Please sign in to comment.