Skip to content

Commit

Permalink
Add support for global sequence processing to the "ordered" extension…
Browse files Browse the repository at this point in the history
… in Java SDK (#32540)

* Initial changes to support processing global sequences.

* Refactor the DoFns out of the transform and into a class hierarchy.

* Next round of implementation of Global Sequence handling.

* Added ticker timers in global sequence processing.

* Corrected the emission batch logic.

* Reworked some tests and fixed the batch output logic.

* Pluggable combiner for the global sequence.

* First iteration of the efficient merging accumulator

* Mostly complete implementation of the accumulator and corresponding tests.

* Additional round of test refinements.

* Added logic to DQL the records below the global sequence range.

* Added providing a global sequence combiner through a handler.

* Added SequenceRangeAccumulatorCoder and tests. Improved logic of creating timers.

* Fixed logging levels (moved them to "trace") on several transforms.

* Round of code improvements and cleanups.

* Tests to verify that the the global sequence is correctly produced by the transform.

* Added batch processing verification to the global sequence processing.

* A round of documentation update and minor clean up.

* Fixed the description in CHANGES.md

* Polish by "spotless"

* Polish by "spotless"

* Removed unneeded logging configuration file.

* Made ContiguousSequenceRange open ended.

* Removed details from 2.60.0 section in CHANGES.md.

* Update sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/combiner/DefaultSequenceCombiner.java

Co-authored-by: Danny McCormick <[email protected]>

* Fixed spotless related errors.

* Added a note about the new functionality to CHANGES.md

* Added clarification around the data structure used in the sequence combiner.

* Added clarification around the data structure used in the sequence combiner.

* Fixed the problem with allowed lateness being set to 0 in the global sequence tracker.

* Parameterized the GlobalSequenceTracker with the max number of events to trigger the re-evaluation. Fixed accidentally disabled unit tests.

* Made the event timer used to wait for the event arrival respect the lateness of the input.

* Created new failure reason code - "before initial sequence"

---------

Co-authored-by: Danny McCormick <[email protected]>
  • Loading branch information
slilichenko and damccorm authored Oct 9, 2024
1 parent 7177baf commit 20d0f6e
Show file tree
Hide file tree
Showing 25 changed files with 3,639 additions and 831 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
## New Features / Improvements

* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* Added support for processing events which use a global sequence to "ordered" extension (Java) [#32540](https://github.com/apache/beam/pull/32540)

## Breaking Changes

Expand Down
6 changes: 6 additions & 0 deletions sdks/java/extensions/ordered/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ dependencies {
implementation library.java.vendored_guava_32_1_2_jre
testImplementation library.java.junit
testImplementation library.java.hamcrest
testImplementation library.java.slf4j_jdk14
testImplementation project(path: ':sdks:java:core')
testImplementation 'junit:junit:4.13.1'
testImplementation project(path: ':runners:google-cloud-dataflow-java')
testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
testImplementation project(path: ":runners:google-cloud-dataflow-java")
testImplementation project(path: ":sdks:java:extensions:google-cloud-platform-core")
testImplementation project(path: ":sdks:java:io:google-cloud-platform")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.ordered;

import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.joda.time.Instant;

/** A range of contiguous event sequences and the latest timestamp of the events in the range. */
@AutoValue
public abstract class ContiguousSequenceRange {
public static final ContiguousSequenceRange EMPTY =
ContiguousSequenceRange.of(
Long.MIN_VALUE, Long.MIN_VALUE, Instant.ofEpochMilli(Long.MIN_VALUE));

/** @return inclusive starting sequence */
public abstract long getStart();

/** @return exclusive end sequence */
public abstract long getEnd();

/** @return latest timestamp of all events in the range */
public abstract Instant getTimestamp();

public static ContiguousSequenceRange of(long start, long end, Instant timestamp) {
return new AutoValue_ContiguousSequenceRange(start, end, timestamp);
}

static class CompletedSequenceRangeCoder extends CustomCoder<ContiguousSequenceRange> {

private static final CompletedSequenceRangeCoder INSTANCE = new CompletedSequenceRangeCoder();

static CompletedSequenceRangeCoder of() {
return INSTANCE;
}

private CompletedSequenceRangeCoder() {}

@Override
public void encode(
ContiguousSequenceRange value, @UnknownKeyFor @NonNull @Initialized OutputStream outStream)
throws @UnknownKeyFor @NonNull @Initialized CoderException, @UnknownKeyFor @NonNull
@Initialized IOException {
VarLongCoder.of().encode(value.getStart(), outStream);
VarLongCoder.of().encode(value.getEnd(), outStream);
InstantCoder.of().encode(value.getTimestamp(), outStream);
}

@Override
public ContiguousSequenceRange decode(@UnknownKeyFor @NonNull @Initialized InputStream inStream)
throws @UnknownKeyFor @NonNull @Initialized CoderException, @UnknownKeyFor @NonNull
@Initialized IOException {
long start = VarLongCoder.of().decode(inStream);
long end = VarLongCoder.of().decode(inStream);
Instant timestamp = InstantCoder.of().decode(inStream);
return ContiguousSequenceRange.of(start, end, timestamp);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ public interface EventExaminer<EventT, StateT extends MutableState<EventT, ?>>
extends Serializable {

/**
* Is this event the first expected event for the given key and window?
* Is this event the first expected event for the given key and window if the per key sequence is
* used? In case of global sequence it determines the first global sequence event.
*
* @param sequenceNumber the sequence number of the event as defined by the key of the input
* PCollection to {@link OrderedEventProcessor}
Expand All @@ -41,8 +42,8 @@ public interface EventExaminer<EventT, StateT extends MutableState<EventT, ?>>
boolean isInitialEvent(long sequenceNumber, EventT event);

/**
* If the event was the first event in the sequence, create the state to hold the required data
* needed for processing. This data will be persisted.
* If the event was the first event for a given key, create the state to hold the required data
* needed for processing. This data will be persisted in a Beam state.
*
* @param event the first event in the sequence.
* @return the state to persist.
Expand All @@ -53,6 +54,8 @@ public interface EventExaminer<EventT, StateT extends MutableState<EventT, ?>>
/**
* Is this event the last expected event for a given key and window?
*
* <p>Note, this method is not used yet with global sequences.
*
* @param sequenceNumber of the event
* @param event being processed
* @return true if the last event. There are cases where it's impossible to know whether it's the
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.ordered;

import org.apache.beam.sdk.extensions.ordered.ContiguousSequenceRange.CompletedSequenceRangeCoder;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.AfterFirst;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TimestampedValue;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;

/**
* PTransform to produce the side input of the maximum contiguous range of sequence numbers.
*
* @param <EventKeyT> type of event key
* @param <EventT> type of event
* @param <ResultT> type of processing result
* @param <StateT> type of state
*/
class GlobalSequenceTracker<
EventKeyT, EventT, ResultT, StateT extends MutableState<EventT, ResultT>>
extends PTransform<
PCollection<TimestampedValue<KV<EventKeyT, KV<Long, EventT>>>>,
PCollectionView<ContiguousSequenceRange>> {

private final Combine.GloballyAsSingletonView<
TimestampedValue<KV<EventKeyT, KV<Long, EventT>>>, ContiguousSequenceRange>
sideInputProducer;
private final @Nullable Duration frequencyOfGeneration;
private final int maxElementsBeforeReevaluatingGlobalSequence;

/**
* Constructor used in batch pipelines.
*
* @param sideInputProducer
*/
public GlobalSequenceTracker(
Combine.GloballyAsSingletonView<
TimestampedValue<KV<EventKeyT, KV<Long, EventT>>>, ContiguousSequenceRange>
sideInputProducer) {
this.sideInputProducer = sideInputProducer;
this.frequencyOfGeneration = null;
this.maxElementsBeforeReevaluatingGlobalSequence = 0;
}

public GlobalSequenceTracker(
Combine.GloballyAsSingletonView<
TimestampedValue<KV<EventKeyT, KV<Long, EventT>>>, ContiguousSequenceRange>
sideInputProducer,
Duration globalSequenceGenerationFrequency,
int maxElementsBeforeReevaluatingGlobalSequence) {
this.sideInputProducer = sideInputProducer;
this.frequencyOfGeneration = globalSequenceGenerationFrequency;
this.maxElementsBeforeReevaluatingGlobalSequence = maxElementsBeforeReevaluatingGlobalSequence;
}

@Override
public PCollectionView<ContiguousSequenceRange> expand(
PCollection<TimestampedValue<KV<EventKeyT, KV<Long, EventT>>>> input) {
input
.getPipeline()
.getCoderRegistry()
.registerCoderForClass(ContiguousSequenceRange.class, CompletedSequenceRangeCoder.of());

if (frequencyOfGeneration != null) {
// This branch will only be executed in case of streaming pipelines.
// For batch pipelines the side input should only be computed once.
input =
input.apply(
"Triggering Setup",
// Reproduce the windowing of the input PCollection, but change the triggering
// in order to create a slowing changing side input
Window.<TimestampedValue<KV<EventKeyT, KV<Long, EventT>>>>into(
(WindowFn<? super TimestampedValue<KV<EventKeyT, KV<Long, EventT>>>, ?>)
input.getWindowingStrategy().getWindowFn())
.accumulatingFiredPanes()
.withAllowedLateness(input.getWindowingStrategy().getAllowedLateness())
.triggering(
Repeatedly.forever(
AfterFirst.of(
AfterPane.elementCountAtLeast(
maxElementsBeforeReevaluatingGlobalSequence),
AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(frequencyOfGeneration)))));
}
return input.apply("Create Side Input", sideInputProducer);
}
}
Loading

0 comments on commit 20d0f6e

Please sign in to comment.