Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added support for mustache scripting of rollup.target_index field #435

Merged
merged 14 commits into from
Aug 5, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ import org.opensearch.indexmanagement.rollup.resthandler.RestStartRollupAction
import org.opensearch.indexmanagement.rollup.resthandler.RestStopRollupAction
import org.opensearch.indexmanagement.rollup.settings.LegacyOpenDistroRollupSettings
import org.opensearch.indexmanagement.rollup.settings.RollupSettings
import org.opensearch.indexmanagement.rollup.util.RollupFieldValueExpressionResolver
import org.opensearch.indexmanagement.settings.IndexManagementSettings
import org.opensearch.indexmanagement.snapshotmanagement.api.resthandler.RestCreateSMPolicyHandler
import org.opensearch.indexmanagement.snapshotmanagement.api.resthandler.RestDeleteSMPolicyHandler
Expand Down Expand Up @@ -370,6 +371,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
this.indexNameExpressionResolver = indexNameExpressionResolver

val skipFlag = SkipExecution(client, clusterService)
RollupFieldValueExpressionResolver.registerScriptService(scriptService)
val rollupRunner = RollupRunner
.registerClient(client)
.registerClusterService(clusterService)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package org.opensearch.indexmanagement.indexstatemanagement.step.rollup

import org.apache.logging.log4j.LogManager
import org.opensearch.ExceptionsHelper
import org.opensearch.OpenSearchException
import org.opensearch.action.support.WriteRequest
import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.index.engine.VersionConflictEngineException
Expand Down Expand Up @@ -62,7 +63,9 @@ class AttemptCreateRollupJobStep(private val action: RollupAction) : Step(name)
}
} catch (e: RemoteTransportException) {
processFailure(rollup.id, indexName, ExceptionsHelper.unwrapCause(e) as Exception)
} catch (e: RemoteTransportException) {
} catch (e: OpenSearchException) {
processFailure(rollup.id, indexName, e)
} catch (e: Exception) {
processFailure(rollup.id, indexName, e)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.opensearch.indexmanagement.rollup.model.Rollup
import org.opensearch.indexmanagement.rollup.model.RollupStats
import org.opensearch.indexmanagement.rollup.settings.RollupSettings.Companion.ROLLUP_INGEST_BACKOFF_COUNT
import org.opensearch.indexmanagement.rollup.settings.RollupSettings.Companion.ROLLUP_INGEST_BACKOFF_MILLIS
import org.opensearch.indexmanagement.rollup.util.RollupFieldValueExpressionResolver
import org.opensearch.indexmanagement.rollup.util.getInitialDocValues
import org.opensearch.indexmanagement.util.IndexUtils.Companion.ODFE_MAGIC_NULL
import org.opensearch.indexmanagement.util.IndexUtils.Companion.hashToFixedSize
Expand Down Expand Up @@ -123,7 +124,8 @@ class RollupIndexer(
}
}
mapOfKeyValues.putAll(aggResults)
val indexRequest = IndexRequest(job.targetIndex)
val targetIndexResolvedName = RollupFieldValueExpressionResolver.resolve(job, job.targetIndex)
val indexRequest = IndexRequest(targetIndexResolvedName)
.id(documentId)
.source(mapOfKeyValues, XContentType.JSON)
requests.add(indexRequest)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import org.opensearch.indexmanagement.rollup.model.Rollup
import org.opensearch.indexmanagement.rollup.model.RollupJobValidationResult
import org.opensearch.indexmanagement.rollup.settings.LegacyOpenDistroRollupSettings
import org.opensearch.indexmanagement.rollup.settings.RollupSettings
import org.opensearch.indexmanagement.rollup.util.RollupFieldValueExpressionResolver
import org.opensearch.indexmanagement.rollup.util.isRollupIndex
import org.opensearch.indexmanagement.util.IndexUtils.Companion._META
import org.opensearch.indexmanagement.util.IndexUtils.Companion.getFieldFromMappings
Expand All @@ -52,14 +53,14 @@ class RollupMapperService(
// If the index already exists we need to verify it's a rollup index,
// confirm it does not conflict with existing jobs and is a valid job
@Suppress("ReturnCount")
private suspend fun validateAndAttemptToUpdateTargetIndex(rollup: Rollup): RollupJobValidationResult {
if (!isRollupIndex(rollup.targetIndex, clusterService.state())) {
return RollupJobValidationResult.Invalid("Target index [${rollup.targetIndex}] is a non rollup index")
private suspend fun validateAndAttemptToUpdateTargetIndex(rollup: Rollup, targetIndexResolvedName: String): RollupJobValidationResult {
if (!isRollupIndex(targetIndexResolvedName, clusterService.state())) {
return RollupJobValidationResult.Invalid("Target index [$targetIndexResolvedName] is a non rollup index")
}

return when (val jobExistsResult = jobExistsInRollupIndex(rollup)) {
return when (val jobExistsResult = jobExistsInRollupIndex(rollup, targetIndexResolvedName)) {
is RollupJobValidationResult.Valid -> jobExistsResult
is RollupJobValidationResult.Invalid -> updateRollupIndexMappings(rollup)
is RollupJobValidationResult.Invalid -> updateRollupIndexMappings(rollup, targetIndexResolvedName)
is RollupJobValidationResult.Failure -> jobExistsResult
}
}
Expand All @@ -69,14 +70,15 @@ class RollupMapperService(
// TODO: error handling
@Suppress("ReturnCount")
suspend fun attemptCreateRollupTargetIndex(job: Rollup, hasLegacyPlugin: Boolean): RollupJobValidationResult {
if (indexExists(job.targetIndex)) {
return validateAndAttemptToUpdateTargetIndex(job)
val targetIndexResolvedName = RollupFieldValueExpressionResolver.resolve(job, job.targetIndex)
if (indexExists(targetIndexResolvedName)) {
return validateAndAttemptToUpdateTargetIndex(job, targetIndexResolvedName)
} else {
val errorMessage = "Failed to create target index [${job.targetIndex}]"
val errorMessage = "Failed to create target index [$targetIndexResolvedName]"
return try {
val response = createTargetIndex(job, hasLegacyPlugin)
val response = createTargetIndex(targetIndexResolvedName, hasLegacyPlugin)
if (response.isAcknowledged) {
updateRollupIndexMappings(job)
updateRollupIndexMappings(job, targetIndexResolvedName)
} else {
RollupJobValidationResult.Failure(errorMessage)
}
Expand All @@ -94,13 +96,13 @@ class RollupMapperService(
}
}

private suspend fun createTargetIndex(job: Rollup, hasLegacyPlugin: Boolean): CreateIndexResponse {
private suspend fun createTargetIndex(targetIndexName: String, hasLegacyPlugin: Boolean): CreateIndexResponse {
val settings = if (hasLegacyPlugin) {
Settings.builder().put(LegacyOpenDistroRollupSettings.ROLLUP_INDEX.key, true).build()
} else {
Settings.builder().put(RollupSettings.ROLLUP_INDEX.key, true).build()
}
val request = CreateIndexRequest(job.targetIndex)
val request = CreateIndexRequest(targetIndexName)
.settings(settings)
.mapping(IndexManagementIndices.rollupTargetMappings)
// TODO: Perhaps we can do better than this for mappings... as it'll be dynamic for rest
Expand Down Expand Up @@ -204,19 +206,19 @@ class RollupMapperService(
return field != null
}

private suspend fun jobExistsInRollupIndex(rollup: Rollup): RollupJobValidationResult {
val res = when (val getMappingsResult = getMappings(rollup.targetIndex)) {
private suspend fun jobExistsInRollupIndex(rollup: Rollup, targetIndexResolvedName: String): RollupJobValidationResult {
val res = when (val getMappingsResult = getMappings(targetIndexResolvedName)) {
is GetMappingsResult.Success -> getMappingsResult.response
is GetMappingsResult.Failure ->
return RollupJobValidationResult.Failure(getMappingsResult.message, getMappingsResult.cause)
}

val indexMapping: MappingMetadata = res.mappings[rollup.targetIndex]
val indexMapping: MappingMetadata = res.mappings[targetIndexResolvedName]

return if (((indexMapping.sourceAsMap?.get(_META) as Map<*, *>?)?.get(ROLLUPS) as Map<*, *>?)?.containsKey(rollup.id) == true) {
RollupJobValidationResult.Valid
} else {
RollupJobValidationResult.Invalid("Rollup job [${rollup.id}] does not exist in rollup index [${rollup.targetIndex}]")
RollupJobValidationResult.Invalid("Rollup job [${rollup.id}] does not exist in rollup index [$targetIndexResolvedName]")
}
}

Expand Down Expand Up @@ -254,8 +256,8 @@ class RollupMapperService(
// where they can both get the same mapping state and only add their own job, meaning one
// of the jobs won't be added to the target index _meta
@Suppress("BlockingMethodInNonBlockingContext", "ReturnCount")
private suspend fun updateRollupIndexMappings(rollup: Rollup): RollupJobValidationResult {
val errorMessage = "Failed to update mappings of target index [${rollup.targetIndex}] with rollup job"
private suspend fun updateRollupIndexMappings(rollup: Rollup, targetIndexResolvedName: String): RollupJobValidationResult {
val errorMessage = "Failed to update mappings of target index [$targetIndexResolvedName] with rollup job"
try {
val response = withContext(Dispatchers.IO) {
val resp: AcknowledgedResponse = client.suspendUntil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.opensearch.commons.authuser.User
import org.opensearch.indexmanagement.IndexManagementIndices
import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX
import org.opensearch.indexmanagement.rollup.model.Rollup
import org.opensearch.indexmanagement.rollup.util.RollupFieldValueExpressionResolver
import org.opensearch.indexmanagement.rollup.util.parseRollup
import org.opensearch.indexmanagement.settings.IndexManagementSettings
import org.opensearch.indexmanagement.util.IndexUtils
Expand Down Expand Up @@ -91,6 +92,14 @@ class TransportIndexRollupAction @Inject constructor(
if (response.isAcknowledged) {
log.info("Successfully created or updated $INDEX_MANAGEMENT_INDEX with newest mappings.")
if (request.opType() == DocWriteRequest.OpType.CREATE) {
if (!validateTargetIndexName()) {
return actionListener.onFailure(
OpenSearchStatusException(
"target_index value is invalid: ${request.rollup.targetIndex}",
RestStatus.BAD_REQUEST
)
)
}
putRollup()
} else {
getRollup()
Expand Down Expand Up @@ -128,6 +137,14 @@ class TransportIndexRollupAction @Inject constructor(
if (modified.isNotEmpty()) {
return actionListener.onFailure(OpenSearchStatusException("Not allowed to modify $modified", RestStatus.BAD_REQUEST))
}
if (!validateTargetIndexName()) {
return actionListener.onFailure(
OpenSearchStatusException(
"target_index value is invalid: ${request.rollup.targetIndex}",
RestStatus.BAD_REQUEST
)
)
}
putRollup()
}

Expand Down Expand Up @@ -172,5 +189,10 @@ class TransportIndexRollupAction @Inject constructor(
}
)
}

private fun validateTargetIndexName(): Boolean {
val targetIndexResolvedName = RollupFieldValueExpressionResolver.resolve(request.rollup, request.rollup.targetIndex)
return targetIndexResolvedName.contains("*") == false && targetIndexResolvedName.contains("?") == false
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.opensearch.common.xcontent.XContentFactory
import org.opensearch.common.xcontent.XContentHelper
import org.opensearch.common.xcontent.XContentType
import org.opensearch.indexmanagement.indexstatemanagement.util.XCONTENT_WITHOUT_TYPE
import org.opensearch.indexmanagement.rollup.util.RollupFieldValueExpressionResolver
import org.opensearch.indexmanagement.util.IndexUtils.Companion._META
import org.opensearch.threadpool.ThreadPool
import org.opensearch.transport.TransportService
Expand All @@ -50,7 +51,8 @@ class TransportUpdateRollupMappingAction @Inject constructor(
private val log = LogManager.getLogger(javaClass)

override fun checkBlock(request: UpdateRollupMappingRequest, state: ClusterState): ClusterBlockException? {
return state.blocks.indicesBlockedException(ClusterBlockLevel.METADATA_WRITE, arrayOf(request.rollup.targetIndex))
val targetIndexResolvedName = RollupFieldValueExpressionResolver.resolve(request.rollup, request.rollup.targetIndex)
return state.blocks.indicesBlockedException(ClusterBlockLevel.METADATA_WRITE, arrayOf(targetIndexResolvedName))
}

@Suppress("ReturnCount", "LongMethod")
Expand All @@ -59,7 +61,8 @@ class TransportUpdateRollupMappingAction @Inject constructor(
state: ClusterState,
listener: ActionListener<AcknowledgedResponse>
) {
val index = state.metadata.index(request.rollup.targetIndex)
val targetIndexResolvedName = RollupFieldValueExpressionResolver.resolve(request.rollup, request.rollup.targetIndex)
petardz marked this conversation as resolved.
Show resolved Hide resolved
val index = state.metadata.index(targetIndexResolvedName)
if (index == null) {
log.debug("Could not find index [$index]")
return listener.onFailure(IllegalStateException("Could not find index [$index]"))
Expand Down Expand Up @@ -113,7 +116,7 @@ class TransportUpdateRollupMappingAction @Inject constructor(
}

// TODO: Does schema_version get overwritten?
val putMappingRequest = PutMappingRequest(request.rollup.targetIndex).source(metaMappings)
val putMappingRequest = PutMappingRequest(targetIndexResolvedName).source(metaMappings)
client.admin().indices().putMapping(
putMappingRequest,
object : ActionListener<AcknowledgedResponse> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.rollup.util

import org.opensearch.common.xcontent.XContentFactory
import org.opensearch.indexmanagement.indexstatemanagement.util.XCONTENT_WITHOUT_TYPE
import org.opensearch.indexmanagement.opensearchapi.toMap
import org.opensearch.indexmanagement.rollup.model.Rollup
import org.opensearch.script.Script
import org.opensearch.script.ScriptService
import org.opensearch.script.ScriptType
import org.opensearch.script.TemplateScript

object RollupFieldValueExpressionResolver {

private val validTopContextFields = setOf(Rollup.SOURCE_INDEX_FIELD)

private lateinit var scriptService: ScriptService

fun resolve(rollup: Rollup, fieldValue: String): String {
val script = Script(ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG, fieldValue, mapOf())

val contextMap = rollup.toXContent(XContentFactory.jsonBuilder(), XCONTENT_WITHOUT_TYPE)
.toMap()
.filterKeys { key -> key in validTopContextFields }

val compiledValue = scriptService.compile(script, TemplateScript.CONTEXT)
.newInstance(script.params + mapOf("ctx" to contextMap))
.execute()

return if (compiledValue.isBlank()) fieldValue else compiledValue
}

fun registerScriptService(scriptService: ScriptService) {
this.scriptService = scriptService
}
}
Loading