Skip to content

Commit

Permalink
Add integration test for indexing during relocation
Browse files Browse the repository at this point in the history
Signed-off-by: Suraj Singh <[email protected]>
  • Loading branch information
dreamer-89 committed Nov 23, 2022
1 parent b413298 commit 1414c09
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,14 @@

import com.carrotsearch.randomizedtesting.RandomizedTest;
import org.junit.BeforeClass;
import org.opensearch.action.ActionFuture;
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.cluster.reroute.ClusterRerouteResponse;
import org.opensearch.action.admin.indices.segments.IndexShardSegments;
import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse;
import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest;
import org.opensearch.action.admin.indices.segments.ShardSegments;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.client.Requests;
Expand All @@ -29,6 +32,7 @@
import org.opensearch.common.Priority;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.Index;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexService;
Expand All @@ -45,6 +49,7 @@
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Arrays;
import java.util.List;
Expand Down Expand Up @@ -95,7 +100,6 @@ protected boolean addMockInternalEngine() {
return false;
}


private final TimeValue ACCEPTABLE_RELOCATION_TIME = new TimeValue(5, TimeUnit.MINUTES);

public void ingestDocs(int docCount) throws Exception {
Expand All @@ -118,9 +122,8 @@ public void ingestDocs(int docCount) throws Exception {
}

/**
* This Integration Relocates a primary shard to another node. Before Relocation and after relocation we index single document. We don't perform any flush
* before relocation is done.
* This test will pass if we perform flush before relocation.
* This test relocates a primary shard to a newly added node in the cluster. Before relocation and after relocation
* we index documents. We don't perform any flush before relocation is done.
*/
public void testSimplePrimaryRelocationWithoutFlushBeforeRelocation() throws Exception {
logger.info("--> starting [primary] ...");
Expand All @@ -146,9 +149,6 @@ public void testSimplePrimaryRelocationWithoutFlushBeforeRelocation() throws Exc
assertHitCount(client(old_primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);

// If we do a flush before relocation, this test will pass.
// flush(INDEX_NAME);

logger.info("--> start another node");
final String new_primary = internalCluster().startNode();
ClusterHealthResponse clusterHealthResponse = client().admin()
Expand Down Expand Up @@ -179,24 +179,94 @@ public void testSimplePrimaryRelocationWithoutFlushBeforeRelocation() throws Exc
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));

final int finalDocCount = 1;
client().prepareIndex(INDEX_NAME).setId("20").setSource("bar", "baz").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
client().prepareIndex(INDEX_NAME).setId("201").setSource("bar", "baz").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
refresh(INDEX_NAME);
waitForReplicaUpdate();

logger.info("--> verifying count again {} + {}", initialDocCount, finalDocCount);
client().admin().indices().prepareRefresh().execute().actionGet();
assertBusy(() -> {
assertHitCount(
client(new_primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(),
initialDocCount + finalDocCount
assertHitCount(
client(new_primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(),
initialDocCount + finalDocCount
);
assertHitCount(
client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(),
initialDocCount + finalDocCount
);
}

public void testRelocateWhileContinuouslyIndexingAndWaitingForRefresh() throws Exception {
logger.info("--> starting [primary] ...");
final String primary = internalCluster().startNode();

logger.info("--> creating test index ...");
prepareCreate(
INDEX_NAME,
Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0).put("index.refresh_interval", -1)
).get();

logger.info("--> index 10 docs");
for (int i = 0; i < 10; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
}
logger.info("--> flush so we have an actual index");
client().admin().indices().prepareFlush().execute().actionGet();
logger.info("--> index more docs so we have something in the translog");
final List<ActionFuture<IndexResponse>> pendingIndexResponses = new ArrayList<>();
for (int i = 10; i < 20; i++) {
pendingIndexResponses.add(
client().prepareIndex(INDEX_NAME)
.setId(Integer.toString(i))
.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL)
.setSource("field", "value" + i)
.execute()
);
});
assertBusy(() -> {
assertHitCount(
client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(),
initialDocCount + finalDocCount
}

logger.info("--> start another node");
final String replica = internalCluster().startNode();
ClusterHealthResponse clusterHealthResponse = client().admin()
.cluster()
.prepareHealth()
.setWaitForEvents(Priority.LANGUID)
.setWaitForNodes("2")
.execute()
.actionGet();
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));

logger.info("--> relocate the shard from primary to replica");
ActionFuture<ClusterRerouteResponse> relocationListener = client().admin()
.cluster()
.prepareReroute()
.add(new MoveAllocationCommand(INDEX_NAME, 0, primary, replica))
.execute();
logger.info("--> index 100 docs while relocating");
for (int i = 20; i < 120; i++) {
pendingIndexResponses.add(
client().prepareIndex(INDEX_NAME)
.setId(Integer.toString(i))
.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL)
.setSource("field", "value" + i)
.execute()
);
});
}
relocationListener.actionGet();
clusterHealthResponse = client().admin()
.cluster()
.prepareHealth()
.setWaitForEvents(Priority.LANGUID)
.setWaitForNoRelocatingShards(true)
.setTimeout(ACCEPTABLE_RELOCATION_TIME)
.execute()
.actionGet();
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));

logger.info("--> verifying count");
assertBusy(() -> {
client().admin().indices().prepareRefresh().execute().actionGet();
assertTrue(pendingIndexResponses.stream().allMatch(ActionFuture::isDone));
}, 1, TimeUnit.MINUTES);
assertHitCount(client().prepareSearch(INDEX_NAME).setSize(0).get(), 120);
}

public void testPrimaryStopped_ReplicaPromoted() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ public boolean shouldCache(Query query) {
* This function is used to perform a segment replication on target primary node in order to copy segment files
* previously copied to other replicas. This is done so that new primary doesn't conflict during new segment
* replication round with existing replicas.
* @param listener
* @param listener segment replication event listener
*/
public void performSegmentReplicationRefresh(StepListener<Void> listener) {
this.segmentReplicationTargetService.startReplication(
Expand All @@ -471,7 +471,7 @@ public void onReplicationDone(SegmentReplicationState state) {

@Override
public void onReplicationFailure(SegmentReplicationState state, ReplicationFailedException e, boolean sendShardFailure) {
logger.error("segment replication failure post recovery {}", e);
logger.error("segment replication failure post recovery", e);
listener.onFailure(e);
if (sendShardFailure == true) {
failShard("segment replication failure post recovery", e);
Expand Down

0 comments on commit 1414c09

Please sign in to comment.