Skip to content

Commit

Permalink
Merge 27960e9 into d5318e6
Browse files Browse the repository at this point in the history
  • Loading branch information
naveenpajjuri authored Apr 6, 2022
2 parents d5318e6 + 27960e9 commit 26a92da
Show file tree
Hide file tree
Showing 15 changed files with 41 additions and 44 deletions.
2 changes: 1 addition & 1 deletion docs/RFC.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ The replication machinery is implemented as an OpenSearch plugin that exposes AP

### Details

The Start Replication API performs basic validations (such as existence checks on the remote cluster & index) and then spawns a persistent background task named `IndexReplicationTask` in the follower cluster to coordinate the replication process. This task does not replicate any data directly, but rather is responsible for initiating subtasks and monitoring the overall replication process. Hence, it can run on any node on the cluster (including master nodes) and we chose the node with the least number of tasks at the time. Each step in the workflow is checkpointed so that it can be resumed safely if interrupted.
The Start Replication API performs basic validations (such as existence checks on the remote cluster & index) and then spawns a persistent background task named `IndexReplicationTask` in the follower cluster to coordinate the replication process. This task does not replicate any data directly, but rather is responsible for initiating subtasks and monitoring the overall replication process. Hence, it can run on any node in the cluster (including cluster manager nodes), and we choose the node with the fewest tasks at the time. Each step in the workflow is checkpointed so that it can be resumed safely if interrupted.


![Details](/docs/images/rfc1.png?raw=true "Details")
Expand Down
13 changes: 5 additions & 8 deletions src/main/kotlin/org/opensearch/replication/ReplicationPlugin.kt
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,12 @@

package org.opensearch.replication

