Skip to content

Commit

Permalink
Fix timer consistency in direct runner
Browse files Browse the repository at this point in the history
  • Loading branch information
reuvenlax committed Feb 3, 2022
1 parent 9794fb4 commit 8ea77f9
Show file tree
Hide file tree
Showing 8 changed files with 232 additions and 282 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -262,11 +262,22 @@ public int compareTo(TimerData that) {
.compare(this.getDomain(), that.getDomain())
.compare(this.getTimerId(), that.getTimerId())
.compare(this.getTimerFamilyId(), that.getTimerFamilyId());
if (chain.result() == 0 && !this.getNamespace().equals(that.getNamespace())) {
int compResult = chain.result();
if (compResult == 0 && !this.getNamespace().equals(that.getNamespace())) {
// Obtaining the stringKey may be expensive; only do so if required
chain = chain.compare(getNamespace().stringKey(), that.getNamespace().stringKey());
compResult = this.getNamespace().stringKey().compareTo(that.getNamespace().stringKey());
}
return chain.result();
return compResult;
}

public String stringKey() {
return getNamespace().stringKey()
+ "/"
+ getDomain().toString()
+ "/"
+ getTimerFamilyId()
+ ":"
+ getTimerId();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,17 @@
*/
package org.apache.beam.runners.direct;

import java.util.Map;
import java.util.NavigableSet;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate.TimerUpdateBuilder;
import org.apache.beam.runners.direct.WatermarkManager.TransformWatermarks;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Instant;

Expand All @@ -35,6 +39,8 @@ class DirectTimerInternals implements TimerInternals {
private final Clock processingTimeClock;
private final TransformWatermarks watermarks;
private final TimerUpdateBuilder timerUpdateBuilder;
private final Map<TimeDomain, NavigableSet<TimerData>> modifiedTimers;
private final Map<String, TimerData> modifiedTimerIds;

public static DirectTimerInternals create(
Clock clock, TransformWatermarks watermarks, TimerUpdateBuilder timerUpdateBuilder) {
Expand All @@ -46,6 +52,11 @@ private DirectTimerInternals(
this.processingTimeClock = clock;
this.watermarks = watermarks;
this.timerUpdateBuilder = timerUpdateBuilder;
this.modifiedTimers = Maps.newHashMap();
this.modifiedTimers.put(TimeDomain.EVENT_TIME, Sets.newTreeSet());
this.modifiedTimers.put(TimeDomain.PROCESSING_TIME, Sets.newTreeSet());
this.modifiedTimers.put(TimeDomain.SYNCHRONIZED_PROCESSING_TIME, Sets.newTreeSet());
this.modifiedTimerIds = Maps.newHashMap();
}

@Override
Expand All @@ -56,8 +67,7 @@ public void setTimer(
Instant target,
Instant outputTimestamp,
TimeDomain timeDomain) {
timerUpdateBuilder.setTimer(
TimerData.of(timerId, timerFamilyId, namespace, target, outputTimestamp, timeDomain));
setTimer(TimerData.of(timerId, timerFamilyId, namespace, target, outputTimestamp, timeDomain));
}

/**
Expand All @@ -68,6 +78,8 @@ public void setTimer(
@Override
public void setTimer(TimerData timerData) {
timerUpdateBuilder.setTimer(timerData);
getModifiedTimersOrdered(timerData.getDomain()).add(timerData);
modifiedTimerIds.put(timerData.stringKey(), timerData);
}

@Override
Expand All @@ -93,27 +105,25 @@ public void deleteTimer(StateNamespace namespace, String timerId, String timerFa
/** @deprecated use {@link #deleteTimer(StateNamespace, String, TimeDomain)}. */
@Deprecated
@Override
public void deleteTimer(TimerData timerKey) {
timerUpdateBuilder.deletedTimer(timerKey);
public void deleteTimer(TimerData timerData) {
timerUpdateBuilder.deletedTimer(timerData);
modifiedTimerIds.put(timerData.stringKey(), timerData);
}

public TimerUpdate getTimerUpdate() {
return timerUpdateBuilder.build();
}

public boolean containsUpdateForTimeBefore(
Instant maxWatermarkTime, Instant maxProcessingTime, Instant maxSynchronizedProcessingTime) {
TimerUpdate update = timerUpdateBuilder.build();
return hasTimeBefore(
update.getSetTimers(),
maxWatermarkTime,
maxProcessingTime,
maxSynchronizedProcessingTime)
|| hasTimeBefore(
update.getDeletedTimers(),
maxWatermarkTime,
maxProcessingTime,
maxSynchronizedProcessingTime);
public NavigableSet<TimerData> getModifiedTimersOrdered(TimeDomain timeDomain) {
NavigableSet<TimerData> modified = modifiedTimers.get(timeDomain);
if (modified == null) {
throw new IllegalStateException("Unexpected time domain " + timeDomain);
}
return modified;
}

public Map<String, TimerData> getModifiedTimerIds() {
return modifiedTimerIds;
}

@Override
Expand All @@ -135,32 +145,4 @@ public Instant currentInputWatermarkTime() {
public @Nullable Instant currentOutputWatermarkTime() {
return watermarks.getOutputWatermark();
}

private boolean hasTimeBefore(
Iterable<? extends TimerData> timers,
Instant maxWatermarkTime,
Instant maxProcessingTime,
Instant maxSynchronizedProcessingTime) {
for (TimerData timerData : timers) {
Instant currentTime;
switch (timerData.getDomain()) {
case EVENT_TIME:
currentTime = maxWatermarkTime;
break;
case PROCESSING_TIME:
currentTime = maxProcessingTime;
break;
case SYNCHRONIZED_PROCESSING_TIME:
currentTime = maxSynchronizedProcessingTime;
break;
default:
throw new RuntimeException("Unexpected timeDomain " + timerData.getDomain());
}
if (timerData.getTimestamp().isBefore(currentTime)
|| timerData.getTimestamp().isEqual(currentTime)) {
return true;
}
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ public void start(DirectGraph graph, RootProviderRegistry rootProviderRegistry)
@Override
public void run() {
DriverState drive = executionDriver.drive();
if (drive.isTermainal()) {
if (drive.isTerminal()) {
State newPipelineState = State.UNKNOWN;
switch (drive) {
case FAILED:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -39,6 +38,7 @@
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -81,7 +81,7 @@ public static ExecutionDriver create(
// watermark of a PTransform before enqueuing the resulting bundle to pendingUpdates of downstream
// PTransform, which can lead to watermark being updated past the emitted elements.
private final Map<AppliedPTransform<?, ?, ?>, Collection<CommittedBundle<?>>> inflightBundles =
new ConcurrentHashMap<>();
Maps.newHashMap();

private final AtomicReference<ExecutorState> state =
new AtomicReference<>(ExecutorState.QUIESCENT);
Expand Down Expand Up @@ -166,40 +166,46 @@ private void processBundle(CommittedBundle<?> bundle, AppliedPTransform<?, ?, ?>

private void processBundle(
CommittedBundle<?> bundle, AppliedPTransform<?, ?, ?> consumer, CompletionCallback callback) {
inflightBundles.compute(
consumer,
(k, v) -> {
if (v == null) {
v = new ArrayList<>();
}
v.add(bundle);
return v;
});
synchronized (inflightBundles) {
inflightBundles.compute(
consumer,
(k, v) -> {
if (v == null) {
v = new ArrayList<>();
}
v.add(bundle);
return v;
});
}
outstandingWork.incrementAndGet();
bundleProcessor.process(bundle, consumer, callback);
}

/** Fires any available timers. */
private void fireTimers() {
try {
for (FiredTimers<AppliedPTransform<?, ?, ?>> transformTimers :
evaluationContext.extractFiredTimers(inflightBundles.keySet())) {
Collection<TimerData> delivery = transformTimers.getTimers();
KeyedWorkItem<?, Object> work =
KeyedWorkItems.timersWorkItem(transformTimers.getKey().getKey(), delivery);
@SuppressWarnings({"unchecked", "rawtypes"})
CommittedBundle<?> bundle =
evaluationContext
.createKeyedBundle(
transformTimers.getKey(),
(PCollection)
Iterables.getOnlyElement(
transformTimers.getExecutable().getMainInputs().values()))
.add(WindowedValue.valueInGlobalWindow(work))
.commit(evaluationContext.now());
processBundle(
bundle, transformTimers.getExecutable(), new TimerIterableCompletionCallback(delivery));
state.set(ExecutorState.ACTIVE);
synchronized (inflightBundles) {
for (FiredTimers<AppliedPTransform<?, ?, ?>> transformTimers :
evaluationContext.extractFiredTimers(inflightBundles.keySet())) {
Collection<TimerData> delivery = transformTimers.getTimers();
KeyedWorkItem<?, Object> work =
KeyedWorkItems.timersWorkItem(transformTimers.getKey().getKey(), delivery);
@SuppressWarnings({"unchecked", "rawtypes"})
CommittedBundle<?> bundle =
evaluationContext
.createKeyedBundle(
transformTimers.getKey(),
(PCollection)
Iterables.getOnlyElement(
transformTimers.getExecutable().getMainInputs().values()))
.add(WindowedValue.valueInGlobalWindow(work))
.commit(evaluationContext.now());
processBundle(
bundle,
transformTimers.getExecutable(),
new TimerIterableCompletionCallback(delivery));
state.set(ExecutorState.ACTIVE);
}
}
} catch (Exception e) {
LOG.error("Internal Error while delivering timers", e);
Expand Down Expand Up @@ -311,12 +317,14 @@ public final CommittedResult handleResult(
state.set(ExecutorState.ACTIVE);
}
outstandingWork.decrementAndGet();
inflightBundles.compute(
result.getTransform(),
(k, v) -> {
v.remove(inputBundle);
return v.isEmpty() ? null : v;
});
synchronized (inflightBundles) {
inflightBundles.compute(
result.getTransform(),
(k, v) -> {
v.remove(inputBundle);
return v.isEmpty() ? null : v;
});
}
return committedResult;
}

Expand Down
Loading

0 comments on commit 8ea77f9

Please sign in to comment.