diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java index 965be822e0d3..143254a5b37b 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java @@ -262,11 +262,22 @@ public int compareTo(TimerData that) { .compare(this.getDomain(), that.getDomain()) .compare(this.getTimerId(), that.getTimerId()) .compare(this.getTimerFamilyId(), that.getTimerFamilyId()); - if (chain.result() == 0 && !this.getNamespace().equals(that.getNamespace())) { + int compResult = chain.result(); + if (compResult == 0 && !this.getNamespace().equals(that.getNamespace())) { // Obtaining the stringKey may be expensive; only do so if required - chain = chain.compare(getNamespace().stringKey(), that.getNamespace().stringKey()); + compResult = this.getNamespace().stringKey().compareTo(that.getNamespace().stringKey()); } - return chain.result(); + return compResult; + } + + public String stringKey() { + return getNamespace().stringKey() + + "/" + + getDomain().toString() + + "/" + + getTimerFamilyId() + + ":" + + getTimerId(); } } diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java index 5a477bb86c5e..23d011ff0c8b 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java @@ -17,6 +17,8 @@ */ package org.apache.beam.runners.direct; +import java.util.Map; +import java.util.NavigableSet; import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate; @@ -24,6 +26,8 @@ import org.apache.beam.runners.direct.WatermarkManager.TransformWatermarks; import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Instant; @@ -35,6 +39,8 @@ class DirectTimerInternals implements TimerInternals { private final Clock processingTimeClock; private final TransformWatermarks watermarks; private final TimerUpdateBuilder timerUpdateBuilder; + private final Map> modifiedTimers; + private final Map modifiedTimerIds; public static DirectTimerInternals create( Clock clock, TransformWatermarks watermarks, TimerUpdateBuilder timerUpdateBuilder) { @@ -46,6 +52,11 @@ private DirectTimerInternals( this.processingTimeClock = clock; this.watermarks = watermarks; this.timerUpdateBuilder = timerUpdateBuilder; + this.modifiedTimers = Maps.newHashMap(); + this.modifiedTimers.put(TimeDomain.EVENT_TIME, Sets.newTreeSet()); + this.modifiedTimers.put(TimeDomain.PROCESSING_TIME, Sets.newTreeSet()); + this.modifiedTimers.put(TimeDomain.SYNCHRONIZED_PROCESSING_TIME, Sets.newTreeSet()); + this.modifiedTimerIds = Maps.newHashMap(); } @Override @@ -56,8 +67,7 @@ public void setTimer( Instant target, Instant outputTimestamp, TimeDomain timeDomain) { - timerUpdateBuilder.setTimer( - TimerData.of(timerId, timerFamilyId, namespace, target, outputTimestamp, timeDomain)); + setTimer(TimerData.of(timerId, timerFamilyId, namespace, target, outputTimestamp, timeDomain)); } /** @@ -68,6 +78,8 @@ public void setTimer( @Override public void setTimer(TimerData timerData) { timerUpdateBuilder.setTimer(timerData); + getModifiedTimersOrdered(timerData.getDomain()).add(timerData); + modifiedTimerIds.put(timerData.stringKey(), timerData); } @Override @@ -93,27 +105,25 @@ public void deleteTimer(StateNamespace namespace, String timerId, String timerFa /** @deprecated use {@link #deleteTimer(StateNamespace, String, TimeDomain)}. */ @Deprecated @Override - public void deleteTimer(TimerData timerKey) { - timerUpdateBuilder.deletedTimer(timerKey); + public void deleteTimer(TimerData timerData) { + timerUpdateBuilder.deletedTimer(timerData); + modifiedTimerIds.put(timerData.stringKey(), timerData); } public TimerUpdate getTimerUpdate() { return timerUpdateBuilder.build(); } - public boolean containsUpdateForTimeBefore( - Instant maxWatermarkTime, Instant maxProcessingTime, Instant maxSynchronizedProcessingTime) { - TimerUpdate update = timerUpdateBuilder.build(); - return hasTimeBefore( - update.getSetTimers(), - maxWatermarkTime, - maxProcessingTime, - maxSynchronizedProcessingTime) - || hasTimeBefore( - update.getDeletedTimers(), - maxWatermarkTime, - maxProcessingTime, - maxSynchronizedProcessingTime); + public NavigableSet getModifiedTimersOrdered(TimeDomain timeDomain) { + NavigableSet modified = modifiedTimers.get(timeDomain); + if (modified == null) { + throw new IllegalStateException("Unexpected time domain " + timeDomain); + } + return modified; + } + + public Map getModifiedTimerIds() { + return modifiedTimerIds; } @Override @@ -135,32 +145,4 @@ public Instant currentInputWatermarkTime() { public @Nullable Instant currentOutputWatermarkTime() { return watermarks.getOutputWatermark(); } - - private boolean hasTimeBefore( - Iterable timers, - Instant maxWatermarkTime, - Instant maxProcessingTime, - Instant maxSynchronizedProcessingTime) { - for (TimerData timerData : timers) { - Instant currentTime; - switch (timerData.getDomain()) { - case EVENT_TIME: - currentTime = maxWatermarkTime; - break; - case PROCESSING_TIME: - currentTime = maxProcessingTime; - break; - case SYNCHRONIZED_PROCESSING_TIME: - currentTime = maxSynchronizedProcessingTime; - break; - default: - throw new RuntimeException("Unexpected timeDomain " + timerData.getDomain()); - } - if (timerData.getTimestamp().isBefore(currentTime) - || timerData.getTimestamp().isEqual(currentTime)) { - return true; - } - } - return false; - } } diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java index a537d30f4529..a43afe6118c4 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java @@ -172,7 +172,7 @@ public void start(DirectGraph graph, RootProviderRegistry rootProviderRegistry) @Override public void run() { DriverState drive = executionDriver.drive(); - if (drive.isTermainal()) { + if (drive.isTerminal()) { State newPipelineState = State.UNKNOWN; switch (drive) { case FAILED: diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/QuiescenceDriver.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/QuiescenceDriver.java index 1d4fa87b8e68..dfd3ee486cd1 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/QuiescenceDriver.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/QuiescenceDriver.java @@ -24,7 +24,6 @@ import java.util.Map; import java.util.Optional; import java.util.Queue; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -39,6 +38,7 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -81,7 +81,7 @@ public static ExecutionDriver create( // watermark of a PTransform before enqueuing the resulting bundle to pendingUpdates of downstream // PTransform, which can lead to watermark being updated past the emitted elements. private final Map, Collection>> inflightBundles = - new ConcurrentHashMap<>(); + Maps.newHashMap(); private final AtomicReference state = new AtomicReference<>(ExecutorState.QUIESCENT); @@ -166,15 +166,17 @@ private void processBundle(CommittedBundle bundle, AppliedPTransform private void processBundle( CommittedBundle bundle, AppliedPTransform consumer, CompletionCallback callback) { - inflightBundles.compute( - consumer, - (k, v) -> { - if (v == null) { - v = new ArrayList<>(); - } - v.add(bundle); - return v; - }); + synchronized (inflightBundles) { + inflightBundles.compute( + consumer, + (k, v) -> { + if (v == null) { + v = new ArrayList<>(); + } + v.add(bundle); + return v; + }); + } outstandingWork.incrementAndGet(); bundleProcessor.process(bundle, consumer, callback); } @@ -182,24 +184,28 @@ private void processBundle( /** Fires any available timers. */ private void fireTimers() { try { - for (FiredTimers> transformTimers : - evaluationContext.extractFiredTimers(inflightBundles.keySet())) { - Collection delivery = transformTimers.getTimers(); - KeyedWorkItem work = - KeyedWorkItems.timersWorkItem(transformTimers.getKey().getKey(), delivery); - @SuppressWarnings({"unchecked", "rawtypes"}) - CommittedBundle bundle = - evaluationContext - .createKeyedBundle( - transformTimers.getKey(), - (PCollection) - Iterables.getOnlyElement( - transformTimers.getExecutable().getMainInputs().values())) - .add(WindowedValue.valueInGlobalWindow(work)) - .commit(evaluationContext.now()); - processBundle( - bundle, transformTimers.getExecutable(), new TimerIterableCompletionCallback(delivery)); - state.set(ExecutorState.ACTIVE); + synchronized (inflightBundles) { + for (FiredTimers> transformTimers : + evaluationContext.extractFiredTimers(inflightBundles.keySet())) { + Collection delivery = transformTimers.getTimers(); + KeyedWorkItem work = + KeyedWorkItems.timersWorkItem(transformTimers.getKey().getKey(), delivery); + @SuppressWarnings({"unchecked", "rawtypes"}) + CommittedBundle bundle = + evaluationContext + .createKeyedBundle( + transformTimers.getKey(), + (PCollection) + Iterables.getOnlyElement( + transformTimers.getExecutable().getMainInputs().values())) + .add(WindowedValue.valueInGlobalWindow(work)) + .commit(evaluationContext.now()); + processBundle( + bundle, + transformTimers.getExecutable(), + new TimerIterableCompletionCallback(delivery)); + state.set(ExecutorState.ACTIVE); + } } } catch (Exception e) { LOG.error("Internal Error while delivering timers", e); @@ -311,12 +317,14 @@ public final CommittedResult handleResult( state.set(ExecutorState.ACTIVE); } outstandingWork.decrementAndGet(); - inflightBundles.compute( - result.getTransform(), - (k, v) -> { - v.remove(inputBundle); - return v.isEmpty() ? null : v; - }); + synchronized (inflightBundles) { + inflightBundles.compute( + result.getTransform(), + (k, v) -> { + v.remove(inputBundle); + return v.isEmpty() ? null : v; + }); + } return committedResult; } diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java index ebd305f4f365..054b22d7d7b9 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java @@ -17,14 +17,10 @@ */ package org.apache.beam.runners.direct; -import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState; - import com.google.auto.value.AutoValue; -import java.util.ArrayList; import java.util.Collections; -import java.util.Comparator; -import java.util.List; -import java.util.PriorityQueue; +import java.util.NavigableSet; +import javax.annotation.Nullable; import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.KeyedWorkItems; import org.apache.beam.runners.core.StateNamespaces.WindowNamespace; @@ -47,7 +43,6 @@ import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheLoader; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Ordering; import org.joda.time.Instant; /** A {@link TransformEvaluatorFactory} for stateful {@link ParDo}. */ @@ -155,7 +150,6 @@ private static class StatefulParDoEvaluator implements TransformEvaluator>> { private final DoFnLifecycleManagerRemovingTransformEvaluator> delegateEvaluator; - private final List pushedBackTimers = new ArrayList<>(); private final DirectTimerInternals timerInternals; DirectStepContext stepContext; @@ -174,45 +168,46 @@ public void processElement(WindowedValue>> gbkRes for (WindowedValue> windowedValue : gbkResult.getValue().elementsIterable()) { delegateEvaluator.processElement(windowedValue); } - PriorityQueue toBeFiredTimers = - new PriorityQueue<>(Comparator.comparing(TimerData::getTimestamp)); - Instant maxWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE; - Instant maxProcessingTime = BoundedWindow.TIMESTAMP_MIN_VALUE; - Instant maxSynchronizedProcessingTime = BoundedWindow.TIMESTAMP_MIN_VALUE; for (TimerData timerData : gbkResult.getValue().timersIterable()) { - toBeFiredTimers.add(timerData); - switch (timerData.getDomain()) { - case EVENT_TIME: - maxWatermarkTime = Ordering.natural().max(maxWatermarkTime, timerData.getTimestamp()); - break; - case PROCESSING_TIME: - maxProcessingTime = Ordering.natural().max(maxProcessingTime, timerData.getTimestamp()); - break; - case SYNCHRONIZED_PROCESSING_TIME: - maxSynchronizedProcessingTime = - Ordering.natural().max(maxSynchronizedProcessingTime, timerData.getTimestamp()); + // Get any new or modified timers that are earlier than the current one. In order to + // maintain timer ordering, + // we need to fire these timers first. + NavigableSet earlierTimers = + timerInternals.getModifiedTimersOrdered(timerData.getDomain()).headSet(timerData, true); + while (!earlierTimers.isEmpty()) { + TimerData insertedTimer = earlierTimers.pollFirst(); + if (timerModified(insertedTimer)) { + continue; + } + // Make sure to register this timer as deleted. This could be a timer that was originally + // set for the future + // and not in the bundle but was reset to an earlier time in this bundle. If we don't + // explicity delete the + // future timer, then it will still fire. + timerInternals.deleteTimer(insertedTimer); + processTimer(insertedTimer, gbkResult.getValue().key()); } - } - while (!timerInternals.containsUpdateForTimeBefore( - maxWatermarkTime, maxProcessingTime, maxSynchronizedProcessingTime) - && !toBeFiredTimers.isEmpty()) { - - TimerData timer = toBeFiredTimers.poll(); - checkState( - timer.getNamespace() instanceof WindowNamespace, - "Expected Timer %s to be in a %s, but got %s", - timer, - WindowNamespace.class.getSimpleName(), - timer.getNamespace().getClass().getName()); - WindowNamespace windowNamespace = (WindowNamespace) timer.getNamespace(); - BoundedWindow timerWindow = windowNamespace.getWindow(); - - delegateEvaluator.onTimer(timer, gbkResult.getValue().key(), timerWindow); - clearWatermarkHold(timer); + // As long as the timer hasn't been modified or deleted earlier in the bundle, fire it. + if (!timerModified(timerData)) { + processTimer(timerData, gbkResult.getValue().key()); + } } - pushedBackTimers.addAll(toBeFiredTimers); + } + + // Check to see if a timer has been modified inside this bundle. + private boolean timerModified(TimerData timerData) { + @Nullable + TimerData modifiedTimer = timerInternals.getModifiedTimerIds().get(timerData.stringKey()); + return modifiedTimer != null && !modifiedTimer.equals(timerData); + } + + private void processTimer(TimerData timerData, K key) throws Exception { + WindowNamespace windowNamespace = (WindowNamespace) timerData.getNamespace(); + BoundedWindow timerWindow = windowNamespace.getWindow(); + delegateEvaluator.onTimer(timerData, key, timerWindow); + clearWatermarkHold(timerData); } private void clearWatermarkHold(TimerData timer) { @@ -256,9 +251,7 @@ public TransformResult>> finishBundle() throws Ex watermarkHold = delegateResult.getWatermarkHold(); } - TimerUpdate timerUpdate = - delegateResult.getTimerUpdate().withPushedBackTimers(pushedBackTimers); - pushedBackTimers.clear(); + TimerUpdate timerUpdate = delegateResult.getTimerUpdate(); StepTransformResult.Builder>> regroupedResult = StepTransformResult.>>withHold( delegateResult.getTransform(), watermarkHold) diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java index e6626d0f2589..a68021e05581 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java @@ -26,17 +26,15 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.EnumMap; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.NavigableSet; import java.util.Objects; +import java.util.Queue; import java.util.Set; import java.util.TreeSet; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -45,7 +43,6 @@ import java.util.stream.Collectors; import javax.annotation.Nonnull; import javax.annotation.concurrent.GuardedBy; -import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.runners.local.Bundle; @@ -60,16 +57,14 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ComparisonChain; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.HashBasedTable; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Ordering; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Queues; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.SortedMultiset; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Table; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.TreeMultiset; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Instant; @@ -239,7 +234,7 @@ static class AppliedPTransformInputWatermark implements Watermark { // Entries in this table represent the authoritative timestamp for which // a per-key-and-StateNamespace timer is set. - private final Map, Table> existingTimers; + private final Map, Map> existingTimers; // This per-key sorted set allows quick retrieval of timers that should fire for a key private final Map, NavigableSet> objectTimers; @@ -343,15 +338,14 @@ private Instant getMinimumOutputTimestamp(SortedMultiset timers) { synchronized void updateTimers(TimerUpdate update) { NavigableSet keyTimers = objectTimers.computeIfAbsent(update.key, k -> new TreeSet<>()); - Table existingTimersForKey = - existingTimers.computeIfAbsent(update.key, k -> HashBasedTable.create()); + Map existingTimersForKey = + existingTimers.computeIfAbsent(update.key, k -> Maps.newHashMap()); + HashSet newSetTimers = Sets.newHashSet(); for (TimerData timer : update.getSetTimers()) { if (TimeDomain.EVENT_TIME.equals(timer.getDomain())) { - @Nullable - TimerData existingTimer = - existingTimersForKey.get( - timer.getNamespace(), timer.getTimerId() + '+' + timer.getTimerFamilyId()); + newSetTimers.add(timer.stringKey()); + @Nullable TimerData existingTimer = existingTimersForKey.get(timer.stringKey()); if (existingTimer == null) { pendingTimers.add(timer); @@ -366,32 +360,29 @@ synchronized void updateTimers(TimerUpdate update) { keyTimers.add(timer); } - existingTimersForKey.put( - timer.getNamespace(), timer.getTimerId() + '+' + timer.getTimerFamilyId(), timer); + existingTimersForKey.put(timer.stringKey(), timer); } } for (TimerData timer : update.getDeletedTimers()) { if (TimeDomain.EVENT_TIME.equals(timer.getDomain())) { - @Nullable - TimerData existingTimer = - existingTimersForKey.get( - timer.getNamespace(), timer.getTimerId() + '+' + timer.getTimerFamilyId()); + @Nullable TimerData existingTimer = existingTimersForKey.get(timer.stringKey()); if (existingTimer != null) { pendingTimers.remove(existingTimer); keyTimers.remove(existingTimer); - existingTimersForKey.remove( - existingTimer.getNamespace(), - existingTimer.getTimerId() + '+' + existingTimer.getTimerFamilyId()); + existingTimersForKey.remove(existingTimer.stringKey()); } } } for (TimerData timer : update.getCompletedTimers()) { if (TimeDomain.EVENT_TIME.equals(timer.getDomain())) { - keyTimers.remove(timer); - pendingTimers.remove(timer); + if (!newSetTimers.contains(timer.stringKey())) { + keyTimers.remove(timer); + pendingTimers.remove(timer); + existingTimersForKey.remove(timer.stringKey()); + } } } @@ -519,7 +510,7 @@ private static class SynchronizedProcessingTimeInputWatermark implements Waterma private final Collection> pendingBundles; private final Map, NavigableSet> processingTimers; private final Map, NavigableSet> synchronizedProcessingTimers; - private final Map, Table> existingTimers; + private final Map, Map> existingTimers; private final NavigableSet pendingTimers; @@ -629,21 +620,18 @@ private Instant getMinimumOutputTimestamp(NavigableSet timers) { } private synchronized void updateTimers(TimerUpdate update) { - Map> timerMap = timerMap(update.key); - Table existingTimersForKey = - existingTimers.computeIfAbsent(update.key, k -> HashBasedTable.create()); + Map existingTimersForKey = + existingTimers.computeIfAbsent(update.key, k -> Maps.newHashMap()); + HashSet newSetTimers = Sets.newHashSet(); for (TimerData addedTimer : update.setTimers.values()) { - NavigableSet timerQueue = timerMap.get(addedTimer.getDomain()); + NavigableSet timerQueue = + processQueueForDomain(update.key, addedTimer.getDomain()); if (timerQueue == null) { continue; } - - @Nullable - TimerData existingTimer = - existingTimersForKey.get( - addedTimer.getNamespace(), - addedTimer.getTimerId() + '+' + addedTimer.getTimerFamilyId()); + newSetTimers.add(addedTimer.stringKey()); + @Nullable TimerData existingTimer = existingTimersForKey.get(addedTimer.stringKey()); if (existingTimer == null) { timerQueue.add(addedTimer); } else if (!existingTimer.equals(addedTimer)) { @@ -651,34 +639,30 @@ private synchronized void updateTimers(TimerUpdate update) { timerQueue.add(addedTimer); } // else the timer is already set identically, so noop. - existingTimersForKey.put( - addedTimer.getNamespace(), - addedTimer.getTimerId() + '+' + addedTimer.getTimerFamilyId(), - addedTimer); + existingTimersForKey.put(addedTimer.stringKey(), addedTimer); } for (TimerData deletedTimer : update.deletedTimers.values()) { - NavigableSet timerQueue = timerMap.get(deletedTimer.getDomain()); + NavigableSet timerQueue = + processQueueForDomain(update.key, deletedTimer.getDomain()); if (timerQueue == null) { continue; } - - @Nullable - TimerData existingTimer = - existingTimersForKey.get( - deletedTimer.getNamespace(), - deletedTimer.getTimerId() + '+' + deletedTimer.getTimerFamilyId()); + String timerKey = deletedTimer.stringKey(); + @Nullable TimerData existingTimer = existingTimersForKey.get(timerKey); if (existingTimer != null) { pendingTimers.remove(deletedTimer); timerQueue.remove(deletedTimer); - existingTimersForKey.remove( - existingTimer.getNamespace(), - existingTimer.getTimerId() + '+' + existingTimer.getTimerFamilyId()); + existingTimersForKey.remove(timerKey); } } for (TimerData completedTimer : update.completedTimers) { - pendingTimers.remove(completedTimer); + String timerKey = completedTimer.stringKey(); + if (!newSetTimers.contains(timerKey)) { + pendingTimers.remove(completedTimer); + existingTimersForKey.remove(timerKey); + } } // notify of TimerData update @@ -713,15 +697,16 @@ private synchronized Map, List> extractFiredDomainTi return firedTimers; } - private Map> timerMap(StructuralKey key) { - NavigableSet processingQueue = - processingTimers.computeIfAbsent(key, k -> new TreeSet<>()); - NavigableSet synchronizedProcessingQueue = - synchronizedProcessingTimers.computeIfAbsent(key, k -> new TreeSet<>()); - EnumMap> result = new EnumMap<>(TimeDomain.class); - result.put(TimeDomain.PROCESSING_TIME, processingQueue); - result.put(TimeDomain.SYNCHRONIZED_PROCESSING_TIME, synchronizedProcessingQueue); - return result; + private @Nullable NavigableSet processQueueForDomain( + StructuralKey key, TimeDomain timeDomain) { + switch (timeDomain) { + case PROCESSING_TIME: + return processingTimers.computeIfAbsent(key, k -> new TreeSet<>()); + case SYNCHRONIZED_PROCESSING_TIME: + return synchronizedProcessingTimers.computeIfAbsent(key, k -> new TreeSet<>()); + default: + return null; + } } @Override @@ -853,7 +838,7 @@ public Instant get() { * *

The result collection retains ordering of timers (from earliest to latest). */ - private static Map, List> extractFiredTimers( + private static synchronized Map, List> extractFiredTimers( Instant latestTime, Map, NavigableSet> objectTimers) { Map, List> result = new HashMap<>(); Set> emptyKeys = new HashSet<>(); @@ -895,8 +880,7 @@ private static Map, List> extractFiredTimers( private final Map transformToWatermarks; /** A queue of pending updates to the state of this {@link WatermarkManager}. */ - private final ConcurrentLinkedQueue> - pendingUpdates; + private final Queue> pendingUpdates; /** A lock used to control concurrency for updating pending values. */ private final Lock refreshLock; @@ -914,7 +898,7 @@ private static Map, List> extractFiredTimers( * bundle processor at a time. */ private final Map> transformsWithAlreadyExtractedTimers = - new ConcurrentHashMap<>(); + Maps.newHashMap(); /** * Creates a new {@link WatermarkManager}. All watermarks within the newly created {@link @@ -942,7 +926,7 @@ private WatermarkManager( this.graph = graph; this.getName = getName; - this.pendingUpdates = new ConcurrentLinkedQueue<>(); + this.pendingUpdates = Queues.newArrayDeque(); this.refreshLock = new ReentrantLock(); this.pendingRefreshes = new HashSet<>(); @@ -1000,18 +984,20 @@ private static Consumer timerUpdateConsumer( Map> transformsWithAlreadyExtractedTimers, ExecutableT executable) { return update -> { - String timerIdWithNs = TimerUpdate.getTimerIdAndTimerFamilyIdWithNamespace(update); - transformsWithAlreadyExtractedTimers.compute( - executable, - (k, v) -> { - if (v != null) { - v.remove(timerIdWithNs); - if (v.isEmpty()) { - v = null; + String timerIdWithNs = update.stringKey(); + synchronized (transformsWithAlreadyExtractedTimers) { + transformsWithAlreadyExtractedTimers.compute( + executable, + (k, v) -> { + if (v != null) { + v.remove(timerIdWithNs); + if (v.isEmpty()) { + v = null; + } } - } - return v; - }); + return v; + }); + } }; } @@ -1108,9 +1094,11 @@ public void updateWatermarks( @Nullable Bundle unprocessedInputs, Iterable> outputs, Instant earliestHold) { - pendingUpdates.offer( - PendingWatermarkUpdate.create( - executable, completed, timerUpdate, unprocessedInputs, outputs, earliestHold)); + synchronized (pendingUpdates) { + pendingUpdates.offer( + PendingWatermarkUpdate.create( + executable, completed, timerUpdate, unprocessedInputs, outputs, earliestHold)); + } tryApplyPendingUpdates(); } @@ -1140,10 +1128,12 @@ private void applyAllPendingUpdates() { /** Applies up to {@code numUpdates}, or all available updates if numUpdates is non-positive. */ @GuardedBy("refreshLock") private void applyNUpdates(int numUpdates) { - for (int i = 0; !pendingUpdates.isEmpty() && (i < numUpdates || numUpdates <= 0); i++) { - PendingWatermarkUpdate pending = pendingUpdates.poll(); - applyPendingUpdate(pending); - pendingRefreshes.add(pending.getExecutable()); + synchronized (pendingUpdates) { + for (int i = 0; !pendingUpdates.isEmpty() && ((i < numUpdates) || (numUpdates <= 0)); i++) { + PendingWatermarkUpdate pending = pendingUpdates.poll(); + applyPendingUpdate(pending); + pendingRefreshes.add(pending.getExecutable()); + } } } @@ -1269,26 +1259,27 @@ public Collection> extractFiredTimers( if (ignoredExecutables.contains(transform)) { continue; } - if (!transformsWithAlreadyExtractedTimers.containsKey(transform)) { - TransformWatermarks watermarks = watermarksEntry.getValue(); - Collection> firedTimers = watermarks.extractFiredTimers(); - if (!firedTimers.isEmpty()) { - List newTimers = - firedTimers.stream() - .flatMap(f -> f.getTimers().stream()) - .collect(Collectors.toList()); - transformsWithAlreadyExtractedTimers.compute( - transform, - (k, v) -> { - if (v == null) { - v = new HashSet<>(); - } - final Set toUpdate = v; - newTimers.forEach( - td -> toUpdate.add(TimerUpdate.getTimerIdAndTimerFamilyIdWithNamespace(td))); - return v; - }); - allTimers.addAll(firedTimers); + synchronized (transformsWithAlreadyExtractedTimers) { + if (!transformsWithAlreadyExtractedTimers.containsKey(transform)) { + TransformWatermarks watermarks = watermarksEntry.getValue(); + Collection> firedTimers = watermarks.extractFiredTimers(); + if (!firedTimers.isEmpty()) { + List newTimers = + firedTimers.stream() + .flatMap(f -> f.getTimers().stream()) + .collect(Collectors.toList()); + transformsWithAlreadyExtractedTimers.compute( + transform, + (k, v) -> { + if (v == null) { + v = new HashSet<>(); + } + final Set toUpdate = v; + newTimers.forEach(td -> toUpdate.add(td.stringKey())); + return v; + }); + allTimers.addAll(firedTimers); + } } } } @@ -1563,24 +1554,18 @@ static TimerKey of(TimerData timerData) { * *

setTimers and deletedTimers are collections of {@link TimerData} that have been added to the * {@link TimerInternals} of an executed step. completedTimers are timers that were delivered as - * the input to the executed step. pushedBackTimers are timers that were in completedTimers at the - * input, but were pushed back due to processing constraints. + * the input to the executed step. */ public static class TimerUpdate { private final StructuralKey key; private final Iterable completedTimers; private final Map setTimers; private final Map deletedTimers; - private final Iterable pushedBackTimers; /** Returns a TimerUpdate for a null key with no timers. */ public static TimerUpdate empty() { return new TimerUpdate( - null, - Collections.emptyList(), - Collections.emptyMap(), - Collections.emptyMap(), - Collections.emptyList()); + null, Collections.emptyList(), Collections.emptyMap(), Collections.emptyMap()); } /** @@ -1651,26 +1636,19 @@ public TimerUpdate build() { key, ImmutableList.copyOf(completedTimers), ImmutableMap.copyOf(setTimers), - ImmutableMap.copyOf(deletedTimers), - Collections.emptyList()); + ImmutableMap.copyOf(deletedTimers)); } } - private static String getTimerIdAndTimerFamilyIdWithNamespace(TimerData td) { - return td.getNamespace() + td.getTimerId() + td.getTimerFamilyId(); - } - private TimerUpdate( StructuralKey key, Iterable completedTimers, Map setTimers, - Map deletedTimers, - Iterable pushedBackTimers) { + Map deletedTimers) { this.key = key; this.completedTimers = completedTimers; this.setTimers = setTimers; this.deletedTimers = deletedTimers; - this.pushedBackTimers = pushedBackTimers; } @VisibleForTesting @@ -1693,46 +1671,21 @@ public Iterable getDeletedTimers() { return deletedTimers.values(); } - Iterable getPushedBackTimers() { - return pushedBackTimers; - } - boolean isEmpty() { - return Iterables.isEmpty(completedTimers) - && setTimers.isEmpty() - && deletedTimers.isEmpty() - && Iterables.isEmpty(pushedBackTimers); + return Iterables.isEmpty(completedTimers) && setTimers.isEmpty() && deletedTimers.isEmpty(); } /** * Returns a {@link TimerUpdate} that is like this one, but with the specified completed timers. - * Note that if any of the completed timers is in pushedBackTimers, then it is set instead. The - * pushedBackTimers are cleared afterwards. */ public TimerUpdate withCompletedTimers(Iterable completedTimers) { List timersToComplete = new ArrayList<>(); - Set pushedBack = Sets.newHashSet(pushedBackTimers); Map newSetTimers = Maps.newLinkedHashMap(); newSetTimers.putAll(setTimers); for (TimerData td : completedTimers) { - TimerKey timerKey = TimerKey.of(td); - if (!pushedBack.contains(td)) { - timersToComplete.add(td); - } else if (!newSetTimers.containsKey(timerKey)) { - newSetTimers.put(timerKey, td); - } + timersToComplete.add(td); } - return new TimerUpdate( - key, timersToComplete, newSetTimers, deletedTimers, Collections.emptyList()); - } - - /** - * Returns a {@link TimerUpdate} that is like this one, but with the pushedBackTimersare removed - * set by provided pushedBackTimers. - */ - public TimerUpdate withPushedBackTimers(Iterable pushedBackTimers) { - return new TimerUpdate( - key, completedTimers, setTimers, deletedTimers, Lists.newArrayList(pushedBackTimers)); + return new TimerUpdate(key, timersToComplete, newSetTimers, deletedTimers); } @Override @@ -1759,7 +1712,6 @@ public String toString() { .add("setTimers", setTimers) .add("completedTimers", completedTimers) .add("deletedTimers", deletedTimers) - .add("pushedBackTimers", pushedBackTimers) .toString(); } } diff --git a/runners/local-java/src/main/java/org/apache/beam/runners/local/ExecutionDriver.java b/runners/local-java/src/main/java/org/apache/beam/runners/local/ExecutionDriver.java index c17dc5479a96..3638e4e2878c 100644 --- a/runners/local-java/src/main/java/org/apache/beam/runners/local/ExecutionDriver.java +++ b/runners/local-java/src/main/java/org/apache/beam/runners/local/ExecutionDriver.java @@ -33,7 +33,7 @@ enum DriverState { this.terminal = terminal; } - public boolean isTermainal() { + public boolean isTerminal() { return terminal; } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java index f5ce5e20c4c0..5f8d173d8ad1 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java @@ -4085,7 +4085,8 @@ public void onTimer(OutputReceiver r) { ValidatesRunner.class, UsesStatefulParDo.class, UsesTimersInParDo.class, - UsesLoopingTimer.class + UsesLoopingTimer.class, + UsesStrictTimerOrdering.class }) public void testEventTimeTimerLoop() { final String stateId = "count"; @@ -4105,6 +4106,7 @@ public void testEventTimeTimerLoop() { public void processElement( @StateId(stateId) ValueState countState, @TimerId(timerId) Timer loopTimer) { + countState.write(0); loopTimer.offset(Duration.millis(1)).setRelative(); } @@ -4113,7 +4115,7 @@ public void onLoopTimer( @StateId(stateId) ValueState countState, @TimerId(timerId) Timer loopTimer, OutputReceiver r) { - int count = MoreObjects.firstNonNull(countState.read(), 0); + int count = Preconditions.checkNotNull(countState.read()); if (count < loopCount) { r.output(count); countState.write(count + 1); @@ -4123,7 +4125,9 @@ public void onLoopTimer( }; PCollection output = - pipeline.apply(Create.of(KV.of("hello", 42))).apply(ParDo.of(fn)); + pipeline + .apply(Create.of(KV.of("hello1", 42), KV.of("hello2", 42), KV.of("hello3", 42))) + .apply(ParDo.of(fn)); PAssert.that(output).containsInAnyOrder(0, 1, 2, 3, 4); pipeline.run();