Skip to content

Commit

Permalink
Merge pull request #624 from priyatsh/main
Browse files Browse the repository at this point in the history
Github-Issue-544:Replication auto pauses on follower cluster having w…
  • Loading branch information
priyatsh authored Nov 17, 2022
2 parents f3fee90 + 2384c38 commit 2fba1f7
Show file tree
Hide file tree
Showing 2 changed files with 154 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,8 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript
IndexMetadata.INDEX_BLOCKS_READ_ONLY_ALLOW_DELETE_SETTING,
EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE_SETTING,
EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE_SETTING,
IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_PERIOD_SETTING
IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_PERIOD_SETTING,
IndexMetadata.SETTING_WAIT_FOR_ACTIVE_SHARDS
)

val blockListedSettings :Set<String> = blSettings.stream().map { k -> k.key }.collect(Collectors.toSet())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1242,6 +1242,158 @@ class StartReplicationIT: MultiClusterRestTestCase() {

}

fun `test that wait_for_active_shards setting is set on leader and not on follower`() {
val followerClient = getClientForCluster(FOLLOWER)
val leaderClient = getClientForCluster(LEADER)

createConnectionBetweenClusters(FOLLOWER, LEADER)

val settings = Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexMetadata.SETTING_WAIT_FOR_ACTIVE_SHARDS.getKey(), Integer.toString(2))
.build()

val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName).settings(settings), RequestOptions.DEFAULT)
assertThat(createIndexResponse.isAcknowledged).isTrue()
try {
followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName))
assertBusy {
assertThat(followerClient.indices()
.exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT))
.isEqualTo(true)
}
TimeUnit.SECONDS.sleep(SLEEP_TIME_BETWEEN_SYNC)

// Verify the setting on leader
val getLeaderSettingsRequest = GetSettingsRequest()
getLeaderSettingsRequest.indices(leaderIndexName)
getLeaderSettingsRequest.includeDefaults(true)

assertBusy ({
Assert.assertEquals(
"2",
leaderClient.indices()
.getSettings(getLeaderSettingsRequest, RequestOptions.DEFAULT)
.indexToSettings[leaderIndexName][IndexMetadata.SETTING_WAIT_FOR_ACTIVE_SHARDS.getKey()]
)
}, 15, TimeUnit.SECONDS)

// Verify that the setting is not updated on follower and follower has default value of the setting
val getSettingsRequest = GetSettingsRequest()
getSettingsRequest.indices(followerIndexName)
getSettingsRequest.includeDefaults(true)

assertBusy ({
Assert.assertEquals(
"1",
followerClient.indices()
.getSettings(getSettingsRequest, RequestOptions.DEFAULT)
.getSetting(followerIndexName, IndexMetadata.SETTING_WAIT_FOR_ACTIVE_SHARDS.key)
)
}, 15, TimeUnit.SECONDS)
} finally {
followerClient.stopReplication(followerIndexName)
}
}

fun `test that wait_for_active_shards setting is updated on leader and not on follower`() {
val followerClient = getClientForCluster(FOLLOWER)
val leaderClient = getClientForCluster(LEADER)

createConnectionBetweenClusters(FOLLOWER, LEADER)

val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT)
assertThat(createIndexResponse.isAcknowledged).isTrue()
try {
followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName))
assertBusy {
assertThat(followerClient.indices()
.exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT))
.isEqualTo(true)
}
TimeUnit.SECONDS.sleep(SLEEP_TIME_BETWEEN_SYNC)

//Use Update API
val settingsBuilder = Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexMetadata.SETTING_WAIT_FOR_ACTIVE_SHARDS.getKey(), Integer.toString(2))

val settingsUpdateResponse = leaderClient.indices().putSettings(UpdateSettingsRequest(leaderIndexName)
.settings(settingsBuilder.build()), RequestOptions.DEFAULT)
Assert.assertEquals(settingsUpdateResponse.isAcknowledged, true)

TimeUnit.SECONDS.sleep(SLEEP_TIME_BETWEEN_SYNC)

// Verify the setting on leader
val getLeaderSettingsRequest = GetSettingsRequest()
getLeaderSettingsRequest.indices(leaderIndexName)
getLeaderSettingsRequest.includeDefaults(true)

assertBusy ({
Assert.assertEquals(
"2",
leaderClient.indices()
.getSettings(getLeaderSettingsRequest, RequestOptions.DEFAULT)
.indexToSettings[leaderIndexName][IndexMetadata.SETTING_WAIT_FOR_ACTIVE_SHARDS.getKey()]
)
}, 15, TimeUnit.SECONDS)


val getSettingsRequest = GetSettingsRequest()
getSettingsRequest.indices(followerIndexName)
getSettingsRequest.includeDefaults(true)

assertBusy ({
Assert.assertEquals(
"1",
followerClient.indices()
.getSettings(getSettingsRequest, RequestOptions.DEFAULT)
.getSetting(followerIndexName, IndexMetadata.SETTING_WAIT_FOR_ACTIVE_SHARDS.key)
)
}, 15, TimeUnit.SECONDS)
} finally {
followerClient.stopReplication(followerIndexName)
}
}

fun `test that wait_for_active_shards setting is updated on follower through start replication api`() {
val followerClient = getClientForCluster(FOLLOWER)
val leaderClient = getClientForCluster(LEADER)

createConnectionBetweenClusters(FOLLOWER, LEADER)

val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT)
assertThat(createIndexResponse.isAcknowledged).isTrue()

val settings = Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexMetadata.SETTING_WAIT_FOR_ACTIVE_SHARDS.getKey(), Integer.toString(2))
.build()
try {
followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName, settings = settings))
assertBusy {
assertThat(followerClient.indices()
.exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT))
.isEqualTo(true)
}
TimeUnit.SECONDS.sleep(SLEEP_TIME_BETWEEN_SYNC)

val getSettingsRequest = GetSettingsRequest()
getSettingsRequest.indices(followerIndexName)
getSettingsRequest.includeDefaults(true)
assertBusy ({
Assert.assertEquals(
"2",
followerClient.indices()
.getSettings(getSettingsRequest, RequestOptions.DEFAULT)
.indexToSettings[followerIndexName][IndexMetadata.SETTING_WAIT_FOR_ACTIVE_SHARDS.getKey()]
)
}, 15, TimeUnit.SECONDS)
} finally {
followerClient.stopReplication(followerIndexName)
}
}

private fun excludeAllClusterNodes(clusterName: String) {
val transientSettingsRequest = Request("PUT", "_cluster/settings")
// Get IPs directly from the cluster to handle all cases - single node cluster, multi node cluster and remote test cluster.
Expand Down

0 comments on commit 2fba1f7

Please sign in to comment.