Skip to content

Commit

Permalink
Allow to set timeout for finishing a remote bundle in Samza portable …
Browse files Browse the repository at this point in the history
…runner (#25031)
  • Loading branch information
alnzng authored Jan 20, 2023
1 parent 9cecec3 commit 5e1ebee
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,13 @@ public interface SamzaPipelineOptions extends PipelineOptions {

void setMaxBundleTimeMs(long maxBundleTimeMs);

@Description(
"Wait if necessary for completing a remote bundle processing for at most the given time (in milliseconds). if the value of timeout is negative, wait forever until the bundle processing is completed. Used only in portable mode for now.")
@Default.Long(-1)
long getBundleProcessingTimeout();

void setBundleProcessingTimeout(long timeoutMs);

@Description(
"The number of threads to run DoFn.processElements in parallel within a bundle. Used only in non-portable mode.")
@Default.Integer(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,12 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
Expand Down Expand Up @@ -232,6 +236,7 @@ public static <InT, FnOutT> DoFnRunner<InT, FnOutT> createPortable(
(SamzaExecutionContext) context.getApplicationContainerContext();
final DoFnRunner<InT, FnOutT> underlyingRunner =
new SdkHarnessDoFnRunner<>(
pipelineOptions,
stepName,
timerInternalsFactory,
WindowUtils.getWindowStrategy(
Expand All @@ -248,10 +253,11 @@ public static <InT, FnOutT> DoFnRunner<InT, FnOutT> createPortable(
: underlyingRunner;
}

private static class SdkHarnessDoFnRunner<InT, FnOutT> implements DoFnRunner<InT, FnOutT> {
static class SdkHarnessDoFnRunner<InT, FnOutT> implements DoFnRunner<InT, FnOutT> {

private static final int DEFAULT_METRIC_SAMPLE_RATE = 100;

private final SamzaPipelineOptions pipelineOptions;
private final SamzaTimerInternalsFactory timerInternalsFactory;
private final WindowingStrategy windowingStrategy;
private final DoFnRunners.OutputManager outputManager;
Expand All @@ -267,6 +273,7 @@ private static class SdkHarnessDoFnRunner<InT, FnOutT> implements DoFnRunner<InT
private final String metricName;

private SdkHarnessDoFnRunner(
SamzaPipelineOptions pipelineOptions,
String stepName,
SamzaTimerInternalsFactory<?> timerInternalsFactory,
WindowingStrategy windowingStrategy,
Expand All @@ -276,6 +283,7 @@ private SdkHarnessDoFnRunner(
BagState<WindowedValue<InT>> bundledEventsBag,
StateRequestHandler stateRequestHandler,
SamzaExecutionContext samzaExecutionContext) {
this.pipelineOptions = pipelineOptions;
this.timerInternalsFactory = timerInternalsFactory;
this.windowingStrategy = windowingStrategy;
this.outputManager = outputManager;
Expand Down Expand Up @@ -426,8 +434,16 @@ public <KeyT> void onTimer(
@Override
public void finishBundle() {
try {
// RemoteBundle close blocks until all results are received
remoteBundle.close();
runWithTimeout(
pipelineOptions.getBundleProcessingTimeout(),
() -> {
// RemoteBundle close blocks until all results are received
try {
remoteBundle.close();
} catch (Exception e) {
throw new RuntimeException(e);
}
});
emitResults();
emitMetrics();
bundledEventsBag.clear();
Expand All @@ -439,6 +455,22 @@ public void finishBundle() {
}
}

/**
* Run a function and wait for at most the given time (in milliseconds).
*
* @param timeoutInMs the time to wait for completing the function call. If the value of timeout
* is negative, wait forever until the function call is completed
* @param runnable the main function
*/
static void runWithTimeout(long timeoutInMs, Runnable runnable)
throws ExecutionException, InterruptedException, TimeoutException {
if (timeoutInMs < 0) {
runnable.run();
} else {
CompletableFuture.runAsync(runnable).get(timeoutInMs, TimeUnit.MILLISECONDS);
}
}

@Override
public <KeyT> void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) {}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.samza.runtime;

import java.util.concurrent.TimeoutException;
import org.junit.Test;

public class SdkHarnessDoFnRunnerTest {

@Test(expected = TimeoutException.class)
public void testRunWithTimeoutOccurred() throws Exception {
SamzaDoFnRunners.SdkHarnessDoFnRunner.runWithTimeout(
100,
() -> {
try {
Thread.sleep(500);
} catch (InterruptedException ignored) {
}
});
}

@Test
public void testRunWithTimeoutDisabled() throws Exception {
SamzaDoFnRunners.SdkHarnessDoFnRunner.runWithTimeout(
-1,
() -> {
try {
Thread.sleep(500);
} catch (InterruptedException ignored) {
}
});
}
}

0 comments on commit 5e1ebee

Please sign in to comment.