From 7a6bade1cdd869bec114e5ad5329448d6f92b27b Mon Sep 17 00:00:00 2001 From: petardz Date: Wed, 10 Aug 2022 20:20:24 +0200 Subject: [PATCH] fixed rollupFieldValueExpressionResolverTests Signed-off-by: Petar Dzepina --- .../rollup/RollupMapperService.kt | 5 +- .../RollupFieldValueExpressionResolver.kt | 46 ++++++++++--------- .../rollup/runner/RollupRunnerIT.kt | 19 +++++--- ...RollupFieldValueExpressionResolverTests.kt | 19 ++++++-- 4 files changed, 56 insertions(+), 33 deletions(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupMapperService.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupMapperService.kt index 05aa5d60f..54832bd18 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupMapperService.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupMapperService.kt @@ -78,15 +78,16 @@ class RollupMapperService( @Suppress("ReturnCount") suspend fun targetIndexIsValidAlias(rollup: Rollup, targetIndexResolvedName: String): Boolean { - if (!RollupFieldValueExpressionResolver.hasAlias(targetIndexResolvedName)) { + if (!RollupFieldValueExpressionResolver.indexAliasUtils.hasAlias(targetIndexResolvedName)) { return false } // All other backing indices have to have this rollup job in _META field - val backingIndices = RollupFieldValueExpressionResolver.getBackingIndicesForAlias(targetIndexResolvedName) + val backingIndices = RollupFieldValueExpressionResolver.indexAliasUtils.getBackingIndicesForAlias(targetIndexResolvedName) backingIndices?.forEach { if (it.index.name != targetIndexResolvedName) { when (jobExistsInRollupIndex(rollup, it.index.name)) { is RollupJobValidationResult.Invalid, is RollupJobValidationResult.Failure -> return false + else -> {} } } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupFieldValueExpressionResolver.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupFieldValueExpressionResolver.kt index 2f4bfd9f8..a2f81c641 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupFieldValueExpressionResolver.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupFieldValueExpressionResolver.kt @@ -23,6 +23,7 @@ object RollupFieldValueExpressionResolver { private lateinit var scriptService: ScriptService private lateinit var clusterService: ClusterService + lateinit var indexAliasUtils: IndexAliasUtils fun resolve(rollup: Rollup, fieldValue: String): String { val script = Script(ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG, fieldValue, mapOf()) @@ -34,36 +35,39 @@ object RollupFieldValueExpressionResolver { .newInstance(script.params + mapOf("ctx" to contextMap)) .execute() - if (isAlias(compiledValue)) { - compiledValue = getWriteIndexNameForAlias(compiledValue) + if (indexAliasUtils.isAlias(compiledValue)) { + compiledValue = indexAliasUtils.getWriteIndexNameForAlias(compiledValue) } return if (compiledValue.isNullOrBlank()) fieldValue else compiledValue } - fun registerScriptService(scriptService: ScriptService) { + fun registerServices(scriptService: ScriptService, clusterService: ClusterService) { this.scriptService = scriptService + this.clusterService = clusterService + this.indexAliasUtils = IndexAliasUtils(clusterService) } - fun hasAlias(index: String): Boolean { - val aliases = clusterService.state().metadata().indices.get(index)?.aliases - if (aliases != null) { - return aliases.size() > 0 + + class IndexAliasUtils(val clusterService: ClusterService) { + + fun hasAlias(index: String): Boolean { + val aliases = this.clusterService.state().metadata().indices.get(index)?.aliases + if (aliases != null) { + return aliases.size() > 0 + } + return false } - return false - } - fun isAlias(index: String): Boolean { - return clusterService.state().metadata().indicesLookup?.get(index) is IndexAbstraction.Alias - } - fun getWriteIndexNameForAlias(alias: String): String? { - return clusterService.state().metadata().indicesLookup?.get(alias)?.writeIndex?.index?.name - } - fun getBackingIndicesForAlias(alias: String): MutableList? { - return clusterService.state().metadata().indicesLookup?.get(alias)?.indices - } + fun isAlias(index: String): Boolean { + return this.clusterService.state().metadata().indicesLookup?.get(index) is IndexAbstraction.Alias + } - fun registerServices(scriptService: ScriptService, clusterService: ClusterService) { - this.scriptService = scriptService - this.clusterService = clusterService + fun getWriteIndexNameForAlias(alias: String): String? { + return this.clusterService.state().metadata().indicesLookup?.get(alias)?.writeIndex?.index?.name + } + + fun getBackingIndicesForAlias(alias: String): MutableList? { + return this.clusterService.state().metadata().indicesLookup?.get(alias)?.indices + } } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/runner/RollupRunnerIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/runner/RollupRunnerIT.kt index a62158c83..51b1a0d72 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/runner/RollupRunnerIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/runner/RollupRunnerIT.kt @@ -734,7 +734,7 @@ class RollupRunnerIT : RollupRestTestCase() { } fun `test rollup action with alias as target_index successfully`() { - generateNYCTaxiData("source_runner_sixth") + generateNYCTaxiData("source_runner_sixth_eleventh") // Create index with alias, without mappings val indexAlias = "alias_as_target_index" @@ -750,14 +750,14 @@ class RollupRunnerIT : RollupRestTestCase() { refreshAllIndices() val rollup = Rollup( - id = "page_size_runner_sixth", + id = "runner_with_alias_as_target", schemaVersion = 1L, enabled = true, jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES), jobLastUpdatedTime = Instant.now(), jobEnabledTime = Instant.now(), description = "basic change of page size", - sourceIndex = "source_runner_sixth", + sourceIndex = "source_runner_sixth_eleventh", targetIndex = indexAlias, metadataID = null, roles = emptyList(), @@ -774,6 +774,7 @@ class RollupRunnerIT : RollupRestTestCase() { ) ).let { createRollup(it, it.id) } + // First run, backing index is empty: no mappings, no rollup_index setting, no rollupjobs in _META updateRollupStartTime(rollup) waitFor { assertTrue("Target rollup index was not created", indexExists(backingIndex)) } @@ -785,6 +786,9 @@ class RollupRunnerIT : RollupRestTestCase() { assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status) rollupJob } + var rollupMetadataID = startedRollup.metadataID!! + var rollupMetadata = getRollupMetadata(rollupMetadataID) + assertEquals("Did not process any doc during rollup", rollupMetadata.stats.documentsProcessed > 0) // restart job client().makeRequest( @@ -792,6 +796,8 @@ class RollupRunnerIT : RollupRestTestCase() { "$ROLLUP_JOBS_BASE_URI/${startedRollup.id}?if_seq_no=${startedRollup.seqNo}&if_primary_term=${startedRollup.primaryTerm}", emptyMap(), rollup.copy(enabled = true).toHttpEntity() ) + // Second run, backing index is setup just like any other rollup index + updateRollupStartTime(rollup) startedRollup = waitFor { val rollupJob = getRollup(rollupId = rollup.id) @@ -801,11 +807,10 @@ class RollupRunnerIT : RollupRestTestCase() { rollupJob } - val rollupMetadataID = startedRollup.metadataID!! - val rollupMetadata = getRollupMetadata(rollupMetadataID) + rollupMetadataID = startedRollup.metadataID!! + rollupMetadata = getRollupMetadata(rollupMetadataID) - // Randomly choosing 100.. if it didn't work we'd either fail hitting the timeout in waitFor or we'd have thousands of pages processed - assertTrue("Did not have less than 100 pages processed", rollupMetadata.stats.documentsProcessed > 0) + assertEquals("Did not process any doc during rollup", rollupMetadata.stats.documentsProcessed > 0) } // TODO: Test scenarios: diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/util/RollupFieldValueExpressionResolverTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/util/RollupFieldValueExpressionResolverTests.kt index fff2f2b4d..d122008f6 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/util/RollupFieldValueExpressionResolverTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/util/RollupFieldValueExpressionResolverTests.kt @@ -11,6 +11,7 @@ import com.nhaarman.mockitokotlin2.mock import com.nhaarman.mockitokotlin2.whenever import com.nhaarman.mockitokotlin2.doReturn import org.junit.Before +import org.opensearch.cluster.service.ClusterService import org.opensearch.indexmanagement.rollup.randomRollup import org.opensearch.ingest.TestTemplateService import org.opensearch.script.ScriptService @@ -20,19 +21,31 @@ import org.opensearch.test.OpenSearchTestCase class RollupFieldValueExpressionResolverTests : OpenSearchTestCase() { private val scriptService: ScriptService = mock() - + private val clusterService: ClusterService = mock() + private val indexAliasUtils: RollupFieldValueExpressionResolver.IndexAliasUtils = mock() @Before fun settings() { - RollupFieldValueExpressionResolver.registerScriptService(scriptService) + RollupFieldValueExpressionResolver.registerServices(scriptService, clusterService) + clusterService.state() } - fun `test resolving successfully`() { + fun `test resolving no alias successfully`() { whenever(scriptService.compile(any(), eq(TemplateScript.CONTEXT))).doReturn(TestTemplateService.MockTemplateScript.Factory("test_index_123")) + whenever(indexAliasUtils.hasAlias(any())).doReturn(false) val rollup = randomRollup().copy(sourceIndex = "test_index_123", targetIndex = "{{ctx.source_index}}") val targetIndexResolvedName = RollupFieldValueExpressionResolver.resolve(rollup, rollup.targetIndex) assertEquals("test_index_123", targetIndexResolvedName) } + fun `test resolving with alias successfully`() { + whenever(scriptService.compile(any(), eq(TemplateScript.CONTEXT))).doReturn(TestTemplateService.MockTemplateScript.Factory("test_index_123")) + whenever(indexAliasUtils.hasAlias(any())).doReturn(true) + whenever(indexAliasUtils.getWriteIndexNameForAlias(any())).doReturn("backing_index") + val rollup = randomRollup().copy(sourceIndex = "test_index_123", targetIndex = "{{ctx.source_index}}") + val targetIndexResolvedName = RollupFieldValueExpressionResolver.resolve(rollup, rollup.targetIndex) + assertEquals("backing_index", targetIndexResolvedName) + } + fun `test resolving failed returned passed value`() { whenever(scriptService.compile(any(), eq(TemplateScript.CONTEXT))).doReturn(TestTemplateService.MockTemplateScript.Factory("")) val rollup = randomRollup().copy(sourceIndex = "test_index_123", targetIndex = "{{ctx.source_index}}")