Skip to content

Commit

Permalink
Add BucketSelector pipeline aggregation extension (#144)
Browse files Browse the repository at this point in the history
Signed-off-by: Mohammad Qureshi <[email protected]>

Co-authored-by: Rishabh Maurya <[email protected]>
  • Loading branch information
qreshi and rishabhmaurya authored Aug 23, 2021
1 parent 764c14a commit 25194d6
Show file tree
Hide file tree
Showing 7 changed files with 1,095 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import org.opensearch.alerting.action.IndexMonitorAction
import org.opensearch.alerting.action.SearchEmailAccountAction
import org.opensearch.alerting.action.SearchEmailGroupAction
import org.opensearch.alerting.action.SearchMonitorAction
import org.opensearch.alerting.aggregation.bucketselectorext.BucketSelectorExtAggregationBuilder
import org.opensearch.alerting.alerts.AlertIndices
import org.opensearch.alerting.core.JobSweeper
import org.opensearch.alerting.core.ScheduledJobIndices
Expand Down Expand Up @@ -103,12 +104,14 @@ import org.opensearch.cluster.metadata.IndexNameExpressionResolver
import org.opensearch.cluster.node.DiscoveryNodes
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.io.stream.NamedWriteableRegistry
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.settings.ClusterSettings
import org.opensearch.common.settings.IndexScopedSettings
import org.opensearch.common.settings.Setting
import org.opensearch.common.settings.Settings
import org.opensearch.common.settings.SettingsFilter
import org.opensearch.common.xcontent.NamedXContentRegistry
import org.opensearch.common.xcontent.XContentParser
import org.opensearch.env.Environment
import org.opensearch.env.NodeEnvironment
import org.opensearch.index.IndexModule
Expand All @@ -119,6 +122,7 @@ import org.opensearch.plugins.ActionPlugin
import org.opensearch.plugins.Plugin
import org.opensearch.plugins.ReloadablePlugin
import org.opensearch.plugins.ScriptPlugin
import org.opensearch.plugins.SearchPlugin
import org.opensearch.repositories.RepositoriesService
import org.opensearch.rest.RestController
import org.opensearch.rest.RestHandler
Expand All @@ -134,7 +138,7 @@ import java.util.function.Supplier
* It also adds [Monitor.XCONTENT_REGISTRY], [SearchInput.XCONTENT_REGISTRY] to the
* [NamedXContentRegistry] so that we are able to deserialize the custom named objects.
*/
internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, ReloadablePlugin, Plugin() {
internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, ReloadablePlugin, SearchPlugin, Plugin() {

override fun getContextWhitelists(): Map<ScriptContext<*>, List<Whitelist>> {
val whitelist = WhitelistLoader.loadFromResourceFiles(javaClass, "org.opensearch.alerting.txt")
Expand Down Expand Up @@ -330,4 +334,16 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
override fun reload(settings: Settings) {
runner.reloadDestinationSettings(settings)
}

override fun getPipelineAggregations(): List<SearchPlugin.PipelineAggregationSpec> {
return listOf(
SearchPlugin.PipelineAggregationSpec(
BucketSelectorExtAggregationBuilder.NAME,
{ sin: StreamInput -> BucketSelectorExtAggregationBuilder(sin) },
{ parser: XContentParser, agg_name: String ->
BucketSelectorExtAggregationBuilder.parse(agg_name, parser)
}
)
)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.alerting.aggregation.bucketselectorext

import org.opensearch.alerting.aggregation.bucketselectorext.BucketSelectorExtFilter.Companion.BUCKET_SELECTOR_COMPOSITE_AGG_FILTER
import org.opensearch.alerting.aggregation.bucketselectorext.BucketSelectorExtFilter.Companion.BUCKET_SELECTOR_FILTER
import org.opensearch.common.ParseField
import org.opensearch.common.ParsingException
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.common.xcontent.ToXContent.Params
import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.common.xcontent.XContentParser
import org.opensearch.script.Script
import org.opensearch.search.aggregations.pipeline.AbstractPipelineAggregationBuilder
import org.opensearch.search.aggregations.pipeline.BucketHelpers.GapPolicy
import org.opensearch.search.aggregations.pipeline.PipelineAggregator
import java.io.IOException
import java.util.Objects

class BucketSelectorExtAggregationBuilder :
AbstractPipelineAggregationBuilder<BucketSelectorExtAggregationBuilder> {
private val bucketsPathsMap: Map<String, String>
val parentBucketPath: String
val script: Script
val filter: BucketSelectorExtFilter?
private var gapPolicy = GapPolicy.SKIP

constructor(
name: String,
bucketsPathsMap: Map<String, String>,
script: Script,
parentBucketPath: String,
filter: BucketSelectorExtFilter?
) : super(name, NAME.preferredName, listOf<String>(parentBucketPath).toTypedArray<String>()) {
this.bucketsPathsMap = bucketsPathsMap
this.script = script
this.parentBucketPath = parentBucketPath
this.filter = filter
}

@Throws(IOException::class)
@Suppress("UNCHECKED_CAST")
constructor(sin: StreamInput) : super(sin, NAME.preferredName) {
bucketsPathsMap = sin.readMap() as MutableMap<String, String>
script = Script(sin)
gapPolicy = GapPolicy.readFrom(sin)
parentBucketPath = sin.readString()
filter = if (sin.readBoolean()) {
BucketSelectorExtFilter(sin)
} else {
null
}
}

@Throws(IOException::class)
override fun doWriteTo(out: StreamOutput) {
out.writeMap(bucketsPathsMap)
script.writeTo(out)
gapPolicy.writeTo(out)
out.writeString(parentBucketPath)
if (filter != null) {
out.writeBoolean(true)
filter.writeTo(out)
} else {
out.writeBoolean(false)
}
}

/**
* Sets the gap policy to use for this aggregation.
*/
fun gapPolicy(gapPolicy: GapPolicy?): BucketSelectorExtAggregationBuilder {
requireNotNull(gapPolicy) { "[gapPolicy] must not be null: [$name]" }
this.gapPolicy = gapPolicy
return this
}

override fun createInternal(metaData: Map<String, Any>?): PipelineAggregator {
return BucketSelectorExtAggregator(name, bucketsPathsMap, parentBucketPath, script, gapPolicy, filter, metaData)
}

@Throws(IOException::class)
public override fun internalXContent(builder: XContentBuilder, params: Params): XContentBuilder {
builder.field(PipelineAggregator.Parser.BUCKETS_PATH.preferredName, bucketsPathsMap as Map<String, Any>?)
.field(PARENT_BUCKET_PATH.preferredName, parentBucketPath)
.field(Script.SCRIPT_PARSE_FIELD.preferredName, script)
.field(PipelineAggregator.Parser.GAP_POLICY.preferredName, gapPolicy.getName())
if (filter != null) {
if (filter.isCompositeAggregation) {
builder.startObject(BUCKET_SELECTOR_COMPOSITE_AGG_FILTER.preferredName)
.value(filter)
.endObject()
} else {
builder.startObject(BUCKET_SELECTOR_FILTER.preferredName)
.value(filter)
.endObject()
}
}
return builder
}

override fun overrideBucketsPath(): Boolean {
return true
}

override fun validate(context: ValidationContext) {
// Nothing to check
}

override fun hashCode(): Int {
return Objects.hash(super.hashCode(), bucketsPathsMap, script, gapPolicy)
}

override fun equals(other: Any?): Boolean {
if (this === other) return true
if (other == null || javaClass != other.javaClass) return false
if (!super.equals(other)) return false
val otherCast = other as BucketSelectorExtAggregationBuilder
return (bucketsPathsMap == otherCast.bucketsPathsMap &&
script == otherCast.script &&
gapPolicy == otherCast.gapPolicy)
}

override fun getWriteableName(): String {
return NAME.preferredName
}

companion object {
val NAME = ParseField("bucket_selector_ext")
val PARENT_BUCKET_PATH = ParseField("parent_bucket_path")

@Throws(IOException::class)
fun parse(reducerName: String, parser: XContentParser): BucketSelectorExtAggregationBuilder {
var token: XContentParser.Token
var script: Script? = null
var currentFieldName: String? = null
var bucketsPathsMap: MutableMap<String, String>? = null
var gapPolicy: GapPolicy? = null
var parentBucketPath: String? = null
var filter: BucketSelectorExtFilter? = null
while (parser.nextToken().also { token = it } !== XContentParser.Token.END_OBJECT) {
if (token === XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName()
} else if (token === XContentParser.Token.VALUE_STRING) {
when {
PipelineAggregator.Parser.BUCKETS_PATH.match(currentFieldName, parser.deprecationHandler) -> {
bucketsPathsMap = HashMap()
bucketsPathsMap["_value"] = parser.text()
}
PipelineAggregator.Parser.GAP_POLICY.match(currentFieldName, parser.deprecationHandler) -> {
gapPolicy = GapPolicy.parse(parser.text(), parser.tokenLocation)
}
Script.SCRIPT_PARSE_FIELD.match(currentFieldName, parser.deprecationHandler) -> {
script = Script.parse(parser)
}
PARENT_BUCKET_PATH.match(currentFieldName, parser.deprecationHandler) -> {
parentBucketPath = parser.text()
}
else -> {
throw ParsingException(
parser.tokenLocation,
"Unknown key for a $token in [$reducerName]: [$currentFieldName]."
)
}
}
} else if (token === XContentParser.Token.START_ARRAY) {
if (PipelineAggregator.Parser.BUCKETS_PATH.match(currentFieldName, parser.deprecationHandler)) {
val paths: MutableList<String> = ArrayList()
while (parser.nextToken().also { token = it } !== XContentParser.Token.END_ARRAY) {
val path = parser.text()
paths.add(path)
}
bucketsPathsMap = HashMap()
for (i in paths.indices) {
bucketsPathsMap["_value$i"] = paths[i]
}
} else {
throw ParsingException(
parser.tokenLocation,
"Unknown key for a $token in [$reducerName]: [$currentFieldName]."
)
}
} else if (token === XContentParser.Token.START_OBJECT) {
when {
Script.SCRIPT_PARSE_FIELD.match(currentFieldName, parser.deprecationHandler) -> {
script = Script.parse(parser)
}
PipelineAggregator.Parser.BUCKETS_PATH.match(currentFieldName, parser.deprecationHandler) -> {
val map = parser.map()
bucketsPathsMap = HashMap()
for ((key, value) in map) {
bucketsPathsMap[key] = value.toString()
}
}
BUCKET_SELECTOR_FILTER.match(currentFieldName, parser.deprecationHandler) -> {
filter = BucketSelectorExtFilter.parse(reducerName, false, parser)
}
BUCKET_SELECTOR_COMPOSITE_AGG_FILTER.match(
currentFieldName,
parser.deprecationHandler
) -> {
filter = BucketSelectorExtFilter.parse(reducerName, true, parser)
}
else -> {
throw ParsingException(
parser.tokenLocation,
"Unknown key for a $token in [$reducerName]: [$currentFieldName]."
)
}
}
} else {
throw ParsingException(parser.tokenLocation, "Unexpected token $token in [$reducerName].")
}
}
if (bucketsPathsMap == null) {
throw ParsingException(
parser.tokenLocation, "Missing required field [" + PipelineAggregator.Parser.BUCKETS_PATH.preferredName +
"] for bucket_selector aggregation [" + reducerName + "]"
)
}
if (script == null) {
throw ParsingException(
parser.tokenLocation, "Missing required field [" + Script.SCRIPT_PARSE_FIELD.preferredName +
"] for bucket_selector aggregation [" + reducerName + "]"
)
}

if (parentBucketPath == null) {
throw ParsingException(
parser.tokenLocation, "Missing required field [" + PARENT_BUCKET_PATH +
"] for bucket_selector aggregation [" + reducerName + "]"
)
}
val factory = BucketSelectorExtAggregationBuilder(reducerName, bucketsPathsMap, script, parentBucketPath, filter)
if (gapPolicy != null) {
factory.gapPolicy(gapPolicy)
}
return factory
}
}
}
Loading

0 comments on commit 25194d6

Please sign in to comment.