From 7680207a473f30c3cbbed35db4d93a765309544d Mon Sep 17 00:00:00 2001 From: Jan Lukavsky Date: Wed, 15 Jan 2025 15:46:06 +0100 Subject: [PATCH] [flink] FlinkRunner initializes the same split twice (#31313) --- .../streaming/io/source/FlinkSource.java | 16 +---- .../io/source/FlinkSourceSplitEnumerator.java | 71 +++++++++++-------- 2 files changed, 43 insertions(+), 44 deletions(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSource.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSource.java index 506b651da68f..d11141b0b4b6 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSource.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSource.java @@ -29,8 +29,6 @@ import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.unbounded.FlinkUnboundedSource; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.UnboundedSource; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.construction.UnboundedReadFromBoundedSource; import org.apache.flink.api.common.eventtime.Watermark; import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.api.connector.source.Source; @@ -73,18 +71,6 @@ public static FlinkUnboundedSource unbounded( return new FlinkUnboundedSource<>(stepName, source, serializablePipelineOptions, numSplits); } - public static FlinkUnboundedSource unboundedImpulse(long shutdownSourceAfterIdleMs) { - FlinkPipelineOptions flinkPipelineOptions = FlinkPipelineOptions.defaults(); - flinkPipelineOptions.setShutdownSourcesAfterIdleMs(shutdownSourceAfterIdleMs); - return new FlinkUnboundedSource<>( - "Impulse", - new UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter<>( - new BeamImpulseSource()), - new SerializablePipelineOptions(flinkPipelineOptions), - 1, - record -> BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()); - } - public static FlinkBoundedSource boundedImpulse() { return new FlinkBoundedSource<>( "Impulse", @@ -130,7 +116,7 @@ public Boundedness getBoundedness() { throws Exception { FlinkSourceSplitEnumerator enumerator = new FlinkSourceSplitEnumerator<>( - enumContext, beamSource, serializablePipelineOptions.get(), numSplits); + enumContext, beamSource, serializablePipelineOptions.get(), numSplits, true); checkpoint.forEach( (subtaskId, splitsForSubtask) -> enumerator.addSplitsBack(splitsForSubtask, subtaskId)); return enumerator; diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplitEnumerator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplitEnumerator.java index be2e8ad9ad77..9dc295854dc5 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplitEnumerator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplitEnumerator.java @@ -63,45 +63,58 @@ public FlinkSourceSplitEnumerator( Source beamSource, PipelineOptions pipelineOptions, int numSplits) { + + this(context, beamSource, pipelineOptions, numSplits, false); + } + + public FlinkSourceSplitEnumerator( + SplitEnumeratorContext> context, + Source beamSource, + PipelineOptions pipelineOptions, + int numSplits, + boolean splitsInitialized) { + this.context = context; this.beamSource = beamSource; this.pipelineOptions = pipelineOptions; this.numSplits = numSplits; this.pendingSplits = new HashMap<>(numSplits); - this.splitsInitialized = false; + this.splitsInitialized = splitsInitialized; } @Override public void start() { - context.callAsync( - () -> { - try { - LOG.info("Starting source {}", beamSource); - List> beamSplitSourceList = splitBeamSource(); - Map>> flinkSourceSplitsList = new HashMap<>(); - int i = 0; - for (Source beamSplitSource : beamSplitSourceList) { - int targetSubtask = i % context.currentParallelism(); - List> splitsForTask = - flinkSourceSplitsList.computeIfAbsent( - targetSubtask, ignored -> new ArrayList<>()); - splitsForTask.add(new FlinkSourceSplit<>(i, beamSplitSource)); - i++; + if (!splitsInitialized) { + context.callAsync( + () -> { + try { + LOG.info("Starting source {}", beamSource); + List> beamSplitSourceList = splitBeamSource(); + Map>> flinkSourceSplitsList = new HashMap<>(); + int i = 0; + for (Source beamSplitSource : beamSplitSourceList) { + int targetSubtask = i % context.currentParallelism(); + List> splitsForTask = + flinkSourceSplitsList.computeIfAbsent( + targetSubtask, ignored -> new ArrayList<>()); + splitsForTask.add(new FlinkSourceSplit<>(i, beamSplitSource)); + i++; + } + return flinkSourceSplitsList; + } catch (Exception e) { + throw new RuntimeException(e); } - return flinkSourceSplitsList; - } catch (Exception e) { - throw new RuntimeException(e); - } - }, - (sourceSplits, error) -> { - if (error != null) { - throw new RuntimeException("Failed to start source enumerator.", error); - } else { - pendingSplits.putAll(sourceSplits); - splitsInitialized = true; - sendPendingSplitsToSourceReaders(); - } - }); + }, + (sourceSplits, error) -> { + if (error != null) { + throw new RuntimeException("Failed to start source enumerator.", error); + } else { + pendingSplits.putAll(sourceSplits); + splitsInitialized = true; + sendPendingSplitsToSourceReaders(); + } + }); + } } @Override