diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/QueryEndpoint.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/QueryEndpoint.java index c42afec72fd5..fed50bc6340d 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/QueryEndpoint.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/QueryEndpoint.java @@ -33,9 +33,11 @@ import io.confluent.ksql.physical.pull.HARouting; import io.confluent.ksql.physical.pull.PullQueryResult; import io.confluent.ksql.query.BlockingRowQueue; +import io.confluent.ksql.rest.server.KsqlRestConfig; import io.confluent.ksql.rest.server.LocalCommands; import io.confluent.ksql.rest.server.resources.streaming.PullQueryConfigPlannerOptions; import io.confluent.ksql.rest.server.resources.streaming.PullQueryConfigRoutingOptions; +import io.confluent.ksql.rest.util.QueryCapacityUtil; import io.confluent.ksql.schema.ksql.Column; import io.confluent.ksql.schema.utils.FormatOptions; import io.confluent.ksql.services.ServiceContext; @@ -60,6 +62,7 @@ public class QueryEndpoint { private final KsqlEngine ksqlEngine; private final KsqlConfig ksqlConfig; + private final KsqlRestConfig ksqlRestConfig; private final RoutingFilterFactory routingFilterFactory; private final Optional pullQueryMetrics; private final RateLimiter rateLimiter; @@ -69,6 +72,7 @@ public class QueryEndpoint { public QueryEndpoint( final KsqlEngine ksqlEngine, final KsqlConfig ksqlConfig, + final KsqlRestConfig ksqlRestConfig, final RoutingFilterFactory routingFilterFactory, final Optional pullQueryMetrics, final RateLimiter rateLimiter, @@ -77,6 +81,7 @@ public QueryEndpoint( ) { this.ksqlEngine = ksqlEngine; this.ksqlConfig = ksqlConfig; + this.ksqlRestConfig = ksqlRestConfig; this.routingFilterFactory = routingFilterFactory; this.pullQueryMetrics = pullQueryMetrics; this.rateLimiter = rateLimiter; @@ -113,6 +118,14 @@ private QueryPublisher createPushQueryPublisher( ) { final BlockingQueryPublisher publisher = new BlockingQueryPublisher(context, workerExecutor); + if (QueryCapacityUtil.exceedsPushQueryCapacity(ksqlEngine, ksqlRestConfig)) { + QueryCapacityUtil.throwTooManyActivePushQueriesException( + ksqlEngine, + ksqlRestConfig, + statement.getStatementText() + ); + } + final TransientQueryMetadata queryMetadata = ksqlEngine .executeQuery(serviceContext, statement, true); diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java index d5fd0ca3039d..70ab34654c4a 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java @@ -336,6 +336,7 @@ public void startAsync() { final Endpoints endpoints = new KsqlServerEndpoints( ksqlEngine, ksqlConfigNoPort, + restConfig, routingFilterFactory, ksqlSecurityContextProvider, ksqlResource, @@ -747,6 +748,7 @@ static KsqlRestApplication buildApplication( final StreamedQueryResource streamedQueryResource = new StreamedQueryResource( ksqlEngine, + restConfig, commandStore, Duration.ofMillis( restConfig.getLong(KsqlRestConfig.STREAMED_QUERY_DISCONNECT_CHECK_MS_CONFIG)), diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlServerEndpoints.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlServerEndpoints.java index 0ffffce98942..53def91ea92c 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlServerEndpoints.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlServerEndpoints.java @@ -68,6 +68,7 @@ public class KsqlServerEndpoints implements Endpoints { private final KsqlEngine ksqlEngine; private final KsqlConfig ksqlConfig; + private final KsqlRestConfig ksqlRestConfig; private final RoutingFilterFactory routingFilterFactory; private final ReservedInternalTopics reservedInternalTopics; private final KsqlSecurityContextProvider ksqlSecurityContextProvider; @@ -90,6 +91,7 @@ public class KsqlServerEndpoints implements Endpoints { public KsqlServerEndpoints( final KsqlEngine ksqlEngine, final KsqlConfig ksqlConfig, + final KsqlRestConfig ksqlRestConfig, final RoutingFilterFactory routingFilterFactory, final KsqlSecurityContextProvider ksqlSecurityContextProvider, final KsqlResource ksqlResource, @@ -111,6 +113,7 @@ public KsqlServerEndpoints( // CHECKSTYLE_RULES.ON: ParameterNumber this.ksqlEngine = Objects.requireNonNull(ksqlEngine); this.ksqlConfig = Objects.requireNonNull(ksqlConfig); + this.ksqlRestConfig = Objects.requireNonNull(ksqlRestConfig); this.routingFilterFactory = Objects.requireNonNull(routingFilterFactory); this.reservedInternalTopics = new ReservedInternalTopics(ksqlConfig); this.ksqlSecurityContextProvider = Objects.requireNonNull(ksqlSecurityContextProvider); @@ -141,8 +144,8 @@ public CompletableFuture createQueryPublisher(final String sql, return executeOnWorker(() -> { try { return new QueryEndpoint( - ksqlEngine, ksqlConfig, routingFilterFactory, pullQueryMetrics, rateLimiter, routing, - localCommands) + ksqlEngine, ksqlConfig, ksqlRestConfig, routingFilterFactory, pullQueryMetrics, + rateLimiter, routing, localCommands) .createQueryPublisher( sql, properties, diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java index 8b4588b5c669..72c0a0561626 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java @@ -38,12 +38,14 @@ import io.confluent.ksql.rest.Errors; import io.confluent.ksql.rest.entity.KsqlMediaType; import io.confluent.ksql.rest.entity.KsqlRequest; +import io.confluent.ksql.rest.server.KsqlRestConfig; import io.confluent.ksql.rest.server.LocalCommands; import io.confluent.ksql.rest.server.StatementParser; import io.confluent.ksql.rest.server.computation.CommandQueue; import io.confluent.ksql.rest.server.resources.KsqlConfigurable; import io.confluent.ksql.rest.server.resources.KsqlRestException; import io.confluent.ksql.rest.util.CommandStoreUtil; +import io.confluent.ksql.rest.util.QueryCapacityUtil; import io.confluent.ksql.security.KsqlAuthorizationValidator; import io.confluent.ksql.security.KsqlSecurityContext; import io.confluent.ksql.services.ServiceContext; @@ -90,10 +92,12 @@ public class StreamedQueryResource implements KsqlConfigurable { private final Optional localCommands; private KsqlConfig ksqlConfig; + private KsqlRestConfig ksqlRestConfig; @SuppressWarnings("checkstyle:ParameterNumber") public StreamedQueryResource( final KsqlEngine ksqlEngine, + final KsqlRestConfig ksqlRestConfig, final CommandQueue commandQueue, final Duration disconnectCheckInterval, final Duration commandQueueCatchupTimeout, @@ -109,6 +113,7 @@ public StreamedQueryResource( ) { this( ksqlEngine, + ksqlRestConfig, new StatementParser(ksqlEngine), commandQueue, disconnectCheckInterval, @@ -130,6 +135,7 @@ public StreamedQueryResource( StreamedQueryResource( // CHECKSTYLE_RULES.OFF: ParameterNumberCheck final KsqlEngine ksqlEngine, + final KsqlRestConfig ksqlRestConfig, final StatementParser statementParser, final CommandQueue commandQueue, final Duration disconnectCheckInterval, @@ -145,6 +151,7 @@ public StreamedQueryResource( final Optional localCommands ) { this.ksqlEngine = Objects.requireNonNull(ksqlEngine, "ksqlEngine"); + this.ksqlRestConfig = Objects.requireNonNull(ksqlRestConfig, "ksqlRestConfig"); this.statementParser = Objects.requireNonNull(statementParser, "statementParser"); this.commandQueue = Objects.requireNonNull(commandQueue, "commandQueue"); this.disconnectCheckInterval = @@ -353,6 +360,14 @@ private EndpointResponse handlePushQuery( final ConfiguredStatement configured = ConfiguredStatement .of(statement, SessionConfig.of(ksqlConfig, streamsProperties)); + if (QueryCapacityUtil.exceedsPushQueryCapacity(ksqlEngine, ksqlRestConfig)) { + QueryCapacityUtil.throwTooManyActivePushQueriesException( + ksqlEngine, + ksqlRestConfig, + statement.getStatementText() + ); + } + final TransientQueryMetadata query = ksqlEngine .executeQuery(serviceContext, configured, false); diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/util/QueryCapacityUtil.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/util/QueryCapacityUtil.java index 2fbb85eaf1d7..f320a2d4e7f9 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/util/QueryCapacityUtil.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/util/QueryCapacityUtil.java @@ -16,6 +16,7 @@ package io.confluent.ksql.rest.util; import io.confluent.ksql.KsqlExecutionContext; +import io.confluent.ksql.rest.server.KsqlRestConfig; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; @@ -53,4 +54,40 @@ public static void throwTooManyActivePersistentQueriesException( private static int getQueryLimit(final KsqlConfig ksqlConfig) { return ksqlConfig.getInt(KsqlConfig.KSQL_ACTIVE_PERSISTENT_QUERY_LIMIT_CONFIG); } + + public static boolean exceedsPushQueryCapacity( + final KsqlExecutionContext executionContext, + final KsqlRestConfig ksqlRestConfig + ) { + return getNumLivePushQueries(executionContext) >= getPushQueryLimit(ksqlRestConfig); + } + + public static void throwTooManyActivePushQueriesException( + final KsqlExecutionContext executionContext, + final KsqlRestConfig ksqlRestConfig, + final String statementStr + ) { + throw new KsqlException( + String.format( + "Not executing statement(s) '%s' as it would cause the number " + + "of active, push queries to exceed the configured limit. " + + "Terminate existing PUSH queries, " + + "or increase the '%s' setting via the 'ksql-server.properties' file. " + + "Current push query count: %d. Configured limit: %d.", + statementStr, + KsqlRestConfig.MAX_PUSH_QUERIES, + getNumLivePushQueries(executionContext), + getPushQueryLimit(ksqlRestConfig) + ) + ); + } + + private static int getNumLivePushQueries(final KsqlExecutionContext ctx) { + return ctx.getAllLiveQueries().size() - ctx.getPersistentQueries().size(); + } + + private static int getPushQueryLimit(final KsqlRestConfig ksqlRestConfig) { + return ksqlRestConfig.getInt(KsqlRestConfig.MAX_PUSH_QUERIES); + } + } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java index 05ddc7b5d478..2bdc36febfa6 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java @@ -15,40 +15,12 @@ package io.confluent.ksql.rest.server.resources.streaming; -import static io.confluent.ksql.GenericRow.genericRow; -import static io.confluent.ksql.rest.Errors.ERROR_CODE_BAD_STATEMENT; -import static io.confluent.ksql.rest.Errors.ERROR_CODE_FORBIDDEN_KAFKA_ACCESS; -import static io.confluent.ksql.rest.Errors.badRequest; -import static io.confluent.ksql.rest.entity.KsqlErrorMessageMatchers.errorCode; -import static io.confluent.ksql.rest.entity.KsqlErrorMessageMatchers.errorMessage; -import static io.confluent.ksql.rest.entity.KsqlStatementErrorMessageMatchers.statement; -import static io.confluent.ksql.rest.server.resources.KsqlRestExceptionMatchers.exceptionErrorMessage; -import static io.confluent.ksql.rest.server.resources.KsqlRestExceptionMatchers.exceptionStatementErrorMessage; -import static io.confluent.ksql.rest.server.resources.KsqlRestExceptionMatchers.exceptionStatusCode; -import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST; -import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN; -import static io.netty.handler.codec.http.HttpResponseStatus.SERVICE_UNAVAILABLE; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.is; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThrows; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyBoolean; -import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.util.concurrent.RateLimiter; import io.confluent.ksql.GenericRow; +import static io.confluent.ksql.GenericRow.genericRow; import io.confluent.ksql.api.server.StreamingOutput; import io.confluent.ksql.config.SessionConfig; import io.confluent.ksql.engine.KsqlEngine; @@ -71,17 +43,27 @@ import io.confluent.ksql.rest.ApiJsonMapper; import io.confluent.ksql.rest.EndpointResponse; import io.confluent.ksql.rest.Errors; +import static io.confluent.ksql.rest.Errors.ERROR_CODE_BAD_STATEMENT; +import static io.confluent.ksql.rest.Errors.ERROR_CODE_FORBIDDEN_KAFKA_ACCESS; +import static io.confluent.ksql.rest.Errors.badRequest; import io.confluent.ksql.rest.SessionProperties; import io.confluent.ksql.rest.entity.KsqlEntityList; import io.confluent.ksql.rest.entity.KsqlErrorMessage; +import static io.confluent.ksql.rest.entity.KsqlErrorMessageMatchers.errorCode; +import static io.confluent.ksql.rest.entity.KsqlErrorMessageMatchers.errorMessage; import io.confluent.ksql.rest.entity.KsqlMediaType; import io.confluent.ksql.rest.entity.KsqlRequest; import io.confluent.ksql.rest.entity.KsqlStatementErrorMessage; +import static io.confluent.ksql.rest.entity.KsqlStatementErrorMessageMatchers.statement; import io.confluent.ksql.rest.entity.StreamedRow; import io.confluent.ksql.rest.entity.StreamedRow.DataRow; +import io.confluent.ksql.rest.server.KsqlRestConfig; import io.confluent.ksql.rest.server.StatementParser; import io.confluent.ksql.rest.server.computation.CommandQueue; import io.confluent.ksql.rest.server.resources.KsqlRestException; +import static io.confluent.ksql.rest.server.resources.KsqlRestExceptionMatchers.exceptionErrorMessage; +import static io.confluent.ksql.rest.server.resources.KsqlRestExceptionMatchers.exceptionStatementErrorMessage; +import static io.confluent.ksql.rest.server.resources.KsqlRestExceptionMatchers.exceptionStatusCode; import io.confluent.ksql.rest.server.validation.CustomValidators; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.schema.ksql.types.SqlTypes; @@ -97,6 +79,9 @@ import io.confluent.ksql.util.TransientQueryMetadata; import io.confluent.ksql.util.TransientQueryMetadata.ResultType; import io.confluent.ksql.version.metrics.ActivenessRegistrar; +import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST; +import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN; +import static io.netty.handler.codec.http.HttpResponseStatus.SERVICE_UNAVAILABLE; import java.io.EOFException; import java.io.IOException; import java.io.PipedInputStream; @@ -124,11 +109,26 @@ import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler; import org.codehaus.plexus.util.StringUtils; import org.hamcrest.CoreMatchers; +import static org.hamcrest.MatcherAssert.assertThat; import org.hamcrest.Matchers; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; import org.mockito.Mock; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import org.mockito.junit.MockitoJUnitRunner; @RunWith(MockitoJUnitRunner.class) @@ -187,6 +187,8 @@ public class StreamedQueryResourceTest { @Mock private KsqlConfig ksqlConfig; @Mock + private KsqlRestConfig ksqlRestConfig; + @Mock private PullQueryResult pullQueryResult; @Mock private LogicalSchema schema; @@ -218,6 +220,7 @@ public void setup() { testResource = new StreamedQueryResource( mockKsqlEngine, + ksqlRestConfig, mockStatementParser, commandQueue, DISCONNECT_CHECK_INTERVAL, @@ -301,6 +304,7 @@ public void shouldRateLimit() { testResource = new StreamedQueryResource( mockKsqlEngine, + ksqlRestConfig, mockStatementParser, commandQueue, DISCONNECT_CHECK_INTERVAL, @@ -347,6 +351,7 @@ public void shouldThrowOnHandleStatementIfNotConfigured() { // Given: testResource = new StreamedQueryResource( mockKsqlEngine, + ksqlRestConfig, mockStatementParser, commandQueue, DISCONNECT_CHECK_INTERVAL, @@ -512,6 +517,7 @@ public void shouldThrowOnDenyListedStreamProperty() { when(mockStatementParser.parseSingleStatement(PULL_QUERY_STRING)).thenReturn(query); testResource = new StreamedQueryResource( mockKsqlEngine, + ksqlRestConfig, mockStatementParser, commandQueue, DISCONNECT_CHECK_INTERVAL, @@ -625,6 +631,8 @@ public void shouldStreamRowsCorrectly() throws Throwable { .of(query, SessionConfig.of(VALID_CONFIG, requestStreamsProperties)), false)) .thenReturn(transientQueryMetadata); + when(ksqlRestConfig.getInt(KsqlRestConfig.MAX_PUSH_QUERIES)).thenReturn(Integer.MAX_VALUE); + final EndpointResponse response = testResource.streamQuery( securityContext, diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/util/QueryCapacityUtilTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/util/QueryCapacityUtilTest.java index 99a802e52adb..b380c93f5c43 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/util/QueryCapacityUtilTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/util/QueryCapacityUtilTest.java @@ -15,21 +15,22 @@ package io.confluent.ksql.rest.util; -import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.containsString; -import static org.junit.Assert.assertThrows; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - import io.confluent.ksql.engine.KsqlEngine; +import io.confluent.ksql.rest.server.KsqlRestConfig; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.PersistentQueryMetadata; +import io.confluent.ksql.util.QueryMetadata; import java.util.List; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.junit.Assert.assertThrows; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import org.mockito.junit.MockitoJUnitRunner; @RunWith(MockitoJUnitRunner.class) @@ -39,6 +40,8 @@ public class QueryCapacityUtilTest { private KsqlEngine ksqlEngine; @Mock private KsqlConfig ksqlConfig; + @Mock + private KsqlRestConfig ksqlRestConfig; @Test public void shouldReportCapacityExceededIfOverLimit() { @@ -86,6 +89,54 @@ public void shouldThrowWhenAsked() { + "Current persistent query count: 3. Configured limit: 2.")); } + @Test + public void shouldReportPushQueryCapacityExceededIfOverLimit() { + // Given: + givenAllLiveQueries(10); + givenActivePersistentQueries(4); + givenPushQueryLimit(3); + + // Then: + assertThat(QueryCapacityUtil.exceedsPushQueryCapacity(ksqlEngine, ksqlRestConfig), + equalTo(true)); + } + + @Test + public void shouldReportPushQueryAtCapacityLimit() { + // Given: + givenAllLiveQueries(10); + givenActivePersistentQueries(4); + givenPushQueryLimit(6); + + // Then: + assertThat(QueryCapacityUtil.exceedsPushQueryCapacity(ksqlEngine, ksqlRestConfig), + equalTo(true)); + } + + @Test + public void shouldThrowWhenPushQueryLimitExceeded() { + // Given: + final String statementStr = "my statement"; + givenAllLiveQueries(10); + givenActivePersistentQueries(4); + givenPushQueryLimit(3); + + // When: + final KsqlException e = assertThrows( + KsqlException.class, + () -> QueryCapacityUtil.throwTooManyActivePushQueriesException(ksqlEngine, ksqlRestConfig, statementStr) + ); + + // Then: + assertThat(e.getMessage(), containsString( + "Not executing statement(s) 'my statement' as it would cause the number " + + "of active, push queries to exceed the configured limit. " + + "Terminate existing PUSH queries, " + + "or increase the 'ksql.max.push.queries' setting " + + "via the 'ksql-server.properties' file. " + + "Current push query count: 6. Configured limit: 3.")); + } + @SuppressWarnings("unchecked") private void givenActivePersistentQueries(final int numQueries) { final List queries = mock(List.class); @@ -98,4 +149,15 @@ private void givenQueryLimit(final int queryLimit) { when(ksqlConfig.getInt(KsqlConfig.KSQL_ACTIVE_PERSISTENT_QUERY_LIMIT_CONFIG)) .thenReturn(queryLimit); } + + private void givenAllLiveQueries(final int numLiveQueries) { + final List queries = mock(List.class); + when(queries.size()).thenReturn(numLiveQueries); + when(ksqlEngine.getAllLiveQueries()).thenReturn(queries); + } + + private void givenPushQueryLimit(final int pushQueryLimit) { + when(ksqlRestConfig.getInt(KsqlRestConfig.MAX_PUSH_QUERIES)) + .thenReturn(pushQueryLimit); + } }