diff --git a/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java b/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java index 33bd9a6e2f30..f24973f70b75 100644 --- a/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java +++ b/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java @@ -258,6 +258,14 @@ public class KsqlConfig extends AbstractConfig { = "The maximum amount of pull query bandwidth in megabytes allowed over" + " a period of one hour. Once the limit is hit, queries will fail immediately"; + public static final String KSQL_QUERY_PUSH_V2_MAX_HOURLY_BANDWIDTH_MEGABYTES_CONFIG + = "ksql.query.push.v2.max.hourly.bandwidth.megabytes"; + public static final Integer KSQL_QUERY_PUSH_V2_MAX_HOURLY_BANDWIDTH_MEGABYTES_DEFAULT + = Integer.MAX_VALUE; + public static final String KSQL_QUERY_PUSH_V2_MAX_HOURLY_BANDWIDTH_MEGABYTES_DOC + = "The maximum amount of v2 push query bandwidth in megabytes allowed over" + + " a period of one hour. Once the limit is hit, queries will fail immediately"; + public static final String KSQL_QUERY_PULL_THREAD_POOL_SIZE_CONFIG = "ksql.query.pull.thread.pool.size"; public static final Integer KSQL_QUERY_PULL_THREAD_POOL_SIZE_DEFAULT = 100; @@ -927,6 +935,13 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) { Importance.HIGH, KSQL_QUERY_PULL_MAX_HOURLY_BANDWIDTH_MEGABYTES_DOC ) + .define( + KSQL_QUERY_PUSH_V2_MAX_HOURLY_BANDWIDTH_MEGABYTES_CONFIG, + Type.INT, + KSQL_QUERY_PUSH_V2_MAX_HOURLY_BANDWIDTH_MEGABYTES_DEFAULT, + Importance.HIGH, + KSQL_QUERY_PUSH_V2_MAX_HOURLY_BANDWIDTH_MEGABYTES_DOC + ) .define( KSQL_QUERY_PULL_THREAD_POOL_SIZE_CONFIG, Type.INT, diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/QueryEndpoint.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/QueryEndpoint.java index 820bc92e85a9..55684484ce66 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/QueryEndpoint.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/QueryEndpoint.java @@ -84,6 +84,7 @@ public class QueryEndpoint { private final RateLimiter rateLimiter; private final ConcurrencyLimiter pullConcurrencyLimiter; private final SlidingWindowRateLimiter pullBandRateLimiter; + private final SlidingWindowRateLimiter scalablePushBandRateLimiter; private final HARouting routing; private final PushRouting pushRouting; private final Optional localCommands; @@ -100,6 +101,7 @@ public QueryEndpoint( final RateLimiter rateLimiter, final ConcurrencyLimiter pullConcurrencyLimiter, final SlidingWindowRateLimiter pullBandLimiter, + final SlidingWindowRateLimiter scalablePushBandRateLimiter, final HARouting routing, final PushRouting pushRouting, final Optional localCommands @@ -112,6 +114,7 @@ public QueryEndpoint( this.rateLimiter = rateLimiter; this.pullConcurrencyLimiter = pullConcurrencyLimiter; this.pullBandRateLimiter = pullBandLimiter; + this.scalablePushBandRateLimiter = scalablePushBandRateLimiter; this.routing = routing; this.pushRouting = pushRouting; this.localCommands = localCommands; @@ -170,7 +173,8 @@ public QueryPublisher createQueryPublisher( serviceContext, statement, workerExecutor, - requestProperties + requestProperties, + metricsCallbackHolder ); } else { return createPushQueryPublisher(context, serviceContext, statement, workerExecutor); @@ -183,9 +187,15 @@ private QueryPublisher createScalablePushQueryPublisher( final ServiceContext serviceContext, final ConfiguredStatement statement, final WorkerExecutor workerExecutor, - final Map requestProperties + final Map requestProperties, + final MetricsCallbackHolder metricsCallbackHolder ) { - final BlockingQueryPublisher publisher = new BlockingQueryPublisher(context, workerExecutor); + metricsCallbackHolder.setCallback((statusCode, requestBytes, responseBytes, startTimeNanos) -> { + scalablePushBandRateLimiter.add(responseBytes); + }); + + final BlockingQueryPublisher publisher = + new BlockingQueryPublisher(context, workerExecutor); final PushQueryConfigRoutingOptions routingOptions = new PushQueryConfigRoutingOptions(requestProperties); @@ -194,6 +204,8 @@ private QueryPublisher createScalablePushQueryPublisher( ksqlConfig, statement.getSessionConfig().getOverrides()); + scalablePushBandRateLimiter.allow(); + final ScalablePushQueryMetadata query = ksqlEngine .executeScalablePushQuery(analysis, serviceContext, statement, pushRouting, routingOptions, plannerOptions, context); diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/SlidingWindowRateLimiter.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/SlidingWindowRateLimiter.java index 7d049abb63bf..860c3a113037 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/SlidingWindowRateLimiter.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/SlidingWindowRateLimiter.java @@ -26,7 +26,7 @@ /** * SlidingWindowRateLimiter keeps a log of timestamps and the size for each response returned by - * pull queries. When a response comes, we first pop all outdated timestamps outside of past hour + * queries. When a response comes, we first pop all outdated timestamps outside of past hour * before appending the new response time and size to the log. Then we decide whether this response * should be processed depending on whether the log size has exceeded the throttleLimit. * Many rate limiters require you to ask for access before it's granted whereas this method always @@ -55,15 +55,15 @@ public class SlidingWindowRateLimiter { private final long slidingWindowSizeMs; /** - * Aggregate of pull query response sizes in the past hour + * Aggregate of query response sizes in the past hour */ private long numBytesInWindow; public SlidingWindowRateLimiter(final int requestLimitInMB, final long slidingWindowSizeMs) { checkArgument(requestLimitInMB >= 0, - "Pull Query bandwidth limit can't be negative."); + "Query bandwidth limit can't be negative."); checkArgument(slidingWindowSizeMs >= 0, - "Pull Query throttle window size can't be negative"); + "Query throttle window size can't be negative"); this.throttleLimit = (long) requestLimitInMB * NUM_BYTES_IN_ONE_MEGABYTE; this.slidingWindowSizeMs = slidingWindowSizeMs; @@ -72,9 +72,9 @@ public SlidingWindowRateLimiter(final int requestLimitInMB, final long slidingWi } /** - * Checks if pull queries have returned more than the throttleLimit in the past hour. + * Checks if queries have returned more than the throttleLimit in the past hour. * Throws a KsqlException is the limit has been breached - * @throws KsqlException Exception that the throttle limit has been reached for pull queries + * @throws KsqlException Exception that the throttle limit has been reached for queries */ public synchronized void allow() throws KsqlException { this.allow(Time.SYSTEM.milliseconds()); @@ -90,14 +90,14 @@ protected synchronized void allow(final long timestamp) throws KsqlException { this.numBytesInWindow -= responseSizesLog.poll().right; } if (this.numBytesInWindow > throttleLimit) { - throw new KsqlException("Host is at bandwidth rate limit for pull queries."); + throw new KsqlException("Host is at bandwidth rate limit for queries."); } } /** * Adds the responseSizeInBytes and its timestamp to the queue of all response sizes * in the past hour. - * @param responseSizeInBytes pull query response size measured in Bytes + * @param responseSizeInBytes query response size measured in Bytes */ public synchronized void add(final long responseSizeInBytes) { add(Time.SYSTEM.milliseconds(), responseSizeInBytes); diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java index 51e1f2980949..55077e9b6dfb 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java @@ -201,6 +201,7 @@ public final class KsqlRestApplication implements Executable { private final RateLimiter pullQueryRateLimiter; private final ConcurrencyLimiter pullConcurrencyLimiter; private final SlidingWindowRateLimiter pullBandRateLimiter; + private final SlidingWindowRateLimiter scalablePushBandRateLimiter; private final HARouting pullQueryRouting; private final Optional localCommands; @@ -243,6 +244,7 @@ public static SourceName getCommandsStreamName() { final RateLimiter pullQueryRateLimiter, final ConcurrencyLimiter pullConcurrencyLimiter, final SlidingWindowRateLimiter pullBandRateLimiter, + final SlidingWindowRateLimiter scalablePushBandRateLimiter, final HARouting pullQueryRouting, final PushRouting pushQueryRouting, final Optional localCommands @@ -302,6 +304,8 @@ public static SourceName getCommandsStreamName() { this.pullQueryRateLimiter = requireNonNull(pullQueryRateLimiter, "pullQueryRateLimiter"); this.pullConcurrencyLimiter = requireNonNull(pullConcurrencyLimiter, "pullConcurrencyLimiter"); this.pullBandRateLimiter = requireNonNull(pullBandRateLimiter, "pullBandRateLimiter"); + this.scalablePushBandRateLimiter = + requireNonNull(scalablePushBandRateLimiter, "scalablePushBandRateLimiter"); this.pullQueryRouting = requireNonNull(pullQueryRouting, "pullQueryRouting"); this.pushQueryRouting = pushQueryRouting; this.localCommands = requireNonNull(localCommands, "localCommands"); @@ -349,6 +353,7 @@ public void startAsync() { pullQueryRateLimiter, pullConcurrencyLimiter, pullBandRateLimiter, + scalablePushBandRateLimiter, pullQueryRouting, localCommands, pushQueryRouting @@ -376,6 +381,7 @@ public void startAsync() { pullQueryRateLimiter, pullConcurrencyLimiter, pullBandRateLimiter, + scalablePushBandRateLimiter, pullQueryRouting, pushQueryRouting, localCommands @@ -792,6 +798,10 @@ static KsqlRestApplication buildApplication( final SlidingWindowRateLimiter pullBandRateLimiter = new SlidingWindowRateLimiter( ksqlConfig.getInt(KsqlConfig.KSQL_QUERY_PULL_MAX_HOURLY_BANDWIDTH_MEGABYTES_CONFIG), NUM_MILLISECONDS_IN_HOUR); + final SlidingWindowRateLimiter scalablePushBandRateLimiter = new SlidingWindowRateLimiter( + ksqlConfig.getInt( + KsqlConfig.KSQL_QUERY_PUSH_V2_MAX_HOURLY_BANDWIDTH_MEGABYTES_CONFIG), + NUM_MILLISECONDS_IN_HOUR); final DenyListPropertyValidator denyListPropertyValidator = new DenyListPropertyValidator( ksqlConfig.getList(KsqlConfig.KSQL_PROPERTIES_OVERRIDES_DENYLIST)); @@ -826,6 +836,7 @@ static KsqlRestApplication buildApplication( pullQueryRateLimiter, pullQueryConcurrencyLimiter, pullBandRateLimiter, + scalablePushBandRateLimiter, pullQueryRouting, pushQueryRouting, localCommands @@ -905,6 +916,7 @@ static KsqlRestApplication buildApplication( pullQueryRateLimiter, pullQueryConcurrencyLimiter, pullBandRateLimiter, + scalablePushBandRateLimiter, pullQueryRouting, pushQueryRouting, localCommands diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlServerEndpoints.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlServerEndpoints.java index 92ff66118c13..c81e0ed89401 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlServerEndpoints.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlServerEndpoints.java @@ -92,6 +92,7 @@ public class KsqlServerEndpoints implements Endpoints { private final RateLimiter rateLimiter; private final ConcurrencyLimiter pullConcurrencyLimiter; private final SlidingWindowRateLimiter pullBandRateLimiter; + private final SlidingWindowRateLimiter scalablePushBandRateLimiter; private final HARouting routing; private final PushRouting pushRouting; private final Optional localCommands; @@ -118,6 +119,7 @@ public KsqlServerEndpoints( final RateLimiter rateLimiter, final ConcurrencyLimiter pullConcurrencyLimiter, final SlidingWindowRateLimiter pullBandRateLimiter, + final SlidingWindowRateLimiter scalablePushBandRateLimiter, final HARouting routing, final PushRouting pushRouting, final Optional localCommands @@ -144,6 +146,7 @@ public KsqlServerEndpoints( this.rateLimiter = Objects.requireNonNull(rateLimiter); this.pullConcurrencyLimiter = pullConcurrencyLimiter; this.pullBandRateLimiter = Objects.requireNonNull(pullBandRateLimiter); + this.scalablePushBandRateLimiter = Objects.requireNonNull(scalablePushBandRateLimiter); this.routing = Objects.requireNonNull(routing); this.pushRouting = pushRouting; this.localCommands = Objects.requireNonNull(localCommands); @@ -164,8 +167,8 @@ public CompletableFuture createQueryPublisher(final String sql, try { return new QueryEndpoint( ksqlEngine, ksqlConfig, ksqlRestConfig, routingFilterFactory, pullQueryMetrics, - rateLimiter, pullConcurrencyLimiter, pullBandRateLimiter, routing, pushRouting, - localCommands) + rateLimiter, pullConcurrencyLimiter, pullBandRateLimiter, scalablePushBandRateLimiter, + routing, pushRouting, localCommands) .createQueryPublisher( sql, properties, diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PushQueryPublisher.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PushQueryPublisher.java index cc1f43e9063c..3c2356143dd2 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PushQueryPublisher.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PushQueryPublisher.java @@ -22,6 +22,7 @@ import com.google.common.util.concurrent.ListeningScheduledExecutorService; import io.confluent.ksql.GenericRow; import io.confluent.ksql.analyzer.ImmutableAnalysis; +import io.confluent.ksql.api.server.SlidingWindowRateLimiter; import io.confluent.ksql.engine.KsqlEngine; import io.confluent.ksql.parser.tree.Query; import io.confluent.ksql.physical.scalablepush.PushRouting; @@ -59,6 +60,7 @@ final class PushQueryPublisher implements Flow.Publisher private final PushRouting pushRouting; private final boolean isScalablePush; private final Context context; + private final SlidingWindowRateLimiter scalablePushBandRateLimiter; private PushQueryPublisher( final KsqlEngine ksqlEngine, @@ -75,6 +77,7 @@ private PushQueryPublisher( this.pushRouting = null; this.isScalablePush = false; this.context = null; + this.scalablePushBandRateLimiter = null; } private PushQueryPublisher( @@ -84,7 +87,8 @@ private PushQueryPublisher( final ConfiguredStatement query, final ImmutableAnalysis analysis, final PushRouting pushRouting, - final Context context + final Context context, + final SlidingWindowRateLimiter scalablePushBandRateLimiter ) { this.ksqlEngine = requireNonNull(ksqlEngine, "ksqlEngine"); this.serviceContext = requireNonNull(serviceContext, "serviceContext"); @@ -94,6 +98,8 @@ private PushQueryPublisher( this.pushRouting = requireNonNull(pushRouting, "pushRouting"); this.isScalablePush = true; this.context = requireNonNull(context, "context"); + this.scalablePushBandRateLimiter = + requireNonNull(scalablePushBandRateLimiter, "scalablePushBandRateLimiter"); } public static PushQueryPublisher createPublisher( @@ -113,7 +119,8 @@ public static PushQueryPublisher createScalablePublisher( final ConfiguredStatement query, final ImmutableAnalysis analysis, final PushRouting pushRouting, - final Context context + final Context context, + final SlidingWindowRateLimiter scalablePushBandRateLimiter ) { return new PushQueryPublisher( ksqlEngine, @@ -122,7 +129,8 @@ public static PushQueryPublisher createScalablePublisher( query, analysis, pushRouting, - context + context, + scalablePushBandRateLimiter ); } @@ -138,6 +146,8 @@ public synchronized void subscribe(final Flow.Subscriber query.getSessionConfig().getConfig(false), query.getSessionConfig().getOverrides()); + scalablePushBandRateLimiter.allow(); + final ImmutableAnalysis analysis = ksqlEngine.analyzeQueryWithNoOutputTopic(query.getStatement(), query.getStatementText()); final ScalablePushQueryMetadata pushQueryMetadata = ksqlEngine diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java index 874c2e84c6b3..3f9f22efb572 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java @@ -110,6 +110,7 @@ public class StreamedQueryResource implements KsqlConfigurable { private final RateLimiter rateLimiter; private final ConcurrencyLimiter concurrencyLimiter; private final SlidingWindowRateLimiter pullBandRateLimiter; + private final SlidingWindowRateLimiter scalablePushBandRateLimiter; private final HARouting routing; private final PushRouting pushRouting; private final Optional localCommands; @@ -133,6 +134,7 @@ public StreamedQueryResource( final RateLimiter rateLimiter, final ConcurrencyLimiter concurrencyLimiter, final SlidingWindowRateLimiter pullBandRateLimiter, + final SlidingWindowRateLimiter scalablePushBandRateLimiter, final HARouting routing, final PushRouting pushRouting, final Optional localCommands @@ -153,6 +155,7 @@ public StreamedQueryResource( rateLimiter, concurrencyLimiter, pullBandRateLimiter, + scalablePushBandRateLimiter, routing, pushRouting, localCommands @@ -178,6 +181,7 @@ public StreamedQueryResource( final RateLimiter rateLimiter, final ConcurrencyLimiter concurrencyLimiter, final SlidingWindowRateLimiter pullBandRateLimiter, + final SlidingWindowRateLimiter scalablePushBandRateLimiter, final HARouting routing, final PushRouting pushRouting, final Optional localCommands @@ -202,6 +206,8 @@ public StreamedQueryResource( this.rateLimiter = Objects.requireNonNull(rateLimiter, "rateLimiter"); this.concurrencyLimiter = Objects.requireNonNull(concurrencyLimiter, "concurrencyLimiter"); this.pullBandRateLimiter = Objects.requireNonNull(pullBandRateLimiter, "pullBandRateLimiter"); + this.scalablePushBandRateLimiter = + Objects.requireNonNull(scalablePushBandRateLimiter, "scalablePushBandRateLimiter"); this.routing = Objects.requireNonNull(routing, "routing"); this.pushRouting = pushRouting; this.localCommands = Objects.requireNonNull(localCommands, "localCommands"); @@ -235,7 +241,8 @@ public EndpointResponse streamQuery( commandQueue, request, commandQueueCatchupTimeout); return handleStatement(securityContext, request, statement, connectionClosedFuture, - isInternalRequest, mediaType, metricsCallbackHolder, context, pullBandRateLimiter); + isInternalRequest, mediaType, metricsCallbackHolder, context, pullBandRateLimiter, + scalablePushBandRateLimiter); } private void throwIfNotConfigured() { @@ -267,7 +274,8 @@ private EndpointResponse handleStatement( final KsqlMediaType mediaType, final MetricsCallbackHolder metricsCallbackHolder, final Context context, - final SlidingWindowRateLimiter pullBandRateLimiter + final SlidingWindowRateLimiter pullBandRateLimiter, + final SlidingWindowRateLimiter scalablePushBandRateLimiter ) { try { authorizationValidator.ifPresent(validator -> @@ -291,7 +299,8 @@ private EndpointResponse handleStatement( metricsCallbackHolder, configProperties, context, - pullBandRateLimiter + pullBandRateLimiter, + scalablePushBandRateLimiter ); } else if (statement.getStatement() instanceof PrintTopic) { return handlePrintTopic( @@ -322,7 +331,8 @@ private EndpointResponse handleQuery(final KsqlSecurityContext securityContext, final MetricsCallbackHolder metricsCallbackHolder, final Map configProperties, final Context context, - final SlidingWindowRateLimiter pullBandRateLimiter) { + final SlidingWindowRateLimiter pullBandRateLimiter, + final SlidingWindowRateLimiter scalablePushBandRateLimiter) { if (statement.getStatement().isPullQuery()) { final ImmutableAnalysis analysis = ksqlEngine @@ -424,6 +434,10 @@ private EndpointResponse handleQuery(final KsqlSecurityContext securityContext, .analyzeQueryWithNoOutputTopic(statement.getStatement(), statement.getStatementText()); QueryLogger.info("Scalable push query created", statement.getStatementText()); + + metricsCallbackHolder.setCallback((statusCode, requestBytes, responseBytes, startTimeNanos) -> + scalablePushBandRateLimiter.add(responseBytes)); + return handleScalablePushQuery( analysis, securityContext.getServiceContext(), @@ -431,7 +445,8 @@ private EndpointResponse handleQuery(final KsqlSecurityContext securityContext, configProperties, request.getRequestProperties(), connectionClosedFuture, - context + context, + scalablePushBandRateLimiter ); } else { // log validated statements for query anonymization @@ -521,7 +536,8 @@ private EndpointResponse handleScalablePushQuery( final Map configOverrides, final Map requestProperties, final CompletableFuture connectionClosedFuture, - final Context context + final Context context, + final SlidingWindowRateLimiter scalablePushBandRateLimiter ) { final ConfiguredStatement configured = ConfiguredStatement .of(statement, SessionConfig.of(ksqlConfig, configOverrides)); @@ -532,6 +548,8 @@ private EndpointResponse handleScalablePushQuery( final PushQueryConfigPlannerOptions plannerOptions = new PushQueryConfigPlannerOptions(ksqlConfig, configOverrides); + scalablePushBandRateLimiter.allow(); + final ScalablePushQueryMetadata query = ksqlEngine .executeScalablePushQuery(analysis, serviceContext, configured, pushRouting, routingOptions, plannerOptions, context); diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java index 36606dc29956..3fcb84ec912c 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java @@ -90,6 +90,7 @@ public class WSQueryEndpoint { private final RateLimiter rateLimiter; private final ConcurrencyLimiter pullConcurrencyLimiter; private final SlidingWindowRateLimiter pullBandRateLimiter; + private final SlidingWindowRateLimiter scalablePushBandRateLimiter; private final HARouting routing; private final Optional localCommands; private final PushRouting pushRouting; @@ -112,6 +113,7 @@ public WSQueryEndpoint( final RateLimiter rateLimiter, final ConcurrencyLimiter pullConcurrencyLimiter, final SlidingWindowRateLimiter pullBandRateLimiter, + final SlidingWindowRateLimiter scalablePushBandRateLimiter, final HARouting routing, final Optional localCommands, final PushRouting pushRouting @@ -136,6 +138,7 @@ public WSQueryEndpoint( rateLimiter, pullConcurrencyLimiter, pullBandRateLimiter, + scalablePushBandRateLimiter, routing, localCommands, pushRouting @@ -164,6 +167,7 @@ public WSQueryEndpoint( final RateLimiter rateLimiter, final ConcurrencyLimiter pullConcurrencyLimiter, final SlidingWindowRateLimiter pullBandRateLimiter, + final SlidingWindowRateLimiter scalablePushBandRateLimiter, final HARouting routing, final Optional localCommands, final PushRouting pushRouting @@ -195,6 +199,8 @@ public WSQueryEndpoint( this.pullConcurrencyLimiter = Objects.requireNonNull(pullConcurrencyLimiter, "pullConcurrencyLimiter"); this.pullBandRateLimiter = Objects.requireNonNull(pullBandRateLimiter, "pullBandRateLimiter"); + this.scalablePushBandRateLimiter = + Objects.requireNonNull(scalablePushBandRateLimiter, "scalablePushBandRateLimiter"); this.routing = Objects.requireNonNull(routing, "routing"); this.localCommands = Objects.requireNonNull(localCommands, "localCommands"); this.pushRouting = Objects.requireNonNull(pushRouting, "pushRouting"); @@ -382,7 +388,8 @@ private void handleQuery(final RequestContext info, final Query query, analysis, pushRouting, context, - streamSubscriber + streamSubscriber, + scalablePushBandRateLimiter ); } else { pushQueryPublisher.start( @@ -443,7 +450,8 @@ private static void startScalablePushQueryPublisher( final ImmutableAnalysis analysis, final PushRouting pushRouting, final Context context, - final WebSocketSubscriber streamSubscriber + final WebSocketSubscriber streamSubscriber, + final SlidingWindowRateLimiter scalablePushBandRateLimiter ) { PushQueryPublisher.createScalablePublisher( ksqlEngine, @@ -452,7 +460,8 @@ private static void startScalablePushQueryPublisher( query, analysis, pushRouting, - context + context, + scalablePushBandRateLimiter ).subscribe(streamSubscriber); } @@ -522,7 +531,8 @@ void start( ImmutableAnalysis analysis, PushRouting pushRouting, Context context, - WebSocketSubscriber subscriber); + WebSocketSubscriber subscriber, + SlidingWindowRateLimiter scalablePushBandRateLimiter); } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/integration/PullBandwidthThrottleIntegrationTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/integration/PullBandwidthThrottleIntegrationTest.java index 52377b3e444c..ec5be3fb69a8 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/integration/PullBandwidthThrottleIntegrationTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/integration/PullBandwidthThrottleIntegrationTest.java @@ -15,24 +15,30 @@ package io.confluent.ksql.api.integration; +import static io.confluent.ksql.rest.Errors.ERROR_CODE_BAD_STATEMENT; +import static io.confluent.ksql.test.util.AssertEventually.assertThatEventually; +import static io.confluent.ksql.test.util.EmbeddedSingleNodeKafkaCluster.VALID_USER2; +import static io.confluent.ksql.util.KsqlConfig.KSQL_DEFAULT_KEY_FORMAT_CONFIG; +import static io.confluent.ksql.util.KsqlConfig.KSQL_QUERY_PULL_MAX_HOURLY_BANDWIDTH_MEGABYTES_CONFIG; +import static io.confluent.ksql.util.KsqlConfig.KSQL_STREAMS_PREFIX; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.startsWith; +import static org.junit.Assert.assertEquals; + import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.confluent.common.utils.IntegrationTest; import io.confluent.ksql.api.utils.QueryResponse; import io.confluent.ksql.integration.IntegrationTestHarness; import io.confluent.ksql.integration.Retry; -import static io.confluent.ksql.rest.Errors.ERROR_CODE_BAD_STATEMENT; import io.confluent.ksql.rest.integration.RestIntegrationTestUtil; import io.confluent.ksql.rest.server.TestKsqlRestApp; import io.confluent.ksql.serde.FormatFactory; -import static io.confluent.ksql.test.util.AssertEventually.assertThatEventually; import io.confluent.ksql.test.util.EmbeddedSingleNodeKafkaCluster; -import static io.confluent.ksql.test.util.EmbeddedSingleNodeKafkaCluster.VALID_USER2; import io.confluent.ksql.test.util.secure.ClientTrustStore; import io.confluent.ksql.test.util.secure.Credentials; import io.confluent.ksql.test.util.secure.SecureKafkaHelper; -import static io.confluent.ksql.util.KsqlConfig.KSQL_DEFAULT_KEY_FORMAT_CONFIG; -import static io.confluent.ksql.util.KsqlConfig.KSQL_QUERY_PULL_MAX_HOURLY_BANDWIDTH_MEGABYTES_CONFIG; -import static io.confluent.ksql.util.KsqlConfig.KSQL_STREAMS_PREFIX; import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.PageViewDataProvider; import io.confluent.ksql.util.VertxCompletableFuture; @@ -47,13 +53,8 @@ import java.util.concurrent.atomic.AtomicReference; import kafka.zookeeper.ZooKeeperClientException; import org.apache.kafka.streams.StreamsConfig; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.hasSize; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.startsWith; import org.junit.After; import org.junit.AfterClass; -import static org.junit.Assert.assertEquals; import org.junit.Before; import org.junit.BeforeClass; import org.junit.ClassRule; @@ -95,7 +96,7 @@ public class PullBandwidthThrottleIntegrationTest { .outerRule(Retry.of(3, ZooKeeperClientException.class, 3, TimeUnit.SECONDS)) .around(TEST_HARNESS) .around(REST_APP); - private static final String RATE_LIMIT_MESSAGE = "Host is at bandwidth rate limit for pull queries."; + private static final String RATE_LIMIT_MESSAGE = "Host is at bandwidth rate limit for queries."; @BeforeClass public static void setUpClass() { diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/integration/ScalablePushBandwidthThrottleIntegrationTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/integration/ScalablePushBandwidthThrottleIntegrationTest.java new file mode 100644 index 000000000000..267f7c0e490c --- /dev/null +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/integration/ScalablePushBandwidthThrottleIntegrationTest.java @@ -0,0 +1,215 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.api.integration; + +import static io.confluent.ksql.test.util.AssertEventually.assertThatEventually; +import static io.confluent.ksql.test.util.EmbeddedSingleNodeKafkaCluster.VALID_USER2; +import static io.confluent.ksql.util.KsqlConfig.KSQL_DEFAULT_KEY_FORMAT_CONFIG; +import static io.confluent.ksql.util.KsqlConfig.KSQL_QUERY_PUSH_V2_ENABLED; +import static io.confluent.ksql.util.KsqlConfig.KSQL_QUERY_PUSH_V2_MAX_HOURLY_BANDWIDTH_MEGABYTES_CONFIG; +import static io.confluent.ksql.util.KsqlConfig.KSQL_QUERY_PUSH_V2_REGISTRY_INSTALLED; +import static io.confluent.ksql.util.KsqlConfig.KSQL_STREAMS_PREFIX; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; + +import com.google.common.collect.ImmutableMap; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import io.confluent.common.utils.IntegrationTest; +import io.confluent.ksql.integration.IntegrationTestHarness; +import io.confluent.ksql.integration.Retry; +import io.confluent.ksql.rest.client.KsqlRestClient; +import io.confluent.ksql.rest.client.RestResponse; +import io.confluent.ksql.rest.client.StreamPublisher; +import io.confluent.ksql.rest.entity.StreamedRow; +import io.confluent.ksql.rest.integration.QueryStreamSubscriber; +import io.confluent.ksql.rest.integration.RestIntegrationTestUtil; +import io.confluent.ksql.rest.server.TestKsqlRestApp; +import io.confluent.ksql.serde.FormatFactory; +import io.confluent.ksql.test.util.EmbeddedSingleNodeKafkaCluster; +import io.confluent.ksql.test.util.secure.ClientTrustStore; +import io.confluent.ksql.test.util.secure.Credentials; +import io.confluent.ksql.test.util.secure.SecureKafkaHelper; +import io.confluent.ksql.util.KsqlException; +import io.confluent.ksql.util.PageViewDataProvider; +import io.confluent.ksql.util.PersistentQueryMetadata; +import io.vertx.core.Vertx; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import kafka.zookeeper.ZooKeeperClientException; +import org.apache.kafka.streams.KafkaStreams.State; +import org.apache.kafka.streams.StreamsConfig; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.RuleChain; + +@Category({IntegrationTest.class}) +public class ScalablePushBandwidthThrottleIntegrationTest { + private static final String RATE_LIMIT_MESSAGE = "Host is at bandwidth rate limit for queries."; + private static final PageViewDataProvider TEST_DATA_PROVIDER = new PageViewDataProvider(); + private static final String TEST_TOPIC = TEST_DATA_PROVIDER.topicName(); + private static final String TEST_STREAM = TEST_DATA_PROVIDER.sourceName(); + + private static final String AGG_TABLE = "AGG_TABLE"; + private static final Credentials NORMAL_USER = VALID_USER2; + + private static final IntegrationTestHarness TEST_HARNESS = IntegrationTestHarness.builder() + .withKafkaCluster( + EmbeddedSingleNodeKafkaCluster.newBuilder() + .withoutPlainListeners() + .withSaslSslListeners() + ).build(); + + private static final TestKsqlRestApp REST_APP = TestKsqlRestApp + .builder(TEST_HARNESS::kafkaBootstrapServers) + .withProperty("security.protocol", "SASL_SSL") + .withProperty("sasl.mechanism", "PLAIN") + .withProperty("sasl.jaas.config", SecureKafkaHelper.buildJaasConfig(NORMAL_USER)) + .withProperties(ClientTrustStore.trustStoreProps()) + .withProperty(KSQL_QUERY_PUSH_V2_ENABLED , true) + .withProperty(KSQL_QUERY_PUSH_V2_MAX_HOURLY_BANDWIDTH_MEGABYTES_CONFIG , 1) + .withProperty("auto.offset.reset", "latest") + .withProperty(KSQL_QUERY_PUSH_V2_REGISTRY_INSTALLED, true) + .withProperty(KSQL_STREAMS_PREFIX + StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1) + .withProperty(KSQL_DEFAULT_KEY_FORMAT_CONFIG, "JSON") + .build(); + + @ClassRule + public static final RuleChain CHAIN = RuleChain + .outerRule(Retry.of(3, ZooKeeperClientException.class, 3, TimeUnit.SECONDS)) + .around(TEST_HARNESS) + .around(REST_APP); + + private Vertx vertx; + private KsqlRestClient restClient; + private StreamPublisher publisher; + private QueryStreamSubscriber subscriber; + + @BeforeClass + public static void setUpClass() { + TEST_HARNESS.ensureTopics(TEST_TOPIC); + + TEST_HARNESS.produceRows(TEST_TOPIC, TEST_DATA_PROVIDER, FormatFactory.JSON, FormatFactory.JSON); + + RestIntegrationTestUtil.createStream(REST_APP, TEST_DATA_PROVIDER); + + makeKsqlRequest("CREATE TABLE " + AGG_TABLE + " AS " + + "SELECT PAGEID, LATEST_BY_OFFSET(USERID) AS USERID FROM " + TEST_STREAM + " GROUP BY PAGEID;" + ); + } + + @AfterClass + public static void classTearDown() { + REST_APP.getPersistentQueries().forEach(str -> makeKsqlRequest("TERMINATE " + str + ";")); + } + + @Before + public void setUp() { + vertx = Vertx.vertx(); + restClient = REST_APP.buildKsqlClient(); + } + + @After + public void tearDown() { + if (vertx != null) { + vertx.close(); + } + REST_APP.getServiceContext().close(); + } + + @SuppressFBWarnings({"DLS_DEAD_LOCAL_STORE"}) + @Test + public void scalablePushBandwidthThrottleTestHTTP1() { + assertAllPersistentQueriesRunning(); + String veryLong = createDataSize(100000); + final CompletableFuture header = new CompletableFuture<>(); + final CompletableFuture> complete = new CompletableFuture<>(); + String sql = "SELECT CONCAT(\'"+ veryLong + "\') as placeholder from " + AGG_TABLE + " EMIT CHANGES LIMIT 1;"; + + // scalable push query should succeed 10 times + for (int i = 0; i < 11; i += 1) { + makeRequestAndSetupSubscriber(sql, + ImmutableMap.of("auto.offset.reset", "latest"), + header, complete ); + TEST_HARNESS.produceRows(TEST_TOPIC, TEST_DATA_PROVIDER, FormatFactory.JSON, FormatFactory.JSON); + System.out.println(i); + } + + // scalable push query should fail on 11th try since it exceeds 1MB bandwidth limit + try { + makeQueryRequest(sql, + ImmutableMap.of("auto.offset.reset", "latest")); + throw new AssertionError("New scalable push query should have exceeded bandwidth limit "); + } catch (KsqlException e) { + assertThat(e.getMessage(), is(RATE_LIMIT_MESSAGE)); + } + } + + + + private static String createDataSize(int msgSize) { + StringBuilder sb = new StringBuilder(msgSize); + for (int i=0; i properties, + final CompletableFuture header, + final CompletableFuture> future + ) { + publisher = makeQueryRequest(sql, properties); + subscriber = new QueryStreamSubscriber(publisher.getContext(), future, header); + publisher.subscribe(subscriber); + } + + StreamPublisher makeQueryRequest( + final String sql, + final Map properties + ) { + final RestResponse> res = + restClient.makeQueryRequestStreamed(sql, null, properties); + + if (res.isErroneous()) { + throw new KsqlException(res.getErrorMessage().getMessage()); + } + return res.getResponse(); + } + + private void assertAllPersistentQueriesRunning() { + assertThatEventually(() -> { + for (final PersistentQueryMetadata metadata : REST_APP.getEngine().getPersistentQueries()) { + if (metadata.getState() != State.RUNNING) { + return false; + } + } + return true; + }, is(true)); + } +} diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/server/LoggingHandlerTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/server/LoggingHandlerTest.java index 843d41804392..a0153357eea5 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/server/LoggingHandlerTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/server/LoggingHandlerTest.java @@ -9,7 +9,6 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import com.google.common.collect.ImmutableList; import io.confluent.ksql.rest.server.KsqlRestConfig; import io.vertx.core.AsyncResult; import io.vertx.core.Handler; @@ -60,12 +59,9 @@ public class LoggingHandlerTest { @Before public void setUp() { - when(server.getConfig()).thenReturn(ksqlRestConfig); - when(routingContext.response()).thenReturn(response); when(routingContext.request()).thenReturn(request); when(request.response()).thenReturn(response); when(request.remoteAddress()).thenReturn(socketAddress); - when(ksqlRestConfig.getList(any())).thenReturn(ImmutableList.of("401")); when(loggingRateLimiter.shouldLog(logger, "/query", 200)).thenReturn(true); when(loggingRateLimiter.shouldLog(logger, "/query", 405)).thenReturn(true); when(clock.millis()).thenReturn(1699813434333L); diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/server/SlidingWindowRateLimiterTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/server/SlidingWindowRateLimiterTest.java index d89a26c2ccdf..4c9df996dd61 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/server/SlidingWindowRateLimiterTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/server/SlidingWindowRateLimiterTest.java @@ -13,7 +13,7 @@ @RunWith(MockitoJUnitRunner.class) public class SlidingWindowRateLimiterTest { private SlidingWindowRateLimiter limiter; - private static String RATE_LIMIT_MESSAGE = "Host is at bandwidth rate limit for pull queries."; + private static String RATE_LIMIT_MESSAGE = "Host is at bandwidth rate limit for queries."; private static String TEST_SHOULD_NOT_FAIL = "This test should not throw an exception"; @Before diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java index 79bd1fa990c6..47e6c7f124be 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java @@ -18,8 +18,6 @@ import static org.hamcrest.CoreMatchers.hasItem; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyMap; @@ -35,13 +33,13 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.RateLimiter; -import io.confluent.ksql.api.server.SlidingWindowRateLimiter; import io.confluent.ksql.engine.KsqlEngine; import io.confluent.ksql.execution.streams.RoutingFilter.RoutingFilterFactory; import io.confluent.ksql.logging.processing.ProcessingLogConfig; import io.confluent.ksql.logging.processing.ProcessingLogContext; import io.confluent.ksql.logging.processing.ProcessingLogServerUtils; import io.confluent.ksql.metrics.MetricCollectors; +import io.confluent.ksql.api.server.SlidingWindowRateLimiter; import io.confluent.ksql.parser.KsqlParser.ParsedStatement; import io.confluent.ksql.parser.KsqlParser.PreparedStatement; import io.confluent.ksql.physical.pull.HARouting; @@ -68,7 +66,6 @@ import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.version.metrics.VersionCheckerAgent; import io.vertx.core.Vertx; - import java.util.Collections; import java.util.LinkedList; import java.util.List; @@ -150,6 +147,8 @@ public class KsqlRestApplicationTest { @Mock private SlidingWindowRateLimiter pullBandRateLimiter; @Mock + private SlidingWindowRateLimiter scalablePushBandRateLimiter; + @Mock private HARouting haRouting; @Mock private PushRouting pushRouting; @@ -500,6 +499,7 @@ private void givenAppWithRestConfig(final Map restConfigMap) { rateLimiter, concurrencyLimiter, pullBandRateLimiter, + scalablePushBandRateLimiter, haRouting, pushRouting, Optional.empty() diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/WSQueryEndpointTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/WSQueryEndpointTest.java index fdf289347f3d..7e4f8170737e 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/WSQueryEndpointTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/WSQueryEndpointTest.java @@ -15,7 +15,6 @@ package io.confluent.ksql.rest.server.resources; -import io.confluent.ksql.api.server.SlidingWindowRateLimiter; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -26,6 +25,7 @@ import com.google.common.util.concurrent.RateLimiter; import io.confluent.ksql.engine.KsqlEngine; import io.confluent.ksql.execution.streams.RoutingFilter.RoutingFilterFactory; +import io.confluent.ksql.api.server.SlidingWindowRateLimiter; import io.confluent.ksql.physical.pull.HARouting; import io.confluent.ksql.physical.scalablepush.PushRouting; import io.confluent.ksql.properties.DenyListPropertyValidator; @@ -34,8 +34,8 @@ import io.confluent.ksql.rest.entity.KsqlRequest; import io.confluent.ksql.rest.server.StatementParser; import io.confluent.ksql.rest.server.computation.CommandQueue; -import io.confluent.ksql.rest.util.ConcurrencyLimiter; import io.confluent.ksql.rest.server.resources.streaming.WSQueryEndpoint; +import io.confluent.ksql.rest.util.ConcurrencyLimiter; import io.confluent.ksql.security.KsqlSecurityContext; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.version.metrics.ActivenessRegistrar; @@ -88,6 +88,7 @@ public void setUp() { mock(RateLimiter.class), mock(ConcurrencyLimiter.class), mock(SlidingWindowRateLimiter.class), + mock(SlidingWindowRateLimiter.class), mock(HARouting.class), Optional.empty(), mock(PushRouting.class) diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java index 1ca4eb962b7f..ec4b5b2f86b9 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java @@ -52,8 +52,8 @@ import io.confluent.ksql.analyzer.Analysis.AliasedDataSource; import io.confluent.ksql.analyzer.ImmutableAnalysis; import io.confluent.ksql.api.server.MetricsCallbackHolder; -import io.confluent.ksql.api.server.SlidingWindowRateLimiter; import io.confluent.ksql.api.server.StreamingOutput; +import io.confluent.ksql.api.server.SlidingWindowRateLimiter; import io.confluent.ksql.config.SessionConfig; import io.confluent.ksql.engine.KsqlEngine; import io.confluent.ksql.engine.PullQueryExecutionUtil; @@ -209,6 +209,8 @@ public class StreamedQueryResourceTest { @Mock private SlidingWindowRateLimiter pullBandRateLimiter; @Mock + private SlidingWindowRateLimiter scalablePushBandRateLimiter; + @Mock private KsqlConfig ksqlConfig; @Mock private KsqlRestConfig ksqlRestConfig; @@ -280,6 +282,7 @@ public void setup() { rateLimiter, concurrencyLimiter, pullBandRateLimiter, + scalablePushBandRateLimiter, haRouting, pushRouting, Optional.empty() @@ -383,6 +386,7 @@ public void shouldRateLimit() { pullQueryRateLimiter, concurrencyLimiter, pullBandRateLimiter, + scalablePushBandRateLimiter, haRouting, pushRouting, Optional.empty() @@ -495,6 +499,7 @@ public void shouldThrowOnHandleStatementIfNotConfigured() { rateLimiter, concurrencyLimiter, pullBandRateLimiter, + scalablePushBandRateLimiter, haRouting, pushRouting, Optional.empty() @@ -679,6 +684,7 @@ public void shouldThrowOnDenyListedStreamProperty() { rateLimiter, concurrencyLimiter, pullBandRateLimiter, + scalablePushBandRateLimiter, haRouting, pushRouting, Optional.empty()