diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt index 1940b99ce..56c328a3d 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt @@ -118,10 +118,13 @@ object ManagedIndexRunner : private lateinit var extensionStatusChecker: ExtensionStatusChecker private lateinit var indexMetadataProvider: IndexMetadataProvider private var indexStateManagementEnabled: Boolean = DEFAULT_ISM_ENABLED + @Suppress("MagicNumber") private val savePolicyRetryPolicy = BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(250), 3) + @Suppress("MagicNumber") private val updateMetaDataRetryPolicy = BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(250), 3) + @Suppress("MagicNumber") private val errorNotificationRetryPolicy = BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(250), 3) private var jobInterval: Int = DEFAULT_JOB_INTERVAL @@ -345,7 +348,8 @@ object ManagedIndexRunner : val info = mapOf("message" to "Previous action was not able to update IndexMetaData.") val updated = updateManagedIndexMetaData( managedIndexMetaData.copy( - policyRetryInfo = PolicyRetryInfoMetaData(true, 0), info = info + policyRetryInfo = PolicyRetryInfoMetaData(true, 0), + info = info ) ) if (updated.metadataSaved) disableManagedIndexConfig(managedIndexConfig) @@ -359,7 +363,8 @@ object ManagedIndexRunner : val info = mapOf("message" to "Failed to execute action=${action?.type} as extension [$actionExtensionName] is not enabled.") val updated = updateManagedIndexMetaData( managedIndexMetaData.copy( - policyRetryInfo = PolicyRetryInfoMetaData(true, 0), info = info + policyRetryInfo = PolicyRetryInfoMetaData(true, 0), + info = info ) ) if (updated.metadataSaved) disableManagedIndexConfig(managedIndexConfig) @@ -372,7 +377,8 @@ object ManagedIndexRunner : val info = mapOf("message" to "Attempted to execute action=${action?.type} which is not allowed.") val updated = updateManagedIndexMetaData( managedIndexMetaData.copy( - policyRetryInfo = PolicyRetryInfoMetaData(true, 0), info = info + policyRetryInfo = PolicyRetryInfoMetaData(true, 0), + info = info ) ) if (updated.metadataSaved) disableManagedIndexConfig(managedIndexConfig) @@ -388,7 +394,10 @@ object ManagedIndexRunner : // Step null check is done in getStartingManagedIndexMetaData withClosableContext( IndexManagementSecurityContext( - managedIndexConfig.id, settings, threadPool.threadContext, managedIndexConfig.policy.user + managedIndexConfig.id, + settings, + threadPool.threadContext, + managedIndexConfig.policy.user ) ) { step.preExecute(logger, stepContext.getUpdatedContext(startingManagedIndexMetaData)).execute().postExecute(logger) @@ -476,8 +485,10 @@ object ManagedIndexRunner : // Intellij complains about createParser/parseWithType blocking because it sees they throw IOExceptions return withContext(Dispatchers.IO) { val xcp = XContentHelper.createParser( - xContentRegistry, LoggingDeprecationHandler.INSTANCE, - policySource, XContentType.JSON + xContentRegistry, + LoggingDeprecationHandler.INSTANCE, + policySource, + XContentType.JSON ) xcp.parseWithType(getResponse.id, getResponse.seqNo, getResponse.primaryTerm, Policy.Companion::parse) } @@ -504,8 +515,11 @@ object ManagedIndexRunner : @Suppress("TooGenericExceptionCaught") private suspend fun savePolicyToManagedIndexConfig(managedIndexConfig: ManagedIndexConfig, policy: Policy): Boolean { val updatedManagedIndexConfig = managedIndexConfig.copy( - policyID = policy.id, policy = policy, - policySeqNo = policy.seqNo, policyPrimaryTerm = policy.primaryTerm, changePolicy = null + policyID = policy.id, + policy = policy, + policySeqNo = policy.seqNo, + policyPrimaryTerm = policy.primaryTerm, + changePolicy = null ) val indexRequest = managedIndexConfigIndexRequest(updatedManagedIndexConfig) var savedPolicy = false @@ -605,8 +619,8 @@ object ManagedIndexRunner : // this is an edge case where a user deletes the job config or index and we already have a policySeqNo/primaryTerm // in the metadata, in this case we just want to say we successfully initialized the policy again but we will not // modify the state, action, etc. so it can resume where it left off - managedIndexMetaData.policySeqNo == policy.seqNo && managedIndexMetaData.policyPrimaryTerm == policy.primaryTerm - && managedIndexMetaData.policyID == policy.id -> + managedIndexMetaData.policySeqNo == policy.seqNo && managedIndexMetaData.policyPrimaryTerm == policy.primaryTerm && + managedIndexMetaData.policyID == policy.id -> // If existing PolicySeqNo and PolicyPrimaryTerm is equal to cached Policy then no issue. managedIndexMetaData.copy( policyRetryInfo = PolicyRetryInfoMetaData(failed = false, consumedRetries = 0), @@ -688,7 +702,6 @@ object ManagedIndexRunner : managedIndexMetaData: ManagedIndexMetaData, actionToExecute: Action? ) { - // should never happen since we only call this if there is a changePolicy, but we'll do it to make changePolicy non null val changePolicy = managedIndexConfig.changePolicy if (changePolicy == null) { @@ -709,8 +722,13 @@ object ManagedIndexRunner : // if the action to execute is transition then set the actionMetaData to a new transition metadata to reflect we are // in transition (in case we triggered change policy from entering transition) or to reflect this is a new policy transition phase val newTransitionMetaData = ActionMetaData( - TransitionsAction.name, Instant.now().toEpochMilli(), -1, - false, 0, 0, null + TransitionsAction.name, + Instant.now().toEpochMilli(), + -1, + false, + 0, + 0, + null ) val actionMetaData = if (actionToExecute?.type == TransitionsAction.name) { newTransitionMetaData diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/indexpolicy/TransportIndexPolicyAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/indexpolicy/TransportIndexPolicyAction.kt index 66f38333d..73b3abf17 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/indexpolicy/TransportIndexPolicyAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/indexpolicy/TransportIndexPolicyAction.kt @@ -60,7 +60,10 @@ class TransportIndexPolicyAction @Inject constructor( val settings: Settings, val xContentRegistry: NamedXContentRegistry ) : HandledTransportAction( - IndexPolicyAction.NAME, transportService, actionFilters, ::IndexPolicyRequest + IndexPolicyAction.NAME, + transportService, + actionFilters, + ::IndexPolicyRequest ) { @Volatile private var filterByEnabled = IndexManagementSettings.FILTER_BY_BACKEND_ROLES.get(settings) @@ -187,7 +190,8 @@ class TransportIndexPolicyAction @Inject constructor( private fun putPolicy() { val policy = request.policy.copy( - schemaVersion = IndexUtils.indexManagementConfigSchemaVersion, user = this.user + schemaVersion = IndexUtils.indexManagementConfigSchemaVersion, + user = this.user ) val indexRequest = IndexRequest(IndexManagementPlugin.INDEX_MANAGEMENT_INDEX) @@ -237,7 +241,7 @@ class TransportIndexPolicyAction @Inject constructor( val failureReasons = StringBuilder() if (response.shardInfo.failed > 0) { response.shardInfo.failures.forEach { - entry -> + entry -> failureReasons.append(entry.reason()) } return failureReasons.toString() diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupIndexer.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupIndexer.kt index c264d27fc..74fe5c857 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupIndexer.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupIndexer.kt @@ -48,7 +48,7 @@ class RollupIndexer( init { clusterService.clusterSettings.addSettingsUpdateConsumer(ROLLUP_INGEST_BACKOFF_MILLIS, ROLLUP_INGEST_BACKOFF_COUNT) { - millis, count -> + millis, count -> retryIngestPolicy = BackoffPolicy.constantBackoff(millis, count) } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupSearchService.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupSearchService.kt index 460b8292d..69b27074e 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupSearchService.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupSearchService.kt @@ -46,7 +46,7 @@ class RollupSearchService( init { clusterService.clusterSettings.addSettingsUpdateConsumer(ROLLUP_SEARCH_BACKOFF_MILLIS, ROLLUP_SEARCH_BACKOFF_COUNT) { - millis, count -> + millis, count -> retrySearchPolicy = BackoffPolicy.constantBackoff(millis, count) } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/actionfilter/FieldCapsFilter.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/actionfilter/FieldCapsFilter.kt index 6dcfd2bae..e67837c7d 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/actionfilter/FieldCapsFilter.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/actionfilter/FieldCapsFilter.kt @@ -41,8 +41,7 @@ class FieldCapsFilter( @Volatile private var shouldIntercept = RollupSettings.ROLLUP_DASHBOARDS.get(settings) init { - clusterService.clusterSettings.addSettingsUpdateConsumer(RollupSettings.ROLLUP_DASHBOARDS) { - flag -> + clusterService.clusterSettings.addSettingsUpdateConsumer(RollupSettings.ROLLUP_DASHBOARDS) { flag -> shouldIntercept = flag } } @@ -59,7 +58,7 @@ class FieldCapsFilter( val rollupIndices = mutableSetOf() val nonRollupIndices = mutableSetOf() val remoteClusterIndices = GuiceHolder.remoteClusterService.groupIndices(request.indicesOptions(), indices) { - idx: String? -> + idx: String? -> indexNameExpressionResolver.hasIndexAbstraction(idx, clusterService.state()) } val localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) @@ -102,7 +101,9 @@ class FieldCapsFilter( } chain.proceed( - task, action, request, + task, + action, + request, object : ActionListener { override fun onResponse(response: Response) { logger.info("Has rollup indices will rewrite field caps response") @@ -192,10 +193,15 @@ class FieldCapsFilter( } val isSearchable = fieldMapping.fieldType == RollupFieldMapping.Companion.FieldType.DIMENSION response[fieldName]!![type] = FieldCapabilities( - fieldName, type, isSearchable, true, + fieldName, + type, + isSearchable, + true, fieldMappingIndexMap.getValue(fieldMapping) .toTypedArray(), - null, null, mapOf>() + null, + null, + mapOf>() ) } @@ -267,10 +273,15 @@ class FieldCapsFilter( val fieldCaps = fields.getValue(field).getValue(type) val rewrittenIndices = if (fieldCaps.indices() != null && fieldCaps.indices().isNotEmpty()) fieldCaps.indices() else indices expandedResponse[field]!![type] = FieldCapabilities( - fieldCaps.name, fieldCaps.type, fieldCaps.isSearchable, + fieldCaps.name, + fieldCaps.type, + fieldCaps.isSearchable, fieldCaps .isAggregatable, - rewrittenIndices, fieldCaps.nonSearchableIndices(), fieldCaps.nonAggregatableIndices(), fieldCaps.meta() + rewrittenIndices, + fieldCaps.nonSearchableIndices(), + fieldCaps.nonAggregatableIndices(), + fieldCaps.meta() ) } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt index 0c4e5614d..4c83c4333 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt @@ -49,7 +49,7 @@ class TransformIndexer( TransformSettings.TRANSFORM_JOB_INDEX_BACKOFF_MILLIS, TransformSettings.TRANSFORM_JOB_INDEX_BACKOFF_COUNT ) { - millis, count -> + millis, count -> backoffPolicy = BackoffPolicy.constantBackoff(millis, count) } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt index 3e22b68ce..57090c18c 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt @@ -74,7 +74,7 @@ class TransformSearchService( init { clusterService.clusterSettings.addSettingsUpdateConsumer(TRANSFORM_JOB_SEARCH_BACKOFF_MILLIS, TRANSFORM_JOB_SEARCH_BACKOFF_COUNT) { - millis, count -> + millis, count -> backoffPolicy = BackoffPolicy.constantBackoff(millis, count) } }