From 2b569443f5995435baaffe3cae42bead69a3f9bb Mon Sep 17 00:00:00 2001 From: "opensearch-trigger-bot[bot]" <98922864+opensearch-trigger-bot[bot]@users.noreply.github.com> Date: Fri, 2 Jun 2023 18:08:04 +0530 Subject: [PATCH] Remove any stale replication tasks from cluster state (#905) (#979) Clear stale replication tasks in STOP API Signed-off-by: monusingh-1 (cherry picked from commit bc9b61a4807e93d660d7b34cc12409fa6de778c7) Co-authored-by: Monu Singh --- .../TransportStopIndexReplicationAction.kt | 38 +++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/src/main/kotlin/org/opensearch/replication/action/stop/TransportStopIndexReplicationAction.kt b/src/main/kotlin/org/opensearch/replication/action/stop/TransportStopIndexReplicationAction.kt index f8e6bf46..3e249f15 100644 --- a/src/main/kotlin/org/opensearch/replication/action/stop/TransportStopIndexReplicationAction.kt +++ b/src/main/kotlin/org/opensearch/replication/action/stop/TransportStopIndexReplicationAction.kt @@ -54,6 +54,8 @@ import org.opensearch.common.inject.Inject import org.opensearch.common.io.stream.StreamInput import org.opensearch.common.settings.Settings import org.opensearch.replication.util.stackTraceToString +import org.opensearch.persistent.PersistentTasksCustomMetadata +import org.opensearch.persistent.RemovePersistentTaskAction import org.opensearch.threadpool.ThreadPool import org.opensearch.transport.TransportService import java.io.IOException @@ -136,6 +138,7 @@ class TransportStopIndexReplicationAction @Inject constructor(transportService: } } replicationMetadataManager.deleteIndexReplicationMetadata(request.indexName) + removeStaleReplicationTasksFromClusterState(request) listener.onResponse(AcknowledgedResponse(true)) } catch (e: Exception) { log.error("Stop replication failed for index[${request.indexName}] with error ${e.stackTraceToString()}") @@ -144,6 +147,32 @@ class TransportStopIndexReplicationAction @Inject constructor(transportService: } } + private suspend 
fun removeStaleReplicationTasksFromClusterState(request: StopIndexReplicationRequest) {
+        try {
+            // Best-effort cleanup. custom() may be null (validateReplicationStateOfIndex assumes so too).
+            val allTasks: PersistentTasksCustomMetadata? =
+                clusterService.state().metadata().custom(PersistentTasksCustomMetadata.TYPE)
+            for (singleTask in allTasks?.tasks().orEmpty()) {
+                if (isReplicationTask(singleTask, request) && !singleTask.isAssigned) {
+                    log.info("Removing task: ${singleTask.id} from cluster state")
+                    client.suspendExecute(RemovePersistentTaskAction.INSTANCE, RemovePersistentTaskAction.Request(singleTask.id))
+                }
+            }
+        } catch (e: Exception) {
+            // Stop must still succeed if cleanup fails, but keep the cause visible in the logs.
+            log.warn("Could not remove stale replication tasks of index[${request.indexName}]: ${e.stackTraceToString()}")
+        }
+    }
+
+    // True only for this index's own tasks: replication:index:fruit-1 or replication:[fruit-1][0].
+    private fun isReplicationTask(
+        singleTask: PersistentTasksCustomMetadata.PersistentTask<*>,
+        request: StopIndexReplicationRequest
+    ): Boolean {
+        // Exact match on the index name; a substring test would also hit fruit-10 when stopping fruit-1.
+        val shardNumber = singleTask.id.removeSurrounding("replication:[${request.indexName}][", "]")
+        return singleTask.id == "replication:index:${request.indexName}" || (shardNumber != singleTask.id && shardNumber.isNotEmpty() && shardNumber.all(Char::isDigit))
+    }
+
     private fun validateReplicationStateOfIndex(request: StopIndexReplicationRequest) {
         // If replication blocks/settings are present, Stop action should proceed with the clean-up
         // This can happen during settings of follower index are carried over in the snapshot and the restore is
@@ -153,6 +182,15 @@ class TransportStopIndexReplicationAction @Inject constructor(transportService:
             return
         }
 
+        //check for stale replication tasks
+        val allTasks: PersistentTasksCustomMetadata? =
+            clusterService.state()?.metadata()?.custom(PersistentTasksCustomMetadata.TYPE)
+        allTasks?.tasks()?.forEach{
+            if (isReplicationTask(it, request) && !it.isAssigned){
+                return
+            }
+        }
+
         val replicationStateParams = getReplicationStateParamsForIndex(clusterService, request.indexName)
                 ?:
             throw IllegalArgumentException("No replication in progress for index:${request.indexName}")