From 48ccda6fcc2c795c7a002482688e5c2004f1338c Mon Sep 17 00:00:00 2001 From: "opensearch-trigger-bot[bot]" <98922864+opensearch-trigger-bot[bot]@users.noreply.github.com> Date: Fri, 2 Jun 2023 18:01:38 +0530 Subject: [PATCH] Remove any stale replication tasks from cluster state (#905) (#981) --- .../TransportStopIndexReplicationAction.kt | 38 +++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/src/main/kotlin/org/opensearch/replication/action/stop/TransportStopIndexReplicationAction.kt b/src/main/kotlin/org/opensearch/replication/action/stop/TransportStopIndexReplicationAction.kt index 8f7ff425..8a6fdf71 100644 --- a/src/main/kotlin/org/opensearch/replication/action/stop/TransportStopIndexReplicationAction.kt +++ b/src/main/kotlin/org/opensearch/replication/action/stop/TransportStopIndexReplicationAction.kt @@ -54,6 +54,8 @@ import org.opensearch.common.inject.Inject import org.opensearch.common.io.stream.StreamInput import org.opensearch.common.settings.Settings import org.opensearch.replication.util.stackTraceToString +import org.opensearch.persistent.PersistentTasksCustomMetadata +import org.opensearch.persistent.RemovePersistentTaskAction import org.opensearch.threadpool.ThreadPool import org.opensearch.transport.TransportService import java.io.IOException @@ -136,6 +138,7 @@ class TransportStopIndexReplicationAction @Inject constructor(transportService: } } replicationMetadataManager.deleteIndexReplicationMetadata(request.indexName) + removeStaleReplicationTasksFromClusterState(request) listener.onResponse(AcknowledgedResponse(true)) } catch (e: Exception) { log.error("Stop replication failed for index[${request.indexName}] with error ${e.stackTraceToString()}") @@ -144,6 +147,32 @@ class TransportStopIndexReplicationAction @Inject constructor(transportService: } } + private suspend fun removeStaleReplicationTasksFromClusterState(request: StopIndexReplicationRequest) { + try { + val allTasks: PersistentTasksCustomMetadata = + clusterService.state().metadata().custom(PersistentTasksCustomMetadata.TYPE) + for (singleTask in allTasks.tasks()) { + if (isReplicationTask(singleTask, request) && !singleTask.isAssigned){ + log.info("Removing task: ${singleTask.id} from cluster state") + val removeRequest: RemovePersistentTaskAction.Request = + RemovePersistentTaskAction.Request(singleTask.id) + client.suspendExecute(RemovePersistentTaskAction.INSTANCE, removeRequest) + } + } + } catch (e: Exception) { + log.info("Could not update cluster state") + } + } + + // Remove index replication task metadata, format replication:index:fruit-1 + // Remove shard replication task metadata, format replication:[fruit-1][0] + private fun isReplicationTask( + singleTask: PersistentTasksCustomMetadata.PersistentTask<*>, + request: StopIndexReplicationRequest + ) = singleTask.id.startsWith("replication:") && + (singleTask.id == "replication:index:${request.indexName}" || singleTask.id.split(":")[1].contains(request.indexName)) + + private fun validateReplicationStateOfIndex(request: StopIndexReplicationRequest) { // If replication blocks/settings are present, Stop action should proceed with the clean-up // This can happen during settings of follower index are carried over in the snapshot and the restore is @@ -153,6 +182,15 @@ class TransportStopIndexReplicationAction @Inject constructor(transportService: return } + //check for stale replication tasks + val allTasks: PersistentTasksCustomMetadata? = + clusterService.state()?.metadata()?.custom(PersistentTasksCustomMetadata.TYPE) + allTasks?.tasks()?.forEach{ + if (isReplicationTask(it, request) && !it.isAssigned){ + return + } + } + val replicationStateParams = getReplicationStateParamsForIndex(clusterService, request.indexName) ?: throw IllegalArgumentException("No replication in progress for index:${request.indexName}")