From 249553fe88ebf3514568374e23e2dd0ee0b6621e Mon Sep 17 00:00:00 2001 From: sricharanvuppu <113983630+sricharanvuppu@users.noreply.github.com> Date: Tue, 31 Jan 2023 11:09:08 +0530 Subject: [PATCH] [Backport #619 to 1.1] stopReplication API removed from integ test cases (#694) * stopReplication API removed from integ test cases and stopAllReplicationJobs API added in MultiClusterRestTestCase (#619) Signed-off-by: sricharanvuppu --- .../replication/BasicReplicationIT.kt | 136 ++- .../replication/MultiClusterRestTestCase.kt | 28 +- .../integ/rest/ClusterRerouteFollowerIT.kt | 69 +- .../integ/rest/ClusterRerouteLeaderIT.kt | 67 +- .../integ/rest/PauseReplicationIT.kt | 140 ++- .../integ/rest/ReplicationStatusIT.kt | 14 +- .../integ/rest/ResumeReplicationIT.kt | 207 ++-- .../replication/integ/rest/SecurityBase.kt | 1 - .../integ/rest/SecurityCustomRolesIT.kt | 240 ++--- .../integ/rest/SecurityCustomRolesLeaderIT.kt | 15 - .../integ/rest/SecurityDlsFlsIT.kt | 121 +-- .../integ/rest/StartReplicationIT.kt | 902 +++++++----------- .../integ/rest/StopReplicationIT.kt | 107 +-- .../integ/rest/UpdateAutoFollowPatternIT.kt | 86 +- .../replication/task/TaskCancellationIT.kt | 48 +- .../shard/TransportReplayChangesActionIT.kt | 96 +- 16 files changed, 895 insertions(+), 1382 deletions(-) diff --git a/src/test/kotlin/org/opensearch/replication/BasicReplicationIT.kt b/src/test/kotlin/org/opensearch/replication/BasicReplicationIT.kt index 7d426ace..ff3bfa83 100644 --- a/src/test/kotlin/org/opensearch/replication/BasicReplicationIT.kt +++ b/src/test/kotlin/org/opensearch/replication/BasicReplicationIT.kt @@ -44,120 +44,96 @@ class BasicReplicationIT : MultiClusterRestTestCase() { val follower = getClientForCluster(FOLL) val leader = getClientForCluster(LEADER) createConnectionBetweenClusters(FOLL, LEADER) - val leaderIndex = randomAlphaOfLength(10).toLowerCase(Locale.ROOT) val followerIndex = randomAlphaOfLength(10).toLowerCase(Locale.ROOT) // Create an empty index on the leader and trigger replication on it val createIndexResponse = leader.indices().create(CreateIndexRequest(leaderIndex), RequestOptions.DEFAULT) assertThat(createIndexResponse.isAcknowledged).isTrue() - try { - follower.startReplication(StartReplicationRequest("source", leaderIndex, followerIndex), waitForRestore=true) - - val source = mapOf("name" to randomAlphaOfLength(20), "age" to randomInt().toString()) - var response = leader.index(IndexRequest(leaderIndex).id("1").source(source), RequestOptions.DEFAULT) + follower.startReplication(StartReplicationRequest("source", leaderIndex, followerIndex), waitForRestore=true) + val source = mapOf("name" to randomAlphaOfLength(20), "age" to randomInt().toString()) + var response = leader.index(IndexRequest(leaderIndex).id("1").source(source), RequestOptions.DEFAULT) + assertThat(response.result).isEqualTo(Result.CREATED) + assertBusy({ + val getResponse = follower.get(GetRequest(followerIndex, "1"), RequestOptions.DEFAULT) + assertThat(getResponse.isExists).isTrue() + assertThat(getResponse.sourceAsMap).isEqualTo(source) + }, 60L, TimeUnit.SECONDS) + // Ensure force merge on leader doesn't impact replication + for (i in 2..5) { + response = leader.index(IndexRequest(leaderIndex).id("$i").source(source), RequestOptions.DEFAULT) assertThat(response.result).isEqualTo(Result.CREATED) - - assertBusy({ - val getResponse = follower.get(GetRequest(followerIndex, "1"), RequestOptions.DEFAULT) + } + leader.indices().forcemerge(ForceMergeRequest(leaderIndex), RequestOptions.DEFAULT) + for (i in 6..10) { + response = leader.index(IndexRequest(leaderIndex).id("$i").source(source), RequestOptions.DEFAULT) + assertThat(response.result).isEqualTo(Result.CREATED) + } + assertBusy({ + for (i in 2..10) { + val getResponse = follower.get(GetRequest(followerIndex, "$i"), RequestOptions.DEFAULT) assertThat(getResponse.isExists).isTrue() assertThat(getResponse.sourceAsMap).isEqualTo(source) - }, 60L, TimeUnit.SECONDS) - - // Ensure force merge on leader doesn't impact replication - for (i in 2..5) { - response = leader.index(IndexRequest(leaderIndex).id("$i").source(source), RequestOptions.DEFAULT) - assertThat(response.result).isEqualTo(Result.CREATED) - } - leader.indices().forcemerge(ForceMergeRequest(leaderIndex), RequestOptions.DEFAULT) - for (i in 6..10) { - response = leader.index(IndexRequest(leaderIndex).id("$i").source(source), RequestOptions.DEFAULT) - assertThat(response.result).isEqualTo(Result.CREATED) } - assertBusy({ - for (i in 2..10) { - val getResponse = follower.get(GetRequest(followerIndex, "$i"), RequestOptions.DEFAULT) - assertThat(getResponse.isExists).isTrue() - assertThat(getResponse.sourceAsMap).isEqualTo(source) - } - }, 60L, TimeUnit.SECONDS) - - // Force merge on follower however isn't allowed due to WRITE block - Assertions.assertThatThrownBy { - follower.indices().forcemerge(ForceMergeRequest(followerIndex), RequestOptions.DEFAULT) - }.isInstanceOf(OpenSearchStatusException::class.java) - .hasMessage("OpenSearch exception [type=cluster_block_exception, reason=index [$followerIndex] " + - "blocked by: [FORBIDDEN/1000/index read-only(cross-cluster-replication)];]") - - } finally { - follower.stopReplication(followerIndex) - } + }, 60L, TimeUnit.SECONDS) + // Force merge on follower however isn't allowed due to WRITE block + Assertions.assertThatThrownBy { + follower.indices().forcemerge(ForceMergeRequest(followerIndex), RequestOptions.DEFAULT) + }.isInstanceOf(OpenSearchStatusException::class.java) + .hasMessage("OpenSearch exception [type=cluster_block_exception, reason=index [$followerIndex] " + + "blocked by: [FORBIDDEN/1000/index read-only(cross-cluster-replication)];]") } fun `test existing index replication`() { val follower = getClientForCluster(FOLL) val leader = getClientForCluster(LEADER) createConnectionBetweenClusters(FOLL, LEADER) - // Create an index with data before commencing replication val leaderIndex = randomAlphaOfLength(10).toLowerCase(Locale.ROOT) val followerIndex = randomAlphaOfLength(10).toLowerCase(Locale.ROOT) val source = mapOf("name" to randomAlphaOfLength(20), "age" to randomInt().toString()) val response = leader.index(IndexRequest(leaderIndex).id("1").source(source), RequestOptions.DEFAULT) assertThat(response.result).withFailMessage("Failed to create leader data").isEqualTo(Result.CREATED) - follower.startReplication(StartReplicationRequest("source", leaderIndex, followerIndex), waitForRestore=true) - assertBusy { val getResponse = follower.get(GetRequest(followerIndex, "1"), RequestOptions.DEFAULT) assertThat(getResponse.isExists).isTrue() assertThat(getResponse.sourceAsMap).isEqualTo(source) } - follower.stopReplication(followerIndex) } fun `test that index operations are replayed to follower during replication`() { val followerClient = getClientForCluster(FOLL) val leaderClient = getClientForCluster(LEADER) createConnectionBetweenClusters(FOLL, LEADER) - + val leaderIndexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT) + val followerIndexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT) val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) assertThat(createIndexResponse.isAcknowledged).isTrue() - - try { - followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), waitForRestore=true) - - // Create document - var source = mapOf("name" to randomAlphaOfLength(20), "age" to randomInt().toString()) - var response = leaderClient.index(IndexRequest(leaderIndexName).id("1").source(source), RequestOptions.DEFAULT) - assertThat(response.result).withFailMessage("Failed to create leader data").isEqualTo(Result.CREATED) - - assertBusy({ - val getResponse = followerClient.get(GetRequest(followerIndexName, "1"), RequestOptions.DEFAULT) - assertThat(getResponse.isExists).isTrue() - assertThat(getResponse.sourceAsMap).isEqualTo(source) - }, 60L, TimeUnit.SECONDS) - - // Update document - source = mapOf("name" to randomAlphaOfLength(20), "age" to randomInt().toString()) - response = leaderClient.index(IndexRequest(leaderIndexName).id("1").source(source), RequestOptions.DEFAULT) - assertThat(response.result).withFailMessage("Failed to update leader data").isEqualTo(Result.UPDATED) - - assertBusy({ - val getResponse = followerClient.get(GetRequest(followerIndexName, "1"), RequestOptions.DEFAULT) - assertThat(getResponse.isExists).isTrue() - assertThat(getResponse.sourceAsMap).isEqualTo(source) - },60L, TimeUnit.SECONDS) - - // Delete document - val deleteResponse = leaderClient.delete(DeleteRequest(leaderIndexName).id("1"), RequestOptions.DEFAULT) - assertThat(deleteResponse.result).withFailMessage("Failed to delete leader data").isEqualTo(Result.DELETED) - - assertBusy({ - val getResponse = followerClient.get(GetRequest(followerIndexName, "1"), RequestOptions.DEFAULT) - assertThat(getResponse.isExists).isFalse() - }, 60L, TimeUnit.SECONDS) - } finally { - followerClient.stopReplication(followerIndexName) - } + followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), waitForRestore=true) + // Create document + var source = mapOf("name" to randomAlphaOfLength(20), "age" to randomInt().toString()) + var response = leaderClient.index(IndexRequest(leaderIndexName).id("1").source(source), RequestOptions.DEFAULT) + assertThat(response.result).withFailMessage("Failed to create leader data").isEqualTo(Result.CREATED) + assertBusy({ + val getResponse = followerClient.get(GetRequest(followerIndexName, "1"), RequestOptions.DEFAULT) + assertThat(getResponse.isExists).isTrue() + assertThat(getResponse.sourceAsMap).isEqualTo(source) + }, 60L, TimeUnit.SECONDS) + // Update document + source = mapOf("name" to randomAlphaOfLength(20), "age" to randomInt().toString()) + response = leaderClient.index(IndexRequest(leaderIndexName).id("1").source(source), RequestOptions.DEFAULT) + assertThat(response.result).withFailMessage("Failed to update leader data").isEqualTo(Result.UPDATED) + assertBusy({ + val getResponse = followerClient.get(GetRequest(followerIndexName, "1"), RequestOptions.DEFAULT) + assertThat(getResponse.isExists).isTrue() + assertThat(getResponse.sourceAsMap).isEqualTo(source) + },60L, TimeUnit.SECONDS) + // Delete document + val deleteResponse = leaderClient.delete(DeleteRequest(leaderIndexName).id("1"), RequestOptions.DEFAULT) + assertThat(deleteResponse.result).withFailMessage("Failed to delete leader data").isEqualTo(Result.DELETED) + assertBusy({ + val getResponse = followerClient.get(GetRequest(followerIndexName, "1"), RequestOptions.DEFAULT) + assertThat(getResponse.isExists).isFalse() + }, 60L, TimeUnit.SECONDS) } } diff --git a/src/test/kotlin/org/opensearch/replication/MultiClusterRestTestCase.kt b/src/test/kotlin/org/opensearch/replication/MultiClusterRestTestCase.kt index 999b704e..ea433695 100644 --- a/src/test/kotlin/org/opensearch/replication/MultiClusterRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/replication/MultiClusterRestTestCase.kt @@ -11,6 +11,7 @@ package org.opensearch.replication +import com.nhaarman.mockitokotlin2.stub import org.opensearch.replication.MultiClusterAnnotations.ClusterConfiguration import org.opensearch.replication.MultiClusterAnnotations.ClusterConfigurations import org.opensearch.replication.MultiClusterAnnotations.getAnnotationsFromClass @@ -56,6 +57,7 @@ import org.junit.After import org.junit.AfterClass import org.junit.Before import org.junit.BeforeClass +import org.opensearch.index.mapper.ObjectMapper import java.nio.file.Files import java.security.KeyManagementException import java.security.KeyStore @@ -408,8 +410,32 @@ abstract class MultiClusterRestTestCase : OpenSearchTestCase() { testCluster.lowLevelClient.performRequest(request) } } - + private fun stopAllReplicationJobs(testCluster: TestCluster) { + val indicesResponse = testCluster.lowLevelClient.performRequest((Request("GET","/_cat/indices/*,-.*?format=json&pretty"))) + val indicesResponseEntity = EntityUtils.toString(indicesResponse.entity) + var parser = XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, indicesResponseEntity) + parser.list().forEach{ item-> + val str = item.toString() + val map = str.subSequence(1,str.length-1).split(",").associate { + val (key, value) = it.trim().split("=") + key to value + } + val ind = map.get("index") + try { + val stopRequest = Request("POST","/_plugins/_replication/" + ind.toString() + "/_stop") + stopRequest.setJsonEntity("{}") + stopRequest.setOptions(RequestOptions.DEFAULT) + val response=testCluster.lowLevelClient.performRequest(stopRequest) + } + catch (e:ResponseException){ + if(e.response.statusLine.statusCode!=400) { + throw e + } + } + } + } protected fun wipeIndicesFromCluster(testCluster: TestCluster) { + stopAllReplicationJobs(testCluster) try { val deleteRequest = Request("DELETE", "*,-.*") // All except system indices val response = testCluster.lowLevelClient.performRequest(deleteRequest) diff --git a/src/test/kotlin/org/opensearch/replication/integ/rest/ClusterRerouteFollowerIT.kt b/src/test/kotlin/org/opensearch/replication/integ/rest/ClusterRerouteFollowerIT.kt index 4461f026..0266b6cc 100644 --- a/src/test/kotlin/org/opensearch/replication/integ/rest/ClusterRerouteFollowerIT.kt +++ b/src/test/kotlin/org/opensearch/replication/integ/rest/ClusterRerouteFollowerIT.kt @@ -28,44 +28,35 @@ class ClusterRerouteFollowerIT : MultiClusterRestTestCase() { fun `test replication works after rerouting a shard from one node to another in follower cluster`() { val followerClient = getClientForCluster(FOLLOWER) val leaderClient = getClientForCluster(LEADER) - try { - changeTemplate(LEADER) - createConnectionBetweenClusters(FOLLOWER, LEADER) - val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) - Assertions.assertThat(createIndexResponse.isAcknowledged).isTrue() - followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName)) - insertDocToIndex(LEADER, "1", "dummy data 1",leaderIndexName) - - //Querying ES cluster throws random exceptions like MasterNotDiscovered or ShardsFailed etc, so catching them and retrying - assertBusy ({ - try { - Assertions.assertThat(docs(FOLLOWER, followerIndexName)).contains("dummy data 1") - } catch (ex: Exception) { - Assert.fail("Exception while querying follower cluster. Failing to retry again") - } - }, 1, TimeUnit.MINUTES) - - val nodes = getNodesInCluster(FOLLOWER) - - val primaryNode = getPrimaryNodeForShard(FOLLOWER,followerIndexName, "0") - val unassignedNode = nodes.filter{!it.equals(primaryNode)}.stream().findFirst().get() - rerouteShard(FOLLOWER, "0", followerIndexName, primaryNode, unassignedNode) - - assertBusy ({ - Assertions.assertThat(getPrimaryNodeForShard(FOLLOWER,followerIndexName, "0")).isEqualTo(unassignedNode) - }, 1, TimeUnit.MINUTES) - logger.info("rereouted shard is " + getPrimaryNodeForShard(FOLLOWER,followerIndexName, "0")) - insertDocToIndex(LEADER, "2", "dummy data 2",leaderIndexName) - - assertBusy ({ - try { - Assertions.assertThat(docs(FOLLOWER, followerIndexName)).contains("dummy data 2") - } catch (ex: Exception) { - Assert.fail("Exception while querying follower cluster. Failing to retry again") - } - }, 1, TimeUnit.MINUTES) - } finally { - followerClient.stopReplication(followerIndexName) - } + changeTemplate(LEADER) + createConnectionBetweenClusters(FOLLOWER, LEADER) + val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) + Assertions.assertThat(createIndexResponse.isAcknowledged).isTrue() + followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName)) + insertDocToIndex(LEADER, "1", "dummy data 1",leaderIndexName) + //Querying ES cluster throws random exceptions like ClusterManagerNotDiscovered or ShardsFailed etc, so catching them and retrying + assertBusy ({ + try { + Assertions.assertThat(docs(FOLLOWER, followerIndexName)).contains("dummy data 1") + } catch (ex: Exception) { + Assert.fail("Exception while querying follower cluster. Failing to retry again") + } + }, 1, TimeUnit.MINUTES) + val nodes = getNodesInCluster(FOLLOWER) + val primaryNode = getPrimaryNodeForShard(FOLLOWER,followerIndexName, "0") + val unassignedNode = nodes.filter{!it.equals(primaryNode)}.stream().findFirst().get() + rerouteShard(FOLLOWER, "0", followerIndexName, primaryNode, unassignedNode) + assertBusy ({ + Assertions.assertThat(getPrimaryNodeForShard(FOLLOWER,followerIndexName, "0")).isEqualTo(unassignedNode) + }, 1, TimeUnit.MINUTES) + logger.info("rereouted shard is " + getPrimaryNodeForShard(FOLLOWER,followerIndexName, "0")) + insertDocToIndex(LEADER, "2", "dummy data 2",leaderIndexName) + assertBusy ({ + try { + Assertions.assertThat(docs(FOLLOWER, followerIndexName)).contains("dummy data 2") + } catch (ex: Exception) { + Assert.fail("Exception while querying follower cluster. Failing to retry again") + } + }, 1, TimeUnit.MINUTES) } } diff --git a/src/test/kotlin/org/opensearch/replication/integ/rest/ClusterRerouteLeaderIT.kt b/src/test/kotlin/org/opensearch/replication/integ/rest/ClusterRerouteLeaderIT.kt index f0f7a560..a03d45cf 100644 --- a/src/test/kotlin/org/opensearch/replication/integ/rest/ClusterRerouteLeaderIT.kt +++ b/src/test/kotlin/org/opensearch/replication/integ/rest/ClusterRerouteLeaderIT.kt @@ -29,43 +29,34 @@ class ClusterRerouteLeaderIT : MultiClusterRestTestCase() { fun `test replication works after rerouting a shard from one node to another in leader cluster`() { val followerClient = getClientForCluster(FOLLOWER) val leaderClient = getClientForCluster(LEADER) - try { - changeTemplate(LEADER) - createConnectionBetweenClusters(FOLLOWER, LEADER) - val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) - Assertions.assertThat(createIndexResponse.isAcknowledged).isTrue() - followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName)) - insertDocToIndex(LEADER, "1", "dummy data 1",leaderIndexName) - - //Querying ES cluster throws random exceptions like MasterNotDiscovered or ShardsFailed etc, so catching them and retrying - assertBusy ({ - try { - Assertions.assertThat(docs(FOLLOWER, followerIndexName)).contains("dummy data 1") - } catch (ex: Exception) { - Assert.fail("Exception while querying follower cluster. Failing to retry again") - } - }, 1, TimeUnit.MINUTES) - - val nodes = getNodesInCluster(LEADER) - val primaryNode = getPrimaryNodeForShard(LEADER,leaderIndexName, "0") - val unassignedNode = nodes.filter{!it.equals(primaryNode)}.stream().findFirst().get() - rerouteShard(LEADER, "0", leaderIndexName, primaryNode, unassignedNode) - - assertBusy ({ - Assertions.assertThat(getPrimaryNodeForShard(LEADER,leaderIndexName, "0")).isEqualTo(unassignedNode) - }, 1, TimeUnit.MINUTES) - - insertDocToIndex(LEADER, "2", "dummy data 2",leaderIndexName) - - assertBusy ({ - try { - Assertions.assertThat(docs(FOLLOWER, followerIndexName)).contains("dummy data 2") - } catch (ex: Exception) { - Assert.fail("Exception while querying follower cluster. Failing to retry again") - } - }, 1, TimeUnit.MINUTES) - } finally { - followerClient.stopReplication(followerIndexName) - } + changeTemplate(LEADER) + createConnectionBetweenClusters(FOLLOWER, LEADER) + val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) + Assertions.assertThat(createIndexResponse.isAcknowledged).isTrue() + followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName)) + insertDocToIndex(LEADER, "1", "dummy data 1",leaderIndexName) + //Querying ES cluster throws random exceptions like ClusterManagerNotDiscovered or ShardsFailed etc, so catching them and retrying + assertBusy ({ + try { + Assertions.assertThat(docs(FOLLOWER, followerIndexName)).contains("dummy data 1") + } catch (ex: Exception) { + Assert.fail("Exception while querying follower cluster. Failing to retry again") + } + }, 1, TimeUnit.MINUTES) + val nodes = getNodesInCluster(LEADER) + val primaryNode = getPrimaryNodeForShard(LEADER,leaderIndexName, "0") + val unassignedNode = nodes.filter{!it.equals(primaryNode)}.stream().findFirst().get() + rerouteShard(LEADER, "0", leaderIndexName, primaryNode, unassignedNode) + assertBusy ({ + Assertions.assertThat(getPrimaryNodeForShard(LEADER,leaderIndexName, "0")).isEqualTo(unassignedNode) + }, 1, TimeUnit.MINUTES) + insertDocToIndex(LEADER, "2", "dummy data 2",leaderIndexName) + assertBusy ({ + try { + Assertions.assertThat(docs(FOLLOWER, followerIndexName)).contains("dummy data 2") + } catch (ex: Exception) { + Assert.fail("Exception while querying follower cluster. Failing to retry again") + } + }, 1, TimeUnit.MINUTES) } } diff --git a/src/test/kotlin/org/opensearch/replication/integ/rest/PauseReplicationIT.kt b/src/test/kotlin/org/opensearch/replication/integ/rest/PauseReplicationIT.kt index 6d013746..7c976236 100644 --- a/src/test/kotlin/org/opensearch/replication/integ/rest/PauseReplicationIT.kt +++ b/src/test/kotlin/org/opensearch/replication/integ/rest/PauseReplicationIT.kt @@ -53,38 +53,28 @@ class PauseReplicationIT: MultiClusterRestTestCase() { val leaderClient = getClientForCluster(LEADER) val followerIndexName = "pause_index_follow_state" createConnectionBetweenClusters(FOLLOWER, LEADER) - val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) assertThat(createIndexResponse.isAcknowledged).isTrue() - try { - followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), waitForRestore = true) - - val myReason = "I want to pause!" - - /* At this point, the follower cluster should be in FOLLOWING state. Next, we pause replication - and verify the same - */ - followerClient.pauseReplication(followerIndexName, myReason) - // Since, we were still in FOLLOWING phase when pause was called, the index - // in follower index should not have been deleted in follower cluster - assertBusy { - assertThat(followerClient.indices() - .exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT)) - .isEqualTo(true) - } - - val statusResp = followerClient.replicationStatus(followerIndexName) - `validate paused status response`(statusResp, myReason) - - var settings = Settings.builder() - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) - .build() - - followerClient.updateReplication( followerIndexName, settings) - followerClient.resumeReplication(followerIndexName) - } finally { - followerClient.stopReplication(followerIndexName) + followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), waitForRestore = true) + val myReason = "I want to pause!" + /* At this point, the follower cluster should be in FOLLOWING state. Next, we pause replication + and verify the same + */ + followerClient.pauseReplication(followerIndexName, myReason) + // Since, we were still in FOLLOWING phase when pause was called, the index + // in follower index should not have been deleted in follower cluster + assertBusy { + assertThat(followerClient.indices() + .exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT)) + .isEqualTo(true) } + val statusResp = followerClient.replicationStatus(followerIndexName) + `validate paused status response`(statusResp, myReason) + var settings = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .build() + followerClient.updateReplication( followerIndexName, settings) + followerClient.resumeReplication(followerIndexName) } fun `test pause replication in restoring state with multiple shards`() { @@ -110,7 +100,6 @@ class PauseReplicationIT: MultiClusterRestTestCase() { val leaderClient = getClientForCluster(LEADER) val followerIndexName = "pause_index_restore_state" createConnectionBetweenClusters(FOLLOWER, LEADER) - val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName).settings(settings), RequestOptions.DEFAULT) assertThat(createIndexResponse.isAcknowledged).isTrue() @@ -120,24 +109,20 @@ class PauseReplicationIT: MultiClusterRestTestCase() { assertThat(leaderClient.indices() .exists(GetIndexRequest(leaderIndexName), RequestOptions.DEFAULT)) } - try { - followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), - TimeValue.timeValueSeconds(10), - false) - //Given the size of index, the replication should be in RESTORING phase at this point - assertThatThrownBy { - followerClient.pauseReplication(followerIndexName) - }.isInstanceOf(ResponseException::class.java) - .hasMessageContaining("Index is in restore phase currently for index: ${followerIndexName}") - // wait for the shard tasks to be up as the replication block is added before adding shard replication tasks - // During intermittent test failures, stop replication under finally block executes before this without removing - // replication block (even though next call to _stop replication API can succeed in removing this block). - assertBusy({ - assertTrue(followerClient.getShardReplicationTasks(followerIndexName).isNotEmpty()) - }, 30L, TimeUnit.SECONDS) - } finally { - followerClient.stopReplication(followerIndexName) - } + followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), + TimeValue.timeValueSeconds(10), + false) + //Given the size of index, the replication should be in RESTORING phase at this point + assertThatThrownBy { + followerClient.pauseReplication(followerIndexName) + }.isInstanceOf(ResponseException::class.java) + .hasMessageContaining("Index is in restore phase currently for index: ${followerIndexName}") + // wait for the shard tasks to be up as the replication block is added before adding shard replication tasks + // During intermittent test failures, stop replication under finally block executes before this without removing + // replication block (even though next call to _stop replication API can succeed in removing this block). + assertBusy({ + assertTrue(followerClient.getShardReplicationTasks(followerIndexName).isNotEmpty()) + }, 30L, TimeUnit.SECONDS) } fun `test pause without replication in progress`() { @@ -163,12 +148,10 @@ class PauseReplicationIT: MultiClusterRestTestCase() { val leaderClient = getClientForCluster(LEADER) val followerIndexName = "pause_index_with_stop" createConnectionBetweenClusters(FOLLOWER, LEADER) - val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) assertThat(createIndexResponse.isAcknowledged).isTrue() try { followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), waitForRestore = true) - /* At this point, the follower cluster should be in FOLLOWING state. Next, we pause replication and verify the same */ @@ -192,37 +175,30 @@ class PauseReplicationIT: MultiClusterRestTestCase() { fun `test pause replication when leader cluster is unavailable`() { val followerClient = getClientForCluster(FOLLOWER) val followerIndexName = "pause_index_leader_down" - try { - val leaderClient = getClientForCluster(LEADER) - createConnectionBetweenClusters(FOLLOWER, LEADER) - val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) - assertThat(createIndexResponse.isAcknowledged).isTrue() - followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), - waitForRestore = true) - // Need to wait till index blocks appear into state - assertBusy({ - val clusterBlocksResponse = followerClient.lowLevelClient.performRequest(Request("GET", "/_cluster/state/blocks")) - val clusterResponseString = EntityUtils.toString(clusterBlocksResponse.entity) - assertThat(clusterResponseString.contains("cross-cluster-replication")) - .withFailMessage("Cant find replication block after starting replication") - .isTrue() - }, 10, TimeUnit.SECONDS) - - // setting an invalid seed so that leader cluster is unavailable - val settings: Settings = Settings.builder() - .putList("cluster.remote.source.seeds", "127.0.0.1:9305") - .build() - val updateSettingsRequest = ClusterUpdateSettingsRequest() - updateSettingsRequest.persistentSettings(settings) - followerClient.cluster().putSettings(updateSettingsRequest, RequestOptions.DEFAULT) - - followerClient.pauseReplication(followerIndexName) - - val statusResp = followerClient.replicationStatus(followerIndexName) - `validate paused status response`(statusResp) - - } finally { - followerClient.stopReplication(followerIndexName) - } + val leaderClient = getClientForCluster(LEADER) + createConnectionBetweenClusters(FOLLOWER, LEADER) + val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) + assertThat(createIndexResponse.isAcknowledged).isTrue() + followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), + waitForRestore = true) + // Need to wait till index blocks appear into state + assertBusy({ + val clusterBlocksResponse = followerClient.lowLevelClient.performRequest(Request("GET", "/_cluster/state/blocks")) + val clusterResponseString = EntityUtils.toString(clusterBlocksResponse.entity) + assertThat(clusterResponseString.contains("cross-cluster-replication")) + .withFailMessage("Cant find replication block after starting replication") + .isTrue() + }, 10, TimeUnit.SECONDS) + + // setting an invalid seed so that leader cluster is unavailable + val settings: Settings = Settings.builder() + .putList("cluster.remote.source.seeds", "127.0.0.1:9305") + .build() + val updateSettingsRequest = ClusterUpdateSettingsRequest() + updateSettingsRequest.persistentSettings(settings) + followerClient.cluster().putSettings(updateSettingsRequest, RequestOptions.DEFAULT) + followerClient.pauseReplication(followerIndexName) + val statusResp = followerClient.replicationStatus(followerIndexName) + `validate paused status response`(statusResp) } } diff --git a/src/test/kotlin/org/opensearch/replication/integ/rest/ReplicationStatusIT.kt b/src/test/kotlin/org/opensearch/replication/integ/rest/ReplicationStatusIT.kt index 4b4e3762..46a4c2f7 100644 --- a/src/test/kotlin/org/opensearch/replication/integ/rest/ReplicationStatusIT.kt +++ b/src/test/kotlin/org/opensearch/replication/integ/rest/ReplicationStatusIT.kt @@ -38,15 +38,11 @@ class ReplicationStatusIT: MultiClusterRestTestCase() { createConnectionBetweenClusters(FOLLOWER, LEADER) val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(indexName), RequestOptions.DEFAULT) Assertions.assertThat(createIndexResponse.isAcknowledged).isTrue() - try { - followerClient.startReplication(StartReplicationRequest("source", indexName, indexName), waitForRestore = true) - assertBusy({ - var statusResp = followerClient.replicationStatus(indexName) - `validate status syncing response`(statusResp) - }, 30, TimeUnit.SECONDS) - } finally { - followerClient.stopReplication(indexName) - } + followerClient.startReplication(StartReplicationRequest("source", indexName, indexName), waitForRestore = true) + assertBusy({ + var statusResp = followerClient.replicationStatus(indexName) + `validate status syncing response`(statusResp) + }, 30, TimeUnit.SECONDS) } fun `test replication status without valid params`() { diff --git a/src/test/kotlin/org/opensearch/replication/integ/rest/ResumeReplicationIT.kt b/src/test/kotlin/org/opensearch/replication/integ/rest/ResumeReplicationIT.kt index 99ae2f79..dfc062aa 100644 --- a/src/test/kotlin/org/opensearch/replication/integ/rest/ResumeReplicationIT.kt +++ b/src/test/kotlin/org/opensearch/replication/integ/rest/ResumeReplicationIT.kt @@ -59,26 +59,19 @@ class ResumeReplicationIT: MultiClusterRestTestCase() { fun `test pause and resume replication in following state and empty index`() { val followerClient = getClientForCluster(FOLLOWER) val leaderClient = getClientForCluster(LEADER) - createConnectionBetweenClusters(FOLLOWER, LEADER) - val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) assertThat(createIndexResponse.isAcknowledged).isTrue() - try { - followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), waitForRestore = true) - - /* At this point, the follower cluster should be in FOLLOWING state. Next, we pause replication - and verify the same - */ - followerClient.pauseReplication(followerIndexName) - var statusResp = followerClient.replicationStatus(followerIndexName) - `validate paused status response`(statusResp) - statusResp = followerClient.replicationStatus(followerIndexName,false) - `validate aggregated paused status response`(statusResp) - followerClient.resumeReplication(followerIndexName) - } finally { - followerClient.stopReplication(followerIndexName) - } + followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), waitForRestore = true) + /* At this point, the follower cluster should be in FOLLOWING state. Next, we pause replication + and verify the same + */ + followerClient.pauseReplication(followerIndexName) + var statusResp = followerClient.replicationStatus(followerIndexName) + `validate paused status response`(statusResp) + statusResp = followerClient.replicationStatus(followerIndexName,false) + `validate aggregated paused status response`(statusResp) + followerClient.resumeReplication(followerIndexName) } @@ -86,127 +79,88 @@ class ResumeReplicationIT: MultiClusterRestTestCase() { val followerClient = getClientForCluster(FOLLOWER) val leaderClient = getClientForCluster(LEADER) createConnectionBetweenClusters(FOLLOWER, LEADER) - val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) assertThat(createIndexResponse.isAcknowledged).isTrue() - try { - followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), waitForRestore = true) - - assertThatThrownBy { - var statusResp = followerClient.replicationStatus(followerIndexName) - `validate status syncing response`(statusResp) - statusResp = followerClient.replicationStatus(followerIndexName,false) - `validate status syncing aggregated response`(statusResp) - followerClient.resumeReplication(followerIndexName) - statusResp = followerClient.replicationStatus(followerIndexName) - `validate not paused status response`(statusResp) - statusResp = followerClient.replicationStatus(followerIndexName,false) - `validate not paused status aggregated response`(statusResp) - }.isInstanceOf(ResponseException::class.java) - .hasMessageContaining("Replication on Index ${followerIndexName} is already running") - } finally { - followerClient.stopReplication(followerIndexName) - } + followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), waitForRestore = true) + assertThatThrownBy { + var statusResp = followerClient.replicationStatus(followerIndexName) + `validate status syncing response`(statusResp) + statusResp = followerClient.replicationStatus(followerIndexName,false) + `validate status syncing aggregated response`(statusResp) + followerClient.resumeReplication(followerIndexName) + statusResp = followerClient.replicationStatus(followerIndexName) + `validate not paused status response`(statusResp) + statusResp = followerClient.replicationStatus(followerIndexName,false) + `validate not paused status aggregated response`(statusResp) + }.isInstanceOf(ResponseException::class.java) + .hasMessageContaining("Replication on Index ${followerIndexName} is already running") } fun `test resume without retention lease`() { val followerClient = getClientForCluster(FOLLOWER) val leaderClient = getClientForCluster(LEADER) - createConnectionBetweenClusters(FOLLOWER, LEADER) - var createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) assertThat(createIndexResponse.isAcknowledged).isTrue() - try { - followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), waitForRestore = true) - - - followerClient.pauseReplication(followerIndexName) - - // If we delete the existing index and recreate the index with same name, retention leases should be lost - val deleteIndexResponse = leaderClient.indices().delete(DeleteIndexRequest(leaderIndexName), RequestOptions.DEFAULT) - assertThat(deleteIndexResponse.isAcknowledged).isTrue() - createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) - assertThat(createIndexResponse.isAcknowledged).isTrue() - - assertThatThrownBy { - followerClient.resumeReplication(followerIndexName) - }.isInstanceOf(ResponseException::class.java) - .hasMessageContaining("Retention lease doesn't exist. Replication can't be resumed for $followerIndexName") - } finally { - followerClient.stopReplication(followerIndexName) - } + followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), waitForRestore = true) + followerClient.pauseReplication(followerIndexName) + // If we delete the existing index and recreate the index with same name, retention leases should be lost + val deleteIndexResponse = leaderClient.indices().delete(DeleteIndexRequest(leaderIndexName), RequestOptions.DEFAULT) + assertThat(deleteIndexResponse.isAcknowledged).isTrue() + createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) + assertThat(createIndexResponse.isAcknowledged).isTrue() + assertThatThrownBy { + followerClient.resumeReplication(followerIndexName) + }.isInstanceOf(ResponseException::class.java) + .hasMessageContaining("Retention lease doesn't exist. Replication can't be resumed for $followerIndexName") } fun `test pause and resume replication amid leader index close and open`() { val followerClient = getClientForCluster(FOLLOWER) val leaderClient = getClientForCluster(LEADER) - createConnectionBetweenClusters(FOLLOWER, LEADER) - val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) assertThat(createIndexResponse.isAcknowledged).isTrue() - try { - followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), waitForRestore = true) - - /* At this point, the follower cluster should be in FOLLOWING state. Next, we pause replication - and verify the same - */ - followerClient.pauseReplication(followerIndexName) - - leaderClient.indices().close(CloseIndexRequest(leaderIndexName), RequestOptions.DEFAULT); - leaderClient.indices().open(OpenIndexRequest(leaderIndexName), RequestOptions.DEFAULT); - - followerClient.resumeReplication(followerIndexName) - - //Update mapping post resume assert - val sourceMap : MutableMap = HashMap() - sourceMap["x"] = "y" - val indexResponse = leaderClient.index(IndexRequest(leaderIndexName).id("2").source(sourceMap), RequestOptions.DEFAULT) - assertThat(indexResponse.result).isIn(DocWriteResponse.Result.CREATED, DocWriteResponse.Result.UPDATED) - assertBusy ({ - Assert.assertEquals( - leaderClient.indices().getMapping(GetMappingsRequest().indices(leaderIndexName), RequestOptions.DEFAULT) - .mappings()[leaderIndexName], - followerClient.indices().getMapping(GetMappingsRequest().indices(followerIndexName), RequestOptions.DEFAULT) - .mappings()[followerIndexName] - ) - }, 60L, TimeUnit.SECONDS) - - } finally { - followerClient.stopReplication(followerIndexName) - } + followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), waitForRestore = true) + + /* At this point, the follower cluster should be in FOLLOWING state. Next, we pause replication + and verify the same + */ + followerClient.pauseReplication(followerIndexName) + leaderClient.indices().close(CloseIndexRequest(leaderIndexName), RequestOptions.DEFAULT); + leaderClient.indices().open(OpenIndexRequest(leaderIndexName), RequestOptions.DEFAULT); + followerClient.resumeReplication(followerIndexName) + //Update mapping post resume assert + val sourceMap : MutableMap = HashMap() + sourceMap["x"] = "y" + val indexResponse = leaderClient.index(IndexRequest(leaderIndexName).id("2").source(sourceMap), RequestOptions.DEFAULT) + assertThat(indexResponse.result).isIn(DocWriteResponse.Result.CREATED, DocWriteResponse.Result.UPDATED) + assertBusy ({ + Assert.assertEquals( + leaderClient.indices().getMapping(GetMappingsRequest().indices(leaderIndexName), RequestOptions.DEFAULT) + .mappings()[leaderIndexName], + followerClient.indices().getMapping(GetMappingsRequest().indices(followerIndexName), RequestOptions.DEFAULT) + .mappings()[followerIndexName] + ) + }, 60, TimeUnit.SECONDS) } fun `test pause and resume replication amid index close`() { val followerClient = getClientForCluster(FOLLOWER) val leaderClient = getClientForCluster(LEADER) - createConnectionBetweenClusters(FOLLOWER, LEADER) - val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) assertThat(createIndexResponse.isAcknowledged).isTrue() - try { - followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), waitForRestore = true) - - /* At this point, the follower cluster should be in FOLLOWING state. Next, we pause replication - and verify the same - */ - followerClient.pauseReplication(followerIndexName) - - leaderClient.indices().close(CloseIndexRequest(leaderIndexName), RequestOptions.DEFAULT); - - assertThatThrownBy { - followerClient.resumeReplication(followerIndexName) - }.isInstanceOf(ResponseException::class.java) - .hasMessageContaining("closed") - } finally { - try { - followerClient.stopReplication(followerIndexName) - } catch (e: Exception) { - // DO nothing - } - } + followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), waitForRestore = true) + /* At this point, the follower cluster should be in FOLLOWING state. Next, we pause replication + and verify the same + */ + followerClient.pauseReplication(followerIndexName) + leaderClient.indices().close(CloseIndexRequest(leaderIndexName), RequestOptions.DEFAULT); + assertThatThrownBy { + followerClient.resumeReplication(followerIndexName) + }.isInstanceOf(ResponseException::class.java) + .hasMessageContaining("closed") } fun `test that replication fails to resume when custom analyser is not present in follower`() { @@ -217,24 +171,20 @@ class ResumeReplicationIT: MultiClusterRestTestCase() { val followerClient = getClientForCluster(FOLLOWER) try { Files.copy(synonyms, synonymPath) - val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) assertThat(createIndexResponse.isAcknowledged).isTrue() - createConnectionBetweenClusters(FOLLOWER, LEADER) followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), waitForRestore = true) followerClient.pauseReplication(followerIndexName) leaderClient.indices().close(CloseIndexRequest(leaderIndexName), RequestOptions.DEFAULT); val settings: Settings = Settings.builder().loadFromStream(synonymsJson, javaClass.getResourceAsStream(synonymsJson), false) .build() - try { leaderClient.indices().putSettings(UpdateSettingsRequest(leaderIndexName).settings(settings), RequestOptions.DEFAULT) } catch (e: Exception) { assumeNoException("Ignored test as analyzer setting could not be added", e) } leaderClient.indices().open(OpenIndexRequest(leaderIndexName), RequestOptions.DEFAULT); - assertThatThrownBy { followerClient.resumeReplication(followerIndexName) }.isInstanceOf(ResponseException::class.java).hasMessageContaining("resource_not_found_exception") @@ -242,11 +192,6 @@ class ResumeReplicationIT: MultiClusterRestTestCase() { if (Files.exists(synonymPath)) { Files.delete(synonymPath) } - try { - followerClient.stopReplication(followerIndexName) - } catch (e: Exception) { - // DO nothing - } } } @@ -261,26 +206,21 @@ class ResumeReplicationIT: MultiClusterRestTestCase() { val followerClient = getClientForCluster(FOLLOWER) try { Files.copy(synonyms, synonymPath) - val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) assertThat(createIndexResponse.isAcknowledged).isTrue() - createConnectionBetweenClusters(FOLLOWER, LEADER) - followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), waitForRestore = true) followerClient.pauseReplication(followerIndexName) leaderClient.indices().close(CloseIndexRequest(leaderIndexName), RequestOptions.DEFAULT); Files.copy(synonyms, followerSynonymPath) val settings: Settings = Settings.builder().loadFromStream(synonymsJson, javaClass.getResourceAsStream(synonymsJson), false) .build() - try { leaderClient.indices().putSettings(UpdateSettingsRequest(leaderIndexName).settings(settings), RequestOptions.DEFAULT) } catch (e: Exception) { assumeNoException("Ignored test as analyzer setting could not be added", e) } leaderClient.indices().open(OpenIndexRequest(leaderIndexName), RequestOptions.DEFAULT); - followerClient.resumeReplication(followerIndexName) var statusResp = followerClient.replicationStatus(followerIndexName) `validate status syncing response`(statusResp) @@ -291,11 +231,6 @@ class ResumeReplicationIT: MultiClusterRestTestCase() { if (Files.exists(followerSynonymPath)) { Files.delete(followerSynonymPath) } - try { - followerClient.stopReplication(followerIndexName) - } catch (e: Exception) { - // DO nothing - } } } @@ -312,7 +247,6 @@ class ResumeReplicationIT: MultiClusterRestTestCase() { try { Files.copy(synonyms, synonymPath) Files.copy(synonyms, followerSynonymPath) - var settings: Settings = Settings.builder().loadFromStream(synonymsJson, javaClass.getResourceAsStream(synonymsJson), false) .build() try { @@ -321,7 +255,6 @@ class ResumeReplicationIT: MultiClusterRestTestCase() { } catch (e: Exception) { assumeNoException("Ignored test as analyzer setting could not be added", e) } - createConnectionBetweenClusters(FOLLOWER, LEADER) val overriddenSettings: Settings = Settings.builder() .put("index.analysis.filter.my_filter.synonyms_path", followerSynonymFilename) @@ -329,7 +262,6 @@ class ResumeReplicationIT: MultiClusterRestTestCase() { followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName, overriddenSettings), waitForRestore = true) followerClient.pauseReplication(followerIndexName) leaderClient.indices().close(CloseIndexRequest(leaderIndexName), RequestOptions.DEFAULT); - Files.copy(synonyms, newSynonymPath) settings = Settings.builder() .put("index.analysis.filter.my_filter.synonyms_path", "synonyms_new.txt") @@ -340,7 +272,6 @@ class ResumeReplicationIT: MultiClusterRestTestCase() { assumeNoException("Ignored test as analyzer setting could not be added", e) } leaderClient.indices().open(OpenIndexRequest(leaderIndexName), RequestOptions.DEFAULT); - followerClient.resumeReplication(followerIndexName) var statusResp = followerClient.replicationStatus(followerIndexName) `validate status syncing response`(statusResp) @@ -354,12 +285,6 @@ class ResumeReplicationIT: MultiClusterRestTestCase() { if (Files.exists(newSynonymPath)) { Files.delete(newSynonymPath) } - try { - followerClient.stopReplication(followerIndexName) - } catch (e: Exception) { - // DO nothing - } } } - } diff --git a/src/test/kotlin/org/opensearch/replication/integ/rest/SecurityBase.kt b/src/test/kotlin/org/opensearch/replication/integ/rest/SecurityBase.kt index df37573c..fb4e5075 100644 --- a/src/test/kotlin/org/opensearch/replication/integ/rest/SecurityBase.kt +++ b/src/test/kotlin/org/opensearch/replication/integ/rest/SecurityBase.kt @@ -243,7 +243,6 @@ abstract class SecurityBase : MultiClusterRestTestCase() { private fun createRoleWithPermissions(indexPattern: String, role: String) { val followerClient = testClusters.get(FOLLOWER) val persistentConnectionRequest = Request("PUT", "_plugins/_security/api/roles/"+role) - val entityAsString = """ { "cluster_permissions": [ diff --git a/src/test/kotlin/org/opensearch/replication/integ/rest/SecurityCustomRolesIT.kt b/src/test/kotlin/org/opensearch/replication/integ/rest/SecurityCustomRolesIT.kt index fb69ada0..40ff44ee 100644 --- a/src/test/kotlin/org/opensearch/replication/integ/rest/SecurityCustomRolesIT.kt +++ b/src/test/kotlin/org/opensearch/replication/integ/rest/SecurityCustomRolesIT.kt @@ -56,17 +56,14 @@ class SecurityCustomRolesIT: SecurityBase() { val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) Assertions.assertThat(createIndexResponse.isAcknowledged).isTrue() - try { - var startReplicationRequest = StartReplicationRequest("source",leaderIndexName,followerIndexName, - useRoles = UseRoles(leaderClusterRole = "leaderRoleValidPerms",followerClusterRole = "followerRoleValidPerms")) - followerClient.startReplication(startReplicationRequest, - requestOptions= RequestOptions.DEFAULT.addBasicAuthHeader("testUser1","password")) - OpenSearchTestCase.assertBusy { - Assertions.assertThat(followerClient.indices().exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT)).isEqualTo(true) - } - } finally { - followerClient.stopReplication(followerIndexName) + var startReplicationRequest = StartReplicationRequest("source",leaderIndexName,followerIndexName, + useRoles = UseRoles(leaderClusterRole = "leaderRoleValidPerms",followerClusterRole = "followerRoleValidPerms")) + + followerClient.startReplication(startReplicationRequest, + requestOptions= RequestOptions.DEFAULT.addBasicAuthHeader("testUser1","password"), waitForRestore = true) + assertBusy { + Assertions.assertThat(followerClient.indices().exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT)).isEqualTo(true) } } @@ -116,26 +113,23 @@ class SecurityCustomRolesIT: SecurityBase() { val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) Assertions.assertThat(createIndexResponse.isAcknowledged).isTrue() - try { - var startReplicationRequest = StartReplicationRequest("source",leaderIndexName,followerIndexName, - useRoles = UseRoles(leaderClusterRole = "leaderRoleValidPerms",followerClusterRole = "followerRoleValidPerms")) - var requestOptions = RequestOptions.DEFAULT.addBasicAuthHeader("testUser1","password") - followerClient.startReplication(startReplicationRequest, waitForRestore = true, - requestOptions = requestOptions) - - /* At this point, the follower cluster should be in FOLLOWING state. Next, we pause replication - and verify the same - */ - followerClient.pauseReplication(followerIndexName, - requestOptions = requestOptions) - // Validate paused replication using Status Api - assertBusy { - `validate aggregated paused status response`(followerClient.replicationStatus(followerIndexName, - requestOptions = requestOptions)) - } - } finally { - followerClient.stopReplication(followerIndexName) + var startReplicationRequest = StartReplicationRequest("source",leaderIndexName,followerIndexName, + useRoles = UseRoles(leaderClusterRole = "leaderRoleValidPerms",followerClusterRole = "followerRoleValidPerms")) + var requestOptions = RequestOptions.DEFAULT.addBasicAuthHeader("testUser1","password") + followerClient.startReplication(startReplicationRequest, waitForRestore = true, + requestOptions = requestOptions) + + /* At this point, the follower cluster should be in FOLLOWING state. Next, we pause replication + and verify the same + */ + followerClient.pauseReplication(followerIndexName, + requestOptions = requestOptions) + + // Validate paused replication using Status Api + assertBusy { + `validate aggregated paused status response`(followerClient.replicationStatus(followerIndexName, + requestOptions = requestOptions)) } } @@ -147,22 +141,18 @@ class SecurityCustomRolesIT: SecurityBase() { val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) Assertions.assertThat(createIndexResponse.isAcknowledged).isTrue() - try { - var startReplicationRequest = StartReplicationRequest("source",leaderIndexName,followerIndexName, - useRoles = UseRoles(leaderClusterRole = "leaderRoleValidPerms",followerClusterRole = "followerRoleValidPerms")) - followerClient.startReplication(startReplicationRequest, waitForRestore = true, - requestOptions = RequestOptions.DEFAULT.addBasicAuthHeader("testUser1","password")) + var startReplicationRequest = StartReplicationRequest("source",leaderIndexName,followerIndexName, + useRoles = UseRoles(leaderClusterRole = "leaderRoleValidPerms",followerClusterRole = "followerRoleValidPerms")) - Assertions.assertThatThrownBy { - followerClient.pauseReplication(followerIndexName, - requestOptions = RequestOptions.DEFAULT.addBasicAuthHeader("testUser2","password")) - }.isInstanceOf(ResponseException::class.java) - .hasMessageContaining("403 Forbidden") + followerClient.startReplication(startReplicationRequest, waitForRestore = true, + requestOptions = RequestOptions.DEFAULT.addBasicAuthHeader("testUser1","password")) - } finally { - followerClient.stopReplication(followerIndexName) - } + Assertions.assertThatThrownBy { + followerClient.pauseReplication(followerIndexName, + requestOptions = RequestOptions.DEFAULT.addBasicAuthHeader("testUser2","password")) + }.isInstanceOf(ResponseException::class.java) + .hasMessageContaining("403 Forbidden") } fun `test for FOLLOWER that STATUS Api works for user with valid permissions`() { @@ -173,19 +163,16 @@ class SecurityCustomRolesIT: SecurityBase() { val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) Assertions.assertThat(createIndexResponse.isAcknowledged).isTrue() - try { - var startReplicationRequest = StartReplicationRequest("source",leaderIndexName,followerIndexName, - useRoles = UseRoles(leaderClusterRole = "leaderRoleValidPerms",followerClusterRole = "followerRoleValidPerms")) - followerClient.startReplication(startReplicationRequest, waitForRestore = true, - requestOptions = RequestOptions.DEFAULT.addBasicAuthHeader("testUser1","password")) + var startReplicationRequest = StartReplicationRequest("source",leaderIndexName,followerIndexName, + useRoles = UseRoles(leaderClusterRole = "leaderRoleValidPerms",followerClusterRole = "followerRoleValidPerms")) - assertBusy { - `validate status syncing response`(followerClient.replicationStatus(followerIndexName, - requestOptions = RequestOptions.DEFAULT.addBasicAuthHeader("testUser1","password"))) - } - } finally { - followerClient.stopReplication(followerIndexName) + followerClient.startReplication(startReplicationRequest, waitForRestore = true, + requestOptions = RequestOptions.DEFAULT.addBasicAuthHeader("testUser1","password")) + + assertBusy { + `validate status syncing response`(followerClient.replicationStatus(followerIndexName, + requestOptions = RequestOptions.DEFAULT.addBasicAuthHeader("testUser1","password"))) } } @@ -197,21 +184,18 @@ class SecurityCustomRolesIT: SecurityBase() { val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) Assertions.assertThat(createIndexResponse.isAcknowledged).isTrue() - try { - var startReplicationRequest = StartReplicationRequest("source",leaderIndexName,followerIndexName, - useRoles = UseRoles(leaderClusterRole = "leaderRoleValidPerms",followerClusterRole = "followerRoleValidPerms")) - followerClient.startReplication(startReplicationRequest, waitForRestore = true, - requestOptions = RequestOptions.DEFAULT.addBasicAuthHeader("testUser1","password")) + var startReplicationRequest = StartReplicationRequest("source",leaderIndexName,followerIndexName, + useRoles = UseRoles(leaderClusterRole = "leaderRoleValidPerms",followerClusterRole = "followerRoleValidPerms")) - Assertions.assertThatThrownBy { - followerClient.replicationStatus(followerIndexName, - requestOptions = RequestOptions.DEFAULT.addBasicAuthHeader("testUser2","password")) - }.isInstanceOf(ResponseException::class.java) - .hasMessageContaining("403 Forbidden") - } finally { - followerClient.stopReplication(followerIndexName) - } + followerClient.startReplication(startReplicationRequest, waitForRestore = true, + requestOptions = RequestOptions.DEFAULT.addBasicAuthHeader("testUser1","password")) + + Assertions.assertThatThrownBy { + followerClient.replicationStatus(followerIndexName, + requestOptions = RequestOptions.DEFAULT.addBasicAuthHeader("testUser2","password")) + }.isInstanceOf(ResponseException::class.java) + .hasMessageContaining("403 Forbidden") } fun `test for FOLLOWER that UPDATE settings works for user with valid permissions`() { @@ -229,41 +213,39 @@ class SecurityCustomRolesIT: SecurityBase() { val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName).settings(settings), RequestOptions.DEFAULT) Assertions.assertThat(createIndexResponse.isAcknowledged).isTrue() - try { - followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName, - useRoles = UseRoles(leaderClusterRole = "leaderRoleValidPerms",followerClusterRole = "followerRoleValidPerms")), + + followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName, + useRoles = UseRoles(leaderClusterRole = "leaderRoleValidPerms",followerClusterRole = "followerRoleValidPerms")), + requestOptions = RequestOptions.DEFAULT.addBasicAuthHeader("testUser1","password")) + assertBusy { + Assertions.assertThat(followerClient.indices() + .exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT)) + .isEqualTo(true) + } + val getSettingsRequest = GetSettingsRequest() + getSettingsRequest.indices(followerIndexName) + Assert.assertEquals( + "1", + followerClient.indices() + .getSettings(getSettingsRequest, RequestOptions.DEFAULT) + .indexToSettings[followerIndexName][IndexMetadata.SETTING_NUMBER_OF_REPLICAS] + ) + + settings = Settings.builder() + .put("index.shard.check_on_startup", "checksum") + .build() + followerClient.updateReplication(followerIndexName, settings, requestOptions = RequestOptions.DEFAULT.addBasicAuthHeader("testUser1","password")) - assertBusy { - Assertions.assertThat(followerClient.indices() - .exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT)) - .isEqualTo(true) - } - val getSettingsRequest = GetSettingsRequest() - getSettingsRequest.indices(followerIndexName) + + // Wait for the settings to get updated at follower cluster. + assertBusy ({ Assert.assertEquals( - "1", + "checksum", followerClient.indices() .getSettings(getSettingsRequest, RequestOptions.DEFAULT) - .indexToSettings[followerIndexName][IndexMetadata.SETTING_NUMBER_OF_REPLICAS] + .indexToSettings[followerIndexName]["index.shard.check_on_startup"] ) - - settings = Settings.builder() - .put("index.shard.check_on_startup", "checksum") - .build() - followerClient.updateReplication(followerIndexName, settings, - requestOptions = RequestOptions.DEFAULT.addBasicAuthHeader("testUser1","password")) - - assertBusy { - Assert.assertEquals( - "checksum", - followerClient.indices() - .getSettings(getSettingsRequest, RequestOptions.DEFAULT) - .indexToSettings[followerIndexName]["index.shard.check_on_startup"] - ) - } - } finally { - followerClient.stopReplication(followerIndexName) - } + }, 30L, TimeUnit.SECONDS) } fun `test for FOLLOWER that UPDATE settings is forbidden for user with invalid permissions`() { @@ -272,45 +254,36 @@ class SecurityCustomRolesIT: SecurityBase() { val followerIndexName = "follower-index1" setMetadataSyncDelay() - createConnectionBetweenClusters(FOLLOWER, LEADER) - var settings = Settings.builder() .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) .build() - val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName).settings(settings), RequestOptions.DEFAULT) Assertions.assertThat(createIndexResponse.isAcknowledged).isTrue() - try { - followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName, - useRoles = UseRoles(leaderClusterRole = "leaderRoleValidPerms",followerClusterRole = "followerRoleValidPerms")), - requestOptions = RequestOptions.DEFAULT.addBasicAuthHeader("testUser1","password")) - assertBusy { - Assertions.assertThat(followerClient.indices() - .exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT)) - .isEqualTo(true) - } - val getSettingsRequest = GetSettingsRequest() - getSettingsRequest.indices(followerIndexName) - Assert.assertEquals( - "1", - followerClient.indices() - .getSettings(getSettingsRequest, RequestOptions.DEFAULT) - .indexToSettings[followerIndexName][IndexMetadata.SETTING_NUMBER_OF_REPLICAS] - ) - - settings = Settings.builder() - .put("index.shard.check_on_startup", "checksum") - .build() - - Assertions.assertThatThrownBy { - followerClient.updateReplication(followerIndexName, settings, - requestOptions = RequestOptions.DEFAULT.addBasicAuthHeader("testUser2","password")) - }.isInstanceOf(ResponseException::class.java) - .hasMessageContaining("403 Forbidden") - } finally { - followerClient.stopReplication(followerIndexName) + followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName, + useRoles = UseRoles(leaderClusterRole = "leaderRoleValidPerms",followerClusterRole = "followerRoleValidPerms")), + requestOptions = RequestOptions.DEFAULT.addBasicAuthHeader("testUser1","password"), waitForRestore = true) + assertBusy { + Assertions.assertThat(followerClient.indices() + .exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT)) + .isEqualTo(true) } + val getSettingsRequest = GetSettingsRequest() + getSettingsRequest.indices(followerIndexName) + Assert.assertEquals( + "1", + followerClient.indices() + .getSettings(getSettingsRequest, RequestOptions.DEFAULT) + .indexToSettings[followerIndexName][IndexMetadata.SETTING_NUMBER_OF_REPLICAS] + ) + settings = Settings.builder() + .put("index.shard.check_on_startup", "checksum") + .build() + Assertions.assertThatThrownBy { + followerClient.updateReplication(followerIndexName, settings, + requestOptions = RequestOptions.DEFAULT.addBasicAuthHeader("testUser2","password")) + }.isInstanceOf(ResponseException::class.java) + .hasMessageContaining("403 Forbidden") } fun `test for FOLLOWER that AutoFollow works for user with valid permissions`() { @@ -323,12 +296,10 @@ class SecurityCustomRolesIT: SecurityBase() { val leaderIndexName = createRandomIndex(indexPrefix, leaderClient) var leaderIndexNameNew = "" createConnectionBetweenClusters(FOLLOWER, LEADER, connectionAlias) - try { followerClient.updateAutoFollowPattern(connectionAlias, indexPatternName, indexPattern, useRoles = UseRoles(leaderClusterRole = "leaderRoleValidPerms",followerClusterRole = "followerRoleValidPerms"), requestOptions= RequestOptions.DEFAULT.addBasicAuthHeader("testUser1","password")) - // Verify that existing index matching the pattern are replicated. assertBusy ({ Assertions.assertThat(followerClient.indices() @@ -336,7 +307,6 @@ class SecurityCustomRolesIT: SecurityBase() { .isEqualTo(true) }, 30, TimeUnit.SECONDS) Assertions.assertThat(getAutoFollowTasks(FOLLOWER).size).isEqualTo(1) - leaderIndexNameNew = createRandomIndex(indexPrefix, leaderClient) // Verify that newly created index on leader which match the pattern are also replicated. assertBusy ({ @@ -346,8 +316,6 @@ class SecurityCustomRolesIT: SecurityBase() { }, 60, TimeUnit.SECONDS) } finally { followerClient.deleteAutoFollowPattern(connectionAlias, indexPatternName) - followerClient.stopReplication(leaderIndexName, false) - followerClient.stopReplication(leaderIndexNameNew) } } @@ -357,7 +325,6 @@ class SecurityCustomRolesIT: SecurityBase() { val indexPattern = "follower-index1*" val indexPatternName = "test_pattern" createConnectionBetweenClusters(FOLLOWER, LEADER, connectionAlias) - Assertions.assertThatThrownBy { followerClient.updateAutoFollowPattern(connectionAlias, indexPatternName, indexPattern, useRoles = UseRoles(leaderClusterRole = "leaderRoleValidPerms",followerClusterRole = "followerRoleNoPerms"), @@ -387,16 +354,13 @@ class SecurityCustomRolesIT: SecurityBase() { val leaderClient = getClientForCluster(LEADER) val followerIndexName = "follower-index1" createConnectionBetweenClusters(FOLLOWER, LEADER) - val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) Assertions.assertThat(createIndexResponse.isAcknowledged).isTrue() try { var startReplicationRequest = StartReplicationRequest("source",leaderIndexName,followerIndexName, useRoles = UseRoles(leaderClusterRole = "leaderRoleValidPerms",followerClusterRole = "followerRoleValidPerms")) - followerClient.startReplication(startReplicationRequest, waitForRestore = true, requestOptions = RequestOptions.DEFAULT.addBasicAuthHeader("testUser1","password")) - insertDocToIndex(LEADER, "1", "dummy data 1",leaderIndexName) //Querying ES cluster throws random exceptions like MasterNotDiscovered or ShardsFailed etc, so catching them and retrying assertBusy ({ @@ -406,7 +370,6 @@ class SecurityCustomRolesIT: SecurityBase() { Assert.fail("Exception while querying follower cluster. Failing to retry again") } }, 1, TimeUnit.MINUTES) - assertBusy { `validate status syncing response`(followerClient.replicationStatus(followerIndexName, requestOptions = RequestOptions.DEFAULT.addBasicAuthHeader("testUser1","password"))) @@ -421,7 +384,6 @@ class SecurityCustomRolesIT: SecurityBase() { }, 100, TimeUnit.SECONDS) } finally { updateRole(followerIndexName,"followerRoleValidPerms", true) - followerClient.stopReplication(followerIndexName) } } diff --git a/src/test/kotlin/org/opensearch/replication/integ/rest/SecurityCustomRolesLeaderIT.kt b/src/test/kotlin/org/opensearch/replication/integ/rest/SecurityCustomRolesLeaderIT.kt index 80d92cab..3eac2bd2 100644 --- a/src/test/kotlin/org/opensearch/replication/integ/rest/SecurityCustomRolesLeaderIT.kt +++ b/src/test/kotlin/org/opensearch/replication/integ/rest/SecurityCustomRolesLeaderIT.kt @@ -42,13 +42,10 @@ class SecurityCustomRolesLeaderIT: SecurityBase() { val leaderClient = getClientForCluster(LEADER) val followerIndexName = "follower-index1" createConnectionBetweenClusters(FOLLOWER, LEADER) - val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) Assertions.assertThat(createIndexResponse.isAcknowledged).isTrue() - var startReplicationRequest = StartReplicationRequest("source",leaderIndexName,followerIndexName, useRoles = UseRoles(leaderClusterRole = "leaderRoleNoPerms",followerClusterRole = "followerRoleValidPerms")) - Assertions.assertThatThrownBy { followerClient.startReplication(startReplicationRequest, requestOptions= RequestOptions.DEFAULT.addBasicAuthHeader("testUser6","password")) } .isInstanceOf(ResponseException::class.java) @@ -61,16 +58,13 @@ class SecurityCustomRolesLeaderIT: SecurityBase() { val leaderClient = getClientForCluster(LEADER) val followerIndexName = "follower-index1" createConnectionBetweenClusters(FOLLOWER, LEADER) - val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) Assertions.assertThat(createIndexResponse.isAcknowledged).isTrue() try { var startReplicationRequest = StartReplicationRequest("source",leaderIndexName,followerIndexName, useRoles = UseRoles(leaderClusterRole = "leaderRoleValidPerms",followerClusterRole = "followerRoleValidPerms")) - followerClient.startReplication(startReplicationRequest, waitForRestore = true, requestOptions = RequestOptions.DEFAULT.addBasicAuthHeader("testUser1","password")) - insertDocToIndex(LEADER, "1", "dummy data 1",leaderIndexName) //Querying ES cluster throws random exceptions like MasterNotDiscovered or ShardsFailed etc, so catching them and retrying assertBusy ({ @@ -80,23 +74,18 @@ class SecurityCustomRolesLeaderIT: SecurityBase() { Assert.fail("Exception while querying follower cluster. Failing to retry again") } }, 1, TimeUnit.MINUTES) - assertBusy { `validate status syncing response`(followerClient.replicationStatus(followerIndexName, requestOptions = RequestOptions.DEFAULT.addBasicAuthHeader("testUser1","password"))) } - updateRole(followerIndexName,"leaderRoleValidPerms", false) insertDocToIndex(LEADER, "2", "dummy data 2",leaderIndexName) - assertBusy ({ validatePausedState(followerClient.replicationStatus(followerIndexName, requestOptions = RequestOptions.DEFAULT.addBasicAuthHeader("testUser1","password"))) }, 100, TimeUnit.SECONDS) - } finally { updateRole(followerIndexName,"leaderRoleValidPerms", true) - followerClient.stopReplication(followerIndexName) } } @@ -105,17 +94,14 @@ class SecurityCustomRolesLeaderIT: SecurityBase() { val leaderClient = getClientForCluster(LEADER) val followerIndexName = "follower-index1" createConnectionBetweenClusters(FOLLOWER, LEADER) - val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) Assertions.assertThat(createIndexResponse.isAcknowledged).isTrue() try { var startReplicationRequest = StartReplicationRequest("source",leaderIndexName,followerIndexName, useRoles = UseRoles(leaderClusterRole = "leaderRoleValidPerms",followerClusterRole = "followerRoleValidPerms")) - updateFileChunkPermissions("","leaderRoleValidPerms", false) followerClient.startReplication(startReplicationRequest, requestOptions = RequestOptions.DEFAULT.addBasicAuthHeader("testUser1","password")) - assertBusy ({ validateNotInProgressState(followerClient.replicationStatus(followerIndexName, requestOptions = RequestOptions.DEFAULT.addBasicAuthHeader("testUser1","password"))) @@ -125,7 +111,6 @@ class SecurityCustomRolesLeaderIT: SecurityBase() { Assert.assertNull(ex) } finally { updateFileChunkPermissions("","leaderRoleValidPerms", true) - followerClient.stopReplication(followerIndexName) } } diff --git a/src/test/kotlin/org/opensearch/replication/integ/rest/SecurityDlsFlsIT.kt b/src/test/kotlin/org/opensearch/replication/integ/rest/SecurityDlsFlsIT.kt index d73625c1..3080c06f 100644 --- a/src/test/kotlin/org/opensearch/replication/integ/rest/SecurityDlsFlsIT.kt +++ b/src/test/kotlin/org/opensearch/replication/integ/rest/SecurityDlsFlsIT.kt @@ -45,13 +45,10 @@ class SecurityDlsFlsIT: SecurityBase() { val followerIndexName = "follower-index1" createConnectionBetweenClusters(FOLLOWER, LEADER) - val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) Assertions.assertThat(createIndexResponse.isAcknowledged).isTrue() - var startReplicationRequest = StartReplicationRequest("source",leaderIndexName,followerIndexName, useRoles = UseRoles(leaderClusterRole = "leaderRoleValidPerms",followerClusterRole = "followerDlsRole")) - Assertions.assertThatThrownBy { followerClient.startReplication(startReplicationRequest, requestOptions= RequestOptions.DEFAULT.addBasicAuthHeader("testUser3","password")) } .isInstanceOf(ResponseException::class.java) @@ -61,7 +58,6 @@ class SecurityDlsFlsIT: SecurityBase() { fun `test for FOLLOWER that STOP replication is forbidden for user with DLS or FLS enabled`() { val followerClient = getClientForCluster(FOLLOWER) - Assertions.assertThatThrownBy { followerClient.stopReplication("follower-index1", requestOptions= RequestOptions.DEFAULT.addBasicAuthHeader("testUser3","password")) @@ -75,25 +71,18 @@ class SecurityDlsFlsIT: SecurityBase() { val leaderClient = getClientForCluster(LEADER) val followerIndexName = "follower-index1" createConnectionBetweenClusters(FOLLOWER, LEADER) - val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) Assertions.assertThat(createIndexResponse.isAcknowledged).isTrue() - try { - var startReplicationRequest = StartReplicationRequest("source",leaderIndexName,followerIndexName, - useRoles = UseRoles(leaderClusterRole = "leaderRoleValidPerms",followerClusterRole = "followerRoleValidPerms")) - - followerClient.startReplication(startReplicationRequest, waitForRestore = true, - requestOptions= RequestOptions.DEFAULT.addBasicAuthHeader("testUser1","password")) - - Assertions.assertThatThrownBy { - followerClient.pauseReplication(followerIndexName, - requestOptions= RequestOptions.DEFAULT.addBasicAuthHeader("testUser3","password")) - }.isInstanceOf(ResponseException::class.java) - .hasMessageContaining(DLS_FLS_EXCEPTION_MESSAGE) - .hasMessageContaining("403 Forbidden") - } finally { - followerClient.stopReplication(followerIndexName) - } + var startReplicationRequest = StartReplicationRequest("source",leaderIndexName,followerIndexName, + useRoles = UseRoles(leaderClusterRole = "leaderRoleValidPerms",followerClusterRole = "followerRoleValidPerms")) + followerClient.startReplication(startReplicationRequest, waitForRestore = true, + requestOptions= RequestOptions.DEFAULT.addBasicAuthHeader("testUser1","password")) + Assertions.assertThatThrownBy { + followerClient.pauseReplication(followerIndexName, + requestOptions= RequestOptions.DEFAULT.addBasicAuthHeader("testUser3","password")) + }.isInstanceOf(ResponseException::class.java) + .hasMessageContaining(DLS_FLS_EXCEPTION_MESSAGE) + .hasMessageContaining("403 Forbidden") } fun `test for FOLLOWER that STATUS Api is forbidden for user with DLS or FLS enabled`() { @@ -101,25 +90,18 @@ class SecurityDlsFlsIT: SecurityBase() { val leaderClient = getClientForCluster(LEADER) val followerIndexName = "follower-index1" createConnectionBetweenClusters(FOLLOWER, LEADER) - val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) Assertions.assertThat(createIndexResponse.isAcknowledged).isTrue() - try { - var startReplicationRequest = StartReplicationRequest("source",leaderIndexName,followerIndexName, - useRoles = UseRoles(leaderClusterRole = "leaderRoleValidPerms",followerClusterRole = "followerRoleValidPerms")) - - followerClient.startReplication(startReplicationRequest, waitForRestore = true, - requestOptions= RequestOptions.DEFAULT.addBasicAuthHeader("testUser1","password")) - - Assertions.assertThatThrownBy { - followerClient.replicationStatus(followerIndexName, - requestOptions= RequestOptions.DEFAULT.addBasicAuthHeader("testUser3","password")) - }.isInstanceOf(ResponseException::class.java) - .hasMessageContaining(DLS_FLS_EXCEPTION_MESSAGE) - .hasMessageContaining("403 Forbidden") - } finally { - followerClient.stopReplication(followerIndexName) - } + var startReplicationRequest = StartReplicationRequest("source",leaderIndexName,followerIndexName, + useRoles = UseRoles(leaderClusterRole = "leaderRoleValidPerms",followerClusterRole = "followerRoleValidPerms")) + followerClient.startReplication(startReplicationRequest, waitForRestore = true, + requestOptions= RequestOptions.DEFAULT.addBasicAuthHeader("testUser1","password")) + Assertions.assertThatThrownBy { + followerClient.replicationStatus(followerIndexName, + requestOptions= RequestOptions.DEFAULT.addBasicAuthHeader("testUser3","password")) + }.isInstanceOf(ResponseException::class.java) + .hasMessageContaining(DLS_FLS_EXCEPTION_MESSAGE) + .hasMessageContaining("403 Forbidden") } fun `test for FOLLOWER that UPDATE settings is forbidden for user with DLS or FLS enabled`() { @@ -128,46 +110,37 @@ class SecurityDlsFlsIT: SecurityBase() { val followerIndexName = "follower-index1" setMetadataSyncDelay() - createConnectionBetweenClusters(FOLLOWER, LEADER) - var settings = Settings.builder() .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) .build() - val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName).settings(settings), RequestOptions.DEFAULT) Assertions.assertThat(createIndexResponse.isAcknowledged).isTrue() - try { - followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName, - useRoles = UseRoles(leaderClusterRole = "leaderRoleValidPerms",followerClusterRole = "followerRoleValidPerms")), - requestOptions= RequestOptions.DEFAULT.addBasicAuthHeader("testUser1","password")) - assertBusy { - Assertions.assertThat(followerClient.indices() - .exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT)) - .isEqualTo(true) - } - val getSettingsRequest = GetSettingsRequest() - getSettingsRequest.indices(followerIndexName) - Assert.assertEquals( - "1", - followerClient.indices() - .getSettings(getSettingsRequest, RequestOptions.DEFAULT) - .indexToSettings[followerIndexName][IndexMetadata.SETTING_NUMBER_OF_REPLICAS] - ) - - settings = Settings.builder() - .put("index.shard.check_on_startup", "checksum") - .build() - - Assertions.assertThatThrownBy { - followerClient.updateReplication(followerIndexName, settings, - requestOptions= RequestOptions.DEFAULT.addBasicAuthHeader("testUser3","password")) - }.isInstanceOf(ResponseException::class.java) - .hasMessageContaining(DLS_FLS_EXCEPTION_MESSAGE) - .hasMessageContaining("403 Forbidden") - } finally { - followerClient.stopReplication(followerIndexName) + followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName, + useRoles = UseRoles(leaderClusterRole = "leaderRoleValidPerms",followerClusterRole = "followerRoleValidPerms")), + requestOptions= RequestOptions.DEFAULT.addBasicAuthHeader("testUser1","password"), waitForRestore = true) + assertBusy { + Assertions.assertThat(followerClient.indices() + .exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT)) + .isEqualTo(true) } + val getSettingsRequest = GetSettingsRequest() + getSettingsRequest.indices(followerIndexName) + Assert.assertEquals( + "1", + followerClient.indices() + .getSettings(getSettingsRequest, RequestOptions.DEFAULT) + .indexToSettings[followerIndexName][IndexMetadata.SETTING_NUMBER_OF_REPLICAS] + ) + settings = Settings.builder() + .put("index.shard.check_on_startup", "checksum") + .build() + Assertions.assertThatThrownBy { + followerClient.updateReplication(followerIndexName, settings, + requestOptions= RequestOptions.DEFAULT.addBasicAuthHeader("testUser3","password")) + }.isInstanceOf(ResponseException::class.java) + .hasMessageContaining(DLS_FLS_EXCEPTION_MESSAGE) + .hasMessageContaining("403 Forbidden") } fun `test for FOLLOWER that START replication is forbidden for user with FLS enabled`() { @@ -176,13 +149,10 @@ class SecurityDlsFlsIT: SecurityBase() { val followerIndexName = "follower-index1" createConnectionBetweenClusters(FOLLOWER, LEADER) - val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) Assertions.assertThat(createIndexResponse.isAcknowledged).isTrue() - var startReplicationRequest = StartReplicationRequest("source",leaderIndexName,followerIndexName, useRoles = UseRoles(leaderClusterRole = "leaderRoleValidPerms",followerClusterRole = "followerFlsRole")) - Assertions.assertThatThrownBy { followerClient.startReplication(startReplicationRequest, requestOptions= RequestOptions.DEFAULT.addBasicAuthHeader("testUser4","password")) } .isInstanceOf(ResponseException::class.java) @@ -196,13 +166,10 @@ class SecurityDlsFlsIT: SecurityBase() { val followerIndexName = "follower-index1" createConnectionBetweenClusters(FOLLOWER, LEADER) - val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) Assertions.assertThat(createIndexResponse.isAcknowledged).isTrue() - var startReplicationRequest = StartReplicationRequest("source",leaderIndexName,followerIndexName, useRoles = UseRoles(leaderClusterRole = "leaderRoleValidPerms",followerClusterRole = "followerFieldMaskRole")) - Assertions.assertThatThrownBy { followerClient.startReplication(startReplicationRequest, requestOptions= RequestOptions.DEFAULT.addBasicAuthHeader("testUser5","password")) } .isInstanceOf(ResponseException::class.java) diff --git a/src/test/kotlin/org/opensearch/replication/integ/rest/StartReplicationIT.kt b/src/test/kotlin/org/opensearch/replication/integ/rest/StartReplicationIT.kt index e47cee5c..f005b8a2 100644 --- a/src/test/kotlin/org/opensearch/replication/integ/rest/StartReplicationIT.kt +++ b/src/test/kotlin/org/opensearch/replication/integ/rest/StartReplicationIT.kt @@ -72,6 +72,7 @@ import org.opensearch.replication.task.index.IndexReplicationExecutor.Companion. import java.lang.Thread.sleep import org.opensearch.replication.updateReplicationStartBlockSetting import java.nio.file.Files +import java.util.* import java.util.concurrent.TimeUnit @@ -95,52 +96,39 @@ class StartReplicationIT: MultiClusterRestTestCase() { fun `test start replication in following state and empty index`() { val followerClient = getClientForCluster(FOLLOWER) val leaderClient = getClientForCluster(LEADER) - createConnectionBetweenClusters(FOLLOWER, LEADER) - val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) assertThat(createIndexResponse.isAcknowledged).isTrue() - try { - followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName)) - assertBusy { - assertThat(followerClient.indices().exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT)).isEqualTo(true) - } - } finally { - followerClient.stopReplication(followerIndexName) + followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), waitForRestore = true) + assertBusy { + assertThat(followerClient.indices().exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT)).isEqualTo(true) } } fun `test start replication with settings`() { val followerClient = getClientForCluster(FOLLOWER) val leaderClient = getClientForCluster(LEADER) - createConnectionBetweenClusters(FOLLOWER, LEADER) - val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) assertThat(createIndexResponse.isAcknowledged).isTrue() val settings = Settings.builder() .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 3) .build() - try { - followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName, settings = settings)) - assertBusy { - assertThat(followerClient.indices().exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT)).isEqualTo(true) - } - - val getSettingsRequest = GetSettingsRequest() - getSettingsRequest.indices(followerIndexName) - getSettingsRequest.includeDefaults(true) - assertBusy ({ - Assert.assertEquals( - "3", - followerClient.indices() - .getSettings(getSettingsRequest, RequestOptions.DEFAULT) - .indexToSettings[followerIndexName][IndexMetadata.SETTING_NUMBER_OF_REPLICAS] - ) - }, 15, TimeUnit.SECONDS) - } finally { - followerClient.stopReplication(followerIndexName) + followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName, settings = settings), waitForRestore = true) + assertBusy { + assertThat(followerClient.indices().exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT)).isEqualTo(true) } + val getSettingsRequest = GetSettingsRequest() + getSettingsRequest.indices(followerIndexName) + getSettingsRequest.includeDefaults(true) + assertBusy ({ + Assert.assertEquals( + "3", + followerClient.indices() + .getSettings(getSettingsRequest, RequestOptions.DEFAULT) + .indexToSettings[followerIndexName][IndexMetadata.SETTING_NUMBER_OF_REPLICAS] + ) + }, 15, TimeUnit.SECONDS) } @@ -150,22 +138,18 @@ class StartReplicationIT: MultiClusterRestTestCase() { createConnectionBetweenClusters(FOLLOWER, LEADER) val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) assertThat(createIndexResponse.isAcknowledged).isTrue() - try { - followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), waitForRestore = true) - assertThat(followerClient.indices().exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT)).isEqualTo(true) - leaderClient.lowLevelClient.performRequest(Request("POST", "/" + leaderIndexName + "/_close")) - assertBusy ({ - try { - assertThat(followerClient.replicationStatus(followerIndexName)).containsKey("status") - var statusResp = followerClient.replicationStatus(followerIndexName) - `validate paused status on closed index`(statusResp) - } catch (e : Exception) { - Assert.fail() - } - },30, TimeUnit.SECONDS) - } finally { - followerClient.stopReplication(followerIndexName) - } + followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), waitForRestore = true) + assertThat(followerClient.indices().exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT)).isEqualTo(true) + leaderClient.lowLevelClient.performRequest(Request("POST", "/" + leaderIndexName + "/_close")) + assertBusy ({ + try { + assertThat(followerClient.replicationStatus(followerIndexName)).containsKey("status") + var statusResp = followerClient.replicationStatus(followerIndexName) + `validate paused status on closed index`(statusResp) + } catch (e : Exception) { + Assert.fail() + } + },30, TimeUnit.SECONDS) } @@ -175,24 +159,18 @@ class StartReplicationIT: MultiClusterRestTestCase() { createConnectionBetweenClusters(FOLLOWER, LEADER) val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) assertThat(createIndexResponse.isAcknowledged).isTrue() - try { - followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), waitForRestore = true) - followerClient.pauseReplication(followerIndexName) - leaderClient.lowLevelClient.performRequest(Request("POST", "/" + leaderIndexName + "/_close")) - leaderClient.lowLevelClient.performRequest(Request("POST", "/" + leaderIndexName + "/_open")) - followerClient.resumeReplication(followerIndexName) - var statusResp = followerClient.replicationStatus(followerIndexName) - `validate not paused status response`(statusResp) - - } finally { - followerClient.stopReplication(followerIndexName) - } + followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), waitForRestore = true) + followerClient.pauseReplication(followerIndexName) + leaderClient.lowLevelClient.performRequest(Request("POST", "/" + leaderIndexName + "/_close")) + leaderClient.lowLevelClient.performRequest(Request("POST", "/" + leaderIndexName + "/_open")) + followerClient.resumeReplication(followerIndexName) + var statusResp = followerClient.replicationStatus(followerIndexName) + `validate not paused status response`(statusResp) } fun `test start replication fails when replication has already been started for the same index`() { val followerClient = getClientForCluster(FOLLOWER) val leaderClient = getClientForCluster(LEADER) - createConnectionBetweenClusters(FOLLOWER, LEADER) val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) @@ -207,9 +185,7 @@ class StartReplicationIT: MultiClusterRestTestCase() { fun `test start replication fails when remote cluster alias does not exist`() { val followerClient = getClientForCluster(FOLLOWER) val leaderClient = getClientForCluster(LEADER) - createConnectionBetweenClusters(FOLLOWER, LEADER) - val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) assertThat(createIndexResponse.isAcknowledged).isTrue() assertThatThrownBy { @@ -221,9 +197,7 @@ class StartReplicationIT: MultiClusterRestTestCase() { fun `test start replication fails when index does not exist`() { val followerClient = getClientForCluster(FOLLOWER) val leaderClient = getClientForCluster(LEADER) - createConnectionBetweenClusters(FOLLOWER, LEADER) - val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) assertThat(createIndexResponse.isAcknowledged).isTrue() assertThatThrownBy { @@ -235,9 +209,7 @@ class StartReplicationIT: MultiClusterRestTestCase() { fun `test start replication fails when the follower cluster is write blocked or metadata blocked`() { val followerClient = getClientForCluster(FOLLOWER) val leaderClient = getClientForCluster(LEADER) - createConnectionBetweenClusters(FOLLOWER, LEADER) - val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) assertThat(createIndexResponse.isAcknowledged).isTrue() addClusterMetadataBlock(FOLLOWER, "true") @@ -252,118 +224,99 @@ class StartReplicationIT: MultiClusterRestTestCase() { fun `test that follower index has same mapping as leader index`() { val followerClient = getClientForCluster(FOLLOWER) val leaderClient = getClientForCluster(LEADER) - createConnectionBetweenClusters(FOLLOWER, LEADER) - val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) assertThat(createIndexResponse.isAcknowledged).isTrue() - try { - followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName)) - assertBusy { - assertThat(followerClient.indices() - .exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT)) - .isEqualTo(true) - } + followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), waitForRestore = true) + assertBusy { + assertThat(followerClient.indices() + .exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT)) + .isEqualTo(true) + } + Assert.assertEquals( + leaderClient.indices().getMapping(GetMappingsRequest().indices(leaderIndexName), RequestOptions.DEFAULT) + .mappings()[leaderIndexName], + followerClient.indices().getMapping(GetMappingsRequest().indices(followerIndexName), RequestOptions.DEFAULT) + .mappings()[followerIndexName] + ) + // test that new mapping created on leader is also propagated to follower + val putMappingRequest = PutMappingRequest(leaderIndexName) + putMappingRequest.source("{\"properties\":{\"name\":{\"type\":\"keyword\"}}}", XContentType.JSON) + leaderClient.indices().putMapping(putMappingRequest, RequestOptions.DEFAULT) + val sourceMap = mapOf("name" to randomAlphaOfLength(5)) + leaderClient.index(IndexRequest(leaderIndexName).id("1").source(sourceMap), RequestOptions.DEFAULT) + val leaderMappings = leaderClient.indices().getMapping(GetMappingsRequest().indices(leaderIndexName), RequestOptions.DEFAULT) + .mappings()[leaderIndexName] + assertBusy({ Assert.assertEquals( - leaderClient.indices().getMapping(GetMappingsRequest().indices(leaderIndexName), RequestOptions.DEFAULT) - .mappings()[leaderIndexName], - followerClient.indices().getMapping(GetMappingsRequest().indices(followerIndexName), RequestOptions.DEFAULT) - .mappings()[followerIndexName] + leaderMappings, + followerClient.indices().getMapping(GetMappingsRequest().indices(followerIndexName), RequestOptions.DEFAULT) + .mappings()[followerIndexName] ) - // test that new mapping created on leader is also propagated to follower - val putMappingRequest = PutMappingRequest(leaderIndexName) - putMappingRequest.source("{\"properties\":{\"name\":{\"type\":\"keyword\"}}}", XContentType.JSON) - leaderClient.indices().putMapping(putMappingRequest, RequestOptions.DEFAULT) - val sourceMap = mapOf("name" to randomAlphaOfLength(5)) - leaderClient.index(IndexRequest(leaderIndexName).id("1").source(sourceMap), RequestOptions.DEFAULT) - val leaderMappings = leaderClient.indices().getMapping(GetMappingsRequest().indices(leaderIndexName), RequestOptions.DEFAULT) - .mappings()[leaderIndexName] - assertBusy { - Assert.assertEquals( - leaderMappings, - followerClient.indices().getMapping(GetMappingsRequest().indices(followerIndexName), RequestOptions.DEFAULT) - .mappings()[followerIndexName] - ) - } - - } finally { - followerClient.stopReplication(followerIndexName) - } + }, 30L, TimeUnit.SECONDS) } fun `test that index settings are getting replicated`() { val followerClient = getClientForCluster(FOLLOWER) val leaderClient = getClientForCluster(LEADER) - createConnectionBetweenClusters(FOLLOWER, LEADER) - val settings = Settings.builder() .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) .build() - val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName).settings(settings), RequestOptions.DEFAULT) assertThat(createIndexResponse.isAcknowledged).isTrue() - try { - followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName)) - assertBusy { - assertThat(followerClient.indices() - .exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT)) - .isEqualTo(true) - } - val getSettingsRequest = GetSettingsRequest() - getSettingsRequest.indices(followerIndexName) - getSettingsRequest.names(IndexMetadata.SETTING_NUMBER_OF_REPLICAS) + followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), + waitForRestore = true) + assertBusy { + assertThat(followerClient.indices() + .exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT)) + .isEqualTo(true) + } + val getSettingsRequest = GetSettingsRequest() + getSettingsRequest.indices(followerIndexName) + getSettingsRequest.names(IndexMetadata.SETTING_NUMBER_OF_REPLICAS) + assertBusy({ Assert.assertEquals( - "0", - followerClient.indices() - .getSettings(getSettingsRequest, RequestOptions.DEFAULT) - .indexToSettings[followerIndexName][IndexMetadata.SETTING_NUMBER_OF_REPLICAS] + "0", + followerClient.indices() + .getSettings(getSettingsRequest, RequestOptions.DEFAULT) + .indexToSettings[followerIndexName][IndexMetadata.SETTING_NUMBER_OF_REPLICAS] ) - } finally { - followerClient.stopReplication(followerIndexName) - } + }, 30L, TimeUnit.SECONDS) } fun `test that aliases settings are getting replicated`() { val followerClient = getClientForCluster(FOLLOWER) val leaderClient = getClientForCluster(LEADER) - createConnectionBetweenClusters(FOLLOWER, LEADER) - val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName) .alias(Alias("leaderAlias").filter("{\"term\":{\"year\":2016}}").routing("1")) , RequestOptions.DEFAULT) assertThat(createIndexResponse.isAcknowledged).isTrue() - try { - followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName)) - assertBusy { - assertThat(followerClient.indices() - .exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT)) - .isEqualTo(true) - } - assertBusy({ - Assert.assertEquals( - leaderClient.indices().getAlias(GetAliasesRequest().indices(leaderIndexName), - RequestOptions.DEFAULT).aliases[leaderIndexName], - followerClient.indices().getAlias(GetAliasesRequest().indices(followerIndexName), - RequestOptions.DEFAULT).aliases[followerIndexName] - ) - - }, 30L, TimeUnit.SECONDS) - } finally { - followerClient.stopReplication(followerIndexName) + followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), + waitForRestore = true) + assertBusy { + assertThat(followerClient.indices() + .exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT)) + .isEqualTo(true) } + assertBusy({ + Assert.assertEquals( + leaderClient.indices().getAlias(GetAliasesRequest().indices(leaderIndexName), + RequestOptions.DEFAULT).aliases[leaderIndexName], + followerClient.indices().getAlias(GetAliasesRequest().indices(followerIndexName), + RequestOptions.DEFAULT).aliases[followerIndexName] + ) + + }, 30L, TimeUnit.SECONDS) } fun `test that replication cannot be started on leader alias directly`() { val followerClient = getClientForCluster(FOLLOWER) val leaderClient = getClientForCluster(LEADER) - createConnectionBetweenClusters(FOLLOWER, LEADER, "source") - val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName).alias(Alias("leader_alias")), RequestOptions.DEFAULT) assertThat(createIndexResponse.isAcknowledged).isTrue() - try { followerClient.startReplication(StartReplicationRequest("source", "leader_alias", followerIndexName)) fail("Expected startReplication to fail") @@ -377,87 +330,67 @@ class StartReplicationIT: MultiClusterRestTestCase() { fun `test that translog settings are set on leader and not on follower`() { val followerClient = getClientForCluster(FOLLOWER) val leaderClient = getClientForCluster(LEADER) - createConnectionBetweenClusters(FOLLOWER, LEADER) - val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) assertThat(createIndexResponse.isAcknowledged).isTrue() - try { - followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName)) - assertBusy { - assertThat(followerClient.indices() - .exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT)) - .isEqualTo(true) - assertThat(followerClient.indices() - .getSettings(GetSettingsRequest().indices(followerIndexName), RequestOptions.DEFAULT) - .getSetting(followerIndexName, - IndexSettings.INDEX_PLUGINS_REPLICATION_TRANSLOG_RETENTION_LEASE_PRUNING_ENABLED_SETTING.key) - .isNullOrEmpty()) - } - - assertThat(leaderClient.indices() - .getSettings(GetSettingsRequest().indices(leaderIndexName), RequestOptions.DEFAULT) - .getSetting(leaderIndexName, - IndexSettings.INDEX_PLUGINS_REPLICATION_TRANSLOG_RETENTION_LEASE_PRUNING_ENABLED_SETTING.key) == "true") - - } finally { - followerClient.stopReplication(followerIndexName) + followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), + waitForRestore = true) + assertBusy { + assertThat(followerClient.indices() + .exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT)) + .isEqualTo(true) + assertThat(followerClient.indices() + .getSettings(GetSettingsRequest().indices(followerIndexName), RequestOptions.DEFAULT) + .getSetting(followerIndexName, + IndexSettings.INDEX_PLUGINS_REPLICATION_TRANSLOG_RETENTION_LEASE_PRUNING_ENABLED_SETTING.key) + .isNullOrEmpty()) } + assertThat(leaderClient.indices() + .getSettings(GetSettingsRequest().indices(leaderIndexName), RequestOptions.DEFAULT) + .getSetting(leaderIndexName, + IndexSettings.INDEX_PLUGINS_REPLICATION_TRANSLOG_RETENTION_LEASE_PRUNING_ENABLED_SETTING.key) == "true") } fun `test that translog settings are set on leader`() { val followerClient = getClientForCluster(FOLLOWER) val leaderClient = getClientForCluster(LEADER) - createConnectionBetweenClusters(FOLLOWER, LEADER) - val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) assertThat(createIndexResponse.isAcknowledged).isTrue() - try { - followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName)) - - val leaderSettings = leaderClient.indices() - .getSettings(GetSettingsRequest().indices(leaderIndexName), RequestOptions.DEFAULT) - assertThat(leaderSettings.getSetting(leaderIndexName, - IndexSettings.INDEX_PLUGINS_REPLICATION_TRANSLOG_RETENTION_LEASE_PRUNING_ENABLED_SETTING.key) == "true") - assertThat(leaderSettings.getSetting(leaderIndexName, - IndexSettings.INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING.key) == "32mb") - - } finally { - followerClient.stopReplication(followerIndexName) - } + followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), + waitForRestore = true) + val leaderSettings = leaderClient.indices() + .getSettings(GetSettingsRequest().indices(leaderIndexName), RequestOptions.DEFAULT) + assertThat(leaderSettings.getSetting(leaderIndexName, + IndexSettings.INDEX_PLUGINS_REPLICATION_TRANSLOG_RETENTION_LEASE_PRUNING_ENABLED_SETTING.key) == "true") + assertThat(leaderSettings.getSetting(leaderIndexName, + IndexSettings.INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING.key) == "32mb") } fun `test that replication continues after removing translog settings based on retention lease`() { val followerClient = getClientForCluster(FOLLOWER) val leaderClient = getClientForCluster(LEADER) - createConnectionBetweenClusters(FOLLOWER, LEADER) - val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) assertThat(createIndexResponse.isAcknowledged).isTrue() - try { - followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), - waitForRestore = true) - assertBusy { - assertThat(followerClient.indices() - .exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT)) - .isEqualTo(true) - } - // Turn-off the settings and index doc - val settingsBuilder = Settings.builder() - .put(IndexSettings.INDEX_PLUGINS_REPLICATION_TRANSLOG_RETENTION_LEASE_PRUNING_ENABLED_SETTING.key, false) - val settingsUpdateResponse = leaderClient.indices().putSettings(UpdateSettingsRequest(leaderIndexName) - .settings(settingsBuilder.build()), RequestOptions.DEFAULT) - Assert.assertEquals(settingsUpdateResponse.isAcknowledged, true) - val sourceMap = mapOf("name" to randomAlphaOfLength(5)) - leaderClient.index(IndexRequest(leaderIndexName).id("2").source(sourceMap), RequestOptions.DEFAULT) - assertBusy { - followerClient.get(GetRequest(followerIndexName).id("2"), RequestOptions.DEFAULT).isExists - } - } finally { - followerClient.stopReplication(followerIndexName) + followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), + waitForRestore = true) + assertBusy { + assertThat(followerClient.indices() + .exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT)) + .isEqualTo(true) } + // Turn-off the settings and index doc + val settingsBuilder = Settings.builder() + .put( IndexSettings.INDEX_PLUGINS_REPLICATION_TRANSLOG_RETENTION_LEASE_PRUNING_ENABLED_SETTING.key, false) + val settingsUpdateResponse = leaderClient.indices().putSettings(UpdateSettingsRequest(leaderIndexName) + .settings(settingsBuilder.build()), RequestOptions.DEFAULT) + Assert.assertEquals(settingsUpdateResponse.isAcknowledged, true) + val sourceMap = mapOf("name" to randomAlphaOfLength(5)) + leaderClient.index(IndexRequest(leaderIndexName).id("2").source(sourceMap), RequestOptions.DEFAULT) + assertBusy({ + followerClient.get(GetRequest(followerIndexName).id("2"), RequestOptions.DEFAULT).isExists + }, 30L, TimeUnit.SECONDS) } private fun addClusterMetadataBlock(clusterName: String, blockValue: String) { @@ -482,194 +415,163 @@ class StartReplicationIT: MultiClusterRestTestCase() { fun `test that dynamic index settings and alias are getting replicated `() { val followerClient = getClientForCluster(FOLLOWER) val leaderClient = getClientForCluster(LEADER) - setMetadataSyncDelay() - createConnectionBetweenClusters(FOLLOWER, LEADER) - var settings = Settings.builder() .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) .build() - val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName).settings(settings), RequestOptions.DEFAULT) assertThat(createIndexResponse.isAcknowledged).isTrue() - try { - followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName)) - assertBusy { - assertThat(followerClient.indices() - .exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT)) - .isEqualTo(true) - } - - settings = Settings.builder() - .build() - - followerClient.updateReplication( followerIndexName, settings) - - val getSettingsRequest = GetSettingsRequest() - getSettingsRequest.indices(followerIndexName) - getSettingsRequest.includeDefaults(true) - Assert.assertEquals( - "0", - followerClient.indices() - .getSettings(getSettingsRequest, RequestOptions.DEFAULT) - .indexToSettings[followerIndexName][IndexMetadata.SETTING_NUMBER_OF_REPLICAS] - ) - - settings = Settings.builder() - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2) - .put("routing.allocation.enable", "none") - .build() - - leaderClient.indices().putSettings(UpdateSettingsRequest(leaderIndexName).settings(settings), RequestOptions.DEFAULT) - - var indicesAliasesRequest = IndicesAliasesRequest() - var aliasAction = IndicesAliasesRequest.AliasActions.add() - .index(leaderIndexName) - .alias("alias1").filter("{\"term\":{\"year\":2016}}").routing("1") - indicesAliasesRequest.addAliasAction(aliasAction) - leaderClient.indices().updateAliases(indicesAliasesRequest, RequestOptions.DEFAULT) - - TimeUnit.SECONDS.sleep(SLEEP_TIME_BETWEEN_SYNC) - getSettingsRequest.indices(followerIndexName) - // Leader setting is copied + followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), + waitForRestore = true) + assertBusy { + assertThat(followerClient.indices() + .exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT)) + .isEqualTo(true) + } + settings = Settings.builder() + .build() + followerClient.updateReplication( followerIndexName, settings) + val getSettingsRequest = GetSettingsRequest() + getSettingsRequest.indices(followerIndexName) + getSettingsRequest.includeDefaults(true) + Assert.assertEquals( + "0", + followerClient.indices() + .getSettings(getSettingsRequest, RequestOptions.DEFAULT) + .indexToSettings[followerIndexName][IndexMetadata.SETTING_NUMBER_OF_REPLICAS] + ) + settings = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2) + .put("routing.allocation.enable", "none") + .build() + leaderClient.indices().putSettings(UpdateSettingsRequest(leaderIndexName).settings(settings), RequestOptions.DEFAULT) + var indicesAliasesRequest = IndicesAliasesRequest() + var aliasAction = IndicesAliasesRequest.AliasActions.add() + .index(leaderIndexName) + .alias("alias1").filter("{\"term\":{\"year\":2016}}").routing("1") + indicesAliasesRequest.addAliasAction(aliasAction) + leaderClient.indices().updateAliases(indicesAliasesRequest, RequestOptions.DEFAULT) + TimeUnit.SECONDS.sleep(SLEEP_TIME_BETWEEN_SYNC) + getSettingsRequest.indices(followerIndexName) + // Leader setting is copied + assertBusy({ Assert.assertEquals( - "2", - followerClient.indices() - .getSettings(getSettingsRequest, RequestOptions.DEFAULT) - .indexToSettings[followerIndexName][IndexMetadata.SETTING_NUMBER_OF_REPLICAS] + "2", + followerClient.indices() + .getSettings(getSettingsRequest, RequestOptions.DEFAULT) + .indexToSettings[followerIndexName][IndexMetadata.SETTING_NUMBER_OF_REPLICAS] ) assertEqualAliases() - - // Case 2 : Blocklisted setting are not copied - Assert.assertNull(followerClient.indices() - .getSettings(getSettingsRequest, RequestOptions.DEFAULT) - .indexToSettings[followerIndexName].get("index.routing.allocation.enable")) - - //Alias test case 2: Update existing alias - aliasAction = IndicesAliasesRequest.AliasActions.add() - .index(leaderIndexName) - .routing("2") - .alias("alias1") - .writeIndex(true) - .isHidden(false) - indicesAliasesRequest.addAliasAction(aliasAction) - leaderClient.indices().updateAliases(indicesAliasesRequest, RequestOptions.DEFAULT) - - //Use Update API - settings = Settings.builder() - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 3) - .put("index.routing.allocation.enable", "none") - .put("index.search.idle.after", "10s") - .build() - - followerClient.updateReplication( followerIndexName, settings) - TimeUnit.SECONDS.sleep(SLEEP_TIME_BETWEEN_SYNC) - - // Case 3 : Updated Settings take higher priority. Blocklisted settins shouldn't matter for that + }, 30L, TimeUnit.SECONDS) + // Case 2 : Blocklisted setting are not copied + Assert.assertNull(followerClient.indices() + .getSettings(getSettingsRequest, RequestOptions.DEFAULT) + .indexToSettings[followerIndexName].get("index.routing.allocation.enable")) + //Alias test case 2: Update existing alias + aliasAction = IndicesAliasesRequest.AliasActions.add() + .index(leaderIndexName) + .routing("2") + .alias("alias1") + .writeIndex(true) + .isHidden(false) + indicesAliasesRequest.addAliasAction(aliasAction) + leaderClient.indices().updateAliases(indicesAliasesRequest, RequestOptions.DEFAULT) + //Use Update API + settings = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 3) + .put("index.routing.allocation.enable", "none") + .put("index.search.idle.after", "10s") + .build() + followerClient.updateReplication( followerIndexName, settings) + TimeUnit.SECONDS.sleep(SLEEP_TIME_BETWEEN_SYNC) + // Case 3 : Updated Settings take higher priority. Blocklisted settins shouldn't matter for that + assertBusy({ Assert.assertEquals( - "3", - followerClient.indices() - .getSettings(getSettingsRequest, RequestOptions.DEFAULT) - .indexToSettings[followerIndexName][IndexMetadata.SETTING_NUMBER_OF_REPLICAS] + "3", + followerClient.indices() + .getSettings(getSettingsRequest, RequestOptions.DEFAULT) + .indexToSettings[followerIndexName][IndexMetadata.SETTING_NUMBER_OF_REPLICAS] ) - Assert.assertEquals( - "10s", - followerClient.indices() - .getSettings(getSettingsRequest, RequestOptions.DEFAULT) - .indexToSettings[followerIndexName]["index.search.idle.after"] + "10s", + followerClient.indices() + .getSettings(getSettingsRequest, RequestOptions.DEFAULT) + .indexToSettings[followerIndexName]["index.search.idle.after"] ) - Assert.assertEquals( - "none", - followerClient.indices() - .getSettings(getSettingsRequest, RequestOptions.DEFAULT) - .indexToSettings[followerIndexName]["index.routing.allocation.enable"] + "none", + followerClient.indices() + .getSettings(getSettingsRequest, RequestOptions.DEFAULT) + .indexToSettings[followerIndexName]["index.routing.allocation.enable"] ) - assertEqualAliases() - - //Clear the settings - settings = Settings.builder() - .build() - followerClient.updateReplication( followerIndexName, settings) - - //Alias test case 3: Delete one alias and add another alias - aliasAction = IndicesAliasesRequest.AliasActions.remove() - .index(leaderIndexName) - .alias("alias1") - indicesAliasesRequest.addAliasAction(aliasAction - ) - leaderClient.indices().updateAliases(indicesAliasesRequest, RequestOptions.DEFAULT) - var aliasAction2 = IndicesAliasesRequest.AliasActions.add() - .index(leaderIndexName) - .routing("12") - .alias("alias2") - .indexRouting("indexRouting") - indicesAliasesRequest.addAliasAction(aliasAction2) - - TimeUnit.SECONDS.sleep(SLEEP_TIME_BETWEEN_SYNC) - + }, 30L, TimeUnit.SECONDS) + //Clear the settings + settings = Settings.builder() + .build() + followerClient.updateReplication( followerIndexName, settings) + //Alias test case 3: Delete one alias and add another alias + aliasAction = IndicesAliasesRequest.AliasActions.remove() + .index(leaderIndexName) + .alias("alias1") + indicesAliasesRequest.addAliasAction(aliasAction + ) + leaderClient.indices().updateAliases(indicesAliasesRequest, RequestOptions.DEFAULT) + var aliasAction2 = IndicesAliasesRequest.AliasActions.add() + .index(leaderIndexName) + .routing("12") + .alias("alias2") + .indexRouting("indexRouting") + indicesAliasesRequest.addAliasAction(aliasAction2) + TimeUnit.SECONDS.sleep(SLEEP_TIME_BETWEEN_SYNC) + assertBusy({ Assert.assertEquals( - null, - followerClient.indices() - .getSettings(getSettingsRequest, RequestOptions.DEFAULT) - .indexToSettings[followerIndexName]["index.search.idle.after"] + null, + followerClient.indices() + .getSettings(getSettingsRequest, RequestOptions.DEFAULT) + .indexToSettings[followerIndexName]["index.search.idle.after"] ) - assertEqualAliases() - } finally { - followerClient.stopReplication(followerIndexName) - } - + }, 30L, TimeUnit.SECONDS) } fun `test that static index settings are getting replicated `() { val followerClient = getClientForCluster(FOLLOWER) val leaderClient = getClientForCluster(LEADER) - setMetadataSyncDelay() - createConnectionBetweenClusters(FOLLOWER, LEADER) - var settings = Settings.builder() .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) .build() - val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName).settings(settings), RequestOptions.DEFAULT) assertThat(createIndexResponse.isAcknowledged).isTrue() - try { - followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName)) - assertBusy { - assertThat(followerClient.indices() - .exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT)) - .isEqualTo(true) - } - val getSettingsRequest = GetSettingsRequest() - getSettingsRequest.indices(followerIndexName) - Assert.assertEquals( - "1", - followerClient.indices() - .getSettings(getSettingsRequest, RequestOptions.DEFAULT) - .indexToSettings[followerIndexName][IndexMetadata.SETTING_NUMBER_OF_REPLICAS] - ) - - settings = Settings.builder() - .put("index.shard.check_on_startup", "checksum") - .build() - followerClient.updateReplication(followerIndexName, settings) - - TimeUnit.SECONDS.sleep(SLEEP_TIME_BETWEEN_SYNC) - Assert.assertEquals( - "checksum", - followerClient.indices() - .getSettings(getSettingsRequest, RequestOptions.DEFAULT) - .indexToSettings[followerIndexName]["index.shard.check_on_startup"] - ) - } finally { - followerClient.stopReplication(followerIndexName) + followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), + waitForRestore = true) + assertBusy { + assertThat(followerClient.indices() + .exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT)) + .isEqualTo(true) } + val getSettingsRequest = GetSettingsRequest() + getSettingsRequest.indices(followerIndexName) + Assert.assertEquals( + "1", + followerClient.indices() + .getSettings(getSettingsRequest, RequestOptions.DEFAULT) + .indexToSettings[followerIndexName][IndexMetadata.SETTING_NUMBER_OF_REPLICAS] + ) + settings = Settings.builder() + .put("index.shard.check_on_startup", "checksum") + .build() + followerClient.updateReplication(followerIndexName, settings) + TimeUnit.SECONDS.sleep(SLEEP_TIME_BETWEEN_SYNC) + Assert.assertEquals( + "checksum", + followerClient.indices() + .getSettings(getSettingsRequest, RequestOptions.DEFAULT) + .indexToSettings[followerIndexName]["index.shard.check_on_startup"] + ) } fun `test that replication fails to start when custom analyser is not present in follower`() { @@ -682,7 +584,6 @@ class StartReplicationIT: MultiClusterRestTestCase() { val settings: Settings = Settings.builder().loadFromStream(synonymsJson, javaClass.getResourceAsStream(synonymsJson), false) .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) .build() - val leaderClient = getClientForCluster(LEADER) val followerClient = getClientForCluster(FOLLOWER) try { @@ -692,9 +593,7 @@ class StartReplicationIT: MultiClusterRestTestCase() { } catch (e: Exception) { assumeNoException("Ignored test as analyzer setting could not be added", e) } - createConnectionBetweenClusters(FOLLOWER, LEADER) - assertThatThrownBy { followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName)) }.isInstanceOf(ResponseException::class.java).hasMessageContaining("resource_not_found_exception") @@ -714,11 +613,9 @@ class StartReplicationIT: MultiClusterRestTestCase() { try { Files.copy(synonyms, leaderSynonymPath) Files.copy(synonyms, followerSynonymPath) - val settings: Settings = Settings.builder().loadFromStream(synonymsJson, javaClass.getResourceAsStream(synonymsJson), false) .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) .build() - val leaderClient = getClientForCluster(LEADER) val followerClient = getClientForCluster(FOLLOWER) try { @@ -728,16 +625,11 @@ class StartReplicationIT: MultiClusterRestTestCase() { } catch (e: Exception) { assumeNoException("Ignored test as analyzer setting could not be added", e) } - createConnectionBetweenClusters(FOLLOWER, LEADER) - - try { - followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName)) - assertBusy { - assertThat(followerClient.indices().exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT)).isEqualTo(true) - } - } finally { - followerClient.stopReplication(followerIndexName) + followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), + waitForRestore = true) + assertBusy { + assertThat(followerClient.indices().exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT)).isEqualTo(true) } } finally { if (Files.exists(leaderSynonymPath)) { @@ -759,11 +651,9 @@ class StartReplicationIT: MultiClusterRestTestCase() { try { Files.copy(synonyms, leaderSynonymPath) Files.copy(synonyms, followerSynonymPath) - val settings: Settings = Settings.builder().loadFromStream(synonymsJson, javaClass.getResourceAsStream(synonymsJson), false) .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) .build() - val leaderClient = getClientForCluster(LEADER) val followerClient = getClientForCluster(FOLLOWER) try { @@ -773,19 +663,14 @@ class StartReplicationIT: MultiClusterRestTestCase() { } catch (e: Exception) { assumeNoException("Ignored test as analyzer setting could not be added", e) } - createConnectionBetweenClusters(FOLLOWER, LEADER) - - try { - val overriddenSettings: Settings = Settings.builder() - .put("index.analysis.filter.my_filter.synonyms_path", synonymFollowerFilename) - .build() - followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName, overriddenSettings)) - assertBusy { - assertThat(followerClient.indices().exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT)).isEqualTo(true) - } - } finally { - followerClient.stopReplication(followerIndexName) + val overriddenSettings: Settings = Settings.builder() + .put("index.analysis.filter.my_filter.synonyms_path", synonymFollowerFilename) + .build() + followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName, overriddenSettings), + waitForRestore = true) + assertBusy { + assertThat(followerClient.indices().exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT)).isEqualTo(true) } } finally { if (Files.exists(leaderSynonymPath)) { @@ -800,71 +685,58 @@ class StartReplicationIT: MultiClusterRestTestCase() { fun `test that follower index cannot be deleted after starting replication`() { val followerClient = getClientForCluster(FOLLOWER) val leaderClient = getClientForCluster(LEADER) - createConnectionBetweenClusters(FOLLOWER, LEADER) - val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) assertThat(createIndexResponse.isAcknowledged).isTrue() - - try { - followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName)) - assertBusy { - assertThat(followerClient.indices().exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT)).isEqualTo(true) - } - // Need to wait till index blocks appear into state - assertBusy { - val clusterBlocksResponse = followerClient.lowLevelClient.performRequest(Request("GET", "/_cluster/state/blocks")) - val clusterResponseString = EntityUtils.toString(clusterBlocksResponse.entity) - assertThat(clusterResponseString.contains("cross-cluster-replication")) - .withFailMessage("Cant find replication block afer starting replication") - .isTrue() - } - // Delete index - assertThatThrownBy { - followerClient.indices().delete(DeleteIndexRequest(followerIndexName), RequestOptions.DEFAULT) - }.isInstanceOf(OpenSearchStatusException::class.java).hasMessageContaining("cluster_block_exception") - // Close index - assertThatThrownBy { - followerClient.indices().close(CloseIndexRequest(followerIndexName), RequestOptions.DEFAULT) - }.isInstanceOf(OpenSearchStatusException::class.java).hasMessageContaining("cluster_block_exception") - // Index document - assertThatThrownBy { - val sourceMap = mapOf("name" to randomAlphaOfLength(5)) - followerClient.index(IndexRequest(followerIndexName).id("1").source(sourceMap), RequestOptions.DEFAULT) - }.isInstanceOf(OpenSearchStatusException::class.java).hasMessageContaining("cluster_block_exception") - } finally { - followerClient.stopReplication(followerIndexName) + followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), + waitForRestore = true) + assertBusy { + assertThat(followerClient.indices().exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT)).isEqualTo(true) + } + // Need to wait till index blocks appear into state + assertBusy { + val clusterBlocksResponse = followerClient.lowLevelClient.performRequest(Request("GET", "/_cluster/state/blocks")) + val clusterResponseString = EntityUtils.toString(clusterBlocksResponse.entity) + assertThat(clusterResponseString.contains("cross-cluster-replication")) + .withFailMessage("Cant find replication block afer starting replication") + .isTrue() } + // Delete index + assertThatThrownBy { + followerClient.indices().delete(DeleteIndexRequest(followerIndexName), RequestOptions.DEFAULT) + }.isInstanceOf(OpenSearchStatusException::class.java).hasMessageContaining("cluster_block_exception") + // Close index + assertThatThrownBy { + followerClient.indices().close(CloseIndexRequest(followerIndexName), RequestOptions.DEFAULT) + }.isInstanceOf(OpenSearchStatusException::class.java).hasMessageContaining("cluster_block_exception") + // Index document + assertThatThrownBy { + val sourceMap = mapOf("name" to randomAlphaOfLength(5)) + followerClient.index(IndexRequest(followerIndexName).id("1").source(sourceMap), RequestOptions.DEFAULT) + }.isInstanceOf(OpenSearchStatusException::class.java).hasMessageContaining("cluster_block_exception") } fun `test that replication gets paused if the leader index is deleted`() { val followerClient = getClientForCluster(FOLLOWER) val leaderClient = getClientForCluster(LEADER) - createConnectionBetweenClusters(FOLLOWER, LEADER) - val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) assertThat(createIndexResponse.isAcknowledged).isTrue() - - try { - followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName)) - assertBusy { - assertThat(followerClient.indices().exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT)).isEqualTo(true) - } - assertBusy { - var statusResp = followerClient.replicationStatus(followerIndexName) - `validate status syncing response`(statusResp) - } - val deleteIndexResponse = leaderClient.indices().delete(DeleteIndexRequest(leaderIndexName), RequestOptions.DEFAULT) - assertThat(deleteIndexResponse.isAcknowledged).isTrue() - - assertBusy({ - var statusResp = followerClient.replicationStatus(followerIndexName) - `validate paused status response due to leader index deleted`(statusResp) - }, 15, TimeUnit.SECONDS) - } finally { - followerClient.stopReplication(followerIndexName) + followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), + waitForRestore = true) + assertBusy { + assertThat(followerClient.indices().exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT)).isEqualTo(true) } + assertBusy { + var statusResp = followerClient.replicationStatus(followerIndexName) + `validate status syncing response`(statusResp) + } + val deleteIndexResponse = leaderClient.indices().delete(DeleteIndexRequest(leaderIndexName), RequestOptions.DEFAULT) + assertThat(deleteIndexResponse.isAcknowledged).isTrue() + assertBusy({ + var statusResp = followerClient.replicationStatus(followerIndexName) + `validate paused status response due to leader index deleted`(statusResp) + }, 15, TimeUnit.SECONDS) } fun `test forcemerge on leader during replication bootstrap`() { @@ -876,7 +748,6 @@ class StartReplicationIT: MultiClusterRestTestCase() { val followerClient = getClientForCluster(FOLLOWER) val leaderClient = getClientForCluster(LEADER) createConnectionBetweenClusters(FOLLOWER, LEADER) - val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName).settings(settings), RequestOptions.DEFAULT) assertThat(createIndexResponse.isAcknowledged).isTrue() @@ -886,28 +757,22 @@ class StartReplicationIT: MultiClusterRestTestCase() { assertThat(leaderClient.indices() .exists(GetIndexRequest(leaderIndexName), RequestOptions.DEFAULT)) } - try { - followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), - TimeValue.timeValueSeconds(10), - false) - //Given the size of index, the replication should be in RESTORING phase at this point - leaderClient.indices().forcemerge(ForceMergeRequest(leaderIndexName), RequestOptions.DEFAULT) - - assertBusy { - var statusResp = followerClient.replicationStatus(followerIndexName) - `validate status syncing response`(statusResp) - } - TimeUnit.SECONDS.sleep(30) - - assertBusy ({ - Assert.assertEquals(leaderClient.count(CountRequest(leaderIndexName), RequestOptions.DEFAULT).toString(), - followerClient.count(CountRequest(followerIndexName), RequestOptions.DEFAULT).toString()) - }, - 30, TimeUnit.SECONDS - ) - } finally { - followerClient.stopReplication(followerIndexName) + followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), + TimeValue.timeValueSeconds(10), + false) + //Given the size of index, the replication should be in RESTORING phase at this point + leaderClient.indices().forcemerge(ForceMergeRequest(leaderIndexName), RequestOptions.DEFAULT) + assertBusy { + var statusResp = followerClient.replicationStatus(followerIndexName) + `validate status syncing response`(statusResp) } + TimeUnit.SECONDS.sleep(30) + assertBusy ({ + Assert.assertEquals(leaderClient.count(CountRequest(leaderIndexName), RequestOptions.DEFAULT).toString(), + followerClient.count(CountRequest(followerIndexName), RequestOptions.DEFAULT).toString()) + }, + 30, TimeUnit.SECONDS + ) } fun `test that snapshot on leader does not affect replication during bootstrap`() { @@ -919,15 +784,11 @@ class StartReplicationIT: MultiClusterRestTestCase() { val followerClient = getClientForCluster(FOLLOWER) val leaderClient = getClientForCluster(LEADER) createConnectionBetweenClusters(FOLLOWER, LEADER) - val repoPath = PathUtils.get(buildDir, repoPath) - val putRepositoryRequest = PutRepositoryRequest("my-repo") .type(FsRepository.TYPE) .settings("{\"location\": \"$repoPath\"}", XContentType.JSON) - leaderClient.snapshot().createRepository(putRepositoryRequest, RequestOptions.DEFAULT) - val createIndexResponse = leaderClient.indices().create( CreateIndexRequest(leaderIndexName).settings(settings), RequestOptions.DEFAULT @@ -941,30 +802,25 @@ class StartReplicationIT: MultiClusterRestTestCase() { .exists(GetIndexRequest(leaderIndexName), RequestOptions.DEFAULT) ) } - try { - followerClient.startReplication( - StartReplicationRequest("source", leaderIndexName, followerIndexName), - TimeValue.timeValueSeconds(10), - false - ) - //Given the size of index, the replication should be in RESTORING phase at this point - leaderClient.snapshot().create(CreateSnapshotRequest("my-repo", "snapshot_1").indices(leaderIndexName), RequestOptions.DEFAULT) - - assertBusy({ - var statusResp = followerClient.replicationStatus(followerIndexName) - `validate status syncing response`(statusResp) - }, 30, TimeUnit.SECONDS - ) - assertBusy({ - Assert.assertEquals( - leaderClient.count(CountRequest(leaderIndexName), RequestOptions.DEFAULT).toString(), - followerClient.count(CountRequest(followerIndexName), RequestOptions.DEFAULT).toString() - )}, - 30, TimeUnit.SECONDS - ) - } finally { - followerClient.stopReplication(followerIndexName) - } + followerClient.startReplication( + StartReplicationRequest("source", leaderIndexName, followerIndexName), + TimeValue.timeValueSeconds(10), + false + ) + //Given the size of index, the replication should be in RESTORING phase at this point + leaderClient.snapshot().create(CreateSnapshotRequest("my-repo", "snapshot_1").indices(leaderIndexName), RequestOptions.DEFAULT) + assertBusy({ + var statusResp = followerClient.replicationStatus(followerIndexName) + `validate status syncing response`(statusResp) + }, 30, TimeUnit.SECONDS + ) + assertBusy({ + Assert.assertEquals( + leaderClient.count(CountRequest(leaderIndexName), RequestOptions.DEFAULT).toString(), + followerClient.count(CountRequest(followerIndexName), RequestOptions.DEFAULT).toString() + )}, + 30, TimeUnit.SECONDS + ) } fun `test that replication cannot be started when soft delete is disabled`() { @@ -989,64 +845,48 @@ class StartReplicationIT: MultiClusterRestTestCase() { fun `test leader stats`() { val followerClient = getClientForCluster(FOLLOWER) val leaderClient = getClientForCluster(LEADER) - createConnectionBetweenClusters(FOLLOWER, LEADER) - val settings = Settings.builder() .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 2) .build() - val createIndexResponse = leaderClient.indices().create( CreateIndexRequest(leaderIndexName).settings(settings), RequestOptions.DEFAULT ) assertThat(createIndexResponse.isAcknowledged).isTrue() - - try { - followerClient.startReplication( - StartReplicationRequest("source", leaderIndexName, followerIndexName), - TimeValue.timeValueSeconds(10), - true - ) - - val docCount = 50 - - for (i in 1..docCount) { - val sourceMap = mapOf("name" to randomAlphaOfLength(5)) - leaderClient.index(IndexRequest(leaderIndexName).id(i.toString()).source(sourceMap), RequestOptions.DEFAULT) - } - - assertBusy { - val stats = leaderClient.leaderStats() - assertThat(stats.size).isEqualTo(9) - assertThat(stats.getValue("num_replicated_indices").toString()).isEqualTo("1") - assertThat(stats.getValue("operations_read").toString()).isEqualTo(docCount.toString()) - assertThat(stats.getValue("operations_read_lucene").toString()).isEqualTo("0") - assertThat(stats.getValue("operations_read_translog").toString()).isEqualTo(docCount.toString()) - assertThat(stats.containsKey("index_stats")) - } - - } finally { - followerClient.stopReplication(followerIndexName) + followerClient.startReplication( + StartReplicationRequest("source", leaderIndexName, followerIndexName), + TimeValue.timeValueSeconds(10), + true + ) + val docCount = 50 + for (i in 1..docCount) { + val sourceMap = mapOf("name" to randomAlphaOfLength(5)) + leaderClient.index(IndexRequest(leaderIndexName).id(i.toString()).source(sourceMap), RequestOptions.DEFAULT) } + // Have to wait until the new operations are available to read at the leader cluster + assertBusy({ + val stats = leaderClient.leaderStats() + assertThat(stats.size).isEqualTo(9) + assertThat(stats.getValue("num_replicated_indices").toString()).isEqualTo("1") + assertThat(stats.getValue("operations_read").toString()).isEqualTo(docCount.toString()) + assertThat(stats.getValue("operations_read_lucene").toString()).isEqualTo("0") + assertThat(stats.getValue("operations_read_translog").toString()).isEqualTo(docCount.toString()) + assertThat(stats.containsKey("index_stats")) + }, 60L, TimeUnit.SECONDS) } fun `test follower stats`() { val followerClient = getClientForCluster(FOLLOWER) val leaderClient = getClientForCluster(LEADER) - val followerIndex2 = "follower_index_2" val followerIndex3 = "follower_index_3" - createConnectionBetweenClusters(FOLLOWER, LEADER) - val createIndexResponse = leaderClient.indices().create( CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT ) assertThat(createIndexResponse.isAcknowledged).isTrue() - - try { followerClient.startReplication( StartReplicationRequest("source", leaderIndexName, followerIndexName), TimeValue.timeValueSeconds(10), @@ -1067,7 +907,6 @@ class StartReplicationIT: MultiClusterRestTestCase() { val sourceMap = mapOf("name" to randomAlphaOfLength(5)) leaderClient.index(IndexRequest(leaderIndexName).id(i.toString()).source(sourceMap), RequestOptions.DEFAULT) } - followerClient.pauseReplication(followerIndex2) followerClient.stopReplication(followerIndex3) assertBusy({ @@ -1084,26 +923,18 @@ class StartReplicationIT: MultiClusterRestTestCase() { assertThat(stats.containsKey("index_stats")) assertThat(stats.size).isEqualTo(16) }, 60L, TimeUnit.SECONDS) - } finally { - followerClient.stopReplication(followerIndexName) - followerClient.stopReplication(followerIndex2) - } } fun `test that replication cannot be started on invalid indexName`() { val followerClient = getClientForCluster(FOLLOWER) val leaderClient = getClientForCluster(LEADER) - createConnectionBetweenClusters(FOLLOWER, LEADER) - val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName).alias(Alias("leaderAlias")), RequestOptions.DEFAULT) assertThat(createIndexResponse.isAcknowledged).isTrue() - assertValidationFailure(followerClient, "leaderIndex", followerIndexName, "Value leaderIndex must be lowercase") assertValidationFailure(followerClient, "leaderindex", "followerIndex", "Value followerIndex must be lowercase") - assertValidationFailure(followerClient, "test*", followerIndexName, "Value test* must not contain the following characters") assertValidationFailure(followerClient, "test#", followerIndexName, @@ -1114,10 +945,8 @@ class StartReplicationIT: MultiClusterRestTestCase() { "Value . must not be '.' or '..'") assertValidationFailure(followerClient, "..", followerIndexName, "Value .. must not be '.' or '..'") - assertValidationFailure(followerClient, "_leader", followerIndexName, "Value _leader must not start with '_' or '-' or '+'") - assertValidationFailure(followerClient, "-leader", followerIndexName, "Value -leader must not start with '_' or '-' or '+'") assertValidationFailure(followerClient, "+leader", followerIndexName, @@ -1131,31 +960,22 @@ class StartReplicationIT: MultiClusterRestTestCase() { fun `test that replication is not started when start block is set`() { val followerClient = getClientForCluster(FOLLOWER) val leaderClient = getClientForCluster(LEADER) - createConnectionBetweenClusters(FOLLOWER, LEADER) val createIndexResponse = leaderClient.indices().create( CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT ) assertThat(createIndexResponse.isAcknowledged).isTrue() - // Setting to add replication start block followerClient.updateReplicationStartBlockSetting(true) - assertThatThrownBy { followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), waitForRestore = true) } .isInstanceOf(ResponseException::class.java) .hasMessageContaining("[FORBIDDEN] Replication START block is set") - // Remove replication start block and start replication followerClient.updateReplicationStartBlockSetting(false) - - try { followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), waitForRestore = true) - } finally { - followerClient.stopReplication(followerIndexName) - } } private fun assertValidationFailure(client: RestHighLevelClient, leader: String, follower: String, errrorMsg: String) { @@ -1171,10 +991,8 @@ class StartReplicationIT: MultiClusterRestTestCase() { var getAliasesRequest = GetAliasesRequest().indices(followerIndexName) var aliasRespone = followerClient.indices().getAlias(getAliasesRequest, RequestOptions.DEFAULT) var followerAliases = aliasRespone.aliases.get(followerIndexName) - aliasRespone = leaderClient.indices().getAlias(GetAliasesRequest().indices(leaderIndexName), RequestOptions.DEFAULT) var leaderAliases = aliasRespone.aliases.get(leaderIndexName) - Assert.assertEquals(followerAliases, leaderAliases) } } diff --git a/src/test/kotlin/org/opensearch/replication/integ/rest/StopReplicationIT.kt b/src/test/kotlin/org/opensearch/replication/integ/rest/StopReplicationIT.kt index 09b797ae..abfe247c 100644 --- a/src/test/kotlin/org/opensearch/replication/integ/rest/StopReplicationIT.kt +++ b/src/test/kotlin/org/opensearch/replication/integ/rest/StopReplicationIT.kt @@ -229,7 +229,6 @@ class StopReplicationIT: MultiClusterRestTestCase() { .withFailMessage("Cant find replication block after starting replication") .isTrue() }, 10, TimeUnit.SECONDS) - // Remove leader cluster from settings val settings: Settings = Settings.builder() .putNull("cluster.remote.source.seeds") @@ -237,7 +236,6 @@ class StopReplicationIT: MultiClusterRestTestCase() { val updateSettingsRequest = ClusterUpdateSettingsRequest() updateSettingsRequest.persistentSettings(settings) followerClient.cluster().putSettings(updateSettingsRequest, RequestOptions.DEFAULT) - followerClient.stopReplication(followerIndexName) val sourceMap = mapOf("name" to randomAlphaOfLength(5)) followerClient.index(IndexRequest(followerIndexName).id("2").source(sourceMap), RequestOptions.DEFAULT) @@ -247,71 +245,54 @@ class StopReplicationIT: MultiClusterRestTestCase() { val followerClient = getClientForCluster(FOLLOWER) val leaderClient = getClientForCluster(LEADER) createConnectionBetweenClusters(FOLLOWER, LEADER, "source") - val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) assertThat(createIndexResponse.isAcknowledged).isTrue() val snapshotSuffix = Random().nextInt(1000).toString() - - try { - followerClient.startReplication( - StartReplicationRequest("source", leaderIndexName, followerIndexName), - TimeValue.timeValueSeconds(10), - true - ) - - assertBusy({ - var statusResp = followerClient.replicationStatus(followerIndexName) - `validate status syncing response`(statusResp) - assertThat(followerClient.getShardReplicationTasks(followerIndexName)).isNotEmpty() - }, 60, TimeUnit.SECONDS) - - // Trigger snapshot on the follower cluster - val createSnapshotRequest = CreateSnapshotRequest(TestCluster.FS_SNAPSHOT_REPO, "test-$snapshotSuffix") - createSnapshotRequest.waitForCompletion(true) - followerClient.snapshot().create(createSnapshotRequest, RequestOptions.DEFAULT) - - assertBusy { - var snapshotStatusResponse = followerClient.snapshot().status(SnapshotsStatusRequest(TestCluster.FS_SNAPSHOT_REPO, - arrayOf("test-$snapshotSuffix")), RequestOptions.DEFAULT) - for (snapshotStatus in snapshotStatusResponse.snapshots) { - Assert.assertEquals(SnapshotsInProgress.State.SUCCESS, snapshotStatus.state) - } - } - - // Restore follower index on leader cluster - val restoreSnapshotRequest = RestoreSnapshotRequest(TestCluster.FS_SNAPSHOT_REPO, "test-$snapshotSuffix") - restoreSnapshotRequest.indices(followerIndexName) - restoreSnapshotRequest.waitForCompletion(true) - restoreSnapshotRequest.renamePattern("(.+)") - restoreSnapshotRequest.renameReplacement("restored-\$1") - leaderClient.snapshot().restore(restoreSnapshotRequest, RequestOptions.DEFAULT) - - assertBusy { - assertThat(leaderClient.indices().exists(GetIndexRequest("restored-$followerIndexName"), RequestOptions.DEFAULT)).isEqualTo(true) + followerClient.startReplication( + StartReplicationRequest("source", leaderIndexName, followerIndexName), + TimeValue.timeValueSeconds(10), + true + ) + assertBusy({ + var statusResp = followerClient.replicationStatus(followerIndexName) + `validate status syncing response`(statusResp) + assertThat(followerClient.getShardReplicationTasks(followerIndexName)).isNotEmpty() + }, 60, TimeUnit.SECONDS) + // Trigger snapshot on the follower cluster + val createSnapshotRequest = CreateSnapshotRequest(TestCluster.FS_SNAPSHOT_REPO, "test-$snapshotSuffix") + createSnapshotRequest.waitForCompletion(true) + followerClient.snapshot().create(createSnapshotRequest, RequestOptions.DEFAULT) + assertBusy { + var snapshotStatusResponse = followerClient.snapshot().status(SnapshotsStatusRequest(TestCluster.FS_SNAPSHOT_REPO, + arrayOf("test-$snapshotSuffix")), RequestOptions.DEFAULT) + for (snapshotStatus in snapshotStatusResponse.snapshots) { + Assert.assertEquals(SnapshotsInProgress.State.SUCCESS, snapshotStatus.state) } - - // Invoke stop on the new leader cluster index - assertThatThrownBy { leaderClient.stopReplication("restored-$followerIndexName") } - .isInstanceOf(ResponseException::class.java) - .hasMessageContaining("Metadata for restored-$followerIndexName doesn't exist") - - // Start replication on the new leader index - followerClient.startReplication( - StartReplicationRequest("source", "restored-$followerIndexName", "restored-$followerIndexName"), - TimeValue.timeValueSeconds(10), - true, true - ) - - assertBusy({ - var statusResp = followerClient.replicationStatus("restored-$followerIndexName") - `validate status syncing response`(statusResp) - assertThat(followerClient.getShardReplicationTasks("restored-$followerIndexName")).isNotEmpty() - }, 60, TimeUnit.SECONDS) - - } finally { - followerClient.stopReplication("restored-$followerIndexName") - followerClient.stopReplication(followerIndexName) } - + // Restore follower index on leader cluster + val restoreSnapshotRequest = RestoreSnapshotRequest(TestCluster.FS_SNAPSHOT_REPO, "test-$snapshotSuffix") + restoreSnapshotRequest.indices(followerIndexName) + restoreSnapshotRequest.waitForCompletion(true) + restoreSnapshotRequest.renamePattern("(.+)") + restoreSnapshotRequest.renameReplacement("restored-\$1") + leaderClient.snapshot().restore(restoreSnapshotRequest, RequestOptions.DEFAULT) + assertBusy { + assertThat(leaderClient.indices().exists(GetIndexRequest("restored-$followerIndexName"), RequestOptions.DEFAULT)).isEqualTo(true) + } + // Invoke stop on the new leader cluster index + assertThatThrownBy { leaderClient.stopReplication("restored-$followerIndexName") } + .isInstanceOf(ResponseException::class.java) + .hasMessageContaining("Metadata for restored-$followerIndexName doesn't exist") + // Start replication on the new leader index + followerClient.startReplication( + StartReplicationRequest("source", "restored-$followerIndexName", "restored-$followerIndexName"), + TimeValue.timeValueSeconds(10), + true, true + ) + assertBusy({ + var statusResp = followerClient.replicationStatus("restored-$followerIndexName") + `validate status syncing response`(statusResp) + assertThat(followerClient.getShardReplicationTasks("restored-$followerIndexName")).isNotEmpty() + }, 60, TimeUnit.SECONDS) } } diff --git a/src/test/kotlin/org/opensearch/replication/integ/rest/UpdateAutoFollowPatternIT.kt b/src/test/kotlin/org/opensearch/replication/integ/rest/UpdateAutoFollowPatternIT.kt index c71dab10..62c52414 100644 --- a/src/test/kotlin/org/opensearch/replication/integ/rest/UpdateAutoFollowPatternIT.kt +++ b/src/test/kotlin/org/opensearch/replication/integ/rest/UpdateAutoFollowPatternIT.kt @@ -67,21 +67,17 @@ class UpdateAutoFollowPatternIT: MultiClusterRestTestCase() { val leaderIndexName = createRandomIndex(leaderClient) var leaderIndexNameNew = "" createConnectionBetweenClusters(FOLLOWER, LEADER, connectionAlias) - try { followerClient.updateAutoFollowPattern(connectionAlias, indexPatternName, indexPattern) - // Verify that existing index matching the pattern are replicated. assertBusy ({ Assertions.assertThat(followerClient.indices() .exists(GetIndexRequest(leaderIndexName), RequestOptions.DEFAULT)) .isEqualTo(true) }, 30, TimeUnit.SECONDS) - assertBusy ({ Assertions.assertThat(getAutoFollowTasks(FOLLOWER).size).isEqualTo(1) }, 10, TimeUnit.SECONDS) - leaderIndexNameNew = createRandomIndex(leaderClient) // Verify that newly created index on leader which match the pattern are also replicated. assertBusy ({ @@ -101,8 +97,6 @@ class UpdateAutoFollowPatternIT: MultiClusterRestTestCase() { }, 60, TimeUnit.SECONDS) } finally { followerClient.deleteAutoFollowPattern(connectionAlias, indexPatternName) - followerClient.stopReplication(leaderIndexName, false) - followerClient.stopReplication(leaderIndexNameNew) } } @@ -111,20 +105,16 @@ class UpdateAutoFollowPatternIT: MultiClusterRestTestCase() { val leaderClient = getClientForCluster(LEADER) var leaderIndexNameNew = "" createConnectionBetweenClusters(FOLLOWER, LEADER, connectionAlias) - try { // Set poll duration to 30sec from 60sec (default) val settings = Settings.builder().put(ReplicationPlugin.REPLICATION_AUTOFOLLOW_REMOTE_INDICES_POLL_INTERVAL.key, TimeValue.timeValueSeconds(30)) val clusterUpdateSetttingsReq = ClusterUpdateSettingsRequest().persistentSettings(settings) val clusterUpdateResponse = followerClient.cluster().putSettings(clusterUpdateSetttingsReq, RequestOptions.DEFAULT) - var lastExecutionTime = 0L var stats = followerClient.AutoFollowStats() - Assert.assertTrue(clusterUpdateResponse.isAcknowledged) followerClient.updateAutoFollowPattern(connectionAlias, indexPatternName, indexPattern) - leaderIndexNameNew = createRandomIndex(leaderClient) // Verify that newly created index on leader which match the pattern are also replicated. assertBusy({ @@ -139,9 +129,7 @@ class UpdateAutoFollowPatternIT: MultiClusterRestTestCase() { lastExecutionTime = key["last_execution_time"]!! as Long } } - }, 30, TimeUnit.SECONDS) - assertBusy({ var af_stats = stats.get("autofollow_stats")!! as ArrayList> for (key in af_stats) { @@ -150,11 +138,8 @@ class UpdateAutoFollowPatternIT: MultiClusterRestTestCase() { } } }, 40, TimeUnit.SECONDS) - - } finally { followerClient.deleteAutoFollowPattern(connectionAlias, indexPatternName) - followerClient.stopReplication(leaderIndexNameNew) } } @@ -164,14 +149,11 @@ class UpdateAutoFollowPatternIT: MultiClusterRestTestCase() { val leaderClient = getClientForCluster(LEADER) val leaderIndexName = createRandomIndex(leaderClient) createConnectionBetweenClusters(FOLLOWER, LEADER, connectionAlias) - try { val settings = Settings.builder() .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 3) .build() - followerClient.updateAutoFollowPattern(connectionAlias, indexPatternName, indexPattern, settings) - // Verify that existing index matching the pattern are replicated. assertBusy ({ Assertions.assertThat(followerClient.indices() @@ -179,8 +161,6 @@ class UpdateAutoFollowPatternIT: MultiClusterRestTestCase() { .isEqualTo(true) }, 30, TimeUnit.SECONDS) Assertions.assertThat(getAutoFollowTasks(FOLLOWER).size).isEqualTo(1) - - val getSettingsRequest = GetSettingsRequest() getSettingsRequest.indices(leaderIndexName) getSettingsRequest.includeDefaults(true) @@ -195,7 +175,6 @@ class UpdateAutoFollowPatternIT: MultiClusterRestTestCase() { }, 15, TimeUnit.SECONDS) } finally { followerClient.deleteAutoFollowPattern(connectionAlias, indexPatternName) - followerClient.stopReplication(leaderIndexName) } } @@ -203,27 +182,22 @@ class UpdateAutoFollowPatternIT: MultiClusterRestTestCase() { val indexPatternName2 = "test_pattern2" val indexPattern2 = "lead_index*" val leaderIndexName2 = "lead_index1" - val followerClient = getClientForCluster(FOLLOWER) val leaderClient = getClientForCluster(LEADER) val leaderIndexName = createRandomIndex(leaderClient) leaderClient.indices().create(CreateIndexRequest(leaderIndexName2), RequestOptions.DEFAULT) createConnectionBetweenClusters(FOLLOWER, LEADER, connectionAlias) - try { followerClient.updateAutoFollowPattern(connectionAlias, indexPatternName, indexPattern) followerClient.updateAutoFollowPattern(connectionAlias, indexPatternName2, indexPattern2) - assertBusy ({ Assertions.assertThat(getAutoFollowTasks(FOLLOWER).size).isEqualTo(2) }, 30, TimeUnit.SECONDS) - // Verify that existing index matching the pattern are replicated. assertBusy ({ Assertions.assertThat(followerClient.indices() .exists(GetIndexRequest(leaderIndexName2), RequestOptions.DEFAULT)) .isEqualTo(true) - var stats = followerClient.AutoFollowStats() Assertions.assertThat(stats.size).isEqualTo(5) assert(stats["num_success_start_replication"]!! as Int == 2) @@ -237,9 +211,7 @@ class UpdateAutoFollowPatternIT: MultiClusterRestTestCase() { followerClient.deleteAutoFollowPattern(connectionAlias, indexPatternName) followerClient.deleteAutoFollowPattern(connectionAlias, indexPatternName2) followerClient.waitForShardTaskStart(leaderIndexName) - followerClient.stopReplication(leaderIndexName) followerClient.waitForShardTaskStart(leaderIndexName2) - followerClient.stopReplication(leaderIndexName2) } } @@ -248,35 +220,26 @@ class UpdateAutoFollowPatternIT: MultiClusterRestTestCase() { val leaderClient = getClientForCluster(LEADER) val leaderIndexName = createRandomIndex(leaderClient) createConnectionBetweenClusters(FOLLOWER, LEADER, connectionAlias) - + followerClient.startReplication(StartReplicationRequest(connectionAlias, leaderIndexName, leaderIndexName), + TimeValue.timeValueSeconds(10),true, waitForRestore = true) + assertBusy({ + Assertions.assertThat(followerClient.indices() + .exists(GetIndexRequest(leaderIndexName), RequestOptions.DEFAULT)) + .isEqualTo(true) + }, 30, TimeUnit.SECONDS) + // Assert that there is no auto follow task & one index replication task + Assertions.assertThat(getAutoFollowTasks(FOLLOWER).size).isEqualTo(0) + Assertions.assertThat(getIndexReplicationTasks(FOLLOWER).size).isEqualTo(1) try { - followerClient.startReplication(StartReplicationRequest(connectionAlias, leaderIndexName, leaderIndexName), - TimeValue.timeValueSeconds(10),true, waitForRestore = true) - + followerClient.updateAutoFollowPattern(connectionAlias, indexPatternName, indexPattern) assertBusy({ - Assertions.assertThat(followerClient.indices() - .exists(GetIndexRequest(leaderIndexName), RequestOptions.DEFAULT)) - .isEqualTo(true) - }, 30, TimeUnit.SECONDS) - - // Assert that there is no auto follow task & one index replication task - Assertions.assertThat(getAutoFollowTasks(FOLLOWER).size).isEqualTo(0) - Assertions.assertThat(getIndexReplicationTasks(FOLLOWER).size).isEqualTo(1) - - try { - followerClient.updateAutoFollowPattern(connectionAlias, indexPatternName, indexPattern) - - assertBusy({ - // Assert that there is still only one index replication task - Assertions.assertThat(getAutoFollowTasks(FOLLOWER).size).isEqualTo(1) - Assertions.assertThat(getIndexReplicationTasks(FOLLOWER).size).isEqualTo(1) - followerClient.waitForShardTaskStart(leaderIndexName, waitForShardTask) - },30, TimeUnit.SECONDS) - } finally { - followerClient.deleteAutoFollowPattern(connectionAlias, indexPatternName) - } + // Assert that there is still only one index replication task + Assertions.assertThat(getAutoFollowTasks(FOLLOWER).size).isEqualTo(1) + Assertions.assertThat(getIndexReplicationTasks(FOLLOWER).size).isEqualTo(1) + followerClient.waitForShardTaskStart(leaderIndexName, waitForShardTask) + },30, TimeUnit.SECONDS) } finally { - followerClient.stopReplication(leaderIndexName) + followerClient.deleteAutoFollowPattern(connectionAlias, indexPatternName) } } @@ -292,7 +255,6 @@ class UpdateAutoFollowPatternIT: MultiClusterRestTestCase() { fun `test auto follow should fail on pattern name validation failure`() { val followerClient = getClientForCluster(FOLLOWER) createConnectionBetweenClusters(FOLLOWER, LEADER, connectionAlias) - assertPatternNameValidation(followerClient, "testPattern", "Value testPattern must be lowercase") assertPatternNameValidation(followerClient, "testPattern*", @@ -305,10 +267,8 @@ class UpdateAutoFollowPatternIT: MultiClusterRestTestCase() { "Value . must not be '.' or '..'") assertPatternNameValidation(followerClient, "..", "Value .. must not be '.' or '..'") - assertPatternNameValidation(followerClient, "_leader", "Value _leader must not start with '_' or '-' or '+'") - assertPatternNameValidation(followerClient, "-leader", "Value -leader must not start with '_' or '-' or '+'") assertPatternNameValidation(followerClient, "+leader", @@ -331,9 +291,7 @@ class UpdateAutoFollowPatternIT: MultiClusterRestTestCase() { val followerClient = getClientForCluster(FOLLOWER) val leaderClient = getClientForCluster(LEADER) createConnectionBetweenClusters(FOLLOWER, LEADER, connectionAlias) - val leaderIndexName = createRandomIndex(leaderClient) - try { followerClient.updateAutoFollowPattern(connectionAlias, indexPatternName, indexPattern) @@ -343,52 +301,42 @@ class UpdateAutoFollowPatternIT: MultiClusterRestTestCase() { .exists(GetIndexRequest(leaderIndexName), RequestOptions.DEFAULT)) .isEqualTo(true) } - Assertions.assertThat(getAutoFollowTasks(FOLLOWER).size).isEqualTo(1) Assertions.assertThat(getIndexReplicationTasks(FOLLOWER).size).isEqualTo(1) followerClient.waitForShardTaskStart(leaderIndexName, waitForShardTask) } finally { followerClient.deleteAutoFollowPattern(connectionAlias, indexPatternName) } - // Verify that auto follow tasks is stopped but the shard replication task remains. assertBusy ({ Assertions.assertThat(getAutoFollowTasks(FOLLOWER).size).isEqualTo(0) }, 30, TimeUnit.SECONDS) - Assertions.assertThat(getIndexReplicationTasks(FOLLOWER).size).isEqualTo(1) - followerClient.stopReplication(leaderIndexName) } fun `test autofollow task with start replication block`() { val followerClient = getClientForCluster(FOLLOWER) val leaderClient = getClientForCluster(LEADER) createConnectionBetweenClusters(FOLLOWER, LEADER, connectionAlias) - val leaderIndexName = createRandomIndex(leaderClient) try { - // Add replication start block followerClient.updateReplicationStartBlockSetting(true) followerClient.updateAutoFollowPattern(connectionAlias, indexPatternName, indexPattern) sleep(30000) // Default poll for auto follow in worst case - // verify both index replication tasks and autofollow tasks // Replication shouldn't have been started - 0 tasks // Autofollow task should still be up - 1 task Assertions.assertThat(getIndexReplicationTasks(FOLLOWER).size).isEqualTo(0) Assertions.assertThat(getAutoFollowTasks(FOLLOWER).size).isEqualTo(1) - // Remove replication start block followerClient.updateReplicationStartBlockSetting(false) sleep(45000) // poll for auto follow in worst case - // Index should be replicated and autofollow task should be present Assertions.assertThat(getIndexReplicationTasks(FOLLOWER).size).isEqualTo(1) Assertions.assertThat(getAutoFollowTasks(FOLLOWER).size).isEqualTo(1) } finally { followerClient.deleteAutoFollowPattern(connectionAlias, indexPatternName) - followerClient.stopReplication(leaderIndexName) } } diff --git a/src/test/kotlin/org/opensearch/replication/task/TaskCancellationIT.kt b/src/test/kotlin/org/opensearch/replication/task/TaskCancellationIT.kt index 586290dc..5bbfef7e 100644 --- a/src/test/kotlin/org/opensearch/replication/task/TaskCancellationIT.kt +++ b/src/test/kotlin/org/opensearch/replication/task/TaskCancellationIT.kt @@ -45,68 +45,52 @@ class TaskCancellationIT : MultiClusterRestTestCase() { val followerClient = getClientForCluster(FOLLOWER) val leaderClient = getClientForCluster(LEADER) val primaryShards = 3 - createConnectionBetweenClusters(FOLLOWER, LEADER) - val createIndexResponse = leaderClient.indices().create( CreateIndexRequest(leaderIndexName).settings(Settings.builder().put("index.number_of_shards", primaryShards).build()), RequestOptions.DEFAULT) assertThat(createIndexResponse.isAcknowledged).isTrue() - try { - followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName)) - // Wait for Shard tasks to come up. - var tasks = Collections.emptyList() - assertBusy { - tasks = followerClient.getShardReplicationTasks(followerIndexName) - Assert.assertEquals(tasks.size, primaryShards) - } - - // Cancel one shard task - val cancelTasksRequest = CancelTasksRequest.Builder().withTaskId(TaskId(tasks[0])). - withWaitForCompletion(true).build() - followerClient.tasks().cancel(cancelTasksRequest, RequestOptions.DEFAULT) - - // Verify that replication is continuing and the shards tasks are up and running - assertBusy { - Assert.assertEquals(followerClient.getShardReplicationTasks(followerIndexName).size, primaryShards) - assertThat(followerClient.getIndexReplicationTask(followerIndexName).isNotBlank()).isTrue() - `validate status due shard task cancellation`(followerClient.replicationStatus(followerIndexName)) - } - } finally { - followerClient.stopReplication(followerIndexName) + followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName)) + // Wait for Shard tasks to come up. + var tasks = Collections.emptyList() + assertBusy { + tasks = followerClient.getShardReplicationTasks(followerIndexName) + Assert.assertEquals(tasks.size, primaryShards) + } + // Cancel one shard task + val cancelTasksRequest = CancelTasksRequest.Builder().withTaskId(TaskId(tasks[0])). + withWaitForCompletion(true).build() + followerClient.tasks().cancel(cancelTasksRequest, RequestOptions.DEFAULT) + // Verify that replication is continuing and the shards tasks are up and running + assertBusy { + Assert.assertEquals(followerClient.getShardReplicationTasks(followerIndexName).size, primaryShards) + assertThat(followerClient.getIndexReplicationTask(followerIndexName).isNotBlank()).isTrue() + `validate status due shard task cancellation`(followerClient.replicationStatus(followerIndexName)) } } fun `test user triggering cancel on an index task`() { val followerClient = getClientForCluster(FOLLOWER) val leaderClient = getClientForCluster(LEADER) - createConnectionBetweenClusters(FOLLOWER, LEADER) - val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) assertThat(createIndexResponse.isAcknowledged).isTrue() - try { followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName)) // Wait for Shard tasks to come up. assertBusy { assertThat(followerClient.getShardReplicationTasks(followerIndexName).isEmpty()).isEqualTo(false) } - // Cancel the index replication task var task = followerClient.getIndexReplicationTask(followerIndexName) assertThat(task.isNullOrBlank()).isFalse() val cancelTasksRequest = CancelTasksRequest.Builder().withTaskId(TaskId(task)). withWaitForCompletion(true).build() followerClient.tasks().cancel(cancelTasksRequest, RequestOptions.DEFAULT) - // Verify that replication has paused. assertBusy { assertThat(followerClient.getShardReplicationTasks(followerIndexName).isEmpty()).isTrue() assertThat(followerClient.getIndexReplicationTask(followerIndexName).isNullOrBlank()).isTrue() `validate status due index task cancellation`(followerClient.replicationStatus(followerIndexName)) } - } finally { - followerClient.stopReplication(followerIndexName) - } } } diff --git a/src/test/kotlin/org/opensearch/replication/task/shard/TransportReplayChangesActionIT.kt b/src/test/kotlin/org/opensearch/replication/task/shard/TransportReplayChangesActionIT.kt index 8df58e23..58a56112 100644 --- a/src/test/kotlin/org/opensearch/replication/task/shard/TransportReplayChangesActionIT.kt +++ b/src/test/kotlin/org/opensearch/replication/task/shard/TransportReplayChangesActionIT.kt @@ -36,60 +36,48 @@ class TransportReplayChangesActionIT : MultiClusterRestTestCase() { // Create a leader/follower index val leaderIndex = randomAlphaOfLength(10).toLowerCase(Locale.ROOT) val followerIndex = randomAlphaOfLength(10).toLowerCase(Locale.ROOT) - - try { - val doc1 = mapOf("name" to randomAlphaOfLength(20)) - // Create Leader Index - val response = leader.index(IndexRequest(leaderIndex).id("1").source(doc1), RequestOptions.DEFAULT) - Assertions.assertThat(response.result) - .withFailMessage("Failed to create leader data").isEqualTo(DocWriteResponse.Result.CREATED) - - // Setup Mapping on leader - var putMappingRequest = PutMappingRequest(leaderIndex) - putMappingRequest.source( - "{\"dynamic\":\"strict\",\"properties\":{\"name\":{\"type\":\"text\"}}}", - XContentType.JSON - ) - leader.indices().putMapping(putMappingRequest, RequestOptions.DEFAULT) - - // Start replication - follower.startReplication( - StartReplicationRequest("source", leaderIndex, followerIndex), - waitForRestore = true - ) - assertBusy { - val getResponse = follower.get(GetRequest(followerIndex, "1"), RequestOptions.DEFAULT) - Assertions.assertThat(getResponse.isExists).isTrue() - Assertions.assertThat(getResponse.sourceAsMap).isEqualTo(doc1) - } - - // Add a new field in mapping. - putMappingRequest = PutMappingRequest(leaderIndex) - putMappingRequest.source( - "{\"dynamic\":\"strict\",\"properties\":{\"name\":{\"type\":\"text\"},\"place\":{\"type\":\"text\"}}}", - XContentType.JSON - ) - leader.indices().putMapping(putMappingRequest, RequestOptions.DEFAULT) - - // Ingest a doc on the leader - val doc2 = mapOf("name" to randomAlphaOfLength(5), "place" to randomAlphaOfLength(5)) - leader.index(IndexRequest(leaderIndex).id("2").source(doc2), RequestOptions.DEFAULT) - - // Verify that replication is working as expected. - assertBusy ({ - Assert.assertEquals(leader.count(CountRequest(leaderIndex), RequestOptions.DEFAULT).toString(), - follower.count(CountRequest(followerIndex), RequestOptions.DEFAULT).toString()) - `validate status syncing response`(follower.replicationStatus(followerIndex)) - val getResponse = follower.get(GetRequest(followerIndex, "2"), RequestOptions.DEFAULT) - Assertions.assertThat(getResponse.isExists).isTrue() - Assertions.assertThat(getResponse.sourceAsMap).isEqualTo(doc2) - }, - 30, TimeUnit.SECONDS - ) - - } finally { - follower.stopReplication(followerIndex) + val doc1 = mapOf("name" to randomAlphaOfLength(20)) + // Create Leader Index + val response = leader.index(IndexRequest(leaderIndex).id("1").source(doc1), RequestOptions.DEFAULT) + Assertions.assertThat(response.result) + .withFailMessage("Failed to create leader data").isEqualTo(DocWriteResponse.Result.CREATED) + // Setup Mapping on leader + var putMappingRequest = PutMappingRequest(leaderIndex) + putMappingRequest.source( + "{\"dynamic\":\"strict\",\"properties\":{\"name\":{\"type\":\"text\"}}}", + XContentType.JSON + ) + leader.indices().putMapping(putMappingRequest, RequestOptions.DEFAULT) + // Start replication + follower.startReplication( + StartReplicationRequest("source", leaderIndex, followerIndex), + waitForRestore = true + ) + assertBusy { + val getResponse = follower.get(GetRequest(followerIndex, "1"), RequestOptions.DEFAULT) + Assertions.assertThat(getResponse.isExists).isTrue() + Assertions.assertThat(getResponse.sourceAsMap).isEqualTo(doc1) } - + // Add a new field in mapping. + putMappingRequest = PutMappingRequest(leaderIndex) + putMappingRequest.source( + "{\"dynamic\":\"strict\",\"properties\":{\"name\":{\"type\":\"text\"},\"place\":{\"type\":\"text\"}}}", + XContentType.JSON + ) + leader.indices().putMapping(putMappingRequest, RequestOptions.DEFAULT) + // Ingest a doc on the leader + val doc2 = mapOf("name" to randomAlphaOfLength(5), "place" to randomAlphaOfLength(5)) + leader.index(IndexRequest(leaderIndex).id("2").source(doc2), RequestOptions.DEFAULT) + // Verify that replication is working as expected. + assertBusy ({ + Assert.assertEquals(leader.count(CountRequest(leaderIndex), RequestOptions.DEFAULT).toString(), + follower.count(CountRequest(followerIndex), RequestOptions.DEFAULT).toString()) + `validate status syncing response`(follower.replicationStatus(followerIndex)) + val getResponse = follower.get(GetRequest(followerIndex, "2"), RequestOptions.DEFAULT) + Assertions.assertThat(getResponse.isExists).isTrue() + Assertions.assertThat(getResponse.sourceAsMap).isEqualTo(doc2) + }, + 30, TimeUnit.SECONDS + ) } } \ No newline at end of file