Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Require combine and reduce scripts in scripted metrics aggregation #33452

Merged
merged 19 commits into from
Oct 3, 2018
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
9c8e69c
Make text message not required in constructor for slack
albendz Jun 13, 2018
b6876b4
Remove unnecessary comments in test file
albendz Jun 13, 2018
51ff0a0
Merge remote-tracking branch 'upstream/master'
albendz Jul 3, 2018
5df7b98
Merge remote-tracking branch 'upstream/master'
albendz Jul 6, 2018
156abbf
Merge remote-tracking branch 'upstream/master'
albendz Sep 1, 2018
68fdddc
Merge remote-tracking branch 'upstream/master'
albendz Sep 5, 2018
63bf29a
Throw exception when reduce or combine is not provided; update tests
albendz Sep 5, 2018
f68326e
Update integration tests for scripted metrics to always include reduc…
albendz Sep 6, 2018
3da8fd5
Remove some old changes from previous branches
albendz Sep 6, 2018
78f7a2e
Merge remote-tracking branch 'upstream/master'
albendz Sep 11, 2018
00050da
Merge branch 'master' into require_scripts
albendz Sep 11, 2018
667557a
Rearrange script presence checks to be earlier in build
albendz Sep 12, 2018
1adb4ed
Change null check order in script builder for aggregated metrics; cor…
albendz Sep 16, 2018
8db9925
Merge remote-tracking branch 'upstream/master' into require_scripts
albendz Sep 16, 2018
b119cbd
Merge remote-tracking branch 'upstream/master'
albendz Sep 27, 2018
e451b84
Merge remote-tracking branch 'origin' into require_scripts
albendz Sep 27, 2018
120bab7
Add breaking change details to PR
albendz Oct 2, 2018
1495c24
Merge remote-tracking branch 'upstream/master'
albendz Oct 2, 2018
790e5fd
Merge branch 'master' into require_scripts
albendz Oct 2, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,11 @@ protected ScriptedMetricAggregatorFactory doBuild(SearchContext context, Aggrega
ScriptedMetricAggContexts.CombineScript.CONTEXT);
combineScriptParams = combineScript.getParams();
} else {
compiledCombineScript = (p, a) -> null;
combineScriptParams = Collections.emptyMap();
throw new IllegalArgumentException("[combineScript] must not be null: [" + name + "]");
}

if(reduceScript == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we do these null checks at the start of the method before we compile any scripts?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will fix.

throw new IllegalArgumentException("[reduceScript] must not be null: [" + name + "]");
}
return new ScriptedMetricAggregatorFactory(name, compiledMapScript, mapScriptParams, compiledInitScript,
initScriptParams, compiledCombineScript, combineScriptParams, reduceScript,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,10 +329,14 @@ protected Path nodeConfigPath(int nodeOrdinal) {

public void testMap() {
Script mapScript = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "state['count'] = 1", Collections.emptyMap());
Script combineScript =
new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "sum state values as a new aggregation", Collections.emptyMap());
Script reduceScript = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME,
"sum all states (lists) values as a new aggregation", Collections.emptyMap());

SearchResponse response = client().prepareSearch("idx")
.setQuery(matchAllQuery())
.addAggregation(scriptedMetric("scripted").mapScript(mapScript))
.addAggregation(scriptedMetric("scripted").mapScript(mapScript).combineScript(combineScript).reduceScript(reduceScript))
.get();
assertSearchResponse(response);
assertThat(response.getHits().getTotalHits(), equalTo(numDocs));
Expand Down Expand Up @@ -370,10 +374,18 @@ public void testMapWithParams() {
Map<String, Object> aggregationParams = Collections.singletonMap("param2", 1);

Script mapScript = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "state[param1] = param2", scriptParams);
Script combineScript =
new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "sum state values as a new aggregation", Collections.emptyMap());
Script reduceScript = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME,
"sum all states (lists) values as a new aggregation", Collections.emptyMap());

