From 645ff475980279d76df0a2996aea901fe1b463a5 Mon Sep 17 00:00:00 2001 From: Gaurav Bafna <85113518+gbbafna@users.noreply.github.com> Date: Mon, 21 Jun 2021 17:56:54 +0530 Subject: [PATCH] Pause and Resume API (#10) * Pause and Resume APIs addition Issue : https://github.com/opensearch-project/cross-cluster-replication/issues/9 --- .../replication/ReplicationPlugin.kt | 16 +- ...TransportReplicateIndexMasterNodeAction.kt | 6 + .../pause/PauseIndexReplicationAction.kt | 26 ++ .../pause/PauseIndexReplicationRequest.kt | 84 +++++++ .../TransportPauseIndexReplicationAction.kt | 150 ++++++++++++ .../resume/ResumeIndexReplicationAction.kt | 26 ++ .../resume/ResumeIndexReplicationRequest.kt | 81 +++++++ .../TransportResumeIndexReplicationAction.kt | 222 ++++++++++++++++++ .../TransportStopIndexReplicationAction.kt | 66 +++++- .../metadata/ReplicationMetadata.kt | 90 ++++++- .../rest/PauseIndexReplicationHandler.kt | 54 +++++ .../rest/ResumeIndexReplicationHandler.kt | 54 +++++ .../RemoteClusterRetentionLeaseHelper.kt | 22 ++ .../replication/task/ReplicationState.kt | 2 +- .../task/index/IndexReplicationExecutor.kt | 4 - .../task/index/IndexReplicationTask.kt | 21 +- .../task/shard/ShardReplicationTask.kt | 33 ++- .../replication/ReplicationHelpers.kt | 20 ++ .../integ/rest/PauseReplicationIT.kt | 172 ++++++++++++++ .../integ/rest/ResumeReplicationIT.kt | 158 +++++++++++++ .../integ/rest/StartReplicationIT.kt | 4 +- 21 files changed, 1282 insertions(+), 29 deletions(-) create mode 100644 src/main/kotlin/com/amazon/elasticsearch/replication/action/pause/PauseIndexReplicationAction.kt create mode 100644 src/main/kotlin/com/amazon/elasticsearch/replication/action/pause/PauseIndexReplicationRequest.kt create mode 100644 src/main/kotlin/com/amazon/elasticsearch/replication/action/pause/TransportPauseIndexReplicationAction.kt create mode 100644 src/main/kotlin/com/amazon/elasticsearch/replication/action/resume/ResumeIndexReplicationAction.kt create mode 100644 
src/main/kotlin/com/amazon/elasticsearch/replication/action/resume/ResumeIndexReplicationRequest.kt create mode 100644 src/main/kotlin/com/amazon/elasticsearch/replication/action/resume/TransportResumeIndexReplicationAction.kt create mode 100644 src/main/kotlin/com/amazon/elasticsearch/replication/rest/PauseIndexReplicationHandler.kt create mode 100644 src/main/kotlin/com/amazon/elasticsearch/replication/rest/ResumeIndexReplicationHandler.kt create mode 100644 src/test/kotlin/com/amazon/elasticsearch/replication/integ/rest/PauseReplicationIT.kt create mode 100644 src/test/kotlin/com/amazon/elasticsearch/replication/integ/rest/ResumeReplicationIT.kt diff --git a/src/main/kotlin/com/amazon/elasticsearch/replication/ReplicationPlugin.kt b/src/main/kotlin/com/amazon/elasticsearch/replication/ReplicationPlugin.kt index 9dbb96d1..aec79385 100644 --- a/src/main/kotlin/com/amazon/elasticsearch/replication/ReplicationPlugin.kt +++ b/src/main/kotlin/com/amazon/elasticsearch/replication/ReplicationPlugin.kt @@ -38,9 +38,6 @@ import com.amazon.elasticsearch.replication.repository.REMOTE_REPOSITORY_TYPE import com.amazon.elasticsearch.replication.repository.RemoteClusterRepositoriesService import com.amazon.elasticsearch.replication.repository.RemoteClusterRepository import com.amazon.elasticsearch.replication.repository.RemoteClusterRestoreLeaderService -import com.amazon.elasticsearch.replication.rest.ReplicateIndexHandler -import com.amazon.elasticsearch.replication.rest.StopIndexReplicationHandler -import com.amazon.elasticsearch.replication.rest.UpdateAutoFollowPatternsHandler import com.amazon.elasticsearch.replication.task.IndexCloseListener import com.amazon.elasticsearch.replication.task.autofollow.AutoFollowExecutor import com.amazon.elasticsearch.replication.task.autofollow.AutoFollowParams @@ -102,6 +99,15 @@ import java.util.Optional import java.util.function.Supplier import com.amazon.elasticsearch.replication.action.index.block.UpdateIndexBlockAction import 
com.amazon.elasticsearch.replication.action.index.block.TransportUpddateIndexBlockAction +import com.amazon.elasticsearch.replication.action.pause.PauseIndexReplicationAction +import com.amazon.elasticsearch.replication.action.pause.TransportPauseIndexReplicationAction +import com.amazon.elasticsearch.replication.action.resume.ResumeIndexReplicationAction +import com.amazon.elasticsearch.replication.action.resume.TransportResumeIndexReplicationAction +import com.amazon.elasticsearch.replication.rest.PauseIndexReplicationHandler +import com.amazon.elasticsearch.replication.rest.ReplicateIndexHandler +import com.amazon.elasticsearch.replication.rest.ResumeIndexReplicationHandler +import com.amazon.elasticsearch.replication.rest.StopIndexReplicationHandler +import com.amazon.elasticsearch.replication.rest.UpdateAutoFollowPatternsHandler import org.elasticsearch.common.util.concurrent.EsExecutors import org.elasticsearch.threadpool.FixedExecutorBuilder @@ -147,6 +153,8 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin, ActionHandler(GetFileChunkAction.INSTANCE, TransportGetFileChunkAction::class.java), ActionHandler(UpdateAutoFollowPatternAction.INSTANCE, TransportUpdateAutoFollowPatternAction::class.java), ActionHandler(StopIndexReplicationAction.INSTANCE, TransportStopIndexReplicationAction::class.java), + ActionHandler(PauseIndexReplicationAction.INSTANCE, TransportPauseIndexReplicationAction::class.java), + ActionHandler(ResumeIndexReplicationAction.INSTANCE, TransportResumeIndexReplicationAction::class.java), ActionHandler(UpdateIndexBlockAction.INSTANCE, TransportUpddateIndexBlockAction::class.java), ActionHandler(ReleaseLeaderResourcesAction.INSTANCE, TransportReleaseLeaderResourcesAction::class.java) ) @@ -159,6 +167,8 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin, nodesInCluster: Supplier): List { return listOf(ReplicateIndexHandler(), UpdateAutoFollowPatternsHandler(), + 
PauseIndexReplicationHandler(), + ResumeIndexReplicationHandler(), StopIndexReplicationHandler()) } diff --git a/src/main/kotlin/com/amazon/elasticsearch/replication/action/index/TransportReplicateIndexMasterNodeAction.kt b/src/main/kotlin/com/amazon/elasticsearch/replication/action/index/TransportReplicateIndexMasterNodeAction.kt index 7d694265..34c959fd 100644 --- a/src/main/kotlin/com/amazon/elasticsearch/replication/action/index/TransportReplicateIndexMasterNodeAction.kt +++ b/src/main/kotlin/com/amazon/elasticsearch/replication/action/index/TransportReplicateIndexMasterNodeAction.kt @@ -96,6 +96,12 @@ class TransportReplicateIndexMasterNodeAction @Inject constructor(transportServi launch(Dispatchers.Unconfined + threadPool.coroutineContext()) { try { val remoteMetadata = getRemoteIndexMetadata(replicateIndexReq.remoteCluster, replicateIndexReq.remoteIndex) + + if (state.routingTable.hasIndex(replicateIndexReq.followerIndex)) { + throw IllegalArgumentException("Cant use same index again for replication. Either close or " + + "delete the index:${replicateIndexReq.followerIndex}") + } + val params = IndexReplicationParams(replicateIndexReq.remoteCluster, remoteMetadata.index, replicateIndexReq.followerIndex) updateReplicationStateToStarted(replicateIndexReq.followerIndex) diff --git a/src/main/kotlin/com/amazon/elasticsearch/replication/action/pause/PauseIndexReplicationAction.kt b/src/main/kotlin/com/amazon/elasticsearch/replication/action/pause/PauseIndexReplicationAction.kt new file mode 100644 index 00000000..0e0364d8 --- /dev/null +++ b/src/main/kotlin/com/amazon/elasticsearch/replication/action/pause/PauseIndexReplicationAction.kt @@ -0,0 +1,26 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. 
+ * A copy of the License is located at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package com.amazon.elasticsearch.replication.action.pause
+
+import org.elasticsearch.action.ActionType
+import org.elasticsearch.action.support.master.AcknowledgedResponse
+
+/**
+ * Action type descriptor for the "pause index replication" transport action.
+ *
+ * The constructor is private; the only instance is [INSTANCE]. Responses are read with the
+ * [AcknowledgedResponse] stream constructor.
+ */
+class PauseIndexReplicationAction private constructor(): ActionType<AcknowledgedResponse>(NAME, ::AcknowledgedResponse) {
+    companion object {
+        /** Transport action name under which the pause action is registered. */
+        const val NAME = "indices:admin/opendistro/replication/index/pause"
+        val INSTANCE: PauseIndexReplicationAction = PauseIndexReplicationAction()
+    }
+}
diff --git a/src/main/kotlin/com/amazon/elasticsearch/replication/action/pause/PauseIndexReplicationRequest.kt b/src/main/kotlin/com/amazon/elasticsearch/replication/action/pause/PauseIndexReplicationRequest.kt
new file mode 100644
index 00000000..063a6a21
--- /dev/null
+++ b/src/main/kotlin/com/amazon/elasticsearch/replication/action/pause/PauseIndexReplicationRequest.kt
@@ -0,0 +1,84 @@
+/*
+ * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ */
+
+package com.amazon.elasticsearch.replication.action.pause
+
+import org.elasticsearch.action.ActionRequestValidationException
+import org.elasticsearch.action.IndicesRequest
+import org.elasticsearch.action.support.IndicesOptions
+import org.elasticsearch.action.support.master.AcknowledgedRequest
+import org.elasticsearch.common.ParseField
+import org.elasticsearch.common.io.stream.StreamInput
+import org.elasticsearch.common.io.stream.StreamOutput
+import org.elasticsearch.common.xcontent.*
+
+/**
+ * Request to pause replication on a single follower index.
+ *
+ * Carries the follower [indexName] and a free-text [reason] for the pause. Both fields are part
+ * of the wire format, so the reason survives when the request is forwarded to another node.
+ */
+class PauseIndexReplicationRequest : AcknowledgedRequest<PauseIndexReplicationRequest>, IndicesRequest.Replaceable, ToXContentObject {
+
+    lateinit var indexName: String
+    var reason = "User initiated"
+
+    constructor(indexName: String, reason: String) {
+        this.indexName = indexName
+        this.reason = reason
+    }
+
+    // Used only by PARSER; indexName is assigned afterwards in fromXContent.
+    private constructor() {
+    }
+
+    /** Deserializes the request; the field order must mirror [writeTo]. */
+    constructor(inp: StreamInput): super(inp) {
+        indexName = inp.readString()
+        reason = inp.readString()
+    }
+
+    companion object {
+        // No fields are declared on the parser, so a parsed request keeps the default reason.
+        private val PARSER = ObjectParser<PauseIndexReplicationRequest, Void>("PauseReplicationRequestParser") {
+            PauseIndexReplicationRequest()
+        }
+
+        /**
+         * Builds a request from the body read by [parser] and targets it at [followerIndex].
+         */
+        fun fromXContent(parser: XContentParser, followerIndex: String): PauseIndexReplicationRequest {
+            val parsedRequest = PARSER.parse(parser, null)
+            parsedRequest.indexName = followerIndex
+            return parsedRequest
+        }
+    }
+
+    /** No request-level validation is performed; always returns null. */
+    override fun validate(): ActionRequestValidationException? {
+        return null
+    }
+
+    /** The target index is fixed by [indexName]; the supplied [indices] are ignored. */
+    override fun indices(vararg indices: String?): IndicesRequest {
+        return this
+    }
+
+    override fun indices(): Array<String> {
+        return arrayOf(indexName)
+    }
+
+    override fun indicesOptions(): IndicesOptions {
+        return IndicesOptions.strictSingleIndexNoExpandForbidClosed()
+    }
+
+    override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
+        builder.startObject()
+        builder.field("indexName", indexName)
+        builder.endObject()
+        return builder
+    }
+
+    /** Serializes the request; the field order must mirror the [StreamInput] constructor. */
+    override fun writeTo(out: StreamOutput) {
+        super.writeTo(out)
+        out.writeString(indexName)
+        // Without this the reason was lost whenever the request crossed the transport layer.
+        out.writeString(reason)
+    }
+
+}
\ No newline at end of file
diff --git a/src/main/kotlin/com/amazon/elasticsearch/replication/action/pause/TransportPauseIndexReplicationAction.kt b/src/main/kotlin/com/amazon/elasticsearch/replication/action/pause/TransportPauseIndexReplicationAction.kt
new file mode 100644
index 00000000..e63b176a
--- /dev/null
+++ b/src/main/kotlin/com/amazon/elasticsearch/replication/action/pause/TransportPauseIndexReplicationAction.kt
@@ -0,0 +1,150 @@
+/*
+ * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package com.amazon.elasticsearch.replication.action.pause
+
+import com.amazon.elasticsearch.replication.action.replicationstatedetails.UpdateReplicationStateDetailsRequest
+import com.amazon.elasticsearch.replication.metadata.*
+import com.amazon.elasticsearch.replication.util.*
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.GlobalScope
+import kotlinx.coroutines.launch
+import org.apache.logging.log4j.LogManager
+import org.elasticsearch.ElasticsearchException
+import org.elasticsearch.ResourceAlreadyExistsException
+import org.elasticsearch.action.ActionListener
+import org.elasticsearch.action.support.ActionFilters
+import org.elasticsearch.action.support.master.AcknowledgedRequest
+import org.elasticsearch.action.support.master.AcknowledgedResponse
+import org.elasticsearch.action.support.master.TransportMasterNodeAction
+import org.elasticsearch.client.Client
+import org.elasticsearch.cluster.AckedClusterStateUpdateTask
+import org.elasticsearch.cluster.ClusterState
+import org.elasticsearch.cluster.ClusterStateTaskExecutor
+import org.elasticsearch.cluster.RestoreInProgress
+import org.elasticsearch.cluster.block.ClusterBlockException
+import org.elasticsearch.cluster.block.ClusterBlockLevel
+import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver
+import org.elasticsearch.cluster.metadata.Metadata
+import org.elasticsearch.cluster.service.ClusterService
+import org.elasticsearch.common.inject.Inject
+import org.elasticsearch.common.io.stream.StreamInput
+import org.elasticsearch.threadpool.ThreadPool
+import org.elasticsearch.transport.TransportService
+import java.io.IOException
+
+/**
+ * Master-node transport action that pauses replication for a follower index.
+ *
+ * The action checks that replication is currently running for the index, refuses to pause while
+ * the index appears in an in-progress restore, records the pause and its reason in the
+ * [ReplicationMetadata] cluster-state custom, and finally sets the overall replication state of
+ * the index to paused.
+ */
+class TransportPauseIndexReplicationAction @Inject constructor(transportService: TransportService,
+                                                               clusterService: ClusterService,
+                                                               threadPool: ThreadPool,
+                                                               actionFilters: ActionFilters,
+                                                               indexNameExpressionResolver:
+                                                               IndexNameExpressionResolver,
+                                                               val client: Client) :
+    TransportMasterNodeAction<PauseIndexReplicationRequest, AcknowledgedResponse> (PauseIndexReplicationAction.NAME,
+            transportService, clusterService, threadPool, actionFilters, ::PauseIndexReplicationRequest,
+            indexNameExpressionResolver), CoroutineScope by GlobalScope {
+
+    companion object {
+        private val log = LogManager.getLogger(TransportPauseIndexReplicationAction::class.java)
+    }
+
+    /** Blocks the request only when a global METADATA_WRITE cluster block is in place. */
+    override fun checkBlock(request: PauseIndexReplicationRequest, state: ClusterState): ClusterBlockException? {
+        return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE)
+    }
+
+    /**
+     * Runs the pause workflow in a coroutine and completes [listener] with its outcome.
+     *
+     * Any exception thrown inside the workflow is handed to `completeWith`, which owns the
+     * listener for the whole block.
+     */
+    @Throws(Exception::class)
+    override fun masterOperation(request: PauseIndexReplicationRequest, state: ClusterState,
+                                 listener: ActionListener<AcknowledgedResponse>) {
+        launch(Dispatchers.Unconfined + threadPool.coroutineContext()) {
+            listener.completeWith {
+                log.info("Pausing index replication on index:" + request.indexName)
+                validatePauseReplicationRequest(request)
+
+                // Restoring Index can't be paused.
+                // The RestoreInProgress custom is absent from the cluster state when no restore
+                // has been registered, so treat a missing custom as "not restoring" rather than
+                // failing with a NullPointerException.
+                val restoring = clusterService.state().custom<RestoreInProgress>(RestoreInProgress.TYPE)
+                        ?.any { entry -> entry.indices().any { it == request.indexName } }
+                        ?: false
+
+                if (restoring) {
+                    throw ElasticsearchException("Index is in restore phase currently for index: ${request.indexName}. You can pause after restore completes.")
+                }
+
+                val stateUpdateResponse : AcknowledgedResponse =
+                    clusterService.waitForClusterStateUpdate("Pause_replication") { l -> PauseReplicationTask(request, l)}
+                if (!stateUpdateResponse.isAcknowledged) {
+                    throw ElasticsearchException("Failed to update cluster state")
+                }
+
+                updateReplicationStateToPaused(request.indexName)
+
+                AcknowledgedResponse(true)
+            }
+        }
+    }
+
+    /**
+     * Verifies that the index can be paused.
+     *
+     * @throws IllegalArgumentException if no replication state exists for the index
+     * @throws ResourceAlreadyExistsException if the index is already paused
+     * @throws IllegalStateException if the overall state is neither paused nor running
+     */
+    private fun validatePauseReplicationRequest(request: PauseIndexReplicationRequest) {
+        val replicationStateParams = getReplicationStateParamsForIndex(clusterService, request.indexName)
+                ?: throw IllegalArgumentException("No replication in progress for index:${request.indexName}")
+        val replicationOverallState = replicationStateParams[REPLICATION_OVERALL_STATE_KEY]
+        if (replicationOverallState == REPLICATION_OVERALL_STATE_PAUSED) {
+            throw ResourceAlreadyExistsException("Index ${request.indexName} is already paused")
+        } else if (replicationOverallState != REPLICATION_OVERALL_STATE_RUNNING_VALUE) {
+            throw IllegalStateException("Unknown value of replication state:$replicationOverallState")
+        }
+    }
+
+    override fun executor(): String {
+        return ThreadPool.Names.SAME
+    }
+
+    /** Submits a cluster-state update that marks the overall replication state as paused. */
+    private suspend fun updateReplicationStateToPaused(indexName: String) {
+        val replicationStateParamMap = HashMap<String, String>()
+        replicationStateParamMap[REPLICATION_OVERALL_STATE_KEY] = REPLICATION_OVERALL_STATE_PAUSED
+        val updateReplicationStateDetailsRequest = UpdateReplicationStateDetailsRequest(indexName, replicationStateParamMap,
+                UpdateReplicationStateDetailsRequest.UpdateType.ADD)
+        submitClusterStateUpdateTask(updateReplicationStateDetailsRequest, UpdateReplicationStateDetailsTaskExecutor.INSTANCE
+                as ClusterStateTaskExecutor<AcknowledgedRequest<UpdateReplicationStateDetailsRequest>>,
+                clusterService,
+                "pause-replication-state-params")
+    }
+
+    @Throws(IOException::class)
+    override fun read(inp: StreamInput): AcknowledgedResponse {
+        return AcknowledgedResponse(inp)
+    }
+
+    /**
+     * Cluster-state update task that stores the paused index and its reason in the
+     * [ReplicationMetadata] custom.
+     */
+    class PauseReplicationTask(val request: PauseIndexReplicationRequest, listener: ActionListener<AcknowledgedResponse>) :
+        AckedClusterStateUpdateTask<AcknowledgedResponse>(request, listener) {
+
+        override fun execute(currentState: ClusterState): ClusterState {
+            val newState = ClusterState.builder(currentState)
+
+            val mdBuilder = Metadata.builder(currentState.metadata)
+            // Start from empty replication metadata when the custom is not present yet.
+            val currentReplicationMetadata = currentState.metadata().custom<ReplicationMetadata>(ReplicationMetadata.NAME)
+                    ?: ReplicationMetadata.EMPTY
+
+            // add paused index setting
+            val newMetadata = currentReplicationMetadata.pauseIndex(request.indexName, request.reason)
+            mdBuilder.putCustom(ReplicationMetadata.NAME, newMetadata)
+            newState.metadata(mdBuilder)
+            return newState.build()
+        }
+
+        override fun newResponse(acknowledged: Boolean) = AcknowledgedResponse(acknowledged)
+    }
+}
diff --git a/src/main/kotlin/com/amazon/elasticsearch/replication/action/resume/ResumeIndexReplicationAction.kt b/src/main/kotlin/com/amazon/elasticsearch/replication/action/resume/ResumeIndexReplicationAction.kt
new file mode 100644
index 00000000..583d7f44
--- /dev/null
+++ b/src/main/kotlin/com/amazon/elasticsearch/replication/action/resume/ResumeIndexReplicationAction.kt
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package com.amazon.elasticsearch.replication.action.resume
+
+import org.elasticsearch.action.ActionType
+import org.elasticsearch.action.support.master.AcknowledgedResponse
+
+/**
+ * Action type descriptor for the "resume index replication" transport action.
+ *
+ * The constructor is private; the only instance is [INSTANCE]. Responses are read with the
+ * [AcknowledgedResponse] stream constructor.
+ */
+class ResumeIndexReplicationAction private constructor(): ActionType<AcknowledgedResponse>(NAME, ::AcknowledgedResponse) {
+    companion object {
+        /** Transport action name under which the resume action is registered. */
+        const val NAME = "indices:admin/opendistro/replication/index/resume"
+        val INSTANCE: ResumeIndexReplicationAction = ResumeIndexReplicationAction()
+    }
+}
diff --git a/src/main/kotlin/com/amazon/elasticsearch/replication/action/resume/ResumeIndexReplicationRequest.kt b/src/main/kotlin/com/amazon/elasticsearch/replication/action/resume/ResumeIndexReplicationRequest.kt
new file mode 100644
index 00000000..3276dc31
--- /dev/null
+++ b/src/main/kotlin/com/amazon/elasticsearch/replication/action/resume/ResumeIndexReplicationRequest.kt
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package com.amazon.elasticsearch.replication.action.resume
+
+import org.elasticsearch.action.ActionRequestValidationException
+import org.elasticsearch.action.IndicesRequest
+import org.elasticsearch.action.support.IndicesOptions
+import org.elasticsearch.action.support.master.AcknowledgedRequest
+import org.elasticsearch.common.io.stream.StreamInput
+import org.elasticsearch.common.io.stream.StreamOutput
+import org.elasticsearch.common.xcontent.*
+
+/**
+ * Request to resume replication on a single follower index, identified by [indexName].
+ */
+class ResumeIndexReplicationRequest : AcknowledgedRequest<ResumeIndexReplicationRequest>, IndicesRequest.Replaceable, ToXContentObject {
+
+    lateinit var indexName: String
+
+    constructor(indexName: String) {
+        this.indexName = indexName
+    }
+
+    // Used only by PARSER; indexName is assigned afterwards in fromXContent.
+    private constructor() {
+    }
+
+    /** Deserializes the request; mirrors [writeTo]. */
+    constructor(inp: StreamInput): super(inp) {
+        indexName = inp.readString()
+    }
+
+    companion object {
+        // No fields are declared on the parser; the body contributes nothing to the request.
+        private val PARSER = ObjectParser<ResumeIndexReplicationRequest, Void>("ResumeReplicationRequestParser") {
+            ResumeIndexReplicationRequest()
+        }
+
+        /**
+         * Builds a request from the body read by [parser] and targets it at [followerIndex].
+         */
+        fun fromXContent(parser: XContentParser, followerIndex: String): ResumeIndexReplicationRequest {
+            val parsedRequest = PARSER.parse(parser, null)
+            parsedRequest.indexName = followerIndex
+            return parsedRequest
+        }
+    }
+
+    /** No request-level validation is performed; always returns null. */
+    override fun validate(): ActionRequestValidationException? {
+        return null
+    }
+
+    /** The target index is fixed by [indexName]; the supplied [indices] are ignored. */
+    override fun indices(vararg indices: String?): IndicesRequest {
+        return this
+    }
+
+    override fun indices(): Array<String> {
+        return arrayOf(indexName)
+    }
+
+    override fun indicesOptions(): IndicesOptions {
+        return IndicesOptions.strictSingleIndexNoExpandForbidClosed()
+    }
+
+    override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
+        builder.startObject()
+        builder.field("indexName", indexName)
+        builder.endObject()
+        return builder
+    }
+
+    /** Serializes the request; mirrors the [StreamInput] constructor. */
+    override fun writeTo(out: StreamOutput) {
+        super.writeTo(out)
+        out.writeString(indexName)
+    }
+
+}
\ No newline at end of file
diff --git a/src/main/kotlin/com/amazon/elasticsearch/replication/action/resume/TransportResumeIndexReplicationAction.kt b/src/main/kotlin/com/amazon/elasticsearch/replication/action/resume/TransportResumeIndexReplicationAction.kt
new file mode 100644
index 00000000..333a75e8
--- /dev/null
+++ b/src/main/kotlin/com/amazon/elasticsearch/replication/action/resume/TransportResumeIndexReplicationAction.kt
@@ -0,0 +1,222 @@
+/*
+ * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package com.amazon.elasticsearch.replication.action.resume
+
+import com.amazon.elasticsearch.replication.action.index.ReplicateIndexResponse
+import com.amazon.elasticsearch.replication.action.replicationstatedetails.UpdateReplicationStateDetailsRequest
+import com.amazon.elasticsearch.replication.metadata.*
+import com.amazon.elasticsearch.replication.seqno.RemoteClusterRetentionLeaseHelper
+import com.amazon.elasticsearch.replication.task.ReplicationState
+import com.amazon.elasticsearch.replication.task.index.IndexReplicationExecutor
+import com.amazon.elasticsearch.replication.task.index.IndexReplicationParams
+import com.amazon.elasticsearch.replication.task.index.IndexReplicationState
+import com.amazon.elasticsearch.replication.util.*
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.GlobalScope
+import kotlinx.coroutines.launch
+import org.apache.logging.log4j.LogManager
+import org.elasticsearch.ElasticsearchException
+import org.elasticsearch.ResourceAlreadyExistsException
+import org.elasticsearch.ResourceNotFoundException
+import org.elasticsearch.action.ActionListener
+import org.elasticsearch.action.support.ActionFilters
+import org.elasticsearch.action.support.IndicesOptions
+import org.elasticsearch.action.support.master.AcknowledgedRequest
+import org.elasticsearch.action.support.master.AcknowledgedResponse
+import org.elasticsearch.action.support.master.TransportMasterNodeAction
+import org.elasticsearch.client.Client
+import org.elasticsearch.cluster.AckedClusterStateUpdateTask
+import org.elasticsearch.cluster.ClusterState
+import org.elasticsearch.cluster.ClusterStateTaskExecutor
+import org.elasticsearch.cluster.block.ClusterBlockException
+import org.elasticsearch.cluster.block.ClusterBlockLevel
+import org.elasticsearch.cluster.metadata.IndexMetadata
+import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver
+import org.elasticsearch.cluster.metadata.Metadata
+import org.elasticsearch.cluster.service.ClusterService
+import org.elasticsearch.common.inject.Inject
+import org.elasticsearch.common.io.stream.StreamInput
+import org.elasticsearch.index.IndexNotFoundException
+import org.elasticsearch.index.shard.ShardId
+import org.elasticsearch.threadpool.ThreadPool
+import org.elasticsearch.transport.TransportService
+import java.io.IOException
+
+/**
+ * Master-node transport action that resumes replication for a paused follower index.
+ *
+ * The action checks that the index is paused, removes the pause entry from the
+ * [ReplicationMetadata] cluster-state custom, looks up the leader cluster alias and leader index,
+ * verifies that retention leases still exist on the leader, marks the replication state as
+ * running and starts the index replication persistent task.
+ */
+class TransportResumeIndexReplicationAction @Inject constructor(transportService: TransportService,
+                                                                clusterService: ClusterService,
+                                                                threadPool: ThreadPool,
+                                                                actionFilters: ActionFilters,
+                                                                indexNameExpressionResolver:
+                                                                IndexNameExpressionResolver,
+                                                                val client: Client) :
+    TransportMasterNodeAction<ResumeIndexReplicationRequest, AcknowledgedResponse> (ResumeIndexReplicationAction.NAME,
+            transportService, clusterService, threadPool, actionFilters, ::ResumeIndexReplicationRequest,
+            indexNameExpressionResolver), CoroutineScope by GlobalScope {
+
+    companion object {
+        private val log = LogManager.getLogger(TransportResumeIndexReplicationAction::class.java)
+    }
+
+    /** Blocks the request only when a global METADATA_WRITE cluster block is in place. */
+    override fun checkBlock(request: ResumeIndexReplicationRequest, state: ClusterState): ClusterBlockException? {
+        return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE)
+    }
+
+    /**
+     * Runs the resume workflow in a coroutine and completes [listener] exactly once with its
+     * outcome. Exceptions thrown inside the workflow are handed to `completeWith`.
+     *
+     * The leader cluster alias and leader index are read from the [state] passed to this method.
+     */
+    @Throws(Exception::class)
+    override fun masterOperation(request: ResumeIndexReplicationRequest, state: ClusterState,
+                                 listener: ActionListener<AcknowledgedResponse>) {
+        launch(Dispatchers.Unconfined + threadPool.coroutineContext()) {
+            listener.completeWith {
+                log.info("Resuming index replication on index:" + request.indexName)
+                validateResumeReplicationRequest(request)
+
+                val stateUpdateResponse : AcknowledgedResponse =
+                    clusterService.waitForClusterStateUpdate("Resume_replication") { l -> ResumeReplicationTask(request, l)}
+                if (!stateUpdateResponse.isAcknowledged) {
+                    throw ElasticsearchException("Failed to update cluster state")
+                }
+
+                val currentReplicationMetadata = state.metadata().custom<ReplicationMetadata>(ReplicationMetadata.NAME)
+                        ?: ReplicationMetadata.EMPTY
+
+                // The first remote cluster whose replicated-index map contains the follower index.
+                val clusterAlias = currentReplicationMetadata.replicatedIndices.entries.firstOrNull {
+                    it.value.containsKey(request.indexName)
+                }?.key
+
+                val leaderIndex = currentReplicationMetadata.replicatedIndices.get(clusterAlias)?.get(request.indexName)
+
+                if (leaderIndex == null || clusterAlias == null) {
+                    throw IllegalStateException("Unknown value of leader index or cluster alias")
+                }
+
+                val remoteMetadata = getRemoteIndexMetadata(clusterAlias, leaderIndex)
+
+                val params = IndexReplicationParams(clusterAlias, remoteMetadata.index, request.indexName)
+
+                if (!isResumable(params)) {
+                    throw ResourceNotFoundException("Retention lease doesn't exist. Replication can't be resumed for ${request.indexName}")
+                }
+
+                updateReplicationStateToStarted(request.indexName)
+
+                // NOTE(review): persistentTasksService is not declared in the visible part of this
+                // class - confirm where it is provided.
+                val task = persistentTasksService.startTask("replication:index:${request.indexName}",
+                        IndexReplicationExecutor.TASK_NAME, params)
+
+                if (!task.isAssigned) {
+                    log.error("Failed to assign task")
+                    // Return the negative response as the result of the block. Answering the
+                    // listener directly here and then falling through made completeWith answer
+                    // it a second time after waiting on a task that was never assigned.
+                    ReplicateIndexResponse(false)
+                } else {
+                    // Now wait for the replication to start and the follower index to get created before returning
+                    persistentTasksService.waitForTaskCondition(task.id, request.timeout()) { t ->
+                        val replicationState = (t.state as IndexReplicationState?)?.state
+                        replicationState == ReplicationState.FOLLOWING
+                    }
+
+                    AcknowledgedResponse(true)
+                }
+            }
+        }
+    }
+
+    /**
+     * Checks whether every shard of the follower index still has its retention lease on the leader.
+     *
+     * All shards are checked even after the first failure. When at least one lease is missing,
+     * the leases of all shards are removed before returning false.
+     *
+     * @throws IndexNotFoundException if the follower index has no entry in the routing table
+     */
+    private suspend fun isResumable(params :IndexReplicationParams): Boolean {
+        var isResumable = true
+        val remoteClient = client.getRemoteClusterClient(params.remoteCluster)
+        // Fail with a descriptive exception instead of a NullPointerException when the follower
+        // index is missing from the routing table.
+        val shards = clusterService.state().routingTable.indicesRouting().get(params.followerIndexName)?.shards()
+                ?: throw IndexNotFoundException(params.followerIndexName)
+        val retentionLeaseHelper = RemoteClusterRetentionLeaseHelper(clusterService.clusterName.value(), remoteClient)
+        shards.forEach {
+            val followerShardId = it.value.shardId
+            if (!retentionLeaseHelper.verifyRetentionLeaseExist(ShardId(params.remoteIndex, followerShardId.id), followerShardId)) {
+                isResumable = false
+            }
+        }
+
+        if (isResumable) {
+            return true
+        }
+
+        // Clean up all retention leases we may have accidentally taken while doing verifyRetentionLeaseExist.
+        // Idempotent Op which does no harm
+        shards.forEach {
+            val followerShardId = it.value.shardId
+            log.debug("Removing lease for ${followerShardId.id} ")
+            retentionLeaseHelper.removeRetentionLease(ShardId(params.remoteIndex, followerShardId.id), followerShardId)
+        }
+
+        return false
+    }
+
+    /**
+     * Fetches the metadata of [remoteIndex] from the cluster state of [remoteCluster].
+     *
+     * @throws IndexNotFoundException if the returned state has no metadata for the index
+     */
+    private suspend fun getRemoteIndexMetadata(remoteCluster: String, remoteIndex: String): IndexMetadata {
+        val remoteClusterClient = client.getRemoteClusterClient(remoteCluster).admin().cluster()
+        val clusterStateRequest = remoteClusterClient.prepareState()
+                .clear()
+                .setIndices(remoteIndex)
+                .setMetadata(true)
+                .setIndicesOptions(IndicesOptions.strictSingleIndexNoExpandForbidClosed())
+                .request()
+        val remoteState = suspending(remoteClusterClient::state)(clusterStateRequest).state
+        return remoteState.metadata.index(remoteIndex) ?: throw IndexNotFoundException("${remoteCluster}:${remoteIndex}")
+    }
+
+    /**
+     * Verifies that the index can be resumed.
+     *
+     * @throws IllegalArgumentException if no replication state exists for the index
+     * @throws ResourceAlreadyExistsException if the overall state is anything other than paused
+     */
+    private fun validateResumeReplicationRequest(request: ResumeIndexReplicationRequest) {
+        val replicationStateParams = getReplicationStateParamsForIndex(clusterService, request.indexName)
+                ?: throw IllegalArgumentException("No replication in progress for index:${request.indexName}")
+        val replicationOverallState = replicationStateParams[REPLICATION_OVERALL_STATE_KEY]
+
+        if (replicationOverallState != REPLICATION_OVERALL_STATE_PAUSED) {
+            throw ResourceAlreadyExistsException("Replication on Index ${request.indexName} is already running")
+        }
+    }
+
+    /** Submits a cluster-state update that marks the overall replication state as running. */
+    private suspend fun updateReplicationStateToStarted(indexName: String) {
+        val replicationStateParamMap = HashMap<String, String>()
+        replicationStateParamMap[REPLICATION_OVERALL_STATE_KEY] = REPLICATION_OVERALL_STATE_RUNNING_VALUE
+        val updateReplicationStateDetailsRequest = UpdateReplicationStateDetailsRequest(indexName, replicationStateParamMap,
+                UpdateReplicationStateDetailsRequest.UpdateType.ADD)
+        submitClusterStateUpdateTask(updateReplicationStateDetailsRequest, UpdateReplicationStateDetailsTaskExecutor.INSTANCE
+                as ClusterStateTaskExecutor<AcknowledgedRequest<UpdateReplicationStateDetailsRequest>>,
+                clusterService,
+                "resume-replication-state-params")
+    }
+
+    override fun executor(): String {
+        return ThreadPool.Names.SAME
+    }
+
+    @Throws(IOException::class)
+    override fun read(inp: StreamInput): AcknowledgedResponse {
+        return AcknowledgedResponse(inp)
+    }
+
+    /**
+     * Cluster-state update task that applies `resumeIndex` for the requested index to the
+     * [ReplicationMetadata] custom.
+     */
+    class ResumeReplicationTask(val request: ResumeIndexReplicationRequest, listener: ActionListener<AcknowledgedResponse>) :
+        AckedClusterStateUpdateTask<AcknowledgedResponse>(request, listener) {
+
+        override fun execute(currentState: ClusterState): ClusterState {
+            val newState = ClusterState.builder(currentState)
+
+            val mdBuilder = Metadata.builder(currentState.metadata)
+            // Start from empty replication metadata when the custom is not present yet.
+            val currentReplicationMetadata = currentState.metadata().custom<ReplicationMetadata>(ReplicationMetadata.NAME)
+                    ?: ReplicationMetadata.EMPTY
+
+            // mark the index as resumed in the replication metadata
+            val newMetadata = currentReplicationMetadata.resumeIndex(request.indexName)
+            mdBuilder.putCustom(ReplicationMetadata.NAME, newMetadata)
+            newState.metadata(mdBuilder)
+            return newState.build()
+        }
+
+        override fun newResponse(acknowledged: Boolean) = AcknowledgedResponse(acknowledged)
+    }
+}
diff --git a/src/main/kotlin/com/amazon/elasticsearch/replication/action/stop/TransportStopIndexReplicationAction.kt b/src/main/kotlin/com/amazon/elasticsearch/replication/action/stop/TransportStopIndexReplicationAction.kt
index 6815434b..1e999972 100644
--- a/src/main/kotlin/com/amazon/elasticsearch/replication/action/stop/TransportStopIndexReplicationAction.kt
+++ b/src/main/kotlin/com/amazon/elasticsearch/replication/action/stop/TransportStopIndexReplicationAction.kt
@@ -16,12 +16,11 @@
 package com.amazon.elasticsearch.replication.action.stop
 
 import com.amazon.elasticsearch.replication.ReplicationPlugin.Companion.REPLICATED_INDEX_SETTING
