From 9e06f40522f8c0fa006ac2b565e76d2e303f700d Mon Sep 17 00:00:00 2001 From: Sai Kumar Date: Wed, 31 May 2023 14:15:09 +0530 Subject: [PATCH] [Backport] Handle clean-up of stale index task during cancellation (#645) (#909) Signed-off-by: Sai Kumar --- .../task/index/IndexReplicationTask.kt | 52 +++++++++++++---- .../replication/ReplicationHelpers.kt | 2 +- .../task/index/IndexReplicationTaskTests.kt | 57 +++++++++++++++++++ 3 files changed, 98 insertions(+), 13 deletions(-) diff --git a/src/main/kotlin/org/opensearch/replication/task/index/IndexReplicationTask.kt b/src/main/kotlin/org/opensearch/replication/task/index/IndexReplicationTask.kt index 421699f8..2796eb43 100644 --- a/src/main/kotlin/org/opensearch/replication/task/index/IndexReplicationTask.kt +++ b/src/main/kotlin/org/opensearch/replication/task/index/IndexReplicationTask.kt @@ -156,7 +156,8 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript val blockListedSettings :Set = blSettings.stream().map { k -> k.key }.collect(Collectors.toSet()) const val SLEEP_TIME_BETWEEN_POLL_MS = 5000L - const val TASK_CANCELLATION_REASON = "Index replication task was cancelled by user" + const val AUTOPAUSED_REASON_PREFIX = "AutoPaused: " + const val TASK_CANCELLATION_REASON = AUTOPAUSED_REASON_PREFIX + "Index replication task was cancelled by user" } @@ -263,13 +264,6 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript } } - override fun onCancelled() { - log.info("Cancelling the index replication task.") - client.execute(PauseIndexReplicationAction.INSTANCE, - PauseIndexReplicationRequest(followerIndexName, TASK_CANCELLATION_REASON)) - super.onCancelled() - } - private suspend fun failReplication(failedState: FailedState) { withContext(NonCancellable) { val reason = failedState.errorMsg @@ -313,6 +307,23 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript return MonitoringState } + fun isTrackingTaskForIndex(): Boolean { + val persistentTasks = clusterService.state().metadata.custom(PersistentTasksCustomMetadata.TYPE) + val runningTasksForIndex = persistentTasks.findTasks(IndexReplicationExecutor.TASK_NAME, Predicate { true }).stream() + .map { task -> task as PersistentTask } + .filter { task -> task.params!!.followerIndexName == followerIndexName} + .toArray() + assert(runningTasksForIndex.size <= 1) { "Found more than one running index task for index[$followerIndexName]" } + for (runningTask in runningTasksForIndex) { + val currentTask = runningTask as PersistentTask + log.info("Verifying task details - currentTask={isAssigned=${currentTask.isAssigned},executorNode=${currentTask.executorNode}}") + if(currentTask.isAssigned && currentTask.executorNode == clusterService.state().nodes.localNodeId) { + return true + } + } + return false + } + private fun isResumed(): Boolean { return clusterService.state().routingTable.hasIndex(followerIndexName) } @@ -651,7 +662,7 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript log.error("Going to initiate auto-pause of $followerIndexName due to shard failures - $state") val pauseReplicationResponse = client.suspendExecute( replicationMetadata, - PauseIndexReplicationAction.INSTANCE, PauseIndexReplicationRequest(followerIndexName, "AutoPaused: ${state.errorMsg}"), + PauseIndexReplicationAction.INSTANCE, PauseIndexReplicationRequest(followerIndexName, "$AUTOPAUSED_REASON_PREFIX + ${state.errorMsg}"), defaultContext = true ) if (!pauseReplicationResponse.isAcknowledged) { @@ -688,10 +699,27 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript } override suspend fun cleanup() { - if (currentTaskState.state == ReplicationState.RESTORING) { - log.info("Replication stopped before restore could finish, so removing partial restore..") - cancelRestore() + // If the task is already running on the other node, + // OpenSearch persistent task framework cancels any stale tasks on the old nodes. + // Currently, we don't have view on the cancellation reason. Before triggering + // any further actions on the index from this task, verify that, this is the actual task tracking the index. + // - stale task during cancellation shouldn't trigger further actions. + if(isTrackingTaskForIndex()) { + if (currentTaskState.state == ReplicationState.RESTORING) { + log.info("Replication stopped before restore could finish, so removing partial restore..") + cancelRestore() + } + + // if cancelled and not in paused state. + val replicationStateParams = getReplicationStateParamsForIndex(clusterService, followerIndexName) + if(isCancelled && replicationStateParams != null + && replicationStateParams[REPLICATION_LAST_KNOWN_OVERALL_STATE] == ReplicationOverallState.RUNNING.name) { + log.info("Task is cancelled. Moving the index to auto-pause state") + client.execute(PauseIndexReplicationAction.INSTANCE, + PauseIndexReplicationRequest(followerIndexName, TASK_CANCELLATION_REASON)) + } } + /* This is to minimise overhead of calling an additional listener as * it continues to be called even after the task is completed. */ diff --git a/src/test/kotlin/org/opensearch/replication/ReplicationHelpers.kt b/src/test/kotlin/org/opensearch/replication/ReplicationHelpers.kt index 3fe4e11f..f5c9ff24 100644 --- a/src/test/kotlin/org/opensearch/replication/ReplicationHelpers.kt +++ b/src/test/kotlin/org/opensearch/replication/ReplicationHelpers.kt @@ -51,7 +51,7 @@ const val REST_REPLICATION_TASKS = "_tasks?actions=*replication*&detailed&pretty const val REST_LEADER_STATS = "${REST_REPLICATION_PREFIX}leader_stats" const val REST_FOLLOWER_STATS = "${REST_REPLICATION_PREFIX}follower_stats" const val REST_AUTO_FOLLOW_STATS = "${REST_REPLICATION_PREFIX}autofollow_stats" -const val INDEX_TASK_CANCELLATION_REASON = "Index replication task was cancelled by user" +const val INDEX_TASK_CANCELLATION_REASON = "AutoPaused: Index replication task was cancelled by user" const val STATUS_REASON_USER_INITIATED = "User initiated" const val STATUS_REASON_SHARD_TASK_CANCELLED = "Shard task killed or cancelled." const val STATUS_REASON_INDEX_NOT_FOUND = "no such index" diff --git a/src/test/kotlin/org/opensearch/replication/task/index/IndexReplicationTaskTests.kt b/src/test/kotlin/org/opensearch/replication/task/index/IndexReplicationTaskTests.kt index 083533d9..2032cc26 100644 --- a/src/test/kotlin/org/opensearch/replication/task/index/IndexReplicationTaskTests.kt +++ b/src/test/kotlin/org/opensearch/replication/task/index/IndexReplicationTaskTests.kt @@ -26,6 +26,8 @@ import org.opensearch.cluster.ClusterStateObserver import org.opensearch.cluster.RestoreInProgress import org.opensearch.cluster.metadata.IndexMetadata import org.opensearch.cluster.metadata.Metadata +import org.opensearch.cluster.node.DiscoveryNode +import org.opensearch.cluster.node.DiscoveryNodes import org.opensearch.cluster.routing.RoutingTable import org.opensearch.common.settings.Settings import org.opensearch.common.settings.SettingsModule @@ -209,6 +211,61 @@ class IndexReplicationTaskTests : OpenSearchTestCase() { assertThat(shardTasks.size == 2).isTrue } + fun testIsTrackingTaskForIndex() = runBlocking { + val replicationTask: IndexReplicationTask = spy(createIndexReplicationTask()) + var taskManager = Mockito.mock(TaskManager::class.java) + replicationTask.setPersistent(taskManager) + var rc = ReplicationContext(followerIndex) + var rm = ReplicationMetadata(connectionName, ReplicationStoreMetadataType.INDEX.name, ReplicationOverallState.RUNNING.name, "reason", rc, rc, Settings.EMPTY) + replicationTask.setReplicationMetadata(rm) + + // when index replication task is valid + var tasks = PersistentTasksCustomMetadata.builder() + var leaderIndex = Index(followerIndex, "_na_") + tasks.addTask( "replication:0", IndexReplicationExecutor.TASK_NAME, IndexReplicationParams("remoteCluster", leaderIndex, followerIndex), + PersistentTasksCustomMetadata.Assignment("same_node", "test assignment on other node")) + + var metadata = Metadata.builder() + .put(IndexMetadata.builder(REPLICATION_CONFIG_SYSTEM_INDEX).settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(0)) + .put(IndexMetadata.builder(followerIndex).settings(settings(Version.CURRENT)).numberOfShards(2).numberOfReplicas(0)) + .putCustom(PersistentTasksCustomMetadata.TYPE, tasks.build()) + .build() + var routingTableBuilder = RoutingTable.builder() + .addAsNew(metadata.index(REPLICATION_CONFIG_SYSTEM_INDEX)) + .addAsNew(metadata.index(followerIndex)) + var discoveryNodesBuilder = DiscoveryNodes.Builder() + .localNodeId("same_node") + var newClusterState = ClusterState.builder(clusterService.state()) + .metadata(metadata) + .routingTable(routingTableBuilder.build()) + .nodes(discoveryNodesBuilder.build()).build() + setState(clusterService, newClusterState) + assertThat(replicationTask.isTrackingTaskForIndex()).isTrue + + // when index replication task is not valid + tasks = PersistentTasksCustomMetadata.builder() + leaderIndex = Index(followerIndex, "_na_") + tasks.addTask( "replication:0", IndexReplicationExecutor.TASK_NAME, IndexReplicationParams("remoteCluster", leaderIndex, followerIndex), + PersistentTasksCustomMetadata.Assignment("other_node", "test assignment on other node")) + + metadata = Metadata.builder() + .put(IndexMetadata.builder(REPLICATION_CONFIG_SYSTEM_INDEX).settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(0)) + .put(IndexMetadata.builder(followerIndex).settings(settings(Version.CURRENT)).numberOfShards(2).numberOfReplicas(0)) + .putCustom(PersistentTasksCustomMetadata.TYPE, tasks.build()) + .build() + routingTableBuilder = RoutingTable.builder() + .addAsNew(metadata.index(REPLICATION_CONFIG_SYSTEM_INDEX)) + .addAsNew(metadata.index(followerIndex)) + discoveryNodesBuilder = DiscoveryNodes.Builder() + .localNodeId("same_node") + newClusterState = ClusterState.builder(clusterService.state()) + .metadata(metadata) + .routingTable(routingTableBuilder.build()) + .nodes(discoveryNodesBuilder.build()).build() + setState(clusterService, newClusterState) + assertThat(replicationTask.isTrackingTaskForIndex()).isFalse + } + private fun createIndexReplicationTask() : IndexReplicationTask { var threadPool = TestThreadPool("IndexReplicationTask") //Hack Alert : Though it is meant to force rejection , this is to make overallTaskScope not null