Skip to content

Commit

Permalink
feat: Add metrics for pull queries endpoint (#4608)
Browse files Browse the repository at this point in the history
  • Loading branch information
vpapavas authored Mar 16, 2020
1 parent c442a62 commit 23e3868
Show file tree
Hide file tree
Showing 11 changed files with 348 additions and 22 deletions.
12 changes: 12 additions & 0 deletions ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,11 @@ public class KsqlConfig extends AbstractConfig {
public static final String KSQL_QUERY_PULL_STREAMSTORE_REBALANCING_TIMEOUT_MS_DOC = "Timeout in "
+ "milliseconds when waiting for rebalancing of the stream store during a pull query";

public static final String KSQL_QUERY_PULL_METRICS_ENABLED =
"ksql.query.pull.metrics.enabled";
public static final String KSQL_QUERY_PULL_METRICS_ENABLED_DOC =
"Config to enable/disable collecting JMX metrics for pull queries.";

public static final Collection<CompatibilityBreakingConfigDef> COMPATIBLY_BREAKING_CONFIG_DEFS
= ImmutableList.of();

Expand Down Expand Up @@ -632,6 +637,13 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
Importance.LOW,
"Feature flag for removing restriction on key names - WIP, do not enable."
)
.define(
KSQL_QUERY_PULL_METRICS_ENABLED,
Type.BOOLEAN,
false,
Importance.LOW,
KSQL_QUERY_PULL_METRICS_ENABLED_DOC
)
.withClientSslSupport();

for (final CompatibilityBreakingConfigDef compatibilityBreakingConfigDef
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;

public class QueryStreamEndpoint {
Expand Down Expand Up @@ -91,7 +92,8 @@ private QueryPublisher createPushQueryPublisher(final Context context,
private QueryPublisher createPullQueryPublisher(final Context context,
final ServiceContext serviceContext,
final ConfiguredStatement<Query> statement) {
final TableRowsEntity tableRows = pullQueryExecutor.execute(statement, serviceContext);
final TableRowsEntity tableRows = pullQueryExecutor.execute(
statement, serviceContext, Optional.empty());
return new PullQueryPublisher(context, tableRows, colNamesFromSchema(tableRows.getSchema()),
colTypesFromSchema(tableRows.getSchema()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ public static KsqlRestConfig convertToApiServerConfig(final KsqlRestConfig confi
final List<KsqlServerPrecondition> preconditions,
final List<KsqlConfigurable> configurables,
final Consumer<KsqlConfig> rocksDBConfigSetterHandler,
final PullQueryExecutor pullQueryExecutor,
final Optional<HeartbeatAgent> heartbeatAgent,
final Optional<LagReportingAgent> lagReportingAgent
) {
Expand Down Expand Up @@ -300,12 +301,11 @@ public static KsqlRestConfig convertToApiServerConfig(final KsqlRestConfig confi
this.configurables = requireNonNull(configurables, "configurables");
this.rocksDBConfigSetterHandler =
requireNonNull(rocksDBConfigSetterHandler, "rocksDBConfigSetterHandler");
this.pullQueryExecutor = requireNonNull(pullQueryExecutor, "pullQueryExecutor");
this.heartbeatAgent = requireNonNull(heartbeatAgent, "heartbeatAgent");
this.lagReportingAgent = requireNonNull(lagReportingAgent, "lagReportingAgent");

this.routingFilterFactory = initializeRoutingFilterFactory(
ksqlConfigNoPort, heartbeatAgent, lagReportingAgent);
this.pullQueryExecutor = new PullQueryExecutor(ksqlEngine, routingFilterFactory);
}

@Override
Expand Down Expand Up @@ -445,8 +445,14 @@ private void initialize(final KsqlConfig configWithApplicationServer) {
serverState.setReady();
}

@SuppressWarnings("checkstyle:NPathComplexity")
@Override
public void triggerShutdown() {
try {
streamedQueryResource.closeMetrics();
} catch (final Exception e) {
log.error("Exception while waiting for pull query metrics to close", e);
}
try {
ksqlEngine.close();
} catch (final Exception e) {
Expand Down Expand Up @@ -717,7 +723,8 @@ static KsqlRestApplication buildApplication(
heartbeatAgent, lagReportingAgent);

final PullQueryExecutor pullQueryExecutor = new PullQueryExecutor(
ksqlEngine, routingFilterFactory);
ksqlEngine, routingFilterFactory, ksqlConfig);

final StreamedQueryResource streamedQueryResource = new StreamedQueryResource(
ksqlEngine,
commandStore,
Expand Down Expand Up @@ -792,6 +799,7 @@ static KsqlRestApplication buildApplication(
preconditions,
configurables,
rocksDBConfigSetterHandler,
pullQueryExecutor,
heartbeatAgent,
lagReportingAgent
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ public final class PullQueryExecutor {

public PullQueryExecutor(
final KsqlExecutionContext executionContext,
final RoutingFilterFactory routingFilterFactory
final RoutingFilterFactory routingFilterFactory,
final KsqlConfig ksqlConfig
) {
this.executionContext = Objects.requireNonNull(executionContext, "executionContext");
this.routingFilterFactory =
Expand All @@ -152,7 +153,8 @@ public static void validate(

public TableRowsEntity execute(
final ConfiguredStatement<Query> statement,
final ServiceContext serviceContext
final ServiceContext serviceContext,
final Optional<PullQueryExecutorMetrics> pullQueryMetrics
) {
if (!statement.getStatement().isPullQuery()) {
throw new IllegalArgumentException("Executor can only handle pull queries");
Expand Down Expand Up @@ -193,16 +195,17 @@ public TableRowsEntity execute(
analysis,
whereInfo,
queryId,
contextStacker);
contextStacker,
pullQueryMetrics);

return handlePullQuery(
statement,
executionContext,
serviceContext,
pullQueryContext
);

} catch (final Exception e) {
pullQueryMetrics.ifPresent(metrics -> metrics.recordErrorRate(1));
throw new KsqlStatementException(
e.getMessage() == null ? "Server Error" : e.getMessage(),
statement.getStatementText(),
Expand Down Expand Up @@ -256,13 +259,17 @@ private TableRowsEntity routeQuery(
if (node.isLocal()) {
LOG.debug("Query {} executed locally at host {} at timestamp {}.",
statement.getStatementText(), node.location(), System.currentTimeMillis());
pullQueryContext.pullQueryMetrics
.ifPresent(queryExecutorMetrics -> queryExecutorMetrics.recordLocalRequests(1));
return queryRowsLocally(
statement,
executionContext,
pullQueryContext);
} else {
LOG.debug("Query {} routed to host {} at timestamp {}.",
statement.getStatementText(), node.location(), System.currentTimeMillis());
pullQueryContext.pullQueryMetrics
.ifPresent(queryExecutorMetrics -> queryExecutorMetrics.recordRemoteRequests(1));
return forwardTo(node, statement, serviceContext);
}
}
Expand Down Expand Up @@ -399,21 +406,26 @@ private static final class PullQueryContext {
private final WhereInfo whereInfo;
private final QueryId queryId;
private final QueryContext.Stacker contextStacker;
private final Optional<PullQueryExecutorMetrics> pullQueryMetrics;

private PullQueryContext(
final Struct key,
final Materialization mat,
final ImmutableAnalysis analysis,
final WhereInfo whereInfo,
final QueryId queryId,
final QueryContext.Stacker contextStacker
final QueryContext.Stacker contextStacker,
final Optional<PullQueryExecutorMetrics> pullQueryMetrics

) {
this.key = Objects.requireNonNull(key, "key");
this.mat = Objects.requireNonNull(mat, "materialization");
this.analysis = Objects.requireNonNull(analysis, "analysis");
this.whereInfo = Objects.requireNonNull(whereInfo, "whereInfo");
this.queryId = Objects.requireNonNull(queryId, "queryId");
this.contextStacker = Objects.requireNonNull(contextStacker, "contextStacker");
this.pullQueryMetrics = Objects.requireNonNull(
pullQueryMetrics, "pullQueryExecutorMetrics");
}

public Struct getKey() {
Expand Down
Loading

0 comments on commit 23e3868

Please sign in to comment.