Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gracefully fail replication on bootstrap failure. #166

Merged
merged 1 commit into from
Sep 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ class TransportPauseIndexReplicationAction @Inject constructor(transportService:
if (replicationOverallState == ReplicationOverallState.PAUSED.name)
throw ResourceAlreadyExistsException("Index ${request.indexName} is already paused")
else if (replicationOverallState != ReplicationOverallState.RUNNING.name)
throw IllegalStateException("Unknown value of replication state:$replicationOverallState")
throw IllegalStateException("Cannot pause when in $replicationOverallState state")
}

override fun executor(): String {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ class TransportReplicationStatusAction @Inject constructor(transportService: Tra
listener.completeWith {
try {
val metadata = replicationMetadataManager.getIndexReplicationMetadata(request!!.indices()[0])
val remoteClient = client.getRemoteClusterClient(metadata.connectionName)
var status = if (metadata.overallState.isNullOrEmpty()) "STOPPED" else metadata.overallState
var reason = metadata.reason
if (!status.equals("RUNNING")) {
Expand All @@ -62,6 +61,7 @@ class TransportReplicationStatusAction @Inject constructor(transportService: Tra
}
var followerResponse = client.suspendExecute(ShardsInfoAction.INSTANCE,
ShardInfoRequest(metadata.followerContext.resource),true)
val remoteClient = client.getRemoteClusterClient(metadata.connectionName)
var leaderResponse = remoteClient.suspendExecute(ShardsInfoAction.INSTANCE,
ShardInfoRequest(metadata.leaderContext.resource),true)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,17 @@ class TransportStopIndexReplicationAction @Inject constructor(transportService:
launch(Dispatchers.Unconfined + threadPool.coroutineContext()) {
try {
log.info("Stopping index replication on index:" + request.indexName)
val isPaused = validateStopReplicationRequest(request)

// NOTE: We remove the block before validation because doing so is a harmless, idempotent operation and
// gives back control of the index even if a failure happens in one of the steps after this.
val updateIndexBlockRequest = UpdateIndexBlockRequest(request.indexName,IndexBlockUpdateType.REMOVE_BLOCK)
val updateIndexBlockResponse = client.suspendExecute(UpdateIndexBlockAction.INSTANCE, updateIndexBlockRequest, injectSecurityContext = true)
if(!updateIndexBlockResponse.isAcknowledged) {
throw OpenSearchException("Failed to remove index block on ${request.indexName}")
}

val isPaused = validateStopReplicationRequest(request)

// Index will be deleted if replication is stopped while it is restoring. So no need to close/reopen
val restoring = clusterService.state().custom<RestoreInProgress>(RestoreInProgress.TYPE, RestoreInProgress.EMPTY).any { entry ->
entry.indices().any { it == request.indexName }
Expand Down Expand Up @@ -178,10 +181,12 @@ class TransportStopIndexReplicationAction @Inject constructor(transportService:
?:
throw IllegalArgumentException("No replication in progress for index:${request.indexName}")
val replicationOverallState = replicationStateParams[REPLICATION_LAST_KNOWN_OVERALL_STATE]
if (replicationOverallState == ReplicationOverallState.RUNNING.name)
return false
else if (replicationOverallState == ReplicationOverallState.PAUSED.name)
if (replicationOverallState == ReplicationOverallState.PAUSED.name)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on the previous logic, shouldn't this return true in the "PAUSED" state?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I just realized that and have updated it.

return true
else if (replicationOverallState == ReplicationOverallState.RUNNING.name ||
replicationOverallState == ReplicationOverallState.STOPPED.name ||
replicationOverallState == ReplicationOverallState.FAILED.name)
return false
throw IllegalStateException("Unknown value of replication state:$replicationOverallState")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ class TransportUpdateIndexReplicationAction @Inject constructor(transportService
if (replicationOverallState == ReplicationOverallState.RUNNING.name || replicationOverallState == ReplicationOverallState.PAUSED.name)
return

throw IllegalStateException("Unknown value of replication state:$replicationOverallState")
throw IllegalStateException("Cannot update settings when in $replicationOverallState state")
}

override fun executor(): String {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

package org.opensearch.replication.task.index

import org.opensearch.replication.ReplicationException
import org.opensearch.replication.ReplicationPlugin.Companion.REPLICATED_INDEX_SETTING
import org.opensearch.replication.ReplicationSettings
import org.opensearch.replication.action.index.block.IndexBlockUpdateType
Expand Down Expand Up @@ -49,7 +48,6 @@ import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import org.opensearch.OpenSearchException
import org.opensearch.OpenSearchTimeoutException
import org.opensearch.ResourceNotFoundException
import org.opensearch.action.ActionListener
import org.opensearch.action.admin.indices.alias.IndicesAliasesRequest
import org.opensearch.action.admin.indices.alias.IndicesAliasesRequest.AliasActions
Expand Down Expand Up @@ -91,6 +89,7 @@ import org.opensearch.persistent.PersistentTasksService
import org.opensearch.tasks.TaskId
import org.opensearch.tasks.TaskManager
import org.opensearch.threadpool.ThreadPool
import java.util.Collections
import java.util.function.Predicate
import java.util.stream.Collectors
import kotlin.coroutines.resume
Expand Down Expand Up @@ -760,15 +759,15 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript
if (doesValidIndexExists()) {
return InitFollowState
} else {
throw ResourceNotFoundException("""
Unable to find in progress restore for remote index: $leaderAlias:$leaderIndex.
return FailedState(Collections.emptyMap(), """
Unable to find in progress restore for remote index: $leaderAlias:$leaderIndex.
This can happen if there was a badly timed master node failure.""".trimIndent())
}
} else if (restore.state() == RestoreInProgress.State.FAILURE) {
val failureReason = restore.shards().values().find {
it.value.state() == RestoreInProgress.State.FAILURE
}!!.value.reason()
throw org.opensearch.replication.ReplicationException("Remote restore failed: $failureReason")
return FailedState(Collections.emptyMap(), failureReason)
} else {
return InitFollowState
}
Expand Down Expand Up @@ -868,4 +867,4 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript

}

}
}