diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java index 2d3462eb2693..54055475048a 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java @@ -236,7 +236,7 @@ static class Factory stateAccessor; private final Map> outboundTimerReceivers; - private FnApiTimerBundleTracker timerBundleTracker; + private final @Nullable FnApiTimerBundleTracker timerBundleTracker; private final DoFnInvoker doFnInvoker; private final StartBundleArgumentProvider startBundleArgumentProvider; private final ProcessBundleContextBase processContext; @@ -483,10 +483,6 @@ private interface TriFunction { this.bundleFinalizer = bundleFinalizer; this.onTimerContext = new OnTimerContext(); this.onWindowExpirationContext = new OnWindowExpirationContext<>(); - this.timerBundleTracker = - new FnApiTimerBundleTracker( - keyCoder, windowCoder, this::getCurrentKey, () -> currentWindow); - addResetFunction.accept(timerBundleTracker::reset); this.mainOutputConsumers = (Collection>>) @@ -737,12 +733,22 @@ private ByteString encodeProgress(double value) throws IOException { () -> currentWindow); // Register as a consumer for each timer. - outboundTimerReceivers = new HashMap<>(); - for (Map.Entry>>> timerFamilyInfo : - timerFamilyInfos.entrySet()) { - String localName = timerFamilyInfo.getKey(); - Coder> timerCoder = timerFamilyInfo.getValue().getValue(); - outboundTimerReceivers.put(localName, getOutgoingTimersConsumer.apply(localName, timerCoder)); + this.outboundTimerReceivers = new HashMap<>(); + if (timerFamilyInfos.isEmpty()) { + this.timerBundleTracker = null; + } else { + this.timerBundleTracker = + new FnApiTimerBundleTracker( + keyCoder, windowCoder, this::getCurrentKey, () -> currentWindow); + addResetFunction.accept(timerBundleTracker::reset); + + for (Map.Entry>>> timerFamilyInfo : + timerFamilyInfos.entrySet()) { + String localName = timerFamilyInfo.getKey(); + Coder> timerCoder = timerFamilyInfo.getValue().getValue(); + outboundTimerReceivers.put( + localName, getOutgoingTimersConsumer.apply(localName, timerCoder)); + } } } @@ -1639,6 +1645,7 @@ private HandlesSplits.SplitResult trySplitForElementAndRestriction( private void processTimer( String timerIdOrTimerFamilyId, TimeDomain timeDomain, Timer timer) { + checkNotNull(timerBundleTracker); try { currentKey = timer.getUserKey(); Iterator windowIterator = @@ -1737,7 +1744,9 @@ private void processOnWindowExpiration(Timer timer) { } private void finishBundle() throws Exception { - timerBundleTracker.outputTimers(outboundTimerReceivers::get); + if (timerBundleTracker != null) { + timerBundleTracker.outputTimers(outboundTimerReceivers::get); + } doFnInvoker.invokeFinishBundle(finishBundleArgumentProvider); @@ -1813,6 +1822,7 @@ private class FnApiTimer implements org.apache.beam.sdk.state.Timer { @Override public void set(Instant absoluteTime) { + checkNotNull(timerBundleTracker); // Ensures that the target time is reasonable. For event time timers this means that the time // should be prior to window GC time. if (TimeDomain.EVENT_TIME.equals(timeDomain)) { @@ -1829,6 +1839,7 @@ public void set(Instant absoluteTime) { @Override public void setRelative() { + checkNotNull(timerBundleTracker); Instant target; if (period.equals(Duration.ZERO)) { target = fireTimestamp.plus(offset); @@ -1845,6 +1856,7 @@ public void setRelative() { @Override public void clear() { + checkNotNull(timerBundleTracker); timerBundleTracker.timerModified(timerIdOrFamily, timeDomain, getClearedTimer()); }