From 399efcd94edcc62fc6565f13be7c8a66d66ea328 Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Mon, 20 Feb 2023 16:44:14 -0700 Subject: [PATCH] Skip performOnPrimary step when executing PublishCheckpoint. (#6366) * Skip performOnPrimary step when executing PublishCheckpoint. Signed-off-by: Marc Handalian Fix spotless. Signed-off-by: Marc Handalian * PR Feedback: - Add random refresh policy to reloation ITs. - add back finishing ReplicationTask. Signed-off-by: Marc Handalian --------- Signed-off-by: Marc Handalian --- .../SegmentReplicationRelocationIT.java | 50 +++++---------- .../checkpoint/PublishCheckpointAction.java | 63 ++++++++++--------- .../PublishCheckpointActionTests.java | 12 +--- 3 files changed, 53 insertions(+), 72 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java index 95617dc229b97..cd9f095b95afe 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java @@ -21,9 +21,7 @@ import org.opensearch.common.Priority; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; -import org.opensearch.index.IndexModule; import org.opensearch.indices.IndicesService; -import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.transport.MockTransportService; import org.opensearch.transport.TransportService; @@ -33,6 +31,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; /** @@ -43,15 +42,7 @@ public class SegmentReplicationRelocationIT extends SegmentReplicationBaseIT { private final TimeValue ACCEPTABLE_RELOCATION_TIME = new TimeValue(5, TimeUnit.MINUTES); private void createIndex(int replicaCount) { - prepareCreate( - INDEX_NAME, - Settings.builder() - .put("index.number_of_shards", 1) - .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false) - .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) - .put("index.number_of_replicas", replicaCount) - .put("index.refresh_interval", -1) - ).get(); + prepareCreate(INDEX_NAME, Settings.builder().put(SETTING_NUMBER_OF_REPLICAS, replicaCount)).get(); } /** @@ -64,12 +55,13 @@ public void testPrimaryRelocation() throws Exception { final String replica = internalCluster().startNode(); ensureGreen(INDEX_NAME); final int initialDocCount = scaledRandomIntBetween(100, 1000); + final WriteRequest.RefreshPolicy refreshPolicy = randomFrom(WriteRequest.RefreshPolicy.values()); final List> pendingIndexResponses = new ArrayList<>(); for (int i = 0; i < initialDocCount; i++) { pendingIndexResponses.add( client().prepareIndex(INDEX_NAME) .setId(Integer.toString(i)) - .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) + .setRefreshPolicy(refreshPolicy) .setSource("field", "value" + i) .execute() ); @@ -114,7 +106,7 @@ public void testPrimaryRelocation() throws Exception { pendingIndexResponses.add( client().prepareIndex(INDEX_NAME) .setId(Integer.toString(i)) - .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) + .setRefreshPolicy(refreshPolicy) .setSource("field", "value" + i) .execute() ); @@ -140,12 +132,13 @@ public void testPrimaryRelocationWithSegRepFailure() throws Exception { final String replica = internalCluster().startNode(); ensureGreen(INDEX_NAME); final int initialDocCount = scaledRandomIntBetween(100, 1000); + final WriteRequest.RefreshPolicy refreshPolicy = randomFrom(WriteRequest.RefreshPolicy.values()); final List> pendingIndexResponses = new ArrayList<>(); for (int i = 0; i < initialDocCount; i++) { pendingIndexResponses.add( client().prepareIndex(INDEX_NAME) .setId(Integer.toString(i)) - .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) + .setRefreshPolicy(refreshPolicy) .setSource("field", "value" + i) .execute() ); @@ -198,7 +191,7 @@ public void testPrimaryRelocationWithSegRepFailure() throws Exception { pendingIndexResponses.add( client().prepareIndex(INDEX_NAME) .setId(Integer.toString(i)) - .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) + .setRefreshPolicy(refreshPolicy) .setSource("field", "value" + i) .execute() ); @@ -306,6 +299,7 @@ public void testRelocateWithQueuedOperationsDuringHandoff() throws Exception { } logger.info("--> flush to have segments on disk"); client().admin().indices().prepareFlush().execute().actionGet(); + final WriteRequest.RefreshPolicy refreshPolicy = randomFrom(WriteRequest.RefreshPolicy.values()); logger.info("--> index more docs so there are ops in the transaction log"); final List> pendingIndexResponses = new ArrayList<>(); @@ -313,7 +307,7 @@ public void testRelocateWithQueuedOperationsDuringHandoff() throws Exception { pendingIndexResponses.add( client().prepareIndex(INDEX_NAME) .setId(Integer.toString(i)) - .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) + .setRefreshPolicy(refreshPolicy) .setSource("field", "value" + i) .execute() ); @@ -392,7 +386,7 @@ public void testRelocateWithQueuedOperationsDuringHandoff() throws Exception { assertBusy(() -> { client().admin().indices().prepareRefresh().execute().actionGet(); assertTrue(pendingIndexResponses.stream().allMatch(ActionFuture::isDone)); - }, 1, TimeUnit.MINUTES); + }, 2, TimeUnit.MINUTES); flushAndRefresh(INDEX_NAME); waitForSearchableDocs(totalDocCount, replica, newPrimary); verifyStoreContent(); @@ -402,13 +396,10 @@ public void testRelocateWithQueuedOperationsDuringHandoff() throws Exception { * This test verifies that adding a new node which results in peer recovery as replica; also bring replica's * replication checkpoint upto the primary's by performing a round of segment replication. */ - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669") public void testNewlyAddedReplicaIsUpdated() throws Exception { final String primary = internalCluster().startNode(); - prepareCreate( - INDEX_NAME, - Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) - ).get(); + prepareCreate(INDEX_NAME, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 0)) + .get(); for (int i = 0; i < 10; i++) { client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet(); } @@ -426,10 +417,7 @@ public void testNewlyAddedReplicaIsUpdated() throws Exception { ensureGreen(INDEX_NAME); // Update replica count settings to 1 so that peer recovery triggers and recover replica assertAcked( - client().admin() - .indices() - .prepareUpdateSettings(INDEX_NAME) - .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)) + client().admin().indices().prepareUpdateSettings(INDEX_NAME).setSettings(Settings.builder().put(SETTING_NUMBER_OF_REPLICAS, 1)) ); ClusterHealthResponse clusterHealthResponse = client().admin() @@ -450,10 +438,7 @@ public void testNewlyAddedReplicaIsUpdated() throws Exception { /** * This test verifies that replica shard is not added to the cluster when doing a round of segment replication fails during peer recovery. - * - * TODO: Ignoring this test as its flaky and needs separate fix */ - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669") public void testAddNewReplicaFailure() throws Exception { logger.info("--> starting [Primary Node] ..."); final String primaryNode = internalCluster().startNode(); @@ -461,7 +446,7 @@ public void testAddNewReplicaFailure() throws Exception { logger.info("--> creating test index ..."); prepareCreate( INDEX_NAME, - Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 0) ).get(); @@ -501,10 +486,7 @@ public void testAddNewReplicaFailure() throws Exception { ensureGreen(INDEX_NAME); // Add Replica shard to the new empty replica node assertAcked( - client().admin() - .indices() - .prepareUpdateSettings(INDEX_NAME) - .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)) + client().admin().indices().prepareUpdateSettings(INDEX_NAME).setSettings(Settings.builder().put(SETTING_NUMBER_OF_REPLICAS, 1)) ); IndicesService indicesService = internalCluster().getInstance(IndicesService.class, replica); waitForRecovery.await(); diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java index e7b53874c9d1b..0fd934c31ef7f 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java @@ -13,12 +13,18 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.store.AlreadyClosedException; import org.opensearch.ExceptionsHelper; +import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; +import org.opensearch.action.ActionListenerResponseHandler; import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.replication.ReplicationMode; +import org.opensearch.action.support.replication.ReplicationOperation; import org.opensearch.action.support.replication.ReplicationResponse; import org.opensearch.action.support.replication.ReplicationTask; import org.opensearch.action.support.replication.TransportReplicationAction; import org.opensearch.cluster.action.shard.ShardStateAction; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.inject.Inject; import org.opensearch.common.io.stream.StreamInput; @@ -33,15 +39,12 @@ import org.opensearch.node.NodeClosedException; import org.opensearch.tasks.Task; import org.opensearch.threadpool.ThreadPool; -import org.opensearch.transport.TransportException; -import org.opensearch.transport.TransportResponseHandler; import org.opensearch.transport.TransportService; import java.io.IOException; +import java.util.List; import java.util.Objects; -import org.opensearch.action.support.replication.ReplicationMode; - /** * Replication action responsible for publishing checkpoint to a replica shard. * @@ -107,7 +110,6 @@ public ReplicationMode getReplicationMode(IndexShard indexShard) { * Publish checkpoint request to shard */ final void publish(IndexShard indexShard) { - String primaryAllocationId = indexShard.routingEntry().allocationId().getId(); long primaryTerm = indexShard.getPendingPrimaryTerm(); final ThreadContext threadContext = threadPool.getThreadContext(); try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { @@ -115,28 +117,26 @@ final void publish(IndexShard indexShard) { threadContext.markAsSystemContext(); PublishCheckpointRequest request = new PublishCheckpointRequest(indexShard.getLatestReplicationCheckpoint()); final ReplicationCheckpoint checkpoint = request.getCheckpoint(); - final ReplicationTask task = (ReplicationTask) taskManager.register("transport", "segrep_publish_checkpoint", request); - final ReplicationTimer timer = new ReplicationTimer(); - timer.start(); - transportService.sendChildRequest( - clusterService.localNode(), - transportPrimaryAction, - new ConcreteShardRequest<>(request, primaryAllocationId, primaryTerm), - task, - transportOptions, - new TransportResponseHandler() { - @Override - public ReplicationResponse read(StreamInput in) throws IOException { - return newResponseInstance(in); - } - - @Override - public String executor() { - return ThreadPool.Names.SAME; - } + final List replicationTargets = indexShard.getReplicationGroup().getReplicationTargets(); + for (ShardRouting replicationTarget : replicationTargets) { + if (replicationTarget.primary()) { + continue; + } + final DiscoveryNode node = clusterService.state().nodes().get(replicationTarget.currentNodeId()); + final ConcreteReplicaRequest replicaRequest = new ConcreteReplicaRequest<>( + request, + replicationTarget.allocationId().getId(), + primaryTerm, + indexShard.getLastKnownGlobalCheckpoint(), + indexShard.getMaxSeqNoOfUpdatesOrDeletes() + ); + final ReplicationTimer timer = new ReplicationTimer(); + timer.start(); + final ReplicationTask task = (ReplicationTask) taskManager.register("transport", "segrep_publish_checkpoint", request); + ActionListener listener = new ActionListener<>() { @Override - public void handleResponse(ReplicationResponse response) { + public void onResponse(ReplicationOperation.ReplicaResponse replicaResponse) { timer.stop(); logger.trace( () -> new ParameterizedMessage( @@ -151,7 +151,7 @@ public void handleResponse(ReplicationResponse response) { } @Override - public void handleException(TransportException e) { + public void onFailure(Exception e) { timer.stop(); logger.trace("[shardId {}] Failed to publish checkpoint, timing: {}", indexShard.shardId().getId(), timer.time()); task.setPhase("finished"); @@ -174,8 +174,13 @@ public void handleException(TransportException e) { e ); } - } - ); + }; + final ActionListenerResponseHandler handler = new ActionListenerResponseHandler<>( + listener, + ReplicaResponse::new + ); + transportService.sendChildRequest(node, transportReplicaAction, replicaRequest, task, transportOptions, handler); + } logger.trace( () -> new ParameterizedMessage( "[shardId {}] Publishing replication checkpoint [{}]", @@ -192,7 +197,7 @@ protected void shardOperationOnPrimary( IndexShard primary, ActionListener> listener ) { - ActionListener.completeWith(listener, () -> new PrimaryResult<>(request, new ReplicationResponse())); + throw new OpenSearchException("PublishCheckpointAction should not hit primary shards"); } @Override diff --git a/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java b/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java index 88b9512eedaba..704b40b05c49d 100644 --- a/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java @@ -8,9 +8,9 @@ package org.opensearch.indices.replication.checkpoint; +import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; import org.opensearch.action.support.ActionFilters; -import org.opensearch.action.support.ActionTestUtils; import org.opensearch.action.support.PlainActionFuture; import org.opensearch.action.support.replication.ReplicationMode; import org.opensearch.action.support.replication.TransportReplicationAction; @@ -33,10 +33,9 @@ import java.util.Collections; import java.util.concurrent.atomic.AtomicBoolean; -import static org.hamcrest.Matchers.sameInstance; -import static org.mockito.Mockito.verify; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.times; import static org.opensearch.test.ClusterServiceUtils.createClusterService; @@ -105,14 +104,9 @@ public void testPublishCheckpointActionOnPrimary() { ); final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(indexShard.shardId(), 1111, 111, 11, 1); - final PublishCheckpointRequest request = new PublishCheckpointRequest(checkpoint); - action.shardOperationOnPrimary(request, indexShard, ActionTestUtils.assertNoFailureListener(result -> { - // we should forward the request containing the current publish checkpoint to the replica - assertThat(result.replicaRequest(), sameInstance(request)); - })); - + expectThrows(OpenSearchException.class, () -> { action.shardOperationOnPrimary(request, indexShard, mock(ActionListener.class)); }); } public void testPublishCheckpointActionOnReplica() {