Skip to content

Commit

Permalink
Remove the old stale cluster state ism metadata logic
Browse files Browse the repository at this point in the history
Signed-off-by: bowenlan-amzn <[email protected]>
  • Loading branch information
bowenlan-amzn committed Oct 1, 2023
1 parent a2dd769 commit aa09f85
Show file tree
Hide file tree
Showing 18 changed files with 35 additions and 523 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,6 @@ import org.opensearch.indexmanagement.indexstatemanagement.transport.action.remo
import org.opensearch.indexmanagement.indexstatemanagement.transport.action.removepolicy.TransportRemovePolicyAction
import org.opensearch.indexmanagement.indexstatemanagement.transport.action.retryfailedmanagedindex.RetryFailedManagedIndexAction
import org.opensearch.indexmanagement.indexstatemanagement.transport.action.retryfailedmanagedindex.TransportRetryFailedManagedIndexAction
import org.opensearch.indexmanagement.indexstatemanagement.transport.action.updateindexmetadata.TransportUpdateManagedIndexMetaDataAction
import org.opensearch.indexmanagement.indexstatemanagement.transport.action.updateindexmetadata.UpdateManagedIndexMetaDataAction
import org.opensearch.indexmanagement.indexstatemanagement.util.DEFAULT_INDEX_TYPE
import org.opensearch.indexmanagement.indexstatemanagement.validation.ActionValidation
import org.opensearch.indexmanagement.refreshanalyzer.RefreshSearchAnalyzerAction
Expand Down Expand Up @@ -500,10 +498,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
ManagedIndexSettings.ROLLOVER_SKIP,
ManagedIndexSettings.INDEX_STATE_MANAGEMENT_ENABLED,
ManagedIndexSettings.ACTION_VALIDATION_ENABLED,
ManagedIndexSettings.METADATA_SERVICE_ENABLED,
ManagedIndexSettings.AUTO_MANAGE,
ManagedIndexSettings.METADATA_SERVICE_STATUS,
ManagedIndexSettings.TEMPLATE_MIGRATION_CONTROL,
ManagedIndexSettings.JITTER,
ManagedIndexSettings.JOB_INTERVAL,
ManagedIndexSettings.SWEEP_PERIOD,
Expand Down Expand Up @@ -540,16 +535,13 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
LegacyOpenDistroManagedIndexSettings.ROLLOVER_ALIAS,
LegacyOpenDistroManagedIndexSettings.ROLLOVER_SKIP,
LegacyOpenDistroManagedIndexSettings.INDEX_STATE_MANAGEMENT_ENABLED,
LegacyOpenDistroManagedIndexSettings.METADATA_SERVICE_ENABLED,
LegacyOpenDistroManagedIndexSettings.JOB_INTERVAL,
LegacyOpenDistroManagedIndexSettings.SWEEP_PERIOD,
LegacyOpenDistroManagedIndexSettings.COORDINATOR_BACKOFF_COUNT,
LegacyOpenDistroManagedIndexSettings.COORDINATOR_BACKOFF_MILLIS,
LegacyOpenDistroManagedIndexSettings.ALLOW_LIST,
LegacyOpenDistroManagedIndexSettings.SNAPSHOT_DENY_LIST,
LegacyOpenDistroManagedIndexSettings.AUTO_MANAGE,
LegacyOpenDistroManagedIndexSettings.METADATA_SERVICE_STATUS,
LegacyOpenDistroManagedIndexSettings.TEMPLATE_MIGRATION_CONTROL,
LegacyOpenDistroManagedIndexSettings.RESTRICTED_INDEX_PATTERN,
LegacyOpenDistroRollupSettings.ROLLUP_INGEST_BACKOFF_COUNT,
LegacyOpenDistroRollupSettings.ROLLUP_INGEST_BACKOFF_MILLIS,
Expand All @@ -565,7 +557,6 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin

