Skip to content

Commit

Permalink
verifying shard tasks are up in autofollow test (opensearch-project#153)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaurav Bafna <[email protected]>
  • Loading branch information
gbbafna committed Sep 17, 2021
1 parent c6cf654 commit bce6f0f
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,18 @@ fun RestHighLevelClient.waitForReplicationStart(index: String, waitFor : TimeVal
}, waitFor.seconds, TimeUnit.SECONDS)
}

fun RestHighLevelClient.waitForShardTaskStart(index: String, waitFor : TimeValue = TimeValue.timeValueSeconds(10)) {
assertBusy(
{
// Persistent tasks service appends identifiers like '[c]' to indicate child task hence the '*' wildcard
val request = ListTasksRequest().setDetailed(true).setActions(ShardReplicationExecutor.TASK_NAME + "*")
val response = tasks().list(request,RequestOptions.DEFAULT)
assertThat(response.tasks)
.withFailMessage("replication shard tasks not started")
.isNotEmpty
}, waitFor.seconds, TimeUnit.SECONDS)
}

fun RestHighLevelClient.leaderStats() : Map<String, Any> {
var request = Request("GET", REST_LEADER_STATS)
request.setJsonEntity("{}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import java.util.Locale
import org.elasticsearch.cluster.metadata.IndexMetadata
import org.elasticsearch.cluster.metadata.MetadataCreateIndexService
import com.amazon.elasticsearch.replication.ReplicationPlugin
import com.amazon.elasticsearch.replication.waitForShardTaskStart
import org.elasticsearch.test.ESTestCase.assertBusy
import java.util.concurrent.TimeUnit

Expand All @@ -59,6 +60,8 @@ class UpdateAutoFollowPatternIT: MultiClusterRestTestCase() {
private val indexPatternName = "test_pattern"
private val connectionAlias = "test_conn"
private val longIndexPatternName = "index_".repeat(43)
private val waitForShardTask = TimeValue.timeValueSeconds(10)

fun `test auto follow pattern`() {
val followerClient = getClientForCluster(FOLLOWER)
val leaderClient = getClientForCluster(LEADER)
Expand All @@ -83,6 +86,8 @@ class UpdateAutoFollowPatternIT: MultiClusterRestTestCase() {
Assertions.assertThat(followerClient.indices()
.exists(GetIndexRequest(leaderIndexNameNew), RequestOptions.DEFAULT))
.isEqualTo(true)
followerClient.waitForShardTaskStart(leaderIndexNameNew, waitForShardTask)
followerClient.waitForShardTaskStart(leaderIndexName, waitForShardTask)
}, 60, TimeUnit.SECONDS)
} finally {
followerClient.deleteAutoFollowPattern(connectionAlias, indexPatternName)
Expand Down Expand Up @@ -111,7 +116,10 @@ class UpdateAutoFollowPatternIT: MultiClusterRestTestCase() {
Assertions.assertThat(followerClient.indices()
.exists(GetIndexRequest(leaderIndexNameNew), RequestOptions.DEFAULT))
.isEqualTo(true)
followerClient.waitForShardTaskStart(leaderIndexNameNew, waitForShardTask)
}, 30, TimeUnit.SECONDS)


} finally {
followerClient.deleteAutoFollowPattern(connectionAlias, indexPatternName)
followerClient.stopReplication(leaderIndexNameNew)
Expand Down Expand Up @@ -151,6 +159,7 @@ class UpdateAutoFollowPatternIT: MultiClusterRestTestCase() {
.getSettings(getSettingsRequest, RequestOptions.DEFAULT)
.indexToSettings[leaderIndexName][IndexMetadata.SETTING_NUMBER_OF_REPLICAS]
)
followerClient.waitForShardTaskStart(leaderIndexName, waitForShardTask)
}, 15, TimeUnit.SECONDS)

} finally {
Expand Down Expand Up @@ -182,9 +191,12 @@ class UpdateAutoFollowPatternIT: MultiClusterRestTestCase() {
try {
followerClient.updateAutoFollowPattern(connectionAlias, indexPatternName, indexPattern)

// Assert that there is still only one index replication task
Assertions.assertThat(getAutoFollowTasks(FOLLOWER).size).isEqualTo(1)
Assertions.assertThat(getIndexReplicationTasks(FOLLOWER).size).isEqualTo(1)
assertBusy({
// Assert that there is still only one index replication task
Assertions.assertThat(getAutoFollowTasks(FOLLOWER).size).isEqualTo(1)
Assertions.assertThat(getIndexReplicationTasks(FOLLOWER).size).isEqualTo(1)
followerClient.waitForShardTaskStart(leaderIndexName, waitForShardTask)
},30, TimeUnit.SECONDS)
} finally {
followerClient.deleteAutoFollowPattern(connectionAlias, indexPatternName)
}
Expand Down Expand Up @@ -259,6 +271,7 @@ class UpdateAutoFollowPatternIT: MultiClusterRestTestCase() {

Assertions.assertThat(getAutoFollowTasks(FOLLOWER).size).isEqualTo(1)
Assertions.assertThat(getIndexReplicationTasks(FOLLOWER).size).isEqualTo(1)
followerClient.waitForShardTaskStart(leaderIndexName, waitForShardTask)
} finally {
followerClient.deleteAutoFollowPattern(connectionAlias, indexPatternName)
}
Expand All @@ -269,6 +282,7 @@ class UpdateAutoFollowPatternIT: MultiClusterRestTestCase() {
}, 30, TimeUnit.SECONDS)

Assertions.assertThat(getIndexReplicationTasks(FOLLOWER).size).isEqualTo(1)
followerClient.stopReplication(leaderIndexName)
}

fun createRandomIndex(client: RestHighLevelClient): String {
Expand Down

0 comments on commit bce6f0f

Please sign in to comment.