From 1414c093f57b9eca62dd949a48aad0ab333cb756 Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Tue, 22 Nov 2022 14:08:30 -0800 Subject: [PATCH] Add integration test for indexing during relocation Signed-off-by: Suraj Singh --- .../replication/SegmentReplicationIT.java | 106 +++++++++++++++--- .../opensearch/index/shard/IndexShard.java | 4 +- 2 files changed, 90 insertions(+), 20 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index 0f1ea390914d8..3ff499876df7d 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -10,11 +10,14 @@ import com.carrotsearch.randomizedtesting.RandomizedTest; import org.junit.BeforeClass; +import org.opensearch.action.ActionFuture; import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; +import org.opensearch.action.admin.cluster.reroute.ClusterRerouteResponse; import org.opensearch.action.admin.indices.segments.IndexShardSegments; import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse; import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest; import org.opensearch.action.admin.indices.segments.ShardSegments; +import org.opensearch.action.index.IndexResponse; import org.opensearch.action.support.WriteRequest; import org.opensearch.action.update.UpdateResponse; import org.opensearch.client.Requests; @@ -29,6 +32,7 @@ import org.opensearch.common.Priority; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.index.Index; import org.opensearch.index.IndexModule; import org.opensearch.index.IndexService; @@ -45,6 +49,7 @@ import 
org.opensearch.transport.TransportService; import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; import java.util.Arrays; import java.util.List; @@ -95,7 +100,6 @@ protected boolean addMockInternalEngine() { return false; } - private final TimeValue ACCEPTABLE_RELOCATION_TIME = new TimeValue(5, TimeUnit.MINUTES); public void ingestDocs(int docCount) throws Exception { @@ -118,9 +122,8 @@ public void ingestDocs(int docCount) throws Exception { } /** - * This Integration Relocates a primary shard to another node. Before Relocation and after relocation we index single document. We don't perform any flush - * before relocation is done. - * This test will pass if we perform flush before relocation. + * This test relocates a primary shard to a newly added node in the cluster. Before relocation and after relocation + * we index documents. We don't perform any flush before relocation is done. */ public void testSimplePrimaryRelocationWithoutFlushBeforeRelocation() throws Exception { logger.info("--> starting [primary] ..."); @@ -146,9 +149,6 @@ public void testSimplePrimaryRelocationWithoutFlushBeforeRelocation() throws Exc assertHitCount(client(old_primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); - // If we do a flush before relocation, this test will pass. 
- // flush(INDEX_NAME); - logger.info("--> start another node"); final String new_primary = internalCluster().startNode(); ClusterHealthResponse clusterHealthResponse = client().admin() @@ -179,24 +179,94 @@ public void testSimplePrimaryRelocationWithoutFlushBeforeRelocation() throws Exc assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); final int finalDocCount = 1; - client().prepareIndex(INDEX_NAME).setId("20").setSource("bar", "baz").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); + client().prepareIndex(INDEX_NAME).setId("201").setSource("bar", "baz").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); refresh(INDEX_NAME); waitForReplicaUpdate(); logger.info("--> verifying count again {} + {}", initialDocCount, finalDocCount); client().admin().indices().prepareRefresh().execute().actionGet(); - assertBusy(() -> { - assertHitCount( - client(new_primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), - initialDocCount + finalDocCount + assertHitCount( + client(new_primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), + initialDocCount + finalDocCount + ); + assertHitCount( + client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), + initialDocCount + finalDocCount + ); + } + + public void testRelocateWhileContinuouslyIndexingAndWaitingForRefresh() throws Exception { + logger.info("--> starting [primary] ..."); + final String primary = internalCluster().startNode(); + + logger.info("--> creating test index ..."); + prepareCreate( + INDEX_NAME, + Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0).put("index.refresh_interval", -1) + ).get(); + + logger.info("--> index 10 docs"); + for (int i = 0; i < 10; i++) { + client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet(); + } + logger.info("--> flush so we have an actual index"); + 
client().admin().indices().prepareFlush().execute().actionGet(); + logger.info("--> index more docs so we have something in the translog"); + final List<ActionFuture<IndexResponse>> pendingIndexResponses = new ArrayList<>(); + for (int i = 10; i < 20; i++) { + pendingIndexResponses.add( + client().prepareIndex(INDEX_NAME) + .setId(Integer.toString(i)) + .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) + .setSource("field", "value" + i) + .execute() ); - }); - assertBusy(() -> { - assertHitCount( - client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), - initialDocCount + finalDocCount + } + + logger.info("--> start another node"); + final String replica = internalCluster().startNode(); + ClusterHealthResponse clusterHealthResponse = client().admin() + .cluster() + .prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setWaitForNodes("2") + .execute() + .actionGet(); + assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); + + logger.info("--> relocate the shard from primary to replica"); + ActionFuture<ClusterRerouteResponse> relocationListener = client().admin() + .cluster() + .prepareReroute() + .add(new MoveAllocationCommand(INDEX_NAME, 0, primary, replica)) + .execute(); + logger.info("--> index 100 docs while relocating"); + for (int i = 20; i < 120; i++) { + pendingIndexResponses.add( + client().prepareIndex(INDEX_NAME) + .setId(Integer.toString(i)) + .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) + .setSource("field", "value" + i) + .execute() ); - }); + } + relocationListener.actionGet(); + clusterHealthResponse = client().admin() + .cluster() + .prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setWaitForNoRelocatingShards(true) + .setTimeout(ACCEPTABLE_RELOCATION_TIME) + .execute() + .actionGet(); + assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); + + logger.info("--> verifying count"); + assertBusy(() -> { + client().admin().indices().prepareRefresh().execute().actionGet(); + 
assertTrue(pendingIndexResponses.stream().allMatch(ActionFuture::isDone)); + }, 1, TimeUnit.MINUTES); + assertHitCount(client().prepareSearch(INDEX_NAME).setSize(0).get(), 120); } public void testPrimaryStopped_ReplicaPromoted() throws Exception { diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 59d6f51972ccb..ff819548f728a 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -451,7 +451,7 @@ public boolean shouldCache(Query query) { * This function is used to perform a segment replication on target primary node in order to copy segment files * previously copied to other replicas. This is done so that new primary doesn't conflict during new segment * replication round with existing replicas. - * @param listener + * @param listener segment replication event listener */ public void performSegmentReplicationRefresh(StepListener listener) { this.segmentReplicationTargetService.startReplication( @@ -471,7 +471,7 @@ public void onReplicationDone(SegmentReplicationState state) { @Override public void onReplicationFailure(SegmentReplicationState state, ReplicationFailedException e, boolean sendShardFailure) { - logger.error("segment replication failure post recovery {}", e); + logger.error("segment replication failure post recovery", e); listener.onFailure(e); if (sendShardFailure == true) { failShard("segment replication failure post recovery", e);