Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add tests for processing time continuation trigger #16840

Merged
merged 1 commit into from
Feb 15, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,25 @@
import org.apache.beam.sdk.coders.MapCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.testing.LargeKeys;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.testing.UsesTestStreamWithProcessingTime;
import org.apache.beam.sdk.testing.UsesTimersInParDo;
import org.apache.beam.sdk.testing.UsesUnboundedPCollections;
import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
Expand Down Expand Up @@ -90,7 +98,8 @@
/** Tests for GroupByKey. */
@SuppressWarnings({
"rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
"unchecked"
"unchecked",
"unused"
})
@RunWith(Enclosed.class)
public class GroupByKeyTest implements Serializable {
Expand All @@ -103,7 +112,7 @@ public abstract static class SharedTestBase {

/** Tests validating basic {@link GroupByKey} scenarios. */
@RunWith(JUnit4.class)
public static class BasicTests extends SharedTestBase {
public static class BasicTests extends SharedTestBase implements Serializable {
@Test
@Category(ValidatesRunner.class)
public void testGroupByKey() {
Expand Down Expand Up @@ -186,6 +195,109 @@ public void testCombiningAccumulatingProcessingTime() throws Exception {
p.run();
}

/**
* Tests that data from a processing time trigger flows through subsequent GroupByKey
* transforms. To test this with TestStream, we check that it arrives in an early pane,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see where you check it is in the early pane in the test. If it's there maybe a comment would help.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In fact I had pushed this with it commented out. The PAssert itself only inspects early panes now.

* demonstrating that the watermark did not cause the output.
*/
@Test
@Category({ValidatesRunner.class, UsesTestStreamWithProcessingTime.class})
public void testAfterProcessingTimeContinuationTriggerEarly() throws Exception {
  // Delay configured on the processing time trigger.
  final Duration triggerDelay = Duration.millis(1);
  // Processing time advance intended to let the first GBK trigger fire.
  final Duration firstAdvance = Duration.millis(10);
  // Additional processing time advance intended to let the second GBK trigger fire.
  final Duration secondAdvance = Duration.millis(500);

  // A single element, followed by two processing time advances, and only then the final
  // watermark advance.
  TestStream<Integer> singleElementStream =
      TestStream.create(VarIntCoder.of())
          .advanceWatermarkTo(new Instant(0))
          .addElements(42)
          .advanceProcessingTime(firstAdvance)
          .advanceProcessingTime(secondAdvance)
          .advanceWatermarkToInfinity();

  // Repeated processing time trigger with accumulating panes and zero allowed lateness.
  Window<Integer> processingTimeTriggered =
      Window.<Integer>configure()
          .triggering(
              Repeatedly.forever(
                  AfterProcessingTime.pastFirstElementInPane().plusDelayOf(triggerDelay)))
          .accumulatingFiredPanes()
          .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_IF_NON_EMPTY)
          .withAllowedLateness(Duration.millis(0));

  // Two consecutive global sums, so the triggered output has to pass through a second
  // aggregation.
  PCollection<Integer> triggeredSums =
      p.apply(singleElementStream)
          .apply(processingTimeTriggered)
          .apply("Triggered sum", Sum.integersGlobally().withoutDefaults())
          .apply("Second Triggered sum", Sum.integersGlobally().withoutDefaults());

  // Only early panes of the global window are inspected by this assertion.
  PAssert.that(triggeredSums).inEarlyGlobalWindowPanes().containsInAnyOrder(42);

  p.run();
}

/**
 * Verifies that output produced by a processing time trigger keeps flowing through a subsequent
 * GroupByKey transform. Unlike the {@link TestStream}-based variant, this test does not require
 * processing time support in {@link TestStream}, which many runners do not provide.
 *
 * <p>A processing time delay is inherent to what is being tested, so the pipeline performs a
 * real-time sleep inside a timer callback. The sleep is currently sub-second, which keeps it an
 * acceptable delay for a unit test suite.
 */
@Test
@Category({ValidatesRunner.class, UsesTimersInParDo.class, UsesUnboundedPCollections.class})
public void testAfterProcessingTimeContinuationTriggerUsingState() throws Exception {
  final long triggerMillis = 1;
  final long waitMillis = 500;

  // Emits 42 for each input element, then arms a timer whose callback sleeps in real time.
  DoFn<KV<String, Long>, Integer> outputThenDelayFn =
      new DoFn<KV<String, Long>, Integer>() {
        private static final String DELAY_TIMER = "delay";

        @TimerId(DELAY_TIMER)
        private final TimerSpec delayTimerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

        @ProcessElement
        public void process(
            @Timestamp Instant timestamp,
            BoundedWindow window,
            OutputReceiver<Integer> out,
            @TimerId(DELAY_TIMER) Timer delayTimer) {
          out.output(42);

          // Arm the timer 10ms before the window's max timestamp. Per the original note, the
          // intent is to wait a little while keeping downstream from firing the window.
          Instant fireAt = window.maxTimestamp().minus(Duration.millis(10));
          delayTimer.set(fireAt);
        }

        @OnTimer(DELAY_TIMER)
        public void onDelay(@Timestamp Instant timestamp) throws InterruptedException {
          // Produces no output; exists only to make the pipeline sleep so that the
          // downstream GBK will trigger.
          Thread.sleep(waitMillis);
        }
      };

  // Repeated processing time trigger with accumulating panes and zero allowed lateness.
  Window<Integer> processingTimeTriggered =
      Window.<Integer>configure()
          .triggering(
              Repeatedly.forever(
                  AfterProcessingTime.pastFirstElementInPane()
                      .plusDelayOf(Duration.millis(triggerMillis))))
          .accumulatingFiredPanes()
          .withAllowedLateness(Duration.millis(0));

  PCollection<Integer> triggeredSums =
      // Original author's note: forces unbounded so delay cannot be fast-forwarded.
      // NOTE(review): confirm that from(0).to(1) alone yields an unbounded source.
      p.apply(GenerateSequence.from(0).to(1))
          .apply(WithKeys.of("dummy key"))
          .apply("output then delay", ParDo.of(outputThenDelayFn))
          .apply(processingTimeTriggered)
          .apply("Triggered sum", Sum.integersGlobally().withoutDefaults())
          .apply("Second Triggered sum", Sum.integersGlobally().withoutDefaults());

  // Only early panes of the global window are inspected by this assertion.
  PAssert.that(triggeredSums).inEarlyGlobalWindowPanes().containsInAnyOrder(42);

  p.run();
}

@Test
public void testGroupByKeyNonDeterministic() throws Exception {

Expand Down