From a561ad970757c93fb8b96fd1932b7549de4a3afb Mon Sep 17 00:00:00 2001 From: "opensearch-trigger-bot[bot]" <98922864+opensearch-trigger-bot[bot]@users.noreply.github.com> Date: Wed, 24 Aug 2022 11:52:54 +0530 Subject: [PATCH] Validate leader index primary shard state before starting replication (#407) (#419) (#421) Signed-off-by: Sai Kumar (cherry picked from commit 680212b83d03079b3f7ecf58ca877efc5206e371) Co-authored-by: Sai Kumar --- .../index/TransportReplicateIndexAction.kt | 11 ++-- .../replication/util/ValidationUtil.kt | 20 ++++++- .../integ/rest/StartReplicationIT.kt | 59 +++++++++++++++++++ 3 files changed, 84 insertions(+), 6 deletions(-) diff --git a/src/main/kotlin/org/opensearch/replication/action/index/TransportReplicateIndexAction.kt b/src/main/kotlin/org/opensearch/replication/action/index/TransportReplicateIndexAction.kt index 11a60aa1..c7836eec 100644 --- a/src/main/kotlin/org/opensearch/replication/action/index/TransportReplicateIndexAction.kt +++ b/src/main/kotlin/org/opensearch/replication/action/index/TransportReplicateIndexAction.kt @@ -32,6 +32,7 @@ import org.opensearch.action.support.ActionFilters import org.opensearch.action.support.HandledTransportAction import org.opensearch.action.support.IndicesOptions import org.opensearch.client.Client +import org.opensearch.cluster.ClusterState import org.opensearch.cluster.metadata.IndexMetadata import org.opensearch.common.inject.Inject import org.opensearch.common.settings.Settings @@ -82,8 +83,8 @@ class TransportReplicateIndexAction @Inject constructor(transportService: Transp // Any checks on the settings is followed by setup checks to ensure all relevant changes are // present across the plugins // validate index metadata on the leader cluster - val leaderIndexMetadata = getLeaderIndexMetadata(request.leaderAlias, request.leaderIndex) - ValidationUtil.validateLeaderIndexMetadata(leaderIndexMetadata) + val leaderClusterState = getLeaderClusterState(request.leaderAlias, request.leaderIndex) + ValidationUtil.validateLeaderIndexState(request.leaderAlias, request.leaderIndex, leaderClusterState) val leaderSettings = getLeaderIndexSettings(request.leaderAlias, request.leaderIndex) @@ -112,17 +113,17 @@ class TransportReplicateIndexAction @Inject constructor(transportService: Transp } } - private suspend fun getLeaderIndexMetadata(leaderAlias: String, leaderIndex: String): IndexMetadata { + private suspend fun getLeaderClusterState(leaderAlias: String, leaderIndex: String): ClusterState { val remoteClusterClient = client.getRemoteClusterClient(leaderAlias) val clusterStateRequest = remoteClusterClient.admin().cluster().prepareState() .clear() .setIndices(leaderIndex) + .setRoutingTable(true) .setMetadata(true) .setIndicesOptions(IndicesOptions.strictSingleIndexNoExpandForbidClosed()) .request() - val remoteState = remoteClusterClient.suspending(remoteClusterClient.admin().cluster()::state, + return remoteClusterClient.suspending(remoteClusterClient.admin().cluster()::state, injectSecurityContext = true, defaultContext = true)(clusterStateRequest).state - return remoteState.metadata.index(leaderIndex) ?: throw IndexNotFoundException("${leaderAlias}:${leaderIndex}") } private suspend fun getLeaderIndexSettings(leaderAlias: String, leaderIndex: String): Settings { diff --git a/src/main/kotlin/org/opensearch/replication/util/ValidationUtil.kt b/src/main/kotlin/org/opensearch/replication/util/ValidationUtil.kt index b176a182..fb7c49e2 100644 --- a/src/main/kotlin/org/opensearch/replication/util/ValidationUtil.kt +++ b/src/main/kotlin/org/opensearch/replication/util/ValidationUtil.kt @@ -14,12 +14,14 @@ package org.opensearch.replication.util import org.apache.logging.log4j.LogManager import org.opensearch.ResourceNotFoundException import org.opensearch.Version +import org.opensearch.cluster.ClusterState import org.opensearch.cluster.metadata.IndexMetadata import org.opensearch.cluster.metadata.MetadataCreateIndexService import org.opensearch.common.Strings import org.opensearch.common.ValidationException import org.opensearch.common.settings.Settings import org.opensearch.env.Environment +import org.opensearch.index.IndexNotFoundException import java.io.UnsupportedEncodingException import java.nio.file.Files import java.nio.file.Path @@ -82,7 +84,7 @@ object ValidationUtil { * - Upgrade path - Upgrade Follower cluster to higher version * and then upgrade leader */ - fun validateLeaderIndexMetadata(leaderIndexMetadata: IndexMetadata) { + private fun validateLeaderIndexMetadata(leaderIndexMetadata: IndexMetadata) { if(Version.CURRENT.before(leaderIndexMetadata.creationVersion)) { val err = "Leader index[${leaderIndexMetadata.index.name}] is on " + "higher version [${leaderIndexMetadata.creationVersion}] than follower [${Version.CURRENT}]" @@ -96,4 +98,20 @@ object ValidationUtil { throw IllegalArgumentException(err) } } + + /** + * validate leader index state - version and shard routing, based on leader cluster state + */ + fun validateLeaderIndexState(leaderAlias: String, leaderIndex: String, leaderClusterState: ClusterState) { + val leaderIndexMetadata = leaderClusterState.metadata.index(leaderIndex) ?: throw IndexNotFoundException("${leaderAlias}:${leaderIndex}") + // validate index metadata + validateLeaderIndexMetadata(leaderIndexMetadata) + + // validate index shard state - All primary shards should be active + if(!leaderClusterState.routingTable.index(leaderIndex).allPrimaryShardsActive()) { + val validationException = ValidationException() + validationException.addValidationError("Primary shards in the Index[${leaderAlias}:${leaderIndex}] are not active") + throw validationException + } + } } diff --git a/src/test/kotlin/org/opensearch/replication/integ/rest/StartReplicationIT.kt b/src/test/kotlin/org/opensearch/replication/integ/rest/StartReplicationIT.kt index 4d9907ed..ae8e2eec 100644 --- a/src/test/kotlin/org/opensearch/replication/integ/rest/StartReplicationIT.kt +++ b/src/test/kotlin/org/opensearch/replication/integ/rest/StartReplicationIT.kt @@ -65,6 +65,8 @@ import org.opensearch.index.mapper.MapperService import org.opensearch.repositories.fs.FsRepository import org.opensearch.test.OpenSearchTestCase.assertBusy import org.junit.Assert +import org.opensearch.common.xcontent.DeprecationHandler +import org.opensearch.common.xcontent.NamedXContentRegistry import org.opensearch.replication.ReplicationPlugin.Companion.REPLICATION_INDEX_TRANSLOG_PRUNING_ENABLED_SETTING import org.opensearch.replication.followerStats import org.opensearch.replication.leaderStats @@ -1153,6 +1155,63 @@ class StartReplicationIT: MultiClusterRestTestCase() { } } + fun `test that replication is not started when all primary shards are not in active state`() { + val followerClient = getClientForCluster(FOLLOWER) + val leaderClient = getClientForCluster(LEADER) + + createConnectionBetweenClusters(FOLLOWER, LEADER) + // Exclude leader cluster nodes to stop assignment for the new shards + excludeAllClusterNodes(LEADER) + try{ + leaderClient.indices().create( + CreateIndexRequest(leaderIndexName), + RequestOptions.DEFAULT + ) + } catch(_: Exception) { + // Index creation + } + // Index should be present (although shards will not assigned). + assertBusy { + assertThat(leaderClient.indices().exists(GetIndexRequest(leaderIndexName), RequestOptions.DEFAULT)).isEqualTo(true) + } + + // start repilcation should fail as the shards are not active on the leader cluster + assertThatThrownBy { followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), + waitForRestore = true) } + .isInstanceOf(ResponseException::class.java) + .hasMessageContaining("Primary shards in the Index[source:${leaderIndexName}] are not active") + + } + + private fun excludeAllClusterNodes(clusterName: String) { + val transientSettingsRequest = Request("PUT", "_cluster/settings") + // Get IPs directly from the cluster to handle all cases - single node cluster, multi node cluster and remote test cluster. + val excludeIps = getClusterNodeIPs(clusterName) + val entityAsString = """ + { + "transient": { + "cluster.routing.allocation.exclude._ip": "${excludeIps.joinToString()}" + } + }""".trimMargin() + transientSettingsRequest.entity = NStringEntity(entityAsString, ContentType.APPLICATION_JSON) + val transientSettingsResponse = getNamedCluster(clusterName).lowLevelClient.performRequest(transientSettingsRequest) + assertEquals(HttpStatus.SC_OK.toLong(), transientSettingsResponse.statusLine.statusCode.toLong()) + } + + private fun getClusterNodeIPs(clusterName: String): List { + val clusterClient = getNamedCluster(clusterName).lowLevelClient + val nodesRequest = Request("GET", "_cat/nodes?format=json") + val nodesResponse = EntityUtils.toString(clusterClient.performRequest(nodesRequest).entity) + val nodeIPs = arrayListOf() + val parser = XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY, + DeprecationHandler.THROW_UNSUPPORTED_OPERATION, nodesResponse) + parser.list().forEach { + it as Map<*, *> + nodeIPs.add(it["ip"] as String) + } + return nodeIPs + } + private fun assertValidationFailure(client: RestHighLevelClient, leader: String, follower: String, errrorMsg: String) { assertThatThrownBy { client.startReplication(StartReplicationRequest("source", leader, follower))