SearchResponse response = client().prepareSearch("idx")
.setQuery(matchAllQuery())
.addAggregation(scriptedMetric("scripted").params(aggregationParams).mapScript(mapScript))
.addAggregation(scriptedMetric("scripted")
.params(aggregationParams)
.mapScript(mapScript)
.combineScript(combineScript)
.reduceScript(reduceScript))
.get();
assertSearchResponse(response);
assertThat(response.getHits().getTotalHits(), equalTo(numDocs));
Expand Down Expand Up @@ -424,7 +436,11 @@ public void testInitMapWithParams() {
.initScript(
new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "vars.multiplier = 3", Collections.emptyMap()))
.mapScript(new Script(ScriptType.INLINE, CustomScriptPlugin.NAME,
"state.list.add(vars.multiplier)", Collections.emptyMap())))
"state.list.add(vars.multiplier)", Collections.emptyMap()))
.combineScript(new Script(ScriptType.INLINE, CustomScriptPlugin.NAME,
"sum state values as a new aggregation", Collections.emptyMap()))
.reduceScript(new Script(ScriptType.INLINE, CustomScriptPlugin.NAME,
"sum all states (lists) values as a new aggregation", Collections.emptyMap())))
.get();
assertSearchResponse(response);
assertThat(response.getHits().getTotalHits(), equalTo(numDocs));
Expand Down Expand Up @@ -467,6 +483,8 @@ public void testMapCombineWithParams() {
Script mapScript = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "state.list.add(1)", Collections.emptyMap());
Script combineScript =
new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "sum state values as a new aggregation", Collections.emptyMap());
Script reduceScript = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME,
"sum all states (lists) values as a new aggregation", Collections.emptyMap());

SearchResponse response = client()
.prepareSearch("idx")
Expand All @@ -475,7 +493,8 @@ public void testMapCombineWithParams() {
scriptedMetric("scripted")
.params(params)
.mapScript(mapScript)
.combineScript(combineScript))
.combineScript(combineScript)
.reduceScript(reduceScript))
.execute().actionGet();
assertSearchResponse(response);
assertThat(response.getHits().getTotalHits(), equalTo(numDocs));
Expand Down Expand Up @@ -520,6 +539,8 @@ public void testInitMapCombineWithParams() {
Collections.emptyMap());
Script combineScript =
new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "sum state values as a new aggregation", Collections.emptyMap());
Script reduceScript = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME,
"sum all states (lists) values as a new aggregation", Collections.emptyMap());

SearchResponse response = client()
.prepareSearch("idx")
Expand All @@ -529,7 +550,8 @@ public void testInitMapCombineWithParams() {
.params(params)
.initScript(initScript)
.mapScript(mapScript)
.combineScript(combineScript))
.combineScript(combineScript)
.reduceScript(reduceScript))
.get();
assertSearchResponse(response);
assertThat(response.getHits().getTotalHits(), equalTo(numDocs));
Expand Down Expand Up @@ -714,6 +736,8 @@ public void testInitMapReduceWithParams() {
Script initScript = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "vars.multiplier = 3", Collections.emptyMap());
Script mapScript = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "state.list.add(vars.multiplier)",
Collections.emptyMap());
Script combineScript =
new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "sum state values as a new aggregation", Collections.emptyMap());
Script reduceScript = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME,
"sum all states' state.list values as a new aggregation", Collections.emptyMap());

Expand All @@ -725,6 +749,7 @@ public void testInitMapReduceWithParams() {
.params(params)
.initScript(initScript)
.mapScript(mapScript)
.combineScript(combineScript)
.reduceScript(reduceScript))
.get();
assertSearchResponse(response);
Expand Down Expand Up @@ -753,6 +778,8 @@ public void testMapReduceWithParams() {

Script mapScript = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "state.list.add(vars.multiplier)",
Collections.emptyMap());
Script combineScript =
new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "sum state values as a new aggregation", Collections.emptyMap());
Script reduceScript = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME,
"sum all states' state.list values as a new aggregation", Collections.emptyMap());

