diff --git a/src/test/kotlin/com/amazon/elasticsearch/replication/ReplicationHelpers.kt b/src/test/kotlin/com/amazon/elasticsearch/replication/ReplicationHelpers.kt index a2cf5fd0c..1075352f7 100644 --- a/src/test/kotlin/com/amazon/elasticsearch/replication/ReplicationHelpers.kt +++ b/src/test/kotlin/com/amazon/elasticsearch/replication/ReplicationHelpers.kt @@ -130,6 +130,14 @@ fun `validate paused status resposne`(statusResp: Map) { Assert.assertFalse(statusResp.containsKey("remote_checkpoint")) } +fun `validate paused status on closed index`(statusResp: Map) { + Assert.assertEquals(statusResp.getValue("status"), "PAUSED") + assertThat(statusResp.getValue("reason").toString().contains("org.elasticsearch.cluster.block.ClusterBlockException")) + Assert.assertFalse(statusResp.containsKey("shard_replication_details")) + Assert.assertFalse(statusResp.containsKey("follower_checkpoint")) + Assert.assertFalse(statusResp.containsKey("leader_checkpoint")) +} + fun `validate aggregated paused status resposne`(statusResp: Map) { Assert.assertEquals(statusResp.getValue("status"),"PAUSED") Assert.assertEquals(statusResp.getValue("reason"), STATUS_REASON_USER_INITIATED) diff --git a/src/test/kotlin/com/amazon/elasticsearch/replication/integ/rest/StartReplicationIT.kt b/src/test/kotlin/com/amazon/elasticsearch/replication/integ/rest/StartReplicationIT.kt index d1af1f009..67c9cb505 100644 --- a/src/test/kotlin/com/amazon/elasticsearch/replication/integ/rest/StartReplicationIT.kt +++ b/src/test/kotlin/com/amazon/elasticsearch/replication/integ/rest/StartReplicationIT.kt @@ -19,6 +19,11 @@ package com.amazon.elasticsearch.replication.integ.rest import com.amazon.elasticsearch.replication.MultiClusterAnnotations import com.amazon.elasticsearch.replication.MultiClusterRestTestCase import com.amazon.elasticsearch.replication.StartReplicationRequest +import com.amazon.elasticsearch.replication.`validate not paused status resposne` +import com.amazon.elasticsearch.replication.`validate paused status on closed index` +import com.amazon.elasticsearch.replication.pauseReplication +import com.amazon.elasticsearch.replication.replicationStatus +import com.amazon.elasticsearch.replication.resumeReplication import com.amazon.elasticsearch.replication.startReplication import com.amazon.elasticsearch.replication.stopReplication import com.amazon.elasticsearch.replication.updateReplication @@ -123,6 +128,54 @@ class StartReplicationIT: MultiClusterRestTestCase() { } } + + fun `test replication when _close is triggered on leader`() { + val followerClient = getClientForCluster(FOLLOWER) + val leaderClient = getClientForCluster(LEADER) + createConnectionBetweenClusters(FOLLOWER, LEADER) + val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) + assertThat(createIndexResponse.isAcknowledged).isTrue() + try { + followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), waitForRestore = true) + assertBusy { + assertThat(followerClient.indices().exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT)).isEqualTo(true) + } + leaderClient.lowLevelClient.performRequest(Request("POST", "/" + leaderIndexName + "/_close")) + assertBusy ({ + try { + assertThat(followerClient.replicationStatus(followerIndexName)).containsKey("status") + var statusResp = followerClient.replicationStatus(followerIndexName) + `validate paused status on closed index`(statusResp) + } catch (e : Exception) { + Assert.fail() + } + },30, TimeUnit.SECONDS) + } finally { + followerClient.stopReplication(followerIndexName) + } + } + + + fun `test replication when _close and _open is triggered on leader`() { + val followerClient = getClientForCluster(FOLLOWER) + val leaderClient = getClientForCluster(LEADER) + createConnectionBetweenClusters(FOLLOWER, LEADER) + val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) + assertThat(createIndexResponse.isAcknowledged).isTrue() + try { + followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), waitForRestore = true) + followerClient.pauseReplication(followerIndexName) + leaderClient.lowLevelClient.performRequest(Request("POST", "/" + leaderIndexName + "/_close")) + leaderClient.lowLevelClient.performRequest(Request("POST", "/" + leaderIndexName + "/_open")) + followerClient.resumeReplication(followerIndexName) + var statusResp = followerClient.replicationStatus(followerIndexName) + `validate not paused status resposne`(statusResp) + + } finally { + followerClient.stopReplication(followerIndexName) + } + } + fun `test start replication fails when replication has already been started for the same index`() { val followerClient = getClientForCluster(FOLLOWER) val leaderClient = getClientForCluster(LEADER)