Skip to content

Commit

Permalink
feat: make stream wait timeout a first class citizen (#1473)
Browse files Browse the repository at this point in the history
  • Loading branch information
mutianf authored May 11, 2023
1 parent aa7c8bd commit bc8a4ad
Show file tree
Hide file tree
Showing 8 changed files with 115 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,12 @@ public void testNonRetryServerStreamingSettingsContextWithRetry() {
.build();
Duration newTimeout = Duration.ofSeconds(5);
RetrySettings contextRetrySettings =
retrySettings.toBuilder().setTotalTimeout(newTimeout).setMaxAttempts(3).build();
retrySettings
.toBuilder()
.setInitialRpcTimeout(newTimeout)
.setMaxRpcTimeout(newTimeout)
.setMaxAttempts(3)
.build();
GrpcCallContext retryingContext =
defaultCallContext
.withRetrySettings(contextRetrySettings)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ public static <RequestT, ResponseT> ServerStreamingCallable<RequestT, ResponseT>
callable.withDefaultCallContext(
clientContext
.getDefaultCallContext()
.withStreamIdleTimeout(callSettings.getIdleTimeout()));
.withStreamIdleTimeout(callSettings.getIdleTimeout())
.withStreamWaitTimeout(callSettings.getWaitTimeout()));

return callable;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import javax.annotation.concurrent.GuardedBy;
import org.threeten.bp.Duration;

/**
* A callable that generates Server Streaming attempts. At any one time, it is responsible for at
Expand Down Expand Up @@ -181,15 +180,6 @@ public void cancel() {
}
isStarted = true;

// Propagate the totalTimeout as the overall stream deadline, so long as the user
// has not provided a timeout via the ApiCallContext. If they have, retain it.
Duration totalTimeout =
outerRetryingFuture.getAttemptSettings().getGlobalSettings().getTotalTimeout();

if (totalTimeout != null && context != null && context.getTimeout() == null) {
context = context.withTimeout(totalTimeout);
}

// Call the inner callable
call();
}
Expand Down Expand Up @@ -218,13 +208,10 @@ public Void call() {

ApiCallContext attemptContext = context;

// Set the streamWaitTimeout to the attempt RPC Timeout, only if the context
// does not already have a timeout set by a user via withStreamWaitTimeout.
if (!outerRetryingFuture.getAttemptSettings().getRpcTimeout().isZero()
&& attemptContext.getStreamWaitTimeout() == null) {
&& attemptContext.getTimeout() == null) {
attemptContext =
attemptContext.withStreamWaitTimeout(
outerRetryingFuture.getAttemptSettings().getRpcTimeout());
attemptContext.withTimeout(outerRetryingFuture.getAttemptSettings().getRpcTimeout());
}

attemptContext
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,11 @@
* <p>This class includes settings that are applicable to all server streaming calls, which
* currently just includes retries and watchdog timers.
*
* <p>The watchdog timer is configured via {@code idleTimeout}. The watchdog will terminate any
* stream that has not has seen any demand (via {@link StreamController#request(int)}) in the
* configured interval. To turn off idle checks, set the interval to {@link Duration#ZERO}.
* <p>The watchdog timer is configured via {@code idleTimeout} and {@code waitTimeout}. The watchdog
* will terminate any stream that has not has seen any demand (via {@link
* StreamController#request(int)}) in the configured interval or has not seen a message from the
* server in {@code waitTimeout}. To turn off idle checks, set the interval to {@link
* Duration#ZERO}.
*
* <p>Retry configuration allows for the stream to be restarted and resumed. It is composed of 3
* parts: the retryable codes, the retry settings and the stream resumption strategy. The retryable
Expand Down Expand Up @@ -79,12 +81,14 @@ public final class ServerStreamingCallSettings<RequestT, ResponseT>
@Nonnull private final StreamResumptionStrategy<RequestT, ResponseT> resumptionStrategy;

@Nonnull private final Duration idleTimeout;
@Nonnull private final Duration waitTimeout;

private ServerStreamingCallSettings(Builder<RequestT, ResponseT> builder) {
this.retryableCodes = ImmutableSet.copyOf(builder.retryableCodes);
this.retrySettings = builder.retrySettingsBuilder.build();
this.resumptionStrategy = builder.resumptionStrategy;
this.idleTimeout = builder.idleTimeout;
this.waitTimeout = builder.waitTimeout;
}

/**
Expand Down Expand Up @@ -123,6 +127,15 @@ public Duration getIdleTimeout() {
return idleTimeout;
}

/**
* See the class documentation of {@link ServerStreamingCallSettings} for a description of what
* the {@link #waitTimeout} does.
*/
@Nonnull
public Duration getWaitTimeout() {
return waitTimeout;
}

public Builder<RequestT, ResponseT> toBuilder() {
return new Builder<>(this);
}
Expand All @@ -135,6 +148,7 @@ public static <RequestT, ResponseT> Builder<RequestT, ResponseT> newBuilder() {
public String toString() {
return MoreObjects.toStringHelper(this)
.add("idleTimeout", idleTimeout)
.add("waitTimeout", waitTimeout)
.add("retryableCodes", retryableCodes)
.add("retrySettings", retrySettings)
.toString();
Expand All @@ -148,13 +162,16 @@ public static class Builder<RequestT, ResponseT>

@Nonnull private Duration idleTimeout;

@Nonnull private Duration waitTimeout;

/** Initialize the builder with default settings */
private Builder() {
this.retryableCodes = ImmutableSet.of();
this.retrySettingsBuilder = RetrySettings.newBuilder();
this.resumptionStrategy = new SimpleStreamResumptionStrategy<>();

this.idleTimeout = Duration.ZERO;
this.waitTimeout = Duration.ZERO;
}

private Builder(ServerStreamingCallSettings<RequestT, ResponseT> settings) {
Expand All @@ -164,6 +181,7 @@ private Builder(ServerStreamingCallSettings<RequestT, ResponseT> settings) {
this.resumptionStrategy = settings.resumptionStrategy;

this.idleTimeout = settings.idleTimeout;
this.waitTimeout = settings.waitTimeout;
}

/**
Expand Down Expand Up @@ -233,9 +251,9 @@ public Builder<RequestT, ResponseT> setSimpleTimeoutNoRetries(@Nonnull Duration
.setInitialRetryDelay(Duration.ZERO)
.setRetryDelayMultiplier(1)
.setMaxRetryDelay(Duration.ZERO)
.setInitialRpcTimeout(Duration.ZERO)
.setInitialRpcTimeout(timeout)
.setRpcTimeoutMultiplier(1)
.setMaxRpcTimeout(Duration.ZERO)
.setMaxRpcTimeout(timeout)
.setMaxAttempts(1)
.build());

Expand Down Expand Up @@ -264,14 +282,27 @@ public Duration getIdleTimeout() {
}

/**
* See the class documentation of {@link ServerStreamingCallSettings} for a description of what
* the {@link #idleTimeout} does. {@link Duration#ZERO} disables the watchdog.
* Set how long to wait before considering the stream orphaned by the user and closing it.
* {@link Duration#ZERO} disables the check for abandoned streams.
*/
public Builder<RequestT, ResponseT> setIdleTimeout(@Nonnull Duration idleTimeout) {
this.idleTimeout = Preconditions.checkNotNull(idleTimeout);
return this;
}

@Nonnull
public Duration getWaitTimeout() {
return waitTimeout;
}

/**
* Set the maximum amount of time to wait for the next message from the server. {@link
* Duration#ZERO} disables the check for abandoned streams.
*/
public void setWaitTimeout(@Nonnull Duration waitTimeout) {
this.waitTimeout = waitTimeout;
}

@Override
public ServerStreamingCallSettings<RequestT, ResponseT> build() {
return new ServerStreamingCallSettings<>(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public void testNonRetriedServerStreamingCallableWithRetrySettings() throws Exce
ServerStreamingCallable<Object, Object> callable =
Callables.retrying(innerServerStreamingCallable, callSettings, clientContext);

Duration timeout = retrySettings.getTotalTimeout();
Duration timeout = retrySettings.getInitialRpcTimeout();

callable.call("Is your refrigerator running?", callContextWithRetrySettings);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public class ServerStreamingAttemptCallableTest {
private FakeRetryingFuture fakeRetryingFuture;
private StreamResumptionStrategy<String, String> resumptionStrategy;
private static Duration totalTimeout = Duration.ofHours(1);
private static final Duration attemptTimeout = Duration.ofMinutes(1);
private FakeCallContext mockedCallContext;

@Before
Expand Down Expand Up @@ -100,7 +101,6 @@ public void testUserProvidedContextTimeout() {
// Ensure that the callable did not overwrite the user provided timeouts
Mockito.verify(mockedCallContext, Mockito.times(1)).getTimeout();
Mockito.verify(mockedCallContext, Mockito.never()).withTimeout(totalTimeout);
Mockito.verify(mockedCallContext, Mockito.times(1)).getStreamWaitTimeout();
Mockito.verify(mockedCallContext, Mockito.never())
.withStreamWaitTimeout(Mockito.any(Duration.class));

Expand Down Expand Up @@ -128,7 +128,7 @@ public void testNoUserProvidedContextTimeout() {
Mockito.doReturn(BaseApiTracer.getInstance()).when(mockedCallContext).getTracer();
Mockito.doReturn(null).when(mockedCallContext).getTimeout();
Mockito.doReturn(null).when(mockedCallContext).getStreamWaitTimeout();
Mockito.doReturn(mockedCallContext).when(mockedCallContext).withTimeout(totalTimeout);
Mockito.doReturn(mockedCallContext).when(mockedCallContext).withTimeout(attemptTimeout);
Mockito.doReturn(mockedCallContext)
.when(mockedCallContext)
.withStreamWaitTimeout(Mockito.any(Duration.class));
Expand All @@ -139,10 +139,7 @@ public void testNoUserProvidedContextTimeout() {
// Ensure that the callable configured the timeouts via the Settings in the
// absence of user-defined timeouts.
Mockito.verify(mockedCallContext, Mockito.times(1)).getTimeout();
Mockito.verify(mockedCallContext, Mockito.times(1)).withTimeout(totalTimeout);
Mockito.verify(mockedCallContext, Mockito.times(1)).getStreamWaitTimeout();
Mockito.verify(mockedCallContext, Mockito.times(1))
.withStreamWaitTimeout(Mockito.any(Duration.class));
Mockito.verify(mockedCallContext, Mockito.times(1)).withTimeout(attemptTimeout);

// Should notify outer observer
Truth.assertThat(observer.controller).isNotNull();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,20 @@ public void idleTimeoutIsNotLost() {
assertThat(builder.build().toBuilder().getIdleTimeout()).isEqualTo(idleTimeout);
}

@Test
public void waitTimeoutIsNotLost() {
Duration waitTimeout = Duration.ofSeconds(5);

ServerStreamingCallSettings.Builder<Object, Object> builder =
ServerStreamingCallSettings.newBuilder();

builder.setWaitTimeout(waitTimeout);

assertThat(builder.getWaitTimeout()).isEqualTo(waitTimeout);
assertThat(builder.build().getWaitTimeout()).isEqualTo(waitTimeout);
assertThat(builder.build().toBuilder().getWaitTimeout()).isEqualTo(waitTimeout);
}

@Test
public void testRetrySettingsBuilder() {
RetrySettings initialSettings =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,28 @@
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertThrows;

import com.google.api.gax.core.NoCredentialsProvider;
import com.google.api.gax.rpc.CancelledException;
import com.google.api.gax.rpc.ServerStream;
import com.google.api.gax.rpc.StatusCode;
import com.google.api.gax.rpc.WatchdogTimeoutException;
import com.google.common.collect.ImmutableList;
import com.google.rpc.Status;
import com.google.showcase.v1beta1.EchoClient;
import com.google.showcase.v1beta1.EchoResponse;
import com.google.showcase.v1beta1.EchoSettings;
import com.google.showcase.v1beta1.ExpandRequest;
import com.google.showcase.v1beta1.it.util.TestClientInitializer;
import io.grpc.ManagedChannelBuilder;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.stream.Collectors;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.threeten.bp.Duration;

public class ITServerSideStreaming {

Expand Down Expand Up @@ -104,6 +110,49 @@ public void testGrpc_serverError_receiveErrorAfterLastWordInStream() {
assertThat(cancelledException.getStatusCode().getCode()).isEqualTo(StatusCode.Code.CANCELLED);
}

@Test
public void testGrpc_serverWaitTimeout_watchdogCancelsStream() throws Exception {
EchoSettings.Builder settings =
EchoSettings.newBuilder()
.setCredentialsProvider(NoCredentialsProvider.create())
.setTransportChannelProvider(
EchoSettings.defaultGrpcTransportProviderBuilder()
.setChannelConfigurator(ManagedChannelBuilder::usePlaintext)
.build());

settings
.expandSettings()
.setIdleTimeout(Duration.ofMillis(100))
.setWaitTimeout(Duration.ofMillis(100));

settings.getStubSettingsBuilder().setStreamWatchdogCheckInterval(Duration.ofMillis(50));

EchoClient echoClient = EchoClient.create(settings.build());

String content = "The rain in Spain stays mainly on the plain!";
ServerStream<EchoResponse> responseStream =
echoClient
.expandCallable()
.call(
ExpandRequest.newBuilder()
.setContent(content)
// Configure server interval for returning the next response
.setStreamWaitTime(
com.google.protobuf.Duration.newBuilder().setSeconds(1).build())
.build());
ArrayList<String> responses = new ArrayList<>();
try {
for (EchoResponse response : responseStream) {
responses.add(response.getContent());
}
Assert.fail("No exception was thrown");
} catch (WatchdogTimeoutException e) {
assertThat(e).hasMessageThat().contains("Canceled due to timeout waiting for next response");
} finally {
echoClient.close();
}
}

@Test
public void testHttpJson_receiveStreamedContent() {
String content = "The rain in Spain stays mainly on the plain!";
Expand Down

0 comments on commit bc8a4ad

Please sign in to comment.