From 5a822adc26908bc020bcd0905b8029357f5e6216 Mon Sep 17 00:00:00 2001 From: Jan Lukavsky Date: Fri, 24 May 2024 11:51:50 +0200 Subject: [PATCH] [flink] #31390 emit watermark with empty source --- .../io/source/FlinkSourceReaderBase.java | 9 ++-- .../unbounded/FlinkUnboundedSourceReader.java | 8 +-- .../io/source/EmptyUnboundedSource.java | 4 +- .../FlinkUnboundedSourceReaderTest.java | 54 +++++++++++++++++-- 4 files changed, 63 insertions(+), 12 deletions(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderBase.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderBase.java index 1f4f31f90ebf..9ae50ab5bff4 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderBase.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderBase.java @@ -339,13 +339,14 @@ public SourceOutput getAndMaybeCreateSplitOutput(ReaderOutput return outputForSplit; } - public boolean startOrAdvance() throws IOException { + public boolean startOrAdvance(ReaderOutput output) throws IOException { if (started) { + // associate output with the split + getAndMaybeCreateSplitOutput(output); return invocationUtil.invokeAdvance(reader); - } else { - started = true; - return invocationUtil.invokeStart(reader); } + started = true; + return invocationUtil.invokeStart(reader); } public @Nullable SourceOutput sourceOutput() { diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReader.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReader.java index e2566b00fc2f..54c23bea3fb1 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReader.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReader.java @@ -157,7 +157,7 @@ public InputStatus pollNext(ReaderOutput>> ou maybeEmitWatermark(); maybeCreateReaderForNewSplits(); - ReaderAndOutput reader = nextReaderWithData(); + ReaderAndOutput reader = nextReaderWithData(output); if (reader != null) { emitRecord(reader, output); return InputStatus.MORE_AVAILABLE; @@ -300,12 +300,14 @@ private void maybeCreateReaderForNewSplits() throws Exception { } } - private @Nullable ReaderAndOutput nextReaderWithData() throws IOException { + private @Nullable ReaderAndOutput nextReaderWithData( + ReaderOutput>> output) throws IOException { + int numReaders = readers.size(); for (int i = 0; i < numReaders; i++) { ReaderAndOutput readerAndOutput = readers.get(currentReaderIndex); currentReaderIndex = (currentReaderIndex + 1) % numReaders; - if (readerAndOutput.startOrAdvance()) { + if (readerAndOutput.startOrAdvance(output)) { return readerAndOutput; } } diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/EmptyUnboundedSource.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/EmptyUnboundedSource.java index ade48820dbda..8ddbe5218f8c 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/EmptyUnboundedSource.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/EmptyUnboundedSource.java @@ -48,7 +48,7 @@ public List> split( @Override public UnboundedReader createReader( - PipelineOptions options, @Nullable DummyCheckpointMark checkpointMark) throws IOException { + PipelineOptions options, @Nullable DummyCheckpointMark checkpointMark) { return new UnboundedReader() { @Override public boolean start() throws IOException { @@ -56,7 +56,7 @@ public boolean start() throws IOException { } @Override - public boolean advance() throws IOException { + public boolean advance() { return false; } diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReaderTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReaderTest.java index dccadc5e7f24..9525c06db8ed 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReaderTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReaderTest.java @@ -44,12 +44,16 @@ import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.ValueWithRecordId; +import org.apache.flink.api.common.eventtime.Watermark; +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.SourceOutput; import org.apache.flink.api.connector.source.SourceReader; import org.apache.flink.api.connector.source.SourceReaderContext; import org.apache.flink.core.io.InputStatus; import org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService; import org.apache.flink.metrics.Gauge; import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Instant; import org.junit.Test; /** Unite tests for {@link FlinkUnboundedSourceReader}. */ @@ -228,6 +232,38 @@ public void testWatermark() throws Exception { public void testWatermarkOnEmptySource() throws Exception { ManuallyTriggeredScheduledExecutorService executor = new ManuallyTriggeredScheduledExecutorService(); + AtomicReference watermark = new AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE); + ReaderOutput>>> output = + new ReaderOutput>>>() { + @Override + public void collect(WindowedValue>> unused) {} + + @Override + public void collect( + WindowedValue>> unused, long l) {} + + @Override + public void emitWatermark(Watermark w) { + watermark.compareAndSet( + BoundedWindow.TIMESTAMP_MIN_VALUE, Instant.ofEpochMilli(w.getTimestamp())); + } + + @Override + public void markIdle() {} + + @Override + public SourceOutput>>> + createOutputForSplit(String s) { + return this; + } + + @Override + public void releaseOutputForSplit(String s) {} + + @Override + public void markActive() {} + }; + Instant now = Instant.now(); try (FlinkUnboundedSourceReader> reader = (FlinkUnboundedSourceReader>) createReader(executor, -1L)) { List>> splits = createEmptySplits(2); @@ -236,22 +272,34 @@ public void testWatermarkOnEmptySource() throws Exception { reader.notifyNoMoreSplits(); for (int i = 0; i < 4; i++) { - assertEquals(InputStatus.NOTHING_AVAILABLE, reader.pollNext(null)); + assertEquals(InputStatus.NOTHING_AVAILABLE, reader.pollNext(output)); } + // move first reader to 'now' + ((EmptyUnboundedSource>) splits.get(0).getBeamSplitSource()) + .setWatermark(now); + // force trigger timeout + executor.triggerScheduledTasks(); + for (int i = 0; i < 4; i++) { + assertEquals(InputStatus.NOTHING_AVAILABLE, reader.pollNext(output)); + } + + // check we have emitted watermark + assertEquals(now, watermark.get()); + // move first reader to end of time ((EmptyUnboundedSource>) splits.get(0).getBeamSplitSource()) .setWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE); for (int i = 0; i < 4; i++) { - assertEquals(InputStatus.NOTHING_AVAILABLE, reader.pollNext(null)); + assertEquals(InputStatus.NOTHING_AVAILABLE, reader.pollNext(output)); } // move the second reader to end of time ((EmptyUnboundedSource>) splits.get(1).getBeamSplitSource()) .setWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE); - assertEquals(InputStatus.END_OF_INPUT, reader.pollNext(null)); + assertEquals(InputStatus.END_OF_INPUT, reader.pollNext(output)); } }