Skip to content

Commit

Permalink
Modify autofollow retry scheduler logic check to account for complete…
Browse files Browse the repository at this point in the history
…d runs (#839)

Signed-off-by: Sai Kumar <[email protected]>
  • Loading branch information
saikaranam-amazon authored May 26, 2023
1 parent 3e3787d commit f5c94f7
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import org.opensearch.tasks.TaskId
import org.opensearch.threadpool.Scheduler
import org.opensearch.threadpool.ThreadPool
import java.util.concurrent.ConcurrentSkipListSet
import java.util.concurrent.TimeUnit

class AutoFollowTask(id: Long, type: String, action: String, description: String, parentTask: TaskId,
headers: Map<String, String>,
Expand Down Expand Up @@ -91,7 +92,7 @@ class AutoFollowTask(id: Long, type: String, action: String, description: String

private fun addRetryScheduler() {
log.debug("Adding retry scheduler")
if(retryScheduler != null && !retryScheduler!!.isCancelled) {
if(retryScheduler != null && retryScheduler!!.getDelay(TimeUnit.NANOSECONDS) > 0L) {
return
}
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.opensearch.replication.task.index.IndexReplicationExecutor
import org.apache.hc.core5.http.HttpStatus
import org.apache.hc.core5.http.ContentType
import org.apache.hc.core5.http.io.entity.StringEntity
import org.apache.logging.log4j.LogManager
import org.assertj.core.api.Assertions
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest
import org.opensearch.action.admin.indices.settings.get.GetSettingsRequest
Expand All @@ -41,6 +42,7 @@ import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.cluster.metadata.MetadataCreateIndexService
import org.opensearch.replication.AutoFollowStats
import org.opensearch.replication.ReplicationPlugin
import org.opensearch.replication.action.changes.TransportGetChangesAction
import org.opensearch.replication.updateReplicationStartBlockSetting
import org.opensearch.replication.updateAutofollowRetrySetting
import org.opensearch.replication.updateAutoFollowConcurrentStartReplicationJobSetting
Expand All @@ -62,6 +64,10 @@ class UpdateAutoFollowPatternIT: MultiClusterRestTestCase() {
private val longIndexPatternName = "index_".repeat(43)
private val waitForShardTask = TimeValue.timeValueSeconds(10)

companion object {
private val log = LogManager.getLogger(UpdateAutoFollowPatternIT::class.java)
}

fun `test auto follow pattern`() {
val followerClient = getClientForCluster(FOLLOWER)
val leaderClient = getClientForCluster(LEADER)
Expand Down Expand Up @@ -315,36 +321,43 @@ class UpdateAutoFollowPatternIT: MultiClusterRestTestCase() {
Assertions.assertThat(getIndexReplicationTasks(FOLLOWER).size).isEqualTo(1)
}

fun `test autofollow task with start replication block`() {
fun `test autofollow task with start replication block and retries`() {
val followerClient = getClientForCluster(FOLLOWER)
val leaderClient = getClientForCluster(LEADER)
createConnectionBetweenClusters(FOLLOWER, LEADER, connectionAlias)
val leaderIndexName = createRandomIndex(leaderClient)
try {
//modify retry duration to account for autofollow trigger in next retry
followerClient.updateAutofollowRetrySetting("1m")
// Add replication start block
followerClient.updateReplicationStartBlockSetting(true)
followerClient.updateAutoFollowPattern(connectionAlias, indexPatternName, indexPattern)
sleep(30000) // Default poll for auto follow in worst case
// verify both index replication tasks and autofollow tasks
// Replication shouldn't have been started - 0 tasks
// Autofollow task should still be up - 1 task
Assertions.assertThat(getIndexReplicationTasks(FOLLOWER).size).isEqualTo(0)
Assertions.assertThat(getAutoFollowTasks(FOLLOWER).size).isEqualTo(1)
for (repeat in 1..2) {
log.info("Current Iteration $repeat")
// Add replication start block
followerClient.updateReplicationStartBlockSetting(true)
createRandomIndex(leaderClient)
followerClient.updateAutoFollowPattern(connectionAlias, indexPatternName, indexPattern)
sleep(95000) // wait for auto follow trigger in the worst case
// verify both index replication tasks and autofollow tasks
// Replication shouldn't have been started - (repeat-1) tasks as for current loop index shouldn't be
// created yet.
// Autofollow task should still be up - 1 task
Assertions.assertThat(getIndexReplicationTasks(FOLLOWER).size).isEqualTo(repeat-1)
Assertions.assertThat(getAutoFollowTasks(FOLLOWER).size).isEqualTo(1)

var stats = followerClient.AutoFollowStats()
var failedIndices = stats["failed_indices"] as List<*>
assert(failedIndices.size == 1)
// Remove replication start block
followerClient.updateReplicationStartBlockSetting(false)
sleep(60000) // wait for auto follow trigger in the worst case
// Index should be replicated and autofollow task should be present
Assertions.assertThat(getIndexReplicationTasks(FOLLOWER).size).isEqualTo(1)
Assertions.assertThat(getAutoFollowTasks(FOLLOWER).size).isEqualTo(1)
stats = followerClient.AutoFollowStats()
failedIndices = stats["failed_indices"] as List<*>
assert(failedIndices.isEmpty())
var stats = followerClient.AutoFollowStats()
var failedIndices = stats["failed_indices"] as List<*>
// Every time failed replication task will be 1 as
// there are already running jobs in the previous iteration
log.info("Current failed indices $failedIndices")
assert(failedIndices.size == 1)
// Remove replication start block
followerClient.updateReplicationStartBlockSetting(false)
sleep(95000) // wait for auto follow trigger in the worst case
// Index should be replicated and autofollow task should be present
Assertions.assertThat(getIndexReplicationTasks(FOLLOWER).size).isEqualTo(repeat)
Assertions.assertThat(getAutoFollowTasks(FOLLOWER).size).isEqualTo(1)
stats = followerClient.AutoFollowStats()
failedIndices = stats["failed_indices"] as List<*>
assert(failedIndices.isEmpty())
}
} finally {
followerClient.deleteAutoFollowPattern(connectionAlias, indexPatternName)
}
Expand Down

0 comments on commit f5c94f7

Please sign in to comment.