Skip to content

Commit

Permalink
added few integ tests when open and close is triggered on leader index
Browse files Browse the repository at this point in the history
Signed-off-by: naveen pajjuri <[email protected]>
  • Loading branch information
naveen pajjuri committed Aug 2, 2021
1 parent f8dbc76 commit cfc0d4d
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,14 @@ fun `validate paused status resposne`(statusResp: Map<String, Any>) {
Assert.assertFalse(statusResp.containsKey("remote_checkpoint"))
}

fun `validate paused status on closed index`(statusResp: Map<String, Any>) {
Assert.assertEquals(statusResp.getValue("status"), "PAUSED")
assertThat(statusResp.getValue("reason").toString().contains("org.elasticsearch.cluster.block.ClusterBlockException"))
Assert.assertFalse(statusResp.containsKey("shard_replication_details"))
Assert.assertFalse(statusResp.containsKey("follower_checkpoint"))
Assert.assertFalse(statusResp.containsKey("leader_checkpoint"))
}

fun `validate aggregated paused status resposne`(statusResp: Map<String, Any>) {
Assert.assertEquals(statusResp.getValue("status"),"PAUSED")
Assert.assertEquals(statusResp.getValue("reason"), STATUS_REASON_USER_INITIATED)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ package com.amazon.elasticsearch.replication.integ.rest
import com.amazon.elasticsearch.replication.MultiClusterAnnotations
import com.amazon.elasticsearch.replication.MultiClusterRestTestCase
import com.amazon.elasticsearch.replication.StartReplicationRequest
import com.amazon.elasticsearch.replication.`validate not paused status resposne`
import com.amazon.elasticsearch.replication.`validate paused status on closed index`
import com.amazon.elasticsearch.replication.pauseReplication
import com.amazon.elasticsearch.replication.replicationStatus
import com.amazon.elasticsearch.replication.resumeReplication
import com.amazon.elasticsearch.replication.startReplication
import com.amazon.elasticsearch.replication.stopReplication
import com.amazon.elasticsearch.replication.updateReplication
Expand Down Expand Up @@ -123,6 +128,54 @@ class StartReplicationIT: MultiClusterRestTestCase() {
}
}


fun `test replication when _close is triggered on leader`() {
val followerClient = getClientForCluster(FOLLOWER)
val leaderClient = getClientForCluster(LEADER)
createConnectionBetweenClusters(FOLLOWER, LEADER)
val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT)
assertThat(createIndexResponse.isAcknowledged).isTrue()
try {
followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), waitForRestore = true)
assertBusy {
assertThat(followerClient.indices().exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT)).isEqualTo(true)
}
leaderClient.lowLevelClient.performRequest(Request("POST", "/" + leaderIndexName + "/_close"))
assertBusy ({
try {
assertThat(followerClient.replicationStatus(followerIndexName)).containsKey("status")
var statusResp = followerClient.replicationStatus(followerIndexName)
`validate paused status on closed index`(statusResp)
} catch (e : Exception) {
Assert.fail()
}
},30, TimeUnit.SECONDS)
} finally {
followerClient.stopReplication(followerIndexName)
}
}


fun `test replication when _close and _open is triggered on leader`() {
val followerClient = getClientForCluster(FOLLOWER)
val leaderClient = getClientForCluster(LEADER)
createConnectionBetweenClusters(FOLLOWER, LEADER)
val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT)
assertThat(createIndexResponse.isAcknowledged).isTrue()
try {
followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), waitForRestore = true)
followerClient.pauseReplication(followerIndexName)
leaderClient.lowLevelClient.performRequest(Request("POST", "/" + leaderIndexName + "/_close"))
leaderClient.lowLevelClient.performRequest(Request("POST", "/" + leaderIndexName + "/_open"))
followerClient.resumeReplication(followerIndexName)
var statusResp = followerClient.replicationStatus(followerIndexName)
`validate not paused status resposne`(statusResp)

} finally {
followerClient.stopReplication(followerIndexName)
}
}

fun `test start replication fails when replication has already been started for the same index`() {
val followerClient = getClientForCluster(FOLLOWER)
val leaderClient = getClientForCluster(LEADER)
Expand Down

0 comments on commit cfc0d4d

Please sign in to comment.