Skip to content

Commit

Permalink
Pause and Resume API (#10)
Browse files Browse the repository at this point in the history
* Pause and Resume APIs addition

Issue : #9
  • Loading branch information
gbbafna authored Jun 21, 2021
1 parent 84b567f commit 645ff47
Show file tree
Hide file tree
Showing 21 changed files with 1,282 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,6 @@ import com.amazon.elasticsearch.replication.repository.REMOTE_REPOSITORY_TYPE
import com.amazon.elasticsearch.replication.repository.RemoteClusterRepositoriesService
import com.amazon.elasticsearch.replication.repository.RemoteClusterRepository
import com.amazon.elasticsearch.replication.repository.RemoteClusterRestoreLeaderService
import com.amazon.elasticsearch.replication.rest.ReplicateIndexHandler
import com.amazon.elasticsearch.replication.rest.StopIndexReplicationHandler
import com.amazon.elasticsearch.replication.rest.UpdateAutoFollowPatternsHandler
import com.amazon.elasticsearch.replication.task.IndexCloseListener
import com.amazon.elasticsearch.replication.task.autofollow.AutoFollowExecutor
import com.amazon.elasticsearch.replication.task.autofollow.AutoFollowParams
Expand Down Expand Up @@ -102,6 +99,15 @@ import java.util.Optional
import java.util.function.Supplier
import com.amazon.elasticsearch.replication.action.index.block.UpdateIndexBlockAction
import com.amazon.elasticsearch.replication.action.index.block.TransportUpddateIndexBlockAction
import com.amazon.elasticsearch.replication.action.pause.PauseIndexReplicationAction
import com.amazon.elasticsearch.replication.action.pause.TransportPauseIndexReplicationAction
import com.amazon.elasticsearch.replication.action.resume.ResumeIndexReplicationAction
import com.amazon.elasticsearch.replication.action.resume.TransportResumeIndexReplicationAction
import com.amazon.elasticsearch.replication.rest.PauseIndexReplicationHandler
import com.amazon.elasticsearch.replication.rest.ReplicateIndexHandler
import com.amazon.elasticsearch.replication.rest.ResumeIndexReplicationHandler
import com.amazon.elasticsearch.replication.rest.StopIndexReplicationHandler
import com.amazon.elasticsearch.replication.rest.UpdateAutoFollowPatternsHandler
import org.elasticsearch.common.util.concurrent.EsExecutors
import org.elasticsearch.threadpool.FixedExecutorBuilder

Expand Down Expand Up @@ -147,6 +153,8 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin,
ActionHandler(GetFileChunkAction.INSTANCE, TransportGetFileChunkAction::class.java),
ActionHandler(UpdateAutoFollowPatternAction.INSTANCE, TransportUpdateAutoFollowPatternAction::class.java),
ActionHandler(StopIndexReplicationAction.INSTANCE, TransportStopIndexReplicationAction::class.java),
ActionHandler(PauseIndexReplicationAction.INSTANCE, TransportPauseIndexReplicationAction::class.java),
ActionHandler(ResumeIndexReplicationAction.INSTANCE, TransportResumeIndexReplicationAction::class.java),
ActionHandler(UpdateIndexBlockAction.INSTANCE, TransportUpddateIndexBlockAction::class.java),
ActionHandler(ReleaseLeaderResourcesAction.INSTANCE, TransportReleaseLeaderResourcesAction::class.java)
)
Expand All @@ -159,6 +167,8 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin,
nodesInCluster: Supplier<DiscoveryNodes>): List<RestHandler> {
return listOf(ReplicateIndexHandler(),
UpdateAutoFollowPatternsHandler(),
PauseIndexReplicationHandler(),
ResumeIndexReplicationHandler(),
StopIndexReplicationHandler())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,12 @@ class TransportReplicateIndexMasterNodeAction @Inject constructor(transportServi
launch(Dispatchers.Unconfined + threadPool.coroutineContext()) {
try {
val remoteMetadata = getRemoteIndexMetadata(replicateIndexReq.remoteCluster, replicateIndexReq.remoteIndex)

if (state.routingTable.hasIndex(replicateIndexReq.followerIndex)) {
throw IllegalArgumentException("Cant use same index again for replication. Either close or " +
"delete the index:${replicateIndexReq.followerIndex}")
}

val params = IndexReplicationParams(replicateIndexReq.remoteCluster, remoteMetadata.index, replicateIndexReq.followerIndex)
updateReplicationStateToStarted(replicateIndexReq.followerIndex)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.elasticsearch.replication.action.pause

import org.elasticsearch.action.ActionType
import org.elasticsearch.action.support.master.AcknowledgedResponse

class PauseIndexReplicationAction private constructor(): ActionType<AcknowledgedResponse>(NAME, ::AcknowledgedResponse) {
companion object {
const val NAME = "indices:admin/opendistro/replication/index/pause"
val INSTANCE: PauseIndexReplicationAction = PauseIndexReplicationAction()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.elasticsearch.replication.action.pause

import org.elasticsearch.action.ActionRequestValidationException
import org.elasticsearch.action.IndicesRequest
import org.elasticsearch.action.support.IndicesOptions
import org.elasticsearch.action.support.master.AcknowledgedRequest
import org.elasticsearch.common.ParseField
import org.elasticsearch.common.io.stream.StreamInput
import org.elasticsearch.common.io.stream.StreamOutput
import org.elasticsearch.common.xcontent.*

class PauseIndexReplicationRequest : AcknowledgedRequest<PauseIndexReplicationRequest>, IndicesRequest.Replaceable, ToXContentObject {

lateinit var indexName: String
var reason = "User initiated"

constructor(indexName: String, reason: String) {
this.indexName = indexName
this.reason = reason
}

private constructor() {
}

constructor(inp: StreamInput): super(inp) {
indexName = inp.readString()
}

companion object {
private val PARSER = ObjectParser<PauseIndexReplicationRequest, Void>("PauseReplicationRequestParser") {
PauseIndexReplicationRequest()
}

fun fromXContent(parser: XContentParser, followerIndex: String): PauseIndexReplicationRequest {
val PauseIndexReplicationRequest = PARSER.parse(parser, null)
PauseIndexReplicationRequest.indexName = followerIndex
return PauseIndexReplicationRequest
}
}

override fun validate(): ActionRequestValidationException? {
return null
}

override fun indices(vararg indices: String?): IndicesRequest {
return this
}

override fun indices(): Array<String> {
return arrayOf(indexName)
}

override fun indicesOptions(): IndicesOptions {
return IndicesOptions.strictSingleIndexNoExpandForbidClosed()
}

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
builder.startObject()
builder.field("indexName", indexName)
builder.endObject()
return builder
}

override fun writeTo(out: StreamOutput) {
super.writeTo(out)
out.writeString(indexName)
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.elasticsearch.replication.action.pause

import com.amazon.elasticsearch.replication.action.replicationstatedetails.UpdateReplicationStateDetailsRequest
import com.amazon.elasticsearch.replication.metadata.*
import com.amazon.elasticsearch.replication.util.*
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import org.apache.logging.log4j.LogManager
import org.elasticsearch.ElasticsearchException
import org.elasticsearch.ResourceAlreadyExistsException
import org.elasticsearch.action.ActionListener
import org.elasticsearch.action.support.ActionFilters
import org.elasticsearch.action.support.master.AcknowledgedRequest
import org.elasticsearch.action.support.master.AcknowledgedResponse
import org.elasticsearch.action.support.master.TransportMasterNodeAction
import org.elasticsearch.client.Client
import org.elasticsearch.cluster.AckedClusterStateUpdateTask
import org.elasticsearch.cluster.ClusterState
import org.elasticsearch.cluster.ClusterStateTaskExecutor
import org.elasticsearch.cluster.RestoreInProgress
import org.elasticsearch.cluster.block.ClusterBlockException
import org.elasticsearch.cluster.block.ClusterBlockLevel
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver
import org.elasticsearch.cluster.metadata.Metadata
import org.elasticsearch.cluster.service.ClusterService
import org.elasticsearch.common.inject.Inject
import org.elasticsearch.common.io.stream.StreamInput
import org.elasticsearch.threadpool.ThreadPool
import org.elasticsearch.transport.TransportService
import java.io.IOException

class TransportPauseIndexReplicationAction @Inject constructor(transportService: TransportService,
clusterService: ClusterService,
threadPool: ThreadPool,
actionFilters: ActionFilters,
indexNameExpressionResolver:
IndexNameExpressionResolver,
val client: Client) :
TransportMasterNodeAction<PauseIndexReplicationRequest, AcknowledgedResponse> (PauseIndexReplicationAction.NAME,
transportService, clusterService, threadPool, actionFilters, ::PauseIndexReplicationRequest,
indexNameExpressionResolver), CoroutineScope by GlobalScope {

companion object {
private val log = LogManager.getLogger(TransportPauseIndexReplicationAction::class.java)
}

override fun checkBlock(request: PauseIndexReplicationRequest, state: ClusterState): ClusterBlockException? {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE)
}

@Throws(Exception::class)
override fun masterOperation(request: PauseIndexReplicationRequest, state: ClusterState,
listener: ActionListener<AcknowledgedResponse>) {
launch(Dispatchers.Unconfined + threadPool.coroutineContext()) {
listener.completeWith {
log.info("Pausing index replication on index:" + request.indexName)
validatePauseReplicationRequest(request)


// Restoring Index can't be paused
val restoring = clusterService.state().custom<RestoreInProgress>(RestoreInProgress.TYPE).any { entry ->
entry.indices().any { it == request.indexName }
}

if (restoring) {
throw ElasticsearchException("Index is in restore phase currently for index: ${request.indexName}. You can pause after restore completes." )
}

val stateUpdateResponse : AcknowledgedResponse =
clusterService.waitForClusterStateUpdate("Pause_replication") { l -> PauseReplicationTask(request, l)}
if (!stateUpdateResponse.isAcknowledged) {
throw ElasticsearchException("Failed to update cluster state")
}

updateReplicationStateToPaused(request.indexName)

AcknowledgedResponse(true)
}
}
}

private fun validatePauseReplicationRequest(request: PauseIndexReplicationRequest) {
val replicationStateParams = getReplicationStateParamsForIndex(clusterService, request.indexName)
?:
throw IllegalArgumentException("No replication in progress for index:${request.indexName}")
val replicationOverallState = replicationStateParams[REPLICATION_OVERALL_STATE_KEY]
if (replicationOverallState == REPLICATION_OVERALL_STATE_PAUSED)
throw ResourceAlreadyExistsException("Index ${request.indexName} is already paused")
else if (replicationOverallState != REPLICATION_OVERALL_STATE_RUNNING_VALUE)
throw IllegalStateException("Unknown value of replication state:$replicationOverallState")

}

override fun executor(): String {
return ThreadPool.Names.SAME
}

private suspend fun updateReplicationStateToPaused(indexName: String) {
val replicationStateParamMap = HashMap<String, String>()
replicationStateParamMap[REPLICATION_OVERALL_STATE_KEY] = REPLICATION_OVERALL_STATE_PAUSED
val updateReplicationStateDetailsRequest = UpdateReplicationStateDetailsRequest(indexName, replicationStateParamMap,
UpdateReplicationStateDetailsRequest.UpdateType.ADD)
submitClusterStateUpdateTask(updateReplicationStateDetailsRequest, UpdateReplicationStateDetailsTaskExecutor.INSTANCE
as ClusterStateTaskExecutor<AcknowledgedRequest<UpdateReplicationStateDetailsRequest>>,
clusterService,
"pause-replication-state-params")
}

@Throws(IOException::class)
override fun read(inp: StreamInput): AcknowledgedResponse {
return AcknowledgedResponse(inp)
}

class PauseReplicationTask(val request: PauseIndexReplicationRequest, listener: ActionListener<AcknowledgedResponse>) :
AckedClusterStateUpdateTask<AcknowledgedResponse>(request, listener) {

override fun execute(currentState: ClusterState): ClusterState {
val newState = ClusterState.builder(currentState)

val mdBuilder = Metadata.builder(currentState.metadata)
val currentReplicationMetadata = currentState.metadata().custom(ReplicationMetadata.NAME)
?: ReplicationMetadata.EMPTY

// add paused index setting
val newMetadata = currentReplicationMetadata.pauseIndex(request.indexName, request.reason)
mdBuilder.putCustom(ReplicationMetadata.NAME, newMetadata)
newState.metadata(mdBuilder)
return newState.build()
}

override fun newResponse(acknowledged: Boolean) = AcknowledgedResponse(acknowledged)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.elasticsearch.replication.action.resume

import org.elasticsearch.action.ActionType
import org.elasticsearch.action.support.master.AcknowledgedResponse

class ResumeIndexReplicationAction private constructor(): ActionType<AcknowledgedResponse>(NAME, ::AcknowledgedResponse) {
companion object {
const val NAME = "indices:admin/opendistro/replication/index/resume"
val INSTANCE: ResumeIndexReplicationAction = ResumeIndexReplicationAction()
}
}
Loading

0 comments on commit 645ff47

Please sign in to comment.