From 5e1ebee8b47a71a243abbf7da79882da5deebb7d Mon Sep 17 00:00:00 2001 From: Alan Zhang Date: Fri, 20 Jan 2023 09:02:20 -0800 Subject: [PATCH] Allow to set timeout for finishing a remote bundle in Samza portable runner (#25031) --- .../runners/samza/SamzaPipelineOptions.java | 7 +++ .../samza/runtime/SamzaDoFnRunners.java | 38 +++++++++++++-- .../runtime/SdkHarnessDoFnRunnerTest.java | 48 +++++++++++++++++++ 3 files changed, 90 insertions(+), 3 deletions(-) create mode 100644 runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SdkHarnessDoFnRunnerTest.java diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptions.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptions.java index 814b14f98b8a..9dd5234c35bf 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptions.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptions.java @@ -135,6 +135,13 @@ public interface SamzaPipelineOptions extends PipelineOptions { void setMaxBundleTimeMs(long maxBundleTimeMs); + @Description( + "Wait if necessary for completing a remote bundle processing for at most the given time (in milliseconds). if the value of timeout is negative, wait forever until the bundle processing is completed. Used only in portable mode for now.") + @Default.Long(-1) + long getBundleProcessingTimeout(); + + void setBundleProcessingTimeout(long timeoutMs); + @Description( "The number of threads to run DoFn.processElements in parallel within a bundle. Used only in non-portable mode.") @Default.Integer(1) diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java index 41fe8190dec1..8af62059de9f 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java @@ -21,8 +21,12 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; @@ -232,6 +236,7 @@ public static DoFnRunner createPortable( (SamzaExecutionContext) context.getApplicationContainerContext(); final DoFnRunner underlyingRunner = new SdkHarnessDoFnRunner<>( + pipelineOptions, stepName, timerInternalsFactory, WindowUtils.getWindowStrategy( @@ -248,10 +253,11 @@ public static DoFnRunner createPortable( : underlyingRunner; } - private static class SdkHarnessDoFnRunner implements DoFnRunner { + static class SdkHarnessDoFnRunner implements DoFnRunner { private static final int DEFAULT_METRIC_SAMPLE_RATE = 100; + private final SamzaPipelineOptions pipelineOptions; private final SamzaTimerInternalsFactory timerInternalsFactory; private final WindowingStrategy windowingStrategy; private final DoFnRunners.OutputManager outputManager; @@ -267,6 +273,7 @@ private static class SdkHarnessDoFnRunner implements DoFnRunner timerInternalsFactory, WindowingStrategy windowingStrategy, @@ -276,6 +283,7 @@ private SdkHarnessDoFnRunner( BagState> bundledEventsBag, StateRequestHandler stateRequestHandler, SamzaExecutionContext samzaExecutionContext) { + this.pipelineOptions = pipelineOptions; this.timerInternalsFactory = timerInternalsFactory; this.windowingStrategy = windowingStrategy; this.outputManager = outputManager; @@ -426,8 +434,16 @@ public void onTimer( @Override public void finishBundle() { try { - // RemoteBundle close blocks until all results are received - remoteBundle.close(); + runWithTimeout( + pipelineOptions.getBundleProcessingTimeout(), + () -> { + // RemoteBundle close blocks until all results are received + try { + remoteBundle.close(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); emitResults(); emitMetrics(); bundledEventsBag.clear(); @@ -439,6 +455,22 @@ public void finishBundle() { } } + /** + * Run a function and wait for at most the given time (in milliseconds). + * + * @param timeoutInMs the time to wait for completing the function call. If the value of timeout + * is negative, wait forever until the function call is completed + * @param runnable the main function + */ + static void runWithTimeout(long timeoutInMs, Runnable runnable) + throws ExecutionException, InterruptedException, TimeoutException { + if (timeoutInMs < 0) { + runnable.run(); + } else { + CompletableFuture.runAsync(runnable).get(timeoutInMs, TimeUnit.MILLISECONDS); + } + } + @Override public void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) {} diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SdkHarnessDoFnRunnerTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SdkHarnessDoFnRunnerTest.java new file mode 100644 index 000000000000..e6029beb93b0 --- /dev/null +++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SdkHarnessDoFnRunnerTest.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.samza.runtime; + +import java.util.concurrent.TimeoutException; +import org.junit.Test; + +public class SdkHarnessDoFnRunnerTest { + + @Test(expected = TimeoutException.class) + public void testRunWithTimeoutOccurred() throws Exception { + SamzaDoFnRunners.SdkHarnessDoFnRunner.runWithTimeout( + 100, + () -> { + try { + Thread.sleep(500); + } catch (InterruptedException ignored) { + } + }); + } + + @Test + public void testRunWithTimeoutDisabled() throws Exception { + SamzaDoFnRunners.SdkHarnessDoFnRunner.runWithTimeout( + -1, + () -> { + try { + Thread.sleep(500); + } catch (InterruptedException ignored) { + } + }); + } +}