From 561af5335a6178fd12fdd78f5728b9dff0c7890f Mon Sep 17 00:00:00 2001 From: nateab Date: Wed, 29 Sep 2021 17:17:52 -0700 Subject: [PATCH] feat: add detailed scalable push query metrics with type breakdown (#8178) * feat: add scalable push query metrics * feat: add skeleton for spq metrics, ws endpoint * feat: add spq metrics on http 1 and 2 endpoints * fix: typo * refactor: rename latency metric connection duration * fix: typo * fix: rename latency in test * fix: address feedback * fix: remove old tests * test: remove legacy metrics from test * fix: merge conflicts * test: fix test * chore: remove executor from name * test email change' * chore: fix auto import style * chore: fix auto import style * chore: fix imports * Trigger Build * chore: refactor metrics callback * fix: close metrics * style: line length Co-authored-by: natea --- .../io/confluent/ksql/util/KsqlConstants.java | 24 +- .../confluent/ksql/KsqlExecutionContext.java | 5 +- .../confluent/ksql/engine/EngineExecutor.java | 76 +- .../io/confluent/ksql/engine/KsqlEngine.java | 7 +- .../engine/SandboxedExecutionContext.java | 7 +- .../internal/PullQueryExecutorMetrics.java | 26 +- .../internal/ScalablePushQueryMetrics.java | 699 ++++++++++++++++++ .../ksql/physical/pull/PullPhysicalPlan.java | 31 +- .../pull/PullPhysicalPlanBuilder.java | 10 +- .../ksql/physical/pull/PullQueryResult.java | 12 +- .../scalablepush/PushPhysicalPlan.java | 14 +- .../scalablepush/PushPhysicalPlanBuilder.java | 9 +- .../physical/scalablepush/PushRouting.java | 35 +- .../scalablepush/PushRoutingOptions.java | 13 + .../operators/PeekStreamOperator.java | 8 + .../operators/PushDataSourceOperator.java | 4 + .../ksql/util/ScalablePushQueryMetadata.java | 51 +- .../scalablepush/PushPhysicalPlanTest.java | 13 +- .../scalablepush/PushRoutingTest.java | 26 +- .../util/ScalablePushQueryMetadataTest.java | 10 +- .../ksql/api/impl/QueryEndpoint.java | 96 ++- .../ksql/rest/server/KsqlRestApplication.java | 34 +- .../ksql/rest/server/KsqlServerEndpoints.java | 8 +- .../streaming/PullQueryPublisher.java | 6 +- .../PushQueryConfigRoutingOptions.java | 8 + .../streaming/PushQueryPublisher.java | 49 +- .../streaming/StreamedQueryResource.java | 34 +- .../resources/streaming/WSQueryEndpoint.java | 7 + ...MetricsUtil.java => QueryMetricsUtil.java} | 62 +- .../rest/server/KsqlRestApplicationTest.java | 1 + .../execution/PullQueryMetricsTest.java | 70 +- .../ScalablePushQueryMetricsTest.java | 291 ++++++++ .../server/resources/WSQueryEndpointTest.java | 1 + .../streaming/StreamedQueryResourceTest.java | 5 + 34 files changed, 1569 insertions(+), 183 deletions(-) create mode 100644 ksqldb-engine/src/main/java/io/confluent/ksql/internal/ScalablePushQueryMetrics.java rename ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/util/{PullQueryMetricsUtil.java => QueryMetricsUtil.java} (69%) create mode 100644 ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ScalablePushQueryMetricsTest.java diff --git a/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConstants.java b/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConstants.java index 936b7bfdd71b..29f0b6fe6257 100644 --- a/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConstants.java +++ b/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConstants.java @@ -54,7 +54,9 @@ public enum KsqlQueryType { } public enum PersistentQueryType { - CREATE_SOURCE, CREATE_AS, INSERT + CREATE_SOURCE, + CREATE_AS, + INSERT } public enum KsqlQueryStatus { @@ -80,4 +82,24 @@ public static String getSRSubject(final String topicName, final boolean isKey) { public static final String TIME_PATTERN = "HH:mm:ss.SSS"; public static final String DATE_PATTERN = "yyyy-MM-dd"; public static final String DATE_TIME_PATTERN = DATE_PATTERN + "'T'" + TIME_PATTERN; + + /** + * The types we consider for metrics purposes. These should only be added to. You can deprecate + * a field, but don't delete it or change its meaning + */ + public enum QuerySourceType { + NON_WINDOWED, + WINDOWED, + NON_WINDOWED_STREAM, + WINDOWED_STREAM + } + + /** + * The types we consider for metrics purposes. These should only be added to. You can deprecate + * a field, but don't delete it or change its meaning + */ + public enum RoutingNodeType { + SOURCE_NODE, + REMOTE_NODE + } } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/KsqlExecutionContext.java b/ksqldb-engine/src/main/java/io/confluent/ksql/KsqlExecutionContext.java index 22bd35a996e8..3a4831d8171f 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/KsqlExecutionContext.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/KsqlExecutionContext.java @@ -20,6 +20,7 @@ import io.confluent.ksql.engine.KsqlPlan; import io.confluent.ksql.execution.streams.RoutingOptions; import io.confluent.ksql.internal.PullQueryExecutorMetrics; +import io.confluent.ksql.internal.ScalablePushQueryMetrics; import io.confluent.ksql.logging.processing.ProcessingLogContext; import io.confluent.ksql.metastore.MetaStore; import io.confluent.ksql.name.SourceName; @@ -184,6 +185,7 @@ PullQueryResult executeTablePullQuery( * @param pushRouting The push routing object * @param pushRoutingOptions The options for routing * @param context The Vertx context of the request + * @param scalablePushQueryMetrics JMX metrics * @return A ScalablePushQueryMetadata object */ ScalablePushQueryMetadata executeScalablePushQuery( @@ -193,7 +195,8 @@ ScalablePushQueryMetadata executeScalablePushQuery( PushRouting pushRouting, PushRoutingOptions pushRoutingOptions, QueryPlannerOptions queryPlannerOptions, - Context context + Context context, + Optional scalablePushQueryMetrics ); /** diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/engine/EngineExecutor.java b/ksqldb-engine/src/main/java/io/confluent/ksql/engine/EngineExecutor.java index 895a0f5bac63..276d9964bc5d 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/engine/EngineExecutor.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/engine/EngineExecutor.java @@ -34,6 +34,7 @@ import io.confluent.ksql.execution.streams.RoutingOptions; import io.confluent.ksql.function.InternalFunctionRegistry; import io.confluent.ksql.internal.PullQueryExecutorMetrics; +import io.confluent.ksql.internal.ScalablePushQueryMetrics; import io.confluent.ksql.metastore.MetaStore; import io.confluent.ksql.metastore.MetaStoreImpl; import io.confluent.ksql.metastore.MutableMetaStore; @@ -60,7 +61,6 @@ import io.confluent.ksql.physical.PhysicalPlan; import io.confluent.ksql.physical.pull.HARouting; import io.confluent.ksql.physical.pull.PullPhysicalPlan; -import io.confluent.ksql.physical.pull.PullPhysicalPlan.RoutingNodeType; import io.confluent.ksql.physical.pull.PullPhysicalPlanBuilder; import io.confluent.ksql.physical.pull.PullQueryQueuePopulator; import io.confluent.ksql.physical.pull.PullQueryResult; @@ -91,6 +91,7 @@ import io.confluent.ksql.statement.ConfiguredStatement; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlConstants; +import io.confluent.ksql.util.KsqlConstants.RoutingNodeType; import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.KsqlStatementException; import io.confluent.ksql.util.PersistentQueryMetadata; @@ -231,6 +232,9 @@ PullQueryResult executeTablePullQuery( throw new IllegalArgumentException("Executor can only handle pull queries"); } final SessionConfig sessionConfig = statement.getSessionConfig(); + + // If we ever change how many hops a request can do, we'll need to update this for correct + // metrics. final RoutingNodeType routingNodeType = routingOptions.getIsSkipForwardRequest() ? RoutingNodeType.REMOTE_NODE : RoutingNodeType.SOURCE_NODE; @@ -255,9 +259,6 @@ PullQueryResult executeTablePullQuery( ); final PullPhysicalPlan physicalPlan = plan; - // If we ever change how many hops a request can do, we'll need to update this for correct - // metrics. - final PullQueryQueue pullQueryQueue = new PullQueryQueue(); final PullQueryQueuePopulator populator = () -> routing.handlePullQuery( serviceContext, @@ -325,19 +326,30 @@ ScalablePushQueryMetadata executeScalablePushQuery( final PushRouting pushRouting, final PushRoutingOptions pushRoutingOptions, final QueryPlannerOptions queryPlannerOptions, - final Context context + final Context context, + final Optional scalablePushQueryMetrics ) { final SessionConfig sessionConfig = statement.getSessionConfig(); + + // If we ever change how many hops a request can do, we'll need to update this for correct + // metrics. + final RoutingNodeType routingNodeType = pushRoutingOptions.getHasBeenForwarded() + ? RoutingNodeType.REMOTE_NODE : RoutingNodeType.SOURCE_NODE; + + PushPhysicalPlan plan = null; + try { final KsqlConfig ksqlConfig = sessionConfig.getConfig(false); final LogicalPlanNode logicalPlan = buildAndValidateLogicalPlan( statement, analysis, ksqlConfig, queryPlannerOptions, true); - final PushPhysicalPlan physicalPlan = buildScalablePushPhysicalPlan( + plan = buildScalablePushPhysicalPlan( logicalPlan, analysis, context, pushRoutingOptions ); + final PushPhysicalPlan physicalPlan = plan; + final TransientQueryQueue transientQueryQueue = new TransientQueryQueue(analysis.getLimitClause()); final PushQueryMetadata.ResultType resultType = @@ -348,24 +360,66 @@ ScalablePushQueryMetadata executeScalablePushQuery( final PushQueryQueuePopulator populator = () -> pushRouting.handlePushQuery(serviceContext, physicalPlan, statement, pushRoutingOptions, - physicalPlan.getOutputSchema(), transientQueryQueue); + physicalPlan.getOutputSchema(), transientQueryQueue, scalablePushQueryMetrics); final PushQueryPreparer preparer = () -> pushRouting.preparePushQuery(physicalPlan, statement, pushRoutingOptions); final ScalablePushQueryMetadata metadata = new ScalablePushQueryMetadata( physicalPlan.getOutputSchema(), physicalPlan.getQueryId(), transientQueryQueue, + scalablePushQueryMetrics, resultType, populator, - preparer + preparer, + physicalPlan.getSourceType(), + routingNodeType, + physicalPlan::getRowsReadFromDataSource ); return metadata; } catch (final Exception e) { + if (plan == null) { + scalablePushQueryMetrics.ifPresent(m -> m.recordErrorRateForNoResult(1)); + } else { + final PushPhysicalPlan pushPhysicalPlan = plan; + scalablePushQueryMetrics.ifPresent(metrics -> metrics.recordErrorRate(1, + pushPhysicalPlan.getSourceType(), + routingNodeType + )); + } + + final String stmtLower = statement.getStatementText().toLowerCase(Locale.ROOT); + final String messageLower = e.getMessage().toLowerCase(Locale.ROOT); + final String stackLower = Throwables.getStackTraceAsString(e).toLowerCase(Locale.ROOT); + + // do not include the statement text in the default logs as it may contain sensitive + // information - the exception which is returned to the user below will contain + // the contents of the query + if (messageLower.contains(stmtLower) || stackLower.contains(stmtLower)) { + final StackTraceElement loc = Iterables + .getLast(Throwables.getCausalChain(e)) + .getStackTrace()[0]; + LOG.error("Failure to execute push query V2 {} {}, not logging the error message since it " + + "contains the query string, which may contain sensitive information." + + " If you see this LOG message, please submit a GitHub ticket and" + + " we will scrub the statement text from the error at {}", + pushRoutingOptions.debugString(), + queryPlannerOptions.debugString(), + loc); + } else { + LOG.error("Failure to execute push query V2. {} {}", + pushRoutingOptions.debugString(), + queryPlannerOptions.debugString(), + e); + } + LOG.debug("Failed push query V2 text {}, {}", statement.getStatementText(), e); + throw new KsqlStatementException( - e.getMessage(), - statement.getStatementText(), - e + e.getMessage() == null + ? "Server Error" + Arrays.toString(e.getStackTrace()) + : e.getMessage(), + statement.getStatementText(), + e ); } } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/engine/KsqlEngine.java b/ksqldb-engine/src/main/java/io/confluent/ksql/engine/KsqlEngine.java index 1f284657fce9..8c0eeb916110 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/engine/KsqlEngine.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/engine/KsqlEngine.java @@ -32,6 +32,7 @@ import io.confluent.ksql.function.FunctionRegistry; import io.confluent.ksql.internal.KsqlEngineMetrics; import io.confluent.ksql.internal.PullQueryExecutorMetrics; +import io.confluent.ksql.internal.ScalablePushQueryMetrics; import io.confluent.ksql.logging.processing.ProcessingLogContext; import io.confluent.ksql.logging.query.QueryLogger; import io.confluent.ksql.metastore.MetaStore; @@ -455,7 +456,8 @@ public ScalablePushQueryMetadata executeScalablePushQuery( final PushRouting pushRouting, final PushRoutingOptions pushRoutingOptions, final QueryPlannerOptions queryPlannerOptions, - final Context context + final Context context, + final Optional scalablePushQueryMetrics ) { final ScalablePushQueryMetadata query = EngineExecutor .create(primaryContext, serviceContext, statement.getSessionConfig()) @@ -465,7 +467,8 @@ public ScalablePushQueryMetadata executeScalablePushQuery( pushRouting, pushRoutingOptions, queryPlannerOptions, - context); + context, + scalablePushQueryMetrics); return query; } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/engine/SandboxedExecutionContext.java b/ksqldb-engine/src/main/java/io/confluent/ksql/engine/SandboxedExecutionContext.java index 8827fe46ef33..f30d58534043 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/engine/SandboxedExecutionContext.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/engine/SandboxedExecutionContext.java @@ -20,6 +20,7 @@ import io.confluent.ksql.analyzer.ImmutableAnalysis; import io.confluent.ksql.execution.streams.RoutingOptions; import io.confluent.ksql.internal.PullQueryExecutorMetrics; +import io.confluent.ksql.internal.ScalablePushQueryMetrics; import io.confluent.ksql.logging.processing.NoopProcessingLogContext; import io.confluent.ksql.logging.processing.ProcessingLogContext; import io.confluent.ksql.metastore.MetaStore; @@ -207,7 +208,8 @@ public ScalablePushQueryMetadata executeScalablePushQuery( final PushRouting pushRouting, final PushRoutingOptions pushRoutingOptions, final QueryPlannerOptions queryPlannerOptions, - final Context context + final Context context, + final Optional scalablePushQueryMetrics ) { return EngineExecutor.create( engineContext, @@ -219,7 +221,8 @@ public ScalablePushQueryMetadata executeScalablePushQuery( pushRouting, pushRoutingOptions, queryPlannerOptions, - context + context, + scalablePushQueryMetrics ); } } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/internal/PullQueryExecutorMetrics.java b/ksqldb-engine/src/main/java/io/confluent/ksql/internal/PullQueryExecutorMetrics.java index 177e455670ba..8994405bee57 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/internal/PullQueryExecutorMetrics.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/internal/PullQueryExecutorMetrics.java @@ -20,9 +20,9 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.confluent.ksql.metrics.MetricCollectors; import io.confluent.ksql.physical.pull.PullPhysicalPlan.PullPhysicalPlanType; -import io.confluent.ksql.physical.pull.PullPhysicalPlan.PullSourceType; -import io.confluent.ksql.physical.pull.PullPhysicalPlan.RoutingNodeType; import io.confluent.ksql.util.KsqlConstants; +import io.confluent.ksql.util.KsqlConstants.QuerySourceType; +import io.confluent.ksql.util.KsqlConstants.RoutingNodeType; import io.confluent.ksql.util.ReservedInternalTopics; import java.io.Closeable; import java.util.ArrayList; @@ -128,7 +128,7 @@ public void recordRemoteRequests(final double value) { public void recordLatency( final long startTimeNanos, - final PullSourceType sourceType, + final QuerySourceType sourceType, final PullPhysicalPlanType planType, final RoutingNodeType routingNodeType ) { @@ -156,7 +156,7 @@ private void innerRecordLatency(final long startTimeNanos, final MetricsKey key) public void recordErrorRate( final double value, - final PullSourceType sourceType, + final QuerySourceType sourceType, final PullPhysicalPlanType planType, final RoutingNodeType routingNodeType ) { @@ -185,7 +185,7 @@ public void recordRequestSize(final double value) { public void recordResponseSize( final double value, - final PullSourceType sourceType, + final QuerySourceType sourceType, final PullPhysicalPlanType planType, final RoutingNodeType routingNodeType ) { @@ -222,7 +222,7 @@ public void recordStatusCode(final int statusCode) { public void recordRowsReturned( final double value, - final PullSourceType sourceType, + final QuerySourceType sourceType, final PullPhysicalPlanType planType, final RoutingNodeType routingNodeType ) { @@ -245,7 +245,7 @@ public void recordZeroRowsReturnedForError() { public void recordRowsProcessed( final double value, - final PullSourceType sourceType, + final QuerySourceType sourceType, final PullPhysicalPlanType planType, final RoutingNodeType routingNodeType ) { @@ -519,7 +519,7 @@ private void addRequestMetricsToSensor( sensor, metricNamePrefix + "-total", servicePrefix + PULL_QUERY_METRIC_GROUP, - "Total number of pull query request" + descriptionSuffix, + "Total number of pull query requests" + descriptionSuffix, metricsTags, new CumulativeCount() ); @@ -654,7 +654,7 @@ private void addSensor( final String groupName, final String description, final Map metricsTags, - final MeasurableStat measureableStat + final MeasurableStat measurableStat ) { sensor.add( metrics.metricName( @@ -663,7 +663,7 @@ private void addSensor( description, metricsTags ), - measureableStat + measurableStat ); } @@ -671,7 +671,7 @@ private Map configureSensorMap( final String sensorBaseName, final MetricsAdder metricsAdder) { final ImmutableMap.Builder builder = ImmutableMap.builder(); - for (final PullSourceType sourceType : PullSourceType.values()) { + for (final QuerySourceType sourceType : QuerySourceType.values()) { for (final PullPhysicalPlanType planType : PullPhysicalPlanType.values()) { for (final RoutingNodeType routingNodeType : RoutingNodeType.values()) { addSensorToMap( @@ -723,7 +723,7 @@ private interface MetricsAdder { // Detailed metrics are broken down by multiple parameters represented by the following key. private static class MetricsKey { - private final PullSourceType sourceType; + private final QuerySourceType sourceType; private final PullPhysicalPlanType planType; private final RoutingNodeType routingNodeType; @@ -738,7 +738,7 @@ private static class MetricsKey { } MetricsKey( - final PullSourceType sourceType, + final QuerySourceType sourceType, final PullPhysicalPlanType planType, final RoutingNodeType routingNodeType ) { diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/internal/ScalablePushQueryMetrics.java b/ksqldb-engine/src/main/java/io/confluent/ksql/internal/ScalablePushQueryMetrics.java new file mode 100644 index 000000000000..81f0783cba17 --- /dev/null +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/internal/ScalablePushQueryMetrics.java @@ -0,0 +1,699 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.internal; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableMap.Builder; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import io.confluent.ksql.metrics.MetricCollectors; +import io.confluent.ksql.util.KsqlConstants; +import io.confluent.ksql.util.KsqlConstants.QuerySourceType; +import io.confluent.ksql.util.KsqlConstants.RoutingNodeType; +import io.confluent.ksql.util.ReservedInternalTopics; +import java.io.Closeable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.TimeUnit; +import org.apache.kafka.common.metrics.MeasurableStat; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.Avg; +import org.apache.kafka.common.metrics.stats.CumulativeCount; +import org.apache.kafka.common.metrics.stats.CumulativeSum; +import org.apache.kafka.common.metrics.stats.Max; +import org.apache.kafka.common.metrics.stats.Min; +import org.apache.kafka.common.metrics.stats.Percentile; +import org.apache.kafka.common.metrics.stats.Percentiles; +import org.apache.kafka.common.metrics.stats.Percentiles.BucketSizing; +import org.apache.kafka.common.metrics.stats.Rate; +import org.apache.kafka.common.utils.Time; + +@SuppressWarnings("checkstyle:ClassDataAbstractionCoupling") +public class ScalablePushQueryMetrics implements Closeable { + + private static final String SCALABLE_PUSH_QUERY_METRIC_GROUP = "scalable-push-query"; + private static final String SCALABLE_PUSH_REQUESTS = "scalable-push-query-requests"; + private static final long MAX_LATENCY_BUCKET_VALUE_MICROS = TimeUnit.SECONDS.toMicros(10); + private static final int NUM_LATENCY_BUCKETS = 1000; + + private final List sensors; + private final Sensor localRequestsSensor; + private final Sensor remoteRequestsSensor; + private final Sensor connectionDurationSensor; + private final Map connectionDurationSensorMap; + private final Sensor requestRateSensor; + private final Sensor errorRateSensor; + private final Map errorRateSensorMap; + private final Sensor requestSizeSensor; + private final Sensor responseSizeSensor; + private final Map responseSizeSensorMap; + private final Sensor responseCode2XX; + private final Sensor responseCode3XX; + private final Sensor responseCode4XX; + private final Sensor responseCode5XX; + private final Map rowsReturnedSensorMap; + private final Map rowsProcessedSensorMap; + private final Metrics metrics; + private final Map customMetricsTags; + private final String ksqlServicePrefix; + private final Time time; + + public ScalablePushQueryMetrics( + final String ksqlServiceId, + final Map customMetricsTags, + final Time time + ) { + + this.ksqlServicePrefix = ReservedInternalTopics.KSQL_INTERNAL_TOPIC_PREFIX; + final Map metricsTags = new HashMap<>(customMetricsTags); + metricsTags.put(KsqlConstants.KSQL_SERVICE_ID_METRICS_TAG, ksqlServiceId); + this.customMetricsTags = ImmutableMap.copyOf(metricsTags); + + this.time = Objects.requireNonNull(time, "time"); + this.metrics = MetricCollectors.getMetrics(); + this.sensors = new ArrayList<>(); + this.localRequestsSensor = configureLocalRequestsSensor(); + this.remoteRequestsSensor = configureRemoteRequestsSensor(); + this.connectionDurationSensor = configureConnectionDurationSensor(); + this.connectionDurationSensorMap = configureConnectionDurationSensorMap(); + this.requestRateSensor = configureRateSensor(); + this.errorRateSensor = configureErrorRateSensor(); + this.errorRateSensorMap = configureErrorSensorMap(); + this.requestSizeSensor = configureRequestSizeSensor(); + this.responseSizeSensor = configureResponseSizeSensor(); + this.responseSizeSensorMap = configureResponseSizeSensorMap(); + this.responseCode2XX = configureStatusCodeSensor("2XX"); + this.responseCode3XX = configureStatusCodeSensor("3XX"); + this.responseCode4XX = configureStatusCodeSensor("4XX"); + this.responseCode5XX = configureStatusCodeSensor("5XX"); + this.rowsReturnedSensorMap = configureRowsReturnedSensorMap(); + this.rowsProcessedSensorMap = configureRowsProcessedSensorMap(); + } + + @Override + public void close() { + sensors.forEach(sensor -> metrics.removeSensor(sensor.name())); + } + + public void recordLocalRequests(final double value) { + this.localRequestsSensor.record(value); + } + + public void recordRemoteRequests(final double value) { + this.remoteRequestsSensor.record(value); + } + + public void recordConnectionDuration( + final long startTimeNanos, + final QuerySourceType sourceType, + final RoutingNodeType routingNodeType + ) { + final MetricsKey key = new MetricsKey(sourceType, routingNodeType); + innerRecordConnectionDuration(startTimeNanos, key); + } + + public void recordConnectionDurationForError(final long startTimeNanos) { + final MetricsKey key = new MetricsKey(); + innerRecordConnectionDuration(startTimeNanos, key); + } + + private void innerRecordConnectionDuration(final long startTimeNanos, final MetricsKey key) { + // Record connection duration at microsecond scale + final long nowNanos = time.nanoseconds(); + final double connectionDuration = TimeUnit.NANOSECONDS.toMicros(nowNanos - startTimeNanos); + this.connectionDurationSensor.record(connectionDuration); + this.requestRateSensor.record(1); + if (connectionDurationSensorMap.containsKey(key)) { + connectionDurationSensorMap.get(key).record(connectionDuration); + } else { + throw new IllegalStateException("Metrics not configured correctly, missing " + key); + } + } + + public void recordErrorRate( + final double value, + final QuerySourceType sourceType, + final RoutingNodeType routingNodeType + ) { + this.errorRateSensor.record(value); + final MetricsKey key = new MetricsKey(sourceType, routingNodeType); + if (errorRateSensorMap.containsKey(key)) { + errorRateSensorMap.get(key).record(value); + } else { + throw new IllegalStateException("Metrics not configured correctly, missing " + key); + } + } + + public void recordErrorRateForNoResult(final double value) { + this.errorRateSensor.record(value); + final MetricsKey key = new MetricsKey(); + if (errorRateSensorMap.containsKey(key)) { + errorRateSensorMap.get(key).record(value); + } else { + throw new IllegalStateException("Metrics not configured correctly, missing " + key); + } + } + + public void recordRequestSize(final double value) { + this.requestSizeSensor.record(value); + } + + public void recordResponseSize( + final double value, + final QuerySourceType sourceType, + final RoutingNodeType routingNodeType + ) { + this.responseSizeSensor.record(value); + final MetricsKey key = new MetricsKey(sourceType, routingNodeType); + if (responseSizeSensorMap.containsKey(key)) { + responseSizeSensorMap.get(key).record(value); + } else { + throw new IllegalStateException("Metrics not configured correctly, missing " + key); + } + } + + public void recordResponseSizeForError(final long responseBytes) { + this.responseSizeSensor.record(responseBytes); + final MetricsKey key = new MetricsKey(); + if (responseSizeSensorMap.containsKey(key)) { + responseSizeSensorMap.get(key).record(responseBytes); + } else { + throw new IllegalStateException("Metrics not configured correctly, missing " + key); + } + } + + public void recordStatusCode(final int statusCode) { + if (statusCode >= 200 && statusCode < 300) { + responseCode2XX.record(1); + } else if (statusCode >= 300 && statusCode < 400) { + responseCode3XX.record(1); + } else if (statusCode >= 400 && statusCode < 500) { + responseCode4XX.record(1); + } else if (statusCode >= 500) { + responseCode5XX.record(1); + } + } + + public void recordRowsReturned( + final double value, + final QuerySourceType sourceType, + final RoutingNodeType routingNodeType + ) { + final MetricsKey key = new MetricsKey(sourceType, routingNodeType); + if (rowsReturnedSensorMap.containsKey(key)) { + rowsReturnedSensorMap.get(key).record(value); + } else { + throw new IllegalStateException("Metrics not configured correctly, missing " + key); + } + } + + public void recordZeroRowsReturnedForError() { + final MetricsKey key = new MetricsKey(); + if (rowsReturnedSensorMap.containsKey(key)) { + rowsReturnedSensorMap.get(key).record(0); + } else { + throw new IllegalStateException("Metrics not configured correctly, missing " + key); + } + } + + public void recordRowsProcessed( + final double value, + final QuerySourceType sourceType, + final RoutingNodeType routingNodeType + ) { + final MetricsKey key = new MetricsKey(sourceType, routingNodeType); + if (rowsProcessedSensorMap.containsKey(key)) { + rowsProcessedSensorMap.get(key).record(value); + } else { + throw new IllegalStateException("Metrics not configured correctly, missing " + key); + } + } + + public void recordZeroRowsProcessedForError() { + final MetricsKey key = new MetricsKey(); + if (rowsProcessedSensorMap.containsKey(key)) { + rowsProcessedSensorMap.get(key).record(0); + } else { + throw new IllegalStateException("Metrics not configured correctly, missing " + key); + } + } + + public List getSensors() { + return Collections.unmodifiableList(sensors); + } + + @SuppressFBWarnings(value = "EI_EXPOSE_REP", justification = "should be mutable") + public Metrics getMetrics() { + return metrics; + } + + private Sensor configureLocalRequestsSensor() { + final Sensor sensor = metrics.sensor( + SCALABLE_PUSH_QUERY_METRIC_GROUP + "-" + SCALABLE_PUSH_REQUESTS + "-local"); + + // new metrics with ksql service id in tags + addSensor( + sensor, + SCALABLE_PUSH_REQUESTS + "-local-count", + ksqlServicePrefix + SCALABLE_PUSH_QUERY_METRIC_GROUP, + "Count of local scalable push query requests", + customMetricsTags, + new CumulativeCount() + ); + addSensor( + sensor, + SCALABLE_PUSH_REQUESTS + "-local-rate", + ksqlServicePrefix + SCALABLE_PUSH_QUERY_METRIC_GROUP, + "Rate of local scalable push query requests", + customMetricsTags, + new Rate() + ); + sensors.add(sensor); + return sensor; + } + + private Sensor configureRemoteRequestsSensor() { + final Sensor sensor = metrics.sensor( + SCALABLE_PUSH_QUERY_METRIC_GROUP + "-" + SCALABLE_PUSH_REQUESTS + "-remote"); + + // new metrics with ksql service in tags + addSensor( + sensor, + SCALABLE_PUSH_REQUESTS + "-remote-count", + ksqlServicePrefix + SCALABLE_PUSH_QUERY_METRIC_GROUP, + "Count of remote scalable push query requests", + customMetricsTags, + new CumulativeCount() + ); + addSensor( + sensor, + SCALABLE_PUSH_REQUESTS + "-remote-rate", + ksqlServicePrefix + SCALABLE_PUSH_QUERY_METRIC_GROUP, + "Rate of remote scalable push query requests", + customMetricsTags, + new Rate() + ); + + sensors.add(sensor); + return sensor; + } + + private Sensor configureRateSensor() { + final Sensor sensor = metrics.sensor( + SCALABLE_PUSH_QUERY_METRIC_GROUP + "-" + SCALABLE_PUSH_REQUESTS + "-rate"); + + // new metrics with ksql service id in tags + addSensor( + sensor, + SCALABLE_PUSH_REQUESTS + "-rate", + ksqlServicePrefix + SCALABLE_PUSH_QUERY_METRIC_GROUP, + "Rate of pull query requests", + customMetricsTags, + new Rate() + ); + + sensors.add(sensor); + return sensor; + } + + private Sensor configureErrorRateSensor() { + final Sensor sensor = metrics.sensor( + SCALABLE_PUSH_QUERY_METRIC_GROUP + "-" + SCALABLE_PUSH_REQUESTS + "-error-rate"); + + // new metrics with ksql service id in tags + addSensor( + sensor, + SCALABLE_PUSH_REQUESTS + "-error-rate", + ksqlServicePrefix + SCALABLE_PUSH_QUERY_METRIC_GROUP, + "Rate of erroneous scalable push query requests", + customMetricsTags, + new Rate() + ); + addSensor( + sensor, + SCALABLE_PUSH_REQUESTS + "-error-total", + ksqlServicePrefix + SCALABLE_PUSH_QUERY_METRIC_GROUP, + "Total number of erroneous scalable push query requests", + customMetricsTags, + new CumulativeCount() + ); + + sensors.add(sensor); + return sensor; + } + + private Map configureErrorSensorMap() { + return configureSensorMap("error", (sensor, tags, variantName) -> { + addSensor( + sensor, + SCALABLE_PUSH_REQUESTS + "-detailed-error-total", + ksqlServicePrefix + SCALABLE_PUSH_QUERY_METRIC_GROUP, + "Total number of erroneous scalable push query requests - " + variantName, + tags, + new CumulativeCount() + ); + }); + } + + private Sensor configureStatusCodeSensor(final String codeName) { + final Sensor sensor = metrics.sensor( + SCALABLE_PUSH_QUERY_METRIC_GROUP + "-" + + SCALABLE_PUSH_REQUESTS + "-" + codeName + "-total"); + addSensor( + sensor, + SCALABLE_PUSH_REQUESTS + "-" + codeName + "-total", + ksqlServicePrefix + SCALABLE_PUSH_QUERY_METRIC_GROUP, + "Total number of status code " + codeName + " responses", + customMetricsTags, + new CumulativeCount() + ); + + sensors.add(sensor); + return sensor; + } + + private Sensor configureConnectionDurationSensor() { + final Sensor sensor = metrics.sensor( + SCALABLE_PUSH_QUERY_METRIC_GROUP + "-" + + SCALABLE_PUSH_REQUESTS + "-connection-duration"); + + // New metrics + addRequestMetricsToSensor( + sensor, ksqlServicePrefix, SCALABLE_PUSH_REQUESTS, customMetricsTags, ""); + + sensors.add(sensor); + return sensor; + } + + private Map configureConnectionDurationSensorMap() { + return configureSensorMap("connection-duration", (sensor, tags, variantName) -> { + addRequestMetricsToSensor( + sensor, ksqlServicePrefix, SCALABLE_PUSH_REQUESTS + "-detailed", + tags, " - " + variantName); + }); + } + + private void addRequestMetricsToSensor( + final Sensor sensor, + final String servicePrefix, + final String metricNamePrefix, + final Map metricsTags, + final String descriptionSuffix + ) { + addSensor( + sensor, + metricNamePrefix + "-connection-duration-avg", + servicePrefix + SCALABLE_PUSH_QUERY_METRIC_GROUP, + "Average time for a scalable push query request" + descriptionSuffix, + metricsTags, + new Avg() + ); + addSensor( + sensor, + metricNamePrefix + "-connection-duration-max", + servicePrefix + SCALABLE_PUSH_QUERY_METRIC_GROUP, + "Max time for a scalable push query request" + descriptionSuffix, + metricsTags, + new Max() + ); + addSensor( + sensor, + metricNamePrefix + "-connection-duration-min", + servicePrefix + SCALABLE_PUSH_QUERY_METRIC_GROUP, + "Min time for a scalable push query request" + descriptionSuffix, + metricsTags, + new Min() + ); + addSensor( + sensor, + metricNamePrefix + "-total", + servicePrefix + SCALABLE_PUSH_QUERY_METRIC_GROUP, + "Total number of scalable push query requests" + descriptionSuffix, + metricsTags, + new CumulativeCount() + ); + + sensor.add(new Percentiles( + 4 * NUM_LATENCY_BUCKETS, + MAX_LATENCY_BUCKET_VALUE_MICROS, + BucketSizing.LINEAR, + new Percentile(metrics.metricName( + metricNamePrefix + "-distribution-50", + servicePrefix + SCALABLE_PUSH_QUERY_METRIC_GROUP, + "Connection duration distribution" + descriptionSuffix, + metricsTags + ), 50.0), + new Percentile(metrics.metricName( + metricNamePrefix + "-distribution-75", + servicePrefix + SCALABLE_PUSH_QUERY_METRIC_GROUP, + "Connection duration distribution" + descriptionSuffix, + metricsTags + ), 75.0), + new Percentile(metrics.metricName( + metricNamePrefix + "-distribution-90", + servicePrefix + SCALABLE_PUSH_QUERY_METRIC_GROUP, + "Connection duration distribution" + descriptionSuffix, + metricsTags + ), 90.0), + new Percentile(metrics.metricName( + metricNamePrefix + "-distribution-99", + servicePrefix + SCALABLE_PUSH_QUERY_METRIC_GROUP, + "Connection duration distribution" + descriptionSuffix, + metricsTags + ), 99.0) + )); + } + + private Sensor configureRequestSizeSensor() { + final Sensor sensor = metrics.sensor( + SCALABLE_PUSH_QUERY_METRIC_GROUP + "-" + SCALABLE_PUSH_REQUESTS + "-request-size"); + + // new metrics with ksql service id in tags + addSensor( + sensor, + SCALABLE_PUSH_REQUESTS + "-request-size", + ksqlServicePrefix + SCALABLE_PUSH_QUERY_METRIC_GROUP, + "Size in bytes of scalable push query request", + customMetricsTags, + new CumulativeSum() + ); + + sensors.add(sensor); + return sensor; + } + + private Sensor configureResponseSizeSensor() { + final Sensor sensor = metrics.sensor( + SCALABLE_PUSH_QUERY_METRIC_GROUP + "-" + SCALABLE_PUSH_REQUESTS + "-response-size"); + + // new metrics with ksql service id in tags + addSensor( + sensor, + SCALABLE_PUSH_REQUESTS + "-response-size", + ksqlServicePrefix + SCALABLE_PUSH_QUERY_METRIC_GROUP, + "Size in bytes of scalable push query response", + customMetricsTags, + new CumulativeSum() + ); + + sensors.add(sensor); + return sensor; + } + + private Map configureResponseSizeSensorMap() { + return configureSensorMap("response-size", (sensor, tags, variantName) -> { + addSensor( + sensor, + SCALABLE_PUSH_REQUESTS + "-detailed-response-size", + ksqlServicePrefix + SCALABLE_PUSH_QUERY_METRIC_GROUP, + "Size in bytes of scalable push query response - " + variantName, + tags, + new CumulativeSum() + ); + }); + } + + private Map configureRowsReturnedSensorMap() { + return configureSensorMap("rows-returned", (sensor, tags, variantName) -> { + addSensor( + sensor, + SCALABLE_PUSH_REQUESTS + "-rows-returned-total", + ksqlServicePrefix + SCALABLE_PUSH_QUERY_METRIC_GROUP, + "Number of rows returned - " + variantName, + tags, + new CumulativeSum() + ); + }); + } + + private Map configureRowsProcessedSensorMap() { + return configureSensorMap("rows-processed", (sensor, tags, variantName) -> { + addSensor( + sensor, + SCALABLE_PUSH_REQUESTS + "-rows-processed-total", + ksqlServicePrefix + SCALABLE_PUSH_QUERY_METRIC_GROUP, + "Number of rows processed -" + variantName, + tags, + new CumulativeSum() + ); + }); + } + + private void addSensor( + final Sensor sensor, + final String metricName, + final String groupName, + final String description, + final Map metricsTags, + final MeasurableStat measurableStat + ) { + sensor.add( + metrics.metricName( + metricName, + groupName, + description, + metricsTags + ), + measurableStat + ); + } + + private Map configureSensorMap( + final String sensorBaseName, final MetricsAdder metricsAdder) { + final ImmutableMap.Builder builder = ImmutableMap.builder(); + + for (final QuerySourceType sourceType : QuerySourceType.values()) { + for (final RoutingNodeType routingNodeType : RoutingNodeType.values()) { + addSensorToMap( + sensorBaseName, + metricsAdder, + builder, + new MetricsKey(sourceType, routingNodeType) + ); + } + } + + // Add one more sensor for collecting metrics when there is no response + addSensorToMap(sensorBaseName, metricsAdder, builder, new MetricsKey()); + + return builder.build(); + } + + private void addSensorToMap(final String sensorBaseName, final MetricsAdder metricsAdder, + final Builder builder, final MetricsKey metricsKey) { + final String variantName = metricsKey.variantName(); + final Sensor sensor = metrics.sensor( + SCALABLE_PUSH_QUERY_METRIC_GROUP + "-" + + SCALABLE_PUSH_REQUESTS + "-" + + sensorBaseName + "-" + + variantName); + + final ImmutableMap tags = ImmutableMap.builder() + .putAll(customMetricsTags) + .put(KsqlConstants.KSQL_QUERY_SOURCE_TAG, metricsKey.sourceTypeName()) + .put(KsqlConstants.KSQL_QUERY_ROUTING_TYPE_TAG, metricsKey.routingNodeTypeName()) + .build(); + + metricsAdder.addMetrics(sensor, tags, variantName); + + builder.put( + metricsKey, + sensor + ); + sensors.add(sensor); + } + + private interface MetricsAdder { + + void addMetrics(Sensor sensor, Map tags, String variantName); + } + + // Detailed metrics are broken down by multiple parameters represented by the following key. + private static class MetricsKey { + + private final QuerySourceType sourceType; + private final RoutingNodeType routingNodeType; + + /** + * Constructor representing an "unknown key" for situations in which we record metrics for an + * API call that didn't have a result (because it had an error instead). + */ + MetricsKey() { + this.sourceType = null; + this.routingNodeType = null; + } + + MetricsKey( + final QuerySourceType sourceType, + final RoutingNodeType routingNodeType + ) { + this.sourceType = Objects.requireNonNull(sourceType, "sourceType"); + this.routingNodeType = Objects.requireNonNull(routingNodeType, "routingNodeType"); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final MetricsKey key = (MetricsKey) o; + return Objects.equals(sourceType, key.sourceType) + && Objects.equals(routingNodeType, key.routingNodeType); + } + + @Override + public int hashCode() { + return Objects.hash(sourceType, routingNodeType); + } + + @Override + public String toString() { + return "MetricsKey{" + + "sourceType=" + sourceType + + ", routingNodeType=" + routingNodeType + + '}'; + } + + public String variantName() { + return sourceTypeName() + "-" + + routingNodeTypeName(); + } + + public String sourceTypeName() { + return getName(sourceType); + } + + public String routingNodeTypeName() { + return getName(routingNodeType); + } + + private String getName(final Enum o) { + if (o == null) { + return "unknown"; + } else { + return o.name().toLowerCase(); + } + } + } +} + diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/physical/pull/PullPhysicalPlan.java b/ksqldb-engine/src/main/java/io/confluent/ksql/physical/pull/PullPhysicalPlan.java index 6a28a0b8e96e..b0b6c4c4b04e 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/physical/pull/PullPhysicalPlan.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/physical/pull/PullPhysicalPlan.java @@ -27,6 +27,7 @@ import io.confluent.ksql.query.PullQueryQueue; import io.confluent.ksql.query.QueryId; import io.confluent.ksql.schema.ksql.LogicalSchema; +import io.confluent.ksql.util.KsqlConstants.QuerySourceType; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -49,7 +50,7 @@ public class PullPhysicalPlan { private final QueryId queryId; private final List lookupConstraints; private final PullPhysicalPlanType pullPhysicalPlanType; - private final PullSourceType pullSourceType; + private final QuerySourceType querySourceType; private final Materialization mat; private final DataSourceOperator dataSourceOperator; @@ -59,7 +60,7 @@ public PullPhysicalPlan( final QueryId queryId, final List lookupConstraints, final PullPhysicalPlanType pullPhysicalPlanType, - final PullSourceType pullSourceType, + final QuerySourceType querySourceType, final Materialization mat, final DataSourceOperator dataSourceOperator ) { @@ -69,7 +70,7 @@ public PullPhysicalPlan( this.lookupConstraints = Objects.requireNonNull(lookupConstraints, "lookupConstraints"); this.pullPhysicalPlanType = Objects.requireNonNull(pullPhysicalPlanType, "pullPhysicalPlanType"); - this.pullSourceType = Objects.requireNonNull(pullSourceType, "pullSourceType"); + this.querySourceType = Objects.requireNonNull(querySourceType, "pullSourceType"); this.mat = Objects.requireNonNull(mat, "mat"); this.dataSourceOperator = Objects.requireNonNull( dataSourceOperator, "dataSourceOperator"); @@ -144,8 +145,8 @@ public PullPhysicalPlanType getPlanType() { return pullPhysicalPlanType; } - public PullSourceType getSourceType() { - return pullSourceType; + public QuerySourceType getSourceType() { + return querySourceType; } public long getRowsReadFromDataSource() { @@ -167,24 +168,4 @@ public enum PullPhysicalPlanType { TABLE_SCAN, UNKNOWN } - - /** - * The types we consider for metrics purposes. These should only be added to. You can deprecate - * a field, but don't delete it or change its meaning - */ - public enum PullSourceType { - NON_WINDOWED, - WINDOWED, - NON_WINDOWED_STREAM, - WINDOWED_STREAM - } - - /** - * The types we consider for metrics purposes. These should only be added to. You can deprecate - * a field, but don't delete it or change its meaning - */ - public enum RoutingNodeType { - SOURCE_NODE, - REMOTE_NODE - } } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/physical/pull/PullPhysicalPlanBuilder.java b/ksqldb-engine/src/main/java/io/confluent/ksql/physical/pull/PullPhysicalPlanBuilder.java index 50fc0107e4d9..ca460f1319d2 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/physical/pull/PullPhysicalPlanBuilder.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/physical/pull/PullPhysicalPlanBuilder.java @@ -29,7 +29,6 @@ import io.confluent.ksql.physical.common.operators.ProjectOperator; import io.confluent.ksql.physical.common.operators.SelectOperator; import io.confluent.ksql.physical.pull.PullPhysicalPlan.PullPhysicalPlanType; -import io.confluent.ksql.physical.pull.PullPhysicalPlan.PullSourceType; import io.confluent.ksql.physical.pull.operators.DataSourceOperator; import io.confluent.ksql.physical.pull.operators.KeyedTableLookupOperator; import io.confluent.ksql.physical.pull.operators.KeyedWindowedTableLookupOperator; @@ -48,6 +47,7 @@ import io.confluent.ksql.planner.plan.QueryFilterNode; import io.confluent.ksql.planner.plan.QueryProjectNode; import io.confluent.ksql.query.QueryId; +import io.confluent.ksql.util.KsqlConstants.QuerySourceType; import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.PersistentQueryMetadata; import java.util.Collections; @@ -75,7 +75,7 @@ public class PullPhysicalPlanBuilder { private List lookupConstraints; private PullPhysicalPlanType pullPhysicalPlanType; - private PullSourceType pullSourceType; + private QuerySourceType querySourceType; private boolean seenSelectOperator = false; public PullPhysicalPlanBuilder( @@ -160,7 +160,7 @@ public PullPhysicalPlan buildPullPhysicalPlan(final LogicalPlanNode logicalPlanN queryId, lookupConstraints, pullPhysicalPlanType, - pullSourceType, + querySourceType, mat, dataSourceOperator); } @@ -228,8 +228,8 @@ private AbstractPhysicalOperator translateDataSourceNode( } } - pullSourceType = logicalNode.isWindowed() - ? PullSourceType.WINDOWED : PullSourceType.NON_WINDOWED; + querySourceType = logicalNode.isWindowed() + ? QuerySourceType.WINDOWED : QuerySourceType.NON_WINDOWED; if (pullPhysicalPlanType == PullPhysicalPlanType.TABLE_SCAN) { if (!logicalNode.isWindowed()) { return new TableScanOperator(mat, logicalNode, shouldCancelOperations); diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/physical/pull/PullQueryResult.java b/ksqldb-engine/src/main/java/io/confluent/ksql/physical/pull/PullQueryResult.java index 27241d3a1dc0..f35bb1fcbb41 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/physical/pull/PullQueryResult.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/physical/pull/PullQueryResult.java @@ -19,11 +19,11 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.confluent.ksql.internal.PullQueryExecutorMetrics; import io.confluent.ksql.physical.pull.PullPhysicalPlan.PullPhysicalPlanType; -import io.confluent.ksql.physical.pull.PullPhysicalPlan.PullSourceType; -import io.confluent.ksql.physical.pull.PullPhysicalPlan.RoutingNodeType; import io.confluent.ksql.query.PullQueryQueue; import io.confluent.ksql.query.QueryId; import io.confluent.ksql.schema.ksql.LogicalSchema; +import io.confluent.ksql.util.KsqlConstants.QuerySourceType; +import io.confluent.ksql.util.KsqlConstants.RoutingNodeType; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.function.BiConsumer; @@ -40,7 +40,7 @@ public class PullQueryResult { private final QueryId queryId; private final PullQueryQueue pullQueryQueue; private final Optional pullQueryMetrics; - private final PullSourceType sourceType; + private final QuerySourceType sourceType; private final PullPhysicalPlanType planType; private final RoutingNodeType routingNodeType; private final Supplier rowsProcessedSupplier; @@ -59,7 +59,7 @@ public PullQueryResult( final QueryId queryId, final PullQueryQueue pullQueryQueue, final Optional pullQueryMetrics, - final PullSourceType sourceType, + final QuerySourceType sourceType, final PullPhysicalPlanType planType, final RoutingNodeType routingNodeType, final Supplier rowsProcessedSupplier, @@ -129,7 +129,7 @@ public void onException(final Consumer consumer) { } public void onCompletion(final Consumer consumer) { - future.thenAccept(consumer::accept); + future.thenAccept(consumer); } public void onCompletionOrException(final BiConsumer biConsumer) { @@ -139,7 +139,7 @@ public void onCompletionOrException(final BiConsumer biConsumer }); } - public PullSourceType getSourceType() { + public QuerySourceType getSourceType() { return sourceType; } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/PushPhysicalPlan.java b/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/PushPhysicalPlan.java index b6b88412c927..675dbba8530b 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/PushPhysicalPlan.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/PushPhysicalPlan.java @@ -22,6 +22,7 @@ import io.confluent.ksql.query.QueryId; import io.confluent.ksql.reactive.BufferedPublisher; import io.confluent.ksql.schema.ksql.LogicalSchema; +import io.confluent.ksql.util.KsqlConstants.QuerySourceType; import io.confluent.ksql.util.VertxUtils; import io.vertx.core.Context; import java.util.List; @@ -49,6 +50,7 @@ public class PushPhysicalPlan { private final ScalablePushRegistry scalablePushRegistry; private final PushDataSourceOperator dataSourceOperator; private final Context context; + private final QuerySourceType querySourceType; private volatile boolean closed = false; private long timer = -1; @@ -59,7 +61,8 @@ public PushPhysicalPlan( final QueryId queryId, final ScalablePushRegistry scalablePushRegistry, final PushDataSourceOperator dataSourceOperator, - final Context context + final Context context, + final QuerySourceType querySourceType ) { this.root = Objects.requireNonNull(root, "root"); this.schema = Objects.requireNonNull(schema, "schema"); @@ -68,6 +71,7 @@ public PushPhysicalPlan( Objects.requireNonNull(scalablePushRegistry, "scalablePushRegistry"); this.dataSourceOperator = dataSourceOperator; this.context = context; + this.querySourceType = Objects.requireNonNull(querySourceType, "querySourceType"); } public BufferedPublisher> execute() { @@ -166,6 +170,14 @@ public Context getContext() { return context; } + public QuerySourceType getSourceType() { + return querySourceType; + } + + public long getRowsReadFromDataSource() { + return dataSourceOperator.getRowsReadCount(); + } + public static class Publisher extends BufferedPublisher> { public Publisher(final Context ctx) { diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/PushPhysicalPlanBuilder.java b/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/PushPhysicalPlanBuilder.java index 0b2cae8c1b71..f23228a9b7ef 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/PushPhysicalPlanBuilder.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/PushPhysicalPlanBuilder.java @@ -33,6 +33,7 @@ import io.confluent.ksql.planner.plan.QueryFilterNode; import io.confluent.ksql.planner.plan.QueryProjectNode; import io.confluent.ksql.query.QueryId; +import io.confluent.ksql.util.KsqlConstants.QuerySourceType; import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.PersistentQueryMetadata; import io.vertx.core.Context; @@ -53,6 +54,8 @@ public class PushPhysicalPlanBuilder { private final Stacker contextStacker; private final QueryId queryId; + private QuerySourceType querySourceType; + public PushPhysicalPlanBuilder( final ProcessingLogContext processingLogContext, final PersistentQueryMetadata persistentQueryMetadata, @@ -130,7 +133,8 @@ public PushPhysicalPlan buildPushPhysicalPlan( queryId, dataSourceOperator.getScalablePushRegistry(), dataSourceOperator, - context); + context, + querySourceType); } private ProjectOperator translateProjectNode(final QueryProjectNode logicalNode) { @@ -163,6 +167,9 @@ private AbstractPhysicalOperator translateDataSourceNode( final ScalablePushRegistry scalablePushRegistry = persistentQueryMetadata.getScalablePushRegistry() .orElseThrow(() -> new IllegalStateException("Scalable push registry cannot be found")); + + querySourceType = logicalNode.isWindowed() + ? QuerySourceType.WINDOWED : QuerySourceType.NON_WINDOWED; return new PeekStreamOperator(scalablePushRegistry, logicalNode, queryId, expectingStartOfRegistryData); } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/PushRouting.java b/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/PushRouting.java index 21a47f5f2768..168312352138 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/PushRouting.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/PushRouting.java @@ -24,6 +24,7 @@ import com.google.common.collect.Sets; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.confluent.ksql.GenericRow; +import io.confluent.ksql.internal.ScalablePushQueryMetrics; import io.confluent.ksql.parser.tree.Query; import io.confluent.ksql.physical.scalablepush.locator.PushLocator.KsqlNode; import io.confluent.ksql.query.QueryId; @@ -100,19 +101,20 @@ public CompletableFuture handlePushQuery( final ConfiguredStatement statement, final PushRoutingOptions pushRoutingOptions, final LogicalSchema outputSchema, - final TransientQueryQueue transientQueryQueue + final TransientQueryQueue transientQueryQueue, + final Optional scalablePushQueryMetrics ) { final Set hosts = getInitialHosts(pushPhysicalPlan, statement, pushRoutingOptions); final PushConnectionsHandle pushConnectionsHandle = new PushConnectionsHandle(); // Returns a future with the handle once the initial connection is made final CompletableFuture result = connectToHosts( - serviceContext, pushPhysicalPlan, statement, hosts, outputSchema, - transientQueryQueue, pushConnectionsHandle, false); + serviceContext, pushPhysicalPlan, statement, hosts, outputSchema, transientQueryQueue, + pushConnectionsHandle, false, scalablePushQueryMetrics); // Only check for new nodes if this is the source node if (backgroundRetries && !pushRoutingOptions.getHasBeenForwarded()) { checkForNewHostsOnContext(serviceContext, pushPhysicalPlan, statement, hosts, outputSchema, - transientQueryQueue, pushConnectionsHandle); + transientQueryQueue, pushConnectionsHandle, scalablePushQueryMetrics); } return result; } @@ -158,7 +160,8 @@ private CompletableFuture connectToHosts( final LogicalSchema outputSchema, final TransientQueryQueue transientQueryQueue, final PushConnectionsHandle pushConnectionsHandle, - final boolean dynamicallyAddedNode + final boolean dynamicallyAddedNode, + final Optional scalablePushQueryMetrics ) { final Map> futureMap = new LinkedHashMap<>(); for (final KsqlNode node : hosts) { @@ -180,7 +183,7 @@ private CompletableFuture connectToHosts( }); futureMap.put(node, executeOrRouteQuery( node, statement, serviceContext, pushPhysicalPlan, outputSchema, - transientQueryQueue, callback, dynamicallyAddedNode)); + transientQueryQueue, callback, dynamicallyAddedNode, scalablePushQueryMetrics)); } return CompletableFuture.allOf(futureMap.values().toArray(new CompletableFuture[0])) .thenApply(v -> { @@ -237,12 +240,15 @@ static CompletableFuture executeOrRouteQuery( final LogicalSchema outputSchema, final TransientQueryQueue transientQueryQueue, final CompletableFuture callback, - final boolean dynamicallyAddedNode + final boolean dynamicallyAddedNode, + final Optional scalablePushQueryMetrics ) { if (node.isLocal()) { LOG.debug("Query {} id {} executed locally at host {} at timestamp {}.", statement.getStatementText(), pushPhysicalPlan.getQueryId(), node.location(), System.currentTimeMillis()); + scalablePushQueryMetrics + .ifPresent(metrics -> metrics.recordLocalRequests(1)); final AtomicReference>> publisherRef = new AtomicReference<>(null); return CompletableFuture.completedFuture(null) @@ -273,6 +279,8 @@ static CompletableFuture executeOrRouteQuery( } else { LOG.debug("Query {} routed to host {} at timestamp {}.", statement.getStatementText(), node.location(), System.currentTimeMillis()); + scalablePushQueryMetrics + .ifPresent(metrics -> metrics.recordRemoteRequests(1)); final AtomicReference> publisherRef = new AtomicReference<>(null); final CompletableFuture> publisherFuture @@ -339,11 +347,12 @@ private void checkForNewHostsOnContext( final Set hosts, final LogicalSchema outputSchema, final TransientQueryQueue transientQueryQueue, - final PushConnectionsHandle pushConnectionsHandle + final PushConnectionsHandle pushConnectionsHandle, + final Optional scalablePushQueryMetrics ) { pushPhysicalPlan.getContext().runOnContext(v -> checkForNewHosts(serviceContext, pushPhysicalPlan, statement, outputSchema, - transientQueryQueue, pushConnectionsHandle)); + transientQueryQueue, pushConnectionsHandle, scalablePushQueryMetrics)); } private void checkForNewHosts( @@ -352,7 +361,8 @@ private void checkForNewHosts( final ConfiguredStatement statement, final LogicalSchema outputSchema, final TransientQueryQueue transientQueryQueue, - final PushConnectionsHandle pushConnectionsHandle + final PushConnectionsHandle pushConnectionsHandle, + final Optional scalablePushQueryMetrics ) { VertxUtils.checkContext(pushPhysicalPlan.getContext()); if (pushConnectionsHandle.isClosed()) { @@ -372,7 +382,8 @@ private void checkForNewHosts( if (newHosts.size() > 0) { LOG.info("Dynamically adding new hosts {} for {}", newHosts, pushPhysicalPlan.getQueryId()); connectToHosts(serviceContext, pushPhysicalPlan, statement, newHosts, outputSchema, - transientQueryQueue, pushConnectionsHandle, true); + transientQueryQueue, pushConnectionsHandle, true, + scalablePushQueryMetrics); } if (removedHosts.size() > 0) { LOG.info("Dynamically removing hosts {} for {}", removedHosts, pushPhysicalPlan.getQueryId()); @@ -384,7 +395,7 @@ private void checkForNewHosts( } pushPhysicalPlan.getContext().owner().setTimer(clusterCheckInterval, timerId -> checkForNewHosts(serviceContext, pushPhysicalPlan, statement, outputSchema, - transientQueryQueue, pushConnectionsHandle)); + transientQueryQueue, pushConnectionsHandle, scalablePushQueryMetrics)); } private static Set loadCurrentHosts(final ScalablePushRegistry scalablePushRegistry) { diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/PushRoutingOptions.java b/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/PushRoutingOptions.java index 9e65f6386d55..f998ab6aff78 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/PushRoutingOptions.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/PushRoutingOptions.java @@ -26,4 +26,17 @@ public interface PushRoutingOptions { // When a rebalance occurs and we connect to a new node, we don't want to miss anything, so we // set this flag indicating we should error if this expectation isn't met. boolean getExpectingStartOfRegistryData(); + + boolean getIsDebugRequest(); + + /** + * @return a human readable representation of the routing options, used + * to debug requests + */ + default String debugString() { + return "PushRoutingOptions{" + + "getHasBeenForwarded: " + getHasBeenForwarded() + + ", isDebugRequest: " + getIsDebugRequest() + + "}"; + } } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/operators/PeekStreamOperator.java b/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/operators/PeekStreamOperator.java index d6b128f54e0a..e5e2c5ddcb34 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/operators/PeekStreamOperator.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/operators/PeekStreamOperator.java @@ -34,6 +34,8 @@ public class PeekStreamOperator extends AbstractPhysicalOperator implements Push private final ProcessingQueue processingQueue; private final boolean expectingStartOfRegistryData; + private long rowsRead = 0; + public PeekStreamOperator( final ScalablePushRegistry scalablePushRegistry, final DataSourceNode logicalNode, @@ -53,6 +55,7 @@ public void open() { @Override public Object next() { + rowsRead++; return processingQueue.poll(); } @@ -101,4 +104,9 @@ public boolean droppedRows() { public boolean hasError() { return processingQueue.getHasError(); } + + @Override + public long getRowsReadCount() { + return rowsRead; + } } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/operators/PushDataSourceOperator.java b/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/operators/PushDataSourceOperator.java index ba2fb8ae2e61..454deb1d5ecc 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/operators/PushDataSourceOperator.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/operators/PushDataSourceOperator.java @@ -30,5 +30,9 @@ public interface PushDataSourceOperator { // If rows have been dropped. boolean droppedRows(); + // If an error has occurred. boolean hasError(); + + // Number of rows read + long getRowsReadCount(); } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/util/ScalablePushQueryMetadata.java b/ksqldb-engine/src/main/java/io/confluent/ksql/util/ScalablePushQueryMetadata.java index 568fcd82f6e8..3ad2e470fdaa 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/util/ScalablePushQueryMetadata.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/util/ScalablePushQueryMetadata.java @@ -16,6 +16,7 @@ package io.confluent.ksql.util; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import io.confluent.ksql.internal.ScalablePushQueryMetrics; import io.confluent.ksql.physical.scalablepush.PushQueryPreparer; import io.confluent.ksql.physical.scalablepush.PushQueryQueuePopulator; import io.confluent.ksql.physical.scalablepush.PushRouting.PushConnectionsHandle; @@ -24,8 +25,13 @@ import io.confluent.ksql.query.LimitHandler; import io.confluent.ksql.query.QueryId; import io.confluent.ksql.schema.ksql.LogicalSchema; +import io.confluent.ksql.util.KsqlConstants.QuerySourceType; +import io.confluent.ksql.util.KsqlConstants.RoutingNodeType; +import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.function.BiConsumer; import java.util.function.Consumer; +import java.util.function.Supplier; import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler; public class ScalablePushQueryMetadata implements PushQueryMetadata { @@ -34,9 +40,14 @@ public class ScalablePushQueryMetadata implements PushQueryMetadata { private final LogicalSchema logicalSchema; private final QueryId queryId; private final BlockingRowQueue rowQueue; + private final Optional scalablePushQueryMetrics; private final ResultType resultType; private final PushQueryQueuePopulator pushQueryQueuePopulator; private final PushQueryPreparer pushQueryPreparer; + private final QuerySourceType sourceType; + private final RoutingNodeType routingNodeType; + private final Supplier rowsProcessedSupplier; + // Future for the start of the connections, which creates a handle private CompletableFuture startFuture = new CompletableFuture<>(); @@ -49,16 +60,25 @@ public ScalablePushQueryMetadata( final LogicalSchema logicalSchema, final QueryId queryId, final BlockingRowQueue blockingRowQueue, + final Optional scalablePushQueryMetrics, final ResultType resultType, final PushQueryQueuePopulator pushQueryQueuePopulator, - final PushQueryPreparer pushQueryPreparer + final PushQueryPreparer pushQueryPreparer, + final QuerySourceType sourceType, + final RoutingNodeType routingNodeType, + final Supplier rowsProcessedSupplier ) { this.logicalSchema = logicalSchema; this.queryId = queryId; this.rowQueue = blockingRowQueue; + this.scalablePushQueryMetrics = scalablePushQueryMetrics; this.resultType = resultType; this.pushQueryQueuePopulator = pushQueryQueuePopulator; this.pushQueryPreparer = pushQueryPreparer; + this.sourceType = sourceType; + this.routingNodeType = routingNodeType; + this.rowsProcessedSupplier = rowsProcessedSupplier; + } /** @@ -139,8 +159,37 @@ public ResultType getResultType() { public void onException(final Consumer consumer) { runningFuture.exceptionally(t -> { + scalablePushQueryMetrics.ifPresent(metrics -> + metrics.recordErrorRate(1, sourceType, routingNodeType)); consumer.accept(t); return null; }); } + + public void onCompletion(final Consumer consumer) { + runningFuture.thenAccept(consumer); + } + + public void onCompletionOrException(final BiConsumer biConsumer) { + runningFuture.handle((v, t) -> { + biConsumer.accept(v, t); + return null; + }); + } + + public QuerySourceType getSourceType() { + return sourceType; + } + + public RoutingNodeType getRoutingNodeType() { + return routingNodeType; + } + + public long getTotalRowsReturned() { + return rowQueue.size(); + } + + public long getTotalRowsProcessed() { + return rowsProcessedSupplier.get(); + } } diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/physical/scalablepush/PushPhysicalPlanTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/physical/scalablepush/PushPhysicalPlanTest.java index 5285bec72d97..57adf0864b47 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/physical/scalablepush/PushPhysicalPlanTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/physical/scalablepush/PushPhysicalPlanTest.java @@ -14,6 +14,7 @@ import io.confluent.ksql.query.QueryId; import io.confluent.ksql.reactive.BufferedPublisher; import io.confluent.ksql.schema.ksql.LogicalSchema; +import io.confluent.ksql.util.KsqlConstants.QuerySourceType; import io.vertx.core.Context; import io.vertx.core.Vertx; import java.util.ArrayList; @@ -45,6 +46,8 @@ public class PushPhysicalPlanTest { private ScalablePushRegistry scalablePushRegistry; @Mock private PushDataSourceOperator pushDataSourceOperator; + @Mock + private QuerySourceType querySourceType; @Captor private ArgumentCaptor runnableCaptor; @@ -67,7 +70,7 @@ public void tearDown() { @Test public void shouldPublishRows() throws InterruptedException { final PushPhysicalPlan pushPhysicalPlan = new PushPhysicalPlan(root, logicalSchema, queryId, - scalablePushRegistry, pushDataSourceOperator, context); + scalablePushRegistry, pushDataSourceOperator, context, querySourceType); doNothing().when(pushDataSourceOperator).setNewRowCallback(runnableCaptor.capture()); when(pushDataSourceOperator.droppedRows()).thenReturn(false); @@ -95,7 +98,7 @@ public void shouldPublishRows() throws InterruptedException { @Test public void shouldStopOnDroppedRows() throws InterruptedException { final PushPhysicalPlan pushPhysicalPlan = new PushPhysicalPlan(root, logicalSchema, queryId, - scalablePushRegistry, pushDataSourceOperator, context); + scalablePushRegistry, pushDataSourceOperator, context, querySourceType); doNothing().when(pushDataSourceOperator).setNewRowCallback(runnableCaptor.capture()); when(pushDataSourceOperator.droppedRows()).thenReturn(false, false, true); @@ -128,7 +131,7 @@ public void shouldStopOnDroppedRows() throws InterruptedException { @Test public void shouldStopOnHasError() throws InterruptedException { final PushPhysicalPlan pushPhysicalPlan = new PushPhysicalPlan(root, logicalSchema, queryId, - scalablePushRegistry, pushDataSourceOperator, context); + scalablePushRegistry, pushDataSourceOperator, context, querySourceType); doNothing().when(pushDataSourceOperator).setNewRowCallback(runnableCaptor.capture()); when(pushDataSourceOperator.hasError()).thenReturn(false, false, true); @@ -161,7 +164,7 @@ public void shouldStopOnHasError() throws InterruptedException { @Test public void shouldThrowErrorOnOpen() throws InterruptedException { final PushPhysicalPlan pushPhysicalPlan = new PushPhysicalPlan(root, logicalSchema, queryId, - scalablePushRegistry, pushDataSourceOperator, context); + scalablePushRegistry, pushDataSourceOperator, context, querySourceType); doNothing().when(pushDataSourceOperator).setNewRowCallback(runnableCaptor.capture()); doThrow(new RuntimeException("Error on open")).when(root).open(); @@ -178,7 +181,7 @@ public void shouldThrowErrorOnOpen() throws InterruptedException { @Test public void shouldThrowErrorOnNext() throws InterruptedException { final PushPhysicalPlan pushPhysicalPlan = new PushPhysicalPlan(root, logicalSchema, queryId, - scalablePushRegistry, pushDataSourceOperator, context); + scalablePushRegistry, pushDataSourceOperator, context, querySourceType); doNothing().when(pushDataSourceOperator).setNewRowCallback(runnableCaptor.capture()); when(pushDataSourceOperator.droppedRows()).thenReturn(false); doThrow(new RuntimeException("Error on next")).when(root).next(); diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/physical/scalablepush/PushRoutingTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/physical/scalablepush/PushRoutingTest.java index 72409c34597c..8eef9baae2ae 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/physical/scalablepush/PushRoutingTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/physical/scalablepush/PushRoutingTest.java @@ -14,6 +14,7 @@ import com.google.common.collect.ImmutableSet; import io.confluent.ksql.GenericRow; import io.confluent.ksql.config.SessionConfig; +import io.confluent.ksql.internal.ScalablePushQueryMetrics; import io.confluent.ksql.parser.tree.Query; import io.confluent.ksql.physical.scalablepush.PushRouting.PushConnectionsHandle; import io.confluent.ksql.physical.scalablepush.PushRouting.RoutingResult; @@ -35,6 +36,7 @@ import java.util.ArrayList; import java.util.HashSet; import java.util.List; +import java.util.Optional; import java.util.OptionalInt; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -84,6 +86,8 @@ public class PushRoutingTest { private KsqlNode ksqlNodeRemote2; @Mock private TransientQueryQueue transientQueryQueueMock; + @Mock + private Optional scalablePushQueryMetrics; private Vertx vertx; private Context context; @@ -143,7 +147,7 @@ public void shouldSucceed_forward() throws ExecutionException, InterruptedExcept // When: CompletableFuture future = routing.handlePushQuery(serviceContext, pushPhysicalPlan, statement, pushRoutingOptions, - outputSchema, transientQueryQueue); + outputSchema, transientQueryQueue, scalablePushQueryMetrics); final PushConnectionsHandle handle = future.get(); context.runOnContext(v -> { localPublisher.accept(LOCAL_ROW1); @@ -184,7 +188,7 @@ public void shouldSucceed_addRemoteNode() throws ExecutionException, Interrupted // When: CompletableFuture future = routing.handlePushQuery(serviceContext, pushPhysicalPlan, statement, pushRoutingOptions, - outputSchema, transientQueryQueue); + outputSchema, transientQueryQueue, scalablePushQueryMetrics); final PushConnectionsHandle handle = future.get(); context.runOnContext(v -> { localPublisher.accept(LOCAL_ROW1); @@ -238,7 +242,7 @@ public void shouldSucceed_removeRemoteNode() throws ExecutionException, Interrup // When: CompletableFuture future = routing.handlePushQuery(serviceContext, pushPhysicalPlan, statement, pushRoutingOptions, - outputSchema, transientQueryQueue); + outputSchema, transientQueryQueue, scalablePushQueryMetrics); final PushConnectionsHandle handle = future.get(); context.runOnContext(v -> { localPublisher.accept(LOCAL_ROW1); @@ -288,7 +292,7 @@ public void shouldSucceed_remoteNodeComplete() throws ExecutionException, Interr // When: CompletableFuture future = routing.handlePushQuery(serviceContext, pushPhysicalPlan, statement, pushRoutingOptions, - outputSchema, transientQueryQueue); + outputSchema, transientQueryQueue, scalablePushQueryMetrics); final PushConnectionsHandle handle = future.get(); context.runOnContext(v -> { localPublisher.accept(LOCAL_ROW1); @@ -337,7 +341,7 @@ public void shouldFail_remoteNodeException() throws ExecutionException, Interrup // When: CompletableFuture future = routing.handlePushQuery(serviceContext, pushPhysicalPlan, statement, pushRoutingOptions, - outputSchema, transientQueryQueue); + outputSchema, transientQueryQueue, scalablePushQueryMetrics); final PushConnectionsHandle handle = future.get(); final AtomicReference exception = new AtomicReference<>(null); handle.onException(exception::set); @@ -369,7 +373,7 @@ public void shouldSucceed_justForwarded() throws ExecutionException, Interrupted // When: CompletableFuture future = routing.handlePushQuery(serviceContext, pushPhysicalPlan, statement, pushRoutingOptions, - outputSchema, transientQueryQueue); + outputSchema, transientQueryQueue, scalablePushQueryMetrics); final PushConnectionsHandle handle = future.get(); context.runOnContext(v -> { localPublisher.accept(LOCAL_ROW1); @@ -402,7 +406,7 @@ public void shouldFail_duringPlanExecute() throws ExecutionException, Interrupte // When: CompletableFuture future = routing.handlePushQuery(serviceContext, pushPhysicalPlan, statement, pushRoutingOptions, - outputSchema, transientQueryQueue); + outputSchema, transientQueryQueue, scalablePushQueryMetrics); PushConnectionsHandle handle = future.get(); // Then: @@ -420,7 +424,7 @@ public void shouldFail_non200RemoteCall() throws ExecutionException, Interrupted // When: CompletableFuture future = routing.handlePushQuery(serviceContext, pushPhysicalPlan, statement, pushRoutingOptions, - outputSchema, transientQueryQueue); + outputSchema, transientQueryQueue, scalablePushQueryMetrics); PushConnectionsHandle handle = future.get(); // Then: @@ -443,7 +447,7 @@ public void shouldFail_errorRemoteCall() throws ExecutionException, InterruptedE // When: CompletableFuture future = routing.handlePushQuery(serviceContext, pushPhysicalPlan, statement, pushRoutingOptions, - outputSchema, transientQueryQueue); + outputSchema, transientQueryQueue, scalablePushQueryMetrics); PushConnectionsHandle handle = future.get(); // Then: @@ -463,7 +467,7 @@ public void shouldFail_hitRequestLimitLocal() throws ExecutionException, Interru // When: CompletableFuture future = routing.handlePushQuery(serviceContext, pushPhysicalPlan, statement, pushRoutingOptions, - outputSchema, transientQueryQueue); + outputSchema, transientQueryQueue, scalablePushQueryMetrics); PushConnectionsHandle handle = future.get(); context.runOnContext(v -> { localPublisher.accept(LOCAL_ROW1); @@ -498,7 +502,7 @@ public void shouldFail_hitRequestLimitRemote() throws ExecutionException, Interr // When: CompletableFuture future = routing.handlePushQuery(serviceContext, pushPhysicalPlan, statement, pushRoutingOptions, - outputSchema, transientQueryQueue); + outputSchema, transientQueryQueue, scalablePushQueryMetrics); PushConnectionsHandle handle = future.get(); context.runOnContext(v -> { remotePublisher.accept(REMOTE_ROW1); diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/util/ScalablePushQueryMetadataTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/util/ScalablePushQueryMetadataTest.java index d4a90d11a85f..17022bc8c77f 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/util/ScalablePushQueryMetadataTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/util/ScalablePushQueryMetadataTest.java @@ -21,12 +21,14 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import io.confluent.ksql.internal.ScalablePushQueryMetrics; import io.confluent.ksql.physical.scalablepush.PushQueryQueuePopulator; import io.confluent.ksql.physical.scalablepush.PushRouting.PushConnectionsHandle; import io.confluent.ksql.query.BlockingRowQueue; import io.confluent.ksql.query.QueryId; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.util.PushQueryMetadata.ResultType; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; import org.junit.Before; @@ -52,6 +54,8 @@ public class ScalablePushQueryMetadataTest { private ArgumentCaptor> errorCallbackCaptor; @Mock private Consumer errorCallback; + @Mock + private Optional metrics; private ScalablePushQueryMetadata query; @@ -61,9 +65,13 @@ public void setUp() { logicalSchema, new QueryId("queryid"), blockingRowQueue, + metrics, ResultType.STREAM, populator, - () -> { } + () -> { }, + null, + null, + null ); } diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/QueryEndpoint.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/QueryEndpoint.java index 62f6797a6564..b37831eb91e1 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/QueryEndpoint.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/QueryEndpoint.java @@ -29,6 +29,7 @@ import io.confluent.ksql.execution.streams.RoutingFilter.RoutingFilterFactory; import io.confluent.ksql.execution.streams.RoutingOptions; import io.confluent.ksql.internal.PullQueryExecutorMetrics; +import io.confluent.ksql.internal.ScalablePushQueryMetrics; import io.confluent.ksql.metastore.model.DataSource; import io.confluent.ksql.name.ColumnName; import io.confluent.ksql.parser.KsqlParser.ParsedStatement; @@ -48,8 +49,8 @@ import io.confluent.ksql.rest.server.resources.streaming.PushQueryConfigRoutingOptions; import io.confluent.ksql.rest.util.ConcurrencyLimiter; import io.confluent.ksql.rest.util.ConcurrencyLimiter.Decrementer; -import io.confluent.ksql.rest.util.PullQueryMetricsUtil; import io.confluent.ksql.rest.util.QueryCapacityUtil; +import io.confluent.ksql.rest.util.QueryMetricsUtil; import io.confluent.ksql.rest.util.ScalablePushUtil; import io.confluent.ksql.schema.ksql.Column; import io.confluent.ksql.schema.utils.FormatOptions; @@ -83,6 +84,7 @@ public class QueryEndpoint { private final KsqlRestConfig ksqlRestConfig; private final RoutingFilterFactory routingFilterFactory; private final Optional pullQueryMetrics; + private final Optional scalablePushQueryMetrics; private final RateLimiter rateLimiter; private final ConcurrencyLimiter pullConcurrencyLimiter; private final SlidingWindowRateLimiter pullBandRateLimiter; @@ -100,6 +102,7 @@ public QueryEndpoint( final KsqlRestConfig ksqlRestConfig, final RoutingFilterFactory routingFilterFactory, final Optional pullQueryMetrics, + final Optional scalablePushQueryMetrics, final RateLimiter rateLimiter, final ConcurrencyLimiter pullConcurrencyLimiter, final SlidingWindowRateLimiter pullBandLimiter, @@ -113,6 +116,7 @@ public QueryEndpoint( this.ksqlRestConfig = ksqlRestConfig; this.routingFilterFactory = routingFilterFactory; this.pullQueryMetrics = pullQueryMetrics; + this.scalablePushQueryMetrics = scalablePushQueryMetrics; this.rateLimiter = rateLimiter; this.pullConcurrencyLimiter = pullConcurrencyLimiter; this.pullBandRateLimiter = pullBandLimiter; @@ -185,7 +189,8 @@ public QueryPublisher createQueryPublisher( statement, workerExecutor, requestProperties, - metricsCallbackHolder + metricsCallbackHolder, + scalablePushQueryMetrics ); } else { return createPushQueryPublisher(context, serviceContext, statement, workerExecutor); @@ -199,11 +204,15 @@ private QueryPublisher createScalablePushQueryPublisher( final ConfiguredStatement statement, final WorkerExecutor workerExecutor, final Map requestProperties, - final MetricsCallbackHolder metricsCallbackHolder + final MetricsCallbackHolder metricsCallbackHolder, + final Optional scalablePushQueryMetrics + ) { - metricsCallbackHolder.setCallback((statusCode, requestBytes, responseBytes, startTimeNanos) -> { - scalablePushBandRateLimiter.add(responseBytes); - }); + // First thing, set the metrics callback so that it gets called, even if we hit an error + final AtomicReference resultForMetrics = + new AtomicReference<>(null); + metricsCallbackHolder.setCallback(QueryMetricsUtil.initializeScalablePushMetricsCallback( + scalablePushQueryMetrics, scalablePushBandRateLimiter, resultForMetrics)); final BlockingQueryPublisher publisher = new BlockingQueryPublisher(context, workerExecutor); @@ -211,10 +220,6 @@ private QueryPublisher createScalablePushQueryPublisher( final PushQueryConfigRoutingOptions routingOptions = new PushQueryConfigRoutingOptions(requestProperties); - metricsCallbackHolder.setCallback((statusCode, requestBytes, responseBytes, startTimeNanos) -> { - scalablePushBandRateLimiter.add(responseBytes); - }); - final PushQueryConfigPlannerOptions plannerOptions = new PushQueryConfigPlannerOptions( ksqlConfig, statement.getSessionConfig().getOverrides()); @@ -223,11 +228,12 @@ private QueryPublisher createScalablePushQueryPublisher( final ScalablePushQueryMetadata query = ksqlEngine .executeScalablePushQuery(analysis, serviceContext, statement, pushRouting, routingOptions, - plannerOptions, context); + plannerOptions, context, scalablePushQueryMetrics); query.prepare(); + resultForMetrics.set(query); - - publisher.setQueryHandle(new KsqlQueryHandle(query), false, true); + publisher.setQueryHandle( + new KsqlScalablePushQueryHandle(query, scalablePushQueryMetrics), false, true); return publisher; } @@ -270,7 +276,7 @@ private QueryPublisher createStreamPullQueryPublisher( // First thing, set the metrics callback so that it gets called, even if we hit an error final AtomicReference resultForMetrics = new AtomicReference<>(null); final AtomicReference refDecrementer = new AtomicReference<>(null); - metricsCallbackHolder.setCallback(PullQueryMetricsUtil.initializeStreamMetricsCallback( + metricsCallbackHolder.setCallback(QueryMetricsUtil.initializePullStreamMetricsCallback( pullQueryMetrics, pullBandRateLimiter, analysis, resultForMetrics, refDecrementer)); PullQueryExecutionUtil.checkRateLimit(rateLimiter); @@ -310,7 +316,7 @@ private QueryPublisher createTablePullQueryPublisher( ) { // First thing, set the metrics callback so that it gets called, even if we hit an error final AtomicReference resultForMetrics = new AtomicReference<>(null); - metricsCallbackHolder.setCallback(PullQueryMetricsUtil.initializeTableMetricsCallback( + metricsCallbackHolder.setCallback(QueryMetricsUtil.initializePullTableMetricsCallback( pullQueryMetrics, pullBandRateLimiter, resultForMetrics)); final RoutingOptions routingOptions = new PullQueryConfigRoutingOptions( ksqlConfig, @@ -501,4 +507,64 @@ public QueryId getQueryId() { return result.getQueryId(); } } + + private static class KsqlScalablePushQueryHandle implements QueryHandle { + + private final ScalablePushQueryMetadata scalablePushQueryMetadata; + private final Optional scalablePushQueryMetrics; + private final CompletableFuture future = new CompletableFuture<>(); + + KsqlScalablePushQueryHandle(final ScalablePushQueryMetadata scalablePushQueryMetadata, + final Optional scalablePushQueryMetrics) { + this.scalablePushQueryMetadata = Objects.requireNonNull(scalablePushQueryMetadata); + this.scalablePushQueryMetrics = Objects.requireNonNull(scalablePushQueryMetrics); + } + + @Override + public List getColumnNames() { + return colNamesFromSchema(scalablePushQueryMetadata.getLogicalSchema().columns()); + } + + @Override + public List getColumnTypes() { + return colTypesFromSchema(scalablePushQueryMetadata.getLogicalSchema().columns()); + } + + @Override + public void start() { + try { + scalablePushQueryMetadata.start(); + scalablePushQueryMetadata.onException(future::completeExceptionally); + scalablePushQueryMetadata.onCompletion(future::complete); + } catch (Exception e) { + scalablePushQueryMetrics.ifPresent(metrics -> metrics.recordErrorRate( + 1, + scalablePushQueryMetadata.getSourceType(), + scalablePushQueryMetadata.getRoutingNodeType())); + } + } + + @Override + public void stop() { + scalablePushQueryMetadata.close(); + } + + @Override + public BlockingRowQueue getQueue() { + return scalablePushQueryMetadata.getRowQueue(); + } + + @Override + public void onException(final Consumer onException) { + future.exceptionally(t -> { + onException.accept(t); + return null; + }); + } + + @Override + public QueryId getQueryId() { + return scalablePushQueryMetadata.getQueryId(); + } + } } diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java index ca249901d988..1280f9a1d3dd 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java @@ -44,6 +44,7 @@ import io.confluent.ksql.function.UserFunctionLoader; import io.confluent.ksql.internal.JmxDataPointsReporter; import io.confluent.ksql.internal.PullQueryExecutorMetrics; +import io.confluent.ksql.internal.ScalablePushQueryMetrics; import io.confluent.ksql.internal.StorageUtilizationMetricsReporter; import io.confluent.ksql.logging.processing.ProcessingLogConfig; import io.confluent.ksql.logging.processing.ProcessingLogContext; @@ -198,6 +199,7 @@ public final class KsqlRestApplication implements Executable { private final DenyListPropertyValidator denyListPropertyValidator; private final RoutingFilterFactory routingFilterFactory; private final Optional pullQueryMetrics; + private final Optional scalablePushQueryMetrics; private final RateLimiter pullQueryRateLimiter; private final ConcurrencyLimiter pullConcurrencyLimiter; private final SlidingWindowRateLimiter pullBandRateLimiter; @@ -240,6 +242,7 @@ public static SourceName getCommandsStreamName() { final Vertx vertx, final DenyListPropertyValidator denyListPropertyValidator, final Optional pullQueryMetrics, + final Optional scalablePushQueryMetrics, final RoutingFilterFactory routingFilterFactory, final RateLimiter pullQueryRateLimiter, final ConcurrencyLimiter pullConcurrencyLimiter, @@ -286,11 +289,7 @@ public static SourceName getCommandsStreamName() { this.heartbeatResource = Optional.empty(); this.clusterStatusResource = Optional.empty(); } - if (lagReportingAgent.isPresent()) { - this.lagReportingResource = Optional.of(new LagReportingResource(lagReportingAgent.get())); - } else { - this.lagReportingResource = Optional.empty(); - } + this.lagReportingResource = lagReportingAgent.map(LagReportingResource::new); this.healthCheckResource = HealthCheckResource.create( ksqlResource, serviceContext, @@ -299,6 +298,8 @@ public static SourceName getCommandsStreamName() { this.commandRunner); MetricCollectors.addConfigurableReporter(ksqlConfigNoPort); this.pullQueryMetrics = requireNonNull(pullQueryMetrics, "pullQueryMetrics"); + this.scalablePushQueryMetrics = + requireNonNull(scalablePushQueryMetrics, "scalablePushQueryMetrics"); log.debug("ksqlDB API server instance created"); this.routingFilterFactory = requireNonNull(routingFilterFactory, "routingFilterFactory"); this.pullQueryRateLimiter = requireNonNull(pullQueryRateLimiter, "pullQueryRateLimiter"); @@ -349,6 +350,7 @@ public void startAsync() { errorHandler, denyListPropertyValidator, pullQueryMetrics, + scalablePushQueryMetrics, routingFilterFactory, pullQueryRateLimiter, pullConcurrencyLimiter, @@ -378,6 +380,7 @@ public void startAsync() { serverMetadataResource, wsQueryEndpoint, pullQueryMetrics, + scalablePushQueryMetrics, pullQueryRateLimiter, pullConcurrencyLimiter, pullBandRateLimiter, @@ -538,6 +541,12 @@ public void shutdown() { log.error("Exception while waiting for pull query metrics to close", e); } + try { + scalablePushQueryMetrics.ifPresent(ScalablePushQueryMetrics::close); + } catch (final Exception e) { + log.error("Exception while waiting for scalable push query metrics to close", e); + } + localCommands.ifPresent(lc -> { try { lc.close(); @@ -681,7 +690,7 @@ public static KsqlRestApplication buildApplication(final KsqlRestConfig restConf ); } - @SuppressWarnings("checkstyle:MethodLength") + @SuppressWarnings({"checkstyle:JavaNCSS", "checkstyle:MethodLength"}) static KsqlRestApplication buildApplication( final String metricsPrefix, final KsqlRestConfig restConfig, @@ -824,6 +833,13 @@ static KsqlRestApplication buildApplication( Time.SYSTEM)) : Optional.empty(); + final Optional scalablePushQueryMetrics = + ksqlConfig.getBoolean(KsqlConfig.KSQL_QUERY_PUSH_V2_ENABLED) + ? Optional.of(new ScalablePushQueryMetrics( + ksqlEngine.getServiceId(), + ksqlConfig.getStringAsMap(KsqlConfig.KSQL_CUSTOM_METRICS_TAGS), + Time.SYSTEM)) + : Optional.empty(); final HARouting pullQueryRouting = new HARouting( routingFilterFactory, pullQueryMetrics, ksqlConfig); @@ -843,6 +859,7 @@ static KsqlRestApplication buildApplication( errorHandler, denyListPropertyValidator, pullQueryMetrics, + scalablePushQueryMetrics, routingFilterFactory, pullQueryRateLimiter, pullQueryConcurrencyLimiter, @@ -923,6 +940,7 @@ static KsqlRestApplication buildApplication( vertx, denyListPropertyValidator, pullQueryMetrics, + scalablePushQueryMetrics, routingFilterFactory, pullQueryRateLimiter, pullQueryConcurrencyLimiter, @@ -956,9 +974,7 @@ private static Optional initializeHeartbeatAgent( .threadPoolSize(restConfig.getInt( KsqlRestConfig.KSQL_HEARTBEAT_THREAD_POOL_SIZE_CONFIG)); - if (lagReportingAgent.isPresent()) { - builder.addHostStatusListener(lagReportingAgent.get()); - } + lagReportingAgent.ifPresent(builder::addHostStatusListener); return Optional.of(builder.build(ksqlEngine, serviceContext)); } diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlServerEndpoints.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlServerEndpoints.java index c81e0ed89401..0e12e2209e1d 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlServerEndpoints.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlServerEndpoints.java @@ -32,6 +32,7 @@ import io.confluent.ksql.engine.KsqlEngine; import io.confluent.ksql.execution.streams.RoutingFilter.RoutingFilterFactory; import io.confluent.ksql.internal.PullQueryExecutorMetrics; +import io.confluent.ksql.internal.ScalablePushQueryMetrics; import io.confluent.ksql.physical.pull.HARouting; import io.confluent.ksql.physical.scalablepush.PushRouting; import io.confluent.ksql.rest.EndpointResponse; @@ -89,6 +90,7 @@ public class KsqlServerEndpoints implements Endpoints { private final ServerMetadataResource serverMetadataResource; private final WSQueryEndpoint wsQueryEndpoint; private final Optional pullQueryMetrics; + private final Optional scalablePushQueryMetrics; private final RateLimiter rateLimiter; private final ConcurrencyLimiter pullConcurrencyLimiter; private final SlidingWindowRateLimiter pullBandRateLimiter; @@ -116,6 +118,7 @@ public KsqlServerEndpoints( final ServerMetadataResource serverMetadataResource, final WSQueryEndpoint wsQueryEndpoint, final Optional pullQueryMetrics, + final Optional scalablePushQueryMetrics, final RateLimiter rateLimiter, final ConcurrencyLimiter pullConcurrencyLimiter, final SlidingWindowRateLimiter pullBandRateLimiter, @@ -143,6 +146,7 @@ public KsqlServerEndpoints( this.serverMetadataResource = Objects.requireNonNull(serverMetadataResource); this.wsQueryEndpoint = Objects.requireNonNull(wsQueryEndpoint); this.pullQueryMetrics = Objects.requireNonNull(pullQueryMetrics); + this.scalablePushQueryMetrics = Objects.requireNonNull(scalablePushQueryMetrics); this.rateLimiter = Objects.requireNonNull(rateLimiter); this.pullConcurrencyLimiter = pullConcurrencyLimiter; this.pullBandRateLimiter = Objects.requireNonNull(pullBandRateLimiter); @@ -167,8 +171,8 @@ public CompletableFuture createQueryPublisher(final String sql, try { return new QueryEndpoint( ksqlEngine, ksqlConfig, ksqlRestConfig, routingFilterFactory, pullQueryMetrics, - rateLimiter, pullConcurrencyLimiter, pullBandRateLimiter, scalablePushBandRateLimiter, - routing, pushRouting, localCommands) + scalablePushQueryMetrics, rateLimiter, pullConcurrencyLimiter, pullBandRateLimiter, + scalablePushBandRateLimiter, routing, pushRouting, localCommands) .createQueryPublisher( sql, properties, diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisher.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisher.java index b50a47c03fd5..7d75690ecfb7 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisher.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisher.java @@ -33,8 +33,6 @@ import io.confluent.ksql.parser.tree.Query; import io.confluent.ksql.physical.pull.HARouting; import io.confluent.ksql.physical.pull.PullPhysicalPlan.PullPhysicalPlanType; -import io.confluent.ksql.physical.pull.PullPhysicalPlan.PullSourceType; -import io.confluent.ksql.physical.pull.PullPhysicalPlan.RoutingNodeType; import io.confluent.ksql.physical.pull.PullQueryResult; import io.confluent.ksql.rest.entity.StreamedRow; import io.confluent.ksql.rest.server.resources.streaming.Flow.Subscriber; @@ -44,6 +42,8 @@ import io.confluent.ksql.statement.ConfiguredStatement; import io.confluent.ksql.util.KeyValue; import io.confluent.ksql.util.KsqlConstants.KsqlQueryType; +import io.confluent.ksql.util.KsqlConstants.QuerySourceType; +import io.confluent.ksql.util.KsqlConstants.RoutingNodeType; import java.util.Collection; import java.util.List; import java.util.Optional; @@ -155,7 +155,7 @@ public synchronized void subscribe(final Subscriber> sub private void recordMetrics( final PullQueryExecutorMetrics metrics, final PullQueryResult result) { - final PullSourceType sourceType = result.getSourceType(); + final QuerySourceType sourceType = result.getSourceType(); final PullPhysicalPlanType planType = result.getPlanType(); final RoutingNodeType routingNodeType = result.getRoutingNodeType(); // Note: we are not recording response size in this case because it is not diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PushQueryConfigRoutingOptions.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PushQueryConfigRoutingOptions.java index 19c84fc635e2..38f280e07b31 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PushQueryConfigRoutingOptions.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PushQueryConfigRoutingOptions.java @@ -47,4 +47,12 @@ public boolean getExpectingStartOfRegistryData() { } return KsqlRequestConfig.KSQL_REQUEST_QUERY_PUSH_REGISTRY_START_DEFAULT; } + + @Override + public boolean getIsDebugRequest() { + if (requestProperties.containsKey(KsqlRequestConfig.KSQL_DEBUG_REQUEST)) { + return (Boolean) requestProperties.get(KsqlRequestConfig.KSQL_DEBUG_REQUEST); + } + return KsqlRequestConfig.KSQL_DEBUG_REQUEST_DEFAULT; + } } diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PushQueryPublisher.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PushQueryPublisher.java index d02f72c55e82..3b347adb8588 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PushQueryPublisher.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PushQueryPublisher.java @@ -24,6 +24,7 @@ import io.confluent.ksql.analyzer.ImmutableAnalysis; import io.confluent.ksql.api.server.SlidingWindowRateLimiter; import io.confluent.ksql.engine.KsqlEngine; +import io.confluent.ksql.internal.ScalablePushQueryMetrics; import io.confluent.ksql.parser.tree.Query; import io.confluent.ksql.physical.scalablepush.PushRouting; import io.confluent.ksql.physical.scalablepush.PushRoutingOptions; @@ -36,6 +37,8 @@ import io.confluent.ksql.statement.ConfiguredStatement; import io.confluent.ksql.util.KeyValue; import io.confluent.ksql.util.KsqlConstants.KsqlQueryType; +import io.confluent.ksql.util.KsqlConstants.QuerySourceType; +import io.confluent.ksql.util.KsqlConstants.RoutingNodeType; import io.confluent.ksql.util.PushQueryMetadata; import io.confluent.ksql.util.ScalablePushQueryMetadata; import io.confluent.ksql.util.TransientQueryMetadata; @@ -87,6 +90,8 @@ public static PushQueryPublisher createScalablePushQueryPublisher( final ImmutableAnalysis analysis, final PushRouting pushRouting, final Context context, + final Optional scalablePushQueryMetrics, + final long startTimeNanos, final SlidingWindowRateLimiter scalablePushBandRateLimiter ) { final PushRoutingOptions routingOptions = new PushQueryConfigRoutingOptions( @@ -99,10 +104,23 @@ public static PushQueryPublisher createScalablePushQueryPublisher( scalablePushBandRateLimiter.allow(KsqlQueryType.PUSH); - final ScalablePushQueryMetadata pushQueryMetadata = ksqlEngine - .executeScalablePushQuery(analysis, serviceContext, query, pushRouting, routingOptions, - plannerOptions, context); - pushQueryMetadata.prepare(); + ScalablePushQueryMetadata pushQueryMetadata = null; + try { + pushQueryMetadata = ksqlEngine + .executeScalablePushQuery(analysis, serviceContext, query, pushRouting, + routingOptions, plannerOptions, context, scalablePushQueryMetrics); + + final ScalablePushQueryMetadata finalPushQueryMetadata = pushQueryMetadata; + pushQueryMetadata.onCompletionOrException((v, throwable) -> + scalablePushQueryMetrics.ifPresent( + m -> recordMetrics(m, finalPushQueryMetadata, startTimeNanos))); + pushQueryMetadata.prepare(); + } catch (Throwable t) { + if (pushQueryMetadata == null) { + scalablePushQueryMetrics.ifPresent(m -> recordErrorMetrics(m, startTimeNanos)); + } + throw t; + } return new PushQueryPublisher(exec, pushQueryMetadata); } @@ -119,6 +137,29 @@ public synchronized void subscribe(final Flow.Subscriber subscriber.onSubscribe(subscription); } + + private static void recordMetrics( + final ScalablePushQueryMetrics metrics, final ScalablePushQueryMetadata metadata, + final long startTimeNanos) { + + final QuerySourceType sourceType = metadata.getSourceType(); + final RoutingNodeType routingNodeType = metadata.getRoutingNodeType(); + // Note: we are not recording response size in this case because it is not + // accessible in the websocket endpoint. + metrics.recordConnectionDuration(startTimeNanos, sourceType, routingNodeType); + metrics.recordRowsReturned(metadata.getTotalRowsReturned(), + sourceType, routingNodeType); + metrics.recordRowsProcessed(metadata.getTotalRowsProcessed(), + sourceType, routingNodeType); + } + + private static void recordErrorMetrics( + final ScalablePushQueryMetrics metrics, final long startTimeNanos) { + metrics.recordConnectionDurationForError(startTimeNanos); + metrics.recordZeroRowsReturnedForError(); + metrics.recordZeroRowsProcessedForError(); + } + static class PushQuerySubscription extends PollingSubscription> { private final PushQueryMetadata queryMetadata; diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java index 79cfabbb558a..a5b9e39caafb 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java @@ -30,6 +30,7 @@ import io.confluent.ksql.execution.streams.RoutingFilter.RoutingFilterFactory; import io.confluent.ksql.execution.streams.RoutingOptions; import io.confluent.ksql.internal.PullQueryExecutorMetrics; +import io.confluent.ksql.internal.ScalablePushQueryMetrics; import io.confluent.ksql.logging.query.QueryLogger; import io.confluent.ksql.metastore.model.DataSource; import io.confluent.ksql.parser.KsqlParser.PreparedStatement; @@ -53,8 +54,8 @@ import io.confluent.ksql.rest.util.CommandStoreUtil; import io.confluent.ksql.rest.util.ConcurrencyLimiter; import io.confluent.ksql.rest.util.ConcurrencyLimiter.Decrementer; -import io.confluent.ksql.rest.util.PullQueryMetricsUtil; import io.confluent.ksql.rest.util.QueryCapacityUtil; +import io.confluent.ksql.rest.util.QueryMetricsUtil; import io.confluent.ksql.rest.util.ScalablePushUtil; import io.confluent.ksql.security.KsqlAuthorizationValidator; import io.confluent.ksql.security.KsqlSecurityContext; @@ -102,6 +103,7 @@ public class StreamedQueryResource implements KsqlConfigurable { private final Errors errorHandler; private final DenyListPropertyValidator denyListPropertyValidator; private final Optional pullQueryMetrics; + private final Optional scalablePushQueryMetrics; private final RoutingFilterFactory routingFilterFactory; private final RateLimiter rateLimiter; private final ConcurrencyLimiter concurrencyLimiter; @@ -126,6 +128,7 @@ public StreamedQueryResource( final Errors errorHandler, final DenyListPropertyValidator denyListPropertyValidator, final Optional pullQueryMetrics, + final Optional scalablePushQueryMetrics, final RoutingFilterFactory routingFilterFactory, final RateLimiter rateLimiter, final ConcurrencyLimiter concurrencyLimiter, @@ -147,6 +150,7 @@ public StreamedQueryResource( errorHandler, denyListPropertyValidator, pullQueryMetrics, + scalablePushQueryMetrics, routingFilterFactory, rateLimiter, concurrencyLimiter, @@ -173,6 +177,7 @@ public StreamedQueryResource( final Errors errorHandler, final DenyListPropertyValidator denyListPropertyValidator, final Optional pullQueryMetrics, + final Optional scalablePushQueryMetrics, final RoutingFilterFactory routingFilterFactory, final RateLimiter rateLimiter, final ConcurrencyLimiter concurrencyLimiter, @@ -197,6 +202,8 @@ public StreamedQueryResource( this.denyListPropertyValidator = Objects.requireNonNull(denyListPropertyValidator, "denyListPropertyValidator"); this.pullQueryMetrics = Objects.requireNonNull(pullQueryMetrics, "pullQueryMetrics"); + this.scalablePushQueryMetrics = + Objects.requireNonNull(scalablePushQueryMetrics, "scalablePushQueryMetrics"); this.routingFilterFactory = Objects.requireNonNull(routingFilterFactory, "routingFilterFactory"); this.rateLimiter = Objects.requireNonNull(rateLimiter, "rateLimiter"); @@ -318,7 +325,11 @@ private EndpointResponse handleStatement( } } + // CHECKSTYLE_RULES.OFF: MethodLength + // CHECKSTYLE_RULES.OFF: JavaNCSS private EndpointResponse handleQuery(final KsqlSecurityContext securityContext, + // CHECKSTYLE_RULES.ON: MethodLength + // CHECKSTYLE_RULES.ON: JavaNCSS final KsqlRequest request, final PreparedStatement statement, final CompletableFuture connectionClosedFuture, @@ -352,7 +363,7 @@ private EndpointResponse handleQuery(final KsqlSecurityContext securityContext, case KTABLE: { // First thing, set the metrics callback so that it gets called, even if we hit an error final AtomicReference resultForMetrics = new AtomicReference<>(null); - metricsCallbackHolder.setCallback(PullQueryMetricsUtil.initializeTableMetricsCallback( + metricsCallbackHolder.setCallback(QueryMetricsUtil.initializePullTableMetricsCallback( pullQueryMetrics, pullBandRateLimiter, resultForMetrics)); final SessionConfig sessionConfig = SessionConfig.of(ksqlConfig, configProperties); @@ -376,7 +387,7 @@ private EndpointResponse handleQuery(final KsqlSecurityContext securityContext, new AtomicReference<>(null); final AtomicReference refDecrementer = new AtomicReference<>(null); metricsCallbackHolder.setCallback( - PullQueryMetricsUtil.initializeStreamMetricsCallback( + QueryMetricsUtil.initializePullStreamMetricsCallback( pullQueryMetrics, pullBandRateLimiter, analysis, resultForMetrics, refDecrementer)); @@ -402,6 +413,11 @@ private EndpointResponse handleQuery(final KsqlSecurityContext securityContext, } else if (ScalablePushUtil .isScalablePushQuery(statement.getStatement(), ksqlEngine, ksqlConfig, configProperties)) { + // First thing, set the metrics callback so that it gets called, even if we hit an error + final AtomicReference resultForMetrics = + new AtomicReference<>(null); + metricsCallbackHolder.setCallback(QueryMetricsUtil.initializeScalablePushMetricsCallback( + scalablePushQueryMetrics, scalablePushBandRateLimiter, resultForMetrics)); final ImmutableAnalysis analysis = ksqlEngine .analyzeQueryWithNoOutputTopic( @@ -412,9 +428,6 @@ private EndpointResponse handleQuery(final KsqlSecurityContext securityContext, QueryLogger.info("Scalable push query created", statement.getStatementText()); - metricsCallbackHolder.setCallback((statusCode, requestBytes, responseBytes, startTimeNanos) -> - scalablePushBandRateLimiter.add(responseBytes)); - return handleScalablePushQuery( analysis, securityContext.getServiceContext(), @@ -423,7 +436,8 @@ private EndpointResponse handleQuery(final KsqlSecurityContext securityContext, request.getRequestProperties(), connectionClosedFuture, context, - scalablePushBandRateLimiter + scalablePushBandRateLimiter, + resultForMetrics ); } else { // log validated statements for query anonymization @@ -515,7 +529,8 @@ private EndpointResponse handleScalablePushQuery( final Map requestProperties, final CompletableFuture connectionClosedFuture, final Context context, - final SlidingWindowRateLimiter scalablePushBandRateLimiter + final SlidingWindowRateLimiter scalablePushBandRateLimiter, + final AtomicReference resultForMetrics ) { final ConfiguredStatement configured = ConfiguredStatement .of(statement, SessionConfig.of(ksqlConfig, configOverrides)); @@ -530,8 +545,9 @@ private EndpointResponse handleScalablePushQuery( final ScalablePushQueryMetadata query = ksqlEngine .executeScalablePushQuery(analysis, serviceContext, configured, pushRouting, routingOptions, - plannerOptions, context); + plannerOptions, context, scalablePushQueryMetrics); query.prepare(); + resultForMetrics.set(query); final QueryStreamWriter queryStreamWriter = new QueryStreamWriter( query, diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java index 0d0fe8dc33a2..83a03b0c88f1 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java @@ -29,6 +29,7 @@ import io.confluent.ksql.engine.PullQueryExecutionUtil; import io.confluent.ksql.execution.streams.RoutingFilter.RoutingFilterFactory; import io.confluent.ksql.internal.PullQueryExecutorMetrics; +import io.confluent.ksql.internal.ScalablePushQueryMetrics; import io.confluent.ksql.metastore.model.DataSource; import io.confluent.ksql.parser.KsqlParser.PreparedStatement; import io.confluent.ksql.parser.tree.PrintTopic; @@ -93,6 +94,7 @@ public class WSQueryEndpoint { private final Errors errorHandler; private final DenyListPropertyValidator denyListPropertyValidator; private final Optional pullQueryMetrics; + private final Optional scalablePushQueryMetrics; private final RoutingFilterFactory routingFilterFactory; private final RateLimiter rateLimiter; private final ConcurrencyLimiter pullConcurrencyLimiter; @@ -116,6 +118,7 @@ public WSQueryEndpoint( final Errors errorHandler, final DenyListPropertyValidator denyListPropertyValidator, final Optional pullQueryMetrics, + final Optional scalablePushQueryMetrics, final RoutingFilterFactory routingFilterFactory, final RateLimiter rateLimiter, final ConcurrencyLimiter pullConcurrencyLimiter, @@ -141,6 +144,8 @@ public WSQueryEndpoint( this.denyListPropertyValidator = Objects.requireNonNull(denyListPropertyValidator, "denyListPropertyValidator"); this.pullQueryMetrics = Objects.requireNonNull(pullQueryMetrics, "pullQueryMetrics"); + this.scalablePushQueryMetrics = + Objects.requireNonNull(scalablePushQueryMetrics, "scalablePushQueryMetrics"); this.routingFilterFactory = Objects.requireNonNull( routingFilterFactory, "routingFilterFactory"); this.rateLimiter = Objects.requireNonNull(rateLimiter, "rateLimiter"); @@ -374,6 +379,8 @@ private void handleQuery(final RequestContext info, final Query query, analysis, pushRouting, context, + scalablePushQueryMetrics, + startTimeNanos, scalablePushBandRateLimiter ).subscribe(streamSubscriber); } else { diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/util/PullQueryMetricsUtil.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/util/QueryMetricsUtil.java similarity index 69% rename from ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/util/PullQueryMetricsUtil.java rename to ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/util/QueryMetricsUtil.java index d85b3ba77254..72ac1605f19e 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/util/PullQueryMetricsUtil.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/util/QueryMetricsUtil.java @@ -19,24 +19,26 @@ import io.confluent.ksql.api.server.MetricsCallback; import io.confluent.ksql.api.server.SlidingWindowRateLimiter; import io.confluent.ksql.internal.PullQueryExecutorMetrics; +import io.confluent.ksql.internal.ScalablePushQueryMetrics; import io.confluent.ksql.physical.pull.PullPhysicalPlan.PullPhysicalPlanType; -import io.confluent.ksql.physical.pull.PullPhysicalPlan.PullSourceType; -import io.confluent.ksql.physical.pull.PullPhysicalPlan.RoutingNodeType; import io.confluent.ksql.physical.pull.PullQueryResult; import io.confluent.ksql.query.TransientQueryQueue; import io.confluent.ksql.rest.util.ConcurrencyLimiter.Decrementer; +import io.confluent.ksql.util.KsqlConstants.QuerySourceType; +import io.confluent.ksql.util.KsqlConstants.RoutingNodeType; +import io.confluent.ksql.util.ScalablePushQueryMetadata; import io.confluent.ksql.util.StreamPullQueryMetadata; import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KafkaStreams.State; -public final class PullQueryMetricsUtil { +public final class QueryMetricsUtil { - private PullQueryMetricsUtil() { + private QueryMetricsUtil() { } - public static MetricsCallback initializeTableMetricsCallback( + public static MetricsCallback initializePullTableMetricsCallback( final Optional pullQueryMetrics, final SlidingWindowRateLimiter pullBandRateLimiter, final AtomicReference resultForMetrics) { @@ -51,7 +53,7 @@ public static MetricsCallback initializeTableMetricsCallback( if (r == null) { recordErrorMetrics(pullQueryMetrics, responseBytes, startTimeNanos); } else { - final PullSourceType sourceType = r.getSourceType(); + final QuerySourceType sourceType = r.getSourceType(); final PullPhysicalPlanType planType = r.getPlanType(); final RoutingNodeType routingNodeType = RoutingNodeType.SOURCE_NODE; metrics.recordResponseSize( @@ -79,7 +81,7 @@ public static MetricsCallback initializeTableMetricsCallback( return metricsCallback; } - public static MetricsCallback initializeStreamMetricsCallback( + public static MetricsCallback initializePullStreamMetricsCallback( final Optional pullQueryMetrics, final SlidingWindowRateLimiter pullBandRateLimiter, final ImmutableAnalysis analysis, @@ -106,8 +108,8 @@ public static MetricsCallback initializeStreamMetricsCallback( .getDataSource() .getKsqlTopic() .getKeyFormat().isWindowed(); - final PullSourceType sourceType = isWindowed - ? PullSourceType.WINDOWED_STREAM : PullSourceType.NON_WINDOWED_STREAM; + final QuerySourceType sourceType = isWindowed + ? QuerySourceType.WINDOWED_STREAM : QuerySourceType.NON_WINDOWED_STREAM; // There is no WHERE clause constraint information in the persistent logical plan final PullPhysicalPlanType planType = PullPhysicalPlanType.UNKNOWN; final RoutingNodeType routingNodeType = RoutingNodeType.SOURCE_NODE; @@ -143,6 +145,48 @@ public static MetricsCallback initializeStreamMetricsCallback( return metricsCallback; } + public static MetricsCallback initializeScalablePushMetricsCallback( + final Optional scalablePushQueryMetrics, + final SlidingWindowRateLimiter scalablePushBandRateLimiter, + final AtomicReference resultForMetrics) { + + final MetricsCallback metricsCallback = + (statusCode, requestBytes, responseBytes, startTimeNanos) -> + scalablePushQueryMetrics.ifPresent(metrics -> { + metrics.recordStatusCode(statusCode); + metrics.recordRequestSize(requestBytes); + final ScalablePushQueryMetadata r = resultForMetrics.get(); + if (r == null) { + metrics.recordResponseSizeForError(responseBytes); + metrics.recordConnectionDurationForError(startTimeNanos); + metrics.recordZeroRowsReturnedForError(); + metrics.recordZeroRowsProcessedForError(); + } else { + final QuerySourceType sourceType = r.getSourceType(); + final RoutingNodeType routingNodeType = r.getRoutingNodeType(); + metrics.recordResponseSize( + responseBytes, + sourceType, + routingNodeType + ); + metrics.recordConnectionDuration( + startTimeNanos, + sourceType, + routingNodeType + ); + metrics.recordRowsReturned( + r.getTotalRowsReturned(), + sourceType, routingNodeType); + metrics.recordRowsProcessed( + r.getTotalRowsProcessed(), + sourceType, routingNodeType); + } + scalablePushBandRateLimiter.add(responseBytes); + }); + + return metricsCallback; + } + private static void recordErrorMetrics( final Optional pullQueryMetrics, final long responseBytes, diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java index 47e6c7f124be..272aa51bd093 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java @@ -495,6 +495,7 @@ private void givenAppWithRestConfig(final Map restConfigMap) { vertx, denyListPropertyValidator, Optional.empty(), + Optional.empty(), routingFilterFactory, rateLimiter, concurrencyLimiter, diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/PullQueryMetricsTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/PullQueryMetricsTest.java index a77d2b433527..0584ac16834e 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/PullQueryMetricsTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/PullQueryMetricsTest.java @@ -15,6 +15,10 @@ package io.confluent.ksql.rest.server.execution; +import static io.confluent.ksql.util.KsqlConstants.KSQL_QUERY_PLAN_TYPE_TAG; +import static io.confluent.ksql.util.KsqlConstants.KSQL_QUERY_ROUTING_TYPE_TAG; +import static io.confluent.ksql.util.KsqlConstants.KSQL_QUERY_SOURCE_TAG; +import static io.confluent.ksql.util.KsqlConstants.KSQL_SERVICE_ID_METRICS_TAG; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.closeTo; import static org.hamcrest.Matchers.equalTo; @@ -28,12 +32,10 @@ import io.confluent.ksql.internal.PullQueryExecutorMetrics; import io.confluent.ksql.metrics.MetricCollectors; import io.confluent.ksql.physical.pull.PullPhysicalPlan.PullPhysicalPlanType; -import io.confluent.ksql.physical.pull.PullPhysicalPlan.PullSourceType; -import io.confluent.ksql.physical.pull.PullPhysicalPlan.RoutingNodeType; -import io.confluent.ksql.util.KsqlConstants; +import io.confluent.ksql.util.KsqlConstants.QuerySourceType; +import io.confluent.ksql.util.KsqlConstants.RoutingNodeType; import io.confluent.ksql.util.ReservedInternalTopics; import java.util.Map; -import java.util.Random; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.utils.Time; import org.junit.After; @@ -51,7 +53,7 @@ public class PullQueryMetricsTest { private static final Map CUSTOM_TAGS = ImmutableMap .of("tag1", "value1", "tag2", "value2"); private static final Map CUSTOM_TAGS_WITH_SERVICE_ID = ImmutableMap - .of("tag1", "value1", "tag2", "value2", KsqlConstants.KSQL_SERVICE_ID_METRICS_TAG, KSQL_SERVICE_ID); + .of("tag1", "value1", "tag2", "value2", KSQL_SERVICE_ID_METRICS_TAG, KSQL_SERVICE_ID); @Mock private KsqlEngine ksqlEngine; @@ -123,7 +125,7 @@ public void shouldRecordNumberOfRemoteRequests() { @Test public void shouldRecordErrorRate() { // Given: - pullMetrics.recordErrorRate(3, PullSourceType.NON_WINDOWED, PullPhysicalPlanType.KEY_LOOKUP, + pullMetrics.recordErrorRate(3, QuerySourceType.NON_WINDOWED, PullPhysicalPlanType.KEY_LOOKUP, RoutingNodeType.SOURCE_NODE); // When: @@ -132,7 +134,7 @@ public void shouldRecordErrorRate() { final double legacyValue = getMetricValueLegacy("-error-total"); final double legacyRate = getMetricValueLegacy("-error-rate"); final double detailedValue = getMetricValue("-detailed-error-total", - PullSourceType.NON_WINDOWED, PullPhysicalPlanType.KEY_LOOKUP, RoutingNodeType.SOURCE_NODE); + QuerySourceType.NON_WINDOWED, PullPhysicalPlanType.KEY_LOOKUP, RoutingNodeType.SOURCE_NODE); // Then: assertThat(value, equalTo(1.0)); @@ -145,13 +147,13 @@ public void shouldRecordErrorRate() { @Test public void shouldRecordResponseSize() { // Given: - pullMetrics.recordResponseSize(1500, PullSourceType.NON_WINDOWED, + pullMetrics.recordResponseSize(1500, QuerySourceType.NON_WINDOWED, PullPhysicalPlanType.KEY_LOOKUP, RoutingNodeType.SOURCE_NODE); // When: final double value = getMetricValue("-response-size"); final double detailedValue = getMetricValue("-detailed-response-size", - PullSourceType.NON_WINDOWED, PullPhysicalPlanType.KEY_LOOKUP, RoutingNodeType.SOURCE_NODE); + QuerySourceType.NON_WINDOWED, PullPhysicalPlanType.KEY_LOOKUP, RoutingNodeType.SOURCE_NODE); // Then: assertThat(value, equalTo(1500.0)); @@ -161,11 +163,11 @@ public void shouldRecordResponseSize() { @Test public void shouldRecordRequestRate() { // Given: - pullMetrics.recordLatency(3000, PullSourceType.NON_WINDOWED, PullPhysicalPlanType.KEY_LOOKUP, + pullMetrics.recordLatency(3000, QuerySourceType.NON_WINDOWED, PullPhysicalPlanType.KEY_LOOKUP, RoutingNodeType.SOURCE_NODE); - pullMetrics.recordLatency(3000, PullSourceType.NON_WINDOWED, PullPhysicalPlanType.KEY_LOOKUP, + pullMetrics.recordLatency(3000, QuerySourceType.NON_WINDOWED, PullPhysicalPlanType.KEY_LOOKUP, RoutingNodeType.SOURCE_NODE); - pullMetrics.recordLatency(3000, PullSourceType.NON_WINDOWED, PullPhysicalPlanType.KEY_LOOKUP, + pullMetrics.recordLatency(3000, QuerySourceType.NON_WINDOWED, PullPhysicalPlanType.KEY_LOOKUP, RoutingNodeType.SOURCE_NODE); // When: @@ -180,7 +182,7 @@ public void shouldRecordRequestRate() { @Test public void shouldRecordLatency() { // Given: - pullMetrics.recordLatency(3000, PullSourceType.NON_WINDOWED, PullPhysicalPlanType.KEY_LOOKUP, + pullMetrics.recordLatency(3000, QuerySourceType.NON_WINDOWED, PullPhysicalPlanType.KEY_LOOKUP, RoutingNodeType.SOURCE_NODE); // When: @@ -193,13 +195,13 @@ public void shouldRecordLatency() { final double legacyMin = getMetricValueLegacy("-latency-min"); final double legacyTotal = getMetricValueLegacy("-total"); final double detailedAvg = getMetricValue("-detailed-latency-avg", - PullSourceType.NON_WINDOWED, PullPhysicalPlanType.KEY_LOOKUP, RoutingNodeType.SOURCE_NODE); + QuerySourceType.NON_WINDOWED, PullPhysicalPlanType.KEY_LOOKUP, RoutingNodeType.SOURCE_NODE); final double detailedMax = getMetricValue("-detailed-latency-max", - PullSourceType.NON_WINDOWED, PullPhysicalPlanType.KEY_LOOKUP, RoutingNodeType.SOURCE_NODE); + QuerySourceType.NON_WINDOWED, PullPhysicalPlanType.KEY_LOOKUP, RoutingNodeType.SOURCE_NODE); final double detailedMin = getMetricValue("-detailed-latency-min", - PullSourceType.NON_WINDOWED, PullPhysicalPlanType.KEY_LOOKUP, RoutingNodeType.SOURCE_NODE); + QuerySourceType.NON_WINDOWED, PullPhysicalPlanType.KEY_LOOKUP, RoutingNodeType.SOURCE_NODE); final double detailedTotal = getMetricValue("-detailed-total", - PullSourceType.NON_WINDOWED, PullPhysicalPlanType.KEY_LOOKUP, RoutingNodeType.SOURCE_NODE); + QuerySourceType.NON_WINDOWED, PullPhysicalPlanType.KEY_LOOKUP, RoutingNodeType.SOURCE_NODE); // Then: assertThat(avg, is(3.0)); @@ -220,26 +222,26 @@ public void shouldRecordLatency() { public void shouldRecordLatencyPercentiles() { // Given: when(time.nanoseconds()).thenReturn(600000000L); - pullMetrics.recordLatency(100000000L, PullSourceType.NON_WINDOWED, PullPhysicalPlanType.KEY_LOOKUP, + pullMetrics.recordLatency(100000000L, QuerySourceType.NON_WINDOWED, PullPhysicalPlanType.KEY_LOOKUP, RoutingNodeType.SOURCE_NODE); - pullMetrics.recordLatency(200000000L, PullSourceType.NON_WINDOWED, PullPhysicalPlanType.KEY_LOOKUP, + pullMetrics.recordLatency(200000000L, QuerySourceType.NON_WINDOWED, PullPhysicalPlanType.KEY_LOOKUP, RoutingNodeType.SOURCE_NODE); - pullMetrics.recordLatency(300000000L, PullSourceType.NON_WINDOWED, PullPhysicalPlanType.KEY_LOOKUP, + pullMetrics.recordLatency(300000000L, QuerySourceType.NON_WINDOWED, PullPhysicalPlanType.KEY_LOOKUP, RoutingNodeType.SOURCE_NODE); - pullMetrics.recordLatency(400000000L, PullSourceType.NON_WINDOWED, PullPhysicalPlanType.KEY_LOOKUP, + pullMetrics.recordLatency(400000000L, QuerySourceType.NON_WINDOWED, PullPhysicalPlanType.KEY_LOOKUP, RoutingNodeType.SOURCE_NODE); - pullMetrics.recordLatency(500000000L, PullSourceType.NON_WINDOWED, PullPhysicalPlanType.KEY_LOOKUP, + pullMetrics.recordLatency(500000000L, QuerySourceType.NON_WINDOWED, PullPhysicalPlanType.KEY_LOOKUP, RoutingNodeType.SOURCE_NODE); // When: final double detailed50 = getMetricValue("-detailed-distribution-50", - PullSourceType.NON_WINDOWED, PullPhysicalPlanType.KEY_LOOKUP, RoutingNodeType.SOURCE_NODE); + QuerySourceType.NON_WINDOWED, PullPhysicalPlanType.KEY_LOOKUP, RoutingNodeType.SOURCE_NODE); final double detailed75 = getMetricValue("-detailed-distribution-75", - PullSourceType.NON_WINDOWED, PullPhysicalPlanType.KEY_LOOKUP, RoutingNodeType.SOURCE_NODE); + QuerySourceType.NON_WINDOWED, PullPhysicalPlanType.KEY_LOOKUP, RoutingNodeType.SOURCE_NODE); final double detailed90 = getMetricValue("-detailed-distribution-90", - PullSourceType.NON_WINDOWED, PullPhysicalPlanType.KEY_LOOKUP, RoutingNodeType.SOURCE_NODE); + QuerySourceType.NON_WINDOWED, PullPhysicalPlanType.KEY_LOOKUP, RoutingNodeType.SOURCE_NODE); final double detailed99 = getMetricValue("-detailed-distribution-99", - PullSourceType.NON_WINDOWED, PullPhysicalPlanType.KEY_LOOKUP, RoutingNodeType.SOURCE_NODE); + QuerySourceType.NON_WINDOWED, PullPhysicalPlanType.KEY_LOOKUP, RoutingNodeType.SOURCE_NODE); // Then: assertThat(detailed50, closeTo(297857.85, 0.1)); @@ -272,12 +274,12 @@ public void shouldRecordStatus() { @Test public void shouldRecordRowsReturned() { // Given: - pullMetrics.recordRowsReturned(12, PullSourceType.NON_WINDOWED, + pullMetrics.recordRowsReturned(12, QuerySourceType.NON_WINDOWED, PullPhysicalPlanType.KEY_LOOKUP, RoutingNodeType.SOURCE_NODE); // When: final double detailedValue = getMetricValue("-rows-returned-total", - PullSourceType.NON_WINDOWED, PullPhysicalPlanType.KEY_LOOKUP, RoutingNodeType.SOURCE_NODE); + QuerySourceType.NON_WINDOWED, PullPhysicalPlanType.KEY_LOOKUP, RoutingNodeType.SOURCE_NODE); // Then: assertThat(detailedValue, equalTo(12.0)); @@ -286,12 +288,12 @@ public void shouldRecordRowsReturned() { @Test public void shouldRecordRowsProcessed() { // Given: - pullMetrics.recordRowsProcessed(1399, PullSourceType.NON_WINDOWED, + pullMetrics.recordRowsProcessed(1399, QuerySourceType.NON_WINDOWED, PullPhysicalPlanType.KEY_LOOKUP, RoutingNodeType.SOURCE_NODE); // When: final double detailedValue = getMetricValue("-rows-processed-total", - PullSourceType.NON_WINDOWED, PullPhysicalPlanType.KEY_LOOKUP, RoutingNodeType.SOURCE_NODE); + QuerySourceType.NON_WINDOWED, PullPhysicalPlanType.KEY_LOOKUP, RoutingNodeType.SOURCE_NODE); // Then: assertThat(detailedValue, equalTo(1399.0)); @@ -311,16 +313,16 @@ private double getMetricValue(final String metricName) { private double getMetricValue( final String metricName, - final PullSourceType sourceType, + final QuerySourceType sourceType, final PullPhysicalPlanType planType, final RoutingNodeType routingNodeType ) { final Metrics metrics = pullMetrics.getMetrics(); final Map tags = ImmutableMap.builder() .putAll(CUSTOM_TAGS_WITH_SERVICE_ID) - .put(KsqlConstants.KSQL_QUERY_SOURCE_TAG, sourceType.name().toLowerCase()) - .put(KsqlConstants.KSQL_QUERY_PLAN_TYPE_TAG, planType.name().toLowerCase()) - .put(KsqlConstants.KSQL_QUERY_ROUTING_TYPE_TAG, routingNodeType.name().toLowerCase()) + .put(KSQL_QUERY_SOURCE_TAG, sourceType.name().toLowerCase()) + .put(KSQL_QUERY_PLAN_TYPE_TAG, planType.name().toLowerCase()) + .put(KSQL_QUERY_ROUTING_TYPE_TAG, routingNodeType.name().toLowerCase()) .build(); return Double.parseDouble( metrics.metric( diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ScalablePushQueryMetricsTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ScalablePushQueryMetricsTest.java new file mode 100644 index 000000000000..98c9ae48749e --- /dev/null +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ScalablePushQueryMetricsTest.java @@ -0,0 +1,291 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.rest.server.execution; + +import static io.confluent.ksql.util.KsqlConstants.KSQL_QUERY_ROUTING_TYPE_TAG; +import static io.confluent.ksql.util.KsqlConstants.KSQL_QUERY_SOURCE_TAG; +import static io.confluent.ksql.util.KsqlConstants.KSQL_SERVICE_ID_METRICS_TAG; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.closeTo; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableMap; +import io.confluent.ksql.engine.KsqlEngine; +import io.confluent.ksql.internal.ScalablePushQueryMetrics; +import io.confluent.ksql.metrics.MetricCollectors; +import io.confluent.ksql.util.KsqlConstants.QuerySourceType; +import io.confluent.ksql.util.KsqlConstants.RoutingNodeType; +import io.confluent.ksql.util.ReservedInternalTopics; +import java.util.Map; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.utils.Time; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class ScalablePushQueryMetricsTest { + + private ScalablePushQueryMetrics scalablePushQueryMetrics; + private static final String KSQL_SERVICE_ID = "test-ksql-service-id"; + private static final Map CUSTOM_TAGS = ImmutableMap + .of("tag1", "value1", "tag2", "value2"); + private static final Map CUSTOM_TAGS_WITH_SERVICE_ID = ImmutableMap + .of("tag1", "value1", "tag2", "value2", KSQL_SERVICE_ID_METRICS_TAG, KSQL_SERVICE_ID); + + @Mock + private KsqlEngine ksqlEngine; + + @Mock + private Time time; + + @Before + public void setUp() { + MetricCollectors.initialize(); + when(ksqlEngine.getServiceId()).thenReturn(KSQL_SERVICE_ID); + when(time.nanoseconds()).thenReturn(6000L); + + scalablePushQueryMetrics = new ScalablePushQueryMetrics(ksqlEngine.getServiceId(), CUSTOM_TAGS, time); + } + + @After + public void tearDown() { + scalablePushQueryMetrics.close(); + MetricCollectors.cleanUp(); + } + + @Test + public void shouldRemoveAllSensorsOnClose() { + assertTrue(scalablePushQueryMetrics.getSensors().size() > 0); + + scalablePushQueryMetrics.close(); + + scalablePushQueryMetrics.getSensors().forEach( + sensor -> assertThat(scalablePushQueryMetrics.getMetrics().getSensor(sensor.name()), is(nullValue()))); + } + + @Test + public void shouldRecordNumberOfLocalRequests() { + // Given: + scalablePushQueryMetrics.recordLocalRequests(3); + + // When: + final double value = getMetricValue("-local-count"); + final double rate = getMetricValue("-local-rate"); + + // Then: + assertThat(value, equalTo(1.0)); + assertThat(rate, closeTo(0.03, 0.001)); + } + + @Test + public void shouldRecordNumberOfRemoteRequests() { + // Given: + scalablePushQueryMetrics.recordRemoteRequests(3); + + // When: + final double value = getMetricValue("-remote-count"); + final double rate = getMetricValue("-remote-rate"); + + // Then: + assertThat(value, equalTo(1.0)); + assertThat(rate, closeTo(0.03, 0.001)); + } + + @Test + public void shouldRecordErrorRate() { + // Given: + scalablePushQueryMetrics.recordErrorRate(3, QuerySourceType.NON_WINDOWED, + RoutingNodeType.SOURCE_NODE); + + // When: + final double value = getMetricValue("-error-total"); + final double rate = getMetricValue("-error-rate"); + final double detailedValue = getMetricValue("-detailed-error-total", + QuerySourceType.NON_WINDOWED, RoutingNodeType.SOURCE_NODE); + + // Then: + assertThat(value, equalTo(1.0)); + assertThat(rate, closeTo(0.03, 0.001)); + assertThat(detailedValue, equalTo(1.0)); + } + + @Test + public void shouldRecordResponseSize() { + // Given: + scalablePushQueryMetrics.recordResponseSize(1500, QuerySourceType.NON_WINDOWED, RoutingNodeType.SOURCE_NODE); + + // When: + final double value = getMetricValue("-response-size"); + final double detailedValue = getMetricValue("-detailed-response-size", + QuerySourceType.NON_WINDOWED, RoutingNodeType.SOURCE_NODE); + + // Then: + assertThat(value, equalTo(1500.0)); + assertThat(detailedValue, equalTo(1500.0)); + } + + @Test + public void shouldRecordConnectionDuration() { + // Given: + scalablePushQueryMetrics.recordConnectionDuration(3000, QuerySourceType.NON_WINDOWED, + RoutingNodeType.SOURCE_NODE); + + // When: + final double avg = getMetricValue("-connection-duration-avg"); + final double max = getMetricValue("-connection-duration-max"); + final double min = getMetricValue("-connection-duration-min"); + final double total = getMetricValue("-total"); + final double detailedAvg = getMetricValue("-detailed-connection-duration-avg", + QuerySourceType.NON_WINDOWED, RoutingNodeType.SOURCE_NODE); + final double detailedMax = getMetricValue("-detailed-connection-duration-max", + QuerySourceType.NON_WINDOWED, RoutingNodeType.SOURCE_NODE); + final double detailedMin = getMetricValue("-detailed-connection-duration-min", + QuerySourceType.NON_WINDOWED, RoutingNodeType.SOURCE_NODE); + final double detailedTotal = getMetricValue("-detailed-total", + QuerySourceType.NON_WINDOWED, RoutingNodeType.SOURCE_NODE); + + // Then: + assertThat(avg, is(3.0)); + assertThat(min, is(3.0)); + assertThat(max, is(3.0)); + assertThat(total, is(1.0)); + assertThat(detailedAvg, is(3.0)); + assertThat(detailedMin, is(3.0)); + assertThat(detailedMax, is(3.0)); + assertThat(detailedTotal, is(1.0)); + } + + @Test + public void shouldRecordConnectionDurationPercentiles() { + // Given: + when(time.nanoseconds()).thenReturn(600000000L); + scalablePushQueryMetrics.recordConnectionDuration(100000000L, QuerySourceType.NON_WINDOWED, + RoutingNodeType.SOURCE_NODE); + scalablePushQueryMetrics.recordConnectionDuration(200000000L, QuerySourceType.NON_WINDOWED, + RoutingNodeType.SOURCE_NODE); + scalablePushQueryMetrics.recordConnectionDuration(300000000L, QuerySourceType.NON_WINDOWED, + RoutingNodeType.SOURCE_NODE); + scalablePushQueryMetrics.recordConnectionDuration(400000000L, QuerySourceType.NON_WINDOWED, + RoutingNodeType.SOURCE_NODE); + scalablePushQueryMetrics.recordConnectionDuration(500000000L, QuerySourceType.NON_WINDOWED, + RoutingNodeType.SOURCE_NODE); + + // When: + final double detailed50 = getMetricValue("-detailed-distribution-50", + QuerySourceType.NON_WINDOWED, RoutingNodeType.SOURCE_NODE); + final double detailed75 = getMetricValue("-detailed-distribution-75", + QuerySourceType.NON_WINDOWED, RoutingNodeType.SOURCE_NODE); + final double detailed90 = getMetricValue("-detailed-distribution-90", + QuerySourceType.NON_WINDOWED, RoutingNodeType.SOURCE_NODE); + final double detailed99 = getMetricValue("-detailed-distribution-99", + QuerySourceType.NON_WINDOWED, RoutingNodeType.SOURCE_NODE); + + // Then: + assertThat(detailed50, closeTo(297857.85, 0.1)); + assertThat(detailed75, closeTo(398398.39, 0.1)); + assertThat(detailed90, closeTo(495555.55, 0.1)); + assertThat(detailed99, closeTo(495555.55, 0.1)); + } + + @Test + public void shouldRecordStatus() { + // Given: + scalablePushQueryMetrics.recordStatusCode(200); + scalablePushQueryMetrics.recordStatusCode(200); + scalablePushQueryMetrics.recordStatusCode(401); + scalablePushQueryMetrics.recordStatusCode(401); + scalablePushQueryMetrics.recordStatusCode(401); + scalablePushQueryMetrics.recordStatusCode(502); + + // When: + final double total2XX = getMetricValue("-2XX-total"); + final double total4XX = getMetricValue("-4XX-total"); + final double total5XX = getMetricValue("-5XX-total"); + + // Then: + assertThat(total2XX, is(2.0)); + assertThat(total4XX, is(3.0)); + assertThat(total5XX, is(1.0)); + } + + @Test + public void shouldRecordRowsReturned() { + // Given: + scalablePushQueryMetrics.recordRowsReturned(12, QuerySourceType.NON_WINDOWED, RoutingNodeType.SOURCE_NODE); + + // When: + final double detailedValue = getMetricValue("-rows-returned-total", + QuerySourceType.NON_WINDOWED, RoutingNodeType.SOURCE_NODE); + + // Then: + assertThat(detailedValue, equalTo(12.0)); + } + + @Test + public void shouldRecordRowsProcessed() { + // Given: + scalablePushQueryMetrics.recordRowsProcessed(1399, QuerySourceType.NON_WINDOWED, RoutingNodeType.SOURCE_NODE); + + // When: + final double detailedValue = getMetricValue("-rows-processed-total", + QuerySourceType.NON_WINDOWED, RoutingNodeType.SOURCE_NODE); + + // Then: + assertThat(detailedValue, equalTo(1399.0)); + } + + private double getMetricValue(final String metricName) { + final Metrics metrics = scalablePushQueryMetrics.getMetrics(); + return Double.parseDouble( + metrics.metric( + metrics.metricName( + "scalable-push-query-requests" + metricName, + ReservedInternalTopics.KSQL_INTERNAL_TOPIC_PREFIX + "scalable-push-query", + CUSTOM_TAGS_WITH_SERVICE_ID) + ).metricValue().toString() + ); + } + + private double getMetricValue( + final String metricName, + final QuerySourceType sourceType, + final RoutingNodeType routingNodeType + ) { + final Metrics metrics = scalablePushQueryMetrics.getMetrics(); + final Map tags = ImmutableMap.builder() + .putAll(CUSTOM_TAGS_WITH_SERVICE_ID) + .put(KSQL_QUERY_SOURCE_TAG, sourceType.name().toLowerCase()) + .put(KSQL_QUERY_ROUTING_TYPE_TAG, routingNodeType.name().toLowerCase()) + .build(); + return Double.parseDouble( + metrics.metric( + metrics.metricName( + "scalable-push-query-requests" + metricName, + ReservedInternalTopics.KSQL_INTERNAL_TOPIC_PREFIX + "scalable-push-query", + tags) + ).metricValue().toString() + ); + } + +} diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/WSQueryEndpointTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/WSQueryEndpointTest.java index 7e4f8170737e..456954d01fee 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/WSQueryEndpointTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/WSQueryEndpointTest.java @@ -84,6 +84,7 @@ public void setUp() { mock(Errors.class), denyListPropertyValidator, Optional.empty(), + Optional.empty(), mock(RoutingFilterFactory.class), mock(RateLimiter.class), mock(ConcurrencyLimiter.class), diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java index 0d1c7d63e7c2..791f5810af57 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java @@ -286,6 +286,7 @@ public void setup() { errorsHandler, denyListPropertyValidator, Optional.empty(), + Optional.empty(), routingFilterFactory, rateLimiter, concurrencyLimiter, @@ -390,6 +391,7 @@ public void shouldRateLimit() { errorsHandler, denyListPropertyValidator, Optional.empty(), + Optional.empty(), routingFilterFactory, pullQueryRateLimiter, concurrencyLimiter, @@ -435,6 +437,7 @@ public void shouldRateLimitStreamPullQueries() { errorsHandler, denyListPropertyValidator, Optional.empty(), + Optional.empty(), routingFilterFactory, pullQueryRateLimiter, concurrencyLimiter, @@ -573,6 +576,7 @@ public void shouldThrowOnHandleStatementIfNotConfigured() { errorsHandler, denyListPropertyValidator, Optional.empty(), + Optional.empty(), routingFilterFactory, rateLimiter, concurrencyLimiter, @@ -758,6 +762,7 @@ public void shouldThrowOnDenyListedStreamProperty() { errorsHandler, denyListPropertyValidator, Optional.empty(), + Optional.empty(), routingFilterFactory, rateLimiter, concurrencyLimiter,