Skip to content

Commit

Permalink
initial commit
Browse files Browse the repository at this point in the history
Signed-off-by: petar.dzepina <[email protected]>
  • Loading branch information
petardz committed Aug 5, 2022
1 parent 1f581dd commit ad231f9
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ import org.opensearch.indexmanagement.rollup.resthandler.RestStartRollupAction
import org.opensearch.indexmanagement.rollup.resthandler.RestStopRollupAction
import org.opensearch.indexmanagement.rollup.settings.LegacyOpenDistroRollupSettings
import org.opensearch.indexmanagement.rollup.settings.RollupSettings
import org.opensearch.indexmanagement.rollup.util.RollupFieldValueExpressionResolver
import org.opensearch.indexmanagement.settings.IndexManagementSettings
import org.opensearch.indexmanagement.snapshotmanagement.api.resthandler.RestCreateSMPolicyHandler
import org.opensearch.indexmanagement.snapshotmanagement.api.resthandler.RestDeleteSMPolicyHandler
Expand Down Expand Up @@ -370,6 +371,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
this.indexNameExpressionResolver = indexNameExpressionResolver

val skipFlag = SkipExecution(client, clusterService)
RollupFieldValueExpressionResolver.registerServices(scriptService, clusterService)
val rollupRunner = RollupRunner
.registerClient(client)
.registerClusterService(clusterService)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,16 @@ import org.opensearch.action.admin.indices.create.CreateIndexRequest
import org.opensearch.action.admin.indices.create.CreateIndexResponse
import org.opensearch.action.admin.indices.mapping.get.GetMappingsRequest
import org.opensearch.action.admin.indices.mapping.get.GetMappingsResponse
import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest
import org.opensearch.action.support.IndicesOptions
import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.client.Client
import org.opensearch.cluster.metadata.IndexNameExpressionResolver
import org.opensearch.cluster.metadata.MappingMetadata
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.settings.Settings
import org.opensearch.common.xcontent.XContentType
import org.opensearch.indexmanagement.IndexManagementIndices
import org.opensearch.indexmanagement.common.model.dimension.DateHistogram
import org.opensearch.indexmanagement.common.model.dimension.Histogram
Expand All @@ -32,6 +35,7 @@ import org.opensearch.indexmanagement.rollup.model.Rollup
import org.opensearch.indexmanagement.rollup.model.RollupJobValidationResult
import org.opensearch.indexmanagement.rollup.settings.LegacyOpenDistroRollupSettings
import org.opensearch.indexmanagement.rollup.settings.RollupSettings
import org.opensearch.indexmanagement.rollup.util.RollupFieldValueExpressionResolver
import org.opensearch.indexmanagement.rollup.util.isRollupIndex
import org.opensearch.indexmanagement.util.IndexUtils.Companion._META
import org.opensearch.indexmanagement.util.IndexUtils.Companion.getFieldFromMappings
Expand All @@ -52,14 +56,33 @@ class RollupMapperService(
// If the index already exists we need to verify it's a rollup index,
// confirm it does not conflict with existing jobs and is a valid job
@Suppress("ReturnCount")
private suspend fun validateAndAttemptToUpdateTargetIndex(rollup: Rollup): RollupJobValidationResult {
if (!isRollupIndex(rollup.targetIndex, clusterService.state())) {
return RollupJobValidationResult.Invalid("Target index [${rollup.targetIndex}] is a non rollup index")
}
private suspend fun validateAndAttemptToUpdateTargetIndex(rollup: Rollup, targetIndexResolvedName: String, hasLegacyPlugin: Boolean): RollupJobValidationResult {
if (!isRollupIndex(targetIndexResolvedName, clusterService.state()) &&
RollupFieldValueExpressionResolver.hasAlias(targetIndexResolvedName)) {

return when (val jobExistsResult = jobExistsInRollupIndex(rollup)) {
val backingIndices = RollupFieldValueExpressionResolver.getBackingIndicesForAlias(targetIndexResolvedName)
backingIndices?.forEach {
if (it.index.name != targetIndexResolvedName) {
when (val jobExistsResult = jobExistsInRollupIndex(rollup, it.index.name)) {
is RollupJobValidationResult.Invalid, is RollupJobValidationResult.Failure -> return jobExistsResult
}
}
}
val mappings = getMappings(targetIndexResolvedName)
if (mappings is GetMappingsResult.Failure) {
return RollupJobValidationResult.Failure("Failed to get mappings for target index: $targetIndexResolvedName")
} else if (mappings is GetMappingsResult.Success && mappings.response.mappings()?.get(targetIndexResolvedName)?.sourceAsMap().isNullOrEmpty() == false) {
return RollupJobValidationResult.Failure("If target_index is alias, backing index must be empty: $targetIndexResolvedName")
}
return prepareTargetIndex(rollup, targetIndexResolvedName, hasLegacyPlugin)
} else {
if (!isRollupIndex(targetIndexResolvedName, clusterService.state())) {
return RollupJobValidationResult.Invalid("Target index [$targetIndexResolvedName] is a non rollup index")
}
}
return when (val jobExistsResult = jobExistsInRollupIndex(rollup, targetIndexResolvedName)) {
is RollupJobValidationResult.Valid -> jobExistsResult
is RollupJobValidationResult.Invalid -> updateRollupIndexMappings(rollup)
is RollupJobValidationResult.Invalid -> updateRollupIndexMappings(rollup, targetIndexResolvedName)
is RollupJobValidationResult.Failure -> jobExistsResult
}
}
Expand All @@ -69,14 +92,15 @@ class RollupMapperService(
// TODO: error handling
@Suppress("ReturnCount")
suspend fun attemptCreateRollupTargetIndex(job: Rollup, hasLegacyPlugin: Boolean): RollupJobValidationResult {
if (indexExists(job.targetIndex)) {
return validateAndAttemptToUpdateTargetIndex(job)
val targetIndexResolvedName = RollupFieldValueExpressionResolver.resolve(job, job.targetIndex)
if (indexExists(targetIndexResolvedName)) {
return validateAndAttemptToUpdateTargetIndex(job, targetIndexResolvedName, hasLegacyPlugin)
} else {
val errorMessage = "Failed to create target index [${job.targetIndex}]"
val errorMessage = "Failed to create target index [$targetIndexResolvedName]"
return try {
val response = createTargetIndex(job, hasLegacyPlugin)
val response = createTargetIndex(targetIndexResolvedName, hasLegacyPlugin)
if (response.isAcknowledged) {
updateRollupIndexMappings(job)
updateRollupIndexMappings(job, targetIndexResolvedName)
} else {
RollupJobValidationResult.Failure(errorMessage)
}
Expand All @@ -94,13 +118,57 @@ class RollupMapperService(
}
}

private suspend fun createTargetIndex(job: Rollup, hasLegacyPlugin: Boolean): CreateIndexResponse {
suspend fun prepareTargetIndex(rollup: Rollup, targetIndexResolvedName: String, hasLegacyPlugin: Boolean): RollupJobValidationResult {
var errorMessage = ""
try {
// 1. First we need to add index.plugins.rollup_index setting to index
val settings = if (hasLegacyPlugin) {
Settings.builder().put(LegacyOpenDistroRollupSettings.ROLLUP_INDEX.key, true).build()
} else {
Settings.builder().put(RollupSettings.ROLLUP_INDEX.key, true).build()
}
errorMessage = "Failed to update settings for target index [$targetIndexResolvedName]"
val resp: AcknowledgedResponse = client.admin().indices().suspendUntil {
updateSettings(UpdateSettingsRequest(settings, targetIndexResolvedName), it)
}
if (!resp.isAcknowledged) {
return RollupJobValidationResult.Invalid("Failed to update rollup settings for target index: [$targetIndexResolvedName]")
}

// 2. Put rollup mappings
val putMappingRequest: PutMappingRequest =
PutMappingRequest(targetIndexResolvedName).source(IndexManagementIndices.rollupTargetMappings, XContentType.JSON)
errorMessage = "Failed to put initial rollup mappings for target index [$targetIndexResolvedName]"
val respMappings: AcknowledgedResponse = client.admin().indices().suspendUntil {
putMapping(putMappingRequest, it)
}
if (!respMappings.isAcknowledged) {
return RollupJobValidationResult.Invalid("Failed to update rollup settings for target index: [$targetIndexResolvedName]")
}
// 3.
errorMessage = "Failed to update mappings for target index [$targetIndexResolvedName]"
updateRollupIndexMappings(rollup, targetIndexResolvedName)
} catch (e: RemoteTransportException) {
val unwrappedException = ExceptionsHelper.unwrapCause(e) as Exception
logger.error(errorMessage, unwrappedException)
RollupJobValidationResult.Failure(errorMessage, unwrappedException)
} catch (e: OpenSearchSecurityException) {
logger.error("$errorMessage because ", e)
RollupJobValidationResult.Failure("$errorMessage - missing required cluster permissions: ${e.localizedMessage}", e)
} catch (e: Exception) {
logger.error("$errorMessage because ", e)
RollupJobValidationResult.Failure(errorMessage, e)
}
return RollupJobValidationResult.Valid
}

private suspend fun createTargetIndex(targetIndexName: String, hasLegacyPlugin: Boolean): CreateIndexResponse {
val settings = if (hasLegacyPlugin) {
Settings.builder().put(LegacyOpenDistroRollupSettings.ROLLUP_INDEX.key, true).build()
} else {
Settings.builder().put(RollupSettings.ROLLUP_INDEX.key, true).build()
}
val request = CreateIndexRequest(job.targetIndex)
val request = CreateIndexRequest(targetIndexName)
.settings(settings)
.mapping(IndexManagementIndices.rollupTargetMappings)
// TODO: Perhaps we can do better than this for mappings... as it'll be dynamic for rest
Expand Down Expand Up @@ -204,19 +272,19 @@ class RollupMapperService(
return field != null
}

private suspend fun jobExistsInRollupIndex(rollup: Rollup): RollupJobValidationResult {
val res = when (val getMappingsResult = getMappings(rollup.targetIndex)) {
private suspend fun jobExistsInRollupIndex(rollup: Rollup, targetIndexResolvedName: String): RollupJobValidationResult {
val res = when (val getMappingsResult = getMappings(targetIndexResolvedName)) {
is GetMappingsResult.Success -> getMappingsResult.response
is GetMappingsResult.Failure ->
return RollupJobValidationResult.Failure(getMappingsResult.message, getMappingsResult.cause)
}

val indexMapping: MappingMetadata = res.mappings[rollup.targetIndex]
val indexMapping: MappingMetadata = res.mappings[targetIndexResolvedName]

return if (((indexMapping.sourceAsMap?.get(_META) as Map<*, *>?)?.get(ROLLUPS) as Map<*, *>?)?.containsKey(rollup.id) == true) {
RollupJobValidationResult.Valid
} else {
RollupJobValidationResult.Invalid("Rollup job [${rollup.id}] does not exist in rollup index [${rollup.targetIndex}]")
RollupJobValidationResult.Invalid("Rollup job [${rollup.id}] does not exist in rollup index [$targetIndexResolvedName]")
}
}

Expand Down Expand Up @@ -254,8 +322,8 @@ class RollupMapperService(
// where they can both get the same mapping state and only add their own job, meaning one
// of the jobs won't be added to the target index _meta
@Suppress("BlockingMethodInNonBlockingContext", "ReturnCount")
private suspend fun updateRollupIndexMappings(rollup: Rollup): RollupJobValidationResult {
val errorMessage = "Failed to update mappings of target index [${rollup.targetIndex}] with rollup job"
private suspend fun updateRollupIndexMappings(rollup: Rollup, targetIndexResolvedName: String): RollupJobValidationResult {
val errorMessage = "Failed to update mappings of target index [$targetIndexResolvedName] with rollup job"
try {
val response = withContext(Dispatchers.IO) {
val resp: AcknowledgedResponse = client.suspendUntil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class RollupSettings {
"index.plugins.rollup_index",
LegacyOpenDistroRollupSettings.ROLLUP_INDEX,
Setting.Property.IndexScope,
Setting.Property.InternalIndex
Setting.Property.Dynamic
)

val ROLLUP_INGEST_BACKOFF_MILLIS: Setting<TimeValue> = Setting.positiveTimeSetting(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.rollup.util

import org.opensearch.cluster.metadata.IndexAbstraction
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.xcontent.XContentFactory
import org.opensearch.indexmanagement.indexstatemanagement.util.XCONTENT_WITHOUT_TYPE
import org.opensearch.indexmanagement.opensearchapi.toMap
import org.opensearch.indexmanagement.rollup.model.Rollup
import org.opensearch.script.Script
import org.opensearch.script.ScriptService
import org.opensearch.script.ScriptType
import org.opensearch.script.TemplateScript

object RollupFieldValueExpressionResolver {

private val validTopContextFields = setOf(Rollup.SOURCE_INDEX_FIELD)

private lateinit var scriptService: ScriptService
private lateinit var clusterService: ClusterService
fun resolve(rollup: Rollup, fieldValue: String): String {
val script = Script(ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG, fieldValue, mapOf())

val contextMap = rollup.toXContent(XContentFactory.jsonBuilder(), XCONTENT_WITHOUT_TYPE)
.toMap()
.filterKeys { key -> key in validTopContextFields }

var compiledValue = scriptService.compile(script, TemplateScript.CONTEXT)
.newInstance(script.params + mapOf("ctx" to contextMap))
.execute()

if (isAlias(compiledValue)) {
compiledValue = getWriteIndexNameForAlias(compiledValue)
}

return if (compiledValue.isNullOrBlank()) fieldValue else compiledValue
}

fun registerScriptService(scriptService: ScriptService) {
this.scriptService = scriptService
}
fun hasAlias(index: String): Boolean {
val aliases = clusterService.state().metadata().indices.get(index)?.aliases
if (aliases != null) {
return aliases.size() > 0
}
return false
}
fun isAlias(index: String): Boolean {
return clusterService.state().metadata().indicesLookup?.get(index) is IndexAbstraction.Alias
}
fun getWriteIndexNameForAlias(alias: String): String? {
return clusterService.state().metadata().indicesLookup?.get(alias)?.writeIndex?.index?.name
}

fun getBackingIndicesForAlias(alias: String): MutableList<IndexMetadata>? {
return clusterService.state().metadata().indicesLookup?.get(alias)?.indices
}

fun registerServices(scriptService: ScriptService, clusterService: ClusterService) {
this.scriptService = scriptService
this.clusterService = clusterService
}
}

0 comments on commit ad231f9

Please sign in to comment.