Skip to content

Commit

Permalink
SQL: fix multi full-text functions usage with aggregate functions (#4…
Browse files Browse the repository at this point in the history
…7444)

* Skip functions involving full-text predicates when replacing multiple
aggregate functions with "stats" or "matrix_stats" aggregations.

(cherry picked from commit bb14ba8)
  • Loading branch information
astefan committed Oct 4, 2019
1 parent abeca45 commit 111e4e1
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 3 deletions.
61 changes: 61 additions & 0 deletions x-pack/plugin/sql/qa/src/main/resources/fulltext.csv-spec
Original file line number Diff line number Diff line change
Expand Up @@ -139,3 +139,64 @@ SELECT emp_no, first_name, SCORE() as s FROM test_emp WHERE MATCH(first_name, 'E
emp_no:i | first_name:s | s:f
10076 |Erez |4.1053944
;

//
// Mixture of Aggs that triggers promotion of aggs to stats using multi full-text filtering
//
multiAggWithCountMatchAndQuery
SELECT MIN(salary) min, MAX(salary) max, gender g, COUNT(*) c FROM "test_emp" WHERE languages > 0 AND (MATCH(gender, 'F') OR MATCH(gender, 'M')) AND QUERY('M*', 'default_field=last_name;lenient=true', 'fuzzy_rewrite=scoring_boolean') GROUP BY g HAVING max > 50000 ORDER BY gender;

min:i | max:i | g:s | c:l
---------------+---------------+---------------+---------------
37112 |69904 |F |3
32568 |70011 |M |8
;

multiAggWithCountAndMultiMatch
SELECT MIN(salary) min, MAX(salary) max, gender g, COUNT(*) c FROM "test_emp" WHERE MATCH(gender, 'F') OR MATCH(gender, 'M') GROUP BY g HAVING max > 50000 ORDER BY gender;

min:i | max:i | g:s | c:l
---------------+---------------+---------------+---------------
25976 |74572 |F |33
25945 |74999 |M |57
;

multiAggWithMultiMatchOrderByCount
SELECT MIN(salary) min, MAX(salary) max, ROUND(AVG(salary)) avg, gender g, COUNT(*) c FROM "test_emp" WHERE MATCH(gender, 'F') OR MATCH('first_name^3,last_name^5', 'geo hir', 'fuzziness=2;operator=or') GROUP BY g ORDER BY c DESC;

min:i | max:i | avg:d | g:s | c:l
---------------+---------------+---------------+---------------+---------------
25976 |74572 |50491 |F |33
32568 |32568 |32568 |M |1
;

multiAggWithMultiMatchOrderByCountAndSimpleCondition
SELECT MIN(salary) min, MAX(salary) max, ROUND(AVG(salary)) avg, gender g, COUNT(*) c FROM "test_emp" WHERE (MATCH(gender, 'F') AND languages > 4) OR MATCH('first_name^3,last_name^5', 'geo hir', 'fuzziness=2;operator=or') GROUP BY g ORDER BY c DESC;

min:i | max:i | avg:d | g:s | c:l
---------------+---------------+---------------+---------------+---------------
32272 |66817 |48081 |F |11
32568 |32568 |32568 |M |1
;

multiAggWithPercentileAndMultiQuery
SELECT languages, PERCENTILE(salary, 95) "95th", ROUND(PERCENTILE_RANK(salary, 65000)) AS rank, MAX(salary), MIN(salary), COUNT(*) c FROM test_emp WHERE QUERY('A*','default_field=first_name') OR QUERY('B*', 'default_field=first_name') OR languages IS NULL GROUP BY languages;

languages:bt | 95th:d | rank:d | MAX(salary):i | MIN(salary):i | c:l
---------------+---------------+---------------+---------------+---------------+---------------
null |74999 |74 |74999 |28336 |10
2 |44307 |100 |44307 |29175 |3
3 |65030 |100 |65030 |38376 |4
5 |66817 |100 |66817 |37137 |4
;

multiAggWithStatsAndMatrixStatsAndMultiQuery
SELECT languages, KURTOSIS(salary) k, SKEWNESS(salary) s, MAX(salary), MIN(salary), COUNT(*) c FROM test_emp WHERE QUERY('A*','default_field=first_name') OR QUERY('B*', 'default_field=first_name') OR languages IS NULL GROUP BY languages;

languages:bt | k:d | s:d | MAX(salary):i | MIN(salary):i | c:l
---------------+------------------+-------------------+---------------+---------------+---------------
null |1.9161749939033146|0.1480828817161133 |74999 |28336 |10
2 |1.5000000000000002|0.484743245141609 |44307 |29175 |3
3 |1.0732551278666582|0.05483979801873433|65030 |38376 |4
5 |1.322529094661261 |0.24501477738153868|66817 |37137 |4
;
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.elasticsearch.xpack.sql.expression.predicate.conditional.Case;
import org.elasticsearch.xpack.sql.expression.predicate.conditional.Coalesce;
import org.elasticsearch.xpack.sql.expression.predicate.conditional.IfConditional;
import org.elasticsearch.xpack.sql.expression.predicate.fulltext.FullTextPredicate;
import org.elasticsearch.xpack.sql.expression.predicate.logical.And;
import org.elasticsearch.xpack.sql.expression.predicate.logical.Not;
import org.elasticsearch.xpack.sql.expression.predicate.logical.Or;
Expand Down Expand Up @@ -451,11 +452,11 @@ static LogicalPlan updateAggAttributes(LogicalPlan p, Map<String, AggregateFunct
}
}

