Skip to content

Commit

Permalink
[backport 1.1] Replication auto pauses on follower cluster having wai…
Browse files Browse the repository at this point in the history
…t_for_active_shards true (#894)

* Merge pull request #624 from priyatsh/main (#888)

Github-Issue-544:Replication auto pauses on follower cluster having w…
(cherry picked from commit 2fba1f7)

Signed-off-by: Monu Singh <[email protected]>
Co-authored-by: Priyanka Sharma <[email protected]>

* Fix merge conflits

Signed-off-by: Monu Singh <[email protected]>

---------

Signed-off-by: Monu Singh <[email protected]>
Co-authored-by: Priyanka Sharma <[email protected]>
  • Loading branch information
monusingh-1 and priyatsh authored May 29, 2023
1 parent 7e46fc4 commit 4534314
Show file tree
Hide file tree
Showing 2 changed files with 189 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,8 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript
IndexMetadata.INDEX_BLOCKS_READ_ONLY_ALLOW_DELETE_SETTING,
EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE_SETTING,
EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE_SETTING,
IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_PERIOD_SETTING
IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_PERIOD_SETTING,
IndexMetadata.SETTING_WAIT_FOR_ACTIVE_SHARDS
)

val blockListedSettings :Set<String> = blSettings.stream().map { k -> k.key }.collect(Collectors.toSet())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,28 +12,13 @@
package org.opensearch.replication.integ.rest


import kotlinx.coroutines.delay
import org.opensearch.replication.IndexUtil
import org.opensearch.replication.MultiClusterAnnotations
import org.opensearch.replication.MultiClusterRestTestCase
import org.opensearch.replication.StartReplicationRequest
import org.opensearch.replication.`validate not paused status response`
import org.opensearch.replication.`validate paused status on closed index`
import org.opensearch.replication.pauseReplication
import org.opensearch.replication.replicationStatus
import org.opensearch.replication.resumeReplication
import org.opensearch.replication.`validate paused status response due to leader index deleted`
import org.opensearch.replication.`validate status syncing response`
import org.opensearch.replication.startReplication
import org.opensearch.replication.stopReplication
import org.opensearch.replication.updateReplication
import org.apache.http.HttpStatus
import org.apache.http.entity.ContentType
import org.apache.http.nio.entity.NStringEntity
import org.apache.http.util.EntityUtils
import org.assertj.core.api.Assertions
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.Assert
import org.opensearch.OpenSearchStatusException
import org.opensearch.action.admin.cluster.repositories.put.PutRepositoryRequest
import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest
Expand All @@ -51,27 +36,20 @@ import org.opensearch.client.RequestOptions
import org.opensearch.client.ResponseException
import org.opensearch.client.RestHighLevelClient
import org.opensearch.client.core.CountRequest
import org.opensearch.client.indices.CloseIndexRequest
import org.opensearch.client.indices.CreateIndexRequest
import org.opensearch.client.indices.GetIndexRequest
import org.opensearch.client.indices.GetMappingsRequest
import org.opensearch.client.indices.PutMappingRequest
import org.opensearch.client.indices.*
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.cluster.metadata.MetadataCreateIndexService
import org.opensearch.common.io.PathUtils
import org.opensearch.common.settings.Settings
import org.opensearch.common.unit.TimeValue
import org.opensearch.common.xcontent.DeprecationHandler
import org.opensearch.common.xcontent.NamedXContentRegistry
import org.opensearch.common.xcontent.XContentType
import org.opensearch.index.IndexSettings
import org.opensearch.index.mapper.MapperService
import org.opensearch.replication.*
import org.opensearch.repositories.fs.FsRepository
import org.opensearch.test.OpenSearchTestCase.assertBusy
import org.junit.Assert
import org.opensearch.replication.followerStats
import org.opensearch.replication.leaderStats
import org.opensearch.replication.task.index.IndexReplicationExecutor.Companion.log
import java.lang.Thread.sleep
import org.opensearch.replication.updateReplicationStartBlockSetting
import java.nio.file.Files
import java.util.*
import java.util.concurrent.TimeUnit
Expand Down Expand Up @@ -1052,6 +1030,188 @@ class StartReplicationIT: MultiClusterRestTestCase() {
)
}

fun `test that wait_for_active_shards setting is set on leader and not on follower`() {
val followerClient = getClientForCluster(FOLLOWER)
val leaderClient = getClientForCluster(LEADER)

createConnectionBetweenClusters(FOLLOWER, LEADER)

val settings = Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexMetadata.SETTING_WAIT_FOR_ACTIVE_SHARDS.getKey(), Integer.toString(2))
.build()

val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName).settings(settings), RequestOptions.DEFAULT)
assertThat(createIndexResponse.isAcknowledged).isTrue()
try {
followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName))
assertBusy {
assertThat(followerClient.indices()
.exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT))
.isEqualTo(true)
}
TimeUnit.SECONDS.sleep(SLEEP_TIME_BETWEEN_SYNC)

// Verify the setting on leader
val getLeaderSettingsRequest = GetSettingsRequest()
getLeaderSettingsRequest.indices(leaderIndexName)
getLeaderSettingsRequest.includeDefaults(true)

assertBusy ({
Assert.assertEquals(
"2",
leaderClient.indices()
.getSettings(getLeaderSettingsRequest, RequestOptions.DEFAULT)
.indexToSettings[leaderIndexName][IndexMetadata.SETTING_WAIT_FOR_ACTIVE_SHARDS.getKey()]
)
}, 15, TimeUnit.SECONDS)

// Verify that the setting is not updated on follower and follower has default value of the setting
val getSettingsRequest = GetSettingsRequest()
getSettingsRequest.indices(followerIndexName)
getSettingsRequest.includeDefaults(true)

