feat: introduce branches for pull queries on streams (#8045)
vvcephei authored Aug 27, 2021
1 parent 180832b commit 886da99
Showing 5 changed files with 126 additions and 42 deletions.
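In short, each pull-query entry point now branches on the data source type taken from the query analysis: the existing KTABLE path is kept, a KSTREAM source fails fast with "Pull queries are not supported on streams.", and any other type is rejected as unexpected. The sketch below is a minimal, self-contained illustration of that dispatch; the enum, exception class, handler, and help-text constant are simplified stand-ins for the ksqlDB classes named in the diff, not the real implementation.

```java
// PullQueryBranchSketch.java -- illustrative only.
// DataSourceType, KsqlStatementException, PULL_QUERY_SYNTAX_HELP, and handlePullQuery are
// simplified stand-ins for the ksqlDB classes referenced in the diff below.
public class PullQueryBranchSketch {

  enum DataSourceType { KSTREAM, KTABLE }

  static class KsqlStatementException extends RuntimeException {
    KsqlStatementException(final String message, final String statementText) {
      super(message + " [statement: " + statementText + "]");
    }
  }

  // Stand-in for PullQueryValidator.PULL_QUERY_SYNTAX_HELP (the real text lives in ksqlDB).
  private static final String PULL_QUERY_SYNTAX_HELP =
      " See the ksqlDB documentation on pull and push queries for the supported syntax.";

  static String handlePullQuery(final DataSourceType type, final String statementText) {
    switch (type) {
      case KTABLE:
        // The real code dispatches here to handleTablePullQuery, createTablePullQueryPublisher,
        // or pullQueryPublisher.start, depending on the endpoint.
        return "table pull query dispatched";
      case KSTREAM:
        throw new KsqlStatementException(
            "Pull queries are not supported on streams." + PULL_QUERY_SYNTAX_HELP,
            statementText);
      default:
        // Defensive branch mirroring the commit; unreachable with this two-value enum.
        throw new KsqlStatementException(
            "Unexpected data source type for pull query: " + type,
            statementText);
    }
  }

  public static void main(final String[] args) {
    System.out.println(handlePullQuery(DataSourceType.KTABLE, "SELECT * FROM users_table WHERE id = 1;"));
    try {
      handlePullQuery(DataSourceType.KSTREAM, "SELECT * FROM clicks_stream WHERE id = 1;");
    } catch (final KsqlStatementException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

Of the five files below, the first rewords the exception thrown on the engine's table-only path, the next three add this branch to their respective pull-query endpoints, and the last extends StreamedQueryResourceTest to stub the data source type so both branches can be exercised.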
File 1 of 5
@@ -51,7 +51,8 @@ static PersistentQueryMetadata findMaterializingQuery(
final Set<QueryId> queries = engineContext.getQueryRegistry().getQueriesWithSink(sourceName);

if (source.getDataSourceType() != DataSourceType.KTABLE) {
throw new KsqlException("Pull queries are not supported on streams."
throw new KsqlException("Unexpected data source type for table pull query: "
+ source.getDataSourceType() + " "
+ PullQueryValidator.PULL_QUERY_SYNTAX_HELP);
}

File 2 of 5
@@ -19,6 +19,7 @@
import com.google.common.util.concurrent.RateLimiter;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.ksql.analyzer.ImmutableAnalysis;
import io.confluent.ksql.analyzer.PullQueryValidator;
import io.confluent.ksql.api.server.MetricsCallbackHolder;
import io.confluent.ksql.api.server.QueryHandle;
import io.confluent.ksql.api.server.SlidingWindowRateLimiter;
@@ -29,6 +30,7 @@
import io.confluent.ksql.execution.streams.RoutingFilter.RoutingFilterFactory;
import io.confluent.ksql.execution.streams.RoutingOptions;
import io.confluent.ksql.internal.PullQueryExecutorMetrics;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.name.ColumnName;
import io.confluent.ksql.parser.KsqlParser.ParsedStatement;
import io.confluent.ksql.parser.KsqlParser.PreparedStatement;
@@ -133,15 +135,31 @@ public QueryPublisher createQueryPublisher(
if (statement.getStatement().isPullQuery()) {
final ImmutableAnalysis analysis = ksqlEngine
.analyzeQueryWithNoOutputTopic(statement.getStatement(), statement.getStatementText());
return createTablePullQueryPublisher(
analysis,
context,
serviceContext,
statement,
pullQueryMetrics,
workerExecutor,
metricsCallbackHolder
);
final DataSource dataSource = analysis.getFrom().getDataSource();
final DataSource.DataSourceType dataSourceType = dataSource.getDataSourceType();
switch (dataSourceType) {
case KTABLE:
return createTablePullQueryPublisher(
analysis,
context,
serviceContext,
statement,
pullQueryMetrics,
workerExecutor,
metricsCallbackHolder
);
case KSTREAM:
throw new KsqlStatementException(
"Pull queries are not supported on streams."
+ PullQueryValidator.PULL_QUERY_SYNTAX_HELP,
statement.getStatementText()
);
default:
throw new KsqlStatementException(
"Unexpected data source type for pull query: " + dataSourceType,
statement.getStatementText()
);
}
} else if (ScalablePushUtil.isScalablePushQuery(statement.getStatement(), ksqlEngine,
ksqlConfig, properties)) {
final ImmutableAnalysis analysis = ksqlEngine
File 3 of 5
@@ -33,6 +33,7 @@
import io.confluent.ksql.execution.streams.RoutingOptions;
import io.confluent.ksql.internal.PullQueryExecutorMetrics;
import io.confluent.ksql.logging.query.QueryLogger;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.parser.KsqlParser.PreparedStatement;
import io.confluent.ksql.parser.tree.PrintTopic;
import io.confluent.ksql.parser.tree.Query;
@@ -324,6 +325,8 @@ private EndpointResponse handleQuery(final KsqlSecurityContext securityContext,
if (statement.getStatement().isPullQuery()) {
final ImmutableAnalysis analysis = ksqlEngine
.analyzeQueryWithNoOutputTopic(statement.getStatement(), statement.getStatementText());
final DataSource dataSource = analysis.getFrom().getDataSource();
final DataSource.DataSourceType dataSourceType = dataSource.getDataSourceType();

// First thing, set the metrics callback so that it gets called, even if we hit an error
final AtomicReference<PullQueryResult> resultForMetrics = new AtomicReference<>(null);
@@ -376,19 +379,35 @@ private EndpointResponse handleQuery(final KsqlSecurityContext securityContext,
statement.getStatementText());
}

final SessionConfig sessionConfig = SessionConfig.of(ksqlConfig, configProperties);
final ConfiguredStatement<Query> configured = ConfiguredStatement
.of(statement, sessionConfig);
return handleTablePullQuery(
analysis,
securityContext.getServiceContext(),
configured,
request.getRequestProperties(),
isInternalRequest,
connectionClosedFuture,
pullBandRateLimiter,
resultForMetrics
);
switch (dataSourceType) {
case KTABLE: {
final SessionConfig sessionConfig = SessionConfig.of(ksqlConfig, configProperties);
final ConfiguredStatement<Query> configured = ConfiguredStatement
.of(statement, sessionConfig);
return handleTablePullQuery(
analysis,
securityContext.getServiceContext(),
configured,
request.getRequestProperties(),
isInternalRequest,
connectionClosedFuture,
pullBandRateLimiter,
resultForMetrics
);
}
case KSTREAM: {
throw new KsqlStatementException(
"Pull queries are not supported on streams."
+ PullQueryValidator.PULL_QUERY_SYNTAX_HELP,
statement.getStatementText()
);
}
default:
throw new KsqlStatementException(
"Unexpected data source type for pull query: " + dataSourceType,
statement.getStatementText()
);
}
} else if (ScalablePushUtil
.isScalablePushQuery(statement.getStatement(), ksqlEngine, ksqlConfig,
configProperties)) {
File 4 of 5
@@ -22,11 +22,13 @@
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.RateLimiter;
import io.confluent.ksql.analyzer.ImmutableAnalysis;
import io.confluent.ksql.analyzer.PullQueryValidator;
import io.confluent.ksql.api.server.SlidingWindowRateLimiter;
import io.confluent.ksql.config.SessionConfig;
import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.execution.streams.RoutingFilter.RoutingFilterFactory;
import io.confluent.ksql.internal.PullQueryExecutorMetrics;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.parser.KsqlParser.PreparedStatement;
import io.confluent.ksql.parser.tree.PrintTopic;
import io.confluent.ksql.parser.tree.Query;
@@ -50,6 +52,7 @@
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlStatementException;
import io.confluent.ksql.version.metrics.ActivenessRegistrar;
import io.vertx.core.Context;
import io.vertx.core.MultiMap;
@@ -330,23 +333,41 @@ private void handleQuery(final RequestContext info, final Query query,

final ImmutableAnalysis analysis = ksqlEngine
.analyzeQueryWithNoOutputTopic(configured.getStatement(), configured.getStatementText());


pullQueryPublisher.start(
ksqlEngine,
info.securityContext.getServiceContext(),
exec,
configured,
analysis,
streamSubscriber,
pullQueryMetrics,
startTimeNanos,
routingFilterFactory,
rateLimiter,
pullConcurrencyLimiter,
pullBandRateLimiter,
routing
);
final DataSource dataSource = analysis.getFrom().getDataSource();
final DataSource.DataSourceType dataSourceType = dataSource.getDataSourceType();

switch (dataSourceType) {
case KTABLE: {
pullQueryPublisher.start(
ksqlEngine,
info.securityContext.getServiceContext(),
exec,
configured,
analysis,
streamSubscriber,
pullQueryMetrics,
startTimeNanos,
routingFilterFactory,
rateLimiter,
pullConcurrencyLimiter,
pullBandRateLimiter,
routing
);
return;
}
case KSTREAM: {
throw new KsqlStatementException(
"Pull queries are not supported on streams."
+ PullQueryValidator.PULL_QUERY_SYNTAX_HELP,
statement.getStatementText()
);
}
default:
throw new KsqlStatementException(
"Unexpected data source type for pull query: " + dataSourceType,
statement.getStatementText()
);
}
} else if (ScalablePushUtil.isScalablePushQuery(
statement.getStatement(), ksqlEngine, ksqlConfig, clientLocalProperties)) {

File 5 of 5
@@ -49,15 +49,18 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.RateLimiter;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.analyzer.Analysis.AliasedDataSource;
import io.confluent.ksql.analyzer.ImmutableAnalysis;
import io.confluent.ksql.api.server.MetricsCallbackHolder;
import io.confluent.ksql.api.server.StreamingOutput;
import io.confluent.ksql.api.server.SlidingWindowRateLimiter;
import io.confluent.ksql.api.server.StreamingOutput;
import io.confluent.ksql.config.SessionConfig;
import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.engine.PullQueryExecutionUtil;
import io.confluent.ksql.exception.KsqlTopicAuthorizationException;
import io.confluent.ksql.execution.streams.RoutingFilter.RoutingFilterFactory;
import io.confluent.ksql.logging.query.QueryLogger;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.metastore.model.DataSource.DataSourceType;
import io.confluent.ksql.name.ColumnName;
import io.confluent.ksql.parser.KsqlParser.PreparedStatement;
@@ -225,6 +228,12 @@ public class StreamedQueryResourceTest {
private Context context;
@Mock
private DistributingExecutor distributingExecutor;
@Mock
private ImmutableAnalysis mockAnalysis;
@Mock
private AliasedDataSource mockAliasedDataSource;
@Mock
private DataSource mockDataSource;

private StreamedQueryResource testResource;
private PreparedStatement<Statement> invalid;
@@ -246,6 +255,10 @@ public void setup() {
when(errorsHandler.accessDeniedFromKafkaResponse(any(Exception.class))).thenReturn(AUTHORIZATION_ERROR_RESPONSE);
when(errorsHandler.generateResponse(exception.capture(), any()))
.thenReturn(EndpointResponse.failed(500));
when(mockKsqlEngine.analyzeQueryWithNoOutputTopic(any(), any())).thenReturn(mockAnalysis);
when(mockAnalysis.getFrom()).thenReturn(mockAliasedDataSource);
when(mockAliasedDataSource.getDataSource()).thenReturn(mockDataSource);
when(mockDataSource.getDataSourceType()).thenReturn(DataSourceType.KSTREAM);

securityContext = new KsqlSecurityContext(Optional.empty(), serviceContext);

@@ -274,10 +287,20 @@ public void setup() {
}

@Test
public void shouldThrowExceptionIfConfigDisabled() {
public void shouldThrowExceptionIfConfigDisabledStream() {
shouldThrowExceptionIfConfigDisabled(DataSourceType.KSTREAM);
}

@Test
public void shouldThrowExceptionIfConfigDisabledTable() {
shouldThrowExceptionIfConfigDisabled(DataSourceType.KTABLE);
}

private void shouldThrowExceptionIfConfigDisabled(final DataSourceType dataSourceType) {
// Given:
when(ksqlConfig.getKsqlStreamConfigProps())
.thenReturn(ImmutableMap.of(StreamsConfig.APPLICATION_SERVER_CONFIG, "something:1"));
when(mockDataSource.getDataSourceType()).thenReturn(dataSourceType);
testResource.configure(ksqlConfig);

final String errorMsg = "Pull queries are disabled. See https://cnfl.io/queries for more info.\n"
@@ -362,6 +385,7 @@ public void shouldRateLimit() {
when(mockKsqlEngine.executeTablePullQuery(any(), any(), any(), any(), any(), any(), any(), anyBoolean()))
.thenReturn(pullQueryResult);
when(pullQueryResult.getPullQueryQueue()).thenReturn(pullQueryQueue);
when(mockDataSource.getDataSourceType()).thenReturn(DataSourceType.KTABLE);

// When:
testResource.streamQuery(
@@ -418,6 +442,7 @@ public void shouldReachConcurrentLimit() {
// Given:
when(rateLimiter.tryAcquire()).thenReturn(true);
when(concurrencyLimiter.increment()).thenThrow(new KsqlException("concurrencyLimiter Error!"));
when(mockDataSource.getDataSourceType()).thenReturn(DataSourceType.KTABLE);

// When:
final EndpointResponse response =
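The test changes in the last file follow a single pattern: the query-analysis chain is stubbed so the resource under test sees a stream or a table source as needed. The sketch below reproduces just that Mockito wiring with simplified stand-in interfaces (not the real ksqlDB types), assuming mockito-core is on the classpath; setup() defaults the type to KSTREAM, and individual tests re-stub it to KTABLE where the table path is exercised.

```java
// DataSourceTypeStubSketch.java -- illustrative only; the three interfaces are simplified
// stand-ins for ksqlDB's ImmutableAnalysis, AliasedDataSource, and DataSource.
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class DataSourceTypeStubSketch {

  enum DataSourceType { KSTREAM, KTABLE }

  interface DataSource { DataSourceType getDataSourceType(); }

  interface AliasedDataSource { DataSource getDataSource(); }

  interface ImmutableAnalysis { AliasedDataSource getFrom(); }

  public static void main(final String[] args) {
    final ImmutableAnalysis analysis = mock(ImmutableAnalysis.class);
    final AliasedDataSource aliased = mock(AliasedDataSource.class);
    final DataSource source = mock(DataSource.class);

    // Mirror the setup() stubbing added in the diff: analysis -> from -> data source -> type,
    // defaulting to KSTREAM; tests for the table path re-stub the type to KTABLE.
    when(analysis.getFrom()).thenReturn(aliased);
    when(aliased.getDataSource()).thenReturn(source);
    when(source.getDataSourceType()).thenReturn(DataSourceType.KSTREAM);

    System.out.println(analysis.getFrom().getDataSource().getDataSourceType()); // prints KSTREAM

    when(source.getDataSourceType()).thenReturn(DataSourceType.KTABLE);
    System.out.println(analysis.getFrom().getDataSource().getDataSourceType()); // prints KTABLE
  }
}
```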
