Skip to content

Commit

Permalink
feat: add detailed scalable push query metrics with type breakdown (#8178)
Browse files Browse the repository at this point in the history

* feat: add scalable push query metrics

* feat: add skeleton for spq metrics, ws endpoint

* feat: add spq metrics on http 1 and 2 endpoints

* fix: typo

* refactor: rename latency metric connection duration

* fix: typo

* fix: rename latency in test

* fix: address feedback

* fix: remove old tests

* test: remove legacy metrics from test

* fix: merge conflicts

* test: fix test

* chore: remove executor from name

* test email change'

* chore: fix auto import style

* chore: fix auto import style

* chore: fix imports

* Trigger Build

* chore: refactor metrics callback

* fix: close metrics

* style: line length

Co-authored-by: natea <[email protected]>
  • Loading branch information
nateab and nae701 authored Sep 30, 2021
1 parent ae8e0c4 commit 561af53
Show file tree
Hide file tree
Showing 34 changed files with 1,569 additions and 183 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ public enum KsqlQueryType {
}

public enum PersistentQueryType {
CREATE_SOURCE, CREATE_AS, INSERT
CREATE_SOURCE,
CREATE_AS,
INSERT
}

public enum KsqlQueryStatus {
Expand All @@ -80,4 +82,24 @@ public static String getSRSubject(final String topicName, final boolean isKey) {
public static final String TIME_PATTERN = "HH:mm:ss.SSS";
public static final String DATE_PATTERN = "yyyy-MM-dd";
public static final String DATE_TIME_PATTERN = DATE_PATTERN + "'T'" + TIME_PATTERN;

/**
 * Source-type breakdown used when tagging query metrics. The names suggest the split is
 * windowed vs. non-windowed, further divided for stream sources — TODO confirm against the
 * physical plan's source classification. This enum is part of the stable metrics taxonomy:
 * values should only be added to. You can deprecate a field, but don't delete it or change
 * its meaning, since existing dashboards/alerts may key off the emitted tag values.
 */
public enum QuerySourceType {
  NON_WINDOWED,
  WINDOWED,
  NON_WINDOWED_STREAM,
  WINDOWED_STREAM
}

/**
 * Identifies, for metrics purposes, whether this node served a query it received directly
 * from a client ({@code SOURCE_NODE}) or a request that was forwarded to it by another node
 * ({@code REMOTE_NODE}); callers select the value from routing options such as
 * {@code getHasBeenForwarded()} / {@code getIsSkipForwardRequest()}. This enum is part of
 * the stable metrics taxonomy: values should only be added to. You can deprecate a field,
 * but don't delete it or change its meaning.
 */
public enum RoutingNodeType {
  SOURCE_NODE,
  REMOTE_NODE
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.confluent.ksql.engine.KsqlPlan;
import io.confluent.ksql.execution.streams.RoutingOptions;
import io.confluent.ksql.internal.PullQueryExecutorMetrics;
import io.confluent.ksql.internal.ScalablePushQueryMetrics;
import io.confluent.ksql.logging.processing.ProcessingLogContext;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.name.SourceName;
Expand Down Expand Up @@ -184,6 +185,7 @@ PullQueryResult executeTablePullQuery(
* @param pushRouting The push routing object
* @param pushRoutingOptions The options for routing
* @param context The Vertx context of the request
* @param scalablePushQueryMetrics JMX metrics
* @return A ScalablePushQueryMetadata object
*/
ScalablePushQueryMetadata executeScalablePushQuery(
Expand All @@ -193,7 +195,8 @@ ScalablePushQueryMetadata executeScalablePushQuery(
PushRouting pushRouting,
PushRoutingOptions pushRoutingOptions,
QueryPlannerOptions queryPlannerOptions,
Context context
Context context,
Optional<ScalablePushQueryMetrics> scalablePushQueryMetrics
);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import io.confluent.ksql.execution.streams.RoutingOptions;
import io.confluent.ksql.function.InternalFunctionRegistry;
import io.confluent.ksql.internal.PullQueryExecutorMetrics;
import io.confluent.ksql.internal.ScalablePushQueryMetrics;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.metastore.MetaStoreImpl;
import io.confluent.ksql.metastore.MutableMetaStore;
Expand All @@ -60,7 +61,6 @@
import io.confluent.ksql.physical.PhysicalPlan;
import io.confluent.ksql.physical.pull.HARouting;
import io.confluent.ksql.physical.pull.PullPhysicalPlan;
import io.confluent.ksql.physical.pull.PullPhysicalPlan.RoutingNodeType;
import io.confluent.ksql.physical.pull.PullPhysicalPlanBuilder;
import io.confluent.ksql.physical.pull.PullQueryQueuePopulator;
import io.confluent.ksql.physical.pull.PullQueryResult;
Expand Down Expand Up @@ -91,6 +91,7 @@
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlConstants;
import io.confluent.ksql.util.KsqlConstants.RoutingNodeType;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.KsqlStatementException;
import io.confluent.ksql.util.PersistentQueryMetadata;
Expand Down Expand Up @@ -231,6 +232,9 @@ PullQueryResult executeTablePullQuery(
throw new IllegalArgumentException("Executor can only handle pull queries");
}
final SessionConfig sessionConfig = statement.getSessionConfig();

// If we ever change how many hops a request can do, we'll need to update this for correct
// metrics.
final RoutingNodeType routingNodeType = routingOptions.getIsSkipForwardRequest()
? RoutingNodeType.REMOTE_NODE : RoutingNodeType.SOURCE_NODE;

Expand All @@ -255,9 +259,6 @@ PullQueryResult executeTablePullQuery(
);
final PullPhysicalPlan physicalPlan = plan;

// If we ever change how many hops a request can do, we'll need to update this for correct
// metrics.

final PullQueryQueue pullQueryQueue = new PullQueryQueue();
final PullQueryQueuePopulator populator = () -> routing.handlePullQuery(
serviceContext,
Expand Down Expand Up @@ -325,19 +326,30 @@ ScalablePushQueryMetadata executeScalablePushQuery(
final PushRouting pushRouting,
final PushRoutingOptions pushRoutingOptions,
final QueryPlannerOptions queryPlannerOptions,
final Context context
final Context context,
final Optional<ScalablePushQueryMetrics> scalablePushQueryMetrics
) {
final SessionConfig sessionConfig = statement.getSessionConfig();

// If we ever change how many hops a request can do, we'll need to update this for correct
// metrics.
final RoutingNodeType routingNodeType = pushRoutingOptions.getHasBeenForwarded()
? RoutingNodeType.REMOTE_NODE : RoutingNodeType.SOURCE_NODE;

PushPhysicalPlan plan = null;

try {
final KsqlConfig ksqlConfig = sessionConfig.getConfig(false);
final LogicalPlanNode logicalPlan = buildAndValidateLogicalPlan(
statement, analysis, ksqlConfig, queryPlannerOptions, true);
final PushPhysicalPlan physicalPlan = buildScalablePushPhysicalPlan(
plan = buildScalablePushPhysicalPlan(
logicalPlan,
analysis,
context,
pushRoutingOptions
);
final PushPhysicalPlan physicalPlan = plan;

final TransientQueryQueue transientQueryQueue
= new TransientQueryQueue(analysis.getLimitClause());
final PushQueryMetadata.ResultType resultType =
Expand All @@ -348,24 +360,66 @@ ScalablePushQueryMetadata executeScalablePushQuery(

final PushQueryQueuePopulator populator = () ->
pushRouting.handlePushQuery(serviceContext, physicalPlan, statement, pushRoutingOptions,
physicalPlan.getOutputSchema(), transientQueryQueue);
physicalPlan.getOutputSchema(), transientQueryQueue, scalablePushQueryMetrics);
final PushQueryPreparer preparer = () ->
pushRouting.preparePushQuery(physicalPlan, statement, pushRoutingOptions);
final ScalablePushQueryMetadata metadata = new ScalablePushQueryMetadata(
physicalPlan.getOutputSchema(),
physicalPlan.getQueryId(),
transientQueryQueue,
scalablePushQueryMetrics,
resultType,
populator,
preparer
preparer,
physicalPlan.getSourceType(),
routingNodeType,
physicalPlan::getRowsReadFromDataSource
);

return metadata;
} catch (final Exception e) {
if (plan == null) {
scalablePushQueryMetrics.ifPresent(m -> m.recordErrorRateForNoResult(1));
} else {
final PushPhysicalPlan pushPhysicalPlan = plan;
scalablePushQueryMetrics.ifPresent(metrics -> metrics.recordErrorRate(1,
pushPhysicalPlan.getSourceType(),
routingNodeType
));
}

final String stmtLower = statement.getStatementText().toLowerCase(Locale.ROOT);
final String messageLower = e.getMessage().toLowerCase(Locale.ROOT);
final String stackLower = Throwables.getStackTraceAsString(e).toLowerCase(Locale.ROOT);

// do not include the statement text in the default logs as it may contain sensitive
// information - the exception which is returned to the user below will contain
// the contents of the query
if (messageLower.contains(stmtLower) || stackLower.contains(stmtLower)) {
final StackTraceElement loc = Iterables
.getLast(Throwables.getCausalChain(e))
.getStackTrace()[0];
LOG.error("Failure to execute push query V2 {} {}, not logging the error message since it "
+ "contains the query string, which may contain sensitive information."
+ " If you see this LOG message, please submit a GitHub ticket and"
+ " we will scrub the statement text from the error at {}",
pushRoutingOptions.debugString(),
queryPlannerOptions.debugString(),
loc);
} else {
LOG.error("Failure to execute push query V2. {} {}",
pushRoutingOptions.debugString(),
queryPlannerOptions.debugString(),
e);
}
LOG.debug("Failed push query V2 text {}, {}", statement.getStatementText(), e);

throw new KsqlStatementException(
e.getMessage(),
statement.getStatementText(),
e
e.getMessage() == null
? "Server Error" + Arrays.toString(e.getStackTrace())
: e.getMessage(),
statement.getStatementText(),
e
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.internal.KsqlEngineMetrics;
import io.confluent.ksql.internal.PullQueryExecutorMetrics;
import io.confluent.ksql.internal.ScalablePushQueryMetrics;
import io.confluent.ksql.logging.processing.ProcessingLogContext;
import io.confluent.ksql.logging.query.QueryLogger;
import io.confluent.ksql.metastore.MetaStore;
Expand Down Expand Up @@ -455,7 +456,8 @@ public ScalablePushQueryMetadata executeScalablePushQuery(
final PushRouting pushRouting,
final PushRoutingOptions pushRoutingOptions,
final QueryPlannerOptions queryPlannerOptions,
final Context context
final Context context,
final Optional<ScalablePushQueryMetrics> scalablePushQueryMetrics
) {
final ScalablePushQueryMetadata query = EngineExecutor
.create(primaryContext, serviceContext, statement.getSessionConfig())
Expand All @@ -465,7 +467,8 @@ public ScalablePushQueryMetadata executeScalablePushQuery(
pushRouting,
pushRoutingOptions,
queryPlannerOptions,
context);
context,
scalablePushQueryMetrics);
return query;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.confluent.ksql.analyzer.ImmutableAnalysis;
import io.confluent.ksql.execution.streams.RoutingOptions;
import io.confluent.ksql.internal.PullQueryExecutorMetrics;
import io.confluent.ksql.internal.ScalablePushQueryMetrics;
import io.confluent.ksql.logging.processing.NoopProcessingLogContext;
import io.confluent.ksql.logging.processing.ProcessingLogContext;
import io.confluent.ksql.metastore.MetaStore;
Expand Down Expand Up @@ -207,7 +208,8 @@ public ScalablePushQueryMetadata executeScalablePushQuery(
final PushRouting pushRouting,
final PushRoutingOptions pushRoutingOptions,
final QueryPlannerOptions queryPlannerOptions,
final Context context
final Context context,
final Optional<ScalablePushQueryMetrics> scalablePushQueryMetrics
) {
return EngineExecutor.create(
engineContext,
Expand All @@ -219,7 +221,8 @@ public ScalablePushQueryMetadata executeScalablePushQuery(
pushRouting,
pushRoutingOptions,
queryPlannerOptions,
context
context,
scalablePushQueryMetrics
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.ksql.metrics.MetricCollectors;
import io.confluent.ksql.physical.pull.PullPhysicalPlan.PullPhysicalPlanType;
import io.confluent.ksql.physical.pull.PullPhysicalPlan.PullSourceType;
import io.confluent.ksql.physical.pull.PullPhysicalPlan.RoutingNodeType;
import io.confluent.ksql.util.KsqlConstants;
import io.confluent.ksql.util.KsqlConstants.QuerySourceType;
import io.confluent.ksql.util.KsqlConstants.RoutingNodeType;
import io.confluent.ksql.util.ReservedInternalTopics;
import java.io.Closeable;
import java.util.ArrayList;
Expand Down Expand Up @@ -128,7 +128,7 @@ public void recordRemoteRequests(final double value) {

public void recordLatency(
final long startTimeNanos,
final PullSourceType sourceType,
final QuerySourceType sourceType,
final PullPhysicalPlanType planType,
final RoutingNodeType routingNodeType
) {
Expand Down Expand Up @@ -156,7 +156,7 @@ private void innerRecordLatency(final long startTimeNanos, final MetricsKey key)

public void recordErrorRate(
final double value,
final PullSourceType sourceType,
final QuerySourceType sourceType,
final PullPhysicalPlanType planType,
final RoutingNodeType routingNodeType
) {
Expand Down Expand Up @@ -185,7 +185,7 @@ public void recordRequestSize(final double value) {

public void recordResponseSize(
final double value,
final PullSourceType sourceType,
final QuerySourceType sourceType,
final PullPhysicalPlanType planType,
final RoutingNodeType routingNodeType
) {
Expand Down Expand Up @@ -222,7 +222,7 @@ public void recordStatusCode(final int statusCode) {

public void recordRowsReturned(
final double value,
final PullSourceType sourceType,
final QuerySourceType sourceType,
final PullPhysicalPlanType planType,
final RoutingNodeType routingNodeType
) {
Expand All @@ -245,7 +245,7 @@ public void recordZeroRowsReturnedForError() {

public void recordRowsProcessed(
final double value,
final PullSourceType sourceType,
final QuerySourceType sourceType,
final PullPhysicalPlanType planType,
final RoutingNodeType routingNodeType
) {
Expand Down Expand Up @@ -519,7 +519,7 @@ private void addRequestMetricsToSensor(
sensor,
metricNamePrefix + "-total",
servicePrefix + PULL_QUERY_METRIC_GROUP,
"Total number of pull query request" + descriptionSuffix,
"Total number of pull query requests" + descriptionSuffix,
metricsTags,
new CumulativeCount()
);
Expand Down Expand Up @@ -654,7 +654,7 @@ private void addSensor(
final String groupName,
final String description,
final Map<String, String> metricsTags,
final MeasurableStat measureableStat
final MeasurableStat measurableStat
) {
sensor.add(
metrics.metricName(
Expand All @@ -663,15 +663,15 @@ private void addSensor(
description,
metricsTags
),
measureableStat
measurableStat
);
}

private Map<MetricsKey, Sensor> configureSensorMap(
final String sensorBaseName, final MetricsAdder metricsAdder) {
final ImmutableMap.Builder<MetricsKey, Sensor> builder = ImmutableMap.builder();

for (final PullSourceType sourceType : PullSourceType.values()) {
for (final QuerySourceType sourceType : QuerySourceType.values()) {
for (final PullPhysicalPlanType planType : PullPhysicalPlanType.values()) {
for (final RoutingNodeType routingNodeType : RoutingNodeType.values()) {
addSensorToMap(
Expand Down Expand Up @@ -723,7 +723,7 @@ private interface MetricsAdder {
// Detailed metrics are broken down by multiple parameters represented by the following key.
private static class MetricsKey {

private final PullSourceType sourceType;
private final QuerySourceType sourceType;
private final PullPhysicalPlanType planType;
private final RoutingNodeType routingNodeType;

Expand All @@ -738,7 +738,7 @@ private static class MetricsKey {
}

MetricsKey(
final PullSourceType sourceType,
final QuerySourceType sourceType,
final PullPhysicalPlanType planType,
final RoutingNodeType routingNodeType
) {
Expand Down
Loading

0 comments on commit 561af53

Please sign in to comment.