Conversation

rishabhmaurya

Description of changes:
BucketSelectorExt is an extension of the BucketSelector pipeline aggregation. Some of the limitations of BucketSelector -

  1. Only one bucket selector can be applied to a parent multi-bucket aggregation. Since BucketSelector retains the selected buckets and discards the others, allowing multiple BucketSelectors on one aggregation isn't ideal behavior.
  2. Key filters are not supported. Script variables in BucketSelector pertain only to numeric values, and keys cannot be used as script variables when writing a selection expression.
  3. Doesn't work with composite aggregations: no pipeline aggregation works with composite aggregations, since most pipeline aggregations operate on the entirety of the results, whereas composite aggregation results can be paginated. Refer: Pipeline metrics aggregations do not recognize composite aggregations as multi-bucket elastic/elasticsearch#32692

With BucketSelectorExt we are trying to address the above limitations.

  • For 1, each BucketSelectorExt will have its own output section displaying the indices of the selected buckets from the parent multi-bucket aggregation instead of the actual buckets. Also, the parent aggregation will contain all buckets, and BucketSelectorExt will have no impact on its result.
  • For 2, there is an optional filter field. Here one can pass an include/exclude filter which works along the lines of the terms aggregation filtering supported by elasticsearch.
  • For 3, key filters support passing a filter for each source of the composite aggregation. For this, we have introduced a new key-value object where the key is the name of the source and the value is the key filter for the corresponding source in the composite aggregation. Refer to the examples below.

Parameters:
parent_bucket_path - this is used to navigate to the parent multi-bucket aggregation on which the selector has to be applied. It supports nested aggregations but must comply with the constraint below -
agg1>agg2>agg3 - where agg1 and agg2 are single-bucket aggs, whereas agg3, i.e. the last aggregation in the hierarchy, must be a multi-bucket aggregation to which the bucket selector applies (see the nested-path sketch after this parameter list).
buckets_path - same as the existing BucketSelector buckets_path

script - same as the existing BucketSelector script

filter - key filter condition. Keys are filtered first and then the bucket selector script is executed on the filtered keys.
It contains an include/exclude filter which works along the lines of the terms aggregation filtering supported by elasticsearch (an exclude sketch follows the usage examples below).

composite_agg_filter - key filter condition for composite aggregations. Refer to the example below for usage.
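
For illustration only (a sketch, not taken from this PR), a nested parent_bucket_path could look like the following, where agg1 is a hypothetical single-bucket filter aggregation and agg2 is the multi-bucket terms aggregation addressed as agg1>agg2 -

"aggs": {
   "agg1": {
      "filter": { "term": { "<fieldname>": "<value>" } },
      "aggs": {
         "agg2": {
            "terms": {"field": "<fieldname>"},
            "aggs": {
               "<metric_agg_name>": { "stats": { "field": "<fieldname>" } }
            }
         }
      }
   },
   "<bucket_selector_name>": {
      "bucket_selector_ext": {
         "buckets_path": {
           "metric_value": "<metric_agg_name>.<metric_name>"
         },
         "script": {
           "source": "params.metric_value >= 10.0"
         },
         "parent_bucket_path": "agg1>agg2"
      }
   }
}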

Some usage examples -

  • Simple case -
"aggs": {
   "<parent_agg_name>": {
      "<child_multi_bucket_agg_name>": { "terms": {"field": "<fieldname>"}}},
      "aggs": {
         "<metric_agg_name>": { "stats": { "field": "<fieldname>" } }
      }
   },
   "<bucket_selector_name>": {
      "bucket_selector_ext": {
         "buckets_path": {
           "metric_value": "<metric_agg_name>.<metric_name>"
         },
         "script": {
           "source": "params.metric_value >= 10.0"
         },
         "parent_bucket_path": "<parent_agg_name>"
      }
   }
}
  • Multiple bucket selectors -
"aggs": {
   "<parent_agg_name>": {
      "<child_multi_bucket_agg_name>": { "terms": {"field": "<fieldname>"}}},
      "aggs": {
         "<metric_agg_name>": { "stats": { "field": "<fieldname>" } }
      }
   },
   "<bucket_selector_name_1>": {
      "bucket_selector_ext": {
         "buckets_path": {
           "metric_value": "<metric_agg_name>.<metric_name>"
         },
         "script": {
           "source": "params.metric_value >= 10.0"
         },
         "parent_bucket_path": "<parent_agg_name>"
      }
   },
   "<bucket_selector_name_2>": {
      "bucket_selector_ext": {
         "buckets_path": {
           "metric_value": "<metric_agg_name>.<metric_name>"
         },
         "script": {
           "source": "params.metric_value >= 10.0"
         },
         "parent_bucket_path": "<parent_agg_name>"
      }
   }
}
  • Key filters -
"aggs": {
   "<parent_agg_name>": {
      "<child_multi_bucket_agg_name>": { "terms": {"field": "<fieldname>"}}},
      "aggs": {
         "<metric_agg_name>": { "stats": { "field": "<fieldname>" } }
      }
   },
   "<bucket_selector_name>": {
      "bucket_selector_ext": {
         "buckets_path": {
           "metric_value": "<metric_agg_name>.<metric_name>"
         },
         "script": {
           "source": "params.metric_value >= 10.0"
         },
         "parent_bucket_path": "<parent_agg_name>",
         "filter": {
           "include": ["key1", "key2"]
         }
      }
   }
}

For regex, refer to the Lucene regular expression syntax -

"aggs": {
   "<parent_agg_name>": {
      "<child_multi_bucket_agg_name>": { "terms": {"field": "<fieldname>"}}},
      "aggs": {
         "<metric_agg_name>": { "stats": { "field": "<fieldname>" } }
      }
   },
   "<bucket_selector_name_1>": {
      "bucket_selector_ext": {
         "buckets_path": {
           "metric_value": "<metric_agg_name>.<metric_name>"
         },
         "script": {
           "source": "params.metric_value >= 10.0"
         },
         "parent_bucket_path": "<parent_agg_name>",
         "filter": {
           "include": "key_prefix*"
         }
      }
   }
}
  • Composite aggregation -
"aggs": {
   "<parent_agg_name>": {
      "composite": {
         "sources": [
            {"<source_1>": { "terms": {"field": "<field_1>"}}},
            {"<source_2>": { "terms": {"field": "<field_2>" }}}  
         ]
      },      
      "aggs": {
         "<metric_agg_name>": { "stats": { "field": "<fieldname>" } }
      }
   },
   "<bucket_selector_name_1>": {
      "bucket_selector_ext": {
         "buckets_path": {
           "metric_value": "<metric_agg_name>.<metric_name>"
         },
         "script": {
           "source": "params.metric_value >= 10.0"
         },
         "parent_bucket_path": "<parent_agg_name>",
         "composite_agg_filter": {
            "<source_1>" : {
               "include": ["<include_key_1>"]
            },
            "<source_2>" : {
               "include": "@"
            }
         }
      }
   }
}
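
The key filter also accepts an exclude list (a sketch assuming the same semantics as the terms aggregation exclude filtering mentioned above; only the filter object changes relative to the key filter example) -

"filter": {
  "exclude": ["key1", "key2"]
}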

By making a contribution to this project, I certify that:

(a) The contribution was created in whole or in part by me and I
have the right to submit it under the open source license
indicated in the file; or

(b) The contribution is based upon previous work that, to the best
of my knowledge, is covered under an appropriate open source
license and I have the right under that license to submit that
work with modifications, whether created in whole or in part
by me, under the same open source license (unless I am
permitted to submit under a different license), as indicated
in the file; or

(c) The contribution was provided directly to me by some other
person who certified (a), (b) or (c) and I have not modified
it.

(d) I understand and agree that this project and the contribution
are public and that a record of the contribution (including all
personal information I submit with it, including my sign-off) is
maintained indefinitely and may be redistributed consistent with
this project or the open source license(s) involved.

Contributor

@qreshi left a comment

I left some comments but the main logic looks good to me. You can merge this in and address the comments in the follow-up PRs if you prefer so you're not blocked on sending those out.

Comment on lines 56 to 60
val mapSize: Int = sin.readVInt()
bucketsPathsMap = java.util.HashMap(mapSize)
for (i in 0 until mapSize) {
    bucketsPathsMap[sin.readString()] = sin.readString()
}

This can alternatively be replaced with bucketsPathsMap = sin.readMap() as Map<String, String>

Comment on lines 73 to 77
out.writeVInt(bucketsPathsMap.size)
for ((key, value) in bucketsPathsMap) {
    out.writeString(key)
    out.writeString(value)
}

Similarly, this can be replaced with out.writeMap(bucketsPathsMap as Map<String, String>)
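
For reference, a minimal sketch of the suggested readMap()/writeMap() round trip (the helper function names here are illustrative, not part of the PR):

import org.elasticsearch.common.io.stream.StreamInput
import org.elasticsearch.common.io.stream.StreamOutput

// Read bucketsPathsMap in one call; the unchecked cast mirrors the existing
// Map<String, String> type of bucketsPathsMap.
@Suppress("UNCHECKED_CAST")
fun readBucketsPathsMap(sin: StreamInput): Map<String, String> =
    sin.readMap() as Map<String, String>

// Write bucketsPathsMap in one call instead of a manual size + key/value loop.
fun writeBucketsPathsMap(out: StreamOutput, bucketsPathsMap: Map<String, String>) {
    out.writeMap(bucketsPathsMap)
}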


@Throws(IOException::class)
public override fun internalXContent(builder: XContentBuilder, params: Params): XContentBuilder {
    builder.field(PipelineAggregator.Parser.BUCKETS_PATH.preferredName, bucketsPathsMap as Map<String, Any>?)

NP: Builder calls can be chained to reduce text.

Ex.

builder.field()
    .field()
    .field()

private val PARENT_BUCKET_PATH = ParseField("parent_bucket_path")

@Throws(IOException::class)
fun parse(reducerName: String, parser: XContentParser): BucketSelectorExtAggregationBuilder {

To clean this up a bit and be more consistent with our other parse functions, I think this can be simplified to assume that parse is being called on the start_object of bucket_selector_ext. This way, we can fetch the field name and then the next token should be the contents of the field. Then within the single when, we can cover the different formats of the field being parsed.

Ex.

fun parse(reducerName: String, xcp: XContentParser): BucketSelectorExtAggregationBuilder {
    var bucketsPathsMap: MutableMap<String, String>? = null
    var gapPolicy: GapPolicy? = null
    var script: Script? = null
    var parentBucketPath: String? = null
    var filter: BucketSelectorExtFilter? = null

    ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
    while (xcp.nextToken() != Token.END_OBJECT) {
        val fieldName = xcp.currentName()
        xcp.nextToken()

        when (fieldName) {
            PipelineAggregator.Parser.BUCKETS_PATH.preferredName -> {
                if (xcp.currentToken() == Token.START_OBJECT) {
                   ...
                } else if (xcp.currentToken() == Token.START_ARRAY) {
                   while (xcp.nextToken() != Token.END_ARRAY) {
                      ...
                   }
                } else {
                   ...
                }
            }
            PipelineAggregator.Parser.GAP_POLICY.preferredName -> { ... }
            Script.SCRIPT_PARSE_FIELD.preferredName -> { ... }
            PARENT_BUCKET_PATH.preferredName -> { ... }
            else -> { ... }
        }
    }

    ...
}

constructor(sin: StreamInput) : super(sin.readString(), null, null) {
    script = Script(sin)
    gapPolicy = GapPolicy.readFrom(sin)
    bucketsPathsMap = sin.readGenericValue() as Map<String, String>

This can be changed to sin.readMap() to be a little more explicit.

import java.util.function.Consumer
import java.util.function.Function

class BucketSelectorExtAggregatorTestsIT : AggregatorTestCase() {

NP: We can probably just call these BucketSelectorExtAggregatorTests (same for BucketSelectExtAggregationBuilderTestsIT) since they're more unit tests. The IT tests in this package typically extend ODFE/ESRestTestCase() and make the actual API calls on the test cluster.

@rishabhmaurya rishabhmaurya merged commit 3bc057d into opendistro-for-elasticsearch:doc-level-alerting-dev May 10, 2021