From 4092554498931d3a75bc2a2161b32f112688429c Mon Sep 17 00:00:00 2001 From: Kushagra Thapar Date: Mon, 13 Jan 2020 15:20:03 -0800 Subject: [PATCH] Fixed retry logic on writes on forbidden status --- .../cosmos/internal/ClientRetryPolicy.java | 64 ++++++++++--------- 1 file changed, 35 insertions(+), 29 deletions(-) diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/ClientRetryPolicy.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/ClientRetryPolicy.java index cc6e681c6d853..112bdb6370eca 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/ClientRetryPolicy.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/ClientRetryPolicy.java @@ -71,12 +71,6 @@ public Mono shouldRetry(Exception e) { return Mono.just(ShouldRetryResult.error(e)); } - // Received Connection error (HttpException), initiate the endpoint rediscovery - if (WebExceptionUtility.isNetworkFailure(e)) { - logger.warn("Endpoint not reachable. Will refresh cache and retry. " , e); - return this.shouldRetryOnEndpointFailureAsync(this.isReadRequest, false, e); - } - this.retryContext = null; // Received 403.3 on write region, initiate the endpoint re-discovery CosmosClientException clientException = Utils.as(e, CosmosClientException.class); @@ -88,7 +82,7 @@ public Mono shouldRetry(Exception e) { Exceptions.isSubStatusCode(clientException, HttpConstants.SubStatusCodes.FORBIDDEN_WRITEFORBIDDEN)) { logger.warn("Endpoint not writable. Will refresh cache and retry. ", e); - return this.shouldRetryOnEndpointFailureAsync(false, true, e); + return this.shouldRetryOnEndpointFailureAsync(false, true); } // Regional endpoint is not available yet for reads (e.g. add/ online of region is in progress) @@ -98,7 +92,17 @@ public Mono shouldRetry(Exception e) { this.isReadRequest) { logger.warn("Endpoint not available for reads. Will refresh cache and retry. ", e); - return this.shouldRetryOnEndpointFailureAsync(true, false, e); + return this.shouldRetryOnEndpointFailureAsync(true, false); + } + + // Received Connection error (HttpException), initiate the endpoint rediscovery + if (WebExceptionUtility.isNetworkFailure(e)) { + if (this.isReadRequest || WebExceptionUtility.isWebExceptionRetriable(e)) { + logger.warn("Endpoint not reachable. Will refresh cache and retry. ", e); + return this.shouldRetryOnEndpointFailureAsync(this.isReadRequest, false); + } else { + return this.shouldNotRetryOnEndpointFailureAsync(this.isReadRequest, false); + } } if (clientException != null && @@ -141,22 +145,13 @@ private ShouldRetryResult shouldRetryOnSessionNotAvailable() { } } - private Mono shouldRetryOnEndpointFailureAsync(boolean isReadRequest , boolean forceRefresh, Exception e) { + private Mono shouldRetryOnEndpointFailureAsync(boolean isReadRequest , boolean forceRefresh) { if (!this.enableEndpointDiscovery || this.failoverRetryCount > MaxRetryCount) { logger.warn("ShouldRetryOnEndpointFailureAsync() Not retrying. Retry count = {}", this.failoverRetryCount); return Mono.just(ShouldRetryResult.noRetry()); } - this.failoverRetryCount++; - - // Mark the current read endpoint as unavailable - if (this.isReadRequest) { - logger.warn("marking the endpoint {} as unavailable for read",this.locationEndpoint); - this.globalEndpointManager.markEndpointUnavailableForRead(this.locationEndpoint); - } else { - logger.warn("marking the endpoint {} as unavailable for write",this.locationEndpoint); - this.globalEndpointManager.markEndpointUnavailableForWrite(this.locationEndpoint); - } + Mono refreshLocationCompletable = this.refreshLocation(isReadRequest, forceRefresh); // Some requests may be in progress when the endpoint manager and client are closed. // In that case, the request won't succeed since the http client is closed. @@ -172,19 +167,30 @@ private Mono shouldRetryOnEndpointFailureAsync(boolean isRead } else { retryDelay = Duration.ofMillis(ClientRetryPolicy.RetryIntervalInMS); } - this.retryContext = new RetryContext(this.failoverRetryCount, false); + return refreshLocationCompletable.then(Mono.just(ShouldRetryResult.retryAfter(retryDelay))); + } + + private Mono shouldNotRetryOnEndpointFailureAsync(boolean isReadRequest , boolean forceRefresh) { + if (!this.enableEndpointDiscovery || this.failoverRetryCount > MaxRetryCount) { + logger.warn("ShouldRetryOnEndpointFailureAsync() Not retrying. Retry count = {}", this.failoverRetryCount); + return Mono.just(ShouldRetryResult.noRetry()); + } + Mono refreshLocationCompletable = this.refreshLocation(isReadRequest, forceRefresh); + return refreshLocationCompletable.then(Mono.just(ShouldRetryResult.noRetry())); + } + + private Mono refreshLocation(boolean isReadRequest, boolean forceRefresh) { + this.failoverRetryCount++; - Mono completable = this.globalEndpointManager.refreshLocationAsync(null, forceRefresh); - if (isReadRequest || WebExceptionUtility.isWebExceptionRetriable(e)) { - // refresh cache and - // if it is a read request or if it is a write but we are sure the write - // hasn't reached the service retry - return completable.then(Mono.just(ShouldRetryResult.retryAfter(retryDelay))); + // Mark the current read endpoint as unavailable + if (isReadRequest) { + logger.warn("marking the endpoint {} as unavailable for read", this.locationEndpoint); + this.globalEndpointManager.markEndpointUnavailableForRead(this.locationEndpoint); } else { - // refresh cache and - // no retry for writes which we are not sure if have reached to the service or not - return completable.then(Mono.just(ShouldRetryResult.noRetry())); + logger.warn("marking the endpoint {} as unavailable for write", this.locationEndpoint); + this.globalEndpointManager.markEndpointUnavailableForWrite(this.locationEndpoint); } + return this.globalEndpointManager.refreshLocationAsync(null, forceRefresh); } @Override