Skip to content

Commit

Permalink
Integ test: forcemerge and snapshot on leader during bootstrap
Browse files Browse the repository at this point in the history
Signed-off-by: Sooraj Sinha <[email protected]>
  • Loading branch information
soosinha committed Aug 6, 2021
1 parent 7b5776d commit fb55e8a
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 45 deletions.
31 changes: 31 additions & 0 deletions src/test/kotlin/com/amazon/elasticsearch/replication/IndexUtil.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.amazon.elasticsearch.replication

import org.apache.logging.log4j.LogManager
import org.assertj.core.api.Assertions
import org.elasticsearch.action.DocWriteResponse
import org.elasticsearch.action.admin.indices.flush.FlushRequest
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.RequestOptions
import org.elasticsearch.client.RestHighLevelClient
import org.elasticsearch.test.ESTestCase

object IndexUtil {
    private val log = LogManager.getLogger(IndexUtil::class.java)

    /**
     * Bulk-populates [indexName] on the given cluster with randomly generated documents,
     * then flushes the index so the data is durable before the caller proceeds.
     *
     * Documents are written with ids `nFields, nFields - stepSize, ...` down to 1; each
     * document carries [stepSize] string fields of length [fieldLength]. Every index
     * response is asserted to be either CREATED or UPDATED.
     *
     * @param clusterClient high-level REST client for the target cluster
     * @param indexName     index to fill
     * @param nFields       highest document id / total field-count budget
     * @param fieldLength   length of each random string value
     * @param stepSize      number of fields per document and the id decrement per iteration
     */
    fun fillIndex(clusterClient: RestHighLevelClient,
                  indexName : String,
                  nFields: Int,
                  fieldLength: Int,
                  stepSize: Int) {
        var docId = nFields
        while (docId >= 1) {
            val source = HashMap<String, String>()
            // Field keys run from (docId - stepSize) up to (docId - 1).
            for (offset in stepSize downTo 1) {
                source[(docId - offset).toString()] = ESTestCase.randomAlphaOfLength(fieldLength)
            }
            log.info("Updating index with map of size:${source.size}")
            val response = clusterClient.index(
                IndexRequest(indexName).id(docId.toString()).source(source),
                RequestOptions.DEFAULT)
            Assertions.assertThat(response.result)
                .isIn(DocWriteResponse.Result.CREATED, DocWriteResponse.Result.UPDATED)
            docId -= stepSize
        }
        // Flush so the indexed documents are persisted before the test continues.
        clusterClient.indices().flush(FlushRequest(indexName), RequestOptions.DEFAULT)
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package com.amazon.elasticsearch.replication.integ.rest

import com.amazon.elasticsearch.replication.IndexUtil
import com.amazon.elasticsearch.replication.MultiClusterAnnotations
import com.amazon.elasticsearch.replication.MultiClusterRestTestCase
import com.amazon.elasticsearch.replication.StartReplicationRequest
Expand All @@ -29,14 +30,10 @@ import com.amazon.elasticsearch.replication.updateReplication
import org.apache.http.util.EntityUtils
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.elasticsearch.action.DocWriteResponse
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest
import org.elasticsearch.action.admin.indices.flush.FlushRequest
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Request
import org.elasticsearch.client.RequestOptions
import org.elasticsearch.client.ResponseException
import org.elasticsearch.client.RestHighLevelClient
import org.elasticsearch.client.indices.CreateIndexRequest
import org.elasticsearch.client.indices.GetIndexRequest
import org.elasticsearch.cluster.metadata.IndexMetadata
Expand All @@ -47,7 +44,6 @@ import org.elasticsearch.test.ESTestCase.assertBusy
import java.util.concurrent.TimeUnit



@MultiClusterAnnotations.ClusterConfigurations(
MultiClusterAnnotations.ClusterConfiguration(clusterName = LEADER),
MultiClusterAnnotations.ClusterConfiguration(clusterName = FOLLOWER)
Expand Down Expand Up @@ -117,7 +113,7 @@ class PauseReplicationIT: MultiClusterRestTestCase() {
RequestOptions.DEFAULT)
assertThat(createIndexResponse.isAcknowledged).isTrue()
// Put a large amount of data into the index
fillIndex(leaderClient, leaderIndexName, nFields, fieldLength, stepSize)
IndexUtil.fillIndex(leaderClient, leaderIndexName, nFields, fieldLength, stepSize)
assertBusy {
assertThat(leaderClient.indices()
.exists(GetIndexRequest(leaderIndexName), RequestOptions.DEFAULT))
Expand All @@ -136,23 +132,6 @@ class PauseReplicationIT: MultiClusterRestTestCase() {
}
}

private fun fillIndex(clusterClient: RestHighLevelClient,
indexName : String,
nFields: Int,
fieldLength: Int,
stepSize: Int) {
for (i in nFields downTo 1 step stepSize) {
val sourceMap : MutableMap<String, String> = HashMap()
for (j in stepSize downTo 1)
sourceMap[(i-j).toString()] = randomAlphaOfLength(fieldLength)
logger.info("Updating index with map of size:${sourceMap.size}")
val indexResponse = clusterClient.index(IndexRequest(indexName).id(i.toString()).source(sourceMap), RequestOptions.DEFAULT)
assertThat(indexResponse.result).isIn(DocWriteResponse.Result.CREATED, DocWriteResponse.Result.UPDATED)
}
//flush the index
clusterClient.indices().flush(FlushRequest(indexName), RequestOptions.DEFAULT)
}

fun `test pause without replication in progress`() {
val followerClient = getClientForCluster(FOLLOWER)
//ToDo : Using followerIndex interferes with other test. Is wipeIndicesFromCluster not working ?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.amazon.elasticsearch.replication.integ.rest


import com.amazon.elasticsearch.replication.IndexUtil
import com.amazon.elasticsearch.replication.MultiClusterAnnotations
import com.amazon.elasticsearch.replication.MultiClusterRestTestCase
import com.amazon.elasticsearch.replication.StartReplicationRequest
Expand All @@ -36,17 +37,21 @@ import org.apache.http.util.EntityUtils
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.elasticsearch.ElasticsearchStatusException
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest
import org.elasticsearch.action.admin.indices.alias.Alias
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest
import org.elasticsearch.action.get.GetRequest
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Request
import org.elasticsearch.client.RequestOptions
import org.elasticsearch.client.ResponseException
import org.elasticsearch.client.core.CountRequest
import org.elasticsearch.client.indices.CloseIndexRequest
import org.elasticsearch.client.indices.CreateIndexRequest
import org.elasticsearch.client.indices.GetIndexRequest
Expand All @@ -55,8 +60,11 @@ import org.elasticsearch.client.indices.PutMappingRequest
import org.elasticsearch.cluster.metadata.IndexMetadata
import org.elasticsearch.common.io.PathUtils
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.common.unit.TimeValue
import org.elasticsearch.common.xcontent.XContentType
import org.elasticsearch.index.IndexSettings
import org.elasticsearch.index.mapper.MapperService
import org.elasticsearch.repositories.fs.FsRepository
import org.elasticsearch.test.ESTestCase.assertBusy
import org.junit.Assert
import java.nio.file.Files
Expand All @@ -72,6 +80,7 @@ class StartReplicationIT: MultiClusterRestTestCase() {
private val followerIndexName = "follower_index"
private val leaderClusterPath = "testclusters/leaderCluster-0"
private val followerClusterPath = "testclusters/followCluster-0"
private val repoPath = "testclusters/repo"
private val buildDir = System.getProperty("build.dir")
private val synonymsJson = "/analyzers/synonym_setting.json"
private val synonymMapping = "{\"properties\":{\"value\":{\"type\":\"text\",\"analyzer\":\"standard\",\"search_analyzer\":\"my_analyzer\"}}}"
Expand Down Expand Up @@ -811,6 +820,92 @@ class StartReplicationIT: MultiClusterRestTestCase() {
}
}

/**
 * Verifies that a forcemerge on the leader index while replication is still
 * bootstrapping (RESTORING phase) does not break replication: the follower must
 * eventually reach SYNCING status and converge to the same document count.
 */
fun `test forcemerge on leader during replication bootstrap`() {
    val settings = Settings.builder()
        .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 20)
        .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
        // Lift the field-count cap so IndexUtil.fillIndex can create many fields.
        .put(MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.key, Long.MAX_VALUE)
        .build()
    val followerClient = getClientForCluster(FOLLOWER)
    val leaderClient = getClientForCluster(LEADER)
    createConnectionBetweenClusters(FOLLOWER, LEADER)

    val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName).settings(settings),
        RequestOptions.DEFAULT)
    assertThat(createIndexResponse.isAcknowledged).isTrue()
    // Put a large amount of data into the index
    IndexUtil.fillIndex(leaderClient, leaderIndexName, 5000, 1000, 1000)
    assertBusy {
        assertThat(leaderClient.indices()
            .exists(GetIndexRequest(leaderIndexName), RequestOptions.DEFAULT))
    }
    try {
        followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName),
            TimeValue.timeValueSeconds(10),
            false)
        //Given the size of index, the replication should be in RESTORING phase at this point
        leaderClient.indices().forcemerge(ForceMergeRequest(leaderIndexName), RequestOptions.DEFAULT)

