From 25194d64eb45476766713781d779f6863ed3e9c1 Mon Sep 17 00:00:00 2001 From: Mohammad Qureshi <47198598+qreshi@users.noreply.github.com> Date: Mon, 23 Aug 2021 12:22:45 -0700 Subject: [PATCH] Add BucketSelector pipeline aggregation extension (#144) Signed-off-by: Mohammad Qureshi Co-authored-by: Rishabh Maurya --- .../org/opensearch/alerting/AlertingPlugin.kt | 18 +- .../BucketSelectorExtAggregationBuilder.kt | 251 ++++++++++++ .../BucketSelectorExtAggregator.kt | 168 ++++++++ .../BucketSelectorExtFilter.kt | 149 +++++++ .../BucketSelectorIndices.kt | 79 ++++ ...ucketSelectorExtAggregationBuilderTests.kt | 60 +++ .../BucketSelectorExtAggregatorTests.kt | 371 ++++++++++++++++++ 7 files changed, 1095 insertions(+), 1 deletion(-) create mode 100644 alerting/src/main/kotlin/org/opensearch/alerting/aggregation/bucketselectorext/BucketSelectorExtAggregationBuilder.kt create mode 100644 alerting/src/main/kotlin/org/opensearch/alerting/aggregation/bucketselectorext/BucketSelectorExtAggregator.kt create mode 100644 alerting/src/main/kotlin/org/opensearch/alerting/aggregation/bucketselectorext/BucketSelectorExtFilter.kt create mode 100644 alerting/src/main/kotlin/org/opensearch/alerting/aggregation/bucketselectorext/BucketSelectorIndices.kt create mode 100644 alerting/src/test/kotlin/org/opensearch/alerting/aggregation/bucketselectorext/BucketSelectorExtAggregationBuilderTests.kt create mode 100644 alerting/src/test/kotlin/org/opensearch/alerting/aggregation/bucketselectorext/BucketSelectorExtAggregatorTests.kt diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt index 4ce81f562..9c719ac43 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt @@ -45,6 +45,7 @@ import org.opensearch.alerting.action.IndexMonitorAction import org.opensearch.alerting.action.SearchEmailAccountAction import org.opensearch.alerting.action.SearchEmailGroupAction import org.opensearch.alerting.action.SearchMonitorAction +import org.opensearch.alerting.aggregation.bucketselectorext.BucketSelectorExtAggregationBuilder import org.opensearch.alerting.alerts.AlertIndices import org.opensearch.alerting.core.JobSweeper import org.opensearch.alerting.core.ScheduledJobIndices @@ -103,12 +104,14 @@ import org.opensearch.cluster.metadata.IndexNameExpressionResolver import org.opensearch.cluster.node.DiscoveryNodes import org.opensearch.cluster.service.ClusterService import org.opensearch.common.io.stream.NamedWriteableRegistry +import org.opensearch.common.io.stream.StreamInput import org.opensearch.common.settings.ClusterSettings import org.opensearch.common.settings.IndexScopedSettings import org.opensearch.common.settings.Setting import org.opensearch.common.settings.Settings import org.opensearch.common.settings.SettingsFilter import org.opensearch.common.xcontent.NamedXContentRegistry +import org.opensearch.common.xcontent.XContentParser import org.opensearch.env.Environment import org.opensearch.env.NodeEnvironment import org.opensearch.index.IndexModule @@ -119,6 +122,7 @@ import org.opensearch.plugins.ActionPlugin import org.opensearch.plugins.Plugin import org.opensearch.plugins.ReloadablePlugin import org.opensearch.plugins.ScriptPlugin +import org.opensearch.plugins.SearchPlugin import org.opensearch.repositories.RepositoriesService import org.opensearch.rest.RestController import org.opensearch.rest.RestHandler @@ -134,7 +138,7 @@ 
import java.util.function.Supplier * It also adds [Monitor.XCONTENT_REGISTRY], [SearchInput.XCONTENT_REGISTRY] to the * [NamedXContentRegistry] so that we are able to deserialize the custom named objects. */ -internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, ReloadablePlugin, Plugin() { +internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, ReloadablePlugin, SearchPlugin, Plugin() { override fun getContextWhitelists(): Map, List> { val whitelist = WhitelistLoader.loadFromResourceFiles(javaClass, "org.opensearch.alerting.txt") @@ -330,4 +334,16 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R override fun reload(settings: Settings) { runner.reloadDestinationSettings(settings) } + + override fun getPipelineAggregations(): List { + return listOf( + SearchPlugin.PipelineAggregationSpec( + BucketSelectorExtAggregationBuilder.NAME, + { sin: StreamInput -> BucketSelectorExtAggregationBuilder(sin) }, + { parser: XContentParser, agg_name: String -> + BucketSelectorExtAggregationBuilder.parse(agg_name, parser) + } + ) + ) + } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/aggregation/bucketselectorext/BucketSelectorExtAggregationBuilder.kt b/alerting/src/main/kotlin/org/opensearch/alerting/aggregation/bucketselectorext/BucketSelectorExtAggregationBuilder.kt new file mode 100644 index 000000000..d3bb0057e --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/aggregation/bucketselectorext/BucketSelectorExtAggregationBuilder.kt @@ -0,0 +1,251 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.alerting.aggregation.bucketselectorext + +import org.opensearch.alerting.aggregation.bucketselectorext.BucketSelectorExtFilter.Companion.BUCKET_SELECTOR_COMPOSITE_AGG_FILTER +import org.opensearch.alerting.aggregation.bucketselectorext.BucketSelectorExtFilter.Companion.BUCKET_SELECTOR_FILTER +import org.opensearch.common.ParseField +import org.opensearch.common.ParsingException +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.common.xcontent.ToXContent.Params +import org.opensearch.common.xcontent.XContentBuilder +import org.opensearch.common.xcontent.XContentParser +import org.opensearch.script.Script +import org.opensearch.search.aggregations.pipeline.AbstractPipelineAggregationBuilder +import org.opensearch.search.aggregations.pipeline.BucketHelpers.GapPolicy +import org.opensearch.search.aggregations.pipeline.PipelineAggregator +import java.io.IOException +import java.util.Objects + +class BucketSelectorExtAggregationBuilder : + AbstractPipelineAggregationBuilder { + private val bucketsPathsMap: Map + val parentBucketPath: String + val script: Script + val filter: BucketSelectorExtFilter? + private var gapPolicy = GapPolicy.SKIP + + constructor( + name: String, + bucketsPathsMap: Map, + script: Script, + parentBucketPath: String, + filter: BucketSelectorExtFilter? 
+ ) : super(name, NAME.preferredName, listOf(parentBucketPath).toTypedArray()) { + this.bucketsPathsMap = bucketsPathsMap + this.script = script + this.parentBucketPath = parentBucketPath + this.filter = filter + } + + @Throws(IOException::class) + @Suppress("UNCHECKED_CAST") + constructor(sin: StreamInput) : super(sin, NAME.preferredName) { + bucketsPathsMap = sin.readMap() as MutableMap + script = Script(sin) + gapPolicy = GapPolicy.readFrom(sin) + parentBucketPath = sin.readString() + filter = if (sin.readBoolean()) { + BucketSelectorExtFilter(sin) + } else { + null + } + } + + @Throws(IOException::class) + override fun doWriteTo(out: StreamOutput) { + out.writeMap(bucketsPathsMap) + script.writeTo(out) + gapPolicy.writeTo(out) + out.writeString(parentBucketPath) + if (filter != null) { + out.writeBoolean(true) + filter.writeTo(out) + } else { + out.writeBoolean(false) + } + } + + /** + * Sets the gap policy to use for this aggregation. + */ + fun gapPolicy(gapPolicy: GapPolicy?): BucketSelectorExtAggregationBuilder { + requireNotNull(gapPolicy) { "[gapPolicy] must not be null: [$name]" } + this.gapPolicy = gapPolicy + return this + } + + override fun createInternal(metaData: Map?): PipelineAggregator { + return BucketSelectorExtAggregator(name, bucketsPathsMap, parentBucketPath, script, gapPolicy, filter, metaData) + } + + @Throws(IOException::class) + public override fun internalXContent(builder: XContentBuilder, params: Params): XContentBuilder { + builder.field(PipelineAggregator.Parser.BUCKETS_PATH.preferredName, bucketsPathsMap as Map?) + .field(PARENT_BUCKET_PATH.preferredName, parentBucketPath) + .field(Script.SCRIPT_PARSE_FIELD.preferredName, script) + .field(PipelineAggregator.Parser.GAP_POLICY.preferredName, gapPolicy.getName()) + if (filter != null) { + if (filter.isCompositeAggregation) { + builder.startObject(BUCKET_SELECTOR_COMPOSITE_AGG_FILTER.preferredName) + .value(filter) + .endObject() + } else { + builder.startObject(BUCKET_SELECTOR_FILTER.preferredName) + .value(filter) + .endObject() + } + } + return builder + } + + override fun overrideBucketsPath(): Boolean { + return true + } + + override fun validate(context: ValidationContext) { + // Nothing to check + } + + override fun hashCode(): Int { + return Objects.hash(super.hashCode(), bucketsPathsMap, script, gapPolicy) + } + + override fun equals(other: Any?): Boolean { + if (this === other) return true + if (other == null || javaClass != other.javaClass) return false + if (!super.equals(other)) return false + val otherCast = other as BucketSelectorExtAggregationBuilder + return (bucketsPathsMap == otherCast.bucketsPathsMap && + script == otherCast.script && + gapPolicy == otherCast.gapPolicy) + } + + override fun getWriteableName(): String { + return NAME.preferredName + } + + companion object { + val NAME = ParseField("bucket_selector_ext") + val PARENT_BUCKET_PATH = ParseField("parent_bucket_path") + + @Throws(IOException::class) + fun parse(reducerName: String, parser: XContentParser): BucketSelectorExtAggregationBuilder { + var token: XContentParser.Token + var script: Script? = null + var currentFieldName: String? = null + var bucketsPathsMap: MutableMap? = null + var gapPolicy: GapPolicy? = null + var parentBucketPath: String? = null + var filter: BucketSelectorExtFilter? 
= null + while (parser.nextToken().also { token = it } !== XContentParser.Token.END_OBJECT) { + if (token === XContentParser.Token.FIELD_NAME) { + currentFieldName = parser.currentName() + } else if (token === XContentParser.Token.VALUE_STRING) { + when { + PipelineAggregator.Parser.BUCKETS_PATH.match(currentFieldName, parser.deprecationHandler) -> { + bucketsPathsMap = HashMap() + bucketsPathsMap["_value"] = parser.text() + } + PipelineAggregator.Parser.GAP_POLICY.match(currentFieldName, parser.deprecationHandler) -> { + gapPolicy = GapPolicy.parse(parser.text(), parser.tokenLocation) + } + Script.SCRIPT_PARSE_FIELD.match(currentFieldName, parser.deprecationHandler) -> { + script = Script.parse(parser) + } + PARENT_BUCKET_PATH.match(currentFieldName, parser.deprecationHandler) -> { + parentBucketPath = parser.text() + } + else -> { + throw ParsingException( + parser.tokenLocation, + "Unknown key for a $token in [$reducerName]: [$currentFieldName]." + ) + } + } + } else if (token === XContentParser.Token.START_ARRAY) { + if (PipelineAggregator.Parser.BUCKETS_PATH.match(currentFieldName, parser.deprecationHandler)) { + val paths: MutableList = ArrayList() + while (parser.nextToken().also { token = it } !== XContentParser.Token.END_ARRAY) { + val path = parser.text() + paths.add(path) + } + bucketsPathsMap = HashMap() + for (i in paths.indices) { + bucketsPathsMap["_value$i"] = paths[i] + } + } else { + throw ParsingException( + parser.tokenLocation, + "Unknown key for a $token in [$reducerName]: [$currentFieldName]." + ) + } + } else if (token === XContentParser.Token.START_OBJECT) { + when { + Script.SCRIPT_PARSE_FIELD.match(currentFieldName, parser.deprecationHandler) -> { + script = Script.parse(parser) + } + PipelineAggregator.Parser.BUCKETS_PATH.match(currentFieldName, parser.deprecationHandler) -> { + val map = parser.map() + bucketsPathsMap = HashMap() + for ((key, value) in map) { + bucketsPathsMap[key] = value.toString() + } + } + BUCKET_SELECTOR_FILTER.match(currentFieldName, parser.deprecationHandler) -> { + filter = BucketSelectorExtFilter.parse(reducerName, false, parser) + } + BUCKET_SELECTOR_COMPOSITE_AGG_FILTER.match( + currentFieldName, + parser.deprecationHandler + ) -> { + filter = BucketSelectorExtFilter.parse(reducerName, true, parser) + } + else -> { + throw ParsingException( + parser.tokenLocation, + "Unknown key for a $token in [$reducerName]: [$currentFieldName]." 
+ ) + } + } + } else { + throw ParsingException(parser.tokenLocation, "Unexpected token $token in [$reducerName].") + } + } + if (bucketsPathsMap == null) { + throw ParsingException( + parser.tokenLocation, "Missing required field [" + PipelineAggregator.Parser.BUCKETS_PATH.preferredName + + "] for bucket_selector aggregation [" + reducerName + "]" + ) + } + if (script == null) { + throw ParsingException( + parser.tokenLocation, "Missing required field [" + Script.SCRIPT_PARSE_FIELD.preferredName + + "] for bucket_selector aggregation [" + reducerName + "]" + ) + } + + if (parentBucketPath == null) { + throw ParsingException( + parser.tokenLocation, "Missing required field [" + PARENT_BUCKET_PATH + + "] for bucket_selector aggregation [" + reducerName + "]" + ) + } + val factory = BucketSelectorExtAggregationBuilder(reducerName, bucketsPathsMap, script, parentBucketPath, filter) + if (gapPolicy != null) { + factory.gapPolicy(gapPolicy) + } + return factory + } + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/aggregation/bucketselectorext/BucketSelectorExtAggregator.kt b/alerting/src/main/kotlin/org/opensearch/alerting/aggregation/bucketselectorext/BucketSelectorExtAggregator.kt new file mode 100644 index 000000000..59b65cd55 --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/aggregation/bucketselectorext/BucketSelectorExtAggregator.kt @@ -0,0 +1,168 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.alerting.aggregation.bucketselectorext + +import org.apache.lucene.util.BytesRef +import org.opensearch.alerting.aggregation.bucketselectorext.BucketSelectorExtAggregationBuilder.Companion.NAME +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.script.BucketAggregationSelectorScript +import org.opensearch.script.Script +import org.opensearch.search.DocValueFormat +import org.opensearch.search.aggregations.Aggregations +import org.opensearch.search.aggregations.InternalAggregation +import org.opensearch.search.aggregations.InternalAggregation.ReduceContext +import org.opensearch.search.aggregations.InternalMultiBucketAggregation +import org.opensearch.search.aggregations.bucket.SingleBucketAggregation +import org.opensearch.search.aggregations.bucket.composite.InternalComposite +import org.opensearch.search.aggregations.bucket.terms.IncludeExclude +import org.opensearch.search.aggregations.pipeline.BucketHelpers +import org.opensearch.search.aggregations.pipeline.BucketHelpers.GapPolicy +import org.opensearch.search.aggregations.pipeline.SiblingPipelineAggregator +import org.opensearch.search.aggregations.support.AggregationPath +import java.io.IOException + +class BucketSelectorExtAggregator : SiblingPipelineAggregator { + private var name: String? = null + private var bucketsPathsMap: Map + private var parentBucketPath: String + private var script: Script + private var gapPolicy: GapPolicy + private var bucketSelectorExtFilter: BucketSelectorExtFilter? = null + + constructor( + name: String?, + bucketsPathsMap: Map, + parentBucketPath: String, + script: Script, + gapPolicy: GapPolicy, + filter: BucketSelectorExtFilter?, + metadata: Map? 
+    ) : super(name, bucketsPathsMap.values.toTypedArray(), metadata) {
+        this.bucketsPathsMap = bucketsPathsMap
+        this.parentBucketPath = parentBucketPath
+        this.script = script
+        this.gapPolicy = gapPolicy
+        this.bucketSelectorExtFilter = filter
+    }
+
+    /**
+     * Read from a stream.
+     */
+    @Suppress("UNCHECKED_CAST")
+    @Throws(IOException::class)
+    constructor(sin: StreamInput) : super(sin.readString(), null, null) {
+        script = Script(sin)
+        gapPolicy = GapPolicy.readFrom(sin)
+        bucketsPathsMap = sin.readMap() as Map<String, String>
+        parentBucketPath = sin.readString()
+        if (sin.readBoolean()) {
+            bucketSelectorExtFilter = BucketSelectorExtFilter(sin)
+        } else {
+            bucketSelectorExtFilter = null
+        }
+    }
+
+    @Throws(IOException::class)
+    override fun doWriteTo(out: StreamOutput) {
+        out.writeString(name)
+        script.writeTo(out)
+        gapPolicy.writeTo(out)
+        out.writeGenericValue(bucketsPathsMap)
+        out.writeString(parentBucketPath)
+        if (bucketSelectorExtFilter != null) {
+            out.writeBoolean(true)
+            bucketSelectorExtFilter!!.writeTo(out)
+        } else {
+            out.writeBoolean(false)
+        }
+    }
+
+    override fun getWriteableName(): String {
+        return NAME.preferredName
+    }
+
+    override fun doReduce(aggregations: Aggregations, reduceContext: ReduceContext): InternalAggregation {
+        val parentBucketPathList = AggregationPath.parse(parentBucketPath).pathElementsAsStringList
+        var subAggregations: Aggregations = aggregations
+        for (i in 0 until parentBucketPathList.size - 1) {
+            subAggregations = subAggregations.get<SingleBucketAggregation>(parentBucketPathList[i]).aggregations // walk down one level per path element
+        }
+        val originalAgg = subAggregations.get(parentBucketPathList.last()) as InternalMultiBucketAggregation<*, *>
+        val buckets = originalAgg.buckets
+        val factory = reduceContext.scriptService().compile(script, BucketAggregationSelectorScript.CONTEXT)
+        val selectedBucketsIndex: MutableList<Int> = ArrayList()
+        for (i in buckets.indices) {
+            val bucket = buckets[i]
+            if (bucketSelectorExtFilter != null) {
+                var accepted = true
+                if (bucketSelectorExtFilter!!.isCompositeAggregation) {
+                    val compBucketKeyObj = (bucket as InternalComposite.InternalBucket).key
+                    val filtersMap: HashMap<String, IncludeExclude>? = bucketSelectorExtFilter!!.filtersMap
+                    for (sourceKey in compBucketKeyObj.keys) {
+                        if (filtersMap != null) {
+                            if (filtersMap.containsKey(sourceKey)) {
+                                val obj = compBucketKeyObj[sourceKey]
+                                accepted = isAccepted(obj!!, filtersMap[sourceKey])
+                                if (!accepted) break
+                            } else {
+                                accepted = false
+                                break
+                            }
+                        }
+                    }
+                } else {
+                    accepted = isAccepted(bucket.key, bucketSelectorExtFilter!!.filters)
+                }
+                if (!accepted) continue
+            }
+
+            val vars: MutableMap<String, Any> = HashMap()
+            if (script.params != null) {
+                vars.putAll(script.params)
+            }
+            for ((varName, bucketsPath) in bucketsPathsMap) {
+                val value = BucketHelpers.resolveBucketValue(originalAgg, bucket, bucketsPath, gapPolicy)
+                vars[varName] = value
+            }
+            val executableScript = factory.newInstance(vars)
+            // TODO: can we use one instance of the script for all buckets? it should be stateless?
+            if (executableScript.execute()) {
+                selectedBucketsIndex.add(i)
+            }
+        }
+
+        return BucketSelectorIndices(
+            name(), parentBucketPath, selectedBucketsIndex, originalAgg.metadata
+        )
+    }
+
+    private fun isAccepted(obj: Any, filter: IncludeExclude?): Boolean {
+        return when (obj.javaClass) {
+            String::class.java -> {
+                val stringFilter = filter!!.convertToStringFilter(DocValueFormat.RAW)
+                stringFilter.accept(BytesRef(obj as String))
+            }
+            java.lang.Long::class.java, Long::class.java -> {
+                val longFilter = filter!!.convertToLongFilter(DocValueFormat.RAW)
+                longFilter.accept(obj as Long)
+            }
+            java.lang.Double::class.java, Double::class.java -> {
+                val doubleFilter = filter!!.convertToDoubleFilter()
+                doubleFilter.accept(org.apache.lucene.util.NumericUtils.doubleToSortableLong(obj as Double)) // match the sortable-long encoding used by convertToDoubleFilter
+            }
+            else -> {
+                throw IllegalStateException("Object is not comparable. Please use one of String, Long or Double type.")
+            }
+        }
+    }
+}
diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/aggregation/bucketselectorext/BucketSelectorExtFilter.kt b/alerting/src/main/kotlin/org/opensearch/alerting/aggregation/bucketselectorext/BucketSelectorExtFilter.kt
new file mode 100644
index 000000000..49eff3a66
--- /dev/null
+++ b/alerting/src/main/kotlin/org/opensearch/alerting/aggregation/bucketselectorext/BucketSelectorExtFilter.kt
@@ -0,0 +1,149 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ *
+ * Modifications Copyright OpenSearch Contributors. See
+ * GitHub history for details.
+ */
+
+package org.opensearch.alerting.aggregation.bucketselectorext
+
+import org.opensearch.common.ParseField
+import org.opensearch.common.ParsingException
+import org.opensearch.common.io.stream.StreamInput
+import org.opensearch.common.io.stream.StreamOutput
+import org.opensearch.common.io.stream.Writeable
+import org.opensearch.common.xcontent.ToXContent.Params
+import org.opensearch.common.xcontent.ToXContentObject
+import org.opensearch.common.xcontent.XContentBuilder
+import org.opensearch.common.xcontent.XContentParser
+import org.opensearch.search.aggregations.bucket.terms.IncludeExclude
+import java.io.IOException
+
+class BucketSelectorExtFilter : ToXContentObject, Writeable {
+    // used for composite aggregations
+    val filtersMap: HashMap<String, IncludeExclude>?
+    // used for filtering string term aggregation
+    val filters: IncludeExclude?
+
+    constructor(filters: IncludeExclude?) {
+        filtersMap = null
+        this.filters = filters
+    }
+
+    constructor(filtersMap: HashMap<String, IncludeExclude>?) {
+        this.filtersMap = filtersMap
+        filters = null
+    }
+
+    constructor(sin: StreamInput) {
+        if (sin.readBoolean()) {
+            val size: Int = sin.readVInt()
+            filtersMap = java.util.HashMap()
+            for (i in 0 until size) {
+                filtersMap[sin.readString()] = IncludeExclude(sin)
+            }
+            filters = null
+        } else {
+            filters = IncludeExclude(sin)
+            filtersMap = null
+        }
+    }
+
+    @Throws(IOException::class)
+    override fun writeTo(out: StreamOutput) {
+        val isCompAgg = isCompositeAggregation
+        out.writeBoolean(isCompAgg)
+        if (isCompAgg) {
+            out.writeVInt(filtersMap!!.size)
+            for ((key, value) in filtersMap) {
+                out.writeString(key)
+                value.writeTo(out)
+            }
+        } else {
+            filters!!.writeTo(out)
+        }
+    }
+
+    @Throws(IOException::class)
+    override fun toXContent(builder: XContentBuilder, params: Params): XContentBuilder {
+        if (isCompositeAggregation) {
+            for ((key, filter) in filtersMap!!)
{ + builder.startObject(key) + filter.toXContent(builder, params) + builder.endObject() + } + } else { + filters!!.toXContent(builder, params) + } + return builder + } + + val isCompositeAggregation: Boolean + get() = if (filtersMap != null && filters == null) { + true + } else if (filtersMap == null && filters != null) { + false + } else { + throw IllegalStateException("Type of selector cannot be determined") + } + + companion object { + const val NAME = "filter" + var BUCKET_SELECTOR_FILTER = ParseField("filter") + var BUCKET_SELECTOR_COMPOSITE_AGG_FILTER = ParseField("composite_agg_filter") + + @Throws(IOException::class) + fun parse(reducerName: String, isCompositeAggregation: Boolean, parser: XContentParser): BucketSelectorExtFilter { + var token: XContentParser.Token + return if (isCompositeAggregation) { + val filtersMap = HashMap() + while (parser.nextToken().also { token = it } !== XContentParser.Token.END_OBJECT) { + if (token === XContentParser.Token.FIELD_NAME) { + val sourceKey = parser.currentName() + token = parser.nextToken() + filtersMap[sourceKey] = parseIncludeExclude(reducerName, parser) + } else { + throw ParsingException( + parser.tokenLocation, + "Unknown key for a " + token + " in [" + reducerName + "]: [" + parser.currentName() + "]." + ) + } + } + BucketSelectorExtFilter(filtersMap) + } else { + BucketSelectorExtFilter(parseIncludeExclude(reducerName, parser)) + } + } + + @Throws(IOException::class) + private fun parseIncludeExclude(reducerName: String, parser: XContentParser): IncludeExclude { + var token: XContentParser.Token + var include: IncludeExclude? = null + var exclude: IncludeExclude? = null + while (parser.nextToken().also { token = it } !== XContentParser.Token.END_OBJECT) { + val fieldName = parser.currentName() + when { + IncludeExclude.INCLUDE_FIELD.match(fieldName, parser.deprecationHandler) -> { + parser.nextToken() + include = IncludeExclude.parseInclude(parser) + } + IncludeExclude.EXCLUDE_FIELD.match(fieldName, parser.deprecationHandler) -> { + parser.nextToken() + exclude = IncludeExclude.parseExclude(parser) + } + else -> { + throw ParsingException( + parser.tokenLocation, + "Unknown key for a $token in [$reducerName]: [$fieldName]." + ) + } + } + } + return IncludeExclude.merge(include, exclude) + } + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/aggregation/bucketselectorext/BucketSelectorIndices.kt b/alerting/src/main/kotlin/org/opensearch/alerting/aggregation/bucketselectorext/BucketSelectorIndices.kt new file mode 100644 index 000000000..4e636e360 --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/aggregation/bucketselectorext/BucketSelectorIndices.kt @@ -0,0 +1,79 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.alerting.aggregation.bucketselectorext + +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.common.xcontent.ToXContent.Params +import org.opensearch.common.xcontent.XContentBuilder +import org.opensearch.search.aggregations.InternalAggregation +import java.io.IOException +import java.util.Objects + +open class BucketSelectorIndices( + name: String?, + private var parentBucketPath: String, + var bucketIndices: List, + metaData: Map? 
+) : InternalAggregation(name, metaData) { + + @Throws(IOException::class) + override fun doWriteTo(out: StreamOutput) { + out.writeString(parentBucketPath) + out.writeIntArray(bucketIndices.stream().mapToInt { i: Int? -> i!! }.toArray()) + } + + override fun getWriteableName(): String { + return name + } + + override fun reduce(aggregations: List, reduceContext: ReduceContext): BucketSelectorIndices { + throw UnsupportedOperationException("Not supported") + } + + override fun mustReduceOnSingleInternalAgg(): Boolean { + return false + } + + override fun getProperty(path: MutableList?): Any { + throw UnsupportedOperationException("Not supported") + } + + internal object Fields { + const val PARENT_BUCKET_PATH = "parent_bucket_path" + const val BUCKET_INDICES = "bucket_indices" + } + + @Throws(IOException::class) + override fun doXContentBody(builder: XContentBuilder, params: Params): XContentBuilder { + builder.field(Fields.PARENT_BUCKET_PATH, parentBucketPath) + builder.field(Fields.BUCKET_INDICES, bucketIndices) + otherStatsToXContent(builder) + return builder + } + + @Throws(IOException::class) + protected fun otherStatsToXContent(builder: XContentBuilder): XContentBuilder { + return builder + } + + override fun hashCode(): Int { + return Objects.hash(super.hashCode(), parentBucketPath) + } + + override fun equals(other: Any?): Boolean { + if (this === other) return true + if (other == null || javaClass != other.javaClass) return false + if (!super.equals(other)) return false + val otherCast = other as BucketSelectorIndices + return name == otherCast.name && parentBucketPath == otherCast.parentBucketPath + } +} diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/aggregation/bucketselectorext/BucketSelectorExtAggregationBuilderTests.kt b/alerting/src/test/kotlin/org/opensearch/alerting/aggregation/bucketselectorext/BucketSelectorExtAggregationBuilderTests.kt new file mode 100644 index 000000000..8a925ac1d --- /dev/null +++ b/alerting/src/test/kotlin/org/opensearch/alerting/aggregation/bucketselectorext/BucketSelectorExtAggregationBuilderTests.kt @@ -0,0 +1,60 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. 
+ */ + +package org.opensearch.alerting.aggregation.bucketselectorext + +import org.opensearch.alerting.AlertingPlugin +import org.opensearch.plugins.SearchPlugin +import org.opensearch.script.Script +import org.opensearch.script.ScriptType +import org.opensearch.search.aggregations.BasePipelineAggregationTestCase +import org.opensearch.search.aggregations.bucket.terms.IncludeExclude +import org.opensearch.search.aggregations.pipeline.BucketHelpers.GapPolicy + +class BucketSelectorExtAggregationBuilderTests : BasePipelineAggregationTestCase() { + override fun plugins(): List { + return listOf(AlertingPlugin()) + } + + override fun createTestAggregatorFactory(): BucketSelectorExtAggregationBuilder { + val name = randomAlphaOfLengthBetween(3, 20) + val bucketsPaths: MutableMap = HashMap() + val numBucketPaths = randomIntBetween(1, 10) + for (i in 0 until numBucketPaths) { + bucketsPaths[randomAlphaOfLengthBetween(1, 20)] = randomAlphaOfLengthBetween(1, 40) + } + val script: Script + if (randomBoolean()) { + script = mockScript("script") + } else { + val params: MutableMap = HashMap() + if (randomBoolean()) { + params["foo"] = "bar" + } + val type = randomFrom(*ScriptType.values()) + script = + Script( + type, if (type == ScriptType.STORED) null else + randomFrom("my_lang", Script.DEFAULT_SCRIPT_LANG), "script", params + ) + } + val parentBucketPath = randomAlphaOfLengthBetween(3, 20) + val filter = BucketSelectorExtFilter(IncludeExclude("foo.*", "bar.*")) + val factory = BucketSelectorExtAggregationBuilder( + name, bucketsPaths, + script, parentBucketPath, filter + ) + if (randomBoolean()) { + factory.gapPolicy(randomFrom(*GapPolicy.values())) + } + return factory + } +} diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/aggregation/bucketselectorext/BucketSelectorExtAggregatorTests.kt b/alerting/src/test/kotlin/org/opensearch/alerting/aggregation/bucketselectorext/BucketSelectorExtAggregatorTests.kt new file mode 100644 index 000000000..a767a6e5c --- /dev/null +++ b/alerting/src/test/kotlin/org/opensearch/alerting/aggregation/bucketselectorext/BucketSelectorExtAggregatorTests.kt @@ -0,0 +1,371 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. 
+ */ + +package org.opensearch.alerting.aggregation.bucketselectorext + +import org.apache.lucene.document.Document +import org.apache.lucene.document.SortedNumericDocValuesField +import org.apache.lucene.document.SortedSetDocValuesField +import org.apache.lucene.index.DirectoryReader +import org.apache.lucene.index.RandomIndexWriter +import org.apache.lucene.search.MatchAllDocsQuery +import org.apache.lucene.search.Query +import org.apache.lucene.util.BytesRef +import org.hamcrest.CoreMatchers +import org.opensearch.common.CheckedConsumer +import org.opensearch.common.settings.Settings +import org.opensearch.index.mapper.KeywordFieldMapper.KeywordFieldType +import org.opensearch.index.mapper.MappedFieldType +import org.opensearch.index.mapper.NumberFieldMapper +import org.opensearch.index.mapper.NumberFieldMapper.NumberFieldType +import org.opensearch.index.query.MatchAllQueryBuilder +import org.opensearch.script.MockScriptEngine +import org.opensearch.script.Script +import org.opensearch.script.ScriptEngine +import org.opensearch.script.ScriptModule +import org.opensearch.script.ScriptService +import org.opensearch.script.ScriptType +import org.opensearch.search.aggregations.Aggregation +import org.opensearch.search.aggregations.Aggregator +import org.opensearch.search.aggregations.AggregatorTestCase +import org.opensearch.search.aggregations.bucket.filter.FilterAggregationBuilder +import org.opensearch.search.aggregations.bucket.filter.FiltersAggregationBuilder +import org.opensearch.search.aggregations.bucket.filter.InternalFilter +import org.opensearch.search.aggregations.bucket.filter.InternalFilters +import org.opensearch.search.aggregations.bucket.terms.IncludeExclude +import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder +import org.opensearch.search.aggregations.metrics.AvgAggregationBuilder +import org.opensearch.search.aggregations.metrics.ValueCountAggregationBuilder +import java.io.IOException +import java.util.Collections +import java.util.function.Consumer +import java.util.function.Function + +class BucketSelectorExtAggregatorTests : AggregatorTestCase() { + + private var SCRIPTNAME = "bucket_selector_script" + private var paramName = "the_avg" + private var paramValue = 19.0 + + override fun getMockScriptService(): ScriptService { + + val scriptEngine = MockScriptEngine( + MockScriptEngine.NAME, + Collections.singletonMap(SCRIPTNAME, + Function, Any> { script: Map -> + script[paramName].toString().toDouble() == paramValue + }), emptyMap() + ) + val engines: Map = Collections.singletonMap(scriptEngine.type, scriptEngine) + return ScriptService(Settings.EMPTY, engines, ScriptModule.CORE_CONTEXTS) + } + + @Throws(Exception::class) + fun `test bucket selector script`() { + val fieldType: MappedFieldType = NumberFieldType("number_field", NumberFieldMapper.NumberType.INTEGER) + val fieldType1: MappedFieldType = KeywordFieldType("the_field") + + val filters: FiltersAggregationBuilder = FiltersAggregationBuilder("placeholder", MatchAllQueryBuilder()) + .subAggregation( + TermsAggregationBuilder("the_terms").field("the_field") + .subAggregation(AvgAggregationBuilder("the_avg").field("number_field")) + ) + .subAggregation( + BucketSelectorExtAggregationBuilder( + "test_bucket_selector_ext", + Collections.singletonMap("the_avg", "the_avg.value"), + Script(ScriptType.INLINE, MockScriptEngine.NAME, SCRIPTNAME, emptyMap()), + "the_terms", + null + ) + ) + paramName = "the_avg" + paramValue = 19.0 + testCase( + filters, MatchAllDocsQuery(), + CheckedConsumer { 
iw: RandomIndexWriter -> + var doc = Document() + doc.add(SortedSetDocValuesField("the_field", BytesRef("test1"))) + doc.add(SortedNumericDocValuesField("number_field", 20)) + iw.addDocument(doc) + doc = Document() + doc.add(SortedSetDocValuesField("the_field", BytesRef("test2"))) + doc.add(SortedNumericDocValuesField("number_field", 19)) + iw.addDocument(doc) + }, + Consumer { f: InternalFilters -> + assertThat( + (f.buckets[0].aggregations + .get("test_bucket_selector_ext") as BucketSelectorIndices).bucketIndices[0], + CoreMatchers.equalTo(1) + ) + }, fieldType, fieldType1 + ) + } + + @Throws(Exception::class) + fun `test bucket selector filter include`() { + val fieldType: MappedFieldType = NumberFieldType("number_field", NumberFieldMapper.NumberType.INTEGER) + val fieldType1: MappedFieldType = KeywordFieldType("the_field") + + val selectorAgg1: FiltersAggregationBuilder = FiltersAggregationBuilder("placeholder", MatchAllQueryBuilder()) + .subAggregation( + TermsAggregationBuilder("the_terms").field("the_field") + .subAggregation(AvgAggregationBuilder("the_avg").field("number_field")) + ) + .subAggregation( + BucketSelectorExtAggregationBuilder( + "test_bucket_selector_ext", + Collections.singletonMap("the_avg", "the_avg.value"), + Script(ScriptType.INLINE, MockScriptEngine.NAME, SCRIPTNAME, emptyMap()), + "the_terms", + BucketSelectorExtFilter(IncludeExclude(arrayOf("test1"), arrayOf())) + ) + ) + + val selectorAgg2: FiltersAggregationBuilder = FiltersAggregationBuilder("placeholder", MatchAllQueryBuilder()) + .subAggregation( + TermsAggregationBuilder("the_terms").field("the_field") + .subAggregation(AvgAggregationBuilder("the_avg").field("number_field")) + ) + .subAggregation( + BucketSelectorExtAggregationBuilder( + "test_bucket_selector_ext", + Collections.singletonMap("the_avg", "the_avg.value"), + Script(ScriptType.INLINE, MockScriptEngine.NAME, SCRIPTNAME, emptyMap()), + "the_terms", + BucketSelectorExtFilter(IncludeExclude(arrayOf("test2"), arrayOf())) + ) + ) + + paramName = "the_avg" + paramValue = 19.0 + + testCase( + selectorAgg1, MatchAllDocsQuery(), + CheckedConsumer { iw: RandomIndexWriter -> + var doc = Document() + doc.add(SortedSetDocValuesField("the_field", BytesRef("test1"))) + doc.add(SortedNumericDocValuesField("number_field", 20)) + iw.addDocument(doc) + doc = Document() + doc.add(SortedSetDocValuesField("the_field", BytesRef("test2"))) + doc.add(SortedNumericDocValuesField("number_field", 19)) + iw.addDocument(doc) + }, + Consumer { f: InternalFilters -> + assertThat( + (f.buckets[0].aggregations + .get("test_bucket_selector_ext") as BucketSelectorIndices).bucketIndices.size, + CoreMatchers.equalTo(0) + ) + }, fieldType, fieldType1 + ) + + testCase( + selectorAgg2, MatchAllDocsQuery(), + CheckedConsumer { iw: RandomIndexWriter -> + var doc = Document() + doc.add(SortedSetDocValuesField("the_field", BytesRef("test1"))) + doc.add(SortedNumericDocValuesField("number_field", 20)) + iw.addDocument(doc) + doc = Document() + doc.add(SortedSetDocValuesField("the_field", BytesRef("test2"))) + doc.add(SortedNumericDocValuesField("number_field", 19)) + iw.addDocument(doc) + }, + Consumer { f: InternalFilters -> + assertThat( + (f.buckets[0].aggregations + .get("test_bucket_selector_ext") as BucketSelectorIndices).bucketIndices[0], + CoreMatchers.equalTo(1) + ) + }, fieldType, fieldType1 + ) + } + + @Throws(Exception::class) + fun `test bucket selector filter exclude`() { + val fieldType: MappedFieldType = NumberFieldType("number_field", 
NumberFieldMapper.NumberType.INTEGER) + val fieldType1: MappedFieldType = KeywordFieldType("the_field") + + val selectorAgg1: FiltersAggregationBuilder = FiltersAggregationBuilder("placeholder", MatchAllQueryBuilder()) + .subAggregation( + TermsAggregationBuilder("the_terms").field("the_field") + .subAggregation(AvgAggregationBuilder("the_avg").field("number_field")) + ) + .subAggregation( + BucketSelectorExtAggregationBuilder( + "test_bucket_selector_ext", + Collections.singletonMap("the_avg", "the_avg.value"), + Script(ScriptType.INLINE, MockScriptEngine.NAME, SCRIPTNAME, emptyMap()), + "the_terms", + BucketSelectorExtFilter(IncludeExclude(arrayOf(), arrayOf("test2"))) + ) + ) + paramName = "the_avg" + paramValue = 19.0 + testCase( + selectorAgg1, MatchAllDocsQuery(), + CheckedConsumer { iw: RandomIndexWriter -> + var doc = Document() + doc.add(SortedSetDocValuesField("the_field", BytesRef("test1"))) + doc.add(SortedNumericDocValuesField("number_field", 20)) + iw.addDocument(doc) + doc = Document() + doc.add(SortedSetDocValuesField("the_field", BytesRef("test2"))) + doc.add(SortedNumericDocValuesField("number_field", 19)) + iw.addDocument(doc) + }, + Consumer { f: InternalFilters -> + assertThat( + (f.buckets[0].aggregations + .get("test_bucket_selector_ext") as BucketSelectorIndices).bucketIndices.size, + CoreMatchers.equalTo(0) + ) + }, fieldType, fieldType1 + ) + } + + @Throws(Exception::class) + fun `test bucket selector filter numeric key`() { + val fieldType: MappedFieldType = NumberFieldType("number_field", NumberFieldMapper.NumberType.INTEGER) + val fieldType1: MappedFieldType = KeywordFieldType("the_field") + + val selectorAgg1: FiltersAggregationBuilder = FiltersAggregationBuilder("placeholder", MatchAllQueryBuilder()) + .subAggregation( + TermsAggregationBuilder("number_agg").field("number_field") + .subAggregation(ValueCountAggregationBuilder("count").field("number_field")) + ) + .subAggregation( + BucketSelectorExtAggregationBuilder( + "test_bucket_selector_ext", + Collections.singletonMap("count", "count"), + Script(ScriptType.INLINE, MockScriptEngine.NAME, SCRIPTNAME, emptyMap()), + "number_agg", + BucketSelectorExtFilter(IncludeExclude(doubleArrayOf(19.0), doubleArrayOf())) + ) + ) + + paramName = "count" + paramValue = 1.0 + testCase( + selectorAgg1, MatchAllDocsQuery(), + CheckedConsumer { iw: RandomIndexWriter -> + var doc = Document() + doc.add(SortedSetDocValuesField("the_field", BytesRef("test1"))) + doc.add(SortedNumericDocValuesField("number_field", 20)) + iw.addDocument(doc) + doc = Document() + doc.add(SortedSetDocValuesField("the_field", BytesRef("test2"))) + doc.add(SortedNumericDocValuesField("number_field", 19)) + iw.addDocument(doc) + }, + Consumer { f: InternalFilters -> + assertThat( + (f.buckets[0].aggregations + .get("test_bucket_selector_ext") as BucketSelectorIndices).bucketIndices[0], + CoreMatchers.equalTo(0) + ) + }, fieldType, fieldType1 + ) + } + + @Throws(Exception::class) + fun `test bucket selector nested parent path`() { + val fieldType: MappedFieldType = NumberFieldType("number_field", NumberFieldMapper.NumberType.INTEGER) + val fieldType1: MappedFieldType = KeywordFieldType("the_field") + + val selectorAgg1: FilterAggregationBuilder = FilterAggregationBuilder("placeholder", MatchAllQueryBuilder()) + .subAggregation( + FilterAggregationBuilder("parent_agg", MatchAllQueryBuilder()) + .subAggregation( + TermsAggregationBuilder("term_agg").field("the_field") + .subAggregation(AvgAggregationBuilder("the_avg").field("number_field")) + ) + ) + 
.subAggregation( + BucketSelectorExtAggregationBuilder( + "test_bucket_selector_ext", + Collections.singletonMap("the_avg", "the_avg.value"), + Script(ScriptType.INLINE, MockScriptEngine.NAME, SCRIPTNAME, emptyMap()), + "parent_agg>term_agg", + null + ) + ) + paramName = "the_avg" + paramValue = 19.0 + testCaseInternalFilter( + selectorAgg1, MatchAllDocsQuery(), + CheckedConsumer { iw: RandomIndexWriter -> + var doc = Document() + doc.add(SortedSetDocValuesField("the_field", BytesRef("test1"))) + + doc.add(SortedNumericDocValuesField("number_field", 20)) + iw.addDocument(doc) + doc = Document() + doc.add(SortedSetDocValuesField("the_field", BytesRef("test2"))) + + doc.add(SortedNumericDocValuesField("number_field", 19)) + iw.addDocument(doc) + }, + Consumer { f: InternalFilter -> + assertThat( + (f.aggregations + .get("test_bucket_selector_ext") as BucketSelectorIndices).bucketIndices[0], + CoreMatchers.equalTo(1) + ) + }, fieldType, fieldType1 + ) + } + + @Throws(IOException::class) + private fun testCase( + aggregationBuilder: FiltersAggregationBuilder, + query: Query, + buildIndex: CheckedConsumer, + verify: Consumer, + vararg fieldType: MappedFieldType + ) { + newDirectory().use { directory -> + val indexWriter = RandomIndexWriter(random(), directory) + buildIndex.accept(indexWriter) + indexWriter.close() + DirectoryReader.open(directory).use { indexReader -> + val indexSearcher = newIndexSearcher(indexReader) + val filters: InternalFilters + filters = searchAndReduce(indexSearcher, query, aggregationBuilder, *fieldType) + verify.accept(filters) + } + } + } + + @Throws(IOException::class) + private fun testCaseInternalFilter( + aggregationBuilder: FilterAggregationBuilder, + query: Query, + buildIndex: CheckedConsumer, + verify: Consumer, + vararg fieldType: MappedFieldType + ) { + newDirectory().use { directory -> + val indexWriter = RandomIndexWriter(random(), directory) + buildIndex.accept(indexWriter) + indexWriter.close() + DirectoryReader.open(directory).use { indexReader -> + val indexSearcher = newIndexSearcher(indexReader) + val filters: InternalFilter + filters = searchAndReduce(indexSearcher, query, aggregationBuilder, *fieldType) + verify.accept(filters) + } + } + } +}
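Reviewer note: below is a minimal usage sketch, not part of this patch, of how the new bucket_selector_ext pipeline aggregation could be wired into a monitor's search input once the plugin registers it. The field names (status, latency), aggregation names, and the threshold in the selector script are illustrative assumptions; only the builder and filter constructors come from the code added above, and bucket values are assumed to reach the script through params, as with the stock bucket_selector.

import org.opensearch.alerting.aggregation.bucketselectorext.BucketSelectorExtAggregationBuilder
import org.opensearch.alerting.aggregation.bucketselectorext.BucketSelectorExtFilter
import org.opensearch.script.Script
import org.opensearch.script.ScriptType
import org.opensearch.search.aggregations.bucket.terms.IncludeExclude
import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder
import org.opensearch.search.aggregations.metrics.AvgAggregationBuilder
import org.opensearch.search.builder.SearchSourceBuilder

fun buildSelectorSearch(): SearchSourceBuilder {
    // One terms bucket per status value, each with an average latency (hypothetical fields).
    val termsAgg = TermsAggregationBuilder("status_terms").field("status")
        .subAggregation(AvgAggregationBuilder("avg_latency").field("latency"))

    // Sibling pipeline aggregation: keep only buckets whose key is exactly "error" (include filter,
    // mirroring the exact-value usage in the new tests) and whose avg_latency passes the script.
    val selector = BucketSelectorExtAggregationBuilder(
        "slow_error_buckets",                            // hypothetical aggregation name
        mapOf("avg_latency" to "avg_latency.value"),     // buckets_path variables exposed to the script
        Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, "params.avg_latency > 100", emptyMap()),
        "status_terms",                                  // parent_bucket_path
        BucketSelectorExtFilter(IncludeExclude(arrayOf("error"), arrayOf()))
    )

    // size(0): only the aggregation results are needed. The response then carries a
    // BucketSelectorIndices aggregation whose bucketIndices point at the matching "status_terms" buckets.
    return SearchSourceBuilder()
        .size(0)
        .aggregation(termsAgg)
        .aggregation(selector)
}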
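The composite_agg_filter variant can be sketched the same way. Here the filter is keyed by composite value-source name rather than applied to a single terms key; again the source names, fields, and threshold are assumptions for illustration, inferred from how BucketSelectorExtFilter and the aggregator handle composite buckets above.

import org.opensearch.alerting.aggregation.bucketselectorext.BucketSelectorExtAggregationBuilder
import org.opensearch.alerting.aggregation.bucketselectorext.BucketSelectorExtFilter
import org.opensearch.script.Script
import org.opensearch.script.ScriptType
import org.opensearch.search.aggregations.bucket.composite.CompositeAggregationBuilder
import org.opensearch.search.aggregations.bucket.composite.CompositeValuesSourceBuilder
import org.opensearch.search.aggregations.bucket.composite.TermsValuesSourceBuilder
import org.opensearch.search.aggregations.bucket.terms.IncludeExclude
import org.opensearch.search.aggregations.metrics.AvgAggregationBuilder
import org.opensearch.search.builder.SearchSourceBuilder

fun buildCompositeSelectorSearch(): SearchSourceBuilder {
    // Composite aggregation with two named value sources; in the composite_agg_filter variant
    // each source name can carry its own include/exclude filter.
    val sources: List<CompositeValuesSourceBuilder<*>> = listOf(
        TermsValuesSourceBuilder("service").field("service"),
        TermsValuesSourceBuilder("status").field("status")
    )
    val compositeAgg = CompositeAggregationBuilder("comp_agg", sources)
        .subAggregation(AvgAggregationBuilder("avg_latency").field("latency"))

    // Only composite buckets whose "status" source is exactly "error" are handed to the selector script.
    val filter = BucketSelectorExtFilter(
        hashMapOf("status" to IncludeExclude(arrayOf("error"), arrayOf()))
    )

    val selector = BucketSelectorExtAggregationBuilder(
        "slow_error_buckets",
        mapOf("avg_latency" to "avg_latency.value"),
        Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, "params.avg_latency > 100", emptyMap()),
        "comp_agg",
        filter
    )

    return SearchSourceBuilder()
        .size(0)
        .aggregation(compositeAgg)
        .aggregation(selector)
}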