From 5640b979e61581b1abf1bbf4226c142e9f5d2730 Mon Sep 17 00:00:00 2001
From: Benjamin Trent
Date: Tue, 15 Jan 2019 16:45:45 -0600
Subject: [PATCH 1/3] ML: Adding support for SingleBucketAggs

---
 .../extractor/aggregation/AggregationToJsonProcessor.java | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessor.java
index c934653a6268e..b0d5368282076 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessor.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessor.java
@@ -13,6 +13,7 @@
 import org.elasticsearch.search.aggregations.Aggregation;
 import org.elasticsearch.search.aggregations.Aggregations;
 import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
+import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregation;
 import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
 import org.elasticsearch.search.aggregations.metrics.Max;
 import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation;
@@ -93,12 +94,15 @@ private void processAggs(long docCount, List<Aggregation> aggregations) throws I
 
         List<Aggregation> leafAggregations = new ArrayList<>();
         List<MultiBucketsAggregation> bucketAggregations = new ArrayList<>();
+        List<SingleBucketAggregation> singleBucketAggregations = new ArrayList<>();
 
         // Sort into leaf and bucket aggregations.
         // The leaf aggregations will be processed first.
         for (Aggregation agg : aggregations) {
             if (agg instanceof MultiBucketsAggregation) {
                 bucketAggregations.add((MultiBucketsAggregation)agg);
+            } else if (agg instanceof SingleBucketAggregation){
+                singleBucketAggregations.add((SingleBucketAggregation)agg);
             } else {
                 leafAggregations.add(agg);
             }
@@ -137,6 +141,10 @@ private void processAggs(long docCount, List<Aggregation> aggregations) throws I
                 }
             }
         }
+        noMoreBucketsToProcess = singleBucketAggregations.isEmpty() && noMoreBucketsToProcess;
+        for (SingleBucketAggregation singleBucketAggregation : singleBucketAggregations) {
+            processAggs(singleBucketAggregation.getDocCount(), asList(singleBucketAggregation.getAggregations()));
+        }
 
         // If there are no more bucket aggregations to process we've reached the end
         // and it's time to write the doc

From e2380c5a63643c0fc8a1925f92cdd06cb4086bb3 Mon Sep 17 00:00:00 2001
From: Benjamin Trent
Date: Wed, 16 Jan 2019 10:51:47 -0600
Subject: [PATCH 2/3] ML: Adding single bucket agg support

---
 .../ml/integration/DatafeedJobsRestIT.java    | 38 +++++++++++++++++++
 .../AggregationToJsonProcessor.java           | 13 ++++++-
 2 files changed, 50 insertions(+), 1 deletion(-)

diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsRestIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsRestIT.java
index 2e69702381bcf..b794fee311805 100644
--- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsRestIT.java
+++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsRestIT.java
@@ -894,6 +894,44 @@ public void testLookbackWithoutPermissionsAndRollup() throws Exception {
             "action [indices:admin/xpack/rollup/search] is unauthorized for user [ml_admin_plus_data]\""));
     }
 
+    public void testLookbackWithSingleBucketAgg() throws Exception {
+        String jobId = "aggs-date-histogram-with-single-bucket-agg-job";
+        Request createJobRequest = new Request("PUT", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId);
+        createJobRequest.setJsonEntity("{\n"
+            + "  \"description\": \"Aggs job\",\n"
+            + "  \"analysis_config\": {\n"
+            + "    \"bucket_span\": \"3600s\",\n"
+            + "    \"summary_count_field_name\": \"doc_count\",\n"
+            + "    \"detectors\": [\n"
+            + "      {\n"
+            + "        \"function\": \"mean\",\n"
+            + "        \"field_name\": \"responsetime\""
+            + "      }\n"
+            + "    ]\n"
+            + "  },\n"
+            + "  \"data_description\": {\"time_field\": \"time stamp\"}\n"
+            + "}");
+        client().performRequest(createJobRequest);
+
+        String datafeedId = "datafeed-" + jobId;
+        String aggregations = "{\"time stamp\":{\"date_histogram\":{\"field\":\"time stamp\",\"interval\":\"1h\"}," +
+            "\"aggregations\":{" +
+            "\"time stamp\":{\"max\":{\"field\":\"time stamp\"}}," +
+            "\"airlineFilter\":{\"filter\":{\"term\": {\"airline\":\"AAA\"}}," +
+            "  \"aggregations\":{\"responsetime\":{\"avg\":{\"field\":\"responsetime\"}}}}}}}";
+        new DatafeedBuilder(datafeedId, jobId, "airline-data-aggs", "response").setAggregations(aggregations).build();
+        openJob(client(), jobId);
+
+        startDatafeedAndWaitUntilStopped(datafeedId);
+        waitUntilJobIsClosed(jobId);
+        Response jobStatsResponse = client().performRequest(new Request("GET",
+            MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats"));
+        String jobStatsResponseAsString = EntityUtils.toString(jobStatsResponse.getEntity());
+        assertThat(jobStatsResponseAsString, containsString("\"input_record_count\":2"));
+        assertThat(jobStatsResponseAsString, containsString("\"processed_record_count\":2"));
+        assertThat(jobStatsResponseAsString, containsString("\"missing_field_count\":0"));
+    }
+
     public void testRealtime() throws Exception {
         String jobId = "job-realtime-1";
         createJob(jobId, "airline");

diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessor.java
index b0d5368282076..d77b5d75aecf2 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessor.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessor.java
@@ -108,7 +108,16 @@ private void processAggs(long docCount, List<Aggregation> aggregations) throws I
             }
         }
 
-        if (bucketAggregations.size() > 1) {
+        // If on the current level (indicated via bucketAggregations) or on of the next levels (singleBucketAggregations)
+        // we have more than 1 `MultiBucketsAggregation`, we should error out.
+        // We need to make the check in this way as each of the items in `singleBucketAggregations` is treated as a separate branch
+        // in the recursive handling of this method.
+        int bucketAggLevelCount = Math.max(bucketAggregations.size(), (int)singleBucketAggregations.stream()
+            .flatMap(s -> asList(s.getAggregations()).stream())
+            .filter(MultiBucketsAggregation.class::isInstance)
+            .count());
+
+        if (bucketAggLevelCount > 1) {
             throw new IllegalArgumentException("Multiple bucket aggregations at the same level are not supported");
         }
 
@@ -142,6 +151,8 @@ private void processAggs(long docCount, List<Aggregation> aggregations) throws I
             }
         }
         noMoreBucketsToProcess = singleBucketAggregations.isEmpty() && noMoreBucketsToProcess;
+        // we support more than one `SingleBucketAggregation` at each level, each agg needs to be handled
+        // recursively.
         for (SingleBucketAggregation singleBucketAggregation : singleBucketAggregations) {
             processAggs(singleBucketAggregation.getDocCount(), asList(singleBucketAggregation.getAggregations()));
         }

From d7a95229da2d2fbbf45325694db507928928bafd Mon Sep 17 00:00:00 2001
From: Benjamin Trent
Date: Thu, 17 Jan 2019 14:14:16 -0600
Subject: [PATCH 3/3] Fixing minor bug in parsing aggs, adding tests

---
 docs/reference/ml/aggregations.asciidoc       | 47 +++++++++++++++++++
 .../AggregationToJsonProcessor.java           | 26 ++++++++--
 .../aggregation/AggregationTestUtils.java     |  9 ++++
 .../AggregationToJsonProcessorTests.java      | 33 +++++++++++++
 4 files changed, 110 insertions(+), 5 deletions(-)

diff --git a/docs/reference/ml/aggregations.asciidoc b/docs/reference/ml/aggregations.asciidoc
index 3f09022d17eaa..a50016807a714 100644
--- a/docs/reference/ml/aggregations.asciidoc
+++ b/docs/reference/ml/aggregations.asciidoc
@@ -145,6 +145,53 @@ pipeline aggregation to find the first order derivative of the counter
 ----------------------------------
 // NOTCONSOLE
 
+{dfeeds-cap} not only support multi-bucket aggregations, but also single bucket aggregations.
+The following shows two `filter` aggregations, each counting the number of entries in the
+`error` field for a different server.
+
+[source,js]
+----------------------------------
+{
+  "job_id":"servers-unique-errors",
+  "indices": ["logs-*"],
+  "aggregations": {
+    "buckets": {
+      "date_histogram": {
+        "field": "time",
+        "interval": "360s",
+        "time_zone": "UTC"
+      },
+      "aggregations": {
+        "time": {
+          "max": {"field": "time"}
+        },
+        "server1": {
+          "filter": {"term": {"source": "server-name-1"}},
+          "aggregations": {
+            "server1_error_count": {
+              "value_count": {
+                "field": "error"
+              }
+            }
+          }
+        },
+        "server2": {
+          "filter": {"term": {"source": "server-name-2"}},
+          "aggregations": {
+            "server2_error_count": {
+              "value_count": {
+                "field": "error"
+              }
+            }
+          }
+        }
+      }
+    }
+  }
+}
+----------------------------------
+// NOTCONSOLE
+
 When you define an aggregation in a {dfeed}, it must have the following form:
 
 [source,js]

diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessor.java
index d77b5d75aecf2..db8dea22675f2 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessor.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessor.java
@@ -35,6 +35,7 @@
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
+import java.util.stream.Collectors;
 
 /**
  * Processes {@link Aggregation} objects and writes flat JSON documents for each leaf aggregation.
@@ -102,13 +103,22 @@ private void processAggs(long docCount, List<Aggregation> aggregations) throws I
             if (agg instanceof MultiBucketsAggregation) {
                 bucketAggregations.add((MultiBucketsAggregation)agg);
             } else if (agg instanceof SingleBucketAggregation){
-                singleBucketAggregations.add((SingleBucketAggregation)agg);
+                // Skip a level down for single bucket aggs, if they have a sub-agg that is not
+                // a bucketed agg we should treat it like a leaf in this bucket
+                SingleBucketAggregation singleBucketAggregation = (SingleBucketAggregation)agg;
+                for (Aggregation subAgg : singleBucketAggregation.getAggregations()) {
+                    if (subAgg instanceof MultiBucketsAggregation || subAgg instanceof SingleBucketAggregation) {
+                        singleBucketAggregations.add(singleBucketAggregation);
+                    } else {
+                        leafAggregations.add(subAgg);
+                    }
+                }
             } else {
                 leafAggregations.add(agg);
             }
         }
 
-        // If on the current level (indicated via bucketAggregations) or on of the next levels (singleBucketAggregations)
+        // If on the current level (indicated via bucketAggregations) or one of the next levels (singleBucketAggregations)
         // we have more than 1 `MultiBucketsAggregation`, we should error out.
         // We need to make the check in this way as each of the items in `singleBucketAggregations` is treated as a separate branch
         // in the recursive handling of this method.
@@ -151,10 +161,16 @@ private void processAggs(long docCount, List<Aggregation> aggregations) throws I
             }
         }
         noMoreBucketsToProcess = singleBucketAggregations.isEmpty() && noMoreBucketsToProcess;
-        // we support more than one `SingleBucketAggregation` at each level, each agg needs to be handled
-        // recursively.
+        // We support more than one `SingleBucketAggregation` at each level.
+        // However, we only want to recurse with multi/single bucket aggs.
+        // Non-bucketed sub-aggregations were handled as leaf aggregations at this level.
         for (SingleBucketAggregation singleBucketAggregation : singleBucketAggregations) {
-            processAggs(singleBucketAggregation.getDocCount(), asList(singleBucketAggregation.getAggregations()));
+            processAggs(singleBucketAggregation.getDocCount(),
+                asList(singleBucketAggregation.getAggregations())
+                    .stream()
+                    .filter(
+                        aggregation -> (aggregation instanceof MultiBucketsAggregation || aggregation instanceof SingleBucketAggregation))
+                    .collect(Collectors.toList()));
         }
 
         // If there are no more bucket aggregations to process we've reached the end

diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationTestUtils.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationTestUtils.java
index 47d2eb828c6a4..38202eee0ff06 100644
--- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationTestUtils.java
+++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationTestUtils.java
@@ -7,6 +7,7 @@
 
 import org.elasticsearch.search.aggregations.Aggregation;
 import org.elasticsearch.search.aggregations.Aggregations;
+import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregation;
 import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
 import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
 import org.elasticsearch.search.aggregations.bucket.terms.Terms;
@@ -37,6 +38,14 @@ static Histogram.Bucket createHistogramBucket(long timestamp, long docCount, Lis
         return bucket;
     }
 
+    static SingleBucketAggregation createSingleBucketAgg(String name, long docCount, List<Aggregation> subAggregations) {
+        SingleBucketAggregation singleBucketAggregation = mock(SingleBucketAggregation.class);
+        when(singleBucketAggregation.getName()).thenReturn(name);
+        when(singleBucketAggregation.getDocCount()).thenReturn(docCount);
+        when(singleBucketAggregation.getAggregations()).thenReturn(createAggs(subAggregations));
+        return singleBucketAggregation;
+    }
+
     static Histogram.Bucket createHistogramBucket(long timestamp, long docCount) {
         return createHistogramBucket(timestamp, docCount, Collections.emptyList());
     }

diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessorTests.java
index bf283b5be519d..be79b461eeb18 100644
--- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessorTests.java
+++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessorTests.java
@@ -31,6 +31,7 @@
 import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createHistogramBucket;
 import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createMax;
 import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createPercentiles;
+import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createSingleBucketAgg;
 import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createSingleValue;
 import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createTerms;
 import static org.hamcrest.Matchers.containsString;
@@ -439,6 +440,38 @@ public void testBucketsBeforeStartArePruned() throws IOException {
             "{\"time\":4000,\"my_field\":4.0,\"doc_count\":14}"));
     }
 
+    public void testSingleBucketAgg() throws IOException {
+        List<Histogram.Bucket> histogramBuckets = Arrays.asList(
+            createHistogramBucket(1000L, 4, Arrays.asList(
+                createMax("time", 1000),
+                createSingleBucketAgg("agg1", 3, Collections.singletonList(createMax("field1", 5.0))),
+                createSingleBucketAgg("agg2", 1, Collections.singletonList(createMax("field2", 3.0))))),
+            createHistogramBucket(2000L, 7, Arrays.asList(
+                createMax("time", 2000),
+                createSingleBucketAgg("agg2", 3, Collections.singletonList(createMax("field2", 1.0))),
+                createSingleBucketAgg("agg1", 4, Collections.singletonList(createMax("field1", 7.0))))));
+
+        String json = aggToString(Sets.newHashSet("field1", "field2"), histogramBuckets);
+
+        assertThat(json, equalTo("{\"time\":1000,\"field1\":5.0,\"field2\":3.0,\"doc_count\":4}" +
+            " {\"time\":2000,\"field2\":1.0,\"field1\":7.0,\"doc_count\":7}"));
+    }
+
+    public void testSingleBucketAgg_failureWithSubMultiBucket() throws IOException {
+
+        List<Histogram.Bucket> histogramBuckets = Collections.singletonList(
+            createHistogramBucket(1000L, 4, Arrays.asList(
+                createMax("time", 1000),
+                createSingleBucketAgg("agg1", 3,
+                    Arrays.asList(createHistogramAggregation("histo", Collections.emptyList()), createMax("field1", 5.0))),
+                createSingleBucketAgg("agg2", 1,
+                    Arrays.asList(createHistogramAggregation("histo", Collections.emptyList()), createMax("field1", 3.0))))));
+
+
+        expectThrows(IllegalArgumentException.class,
+            () -> aggToString(Sets.newHashSet("my_field"), histogramBuckets));
+    }
+
     private String aggToString(Set<String> fields, Histogram.Bucket bucket) throws IOException {
         return aggToString(fields, Collections.singletonList(bucket));
     }
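
The rule the three patches converge on, in short: metric sub-aggregations of a single-bucket
aggregation (such as `filter`) are flattened into the enclosing bucket's document, only bucketed
sub-aggregations open a recursive branch, and the multiple-bucket-aggregation check has to look one
level ahead into the single-bucket siblings because each of them recurses separately. The following
is a minimal, self-contained sketch of that traversal rule, not the Elasticsearch implementation:
the `Agg`, `Leaf`, `SingleBucket`, and `MultiBucket` types are hypothetical stand-ins for the real
`Aggregation` hierarchy, and real multi-bucket processing (one document per bucket) is omitted.

[source,java]
----------------------------------
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class SingleBucketFlattenSketch {

    interface Agg { String name(); }

    /** A plain metric result, e.g. from a max or avg agg. */
    record Leaf(String name, double value) implements Agg {}

    /** A single-bucket agg such as filter: one doc count plus nested sub-aggs. */
    record SingleBucket(String name, long docCount, List<Agg> subAggs) implements Agg {}

    /** Stand-in for a multi-bucket agg; only the level check uses it here. */
    record MultiBucket(String name) implements Agg {}

    static void process(long docCount, List<Agg> aggs, Map<String, Double> doc) {
        // Patch 2's guard: count multi-bucket aggs on this level, and also one level
        // ahead inside the single-bucket siblings, because each single-bucket agg
        // becomes its own recursive branch and would otherwise pass a local check.
        long levelCount = Math.max(
            aggs.stream().filter(a -> a instanceof MultiBucket).count(),
            aggs.stream().filter(a -> a instanceof SingleBucket)
                .flatMap(a -> ((SingleBucket) a).subAggs().stream())
                .filter(MultiBucket.class::isInstance).count());
        if (levelCount > 1) {
            throw new IllegalArgumentException(
                "Multiple bucket aggregations at the same level are not supported");
        }

        List<SingleBucket> toRecurse = new ArrayList<>();
        for (Agg agg : aggs) {
            if (agg instanceof Leaf leaf) {
                doc.put(leaf.name(), leaf.value());
            } else if (agg instanceof SingleBucket sb) {
                // Patch 3's flattening: metric children of a single-bucket agg are
                // leaves of the *current* level; only bucketed children cause recursion.
                boolean hasBucketedChild = false;
                for (Agg sub : sb.subAggs()) {
                    if (sub instanceof Leaf subLeaf) {
                        doc.put(subLeaf.name(), subLeaf.value());
                    } else {
                        hasBucketedChild = true;
                    }
                }
                if (hasBucketedChild) {
                    toRecurse.add(sb);
                }
            }
            // Real multi-bucket handling (one document per bucket) is omitted here.
        }

        // Recurse per single-bucket branch, keeping only bucketed sub-aggs, as the
        // final version of processAggs does with its stream filter.
        for (SingleBucket sb : toRecurse) {
            process(sb.docCount(), sb.subAggs().stream()
                .filter(a -> !(a instanceof Leaf)).toList(), doc);
        }
    }

    public static void main(String[] args) {
        Map<String, Double> doc = new TreeMap<>();
        // Mirrors the docs example: two sibling filter aggs under one histogram bucket.
        process(4, List.of(
            new Leaf("time", 1000),
            new SingleBucket("server1", 3, List.of(new Leaf("server1_error_count", 5))),
            new SingleBucket("server2", 1, List.of(new Leaf("server2_error_count", 3)))), doc);
        System.out.println(doc); // {server1_error_count=5.0, server2_error_count=3.0, time=1000.0}
    }
}
----------------------------------

Fed the docs example's shape, the sketch prints a single flat document per bucket. Give each
sibling filter its own nested `date_histogram` instead and `levelCount` becomes 2, so the sketch
throws, which is the behavior `testSingleBucketAgg_failureWithSubMultiBucket` pins down.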