Skip to content

Commit

Permalink
[WIP] Also fire timer on finished bundle
Browse files Browse the repository at this point in the history
  • Loading branch information
jto committed Aug 13, 2024
1 parent 78da517 commit 5e7b170
Showing 1 changed file with 31 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.flink.api.java.functions.KeySelector;
import org.joda.time.Instant;

/** Flink operator for executing window {@link DoFn DoFns}. */
@SuppressWarnings({
Expand Down Expand Up @@ -81,6 +82,7 @@ public WindowDoFnOperator(
public void open() throws Exception {
stateInternals = new HashMap<>();
inMemTimerInternals = new HashMap<>();
setBundleFinishedCallback(this::onFinishBundle);
super.open();
}

Expand Down Expand Up @@ -128,15 +130,19 @@ protected DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> getDoFn() {
// element or by the key of a firing timer
stateInternalsFactory = key -> (StateInternals) keyedStateInternals;
} else {
// On a non-keyed stream, no flink does not provide a state backend.
// We use an in memory state instead.
stateInternalsFactory =
key -> stateInternals.computeIfAbsent(key, k -> InMemoryStateInternals.forKey(k));
key -> stateInternals.computeIfAbsent(key, InMemoryStateInternals::forKey);
}

TimerInternalsFactory<K> timerInternalsFactory;
if (timerInternals != null) {
// this will implicitly be keyed like the StateInternalsFactory
timerInternalsFactory = key -> timerInternals;
} else {
// On a non-keyed stream, no flink does not provide a timer service.
// We use an in memory timer instead.
timerInternalsFactory =
key -> inMemTimerInternals.computeIfAbsent(key, k -> new InMemoryTimerInternals());
}
Expand All @@ -157,39 +163,44 @@ protected DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> getDoFn() {
return doFn;
}

@Override
void flushData() throws Exception {
private void advanceWatermarkAndFireTimers(Instant watermark) throws Exception {
for (Map.Entry<K, InMemoryTimerInternals> entry : inMemTimerInternals.entrySet()) {
K key = entry.getKey();
InMemoryTimerInternals timer = entry.getValue();
timer.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
timer.advanceProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
timer.advanceSynchronizedProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
timer.advanceInputWatermark(watermark);
timer.advanceProcessingTime(watermark);
timer.advanceSynchronizedProcessingTime(watermark);

TimerData timerData = timer.removeNextEventTimer();
while (timerData != null) {
doFnRunner.processElement(
WindowedValue.valueInGlobalWindow(
KeyedWorkItems.timersWorkItem(key, Collections.singletonList(timerData))));
WindowedValue.valueInGlobalWindow(
KeyedWorkItems.timersWorkItem(key, Collections.singletonList(timerData))));
timerData = timer.removeNextEventTimer();
}

timerData = timer.removeNextProcessingTimer();
while (timerData != null) {
doFnRunner.processElement(
WindowedValue.valueInGlobalWindow(
KeyedWorkItems.timersWorkItem(key, Collections.singletonList(timerData))));
WindowedValue.valueInGlobalWindow(
KeyedWorkItems.timersWorkItem(key, Collections.singletonList(timerData))));
timerData = timer.removeNextProcessingTimer();
}

timerData = timer.removeNextSynchronizedProcessingTimer();
while (timerData != null) {
doFnRunner.processElement(
WindowedValue.valueInGlobalWindow(
KeyedWorkItems.timersWorkItem(key, Collections.singletonList(timerData))));
WindowedValue.valueInGlobalWindow(
KeyedWorkItems.timersWorkItem(key, Collections.singletonList(timerData))));
timerData = timer.removeNextSynchronizedProcessingTimer();
}
}
}

@Override
void flushData() throws Exception {
// equivalent to processPendingProcessingTimeTimers for in memory timers
advanceWatermarkAndFireTimers(new Instant(Long.MAX_VALUE));
super.flushData();
}

Expand All @@ -201,4 +212,12 @@ protected void fireTimer(TimerData timer) {
KeyedWorkItems.timersWorkItem(
(K) keyedStateInternals.getKey(), Collections.singletonList(timer))));
}

private void onFinishBundle() {
try {
advanceWatermarkAndFireTimers(new Instant(getEffectiveInputWatermark()));
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}

0 comments on commit 5e7b170

Please sign in to comment.