diff --git a/src/main/kotlin/org/opensearch/replication/task/index/IndexReplicationTask.kt b/src/main/kotlin/org/opensearch/replication/task/index/IndexReplicationTask.kt index 906312ac..395119c3 100644 --- a/src/main/kotlin/org/opensearch/replication/task/index/IndexReplicationTask.kt +++ b/src/main/kotlin/org/opensearch/replication/task/index/IndexReplicationTask.kt @@ -143,7 +143,8 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript IndexMetadata.INDEX_BLOCKS_READ_ONLY_ALLOW_DELETE_SETTING, EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE_SETTING, EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE_SETTING, - IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_PERIOD_SETTING + IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_PERIOD_SETTING, + IndexMetadata.SETTING_WAIT_FOR_ACTIVE_SHARDS ) val blockListedSettings :Set = blSettings.stream().map { k -> k.key }.collect(Collectors.toSet()) diff --git a/src/test/kotlin/org/opensearch/replication/integ/rest/StartReplicationIT.kt b/src/test/kotlin/org/opensearch/replication/integ/rest/StartReplicationIT.kt index cc6bce2c..73b28a15 100644 --- a/src/test/kotlin/org/opensearch/replication/integ/rest/StartReplicationIT.kt +++ b/src/test/kotlin/org/opensearch/replication/integ/rest/StartReplicationIT.kt @@ -1242,6 +1242,158 @@ class StartReplicationIT: MultiClusterRestTestCase() { } + fun `test that wait_for_active_shards setting is set on leader and not on follower`() { + val followerClient = getClientForCluster(FOLLOWER) + val leaderClient = getClientForCluster(LEADER) + + createConnectionBetweenClusters(FOLLOWER, LEADER) + + val settings = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexMetadata.SETTING_WAIT_FOR_ACTIVE_SHARDS.getKey(), Integer.toString(2)) + .build() + + val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName).settings(settings), RequestOptions.DEFAULT) + assertThat(createIndexResponse.isAcknowledged).isTrue() + try { + followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName)) + assertBusy { + assertThat(followerClient.indices() + .exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT)) + .isEqualTo(true) + } + TimeUnit.SECONDS.sleep(SLEEP_TIME_BETWEEN_SYNC) + + // Verify the setting on leader + val getLeaderSettingsRequest = GetSettingsRequest() + getLeaderSettingsRequest.indices(leaderIndexName) + getLeaderSettingsRequest.includeDefaults(true) + + assertBusy ({ + Assert.assertEquals( + "2", + leaderClient.indices() + .getSettings(getLeaderSettingsRequest, RequestOptions.DEFAULT) + .indexToSettings[leaderIndexName][IndexMetadata.SETTING_WAIT_FOR_ACTIVE_SHARDS.getKey()] + ) + }, 15, TimeUnit.SECONDS) + + // Verify that the setting is not updated on follower and follower has default value of the setting + val getSettingsRequest = GetSettingsRequest() + getSettingsRequest.indices(followerIndexName) + getSettingsRequest.includeDefaults(true) + + assertBusy ({ + Assert.assertEquals( + "1", + followerClient.indices() + .getSettings(getSettingsRequest, RequestOptions.DEFAULT) + .getSetting(followerIndexName, IndexMetadata.SETTING_WAIT_FOR_ACTIVE_SHARDS.key) + ) + }, 15, TimeUnit.SECONDS) + } finally { + followerClient.stopReplication(followerIndexName) + } + } + + fun `test that wait_for_active_shards setting is updated on leader and not on follower`() { + val followerClient = getClientForCluster(FOLLOWER) + val leaderClient = getClientForCluster(LEADER) + + createConnectionBetweenClusters(FOLLOWER, LEADER) + + val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) + assertThat(createIndexResponse.isAcknowledged).isTrue() + try { + followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName)) + assertBusy { + assertThat(followerClient.indices() + .exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT)) + .isEqualTo(true) + } + TimeUnit.SECONDS.sleep(SLEEP_TIME_BETWEEN_SYNC) + + //Use Update API + val settingsBuilder = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexMetadata.SETTING_WAIT_FOR_ACTIVE_SHARDS.getKey(), Integer.toString(2)) + + val settingsUpdateResponse = leaderClient.indices().putSettings(UpdateSettingsRequest(leaderIndexName) + .settings(settingsBuilder.build()), RequestOptions.DEFAULT) + Assert.assertEquals(settingsUpdateResponse.isAcknowledged, true) + + TimeUnit.SECONDS.sleep(SLEEP_TIME_BETWEEN_SYNC) + + // Verify the setting on leader + val getLeaderSettingsRequest = GetSettingsRequest() + getLeaderSettingsRequest.indices(leaderIndexName) + getLeaderSettingsRequest.includeDefaults(true) + + assertBusy ({ + Assert.assertEquals( + "2", + leaderClient.indices() + .getSettings(getLeaderSettingsRequest, RequestOptions.DEFAULT) + .indexToSettings[leaderIndexName][IndexMetadata.SETTING_WAIT_FOR_ACTIVE_SHARDS.getKey()] + ) + }, 15, TimeUnit.SECONDS) + + + val getSettingsRequest = GetSettingsRequest() + getSettingsRequest.indices(followerIndexName) + getSettingsRequest.includeDefaults(true) + + assertBusy ({ + Assert.assertEquals( + "1", + followerClient.indices() + .getSettings(getSettingsRequest, RequestOptions.DEFAULT) + .getSetting(followerIndexName, IndexMetadata.SETTING_WAIT_FOR_ACTIVE_SHARDS.key) + ) + }, 15, TimeUnit.SECONDS) + } finally { + followerClient.stopReplication(followerIndexName) + } + } + + fun `test that wait_for_active_shards setting is updated on follower through start replication api`() { + val followerClient = getClientForCluster(FOLLOWER) + val leaderClient = getClientForCluster(LEADER) + + createConnectionBetweenClusters(FOLLOWER, LEADER) + + val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) + assertThat(createIndexResponse.isAcknowledged).isTrue() + + val settings = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexMetadata.SETTING_WAIT_FOR_ACTIVE_SHARDS.getKey(), Integer.toString(2)) + .build() + try { + followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName, settings = settings)) + assertBusy { + assertThat(followerClient.indices() + .exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT)) + .isEqualTo(true) + } + TimeUnit.SECONDS.sleep(SLEEP_TIME_BETWEEN_SYNC) + + val getSettingsRequest = GetSettingsRequest() + getSettingsRequest.indices(followerIndexName) + getSettingsRequest.includeDefaults(true) + assertBusy ({ + Assert.assertEquals( + "2", + followerClient.indices() + .getSettings(getSettingsRequest, RequestOptions.DEFAULT) + .indexToSettings[followerIndexName][IndexMetadata.SETTING_WAIT_FOR_ACTIVE_SHARDS.getKey()] + ) + }, 15, TimeUnit.SECONDS) + } finally { + followerClient.stopReplication(followerIndexName) + } + } + private fun excludeAllClusterNodes(clusterName: String) { val transientSettingsRequest = Request("PUT", "_cluster/settings") // Get IPs directly from the cluster to handle all cases - single node cluster, multi node cluster and remote test cluster.