Skip to content

Commit

Permalink
Add a StatefulDoFn test that sets event time timer within allowed lat…
Browse files Browse the repository at this point in the history
…eness (#16922)
  • Loading branch information
y1chi authored Mar 10, 2022
1 parent 0d454e8 commit 03af085
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 1 deletion.
4 changes: 3 additions & 1 deletion runners/google-cloud-dataflow-java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,9 @@ task validatesRunnerV2Streaming {

'org.apache.beam.sdk.transforms.ParDoTest$TimerTests.testEventTimeTimerAlignAfterGcTimeUnbounded',

// TODO: remove once dataflow runner v2 fixed fake windowing strategy.
'org.apache.beam.sdk.transforms.ParDoTest$TimerTests.testEventTimeTimerSetWithinAllowedLateness',

'org.apache.beam.sdk.metrics.MetricsTest$CommittedMetricTests.testCommittedCounterMetrics',

'org.apache.beam.sdk.transforms.WaitTest.testWaitWithSameFixedWindows',
Expand Down Expand Up @@ -568,7 +571,6 @@ task validatesRunnerV2Streaming {
// TODO(BEAM-13525)
'org.apache.beam.sdk.transforms.ParDoTest$TimestampTests.testProcessElementSkew',
'org.apache.beam.sdk.transforms.ParDoTest$TimestampTests.testOnWindowTimestampSkew',

// TODO(BEAM-13952)
'org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testAfterProcessingTimeContinuationTriggerUsingState',
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4526,6 +4526,54 @@ public void onTimer(@Timestamp Instant timestamp, OutputReceiver<KV<Long, Instan
pipeline.run();
}

@Test
@Category({ValidatesRunner.class, UsesTimersInParDo.class, UsesTestStream.class})
public void testEventTimeTimerSetWithinAllowedLateness() throws Exception {
final String timerId = "foo";

DoFn<KV<String, Long>, KV<Long, Instant>> fn =
new DoFn<KV<String, Long>, KV<Long, Instant>>() {

@TimerId(timerId)
private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

@ProcessElement
public void processElement(
@TimerId(timerId) Timer timer,
@Timestamp Instant timestamp,
OutputReceiver<KV<Long, Instant>> r) {
timer.set(timestamp.plus(Duration.standardSeconds(10)));
r.output(KV.of(3L, timestamp));
}

@OnTimer(timerId)
public void onTimer(@Timestamp Instant timestamp, OutputReceiver<KV<Long, Instant>> r) {
r.output(KV.of(42L, timestamp));
}
};

TestStream<KV<String, Long>> stream =
TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of()))
.advanceWatermarkTo(new Instant(0).plus(Duration.standardSeconds(5)))
.addElements(KV.of("hello", 37L))
.advanceWatermarkTo(new Instant(0).plus(Duration.standardMinutes(1)))
.advanceWatermarkToInfinity();

PCollection<KV<Long, Instant>> output =
pipeline
.apply(stream)
.apply(
Window.<KV<String, Long>>into(FixedWindows.of(Duration.standardSeconds(10)))
.discardingFiredPanes()
.withAllowedLateness(Duration.standardSeconds(10)))
.apply(ParDo.of(fn));
PAssert.that(output)
.containsInAnyOrder(
KV.of(3L, new Instant(0).plus(Duration.standardSeconds(5))),
KV.of(42L, new Instant(0).plus(Duration.standardSeconds(15))));
pipeline.run();
}

@Test
@Category({ValidatesRunner.class, UsesTimersInParDo.class, UsesTestStream.class})
public void testEventTimeTimerAlignAfterGcTimeUnbounded() throws Exception {
Expand Down

0 comments on commit 03af085

Please sign in to comment.