From 45343149c44a3c35f3403aade200cd735f92b105 Mon Sep 17 00:00:00 2001 From: Monu Singh Date: Mon, 29 May 2023 16:26:02 +0530 Subject: [PATCH] [backport 1.1] Replication auto pauses on follower cluster having wait_for_active_shards true (#894) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Merge pull request #624 from priyatsh/main (#888) Github-Issue-544:Replication auto pauses on follower cluster having w… (cherry picked from commit 2fba1f7dfa395580f3aef864d7047f56454f684f) Signed-off-by: Monu Singh Co-authored-by: Priyanka Sharma <114481135+priyatsh@users.noreply.github.com> * Fix merge conflits Signed-off-by: Monu Singh --------- Signed-off-by: Monu Singh Co-authored-by: Priyanka Sharma <114481135+priyatsh@users.noreply.github.com> --- .../task/index/IndexReplicationTask.kt | 3 +- .../integ/rest/StartReplicationIT.kt | 214 +++++++++++++++--- 2 files changed, 189 insertions(+), 28 deletions(-) diff --git a/src/main/kotlin/org/opensearch/replication/task/index/IndexReplicationTask.kt b/src/main/kotlin/org/opensearch/replication/task/index/IndexReplicationTask.kt index 0513a43b..c20144e2 100644 --- a/src/main/kotlin/org/opensearch/replication/task/index/IndexReplicationTask.kt +++ b/src/main/kotlin/org/opensearch/replication/task/index/IndexReplicationTask.kt @@ -148,7 +148,8 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript IndexMetadata.INDEX_BLOCKS_READ_ONLY_ALLOW_DELETE_SETTING, EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE_SETTING, EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE_SETTING, - IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_PERIOD_SETTING + IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_PERIOD_SETTING, + IndexMetadata.SETTING_WAIT_FOR_ACTIVE_SHARDS ) val blockListedSettings :Set = blSettings.stream().map { k -> k.key }.collect(Collectors.toSet()) diff --git a/src/test/kotlin/org/opensearch/replication/integ/rest/StartReplicationIT.kt b/src/test/kotlin/org/opensearch/replication/integ/rest/StartReplicationIT.kt index 0ec921b0..5264d245 100644 --- a/src/test/kotlin/org/opensearch/replication/integ/rest/StartReplicationIT.kt +++ b/src/test/kotlin/org/opensearch/replication/integ/rest/StartReplicationIT.kt @@ -12,28 +12,13 @@ package org.opensearch.replication.integ.rest -import kotlinx.coroutines.delay -import org.opensearch.replication.IndexUtil -import org.opensearch.replication.MultiClusterAnnotations -import org.opensearch.replication.MultiClusterRestTestCase -import org.opensearch.replication.StartReplicationRequest -import org.opensearch.replication.`validate not paused status response` -import org.opensearch.replication.`validate paused status on closed index` -import org.opensearch.replication.pauseReplication -import org.opensearch.replication.replicationStatus -import org.opensearch.replication.resumeReplication -import org.opensearch.replication.`validate paused status response due to leader index deleted` -import org.opensearch.replication.`validate status syncing response` -import org.opensearch.replication.startReplication -import org.opensearch.replication.stopReplication -import org.opensearch.replication.updateReplication import org.apache.http.HttpStatus import org.apache.http.entity.ContentType import org.apache.http.nio.entity.NStringEntity import org.apache.http.util.EntityUtils -import org.assertj.core.api.Assertions import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThatThrownBy +import org.junit.Assert import org.opensearch.OpenSearchStatusException import org.opensearch.action.admin.cluster.repositories.put.PutRepositoryRequest import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest @@ -51,27 +36,20 @@ import org.opensearch.client.RequestOptions import org.opensearch.client.ResponseException import org.opensearch.client.RestHighLevelClient import org.opensearch.client.core.CountRequest -import org.opensearch.client.indices.CloseIndexRequest -import org.opensearch.client.indices.CreateIndexRequest -import org.opensearch.client.indices.GetIndexRequest -import org.opensearch.client.indices.GetMappingsRequest -import org.opensearch.client.indices.PutMappingRequest +import org.opensearch.client.indices.* import org.opensearch.cluster.metadata.IndexMetadata import org.opensearch.cluster.metadata.MetadataCreateIndexService import org.opensearch.common.io.PathUtils import org.opensearch.common.settings.Settings import org.opensearch.common.unit.TimeValue +import org.opensearch.common.xcontent.DeprecationHandler +import org.opensearch.common.xcontent.NamedXContentRegistry import org.opensearch.common.xcontent.XContentType import org.opensearch.index.IndexSettings import org.opensearch.index.mapper.MapperService +import org.opensearch.replication.* import org.opensearch.repositories.fs.FsRepository import org.opensearch.test.OpenSearchTestCase.assertBusy -import org.junit.Assert -import org.opensearch.replication.followerStats -import org.opensearch.replication.leaderStats -import org.opensearch.replication.task.index.IndexReplicationExecutor.Companion.log -import java.lang.Thread.sleep -import org.opensearch.replication.updateReplicationStartBlockSetting import java.nio.file.Files import java.util.* import java.util.concurrent.TimeUnit @@ -1052,6 +1030,188 @@ class StartReplicationIT: MultiClusterRestTestCase() { ) } + fun `test that wait_for_active_shards setting is set on leader and not on follower`() { + val followerClient = getClientForCluster(FOLLOWER) + val leaderClient = getClientForCluster(LEADER) + + createConnectionBetweenClusters(FOLLOWER, LEADER) + + val settings = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexMetadata.SETTING_WAIT_FOR_ACTIVE_SHARDS.getKey(), Integer.toString(2)) + .build() + + val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName).settings(settings), RequestOptions.DEFAULT) + assertThat(createIndexResponse.isAcknowledged).isTrue() + try { + followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName)) + assertBusy { + assertThat(followerClient.indices() + .exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT)) + .isEqualTo(true) + } + TimeUnit.SECONDS.sleep(SLEEP_TIME_BETWEEN_SYNC) + + // Verify the setting on leader + val getLeaderSettingsRequest = GetSettingsRequest() + getLeaderSettingsRequest.indices(leaderIndexName) + getLeaderSettingsRequest.includeDefaults(true) + + assertBusy ({ + Assert.assertEquals( + "2", + leaderClient.indices() + .getSettings(getLeaderSettingsRequest, RequestOptions.DEFAULT) + .indexToSettings[leaderIndexName][IndexMetadata.SETTING_WAIT_FOR_ACTIVE_SHARDS.getKey()] + ) + }, 15, TimeUnit.SECONDS) + + // Verify that the setting is not updated on follower and follower has default value of the setting + val getSettingsRequest = GetSettingsRequest() + getSettingsRequest.indices(followerIndexName) + getSettingsRequest.includeDefaults(true) + + assertBusy ({ + Assert.assertEquals( + "1", + followerClient.indices() + .getSettings(getSettingsRequest, RequestOptions.DEFAULT) + .getSetting(followerIndexName, IndexMetadata.SETTING_WAIT_FOR_ACTIVE_SHARDS.key) + ) + }, 15, TimeUnit.SECONDS) + } finally { + followerClient.stopReplication(followerIndexName) + } + } + + fun `test that wait_for_active_shards setting is updated on leader and not on follower`() { + val followerClient = getClientForCluster(FOLLOWER) + val leaderClient = getClientForCluster(LEADER) + + createConnectionBetweenClusters(FOLLOWER, LEADER) + + val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) + assertThat(createIndexResponse.isAcknowledged).isTrue() + try { + followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName)) + assertBusy { + assertThat(followerClient.indices() + .exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT)) + .isEqualTo(true) + } + TimeUnit.SECONDS.sleep(SLEEP_TIME_BETWEEN_SYNC) + + //Use Update API + val settingsBuilder = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexMetadata.SETTING_WAIT_FOR_ACTIVE_SHARDS.getKey(), Integer.toString(2)) + + val settingsUpdateResponse = leaderClient.indices().putSettings(UpdateSettingsRequest(leaderIndexName) + .settings(settingsBuilder.build()), RequestOptions.DEFAULT) + Assert.assertEquals(settingsUpdateResponse.isAcknowledged, true) + + TimeUnit.SECONDS.sleep(SLEEP_TIME_BETWEEN_SYNC) + + // Verify the setting on leader + val getLeaderSettingsRequest = GetSettingsRequest() + getLeaderSettingsRequest.indices(leaderIndexName) + getLeaderSettingsRequest.includeDefaults(true) + + assertBusy ({ + Assert.assertEquals( + "2", + leaderClient.indices() + .getSettings(getLeaderSettingsRequest, RequestOptions.DEFAULT) + .indexToSettings[leaderIndexName][IndexMetadata.SETTING_WAIT_FOR_ACTIVE_SHARDS.getKey()] + ) + }, 15, TimeUnit.SECONDS) + + + val getSettingsRequest = GetSettingsRequest() + getSettingsRequest.indices(followerIndexName) + getSettingsRequest.includeDefaults(true) + + assertBusy ({ + Assert.assertEquals( + "1", + followerClient.indices() + .getSettings(getSettingsRequest, RequestOptions.DEFAULT) + .getSetting(followerIndexName, IndexMetadata.SETTING_WAIT_FOR_ACTIVE_SHARDS.key) + ) + }, 15, TimeUnit.SECONDS) + } finally { + followerClient.stopReplication(followerIndexName) + } + } + + fun `test that wait_for_active_shards setting is updated on follower through start replication api`() { + val followerClient = getClientForCluster(FOLLOWER) + val leaderClient = getClientForCluster(LEADER) + + createConnectionBetweenClusters(FOLLOWER, LEADER) + + val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) + assertThat(createIndexResponse.isAcknowledged).isTrue() + + val settings = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexMetadata.SETTING_WAIT_FOR_ACTIVE_SHARDS.getKey(), Integer.toString(2)) + .build() + try { + followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName, settings = settings)) + assertBusy { + assertThat(followerClient.indices() + .exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT)) + .isEqualTo(true) + } + TimeUnit.SECONDS.sleep(SLEEP_TIME_BETWEEN_SYNC) + + val getSettingsRequest = GetSettingsRequest() + getSettingsRequest.indices(followerIndexName) + getSettingsRequest.includeDefaults(true) + assertBusy ({ + Assert.assertEquals( + "2", + followerClient.indices() + .getSettings(getSettingsRequest, RequestOptions.DEFAULT) + .indexToSettings[followerIndexName][IndexMetadata.SETTING_WAIT_FOR_ACTIVE_SHARDS.getKey()] + ) + }, 15, TimeUnit.SECONDS) + } finally { + followerClient.stopReplication(followerIndexName) + } + } + + private fun excludeAllClusterNodes(clusterName: String) { + val transientSettingsRequest = Request("PUT", "_cluster/settings") + // Get IPs directly from the cluster to handle all cases - single node cluster, multi node cluster and remote test cluster. + val excludeIps = getClusterNodeIPs(clusterName) + val entityAsString = """ + { + "transient": { + "cluster.routing.allocation.exclude._ip": "${excludeIps.joinToString()}" + } + }""".trimMargin() + transientSettingsRequest.entity = NStringEntity(entityAsString, ContentType.APPLICATION_JSON) + val transientSettingsResponse = getNamedCluster(clusterName).lowLevelClient.performRequest(transientSettingsRequest) + assertEquals(HttpStatus.SC_OK.toLong(), transientSettingsResponse.statusLine.statusCode.toLong()) + } + + private fun getClusterNodeIPs(clusterName: String): List { + val clusterClient = getNamedCluster(clusterName).lowLevelClient + val nodesRequest = Request("GET", "_cat/nodes?format=json") + val nodesResponse = EntityUtils.toString(clusterClient.performRequest(nodesRequest).entity) + val nodeIPs = arrayListOf() + val parser = XContentType.JSON.xContent().createParser( + NamedXContentRegistry.EMPTY, + DeprecationHandler.THROW_UNSUPPORTED_OPERATION, nodesResponse) + parser.list().forEach { + it as Map<*, *> + nodeIPs.add(it["ip"] as String) + } + return nodeIPs + } + private fun assertValidationFailure(client: RestHighLevelClient, leader: String, follower: String, errrorMsg: String) { assertThatThrownBy { client.startReplication(StartReplicationRequest("source", leader, follower))