Scriptable Metrics Aggregation #7075

Merged
merged 1 commit into from
Aug 20, 2014
2 changes: 2 additions & 0 deletions docs/reference/search/aggregations/metrics.asciidoc
@@ -23,3 +23,5 @@ include::metrics/cardinality-aggregation.asciidoc[]
include::metrics/geobounds-aggregation.asciidoc[]

include::metrics/tophits-aggregation.asciidoc[]

include::metrics/scripted-metric-aggregation.asciidoc[]
@@ -0,0 +1,233 @@
[[search-aggregations-metrics-scripted-metric-aggregation]]
=== Scripted Metric Aggregation

added[1.4.0]

A metric aggregation that executes using scripts to provide a metric output.

.Experimental!
[IMPORTANT]
=====
This feature is marked as experimental, and may be subject to change in the
future. If you use this feature, please let us know your experience with it!
=====

Example:

[source,js]
--------------------------------------------------
{
    "query" : {
        "match_all" : {}
    },
    "aggs": {
        "profit": {
            "scripted_metric": {
                "init_script" : "_agg['transactions'] = []",
                "map_script" : "if (doc['type'].value == \"sale\") { _agg.transactions.add(doc['amount'].value) } else { _agg.transactions.add(-1 * doc['amount'].value) }", <1>
                "combine_script" : "profit = 0; for (t in _agg.transactions) { profit += t }; return profit",
                "reduce_script" : "profit = 0; for (a in _aggs) { profit += a }; return profit"
            }
        }
    }
}
--------------------------------------------------

<1> `map_script` is the only required parameter

The above aggregation demonstrates how one would use the scripted metric aggregation to compute the total profit from sale and cost transactions.

The response for the above aggregation:

[source,js]
--------------------------------------------------
{
    ...

    "aggregations": {
        "profit": {
            "aggregation": 170
        }
    }
}
--------------------------------------------------

==== Scope of scripts

The scripted metric aggregation uses scripts at 4 stages of its execution:

init_script:: Executed prior to any collection of documents. Allows the aggregation to set up any initial state.
+
In the above example, the `init_script` creates an array `transactions` in the `_agg` object.

map_script:: Executed once per document collected. This is the only required script. If no combine_script is specified, the resulting state
needs to be stored in an object named `_agg`.
+
In the above example, the `map_script` checks the value of the `type` field. If the value is 'sale', the value of the `amount` field
is added to the `transactions` array. Otherwise, the negated value of the `amount` field is added
to `transactions`.

combine_script:: Executed once on each shard after document collection is complete. Allows the aggregation to consolidate the state collected on that
shard before it is returned to the coordinating node. If a combine_script is not provided the combine phase will return the aggregation variable.
+
In the above example, the `combine_script` iterates through all the stored transactions, summing the values in the `profit` variable
and finally returns `profit`.

reduce_script:: Executed once on the coordinating node after all shards have returned their results. The script is provided with access to a
variable `_aggs` which is an array of the result of the combine_script on each shard. If a reduce_script is not provided
the reduce phase will return the `_aggs` variable.
+
In the above example, the `reduce_script` iterates through the `profit` returned by each shard summing the values before returning the
final combined profit which will be returned in the response of the aggregation.
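
The four stages and the two fallbacks described above can be sketched in plain Java. This is only an illustration under stated assumptions, not Elasticsearch code: the class `ScriptedMetricPhases`, the method names, and the use of lambdas in place of scripts are all hypothetical, and each inner list stands in for the documents held by one shard. Only a map step is supplied, so the combine phase falls back to returning the per-shard `_agg` object and the reduce phase falls back to returning the `_aggs` list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;

// Plain-Java stand-in for the four script stages; all names here are hypothetical.
class ScriptedMetricPhases {

    static Object run(List<List<Map<String, Object>>> shards,
                      Consumer<Map<String, Object>> initScript,           // optional, may be null
                      BiConsumer<Map<String, Object>, Map<String, Object>> mapScript,
                      Function<Map<String, Object>, Object> combineScript, // optional, may be null
                      Function<List<Object>, Object> reduceScript) {       // optional, may be null
        List<Object> aggs = new ArrayList<>();
        for (List<Map<String, Object>> shardDocs : shards) {
            Map<String, Object> agg = new HashMap<>();   // the default, empty `_agg` object
            if (initScript != null) {
                initScript.accept(agg);                  // once per shard, before collection
            }
            for (Map<String, Object> doc : shardDocs) {
                mapScript.accept(agg, doc);              // once per collected document
            }
            // Without a combine step the shard returns its `_agg` object as-is.
            aggs.add(combineScript != null ? combineScript.apply(agg) : agg);
        }
        // Without a reduce step the result is the list of per-shard results.
        return reduceScript != null ? reduceScript.apply(aggs) : aggs;
    }

    static Map<String, Object> doc(String type) {
        Map<String, Object> d = new HashMap<>();
        d.put("type", type);
        return d;
    }

    // Counts documents per shard using only a map step.
    static Object countPerShard() {
        List<Map<String, Object>> shardA = Arrays.asList(doc("sale"), doc("cost"));
        List<Map<String, Object>> shardB = Collections.singletonList(doc("sale"));
        return run(Arrays.asList(shardA, shardB), null,
                (agg, d) -> agg.merge("count", 1, (a, b) -> (Integer) a + (Integer) b),
                null, null);
    }

    public static void main(String[] args) {
        System.out.println(countPerShard());
    }
}
```

With both optional steps omitted, the caller receives one state object per shard rather than a single number, which is why a `reduce_script` is usually wanted when a scalar metric is expected.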

==== Worked Example

Imagine a situation where you index the following documents into an index with 2 shards:

[source,js]
--------------------------------------------------
$ curl -XPUT 'http://localhost:9200/transactions/stock/1' -d '
{
    "type": "sale",
    "amount": 80
}
'

$ curl -XPUT 'http://localhost:9200/transactions/stock/2' -d '
{
    "type": "cost",
    "amount": 10
}
'

$ curl -XPUT 'http://localhost:9200/transactions/stock/3' -d '
{
    "type": "cost",
    "amount": 30
}
'

$ curl -XPUT 'http://localhost:9200/transactions/stock/4' -d '
{
    "type": "sale",
    "amount": 130
}
'
--------------------------------------------------

Let's say that documents 1 and 3 end up on shard A and documents 2 and 4 end up on shard B. The following is a breakdown of what the aggregation result is
at each stage of the example above.

===== Before init_script

No params object was specified so the default params object is used:

[source,js]
--------------------------------------------------
"params" : {
"_agg" : {}
}
--------------------------------------------------

===== After init_script

This is run once on each shard before any document collection is performed, and so we will have a copy on each shard:

Shard A::
+
[source,js]
--------------------------------------------------
"params" : {
"_agg" : {
"transactions" : []
}
}
--------------------------------------------------

Shard B::
+
[source,js]
--------------------------------------------------
"params" : {
"_agg" : {
"transactions" : []
}
}
--------------------------------------------------

===== After map_script

Each shard collects its documents and runs the map_script on each document that is collected:

Shard A::
+
[source,js]
--------------------------------------------------
"params" : {
"_agg" : {
"transactions" : [ 80, -30 ]
}
}
--------------------------------------------------

Shard B::
+
[source,js]
--------------------------------------------------
"params" : {
"_agg" : {
"transactions" : [ -10, 130 ]
}
}
--------------------------------------------------

===== After combine_script

The combine_script is executed on each shard after document collection is complete and reduces all the transactions down to a single profit figure for each
shard (by summing the values in the transactions array) which is passed back to the coordinating node:

Shard A:: 50
Shard B:: 120

===== After reduce_script

The reduce_script receives an `_aggs` array containing the result of the combine script for each shard:

[source,js]
--------------------------------------------------
"_aggs" : [
50,
120
]
--------------------------------------------------

It reduces the responses for the shards down to a final overall profit figure (by summing the values) and returns this as the result of the aggregation to
produce the response:

[source,js]
--------------------------------------------------
{
    ...

    "aggregations": {
        "profit": {
            "aggregation": 170
        }
    }
}
--------------------------------------------------
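
The arithmetic of the walk-through above can be replayed in plain Java. This is a minimal sketch, not Elasticsearch code: the class `ProfitWalkthrough`, the `Doc` type, and the method names are hypothetical, and the assignment of documents to shards is hard-coded to match the example (documents 1 and 3 on shard A, documents 2 and 4 on shard B).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java replay of the worked example; all names here are hypothetical.
class ProfitWalkthrough {

    static final class Doc {
        final String type;
        final long amount;

        Doc(String type, long amount) {
            this.type = type;
            this.amount = amount;
        }
    }

    // Documents 1 and 3 on shard A, documents 2 and 4 on shard B.
    static final List<Doc> SHARD_A = Arrays.asList(new Doc("sale", 80), new Doc("cost", 30));
    static final List<Doc> SHARD_B = Arrays.asList(new Doc("cost", 10), new Doc("sale", 130));

    // init_script followed by map_script for a single shard.
    static List<Long> collect(List<Doc> shardDocs) {
        List<Long> transactions = new ArrayList<>();        // init_script: empty list
        for (Doc doc : shardDocs) {                         // map_script: one call per document
            transactions.add("sale".equals(doc.type) ? doc.amount : -doc.amount);
        }
        return transactions;
    }

    // combine_script: sum the transactions held by one shard.
    static long combine(List<Long> transactions) {
        long profit = 0;
        for (long t : transactions) {
            profit += t;
        }
        return profit;
    }

    // reduce_script: sum the per-shard profits on the coordinating node.
    static long reduce(List<Long> aggs) {
        long profit = 0;
        for (long a : aggs) {
            profit += a;
        }
        return profit;
    }

    public static void main(String[] args) {
        List<Long> a = collect(SHARD_A);
        List<Long> b = collect(SHARD_B);
        long profitA = combine(a);
        long profitB = combine(b);
        System.out.println("shard A: " + a + " -> " + profitA);
        System.out.println("shard B: " + b + " -> " + profitB);
        System.out.println("total: " + reduce(Arrays.asList(profitA, profitB)));
    }
}
```

Running it reproduces the per-shard figures 50 and 120 and the final result 170. Because the combine step collapses each shard's list to a single number, only one value per shard has to travel to the coordinating node.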

==== Other Parameters

[horizontal]
params:: Optional. An object whose contents will be passed as variables to the `init_script`, `map_script` and `combine_script`. This can be
useful to allow the user to control the behavior of the aggregation and for storing state between the scripts. If this is not specified,
the default is the equivalent of providing:
+
[source,js]
--------------------------------------------------
"params" : {
"_agg" : {}
}
--------------------------------------------------
reduce_params:: Optional. An object whose contents will be passed as variables to the `reduce_script`. This can be useful to allow the user to control
the behavior of the reduce phase. If this is not specified the variable will be undefined in the reduce_script execution.
lang:: Optional. The script language used for the scripts. If this is not specified the default scripting language is used.
script_type:: Optional. The type of script provided. This can be `inline`, `file` or `indexed`. The default is `inline`.

@@ -81,6 +81,7 @@
 import org.elasticsearch.search.SearchParseElement;
 import org.elasticsearch.search.SearchShardTarget;
 import org.elasticsearch.search.aggregations.AggregationPhase;
+import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
 import org.elasticsearch.search.aggregations.InternalAggregations;
 import org.elasticsearch.search.facet.Facet;
 import org.elasticsearch.search.facet.FacetPhase;
@@ -887,7 +888,7 @@ private InternalAggregations reduceAggregations(List<PercolateShardResponse> sha
         for (PercolateShardResponse shardResult : shardResults) {
             aggregationsList.add(shardResult.aggregations());
         }
-        return InternalAggregations.reduce(aggregationsList, bigArrays);
+        return InternalAggregations.reduce(aggregationsList, new ReduceContext(null, bigArrays, scriptService));
     }

 }
@@ -34,17 +34,18 @@
 import org.elasticsearch.search.aggregations.bucket.range.ipv4.IPv4RangeBuilder;
 import org.elasticsearch.search.aggregations.bucket.significant.SignificantTermsBuilder;
 import org.elasticsearch.search.aggregations.bucket.terms.TermsBuilder;
-import org.elasticsearch.search.aggregations.metrics.tophits.TopHitsBuilder;
 import org.elasticsearch.search.aggregations.metrics.avg.AvgBuilder;
 import org.elasticsearch.search.aggregations.metrics.cardinality.CardinalityBuilder;
 import org.elasticsearch.search.aggregations.metrics.geobounds.GeoBoundsBuilder;
 import org.elasticsearch.search.aggregations.metrics.max.MaxBuilder;
 import org.elasticsearch.search.aggregations.metrics.min.MinBuilder;
-import org.elasticsearch.search.aggregations.metrics.percentiles.PercentilesBuilder;
 import org.elasticsearch.search.aggregations.metrics.percentiles.PercentileRanksBuilder;
+import org.elasticsearch.search.aggregations.metrics.percentiles.PercentilesBuilder;
+import org.elasticsearch.search.aggregations.metrics.scripted.ScriptedMetricBuilder;
 import org.elasticsearch.search.aggregations.metrics.stats.StatsBuilder;
 import org.elasticsearch.search.aggregations.metrics.stats.extended.ExtendedStatsBuilder;
 import org.elasticsearch.search.aggregations.metrics.sum.SumBuilder;
+import org.elasticsearch.search.aggregations.metrics.tophits.TopHitsBuilder;
 import org.elasticsearch.search.aggregations.metrics.valuecount.ValueCountBuilder;

 /**
@@ -166,4 +167,8 @@ public static TopHitsBuilder topHits(String name) {
     public static GeoBoundsBuilder geoBounds(String name) {
         return new GeoBoundsBuilder(name);
     }
+
+    public static ScriptedMetricBuilder scriptedMetric(String name) {
+        return new ScriptedMetricBuilder(name);
+    }
 }
@@ -37,17 +37,18 @@
 import org.elasticsearch.search.aggregations.bucket.range.ipv4.IpRangeParser;
 import org.elasticsearch.search.aggregations.bucket.significant.SignificantTermsParser;
 import org.elasticsearch.search.aggregations.bucket.terms.TermsParser;
-import org.elasticsearch.search.aggregations.metrics.tophits.TopHitsParser;
 import org.elasticsearch.search.aggregations.metrics.avg.AvgParser;
 import org.elasticsearch.search.aggregations.metrics.cardinality.CardinalityParser;
 import org.elasticsearch.search.aggregations.metrics.geobounds.GeoBoundsParser;
 import org.elasticsearch.search.aggregations.metrics.max.MaxParser;
 import org.elasticsearch.search.aggregations.metrics.min.MinParser;
-import org.elasticsearch.search.aggregations.metrics.percentiles.PercentilesParser;
 import org.elasticsearch.search.aggregations.metrics.percentiles.PercentileRanksParser;
+import org.elasticsearch.search.aggregations.metrics.percentiles.PercentilesParser;
+import org.elasticsearch.search.aggregations.metrics.scripted.ScriptedMetricParser;
 import org.elasticsearch.search.aggregations.metrics.stats.StatsParser;
 import org.elasticsearch.search.aggregations.metrics.stats.extended.ExtendedStatsParser;
 import org.elasticsearch.search.aggregations.metrics.sum.SumParser;
+import org.elasticsearch.search.aggregations.metrics.tophits.TopHitsParser;
 import org.elasticsearch.search.aggregations.metrics.valuecount.ValueCountParser;

 import java.util.List;
@@ -88,6 +89,7 @@ public AggregationModule() {
         parsers.add(ReverseNestedParser.class);
         parsers.add(TopHitsParser.class);
         parsers.add(GeoBoundsParser.class);
+        parsers.add(ScriptedMetricParser.class);
         parsers.add(ChildrenParser.class);
     }

@@ -27,6 +27,7 @@
 import org.elasticsearch.common.xcontent.ToXContent;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentBuilderString;
+import org.elasticsearch.script.ScriptService;

 import java.io.IOException;
 import java.util.List;
@@ -82,14 +83,16 @@ public String toString() {
         }
     }

-    protected static class ReduceContext {
+    public static class ReduceContext {

         private final List<InternalAggregation> aggregations;
         private final BigArrays bigArrays;
+        private ScriptService scriptService;

-        public ReduceContext(List<InternalAggregation> aggregations, BigArrays bigArrays) {
+        public ReduceContext(List<InternalAggregation> aggregations, BigArrays bigArrays, ScriptService scriptService) {
             this.aggregations = aggregations;
             this.bigArrays = bigArrays;
+            this.scriptService = scriptService;
         }

         public List<InternalAggregation> aggregations() {
@@ -99,6 +102,10 @@ public List<InternalAggregation> aggregations() {
         public BigArrays bigArrays() {
             return bigArrays;
         }
+
+        public ScriptService scriptService() {
+            return scriptService;
+        }
     }


@@ -24,10 +24,10 @@
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Streamable;
-import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.xcontent.ToXContent;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentBuilderString;
+import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;

 import java.io.IOException;
 import java.util.*;
@@ -112,7 +112,7 @@ public <A extends Aggregation> A get(String name) {
      * @param aggregationsList A list of aggregation to reduce
      * @return The reduced addAggregation
      */
-    public static InternalAggregations reduce(List<InternalAggregations> aggregationsList, BigArrays bigArrays) {
+    public static InternalAggregations reduce(List<InternalAggregations> aggregationsList, ReduceContext context) {
         if (aggregationsList.isEmpty()) {
             return null;
         }
@@ -137,7 +137,7 @@ public static InternalAggregations reduce(List<InternalAggregations> aggregation
         for (Map.Entry<String, List<InternalAggregation>> entry : aggByName.entrySet()) {
             List<InternalAggregation> aggregations = entry.getValue();
             InternalAggregation first = aggregations.get(0); // the list can't be empty as it's created on demand
-            reducedAggregations.add(first.reduce(new InternalAggregation.ReduceContext(aggregations, bigArrays)));
+            reducedAggregations.add(first.reduce(new InternalAggregation.ReduceContext(aggregations, context.bigArrays(), context.scriptService())));
         }
         return new InternalAggregations(reducedAggregations);
     }
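
The hunks above replace the bare `BigArrays` argument of `reduce` with a `ReduceContext` that also carries a `ScriptService`, presumably so that a `reduce_script` can be run on the coordinating node. The shape of that change can be sketched with stand-in types. Everything in the block below is hypothetical: `BigArraysStub` and `ScriptServiceStub` are placeholders, the "script" is simply a sum, and per-shard results are modelled as `Long` values rather than `InternalAggregation` objects.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Stand-in types only; none of these classes are the real Elasticsearch ones.
class ReduceContextSketch {

    static final class BigArraysStub {}

    static final class ScriptServiceStub {
        // Placeholder for running a reduce script: sums the per-shard results.
        long runReduce(List<Long> shardResults) {
            long total = 0;
            for (long r : shardResults) {
                total += r;
            }
            return total;
        }
    }

    // Bundles everything a reduce step may need, so new dependencies
    // do not change every reduce signature.
    static final class ReduceContext {
        private final List<Long> aggregations;   // per-shard results of one named aggregation
        private final BigArraysStub bigArrays;
        private final ScriptServiceStub scriptService;

        ReduceContext(List<Long> aggregations, BigArraysStub bigArrays, ScriptServiceStub scriptService) {
            this.aggregations = aggregations;
            this.bigArrays = bigArrays;
            this.scriptService = scriptService;
        }

        List<Long> aggregations() { return aggregations; }
        BigArraysStub bigArrays() { return bigArrays; }
        ScriptServiceStub scriptService() { return scriptService; }
    }

    // The caller passes a shared context with no aggregations; a per-aggregation
    // context is then built from the shared services, as in the last hunk above.
    static List<Long> reduceAll(List<List<Long>> shardResultsByName, ReduceContext shared) {
        List<Long> reduced = new ArrayList<>();
        for (List<Long> shardResults : shardResultsByName) {
            ReduceContext perAgg = new ReduceContext(shardResults, shared.bigArrays(), shared.scriptService());
            reduced.add(perAgg.scriptService().runReduce(perAgg.aggregations()));
        }
        return reduced;
    }

    static List<Long> demo() {
        ReduceContext shared = new ReduceContext(null, new BigArraysStub(), new ScriptServiceStub());
        return reduceAll(Arrays.asList(Arrays.asList(50L, 120L)), shared);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Passing one context object keeps the `reduce` signature stable if further services are needed later, at the cost of a slightly less explicit parameter list.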