Skip to content

Commit

Permalink
stopReplication API removed from integ test cases and stopAllReplicat…
Browse files Browse the repository at this point in the history
…ionJobs API added in MultiClusterRestTestCase (#619)

Signed-off-by: sricharanvuppu <[email protected]>

Signed-off-by: sricharanvuppu <[email protected]>
(cherry picked from commit ad8e8de)
  • Loading branch information
sricharanvuppu committed Nov 30, 2022
1 parent f998f00 commit c93f0b0
Show file tree
Hide file tree
Showing 16 changed files with 999 additions and 1,563 deletions.
136 changes: 56 additions & 80 deletions src/test/kotlin/org/opensearch/replication/BasicReplicationIT.kt
Original file line number Diff line number Diff line change
Expand Up @@ -44,120 +44,96 @@ class BasicReplicationIT : MultiClusterRestTestCase() {
val follower = getClientForCluster(FOLL)
val leader = getClientForCluster(LEADER)
createConnectionBetweenClusters(FOLL, LEADER)

val leaderIndex = randomAlphaOfLength(10).toLowerCase(Locale.ROOT)
val followerIndex = randomAlphaOfLength(10).toLowerCase(Locale.ROOT)
// Create an empty index on the leader and trigger replication on it
val createIndexResponse = leader.indices().create(CreateIndexRequest(leaderIndex), RequestOptions.DEFAULT)
assertThat(createIndexResponse.isAcknowledged).isTrue()
try {
follower.startReplication(StartReplicationRequest("source", leaderIndex, followerIndex), waitForRestore=true)

val source = mapOf("name" to randomAlphaOfLength(20), "age" to randomInt().toString())
var response = leader.index(IndexRequest(leaderIndex).id("1").source(source), RequestOptions.DEFAULT)
follower.startReplication(StartReplicationRequest("source", leaderIndex, followerIndex), waitForRestore=true)
val source = mapOf("name" to randomAlphaOfLength(20), "age" to randomInt().toString())
var response = leader.index(IndexRequest(leaderIndex).id("1").source(source), RequestOptions.DEFAULT)
assertThat(response.result).isEqualTo(Result.CREATED)
assertBusy({
val getResponse = follower.get(GetRequest(followerIndex, "1"), RequestOptions.DEFAULT)
assertThat(getResponse.isExists).isTrue()
assertThat(getResponse.sourceAsMap).isEqualTo(source)
}, 60L, TimeUnit.SECONDS)
// Ensure force merge on leader doesn't impact replication
for (i in 2..5) {
response = leader.index(IndexRequest(leaderIndex).id("$i").source(source), RequestOptions.DEFAULT)
assertThat(response.result).isEqualTo(Result.CREATED)

assertBusy({
val getResponse = follower.get(GetRequest(followerIndex, "1"), RequestOptions.DEFAULT)
}
leader.indices().forcemerge(ForceMergeRequest(leaderIndex), RequestOptions.DEFAULT)
for (i in 6..10) {
response = leader.index(IndexRequest(leaderIndex).id("$i").source(source), RequestOptions.DEFAULT)
assertThat(response.result).isEqualTo(Result.CREATED)
}
assertBusy({
for (i in 2..10) {
val getResponse = follower.get(GetRequest(followerIndex, "$i"), RequestOptions.DEFAULT)
assertThat(getResponse.isExists).isTrue()
assertThat(getResponse.sourceAsMap).isEqualTo(source)
}, 60L, TimeUnit.SECONDS)

// Ensure force merge on leader doesn't impact replication
for (i in 2..5) {
response = leader.index(IndexRequest(leaderIndex).id("$i").source(source), RequestOptions.DEFAULT)
assertThat(response.result).isEqualTo(Result.CREATED)
}
leader.indices().forcemerge(ForceMergeRequest(leaderIndex), RequestOptions.DEFAULT)
for (i in 6..10) {
response = leader.index(IndexRequest(leaderIndex).id("$i").source(source), RequestOptions.DEFAULT)
assertThat(response.result).isEqualTo(Result.CREATED)
}
assertBusy({
for (i in 2..10) {
val getResponse = follower.get(GetRequest(followerIndex, "$i"), RequestOptions.DEFAULT)
assertThat(getResponse.isExists).isTrue()
assertThat(getResponse.sourceAsMap).isEqualTo(source)
}
}, 60L, TimeUnit.SECONDS)

// Force merge on follower however isn't allowed due to WRITE block
Assertions.assertThatThrownBy {
follower.indices().forcemerge(ForceMergeRequest(followerIndex), RequestOptions.DEFAULT)
}.isInstanceOf(OpenSearchStatusException::class.java)
.hasMessage("OpenSearch exception [type=cluster_block_exception, reason=index [$followerIndex] " +
"blocked by: [FORBIDDEN/1000/index read-only(cross-cluster-replication)];]")

} finally {
follower.stopReplication(followerIndex)
}
}, 60L, TimeUnit.SECONDS)
// Force merge on follower however isn't allowed due to WRITE block
Assertions.assertThatThrownBy {
follower.indices().forcemerge(ForceMergeRequest(followerIndex), RequestOptions.DEFAULT)
}.isInstanceOf(OpenSearchStatusException::class.java)
.hasMessage("OpenSearch exception [type=cluster_block_exception, reason=index [$followerIndex] " +
"blocked by: [FORBIDDEN/1000/index read-only(cross-cluster-replication)];]")
}

fun `test existing index replication`() {
val follower = getClientForCluster(FOLL)
val leader = getClientForCluster(LEADER)
createConnectionBetweenClusters(FOLL, LEADER)

// Create an index with data before commencing replication
val leaderIndex = randomAlphaOfLength(10).toLowerCase(Locale.ROOT)
val followerIndex = randomAlphaOfLength(10).toLowerCase(Locale.ROOT)
val source = mapOf("name" to randomAlphaOfLength(20), "age" to randomInt().toString())
val response = leader.index(IndexRequest(leaderIndex).id("1").source(source), RequestOptions.DEFAULT)
assertThat(response.result).withFailMessage("Failed to create leader data").isEqualTo(Result.CREATED)

follower.startReplication(StartReplicationRequest("source", leaderIndex, followerIndex), waitForRestore=true)

assertBusy {
val getResponse = follower.get(GetRequest(followerIndex, "1"), RequestOptions.DEFAULT)
assertThat(getResponse.isExists).isTrue()
assertThat(getResponse.sourceAsMap).isEqualTo(source)
}
follower.stopReplication(followerIndex)
}

fun `test that index operations are replayed to follower during replication`() {
val followerClient = getClientForCluster(FOLL)
val leaderClient = getClientForCluster(LEADER)
createConnectionBetweenClusters(FOLL, LEADER)

val leaderIndexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT)
val followerIndexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT)
val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT)
assertThat(createIndexResponse.isAcknowledged).isTrue()