        // Replication must still reach the SYNCING state after the forcemerge.
        assertBusy {
            val statusResp = followerClient.replicationStatus(followerIndexName)
            `validate status syncing response`(statusResp)
        }
        // Leader and follower must converge to the same document count.
        assertBusy {
            Assert.assertEquals(leaderClient.count(CountRequest(leaderIndexName), RequestOptions.DEFAULT).toString(),
                followerClient.count(CountRequest(followerIndexName), RequestOptions.DEFAULT).toString())
        }
    } finally {
        followerClient.stopReplication(followerIndexName)
    }
}

/**
 * Verifies that taking a snapshot of the leader index while replication is still
 * bootstrapping (RESTORING phase) does not break replication: the follower must
 * eventually reach SYNCING status and converge to the same document count.
 */
fun `test that snapshot on leader does not affect replication during bootstrap`() {
    val settings = Settings.builder()
        .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 20)
        .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
        // Lift the field-count cap so IndexUtil.fillIndex can create many fields.
        .put(MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.key, Long.MAX_VALUE)
        .build()
    val followerClient = getClientForCluster(FOLLOWER)
    val leaderClient = getClientForCluster(LEADER)
    createConnectionBetweenClusters(FOLLOWER, LEADER)

    // Distinct local name: the original shadowed the class property `repoPath`
    // while reading it in its own initializer, which is confusing and linter-flagged.
    val snapshotRepoPath = PathUtils.get(buildDir, repoPath)

    val putRepositoryRequest = PutRepositoryRequest("my-repo")
        .type(FsRepository.TYPE)
        .settings("{\"location\": \"$snapshotRepoPath\"}", XContentType.JSON)

    leaderClient.snapshot().createRepository(putRepositoryRequest, RequestOptions.DEFAULT)

    val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName).settings(settings),
        RequestOptions.DEFAULT)
    assertThat(createIndexResponse.isAcknowledged).isTrue()
    // Put a large amount of data into the index
    IndexUtil.fillIndex(leaderClient, leaderIndexName, 5000, 1000, 1000)
    assertBusy {
        assertThat(leaderClient.indices()
            .exists(GetIndexRequest(leaderIndexName), RequestOptions.DEFAULT))
    }
    try {
        followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName),
            TimeValue.timeValueSeconds(10),
            false)
        //Given the size of index, the replication should be in RESTORING phase at this point
        leaderClient.snapshot().create(CreateSnapshotRequest("my-repo", "snapshot_1").indices(leaderIndexName), RequestOptions.DEFAULT)

        // Replication must still reach the SYNCING state after the snapshot.
        assertBusy {
            val statusResp = followerClient.replicationStatus(followerIndexName)
            `validate status syncing response`(statusResp)
        }
        // Leader and follower must converge to the same document count.
        assertBusy {
            Assert.assertEquals(leaderClient.count(CountRequest(leaderIndexName), RequestOptions.DEFAULT).toString(),
                followerClient.count(CountRequest(followerIndexName), RequestOptions.DEFAULT).toString())
        }
    } finally {
        followerClient.stopReplication(followerIndexName)
    }
}