else if (e instanceof ScalarFunction) {
else if (e instanceof ScalarFunction && false == Expressions.anyMatch(e.children(), c -> c instanceof FullTextPredicate)) {
ScalarFunction sf = (ScalarFunction) e;

// if it's a unseen function check if the function children/arguments refers to any of the promoted aggs
if (!updatedScalarAttrs.containsKey(sf.functionId()) && e.anyMatch(c -> {
if (newAggIds.isEmpty() == false && !updatedScalarAttrs.containsKey(sf.functionId()) && e.anyMatch(c -> {
Attribute a = Expressions.attribute(c);
if (a instanceof FunctionAttribute) {
return newAggIds.contains(((FunctionAttribute) a).functionId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,19 @@
import org.elasticsearch.xpack.sql.expression.Order.OrderDirection;
import org.elasticsearch.xpack.sql.expression.function.Function;
import org.elasticsearch.xpack.sql.expression.function.aggregate.AggregateFunction;
import org.elasticsearch.xpack.sql.expression.function.aggregate.Avg;
import org.elasticsearch.xpack.sql.expression.function.aggregate.Count;
import org.elasticsearch.xpack.sql.expression.function.aggregate.ExtendedStats;
import org.elasticsearch.xpack.sql.expression.function.aggregate.First;
import org.elasticsearch.xpack.sql.expression.function.aggregate.InnerAggregate;
import org.elasticsearch.xpack.sql.expression.function.aggregate.Last;
import org.elasticsearch.xpack.sql.expression.function.aggregate.Max;
import org.elasticsearch.xpack.sql.expression.function.aggregate.Min;
import org.elasticsearch.xpack.sql.expression.function.aggregate.Stats;
import org.elasticsearch.xpack.sql.expression.function.aggregate.StddevPop;
import org.elasticsearch.xpack.sql.expression.function.aggregate.Sum;
import org.elasticsearch.xpack.sql.expression.function.aggregate.SumOfSquares;
import org.elasticsearch.xpack.sql.expression.function.aggregate.VarPop;
import org.elasticsearch.xpack.sql.expression.function.scalar.Cast;
import org.elasticsearch.xpack.sql.expression.function.scalar.datetime.DayName;
import org.elasticsearch.xpack.sql.expression.function.scalar.datetime.DayOfMonth;
Expand Down Expand Up @@ -55,7 +63,12 @@
import org.elasticsearch.xpack.sql.expression.predicate.conditional.Iif;
import org.elasticsearch.xpack.sql.expression.predicate.conditional.Least;
import org.elasticsearch.xpack.sql.expression.predicate.conditional.NullIf;
import org.elasticsearch.xpack.sql.expression.predicate.fulltext.FullTextPredicate;
import org.elasticsearch.xpack.sql.expression.predicate.fulltext.MatchQueryPredicate;
import org.elasticsearch.xpack.sql.expression.predicate.fulltext.MultiMatchQueryPredicate;
import org.elasticsearch.xpack.sql.expression.predicate.fulltext.StringQueryPredicate;
import org.elasticsearch.xpack.sql.expression.predicate.logical.And;
import org.elasticsearch.xpack.sql.expression.predicate.logical.BinaryLogic;
import org.elasticsearch.xpack.sql.expression.predicate.logical.Not;
import org.elasticsearch.xpack.sql.expression.predicate.logical.Or;
import org.elasticsearch.xpack.sql.expression.predicate.nulls.IsNotNull;
Expand Down Expand Up @@ -85,6 +98,8 @@
import org.elasticsearch.xpack.sql.optimizer.Optimizer.FoldNull;
import org.elasticsearch.xpack.sql.optimizer.Optimizer.PropagateEquals;
import org.elasticsearch.xpack.sql.optimizer.Optimizer.PruneDuplicateFunctions;
import org.elasticsearch.xpack.sql.optimizer.Optimizer.ReplaceAggsWithExtendedStats;
import org.elasticsearch.xpack.sql.optimizer.Optimizer.ReplaceAggsWithStats;
import org.elasticsearch.xpack.sql.optimizer.Optimizer.ReplaceFoldableAttributes;
import org.elasticsearch.xpack.sql.optimizer.Optimizer.ReplaceMinMaxWithTopHits;
import org.elasticsearch.xpack.sql.optimizer.Optimizer.SimplifyCase;
Expand Down Expand Up @@ -1498,4 +1513,70 @@ public void testSortAggregateOnOrderByOnlyAliases() {
assertEquals(firstAlias, groupings.get(0));
assertEquals(secondAlias, groupings.get(1));
}
}

/**
* Test queries like SELECT MIN(agg_field), MAX(agg_field) FROM table WHERE MATCH(match_field,'A') AND/OR QUERY('match_field:A')
* or SELECT STDDEV_POP(agg_field), VAR_POP(agg_field) FROM table WHERE MATCH(match_field,'A') AND/OR QUERY('match_field:A')
*/
public void testAggregatesPromoteToStats_WithFullTextPredicatesConditions() {
FieldAttribute matchField = new FieldAttribute(EMPTY, "match_field", new EsField("match_field", DataType.TEXT, emptyMap(), true));
FieldAttribute aggField = new FieldAttribute(EMPTY, "agg_field", new EsField("agg_field", DataType.INTEGER, emptyMap(), true));

FullTextPredicate matchPredicate = new MatchQueryPredicate(EMPTY, matchField, "A", StringUtils.EMPTY);
FullTextPredicate multiMatchPredicate = new MultiMatchQueryPredicate(EMPTY, "match_field", "A", StringUtils.EMPTY);
FullTextPredicate stringQueryPredicate = new StringQueryPredicate(EMPTY, "match_field:A", StringUtils.EMPTY);
List<FullTextPredicate> predicates = Arrays.asList(matchPredicate, multiMatchPredicate, stringQueryPredicate);

FullTextPredicate left = randomFrom(predicates);
FullTextPredicate right = randomFrom(predicates);

BinaryLogic or = new Or(EMPTY, left, right);
BinaryLogic and = new And(EMPTY, left, right);
BinaryLogic condition = randomFrom(or, and);
Filter filter = new Filter(EMPTY, FROM(), condition);

List<AggregateFunction> aggregates;
boolean isSimpleStats = randomBoolean();
if (isSimpleStats) {
aggregates = Arrays.asList(new Avg(EMPTY, aggField), new Sum(EMPTY, aggField), new Min(EMPTY, aggField),
new Max(EMPTY, aggField));
} else {
aggregates = Arrays.asList(new StddevPop(EMPTY, aggField), new SumOfSquares(EMPTY, aggField), new VarPop(EMPTY, aggField));
}
AggregateFunction firstAggregate = randomFrom(aggregates);
AggregateFunction secondAggregate = randomValueOtherThan(firstAggregate, () -> randomFrom(aggregates));
Aggregate aggregatePlan = new Aggregate(EMPTY, filter, Collections.singletonList(matchField),
Arrays.asList(firstAggregate, secondAggregate));
LogicalPlan result;
if (isSimpleStats) {
result = new ReplaceAggsWithStats().apply(aggregatePlan);
} else {
result = new ReplaceAggsWithExtendedStats().apply(aggregatePlan);
}

assertTrue(result instanceof Aggregate);
Aggregate resultAgg = (Aggregate) result;
assertEquals(2, resultAgg.aggregates().size());
assertTrue(resultAgg.aggregates().get(0) instanceof InnerAggregate);
assertTrue(resultAgg.aggregates().get(1) instanceof InnerAggregate);

InnerAggregate resultFirstAgg = (InnerAggregate) resultAgg.aggregates().get(0);
InnerAggregate resultSecondAgg = (InnerAggregate) resultAgg.aggregates().get(1);
assertEquals(resultFirstAgg.inner(), firstAggregate);
assertEquals(resultSecondAgg.inner(), secondAggregate);
if (isSimpleStats) {
assertTrue(resultFirstAgg.outer() instanceof Stats);
assertTrue(resultSecondAgg.outer() instanceof Stats);
assertEquals(((Stats) resultFirstAgg.outer()).field(), aggField);
assertEquals(((Stats) resultSecondAgg.outer()).field(), aggField);
} else {
assertTrue(resultFirstAgg.outer() instanceof ExtendedStats);
assertTrue(resultSecondAgg.outer() instanceof ExtendedStats);
assertEquals(((ExtendedStats) resultFirstAgg.outer()).field(), aggField);
assertEquals(((ExtendedStats) resultSecondAgg.outer()).field(), aggField);
}

assertTrue(resultAgg.child() instanceof Filter);
assertEquals(resultAgg.child(), filter);
}
}

0 comments on commit 111e4e1

Please sign in to comment.