From fb55e8a7589b816af158996f73271738ff1a6e96 Mon Sep 17 00:00:00 2001 From: Sooraj Sinha Date: Fri, 30 Jul 2021 13:33:44 +0530 Subject: [PATCH] Integ test: forcemerge and snapshot on leader during bootstrap Signed-off-by: Sooraj Sinha --- .../elasticsearch/replication/IndexUtil.kt | 31 ++++++ .../integ/rest/PauseReplicationIT.kt | 25 +---- .../integ/rest/StartReplicationIT.kt | 95 +++++++++++++++++++ .../integ/rest/StopReplicationIT.kt | 24 +---- 4 files changed, 130 insertions(+), 45 deletions(-) create mode 100644 src/test/kotlin/com/amazon/elasticsearch/replication/IndexUtil.kt diff --git a/src/test/kotlin/com/amazon/elasticsearch/replication/IndexUtil.kt b/src/test/kotlin/com/amazon/elasticsearch/replication/IndexUtil.kt new file mode 100644 index 00000000..8b13894f --- /dev/null +++ b/src/test/kotlin/com/amazon/elasticsearch/replication/IndexUtil.kt @@ -0,0 +1,31 @@ +package com.amazon.elasticsearch.replication + +import org.apache.logging.log4j.LogManager +import org.assertj.core.api.Assertions +import org.elasticsearch.action.DocWriteResponse +import org.elasticsearch.action.admin.indices.flush.FlushRequest +import org.elasticsearch.action.index.IndexRequest +import org.elasticsearch.client.RequestOptions +import org.elasticsearch.client.RestHighLevelClient +import org.elasticsearch.test.ESTestCase + +object IndexUtil { + private val log = LogManager.getLogger(IndexUtil::class.java) + + fun fillIndex(clusterClient: RestHighLevelClient, + indexName : String, + nFields: Int, + fieldLength: Int, + stepSize: Int) { + for (i in nFields downTo 1 step stepSize) { + val sourceMap : MutableMap = HashMap() + for (j in stepSize downTo 1) + sourceMap[(i-j).toString()] = ESTestCase.randomAlphaOfLength(fieldLength) + log.info("Updating index with map of size:${sourceMap.size}") + val indexResponse = clusterClient.index(IndexRequest(indexName).id(i.toString()).source(sourceMap), RequestOptions.DEFAULT) + Assertions.assertThat(indexResponse.result).isIn(DocWriteResponse.Result.CREATED, DocWriteResponse.Result.UPDATED) + } + //flush the index + clusterClient.indices().flush(FlushRequest(indexName), RequestOptions.DEFAULT) + } +} diff --git a/src/test/kotlin/com/amazon/elasticsearch/replication/integ/rest/PauseReplicationIT.kt b/src/test/kotlin/com/amazon/elasticsearch/replication/integ/rest/PauseReplicationIT.kt index 6639516e..11f6392b 100644 --- a/src/test/kotlin/com/amazon/elasticsearch/replication/integ/rest/PauseReplicationIT.kt +++ b/src/test/kotlin/com/amazon/elasticsearch/replication/integ/rest/PauseReplicationIT.kt @@ -15,6 +15,7 @@ package com.amazon.elasticsearch.replication.integ.rest +import com.amazon.elasticsearch.replication.IndexUtil import com.amazon.elasticsearch.replication.MultiClusterAnnotations import com.amazon.elasticsearch.replication.MultiClusterRestTestCase import com.amazon.elasticsearch.replication.StartReplicationRequest @@ -29,14 +30,10 @@ import com.amazon.elasticsearch.replication.updateReplication import org.apache.http.util.EntityUtils import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThatThrownBy -import org.elasticsearch.action.DocWriteResponse import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest -import org.elasticsearch.action.admin.indices.flush.FlushRequest -import org.elasticsearch.action.index.IndexRequest import org.elasticsearch.client.Request import org.elasticsearch.client.RequestOptions import org.elasticsearch.client.ResponseException -import org.elasticsearch.client.RestHighLevelClient import org.elasticsearch.client.indices.CreateIndexRequest import org.elasticsearch.client.indices.GetIndexRequest import org.elasticsearch.cluster.metadata.IndexMetadata @@ -47,7 +44,6 @@ import org.elasticsearch.test.ESTestCase.assertBusy import java.util.concurrent.TimeUnit - @MultiClusterAnnotations.ClusterConfigurations( MultiClusterAnnotations.ClusterConfiguration(clusterName = LEADER), MultiClusterAnnotations.ClusterConfiguration(clusterName = FOLLOWER) @@ -117,7 +113,7 @@ class PauseReplicationIT: MultiClusterRestTestCase() { RequestOptions.DEFAULT) assertThat(createIndexResponse.isAcknowledged).isTrue() // Put a large amount of data into the index - fillIndex(leaderClient, leaderIndexName, nFields, fieldLength, stepSize) + IndexUtil.fillIndex(leaderClient, leaderIndexName, nFields, fieldLength, stepSize) assertBusy { assertThat(leaderClient.indices() .exists(GetIndexRequest(leaderIndexName), RequestOptions.DEFAULT)) @@ -136,23 +132,6 @@ class PauseReplicationIT: MultiClusterRestTestCase() { } } - private fun fillIndex(clusterClient: RestHighLevelClient, - indexName : String, - nFields: Int, - fieldLength: Int, - stepSize: Int) { - for (i in nFields downTo 1 step stepSize) { - val sourceMap : MutableMap = HashMap() - for (j in stepSize downTo 1) - sourceMap[(i-j).toString()] = randomAlphaOfLength(fieldLength) - logger.info("Updating index with map of size:${sourceMap.size}") - val indexResponse = clusterClient.index(IndexRequest(indexName).id(i.toString()).source(sourceMap), RequestOptions.DEFAULT) - assertThat(indexResponse.result).isIn(DocWriteResponse.Result.CREATED, DocWriteResponse.Result.UPDATED) - } - //flush the index - clusterClient.indices().flush(FlushRequest(indexName), RequestOptions.DEFAULT) - } - fun `test pause without replication in progress`() { val followerClient = getClientForCluster(FOLLOWER) //ToDo : Using followerIndex interferes with other test. Is wipeIndicesFromCluster not working ? diff --git a/src/test/kotlin/com/amazon/elasticsearch/replication/integ/rest/StartReplicationIT.kt b/src/test/kotlin/com/amazon/elasticsearch/replication/integ/rest/StartReplicationIT.kt index 70fc767d..f4e2a99d 100644 --- a/src/test/kotlin/com/amazon/elasticsearch/replication/integ/rest/StartReplicationIT.kt +++ b/src/test/kotlin/com/amazon/elasticsearch/replication/integ/rest/StartReplicationIT.kt @@ -16,6 +16,7 @@ package com.amazon.elasticsearch.replication.integ.rest +import com.amazon.elasticsearch.replication.IndexUtil import com.amazon.elasticsearch.replication.MultiClusterAnnotations import com.amazon.elasticsearch.replication.MultiClusterRestTestCase import com.amazon.elasticsearch.replication.StartReplicationRequest @@ -36,10 +37,13 @@ import org.apache.http.util.EntityUtils import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThatThrownBy import org.elasticsearch.ElasticsearchStatusException +import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest +import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest import org.elasticsearch.action.admin.indices.alias.Alias import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest +import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest import org.elasticsearch.action.get.GetRequest @@ -47,6 +51,7 @@ import org.elasticsearch.action.index.IndexRequest import org.elasticsearch.client.Request import org.elasticsearch.client.RequestOptions import org.elasticsearch.client.ResponseException +import org.elasticsearch.client.core.CountRequest import org.elasticsearch.client.indices.CloseIndexRequest import org.elasticsearch.client.indices.CreateIndexRequest import org.elasticsearch.client.indices.GetIndexRequest @@ -55,8 +60,11 @@ import org.elasticsearch.client.indices.PutMappingRequest import org.elasticsearch.cluster.metadata.IndexMetadata import org.elasticsearch.common.io.PathUtils import org.elasticsearch.common.settings.Settings +import org.elasticsearch.common.unit.TimeValue import org.elasticsearch.common.xcontent.XContentType import org.elasticsearch.index.IndexSettings +import org.elasticsearch.index.mapper.MapperService +import org.elasticsearch.repositories.fs.FsRepository import org.elasticsearch.test.ESTestCase.assertBusy import org.junit.Assert import java.nio.file.Files @@ -72,6 +80,7 @@ class StartReplicationIT: MultiClusterRestTestCase() { private val followerIndexName = "follower_index" private val leaderClusterPath = "testclusters/leaderCluster-0" private val followerClusterPath = "testclusters/followCluster-0" + private val repoPath = "testclusters/repo" private val buildDir = System.getProperty("build.dir") private val synonymsJson = "/analyzers/synonym_setting.json" private val synonymMapping = "{\"properties\":{\"value\":{\"type\":\"text\",\"analyzer\":\"standard\",\"search_analyzer\":\"my_analyzer\"}}}" @@ -811,6 +820,92 @@ class StartReplicationIT: MultiClusterRestTestCase() { } } + fun `test forcemerge on leader during replication bootstrap`() { + val settings = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 20) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.key, Long.MAX_VALUE) + .build() + val followerClient = getClientForCluster(FOLLOWER) + val leaderClient = getClientForCluster(LEADER) + createConnectionBetweenClusters(FOLLOWER, LEADER) + + val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName).settings(settings), + RequestOptions.DEFAULT) + assertThat(createIndexResponse.isAcknowledged).isTrue() + // Put a large amount of data into the index + IndexUtil.fillIndex(leaderClient, leaderIndexName, 5000, 1000, 1000) + assertBusy { + assertThat(leaderClient.indices() + .exists(GetIndexRequest(leaderIndexName), RequestOptions.DEFAULT)) + } + try { + followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), + TimeValue.timeValueSeconds(10), + false) + //Given the size of index, the replication should be in RESTORING phase at this point + leaderClient.indices().forcemerge(ForceMergeRequest(leaderIndexName), RequestOptions.DEFAULT) + + assertBusy { + var statusResp = followerClient.replicationStatus(followerIndexName) + `validate status syncing response`(statusResp) + } + assertBusy { + Assert.assertEquals(leaderClient.count(CountRequest(leaderIndexName), RequestOptions.DEFAULT).toString(), + followerClient.count(CountRequest(followerIndexName), RequestOptions.DEFAULT).toString()) + } + } finally { + followerClient.stopReplication(followerIndexName) + } + } + + fun `test that snapshot on leader does not affect replication during bootstrap`() { + val settings = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 20) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.key, Long.MAX_VALUE) + .build() + val followerClient = getClientForCluster(FOLLOWER) + val leaderClient = getClientForCluster(LEADER) + createConnectionBetweenClusters(FOLLOWER, LEADER) + + val repoPath = PathUtils.get(buildDir, repoPath) + + val putRepositoryRequest = PutRepositoryRequest("my-repo") + .type(FsRepository.TYPE) + .settings("{\"location\": \"$repoPath\"}", XContentType.JSON) + + leaderClient.snapshot().createRepository(putRepositoryRequest, RequestOptions.DEFAULT) + + val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName).settings(settings), + RequestOptions.DEFAULT) + assertThat(createIndexResponse.isAcknowledged).isTrue() + // Put a large amount of data into the index + IndexUtil.fillIndex(leaderClient, leaderIndexName, 5000, 1000, 1000) + assertBusy { + assertThat(leaderClient.indices() + .exists(GetIndexRequest(leaderIndexName), RequestOptions.DEFAULT)) + } + try { + followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), + TimeValue.timeValueSeconds(10), + false) + //Given the size of index, the replication should be in RESTORING phase at this point + leaderClient.snapshot().create(CreateSnapshotRequest("my-repo", "snapshot_1").indices(leaderIndexName), RequestOptions.DEFAULT) + + assertBusy { + var statusResp = followerClient.replicationStatus(followerIndexName) + `validate status syncing response`(statusResp) + } + assertBusy { + Assert.assertEquals(leaderClient.count(CountRequest(leaderIndexName), RequestOptions.DEFAULT).toString(), + followerClient.count(CountRequest(followerIndexName), RequestOptions.DEFAULT).toString()) + } + } finally { + followerClient.stopReplication(followerIndexName) + } + } + private fun assertEqualAliases() { val followerClient = getClientForCluster(FOLLOWER) val leaderClient = getClientForCluster(LEADER) diff --git a/src/test/kotlin/com/amazon/elasticsearch/replication/integ/rest/StopReplicationIT.kt b/src/test/kotlin/com/amazon/elasticsearch/replication/integ/rest/StopReplicationIT.kt index 63fa0b56..f323eca0 100644 --- a/src/test/kotlin/com/amazon/elasticsearch/replication/integ/rest/StopReplicationIT.kt +++ b/src/test/kotlin/com/amazon/elasticsearch/replication/integ/rest/StopReplicationIT.kt @@ -15,6 +15,7 @@ package com.amazon.elasticsearch.replication.integ.rest +import com.amazon.elasticsearch.replication.IndexUtil import com.amazon.elasticsearch.replication.MultiClusterAnnotations import com.amazon.elasticsearch.replication.MultiClusterRestTestCase import com.amazon.elasticsearch.replication.StartReplicationRequest @@ -24,14 +25,11 @@ import org.apache.http.util.EntityUtils import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThatThrownBy import org.elasticsearch.ElasticsearchStatusException -import org.elasticsearch.action.DocWriteResponse import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest -import org.elasticsearch.action.admin.indices.flush.FlushRequest import org.elasticsearch.action.index.IndexRequest import org.elasticsearch.client.Request import org.elasticsearch.client.RequestOptions import org.elasticsearch.client.ResponseException -import org.elasticsearch.client.RestHighLevelClient import org.elasticsearch.client.indices.CreateIndexRequest import org.elasticsearch.client.indices.GetIndexRequest import org.elasticsearch.cluster.metadata.IndexMetadata @@ -40,7 +38,6 @@ import org.elasticsearch.common.unit.TimeValue import org.elasticsearch.index.mapper.MapperService import org.elasticsearch.test.ESTestCase.assertBusy import java.util.concurrent.TimeUnit -import kotlin.collections.HashMap const val LEADER = "leaderCluster" @@ -104,7 +101,7 @@ class StopReplicationIT: MultiClusterRestTestCase() { RequestOptions.DEFAULT) assertThat(createIndexResponse.isAcknowledged).isTrue() // Put a large amount of data into the index - fillIndex(leaderClient, leaderIndexName, nFields, fieldLength, stepSize) + IndexUtil.fillIndex(leaderClient, leaderIndexName, nFields, fieldLength, stepSize) assertBusy { assertThat(leaderClient.indices() .exists(GetIndexRequest(leaderIndexName), RequestOptions.DEFAULT)) @@ -137,23 +134,6 @@ class StopReplicationIT: MultiClusterRestTestCase() { testStopReplicationInRestoringState(settings, 5, 10, 5) } - private fun fillIndex(clusterClient: RestHighLevelClient, - indexName : String, - nFields: Int, - fieldLength: Int, - stepSize: Int) { - for (i in nFields downTo 1 step stepSize) { - val sourceMap : MutableMap = HashMap() - for (j in stepSize downTo 1) - sourceMap[(i-j).toString()] = randomAlphaOfLength(fieldLength) - logger.info("Updating index with map of size:${sourceMap.size}") - val indexResponse = clusterClient.index(IndexRequest(indexName).id(i.toString()).source(sourceMap), RequestOptions.DEFAULT) - assertThat(indexResponse.result).isIn(DocWriteResponse.Result.CREATED, DocWriteResponse.Result.UPDATED) - } - //flush the index - clusterClient.indices().flush(FlushRequest(indexName), RequestOptions.DEFAULT) - } - @AwaitsFix(bugUrl = "") fun `test follower index unblocked after stop replication`() { val followerClient = getClientForCluster(FOLLOWER)