Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Require combine and reduce scripts in scripted metrics aggregation #33452

Merged
merged 19 commits into from
Oct 3, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
9c8e69c
Make text message not required in constructor for slack
albendz Jun 13, 2018
b6876b4
Remove unnecessary comments in test file
albendz Jun 13, 2018
51ff0a0
Merge remote-tracking branch 'upstream/master'
albendz Jul 3, 2018
5df7b98
Merge remote-tracking branch 'upstream/master'
albendz Jul 6, 2018
156abbf
Merge remote-tracking branch 'upstream/master'
albendz Sep 1, 2018
68fdddc
Merge remote-tracking branch 'upstream/master'
albendz Sep 5, 2018
63bf29a
Throw exception when reduce or combine is not provided; update tests
albendz Sep 5, 2018
f68326e
Update integration tests for scripted metrics to always include reduc…
albendz Sep 6, 2018
3da8fd5
Remove some old changes from previous branches
albendz Sep 6, 2018
78f7a2e
Merge remote-tracking branch 'upstream/master'
albendz Sep 11, 2018
00050da
Merge branch 'master' into require_scripts
albendz Sep 11, 2018
667557a
Rearrange script presence checks to be earlier in build
albendz Sep 12, 2018
1adb4ed
Change null check order in script builder for aggregated metrics; cor…
albendz Sep 16, 2018
8db9925
Merge remote-tracking branch 'upstream/master' into require_scripts
albendz Sep 16, 2018
b119cbd
Merge remote-tracking branch 'upstream/master'
albendz Sep 27, 2018
e451b84
Merge remote-tracking branch 'origin' into require_scripts
albendz Sep 27, 2018
120bab7
Add breaking change details to PR
albendz Oct 2, 2018
1495c24
Merge remote-tracking branch 'upstream/master'
albendz Oct 2, 2018
790e5fd
Merge branch 'master' into require_scripts
albendz Oct 2, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/reference/migration/migrate_7_0/aggregations.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,9 @@ has been removed. `missing_bucket` should be used instead.
The object used to share aggregation state between the scripts in a Scripted Metric
Aggregation is now a variable called `state` available in the script context, rather than
being provided via the `params` object as `params._agg`.

[float]
==== Make metric aggregation script parameters `reduce_script` and `combine_script` mandatory

