Skip to content

Commit

Permalink
Resolve conflicts and enabled tests
Browse files Browse the repository at this point in the history
Signed-off-by: Suraj Singh <[email protected]>
  • Loading branch information
dreamer-89 committed Jan 5, 2023
1 parent aa5e3f3 commit 68d16e2
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 33 deletions.
2 changes: 0 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Added @gbbafna as an OpenSearch maintainer ([#5668](https://github.com/opensearch-project/OpenSearch/pull/5668))
- Added support for feature flags in opensearch.yml ([#4959](https://github.com/opensearch-project/OpenSearch/pull/4959))



### Dependencies
- Bumps `log4j-core` from 2.18.0 to 2.19.0
- Bumps `reactor-netty-http` from 1.0.18 to 1.0.23
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ protected boolean addMockInternalEngine() {
return false;
}

@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REPLICATION_TYPE, "true").build();
}

public void ingestDocs(int docCount) throws Exception {
try (
BackgroundIndexer indexer = new BackgroundIndexer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,34 +56,34 @@ private void createIndex() {
* This test verifies happy path when primary shard is relocated newly added node (target) in the cluster. Before
* relocation and after relocation documents are indexed and documents are verified
*/
public void testSimplePrimaryRelocationWithoutFlushBeforeRelocation() throws Exception {
final String old_primary = internalCluster().startNode();
public void testPrimaryRelocation() throws Exception {
final String oldPrimary = internalCluster().startNode(featureFlagSettings());
createIndex();
final String replica = internalCluster().startNode();
final String replica = internalCluster().startNode(featureFlagSettings());
ensureGreen(INDEX_NAME);
final int initialDocCount = scaledRandomIntBetween(0, 200);
ingestDocs(initialDocCount);

logger.info("--> verifying count {}", initialDocCount);
assertHitCount(client(old_primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
assertHitCount(client(oldPrimary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);

logger.info("--> start another node");
final String new_primary = internalCluster().startNode();
final String newPrimary = internalCluster().startNode(featureFlagSettings());
ClusterHealthResponse clusterHealthResponse = client().admin()
.cluster()
.prepareHealth()
.setWaitForEvents(Priority.LANGUID)
.setWaitForNodes("3")
.execute()
.actionGet();
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
assertEquals(clusterHealthResponse.isTimedOut(), false);

logger.info("--> relocate the shard");
client().admin()
.cluster()
.prepareReroute()
.add(new MoveAllocationCommand(INDEX_NAME, 0, old_primary, new_primary))
.add(new MoveAllocationCommand(INDEX_NAME, 0, oldPrimary, newPrimary))
.execute()
.actionGet();
clusterHealthResponse = client().admin()
Expand All @@ -94,25 +94,26 @@ public void testSimplePrimaryRelocationWithoutFlushBeforeRelocation() throws Exc
.setTimeout(ACCEPTABLE_RELOCATION_TIME)
.execute()
.actionGet();
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
assertEquals(clusterHealthResponse.isTimedOut(), false);

logger.info("--> get the state, verify shard 1 primary moved from node1 to node2");
ClusterState state = client().admin().cluster().prepareState().execute().actionGet().getState();

logger.info("--> state {}", state);

assertThat(
state.getRoutingNodes().node(state.nodes().resolveNode(new_primary).getId()).iterator().next().state(),
equalTo(ShardRoutingState.STARTED)
assertEquals(
state.getRoutingNodes().node(state.nodes().resolveNode(newPrimary).getId()).iterator().next().state(),
ShardRoutingState.STARTED
);

final int finalDocCount = initialDocCount;
ingestDocs(finalDocCount);
refresh(INDEX_NAME);

logger.info("--> verifying count again {}", initialDocCount + finalDocCount);
client().admin().indices().prepareRefresh().execute().actionGet();
assertHitCount(
client(new_primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(),
client(newPrimary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(),
initialDocCount + finalDocCount
);
assertHitCount(
Expand All @@ -122,40 +123,40 @@ public void testSimplePrimaryRelocationWithoutFlushBeforeRelocation() throws Exc
}

/**
* This test verifies the primary to primary relocation behavior when segment replication round fails on new primary.
* Post failure, more documents are ingested and verified on replica which confirms replica still getting refresh from
* older primary.
* This test verifies the primary relocation behavior when segment replication round fails during recovery. Post
* failure, more documents are ingested and verified on replica; which confirms older primary still refreshing the
* replicas.
*/
public void testPrimaryRelocationWithSegRepFailure() throws Exception {
final String old_primary = internalCluster().startNode();
final String oldPrimary = internalCluster().startNode(featureFlagSettings());
createIndex();
final String replica = internalCluster().startNode();
final String replica = internalCluster().startNode(featureFlagSettings());
ensureGreen(INDEX_NAME);
final int initialDocCount = scaledRandomIntBetween(1, 100);
ingestDocs(initialDocCount);

logger.info("--> verifying count {}", initialDocCount);
assertHitCount(client(old_primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
assertHitCount(client(oldPrimary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);

logger.info("--> start another node");
final String new_primary = internalCluster().startNode();
final String newPrimary = internalCluster().startNode(featureFlagSettings());
ClusterHealthResponse clusterHealthResponse = client().admin()
.cluster()
.prepareHealth()
.setWaitForEvents(Priority.LANGUID)
.setWaitForNodes("3")
.execute()
.actionGet();
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
assertEquals(clusterHealthResponse.isTimedOut(), false);

// Mock transport service to add behaviour of throwing corruption exception during segment replication process.
MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance(
TransportService.class,
old_primary
oldPrimary
));
mockTransportService.addSendBehavior(
internalCluster().getInstance(TransportService.class, new_primary),
internalCluster().getInstance(TransportService.class, newPrimary),
(connection, requestId, action, request, options) -> {
if (action.equals(SegmentReplicationTargetService.Actions.FILE_CHUNK)) {
throw new OpenSearchCorruptionException("expected");
Expand All @@ -168,7 +169,7 @@ public void testPrimaryRelocationWithSegRepFailure() throws Exception {
client().admin()
.cluster()
.prepareReroute()
.add(new MoveAllocationCommand(INDEX_NAME, 0, old_primary, new_primary))
.add(new MoveAllocationCommand(INDEX_NAME, 0, oldPrimary, newPrimary))
.execute()
.actionGet();
clusterHealthResponse = client().admin()
Expand All @@ -179,7 +180,7 @@ public void testPrimaryRelocationWithSegRepFailure() throws Exception {
.setTimeout(ACCEPTABLE_RELOCATION_TIME)
.execute()
.actionGet();
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
assertEquals(clusterHealthResponse.isTimedOut(), false);

final int finalDocCount = initialDocCount;
ingestDocs(finalDocCount);
Expand All @@ -188,7 +189,7 @@ public void testPrimaryRelocationWithSegRepFailure() throws Exception {
logger.info("Verify older primary is still refreshing replica nodes");
client().admin().indices().prepareRefresh().execute().actionGet();
assertHitCount(
client(old_primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(),
client(oldPrimary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(),
initialDocCount + finalDocCount
);
assertHitCount(
Expand All @@ -197,8 +198,12 @@ public void testPrimaryRelocationWithSegRepFailure() throws Exception {
);
}

/**
* This test verifies primary recovery behavior with continuous ingestion
*
*/
public void testRelocateWhileContinuouslyIndexingAndWaitingForRefresh() throws Exception {
final String primary = internalCluster().startNode();
final String primary = internalCluster().startNode(featureFlagSettings());
prepareCreate(
INDEX_NAME,
Settings.builder()
Expand Down Expand Up @@ -227,15 +232,15 @@ public void testRelocateWhileContinuouslyIndexingAndWaitingForRefresh() throws E
);
}

final String replica = internalCluster().startNode();
final String replica = internalCluster().startNode(featureFlagSettings());
ClusterHealthResponse clusterHealthResponse = client().admin()
.cluster()
.prepareHealth()
.setWaitForEvents(Priority.LANGUID)
.setWaitForNodes("2")
.execute()
.actionGet();
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
assertEquals(clusterHealthResponse.isTimedOut(), false);

logger.info("--> relocate the shard from primary to replica");
ActionFuture<ClusterRerouteResponse> relocationListener = client().admin()
Expand All @@ -261,13 +266,13 @@ public void testRelocateWhileContinuouslyIndexingAndWaitingForRefresh() throws E
.setTimeout(ACCEPTABLE_RELOCATION_TIME)
.execute()
.actionGet();
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
assertEquals(clusterHealthResponse.isTimedOut(), false);

logger.info("--> verifying count");
assertBusy(() -> {
client().admin().indices().prepareRefresh().execute().actionGet();
assertTrue(pendingIndexResponses.stream().allMatch(ActionFuture::isDone));
}, 1, TimeUnit.MINUTES);
assertThat(client().prepareSearch(INDEX_NAME).setSize(0).execute().actionGet().getHits().getTotalHits().value, equalTo(120L));
assertEquals(client().prepareSearch(INDEX_NAME).setSize(0).execute().actionGet().getHits().getTotalHits().value, 120L);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,8 @@ public void recoverReplica(
targetSupplier,
markAsRecovering,
inSyncIds,
routingTable
routingTable,
(a, b) -> null
);
OpenSearchIndexLevelReplicationTestCase.this.startReplicaAfterRecovery(replica, primary, inSyncIds, routingTable);
computeReplicationTargets();
Expand Down

0 comments on commit 68d16e2

Please sign in to comment.