Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add micro batch streaming execution #1044

39 changes: 33 additions & 6 deletions core/src/main/java/org/opensearch/sql/executor/QueryService.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.sql.analysis.Analyzer;
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.common.response.ResponseListener;
import org.opensearch.sql.planner.PlanContext;
import org.opensearch.sql.planner.Planner;
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.physical.PhysicalPlan;
Expand All @@ -31,12 +32,32 @@ public class QueryService {

/**
 * Analyzes the given {@link UnresolvedPlan} and runs the result with an empty
 * {@link PlanContext}, reporting the outcome through the {@link ResponseListener}.
 * Todo: deprecate this entry point once {@link PlanContext} is finalized.
 *
 * @param plan {@link UnresolvedPlan} to analyze and execute
 * @param listener {@link ResponseListener} notified of the response; it also receives any
 *     exception thrown while analyzing or dispatching the plan
 */
public void execute(UnresolvedPlan plan,
                    ResponseListener<ExecutionEngine.QueryResponse> listener) {
  try {
    final LogicalPlan analyzedPlan = analyze(plan);
    executePlan(analyzedPlan, PlanContext.emptyPlanContext(), listener);
  } catch (Exception e) {
    // Route the failure to the listener rather than letting it propagate to the caller.
    listener.onFailure(e);
  }
}

/**
* Execute the {@link LogicalPlan}, with {@link PlanContext} and using {@link ResponseListener}
* to get response.
* Todo: pass the split from PlanContext to ExecutionEngine in a following PR.
*
* @param plan {@link LogicalPlan}
* @param planContext {@link PlanContext}
* @param listener {@link ResponseListener}
*/
public void executePlan(LogicalPlan plan,
PlanContext planContext,
ResponseListener<ExecutionEngine.QueryResponse> listener) {
try {
executionEngine.execute(plan(plan), listener);
} catch (Exception e) {
Expand All @@ -54,17 +75,23 @@ public void execute(UnresolvedPlan plan,
public void explain(UnresolvedPlan plan,
                    ResponseListener<ExecutionEngine.ExplainResponse> listener) {
  try {
    // Explain exactly once, on the analyzed plan: plan(...) takes a LogicalPlan, so the
    // UnresolvedPlan has to go through analyze(...) first.
    executionEngine.explain(plan(analyze(plan)), listener);
  } catch (Exception e) {
    // Analysis, planning and engine failures are all reported through the listener.
    listener.onFailure(e);
  }
}

private PhysicalPlan plan(UnresolvedPlan plan) {
// 1.Analyze abstract syntax to generate logical plan
LogicalPlan logicalPlan = analyzer.analyze(plan, new AnalysisContext());
/**
* Analyze {@link UnresolvedPlan}.
*/
public LogicalPlan analyze(UnresolvedPlan plan) {
return analyzer.analyze(plan, new AnalysisContext());
}

// 2.Generate optimal physical plan from logical plan
return planner.plan(logicalPlan);
/**
* Translate {@link LogicalPlan} to {@link PhysicalPlan}.
*/
public PhysicalPlan plan(LogicalPlan plan) {
return planner.plan(plan);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.sql.executor.streaming;

import com.google.common.base.Preconditions;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.sql.common.response.ResponseListener;
import org.opensearch.sql.executor.ExecutionEngine;
import org.opensearch.sql.executor.QueryService;
import org.opensearch.sql.planner.PlanContext;
import org.opensearch.sql.planner.logical.LogicalPlan;

/**
 * Micro batch streaming execution.
 *
 * <p>Each call to {@link #execute()} derives the id of the batch to run from the offset log and
 * the committed log, asks the {@link StreamingSource} for its latest offset and, when that differs
 * from the last committed offset, records the new offset, runs the batch plan through the
 * {@link QueryService} and adds the batch to the committed log once the query responds.
 */
public class MicroBatchStreamingExecution {

  private static final Logger log = LogManager.getLogger(MicroBatchStreamingExecution.class);

  /** Batch id used when a log has no entry yet. */
  static final long INITIAL_LATEST_BATCH_ID = -1L;

  private final StreamingSource source;

  private final LogicalPlan batchPlan;

  private final QueryService queryService;

  /**
   * A write-ahead-log that records the offsets that are present in each batch. In order to ensure
   * that a given batch will always consist of the same data, we write to this log before any
   * processing is done. Thus, the Nth record in this log indicates data that is currently being
   * processed and the N-1th entry indicates which offsets have been durably committed to the sink.
   */
  private final MetadataLog<Offset> offsetLog;

  /** Keeps track of the latest committed batchId. */
  private final MetadataLog<Offset> committedLog;

  /**
   * Constructor.
   *
   * @param source streaming source the batches are pulled from
   * @param batchPlan logical plan executed for every batch
   * @param queryService service used to execute {@code batchPlan}
   * @param offsetLog log of the offsets assigned to each batch id
   * @param committedLog log of the batch ids whose execution has responded successfully
   */
  public MicroBatchStreamingExecution(
      StreamingSource source,
      LogicalPlan batchPlan,
      QueryService queryService,
      MetadataLog<Offset> offsetLog,
      MetadataLog<Offset> committedLog) {
    this.source = source;
    this.batchPlan = batchPlan;
    this.queryService = queryService;
    // todo. add offsetLog and committedLog offset recovery.
    this.offsetLog = offsetLog;
    this.committedLog = committedLog;
  }

  /**
   * Pull the {@link Batch} from {@link StreamingSource} and execute the {@link Batch}.
   *
   * <p>Does nothing when the source's latest offset equals the last committed offset. A failed
   * execution is only logged; nothing is added to the committed log in that case.
   *
   * @throws IllegalArgumentException if the latest batch id in the offset log is neither equal to
   *     nor exactly one ahead of the latest committed batch id, or if the source reports no
   *     offset while a committed offset exists
   */
  public void execute() {
    final long latestBatchId =
        offsetLog.getLatest().map(Pair::getKey).orElse(INITIAL_LATEST_BATCH_ID);
    final long latestCommittedBatchId =
        committedLog.getLatest().map(Pair::getKey).orElse(INITIAL_LATEST_BATCH_ID);
    final Optional<Offset> committedOffset = offsetLog.get(latestCommittedBatchId);

    // Assigned exactly once so the listener below can capture it without a mutable holder.
    final long currentBatchId;
    if (latestBatchId == latestCommittedBatchId) {
      // there are no unhandled Offset.
      currentBatchId = latestCommittedBatchId + 1L;
    } else {
      // Guava's Preconditions only substitutes %s placeholders; %d would be left verbatim.
      Preconditions.checkArgument(
          latestBatchId == latestCommittedBatchId + 1L,
          "[BUG] Expected latestBatchId - latestCommittedBatchId = 0 or 1, "
              + "but latestBatchId=%s, latestCommittedBatchId=%s",
          latestBatchId,
          latestCommittedBatchId);

      // latestBatchId is not committed yet.
      currentBatchId = latestBatchId;
    }

    final Optional<Offset> availableOffsets = source.getLatestOffset();
    if (!hasNewData(availableOffsets, committedOffset)) {
      return;
    }

    // hasNewData has verified that availableOffsets is present.
    final Offset latestOffset = availableOffsets.get();
    final Batch batch = source.getBatch(committedOffset, latestOffset);
    // Record the offset before running the query (see the offsetLog field documentation).
    offsetLog.add(currentBatchId, latestOffset);
    queryService.executePlan(
        batchPlan,
        new PlanContext(batch.getSplit()),
        new ResponseListener<>() {
          @Override
          public void onResponse(ExecutionEngine.QueryResponse response) {
            committedLog.add(currentBatchId, latestOffset);
          }

          @Override
          public void onFailure(Exception e) {
            // Pass the exception as the trailing argument so its stack trace is logged too.
            log.error("streaming processing failed. source = {}", source, e);
          }
        });
  }

  /**
   * Decides whether the source offers data beyond what has been committed.
   *
   * @param availableOffsets latest offset reported by the source
   * @param committedOffset offset stored for the latest committed batch, if any
   * @return {@code false} when both offsets are equal, {@code true} otherwise
   * @throws IllegalArgumentException if the offsets differ but {@code availableOffsets} is empty
   */
  private boolean hasNewData(Optional<Offset> availableOffsets, Optional<Offset> committedOffset) {
    if (availableOffsets.equals(committedOffset)) {
      log.debug("source does not have new data, exit. source = {}", source);
      return false;
    }

    Preconditions.checkArgument(
        availableOffsets.isPresent(), "[BUG] available offsets must be no empty");

    log.debug(
        "source has new data. source = {}, availableOffsets:{}, committedOffset:{}",
        source,
        availableOffsets,
        committedOffset);
    return true;
  }
}
31 changes: 31 additions & 0 deletions core/src/main/java/org/opensearch/sql/planner/PlanContext.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner;

import java.util.Optional;
import lombok.Getter;
import org.opensearch.sql.storage.split.Split;

/**
 * Holds planning related information, currently an optional {@link Split}.
 */
public class PlanContext {

  /** Split attached to this context; empty for {@link #emptyPlanContext()}. */
  @Getter
  private final Optional<Split> split;

  /** Internal constructor that stores the given optional split as is. */
  private PlanContext(Optional<Split> split) {
    this.split = split;
  }

  /**
   * Creates a context carrying the given split.
   *
   * @param split split to attach; wrapped with {@link Optional#of}, so it must not be null
   */
  public PlanContext(Split split) {
    this(Optional.of(split));
  }

  /**
   * Creates a context without a split.
   *
   * @return a new {@link PlanContext} whose split is empty
   */
  public static PlanContext emptyPlanContext() {
    final Optional<Split> noSplit = Optional.empty();
    return new PlanContext(noSplit);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@

package org.opensearch.sql.executor;

import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.when;

import java.util.Collections;
Expand All @@ -24,6 +26,7 @@
import org.opensearch.sql.analysis.Analyzer;
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.common.response.ResponseListener;
import org.opensearch.sql.planner.PlanContext;
import org.opensearch.sql.planner.Planner;
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.physical.PhysicalPlan;
Expand Down Expand Up @@ -56,10 +59,13 @@ class QueryServiceTest {
@Mock
private ExecutionEngine.Schema schema;

@Mock
private PlanContext planContext;

@BeforeEach
public void setUp() {
  // Stub the analyzer and planner leniently, once each: some tests never reach them (for
  // example, calling executePlan directly with a LogicalPlan bypasses the analyzer).
  // NOTE(review): presumably lenient() keeps Mockito's strict-stubs check from failing on
  // those unused stubs; confirm against the test class's Mockito configuration.
  lenient().when(analyzer.analyze(any(), any())).thenReturn(logicalPlan);
  lenient().when(planner.plan(any())).thenReturn(plan);

  queryService = new QueryService(analyzer, executionEngine, planner);
}
Expand All @@ -81,7 +87,7 @@ public void testExecuteShouldPass() {
new ResponseListener<>() {
@Override
public void onResponse(ExecutionEngine.QueryResponse pplQueryResponse) {

assertNotNull(pplQueryResponse);
}

@Override
Expand Down Expand Up @@ -110,7 +116,7 @@ public void testExplainShouldPass() {
new ResponseListener<>() {
@Override
public void onResponse(ExecutionEngine.ExplainResponse pplQueryResponse) {

assertNotNull(pplQueryResponse);
}

@Override
Expand Down Expand Up @@ -161,4 +167,51 @@ public void onFailure(Exception e) {
}
});
}

@Test
public void testExecutePlanShouldPass() {
  // Engine stub: answer the listener (second argument) right away with an empty result.
  doAnswer(
          call -> {
            ResponseListener<ExecutionEngine.QueryResponse> callback = call.getArgument(1);
            ExecutionEngine.QueryResponse emptyResponse =
                new ExecutionEngine.QueryResponse(schema, Collections.emptyList());
            callback.onResponse(emptyResponse);
            return null;
          })
      .when(executionEngine)
      .execute(any(), any());

  // A response must arrive; any failure callback fails the test.
  ResponseListener<ExecutionEngine.QueryResponse> expectResponse =
      new ResponseListener<>() {
        @Override
        public void onResponse(ExecutionEngine.QueryResponse pplQueryResponse) {
          assertNotNull(pplQueryResponse);
        }

        @Override
        public void onFailure(Exception e) {
          fail();
        }
      };

  queryService.executePlan(logicalPlan, planContext, expectResponse);
}

@Test
public void analyzeExceptionShouldBeCached() {
  // Make analysis blow up so execute(...) has to hand the exception to the listener.
  when(analyzer.analyze(any(), any())).thenThrow(IllegalStateException.class);

  // A response is unexpected; the failure must carry the analyzer's exception type.
  ResponseListener<ExecutionEngine.QueryResponse> expectFailure =
      new ResponseListener<>() {
        @Override
        public void onResponse(ExecutionEngine.QueryResponse pplQueryResponse) {
          fail();
        }

        @Override
        public void onFailure(Exception e) {
          assertTrue(e instanceof IllegalStateException);
        }
      };

  queryService.execute(ast, expectFailure);
}
}
Loading