From aa09f856dae30e4318315e32d65b9d045c591609 Mon Sep 17 00:00:00 2001 From: bowenlan-amzn Date: Sun, 1 Oct 2023 11:26:54 -0700 Subject: [PATCH] Remove the old stale cluster state ism metadata logic Signed-off-by: bowenlan-amzn --- .../indexmanagement/IndexManagementPlugin.kt | 9 - .../ManagedIndexCoordinator.kt | 10 -- .../ManagedIndexRunner.kt | 11 +- .../opensearchapi/OpenSearchExtensions.kt | 9 - .../LegacyOpenDistroManagedIndexSettings.kt | 32 ---- .../settings/ManagedIndexSettings.kt | 30 ---- .../addpolicy/TransportAddPolicyAction.kt | 30 +++- .../TransportChangePolicyAction.kt | 19 +-- .../action/explain/TransportExplainAction.kt | 36 +--- .../TransportRemovePolicyAction.kt | 3 - .../TransportRetryFailedManagedIndexAction.kt | 10 +- ...ansportUpdateManagedIndexMetaDataAction.kt | 158 ------------------ .../UpdateManagedIndexMetaDataAction.kt | 22 --- .../UpdateManagedIndexMetaDataRequest.kt | 67 -------- .../util/ManagedIndexUtils.kt | 84 +--------- .../util/RestHandlerUtils.kt | 22 --- .../IndexManagementSettingsTests.kt | 4 - .../ManagedIndexCoordinatorTests.kt | 2 - 18 files changed, 35 insertions(+), 523 deletions(-) delete mode 100644 src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/updateindexmetadata/TransportUpdateManagedIndexMetaDataAction.kt delete mode 100644 src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/updateindexmetadata/UpdateManagedIndexMetaDataAction.kt delete mode 100644 src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/updateindexmetadata/UpdateManagedIndexMetaDataRequest.kt diff --git a/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt b/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt index 0986e9a13..00e25ce86 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt @@ -82,8 +82,6 @@ import org.opensearch.indexmanagement.indexstatemanagement.transport.action.remo import org.opensearch.indexmanagement.indexstatemanagement.transport.action.removepolicy.TransportRemovePolicyAction import org.opensearch.indexmanagement.indexstatemanagement.transport.action.retryfailedmanagedindex.RetryFailedManagedIndexAction import org.opensearch.indexmanagement.indexstatemanagement.transport.action.retryfailedmanagedindex.TransportRetryFailedManagedIndexAction -import org.opensearch.indexmanagement.indexstatemanagement.transport.action.updateindexmetadata.TransportUpdateManagedIndexMetaDataAction -import org.opensearch.indexmanagement.indexstatemanagement.transport.action.updateindexmetadata.UpdateManagedIndexMetaDataAction import org.opensearch.indexmanagement.indexstatemanagement.util.DEFAULT_INDEX_TYPE import org.opensearch.indexmanagement.indexstatemanagement.validation.ActionValidation import org.opensearch.indexmanagement.refreshanalyzer.RefreshSearchAnalyzerAction @@ -500,10 +498,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin ManagedIndexSettings.ROLLOVER_SKIP, ManagedIndexSettings.INDEX_STATE_MANAGEMENT_ENABLED, ManagedIndexSettings.ACTION_VALIDATION_ENABLED, - ManagedIndexSettings.METADATA_SERVICE_ENABLED, ManagedIndexSettings.AUTO_MANAGE, - ManagedIndexSettings.METADATA_SERVICE_STATUS, - ManagedIndexSettings.TEMPLATE_MIGRATION_CONTROL, ManagedIndexSettings.JITTER, ManagedIndexSettings.JOB_INTERVAL, ManagedIndexSettings.SWEEP_PERIOD, @@ -540,7 +535,6 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin LegacyOpenDistroManagedIndexSettings.ROLLOVER_ALIAS, LegacyOpenDistroManagedIndexSettings.ROLLOVER_SKIP, LegacyOpenDistroManagedIndexSettings.INDEX_STATE_MANAGEMENT_ENABLED, - LegacyOpenDistroManagedIndexSettings.METADATA_SERVICE_ENABLED, LegacyOpenDistroManagedIndexSettings.JOB_INTERVAL, LegacyOpenDistroManagedIndexSettings.SWEEP_PERIOD, LegacyOpenDistroManagedIndexSettings.COORDINATOR_BACKOFF_COUNT, @@ -548,8 +542,6 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin LegacyOpenDistroManagedIndexSettings.ALLOW_LIST, LegacyOpenDistroManagedIndexSettings.SNAPSHOT_DENY_LIST, LegacyOpenDistroManagedIndexSettings.AUTO_MANAGE, - LegacyOpenDistroManagedIndexSettings.METADATA_SERVICE_STATUS, - LegacyOpenDistroManagedIndexSettings.TEMPLATE_MIGRATION_CONTROL, LegacyOpenDistroManagedIndexSettings.RESTRICTED_INDEX_PATTERN, LegacyOpenDistroRollupSettings.ROLLUP_INGEST_BACKOFF_COUNT, LegacyOpenDistroRollupSettings.ROLLUP_INGEST_BACKOFF_MILLIS, @@ -565,7 +557,6 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin override fun getActions(): List> { return listOf( - ActionPlugin.ActionHandler(UpdateManagedIndexMetaDataAction.INSTANCE, TransportUpdateManagedIndexMetaDataAction::class.java), ActionPlugin.ActionHandler(RemovePolicyAction.INSTANCE, TransportRemovePolicyAction::class.java), ActionPlugin.ActionHandler(RefreshSearchAnalyzerAction.INSTANCE, TransportRefreshSearchAnalyzerAction::class.java), ActionPlugin.ActionHandler(AddPolicyAction.INSTANCE, TransportAddPolicyAction::class.java), diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexCoordinator.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexCoordinator.kt index 3583122e2..8a37145d0 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexCoordinator.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexCoordinator.kt @@ -119,8 +119,6 @@ class ManagedIndexCoordinator( private val ismIndices = indexManagementIndices private var scheduledFullSweep: Scheduler.Cancellable? = null - private var scheduledMoveMetadata: Scheduler.Cancellable? = null - private var scheduledTemplateMigration: Scheduler.Cancellable? = null @Volatile private var lastFullSweepTimeNano = System.nanoTime() @Volatile private var indexStateManagementEnabled = INDEX_STATE_MANAGEMENT_ENABLED.get(settings) @@ -170,10 +168,6 @@ class ManagedIndexCoordinator( fun offClusterManager() { // Cancel background sweep when demoted from being cluster manager scheduledFullSweep?.cancel() - - scheduledMoveMetadata?.cancel() - - scheduledTemplateMigration?.cancel() } override fun clusterChanged(event: ClusterChangedEvent) { @@ -206,8 +200,6 @@ class ManagedIndexCoordinator( override fun beforeStop() { scheduledFullSweep?.cancel() - - scheduledMoveMetadata?.cancel() } private fun enable() { @@ -229,8 +221,6 @@ class ManagedIndexCoordinator( private fun disable() { scheduledFullSweep?.cancel() indexStateManagementEnabled = false - - scheduledMoveMetadata?.cancel() } private suspend fun reenableJobs() { diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt index 4ab3f48d0..3ad6d1be5 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt @@ -56,15 +56,12 @@ import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndex import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.JOB_INTERVAL import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.ACTION_VALIDATION_ENABLED import org.opensearch.indexmanagement.indexstatemanagement.util.DEFAULT_INDEX_TYPE -import org.opensearch.indexmanagement.indexstatemanagement.util.MetadataCheck -import org.opensearch.indexmanagement.indexstatemanagement.util.checkMetadata import org.opensearch.indexmanagement.indexstatemanagement.util.deleteManagedIndexMetadataRequest import org.opensearch.indexmanagement.indexstatemanagement.util.deleteManagedIndexRequest import org.opensearch.indexmanagement.indexstatemanagement.util.getCompletedManagedIndexMetaData import org.opensearch.indexmanagement.indexstatemanagement.util.getStartingManagedIndexMetaData import org.opensearch.indexmanagement.indexstatemanagement.util.hasDifferentJobInterval import org.opensearch.indexmanagement.indexstatemanagement.util.hasTimedOut -import org.opensearch.indexmanagement.indexstatemanagement.util.hasVersionConflict import org.opensearch.indexmanagement.indexstatemanagement.util.isAllowed import org.opensearch.indexmanagement.indexstatemanagement.util.isFailed import org.opensearch.indexmanagement.indexstatemanagement.util.isSafeToChange @@ -98,6 +95,7 @@ import org.opensearch.jobscheduler.spi.ScheduledJobParameter import org.opensearch.jobscheduler.spi.ScheduledJobRunner import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule import org.opensearch.core.rest.RestStatus +import org.opensearch.indexmanagement.indexstatemanagement.util.hasVersionConflict import org.opensearch.script.Script import org.opensearch.script.ScriptService import org.opensearch.script.TemplateScript @@ -281,13 +279,6 @@ object ManagedIndexRunner : logger.warn("Failed to find IndexMetadata for ${managedIndexConfig.index}.") return } - } else { - val clusterStateMetadata = clusterStateIndexMetadata.getManagedIndexMetadata() - val metadataCheck = checkMetadata(clusterStateMetadata, managedIndexMetaData, managedIndexConfig.indexUuid, logger) - if (metadataCheck != MetadataCheck.SUCCESS) { - logger.info("Skipping execution while metadata status is $metadataCheck") - return - } } // If policy or managedIndexMetaData is null then initialize diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/opensearchapi/OpenSearchExtensions.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/opensearchapi/OpenSearchExtensions.kt index cb7e4c6ad..60b84bf7b 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/opensearchapi/OpenSearchExtensions.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/opensearchapi/OpenSearchExtensions.kt @@ -61,15 +61,6 @@ fun IndexMetadata.getRolloverSkip(): Boolean { return this.settings.getAsBoolean(ManagedIndexSettings.ROLLOVER_SKIP.key, false) } -fun IndexMetadata.getManagedIndexMetadata(): ManagedIndexMetaData? { - val existingMetaDataMap = this.getCustomData(ManagedIndexMetaData.MANAGED_INDEX_METADATA_TYPE) - - if (existingMetaDataMap != null) { - return ManagedIndexMetaData.fromMap(existingMetaDataMap) - } - return null -} - fun getUuidsForClosedIndices(state: ClusterState, defaultIndexMetadataService: DefaultIndexMetadataService): MutableList { val indexMetadatas = state.metadata.indices val closeList = mutableListOf() diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/settings/LegacyOpenDistroManagedIndexSettings.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/settings/LegacyOpenDistroManagedIndexSettings.kt index fa6d6a74b..101177465 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/settings/LegacyOpenDistroManagedIndexSettings.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/settings/LegacyOpenDistroManagedIndexSettings.kt @@ -15,8 +15,6 @@ import java.util.function.Function class LegacyOpenDistroManagedIndexSettings { companion object { const val DEFAULT_ISM_ENABLED = true - const val DEFAULT_METADATA_SERVICE_STATUS = 0 - const val DEFAULT_METADATA_SERVICE_ENABLED = true const val DEFAULT_JOB_INTERVAL = 5 private val ALLOW_LIST_ALL = ISMActionsParser.instance.parsers.map { it.getActionType() }.toList() val ALLOW_LIST_NONE = emptyList() @@ -30,36 +28,6 @@ class LegacyOpenDistroManagedIndexSettings { Setting.Property.Deprecated ) - // 0: migration is going on - // 1: migration succeed - // -1: migration failed - val METADATA_SERVICE_STATUS: Setting = Setting.intSetting( - "opendistro.index_state_management.metadata_migration.status", - DEFAULT_METADATA_SERVICE_STATUS, - Setting.Property.NodeScope, - Setting.Property.Dynamic - ) - - // 0: enabled, use onClusterManager time as ISM template last_updated_time - // -1: migration ended successfully - // -2: migration ended unsuccessfully - // >0: use this setting (epoch millis) as ISM template last_updated_time - val TEMPLATE_MIGRATION_CONTROL: Setting = Setting.longSetting( - "opendistro.index_state_management.template_migration.control", - ManagedIndexSettings.DEFAULT_TEMPLATE_MIGRATION_TIMESTAMP, - -2L, - Setting.Property.NodeScope, - Setting.Property.Dynamic - ) - - val METADATA_SERVICE_ENABLED: Setting = Setting.boolSetting( - "opendistro.index_state_management.metadata_service.enabled", - DEFAULT_METADATA_SERVICE_ENABLED, - Setting.Property.NodeScope, - Setting.Property.Dynamic, - Setting.Property.Deprecated - ) - val POLICY_ID: Setting = Setting.simpleString( "index.opendistro.index_state_management.policy_id", Setting.Property.IndexScope, diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/settings/ManagedIndexSettings.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/settings/ManagedIndexSettings.kt index 56bda817e..62f6408ff 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/settings/ManagedIndexSettings.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/settings/ManagedIndexSettings.kt @@ -15,7 +15,6 @@ class ManagedIndexSettings { companion object { const val DEFAULT_ISM_ENABLED = true const val DEFAULT_ACTION_VALIDATION_ENABLED = false - const val DEFAULT_TEMPLATE_MIGRATION_TIMESTAMP = 0L const val DEFAULT_JOB_INTERVAL = 5 const val DEFAULT_JITTER = 0.6 const val DEFAULT_RESTRICTED_PATTERN = "\\.opendistro_security|\\.kibana.*|\\$INDEX_MANAGEMENT_INDEX" @@ -36,35 +35,6 @@ class ManagedIndexSettings { Setting.Property.Dynamic ) - // 0: migration is going on - // 1: migration succeed - // -1: migration failed - val METADATA_SERVICE_STATUS: Setting = Setting.intSetting( - "plugins.index_state_management.metadata_migration.status", - LegacyOpenDistroManagedIndexSettings.METADATA_SERVICE_STATUS, - Setting.Property.NodeScope, - Setting.Property.Dynamic - ) - - // 0: enabled, use onClusterManager time as ISM template last_updated_time - // -1: migration ended successfully - // -2: migration ended unsuccessfully - // >0: use this setting (epoch millis) as ISM template last_updated_time - val TEMPLATE_MIGRATION_CONTROL: Setting = Setting.longSetting( - "plugins.index_state_management.template_migration.control", - LegacyOpenDistroManagedIndexSettings.TEMPLATE_MIGRATION_CONTROL, - -2L, - Setting.Property.NodeScope, - Setting.Property.Dynamic - ) - - val METADATA_SERVICE_ENABLED: Setting = Setting.boolSetting( - "plugins.index_state_management.metadata_service.enabled", - LegacyOpenDistroManagedIndexSettings.METADATA_SERVICE_ENABLED, - Setting.Property.NodeScope, - Setting.Property.Dynamic - ) - val POLICY_ID: Setting = Setting.simpleString( "index.plugins.index_state_management.policy_id", LegacyOpenDistroManagedIndexSettings.POLICY_ID, diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/addpolicy/TransportAddPolicyAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/addpolicy/TransportAddPolicyAction.kt index da4e81d77..d2307120a 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/addpolicy/TransportAddPolicyAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/addpolicy/TransportAddPolicyAction.kt @@ -34,8 +34,8 @@ import org.opensearch.common.settings.Settings import org.opensearch.common.unit.TimeValue import org.opensearch.commons.ConfigConstants import org.opensearch.commons.authuser.User -import org.opensearch.core.xcontent.NamedXContentRegistry import org.opensearch.core.index.Index +import org.opensearch.core.xcontent.NamedXContentRegistry import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX import org.opensearch.indexmanagement.indexstatemanagement.DefaultIndexMetadataService import org.opensearch.indexmanagement.indexstatemanagement.IndexMetadataProvider @@ -49,7 +49,6 @@ import org.opensearch.indexmanagement.indexstatemanagement.transport.action.mana import org.opensearch.indexmanagement.indexstatemanagement.util.DEFAULT_INDEX_TYPE import org.opensearch.indexmanagement.indexstatemanagement.util.FailedIndex import org.opensearch.indexmanagement.indexstatemanagement.util.managedIndexConfigIndexRequest -import org.opensearch.indexmanagement.indexstatemanagement.util.removeClusterStateMetadatas import org.opensearch.indexmanagement.opensearchapi.IndexManagementSecurityContext import org.opensearch.indexmanagement.opensearchapi.parseFromGetResponse import org.opensearch.indexmanagement.opensearchapi.suspendUntil @@ -61,6 +60,7 @@ import org.opensearch.indexmanagement.util.SecurityUtils.Companion.buildUser import org.opensearch.indexmanagement.util.SecurityUtils.Companion.userHasPermissionForResource import org.opensearch.indexmanagement.util.SecurityUtils.Companion.validateUserConfiguration import org.opensearch.core.rest.RestStatus +import org.opensearch.indexmanagement.indexstatemanagement.util.deleteManagedIndexMetadataRequest import org.opensearch.tasks.Task import org.opensearch.transport.TransportService import java.time.Duration @@ -126,7 +126,7 @@ class TransportAddPolicyAction @Inject constructor( } @Suppress("SpreadOperator") - fun getClusterState() { + private fun getClusterState() { startTime = Instant.now() CoroutineScope(Dispatchers.IO).launch { val indexNameToMetadata: MutableMap = HashMap() @@ -193,10 +193,6 @@ class TransportAddPolicyAction @Inject constructor( clusterStateRequest, object : ActionListener { override fun onResponse(response: ClusterStateResponse) { - CoroutineScope(Dispatchers.IO).launch { - removeClusterStateMetadatas(client, log, indicesToAdd.map { Index(it.value, it.key) }) - } - val defaultIndexMetadataService = indexMetadataProvider.services[DEFAULT_INDEX_TYPE] as DefaultIndexMetadataService getUuidsForClosedIndices(response.state, defaultIndexMetadataService).forEach { failedIndices.add(FailedIndex(indicesToAdd[it] as String, it, "This index is closed")) @@ -346,6 +342,9 @@ class TransportAddPolicyAction @Inject constructor( } } actionListener.onResponse(ISMStatusResponse(indicesToAdd.size, failedIndices)) + + // best effort to clean up ISM metadata + removeMetadatas(indicesToAdd.map { Index(it.value, it.key) }) } override fun onFailure(t: Exception) { @@ -368,6 +367,23 @@ class TransportAddPolicyAction @Inject constructor( private fun onFailure(t: Exception) { actionListener.onFailure(ExceptionsHelper.unwrapCause(t) as Exception) } + + fun removeMetadatas(indices: List) { + val request = indices.map { deleteManagedIndexMetadataRequest(it.uuid) } + val bulkReq = BulkRequest().add(request) + client.bulk( + bulkReq, + object : ActionListener { + override fun onResponse(response: BulkResponse) { + log.debug("Successfully cleaned metadata for remove policy indices: {}", indices) + } + + override fun onFailure(e: Exception) { + log.error("Failed to clean metadata for remove policy indices.", e) + } + } + ) + } } companion object { diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/changepolicy/TransportChangePolicyAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/changepolicy/TransportChangePolicyAction.kt index 7dca57343..b697aeea7 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/changepolicy/TransportChangePolicyAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/changepolicy/TransportChangePolicyAction.kt @@ -41,7 +41,6 @@ import org.opensearch.indexmanagement.indexstatemanagement.model.ManagedIndexCon import org.opensearch.indexmanagement.indexstatemanagement.model.Policy import org.opensearch.indexmanagement.indexstatemanagement.model.coordinator.SweptManagedIndexConfig import org.opensearch.indexmanagement.indexstatemanagement.opensearchapi.buildMgetMetadataRequest -import org.opensearch.indexmanagement.indexstatemanagement.opensearchapi.getManagedIndexMetadata import org.opensearch.indexmanagement.indexstatemanagement.opensearchapi.mgetResponseToMap import org.opensearch.indexmanagement.indexstatemanagement.resthandler.RestChangePolicyAction import org.opensearch.indexmanagement.indexstatemanagement.transport.action.ISMStatusResponse @@ -268,9 +267,6 @@ class TransportChangePolicyAction @Inject constructor( val includedStates = changePolicy.include.map { it.state }.toSet() indicesToUpdate.forEach { (indexUuid, indexName) -> - // indexMetaData and clusterStateMetadata will be null for non-default index types - val indexMetaData = indexUuidToIndexMetadata[indexUuid] - val clusterStateMetadata = indexMetaData?.getManagedIndexMetadata() val mgetFailure = metadataMap[indexUuid]?.second val managedIndexMetadata: ManagedIndexMetaData? = metadataMap[managedIndexMetadataID(indexUuid)]?.first @@ -296,25 +292,16 @@ class TransportChangePolicyAction @Inject constructor( RestChangePolicyAction.INDEX_IN_TRANSITION ) ) - // else if there is no ManagedIndexMetaData yet then the managed index has not initialized and we can change the policy safely + // else if there is no ManagedIndexMetaData yet then the managed index has not initialized, and we can change the policy safely managedIndexMetadata == null -> { - if (clusterStateMetadata != null) { - failedIndices.add( - FailedIndex( - indexName, indexUuid, - "Cannot change policy until metadata has finished migrating" - ) - ) - } else { - managedIndicesToUpdate.add(indexName to indexUuid) - } + managedIndicesToUpdate.add(indexName to indexUuid) } // else if the includedStates is empty (i.e. not being used) then we will always try to update the managed index includedStates.isEmpty() -> managedIndicesToUpdate.add(indexName to indexUuid) // else only update the managed index if its currently in one of the included states includedStates.contains(managedIndexMetadata.stateMetaData?.name) -> managedIndicesToUpdate.add(indexName to indexUuid) - // else the managed index did not match any of the included state filters and we will not update it + // else the managed index did not match any of the included state filters, and we will not update it else -> log.debug("Skipping $indexName as it does not match any of the include state filters") } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/TransportExplainAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/TransportExplainAction.kt index 9e67742b3..b6c9cfa0e 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/TransportExplainAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/TransportExplainAction.kt @@ -23,7 +23,6 @@ import org.opensearch.action.support.ActionFilters import org.opensearch.action.support.HandledTransportAction import org.opensearch.action.support.master.AcknowledgedResponse import org.opensearch.client.node.NodeClient -import org.opensearch.cluster.metadata.IndexMetadata import org.opensearch.cluster.routing.Preference import org.opensearch.cluster.service.ClusterService import org.opensearch.common.inject.Inject @@ -44,14 +43,11 @@ import org.opensearch.indexmanagement.indexstatemanagement.model.ManagedIndexCon import org.opensearch.indexmanagement.indexstatemanagement.model.Policy import org.opensearch.indexmanagement.common.model.rest.SearchParams import org.opensearch.indexmanagement.indexstatemanagement.ManagedIndexRunner.actionValidation -import org.opensearch.indexmanagement.indexstatemanagement.opensearchapi.getManagedIndexMetadata import org.opensearch.indexmanagement.indexstatemanagement.transport.action.managedIndex.ManagedIndexAction import org.opensearch.indexmanagement.indexstatemanagement.transport.action.managedIndex.ManagedIndexRequest import org.opensearch.indexmanagement.indexstatemanagement.util.DEFAULT_INDEX_TYPE import org.opensearch.indexmanagement.indexstatemanagement.util.MANAGED_INDEX_INDEX_UUID_FIELD import org.opensearch.indexmanagement.indexstatemanagement.util.MANAGED_INDEX_NAME_KEYWORD_FIELD -import org.opensearch.indexmanagement.indexstatemanagement.util.MetadataCheck -import org.opensearch.indexmanagement.indexstatemanagement.util.checkMetadata import org.opensearch.indexmanagement.indexstatemanagement.util.managedIndexMetadataID import org.opensearch.indexmanagement.opensearchapi.parseWithType import org.opensearch.indexmanagement.opensearchapi.suspendUntil @@ -271,8 +267,7 @@ class TransportExplainAction @Inject constructor( clusterStateRequest, object : ActionListener { override fun onResponse(response: ClusterStateResponse) { - val clusterStateIndexMetadatas = response.state.metadata.indices - getMetadataMap(clusterStateIndexMetadatas, threadContext) + getMetadataMap(threadContext) } override fun onFailure(t: Exception) { @@ -281,11 +276,11 @@ class TransportExplainAction @Inject constructor( } ) } else { - getMetadataMap(null, threadContext) + getMetadataMap(threadContext) } } - private fun getMetadataMap(clusterStateIndexMetadatas: Map?, threadContext: ThreadContext.StoredContext) { + private fun getMetadataMap(threadContext: ThreadContext.StoredContext) { val mgetMetadataReq = MultiGetRequest() indexNamesToUUIDs.values.forEach { uuid -> mgetMetadataReq.add(MultiGetRequest.Item(INDEX_MANAGEMENT_INDEX, managedIndexMetadataID(uuid)).routing(uuid)) @@ -296,7 +291,7 @@ class TransportExplainAction @Inject constructor( override fun onResponse(response: MultiGetResponse) { val metadataMap: Map = response.responses.associate { it.id to getMetadata(it.response)?.toMap() } - buildResponse(indexNamesToUUIDs, metadataMap, clusterStateIndexMetadatas, threadContext) + buildResponse(indexNamesToUUIDs, metadataMap, threadContext) } override fun onFailure(t: Exception) { @@ -310,7 +305,6 @@ class TransportExplainAction @Inject constructor( private fun buildResponse( indices: Map, metadataMap: Map, - clusterStateIndexMetadatas: Map?, threadContext: ThreadContext.StoredContext ) { // cluster state response will not resist the sort order @@ -329,22 +323,13 @@ class TransportExplainAction @Inject constructor( if (metadataMapFromManagedIndex.isNotEmpty()) { managedIndexMetadata = ManagedIndexMetaData.fromMap(metadataMapFromManagedIndex) } - - // clusterStateIndexMetadatas will not be null only for the default index type - if (clusterStateIndexMetadatas != null) { - val currentIndexUuid = indices[indexName] - val clusterStateMetadata = clusterStateIndexMetadatas[indexName]?.getManagedIndexMetadata() - val metadataCheck = checkMetadata(clusterStateMetadata, configIndexMetadataMap, currentIndexUuid, log) - val info = metadataStatusToInfo[metadataCheck] - info?.let { managedIndexMetadata = clusterStateMetadata?.copy(info = it) } - } } if (validateAction) { var validationResult = actionValidation.validate("nothing", indexName) val policy = policiesforValidation[indexName] if (policy != null && managedIndexMetadata != null) { - val state = policy.getStateToExecute(managedIndexMetadata!!) - val action = state?.getActionToExecute(managedIndexMetadata!!, indexMetadataProvider) + val state = policy.getStateToExecute(managedIndexMetadata) + val action = state?.getActionToExecute(managedIndexMetadata, indexMetadataProvider) var actionName = action?.type if (actionName == null) { actionName = "nothing" @@ -445,13 +430,4 @@ class TransportExplainAction @Inject constructor( } } } - - companion object { - const val METADATA_MOVING_WARNING = "Managed index's metadata is pending migration." - const val METADATA_CORRUPT_WARNING = "Managed index's metadata is corrupt, please use remove policy API to clean it." - val metadataStatusToInfo = mapOf( - MetadataCheck.PENDING to mapOf("message" to METADATA_MOVING_WARNING), - MetadataCheck.CORRUPT to mapOf("message" to METADATA_CORRUPT_WARNING) - ) - } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/removepolicy/TransportRemovePolicyAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/removepolicy/TransportRemovePolicyAction.kt index 2c74e4b1a..b9db84887 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/removepolicy/TransportRemovePolicyAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/removepolicy/TransportRemovePolicyAction.kt @@ -48,7 +48,6 @@ import org.opensearch.indexmanagement.indexstatemanagement.util.DEFAULT_INDEX_TY import org.opensearch.indexmanagement.indexstatemanagement.util.FailedIndex import org.opensearch.indexmanagement.indexstatemanagement.util.deleteManagedIndexMetadataRequest import org.opensearch.indexmanagement.indexstatemanagement.util.deleteManagedIndexRequest -import org.opensearch.indexmanagement.indexstatemanagement.util.removeClusterStateMetadatas import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ISMIndexMetadata import org.opensearch.indexmanagement.util.IndexManagementException import org.opensearch.indexmanagement.util.SecurityUtils.Companion.buildUser @@ -352,8 +351,6 @@ class TransportRemovePolicyAction @Inject constructor( // clean metadata for indicesToRemove val indicesToRemoveMetadata = indicesToRemove.map { Index(it.value, it.key) } - // best effort - CoroutineScope(Dispatchers.IO).launch { removeClusterStateMetadatas(client, log, indicesToRemoveMetadata) } removeMetadatas(indicesToRemoveMetadata) } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/retryfailedmanagedindex/TransportRetryFailedManagedIndexAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/retryfailedmanagedindex/TransportRetryFailedManagedIndexAction.kt index fd21528eb..b12b73468 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/retryfailedmanagedindex/TransportRetryFailedManagedIndexAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/retryfailedmanagedindex/TransportRetryFailedManagedIndexAction.kt @@ -38,7 +38,6 @@ import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANA import org.opensearch.indexmanagement.indexstatemanagement.DefaultIndexMetadataService import org.opensearch.indexmanagement.indexstatemanagement.IndexMetadataProvider import org.opensearch.indexmanagement.indexstatemanagement.opensearchapi.buildMgetMetadataRequest -import org.opensearch.indexmanagement.indexstatemanagement.opensearchapi.getManagedIndexMetadata import org.opensearch.indexmanagement.indexstatemanagement.opensearchapi.mgetResponseToMap import org.opensearch.indexmanagement.indexstatemanagement.transport.action.ISMStatusResponse import org.opensearch.indexmanagement.indexstatemanagement.transport.action.managedIndex.ManagedIndexAction @@ -222,9 +221,6 @@ class TransportRetryFailedManagedIndexAction @Inject constructor( private fun onMgetMetadataResponse(mgetResponse: MultiGetResponse) { val metadataMap = mgetResponseToMap(mgetResponse) indicesToRetry.forEach { (indexUuid, indexName) -> - // indexMetaData and clusterStateMetadata will be null for non-default index types - val indexMetaData = indexUuidToIndexMetadata[indexUuid] - val clusterStateMetadata = indexMetaData?.getManagedIndexMetadata() val mgetFailure = metadataMap[managedIndexMetadataID(indexUuid)]?.second val managedIndexMetadata: ManagedIndexMetaData? = metadataMap[managedIndexMetadataID(indexUuid)]?.first when { @@ -233,11 +229,7 @@ class TransportRetryFailedManagedIndexAction @Inject constructor( mgetFailure != null -> failedIndices.add(FailedIndex(indexName, indexUuid, "Failed to get managed index metadata, $mgetFailure")) managedIndexMetadata == null -> { - if (clusterStateMetadata != null) { - failedIndices.add(FailedIndex(indexName, indexUuid, "Cannot retry until metadata has finished migrating")) - } else { - failedIndices.add(FailedIndex(indexName, indexUuid, "This index has no metadata information")) - } + failedIndices.add(FailedIndex(indexName, indexUuid, "This index has no metadata information")) } !managedIndexMetadata.isFailed -> failedIndices.add(FailedIndex(indexName, indexUuid, "This index is not in failed state.")) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/updateindexmetadata/TransportUpdateManagedIndexMetaDataAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/updateindexmetadata/TransportUpdateManagedIndexMetaDataAction.kt deleted file mode 100644 index 8d9f4f731..000000000 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/updateindexmetadata/TransportUpdateManagedIndexMetaDataAction.kt +++ /dev/null @@ -1,158 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.indexmanagement.indexstatemanagement.transport.action.updateindexmetadata - -import org.apache.logging.log4j.LogManager -import org.opensearch.core.action.ActionListener -import org.opensearch.action.support.ActionFilters -import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction -import org.opensearch.action.support.master.AcknowledgedResponse -import org.opensearch.cluster.ClusterState -import org.opensearch.cluster.ClusterStateTaskConfig -import org.opensearch.cluster.ClusterStateTaskExecutor -import org.opensearch.cluster.ClusterStateTaskExecutor.ClusterTasksResult -import org.opensearch.cluster.ClusterStateTaskListener -import org.opensearch.cluster.block.ClusterBlockException -import org.opensearch.cluster.block.ClusterBlockLevel -import org.opensearch.cluster.metadata.IndexMetadata -import org.opensearch.cluster.metadata.IndexNameExpressionResolver -import org.opensearch.cluster.metadata.Metadata -import org.opensearch.cluster.service.ClusterService -import org.opensearch.common.Priority -import org.opensearch.common.inject.Inject -import org.opensearch.core.common.io.stream.StreamInput -import org.opensearch.core.common.io.stream.Writeable -import org.opensearch.core.index.Index -import org.opensearch.indexmanagement.IndexManagementPlugin -import org.opensearch.indexmanagement.indexstatemanagement.IndexMetadataProvider -import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData -import org.opensearch.threadpool.ThreadPool -import org.opensearch.transport.TransportService - -class TransportUpdateManagedIndexMetaDataAction @Inject constructor( - threadPool: ThreadPool, - clusterService: ClusterService, - transportService: TransportService, - actionFilters: ActionFilters, - val indexMetadataProvider: IndexMetadataProvider, - indexNameExpressionResolver: IndexNameExpressionResolver -) : TransportClusterManagerNodeAction( - UpdateManagedIndexMetaDataAction.INSTANCE.name(), - transportService, - clusterService, - threadPool, - actionFilters, - Writeable.Reader { UpdateManagedIndexMetaDataRequest(it) }, - indexNameExpressionResolver -) { - - private val log = LogManager.getLogger(javaClass) - private val executor = ManagedIndexMetaDataExecutor() - - override fun checkBlock(request: UpdateManagedIndexMetaDataRequest, state: ClusterState): ClusterBlockException? { - // https://github.com/elastic/elasticsearch/commit/ae14b4e6f96b554ca8f4aaf4039b468f52df0123 - // This commit will help us to give each individual index name and the error that is cause it. For now it will be a generic error message. - val indicesToAddTo = request.indicesToAddManagedIndexMetaDataTo.map { it.first }.toTypedArray() - val indicesToRemoveFrom = request.indicesToRemoveManagedIndexMetaDataFrom.map { it }.toTypedArray() - val indices = checkExtensionsOverrideBlock(indicesToAddTo + indicesToRemoveFrom, state) - - return state.blocks.indicesBlockedException(ClusterBlockLevel.METADATA_WRITE, indices) - } - - /* - * Index Management extensions may provide an index setting, which, if set to true, overrides the cluster metadata write block - */ - private fun checkExtensionsOverrideBlock(indices: Array, state: ClusterState): Array { - val indexBlockOverrideSettings = indexMetadataProvider.getIndexMetadataWriteOverrideSettings() - val indicesToBlock = indices.toMutableList() - indexBlockOverrideSettings.forEach { indexBlockOverrideSetting -> - indicesToBlock.removeIf { state.metadata.getIndexSafe(it).settings.getAsBoolean(indexBlockOverrideSetting, false) } - } - return indicesToBlock - .map { it.name } - .toTypedArray() - } - - override fun clusterManagerOperation( - request: UpdateManagedIndexMetaDataRequest, - state: ClusterState, - listener: ActionListener - ) { - clusterService.submitStateUpdateTask( - IndexManagementPlugin.OLD_PLUGIN_NAME, - ManagedIndexMetaDataTask(request.indicesToAddManagedIndexMetaDataTo, request.indicesToRemoveManagedIndexMetaDataFrom), - ClusterStateTaskConfig.build(Priority.NORMAL), - executor, - object : ClusterStateTaskListener { - override fun onFailure(source: String, e: Exception) = listener.onFailure(e) - - override fun clusterStateProcessed(source: String, oldState: ClusterState, newState: ClusterState) = - listener.onResponse(AcknowledgedResponse(true)) - } - ) - } - - override fun read(si: StreamInput): AcknowledgedResponse { - return AcknowledgedResponse(si) - } - - override fun executor(): String { - return ThreadPool.Names.SAME - } - - inner class ManagedIndexMetaDataExecutor : ClusterStateTaskExecutor { - - override fun execute(currentState: ClusterState, tasks: List): ClusterTasksResult { - val newClusterState = getUpdatedClusterState(currentState, tasks) - return ClusterTasksResult.builder().successes(tasks).build(newClusterState) - } - } - - fun getUpdatedClusterState(currentState: ClusterState, tasks: List): ClusterState { - // If there are no indices to make changes to, return early. - // Also doing this because when creating a metaDataBuilder and making no changes to it, for some - // reason the task does not complete, leading to indefinite suspension. - if (tasks.all { it.indicesToAddManagedIndexMetaDataTo.isEmpty() && it.indicesToRemoveManagedIndexMetaDataFrom.isEmpty() } - ) { - return currentState - } - log.trace("Start of building new cluster state") - val metaDataBuilder = Metadata.builder(currentState.metadata) - for (task in tasks) { - for (pair in task.indicesToAddManagedIndexMetaDataTo) { - if (currentState.metadata.hasIndex(pair.first.name)) { - metaDataBuilder.put( - IndexMetadata.builder(currentState.metadata.index(pair.first)) - .putCustom(ManagedIndexMetaData.MANAGED_INDEX_METADATA_TYPE, pair.second.toMap()) - ) - } else { - log.debug("No IndexMetadata found for [${pair.first.name}] when updating ManagedIndexMetaData") - } - } - - for (index in task.indicesToRemoveManagedIndexMetaDataFrom) { - if (currentState.metadata.hasIndex(index.name)) { - val indexMetaDataBuilder = IndexMetadata.builder(currentState.metadata.index(index)) - indexMetaDataBuilder.removeCustom(ManagedIndexMetaData.MANAGED_INDEX_METADATA_TYPE) - - metaDataBuilder.put(indexMetaDataBuilder) - } else { - log.debug("No IndexMetadata found for [${index.name}] when removing ManagedIndexMetaData") - } - } - } - log.trace("End of building new cluster state") - - return ClusterState.builder(currentState).metadata(metaDataBuilder).build() - } - - companion object { - data class ManagedIndexMetaDataTask( - val indicesToAddManagedIndexMetaDataTo: List>, - val indicesToRemoveManagedIndexMetaDataFrom: List - ) - } -} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/updateindexmetadata/UpdateManagedIndexMetaDataAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/updateindexmetadata/UpdateManagedIndexMetaDataAction.kt deleted file mode 100644 index 7078f2d59..000000000 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/updateindexmetadata/UpdateManagedIndexMetaDataAction.kt +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.indexmanagement.indexstatemanagement.transport.action.updateindexmetadata - -import org.opensearch.action.ActionType -import org.opensearch.action.support.master.AcknowledgedResponse -import org.opensearch.core.common.io.stream.Writeable - -class UpdateManagedIndexMetaDataAction : ActionType(NAME, reader) { - - companion object { - const val NAME = "cluster:admin/ism/update/managedindexmetadata" - val INSTANCE = UpdateManagedIndexMetaDataAction() - - val reader = Writeable.Reader { AcknowledgedResponse(it) } - } - - override fun getResponseReader(): Writeable.Reader = reader -} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/updateindexmetadata/UpdateManagedIndexMetaDataRequest.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/updateindexmetadata/UpdateManagedIndexMetaDataRequest.kt deleted file mode 100644 index 5d5d31b69..000000000 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/updateindexmetadata/UpdateManagedIndexMetaDataRequest.kt +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.indexmanagement.indexstatemanagement.transport.action.updateindexmetadata - -import org.opensearch.action.ActionRequestValidationException -import org.opensearch.action.ValidateActions.addValidationError -import org.opensearch.action.support.master.AcknowledgedRequest -import org.opensearch.core.common.io.stream.StreamInput -import org.opensearch.core.common.io.stream.StreamOutput -import org.opensearch.core.index.Index -import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData - -class UpdateManagedIndexMetaDataRequest : AcknowledgedRequest { - - var indicesToAddManagedIndexMetaDataTo: List> - private set - - var indicesToRemoveManagedIndexMetaDataFrom: List - private set - - constructor(si: StreamInput) : super(si) { - indicesToAddManagedIndexMetaDataTo = si.readList { - val index = Index(it) - val managedIndexMetaData = ManagedIndexMetaData.fromStreamInput(it) - Pair(index, managedIndexMetaData) - } - - indicesToRemoveManagedIndexMetaDataFrom = si.readList { Index(it) } - } - - constructor( - indicesToAddManagedIndexMetaDataTo: List> = listOf(), - indicesToRemoveManagedIndexMetaDataFrom: List = listOf() - ) { - this.indicesToAddManagedIndexMetaDataTo = indicesToAddManagedIndexMetaDataTo - this.indicesToRemoveManagedIndexMetaDataFrom = indicesToRemoveManagedIndexMetaDataFrom - } - - override fun validate(): ActionRequestValidationException? { - var validationException: ActionRequestValidationException? = null - - if (this.indicesToAddManagedIndexMetaDataTo.isEmpty() && this.indicesToRemoveManagedIndexMetaDataFrom.isEmpty()) { - validationException = addValidationError( - "At least one non-empty List must be given for UpdateManagedIndexMetaDataRequest", - validationException - ) - } - - return validationException - } - - override fun writeTo(streamOutput: StreamOutput) { - super.writeTo(streamOutput) - - streamOutput.writeCollection(indicesToAddManagedIndexMetaDataTo) { so, pair -> - pair.first.writeTo(so) - pair.second.writeTo(so) - } - - streamOutput.writeCollection(indicesToRemoveManagedIndexMetaDataFrom) { so, index -> - index.writeTo(so) - } - } -} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt index 309ce9931..ee2762375 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt @@ -7,11 +7,8 @@ @file:JvmName("ManagedIndexUtils") package org.opensearch.indexmanagement.indexstatemanagement.util -// import inet.ipaddr.IPAddressString import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.withContext -// import org.apache.logging.log4j.LogManager -import org.apache.logging.log4j.Logger import org.opensearch.action.delete.DeleteRequest import org.opensearch.action.get.GetRequest import org.opensearch.action.get.GetResponse @@ -19,7 +16,6 @@ import org.opensearch.action.index.IndexRequest import org.opensearch.action.search.SearchRequest import org.opensearch.action.support.WriteRequest import org.opensearch.action.update.UpdateRequest -// import org.opensearch.alerting.destination.message.BaseMessage import org.opensearch.client.Client import org.opensearch.cluster.routing.Preference import org.opensearch.core.common.unit.ByteSizeValue @@ -38,14 +34,12 @@ import org.opensearch.indexmanagement.indexstatemanagement.action.DeleteAction import org.opensearch.indexmanagement.indexstatemanagement.action.RolloverAction import org.opensearch.indexmanagement.indexstatemanagement.action.TransitionsAction import org.opensearch.indexmanagement.indexstatemanagement.model.ChangePolicy -import org.opensearch.indexmanagement.indexstatemanagement.model.ISMTemplate import org.opensearch.indexmanagement.indexstatemanagement.model.ManagedIndexConfig import org.opensearch.indexmanagement.indexstatemanagement.model.Policy import org.opensearch.indexmanagement.indexstatemanagement.model.State import org.opensearch.indexmanagement.indexstatemanagement.model.Transition import org.opensearch.indexmanagement.indexstatemanagement.model.coordinator.SweptManagedIndexConfig import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings -import org.opensearch.indexmanagement.opensearchapi.optionalISMTemplateField import org.opensearch.indexmanagement.opensearchapi.optionalTimeField import org.opensearch.indexmanagement.opensearchapi.parseWithType import org.opensearch.indexmanagement.opensearchapi.suspendUntil @@ -138,17 +132,6 @@ private fun updateEnabledField(uuid: String, enabled: Boolean, enabledTime: Long return UpdateRequest(INDEX_MANAGEMENT_INDEX, uuid).doc(builder) } -fun updateISMTemplateRequest(policyID: String, ismTemplates: List, seqNo: Long, primaryTerm: Long): UpdateRequest { - val builder = XContentFactory.jsonBuilder() - .startObject() - .startObject(Policy.POLICY_TYPE) - .optionalISMTemplateField(Policy.ISM_TEMPLATE, ismTemplates) - .endObject() - .endObject() - return UpdateRequest(INDEX_MANAGEMENT_INDEX, policyID).doc(builder) - .setIfSeqNo(seqNo).setIfPrimaryTerm(primaryTerm) -} - fun updateDisableManagedIndexRequest(uuid: String): UpdateRequest { return updateEnabledField(uuid, false, null) } @@ -455,7 +438,7 @@ fun ManagedIndexConfig.hasDifferentJobInterval(jobInterval: Int): Boolean { * @param stateName the name of the state the [ManagedIndexConfig] is currently in * @param newPolicy the new (actual data model) policy we will eventually try to change to * @param changePolicy the change policy to change to - * @return if its safe to change + * @return if it's safe to change */ @Suppress("ReturnCount") fun Policy.isSafeToChange(stateName: String?, newPolicy: Policy, changePolicy: ChangePolicy): Boolean { @@ -486,71 +469,6 @@ fun Policy.isSafeToChange(stateName: String?, newPolicy: Policy, changePolicy: C */ fun Action.isAllowed(allowList: List): Boolean = allowList.contains(this.type) -/** - * Check if cluster state metadata has been moved to config index - * - * log warning if remaining cluster state metadata has newer last_updated_time - */ -@Suppress("ReturnCount", "ComplexCondition", "ComplexMethod") -fun checkMetadata( - clusterStateMetadata: ManagedIndexMetaData?, - configIndexMetadata: Any?, - currentIndexUuid: String?, - logger: Logger -): MetadataCheck { - // indexUuid saved in ISM metadata may be outdated - // if an index restored from snapshot - val indexUuid1 = clusterStateMetadata?.indexUuid - val indexUuid2 = when (configIndexMetadata) { - is ManagedIndexMetaData -> configIndexMetadata.indexUuid - is Map<*, *> -> configIndexMetadata["index_uuid"] - else -> null - } as String? - if ((indexUuid1 != null && indexUuid1 != currentIndexUuid) || - (indexUuid2 != null && indexUuid2 != currentIndexUuid) - ) { - return MetadataCheck.CORRUPT - } - - if (clusterStateMetadata != null) { - if (configIndexMetadata == null) return MetadataCheck.PENDING - - // compare last updated time between 2 metadatas - val t1 = clusterStateMetadata.stepMetaData?.startTime - val t2 = when (configIndexMetadata) { - is ManagedIndexMetaData -> configIndexMetadata.stepMetaData?.startTime - is Map<*, *> -> { - @Suppress("UNCHECKED_CAST") - val stepMetadata = configIndexMetadata["step"] as Map? - stepMetadata?.get("start_time") - } - else -> null - } as Long? - if (t1 != null && t2 != null && t1 > t2) { - logger.warn("Cluster state metadata get updates after moved for [${clusterStateMetadata.index}]") - } - } - return MetadataCheck.SUCCESS -} - -enum class MetadataCheck { - PENDING, CORRUPT, SUCCESS -} - -// private val baseMessageLogger = LogManager.getLogger(BaseMessage::class.java) -// -// fun BaseMessage.isHostInDenylist(networks: List): Boolean { -// val ipStr = IPAddressString(this.uri.host) -// for (network in networks) { -// val netStr = IPAddressString(network) -// if (netStr.contains(ipStr)) { -// baseMessageLogger.error("Host: {} resolves to: {} which is in denylist: {}.", uri.host, InetAddress.getByName(uri.host), netStr) -// return true -// } -// } -// return false -// } - @Suppress("BlockingMethodInNonBlockingContext") suspend fun getManagedIndexConfig(indexUuid: String, client: Client): ManagedIndexConfig? { val request = GetRequest().routing(indexUuid).index(INDEX_MANAGEMENT_INDEX).id(indexUuid) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/RestHandlerUtils.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/RestHandlerUtils.kt index 5fe439716..4442ccc82 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/RestHandlerUtils.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/RestHandlerUtils.kt @@ -6,11 +6,8 @@ @file:Suppress("TopLevelPropertyNaming", "MatchingDeclarationName") package org.opensearch.indexmanagement.indexstatemanagement.util -import org.apache.logging.log4j.Logger import org.opensearch.OpenSearchParseException import org.opensearch.action.support.clustermanager.ClusterManagerNodeRequest -import org.opensearch.action.support.master.AcknowledgedResponse -import org.opensearch.client.Client import org.opensearch.core.common.io.stream.StreamInput import org.opensearch.core.common.io.stream.StreamOutput import org.opensearch.core.common.io.stream.Writeable @@ -20,15 +17,10 @@ import org.opensearch.core.xcontent.ToXContent import org.opensearch.core.xcontent.ToXContentFragment import org.opensearch.core.xcontent.XContentBuilder import org.opensearch.common.xcontent.XContentFactory -import org.opensearch.core.index.Index import org.opensearch.indexmanagement.indexstatemanagement.model.ChangePolicy import org.opensearch.indexmanagement.indexstatemanagement.model.ManagedIndexConfig -import org.opensearch.indexmanagement.indexstatemanagement.transport.action.updateindexmetadata.UpdateManagedIndexMetaDataAction -import org.opensearch.indexmanagement.indexstatemanagement.transport.action.updateindexmetadata.UpdateManagedIndexMetaDataRequest import org.opensearch.indexmanagement.opensearchapi.optionalTimeField -import org.opensearch.indexmanagement.opensearchapi.suspendUntil import org.opensearch.rest.RestRequest -import java.lang.Exception import java.time.Instant const val WITH_TYPE = "with_type" @@ -121,20 +113,6 @@ fun getPartialChangePolicyBuilder( return builder.endObject().endObject() } -/** - * Removes the managed index metadata from the cluster state for the the provided indices. - */ -suspend fun removeClusterStateMetadatas(client: Client, logger: Logger, indices: List) { - val request = UpdateManagedIndexMetaDataRequest(indicesToRemoveManagedIndexMetaDataFrom = indices) - - try { - val response: AcknowledgedResponse = client.suspendUntil { execute(UpdateManagedIndexMetaDataAction.INSTANCE, request, it) } - logger.debug("Cleaned cluster state metadata for $indices, ${response.isAcknowledged}") - } catch (e: Exception) { - logger.error("Failed to clean cluster state metadata for $indices") - } -} - const val MASTER_TIMEOUT_DEPRECATED_MESSAGE = "Parameter [master_timeout] is deprecated and will be removed in 3.0. " + "To support inclusive language, please use [cluster_manager_timeout] instead." diff --git a/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementSettingsTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementSettingsTests.kt index 2bf14f914..41bb1662d 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementSettingsTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementSettingsTests.kt @@ -41,7 +41,6 @@ class IndexManagementSettingsTests : OpenSearchTestCase() { LegacyOpenDistroManagedIndexSettings.ROLLOVER_ALIAS, LegacyOpenDistroManagedIndexSettings.ROLLOVER_SKIP, LegacyOpenDistroManagedIndexSettings.INDEX_STATE_MANAGEMENT_ENABLED, - LegacyOpenDistroManagedIndexSettings.METADATA_SERVICE_ENABLED, LegacyOpenDistroManagedIndexSettings.JOB_INTERVAL, LegacyOpenDistroManagedIndexSettings.SWEEP_PERIOD, LegacyOpenDistroManagedIndexSettings.COORDINATOR_BACKOFF_COUNT, @@ -78,7 +77,6 @@ class IndexManagementSettingsTests : OpenSearchTestCase() { ManagedIndexSettings.POLICY_ID, ManagedIndexSettings.ROLLOVER_ALIAS, ManagedIndexSettings.INDEX_STATE_MANAGEMENT_ENABLED, - ManagedIndexSettings.METADATA_SERVICE_ENABLED, ManagedIndexSettings.JOB_INTERVAL, ManagedIndexSettings.SWEEP_PERIOD, ManagedIndexSettings.COORDINATOR_BACKOFF_COUNT, @@ -159,7 +157,6 @@ class IndexManagementSettingsTests : OpenSearchTestCase() { .build() assertEquals(ManagedIndexSettings.INDEX_STATE_MANAGEMENT_ENABLED.get(settings), false) - assertEquals(ManagedIndexSettings.METADATA_SERVICE_ENABLED.get(settings), false) assertEquals(ManagedIndexSettings.JOB_INTERVAL.get(settings), 1) assertEquals(ManagedIndexSettings.SWEEP_PERIOD.get(settings), TimeValue.timeValueMinutes(6)) assertEquals(ManagedIndexSettings.COORDINATOR_BACKOFF_MILLIS.get(settings), TimeValue.timeValueMillis(1)) @@ -186,7 +183,6 @@ class IndexManagementSettingsTests : OpenSearchTestCase() { assertSettingDeprecationsAndWarnings( arrayOf( LegacyOpenDistroManagedIndexSettings.INDEX_STATE_MANAGEMENT_ENABLED, - LegacyOpenDistroManagedIndexSettings.METADATA_SERVICE_ENABLED, LegacyOpenDistroManagedIndexSettings.JOB_INTERVAL, LegacyOpenDistroManagedIndexSettings.SWEEP_PERIOD, LegacyOpenDistroManagedIndexSettings.COORDINATOR_BACKOFF_MILLIS, diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/coordinator/ManagedIndexCoordinatorTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/coordinator/ManagedIndexCoordinatorTests.kt index 59464d152..178e93918 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/coordinator/ManagedIndexCoordinatorTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/coordinator/ManagedIndexCoordinatorTests.kt @@ -59,8 +59,6 @@ class ManagedIndexCoordinatorTests : OpenSearchAllocationTestCase() { settingSet.add(ManagedIndexSettings.JITTER) settingSet.add(ManagedIndexSettings.JOB_INTERVAL) settingSet.add(ManagedIndexSettings.INDEX_STATE_MANAGEMENT_ENABLED) - settingSet.add(ManagedIndexSettings.METADATA_SERVICE_STATUS) - settingSet.add(ManagedIndexSettings.TEMPLATE_MIGRATION_CONTROL) settingSet.add(ManagedIndexSettings.COORDINATOR_BACKOFF_COUNT) settingSet.add(ManagedIndexSettings.COORDINATOR_BACKOFF_MILLIS) settingSet.add(ManagedIndexSettings.RESTRICTED_INDEX_PATTERN)