diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/DefaultIndexMetadataService.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/DefaultIndexMetadataService.kt index fb6fa43c8..fdaa3f1be 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/DefaultIndexMetadataService.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/DefaultIndexMetadataService.kt @@ -16,7 +16,7 @@ import org.opensearch.indexmanagement.opensearchapi.suspendUntil import org.opensearch.indexmanagement.spi.indexstatemanagement.IndexMetadataService import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ISMIndexMetadata -class DefaultIndexMetadataService(val customUUIDSetting: String? = null) : IndexMetadataService { +class DefaultIndexMetadataService(private val customUUIDSetting: String? = null) : IndexMetadataService { /** * Returns the default index metadata needed for ISM @@ -39,7 +39,7 @@ class DefaultIndexMetadataService(val customUUIDSetting: String? = null) : Index response.state.metadata.indices.forEach { // TODO waiting to add document count until it is definitely needed - val uuid = getCustomIndexUUID(it.value) + val uuid = getIndexUUID(it.value) val indexMetadata = ISMIndexMetadata(uuid, it.value.creationDate, -1) indexNameToMetadata[it.key] = indexMetadata } @@ -48,11 +48,10 @@ class DefaultIndexMetadataService(val customUUIDSetting: String? = null) : Index } /* - * If an extension wants Index Management to determine cluster state indices UUID based on a custom index setting if - * present of cluster state, the extension will override this customUUID setting. This allows an index to migrate off - * cluster and back while using this persistent uuid. + * This method prioritize the custom index setting provided by extension to decide the index UUID + * Custom index UUID is needed when index moved out of cluster and re-attach back, it will get a new UUID in cluster metadata */ - fun getCustomIndexUUID(indexMetadata: IndexMetadata): String { + fun getIndexUUID(indexMetadata: IndexMetadata): String { return if (customUUIDSetting != null) { indexMetadata.settings.get(customUUIDSetting, indexMetadata.indexUUID) } else { diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexCoordinator.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexCoordinator.kt index 961a56057..9f6a070c5 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexCoordinator.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexCoordinator.kt @@ -299,7 +299,7 @@ class ManagedIndexCoordinator( // If there is a custom index uuid associated with the index, we do not auto manage it // This is because cold index uses custom uuid, and we do not auto manage cold-to-warm index val indexMetadata = clusterState.metadata.index(indexName) - val wasOffCluster = defaultIndexMetadataService.getCustomIndexUUID(indexMetadata) != indexMetadata.indexUUID + val wasOffCluster = defaultIndexMetadataService.getIndexUUID(indexMetadata) != indexMetadata.indexUUID val ismIndexMetadata = ismIndicesMetadata[indexName] // We try to find lookup name instead of using index name as datastream indices need the alias to match policy val lookupName = findIndexLookupName(indexName, clusterState) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt index 3ad6d1be5..d30e5bdfc 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt @@ -95,7 +95,7 @@ import org.opensearch.jobscheduler.spi.ScheduledJobParameter import org.opensearch.jobscheduler.spi.ScheduledJobRunner import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule import org.opensearch.core.rest.RestStatus -import org.opensearch.indexmanagement.indexstatemanagement.util.hasVersionConflict +import org.opensearch.indexmanagement.indexstatemanagement.util.hasDifferentPolicyVersion import org.opensearch.script.Script import org.opensearch.script.ScriptService import org.opensearch.script.TemplateScript @@ -262,12 +262,11 @@ object ManagedIndexRunner : // Check the cluster state for the index metadata val clusterStateIndexMetadata = getIndexMetadata(managedIndexConfig.index) val defaultIndexMetadataService = indexMetadataProvider.services[DEFAULT_INDEX_TYPE] as DefaultIndexMetadataService - val clusterStateIndexUUID = clusterStateIndexMetadata?.let { defaultIndexMetadataService.getCustomIndexUUID(it) } - // If the index metadata is null, the index is not in the cluster state. If the index metadata is not null, but - // the cluster state index uuid differs from the one in the managed index config then the config is referring - // to a different index which does not exist in the cluster. We need to check all the extensions to confirm an index exists + val clusterStateIndexUUID = clusterStateIndexMetadata?.let { defaultIndexMetadataService.getIndexUUID(it) } + // If the index metadata is null, the index is not in the cluster state. + // If the index metadata is not null, and the index uuid differs from the one in the managed index config + // These mean this managed index could be a different index type and should use extensions to check if (clusterStateIndexMetadata == null || clusterStateIndexUUID != managedIndexConfig.indexUuid) { - // If the cluster state/default index type didn't have an index with a matching name and uuid combination, try all other index types val nonDefaultIndexTypes = indexMetadataProvider.services.keys.filter { it != DEFAULT_INDEX_TYPE } val multiTypeIndexNameToMetaData = indexMetadataProvider.getMultiTypeISMIndexMetadata(nonDefaultIndexTypes, listOf(managedIndexConfig.index)) @@ -281,20 +280,18 @@ object ManagedIndexRunner : } } - // If policy or managedIndexMetaData is null then initialize - val policy = managedIndexConfig.policy - if (policy == null || managedIndexMetaData == null) { - initManagedIndex(managedIndexConfig, managedIndexMetaData) + if (managedIndexMetaData == null) { + initManagedIndex(managedIndexConfig) return } - // If the policy was completed or failed then return early and disable job so it stops scheduling work + // If the policy was completed or failed then return early and disable job, so it stops scheduling work if (managedIndexMetaData.policyCompleted == true || managedIndexMetaData.isFailed) { disableManagedIndexConfig(managedIndexConfig) return } - if (managedIndexMetaData.hasVersionConflict(managedIndexConfig)) { + if (managedIndexMetaData.hasDifferentPolicyVersion(managedIndexConfig)) { val info = mapOf("message" to "There is a version conflict between your previous execution and your managed index") val result = updateManagedIndexMetaData( managedIndexMetaData.copy( @@ -308,6 +305,7 @@ object ManagedIndexRunner : return } + val policy = managedIndexConfig.policy val state = policy.getStateToExecute(managedIndexMetaData) val action: Action? = state?.getActionToExecute(managedIndexMetaData, indexMetadataProvider) val stepContext = StepContext( @@ -318,6 +316,7 @@ object ManagedIndexRunner : // If Index State Management is disabled and the current step is not null and safe to disable on // then disable the job and return early + // TODO are there any step not safe to disable on? if (!indexStateManagementEnabled && step != null && step.isSafeToDisableOn) { disableManagedIndexConfig(managedIndexConfig) return @@ -334,7 +333,11 @@ object ManagedIndexRunner : return } + logger.info("change policy $managedIndexConfig") + logger.info("change policy $managedIndexMetaData") + logger.info("change policy $action") if (managedIndexConfig.shouldChangePolicy(managedIndexMetaData, action)) { + logger.info("Change policy for index ${managedIndexConfig.index}") initChangePolicy(managedIndexConfig, managedIndexMetaData, action) return } @@ -375,7 +378,7 @@ object ManagedIndexRunner : } // If this action is not allowed and the step to be executed is the first step in the action then we will fail - // as this action has been removed from the AllowList, but if its not the first step we will let it finish as it's already inflight + // as this action has been removed from the AllowList, but if it's not the first step we will let it finish as it's already inflight if (action?.isAllowed(allowList) == false && step != null && action.isFirstStep(step.name) && action.type != TransitionsAction.name) { val info = mapOf("message" to "Attempted to execute action=${action.type} which is not allowed.") val updated = updateManagedIndexMetaData( @@ -387,8 +390,8 @@ object ManagedIndexRunner : return } - // If any of State, Action, Step components come back as null then we are moving to error in ManagedIndexMetaData val startingManagedIndexMetaData = managedIndexMetaData.getStartingManagedIndexMetaData(state, action, step) + // If any of State, Action, Step components come back as null, then we are moving to error in ManagedIndexMetaData val updateResult = updateManagedIndexMetaData(startingManagedIndexMetaData) @Suppress("ComplexCondition", "MaxLineLength") @@ -402,7 +405,7 @@ object ManagedIndexRunner : actionValidation.validate(action.type, stepContext.metadata.index) } if (validationResult.validationStatus == Validate.ValidationStatus.RE_VALIDATING) { - logger.warn("Validation Status is: RE_VALIDATING. The action is {}, state is {}, step is {}.\", action.type, state.name, step.name") + logger.warn("Validation Status is: RE_VALIDATING. The action is {}, state is {}, step is {}.", action.type, state.name, step.name) publishErrorNotification(policy, managedIndexMetaData) return } @@ -464,32 +467,44 @@ object ManagedIndexRunner : } } - private suspend fun initManagedIndex(managedIndexConfig: ManagedIndexConfig, managedIndexMetaData: ManagedIndexMetaData?) { - var policy: Policy? = managedIndexConfig.policy - val policyID = managedIndexConfig.changePolicy?.policyID ?: managedIndexConfig.policyID - // If policy does not currently exist, we need to save the policy on the ManagedIndexConfig for the first time - // or if a change policy exists then we will also execute the change as we are still in initialization phase - if (policy == null || managedIndexConfig.changePolicy != null) { - // Get the policy by the name unless a ChangePolicy exists then allow the change to happen during initialization - policy = getPolicy(policyID) - // Attempt to save the policy - if (policy != null) { + private suspend fun initManagedIndex(managedIndexConfig: ManagedIndexConfig) { + var policy: Policy = managedIndexConfig.policy + lateinit var metadata: ManagedIndexMetaData + + metadata = getInitializedManagedIndexMetaData(managedIndexConfig, policy) + if (managedIndexConfig.changePolicy != null) { + val policyID = managedIndexConfig.changePolicy.policyID + val newPolicy = getPolicy(policyID) + if (newPolicy != null) { + policy = newPolicy val saved = savePolicyToManagedIndexConfig(managedIndexConfig, policy) - // If we failed to save the policy, don't initialize ManagedIndexMetaData - if (!saved) return + if (!saved) { + logger.error("Failed to save policy to ManagedIndexConfig(${managedIndexConfig.index})") + return + } + metadata = getInitializedManagedIndexMetaData(managedIndexConfig, policy) + } else { + // no policy found for change policy TODO can we check this in change policy API + metadata = ManagedIndexMetaData( + index = managedIndexConfig.index, + indexUuid = managedIndexConfig.indexUuid, + policyID = policyID, + policySeqNo = null, + policyPrimaryTerm = null, + policyCompleted = false, + rolledOver = false, + indexCreationDate = getIndexCreationDate(managedIndexConfig), + transitionTo = null, + stateMetaData = null, + actionMetaData = null, + stepMetaData = null, + policyRetryInfo = PolicyRetryInfoMetaData(failed = true, consumedRetries = 0), + info = mapOf("message" to "Fail to load policy: $policyID") + ) } - // If we failed to get the policy then we will update the ManagedIndexMetaData with error info - } - - // at this point we either successfully saved the policy or we failed to get the policy - val updatedManagedIndexMetaData = if (policy == null) { - getFailedInitializedManagedIndexMetaData(managedIndexMetaData, managedIndexConfig, policyID) - } else { - // Initializing ManagedIndexMetaData for the first time - getInitializedManagedIndexMetaData(managedIndexMetaData, managedIndexConfig, policy) } - updateManagedIndexMetaData(updatedManagedIndexMetaData, create = managedIndexMetaData == null) + updateManagedIndexMetaData(metadata, create = true) } @Suppress("ReturnCount", "BlockingMethodInNonBlockingContext") @@ -568,95 +583,32 @@ object ManagedIndexRunner : } } - private suspend fun getFailedInitializedManagedIndexMetaData( - managedIndexMetaData: ManagedIndexMetaData?, + @Suppress("ComplexMethod") + private suspend fun getInitializedManagedIndexMetaData( managedIndexConfig: ManagedIndexConfig, - policyID: String + policy: Policy ): ManagedIndexMetaData { - // we either haven't initialized any metadata yet or we have already initialized metadata but still have no policy - return managedIndexMetaData?.copy( - policyRetryInfo = PolicyRetryInfoMetaData(failed = true, consumedRetries = 0), - info = mapOf("message" to "Fail to load policy: $policyID") - ) ?: ManagedIndexMetaData( + val state = managedIndexConfig.changePolicy?.state ?: policy.defaultState + val stateMetaData = StateMetaData(state, Instant.now().toEpochMilli()) + + return ManagedIndexMetaData( index = managedIndexConfig.index, indexUuid = managedIndexConfig.indexUuid, - policyID = policyID, - policySeqNo = null, - policyPrimaryTerm = null, + policyID = policy.id, + policySeqNo = policy.seqNo, + policyPrimaryTerm = policy.primaryTerm, policyCompleted = false, rolledOver = false, indexCreationDate = getIndexCreationDate(managedIndexConfig), transitionTo = null, - stateMetaData = null, + stateMetaData = stateMetaData, actionMetaData = null, stepMetaData = null, - policyRetryInfo = PolicyRetryInfoMetaData(failed = true, consumedRetries = 0), - info = mapOf("message" to "Fail to load policy: $policyID") + policyRetryInfo = PolicyRetryInfoMetaData(failed = false, consumedRetries = 0), + info = mapOf("message" to "Successfully initialized policy: ${policy.id}") ) } - @Suppress("ComplexMethod") - private suspend fun getInitializedManagedIndexMetaData( - managedIndexMetaData: ManagedIndexMetaData?, - managedIndexConfig: ManagedIndexConfig, - policy: Policy - ): ManagedIndexMetaData { - val state = managedIndexConfig.changePolicy?.state ?: policy.defaultState - val stateMetaData = StateMetaData(state, Instant.now().toEpochMilli()) - - return when { - managedIndexMetaData == null -> ManagedIndexMetaData( - index = managedIndexConfig.index, - indexUuid = managedIndexConfig.indexUuid, - policyID = policy.id, - policySeqNo = policy.seqNo, - policyPrimaryTerm = policy.primaryTerm, - policyCompleted = false, - rolledOver = false, - indexCreationDate = getIndexCreationDate(managedIndexConfig), - transitionTo = null, - stateMetaData = stateMetaData, - actionMetaData = null, - stepMetaData = null, - policyRetryInfo = PolicyRetryInfoMetaData(failed = false, consumedRetries = 0), - info = mapOf("message" to "Successfully initialized policy: ${policy.id}") - ) - managedIndexMetaData.policySeqNo == null || managedIndexMetaData.policyPrimaryTerm == null -> - // If there is seqNo and PrimaryTerm it is first time populating Policy. - managedIndexMetaData.copy( - policyID = policy.id, - policySeqNo = policy.seqNo, - policyPrimaryTerm = policy.primaryTerm, - stateMetaData = stateMetaData, - policyRetryInfo = PolicyRetryInfoMetaData(failed = false, consumedRetries = 0), - info = mapOf("message" to "Successfully initialized policy: ${policy.id}") - ) - // this is an edge case where a user deletes the job config or index and we already have a policySeqNo/primaryTerm - // in the metadata, in this case we just want to say we successfully initialized the policy again but we will not - // modify the state, action, etc. so it can resume where it left off - managedIndexMetaData.policySeqNo == policy.seqNo && - managedIndexMetaData.policyPrimaryTerm == policy.primaryTerm && - managedIndexMetaData.policyID == policy.id -> - // If existing PolicySeqNo and PolicyPrimaryTerm is equal to cached Policy then no issue. - managedIndexMetaData.copy( - policyRetryInfo = PolicyRetryInfoMetaData(failed = false, consumedRetries = 0), - info = mapOf("message" to "Successfully initialized policy: ${policy.id}") - ) - else -> - // else this means we either tried to load a policy with a different id, seqno, or primaryterm from what is - // in the metadata and we cannot guarantee it will work with the current state in managedIndexMetaData - managedIndexMetaData.copy( - policyRetryInfo = PolicyRetryInfoMetaData(failed = true, consumedRetries = 0), - info = mapOf( - "message" to "Fail to load policy: ${policy.id} with " + - "seqNo ${policy.seqNo} and primaryTerm ${policy.primaryTerm} as it" + - " does not match what's in the metadata [policyID=${managedIndexMetaData.policyID}," + - " policySeqNo=${managedIndexMetaData.policySeqNo}, policyPrimaryTerm=${managedIndexMetaData.policyPrimaryTerm}]" - ) - ) - } - } - /** * update metadata in config index, and save metadata in history after update * this can be called 2 times in one job run, so need to save seqNo & primeTerm @@ -717,11 +669,13 @@ object ManagedIndexRunner : managedIndexMetaData: ManagedIndexMetaData, actionToExecute: Action? ) { - - // should never happen since we only call this if there is a changePolicy, but we'll do it to make changePolicy non null + // should never happen since we only call this if there is a changePolicy, but we'll do it to make changePolicy non-null val changePolicy = managedIndexConfig.changePolicy if (changePolicy == null) { - logger.debug("initChangePolicy was called with a null ChangePolicy, ManagedIndexConfig: $managedIndexConfig") + logger.debug( + "initChangePolicy was called with a null ChangePolicy, ManagedIndexConfig: {}", + managedIndexConfig + ) return } @@ -737,12 +691,11 @@ object ManagedIndexRunner : } else { // if the action to execute is transition then set the actionMetaData to a new transition metadata to reflect we are // in transition (in case we triggered change policy from entering transition) or to reflect this is a new policy transition phase - val newTransitionMetaData = ActionMetaData( - TransitionsAction.name, Instant.now().toEpochMilli(), -1, - false, 0, 0, null - ) val actionMetaData = if (actionToExecute?.type == TransitionsAction.name) { - newTransitionMetaData + ActionMetaData( + TransitionsAction.name, Instant.now().toEpochMilli(), -1, + false, 0, 0, null + ) } else { managedIndexMetaData.actionMetaData } @@ -761,11 +714,11 @@ object ManagedIndexRunner : // check if the safe flag was set by the Change Policy REST API, if it was then do a second validation // before allowing a change to happen if (changePolicy.isSafe) { - // if policy is null then we are only updating error information in metadata so its fine to continue + // if policy is null then we are only updating error information in metadata, so it's fine to continue if (policy != null) { // current policy being null should never happen as we have a check at the top of runner - // if it is unsafe to change then we set safe back to false so we don't keep doing this check every execution - if (managedIndexConfig.policy?.isSafeToChange(managedIndexMetaData.stateMetaData?.name, policy, changePolicy) != true) { + // if it is unsafe to change then we set safe back to false, so we don't keep doing this check every execution + if (!managedIndexConfig.policy.isSafeToChange(managedIndexMetaData.stateMetaData?.name, policy, changePolicy)) { updateManagedIndexConfig(managedIndexConfig.copy(changePolicy = managedIndexConfig.changePolicy.copy(isSafe = false))) return } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ManagedIndexConfig.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ManagedIndexConfig.kt index 7e8d36268..d92af725e 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ManagedIndexConfig.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ManagedIndexConfig.kt @@ -35,7 +35,7 @@ data class ManagedIndexConfig( val policyID: String, val policySeqNo: Long?, val policyPrimaryTerm: Long?, - val policy: Policy?, + val policy: Policy, val changePolicy: ChangePolicy?, val jobJitter: Double? ) : ScheduledJobParameter { @@ -177,11 +177,13 @@ data class ManagedIndexConfig( policyID = requireNotNull(policyID) { "ManagedIndexConfig policy id is null" }, policySeqNo = policySeqNo, policyPrimaryTerm = policyPrimaryTerm, - policy = policy?.copy( - id = policyID, - seqNo = policySeqNo ?: SequenceNumbers.UNASSIGNED_SEQ_NO, - primaryTerm = policyPrimaryTerm ?: SequenceNumbers.UNASSIGNED_PRIMARY_TERM - ), + policy = requireNotNull( + policy?.copy( + id = policyID, + seqNo = policySeqNo ?: SequenceNumbers.UNASSIGNED_SEQ_NO, + primaryTerm = policyPrimaryTerm ?: SequenceNumbers.UNASSIGNED_PRIMARY_TERM + ) + ) { "ManagedIndexConfig policy is null" }, changePolicy = changePolicy, jobJitter = jitter ) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/opensearchapi/OpenSearchExtensions.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/opensearchapi/OpenSearchExtensions.kt index 60b84bf7b..ceba19187 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/opensearchapi/OpenSearchExtensions.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/opensearchapi/OpenSearchExtensions.kt @@ -67,7 +67,7 @@ fun getUuidsForClosedIndices(state: ClusterState, defaultIndexMetadataService: D indexMetadatas.forEach { // it.key is index name if (it.value.state == IndexMetadata.State.CLOSE) { - closeList.add(defaultIndexMetadataService.getCustomIndexUUID(it.value)) + closeList.add(defaultIndexMetadataService.getIndexUUID(it.value)) } } return closeList diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/settings/LegacyOpenDistroManagedIndexSettings.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/settings/LegacyOpenDistroManagedIndexSettings.kt index 101177465..7858ef93d 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/settings/LegacyOpenDistroManagedIndexSettings.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/settings/LegacyOpenDistroManagedIndexSettings.kt @@ -7,19 +7,16 @@ package org.opensearch.indexmanagement.indexstatemanagement.settings import org.opensearch.common.settings.Setting import org.opensearch.common.unit.TimeValue -import org.opensearch.indexmanagement.indexstatemanagement.ISMActionsParser +import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.DEFAULT_ISM_ENABLED +import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.DEFAULT_JOB_INTERVAL +import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.ALLOW_LIST_ALL +import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.SNAPSHOT_DENY_LIST_NONE import java.util.concurrent.TimeUnit import java.util.function.Function @Suppress("UtilityClassWithPublicConstructor") class LegacyOpenDistroManagedIndexSettings { companion object { - const val DEFAULT_ISM_ENABLED = true - const val DEFAULT_JOB_INTERVAL = 5 - private val ALLOW_LIST_ALL = ISMActionsParser.instance.parsers.map { it.getActionType() }.toList() - val ALLOW_LIST_NONE = emptyList() - val SNAPSHOT_DENY_LIST_NONE = emptyList() - val INDEX_STATE_MANAGEMENT_ENABLED: Setting = Setting.boolSetting( "opendistro.index_state_management.enabled", DEFAULT_ISM_ENABLED, diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/settings/ManagedIndexSettings.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/settings/ManagedIndexSettings.kt index 62f6408ff..125844f4a 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/settings/ManagedIndexSettings.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/settings/ManagedIndexSettings.kt @@ -8,6 +8,7 @@ package org.opensearch.indexmanagement.indexstatemanagement.settings import org.opensearch.common.settings.Setting import org.opensearch.common.unit.TimeValue import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX +import org.opensearch.indexmanagement.indexstatemanagement.ISMActionsParser import java.util.function.Function @Suppress("UtilityClassWithPublicConstructor") @@ -19,6 +20,7 @@ class ManagedIndexSettings { const val DEFAULT_JITTER = 0.6 const val DEFAULT_RESTRICTED_PATTERN = "\\.opendistro_security|\\.kibana.*|\\$INDEX_MANAGEMENT_INDEX" val ALLOW_LIST_NONE = emptyList() + val ALLOW_LIST_ALL = ISMActionsParser.instance.parsers.map { it.getActionType() }.toList() val SNAPSHOT_DENY_LIST_NONE = emptyList() val INDEX_STATE_MANAGEMENT_ENABLED: Setting = Setting.boolSetting( diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/changepolicy/TransportChangePolicyAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/changepolicy/TransportChangePolicyAction.kt index b697aeea7..1e9d09a0c 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/changepolicy/TransportChangePolicyAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/changepolicy/TransportChangePolicyAction.kt @@ -238,7 +238,7 @@ class TransportChangePolicyAction @Inject constructor( val clusterState = response.state val defaultIndexMetadataService = indexMetadataProvider.services[DEFAULT_INDEX_TYPE] as DefaultIndexMetadataService clusterState.metadata.indices.forEach { - val indexUUID = defaultIndexMetadataService.getCustomIndexUUID(it.value) + val indexUUID = defaultIndexMetadataService.getIndexUUID(it.value) indexUuidToIndexMetadata[indexUUID] = it.value } // ISMIndexMetadata from the default index metadata service uses lenient expand, we want to use strict expand, filter diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/TransportExplainAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/TransportExplainAction.kt index ec32bcbd9..f30a0422a 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/TransportExplainAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/TransportExplainAction.kt @@ -199,10 +199,10 @@ class TransportExplainAction @Inject constructor( "enabled" to managedIndex.enabled.toString() ) if (showPolicy) { - managedIndex.policy?.let { appliedPolicies[managedIndex.index] = it } + managedIndex.policy.let { appliedPolicies[managedIndex.index] = it } } if (validateAction) { - managedIndex.policy?.let { policiesforValidation[managedIndex.index] = it } + managedIndex.policy.let { policiesforValidation[managedIndex.index] = it } } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/retryfailedmanagedindex/TransportRetryFailedManagedIndexAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/retryfailedmanagedindex/TransportRetryFailedManagedIndexAction.kt index b12b73468..4d85b1c02 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/retryfailedmanagedindex/TransportRetryFailedManagedIndexAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/retryfailedmanagedindex/TransportRetryFailedManagedIndexAction.kt @@ -167,7 +167,7 @@ class TransportRetryFailedManagedIndexAction @Inject constructor( override fun onResponse(response: ClusterStateResponse) { val defaultIndexMetadataService = indexMetadataProvider.services[DEFAULT_INDEX_TYPE] as DefaultIndexMetadataService response.state.metadata.indices.forEach { - val indexUUID = defaultIndexMetadataService.getCustomIndexUUID(it.value) + val indexUUID = defaultIndexMetadataService.getIndexUUID(it.value) indexUuidToIndexMetadata[indexUUID] = it.value } processResponse() diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt index ee2762375..cda41cf40 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt @@ -61,7 +61,7 @@ fun managedIndexConfigIndexRequest( uuid: String, policyID: String, jobInterval: Int, - policy: Policy? = null, + policy: Policy, jobJitter: Double? ): IndexRequest { val managedIndexConfig = ManagedIndexConfig( @@ -74,8 +74,8 @@ fun managedIndexConfigIndexRequest( jobEnabledTime = Instant.now(), policyID = policyID, policy = policy, - policySeqNo = policy?.seqNo, - policyPrimaryTerm = policy?.primaryTerm, + policySeqNo = policy.seqNo, + policyPrimaryTerm = policy.primaryTerm, changePolicy = null, jobJitter = jobJitter ) @@ -400,25 +400,24 @@ fun ManagedIndexConfig.shouldChangePolicy(managedIndexMetaData: ManagedIndexMeta return true } - // we need this in so that we can change policy before the first transition happens so policy doesnt get completed + // we need this in so that we can change policy before the first transition happens so policy doesn't get completed // before we have a chance to change policy if (actionToExecute?.type == TransitionsAction.name) { return true } - if (managedIndexMetaData.actionMetaData?.name != TransitionsAction.name) { - return false - } - - return true + // TODO actionToExecute is correlate to the actionMetadata? + // actionToExecute is found out by checking the metadata, it can be current unfinished one or the next + // actionMetadata has already been updated, it can be current unfinished one or the next + // In change policy context, we only accept unfinished transition or the new transition + return managedIndexMetaData.actionMetaData?.name == TransitionsAction.name } -fun ManagedIndexMetaData.hasVersionConflict(managedIndexConfig: ManagedIndexConfig): Boolean = +fun ManagedIndexMetaData.hasDifferentPolicyVersion(managedIndexConfig: ManagedIndexConfig): Boolean = this.policySeqNo != managedIndexConfig.policySeqNo || this.policyPrimaryTerm != managedIndexConfig.policyPrimaryTerm fun ManagedIndexConfig.hasDifferentJobInterval(jobInterval: Int): Boolean { - val schedule = this.schedule - when (schedule) { + when (val schedule = this.schedule) { is IntervalSchedule -> { return schedule.interval != jobInterval } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupFieldValueExpressionResolver.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupFieldValueExpressionResolver.kt index f800b54cd..8446be029 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupFieldValueExpressionResolver.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupFieldValueExpressionResolver.kt @@ -24,6 +24,7 @@ object RollupFieldValueExpressionResolver { private lateinit var scriptService: ScriptService private lateinit var clusterService: ClusterService lateinit var indexAliasUtils: IndexAliasUtils + fun resolve(rollup: Rollup, fieldValue: String): String { val script = Script(ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG, fieldValue, mapOf()) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/TestHelpers.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/TestHelpers.kt index 7bb838f0a..cbf3b0976 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/TestHelpers.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/TestHelpers.kt @@ -319,8 +319,7 @@ fun randomManagedIndexConfig( schedule: Schedule = IntervalSchedule(Instant.ofEpochMilli(Instant.now().toEpochMilli()), 5, ChronoUnit.MINUTES), lastUpdatedTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS), enabledTime: Instant? = if (enabled) Instant.now().truncatedTo(ChronoUnit.MILLIS) else null, - policyID: String = OpenSearchRestTestCase.randomAlphaOfLength(10), - policy: Policy? = randomPolicy(), + policy: Policy = randomPolicy(), changePolicy: ChangePolicy? = randomChangePolicy(), jitter: Double? = 0.0 ): ManagedIndexConfig { @@ -332,10 +331,10 @@ fun randomManagedIndexConfig( jobSchedule = schedule, jobLastUpdatedTime = lastUpdatedTime, jobEnabledTime = enabledTime, - policyID = policy?.id ?: policyID, - policySeqNo = policy?.seqNo, - policyPrimaryTerm = policy?.primaryTerm, - policy = policy?.copy(seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm = SequenceNumbers.UNASSIGNED_PRIMARY_TERM), + policyID = policy.id, + policySeqNo = policy.seqNo, + policyPrimaryTerm = policy.primaryTerm, + policy = policy.copy(seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm = SequenceNumbers.UNASSIGNED_PRIMARY_TERM), changePolicy = changePolicy, jobJitter = jitter ) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestChangePolicyActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestChangePolicyActionIT.kt index 17f96cda6..eba50e92b 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestChangePolicyActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestChangePolicyActionIT.kt @@ -220,7 +220,10 @@ class RestChangePolicyActionIT : IndexStateManagementRestTestCase() { // Will use the unique generated description to ensure they are the same policies, the cached policy does not have // id, seqNo, primaryTerm on the policy itself so cannot directly compare // TODO: figure out why the newPolicy.lastUpdatedTime and cached policy lastUpdatedTime is off by a few milliseconds - assertEquals("Initialized policy is not the change policy", newPolicy.description, updatedManagedIndexConfig.policy?.description) + assertEquals( + "Initialized policy is not the change policy", newPolicy.description, + updatedManagedIndexConfig.policy.description + ) } fun `test changing policy on a valid index and log pattern`() { @@ -301,7 +304,7 @@ class RestChangePolicyActionIT : IndexStateManagementRestTestCase() { updateManagedIndexConfigStartTime(managedIndexConfig) // After first execution we should expect the change policy to still be null (since we haven't called it yet) - // and the initial policy should of been cached + // and the initial policy should have been cached val executedManagedIndexConfig: ManagedIndexConfig = waitFor { val config = getManagedIndexConfigByDocId(managedIndexConfig.id) assertNotNull("Executed managed index config is null", config) @@ -346,7 +349,6 @@ class RestChangePolicyActionIT : IndexStateManagementRestTestCase() { // speed up to second execution we will have a ChangePolicy but not be in Transitions yet // which means we should still execute the ReadOnlyAction updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor { val config = getManagedIndexConfigByDocId(managedIndexConfig.id) assertNotNull("Next managed index config is null", config) @@ -386,7 +388,6 @@ class RestChangePolicyActionIT : IndexStateManagementRestTestCase() { // speed up to third execution so that we try to move to transitions and trigger a change policy updateManagedIndexConfigStartTime(managedIndexConfig) - val changedManagedIndexConfig: ManagedIndexConfig = waitFor { val config = getManagedIndexConfigByDocId(managedIndexConfig.id) assertNotNull("Changed managed index config is null", config) @@ -512,9 +513,7 @@ class RestChangePolicyActionIT : IndexStateManagementRestTestCase() { RestRequest.Method.POST.toString(), "${RestChangePolicyAction.CHANGE_POLICY_BASE_URI}/$index", emptyMap(), changePolicy.toHttpEntity() ) - assertAffectedIndicesResponseIsEqual(mapOf(FAILURES to false, FAILED_INDICES to emptyList(), UPDATED_INDICES to 1), response.asMap()) - waitFor { assertNotNull(getExistingManagedIndexConfig(index).changePolicy) } // speed up to first execution where we initialize the policy on the job @@ -529,7 +528,10 @@ class RestChangePolicyActionIT : IndexStateManagementRestTestCase() { // Will use the unique generated description to ensure they are the same policies, the cached policy does not have // id, seqNo, primaryTerm on the policy itself so cannot directly compare // TODO: figure out why the newPolicy.lastUpdatedTime and cached policy lastUpdatedTime is off by a few milliseconds - assertEquals("Initialized policy is not the change policy", newPolicy.description, config.policy?.description) + assertEquals( + "Initialized policy is not the change policy", newPolicy.description, + config.policy.description + ) config } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtilsTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtilsTests.kt index 3dd7b20ec..d6dfed16a 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtilsTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtilsTests.kt @@ -22,6 +22,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.model.Transition import org.opensearch.indexmanagement.indexstatemanagement.model.coordinator.SweptManagedIndexConfig import org.opensearch.indexmanagement.indexstatemanagement.randomChangePolicy import org.opensearch.indexmanagement.indexstatemanagement.randomClusterStateManagedIndexConfig +import org.opensearch.indexmanagement.indexstatemanagement.randomPolicy import org.opensearch.indexmanagement.indexstatemanagement.randomSweptManagedIndexConfig import org.opensearch.indexmanagement.opensearchapi.parseWithType import org.opensearch.test.OpenSearchTestCase @@ -34,7 +35,7 @@ class ManagedIndexUtilsTests : OpenSearchTestCase() { val index = randomAlphaOfLength(10) val uuid = randomAlphaOfLength(10) val policyID = randomAlphaOfLength(10) - val createRequest = managedIndexConfigIndexRequest(index, uuid, policyID, 5, jobJitter = 0.0) + val createRequest = managedIndexConfigIndexRequest(index, uuid, policyID, 5, randomPolicy(), jobJitter = 0.0) assertNotNull("IndexRequest not created", createRequest) assertEquals("Incorrect ism index used in request", INDEX_MANAGEMENT_INDEX, createRequest.index())