diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java index 7baf193ef191..69d701e14570 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java @@ -32,6 +32,7 @@ import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.flink.api.java.functions.KeySelector; +import org.joda.time.Instant; /** Flink operator for executing window {@link DoFn DoFns}. */ @SuppressWarnings({ @@ -81,6 +82,7 @@ public WindowDoFnOperator( public void open() throws Exception { stateInternals = new HashMap<>(); inMemTimerInternals = new HashMap<>(); + setBundleFinishedCallback(this::onFinishBundle); super.open(); } @@ -128,8 +130,10 @@ protected DoFn, KV> getDoFn() { // element or by the key of a firing timer stateInternalsFactory = key -> (StateInternals) keyedStateInternals; } else { + // On a non-keyed stream, no flink does not provide a state backend. + // We use an in memory state instead. stateInternalsFactory = - key -> stateInternals.computeIfAbsent(key, k -> InMemoryStateInternals.forKey(k)); + key -> stateInternals.computeIfAbsent(key, InMemoryStateInternals::forKey); } TimerInternalsFactory timerInternalsFactory; @@ -137,6 +141,8 @@ protected DoFn, KV> getDoFn() { // this will implicitly be keyed like the StateInternalsFactory timerInternalsFactory = key -> timerInternals; } else { + // On a non-keyed stream, no flink does not provide a timer service. + // We use an in memory timer instead. timerInternalsFactory = key -> inMemTimerInternals.computeIfAbsent(key, k -> new InMemoryTimerInternals()); } @@ -157,39 +163,44 @@ protected DoFn, KV> getDoFn() { return doFn; } - @Override - void flushData() throws Exception { + private void advanceWatermarkAndFireTimers(Instant watermark) throws Exception { for (Map.Entry entry : inMemTimerInternals.entrySet()) { K key = entry.getKey(); InMemoryTimerInternals timer = entry.getValue(); - timer.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE); - timer.advanceProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE); - timer.advanceSynchronizedProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE); + timer.advanceInputWatermark(watermark); + timer.advanceProcessingTime(watermark); + timer.advanceSynchronizedProcessingTime(watermark); TimerData timerData = timer.removeNextEventTimer(); while (timerData != null) { doFnRunner.processElement( - WindowedValue.valueInGlobalWindow( - KeyedWorkItems.timersWorkItem(key, Collections.singletonList(timerData)))); + WindowedValue.valueInGlobalWindow( + KeyedWorkItems.timersWorkItem(key, Collections.singletonList(timerData)))); timerData = timer.removeNextEventTimer(); } timerData = timer.removeNextProcessingTimer(); while (timerData != null) { doFnRunner.processElement( - WindowedValue.valueInGlobalWindow( - KeyedWorkItems.timersWorkItem(key, Collections.singletonList(timerData)))); + WindowedValue.valueInGlobalWindow( + KeyedWorkItems.timersWorkItem(key, Collections.singletonList(timerData)))); timerData = timer.removeNextProcessingTimer(); } timerData = timer.removeNextSynchronizedProcessingTimer(); while (timerData != null) { doFnRunner.processElement( - WindowedValue.valueInGlobalWindow( - KeyedWorkItems.timersWorkItem(key, Collections.singletonList(timerData)))); + WindowedValue.valueInGlobalWindow( + KeyedWorkItems.timersWorkItem(key, Collections.singletonList(timerData)))); timerData = timer.removeNextSynchronizedProcessingTimer(); } } + } + + @Override + void flushData() throws Exception { + // equivalent to processPendingProcessingTimeTimers for in memory timers + advanceWatermarkAndFireTimers(new Instant(Long.MAX_VALUE)); super.flushData(); } @@ -201,4 +212,12 @@ protected void fireTimer(TimerData timer) { KeyedWorkItems.timersWorkItem( (K) keyedStateInternals.getKey(), Collections.singletonList(timer)))); } + + private void onFinishBundle() { + try { + advanceWatermarkAndFireTimers(new Instant(getEffectiveInputWatermark())); + } catch (Exception e) { + throw new RuntimeException(e); + } + } }