Skip to content

Commit

Permalink
Replica Count Validation when awareness replica balance is enabled (o…
Browse files Browse the repository at this point in the history
…pensearch-project#429) (opensearch-project#463)

* bump version to 2.2 (opensearch-project#446)

Signed-off-by: Ashish Agrawal <[email protected]>

* Replica Count validation when awareness replica balance is enabled

Signed-off-by: Gaurav Bafna <[email protected]>

* Addressing PR comments

Signed-off-by: Gaurav Bafna <[email protected]>

Signed-off-by: Ashish Agrawal <[email protected]>
Signed-off-by: Gaurav Bafna <[email protected]>
Co-authored-by: Ashish Agrawal <[email protected]>
(cherry picked from commit f64c0c7)

Co-authored-by: Gaurav Bafna <[email protected]>
Signed-off-by: Angie Zhang <[email protected]>
  • Loading branch information
2 people authored and Angie Zhang committed Sep 12, 2022
1 parent 768d528 commit 8f3a348
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ import org.opensearch.action.support.ActionFilters
import org.opensearch.action.support.HandledTransportAction
import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.client.node.NodeClient
import org.opensearch.cluster.routing.allocation.AwarenessReplicaBalance
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.ValidationException
import org.opensearch.common.inject.Inject
import org.opensearch.common.settings.Settings
import org.opensearch.common.xcontent.NamedXContentRegistry
Expand All @@ -30,6 +32,7 @@ import org.opensearch.index.seqno.SequenceNumbers
import org.opensearch.indexmanagement.IndexManagementIndices
import org.opensearch.indexmanagement.IndexManagementPlugin
import org.opensearch.indexmanagement.indexstatemanagement.ManagedIndexCoordinator.Companion.MAX_HITS
import org.opensearch.indexmanagement.indexstatemanagement.action.ReplicaCountAction
import org.opensearch.indexmanagement.indexstatemanagement.findConflictingPolicyTemplates
import org.opensearch.indexmanagement.indexstatemanagement.findSelfConflictingTemplates
import org.opensearch.indexmanagement.indexstatemanagement.model.ISMTemplate
Expand Down Expand Up @@ -58,12 +61,14 @@ class TransportIndexPolicyAction @Inject constructor(
val ismIndices: IndexManagementIndices,
val clusterService: ClusterService,
val settings: Settings,
val xContentRegistry: NamedXContentRegistry
val xContentRegistry: NamedXContentRegistry,
var awarenessReplicaBalance: AwarenessReplicaBalance,
) : HandledTransportAction<IndexPolicyRequest, IndexPolicyResponse>(
IndexPolicyAction.NAME, transportService, actionFilters, ::IndexPolicyRequest
) {

@Volatile private var filterByEnabled = IndexManagementSettings.FILTER_BY_BACKEND_ROLES.get(settings)
@Volatile
private var filterByEnabled = IndexManagementSettings.FILTER_BY_BACKEND_ROLES.get(settings)

init {
clusterService.clusterSettings.addSettingsUpdateConsumer(IndexManagementSettings.FILTER_BY_BACKEND_ROLES) {
Expand All @@ -82,6 +87,7 @@ class TransportIndexPolicyAction @Inject constructor(
private val user: User? = buildUser(client.threadPool().threadContext)
) {
fun start() {
validate()
log.debug(
"User and roles string from thread context: ${client.threadPool().threadContext.getTransient<String>(
ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT
Expand All @@ -103,6 +109,22 @@ class TransportIndexPolicyAction @Inject constructor(
}
}

@Suppress("ComplexMethod", "LongMethod", "NestedBlockDepth")
private fun validate() {
request.policy.states.forEach { state ->
state.actions.forEach { action ->
if (action is ReplicaCountAction) {
val error = awarenessReplicaBalance.validate(action.numOfReplicas)
if (error.isPresent) {
val ex = ValidationException()
ex.addValidationError(error.get())
actionListener.onFailure(ex)
}
}
}
}
}

private fun onCreateMappingsResponse(response: AcknowledgedResponse) {
if (response.isAcknowledged) {
log.info("Successfully created or updated ${IndexManagementPlugin.INDEX_MANAGEMENT_INDEX} with newest mappings.")
Expand Down Expand Up @@ -134,9 +156,10 @@ class TransportIndexPolicyAction @Inject constructor(
// check self overlapping
val selfOverlap = ismTemplateList.findSelfConflictingTemplates()
if (selfOverlap != null) {
val errorMessage = "New policy ${request.policyID} has an ISM template with index pattern ${selfOverlap.first} " +
"matching this policy's other ISM templates with index patterns ${selfOverlap.second}," +
" please use different priority"
val errorMessage =
"New policy ${request.policyID} has an ISM template with index pattern ${selfOverlap.first} " +
"matching this policy's other ISM templates with index patterns ${selfOverlap.second}," +
" please use different priority"
actionListener.onFailure(IndexManagementException.wrap(IllegalArgumentException(errorMessage)))
return
}
Expand Down Expand Up @@ -209,7 +232,12 @@ class TransportIndexPolicyAction @Inject constructor(
override fun onResponse(response: IndexResponse) {
val failureReasons = checkShardsFailure(response)
if (failureReasons != null) {
actionListener.onFailure(OpenSearchStatusException(failureReasons.toString(), response.status()))
actionListener.onFailure(
OpenSearchStatusException(
failureReasons.toString(),
response.status()
)
)
return
}
actionListener.onResponse(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.indexstatemanagement.action

import org.apache.http.entity.ContentType
import org.apache.http.entity.StringEntity
import org.junit.Assert
import org.opensearch.client.ResponseException
import org.opensearch.cluster.routing.allocation.AwarenessReplicaBalance
import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider
import org.opensearch.indexmanagement.IndexManagementPlugin
import org.opensearch.indexmanagement.indexstatemanagement.IndexStateManagementRestTestCase
import org.opensearch.indexmanagement.indexstatemanagement.model.Policy
import org.opensearch.indexmanagement.indexstatemanagement.model.State
import org.opensearch.indexmanagement.indexstatemanagement.randomErrorNotification
import org.opensearch.indexmanagement.indexstatemanagement.toJsonString
import org.opensearch.indexmanagement.makeRequest
import java.time.Instant
import java.time.temporal.ChronoUnit
import java.util.Locale

class IndexPolicyActionIT : IndexStateManagementRestTestCase() {
private val testIndexName = javaClass.simpleName.toLowerCase(Locale.ROOT)

fun `test allocation aware replica count`() {
val policyID = "${testIndexName}_testPolicyName_replica"
var actionConfig = ReplicaCountAction(3, 0)
var states = listOf(State(name = "ReplicaCountState", actions = listOf(actionConfig), transitions = listOf()))
updateClusterSetting(AwarenessReplicaBalance.CLUSTER_ROUTING_ALLOCATION_AWARENESS_BALANCE_SETTING.key, "true")
updateClusterSetting(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.key, "zone")

// creates a dummy policy , so that ISM index gets initialized
var policy = Policy(
id = policyID,
description = "$testIndexName description",
schemaVersion = 1L,
lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS),
errorNotification = randomErrorNotification(),
defaultState = states[0].name,
states = states
)
client().makeRequest(
"PUT",
"${IndexManagementPlugin.POLICY_BASE_URI}/init-index",
emptyMap(),
StringEntity(policy.toJsonString(), ContentType.APPLICATION_JSON)
)

updateClusterSetting(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING.key + "zone.values", "a, b")

// Valid replica count, shouldn't throw exception
client().makeRequest(
"PUT",
"${IndexManagementPlugin.POLICY_BASE_URI}/$policyID",
emptyMap(),
StringEntity(policy.toJsonString(), ContentType.APPLICATION_JSON)
)

actionConfig = ReplicaCountAction(4, 0)
states = listOf(State(name = "ReplicaCountState", actions = listOf(actionConfig), transitions = listOf()))
policy = Policy(
id = policyID,
description = "$testIndexName description",
schemaVersion = 1L,
lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS),
errorNotification = randomErrorNotification(),
defaultState = states[0].name,
states = states
)
Assert.assertThrows(
ResponseException::class.java
) {
client().makeRequest(
"PUT",
"${IndexManagementPlugin.POLICY_BASE_URI}/$policyID",
emptyMap(),
StringEntity(policy.toJsonString(), ContentType.APPLICATION_JSON)
)
}

// clean up cluster settings
updateClusterSetting(AwarenessReplicaBalance.CLUSTER_ROUTING_ALLOCATION_AWARENESS_BALANCE_SETTING.key, "true")
updateClusterSetting(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.key, "")
updateClusterSetting(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING.key + "zone", "")
}
}

0 comments on commit 8f3a348

Please sign in to comment.