[Backport] Handle clean-up of stale index task during cancellation (#645) (#909) (#918)

Signed-off-by: Sai Kumar <[email protected]>
(cherry picked from commit 9e06f40)

Co-authored-by: Sai Kumar <[email protected]>
1 parent 0721bcc commit 5abfc6e
Showing 3 changed files with 98 additions and 13 deletions.
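
Before the diff itself, a condensed, self-contained sketch of the behavioral change (the types and names below are illustrative stand-ins, not the plugin's real cluster-state APIs): the pause-on-cancel logic moves out of onCancelled() into cleanup(), and it now runs only when this node still owns the replication task for the follower index and replication is still in the RUNNING state.

// Illustrative stand-in for a persistent task assignment as seen in cluster state.
data class TaskAssignment(val followerIndex: String, val executorNode: String?, val isAssigned: Boolean)

// Roughly mirrors the new isTrackingTaskForIndex() guard: the task only acts on the
// index if it is the assigned copy running on the local node.
fun isTrackingTaskForIndex(tasks: List<TaskAssignment>, followerIndex: String, localNodeId: String): Boolean =
    tasks.filter { it.followerIndex == followerIndex }
        .any { it.isAssigned && it.executorNode == localNodeId }

// Roughly mirrors the new cleanup() condition for auto-pausing on cancellation.
fun shouldAutoPauseOnCancel(
    tasks: List<TaskAssignment>, followerIndex: String, localNodeId: String,
    isCancelled: Boolean, overallState: String
): Boolean =
    isTrackingTaskForIndex(tasks, followerIndex, localNodeId) && isCancelled && overallState == "RUNNING"

fun main() {
    val tasks = listOf(TaskAssignment("follower-idx", executorNode = "node-2", isAssigned = true))
    // A stale copy of the task being cancelled on node-1 must not pause the index.
    println(shouldAutoPauseOnCancel(tasks, "follower-idx", "node-1", isCancelled = true, overallState = "RUNNING")) // false
    // The task that actually owns the index on node-2 does trigger the auto-pause.
    println(shouldAutoPauseOnCancel(tasks, "follower-idx", "node-2", isCancelled = true, overallState = "RUNNING")) // true
}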
@@ -156,7 +156,8 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript
val blockListedSettings :Set<String> = blSettings.stream().map { k -> k.key }.collect(Collectors.toSet())

const val SLEEP_TIME_BETWEEN_POLL_MS = 5000L
const val TASK_CANCELLATION_REASON = "Index replication task was cancelled by user"
const val AUTOPAUSED_REASON_PREFIX = "AutoPaused: "
const val TASK_CANCELLATION_REASON = AUTOPAUSED_REASON_PREFIX + "Index replication task was cancelled by user"

}

@@ -263,13 +264,6 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript
}
}

override fun onCancelled() {
    log.info("Cancelling the index replication task.")
    client.execute(PauseIndexReplicationAction.INSTANCE,
        PauseIndexReplicationRequest(followerIndexName, TASK_CANCELLATION_REASON))
    super.onCancelled()
}

private suspend fun failReplication(failedState: FailedState) {
    withContext(NonCancellable) {
        val reason = failedState.errorMsg
@@ -313,6 +307,23 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript
return MonitoringState
}

fun isTrackingTaskForIndex(): Boolean {
    val persistentTasks = clusterService.state().metadata.custom<PersistentTasksCustomMetadata>(PersistentTasksCustomMetadata.TYPE)
    val runningTasksForIndex = persistentTasks.findTasks(IndexReplicationExecutor.TASK_NAME, Predicate { true }).stream()
        .map { task -> task as PersistentTask<IndexReplicationParams> }
        .filter { task -> task.params!!.followerIndexName == followerIndexName }
        .toArray()
    assert(runningTasksForIndex.size <= 1) { "Found more than one running index task for index[$followerIndexName]" }
    for (runningTask in runningTasksForIndex) {
        val currentTask = runningTask as PersistentTask<IndexReplicationParams>
        log.info("Verifying task details - currentTask={isAssigned=${currentTask.isAssigned},executorNode=${currentTask.executorNode}}")
        if (currentTask.isAssigned && currentTask.executorNode == clusterService.state().nodes.localNodeId) {
            return true
        }
    }
    return false
}

private fun isResumed(): Boolean {
    return clusterService.state().routingTable.hasIndex(followerIndexName)
}
@@ -651,7 +662,7 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript
log.error("Going to initiate auto-pause of $followerIndexName due to shard failures - $state")
val pauseReplicationResponse = client.suspendExecute(
replicationMetadata,
PauseIndexReplicationAction.INSTANCE, PauseIndexReplicationRequest(followerIndexName, "AutoPaused: ${state.errorMsg}"),
PauseIndexReplicationAction.INSTANCE, PauseIndexReplicationRequest(followerIndexName, "$AUTOPAUSED_REASON_PREFIX + ${state.errorMsg}"),
defaultContext = true
)
if (!pauseReplicationResponse.isAcknowledged) {
@@ -688,10 +699,27 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript
}

override suspend fun cleanup() {
    if (currentTaskState.state == ReplicationState.RESTORING) {
        log.info("Replication stopped before restore could finish, so removing partial restore..")
        cancelRestore()
    // If the task is already running on another node, the OpenSearch persistent task
    // framework cancels any stale tasks left behind on the old nodes. We currently have
    // no visibility into the cancellation reason, so before triggering any further
    // actions on the index from this task, verify that this is the task actually
    // tracking the index: a stale task being cancelled shouldn't trigger further actions.
    if (isTrackingTaskForIndex()) {
        if (currentTaskState.state == ReplicationState.RESTORING) {
            log.info("Replication stopped before restore could finish, so removing partial restore..")
            cancelRestore()
        }

        // if cancelled and not in paused state.
        val replicationStateParams = getReplicationStateParamsForIndex(clusterService, followerIndexName)
        if (isCancelled && replicationStateParams != null
                && replicationStateParams[REPLICATION_LAST_KNOWN_OVERALL_STATE] == ReplicationOverallState.RUNNING.name) {
            log.info("Task is cancelled. Moving the index to auto-pause state")
            client.execute(PauseIndexReplicationAction.INSTANCE,
                PauseIndexReplicationRequest(followerIndexName, TASK_CANCELLATION_REASON))
        }
    }

/* This is to minimise overhead of calling an additional listener as
* it continues to be called even after the task is completed.
*/
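
As a follow-up to the comment above about stale tasks: when the owning task does issue the pause, the reason string now starts with the AutoPaused prefix in both the user-cancellation and shard-failure paths. A tiny standalone snippet (hypothetical values, not part of the commit) showing the strings that result:

fun main() {
    val autoPausedReasonPrefix = "AutoPaused: "
    // Reason attached when the owning task is cancelled (TASK_CANCELLATION_REASON).
    println(autoPausedReasonPrefix + "Index replication task was cancelled by user")
    // Reason attached when replication auto-pauses on shard failures (message is illustrative).
    println(autoPausedReasonPrefix + "example shard failure message")
}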
@@ -51,7 +51,7 @@ const val REST_REPLICATION_TASKS = "_tasks?actions=*replication*&detailed&pretty
const val REST_LEADER_STATS = "${REST_REPLICATION_PREFIX}leader_stats"
const val REST_FOLLOWER_STATS = "${REST_REPLICATION_PREFIX}follower_stats"
const val REST_AUTO_FOLLOW_STATS = "${REST_REPLICATION_PREFIX}autofollow_stats"
const val INDEX_TASK_CANCELLATION_REASON = "Index replication task was cancelled by user"
const val INDEX_TASK_CANCELLATION_REASON = "AutoPaused: Index replication task was cancelled by user"
const val STATUS_REASON_USER_INITIATED = "User initiated"
const val STATUS_REASON_SHARD_TASK_CANCELLED = "Shard task killed or cancelled."
const val STATUS_REASON_INDEX_NOT_FOUND = "no such index"
@@ -26,6 +26,8 @@ import org.opensearch.cluster.ClusterStateObserver
import org.opensearch.cluster.RestoreInProgress
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.cluster.metadata.Metadata
import org.opensearch.cluster.node.DiscoveryNode
import org.opensearch.cluster.node.DiscoveryNodes
import org.opensearch.cluster.routing.RoutingTable
import org.opensearch.common.settings.Settings
import org.opensearch.common.settings.SettingsModule
@@ -209,6 +211,61 @@ class IndexReplicationTaskTests : OpenSearchTestCase() {
assertThat(shardTasks.size == 2).isTrue
}

fun testIsTrackingTaskForIndex() = runBlocking {
    val replicationTask: IndexReplicationTask = spy(createIndexReplicationTask())
    var taskManager = Mockito.mock(TaskManager::class.java)
    replicationTask.setPersistent(taskManager)
    var rc = ReplicationContext(followerIndex)
    var rm = ReplicationMetadata(connectionName, ReplicationStoreMetadataType.INDEX.name, ReplicationOverallState.RUNNING.name, "reason", rc, rc, Settings.EMPTY)
    replicationTask.setReplicationMetadata(rm)

    // when index replication task is valid
    var tasks = PersistentTasksCustomMetadata.builder()
    var leaderIndex = Index(followerIndex, "_na_")
    tasks.addTask<PersistentTaskParams>("replication:0", IndexReplicationExecutor.TASK_NAME, IndexReplicationParams("remoteCluster", leaderIndex, followerIndex),
        PersistentTasksCustomMetadata.Assignment("same_node", "test assignment on other node"))

    var metadata = Metadata.builder()
        .put(IndexMetadata.builder(REPLICATION_CONFIG_SYSTEM_INDEX).settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(0))
        .put(IndexMetadata.builder(followerIndex).settings(settings(Version.CURRENT)).numberOfShards(2).numberOfReplicas(0))
        .putCustom(PersistentTasksCustomMetadata.TYPE, tasks.build())
        .build()
    var routingTableBuilder = RoutingTable.builder()
        .addAsNew(metadata.index(REPLICATION_CONFIG_SYSTEM_INDEX))
        .addAsNew(metadata.index(followerIndex))
    var discoveryNodesBuilder = DiscoveryNodes.Builder()
        .localNodeId("same_node")
    var newClusterState = ClusterState.builder(clusterService.state())
        .metadata(metadata)
        .routingTable(routingTableBuilder.build())
        .nodes(discoveryNodesBuilder.build()).build()
    setState(clusterService, newClusterState)
    assertThat(replicationTask.isTrackingTaskForIndex()).isTrue

    // when index replication task is not valid
    tasks = PersistentTasksCustomMetadata.builder()
    leaderIndex = Index(followerIndex, "_na_")
    tasks.addTask<PersistentTaskParams>("replication:0", IndexReplicationExecutor.TASK_NAME, IndexReplicationParams("remoteCluster", leaderIndex, followerIndex),
        PersistentTasksCustomMetadata.Assignment("other_node", "test assignment on other node"))

    metadata = Metadata.builder()
        .put(IndexMetadata.builder(REPLICATION_CONFIG_SYSTEM_INDEX).settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(0))
        .put(IndexMetadata.builder(followerIndex).settings(settings(Version.CURRENT)).numberOfShards(2).numberOfReplicas(0))
        .putCustom(PersistentTasksCustomMetadata.TYPE, tasks.build())
        .build()
    routingTableBuilder = RoutingTable.builder()
        .addAsNew(metadata.index(REPLICATION_CONFIG_SYSTEM_INDEX))
        .addAsNew(metadata.index(followerIndex))
    discoveryNodesBuilder = DiscoveryNodes.Builder()
        .localNodeId("same_node")
    newClusterState = ClusterState.builder(clusterService.state())
        .metadata(metadata)
        .routingTable(routingTableBuilder.build())
        .nodes(discoveryNodesBuilder.build()).build()
    setState(clusterService, newClusterState)
    assertThat(replicationTask.isTrackingTaskForIndex()).isFalse
}

private fun createIndexReplicationTask() : IndexReplicationTask {
var threadPool = TestThreadPool("IndexReplicationTask")
//Hack Alert : Though it is meant to force rejection , this is to make overallTaskScope not null