Skip to content
This repository has been archived by the owner on Sep 26, 2023. It is now read-only.

feat: make stream wait timeout a first class citizen [WIP] #1409

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,12 @@ public void testNonRetryServerStreamingSettingsContextWithRetry() {
.build();
Duration newTimeout = Duration.ofSeconds(5);
RetrySettings contextRetrySettings =
retrySettings.toBuilder().setTotalTimeout(newTimeout).setMaxAttempts(3).build();
retrySettings
.toBuilder()
.setInitialRpcTimeout(newTimeout)
.setMaxRpcTimeout(newTimeout)
.setMaxAttempts(3)
.build();
GrpcCallContext retryingContext =
defaultCallContext
.withRetrySettings(contextRetrySettings)
Expand Down
3 changes: 2 additions & 1 deletion gax/src/main/java/com/google/api/gax/rpc/Callables.java
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ public static <RequestT, ResponseT> ServerStreamingCallable<RequestT, ResponseT>
callable.withDefaultCallContext(
clientContext
.getDefaultCallContext()
.withStreamIdleTimeout(callSettings.getIdleTimeout()));
.withStreamIdleTimeout(callSettings.getIdleTimeout())
.withStreamWaitTimeout(callSettings.getWaitTimeout()));

return callable;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import javax.annotation.concurrent.GuardedBy;
import org.threeten.bp.Duration;

/**
* A callable that generates Server Streaming attempts. At any one time, it is responsible for at
Expand Down Expand Up @@ -181,15 +180,6 @@ public void cancel() {
}
isStarted = true;

// Propagate the totalTimeout as the overall stream deadline, so long as the user
// has not provided a timeout via the ApiCallContext. If they have, retain it.
Duration totalTimeout =
outerRetryingFuture.getAttemptSettings().getGlobalSettings().getTotalTimeout();

if (totalTimeout != null && context != null && context.getTimeout() == null) {
context = context.withTimeout(totalTimeout);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After removing this block, if someone set totalTimeout but not rpc timeout, the stream won't have a deadline?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this already covered by #1191?


// Call the inner callable
call();
}
Expand Down Expand Up @@ -218,13 +208,10 @@ public Void call() {

ApiCallContext attemptContext = context;

// Set the streamWaitTimeout to the attempt RPC Timeout, only if the context
// does not already have a timeout set by a user via withStreamWaitTimeout.
if (!outerRetryingFuture.getAttemptSettings().getRpcTimeout().isZero()
&& attemptContext.getStreamWaitTimeout() == null) {
&& attemptContext.getTimeout() == null) {
attemptContext =
attemptContext.withStreamWaitTimeout(
outerRetryingFuture.getAttemptSettings().getRpcTimeout());
attemptContext.withTimeout(outerRetryingFuture.getAttemptSettings().getRpcTimeout());
}

attemptContext
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,11 @@
* <p>This class includes settings that are applicable to all server streaming calls, which
* currently just includes retries and watchdog timers.
*
* <p>The watchdog timer is configured via {@code idleTimeout}. The watchdog will terminate any
* stream that has not has seen any demand (via {@link StreamController#request(int)}) in the
* configured interval. To turn off idle checks, set the interval to {@link Duration#ZERO}.
* <p>The watchdog timer is configured via {@code idleTimeout} and {@code waitTimeout}. The watchdog
* will terminate any stream that has not has seen any demand (via {@link
* StreamController#request(int)}) in the configured interval or has not seen a message from the
* server in {@code waitTimeout}. To turn off idle checks, set the interval to {@link
* Duration#ZERO}.
*
* <p>Retry configuration allows for the stream to be restarted and resumed. It is composed of 3
* parts: the retryable codes, the retry settings and the stream resumption strategy. The retryable
Expand Down Expand Up @@ -81,12 +83,14 @@ public final class ServerStreamingCallSettings<RequestT, ResponseT>
@Nonnull private final StreamResumptionStrategy<RequestT, ResponseT> resumptionStrategy;

@Nonnull private final Duration idleTimeout;
@Nonnull private final Duration waitTimeout;

private ServerStreamingCallSettings(Builder<RequestT, ResponseT> builder) {
this.retryableCodes = ImmutableSet.copyOf(builder.retryableCodes);
this.retrySettings = builder.retrySettingsBuilder.build();
this.resumptionStrategy = builder.resumptionStrategy;
this.idleTimeout = builder.idleTimeout;
this.waitTimeout = builder.waitTimeout;
}

/**
Expand Down Expand Up @@ -125,6 +129,15 @@ public Duration getIdleTimeout() {
return idleTimeout;
}

/**
* See the class documentation of {@link ServerStreamingCallSettings} for a description of what
* the {@link #waitTimeout} does.
*/
@Nonnull
public Duration getWaitTimeout() {
return waitTimeout;
}

