[BEAM-8543] Dataflow streaming timers are not strictly time ordered when set earlier mid-bundle #11924

Merged (7 commits) on Jul 29, 2020
1 change: 1 addition & 0 deletions CHANGES.md
@@ -147,6 +147,7 @@
* Upgrade Sphinx to 3.0.3 for building PyDoc.
* Added a PTransform for image annotation using Google Cloud AI image processing service
([BEAM-9646](https://issues.apache.org/jira/browse/BEAM-9646))
* Dataflow streaming timers are not strictly time ordered when set earlier mid-bundle ([BEAM-8543](https://issues.apache.org/jira/browse/BEAM-8543)).

## Breaking Changes

@@ -24,10 +24,12 @@
import com.google.api.services.dataflow.model.SideInputInfo;
import java.io.Closeable;
import java.io.IOException;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
@@ -519,7 +521,7 @@ public void start(
synchronizedProcessingTime);

this.cachedFiredTimers = null;
this.cachedFiredUserTimers = null;
this.toBeFiredTimersOrdered = null;
}

public void flushState() {
@@ -559,28 +561,67 @@ public <W extends BoundedWindow> TimerData getNextFiredTimer(Coder<W> windowCode
return nextTimer;
}

// Lazily initialized
private Iterator<TimerData> cachedFiredUserTimers = null;
private PriorityQueue<TimerData> toBeFiredTimersOrdered = null;

// to track if timer is reset earlier mid-bundle.
Member

Can you add a comment explaining what the keys and values of this map are?

Contributor Author

Added

// Map of timer's id to timer's firing time to check
// the actual firing time of a timer.
private Map<String, Instant> firedTimer = new HashMap<>();

public <W extends BoundedWindow> TimerData getNextFiredUserTimer(Coder<W> windowCoder) {
if (cachedFiredUserTimers == null) {
cachedFiredUserTimers =
FluentIterable.<Timer>from(StreamingModeExecutionContext.this.getFiredTimers())
.filter(
timer ->
WindmillTimerInternals.isUserTimer(timer)
&& timer.getStateFamily().equals(stateFamily))
.transform(
timer ->
WindmillTimerInternals.windmillTimerToTimerData(
WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, timer, windowCoder))
.iterator();
if (toBeFiredTimersOrdered == null) {

toBeFiredTimersOrdered = new PriorityQueue<>(Comparator.comparing(TimerData::getTimestamp));
FluentIterable.from(StreamingModeExecutionContext.this.getFiredTimers())
.filter(
timer ->
WindmillTimerInternals.isUserTimer(timer)
&& timer.getStateFamily().equals(stateFamily))
.transform(
timer ->
WindmillTimerInternals.windmillTimerToTimerData(
WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, timer, windowCoder))
.iterator()
.forEachRemaining(
timerData -> {
firedTimer.put(
timerData.getTimerId() + '+' + timerData.getTimerFamilyId(),
timerData.getTimestamp());
toBeFiredTimersOrdered.add(timerData);
});
}

if (!cachedFiredUserTimers.hasNext()) {
Instant currentInputWatermark = userTimerInternals.currentInputWatermarkTime();

if (userTimerInternals.hasTimerBefore(currentInputWatermark)) {
List<TimerData> currentTimers = userTimerInternals.getCurrentTimers();

for (TimerData timerData : currentTimers) {
firedTimer.put(
timerData.getTimerId() + '+' + timerData.getTimerFamilyId(),
timerData.getTimestamp());
toBeFiredTimersOrdered.add(timerData);
}
}
Contributor

@kennknowles for comment. This doesn't look right to me, as I don't think we should be modifying the WindmillTimerInternals here. I think we just want to merge the timer modifications from processing the workitem into this priority queue; note that if timers are deleted, we need to detect that as well and remove from the priority queue.

Member

Yeah, I don't actually understand what this block is for.

FWIW, to do timer deletion/reset cheaply without building a bespoke data structure, just keep a map from timer id to firing time or tombstone. Then, whenever a timer comes up in the priority queue, look up its actual firing time in the map. If it is now set for a different time, don't fire it. If it is obsolete (tombstoned), don't fire it.

Contributor Author

@reuvenlax done
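
The map-plus-tombstone approach suggested in this thread can be sketched in isolation as follows. This is a minimal standalone illustration, not the code merged in this PR: `LazyTimerQueue`, `Entry`, `set`, `delete`, and `pollNext` are hypothetical names, firing times are plain `long` milliseconds rather than Joda `Instant`, and a single string id stands in for the timer id plus timer family id.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;

/** Sketch: a time-ordered timer queue that drops reset or deleted timers lazily. */
class LazyTimerQueue {

  /** Hypothetical stand-in for a timer; this is not Beam's TimerData. */
  static final class Entry {
    final String id;
    final long fireAtMillis;

    Entry(String id, long fireAtMillis) {
      this.id = id;
      this.fireAtMillis = fireAtMillis;
    }
  }

  // Every set() adds an entry; stale entries are never removed eagerly.
  private final PriorityQueue<Entry> queue =
      new PriorityQueue<Entry>(Comparator.comparingLong((Entry e) -> e.fireAtMillis));

  // Timer id -> current firing time. A null value is a tombstone (timer deleted).
  private final Map<String, Long> currentFireTime = new HashMap<>();

  /** Sets or resets a timer; the latest call for an id wins. */
  void set(String id, long fireAtMillis) {
    currentFireTime.put(id, fireAtMillis);
    queue.add(new Entry(id, fireAtMillis));
  }

  /** Deletes a timer by recording a tombstone; queue entries are skipped later. */
  void delete(String id) {
    currentFireTime.put(id, null);
  }

  /** Returns the earliest timer that is still current, or null if none remains. */
  Entry pollNext() {
    while (!queue.isEmpty()) {
      Entry candidate = queue.poll();
      Long actual = currentFireTime.get(candidate.id);
      // Fire only if the map still records exactly this firing time.
      if (actual != null && actual == candidate.fireAtMillis) {
        currentFireTime.remove(candidate.id); // delivered; later duplicates are stale
        return candidate;
      }
    }
    return null;
  }

  public static void main(String[] args) {
    LazyTimerQueue timers = new LazyTimerQueue();
    timers.set("a", 30);
    timers.set("b", 20);
    timers.set("a", 10); // reset to an earlier time mid-bundle
    timers.set("c", 15);
    timers.delete("c");
    for (Entry e = timers.pollNext(); e != null; e = timers.pollNext()) {
      System.out.println(e.id + "@" + e.fireAtMillis);
    }
  }
}
```

In this sketch, setting and deleting stay cheap because nothing is removed from the middle of the heap; the cost is that stale entries linger until they reach the head of the queue and are discarded on poll.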


TimerData nextTimer = null;

// fire timer only if its timestamp matched. Else it is either reset or obsolete.
while (!toBeFiredTimersOrdered.isEmpty()) {
nextTimer = toBeFiredTimersOrdered.poll();
String timerUniqueId = nextTimer.getTimerId() + '+' + nextTimer.getTimerFamilyId();
if (firedTimer.containsKey(timerUniqueId)
&& firedTimer.get(timerUniqueId).isEqual(nextTimer.getTimestamp())) {
break;
} else {
nextTimer = null;
}
}

if (nextTimer == null) {
return null;
}
TimerData nextTimer = cachedFiredUserTimers.next();

// User timers must be explicitly deleted when delivered, to release the implied hold
userTimerInternals.deleteTimer(nextTimer);
return nextTimer;
@@ -22,6 +22,8 @@

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.TimerInternals;
@@ -225,6 +227,29 @@ public void persistTo(Windmill.WorkItemCommitRequest.Builder outputBuilder) {
timers.clear();
}

public boolean hasTimerBefore(Instant time) {
for (Cell<String, StateNamespace, Boolean> cell : timerStillPresent.cellSet()) {
TimerData timerData = timers.get(cell.getRowKey(), cell.getColumnKey());
if (cell.getValue()) {
if (timerData.getTimestamp().isBefore(time)) {
return true;
}
}
}
return false;
}

public List<TimerData> getCurrentTimers() {
List<TimerData> timerDataList = new ArrayList<>();
for (Cell<String, StateNamespace, Boolean> cell : timerStillPresent.cellSet()) {
TimerData timerData = timers.get(cell.getRowKey(), cell.getColumnKey());
if (cell.getValue()) {
timerDataList.add(timerData);
}
}
return timerDataList;
}

private boolean needsWatermarkHold(TimerData timerData) {
// If it is a user timer or a system timer with outputTimestamp different than timestamp
return WindmillNamespacePrefix.USER_NAMESPACE_PREFIX.equals(prefix)
@@ -3959,7 +3959,7 @@ public void testEventTimeTimerOrdering() throws Exception {
}

testEventTimeTimerOrderingWithInputPTransform(
now, numTestElements, builder.advanceWatermarkToInfinity());
now, numTestElements, builder.advanceWatermarkToInfinity(), IsBounded.BOUNDED);
}

/** A test makes sure that an event time timers are correctly ordered using Create transform. */
@@ -3970,7 +3970,7 @@ public void testEventTimeTimerOrdering() throws Exception {
UsesStatefulParDo.class,
UsesStrictTimerOrdering.class
})
public void testEventTimeTimerOrderingWithCreate() throws Exception {
public void testEventTimeTimerOrderingWithCreateBounded() throws Exception {
final int numTestElements = 100;
final Instant now = new Instant(1500000000000L);

@@ -3980,13 +3980,39 @@ public void testEventTimeTimerOrderingWithCreate() throws Exception {
}

testEventTimeTimerOrderingWithInputPTransform(
now, numTestElements, Create.timestamped(elements));
now, numTestElements, Create.timestamped(elements), IsBounded.BOUNDED);
}

/**
* A test makes sure that an event time timers are correctly ordered using Create transform
* unbounded.
*/
@Test
@Category({
ValidatesRunner.class,
UsesTimersInParDo.class,
UsesStatefulParDo.class,
UsesUnboundedPCollections.class,
UsesStrictTimerOrdering.class
})
public void testEventTimeTimerOrderingWithCreateUnbounded() throws Exception {
final int numTestElements = 100;
final Instant now = new Instant(1500000000000L);

List<TimestampedValue<KV<String, String>>> elements = new ArrayList<>();
for (int i = 0; i < numTestElements; i++) {
elements.add(TimestampedValue.of(KV.of("dummy", "" + i), now.plus(i * 1000)));
}

testEventTimeTimerOrderingWithInputPTransform(
now, numTestElements, Create.timestamped(elements), IsBounded.UNBOUNDED);
}

private void testEventTimeTimerOrderingWithInputPTransform(
Instant now,
int numTestElements,
PTransform<PBegin, PCollection<KV<String, String>>> transform)
PTransform<PBegin, PCollection<KV<String, String>>> transform,
IsBounded isBounded)
throws Exception {

final String timerIdBagAppend = "append";
@@ -4070,7 +4096,8 @@ public void onTimer(
}
};

PCollection<String> output = pipeline.apply(transform).apply(ParDo.of(fn));
PCollection<String> output =
pipeline.apply(transform).setIsBoundedInternal(isBounded).apply(ParDo.of(fn));
List<String> expected =
IntStream.rangeClosed(0, numTestElements)
.mapToObj(expandFn(numTestElements))
@@ -4154,16 +4181,25 @@ public void testTwoTimersSettingEachOther() {
TestStream.create(KvCoder.of(VoidCoder.of(), VoidCoder.of()))
.addElements(KV.of(null, null))
.advanceWatermarkToInfinity();
pipeline.apply(TwoTimerTest.of(now, end, input));
pipeline.apply(TwoTimerTest.of(now, end, input, IsBounded.BOUNDED));
pipeline.run();
}

@Test
@Category({ValidatesRunner.class, UsesTimersInParDo.class, UsesStrictTimerOrdering.class})
public void testTwoTimersSettingEachOtherWithCreateAsInputBounded() {
Instant now = new Instant(1500000000000L);
Instant end = now.plus(100);
pipeline.apply(TwoTimerTest.of(now, end, Create.of(KV.of(null, null)), IsBounded.BOUNDED));
pipeline.run();
}

@Test
@Category({ValidatesRunner.class, UsesTimersInParDo.class, UsesStrictTimerOrdering.class})
public void testTwoTimersSettingEachOtherWithCreateAsInput() {
public void testTwoTimersSettingEachOtherWithCreateAsInputUnbounded() {
Instant now = new Instant(1500000000000L);
Instant end = now.plus(100);
pipeline.apply(TwoTimerTest.of(now, end, Create.of(KV.of(null, null))));
pipeline.apply(TwoTimerTest.of(now, end, Create.of(KV.of(null, null)), IsBounded.UNBOUNDED));
pipeline.run();
}

@@ -4337,18 +4373,26 @@ public void onTimer(
private static class TwoTimerTest extends PTransform<PBegin, PDone> {

private static PTransform<PBegin, PDone> of(
Instant start, Instant end, PTransform<PBegin, PCollection<KV<Void, Void>>> input) {
return new TwoTimerTest(start, end, input);
Instant start,
Instant end,
PTransform<PBegin, PCollection<KV<Void, Void>>> input,
IsBounded isBounded) {
return new TwoTimerTest(start, end, input, isBounded);
}

private final Instant start;
private final Instant end;
private final IsBounded isBounded;
private final transient PTransform<PBegin, PCollection<KV<Void, Void>>> inputPTransform;

public TwoTimerTest(
Instant start, Instant end, PTransform<PBegin, PCollection<KV<Void, Void>>> input) {
Instant start,
Instant end,
PTransform<PBegin, PCollection<KV<Void, Void>>> input,
IsBounded isBounded) {
this.start = start;
this.end = end;
this.isBounded = isBounded;
this.inputPTransform = input;
}

@@ -4361,6 +4405,7 @@ public PDone expand(PBegin input) {
PCollection<String> result =
input
.apply(inputPTransform)
.setIsBoundedInternal(isBounded)
.apply(
ParDo.of(
new DoFn<KV<Void, Void>, String>() {
@@ -4425,7 +4470,7 @@ public void onTimer2(
}));

List<String> expected =
LongStream.rangeClosed(0, 100)
LongStream.rangeClosed(0, end.minus(start.getMillis()).getMillis())
.mapToObj(e -> (Long) e)
.flatMap(e -> Arrays.asList("t1:" + e + ":" + e, "t2:" + e + ":" + e).stream())
.collect(Collectors.toList());