From 230c099139655ba45decc808ecfac00bf4c87ebf Mon Sep 17 00:00:00 2001 From: jto Date: Thu, 4 Apr 2024 12:25:18 +0200 Subject: [PATCH] [Flink] finalize checkpoint marks in the new Flink source implementation (#30849) --- .../streaming/io/source/FlinkSourceSplit.java | 3 ++ .../unbounded/FlinkUnboundedSourceReader.java | 27 ++++++----- .../FlinkUnboundedSourceReaderTest.java | 45 ++++++++++++++----- 3 files changed, 50 insertions(+), 25 deletions(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplit.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplit.java index 9831ff9ee19f..43be493c83e8 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplit.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplit.java @@ -37,6 +37,9 @@ */ public class FlinkSourceSplit implements SourceSplit, Serializable { // The index of the split. + + private static final long serialVersionUID = 7458114818012108972L; + private final int splitIndex; private final Source beamSplitSource; private final byte @Nullable [] splitState; diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReader.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReader.java index 2dcf1a3f594f..e2566b00fc2f 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReader.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReader.java @@ -21,11 +21,10 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.ArrayList; +import java.util.LinkedHashMap; import java.util.List; -import java.util.NavigableMap; +import java.util.Map; import java.util.Optional; -import java.util.SortedMap; -import java.util.TreeMap; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicReference; @@ -78,8 +77,8 @@ public class FlinkUnboundedSourceReader private final List readers = new ArrayList<>(); private int currentReaderIndex = 0; private volatile boolean shouldEmitWatermark; - private final NavigableMap>> unfinishedCheckpoints = - new TreeMap<>(); + private final LinkedHashMap>> pendingCheckpoints = + new LinkedHashMap<>(); public FlinkUnboundedSourceReader( String stepName, @@ -103,22 +102,22 @@ protected FlinkUnboundedSourceReader( protected void addSplitsToUnfinishedForCheckpoint( long checkpointId, List> flinkSourceSplits) { - unfinishedCheckpoints.put(checkpointId, flinkSourceSplits); + pendingCheckpoints.put(checkpointId, flinkSourceSplits); } @Override public void notifyCheckpointComplete(long checkpointId) throws Exception { super.notifyCheckpointComplete(checkpointId); - SortedMap>> headMap = - unfinishedCheckpoints.headMap(checkpointId + 1); - for (List> splits : headMap.values()) { - for (FlinkSourceSplit s : splits) { - finalizeSourceSplit(s.getCheckpointMark()); + List finalized = new ArrayList<>(); + for (Map.Entry>> e : pendingCheckpoints.entrySet()) { + if (e.getKey() <= checkpointId) { + for (FlinkSourceSplit s : e.getValue()) { + finalizeSourceSplit(s.getCheckpointMark()); + } + finalized.add(e.getKey()); } } - for (long checkpoint : new ArrayList<>(headMap.keySet())) { - unfinishedCheckpoints.remove(checkpoint); - } + finalized.forEach(pendingCheckpoints::remove); } @Override diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReaderTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReaderTest.java index 295f250df3d6..dccadc5e7f24 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReaderTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReaderTest.java @@ -73,13 +73,8 @@ public void testSnapshotStateAndRestore() throws Exception { reader = createReader()) { pollAndValidate(reader, splits, validatingOutput, numSplits * numRecordsPerSplit / 2); snapshot = reader.snapshotState(0L); - // use higher checkpoint number to verify that we finalize everything that was created - // up to that checkpoint - reader.notifyCheckpointComplete(1L); } - assertEquals(numSplits, DummySource.numFinalizeCalled.size()); - // Create another reader, add the snapshot splits back. try (SourceReader< WindowedValue>>, @@ -299,15 +294,43 @@ public void testPendingBytesMetric() throws Exception { } } - // --------------- private helper classes ----------------- - /** A source whose advance() method only returns true occasionally. */ - private static class DummySource extends TestCountingSource { + @Test + public void testCheckMarksFinalized() throws Exception { - static List numFinalizeCalled = new ArrayList<>(); + final int numSplits = 2; + final int numRecordsPerSplit = 10; - static { - TestCountingSource.setFinalizeTracker(numFinalizeCalled); + List>> splits = + createSplits(numSplits, numRecordsPerSplit, 0); + RecordsValidatingOutput validatingOutput = new RecordsValidatingOutput(splits); + // Create a reader, take a snapshot. + try (SourceReader< + WindowedValue>>, + FlinkSourceSplit>> + reader = createReader()) { + List finalizeTracker = new ArrayList<>(); + TestCountingSource.setFinalizeTracker(finalizeTracker); + pollAndValidate(reader, splits, validatingOutput, numSplits * numRecordsPerSplit / 2); + assertTrue(finalizeTracker.isEmpty()); + reader.snapshotState(0L); + // notifyCheckpointComplete is normally called by the SourceOperator + reader.notifyCheckpointComplete(0L); + // every split should be finalized + assertEquals(numSplits, finalizeTracker.size()); + pollAndValidate(reader, splits, validatingOutput, numSplits); + // no notifyCheckpointComplete here, assume the checkpoint failed + reader.snapshotState(1L); + pollAndValidate(reader, splits, validatingOutput, numSplits); + reader.snapshotState(2L); + reader.notifyCheckpointComplete(2L); + // 2 * numSplits more should be finalized + assertEquals(3 * numSplits, finalizeTracker.size()); } + } + + // --------------- private helper classes ----------------- + /** A source whose advance() method only returns true occasionally. */ + private static class DummySource extends TestCountingSource { public DummySource(int numMessagesPerShard) { super(numMessagesPerShard);