From 7966b759a15a2ea266e6db0d3f884c835d0bf628 Mon Sep 17 00:00:00 2001 From: "wtanaka.com" Date: Sat, 7 Oct 2017 08:45:45 -1000 Subject: [PATCH] [BEAM-3035] Introduces Reify transform Initially contains stuff from ReifyTimestamps, for reifying/extracting timestamps and windows. --- .../examples/complete/AutoCompleteTest.java | 20 +- .../beam/sdk/testing/GatherAllPanes.java | 15 +- .../org/apache/beam/sdk/transforms/Reify.java | 192 ++++++++++++++++ .../beam/sdk/transforms/ReifyTimestamps.java | 55 +++-- .../apache/beam/sdk/transforms/Reshuffle.java | 2 +- .../apache/beam/sdk/transforms/ReifyTest.java | 212 ++++++++++++++++++ .../beam/sdk/transforms/ReshuffleTest.java | 4 +- .../sdk/transforms/SplittableDoFnTest.java | 9 +- .../apache/beam/sdk/transforms/WatchTest.java | 2 +- 9 files changed, 447 insertions(+), 64 deletions(-) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reify.java create mode 100644 sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReifyTest.java diff --git a/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java b/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java index ef57da48c3e6..900d96667cfd 100644 --- a/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java +++ b/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java @@ -27,10 +27,7 @@ import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Filter; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.windowing.SlidingWindows; import org.apache.beam.sdk.transforms.windowing.Window; @@ -128,9 +125,7 @@ public void testWindowedAutoComplete() { TimestampedValue.of("xB", new Instant(2)), TimestampedValue.of("xB", new Instant(2))); - PCollection input = p - .apply(Create.of(words)) - .apply(new ReifyTimestamps()); + PCollection input = p.apply(Create.timestamped(words)); PCollection>> output = input.apply(Window.into(SlidingWindows.of(new Duration(2)))) @@ -161,17 +156,4 @@ private static List parseList(String... entries) { } return all; } - - private static class ReifyTimestamps - extends PTransform>, PCollection> { - @Override - public PCollection expand(PCollection> input) { - return input.apply(ParDo.of(new DoFn, T>() { - @ProcessElement - public void processElement(ProcessContext c) { - c.outputWithTimestamp(c.element().getValue(), c.element().getTimestamp()); - } - })); - } - } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/GatherAllPanes.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/GatherAllPanes.java index 6b24d95d1791..979e979d3e62 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/GatherAllPanes.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/GatherAllPanes.java @@ -17,13 +17,11 @@ */ package org.apache.beam.sdk.testing; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Reify; import org.apache.beam.sdk.transforms.Values; import org.apache.beam.sdk.transforms.WithKeys; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.Never; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.transforms.windowing.WindowFn; @@ -60,10 +58,7 @@ public PCollection>> expand(PCollection input WindowFn originalWindowFn = input.getWindowingStrategy().getWindowFn(); return input - .apply(ParDo.of(new ReifyTimestampsAndWindowsFn())) - .setCoder( - ValueInSingleWindow.Coder.of( - input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder())) + .apply(Reify.windows()) .apply( WithKeys.>of(0) .withKeyType(new TypeDescriptor() {})) @@ -80,10 +75,4 @@ public PCollection>> expand(PCollection input .setWindowingStrategyInternal(input.getWindowingStrategy()); } - private static class ReifyTimestampsAndWindowsFn extends DoFn> { - @DoFn.ProcessElement - public void processElement(ProcessContext c, BoundedWindow window) { - c.output(ValueInSingleWindow.of(c.element(), c.timestamp(), window, c.pane())); - } - } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reify.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reify.java new file mode 100644 index 000000000000..caa89e6e55e6 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reify.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.transforms; + +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TimestampedValue; +import org.apache.beam.sdk.values.TimestampedValue.TimestampedValueCoder; +import org.apache.beam.sdk.values.ValueInSingleWindow; +import org.joda.time.Duration; + +/** {@link PTransform PTransforms} for reifying the timestamp, window and pane of values. */ +public class Reify { + /** Private implementation of {@link #windows()}. */ + private static class Window + extends PTransform, PCollection>> { + @Override + public PCollection> expand(PCollection input) { + return input + .apply( + ParDo.of( + new DoFn>() { + @ProcessElement + public void processElement(ProcessContext c, BoundedWindow window) { + c.outputWithTimestamp( + ValueInSingleWindow.of(c.element(), c.timestamp(), window, c.pane()), + c.timestamp()); + } + })) + .setCoder( + ValueInSingleWindow.Coder.of( + input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder())); + } + } + + private static class Timestamp + extends PTransform, PCollection>> { + @Override + public PCollection> expand(PCollection input) { + return input + .apply( + ParDo.of( + new DoFn>() { + @ProcessElement + public void processElement(ProcessContext context) { + context.output(TimestampedValue.of(context.element(), context.timestamp())); + } + })) + .setCoder(TimestampedValueCoder.of(input.getCoder())); + } + } + + private static class WindowInValue + extends PTransform>, PCollection>>> { + @Override + public PCollection>> expand(PCollection> input) { + KvCoder coder = (KvCoder) input.getCoder(); + return input + .apply( + ParDo.of( + new DoFn, KV>>() { + @ProcessElement + public void processElement(ProcessContext c, BoundedWindow window) { + c.output( + KV.of( + c.element().getKey(), + ValueInSingleWindow.of( + c.element().getValue(), c.timestamp(), window, c.pane()))); + } + })) + .setCoder( + KvCoder.of( + coder.getKeyCoder(), + ValueInSingleWindow.Coder.of( + coder.getValueCoder(), + input.getWindowingStrategy().getWindowFn().windowCoder()))); + } + } + + private static class TimestampInValue + extends PTransform>, PCollection>>> { + @Override + public PCollection>> expand(PCollection> input) { + KvCoder coder = (KvCoder) input.getCoder(); + return input + .apply( + ParDo.of( + new DoFn, KV>>() { + @ProcessElement + public void processElement(ProcessContext context) { + context.output( + KV.of( + context.element().getKey(), + TimestampedValue.of( + context.element().getValue(), context.timestamp()))); + } + })) + .setCoder( + KvCoder.of(coder.getKeyCoder(), TimestampedValueCoder.of(coder.getValueCoder()))); + } + } + + private static class ExtractTimestampsFromValues + extends PTransform>>, PCollection>> { + @Override + public PCollection> expand(PCollection>> input) { + KvCoder> kvCoder = (KvCoder>) input.getCoder(); + TimestampedValueCoder tvCoder = (TimestampedValueCoder) kvCoder.getValueCoder(); + return input + .apply( + ParDo.of( + new DoFn>, KV>() { + @Override + public Duration getAllowedTimestampSkew() { + return Duration.millis(Long.MAX_VALUE); + } + + @ProcessElement + public void processElement(ProcessContext context) { + KV> kv = context.element(); + context.outputWithTimestamp( + KV.of(kv.getKey(), kv.getValue().getValue()), + kv.getValue().getTimestamp()); + } + })) + .setCoder(KvCoder.of(kvCoder.getKeyCoder(), tvCoder.getValueCoder())); + } + } + + private Reify() {} + + /** + * Create a {@link PTransform} that will output all inputs wrapped in a {@link TimestampedValue}. + */ + public static PTransform, PCollection>> timestamps() { + return new Timestamp<>(); + } + + /** + * Create a {@link PTransform} that will output all input {@link KV KVs} with the timestamp inside + * the value. + */ + public static + PTransform>, PCollection>>> + timestampsInValue() { + return new TimestampInValue<>(); + } + + /** + * Create a {@link PTransform} that will reify information from the processing context into + * instances of {@link ValueInSingleWindow}. + * + * @param element type + */ + public static PTransform, PCollection>> windows() { + return new Window<>(); + } + + /** + * Create a {@link PTransform} that will output all input {@link KV KVs} with the window pane info + * inside the value. + */ + public static + PTransform>, PCollection>>> + windowsInValue() { + return new WindowInValue<>(); + } + + public static + PTransform>>, PCollection>> + extractTimestampsFromValues() { + return new ExtractTimestampsFromValues<>(); + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ReifyTimestamps.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ReifyTimestamps.java index 990f235456f4..583dc386d377 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ReifyTimestamps.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ReifyTimestamps.java @@ -21,59 +21,74 @@ import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TimestampedValue; -import org.joda.time.Duration; /** * {@link PTransform PTransforms} for reifying the timestamp of values and reemitting the original * value with the original timestamp. + * + * @deprecated Use {@link Reify} */ +@Deprecated class ReifyTimestamps { private ReifyTimestamps() {} /** * Create a {@link PTransform} that will output all input {@link KV KVs} with the timestamp inside * the value. + * + * @deprecated Use {@link Reify#timestampsInValue()} */ + @Deprecated public static PTransform>, PCollection>>> inValues() { - return ParDo.of(new ReifyValueTimestampDoFn()); + return new InValues<>(); } /** * Create a {@link PTransform} that consumes {@link KV KVs} with a {@link TimestampedValue} as the * value, and outputs the {@link KV} of the input key and value at the timestamp specified by the * {@link TimestampedValue}. + * + * @deprecated Use {@link Reify#extractTimestampsFromValues()}. */ + @Deprecated public static PTransform>>, PCollection>> extractFromValues() { - return ParDo.of(new ExtractTimestampedValueDoFn()); + return new ExtractTimestampsFromValues<>(); } - private static class ReifyValueTimestampDoFn - extends DoFn, KV>> { - @ProcessElement - public void processElement(ProcessContext context) { - context.output( - KV.of( - context.element().getKey(), - TimestampedValue.of(context.element().getValue(), context.timestamp()))); + private static class RemoveWildcard + extends PTransform, PCollection> { + @Override + public PCollection expand(PCollection input) { + return input.apply( + ParDo.of( + new DoFn() { + @ProcessElement + public void process(ProcessContext c) { + c.output(c.element()); + } + })); } } - private static class ExtractTimestampedValueDoFn - extends DoFn>, KV> { + private static class InValues + extends PTransform>, PCollection>>> { @Override - public Duration getAllowedTimestampSkew() { - return Duration.millis(Long.MAX_VALUE); + public PCollection>> expand(PCollection> input) { + return input.apply(new RemoveWildcard>()).apply(Reify.timestampsInValue()); } + } - @ProcessElement - public void processElement(ProcessContext context) { - KV> kv = context.element(); - context.outputWithTimestamp( - KV.of(kv.getKey(), kv.getValue().getValue()), kv.getValue().getTimestamp()); + private static class ExtractTimestampsFromValues + extends PTransform>>, PCollection>> { + @Override + public PCollection> expand(PCollection>> input) { + return input + .apply(new RemoveWildcard>>()) + .apply(Reify.extractTimestampsFromValues()); } } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java index 68e4560b2850..8920559f6397 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java @@ -84,7 +84,7 @@ public PCollection> expand(PCollection> input) { return input .apply(rewindow) - .apply("ReifyOriginalTimestamps", ReifyTimestamps.inValues()) + .apply("ReifyOriginalTimestamps", Reify.timestampsInValue()) .apply(GroupByKey.>create()) // Set the windowing strategy directly, so that it doesn't get counted as the user having // set allowed lateness. diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReifyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReifyTest.java new file mode 100644 index 000000000000..9e5ce9f0e930 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReifyTest.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms; + +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +import java.io.Serializable; + +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.testing.NeedsRunner; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.TestStream; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TimestampedValue; +import org.apache.beam.sdk.values.ValueInSingleWindow; +import org.joda.time.Instant; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Utility transforms for reifying implicit context into explicit fields. */ +@RunWith(JUnit4.class) +public class ReifyTest implements Serializable { + public static final WithTimestamps> TIMESTAMP_FROM_V = + WithTimestamps.of( + new SerializableFunction, Instant>() { + @Override + public Instant apply(KV input) { + return new Instant(input.getValue().longValue()); + } + }); + @Rule public transient TestPipeline pipeline = TestPipeline.create(); + + @Test + @Category(NeedsRunner.class) + public void extractFromValuesSucceeds() { + PCollection>> preified = + pipeline.apply( + Create.of( + KV.of("foo", TimestampedValue.of(0, new Instant((0)))), + KV.of("foo", TimestampedValue.of(1, new Instant(1))), + KV.of("bar", TimestampedValue.of(2, new Instant(2))), + KV.of("baz", TimestampedValue.of(3, new Instant(3))))); + + PCollection> timestamped = + preified.apply(Reify.extractTimestampsFromValues()); + + PAssert.that(timestamped) + .containsInAnyOrder(KV.of("foo", 0), KV.of("foo", 1), KV.of("bar", 2), KV.of("baz", 3)); + + timestamped.apply( + "AssertElementTimestamps", + ParDo.of( + new DoFn, Void>() { + @ProcessElement + public void verifyTimestampsEqualValue(ProcessContext context) { + assertThat( + new Instant(context.element().getValue().longValue()), + equalTo(context.timestamp())); + } + })); + + pipeline.run(); + } + + @Test + @Category(NeedsRunner.class) + public void extractFromValuesWhenValueTimestampedLaterSucceeds() { + PCollection>> preified = + pipeline.apply( + Create.timestamped( + TimestampedValue.of( + KV.of("foo", TimestampedValue.of(0, new Instant((0)))), new Instant(100)), + TimestampedValue.of( + KV.of("foo", TimestampedValue.of(1, new Instant(1))), new Instant(101L)), + TimestampedValue.of( + KV.of("bar", TimestampedValue.of(2, new Instant(2))), new Instant(102L)), + TimestampedValue.of( + KV.of("baz", TimestampedValue.of(3, new Instant(3))), new Instant(103L)))); + + PCollection> timestamped = + preified.apply(ReifyTimestamps.extractFromValues()); + + PAssert.that(timestamped) + .containsInAnyOrder(KV.of("foo", 0), KV.of("foo", 1), KV.of("bar", 2), KV.of("baz", 3)); + + timestamped.apply( + "AssertElementTimestamps", + ParDo.of( + new DoFn, Void>() { + @ProcessElement + public void verifyTimestampsEqualValue(ProcessContext context) { + assertThat( + new Instant(context.element().getValue().longValue()), + equalTo(context.timestamp())); + } + })); + + pipeline.run(); + } + + @Test + @Category(NeedsRunner.class) + public void globalWindowNoKeys() { + PCollection> result = + pipeline + .apply( + TestStream.create(StringUtf8Coder.of()) + .addElements(TimestampedValue.of("dei", new Instant(123L))) + .advanceWatermarkToInfinity()) + .apply(Reify.windows()); + PAssert.that(result) + .containsInAnyOrder( + ValueInSingleWindow.of( + "dei", new Instant(123L), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING)); + pipeline.run(); + } + + @Test + @Category(NeedsRunner.class) + public void timestampedValuesSucceeds() { + PCollection> timestamped = + pipeline + .apply(Create.of(KV.of("foo", 0), KV.of("foo", 1), KV.of("bar", 2), KV.of("baz", 3))) + .apply(TIMESTAMP_FROM_V); + + PCollection>> reified = + timestamped.apply(Reify.timestampsInValue()); + + PAssert.that(reified) + .containsInAnyOrder( + KV.of("foo", TimestampedValue.of(0, new Instant(0))), + KV.of("foo", TimestampedValue.of(1, new Instant(1))), + KV.of("bar", TimestampedValue.of(2, new Instant(2))), + KV.of("baz", TimestampedValue.of(3, new Instant(3)))); + + pipeline.run(); + } + + @Test + @Category(NeedsRunner.class) + public void timestampsSucceeds() { + PCollection timestamped = + pipeline.apply( + Create.timestamped( + TimestampedValue.of("foo", new Instant(0L)), + TimestampedValue.of("bar", new Instant(1L)))); + + PCollection> reified = timestamped.apply(Reify.timestamps()); + + PAssert.that(reified) + .containsInAnyOrder( + TimestampedValue.of("foo", new Instant(0)), TimestampedValue.of("bar", new Instant(1))); + + pipeline.run(); + } + + @Test + @Category(NeedsRunner.class) + public void windowsInValueSucceeds() { + PCollection> timestamped = + pipeline + .apply(Create.of(KV.of("foo", 0), KV.of("foo", 1), KV.of("bar", 2), KV.of("baz", 3))) + .apply(TIMESTAMP_FROM_V); + + PCollection>> reified = + timestamped.apply(Reify.windowsInValue()); + + PAssert.that(reified) + .containsInAnyOrder( + KV.of( + "foo", + ValueInSingleWindow.of( + 0, new Instant(0), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING)), + KV.of( + "foo", + ValueInSingleWindow.of( + 1, new Instant(1), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING)), + KV.of( + "bar", + ValueInSingleWindow.of( + 2, new Instant(2), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING)), + KV.of( + "baz", + ValueInSingleWindow.of( + 3, new Instant(3), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING))); + + pipeline.run(); + } +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReshuffleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReshuffleTest.java index 0eb8e2d87a58..12eddf2e5f64 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReshuffleTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReshuffleTest.java @@ -131,7 +131,7 @@ public String apply(String input) { return input; } })) - .apply("ReifyOriginalTimestamps", ReifyTimestamps.inValues()); + .apply("ReifyOriginalTimestamps", Reify.timestampsInValue()); // The outer TimestampedValue is the reified timestamp post-reshuffle. The inner // TimestampedValue is the pre-reshuffle timestamp. @@ -140,7 +140,7 @@ public String apply(String input) { .apply(Reshuffle.>of()) .apply( "ReifyReshuffledTimestamps", - ReifyTimestamps.>inValues()) + Reify.>timestampsInValue()) .apply(Values.>>create()); PAssert.that(output) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java index d2d2529871f5..f70dfbbdb608 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java @@ -98,13 +98,6 @@ public void splitRange( } } - private static class ReifyTimestampsFn extends DoFn> { - @ProcessElement - public void process(ProcessContext c) { - c.output(TimestampedValue.of(c.element(), c.timestamp())); - } - } - private static PipelineOptions streamingTestPipelineOptions() { // Using testing options with streaming=true makes it possible to enable UsesSplittableParDo // tests in Dataflow runner, because as of writing, it can run Splittable DoFn only in @@ -176,7 +169,7 @@ public void testPairWithIndexWindowedTimestamped() { assertEquals(windowFn, res.getWindowingStrategy().getWindowFn()); PCollection>> timestamped = - res.apply("Reify timestamps", ParDo.of(new ReifyTimestampsFn>())); + res.apply(Reify.>timestamps()); for (int i = 0; i < 4; ++i) { Instant base = now.minus(Duration.standardSeconds(i)); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WatchTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WatchTest.java index 113e8fe1a5fc..89043766cdae 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WatchTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WatchTest.java @@ -272,7 +272,7 @@ public void testMultiplePollsWithManyResults() { standardSeconds(30) /* timeToFail */)) .withPollInterval(Duration.millis(500)) .withOutputCoder(VarIntCoder.of())) - .apply(ReifyTimestamps.inValues()) + .apply(Reify.timestampsInValue()) .apply("Drop timestamped input", Values.>create()); PAssert.that(res)