try {
followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), waitForRestore=true)

// Create document
var source = mapOf("name" to randomAlphaOfLength(20), "age" to randomInt().toString())
var response = leaderClient.index(IndexRequest(leaderIndexName).id("1").source(source), RequestOptions.DEFAULT)
assertThat(response.result).withFailMessage("Failed to create leader data").isEqualTo(Result.CREATED)

assertBusy({
val getResponse = followerClient.get(GetRequest(followerIndexName, "1"), RequestOptions.DEFAULT)
assertThat(getResponse.isExists).isTrue()
assertThat(getResponse.sourceAsMap).isEqualTo(source)
}, 60L, TimeUnit.SECONDS)

// Update document
source = mapOf("name" to randomAlphaOfLength(20), "age" to randomInt().toString())
response = leaderClient.index(IndexRequest(leaderIndexName).id("1").source(source), RequestOptions.DEFAULT)
assertThat(response.result).withFailMessage("Failed to update leader data").isEqualTo(Result.UPDATED)

assertBusy({
val getResponse = followerClient.get(GetRequest(followerIndexName, "1"), RequestOptions.DEFAULT)
assertThat(getResponse.isExists).isTrue()
assertThat(getResponse.sourceAsMap).isEqualTo(source)
},60L, TimeUnit.SECONDS)

