diff --git a/src/main/kotlin/org/opensearch/replication/task/autofollow/AutoFollowTask.kt b/src/main/kotlin/org/opensearch/replication/task/autofollow/AutoFollowTask.kt index 1ac2f7de..e2c81b5f 100644 --- a/src/main/kotlin/org/opensearch/replication/task/autofollow/AutoFollowTask.kt +++ b/src/main/kotlin/org/opensearch/replication/task/autofollow/AutoFollowTask.kt @@ -45,6 +45,7 @@ import org.opensearch.tasks.TaskId import org.opensearch.threadpool.Scheduler import org.opensearch.threadpool.ThreadPool import java.util.concurrent.ConcurrentSkipListSet +import java.util.concurrent.TimeUnit class AutoFollowTask(id: Long, type: String, action: String, description: String, parentTask: TaskId, headers: Map, @@ -91,7 +92,7 @@ class AutoFollowTask(id: Long, type: String, action: String, description: String private fun addRetryScheduler() { log.debug("Adding retry scheduler") - if(retryScheduler != null && !retryScheduler!!.isCancelled) { + if(retryScheduler != null && retryScheduler!!.getDelay(TimeUnit.NANOSECONDS) > 0L) { return } try { diff --git a/src/test/kotlin/org/opensearch/replication/integ/rest/UpdateAutoFollowPatternIT.kt b/src/test/kotlin/org/opensearch/replication/integ/rest/UpdateAutoFollowPatternIT.kt index 9f12bbfb..13168a6a 100644 --- a/src/test/kotlin/org/opensearch/replication/integ/rest/UpdateAutoFollowPatternIT.kt +++ b/src/test/kotlin/org/opensearch/replication/integ/rest/UpdateAutoFollowPatternIT.kt @@ -23,6 +23,7 @@ import org.opensearch.replication.task.index.IndexReplicationExecutor import org.apache.http.HttpStatus import org.apache.http.entity.ContentType import org.apache.http.nio.entity.NStringEntity +import org.apache.logging.log4j.LogManager import org.assertj.core.api.Assertions import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest import org.opensearch.action.admin.indices.settings.get.GetSettingsRequest @@ -41,6 +42,7 @@ import org.opensearch.cluster.metadata.IndexMetadata import org.opensearch.cluster.metadata.MetadataCreateIndexService import org.opensearch.replication.AutoFollowStats import org.opensearch.replication.ReplicationPlugin +import org.opensearch.replication.action.changes.TransportGetChangesAction import org.opensearch.replication.updateReplicationStartBlockSetting import org.opensearch.replication.updateAutofollowRetrySetting import org.opensearch.replication.updateAutoFollowConcurrentStartReplicationJobSetting @@ -63,6 +65,10 @@ class UpdateAutoFollowPatternIT: MultiClusterRestTestCase() { private val longIndexPatternName = "index_".repeat(43) private val waitForShardTask = TimeValue.timeValueSeconds(10) + companion object { + private val log = LogManager.getLogger(UpdateAutoFollowPatternIT::class.java) + } + fun `test auto follow pattern`() { val followerClient = getClientForCluster(FOLLOWER) val leaderClient = getClientForCluster(LEADER) @@ -316,36 +322,43 @@ class UpdateAutoFollowPatternIT: MultiClusterRestTestCase() { Assertions.assertThat(getIndexReplicationTasks(FOLLOWER).size).isEqualTo(1) } - fun `test autofollow task with start replication block`() { + fun `test autofollow task with start replication block and retries`() { val followerClient = getClientForCluster(FOLLOWER) val leaderClient = getClientForCluster(LEADER) createConnectionBetweenClusters(FOLLOWER, LEADER, connectionAlias) - val leaderIndexName = createRandomIndex(leaderClient) try { //modify retry duration to account for autofollow trigger in next retry followerClient.updateAutofollowRetrySetting("1m") - // Add replication start block - followerClient.updateReplicationStartBlockSetting(true) - followerClient.updateAutoFollowPattern(connectionAlias, indexPatternName, indexPattern) - sleep(30000) // Default poll for auto follow in worst case - // verify both index replication tasks and autofollow tasks - // Replication shouldn't have been started - 0 tasks - // Autofollow task should still be up - 1 task - Assertions.assertThat(getIndexReplicationTasks(FOLLOWER).size).isEqualTo(0) - Assertions.assertThat(getAutoFollowTasks(FOLLOWER).size).isEqualTo(1) + for (repeat in 1..2) { + log.info("Current Iteration $repeat") + // Add replication start block + followerClient.updateReplicationStartBlockSetting(true) + createRandomIndex(leaderClient) + followerClient.updateAutoFollowPattern(connectionAlias, indexPatternName, indexPattern) + sleep(95000) // wait for auto follow trigger in the worst case + // verify both index replication tasks and autofollow tasks + // Replication shouldn't have been started - (repeat-1) tasks as for current loop index shouldn't be + // created yet. + // Autofollow task should still be up - 1 task + Assertions.assertThat(getIndexReplicationTasks(FOLLOWER).size).isEqualTo(repeat-1) + Assertions.assertThat(getAutoFollowTasks(FOLLOWER).size).isEqualTo(1) - var stats = followerClient.AutoFollowStats() - var failedIndices = stats["failed_indices"] as List<*> - assert(failedIndices.size == 1) - // Remove replication start block - followerClient.updateReplicationStartBlockSetting(false) - sleep(60000) // wait for auto follow trigger in the worst case - // Index should be replicated and autofollow task should be present - Assertions.assertThat(getIndexReplicationTasks(FOLLOWER).size).isEqualTo(1) - Assertions.assertThat(getAutoFollowTasks(FOLLOWER).size).isEqualTo(1) - stats = followerClient.AutoFollowStats() - failedIndices = stats["failed_indices"] as List<*> - assert(failedIndices.isEmpty()) + var stats = followerClient.AutoFollowStats() + var failedIndices = stats["failed_indices"] as List<*> + // Every time failed replication task will be 1 as + // there are already running jobs in the previous iteration + log.info("Current failed indices $failedIndices") + assert(failedIndices.size == 1) + // Remove replication start block + followerClient.updateReplicationStartBlockSetting(false) + sleep(95000) // wait for auto follow trigger in the worst case + // Index should be replicated and autofollow task should be present + Assertions.assertThat(getIndexReplicationTasks(FOLLOWER).size).isEqualTo(repeat) + Assertions.assertThat(getAutoFollowTasks(FOLLOWER).size).isEqualTo(1) + stats = followerClient.AutoFollowStats() + failedIndices = stats["failed_indices"] as List<*> + assert(failedIndices.isEmpty()) + } } finally { followerClient.deleteAutoFollowPattern(connectionAlias, indexPatternName) }