Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Streaming Plan Impl #1068

Merged
7 changes: 1 addition & 6 deletions core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,12 @@ plugins {
id 'java-library'
id "io.freefair.lombok"
id 'jacoco'
id 'java-test-fixtures'
}

repositories {
mavenCentral()
}
//
//configurations.all {
// resolutionStrategy.dependencySubstitution {
// substitute module('com.google.guava:guava:26.0-jre') with module('com.google.guava:guava:29.0-jre')
// }
//}

dependencies {
api group: 'com.google.guava', name: 'guava', version: '31.0.1-jre'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
@RequiredArgsConstructor
public class Query extends Statement {

private final UnresolvedPlan plan;
protected final UnresolvedPlan plan;

@Override
public <R, C> R accept(AbstractNodeVisitor<R, C> visitor, C context) {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.executor;

import java.util.Optional;
import lombok.Getter;
import org.opensearch.sql.storage.split.Split;

/**
* Execution context hold planning related information.
*/
public class ExecutionContext {
@Getter
private final Optional<Split> split;

public ExecutionContext(Split split) {
this.split = Optional.of(split);
}

private ExecutionContext(Optional<Split> split) {
this.split = split;
}

public static ExecutionContext emptyExecutionContext() {
return new ExecutionContext(Optional.empty());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,19 @@ public interface ExecutionEngine {

/**
* Execute physical plan and call back response listener.
* Todo. deprecated this interface after finalize {@link ExecutionContext}.
*
* @param plan executable physical plan
* @param listener response listener
*/
void execute(PhysicalPlan plan, ResponseListener<QueryResponse> listener);

/**
* Execute physical plan with {@link ExecutionContext} and call back response listener.
*/
void execute(PhysicalPlan plan, ExecutionContext context,
ResponseListener<QueryResponse> listener);

/**
* Explain physical plan and call back response listener. The reason why this has to
* be part of execution engine interface is that the physical plan probably needs to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,13 @@ public interface QueryManager {
* @return {@link QueryId}.
*/
QueryId submit(AbstractPlan queryPlan);

/**
* Cancel submitted {@link AbstractPlan} by {@link QueryId}.
*
* @return true indicate successful.
*/
default boolean cancel(QueryId queryId) {
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,12 @@ public void executePlan(LogicalPlan plan,
PlanContext planContext,
ResponseListener<ExecutionEngine.QueryResponse> listener) {
try {
executionEngine.execute(plan(plan), listener);
planContext
.getSplit()
.ifPresentOrElse(
split -> executionEngine.execute(plan(plan), new ExecutionContext(split), listener),
() -> executionEngine.execute(
plan(plan), ExecutionContext.emptyExecutionContext(), listener));
} catch (Exception e) {
listener.onFailure(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ public class QueryPlan extends AbstractPlan {
/**
* The query plan ast.
*/
private final UnresolvedPlan plan;
protected final UnresolvedPlan plan;

/**
* Query service.
*/
private final QueryService queryService;
protected final QueryService queryService;

private final ResponseListener<ExecutionEngine.QueryResponse> listener;
protected final ResponseListener<ExecutionEngine.QueryResponse> listener;

/** constructor. */
public QueryPlan(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.executor.execution;

import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.TimeUnit;
import lombok.RequiredArgsConstructor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.common.response.ResponseListener;
import org.opensearch.sql.executor.ExecutionEngine;
import org.opensearch.sql.executor.QueryId;
import org.opensearch.sql.executor.QueryService;
import org.opensearch.sql.executor.streaming.DefaultMetadataLog;
import org.opensearch.sql.executor.streaming.MicroBatchStreamingExecution;
import org.opensearch.sql.executor.streaming.StreamingSource;
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.logical.LogicalPlanNodeVisitor;
import org.opensearch.sql.planner.logical.LogicalRelation;

/**
* Streaming Query Plan.
*/
public class StreamingQueryPlan extends QueryPlan {

private static final Logger log = LogManager.getLogger(StreamingQueryPlan.class);

private final ExecutionStrategy executionStrategy;

private MicroBatchStreamingExecution streamingExecution;

/**
* constructor.
*/
public StreamingQueryPlan(QueryId queryId,
UnresolvedPlan plan,
QueryService queryService,
ResponseListener<ExecutionEngine.QueryResponse> listener,
ExecutionStrategy executionStrategy) {
super(queryId, plan, queryService, listener);

this.executionStrategy = executionStrategy;
}

@Override
public void execute() {
try {
LogicalPlan logicalPlan = queryService.analyze(plan);
StreamingSource streamingSource = buildStreamingSource(logicalPlan);
streamingExecution =
new MicroBatchStreamingExecution(
streamingSource,
logicalPlan,
queryService,
new DefaultMetadataLog<>(),
new DefaultMetadataLog<>());
executionStrategy.execute(streamingExecution::execute);
} catch (UnsupportedOperationException | IllegalArgumentException e) {
listener.onFailure(e);
} catch (InterruptedException e) {
log.error(e);
// todo, update async task status.
}
}

interface ExecutionStrategy {
/**
* execute task.
*/
void execute(Runnable task) throws InterruptedException;
}

/**
* execute task with fixed interval.
* if task run time < interval, trigger next task on next interval.
* if task run time >= interval, trigger next task immediately.
*/
@RequiredArgsConstructor
public static class IntervalTriggerExecution implements ExecutionStrategy {

private final long intervalInSeconds;

@Override
public void execute(Runnable runnable) throws InterruptedException {
while (!Thread.currentThread().isInterrupted()) {
try {
Instant start = Instant.now();
runnable.run();
Instant end = Instant.now();
long took = Duration.between(start, end).toSeconds();
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved
TimeUnit.SECONDS.sleep(intervalInSeconds > took ? intervalInSeconds - took : 0);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}

private StreamingSource buildStreamingSource(LogicalPlan logicalPlan) {
return logicalPlan.accept(new StreamingSourceBuilder(), null);
}

static class StreamingSourceBuilder extends LogicalPlanNodeVisitor<StreamingSource, Void> {
@Override
public StreamingSource visitNode(LogicalPlan plan, Void context) {
List<LogicalPlan> children = plan.getChild();
if (children.isEmpty()) {
String errorMsg =
String.format(
"Could find relation plan, %s does not have child node.",
plan.getClass().getSimpleName());
log.error(errorMsg);
throw new IllegalArgumentException(errorMsg);
}
return children.get(0).accept(this, context);
}

@Override
public StreamingSource visitRelation(LogicalRelation plan, Void context) {
try {
return plan.getTable().asStreamingSource();
} catch (UnsupportedOperationException e) {
String errorMsg =
String.format(
"table %s could not been used as streaming source.", plan.getRelationName());
log.error(errorMsg);
throw new UnsupportedOperationException(errorMsg);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public void onResponse(ExecutionEngine.QueryResponse response) {

@Override
public void onFailure(Exception e) {
log.error("streaming processing failed. source = {}", source);
log.error("streaming processing failed. source = {}", source, e);
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.executor.ExecutionEngine;
import org.opensearch.sql.planner.PlanNode;
import org.opensearch.sql.storage.split.Split;

/**
* Physical plan.
Expand All @@ -36,6 +37,10 @@ public void close() {
getChild().forEach(PhysicalPlan::close);
}

public void add(Split split) {
getChild().forEach(child -> child.add(split));
}

public ExecutionEngine.Schema schema() {
throw new IllegalStateException(String.format("[BUG] schema can been only applied to "
+ "ProjectOperator, instead of %s", toString()));
Expand Down
7 changes: 7 additions & 0 deletions core/src/main/java/org/opensearch/sql/storage/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import java.util.Map;
import org.opensearch.sql.data.type.ExprType;
import org.opensearch.sql.executor.streaming.StreamingSource;
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.physical.PhysicalPlan;

Expand Down Expand Up @@ -58,4 +59,10 @@ default LogicalPlan optimize(LogicalPlan plan) {
return plan;
}

/**
* Translate {@link Table} to {@link StreamingSource} if possible.
*/
default StreamingSource asStreamingSource() {
throw new UnsupportedOperationException();
}
}

This file was deleted.

Loading