// Delete document
val deleteResponse = leaderClient.delete(DeleteRequest(leaderIndexName).id("1"), RequestOptions.DEFAULT)
assertThat(deleteResponse.result).withFailMessage("Failed to delete leader data").isEqualTo(Result.DELETED)

assertBusy({
val getResponse = followerClient.get(GetRequest(followerIndexName, "1"), RequestOptions.DEFAULT)
assertThat(getResponse.isExists).isFalse()
}, 60L, TimeUnit.SECONDS)
} finally {
followerClient.stopReplication(followerIndexName)
}
followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), waitForRestore=true)
// Create document
var source = mapOf("name" to randomAlphaOfLength(20), "age" to randomInt().toString())
var response = leaderClient.index(IndexRequest(leaderIndexName).id("1").source(source), RequestOptions.DEFAULT)
assertThat(response.result).withFailMessage("Failed to create leader data").isEqualTo(Result.CREATED)
assertBusy({
val getResponse = followerClient.get(GetRequest(followerIndexName, "1"), RequestOptions.DEFAULT)
assertThat(getResponse.isExists).isTrue()
assertThat(getResponse.sourceAsMap).isEqualTo(source)
}, 60L, TimeUnit.SECONDS)
// Update document
source = mapOf("name" to randomAlphaOfLength(20), "age" to randomInt().toString())
response = leaderClient.index(IndexRequest(leaderIndexName).id("1").source(source), RequestOptions.DEFAULT)
assertThat(response.result).withFailMessage("Failed to update leader data").isEqualTo(Result.UPDATED)
assertBusy({
val getResponse = followerClient.get(GetRequest(followerIndexName, "1"), RequestOptions.DEFAULT)
assertThat(getResponse.isExists).isTrue()
assertThat(getResponse.sourceAsMap).isEqualTo(source)
},60L, TimeUnit.SECONDS)
// Delete document
val deleteResponse = leaderClient.delete(DeleteRequest(leaderIndexName).id("1"), RequestOptions.DEFAULT)
assertThat(deleteResponse.result).withFailMessage("Failed to delete leader data").isEqualTo(Result.DELETED)
assertBusy({
val getResponse = followerClient.get(GetRequest(followerIndexName, "1"), RequestOptions.DEFAULT)
assertThat(getResponse.isExists).isFalse()
}, 60L, TimeUnit.SECONDS)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

package org.opensearch.replication

import com.nhaarman.mockitokotlin2.stub
import org.opensearch.replication.MultiClusterAnnotations.ClusterConfiguration
import org.opensearch.replication.MultiClusterAnnotations.ClusterConfigurations
import org.opensearch.replication.MultiClusterAnnotations.getAnnotationsFromClass
Expand Down Expand Up @@ -56,6 +57,7 @@ import org.junit.After
import org.junit.AfterClass
import org.junit.Before
import org.junit.BeforeClass
import org.opensearch.index.mapper.ObjectMapper
import java.nio.file.Files
import java.security.KeyManagementException
import java.security.KeyStore
Expand Down Expand Up @@ -419,8 +421,32 @@ abstract class MultiClusterRestTestCase : OpenSearchTestCase() {
testCluster.lowLevelClient.performRequest(request)
}
}

