Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.3] [Backport] Handle clean-up of stale index task during cancellation (#645) #916

Merged
merged 1 commit into from
May 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,8 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript
val blockListedSettings :Set<String> = blSettings.stream().map { k -> k.key }.collect(Collectors.toSet())

const val SLEEP_TIME_BETWEEN_POLL_MS = 5000L
const val TASK_CANCELLATION_REASON = "Index replication task was cancelled by user"
const val AUTOPAUSED_REASON_PREFIX = "AutoPaused: "
const val TASK_CANCELLATION_REASON = AUTOPAUSED_REASON_PREFIX + "Index replication task was cancelled by user"

}

Expand Down Expand Up @@ -263,13 +264,6 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript
}
}

/**
 * Cancellation hook for the index replication task.
 *
 * Logs the cancellation, submits a pause request for [followerIndexName] carrying
 * [TASK_CANCELLATION_REASON], and then delegates to the superclass implementation.
 */
override fun onCancelled() {
    log.info("Cancelling the index replication task.")
    val pauseRequest = PauseIndexReplicationRequest(followerIndexName, TASK_CANCELLATION_REASON)
    // The value returned by execute() is not consumed here.
    client.execute(PauseIndexReplicationAction.INSTANCE, pauseRequest)
    super.onCancelled()
}

