
[ML] adding support for composite aggs in anomaly detection #69970

Merged: 27 commits into elastic:master, Mar 30, 2021

Conversation

@benwtrent (Member) commented Mar 4, 2021

This commit allows for composite aggregations in datafeeds.

Composite aggs provide a much better solution for having influencers, partitions, etc. on high-volume data. Instead of worrying about long scrolls in the datafeed, the calculation is distributed across the cluster via the aggregations.

The restrictions for this support are as follows (a builder-level sketch that satisfies them follows the list):

  • The composite aggregation must have EXACTLY one date_histogram source.
  • The sub-aggs of the composite aggregation must include a max aggregation on the SAME time field as the aforementioned date_histogram source.
  • The composite agg must be the ONLY top-level agg, and it cannot have a composite or date_histogram sub-agg.
  • If using a date_histogram to bucket time, it cannot have a composite sub-agg.
  • The top-level composite agg cannot have a sibling pipeline agg. Pipeline aggregations are supported as a sub-agg (thus a pipeline agg INSIDE the bucket).
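
For illustration, here is a minimal Java sketch (not code from this PR) of a composite aggregation that satisfies these restrictions, built with the standard Elasticsearch aggregation builders; the names buckets, time, and timestamp simply mirror the JSON example further down.

import java.util.List;

import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.composite.DateHistogramValuesSourceBuilder;
import org.elasticsearch.search.aggregations.bucket.composite.TermsValuesSourceBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;

class CompositeDatafeedAggSketch {
    static CompositeAggregationBuilder buildAgg() {
        return new CompositeAggregationBuilder(
            "buckets",
            List.of(
                // Exactly one date_histogram source, left in its default (ascending) order.
                new DateHistogramValuesSourceBuilder("time")
                    .field("timestamp")
                    .fixedInterval(DateHistogramInterval.minutes(15)),
                // Terms sources for the partition/influencer fields.
                new TermsValuesSourceBuilder("response.keyword").field("response.keyword"),
                new TermsValuesSourceBuilder("clientip").field("clientip")))
            // size is what controls how much work each search does.
            .size(10_000)
            // The required max sub-aggregation on the same time field as the date_histogram source.
            .subAggregation(AggregationBuilders.max("timestamp").field("timestamp"));
    }
}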

Some key user interaction differences:

  • Speed and the resources used by the cluster should be controlled by the size parameter in the composite aggregation. Previously, we said that if you are using aggs, you should use a specific chunking_config. With composite, that is not necessary.
  • Users really shouldn't use nested terms aggs any longer. While this is still a "valid" configuration and MAY be desirable for some users (only wanting the top 10 of certain terms), typically when users want influencers, partition fields, etc., they want the ENTIRE population. Previously, this really wasn't possible with aggs; with composite it is.
  • I cannot really think of a typical use case that SHOULD ever need a multi-bucket aggregation that is NOT supported by composite.

Example

Here is a job that was traditionally restricted to using a scroll datafeed but can now be handled with a composite agg:
Job:

{
  "custom_settings": {
    "created_by": "ml-module-sample",
    "custom_urls": [
      {
        "url_name": "Raw data",
        "url_value": """discover#/?_g=(time:(from:'$earliest$',mode:absolute,to:'$latest$'))&_a=(index:'90943e30-9a47-11e8-b64d-95841ca0b247',query:(language:kuery,query:'response.keyword:"$response.keyword$"'),sort:!('@timestamp',desc))"""
      },
      {
        "url_name": "Data dashboard",
        "url_value": "dashboards#/view/edf84fe0-e1a0-11e7-b6d5-4dc382ef7f5b?_g=(filters:!(),time:(from:'$earliest$',mode:absolute,to:'$latest$'))&_a=(filters:!(('$state':(store:appState),meta:(alias:!n,disabled:!f,index:'90943e30-9a47-11e8-b64d-95841ca0b247',key:response.keyword,negate:!f,params:(query:'$response.keyword$'),type:phrase,value:'$response.keyword$'),query:(match:(response.keyword:(query:'$response.keyword$',type:phrase))))),query:(language:kuery,query:''))"
      }
    ]
  },
  "analysis_config": {
    "bucket_span": "1h",
    "detectors": [
      {
        "detector_description": "Event rate by response code",
        "function": "count",
        "partition_field_name": "response.keyword",
        "detector_index": 0
      }
    ],
    "influencers": [
      "clientip",
      "response.keyword"
    ],
    "summary_count_field_name": "doc_count"
  },
  "analysis_limits": {
    "model_memory_limit": "11mb"
  },
  "model_plot_config": {
    "enabled": false,
    "annotations_enabled": true
  },
  "data_description": {
    "time_field": "timestamp",
    "time_format": "epoch_ms"
  },
 "results_index_name": "custom-simple-response_code_rates"
}

Datafeed:

{
    "job_id": "agged-response_code_rates",
    "query": {
      "bool": {
        "filter": [
          {
            "term": {
              "event.dataset": "sample_web_logs"
            }
          }
        ]
      }
    },
    "indices": [
      "kibana_sample_data_logs"
    ],
    "delayed_data_check_config": {
      "enabled": true
    },
    "aggregations": {
      "buckets": {
        "composite": {
          "size": 10000,
          "sources": [
            {
              "time": {
                "date_histogram": {
                  "field": "timestamp",
                  "fixed_interval": "15m"
                }
              }
            },
            {
              "response.keyword": {
                "terms": {
                  "field": "response.keyword"
                }
              }
            },
            {
              "clientip": {
                "terms": {
                  "field": "clientip"
                }
              }
            }
          ]
        },
        "aggs": {
          "timestamp": {
            "max": {
              "field": "timestamp"
            }
          }
        }
      }
    }
  }

All the links, results, etc. are the same as if this was done via a regular scroll.

Even the UI works, since the internal aggregations are simple enough (just a count). That is not always the case: aggregations are flexible enough that the UI can have trouble recreating the underlying visuals. In this particular case it works fine because there is no internal terms aggregation (only the ones in the composite definition).

[Two screenshots of the resulting job in the ML UI]

The UI should probably validate the composite agg definition to make sure it has only date_histogram and terms sources. Support for filtering by the other composite sources shouldn't be too difficult to add over time. (Kibana issue)

@elasticmachine added the Team:ML (Meta label for the ML team) label Mar 4, 2021

@elasticmachine (Collaborator) commented:

Pinging @elastic/ml-core (Team:ML)

@benwtrent (Member, Author) commented on the docs diff:

    TIP: If you use a terms aggregation and the cardinality of a term is high, the
    aggregation might not be effective and you might want to just use the default
    search and scroll behavior.
    TIP: If you use a terms aggregation and the cardinality of a term is high, you

@szabosteve Mind reviewing all these doc updates? It's pretty encompassing.

@szabosteve (Contributor) left a comment:

Thanks for amending the docs! I left a couple of minor comments.

Review comment (Contributor) on the docs diff:

    TIP: If you use a terms aggregation and the cardinality of a term is high, the
    aggregation might not be effective and you might want to just use the default
    search and scroll behavior.
    TIP: If you use a terms aggregation and the cardinality of a term is high, you

Suggested change:
- TIP: If you use a terms aggregation and the cardinality of a term is high, you
+ TIP: If you use a terms aggregation and the cardinality of a term is high,

Review comment (Contributor) on the docs diff:

    aggregation might not be effective and you might want to just use the default
    search and scroll behavior.
    TIP: If you use a terms aggregation and the cardinality of a term is high, you
    should use composite aggregations.

Suggested change:
- should use composite aggregations.
+ use composite aggregations instead.

Review comment (Contributor) on the docs diff:

    of each bucket is the time of the last record in the bucket.

    For `composite` aggregation support, there must be exactly one `date_histogram` value
    source. Additionally, that value source must NOT be sorted in descending order. Additional

Suggested change:
- source. Additionally, that value source must NOT be sorted in descending order. Additional
+ source. That value source must not be sorted in descending order. Additional

Review comment (Contributor) on the docs diff:

    `responsetime`.

    Your {dfeed} can contain multiple aggregations, but only the ones with names
    TIP: If you are utilizing a `term` aggregation to gather influencer or partition
    detector field information, consider using a `composite` aggregation. It will perform

Suggested change:
- detector field information, consider using a `composite` aggregation. It will perform
+ detector field information, consider using a `composite` aggregation. It performs

Review comment (Contributor) on the docs diff:

    Your {dfeed} can contain multiple aggregations, but only the ones with names
    TIP: If you are utilizing a `term` aggregation to gather influencer or partition
    detector field information, consider using a `composite` aggregation. It will perform
    better than a `date_histogram` with a nested `term` aggregation and will also include

Suggested change:
- better than a `date_histogram` with a nested `term` aggregation and will also include
+ better than a `date_histogram` with a nested `term` aggregation and also includes

Review comment (Contributor) on the docs diff:

    aggregation `term` source with the name `airline` works. Note its name
    is the same as the field.
    <4> The required `max` aggregation whose name is the time field in the
    job analysis config

Suggested change:
- job analysis config
+ job analysis config.


Review comment (Contributor) on the docs diff:

    When using a `date_histogram` aggregation to bucket by time

Suggested change:
- When using a `date_histogram` aggregation to bucket by time
+ When using a `date_histogram` aggregation to bucket by time:

Review comment (Contributor) on the docs diff:

    @@ -282,26 +379,64 @@ When you define an aggregation in a {dfeed}, it must have the following form:
    ----------------------------------
    // NOTCONSOLE

    When using a `composite` aggregation

Suggested change:
- When using a `composite` aggregation
+ When using a `composite` aggregation:

Review comment (Contributor) on the docs diff:

    aggregation. For more information, see
    {ref}/search-aggregations-bucket-datehistogram-aggregation.html[Date histogram aggregation].
    sub-aggregation that is a `date_histogram`, the top level aggregation is the
    required `date_histogram`, or the top leve aggregation is the required `composite`.

Suggested change:
- required `date_histogram`, or the top leve aggregation is the required `composite`.
+ required `date_histogram`, or the top level aggregation is the required `composite`.


Review comment (Contributor) on the docs diff:

    You can optionally specify a terms aggregation, which creates buckets for
    different values of a field.

    TIP: Instead of nesting a `term` aggregation, try using `composite` aggs.

Suggested change:
- TIP: Instead of nesting a `term` aggregation, try using `composite` aggs.
+ TIP: Instead of nesting a `term` aggregation, use `composite` aggs.

@benwtrent benwtrent requested a review from szabosteve March 4, 2021 15:59
@szabosteve (Contributor) left a comment:

Docs LGTM! 👍

@dimitris-athanasiou (Contributor) commented:

The datafeed config does not seem to match the job config in the description.

@benwtrent (Member, Author) replied:

@dimitris-athanasiou updated :). I copied the wrong one from my kibana console!

@dimitris-athanasiou (Contributor) left a comment:

Very very cool! Just a few minor things.

@benwtrent (Member, Author) commented:

OK, doing some additional testing: we are running into an issue with requiring composite aggs to use the max-timestamp method for keeping buckets sorted. When the parent agg was ONLY a date_histogram, this was not really an issue. But composite aggs order buckets like this:

[date_histogram, terms1, terms2....]

It is possible that bucket [date_histogram_bucket_1, terms_a, ...] has an EARLIER max timestamp than [date_histogram_bucket_1, terms_b, ....]. This means the timestamped data is technically sent out of order to the native process.

This can result in many THOUSANDS of out of order records and cause processing to slow to a crawl as the model has to constantly reorder buckets as they come in.

This needs to be thought about further before merging this PR.

How do we handle bucket ordering in composite aggs while not risking the model seeing the data twice?
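
To make the ordering problem concrete, here is a hypothetical Java sketch (illustration only, not necessarily the approach this PR lands on): buffer composite buckets by their date_histogram key and flush a key only once the response has paged past it, so the native process sees records in time order without ever seeing a bucket twice.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

class CompositeBucketBuffer {
    // Documents grouped by the date_histogram key of the composite bucket they came from.
    private final TreeMap<Long, List<Map<String, Object>>> docsByBucketTimestamp = new TreeMap<>();

    void add(long dateHistogramKey, Map<String, Object> doc) {
        docsByBucketTimestamp.computeIfAbsent(dateHistogramKey, k -> new ArrayList<>()).add(doc);
    }

    // Return (and forget) every buffered bucket strictly earlier than the key currently being paged,
    // since later composite pages can still add documents to the current key.
    List<Map<String, Object>> flushBefore(long currentDateHistogramKey) {
        List<Map<String, Object>> ready = new ArrayList<>();
        SortedMap<Long, List<Map<String, Object>>> earlier = docsByBucketTimestamp.headMap(currentDateHistogramKey);
        earlier.values().forEach(ready::addAll);
        earlier.clear();
        return ready;
    }
}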

Review comment (Contributor) on:

    Iterator<Map.Entry<Long, List<Map<String, Object>>>> iterator = docsByBucketTimestamp.entrySet().iterator();
    while (iterator.hasNext()) {
        Map.Entry<Long, List<Map<String, Object>>> entry = iterator.next();
        if (shouldCancel.test(entry.getKey())) {

OK, I thought about this and the discrepancy with the other version of writeDocs. I believe we can just keep the new version and use it where we used the old version as well.

Here is a bit of the story here.

The first implementation of allowing datafeeds with aggs to cancel added the idea of looking at how many key-value pairs we wrote. At that point, the implementation was not checking whether we had written the entire bucket, so it was possible to stop the datafeed halfway through a bucket. This was fixed later, when the current implementation was added.

The current implementation collects data in buckets. We then write whole buckets and only check to see if we should cancel after a whole bucket was written. We still look for whether we reached the key-value pair batch size of 1000. But I think this is completely unnecessary now.

The point of checking key-value pairs was to create a good moment to check whether we should cancel without having to check after each record. But checking after each histogram bucket is good enough. I hope this makes sense.
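
A rough sketch of the per-bucket cancellation check being described (hypothetical shape, not the PR's actual writeDocs; writeRecord stands in for streaming one record to the native process):

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Predicate;

class BucketWriter {
    // Write whole timestamp buckets and only consult the cancellation predicate on bucket
    // boundaries, so a bucket is never half-written and unwritten buckets stay buffered.
    void writeDocs(Map<Long, List<Map<String, Object>>> docsByBucketTimestamp,
                   Predicate<Long> shouldCancel,
                   Consumer<Map<String, Object>> writeRecord) {
        Iterator<Map.Entry<Long, List<Map<String, Object>>>> iterator =
            docsByBucketTimestamp.entrySet().iterator();
        while (iterator.hasNext()) {
            Map.Entry<Long, List<Map<String, Object>>> entry = iterator.next();
            if (shouldCancel.test(entry.getKey())) {
                break;
            }
            entry.getValue().forEach(writeRecord);
            iterator.remove();
        }
    }
}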

Contributor (follow-up):

Having said that, the other benefit of splitting is to contain the size of the output stream. But I guess this might be unnecessary as it is bound by the size of the search response, right? If the search response already fits in memory, then the output stream we create shouldn't be a problem.

@benwtrent (Member, Author) commented:

run elasticsearch-ci/2

@dimitris-athanasiou (Contributor) left a comment:

Looks good! Just a couple of minor comments and we're set to go.

Review comment (Contributor) on:

            return (DateHistogramValuesSourceBuilder) valuesSourceBuilder;
        }
    }
    return null;

It seems that every caller of this assumes it doesn't return null. I think this is because at this point we have checked there is a histogram aggregation specified. I wonder if we should just throw an IllegalStateException here instead of returning null and remove the assertions in callers.
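
A sketch of that suggestion (the method name and surrounding class are assumptions, not the PR's exact code): fail fast instead of returning null, since callers only get here after validation has confirmed a date_histogram source exists.

import java.util.List;

import org.elasticsearch.search.aggregations.bucket.composite.CompositeValuesSourceBuilder;
import org.elasticsearch.search.aggregations.bucket.composite.DateHistogramValuesSourceBuilder;

class CompositeAggHelper {
    static DateHistogramValuesSourceBuilder getDateHistogramValuesSource(List<CompositeValuesSourceBuilder<?>> sources) {
        for (CompositeValuesSourceBuilder<?> valuesSourceBuilder : sources) {
            if (valuesSourceBuilder instanceof DateHistogramValuesSourceBuilder) {
                return (DateHistogramValuesSourceBuilder) valuesSourceBuilder;
            }
        }
        // Earlier validation guarantees a date_histogram source, so reaching this point is a bug.
        throw new IllegalStateException("composite aggregation does not have a date_histogram value source");
    }
}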

Review comment (Contributor) on:

    Collections.singletonList(indexName))
        .setParsedAggregations(aggs)
        .setFrequency(TimeValue.timeValueHours(1))
        .setChunkingConfig(ChunkingConfig.newManual(TimeValue.timeValueHours(1)))

Why are we setting manual chunking of 1 hour and later update it to auto? It might be something worth capturing in a comment in the test.

Review comment (Contributor) on:

    .execute(ActionListener.wrap(
        _unused -> listener.onResponse(finished),
        ex -> {
            logger.info(

Should this be an error?

@benwtrent (Member, Author) replied:

How about a warn? An error seems like some sort of system failure, and failing to refresh the job is not a horrific thing in this scenario.

Contributor replied:

warn is even more appropriate indeed
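
For reference, a tiny sketch of what the agreed log level looks like (the class, message, and jobId parameter are illustrative assumptions, not the PR's code):

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;

class JobRefreshLogging {
    private static final Logger logger = LogManager.getLogger(JobRefreshLogging.class);

    // Failing to refresh the job here is not a system failure, so log at warn rather than error.
    void onRefreshFailure(String jobId, Exception ex) {
        logger.warn(new ParameterizedMessage("[{}] failed to refresh after datafeed change", jobId), ex);
    }
}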

@dimitris-athanasiou (Contributor) left a comment:

LGTM

@benwtrent benwtrent merged commit c8415a7 into elastic:master Mar 30, 2021
@benwtrent benwtrent deleted the feature/ml-dafafeed-composite-aggs branch March 30, 2021 12:25
benwtrent added a commit to benwtrent/elasticsearch that referenced this pull request Mar 30, 2021
…69970)

benwtrent added a commit that referenced this pull request Mar 30, 2021
…9970) (#71052)

* [ML] adding support for composite aggs in anomaly detection (#69970)

Labels: >enhancement, :ml Machine learning, Team:ML (Meta label for the ML team), v7.13.0, v8.0.0-alpha1

7 participants