Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: limit the number of active push queries everywhere using "ksql.max.push.queries" config #7109

Merged
merged 8 commits into from
Mar 10, 2021
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
revert some stuff
Chittaranjan Prasad committed Mar 9, 2021
commit 4e12b871d57ae16c0d06931858956423c13bc6d2
Original file line number Diff line number Diff line change
@@ -359,7 +359,7 @@ public void startAsync() {
apiServer.start();

final KsqlConfig ksqlConfigWithPort = buildConfigWithPort();
configurables.forEach(c -> c.configure(ksqlConfigWithPort, restConfig));
configurables.forEach(c -> c.configure(ksqlConfigWithPort));

startKsql(ksqlConfigWithPort);
final Properties metricsProperties = new Properties();
@@ -748,6 +748,7 @@ static KsqlRestApplication buildApplication(

final StreamedQueryResource streamedQueryResource = new StreamedQueryResource(
ksqlEngine,
restConfig,
commandStore,
Duration.ofMillis(
restConfig.getLong(KsqlRestConfig.STREAMED_QUERY_DISCONNECT_CHECK_MS_CONFIG)),
Original file line number Diff line number Diff line change
@@ -15,7 +15,6 @@

package io.confluent.ksql.rest.server.resources;

import io.confluent.ksql.rest.server.KsqlRestConfig;
import io.confluent.ksql.util.KsqlConfig;

public interface KsqlConfigurable {
@@ -24,7 +23,6 @@ public interface KsqlConfigurable {
* Called with the server config.
*
* @param config server config
* @param restConfig REST config
*/
void configure(KsqlConfig config, KsqlRestConfig restConfig);
void configure(KsqlConfig config);
}
Original file line number Diff line number Diff line change
@@ -97,6 +97,7 @@ public class StreamedQueryResource implements KsqlConfigurable {
@SuppressWarnings("checkstyle:ParameterNumber")
public StreamedQueryResource(
final KsqlEngine ksqlEngine,
final KsqlRestConfig ksqlRestConfig,
final CommandQueue commandQueue,
final Duration disconnectCheckInterval,
final Duration commandQueueCatchupTimeout,
@@ -112,6 +113,7 @@ public StreamedQueryResource(
) {
this(
ksqlEngine,
ksqlRestConfig,
new StatementParser(ksqlEngine),
commandQueue,
disconnectCheckInterval,
@@ -133,6 +135,7 @@ public StreamedQueryResource(
StreamedQueryResource(
// CHECKSTYLE_RULES.OFF: ParameterNumberCheck
final KsqlEngine ksqlEngine,
final KsqlRestConfig ksqlRestConfig,
final StatementParser statementParser,
final CommandQueue commandQueue,
final Duration disconnectCheckInterval,
@@ -148,6 +151,7 @@ public StreamedQueryResource(
final Optional<LocalCommands> localCommands
) {
this.ksqlEngine = Objects.requireNonNull(ksqlEngine, "ksqlEngine");
this.ksqlRestConfig = Objects.requireNonNull(ksqlRestConfig, "ksqlRestConfig");
this.statementParser = Objects.requireNonNull(statementParser, "statementParser");
this.commandQueue = Objects.requireNonNull(commandQueue, "commandQueue");
this.disconnectCheckInterval =
@@ -169,13 +173,12 @@ public StreamedQueryResource(
}

@Override
public void configure(final KsqlConfig config, final KsqlRestConfig restConfig) {
public void configure(final KsqlConfig config) {
if (!config.getKsqlStreamConfigProps().containsKey(StreamsConfig.APPLICATION_SERVER_CONFIG)) {
throw new IllegalArgumentException("Need KS application server set");
}

ksqlConfig = config;
ksqlRestConfig = restConfig;
}

public EndpointResponse streamQuery(
Original file line number Diff line number Diff line change
@@ -22,6 +22,7 @@
import static io.confluent.ksql.rest.entity.KsqlErrorMessageMatchers.errorCode;
import static io.confluent.ksql.rest.entity.KsqlErrorMessageMatchers.errorMessage;
import static io.confluent.ksql.rest.entity.KsqlStatementErrorMessageMatchers.statement;
import io.confluent.ksql.rest.server.KsqlRestConfig;
import static io.confluent.ksql.rest.server.resources.KsqlRestExceptionMatchers.exceptionErrorMessage;
import static io.confluent.ksql.rest.server.resources.KsqlRestExceptionMatchers.exceptionStatementErrorMessage;
import static io.confluent.ksql.rest.server.resources.KsqlRestExceptionMatchers.exceptionStatusCode;
@@ -187,6 +188,8 @@ public class StreamedQueryResourceTest {
@Mock
private KsqlConfig ksqlConfig;
@Mock
private KsqlRestConfig ksqlRestConfig;
@Mock
private PullQueryResult pullQueryResult;
@Mock
private LogicalSchema schema;
@@ -218,6 +221,7 @@ public void setup() {

testResource = new StreamedQueryResource(
mockKsqlEngine,
ksqlRestConfig,
mockStatementParser,
commandQueue,
DISCONNECT_CHECK_INTERVAL,
@@ -301,6 +305,7 @@ public void shouldRateLimit() {

testResource = new StreamedQueryResource(
mockKsqlEngine,
ksqlRestConfig,
mockStatementParser,
commandQueue,
DISCONNECT_CHECK_INTERVAL,
@@ -347,6 +352,7 @@ public void shouldThrowOnHandleStatementIfNotConfigured() {
// Given:
testResource = new StreamedQueryResource(
mockKsqlEngine,
ksqlRestConfig,
mockStatementParser,
commandQueue,
DISCONNECT_CHECK_INTERVAL,
@@ -512,6 +518,7 @@ public void shouldThrowOnDenyListedStreamProperty() {
when(mockStatementParser.<Query>parseSingleStatement(PULL_QUERY_STRING)).thenReturn(query);
testResource = new StreamedQueryResource(
mockKsqlEngine,
ksqlRestConfig,
mockStatementParser,
commandQueue,
DISCONNECT_CHECK_INTERVAL,