public Builder<RequestT, ResponseT> toBuilder() {
return new Builder<>(this);
}
Expand All @@ -137,6 +150,7 @@ public static <RequestT, ResponseT> Builder<RequestT, ResponseT> newBuilder() {
public String toString() {
return MoreObjects.toStringHelper(this)
.add("idleTimeout", idleTimeout)
.add("waitTimeout", waitTimeout)
.add("retryableCodes", retryableCodes)
.add("retrySettings", retrySettings)
.toString();
Expand All @@ -149,6 +163,7 @@ public static class Builder<RequestT, ResponseT>
@Nonnull private StreamResumptionStrategy<RequestT, ResponseT> resumptionStrategy;

@Nonnull private Duration idleTimeout;
@Nonnull private Duration waitTimeout;

/** Initialize the builder with default settings */
private Builder() {
Expand All @@ -157,6 +172,7 @@ private Builder() {
this.resumptionStrategy = new SimpleStreamResumptionStrategy<>();

this.idleTimeout = Duration.ZERO;
this.waitTimeout = Duration.ZERO;
}

private Builder(ServerStreamingCallSettings<RequestT, ResponseT> settings) {
Expand All @@ -166,6 +182,7 @@ private Builder(ServerStreamingCallSettings<RequestT, ResponseT> settings) {
this.resumptionStrategy = settings.resumptionStrategy;

this.idleTimeout = settings.idleTimeout;
this.waitTimeout = settings.waitTimeout;
}

/**
Expand Down Expand Up @@ -235,9 +252,9 @@ public Builder<RequestT, ResponseT> setSimpleTimeoutNoRetries(@Nonnull Duration
.setInitialRetryDelay(Duration.ZERO)
.setRetryDelayMultiplier(1)
.setMaxRetryDelay(Duration.ZERO)
.setInitialRpcTimeout(Duration.ZERO)
.setInitialRpcTimeout(timeout)
.setRpcTimeoutMultiplier(1)
.setMaxRpcTimeout(Duration.ZERO)
.setMaxRpcTimeout(timeout)
.setMaxAttempts(1)
.build());

Expand Down Expand Up @@ -274,6 +291,19 @@ public Builder<RequestT, ResponseT> setIdleTimeout(@Nonnull Duration idleTimeout
return this;
}

@Nonnull
public Duration getWaitTimeout() {
return waitTimeout;
}

/**
* See the class documentation of {@link ServerStreamingCallSettings} for a description of what
* the {@link #waitTimeout} does. {@link Duration#ZERO} disables the watchdog.
*/
public void setWaitTimeout(@Nonnull Duration waitTimeout) {
this.waitTimeout = waitTimeout;
}

@Override
public ServerStreamingCallSettings<RequestT, ResponseT> build() {
return new ServerStreamingCallSettings<>(this);
Expand Down
2 changes: 1 addition & 1 deletion gax/src/test/java/com/google/api/gax/rpc/CallableTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public void testNonRetriedServerStreamingCallableWithRetrySettings() throws Exce
ServerStreamingCallable<Object, Object> callable =
Callables.retrying(innerServerStreamingCallable, callSettings, clientContext);

Duration timeout = retrySettings.getTotalTimeout();
Duration timeout = retrySettings.getInitialRpcTimeout();

callable.call("Is your refrigerator running?", callContextWithRetrySettings);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ public class ServerStreamingAttemptCallableTest {
private AccumulatingObserver observer;
private FakeRetryingFuture fakeRetryingFuture;
private StreamResumptionStrategy<String, String> resumptionStrategy;
private static Duration totalTimeout = Duration.ofHours(1);
private static final Duration totalTimeout = Duration.ofHours(1);
private static final Duration attemptTimeout = Duration.ofMinutes(1);
private FakeCallContext mockedCallContext;

@Before
Expand Down Expand Up @@ -100,7 +101,6 @@ public void testUserProvidedContextTimeout() {
// Ensure that the callable did not overwrite the user provided timeouts
Mockito.verify(mockedCallContext, Mockito.times(1)).getTimeout();
Mockito.verify(mockedCallContext, Mockito.never()).withTimeout(totalTimeout);
Mockito.verify(mockedCallContext, Mockito.times(1)).getStreamWaitTimeout();
Mockito.verify(mockedCallContext, Mockito.never())
.withStreamWaitTimeout(Mockito.any(Duration.class));

Expand Down Expand Up @@ -128,7 +128,7 @@ public void testNoUserProvidedContextTimeout() {
Mockito.doReturn(NoopApiTracer.getInstance()).when(mockedCallContext).getTracer();
Mockito.doReturn(null).when(mockedCallContext).getTimeout();
Mockito.doReturn(null).when(mockedCallContext).getStreamWaitTimeout();
Mockito.doReturn(mockedCallContext).when(mockedCallContext).withTimeout(totalTimeout);
Mockito.doReturn(mockedCallContext).when(mockedCallContext).withTimeout(attemptTimeout);
Mockito.doReturn(mockedCallContext)
.when(mockedCallContext)
.withStreamWaitTimeout(Mockito.any(Duration.class));
Expand All @@ -139,10 +139,7 @@ public void testNoUserProvidedContextTimeout() {
// Ensure that the callable configured the timeouts via the Settings in the
// absence of user-defined timeouts.
Mockito.verify(mockedCallContext, Mockito.times(1)).getTimeout();
Mockito.verify(mockedCallContext, Mockito.times(1)).withTimeout(totalTimeout);
Mockito.verify(mockedCallContext, Mockito.times(1)).getStreamWaitTimeout();
Mockito.verify(mockedCallContext, Mockito.times(1))
.withStreamWaitTimeout(Mockito.any(Duration.class));
Mockito.verify(mockedCallContext, Mockito.times(1)).withTimeout(attemptTimeout);

// Should notify outer observer
Truth.assertThat(observer.controller).isNotNull();
Expand Down