Skip to content

Commit

Permalink
Validate leader index primary shard state before starting replication (
Browse files Browse the repository at this point in the history
…#407) (#419)

Signed-off-by: Sai Kumar <[email protected]>
  • Loading branch information
saikaranam-amazon authored Jun 3, 2022
1 parent 02b0eb3 commit 680212b
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import org.opensearch.action.support.ActionFilters
import org.opensearch.action.support.HandledTransportAction
import org.opensearch.action.support.IndicesOptions
import org.opensearch.client.Client
import org.opensearch.cluster.ClusterState
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.common.inject.Inject
import org.opensearch.common.settings.Settings
Expand Down Expand Up @@ -82,8 +83,8 @@ class TransportReplicateIndexAction @Inject constructor(transportService: Transp
// Any checks on the settings are followed by setup checks to ensure all relevant changes are
// present across the plugins
// validate index metadata on the leader cluster
val leaderIndexMetadata = getLeaderIndexMetadata(request.leaderAlias, request.leaderIndex)
ValidationUtil.validateLeaderIndexMetadata(leaderIndexMetadata)
val leaderClusterState = getLeaderClusterState(request.leaderAlias, request.leaderIndex)
ValidationUtil.validateLeaderIndexState(request.leaderAlias, request.leaderIndex, leaderClusterState)

val leaderSettings = getLeaderIndexSettings(request.leaderAlias, request.leaderIndex)

Expand Down Expand Up @@ -112,17 +113,17 @@ class TransportReplicateIndexAction @Inject constructor(transportService: Transp
}
}

private suspend fun getLeaderIndexMetadata(leaderAlias: String, leaderIndex: String): IndexMetadata {
private suspend fun getLeaderClusterState(leaderAlias: String, leaderIndex: String): ClusterState {
val remoteClusterClient = client.getRemoteClusterClient(leaderAlias)
val clusterStateRequest = remoteClusterClient.admin().cluster().prepareState()
.clear()
.setIndices(leaderIndex)
.setRoutingTable(true)
.setMetadata(true)
.setIndicesOptions(IndicesOptions.strictSingleIndexNoExpandForbidClosed())
.request()
val remoteState = remoteClusterClient.suspending(remoteClusterClient.admin().cluster()::state,
return remoteClusterClient.suspending(remoteClusterClient.admin().cluster()::state,
injectSecurityContext = true, defaultContext = true)(clusterStateRequest).state
return remoteState.metadata.index(leaderIndex) ?: throw IndexNotFoundException("${leaderAlias}:${leaderIndex}")
}

private suspend fun getLeaderIndexSettings(leaderAlias: String, leaderIndex: String): Settings {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@ package org.opensearch.replication.util
import org.apache.logging.log4j.LogManager
import org.opensearch.ResourceNotFoundException
import org.opensearch.Version
import org.opensearch.cluster.ClusterState
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.cluster.metadata.MetadataCreateIndexService
import org.opensearch.common.Strings
import org.opensearch.common.ValidationException
import org.opensearch.common.settings.Settings
import org.opensearch.env.Environment
import org.opensearch.index.IndexNotFoundException
import java.io.UnsupportedEncodingException
import java.nio.file.Files
import java.nio.file.Path
Expand Down Expand Up @@ -85,7 +87,7 @@ object ValidationUtil {
* This ensures that clusters upgrade in the above order for the existing cross cluster
* connections
*/
fun validateLeaderIndexMetadata(leaderIndexMetadata: IndexMetadata) {
private fun validateLeaderIndexMetadata(leaderIndexMetadata: IndexMetadata) {
if(Version.CURRENT.before(leaderIndexMetadata.creationVersion)) {
val err = "Leader index[${leaderIndexMetadata.index.name}] is on " +
"higher version [${leaderIndexMetadata.creationVersion}] than follower [${Version.CURRENT}]"
Expand All @@ -99,4 +101,20 @@ object ValidationUtil {
throw IllegalArgumentException(err)
}
}

/**
 * Validates the leader index using the leader cluster state: the index must be present in the
 * state's metadata, must pass [validateLeaderIndexMetadata], and all of its primary shards must
 * be active according to the state's routing table.
 *
 * @param leaderAlias alias of the leader cluster; used only when building error messages
 * @param leaderIndex name of the index looked up in [leaderClusterState]
 * @param leaderClusterState leader cluster state that is expected to carry both the metadata and
 * the routing table for [leaderIndex]
 * @throws IndexNotFoundException if the metadata has no entry for [leaderIndex]
 * @throws ValidationException if the routing table has no entry for [leaderIndex] or at least one
 * primary shard is not active
 */
fun validateLeaderIndexState(leaderAlias: String, leaderIndex: String, leaderClusterState: ClusterState) {
    val leaderIndexMetadata = leaderClusterState.metadata.index(leaderIndex)
        ?: throw IndexNotFoundException("${leaderAlias}:${leaderIndex}")
    // validate index metadata
    validateLeaderIndexMetadata(leaderIndexMetadata)

    // validate index shard state - All primary shards should be active.
    // routingTable.index() is a Java call and yields null when the state carries no routing entry
    // for the index; treat that the same as "primaries not active" instead of failing with an NPE.
    val allPrimariesActive =
        leaderClusterState.routingTable.index(leaderIndex)?.allPrimaryShardsActive() == true
    if (!allPrimariesActive) {
        throw ValidationException().apply {
            addValidationError("Primary shards in the Index[${leaderAlias}:${leaderIndex}] are not active")
        }
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ import org.opensearch.index.mapper.MapperService
import org.opensearch.repositories.fs.FsRepository
import org.opensearch.test.OpenSearchTestCase.assertBusy
import org.junit.Assert
import org.opensearch.common.xcontent.DeprecationHandler
import org.opensearch.common.xcontent.NamedXContentRegistry
import org.opensearch.replication.ReplicationPlugin.Companion.REPLICATION_INDEX_TRANSLOG_PRUNING_ENABLED_SETTING
import org.opensearch.replication.followerStats
import org.opensearch.replication.leaderStats
Expand Down Expand Up @@ -1166,6 +1168,63 @@ class StartReplicationIT: MultiClusterRestTestCase() {
}
}

fun `test that replication is not started when all primary shards are not in active state`() {
    val follower = getClientForCluster(FOLLOWER)
    val leader = getClientForCluster(LEADER)

    createConnectionBetweenClusters(FOLLOWER, LEADER)

    // Exclude every leader node from allocation so that shards of a new index cannot be assigned.
    excludeAllClusterNodes(LEADER)
    try {
        leader.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT)
    } catch (_: Exception) {
        // Any failure of the create call is ignored on purpose; the existence check below is
        // what the test relies on.
    }

    // The index should be present even though its shards are not assigned.
    assertBusy {
        val leaderIndexExists = leader.indices().exists(GetIndexRequest(leaderIndexName), RequestOptions.DEFAULT)
        assertThat(leaderIndexExists).isEqualTo(true)
    }

    // Starting replication should fail because the primary shards are not active on the leader cluster.
    assertThatThrownBy {
        follower.startReplication(
            StartReplicationRequest("source", leaderIndexName, followerIndexName),
            waitForRestore = true
        )
    }
        .isInstanceOf(ResponseException::class.java)
        .hasMessageContaining("Primary shards in the Index[source:${leaderIndexName}] are not active")
}

/**
 * Sends a `PUT _cluster/settings` request to the named cluster that sets the transient setting
 * `cluster.routing.allocation.exclude._ip` to the IPs returned by [getClusterNodeIPs], and asserts
 * that the cluster answered with HTTP 200.
 *
 * @param clusterName name of the test cluster whose nodes are to be excluded
 */
private fun excludeAllClusterNodes(clusterName: String) {
val transientSettingsRequest = Request("PUT", "_cluster/settings")
// Get IPs directly from the cluster to handle all cases - single node cluster, multi node cluster and remote test cluster.
val excludeIps = getClusterNodeIPs(clusterName)
// joinToString() is called without arguments, so the IPs are joined with its default ", " separator.
val entityAsString = """
{
"transient": {
"cluster.routing.allocation.exclude._ip": "${excludeIps.joinToString()}"
}
}""".trimMargin()
transientSettingsRequest.entity = NStringEntity(entityAsString, ContentType.APPLICATION_JSON)
// The request goes through the low-level REST client of the named cluster.
val transientSettingsResponse = getNamedCluster(clusterName).lowLevelClient.performRequest(transientSettingsRequest)
// Fail right away if the settings update was not answered with HTTP 200.
assertEquals(HttpStatus.SC_OK.toLong(), transientSettingsResponse.statusLine.statusCode.toLong())
}

/**
 * Fetches `_cat/nodes?format=json` from the named cluster and returns the value of the `ip` field
 * of every entry in the response, in response order.
 *
 * @param clusterName name of the test cluster to query
 * @return the `ip` value of each node entry returned by the cluster
 */
private fun getClusterNodeIPs(clusterName: String): List<String> {
    val clusterClient = getNamedCluster(clusterName).lowLevelClient
    val nodesRequest = Request("GET", "_cat/nodes?format=json")
    val nodesResponse = EntityUtils.toString(clusterClient.performRequest(nodesRequest).entity)
    // The parser is Closeable; `use` releases it even if parsing or a cast below throws.
    return XContentType.JSON.xContent()
        .createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, nodesResponse)
        .use { parser ->
            // Each element of the top-level JSON array is one node entry.
            parser.list().map { node -> (node as Map<*, *>)["ip"] as String }
        }
}

private fun assertValidationFailure(client: RestHighLevelClient, leader: String, follower: String, errrorMsg: String) {
assertThatThrownBy {
client.startReplication(StartReplicationRequest("source", leader, follower))
Expand Down

0 comments on commit 680212b

Please sign in to comment.