
ML: Add support for single bucket aggs in Datafeeds #37544

Merged
47 changes: 47 additions & 0 deletions docs/reference/ml/aggregations.asciidoc
@@ -145,6 +145,53 @@ pipeline aggregation to find the first order derivative of the counter
----------------------------------
// NOTCONSOLE

{dfeeds-cap} support not only multi-bucket aggregations, but also single bucket aggregations.
The following shows two `filter` aggregations, each counting the number of entries in the
`error` field for a different server.

[source,js]
----------------------------------
{
  "job_id":"servers-unique-errors",
  "indices": ["logs-*"],
  "aggregations": {
    "buckets": {
      "date_histogram": {
        "field": "time",
        "interval": "360s",
        "time_zone": "UTC"
      },
      "aggregations": {
        "time": {
          "max": {"field": "time"}
        },
        "server1": {
          "filter": {"term": {"source": "server-name-1"}},
          "aggregations": {
            "server1_error_count": {
              "value_count": {
                "field": "error"
              }
            }
          }
        },
        "server2": {
          "filter": {"term": {"source": "server-name-2"}},
          "aggregations": {
            "server2_error_count": {
              "value_count": {
                "field": "error"
              }
            }
          }
        }
      }
    }
  }
}
----------------------------------
// NOTCONSOLE

When you define an aggregation in a {dfeed}, it must have the following form:

[source,js]
@@ -894,6 +894,44 @@ public void testLookbackWithoutPermissionsAndRollup() throws Exception {
"action [indices:admin/xpack/rollup/search] is unauthorized for user [ml_admin_plus_data]\""));
}

    public void testLookbackWithSingleBucketAgg() throws Exception {
        String jobId = "aggs-date-histogram-with-single-bucket-agg-job";
        Request createJobRequest = new Request("PUT", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId);
        createJobRequest.setJsonEntity("{\n"
            + " \"description\": \"Aggs job\",\n"
            + " \"analysis_config\": {\n"
            + " \"bucket_span\": \"3600s\",\n"
            + " \"summary_count_field_name\": \"doc_count\",\n"
            + " \"detectors\": [\n"
            + " {\n"
            + " \"function\": \"mean\",\n"
            + " \"field_name\": \"responsetime\""
            + " }\n"
            + " ]\n"
            + " },\n"
            + " \"data_description\": {\"time_field\": \"time stamp\"}\n"
            + "}");
        client().performRequest(createJobRequest);

        String datafeedId = "datafeed-" + jobId;
        String aggregations = "{\"time stamp\":{\"date_histogram\":{\"field\":\"time stamp\",\"interval\":\"1h\"},"
            + "\"aggregations\":{"
            + "\"time stamp\":{\"max\":{\"field\":\"time stamp\"}},"
            + "\"airlineFilter\":{\"filter\":{\"term\": {\"airline\":\"AAA\"}},"
            + " \"aggregations\":{\"responsetime\":{\"avg\":{\"field\":\"responsetime\"}}}}}}}";
        new DatafeedBuilder(datafeedId, jobId, "airline-data-aggs", "response").setAggregations(aggregations).build();
        openJob(client(), jobId);

        startDatafeedAndWaitUntilStopped(datafeedId);
        waitUntilJobIsClosed(jobId);
        Response jobStatsResponse = client().performRequest(new Request("GET",
            MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats"));
        String jobStatsResponseAsString = EntityUtils.toString(jobStatsResponse.getEntity());
        assertThat(jobStatsResponseAsString, containsString("\"input_record_count\":2"));
        assertThat(jobStatsResponseAsString, containsString("\"processed_record_count\":2"));
        assertThat(jobStatsResponseAsString, containsString("\"missing_field_count\":0"));
    }

    public void testRealtime() throws Exception {
        String jobId = "job-realtime-1";
        createJob(jobId, "airline");
@@ -13,6 +13,7 @@
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregation;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.metrics.Max;
import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation;
@@ -34,6 +35,7 @@
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.stream.Collectors;

/**
 * Processes {@link Aggregation} objects and writes flat JSON documents for each leaf aggregation.
@@ -93,18 +95,39 @@ private void processAggs(long docCount, List<Aggregation> aggregations) throws I

        List<Aggregation> leafAggregations = new ArrayList<>();
        List<MultiBucketsAggregation> bucketAggregations = new ArrayList<>();
        List<SingleBucketAggregation> singleBucketAggregations = new ArrayList<>();

        // Sort into leaf and bucket aggregations.
        // The leaf aggregations will be processed first.
        for (Aggregation agg : aggregations) {
            if (agg instanceof MultiBucketsAggregation) {
                bucketAggregations.add((MultiBucketsAggregation) agg);
            } else if (agg instanceof SingleBucketAggregation) {
                // Skip a level down for single bucket aggs; if they have a sub-agg that is not
                // a bucketed agg, we should treat it like a leaf in this bucket.
                SingleBucketAggregation singleBucketAggregation = (SingleBucketAggregation) agg;
                for (Aggregation subAgg : singleBucketAggregation.getAggregations()) {
                    if (subAgg instanceof MultiBucketsAggregation || subAgg instanceof SingleBucketAggregation) {
Member: I wonder if the test should be `subAgg instanceof HasAggregations`

Member Author: @davidkyle I wish it could be :( but MultiBucketsAggregation does not implement that interface. Its Buckets class does though :(. No way to get to Buckets without first casting, which requires an instanceof check anyways.

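To make the exchange above concrete, here is a minimal sketch of the type shapes being described, using hypothetical stub interfaces rather than the real Elasticsearch classes: a single bucket agg exposes its sub-aggregations directly, while a multi-bucket agg only exposes them through its nested `Bucket` type, so no shared `HasAggregations` check can replace the two `instanceof` tests without casting first.

```java
import java.util.List;

// Hypothetical stubs for illustration; the real interfaces live in
// org.elasticsearch.search.aggregations and are more elaborate.
interface Aggregation {}

interface Aggregations extends Iterable<Aggregation> {}

interface HasAggregations {
    Aggregations getAggregations();
}

// A single bucket aggregation carries its sub-aggregations directly...
interface SingleBucketAggregation extends Aggregation, HasAggregations {
    long getDocCount();
}

// ...but a multi-bucket aggregation does not implement HasAggregations:
// only its nested Bucket type does, so the sub-aggregations are reachable
// only via getBuckets(), which requires an instanceof check and cast first.
interface MultiBucketsAggregation extends Aggregation {
    interface Bucket extends HasAggregations {
        long getDocCount();
    }

    List<? extends Bucket> getBuckets();
}
```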
                        singleBucketAggregations.add(singleBucketAggregation);
                    } else {
                        leafAggregations.add(subAgg);
                    }
                }
            } else {
                leafAggregations.add(agg);
            }
        }

        // If on the current level (indicated via bucketAggregations) or one of the next levels (singleBucketAggregations)
        // we have more than 1 `MultiBucketsAggregation`, we should error out.
        // We need to make the check in this way as each of the items in `singleBucketAggregations` is treated as a separate
        // branch in the recursive handling of this method.
        int bucketAggLevelCount = Math.max(bucketAggregations.size(), (int) singleBucketAggregations.stream()
            .flatMap(s -> asList(s.getAggregations()).stream())
            .filter(MultiBucketsAggregation.class::isInstance)
            .count());

        if (bucketAggLevelCount > 1) {
            throw new IllegalArgumentException("Multiple bucket aggregations at the same level are not supported");
        }

@@ -137,6 +160,18 @@ private void processAggs(long docCount, List<Aggregation> aggregations) throws I
}
}
}
        noMoreBucketsToProcess = singleBucketAggregations.isEmpty() && noMoreBucketsToProcess;
        // We support more than one `SingleBucketAggregation` at each level.
        // However, we only want to recurse with multi/single bucket aggs;
        // non-bucketed sub-aggregations were handled as leaf aggregations at this level.
        for (SingleBucketAggregation singleBucketAggregation : singleBucketAggregations) {
            processAggs(singleBucketAggregation.getDocCount(),
                asList(singleBucketAggregation.getAggregations())
                    .stream()
                    .filter(aggregation ->
                        (aggregation instanceof MultiBucketsAggregation || aggregation instanceof SingleBucketAggregation))
                    .collect(Collectors.toList()));
        }

Member: Nit: don't need the asList

Member Author: I do, processAggs takes a long, List<Aggregation> whereas getAggregations() returns an Aggregations type.
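As a companion sketch (reusing the hypothetical stubs above), this is the shape of conversion the author is defending: `Aggregations` is only an `Iterable`, so it has to be materialized into a `List<Aggregation>` before it can be handed to `processAggs(long, List<Aggregation>)`. The helper below is illustrative, not the actual `asList` implementation.

```java
import java.util.ArrayList;
import java.util.List;

final class AggregationLists {
    private AggregationLists() {}

    // Illustrative stand-in for the asList(...) call above: copies an
    // Aggregations (an Iterable<Aggregation> in the stub model) into the
    // List<Aggregation> that processAggs expects.
    static List<Aggregation> asList(Aggregations aggregations) {
        List<Aggregation> result = new ArrayList<>();
        aggregations.forEach(result::add);
        return result;
    }
}
```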

        // If there are no more bucket aggregations to process we've reached the end
        // and it's time to write the doc
@@ -7,6 +7,7 @@

import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregation;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
@@ -37,6 +38,14 @@ static Histogram.Bucket createHistogramBucket(long timestamp, long docCount, Lis
        return bucket;
    }

    static SingleBucketAggregation createSingleBucketAgg(String name, long docCount, List<Aggregation> subAggregations) {
        SingleBucketAggregation singleBucketAggregation = mock(SingleBucketAggregation.class);
        when(singleBucketAggregation.getName()).thenReturn(name);
        when(singleBucketAggregation.getDocCount()).thenReturn(docCount);
        when(singleBucketAggregation.getAggregations()).thenReturn(createAggs(subAggregations));
        return singleBucketAggregation;
    }

    static Histogram.Bucket createHistogramBucket(long timestamp, long docCount) {
        return createHistogramBucket(timestamp, docCount, Collections.emptyList());
    }
@@ -31,6 +31,7 @@
import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createHistogramBucket;
import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createMax;
import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createPercentiles;
import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createSingleBucketAgg;
import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createSingleValue;
import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createTerms;
import static org.hamcrest.Matchers.containsString;
@@ -439,6 +440,38 @@ public void testBucketsBeforeStartArePruned() throws IOException {
"{\"time\":4000,\"my_field\":4.0,\"doc_count\":14}"));
}

    public void testSingleBucketAgg() throws IOException {
        List<Histogram.Bucket> histogramBuckets = Arrays.asList(
            createHistogramBucket(1000L, 4, Arrays.asList(
                createMax("time", 1000),
                createSingleBucketAgg("agg1", 3, Collections.singletonList(createMax("field1", 5.0))),
                createSingleBucketAgg("agg2", 1, Collections.singletonList(createMax("field2", 3.0))))),
            createHistogramBucket(2000L, 7, Arrays.asList(
                createMax("time", 2000),
                createSingleBucketAgg("agg2", 3, Collections.singletonList(createMax("field2", 1.0))),
                createSingleBucketAgg("agg1", 4, Collections.singletonList(createMax("field1", 7.0))))));

        String json = aggToString(Sets.newHashSet("field1", "field2"), histogramBuckets);

        assertThat(json, equalTo("{\"time\":1000,\"field1\":5.0,\"field2\":3.0,\"doc_count\":4}" +
            " {\"time\":2000,\"field2\":1.0,\"field1\":7.0,\"doc_count\":7}"));
    }

    public void testSingleBucketAgg_failureWithSubMultiBucket() throws IOException {
        List<Histogram.Bucket> histogramBuckets = Collections.singletonList(
            createHistogramBucket(1000L, 4, Arrays.asList(
                createMax("time", 1000),
                createSingleBucketAgg("agg1", 3,
                    Arrays.asList(createHistogramAggregation("histo", Collections.emptyList()), createMax("field1", 5.0))),
                createSingleBucketAgg("agg2", 1,
                    Arrays.asList(createHistogramAggregation("histo", Collections.emptyList()), createMax("field1", 3.0))))));

        expectThrows(IllegalArgumentException.class,
            () -> aggToString(Sets.newHashSet("my_field"), histogramBuckets));
    }

    private String aggToString(Set<String> fields, Histogram.Bucket bucket) throws IOException {
        return aggToString(fields, Collections.singletonList(bucket));
    }