From de56e8989ec7c1451ce463ae7604256215703760 Mon Sep 17 00:00:00 2001 From: Peng Huo Date: Fri, 11 Nov 2022 14:49:44 -0800 Subject: [PATCH 01/11] add streaming plan Signed-off-by: Peng Huo --- .../opensearch/sql/ast/statement/Query.java | 2 +- .../sql/executor/DefaultQueryManager.java | 42 ++- .../sql/executor/ExecutionContext.java | 30 ++ .../sql/executor/ExecutionEngine.java | 7 + .../opensearch/sql/executor/QueryManager.java | 9 + .../opensearch/sql/executor/QueryService.java | 7 +- .../sql/executor/execution/QueryPlan.java | 6 +- .../execution/StreamingQueryPlan.java | 137 +++++++ .../MicroBatchStreamingExecution.java | 2 +- .../sql/planner/physical/PhysicalPlan.java | 5 + .../org/opensearch/sql/storage/Table.java | 7 + .../sql/executor/DefaultQueryManagerTest.java | 52 ++- .../sql/executor/QueryManagerTest.java | 29 ++ .../sql/executor/QueryServiceTest.java | 82 ++-- .../IntervalTriggerExecutionTest.java | 102 +++++ .../execution/StreamingQueryPlanTest.java | 137 +++++++ .../planner/physical/PhysicalPlanTest.java | 53 +++ .../physical/catalog/CatalogTableTest.java | 7 + filesystem/build.gradle | 4 +- integ-test/build.gradle | 4 +- .../org/opensearch/sql/ppl/StandaloneIT.java | 2 +- .../opensearch/sql/ppl/StreamingQueryIT.java | 350 ++++++++++++++++++ .../executor/OpenSearchExecutionEngine.java | 9 + .../OpenSearchExecutionEngineTest.java | 47 +++ 24 files changed, 1090 insertions(+), 42 deletions(-) create mode 100644 core/src/main/java/org/opensearch/sql/executor/ExecutionContext.java create mode 100644 core/src/main/java/org/opensearch/sql/executor/execution/StreamingQueryPlan.java create mode 100644 core/src/test/java/org/opensearch/sql/executor/QueryManagerTest.java create mode 100644 core/src/test/java/org/opensearch/sql/executor/execution/IntervalTriggerExecutionTest.java create mode 100644 core/src/test/java/org/opensearch/sql/executor/execution/StreamingQueryPlanTest.java create mode 100644 
core/src/test/java/org/opensearch/sql/planner/physical/PhysicalPlanTest.java create mode 100644 integ-test/src/test/java/org/opensearch/sql/ppl/StreamingQueryIT.java diff --git a/core/src/main/java/org/opensearch/sql/ast/statement/Query.java b/core/src/main/java/org/opensearch/sql/ast/statement/Query.java index a7b547ed2a..17682cd47b 100644 --- a/core/src/main/java/org/opensearch/sql/ast/statement/Query.java +++ b/core/src/main/java/org/opensearch/sql/ast/statement/Query.java @@ -26,7 +26,7 @@ @RequiredArgsConstructor public class Query extends Statement { - private final UnresolvedPlan plan; + protected final UnresolvedPlan plan; @Override public R accept(AbstractNodeVisitor visitor, C context) { diff --git a/core/src/main/java/org/opensearch/sql/executor/DefaultQueryManager.java b/core/src/main/java/org/opensearch/sql/executor/DefaultQueryManager.java index 9ab3bd7486..af0db86ad5 100644 --- a/core/src/main/java/org/opensearch/sql/executor/DefaultQueryManager.java +++ b/core/src/main/java/org/opensearch/sql/executor/DefaultQueryManager.java @@ -8,6 +8,12 @@ package org.opensearch.sql.executor; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import org.opensearch.sql.executor.execution.AbstractPlan; /** @@ -15,10 +21,40 @@ */ public class DefaultQueryManager implements QueryManager { + private final ExecutorService executorService; + + private final Map> map = new HashMap<>(); + + public DefaultQueryManager(ExecutorService executorService) { + this.executorService = executorService; + } + + public static DefaultQueryManager defaultQueryManager() { + return new DefaultQueryManager(Executors.newSingleThreadExecutor()); + } + @Override - public QueryId submit(AbstractPlan queryExecution) { - queryExecution.execute(); + public synchronized QueryId submit(AbstractPlan queryExecution) { + Future future = 
executorService.submit(queryExecution::execute); + QueryId queryId = queryExecution.getQueryId(); + + map.put(queryId, future); + return queryId; + } + + @Override + public synchronized boolean cancel(QueryId queryId) { + if (map.containsKey(queryId)) { + Future future = map.get(queryId); + map.remove(queryId); + return future.cancel(true); + } else { + return false; + } + } - return queryExecution.getQueryId(); + public void awaitTermination(long timeout, TimeUnit timeUnit) throws InterruptedException { + executorService.shutdown(); + executorService.awaitTermination(timeout, timeUnit); } } diff --git a/core/src/main/java/org/opensearch/sql/executor/ExecutionContext.java b/core/src/main/java/org/opensearch/sql/executor/ExecutionContext.java new file mode 100644 index 0000000000..8a3162068f --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/executor/ExecutionContext.java @@ -0,0 +1,30 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.executor; + +import java.util.Optional; +import lombok.Getter; +import org.opensearch.sql.storage.split.Split; + +/** + * Execution context hold planning related information. 
+ */ +public class ExecutionContext { + @Getter + private final Optional split; + + public ExecutionContext(Split split) { + this.split = Optional.of(split); + } + + private ExecutionContext(Optional split) { + this.split = split; + } + + public static ExecutionContext emptyExecutionContext() { + return new ExecutionContext(Optional.empty()); + } +} diff --git a/core/src/main/java/org/opensearch/sql/executor/ExecutionEngine.java b/core/src/main/java/org/opensearch/sql/executor/ExecutionEngine.java index 6de841a890..1936a0f517 100644 --- a/core/src/main/java/org/opensearch/sql/executor/ExecutionEngine.java +++ b/core/src/main/java/org/opensearch/sql/executor/ExecutionEngine.java @@ -23,12 +23,19 @@ public interface ExecutionEngine { /** * Execute physical plan and call back response listener. + * Todo. deprecated this interface after finalize {@link ExecutionContext}. * * @param plan executable physical plan * @param listener response listener */ void execute(PhysicalPlan plan, ResponseListener listener); + /** + * Execute physical plan with {@link ExecutionContext} and call back response listener. + */ + void execute(PhysicalPlan plan, ExecutionContext context, + ResponseListener listener); + /** * Explain physical plan and call back response listener. The reason why this has to * be part of execution engine interface is that the physical plan probably needs to diff --git a/core/src/main/java/org/opensearch/sql/executor/QueryManager.java b/core/src/main/java/org/opensearch/sql/executor/QueryManager.java index 3a32e4c7e9..5b41d7ce2e 100644 --- a/core/src/main/java/org/opensearch/sql/executor/QueryManager.java +++ b/core/src/main/java/org/opensearch/sql/executor/QueryManager.java @@ -22,4 +22,13 @@ public interface QueryManager { * @return {@link QueryId}. */ QueryId submit(AbstractPlan queryPlan); + + /** + * Cancel submitted {@link AbstractPlan} by {@link QueryId}. + * + * @return true indicate successful. 
+ */ + default boolean cancel(QueryId queryId) { + throw new UnsupportedOperationException(); + } } diff --git a/core/src/main/java/org/opensearch/sql/executor/QueryService.java b/core/src/main/java/org/opensearch/sql/executor/QueryService.java index 32fda1ab2c..5c5b04aba3 100644 --- a/core/src/main/java/org/opensearch/sql/executor/QueryService.java +++ b/core/src/main/java/org/opensearch/sql/executor/QueryService.java @@ -55,7 +55,12 @@ public void executePlan(LogicalPlan plan, PlanContext planContext, ResponseListener listener) { try { - executionEngine.execute(plan(plan), listener); + planContext + .getSplit() + .ifPresentOrElse( + split -> executionEngine.execute(plan(plan), new ExecutionContext(split), listener), + () -> executionEngine.execute( + plan(plan), ExecutionContext.emptyExecutionContext(), listener)); } catch (Exception e) { listener.onFailure(e); } diff --git a/core/src/main/java/org/opensearch/sql/executor/execution/QueryPlan.java b/core/src/main/java/org/opensearch/sql/executor/execution/QueryPlan.java index 02cb701a0b..af5c032d49 100644 --- a/core/src/main/java/org/opensearch/sql/executor/execution/QueryPlan.java +++ b/core/src/main/java/org/opensearch/sql/executor/execution/QueryPlan.java @@ -24,14 +24,14 @@ public class QueryPlan extends AbstractPlan { /** * The query plan ast. */ - private final UnresolvedPlan plan; + protected final UnresolvedPlan plan; /** * Query service. */ - private final QueryService queryService; + protected final QueryService queryService; - private final ResponseListener listener; + protected final ResponseListener listener; /** constructor. 
*/ public QueryPlan( diff --git a/core/src/main/java/org/opensearch/sql/executor/execution/StreamingQueryPlan.java b/core/src/main/java/org/opensearch/sql/executor/execution/StreamingQueryPlan.java new file mode 100644 index 0000000000..9bb37b064c --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/executor/execution/StreamingQueryPlan.java @@ -0,0 +1,137 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.executor.execution; + +import java.time.Duration; +import java.time.Instant; +import java.util.List; +import java.util.concurrent.TimeUnit; +import lombok.RequiredArgsConstructor; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.sql.ast.tree.UnresolvedPlan; +import org.opensearch.sql.common.response.ResponseListener; +import org.opensearch.sql.executor.ExecutionEngine; +import org.opensearch.sql.executor.QueryId; +import org.opensearch.sql.executor.QueryService; +import org.opensearch.sql.executor.streaming.DefaultMetadataLog; +import org.opensearch.sql.executor.streaming.MicroBatchStreamingExecution; +import org.opensearch.sql.executor.streaming.StreamingSource; +import org.opensearch.sql.planner.logical.LogicalPlan; +import org.opensearch.sql.planner.logical.LogicalPlanNodeVisitor; +import org.opensearch.sql.planner.logical.LogicalRelation; + +/** + * Streaming Query Plan. + */ +public class StreamingQueryPlan extends QueryPlan { + + private static final Logger log = LogManager.getLogger(StreamingQueryPlan.class); + + private final ExecutionStrategy executionStrategy; + + private MicroBatchStreamingExecution streamingExecution; + + /** + * constructor. 
+ */ + public StreamingQueryPlan(QueryId queryId, + UnresolvedPlan plan, + QueryService queryService, + ResponseListener listener, + ExecutionStrategy executionStrategy) { + super(queryId, plan, queryService, listener); + + this.executionStrategy = executionStrategy; + } + + @Override + public void execute() { + try { + LogicalPlan logicalPlan = queryService.analyze(plan); + StreamingSource streamingSource = buildStreamingSource(logicalPlan); + streamingExecution = + new MicroBatchStreamingExecution( + streamingSource, + logicalPlan, + queryService, + new DefaultMetadataLog<>(), + new DefaultMetadataLog<>()); + executionStrategy.execute(streamingExecution::execute); + } catch (UnsupportedOperationException | IllegalArgumentException e) { + listener.onFailure(e); + } catch (InterruptedException e) { + log.error(e); + // todo, update async task status. + } + } + + interface ExecutionStrategy { + /** + * execute task. + */ + void execute(Runnable task) throws InterruptedException; + } + + /** + * execute task with fixed interval. + * if task run time < interval, trigger next task on next interval. + * if task run time >= interval, trigger next task immediately. + */ + @RequiredArgsConstructor + public static class IntervalTriggerExecution implements ExecutionStrategy { + + private final long intervalInSeconds; + + @Override + public void execute(Runnable runnable) throws InterruptedException { + while (!Thread.currentThread().isInterrupted()) { + try { + Instant start = Instant.now(); + runnable.run(); + Instant end = Instant.now(); + long took = Duration.between(start, end).toSeconds(); + TimeUnit.SECONDS.sleep(intervalInSeconds > took ? 
intervalInSeconds - took : 0); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + } + + private StreamingSource buildStreamingSource(LogicalPlan logicalPlan) { + return logicalPlan.accept(new StreamingSourceBuilder(), null); + } + + static class StreamingSourceBuilder extends LogicalPlanNodeVisitor { + @Override + public StreamingSource visitNode(LogicalPlan plan, Void context) { + List children = plan.getChild(); + if (children.isEmpty()) { + String errorMsg = + String.format( + "Could find relation plan, %s does not have child node.", + plan.getClass().getSimpleName()); + log.error(errorMsg); + throw new IllegalArgumentException(errorMsg); + } + return children.get(0).accept(this, context); + } + + @Override + public StreamingSource visitRelation(LogicalRelation plan, Void context) { + try { + return plan.getTable().asStreamingSource(); + } catch (UnsupportedOperationException e) { + String errorMsg = + String.format( + "table %s could not been used as streaming source.", plan.getRelationName()); + log.error(errorMsg); + throw new UnsupportedOperationException(errorMsg); + } + } + } +} diff --git a/core/src/main/java/org/opensearch/sql/executor/streaming/MicroBatchStreamingExecution.java b/core/src/main/java/org/opensearch/sql/executor/streaming/MicroBatchStreamingExecution.java index 4f25b9433f..d7087e6153 100644 --- a/core/src/main/java/org/opensearch/sql/executor/streaming/MicroBatchStreamingExecution.java +++ b/core/src/main/java/org/opensearch/sql/executor/streaming/MicroBatchStreamingExecution.java @@ -105,7 +105,7 @@ public void onResponse(ExecutionEngine.QueryResponse response) { @Override public void onFailure(Exception e) { - log.error("streaming processing failed. source = {}", source); + log.error("streaming processing failed. 
source = {}", source, e); } }); } diff --git a/core/src/main/java/org/opensearch/sql/planner/physical/PhysicalPlan.java b/core/src/main/java/org/opensearch/sql/planner/physical/PhysicalPlan.java index a067f5b3f9..b476b01557 100644 --- a/core/src/main/java/org/opensearch/sql/planner/physical/PhysicalPlan.java +++ b/core/src/main/java/org/opensearch/sql/planner/physical/PhysicalPlan.java @@ -10,6 +10,7 @@ import org.opensearch.sql.data.model.ExprValue; import org.opensearch.sql.executor.ExecutionEngine; import org.opensearch.sql.planner.PlanNode; +import org.opensearch.sql.storage.split.Split; /** * Physical plan. @@ -36,6 +37,10 @@ public void close() { getChild().forEach(PhysicalPlan::close); } + public void add(Split split) { + getChild().forEach(child -> child.add(split)); + } + public ExecutionEngine.Schema schema() { throw new IllegalStateException(String.format("[BUG] schema can been only applied to " + "ProjectOperator, instead of %s", toString())); diff --git a/core/src/main/java/org/opensearch/sql/storage/Table.java b/core/src/main/java/org/opensearch/sql/storage/Table.java index 34e6ece30b..f43531e2a6 100644 --- a/core/src/main/java/org/opensearch/sql/storage/Table.java +++ b/core/src/main/java/org/opensearch/sql/storage/Table.java @@ -8,6 +8,7 @@ import java.util.Map; import org.opensearch.sql.data.type.ExprType; +import org.opensearch.sql.executor.streaming.StreamingSource; import org.opensearch.sql.planner.logical.LogicalPlan; import org.opensearch.sql.planner.physical.PhysicalPlan; @@ -58,4 +59,10 @@ default LogicalPlan optimize(LogicalPlan plan) { return plan; } + /** + * Translate {@link Table} to {@link StreamingSource} if possible. 
+ */ + default StreamingSource asStreamingSource() { + throw new UnsupportedOperationException(); + } } diff --git a/core/src/test/java/org/opensearch/sql/executor/DefaultQueryManagerTest.java b/core/src/test/java/org/opensearch/sql/executor/DefaultQueryManagerTest.java index 988b41657d..fc5c1d985c 100644 --- a/core/src/test/java/org/opensearch/sql/executor/DefaultQueryManagerTest.java +++ b/core/src/test/java/org/opensearch/sql/executor/DefaultQueryManagerTest.java @@ -10,32 +10,78 @@ import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.sql.common.response.ResponseListener; import org.opensearch.sql.executor.execution.AbstractPlan; @ExtendWith(MockitoExtension.class) class DefaultQueryManagerTest { @Mock - private AbstractPlan plan; + private QueryId queryId; @Mock - private QueryId queryId; + private AbstractPlan plan; + + private DefaultQueryManager queryManager; + + private ExecutorService executorService; + + @AfterEach + void clean() throws InterruptedException { + queryManager.awaitTermination(1, TimeUnit.SECONDS); + } @Test public void submitQuery() { when(plan.getQueryId()).thenReturn(queryId); - QueryId actualQueryId = new DefaultQueryManager().submit(plan); + queryManager = DefaultQueryManager.defaultQueryManager();; + QueryId actualQueryId = queryManager.submit(plan); assertEquals(queryId, actualQueryId); verify(plan, times(1)).execute(); } + + @Test + public 
void cancel() { + queryManager = DefaultQueryManager.defaultQueryManager();; + QueryId id = queryManager.submit(new AbstractPlan(queryId) { + @Override + public void execute() { + // do nothing + } + + @Override + public void explain(ResponseListener listener) { + // do nothing + } + }); + assertTrue(queryManager.cancel(id)); + assertFalse(queryManager.cancel(id)); + } + + @Test + public void executorServiceShutdownProperly() throws InterruptedException { + executorService = Executors.newSingleThreadExecutor(); + queryManager = new DefaultQueryManager(executorService); + queryManager.submit(plan); + queryManager.awaitTermination(1, TimeUnit.SECONDS); + + assertTrue(executorService.isShutdown()); + assertTrue(executorService.isTerminated()); + } } diff --git a/core/src/test/java/org/opensearch/sql/executor/QueryManagerTest.java b/core/src/test/java/org/opensearch/sql/executor/QueryManagerTest.java new file mode 100644 index 0000000000..7f34d348bc --- /dev/null +++ b/core/src/test/java/org/opensearch/sql/executor/QueryManagerTest.java @@ -0,0 +1,29 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.executor; + +import static org.junit.jupiter.api.Assertions.assertThrows; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class QueryManagerTest { + + @Mock private QueryId queryId; + + private QueryManager queryManager = + id -> { + throw new UnsupportedOperationException(); + }; + + @Test + void cancelIsNotSupportedByDefault() { + assertThrows(UnsupportedOperationException.class, () -> queryManager.cancel(queryId)); + } +} diff --git a/core/src/test/java/org/opensearch/sql/executor/QueryServiceTest.java b/core/src/test/java/org/opensearch/sql/executor/QueryServiceTest.java index c65210e97e..b229e56a00 100644 --- 
a/core/src/test/java/org/opensearch/sql/executor/QueryServiceTest.java +++ b/core/src/test/java/org/opensearch/sql/executor/QueryServiceTest.java @@ -17,6 +17,7 @@ import static org.mockito.Mockito.when; import java.util.Collections; +import java.util.Optional; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -29,6 +30,7 @@ import org.opensearch.sql.planner.Planner; import org.opensearch.sql.planner.logical.LogicalPlan; import org.opensearch.sql.planner.physical.PhysicalPlan; +import org.opensearch.sql.storage.split.Split; import org.springframework.context.annotation.AnnotationConfigApplicationContext; @ExtendWith(MockitoExtension.class) @@ -61,6 +63,9 @@ class QueryServiceTest { @Mock private PlanContext planContext; + @Mock + private Split split; + @BeforeEach public void setUp() { lenient().when(analyzer.analyze(any(), any())).thenReturn(logicalPlan); @@ -73,13 +78,13 @@ public void setUp() { public void testExecuteShouldPass() { doAnswer( invocation -> { - ResponseListener listener = invocation.getArgument(1); + ResponseListener listener = invocation.getArgument(2); listener.onResponse( new ExecutionEngine.QueryResponse(schema, Collections.emptyList())); return null; }) .when(executionEngine) - .execute(any(), any()); + .execute(any(), any(), any()); queryService.execute( ast, @@ -129,7 +134,7 @@ public void onFailure(Exception e) { public void testExecuteWithExceptionShouldBeCaughtByHandler() { doThrow(new IllegalStateException("illegal state exception")) .when(executionEngine) - .execute(any(), any()); + .execute(any(), any(), any()); queryService.execute( ast, @@ -168,30 +173,57 @@ public void onFailure(Exception e) { } @Test - public void testExecutePlanShouldPass() { - doAnswer( - invocation -> { - ResponseListener listener = invocation.getArgument(1); - listener.onResponse( - new ExecutionEngine.QueryResponse(schema, Collections.emptyList())); - return null; - }) - 
.when(executionEngine) - .execute(any(), any()); + public void testExecutePlan() { + queryService() + .executeSuccess(); - queryService.executePlan( - logicalPlan, - planContext, - new ResponseListener<>() { - @Override - public void onResponse(ExecutionEngine.QueryResponse pplQueryResponse) { + queryService() + .executeSuccessWithSplit(); + } - } + Helper queryService() { + return new Helper(); + } + + class Helper { + public Helper() { + doAnswer( + invocation -> { + ResponseListener listener = invocation.getArgument(2); + listener.onResponse( + new ExecutionEngine.QueryResponse(schema, Collections.emptyList())); + return null; + }) + .when(executionEngine) + .execute(any(), any(), any()); + } + + void executeSuccess() { + when(planContext.getSplit()).thenReturn(Optional.empty()); + executeInternal(); + } + + void executeSuccessWithSplit() { + when(planContext.getSplit()).thenReturn(Optional.of(split)); + executeInternal(); + } + + private void executeInternal() { + queryService.executePlan( + logicalPlan, + planContext, + new ResponseListener<>() { + @Override + public void onResponse(ExecutionEngine.QueryResponse pplQueryResponse) { + + } + + @Override + public void onFailure(Exception e) { + fail(); + } + }); + } - @Override - public void onFailure(Exception e) { - fail(); - } - }); } } diff --git a/core/src/test/java/org/opensearch/sql/executor/execution/IntervalTriggerExecutionTest.java b/core/src/test/java/org/opensearch/sql/executor/execution/IntervalTriggerExecutionTest.java new file mode 100644 index 0000000000..e0638ba88f --- /dev/null +++ b/core/src/test/java/org/opensearch/sql/executor/execution/IntervalTriggerExecutionTest.java @@ -0,0 +1,102 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.executor.execution; + +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.time.Duration; +import 
java.time.Instant; +import java.util.concurrent.TimeUnit; +import lombok.SneakyThrows; +import org.junit.jupiter.api.Test; + +public class IntervalTriggerExecutionTest { + + @Test + void executeTaskWithInterval() { + triggerTask(2) + .taskRun(1) + .aroundInterval(); + } + + @Test + void continueExecuteIfTaskRunningLongerThanInterval() { + triggerTask(1) + .taskRun(2) + .aroundTaskRuntime(); + } + + Helper triggerTask(long interval) { + return new Helper(interval); + } + + class Helper implements Runnable { + + private StreamingQueryPlan.IntervalTriggerExecution executionStrategy; + + private static final int START = 0; + + private static final int FINISH = 1; + + private static final int UNEXPECTED = 2; + + private int state = START; + + private long interval; + + private long taskExecutionTime; + + Instant start; + + Instant end; + + public Helper(long interval) { + this.interval = interval; + this.executionStrategy = new StreamingQueryPlan.IntervalTriggerExecution(interval); + } + + @SneakyThrows + Helper taskRun(long taskExecutionTime) { + this.taskExecutionTime = taskExecutionTime; + executionStrategy.execute(this::run); + return this; + } + + @SneakyThrows + void aroundInterval() { + assertTime(interval); + } + + @SneakyThrows + void aroundTaskRuntime() { + assertTime(taskExecutionTime); + } + + void assertTime(long expected) { + long took = Duration.between(start, end).toSeconds(); + assertTrue( + took >= expected - 1 || took <= expected + 1, + String.format("task interval should around %d, but took :%d", expected, took)); + } + + @SneakyThrows + @Override + public void run() { + assertNotEquals(UNEXPECTED, state); + + if (state == FINISH) { + end = Instant.now(); + state = UNEXPECTED; + Thread.currentThread().interrupt(); + } else { + start = Instant.now(); + state = FINISH; + } + TimeUnit.SECONDS.sleep(taskExecutionTime); + } + } +} diff --git a/core/src/test/java/org/opensearch/sql/executor/execution/StreamingQueryPlanTest.java 
b/core/src/test/java/org/opensearch/sql/executor/execution/StreamingQueryPlanTest.java new file mode 100644 index 0000000000..7357e99d18 --- /dev/null +++ b/core/src/test/java/org/opensearch/sql/executor/execution/StreamingQueryPlanTest.java @@ -0,0 +1,137 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.executor.execution; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.opensearch.sql.data.type.ExprCoreType.INTEGER; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.sql.ast.tree.UnresolvedPlan; +import org.opensearch.sql.common.response.ResponseListener; +import org.opensearch.sql.executor.ExecutionEngine; +import org.opensearch.sql.executor.QueryId; +import org.opensearch.sql.executor.QueryService; +import org.opensearch.sql.executor.streaming.StreamingSource; +import org.opensearch.sql.expression.DSL; +import org.opensearch.sql.planner.logical.LogicalPlanDSL; +import org.opensearch.sql.storage.Table; + +@ExtendWith(MockitoExtension.class) +class StreamingQueryPlanTest { + + static final String tableName = "mock"; + + @Mock private QueryService queryService; + + @Mock private QueryId queryId; + + @Mock private StreamingQueryPlan.ExecutionStrategy executionStrategy; + + @Mock private ResponseListener listener; + + @Mock private UnresolvedPlan unresolvedPlan; + + @Mock private Table table; + + @Mock private StreamingSource streamingSource; + + @Test + void executionSuccess() throws InterruptedException { + streamingQuery() + .streamingSource() + .shouldSuccess(); + } + + 
@Test + void failIfNoRelation() throws InterruptedException { + streamingQuery() + .withoutSource() + .shouldFail("Could find relation plan, LogicalValues does not have child node."); + } + + @Test + void failIfNoStreamingSource() throws InterruptedException { + streamingQuery() + .nonStreamingSource() + .shouldFail(String.format("table %s could not been used as streaming source.", tableName)); + } + + @Test + void taskExecutionShouldNotCallListener() throws InterruptedException { + streamingQuery() + .streamingSource() + .taskExecutionShouldNotCallListener(); + } + + Helper streamingQuery() { + return new Helper(); + } + + class Helper { + + private StreamingQueryPlan queryPlan; + + public Helper() { + queryPlan = + new StreamingQueryPlan( + queryId, unresolvedPlan, queryService, listener, executionStrategy); + } + + Helper streamingSource() { + when(table.asStreamingSource()).thenReturn(streamingSource); + when(queryService.analyze(any())) + .thenReturn( + LogicalPlanDSL.project( + LogicalPlanDSL.relation(tableName, table), + DSL.named("integer_value", DSL.ref("integer_value", INTEGER)))); + return this; + } + + Helper nonStreamingSource() { + when(table.asStreamingSource()).thenThrow(UnsupportedOperationException.class); + when(queryService.analyze(any())).thenReturn(LogicalPlanDSL.relation(tableName, table)); + + return this; + } + + Helper withoutSource() { + when(queryService.analyze(any())).thenReturn(LogicalPlanDSL.values()); + + return this; + } + + void shouldSuccess() throws InterruptedException { + queryPlan.execute(); + verify(executionStrategy).execute(any()); + verify(listener, never()).onFailure(any()); + verify(listener, never()).onResponse(any()); + } + + void shouldFail(String expectedException) throws InterruptedException { + queryPlan.execute(); + verify(executionStrategy, never()).execute(any()); + ArgumentCaptor argument = ArgumentCaptor.forClass(Exception.class); + verify(listener).onFailure(argument.capture()); + 
assertEquals(expectedException, argument.getValue().getMessage()); + } + + void taskExecutionShouldNotCallListener() throws InterruptedException { + doThrow(InterruptedException.class).when(executionStrategy).execute(any()); + queryPlan.execute(); + verify(listener, never()).onFailure(any()); + verify(listener, never()).onResponse(any()); + } + } +} diff --git a/core/src/test/java/org/opensearch/sql/planner/physical/PhysicalPlanTest.java b/core/src/test/java/org/opensearch/sql/planner/physical/PhysicalPlanTest.java new file mode 100644 index 0000000000..0a93c96bbb --- /dev/null +++ b/core/src/test/java/org/opensearch/sql/planner/physical/PhysicalPlanTest.java @@ -0,0 +1,53 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.planner.physical; + +import static org.mockito.Mockito.verify; + +import java.util.List; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.sql.data.model.ExprValue; +import org.opensearch.sql.storage.split.Split; + +@ExtendWith(MockitoExtension.class) +class PhysicalPlanTest { + @Mock + Split split; + + @Mock + PhysicalPlan child; + + private PhysicalPlan testPlan = new PhysicalPlan() { + @Override + public R accept(PhysicalPlanNodeVisitor visitor, C context) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean hasNext() { + throw new UnsupportedOperationException(); + } + + @Override + public ExprValue next() { + throw new UnsupportedOperationException(); + } + + @Override + public List getChild() { + return List.of(child); + } + }; + + @Test + void addSplitToChildByDefault() { + testPlan.add(split); + verify(child).add(split); + } +} diff --git a/core/src/test/java/org/opensearch/sql/planner/physical/catalog/CatalogTableTest.java 
b/core/src/test/java/org/opensearch/sql/planner/physical/catalog/CatalogTableTest.java index a35c94a21c..ae1590a4fc 100644 --- a/core/src/test/java/org/opensearch/sql/planner/physical/catalog/CatalogTableTest.java +++ b/core/src/test/java/org/opensearch/sql/planner/physical/catalog/CatalogTableTest.java @@ -64,4 +64,11 @@ void testCreateTable() { () -> new CatalogTable(catalogService).create(new HashMap<>())); assertEquals("Unsupported Operation", exception.getMessage()); } + + @Test + void defaultAsStreamingSource() { + assertThrows( + UnsupportedOperationException.class, + () -> new CatalogTable(catalogService).asStreamingSource()); + } } diff --git a/filesystem/build.gradle b/filesystem/build.gradle index 0571088132..935d0742bd 100644 --- a/filesystem/build.gradle +++ b/filesystem/build.gradle @@ -19,9 +19,9 @@ configurations.all { } dependencies { - implementation project(':core') + api project(':core') // required by hadoop filesystem https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/filesystem/index.html. 
- implementation("org.apache.hadoop:hadoop-common:${hadoop}") { + api("org.apache.hadoop:hadoop-common:${hadoop}") { exclude group: 'org.apache.zookeeper' exclude group: 'org.eclipse.jetty' exclude group: 'com.sun.jersey' diff --git a/integ-test/build.gradle b/integ-test/build.gradle index 7a2e5cd406..5fb5d92769 100644 --- a/integ-test/build.gradle +++ b/integ-test/build.gradle @@ -144,8 +144,8 @@ stopPrometheus.mustRunAfter startPrometheus // Run PPL ITs and new, legacy and comparison SQL ITs with new SQL engine enabled integTest { dependsOn ':opensearch-sql-plugin:bundlePlugin' - dependsOn startPrometheus - finalizedBy stopPrometheus +// dependsOn startPrometheus +// finalizedBy stopPrometheus systemProperty 'tests.security.manager', 'false' systemProperty('project.root', project.projectDir.absolutePath) diff --git a/integ-test/src/test/java/org/opensearch/sql/ppl/StandaloneIT.java b/integ-test/src/test/java/org/opensearch/sql/ppl/StandaloneIT.java index bcd0c0ffb8..1ea6fb5a0a 100644 --- a/integ-test/src/test/java/org/opensearch/sql/ppl/StandaloneIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/ppl/StandaloneIT.java @@ -176,7 +176,7 @@ static class StandaloneConfig { @Bean QueryManager queryManager() { - return new DefaultQueryManager(); + return DefaultQueryManager.defaultQueryManager(); } @Bean diff --git a/integ-test/src/test/java/org/opensearch/sql/ppl/StreamingQueryIT.java b/integ-test/src/test/java/org/opensearch/sql/ppl/StreamingQueryIT.java new file mode 100644 index 0000000000..366ca06af3 --- /dev/null +++ b/integ-test/src/test/java/org/opensearch/sql/ppl/StreamingQueryIT.java @@ -0,0 +1,350 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.ppl; + +import com.carrotsearch.randomizedtesting.ThreadFilter; +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; +import com.google.common.collect.ImmutableMap; +import java.io.BufferedReader; +import java.io.File; 
+import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.SneakyThrows; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.junit.After; +import org.junit.Test; +import org.opensearch.sql.CatalogSchemaName; +import org.opensearch.sql.analysis.Analyzer; +import org.opensearch.sql.analysis.ExpressionAnalyzer; +import org.opensearch.sql.ast.dsl.AstDSL; +import org.opensearch.sql.catalog.CatalogService; +import org.opensearch.sql.common.response.ResponseListener; +import org.opensearch.sql.data.model.ExprValue; +import org.opensearch.sql.data.model.ExprValueUtils; +import org.opensearch.sql.data.type.ExprCoreType; +import org.opensearch.sql.data.type.ExprType; +import org.opensearch.sql.executor.DefaultQueryManager; +import org.opensearch.sql.executor.ExecutionContext; +import org.opensearch.sql.executor.ExecutionEngine; +import org.opensearch.sql.executor.QueryId; +import org.opensearch.sql.executor.QueryService; +import org.opensearch.sql.executor.execution.StreamingQueryPlan; +import org.opensearch.sql.executor.streaming.StreamingSource; +import org.opensearch.sql.expression.DSL; +import org.opensearch.sql.expression.config.ExpressionConfig; +import org.opensearch.sql.expression.function.BuiltinFunctionRepository; +import org.opensearch.sql.filesystem.storage.split.FileSystemSplit; +import org.opensearch.sql.filesystem.streaming.FileSystemStreamSource; +import org.opensearch.sql.planner.Planner; +import 
org.opensearch.sql.planner.logical.LogicalPlan; +import org.opensearch.sql.planner.optimizer.LogicalPlanOptimizer; +import org.opensearch.sql.planner.physical.PhysicalPlan; +import org.opensearch.sql.planner.physical.PhysicalPlanNodeVisitor; +import org.opensearch.sql.plugin.catalog.CatalogServiceImpl; +import org.opensearch.sql.storage.StorageEngine; +import org.opensearch.sql.storage.Table; +import org.opensearch.sql.storage.TableScanOperator; +import org.opensearch.sql.storage.split.Split; +import org.springframework.context.annotation.AnnotationConfigApplicationContext; + +@ThreadLeakFilters(filters = {StreamingQueryIT.HadoopFSThreadsFilter.class}) +public class StreamingQueryIT extends PPLIntegTestCase { + + private static final int INTERVAL_IN_SECONDS = 1; + + private final AtomicInteger result = new AtomicInteger(0); + + private Source source; + + private StreamingQuery query; + + @Test + public void testStreamingQuery() throws IOException, InterruptedException { + source = new Source(); + query = fromFile(source.tempDir); + query.run(); + + source.add(0); + source.add(1); + query.sumShouldBe(1); + + // no source update. 
+    query.sumShouldBe(1);
+
+    source.add(1);
+    query.sumShouldBe(2);
+  }
+
+  @After
+  void clean() throws InterruptedException, IOException {
+    query.close();
+    source.close();
+  }
+
+  StreamingQuery fromFile(java.nio.file.Path path) throws IOException {
+    return new StreamingQuery(path);
+  }
+
+  class Source {
+    @Getter
+    private final java.nio.file.Path tempDir;
+
+    public Source() throws IOException {
+      tempDir = Files.createTempDirectory("tempDir");
+    }
+
+    Source add(int v) throws IOException {
+      java.nio.file.Path path =
+          Files.createFile(tempDir.resolve(UUID.randomUUID().toString()));
+      String buf = String.valueOf(v);
+      try (FileOutputStream outputStream = new FileOutputStream(path.toFile())) {
+        outputStream.write(buf.getBytes(StandardCharsets.UTF_8));
+      }
+      return this;
+    }
+
+    void close() throws IOException {
+      try (java.util.stream.Stream<java.nio.file.Path> files = Files.walk(tempDir)) {
+        files.sorted(Comparator.reverseOrder()).map(java.nio.file.Path::toFile)
+            .forEach(File::delete); // reverse order deletes children before their parent
+      }
+      assertFalse("temp dir still exist", Files.exists(tempDir));
+    }
+  }
+
+  class StreamingQuery {
+
+    final AnnotationConfigApplicationContext context;
+
+    final DefaultQueryManager queryManager;
+
+    final QueryService queryService;
+
+    final FileSystem fs;
+
+    final QueryId queryId = QueryId.queryId();
+
+    public StreamingQuery(java.nio.file.Path tempDir) throws IOException {
+      result.set(0);
+      fs = FileSystem.get(new Configuration());
+      context = new AnnotationConfigApplicationContext();
+      context.register(ExpressionConfig.class);
+      context.refresh();
+      BuiltinFunctionRepository functionRepository =
+          context.getBean(BuiltinFunctionRepository.class);
+      CatalogService catalogService = CatalogServiceImpl.getInstance();
+      CatalogServiceImpl.getInstance()
+          .registerDefaultOpenSearchCatalog(new FSStorageEngine(fs, new Path(tempDir.toUri())));
+      Analyzer analyzer =
+          new Analyzer(
+              new ExpressionAnalyzer(functionRepository), catalogService, functionRepository);
+      Planner planner = new
Planner(LogicalPlanOptimizer.create(new DSL(functionRepository))); + + queryManager = DefaultQueryManager.defaultQueryManager(); + queryService = new QueryService(analyzer, new DefaultExecutionEngine(), planner); + } + + public StreamingQuery run() { + queryManager.submit( + new StreamingQueryPlan( + queryId, + AstDSL.relation("mock"), + queryService, + new ResponseListener<>() { + @Override + public void onResponse(ExecutionEngine.QueryResponse response) { + fail(); + } + + @Override + public void onFailure(Exception e) { + fail(); + } + }, + new StreamingQueryPlan.IntervalTriggerExecution(1))); + return this; + } + + void sumShouldBe(int expected) throws InterruptedException { + TimeUnit.SECONDS.sleep(INTERVAL_IN_SECONDS); + assertEquals(expected, result.get()); + } + + void close() throws InterruptedException, IOException { + assertTrue(queryManager.cancel(queryId)); + + fs.close(); + context.close(); + queryManager.awaitTermination(5, TimeUnit.SECONDS); + } + } + + /** FileSystem StorageEngine. */ + @RequiredArgsConstructor + class FSStorageEngine implements StorageEngine { + + private final FileSystem fs; + + private final Path basePath; + + @Override + public Table getTable(CatalogSchemaName catalogSchemaName, String tableName) { + return new FSTable(fs, basePath); + } + } + + /** FileSystem Table. 
*/
+  @RequiredArgsConstructor
+  class FSTable implements Table {
+    private final FileSystem fs;
+
+    private final Path basePath;
+
+    @Override
+    public Map<String, ExprType> getFieldTypes() {
+      return ImmutableMap.of("value", ExprCoreType.INTEGER);
+    }
+
+    @Override
+    public PhysicalPlan implement(LogicalPlan plan) {
+      return new Output(new FSScan(fs));
+    }
+
+    @Override
+    public StreamingSource asStreamingSource() {
+      return new FileSystemStreamSource(fs, basePath);
+    }
+  }
+
+  @RequiredArgsConstructor
+  class Output extends PhysicalPlan {
+    private final PhysicalPlan input;
+
+    @Override
+    public void open() {
+      while (input.hasNext()) {
+        result.addAndGet(input.next().integerValue());
+      }
+    }
+
+    @Override
+    public boolean hasNext() {
+      return false;
+    }
+
+    @Override
+    public ExprValue next() {
+      throw new UnsupportedOperationException();
+    }
+
+    // todo, need to refactor physical plan interface.
+    @Override
+    public <R, C> R accept(PhysicalPlanNodeVisitor<R, C> visitor, C context) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public List<PhysicalPlan> getChild() {
+      return Collections.singletonList(input);
+    }
+  }
+
+  @RequiredArgsConstructor
+  class FSScan extends TableScanOperator {
+
+    private final FileSystem fs;
+
+    private Iterator<Path> paths;
+
+    @Override
+    public String explain() {
+      fail();
+      return "";
+    }
+
+    @Override
+    public boolean hasNext() {
+      return paths.hasNext();
+    }
+
+    @SneakyThrows(IOException.class)
+    @Override
+    public ExprValue next() {
+      // every file contains only one line; the reader is closed even if parsing fails.
+      try (BufferedReader reader =
+          new BufferedReader(
+              new InputStreamReader(fs.open(paths.next()), StandardCharsets.UTF_8))) {
+        return ExprValueUtils.integerValue(Integer.valueOf(reader.readLine()));
+      }
+    }
+
+    @Override
+    public void add(Split split) {
+      paths = ((FileSystemSplit) split).getPaths().iterator();
+    }
+  }
+
+  class DefaultExecutionEngine implements ExecutionEngine {
+    @Override
+    public void execute(PhysicalPlan plan, ResponseListener<QueryResponse> listener) {
+      execute(plan, ExecutionContext.emptyExecutionContext(), listener);
+    }
+
+    @Override
+    public void execute(PhysicalPlan plan, ExecutionContext context,
+                        ResponseListener<QueryResponse> listener) {
+      try {
+        List<ExprValue> result = new ArrayList<>();
+
+        context.getSplit().ifPresent(plan::add);
+        plan.open();
+
+        while (plan.hasNext()) {
+          result.add(plan.next());
+        }
+        QueryResponse response =
+            new QueryResponse(new Schema(new ArrayList<>()), new ArrayList<>());
+        listener.onResponse(response);
+      } catch (Exception e) {
+        listener.onFailure(e);
+      } finally {
+        plan.close();
+      }
+    }
+
+    @Override
+    public void explain(PhysicalPlan plan, ResponseListener<ExplainResponse> listener) {
+      fail();
+    }
+  }
+
+  /**
+   * org.apache.hadoop.fs.FileSystem$Statistics$StatisticsDataReferenceCleaner could not close.
+ * https://www.mail-archive.com/common-issues@hadoop.apache.org/msg232722.html + */ + static public class HadoopFSThreadsFilter implements ThreadFilter { + @Override + public boolean reject(Thread t) { + return t.getName().contains("StatisticsDataReferenceCleaner"); + } + } +} diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java index bb00fbb68b..9a136a3bec 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java @@ -12,6 +12,7 @@ import lombok.RequiredArgsConstructor; import org.opensearch.sql.common.response.ResponseListener; import org.opensearch.sql.data.model.ExprValue; +import org.opensearch.sql.executor.ExecutionContext; import org.opensearch.sql.executor.ExecutionEngine; import org.opensearch.sql.executor.Explain; import org.opensearch.sql.opensearch.client.OpenSearchClient; @@ -29,11 +30,19 @@ public class OpenSearchExecutionEngine implements ExecutionEngine { @Override public void execute(PhysicalPlan physicalPlan, ResponseListener listener) { + execute(physicalPlan, ExecutionContext.emptyExecutionContext(), listener); + } + + @Override + public void execute(PhysicalPlan physicalPlan, ExecutionContext context, + ResponseListener listener) { PhysicalPlan plan = executionProtector.protect(physicalPlan); client.schedule( () -> { try { List result = new ArrayList<>(); + + context.getSplit().ifPresent(plan::add); plan.open(); while (plan.hasNext()) { diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngineTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngineTest.java index f1a0a7d5d7..4a0c6e24f1 100644 --- 
a/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngineTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngineTest.java @@ -8,6 +8,7 @@ import static com.google.common.collect.ImmutableMap.of; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; @@ -25,6 +26,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; import lombok.RequiredArgsConstructor; import org.junit.jupiter.api.BeforeEach; @@ -35,6 +37,7 @@ import org.opensearch.sql.common.response.ResponseListener; import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.data.model.ExprValue; +import org.opensearch.sql.executor.ExecutionContext; import org.opensearch.sql.executor.ExecutionEngine; import org.opensearch.sql.executor.ExecutionEngine.ExplainResponse; import org.opensearch.sql.opensearch.client.OpenSearchClient; @@ -43,6 +46,7 @@ import org.opensearch.sql.opensearch.storage.OpenSearchIndexScan; import org.opensearch.sql.planner.physical.PhysicalPlan; import org.opensearch.sql.storage.TableScanOperator; +import org.opensearch.sql.storage.split.Split; @ExtendWith(MockitoExtension.class) class OpenSearchExecutionEngineTest { @@ -53,6 +57,10 @@ class OpenSearchExecutionEngineTest { @Mock private static ExecutionEngine.Schema schema; + @Mock private ExecutionContext executionContext; + + @Mock private Split split; + @BeforeEach void setUp() { doAnswer( @@ -167,11 +175,44 @@ public void onFailure(Exception e) { assertNotNull(result.get()); } + @Test + void callAddSplitAndOpenInOrder() { + List expected = + Arrays.asList( + tupleValue(of("name", "John", "age", 20)), 
tupleValue(of("name", "Allen", "age", 30))); + FakePhysicalPlan plan = new FakePhysicalPlan(expected.iterator()); + when(protector.protect(plan)).thenReturn(plan); + when(executionContext.getSplit()).thenReturn(Optional.of(split)); + + OpenSearchExecutionEngine executor = new OpenSearchExecutionEngine(client, protector); + List actual = new ArrayList<>(); + executor.execute( + plan, + executionContext, + new ResponseListener<>() { + @Override + public void onResponse(QueryResponse response) { + actual.addAll(response.getResults()); + } + + @Override + public void onFailure(Exception e) { + fail("Error occurred during execution", e); + } + }); + + assertTrue(plan.hasSplit); + assertTrue(plan.hasOpen); + assertEquals(expected, actual); + assertTrue(plan.hasClosed); + } + @RequiredArgsConstructor private static class FakePhysicalPlan extends TableScanOperator { private final Iterator it; private boolean hasOpen; private boolean hasClosed; + private boolean hasSplit; @Override public void open() { @@ -185,6 +226,12 @@ public void close() { hasClosed = true; } + @Override + public void add(Split split) { + assertFalse(hasOpen); + hasSplit = true; + } + @Override public boolean hasNext() { return it.hasNext(); From cd0238a53d11b0f045d5ef4339035a73598e0400 Mon Sep 17 00:00:00 2001 From: Peng Huo Date: Mon, 14 Nov 2022 11:51:03 -0800 Subject: [PATCH 02/11] fix thread leak Signed-off-by: Peng Huo --- .../org/opensearch/sql/ppl/StandaloneIT.java | 4 ++-- .../ExecuteOnCallerThreadQueryManager.java | 23 +++++++++++++++++++ 2 files changed, 25 insertions(+), 2 deletions(-) create mode 100644 integ-test/src/test/java/org/opensearch/sql/util/ExecuteOnCallerThreadQueryManager.java diff --git a/integ-test/src/test/java/org/opensearch/sql/ppl/StandaloneIT.java b/integ-test/src/test/java/org/opensearch/sql/ppl/StandaloneIT.java index 1ea6fb5a0a..cf5f930b3b 100644 --- a/integ-test/src/test/java/org/opensearch/sql/ppl/StandaloneIT.java +++ 
b/integ-test/src/test/java/org/opensearch/sql/ppl/StandaloneIT.java @@ -23,7 +23,6 @@ import org.opensearch.sql.catalog.CatalogService; import org.opensearch.sql.common.response.ResponseListener; import org.opensearch.sql.common.setting.Settings; -import org.opensearch.sql.executor.DefaultQueryManager; import org.opensearch.sql.executor.ExecutionEngine; import org.opensearch.sql.executor.ExecutionEngine.QueryResponse; import org.opensearch.sql.executor.QueryManager; @@ -45,6 +44,7 @@ import org.opensearch.sql.ppl.domain.PPLQueryRequest; import org.opensearch.sql.protocol.response.QueryResult; import org.opensearch.sql.protocol.response.format.SimpleJsonResponseFormatter; +import org.opensearch.sql.util.ExecuteOnCallerThreadQueryManager; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.AnnotationConfigApplicationContext; @@ -176,7 +176,7 @@ static class StandaloneConfig { @Bean QueryManager queryManager() { - return DefaultQueryManager.defaultQueryManager(); + return new ExecuteOnCallerThreadQueryManager(); } @Bean diff --git a/integ-test/src/test/java/org/opensearch/sql/util/ExecuteOnCallerThreadQueryManager.java b/integ-test/src/test/java/org/opensearch/sql/util/ExecuteOnCallerThreadQueryManager.java new file mode 100644 index 0000000000..0a42dc83e3 --- /dev/null +++ b/integ-test/src/test/java/org/opensearch/sql/util/ExecuteOnCallerThreadQueryManager.java @@ -0,0 +1,23 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.util; + +import org.opensearch.sql.executor.QueryId; +import org.opensearch.sql.executor.QueryManager; +import org.opensearch.sql.executor.execution.AbstractPlan; + +/** + * ONLY USED FOR TEST PURPOSE. + * + * Execute {@link AbstractPlan} on caller thread. 
+ */ +public class ExecuteOnCallerThreadQueryManager implements QueryManager { + @Override + public QueryId submit(AbstractPlan queryPlan) { + queryPlan.execute(); + return queryPlan.getQueryId(); + } +} From fdbd364275a9bb7bf2b359aaaf92e217de1e6431 Mon Sep 17 00:00:00 2001 From: Peng Huo Date: Mon, 14 Nov 2022 13:01:54 -0800 Subject: [PATCH 03/11] fix build issue Signed-off-by: Peng Huo --- .../java/org/opensearch/sql/ppl/PPLServiceTest.java | 12 +++++++++++- .../java/org/opensearch/sql/sql/SQLServiceTest.java | 12 +++++++++++- 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/PPLServiceTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/PPLServiceTest.java index ef8ec25df8..1bcc1b9de9 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/PPLServiceTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/PPLServiceTest.java @@ -10,6 +10,8 @@ import static org.mockito.Mockito.doAnswer; import java.util.Collections; +import java.util.concurrent.TimeUnit; +import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -57,12 +59,15 @@ public class PPLServiceTest { @Mock private ExecutionEngine.Schema schema; + private DefaultQueryManager queryManager; + /** * Setup the test context. 
*/ @Before public void setUp() { - context.registerBean(QueryManager.class, DefaultQueryManager::new); + queryManager = DefaultQueryManager.defaultQueryManager(); + context.registerBean(QueryManager.class, () -> queryManager); context.registerBean(QueryPlanFactory.class, () -> new QueryPlanFactory(queryService)); context.registerBean(StorageEngine.class, () -> storageEngine); context.registerBean(ExecutionEngine.class, () -> executionEngine); @@ -72,6 +77,11 @@ public void setUp() { pplService = context.getBean(PPLService.class); } + @After + public void cleanup() throws InterruptedException { + queryManager.awaitTermination(1, TimeUnit.SECONDS); + } + @Test public void testExecuteShouldPass() { doAnswer(invocation -> { diff --git a/sql/src/test/java/org/opensearch/sql/sql/SQLServiceTest.java b/sql/src/test/java/org/opensearch/sql/sql/SQLServiceTest.java index f1d2c5293d..9abe37cd06 100644 --- a/sql/src/test/java/org/opensearch/sql/sql/SQLServiceTest.java +++ b/sql/src/test/java/org/opensearch/sql/sql/SQLServiceTest.java @@ -13,7 +13,9 @@ import static org.opensearch.sql.executor.ExecutionEngine.QueryResponse; import java.util.Collections; +import java.util.concurrent.TimeUnit; import org.json.JSONObject; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -42,6 +44,8 @@ class SQLServiceTest { private SQLService sqlService; + private DefaultQueryManager queryManager; + @Mock private QueryService queryService; @@ -50,13 +54,19 @@ class SQLServiceTest { @BeforeEach public void setUp() { - context.registerBean(QueryManager.class, DefaultQueryManager::new); + queryManager = DefaultQueryManager.defaultQueryManager(); + context.registerBean(QueryManager.class, () -> queryManager); context.registerBean(QueryPlanFactory.class, () -> new QueryPlanFactory(queryService)); context.register(SQLServiceConfig.class); context.refresh(); sqlService = 
context.getBean(SQLService.class); } + @AfterEach + public void cleanup() throws InterruptedException { + queryManager.awaitTermination(1, TimeUnit.SECONDS); + } + @Test public void canExecuteSqlQuery() { doAnswer(invocation -> { From 373a853922a692e0b5687198cd82a5bbcb8f6dcb Mon Sep 17 00:00:00 2001 From: Peng Huo Date: Mon, 14 Nov 2022 15:31:22 -0800 Subject: [PATCH 04/11] enable prometheus test Signed-off-by: Peng Huo --- integ-test/build.gradle | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/integ-test/build.gradle b/integ-test/build.gradle index f5dc682e60..58618cd67c 100644 --- a/integ-test/build.gradle +++ b/integ-test/build.gradle @@ -144,8 +144,8 @@ stopPrometheus.mustRunAfter startPrometheus // Run PPL ITs and new, legacy and comparison SQL ITs with new SQL engine enabled integTest { dependsOn ':opensearch-sql-plugin:bundlePlugin' -// dependsOn startPrometheus -// finalizedBy stopPrometheus + dependsOn startPrometheus + finalizedBy stopPrometheus systemProperty 'tests.security.manager', 'false' systemProperty('project.root', project.projectDir.absolutePath) From 71652dce4a4c98560f2677327c545e4bde943c3f Mon Sep 17 00:00:00 2001 From: Peng Huo Date: Tue, 15 Nov 2022 10:03:19 -0800 Subject: [PATCH 05/11] disable flaky test Signed-off-by: Peng Huo --- .../org/opensearch/sql/executor/DefaultQueryManagerTest.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/core/src/test/java/org/opensearch/sql/executor/DefaultQueryManagerTest.java b/core/src/test/java/org/opensearch/sql/executor/DefaultQueryManagerTest.java index fc5c1d985c..4857a70366 100644 --- a/core/src/test/java/org/opensearch/sql/executor/DefaultQueryManagerTest.java +++ b/core/src/test/java/org/opensearch/sql/executor/DefaultQueryManagerTest.java @@ -45,6 +45,10 @@ void clean() throws InterruptedException { queryManager.awaitTermination(1, TimeUnit.SECONDS); } + /** + * ignore verify(plan, times(1)).execute(), because the time ExecutorService executing the + 
* runnable is undetermined. + */ @Test public void submitQuery() { when(plan.getQueryId()).thenReturn(queryId); @@ -53,7 +57,6 @@ public void submitQuery() { QueryId actualQueryId = queryManager.submit(plan); assertEquals(queryId, actualQueryId); - verify(plan, times(1)).execute(); } @Test From 804f39c59d1e9da672303d122c7a65dcddd35099 Mon Sep 17 00:00:00 2001 From: Peng Huo Date: Tue, 15 Nov 2022 11:39:33 -0800 Subject: [PATCH 06/11] fix ut Signed-off-by: Peng Huo --- .../sql/executor/DefaultQueryManagerTest.java | 42 ++++++++++--------- 1 file changed, 22 insertions(+), 20 deletions(-) diff --git a/core/src/test/java/org/opensearch/sql/executor/DefaultQueryManagerTest.java b/core/src/test/java/org/opensearch/sql/executor/DefaultQueryManagerTest.java index 4857a70366..b737da6033 100644 --- a/core/src/test/java/org/opensearch/sql/executor/DefaultQueryManagerTest.java +++ b/core/src/test/java/org/opensearch/sql/executor/DefaultQueryManagerTest.java @@ -8,12 +8,9 @@ package org.opensearch.sql.executor; - import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.util.concurrent.ExecutorService; @@ -30,11 +27,9 @@ @ExtendWith(MockitoExtension.class) class DefaultQueryManagerTest { - @Mock - private QueryId queryId; + @Mock private QueryId queryId; - @Mock - private AbstractPlan plan; + @Mock private AbstractPlan plan; private DefaultQueryManager queryManager; @@ -61,20 +56,27 @@ public void submitQuery() { @Test public void cancel() { - queryManager = DefaultQueryManager.defaultQueryManager();; - QueryId id = queryManager.submit(new AbstractPlan(queryId) { - @Override - public void execute() { - // do nothing - } - - @Override - public void explain(ResponseListener listener) { - // do nothing - } - }); + 
queryManager = DefaultQueryManager.defaultQueryManager(); + QueryId id = + queryManager.submit( + new AbstractPlan(queryId) { + @Override + public void execute() { + // do nothing + } + + @Override + public void explain(ResponseListener listener) { + // do nothing + } + }); assertTrue(queryManager.cancel(id)); - assertFalse(queryManager.cancel(id)); + } + + @Test + public void cancelQueryNotExist() { + queryManager = DefaultQueryManager.defaultQueryManager(); + assertFalse(queryManager.cancel(QueryId.queryId())); } @Test From 4c7a6f1e59e8653a484ddefcd91ab29a96bf4555 Mon Sep 17 00:00:00 2001 From: Peng Huo Date: Tue, 15 Nov 2022 20:34:14 -0800 Subject: [PATCH 07/11] add DefaultQueryManager to testFixtures Signed-off-by: Peng Huo --- core/build.gradle | 7 +- .../sql/executor/DefaultQueryManagerTest.java | 92 ------------------- .../sql/executor/DefaultQueryManager.java | 5 +- ppl/build.gradle | 2 +- sql/build.gradle | 1 + 5 files changed, 4 insertions(+), 103 deletions(-) delete mode 100644 core/src/test/java/org/opensearch/sql/executor/DefaultQueryManagerTest.java rename core/src/{main => testFixtures}/java/org/opensearch/sql/executor/DefaultQueryManager.java (90%) diff --git a/core/build.gradle b/core/build.gradle index eb70f110d1..45169f4ead 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -26,17 +26,12 @@ plugins { id 'java-library' id "io.freefair.lombok" id 'jacoco' + id 'java-test-fixtures' } repositories { mavenCentral() } -// -//configurations.all { -// resolutionStrategy.dependencySubstitution { -// substitute module('com.google.guava:guava:26.0-jre') with module('com.google.guava:guava:29.0-jre') -// } -//} dependencies { api group: 'com.google.guava', name: 'guava', version: '31.0.1-jre' diff --git a/core/src/test/java/org/opensearch/sql/executor/DefaultQueryManagerTest.java b/core/src/test/java/org/opensearch/sql/executor/DefaultQueryManagerTest.java deleted file mode 100644 index b737da6033..0000000000 --- 
a/core/src/test/java/org/opensearch/sql/executor/DefaultQueryManagerTest.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.sql.executor; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.Mockito.when; - -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; -import org.opensearch.sql.common.response.ResponseListener; -import org.opensearch.sql.executor.execution.AbstractPlan; - -@ExtendWith(MockitoExtension.class) -class DefaultQueryManagerTest { - - @Mock private QueryId queryId; - - @Mock private AbstractPlan plan; - - private DefaultQueryManager queryManager; - - private ExecutorService executorService; - - @AfterEach - void clean() throws InterruptedException { - queryManager.awaitTermination(1, TimeUnit.SECONDS); - } - - /** - * ignore verify(plan, times(1)).execute(), because the time ExecutorService executing the - * runnable is undetermined. 
- */ - @Test - public void submitQuery() { - when(plan.getQueryId()).thenReturn(queryId); - - queryManager = DefaultQueryManager.defaultQueryManager();; - QueryId actualQueryId = queryManager.submit(plan); - - assertEquals(queryId, actualQueryId); - } - - @Test - public void cancel() { - queryManager = DefaultQueryManager.defaultQueryManager(); - QueryId id = - queryManager.submit( - new AbstractPlan(queryId) { - @Override - public void execute() { - // do nothing - } - - @Override - public void explain(ResponseListener listener) { - // do nothing - } - }); - assertTrue(queryManager.cancel(id)); - } - - @Test - public void cancelQueryNotExist() { - queryManager = DefaultQueryManager.defaultQueryManager(); - assertFalse(queryManager.cancel(QueryId.queryId())); - } - - @Test - public void executorServiceShutdownProperly() throws InterruptedException { - executorService = Executors.newSingleThreadExecutor(); - queryManager = new DefaultQueryManager(executorService); - queryManager.submit(plan); - queryManager.awaitTermination(1, TimeUnit.SECONDS); - - assertTrue(executorService.isShutdown()); - assertTrue(executorService.isTerminated()); - } -} diff --git a/core/src/main/java/org/opensearch/sql/executor/DefaultQueryManager.java b/core/src/testFixtures/java/org/opensearch/sql/executor/DefaultQueryManager.java similarity index 90% rename from core/src/main/java/org/opensearch/sql/executor/DefaultQueryManager.java rename to core/src/testFixtures/java/org/opensearch/sql/executor/DefaultQueryManager.java index af0db86ad5..41888a0581 100644 --- a/core/src/main/java/org/opensearch/sql/executor/DefaultQueryManager.java +++ b/core/src/testFixtures/java/org/opensearch/sql/executor/DefaultQueryManager.java @@ -1,9 +1,6 @@ /* + * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. 
*/ package org.opensearch.sql.executor; diff --git a/ppl/build.gradle b/ppl/build.gradle index 2c3c648478..1c605f4372 100644 --- a/ppl/build.gradle +++ b/ppl/build.gradle @@ -57,7 +57,7 @@ dependencies { testImplementation group: 'junit', name: 'junit', version: '4.13.2' testImplementation group: 'org.hamcrest', name: 'hamcrest-library', version: '2.1' testImplementation group: 'org.mockito', name: 'mockito-core', version: '3.12.4' - + testImplementation(testFixtures(project(":core"))) } test { diff --git a/sql/build.gradle b/sql/build.gradle index 222ad92ac6..13a7ceba53 100644 --- a/sql/build.gradle +++ b/sql/build.gradle @@ -57,6 +57,7 @@ dependencies { testImplementation group: 'org.hamcrest', name: 'hamcrest-library', version: '2.1' testImplementation group: 'org.mockito', name: 'mockito-core', version: '3.12.4' testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter', version: '3.12.4' + testImplementation(testFixtures(project(":core"))) } test { From 3d3fbfd4c4f7a66751e230e2ac59d50f39fbc63f Mon Sep 17 00:00:00 2001 From: Peng Huo Date: Tue, 15 Nov 2022 21:23:11 -0800 Subject: [PATCH 08/11] add fs storage engine to testFixtures Signed-off-by: Peng Huo --- .../sql/executor/DefaultExecutionEngine.java | 48 +++++ filesystem/build.gradle | 23 ++- .../filesystem/storage/FSStorageEngine.java | 47 +++++ .../sql/filesystem/storage/FSTable.java | 127 ++++++++++++ integ-test/build.gradle | 2 + .../opensearch/sql/ppl/StreamingQueryIT.java | 191 +----------------- 6 files changed, 256 insertions(+), 182 deletions(-) create mode 100644 core/src/testFixtures/java/org/opensearch/sql/executor/DefaultExecutionEngine.java create mode 100644 filesystem/src/testFixtures/java/org/opensearch/sql/filesystem/storage/FSStorageEngine.java create mode 100644 filesystem/src/testFixtures/java/org/opensearch/sql/filesystem/storage/FSTable.java diff --git a/core/src/testFixtures/java/org/opensearch/sql/executor/DefaultExecutionEngine.java
b/core/src/testFixtures/java/org/opensearch/sql/executor/DefaultExecutionEngine.java new file mode 100644 index 0000000000..e4f9a185a3 --- /dev/null +++ b/core/src/testFixtures/java/org/opensearch/sql/executor/DefaultExecutionEngine.java @@ -0,0 +1,48 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.executor; + +import java.util.ArrayList; +import java.util.List; +import org.opensearch.sql.common.response.ResponseListener; +import org.opensearch.sql.data.model.ExprValue; +import org.opensearch.sql.planner.physical.PhysicalPlan; + +/** + * Used for testing purpose. + */ +public class DefaultExecutionEngine implements ExecutionEngine { + @Override + public void execute(PhysicalPlan plan, ResponseListener listener) { + execute(plan, ExecutionContext.emptyExecutionContext(), listener); + } + + @Override + public void execute( + PhysicalPlan plan, ExecutionContext context, ResponseListener listener) { + try { + List result = new ArrayList<>(); + + context.getSplit().ifPresent(plan::add); + plan.open(); + + while (plan.hasNext()) { + result.add(plan.next()); + } + QueryResponse response = new QueryResponse(new Schema(new ArrayList<>()), new ArrayList<>()); + listener.onResponse(response); + } catch (Exception e) { + listener.onFailure(e); + } finally { + plan.close(); + } + } + + @Override + public void explain(PhysicalPlan plan, ResponseListener listener) { + throw new UnsupportedOperationException(); + } +} diff --git a/filesystem/build.gradle b/filesystem/build.gradle index 935d0742bd..b76c0d517c 100644 --- a/filesystem/build.gradle +++ b/filesystem/build.gradle @@ -7,6 +7,7 @@ plugins { id 'java-library' id "io.freefair.lombok" id 'jacoco' + id 'java-test-fixtures' } ext { @@ -19,9 +20,9 @@ configurations.all { } dependencies { - api project(':core') + implementation project(':core') // required by hadoop filesystem 
https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/filesystem/index.html. - api("org.apache.hadoop:hadoop-common:${hadoop}") { + implementation("org.apache.hadoop:hadoop-common:${hadoop}") { exclude group: 'org.apache.zookeeper' exclude group: 'org.eclipse.jetty' exclude group: 'com.sun.jersey' @@ -67,6 +68,24 @@ dependencies { testImplementation group: 'org.hamcrest', name: 'hamcrest-library', version: '2.1' testImplementation group: 'org.mockito', name: 'mockito-core', version: '3.12.4' testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter', version: '3.12.4' + testFixturesImplementation(project(":core")) + testFixturesImplementation("org.apache.hadoop:hadoop-common:${hadoop}") { + exclude group: 'org.apache.zookeeper' + exclude group: 'org.eclipse.jetty' + exclude group: 'com.sun.jersey' + exclude group: 'javax.servlet.jsp' + exclude group: 'javax.servlet' + exclude group: 'org.apache.kerby' + exclude group: 'org.apache.curator' + exclude group: 'com.google.protobuf', module: 'protobuf-java' + exclude group: 'org.apache.avro', module: 'avro' + exclude group: 'com.nimbusds', module: 'nimbus-jose-jwt' + // enforce version. 
+ exclude group: 'com.fasterxml.woodstox', module: 'woodstox-core' + exclude group: 'commons-io', module: 'commons-io' + exclude group: 'ch.qos.reload4j', module: 'reload4j' + exclude group: 'org.apache.httpcomponents', module: 'httpcore' + } } test { diff --git a/filesystem/src/testFixtures/java/org/opensearch/sql/filesystem/storage/FSStorageEngine.java b/filesystem/src/testFixtures/java/org/opensearch/sql/filesystem/storage/FSStorageEngine.java new file mode 100644 index 0000000000..160b7ecaef --- /dev/null +++ b/filesystem/src/testFixtures/java/org/opensearch/sql/filesystem/storage/FSStorageEngine.java @@ -0,0 +1,47 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.filesystem.storage; + +import java.net.URI; +import java.util.concurrent.atomic.AtomicInteger; +import lombok.RequiredArgsConstructor; +import lombok.SneakyThrows; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.opensearch.sql.CatalogSchemaName; +import org.opensearch.sql.storage.StorageEngine; +import org.opensearch.sql.storage.Table; + +/** + * FileSystem StorageEngine. Used for testing purpose. 
+ */ +@RequiredArgsConstructor +public class FSStorageEngine implements StorageEngine { + + private final FileSystem fs; + + private final Path basePath; + + private final AtomicInteger result; + + @SneakyThrows + public FSStorageEngine(URI basePath, AtomicInteger result) { + this.fs = FileSystem.get(new Configuration()); + this.basePath = new Path(basePath); + this.result = result; + } + + @Override + public Table getTable(CatalogSchemaName catalogSchemaName, String tableName) { + return new FSTable(fs, basePath, result); + } + + @SneakyThrows + public void close() { + fs.close(); + } +} diff --git a/filesystem/src/testFixtures/java/org/opensearch/sql/filesystem/storage/FSTable.java b/filesystem/src/testFixtures/java/org/opensearch/sql/filesystem/storage/FSTable.java new file mode 100644 index 0000000000..45f8fc1104 --- /dev/null +++ b/filesystem/src/testFixtures/java/org/opensearch/sql/filesystem/storage/FSTable.java @@ -0,0 +1,127 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.filesystem.storage; + +import com.google.common.collect.ImmutableMap; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import lombok.RequiredArgsConstructor; +import lombok.SneakyThrows; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.opensearch.sql.data.model.ExprValue; +import org.opensearch.sql.data.model.ExprValueUtils; +import org.opensearch.sql.data.type.ExprCoreType; +import org.opensearch.sql.data.type.ExprType; +import org.opensearch.sql.executor.streaming.StreamingSource; +import org.opensearch.sql.filesystem.storage.split.FileSystemSplit; +import org.opensearch.sql.filesystem.streaming.FileSystemStreamSource; +import org.opensearch.sql.planner.logical.LogicalPlan; 
+import org.opensearch.sql.planner.physical.PhysicalPlan; +import org.opensearch.sql.planner.physical.PhysicalPlanNodeVisitor; +import org.opensearch.sql.storage.Table; +import org.opensearch.sql.storage.TableScanOperator; +import org.opensearch.sql.storage.split.Split; + +/** + * FileSystem Table. Used for testing purpose. + */ +@RequiredArgsConstructor +public class FSTable implements Table { + private final FileSystem fs; + + private final Path basePath; + + private final AtomicInteger result; + + @Override + public Map getFieldTypes() { + return ImmutableMap.of("value", ExprCoreType.INTEGER); + } + + @Override + public PhysicalPlan implement(LogicalPlan plan) { + return new Output(new FSScan(fs)); + } + + @Override + public StreamingSource asStreamingSource() { + return new FileSystemStreamSource(fs, basePath); + } + + @RequiredArgsConstructor + class Output extends PhysicalPlan { + private final PhysicalPlan input; + + @Override + public void open() { + while (input.hasNext()) { + result.addAndGet(input.next().integerValue()); + } + } + + @Override + public boolean hasNext() { + return false; + } + + @Override + public ExprValue next() { + throw new UnsupportedOperationException(); + } + + // todo, need to refactor physical plan interface. + @Override + public R accept(PhysicalPlanNodeVisitor visitor, C context) { + throw new UnsupportedOperationException(); + } + + @Override + public List getChild() { + return Collections.singletonList(input); + } + } + + @RequiredArgsConstructor + class FSScan extends TableScanOperator { + + private final FileSystem fs; + + private Iterator paths; + + @Override + public String explain() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean hasNext() { + return paths.hasNext(); + } + + @SneakyThrows(IOException.class) + @Override + public ExprValue next() { + BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(paths.next()))); + // every file only contain one line. 
+ ExprValue result = ExprValueUtils.integerValue(Integer.valueOf(reader.readLine())); + + reader.close(); + return result; + } + + @Override + public void add(Split split) { + paths = ((FileSystemSplit) split).getPaths().iterator(); + } + } +} diff --git a/integ-test/build.gradle b/integ-test/build.gradle index 58618cd67c..25197c6b8f 100644 --- a/integ-test/build.gradle +++ b/integ-test/build.gradle @@ -93,6 +93,8 @@ dependencies { testImplementation group: 'com.h2database', name: 'h2', version: '2.1.210' testImplementation group: 'org.xerial', name: 'sqlite-jdbc', version: '3.28.0' testImplementation group: 'com.google.code.gson', name: 'gson', version: '2.8.9' + testImplementation(testFixtures(project(":core"))) + testImplementation(testFixtures(project(":filesystem"))) } dependencyLicenses.enabled = false diff --git a/integ-test/src/test/java/org/opensearch/sql/ppl/StreamingQueryIT.java b/integ-test/src/test/java/org/opensearch/sql/ppl/StreamingQueryIT.java index 366ca06af3..e1ced8044c 100644 --- a/integ-test/src/test/java/org/opensearch/sql/ppl/StreamingQueryIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/ppl/StreamingQueryIT.java @@ -7,63 +7,36 @@ import com.carrotsearch.randomizedtesting.ThreadFilter; import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; -import com.google.common.collect.ImmutableMap; -import java.io.BufferedReader; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; -import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; import java.nio.file.Files; -import java.util.ArrayList; -import java.util.Collections; import java.util.Comparator; -import java.util.Iterator; -import java.util.List; -import java.util.Map; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import lombok.Getter; -import lombok.RequiredArgsConstructor; -import lombok.SneakyThrows; -import org.apache.hadoop.conf.Configuration; -import 
org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.junit.After; import org.junit.Test; -import org.opensearch.sql.CatalogSchemaName; import org.opensearch.sql.analysis.Analyzer; import org.opensearch.sql.analysis.ExpressionAnalyzer; import org.opensearch.sql.ast.dsl.AstDSL; import org.opensearch.sql.catalog.CatalogService; import org.opensearch.sql.common.response.ResponseListener; -import org.opensearch.sql.data.model.ExprValue; -import org.opensearch.sql.data.model.ExprValueUtils; -import org.opensearch.sql.data.type.ExprCoreType; -import org.opensearch.sql.data.type.ExprType; +import org.opensearch.sql.executor.DefaultExecutionEngine; import org.opensearch.sql.executor.DefaultQueryManager; -import org.opensearch.sql.executor.ExecutionContext; import org.opensearch.sql.executor.ExecutionEngine; import org.opensearch.sql.executor.QueryId; import org.opensearch.sql.executor.QueryService; import org.opensearch.sql.executor.execution.StreamingQueryPlan; -import org.opensearch.sql.executor.streaming.StreamingSource; import org.opensearch.sql.expression.DSL; import org.opensearch.sql.expression.config.ExpressionConfig; import org.opensearch.sql.expression.function.BuiltinFunctionRepository; -import org.opensearch.sql.filesystem.storage.split.FileSystemSplit; -import org.opensearch.sql.filesystem.streaming.FileSystemStreamSource; +import org.opensearch.sql.filesystem.storage.FSStorageEngine; import org.opensearch.sql.planner.Planner; -import org.opensearch.sql.planner.logical.LogicalPlan; import org.opensearch.sql.planner.optimizer.LogicalPlanOptimizer; -import org.opensearch.sql.planner.physical.PhysicalPlan; -import org.opensearch.sql.planner.physical.PhysicalPlanNodeVisitor; import org.opensearch.sql.plugin.catalog.CatalogServiceImpl; -import org.opensearch.sql.storage.StorageEngine; -import org.opensearch.sql.storage.Table; -import org.opensearch.sql.storage.TableScanOperator; -import org.opensearch.sql.storage.split.Split; import 
org.springframework.context.annotation.AnnotationConfigApplicationContext; @ThreadLeakFilters(filters = {StreamingQueryIT.HadoopFSThreadsFilter.class}) @@ -105,16 +78,14 @@ StreamingQuery fromFile(java.nio.file.Path path) throws IOException { } class Source { - @Getter - private final java.nio.file.Path tempDir; + @Getter private final java.nio.file.Path tempDir; public Source() throws IOException { tempDir = Files.createTempDirectory("tempDir"); } Source add(int v) throws IOException { - java.nio.file.Path path = - Files.createFile(tempDir.resolve(UUID.randomUUID().toString())); + java.nio.file.Path path = Files.createFile(tempDir.resolve(UUID.randomUUID().toString())); String buf = String.valueOf(v); FileOutputStream outputStream = new FileOutputStream(path.toFile()); outputStream.write(buf.getBytes(StandardCharsets.UTF_8)); @@ -139,21 +110,20 @@ class StreamingQuery { final QueryService queryService; - final FileSystem fs; - final QueryId queryId = QueryId.queryId(); - public StreamingQuery(java.nio.file.Path tempDir) throws IOException { + final FSStorageEngine storageEngine; + + public StreamingQuery(java.nio.file.Path tempDir) { result.set(0); - fs = FileSystem.get(new Configuration()); context = new AnnotationConfigApplicationContext(); context.register(ExpressionConfig.class); context.refresh(); BuiltinFunctionRepository functionRepository = context.getBean(BuiltinFunctionRepository.class); CatalogService catalogService = CatalogServiceImpl.getInstance(); - CatalogServiceImpl.getInstance() - .registerDefaultOpenSearchCatalog(new FSStorageEngine(fs, new Path(tempDir.toUri()))); + storageEngine = new FSStorageEngine(tempDir.toUri(), result); + CatalogServiceImpl.getInstance().registerDefaultOpenSearchCatalog(storageEngine); Analyzer analyzer = new Analyzer( new ExpressionAnalyzer(functionRepository), catalogService, functionRepository); @@ -192,156 +162,17 @@ void sumShouldBe(int expected) throws InterruptedException { void close() throws 
InterruptedException, IOException { assertTrue(queryManager.cancel(queryId)); - fs.close(); + storageEngine.close(); context.close(); queryManager.awaitTermination(5, TimeUnit.SECONDS); } } - /** FileSystem StorageEngine. */ - @RequiredArgsConstructor - class FSStorageEngine implements StorageEngine { - - private final FileSystem fs; - - private final Path basePath; - - @Override - public Table getTable(CatalogSchemaName catalogSchemaName, String tableName) { - return new FSTable(fs, basePath); - } - } - - /** FileSystem Table. */ - @RequiredArgsConstructor - class FSTable implements Table { - private final FileSystem fs; - - private final Path basePath; - - @Override - public Map getFieldTypes() { - return ImmutableMap.of("value", ExprCoreType.INTEGER); - } - - @Override - public PhysicalPlan implement(LogicalPlan plan) { - return new Output(new FSScan(fs)); - } - - @Override - public StreamingSource asStreamingSource() { - return new FileSystemStreamSource(fs, basePath); - } - } - - @RequiredArgsConstructor - class Output extends PhysicalPlan { - private final PhysicalPlan input; - - @Override - public void open() { - while (input.hasNext()) { - result.addAndGet(input.next().integerValue()); - } - } - - @Override - public boolean hasNext() { - return false; - } - - @Override - public ExprValue next() { - throw new UnsupportedOperationException(); - } - - // todo, need to refactor physical plan interface. 
- @Override - public R accept(PhysicalPlanNodeVisitor visitor, C context) { - throw new UnsupportedOperationException(); - } - - @Override - public List getChild() { - return Collections.singletonList(input); - } - } - - @RequiredArgsConstructor - class FSScan extends TableScanOperator { - - private final FileSystem fs; - - private Iterator paths; - - @Override - public String explain() { - fail(); - return ""; - } - - @Override - public boolean hasNext() { - return paths.hasNext(); - } - - @SneakyThrows(IOException.class) - @Override - public ExprValue next() { - BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(paths.next()))); - // every file only contain one line. - ExprValue result = ExprValueUtils.integerValue(Integer.valueOf(reader.readLine())); - - reader.close(); - return result; - } - - @Override - public void add(Split split) { - paths = ((FileSystemSplit) split).getPaths().iterator(); - } - } - - class DefaultExecutionEngine implements ExecutionEngine { - @Override - public void execute(PhysicalPlan plan, ResponseListener listener) { - execute(plan, ExecutionContext.emptyExecutionContext(), listener); - } - - @Override - public void execute(PhysicalPlan plan, ExecutionContext context, - ResponseListener listener) { - try { - List result = new ArrayList<>(); - - context.getSplit().ifPresent(plan::add); - plan.open(); - - while (plan.hasNext()) { - result.add(plan.next()); - } - QueryResponse response = - new QueryResponse(new Schema(new ArrayList<>()), new ArrayList<>()); - listener.onResponse(response); - } catch (Exception e) { - listener.onFailure(e); - } finally { - plan.close(); - } - } - - @Override - public void explain(PhysicalPlan plan, ResponseListener listener) { - fail(); - } - } - /** * org.apache.hadoop.fs.FileSystem$Statistics$StatisticsDataReferenceCleaner could not close. 
* https://www.mail-archive.com/common-issues@hadoop.apache.org/msg232722.html */ - static public class HadoopFSThreadsFilter implements ThreadFilter { + public static class HadoopFSThreadsFilter implements ThreadFilter { @Override public boolean reject(Thread t) { return t.getName().contains("StatisticsDataReferenceCleaner"); From 79f4bf6a7223d80bb8c1189be7470ed46587136c Mon Sep 17 00:00:00 2001 From: Peng Huo Date: Tue, 15 Nov 2022 21:37:57 -0800 Subject: [PATCH 09/11] fix style issue Signed-off-by: Peng Huo --- .../opensearch/sql/filesystem/storage/FSStorageEngine.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/filesystem/src/testFixtures/java/org/opensearch/sql/filesystem/storage/FSStorageEngine.java b/filesystem/src/testFixtures/java/org/opensearch/sql/filesystem/storage/FSStorageEngine.java index 160b7ecaef..ba590bcd1e 100644 --- a/filesystem/src/testFixtures/java/org/opensearch/sql/filesystem/storage/FSStorageEngine.java +++ b/filesystem/src/testFixtures/java/org/opensearch/sql/filesystem/storage/FSStorageEngine.java @@ -16,9 +16,7 @@ import org.opensearch.sql.storage.StorageEngine; import org.opensearch.sql.storage.Table; -/** - * FileSystem StorageEngine. Used for testing purpose. - */ +/** FileSystem StorageEngine. Used for testing purpose. */ @RequiredArgsConstructor public class FSStorageEngine implements StorageEngine { @@ -28,6 +26,9 @@ public class FSStorageEngine implements StorageEngine { private final AtomicInteger result; + /** + * constructor. 
+ */ @SneakyThrows public FSStorageEngine(URI basePath, AtomicInteger result) { this.fs = FileSystem.get(new Configuration()); From 085adff6736d44ba3a3bb81ecf0e79ab0bde6c6a Mon Sep 17 00:00:00 2001 From: Peng Huo Date: Fri, 18 Nov 2022 09:13:06 -0800 Subject: [PATCH 10/11] address comments Signed-off-by: Peng Huo --- .../sql/executor/streaming/MicroBatchStreamingExecution.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/opensearch/sql/executor/streaming/MicroBatchStreamingExecution.java b/core/src/main/java/org/opensearch/sql/executor/streaming/MicroBatchStreamingExecution.java index d7087e6153..c31ed18c57 100644 --- a/core/src/main/java/org/opensearch/sql/executor/streaming/MicroBatchStreamingExecution.java +++ b/core/src/main/java/org/opensearch/sql/executor/streaming/MicroBatchStreamingExecution.java @@ -105,7 +105,7 @@ public void onResponse(ExecutionEngine.QueryResponse response) { @Override public void onFailure(Exception e) { - log.error("streaming processing failed. source = {}", source, e); + log.error("streaming processing failed. 
source = {} {}", source, e); } }); } From ed263a81cb3204bf691307d512935284a1e7a110 Mon Sep 17 00:00:00 2001 From: Peng Huo Date: Fri, 18 Nov 2022 16:56:39 -0800 Subject: [PATCH 11/11] fix build issue Signed-off-by: Peng Huo --- .../sql/planner/physical/datasource/DataSourceTableTest.java | 2 +- integ-test/build.gradle | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/test/java/org/opensearch/sql/planner/physical/datasource/DataSourceTableTest.java b/core/src/test/java/org/opensearch/sql/planner/physical/datasource/DataSourceTableTest.java index dd5d49d08e..c82a042770 100644 --- a/core/src/test/java/org/opensearch/sql/planner/physical/datasource/DataSourceTableTest.java +++ b/core/src/test/java/org/opensearch/sql/planner/physical/datasource/DataSourceTableTest.java @@ -71,6 +71,6 @@ void testCreateTable() { void defaultAsStreamingSource() { assertThrows( UnsupportedOperationException.class, - () -> new CatalogTable(catalogService).asStreamingSource()); + () -> new DataSourceTable(dataSourceService).asStreamingSource()); } } diff --git a/integ-test/build.gradle b/integ-test/build.gradle index ba6a6b5838..43a00a2c33 100644 --- a/integ-test/build.gradle +++ b/integ-test/build.gradle @@ -158,8 +158,8 @@ stopPrometheus.mustRunAfter startPrometheus // Run PPL ITs and new, legacy and comparison SQL ITs with new SQL engine enabled integTest { dependsOn ':opensearch-sql-plugin:bundlePlugin' -// dependsOn startPrometheus -// finalizedBy stopPrometheus + dependsOn startPrometheus + finalizedBy stopPrometheus systemProperty 'tests.security.manager', 'false' systemProperty('project.root', project.projectDir.absolutePath)