From 34d1868b24e895ded9101341a2b2a6190ce95aa2 Mon Sep 17 00:00:00 2001 From: Petar Dzepina Date: Thu, 3 Nov 2022 01:29:13 +0100 Subject: [PATCH] alias in rollup target_index field (#445) * added support for mustache scripting of rollup.target_index field (#435) * tests * small refactor/improvements * added wildcard check when creating rollup job; removed resolving targetIndex on Rollup init; added test for wildcards * lint fixes * moved target_index validation in getRollup resp handler * removed catch block * fixed IT fail * added Exception catch block Signed-off-by: Petar Dzepina (cherry picked from commit 70cf4eadb28571b14da5252e7c7d98aade2867d5) --- .../indexmanagement/IndexManagementPlugin.kt | 2 +- .../rollup/RollupMapperService.kt | 140 +++++- .../rollup/settings/RollupSettings.kt | 2 +- .../RollupFieldValueExpressionResolver.kt | 47 +- .../rollup/util/RollupUtils.kt | 4 + .../rollup/runner/RollupRunnerIT.kt | 444 ++++++++++++++++++ ...RollupFieldValueExpressionResolverTests.kt | 20 +- 7 files changed, 645 insertions(+), 14 deletions(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt b/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt index abc9dc3ce..7ed68c9bd 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt @@ -372,7 +372,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin this.indexNameExpressionResolver = indexNameExpressionResolver val skipFlag = SkipExecution(client) - RollupFieldValueExpressionResolver.registerScriptService(scriptService) + RollupFieldValueExpressionResolver.registerServices(scriptService, clusterService) val rollupRunner = RollupRunner .registerClient(client) .registerClusterService(clusterService) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupMapperService.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupMapperService.kt index 0830d60bb..473844865 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupMapperService.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupMapperService.kt @@ -14,6 +14,8 @@ import org.opensearch.action.admin.indices.create.CreateIndexRequest import org.opensearch.action.admin.indices.create.CreateIndexResponse import org.opensearch.action.admin.indices.mapping.get.GetMappingsRequest import org.opensearch.action.admin.indices.mapping.get.GetMappingsResponse +import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest +import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest import org.opensearch.action.support.IndicesOptions import org.opensearch.action.support.master.AcknowledgedResponse import org.opensearch.client.Client @@ -21,6 +23,7 @@ import org.opensearch.cluster.metadata.IndexNameExpressionResolver import org.opensearch.cluster.metadata.MappingMetadata import org.opensearch.cluster.service.ClusterService import org.opensearch.common.settings.Settings +import org.opensearch.common.xcontent.XContentType import org.opensearch.indexmanagement.IndexManagementIndices import org.opensearch.indexmanagement.common.model.dimension.DateHistogram import org.opensearch.indexmanagement.common.model.dimension.Histogram @@ -33,7 +36,9 @@ import org.opensearch.indexmanagement.rollup.model.RollupJobValidationResult import org.opensearch.indexmanagement.rollup.settings.LegacyOpenDistroRollupSettings import org.opensearch.indexmanagement.rollup.settings.RollupSettings import org.opensearch.indexmanagement.rollup.util.RollupFieldValueExpressionResolver +import org.opensearch.indexmanagement.rollup.util.getRollupJobs import org.opensearch.indexmanagement.rollup.util.isRollupIndex +import org.opensearch.indexmanagement.rollup.util.isTargetIndexAlias import org.opensearch.indexmanagement.util.IndexUtils.Companion._META import org.opensearch.indexmanagement.util.IndexUtils.Companion.getFieldFromMappings import org.opensearch.transport.RemoteTransportException @@ -50,14 +55,32 @@ class RollupMapperService( private val logger = LogManager.getLogger(javaClass) - // If the index already exists we need to verify it's a rollup index, - // confirm it does not conflict with existing jobs and is a valid job + /** + * If the index already exists we need to verify it's a rollup index, + * confirm it does not conflict with existing jobs and is a valid job. + * If + * + * @param rollup Rollup job we're currently executing + * @param targetIndexResolvedName concrete index name + * @param hasLegacyPlugin flag to indicate if we're running legacy plugin + * @return RollupJobValidationResult indicating success or failure with appropriate error message included. + */ @Suppress("ReturnCount") - private suspend fun validateAndAttemptToUpdateTargetIndex(rollup: Rollup, targetIndexResolvedName: String): RollupJobValidationResult { - if (!isRollupIndex(targetIndexResolvedName, clusterService.state())) { + private suspend fun validateAndAttemptToUpdateTargetIndex( + rollup: Rollup, + targetIndexResolvedName: String, + hasLegacyPlugin: Boolean + ): RollupJobValidationResult { + if (rollup.isTargetIndexAlias()) { + val aliasValidationResult = validateTargetIndexAlias(rollup, targetIndexResolvedName) + if (aliasValidationResult !is RollupJobValidationResult.Valid) { + return aliasValidationResult + } else if (!isRollupIndex(targetIndexResolvedName, clusterService.state())) { + return prepareTargetIndex(rollup, targetIndexResolvedName, hasLegacyPlugin) + } + } else if (!isRollupIndex(targetIndexResolvedName, clusterService.state())) { return RollupJobValidationResult.Invalid("Target index [$targetIndexResolvedName] is a non rollup index") } - return when (val jobExistsResult = jobExistsInRollupIndex(rollup, targetIndexResolvedName)) { is RollupJobValidationResult.Valid -> jobExistsResult is RollupJobValidationResult.Invalid -> updateRollupIndexMappings(rollup, targetIndexResolvedName) @@ -65,6 +88,59 @@ class RollupMapperService( } } + /** + * Target Index is valid alias if either all backing indices have this job in _meta + * or there isn't any rollup job present in _meta + */ + @Suppress("ReturnCount") + suspend fun validateTargetIndexAlias(rollup: Rollup, targetIndexResolvedName: String): RollupJobValidationResult { + + var errorMessage: String + + if (!RollupFieldValueExpressionResolver.indexAliasUtils.hasAlias(targetIndexResolvedName)) { + logger.error("[${rollup.targetIndex}] is not an alias!") + return RollupJobValidationResult.Failure("[${rollup.targetIndex}] is not an alias!") + } + + val rollupJobs = clusterService.state().metadata.index(targetIndexResolvedName).getRollupJobs() + if (rollupJobs != null && + (rollupJobs.size > 1 || rollupJobs[0].id != rollup.id) + ) { + errorMessage = "More than one rollup job present on the backing index, cannot add alias for target index: [$targetIndexResolvedName]" + logger.error(errorMessage) + return RollupJobValidationResult.Failure(errorMessage) + } + + // All other backing indices have to have this rollup job in _META field and it has to be the only one! + val backingIndices = RollupFieldValueExpressionResolver.indexAliasUtils.getBackingIndicesForAlias(rollup.targetIndex) + backingIndices?.forEach { + if (it.index.name != targetIndexResolvedName) { + val allRollupJobs = clusterService.state().metadata.index(it.index.name).getRollupJobs() + val validationResult = validateNonWriteBackingIndex(it.index.name, rollup, allRollupJobs) + if (validationResult !is RollupJobValidationResult.Valid) { + return validationResult + } + } + } + return RollupJobValidationResult.Valid + } + + suspend fun validateNonWriteBackingIndex(backingIndex: String, currentRollupJob: Rollup, rollupJobs: List?): RollupJobValidationResult { + var errorMessage = "" + if (rollupJobs == null) { + errorMessage = "Backing index [$backingIndex] has to have owner rollup job with id:[${currentRollupJob.id}]" + } else if (rollupJobs.size == 1 && rollupJobs[0].id != currentRollupJob.id) { + errorMessage = "Backing index [$backingIndex] has to have owner rollup job with id:[${currentRollupJob.id}]" + } else if (rollupJobs.size > 1) { + errorMessage = "Backing index [$backingIndex] has multiple rollup job owners" + } + if (errorMessage.isNotEmpty()) { + logger.error(errorMessage) + return RollupJobValidationResult.Failure(errorMessage) + } + return RollupJobValidationResult.Valid + } + // This creates the target index if it doesn't already else validate the target index is rollup index // If the target index mappings doesn't contain rollup job attempts to update the mappings. // TODO: error handling @@ -72,7 +148,12 @@ class RollupMapperService( suspend fun attemptCreateRollupTargetIndex(job: Rollup, hasLegacyPlugin: Boolean): RollupJobValidationResult { val targetIndexResolvedName = RollupFieldValueExpressionResolver.resolve(job, job.targetIndex) if (indexExists(targetIndexResolvedName)) { - return validateAndAttemptToUpdateTargetIndex(job, targetIndexResolvedName) + val validationResult = validateAndAttemptToUpdateTargetIndex(job, targetIndexResolvedName, hasLegacyPlugin) + when (validationResult) { + is RollupJobValidationResult.Failure -> logger.error(validationResult.message) + is RollupJobValidationResult.Invalid -> logger.error(validationResult.reason) + } + return validationResult } else { val errorMessage = "Failed to create target index [$targetIndexResolvedName]" return try { @@ -96,6 +177,53 @@ class RollupMapperService( } } + suspend fun addRollupSettingToIndex(targetIndexResolvedName: String, hasLegacyPlugin: Boolean): Boolean { + val settings = if (hasLegacyPlugin) { + Settings.builder().put(LegacyOpenDistroRollupSettings.ROLLUP_INDEX.key, true).build() + } else { + Settings.builder().put(RollupSettings.ROLLUP_INDEX.key, true).build() + } + val resp: AcknowledgedResponse = client.admin().indices().suspendUntil { + updateSettings(UpdateSettingsRequest(settings, targetIndexResolvedName), it) + } + return resp.isAcknowledged + } + @Suppress("ReturnCount") + suspend fun prepareTargetIndex(rollup: Rollup, targetIndexResolvedName: String, hasLegacyPlugin: Boolean): RollupJobValidationResult { + var errorMessage = "" + try { + // 1. First we need to add index.plugins.rollup_index setting to index + if (addRollupSettingToIndex(targetIndexResolvedName, hasLegacyPlugin) == false) { + logger.error("Failed to update rollup settings for target index: [$targetIndexResolvedName]") + return RollupJobValidationResult.Invalid("Failed to update rollup settings for target index: [$targetIndexResolvedName]") + } + + // 2. Put rollup target_index mappings + val putMappingRequest: PutMappingRequest = + PutMappingRequest(targetIndexResolvedName).source(IndexManagementIndices.rollupTargetMappings, XContentType.JSON) + val respMappings: AcknowledgedResponse = client.admin().indices().suspendUntil { + putMapping(putMappingRequest, it) + } + if (!respMappings.isAcknowledged) { + return RollupJobValidationResult.Invalid("Failed to put initial rollup mappings for target index [$targetIndexResolvedName]") + } + // 3. Add this rollup job to target_index's _meta + errorMessage = "Failed to update mappings for target index [$targetIndexResolvedName]" + updateRollupIndexMappings(rollup, targetIndexResolvedName) + } catch (e: RemoteTransportException) { + val unwrappedException = ExceptionsHelper.unwrapCause(e) as Exception + logger.error(errorMessage, unwrappedException) + RollupJobValidationResult.Failure(errorMessage, unwrappedException) + } catch (e: OpenSearchSecurityException) { + logger.error("$errorMessage because ", e) + RollupJobValidationResult.Failure("$errorMessage - missing required cluster permissions: ${e.localizedMessage}", e) + } catch (e: Exception) { + logger.error("$errorMessage because ", e) + RollupJobValidationResult.Failure(errorMessage, e) + } + return RollupJobValidationResult.Valid + } + private suspend fun createTargetIndex(targetIndexName: String, hasLegacyPlugin: Boolean): CreateIndexResponse { val settings = if (hasLegacyPlugin) { Settings.builder().put(LegacyOpenDistroRollupSettings.ROLLUP_INDEX.key, true).build() diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/settings/RollupSettings.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/settings/RollupSettings.kt index f5a2fd0d5..d0464bd34 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/settings/RollupSettings.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/settings/RollupSettings.kt @@ -39,7 +39,7 @@ class RollupSettings { "index.plugins.rollup_index", LegacyOpenDistroRollupSettings.ROLLUP_INDEX, Setting.Property.IndexScope, - Setting.Property.InternalIndex + Setting.Property.Dynamic ) val ROLLUP_INGEST_BACKOFF_MILLIS: Setting = Setting.positiveTimeSetting( diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupFieldValueExpressionResolver.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupFieldValueExpressionResolver.kt index c28db5704..fe2c38801 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupFieldValueExpressionResolver.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupFieldValueExpressionResolver.kt @@ -5,6 +5,9 @@ package org.opensearch.indexmanagement.rollup.util +import org.opensearch.cluster.metadata.IndexAbstraction +import org.opensearch.cluster.metadata.IndexMetadata +import org.opensearch.cluster.service.ClusterService import org.opensearch.common.xcontent.XContentFactory import org.opensearch.indexmanagement.indexstatemanagement.util.XCONTENT_WITHOUT_TYPE import org.opensearch.indexmanagement.opensearchapi.toMap @@ -19,7 +22,8 @@ object RollupFieldValueExpressionResolver { private val validTopContextFields = setOf(Rollup.SOURCE_INDEX_FIELD) private lateinit var scriptService: ScriptService - + private lateinit var clusterService: ClusterService + lateinit var indexAliasUtils: IndexAliasUtils fun resolve(rollup: Rollup, fieldValue: String): String { val script = Script(ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG, fieldValue, mapOf()) @@ -27,14 +31,49 @@ object RollupFieldValueExpressionResolver { .toMap() .filterKeys { key -> key in validTopContextFields } - val compiledValue = scriptService.compile(script, TemplateScript.CONTEXT) + var compiledValue = scriptService.compile(script, TemplateScript.CONTEXT) .newInstance(script.params + mapOf("ctx" to contextMap)) .execute() - return if (compiledValue.isBlank()) fieldValue else compiledValue + if (indexAliasUtils.isAlias(compiledValue)) { + compiledValue = indexAliasUtils.getWriteIndexNameForAlias(compiledValue) + } + + return if (compiledValue.isNullOrBlank()) fieldValue else compiledValue + } + + fun registerServices(scriptService: ScriptService, clusterService: ClusterService) { + this.scriptService = scriptService + this.clusterService = clusterService + this.indexAliasUtils = IndexAliasUtils(clusterService) } - fun registerScriptService(scriptService: ScriptService) { + fun registerServices(scriptService: ScriptService, clusterService: ClusterService, indexAliasUtils: IndexAliasUtils) { this.scriptService = scriptService + this.clusterService = clusterService + this.indexAliasUtils = indexAliasUtils + } + + open class IndexAliasUtils(val clusterService: ClusterService) { + + open fun hasAlias(index: String): Boolean { + val aliases = this.clusterService.state().metadata().indices.get(index)?.aliases + if (aliases != null) { + return aliases.size() > 0 + } + return false + } + + open fun isAlias(index: String): Boolean { + return this.clusterService.state().metadata().indicesLookup?.get(index) is IndexAbstraction.Alias + } + + open fun getWriteIndexNameForAlias(alias: String): String? { + return this.clusterService.state().metadata().indicesLookup?.get(alias)?.writeIndex?.index?.name + } + + open fun getBackingIndicesForAlias(alias: String): MutableList? { + return this.clusterService.state().metadata().indicesLookup?.get(alias)?.indices + } } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupUtils.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupUtils.kt index 988fa9643..7ca16858c 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupUtils.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupUtils.kt @@ -73,6 +73,10 @@ fun isRollupIndex(index: String, clusterState: ClusterState): Boolean { return false } +fun Rollup.isTargetIndexAlias(): Boolean { + return RollupFieldValueExpressionResolver.indexAliasUtils.isAlias(targetIndex) +} + fun Rollup.getRollupSearchRequest(metadata: RollupMetadata): SearchRequest { val query = if (metadata.continuous != null) { RangeQueryBuilder(this.getDateHistogram().sourceField) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/runner/RollupRunnerIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/runner/RollupRunnerIT.kt index 60898e7bb..51ca788b0 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/runner/RollupRunnerIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/runner/RollupRunnerIT.kt @@ -5,11 +5,16 @@ package org.opensearch.indexmanagement.rollup.runner +import com.carrotsearch.randomizedtesting.RandomizedTest.sleep import org.apache.http.entity.ContentType import org.apache.http.entity.StringEntity +import org.opensearch.common.settings.Settings import org.opensearch.indexmanagement.IndexManagementPlugin import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.ROLLUP_JOBS_BASE_URI import org.opensearch.indexmanagement.common.model.dimension.DateHistogram +import org.opensearch.indexmanagement.common.model.dimension.Terms +import org.opensearch.indexmanagement.indexstatemanagement.util.INDEX_NUMBER_OF_REPLICAS +import org.opensearch.indexmanagement.indexstatemanagement.util.INDEX_NUMBER_OF_SHARDS import org.opensearch.indexmanagement.makeRequest import org.opensearch.indexmanagement.rollup.RollupRestTestCase import org.opensearch.indexmanagement.rollup.model.Rollup @@ -800,6 +805,445 @@ class RollupRunnerIT : RollupRestTestCase() { assertTrue("Did not spend time searching", rollupMetadata.stats.searchTimeInMillis > 0L) } + fun `test rollup action with alias as target_index successfully`() { + generateNYCTaxiData("source_runner_sixth_eleventh_1") + + // Create index with alias, without mappings + val indexAlias = "alias_as_target_index" + val backingIndex = "backing_target_index" + val builtSettings = Settings.builder().let { + it.put(INDEX_NUMBER_OF_REPLICAS, "1") + it.put(INDEX_NUMBER_OF_SHARDS, "1") + it + }.build() + val aliases = "\"$indexAlias\": { \"is_write_index\": true }" + createIndex(backingIndex, builtSettings, null, aliases) + + refreshAllIndices() + + val rollup = Rollup( + id = "runner_with_alias_as_target", + schemaVersion = 1L, + enabled = true, + jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES), + jobLastUpdatedTime = Instant.now(), + jobEnabledTime = Instant.now(), + description = "basic change of page size", + sourceIndex = "source_runner_sixth_eleventh_1", + targetIndex = indexAlias, + metadataID = null, + roles = emptyList(), + pageSize = 1000, + delay = 0, + continuous = false, + dimensions = listOf( + DateHistogram(sourceField = "tpep_pickup_datetime", fixedInterval = "1s"), + Terms("RatecodeID", "RatecodeID"), + Terms("PULocationID", "PULocationID") + ), + metrics = listOf( + RollupMetrics( + sourceField = "passenger_count", + targetField = "passenger_count", + metrics = listOf(Sum(), Min(), Max(), ValueCount(), Average()) + ) + ) + ).let { createRollup(it, it.id) } + + // First run, backing index is empty: no mappings, no rollup_index setting, no rollupjobs in _META + updateRollupStartTime(rollup) + + waitFor { assertTrue("Target rollup index was not created", indexExists(backingIndex)) } + + var startedRollup = waitFor { + val rollupJob = getRollup(rollupId = rollup.id) + assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID) + val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!) + assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status) + rollupJob + } + var rollupMetadataID = startedRollup.metadataID!! + var rollupMetadata = getRollupMetadata(rollupMetadataID) + assertTrue("Did not process any doc during rollup", rollupMetadata.stats.documentsProcessed > 0) + + // restart job + client().makeRequest( + "PUT", + "$ROLLUP_JOBS_BASE_URI/${startedRollup.id}?if_seq_no=${startedRollup.seqNo}&if_primary_term=${startedRollup.primaryTerm}", + emptyMap(), rollup.copy(enabled = true).toHttpEntity() + ) + // Second run, backing index is setup just like any other rollup index + updateRollupStartTime(rollup) + + startedRollup = waitFor { + val rollupJob = getRollup(rollupId = rollup.id) + assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID) + val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!) + assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status) + rollupJob + } + + rollupMetadataID = startedRollup.metadataID!! + rollupMetadata = getRollupMetadata(rollupMetadataID) + + assertTrue("Did not process any doc during rollup", rollupMetadata.stats.documentsProcessed > 0) + } + + fun `test rollup action with alias as target_index with multiple backing indices successfully`() { + generateNYCTaxiData("source_runner_sixth_29932") + + // Create index with alias, without mappings + val indexAlias = "alias_as_target_index_2" + val backingIndex1 = "backing_target_index1-000001" + val backingIndex2 = "backing_target_index1-000002" + val builtSettings = Settings.builder().let { + it.put(INDEX_NUMBER_OF_REPLICAS, "1") + it.put(INDEX_NUMBER_OF_SHARDS, "1") + it + }.build() + val aliases = "\"$indexAlias\": { \"is_write_index\": true }" + createIndex(backingIndex1, builtSettings, null, aliases) + + refreshAllIndices() + + val rollup = Rollup( + id = "page_size_runner_sixth_2", + schemaVersion = 1L, + enabled = true, + jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES), + jobLastUpdatedTime = Instant.now(), + jobEnabledTime = Instant.now(), + description = "basic change of page size", + sourceIndex = "source_runner_sixth_29932", + targetIndex = indexAlias, + metadataID = null, + roles = emptyList(), + pageSize = 1000, + delay = 0, + continuous = false, + dimensions = listOf( + DateHistogram(sourceField = "tpep_pickup_datetime", fixedInterval = "1s"), + Terms("RatecodeID", "RatecodeID"), + Terms("PULocationID", "PULocationID") + ), + metrics = listOf( + RollupMetrics( + sourceField = "passenger_count", + targetField = "passenger_count", + metrics = listOf(Sum(), Min(), Max(), ValueCount(), Average()) + ) + ) + ).let { createRollup(it, it.id) } + + // First run, backing index is empty: no mappings, no rollup_index setting, no rollupjobs in _META + updateRollupStartTime(rollup) + + waitFor { assertTrue("Target rollup index was not created", indexExists(backingIndex1)) } + + var startedRollup = waitFor { + val rollupJob = getRollup(rollupId = rollup.id) + assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID) + val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!) + assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status) + assertTrue("Rollup is not disabled", !rollupJob.enabled) + rollupJob + } + var rollupMetadataID = startedRollup.metadataID!! + var rollupMetadata = getRollupMetadata(rollupMetadataID) + assertTrue("Did not process any doc during rollup", rollupMetadata.stats.documentsProcessed > 0) + + // do rollover on alias + val rolloverResponse = client().makeRequest("POST", "/$indexAlias/_rollover") + assertEquals(RestStatus.OK, rolloverResponse.restStatus()) + waitFor { assertTrue("index was not created after rollover", indexExists(backingIndex2)) } + + // restart job + client().makeRequest( + "PUT", + "$ROLLUP_JOBS_BASE_URI/${startedRollup.id}?if_seq_no=${startedRollup.seqNo}&if_primary_term=${startedRollup.primaryTerm}", + emptyMap(), rollup.copy(enabled = true).toHttpEntity() + ) + // Second run, backing index is setup just like any other rollup index + updateRollupStartTime(rollup) + + startedRollup = waitFor { + val rollupJob = getRollup(rollupId = rollup.id) + assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID) + val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!) + assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status) + rollupJob + } + + rollupMetadataID = startedRollup.metadataID!! + rollupMetadata = getRollupMetadata(rollupMetadataID) + + assertTrue("Did not process any doc during rollup", rollupMetadata.stats.documentsProcessed > 0) + } + + /** + * Index with alias is created and job1 successfully ran first time. + * Then Job2 ran on first backing index once and made this alias invalid for further use by any rollup job + */ + fun `test rollup action with alias as target_index with multiple backing indices failed`() { + generateNYCTaxiData("source_runner_sixth_2123") + + // Create index with alias, without mappings + val indexAlias = "alias_as_target_index_failed" + val backingIndex1 = "backing_target_index1_f-000001" + val backingIndex2 = "backing_target_index1_f-000002" + val builtSettings = Settings.builder().let { + it.put(INDEX_NUMBER_OF_REPLICAS, "1") + it.put(INDEX_NUMBER_OF_SHARDS, "1") + it + }.build() + val aliases = "\"$indexAlias\": { \"is_write_index\": true }" + createIndex(backingIndex1, builtSettings, null, aliases) + + refreshAllIndices() + + val job1 = Rollup( + id = "rollup_with1_alias_1", + schemaVersion = 1L, + enabled = true, + jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES), + jobLastUpdatedTime = Instant.now(), + jobEnabledTime = Instant.now(), + description = "basic change of page size", + sourceIndex = "source_runner_sixth_2123", + targetIndex = indexAlias, + metadataID = null, + roles = emptyList(), + pageSize = 1000, + delay = 0, + continuous = false, + dimensions = listOf( + DateHistogram(sourceField = "tpep_pickup_datetime", fixedInterval = "1s"), + Terms("RatecodeID", "RatecodeID"), + Terms("PULocationID", "PULocationID") + ), + metrics = listOf( + RollupMetrics( + sourceField = "passenger_count", + targetField = "passenger_count", + metrics = listOf(Sum(), Min(), Max(), ValueCount(), Average()) + ) + ) + ).let { createRollup(it, it.id) } + + // First run, backing index is empty: no mappings, no rollup_index setting, no rollupjobs in _META + updateRollupStartTime(job1) + + waitFor { assertTrue("Target rollup index was not created", indexExists(backingIndex1)) } + + var startedRollup1 = waitFor { + val rollupJob = getRollup(rollupId = job1.id) + assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID) + val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!) + assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status) + assertTrue("Rollup is not disabled", !rollupJob.enabled) + rollupJob + } + var rollupMetadataID = startedRollup1.metadataID!! + var rollupMetadata = getRollupMetadata(rollupMetadataID) + assertTrue("Did not process any doc during rollup", rollupMetadata.stats.documentsProcessed > 0) + + // Run job #2 on same target_index + val job2 = job1.copy(id = "some_other_job_999", targetIndex = backingIndex1) + .let { createRollup(it, it.id) } + + // Job2 First run, it should add itself to _meta in the same index job1 did. + updateRollupStartTime(job2) + + var startedRollup2 = waitFor { + val rollupJob = getRollup(rollupId = job2.id) + assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID) + val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!) + assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status) + assertTrue("Rollup is not disabled", !rollupJob.enabled) + rollupJob + } + rollupMetadataID = startedRollup2.metadataID!! + rollupMetadata = getRollupMetadata(rollupMetadataID) + assertTrue("Did not process any doc during rollup", rollupMetadata.stats.documentsProcessed > 0) + + // do rollover on alias + val rolloverResponse = client().makeRequest("POST", "/$indexAlias/_rollover") + assertEquals(RestStatus.OK, rolloverResponse.restStatus()) + waitFor { assertTrue("index was not created after rollover", indexExists(backingIndex2)) } + + refreshAllIndices() + + // restart job #1 + client().makeRequest( + "PUT", + "$ROLLUP_JOBS_BASE_URI/${startedRollup1.id}?if_seq_no=${startedRollup1.seqNo}&if_primary_term=${startedRollup1.primaryTerm}", + emptyMap(), job1.copy(enabled = true).toHttpEntity() + ) + // Second run, backing index is setup just like any other rollup index + updateRollupStartTime(job1) + + startedRollup1 = waitFor { + val rollupJob = getRollup(rollupId = job1.id) + assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID) + val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!) + assertEquals("Rollup is not finished", RollupMetadata.Status.FAILED, rollupMetadata.status) + rollupJob + } + + rollupMetadataID = startedRollup1.metadataID!! + rollupMetadata = getRollupMetadata(rollupMetadataID) + + assertEquals("Backing index [$backingIndex1] has multiple rollup job owners", rollupMetadata.failureReason) + } + + fun `test rollup action with alias as target_index reuse failed`() { + generateNYCTaxiData("source_runner_sixth_2209") + + // Create index with alias, without mappings + val indexAlias = "alias_as_target_index_failed_1" + val backingIndex1 = "backing-000001" + val builtSettings = Settings.builder().let { + it.put(INDEX_NUMBER_OF_REPLICAS, "1") + it.put(INDEX_NUMBER_OF_SHARDS, "1") + it + }.build() + val aliases = "\"$indexAlias\": { \"is_write_index\": true }" + createIndex(backingIndex1, builtSettings, null, aliases) + + refreshAllIndices() + + val job1 = Rollup( + id = "rollup_with_alias_11", + schemaVersion = 1L, + enabled = true, + jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES), + jobLastUpdatedTime = Instant.now(), + jobEnabledTime = Instant.now(), + description = "basic change of page size", + sourceIndex = "source_runner_sixth_2209", + targetIndex = indexAlias, + metadataID = null, + roles = emptyList(), + pageSize = 1000, + delay = 0, + continuous = false, + dimensions = listOf( + DateHistogram(sourceField = "tpep_pickup_datetime", fixedInterval = "1s"), + Terms("RatecodeID", "RatecodeID"), + Terms("PULocationID", "PULocationID") + ), + metrics = listOf( + RollupMetrics( + sourceField = "passenger_count", + targetField = "passenger_count", + metrics = listOf(Sum(), Min(), Max(), ValueCount(), Average()) + ) + ) + ).let { createRollup(it, it.id) } + + // First run, backing index is empty: no mappings, no rollup_index setting, no rollupjobs in _META + updateRollupStartTime(job1) + + waitFor { assertTrue("Target rollup index was not created", indexExists(backingIndex1)) } + + var startedRollup1 = waitFor { + val rollupJob = getRollup(rollupId = job1.id) + assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID) + val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!) + assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status) + assertTrue("Rollup is not disabled", !rollupJob.enabled) + rollupJob + } + var rollupMetadataID = startedRollup1.metadataID!! + var rollupMetadata = getRollupMetadata(rollupMetadataID) + assertTrue("Did not process any doc during rollup", rollupMetadata.stats.documentsProcessed > 0) + + // Run job #2 on same target_index alias + val job2 = job1.copy(id = "some_other_job_9991", targetIndex = indexAlias) + .let { createRollup(it, it.id) } + + // Job2 First run, it should fail because job1 already wrote to backing index + updateRollupStartTime(job2) + + var startedRollup2 = waitFor { + val rollupJob = getRollup(rollupId = job2.id) + assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID) + val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!) + assertEquals("Rollup is not finished", RollupMetadata.Status.FAILED, rollupMetadata.status) + assertTrue("Rollup is not disabled", !rollupJob.enabled) + rollupJob + } + rollupMetadataID = startedRollup2.metadataID!! + rollupMetadata = getRollupMetadata(rollupMetadataID) + assertEquals("If target_index is alias, write backing index must be used only by this rollup job: [$backingIndex1]", rollupMetadata.failureReason) + } + + fun `test rollup action with alias as target_index multiple empty backing indices`() { + generateNYCTaxiData("source_runner_sixth_1532209") + + // Create index with alias, without mappings + val indexAlias = "alias_as_target_index_failed_19941" + val backingIndex1 = "backing-99000001" + val backingIndex2 = "backing-99000002" + val builtSettings = Settings.builder().let { + it.put(INDEX_NUMBER_OF_REPLICAS, "1") + it.put(INDEX_NUMBER_OF_SHARDS, "1") + it + }.build() + var aliases = "\"$indexAlias\": { \"is_write_index\": true }" + createIndex(backingIndex1, builtSettings, null, aliases) + aliases = "\"$indexAlias\": {}" + createIndex(backingIndex2, builtSettings, null, aliases) + + refreshAllIndices() + + val job1 = Rollup( + id = "rollup_with_alias_99243411", + schemaVersion = 1L, + enabled = true, + jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES), + jobLastUpdatedTime = Instant.now(), + jobEnabledTime = Instant.now(), + description = "basic change of page size", + sourceIndex = "source_runner_sixth_1532209", + targetIndex = indexAlias, + metadataID = null, + roles = emptyList(), + pageSize = 1000, + delay = 0, + continuous = false, + dimensions = listOf( + DateHistogram(sourceField = "tpep_pickup_datetime", fixedInterval = "1s"), + Terms("RatecodeID", "RatecodeID"), + Terms("PULocationID", "PULocationID") + ), + metrics = listOf( + RollupMetrics( + sourceField = "passenger_count", + targetField = "passenger_count", + metrics = listOf(Sum(), Min(), Max(), ValueCount(), Average()) + ) + ) + ).let { createRollup(it, it.id) } + + // First run, backing index is empty: no mappings, no rollup_index setting, no rollupjobs in _META + updateRollupStartTime(job1) + + waitFor { assertTrue("Target rollup index was not created", indexExists(backingIndex1)) } + + var startedRollup1 = waitFor { + val rollupJob = getRollup(rollupId = job1.id) + assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID) + val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!) + assertEquals("Rollup is not finished", RollupMetadata.Status.FAILED, rollupMetadata.status) + assertTrue("Rollup is not disabled", !rollupJob.enabled) + rollupJob + } + var rollupMetadataID = startedRollup1.metadataID!! + var rollupMetadata = getRollupMetadata(rollupMetadataID) + assertEquals("Backing index [$backingIndex2] has to have owner rollup job with id:[${startedRollup1.id}]", rollupMetadata.failureReason) + } + // TODO: Test scenarios: // - Source index deleted after first execution // * If this is with a source index pattern and the underlying indices are recreated but with different data diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/util/RollupFieldValueExpressionResolverTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/util/RollupFieldValueExpressionResolverTests.kt index fff2f2b4d..66a905573 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/util/RollupFieldValueExpressionResolverTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/util/RollupFieldValueExpressionResolverTests.kt @@ -11,6 +11,8 @@ import com.nhaarman.mockitokotlin2.mock import com.nhaarman.mockitokotlin2.whenever import com.nhaarman.mockitokotlin2.doReturn import org.junit.Before +import org.mockito.ArgumentMatchers.anyString +import org.opensearch.cluster.service.ClusterService import org.opensearch.indexmanagement.rollup.randomRollup import org.opensearch.ingest.TestTemplateService import org.opensearch.script.ScriptService @@ -20,21 +22,35 @@ import org.opensearch.test.OpenSearchTestCase class RollupFieldValueExpressionResolverTests : OpenSearchTestCase() { private val scriptService: ScriptService = mock() + private val clusterService: ClusterService = mock() + private var indexAliasUtils: RollupFieldValueExpressionResolver.IndexAliasUtils = mock() @Before fun settings() { - RollupFieldValueExpressionResolver.registerScriptService(scriptService) + this.indexAliasUtils = mock() + RollupFieldValueExpressionResolver.registerServices(scriptService, clusterService, indexAliasUtils) } - fun `test resolving successfully`() { + fun `test resolving no alias successfully`() { whenever(scriptService.compile(any(), eq(TemplateScript.CONTEXT))).doReturn(TestTemplateService.MockTemplateScript.Factory("test_index_123")) + whenever(indexAliasUtils.isAlias(anyString())).doReturn(false) val rollup = randomRollup().copy(sourceIndex = "test_index_123", targetIndex = "{{ctx.source_index}}") val targetIndexResolvedName = RollupFieldValueExpressionResolver.resolve(rollup, rollup.targetIndex) assertEquals("test_index_123", targetIndexResolvedName) } + fun `test resolving with alias successfully`() { + whenever(scriptService.compile(any(), eq(TemplateScript.CONTEXT))).doReturn(TestTemplateService.MockTemplateScript.Factory("test_index_123")) + whenever(indexAliasUtils.isAlias(anyString())).doReturn(true) + whenever(indexAliasUtils.getWriteIndexNameForAlias(anyString())).doReturn("backing_index") + val rollup = randomRollup().copy(sourceIndex = "test_index_123", targetIndex = "{{ctx.source_index}}") + val targetIndexResolvedName = RollupFieldValueExpressionResolver.resolve(rollup, rollup.targetIndex) + assertEquals("backing_index", targetIndexResolvedName) + } + fun `test resolving failed returned passed value`() { whenever(scriptService.compile(any(), eq(TemplateScript.CONTEXT))).doReturn(TestTemplateService.MockTemplateScript.Factory("")) + whenever(indexAliasUtils.isAlias(anyString())).doReturn(false) val rollup = randomRollup().copy(sourceIndex = "test_index_123", targetIndex = "{{ctx.source_index}}") val targetIndexResolvedName = RollupFieldValueExpressionResolver.resolve(rollup, rollup.targetIndex) assertEquals("{{ctx.source_index}}", targetIndexResolvedName)