Support InSubquery in PPL #714

Merged: 3 commits, Oct 1, 2024
91 changes: 91 additions & 0 deletions ppl-spark-integration/README.md
@@ -434,6 +434,97 @@ _- **Limitation: "REPLACE" or "APPEND" clause must contain "AS"**_

For details of the Lookup command syntax, see [PPL-Lookup-Command](../docs/PPL-Lookup-command.md)

**InSubquery**
- `source = outer | where a in [ source = inner | fields b ]`
- `source = outer | where (a) in [ source = inner | fields b ]`
- `source = outer | where (a,b,c) in [ source = inner | fields d,e,f ]`
- `source = outer | where a not in [ source = inner | fields b ]`
- `source = outer | where (a) not in [ source = inner | fields b ]`
- `source = outer | where (a,b,c) not in [ source = inner | fields d,e,f ]`
- `source = outer | where a in [ source = inner1 | where b not in [ source = inner2 | fields c ] | fields b ]` (nested)
- `source = table1 | inner join left = l right = r on l.a = r.a AND r.a in [ source = inner | fields d ] | fields l.a, r.a, b, c` (as join filter)

SQL Migration examples with IN-Subquery PPL:
1. tpch q4 (in-subquery with aggregation)
```sql
select
    o_orderpriority,
    count(*) as order_count
from
    orders
where
    o_orderdate >= date '1993-07-01'
    and o_orderdate < date '1993-07-01' + interval '3' month
    and o_orderkey in (
        select
            l_orderkey
        from
            lineitem
        where l_commitdate < l_receiptdate
    )
group by
    o_orderpriority
order by
    o_orderpriority
```
Rewritten as a PPL InSubquery query:
```sql
source = orders
| where o_orderdate >= "1993-07-01" and o_orderdate < "1993-10-01" and o_orderkey IN
  [ source = lineitem
    | where l_commitdate < l_receiptdate
    | fields l_orderkey
  ]
| stats count(1) as order_count by o_orderpriority
| sort o_orderpriority
| fields o_orderpriority, order_count
```
2. tpch q20 (nested in-subquery)
```sql
select
    s_name,
    s_address
from
    supplier,
    nation
where
    s_suppkey in (
        select
            ps_suppkey
        from
            partsupp
        where
            ps_partkey in (
                select
                    p_partkey
                from
                    part
                where
                    p_name like 'forest%'
            )
    )
    and s_nationkey = n_nationkey
    and n_name = 'CANADA'
order by
    s_name
```
Rewritten as a PPL InSubquery query:
```sql
source = supplier
| where s_suppkey IN [
    source = partsupp
    | where ps_partkey IN [
        source = part
        | where like(p_name, "forest%")
        | fields p_partkey
      ]
    | fields ps_suppkey
  ]
| inner join left=l right=r on s_nationkey = n_nationkey and n_name = 'CANADA'
  nation
| sort s_name
```

---
#### Experimental Commands:
- `correlation` - [See details](../docs/PPL-Correlation-command.md)
19 changes: 19 additions & 0 deletions ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4
@@ -24,6 +24,10 @@ queryStatement
: pplCommands (PIPE commands)*
;

subSearch
: searchCommand (PIPE commands)*
;

// commands
pplCommands
: searchCommand
@@ -339,6 +343,12 @@ logicalExpression
comparisonExpression
: left = valueExpression comparisonOperator right = valueExpression # compareExpr
| valueExpression IN valueList # inExpr
| valueExpressionList NOT? IN LT_SQR_PRTHS subSearch RT_SQR_PRTHS # inSubqueryExpr
;

valueExpressionList
: valueExpression
| LT_PRTHS valueExpression (COMMA valueExpression)* RT_PRTHS
;

valueExpression
@@ -1004,4 +1014,13 @@ keywordsCanBeId
| SPARKLINE
| C
| DC
// JOIN TYPE
| OUTER
| INNER
| CROSS
| LEFT
| RIGHT
| FULL
| SEMI
| ANTI
;
@@ -19,6 +19,7 @@
import org.opensearch.sql.ast.expression.FieldsMapping;
import org.opensearch.sql.ast.expression.Function;
import org.opensearch.sql.ast.expression.In;
import org.opensearch.sql.ast.expression.InSubquery;
import org.opensearch.sql.ast.expression.Interval;
import org.opensearch.sql.ast.expression.IsEmpty;
import org.opensearch.sql.ast.expression.Let;
@@ -289,4 +290,7 @@ public T visitExplain(Explain node, C context) {
return visitStatement(node, context);
}

public T visitInSubquery(InSubquery node, C context) {
return visitChildren(node, context);
}
}
@@ -0,0 +1,35 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package org.opensearch.sql.ast.expression;

import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
import org.opensearch.sql.ast.AbstractNodeVisitor;
import org.opensearch.sql.ast.tree.UnresolvedPlan;

import java.util.Arrays;
import java.util.List;

@Getter
@ToString
@EqualsAndHashCode(callSuper = false)
@RequiredArgsConstructor
public class InSubquery extends UnresolvedExpression {
    private final List<UnresolvedExpression> value;
    private final UnresolvedPlan query;

    @Override
    public List<UnresolvedExpression> getChild() {
        return value;
    }

    @Override
    public <R, C> R accept(AbstractNodeVisitor<R, C> nodeVisitor, C context) {
        return nodeVisitor.visitInSubquery(this, context);
    }
}
@@ -13,6 +13,8 @@
import org.apache.spark.sql.catalyst.expressions.CaseWhen;
import org.apache.spark.sql.catalyst.expressions.Descending$;
import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.catalyst.expressions.InSubquery$;
import org.apache.spark.sql.catalyst.expressions.ListQuery$;
import org.apache.spark.sql.catalyst.expressions.NamedExpression;
import org.apache.spark.sql.catalyst.expressions.Predicate;
import org.apache.spark.sql.catalyst.expressions.SortDirection;
@@ -41,6 +43,7 @@
import org.opensearch.sql.ast.expression.FieldsMapping;
import org.opensearch.sql.ast.expression.Function;
import org.opensearch.sql.ast.expression.In;
import org.opensearch.sql.ast.expression.InSubquery;
import org.opensearch.sql.ast.expression.Interval;
import org.opensearch.sql.ast.expression.IsEmpty;
import org.opensearch.sql.ast.expression.Let;
@@ -75,6 +78,7 @@
import org.opensearch.sql.ast.tree.Sort;
import org.opensearch.sql.ast.tree.SubqueryAlias;
import org.opensearch.sql.ast.tree.TopAggregation;
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.common.antlr.SyntaxCheckException;
import org.opensearch.sql.ppl.utils.AggregatorTranslator;
import org.opensearch.sql.ppl.utils.BuiltinFunctionTranslator;
@@ -124,6 +128,10 @@ public LogicalPlan visit(Statement plan, CatalystPlanContext context) {
return plan.accept(this, context);
}

public LogicalPlan visitSubSearch(UnresolvedPlan plan, CatalystPlanContext context) {
return plan.accept(this, context);
}

/**
* Handle Query Statement.
*/
@@ -487,7 +495,7 @@ public LogicalPlan visitDedupe(Dedupe node, CatalystPlanContext context) {
/**
* Expression Analyzer.
*/
- public static class ExpressionAnalyzer extends AbstractNodeVisitor<Expression, CatalystPlanContext> {
+ public class ExpressionAnalyzer extends AbstractNodeVisitor<Expression, CatalystPlanContext> {

public Expression analyze(UnresolvedExpression unresolved, CatalystPlanContext context) {
return unresolved.accept(this, context);
@@ -734,5 +742,24 @@ public Expression visitRareTopN(RareTopN node, CatalystPlanContext context) {
public Expression visitWindowFunction(WindowFunction node, CatalystPlanContext context) {
throw new IllegalStateException("Not Supported operation : WindowFunction");
}

@Override
public Expression visitInSubquery(InSubquery node, CatalystPlanContext outerContext) {
    // A separate context is used for the left-hand values and the subsearch plan.
    CatalystPlanContext innerContext = new CatalystPlanContext();
    // Resolve the left-hand side value expressions, e.g. (a, b, c).
    visitExpressionList(node.getChild(), innerContext);
    Seq<Expression> values = innerContext.retainAllNamedParseExpressions(p -> p);
    // Despite its name, outerPlan holds the unresolved plan of the subsearch.
    UnresolvedPlan outerPlan = node.getQuery();
    LogicalPlan subSearch = CatalystQueryPlanVisitor.this.visitSubSearch(outerPlan, innerContext);
    Expression inSubQuery = InSubquery$.MODULE$.apply(
        values,
        ListQuery$.MODULE$.apply(
            subSearch,
            seq(new java.util.ArrayList<Expression>()),
            NamedExpression.newExprId(),
            -1,
            seq(new java.util.ArrayList<Expression>()),
            Option.empty()));
    // Hand the resulting expression back to the outer query's context.
    return outerContext.getNamedParseExpressions().push(inSubQuery);
}
}
}
@@ -85,6 +85,12 @@ public UnresolvedPlan visitQueryStatement(OpenSearchPPLParser.QueryStatementCont
return ctx.commands().stream().map(this::visit).reduce(pplCommand, (r, e) -> e.attach(r));
}

@Override
public UnresolvedPlan visitSubSearch(OpenSearchPPLParser.SubSearchContext ctx) {
UnresolvedPlan searchCommand = visit(ctx.searchCommand());
return ctx.commands().stream().map(this::visit).reduce(searchCommand, (r, e) -> e.attach(r));
}

/** Search command. */
@Override
public UnresolvedPlan visitSearchFrom(OpenSearchPPLParser.SearchFromContext ctx) {
@@ -22,6 +22,7 @@
import org.opensearch.sql.ast.expression.EqualTo;
import org.opensearch.sql.ast.expression.Field;
import org.opensearch.sql.ast.expression.Function;
import org.opensearch.sql.ast.expression.InSubquery;
import org.opensearch.sql.ast.expression.Interval;
import org.opensearch.sql.ast.expression.IntervalUnit;
import org.opensearch.sql.ast.expression.IsEmpty;
@@ -62,6 +63,13 @@ public class AstExpressionBuilder extends OpenSearchPPLParserBaseVisitor<Unresol

private static final int DEFAULT_TAKE_FUNCTION_SIZE_VALUE = 10;

private AstBuilder astBuilder;

/** Set AstBuilder back to AstExpressionBuilder for resolving the subquery plan in subquery expression */
public void setAstBuilder(AstBuilder astBuilder) {
this.astBuilder = astBuilder;
}

/**
* The function name mapping between frontend and core engine.
*/
@@ -370,6 +378,15 @@ public UnresolvedExpression visitRightHint(OpenSearchPPLParser.RightHintContext
return new EqualTo(new Literal(ctx.rightHintKey.getText(), DataType.STRING), visit(ctx.rightHintValue));
}

@Override
public UnresolvedExpression visitInSubqueryExpr(OpenSearchPPLParser.InSubqueryExprContext ctx) {
UnresolvedExpression expr = new InSubquery(
ctx.valueExpressionList().valueExpression().stream()
.map(this::visit).collect(Collectors.toList()),
astBuilder.visitSubSearch(ctx.subSearch()));
return ctx.NOT() != null ? new Not(expr) : expr;
}
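
The AST shape produced by `visitInSubqueryExpr` can be sketched with simplified stand-in classes. The left-hand side is always collected into a list, whether it was written as `a` or `(a,b)`, and `NOT IN` becomes a `Not` node wrapping the `InSubquery` node. The classes below are hypothetical and are not the real `org.opensearch.sql.ast` types; the subsearch plan is reduced to a plain string.

```java
import java.util.Arrays;
import java.util.List;

// Simplified stand-ins for the AST nodes; these are not the real org.opensearch.sql.ast classes.
final class AstShapeSketch {
    interface Expr {}

    static final class Field implements Expr {
        final String name;
        Field(String name) { this.name = name; }
        @Override public String toString() { return name; }
    }

    static final class InSubquery implements Expr {
        final List<Expr> value; // left-hand side, always a list
        final String query;     // placeholder for the subsearch plan
        InSubquery(List<Expr> value, String query) { this.value = value; this.query = query; }
        @Override public String toString() { return "InSubquery(" + value + ", [" + query + "])"; }
    }

    static final class Not implements Expr {
        final Expr child;
        Not(Expr child) { this.child = child; }
        @Override public String toString() { return "Not(" + child + ")"; }
    }

    /** Mirrors the branch in visitInSubqueryExpr: wrap in Not only when NOT is present. */
    static Expr build(List<Expr> lhs, boolean hasNot, String subsearch) {
        Expr expr = new InSubquery(lhs, subsearch);
        return hasNot ? new Not(expr) : expr;
    }

    public static void main(String[] args) {
        List<Expr> lhs = Arrays.asList(new Field("a"), new Field("b"));
        System.out.println(build(lhs, false, "source = inner | fields d, e"));
        System.out.println(build(lhs, true, "source = inner | fields d, e"));
    }
}
```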

private QualifiedName visitIdentifiers(List<? extends ParserRuleContext> ctx) {
return new QualifiedName(
ctx.stream()
@@ -29,9 +29,11 @@ class PPLSyntaxParser extends Parser {

object PlaneUtils {
def plan(parser: PPLSyntaxParser, query: String): Statement = {
- val builder = new AstStatementBuilder(
-   new AstBuilder(new AstExpressionBuilder(), query),
-   AstStatementBuilder.StatementBuilderContext.builder())
+ val astExpressionBuilder = new AstExpressionBuilder()
+ val astBuilder = new AstBuilder(astExpressionBuilder, query)
+ astExpressionBuilder.setAstBuilder(astBuilder)
+ val builder =
+   new AstStatementBuilder(astBuilder, AstStatementBuilder.StatementBuilderContext.builder())
builder.visit(parser.parse(query))
}
}
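
The change to `PlaneUtils.plan` wires two builders that depend on each other: the plan builder takes the expression builder in its constructor, and the expression builder receives the plan builder afterwards through a setter so it can resolve subsearches. A minimal Java sketch of that two-phase wiring follows. `ExpressionBuilder`, `PlanBuilder`, and the string-based "plans" are hypothetical stand-ins, and the null check is an addition for illustration that the diff above does not contain.

```java
// Hypothetical stand-ins for AstExpressionBuilder and AstBuilder; only the wiring pattern is shown.
final class BuilderWiringSketch {

    static final class ExpressionBuilder {
        private PlanBuilder planBuilder; // set after construction, like setAstBuilder(...)

        void setPlanBuilder(PlanBuilder planBuilder) {
            this.planBuilder = planBuilder;
        }

        String visitInSubquery(String lhs, String subsearch) {
            // Illustrative guard (an assumption, not in the PR): fail clearly if wiring was skipped.
            if (planBuilder == null) {
                throw new IllegalStateException("plan builder not set; subqueries cannot be resolved");
            }
            return lhs + " IN " + planBuilder.visitSubSearch(subsearch);
        }
    }

    static final class PlanBuilder {
        private final ExpressionBuilder expressionBuilder; // constructor dependency

        PlanBuilder(ExpressionBuilder expressionBuilder) {
            this.expressionBuilder = expressionBuilder;
        }

        String visitSubSearch(String subsearch) {
            return "Plan(" + subsearch + ")";
        }

        String visitWhere(String lhs, String subsearch) {
            return "Filter(" + expressionBuilder.visitInSubquery(lhs, subsearch) + ")";
        }
    }

    /** Wires the two builders in the same order as PlaneUtils.plan. */
    static PlanBuilder wire() {
        ExpressionBuilder expressionBuilder = new ExpressionBuilder();
        PlanBuilder planBuilder = new PlanBuilder(expressionBuilder);
        expressionBuilder.setPlanBuilder(planBuilder); // close the cycle
        return planBuilder;
    }

    public static void main(String[] args) {
        // prints: Filter(a IN Plan(source = inner | fields b))
        System.out.println(wire().visitWhere("a", "source = inner | fields b"));
    }
}
```

Any other call site that constructs an `AstExpressionBuilder` directly would presumably need the same setter call before parsing a query that contains an in-subquery, since the `astBuilder` field has no other initializer in this diff.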