From bce6f0f6756ad98387d7819cba6d08fe4c71da3e Mon Sep 17 00:00:00 2001 From: Gaurav Bafna <85113518+gbbafna@users.noreply.github.com> Date: Thu, 16 Sep 2021 21:22:30 +0530 Subject: [PATCH] verifying shard tasks are up in autofollow test (#153) Signed-off-by: Gaurav Bafna --- .../replication/ReplicationHelpers.kt | 12 +++++++++++ .../integ/rest/UpdateAutoFollowPatternIT.kt | 20 ++++++++++++++++--- 2 files changed, 29 insertions(+), 3 deletions(-) diff --git a/src/test/kotlin/com/amazon/elasticsearch/replication/ReplicationHelpers.kt b/src/test/kotlin/com/amazon/elasticsearch/replication/ReplicationHelpers.kt index f086511a..d49a4002 100644 --- a/src/test/kotlin/com/amazon/elasticsearch/replication/ReplicationHelpers.kt +++ b/src/test/kotlin/com/amazon/elasticsearch/replication/ReplicationHelpers.kt @@ -258,6 +258,18 @@ fun RestHighLevelClient.waitForReplicationStart(index: String, waitFor : TimeVal }, waitFor.seconds, TimeUnit.SECONDS) } +fun RestHighLevelClient.waitForShardTaskStart(index: String, waitFor : TimeValue = TimeValue.timeValueSeconds(10)) { + assertBusy( + { + // Persistent tasks service appends identifiers like '[c]' to indicate child task hence the '*' wildcard + val request = ListTasksRequest().setDetailed(true).setActions(ShardReplicationExecutor.TASK_NAME + "*") + val response = tasks().list(request,RequestOptions.DEFAULT) + assertThat(response.tasks) + .withFailMessage("replication shard tasks not started") + .isNotEmpty + }, waitFor.seconds, TimeUnit.SECONDS) +} + fun RestHighLevelClient.leaderStats() : Map { var request = Request("GET", REST_LEADER_STATS) request.setJsonEntity("{}") diff --git a/src/test/kotlin/com/amazon/elasticsearch/replication/integ/rest/UpdateAutoFollowPatternIT.kt b/src/test/kotlin/com/amazon/elasticsearch/replication/integ/rest/UpdateAutoFollowPatternIT.kt index 6714020a..94a786eb 100644 --- a/src/test/kotlin/com/amazon/elasticsearch/replication/integ/rest/UpdateAutoFollowPatternIT.kt +++ b/src/test/kotlin/com/amazon/elasticsearch/replication/integ/rest/UpdateAutoFollowPatternIT.kt @@ -45,6 +45,7 @@ import java.util.Locale import org.elasticsearch.cluster.metadata.IndexMetadata import org.elasticsearch.cluster.metadata.MetadataCreateIndexService import com.amazon.elasticsearch.replication.ReplicationPlugin +import com.amazon.elasticsearch.replication.waitForShardTaskStart import org.elasticsearch.test.ESTestCase.assertBusy import java.util.concurrent.TimeUnit @@ -59,6 +60,8 @@ class UpdateAutoFollowPatternIT: MultiClusterRestTestCase() { private val indexPatternName = "test_pattern" private val connectionAlias = "test_conn" private val longIndexPatternName = "index_".repeat(43) + private val waitForShardTask = TimeValue.timeValueSeconds(10) + fun `test auto follow pattern`() { val followerClient = getClientForCluster(FOLLOWER) val leaderClient = getClientForCluster(LEADER) @@ -83,6 +86,8 @@ class UpdateAutoFollowPatternIT: MultiClusterRestTestCase() { Assertions.assertThat(followerClient.indices() .exists(GetIndexRequest(leaderIndexNameNew), RequestOptions.DEFAULT)) .isEqualTo(true) + followerClient.waitForShardTaskStart(leaderIndexNameNew, waitForShardTask) + followerClient.waitForShardTaskStart(leaderIndexName, waitForShardTask) }, 60, TimeUnit.SECONDS) } finally { followerClient.deleteAutoFollowPattern(connectionAlias, indexPatternName) @@ -111,7 +116,10 @@ class UpdateAutoFollowPatternIT: MultiClusterRestTestCase() { Assertions.assertThat(followerClient.indices() .exists(GetIndexRequest(leaderIndexNameNew), RequestOptions.DEFAULT)) .isEqualTo(true) + followerClient.waitForShardTaskStart(leaderIndexNameNew, waitForShardTask) }, 30, TimeUnit.SECONDS) + + } finally { followerClient.deleteAutoFollowPattern(connectionAlias, indexPatternName) followerClient.stopReplication(leaderIndexNameNew) @@ -151,6 +159,7 @@ class UpdateAutoFollowPatternIT: MultiClusterRestTestCase() { .getSettings(getSettingsRequest, RequestOptions.DEFAULT) .indexToSettings[leaderIndexName][IndexMetadata.SETTING_NUMBER_OF_REPLICAS] ) + followerClient.waitForShardTaskStart(leaderIndexName, waitForShardTask) }, 15, TimeUnit.SECONDS) } finally { @@ -182,9 +191,12 @@ class UpdateAutoFollowPatternIT: MultiClusterRestTestCase() { try { followerClient.updateAutoFollowPattern(connectionAlias, indexPatternName, indexPattern) - // Assert that there is still only one index replication task - Assertions.assertThat(getAutoFollowTasks(FOLLOWER).size).isEqualTo(1) - Assertions.assertThat(getIndexReplicationTasks(FOLLOWER).size).isEqualTo(1) + assertBusy({ + // Assert that there is still only one index replication task + Assertions.assertThat(getAutoFollowTasks(FOLLOWER).size).isEqualTo(1) + Assertions.assertThat(getIndexReplicationTasks(FOLLOWER).size).isEqualTo(1) + followerClient.waitForShardTaskStart(leaderIndexName, waitForShardTask) + },30, TimeUnit.SECONDS) } finally { followerClient.deleteAutoFollowPattern(connectionAlias, indexPatternName) } @@ -259,6 +271,7 @@ class UpdateAutoFollowPatternIT: MultiClusterRestTestCase() { Assertions.assertThat(getAutoFollowTasks(FOLLOWER).size).isEqualTo(1) Assertions.assertThat(getIndexReplicationTasks(FOLLOWER).size).isEqualTo(1) + followerClient.waitForShardTaskStart(leaderIndexName, waitForShardTask) } finally { followerClient.deleteAutoFollowPattern(connectionAlias, indexPatternName) } @@ -269,6 +282,7 @@ class UpdateAutoFollowPatternIT: MultiClusterRestTestCase() { }, 30, TimeUnit.SECONDS) Assertions.assertThat(getIndexReplicationTasks(FOLLOWER).size).isEqualTo(1) + followerClient.stopReplication(leaderIndexName) } fun createRandomIndex(client: RestHighLevelClient): String {