From 1e76244bf359e33011a87bdac2afc2de8ce2a2a4 Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Fri, 26 Feb 2016 17:28:37 -0800 Subject: [PATCH] Implement InProcessEvaluationContext This is the primary "global state" object for the evaluation of a Pipeline using the InProcessPipelineRunner, and is responsible for properly routing information about the state of the pipeline to transform evaluators. Remove the InProcessEvaluationContext from the InProcessPipelineRunner class, and implement as a class directly. Fix associated imports. --- .../BoundedReadEvaluatorFactory.java | 1 - .../sdk/runners/inprocess/EvaluatorKey.java | 1 - .../inprocess/FlattenEvaluatorFactory.java | 1 - .../inprocess/GroupByKeyEvaluatorFactory.java | 1 - .../inprocess/InMemoryWatermarkManager.java | 14 +- .../runners/inprocess/InProcessBundle.java | 20 +- .../inprocess/InProcessEvaluationContext.java | 364 +++++++++++++++ .../inprocess/InProcessPipelineOptions.java | 7 +- .../inprocess/InProcessPipelineRunner.java | 106 +---- .../InProcessSideInputContainer.java | 71 ++- .../inprocess/ParDoMultiEvaluatorFactory.java | 1 - .../ParDoSingleEvaluatorFactory.java | 1 - .../sdk/runners/inprocess/StepAndKey.java | 68 +++ .../inprocess/TransformEvaluatorFactory.java | 1 - .../inprocess/TransformEvaluatorRegistry.java | 72 +++ .../UnboundedReadEvaluatorFactory.java | 1 - .../inprocess/ViewEvaluatorFactory.java | 1 - .../inprocess/WatermarkCallbackExecutor.java | 143 ++++++ .../BoundedReadEvaluatorFactoryTest.java | 2 +- .../FlattenEvaluatorFactoryTest.java | 1 - .../GroupByKeyEvaluatorFactoryTest.java | 1 - .../InMemoryWatermarkManagerTest.java | 12 + .../InProcessEvaluationContextTest.java | 436 ++++++++++++++++++ .../InProcessSideInputContainerTest.java | 92 ++-- .../ParDoMultiEvaluatorFactoryTest.java | 1 - .../ParDoSingleEvaluatorFactoryTest.java | 1 - .../UnboundedReadEvaluatorFactoryTest.java | 1 - .../inprocess/ViewEvaluatorFactoryTest.java | 1 - .../WatermarkCallbackExecutorTest.java | 126 +++++ 29 files changed, 1372 insertions(+), 176 deletions(-) create mode 100644 sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java create mode 100644 sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/StepAndKey.java create mode 100644 sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformEvaluatorRegistry.java create mode 100644 sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/WatermarkCallbackExecutor.java create mode 100644 sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java create mode 100644 sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/WatermarkCallbackExecutorTest.java diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java index 1c0279897aac..2a164c3518d8 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java @@ -18,7 +18,6 @@ import com.google.cloud.dataflow.sdk.io.Read.Bounded; import com.google.cloud.dataflow.sdk.io.Source.Reader; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; -import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; import com.google.cloud.dataflow.sdk.transforms.PTransform; diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EvaluatorKey.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EvaluatorKey.java index 745f8f2718a3..307bc5cdb552 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EvaluatorKey.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EvaluatorKey.java @@ -15,7 +15,6 @@ */ package com.google.cloud.dataflow.sdk.runners.inprocess; -import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext; import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; import java.util.Objects; diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactory.java index 14428888e2b5..bde1df45e9b4 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactory.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactory.java @@ -16,7 +16,6 @@ package com.google.cloud.dataflow.sdk.runners.inprocess; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; -import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; import com.google.cloud.dataflow.sdk.transforms.Flatten; diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java index 0347281749cb..ec63be84c9f1 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java @@ -22,7 +22,6 @@ import com.google.cloud.dataflow.sdk.coders.IterableCoder; import com.google.cloud.dataflow.sdk.coders.KvCoder; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; -import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; import com.google.cloud.dataflow.sdk.runners.inprocess.StepTransformResult.Builder; import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java index e280e22d2bb9..7cf53aafe6d2 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java @@ -1209,8 +1209,11 @@ public TimerUpdateBuilder deletedTimer(TimerData deletedTimer) { * and deletedTimers. */ public TimerUpdate build() { - return new TimerUpdate(key, ImmutableSet.copyOf(completedTimers), - ImmutableSet.copyOf(setTimers), ImmutableSet.copyOf(deletedTimers)); + return new TimerUpdate( + key, + ImmutableSet.copyOf(completedTimers), + ImmutableSet.copyOf(setTimers), + ImmutableSet.copyOf(deletedTimers)); } } @@ -1245,6 +1248,13 @@ Iterable getDeletedTimers() { return deletedTimers; } + /** + * Returns a {@link TimerUpdate} that is like this one, but with the specified completed timers. + */ + public TimerUpdate withCompletedTimers(Iterable completedTimers) { + return new TimerUpdate(this.key, completedTimers, setTimers, deletedTimers); + } + @Override public int hashCode() { return Objects.hash(key, completedTimers, setTimers, deletedTimers); diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundle.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundle.java index cc20161097e8..112ba17d1442 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundle.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundle.java @@ -1,5 +1,5 @@ /* - * Copyright (C) 2015 Google Inc. + * Copyright (C) 2016 Google Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. You may obtain a copy of @@ -22,7 +22,6 @@ import com.google.cloud.dataflow.sdk.util.WindowedValue; import com.google.cloud.dataflow.sdk.values.PCollection; import com.google.common.base.MoreObjects; -import com.google.common.base.MoreObjects.ToStringHelper; import com.google.common.collect.ImmutableList; import org.joda.time.Instant; @@ -64,6 +63,11 @@ private InProcessBundle(PCollection pcollection, boolean keyed, Object key) { this.elements = ImmutableList.builder(); } + @Override + public PCollection getPCollection() { + return pcollection; + } + @Override public InProcessBundle add(WindowedValue element) { checkState(!committed, "Can't add element %s to committed bundle %s", element, this); @@ -105,12 +109,12 @@ public Instant getSynchronizedProcessingOutputWatermark() { @Override public String toString() { - ToStringHelper toStringHelper = - MoreObjects.toStringHelper(this).add("pcollection", pcollection); - if (keyed) { - toStringHelper = toStringHelper.add("key", key); - } - return toStringHelper.add("elements", elements).toString(); + return MoreObjects.toStringHelper(this) + .omitNullValues() + .add("pcollection", pcollection) + .add("key", key) + .add("elements", committedElements) + .toString(); } }; } diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java new file mode 100644 index 000000000000..757e9e11d9f9 --- /dev/null +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java @@ -0,0 +1,364 @@ +/* + * Copyright (C) 2016 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.dataflow.sdk.runners.inprocess; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.runners.inprocess.GroupByKeyEvaluatorFactory.InProcessGroupByKeyOnly; +import com.google.cloud.dataflow.sdk.runners.inprocess.InMemoryWatermarkManager.FiredTimers; +import com.google.cloud.dataflow.sdk.runners.inprocess.InMemoryWatermarkManager.TransformWatermarks; +import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; +import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.PCollectionViewWriter; +import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; +import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; +import com.google.cloud.dataflow.sdk.transforms.PTransform; +import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; +import com.google.cloud.dataflow.sdk.transforms.windowing.Trigger; +import com.google.cloud.dataflow.sdk.util.ExecutionContext; +import com.google.cloud.dataflow.sdk.util.SideInputReader; +import com.google.cloud.dataflow.sdk.util.TimerInternals.TimerData; +import com.google.cloud.dataflow.sdk.util.WindowedValue; +import com.google.cloud.dataflow.sdk.util.WindowingStrategy; +import com.google.cloud.dataflow.sdk.util.common.CounterSet; +import com.google.cloud.dataflow.sdk.util.state.CopyOnAccessInMemoryStateInternals; +import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.cloud.dataflow.sdk.values.PCollectionView; +import com.google.cloud.dataflow.sdk.values.PValue; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import javax.annotation.Nullable; + +/** + * The evaluation context for a specific pipeline being executed by the + * {@link InProcessPipelineRunner}. Contains state shared within the execution across all + * transforms. + * + *

{@link InProcessEvaluationContext} contains shared state for an execution of the + * {@link InProcessPipelineRunner} that can be used while evaluating a {@link PTransform}. This + * consists of views into underlying state and watermark implementations, access to read and write + * {@link PCollectionView PCollectionViews}, and constructing {@link CounterSet CounterSets} and + * {@link ExecutionContext ExecutionContexts}. This includes executing callbacks asynchronously when + * state changes to the appropriate point (e.g. when a {@link PCollectionView} is requested and + * known to be empty). + * + *

{@link InProcessEvaluationContext} also handles results by committing finalizing bundles based + * on the current global state and updating the global state appropriately. This includes updating + * the per-{@link StepAndKey} state, updating global watermarks, and executing any callbacks that + * can be executed. + */ +class InProcessEvaluationContext { + /** The step name for each {@link AppliedPTransform} in the {@link Pipeline}. */ + private final Map, String> stepNames; + + /** The options that were used to create this {@link Pipeline}. */ + private final InProcessPipelineOptions options; + + /** The current processing time and event time watermarks and timers. */ + private final InMemoryWatermarkManager watermarkManager; + + /** Executes callbacks based on the progression of the watermark. */ + private final WatermarkCallbackExecutor callbackExecutor; + + /** The stateInternals of the world, by applied PTransform and key. */ + private final ConcurrentMap> + applicationStateInternals; + + private final InProcessSideInputContainer sideInputContainer; + + private final CounterSet mergedCounters; + + public static InProcessEvaluationContext create( + InProcessPipelineOptions options, + Collection> rootTransforms, + Map>> valueToConsumers, + Map, String> stepNames, + Collection> views) { + return new InProcessEvaluationContext( + options, rootTransforms, valueToConsumers, stepNames, views); + } + + private InProcessEvaluationContext( + InProcessPipelineOptions options, + Collection> rootTransforms, + Map>> valueToConsumers, + Map, String> stepNames, + Collection> views) { + this.options = checkNotNull(options); + checkNotNull(rootTransforms); + checkNotNull(valueToConsumers); + checkNotNull(stepNames); + checkNotNull(views); + this.stepNames = stepNames; + + this.watermarkManager = + InMemoryWatermarkManager.create( + NanosOffsetClock.create(), rootTransforms, valueToConsumers); + this.sideInputContainer = InProcessSideInputContainer.create(this, views); + + this.applicationStateInternals = new ConcurrentHashMap<>(); + this.mergedCounters = new CounterSet(); + + this.callbackExecutor = WatermarkCallbackExecutor.create(); + } + + /** + * Handle the provided {@link InProcessTransformResult}, produced after evaluating the provided + * {@link CommittedBundle} (potentially null, if the result of a root {@link PTransform}). + * + *

The result is the output of running the transform contained in the + * {@link InProcessTransformResult} on the contents of the provided bundle. + * + * @param completedBundle the bundle that was processed to produce the result. Potentially + * {@code null} if the transform that produced the result is a root + * transform + * @param completedTimers the timers that were delivered to produce the {@code completedBundle}, + * or an empty iterable if no timers were delivered + * @param result the result of evaluating the input bundle + * @return the committed bundles contained within the handled {@code result} + */ + public synchronized Iterable> handleResult( + @Nullable CommittedBundle completedBundle, + Iterable completedTimers, + InProcessTransformResult result) { + Iterable> committedBundles = + commitBundles(result.getOutputBundles()); + // Update watermarks and timers + watermarkManager.updateWatermarks( + completedBundle, + result.getTransform(), + result.getTimerUpdate().withCompletedTimers(completedTimers), + committedBundles, + result.getWatermarkHold()); + fireAllAvailableCallbacks(); + // Update counters + if (result.getCounters() != null) { + mergedCounters.merge(result.getCounters()); + } + // Update state internals + CopyOnAccessInMemoryStateInternals theirState = result.getState(); + if (theirState != null) { + CopyOnAccessInMemoryStateInternals committedState = theirState.commit(); + StepAndKey stepAndKey = + StepAndKey.of( + result.getTransform(), completedBundle == null ? null : completedBundle.getKey()); + if (!committedState.isEmpty()) { + applicationStateInternals.put(stepAndKey, committedState); + } else { + applicationStateInternals.remove(stepAndKey); + } + } + return committedBundles; + } + + private Iterable> commitBundles( + Iterable> bundles) { + ImmutableList.Builder> completed = ImmutableList.builder(); + for (UncommittedBundle inProgress : bundles) { + AppliedPTransform producing = + inProgress.getPCollection().getProducingTransformInternal(); + TransformWatermarks watermarks = watermarkManager.getWatermarks(producing); + CommittedBundle committed = + inProgress.commit(watermarks.getSynchronizedProcessingOutputTime()); + // Empty bundles don't impact watermarks and shouldn't trigger downstream execution, so + // filter them out + if (!Iterables.isEmpty(committed.getElements())) { + completed.add(committed); + } + } + return completed.build(); + } + + private void fireAllAvailableCallbacks() { + for (AppliedPTransform transform : stepNames.keySet()) { + fireAvailableCallbacks(transform); + } + } + + private void fireAvailableCallbacks(AppliedPTransform producingTransform) { + TransformWatermarks watermarks = watermarkManager.getWatermarks(producingTransform); + callbackExecutor.fireForWatermark(producingTransform, watermarks.getOutputWatermark()); + } + + /** + * Create a {@link UncommittedBundle} for use by a source. + */ + public UncommittedBundle createRootBundle(PCollection output) { + return InProcessBundle.unkeyed(output); + } + + /** + * Create a {@link UncommittedBundle} whose elements belong to the specified {@link + * PCollection}. + */ + public UncommittedBundle createBundle(CommittedBundle input, PCollection output) { + return input.isKeyed() + ? InProcessBundle.keyed(output, input.getKey()) + : InProcessBundle.unkeyed(output); + } + + /** + * Create a {@link UncommittedBundle} with the specified keys at the specified step. For use by + * {@link InProcessGroupByKeyOnly} {@link PTransform PTransforms}. + */ + public UncommittedBundle createKeyedBundle( + CommittedBundle input, Object key, PCollection output) { + return InProcessBundle.keyed(output, key); + } + + /** + * Create a {@link PCollectionViewWriter}, whose elements will be used in the provided + * {@link PCollectionView}. + */ + public PCollectionViewWriter createPCollectionViewWriter( + PCollection> input, final PCollectionView output) { + return new PCollectionViewWriter() { + @Override + public void add(Iterable> values) { + sideInputContainer.write(output, values); + } + }; + } + + /** + * Schedule a callback to be executed after output would be produced for the given window + * if there had been input. + * + *

Output would be produced when the watermark for a {@link PValue} passes the point at + * which the trigger for the specified window (with the specified windowing strategy) must have + * fired from the perspective of that {@link PValue}, as specified by the value of + * {@link Trigger#getWatermarkThatGuaranteesFiring(BoundedWindow)} for the trigger of the + * {@link WindowingStrategy}. When the callback has fired, either values will have been produced + * for a key in that window, the window is empty, or all elements in the window are late. The + * callback will be executed regardless of whether values have been produced. + */ + public void scheduleAfterOutputWouldBeProduced( + PValue value, + BoundedWindow window, + WindowingStrategy windowingStrategy, + Runnable runnable) { + AppliedPTransform producing = getProducing(value); + callbackExecutor.callOnGuaranteedFiring(producing, window, windowingStrategy, runnable); + + fireAvailableCallbacks(lookupProducing(value)); + } + + private AppliedPTransform getProducing(PValue value) { + if (value.getProducingTransformInternal() != null) { + return value.getProducingTransformInternal(); + } + return lookupProducing(value); + } + + private AppliedPTransform lookupProducing(PValue value) { + for (AppliedPTransform transform : stepNames.keySet()) { + if (transform.getOutput().equals(value) || transform.getOutput().expand().contains(value)) { + return transform; + } + } + return null; + } + + /** + * Get the options used by this {@link Pipeline}. + */ + public InProcessPipelineOptions getPipelineOptions() { + return options; + } + + /** + * Get an {@link ExecutionContext} for the provided {@link AppliedPTransform} and key. + */ + public InProcessExecutionContext getExecutionContext( + AppliedPTransform application, Object key) { + StepAndKey stepAndKey = StepAndKey.of(application, key); + return new InProcessExecutionContext( + options.getClock(), + key, + (CopyOnAccessInMemoryStateInternals) applicationStateInternals.get(stepAndKey), + watermarkManager.getWatermarks(application)); + } + + /** + * Get all of the steps used in this {@link Pipeline}. + */ + public Collection> getSteps() { + return stepNames.keySet(); + } + + /** + * Get the Step Name for the provided application. + */ + public String getStepName(AppliedPTransform application) { + return stepNames.get(application); + } + + /** + * Returns a {@link SideInputReader} capable of reading the provided + * {@link PCollectionView PCollectionViews}. + * @param sideInputs the {@link PCollectionView PCollectionViews} the result should be able to + * read + * @return a {@link SideInputReader} that can read all of the provided + * {@link PCollectionView PCollectionViews} + */ + public SideInputReader createSideInputReader(final List> sideInputs) { + return sideInputContainer.createReaderForViews(sideInputs); + } + + /** + * Create a {@link CounterSet} for this {@link Pipeline}. The {@link CounterSet} is independent + * of all other {@link CounterSet CounterSets} created by this call. + * + * The {@link InProcessEvaluationContext} is responsible for unifying the counters present in + * all created {@link CounterSet CounterSets} when the transforms that call this method + * complete. + */ + public CounterSet createCounterSet() { + return new CounterSet(); + } + + /** + * Returns all of the counters that have been merged into this context via calls to + * {@link CounterSet#merge(CounterSet)}. + */ + public CounterSet getCounters() { + return mergedCounters; + } + + /** + * Extracts all timers that have been fired and have not already been extracted. + * + *

This is a destructive operation. Timers will only appear in the result of this method once + * for each time they are set. + */ + public Map, Map> extractFiredTimers() { + return watermarkManager.extractFiredTimers(); + } + + /** + * Returns true if all steps are done. + */ + public boolean isDone() { + return watermarkManager.isDone(); + } +} diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineOptions.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineOptions.java index d659d962f0e5..60c8543a2f20 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineOptions.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineOptions.java @@ -15,10 +15,15 @@ */ package com.google.cloud.dataflow.sdk.runners.inprocess; +import com.google.cloud.dataflow.sdk.options.Default; import com.google.cloud.dataflow.sdk.options.PipelineOptions; /** * Options that can be used to configure the {@link InProcessPipelineRunner}. */ -public interface InProcessPipelineOptions extends PipelineOptions {} +public interface InProcessPipelineOptions extends PipelineOptions { + @Default.InstanceFactory(NanosOffsetClock.Factory.class) + Clock getClock(); + void setClock(Clock clock); +} diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java index 124de46b9476..7a268ee5fa62 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java @@ -17,31 +17,22 @@ import static com.google.common.base.Preconditions.checkArgument; -import com.google.cloud.dataflow.sdk.Pipeline; import com.google.cloud.dataflow.sdk.annotations.Experimental; +import com.google.cloud.dataflow.sdk.options.PipelineOptions; import com.google.cloud.dataflow.sdk.runners.inprocess.GroupByKeyEvaluatorFactory.InProcessGroupByKey; import com.google.cloud.dataflow.sdk.runners.inprocess.ViewEvaluatorFactory.InProcessCreatePCollectionView; import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; import com.google.cloud.dataflow.sdk.transforms.GroupByKey; -import com.google.cloud.dataflow.sdk.transforms.GroupByKey.GroupByKeyOnly; import com.google.cloud.dataflow.sdk.transforms.PTransform; import com.google.cloud.dataflow.sdk.transforms.View.CreatePCollectionView; -import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; -import com.google.cloud.dataflow.sdk.transforms.windowing.Trigger; -import com.google.cloud.dataflow.sdk.util.ExecutionContext; -import com.google.cloud.dataflow.sdk.util.SideInputReader; import com.google.cloud.dataflow.sdk.util.TimerInternals.TimerData; import com.google.cloud.dataflow.sdk.util.WindowedValue; -import com.google.cloud.dataflow.sdk.util.WindowingStrategy; -import com.google.cloud.dataflow.sdk.util.common.CounterSet; import com.google.cloud.dataflow.sdk.values.PCollection; import com.google.cloud.dataflow.sdk.values.PCollectionView; -import com.google.cloud.dataflow.sdk.values.PValue; import com.google.common.collect.ImmutableMap; import org.joda.time.Instant; -import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -82,6 +73,11 @@ public class InProcessPipelineRunner { * @param the type of elements that can be added to this bundle */ public static interface UncommittedBundle { + /** + * Returns the PCollection that the elements of this bundle belong to. + */ + PCollection getPCollection(); + /** * Outputs an element to this bundle. * @@ -110,7 +106,7 @@ public static interface UncommittedBundle { public static interface CommittedBundle { /** - * @return the PCollection that the elements of this bundle belong to + * Returns the PCollection that the elements of this bundle belong to. */ PCollection getPCollection(); @@ -154,84 +150,22 @@ public static interface PCollectionViewWriter { void add(Iterable> values); } - /** - * The evaluation context for the {@link InProcessPipelineRunner}. Contains state shared within - * the current evaluation. - */ - public static interface InProcessEvaluationContext { - /** - * Create a {@link UncommittedBundle} for use by a source. - */ - UncommittedBundle createRootBundle(PCollection output); - - /** - * Create a {@link UncommittedBundle} whose elements belong to the specified {@link - * PCollection}. - */ - UncommittedBundle createBundle(CommittedBundle input, PCollection output); - - /** - * Create a {@link UncommittedBundle} with the specified keys at the specified step. For use by - * {@link GroupByKeyOnly} {@link PTransform PTransforms}. - */ - UncommittedBundle createKeyedBundle( - CommittedBundle input, Object key, PCollection output); - - /** - * Create a bundle whose elements will be used in a PCollectionView. - */ - PCollectionViewWriter createPCollectionViewWriter( - PCollection> input, PCollectionView output); - - /** - * Get the options used by this {@link Pipeline}. - */ - InProcessPipelineOptions getPipelineOptions(); - - /** - * Get an {@link ExecutionContext} for the provided application. - */ - InProcessExecutionContext getExecutionContext( - AppliedPTransform application, @Nullable Object key); - - /** - * Get the Step Name for the provided application. - */ - String getStepName(AppliedPTransform application); - - /** - * @param sideInputs the {@link PCollectionView PCollectionViews} the result should be able to - * read - * @return a {@link SideInputReader} that can read all of the provided - * {@link PCollectionView PCollectionViews} - */ - SideInputReader createSideInputReader(List> sideInputs); + //////////////////////////////////////////////////////////////////////////////////////////////// + private final InProcessPipelineOptions options; - /** - * Schedules a callback after the watermark for a {@link PValue} after the trigger for the - * specified window (with the specified windowing strategy) must have fired from the perspective - * of that {@link PValue}, as specified by the value of - * {@link Trigger#getWatermarkThatGuaranteesFiring(BoundedWindow)} for the trigger of the - * {@link WindowingStrategy}. - */ - void callAfterOutputMustHaveBeenProduced(PValue value, BoundedWindow window, - WindowingStrategy windowingStrategy, Runnable runnable); + public static InProcessPipelineRunner fromOptions(PipelineOptions options) { + return new InProcessPipelineRunner(options.as(InProcessPipelineOptions.class)); + } - /** - * Create a {@link CounterSet} for this {@link Pipeline}. The {@link CounterSet} is independent - * of all other {@link CounterSet CounterSets} created by this call. - * - * The {@link InProcessEvaluationContext} is responsible for unifying the counters present in - * all created {@link CounterSet CounterSets} when the transforms that call this method - * complete. - */ - CounterSet createCounterSet(); + private InProcessPipelineRunner(InProcessPipelineOptions options) { + this.options = options; + } - /** - * Returns all of the counters that have been merged into this context via calls to - * {@link CounterSet#merge(CounterSet)}. - */ - CounterSet getCounters(); + /** + * Returns the {@link PipelineOptions} used to create this {@link InProcessPipelineRunner}. + */ + public InProcessPipelineOptions getPipelineOptions() { + return options; } /** diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessSideInputContainer.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessSideInputContainer.java index bf9a2e1c53fe..37c9fcfa653f 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessSideInputContainer.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessSideInputContainer.java @@ -17,7 +17,6 @@ import static com.google.common.base.Preconditions.checkArgument; -import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext; import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo; import com.google.cloud.dataflow.sdk.util.PCollectionViewWindow; @@ -26,6 +25,7 @@ import com.google.cloud.dataflow.sdk.util.WindowingStrategy; import com.google.cloud.dataflow.sdk.values.PCollectionView; import com.google.common.base.MoreObjects; +import com.google.common.base.Throwables; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; @@ -89,7 +89,7 @@ private InProcessSideInputContainer(InProcessEvaluationContext context, * the provided argument. The returned {@link InProcessSideInputContainer} is unmodifiable without * casting, but will change as this {@link InProcessSideInputContainer} is modified. */ - public SideInputReader withViews(Collection> newContainedViews) { + public SideInputReader createReaderForViews(Collection> newContainedViews) { if (!containedViews.containsAll(newContainedViews)) { Set> currentlyContained = ImmutableSet.copyOf(containedViews); Set> newRequested = ImmutableSet.copyOf(newContainedViews); @@ -108,8 +108,20 @@ public SideInputReader withViews(Collection> newContainedView * *

The provided iterable is expected to contain only a single window and pane. */ - public void write(PCollectionView view, Iterable> values) - throws ExecutionException { + public void write(PCollectionView view, Iterable> values) { + Map>> valuesPerWindow = + indexValuesByWindow(values); + for (Map.Entry>> windowValues : + valuesPerWindow.entrySet()) { + updatePCollectionViewWindowValues(view, windowValues.getKey(), windowValues.getValue()); + } + } + + /** + * Index the provided values by all {@link BoundedWindow windows} in which they appear. + */ + private Map>> indexValuesByWindow( + Iterable> values) { Map>> valuesPerWindow = new HashMap<>(); for (WindowedValue value : values) { for (BoundedWindow window : value.getWindows()) { @@ -121,29 +133,40 @@ public void write(PCollectionView view, Iterable> windowValues.add(value); } } - for (Map.Entry>> windowValues : - valuesPerWindow.entrySet()) { - PCollectionViewWindow windowedView = PCollectionViewWindow.of(view, windowValues.getKey()); - SettableFuture>> future = viewByWindows.get(windowedView); + return valuesPerWindow; + } + + /** + * Set the value of the {@link PCollectionView} in the {@link BoundedWindow} to be based on the + * specified values, if the values are part of a later pane than currently exist within the + * {@link PCollectionViewWindow}. + */ + private void updatePCollectionViewWindowValues( + PCollectionView view, BoundedWindow window, Collection> windowValues) { + PCollectionViewWindow windowedView = PCollectionViewWindow.of(view, window); + SettableFuture>> future = null; + try { + future = viewByWindows.get(windowedView); if (future.isDone()) { - try { - Iterator> existingValues = future.get().iterator(); - PaneInfo newPane = windowValues.getValue().iterator().next().getPane(); - // The current value may have no elements, if no elements were produced for the window, - // but we are recieving late data. - if (!existingValues.hasNext() - || newPane.getIndex() > existingValues.next().getPane().getIndex()) { - viewByWindows.invalidate(windowedView); - viewByWindows.get(windowedView).set(windowValues.getValue()); - } - } catch (InterruptedException e) { - // TODO: Handle meaningfully. This should never really happen when the result remains - // useful, but the result could be available and the thread can still be interrupted. - Thread.currentThread().interrupt(); + Iterator> existingValues = future.get().iterator(); + PaneInfo newPane = windowValues.iterator().next().getPane(); + // The current value may have no elements, if no elements were produced for the window, + // but we are recieving late data. + if (!existingValues.hasNext() + || newPane.getIndex() > existingValues.next().getPane().getIndex()) { + viewByWindows.invalidate(windowedView); + viewByWindows.get(windowedView).set(windowValues); } } else { - future.set(windowValues.getValue()); + future.set(windowValues); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + if (future != null && !future.isDone()) { + future.set(Collections.>emptyList()); } + } catch (ExecutionException e) { + Throwables.propagate(e.getCause()); } } @@ -165,7 +188,7 @@ public T get(final PCollectionView view, final BoundedWindow window) { viewByWindows.get(windowedView); WindowingStrategy windowingStrategy = view.getWindowingStrategyInternal(); - evaluationContext.callAfterOutputMustHaveBeenProduced( + evaluationContext.scheduleAfterOutputWouldBeProduced( view, window, windowingStrategy, new Runnable() { @Override public void run() { diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactory.java index e3ae1a028c16..24142c2151c9 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactory.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactory.java @@ -17,7 +17,6 @@ import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessExecutionContext.InProcessStepContext; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; -import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; import com.google.cloud.dataflow.sdk.runners.inprocess.ParDoInProcessEvaluator.BundleOutputManager; import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactory.java index cd79c219bd67..af5914bab051 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactory.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactory.java @@ -17,7 +17,6 @@ import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessExecutionContext.InProcessStepContext; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; -import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; import com.google.cloud.dataflow.sdk.runners.inprocess.ParDoInProcessEvaluator.BundleOutputManager; import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/StepAndKey.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/StepAndKey.java new file mode 100644 index 000000000000..15955724eb0a --- /dev/null +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/StepAndKey.java @@ -0,0 +1,68 @@ +/* + * Copyright (C) 2015 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.dataflow.sdk.runners.inprocess; + +import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; +import com.google.common.base.MoreObjects; + +import java.util.Objects; + +/** + * A (Step, Key) pair. This is useful as a map key or cache key for things that are available + * per-step in a keyed manner (e.g. State). + */ +final class StepAndKey { + private final AppliedPTransform step; + private final Object key; + + /** + * Create a new {@link StepAndKey} with the provided step and key. + */ + public static StepAndKey of(AppliedPTransform step, Object key) { + return new StepAndKey(step, key); + } + + private StepAndKey(AppliedPTransform step, Object key) { + this.step = step; + this.key = key; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(StepAndKey.class) + .add("step", step.getFullName()) + .add("key", key) + .toString(); + } + + @Override + public int hashCode() { + return Objects.hash(step, key); + } + + @Override + public boolean equals(Object other) { + if (other == this) { + return true; + } else if (!(other instanceof StepAndKey)) { + return false; + } else { + StepAndKey that = (StepAndKey) other; + return Objects.equals(this.step, that.step) + && Objects.equals(this.key, that.key); + } + } +} diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformEvaluatorFactory.java index 3b672e0def5e..860ddfe48f16 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformEvaluatorFactory.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformEvaluatorFactory.java @@ -16,7 +16,6 @@ package com.google.cloud.dataflow.sdk.runners.inprocess; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; -import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext; import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; import com.google.cloud.dataflow.sdk.transforms.DoFn; import com.google.cloud.dataflow.sdk.transforms.PTransform; diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformEvaluatorRegistry.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformEvaluatorRegistry.java new file mode 100644 index 000000000000..0c8cb7e80aca --- /dev/null +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformEvaluatorRegistry.java @@ -0,0 +1,72 @@ +/* + * Copyright (C) 2016 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.dataflow.sdk.runners.inprocess; + +import com.google.cloud.dataflow.sdk.io.Read; +import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; +import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; +import com.google.cloud.dataflow.sdk.transforms.Flatten.FlattenPCollectionList; +import com.google.cloud.dataflow.sdk.transforms.PTransform; +import com.google.cloud.dataflow.sdk.transforms.ParDo; +import com.google.common.collect.ImmutableMap; + +import java.util.Map; + +import javax.annotation.Nullable; + +/** + * A {@link TransformEvaluatorFactory} that delegates to primitive {@link TransformEvaluatorFactory} + * implementations based on the type of {@link PTransform} of the application. + */ +class TransformEvaluatorRegistry implements TransformEvaluatorFactory { + public static TransformEvaluatorRegistry defaultRegistry() { + @SuppressWarnings("rawtypes") + ImmutableMap, TransformEvaluatorFactory> primitives = + ImmutableMap., TransformEvaluatorFactory>builder() + .put(Read.Bounded.class, new BoundedReadEvaluatorFactory()) + .put(Read.Unbounded.class, new UnboundedReadEvaluatorFactory()) + .put(ParDo.Bound.class, new ParDoSingleEvaluatorFactory()) + .put(ParDo.BoundMulti.class, new ParDoMultiEvaluatorFactory()) + .put( + GroupByKeyEvaluatorFactory.InProcessGroupByKeyOnly.class, + new GroupByKeyEvaluatorFactory()) + .put(FlattenPCollectionList.class, new FlattenEvaluatorFactory()) + .put(ViewEvaluatorFactory.WriteView.class, new ViewEvaluatorFactory()) + .build(); + return new TransformEvaluatorRegistry(primitives); + } + + // the TransformEvaluatorFactories can construct instances of all generic types of transform, + // so all instances of a primitive can be handled with the same evaluator factory. + @SuppressWarnings("rawtypes") + private final Map, TransformEvaluatorFactory> factories; + + private TransformEvaluatorRegistry( + @SuppressWarnings("rawtypes") + Map, TransformEvaluatorFactory> factories) { + this.factories = factories; + } + + @Override + public TransformEvaluator forApplication( + AppliedPTransform application, + @Nullable CommittedBundle inputBundle, + InProcessEvaluationContext evaluationContext) + throws Exception { + TransformEvaluatorFactory factory = factories.get(application.getTransform().getClass()); + return factory.forApplication(application, inputBundle, evaluationContext); + } +} diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java index 4beac337d604..97f0e25d38b4 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java @@ -21,7 +21,6 @@ import com.google.cloud.dataflow.sdk.io.UnboundedSource.UnboundedReader; import com.google.cloud.dataflow.sdk.options.PipelineOptions; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; -import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; import com.google.cloud.dataflow.sdk.transforms.PTransform; diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactory.java index f47cd1de986b..314d81f6aafc 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactory.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactory.java @@ -17,7 +17,6 @@ import com.google.cloud.dataflow.sdk.coders.KvCoder; import com.google.cloud.dataflow.sdk.coders.VoidCoder; -import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.PCollectionViewWriter; import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; import com.google.cloud.dataflow.sdk.transforms.GroupByKey; diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/WatermarkCallbackExecutor.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/WatermarkCallbackExecutor.java new file mode 100644 index 000000000000..27d59b9a6487 --- /dev/null +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/WatermarkCallbackExecutor.java @@ -0,0 +1,143 @@ +/* + * Copyright (C) 2016 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.dataflow.sdk.runners.inprocess; + +import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; +import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; +import com.google.cloud.dataflow.sdk.util.WindowingStrategy; +import com.google.common.collect.ComparisonChain; +import com.google.common.collect.Ordering; + +import org.joda.time.Instant; + +import java.util.PriorityQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * Executes callbacks that occur based on the progression of the watermark per-step. + * + *

Callbacks are registered by calls to + * {@link #callOnGuaranteedFiring(AppliedPTransform, BoundedWindow, WindowingStrategy, Runnable)}, + * and are executed after a call to {@link #fireForWatermark(AppliedPTransform, Instant)} with the + * same {@link AppliedPTransform} and a watermark sufficient to ensure that the trigger for the + * windowing strategy would have been produced. + * + *

NOTE: {@link WatermarkCallbackExecutor} does not track the latest observed watermark for any + * {@link AppliedPTransform} - any call to + * {@link #callOnGuaranteedFiring(AppliedPTransform, BoundedWindow, WindowingStrategy, Runnable)} + * that could have potentially already fired should be followed by a call to + * {@link #fireForWatermark(AppliedPTransform, Instant)} for the same transform with the current + * value of the watermark. + */ +class WatermarkCallbackExecutor { + /** + * Create a new {@link WatermarkCallbackExecutor}. + */ + public static WatermarkCallbackExecutor create() { + return new WatermarkCallbackExecutor(); + } + + private final ConcurrentMap, PriorityQueue> + callbacks; + private final ExecutorService executor; + + private WatermarkCallbackExecutor() { + this.callbacks = new ConcurrentHashMap<>(); + this.executor = Executors.newSingleThreadExecutor(); + } + + /** + * Execute the provided {@link Runnable} after the next call to + * {@link #fireForWatermark(AppliedPTransform, Instant)} where the window is guaranteed to have + * produced output. + */ + public void callOnGuaranteedFiring( + AppliedPTransform step, + BoundedWindow window, + WindowingStrategy windowingStrategy, + Runnable runnable) { + WatermarkCallback callback = + WatermarkCallback.onGuaranteedFiring(window, windowingStrategy, runnable); + + PriorityQueue callbackQueue = callbacks.get(step); + if (callbackQueue == null) { + callbackQueue = new PriorityQueue<>(11, new CallbackOrdering()); + if (callbacks.putIfAbsent(step, callbackQueue) != null) { + callbackQueue = callbacks.get(step); + } + } + + synchronized (callbackQueue) { + callbackQueue.offer(callback); + } + } + + /** + * Schedule all pending callbacks that must have produced output by the time of the provided + * watermark. + */ + public void fireForWatermark(AppliedPTransform step, Instant watermark) { + PriorityQueue callbackQueue = callbacks.get(step); + if (callbackQueue == null) { + return; + } + synchronized (callbackQueue) { + while (!callbackQueue.isEmpty() && callbackQueue.peek().shouldFire(watermark)) { + executor.submit(callbackQueue.poll().getCallback()); + } + } + } + + private static class WatermarkCallback { + public static WatermarkCallback onGuaranteedFiring( + BoundedWindow window, WindowingStrategy strategy, Runnable callback) { + @SuppressWarnings("unchecked") + Instant firingAfter = + strategy.getTrigger().getSpec().getWatermarkThatGuaranteesFiring((W) window); + return new WatermarkCallback(firingAfter, callback); + } + + private final Instant fireAfter; + private final Runnable callback; + + private WatermarkCallback(Instant fireAfter, Runnable callback) { + this.fireAfter = fireAfter; + this.callback = callback; + } + + public boolean shouldFire(Instant currentWatermark) { + return currentWatermark.isAfter(fireAfter) + || currentWatermark.equals(BoundedWindow.TIMESTAMP_MAX_VALUE); + } + + public Runnable getCallback() { + return callback; + } + } + + private static class CallbackOrdering extends Ordering { + @Override + public int compare(WatermarkCallback left, WatermarkCallback right) { + return ComparisonChain.start() + .compare(left.fireAfter, right.fireAfter) + .compare(left.callback, right.callback, Ordering.arbitrary()) + .result(); + } + } +} diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java index 9f22fbbe9e51..43955149ee03 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java @@ -25,7 +25,7 @@ import com.google.cloud.dataflow.sdk.io.BoundedSource; import com.google.cloud.dataflow.sdk.io.CountingSource; import com.google.cloud.dataflow.sdk.io.Read; -import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext; +import com.google.cloud.dataflow.sdk.io.Read.Bounded; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; import com.google.cloud.dataflow.sdk.testing.TestPipeline; import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactoryTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactoryTest.java index bf25970affc1..0120b9880d65 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactoryTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactoryTest.java @@ -22,7 +22,6 @@ import static org.mockito.Mockito.when; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; -import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; import com.google.cloud.dataflow.sdk.testing.TestPipeline; import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java index 5c9e824afe41..4ced82f8c77d 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java @@ -23,7 +23,6 @@ import com.google.cloud.dataflow.sdk.coders.Coder; import com.google.cloud.dataflow.sdk.coders.KvCoder; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; -import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; import com.google.cloud.dataflow.sdk.testing.TestPipeline; import com.google.cloud.dataflow.sdk.transforms.Create; diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManagerTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManagerTest.java index 24251522d824..52398cf73a0b 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManagerTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManagerTest.java @@ -1047,6 +1047,18 @@ public void timerUpdateBuilderWithCompletedAfterBuildNotAddedToBuilt() { assertThat(built.getCompletedTimers(), emptyIterable()); } + @Test + public void timerUpdateWithCompletedTimersNotAddedToExisting() { + TimerUpdateBuilder builder = TimerUpdate.builder(null); + TimerData timer = TimerData.of(StateNamespaces.global(), Instant.now(), TimeDomain.EVENT_TIME); + + TimerUpdate built = builder.build(); + assertThat(built.getCompletedTimers(), emptyIterable()); + assertThat( + built.withCompletedTimers(ImmutableList.of(timer)).getCompletedTimers(), contains(timer)); + assertThat(built.getCompletedTimers(), emptyIterable()); + } + private static Matcher earlierThan(final Instant laterInstant) { return new BaseMatcher() { @Override diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java new file mode 100644 index 000000000000..149096040a3d --- /dev/null +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java @@ -0,0 +1,436 @@ +/* + * Copyright (C) 2016 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.dataflow.sdk.runners.inprocess; + +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.emptyIterable; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; +import static org.junit.Assert.assertThat; + +import com.google.cloud.dataflow.sdk.coders.VarIntCoder; +import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; +import com.google.cloud.dataflow.sdk.runners.inprocess.InMemoryWatermarkManager.FiredTimers; +import com.google.cloud.dataflow.sdk.runners.inprocess.InMemoryWatermarkManager.TimerUpdate; +import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessExecutionContext.InProcessStepContext; +import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; +import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.PCollectionViewWriter; +import com.google.cloud.dataflow.sdk.testing.TestPipeline; +import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; +import com.google.cloud.dataflow.sdk.transforms.Create; +import com.google.cloud.dataflow.sdk.transforms.View; +import com.google.cloud.dataflow.sdk.transforms.WithKeys; +import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; +import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow; +import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo; +import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo.Timing; +import com.google.cloud.dataflow.sdk.util.SideInputReader; +import com.google.cloud.dataflow.sdk.util.TimeDomain; +import com.google.cloud.dataflow.sdk.util.TimerInternals.TimerData; +import com.google.cloud.dataflow.sdk.util.WindowedValue; +import com.google.cloud.dataflow.sdk.util.WindowingStrategy; +import com.google.cloud.dataflow.sdk.util.common.Counter; +import com.google.cloud.dataflow.sdk.util.common.Counter.AggregationKind; +import com.google.cloud.dataflow.sdk.util.common.CounterSet; +import com.google.cloud.dataflow.sdk.util.state.BagState; +import com.google.cloud.dataflow.sdk.util.state.CopyOnAccessInMemoryStateInternals; +import com.google.cloud.dataflow.sdk.util.state.StateNamespaces; +import com.google.cloud.dataflow.sdk.util.state.StateTag; +import com.google.cloud.dataflow.sdk.util.state.StateTags; +import com.google.cloud.dataflow.sdk.values.KV; +import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.cloud.dataflow.sdk.values.PCollection.IsBounded; +import com.google.cloud.dataflow.sdk.values.PCollectionView; +import com.google.cloud.dataflow.sdk.values.PValue; +import com.google.common.collect.ImmutableList; + +import org.hamcrest.Matchers; +import org.joda.time.Instant; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +/** + * Tests for {@link InProcessEvaluationContext}. + */ +@RunWith(JUnit4.class) +public class InProcessEvaluationContextTest { + private TestPipeline p; + private InProcessEvaluationContext context; + private PCollection created; + private PCollection> downstream; + private PCollectionView> view; + + @Before + public void setup() { + InProcessPipelineRunner runner = + InProcessPipelineRunner.fromOptions(PipelineOptionsFactory.create()); + p = TestPipeline.create(); + created = p.apply(Create.of(1, 2, 3)); + downstream = created.apply(WithKeys.of("foo")); + view = created.apply(View.asIterable()); + Collection> rootTransforms = + ImmutableList.>of(created.getProducingTransformInternal()); + Map>> valueToConsumers = new HashMap<>(); + valueToConsumers.put( + created, + ImmutableList.>of( + downstream.getProducingTransformInternal(), view.getProducingTransformInternal())); + valueToConsumers.put(downstream, ImmutableList.>of()); + valueToConsumers.put(view, ImmutableList.>of()); + + Map, String> stepNames = new HashMap<>(); + stepNames.put(created.getProducingTransformInternal(), "s1"); + stepNames.put(downstream.getProducingTransformInternal(), "s2"); + stepNames.put(view.getProducingTransformInternal(), "s3"); + + Collection> views = ImmutableList.>of(view); + context = InProcessEvaluationContext.create( + runner.getPipelineOptions(), + rootTransforms, + valueToConsumers, + stepNames, + views); + } + + @Test + public void writeToViewWriterThenReadReads() { + PCollectionViewWriter> viewWriter = + context.createPCollectionViewWriter( + PCollection.>createPrimitiveOutputInternal( + p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED), + view); + BoundedWindow window = new TestBoundedWindow(new Instant(1024L)); + BoundedWindow second = new TestBoundedWindow(new Instant(899999L)); + WindowedValue firstValue = + WindowedValue.of(1, new Instant(1222), window, PaneInfo.ON_TIME_AND_ONLY_FIRING); + WindowedValue secondValue = + WindowedValue.of( + 2, new Instant(8766L), second, PaneInfo.createPane(true, false, Timing.ON_TIME, 0, 0)); + Iterable> values = ImmutableList.of(firstValue, secondValue); + viewWriter.add(values); + + SideInputReader reader = + context.createSideInputReader(ImmutableList.>of(view)); + assertThat(reader.get(view, window), containsInAnyOrder(1)); + assertThat(reader.get(view, second), containsInAnyOrder(2)); + + WindowedValue overrittenSecondValue = + WindowedValue.of( + 4444, new Instant(8677L), second, PaneInfo.createPane(false, true, Timing.LATE, 1, 1)); + viewWriter.add(Collections.singleton(overrittenSecondValue)); + assertThat(reader.get(view, second), containsInAnyOrder(4444)); + } + + @Test + public void getExecutionContextSameStepSameKeyState() { + InProcessExecutionContext fooContext = + context.getExecutionContext(created.getProducingTransformInternal(), "foo"); + + StateTag> intBag = StateTags.bag("myBag", VarIntCoder.of()); + + InProcessStepContext stepContext = fooContext.getOrCreateStepContext("s1", "s1", null); + stepContext.stateInternals().state(StateNamespaces.global(), intBag).add(1); + + context.handleResult( + InProcessBundle.keyed(created, "foo").commit(Instant.now()), + ImmutableList.of(), + StepTransformResult.withoutHold(created.getProducingTransformInternal()) + .withState(stepContext.commitState()) + .build()); + + InProcessExecutionContext secondFooContext = + context.getExecutionContext(created.getProducingTransformInternal(), "foo"); + assertThat( + secondFooContext + .getOrCreateStepContext("s1", "s1", null) + .stateInternals() + .state(StateNamespaces.global(), intBag) + .read(), + contains(1)); + } + + + @Test + public void getExecutionContextDifferentKeysIndependentState() { + InProcessExecutionContext fooContext = + context.getExecutionContext(created.getProducingTransformInternal(), "foo"); + + StateTag> intBag = StateTags.bag("myBag", VarIntCoder.of()); + + fooContext + .getOrCreateStepContext("s1", "s1", null) + .stateInternals() + .state(StateNamespaces.global(), intBag) + .add(1); + + InProcessExecutionContext barContext = + context.getExecutionContext(created.getProducingTransformInternal(), "bar"); + assertThat(barContext, not(equalTo(fooContext))); + assertThat( + barContext + .getOrCreateStepContext("s1", "s1", null) + .stateInternals() + .state(StateNamespaces.global(), intBag) + .read(), + emptyIterable()); + } + + @Test + public void getExecutionContextDifferentStepsIndependentState() { + String myKey = "foo"; + InProcessExecutionContext fooContext = + context.getExecutionContext(created.getProducingTransformInternal(), myKey); + + StateTag> intBag = StateTags.bag("myBag", VarIntCoder.of()); + + fooContext + .getOrCreateStepContext("s1", "s1", null) + .stateInternals() + .state(StateNamespaces.global(), intBag) + .add(1); + + InProcessExecutionContext barContext = + context.getExecutionContext(downstream.getProducingTransformInternal(), myKey); + assertThat( + barContext + .getOrCreateStepContext("s1", "s1", null) + .stateInternals() + .state(StateNamespaces.global(), intBag) + .read(), + emptyIterable()); + } + + @Test + public void handleResultMergesCounters() { + CounterSet counters = context.createCounterSet(); + Counter myCounter = Counter.longs("foo", AggregationKind.SUM); + counters.addCounter(myCounter); + + myCounter.addValue(4L); + InProcessTransformResult result = + StepTransformResult.withoutHold(created.getProducingTransformInternal()) + .withCounters(counters) + .build(); + context.handleResult(null, ImmutableList.of(), result); + assertThat((Long) context.getCounters().getExistingCounter("foo").getAggregate(), equalTo(4L)); + + CounterSet againCounters = context.createCounterSet(); + Counter myLongCounterAgain = Counter.longs("foo", AggregationKind.SUM); + againCounters.add(myLongCounterAgain); + myLongCounterAgain.addValue(8L); + + InProcessTransformResult secondResult = + StepTransformResult.withoutHold(downstream.getProducingTransformInternal()) + .withCounters(againCounters) + .build(); + context.handleResult( + InProcessBundle.unkeyed(created).commit(Instant.now()), + ImmutableList.of(), + secondResult); + assertThat((Long) context.getCounters().getExistingCounter("foo").getAggregate(), equalTo(12L)); + } + + @Test + public void handleResultStoresState() { + String myKey = "foo"; + InProcessExecutionContext fooContext = + context.getExecutionContext(downstream.getProducingTransformInternal(), myKey); + + StateTag> intBag = StateTags.bag("myBag", VarIntCoder.of()); + + CopyOnAccessInMemoryStateInternals state = + fooContext.getOrCreateStepContext("s1", "s1", null).stateInternals(); + BagState bag = state.state(StateNamespaces.global(), intBag); + bag.add(1); + bag.add(2); + bag.add(4); + + InProcessTransformResult stateResult = + StepTransformResult.withoutHold(downstream.getProducingTransformInternal()) + .withState(state) + .build(); + + context.handleResult( + InProcessBundle.keyed(created, myKey).commit(Instant.now()), + ImmutableList.of(), + stateResult); + + InProcessExecutionContext afterResultContext = + context.getExecutionContext(downstream.getProducingTransformInternal(), myKey); + + CopyOnAccessInMemoryStateInternals afterResultState = + afterResultContext.getOrCreateStepContext("s1", "s1", null).stateInternals(); + assertThat(afterResultState.state(StateNamespaces.global(), intBag).read(), contains(1, 2, 4)); + } + + @Test + public void callAfterOutputMustHaveBeenProducedAfterEndOfWatermarkCallsback() throws Exception { + final CountDownLatch callLatch = new CountDownLatch(1); + Runnable callback = + new Runnable() { + @Override + public void run() { + callLatch.countDown(); + } + }; + + // Should call back after the end of the global window + context.scheduleAfterOutputWouldBeProduced( + downstream, GlobalWindow.INSTANCE, WindowingStrategy.globalDefault(), callback); + + InProcessTransformResult result = + StepTransformResult.withHold(created.getProducingTransformInternal(), new Instant(0)) + .build(); + + context.handleResult(null, ImmutableList.of(), result); + + // Difficult to demonstrate that we took no action in a multithreaded world; poll for a bit + // will likely be flaky if this logic is broken + assertThat(callLatch.await(500L, TimeUnit.MILLISECONDS), is(false)); + + InProcessTransformResult finishedResult = + StepTransformResult.withoutHold(created.getProducingTransformInternal()).build(); + context.handleResult(null, ImmutableList.of(), finishedResult); + // Obtain the value via blocking call + assertThat(callLatch.await(1, TimeUnit.SECONDS), is(true)); + } + + @Test + public void callAfterOutputMustHaveBeenProducedAlreadyAfterCallsImmediately() throws Exception { + InProcessTransformResult finishedResult = + StepTransformResult.withoutHold(created.getProducingTransformInternal()).build(); + context.handleResult(null, ImmutableList.of(), finishedResult); + + final CountDownLatch callLatch = new CountDownLatch(1); + Runnable callback = + new Runnable() { + @Override + public void run() { + callLatch.countDown(); + } + }; + context.scheduleAfterOutputWouldBeProduced( + downstream, GlobalWindow.INSTANCE, WindowingStrategy.globalDefault(), callback); + assertThat(callLatch.await(1, TimeUnit.SECONDS), is(true)); + } + + @Test + public void extractFiredTimersExtractsTimers() { + InProcessTransformResult holdResult = + StepTransformResult.withHold(created.getProducingTransformInternal(), new Instant(0)) + .build(); + context.handleResult(null, ImmutableList.of(), holdResult); + + String key = "foo"; + TimerData toFire = + TimerData.of(StateNamespaces.global(), new Instant(100L), TimeDomain.EVENT_TIME); + InProcessTransformResult timerResult = + StepTransformResult.withoutHold(downstream.getProducingTransformInternal()) + .withState(CopyOnAccessInMemoryStateInternals.withUnderlying(key, null)) + .withTimerUpdate(TimerUpdate.builder(key).setTimer(toFire).build()) + .build(); + + // haven't added any timers, must be empty + assertThat(context.extractFiredTimers().entrySet(), emptyIterable()); + context.handleResult( + InProcessBundle.keyed(created, key).commit(Instant.now()), + ImmutableList.of(), + timerResult); + + // timer hasn't fired + assertThat(context.extractFiredTimers().entrySet(), emptyIterable()); + + InProcessTransformResult advanceResult = + StepTransformResult.withoutHold(created.getProducingTransformInternal()).build(); + // Should cause the downstream timer to fire + context.handleResult(null, ImmutableList.of(), advanceResult); + + Map, Map> fired = context.extractFiredTimers(); + assertThat( + fired, + Matchers.>hasKey(downstream.getProducingTransformInternal())); + Map downstreamFired = + fired.get(downstream.getProducingTransformInternal()); + assertThat(downstreamFired, Matchers.hasKey(key)); + + FiredTimers firedForKey = downstreamFired.get(key); + assertThat(firedForKey.getTimers(TimeDomain.PROCESSING_TIME), emptyIterable()); + assertThat(firedForKey.getTimers(TimeDomain.SYNCHRONIZED_PROCESSING_TIME), emptyIterable()); + assertThat(firedForKey.getTimers(TimeDomain.EVENT_TIME), contains(toFire)); + + // Don't reextract timers + assertThat(context.extractFiredTimers().entrySet(), emptyIterable()); + } + + @Test + public void createBundleUnkeyedResultUnkeyed() { + CommittedBundle> newBundle = + context + .createBundle(InProcessBundle.unkeyed(created).commit(Instant.now()), downstream) + .commit(Instant.now()); + assertThat(newBundle.isKeyed(), is(false)); + } + + @Test + public void createBundleKeyedResultPropagatesKey() { + CommittedBundle> newBundle = + context + .createBundle(InProcessBundle.keyed(created, "foo").commit(Instant.now()), downstream) + .commit(Instant.now()); + assertThat(newBundle.isKeyed(), is(true)); + assertThat(newBundle.getKey(), Matchers.equalTo("foo")); + } + + @Test + public void createRootBundleUnkeyed() { + assertThat(context.createRootBundle(created).commit(Instant.now()).isKeyed(), is(false)); + } + + @Test + public void createKeyedBundleKeyed() { + CommittedBundle> keyedBundle = + context + .createKeyedBundle( + InProcessBundle.unkeyed(created).commit(Instant.now()), "foo", downstream) + .commit(Instant.now()); + assertThat(keyedBundle.isKeyed(), is(true)); + assertThat(keyedBundle.getKey(), Matchers.equalTo("foo")); + } + + private static class TestBoundedWindow extends BoundedWindow { + private final Instant ts; + + public TestBoundedWindow(Instant ts) { + this.ts = ts; + } + + @Override + public Instant maxTimestamp() { + return ts; + } + } +} diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessSideInputContainerTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessSideInputContainerTest.java index 4cfe78293687..16b4eb7d07e2 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessSideInputContainerTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessSideInputContainerTest.java @@ -24,7 +24,6 @@ import com.google.cloud.dataflow.sdk.coders.KvCoder; import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; -import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext; import com.google.cloud.dataflow.sdk.testing.TestPipeline; import com.google.cloud.dataflow.sdk.transforms.Create; import com.google.cloud.dataflow.sdk.transforms.Mean; @@ -137,7 +136,7 @@ public void getAfterWriteReturnsPaneInWindow() throws Exception { container.write(mapView, ImmutableList.>of(one, two)); Map viewContents = - container.withViews(ImmutableList.>of(mapView)) + container.createReaderForViews(ImmutableList.>of(mapView)) .get(mapView, firstWindow); assertThat(viewContents, hasEntry("one", 1)); assertThat(viewContents, hasEntry("two", 2)); @@ -153,7 +152,7 @@ public void getReturnsLatestPaneInWindow() throws Exception { container.write(mapView, ImmutableList.>of(one, two)); Map viewContents = - container.withViews(ImmutableList.>of(mapView)) + container.createReaderForViews(ImmutableList.>of(mapView)) .get(mapView, secondWindow); assertThat(viewContents, hasEntry("one", 1)); assertThat(viewContents, hasEntry("two", 2)); @@ -164,7 +163,7 @@ public void getReturnsLatestPaneInWindow() throws Exception { container.write(mapView, ImmutableList.>of(three)); Map overwrittenViewContents = - container.withViews(ImmutableList.>of(mapView)) + container.createReaderForViews(ImmutableList.>of(mapView)) .get(mapView, secondWindow); assertThat(overwrittenViewContents, hasEntry("three", 3)); assertThat(overwrittenViewContents.size(), is(1)); @@ -176,15 +175,18 @@ public void getReturnsLatestPaneInWindow() throws Exception { */ @Test public void getBlocksUntilPaneAvailable() throws Exception { - BoundedWindow window = new BoundedWindow() { - @Override - public Instant maxTimestamp() { - return new Instant(1024L); - } - }; + BoundedWindow window = + new BoundedWindow() { + @Override + public Instant maxTimestamp() { + return new Instant(1024L); + } + }; Future singletonFuture = - getFutureOfView(container.withViews(ImmutableList.>of(singletonView)), - singletonView, window); + getFutureOfView( + container.createReaderForViews(ImmutableList.>of(singletonView)), + singletonView, + window); WindowedValue singletonValue = WindowedValue.of(4.75, new Instant(475L), window, PaneInfo.ON_TIME_AND_ONLY_FIRING); @@ -203,7 +205,7 @@ public Instant maxTimestamp() { } }; SideInputReader newReader = - container.withViews(ImmutableList.>of(singletonView)); + container.createReaderForViews(ImmutableList.>of(singletonView)); Future singletonFuture = getFutureOfView(newReader, singletonView, window); WindowedValue singletonValue = @@ -216,25 +218,31 @@ public Instant maxTimestamp() { @Test public void withPCollectionViewsErrorsForContainsNotInViews() { - PCollectionView>> newView = PCollectionViews.multimapView(pipeline, - WindowingStrategy.globalDefault(), KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())); + PCollectionView>> newView = + PCollectionViews.multimapView( + pipeline, + WindowingStrategy.globalDefault(), + KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())); thrown.expect(IllegalArgumentException.class); thrown.expectMessage("with unknown views " + ImmutableList.of(newView).toString()); - container.withViews(ImmutableList.>of(newView)); + container.createReaderForViews(ImmutableList.>of(newView)); } @Test public void withViewsForViewNotInContainerFails() { - PCollectionView>> newView = PCollectionViews.multimapView(pipeline, - WindowingStrategy.globalDefault(), KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())); + PCollectionView>> newView = + PCollectionViews.multimapView( + pipeline, + WindowingStrategy.globalDefault(), + KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())); thrown.expect(IllegalArgumentException.class); thrown.expectMessage("unknown views"); thrown.expectMessage(newView.toString()); - container.withViews(ImmutableList.>of(newView)); + container.createReaderForViews(ImmutableList.>of(newView)); } @Test @@ -242,7 +250,7 @@ public void getOnReaderForViewNotInReaderFails() { thrown.expect(IllegalArgumentException.class); thrown.expectMessage("unknown view: " + iterableView.toString()); - container.withViews(ImmutableList.>of(mapView)) + container.createReaderForViews(ImmutableList.>of(mapView)) .get(iterableView, GlobalWindow.INSTANCE); } @@ -255,11 +263,11 @@ public void writeForMultipleElementsInDifferentWindowsSucceeds() throws Exceptio PaneInfo.ON_TIME_AND_ONLY_FIRING); container.write(singletonView, ImmutableList.of(firstWindowedValue, secondWindowedValue)); assertThat( - container.withViews(ImmutableList.>of(singletonView)) + container.createReaderForViews(ImmutableList.>of(singletonView)) .get(singletonView, firstWindow), equalTo(2.875)); assertThat( - container.withViews(ImmutableList.>of(singletonView)) + container.createReaderForViews(ImmutableList.>of(singletonView)) .get(singletonView, secondWindow), equalTo(4.125)); } @@ -274,7 +282,7 @@ public void writeForMultipleIdenticalElementsInSameWindowSucceeds() throws Excep container.write(iterableView, ImmutableList.of(firstValue, secondValue)); assertThat( - container.withViews(ImmutableList.>of(iterableView)) + container.createReaderForViews(ImmutableList.>of(iterableView)) .get(iterableView, firstWindow), contains(44, 44)); } @@ -286,11 +294,11 @@ public void writeForElementInMultipleWindowsSucceeds() throws Exception { ImmutableList.of(firstWindow, secondWindow), PaneInfo.ON_TIME_AND_ONLY_FIRING); container.write(singletonView, ImmutableList.of(multiWindowedValue)); assertThat( - container.withViews(ImmutableList.>of(singletonView)) + container.createReaderForViews(ImmutableList.>of(singletonView)) .get(singletonView, firstWindow), equalTo(2.875)); assertThat( - container.withViews(ImmutableList.>of(singletonView)) + container.createReaderForViews(ImmutableList.>of(singletonView)) .get(singletonView, secondWindow), equalTo(2.875)); } @@ -306,7 +314,7 @@ public void finishDoesNotOverwriteWrittenElements() throws Exception { immediatelyInvokeCallback(mapView, secondWindow); Map viewContents = - container.withViews(ImmutableList.>of(mapView)) + container.createReaderForViews(ImmutableList.>of(mapView)) .get(mapView, secondWindow); assertThat(viewContents, hasEntry("one", 1)); @@ -317,8 +325,11 @@ public void finishDoesNotOverwriteWrittenElements() throws Exception { @Test public void finishOnPendingViewsSetsEmptyElements() throws Exception { immediatelyInvokeCallback(mapView, secondWindow); - Future> mapFuture = getFutureOfView( - container.withViews(ImmutableList.>of(mapView)), mapView, secondWindow); + Future> mapFuture = + getFutureOfView( + container.createReaderForViews(ImmutableList.>of(mapView)), + mapView, + secondWindow); assertThat(mapFuture.get().isEmpty(), is(true)); } @@ -329,18 +340,21 @@ public void finishOnPendingViewsSetsEmptyElements() throws Exception { */ private void immediatelyInvokeCallback(PCollectionView view, BoundedWindow window) { doAnswer( - new Answer() { - @Override - public Void answer(InvocationOnMock invocation) throws Throwable { - Object callback = invocation.getArguments()[3]; - Runnable callbackRunnable = (Runnable) callback; - callbackRunnable.run(); - return null; - } - }) + new Answer() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + Object callback = invocation.getArguments()[3]; + Runnable callbackRunnable = (Runnable) callback; + callbackRunnable.run(); + return null; + } + }) .when(context) - .callAfterOutputMustHaveBeenProduced(Mockito.eq(view), Mockito.eq(window), - Mockito.eq(view.getWindowingStrategyInternal()), Mockito.any(Runnable.class)); + .scheduleAfterOutputWouldBeProduced( + Mockito.eq(view), + Mockito.eq(window), + Mockito.eq(view.getWindowingStrategyInternal()), + Mockito.any(Runnable.class)); } private Future getFutureOfView(final SideInputReader myReader, diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactoryTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactoryTest.java index 033f9de204d3..66430b6a7f4d 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactoryTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactoryTest.java @@ -26,7 +26,6 @@ import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; import com.google.cloud.dataflow.sdk.runners.inprocess.InMemoryWatermarkManager.TimerUpdate; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; -import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; import com.google.cloud.dataflow.sdk.testing.TestPipeline; import com.google.cloud.dataflow.sdk.transforms.Create; diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactoryTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactoryTest.java index ae599bab62bc..3b928b907761 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactoryTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactoryTest.java @@ -26,7 +26,6 @@ import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; import com.google.cloud.dataflow.sdk.runners.inprocess.InMemoryWatermarkManager.TimerUpdate; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; -import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; import com.google.cloud.dataflow.sdk.testing.TestPipeline; import com.google.cloud.dataflow.sdk.transforms.Create; diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactoryTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactoryTest.java index f139c5648e95..a9bbcc8cc56c 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactoryTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactoryTest.java @@ -25,7 +25,6 @@ import com.google.cloud.dataflow.sdk.io.CountingSource; import com.google.cloud.dataflow.sdk.io.Read; import com.google.cloud.dataflow.sdk.io.UnboundedSource; -import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; import com.google.cloud.dataflow.sdk.testing.TestPipeline; import com.google.cloud.dataflow.sdk.transforms.SerializableFunction; diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactoryTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactoryTest.java index 2f5cd0fb888a..2f5bdde6adcd 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactoryTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactoryTest.java @@ -25,7 +25,6 @@ import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; import com.google.cloud.dataflow.sdk.coders.VoidCoder; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; -import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.PCollectionViewWriter; import com.google.cloud.dataflow.sdk.testing.TestPipeline; import com.google.cloud.dataflow.sdk.transforms.Create; diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/WatermarkCallbackExecutorTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/WatermarkCallbackExecutorTest.java new file mode 100644 index 000000000000..be3e062fbde1 --- /dev/null +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/WatermarkCallbackExecutorTest.java @@ -0,0 +1,126 @@ +/* + * Copyright (C) 2016 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.dataflow.sdk.runners.inprocess; + +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +import com.google.cloud.dataflow.sdk.testing.TestPipeline; +import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; +import com.google.cloud.dataflow.sdk.transforms.Create; +import com.google.cloud.dataflow.sdk.transforms.Sum; +import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; +import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows; +import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow; +import com.google.cloud.dataflow.sdk.transforms.windowing.IntervalWindow; +import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn; +import com.google.cloud.dataflow.sdk.util.WindowingStrategy; +import com.google.cloud.dataflow.sdk.values.PCollection; + +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +/** + * Tests for {@link WatermarkCallbackExecutor}. + */ +@RunWith(JUnit4.class) +public class WatermarkCallbackExecutorTest { + private WatermarkCallbackExecutor executor = WatermarkCallbackExecutor.create(); + private AppliedPTransform create; + private AppliedPTransform sum; + + @Before + public void setup() { + TestPipeline p = TestPipeline.create(); + PCollection created = p.apply(Create.of(1, 2, 3)); + create = created.getProducingTransformInternal(); + sum = created.apply(Sum.integersGlobally()).getProducingTransformInternal(); + } + + @Test + public void onGuaranteedFiringFiresAfterTrigger() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + executor.callOnGuaranteedFiring( + create, + GlobalWindow.INSTANCE, + WindowingStrategy.globalDefault(), + new CountDownLatchCallback(latch)); + + executor.fireForWatermark(create, BoundedWindow.TIMESTAMP_MAX_VALUE); + assertThat(latch.await(500, TimeUnit.MILLISECONDS), equalTo(true)); + } + + @Test + public void multipleCallbacksShouldFireFires() throws Exception { + CountDownLatch latch = new CountDownLatch(2); + WindowFn windowFn = FixedWindows.of(Duration.standardMinutes(10)); + IntervalWindow window = + new IntervalWindow(new Instant(0L), new Instant(0L).plus(Duration.standardMinutes(10))); + executor.callOnGuaranteedFiring( + create, window, WindowingStrategy.of(windowFn), new CountDownLatchCallback(latch)); + executor.callOnGuaranteedFiring( + create, window, WindowingStrategy.of(windowFn), new CountDownLatchCallback(latch)); + + executor.fireForWatermark(create, new Instant(0L).plus(Duration.standardMinutes(10))); + assertThat(latch.await(500, TimeUnit.MILLISECONDS), equalTo(true)); + } + + @Test + public void noCallbacksShouldFire() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + WindowFn windowFn = FixedWindows.of(Duration.standardMinutes(10)); + IntervalWindow window = + new IntervalWindow(new Instant(0L), new Instant(0L).plus(Duration.standardMinutes(10))); + executor.callOnGuaranteedFiring( + create, window, WindowingStrategy.of(windowFn), new CountDownLatchCallback(latch)); + + executor.fireForWatermark(create, new Instant(0L).plus(Duration.standardMinutes(5))); + assertThat(latch.await(500, TimeUnit.MILLISECONDS), equalTo(false)); + } + + @Test + public void unrelatedStepShouldNotFire() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + WindowFn windowFn = FixedWindows.of(Duration.standardMinutes(10)); + IntervalWindow window = + new IntervalWindow(new Instant(0L), new Instant(0L).plus(Duration.standardMinutes(10))); + executor.callOnGuaranteedFiring( + sum, window, WindowingStrategy.of(windowFn), new CountDownLatchCallback(latch)); + + executor.fireForWatermark(create, new Instant(0L).plus(Duration.standardMinutes(20))); + assertThat(latch.await(500, TimeUnit.MILLISECONDS), equalTo(false)); + } + + private static class CountDownLatchCallback implements Runnable { + private final CountDownLatch latch; + + public CountDownLatchCallback(CountDownLatch latch) { + this.latch = latch; + } + + @Override + public void run() { + latch.countDown(); + } + } +}