-import com.amazon.elasticsearch.replication.metadata.INDEX_REPLICATION_BLOCK
-import com.amazon.elasticsearch.replication.metadata.checkIfIndexBlockedWithLevel
 import com.amazon.elasticsearch.replication.metadata.REPLICATION_OVERALL_STATE_KEY
 import
com.amazon.elasticsearch.replication.metadata.REPLICATION_OVERALL_STATE_RUNNING_VALUE -import com.amazon.elasticsearch.replication.metadata.ReplicationMetadata -import com.amazon.elasticsearch.replication.metadata.getReplicationStateParamsForIndex +import com.amazon.elasticsearch.replication.metadata.* +import com.amazon.elasticsearch.replication.seqno.RemoteClusterRetentionLeaseHelper +import com.amazon.elasticsearch.replication.task.index.IndexReplicationParams import com.amazon.elasticsearch.replication.util.completeWith import com.amazon.elasticsearch.replication.util.coroutineContext import com.amazon.elasticsearch.replication.util.suspending @@ -36,6 +35,7 @@ import org.elasticsearch.action.ActionListener import org.elasticsearch.action.admin.indices.close.CloseIndexRequest import org.elasticsearch.action.admin.indices.open.OpenIndexRequest import org.elasticsearch.action.support.ActionFilters +import org.elasticsearch.action.support.IndicesOptions import org.elasticsearch.action.support.master.AcknowledgedResponse import org.elasticsearch.action.support.master.TransportMasterNodeAction import org.elasticsearch.client.Client @@ -53,6 +53,7 @@ import org.elasticsearch.common.inject.Inject import org.elasticsearch.common.io.stream.StreamInput import org.elasticsearch.common.settings.Settings import org.elasticsearch.index.IndexNotFoundException +import org.elasticsearch.index.shard.ShardId import org.elasticsearch.threadpool.ThreadPool import org.elasticsearch.transport.TransportService import java.io.IOException @@ -89,7 +90,7 @@ class TransportStopIndexReplicationAction @Inject constructor(transportService: launch(Dispatchers.Unconfined + threadPool.coroutineContext()) { listener.completeWith { log.info("Stopping index replication on index:" + request.indexName) - validateStopReplicationRequest(request) + val isPaused = validateStopReplicationRequest(request) // Index will be deleted if replication is stopped while it is restoring. 
So no need to close/reopen val restoring = clusterService.state().custom(RestoreInProgress.TYPE).any { entry -> @@ -103,6 +104,11 @@ class TransportStopIndexReplicationAction @Inject constructor(transportService: } } + // If paused , we need to clear retention leases as Shard Tasks are non-existent + if (isPaused) { + removeRetentionLease(state, request.indexName) + } + val stateUpdateResponse : AcknowledgedResponse = clusterService.waitForClusterStateUpdate("stop_replication") { l -> StopReplicationTask(request, l)} if (!stateUpdateResponse.isAcknowledged) { @@ -117,18 +123,63 @@ class TransportStopIndexReplicationAction @Inject constructor(transportService: throw ElasticsearchException("Failed to reopen index: ${request.indexName}") } } + + + AcknowledgedResponse(true) } } } - private fun validateStopReplicationRequest(request: StopIndexReplicationRequest) { + private suspend fun removeRetentionLease(state: ClusterState, followerIndexName: String) { + val currentReplicationMetadata = state.metadata().custom(ReplicationMetadata.NAME) + ?: ReplicationMetadata.EMPTY + + val clusterAlias = currentReplicationMetadata.replicatedIndices.entries.firstOrNull { + it.value.containsKey(followerIndexName) + }?.key + + val leaderIndex = currentReplicationMetadata.replicatedIndices.get(clusterAlias)?.get(followerIndexName) + + if (leaderIndex == null || clusterAlias == null) { + throw IllegalStateException("Unknown value of leader index or cluster alias") + } + + val remoteMetadata = getRemoteIndexMetadata(clusterAlias, leaderIndex) + + val params = IndexReplicationParams(clusterAlias, remoteMetadata.index, followerIndexName) + val remoteClient = client.getRemoteClusterClient(params.remoteCluster) + val shards = clusterService.state().routingTable.indicesRouting().get(params.followerIndexName).shards() + val retentionLeaseHelper = RemoteClusterRetentionLeaseHelper(clusterService.clusterName.value(), remoteClient) + shards.forEach { + val followerShardId = it.value.shardId + 
log.debug("Removing lease for $followerShardId.id ") + retentionLeaseHelper.removeRetentionLease(ShardId(params.remoteIndex, followerShardId.id), followerShardId) + } + } + + private suspend fun getRemoteIndexMetadata(remoteCluster: String, remoteIndex: String): IndexMetadata { + val remoteClusterClient = client.getRemoteClusterClient(remoteCluster).admin().cluster() + val clusterStateRequest = remoteClusterClient.prepareState() + .clear() + .setIndices(remoteIndex) + .setMetadata(true) + .setIndicesOptions(IndicesOptions.strictSingleIndexNoExpandForbidClosed()) + .request() + val remoteState = suspending(remoteClusterClient::state)(clusterStateRequest).state + return remoteState.metadata.index(remoteIndex) ?: throw IndexNotFoundException("${remoteCluster}:${remoteIndex}") + } + + private fun validateStopReplicationRequest(request: StopIndexReplicationRequest): Boolean { val replicationStateParams = getReplicationStateParamsForIndex(clusterService, request.indexName) ?: throw IllegalArgumentException("No replication in progress for index:${request.indexName}") val replicationOverallState = replicationStateParams[REPLICATION_OVERALL_STATE_KEY] if (replicationOverallState == REPLICATION_OVERALL_STATE_RUNNING_VALUE) - return + return false + else if (replicationOverallState == REPLICATION_OVERALL_STATE_PAUSED) { + return true + } throw IllegalStateException("Unknown value of replication state:$replicationOverallState") } @@ -165,6 +216,7 @@ class TransportStopIndexReplicationAction @Inject constructor(transportService: val newMetadata = currentReplicationMetadata.removeIndex(clusterAlias, request.indexName) .removeReplicationStateParams(request.indexName) .removeSecurityContext(clusterAlias, request.indexName) + .clearPausedState(request.indexName) mdBuilder.putCustom(ReplicationMetadata.NAME, newMetadata) } diff --git a/src/main/kotlin/com/amazon/elasticsearch/replication/metadata/ReplicationMetadata.kt 
b/src/main/kotlin/com/amazon/elasticsearch/replication/metadata/ReplicationMetadata.kt index fba7acce..740a4928 100644 --- a/src/main/kotlin/com/amazon/elasticsearch/replication/metadata/ReplicationMetadata.kt +++ b/src/main/kotlin/com/amazon/elasticsearch/replication/metadata/ReplicationMetadata.kt @@ -24,7 +24,9 @@ import org.elasticsearch.cluster.metadata.Metadata import org.elasticsearch.cluster.service.ClusterService import org.elasticsearch.common.io.stream.StreamInput import org.elasticsearch.common.io.stream.StreamOutput +import org.elasticsearch.common.io.stream.Writeable import org.elasticsearch.common.xcontent.ToXContent +import org.elasticsearch.common.xcontent.ToXContentFragment import org.elasticsearch.common.xcontent.XContentBuilder import org.elasticsearch.common.xcontent.XContentParser import java.io.IOException @@ -36,6 +38,7 @@ import org.elasticsearch.cluster.Diff as ESDiff typealias AutoFollowPatterns = Map // { pattern name -> pattern } typealias ReplicatedIndices = Map // { follower index name -> remote index name } typealias SecurityContexts = Map // { follower index name -> User detail string } +typealias PausedIndices = Map // follower index names -> reason for pause typealias ClusterAlias = String typealias ReplicationStateParams = Map typealias FollowIndexName = String @@ -43,7 +46,8 @@ typealias FollowIndexName = String data class ReplicationMetadata(val autoFollowPatterns: Map, val replicatedIndices: Map, val replicationDetails: Map, - val securityContexts: Map) : Metadata.Custom { + val securityContexts: Map, + val pausedIndices: PausedIndices) : Metadata.Custom { companion object { const val NAME = "replication_metadata" @@ -51,9 +55,10 @@ data class ReplicationMetadata(val autoFollowPatterns: Map() { override fun write(value: AutoFollowPatterns, out: StreamOutput) { @@ -65,6 +70,16 @@ data class ReplicationMetadata(val autoFollowPatterns: Map() { + override fun write(value: String, out: StreamOutput) { + out.writeString(value) + } + 
+ override fun read(inp: StreamInput, key: String): String { + return inp.readString() + } + } + val indicesSerializer = object: NonDiffableValueSerializer() { override fun write(value: ReplicatedIndices, out: StreamOutput) { out.writeMap(value, StreamOutput::writeString, StreamOutput::writeString) @@ -165,6 +180,21 @@ data class ReplicationMetadata(val autoFollowPatterns: Map() + while(parser.nextToken().also { token = it } != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + currentField = parser.currentName() + } + else if(token == XContentParser.Token.START_OBJECT) { + var pausedDetails = parser.toString() + pausedIndices[currentField!!] = pausedDetails + } else { + throw IllegalArgumentException("Unexpected token during parsing " + + "replication_metadata[$PAUSED_INDICES_KEY] - $token") + } + } + builder.pausedIndices(pausedIndices) } } return builder.build() @@ -176,6 +206,7 @@ data class ReplicationMetadata(val autoFollowPatterns: Map = mapOf() private var replicationDetails: Map = mapOf() private var securityContexts: Map = mapOf() + private var pausedIndices: PausedIndices = mapOf() fun autoFollowPatterns(patterns: Map): Builder { this.autoFollowPattern = patterns @@ -197,8 +228,13 @@ data class ReplicationMetadata(val autoFollowPatterns: Map patternsSerializer.read(i, "") }, inp.readMap(StreamInput::readString) { i -> indicesSerializer.read(i, "") }, inp.readMap(StreamInput::readString) {i -> replicationDetailsSerializer.read(i, "")}, - inp.readMap(StreamInput::readString) { i -> securityContextsSerializer.read(i, "") } + inp.readMap(StreamInput::readString) { i -> securityContextsSerializer.read(i, "") }, + inp.readMap(StreamInput::readString) { i -> pausedSerializer.read(i, "") } ) override fun writeTo(out: StreamOutput) { @@ -214,6 +251,7 @@ data class ReplicationMetadata(val autoFollowPatterns: Map indicesSerializer.write(v, o) } out.writeMap(replicationDetails, StreamOutput::writeString) { o, v -> 
replicationDetailsSerializer.write(v, o) } out.writeMap(securityContexts, StreamOutput::writeString) { o, v -> securityContextsSerializer.write(v, o)} + out.writeMap(pausedIndices, StreamOutput::writeString) { o, v -> pausedSerializer.write(v, o)} } override fun diff(previousState: Metadata.Custom) = Diff(previousState as ReplicationMetadata, this) @@ -242,6 +280,11 @@ data class ReplicationMetadata(val autoFollowPatterns: Map builder.field(connectionName, securityContext) } + builder.endObject() + builder.startObject(PAUSED_INDICES_KEY) + pausedIndices.forEach { (connectionName, indices) -> + builder.field(connectionName, indices) + } return builder.endObject() } @@ -251,8 +294,16 @@ data class ReplicationMetadata(val autoFollowPatterns: Map> private val replicationDetails : ESDiff> private val securityContexts : ESDiff> + private val pausedIndices: ESDiff constructor(previous: ReplicationMetadata, current: ReplicationMetadata) { autoFollowPatterns = DiffableUtils.diff(previous.autoFollowPatterns, current.autoFollowPatterns, @@ -343,6 +413,8 @@ data class ReplicationMetadata(val autoFollowPatterns: Map { + return listOf(RestHandler.Route(RestRequest.Method.POST, "/_opendistro/_replication/{index}/_pause")) + } + + override fun getName(): String { + return "opendistro_index_pause_replicate_action" + } + + @Throws(IOException::class) + override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer { + request.contentOrSourceParamParser().use { parser -> + val followIndex = request.param("index") + val pauseReplicationRequest = PauseIndexReplicationRequest.fromXContent(parser, followIndex) + return RestChannelConsumer { channel: RestChannel? 
-> + client.admin().cluster() + .execute(PauseIndexReplicationAction.INSTANCE, pauseReplicationRequest, RestToXContentListener(channel)) + } + } + } +} diff --git a/src/main/kotlin/com/amazon/elasticsearch/replication/rest/ResumeIndexReplicationHandler.kt b/src/main/kotlin/com/amazon/elasticsearch/replication/rest/ResumeIndexReplicationHandler.kt new file mode 100644 index 00000000..6e38b72e --- /dev/null +++ b/src/main/kotlin/com/amazon/elasticsearch/replication/rest/ResumeIndexReplicationHandler.kt @@ -0,0 +1,54 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ + +package com.amazon.elasticsearch.replication.rest + +import com.amazon.elasticsearch.replication.action.resume.ResumeIndexReplicationAction +import com.amazon.elasticsearch.replication.action.resume.ResumeIndexReplicationRequest +import org.apache.logging.log4j.LogManager +import org.elasticsearch.client.node.NodeClient +import org.elasticsearch.rest.BaseRestHandler +import org.elasticsearch.rest.RestChannel +import org.elasticsearch.rest.RestHandler +import org.elasticsearch.rest.RestRequest +import org.elasticsearch.rest.action.RestToXContentListener +import java.io.IOException + +class ResumeIndexReplicationHandler : BaseRestHandler() { + + companion object { + private val log = LogManager.getLogger(ResumeIndexReplicationHandler::class.java) + } + + override fun routes(): List { + return listOf(RestHandler.Route(RestRequest.Method.POST, "/_opendistro/_replication/{index}/_resume")) + } + + override fun getName(): String { + return "opendistro_index_resume_replicate_action" + } + + @Throws(IOException::class) + override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer { + request.contentOrSourceParamParser().use { parser -> + val followIndex = request.param("index") + val resumeReplicationRequest = ResumeIndexReplicationRequest.fromXContent(parser, followIndex) + return RestChannelConsumer { channel: RestChannel? 
-> + client.admin().cluster() + .execute(ResumeIndexReplicationAction.INSTANCE, resumeReplicationRequest, RestToXContentListener(channel)) + } + } + } +} diff --git a/src/main/kotlin/com/amazon/elasticsearch/replication/seqno/RemoteClusterRetentionLeaseHelper.kt b/src/main/kotlin/com/amazon/elasticsearch/replication/seqno/RemoteClusterRetentionLeaseHelper.kt index 70c28ffe..5ab34b1a 100644 --- a/src/main/kotlin/com/amazon/elasticsearch/replication/seqno/RemoteClusterRetentionLeaseHelper.kt +++ b/src/main/kotlin/com/amazon/elasticsearch/replication/seqno/RemoteClusterRetentionLeaseHelper.kt @@ -21,6 +21,7 @@ import org.elasticsearch.client.Client import org.elasticsearch.common.logging.Loggers import org.elasticsearch.index.seqno.RetentionLeaseActions import org.elasticsearch.index.seqno.RetentionLeaseAlreadyExistsException +import org.elasticsearch.index.seqno.RetentionLeaseInvalidRetainingSeqNoException import org.elasticsearch.index.seqno.RetentionLeaseNotFoundException import org.elasticsearch.index.shard.ShardId @@ -52,6 +53,27 @@ class RemoteClusterRetentionLeaseHelper constructor(val followerClusterName: Str } } + public suspend fun verifyRetentionLeaseExist(remoteShardId: ShardId, followerShardId: ShardId): Boolean { + val retentionLeaseId = retentionLeaseIdForShard(followerClusterName, followerShardId) + // Currently there is no API to describe/list the retention leases . + // So we are verifying the existence of lease by trying to renew a lease by same name . + // If retention lease doesn't exist, this will throw an RetentionLeaseNotFoundException exception + // If it does it will try to RENEW that one with -1 seqno , which should either + // throw RetentionLeaseInvalidRetainingSeqNoException if a retention lease exists with higher seq no. + // which will exist in all probability + // Or if a retention lease already exists with -1 seqno, it will renew that . 
+ val request = RetentionLeaseActions.RenewRequest(remoteShardId, retentionLeaseId, RetentionLeaseActions.RETAIN_ALL, retentionLeaseSource) + try { + client.suspendExecute(RetentionLeaseActions.Renew.INSTANCE, request) + } catch (e : RetentionLeaseInvalidRetainingSeqNoException) { + return true + } + catch (e: RetentionLeaseNotFoundException) { + return false + } + return true + } + public suspend fun renewRetentionLease(remoteShardId: ShardId, seqNo: Long, followerShardId: ShardId) { val retentionLeaseId = retentionLeaseIdForShard(followerClusterName, followerShardId) val request = RetentionLeaseActions.RenewRequest(remoteShardId, retentionLeaseId, seqNo, retentionLeaseSource) diff --git a/src/main/kotlin/com/amazon/elasticsearch/replication/task/ReplicationState.kt b/src/main/kotlin/com/amazon/elasticsearch/replication/task/ReplicationState.kt index e8f6fa55..978d4e79 100644 --- a/src/main/kotlin/com/amazon/elasticsearch/replication/task/ReplicationState.kt +++ b/src/main/kotlin/com/amazon/elasticsearch/replication/task/ReplicationState.kt @@ -27,7 +27,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder */ enum class ReplicationState : Writeable, ToXContentFragment { - INIT, RESTORING, INIT_FOLLOW, FOLLOWING, MONITORING, FAILED, COMPLETED; // TODO: Add PAUSED state + INIT, RESTORING, INIT_FOLLOW, FOLLOWING, MONITORING, FAILED, COMPLETED; override fun writeTo(out: StreamOutput) { out.writeEnum(this) diff --git a/src/main/kotlin/com/amazon/elasticsearch/replication/task/index/IndexReplicationExecutor.kt b/src/main/kotlin/com/amazon/elasticsearch/replication/task/index/IndexReplicationExecutor.kt index 8dab25b7..db57eb9f 100644 --- a/src/main/kotlin/com/amazon/elasticsearch/replication/task/index/IndexReplicationExecutor.kt +++ b/src/main/kotlin/com/amazon/elasticsearch/replication/task/index/IndexReplicationExecutor.kt @@ -41,10 +41,6 @@ class IndexReplicationExecutor(executor: String, private val clusterService: Clu } override fun validate(params: 
IndexReplicationParams, clusterState: ClusterState) { - if (clusterState.routingTable.hasIndex(params.followerIndexName)) { - throw IllegalArgumentException("Cant use same index again for replication. Either close or " + - "delete the index:${params.followerIndexName}") - } val replicationStateParams = getReplicationStateParamsForIndex(clusterService, params.followerIndexName) ?: throw IllegalStateException("Index task started without replication state in cluster metadata") diff --git a/src/main/kotlin/com/amazon/elasticsearch/replication/task/index/IndexReplicationTask.kt b/src/main/kotlin/com/amazon/elasticsearch/replication/task/index/IndexReplicationTask.kt index 6087ed82..d403cfa6 100644 --- a/src/main/kotlin/com/amazon/elasticsearch/replication/task/index/IndexReplicationTask.kt +++ b/src/main/kotlin/com/amazon/elasticsearch/replication/task/index/IndexReplicationTask.kt @@ -67,6 +67,10 @@ import kotlin.coroutines.resume import kotlin.coroutines.resumeWithException import kotlin.coroutines.suspendCoroutine import com.amazon.elasticsearch.replication.action.index.block.UpdateIndexBlockAction +import com.amazon.elasticsearch.replication.action.pause.PauseIndexReplicationAction +import com.amazon.elasticsearch.replication.action.pause.PauseIndexReplicationRequest +import com.amazon.elasticsearch.replication.metadata.REPLICATION_OVERALL_STATE_KEY +import com.amazon.elasticsearch.replication.metadata.REPLICATION_OVERALL_STATE_PAUSED class IndexReplicationTask(id: Long, type: String, action: String, description: String, parentTask: TaskId, @@ -103,7 +107,12 @@ class IndexReplicationTask(id: Long, type: String, action: String, description: val newState = when (currentTaskState.state) { ReplicationState.INIT -> { addListenerToInterruptTask() - startRestore() + if (isResumed()) { + log.debug("Resuming tasks now.") + InitFollowState + } else { + startRestore() + } } ReplicationState.RESTORING -> { waitForRestore() @@ -150,6 +159,10 @@ class IndexReplicationTask(id: 
Long, type: String, action: String, description: return MonitoringState } + private fun isResumed(): Boolean { + return clusterService.state().routingTable.hasIndex(followerIndexName) + } + private suspend fun stopReplicationTasks() { val stopReplicationResponse = client.suspendExecute(StopIndexReplicationAction.INSTANCE, StopIndexReplicationRequest(followerIndexName)) if (!stopReplicationResponse.isAcknowledged) @@ -169,7 +182,7 @@ class IndexReplicationTask(id: Long, type: String, action: String, description: } override suspend fun cleanup() { - if (currentTaskState.state == ReplicationState.INIT || currentTaskState.state == ReplicationState.RESTORING) { + if (currentTaskState.state == ReplicationState.RESTORING) { log.info("Replication stopped before restore could finish, so removing partial restore..") cancelRestore() } @@ -300,6 +313,10 @@ class IndexReplicationTask(id: Long, type: String, action: String, description: if (replicationStateParams == null) { if (PersistentTasksNodeService.Status(State.STARTED) == status) scope.cancel("Index replication task received an interrupt.") + + } else if (replicationStateParams[REPLICATION_OVERALL_STATE_KEY] == REPLICATION_OVERALL_STATE_PAUSED){ + log.info("Pause state received for index $followerIndexName task") + scope.cancel("Index replication task received a pause.") } } } diff --git a/src/main/kotlin/com/amazon/elasticsearch/replication/task/shard/ShardReplicationTask.kt b/src/main/kotlin/com/amazon/elasticsearch/replication/task/shard/ShardReplicationTask.kt index a012dc77..97b988b3 100644 --- a/src/main/kotlin/com/amazon/elasticsearch/replication/task/shard/ShardReplicationTask.kt +++ b/src/main/kotlin/com/amazon/elasticsearch/replication/task/shard/ShardReplicationTask.kt @@ -15,15 +15,21 @@ package com.amazon.elasticsearch.replication.task.shard +import com.amazon.elasticsearch.replication.ReplicationException import com.amazon.elasticsearch.replication.ReplicationPlugin.Companion.REPLICATION_CHANGE_BATCH_SIZE 
import com.amazon.elasticsearch.replication.action.changes.GetChangesAction import com.amazon.elasticsearch.replication.action.changes.GetChangesRequest import com.amazon.elasticsearch.replication.action.changes.GetChangesResponse +import com.amazon.elasticsearch.replication.action.pause.PauseIndexReplicationAction +import com.amazon.elasticsearch.replication.action.pause.PauseIndexReplicationRequest +import com.amazon.elasticsearch.replication.metadata.REPLICATION_OVERALL_STATE_KEY +import com.amazon.elasticsearch.replication.metadata.REPLICATION_OVERALL_STATE_PAUSED import com.amazon.elasticsearch.replication.metadata.getReplicationStateParamsForIndex import com.amazon.elasticsearch.replication.seqno.RemoteClusterRetentionLeaseHelper import com.amazon.elasticsearch.replication.task.CrossClusterReplicationTask import com.amazon.elasticsearch.replication.task.ReplicationState import com.amazon.elasticsearch.replication.util.indicesService +import com.amazon.elasticsearch.replication.util.suspendExecute import com.amazon.elasticsearch.replication.util.suspendExecuteWithRetries import com.amazon.elasticsearch.replication.util.suspending import kotlinx.coroutines.ObsoleteCoroutinesApi @@ -59,6 +65,7 @@ class ShardReplicationTask(id: Long, type: String, action: String, description: private val followerShardId = params.followerShardId private val remoteClient = client.getRemoteClusterClient(remoteCluster) private val retentionLeaseHelper = RemoteClusterRetentionLeaseHelper(clusterService.clusterName.value(), remoteClient) + private var paused = false private val clusterStateListenerForTaskInterruption = ClusterStateListenerForTaskInterruption() @@ -80,11 +87,16 @@ class ShardReplicationTask(id: Long, type: String, action: String, description: } override suspend fun cleanup() { - retentionLeaseHelper.removeRetentionLease(remoteShardId, followerShardId) /* This is to minimise overhead of calling an additional listener as - * it continues to be called even after the task 
is completed. + * it continues to be called even after the task is completed. */ clusterService.removeListener(clusterStateListenerForTaskInterruption) + if (paused) { + log.debug("Pausing and not removing lease for index $followerIndexName and shard $followerShardId task") + return + } + retentionLeaseHelper.removeRetentionLease(remoteShardId, followerShardId) + } private fun addListenerToInterruptTask() { @@ -99,6 +111,10 @@ class ShardReplicationTask(id: Long, type: String, action: String, description: if (replicationStateParams == null) { if (PersistentTasksNodeService.Status(State.STARTED) == status) scope.cancel("Shard replication task received an interrupt.") + } else if (replicationStateParams[REPLICATION_OVERALL_STATE_KEY] == REPLICATION_OVERALL_STATE_PAUSED){ + log.info("Pause state received for index $followerIndexName. Cancelling $followerShardId task") + paused = true + scope.cancel("Shard replication task received pause.") } } } @@ -109,14 +125,16 @@ class ShardReplicationTask(id: Long, type: String, action: String, description: @ObsoleteCoroutinesApi private suspend fun replicate() { updateTaskState(FollowingState) - // TODO: Acquire retention lease prior to initiating remote recovery - retentionLeaseHelper.addRetentionLease(remoteShardId, RetentionLeaseActions.RETAIN_ALL, followerShardId) val followerIndexService = indicesService.indexServiceSafe(followerShardId.index) val indexShard = followerIndexService.getShard(followerShardId.id) // After restore, persisted localcheckpoint is matched with maxSeqNo. // Fetch the operations after localCheckpoint from the leader var seqNo = indexShard.localCheckpoint + 1 val node = primaryShardNode() + + // TODO: Acquire retention lease prior to initiating remote recovery + retentionLeaseHelper.addRetentionLease(remoteShardId, seqNo, followerShardId) + addListenerToInterruptTask() // Not really used yet as we only have one get changes action at a time. 
@@ -172,4 +190,11 @@ class ShardReplicationTask(id: Long, type: String, action: String, description: // Cancellation and valid executions are marked as completed return CrossClusterReplicationTaskResponse(ReplicationState.COMPLETED.name) } + + // ToDo : Use in case of non retriable errors + private suspend fun pauseReplicationTasks(reason: String) { + val pauseReplicationResponse = client.suspendExecute(PauseIndexReplicationAction.INSTANCE, PauseIndexReplicationRequest(followerIndexName, reason)) + if (!pauseReplicationResponse.isAcknowledged) + throw ReplicationException("Failed to pause replication") + } } diff --git a/src/test/kotlin/com/amazon/elasticsearch/replication/ReplicationHelpers.kt b/src/test/kotlin/com/amazon/elasticsearch/replication/ReplicationHelpers.kt index 773291c6..101e188b 100644 --- a/src/test/kotlin/com/amazon/elasticsearch/replication/ReplicationHelpers.kt +++ b/src/test/kotlin/com/amazon/elasticsearch/replication/ReplicationHelpers.kt @@ -37,6 +37,8 @@ data class StartReplicationRequest(val remoteClusterAlias: String, val remoteInd const val REST_REPLICATION_PREFIX = "/_opendistro/_replication/" const val REST_REPLICATION_START = "$REST_REPLICATION_PREFIX{index}/_start" const val REST_REPLICATION_STOP = "$REST_REPLICATION_PREFIX{index}/_stop" +const val REST_REPLICATION_PAUSE = "$REST_REPLICATION_PREFIX{index}/_pause" +const val REST_REPLICATION_RESUME = "$REST_REPLICATION_PREFIX{index}/_resume" const val REST_AUTO_FOLLOW_PATTERN = "_opendistro/_replication/_autofollow" fun RestHighLevelClient.startReplication(request: StartReplicationRequest, @@ -73,6 +75,24 @@ fun RestHighLevelClient.stopReplication(index: String) { waitForReplicationStop(index) } +fun RestHighLevelClient.pauseReplication(index: String) { + val lowLevelStopRequest = Request("POST", REST_REPLICATION_PAUSE.replace("{index}", index,true)) + lowLevelStopRequest.setJsonEntity("{}") + val lowLevelStopResponse = lowLevelClient.performRequest(lowLevelStopRequest) + val response 
= getAckResponse(lowLevelStopResponse) + assertThat(response.isAcknowledged).withFailMessage("Replication could not be stopped").isTrue() + waitForReplicationStop(index) +} + +fun RestHighLevelClient.resumeReplication(index: String) { + val lowLevelStopRequest = Request("POST", REST_REPLICATION_RESUME.replace("{index}", index,true)) + lowLevelStopRequest.setJsonEntity("{}") + val lowLevelStopResponse = lowLevelClient.performRequest(lowLevelStopRequest) + val response = getAckResponse(lowLevelStopResponse) + assertThat(response.isAcknowledged).withFailMessage("Replication could not be Resumed").isTrue() + waitForReplicationStart(index, TimeValue.timeValueSeconds(10)) +} + fun RestHighLevelClient.waitForReplicationStart(index: String, waitFor : TimeValue = TimeValue.timeValueSeconds(10)) { assertBusy( { diff --git a/src/test/kotlin/com/amazon/elasticsearch/replication/integ/rest/PauseReplicationIT.kt b/src/test/kotlin/com/amazon/elasticsearch/replication/integ/rest/PauseReplicationIT.kt new file mode 100644 index 00000000..f990eebc --- /dev/null +++ b/src/test/kotlin/com/amazon/elasticsearch/replication/integ/rest/PauseReplicationIT.kt @@ -0,0 +1,172 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ + +package com.amazon.elasticsearch.replication.integ.rest + +import com.amazon.elasticsearch.replication.* +import org.apache.http.util.EntityUtils +import org.assertj.core.api.Assertions.assertThat +import org.assertj.core.api.Assertions.assertThatThrownBy +import org.elasticsearch.ElasticsearchStatusException +import org.elasticsearch.action.DocWriteResponse +import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest +import org.elasticsearch.action.admin.indices.flush.FlushRequest +import org.elasticsearch.action.index.IndexRequest +import org.elasticsearch.client.Request +import org.elasticsearch.client.RequestOptions +import org.elasticsearch.client.ResponseException +import org.elasticsearch.client.RestHighLevelClient +import org.elasticsearch.client.indices.CreateIndexRequest +import org.elasticsearch.client.indices.GetIndexRequest +import org.elasticsearch.cluster.metadata.IndexMetadata +import org.elasticsearch.common.settings.Settings +import org.elasticsearch.common.unit.TimeValue +import org.elasticsearch.index.mapper.MapperService +import org.elasticsearch.test.ESTestCase.assertBusy +import java.util.concurrent.TimeUnit + + + +@MultiClusterAnnotations.ClusterConfigurations( + MultiClusterAnnotations.ClusterConfiguration(clusterName = LEADER), + MultiClusterAnnotations.ClusterConfiguration(clusterName = FOLLOWER) +) +class PauseReplicationIT: MultiClusterRestTestCase() { + private val leaderIndexName = "leader_index" + private val followerIndexName = "paused_index" + + fun `test pause replication in following state and empty index`() { + val followerClient = getClientForCluster(FOLLOWER) + val leaderClient = getClientForCluster(LEADER) + + createConnectionBetweenClusters(FOLLOWER, LEADER) + + val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) + assertThat(createIndexResponse.isAcknowledged).isTrue() + followerClient.startReplication(StartReplicationRequest("source", 
leaderIndexName, followerIndexName), waitForRestore = true) + + /* At this point, the follower cluster should be in FOLLOWING state. Next, we pause replication + and verify the same + */ + followerClient.pauseReplication(followerIndexName) + // Since, we were still in FOLLOWING phase when pause was called, the index + // in follower index should not have been deleted in follower cluster + assertBusy { + assertThat(followerClient.indices() + .exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT)) + .isEqualTo(true) + } + + followerClient.resumeReplication(followerIndexName) + } + + fun `test pause replication in restoring state with multiple shards`() { + val settings = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 20) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.key, Long.MAX_VALUE) + .build() + testPauseReplicationInRestoringState(settings, 5000, 1000, 1000) + } + + private fun testPauseReplicationInRestoringState(settings: Settings, + nFields: Int, + fieldLength: Int, + stepSize: Int) { + logger.info("""Testing pause replication in restoring state with params: + | shards:$settings[IndexMetadata.SETTING_NUMBER_OF_SHARDS] + | nFields:$nFields + | fieldLength:$fieldLength + | stepSize:$stepSize + | """.trimMargin()) + val followerClient = getClientForCluster(FOLLOWER) + val leaderClient = getClientForCluster(LEADER) + createConnectionBetweenClusters(FOLLOWER, LEADER) + + val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName).settings(settings), + RequestOptions.DEFAULT) + assertThat(createIndexResponse.isAcknowledged).isTrue() + // Put a large amount of data into the index + fillIndex(leaderClient, leaderIndexName, nFields, fieldLength, stepSize) + assertBusy { + assertThat(leaderClient.indices() + .exists(GetIndexRequest(leaderIndexName), RequestOptions.DEFAULT)) + } + 
followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), + TimeValue.timeValueSeconds(10), + false) + //Given the size of index, the replication should be in RESTORING phase at this point + assertThatThrownBy { + followerClient.pauseReplication(followerIndexName) + }.isInstanceOf(ResponseException::class.java) + .hasMessageContaining("Index is in restore phase currently for index: ${followerIndexName}") + } + + private fun fillIndex(clusterClient: RestHighLevelClient, + indexName : String, + nFields: Int, + fieldLength: Int, + stepSize: Int) { + for (i in nFields downTo 1 step stepSize) { + val sourceMap : MutableMap<String, String> = HashMap() + for (j in stepSize downTo 1) + sourceMap[(i-j).toString()] = randomAlphaOfLength(fieldLength) + logger.info("Updating index with map of size:${sourceMap.size}") + val indexResponse = clusterClient.index(IndexRequest(indexName).id(i.toString()).source(sourceMap), RequestOptions.DEFAULT) + assertThat(indexResponse.result).isIn(DocWriteResponse.Result.CREATED, DocWriteResponse.Result.UPDATED) + } + //flush the index + clusterClient.indices().flush(FlushRequest(indexName), RequestOptions.DEFAULT) + } + + fun `test pause without replication in progress`() { + val followerClient = getClientForCluster(FOLLOWER) + //ToDo : Using followerIndex interferes with other test. Is wipeIndicesFromCluster not working ? 
+ var randomIndex = "random" + val createIndexResponse = followerClient.indices().create(CreateIndexRequest(randomIndex), + RequestOptions.DEFAULT) + assertThat(createIndexResponse.isAcknowledged).isTrue() + assertThatThrownBy { + followerClient.pauseReplication(randomIndex) + }.isInstanceOf(ResponseException::class.java) + .hasMessageContaining("No replication in progress for index:$randomIndex") + } + + fun `test pause replication and stop replication`() { + val followerClient = getClientForCluster(FOLLOWER) + val leaderClient = getClientForCluster(LEADER) + + createConnectionBetweenClusters(FOLLOWER, LEADER) + + val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) + assertThat(createIndexResponse.isAcknowledged).isTrue() + followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), waitForRestore = true) + + /* At this point, the follower cluster should be in FOLLOWING state. Next, we pause replication + and verify the same + */ + followerClient.pauseReplication(followerIndexName) + // Since, we were still in FOLLOWING phase when pause was called, the index + // in follower index should not have been deleted in follower cluster + assertBusy { + assertThat(followerClient.indices() + .exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT)) + .isEqualTo(true) + } + + followerClient.stopReplication(followerIndexName) + } + +} diff --git a/src/test/kotlin/com/amazon/elasticsearch/replication/integ/rest/ResumeReplicationIT.kt b/src/test/kotlin/com/amazon/elasticsearch/replication/integ/rest/ResumeReplicationIT.kt new file mode 100644 index 00000000..e2206711 --- /dev/null +++ b/src/test/kotlin/com/amazon/elasticsearch/replication/integ/rest/ResumeReplicationIT.kt @@ -0,0 +1,158 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). 
+ * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.elasticsearch.replication.integ.rest + +import com.amazon.elasticsearch.replication.* +import org.apache.http.util.EntityUtils +import org.assertj.core.api.Assertions.assertThat +import org.assertj.core.api.Assertions.assertThatThrownBy +import org.elasticsearch.ElasticsearchStatusException +import org.elasticsearch.action.DocWriteResponse +import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest +import org.elasticsearch.action.admin.indices.flush.FlushRequest +import org.elasticsearch.action.admin.indices.open.OpenIndexRequest +import org.elasticsearch.action.index.IndexRequest +import org.elasticsearch.client.Request +import org.elasticsearch.client.RequestOptions +import org.elasticsearch.client.ResponseException +import org.elasticsearch.client.RestHighLevelClient +import org.elasticsearch.client.indices.CloseIndexRequest +import org.elasticsearch.client.indices.CreateIndexRequest +import org.elasticsearch.client.indices.GetIndexRequest +import org.elasticsearch.cluster.metadata.IndexMetadata +import org.elasticsearch.common.settings.Settings +import org.elasticsearch.common.unit.TimeValue +import org.elasticsearch.index.mapper.MapperService +import org.elasticsearch.test.ESTestCase.assertBusy +import java.util.concurrent.TimeUnit + + + +@MultiClusterAnnotations.ClusterConfigurations( + MultiClusterAnnotations.ClusterConfiguration(clusterName = LEADER), + MultiClusterAnnotations.ClusterConfiguration(clusterName = FOLLOWER) +) +class ResumeReplicationIT: MultiClusterRestTestCase() 
{ + private val leaderIndexName = "leader_index" + private val followerIndexName = "resumed_index" + + fun `test pause and resume replication in following state and empty index`() { + val followerClient = getClientForCluster(FOLLOWER) + val leaderClient = getClientForCluster(LEADER) + + createConnectionBetweenClusters(FOLLOWER, LEADER) + + val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) + assertThat(createIndexResponse.isAcknowledged).isTrue() + followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), waitForRestore = true) + + /* At this point, the follower cluster should be in FOLLOWING state. Next, we pause replication + and verify the same + */ + followerClient.pauseReplication(followerIndexName) + followerClient.resumeReplication(followerIndexName) + } + + + fun `test resume without pause `() { + val followerClient = getClientForCluster(FOLLOWER) + val leaderClient = getClientForCluster(LEADER) + + createConnectionBetweenClusters(FOLLOWER, LEADER) + + val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) + assertThat(createIndexResponse.isAcknowledged).isTrue() + followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), waitForRestore = true) + + assertThatThrownBy { + followerClient.resumeReplication(followerIndexName) + }.isInstanceOf(ResponseException::class.java) + .hasMessageContaining("Replication on Index ${followerIndexName} is already running") + } + + fun `test resume without retention lease`() { + val followerClient = getClientForCluster(FOLLOWER) + val leaderClient = getClientForCluster(LEADER) + + createConnectionBetweenClusters(FOLLOWER, LEADER) + + var createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) + assertThat(createIndexResponse.isAcknowledged).isTrue() + 
followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), waitForRestore = true) + + + followerClient.pauseReplication(followerIndexName) + + // If we delete the existing index and recreate the index with same name, retention leases should be lost + val deleteIndexResponse = leaderClient.indices().delete(DeleteIndexRequest(leaderIndexName), RequestOptions.DEFAULT) + assertThat(deleteIndexResponse.isAcknowledged).isTrue() + createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) + assertThat(createIndexResponse.isAcknowledged).isTrue() + + assertThatThrownBy { + followerClient.resumeReplication(followerIndexName) + }.isInstanceOf(ResponseException::class.java) + .hasMessageContaining("Retention lease doesn't exist. Replication can't be resumed for $followerIndexName") + + } + + fun `test pause and resume replication amid leader index close and open`() { + val followerClient = getClientForCluster(FOLLOWER) + val leaderClient = getClientForCluster(LEADER) + + createConnectionBetweenClusters(FOLLOWER, LEADER) + + val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) + assertThat(createIndexResponse.isAcknowledged).isTrue() + followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), waitForRestore = true) + + /* At this point, the follower cluster should be in FOLLOWING state. 
Next, we pause replication + and verify the same + */ + followerClient.pauseReplication(followerIndexName) + + leaderClient.indices().close(CloseIndexRequest(leaderIndexName), RequestOptions.DEFAULT); + leaderClient.indices().open(OpenIndexRequest(leaderIndexName), RequestOptions.DEFAULT); + + followerClient.resumeReplication(followerIndexName) + } + + fun `test pause and resume replication amid index close`() { + val followerClient = getClientForCluster(FOLLOWER) + val leaderClient = getClientForCluster(LEADER) + + createConnectionBetweenClusters(FOLLOWER, LEADER) + + val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) + assertThat(createIndexResponse.isAcknowledged).isTrue() + followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), waitForRestore = true) + + /* At this point, the follower cluster should be in FOLLOWING state. Next, we pause replication + and verify the same + */ + followerClient.pauseReplication(followerIndexName) + + leaderClient.indices().close(CloseIndexRequest(leaderIndexName), RequestOptions.DEFAULT); + + assertThatThrownBy { + followerClient.resumeReplication(followerIndexName) + }.isInstanceOf(ResponseException::class.java) + .hasMessageContaining("closed") + + } + + +} diff --git a/src/test/kotlin/com/amazon/elasticsearch/replication/integ/rest/StartReplicationIT.kt b/src/test/kotlin/com/amazon/elasticsearch/replication/integ/rest/StartReplicationIT.kt index 8f6a6d95..a16469cb 100644 --- a/src/test/kotlin/com/amazon/elasticsearch/replication/integ/rest/StartReplicationIT.kt +++ b/src/test/kotlin/com/amazon/elasticsearch/replication/integ/rest/StartReplicationIT.kt @@ -75,8 +75,8 @@ class StartReplicationIT: MultiClusterRestTestCase() { followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName)) assertThatThrownBy { followerClient.startReplication(StartReplicationRequest("source", 
leaderIndexName, followerIndexName)) - }.isInstanceOf(ResponseException::class.java).hasMessageContaining("{\"error\":{\"root_cause\":[{\"type\":\"resource_already_exists_exception\"," + - "\"reason\":\"task with id {replication:index:follower_index} already exist\"}]") + }.isInstanceOf(ResponseException::class.java).hasMessageContaining("Cant use same index again for replication." + + " Either close or delete the index:$followerIndexName") } fun `test start replication fails when remote cluster alias does not exist`() {