override fun getActions(): List<ActionPlugin.ActionHandler<out ActionRequest, out ActionResponse>> {
return listOf(
ActionPlugin.ActionHandler(UpdateManagedIndexMetaDataAction.INSTANCE, TransportUpdateManagedIndexMetaDataAction::class.java),
ActionPlugin.ActionHandler(RemovePolicyAction.INSTANCE, TransportRemovePolicyAction::class.java),
ActionPlugin.ActionHandler(RefreshSearchAnalyzerAction.INSTANCE, TransportRefreshSearchAnalyzerAction::class.java),
ActionPlugin.ActionHandler(AddPolicyAction.INSTANCE, TransportAddPolicyAction::class.java),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,6 @@ class ManagedIndexCoordinator(
private val ismIndices = indexManagementIndices

private var scheduledFullSweep: Scheduler.Cancellable? = null
private var scheduledMoveMetadata: Scheduler.Cancellable? = null
private var scheduledTemplateMigration: Scheduler.Cancellable? = null

@Volatile private var lastFullSweepTimeNano = System.nanoTime()
@Volatile private var indexStateManagementEnabled = INDEX_STATE_MANAGEMENT_ENABLED.get(settings)
Expand Down Expand Up @@ -170,10 +168,6 @@ class ManagedIndexCoordinator(
fun offClusterManager() {
// Cancel background sweep when demoted from being cluster manager
scheduledFullSweep?.cancel()

scheduledMoveMetadata?.cancel()

scheduledTemplateMigration?.cancel()
}

override fun clusterChanged(event: ClusterChangedEvent) {
Expand Down Expand Up @@ -206,8 +200,6 @@ class ManagedIndexCoordinator(

override fun beforeStop() {
scheduledFullSweep?.cancel()

scheduledMoveMetadata?.cancel()
}

private fun enable() {
Expand All @@ -229,8 +221,6 @@ class ManagedIndexCoordinator(
private fun disable() {
scheduledFullSweep?.cancel()
indexStateManagementEnabled = false

scheduledMoveMetadata?.cancel()
}

private suspend fun reenableJobs() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,12 @@ import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndex
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.JOB_INTERVAL
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.ACTION_VALIDATION_ENABLED
import org.opensearch.indexmanagement.indexstatemanagement.util.DEFAULT_INDEX_TYPE
import org.opensearch.indexmanagement.indexstatemanagement.util.MetadataCheck
import org.opensearch.indexmanagement.indexstatemanagement.util.checkMetadata
import org.opensearch.indexmanagement.indexstatemanagement.util.deleteManagedIndexMetadataRequest
import org.opensearch.indexmanagement.indexstatemanagement.util.deleteManagedIndexRequest
import org.opensearch.indexmanagement.indexstatemanagement.util.getCompletedManagedIndexMetaData
import org.opensearch.indexmanagement.indexstatemanagement.util.getStartingManagedIndexMetaData
import org.opensearch.indexmanagement.indexstatemanagement.util.hasDifferentJobInterval
import org.opensearch.indexmanagement.indexstatemanagement.util.hasTimedOut
import org.opensearch.indexmanagement.indexstatemanagement.util.hasVersionConflict
import org.opensearch.indexmanagement.indexstatemanagement.util.isAllowed
import org.opensearch.indexmanagement.indexstatemanagement.util.isFailed
import org.opensearch.indexmanagement.indexstatemanagement.util.isSafeToChange
Expand Down Expand Up @@ -98,6 +95,7 @@ import org.opensearch.jobscheduler.spi.ScheduledJobParameter
import org.opensearch.jobscheduler.spi.ScheduledJobRunner
import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule
import org.opensearch.core.rest.RestStatus
import org.opensearch.indexmanagement.indexstatemanagement.util.hasVersionConflict
import org.opensearch.script.Script
import org.opensearch.script.ScriptService
import org.opensearch.script.TemplateScript
Expand Down Expand Up @@ -281,13 +279,6 @@ object ManagedIndexRunner :
logger.warn("Failed to find IndexMetadata for ${managedIndexConfig.index}.")
return
}
} else {
val clusterStateMetadata = clusterStateIndexMetadata.getManagedIndexMetadata()
val metadataCheck = checkMetadata(clusterStateMetadata, managedIndexMetaData, managedIndexConfig.indexUuid, logger)
if (metadataCheck != MetadataCheck.SUCCESS) {
logger.info("Skipping execution while metadata status is $metadataCheck")
return
}
}

// If policy or managedIndexMetaData is null then initialize
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,6 @@ fun IndexMetadata.getRolloverSkip(): Boolean {
return this.settings.getAsBoolean(ManagedIndexSettings.ROLLOVER_SKIP.key, false)
}

fun IndexMetadata.getManagedIndexMetadata(): ManagedIndexMetaData? {
val existingMetaDataMap = this.getCustomData(ManagedIndexMetaData.MANAGED_INDEX_METADATA_TYPE)

if (existingMetaDataMap != null) {
return ManagedIndexMetaData.fromMap(existingMetaDataMap)
}
return null
}

fun getUuidsForClosedIndices(state: ClusterState, defaultIndexMetadataService: DefaultIndexMetadataService): MutableList<String> {
val indexMetadatas = state.metadata.indices
val closeList = mutableListOf<String>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ import java.util.function.Function
class LegacyOpenDistroManagedIndexSettings {
companion object {
const val DEFAULT_ISM_ENABLED = true
const val DEFAULT_METADATA_SERVICE_STATUS = 0
const val DEFAULT_METADATA_SERVICE_ENABLED = true
const val DEFAULT_JOB_INTERVAL = 5
private val ALLOW_LIST_ALL = ISMActionsParser.instance.parsers.map { it.getActionType() }.toList()
val ALLOW_LIST_NONE = emptyList<String>()
Expand All @@ -30,36 +28,6 @@ class LegacyOpenDistroManagedIndexSettings {
Setting.Property.Deprecated
)

// 0: migration is going on
// 1: migration succeed
// -1: migration failed
val METADATA_SERVICE_STATUS: Setting<Int> = Setting.intSetting(
"opendistro.index_state_management.metadata_migration.status",
DEFAULT_METADATA_SERVICE_STATUS,
Setting.Property.NodeScope,
Setting.Property.Dynamic
)

// 0: enabled, use onClusterManager time as ISM template last_updated_time
// -1: migration ended successfully
// -2: migration ended unsuccessfully
// >0: use this setting (epoch millis) as ISM template last_updated_time
val TEMPLATE_MIGRATION_CONTROL: Setting<Long> = Setting.longSetting(
"opendistro.index_state_management.template_migration.control",
ManagedIndexSettings.DEFAULT_TEMPLATE_MIGRATION_TIMESTAMP,
-2L,
Setting.Property.NodeScope,
Setting.Property.Dynamic
)

val METADATA_SERVICE_ENABLED: Setting<Boolean> = Setting.boolSetting(
"opendistro.index_state_management.metadata_service.enabled",
DEFAULT_METADATA_SERVICE_ENABLED,
Setting.Property.NodeScope,
Setting.Property.Dynamic,
Setting.Property.Deprecated
)

val POLICY_ID: Setting<String> = Setting.simpleString(
"index.opendistro.index_state_management.policy_id",
Setting.Property.IndexScope,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ class ManagedIndexSettings {
companion object {
const val DEFAULT_ISM_ENABLED = true
const val DEFAULT_ACTION_VALIDATION_ENABLED = false
const val DEFAULT_TEMPLATE_MIGRATION_TIMESTAMP = 0L
const val DEFAULT_JOB_INTERVAL = 5
const val DEFAULT_JITTER = 0.6
const val DEFAULT_RESTRICTED_PATTERN = "\\.opendistro_security|\\.kibana.*|\\$INDEX_MANAGEMENT_INDEX"
Expand All @@ -36,35 +35,6 @@ class ManagedIndexSettings {
Setting.Property.Dynamic
)

// 0: migration is going on
// 1: migration succeed
// -1: migration failed
val METADATA_SERVICE_STATUS: Setting<Int> = Setting.intSetting(
"plugins.index_state_management.metadata_migration.status",
LegacyOpenDistroManagedIndexSettings.METADATA_SERVICE_STATUS,
Setting.Property.NodeScope,
Setting.Property.Dynamic
)

// 0: enabled, use onClusterManager time as ISM template last_updated_time
// -1: migration ended successfully
// -2: migration ended unsuccessfully
// >0: use this setting (epoch millis) as ISM template last_updated_time
val TEMPLATE_MIGRATION_CONTROL: Setting<Long> = Setting.longSetting(
"plugins.index_state_management.template_migration.control",
LegacyOpenDistroManagedIndexSettings.TEMPLATE_MIGRATION_CONTROL,
-2L,
Setting.Property.NodeScope,
Setting.Property.Dynamic
)

val METADATA_SERVICE_ENABLED: Setting<Boolean> = Setting.boolSetting(
"plugins.index_state_management.metadata_service.enabled",
LegacyOpenDistroManagedIndexSettings.METADATA_SERVICE_ENABLED,
Setting.Property.NodeScope,
Setting.Property.Dynamic
)

val POLICY_ID: Setting<String> = Setting.simpleString(
"index.plugins.index_state_management.policy_id",
LegacyOpenDistroManagedIndexSettings.POLICY_ID,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ import org.opensearch.common.settings.Settings
import org.opensearch.common.unit.TimeValue
import org.opensearch.commons.ConfigConstants
import org.opensearch.commons.authuser.User
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.core.index.Index
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX
import org.opensearch.indexmanagement.indexstatemanagement.DefaultIndexMetadataService
import org.opensearch.indexmanagement.indexstatemanagement.IndexMetadataProvider
Expand All @@ -49,7 +49,6 @@ import org.opensearch.indexmanagement.indexstatemanagement.transport.action.mana
import org.opensearch.indexmanagement.indexstatemanagement.util.DEFAULT_INDEX_TYPE
import org.opensearch.indexmanagement.indexstatemanagement.util.FailedIndex
import org.opensearch.indexmanagement.indexstatemanagement.util.managedIndexConfigIndexRequest
import org.opensearch.indexmanagement.indexstatemanagement.util.removeClusterStateMetadatas
import org.opensearch.indexmanagement.opensearchapi.IndexManagementSecurityContext
import org.opensearch.indexmanagement.opensearchapi.parseFromGetResponse
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
Expand All @@ -61,6 +60,7 @@ import org.opensearch.indexmanagement.util.SecurityUtils.Companion.buildUser
import org.opensearch.indexmanagement.util.SecurityUtils.Companion.userHasPermissionForResource
import org.opensearch.indexmanagement.util.SecurityUtils.Companion.validateUserConfiguration
import org.opensearch.core.rest.RestStatus
import org.opensearch.indexmanagement.indexstatemanagement.util.deleteManagedIndexMetadataRequest
import org.opensearch.tasks.Task
import org.opensearch.transport.TransportService
import java.time.Duration
Expand Down Expand Up @@ -126,7 +126,7 @@ class TransportAddPolicyAction @Inject constructor(
}

@Suppress("SpreadOperator")
fun getClusterState() {
private fun getClusterState() {
startTime = Instant.now()
CoroutineScope(Dispatchers.IO).launch {
val indexNameToMetadata: MutableMap<String, ISMIndexMetadata> = HashMap()
Expand Down Expand Up @@ -193,10 +193,6 @@ class TransportAddPolicyAction @Inject constructor(
clusterStateRequest,
object : ActionListener<ClusterStateResponse> {
override fun onResponse(response: ClusterStateResponse) {
CoroutineScope(Dispatchers.IO).launch {
removeClusterStateMetadatas(client, log, indicesToAdd.map { Index(it.value, it.key) })
}

val defaultIndexMetadataService = indexMetadataProvider.services[DEFAULT_INDEX_TYPE] as DefaultIndexMetadataService
getUuidsForClosedIndices(response.state, defaultIndexMetadataService).forEach {
failedIndices.add(FailedIndex(indicesToAdd[it] as String, it, "This index is closed"))
Expand Down Expand Up @@ -346,6 +342,9 @@ class TransportAddPolicyAction @Inject constructor(
}
}
actionListener.onResponse(ISMStatusResponse(indicesToAdd.size, failedIndices))

// best effort to clean up ISM metadata
removeMetadatas(indicesToAdd.map { Index(it.value, it.key) })
}

override fun onFailure(t: Exception) {
Expand All @@ -368,6 +367,23 @@ class TransportAddPolicyAction @Inject constructor(
private fun onFailure(t: Exception) {
actionListener.onFailure(ExceptionsHelper.unwrapCause(t) as Exception)
}

fun removeMetadatas(indices: List<Index>) {
val request = indices.map { deleteManagedIndexMetadataRequest(it.uuid) }
val bulkReq = BulkRequest().add(request)
client.bulk(
bulkReq,
object : ActionListener<BulkResponse> {
override fun onResponse(response: BulkResponse) {
log.debug("Successfully cleaned metadata for remove policy indices: {}", indices)
}

override fun onFailure(e: Exception) {
log.error("Failed to clean metadata for remove policy indices.", e)

Check warning on line 382 in src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/addpolicy/TransportAddPolicyAction.kt

View check run for this annotation

Codecov / codecov/patch

src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/addpolicy/TransportAddPolicyAction.kt#L382

Added line #L382 was not covered by tests
}
}
)
}
}

companion object {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ import org.opensearch.indexmanagement.indexstatemanagement.model.ManagedIndexCon
import org.opensearch.indexmanagement.indexstatemanagement.model.Policy
import org.opensearch.indexmanagement.indexstatemanagement.model.coordinator.SweptManagedIndexConfig
import org.opensearch.indexmanagement.indexstatemanagement.opensearchapi.buildMgetMetadataRequest
import org.opensearch.indexmanagement.indexstatemanagement.opensearchapi.getManagedIndexMetadata
import org.opensearch.indexmanagement.indexstatemanagement.opensearchapi.mgetResponseToMap
import org.opensearch.indexmanagement.indexstatemanagement.resthandler.RestChangePolicyAction
import org.opensearch.indexmanagement.indexstatemanagement.transport.action.ISMStatusResponse
Expand Down Expand Up @@ -268,9 +267,6 @@ class TransportChangePolicyAction @Inject constructor(
val includedStates = changePolicy.include.map { it.state }.toSet()

indicesToUpdate.forEach { (indexUuid, indexName) ->
// indexMetaData and clusterStateMetadata will be null for non-default index types
val indexMetaData = indexUuidToIndexMetadata[indexUuid]
val clusterStateMetadata = indexMetaData?.getManagedIndexMetadata()
val mgetFailure = metadataMap[indexUuid]?.second
val managedIndexMetadata: ManagedIndexMetaData? = metadataMap[managedIndexMetadataID(indexUuid)]?.first

Expand All @@ -296,25 +292,16 @@ class TransportChangePolicyAction @Inject constructor(
RestChangePolicyAction.INDEX_IN_TRANSITION
)
)
// else if there is no ManagedIndexMetaData yet then the managed index has not initialized and we can change the policy safely
// else if there is no ManagedIndexMetaData yet then the managed index has not initialized, and we can change the policy safely
managedIndexMetadata == null -> {
if (clusterStateMetadata != null) {
failedIndices.add(
FailedIndex(
indexName, indexUuid,
"Cannot change policy until metadata has finished migrating"
)
)
} else {
managedIndicesToUpdate.add(indexName to indexUuid)
}
managedIndicesToUpdate.add(indexName to indexUuid)
}
// else if the includedStates is empty (i.e. not being used) then we will always try to update the managed index
includedStates.isEmpty() -> managedIndicesToUpdate.add(indexName to indexUuid)
// else only update the managed index if its currently in one of the included states
includedStates.contains(managedIndexMetadata.stateMetaData?.name) ->
managedIndicesToUpdate.add(indexName to indexUuid)
// else the managed index did not match any of the included state filters and we will not update it
// else the managed index did not match any of the included state filters, and we will not update it
else -> log.debug("Skipping $indexName as it does not match any of the include state filters")
}
}
Expand Down
Loading

0 comments on commit aa09f85

Please sign in to comment.