From 41e12448011f918c3254e677410902d56656ac62 Mon Sep 17 00:00:00 2001 From: Gaurav Bafna Date: Tue, 26 Jul 2022 12:45:01 +0530 Subject: [PATCH] Replica Count validation when awareness replica balance is enabled Signed-off-by: Gaurav Bafna --- .../indexpolicy/TransportIndexPolicyAction.kt | 41 +++++++-- .../action/IndexPolicyActionIT.kt | 89 +++++++++++++++++++ 2 files changed, 124 insertions(+), 6 deletions(-) create mode 100644 src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/IndexPolicyActionIT.kt diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/indexpolicy/TransportIndexPolicyAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/indexpolicy/TransportIndexPolicyAction.kt index d16e332c8..b515b7d8b 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/indexpolicy/TransportIndexPolicyAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/indexpolicy/TransportIndexPolicyAction.kt @@ -18,7 +18,9 @@ import org.opensearch.action.support.ActionFilters import org.opensearch.action.support.HandledTransportAction import org.opensearch.action.support.master.AcknowledgedResponse import org.opensearch.client.node.NodeClient +import org.opensearch.cluster.routing.allocation.AwarenessReplicaBalance import org.opensearch.cluster.service.ClusterService +import org.opensearch.common.ValidationException import org.opensearch.common.inject.Inject import org.opensearch.common.settings.Settings import org.opensearch.common.xcontent.NamedXContentRegistry @@ -30,6 +32,7 @@ import org.opensearch.index.seqno.SequenceNumbers import org.opensearch.indexmanagement.IndexManagementIndices import org.opensearch.indexmanagement.IndexManagementPlugin import org.opensearch.indexmanagement.indexstatemanagement.ManagedIndexCoordinator.Companion.MAX_HITS +import org.opensearch.indexmanagement.indexstatemanagement.action.ReplicaCountAction import org.opensearch.indexmanagement.indexstatemanagement.findConflictingPolicyTemplates import org.opensearch.indexmanagement.indexstatemanagement.findSelfConflictingTemplates import org.opensearch.indexmanagement.indexstatemanagement.model.ISMTemplate @@ -58,12 +61,14 @@ class TransportIndexPolicyAction @Inject constructor( val ismIndices: IndexManagementIndices, val clusterService: ClusterService, val settings: Settings, - val xContentRegistry: NamedXContentRegistry + val xContentRegistry: NamedXContentRegistry, + var awarenessReplicaBalance: AwarenessReplicaBalance, ) : HandledTransportAction( IndexPolicyAction.NAME, transportService, actionFilters, ::IndexPolicyRequest ) { - @Volatile private var filterByEnabled = IndexManagementSettings.FILTER_BY_BACKEND_ROLES.get(settings) + @Volatile + private var filterByEnabled = IndexManagementSettings.FILTER_BY_BACKEND_ROLES.get(settings) init { clusterService.clusterSettings.addSettingsUpdateConsumer(IndexManagementSettings.FILTER_BY_BACKEND_ROLES) { @@ -82,6 +87,7 @@ class TransportIndexPolicyAction @Inject constructor( private val user: User? = buildUser(client.threadPool().threadContext) ) { fun start() { + validate() log.debug( "User and roles string from thread context: ${client.threadPool().threadContext.getTransient( ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT @@ -103,6 +109,23 @@ class TransportIndexPolicyAction @Inject constructor( } } + @Suppress("ComplexMethod", "LongMethod", "NestedBlockDepth") + private fun validate() { + val states = request.policy.states + for (state in states) { + for (action in state.actions) { + if (action is ReplicaCountAction) { + val error = awarenessReplicaBalance.validate(action.numOfReplicas) + if (error.isPresent) { + val ex = ValidationException() + ex.addValidationError(error.get()) + actionListener.onFailure(ex) + } + } + } + } + } + private fun onCreateMappingsResponse(response: AcknowledgedResponse) { if (response.isAcknowledged) { log.info("Successfully created or updated ${IndexManagementPlugin.INDEX_MANAGEMENT_INDEX} with newest mappings.") @@ -134,9 +157,10 @@ class TransportIndexPolicyAction @Inject constructor( // check self overlapping val selfOverlap = ismTemplateList.findSelfConflictingTemplates() if (selfOverlap != null) { - val errorMessage = "New policy ${request.policyID} has an ISM template with index pattern ${selfOverlap.first} " + - "matching this policy's other ISM templates with index patterns ${selfOverlap.second}," + - " please use different priority" + val errorMessage = + "New policy ${request.policyID} has an ISM template with index pattern ${selfOverlap.first} " + + "matching this policy's other ISM templates with index patterns ${selfOverlap.second}," + + " please use different priority" actionListener.onFailure(IndexManagementException.wrap(IllegalArgumentException(errorMessage))) return } @@ -209,7 +233,12 @@ class TransportIndexPolicyAction @Inject constructor( override fun onResponse(response: IndexResponse) { val failureReasons = checkShardsFailure(response) if (failureReasons != null) { - actionListener.onFailure(OpenSearchStatusException(failureReasons.toString(), response.status())) + actionListener.onFailure( + OpenSearchStatusException( + failureReasons.toString(), + response.status() + ) + ) return } actionListener.onResponse( diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/IndexPolicyActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/IndexPolicyActionIT.kt new file mode 100644 index 000000000..b730100af --- /dev/null +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/IndexPolicyActionIT.kt @@ -0,0 +1,89 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.indexstatemanagement.action + +import org.apache.http.entity.ContentType +import org.apache.http.entity.StringEntity +import org.junit.Assert +import org.opensearch.client.ResponseException +import org.opensearch.cluster.routing.allocation.AwarenessReplicaBalance +import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider +import org.opensearch.indexmanagement.IndexManagementPlugin +import org.opensearch.indexmanagement.indexstatemanagement.IndexStateManagementRestTestCase +import org.opensearch.indexmanagement.indexstatemanagement.model.Policy +import org.opensearch.indexmanagement.indexstatemanagement.model.State +import org.opensearch.indexmanagement.indexstatemanagement.randomErrorNotification +import org.opensearch.indexmanagement.indexstatemanagement.toJsonString +import org.opensearch.indexmanagement.makeRequest +import java.time.Instant +import java.time.temporal.ChronoUnit +import java.util.Locale + +class IndexPolicyActionIT : IndexStateManagementRestTestCase() { + private val testIndexName = javaClass.simpleName.toLowerCase(Locale.ROOT) + + fun `test allocation aware replica count`() { + val policyID = "${testIndexName}_testPolicyName_replica" + var actionConfig = ReplicaCountAction(3, 0) + var states = listOf(State(name = "ReplicaCountState", actions = listOf(actionConfig), transitions = listOf())) + updateClusterSetting(AwarenessReplicaBalance.CLUSTER_ROUTING_ALLOCATION_AWARENESS_BALANCE_SETTING.key, "true") + updateClusterSetting(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.key, "zone") + + // creates a dummy policy , so that ISM index gets initialized + var policy = Policy( + id = policyID, + description = "$testIndexName description", + schemaVersion = 1L, + lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS), + errorNotification = randomErrorNotification(), + defaultState = states[0].name, + states = states + ) + client().makeRequest( + "PUT", + "${IndexManagementPlugin.POLICY_BASE_URI}/init-index", + emptyMap(), + StringEntity(policy.toJsonString(), ContentType.APPLICATION_JSON) + ) + + updateClusterSetting(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING.key + "zone.values", "a, b") + + // Valid replica count, shouldn't throw exception + client().makeRequest( + "PUT", + "${IndexManagementPlugin.POLICY_BASE_URI}/$policyID", + emptyMap(), + StringEntity(policy.toJsonString(), ContentType.APPLICATION_JSON) + ) + + actionConfig = ReplicaCountAction(4, 0) + states = listOf(State(name = "ReplicaCountState", actions = listOf(actionConfig), transitions = listOf())) + policy = Policy( + id = policyID, + description = "$testIndexName description", + schemaVersion = 1L, + lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS), + errorNotification = randomErrorNotification(), + defaultState = states[0].name, + states = states + ) + Assert.assertThrows( + ResponseException::class.java + ) { + client().makeRequest( + "PUT", + "${IndexManagementPlugin.POLICY_BASE_URI}/$policyID", + emptyMap(), + StringEntity(policy.toJsonString(), ContentType.APPLICATION_JSON) + ) + } + + // clean up cluster settings + updateClusterSetting(AwarenessReplicaBalance.CLUSTER_ROUTING_ALLOCATION_AWARENESS_BALANCE_SETTING.key, "true") + updateClusterSetting(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.key, "") + updateClusterSetting(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING.key + "zone", "") + } +}