Skip to content

Commit

Permalink
fix: KSQL does not accept more queries when running QueryLimit - 1 qu…
Browse files Browse the repository at this point in the history
…eries (#5461)
  • Loading branch information
spena authored May 29, 2020
1 parent 71b1593 commit d64f1bc
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,10 @@ public int validate(

numPersistentQueries +=
validate(serviceContext, configured, sessionProperties, ctx, injector);
}

if (QueryCapacityUtil.exceedsPersistentQueryCapacity(ctx, ksqlConfig, numPersistentQueries)) {
QueryCapacityUtil.throwTooManyActivePersistentQueriesException(ctx, ksqlConfig, sql);
if (QueryCapacityUtil.exceedsPersistentQueryCapacity(ctx, ksqlConfig)) {
QueryCapacityUtil.throwTooManyActivePersistentQueriesException(ctx, ksqlConfig, sql);
}
}

return numPersistentQueries;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,9 @@ private QueryCapacityUtil() {

public static boolean exceedsPersistentQueryCapacity(
final KsqlExecutionContext executionContext,
final KsqlConfig ksqlConfig,
final long additionalQueries
final KsqlConfig ksqlConfig
) {
final long newTotal = executionContext.getPersistentQueries().size() + additionalQueries;
return newTotal > getQueryLimit(ksqlConfig);
return executionContext.getPersistentQueries().size() > getQueryLimit(ksqlConfig);
}

public static void throwTooManyActivePersistentQueriesException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,27 @@ public void shouldListTablesStatement() {
assertThat(tablesList.getTables(), contains(sourceTable("TEST_TABLE")));
}

@Test
public void shouldExecuteMaxNumberPersistentQueries() {
// Given:
final String sql = "CREATE STREAM S AS SELECT * FROM test_stream;";
givenKsqlConfigWith(ImmutableMap.of(
KsqlConfig.KSQL_ACTIVE_PERSISTENT_QUERY_LIMIT_CONFIG, "1"
));
setUpKsqlResource();


// When:
makeSingleRequest(sql, CommandStatusEntity.class);

// Then:
verify(commandStore).enqueueCommand(
any(),
argThat(is(commandWithStatement(sql))),
any(Producer.class)
);
}

@Test
public void shouldFailForIncorrectCSASStatementResultType() {
// When:
Expand Down Expand Up @@ -1491,7 +1512,8 @@ public void shouldFailIfReachedActivePersistentQueriesLimit() {

givenMockEngine();

givenPersistentQueryCount(3);
// mock 3 queries already running + 1 new query to execute
givenPersistentQueryCount(4);

// When:
final KsqlErrorMessage result = makeFailingRequest(
Expand All @@ -1508,12 +1530,13 @@ public void shouldFailIfReachedActivePersistentQueriesLimit() {
public void shouldFailAllCommandsIfWouldReachActivePersistentQueriesLimit() {
// Given:
givenKsqlConfigWith(
ImmutableMap.of(KsqlConfig.KSQL_ACTIVE_PERSISTENT_QUERY_LIMIT_CONFIG, 3));
ImmutableMap.of(KsqlConfig.KSQL_ACTIVE_PERSISTENT_QUERY_LIMIT_CONFIG, 1));

final String ksqlString = "CREATE STREAM new_stream AS SELECT * FROM test_stream;"
+ "CREATE STREAM another_stream AS SELECT * FROM test_stream;";
givenMockEngine();

// mock 2 new query to execute
givenPersistentQueryCount(2);

// When:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,10 @@
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.KsqlStatementException;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.Sandbox;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.junit.Before;
Expand Down Expand Up @@ -197,6 +200,7 @@ public void shouldThrowIfNoValidatorAvailable() {
public void shouldThrowIfTooManyPersistentQueries() {
// Given:
when(ksqlConfig.getInt(KsqlConfig.KSQL_ACTIVE_PERSISTENT_QUERY_LIMIT_CONFIG)).thenReturn(1);
givenPersistentQueryCount(2);

final List<ParsedStatement> statements =
givenParsed(
Expand Down Expand Up @@ -304,6 +308,13 @@ private void givenRequestValidator(
);
}

@SuppressWarnings("unchecked")
private void givenPersistentQueryCount(final int value) {
final List<PersistentQueryMetadata> queries = mock(List.class);
when(queries.size()).thenReturn(value);
when(ksqlEngine.getPersistentQueries()).thenReturn(queries);
}

@Sandbox
private interface SandboxEngine extends KsqlExecutionContext {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,40 +47,18 @@ public void shouldReportCapacityExceededIfOverLimit() {
givenQueryLimit(2);

// Then:
assertThat(QueryCapacityUtil.exceedsPersistentQueryCapacity(ksqlEngine, ksqlConfig, 0),
equalTo(true));
}

@Test
public void shouldReportCapacityExceededIfTooManyQueriesAdded() {
// Given:
givenActivePersistentQueries(2);
givenQueryLimit(4);

// Then:
assertThat(QueryCapacityUtil.exceedsPersistentQueryCapacity(ksqlEngine, ksqlConfig, 3),
assertThat(QueryCapacityUtil.exceedsPersistentQueryCapacity(ksqlEngine, ksqlConfig),
equalTo(true));
}

@Test
public void shouldNotReportCapacityExceededIfReached() {
// Given:
givenActivePersistentQueries(2);
givenQueryLimit(4);

// Then:
assertThat(QueryCapacityUtil.exceedsPersistentQueryCapacity(ksqlEngine, ksqlConfig, 2),
equalTo(false));
}

@Test
public void shouldNotReportCapacityExceededIfNotReached() {
// Given:
givenActivePersistentQueries(2);
givenQueryLimit(4);
givenQueryLimit(2);

// Then:
assertThat(QueryCapacityUtil.exceedsPersistentQueryCapacity(ksqlEngine, ksqlConfig, 1),
assertThat(QueryCapacityUtil.exceedsPersistentQueryCapacity(ksqlEngine, ksqlConfig),
equalTo(false));
}

Expand Down

0 comments on commit d64f1bc

Please sign in to comment.