From f0b143c2b2ce8593a3bea360ea9c0a73d098b8c2 Mon Sep 17 00:00:00 2001 From: Sam Whittle Date: Fri, 22 Apr 2022 16:16:41 +0000 Subject: [PATCH] [BEAM-13015] Only create a TimerBundleTracker if there are timers. TimerBundleTracker.outputTimers shows up as 1% of CPU on internal benchmark where the DoFns do not have timers. --- .../beam/fn/harness/FnApiDoFnRunner.java | 36 ++++++++++++------- 1 file changed, 24 insertions(+), 12 deletions(-) diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java index cf84027a7e90..dfb870eed184 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java @@ -236,7 +236,7 @@ static class Factory stateAccessor; private final Map> outboundTimerReceivers; - private FnApiTimerBundleTracker timerBundleTracker; + private final @Nullable FnApiTimerBundleTracker timerBundleTracker; private final DoFnInvoker doFnInvoker; private final StartBundleArgumentProvider startBundleArgumentProvider; private final ProcessBundleContextBase processContext; @@ -483,10 +483,6 @@ private interface TriFunction { this.bundleFinalizer = bundleFinalizer; this.onTimerContext = new OnTimerContext(); this.onWindowExpirationContext = new OnWindowExpirationContext<>(); - this.timerBundleTracker = - new FnApiTimerBundleTracker( - keyCoder, windowCoder, this::getCurrentKey, () -> currentWindow); - addResetFunction.accept(timerBundleTracker::reset); this.mainOutputConsumers = (Collection>>) @@ -737,12 +733,22 @@ private ByteString encodeProgress(double value) throws IOException { () -> currentWindow); // Register as a consumer for each timer. - outboundTimerReceivers = new HashMap<>(); - for (Map.Entry>>> timerFamilyInfo : - timerFamilyInfos.entrySet()) { - String localName = timerFamilyInfo.getKey(); - Coder> timerCoder = timerFamilyInfo.getValue().getValue(); - outboundTimerReceivers.put(localName, getOutgoingTimersConsumer.apply(localName, timerCoder)); + this.outboundTimerReceivers = new HashMap<>(); + if (timerFamilyInfos.isEmpty()) { + this.timerBundleTracker = null; + } else { + this.timerBundleTracker = + new FnApiTimerBundleTracker( + keyCoder, windowCoder, this::getCurrentKey, () -> currentWindow); + addResetFunction.accept(timerBundleTracker::reset); + + for (Map.Entry>>> timerFamilyInfo : + timerFamilyInfos.entrySet()) { + String localName = timerFamilyInfo.getKey(); + Coder> timerCoder = timerFamilyInfo.getValue().getValue(); + outboundTimerReceivers.put( + localName, getOutgoingTimersConsumer.apply(localName, timerCoder)); + } } } @@ -1639,6 +1645,7 @@ private HandlesSplits.SplitResult trySplitForElementAndRestriction( private void processTimer( String timerIdOrTimerFamilyId, TimeDomain timeDomain, Timer timer) { + checkNotNull(timerBundleTracker); try { currentKey = timer.getUserKey(); Iterator windowIterator = @@ -1737,7 +1744,9 @@ private void processOnWindowExpiration(Timer timer) { } private void finishBundle() throws Exception { - timerBundleTracker.outputTimers(outboundTimerReceivers::get); + if (timerBundleTracker != null) { + timerBundleTracker.outputTimers(outboundTimerReceivers::get); + } doFnInvoker.invokeFinishBundle(finishBundleArgumentProvider); @@ -1811,6 +1820,7 @@ private class FnApiTimer implements org.apache.beam.sdk.state.Timer { @Override public void set(Instant absoluteTime) { + checkNotNull(timerBundleTracker); // Ensures that the target time is reasonable. For event time timers this means that the time // should be prior to window GC time. if (TimeDomain.EVENT_TIME.equals(timeDomain)) { @@ -1827,6 +1837,7 @@ public void set(Instant absoluteTime) { @Override public void setRelative() { + checkNotNull(timerBundleTracker); Instant target; if (period.equals(Duration.ZERO)) { target = fireTimestamp.plus(offset); @@ -1843,6 +1854,7 @@ public void setRelative() { @Override public void clear() { + checkNotNull(timerBundleTracker); timerBundleTracker.timerModified(timerIdOrFamily, timeDomain, getClearedTimer()); }