From c829a38e6b16e7db17b79c567ddae28a2b097c7d Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 28 Feb 2019 12:05:51 -0500 Subject: [PATCH 1/4] =?UTF-8?q?Don=E2=80=99t=20ack=20if=20unable=20to=20re?= =?UTF-8?q?move=20failing=20replica?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ...TransportVerifyShardBeforeCloseAction.java | 7 +-- .../TransportResyncReplicationAction.java | 8 +-- .../replication/ReplicationOperation.java | 59 +++++++++---------- .../TransportReplicationAction.java | 35 ++--------- .../replication/TransportWriteAction.java | 14 ++--- .../ReplicationOperationTests.java | 57 +++++++++--------- .../TransportReplicationActionTests.java | 4 +- .../TransportWriteActionTests.java | 9 +-- .../discovery/ClusterDisruptionIT.java | 56 ++++++++++++++++++ .../ESIndexLevelReplicationTestCase.java | 8 +-- 10 files changed, 129 insertions(+), 128 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java index be757a02c3982..13269344a1ac6 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java @@ -46,7 +46,6 @@ import java.io.IOException; import java.util.Objects; -import java.util.function.Consumer; public class TransportVerifyShardBeforeCloseAction extends TransportReplicationAction< TransportVerifyShardBeforeCloseAction.ShardRequest, TransportVerifyShardBeforeCloseAction.ShardRequest, ReplicationResponse> { @@ -130,10 +129,8 @@ class VerifyShardBeforeCloseActionReplicasProxy extends ReplicasProxy { } @Override - public void markShardCopyAsStaleIfNeeded(final ShardId shardId, final String allocationId, final Runnable onSuccess, - final Consumer onPrimaryDemoted, final Consumer onIgnoredFailure) { - shardStateAction.remoteShardFailed(shardId, allocationId, primaryTerm, true, "mark copy as stale", null, - createShardActionListener(onSuccess, onPrimaryDemoted, onIgnoredFailure)); + public void markShardCopyAsStaleIfNeeded(final ShardId shardId, final String allocationId, final ActionListener listener) { + shardStateAction.remoteShardFailed(shardId, allocationId, primaryTerm, true, "mark copy as stale", null, listener); } } diff --git a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java index e9a6e7b48152d..3f09f00b9ac1e 100644 --- a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java @@ -48,7 +48,6 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; -import java.util.function.Consumer; import java.util.function.Supplier; public class TransportResyncReplicationAction extends TransportWriteAction onPrimaryDemoted, Consumer onIgnoredFailure) { - shardStateAction.remoteShardFailed(replica.shardId(), replica.allocationId().getId(), primaryTerm, false, message, exception, - createShardActionListener(onSuccess, onPrimaryDemoted, onIgnoredFailure)); + public void failShardIfNeeded(ShardRouting replica, String message, Exception exception, ActionListener listener) { + shardStateAction.remoteShardFailed( + replica.shardId(), replica.allocationId().getId(), primaryTerm, false, message, exception, listener); } } } diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java index f001d9a29e29d..e2425826f62e5 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java @@ -27,6 +27,7 @@ import org.elasticsearch.action.UnavailableShardsException; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.TransportActions; +import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Nullable; @@ -34,7 +35,9 @@ import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.ReplicationGroup; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.transport.TransportException; import java.io.IOException; import java.util.ArrayList; @@ -43,7 +46,6 @@ import java.util.Locale; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Consumer; public class ReplicationOperation< Request extends ReplicationRequest, @@ -133,10 +135,7 @@ private void markUnavailableShardsAsStale(ReplicaRequest replicaRequest, Replica for (String allocationId : replicationGroup.getUnavailableInSyncShards()) { pendingActions.incrementAndGet(); replicasProxy.markShardCopyAsStaleIfNeeded(replicaRequest.shardId(), allocationId, - ReplicationOperation.this::decPendingAndFinishIfNeeded, - ReplicationOperation.this::onPrimaryDemoted, - throwable -> decPendingAndFinishIfNeeded() - ); + ActionListener.wrap(r -> decPendingAndFinishIfNeeded(), ReplicationOperation.this::onNoLongerPrimary)); } } @@ -192,9 +191,8 @@ public void onFailure(Exception replicaException) { shard.shardId(), shard.currentNodeId(), replicaException, restStatus, false)); } String message = String.format(Locale.ROOT, "failed to perform %s on replica %s", opType, shard); - replicasProxy.failShardIfNeeded(shard, message, - replicaException, ReplicationOperation.this::decPendingAndFinishIfNeeded, - ReplicationOperation.this::onPrimaryDemoted, throwable -> decPendingAndFinishIfNeeded()); + replicasProxy.failShardIfNeeded(shard, message, replicaException, + ActionListener.wrap(r -> decPendingAndFinishIfNeeded(), ReplicationOperation.this::onNoLongerPrimary)); } @Override @@ -204,13 +202,18 @@ public String toString() { }); } - private void onPrimaryDemoted(Exception demotionFailure) { - String primaryFail = String.format(Locale.ROOT, - "primary shard [%s] was demoted while failing replica shard", - primary.routingEntry()); - // we are no longer the primary, fail ourselves and start over - primary.failShard(primaryFail, demotionFailure); - finishAsFailed(new RetryOnPrimaryException(primary.routingEntry().shardId(), primaryFail, demotionFailure)); + private void onNoLongerPrimary(Exception failure) { + final String message; + if (failure instanceof ShardStateAction.NoLongerPrimaryShardException) { + // we are no longer the primary, fail ourselves and start over + message = String.format(Locale.ROOT, "primary shard [%s] was demoted while failing replica shard", primary.routingEntry()); + primary.failShard(message, failure); + } else { + // these can occur if the node is shutting down and are okay any other exception here is not expected and merits investigation. + assert failure instanceof NodeClosedException || failure instanceof TransportException : failure; + message = String.format(Locale.ROOT, "primary node [%s] is shutting down while failing replica shard", primary.routingEntry()); + } + finishAsFailed(new RetryOnPrimaryException(primary.routingEntry().shardId(), message, failure)); } /** @@ -370,31 +373,23 @@ void performOn(ShardRouting replica, RequestT replicaRequest, long globalCheckpo * of active shards. Whether a failure is needed is left up to the * implementation. * - * @param replica shard to fail - * @param message a (short) description of the reason - * @param exception the original exception which caused the ReplicationOperation to request the shard to be failed - * @param onSuccess a callback to call when the shard has been successfully removed from the active set. - * @param onPrimaryDemoted a callback to call when the shard can not be failed because the current primary has been demoted - * by the master. - * @param onIgnoredFailure a callback to call when failing a shard has failed, but it that failure can be safely ignored and the + * @param replica shard to fail + * @param message a (short) description of the reason + * @param exception the original exception which caused the ReplicationOperation to request the shard to be failed + * @param listener a listener that will be notified when the failing shard has been removed from the in-sync set */ - void failShardIfNeeded(ShardRouting replica, String message, Exception exception, Runnable onSuccess, - Consumer onPrimaryDemoted, Consumer onIgnoredFailure); + void failShardIfNeeded(ShardRouting replica, String message, Exception exception, ActionListener listener); /** * Marks shard copy as stale if needed, removing its allocation id from * the set of in-sync allocation ids. Whether marking as stale is needed * is left up to the implementation. * - * @param shardId shard id - * @param allocationId allocation id to remove from the set of in-sync allocation ids - * @param onSuccess a callback to call when the allocation id has been successfully removed from the in-sync set. - * @param onPrimaryDemoted a callback to call when the request failed because the current primary was already demoted - * by the master. - * @param onIgnoredFailure a callback to call when the request failed, but the failure can be safely ignored. + * @param shardId shard id + * @param allocationId allocation id to remove from the set of in-sync allocation ids + * @param listener a listener that will be notified when the failing shard has been removed from the in-sync set */ - void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, Runnable onSuccess, - Consumer onPrimaryDemoted, Consumer onIgnoredFailure); + void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, ActionListener listener); } /** diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 326f7bacdb8f6..cc0c69418d7eb 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -84,7 +84,6 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Consumer; import java.util.function.Supplier; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM; @@ -1173,47 +1172,21 @@ public void performOn( } @Override - public void failShardIfNeeded(ShardRouting replica, String message, Exception exception, - Runnable onSuccess, Consumer onPrimaryDemoted, Consumer onIgnoredFailure) { + public void failShardIfNeeded(ShardRouting replica, String message, Exception exception, ActionListener listener) { // This does not need to fail the shard. The idea is that this // is a non-write operation (something like a refresh or a global // checkpoint sync) and therefore the replica should still be // "alive" if it were to fail. - onSuccess.run(); + listener.onResponse(null); } @Override - public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, Runnable onSuccess, - Consumer onPrimaryDemoted, Consumer onIgnoredFailure) { + public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, ActionListener listener) { // This does not need to make the shard stale. The idea is that this // is a non-write operation (something like a refresh or a global // checkpoint sync) and therefore the replica should still be // "alive" if it were to be marked as stale. - onSuccess.run(); - } - - protected final ActionListener createShardActionListener(final Runnable onSuccess, - final Consumer onPrimaryDemoted, - final Consumer onIgnoredFailure) { - return new ActionListener() { - @Override - public void onResponse(Void aVoid) { - onSuccess.run(); - } - - @Override - public void onFailure(Exception shardFailedError) { - if (shardFailedError instanceof ShardStateAction.NoLongerPrimaryShardException) { - onPrimaryDemoted.accept(shardFailedError); - } else { - // these can occur if the node is shutting down and are okay - // any other exception here is not expected and merits investigation - assert shardFailedError instanceof TransportException || - shardFailedError instanceof NodeClosedException : shardFailedError; - onIgnoredFailure.accept(shardFailedError); - } - } - }; + listener.onResponse(null); } } diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java index f44694f55d960..4781682437545 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java @@ -47,7 +47,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Consumer; import java.util.function.Supplier; /** @@ -376,20 +375,17 @@ class WriteActionReplicasProxy extends ReplicasProxy { } @Override - public void failShardIfNeeded(ShardRouting replica, String message, Exception exception, - Runnable onSuccess, Consumer onPrimaryDemoted, Consumer onIgnoredFailure) { + public void failShardIfNeeded(ShardRouting replica, String message, Exception exception, ActionListener listener) { if (TransportActions.isShardNotAvailableException(exception) == false) { logger.warn(new ParameterizedMessage("[{}] {}", replica.shardId(), message), exception); } - shardStateAction.remoteShardFailed(replica.shardId(), replica.allocationId().getId(), primaryTerm, true, message, exception, - createShardActionListener(onSuccess, onPrimaryDemoted, onIgnoredFailure)); + shardStateAction.remoteShardFailed( + replica.shardId(), replica.allocationId().getId(), primaryTerm, true, message, exception, listener); } @Override - public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, Runnable onSuccess, - Consumer onPrimaryDemoted, Consumer onIgnoredFailure) { - shardStateAction.remoteShardFailed(shardId, allocationId, primaryTerm, true, "mark copy as stale", null, - createShardActionListener(onSuccess, onPrimaryDemoted, onIgnoredFailure)); + public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, ActionListener listener) { + shardStateAction.remoteShardFailed(shardId, allocationId, primaryTerm, true, "mark copy as stale", null, listener); } } } diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java index 8fa10c4ee26d7..f4e31da055325 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java @@ -21,14 +21,16 @@ import org.apache.logging.log4j.Logger; import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.store.AlreadyClosedException; -import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.UnavailableShardsException; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.replication.ReplicationResponse.ShardInfo; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; @@ -39,6 +41,7 @@ import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.ReplicationGroup; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.test.ESTestCase; import java.util.ArrayList; @@ -51,7 +54,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Consumer; import java.util.function.Supplier; import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.state; @@ -115,10 +117,8 @@ public void testReplication() throws Exception { final TestReplicaProxy replicasProxy = new TestReplicaProxy(primaryTerm, simulatedFailures); final TestPrimary primary = new TestPrimary(primaryShard, () -> replicationGroup); - final TestReplicationOperation op = new TestReplicationOperation(request, - primary, listener, replicasProxy); + final TestReplicationOperation op = new TestReplicationOperation(request, primary, listener, replicasProxy); op.execute(); - assertThat("request was not processed on primary", request.processedOnPrimary.get(), equalTo(true)); assertThat(request.processedOnReplicas, equalTo(expectedReplicas)); assertThat(replicasProxy.failedReplicas, equalTo(simulatedFailures.keySet())); @@ -162,7 +162,7 @@ private void addTrackingInfo(IndexShardRoutingTable indexShardRoutingTable, Shar } } - public void testDemotedPrimary() throws Exception { + public void testNoLongerPrimary() throws Exception { final String index = "test"; final ShardId shardId = new ShardId(index, "_na_", 0); @@ -198,26 +198,30 @@ public void testDemotedPrimary() throws Exception { Request request = new Request(shardId); PlainActionFuture listener = new PlainActionFuture<>(); final boolean testPrimaryDemotedOnStaleShardCopies = randomBoolean(); + final Exception shardActionFailure; + if (randomBoolean()) { + shardActionFailure = new NodeClosedException(new DiscoveryNode("foo", buildNewFakeTransportAddress(), Version.CURRENT)); + } else { + shardActionFailure = new ShardStateAction.NoLongerPrimaryShardException(failedReplica.shardId(), "the king is dead"); + } final TestReplicaProxy replicasProxy = new TestReplicaProxy(primaryTerm, expectedFailures) { @Override public void failShardIfNeeded(ShardRouting replica, String message, Exception exception, - Runnable onSuccess, Consumer onPrimaryDemoted, - Consumer onIgnoredFailure) { + ActionListener shardActionListener) { if (testPrimaryDemotedOnStaleShardCopies) { - super.failShardIfNeeded(replica, message, exception, onSuccess, onPrimaryDemoted, onIgnoredFailure); + super.failShardIfNeeded(replica, message, exception, shardActionListener); } else { assertThat(replica, equalTo(failedReplica)); - onPrimaryDemoted.accept(new ElasticsearchException("the king is dead")); + shardActionListener.onFailure(shardActionFailure); } } @Override - public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, Runnable onSuccess, - Consumer onPrimaryDemoted, Consumer onIgnoredFailure) { + public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, ActionListener shardActionListener) { if (testPrimaryDemotedOnStaleShardCopies) { - onPrimaryDemoted.accept(new ElasticsearchException("the king is dead")); + shardActionListener.onFailure(shardActionFailure); } else { - super.markShardCopyAsStaleIfNeeded(shardId, allocationId, onSuccess, onPrimaryDemoted, onIgnoredFailure); + super.markShardCopyAsStaleIfNeeded(shardId, allocationId, shardActionListener); } } }; @@ -225,6 +229,7 @@ public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, R final TestPrimary primary = new TestPrimary(primaryShard, () -> replicationGroup) { @Override public void failShard(String message, Exception exception) { + assertThat(exception, instanceOf(ShardStateAction.NoLongerPrimaryShardException.class)); assertTrue(primaryFailed.compareAndSet(false, true)); } }; @@ -233,7 +238,11 @@ public void failShard(String message, Exception exception) { assertThat("request was not processed on primary", request.processedOnPrimary.get(), equalTo(true)); assertTrue("listener is not marked as done", listener.isDone()); - assertTrue(primaryFailed.get()); + if (shardActionFailure instanceof ShardStateAction.NoLongerPrimaryShardException) { + assertTrue(primaryFailed.get()); + } else { + assertFalse(primaryFailed.get()); + } assertListenerThrows("should throw exception to trigger retry", listener, ReplicationOperation.RetryOnPrimaryException.class); } @@ -594,33 +603,23 @@ public void performOn( } @Override - public void failShardIfNeeded(ShardRouting replica, String message, Exception exception, Runnable onSuccess, - Consumer onPrimaryDemoted, Consumer onIgnoredFailure) { + public void failShardIfNeeded(ShardRouting replica, String message, Exception exception, ActionListener listener) { if (failedReplicas.add(replica) == false) { fail("replica [" + replica + "] was failed twice"); } if (opFailures.containsKey(replica)) { - if (randomBoolean()) { - onSuccess.run(); - } else { - onIgnoredFailure.accept(new ElasticsearchException("simulated")); - } + listener.onResponse(null); } else { fail("replica [" + replica + "] was failed"); } } @Override - public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, Runnable onSuccess, - Consumer onPrimaryDemoted, Consumer onIgnoredFailure) { + public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, ActionListener listener) { if (markedAsStaleCopies.add(allocationId) == false) { fail("replica [" + allocationId + "] was marked as stale twice"); } - if (randomBoolean()) { - onSuccess.run(); - } else { - onIgnoredFailure.accept(new ElasticsearchException("simulated")); - } + listener.onResponse(null); } } diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index 110ab9bcb99a2..ffc9c2bf70a8a 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -744,11 +744,9 @@ public void testReplicaProxy() throws InterruptedException, ExecutionException { } AtomicReference failure = new AtomicReference<>(); - AtomicReference ignoredFailure = new AtomicReference<>(); AtomicBoolean success = new AtomicBoolean(); proxy.failShardIfNeeded(replica, "test", new ElasticsearchException("simulated"), - () -> success.set(true), failure::set, ignoredFailure::set - ); + ActionListener.wrap(r -> success.set(true), failure::set)); CapturingTransport.CapturedRequest[] shardFailedRequests = transport.getCapturedRequestsAndClear(); // A replication action doesn't not fail the request assertEquals(0, shardFailedRequests.length); diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java index 6e1ec3c76797d..0072d1bc83e8d 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java @@ -311,11 +311,9 @@ public void testReplicaProxy() throws InterruptedException, ExecutionException { } AtomicReference failure = new AtomicReference<>(); - AtomicReference ignoredFailure = new AtomicReference<>(); AtomicBoolean success = new AtomicBoolean(); proxy.failShardIfNeeded(replica, "test", new ElasticsearchException("simulated"), - () -> success.set(true), failure::set, ignoredFailure::set - ); + ActionListener.wrap(r -> success.set(true), failure::set)); CapturingTransport.CapturedRequest[] shardFailedRequests = transport.getCapturedRequestsAndClear(); // A write replication action proxy should fail the shard assertEquals(1, shardFailedRequests.length); @@ -329,8 +327,6 @@ public void testReplicaProxy() throws InterruptedException, ExecutionException { transport.handleResponse(shardFailedRequest.requestId, TransportResponse.Empty.INSTANCE); assertTrue(success.get()); assertNull(failure.get()); - assertNull(ignoredFailure.get()); - } else if (randomBoolean()) { // simulate the primary has been demoted transport.handleRemoteError(shardFailedRequest.requestId, @@ -338,15 +334,12 @@ public void testReplicaProxy() throws InterruptedException, ExecutionException { "shard-failed-test")); assertFalse(success.get()); assertNotNull(failure.get()); - assertNull(ignoredFailure.get()); - } else { // simulated an "ignored" exception transport.handleRemoteError(shardFailedRequest.requestId, new NodeClosedException(state.nodes().getLocalNode())); assertFalse(success.get()); assertNull(failure.get()); - assertNotNull(ignoredFailure.get()); } } diff --git a/server/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java b/server/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java index 9fd08511446d7..847b055cb5288 100644 --- a/server/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java +++ b/server/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java @@ -37,8 +37,12 @@ import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.VersionType; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.IndexShardTestCase; +import org.elasticsearch.indices.IndicesService; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.disruption.NetworkDisruption; @@ -52,6 +56,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; @@ -66,6 +71,7 @@ import static org.elasticsearch.action.DocWriteResponse.Result.UPDATED; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.isOneOf; import static org.hamcrest.Matchers.not; @@ -436,4 +442,54 @@ public void testIndicesDeleted() throws Exception { assertFalse(client().admin().indices().prepareExists(idxName).get().isExists()); } + public void testRestartPrimaryNodeWhileIndexing() throws Exception { + startCluster(3); + String index = "failover_index"; + assertAcked(client().admin().indices().prepareCreate(index).setSettings(Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, between(1, 2)))); + AtomicBoolean stopped = new AtomicBoolean(); + Thread[] threads = new Thread[between(1, 4)]; + AtomicInteger docID = new AtomicInteger(); + Set ackedDocs = ConcurrentCollections.newConcurrentSet(); + for (int i = 0; i < threads.length; i++) { + threads[i] = new Thread(() -> { + while (stopped.get() == false && docID.get() < 5000) { + int docId = frequently() ? docID.getAndIncrement() : between(0, docID.getAndIncrement()); + try { + IndexResponse response = client().prepareIndex(index, "_doc", Integer.toString(docId)) + .setSource("{\"f\":" + docId + "}", XContentType.JSON).get(); + assertThat(response.getResult(), isOneOf(CREATED, UPDATED)); + logger.info("--> index id={} seq_no={}", response.getId(), response.getSeqNo()); + ackedDocs.add(response.getId()); + } catch (ElasticsearchException ignore) { + logger.info("--> fail to index id={}", docId); + } + } + }); + threads[i].start(); + } + ensureGreen(index); + assertBusy(() -> assertThat(docID.get(), greaterThanOrEqualTo(100))); + ClusterState clusterState = internalCluster().clusterService().state(); + for (ShardRouting shardRouting : clusterState.routingTable().allShards(index)) { + if (shardRouting.primary()) { + String nodeName = clusterState.nodes().get(shardRouting.currentNodeId()).getName(); + internalCluster().restartNode(nodeName, new InternalTestCluster.RestartCallback()); + break; + } + } + ensureGreen(index); + assertBusy(() -> assertThat(docID.get(), greaterThanOrEqualTo(200))); + stopped.set(true); + for (Thread thread : threads) { + thread.join(); + } + clusterState = internalCluster().clusterService().state(); + for (ShardRouting shardRouting : clusterState.routingTable().allShards(index)) { + String nodeName = clusterState.nodes().get(shardRouting.currentNodeId()).getName(); + IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodeName); + IndexShard indexShard = indicesService.getShardOrNull(shardRouting.shardId()); + assertThat(IndexShardTestCase.getShardDocUIDs(indexShard), equalTo(ackedDocs)); + } + } } diff --git a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index d20d57aed64a9..8b984c22bfd0e 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -96,7 +96,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; import java.util.function.BiFunction; -import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -721,15 +720,12 @@ public void onFailure(Exception e) { } @Override - public void failShardIfNeeded(ShardRouting replica, String message, Exception exception, - Runnable onSuccess, Consumer onPrimaryDemoted, - Consumer onIgnoredFailure) { + public void failShardIfNeeded(ShardRouting replica, String message, Exception exception, ActionListener listener) { throw new UnsupportedOperationException("failing shard " + replica + " isn't supported. failure: " + message, exception); } @Override - public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, Runnable onSuccess, - Consumer onPrimaryDemoted, Consumer onIgnoredFailure) { + public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, ActionListener listener) { throw new UnsupportedOperationException("can't mark " + shardId + ", aid [" + allocationId + "] as stale"); } } From ab65f4b14ae8af85708e8b4340fd72599546f803 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 4 Mar 2019 11:57:15 -0500 Subject: [PATCH 2/4] restart random data node --- .../discovery/ClusterDisruptionIT.java | 22 ++++++------------- 1 file changed, 7 insertions(+), 15 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java b/server/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java index 847b055cb5288..ac19fd68cde80 100644 --- a/server/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java +++ b/server/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java @@ -442,9 +442,9 @@ public void testIndicesDeleted() throws Exception { assertFalse(client().admin().indices().prepareExists(idxName).get().isExists()); } - public void testRestartPrimaryNodeWhileIndexing() throws Exception { + public void testRestartNodeWhileIndexing() throws Exception { startCluster(3); - String index = "failover_index"; + String index = "restart_while_indexing"; assertAcked(client().admin().indices().prepareCreate(index).setSettings(Settings.builder() .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, between(1, 2)))); AtomicBoolean stopped = new AtomicBoolean(); @@ -454,15 +454,14 @@ public void testRestartPrimaryNodeWhileIndexing() throws Exception { for (int i = 0; i < threads.length; i++) { threads[i] = new Thread(() -> { while (stopped.get() == false && docID.get() < 5000) { - int docId = frequently() ? docID.getAndIncrement() : between(0, docID.getAndIncrement()); + String id = Integer.toString(docID.incrementAndGet()); try { - IndexResponse response = client().prepareIndex(index, "_doc", Integer.toString(docId)) - .setSource("{\"f\":" + docId + "}", XContentType.JSON).get(); + IndexResponse response = client().prepareIndex(index, "_doc", id).setSource("{}", XContentType.JSON).get(); assertThat(response.getResult(), isOneOf(CREATED, UPDATED)); logger.info("--> index id={} seq_no={}", response.getId(), response.getSeqNo()); ackedDocs.add(response.getId()); } catch (ElasticsearchException ignore) { - logger.info("--> fail to index id={}", docId); + logger.info("--> fail to index id={}", id); } } }); @@ -470,21 +469,14 @@ public void testRestartPrimaryNodeWhileIndexing() throws Exception { } ensureGreen(index); assertBusy(() -> assertThat(docID.get(), greaterThanOrEqualTo(100))); - ClusterState clusterState = internalCluster().clusterService().state(); - for (ShardRouting shardRouting : clusterState.routingTable().allShards(index)) { - if (shardRouting.primary()) { - String nodeName = clusterState.nodes().get(shardRouting.currentNodeId()).getName(); - internalCluster().restartNode(nodeName, new InternalTestCluster.RestartCallback()); - break; - } - } + internalCluster().restartRandomDataNode(new InternalTestCluster.RestartCallback()); ensureGreen(index); assertBusy(() -> assertThat(docID.get(), greaterThanOrEqualTo(200))); stopped.set(true); for (Thread thread : threads) { thread.join(); } - clusterState = internalCluster().clusterService().state(); + ClusterState clusterState = internalCluster().clusterService().state(); for (ShardRouting shardRouting : clusterState.routingTable().allShards(index)) { String nodeName = clusterState.nodes().get(shardRouting.currentNodeId()).getName(); IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodeName); From 6afb094f04d21f2f30c601a4beea9fa25de06c6c Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 4 Mar 2019 12:38:35 -0500 Subject: [PATCH 3/4] assert transport exception --- .../replication/ReplicationOperation.java | 16 +++++++++++----- .../replication/ReplicationOperationTests.java | 3 +++ .../replication/TransportWriteActionTests.java | 4 ++-- 3 files changed, 16 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java index e2425826f62e5..77bcb13c3a755 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java @@ -21,6 +21,7 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.store.AlreadyClosedException; +import org.elasticsearch.Assertions; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; @@ -203,15 +204,20 @@ public String toString() { } private void onNoLongerPrimary(Exception failure) { + final boolean nodeIsClosing = failure instanceof NodeClosedException || + (failure instanceof TransportException && "TransportService is closed stopped can't send request".equals(failure.getMessage())); final String message; - if (failure instanceof ShardStateAction.NoLongerPrimaryShardException) { + if (nodeIsClosing) { + message = String.format(Locale.ROOT, "primary node [%s] is shutting down while failing replica shard", primary.routingEntry()); + } else { + if (Assertions.ENABLED) { + if (failure instanceof ShardStateAction.NoLongerPrimaryShardException == false) { + throw new AssertionError("unexpected failure", failure); + } + } // we are no longer the primary, fail ourselves and start over message = String.format(Locale.ROOT, "primary shard [%s] was demoted while failing replica shard", primary.routingEntry()); primary.failShard(message, failure); - } else { - // these can occur if the node is shutting down and are okay any other exception here is not expected and merits investigation. - assert failure instanceof NodeClosedException || failure instanceof TransportException : failure; - message = String.format(Locale.ROOT, "primary node [%s] is shutting down while failing replica shard", primary.routingEntry()); } finishAsFailed(new RetryOnPrimaryException(primary.routingEntry().shardId(), message, failure)); } diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java index f4e31da055325..adb79b1fe3bba 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java @@ -43,6 +43,7 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.transport.TransportException; import java.util.ArrayList; import java.util.Collections; @@ -201,6 +202,8 @@ public void testNoLongerPrimary() throws Exception { final Exception shardActionFailure; if (randomBoolean()) { shardActionFailure = new NodeClosedException(new DiscoveryNode("foo", buildNewFakeTransportAddress(), Version.CURRENT)); + } else if (randomBoolean()) { + shardActionFailure = new TransportException("TransportService is closed stopped can't send request"); } else { shardActionFailure = new ShardStateAction.NoLongerPrimaryShardException(failedReplica.shardId(), "the king is dead"); } diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java index 0072d1bc83e8d..f540374a56c20 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java @@ -335,11 +335,11 @@ public void testReplicaProxy() throws InterruptedException, ExecutionException { assertFalse(success.get()); assertNotNull(failure.get()); } else { - // simulated an "ignored" exception + // simulated a node closing exception transport.handleRemoteError(shardFailedRequest.requestId, new NodeClosedException(state.nodes().getLocalNode())); assertFalse(success.get()); - assertNull(failure.get()); + assertNotNull(failure.get()); } } From 6b56fbcee0171e995a01ac7f1ba288963be44d41 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 5 Mar 2019 09:33:32 -0500 Subject: [PATCH 4/4] wording + comment --- .../action/support/replication/ReplicationOperation.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java index 77bcb13c3a755..71483245ee3fa 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java @@ -208,7 +208,10 @@ private void onNoLongerPrimary(Exception failure) { (failure instanceof TransportException && "TransportService is closed stopped can't send request".equals(failure.getMessage())); final String message; if (nodeIsClosing) { - message = String.format(Locale.ROOT, "primary node [%s] is shutting down while failing replica shard", primary.routingEntry()); + message = String.format(Locale.ROOT, + "node with primary [%s] is shutting down while failing replica shard", primary.routingEntry()); + // We prefer not to fail the primary to avoid unnecessary warning log + // when the node with the primary shard is gracefully shutting down. } else { if (Assertions.ENABLED) { if (failure instanceof ShardStateAction.NoLongerPrimaryShardException == false) {