diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/validation/RequestValidator.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/validation/RequestValidator.java index 5a026e55acd0..4bfa1a989b87 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/validation/RequestValidator.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/validation/RequestValidator.java @@ -109,10 +109,10 @@ public int validate( numPersistentQueries += validate(serviceContext, configured, sessionProperties, ctx, injector); - } - if (QueryCapacityUtil.exceedsPersistentQueryCapacity(ctx, ksqlConfig, numPersistentQueries)) { - QueryCapacityUtil.throwTooManyActivePersistentQueriesException(ctx, ksqlConfig, sql); + if (QueryCapacityUtil.exceedsPersistentQueryCapacity(ctx, ksqlConfig)) { + QueryCapacityUtil.throwTooManyActivePersistentQueriesException(ctx, ksqlConfig, sql); + } } return numPersistentQueries; diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/util/QueryCapacityUtil.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/util/QueryCapacityUtil.java index ae0056724364..2fbb85eaf1d7 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/util/QueryCapacityUtil.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/util/QueryCapacityUtil.java @@ -25,11 +25,9 @@ private QueryCapacityUtil() { public static boolean exceedsPersistentQueryCapacity( final KsqlExecutionContext executionContext, - final KsqlConfig ksqlConfig, - final long additionalQueries + final KsqlConfig ksqlConfig ) { - final long newTotal = executionContext.getPersistentQueries().size() + additionalQueries; - return newTotal > getQueryLimit(ksqlConfig); + return executionContext.getPersistentQueries().size() > getQueryLimit(ksqlConfig); } public static void throwTooManyActivePersistentQueriesException( diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java index 8fa75432c619..6fbb6ca9983f 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java @@ -586,6 +586,27 @@ public void shouldListTablesStatement() { assertThat(tablesList.getTables(), contains(sourceTable("TEST_TABLE"))); } + @Test + public void shouldExecuteMaxNumberPersistentQueries() { + // Given: + final String sql = "CREATE STREAM S AS SELECT * FROM test_stream;"; + givenKsqlConfigWith(ImmutableMap.of( + KsqlConfig.KSQL_ACTIVE_PERSISTENT_QUERY_LIMIT_CONFIG, "1" + )); + setUpKsqlResource(); + + + // When: + makeSingleRequest(sql, CommandStatusEntity.class); + + // Then: + verify(commandStore).enqueueCommand( + any(), + argThat(is(commandWithStatement(sql))), + any(Producer.class) + ); + } + @Test public void shouldFailForIncorrectCSASStatementResultType() { // When: @@ -1491,7 +1512,8 @@ public void shouldFailIfReachedActivePersistentQueriesLimit() { givenMockEngine(); - givenPersistentQueryCount(3); + // mock 3 queries already running + 1 new query to execute + givenPersistentQueryCount(4); // When: final KsqlErrorMessage result = makeFailingRequest( @@ -1508,12 +1530,13 @@ public void shouldFailIfReachedActivePersistentQueriesLimit() { public void shouldFailAllCommandsIfWouldReachActivePersistentQueriesLimit() { // Given: givenKsqlConfigWith( - ImmutableMap.of(KsqlConfig.KSQL_ACTIVE_PERSISTENT_QUERY_LIMIT_CONFIG, 3)); + ImmutableMap.of(KsqlConfig.KSQL_ACTIVE_PERSISTENT_QUERY_LIMIT_CONFIG, 1)); final String ksqlString = "CREATE STREAM new_stream AS SELECT * FROM test_stream;" + "CREATE STREAM another_stream AS SELECT * FROM test_stream;"; givenMockEngine(); + // mock 2 new query to execute givenPersistentQueryCount(2); // When: diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/validation/RequestValidatorTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/validation/RequestValidatorTest.java index 71cfe642fef3..3ede452b6add 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/validation/RequestValidatorTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/validation/RequestValidatorTest.java @@ -55,7 +55,10 @@ import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.KsqlStatementException; +import io.confluent.ksql.util.PersistentQueryMetadata; import io.confluent.ksql.util.Sandbox; + +import java.util.ArrayList; import java.util.List; import java.util.Map; import org.junit.Before; @@ -197,6 +200,7 @@ public void shouldThrowIfNoValidatorAvailable() { public void shouldThrowIfTooManyPersistentQueries() { // Given: when(ksqlConfig.getInt(KsqlConfig.KSQL_ACTIVE_PERSISTENT_QUERY_LIMIT_CONFIG)).thenReturn(1); + givenPersistentQueryCount(2); final List statements = givenParsed( @@ -304,6 +308,13 @@ private void givenRequestValidator( ); } + @SuppressWarnings("unchecked") + private void givenPersistentQueryCount(final int value) { + final List queries = mock(List.class); + when(queries.size()).thenReturn(value); + when(ksqlEngine.getPersistentQueries()).thenReturn(queries); + } + @Sandbox private interface SandboxEngine extends KsqlExecutionContext { } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/util/QueryCapacityUtilTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/util/QueryCapacityUtilTest.java index 2b3fef733e92..99a802e52adb 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/util/QueryCapacityUtilTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/util/QueryCapacityUtilTest.java @@ -47,18 +47,7 @@ public void shouldReportCapacityExceededIfOverLimit() { givenQueryLimit(2); // Then: - assertThat(QueryCapacityUtil.exceedsPersistentQueryCapacity(ksqlEngine, ksqlConfig, 0), - equalTo(true)); - } - - @Test - public void shouldReportCapacityExceededIfTooManyQueriesAdded() { - // Given: - givenActivePersistentQueries(2); - givenQueryLimit(4); - - // Then: - assertThat(QueryCapacityUtil.exceedsPersistentQueryCapacity(ksqlEngine, ksqlConfig, 3), + assertThat(QueryCapacityUtil.exceedsPersistentQueryCapacity(ksqlEngine, ksqlConfig), equalTo(true)); } @@ -66,21 +55,10 @@ public void shouldReportCapacityExceededIfTooManyQueriesAdded() { public void shouldNotReportCapacityExceededIfReached() { // Given: givenActivePersistentQueries(2); - givenQueryLimit(4); - - // Then: - assertThat(QueryCapacityUtil.exceedsPersistentQueryCapacity(ksqlEngine, ksqlConfig, 2), - equalTo(false)); - } - - @Test - public void shouldNotReportCapacityExceededIfNotReached() { - // Given: - givenActivePersistentQueries(2); - givenQueryLimit(4); + givenQueryLimit(2); // Then: - assertThat(QueryCapacityUtil.exceedsPersistentQueryCapacity(ksqlEngine, ksqlConfig, 1), + assertThat(QueryCapacityUtil.exceedsPersistentQueryCapacity(ksqlEngine, ksqlConfig), equalTo(false)); }