Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-10991] Cherrypick #12980 to 2.25.0: Revert "[BEAM-8543] Dataflow streaming timers are not strictly time ordered when set earlier mid-bundle (#11924)" #12993

Merged
merged 2 commits into from
Oct 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
## Known Issues

* Fixed X (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
* Dataflow streaming timers are once again not strictly time ordered when set earlier mid-bundle, as the fix for [BEAM-8543](https://issues.apache.org/jira/browse/BEAM-8543) introduced more severe bugs and has been rolled back.


# [2.24.0] - 2020-09-18
Expand Down Expand Up @@ -131,6 +132,9 @@
--temp_location, or pass method="STREAMING_INSERTS" to WriteToBigQuery ([BEAM-6928](https://issues.apache.org/jira/browse/BEAM-6928)).
* Python SDK now understands `typing.FrozenSet` type hints, which are not interchangeable with `typing.Set`. You may need to update your pipelines if type checking fails. ([BEAM-10197](https://issues.apache.org/jira/browse/BEAM-10197))

## Known issues

* When a timer fires but is reset prior to being executed, a watermark hold may be leaked, causing a stuck pipeline ([BEAM-10991](https://issues.apache.org/jira/browse/BEAM-10991)).

robinyqiu marked this conversation as resolved.
Show resolved Hide resolved
# [2.23.0] - 2020-06-29

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,10 @@
import com.google.api.services.dataflow.model.SideInputInfo;
import java.io.Closeable;
import java.io.IOException;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -529,7 +527,7 @@ public void start(
synchronizedProcessingTime);

this.cachedFiredTimers = null;
this.toBeFiredTimersOrdered = null;
this.cachedFiredUserTimers = null;
}

public void flushState() {
Expand Down Expand Up @@ -569,67 +567,28 @@ public <W extends BoundedWindow> TimerData getNextFiredTimer(Coder<W> windowCode
return nextTimer;
}

private PriorityQueue<TimerData> toBeFiredTimersOrdered = null;

// to track if timer is reset earlier mid-bundle.
// Map of timer's id to timer's firing time to check
// the actual firing time of a timer.
private Map<String, Instant> firedTimer = new HashMap<>();
// Lazily initialized
private Iterator<TimerData> cachedFiredUserTimers = null;

public <W extends BoundedWindow> TimerData getNextFiredUserTimer(Coder<W> windowCoder) {
if (toBeFiredTimersOrdered == null) {

toBeFiredTimersOrdered = new PriorityQueue<>(Comparator.comparing(TimerData::getTimestamp));
FluentIterable.from(StreamingModeExecutionContext.this.getFiredTimers())
.filter(
timer ->
WindmillTimerInternals.isUserTimer(timer)
&& timer.getStateFamily().equals(stateFamily))
.transform(
timer ->
WindmillTimerInternals.windmillTimerToTimerData(
WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, timer, windowCoder))
.iterator()
.forEachRemaining(
timerData -> {
firedTimer.put(
timerData.getTimerId() + '+' + timerData.getTimerFamilyId(),
timerData.getTimestamp());
toBeFiredTimersOrdered.add(timerData);
});
}

Instant currentInputWatermark = userTimerInternals.currentInputWatermarkTime();

if (userTimerInternals.hasTimerBefore(currentInputWatermark)) {
List<TimerData> currentTimers = userTimerInternals.getCurrentTimers();

for (TimerData timerData : currentTimers) {
firedTimer.put(
timerData.getTimerId() + '+' + timerData.getTimerFamilyId(),
timerData.getTimestamp());
toBeFiredTimersOrdered.add(timerData);
}
}

TimerData nextTimer = null;

// fire timer only if its timestamp matched. Else it is either reset or obsolete.
while (!toBeFiredTimersOrdered.isEmpty()) {
nextTimer = toBeFiredTimersOrdered.poll();
String timerUniqueId = nextTimer.getTimerId() + '+' + nextTimer.getTimerFamilyId();
if (firedTimer.containsKey(timerUniqueId)
&& firedTimer.get(timerUniqueId).isEqual(nextTimer.getTimestamp())) {
break;
} else {
nextTimer = null;
}
if (cachedFiredUserTimers == null) {
cachedFiredUserTimers =
FluentIterable.<Timer>from(StreamingModeExecutionContext.this.getFiredTimers())
.filter(
timer ->
WindmillTimerInternals.isUserTimer(timer)
&& timer.getStateFamily().equals(stateFamily))
.transform(
timer ->
WindmillTimerInternals.windmillTimerToTimerData(
WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, timer, windowCoder))
.iterator();
}

if (nextTimer == null) {
if (!cachedFiredUserTimers.hasNext()) {
return null;
}

TimerData nextTimer = cachedFiredUserTimers.next();
// User timers must be explicitly deleted when delivered, to release the implied hold
userTimerInternals.deleteTimer(nextTimer);
return nextTimer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.TimerInternals;
Expand Down Expand Up @@ -227,29 +225,6 @@ public void persistTo(Windmill.WorkItemCommitRequest.Builder outputBuilder) {
timers.clear();
}

/**
 * Reports whether any timer flagged {@code true} in {@code timerStillPresent} has a timestamp
 * strictly before {@code time}.
 *
 * <p>Each cell of {@code timerStillPresent} is looked up in {@code timers} using the cell's row
 * and column keys; cells whose flag is {@code false} are ignored.
 *
 * @param time the instant to compare timer timestamps against
 * @return {@code true} as soon as one flagged timer with an earlier timestamp is found,
 *     {@code false} if none is
 */
public boolean hasTimerBefore(Instant time) {
  for (Cell<String, StateNamespace, Boolean> entry : timerStillPresent.cellSet()) {
    // The lookup is performed before the flag is examined, for every cell.
    TimerData candidate = timers.get(entry.getRowKey(), entry.getColumnKey());
    if (!entry.getValue()) {
      continue;
    }
    // NOTE(review): assumes every flagged cell has a matching entry in timers — confirm.
    if (candidate.getTimestamp().isBefore(time)) {
      return true;
    }
  }
  return false;
}

/**
 * Collects the timers whose flag in {@code timerStillPresent} is {@code true}.
 *
 * <p>Each cell of {@code timerStillPresent} is looked up in {@code timers} using the cell's row
 * and column keys. The looked-up value is appended to the result in the iteration order of
 * {@code timerStillPresent.cellSet()}.
 *
 * @return a new mutable list of the flagged timers; empty if no cell is flagged
 */
public List<TimerData> getCurrentTimers() {
  List<TimerData> present = new ArrayList<>();
  for (Cell<String, StateNamespace, Boolean> entry : timerStillPresent.cellSet()) {
    // The lookup is performed before the flag is examined, for every cell.
    TimerData candidate = timers.get(entry.getRowKey(), entry.getColumnKey());
    if (!entry.getValue()) {
      continue;
    }
    present.add(candidate);
  }
  return present;
}

private boolean needsWatermarkHold(TimerData timerData) {
// If it is a user timer or a system timer with outputTimestamp different than timestamp
return WindmillNamespacePrefix.USER_NAMESPACE_PREFIX.equals(prefix)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4202,7 +4202,7 @@ public void testEventTimeTimerOrdering() throws Exception {
}

testEventTimeTimerOrderingWithInputPTransform(
now, numTestElements, builder.advanceWatermarkToInfinity(), IsBounded.BOUNDED);
now, numTestElements, builder.advanceWatermarkToInfinity());
}

/** A test that makes sure that event time timers are correctly ordered using the Create transform. */
Expand All @@ -4213,7 +4213,7 @@ public void testEventTimeTimerOrdering() throws Exception {
UsesStatefulParDo.class,
UsesStrictTimerOrdering.class
})
public void testEventTimeTimerOrderingWithCreateBounded() throws Exception {
public void testEventTimeTimerOrderingWithCreate() throws Exception {
final int numTestElements = 100;
final Instant now = new Instant(1500000000000L);

Expand All @@ -4223,39 +4223,13 @@ public void testEventTimeTimerOrderingWithCreateBounded() throws Exception {
}

testEventTimeTimerOrderingWithInputPTransform(
now, numTestElements, Create.timestamped(elements), IsBounded.BOUNDED);
}

/**
 * A test that makes sure that event time timers are correctly ordered using the Create transform
 * with an unbounded PCollection.
 */
@Test
@Category({
ValidatesRunner.class,
UsesTimersInParDo.class,
UsesStatefulParDo.class,
UsesUnboundedPCollections.class,
UsesStrictTimerOrdering.class
})
public void testEventTimeTimerOrderingWithCreateUnbounded() throws Exception {
final int numTestElements = 100;
final Instant now = new Instant(1500000000000L);

List<TimestampedValue<KV<String, String>>> elements = new ArrayList<>();
for (int i = 0; i < numTestElements; i++) {
elements.add(TimestampedValue.of(KV.of("dummy", "" + i), now.plus(i * 1000)));
}

testEventTimeTimerOrderingWithInputPTransform(
now, numTestElements, Create.timestamped(elements), IsBounded.UNBOUNDED);
now, numTestElements, Create.timestamped(elements));
}

private void testEventTimeTimerOrderingWithInputPTransform(
Instant now,
int numTestElements,
PTransform<PBegin, PCollection<KV<String, String>>> transform,
IsBounded isBounded)
PTransform<PBegin, PCollection<KV<String, String>>> transform)
throws Exception {

final String timerIdBagAppend = "append";
Expand Down Expand Up @@ -4339,8 +4313,7 @@ public void onTimer(
}
};

PCollection<String> output =
pipeline.apply(transform).setIsBoundedInternal(isBounded).apply(ParDo.of(fn));
PCollection<String> output = pipeline.apply(transform).apply(ParDo.of(fn));
List<String> expected =
IntStream.rangeClosed(0, numTestElements)
.mapToObj(expandFn(numTestElements))
Expand Down Expand Up @@ -4420,25 +4393,16 @@ public void testTwoTimersSettingEachOther() {
TestStream.create(KvCoder.of(VoidCoder.of(), VoidCoder.of()))
.addElements(KV.of(null, null))
.advanceWatermarkToInfinity();
pipeline.apply(TwoTimerTest.of(now, end, input, IsBounded.BOUNDED));
pipeline.run();
}

@Test
@Category({ValidatesRunner.class, UsesTimersInParDo.class, UsesStrictTimerOrdering.class})
public void testTwoTimersSettingEachOtherWithCreateAsInputBounded() {
Instant now = new Instant(1500000000000L);
Instant end = now.plus(100);
pipeline.apply(TwoTimerTest.of(now, end, Create.of(KV.of(null, null)), IsBounded.BOUNDED));
pipeline.apply(TwoTimerTest.of(now, end, input));
pipeline.run();
}

@Test
@Category({ValidatesRunner.class, UsesTimersInParDo.class, UsesStrictTimerOrdering.class})
public void testTwoTimersSettingEachOtherWithCreateAsInputUnbounded() {
public void testTwoTimersSettingEachOtherWithCreateAsInput() {
Instant now = new Instant(1500000000000L);
Instant end = now.plus(100);
pipeline.apply(TwoTimerTest.of(now, end, Create.of(KV.of(null, null)), IsBounded.UNBOUNDED));
pipeline.apply(TwoTimerTest.of(now, end, Create.of(KV.of(null, null))));
pipeline.run();
}

Expand Down Expand Up @@ -4612,26 +4576,18 @@ public void onTimer(
private static class TwoTimerTest extends PTransform<PBegin, PDone> {

private static PTransform<PBegin, PDone> of(
Instant start,
Instant end,
PTransform<PBegin, PCollection<KV<Void, Void>>> input,
IsBounded isBounded) {
return new TwoTimerTest(start, end, input, isBounded);
Instant start, Instant end, PTransform<PBegin, PCollection<KV<Void, Void>>> input) {
return new TwoTimerTest(start, end, input);
}

private final Instant start;
private final Instant end;
private final IsBounded isBounded;
private final transient PTransform<PBegin, PCollection<KV<Void, Void>>> inputPTransform;

public TwoTimerTest(
Instant start,
Instant end,
PTransform<PBegin, PCollection<KV<Void, Void>>> input,
IsBounded isBounded) {
Instant start, Instant end, PTransform<PBegin, PCollection<KV<Void, Void>>> input) {
this.start = start;
this.end = end;
this.isBounded = isBounded;
this.inputPTransform = input;
}

Expand All @@ -4644,7 +4600,6 @@ public PDone expand(PBegin input) {
PCollection<String> result =
input
.apply(inputPTransform)
.setIsBoundedInternal(isBounded)
.apply(
ParDo.of(
new DoFn<KV<Void, Void>, String>() {
Expand Down Expand Up @@ -4709,7 +4664,7 @@ public void onTimer2(
}));

List<String> expected =
LongStream.rangeClosed(0, end.minus(start.getMillis()).getMillis())
LongStream.rangeClosed(0, 100)
.mapToObj(e -> (Long) e)
.flatMap(e -> Arrays.asList("t1:" + e + ":" + e, "t2:" + e + ":" + e).stream())
.collect(Collectors.toList());
Expand Down