assertBusy ({
Assert.assertEquals(
"1",
followerClient.indices()
.getSettings(getSettingsRequest, RequestOptions.DEFAULT)
.getSetting(followerIndexName, IndexMetadata.SETTING_WAIT_FOR_ACTIVE_SHARDS.key)
)
}, 15, TimeUnit.SECONDS)
} finally {
followerClient.stopReplication(followerIndexName)
}
}

fun `test that wait_for_active_shards setting is updated on leader and not on follower`() {
val followerClient = getClientForCluster(FOLLOWER)
val leaderClient = getClientForCluster(LEADER)

createConnectionBetweenClusters(FOLLOWER, LEADER)

val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT)
assertThat(createIndexResponse.isAcknowledged).isTrue()
try {
followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName))
assertBusy {
assertThat(followerClient.indices()
.exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT))
.isEqualTo(true)
}
TimeUnit.SECONDS.sleep(SLEEP_TIME_BETWEEN_SYNC)

//Use Update API
val settingsBuilder = Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexMetadata.SETTING_WAIT_FOR_ACTIVE_SHARDS.getKey(), Integer.toString(2))

val settingsUpdateResponse = leaderClient.indices().putSettings(UpdateSettingsRequest(leaderIndexName)
.settings(settingsBuilder.build()), RequestOptions.DEFAULT)
Assert.assertEquals(settingsUpdateResponse.isAcknowledged, true)

TimeUnit.SECONDS.sleep(SLEEP_TIME_BETWEEN_SYNC)

// Verify the setting on leader
val getLeaderSettingsRequest = GetSettingsRequest()
getLeaderSettingsRequest.indices(leaderIndexName)
getLeaderSettingsRequest.includeDefaults(true)

assertBusy ({
Assert.assertEquals(
"2",
leaderClient.indices()
.getSettings(getLeaderSettingsRequest, RequestOptions.DEFAULT)
.indexToSettings[leaderIndexName][IndexMetadata.SETTING_WAIT_FOR_ACTIVE_SHARDS.getKey()]
)
}, 15, TimeUnit.SECONDS)


val getSettingsRequest = GetSettingsRequest()
getSettingsRequest.indices(followerIndexName)
getSettingsRequest.includeDefaults(true)

assertBusy ({
Assert.assertEquals(
"1",
followerClient.indices()
.getSettings(getSettingsRequest, RequestOptions.DEFAULT)
.getSetting(followerIndexName, IndexMetadata.SETTING_WAIT_FOR_ACTIVE_SHARDS.key)
)
}, 15, TimeUnit.SECONDS)
} finally {
followerClient.stopReplication(followerIndexName)
}
}

fun `test that wait_for_active_shards setting is updated on follower through start replication api`() {
val followerClient = getClientForCluster(FOLLOWER)
val leaderClient = getClientForCluster(LEADER)

createConnectionBetweenClusters(FOLLOWER, LEADER)

val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT)
assertThat(createIndexResponse.isAcknowledged).isTrue()

val settings = Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexMetadata.SETTING_WAIT_FOR_ACTIVE_SHARDS.getKey(), Integer.toString(2))
.build()
try {
followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName, settings = settings))
assertBusy {
assertThat(followerClient.indices()
.exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT))
.isEqualTo(true)
}
TimeUnit.SECONDS.sleep(SLEEP_TIME_BETWEEN_SYNC)

val getSettingsRequest = GetSettingsRequest()
getSettingsRequest.indices(followerIndexName)
getSettingsRequest.includeDefaults(true)
assertBusy ({
Assert.assertEquals(
"2",
followerClient.indices()
.getSettings(getSettingsRequest, RequestOptions.DEFAULT)
.indexToSettings[followerIndexName][IndexMetadata.SETTING_WAIT_FOR_ACTIVE_SHARDS.getKey()]
)
}, 15, TimeUnit.SECONDS)
} finally {
followerClient.stopReplication(followerIndexName)
}
}

private fun excludeAllClusterNodes(clusterName: String) {
val transientSettingsRequest = Request("PUT", "_cluster/settings")
// Get IPs directly from the cluster to handle all cases - single node cluster, multi node cluster and remote test cluster.
val excludeIps = getClusterNodeIPs(clusterName)
val entityAsString = """
{
"transient": {
"cluster.routing.allocation.exclude._ip": "${excludeIps.joinToString()}"
}
}""".trimMargin()
transientSettingsRequest.entity = NStringEntity(entityAsString, ContentType.APPLICATION_JSON)
val transientSettingsResponse = getNamedCluster(clusterName).lowLevelClient.performRequest(transientSettingsRequest)
assertEquals(HttpStatus.SC_OK.toLong(), transientSettingsResponse.statusLine.statusCode.toLong())
}

private fun getClusterNodeIPs(clusterName: String): List<String> {
val clusterClient = getNamedCluster(clusterName).lowLevelClient
val nodesRequest = Request("GET", "_cat/nodes?format=json")
val nodesResponse = EntityUtils.toString(clusterClient.performRequest(nodesRequest).entity)
val nodeIPs = arrayListOf<String>()
val parser = XContentType.JSON.xContent().createParser(
NamedXContentRegistry.EMPTY,
DeprecationHandler.THROW_UNSUPPORTED_OPERATION, nodesResponse)
parser.list().forEach {
it as Map<*, *>
nodeIPs.add(it["ip"] as String)
}
return nodeIPs
}

private fun assertValidationFailure(client: RestHighLevelClient, leader: String, follower: String, errrorMsg: String) {
assertThatThrownBy {
client.startReplication(StartReplicationRequest("source", leader, follower))
Expand Down

0 comments on commit 4534314

Please sign in to comment.