Patch: do not blindly retry writes after a network failure (ClientRetryPolicy + unit tests).

Review notes (free text ahead of the first "diff --git" header; git apply skips it):
  * shouldRetryOnEndpointFailureAsync() now receives the triggering exception. A non-read
    request that failed with a network error is retried only when
    WebExceptionUtility.isWebExceptionRetriable(e) holds; otherwise the location cache is
    still refreshed but ShouldRetryResult.noRetry() is returned.
  * The no-retry rule is limited to network failures, so the 403.3 (write forbidden) path,
    which calls the helper with isReadRequest=false, keeps retrying as it did before.
    NOTE(review): assumes isWebExceptionRetriable() is false for such a
    CosmosClientException - confirm against WebExceptionUtility.
  * The network-failure check sits after "this.retryContext = null" (and still ahead of the
    status-code checks), so every path resets the retry context as it did before.
  * NOTE(review): this capture shows raw "Mono" types; generic arguments may have been
    stripped. Indentation of context lines was reconstructed, and the post-image blob ids on
    the "index" lines predate the review changes. Confirm against the repository before applying.

diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/ClientRetryPolicy.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/ClientRetryPolicy.java
index b9d2160a5c806..cc6e681c6d853 100644
--- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/ClientRetryPolicy.java
+++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/ClientRetryPolicy.java
@@ -71,6 +71,12 @@ public Mono shouldRetry(Exception e) {
             return Mono.just(ShouldRetryResult.error(e));
         }
 
         this.retryContext = null;
+        // Received Connection error (HttpException), initiate the endpoint rediscovery
+        if (WebExceptionUtility.isNetworkFailure(e)) {
+            logger.warn("Endpoint not reachable. Will refresh cache and retry. " , e);
+            return this.shouldRetryOnEndpointFailureAsync(this.isReadRequest, false, e);
+        }
+
         // Received 403.3 on write region, initiate the endpoint re-discovery
         CosmosClientException clientException = Utils.as(e, CosmosClientException.class);
@@ -81,8 +87,8 @@ public Mono shouldRetry(Exception e) {
                 Exceptions.isStatusCode(clientException, HttpConstants.StatusCodes.FORBIDDEN) &&
                 Exceptions.isSubStatusCode(clientException, HttpConstants.SubStatusCodes.FORBIDDEN_WRITEFORBIDDEN))
         {
-            logger.warn("Endpoint not writable. Will refresh cache and retry. {}", e.toString());
-            return this.shouldRetryOnEndpointFailureAsync(false, true);
+            logger.warn("Endpoint not writable. Will refresh cache and retry. ", e);
+            return this.shouldRetryOnEndpointFailureAsync(false, true, e);
         }
 
         // Regional endpoint is not available yet for reads (e.g. add/ online of region is in progress)
@@ -91,14 +97,8 @@ public Mono shouldRetry(Exception e) {
                 Exceptions.isSubStatusCode(clientException, HttpConstants.SubStatusCodes.DATABASE_ACCOUNT_NOTFOUND) &&
                 this.isReadRequest)
         {
-            logger.warn("Endpoint not available for reads. Will refresh cache and retry. {}", e.toString());
-            return this.shouldRetryOnEndpointFailureAsync(true, false);
-        }
-
-        // Received Connection error (HttpRequestException), initiate the endpoint rediscovery
-        if (WebExceptionUtility.isNetworkFailure(e)) {
-            logger.warn("Endpoint not reachable. Will refresh cache and retry. {}" , e.toString());
-            return this.shouldRetryOnEndpointFailureAsync(this.isReadRequest, false);
+            logger.warn("Endpoint not available for reads. Will refresh cache and retry. ", e);
+            return this.shouldRetryOnEndpointFailureAsync(true, false, e);
         }
 
         if (clientException != null &&
@@ -141,7 +141,7 @@ private ShouldRetryResult shouldRetryOnSessionNotAvailable() {
         }
     }
 
