From 33edf00ada05ef2ee5c8f2d58429d1b8de6b1f82 Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Fri, 17 Feb 2023 17:56:21 -0800 Subject: [PATCH 1/2] Skip performOnPrimary step when executing PublishCheckpoint. Signed-off-by: Marc Handalian Fix spotless. Signed-off-by: Marc Handalian --- .../SegmentReplicationRelocationIT.java | 41 +++--------- .../checkpoint/PublishCheckpointAction.java | 64 ++++++++++--------- .../PublishCheckpointActionTests.java | 10 +-- 3 files changed, 44 insertions(+), 71 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java index 4f0983553de7c..21550dc74a078 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java @@ -21,9 +21,7 @@ import org.opensearch.common.Priority; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; -import org.opensearch.index.IndexModule; import org.opensearch.indices.IndicesService; -import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.transport.MockTransportService; import org.opensearch.transport.TransportService; @@ -33,6 +31,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; /** @@ -43,22 +42,13 @@ public class SegmentReplicationRelocationIT extends SegmentReplicationBaseIT { private final TimeValue ACCEPTABLE_RELOCATION_TIME = new TimeValue(5, TimeUnit.MINUTES); private void createIndex(int replicaCount) { - prepareCreate( - INDEX_NAME, - Settings.builder() - .put("index.number_of_shards", 1) - .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false) - .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) - .put("index.number_of_replicas", replicaCount) - .put("index.refresh_interval", -1) - ).get(); + prepareCreate(INDEX_NAME, Settings.builder().put(SETTING_NUMBER_OF_REPLICAS, replicaCount)).get(); } /** * This test verifies happy path when primary shard is relocated newly added node (target) in the cluster. Before * relocation and after relocation documents are indexed and documents are verified */ - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669") public void testPrimaryRelocation() throws Exception { final String oldPrimary = internalCluster().startNode(); createIndex(1); @@ -135,7 +125,6 @@ public void testPrimaryRelocation() throws Exception { * failure, more documents are ingested and verified on replica; which confirms older primary still refreshing the * replicas. */ - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669") public void testPrimaryRelocationWithSegRepFailure() throws Exception { final String oldPrimary = internalCluster().startNode(); createIndex(1); @@ -220,7 +209,6 @@ public void testPrimaryRelocationWithSegRepFailure() throws Exception { * This test verifies primary recovery behavior with continuous ingestion * */ - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669") public void testRelocateWhileContinuouslyIndexingAndWaitingForRefresh() throws Exception { final String primary = internalCluster().startNode(); createIndex(1); @@ -297,7 +285,6 @@ public void testRelocateWhileContinuouslyIndexingAndWaitingForRefresh() throws E * operations during handoff. The test verifies all docs ingested are searchable on new primary. * */ - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669") public void testRelocateWithQueuedOperationsDuringHandoff() throws Exception { final String primary = internalCluster().startNode(); createIndex(1); @@ -396,7 +383,7 @@ public void testRelocateWithQueuedOperationsDuringHandoff() throws Exception { assertBusy(() -> { client().admin().indices().prepareRefresh().execute().actionGet(); assertTrue(pendingIndexResponses.stream().allMatch(ActionFuture::isDone)); - }, 1, TimeUnit.MINUTES); + }, 2, TimeUnit.MINUTES); flushAndRefresh(INDEX_NAME); waitForSearchableDocs(totalDocCount, replica, newPrimary); verifyStoreContent(); @@ -406,13 +393,10 @@ public void testRelocateWithQueuedOperationsDuringHandoff() throws Exception { * This test verifies that adding a new node which results in peer recovery as replica; also bring replica's * replication checkpoint upto the primary's by performing a round of segment replication. */ - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669") public void testNewlyAddedReplicaIsUpdated() throws Exception { final String primary = internalCluster().startNode(); - prepareCreate( - INDEX_NAME, - Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) - ).get(); + prepareCreate(INDEX_NAME, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 0)) + .get(); for (int i = 0; i < 10; i++) { client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet(); } @@ -430,10 +414,7 @@ public void testNewlyAddedReplicaIsUpdated() throws Exception { ensureGreen(INDEX_NAME); // Update replica count settings to 1 so that peer recovery triggers and recover replica assertAcked( - client().admin() - .indices() - .prepareUpdateSettings(INDEX_NAME) - .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)) + client().admin().indices().prepareUpdateSettings(INDEX_NAME).setSettings(Settings.builder().put(SETTING_NUMBER_OF_REPLICAS, 1)) ); ClusterHealthResponse clusterHealthResponse = client().admin() @@ -454,10 +435,7 @@ public void testNewlyAddedReplicaIsUpdated() throws Exception { /** * This test verifies that replica shard is not added to the cluster when doing a round of segment replication fails during peer recovery. - * - * TODO: Ignoring this test as its flaky and needs separate fix */ - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669") public void testAddNewReplicaFailure() throws Exception { logger.info("--> starting [Primary Node] ..."); final String primaryNode = internalCluster().startNode(); @@ -465,7 +443,7 @@ public void testAddNewReplicaFailure() throws Exception { logger.info("--> creating test index ..."); prepareCreate( INDEX_NAME, - Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 0) ).get(); @@ -505,10 +483,7 @@ public void testAddNewReplicaFailure() throws Exception { ensureGreen(INDEX_NAME); // Add Replica shard to the new empty replica node assertAcked( - client().admin() - .indices() - .prepareUpdateSettings(INDEX_NAME) - .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)) + client().admin().indices().prepareUpdateSettings(INDEX_NAME).setSettings(Settings.builder().put(SETTING_NUMBER_OF_REPLICAS, 1)) ); IndicesService indicesService = internalCluster().getInstance(IndicesService.class, replica); waitForRecovery.await(); diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java index e7b53874c9d1b..a93b56546175f 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java @@ -13,12 +13,18 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.store.AlreadyClosedException; import org.opensearch.ExceptionsHelper; +import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; +import org.opensearch.action.ActionListenerResponseHandler; import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.replication.ReplicationMode; +import org.opensearch.action.support.replication.ReplicationOperation; import org.opensearch.action.support.replication.ReplicationResponse; import org.opensearch.action.support.replication.ReplicationTask; import org.opensearch.action.support.replication.TransportReplicationAction; import org.opensearch.cluster.action.shard.ShardStateAction; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.inject.Inject; import org.opensearch.common.io.stream.StreamInput; @@ -33,15 +39,12 @@ import org.opensearch.node.NodeClosedException; import org.opensearch.tasks.Task; import org.opensearch.threadpool.ThreadPool; -import org.opensearch.transport.TransportException; -import org.opensearch.transport.TransportResponseHandler; import org.opensearch.transport.TransportService; import java.io.IOException; +import java.util.List; import java.util.Objects; -import org.opensearch.action.support.replication.ReplicationMode; - /** * Replication action responsible for publishing checkpoint to a replica shard. * @@ -107,7 +110,6 @@ public ReplicationMode getReplicationMode(IndexShard indexShard) { * Publish checkpoint request to shard */ final void publish(IndexShard indexShard) { - String primaryAllocationId = indexShard.routingEntry().allocationId().getId(); long primaryTerm = indexShard.getPendingPrimaryTerm(); final ThreadContext threadContext = threadPool.getThreadContext(); try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { @@ -115,28 +117,26 @@ final void publish(IndexShard indexShard) { threadContext.markAsSystemContext(); PublishCheckpointRequest request = new PublishCheckpointRequest(indexShard.getLatestReplicationCheckpoint()); final ReplicationCheckpoint checkpoint = request.getCheckpoint(); - final ReplicationTask task = (ReplicationTask) taskManager.register("transport", "segrep_publish_checkpoint", request); - final ReplicationTimer timer = new ReplicationTimer(); - timer.start(); - transportService.sendChildRequest( - clusterService.localNode(), - transportPrimaryAction, - new ConcreteShardRequest<>(request, primaryAllocationId, primaryTerm), - task, - transportOptions, - new TransportResponseHandler() { - @Override - public ReplicationResponse read(StreamInput in) throws IOException { - return newResponseInstance(in); - } - - @Override - public String executor() { - return ThreadPool.Names.SAME; - } + final List replicationTargets = indexShard.getReplicationGroup().getReplicationTargets(); + for (ShardRouting replicationTarget : replicationTargets) { + if (replicationTarget.primary()) { + continue; + } + final DiscoveryNode node = clusterService.state().nodes().get(replicationTarget.currentNodeId()); + final ConcreteReplicaRequest replicaRequest = new ConcreteReplicaRequest<>( + request, + replicationTarget.allocationId().getId(), + primaryTerm, + indexShard.getLastKnownGlobalCheckpoint(), + indexShard.getMaxSeqNoOfUpdatesOrDeletes() + ); + final ReplicationTimer timer = new ReplicationTimer(); + timer.start(); + final ReplicationTask task = (ReplicationTask) taskManager.register("transport", "segrep_publish_checkpoint", request); + ActionListener listener = new ActionListener<>() { @Override - public void handleResponse(ReplicationResponse response) { + public void onResponse(ReplicationOperation.ReplicaResponse replicaResponse) { timer.stop(); logger.trace( () -> new ParameterizedMessage( @@ -146,12 +146,11 @@ public void handleResponse(ReplicationResponse response) { timer.time() ) ); - task.setPhase("finished"); taskManager.unregister(task); } @Override - public void handleException(TransportException e) { + public void onFailure(Exception e) { timer.stop(); logger.trace("[shardId {}] Failed to publish checkpoint, timing: {}", indexShard.shardId().getId(), timer.time()); task.setPhase("finished"); @@ -174,8 +173,13 @@ public void handleException(TransportException e) { e ); } - } - ); + }; + final ActionListenerResponseHandler handler = new ActionListenerResponseHandler<>( + listener, + ReplicaResponse::new + ); + transportService.sendChildRequest(node, transportReplicaAction, replicaRequest, task, transportOptions, handler); + } logger.trace( () -> new ParameterizedMessage( "[shardId {}] Publishing replication checkpoint [{}]", @@ -192,7 +196,7 @@ protected void shardOperationOnPrimary( IndexShard primary, ActionListener> listener ) { - ActionListener.completeWith(listener, () -> new PrimaryResult<>(request, new ReplicationResponse())); + throw new OpenSearchException("PublishCheckpointAction should not hit primary shards"); } @Override diff --git a/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java b/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java index 728397e665a49..704b40b05c49d 100644 --- a/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java @@ -8,9 +8,9 @@ package org.opensearch.indices.replication.checkpoint; +import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; import org.opensearch.action.support.ActionFilters; -import org.opensearch.action.support.ActionTestUtils; import org.opensearch.action.support.PlainActionFuture; import org.opensearch.action.support.replication.ReplicationMode; import org.opensearch.action.support.replication.TransportReplicationAction; @@ -33,7 +33,6 @@ import java.util.Collections; import java.util.concurrent.atomic.AtomicBoolean; -import static org.hamcrest.Matchers.sameInstance; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.mockito.Mockito.verify; @@ -105,14 +104,9 @@ public void testPublishCheckpointActionOnPrimary() { ); final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(indexShard.shardId(), 1111, 111, 11, 1); - final PublishCheckpointRequest request = new PublishCheckpointRequest(checkpoint); - action.shardOperationOnPrimary(request, indexShard, ActionTestUtils.assertNoFailureListener(result -> { - // we should forward the request containing the current publish checkpoint to the replica - assertThat(result.replicaRequest(), sameInstance(request)); - })); - + expectThrows(OpenSearchException.class, () -> { action.shardOperationOnPrimary(request, indexShard, mock(ActionListener.class)); }); } public void testPublishCheckpointActionOnReplica() { From e360577138a8ef432073801b3202958535652eb1 Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Mon, 20 Feb 2023 14:53:53 -0800 Subject: [PATCH 2/2] PR Feedback: - Add random refresh policy to reloation ITs. - add back finishing ReplicationTask. Signed-off-by: Marc Handalian --- .../replication/SegmentReplicationRelocationIT.java | 13 ++++++++----- .../checkpoint/PublishCheckpointAction.java | 1 + 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java index 21550dc74a078..cd9f095b95afe 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java @@ -55,12 +55,13 @@ public void testPrimaryRelocation() throws Exception { final String replica = internalCluster().startNode(); ensureGreen(INDEX_NAME); final int initialDocCount = scaledRandomIntBetween(100, 1000); + final WriteRequest.RefreshPolicy refreshPolicy = randomFrom(WriteRequest.RefreshPolicy.values()); final List> pendingIndexResponses = new ArrayList<>(); for (int i = 0; i < initialDocCount; i++) { pendingIndexResponses.add( client().prepareIndex(INDEX_NAME) .setId(Integer.toString(i)) - .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) + .setRefreshPolicy(refreshPolicy) .setSource("field", "value" + i) .execute() ); @@ -105,7 +106,7 @@ public void testPrimaryRelocation() throws Exception { pendingIndexResponses.add( client().prepareIndex(INDEX_NAME) .setId(Integer.toString(i)) - .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) + .setRefreshPolicy(refreshPolicy) .setSource("field", "value" + i) .execute() ); @@ -131,12 +132,13 @@ public void testPrimaryRelocationWithSegRepFailure() throws Exception { final String replica = internalCluster().startNode(); ensureGreen(INDEX_NAME); final int initialDocCount = scaledRandomIntBetween(100, 1000); + final WriteRequest.RefreshPolicy refreshPolicy = randomFrom(WriteRequest.RefreshPolicy.values()); final List> pendingIndexResponses = new ArrayList<>(); for (int i = 0; i < initialDocCount; i++) { pendingIndexResponses.add( client().prepareIndex(INDEX_NAME) .setId(Integer.toString(i)) - .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) + .setRefreshPolicy(refreshPolicy) .setSource("field", "value" + i) .execute() ); @@ -189,7 +191,7 @@ public void testPrimaryRelocationWithSegRepFailure() throws Exception { pendingIndexResponses.add( client().prepareIndex(INDEX_NAME) .setId(Integer.toString(i)) - .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) + .setRefreshPolicy(refreshPolicy) .setSource("field", "value" + i) .execute() ); @@ -297,6 +299,7 @@ public void testRelocateWithQueuedOperationsDuringHandoff() throws Exception { } logger.info("--> flush to have segments on disk"); client().admin().indices().prepareFlush().execute().actionGet(); + final WriteRequest.RefreshPolicy refreshPolicy = randomFrom(WriteRequest.RefreshPolicy.values()); logger.info("--> index more docs so there are ops in the transaction log"); final List> pendingIndexResponses = new ArrayList<>(); @@ -304,7 +307,7 @@ public void testRelocateWithQueuedOperationsDuringHandoff() throws Exception { pendingIndexResponses.add( client().prepareIndex(INDEX_NAME) .setId(Integer.toString(i)) - .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) + .setRefreshPolicy(refreshPolicy) .setSource("field", "value" + i) .execute() ); diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java index a93b56546175f..0fd934c31ef7f 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java @@ -146,6 +146,7 @@ public void onResponse(ReplicationOperation.ReplicaResponse replicaResponse) { timer.time() ) ); + task.setPhase("finished"); taskManager.unregister(task); }