Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't retry on network failure on writes #7243

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -81,7 +81,7 @@ public Mono<ShouldRetryResult> shouldRetry(Exception e) {
Exceptions.isStatusCode(clientException, HttpConstants.StatusCodes.FORBIDDEN) &&
Exceptions.isSubStatusCode(clientException, HttpConstants.SubStatusCodes.FORBIDDEN_WRITEFORBIDDEN))
{
logger.warn("Endpoint not writable. Will refresh cache and retry. {}", e.toString());
logger.warn("Endpoint not writable. Will refresh cache and retry ", e);
return this.shouldRetryOnEndpointFailureAsync(false, true);
}

@@ -91,14 +91,18 @@ public Mono<ShouldRetryResult> shouldRetry(Exception e) {
Exceptions.isSubStatusCode(clientException, HttpConstants.SubStatusCodes.DATABASE_ACCOUNT_NOTFOUND) &&
this.isReadRequest)
{
logger.warn("Endpoint not available for reads. Will refresh cache and retry. {}", e.toString());
logger.warn("Endpoint not available for reads. Will refresh cache and retry. ", e);
return this.shouldRetryOnEndpointFailureAsync(true, false);
}

// Received Connection error (HttpRequestException), initiate the endpoint rediscovery
// Received Connection error (HttpException), initiate the endpoint rediscovery
if (WebExceptionUtility.isNetworkFailure(e)) {
logger.warn("Endpoint not reachable. Will refresh cache and retry. {}" , e.toString());
return this.shouldRetryOnEndpointFailureAsync(this.isReadRequest, false);
if (this.isReadRequest || WebExceptionUtility.isWebExceptionRetriable(e)) {
logger.warn("Endpoint not reachable. Will refresh cache and retry. ", e);
return this.shouldRetryOnEndpointFailureAsync(this.isReadRequest, false);
} else {
return this.shouldNotRetryOnEndpointFailureAsync(this.isReadRequest, false);
}
}

if (clientException != null &&
@@ -147,23 +151,14 @@ private Mono<ShouldRetryResult> shouldRetryOnEndpointFailureAsync(boolean isRead
return Mono.just(ShouldRetryResult.noRetry());
}

this.failoverRetryCount++;

// Mark the current read endpoint as unavailable
if (this.isReadRequest) {
logger.warn("marking the endpoint {} as unavailable for read",this.locationEndpoint);
this.globalEndpointManager.markEndpointUnavailableForRead(this.locationEndpoint);
} else {
logger.warn("marking the endpoint {} as unavailable for write",this.locationEndpoint);
this.globalEndpointManager.markEndpointUnavailableForWrite(this.locationEndpoint);
}
Mono<Void> refreshLocationCompletable = this.refreshLocation(isReadRequest, forceRefresh);

// Some requests may be in progress when the endpoint manager and client are closed.
// In that case, the request won't succeed since the http client is closed.
// Therefore just skip the retry here to avoid the delay because retrying won't go through in the end.

Duration retryDelay = Duration.ZERO;
if (!this.isReadRequest) {
if (!isReadRequest) {
logger.debug("Failover happening. retryCount {}", this.failoverRetryCount);
if (this.failoverRetryCount > 1) {
//if retried both endpoints, follow regular retry interval.
@@ -172,9 +167,32 @@ private Mono<ShouldRetryResult> shouldRetryOnEndpointFailureAsync(boolean isRead
} else {
retryDelay = Duration.ofMillis(ClientRetryPolicy.RetryIntervalInMS);
}
return refreshLocationCompletable.then(Mono.just(ShouldRetryResult.retryAfter(retryDelay)));
}

private Mono<ShouldRetryResult> shouldNotRetryOnEndpointFailureAsync(boolean isReadRequest , boolean forceRefresh) {
if (!this.enableEndpointDiscovery || this.failoverRetryCount > MaxRetryCount) {
logger.warn("ShouldRetryOnEndpointFailureAsync() Not retrying. Retry count = {}", this.failoverRetryCount);
return Mono.just(ShouldRetryResult.noRetry());
}
Mono<Void> refreshLocationCompletable = this.refreshLocation(isReadRequest, forceRefresh);
return refreshLocationCompletable.then(Mono.just(ShouldRetryResult.noRetry()));
}

private Mono<Void> refreshLocation(boolean isReadRequest, boolean forceRefresh) {
this.failoverRetryCount++;

// Mark the current read endpoint as unavailable
if (isReadRequest) {
logger.warn("marking the endpoint {} as unavailable for read",this.locationEndpoint);
this.globalEndpointManager.markEndpointUnavailableForRead(this.locationEndpoint);
} else {
logger.warn("marking the endpoint {} as unavailable for write",this.locationEndpoint);
this.globalEndpointManager.markEndpointUnavailableForWrite(this.locationEndpoint);
}

this.retryContext = new RetryContext(this.failoverRetryCount, false);
return this.globalEndpointManager.refreshLocationAsync(null, forceRefresh)
.then(Mono.just(ShouldRetryResult.retryAfter(retryDelay)));
return this.globalEndpointManager.refreshLocationAsync(null, forceRefresh);
}

@Override
Original file line number Diff line number Diff line change
@@ -66,15 +66,68 @@ public void networkFailureOnWrite() throws Exception {
Mono<IRetryPolicy.ShouldRetryResult> shouldRetry = clientRetryPolicy.shouldRetry(exception);
validateSuccess(shouldRetry, ShouldRetryValidator.builder()
.nullException()
.shouldRetry(true)
.backOfTime(i > 0 ? Duration.ofMillis(ClientRetryPolicy.RetryIntervalInMS) : Duration.ZERO)
.shouldRetry(false)
.build());

Mockito.verify(endpointManager, Mockito.times(0)).markEndpointUnavailableForRead(Mockito.any());
Mockito.verify(endpointManager, Mockito.times(i + 1)).markEndpointUnavailableForWrite(Mockito.any());
}
}

@Test(groups = "unit")
public void networkFailureOnUpsert() throws Exception {
RetryOptions retryOptions = new RetryOptions();
GlobalEndpointManager endpointManager = Mockito.mock(GlobalEndpointManager.class);
Mockito.doReturn(new URI("http://localhost")).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class));
Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null), Mockito.eq(false));
ClientRetryPolicy clientRetryPolicy = new ClientRetryPolicy(endpointManager, true, retryOptions);

