Skip to content

Commit

Permalink
feat: Add metrics for pull query request/response size in bytes (#6148)
Browse files Browse the repository at this point in the history
* adding metrics for request/response size

* measure size via vertx methods

* remove jol dependency

* removed routingContext from streamedqueryresource

* Addressed alan's comments

* address alan's comments

* rebase
  • Loading branch information
vpapavas authored Sep 28, 2020
1 parent 57b7b2e commit 946d2d3
Show file tree
Hide file tree
Showing 22 changed files with 240 additions and 112 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.confluent.ksql.query.BlockingRowQueue;
import io.confluent.ksql.rest.entity.TableRows;
import io.confluent.ksql.rest.server.execution.PullQueryExecutor;
import io.confluent.ksql.rest.server.execution.PullQueryExecutorMetrics;
import io.confluent.ksql.rest.server.execution.PullQueryResult;
import io.confluent.ksql.schema.ksql.Column;
import io.confluent.ksql.schema.utils.FormatOptions;
Expand All @@ -52,12 +53,18 @@ public class QueryEndpoint {
private final KsqlEngine ksqlEngine;
private final KsqlConfig ksqlConfig;
private final PullQueryExecutor pullQueryExecutor;
private final Optional<PullQueryExecutorMetrics> pullQueryMetrics;

public QueryEndpoint(final KsqlEngine ksqlEngine, final KsqlConfig ksqlConfig,
final PullQueryExecutor pullQueryExecutor) {
public QueryEndpoint(
final KsqlEngine ksqlEngine,
final KsqlConfig ksqlConfig,
final PullQueryExecutor pullQueryExecutor,
final Optional<PullQueryExecutorMetrics> pullQueryMetrics
) {
this.ksqlEngine = ksqlEngine;
this.ksqlConfig = ksqlConfig;
this.pullQueryExecutor = pullQueryExecutor;
this.pullQueryMetrics = pullQueryMetrics;
}

public QueryPublisher createQueryPublisher(
Expand All @@ -72,7 +79,8 @@ public QueryPublisher createQueryPublisher(
final ConfiguredStatement<Query> statement = createStatement(sql, properties.getMap());

if (statement.getStatement().isPullQuery()) {
return createPullQueryPublisher(context, serviceContext, statement, startTimeNanos);
return createPullQueryPublisher(
context, serviceContext, statement, pullQueryMetrics, startTimeNanos);
} else {
return createPushQueryPublisher(context, serviceContext, statement, workerExecutor);
}
Expand All @@ -97,10 +105,12 @@ private QueryPublisher createPullQueryPublisher(
final Context context,
final ServiceContext serviceContext,
final ConfiguredStatement<Query> statement,
final Optional<PullQueryExecutorMetrics> pullQueryMetrics,
final long startTimeNanos
) {
final PullQueryResult result = pullQueryExecutor.execute(
statement, ImmutableMap.of(), serviceContext, Optional.of(false), startTimeNanos);
statement, ImmutableMap.of(), serviceContext, Optional.of(false), pullQueryMetrics);
pullQueryMetrics.ifPresent(p -> p.recordLatency(startTimeNanos));
final TableRows tableRows = result.getTableRows();

return new PullQueryPublisher(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.confluent.ksql.rest.EndpointResponse;
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.entity.KsqlErrorMessage;
import io.confluent.ksql.rest.server.execution.PullQueryExecutorMetrics;
import io.confluent.ksql.rest.server.resources.KsqlRestException;
import io.confluent.ksql.util.VertxCompletableFuture;
import io.vertx.core.WorkerExecutor;
Expand All @@ -39,6 +40,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.BiFunction;
import org.apache.kafka.common.utils.Time;

public final class OldApiUtils {

Expand All @@ -49,10 +51,13 @@ private OldApiUtils() {
private static final String JSON_CONTENT_TYPE = "application/json";
private static final String CHUNKED_ENCODING = "chunked";

static <T> void handleOldApiRequest(final Server server,
static <T> void handleOldApiRequest(
final Server server,
final RoutingContext routingContext,
final Class<T> requestClass,
final Optional<PullQueryExecutorMetrics> pullQueryMetrics,
final BiFunction<T, ApiSecurityContext, CompletableFuture<EndpointResponse>> requestor) {
final long startTimeNanos = Time.SYSTEM.nanoseconds();
final T requestObject;
if (requestClass != null) {
final Optional<T> optRequestObject = ServerUtils
Expand All @@ -64,22 +69,30 @@ static <T> void handleOldApiRequest(final Server server,
} else {
requestObject = null;
}

pullQueryMetrics
.ifPresent(pullQueryExecutorMetrics -> pullQueryExecutorMetrics.recordRequestSize(
routingContext.request().bytesRead()));
final CompletableFuture<EndpointResponse> completableFuture = requestor
.apply(requestObject, DefaultApiSecurityContext.create(routingContext));
completableFuture.thenAccept(endpointResponse -> {
handleOldApiResponse(server, routingContext, endpointResponse);
handleOldApiResponse(
server, routingContext, endpointResponse, pullQueryMetrics, startTimeNanos);
}).exceptionally(t -> {
if (t instanceof CompletionException) {
t = t.getCause();
}
handleOldApiResponse(server, routingContext, mapException(t));
handleOldApiResponse(
server, routingContext, mapException(t), pullQueryMetrics, startTimeNanos);
return null;
});
}

static void handleOldApiResponse(final Server server, final RoutingContext routingContext,
final EndpointResponse endpointResponse) {
static void handleOldApiResponse(
final Server server, final RoutingContext routingContext,
final EndpointResponse endpointResponse,
final Optional<PullQueryExecutorMetrics> pullQueryMetrics,
final long startTimeNanos
) {
final HttpServerResponse response = routingContext.response();
response.putHeader(CONTENT_TYPE_HEADER, JSON_CONTENT_TYPE);

Expand Down Expand Up @@ -112,6 +125,12 @@ static void handleOldApiResponse(final Server server, final RoutingContext routi
response.end(responseBody);
}
}
pullQueryMetrics
.ifPresent(pullQueryExecutorMetrics -> pullQueryExecutorMetrics.recordResponseSize(
routingContext.response().bytesWritten()));
pullQueryMetrics.ifPresent(pullQueryExecutorMetrics -> pullQueryExecutorMetrics
.recordLatency(startTimeNanos));

}

private static void streamEndpointResponse(final Server server,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.confluent.ksql.api.spi.Endpoints;
import io.confluent.ksql.rest.entity.PushQueryId;
import io.confluent.ksql.rest.server.KsqlRestConfig;
import io.confluent.ksql.rest.server.execution.PullQueryExecutorMetrics;
import io.confluent.ksql.rest.server.state.ServerState;
import io.confluent.ksql.rest.util.KeystoreUtil;
import io.confluent.ksql.security.KsqlSecurityExtension;
Expand Down Expand Up @@ -78,21 +79,25 @@ public class Server {
private final Optional<AuthenticationPlugin> authenticationPlugin;
private final ServerState serverState;
private final List<URI> listeners = new ArrayList<>();
private final Optional<PullQueryExecutorMetrics> pullQueryMetrics;
private URI internalListener;
private WorkerExecutor workerExecutor;
private FileWatcher fileWatcher;

public Server(final Vertx vertx, final KsqlRestConfig config, final Endpoints endpoints,
public Server(
final Vertx vertx, final KsqlRestConfig config, final Endpoints endpoints,
final KsqlSecurityExtension securityExtension,
final Optional<AuthenticationPlugin> authenticationPlugin,
final ServerState serverState) {
final ServerState serverState,
final Optional<PullQueryExecutorMetrics> pullQueryMetrics) {
this.vertx = Objects.requireNonNull(vertx);
this.config = Objects.requireNonNull(config);
this.endpoints = Objects.requireNonNull(endpoints);
this.securityExtension = Objects.requireNonNull(securityExtension);
this.authenticationPlugin = Objects.requireNonNull(authenticationPlugin);
this.serverState = Objects.requireNonNull(serverState);
this.maxPushQueryCount = config.getInt(KsqlRestConfig.MAX_PUSH_QUERIES);
this.pullQueryMetrics = Objects.requireNonNull(pullQueryMetrics, "pullQueryMetrics");
if (!OpenSsl.isAvailable()) {
log.warn("OpenSSL does not appear to be installed. ksqlDB will fall back to using the JDK "
+ "TLS implementation. OpenSSL is recommended for better performance.");
Expand Down Expand Up @@ -126,7 +131,7 @@ public synchronized void start() {
final ServerVerticle serverVerticle = new ServerVerticle(endpoints,
createHttpServerOptions(config, listener.getHost(), listener.getPort(),
listener.getScheme().equalsIgnoreCase("https"), isInternalListener.orElse(false)),
this, isInternalListener);
this, isInternalListener, pullQueryMetrics);
vertx.deployVerticle(serverVerticle, vcf);
final int index = i;
final CompletableFuture<String> deployFuture = vcf.thenApply(s -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.confluent.ksql.rest.entity.KsqlRequest;
import io.confluent.ksql.rest.entity.LagReportingMessage;
import io.confluent.ksql.rest.entity.Versions;
import io.confluent.ksql.rest.server.execution.PullQueryExecutorMetrics;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import io.vertx.core.http.HttpHeaders;
Expand Down Expand Up @@ -65,16 +66,19 @@ public class ServerVerticle extends AbstractVerticle {
private ConnectionQueryManager connectionQueryManager;
private HttpServer httpServer;
private final Optional<Boolean> isInternalListener;
private final Optional<PullQueryExecutorMetrics> pullQueryMetrics;

public ServerVerticle(
final Endpoints endpoints,
final HttpServerOptions httpServerOptions,
final Server server,
final Optional<Boolean> isInternalListener) {
final Optional<Boolean> isInternalListener,
final Optional<PullQueryExecutorMetrics> pullQueryMetrics) {
this.endpoints = Objects.requireNonNull(endpoints);
this.httpServerOptions = Objects.requireNonNull(httpServerOptions);
this.server = Objects.requireNonNull(server);
this.isInternalListener = Objects.requireNonNull(isInternalListener);
this.pullQueryMetrics = Objects.requireNonNull(pullQueryMetrics);
}

@Override
Expand Down Expand Up @@ -206,7 +210,7 @@ private Router setupRouter() {
}

private void handleKsqlRequest(final RoutingContext routingContext) {
handleOldApiRequest(server, routingContext, KsqlRequest.class,
handleOldApiRequest(server, routingContext, KsqlRequest.class, Optional.empty(),
(ksqlRequest, apiSecurityContext) ->
endpoints
.executeKsqlRequest(ksqlRequest, server.getWorkerExecutor(),
Expand All @@ -215,7 +219,7 @@ private void handleKsqlRequest(final RoutingContext routingContext) {
}

private void handleTerminateRequest(final RoutingContext routingContext) {
handleOldApiRequest(server, routingContext, ClusterTerminateRequest.class,
handleOldApiRequest(server, routingContext, ClusterTerminateRequest.class, Optional.empty(),
(request, apiSecurityContext) ->
endpoints
.executeTerminate(request, server.getWorkerExecutor(),
Expand All @@ -227,32 +231,33 @@ private void handleQueryRequest(final RoutingContext routingContext) {

final CompletableFuture<Void> connectionClosedFuture = new CompletableFuture<>();
routingContext.request().connection().closeHandler(v -> connectionClosedFuture.complete(null));

handleOldApiRequest(server, routingContext, KsqlRequest.class,
handleOldApiRequest(server, routingContext, KsqlRequest.class, pullQueryMetrics,
(request, apiSecurityContext) ->
endpoints
.executeQueryRequest(request, server.getWorkerExecutor(), connectionClosedFuture,
.executeQueryRequest(
request, server.getWorkerExecutor(), connectionClosedFuture,
DefaultApiSecurityContext.create(routingContext),
isInternalRequest(routingContext))

);
}

private void handleInfoRequest(final RoutingContext routingContext) {
handleOldApiRequest(server, routingContext, null,
handleOldApiRequest(server, routingContext, null, Optional.empty(),
(request, apiSecurityContext) ->
endpoints.executeInfo(DefaultApiSecurityContext.create(routingContext))
);
}

private void handleClusterStatusRequest(final RoutingContext routingContext) {
handleOldApiRequest(server, routingContext, null,
handleOldApiRequest(server, routingContext, null, Optional.empty(),
(request, apiSecurityContext) ->
endpoints.executeClusterStatus(DefaultApiSecurityContext.create(routingContext))
);
}

private void handleHeartbeatRequest(final RoutingContext routingContext) {
handleOldApiRequest(server, routingContext, HeartbeatMessage.class,
handleOldApiRequest(server, routingContext, HeartbeatMessage.class, Optional.empty(),
(request, apiSecurityContext) ->
endpoints.executeHeartbeat(request, DefaultApiSecurityContext.create(routingContext))
);
Expand All @@ -263,43 +268,43 @@ private void handleStatusRequest(final RoutingContext routingContext) {
final String type = request.getParam("type");
final String entity = request.getParam("entity");
final String action = request.getParam("action");
handleOldApiRequest(server, routingContext, null,
handleOldApiRequest(server, routingContext, null, Optional.empty(),
(r, apiSecurityContext) ->
endpoints.executeStatus(type, entity, action,
DefaultApiSecurityContext.create(routingContext))
);
}

private void handleAllStatusesRequest(final RoutingContext routingContext) {
handleOldApiRequest(server, routingContext, null,
handleOldApiRequest(server, routingContext, null, Optional.empty(),
(r, apiSecurityContext) ->
endpoints.executeAllStatuses(DefaultApiSecurityContext.create(routingContext))
);
}

private void handleLagReportRequest(final RoutingContext routingContext) {
handleOldApiRequest(server, routingContext, LagReportingMessage.class,
handleOldApiRequest(server, routingContext, LagReportingMessage.class, Optional.empty(),
(request, apiSecurityContext) ->
endpoints.executeLagReport(request, DefaultApiSecurityContext.create(routingContext))
);
}

private void handleHealthcheckRequest(final RoutingContext routingContext) {
handleOldApiRequest(server, routingContext, null,
handleOldApiRequest(server, routingContext, null, Optional.empty(),
(request, apiSecurityContext) ->
endpoints.executeCheckHealth(DefaultApiSecurityContext.create(routingContext))
);
}

private void handleServerMetadataRequest(final RoutingContext routingContext) {
handleOldApiRequest(server, routingContext, null,
handleOldApiRequest(server, routingContext, null, Optional.empty(),
(request, apiSecurityContext) ->
endpoints.executeServerMetadata(DefaultApiSecurityContext.create(routingContext))
);
}

private void handleServerMetadataClusterIdRequest(final RoutingContext routingContext) {
handleOldApiRequest(server, routingContext, null,
handleOldApiRequest(server, routingContext, null, Optional.empty(),
(request, apiSecurityContext) ->
endpoints
.executeServerMetadataClusterId(DefaultApiSecurityContext.create(routingContext))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,10 @@ CompletableFuture<EndpointResponse> executeKsqlRequest(KsqlRequest request,
CompletableFuture<EndpointResponse> executeTerminate(ClusterTerminateRequest request,
WorkerExecutor workerExecutor, ApiSecurityContext apiSecurityContext);

CompletableFuture<EndpointResponse> executeQueryRequest(KsqlRequest request,
WorkerExecutor workerExecutor, CompletableFuture<Void> connectionClosedFuture,
ApiSecurityContext apiSecurityContext, Optional<Boolean> isInternalRequest);
CompletableFuture<EndpointResponse> executeQueryRequest(
KsqlRequest request, WorkerExecutor workerExecutor,
CompletableFuture<Void> connectionClosedFuture, ApiSecurityContext apiSecurityContext,
Optional<Boolean> isInternalRequest);

CompletableFuture<EndpointResponse> executeInfo(ApiSecurityContext apiSecurityContext);

Expand Down
Loading

0 comments on commit 946d2d3

Please sign in to comment.