Skip to content

Commit

Permalink
Makes pull query interface async
Browse files Browse the repository at this point in the history
  • Loading branch information
AlanConfluent committed Jan 27, 2021
1 parent f211cc9 commit f9168bd
Show file tree
Hide file tree
Showing 48 changed files with 2,032 additions and 955 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import io.confluent.ksql.GenericRow;
import io.confluent.ksql.api.impl.BlockingQueryPublisher;
import io.confluent.ksql.api.server.PushQueryHandle;
import io.confluent.ksql.api.server.QueryHandle;
import io.confluent.ksql.query.BlockingRowQueue;
import io.confluent.ksql.query.TransientQueryQueue;
import io.confluent.ksql.util.KeyValue;
Expand All @@ -27,6 +27,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalInt;
import java.util.function.Consumer;
import org.reactivestreams.Publisher;
import org.reactivestreams.tck.PublisherVerification;
import org.reactivestreams.tck.TestEnvironment;
Expand All @@ -49,7 +50,7 @@ public Publisher<KeyValue<List<?>, GenericRow>> createPublisher(long elements) {
final Context context = vertx.getOrCreateContext();
BlockingQueryPublisher publisher = new BlockingQueryPublisher(context, workerExecutor);
final TestQueryHandle queryHandle = new TestQueryHandle(elements);
publisher.setQueryHandle(queryHandle);
publisher.setQueryHandle(queryHandle, false);
if (elements < Integer.MAX_VALUE) {
for (long l = 0; l < elements; l++) {
queryHandle.queue.acceptRow(null, generateRow(l));
Expand All @@ -71,7 +72,7 @@ private static GenericRow generateRow(long num) {
return GenericRow.fromList(l);
}

private static class TestQueryHandle implements PushQueryHandle {
private static class TestQueryHandle implements QueryHandle {

private final TransientQueryQueue queue;

Expand Down Expand Up @@ -105,5 +106,9 @@ public void stop() {
public BlockingRowQueue getQueue() {
return queue;
}

@Override
public void onException(Consumer<Throwable> onException) {
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public abstract class BasePublisher<T> implements Publisher<T> {
private long demand;
private boolean cancelled;
private boolean sentComplete;
private volatile Exception failure;
private volatile Throwable failure;

public BasePublisher(final Context ctx) {
this.ctx = Objects.requireNonNull(ctx);
Expand Down Expand Up @@ -75,7 +75,7 @@ protected void checkContext() {
VertxUtils.checkContext(ctx);
}

protected final void sendError(final Exception e) {
protected final void sendError(final Throwable e) {
checkContext();
try {
if (subscriber != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.engine.KsqlPlan;
import io.confluent.ksql.execution.streams.RoutingFilter.RoutingFilterFactory;
import io.confluent.ksql.execution.streams.RoutingOptions;
import io.confluent.ksql.internal.PullQueryExecutorMetrics;
import io.confluent.ksql.logging.processing.ProcessingLogContext;
Expand Down Expand Up @@ -145,18 +144,19 @@ TransientQueryMetadata executeQuery(
* plan. The physical plan is then traversed for every row in the state store.
* @param serviceContext The service context to execute the query in
* @param statement The pull query
* @param routingFilterFactory The filters used to route requests for HA routing
* @param routingOptions Configuration parameters used for routing requests
* @param pullQueryMetrics JMX metrics
* @param startImmediately Whether to start the pull query immediately. If not, the caller must
* call PullQueryResult.start to start the query.
* @return the rows that are the result of the query evaluation.
*/
PullQueryResult executePullQuery(
ServiceContext serviceContext,
ConfiguredStatement<Query> statement,
HARouting routing,
RoutingFilterFactory routingFilterFactory,
RoutingOptions routingOptions,
Optional<PullQueryExecutorMetrics> pullQueryMetrics
Optional<PullQueryExecutorMetrics> pullQueryMetrics,
boolean startImmediately
);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import io.confluent.ksql.config.SessionConfig;
import io.confluent.ksql.execution.ddl.commands.DdlCommand;
import io.confluent.ksql.execution.plan.ExecutionStep;
import io.confluent.ksql.execution.streams.RoutingFilter.RoutingFilterFactory;
import io.confluent.ksql.execution.streams.RoutingOptions;
import io.confluent.ksql.internal.PullQueryExecutorMetrics;
import io.confluent.ksql.metastore.model.DataSource;
Expand All @@ -41,6 +40,7 @@
import io.confluent.ksql.physical.pull.HARouting;
import io.confluent.ksql.physical.pull.PullPhysicalPlan;
import io.confluent.ksql.physical.pull.PullPhysicalPlanBuilder;
import io.confluent.ksql.physical.pull.PullQueryQueuePopulator;
import io.confluent.ksql.physical.pull.PullQueryResult;
import io.confluent.ksql.planner.LogicalPlanNode;
import io.confluent.ksql.planner.LogicalPlanner;
Expand All @@ -49,6 +49,7 @@
import io.confluent.ksql.planner.plan.KsqlStructuredDataOutputNode;
import io.confluent.ksql.planner.plan.OutputNode;
import io.confluent.ksql.planner.plan.PlanNode;
import io.confluent.ksql.query.PullQueryQueue;
import io.confluent.ksql.query.QueryExecutor;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.schema.ksql.LogicalSchema;
Expand Down Expand Up @@ -137,17 +138,16 @@ ExecuteResult execute(final KsqlPlan plan) {
* Evaluates a pull query by first analyzing it, then building the logical plan and finally
* the physical plan. The execution is then done using the physical plan in a pipelined manner.
* @param statement The pull query
* @param routingFilterFactory The filters used for HA routing
* @param routingOptions Configuration parameters used for HA routing
* @param pullQueryMetrics JMX metrics
* @return the rows that are the result of evaluating the pull query
*/
PullQueryResult executePullQuery(
final ConfiguredStatement<Query> statement,
final HARouting routing,
final RoutingFilterFactory routingFilterFactory,
final RoutingOptions routingOptions,
final Optional<PullQueryExecutorMetrics> pullQueryMetrics
final Optional<PullQueryExecutorMetrics> pullQueryMetrics,
final boolean startImmediately
) {

if (!statement.getStatement().isPullQuery()) {
Expand All @@ -168,10 +168,17 @@ PullQueryResult executePullQuery(
logicalPlan,
analysis
);
return routing.handlePullQuery(
final PullQueryQueue pullQueryQueue = new PullQueryQueue();
final PullQueryQueuePopulator populator = () -> routing.handlePullQuery(
serviceContext,
physicalPlan, statement, routingOptions, physicalPlan.getOutputSchema(),
physicalPlan.getQueryId());
physicalPlan.getQueryId(), pullQueryQueue);
final PullQueryResult result = new PullQueryResult(physicalPlan.getOutputSchema(), populator,
physicalPlan.getQueryId(), pullQueryQueue, pullQueryMetrics);
if (startImmediately) {
result.start();
}
return result;
} catch (final Exception e) {
pullQueryMetrics.ifPresent(metrics -> metrics.recordErrorRate(1));
throw new KsqlStatementException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.google.common.collect.ImmutableList;
import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.ServiceInfo;
import io.confluent.ksql.execution.streams.RoutingFilter.RoutingFilterFactory;
import io.confluent.ksql.execution.streams.RoutingOptions;
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.internal.KsqlEngineMetrics;
Expand Down Expand Up @@ -270,9 +269,9 @@ public PullQueryResult executePullQuery(
final ServiceContext serviceContext,
final ConfiguredStatement<Query> statement,
final HARouting routing,
final RoutingFilterFactory routingFilterFactory,
final RoutingOptions routingOptions,
final Optional<PullQueryExecutorMetrics> pullQueryMetrics
final Optional<PullQueryExecutorMetrics> pullQueryMetrics,
final boolean startImmediately
) {
return EngineExecutor
.create(
Expand All @@ -283,9 +282,9 @@ public PullQueryResult executePullQuery(
.executePullQuery(
statement,
routing,
routingFilterFactory,
routingOptions,
pullQueryMetrics
pullQueryMetrics,
startImmediately
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import com.google.common.collect.ImmutableList;
import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.execution.streams.RoutingFilter.RoutingFilterFactory;
import io.confluent.ksql.execution.streams.RoutingOptions;
import io.confluent.ksql.internal.PullQueryExecutorMetrics;
import io.confluent.ksql.logging.processing.NoopProcessingLogContext;
Expand Down Expand Up @@ -164,9 +163,9 @@ public PullQueryResult executePullQuery(
final ServiceContext serviceContext,
final ConfiguredStatement<Query> statement,
final HARouting routing,
final RoutingFilterFactory routingFilterFactory,
final RoutingOptions routingOptions,
final Optional<PullQueryExecutorMetrics> pullQueryMetrics
final Optional<PullQueryExecutorMetrics> pullQueryMetrics,
final boolean startImmediately
) {
return EngineExecutor.create(
engineContext,
Expand All @@ -175,9 +174,9 @@ public PullQueryResult executePullQuery(
).executePullQuery(
statement,
routing,
routingFilterFactory,
routingOptions,
pullQueryMetrics
pullQueryMetrics,
startImmediately
);
}
}
Loading

0 comments on commit f9168bd

Please sign in to comment.