Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-11971] Fix directrunner timer consistency #16650

Merged
merged 1 commit into from
Feb 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -262,11 +262,22 @@ public int compareTo(TimerData that) {
.compare(this.getDomain(), that.getDomain())
.compare(this.getTimerId(), that.getTimerId())
.compare(this.getTimerFamilyId(), that.getTimerFamilyId());
if (chain.result() == 0 && !this.getNamespace().equals(that.getNamespace())) {
int compResult = chain.result();
if (compResult == 0 && !this.getNamespace().equals(that.getNamespace())) {
// Obtaining the stringKey may be expensive; only do so if required
chain = chain.compare(getNamespace().stringKey(), that.getNamespace().stringKey());
compResult = this.getNamespace().stringKey().compareTo(that.getNamespace().stringKey());
}
return chain.result();
return compResult;
}

/**
 * Builds a string identifying this timer by concatenating the namespace's string key, the time
 * domain, the timer family id and the timer id, in the form {@code
 * <namespaceKey>/<domain>/<timerFamilyId>:<timerId>}.
 *
 * <p>No timestamp is appended, so the result does not depend on when the timer is set to fire.
 *
 * @return the concatenated key described above
 */
public String stringKey() {
return getNamespace().stringKey()
+ "/"
+ getDomain().toString()
+ "/"
+ getTimerFamilyId()
je-ik marked this conversation as resolved.
Show resolved Hide resolved
+ ":"
+ getTimerId();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,17 @@
*/
package org.apache.beam.runners.direct;

import java.util.Map;
import java.util.NavigableSet;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate.TimerUpdateBuilder;
import org.apache.beam.runners.direct.WatermarkManager.TransformWatermarks;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Instant;

Expand All @@ -35,6 +39,8 @@ class DirectTimerInternals implements TimerInternals {
// Clock supplied at construction.
// NOTE(review): presumably the source of the current processing time; its use is outside this view.
private final Clock processingTimeClock;
// Watermarks handed in at construction; queried for the output watermark.
private final TransformWatermarks watermarks;
// Receives every timer that is set or deleted; built into a TimerUpdate by getTimerUpdate().
private final TimerUpdateBuilder timerUpdateBuilder;
// For each time domain, the ordered set of timers passed to setTimer(TimerData).
private final Map<TimeDomain, NavigableSet<TimerData>> modifiedTimers;
// Latest TimerData set or deleted, keyed by TimerData#stringKey(); a later put for the same key
// replaces the earlier entry.
private final Map<String, TimerData> modifiedTimerIds;

public static DirectTimerInternals create(
Clock clock, TransformWatermarks watermarks, TimerUpdateBuilder timerUpdateBuilder) {
Expand All @@ -46,6 +52,11 @@ private DirectTimerInternals(
this.processingTimeClock = clock;
this.watermarks = watermarks;
this.timerUpdateBuilder = timerUpdateBuilder;
this.modifiedTimers = Maps.newHashMap();
this.modifiedTimers.put(TimeDomain.EVENT_TIME, Sets.newTreeSet());
this.modifiedTimers.put(TimeDomain.PROCESSING_TIME, Sets.newTreeSet());
this.modifiedTimers.put(TimeDomain.SYNCHRONIZED_PROCESSING_TIME, Sets.newTreeSet());
this.modifiedTimerIds = Maps.newHashMap();
}

@Override
Expand All @@ -56,8 +67,7 @@ public void setTimer(
Instant target,
Instant outputTimestamp,
TimeDomain timeDomain) {
timerUpdateBuilder.setTimer(
TimerData.of(timerId, timerFamilyId, namespace, target, outputTimestamp, timeDomain));
setTimer(TimerData.of(timerId, timerFamilyId, namespace, target, outputTimestamp, timeDomain));
}

/**
Expand All @@ -68,6 +78,8 @@ public void setTimer(
@Override
public void setTimer(TimerData timerData) {
  // Forward the timer to the update builder first.
  timerUpdateBuilder.setTimer(timerData);

  // Record it in the ordered set of modified timers for its time domain.
  NavigableSet<TimerData> modifiedInDomain = getModifiedTimersOrdered(timerData.getDomain());
  modifiedInDomain.add(timerData);

  // Remember the latest data for this timer under its string key.
  String timerKey = timerData.stringKey();
  modifiedTimerIds.put(timerKey, timerData);
}

@Override
Expand All @@ -93,27 +105,25 @@ public void deleteTimer(StateNamespace namespace, String timerId, String timerFa
/** @deprecated use {@link #deleteTimer(StateNamespace, String, TimeDomain)}. */
@Deprecated
@Override
public void deleteTimer(TimerData timerKey) {
timerUpdateBuilder.deletedTimer(timerKey);
public void deleteTimer(TimerData timerData) {
timerUpdateBuilder.deletedTimer(timerData);
modifiedTimerIds.put(timerData.stringKey(), timerData);
}

/**
 * Builds a {@link TimerUpdate} from the underlying {@link TimerUpdateBuilder}.
 *
 * @return the result of calling {@code build()} on the builder held by this instance
 */
public TimerUpdate getTimerUpdate() {
  return this.timerUpdateBuilder.build();
}

public boolean containsUpdateForTimeBefore(
Instant maxWatermarkTime, Instant maxProcessingTime, Instant maxSynchronizedProcessingTime) {
TimerUpdate update = timerUpdateBuilder.build();
return hasTimeBefore(
update.getSetTimers(),
maxWatermarkTime,
maxProcessingTime,
maxSynchronizedProcessingTime)
|| hasTimeBefore(
update.getDeletedTimers(),
maxWatermarkTime,
maxProcessingTime,
maxSynchronizedProcessingTime);
/**
 * Looks up the ordered set of modified timers kept for the given time domain.
 *
 * @param timeDomain the time domain whose modified timers are requested
 * @return the set stored for {@code timeDomain}; this is the internal instance, not a copy
 * @throws IllegalStateException if no set is stored for {@code timeDomain}
 */
public NavigableSet<TimerData> getModifiedTimersOrdered(TimeDomain timeDomain) {
  NavigableSet<TimerData> timersInDomain = modifiedTimers.get(timeDomain);
  if (timersInDomain != null) {
    return timersInDomain;
  }
  throw new IllegalStateException("Unexpected time domain " + timeDomain);
}

/**
 * Exposes the timers that were set or deleted through this instance, keyed by {@code
 * TimerData#stringKey()}.
 *
 * @return the internal map itself, not a copy
 */
public Map<String, TimerData> getModifiedTimerIds() {
  return this.modifiedTimerIds;
}

@Override
Expand All @@ -135,32 +145,4 @@ public Instant currentInputWatermarkTime() {
public @Nullable Instant currentOutputWatermarkTime() {
return watermarks.getOutputWatermark();
}

/**
 * Reports whether any of the given timers has a timestamp that is before or equal to the bound
 * supplied for that timer's time domain.
 *
 * @param timers the timers to inspect
 * @param maxWatermarkTime bound applied to {@code EVENT_TIME} timers
 * @param maxProcessingTime bound applied to {@code PROCESSING_TIME} timers
 * @param maxSynchronizedProcessingTime bound applied to {@code SYNCHRONIZED_PROCESSING_TIME}
 *     timers
 * @return {@code true} as soon as one timer is at or before its bound, {@code false} if none is
 * @throws RuntimeException if a timer reports a time domain other than the three above
 */
private boolean hasTimeBefore(
    Iterable<? extends TimerData> timers,
    Instant maxWatermarkTime,
    Instant maxProcessingTime,
    Instant maxSynchronizedProcessingTime) {
  for (TimerData candidate : timers) {
    // Pick the bound that matches the timer's own time domain.
    final Instant limit;
    switch (candidate.getDomain()) {
      case EVENT_TIME:
        limit = maxWatermarkTime;
        break;
      case PROCESSING_TIME:
        limit = maxProcessingTime;
        break;
      case SYNCHRONIZED_PROCESSING_TIME:
        limit = maxSynchronizedProcessingTime;
        break;
      default:
        throw new RuntimeException("Unexpected timeDomain " + candidate.getDomain());
    }

    Instant firingTime = candidate.getTimestamp();
    boolean atOrBeforeLimit = firingTime.isBefore(limit) || firingTime.isEqual(limit);
    if (atOrBeforeLimit) {
      return true;
    }
  }
  return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ public void start(DirectGraph graph, RootProviderRegistry rootProviderRegistry)
@Override
public void run() {
DriverState drive = executionDriver.drive();
if (drive.isTermainal()) {
if (drive.isTerminal()) {
State newPipelineState = State.UNKNOWN;
switch (drive) {
case FAILED:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -39,6 +38,7 @@
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -81,7 +81,7 @@ public static ExecutionDriver create(
// watermark of a PTransform before enqueuing the resulting bundle to pendingUpdates of downstream
// PTransform, which can lead to watermark being updated past the emitted elements.
private final Map<AppliedPTransform<?, ?, ?>, Collection<CommittedBundle<?>>> inflightBundles =
new ConcurrentHashMap<>();
Maps.newHashMap();

private final AtomicReference<ExecutorState> state =
new AtomicReference<>(ExecutorState.QUIESCENT);
Expand Down Expand Up @@ -166,40 +166,46 @@ private void processBundle(CommittedBundle<?> bundle, AppliedPTransform<?, ?, ?>

private void processBundle(
CommittedBundle<?> bundle, AppliedPTransform<?, ?, ?> consumer, CompletionCallback callback) {
inflightBundles.compute(
consumer,
(k, v) -> {
if (v == null) {
v = new ArrayList<>();
}
v.add(bundle);
return v;
});
synchronized (inflightBundles) {
inflightBundles.compute(
consumer,
(k, v) -> {
if (v == null) {
v = new ArrayList<>();
}
v.add(bundle);
return v;
});
}
outstandingWork.incrementAndGet();
bundleProcessor.process(bundle, consumer, callback);
}

/** Fires any available timers. */
private void fireTimers() {
try {
for (FiredTimers<AppliedPTransform<?, ?, ?>> transformTimers :
evaluationContext.extractFiredTimers(inflightBundles.keySet())) {
Collection<TimerData> delivery = transformTimers.getTimers();
KeyedWorkItem<?, Object> work =
KeyedWorkItems.timersWorkItem(transformTimers.getKey().getKey(), delivery);
@SuppressWarnings({"unchecked", "rawtypes"})
CommittedBundle<?> bundle =
evaluationContext
.createKeyedBundle(
transformTimers.getKey(),
(PCollection)
Iterables.getOnlyElement(
transformTimers.getExecutable().getMainInputs().values()))
.add(WindowedValue.valueInGlobalWindow(work))
.commit(evaluationContext.now());
processBundle(
bundle, transformTimers.getExecutable(), new TimerIterableCompletionCallback(delivery));
state.set(ExecutorState.ACTIVE);
synchronized (inflightBundles) {
for (FiredTimers<AppliedPTransform<?, ?, ?>> transformTimers :
evaluationContext.extractFiredTimers(inflightBundles.keySet())) {
Collection<TimerData> delivery = transformTimers.getTimers();
KeyedWorkItem<?, Object> work =
KeyedWorkItems.timersWorkItem(transformTimers.getKey().getKey(), delivery);
@SuppressWarnings({"unchecked", "rawtypes"})
CommittedBundle<?> bundle =
evaluationContext
.createKeyedBundle(
transformTimers.getKey(),
(PCollection)
Iterables.getOnlyElement(
transformTimers.getExecutable().getMainInputs().values()))
.add(WindowedValue.valueInGlobalWindow(work))
.commit(evaluationContext.now());
processBundle(
bundle,
transformTimers.getExecutable(),
new TimerIterableCompletionCallback(delivery));
state.set(ExecutorState.ACTIVE);
}
}
} catch (Exception e) {
LOG.error("Internal Error while delivering timers", e);
Expand Down Expand Up @@ -311,12 +317,14 @@ public final CommittedResult handleResult(
state.set(ExecutorState.ACTIVE);
}
outstandingWork.decrementAndGet();
inflightBundles.compute(
result.getTransform(),
(k, v) -> {
v.remove(inputBundle);
return v.isEmpty() ? null : v;
});
synchronized (inflightBundles) {
inflightBundles.compute(
result.getTransform(),
(k, v) -> {
v.remove(inputBundle);
return v.isEmpty() ? null : v;
});
}
return committedResult;
}

Expand Down
Loading