import org.opensearch.replication.action.autofollow.AutoFollowMasterNodeAction
import org.opensearch.replication.action.autofollow.TransportAutoFollowMasterNodeAction
import org.opensearch.replication.action.autofollow.TransportUpdateAutoFollowPatternAction
import org.opensearch.replication.action.autofollow.UpdateAutoFollowPatternAction
import org.opensearch.replication.action.changes.GetChangesAction
import org.opensearch.replication.action.changes.TransportGetChangesAction
import org.opensearch.replication.action.index.ReplicateIndexAction
import org.opensearch.replication.action.index.ReplicateIndexMasterNodeAction
import org.opensearch.replication.action.index.ReplicateIndexClusterManagerNodeAction
import org.opensearch.replication.action.index.TransportReplicateIndexAction
import org.opensearch.replication.action.index.TransportReplicateIndexMasterNodeAction
import org.opensearch.replication.action.index.TransportReplicateIndexClusterManagerNodeAction
import org.opensearch.replication.action.index.block.TransportUpddateIndexBlockAction
import org.opensearch.replication.action.index.block.UpdateIndexBlockAction
import org.opensearch.replication.action.pause.PauseIndexReplicationAction
Expand Down Expand Up @@ -120,6 +116,7 @@ import org.opensearch.plugins.EnginePlugin
import org.opensearch.plugins.PersistentTaskPlugin
import org.opensearch.plugins.Plugin
import org.opensearch.plugins.RepositoryPlugin
import org.opensearch.replication.action.autofollow.*
import org.opensearch.replication.action.stats.AutoFollowStatsAction
import org.opensearch.replication.action.stats.FollowerStatsAction
import org.opensearch.replication.action.stats.LeaderStatsAction
Expand Down Expand Up @@ -218,12 +215,12 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin,
override fun getActions(): List<ActionHandler<out ActionRequest, out ActionResponse>> {
return listOf(ActionHandler(GetChangesAction.INSTANCE, TransportGetChangesAction::class.java),
ActionHandler(ReplicateIndexAction.INSTANCE, TransportReplicateIndexAction::class.java),
ActionHandler(ReplicateIndexMasterNodeAction.INSTANCE, TransportReplicateIndexMasterNodeAction::class.java),
ActionHandler(ReplicateIndexClusterManagerNodeAction.INSTANCE, TransportReplicateIndexClusterManagerNodeAction::class.java),
ActionHandler(ReplayChangesAction.INSTANCE, TransportReplayChangesAction::class.java),
ActionHandler(GetStoreMetadataAction.INSTANCE, TransportGetStoreMetadataAction::class.java),
ActionHandler(GetFileChunkAction.INSTANCE, TransportGetFileChunkAction::class.java),
ActionHandler(UpdateAutoFollowPatternAction.INSTANCE, TransportUpdateAutoFollowPatternAction::class.java),
ActionHandler(AutoFollowMasterNodeAction.INSTANCE, TransportAutoFollowMasterNodeAction::class.java),
ActionHandler(AutoFollowClusterManagerNodeAction.INSTANCE, TransportAutoFollowClusterManagerNodeAction::class.java),
ActionHandler(StopIndexReplicationAction.INSTANCE, TransportStopIndexReplicationAction::class.java),
ActionHandler(PauseIndexReplicationAction.INSTANCE, TransportPauseIndexReplicationAction::class.java),
ActionHandler(ResumeIndexReplicationAction.INSTANCE, TransportResumeIndexReplicationAction::class.java),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ package org.opensearch.replication.action.autofollow
import org.opensearch.action.ActionType
import org.opensearch.action.support.master.AcknowledgedResponse

class AutoFollowMasterNodeAction: ActionType<AcknowledgedResponse>(NAME, ::AcknowledgedResponse) {
class AutoFollowClusterManagerNodeAction: ActionType<AcknowledgedResponse>(NAME, ::AcknowledgedResponse) {
companion object {
const val NAME = "internal:cluster:admin/plugins/replication/autofollow/update"
val INSTANCE = AutoFollowMasterNodeAction()
val INSTANCE = AutoFollowClusterManagerNodeAction()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.ToXContentObject
import org.opensearch.common.xcontent.XContentBuilder

class AutoFollowMasterNodeRequest: MasterNodeRequest<AutoFollowMasterNodeRequest>, ToXContentObject {
class AutoFollowClusterManagerNodeRequest: MasterNodeRequest<AutoFollowClusterManagerNodeRequest>, ToXContentObject {
var user: User? = null
var autofollowReq: UpdateAutoFollowPatternRequest
var withSecurityContext: Boolean = false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,26 +45,26 @@ import org.opensearch.replication.ReplicationException
import org.opensearch.threadpool.ThreadPool
import org.opensearch.transport.TransportService

class TransportAutoFollowMasterNodeAction @Inject constructor(transportService: TransportService, clusterService: ClusterService, threadPool: ThreadPool,
class TransportAutoFollowClusterManagerNodeAction @Inject constructor(transportService: TransportService, clusterService: ClusterService, threadPool: ThreadPool,
actionFilters: ActionFilters, indexNameExpressionResolver: IndexNameExpressionResolver,
private val client: NodeClient, private val metadataManager: ReplicationMetadataManager,
val indexScopedSettings: IndexScopedSettings) :
TransportMasterNodeAction<AutoFollowMasterNodeRequest, AcknowledgedResponse>(
AutoFollowMasterNodeAction.NAME, true, transportService, clusterService, threadPool, actionFilters,
::AutoFollowMasterNodeRequest, indexNameExpressionResolver), CoroutineScope by GlobalScope {
TransportMasterNodeAction<AutoFollowClusterManagerNodeRequest, AcknowledgedResponse>(
AutoFollowClusterManagerNodeAction.NAME, true, transportService, clusterService, threadPool, actionFilters,
::AutoFollowClusterManagerNodeRequest, indexNameExpressionResolver), CoroutineScope by GlobalScope {

companion object {
private val log = LogManager.getLogger(TransportAutoFollowMasterNodeAction::class.java)
private val log = LogManager.getLogger(TransportAutoFollowClusterManagerNodeAction::class.java)
const val AUTOFOLLOW_EXCEPTION_GENERIC_STRING = "Failed to update autofollow pattern"
}

override fun checkBlock(request: AutoFollowMasterNodeRequest, state: ClusterState): ClusterBlockException? {
override fun checkBlock(request: AutoFollowClusterManagerNodeRequest, state: ClusterState): ClusterBlockException? {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE)
}

override fun masterOperation(masterNodeReq: AutoFollowMasterNodeRequest, state: ClusterState, listener: ActionListener<AcknowledgedResponse>) {
val request = masterNodeReq.autofollowReq
var user = masterNodeReq.user
override fun masterOperation(clusterManagerNodeReq: AutoFollowClusterManagerNodeRequest, state: ClusterState, listener: ActionListener<AcknowledgedResponse>) {
val request = clusterManagerNodeReq.autofollowReq
var user = clusterManagerNodeReq.user
launch(threadPool.coroutineContext()) {
listener.completeWith {
if (request.action == UpdateAutoFollowPatternRequest.Action.REMOVE) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ class TransportUpdateAutoFollowPatternAction @Inject constructor(transportServic
throw org.opensearch.replication.ReplicationException("Setup checks failed while setting-up auto follow pattern")
}
}
val masterNodeReq = AutoFollowMasterNodeRequest(user, request)
client.suspendExecute(AutoFollowMasterNodeAction.INSTANCE, masterNodeReq)
val clusterManagerNodeReq = AutoFollowClusterManagerNodeRequest(user, request)
client.suspendExecute(AutoFollowClusterManagerNodeAction.INSTANCE, clusterManagerNodeReq)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ package org.opensearch.replication.action.index
import org.opensearch.action.ActionType
import org.opensearch.action.support.master.AcknowledgedResponse

class ReplicateIndexMasterNodeAction private constructor(): ActionType<AcknowledgedResponse>(NAME, ::AcknowledgedResponse) {
class ReplicateIndexClusterManagerNodeAction private constructor(): ActionType<AcknowledgedResponse>(NAME, ::AcknowledgedResponse) {
companion object {
const val NAME = "internal:indices/admin/plugins/replication/index/start"
val INSTANCE: ReplicateIndexMasterNodeAction = ReplicateIndexMasterNodeAction()
val INSTANCE: ReplicateIndexClusterManagerNodeAction = ReplicateIndexClusterManagerNodeAction()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ class TransportReplicateIndexAction @Inject constructor(transportService: Transp
// Setup checks are successful and trigger replication for the index
// permissions evaluation to trigger replication is based on the current security context set
val internalReq = ReplicateIndexMasterNodeRequest(user, request)
client.suspendExecute(ReplicateIndexMasterNodeAction.INSTANCE, internalReq)
client.suspendExecute(ReplicateIndexClusterManagerNodeAction.INSTANCE, internalReq)
ReplicateIndexResponse(true)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,22 +52,22 @@ import org.opensearch.threadpool.ThreadPool
import org.opensearch.transport.TransportService
import java.io.IOException

class TransportReplicateIndexMasterNodeAction @Inject constructor(transportService: TransportService,
clusterService: ClusterService,
threadPool: ThreadPool,
actionFilters: ActionFilters,
indexNameExpressionResolver: IndexNameExpressionResolver,
val indexScopedSettings: IndexScopedSettings,
private val persistentTasksService: PersistentTasksService,
private val nodeClient : NodeClient,
private val repositoryService: RepositoriesService,
private val replicationMetadataManager: ReplicationMetadataManager) :
TransportMasterNodeAction<ReplicateIndexMasterNodeRequest, AcknowledgedResponse>(ReplicateIndexMasterNodeAction.NAME,
class TransportReplicateIndexClusterManagerNodeAction @Inject constructor(transportService: TransportService,
clusterService: ClusterService,
threadPool: ThreadPool,
actionFilters: ActionFilters,
indexNameExpressionResolver: IndexNameExpressionResolver,
val indexScopedSettings: IndexScopedSettings,
private val persistentTasksService: PersistentTasksService,
private val nodeClient : NodeClient,
private val repositoryService: RepositoriesService,
private val replicationMetadataManager: ReplicationMetadataManager) :
TransportMasterNodeAction<ReplicateIndexMasterNodeRequest, AcknowledgedResponse>(ReplicateIndexClusterManagerNodeAction.NAME,
transportService, clusterService, threadPool, actionFilters, ::ReplicateIndexMasterNodeRequest, indexNameExpressionResolver),
CoroutineScope by GlobalScope {

companion object {
private val log = LogManager.getLogger(TransportReplicateIndexMasterNodeAction::class.java)
private val log = LogManager.getLogger(TransportReplicateIndexClusterManagerNodeAction::class.java)
}

override fun executor(): String {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ class TransportReplayChangesAction @Inject constructor(settings: Settings, trans
}

/**
* Fetches the index mapping from the leader cluster, applies it to the local cluster's master and then waits
* Fetches the index mapping from the leader cluster, applies it to the local cluster's cluster manager node and then waits
* for the mapping to become available on the current shard. Should only be called on the primary shard .
*/
private suspend fun syncRemoteMapping(leaderAlias: String, leaderIndex: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -798,7 +798,7 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript
} else {
return FailedState(Collections.emptyMap(), """
Unable to find in progress restore for remote index: $leaderAlias:$leaderIndex.
This can happen if there was a badly timed master node failure.""".trimIndent())
This can happen if there was a badly timed clusterManager node failure.""".trimIndent())
}
} else if (restore.state() == RestoreInProgress.State.FAILURE) {
val failureReason = restore.shards().values().find {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class ClusterRerouteFollowerIT : MultiClusterRestTestCase() {
followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName))
insertDocToIndex(LEADER, "1", "dummy data 1",leaderIndexName)

//Querying ES cluster throws random exceptions like MasterNotDiscovered or ShardsFailed etc, so catching them and retrying
// Querying the OpenSearch cluster can throw transient exceptions such as ClusterManagerNotDiscovered or ShardsFailed, so catch them and retry
assertBusy ({
try {
Assertions.assertThat(docs(FOLLOWER, followerIndexName)).contains("dummy data 1")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class ClusterRerouteLeaderIT : MultiClusterRestTestCase() {
followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName))
insertDocToIndex(LEADER, "1", "dummy data 1",leaderIndexName)

//Querying ES cluster throws random exceptions like MasterNotDiscovered or ShardsFailed etc, so catching them and retrying
// Querying the OpenSearch cluster can throw transient exceptions such as ClusterManagerNotDiscovered or ShardsFailed, so catch them and retry
assertBusy ({
try {
Assertions.assertThat(docs(FOLLOWER, followerIndexName)).contains("dummy data 1")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ class SecurityCustomRolesIT: SecurityBase() {
requestOptions = RequestOptions.DEFAULT.addBasicAuthHeader("testUser1","password"))

insertDocToIndex(LEADER, "1", "dummy data 1",leaderIndexName)
//Querying ES cluster throws random exceptions like MasterNotDiscovered or ShardsFailed etc, so catching them and retrying
// Querying the OpenSearch cluster can throw transient exceptions such as ClusterManagerNotDiscovered or ShardsFailed, so catch them and retry
assertBusy ({
try {
Assertions.assertThat(docs(FOLLOWER, followerIndexName)).contains("dummy data 1")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class SecurityCustomRolesLeaderIT: SecurityBase() {
requestOptions = RequestOptions.DEFAULT.addBasicAuthHeader("testUser1","password"))

insertDocToIndex(LEADER, "1", "dummy data 1",leaderIndexName)
//Querying ES cluster throws random exceptions like MasterNotDiscovered or ShardsFailed etc, so catching them and retrying
// Querying the OpenSearch cluster can throw transient exceptions such as ClusterManagerNotDiscovered or ShardsFailed, so catch them and retry
assertBusy ({
try {
Assertions.assertThat(docs(FOLLOWER, followerIndexName)).contains("dummy data 1")
Expand Down

0 comments on commit 26a92da

Please sign in to comment.