The metric aggregation has been changed to require these two script parameters to ensure users are
explicitly defining how their data is processed.
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,14 @@ public Map<String, Object> params() {
protected ScriptedMetricAggregatorFactory doBuild(SearchContext context, AggregatorFactory<?> parent,
Builder subfactoriesBuilder) throws IOException {

if (combineScript == null) {
throw new IllegalArgumentException("[combineScript] must not be null: [" + name + "]");
}

if(reduceScript == null) {
throw new IllegalArgumentException("[reduceScript] must not be null: [" + name + "]");
}

QueryShardContext queryShardContext = context.getQueryShardContext();

// Extract params from scripts and pass them along to ScriptedMetricAggregatorFactory, since it won't have
Expand All @@ -215,16 +223,14 @@ protected ScriptedMetricAggregatorFactory doBuild(SearchContext context, Aggrega
ScriptedMetricAggContexts.MapScript.CONTEXT);
Map<String, Object> mapScriptParams = mapScript.getParams();


ScriptedMetricAggContexts.CombineScript.Factory compiledCombineScript;
Map<String, Object> combineScriptParams;
if (combineScript != null) {
compiledCombineScript = queryShardContext.getScriptService().compile(combineScript,
ScriptedMetricAggContexts.CombineScript.CONTEXT);
combineScriptParams = combineScript.getParams();
} else {
compiledCombineScript = (p, a) -> null;
combineScriptParams = Collections.emptyMap();
}

compiledCombineScript = queryShardContext.getScriptService().compile(combineScript,
ScriptedMetricAggContexts.CombineScript.CONTEXT);
combineScriptParams = combineScript.getParams();

return new ScriptedMetricAggregatorFactory(name, compiledMapScript, mapScriptParams, compiledInitScript,
initScriptParams, compiledCombineScript, combineScriptParams, reduceScript,
params, queryShardContext.lookup(), context, parent, subfactoriesBuilder, metaData);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,17 @@ public class ScriptedMetricAggregatorTests extends AggregatorTestCase {
private static final Script MAP_SCRIPT = new Script(ScriptType.INLINE, MockScriptEngine.NAME, "mapScript", Collections.emptyMap());
private static final Script COMBINE_SCRIPT = new Script(ScriptType.INLINE, MockScriptEngine.NAME, "combineScript",
Collections.emptyMap());
private static final Script REDUCE_SCRIPT = new Script(ScriptType.INLINE, MockScriptEngine.NAME, "reduceScript",
Collections.emptyMap());

private static final Script INIT_SCRIPT_SCORE = new Script(ScriptType.INLINE, MockScriptEngine.NAME, "initScriptScore",
Collections.emptyMap());
private static final Script MAP_SCRIPT_SCORE = new Script(ScriptType.INLINE, MockScriptEngine.NAME, "mapScriptScore",
Collections.emptyMap());
private static final Script COMBINE_SCRIPT_SCORE = new Script(ScriptType.INLINE, MockScriptEngine.NAME, "combineScriptScore",
Collections.emptyMap());
private static final Script COMBINE_SCRIPT_NOOP = new Script(ScriptType.INLINE, MockScriptEngine.NAME, "combineScriptNoop",
Collections.emptyMap());

private static final Script INIT_SCRIPT_PARAMS = new Script(ScriptType.INLINE, MockScriptEngine.NAME, "initScriptParams",
Collections.singletonMap("initialValue", 24));
Expand Down Expand Up @@ -96,6 +100,14 @@ public static void initMockScripts() {
Map<String, Object> state = (Map<String, Object>) params.get("state");
return ((List<Integer>) state.get("collector")).stream().mapToInt(Integer::intValue).sum();
});
SCRIPTS.put("combineScriptNoop", params -> {
Map<String, Object> state = (Map<String, Object>) params.get("state");
return state;
});
SCRIPTS.put("reduceScript", params -> {
Map<String, Object> state = (Map<String, Object>) params.get("state");
return state;
});

SCRIPTS.put("initScriptScore", params -> {
Map<String, Object> state = (Map<String, Object>) params.get("state");
Expand Down Expand Up @@ -160,7 +172,7 @@ public void testNoDocs() throws IOException {
}
try (IndexReader indexReader = DirectoryReader.open(directory)) {
ScriptedMetricAggregationBuilder aggregationBuilder = new ScriptedMetricAggregationBuilder(AGG_NAME);
aggregationBuilder.mapScript(MAP_SCRIPT); // map script is mandatory, even if its not used in this case
aggregationBuilder.mapScript(MAP_SCRIPT).combineScript(COMBINE_SCRIPT_NOOP).reduceScript(REDUCE_SCRIPT);
ScriptedMetric scriptedMetric = search(newSearcher(indexReader, true, true), new MatchAllDocsQuery(), aggregationBuilder);
assertEquals(AGG_NAME, scriptedMetric.getName());
assertNotNull(scriptedMetric.aggregation());
Expand All @@ -169,9 +181,6 @@ public void testNoDocs() throws IOException {
}
}

/**
* without combine script, the "states" map should contain a list of the size of the number of documents matched
*/
public void testScriptedMetricWithoutCombine() throws IOException {
try (Directory directory = newDirectory()) {
int numDocs = randomInt(100);
Expand All @@ -182,15 +191,28 @@ public void testScriptedMetricWithoutCombine() throws IOException {
}
try (IndexReader indexReader = DirectoryReader.open(directory)) {
ScriptedMetricAggregationBuilder aggregationBuilder = new ScriptedMetricAggregationBuilder(AGG_NAME);
aggregationBuilder.initScript(INIT_SCRIPT).mapScript(MAP_SCRIPT);
ScriptedMetric scriptedMetric = search(newSearcher(indexReader, true, true), new MatchAllDocsQuery(), aggregationBuilder);
assertEquals(AGG_NAME, scriptedMetric.getName());
assertNotNull(scriptedMetric.aggregation());
@SuppressWarnings("unchecked")
Map<String, Object> agg = (Map<String, Object>) scriptedMetric.aggregation();
@SuppressWarnings("unchecked")
List<Integer> list = (List<Integer>) agg.get("collector");
assertEquals(numDocs, list.size());
aggregationBuilder.initScript(INIT_SCRIPT).mapScript(MAP_SCRIPT).reduceScript(REDUCE_SCRIPT);
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class,
() -> search(newSearcher(indexReader, true, true), new MatchAllDocsQuery(), aggregationBuilder));
assertEquals(exception.getMessage(), "[combineScript] must not be null: [scriptedMetric]");
}
}
}

public void testScriptedMetricWithoutReduce() throws IOException {
try (Directory directory = newDirectory()) {
int numDocs = randomInt(100);
try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) {
for (int i = 0; i < numDocs; i++) {
indexWriter.addDocument(singleton(new SortedNumericDocValuesField("number", i)));
}
}
try (IndexReader indexReader = DirectoryReader.open(directory)) {
ScriptedMetricAggregationBuilder aggregationBuilder = new ScriptedMetricAggregationBuilder(AGG_NAME);
aggregationBuilder.initScript(INIT_SCRIPT).mapScript(MAP_SCRIPT).combineScript(COMBINE_SCRIPT);
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class,
() -> search(newSearcher(indexReader, true, true), new MatchAllDocsQuery(), aggregationBuilder));
assertEquals(exception.getMessage(), "[reduceScript] must not be null: [scriptedMetric]");
}
}
}
Expand All @@ -208,7 +230,8 @@ public void testScriptedMetricWithCombine() throws IOException {
}
try (IndexReader indexReader = DirectoryReader.open(directory)) {
ScriptedMetricAggregationBuilder aggregationBuilder = new ScriptedMetricAggregationBuilder(AGG_NAME);
aggregationBuilder.initScript(INIT_SCRIPT).mapScript(MAP_SCRIPT).combineScript(COMBINE_SCRIPT);
aggregationBuilder.initScript(INIT_SCRIPT).mapScript(MAP_SCRIPT)
.combineScript(COMBINE_SCRIPT).reduceScript(REDUCE_SCRIPT);
ScriptedMetric scriptedMetric = search(newSearcher(indexReader, true, true), new MatchAllDocsQuery(), aggregationBuilder);
assertEquals(AGG_NAME, scriptedMetric.getName());
assertNotNull(scriptedMetric.aggregation());
Expand All @@ -230,7 +253,8 @@ public void testScriptedMetricWithCombineAccessesScores() throws IOException {
}
try (IndexReader indexReader = DirectoryReader.open(directory)) {
ScriptedMetricAggregationBuilder aggregationBuilder = new ScriptedMetricAggregationBuilder(AGG_NAME);
aggregationBuilder.initScript(INIT_SCRIPT_SCORE).mapScript(MAP_SCRIPT_SCORE).combineScript(COMBINE_SCRIPT_SCORE);
aggregationBuilder.initScript(INIT_SCRIPT_SCORE).mapScript(MAP_SCRIPT_SCORE)
.combineScript(COMBINE_SCRIPT_SCORE).reduceScript(REDUCE_SCRIPT);
ScriptedMetric scriptedMetric = search(newSearcher(indexReader, true, true), new MatchAllDocsQuery(), aggregationBuilder);
assertEquals(AGG_NAME, scriptedMetric.getName());
assertNotNull(scriptedMetric.aggregation());
Expand All @@ -250,7 +274,8 @@ public void testScriptParamsPassedThrough() throws IOException {

try (IndexReader indexReader = DirectoryReader.open(directory)) {
ScriptedMetricAggregationBuilder aggregationBuilder = new ScriptedMetricAggregationBuilder(AGG_NAME);
aggregationBuilder.initScript(INIT_SCRIPT_PARAMS).mapScript(MAP_SCRIPT_PARAMS).combineScript(COMBINE_SCRIPT_PARAMS);
aggregationBuilder.initScript(INIT_SCRIPT_PARAMS).mapScript(MAP_SCRIPT_PARAMS)
.combineScript(COMBINE_SCRIPT_PARAMS).reduceScript(REDUCE_SCRIPT);
ScriptedMetric scriptedMetric = search(newSearcher(indexReader, true, true), new MatchAllDocsQuery(), aggregationBuilder);

// The result value depends on the script params.
Expand All @@ -270,8 +295,8 @@ public void testConflictingAggAndScriptParams() throws IOException {
try (IndexReader indexReader = DirectoryReader.open(directory)) {
ScriptedMetricAggregationBuilder aggregationBuilder = new ScriptedMetricAggregationBuilder(AGG_NAME);
Map<String, Object> aggParams = Collections.singletonMap(CONFLICTING_PARAM_NAME, "blah");
aggregationBuilder.params(aggParams).initScript(INIT_SCRIPT_PARAMS).mapScript(MAP_SCRIPT_PARAMS).
combineScript(COMBINE_SCRIPT_PARAMS);
aggregationBuilder.params(aggParams).initScript(INIT_SCRIPT_PARAMS).mapScript(MAP_SCRIPT_PARAMS)
.combineScript(COMBINE_SCRIPT_PARAMS).reduceScript(REDUCE_SCRIPT);

IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () ->
search(newSearcher(indexReader, true, true), new MatchAllDocsQuery(), aggregationBuilder)
Expand All @@ -289,7 +314,8 @@ public void testSelfReferencingAggStateAfterInit() throws IOException {
}
try (IndexReader indexReader = DirectoryReader.open(directory)) {
ScriptedMetricAggregationBuilder aggregationBuilder = new ScriptedMetricAggregationBuilder(AGG_NAME);
aggregationBuilder.initScript(INIT_SCRIPT_SELF_REF).mapScript(MAP_SCRIPT);
aggregationBuilder.initScript(INIT_SCRIPT_SELF_REF).mapScript(MAP_SCRIPT)
.combineScript(COMBINE_SCRIPT_PARAMS).reduceScript(REDUCE_SCRIPT);

IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () ->
search(newSearcher(indexReader, true, true), new MatchAllDocsQuery(), aggregationBuilder)
Expand All @@ -309,7 +335,8 @@ public void testSelfReferencingAggStateAfterMap() throws IOException {
}
try (IndexReader indexReader = DirectoryReader.open(directory)) {
ScriptedMetricAggregationBuilder aggregationBuilder = new ScriptedMetricAggregationBuilder(AGG_NAME);
aggregationBuilder.initScript(INIT_SCRIPT).mapScript(MAP_SCRIPT_SELF_REF);
aggregationBuilder.initScript(INIT_SCRIPT).mapScript(MAP_SCRIPT_SELF_REF)
.combineScript(COMBINE_SCRIPT_PARAMS).reduceScript(REDUCE_SCRIPT);

IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () ->
search(newSearcher(indexReader, true, true), new MatchAllDocsQuery(), aggregationBuilder)
Expand All @@ -326,7 +353,8 @@ public void testSelfReferencingAggStateAfterCombine() throws IOException {
}
try (IndexReader indexReader = DirectoryReader.open(directory)) {
ScriptedMetricAggregationBuilder aggregationBuilder = new ScriptedMetricAggregationBuilder(AGG_NAME);
aggregationBuilder.initScript(INIT_SCRIPT).mapScript(MAP_SCRIPT).combineScript(COMBINE_SCRIPT_SELF_REF);
aggregationBuilder.initScript(INIT_SCRIPT).mapScript(MAP_SCRIPT)
.combineScript(COMBINE_SCRIPT_SELF_REF).reduceScript(REDUCE_SCRIPT);

IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () ->
search(newSearcher(indexReader, true, true), new MatchAllDocsQuery(), aggregationBuilder)
Expand Down
Loading