[Segment Replication] For replica recovery, force segment replication sync from peer recovery source #5746

Merged

Changes from 1 commit
Fix unit test
Signed-off-by: Suraj Singh <[email protected]>
dreamer-89 committed Feb 1, 2023

Verified: This commit was created on GitHub.com and signed with GitHub’s verified signature.
commit e4e107d69bf10492b1a6f924947d3f64014c043a
@@ -9,20 +9,15 @@
package org.opensearch.indices.replication;

import com.carrotsearch.randomizedtesting.RandomizedTest;
import org.opensearch.OpenSearchCorruptionException;
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.client.Requests;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.allocation.command.CancelAllocationCommand;
import org.opensearch.common.Priority;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.IndexModule;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.recovery.FileChunkRequest;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.BackgroundIndexer;
@@ -138,129 +133,6 @@ public void testCancelPrimaryAllocation() throws Exception {
verifyStoreContent();
}

/**
* This test verifies that adding a new node, which results in peer recovery of the replica, also brings the
* replica's replication checkpoint up to the primary's by performing a round of segment replication.
*/
public void testNewlyAddedReplicaIsUpdated() {
internalCluster().startNode(featureFlagSettings());
prepareCreate(
INDEX_NAME,
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
).get();
for (int i = 0; i < 10; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
}
logger.info("--> flush so we have some segment files on disk");
flush(INDEX_NAME);
logger.info("--> index more docs so we have something in the translog");
for (int i = 10; i < 20; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
}
refresh(INDEX_NAME);
assertEquals(client().prepareSearch(INDEX_NAME).setSize(0).execute().actionGet().getHits().getTotalHits().value, 20L);

logger.info("--> start empty node to add replica shard");
final String replicaNode = internalCluster().startNode(featureFlagSettings());
ensureGreen(INDEX_NAME);
// Update the replica count setting to 1 so that peer recovery triggers and recovers the replica on replicaNode
assertAcked(
client().admin()
.indices()
.prepareUpdateSettings(INDEX_NAME)
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1))
);

// Verify that the cluster reaches green within the timeout, i.e. the replica recovered and was added to the cluster
ClusterHealthResponse clusterHealthResponse = client().admin()
.cluster()
.prepareHealth()
.setWaitForEvents(Priority.LANGUID)
.setWaitForNodes("2")
.setWaitForGreenStatus()
.setTimeout(TimeValue.timeValueSeconds(2))
.execute()
.actionGet();
assertFalse(clusterHealthResponse.isTimedOut());
ensureYellow(INDEX_NAME);
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, replicaNode);
assertTrue(indicesService.hasIndex(resolveIndex(INDEX_NAME)));
assertEquals(client().prepareSearch(INDEX_NAME).setSize(0).execute().actionGet().getHits().getTotalHits().value, 20L);
}

/**
* This test verifies that a replica shard is not added to the cluster when a round of segment replication fails
* during peer recovery.
*
* TODO: Ignoring this test as it's flaky and needs a separate fix
*/
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669")
public void testAddNewReplicaFailure() throws Exception {
logger.info("--> starting [Primary Node] ...");
final String primaryNode = internalCluster().startNode();

logger.info("--> creating test index ...");
prepareCreate(
INDEX_NAME,
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)

).get();

logger.info("--> index 10 docs");
for (int i = 0; i < 10; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
}
logger.info("--> flush so we have some segment files on disk");
flush(INDEX_NAME);
logger.info("--> index more docs so we have something in the translog");
for (int i = 10; i < 20; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
}
refresh(INDEX_NAME);
logger.info("--> verifying count");
assertEquals(client().prepareSearch(INDEX_NAME).setSize(0).execute().actionGet().getHits().getTotalHits().value, 20L);

logger.info("--> start empty node to add replica shard");
final String replicaNode = internalCluster().startNode();

// Mock the transport service to throw a corruption exception during the segment replication process.
MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance(
TransportService.class,
primaryNode
));
mockTransportService.addSendBehavior(
internalCluster().getInstance(TransportService.class, replicaNode),
(connection, requestId, action, request, options) -> {
if (action.equals(SegmentReplicationTargetService.Actions.FILE_CHUNK)) {
throw new OpenSearchCorruptionException("expected");
}
connection.sendRequest(requestId, action, request, options);
}
);
ensureGreen(INDEX_NAME);
// Add a replica shard to the new empty replica node
assertAcked(
client().admin()
.indices()
.prepareUpdateSettings(INDEX_NAME)
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1))
);

// Verify that cluster state is not green and that the replica shard which failed during the round of segment replication is not added to the cluster
ClusterHealthResponse clusterHealthResponse = client().admin()
.cluster()
.prepareHealth()
.setWaitForEvents(Priority.LANGUID)
.setWaitForNodes("2")
.setWaitForGreenStatus()
.setTimeout(TimeValue.timeValueSeconds(2))
.execute()
.actionGet();
assertTrue(clusterHealthResponse.isTimedOut());
ensureYellow(INDEX_NAME);
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, replicaNode);
assertFalse(indicesService.hasIndex(resolveIndex(INDEX_NAME)));
}

public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception {
final String nodeA = internalCluster().startNode();
final String nodeB = internalCluster().startNode();
@@ -22,6 +22,7 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.IndexModule;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;
@@ -32,6 +33,8 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

/**
* This test class verifies primary shard relocation with segment replication as the replication strategy.
*/
@@ -398,4 +401,129 @@ public void testRelocateWithQueuedOperationsDuringHandoff() throws Exception {
waitForSearchableDocs(totalDocCount, replica, newPrimary);
verifyStoreContent();
}

/**
* This test verifies that adding a new node, which results in peer recovery of the replica, also brings the
* replica's replication checkpoint up to the primary's by performing a round of segment replication.
*/
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669")
public void testNewlyAddedReplicaIsUpdated() {
internalCluster().startNode(featureFlagSettings());
prepareCreate(
INDEX_NAME,
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
).get();
for (int i = 0; i < 10; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
}
logger.info("--> flush so we have some segment files on disk");
flush(INDEX_NAME);
logger.info("--> index more docs so we have something in the translog");
for (int i = 10; i < 20; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
}
refresh(INDEX_NAME);
assertEquals(client().prepareSearch(INDEX_NAME).setSize(0).execute().actionGet().getHits().getTotalHits().value, 20L);

logger.info("--> start empty node to add replica shard");
final String replicaNode = internalCluster().startNode(featureFlagSettings());
ensureGreen(INDEX_NAME);
// Update the replica count setting to 1 so that peer recovery triggers and recovers the replica on replicaNode
assertAcked(
client().admin()
.indices()
.prepareUpdateSettings(INDEX_NAME)
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1))
);

// Verify that the cluster reaches green within the timeout, i.e. the replica recovered and was added to the cluster
ClusterHealthResponse clusterHealthResponse = client().admin()
.cluster()
.prepareHealth()
.setWaitForEvents(Priority.LANGUID)
.setWaitForNodes("2")
.setWaitForGreenStatus()
.setTimeout(TimeValue.timeValueSeconds(2))
.execute()
.actionGet();
assertFalse(clusterHealthResponse.isTimedOut());
ensureYellow(INDEX_NAME);
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, replicaNode);
assertTrue(indicesService.hasIndex(resolveIndex(INDEX_NAME)));
assertEquals(client().prepareSearch(INDEX_NAME).setSize(0).execute().actionGet().getHits().getTotalHits().value, 20L);
}

/**
* This test verifies that a replica shard is not added to the cluster when a round of segment replication fails
* during peer recovery.
*
* TODO: Ignoring this test as it's flaky and needs a separate fix
*/
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669")
public void testAddNewReplicaFailure() throws Exception {
logger.info("--> starting [Primary Node] ...");
final String primaryNode = internalCluster().startNode();

logger.info("--> creating test index ...");
prepareCreate(
INDEX_NAME,
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)

).get();

logger.info("--> index 10 docs");
for (int i = 0; i < 10; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
}
logger.info("--> flush so we have some segment files on disk");
flush(INDEX_NAME);
logger.info("--> index more docs so we have something in the translog");
for (int i = 10; i < 20; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
}
refresh(INDEX_NAME);
logger.info("--> verifying count");
assertEquals(client().prepareSearch(INDEX_NAME).setSize(0).execute().actionGet().getHits().getTotalHits().value, 20L);

logger.info("--> start empty node to add replica shard");
final String replicaNode = internalCluster().startNode();

// Mock the transport service to throw a corruption exception during the segment replication process.
MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance(
TransportService.class,
primaryNode
));
mockTransportService.addSendBehavior(
internalCluster().getInstance(TransportService.class, replicaNode),
(connection, requestId, action, request, options) -> {
if (action.equals(SegmentReplicationTargetService.Actions.FILE_CHUNK)) {
throw new OpenSearchCorruptionException("expected");
}
connection.sendRequest(requestId, action, request, options);
}
);
ensureGreen(INDEX_NAME);
// Add a replica shard to the new empty replica node
assertAcked(
client().admin()
.indices()
.prepareUpdateSettings(INDEX_NAME)
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1))
);

// Verify that cluster state is not green and that the replica shard which failed during the round of segment replication is not added to the cluster
ClusterHealthResponse clusterHealthResponse = client().admin()
.cluster()
.prepareHealth()
.setWaitForEvents(Priority.LANGUID)
.setWaitForNodes("2")
.setWaitForGreenStatus()
.setTimeout(TimeValue.timeValueSeconds(2))
.execute()
.actionGet();
assertTrue(clusterHealthResponse.isTimedOut());
ensureYellow(INDEX_NAME);
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, replicaNode);
Member:
I wonder if the flakiness in these tests comes from the assertion that the index does not exist in IndicesService. After the round of SR fails, the shard will fail and the node will be yellow; it will then try to spin up and recover the shard again, causing this assertion to trip?

To confirm, you could maybe flip this assertion to true, wrap it in an assertBusy, and see that it always succeeds.
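
A minimal sketch of that suggestion, assuming the assertBusy helper inherited from OpenSearchTestCase and an arbitrary 30-second wait (neither is part of this PR):

// Hypothetical check for the flakiness theory: if recovery retries after the
// failed segment replication round, the index should eventually reappear on
// the replica node. assertBusy retries the block until it passes or times out.
assertBusy(() -> {
    IndicesService indicesService = internalCluster().getInstance(IndicesService.class, replicaNode);
    assertTrue(indicesService.hasIndex(resolveIndex(INDEX_NAME)));
}, 30, TimeUnit.SECONDS);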

Member Author:
> I wonder if the flakiness in these tests comes from the assertion that the index does not exist in IndicesService.

Yes, it was flaky and failed when asserting that the index does not exist on the replica.

> After the round of SR fails, the shard will fail and the node will be yellow; it will then try to spin up and recover the shard again, causing this assertion to trip?

Bingo, yes, you are right. The recovery kicks in again after the failure. I modified the test to wait for the first round of recovery before the assertion.
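
For illustration, one way to wait for that first failed round, building on the send-behavior hook already shown in the test; the latch is a hypothetical addition, not necessarily what was committed:

// Hypothetical: count down a latch the first time the injected corruption
// exception fires, so the test can block until the first recovery round has
// actually failed before asserting that the index is absent on the replica.
final CountDownLatch firstRoundFailed = new CountDownLatch(1);
mockTransportService.addSendBehavior(
    internalCluster().getInstance(TransportService.class, replicaNode),
    (connection, requestId, action, request, options) -> {
        if (action.equals(SegmentReplicationTargetService.Actions.FILE_CHUNK)) {
            firstRoundFailed.countDown();
            throw new OpenSearchCorruptionException("expected");
        }
        connection.sendRequest(requestId, action, request, options);
    }
);
// ... then, before the assertFalse(indicesService.hasIndex(...)) assertion:
assertTrue(firstRoundFailed.await(30, TimeUnit.SECONDS));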

assertFalse(indicesService.hasIndex(resolveIndex(INDEX_NAME)));
}

}
@@ -518,18 +518,17 @@ public void testReplicaReceivesLowerGeneration() throws Exception {
replicateSegments(primary, List.of(replica_1));

assertEqualCommittedSegments(primary, replica_1);
assertLatestCommitGen(4, primary, replica_1);
assertLatestCommitGen(2, replica_2);
assertLatestCommitGen(4, primary);
assertLatestCommitGen(5, replica_1);
assertLatestCommitGen(3, replica_2);

shards.promoteReplicaToPrimary(replica_2).get();
primary.close("demoted", false);
primary.store().close();
IndexShard oldPrimary = shards.addReplicaWithExistingPath(primary.shardPath(), primary.routingEntry().currentNodeId());
shards.recoverReplica(oldPrimary);
assertLatestCommitGen(4, oldPrimary);
assertEqualCommittedSegments(oldPrimary, replica_1);

assertLatestCommitGen(4, replica_2);
assertLatestCommitGen(5, oldPrimary);
assertLatestCommitGen(5, replica_2);

numDocs = randomIntBetween(numDocs + 1, numDocs + 10);
shards.indexDocs(numDocs);
@@ -716,10 +715,12 @@ public void testNRTReplicaPromotedAsPrimary() throws Exception {
for (IndexShard shard : shards.getReplicas()) {
assertDocCounts(shard, totalDocs, numDocs);
}
assertEquals(additonalDocs, nextPrimary.translogStats().estimatedNumberOfOperations());
assertEquals(additonalDocs, replica.translogStats().estimatedNumberOfOperations());
assertEquals(additonalDocs, nextPrimary.translogStats().getUncommittedOperations());
assertEquals(additonalDocs, replica.translogStats().getUncommittedOperations());
assertEquals(totalDocs, oldPrimary.translogStats().estimatedNumberOfOperations());
assertEquals(totalDocs, nextPrimary.translogStats().estimatedNumberOfOperations());
assertEquals(totalDocs, replica.translogStats().estimatedNumberOfOperations());
assertEquals(totalDocs, nextPrimary.translogStats().getUncommittedOperations());
assertEquals(totalDocs, replica.translogStats().getUncommittedOperations());

// promote the replica
shards.syncGlobalCheckpoint();