-
Notifications
You must be signed in to change notification settings - Fork 62
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Integration test for cluster reroute in #81
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -29,6 +29,7 @@ import org.apache.http.message.BasicHeader | |
import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy | ||
import org.apache.http.nio.entity.NStringEntity | ||
import org.apache.http.ssl.SSLContexts | ||
import org.apache.http.util.EntityUtils | ||
import org.apache.lucene.util.SetOnce | ||
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest | ||
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest | ||
|
@@ -334,6 +335,87 @@ abstract class MultiClusterRestTestCase : ESTestCase() { | |
assertEquals(HttpStatus.SC_OK.toLong(), persistentConnectionResponse.statusLine.statusCode.toLong()) | ||
} | ||
|
||
protected fun getPrimaryNodeForShard(clusterName: String,indexname: String, shardNumber: String) :String { | ||
val cluster = getNamedCluster(clusterName) | ||
val persistentConnectionRequest = Request("GET", "/_cat/shards") | ||
|
||
val persistentConnectionResponse = cluster.lowLevelClient.performRequest(persistentConnectionRequest) | ||
val resp = EntityUtils.toString(persistentConnectionResponse.entity); | ||
var primaryNode:String = "" | ||
|
||
//leader_index 0 p STARTED 1 3.7kb 127.0.0.1 leaderCluster-1 | ||
//leader_index 0 r STARTED 1 3.7kb 127.0.0.1 leaderCluster-0 | ||
resp.lines().forEach { line -> | ||
val trimmed = line.replace("\\s+".toRegex(), " ") | ||
val parts = trimmed.split(" ") | ||
if(parts.size == 8) { | ||
if (parts.get(0).equals(indexname) | ||
&& parts.get(1).equals(shardNumber) | ||
&& parts.get(2).equals("p")) { | ||
primaryNode = parts.get(7) | ||
} | ||
} | ||
} | ||
return primaryNode | ||
} | ||
|
||
protected fun insertDocToIndex(clusterName: String, docCount: String, docValue: String, indexName: String) { | ||
val cluster = getNamedCluster(clusterName) | ||
val persistentConnectionRequest = Request("PUT", indexName + "/_doc/"+ docCount) | ||
val entityAsString = """ | ||
{"value" : "$docValue"}""".trimMargin() | ||
|
||
persistentConnectionRequest.entity = NStringEntity(entityAsString, ContentType.APPLICATION_JSON) | ||
val persistentConnectionResponse = cluster.lowLevelClient.performRequest(persistentConnectionRequest) | ||
assertEquals(HttpStatus.SC_CREATED.toLong(), persistentConnectionResponse.statusLine.statusCode.toLong()) | ||
} | ||
|
||
protected fun docs(clusterName: String,indexName : String) : String{ | ||
val cluster = getNamedCluster(clusterName) | ||
val persistentConnectionRequest = Request("GET", "/$indexName/_search?pretty&q=*") | ||
|
||
val persistentConnectionResponse = cluster.lowLevelClient.performRequest(persistentConnectionRequest) | ||
val resp = EntityUtils.toString(persistentConnectionResponse.entity); | ||
return resp | ||
} | ||
|
||
protected fun getNodesInCluster(clusterName: String) : List<String>{ | ||
val cluster = getNamedCluster(clusterName) | ||
val persistentConnectionRequest = Request("GET", "/_cat/nodes") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same suggestion as above to use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
|
||
val persistentConnectionResponse = cluster.lowLevelClient.performRequest(persistentConnectionRequest) | ||
val resp = EntityUtils.toString(persistentConnectionResponse.entity); | ||
|
||
//127.0.0.1 38 100 7 3.02 dimr * leaderCluster-1 | ||
//127.0.0.1 25 100 8 3.02 dimr - leaderCluster-0 | ||
var nodes = mutableListOf<String>() | ||
resp.lines().forEach { line -> | ||
val parts = line.split(" ") | ||
if(parts.size >= 8) { | ||
nodes.add(parts.last()) | ||
} | ||
} | ||
return nodes | ||
} | ||
|
||
protected fun rerouteShard(clusterName: String, shardNumber: String, indexName: String, fromNode: String, toNode : String) { | ||
val cluster = getNamedCluster(clusterName) | ||
val persistentConnectionRequest = Request("POST", "_cluster/reroute") | ||
val entityAsString = """ | ||
{ | ||
"commands": [{ | ||
"move": { | ||
"index": "$indexName", "shard": $shardNumber, | ||
"from_node": "$fromNode", "to_node": "$toNode" | ||
} | ||
}] | ||
}""".trimMargin() | ||
|
||
persistentConnectionRequest.entity = NStringEntity(entityAsString, ContentType.APPLICATION_JSON) | ||
val persistentConnectionResponse = cluster.lowLevelClient.performRequest(persistentConnectionRequest) | ||
assertEquals(HttpStatus.SC_OK.toLong(), persistentConnectionResponse.statusLine.statusCode.toLong()) | ||
} | ||
|
||
fun getReplicationTaskList(clusterName: String, action: String="*replication*"): List<TaskInfo> { | ||
val client = getClientForCluster(clusterName) | ||
val request = ListTasksRequest().setDetailed(true).setActions(action) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,88 @@ | ||
package com.amazon.elasticsearch.replication.integ.rest | ||
|
||
import com.amazon.elasticsearch.replication.MultiClusterRestTestCase | ||
import com.amazon.elasticsearch.replication.MultiClusterAnnotations | ||
import com.amazon.elasticsearch.replication.StartReplicationRequest | ||
import com.amazon.elasticsearch.replication.startReplication | ||
import com.amazon.elasticsearch.replication.stopReplication | ||
import org.apache.http.entity.ContentType | ||
import org.apache.http.nio.entity.NStringEntity | ||
import org.assertj.core.api.Assertions | ||
import org.elasticsearch.client.Request | ||
import org.elasticsearch.client.RequestOptions | ||
import org.elasticsearch.client.indices.CreateIndexRequest | ||
import org.elasticsearch.client.indices.GetIndexRequest | ||
import org.elasticsearch.test.ESTestCase.assertBusy | ||
import java.util.concurrent.TimeUnit | ||
|
||
|
||
@MultiClusterAnnotations.ClusterConfigurations( | ||
MultiClusterAnnotations.ClusterConfiguration(clusterName = LEADER), | ||
MultiClusterAnnotations.ClusterConfiguration(clusterName = FOLLOWER) | ||
) | ||
|
||
class ClusterRerouteFollowerIT : MultiClusterRestTestCase() { | ||
private val leaderIndexName = "leader_index" | ||
private val followerIndexName = "follower_index" | ||
fun `test replication works after rerouting a shard from one node to another in follower cluster`() { | ||
val followerClient = getClientForCluster(FOLLOWER) | ||
val leaderClient = getClientForCluster(LEADER) | ||
try { | ||
try { | ||
changeTemplate(LEADER) | ||
} catch (ex1 : Exception) { | ||
logger.info("Changing template method is deprecated and throws an warning exception") | ||
} | ||
createConnectionBetweenClusters(FOLLOWER, LEADER) | ||
val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) | ||
Assertions.assertThat(createIndexResponse.isAcknowledged).isTrue() | ||
followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName)) | ||
assertBusy ({ | ||
Assertions.assertThat(followerClient.indices().exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT)).isEqualTo(true) | ||
}, 1, TimeUnit.MINUTES) | ||
Comment on lines
+40
to
+42
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I dont think you need to wait here to ensure that follower index exists. You can proceed to add data to leader. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
insertDocToIndex(LEADER, "1", "dummy data 1",leaderIndexName) | ||
|
||
//Querying ES cluster throws random exceptions like MasterNotDiscovered or ShardsFailed etc, so catching them and retrying | ||
assertBusy ({ | ||
try { | ||
Assertions.assertThat(docs(FOLLOWER, followerIndexName)).contains("dummy data 1") | ||
} catch (ex: Exception) { | ||
Assertions.assertThat(true).isEqualTo(false) | ||
} | ||
}, 1, TimeUnit.MINUTES) | ||
|
||
val nodes = getNodesInCluster(FOLLOWER) | ||
|
||
val primaryNode = getPrimaryNodeForShard(FOLLOWER,followerIndexName, "0") | ||
val unassignedNode = nodes.filter{!it.equals(primaryNode)}.stream().findFirst().get() | ||
rerouteShard(FOLLOWER, "0", followerIndexName, primaryNode, unassignedNode) | ||
|
||
assertBusy ({ | ||
Assertions.assertThat(getPrimaryNodeForShard(FOLLOWER,followerIndexName, "0")).isEqualTo(unassignedNode) | ||
}, 1, TimeUnit.MINUTES) | ||
logger.info("rereouted shard is " + getPrimaryNodeForShard(FOLLOWER,followerIndexName, "0")) | ||
insertDocToIndex(LEADER, "2", "dummy data 2",leaderIndexName) | ||
|
||
//TODO: Fix this and uncomment this code | ||
/*assertBusy ({ | ||
try { | ||
Assertions.assertThat(docs(FOLLOWER, followerIndexName)).contains("dummy data 2") | ||
} catch (ex: Exception) { | ||
Assertions.assertThat(true).isEqualTo(false) | ||
} | ||
}, 1, TimeUnit.MINUTES)*/ | ||
Comment on lines
+66
to
+73
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is the current issue here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is solved in the pr #114 |
||
} finally { | ||
followerClient.stopReplication(followerIndexName) | ||
} | ||
} | ||
|
||
private fun changeTemplate(clusterName: String) { | ||
val cluster = getNamedCluster(clusterName) | ||
val persistentConnectionRequest = Request("PUT", "_template/all") | ||
val entityAsString = """ | ||
{"template": "*", "settings": {"number_of_shards": 1, "number_of_replicas": 0}}""".trimMargin() | ||
|
||
persistentConnectionRequest.entity = NStringEntity(entityAsString, ContentType.APPLICATION_JSON) | ||
cluster.lowLevelClient.performRequest(persistentConnectionRequest) | ||
} | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. minor: new lines at the end of new files There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
package com.amazon.elasticsearch.replication.integ.rest | ||
|
||
import com.amazon.elasticsearch.replication.MultiClusterRestTestCase | ||
import com.amazon.elasticsearch.replication.MultiClusterAnnotations | ||
import com.amazon.elasticsearch.replication.StartReplicationRequest | ||
import com.amazon.elasticsearch.replication.startReplication | ||
import com.amazon.elasticsearch.replication.stopReplication | ||
import org.apache.http.entity.ContentType | ||
import org.apache.http.nio.entity.NStringEntity | ||
import org.assertj.core.api.Assertions | ||
import org.elasticsearch.client.Request | ||
import org.elasticsearch.client.RequestOptions | ||
import org.elasticsearch.client.indices.CreateIndexRequest | ||
import org.elasticsearch.client.indices.GetIndexRequest | ||
import org.elasticsearch.test.ESTestCase.assertBusy | ||
import org.junit.Ignore | ||
import java.util.concurrent.TimeUnit | ||
|
||
@MultiClusterAnnotations.ClusterConfigurations( | ||
MultiClusterAnnotations.ClusterConfiguration(clusterName = LEADER), | ||
MultiClusterAnnotations.ClusterConfiguration(clusterName = FOLLOWER) | ||
) | ||
|
||
class ClusterRerouteLeaderIT : MultiClusterRestTestCase() { | ||
private val leaderIndexName = "leader_index" | ||
private val followerIndexName = "follower_index" | ||
|
||
fun `test replication works after rerouting a shard from one node to another in leader cluster`() { | ||
val followerClient = getClientForCluster(FOLLOWER) | ||
val leaderClient = getClientForCluster(LEADER) | ||
try { | ||
try { | ||
changeTemplate(LEADER) | ||
} catch (ex1 : Exception) { | ||
logger.info("Changing template method is deprecated and throws an warning exception") | ||
} | ||
createConnectionBetweenClusters(FOLLOWER, LEADER) | ||
val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) | ||
Assertions.assertThat(createIndexResponse.isAcknowledged).isTrue() | ||
followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName)) | ||
assertBusy ({ | ||
Assertions.assertThat(followerClient.indices().exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT)).isEqualTo(true) | ||
},1, TimeUnit.MINUTES) | ||
insertDocToIndex(LEADER, "1", "dummy data 1",leaderIndexName) | ||
|
||
//Querying ES cluster throws random exceptions like MasterNotDiscovered or ShardsFailed etc, so catching them and retrying | ||
assertBusy ({ | ||
try { | ||
Assertions.assertThat(docs(FOLLOWER, followerIndexName)).contains("dummy data 1") | ||
} catch (ex: Exception) { | ||
Assertions.assertThat(true).isEqualTo(false) | ||
} | ||
}, 1, TimeUnit.MINUTES) | ||
|
||
val nodes = getNodesInCluster(LEADER) | ||
val primaryNode = getPrimaryNodeForShard(LEADER,leaderIndexName, "0") | ||
val unassignedNode = nodes.filter{!it.equals(primaryNode)}.stream().findFirst().get() | ||
rerouteShard(LEADER, "0", leaderIndexName, primaryNode, unassignedNode) | ||
|
||
assertBusy ({ | ||
Assertions.assertThat(getPrimaryNodeForShard(LEADER,leaderIndexName, "0")).isEqualTo(unassignedNode) | ||
}, 1, TimeUnit.MINUTES) | ||
|
||
insertDocToIndex(LEADER, "2", "dummy data 2",leaderIndexName) | ||
|
||
assertBusy ({ | ||
try { | ||
Assertions.assertThat(docs(FOLLOWER, followerIndexName)).contains("dummy data 2") | ||
} catch (ex: Exception) { | ||
Assertions.assertThat(true).isEqualTo(false) | ||
} | ||
}, 1, TimeUnit.MINUTES) | ||
} finally { | ||
followerClient.stopReplication(followerIndexName) | ||
} | ||
} | ||
|
||
private fun changeTemplate(clusterName: String) { | ||
val cluster = getNamedCluster(clusterName) | ||
val persistentConnectionRequest = Request("PUT", "_template/all") | ||
val entityAsString = """ | ||
{"template": "*", "settings": {"number_of_shards": 1, "number_of_replicas": 0}}""".trimMargin() | ||
|
||
persistentConnectionRequest.entity = NStringEntity(entityAsString, ContentType.APPLICATION_JSON) | ||
cluster.lowLevelClient.performRequest(persistentConnectionRequest) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can call
_cat/shards?format=json
to make it easier to parse the response.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done