diff --git a/gax-grpc/src/test/java/com/google/api/gax/grpc/TimeoutTest.java b/gax-grpc/src/test/java/com/google/api/gax/grpc/TimeoutTest.java index 8a4df4244..e1777ea65 100644 --- a/gax-grpc/src/test/java/com/google/api/gax/grpc/TimeoutTest.java +++ b/gax-grpc/src/test/java/com/google/api/gax/grpc/TimeoutTest.java @@ -244,7 +244,12 @@ public void testNonRetryServerStreamingSettingsContextWithRetry() { .build(); Duration newTimeout = Duration.ofSeconds(5); RetrySettings contextRetrySettings = - retrySettings.toBuilder().setTotalTimeout(newTimeout).setMaxAttempts(3).build(); + retrySettings + .toBuilder() + .setInitialRpcTimeout(newTimeout) + .setMaxRpcTimeout(newTimeout) + .setMaxAttempts(3) + .build(); GrpcCallContext retryingContext = defaultCallContext .withRetrySettings(contextRetrySettings) diff --git a/gax/src/main/java/com/google/api/gax/rpc/Callables.java b/gax/src/main/java/com/google/api/gax/rpc/Callables.java index 939e66b7f..348c84cfd 100644 --- a/gax/src/main/java/com/google/api/gax/rpc/Callables.java +++ b/gax/src/main/java/com/google/api/gax/rpc/Callables.java @@ -117,7 +117,8 @@ public static ServerStreamingCallable callable.withDefaultCallContext( clientContext .getDefaultCallContext() - .withStreamIdleTimeout(callSettings.getIdleTimeout())); + .withStreamIdleTimeout(callSettings.getIdleTimeout()) + .withStreamWaitTimeout(callSettings.getWaitTimeout())); return callable; } diff --git a/gax/src/main/java/com/google/api/gax/rpc/ServerStreamingAttemptCallable.java b/gax/src/main/java/com/google/api/gax/rpc/ServerStreamingAttemptCallable.java index cdae46ffc..c67362151 100644 --- a/gax/src/main/java/com/google/api/gax/rpc/ServerStreamingAttemptCallable.java +++ b/gax/src/main/java/com/google/api/gax/rpc/ServerStreamingAttemptCallable.java @@ -37,7 +37,6 @@ import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; import javax.annotation.concurrent.GuardedBy; -import org.threeten.bp.Duration; /** * A callable that generates Server Streaming attempts. At any one time, it is responsible for at @@ -181,15 +180,6 @@ public void cancel() { } isStarted = true; - // Propagate the totalTimeout as the overall stream deadline, so long as the user - // has not provided a timeout via the ApiCallContext. If they have, retain it. - Duration totalTimeout = - outerRetryingFuture.getAttemptSettings().getGlobalSettings().getTotalTimeout(); - - if (totalTimeout != null && context != null && context.getTimeout() == null) { - context = context.withTimeout(totalTimeout); - } - // Call the inner callable call(); } @@ -218,13 +208,10 @@ public Void call() { ApiCallContext attemptContext = context; - // Set the streamWaitTimeout to the attempt RPC Timeout, only if the context - // does not already have a timeout set by a user via withStreamWaitTimeout. if (!outerRetryingFuture.getAttemptSettings().getRpcTimeout().isZero() - && attemptContext.getStreamWaitTimeout() == null) { + && attemptContext.getTimeout() == null) { attemptContext = - attemptContext.withStreamWaitTimeout( - outerRetryingFuture.getAttemptSettings().getRpcTimeout()); + attemptContext.withTimeout(outerRetryingFuture.getAttemptSettings().getRpcTimeout()); } attemptContext diff --git a/gax/src/main/java/com/google/api/gax/rpc/ServerStreamingCallSettings.java b/gax/src/main/java/com/google/api/gax/rpc/ServerStreamingCallSettings.java index 3fe85948a..91215c963 100644 --- a/gax/src/main/java/com/google/api/gax/rpc/ServerStreamingCallSettings.java +++ b/gax/src/main/java/com/google/api/gax/rpc/ServerStreamingCallSettings.java @@ -48,9 +48,11 @@ *

This class includes settings that are applicable to all server streaming calls, which * currently just includes retries and watchdog timers. * - *

The watchdog timer is configured via {@code idleTimeout}. The watchdog will terminate any - * stream that has not has seen any demand (via {@link StreamController#request(int)}) in the - * configured interval. To turn off idle checks, set the interval to {@link Duration#ZERO}. + *

The watchdog timer is configured via {@code idleTimeout} and {@code waitTimeout}. The watchdog + * will terminate any stream that has not has seen any demand (via {@link + * StreamController#request(int)}) in the configured interval or has not seen a message from the + * server in {@code waitTimeout}. To turn off idle checks, set the interval to {@link + * Duration#ZERO}. * *

Retry configuration allows for the stream to be restarted and resumed. It is composed of 3 * parts: the retryable codes, the retry settings and the stream resumption strategy. The retryable @@ -81,12 +83,14 @@ public final class ServerStreamingCallSettings @Nonnull private final StreamResumptionStrategy resumptionStrategy; @Nonnull private final Duration idleTimeout; + @Nonnull private final Duration waitTimeout; private ServerStreamingCallSettings(Builder builder) { this.retryableCodes = ImmutableSet.copyOf(builder.retryableCodes); this.retrySettings = builder.retrySettingsBuilder.build(); this.resumptionStrategy = builder.resumptionStrategy; this.idleTimeout = builder.idleTimeout; + this.waitTimeout = builder.waitTimeout; } /** @@ -125,6 +129,15 @@ public Duration getIdleTimeout() { return idleTimeout; } + /** + * See the class documentation of {@link ServerStreamingCallSettings} for a description of what + * the {@link #waitTimeout} does. + */ + @Nonnull + public Duration getWaitTimeout() { + return waitTimeout; + } + public Builder toBuilder() { return new Builder<>(this); } @@ -137,6 +150,7 @@ public static Builder newBuilder() { public String toString() { return MoreObjects.toStringHelper(this) .add("idleTimeout", idleTimeout) + .add("waitTimeout", waitTimeout) .add("retryableCodes", retryableCodes) .add("retrySettings", retrySettings) .toString(); @@ -149,6 +163,7 @@ public static class Builder @Nonnull private StreamResumptionStrategy resumptionStrategy; @Nonnull private Duration idleTimeout; + @Nonnull private Duration waitTimeout; /** Initialize the builder with default settings */ private Builder() { @@ -157,6 +172,7 @@ private Builder() { this.resumptionStrategy = new SimpleStreamResumptionStrategy<>(); this.idleTimeout = Duration.ZERO; + this.waitTimeout = Duration.ZERO; } private Builder(ServerStreamingCallSettings settings) { @@ -166,6 +182,7 @@ private Builder(ServerStreamingCallSettings settings) { this.resumptionStrategy = settings.resumptionStrategy; this.idleTimeout = settings.idleTimeout; + this.waitTimeout = settings.waitTimeout; } /** @@ -235,9 +252,9 @@ public Builder setSimpleTimeoutNoRetries(@Nonnull Duration .setInitialRetryDelay(Duration.ZERO) .setRetryDelayMultiplier(1) .setMaxRetryDelay(Duration.ZERO) - .setInitialRpcTimeout(Duration.ZERO) + .setInitialRpcTimeout(timeout) .setRpcTimeoutMultiplier(1) - .setMaxRpcTimeout(Duration.ZERO) + .setMaxRpcTimeout(timeout) .setMaxAttempts(1) .build()); @@ -274,6 +291,19 @@ public Builder setIdleTimeout(@Nonnull Duration idleTimeout return this; } + @Nonnull + public Duration getWaitTimeout() { + return waitTimeout; + } + + /** + * See the class documentation of {@link ServerStreamingCallSettings} for a description of what + * the {@link #waitTimeout} does. {@link Duration#ZERO} disables the watchdog. + */ + public void setWaitTimeout(@Nonnull Duration waitTimeout) { + this.waitTimeout = waitTimeout; + } + @Override public ServerStreamingCallSettings build() { return new ServerStreamingCallSettings<>(this); diff --git a/gax/src/test/java/com/google/api/gax/rpc/CallableTest.java b/gax/src/test/java/com/google/api/gax/rpc/CallableTest.java index eaa0be04c..04e025fec 100644 --- a/gax/src/test/java/com/google/api/gax/rpc/CallableTest.java +++ b/gax/src/test/java/com/google/api/gax/rpc/CallableTest.java @@ -137,7 +137,7 @@ public void testNonRetriedServerStreamingCallableWithRetrySettings() throws Exce ServerStreamingCallable callable = Callables.retrying(innerServerStreamingCallable, callSettings, clientContext); - Duration timeout = retrySettings.getTotalTimeout(); + Duration timeout = retrySettings.getInitialRpcTimeout(); callable.call("Is your refrigerator running?", callContextWithRetrySettings); diff --git a/gax/src/test/java/com/google/api/gax/rpc/ServerStreamingAttemptCallableTest.java b/gax/src/test/java/com/google/api/gax/rpc/ServerStreamingAttemptCallableTest.java index 5cdf8bc4a..576759391 100644 --- a/gax/src/test/java/com/google/api/gax/rpc/ServerStreamingAttemptCallableTest.java +++ b/gax/src/test/java/com/google/api/gax/rpc/ServerStreamingAttemptCallableTest.java @@ -61,7 +61,8 @@ public class ServerStreamingAttemptCallableTest { private AccumulatingObserver observer; private FakeRetryingFuture fakeRetryingFuture; private StreamResumptionStrategy resumptionStrategy; - private static Duration totalTimeout = Duration.ofHours(1); + private static final Duration totalTimeout = Duration.ofHours(1); + private static final Duration attemptTimeout = Duration.ofMinutes(1); private FakeCallContext mockedCallContext; @Before @@ -100,7 +101,6 @@ public void testUserProvidedContextTimeout() { // Ensure that the callable did not overwrite the user provided timeouts Mockito.verify(mockedCallContext, Mockito.times(1)).getTimeout(); Mockito.verify(mockedCallContext, Mockito.never()).withTimeout(totalTimeout); - Mockito.verify(mockedCallContext, Mockito.times(1)).getStreamWaitTimeout(); Mockito.verify(mockedCallContext, Mockito.never()) .withStreamWaitTimeout(Mockito.any(Duration.class)); @@ -128,7 +128,7 @@ public void testNoUserProvidedContextTimeout() { Mockito.doReturn(NoopApiTracer.getInstance()).when(mockedCallContext).getTracer(); Mockito.doReturn(null).when(mockedCallContext).getTimeout(); Mockito.doReturn(null).when(mockedCallContext).getStreamWaitTimeout(); - Mockito.doReturn(mockedCallContext).when(mockedCallContext).withTimeout(totalTimeout); + Mockito.doReturn(mockedCallContext).when(mockedCallContext).withTimeout(attemptTimeout); Mockito.doReturn(mockedCallContext) .when(mockedCallContext) .withStreamWaitTimeout(Mockito.any(Duration.class)); @@ -139,10 +139,7 @@ public void testNoUserProvidedContextTimeout() { // Ensure that the callable configured the timeouts via the Settings in the // absence of user-defined timeouts. Mockito.verify(mockedCallContext, Mockito.times(1)).getTimeout(); - Mockito.verify(mockedCallContext, Mockito.times(1)).withTimeout(totalTimeout); - Mockito.verify(mockedCallContext, Mockito.times(1)).getStreamWaitTimeout(); - Mockito.verify(mockedCallContext, Mockito.times(1)) - .withStreamWaitTimeout(Mockito.any(Duration.class)); + Mockito.verify(mockedCallContext, Mockito.times(1)).withTimeout(attemptTimeout); // Should notify outer observer Truth.assertThat(observer.controller).isNotNull();