diff --git a/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java b/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java index 295db7680f..84e460e66d 100644 --- a/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java @@ -35,6 +35,9 @@ import org.opensearch.sql.ast.expression.When; import org.opensearch.sql.ast.expression.WindowFunction; import org.opensearch.sql.ast.expression.Xor; +import org.opensearch.sql.ast.statement.Explain; +import org.opensearch.sql.ast.statement.Query; +import org.opensearch.sql.ast.statement.Statement; import org.opensearch.sql.ast.tree.AD; import org.opensearch.sql.ast.tree.Aggregation; import org.opensearch.sql.ast.tree.Dedupe; @@ -269,4 +272,16 @@ public T visitAD(AD node, C context) { public T visitHighlightFunction(HighlightFunction node, C context) { return visitChildren(node, context); } + + public T visitStatement(Statement node, C context) { + return visit(node, context); + } + + public T visitQuery(Query node, C context) { + return visitStatement(node, context); + } + + public T visitExplain(Explain node, C context) { + return visitStatement(node, context); + } } diff --git a/core/src/main/java/org/opensearch/sql/ast/statement/Explain.java b/core/src/main/java/org/opensearch/sql/ast/statement/Explain.java new file mode 100644 index 0000000000..d0f2e3b372 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/ast/statement/Explain.java @@ -0,0 +1,26 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.sql.ast.statement; + +import lombok.Data; +import org.opensearch.sql.ast.AbstractNodeVisitor; + +/** + * Explain Statement. + */ +@Data +public class Explain extends Statement { + + private final Statement statement; + + @Override + public R accept(AbstractNodeVisitor visitor, C context) { + return visitor.visitExplain(this, context); + } +} diff --git a/core/src/main/java/org/opensearch/sql/ast/statement/Query.java b/core/src/main/java/org/opensearch/sql/ast/statement/Query.java new file mode 100644 index 0000000000..a7b547ed2a --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/ast/statement/Query.java @@ -0,0 +1,35 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.sql.ast.statement; + +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.Setter; +import lombok.ToString; +import org.opensearch.sql.ast.AbstractNodeVisitor; +import org.opensearch.sql.ast.tree.UnresolvedPlan; + +/** + * Query Statement. + */ +@Getter +@Setter +@ToString +@EqualsAndHashCode(callSuper = false) +@RequiredArgsConstructor +public class Query extends Statement { + + private final UnresolvedPlan plan; + + @Override + public R accept(AbstractNodeVisitor visitor, C context) { + return visitor.visitQuery(this, context); + } +} diff --git a/core/src/main/java/org/opensearch/sql/ast/statement/Statement.java b/core/src/main/java/org/opensearch/sql/ast/statement/Statement.java new file mode 100644 index 0000000000..e32a8dbfd8 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/ast/statement/Statement.java @@ -0,0 +1,22 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.sql.ast.statement; + +import org.opensearch.sql.ast.AbstractNodeVisitor; +import org.opensearch.sql.ast.Node; + +/** + * Statement is the high interface of core engine. + */ +public abstract class Statement extends Node { + @Override + public R accept(AbstractNodeVisitor visitor, C context) { + return visitor.visitStatement(this, context); + } +} diff --git a/core/src/main/java/org/opensearch/sql/executor/DefaultQueryManager.java b/core/src/main/java/org/opensearch/sql/executor/DefaultQueryManager.java new file mode 100644 index 0000000000..9ab3bd7486 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/executor/DefaultQueryManager.java @@ -0,0 +1,24 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.sql.executor; + +import org.opensearch.sql.executor.execution.AbstractPlan; + +/** + * Default QueryManager implementation which execute {@link AbstractPlan} on caller thread. + */ +public class DefaultQueryManager implements QueryManager { + + @Override + public QueryId submit(AbstractPlan queryExecution) { + queryExecution.execute(); + + return queryExecution.getQueryId(); + } +} diff --git a/core/src/main/java/org/opensearch/sql/executor/QueryId.java b/core/src/main/java/org/opensearch/sql/executor/QueryId.java new file mode 100644 index 0000000000..933cb5d82d --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/executor/QueryId.java @@ -0,0 +1,36 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.sql.executor; + +import lombok.Getter; +import org.apache.commons.lang3.RandomStringUtils; +import org.opensearch.sql.executor.execution.AbstractPlan; + +/** + * Query id of {@link AbstractPlan}. + */ +public class QueryId { + /** + * Query id. + */ + @Getter + private final String queryId; + + /** + * Generate {@link QueryId}. + * @return {@link QueryId}. + */ + public static QueryId queryId() { + return new QueryId(RandomStringUtils.random(10, true, true)); + } + + private QueryId(String queryId) { + this.queryId = queryId; + } +} diff --git a/core/src/main/java/org/opensearch/sql/executor/QueryManager.java b/core/src/main/java/org/opensearch/sql/executor/QueryManager.java new file mode 100644 index 0000000000..3a32e4c7e9 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/executor/QueryManager.java @@ -0,0 +1,25 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.sql.executor; + +import org.opensearch.sql.executor.execution.AbstractPlan; + +/** + * QueryManager is the high-level interface of core engine. + * Frontend submit {@link AbstractPlan} to QueryManager. + */ +public interface QueryManager { + + /** + * Submit {@link AbstractPlan}. + * @param queryPlan {@link AbstractPlan}. + * @return {@link QueryId}. + */ + QueryId submit(AbstractPlan queryPlan); +} diff --git a/core/src/main/java/org/opensearch/sql/executor/QueryService.java b/core/src/main/java/org/opensearch/sql/executor/QueryService.java new file mode 100644 index 0000000000..423d152e06 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/executor/QueryService.java @@ -0,0 +1,70 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.sql.executor; + +import lombok.RequiredArgsConstructor; +import org.opensearch.sql.analysis.AnalysisContext; +import org.opensearch.sql.analysis.Analyzer; +import org.opensearch.sql.ast.tree.UnresolvedPlan; +import org.opensearch.sql.common.response.ResponseListener; +import org.opensearch.sql.planner.Planner; +import org.opensearch.sql.planner.logical.LogicalPlan; +import org.opensearch.sql.planner.physical.PhysicalPlan; + +/** + * The low level interface of core engine. + */ +@RequiredArgsConstructor +public class QueryService { + + private final Analyzer analyzer; + + private final ExecutionEngine executionEngine; + + private final Planner planner; + + /** + * Execute the {@link UnresolvedPlan}, using {@link ResponseListener} to get response. + * + * @param plan {@link UnresolvedPlan} + * @param listener {@link ResponseListener} + */ + public void execute(UnresolvedPlan plan, + ResponseListener listener) { + try { + executionEngine.execute(plan(plan), listener); + } catch (Exception e) { + listener.onFailure(e); + } + } + + /** + * Explain the query in {@link UnresolvedPlan} using {@link ResponseListener} to + * get and format explain response. + * + * @param plan {@link UnresolvedPlan} + * @param listener {@link ResponseListener} for explain response + */ + public void explain(UnresolvedPlan plan, + ResponseListener listener) { + try { + executionEngine.explain(plan(plan), listener); + } catch (Exception e) { + listener.onFailure(e); + } + } + + private PhysicalPlan plan(UnresolvedPlan plan) { + // 1.Analyze abstract syntax to generate logical plan + LogicalPlan logicalPlan = analyzer.analyze(plan, new AnalysisContext()); + + // 2.Generate optimal physical plan from logical plan + return planner.plan(logicalPlan); + } +} diff --git a/core/src/main/java/org/opensearch/sql/executor/execution/AbstractPlan.java b/core/src/main/java/org/opensearch/sql/executor/execution/AbstractPlan.java new file mode 100644 index 0000000000..1654293c04 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/executor/execution/AbstractPlan.java @@ -0,0 +1,41 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.sql.executor.execution; + + +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import org.opensearch.sql.common.response.ResponseListener; +import org.opensearch.sql.executor.ExecutionEngine; +import org.opensearch.sql.executor.QueryId; + +/** + * AbstractPlan represent the execution entity of the Statement. + */ +@RequiredArgsConstructor +public abstract class AbstractPlan { + + /** + * Uniq query id. + */ + @Getter + private final QueryId queryId; + + /** + * Start query execution. + */ + public abstract void execute(); + + /** + * Explain query execution. + * + * @param listener query explain response listener. + */ + public abstract void explain(ResponseListener listener); +} diff --git a/core/src/main/java/org/opensearch/sql/executor/execution/ExplainPlan.java b/core/src/main/java/org/opensearch/sql/executor/execution/ExplainPlan.java new file mode 100644 index 0000000000..8c784f82ed --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/executor/execution/ExplainPlan.java @@ -0,0 +1,44 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.sql.executor.execution; + +import org.opensearch.sql.common.response.ResponseListener; +import org.opensearch.sql.executor.ExecutionEngine; +import org.opensearch.sql.executor.QueryId; + +/** + * Explain plan. + */ +public class ExplainPlan extends AbstractPlan { + + private final AbstractPlan plan; + + private final ResponseListener explainListener; + + /** + * Constructor. + */ + public ExplainPlan(QueryId queryId, + AbstractPlan plan, + ResponseListener explainListener) { + super(queryId); + this.plan = plan; + this.explainListener = explainListener; + } + + @Override + public void execute() { + plan.explain(explainListener); + } + + @Override + public void explain(ResponseListener listener) { + throw new UnsupportedOperationException("explain query can not been explained."); + } +} diff --git a/core/src/main/java/org/opensearch/sql/executor/execution/QueryPlan.java b/core/src/main/java/org/opensearch/sql/executor/execution/QueryPlan.java new file mode 100644 index 0000000000..02cb701a0b --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/executor/execution/QueryPlan.java @@ -0,0 +1,57 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.sql.executor.execution; + +import org.opensearch.sql.ast.tree.UnresolvedPlan; +import org.opensearch.sql.common.response.ResponseListener; +import org.opensearch.sql.executor.ExecutionEngine; +import org.opensearch.sql.executor.QueryId; +import org.opensearch.sql.executor.QueryService; + +/** + * Query plan. Which includes. + * + *

select query. + */ +public class QueryPlan extends AbstractPlan { + + /** + * The query plan ast. + */ + private final UnresolvedPlan plan; + + /** + * Query service. + */ + private final QueryService queryService; + + private final ResponseListener listener; + + /** constructor. */ + public QueryPlan( + QueryId queryId, + UnresolvedPlan plan, + QueryService queryService, + ResponseListener listener) { + super(queryId); + this.plan = plan; + this.queryService = queryService; + this.listener = listener; + } + + @Override + public void execute() { + queryService.execute(plan, listener); + } + + @Override + public void explain(ResponseListener listener) { + queryService.explain(plan, listener); + } +} diff --git a/core/src/main/java/org/opensearch/sql/executor/execution/QueryPlanFactory.java b/core/src/main/java/org/opensearch/sql/executor/execution/QueryPlanFactory.java new file mode 100644 index 0000000000..851381cc7a --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/executor/execution/QueryPlanFactory.java @@ -0,0 +1,100 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.sql.executor.execution; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import java.util.Optional; +import lombok.RequiredArgsConstructor; +import org.apache.commons.lang3.tuple.Pair; +import org.opensearch.sql.ast.AbstractNodeVisitor; +import org.opensearch.sql.ast.statement.Explain; +import org.opensearch.sql.ast.statement.Query; +import org.opensearch.sql.ast.statement.Statement; +import org.opensearch.sql.common.response.ResponseListener; +import org.opensearch.sql.executor.ExecutionEngine; +import org.opensearch.sql.executor.QueryId; +import org.opensearch.sql.executor.QueryService; + +/** + * QueryExecution Factory. + */ +@RequiredArgsConstructor +public class QueryPlanFactory + extends AbstractNodeVisitor< + AbstractPlan, + Pair< + Optional>, + Optional>>> { + + /** + * Query Service. + */ + private final QueryService queryService; + + /** + * NO_CONSUMER_RESPONSE_LISTENER should never been called. It is only used as constructor + * parameter of {@link QueryPlan}. + */ + @VisibleForTesting + protected static final ResponseListener + NO_CONSUMER_RESPONSE_LISTENER = + new ResponseListener<>() { + @Override + public void onResponse(ExecutionEngine.QueryResponse response) { + throw new IllegalStateException( + "[BUG] query response should not sent to unexpected channel"); + } + + @Override + public void onFailure(Exception e) { + throw new IllegalStateException( + "[BUG] exception response should not sent to unexpected channel"); + } + }; + + /** + * Create QueryExecution from Statement. + */ + public AbstractPlan create( + Statement statement, + Optional> queryListener, + Optional> explainListener) { + return statement.accept(this, Pair.of(queryListener, explainListener)); + } + + @Override + public AbstractPlan visitQuery( + Query node, + Pair< + Optional>, + Optional>> + context) { + Preconditions.checkArgument( + context.getLeft().isPresent(), "[BUG] query listener must be not null"); + + return new QueryPlan(QueryId.queryId(), node.getPlan(), queryService, context.getLeft().get()); + } + + @Override + public AbstractPlan visitExplain( + Explain node, + Pair< + Optional>, + Optional>> + context) { + Preconditions.checkArgument( + context.getRight().isPresent(), "[BUG] explain listener must be not null"); + + return new ExplainPlan( + QueryId.queryId(), + create(node.getStatement(), Optional.of(NO_CONSUMER_RESPONSE_LISTENER), Optional.empty()), + context.getRight().get()); + } +} diff --git a/core/src/test/java/org/opensearch/sql/executor/DefaultQueryManagerTest.java b/core/src/test/java/org/opensearch/sql/executor/DefaultQueryManagerTest.java new file mode 100644 index 0000000000..988b41657d --- /dev/null +++ b/core/src/test/java/org/opensearch/sql/executor/DefaultQueryManagerTest.java @@ -0,0 +1,41 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.sql.executor; + + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.sql.executor.execution.AbstractPlan; + +@ExtendWith(MockitoExtension.class) +class DefaultQueryManagerTest { + + @Mock + private AbstractPlan plan; + + @Mock + private QueryId queryId; + + @Test + public void submitQuery() { + when(plan.getQueryId()).thenReturn(queryId); + + QueryId actualQueryId = new DefaultQueryManager().submit(plan); + + assertEquals(queryId, actualQueryId); + verify(plan, times(1)).execute(); + } +} diff --git a/core/src/test/java/org/opensearch/sql/executor/QueryIdTest.java b/core/src/test/java/org/opensearch/sql/executor/QueryIdTest.java new file mode 100644 index 0000000000..7d837c3e24 --- /dev/null +++ b/core/src/test/java/org/opensearch/sql/executor/QueryIdTest.java @@ -0,0 +1,22 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.sql.executor; + +import static org.junit.jupiter.api.Assertions.assertFalse; + +import com.google.common.base.Strings; +import org.junit.jupiter.api.Test; + +class QueryIdTest { + + @Test + public void createQueryId() { + assertFalse(Strings.isNullOrEmpty(QueryId.queryId().getQueryId())); + } +} diff --git a/core/src/test/java/org/opensearch/sql/executor/QueryServiceTest.java b/core/src/test/java/org/opensearch/sql/executor/QueryServiceTest.java new file mode 100644 index 0000000000..2884544dd0 --- /dev/null +++ b/core/src/test/java/org/opensearch/sql/executor/QueryServiceTest.java @@ -0,0 +1,164 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.sql.executor; + +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.when; + +import java.util.Collections; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.sql.analysis.Analyzer; +import org.opensearch.sql.ast.tree.UnresolvedPlan; +import org.opensearch.sql.common.response.ResponseListener; +import org.opensearch.sql.planner.Planner; +import org.opensearch.sql.planner.logical.LogicalPlan; +import org.opensearch.sql.planner.physical.PhysicalPlan; +import org.springframework.context.annotation.AnnotationConfigApplicationContext; + +@ExtendWith(MockitoExtension.class) +class QueryServiceTest { + private AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(); + + private QueryService queryService; + + @Mock + private ExecutionEngine executionEngine; + + @Mock + private Analyzer analyzer; + + @Mock + private Planner planner; + + @Mock + private UnresolvedPlan ast; + + @Mock + private LogicalPlan logicalPlan; + + @Mock + private PhysicalPlan plan; + + @Mock + private ExecutionEngine.Schema schema; + + @BeforeEach + public void setUp() { + when(analyzer.analyze(any(), any())).thenReturn(logicalPlan); + when(planner.plan(any())).thenReturn(plan); + + queryService = new QueryService(analyzer, executionEngine, planner); + } + + @Test + public void testExecuteShouldPass() { + doAnswer( + invocation -> { + ResponseListener listener = invocation.getArgument(1); + listener.onResponse( + new ExecutionEngine.QueryResponse(schema, Collections.emptyList())); + return null; + }) + .when(executionEngine) + .execute(any(), any()); + + queryService.execute( + ast, + new ResponseListener<>() { + @Override + public void onResponse(ExecutionEngine.QueryResponse pplQueryResponse) { + + } + + @Override + public void onFailure(Exception e) { + fail(); + } + }); + } + + @Test + public void testExplainShouldPass() { + doAnswer( + invocation -> { + ResponseListener listener = + invocation.getArgument(1); + listener.onResponse( + new ExecutionEngine.ExplainResponse( + new ExecutionEngine.ExplainResponseNode("test"))); + return null; + }) + .when(executionEngine) + .explain(any(), any()); + + queryService.explain( + ast, + new ResponseListener<>() { + @Override + public void onResponse(ExecutionEngine.ExplainResponse pplQueryResponse) { + + } + + @Override + public void onFailure(Exception e) { + fail(); + } + }); + } + + @Test + public void testExecuteWithExceptionShouldBeCaughtByHandler() { + doThrow(new IllegalStateException("illegal state exception")) + .when(executionEngine) + .execute(any(), any()); + + queryService.execute( + ast, + new ResponseListener<>() { + @Override + public void onResponse(ExecutionEngine.QueryResponse pplQueryResponse) { + fail(); + } + + @Override + public void onFailure(Exception e) { + assertTrue(e instanceof IllegalStateException); + } + }); + } + + @Test + public void testExecuteWithIllegalQueryShouldBeCaughtByHandler() { + doThrow(new IllegalStateException("illegal state exception")) + .when(executionEngine) + .explain(any(), any()); + + queryService.explain( + ast, + new ResponseListener<>() { + @Override + public void onResponse(ExecutionEngine.ExplainResponse pplQueryResponse) { + fail(); + } + + @Override + public void onFailure(Exception e) { + assertTrue(e instanceof IllegalStateException); + } + }); + } +} diff --git a/core/src/test/java/org/opensearch/sql/executor/execution/ExplainPlanTest.java b/core/src/test/java/org/opensearch/sql/executor/execution/ExplainPlanTest.java new file mode 100644 index 0000000000..54b4f24db0 --- /dev/null +++ b/core/src/test/java/org/opensearch/sql/executor/execution/ExplainPlanTest.java @@ -0,0 +1,59 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.sql.executor.execution; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.sql.common.response.ResponseListener; +import org.opensearch.sql.executor.ExecutionEngine; +import org.opensearch.sql.executor.QueryId; + +@ExtendWith(MockitoExtension.class) +public class ExplainPlanTest { + @Mock + private QueryId queryId; + + @Mock + private QueryPlan queryPlan; + + @Mock + private ResponseListener explainListener; + + @Test + public void execute() { + doNothing().when(queryPlan).explain(any()); + + ExplainPlan explainPlan = new ExplainPlan(queryId, queryPlan, explainListener); + explainPlan.execute(); + + verify(queryPlan, times(1)).explain(explainListener); + } + + @Test + public void explainThrowException() { + ExplainPlan explainPlan = new ExplainPlan(queryId, queryPlan, explainListener); + + UnsupportedOperationException unsupportedExplainException = + assertThrows( + UnsupportedOperationException.class, + () -> { + explainPlan.explain(explainListener); + }); + assertEquals("explain query can not been explained.", unsupportedExplainException.getMessage()); + } +} diff --git a/core/src/test/java/org/opensearch/sql/executor/execution/QueryPlanFactoryTest.java b/core/src/test/java/org/opensearch/sql/executor/execution/QueryPlanFactoryTest.java new file mode 100644 index 0000000000..cc4bf070fb --- /dev/null +++ b/core/src/test/java/org/opensearch/sql/executor/execution/QueryPlanFactoryTest.java @@ -0,0 +1,107 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.sql.executor.execution; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.opensearch.sql.executor.execution.QueryPlanFactory.NO_CONSUMER_RESPONSE_LISTENER; + +import java.util.Optional; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.sql.ast.statement.Explain; +import org.opensearch.sql.ast.statement.Query; +import org.opensearch.sql.ast.statement.Statement; +import org.opensearch.sql.ast.tree.UnresolvedPlan; +import org.opensearch.sql.common.response.ResponseListener; +import org.opensearch.sql.executor.ExecutionEngine; +import org.opensearch.sql.executor.QueryService; + +@ExtendWith(MockitoExtension.class) +class QueryPlanFactoryTest { + + @Mock + private UnresolvedPlan plan; + + @Mock + private QueryService queryService; + + @Mock + private ResponseListener queryListener; + + @Mock + private ResponseListener explainListener; + + @Mock + private ExecutionEngine.QueryResponse queryResponse; + + private QueryPlanFactory factory; + + @BeforeEach + void init() { + factory = new QueryPlanFactory(queryService); + } + + @Test + public void createFromQueryShouldSuccess() { + Statement query = new Query(plan); + AbstractPlan queryExecution = + factory.create(query, Optional.of(queryListener), Optional.empty()); + assertTrue(queryExecution instanceof QueryPlan); + } + + @Test + public void createFromExplainShouldSuccess() { + Statement query = new Explain(new Query(plan)); + AbstractPlan queryExecution = + factory.create(query, Optional.empty(), Optional.of(explainListener)); + assertTrue(queryExecution instanceof ExplainPlan); + } + + @Test + public void createFromQueryWithoutQueryListenerShouldThrowException() { + Statement query = new Query(plan); + + IllegalArgumentException exception = + assertThrows(IllegalArgumentException.class, () -> factory.create(query, + Optional.empty(), Optional.empty())); + assertEquals("[BUG] query listener must be not null", exception.getMessage()); + } + + @Test + public void createFromExplainWithoutExplainListenerShouldThrowException() { + Statement query = new Explain(new Query(plan)); + + IllegalArgumentException exception = + assertThrows(IllegalArgumentException.class, () -> factory.create(query, + Optional.empty(), Optional.empty())); + assertEquals("[BUG] explain listener must be not null", exception.getMessage()); + } + + @Test + public void noConsumerResponseChannel() { + IllegalStateException exception = + assertThrows( + IllegalStateException.class, + () -> NO_CONSUMER_RESPONSE_LISTENER.onResponse(queryResponse)); + assertEquals( + "[BUG] query response should not sent to unexpected channel", exception.getMessage()); + + exception = + assertThrows( + IllegalStateException.class, + () -> NO_CONSUMER_RESPONSE_LISTENER.onFailure(new RuntimeException())); + assertEquals( + "[BUG] exception response should not sent to unexpected channel", exception.getMessage()); + } +} diff --git a/core/src/test/java/org/opensearch/sql/executor/execution/QueryPlanTest.java b/core/src/test/java/org/opensearch/sql/executor/execution/QueryPlanTest.java new file mode 100644 index 0000000000..834db76996 --- /dev/null +++ b/core/src/test/java/org/opensearch/sql/executor/execution/QueryPlanTest.java @@ -0,0 +1,58 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.sql.executor.execution; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.sql.ast.tree.UnresolvedPlan; +import org.opensearch.sql.common.response.ResponseListener; +import org.opensearch.sql.executor.ExecutionEngine; +import org.opensearch.sql.executor.QueryId; +import org.opensearch.sql.executor.QueryService; + +@ExtendWith(MockitoExtension.class) +class QueryPlanTest { + + @Mock + private QueryId queryId; + + @Mock + private UnresolvedPlan plan; + + @Mock + private QueryService queryService; + + @Mock + private ResponseListener explainListener; + + @Mock + private ResponseListener queryListener; + + @Test + public void execute() { + QueryPlan query = new QueryPlan(queryId, plan, queryService, queryListener); + query.execute(); + + verify(queryService, times(1)).execute(any(), any()); + } + + @Test + public void explain() { + QueryPlan query = new QueryPlan(queryId, plan, queryService, queryListener); + query.explain(explainListener); + + verify(queryService, times(1)).explain(plan, explainListener); + } +} diff --git a/integ-test/src/test/java/org/opensearch/sql/ppl/StandaloneIT.java b/integ-test/src/test/java/org/opensearch/sql/ppl/StandaloneIT.java index 94cafef35c..bcd0c0ffb8 100644 --- a/integ-test/src/test/java/org/opensearch/sql/ppl/StandaloneIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/ppl/StandaloneIT.java @@ -18,23 +18,40 @@ import org.opensearch.client.Request; import org.opensearch.client.RestClient; import org.opensearch.client.RestHighLevelClient; +import org.opensearch.sql.analysis.Analyzer; +import org.opensearch.sql.analysis.ExpressionAnalyzer; import org.opensearch.sql.catalog.CatalogService; import org.opensearch.sql.common.response.ResponseListener; import org.opensearch.sql.common.setting.Settings; +import org.opensearch.sql.executor.DefaultQueryManager; import org.opensearch.sql.executor.ExecutionEngine; import org.opensearch.sql.executor.ExecutionEngine.QueryResponse; +import org.opensearch.sql.executor.QueryManager; +import org.opensearch.sql.executor.QueryService; +import org.opensearch.sql.executor.execution.QueryPlanFactory; +import org.opensearch.sql.expression.DSL; +import org.opensearch.sql.expression.config.ExpressionConfig; +import org.opensearch.sql.expression.function.BuiltinFunctionRepository; import org.opensearch.sql.monitor.AlwaysHealthyMonitor; import org.opensearch.sql.opensearch.client.OpenSearchClient; import org.opensearch.sql.opensearch.client.OpenSearchRestClient; import org.opensearch.sql.opensearch.executor.OpenSearchExecutionEngine; import org.opensearch.sql.opensearch.executor.protector.OpenSearchExecutionProtector; import org.opensearch.sql.opensearch.storage.OpenSearchStorageEngine; +import org.opensearch.sql.planner.Planner; +import org.opensearch.sql.planner.optimizer.LogicalPlanOptimizer; import org.opensearch.sql.plugin.catalog.CatalogServiceImpl; import org.opensearch.sql.ppl.config.PPLServiceConfig; import org.opensearch.sql.ppl.domain.PPLQueryRequest; import org.opensearch.sql.protocol.response.QueryResult; import org.opensearch.sql.protocol.response.format.SimpleJsonResponseFormatter; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.AnnotationConfigApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; +import org.springframework.context.annotation.Scope; /** * Run PPL with query engine outside OpenSearch cluster. This IT doesn't require our plugin @@ -56,10 +73,13 @@ public void init() { AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(); context.registerBean(ExecutionEngine.class, () -> new OpenSearchExecutionEngine(client, new OpenSearchExecutionProtector(new AlwaysHealthyMonitor()))); - context.register(PPLServiceConfig.class); + context.registerBean(OpenSearchClient.class, () -> client); + context.registerBean(Settings.class, () -> defaultSettings()); OpenSearchStorageEngine openSearchStorageEngine = new OpenSearchStorageEngine(client, defaultSettings()); CatalogServiceImpl.getInstance().registerDefaultOpenSearchCatalog(openSearchStorageEngine); context.registerBean(CatalogService.class, CatalogServiceImpl::getInstance); + context.register(StandaloneConfig.class); + context.register(PPLServiceConfig.class); context.refresh(); pplService = context.getBean(PPLService.class); @@ -144,4 +164,39 @@ public InternalRestHighLevelClient(RestClient restClient) { super(restClient, RestClient::close, Collections.emptyList()); } } + + @Configuration + @Import({ExpressionConfig.class}) + static class StandaloneConfig { + @Autowired + private CatalogService catalogService; + + @Autowired + private ExecutionEngine executionEngine; + + @Bean + QueryManager queryManager() { + return new DefaultQueryManager(); + } + + @Bean + @Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE) + QueryPlanFactory queryExecutionFactory(BuiltinFunctionRepository functionRepository) { + catalogService + .getCatalogs() + .forEach( + catalog -> + catalog + .getStorageEngine() + .getFunctions() + .forEach( + functionResolver -> + functionRepository.register(catalog.getName(), functionResolver))); + Analyzer analyzer = new Analyzer(new ExpressionAnalyzer(functionRepository), + catalogService, functionRepository); + Planner planner = + new Planner(LogicalPlanOptimizer.create(new DSL(functionRepository))); + return new QueryPlanFactory(new QueryService(analyzer, executionEngine, planner)); + } + } } diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSQLQueryAction.java b/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSQLQueryAction.java index 6524ac2f0e..a5a3ac5a4f 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSQLQueryAction.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSQLQueryAction.java @@ -6,12 +6,12 @@ package org.opensearch.sql.legacy.plugin; -import static org.opensearch.rest.RestStatus.INTERNAL_SERVER_ERROR; import static org.opensearch.rest.RestStatus.OK; import static org.opensearch.sql.executor.ExecutionEngine.QueryResponse; import static org.opensearch.sql.protocol.response.format.JsonResponseFormatter.Style.PRETTY; import java.util.List; +import java.util.function.BiConsumer; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.client.node.NodeClient; @@ -22,11 +22,11 @@ import org.opensearch.rest.RestStatus; import org.opensearch.sql.common.antlr.SyntaxCheckException; import org.opensearch.sql.common.response.ResponseListener; +import org.opensearch.sql.common.utils.QueryContext; import org.opensearch.sql.executor.ExecutionEngine.ExplainResponse; import org.opensearch.sql.legacy.metrics.MetricName; import org.opensearch.sql.legacy.metrics.Metrics; import org.opensearch.sql.opensearch.security.SecurityAccess; -import org.opensearch.sql.planner.physical.PhysicalPlan; import org.opensearch.sql.protocol.response.QueryResult; import org.opensearch.sql.protocol.response.format.CsvResponseFormatter; import org.opensearch.sql.protocol.response.format.Format; @@ -76,39 +76,67 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient nod /** * Prepare REST channel consumer for a SQL query request. - * @param request SQL request - * @param nodeClient node client - * @return channel consumer + * + * @param request SQL request + * @param fallbackHandler handle request fallback to legacy engine. + * @param executionErrorHandler handle error response during new engine execution. + * @return {@link RestChannelConsumer} */ - public RestChannelConsumer prepareRequest(SQLQueryRequest request, NodeClient nodeClient) { + public RestChannelConsumer prepareRequest( + SQLQueryRequest request, + BiConsumer fallbackHandler, + BiConsumer executionErrorHandler) { if (!request.isSupported()) { - return NOT_SUPPORTED_YET; + return channel -> fallbackHandler.accept(channel, new IllegalStateException("not supported")); } SQLService sqlService = SecurityAccess.doPrivileged(() -> applicationContext.getBean(SQLService.class)); - PhysicalPlan plan; - try { - // For now analyzing and planning stage may throw syntax exception as well - // which hints the fallback to legacy code is necessary here. - plan = sqlService.plan( - sqlService.analyze( - sqlService.parse(request.getQuery()))); - } catch (SyntaxCheckException e) { - // When explain, print info log for what unsupported syntax is causing fallback to old engine - if (request.isExplainRequest()) { - LOG.info("Request is falling back to old SQL engine due to: " + e.getMessage()); - } - return NOT_SUPPORTED_YET; - } if (request.isExplainRequest()) { - return channel -> sqlService.explain(plan, createExplainResponseListener(channel)); + return channel -> + sqlService.explain( + request, + fallBackListener( + channel, + createExplainResponseListener(channel, executionErrorHandler), + fallbackHandler)); + } else { + return channel -> + sqlService.execute( + request, + fallBackListener( + channel, + createQueryResponseListener(channel, request, executionErrorHandler), + fallbackHandler)); } - return channel -> sqlService.execute(plan, createQueryResponseListener(channel, request)); } - private ResponseListener createExplainResponseListener(RestChannel channel) { + private ResponseListener fallBackListener( + RestChannel channel, + ResponseListener next, + BiConsumer fallBackHandler) { + return new ResponseListener() { + @Override + public void onResponse(T response) { + LOG.error("[{}] Request is handled by new SQL query engine", + QueryContext.getRequestId()); + next.onResponse(response); + } + + @Override + public void onFailure(Exception e) { + if (e instanceof SyntaxCheckException) { + fallBackHandler.accept(channel, e); + } else { + next.onFailure(e); + } + } + }; + } + + private ResponseListener createExplainResponseListener( + RestChannel channel, BiConsumer errorHandler) { return new ResponseListener() { @Override public void onResponse(ExplainResponse response) { @@ -122,15 +150,15 @@ protected Object buildJsonObject(ExplainResponse response) { @Override public void onFailure(Exception e) { - LOG.error("Error happened during explain", e); - logAndPublishMetrics(e); - sendResponse(channel, INTERNAL_SERVER_ERROR, - "Failed to explain the query due to error: " + e.getMessage()); + errorHandler.accept(channel, e); } }; } - private ResponseListener createQueryResponseListener(RestChannel channel, SQLQueryRequest request) { + private ResponseListener createQueryResponseListener( + RestChannel channel, + SQLQueryRequest request, + BiConsumer errorHandler) { Format format = request.format(); ResponseFormatter formatter; if (format.equals(Format.CSV)) { @@ -149,9 +177,7 @@ public void onResponse(QueryResponse response) { @Override public void onFailure(Exception e) { - LOG.error("Error happened during query handling", e); - logAndPublishMetrics(e); - sendResponse(channel, INTERNAL_SERVER_ERROR, formatter.format(e)); + errorHandler.accept(channel, e); } }; } diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlAction.java b/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlAction.java index d47dac9325..de09bcee1a 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlAction.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlAction.java @@ -9,7 +9,6 @@ import static org.opensearch.rest.RestStatus.BAD_REQUEST; import static org.opensearch.rest.RestStatus.OK; import static org.opensearch.rest.RestStatus.SERVICE_UNAVAILABLE; -import static org.opensearch.sql.opensearch.executor.Scheduler.schedule; import com.alibaba.druid.sql.parser.ParserException; import com.google.common.collect.ImmutableList; @@ -153,27 +152,29 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli Format format = SqlRequestParam.getFormat(request.params()); - return channel -> schedule(client, () -> { - try { - // Route request to new query engine if it's supported already - SQLQueryRequest newSqlRequest = new SQLQueryRequest(sqlRequest.getJsonContent(), - sqlRequest.getSql(), request.path(), request.params()); - RestChannelConsumer result = newSqlQueryHandler.prepareRequest(newSqlRequest, client); - if (result != RestSQLQueryAction.NOT_SUPPORTED_YET) { - LOG.info("[{}] Request is handled by new SQL query engine", - QueryContext.getRequestId()); - result.accept(channel); - } else { - LOG.debug("[{}] Request {} is not supported and falling back to old SQL engine", - QueryContext.getRequestId(), newSqlRequest); - QueryAction queryAction = explainRequest(client, sqlRequest, format); - executeSqlRequest(request, queryAction, client, channel); - } - } catch (Exception e) { - logAndPublishMetrics(e); - reportError(channel, e, isClientError(e) ? BAD_REQUEST : SERVICE_UNAVAILABLE); - } - }); + // Route request to new query engine if it's supported already + SQLQueryRequest newSqlRequest = new SQLQueryRequest(sqlRequest.getJsonContent(), + sqlRequest.getSql(), request.path(), request.params()); + return newSqlQueryHandler.prepareRequest(newSqlRequest, + (restChannel, exception) -> { + try{ + if (newSqlRequest.isExplainRequest()) { + LOG.info("Request is falling back to old SQL engine due to: " + exception.getMessage()); + } + LOG.debug("[{}] Request {} is not supported and falling back to old SQL engine", + QueryContext.getRequestId(), newSqlRequest); + QueryAction queryAction = explainRequest(client, sqlRequest, format); + executeSqlRequest(request, queryAction, client, restChannel); + } catch (Exception e) { + logAndPublishMetrics(e); + reportError(restChannel, e, isClientError(e) ? BAD_REQUEST : SERVICE_UNAVAILABLE); + } + }, + (restChannel, exception) -> { + logAndPublishMetrics(exception); + reportError(restChannel, exception, isClientError(exception) ? + BAD_REQUEST : SERVICE_UNAVAILABLE); + }); } catch (Exception e) { logAndPublishMetrics(e); return channel -> reportError(channel, e, isClientError(e) ? BAD_REQUEST : SERVICE_UNAVAILABLE); diff --git a/legacy/src/test/java/org/opensearch/sql/legacy/plugin/RestSQLQueryActionTest.java b/legacy/src/test/java/org/opensearch/sql/legacy/plugin/RestSQLQueryActionTest.java index 6257f0dd95..4c9afe802e 100644 --- a/legacy/src/test/java/org/opensearch/sql/legacy/plugin/RestSQLQueryActionTest.java +++ b/legacy/src/test/java/org/opensearch/sql/legacy/plugin/RestSQLQueryActionTest.java @@ -6,12 +6,15 @@ package org.opensearch.sql.legacy.plugin; -import static org.junit.Assert.assertNotSame; -import static org.junit.Assert.assertSame; -import static org.opensearch.sql.legacy.plugin.RestSQLQueryAction.NOT_SUPPORTED_YET; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doThrow; import static org.opensearch.sql.legacy.plugin.RestSqlAction.EXPLAIN_API_ENDPOINT; import static org.opensearch.sql.legacy.plugin.RestSqlAction.QUERY_API_ENDPOINT; +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; import org.json.JSONObject; import org.junit.Before; import org.junit.Test; @@ -21,8 +24,14 @@ import org.mockito.junit.MockitoJUnitRunner; import org.opensearch.client.node.NodeClient; import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.RestChannel; +import org.opensearch.rest.RestRequest; import org.opensearch.sql.catalog.CatalogService; +import org.opensearch.sql.common.antlr.SyntaxCheckException; import org.opensearch.sql.executor.ExecutionEngine; +import org.opensearch.sql.executor.QueryManager; +import org.opensearch.sql.executor.execution.QueryPlanFactory; import org.opensearch.sql.sql.config.SQLServiceConfig; import org.opensearch.sql.sql.domain.SQLQueryRequest; import org.opensearch.sql.storage.StorageEngine; @@ -30,13 +39,25 @@ import org.springframework.context.annotation.AnnotationConfigApplicationContext; @RunWith(MockitoJUnitRunner.class) -public class RestSQLQueryActionTest { +public class RestSQLQueryActionTest extends BaseRestHandler { private NodeClient nodeClient; @Mock private ThreadPool threadPool; + @Mock + private QueryManager queryManager; + + @Mock + private QueryPlanFactory factory; + + @Mock + private ExecutionEngine.Schema schema; + + @Mock + private RestChannel restChannel; + private AnnotationConfigApplicationContext context; @Before @@ -46,6 +67,8 @@ public void setup() { context.registerBean(StorageEngine.class, () -> Mockito.mock(StorageEngine.class)); context.registerBean(ExecutionEngine.class, () -> Mockito.mock(ExecutionEngine.class)); context.registerBean(CatalogService.class, () -> Mockito.mock(CatalogService.class)); + context.registerBean(QueryManager.class, () -> queryManager); + context.registerBean(QueryPlanFactory.class, () -> factory); context.register(SQLServiceConfig.class); context.refresh(); Mockito.lenient().when(threadPool.getThreadContext()) @@ -53,7 +76,7 @@ public void setup() { } @Test - public void handleQueryThatCanSupport() { + public void handleQueryThatCanSupport() throws Exception { SQLQueryRequest request = new SQLQueryRequest( new JSONObject("{\"query\": \"SELECT -123\"}"), "SELECT -123", @@ -61,11 +84,15 @@ public void handleQueryThatCanSupport() { ""); RestSQLQueryAction queryAction = new RestSQLQueryAction(context); - assertNotSame(NOT_SUPPORTED_YET, queryAction.prepareRequest(request, nodeClient)); + queryAction.prepareRequest(request, (channel, exception) -> { + fail(); + }, (channel, exception) -> { + fail(); + }).accept(restChannel); } @Test - public void handleExplainThatCanSupport() { + public void handleExplainThatCanSupport() throws Exception { SQLQueryRequest request = new SQLQueryRequest( new JSONObject("{\"query\": \"SELECT -123\"}"), "SELECT -123", @@ -73,11 +100,15 @@ public void handleExplainThatCanSupport() { ""); RestSQLQueryAction queryAction = new RestSQLQueryAction(context); - assertNotSame(NOT_SUPPORTED_YET, queryAction.prepareRequest(request, nodeClient)); + queryAction.prepareRequest(request, (channel, exception) -> { + fail(); + }, (channel, exception) -> { + fail(); + }).accept(restChannel); } @Test - public void skipQueryThatNotSupport() { + public void queryThatNotSupportIsHandledByFallbackHandler() throws Exception { SQLQueryRequest request = new SQLQueryRequest( new JSONObject( "{\"query\": \"SELECT name FROM test1 JOIN test2 ON test1.name = test2.name\"}"), @@ -85,8 +116,53 @@ public void skipQueryThatNotSupport() { QUERY_API_ENDPOINT, ""); + AtomicBoolean fallback = new AtomicBoolean(false); + RestSQLQueryAction queryAction = new RestSQLQueryAction(context); + queryAction.prepareRequest(request, (channel, exception) -> { + fallback.set(true); + assertTrue(exception instanceof SyntaxCheckException); + }, (channel, exception) -> { + fail(); + }).accept(restChannel); + + assertTrue(fallback.get()); + } + + @Test + public void queryExecutionFailedIsHandledByExecutionErrorHandler() throws Exception { + SQLQueryRequest request = new SQLQueryRequest( + new JSONObject( + "{\"query\": \"SELECT -123\"}"), + "SELECT -123", + QUERY_API_ENDPOINT, + ""); + + doThrow(new IllegalStateException("execution exception")) + .when(queryManager) + .submit(any()); + + AtomicBoolean executionErrorHandler = new AtomicBoolean(false); RestSQLQueryAction queryAction = new RestSQLQueryAction(context); - assertSame(NOT_SUPPORTED_YET, queryAction.prepareRequest(request, nodeClient)); + queryAction.prepareRequest(request, (channel, exception) -> { + assertTrue(exception instanceof SyntaxCheckException); + }, (channel, exception) -> { + executionErrorHandler.set(true); + assertTrue(exception instanceof IllegalStateException); + }).accept(restChannel); + + assertTrue(executionErrorHandler.get()); } + @Override + public String getName() { + // do nothing, RestChannelConsumer is protected which required to extend BaseRestHandler + return null; + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient nodeClient) + throws IOException { + // do nothing, RestChannelConsumer is protected which required to extend BaseRestHandler + return null; + } } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchQueryManager.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchQueryManager.java new file mode 100644 index 0000000000..9c6fcdb825 --- /dev/null +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchQueryManager.java @@ -0,0 +1,50 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.sql.opensearch.executor; + +import java.util.Map; +import lombok.RequiredArgsConstructor; +import org.apache.logging.log4j.ThreadContext; +import org.opensearch.client.node.NodeClient; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.sql.executor.QueryId; +import org.opensearch.sql.executor.QueryManager; +import org.opensearch.sql.executor.execution.AbstractPlan; +import org.opensearch.threadpool.ThreadPool; + +/** + * QueryManager implemented in OpenSearch cluster. + */ +@RequiredArgsConstructor +public class OpenSearchQueryManager implements QueryManager { + + private final NodeClient nodeClient; + + private static final String SQL_WORKER_THREAD_POOL_NAME = "sql-worker"; + + @Override + public QueryId submit(AbstractPlan queryPlan) { + schedule(nodeClient, () -> queryPlan.execute()); + + return queryPlan.getQueryId(); + } + + private void schedule(NodeClient client, Runnable task) { + ThreadPool threadPool = client.threadPool(); + threadPool.schedule(withCurrentContext(task), new TimeValue(0), SQL_WORKER_THREAD_POOL_NAME); + } + + private Runnable withCurrentContext(final Runnable task) { + final Map currentContext = ThreadContext.getImmutableContext(); + return () -> { + ThreadContext.putAll(currentContext); + task.run(); + }; + } +} diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/Scheduler.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/Scheduler.java deleted file mode 100644 index 5567d1f9b2..0000000000 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/Scheduler.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.opensearch.executor; - -import java.util.Map; -import lombok.experimental.UtilityClass; -import org.apache.logging.log4j.ThreadContext; -import org.opensearch.client.node.NodeClient; -import org.opensearch.common.unit.TimeValue; -import org.opensearch.threadpool.ThreadPool; - -/** The scheduler which schedule the task run in sql-worker thread pool. */ -@UtilityClass -public class Scheduler { - - public static final String SQL_WORKER_THREAD_POOL_NAME = "sql-worker"; - - public static void schedule(NodeClient client, Runnable task) { - ThreadPool threadPool = client.threadPool(); - threadPool.schedule(withCurrentContext(task), new TimeValue(0), SQL_WORKER_THREAD_POOL_NAME); - } - - private static Runnable withCurrentContext(final Runnable task) { - final Map currentContext = ThreadContext.getImmutableContext(); - return () -> { - ThreadContext.putAll(currentContext); - task.run(); - }; - } -} diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/SchedulerTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/OpenSearchQueryManagerTest.java similarity index 51% rename from opensearch/src/test/java/org/opensearch/sql/opensearch/executor/SchedulerTest.java rename to opensearch/src/test/java/org/opensearch/sql/opensearch/executor/OpenSearchQueryManagerTest.java index f14bda7a95..6d2b9b13ce 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/SchedulerTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/OpenSearchQueryManagerTest.java @@ -1,6 +1,9 @@ /* - * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. */ package org.opensearch.sql.opensearch.executor; @@ -14,18 +17,47 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.client.node.NodeClient; +import org.opensearch.sql.ast.tree.UnresolvedPlan; +import org.opensearch.sql.common.response.ResponseListener; +import org.opensearch.sql.executor.ExecutionEngine; +import org.opensearch.sql.executor.QueryId; +import org.opensearch.sql.executor.QueryService; +import org.opensearch.sql.executor.execution.AbstractPlan; +import org.opensearch.sql.executor.execution.QueryPlan; import org.opensearch.threadpool.ThreadPool; @ExtendWith(MockitoExtension.class) -class SchedulerTest { +class OpenSearchQueryManagerTest { + + @Mock + private QueryId queryId; + + @Mock + private QueryService queryService; + + @Mock + private UnresolvedPlan plan; + + @Mock + private ResponseListener listener; + @Test - public void schedule() { + public void submitQuery() { NodeClient nodeClient = mock(NodeClient.class); ThreadPool threadPool = mock(ThreadPool.class); when(nodeClient.threadPool()).thenReturn(threadPool); + AtomicBoolean isRun = new AtomicBoolean(false); + AbstractPlan queryPlan = new QueryPlan(queryId, plan, queryService, listener) { + @Override + public void execute() { + isRun.set(true); + } + }; + doAnswer( invocation -> { Runnable task = invocation.getArgument(0); @@ -34,8 +66,8 @@ public void schedule() { }) .when(threadPool) .schedule(any(), any(), any()); - AtomicBoolean isRun = new AtomicBoolean(false); - Scheduler.schedule(nodeClient, () -> isRun.set(true)); + new OpenSearchQueryManager(nodeClient).submit(queryPlan); + assertTrue(isRun.get()); } } diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/config/OpenSearchPluginConfig.java b/plugin/src/main/java/org/opensearch/sql/plugin/config/OpenSearchPluginConfig.java index a2169eb839..596296522c 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/config/OpenSearchPluginConfig.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/config/OpenSearchPluginConfig.java @@ -10,20 +10,29 @@ package org.opensearch.sql.plugin.config; import org.opensearch.client.node.NodeClient; +import org.opensearch.sql.analysis.Analyzer; +import org.opensearch.sql.analysis.ExpressionAnalyzer; +import org.opensearch.sql.catalog.CatalogService; import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.executor.ExecutionEngine; +import org.opensearch.sql.executor.QueryManager; +import org.opensearch.sql.executor.QueryService; +import org.opensearch.sql.executor.execution.QueryPlanFactory; +import org.opensearch.sql.expression.DSL; import org.opensearch.sql.expression.config.ExpressionConfig; import org.opensearch.sql.expression.function.BuiltinFunctionRepository; -import org.opensearch.sql.expression.function.OpenSearchFunctions; import org.opensearch.sql.monitor.ResourceMonitor; import org.opensearch.sql.opensearch.client.OpenSearchClient; import org.opensearch.sql.opensearch.client.OpenSearchNodeClient; import org.opensearch.sql.opensearch.executor.OpenSearchExecutionEngine; +import org.opensearch.sql.opensearch.executor.OpenSearchQueryManager; import org.opensearch.sql.opensearch.executor.protector.ExecutionProtector; import org.opensearch.sql.opensearch.executor.protector.OpenSearchExecutionProtector; import org.opensearch.sql.opensearch.monitor.OpenSearchMemoryHealthy; import org.opensearch.sql.opensearch.monitor.OpenSearchResourceMonitor; import org.opensearch.sql.opensearch.storage.OpenSearchStorageEngine; +import org.opensearch.sql.planner.Planner; +import org.opensearch.sql.planner.optimizer.LogicalPlanOptimizer; import org.opensearch.sql.storage.StorageEngine; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.config.ConfigurableBeanFactory; @@ -47,7 +56,7 @@ public class OpenSearchPluginConfig { private Settings settings; @Autowired - private BuiltinFunctionRepository functionRepository; + private CatalogService catalogService; @Bean @Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE) @@ -64,7 +73,6 @@ public StorageEngine storageEngine() { @Bean @Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE) public ExecutionEngine executionEngine() { - OpenSearchFunctions.register(functionRepository); return new OpenSearchExecutionEngine(client(), protector()); } @@ -79,4 +87,35 @@ public ResourceMonitor resourceMonitor() { public ExecutionProtector protector() { return new OpenSearchExecutionProtector(resourceMonitor()); } + + /** + * Per node singleton object. + */ + @Bean + public QueryManager queryManager() { + return new OpenSearchQueryManager(nodeClient); + } + + /** + * QueryPlanFactory. + */ + @Bean + @Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE) + public QueryPlanFactory queryExecutionFactory(BuiltinFunctionRepository functionRepository) { + catalogService + .getCatalogs() + .forEach( + catalog -> + catalog + .getStorageEngine() + .getFunctions() + .forEach( + functionResolver -> + functionRepository.register(catalog.getName(), functionResolver))); + Analyzer analyzer = new Analyzer(new ExpressionAnalyzer(functionRepository), + catalogService, functionRepository); + Planner planner = + new Planner(LogicalPlanOptimizer.create(new DSL(functionRepository))); + return new QueryPlanFactory(new QueryService(analyzer, executionEngine(), planner)); + } } diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestPPLQueryAction.java b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestPPLQueryAction.java index 2bc3c8d72d..0f4d2f7d0c 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestPPLQueryAction.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestPPLQueryAction.java @@ -9,7 +9,6 @@ import static org.opensearch.rest.RestStatus.INTERNAL_SERVER_ERROR; import static org.opensearch.rest.RestStatus.OK; import static org.opensearch.rest.RestStatus.SERVICE_UNAVAILABLE; -import static org.opensearch.sql.opensearch.executor.Scheduler.schedule; import com.google.common.collect.ImmutableList; import java.util.Arrays; @@ -112,43 +111,42 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient nod PPLQueryRequestFactory.getPPLRequest(request) ); - return channel -> schedule(nodeClient, () -> - nodeClient.execute( - PPLQueryAction.INSTANCE, - transportPPLQueryRequest, - new ActionListener<>() { - @Override - public void onResponse(TransportPPLQueryResponse response) { - sendResponse(channel, OK, response.getResult()); - } - - @Override - public void onFailure(Exception e) { - if (transportPPLQueryRequest.isExplainRequest()) { - LOG.error("Error happened during explain", e); - sendResponse( - channel, - INTERNAL_SERVER_ERROR, - "Failed to explain the query due to error: " + e.getMessage()); - } else if (e instanceof IllegalAccessException) { - reportError(channel, e, BAD_REQUEST); - } else { - LOG.error("Error happened during query handling", e); - if (isClientError(e)) { - Metrics.getInstance() - .getNumericalMetric(MetricName.PPL_FAILED_REQ_COUNT_CUS) - .increment(); + return channel -> + nodeClient.execute( + PPLQueryAction.INSTANCE, + transportPPLQueryRequest, + new ActionListener<>() { + @Override + public void onResponse(TransportPPLQueryResponse response) { + sendResponse(channel, OK, response.getResult()); + } + + @Override + public void onFailure(Exception e) { + if (transportPPLQueryRequest.isExplainRequest()) { + LOG.error("Error happened during explain", e); + sendResponse( + channel, + INTERNAL_SERVER_ERROR, + "Failed to explain the query due to error: " + e.getMessage()); + } else if (e instanceof IllegalAccessException) { reportError(channel, e, BAD_REQUEST); } else { - Metrics.getInstance() - .getNumericalMetric(MetricName.PPL_FAILED_REQ_COUNT_SYS) - .increment(); - reportError(channel, e, SERVICE_UNAVAILABLE); + LOG.error("Error happened during query handling", e); + if (isClientError(e)) { + Metrics.getInstance() + .getNumericalMetric(MetricName.PPL_FAILED_REQ_COUNT_CUS) + .increment(); + reportError(channel, e, BAD_REQUEST); + } else { + Metrics.getInstance() + .getNumericalMetric(MetricName.PPL_FAILED_REQ_COUNT_SYS) + .increment(); + reportError(channel, e, SERVICE_UNAVAILABLE); + } } } - } - } - )); + }); } private void sendResponse(RestChannel channel, RestStatus status, String content) { diff --git a/ppl/src/main/antlr/OpenSearchPPLParser.g4 b/ppl/src/main/antlr/OpenSearchPPLParser.g4 index a1061b0020..c281b9130d 100644 --- a/ppl/src/main/antlr/OpenSearchPPLParser.g4 +++ b/ppl/src/main/antlr/OpenSearchPPLParser.g4 @@ -14,6 +14,14 @@ root /** statement */ pplStatement + : dmlStatement + ; + +dmlStatement + : queryStatement + ; + +queryStatement : pplCommands (PIPE commands)* ; diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/PPLService.java b/ppl/src/main/java/org/opensearch/sql/ppl/PPLService.java index e6cbbb92f5..e11edc1646 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/PPLService.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/PPLService.java @@ -8,41 +8,35 @@ import static org.opensearch.sql.executor.ExecutionEngine.QueryResponse; +import java.util.Optional; import lombok.RequiredArgsConstructor; import org.antlr.v4.runtime.tree.ParseTree; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.opensearch.sql.analysis.AnalysisContext; -import org.opensearch.sql.analysis.Analyzer; -import org.opensearch.sql.analysis.ExpressionAnalyzer; -import org.opensearch.sql.ast.tree.UnresolvedPlan; -import org.opensearch.sql.catalog.CatalogService; +import org.opensearch.sql.ast.statement.Statement; import org.opensearch.sql.common.response.ResponseListener; import org.opensearch.sql.common.utils.QueryContext; -import org.opensearch.sql.executor.ExecutionEngine; import org.opensearch.sql.executor.ExecutionEngine.ExplainResponse; -import org.opensearch.sql.expression.DSL; -import org.opensearch.sql.expression.function.BuiltinFunctionRepository; -import org.opensearch.sql.planner.Planner; -import org.opensearch.sql.planner.logical.LogicalPlan; -import org.opensearch.sql.planner.optimizer.LogicalPlanOptimizer; -import org.opensearch.sql.planner.physical.PhysicalPlan; +import org.opensearch.sql.executor.QueryManager; +import org.opensearch.sql.executor.execution.AbstractPlan; +import org.opensearch.sql.executor.execution.QueryPlanFactory; import org.opensearch.sql.ppl.antlr.PPLSyntaxParser; import org.opensearch.sql.ppl.domain.PPLQueryRequest; import org.opensearch.sql.ppl.parser.AstBuilder; import org.opensearch.sql.ppl.parser.AstExpressionBuilder; +import org.opensearch.sql.ppl.parser.AstStatementBuilder; import org.opensearch.sql.ppl.utils.PPLQueryDataAnonymizer; -import org.opensearch.sql.ppl.utils.UnresolvedPlanHelper; +/** + * PPLService. + */ @RequiredArgsConstructor public class PPLService { private final PPLSyntaxParser parser; - private final ExecutionEngine openSearchExecutionEngine; - - private final BuiltinFunctionRepository repository; + private final QueryManager queryManager; - private final CatalogService catalogService; + private final QueryPlanFactory queryExecutionFactory; private final PPLQueryDataAnonymizer anonymizer = new PPLQueryDataAnonymizer(); @@ -56,7 +50,7 @@ public class PPLService { */ public void execute(PPLQueryRequest request, ResponseListener listener) { try { - openSearchExecutionEngine.execute(plan(request), listener); + queryManager.submit(plan(request, Optional.of(listener), Optional.empty())); } catch (Exception e) { listener.onFailure(e); } @@ -71,28 +65,31 @@ public void execute(PPLQueryRequest request, ResponseListener lis */ public void explain(PPLQueryRequest request, ResponseListener listener) { try { - openSearchExecutionEngine.explain(plan(request), listener); + queryManager.submit(plan(request, Optional.empty(), Optional.of(listener))); } catch (Exception e) { listener.onFailure(e); } } - private PhysicalPlan plan(PPLQueryRequest request) { + private AbstractPlan plan( + PPLQueryRequest request, + Optional> queryListener, + Optional> explainListener) { // 1.Parse query and convert parse tree (CST) to abstract syntax tree (AST) ParseTree cst = parser.parse(request.getRequest()); - UnresolvedPlan ast = cst.accept( - new AstBuilder(new AstExpressionBuilder(), request.getRequest())); - LOG.info("[{}] Incoming request {}", QueryContext.getRequestId(), - anonymizer.anonymizeData(ast)); - // 2.Analyze abstract syntax to generate logical plan - LogicalPlan logicalPlan = - new Analyzer(new ExpressionAnalyzer(repository), catalogService, repository).analyze( - UnresolvedPlanHelper.addSelectAll(ast), - new AnalysisContext()); + Statement statement = + cst.accept( + new AstStatementBuilder( + new AstBuilder(new AstExpressionBuilder(), request.getRequest()), + AstStatementBuilder.StatementBuilderContext.builder() + .isExplain(request.isExplainRequest()) + .build())); - // 3.Generate optimal physical plan from logical plan - return new Planner(LogicalPlanOptimizer.create(new DSL(repository))) - .plan(logicalPlan); - } + LOG.info( + "[{}] Incoming request {}", + QueryContext.getRequestId(), + anonymizer.anonymizeStatement(statement)); + return queryExecutionFactory.create(statement, queryListener, explainListener); + } } diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/config/PPLServiceConfig.java b/ppl/src/main/java/org/opensearch/sql/ppl/config/PPLServiceConfig.java index 8e6c4f4f7b..1067bbaa6b 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/config/PPLServiceConfig.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/config/PPLServiceConfig.java @@ -6,32 +6,24 @@ package org.opensearch.sql.ppl.config; -import org.opensearch.sql.catalog.CatalogService; -import org.opensearch.sql.catalog.model.ConnectorType; -import org.opensearch.sql.executor.ExecutionEngine; -import org.opensearch.sql.expression.config.ExpressionConfig; -import org.opensearch.sql.expression.function.BuiltinFunctionRepository; +import org.opensearch.sql.executor.QueryManager; +import org.opensearch.sql.executor.execution.QueryPlanFactory; import org.opensearch.sql.ppl.PPLService; import org.opensearch.sql.ppl.antlr.PPLSyntaxParser; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.Import; import org.springframework.context.annotation.Scope; @Configuration -@Import({ExpressionConfig.class}) public class PPLServiceConfig { @Autowired - private ExecutionEngine executionEngine; + private QueryManager queryManager; @Autowired - private CatalogService catalogService; - - @Autowired - private BuiltinFunctionRepository functionRepository; + private QueryPlanFactory queryPlanFactory; /** * The registration of OpenSearch storage engine happens here because @@ -42,12 +34,7 @@ public class PPLServiceConfig { @Bean @Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE) public PPLService pplService() { - catalogService.getCatalogs() - .forEach(catalog -> catalog.getStorageEngine().getFunctions() - .forEach(functionResolver -> functionRepository - .register(catalog.getName(), functionResolver))); - return new PPLService(new PPLSyntaxParser(), executionEngine, - functionRepository, catalogService); + return new PPLService(new PPLSyntaxParser(), queryManager, queryPlanFactory); } } diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/domain/PPLQueryRequest.java b/ppl/src/main/java/org/opensearch/sql/ppl/domain/PPLQueryRequest.java index 0d8a4c63d1..87532e01d0 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/domain/PPLQueryRequest.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/domain/PPLQueryRequest.java @@ -9,16 +9,17 @@ import java.util.Locale; import java.util.Optional; import lombok.Getter; -import lombok.RequiredArgsConstructor; import lombok.Setter; import lombok.experimental.Accessors; import org.json.JSONObject; import org.opensearch.sql.protocol.response.format.Format; import org.opensearch.sql.protocol.response.format.JsonResponseFormatter; -@RequiredArgsConstructor public class PPLQueryRequest { - public static final PPLQueryRequest NULL = new PPLQueryRequest("", null, "", ""); + + private static final String DEFAULT_PPL_PATH = "/_plugins/_ppl"; + + public static final PPLQueryRequest NULL = new PPLQueryRequest("", null, DEFAULT_PPL_PATH, ""); private final String pplQuery; @Getter @@ -38,13 +39,17 @@ public class PPLQueryRequest { @Accessors(fluent = true) private JsonResponseFormatter.Style style = JsonResponseFormatter.Style.COMPACT; + public PPLQueryRequest(String pplQuery, JSONObject jsonContent, String path) { + this(pplQuery, jsonContent, path, ""); + } + /** * Constructor of PPLQueryRequest. */ public PPLQueryRequest(String pplQuery, JSONObject jsonContent, String path, String format) { this.pplQuery = pplQuery; this.jsonContent = jsonContent; - this.path = path; + this.path = Optional.ofNullable(path).orElse(DEFAULT_PPL_PATH); this.format = format; } diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java index b0d17940a4..d58cf9dad2 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java @@ -13,7 +13,6 @@ import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.FieldsCommandContext; import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.FromClauseContext; import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.HeadCommandContext; -import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.PplStatementContext; import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.RareCommandContext; import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.RenameCommandContext; import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.SearchFilterFromContext; @@ -87,7 +86,7 @@ public class AstBuilder extends OpenSearchPPLParserBaseVisitor { private final String query; @Override - public UnresolvedPlan visitPplStatement(PplStatementContext ctx) { + public UnresolvedPlan visitQueryStatement(OpenSearchPPLParser.QueryStatementContext ctx) { UnresolvedPlan pplCommand = visit(ctx.pplCommands()); return ctx.commands() .stream() diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstStatementBuilder.java b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstStatementBuilder.java new file mode 100644 index 0000000000..e4f40e9a11 --- /dev/null +++ b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstStatementBuilder.java @@ -0,0 +1,58 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.sql.ppl.parser; + +import com.google.common.collect.ImmutableList; +import lombok.Builder; +import lombok.Data; +import lombok.RequiredArgsConstructor; +import org.opensearch.sql.ast.expression.AllFields; +import org.opensearch.sql.ast.statement.Explain; +import org.opensearch.sql.ast.statement.Query; +import org.opensearch.sql.ast.statement.Statement; +import org.opensearch.sql.ast.tree.Project; +import org.opensearch.sql.ast.tree.UnresolvedPlan; +import org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser; +import org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParserBaseVisitor; + +/** + * Build {@link Statement} from PPL Query. + */ +@RequiredArgsConstructor +public class AstStatementBuilder extends OpenSearchPPLParserBaseVisitor { + + private final AstBuilder astBuilder; + + private final StatementBuilderContext context; + + @Override + public Statement visitDmlStatement(OpenSearchPPLParser.DmlStatementContext ctx) { + Query query = new Query(addSelectAll(astBuilder.visit(ctx))); + return context.isExplain ? new Explain(query) : query; + } + + @Override + protected Statement aggregateResult(Statement aggregate, Statement nextResult) { + return nextResult != null ? nextResult : aggregate; + } + + @Data + @Builder + public static class StatementBuilderContext { + private final boolean isExplain; + } + + private UnresolvedPlan addSelectAll(UnresolvedPlan plan) { + if ((plan instanceof Project) && !((Project) plan).isExcluded()) { + return plan; + } else { + return new Project(ImmutableList.of(AllFields.of())).attach(plan); + } + } +} diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java b/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java index 314d97009c..1f0e6f0d52 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java @@ -29,6 +29,9 @@ import org.opensearch.sql.ast.expression.Or; import org.opensearch.sql.ast.expression.UnresolvedExpression; import org.opensearch.sql.ast.expression.Xor; +import org.opensearch.sql.ast.statement.Explain; +import org.opensearch.sql.ast.statement.Query; +import org.opensearch.sql.ast.statement.Statement; import org.opensearch.sql.ast.tree.Aggregation; import org.opensearch.sql.ast.tree.Dedupe; import org.opensearch.sql.ast.tree.Eval; @@ -74,6 +77,23 @@ public String anonymizeData(UnresolvedPlan plan) { return plan.accept(this, null); } + public String anonymizeStatement(Statement plan) { + return plan.accept(this, null); + } + + /** + * Handle Query Statement. + */ + @Override + public String visitQuery(Query node, String context) { + return node.getPlan().accept(this, null); + } + + @Override + public String visitExplain(Explain node, String context) { + return node.getStatement().accept(this, null); + } + @Override public String visitRelation(Relation node, String context) { return StringUtils.format("source=%s", node.getTableName()); diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/PPLServiceTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/PPLServiceTest.java index 0ecebf160d..9a560e25b0 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/PPLServiceTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/PPLServiceTest.java @@ -8,86 +8,51 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.when; -import com.google.common.collect.ImmutableMap; import java.util.Collections; -import java.util.Set; import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; -import org.opensearch.sql.catalog.CatalogService; -import org.opensearch.sql.catalog.model.Catalog; -import org.opensearch.sql.catalog.model.ConnectorType; import org.opensearch.sql.common.response.ResponseListener; -import org.opensearch.sql.data.type.ExprCoreType; +import org.opensearch.sql.executor.DefaultQueryManager; import org.opensearch.sql.executor.ExecutionEngine; import org.opensearch.sql.executor.ExecutionEngine.ExplainResponse; import org.opensearch.sql.executor.ExecutionEngine.ExplainResponseNode; import org.opensearch.sql.executor.ExecutionEngine.QueryResponse; -import org.opensearch.sql.expression.DSL; -import org.opensearch.sql.expression.function.BuiltinFunctionRepository; -import org.opensearch.sql.expression.function.FunctionResolver; -import org.opensearch.sql.planner.physical.PhysicalPlan; +import org.opensearch.sql.executor.QueryManager; +import org.opensearch.sql.executor.QueryService; +import org.opensearch.sql.executor.execution.QueryPlanFactory; import org.opensearch.sql.ppl.config.PPLServiceConfig; import org.opensearch.sql.ppl.domain.PPLQueryRequest; -import org.opensearch.sql.storage.StorageEngine; -import org.opensearch.sql.storage.Table; import org.springframework.context.annotation.AnnotationConfigApplicationContext; @RunWith(MockitoJUnitRunner.class) public class PPLServiceTest { - private AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(); - - private PPLService pplService; - - @Mock - private StorageEngine storageEngine; - @Mock - private ExecutionEngine executionEngine; + private static String QUERY = "/_plugins/_ppl"; - @Mock - private CatalogService catalogService; + private static String EXPLAIN = "/_plugins/_ppl/_explain"; - @Mock - private BuiltinFunctionRepository functionRepository; - - @Mock - private DSL dsl; + private AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(); - @Mock - private Table table; + private PPLService pplService; @Mock - private PhysicalPlan plan; + private QueryService queryService; @Mock private ExecutionEngine.Schema schema; - @Mock - private FunctionResolver functionResolver; - /** * Setup the test context. */ @Before public void setUp() { - when(table.getFieldTypes()).thenReturn(ImmutableMap.of("a", ExprCoreType.INTEGER)); - when(table.implement(any())).thenReturn(plan); - when(storageEngine.getTable(any())).thenReturn(table); - when(catalogService.getCatalogs()) - .thenReturn(Set.of(new Catalog("prometheus", ConnectorType.PROMETHEUS, storageEngine))); - when(catalogService.getCatalog(any())) - .thenReturn(new Catalog("prometheus", ConnectorType.PROMETHEUS, storageEngine)); - when(storageEngine.getFunctions()).thenReturn(Collections.singleton(functionResolver)); - - context.registerBean(StorageEngine.class, () -> storageEngine); - context.registerBean(ExecutionEngine.class, () -> executionEngine); - context.registerBean(CatalogService.class, () -> catalogService); + context.registerBean(QueryManager.class, DefaultQueryManager::new); + context.registerBean(QueryPlanFactory.class, () -> new QueryPlanFactory(queryService)); context.register(PPLServiceConfig.class); context.refresh(); pplService = context.getBean(PPLService.class); @@ -99,9 +64,9 @@ public void testExecuteShouldPass() { ResponseListener listener = invocation.getArgument(1); listener.onResponse(new QueryResponse(schema, Collections.emptyList())); return null; - }).when(executionEngine).execute(any(), any()); + }).when(queryService).execute(any(), any()); - pplService.execute(new PPLQueryRequest("search source=t a=1", null, null), + pplService.execute(new PPLQueryRequest("search source=t a=1", null, QUERY), new ResponseListener() { @Override public void onResponse(QueryResponse pplQueryResponse) { @@ -121,9 +86,9 @@ public void testExecuteCsvFormatShouldPass() { ResponseListener listener = invocation.getArgument(1); listener.onResponse(new QueryResponse(schema, Collections.emptyList())); return null; - }).when(executionEngine).execute(any(), any()); + }).when(queryService).execute(any(), any()); - pplService.execute(new PPLQueryRequest("search source=t a=1", null, "/_plugins/_ppl", "csv"), + pplService.execute(new PPLQueryRequest("search source=t a=1", null, QUERY, "csv"), new ResponseListener() { @Override public void onResponse(QueryResponse pplQueryResponse) { @@ -138,15 +103,13 @@ public void onFailure(Exception e) { @Test public void testExplainShouldPass() { - when(catalogService.getCatalog(any())) - .thenReturn(new Catalog("prometheus", ConnectorType.PROMETHEUS, storageEngine)); doAnswer(invocation -> { ResponseListener listener = invocation.getArgument(1); listener.onResponse(new ExplainResponse(new ExplainResponseNode("test"))); return null; - }).when(executionEngine).explain(any(), any()); + }).when(queryService).explain(any(), any()); - pplService.explain(new PPLQueryRequest("search source=t a=1", null, null), + pplService.explain(new PPLQueryRequest("search source=t a=1", null, EXPLAIN), new ResponseListener() { @Override public void onResponse(ExplainResponse pplQueryResponse) { @@ -161,7 +124,7 @@ public void onFailure(Exception e) { @Test public void testExecuteWithIllegalQueryShouldBeCaughtByHandler() { - pplService.execute(new PPLQueryRequest("search", null, null), + pplService.execute(new PPLQueryRequest("search", null, QUERY), new ResponseListener() { @Override public void onResponse(QueryResponse pplQueryResponse) { @@ -177,7 +140,7 @@ public void onFailure(Exception e) { @Test public void testExplainWithIllegalQueryShouldBeCaughtByHandler() { - pplService.explain(new PPLQueryRequest("search", null, null), + pplService.explain(new PPLQueryRequest("search", null, QUERY), new ResponseListener<>() { @Override public void onResponse(ExplainResponse pplQueryResponse) { @@ -197,9 +160,9 @@ public void testPrometheusQuery() { ResponseListener listener = invocation.getArgument(1); listener.onResponse(new QueryResponse(schema, Collections.emptyList())); return null; - }).when(executionEngine).execute(any(), any()); + }).when(queryService).execute(any(), any()); - pplService.execute(new PPLQueryRequest("source = prometheus.http_requests_total", null, null), + pplService.execute(new PPLQueryRequest("source = prometheus.http_requests_total", null, QUERY), new ResponseListener<>() { @Override public void onResponse(QueryResponse pplQueryResponse) { @@ -214,8 +177,8 @@ public void onFailure(Exception e) { } @Test - public void test() { - pplService.execute(new PPLQueryRequest("search", null, null), + public void testInvalidPPLQuery() { + pplService.execute(new PPLQueryRequest("search", null, QUERY), new ResponseListener() { @Override public void onResponse(QueryResponse pplQueryResponse) { diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstStatementBuilderTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstStatementBuilderTest.java new file mode 100644 index 0000000000..4760024692 --- /dev/null +++ b/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstStatementBuilderTest.java @@ -0,0 +1,72 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.sql.ppl.parser; + +import static org.junit.Assert.assertEquals; +import static org.opensearch.sql.ast.dsl.AstDSL.compare; +import static org.opensearch.sql.ast.dsl.AstDSL.field; +import static org.opensearch.sql.ast.dsl.AstDSL.filter; +import static org.opensearch.sql.ast.dsl.AstDSL.intLiteral; +import static org.opensearch.sql.ast.dsl.AstDSL.project; +import static org.opensearch.sql.ast.dsl.AstDSL.relation; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.opensearch.sql.ast.Node; +import org.opensearch.sql.ast.expression.AllFields; +import org.opensearch.sql.ast.statement.Explain; +import org.opensearch.sql.ast.statement.Query; +import org.opensearch.sql.ast.statement.Statement; +import org.opensearch.sql.ppl.antlr.PPLSyntaxParser; + +public class AstStatementBuilderTest { + + @Rule + public ExpectedException exceptionRule = ExpectedException.none(); + + private PPLSyntaxParser parser = new PPLSyntaxParser(); + + @Test + public void buildQueryStatement() { + assertEqual( + "search source=t a=1", + new Query( + project( + filter(relation("t"), compare("=", field("a"), intLiteral(1))), AllFields.of()))); + } + + @Test + public void buildExplainStatement() { + assertExplainEqual( + "search source=t a=1", + new Explain( + new Query( + project( + filter(relation("t"), compare("=", field("a"), intLiteral(1))), + AllFields.of())))); + } + + private void assertEqual(String query, Statement expectedStatement) { + Node actualPlan = plan(query, false); + assertEquals(expectedStatement, actualPlan); + } + + private void assertExplainEqual(String query, Statement expectedStatement) { + Node actualPlan = plan(query, true); + assertEquals(expectedStatement, actualPlan); + } + + private Node plan(String query, boolean isExplain) { + final AstStatementBuilder builder = + new AstStatementBuilder(new AstBuilder(new AstExpressionBuilder(), query), + AstStatementBuilder.StatementBuilderContext.builder().isExplain(isExplain).build()); + return builder.visit(parser.parse(query)); + } +} diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java index 52f2f18b72..1998647dba 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java @@ -7,24 +7,20 @@ package org.opensearch.sql.ppl.utils; import static org.junit.Assert.assertEquals; -import static org.mockito.Mockito.when; import static org.opensearch.sql.ast.dsl.AstDSL.field; import static org.opensearch.sql.ast.dsl.AstDSL.projectWithArg; import static org.opensearch.sql.ast.dsl.AstDSL.relation; -import com.google.common.collect.ImmutableSet; import java.util.Collections; -import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; -import org.mockito.Mock; -import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; +import org.opensearch.sql.ast.statement.Statement; import org.opensearch.sql.ast.tree.UnresolvedPlan; -import org.opensearch.sql.catalog.CatalogService; import org.opensearch.sql.ppl.antlr.PPLSyntaxParser; import org.opensearch.sql.ppl.parser.AstBuilder; import org.opensearch.sql.ppl.parser.AstExpressionBuilder; +import org.opensearch.sql.ppl.parser.AstStatementBuilder; @RunWith(MockitoJUnitRunner.class) public class PPLQueryDataAnonymizerTest { @@ -174,6 +170,20 @@ public void testDateFunction() { ); } + @Test + public void testExplain() { + assertEquals("source=t | fields + a", + anonymizeStatement("source=t | fields a", true) + ); + } + + @Test + public void testQuery() { + assertEquals("source=t | fields + a", + anonymizeStatement("source=t | fields a", false) + ); + } + @Test public void anonymizeFieldsNoArg() { assertEquals("source=t | fields + f", @@ -190,4 +200,14 @@ private String anonymize(UnresolvedPlan plan) { final PPLQueryDataAnonymizer anonymize = new PPLQueryDataAnonymizer(); return anonymize.anonymizeData(plan); } + + private String anonymizeStatement(String query, boolean isExplain) { + AstStatementBuilder builder = + new AstStatementBuilder( + new AstBuilder(new AstExpressionBuilder(), query), + AstStatementBuilder.StatementBuilderContext.builder().isExplain(isExplain).build()); + Statement statement = builder.visit(parser.parse(query)); + PPLQueryDataAnonymizer anonymize = new PPLQueryDataAnonymizer(); + return anonymize.anonymizeStatement(statement); + } } diff --git a/sql/src/main/java/org/opensearch/sql/sql/SQLService.java b/sql/src/main/java/org/opensearch/sql/sql/SQLService.java index 76de0f6249..082a3e9581 100644 --- a/sql/src/main/java/org/opensearch/sql/sql/SQLService.java +++ b/sql/src/main/java/org/opensearch/sql/sql/SQLService.java @@ -6,25 +6,20 @@ package org.opensearch.sql.sql; +import java.util.Optional; import lombok.RequiredArgsConstructor; import org.antlr.v4.runtime.tree.ParseTree; -import org.opensearch.sql.analysis.AnalysisContext; -import org.opensearch.sql.analysis.Analyzer; -import org.opensearch.sql.ast.tree.UnresolvedPlan; +import org.opensearch.sql.ast.statement.Statement; import org.opensearch.sql.common.response.ResponseListener; -import org.opensearch.sql.executor.ExecutionEngine; import org.opensearch.sql.executor.ExecutionEngine.ExplainResponse; import org.opensearch.sql.executor.ExecutionEngine.QueryResponse; -import org.opensearch.sql.expression.DSL; -import org.opensearch.sql.expression.function.BuiltinFunctionRepository; -import org.opensearch.sql.planner.Planner; -import org.opensearch.sql.planner.logical.LogicalPlan; -import org.opensearch.sql.planner.optimizer.LogicalPlanOptimizer; -import org.opensearch.sql.planner.physical.PhysicalPlan; +import org.opensearch.sql.executor.QueryManager; +import org.opensearch.sql.executor.execution.AbstractPlan; +import org.opensearch.sql.executor.execution.QueryPlanFactory; import org.opensearch.sql.sql.antlr.SQLSyntaxParser; import org.opensearch.sql.sql.domain.SQLQueryRequest; import org.opensearch.sql.sql.parser.AstBuilder; -import org.opensearch.sql.storage.StorageEngine; +import org.opensearch.sql.sql.parser.AstStatementBuilder; /** * SQL service. @@ -34,75 +29,52 @@ public class SQLService { private final SQLSyntaxParser parser; - private final Analyzer analyzer; + private final QueryManager queryManager; - private final ExecutionEngine executionEngine; - - private final BuiltinFunctionRepository repository; + private final QueryPlanFactory queryExecutionFactory; /** - * Parse, analyze, plan and execute the query. - * @param request SQL query request - * @param listener callback listener + * Given {@link SQLQueryRequest}, execute it. Using listener to listen result. + * + * @param request {@link SQLQueryRequest} + * @param listener callback listener */ public void execute(SQLQueryRequest request, ResponseListener listener) { try { - executionEngine.execute( - plan( - analyze( - parse(request.getQuery()))), listener); + queryManager.submit(plan(request, Optional.of(listener), Optional.empty())); } catch (Exception e) { listener.onFailure(e); } } /** - * Given physical plan, execute it and listen on response. - * @param plan physical plan - * @param listener callback listener + * Given {@link SQLQueryRequest}, explain it. Using listener to listen result. + * + * @param request {@link SQLQueryRequest} + * @param listener callback listener */ - public void execute(PhysicalPlan plan, ResponseListener listener) { + public void explain(SQLQueryRequest request, ResponseListener listener) { try { - executionEngine.execute(plan, listener); + queryManager.submit(plan(request, Optional.empty(), Optional.of(listener))); } catch (Exception e) { listener.onFailure(e); } } - /** - * Given physical plan, explain it. - * @param plan physical plan - * @param listener callback listener - */ - public void explain(PhysicalPlan plan, ResponseListener listener) { - try { - executionEngine.explain(plan, listener); - } catch (Exception e) { - listener.onFailure(e); - } - } + private AbstractPlan plan( + SQLQueryRequest request, + Optional> queryListener, + Optional> explainListener) { + // 1.Parse query and convert parse tree (CST) to abstract syntax tree (AST) + ParseTree cst = parser.parse(request.getQuery()); + Statement statement = + cst.accept( + new AstStatementBuilder( + new AstBuilder(request.getQuery()), + AstStatementBuilder.StatementBuilderContext.builder() + .isExplain(request.isExplainRequest()) + .build())); - /** - * Parse query and convert parse tree (CST) to abstract syntax tree (AST). - */ - public UnresolvedPlan parse(String query) { - ParseTree cst = parser.parse(query); - return cst.accept(new AstBuilder(query)); + return queryExecutionFactory.create(statement, queryListener, explainListener); } - - /** - * Analyze abstract syntax to generate logical plan. - */ - public LogicalPlan analyze(UnresolvedPlan ast) { - return analyzer.analyze(ast, new AnalysisContext()); - } - - /** - * Generate optimal physical plan from logical plan. - */ - public PhysicalPlan plan(LogicalPlan logicalPlan) { - return new Planner(LogicalPlanOptimizer.create(new DSL(repository))) - .plan(logicalPlan); - } - } diff --git a/sql/src/main/java/org/opensearch/sql/sql/config/SQLServiceConfig.java b/sql/src/main/java/org/opensearch/sql/sql/config/SQLServiceConfig.java index e5c9de3d2a..4287883c34 100644 --- a/sql/src/main/java/org/opensearch/sql/sql/config/SQLServiceConfig.java +++ b/sql/src/main/java/org/opensearch/sql/sql/config/SQLServiceConfig.java @@ -6,42 +6,27 @@ package org.opensearch.sql.sql.config; -import org.opensearch.sql.analysis.Analyzer; -import org.opensearch.sql.analysis.ExpressionAnalyzer; -import org.opensearch.sql.catalog.CatalogService; -import org.opensearch.sql.executor.ExecutionEngine; -import org.opensearch.sql.expression.config.ExpressionConfig; -import org.opensearch.sql.expression.function.BuiltinFunctionRepository; +import org.opensearch.sql.executor.QueryManager; +import org.opensearch.sql.executor.execution.QueryPlanFactory; import org.opensearch.sql.sql.SQLService; import org.opensearch.sql.sql.antlr.SQLSyntaxParser; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.Import; import org.springframework.context.annotation.Scope; /** * SQL service configuration for Spring container initialization. */ @Configuration -@Import({ExpressionConfig.class}) public class SQLServiceConfig { @Autowired - private ExecutionEngine executionEngine; + private QueryManager queryManager; @Autowired - private CatalogService catalogService; - - @Autowired - private BuiltinFunctionRepository functionRepository; - - @Bean - public Analyzer analyzer() { - return new Analyzer(new ExpressionAnalyzer(functionRepository), catalogService, - functionRepository); - } + private QueryPlanFactory queryExecutionFactory; /** * The registration of OpenSearch storage engine happens here because @@ -54,10 +39,8 @@ public Analyzer analyzer() { public SQLService sqlService() { return new SQLService( new SQLSyntaxParser(), - new Analyzer(new ExpressionAnalyzer(functionRepository), - catalogService, functionRepository), - executionEngine, - functionRepository); + queryManager, + queryExecutionFactory); } } diff --git a/sql/src/main/java/org/opensearch/sql/sql/parser/AstStatementBuilder.java b/sql/src/main/java/org/opensearch/sql/sql/parser/AstStatementBuilder.java new file mode 100644 index 0000000000..40d549764a --- /dev/null +++ b/sql/src/main/java/org/opensearch/sql/sql/parser/AstStatementBuilder.java @@ -0,0 +1,43 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.sql.sql.parser; + +import lombok.Builder; +import lombok.Data; +import lombok.RequiredArgsConstructor; +import org.opensearch.sql.ast.statement.Explain; +import org.opensearch.sql.ast.statement.Query; +import org.opensearch.sql.ast.statement.Statement; +import org.opensearch.sql.sql.antlr.parser.OpenSearchSQLParser; +import org.opensearch.sql.sql.antlr.parser.OpenSearchSQLParserBaseVisitor; + +@RequiredArgsConstructor +public class AstStatementBuilder extends OpenSearchSQLParserBaseVisitor { + + private final AstBuilder astBuilder; + + private final StatementBuilderContext context; + + @Override + public Statement visitSqlStatement(OpenSearchSQLParser.SqlStatementContext ctx) { + Query query = new Query(astBuilder.visit(ctx)); + return context.isExplain ? new Explain(query) : query; + } + + @Override + protected Statement aggregateResult(Statement aggregate, Statement nextResult) { + return nextResult != null ? nextResult : aggregate; + } + + @Data + @Builder + public static class StatementBuilderContext { + private final boolean isExplain; + } +} diff --git a/sql/src/test/java/org/opensearch/sql/sql/SQLServiceTest.java b/sql/src/test/java/org/opensearch/sql/sql/SQLServiceTest.java index 774c5e2d52..f1d2c5293d 100644 --- a/sql/src/test/java/org/opensearch/sql/sql/SQLServiceTest.java +++ b/sql/src/test/java/org/opensearch/sql/sql/SQLServiceTest.java @@ -10,8 +10,6 @@ import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; import static org.opensearch.sql.executor.ExecutionEngine.QueryResponse; import java.util.Collections; @@ -21,41 +19,39 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; -import org.opensearch.sql.catalog.CatalogService; import org.opensearch.sql.common.response.ResponseListener; +import org.opensearch.sql.executor.DefaultQueryManager; import org.opensearch.sql.executor.ExecutionEngine; import org.opensearch.sql.executor.ExecutionEngine.ExplainResponse; import org.opensearch.sql.executor.ExecutionEngine.ExplainResponseNode; -import org.opensearch.sql.planner.physical.PhysicalPlan; +import org.opensearch.sql.executor.QueryManager; +import org.opensearch.sql.executor.QueryService; +import org.opensearch.sql.executor.execution.QueryPlanFactory; import org.opensearch.sql.sql.config.SQLServiceConfig; import org.opensearch.sql.sql.domain.SQLQueryRequest; -import org.opensearch.sql.storage.StorageEngine; import org.springframework.context.annotation.AnnotationConfigApplicationContext; @ExtendWith(MockitoExtension.class) class SQLServiceTest { - private AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(); + private static String QUERY = "/_plugins/_sql"; - private SQLService sqlService; + private static String EXPLAIN = "/_plugins/_sql/_explain"; - @Mock - private StorageEngine storageEngine; + private AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(); - @Mock - private ExecutionEngine executionEngine; + private SQLService sqlService; @Mock - private CatalogService catalogService; + private QueryService queryService; @Mock private ExecutionEngine.Schema schema; @BeforeEach public void setUp() { - context.registerBean(StorageEngine.class, () -> storageEngine); - context.registerBean(ExecutionEngine.class, () -> executionEngine); - context.registerBean(CatalogService.class, () -> catalogService); + context.registerBean(QueryManager.class, DefaultQueryManager::new); + context.registerBean(QueryPlanFactory.class, () -> new QueryPlanFactory(queryService)); context.register(SQLServiceConfig.class); context.refresh(); sqlService = context.getBean(SQLService.class); @@ -67,10 +63,10 @@ public void canExecuteSqlQuery() { ResponseListener listener = invocation.getArgument(1); listener.onResponse(new QueryResponse(schema, Collections.emptyList())); return null; - }).when(executionEngine).execute(any(), any()); + }).when(queryService).execute(any(), any()); sqlService.execute( - new SQLQueryRequest(new JSONObject(), "SELECT 123", "_plugins/_sql", "jdbc"), + new SQLQueryRequest(new JSONObject(), "SELECT 123", QUERY, "jdbc"), new ResponseListener() { @Override public void onResponse(QueryResponse response) { @@ -90,10 +86,10 @@ public void canExecuteCsvFormatRequest() { ResponseListener listener = invocation.getArgument(1); listener.onResponse(new QueryResponse(schema, Collections.emptyList())); return null; - }).when(executionEngine).execute(any(), any()); + }).when(queryService).execute(any(), any()); sqlService.execute( - new SQLQueryRequest(new JSONObject(), "SELECT 123", "_plugins/_sql", "csv"), + new SQLQueryRequest(new JSONObject(), "SELECT 123", QUERY, "csv"), new ResponseListener() { @Override public void onResponse(QueryResponse response) { @@ -113,9 +109,9 @@ public void canExplainSqlQuery() { ResponseListener listener = invocation.getArgument(1); listener.onResponse(new ExplainResponse(new ExplainResponseNode("Test"))); return null; - }).when(executionEngine).explain(any(), any()); + }).when(queryService).explain(any(), any()); - sqlService.explain(mock(PhysicalPlan.class), + sqlService.explain(new SQLQueryRequest(new JSONObject(), "SELECT 123", EXPLAIN, "csv"), new ResponseListener() { @Override public void onResponse(ExplainResponse response) { @@ -129,50 +125,10 @@ public void onFailure(Exception e) { }); } - @Test - public void canExecuteFromPhysicalPlan() { - doAnswer(invocation -> { - ResponseListener listener = invocation.getArgument(1); - listener.onResponse(new QueryResponse(schema, Collections.emptyList())); - return null; - }).when(executionEngine).execute(any(), any()); - - sqlService.execute(mock(PhysicalPlan.class), - new ResponseListener() { - @Override - public void onResponse(QueryResponse response) { - assertNotNull(response); - } - - @Override - public void onFailure(Exception e) { - fail(e); - } - }); - } - @Test public void canCaptureErrorDuringExecution() { sqlService.execute( - new SQLQueryRequest(new JSONObject(), "SELECT", "_plugins/_sql", ""), - new ResponseListener() { - @Override - public void onResponse(QueryResponse response) { - fail(); - } - - @Override - public void onFailure(Exception e) { - assertNotNull(e); - } - }); - } - - @Test - public void canCaptureErrorDuringExecutionFromPhysicalPlan() { - doThrow(new RuntimeException()).when(executionEngine).execute(any(), any()); - - sqlService.execute(mock(PhysicalPlan.class), + new SQLQueryRequest(new JSONObject(), "SELECT", QUERY, ""), new ResponseListener() { @Override public void onResponse(QueryResponse response) { @@ -188,9 +144,8 @@ public void onFailure(Exception e) { @Test public void canCaptureErrorDuringExplain() { - doThrow(new RuntimeException()).when(executionEngine).explain(any(), any()); - - sqlService.explain(mock(PhysicalPlan.class), + sqlService.explain( + new SQLQueryRequest(new JSONObject(), "SELECT", EXPLAIN, ""), new ResponseListener() { @Override public void onResponse(ExplainResponse response) {