Skip to content

Commit

Permalink
Error prevention stage 1 (#579) (#592)
Browse files Browse the repository at this point in the history
* Removed recursion from Explain Action to avoid stackoverflow in some situations (#419)
* enabled by default integrated
* Fix Test cases
* Fix comments; set validation disabled by default
* Rename validation_service to action_validation; Fix some detekt issues

Signed-off-by: Joanne Wang <[email protected]>
Signed-off-by: Petar Dzepina <[email protected]>
Signed-off-by: Angie Zhang <[email protected]>

Co-authored-by: Joanne Wang <[email protected]>
Co-authored-by: Petar <[email protected]>
Co-authored-by: Angie Zhang <[email protected]>
  • Loading branch information
4 people authored Nov 3, 2022
1 parent 8f78467 commit 57544c3
Show file tree
Hide file tree
Showing 60 changed files with 1,904 additions and 50 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.spi.indexstatemanagement

import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.common.io.stream.Writeable
import org.opensearch.common.settings.Settings
import org.opensearch.monitor.jvm.JvmService
import java.util.Locale

abstract class Validate(
val settings: Settings,
val clusterService: ClusterService,
val jvmService: JvmService
) {

var validationStatus = ValidationStatus.PASSED
var validationMessage: String? = "Starting Validation"

abstract fun execute(indexName: String): Validate

enum class ValidationStatus(val status: String) : Writeable {
PASSED("passed"),
RE_VALIDATING("re_validating"),
FAILED("failed");

override fun toString(): String {
return status
}

override fun writeTo(out: StreamOutput) {
out.writeString(status)
}

companion object {
fun read(streamInput: StreamInput): ValidationStatus {
return valueOf(streamInput.readString().uppercase(Locale.ROOT))
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,6 @@ data class ManagedIndexMetaData(
var action: ActionMetaData? = null
var step: StepMetaData? = null
var retryInfo: PolicyRetryInfoMetaData? = null

var info: Map<String, Any>? = null

XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.spi.indexstatemanagement.model

import org.opensearch.common.Strings
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.common.io.stream.Writeable
import org.opensearch.common.xcontent.LoggingDeprecationHandler
import org.opensearch.common.xcontent.NamedXContentRegistry
import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.ToXContentFragment
import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.common.xcontent.XContentParser
import org.opensearch.common.xcontent.XContentParserUtils
import org.opensearch.common.xcontent.XContentType
import org.opensearch.indexmanagement.spi.indexstatemanagement.Validate
import java.io.ByteArrayInputStream
import java.nio.charset.StandardCharsets
import java.util.Locale

data class ValidationResult(
val validationMessage: String,
val validationStatus: Validate.ValidationStatus
) : Writeable, ToXContentFragment {

override fun writeTo(out: StreamOutput) {
out.writeString(validationMessage)
validationStatus.writeTo(out)
}

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
builder
.field(VALIDATION_MESSAGE, validationMessage)
.field(VALIDATION_STATUS, validationStatus.toString())
return builder
}

fun getMapValueString(): String {
return Strings.toString(this, false, false)
}

companion object {
const val VALIDATE = "validate"
const val VALIDATION_MESSAGE = "validation_message"
const val VALIDATION_STATUS = "validation_status"

fun fromStreamInput(si: StreamInput): ValidationResult {
val validationMessage: String? = si.readString()
val validationStatus: Validate.ValidationStatus? = Validate.ValidationStatus.read(si)

return ValidationResult(
requireNotNull(validationMessage) { "$VALIDATION_MESSAGE is null" },
requireNotNull(validationStatus) { "$VALIDATION_STATUS is null" }
)
}

fun fromManagedIndexMetaDataMap(map: Map<String, String?>): ValidationResult? {
val stepJsonString = map[VALIDATE]
return if (stepJsonString != null) {
val inputStream = ByteArrayInputStream(stepJsonString.toByteArray(StandardCharsets.UTF_8))
val parser = XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, inputStream)
parser.nextToken()
parse(parser)
} else {
null
}
}

fun parse(xcp: XContentParser): ValidationResult {
var validationMessage: String? = null
var validationStatus: Validate.ValidationStatus? = null

XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
val fieldName = xcp.currentName()
xcp.nextToken()

when (fieldName) {
VALIDATION_MESSAGE -> validationMessage = xcp.text()
VALIDATION_STATUS -> validationStatus = Validate.ValidationStatus.valueOf(xcp.text().uppercase(Locale.ROOT))
}
}

return ValidationResult(
requireNotNull(validationMessage) { "$VALIDATION_MESSAGE is null" },
requireNotNull(validationStatus) { "$VALIDATION_STATUS is null" }
)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.transport.action.retr
import org.opensearch.indexmanagement.indexstatemanagement.transport.action.updateindexmetadata.TransportUpdateManagedIndexMetaDataAction
import org.opensearch.indexmanagement.indexstatemanagement.transport.action.updateindexmetadata.UpdateManagedIndexMetaDataAction
import org.opensearch.indexmanagement.indexstatemanagement.util.DEFAULT_INDEX_TYPE
import org.opensearch.indexmanagement.indexstatemanagement.validation.ActionValidation
import org.opensearch.indexmanagement.indexstatemanagement.migration.ISMTemplateService
import org.opensearch.indexmanagement.refreshanalyzer.RefreshSearchAnalyzerAction
import org.opensearch.indexmanagement.refreshanalyzer.RestRefreshSearchAnalyzerAction
Expand Down Expand Up @@ -187,6 +188,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin

private val logger = LogManager.getLogger(javaClass)
lateinit var indexManagementIndices: IndexManagementIndices
lateinit var actionValidation: ActionValidation
lateinit var clusterService: ClusterService
lateinit var indexNameExpressionResolver: IndexNameExpressionResolver
lateinit var rollupInterceptor: RollupInterceptor
Expand Down Expand Up @@ -387,6 +389,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
.registerConsumers()
.registerClusterConfigurationProvider(skipFlag)
indexManagementIndices = IndexManagementIndices(settings, client.admin().indices(), clusterService)
actionValidation = ActionValidation(settings, clusterService, jvmService)
val indexStateManagementHistory =
IndexStateManagementHistory(
settings,
Expand All @@ -408,6 +411,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
val managedIndexRunner = ManagedIndexRunner
.registerClient(client)
.registerClusterService(clusterService)
.registerValidationService(actionValidation)
.registerNamedXContentRegistry(xContentRegistry)
.registerScriptService(scriptService)
.registerSettings(settings)
Expand Down Expand Up @@ -436,6 +440,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
rollupRunner,
transformRunner,
indexManagementIndices,
actionValidation,
managedIndexCoordinator,
indexStateManagementHistory,
indexMetadataProvider,
Expand All @@ -458,6 +463,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
ManagedIndexSettings.ROLLOVER_ALIAS,
ManagedIndexSettings.ROLLOVER_SKIP,
ManagedIndexSettings.INDEX_STATE_MANAGEMENT_ENABLED,
ManagedIndexSettings.ACTION_VALIDATION_ENABLED,
ManagedIndexSettings.METADATA_SERVICE_ENABLED,
ManagedIndexSettings.AUTO_MANAGE,
ManagedIndexSettings.METADATA_SERVICE_STATUS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,10 @@ import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndex
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.ALLOW_LIST_NONE
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.DEFAULT_ISM_ENABLED
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.DEFAULT_JOB_INTERVAL
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.DEFAULT_ACTION_VALIDATION_ENABLED
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.INDEX_STATE_MANAGEMENT_ENABLED
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.JOB_INTERVAL
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.ACTION_VALIDATION_ENABLED
import org.opensearch.indexmanagement.indexstatemanagement.util.DEFAULT_INDEX_TYPE
import org.opensearch.indexmanagement.indexstatemanagement.util.MetadataCheck
import org.opensearch.indexmanagement.indexstatemanagement.util.checkMetadata
Expand All @@ -74,6 +76,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.util.sendNotification
import org.opensearch.indexmanagement.indexstatemanagement.util.shouldBackoff
import org.opensearch.indexmanagement.indexstatemanagement.util.shouldChangePolicy
import org.opensearch.indexmanagement.indexstatemanagement.util.updateDisableManagedIndexRequest
import org.opensearch.indexmanagement.indexstatemanagement.validation.ActionValidation
import org.opensearch.indexmanagement.opensearchapi.IndexManagementSecurityContext
import org.opensearch.indexmanagement.opensearchapi.convertToMap
import org.opensearch.indexmanagement.opensearchapi.parseWithType
Expand All @@ -83,6 +86,7 @@ import org.opensearch.indexmanagement.opensearchapi.suspendUntil
import org.opensearch.indexmanagement.opensearchapi.withClosableContext
import org.opensearch.indexmanagement.spi.indexstatemanagement.Action
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
import org.opensearch.indexmanagement.spi.indexstatemanagement.Validate
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.PolicyRetryInfoMetaData
Expand Down Expand Up @@ -114,12 +118,14 @@ object ManagedIndexRunner :
private lateinit var scriptService: ScriptService
private lateinit var settings: Settings
private lateinit var imIndices: IndexManagementIndices
lateinit var actionValidation: ActionValidation
private lateinit var ismHistory: IndexStateManagementHistory
private lateinit var skipExecFlag: SkipExecution
private lateinit var threadPool: ThreadPool
private lateinit var extensionStatusChecker: ExtensionStatusChecker
private lateinit var indexMetadataProvider: IndexMetadataProvider
private var indexStateManagementEnabled: Boolean = DEFAULT_ISM_ENABLED
private var validationServiceEnabled: Boolean = DEFAULT_ACTION_VALIDATION_ENABLED
@Suppress("MagicNumber")
private val savePolicyRetryPolicy = BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(250), 3)
@Suppress("MagicNumber")
Expand Down Expand Up @@ -166,6 +172,11 @@ object ManagedIndexRunner :
indexStateManagementEnabled = it
}

validationServiceEnabled = ACTION_VALIDATION_ENABLED.get(settings)
clusterService.clusterSettings.addSettingsUpdateConsumer(ACTION_VALIDATION_ENABLED) {
validationServiceEnabled = it
}

allowList = ALLOW_LIST.get(settings)
clusterService.clusterSettings.addSettingsUpdateConsumer(ALLOW_LIST) {
allowList = it
Expand All @@ -179,6 +190,11 @@ object ManagedIndexRunner :
return this
}

fun registerValidationService(actionValidation: ActionValidation): ManagedIndexRunner {
this.actionValidation = actionValidation
return this
}

fun registerHistoryIndex(ismHistory: IndexStateManagementHistory): ManagedIndexRunner {
this.ismHistory = ismHistory
return this
Expand Down Expand Up @@ -385,8 +401,23 @@ object ManagedIndexRunner :
val startingManagedIndexMetaData = managedIndexMetaData.getStartingManagedIndexMetaData(state, action, step)
val updateResult = updateManagedIndexMetaData(startingManagedIndexMetaData)

@Suppress("ComplexCondition")
@Suppress("ComplexCondition", "MaxLineLength")
if (updateResult.metadataSaved && state != null && action != null && step != null && currentActionMetaData != null) {
if (validationServiceEnabled) {
val validationResult = actionValidation.validate(action.type, stepContext.metadata.index)
if (validationResult.validationStatus == Validate.ValidationStatus.RE_VALIDATING) {
logger.warn("Validation Status is: RE_VALIDATING. The action is {}, state is {}, step is {}.\", action.type, state.name, step.name")
publishErrorNotification(policy, managedIndexMetaData)
return
}
if (validationResult.validationStatus == Validate.ValidationStatus.FAILED) {
logger.warn("Validation Status is: FAILED. The action is {}, state is {}, step is {}.", action.type, state.name, step.name)
publishErrorNotification(policy, managedIndexMetaData)
disableManagedIndexConfig(managedIndexConfig)
return
}
}

// Step null check is done in getStartingManagedIndexMetaData
withClosableContext(
IndexManagementSecurityContext(
Expand Down Expand Up @@ -671,7 +702,6 @@ object ManagedIndexRunner :
} catch (e: Exception) {
logger.error("Failed to save ManagedIndexMetaData for [index=${managedIndexMetaData.index}]", e)
}

return result
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.ISM_BASE_U
import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.LEGACY_ISM_BASE_URI
import org.opensearch.indexmanagement.indexstatemanagement.transport.action.explain.ExplainAction
import org.opensearch.indexmanagement.indexstatemanagement.transport.action.explain.ExplainRequest
import org.opensearch.indexmanagement.indexstatemanagement.util.DEFAULT_EXPLAIN_VALIDATE_ACTION
import org.opensearch.indexmanagement.indexstatemanagement.util.DEFAULT_EXPLAIN_SHOW_POLICY
import org.opensearch.indexmanagement.indexstatemanagement.util.SHOW_VALIDATE_ACTION
import org.opensearch.indexmanagement.indexstatemanagement.util.DEFAULT_INDEX_TYPE
import org.opensearch.indexmanagement.indexstatemanagement.util.DEFAULT_JOB_SORT_FIELD
import org.opensearch.indexmanagement.indexstatemanagement.util.SHOW_POLICY_QUERY_PARAM
Expand Down Expand Up @@ -77,6 +79,7 @@ class RestExplainAction : BaseRestHandler() {
clusterManagerTimeout,
searchParams,
request.paramAsBoolean(SHOW_POLICY_QUERY_PARAM, DEFAULT_EXPLAIN_SHOW_POLICY),
request.paramAsBoolean(SHOW_VALIDATE_ACTION, DEFAULT_EXPLAIN_VALIDATE_ACTION),
indexType
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import java.util.function.Function
class ManagedIndexSettings {
companion object {
const val DEFAULT_ISM_ENABLED = true
const val DEFAULT_ACTION_VALIDATION_ENABLED = false
const val DEFAULT_TEMPLATE_MIGRATION_TIMESTAMP = 0L
const val DEFAULT_JOB_INTERVAL = 5
const val DEFAULT_JITTER = 0.6
Expand All @@ -28,6 +29,13 @@ class ManagedIndexSettings {
Setting.Property.Dynamic
)

val ACTION_VALIDATION_ENABLED: Setting<Boolean> = Setting.boolSetting(
"plugins.index_state_management.action_validation.enabled",
DEFAULT_ACTION_VALIDATION_ENABLED,
Setting.Property.NodeScope,
Setting.Property.Dynamic
)

// 0: migration is going on
// 1: migration succeed
// -1: migration failed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,25 @@ class ExplainRequest : ActionRequest {
val clusterManagerTimeout: TimeValue
val searchParams: SearchParams
val showPolicy: Boolean
val validateAction: Boolean
val indexType: String

@Suppress("LongParameterList")
constructor(
indices: List<String>,
local: Boolean,
clusterManagerTimeout: TimeValue,
searchParams: SearchParams,
showPolicy: Boolean,
validateAction: Boolean,
indexType: String
) : super() {
this.indices = indices
this.local = local
this.clusterManagerTimeout = clusterManagerTimeout
this.searchParams = searchParams
this.showPolicy = showPolicy
this.validateAction = validateAction
this.indexType = indexType
}

Expand All @@ -47,6 +51,7 @@ class ExplainRequest : ActionRequest {
clusterManagerTimeout = sin.readTimeValue(),
searchParams = SearchParams(sin),
showPolicy = sin.readBoolean(),
validateAction = sin.readBoolean(),
indexType = sin.readString()
)

Expand All @@ -68,6 +73,7 @@ class ExplainRequest : ActionRequest {
out.writeTimeValue(clusterManagerTimeout)
searchParams.writeTo(out)
out.writeBoolean(showPolicy)
out.writeBoolean(validateAction)
out.writeString(indexType)
}

Expand Down
Loading

0 comments on commit 57544c3

Please sign in to comment.