Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

BucketSelector pipeline aggregation extension #374

Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import com.amazon.opendistroforelasticsearch.alerting.action.IndexMonitorAction
import com.amazon.opendistroforelasticsearch.alerting.action.SearchEmailAccountAction
import com.amazon.opendistroforelasticsearch.alerting.action.SearchEmailGroupAction
import com.amazon.opendistroforelasticsearch.alerting.action.SearchMonitorAction
import com.amazon.opendistroforelasticsearch.alerting.aggregation.bucketselectorext.BucketSelectorExtAggregationBuilder
import com.amazon.opendistroforelasticsearch.alerting.alerts.AlertIndices
import com.amazon.opendistroforelasticsearch.alerting.core.JobSweeper
import com.amazon.opendistroforelasticsearch.alerting.core.ScheduledJobIndices
Expand Down Expand Up @@ -89,12 +90,14 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver
import org.elasticsearch.cluster.node.DiscoveryNodes
import org.elasticsearch.cluster.service.ClusterService
import org.elasticsearch.common.io.stream.NamedWriteableRegistry
import org.elasticsearch.common.io.stream.StreamInput
import org.elasticsearch.common.settings.ClusterSettings
import org.elasticsearch.common.settings.IndexScopedSettings
import org.elasticsearch.common.settings.Setting
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.common.settings.SettingsFilter
import org.elasticsearch.common.xcontent.NamedXContentRegistry
import org.elasticsearch.common.xcontent.XContentParser
import org.elasticsearch.env.Environment
import org.elasticsearch.env.NodeEnvironment
import org.elasticsearch.index.IndexModule
Expand All @@ -105,6 +108,7 @@ import org.elasticsearch.plugins.ActionPlugin
import org.elasticsearch.plugins.Plugin
import org.elasticsearch.plugins.ReloadablePlugin
import org.elasticsearch.plugins.ScriptPlugin
import org.elasticsearch.plugins.SearchPlugin
import org.elasticsearch.repositories.RepositoriesService
import org.elasticsearch.rest.RestController
import org.elasticsearch.rest.RestHandler
Expand All @@ -120,7 +124,7 @@ import java.util.function.Supplier
* It also adds [Monitor.XCONTENT_REGISTRY], [SearchInput.XCONTENT_REGISTRY] to the
* [NamedXContentRegistry] so that we are able to deserialize the custom named objects.
*/
internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, ReloadablePlugin, Plugin() {
internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, ReloadablePlugin, SearchPlugin, Plugin() {
override fun getContextWhitelists(): Map<ScriptContext<*>, List<Whitelist>> {
val whitelist = WhitelistLoader.loadFromResourceFiles(javaClass, "com.amazon.opendistroforelasticsearch.alerting.txt")
return mapOf(TriggerScript.CONTEXT to listOf(whitelist))
Expand Down Expand Up @@ -284,4 +288,16 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
override fun reload(settings: Settings) {
runner.reloadDestinationSettings(settings)
}

override fun getPipelineAggregations(): List<SearchPlugin.PipelineAggregationSpec> {
return listOf(
SearchPlugin.PipelineAggregationSpec(
BucketSelectorExtAggregationBuilder.NAME,
{ sin: StreamInput -> BucketSelectorExtAggregationBuilder(sin) },
{ parser: XContentParser, agg_name: String ->
BucketSelectorExtAggregationBuilder.parse(agg_name, parser)
}
)
)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,262 @@
/*
* Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazon.opendistroforelasticsearch.alerting.aggregation.bucketselectorext

import com.amazon.opendistroforelasticsearch.alerting.aggregation.bucketselectorext.BucketSelectorExtFilter.Companion.BUCKET_SELECTOR_COMPOSITE_AGG_FILTER
import com.amazon.opendistroforelasticsearch.alerting.aggregation.bucketselectorext.BucketSelectorExtFilter.Companion.BUCKET_SELECTOR_FILTER
import org.elasticsearch.common.ParseField
import org.elasticsearch.common.ParsingException
import org.elasticsearch.common.io.stream.StreamInput
import org.elasticsearch.common.io.stream.StreamOutput
import org.elasticsearch.common.xcontent.ToXContent.Params
import org.elasticsearch.common.xcontent.XContentBuilder
import org.elasticsearch.common.xcontent.XContentParser
import org.elasticsearch.script.Script
import org.elasticsearch.search.aggregations.pipeline.AbstractPipelineAggregationBuilder
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator
import java.io.IOException
import java.util.Objects

class BucketSelectorExtAggregationBuilder :
AbstractPipelineAggregationBuilder<BucketSelectorExtAggregationBuilder> {
private val bucketsPathsMap: MutableMap<String, String>
private val parentBucketPath: String
val script: Script
val filter: BucketSelectorExtFilter?
private var gapPolicy = GapPolicy.SKIP

constructor(
name: String?,
bucketsPathsMap: MutableMap<String, String>,
script: Script,
parentBucketPath: String,
filter: BucketSelectorExtFilter?
) : super(name, NAME.preferredName, listOf<String>(parentBucketPath).toTypedArray<String>()) {
this.bucketsPathsMap = bucketsPathsMap
this.script = script
this.parentBucketPath = parentBucketPath
this.filter = filter
}

@Throws(IOException::class)
constructor(sin: StreamInput) : super(sin, NAME.preferredName) {
val mapSize: Int = sin.readVInt()
bucketsPathsMap = java.util.HashMap(mapSize)
for (i in 0 until mapSize) {
bucketsPathsMap[sin.readString()] = sin.readString()
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can alternatively be replaced with bucketsPathsMap = sin.readMap() as Map<String, String>

script = Script(sin)
gapPolicy = GapPolicy.readFrom(sin)
parentBucketPath = sin.readString()
filter = if (sin.readBoolean()) {
BucketSelectorExtFilter(sin)
} else {
null
}
}

@Throws(IOException::class)
override fun doWriteTo(out: StreamOutput) {
out.writeVInt(bucketsPathsMap.size)
for ((key, value) in bucketsPathsMap) {
out.writeString(key)
out.writeString(value)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similarly, this can be replaced with out.writeMap(bucketsPathsMap as Map<String, String>)

script.writeTo(out)
gapPolicy.writeTo(out)
out.writeString(parentBucketPath)
if (filter != null) {
out.writeBoolean(true)
filter.writeTo(out)
} else {
out.writeBoolean(false)
}
}

/**
* Sets the gap policy to use for this aggregation.
*/
fun gapPolicy(gapPolicy: GapPolicy?): BucketSelectorExtAggregationBuilder {
requireNotNull(gapPolicy) { "[gapPolicy] must not be null: [$name]" }
this.gapPolicy = gapPolicy
return this
}

override fun createInternal(metaData: Map<String, Any>?): PipelineAggregator {
return BucketSelectorExtAggregator(name, bucketsPathsMap, parentBucketPath, script, gapPolicy, filter, metaData)
}

@Throws(IOException::class)
public override fun internalXContent(builder: XContentBuilder, params: Params): XContentBuilder {
builder.field(PipelineAggregator.Parser.BUCKETS_PATH.preferredName, bucketsPathsMap as Map<String, Any>?)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NP: Builder calls can be chained to reduce text.

Ex.

builder.field()
    .field()
    .field()

builder.field(PARENT_BUCKET_PATH.preferredName, parentBucketPath)
builder.field(Script.SCRIPT_PARSE_FIELD.preferredName, script)
builder.field(PipelineAggregator.Parser.GAP_POLICY.preferredName, gapPolicy.getName())
if (filter != null) {
if (filter.isCompositeAggregation) {
builder.startObject(BUCKET_SELECTOR_COMPOSITE_AGG_FILTER.preferredName)
builder.value(filter)
builder.endObject()
} else {
builder.startObject(BUCKET_SELECTOR_FILTER.preferredName)
builder.value(filter)
builder.endObject()
}
}
return builder
}

override fun overrideBucketsPath(): Boolean {
return true
}

override fun validate(context: ValidationContext) {
// Nothing to check
}

override fun hashCode(): Int {
return Objects.hash(super.hashCode(), bucketsPathsMap, script, gapPolicy)
}

override fun equals(other: Any?): Boolean {
if (this === other) return true
if (other == null || javaClass != other.javaClass) return false
if (!super.equals(other)) return false
val otherCast = other as BucketSelectorExtAggregationBuilder
return (bucketsPathsMap == otherCast.bucketsPathsMap
&& script == otherCast.script
&& gapPolicy == otherCast.gapPolicy)
}

override fun getWriteableName(): String {
return NAME.preferredName
}

companion object {
val NAME = ParseField("bucket_selector_ext")
private val PARENT_BUCKET_PATH = ParseField("parent_bucket_path")

@Throws(IOException::class)
fun parse(reducerName: String, parser: XContentParser): BucketSelectorExtAggregationBuilder {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To clean this up a bit and be more consistent with our other parse functions, I think this can be simplified to assume that parse is being called on the start_object of bucket_select_ext. This way, we can fetch the field name and then the next token should be the contents of the field. Then within the single when, we can cover the different formats of the field being parsed.

Ex.

fun parse(reducerName: String, xcp: XContentParser): BucketSelectorExtAggregationBuilder {
    var bucketsPathsMap: MutableMap<String, String>? = null
    var gapPolicy: GapPolicy? = null
    var script: Script? = null
    var parentBucketPath: String? = null
    var filter: BucketSelectorExtFilter? = null

    ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
    while(xcp.nextToken() != Token.END_OBJECT) {
    	val fieldName = xcp.currentName()
        xcp.nextToken()

        when (fieldName) {
            PipelineAggregator.Parser.BUCKETS_PATH -> {
                if (xcp.currentToken == Token.START_OBJECT) {
                   ...
                } else if (xcp.current == Token.START_ARRAY) {
                   while (xcp.nextToken() != Token.END_ARRAY) {
                      ...
                   }
                } else {
                   ...
                }
            }
            PipelineAggregator.Parser.GAP_POLICY -> { ... }
            Script.SCRIPT_PARSE_FIELD -> { ... }
            PARENT_BUCKET_PATH -> { ... }
            else -> { ... }
        }
    }

    ...
}

var token: XContentParser.Token
var script: Script? = null
var currentFieldName: String? = null
var bucketsPathsMap: MutableMap<String, String>? = null
var gapPolicy: GapPolicy? = null
var parentBucketPath: String? = null
var filter: BucketSelectorExtFilter? = null
while (parser.nextToken().also { token = it } !== XContentParser.Token.END_OBJECT) {
if (token === XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName()
} else if (token === XContentParser.Token.VALUE_STRING) {
when {
PipelineAggregator.Parser.BUCKETS_PATH.match(currentFieldName, parser.deprecationHandler) -> {
bucketsPathsMap = HashMap()
bucketsPathsMap["_value"] = parser.text()
}
PipelineAggregator.Parser.GAP_POLICY.match(currentFieldName, parser.deprecationHandler) -> {
gapPolicy = GapPolicy.parse(parser.text(), parser.tokenLocation)
}
Script.SCRIPT_PARSE_FIELD.match(currentFieldName, parser.deprecationHandler) -> {
script = Script.parse(parser)
}
PARENT_BUCKET_PATH.match(currentFieldName, parser.deprecationHandler) -> {
parentBucketPath = parser.text()
}
else -> {
throw ParsingException(
parser.tokenLocation,
"Unknown key for a $token in [$reducerName]: [$currentFieldName]."
)
}
}
} else if (token === XContentParser.Token.START_ARRAY) {
if (PipelineAggregator.Parser.BUCKETS_PATH.match(currentFieldName, parser.deprecationHandler)) {
val paths: MutableList<String> = ArrayList()
while (parser.nextToken().also { token = it } !== XContentParser.Token.END_ARRAY) {
val path = parser.text()
paths.add(path)
}
bucketsPathsMap = HashMap()
for (i in paths.indices) {
bucketsPathsMap["_value$i"] = paths[i]
}
} else {
throw ParsingException(
parser.tokenLocation,
"Unknown key for a $token in [$reducerName]: [$currentFieldName]."
)
}
} else if (token === XContentParser.Token.START_OBJECT) {
when {
Script.SCRIPT_PARSE_FIELD.match(currentFieldName, parser.deprecationHandler) -> {
script = Script.parse(parser)
}
PipelineAggregator.Parser.BUCKETS_PATH.match(currentFieldName, parser.deprecationHandler) -> {
val map = parser.map()
bucketsPathsMap = HashMap()
for ((key, value) in map) {
bucketsPathsMap[key] = value.toString()
}
}
BUCKET_SELECTOR_FILTER.match(currentFieldName, parser.deprecationHandler) -> {
filter = BucketSelectorExtFilter.parse(reducerName, false, parser)
}
BUCKET_SELECTOR_COMPOSITE_AGG_FILTER.match(
currentFieldName,
parser.deprecationHandler
) -> {
filter = BucketSelectorExtFilter.parse(reducerName, true, parser)
}
else -> {
throw ParsingException(
parser.tokenLocation,
"Unknown key for a $token in [$reducerName]: [$currentFieldName]."
)
}
}
} else {
throw ParsingException(parser.tokenLocation, "Unexpected token $token in [$reducerName].")
}
}
if (bucketsPathsMap == null) {
throw ParsingException(
parser.tokenLocation, "Missing required field [" + PipelineAggregator.Parser.BUCKETS_PATH.preferredName
+ "] for bucket_selector aggregation [" + reducerName + "]"
)
}
if (script == null) {
throw ParsingException(
parser.tokenLocation, "Missing required field [" + Script.SCRIPT_PARSE_FIELD.preferredName
+ "] for bucket_selector aggregation [" + reducerName + "]"
)
}

if (parentBucketPath == null) {
throw ParsingException(
parser.tokenLocation, "Missing required field [" + PARENT_BUCKET_PATH
+ "] for bucket_selector aggregation [" + reducerName + "]"
)
}
val factory = BucketSelectorExtAggregationBuilder(reducerName, bucketsPathsMap, script, parentBucketPath, filter)
if (gapPolicy != null) {
factory.gapPolicy(gapPolicy)
}
return factory
}
}

}
Loading