Skip to content

Commit

Permalink
Merge pull request #1044 from penghuo/hp/feature/maximus/microbatchtask
Browse files Browse the repository at this point in the history
Add micro batch streaming execution
  • Loading branch information
penghuo authored Nov 14, 2022
2 parents 3c6b37a + faff812 commit b0cca57
Show file tree
Hide file tree
Showing 6 changed files with 566 additions and 10 deletions.
39 changes: 33 additions & 6 deletions core/src/main/java/org/opensearch/sql/executor/QueryService.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.sql.analysis.Analyzer;
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.common.response.ResponseListener;
import org.opensearch.sql.planner.PlanContext;
import org.opensearch.sql.planner.Planner;
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.physical.PhysicalPlan;
Expand All @@ -31,12 +32,32 @@ public class QueryService {

/**
* Execute the {@link UnresolvedPlan}, using {@link ResponseListener} to get response.
* Todo. deprecated this interface after finalize {@link PlanContext}.
*
* @param plan {@link UnresolvedPlan}
* @param listener {@link ResponseListener}
*/
public void execute(UnresolvedPlan plan,
ResponseListener<ExecutionEngine.QueryResponse> listener) {
try {
executePlan(analyze(plan), PlanContext.emptyPlanContext(), listener);
} catch (Exception e) {
listener.onFailure(e);
}
}

/**
* Execute the {@link UnresolvedPlan}, with {@link PlanContext} and using {@link ResponseListener}
* to get response.
* Todo. Pass split from PlanContext to ExecutionEngine in following PR.
*
* @param plan {@link LogicalPlan}
* @param planContext {@link PlanContext}
* @param listener {@link ResponseListener}
*/
public void executePlan(LogicalPlan plan,
PlanContext planContext,
ResponseListener<ExecutionEngine.QueryResponse> listener) {
try {
executionEngine.execute(plan(plan), listener);
} catch (Exception e) {
Expand All @@ -54,17 +75,23 @@ public void execute(UnresolvedPlan plan,
public void explain(UnresolvedPlan plan,
ResponseListener<ExecutionEngine.ExplainResponse> listener) {
try {
executionEngine.explain(plan(plan), listener);
executionEngine.explain(plan(analyze(plan)), listener);
} catch (Exception e) {
listener.onFailure(e);
}
}

private PhysicalPlan plan(UnresolvedPlan plan) {
// 1.Analyze abstract syntax to generate logical plan
LogicalPlan logicalPlan = analyzer.analyze(plan, new AnalysisContext());
/**
* Analyze {@link UnresolvedPlan}.
*/
public LogicalPlan analyze(UnresolvedPlan plan) {
return analyzer.analyze(plan, new AnalysisContext());
}

// 2.Generate optimal physical plan from logical plan
return planner.plan(logicalPlan);
/**
* Translate {@link LogicalPlan} to {@link PhysicalPlan}.
*/
public PhysicalPlan plan(LogicalPlan plan) {
return planner.plan(plan);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.sql.executor.streaming;

import com.google.common.base.Preconditions;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.sql.common.response.ResponseListener;
import org.opensearch.sql.executor.ExecutionEngine;
import org.opensearch.sql.executor.QueryService;
import org.opensearch.sql.planner.PlanContext;
import org.opensearch.sql.planner.logical.LogicalPlan;

/**
* Micro batch streaming execution.
*/
public class MicroBatchStreamingExecution {

private static final Logger log = LogManager.getLogger(MicroBatchStreamingExecution.class);

static final long INITIAL_LATEST_BATCH_ID = -1L;

private final StreamingSource source;

private final LogicalPlan batchPlan;

private final QueryService queryService;

/**
* A write-ahead-log that records the offsets that are present in each batch. In order to ensure
* that a given batch will always consist of the same data, we write to this log before any
* processing is done. Thus, the Nth record in this log indicated data that is currently being
* processed and the N-1th entry indicates which offsets have been durably committed to the sink.
*/
private final MetadataLog<Offset> offsetLog;

/** keep track the latest commit batchId. */
private final MetadataLog<Offset> committedLog;

/**
* Constructor.
*/
public MicroBatchStreamingExecution(
StreamingSource source,
LogicalPlan batchPlan,
QueryService queryService,
MetadataLog<Offset> offsetLog,
MetadataLog<Offset> committedLog) {
this.source = source;
this.batchPlan = batchPlan;
this.queryService = queryService;
// todo. add offsetLog and committedLog offset recovery.
this.offsetLog = offsetLog;
this.committedLog = committedLog;
}

/**
* Pull the {@link Batch} from {@link StreamingSource} and execute the {@link Batch}.
*/
public void execute() {
Long latestBatchId = offsetLog.getLatest().map(Pair::getKey).orElse(INITIAL_LATEST_BATCH_ID);
Long latestCommittedBatchId =
committedLog.getLatest().map(Pair::getKey).orElse(INITIAL_LATEST_BATCH_ID);
Optional<Offset> committedOffset = offsetLog.get(latestCommittedBatchId);
AtomicLong currentBatchId = new AtomicLong(INITIAL_LATEST_BATCH_ID);

if (latestBatchId.equals(latestCommittedBatchId)) {
// there are no unhandled Offset.
currentBatchId.set(latestCommittedBatchId + 1L);
} else {
Preconditions.checkArgument(
latestBatchId.equals(latestCommittedBatchId + 1L),
"[BUG] Expected latestBatchId - latestCommittedBatchId = 0 or 1, "
+ "but latestBatchId=%d, latestCommittedBatchId=%d",
latestBatchId,
latestCommittedBatchId);

// latestBatchId is not committed yet.
currentBatchId.set(latestBatchId);
}

Optional<Offset> availableOffsets = source.getLatestOffset();
if (hasNewData(availableOffsets, committedOffset)) {
Batch batch = source.getBatch(committedOffset, availableOffsets.get());
offsetLog.add(currentBatchId.get(), availableOffsets.get());
queryService.executePlan(
batchPlan,
new PlanContext(batch.getSplit()),
new ResponseListener<>() {
@Override
public void onResponse(ExecutionEngine.QueryResponse response) {
long finalBatchId = currentBatchId.get();
Offset finalAvailableOffsets = availableOffsets.get();
committedLog.add(finalBatchId, finalAvailableOffsets);
}

@Override
public void onFailure(Exception e) {
log.error("streaming processing failed. source = {}", source);
}
});
}
}

private boolean hasNewData(Optional<Offset> availableOffsets, Optional<Offset> committedOffset) {
if (availableOffsets.equals(committedOffset)) {
log.debug("source does not have new data, exit. source = {}", source);
return false;
} else {
Preconditions.checkArgument(
availableOffsets.isPresent(), "[BUG] available offsets must be no empty");

log.debug(
"source has new data. source = {}, availableOffsets:{}, committedOffset:{}",
source,
availableOffsets,
committedOffset);
return true;
}
}
}
31 changes: 31 additions & 0 deletions core/src/main/java/org/opensearch/sql/planner/PlanContext.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner;

import java.util.Optional;
import lombok.Getter;
import org.opensearch.sql.storage.split.Split;

/**
* Plan context hold planning related information.
*/
public class PlanContext {

@Getter
private final Optional<Split> split;

public PlanContext(Split split) {
this.split = Optional.of(split);
}

private PlanContext(Optional<Split> split) {
this.split = split;
}

public static PlanContext emptyPlanContext() {
return new PlanContext(Optional.empty());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@

package org.opensearch.sql.executor;

import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.when;

import java.util.Collections;
Expand All @@ -24,6 +26,7 @@
import org.opensearch.sql.analysis.Analyzer;
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.common.response.ResponseListener;
import org.opensearch.sql.planner.PlanContext;
import org.opensearch.sql.planner.Planner;
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.physical.PhysicalPlan;
Expand Down Expand Up @@ -56,10 +59,13 @@ class QueryServiceTest {
@Mock
private ExecutionEngine.Schema schema;

@Mock
private PlanContext planContext;

@BeforeEach
public void setUp() {
when(analyzer.analyze(any(), any())).thenReturn(logicalPlan);
when(planner.plan(any())).thenReturn(plan);
lenient().when(analyzer.analyze(any(), any())).thenReturn(logicalPlan);
lenient().when(planner.plan(any())).thenReturn(plan);

queryService = new QueryService(analyzer, executionEngine, planner);
}
Expand All @@ -81,7 +87,7 @@ public void testExecuteShouldPass() {
new ResponseListener<>() {
@Override
public void onResponse(ExecutionEngine.QueryResponse pplQueryResponse) {

assertNotNull(pplQueryResponse);
}

@Override
Expand Down Expand Up @@ -110,7 +116,7 @@ public void testExplainShouldPass() {
new ResponseListener<>() {
@Override
public void onResponse(ExecutionEngine.ExplainResponse pplQueryResponse) {

assertNotNull(pplQueryResponse);
}

@Override
Expand Down Expand Up @@ -161,4 +167,51 @@ public void onFailure(Exception e) {
}
});
}

@Test
public void testExecutePlanShouldPass() {
doAnswer(
invocation -> {
ResponseListener<ExecutionEngine.QueryResponse> listener = invocation.getArgument(1);
listener.onResponse(
new ExecutionEngine.QueryResponse(schema, Collections.emptyList()));
return null;
})
.when(executionEngine)
.execute(any(), any());

queryService.executePlan(
logicalPlan,
planContext,
new ResponseListener<>() {
@Override
public void onResponse(ExecutionEngine.QueryResponse pplQueryResponse) {
assertNotNull(pplQueryResponse);
}

@Override
public void onFailure(Exception e) {
fail();
}
});
}

@Test
public void analyzeExceptionShouldBeCached() {
when(analyzer.analyze(any(), any())).thenThrow(IllegalStateException.class);

queryService.execute(
ast,
new ResponseListener<>() {
@Override
public void onResponse(ExecutionEngine.QueryResponse pplQueryResponse) {
fail();
}

@Override
public void onFailure(Exception e) {
assertTrue(e instanceof IllegalStateException);
}
});
}
}
Loading

0 comments on commit b0cca57

Please sign in to comment.