private fun stopAllReplicationJobs(testCluster: TestCluster) {
val indicesResponse = testCluster.lowLevelClient.performRequest((Request("GET","/_cat/indices/*,-.*?format=json&pretty")))
val indicesResponseEntity = EntityUtils.toString(indicesResponse.entity)
var parser = XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, indicesResponseEntity)
parser.list().forEach{ item->
val str = item.toString()
val map = str.subSequence(1,str.length-1).split(",").associate {
val (key, value) = it.trim().split("=")
key to value
}
val ind = map.get("index")
try {
val stopRequest = Request("POST","/_plugins/_replication/" + ind.toString() + "/_stop")
stopRequest.setJsonEntity("{}")
stopRequest.setOptions(RequestOptions.DEFAULT)
val response=testCluster.lowLevelClient.performRequest(stopRequest)
}
catch (e:ResponseException){
if(e.response.statusLine.statusCode!=400) {
throw e
}
}
}
}
protected fun wipeIndicesFromCluster(testCluster: TestCluster) {
stopAllReplicationJobs(testCluster)
try {
val deleteRequest = Request("DELETE", "*,-.*") // All except system indices
val response = testCluster.lowLevelClient.performRequest(deleteRequest)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,44 +36,35 @@ class ClusterRerouteFollowerIT : MultiClusterRestTestCase() {
fun `test replication works after rerouting a shard from one node to another in follower cluster`() {
val followerClient = getClientForCluster(FOLLOWER)
val leaderClient = getClientForCluster(LEADER)
try {
changeTemplate(LEADER)
createConnectionBetweenClusters(FOLLOWER, LEADER)
val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT)
Assertions.assertThat(createIndexResponse.isAcknowledged).isTrue()
followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName))
insertDocToIndex(LEADER, "1", "dummy data 1",leaderIndexName)

//Querying ES cluster throws random exceptions like ClusterManagerNotDiscovered or ShardsFailed etc, so catching them and retrying
assertBusy ({
try {
Assertions.assertThat(docs(FOLLOWER, followerIndexName)).contains("dummy data 1")
} catch (ex: Exception) {
Assert.fail("Exception while querying follower cluster. Failing to retry again")
}
}, 1, TimeUnit.MINUTES)

val nodes = getNodesInCluster(FOLLOWER)

val primaryNode = getPrimaryNodeForShard(FOLLOWER,followerIndexName, "0")
val unassignedNode = nodes.filter{!it.equals(primaryNode)}.stream().findFirst().get()
rerouteShard(FOLLOWER, "0", followerIndexName, primaryNode, unassignedNode)

assertBusy ({
Assertions.assertThat(getPrimaryNodeForShard(FOLLOWER,followerIndexName, "0")).isEqualTo(unassignedNode)
}, 1, TimeUnit.MINUTES)
logger.info("rereouted shard is " + getPrimaryNodeForShard(FOLLOWER,followerIndexName, "0"))
insertDocToIndex(LEADER, "2", "dummy data 2",leaderIndexName)

assertBusy ({
try {
Assertions.assertThat(docs(FOLLOWER, followerIndexName)).contains("dummy data 2")
} catch (ex: Exception) {
Assert.fail("Exception while querying follower cluster. Failing to retry again")
}
}, 1, TimeUnit.MINUTES)
} finally {
followerClient.stopReplication(followerIndexName)
}
changeTemplate(LEADER)
createConnectionBetweenClusters(FOLLOWER, LEADER)
val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT)
Assertions.assertThat(createIndexResponse.isAcknowledged).isTrue()
followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName))
insertDocToIndex(LEADER, "1", "dummy data 1",leaderIndexName)
//Querying ES cluster throws random exceptions like ClusterManagerNotDiscovered or ShardsFailed etc, so catching them and retrying
assertBusy ({
try {
Assertions.assertThat(docs(FOLLOWER, followerIndexName)).contains("dummy data 1")
} catch (ex: Exception) {
Assert.fail("Exception while querying follower cluster. Failing to retry again")
}
}, 1, TimeUnit.MINUTES)
val nodes = getNodesInCluster(FOLLOWER)
val primaryNode = getPrimaryNodeForShard(FOLLOWER,followerIndexName, "0")
val unassignedNode = nodes.filter{!it.equals(primaryNode)}.stream().findFirst().get()
rerouteShard(FOLLOWER, "0", followerIndexName, primaryNode, unassignedNode)
assertBusy ({
Assertions.assertThat(getPrimaryNodeForShard(FOLLOWER,followerIndexName, "0")).isEqualTo(unassignedNode)
}, 1, TimeUnit.MINUTES)
logger.info("rereouted shard is " + getPrimaryNodeForShard(FOLLOWER,followerIndexName, "0"))
insertDocToIndex(LEADER, "2", "dummy data 2",leaderIndexName)
assertBusy ({
try {
Assertions.assertThat(docs(FOLLOWER, followerIndexName)).contains("dummy data 2")
} catch (ex: Exception) {
Assert.fail("Exception while querying follower cluster. Failing to retry again")
}
}, 1, TimeUnit.MINUTES)
}
}
Loading

0 comments on commit c93f0b0

Please sign in to comment.