diff --git a/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java b/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java index 88f80d35173a..60c90fc0793a 100644 --- a/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java +++ b/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java @@ -204,6 +204,11 @@ public class KsqlConfig extends AbstractConfig { public static final String KSQL_QUERY_PULL_STREAMSTORE_REBALANCING_TIMEOUT_MS_DOC = "Timeout in " + "milliseconds when waiting for rebalancing of the stream store during a pull query"; + public static final String KSQL_QUERY_PULL_METRICS_ENABLED = + "ksql.query.pull.metrics.enabled"; + public static final String KSQL_QUERY_PULL_METRICS_ENABLED_DOC = + "Config to enable/disable collecting JMX metrics for pull queries."; + public static final Collection COMPATIBLY_BREAKING_CONFIG_DEFS = ImmutableList.of(); @@ -632,6 +637,13 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) { Importance.LOW, "Feature flag for removing restriction on key names - WIP, do not enable." ) + .define( + KSQL_QUERY_PULL_METRICS_ENABLED, + Type.BOOLEAN, + false, + Importance.LOW, + KSQL_QUERY_PULL_METRICS_ENABLED_DOC + ) .withClientSslSupport(); for (final CompatibilityBreakingConfigDef compatibilityBreakingConfigDef diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/endpoints/QueryStreamEndpoint.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/endpoints/QueryStreamEndpoint.java index 582944ac7012..2f760d300ef4 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/endpoints/QueryStreamEndpoint.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/endpoints/QueryStreamEndpoint.java @@ -39,6 +39,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.OptionalInt; public class QueryStreamEndpoint { @@ -91,7 +92,8 @@ private QueryPublisher createPushQueryPublisher(final Context context, private QueryPublisher createPullQueryPublisher(final Context context, final ServiceContext serviceContext, final ConfiguredStatement statement) { - final TableRowsEntity tableRows = pullQueryExecutor.execute(statement, serviceContext); + final TableRowsEntity tableRows = pullQueryExecutor.execute( + statement, serviceContext, Optional.empty()); return new PullQueryPublisher(context, tableRows, colNamesFromSchema(tableRows.getSchema()), colTypesFromSchema(tableRows.getSchema())); } diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java index f426f7510d0b..7ab26df88266 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java @@ -272,6 +272,7 @@ public static KsqlRestConfig convertToApiServerConfig(final KsqlRestConfig confi final List preconditions, final List configurables, final Consumer rocksDBConfigSetterHandler, + final PullQueryExecutor pullQueryExecutor, final Optional heartbeatAgent, final Optional lagReportingAgent ) { @@ -300,12 +301,11 @@ public static KsqlRestConfig convertToApiServerConfig(final KsqlRestConfig confi this.configurables = requireNonNull(configurables, "configurables"); this.rocksDBConfigSetterHandler = requireNonNull(rocksDBConfigSetterHandler, "rocksDBConfigSetterHandler"); + this.pullQueryExecutor = requireNonNull(pullQueryExecutor, "pullQueryExecutor"); this.heartbeatAgent = requireNonNull(heartbeatAgent, "heartbeatAgent"); this.lagReportingAgent = requireNonNull(lagReportingAgent, "lagReportingAgent"); - this.routingFilterFactory = initializeRoutingFilterFactory( ksqlConfigNoPort, heartbeatAgent, lagReportingAgent); - this.pullQueryExecutor = new PullQueryExecutor(ksqlEngine, routingFilterFactory); } @Override @@ -445,8 +445,14 @@ private void initialize(final KsqlConfig configWithApplicationServer) { serverState.setReady(); } + @SuppressWarnings("checkstyle:NPathComplexity") @Override public void triggerShutdown() { + try { + streamedQueryResource.closeMetrics(); + } catch (final Exception e) { + log.error("Exception while waiting for pull query metrics to close", e); + } try { ksqlEngine.close(); } catch (final Exception e) { @@ -717,7 +723,8 @@ static KsqlRestApplication buildApplication( heartbeatAgent, lagReportingAgent); final PullQueryExecutor pullQueryExecutor = new PullQueryExecutor( - ksqlEngine, routingFilterFactory); + ksqlEngine, routingFilterFactory, ksqlConfig); + final StreamedQueryResource streamedQueryResource = new StreamedQueryResource( ksqlEngine, commandStore, @@ -792,6 +799,7 @@ static KsqlRestApplication buildApplication( preconditions, configurables, rocksDBConfigSetterHandler, + pullQueryExecutor, heartbeatAgent, lagReportingAgent ); diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PullQueryExecutor.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PullQueryExecutor.java index 88ff4a9f05bc..9f52907dac6f 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PullQueryExecutor.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PullQueryExecutor.java @@ -133,7 +133,8 @@ public final class PullQueryExecutor { public PullQueryExecutor( final KsqlExecutionContext executionContext, - final RoutingFilterFactory routingFilterFactory + final RoutingFilterFactory routingFilterFactory, + final KsqlConfig ksqlConfig ) { this.executionContext = Objects.requireNonNull(executionContext, "executionContext"); this.routingFilterFactory = @@ -152,7 +153,8 @@ public static void validate( public TableRowsEntity execute( final ConfiguredStatement statement, - final ServiceContext serviceContext + final ServiceContext serviceContext, + final Optional pullQueryMetrics ) { if (!statement.getStatement().isPullQuery()) { throw new IllegalArgumentException("Executor can only handle pull queries"); @@ -193,7 +195,8 @@ public TableRowsEntity execute( analysis, whereInfo, queryId, - contextStacker); + contextStacker, + pullQueryMetrics); return handlePullQuery( statement, @@ -201,8 +204,8 @@ public TableRowsEntity execute( serviceContext, pullQueryContext ); - } catch (final Exception e) { + pullQueryMetrics.ifPresent(metrics -> metrics.recordErrorRate(1)); throw new KsqlStatementException( e.getMessage() == null ? "Server Error" : e.getMessage(), statement.getStatementText(), @@ -256,6 +259,8 @@ private TableRowsEntity routeQuery( if (node.isLocal()) { LOG.debug("Query {} executed locally at host {} at timestamp {}.", statement.getStatementText(), node.location(), System.currentTimeMillis()); + pullQueryContext.pullQueryMetrics + .ifPresent(queryExecutorMetrics -> queryExecutorMetrics.recordLocalRequests(1)); return queryRowsLocally( statement, executionContext, @@ -263,6 +268,8 @@ private TableRowsEntity routeQuery( } else { LOG.debug("Query {} routed to host {} at timestamp {}.", statement.getStatementText(), node.location(), System.currentTimeMillis()); + pullQueryContext.pullQueryMetrics + .ifPresent(queryExecutorMetrics -> queryExecutorMetrics.recordRemoteRequests(1)); return forwardTo(node, statement, serviceContext); } } @@ -399,6 +406,7 @@ private static final class PullQueryContext { private final WhereInfo whereInfo; private final QueryId queryId; private final QueryContext.Stacker contextStacker; + private final Optional pullQueryMetrics; private PullQueryContext( final Struct key, @@ -406,7 +414,9 @@ private PullQueryContext( final ImmutableAnalysis analysis, final WhereInfo whereInfo, final QueryId queryId, - final QueryContext.Stacker contextStacker + final QueryContext.Stacker contextStacker, + final Optional pullQueryMetrics + ) { this.key = Objects.requireNonNull(key, "key"); this.mat = Objects.requireNonNull(mat, "materialization"); @@ -414,6 +424,8 @@ private PullQueryContext( this.whereInfo = Objects.requireNonNull(whereInfo, "whereInfo"); this.queryId = Objects.requireNonNull(queryId, "queryId"); this.contextStacker = Objects.requireNonNull(contextStacker, "contextStacker"); + this.pullQueryMetrics = Objects.requireNonNull( + pullQueryMetrics, "pullQueryExecutorMetrics"); } public Struct getKey() { diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PullQueryExecutorMetrics.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PullQueryExecutorMetrics.java new file mode 100644 index 000000000000..e46271e6ba44 --- /dev/null +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PullQueryExecutorMetrics.java @@ -0,0 +1,258 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.rest.server.execution; + +import io.confluent.ksql.metrics.MetricCollectors; +import io.confluent.ksql.util.ReservedInternalTopics; +import java.io.Closeable; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.Avg; +import org.apache.kafka.common.metrics.stats.Max; +import org.apache.kafka.common.metrics.stats.Min; +import org.apache.kafka.common.metrics.stats.Percentile; +import org.apache.kafka.common.metrics.stats.Percentiles; +import org.apache.kafka.common.metrics.stats.Percentiles.BucketSizing; +import org.apache.kafka.common.metrics.stats.Rate; +import org.apache.kafka.common.metrics.stats.WindowedCount; + +public class PullQueryExecutorMetrics implements Closeable { + + private static final String PULL_QUERY_METRIC_GROUP = "pull-query"; + private static final String PULL_REQUESTS = "pull-query-requests"; + + private final List sensors; + private final Sensor localRequestsSensor; + private final Sensor remoteRequestsSensor; + private final Sensor latencySensor; + private final Sensor requestRateSensor; + private final Sensor errorRateSensor; + private final Metrics metrics; + private final Map customMetricsTags; + private final String ksqlServiceId; + + public PullQueryExecutorMetrics( + final String ksqlServiceId, + final Map customMetricsTags + ) { + + this.metrics = MetricCollectors.getMetrics(); + this.sensors = new ArrayList<>(); + this.localRequestsSensor = configureLocalRequestsSensor(); + this.remoteRequestsSensor = configureRemoteRequestsSensor(); + this.latencySensor = configureRequestSensor(); + this.requestRateSensor = configureRateSensor(); + this.errorRateSensor = configureErrorRateSensor(); + this.customMetricsTags = Objects.requireNonNull(customMetricsTags, "customMetricsTags"); + this.ksqlServiceId = ReservedInternalTopics.KSQL_INTERNAL_TOPIC_PREFIX + + ksqlServiceId; + } + + @Override + public void close() { + sensors.forEach(sensor -> metrics.removeSensor(sensor.name())); + } + + public void recordLocalRequests(final double value) { + this.localRequestsSensor.record(value); + } + + public void recordRemoteRequests(final double value) { + this.remoteRequestsSensor.record(value); + } + + public void recordRate(final double value) { + this.requestRateSensor.record(value); + } + + public void recordLatency(final double value) { + this.latencySensor.record(value); + } + + public void recordErrorRate(final double value) { + this.errorRateSensor.record(value); + } + + private Sensor configureLocalRequestsSensor() { + final Sensor sensor = metrics.sensor( + PULL_QUERY_METRIC_GROUP + "-" + PULL_REQUESTS + "-local"); + sensor.add( + metrics.metricName( + PULL_REQUESTS + "-local-count", + ksqlServiceId + PULL_QUERY_METRIC_GROUP, + "Count of local pull query requests", + customMetricsTags + ), + new WindowedCount() + ); + sensor.add( + metrics.metricName( + PULL_REQUESTS + "-local-rate", + ksqlServiceId + PULL_QUERY_METRIC_GROUP, + "Rate of local pull query requests", + customMetricsTags + ), + new Rate() + ); + sensors.add(sensor); + return sensor; + } + + private Sensor configureRemoteRequestsSensor() { + final Sensor sensor = metrics.sensor( + PULL_QUERY_METRIC_GROUP + "-" + PULL_REQUESTS + "-remote"); + sensor.add( + metrics.metricName( + PULL_REQUESTS + "-remote-count", + ksqlServiceId + PULL_QUERY_METRIC_GROUP, + "Count of remote pull query requests", + customMetricsTags + ), + new WindowedCount() + ); + sensor.add( + metrics.metricName( + PULL_REQUESTS + "-remote-rate", + ksqlServiceId + PULL_QUERY_METRIC_GROUP, + "Rate of remote pull query requests", + customMetricsTags + ), + new Rate() + ); + sensors.add(sensor); + return sensor; + } + + private Sensor configureRateSensor() { + final Sensor sensor = metrics.sensor( + PULL_QUERY_METRIC_GROUP + "-" + PULL_REQUESTS + "-rate"); + sensor.add( + metrics.metricName( + PULL_REQUESTS + "-rate", + ksqlServiceId + PULL_QUERY_METRIC_GROUP, + "Rate of pull query requests", + customMetricsTags + ), + new Rate() + ); + sensors.add(sensor); + return sensor; + } + + private Sensor configureErrorRateSensor() { + final Sensor sensor = metrics.sensor( + PULL_QUERY_METRIC_GROUP + "-" + PULL_REQUESTS + "-error-rate"); + sensor.add( + metrics.metricName( + PULL_REQUESTS + "-error-rate", + ksqlServiceId + PULL_QUERY_METRIC_GROUP, + "Rate of erroneous pull query requests", + customMetricsTags + ), + new Rate() + ); + sensor.add( + metrics.metricName( + PULL_REQUESTS + "-error-total", + ksqlServiceId + PULL_QUERY_METRIC_GROUP, + "Total number of erroneous pull query requests", + customMetricsTags + ), + new WindowedCount() + ); + + sensors.add(sensor); + return sensor; + } + + private Sensor configureRequestSensor() { + final Sensor sensor = metrics.sensor( + PULL_QUERY_METRIC_GROUP + "-" + PULL_REQUESTS + "-latency"); + sensor.add( + metrics.metricName( + PULL_REQUESTS + "-latency-avg", + ksqlServiceId + PULL_QUERY_METRIC_GROUP, + "Average time for a pull query request", + customMetricsTags + ), + new Avg() + ); + sensor.add( + metrics.metricName( + PULL_REQUESTS + "-latency-max", + ksqlServiceId + PULL_QUERY_METRIC_GROUP, + "Max time for a pull query request", + customMetricsTags + ), + new Max() + ); + sensor.add( + metrics.metricName( + PULL_REQUESTS + "-latency-min", + ksqlServiceId + PULL_QUERY_METRIC_GROUP, + "Min time for a pull query request", + customMetricsTags + ), + new Min() + ); + sensor.add( + metrics.metricName( + PULL_REQUESTS + "-total", + ksqlServiceId + PULL_QUERY_METRIC_GROUP, + "Total number of pull query request", + customMetricsTags + ), + new WindowedCount() + ); + sensor.add(new Percentiles( + 100, + 0, + 1000, + BucketSizing.CONSTANT, + new Percentile(metrics.metricName( + PULL_REQUESTS + "-distribution-50", + ksqlServiceId + PULL_QUERY_METRIC_GROUP, + "Latency distribution", + customMetricsTags + ), 50.0), + new Percentile(metrics.metricName( + PULL_REQUESTS + "-distribution-75", + ksqlServiceId + PULL_QUERY_METRIC_GROUP, + "Latency distribution", + customMetricsTags + ), 75.0), + new Percentile(metrics.metricName( + PULL_REQUESTS + "-distribution-90", + ksqlServiceId + PULL_QUERY_METRIC_GROUP, + "Latency distribution", + customMetricsTags + ), 90.0), + new Percentile(metrics.metricName( + PULL_REQUESTS + "-distribution-99", + ksqlServiceId + PULL_QUERY_METRIC_GROUP, + "Latency distribution", + customMetricsTags + ), 99.0) + )); + + sensors.add(sensor); + return sensor; + } +} diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisher.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisher.java index 55e3bc457dce..ffdacc6ccb6d 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisher.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisher.java @@ -29,6 +29,7 @@ import io.confluent.ksql.statement.ConfiguredStatement; import java.util.Collection; import java.util.List; +import java.util.Optional; import java.util.concurrent.Callable; import java.util.stream.Collectors; @@ -53,7 +54,7 @@ class PullQueryPublisher implements Flow.Publisher> { public synchronized void subscribe(final Subscriber> subscriber) { final PullQuerySubscription subscription = new PullQuerySubscription( subscriber, - () -> pullQueryExecutor.execute(query, serviceContext) + () -> pullQueryExecutor.execute(query, serviceContext, Optional.empty()) ); subscriber.onSubscribe(subscription); diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java index 2a4bb66f4a2e..409b223dfbca 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java @@ -15,6 +15,8 @@ package io.confluent.ksql.rest.server.resources.streaming; +import static java.util.Optional.empty; + import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; @@ -32,6 +34,7 @@ import io.confluent.ksql.rest.server.StatementParser; import io.confluent.ksql.rest.server.computation.CommandQueue; import io.confluent.ksql.rest.server.execution.PullQueryExecutor; +import io.confluent.ksql.rest.server.execution.PullQueryExecutorMetrics; import io.confluent.ksql.rest.server.resources.KsqlConfigurable; import io.confluent.ksql.rest.server.resources.KsqlRestException; import io.confluent.ksql.rest.util.CommandStoreUtil; @@ -60,6 +63,7 @@ import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.StreamsConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -82,6 +86,8 @@ public class StreamedQueryResource implements KsqlConfigurable { private final Errors errorHandler; private KsqlConfig ksqlConfig; private final PullQueryExecutor pullQueryExecutor; + private Optional pullQueryMetrics; + private final Time time; public StreamedQueryResource( final KsqlEngine ksqlEngine, @@ -133,6 +139,7 @@ public StreamedQueryResource( this.authorizationValidator = authorizationValidator; this.errorHandler = Objects.requireNonNull(errorHandler, "errorHandler"); this.pullQueryExecutor = Objects.requireNonNull(pullQueryExecutor, "pullQueryExecutor"); + this.time = Time.SYSTEM; } @Override @@ -142,6 +149,13 @@ public void configure(final KsqlConfig config) { } ksqlConfig = config; + final Boolean collectMetrics = ksqlConfig.getBoolean( + KsqlConfig.KSQL_QUERY_PULL_METRICS_ENABLED); + this.pullQueryMetrics = collectMetrics + ? Optional.of(new PullQueryExecutorMetrics( + ksqlEngine.getServiceId(), + ksqlConfig.getStringAsMap(KsqlConfig.KSQL_CUSTOM_METRICS_TAGS))) + : empty(); } @POST @@ -149,6 +163,7 @@ public Response streamQuery( @Context final KsqlSecurityContext securityContext, final KsqlRequest request ) { + final long startTime = time.nanoseconds(); throwIfNotConfigured(); activenessRegistrar.updateLastRequestTime(); @@ -158,7 +173,11 @@ public Response streamQuery( CommandStoreUtil.httpWaitForCommandSequenceNumber( commandQueue, request, commandQueueCatchupTimeout); - return handleStatement(securityContext, request, statement); + return handleStatement(securityContext, request, statement, startTime); + } + + public void closeMetrics() { + pullQueryMetrics.ifPresent(PullQueryExecutorMetrics::close); } private void throwIfNotConfigured() { @@ -184,7 +203,8 @@ private PreparedStatement parseStatement(final KsqlRequest request) { private Response handleStatement( final KsqlSecurityContext securityContext, final KsqlRequest request, - final PreparedStatement statement + final PreparedStatement statement, + final long startTime ) { try { authorizationValidator.ifPresent(validator -> @@ -198,12 +218,19 @@ private Response handleStatement( final PreparedStatement queryStmt = (PreparedStatement) statement; if (queryStmt.getStatement().isPullQuery()) { - return handlePullQuery( - securityContext.getServiceContext(), + final Response response = handlePullQuery( + securityContext.getServiceContext(), queryStmt, request.getConfigOverrides(), request.getRequestProperties() ); + if (pullQueryMetrics.isPresent()) { + //Record latency at microsecond scale + final double latency = (time.nanoseconds() - startTime) / 1000f ; + pullQueryMetrics.get().recordLatency(latency); + pullQueryMetrics.get().recordRate(1); + } + return response; } return handlePushQuery( @@ -242,7 +269,7 @@ private Response handlePullQuery( ConfiguredStatement.of(statement, configOverrides, requestProperties, ksqlConfig); final TableRowsEntity entity = pullQueryExecutor - .execute(configured, serviceContext); + .execute(configured, serviceContext, pullQueryMetrics); final StreamedRow header = StreamedRow.header(entity.getQueryId(), entity.getSchema()); diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java index 420b42cc9727..4935fead3321 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java @@ -42,6 +42,7 @@ import io.confluent.ksql.rest.server.computation.CommandRunner; import io.confluent.ksql.rest.server.computation.CommandStore; import io.confluent.ksql.rest.server.context.KsqlSecurityContextBinder; +import io.confluent.ksql.rest.server.execution.PullQueryExecutor; import io.confluent.ksql.rest.server.filters.KsqlAuthorizationFilter; import io.confluent.ksql.rest.server.resources.KsqlResource; import io.confluent.ksql.rest.server.resources.RootDocument; @@ -127,6 +128,8 @@ public class KsqlRestApplicationTest { @Mock private Consumer rocksDBConfigSetterHandler; @Mock + private PullQueryExecutor pullQueryExecutor; + @Mock private HeartbeatAgent heartbeatAgent; @Mock private LagReportingAgent lagReportingAgent; @@ -438,6 +441,7 @@ private void givenAppWithRestConfig(final Map restConfigMap) { ImmutableList.of(precondition1, precondition2), ImmutableList.of(ksqlResource, streamedQueryResource), rocksDBConfigSetterHandler, + pullQueryExecutor, Optional.of(heartbeatAgent), Optional.of(lagReportingAgent) ); diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/PullQueryExecutorTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/PullQueryExecutorTest.java index e40a98b43b48..e43836128bfb 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/PullQueryExecutorTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/PullQueryExecutorTest.java @@ -37,6 +37,7 @@ import io.confluent.ksql.statement.ConfiguredStatement; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; +import java.util.Optional; import org.eclipse.jetty.http.HttpStatus.Code; import org.junit.Rule; import org.junit.Test; @@ -70,14 +71,14 @@ public void shouldThrowExceptionIfConfigDisabled() { engine.getKsqlConfig() ); PullQueryExecutor pullQueryExecutor = new PullQueryExecutor( - engine.getEngine(), ROUTING_FILTER_FACTORY); + engine.getEngine(), ROUTING_FILTER_FACTORY, engine.getKsqlConfig()); // Then: expectedException.expect(KsqlException.class); expectedException.expectMessage(containsString("Pull queries are disabled")); // When: - pullQueryExecutor.execute(query, engine.getServiceContext()); + pullQueryExecutor.execute(query, engine.getServiceContext(), Optional.empty()); } } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisherTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisherTest.java index 07b4dfaefc44..51cf264a1b8a 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisherTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisherTest.java @@ -36,6 +36,7 @@ import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.statement.ConfiguredStatement; import java.util.Collection; +import java.util.Optional; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -79,7 +80,7 @@ public void setUp() { statement, pullQueryExecutor); - when(pullQueryExecutor.execute(any(), any())).thenReturn(entity); + when(pullQueryExecutor.execute(any(), any(), any())).thenReturn(entity); when(entity.getSchema()).thenReturn(SCHEMA); doAnswer(callRequestAgain()).when(subscriber).onNext(any()); @@ -103,7 +104,7 @@ public void shouldRunQueryWithCorrectParams() { subscription.request(1); // Then: - verify(pullQueryExecutor).execute(statement, serviceContext); + verify(pullQueryExecutor).execute(statement, serviceContext, Optional.empty()); } @Test @@ -116,7 +117,7 @@ public void shouldOnlyExecuteOnce() { // Then: verify(subscriber).onNext(any()); - verify(pullQueryExecutor).execute(statement, serviceContext); + verify(pullQueryExecutor).execute(statement, serviceContext, Optional.empty()); } @Test @@ -151,7 +152,7 @@ public void shouldCallOnErrorOnFailure() { // Given: givenSubscribed(); final Throwable e = new RuntimeException("Boom!"); - when(pullQueryExecutor.execute(any(), any())).thenThrow(e); + when(pullQueryExecutor.execute(any(), any(), any())).thenThrow(e); // When: subscription.request(1); diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java index 55287f08607d..849926c2270b 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java @@ -181,7 +181,7 @@ public void setup() { securityContext = new KsqlSecurityContext(Optional.empty(), serviceContext); pullQueryExecutor = new PullQueryExecutor( - mockKsqlEngine, ROUTING_FILTER_FACTORY); + mockKsqlEngine, ROUTING_FILTER_FACTORY, VALID_CONFIG); testResource = new StreamedQueryResource( mockKsqlEngine, mockStatementParser,