Expand All @@ -763,6 +790,7 @@ public void testMapReduceWithParams() {
scriptedMetric("scripted")
.params(params)
.mapScript(mapScript)
.combineScript(combineScript)
.reduceScript(reduceScript))
.get();
assertSearchResponse(response);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,17 @@ public class ScriptedMetricAggregatorTests extends AggregatorTestCase {
private static final Script MAP_SCRIPT = new Script(ScriptType.INLINE, MockScriptEngine.NAME, "mapScript", Collections.emptyMap());
private static final Script COMBINE_SCRIPT = new Script(ScriptType.INLINE, MockScriptEngine.NAME, "combineScript",
Collections.emptyMap());
private static final Script REDUCE_SCRIPT = new Script(ScriptType.INLINE, MockScriptEngine.NAME, "reduceScript",
Collections.emptyMap());

private static final Script INIT_SCRIPT_SCORE = new Script(ScriptType.INLINE, MockScriptEngine.NAME, "initScriptScore",
Collections.emptyMap());
private static final Script MAP_SCRIPT_SCORE = new Script(ScriptType.INLINE, MockScriptEngine.NAME, "mapScriptScore",
Collections.emptyMap());
private static final Script COMBINE_SCRIPT_SCORE = new Script(ScriptType.INLINE, MockScriptEngine.NAME, "combineScriptScore",
Collections.emptyMap());
private static final Script COMBINE_SCRIPT_NOOP = new Script(ScriptType.INLINE, MockScriptEngine.NAME, "combineScriptNoop",
Collections.emptyMap());

private static final Script INIT_SCRIPT_PARAMS = new Script(ScriptType.INLINE, MockScriptEngine.NAME, "initScriptParams",
Collections.singletonMap("initialValue", 24));
Expand Down Expand Up @@ -96,6 +100,14 @@ public static void initMockScripts() {
Map<String, Object> state = (Map<String, Object>) params.get("state");
return ((List<Integer>) state.get("collector")).stream().mapToInt(Integer::intValue).sum();
});
SCRIPTS.put("combineScriptNoop", params -> {
Map<String, Object> state = (Map<String, Object>) params.get("state");
return state;
});
SCRIPTS.put("reduceScript", params -> {
Map<String, Object> state = (Map<String, Object>) params.get("state");
return state;
});

