diff --git a/build.gradle b/build.gradle index c6e8b032c..a67d141b8 100644 --- a/build.gradle +++ b/build.gradle @@ -19,7 +19,7 @@ buildscript { notification_version = System.getProperty("notification.version", opensearch_build) common_utils_version = System.getProperty("common_utils.version", opensearch_build) job_scheduler_version = System.getProperty("job_scheduler_version.version", opensearch_build) - kotlin_version = System.getProperty("kotlin.version", "1.4.32") + kotlin_version = System.getProperty("kotlin.version", "1.6.10") opensearch_no_snapshot = opensearch_version.replace("-SNAPSHOT","") job_scheduler_no_snapshot = job_scheduler_version.replace("-SNAPSHOT","") @@ -39,16 +39,19 @@ buildscript { classpath "org.opensearch.gradle:build-tools:${opensearch_version}" classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:${kotlin_version}" classpath "org.jetbrains.kotlin:kotlin-allopen:${kotlin_version}" - classpath "io.gitlab.arturbosch.detekt:detekt-gradle-plugin:1.17.1" - classpath "org.jacoco:org.jacoco.agent:0.8.5" + classpath "io.gitlab.arturbosch.detekt:detekt-gradle-plugin:1.21.0" + classpath "org.jacoco:org.jacoco.agent:0.8.7" } } + + plugins { id 'nebula.ospackage' version "8.3.0" id "com.dorongold.task-tree" version "1.5" } + apply plugin: 'java' apply plugin: 'jacoco' apply plugin: 'idea' @@ -73,8 +76,8 @@ configurations.all { force 'org.apache.httpcomponents:httpclient-osgi:4.5.13' force 'org.apache.httpcomponents.client5:httpclient5:5.0.3' force 'org.apache.httpcomponents.client5:httpclient5-osgi:5.0.3' - force 'com.fasterxml.jackson.core:jackson-databind:2.10.4' - force 'org.yaml:snakeyaml:1.26' + force 'com.fasterxml.jackson.core:jackson-databind:2.13.4' + force 'org.yaml:snakeyaml:1.32' force 'org.codehaus.plexus:plexus-utils:3.0.24' } } @@ -127,7 +130,7 @@ detekt { buildUponDefaultConfig = true } -configurations.testCompile { +configurations.testImplementation { exclude module: "securemock" } @@ -144,6 +147,7 @@ allprojects { if (isSnapshot) { version += "-SNAPSHOT" } + jacoco.toolVersion = "0.8.7" } dependencies { @@ -164,7 +168,7 @@ dependencies { testImplementation "com.nhaarman.mockitokotlin2:mockito-kotlin:2.2.0" testCompile "org.mockito:mockito-core:3.12.4" - add("ktlint", "com.pinterest:ktlint:0.41.0") { + add("ktlint", "com.pinterest:ktlint:0.45.1") { attributes { attribute(Bundling.BUNDLING_ATTRIBUTE, objects.named(Bundling, Bundling.EXTERNAL)) } @@ -602,4 +606,4 @@ task updateVersion { // String tokenization to support -SNAPSHOT ant.replaceregexp(file:'build.gradle', match: '"opensearch.version", "\\d.*"', replace: '"opensearch.version", "' + newVersion.tokenize('-')[0] + '-SNAPSHOT"', flags:'g', byline:true) } -} +} \ No newline at end of file diff --git a/detekt.yml b/detekt.yml index 1929d8197..47b9d163c 100644 --- a/detekt.yml +++ b/detekt.yml @@ -1,6 +1,6 @@ # TODO: Remove this before initial release, only for developmental purposes build: - maxIssues: 10 + maxIssues: 20 exceptions: TooGenericExceptionCaught: diff --git a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/Step.kt b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/Step.kt index c4494887e..1a4b2f971 100644 --- a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/Step.kt +++ b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/Step.kt @@ -68,7 +68,7 @@ abstract class Step(val name: String, val isSafeToDisableOn: Boolean = true) { companion object { fun read(streamInput: StreamInput): StepStatus { - return valueOf(streamInput.readString().toUpperCase(Locale.ROOT)) + return valueOf(streamInput.readString().uppercase(Locale.ROOT)) } } } diff --git a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/ActionRetry.kt b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/ActionRetry.kt index c60b88b99..045fdbb9b 100644 --- a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/ActionRetry.kt +++ b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/ActionRetry.kt @@ -72,7 +72,7 @@ data class ActionRetry( when (fieldName) { COUNT_FIELD -> count = xcp.longValue() - BACKOFF_FIELD -> backoff = Backoff.valueOf(xcp.text().toUpperCase(Locale.ROOT)) + BACKOFF_FIELD -> backoff = Backoff.valueOf(xcp.text().uppercase(Locale.ROOT)) DELAY_FIELD -> delay = TimeValue.parseTimeValue(xcp.text(), DELAY_FIELD) } } diff --git a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/StepMetaData.kt b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/StepMetaData.kt index a83d9c1b5..9b9ca40f1 100644 --- a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/StepMetaData.kt +++ b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/StepMetaData.kt @@ -90,7 +90,7 @@ data class StepMetaData( when (fieldName) { NAME -> name = xcp.text() START_TIME -> startTime = xcp.longValue() - STEP_STATUS -> stepStatus = Step.StepStatus.valueOf(xcp.text().toUpperCase(Locale.ROOT)) + STEP_STATUS -> stepStatus = Step.StepStatus.valueOf(xcp.text().uppercase(Locale.ROOT)) } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt b/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt index 609f87330..5ab6913c4 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt @@ -4,7 +4,6 @@ */ package org.opensearch.indexmanagement - import org.apache.logging.log4j.LogManager import org.opensearch.action.ActionRequest import org.opensearch.action.ActionResponse @@ -38,6 +37,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.ManagedIndexCoordinat import org.opensearch.indexmanagement.indexstatemanagement.ManagedIndexRunner import org.opensearch.indexmanagement.indexstatemanagement.MetadataService import org.opensearch.indexmanagement.indexstatemanagement.SkipExecution +import org.opensearch.indexmanagement.indexstatemanagement.migration.ISMTemplateService import org.opensearch.indexmanagement.indexstatemanagement.model.ManagedIndexConfig import org.opensearch.indexmanagement.indexstatemanagement.model.Policy import org.opensearch.indexmanagement.indexstatemanagement.resthandler.RestAddPolicyAction @@ -73,7 +73,6 @@ import org.opensearch.indexmanagement.indexstatemanagement.transport.action.retr import org.opensearch.indexmanagement.indexstatemanagement.transport.action.updateindexmetadata.TransportUpdateManagedIndexMetaDataAction import org.opensearch.indexmanagement.indexstatemanagement.transport.action.updateindexmetadata.UpdateManagedIndexMetaDataAction import org.opensearch.indexmanagement.indexstatemanagement.util.DEFAULT_INDEX_TYPE -import org.opensearch.indexmanagement.migration.ISMTemplateService import org.opensearch.indexmanagement.refreshanalyzer.RefreshSearchAnalyzerAction import org.opensearch.indexmanagement.refreshanalyzer.RestRefreshSearchAnalyzerAction import org.opensearch.indexmanagement.refreshanalyzer.TransportRefreshSearchAnalyzerAction @@ -251,7 +250,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin indexManagementExtensions.forEach { extension -> val extensionName = extension.getExtensionName() if (extensionName in extensions) { - throw IllegalStateException("Multiple extensions of IndexManagement have same name $extensionName - not supported") + error("Multiple extensions of IndexManagement have same name $extensionName - not supported") } extension.getISMActionParsers().forEach { parser -> ISMActionsParser.instance.addParser(parser, extensionName) @@ -259,7 +258,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin indexMetadataServices.add(extension.getIndexMetadataService()) extension.overrideClusterStateIndexUuidSetting()?.let { if (customIndexUUIDSetting != null) { - throw IllegalStateException( + error( "Multiple extensions of IndexManagement plugin overriding ClusterStateIndexUUIDSetting - not supported" ) } @@ -359,7 +358,9 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin ) indexMetadataProvider = IndexMetadataProvider( - settings, client, clusterService, + settings, + client, + clusterService, hashMapOf( DEFAULT_INDEX_TYPE to DefaultIndexMetadataService(customIndexUUIDSetting) ) @@ -386,7 +387,13 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin val managedIndexCoordinator = ManagedIndexCoordinator( environment.settings(), - client, clusterService, threadPool, indexManagementIndices, metadataService, templateService, indexMetadataProvider + client, + clusterService, + threadPool, + indexManagementIndices, + metadataService, + templateService, + indexMetadataProvider ) return listOf( diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexCoordinator.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexCoordinator.kt index 2a426c88e..c00e51c38 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexCoordinator.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexCoordinator.kt @@ -47,6 +47,7 @@ import org.opensearch.index.query.QueryBuilders import org.opensearch.indexmanagement.IndexManagementIndices import org.opensearch.indexmanagement.IndexManagementPlugin import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX +import org.opensearch.indexmanagement.indexstatemanagement.migration.ISMTemplateService import org.opensearch.indexmanagement.indexstatemanagement.model.ManagedIndexConfig import org.opensearch.indexmanagement.indexstatemanagement.model.Policy import org.opensearch.indexmanagement.indexstatemanagement.model.coordinator.ClusterStateManagedIndexConfig @@ -73,7 +74,6 @@ import org.opensearch.indexmanagement.indexstatemanagement.util.isFailed import org.opensearch.indexmanagement.indexstatemanagement.util.isPolicyCompleted import org.opensearch.indexmanagement.indexstatemanagement.util.managedIndexConfigIndexRequest import org.opensearch.indexmanagement.indexstatemanagement.util.updateEnableManagedIndexRequest -import org.opensearch.indexmanagement.migration.ISMTemplateService import org.opensearch.indexmanagement.opensearchapi.IndexManagementSecurityContext import org.opensearch.indexmanagement.opensearchapi.contentParser import org.opensearch.indexmanagement.opensearchapi.parseFromSearchResponse @@ -128,17 +128,26 @@ class ManagedIndexCoordinator( private var scheduledTemplateMigration: Scheduler.Cancellable? = null @Volatile private var lastFullSweepTimeNano = System.nanoTime() + @Volatile private var indexStateManagementEnabled = INDEX_STATE_MANAGEMENT_ENABLED.get(settings) + @Volatile private var metadataServiceEnabled = METADATA_SERVICE_ENABLED.get(settings) + @Volatile private var sweepPeriod = SWEEP_PERIOD.get(settings) + @Volatile private var retryPolicy = BackoffPolicy.constantBackoff(COORDINATOR_BACKOFF_MILLIS.get(settings), COORDINATOR_BACKOFF_COUNT.get(settings)) + @Volatile private var templateMigrationEnabled: Boolean = true + @Volatile private var templateMigrationEnabledSetting = TEMPLATE_MIGRATION_CONTROL.get(settings) + @Volatile private var jobInterval = JOB_INTERVAL.get(settings) + @Volatile private var jobJitter = JITTER.get(settings) @Volatile private var isMaster = false + @Volatile private var onMasterTimeStamp: Long = 0L init { @@ -168,8 +177,7 @@ class ManagedIndexCoordinator( if (!templateMigrationEnabled) scheduledTemplateMigration?.cancel() else initTemplateMigration(it) } - clusterService.clusterSettings.addSettingsUpdateConsumer(COORDINATOR_BACKOFF_MILLIS, COORDINATOR_BACKOFF_COUNT) { - millis, count -> + clusterService.clusterSettings.addSettingsUpdateConsumer(COORDINATOR_BACKOFF_MILLIS, COORDINATOR_BACKOFF_COUNT) { millis, count -> retryPolicy = BackoffPolicy.constantBackoff(millis, count) } } @@ -556,14 +564,15 @@ class ManagedIndexCoordinator( logger.info("Performing ISM template migration.") if (enableSetting == 0L) { - if (onMasterTimeStamp != 0L) + if (onMasterTimeStamp != 0L) { templateService.doMigration(Instant.ofEpochMilli(onMasterTimeStamp)) - else { + } else { logger.error("No valid onMaster time cached, cancel ISM template migration job.") scheduledTemplateMigration?.cancel() } - } else + } else { templateService.doMigration(Instant.ofEpochMilli(enableSetting)) + } } catch (e: Exception) { logger.error("Failed to migrate ISM template", e) } @@ -596,7 +605,8 @@ class ManagedIndexCoordinator( // Get the matching policyIds for applicable indices val updateMatchingIndicesReqs = createManagedIndexRequests( - clusterService.state(), unManagedIndices.map { (indexName, _) -> indexName } + clusterService.state(), + unManagedIndices.map { (indexName, _) -> indexName } ) // check all managed indices, if the index has already been deleted @@ -701,7 +711,10 @@ class ManagedIndexCoordinator( mRes.forEach { if (it.response.isExists) { result[it.id] = contentParser(it.response.sourceAsBytesRef).parseWithType( - it.response.id, it.response.seqNo, it.response.primaryTerm, ManagedIndexConfig.Companion::parse + it.response.id, + it.response.seqNo, + it.response.primaryTerm, + ManagedIndexConfig.Companion::parse ) } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt index 1940b99ce..56c328a3d 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt @@ -118,10 +118,13 @@ object ManagedIndexRunner : private lateinit var extensionStatusChecker: ExtensionStatusChecker private lateinit var indexMetadataProvider: IndexMetadataProvider private var indexStateManagementEnabled: Boolean = DEFAULT_ISM_ENABLED + @Suppress("MagicNumber") private val savePolicyRetryPolicy = BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(250), 3) + @Suppress("MagicNumber") private val updateMetaDataRetryPolicy = BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(250), 3) + @Suppress("MagicNumber") private val errorNotificationRetryPolicy = BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(250), 3) private var jobInterval: Int = DEFAULT_JOB_INTERVAL @@ -345,7 +348,8 @@ object ManagedIndexRunner : val info = mapOf("message" to "Previous action was not able to update IndexMetaData.") val updated = updateManagedIndexMetaData( managedIndexMetaData.copy( - policyRetryInfo = PolicyRetryInfoMetaData(true, 0), info = info + policyRetryInfo = PolicyRetryInfoMetaData(true, 0), + info = info ) ) if (updated.metadataSaved) disableManagedIndexConfig(managedIndexConfig) @@ -359,7 +363,8 @@ object ManagedIndexRunner : val info = mapOf("message" to "Failed to execute action=${action?.type} as extension [$actionExtensionName] is not enabled.") val updated = updateManagedIndexMetaData( managedIndexMetaData.copy( - policyRetryInfo = PolicyRetryInfoMetaData(true, 0), info = info + policyRetryInfo = PolicyRetryInfoMetaData(true, 0), + info = info ) ) if (updated.metadataSaved) disableManagedIndexConfig(managedIndexConfig) @@ -372,7 +377,8 @@ object ManagedIndexRunner : val info = mapOf("message" to "Attempted to execute action=${action?.type} which is not allowed.") val updated = updateManagedIndexMetaData( managedIndexMetaData.copy( - policyRetryInfo = PolicyRetryInfoMetaData(true, 0), info = info + policyRetryInfo = PolicyRetryInfoMetaData(true, 0), + info = info ) ) if (updated.metadataSaved) disableManagedIndexConfig(managedIndexConfig) @@ -388,7 +394,10 @@ object ManagedIndexRunner : // Step null check is done in getStartingManagedIndexMetaData withClosableContext( IndexManagementSecurityContext( - managedIndexConfig.id, settings, threadPool.threadContext, managedIndexConfig.policy.user + managedIndexConfig.id, + settings, + threadPool.threadContext, + managedIndexConfig.policy.user ) ) { step.preExecute(logger, stepContext.getUpdatedContext(startingManagedIndexMetaData)).execute().postExecute(logger) @@ -476,8 +485,10 @@ object ManagedIndexRunner : // Intellij complains about createParser/parseWithType blocking because it sees they throw IOExceptions return withContext(Dispatchers.IO) { val xcp = XContentHelper.createParser( - xContentRegistry, LoggingDeprecationHandler.INSTANCE, - policySource, XContentType.JSON + xContentRegistry, + LoggingDeprecationHandler.INSTANCE, + policySource, + XContentType.JSON ) xcp.parseWithType(getResponse.id, getResponse.seqNo, getResponse.primaryTerm, Policy.Companion::parse) } @@ -504,8 +515,11 @@ object ManagedIndexRunner : @Suppress("TooGenericExceptionCaught") private suspend fun savePolicyToManagedIndexConfig(managedIndexConfig: ManagedIndexConfig, policy: Policy): Boolean { val updatedManagedIndexConfig = managedIndexConfig.copy( - policyID = policy.id, policy = policy, - policySeqNo = policy.seqNo, policyPrimaryTerm = policy.primaryTerm, changePolicy = null + policyID = policy.id, + policy = policy, + policySeqNo = policy.seqNo, + policyPrimaryTerm = policy.primaryTerm, + changePolicy = null ) val indexRequest = managedIndexConfigIndexRequest(updatedManagedIndexConfig) var savedPolicy = false @@ -605,8 +619,8 @@ object ManagedIndexRunner : // this is an edge case where a user deletes the job config or index and we already have a policySeqNo/primaryTerm // in the metadata, in this case we just want to say we successfully initialized the policy again but we will not // modify the state, action, etc. so it can resume where it left off - managedIndexMetaData.policySeqNo == policy.seqNo && managedIndexMetaData.policyPrimaryTerm == policy.primaryTerm - && managedIndexMetaData.policyID == policy.id -> + managedIndexMetaData.policySeqNo == policy.seqNo && managedIndexMetaData.policyPrimaryTerm == policy.primaryTerm && + managedIndexMetaData.policyID == policy.id -> // If existing PolicySeqNo and PolicyPrimaryTerm is equal to cached Policy then no issue. managedIndexMetaData.copy( policyRetryInfo = PolicyRetryInfoMetaData(failed = false, consumedRetries = 0), @@ -688,7 +702,6 @@ object ManagedIndexRunner : managedIndexMetaData: ManagedIndexMetaData, actionToExecute: Action? ) { - // should never happen since we only call this if there is a changePolicy, but we'll do it to make changePolicy non null val changePolicy = managedIndexConfig.changePolicy if (changePolicy == null) { @@ -709,8 +722,13 @@ object ManagedIndexRunner : // if the action to execute is transition then set the actionMetaData to a new transition metadata to reflect we are // in transition (in case we triggered change policy from entering transition) or to reflect this is a new policy transition phase val newTransitionMetaData = ActionMetaData( - TransitionsAction.name, Instant.now().toEpochMilli(), -1, - false, 0, 0, null + TransitionsAction.name, + Instant.now().toEpochMilli(), + -1, + false, + 0, + 0, + null ) val actionMetaData = if (actionToExecute?.type == TransitionsAction.name) { newTransitionMetaData diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/migration/MigrationServices.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/migration/MigrationServices.kt index 364e5366e..154ce65d5 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/migration/MigrationServices.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/migration/MigrationServices.kt @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.indexmanagement.migration +package org.opensearch.indexmanagement.indexstatemanagement.migration import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.withContext @@ -254,7 +254,7 @@ class ISMTemplateService( private fun populateV2ISMTemplateMap(policyID: String, indexPatterns: List, priority: Int) { var v1Increment = 0 - val v1MaxOrder = v1orderToBucketIncrement.keys.max() + val v1MaxOrder = v1orderToBucketIncrement.keys.maxOrNull() if (v1MaxOrder != null) { v1Increment = v1MaxOrder + v1orderToBucketIncrement.values.sum() } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/destination/Chime.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/destination/Chime.kt index d0c2d75d3..24a5c48b7 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/destination/Chime.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/destination/Chime.kt @@ -18,7 +18,6 @@ import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken import org.opensearch.common.xcontent.XContentType import org.opensearch.indexmanagement.opensearchapi.string import java.io.IOException -import java.lang.IllegalStateException /** * A value object that represents a Chime message. Chime message will be @@ -61,7 +60,7 @@ data class Chime(val url: String) : ToXContent, Writeable { when (fieldName) { URL -> url = xcp.text() else -> { - throw IllegalStateException("Unexpected field: $fieldName, while parsing Chime destination") + error("Unexpected field: $fieldName, while parsing Chime destination") } } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/destination/CustomWebhook.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/destination/CustomWebhook.kt index 288757c7c..ce008b865 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/destination/CustomWebhook.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/destination/CustomWebhook.kt @@ -15,7 +15,6 @@ import org.opensearch.common.xcontent.XContentParser import org.opensearch.common.xcontent.XContentParser.Token import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken import java.io.IOException -import java.lang.IllegalStateException /** * A value object that represents a Custom webhook message. Webhook message will be @@ -121,7 +120,7 @@ data class CustomWebhook( USERNAME_FIELD -> username = xcp.textOrNull() PASSWORD_FIELD -> password = xcp.textOrNull() else -> { - throw IllegalStateException("Unexpected field: $fieldName, while parsing custom webhook destination") + error("Unexpected field: $fieldName, while parsing custom webhook destination") } } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/destination/Slack.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/destination/Slack.kt index 32cf40939..684f8f84a 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/destination/Slack.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/destination/Slack.kt @@ -18,7 +18,6 @@ import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken import org.opensearch.common.xcontent.XContentType import org.opensearch.indexmanagement.opensearchapi.string import java.io.IOException -import java.lang.IllegalStateException /** * A value object that represents a Slack message. Slack message will be @@ -61,7 +60,7 @@ data class Slack(val url: String) : ToXContent, Writeable { when (fieldName) { URL -> url = xcp.text() else -> { - throw IllegalStateException("Unexpected field: $fieldName, while parsing Slack destination") + error("Unexpected field: $fieldName, while parsing Slack destination") } } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/indexpolicy/TransportIndexPolicyAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/indexpolicy/TransportIndexPolicyAction.kt index 66f38333d..73b3abf17 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/indexpolicy/TransportIndexPolicyAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/indexpolicy/TransportIndexPolicyAction.kt @@ -60,7 +60,10 @@ class TransportIndexPolicyAction @Inject constructor( val settings: Settings, val xContentRegistry: NamedXContentRegistry ) : HandledTransportAction( - IndexPolicyAction.NAME, transportService, actionFilters, ::IndexPolicyRequest + IndexPolicyAction.NAME, + transportService, + actionFilters, + ::IndexPolicyRequest ) { @Volatile private var filterByEnabled = IndexManagementSettings.FILTER_BY_BACKEND_ROLES.get(settings) @@ -187,7 +190,8 @@ class TransportIndexPolicyAction @Inject constructor( private fun putPolicy() { val policy = request.policy.copy( - schemaVersion = IndexUtils.indexManagementConfigSchemaVersion, user = this.user + schemaVersion = IndexUtils.indexManagementConfigSchemaVersion, + user = this.user ) val indexRequest = IndexRequest(IndexManagementPlugin.INDEX_MANAGEMENT_INDEX) @@ -237,7 +241,7 @@ class TransportIndexPolicyAction @Inject constructor( val failureReasons = StringBuilder() if (response.shardInfo.failed > 0) { response.shardInfo.failures.forEach { - entry -> + entry -> failureReasons.append(entry.reason()) } return failureReasons.toString() diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupIndexer.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupIndexer.kt index 1b9b77963..74fe5c857 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupIndexer.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupIndexer.kt @@ -48,7 +48,7 @@ class RollupIndexer( init { clusterService.clusterSettings.addSettingsUpdateConsumer(ROLLUP_INGEST_BACKOFF_MILLIS, ROLLUP_INGEST_BACKOFF_COUNT) { - millis, count -> + millis, count -> retryIngestPolicy = BackoffPolicy.constantBackoff(millis, count) } } @@ -120,7 +120,7 @@ class RollupIndexer( is InternalMin -> aggResults[it.name] = it.value is InternalValueCount -> aggResults[it.name] = it.value is InternalAvg -> aggResults[it.name] = it.value - else -> throw IllegalStateException("Found aggregation in composite result that is not supported [${it.type} - ${it.name}]") + else -> error("Found aggregation in composite result that is not supported [${it.type} - ${it.name}]") } } mapOfKeyValues.putAll(aggResults) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupRunner.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupRunner.kt index 016d2a27e..9dba61b5c 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupRunner.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupRunner.kt @@ -293,7 +293,7 @@ object RollupRunner : ) { client.suspendUntil { listener: ActionListener -> execute(GetRollupAction.INSTANCE, GetRollupRequest(updatableJob.id, null, "_local"), listener) - }.rollup ?: throw IllegalStateException("Unable to get rollup job") + }.rollup ?: error("Unable to get rollup job") } } is RollupResult.Failure -> { diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupSearchService.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupSearchService.kt index 460b8292d..69b27074e 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupSearchService.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupSearchService.kt @@ -46,7 +46,7 @@ class RollupSearchService( init { clusterService.clusterSettings.addSettingsUpdateConsumer(ROLLUP_SEARCH_BACKOFF_MILLIS, ROLLUP_SEARCH_BACKOFF_COUNT) { - millis, count -> + millis, count -> retrySearchPolicy = BackoffPolicy.constantBackoff(millis, count) } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/actionfilter/FieldCapsFilter.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/actionfilter/FieldCapsFilter.kt index 6dcfd2bae..e67837c7d 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/actionfilter/FieldCapsFilter.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/actionfilter/FieldCapsFilter.kt @@ -41,8 +41,7 @@ class FieldCapsFilter( @Volatile private var shouldIntercept = RollupSettings.ROLLUP_DASHBOARDS.get(settings) init { - clusterService.clusterSettings.addSettingsUpdateConsumer(RollupSettings.ROLLUP_DASHBOARDS) { - flag -> + clusterService.clusterSettings.addSettingsUpdateConsumer(RollupSettings.ROLLUP_DASHBOARDS) { flag -> shouldIntercept = flag } } @@ -59,7 +58,7 @@ class FieldCapsFilter( val rollupIndices = mutableSetOf() val nonRollupIndices = mutableSetOf() val remoteClusterIndices = GuiceHolder.remoteClusterService.groupIndices(request.indicesOptions(), indices) { - idx: String? -> + idx: String? -> indexNameExpressionResolver.hasIndexAbstraction(idx, clusterService.state()) } val localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) @@ -102,7 +101,9 @@ class FieldCapsFilter( } chain.proceed( - task, action, request, + task, + action, + request, object : ActionListener { override fun onResponse(response: Response) { logger.info("Has rollup indices will rewrite field caps response") @@ -192,10 +193,15 @@ class FieldCapsFilter( } val isSearchable = fieldMapping.fieldType == RollupFieldMapping.Companion.FieldType.DIMENSION response[fieldName]!![type] = FieldCapabilities( - fieldName, type, isSearchable, true, + fieldName, + type, + isSearchable, + true, fieldMappingIndexMap.getValue(fieldMapping) .toTypedArray(), - null, null, mapOf>() + null, + null, + mapOf>() ) } @@ -267,10 +273,15 @@ class FieldCapsFilter( val fieldCaps = fields.getValue(field).getValue(type) val rewrittenIndices = if (fieldCaps.indices() != null && fieldCaps.indices().isNotEmpty()) fieldCaps.indices() else indices expandedResponse[field]!![type] = FieldCapabilities( - fieldCaps.name, fieldCaps.type, fieldCaps.isSearchable, + fieldCaps.name, + fieldCaps.type, + fieldCaps.isSearchable, fieldCaps .isAggregatable, - rewrittenIndices, fieldCaps.nonSearchableIndices(), fieldCaps.nonAggregatableIndices(), fieldCaps.meta() + rewrittenIndices, + fieldCaps.nonSearchableIndices(), + fieldCaps.nonAggregatableIndices(), + fieldCaps.meta() ) } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupUtils.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupUtils.kt index 12e090df7..2bbfd6092 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupUtils.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupUtils.kt @@ -143,7 +143,7 @@ inline fun Rollup.findMatchingMetricField(field: String): String { } } } - throw IllegalStateException("Did not find matching rollup metric") + error("Did not find matching rollup metric") } @Suppress("NestedBlockDepth", "ComplexMethod") diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt index 0c4e5614d..4c83c4333 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt @@ -49,7 +49,7 @@ class TransformIndexer( TransformSettings.TRANSFORM_JOB_INDEX_BACKOFF_MILLIS, TransformSettings.TRANSFORM_JOB_INDEX_BACKOFF_COUNT ) { - millis, count -> + millis, count -> backoffPolicy = BackoffPolicy.constantBackoff(millis, count) } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt index 3e02c7d46..610538019 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt @@ -75,7 +75,7 @@ class TransformSearchService( init { clusterService.clusterSettings.addSettingsUpdateConsumer(TRANSFORM_JOB_SEARCH_BACKOFF_MILLIS, TRANSFORM_JOB_SEARCH_BACKOFF_COUNT) { - millis, count -> + millis, count -> backoffPolicy = BackoffPolicy.constantBackoff(millis, count) } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformValidator.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformValidator.kt index f6399207c..551f761a1 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformValidator.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformValidator.kt @@ -24,7 +24,6 @@ import org.opensearch.indexmanagement.transform.model.TransformValidationResult import org.opensearch.indexmanagement.transform.settings.TransformSettings import org.opensearch.monitor.jvm.JvmService import org.opensearch.transport.RemoteTransportException -import java.lang.IllegalStateException @Suppress("SpreadOperator", "ReturnCount", "ThrowsCount") class TransformValidator( @@ -93,7 +92,7 @@ class TransformValidator( private suspend fun validateIndex(index: String, transform: Transform): List { val request = GetMappingsRequest().indices(index) val result: GetMappingsResponse = - client.admin().indices().suspendUntil { getMappings(request, it) } ?: throw IllegalStateException( + client.admin().indices().suspendUntil { getMappings(request, it) } ?: error( "GetMappingResponse for [$index] was null" ) return validateMappingsResponse(index, result, transform) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/model/TransformMetadata.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/model/TransformMetadata.kt index a3dbc2060..6f96cbd9b 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/model/TransformMetadata.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/model/TransformMetadata.kt @@ -148,7 +148,7 @@ data class TransformMetadata( TRANSFORM_ID_FIELD -> transformId = xcp.text() AFTER_KEY_FIELD -> afterkey = xcp.map() LAST_UPDATED_AT_FIELD -> lastUpdatedAt = xcp.instant() - STATUS_FIELD -> status = Status.valueOf(xcp.text().toUpperCase(Locale.ROOT)) + STATUS_FIELD -> status = Status.valueOf(xcp.text().uppercase(Locale.ROOT)) FAILURE_REASON -> failureReason = xcp.textOrNull() STATS_FIELD -> stats = TransformStats.parse(xcp) SHARD_ID_TO_GLOBAL_CHECKPOINT_FIELD -> diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RolloverActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RolloverActionIT.kt index 99149cdd9..7d6d77935 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RolloverActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RolloverActionIT.kt @@ -237,11 +237,11 @@ class RolloverActionIT : IndexStateManagementRestTestCase() { // assuming our ingestion is randomly split between the 20 primary shards // then 250kb/20 gives around 12.5kb per primary shard which is below our 100kb condition - val KB_250 = 250_000 + val kb250 = 250_000 var primaryStoreSizeBytes = 0 var count = 0 // Ingest data into the test index until the total size of the index is greater than our min primary size condition - while (primaryStoreSizeBytes < KB_250) { + while (primaryStoreSizeBytes < kb250) { // this count should never get as high as 10... if it does just fail the test if (count++ > 10) fail("Something is wrong with the data ingestion for testing rollover condition") insertSampleData(index = firstIndex, docCount = 20, jsonString = "{ \"test_field\": \"${OpenSearchTestCase.randomAlphaOfLength(7000)}\" }", delay = 0) @@ -271,12 +271,12 @@ class RolloverActionIT : IndexStateManagementRestTestCase() { assertThat("Did not have min size current", minPrimarySize["current"], isA(String::class.java)) } - val KB_150 = 150_000 + val kb150 = 150_000 var primaryShardSizeBytes = 0 count = 0 // Ingest data into the test index using custom routing so it always goes to a single shard until the size of the // primary shard is over 150kb - while (primaryShardSizeBytes < KB_150) { + while (primaryShardSizeBytes < kb150) { // this count should never get as high as 10... if it does just fail the test if (count++ > 10) fail("Something is wrong with the data ingestion for testing rollover condition") insertSampleData(index = firstIndex, docCount = 20, delay = 0, jsonString = "{ \"test_field\": \"${OpenSearchTestCase.randomAlphaOfLength(7000)}\" }", routing = "custom_routing") diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/coordinator/ManagedIndexCoordinatorTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/coordinator/ManagedIndexCoordinatorTests.kt index 2a92ea6e0..76ef23577 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/coordinator/ManagedIndexCoordinatorTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/coordinator/ManagedIndexCoordinatorTests.kt @@ -20,8 +20,8 @@ import org.opensearch.indexmanagement.IndexManagementIndices import org.opensearch.indexmanagement.indexstatemanagement.IndexMetadataProvider import org.opensearch.indexmanagement.indexstatemanagement.ManagedIndexCoordinator import org.opensearch.indexmanagement.indexstatemanagement.MetadataService +import org.opensearch.indexmanagement.indexstatemanagement.migration.ISMTemplateService import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings -import org.opensearch.indexmanagement.migration.ISMTemplateService import org.opensearch.test.ClusterServiceUtils import org.opensearch.test.OpenSearchTestCase import org.opensearch.threadpool.Scheduler @@ -76,8 +76,14 @@ class ManagedIndexCoordinatorTests : OpenSearchAllocationTestCase() { clusterService = Mockito.spy(originClusterService) indexMetadataProvider = IndexMetadataProvider(settings, client, clusterService, mutableMapOf()) coordinator = ManagedIndexCoordinator( - settings, client, clusterService, threadPool, indexManagementIndices, metadataService, - templateService, indexMetadataProvider + settings, + client, + clusterService, + threadPool, + indexManagementIndices, + metadataService, + templateService, + indexMetadataProvider ) } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/refreshanalyzer/RefreshSearchAnalyzerResponseTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/refreshanalyzer/RefreshSearchAnalyzerResponseTests.kt index 3df655682..079cf6cd2 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/refreshanalyzer/RefreshSearchAnalyzerResponseTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/refreshanalyzer/RefreshSearchAnalyzerResponseTests.kt @@ -22,34 +22,34 @@ class RefreshSearchAnalyzerResponseTests : OpenSearchTestCase() { val i2s0 = ShardId(index2, "xyz", 0) val i2s1 = ShardId(index2, "xyz", 1) - val response_i1s0 = RefreshSearchAnalyzerShardResponse(i1s0, listOf(syn1, syn2)) - val response_i1s1 = RefreshSearchAnalyzerShardResponse(i1s1, listOf(syn1, syn2)) - val response_i2s0 = RefreshSearchAnalyzerShardResponse(i2s0, listOf(syn1)) - val response_i2s1 = RefreshSearchAnalyzerShardResponse(i2s1, listOf(syn1)) - val failure_i1s0 = DefaultShardOperationFailedException(index1, 0, Throwable("dummyCause")) - val failure_i1s1 = DefaultShardOperationFailedException(index1, 1, Throwable("dummyCause")) - val failure_i2s0 = DefaultShardOperationFailedException(index2, 0, Throwable("dummyCause")) - val failure_i2s1 = DefaultShardOperationFailedException(index2, 1, Throwable("dummyCause")) + val responseI1s0 = RefreshSearchAnalyzerShardResponse(i1s0, listOf(syn1, syn2)) + val responseI1s1 = RefreshSearchAnalyzerShardResponse(i1s1, listOf(syn1, syn2)) + val responseI2s0 = RefreshSearchAnalyzerShardResponse(i2s0, listOf(syn1)) + val responseI2s1 = RefreshSearchAnalyzerShardResponse(i2s1, listOf(syn1)) + val failureI1s0 = DefaultShardOperationFailedException(index1, 0, Throwable("dummyCause")) + val failureI1s1 = DefaultShardOperationFailedException(index1, 1, Throwable("dummyCause")) + val failureI2s0 = DefaultShardOperationFailedException(index2, 0, Throwable("dummyCause")) + val failureI2s1 = DefaultShardOperationFailedException(index2, 1, Throwable("dummyCause")) // Case 1: All shards successful - var aggregate_response = listOf(response_i1s0, response_i1s1, response_i2s0, response_i2s1) - var aggregate_failures = listOf() - var refreshSearchAnalyzerResponse = RefreshSearchAnalyzerResponse(4, 4, 0, aggregate_failures, aggregate_response) + var aggregateResponse = listOf(responseI1s0, responseI1s1, responseI2s0, responseI2s1) + var aggregateFailures = listOf() + var refreshSearchAnalyzerResponse = RefreshSearchAnalyzerResponse(4, 4, 0, aggregateFailures, aggregateResponse) var successfulIndices = refreshSearchAnalyzerResponse.getSuccessfulRefreshDetails() Assert.assertTrue(successfulIndices.containsKey(index1)) Assert.assertTrue(successfulIndices.containsKey(index2)) // Case 2: All shards failed - aggregate_response = listOf() - aggregate_failures = listOf(failure_i1s0, failure_i1s1, failure_i2s0, failure_i2s1) - refreshSearchAnalyzerResponse = RefreshSearchAnalyzerResponse(4, 0, 4, aggregate_failures, aggregate_response) + aggregateResponse = listOf() + aggregateFailures = listOf(failureI1s0, failureI1s1, failureI2s0, failureI2s1) + refreshSearchAnalyzerResponse = RefreshSearchAnalyzerResponse(4, 0, 4, aggregateFailures, aggregateResponse) successfulIndices = refreshSearchAnalyzerResponse.getSuccessfulRefreshDetails() Assert.assertTrue(successfulIndices.isEmpty()) // Case 3: Some shards of an index fail, while some others succeed - aggregate_response = listOf(response_i1s1, response_i2s0, response_i2s1) - aggregate_failures = listOf(failure_i1s0) - refreshSearchAnalyzerResponse = RefreshSearchAnalyzerResponse(4, 3, 1, aggregate_failures, aggregate_response) + aggregateResponse = listOf(responseI1s1, responseI2s0, responseI2s1) + aggregateFailures = listOf(failureI1s0) + refreshSearchAnalyzerResponse = RefreshSearchAnalyzerResponse(4, 3, 1, aggregateFailures, aggregateResponse) successfulIndices = refreshSearchAnalyzerResponse.getSuccessfulRefreshDetails() Assert.assertTrue(successfulIndices.containsKey(index2)) Assert.assertFalse(successfulIndices.containsKey(index1))