Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

alias in rollup target_index field #445

Merged
merged 20 commits into from
Nov 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
this.indexNameExpressionResolver = indexNameExpressionResolver

val skipFlag = SkipExecution(client)
RollupFieldValueExpressionResolver.registerScriptService(scriptService)
RollupFieldValueExpressionResolver.registerServices(scriptService, clusterService)
val rollupRunner = RollupRunner
.registerClient(client)
.registerClusterService(clusterService)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,16 @@ import org.opensearch.action.admin.indices.create.CreateIndexRequest
import org.opensearch.action.admin.indices.create.CreateIndexResponse
import org.opensearch.action.admin.indices.mapping.get.GetMappingsRequest
import org.opensearch.action.admin.indices.mapping.get.GetMappingsResponse
import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest
import org.opensearch.action.support.IndicesOptions
import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.client.Client
import org.opensearch.cluster.metadata.IndexNameExpressionResolver
import org.opensearch.cluster.metadata.MappingMetadata
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.settings.Settings
import org.opensearch.common.xcontent.XContentType
import org.opensearch.indexmanagement.IndexManagementIndices
import org.opensearch.indexmanagement.common.model.dimension.DateHistogram
import org.opensearch.indexmanagement.common.model.dimension.Histogram
Expand All @@ -33,7 +36,9 @@ import org.opensearch.indexmanagement.rollup.model.RollupJobValidationResult
import org.opensearch.indexmanagement.rollup.settings.LegacyOpenDistroRollupSettings
import org.opensearch.indexmanagement.rollup.settings.RollupSettings
import org.opensearch.indexmanagement.rollup.util.RollupFieldValueExpressionResolver
import org.opensearch.indexmanagement.rollup.util.getRollupJobs
import org.opensearch.indexmanagement.rollup.util.isRollupIndex
import org.opensearch.indexmanagement.rollup.util.isTargetIndexAlias
import org.opensearch.indexmanagement.util.IndexUtils.Companion._META
import org.opensearch.indexmanagement.util.IndexUtils.Companion.getFieldFromMappings
import org.opensearch.transport.RemoteTransportException
Expand All @@ -50,29 +55,105 @@ class RollupMapperService(

private val logger = LogManager.getLogger(javaClass)

// If the index already exists we need to verify it's a rollup index,
// confirm it does not conflict with existing jobs and is a valid job
/**
* If the index already exists we need to verify it's a rollup index,
* confirm it does not conflict with existing jobs and is a valid job.
* If
*
* @param rollup Rollup job we're currently executing
* @param targetIndexResolvedName concrete index name
* @param hasLegacyPlugin flag to indicate if we're running legacy plugin
* @return RollupJobValidationResult indicating success or failure with appropriate error message included.
*/
@Suppress("ReturnCount")
private suspend fun validateAndAttemptToUpdateTargetIndex(rollup: Rollup, targetIndexResolvedName: String): RollupJobValidationResult {
if (!isRollupIndex(targetIndexResolvedName, clusterService.state())) {
private suspend fun validateAndAttemptToUpdateTargetIndex(
rollup: Rollup,
targetIndexResolvedName: String,
Comment on lines +70 to +71
Copy link
Contributor

@khushbr khushbr Oct 31, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A couple of documentation chages:

  1. Let us add KDoc for description of parameters and return values. Explain the various terminology and any assumptions we are making with their operation.
  2. rename rollup to rollupJob
  3. rename targetIndexResolvedName

Copy link
Contributor Author

@petardz petardz Nov 2, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. done
  2. "rollup" is used throughout all Rollup related files. We should probably rename all or none.
  3. What do you suggest? This is called "resolved name" since it can be resolved to concrete index name from mustache script or from alias

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. I was thinking of calling it plainly as concreteIndex ?

hasLegacyPlugin: Boolean
): RollupJobValidationResult {
if (rollup.isTargetIndexAlias()) {
val aliasValidationResult = validateTargetIndexAlias(rollup, targetIndexResolvedName)
if (aliasValidationResult !is RollupJobValidationResult.Valid) {
return aliasValidationResult
} else if (!isRollupIndex(targetIndexResolvedName, clusterService.state())) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you elaborate more on this if condition? From what I gather:
If Target Index is an alias and the alias is not valid and the target index isn't a Rollup Index, then => Prepare Target Index.

These conditions seem orthogonal and don't make a coherent sense of what we are trying to do here. Are we saying this function will validate and update and create a Target index? It seems too many responsibilities present within a single function.

Copy link
Contributor Author

@petardz petardz Nov 1, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If Target Index is an alias and the alias IS valid and the target index isn't a Rollup Index

Only then we would "prepare target_index"

isRollupIndex would check for existence of rollup setting in index. This will be false if we're at first job run, because in case of alias, user setup index and not us.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for adding the context.
Can we add all these cases to validateAndAttemptToUpdateTargetIndex() documentation? We are appending new alias feature on top of the existing functionality in `validateAndAttemptToUpdateTargetIndex(), so let us update the KDoc for this function.

return prepareTargetIndex(rollup, targetIndexResolvedName, hasLegacyPlugin)
}
} else if (!isRollupIndex(targetIndexResolvedName, clusterService.state())) {
return RollupJobValidationResult.Invalid("Target index [$targetIndexResolvedName] is a non rollup index")
}

return when (val jobExistsResult = jobExistsInRollupIndex(rollup, targetIndexResolvedName)) {
is RollupJobValidationResult.Valid -> jobExistsResult
is RollupJobValidationResult.Invalid -> updateRollupIndexMappings(rollup, targetIndexResolvedName)
is RollupJobValidationResult.Failure -> jobExistsResult
}
}

/**
* Target Index is valid alias if either all backing indices have this job in _meta
* or there isn't any rollup job present in _meta
*/
@Suppress("ReturnCount")
suspend fun validateTargetIndexAlias(rollup: Rollup, targetIndexResolvedName: String): RollupJobValidationResult {

var errorMessage: String

if (!RollupFieldValueExpressionResolver.indexAliasUtils.hasAlias(targetIndexResolvedName)) {
logger.error("[${rollup.targetIndex}] is not an alias!")
return RollupJobValidationResult.Failure("[${rollup.targetIndex}] is not an alias!")
khushbr marked this conversation as resolved.
Show resolved Hide resolved
}

val rollupJobs = clusterService.state().metadata.index(targetIndexResolvedName).getRollupJobs()
if (rollupJobs != null &&
(rollupJobs.size > 1 || rollupJobs[0].id != rollup.id)
) {
errorMessage = "More than one rollup job present on the backing index, cannot add alias for target index: [$targetIndexResolvedName]"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really add alias to target index at any point?
Or we should say More than one rollup job present on the backing index of the target alias, cannot perform rollup to this target alias.

logger.error(errorMessage)
return RollupJobValidationResult.Failure(errorMessage)
}

// All other backing indices have to have this rollup job in _META field and it has to be the only one!
val backingIndices = RollupFieldValueExpressionResolver.indexAliasUtils.getBackingIndicesForAlias(rollup.targetIndex)
backingIndices?.forEach {
if (it.index.name != targetIndexResolvedName) {
khushbr marked this conversation as resolved.
Show resolved Hide resolved
val allRollupJobs = clusterService.state().metadata.index(it.index.name).getRollupJobs()
val validationResult = validateNonWriteBackingIndex(it.index.name, rollup, allRollupJobs)
if (validationResult !is RollupJobValidationResult.Valid) {
return validationResult
}
}
}
return RollupJobValidationResult.Valid
}

suspend fun validateNonWriteBackingIndex(backingIndex: String, currentRollupJob: Rollup, rollupJobs: List<Rollup>?): RollupJobValidationResult {
khushbr marked this conversation as resolved.
Show resolved Hide resolved
var errorMessage = ""
if (rollupJobs == null) {
errorMessage = "Backing index [$backingIndex] has to have owner rollup job with id:[${currentRollupJob.id}]"
} else if (rollupJobs.size == 1 && rollupJobs[0].id != currentRollupJob.id) {
errorMessage = "Backing index [$backingIndex] has to have owner rollup job with id:[${currentRollupJob.id}]"
} else if (rollupJobs.size > 1) {
errorMessage = "Backing index [$backingIndex] has multiple rollup job owners"
}
if (errorMessage.isNotEmpty()) {
logger.error(errorMessage)
return RollupJobValidationResult.Failure(errorMessage)
}
return RollupJobValidationResult.Valid
}

// This creates the target index if it doesn't already else validate the target index is rollup index
// If the target index mappings doesn't contain rollup job attempts to update the mappings.
// TODO: error handling
@Suppress("ReturnCount")
suspend fun attemptCreateRollupTargetIndex(job: Rollup, hasLegacyPlugin: Boolean): RollupJobValidationResult {
val targetIndexResolvedName = RollupFieldValueExpressionResolver.resolve(job, job.targetIndex)
if (indexExists(targetIndexResolvedName)) {
return validateAndAttemptToUpdateTargetIndex(job, targetIndexResolvedName)
val validationResult = validateAndAttemptToUpdateTargetIndex(job, targetIndexResolvedName, hasLegacyPlugin)
when (validationResult) {
is RollupJobValidationResult.Failure -> logger.error(validationResult.message)
is RollupJobValidationResult.Invalid -> logger.error(validationResult.reason)
}
return validationResult
} else {
val errorMessage = "Failed to create target index [$targetIndexResolvedName]"
return try {
Expand All @@ -96,6 +177,53 @@ class RollupMapperService(
}
}

suspend fun addRollupSettingToIndex(targetIndexResolvedName: String, hasLegacyPlugin: Boolean): Boolean {
val settings = if (hasLegacyPlugin) {
Settings.builder().put(LegacyOpenDistroRollupSettings.ROLLUP_INDEX.key, true).build()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the builder have the retries built it ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mean on updateSettings call on client? If yes, then exception would be thrown and job would fail. We don't have any retry in place for these kind of errors.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a conversation outside this PR, but I am assuming we do not retry the updateSettings call as we expect the next cycle of ISM execution to take care of any transient errors ?

} else {
Settings.builder().put(RollupSettings.ROLLUP_INDEX.key, true).build()
}
val resp: AcknowledgedResponse = client.admin().indices().suspendUntil {
updateSettings(UpdateSettingsRequest(settings, targetIndexResolvedName), it)
}
return resp.isAcknowledged
}
@Suppress("ReturnCount")
suspend fun prepareTargetIndex(rollup: Rollup, targetIndexResolvedName: String, hasLegacyPlugin: Boolean): RollupJobValidationResult {
var errorMessage = ""
try {
// 1. First we need to add index.plugins.rollup_index setting to index
if (addRollupSettingToIndex(targetIndexResolvedName, hasLegacyPlugin) == false) {
logger.error("Failed to update rollup settings for target index: [$targetIndexResolvedName]")
return RollupJobValidationResult.Invalid("Failed to update rollup settings for target index: [$targetIndexResolvedName]")
khushbr marked this conversation as resolved.
Show resolved Hide resolved
}

// 2. Put rollup target_index mappings
val putMappingRequest: PutMappingRequest =
PutMappingRequest(targetIndexResolvedName).source(IndexManagementIndices.rollupTargetMappings, XContentType.JSON)
val respMappings: AcknowledgedResponse = client.admin().indices().suspendUntil {
putMapping(putMappingRequest, it)
}
if (!respMappings.isAcknowledged) {
return RollupJobValidationResult.Invalid("Failed to put initial rollup mappings for target index [$targetIndexResolvedName]")
}
// 3. Add this rollup job to target_index's _meta
errorMessage = "Failed to update mappings for target index [$targetIndexResolvedName]"
updateRollupIndexMappings(rollup, targetIndexResolvedName)
} catch (e: RemoteTransportException) {
val unwrappedException = ExceptionsHelper.unwrapCause(e) as Exception
logger.error(errorMessage, unwrappedException)
RollupJobValidationResult.Failure(errorMessage, unwrappedException)
} catch (e: OpenSearchSecurityException) {
logger.error("$errorMessage because ", e)
RollupJobValidationResult.Failure("$errorMessage - missing required cluster permissions: ${e.localizedMessage}", e)
} catch (e: Exception) {
logger.error("$errorMessage because ", e)
RollupJobValidationResult.Failure(errorMessage, e)
}
return RollupJobValidationResult.Valid
}

private suspend fun createTargetIndex(targetIndexName: String, hasLegacyPlugin: Boolean): CreateIndexResponse {
val settings = if (hasLegacyPlugin) {
Settings.builder().put(LegacyOpenDistroRollupSettings.ROLLUP_INDEX.key, true).build()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class RollupSettings {
"index.plugins.rollup_index",
LegacyOpenDistroRollupSettings.ROLLUP_INDEX,
Setting.Property.IndexScope,
Setting.Property.InternalIndex
Setting.Property.Dynamic
bowenlan-amzn marked this conversation as resolved.
Show resolved Hide resolved
)

val ROLLUP_INGEST_BACKOFF_MILLIS: Setting<TimeValue> = Setting.positiveTimeSetting(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@

package org.opensearch.indexmanagement.rollup.util

import org.opensearch.cluster.metadata.IndexAbstraction
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.xcontent.XContentFactory
import org.opensearch.indexmanagement.indexstatemanagement.util.XCONTENT_WITHOUT_TYPE
import org.opensearch.indexmanagement.opensearchapi.toMap
Expand All @@ -19,22 +22,58 @@ object RollupFieldValueExpressionResolver {
private val validTopContextFields = setOf(Rollup.SOURCE_INDEX_FIELD)

private lateinit var scriptService: ScriptService

private lateinit var clusterService: ClusterService
lateinit var indexAliasUtils: IndexAliasUtils
fun resolve(rollup: Rollup, fieldValue: String): String {
val script = Script(ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG, fieldValue, mapOf())

val contextMap = rollup.toXContent(XContentFactory.jsonBuilder(), XCONTENT_WITHOUT_TYPE)
.toMap()
.filterKeys { key -> key in validTopContextFields }

val compiledValue = scriptService.compile(script, TemplateScript.CONTEXT)
var compiledValue = scriptService.compile(script, TemplateScript.CONTEXT)
.newInstance(script.params + mapOf("ctx" to contextMap))
.execute()

return if (compiledValue.isBlank()) fieldValue else compiledValue
if (indexAliasUtils.isAlias(compiledValue)) {
compiledValue = indexAliasUtils.getWriteIndexNameForAlias(compiledValue)
}

return if (compiledValue.isNullOrBlank()) fieldValue else compiledValue
bowenlan-amzn marked this conversation as resolved.
Show resolved Hide resolved
}

fun registerServices(scriptService: ScriptService, clusterService: ClusterService) {
this.scriptService = scriptService
this.clusterService = clusterService
this.indexAliasUtils = IndexAliasUtils(clusterService)
}

fun registerScriptService(scriptService: ScriptService) {
fun registerServices(scriptService: ScriptService, clusterService: ClusterService, indexAliasUtils: IndexAliasUtils) {
this.scriptService = scriptService
this.clusterService = clusterService
this.indexAliasUtils = indexAliasUtils
}

open class IndexAliasUtils(val clusterService: ClusterService) {

open fun hasAlias(index: String): Boolean {
val aliases = this.clusterService.state().metadata().indices.get(index)?.aliases
if (aliases != null) {
return aliases.size() > 0
}
return false
}

open fun isAlias(index: String): Boolean {
return this.clusterService.state().metadata().indicesLookup?.get(index) is IndexAbstraction.Alias
}

open fun getWriteIndexNameForAlias(alias: String): String? {
return this.clusterService.state().metadata().indicesLookup?.get(alias)?.writeIndex?.index?.name
}

open fun getBackingIndicesForAlias(alias: String): MutableList<IndexMetadata>? {
return this.clusterService.state().metadata().indicesLookup?.get(alias)?.indices
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ fun isRollupIndex(index: String, clusterState: ClusterState): Boolean {
return false
}

fun Rollup.isTargetIndexAlias(): Boolean {
return RollupFieldValueExpressionResolver.indexAliasUtils.isAlias(targetIndex)
}

fun Rollup.getRollupSearchRequest(metadata: RollupMetadata): SearchRequest {
val query = if (metadata.continuous != null) {
RangeQueryBuilder(this.getDateHistogram().sourceField)
Expand Down
Loading