SCRIPTS.put("initScriptScore", params -> {
Map<String, Object> state = (Map<String, Object>) params.get("state");
Expand Down Expand Up @@ -160,7 +172,7 @@ public void testNoDocs() throws IOException {
}
try (IndexReader indexReader = DirectoryReader.open(directory)) {
ScriptedMetricAggregationBuilder aggregationBuilder = new ScriptedMetricAggregationBuilder(AGG_NAME);
aggregationBuilder.mapScript(MAP_SCRIPT); // map script is mandatory, even if its not used in this case
aggregationBuilder.mapScript(MAP_SCRIPT).combineScript(COMBINE_SCRIPT_NOOP).reduceScript(REDUCE_SCRIPT);
ScriptedMetric scriptedMetric = search(newSearcher(indexReader, true, true), new MatchAllDocsQuery(), aggregationBuilder);
assertEquals(AGG_NAME, scriptedMetric.getName());
assertNotNull(scriptedMetric.aggregation());
Expand All @@ -169,9 +181,6 @@ public void testNoDocs() throws IOException {
}
}

/**
* without combine script, the "states" map should contain a list of the size of the number of documents matched
*/
public void testScriptedMetricWithoutCombine() throws IOException {
try (Directory directory = newDirectory()) {
int numDocs = randomInt(100);
Expand All @@ -182,15 +191,28 @@ public void testScriptedMetricWithoutCombine() throws IOException {
}
try (IndexReader indexReader = DirectoryReader.open(directory)) {
ScriptedMetricAggregationBuilder aggregationBuilder = new ScriptedMetricAggregationBuilder(AGG_NAME);
aggregationBuilder.initScript(INIT_SCRIPT).mapScript(MAP_SCRIPT);
ScriptedMetric scriptedMetric = search(newSearcher(indexReader, true, true), new MatchAllDocsQuery(), aggregationBuilder);
assertEquals(AGG_NAME, scriptedMetric.getName());
assertNotNull(scriptedMetric.aggregation());
@SuppressWarnings("unchecked")
Map<String, Object> agg = (Map<String, Object>) scriptedMetric.aggregation();
@SuppressWarnings("unchecked")
List<Integer> list = (List<Integer>) agg.get("collector");
assertEquals(numDocs, list.size());
aggregationBuilder.initScript(INIT_SCRIPT).mapScript(MAP_SCRIPT).reduceScript(REDUCE_SCRIPT);
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class,
() -> search(newSearcher(indexReader, true, true), new MatchAllDocsQuery(), aggregationBuilder));
assertEquals(exception.getMessage(), "[combineScript] must not be null: [scriptedMetric]");
}
}
}

public void testScriptedMetricWithoutReduce() throws IOException {
try (Directory directory = newDirectory()) {
int numDocs = randomInt(100);
try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) {
for (int i = 0; i < numDocs; i++) {
indexWriter.addDocument(singleton(new SortedNumericDocValuesField("number", i)));
}
}
try (IndexReader indexReader = DirectoryReader.open(directory)) {
ScriptedMetricAggregationBuilder aggregationBuilder = new ScriptedMetricAggregationBuilder(AGG_NAME);
aggregationBuilder.initScript(INIT_SCRIPT).mapScript(MAP_SCRIPT).combineScript(COMBINE_SCRIPT);
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class,
() -> search(newSearcher(indexReader, true, true), new MatchAllDocsQuery(), aggregationBuilder));
assertEquals(exception.getMessage(), "[reduceScript] must not be null: [scriptedMetric]");
}
}
}
Expand All @@ -208,7 +230,8 @@ public void testScriptedMetricWithCombine() throws IOException {
}
try (IndexReader indexReader = DirectoryReader.open(directory)) {
ScriptedMetricAggregationBuilder aggregationBuilder = new ScriptedMetricAggregationBuilder(AGG_NAME);
aggregationBuilder.initScript(INIT_SCRIPT).mapScript(MAP_SCRIPT).combineScript(COMBINE_SCRIPT);
aggregationBuilder.initScript(INIT_SCRIPT).mapScript(MAP_SCRIPT)
.combineScript(COMBINE_SCRIPT).reduceScript(REDUCE_SCRIPT);
ScriptedMetric scriptedMetric = search(newSearcher(indexReader, true, true), new MatchAllDocsQuery(), aggregationBuilder);
assertEquals(AGG_NAME, scriptedMetric.getName());
assertNotNull(scriptedMetric.aggregation());
Expand All @@ -230,7 +253,8 @@ public void testScriptedMetricWithCombineAccessesScores() throws IOException {
}
try (IndexReader indexReader = DirectoryReader.open(directory)) {
ScriptedMetricAggregationBuilder aggregationBuilder = new ScriptedMetricAggregationBuilder(AGG_NAME);
aggregationBuilder.initScript(INIT_SCRIPT_SCORE).mapScript(MAP_SCRIPT_SCORE).combineScript(COMBINE_SCRIPT_SCORE);
aggregationBuilder.initScript(INIT_SCRIPT_SCORE).mapScript(MAP_SCRIPT_SCORE)
.combineScript(COMBINE_SCRIPT_SCORE).reduceScript(REDUCE_SCRIPT);
ScriptedMetric scriptedMetric = search(newSearcher(indexReader, true, true), new MatchAllDocsQuery(), aggregationBuilder);
assertEquals(AGG_NAME, scriptedMetric.getName());
assertNotNull(scriptedMetric.aggregation());
Expand All @@ -250,7 +274,8 @@ public void testScriptParamsPassedThrough() throws IOException {

try (IndexReader indexReader = DirectoryReader.open(directory)) {
ScriptedMetricAggregationBuilder aggregationBuilder = new ScriptedMetricAggregationBuilder(AGG_NAME);
aggregationBuilder.initScript(INIT_SCRIPT_PARAMS).mapScript(MAP_SCRIPT_PARAMS).combineScript(COMBINE_SCRIPT_PARAMS);
aggregationBuilder.initScript(INIT_SCRIPT_PARAMS).mapScript(MAP_SCRIPT_PARAMS)
.combineScript(COMBINE_SCRIPT_PARAMS).reduceScript(REDUCE_SCRIPT);
ScriptedMetric scriptedMetric = search(newSearcher(indexReader, true, true), new MatchAllDocsQuery(), aggregationBuilder);

// The result value depends on the script params.
Expand All @@ -270,8 +295,8 @@ public void testConflictingAggAndScriptParams() throws IOException {
try (IndexReader indexReader = DirectoryReader.open(directory)) {
ScriptedMetricAggregationBuilder aggregationBuilder = new ScriptedMetricAggregationBuilder(AGG_NAME);
Map<String, Object> aggParams = Collections.singletonMap(CONFLICTING_PARAM_NAME, "blah");
aggregationBuilder.params(aggParams).initScript(INIT_SCRIPT_PARAMS).mapScript(MAP_SCRIPT_PARAMS).
combineScript(COMBINE_SCRIPT_PARAMS);
aggregationBuilder.params(aggParams).initScript(INIT_SCRIPT_PARAMS).mapScript(MAP_SCRIPT_PARAMS)
.combineScript(COMBINE_SCRIPT_PARAMS).reduceScript(REDUCE_SCRIPT);

IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () ->
search(newSearcher(indexReader, true, true), new MatchAllDocsQuery(), aggregationBuilder)
Expand All @@ -289,7 +314,8 @@ public void testSelfReferencingAggStateAfterInit() throws IOException {
}
try (IndexReader indexReader = DirectoryReader.open(directory)) {
ScriptedMetricAggregationBuilder aggregationBuilder = new ScriptedMetricAggregationBuilder(AGG_NAME);
aggregationBuilder.initScript(INIT_SCRIPT_SELF_REF).mapScript(MAP_SCRIPT);
aggregationBuilder.initScript(INIT_SCRIPT_SELF_REF).mapScript(MAP_SCRIPT)
.combineScript(COMBINE_SCRIPT_PARAMS).reduceScript(REDUCE_SCRIPT);

IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () ->
search(newSearcher(indexReader, true, true), new MatchAllDocsQuery(), aggregationBuilder)
Expand All @@ -309,7 +335,8 @@ public void testSelfReferencingAggStateAfterMap() throws IOException {
}
try (IndexReader indexReader = DirectoryReader.open(directory)) {
ScriptedMetricAggregationBuilder aggregationBuilder = new ScriptedMetricAggregationBuilder(AGG_NAME);
aggregationBuilder.initScript(INIT_SCRIPT).mapScript(MAP_SCRIPT_SELF_REF);
aggregationBuilder.initScript(INIT_SCRIPT).mapScript(MAP_SCRIPT_SELF_REF)
.combineScript(COMBINE_SCRIPT_PARAMS).reduceScript(REDUCE_SCRIPT);

IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () ->
search(newSearcher(indexReader, true, true), new MatchAllDocsQuery(), aggregationBuilder)
Expand All @@ -326,7 +353,8 @@ public void testSelfReferencingAggStateAfterCombine() throws IOException {
}
try (IndexReader indexReader = DirectoryReader.open(directory)) {
ScriptedMetricAggregationBuilder aggregationBuilder = new ScriptedMetricAggregationBuilder(AGG_NAME);
aggregationBuilder.initScript(INIT_SCRIPT).mapScript(MAP_SCRIPT).combineScript(COMBINE_SCRIPT_SELF_REF);
aggregationBuilder.initScript(INIT_SCRIPT).mapScript(MAP_SCRIPT)
.combineScript(COMBINE_SCRIPT_SELF_REF).reduceScript(REDUCE_SCRIPT);

IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () ->
search(newSearcher(indexReader, true, true), new MatchAllDocsQuery(), aggregationBuilder)
Expand Down