private fun assertEqualAliases() {
val followerClient = getClientForCluster(FOLLOWER)
val leaderClient = getClientForCluster(LEADER)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package com.amazon.elasticsearch.replication.integ.rest

import com.amazon.elasticsearch.replication.IndexUtil
import com.amazon.elasticsearch.replication.MultiClusterAnnotations
import com.amazon.elasticsearch.replication.MultiClusterRestTestCase
import com.amazon.elasticsearch.replication.StartReplicationRequest
Expand All @@ -24,14 +25,11 @@ import org.apache.http.util.EntityUtils
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.elasticsearch.ElasticsearchStatusException
import org.elasticsearch.action.DocWriteResponse
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest
import org.elasticsearch.action.admin.indices.flush.FlushRequest
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Request
import org.elasticsearch.client.RequestOptions
import org.elasticsearch.client.ResponseException
import org.elasticsearch.client.RestHighLevelClient
import org.elasticsearch.client.indices.CreateIndexRequest
import org.elasticsearch.client.indices.GetIndexRequest
import org.elasticsearch.cluster.metadata.IndexMetadata
Expand All @@ -40,7 +38,6 @@ import org.elasticsearch.common.unit.TimeValue
import org.elasticsearch.index.mapper.MapperService
import org.elasticsearch.test.ESTestCase.assertBusy
import java.util.concurrent.TimeUnit
import kotlin.collections.HashMap


const val LEADER = "leaderCluster"
Expand Down Expand Up @@ -104,7 +101,7 @@ class StopReplicationIT: MultiClusterRestTestCase() {
RequestOptions.DEFAULT)
assertThat(createIndexResponse.isAcknowledged).isTrue()
// Put a large amount of data into the index
fillIndex(leaderClient, leaderIndexName, nFields, fieldLength, stepSize)
IndexUtil.fillIndex(leaderClient, leaderIndexName, nFields, fieldLength, stepSize)
assertBusy {
assertThat(leaderClient.indices()
.exists(GetIndexRequest(leaderIndexName), RequestOptions.DEFAULT))
Expand Down Expand Up @@ -137,23 +134,6 @@ class StopReplicationIT: MultiClusterRestTestCase() {
testStopReplicationInRestoringState(settings, 5, 10, 5)
}

private fun fillIndex(clusterClient: RestHighLevelClient,
indexName : String,
nFields: Int,
fieldLength: Int,
stepSize: Int) {
for (i in nFields downTo 1 step stepSize) {
val sourceMap : MutableMap<String, String> = HashMap()
for (j in stepSize downTo 1)
sourceMap[(i-j).toString()] = randomAlphaOfLength(fieldLength)
logger.info("Updating index with map of size:${sourceMap.size}")
val indexResponse = clusterClient.index(IndexRequest(indexName).id(i.toString()).source(sourceMap), RequestOptions.DEFAULT)
assertThat(indexResponse.result).isIn(DocWriteResponse.Result.CREATED, DocWriteResponse.Result.UPDATED)
}
//flush the index
clusterClient.indices().flush(FlushRequest(indexName), RequestOptions.DEFAULT)
}

@AwaitsFix(bugUrl = "")
fun `test follower index unblocked after stop replication`() {
val followerClient = getClientForCluster(FOLLOWER)
Expand Down

0 comments on commit fb55e8a

Please sign in to comment.