diff --git a/core/src/main/java/org/opensearch/sql/executor/QueryService.java b/core/src/main/java/org/opensearch/sql/executor/QueryService.java
index 423d152e06..dcdf6bc010 100644
--- a/core/src/main/java/org/opensearch/sql/executor/QueryService.java
+++ b/core/src/main/java/org/opensearch/sql/executor/QueryService.java
@@ -13,6 +13,7 @@
 import org.opensearch.sql.analysis.Analyzer;
 import org.opensearch.sql.ast.tree.UnresolvedPlan;
 import org.opensearch.sql.common.response.ResponseListener;
+import org.opensearch.sql.planner.PlanContext;
 import org.opensearch.sql.planner.Planner;
 import org.opensearch.sql.planner.logical.LogicalPlan;
 import org.opensearch.sql.planner.physical.PhysicalPlan;
@@ -31,12 +32,32 @@ public class QueryService {
 
   /**
    * Execute the {@link UnresolvedPlan}, using {@link ResponseListener} to get response.
+   * Todo. deprecate this interface after finalizing {@link PlanContext}.
    *
    * @param plan {@link UnresolvedPlan}
    * @param listener {@link ResponseListener}
    */
   public void execute(UnresolvedPlan plan,
                       ResponseListener<ExecutionEngine.QueryResponse> listener) {
+    try {
+      executePlan(analyze(plan), PlanContext.emptyPlanContext(), listener);
+    } catch (Exception e) {
+      listener.onFailure(e);
+    }
+  }
+
+  /**
+   * Execute the {@link LogicalPlan}, with {@link PlanContext} and using {@link ResponseListener}
+   * to get response.
+   * Todo. Pass split from PlanContext to ExecutionEngine in following PR.
+   *
+   * @param plan {@link LogicalPlan}
+   * @param planContext {@link PlanContext}
+   * @param listener {@link ResponseListener}
+   */
+  public void executePlan(LogicalPlan plan,
+                          PlanContext planContext,
+                          ResponseListener<ExecutionEngine.QueryResponse> listener) {
     try {
       executionEngine.execute(plan(plan), listener);
     } catch (Exception e) {
@@ -54,17 +75,23 @@ public void execute(UnresolvedPlan plan,
   public void explain(UnresolvedPlan plan,
                       ResponseListener<ExecutionEngine.ExplainResponse> listener) {
     try {
-      executionEngine.explain(plan(plan), listener);
+      executionEngine.explain(plan(analyze(plan)), listener);
     } catch (Exception e) {
       listener.onFailure(e);
     }
   }
 
-  private PhysicalPlan plan(UnresolvedPlan plan) {
-    // 1.Analyze abstract syntax to generate logical plan
-    LogicalPlan logicalPlan = analyzer.analyze(plan, new AnalysisContext());
+  /**
+   * Analyze {@link UnresolvedPlan}.
+   */
+  public LogicalPlan analyze(UnresolvedPlan plan) {
+    return analyzer.analyze(plan, new AnalysisContext());
+  }
 
-    // 2.Generate optimal physical plan from logical plan
-    return planner.plan(logicalPlan);
+  /**
+   * Translate {@link LogicalPlan} to {@link PhysicalPlan}.
+   */
+  public PhysicalPlan plan(LogicalPlan plan) {
+    return planner.plan(plan);
   }
 }
diff --git a/core/src/main/java/org/opensearch/sql/executor/streaming/MicroBatchStreamingExecution.java b/core/src/main/java/org/opensearch/sql/executor/streaming/MicroBatchStreamingExecution.java
new file mode 100644
index 0000000000..4f25b9433f
--- /dev/null
+++ b/core/src/main/java/org/opensearch/sql/executor/streaming/MicroBatchStreamingExecution.java
@@ -0,0 +1,146 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.sql.executor.streaming;
+
+import com.google.common.base.Preconditions;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.opensearch.sql.common.response.ResponseListener;
+import org.opensearch.sql.executor.ExecutionEngine;
+import org.opensearch.sql.executor.QueryService;
+import org.opensearch.sql.planner.PlanContext;
+import org.opensearch.sql.planner.logical.LogicalPlan;
+
+/**
+ * Micro batch streaming execution.
+ *
+ * <p>Each call to {@link #execute()} handles at most one {@link Batch}: its offset is written to
+ * the offset log, the batch plan is submitted to {@link QueryService}, and the batch is recorded
+ * in the committed log once the query responds successfully.
+ */
+public class MicroBatchStreamingExecution {
+
+  private static final Logger log = LogManager.getLogger(MicroBatchStreamingExecution.class);
+
+  static final long INITIAL_LATEST_BATCH_ID = -1L;
+
+  private final StreamingSource source;
+
+  private final LogicalPlan batchPlan;
+
+  private final QueryService queryService;
+
+  /**
+   * A write-ahead-log that records the offsets that are present in each batch. In order to ensure
+   * that a given batch will always consist of the same data, we write to this log before any
+   * processing is done. Thus, the Nth record in this log indicates data that is currently being
+   * processed and the N-1th entry indicates which offsets have been durably committed to the sink.
+   */
+  private final MetadataLog<Offset> offsetLog;
+
+  /** Keeps track of the latest committed batchId. */
+  private final MetadataLog<Offset> committedLog;
+
+  /**
+   * Constructor.
+   */
+  public MicroBatchStreamingExecution(
+      StreamingSource source,
+      LogicalPlan batchPlan,
+      QueryService queryService,
+      MetadataLog<Offset> offsetLog,
+      MetadataLog<Offset> committedLog) {
+    this.source = source;
+    this.batchPlan = batchPlan;
+    this.queryService = queryService;
+    // todo. add offsetLog and committedLog offset recovery.
+    this.offsetLog = offsetLog;
+    this.committedLog = committedLog;
+  }
+
+  /**
+   * Pull the {@link Batch} from {@link StreamingSource} and execute the {@link Batch}.
+   *
+   * <p>The batch id is the one following the latest committed batch id, so the id of an
+   * offset-log entry that was written but never committed is reused.
+   */
+  public void execute() {
+    Long latestBatchId = offsetLog.getLatest().map(Pair::getKey).orElse(INITIAL_LATEST_BATCH_ID);
+    Long latestCommittedBatchId =
+        committedLog.getLatest().map(Pair::getKey).orElse(INITIAL_LATEST_BATCH_ID);
+    Optional<Offset> committedOffset = offsetLog.get(latestCommittedBatchId);
+    AtomicLong currentBatchId = new AtomicLong(INITIAL_LATEST_BATCH_ID);
+
+    if (latestBatchId.equals(latestCommittedBatchId)) {
+      // there is no unhandled offset.
+      currentBatchId.set(latestCommittedBatchId + 1L);
+    } else {
+      // Guava message templates only substitute %s placeholders.
+      Preconditions.checkArgument(
+          latestBatchId.equals(latestCommittedBatchId + 1L),
+          "[BUG] Expected latestBatchId - latestCommittedBatchId = 0 or 1, "
+              + "but latestBatchId=%s, latestCommittedBatchId=%s",
+          latestBatchId,
+          latestCommittedBatchId);
+
+      // latestBatchId is not committed yet.
+      currentBatchId.set(latestBatchId);
+    }
+
+    Optional<Offset> availableOffsets = source.getLatestOffset();
+    if (hasNewData(availableOffsets, committedOffset)) {
+      Batch batch = source.getBatch(committedOffset, availableOffsets.get());
+      offsetLog.add(currentBatchId.get(), availableOffsets.get());
+      queryService.executePlan(
+          batchPlan,
+          new PlanContext(batch.getSplit()),
+          new ResponseListener<>() {
+            @Override
+            public void onResponse(ExecutionEngine.QueryResponse response) {
+              long finalBatchId = currentBatchId.get();
+              Offset finalAvailableOffsets = availableOffsets.get();
+              committedLog.add(finalBatchId, finalAvailableOffsets);
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+              // pass the exception so the cause is logged; the batch stays uncommitted.
+              log.error("streaming processing failed. source = {}", source, e);
+            }
+          });
+    }
+  }
+
+  /**
+   * Check whether the source has offsets that are not committed yet.
+   *
+   * @param availableOffsets latest offset reported by the source, if any.
+   * @param committedOffset offset of the latest committed batch, if any.
+   * @return true if availableOffsets differs from committedOffset.
+   */
+  private boolean hasNewData(Optional<Offset> availableOffsets, Optional<Offset> committedOffset) {
+    if (availableOffsets.equals(committedOffset)) {
+      log.debug("source does not have new data, exit. source = {}", source);
+      return false;
+    } else {
+      Preconditions.checkArgument(
+          availableOffsets.isPresent(), "[BUG] available offsets must be no empty");
+
+      log.debug(
+          "source has new data. source = {}, availableOffsets:{}, committedOffset:{}",
+          source,
+          availableOffsets,
+          committedOffset);
+      return true;
+    }
+  }
+}
diff --git a/core/src/main/java/org/opensearch/sql/planner/PlanContext.java b/core/src/main/java/org/opensearch/sql/planner/PlanContext.java
new file mode 100644
index 0000000000..3d43c02d61
--- /dev/null
+++ b/core/src/main/java/org/opensearch/sql/planner/PlanContext.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.sql.planner;
+
+import java.util.Optional;
+import lombok.Getter;
+import org.opensearch.sql.storage.split.Split;
+
+/**
+ * Plan context holds planning related information.
+ */
+public class PlanContext {
+
+  @Getter
+  private final Optional<Split> split;
+
+  /** Create a plan context that carries the given {@link Split}, which must not be null. */
+  public PlanContext(Split split) {
+    this.split = Optional.of(split);
+  }
+
+  private PlanContext(Optional<Split> split) {
+    this.split = split;
+  }
+
+  /** Create a plan context that carries no {@link Split}. */
+  public static PlanContext emptyPlanContext() {
+    return new PlanContext(Optional.empty());
+  }
+}
diff --git a/core/src/test/java/org/opensearch/sql/executor/QueryServiceTest.java b/core/src/test/java/org/opensearch/sql/executor/QueryServiceTest.java
index 2884544dd0..d1ffa51fcc 100644
--- a/core/src/test/java/org/opensearch/sql/executor/QueryServiceTest.java
+++ b/core/src/test/java/org/opensearch/sql/executor/QueryServiceTest.java
@@ -8,11 +8,13 @@
 
 package org.opensearch.sql.executor;
 
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.lenient;
 import static org.mockito.Mockito.when;
 
 import java.util.Collections;
@@ -24,6 +26,7 @@
 import org.opensearch.sql.analysis.Analyzer;
 import org.opensearch.sql.ast.tree.UnresolvedPlan;
 import org.opensearch.sql.common.response.ResponseListener;
+import org.opensearch.sql.planner.PlanContext;
 import org.opensearch.sql.planner.Planner;
 import org.opensearch.sql.planner.logical.LogicalPlan;
 import org.opensearch.sql.planner.physical.PhysicalPlan;
@@ -56,10 +59,13 @@ class QueryServiceTest {
   @Mock
   private ExecutionEngine.Schema schema;
 
+  @Mock
+  private PlanContext planContext;
+
   @BeforeEach
   public void setUp() {
-    when(analyzer.analyze(any(), any())).thenReturn(logicalPlan);
-    when(planner.plan(any())).thenReturn(plan);
+    lenient().when(analyzer.analyze(any(), any())).thenReturn(logicalPlan);
+    lenient().when(planner.plan(any())).thenReturn(plan);
 
     queryService = new QueryService(analyzer, executionEngine, planner);
   }
@@ -81,7 +87,7 @@ public void testExecuteShouldPass() {
         new ResponseListener<>() {
           @Override
           public void onResponse(ExecutionEngine.QueryResponse pplQueryResponse) {
-
+            assertNotNull(pplQueryResponse);
           }
 
           @Override
@@ -110,7 +116,7 @@ public void testExplainShouldPass() {
         new ResponseListener<>() {
           @Override
           public void onResponse(ExecutionEngine.ExplainResponse pplQueryResponse) {
-
+            assertNotNull(pplQueryResponse);
           }
 
           @Override
@@ -161,4 +167,51 @@ public void onFailure(Exception e) {
           }
         });
   }
+
+  @Test
+  public void testExecutePlanShouldPass() {
+    doAnswer(
+        invocation -> {
+          ResponseListener<ExecutionEngine.QueryResponse> listener = invocation.getArgument(1);
+          listener.onResponse(
+              new ExecutionEngine.QueryResponse(schema, Collections.emptyList()));
+          return null;
+        })
+        .when(executionEngine)
+        .execute(any(), any());
+
+    queryService.executePlan(
+        logicalPlan,
+        planContext,
+        new ResponseListener<>() {
+          @Override
+          public void onResponse(ExecutionEngine.QueryResponse pplQueryResponse) {
+            assertNotNull(pplQueryResponse);
+          }
+
+          @Override
+          public void onFailure(Exception e) {
+            fail();
+          }
+        });
+  }
+
+  @Test
+  public void analyzeExceptionShouldBeCaught() {
+    when(analyzer.analyze(any(), any())).thenThrow(IllegalStateException.class);
+
+    queryService.execute(
+        ast,
+        new ResponseListener<>() {
+          @Override
+          public void onResponse(ExecutionEngine.QueryResponse pplQueryResponse) {
+            fail();
+          }
+
+          @Override
+          public void onFailure(Exception e) {
+            assertTrue(e instanceof IllegalStateException);
+          }
+        });
+  }
 }
diff --git a/core/src/test/java/org/opensearch/sql/executor/streaming/MicroBatchStreamingExecutionTest.java b/core/src/test/java/org/opensearch/sql/executor/streaming/MicroBatchStreamingExecutionTest.java
new file mode 100644
index 0000000000..1a2b6e3f2a
--- /dev/null
+++ b/core/src/test/java/org/opensearch/sql/executor/streaming/MicroBatchStreamingExecutionTest.java
@@ -0,0 +1,284 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.sql.executor.streaming;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.lenient;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import lombok.EqualsAndHashCode;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.opensearch.sql.common.response.ResponseListener;
+import org.opensearch.sql.executor.ExecutionEngine;
+import org.opensearch.sql.executor.QueryService;
+import org.opensearch.sql.planner.PlanContext;
+import org.opensearch.sql.planner.logical.LogicalPlan;
+import org.opensearch.sql.storage.split.Split;
+
+@ExtendWith(MockitoExtension.class)
+class MicroBatchStreamingExecutionTest {
+
+  @Test
+  void executedSuccess() {
+    streamingQuery()
+        .addData()
+        .executeSuccess(0L)
+        .latestOffsetLogShouldBe(0L)
+        .latestCommittedLogShouldBe(0L);
+  }
+
+  @Test
+  void executedFailed() {
+    streamingQuery()
+        .addData()
+        .executeFailed()
+        .latestOffsetLogShouldBe(0L)
+        .noCommittedLog();
+  }
+
+  @Test
+  void noDataInSource() {
+    streamingQuery()
+        .neverProcess()
+        .noOffsetLog()
+        .noCommittedLog();
+  }
+
+  @Test
+  void noNewDataInSource() {
+    streamingQuery()
+        .addData()
+        .executeSuccess(0L)
+        .latestOffsetLogShouldBe(0L)
+        .latestCommittedLogShouldBe(0L)
+        .neverProcess()
+        .latestOffsetLogShouldBe(0L)
+        .latestCommittedLogShouldBe(0L);
+  }
+
+  @Test
+  void addNewDataInSequenceAllExecuteSuccess() {
+    streamingQuery()
+        .addData()
+        .executeSuccess(0L)
+        .latestOffsetLogShouldBe(0L)
+        .latestCommittedLogShouldBe(0L)
+        .addData()
+        .executeSuccess(1L)
+        .latestOffsetLogShouldBe(1L)
+        .latestCommittedLogShouldBe(1L);
+  }
+
+  @Test
+  void addNewDataInSequenceExecuteFailedInBetween() {
+    streamingQuery()
+        .addData()
+        .executeSuccess(0L)
+        .latestOffsetLogShouldBe(0L)
+        .latestCommittedLogShouldBe(0L)
+        .addData()
+        .executeFailed()
+        .latestOffsetLogShouldBe(1L)
+        .latestCommittedLogShouldBe(0L)
+        .executeSuccess(1L)
+        .latestOffsetLogShouldBe(1L)
+        .latestCommittedLogShouldBe(1L);
+  }
+
+  @Test
+  void addNewDataInSequenceExecuteFailed() {
+    streamingQuery()
+        .addData()
+        .executeSuccess(0L)
+        .latestOffsetLogShouldBe(0L)
+        .latestCommittedLogShouldBe(0L)
+        .addData()
+        .executeFailed()
+        .latestOffsetLogShouldBe(1L)
+        .latestCommittedLogShouldBe(0L)
+        .executeFailed()
+        .latestOffsetLogShouldBe(1L)
+        .latestCommittedLogShouldBe(0L);
+  }
+
+  Helper streamingQuery() {
+    return new Helper();
+  }
+
+  private static class Helper {
+
+    private final MicroBatchStreamingExecution execution;
+
+    private final MetadataLog<Offset> offsetLog;
+
+    private final MetadataLog<Offset> committedLog;
+
+    private final LogicalPlan batchPlan;
+
+    private final QueryService queryService;
+
+    private final TestStreamingSource source = new TestStreamingSource();
+
+    public Helper() {
+      this.offsetLog = new DefaultMetadataLog<>();
+      this.committedLog = new DefaultMetadataLog<>();
+      this.batchPlan = Mockito.mock(LogicalPlan.class);
+      this.queryService = Mockito.mock(QueryService.class);
+      this.execution =
+          new MicroBatchStreamingExecution(
+              source, batchPlan, queryService, offsetLog, committedLog);
+    }
+
+    Helper addData() {
+      source.addData();
+      return this;
+    }
+
+    Helper neverProcess() {
+      lenient()
+          .doAnswer(
+              invocation -> {
+                fail();
+                return null;
+              })
+          .when(queryService)
+          .executePlan(any(), any(), any());
+      execution.execute();
+      return this;
+    }
+
+    Helper executeSuccess(Long... offsets) {
+      lenient()
+          .doAnswer(
+              invocation -> {
+                ResponseListener<ExecutionEngine.QueryResponse> listener =
+                    invocation.getArgument(2);
+                listener.onResponse(
+                    new ExecutionEngine.QueryResponse(null, Collections.emptyList()));
+
+                PlanContext planContext = invocation.getArgument(1);
+                assertTrue(planContext.getSplit().isPresent());
+                assertEquals(new TestOffsetSplit(offsets), planContext.getSplit().get());
+
+                return null;
+              })
+          .when(queryService)
+          .executePlan(any(), any(), any());
+      execution.execute();
+
+      return this;
+    }
+
+    Helper executeFailed() {
+      lenient()
+          .doAnswer(
+              invocation -> {
+                ResponseListener<ExecutionEngine.QueryResponse> listener =
+                    invocation.getArgument(2);
+                listener.onFailure(new RuntimeException());
+
+                return null;
+              })
+          .when(queryService)
+          .executePlan(any(), any(), any());
+      execution.execute();
+
+      return this;
+    }
+
+    Helper noCommittedLog() {
+      assertTrue(committedLog.getLatest().isEmpty());
+      return this;
+    }
+
+    Helper latestCommittedLogShouldBe(Long offsetId) {
+      assertTrue(committedLog.getLatest().isPresent());
+      assertEquals(offsetId, committedLog.getLatest().get().getRight().getOffset());
+      return this;
+    }
+
+    Helper noOffsetLog() {
+      assertTrue(offsetLog.getLatest().isEmpty());
+      return this;
+    }
+
+    Helper latestOffsetLogShouldBe(Long offsetId) {
+      assertTrue(offsetLog.getLatest().isPresent());
+      assertEquals(offsetId, offsetLog.getLatest().get().getRight().getOffset());
+      return this;
+    }
+  }
+
+  /**
+   * StreamingSource impl only for testing.
+   *
+   * <p>initially, offset is -1, getLatestOffset() will return Optional.empty().
+   *
+   * <p>each call to addData() increases the offset by one.
+   */
+  static class TestStreamingSource implements StreamingSource {
+
+    private final AtomicLong offset = new AtomicLong(-1L);
+
+    /** add offset by one. */
+    void addData() {
+      offset.incrementAndGet();
+    }
+
+    /** return offset if addData was called. */
+    @Override
+    public Optional<Offset> getLatestOffset() {
+      if (offset.get() == -1) {
+        return Optional.empty();
+      } else {
+        return Optional.of(new Offset(offset.get()));
+      }
+    }
+
+    /** return a Batch whose split holds offsets start + 1 (or 0) through min(latest, end). */
+    @Override
+    public Batch getBatch(Optional<Offset> start, Offset end) {
+      return new Batch(
+          new TestOffsetSplit(
+              start.map(v -> v.getOffset() + 1).orElse(0L),
+              Long.min(offset.get(), end.getOffset())));
+    }
+  }
+
+  @EqualsAndHashCode
+  static class TestOffsetSplit implements Split {
+
+    private final List<Long> offsets;
+
+    public TestOffsetSplit(Long start, Long end) {
+      this.offsets = new ArrayList<>();
+      for (long l = start; l <= end; l++) {
+        this.offsets.add(l);
+      }
+    }
+
+    public TestOffsetSplit(Long... offsets) {
+      this.offsets = Arrays.stream(offsets).collect(Collectors.toList());
+    }
+
+    @Override
+    public String getSplitId() {
+      return "id";
+    }
+  }
+}
diff --git a/core/src/test/java/org/opensearch/sql/planner/PlanContextTest.java b/core/src/test/java/org/opensearch/sql/planner/PlanContextTest.java
new file mode 100644
index 0000000000..77ae78f77e
--- /dev/null
+++ b/core/src/test/java/org/opensearch/sql/planner/PlanContextTest.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.sql.planner;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.opensearch.sql.storage.split.Split;
+
+@ExtendWith(MockitoExtension.class)
+class PlanContextTest {
+
+  @Mock
+  private Split split;
+
+  @Test
+  void createEmptyPlanContext() {
+    assertTrue(PlanContext.emptyPlanContext().getSplit().isEmpty());
+  }
+
+  @Test
+  void createPlanContextWithSplit() {
+    assertTrue(new PlanContext(split).getSplit().isPresent());
+  }
+}