Skip to content

Commit

Permalink
Fixed retry logic on writes on forbidden status (#7402)
Browse files Browse the repository at this point in the history
  • Loading branch information
kushagraThapar authored Jan 14, 2020
1 parent f93ec77 commit 48171b1
Showing 1 changed file with 35 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,6 @@ public Mono<ShouldRetryResult> shouldRetry(Exception e) {
return Mono.just(ShouldRetryResult.error(e));
}

// Received Connection error (HttpException), initiate the endpoint rediscovery
if (WebExceptionUtility.isNetworkFailure(e)) {
logger.warn("Endpoint not reachable. Will refresh cache and retry. " , e);
return this.shouldRetryOnEndpointFailureAsync(this.isReadRequest, false, e);
}

this.retryContext = null;
// Received 403.3 on write region, initiate the endpoint re-discovery
CosmosClientException clientException = Utils.as(e, CosmosClientException.class);
Expand All @@ -88,7 +82,7 @@ public Mono<ShouldRetryResult> shouldRetry(Exception e) {
Exceptions.isSubStatusCode(clientException, HttpConstants.SubStatusCodes.FORBIDDEN_WRITEFORBIDDEN))
{
logger.warn("Endpoint not writable. Will refresh cache and retry. ", e);
return this.shouldRetryOnEndpointFailureAsync(false, true, e);
return this.shouldRetryOnEndpointFailureAsync(false, true);
}

// Regional endpoint is not available yet for reads (e.g. add/ online of region is in progress)
Expand All @@ -98,7 +92,17 @@ public Mono<ShouldRetryResult> shouldRetry(Exception e) {
this.isReadRequest)
{
logger.warn("Endpoint not available for reads. Will refresh cache and retry. ", e);
return this.shouldRetryOnEndpointFailureAsync(true, false, e);
return this.shouldRetryOnEndpointFailureAsync(true, false);
}

// Received Connection error (HttpException), initiate the endpoint rediscovery
if (WebExceptionUtility.isNetworkFailure(e)) {
if (this.isReadRequest || WebExceptionUtility.isWebExceptionRetriable(e)) {
logger.warn("Endpoint not reachable. Will refresh cache and retry. ", e);
return this.shouldRetryOnEndpointFailureAsync(this.isReadRequest, false);
} else {
return this.shouldNotRetryOnEndpointFailureAsync(this.isReadRequest, false);
}
}

if (clientException != null &&
Expand Down Expand Up @@ -141,22 +145,13 @@ private ShouldRetryResult shouldRetryOnSessionNotAvailable() {
}
}

private Mono<ShouldRetryResult> shouldRetryOnEndpointFailureAsync(boolean isReadRequest , boolean forceRefresh, Exception e) {
private Mono<ShouldRetryResult> shouldRetryOnEndpointFailureAsync(boolean isReadRequest , boolean forceRefresh) {
if (!this.enableEndpointDiscovery || this.failoverRetryCount > MaxRetryCount) {
logger.warn("ShouldRetryOnEndpointFailureAsync() Not retrying. Retry count = {}", this.failoverRetryCount);
return Mono.just(ShouldRetryResult.noRetry());
}

this.failoverRetryCount++;

// Mark the current read endpoint as unavailable
if (this.isReadRequest) {
logger.warn("marking the endpoint {} as unavailable for read",this.locationEndpoint);
this.globalEndpointManager.markEndpointUnavailableForRead(this.locationEndpoint);
} else {
logger.warn("marking the endpoint {} as unavailable for write",this.locationEndpoint);
this.globalEndpointManager.markEndpointUnavailableForWrite(this.locationEndpoint);
}
Mono<Void> refreshLocationCompletable = this.refreshLocation(isReadRequest, forceRefresh);

// Some requests may be in progress when the endpoint manager and client are closed.
// In that case, the request won't succeed since the http client is closed.
Expand All @@ -172,19 +167,30 @@ private Mono<ShouldRetryResult> shouldRetryOnEndpointFailureAsync(boolean isRead
} else {
retryDelay = Duration.ofMillis(ClientRetryPolicy.RetryIntervalInMS);
}
this.retryContext = new RetryContext(this.failoverRetryCount, false);
return refreshLocationCompletable.then(Mono.just(ShouldRetryResult.retryAfter(retryDelay)));
}

private Mono<ShouldRetryResult> shouldNotRetryOnEndpointFailureAsync(boolean isReadRequest , boolean forceRefresh) {
if (!this.enableEndpointDiscovery || this.failoverRetryCount > MaxRetryCount) {
logger.warn("ShouldRetryOnEndpointFailureAsync() Not retrying. Retry count = {}", this.failoverRetryCount);
return Mono.just(ShouldRetryResult.noRetry());
}
Mono<Void> refreshLocationCompletable = this.refreshLocation(isReadRequest, forceRefresh);
return refreshLocationCompletable.then(Mono.just(ShouldRetryResult.noRetry()));
}

private Mono<Void> refreshLocation(boolean isReadRequest, boolean forceRefresh) {
this.failoverRetryCount++;

Mono<Void> completable = this.globalEndpointManager.refreshLocationAsync(null, forceRefresh);
if (isReadRequest || WebExceptionUtility.isWebExceptionRetriable(e)) {
// refresh cache and
// if it is a read request or if it is a write but we are sure the write
// hasn't reached the service retry
return completable.then(Mono.just(ShouldRetryResult.retryAfter(retryDelay)));
// Mark the current read endpoint as unavailable
if (isReadRequest) {
logger.warn("marking the endpoint {} as unavailable for read", this.locationEndpoint);
this.globalEndpointManager.markEndpointUnavailableForRead(this.locationEndpoint);
} else {
// refresh cache and
// no retry for writes which we are not sure if have reached to the service or not
return completable.then(Mono.just(ShouldRetryResult.noRetry()));
logger.warn("marking the endpoint {} as unavailable for write", this.locationEndpoint);
this.globalEndpointManager.markEndpointUnavailableForWrite(this.locationEndpoint);
}
return this.globalEndpointManager.refreshLocationAsync(null, forceRefresh);
}

@Override
Expand Down

0 comments on commit 48171b1

Please sign in to comment.