Skip to content

Commit

Permalink
Allow bucket paths to specify _count within a bucket (#85720)
Browse files Browse the repository at this point in the history
Users should be able to reference a particular metric (such as `_count`) within a specific bucket, selected by its key.

An example is `agg["bucket_foo"]._count`. 

This change now allows that.

closes: #76320
  • Loading branch information
benwtrent authored Apr 29, 2022
1 parent a2f7a82 commit c49b92e
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 40 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/85720.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 85720
summary: Allow bucket paths to specify `_count` within a bucket
area: Aggregations
type: bug
issues: []
9 changes: 9 additions & 0 deletions docs/reference/aggregations/pipeline.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,15 @@ PATH = <AGG_NAME><MULTIBUCKET_KEY>? (<AGG_SEPARATOR>, <AGG_NAME>
For example, the path `"my_bucket>my_stats.avg"` will path to the `avg` value in the `"my_stats"` metric, which is
contained in the `"my_bucket"` bucket aggregation.

Here are some more examples:
+
--
* `multi_bucket["foo"]>single_bucket>multi_metric.avg` will go to the `avg` metric in the `"multi_metric"` agg under the
single bucket `"single_bucket"` within the `"foo"` bucket of the `"multi_bucket"` multi-bucket aggregation.
* `agg1["foo"]._count` will get the `_count` metric for the `"foo"` bucket in the
multi-bucket aggregation `"agg1"`
--

Paths are relative from the position of the pipeline aggregation; they are not absolute paths, and the path cannot go back "up" the
aggregation tree. For example, this derivative is embedded inside a date_histogram and refers to a "sibling"
metric `"the_sum"`:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -89,7 +90,8 @@ public SortValue sortValue(AggregationPath.PathElement head, Iterator<Aggregatio
if (tail.hasNext()) {
return aggregation.sortValue(tail.next(), tail);
}
return aggregation.sortValue(head.key());
// We can sort by either the `[value]` or `.value`
return aggregation.sortValue(Optional.ofNullable(head.key()).orElse(head.metric()));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

/**
* A path that can be used to sort/order buckets (in some multi-bucket aggregations, e.g. terms &amp; histogram) based on
Expand Down Expand Up @@ -50,6 +51,16 @@
* {@code agg1>agg2>agg3.avg} - where agg1 and agg2 are both single-bucket aggs and agg3 is a multi-value metrics agg (eg stats,
* extended_stats, etc...). In this case, the order will be based on the avg value of {@code agg3}.
* </li>
* <li>
* {@code agg1["foo"]>agg2>agg3.avg} - where agg1 is a multi-bucket agg and the path expects a bucket "foo",
* agg2 is a single-bucket agg and agg3 is a multi-value metrics agg
* (eg stats, extended_stats, etc...).
* In this case, the order will be based on the avg value of {@code agg3}.
* </li>
* <li>
* {@code agg1["foo"]._count} - where agg1 is multi-bucket, and the path expects a bucket "foo".
* This would extract the doc_count for that specific bucket.
* </li>
* </ul>
*
*/
Expand All @@ -64,61 +75,80 @@ public static AggregationPath parse(String path) {
for (int i = 0; i < elements.length; i++) {
String element = elements[i];
if (i == elements.length - 1) {
int index = element.lastIndexOf('[');
if (index >= 0) {
if (index == 0 || index > element.length() - 3) {
int keyIndex = element.lastIndexOf('[');
int metricIndex = element.lastIndexOf('.');
if (keyIndex >= 0) {
if (keyIndex == 0 || keyIndex > element.length() - 3) {
throw new AggregationExecutionException("Invalid path element [" + element + "] in path [" + path + "]");
}
if (element.charAt(element.length() - 1) != ']') {
int endKeyIndex = element.lastIndexOf(']');
if (endKeyIndex < keyIndex) {
throw new AggregationExecutionException("Invalid path element [" + element + "] in path [" + path + "]");
}
tokens.add(new PathElement(element, element.substring(0, index), element.substring(index + 1, element.length() - 1)));
if (metricIndex < 0 && endKeyIndex != element.length() - 1) {
throw new AggregationExecutionException("Invalid path element [" + element + "] in path [" + path + "]");
}
tokens.add(
new PathElement(
element,
element.substring(0, keyIndex),
element.substring(keyIndex + 1, endKeyIndex),
// Aggs and metrics can have `.` in the name, so the last `.` only introduces a metric when it comes after the closing bracket
metricIndex < endKeyIndex ? null : element.substring(metricIndex + 1)
)
);
continue;
}
index = element.lastIndexOf('.');
if (index < 0) {
tokens.add(new PathElement(element, element, null));
if (metricIndex < 0) {
tokens.add(new PathElement(element, element, null, null));
continue;
}
if (index == 0 || index > element.length() - 2) {
if (metricIndex == 0 || metricIndex > element.length() - 2) {
throw new AggregationExecutionException("Invalid path element [" + element + "] in path [" + path + "]");
}
tuple = split(element, index, tuple);
tokens.add(new PathElement(element, tuple[0], tuple[1]));

tuple = split(element, metricIndex, tuple);
tokens.add(new PathElement(element, tuple[0], null, tuple[1]));
} else {
int index = element.lastIndexOf('[');
if (index >= 0) {
if (index == 0 || index > element.length() - 3) {
int keyIndex = element.lastIndexOf('[');
if (keyIndex >= 0) {
if (keyIndex == 0 || keyIndex > element.length() - 3) {
throw new AggregationExecutionException("Invalid path element [" + element + "] in path [" + path + "]");
}
if (element.charAt(element.length() - 1) != ']') {
throw new AggregationExecutionException("Invalid path element [" + element + "] in path [" + path + "]");
}
tokens.add(new PathElement(element, element.substring(0, index), element.substring(index + 1, element.length() - 1)));
tokens.add(
new PathElement(
element,
element.substring(0, keyIndex),
element.substring(keyIndex + 1, element.length() - 1),
null
)
);
continue;
}
tokens.add(new PathElement(element, element, null));
tokens.add(new PathElement(element, element, null, null));
}
}
return new AggregationPath(tokens);
}

public record PathElement(String fullName, String name, String key) {
public record PathElement(String fullName, String name, String key, String metric) {

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

PathElement token = (PathElement) o;
return Objects.equals(key, token.key) && Objects.equals(name, token.name);
return Objects.equals(key, token.key) && Objects.equals(name, token.name) && Objects.equals(metric, token.metric);
}

/**
 * Hashes this element from its {@code name}, {@code key} and {@code metric} components;
 * {@code fullName} does not contribute. A {@code null} key or metric contributes {@code 0}.
 */
@Override
public int hashCode() {
    // Objects.hashCode returns 0 for null, matching the explicit null checks it replaces
    final int keyHash = Objects.hashCode(key);
    final int metricHash = Objects.hashCode(metric);
    // Standard 31-multiplier fold: ((name * 31) + key) * 31 + metric
    return 31 * (31 * name.hashCode() + keyHash) + metricHash;
}

Expand Down Expand Up @@ -157,6 +187,9 @@ public List<String> getPathElementsAsStringList() {
if (pathElement.key != null) {
stringPathElements.add(pathElement.key);
}
if (pathElement.metric != null) {
stringPathElements.add(pathElement.metric);
}
}
return stringPathElements;
}
Expand Down Expand Up @@ -198,7 +231,7 @@ public Aggregator resolveTopmostAggregator(Aggregator root) {
}

public BucketComparator bucketComparator(Aggregator root, SortOrder order) {
return resolveAggregator(root).bucketComparator(lastPathElement().key, order);
return resolveAggregator(root).bucketComparator(Optional.ofNullable(lastPathElement().key).orElse(lastPathElement().metric), order);
}

private static String[] split(String toSplit, int index, String[] result) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import static org.hamcrest.Matchers.equalTo;

public class AggregationPathTests extends ESTestCase {
public void testInvalidPaths() throws Exception {
public void testInvalidPaths() {
assertInvalidPath("[foo]", "brackets at the beginning of the token expression");
assertInvalidPath("foo[bar", "open brackets without closing at the token expression");
assertInvalidPath("foo[", "open bracket at the end of the token expression");
Expand All @@ -27,18 +27,19 @@ public void testInvalidPaths() throws Exception {
assertInvalidPath("foo.", "dot separator at the end of the token expression");
}

public void testValidPaths() throws Exception {
public void testValidPaths() {
assertValidPath("foo[bar]._count", tokens().addKeyAndMetric("foo", "bar", "_count"));
assertValidPath("foo>bar", tokens().add("foo").add("bar"));
assertValidPath("foo.bar", tokens().add("foo", "bar"));
assertValidPath("foo[bar]", tokens().add("foo", "bar"));
assertValidPath("foo[bar]>baz", tokens().add("foo", "bar").add("baz"));
assertValidPath("foo[bar]>baz[qux]", tokens().add("foo", "bar").add("baz", "qux"));
assertValidPath("foo[bar]>baz.qux", tokens().add("foo", "bar").add("baz", "qux"));
assertValidPath("foo.bar>baz.qux", tokens().add("foo.bar").add("baz", "qux"));
assertValidPath("foo.bar>baz[qux]", tokens().add("foo.bar").add("baz", "qux"));
assertValidPath("foo.bar", tokens().addMetric("foo", "bar"));
assertValidPath("foo[bar]", tokens().addKey("foo", "bar"));
assertValidPath("foo[bar]>baz", tokens().addKey("foo", "bar").add("baz"));
assertValidPath("foo[bar]>baz[qux]", tokens().addKey("foo", "bar").addKey("baz", "qux"));
assertValidPath("foo[bar]>baz.qux", tokens().addKey("foo", "bar").addMetric("baz", "qux"));
assertValidPath("foo.bar>baz.qux", tokens().add("foo.bar").addMetric("baz", "qux"));
assertValidPath("foo.bar>baz[qux]", tokens().add("foo.bar").addKey("baz", "qux"));
}

private AggregationExecutionException assertInvalidPath(String path, String reason) {
private AggregationExecutionException assertInvalidPath(String path, String _unused) {
return expectThrows(AggregationExecutionException.class, () -> AggregationPath.parse(path));
}

Expand All @@ -58,24 +59,30 @@ private static Tokens tokens() {
}

private static class Tokens {
private List<AggregationPath.PathElement> tokens = new ArrayList<>();
private final List<AggregationPath.PathElement> tokens = new ArrayList<>();

Tokens add(String name) {
tokens.add(new AggregationPath.PathElement(name, name, null));
tokens.add(new AggregationPath.PathElement(name, name, null, null));
return this;
}

Tokens add(String name, String key) {
if (randomBoolean()) {
tokens.add(new AggregationPath.PathElement(name + "." + key, name, key));
} else {
tokens.add(new AggregationPath.PathElement(name + "[" + key + "]", name, key));
}
Tokens addKey(String name, String key) {
tokens.add(new AggregationPath.PathElement(name + "[" + key + "]", name, key, null));
return this;
}

/**
 * Appends an expected element of the form {@code name.metric}: the key is left {@code null}
 * and only the metric is set.
 *
 * @param name   the aggregation name
 * @param metric the metric name following the dot
 * @return this builder, for chaining
 */
Tokens addMetric(String name, String metric) {
    final String fullName = name + "." + metric;
    final AggregationPath.PathElement expected = new AggregationPath.PathElement(fullName, name, null, metric);
    tokens.add(expected);
    return this;
}

/**
 * Appends an expected element of the form {@code name[key].metric}, with both the bucket key
 * and the metric set.
 *
 * @param name   the aggregation name
 * @param key    the bucket key placed between the brackets
 * @param metric the metric name following the closing bracket and dot
 * @return this builder, for chaining
 */
Tokens addKeyAndMetric(String name, String key, String metric) {
    final String fullName = name + "[" + key + "]." + metric;
    final AggregationPath.PathElement expected = new AggregationPath.PathElement(fullName, name, key, metric);
    tokens.add(expected);
    return this;
}

AggregationPath.PathElement[] toArray() {
return tokens.toArray(new AggregationPath.PathElement[tokens.size()]);
return tokens.toArray(new AggregationPath.PathElement[0]);
}
}
}

0 comments on commit c49b92e

Please sign in to comment.