Skip to content

Commit

Permalink
[Backport] Handle clean-up of stale index task during cancellation (o…
Browse files Browse the repository at this point in the history
…pensearch-project#645) (opensearch-project#909) (opensearch-project#913)

Signed-off-by: Sai Kumar <[email protected]>
(cherry picked from commit 9e06f40)

Co-authored-by: Sai Kumar <[email protected]>
  • Loading branch information
1 parent 7a981ae commit 0757b62
Show file tree
Hide file tree
Showing 3 changed files with 346 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,9 @@ class IndexReplicationTask(id: Long, type: String, action: String, description:
val blockListedSettings :Set<String> = blSettings.stream().map { k -> k.key }.collect(Collectors.toSet())

const val SLEEP_TIME_BETWEEN_POLL_MS = 5000L
const val TASK_CANCELLATION_REASON = "Index replication task was cancelled by user"
const val AUTOPAUSED_REASON_PREFIX = "AutoPaused: "
const val TASK_CANCELLATION_REASON = AUTOPAUSED_REASON_PREFIX + "Index replication task was cancelled by user"

}


Expand Down Expand Up @@ -260,12 +262,6 @@ class IndexReplicationTask(id: Long, type: String, action: String, description:
}
}

override fun onCancelled() {
log.info("Cancelling the index replication task.")
client.execute(PauseIndexReplicationAction.INSTANCE,
PauseIndexReplicationRequest(followerIndexName, TASK_CANCELLATION_REASON))
super.onCancelled()
}
private suspend fun failReplication(failedState: FailedState) {
withContext(NonCancellable) {
val reason = failedState.errorMsg
Expand Down Expand Up @@ -309,6 +305,23 @@ class IndexReplicationTask(id: Long, type: String, action: String, description:
return MonitoringState
}

fun isTrackingTaskForIndex(): Boolean {
val persistentTasks = clusterService.state().metadata.custom<PersistentTasksCustomMetadata>(PersistentTasksCustomMetadata.TYPE)
val runningTasksForIndex = persistentTasks.findTasks(IndexReplicationExecutor.TASK_NAME, Predicate { true }).stream()
.map { task -> task as PersistentTask<IndexReplicationParams> }
.filter { task -> task.params!!.followerIndexName == followerIndexName}
.toArray()
assert(runningTasksForIndex.size <= 1) { "Found more than one running index task for index[$followerIndexName]" }
for (runningTask in runningTasksForIndex) {
val currentTask = runningTask as PersistentTask<IndexReplicationParams>
log.info("Verifying task details - currentTask={isAssigned=${currentTask.isAssigned},executorNode=${currentTask.executorNode}}")
if(currentTask.isAssigned && currentTask.executorNode == clusterService.state().nodes.localNodeId) {
return true
}
}
return false
}

private fun isResumed(): Boolean {
return clusterService.state().routingTable.hasIndex(followerIndexName)
}
Expand Down Expand Up @@ -649,7 +662,7 @@ class IndexReplicationTask(id: Long, type: String, action: String, description:
log.info("Going to initiate auto-pause of $followerIndexName due to shard failures - $state")
val pauseReplicationResponse = client.suspendExecute(
replicationMetadata,
PauseIndexReplicationAction.INSTANCE, PauseIndexReplicationRequest(followerIndexName, "AutoPaused: ${state.errorMsg}"),
PauseIndexReplicationAction.INSTANCE, PauseIndexReplicationRequest(followerIndexName, "$AUTOPAUSED_REASON_PREFIX + ${state.errorMsg}"),
defaultContext = true
)
if (!pauseReplicationResponse.isAcknowledged) {
Expand Down Expand Up @@ -686,10 +699,27 @@ class IndexReplicationTask(id: Long, type: String, action: String, description:
}

override suspend fun cleanup() {
if (currentTaskState.state == ReplicationState.RESTORING) {
log.info("Replication stopped before restore could finish, so removing partial restore..")
cancelRestore()
// If the task is already running on the other node,
// OpenSearch persistent task framework cancels any stale tasks on the old nodes.
// Currently, we don't have view on the cancellation reason. Before triggering
// any further actions on the index from this task, verify that, this is the actual task tracking the index.
// - stale task during cancellation shouldn't trigger further actions.
if(isTrackingTaskForIndex()) {
if (currentTaskState.state == ReplicationState.RESTORING) {
log.info("Replication stopped before restore could finish, so removing partial restore..")
cancelRestore()
}

// if cancelled and not in paused state.
val replicationStateParams = getReplicationStateParamsForIndex(clusterService, followerIndexName)
if(isCancelled && replicationStateParams != null
&& replicationStateParams[REPLICATION_LAST_KNOWN_OVERALL_STATE] == ReplicationOverallState.RUNNING.name) {
log.info("Task is cancelled. Moving the index to auto-pause state")
client.execute(PauseIndexReplicationAction.INSTANCE,
PauseIndexReplicationRequest(followerIndexName, TASK_CANCELLATION_REASON))
}
}

/* This is to minimise overhead of calling an additional listener as
* it continues to be called even after the task is completed.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ const val REST_REPLICATION_TASKS = "_tasks?actions=*replication*&detailed&pretty
const val REST_LEADER_STATS = "${REST_REPLICATION_PREFIX}leader_stats"
const val REST_FOLLOWER_STATS = "${REST_REPLICATION_PREFIX}follower_stats"
const val REST_AUTO_FOLLOW_STATS = "${REST_REPLICATION_PREFIX}autofollow_stats"
const val INDEX_TASK_CANCELLATION_REASON = "Index replication task was cancelled by user"
const val INDEX_TASK_CANCELLATION_REASON = "AutoPaused: Index replication task was cancelled by user"
const val STATUS_REASON_USER_INITIATED = "User initiated"
const val STATUS_REASON_SHARD_TASK_CANCELLED = "Shard task killed or cancelled."
const val STATUS_REASON_INDEX_NOT_FOUND = "no such index"
Expand Down
Loading

0 comments on commit 0757b62

Please sign in to comment.