-    private Mono shouldRetryOnEndpointFailureAsync(boolean isReadRequest , boolean forceRefresh) {
+    private Mono shouldRetryOnEndpointFailureAsync(boolean isReadRequest , boolean forceRefresh, Exception e) {
         if (!this.enableEndpointDiscovery || this.failoverRetryCount > MaxRetryCount) {
             logger.warn("ShouldRetryOnEndpointFailureAsync() Not retrying. Retry count = {}", this.failoverRetryCount);
             return Mono.just(ShouldRetryResult.noRetry());
@@ -173,8 +173,20 @@ private Mono shouldRetryOnEndpointFailureAsync(boolean isRead
             retryDelay = Duration.ofMillis(ClientRetryPolicy.RetryIntervalInMS);
         }
         this.retryContext = new RetryContext(this.failoverRetryCount, false);
-        return this.globalEndpointManager.refreshLocationAsync(null, forceRefresh)
-                .then(Mono.just(ShouldRetryResult.retryAfter(retryDelay)));
+
+        Mono completable = this.globalEndpointManager.refreshLocationAsync(null, forceRefresh);
+        // Only a write that failed on the network, without proof that it never reached the
+        // service, has an unknown outcome. Reads and errors returned by the service itself
+        // (e.g. 403.3 write forbidden) stay retriable.
+        boolean writeOutcomeUnknown = !isReadRequest
+                && WebExceptionUtility.isNetworkFailure(e)
+                && !WebExceptionUtility.isWebExceptionRetriable(e);
+        if (writeOutcomeUnknown) {
+            // refresh the cache, but do not retry: the service may already have applied the write
+            return completable.then(Mono.just(ShouldRetryResult.noRetry()));
+        }
+        // refresh the cache and retry after the computed delay
+        return completable.then(Mono.just(ShouldRetryResult.retryAfter(retryDelay)));
     }
 
     @Override
diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/ClientRetryPolicyTest.java b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/ClientRetryPolicyTest.java
index e71dd73a27f62..0425028688ed5 100644
--- a/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/ClientRetryPolicyTest.java
+++ b/sdk/cosmos/microsoft-azure-cosmos/src/test/java/com/azure/data/cosmos/internal/ClientRetryPolicyTest.java
@@ -65,10 +65,10 @@ public void networkFailureOnWrite() throws Exception {
         clientRetryPolicy.onBeforeSendRequest(dsr);
         for (int i = 0; i < 10; i++) {
             Mono shouldRetry = clientRetryPolicy.shouldRetry(exception);
+            // We don't want to retry writes on network failure
             validateSuccess(shouldRetry, ShouldRetryValidator.builder()
                     .nullException()
-                    .shouldRetry(true)
-                    .backOfTime(i > 0 ? Duration.ofMillis(ClientRetryPolicy.RetryIntervalInMS) : Duration.ZERO)
+                    .shouldRetry(false)
                     .build());
 
             Mockito.verify(endpointManager, Mockito.times(0)).markEndpointUnavailableForRead(Mockito.any());
@@ -76,6 +76,62 @@ public void networkFailureOnWrite() throws Exception {
         }
     }
 
+    @Test(groups = "unit")
+    public void networkFailureOnUpsert() throws Exception {
+        RetryOptions retryOptions = new RetryOptions();
+        GlobalEndpointManager endpointManager = Mockito.mock(GlobalEndpointManager.class);
+        Mockito.doReturn(new URL("http://localhost")).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class));
+        Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null), Mockito.eq(false));
+        ClientRetryPolicy clientRetryPolicy = new ClientRetryPolicy(endpointManager, true, retryOptions);
+
+        Exception exception = ReadTimeoutException.INSTANCE;
+
+        RxDocumentServiceRequest dsr = RxDocumentServiceRequest.createFromName(
+                OperationType.Upsert, "/dbs/db/colls/col/docs/docId", ResourceType.Document);
+        dsr.requestContext = Mockito.mock(DocumentServiceRequestContext.class);
+
+        clientRetryPolicy.onBeforeSendRequest(dsr);
+        for (int i = 0; i < 10; i++) {
+            Mono shouldRetry = clientRetryPolicy.shouldRetry(exception);
+            // We don't want to retry writes on network failure
+            validateSuccess(shouldRetry, ShouldRetryValidator.builder()
+                    .nullException()
+                    .shouldRetry(false)
+                    .build());
+
+            Mockito.verify(endpointManager, Mockito.times(0)).markEndpointUnavailableForRead(Mockito.any());
+            Mockito.verify(endpointManager, Mockito.times(i + 1)).markEndpointUnavailableForWrite(Mockito.any());
+        }
+    }
+
+    @Test(groups = "unit")
+    public void networkFailureOnDelete() throws Exception {
+        RetryOptions retryOptions = new RetryOptions();
+        GlobalEndpointManager endpointManager = Mockito.mock(GlobalEndpointManager.class);
+        Mockito.doReturn(new URL("http://localhost")).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class));
+        Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null), Mockito.eq(false));
+        ClientRetryPolicy clientRetryPolicy = new ClientRetryPolicy(endpointManager, true, retryOptions);
+
+        Exception exception = ReadTimeoutException.INSTANCE;
+
+        RxDocumentServiceRequest dsr = RxDocumentServiceRequest.createFromName(
+                OperationType.Delete, "/dbs/db/colls/col/docs/docId", ResourceType.Document);
+        dsr.requestContext = Mockito.mock(DocumentServiceRequestContext.class);
+
+        clientRetryPolicy.onBeforeSendRequest(dsr);
+        for (int i = 0; i < 10; i++) {
+            Mono shouldRetry = clientRetryPolicy.shouldRetry(exception);
+            // We don't want to retry writes on network failure
+            validateSuccess(shouldRetry, ShouldRetryValidator.builder()
+                    .nullException()
+                    .shouldRetry(false)
+                    .build());
+
+            Mockito.verify(endpointManager, Mockito.times(0)).markEndpointUnavailableForRead(Mockito.any());
+            Mockito.verify(endpointManager, Mockito.times(i + 1)).markEndpointUnavailableForWrite(Mockito.any());
+        }
+    }
+
     @Test(groups = "unit")
     public void onBeforeSendRequestNotInvoked() {
         RetryOptions retryOptions = new RetryOptions();