Skip to content

Commit

Permalink
Merge pull request #16840: Add tests for processing time continuation…
Browse files Browse the repository at this point in the history
… trigger
  • Loading branch information
kennknowles authored Feb 15, 2022
2 parents c558e85 + cad8515 commit 150e311
Showing 1 changed file with 114 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,25 @@
import org.apache.beam.sdk.coders.MapCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.testing.LargeKeys;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.testing.UsesTestStreamWithProcessingTime;
import org.apache.beam.sdk.testing.UsesTimersInParDo;
import org.apache.beam.sdk.testing.UsesUnboundedPCollections;
import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
Expand Down Expand Up @@ -90,7 +98,8 @@
/** Tests for GroupByKey. */
@SuppressWarnings({
"rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
"unchecked"
"unchecked",
"unused"
})
@RunWith(Enclosed.class)
public class GroupByKeyTest implements Serializable {
Expand All @@ -103,7 +112,7 @@ public abstract static class SharedTestBase {

/** Tests validating basic {@link GroupByKey} scenarios. */
@RunWith(JUnit4.class)
public static class BasicTests extends SharedTestBase {
public static class BasicTests extends SharedTestBase implements Serializable {
@Test
@Category(ValidatesRunner.class)
public void testGroupByKey() {
Expand Down Expand Up @@ -186,6 +195,109 @@ public void testCombiningAccumulatingProcessingTime() throws Exception {
p.run();
}

/**
* Tests that data from a processing time trigger flows through subsequent GroupByKey
* transforms. To test this with TestStream, we check that it arrives in an early pane,
* demonstrating that the watermark did not cause the output.
*/
@Test
@Category({ValidatesRunner.class, UsesTestStreamWithProcessingTime.class})
public void testAfterProcessingTimeContinuationTriggerEarly() throws Exception {
final long triggerMillis = 1; // Setting of the processing time trigger
final long advanceMillis = 10; // How far we advance time for the first GBK to trigger
final long waitMillis = 500; // How far we advance time for the second GBK to trigger

PCollection<Integer> triggeredSums =
p.apply(
TestStream.create(VarIntCoder.of())
.advanceWatermarkTo(new Instant(0))
.addElements(42)
.advanceProcessingTime(Duration.millis(advanceMillis))
.advanceProcessingTime(Duration.millis(waitMillis))
.advanceWatermarkToInfinity())
.apply(
Window.<Integer>configure()
.triggering(
Repeatedly.forever(
AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(Duration.millis(triggerMillis))))
.accumulatingFiredPanes()
.withOnTimeBehavior(Window.OnTimeBehavior.FIRE_IF_NON_EMPTY)
.withAllowedLateness(Duration.millis(0)))
.apply("Triggered sum", Sum.integersGlobally().withoutDefaults())
.apply("Second Triggered sum", Sum.integersGlobally().withoutDefaults());

PAssert.that(triggeredSums).inEarlyGlobalWindowPanes().containsInAnyOrder(42);

p.run();
}

/**
* Tests that data from a processing time trigger flows through subsequent GroupByKey
* transforms. This version does not depend on {@link TestStream} with processing time, because
* many runners do not support it.
*
* <p>Since a delay in processing time is fundamental to this test, it unfortunately must have a
* real-time sleep. Currently the sleep is sub-second so it is acceptable delay for a unit test
* suite.
*/
@Test
@Category({ValidatesRunner.class, UsesTimersInParDo.class, UsesUnboundedPCollections.class})
public void testAfterProcessingTimeContinuationTriggerUsingState() throws Exception {
final long triggerMillis = 1;
final long waitMillis = 500;

PCollection<Integer> triggeredSums =
p.apply(
GenerateSequence.from(0)
.to(1)) // forces unbounded so delay cannot be fast-forwarded
.apply(WithKeys.of("dummy key"))
.apply(
"output then delay",
ParDo.of(
new DoFn<KV<String, Long>, Integer>() {
private static final String DELAY_TIMER = "delay";

@TimerId(DELAY_TIMER)
private final TimerSpec delayTimerSpec =
TimerSpecs.timer(TimeDomain.EVENT_TIME);

@ProcessElement
public void process(
@Timestamp Instant timestamp,
BoundedWindow window,
OutputReceiver<Integer> out,
@TimerId(DELAY_TIMER) Timer delayTimer) {
out.output(42);

// wait a little bit while stopping downstream from firing the window
delayTimer.set(window.maxTimestamp().minus(Duration.millis(10)));
}

@OnTimer(DELAY_TIMER)
public void onDelay(@Timestamp Instant timestamp)
throws InterruptedException {
// noop, just here to force the pipeline to sleep so downstream GBK will
// trigger
Thread.sleep(waitMillis);
}
}))
.apply(
Window.<Integer>configure()
.triggering(
Repeatedly.forever(
AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(Duration.millis(triggerMillis))))
.accumulatingFiredPanes()
.withAllowedLateness(Duration.millis(0)))
.apply("Triggered sum", Sum.integersGlobally().withoutDefaults())
.apply("Second Triggered sum", Sum.integersGlobally().withoutDefaults());

PAssert.that(triggeredSums).inEarlyGlobalWindowPanes().containsInAnyOrder(42);

p.run();
}

@Test
public void testGroupByKeyNonDeterministic() throws Exception {

Expand Down

0 comments on commit 150e311

Please sign in to comment.