Skip to content

Commit

Permalink
[BEAM-13015] Only create a TimerBundleTracker if there are timers. (a…
Browse files Browse the repository at this point in the history
…pache#17445)

TimerBundleTracker.outputTimers shows up as 1% of CPU on internal benchmark
where the DoFns do not have timers.
  • Loading branch information
scwhittle authored May 20, 2022
1 parent bac4e28 commit 0c9cf43
Showing 1 changed file with 24 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ static class Factory<InputT, RestrictionT, PositionT, WatermarkEstimatorStateT,
private final String mainInputId;
private final FnApiStateAccessor<?> stateAccessor;
private final Map<String, FnDataReceiver<?>> outboundTimerReceivers;
private FnApiTimerBundleTracker timerBundleTracker;
private final @Nullable FnApiTimerBundleTracker timerBundleTracker;
private final DoFnInvoker<InputT, OutputT> doFnInvoker;
private final StartBundleArgumentProvider startBundleArgumentProvider;
private final ProcessBundleContextBase processContext;
Expand Down Expand Up @@ -483,10 +483,6 @@ private interface TriFunction<FirstT, SecondT, ThirdT> {
this.bundleFinalizer = bundleFinalizer;
this.onTimerContext = new OnTimerContext();
this.onWindowExpirationContext = new OnWindowExpirationContext<>();
this.timerBundleTracker =
new FnApiTimerBundleTracker(
keyCoder, windowCoder, this::getCurrentKey, () -> currentWindow);
addResetFunction.accept(timerBundleTracker::reset);

this.mainOutputConsumers =
(Collection<FnDataReceiver<WindowedValue<OutputT>>>)
Expand Down Expand Up @@ -737,12 +733,22 @@ private ByteString encodeProgress(double value) throws IOException {
() -> currentWindow);

// Register as a consumer for each timer.
outboundTimerReceivers = new HashMap<>();
for (Map.Entry<String, KV<TimeDomain, Coder<Timer<Object>>>> timerFamilyInfo :
timerFamilyInfos.entrySet()) {
String localName = timerFamilyInfo.getKey();
Coder<Timer<Object>> timerCoder = timerFamilyInfo.getValue().getValue();
outboundTimerReceivers.put(localName, getOutgoingTimersConsumer.apply(localName, timerCoder));
this.outboundTimerReceivers = new HashMap<>();
if (timerFamilyInfos.isEmpty()) {
this.timerBundleTracker = null;
} else {
this.timerBundleTracker =
new FnApiTimerBundleTracker(
keyCoder, windowCoder, this::getCurrentKey, () -> currentWindow);
addResetFunction.accept(timerBundleTracker::reset);

for (Map.Entry<String, KV<TimeDomain, Coder<Timer<Object>>>> timerFamilyInfo :
timerFamilyInfos.entrySet()) {
String localName = timerFamilyInfo.getKey();
Coder<Timer<Object>> timerCoder = timerFamilyInfo.getValue().getValue();
outboundTimerReceivers.put(
localName, getOutgoingTimersConsumer.apply(localName, timerCoder));
}
}
}

Expand Down Expand Up @@ -1639,6 +1645,7 @@ private HandlesSplits.SplitResult trySplitForElementAndRestriction(

private <K> void processTimer(
String timerIdOrTimerFamilyId, TimeDomain timeDomain, Timer<K> timer) {
checkNotNull(timerBundleTracker);
try {
currentKey = timer.getUserKey();
Iterator<BoundedWindow> windowIterator =
Expand Down Expand Up @@ -1737,7 +1744,9 @@ private <K> void processOnWindowExpiration(Timer<K> timer) {
}

private void finishBundle() throws Exception {
timerBundleTracker.outputTimers(outboundTimerReceivers::get);
if (timerBundleTracker != null) {
timerBundleTracker.outputTimers(outboundTimerReceivers::get);
}

doFnInvoker.invokeFinishBundle(finishBundleArgumentProvider);

Expand Down Expand Up @@ -1813,6 +1822,7 @@ private class FnApiTimer<K> implements org.apache.beam.sdk.state.Timer {

@Override
public void set(Instant absoluteTime) {
checkNotNull(timerBundleTracker);
// Ensures that the target time is reasonable. For event time timers this means that the time
// should be prior to window GC time.
if (TimeDomain.EVENT_TIME.equals(timeDomain)) {
Expand All @@ -1829,6 +1839,7 @@ public void set(Instant absoluteTime) {

@Override
public void setRelative() {
checkNotNull(timerBundleTracker);
Instant target;
if (period.equals(Duration.ZERO)) {
target = fireTimestamp.plus(offset);
Expand All @@ -1845,6 +1856,7 @@ public void setRelative() {

@Override
public void clear() {
checkNotNull(timerBundleTracker);
timerBundleTracker.timerModified(timerIdOrFamily, timeDomain, getClearedTimer());
}

Expand Down

0 comments on commit 0c9cf43

Please sign in to comment.