private suspend fun failReplication(failedState: FailedState) {
withContext(NonCancellable) {
val reason = failedState.errorMsg
Expand Down Expand Up @@ -313,6 +307,23 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript
return MonitoringState
}

/**
 * Checks whether the local node is the one currently executing the persistent index
 * replication task for [followerIndexName].
 *
 * Looks up the persistent tasks registered under [IndexReplicationExecutor.TASK_NAME] in the
 * current cluster state, narrows them to those whose params target [followerIndexName], and
 * reports whether any of them is assigned with the local node as its executor.
 *
 * @return `true` if a matching task is assigned and its executor node is the local node;
 * `false` otherwise, including when the cluster state carries no persistent task metadata.
 */
fun isTrackingTaskForIndex(): Boolean {
    // Take a single snapshot so the task list and the local node id come from the same cluster state.
    val clusterState = clusterService.state()
    // No persistent task metadata in the cluster state means no task can be tracking the index.
    val persistentTasks = clusterState.metadata.custom<PersistentTasksCustomMetadata>(PersistentTasksCustomMetadata.TYPE)
        ?: return false
    val localNodeId = clusterState.nodes.localNodeId

    @Suppress("UNCHECKED_CAST")
    val runningTasksForIndex = persistentTasks.findTasks(IndexReplicationExecutor.TASK_NAME, Predicate { true })
        .map { task -> task as PersistentTask<IndexReplicationParams> }
        // A task without params cannot match this index; skip it rather than fail on it.
        .filter { task -> task.params?.followerIndexName == followerIndexName }
    assert(runningTasksForIndex.size <= 1) { "Found more than one running index task for index[$followerIndexName]" }

    for (currentTask in runningTasksForIndex) {
        log.info("Verifying task details - currentTask={isAssigned=${currentTask.isAssigned},executorNode=${currentTask.executorNode}}")
        if (currentTask.isAssigned && currentTask.executorNode == localNodeId) {
            return true
        }
    }
    return false
}

/** Reports whether the routing table of the current cluster state contains [followerIndexName]. */
private fun isResumed(): Boolean =
    clusterService.state().routingTable.hasIndex(followerIndexName)
Expand Down Expand Up @@ -651,7 +662,7 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript
log.error("Going to initiate auto-pause of $followerIndexName due to shard failures - $state")
val pauseReplicationResponse = client.suspendExecute(
replicationMetadata,
PauseIndexReplicationAction.INSTANCE, PauseIndexReplicationRequest(followerIndexName, "AutoPaused: ${state.errorMsg}"),
// Template expressions are joined by adjacency; a " + " inside the quotes would be emitted literally in the pause reason.
PauseIndexReplicationAction.INSTANCE, PauseIndexReplicationRequest(followerIndexName, "$AUTOPAUSED_REASON_PREFIX${state.errorMsg}"),
defaultContext = true
)
if (!pauseReplicationResponse.isAcknowledged) {
Expand Down Expand Up @@ -688,10 +699,27 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript
}

override suspend fun cleanup() {
if (currentTaskState.state == ReplicationState.RESTORING) {
log.info("Replication stopped before restore could finish, so removing partial restore..")
cancelRestore()
// If the task is already running on another node, the OpenSearch persistent task
// framework cancels any stale tasks on the old nodes.
// Currently, we have no visibility into the cancellation reason. Before triggering
// any further actions on the index from this task, verify that this is the actual task tracking the index.
// - A stale task shouldn't trigger further actions during cancellation.
if(isTrackingTaskForIndex()) {
if (currentTaskState.state == ReplicationState.RESTORING) {
log.info("Replication stopped before restore could finish, so removing partial restore..")
cancelRestore()
}

// if cancelled and not in paused state.
val replicationStateParams = getReplicationStateParamsForIndex(clusterService, followerIndexName)
if(isCancelled && replicationStateParams != null
&& replicationStateParams[REPLICATION_LAST_KNOWN_OVERALL_STATE] == ReplicationOverallState.RUNNING.name) {
log.info("Task is cancelled. Moving the index to auto-pause state")
client.execute(PauseIndexReplicationAction.INSTANCE,
PauseIndexReplicationRequest(followerIndexName, TASK_CANCELLATION_REASON))
}
}

/* This is to minimise overhead of calling an additional listener as
* it continues to be called even after the task is completed.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ const val REST_REPLICATION_TASKS = "_tasks?actions=*replication*&detailed&pretty
const val REST_LEADER_STATS = "${REST_REPLICATION_PREFIX}leader_stats"
const val REST_FOLLOWER_STATS = "${REST_REPLICATION_PREFIX}follower_stats"
const val REST_AUTO_FOLLOW_STATS = "${REST_REPLICATION_PREFIX}autofollow_stats"
const val INDEX_TASK_CANCELLATION_REASON = "Index replication task was cancelled by user"
const val INDEX_TASK_CANCELLATION_REASON = "AutoPaused: Index replication task was cancelled by user"
const val STATUS_REASON_USER_INITIATED = "User initiated"
const val STATUS_REASON_SHARD_TASK_CANCELLED = "Shard task killed or cancelled."
const val STATUS_REASON_INDEX_NOT_FOUND = "no such index"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import org.opensearch.cluster.ClusterStateObserver
import org.opensearch.cluster.RestoreInProgress
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.cluster.metadata.Metadata
import org.opensearch.cluster.node.DiscoveryNode
import org.opensearch.cluster.node.DiscoveryNodes
import org.opensearch.cluster.routing.RoutingTable
import org.opensearch.common.settings.Settings
import org.opensearch.common.settings.SettingsModule
Expand Down Expand Up @@ -209,6 +211,61 @@ class IndexReplicationTaskTests : OpenSearchTestCase() {
assertThat(shardTasks.size == 2).isTrue
}

/**
 * Verifies that [IndexReplicationTask.isTrackingTaskForIndex] is true only when the persistent
 * index replication task for the follower index is assigned to the local node.
 */
fun testIsTrackingTaskForIndex() = runBlocking {
    val replicationTask: IndexReplicationTask = spy(createIndexReplicationTask())
    val taskManager = Mockito.mock(TaskManager::class.java)
    replicationTask.setPersistent(taskManager)
    val rc = ReplicationContext(followerIndex)
    val rm = ReplicationMetadata(connectionName, ReplicationStoreMetadataType.INDEX.name, ReplicationOverallState.RUNNING.name, "reason", rc, rc, Settings.EMPTY)
    replicationTask.setReplicationMetadata(rm)

    // when index replication task is valid
    setClusterStateWithIndexTaskAssignedTo("same_node")
    assertThat(replicationTask.isTrackingTaskForIndex()).isTrue

    // when index replication task is not valid
    setClusterStateWithIndexTaskAssignedTo("other_node")
    assertThat(replicationTask.isTrackingTaskForIndex()).isFalse
}

/**
 * Installs a cluster state on [clusterService] in which the local node id is "same_node" and a
 * single index replication persistent task for [followerIndex] is assigned to [executorNodeId].
 */
private fun setClusterStateWithIndexTaskAssignedTo(executorNodeId: String) {
    val tasks = PersistentTasksCustomMetadata.builder()
    val leaderIndex = Index(followerIndex, "_na_")
    tasks.addTask<PersistentTaskParams>("replication:0", IndexReplicationExecutor.TASK_NAME,
        IndexReplicationParams("remoteCluster", leaderIndex, followerIndex),
        PersistentTasksCustomMetadata.Assignment(executorNodeId, "test assignment on other node"))

    val metadata = Metadata.builder()
        .put(IndexMetadata.builder(REPLICATION_CONFIG_SYSTEM_INDEX).settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(0))
        .put(IndexMetadata.builder(followerIndex).settings(settings(Version.CURRENT)).numberOfShards(2).numberOfReplicas(0))
        .putCustom(PersistentTasksCustomMetadata.TYPE, tasks.build())
        .build()
    val routingTable = RoutingTable.builder()
        .addAsNew(metadata.index(REPLICATION_CONFIG_SYSTEM_INDEX))
        .addAsNew(metadata.index(followerIndex))
        .build()
    val discoveryNodes = DiscoveryNodes.Builder()
        .localNodeId("same_node")
        .build()
    // Built on top of whatever state is currently installed, as each scenario did originally.
    val newClusterState = ClusterState.builder(clusterService.state())
        .metadata(metadata)
        .routingTable(routingTable)
        .nodes(discoveryNodes)
        .build()
    setState(clusterService, newClusterState)
}

private fun createIndexReplicationTask() : IndexReplicationTask {
var threadPool = TestThreadPool("IndexReplicationTask")
//Hack Alert : Though it is meant to force rejection , this is to make overallTaskScope not null
Expand Down