diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java index 0ee86a6058c63..5dab63f85cfc7 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java @@ -21,6 +21,7 @@ import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.transport.NetworkExceptionHelper; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IllegalIndexShardStateException; import org.elasticsearch.index.shard.ShardId; @@ -455,7 +456,7 @@ private void updateSettings(final LongConsumer handler, final AtomicInteger retr private void handleFailure(Exception e, AtomicInteger retryCounter, Runnable task) { assert e != null; - if (shouldRetry(params.getRemoteCluster(), e)) { + if (shouldRetry(e)) { if (isStopped() == false) { // Only retry is the shard follow task is not stopped. int currentRetry = retryCounter.incrementAndGet(); @@ -484,7 +485,7 @@ static long computeDelay(int currentRetry, long maxRetryDelayInMillis) { return Math.min(backOffDelay, maxRetryDelayInMillis); } - static boolean shouldRetry(String remoteCluster, Exception e) { + static boolean shouldRetry(final Exception e) { if (NetworkExceptionHelper.isConnectException(e)) { return true; } else if (NetworkExceptionHelper.isCloseConnectionException(e)) { @@ -503,7 +504,8 @@ static boolean shouldRetry(String remoteCluster, Exception e) { actual instanceof ConnectTransportException || actual instanceof NodeClosedException || actual instanceof NoSuchRemoteClusterException || - (actual.getMessage() != null && actual.getMessage().contains("TransportService is closed")); + (actual.getMessage() != null && actual.getMessage().contains("TransportService is closed")) || + actual instanceof EsRejectedExecutionException; } // These methods are protected for testing purposes: diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java index 8454553dde6a0..77cdfc1b9ceb2 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java @@ -396,7 +396,7 @@ protected void nodeOperation(final AllocatedPersistentTask task, final ShardFoll return; } - if (ShardFollowNodeTask.shouldRetry(params.getRemoteCluster(), e)) { + if (ShardFollowNodeTask.shouldRetry(e)) { logger.debug(new ParameterizedMessage("failed to fetch follow shard global {} checkpoint and max sequence number", shardFollowNodeTask), e); threadPool.schedule(() -> nodeOperation(task, params, state), params.getMaxRetryDelay(), Ccr.CCR_THREAD_POOL_NAME); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java index 09d00dc6a33ac..35f461f0cc0aa 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java @@ -12,6 +12,7 @@ import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardNotFoundException; @@ -255,8 +256,16 @@ public void testReceiveRetryableError() { startTask(task, 63, -1); int max = randomIntBetween(1, 30); + final Exception[] exceptions = new Exception[max]; for (int i = 0; i < max; i++) { - readFailures.add(new ShardNotFoundException(new ShardId("leader_index", "", 0))); + final Exception exception; + if (randomBoolean()) { + exception = new ShardNotFoundException(new ShardId("leader_index", "", 0)); + } else { + exception = new EsRejectedExecutionException("leader_index rejected"); + } + exceptions[i] = exception; + readFailures.add(exception); } mappingVersions.add(1L); leaderGlobalCheckpoints.add(63L); @@ -272,10 +281,17 @@ public void testReceiveRetryableError() { final Map.Entry> entry = status.readExceptions().entrySet().iterator().next(); assertThat(entry.getValue().v1(), equalTo(Math.toIntExact(retryCounter.get()))); assertThat(entry.getKey(), equalTo(0L)); - assertThat(entry.getValue().v2(), instanceOf(ShardNotFoundException.class)); - final ShardNotFoundException shardNotFoundException = (ShardNotFoundException) entry.getValue().v2(); - assertThat(shardNotFoundException.getShardId().getIndexName(), equalTo("leader_index")); - assertThat(shardNotFoundException.getShardId().getId(), equalTo(0)); + if (exceptions[Math.toIntExact(retryCounter.get()) - 1] instanceof ShardNotFoundException) { + assertThat(entry.getValue().v2(), instanceOf(ShardNotFoundException.class)); + final ShardNotFoundException shardNotFoundException = (ShardNotFoundException) entry.getValue().v2(); + assertThat(shardNotFoundException.getShardId().getIndexName(), equalTo("leader_index")); + assertThat(shardNotFoundException.getShardId().getId(), equalTo(0)); + } else { + assertThat(entry.getValue().v2().getCause(), instanceOf(EsRejectedExecutionException.class)); + final EsRejectedExecutionException rejectedExecutionException = + (EsRejectedExecutionException) entry.getValue().v2().getCause(); + assertThat(rejectedExecutionException.getMessage(), equalTo("leader_index rejected")); + } } retryCounter.incrementAndGet(); };