Skip to content

Commit

Permalink
This closes #4057: [BEAM-3035] Extract ReifyTimestampsAndWindows
Browse files Browse the repository at this point in the history
  • Loading branch information
jkff committed Nov 16, 2017
2 parents 40100db + 7966b75 commit 0df7ba9
Show file tree
Hide file tree
Showing 9 changed files with 447 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,7 @@
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
Expand Down Expand Up @@ -128,9 +125,7 @@ public void testWindowedAutoComplete() {
TimestampedValue.of("xB", new Instant(2)),
TimestampedValue.of("xB", new Instant(2)));

PCollection<String> input = p
.apply(Create.of(words))
.apply(new ReifyTimestamps<String>());
PCollection<String> input = p.apply(Create.timestamped(words));

PCollection<KV<String, List<CompletionCandidate>>> output =
input.apply(Window.<String>into(SlidingWindows.of(new Duration(2))))
Expand Down Expand Up @@ -161,17 +156,4 @@ private static List<CompletionCandidate> parseList(String... entries) {
}
return all;
}

private static class ReifyTimestamps<T>
extends PTransform<PCollection<TimestampedValue<T>>, PCollection<T>> {
@Override
public PCollection<T> expand(PCollection<TimestampedValue<T>> input) {
return input.apply(ParDo.of(new DoFn<TimestampedValue<T>, T>() {
@ProcessElement
public void processElement(ProcessContext c) {
c.outputWithTimestamp(c.element().getValue(), c.element().getTimestamp());
}
}));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,11 @@
*/
package org.apache.beam.sdk.testing;

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reify;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.Never;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
Expand Down Expand Up @@ -60,10 +58,7 @@ public PCollection<Iterable<ValueInSingleWindow<T>>> expand(PCollection<T> input
WindowFn<?, ?> originalWindowFn = input.getWindowingStrategy().getWindowFn();

return input
.apply(ParDo.of(new ReifyTimestampsAndWindowsFn<T>()))
.setCoder(
ValueInSingleWindow.Coder.of(
input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()))
.apply(Reify.<T>windows())
.apply(
WithKeys.<Integer, ValueInSingleWindow<T>>of(0)
.withKeyType(new TypeDescriptor<Integer>() {}))
Expand All @@ -80,10 +75,4 @@ public PCollection<Iterable<ValueInSingleWindow<T>>> expand(PCollection<T> input
.setWindowingStrategyInternal(input.getWindowingStrategy());
}

private static class ReifyTimestampsAndWindowsFn<T> extends DoFn<T, ValueInSingleWindow<T>> {
@DoFn.ProcessElement
public void processElement(ProcessContext c, BoundedWindow window) {
c.output(ValueInSingleWindow.of(c.element(), c.timestamp(), window, c.pane()));
}
}
}
192 changes: 192 additions & 0 deletions sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reify.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.beam.sdk.transforms;

import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TimestampedValue.TimestampedValueCoder;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.joda.time.Duration;

/** {@link PTransform PTransforms} for reifying the timestamp, window and pane of values. */
public class Reify {
/** Private implementation of {@link #windows()}. */
private static class Window<T>
extends PTransform<PCollection<T>, PCollection<ValueInSingleWindow<T>>> {
@Override
public PCollection<ValueInSingleWindow<T>> expand(PCollection<T> input) {
return input
.apply(
ParDo.of(
new DoFn<T, ValueInSingleWindow<T>>() {
@ProcessElement
public void processElement(ProcessContext c, BoundedWindow window) {
c.outputWithTimestamp(
ValueInSingleWindow.of(c.element(), c.timestamp(), window, c.pane()),
c.timestamp());
}
}))
.setCoder(
ValueInSingleWindow.Coder.of(
input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()));
}
}

private static class Timestamp<T>
extends PTransform<PCollection<T>, PCollection<TimestampedValue<T>>> {
@Override
public PCollection<TimestampedValue<T>> expand(PCollection<T> input) {
return input
.apply(
ParDo.of(
new DoFn<T, TimestampedValue<T>>() {
@ProcessElement
public void processElement(ProcessContext context) {
context.output(TimestampedValue.of(context.element(), context.timestamp()));
}
}))
.setCoder(TimestampedValueCoder.of(input.getCoder()));
}
}

private static class WindowInValue<K, V>
extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, ValueInSingleWindow<V>>>> {
@Override
public PCollection<KV<K, ValueInSingleWindow<V>>> expand(PCollection<KV<K, V>> input) {
KvCoder<K, V> coder = (KvCoder<K, V>) input.getCoder();
return input
.apply(
ParDo.of(
new DoFn<KV<K, V>, KV<K, ValueInSingleWindow<V>>>() {
@ProcessElement
public void processElement(ProcessContext c, BoundedWindow window) {
c.output(
KV.of(
c.element().getKey(),
ValueInSingleWindow.of(
c.element().getValue(), c.timestamp(), window, c.pane())));
}
}))
.setCoder(
KvCoder.of(
coder.getKeyCoder(),
ValueInSingleWindow.Coder.of(
coder.getValueCoder(),
input.getWindowingStrategy().getWindowFn().windowCoder())));
}
}

private static class TimestampInValue<K, V>
extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, TimestampedValue<V>>>> {
@Override
public PCollection<KV<K, TimestampedValue<V>>> expand(PCollection<KV<K, V>> input) {
KvCoder<K, V> coder = (KvCoder<K, V>) input.getCoder();
return input
.apply(
ParDo.of(
new DoFn<KV<K, V>, KV<K, TimestampedValue<V>>>() {
@ProcessElement
public void processElement(ProcessContext context) {
context.output(
KV.of(
context.element().getKey(),
TimestampedValue.of(
context.element().getValue(), context.timestamp())));
}
}))
.setCoder(
KvCoder.of(coder.getKeyCoder(), TimestampedValueCoder.of(coder.getValueCoder())));
}
}

private static class ExtractTimestampsFromValues<K, V>
extends PTransform<PCollection<KV<K, TimestampedValue<V>>>, PCollection<KV<K, V>>> {
@Override
public PCollection<KV<K, V>> expand(PCollection<KV<K, TimestampedValue<V>>> input) {
KvCoder<K, TimestampedValue<V>> kvCoder = (KvCoder<K, TimestampedValue<V>>) input.getCoder();
TimestampedValueCoder<V> tvCoder = (TimestampedValueCoder<V>) kvCoder.getValueCoder();
return input
.apply(
ParDo.of(
new DoFn<KV<K, TimestampedValue<V>>, KV<K, V>>() {
@Override
public Duration getAllowedTimestampSkew() {
return Duration.millis(Long.MAX_VALUE);
}

@ProcessElement
public void processElement(ProcessContext context) {
KV<K, TimestampedValue<V>> kv = context.element();
context.outputWithTimestamp(
KV.of(kv.getKey(), kv.getValue().getValue()),
kv.getValue().getTimestamp());
}
}))
.setCoder(KvCoder.of(kvCoder.getKeyCoder(), tvCoder.getValueCoder()));
}
}

private Reify() {}

/**
* Create a {@link PTransform} that will output all inputs wrapped in a {@link TimestampedValue}.
*/
public static <T> PTransform<PCollection<T>, PCollection<TimestampedValue<T>>> timestamps() {
return new Timestamp<>();
}

/**
* Create a {@link PTransform} that will output all input {@link KV KVs} with the timestamp inside
* the value.
*/
public static <K, V>
PTransform<PCollection<KV<K, V>>, PCollection<KV<K, TimestampedValue<V>>>>
timestampsInValue() {
return new TimestampInValue<>();
}

/**
* Create a {@link PTransform} that will reify information from the processing context into
* instances of {@link ValueInSingleWindow}.
*
* @param <T> element type
*/
public static <T> PTransform<PCollection<T>, PCollection<ValueInSingleWindow<T>>> windows() {
return new Window<>();
}

/**
* Create a {@link PTransform} that will output all input {@link KV KVs} with the window pane info
* inside the value.
*/
public static <K, V>
PTransform<PCollection<KV<K, V>>, PCollection<KV<K, ValueInSingleWindow<V>>>>
windowsInValue() {
return new WindowInValue<>();
}

public static <K, V>
PTransform<PCollection<KV<K, TimestampedValue<V>>>, PCollection<KV<K, V>>>
extractTimestampsFromValues() {
return new ExtractTimestampsFromValues<>();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,59 +21,74 @@
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;

/**
* {@link PTransform PTransforms} for reifying the timestamp of values and reemitting the original
* value with the original timestamp.
*
* @deprecated Use {@link Reify}
*/
@Deprecated
class ReifyTimestamps {
private ReifyTimestamps() {}

/**
* Create a {@link PTransform} that will output all input {@link KV KVs} with the timestamp inside
* the value.
*
* @deprecated Use {@link Reify#timestampsInValue()}
*/
@Deprecated
public static <K, V>
PTransform<PCollection<? extends KV<K, V>>, PCollection<KV<K, TimestampedValue<V>>>>
inValues() {
return ParDo.of(new ReifyValueTimestampDoFn<K, V>());
return new InValues<>();
}

/**
* Create a {@link PTransform} that consumes {@link KV KVs} with a {@link TimestampedValue} as the
* value, and outputs the {@link KV} of the input key and value at the timestamp specified by the
* {@link TimestampedValue}.
*
* @deprecated Use {@link Reify#extractTimestampsFromValues()}.
*/
@Deprecated
public static <K, V>
PTransform<PCollection<? extends KV<K, TimestampedValue<V>>>, PCollection<KV<K, V>>>
extractFromValues() {
return ParDo.of(new ExtractTimestampedValueDoFn<K, V>());
return new ExtractTimestampsFromValues<>();
}

private static class ReifyValueTimestampDoFn<K, V>
extends DoFn<KV<K, V>, KV<K, TimestampedValue<V>>> {
@ProcessElement
public void processElement(ProcessContext context) {
context.output(
KV.of(
context.element().getKey(),
TimestampedValue.of(context.element().getValue(), context.timestamp())));
private static class RemoveWildcard<T>
extends PTransform<PCollection<? extends T>, PCollection<T>> {
@Override
public PCollection<T> expand(PCollection<? extends T> input) {
return input.apply(
ParDo.of(
new DoFn<T, T>() {
@ProcessElement
public void process(ProcessContext c) {
c.output(c.element());
}
}));
}
}

private static class ExtractTimestampedValueDoFn<K, V>
extends DoFn<KV<K, TimestampedValue<V>>, KV<K, V>> {
private static class InValues<K, V>
extends PTransform<PCollection<? extends KV<K, V>>, PCollection<KV<K, TimestampedValue<V>>>> {
@Override
public Duration getAllowedTimestampSkew() {
return Duration.millis(Long.MAX_VALUE);
public PCollection<KV<K, TimestampedValue<V>>> expand(PCollection<? extends KV<K, V>> input) {
return input.apply(new RemoveWildcard<KV<K, V>>()).apply(Reify.<K, V>timestampsInValue());
}
}

@ProcessElement
public void processElement(ProcessContext context) {
KV<K, TimestampedValue<V>> kv = context.element();
context.outputWithTimestamp(
KV.of(kv.getKey(), kv.getValue().getValue()), kv.getValue().getTimestamp());
private static class ExtractTimestampsFromValues<K, V>
extends PTransform<PCollection<? extends KV<K, TimestampedValue<V>>>, PCollection<KV<K, V>>> {
@Override
public PCollection<KV<K, V>> expand(PCollection<? extends KV<K, TimestampedValue<V>>> input) {
return input
.apply(new RemoveWildcard<KV<K, TimestampedValue<V>>>())
.apply(Reify.<K, V>extractTimestampsFromValues());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public PCollection<KV<K, V>> expand(PCollection<KV<K, V>> input) {

return input
.apply(rewindow)
.apply("ReifyOriginalTimestamps", ReifyTimestamps.<K, V>inValues())
.apply("ReifyOriginalTimestamps", Reify.<K, V>timestampsInValue())
.apply(GroupByKey.<K, TimestampedValue<V>>create())
// Set the windowing strategy directly, so that it doesn't get counted as the user having
// set allowed lateness.
Expand Down
Loading

0 comments on commit 0df7ba9

Please sign in to comment.