Exception exception = ReadTimeoutException.INSTANCE;

RxDocumentServiceRequest dsr = RxDocumentServiceRequest.createFromName(
OperationType.Upsert, "/dbs/db/colls/col/docs/docId", ResourceType.Document);
dsr.requestContext = Mockito.mock(DocumentServiceRequestContext.class);

clientRetryPolicy.onBeforeSendRequest(dsr);
for (int i = 0; i < 10; i++) {
Mono<IRetryPolicy.ShouldRetryResult> shouldRetry = clientRetryPolicy.shouldRetry(exception);
validateSuccess(shouldRetry, ShouldRetryValidator.builder()
.nullException()
.shouldRetry(false)
.build());

Mockito.verify(endpointManager, Mockito.times(0)).markEndpointUnavailableForRead(Mockito.any());
Mockito.verify(endpointManager, Mockito.times(i + 1)).markEndpointUnavailableForWrite(Mockito.any());
}
}

@Test(groups = "unit")
public void networkFailureOnDelete() throws Exception {
RetryOptions retryOptions = new RetryOptions();
GlobalEndpointManager endpointManager = Mockito.mock(GlobalEndpointManager.class);
Mockito.doReturn(new URI("http://localhost")).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class));
Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null), Mockito.eq(false));
ClientRetryPolicy clientRetryPolicy = new ClientRetryPolicy(endpointManager, true, retryOptions);

Exception exception = ReadTimeoutException.INSTANCE;

RxDocumentServiceRequest dsr = RxDocumentServiceRequest.createFromName(
OperationType.Delete, "/dbs/db/colls/col/docs/docId", ResourceType.Document);
dsr.requestContext = Mockito.mock(DocumentServiceRequestContext.class);

clientRetryPolicy.onBeforeSendRequest(dsr);
for (int i = 0; i < 10; i++) {
Mono<IRetryPolicy.ShouldRetryResult> shouldRetry = clientRetryPolicy.shouldRetry(exception);
validateSuccess(shouldRetry, ShouldRetryValidator.builder()
.nullException()
.shouldRetry(false)
.build());

Mockito.verify(endpointManager, Mockito.times(0)).markEndpointUnavailableForRead(Mockito.any());
Mockito.verify(endpointManager, Mockito.times(i + 1)).markEndpointUnavailableForWrite(Mockito.any());
}
}

@Test(groups = "unit")
public void onBeforeSendRequestNotInvoked() {
RetryOptions retryOptions = new RetryOptions();