diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java index f5895ffdb207..0a5a74b49c65 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java @@ -47,17 +47,25 @@ import org.apache.beam.sdk.coders.MapCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.io.GenerateSequence; +import org.apache.beam.sdk.state.TimeDomain; +import org.apache.beam.sdk.state.Timer; +import org.apache.beam.sdk.state.TimerSpec; +import org.apache.beam.sdk.state.TimerSpecs; import org.apache.beam.sdk.testing.LargeKeys; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.TestStream; import org.apache.beam.sdk.testing.UsesTestStreamWithProcessingTime; +import org.apache.beam.sdk.testing.UsesTimersInParDo; +import org.apache.beam.sdk.testing.UsesUnboundedPCollections; import org.apache.beam.sdk.testing.ValidatesRunner; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.windowing.AfterPane; import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; import org.apache.beam.sdk.transforms.windowing.AfterWatermark; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; @@ -90,7 +98,8 @@ /** Tests for GroupByKey. */ @SuppressWarnings({ "rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556) - "unchecked" + "unchecked", + "unused" }) @RunWith(Enclosed.class) public class GroupByKeyTest implements Serializable { @@ -103,7 +112,7 @@ public abstract static class SharedTestBase { /** Tests validating basic {@link GroupByKey} scenarios. */ @RunWith(JUnit4.class) - public static class BasicTests extends SharedTestBase { + public static class BasicTests extends SharedTestBase implements Serializable { @Test @Category(ValidatesRunner.class) public void testGroupByKey() { @@ -186,6 +195,109 @@ public void testCombiningAccumulatingProcessingTime() throws Exception { p.run(); } + /** + * Tests that data from a processing time trigger flows through subsequent GroupByKey + * transforms. To test this with TestStream, we check that it arrives in an early pane, + * demonstrating that the watermark did not cause the output. + */ + @Test + @Category({ValidatesRunner.class, UsesTestStreamWithProcessingTime.class}) + public void testAfterProcessingTimeContinuationTriggerEarly() throws Exception { + final long triggerMillis = 1; // Setting of the processing time trigger + final long advanceMillis = 10; // How far we advance time for the first GBK to trigger + final long waitMillis = 500; // How far we advance time for the second GBK to trigger + + PCollection triggeredSums = + p.apply( + TestStream.create(VarIntCoder.of()) + .advanceWatermarkTo(new Instant(0)) + .addElements(42) + .advanceProcessingTime(Duration.millis(advanceMillis)) + .advanceProcessingTime(Duration.millis(waitMillis)) + .advanceWatermarkToInfinity()) + .apply( + Window.configure() + .triggering( + Repeatedly.forever( + AfterProcessingTime.pastFirstElementInPane() + .plusDelayOf(Duration.millis(triggerMillis)))) + .accumulatingFiredPanes() + .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_IF_NON_EMPTY) + .withAllowedLateness(Duration.millis(0))) + .apply("Triggered sum", Sum.integersGlobally().withoutDefaults()) + .apply("Second Triggered sum", Sum.integersGlobally().withoutDefaults()); + + PAssert.that(triggeredSums).inEarlyGlobalWindowPanes().containsInAnyOrder(42); + + p.run(); + } + + /** + * Tests that data from a processing time trigger flows through subsequent GroupByKey + * transforms. This version does not depend on {@link TestStream} with processing time, because + * many runners do not support it. + * + *

Since a delay in processing time is fundamental to this test, it unfortunately must have a + * real-time sleep. Currently the sleep is sub-second so it is acceptable delay for a unit test + * suite. + */ + @Test + @Category({ValidatesRunner.class, UsesTimersInParDo.class, UsesUnboundedPCollections.class}) + public void testAfterProcessingTimeContinuationTriggerUsingState() throws Exception { + final long triggerMillis = 1; + final long waitMillis = 500; + + PCollection triggeredSums = + p.apply( + GenerateSequence.from(0) + .to(1)) // forces unbounded so delay cannot be fast-forwarded + .apply(WithKeys.of("dummy key")) + .apply( + "output then delay", + ParDo.of( + new DoFn, Integer>() { + private static final String DELAY_TIMER = "delay"; + + @TimerId(DELAY_TIMER) + private final TimerSpec delayTimerSpec = + TimerSpecs.timer(TimeDomain.EVENT_TIME); + + @ProcessElement + public void process( + @Timestamp Instant timestamp, + BoundedWindow window, + OutputReceiver out, + @TimerId(DELAY_TIMER) Timer delayTimer) { + out.output(42); + + // wait a little bit while stopping downstream from firing the window + delayTimer.set(window.maxTimestamp().minus(Duration.millis(10))); + } + + @OnTimer(DELAY_TIMER) + public void onDelay(@Timestamp Instant timestamp) + throws InterruptedException { + // noop, just here to force the pipeline to sleep so downstream GBK will + // trigger + Thread.sleep(waitMillis); + } + })) + .apply( + Window.configure() + .triggering( + Repeatedly.forever( + AfterProcessingTime.pastFirstElementInPane() + .plusDelayOf(Duration.millis(triggerMillis)))) + .accumulatingFiredPanes() + .withAllowedLateness(Duration.millis(0))) + .apply("Triggered sum", Sum.integersGlobally().withoutDefaults()) + .apply("Second Triggered sum", Sum.integersGlobally().withoutDefaults()); + + PAssert.that(triggeredSums).inEarlyGlobalWindowPanes().containsInAnyOrder(42); + + p.run(); + } + @Test public void testGroupByKeyNonDeterministic() throws Exception {