Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Allows remote pull queries to be cancelled #8252

Merged
merged 3 commits into from
Oct 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ PullQueryResult executeTablePullQuery(
final LogicalPlanNode logicalPlan = buildAndValidateLogicalPlan(
statement, analysis, ksqlConfig, queryPlannerOptions, false);

// This is a cancel signal that is used to stop both local operations
// This is a cancel signal that is used to stop both local operations and requests
final CompletableFuture<Void> shouldCancelRequests = new CompletableFuture<>();

plan = buildPullPhysicalPlan(
Expand All @@ -266,7 +266,7 @@ PullQueryResult executeTablePullQuery(
final PullQueryQueuePopulator populator = () -> routing.handlePullQuery(
serviceContext,
physicalPlan, statement, routingOptions, physicalPlan.getOutputSchema(),
physicalPlan.getQueryId(), pullQueryQueue);
physicalPlan.getQueryId(), pullQueryQueue, shouldCancelRequests);
final PullQueryResult result = new PullQueryResult(physicalPlan.getOutputSchema(), populator,
physicalPlan.getQueryId(), pullQueryQueue, pullQueryMetrics, physicalPlan.getSourceType(),
physicalPlan.getPlanType(), routingNodeType, physicalPlan::getRowsReadFromDataSource,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ public CompletableFuture<Void> handlePullQuery(
final RoutingOptions routingOptions,
final LogicalSchema outputSchema,
final QueryId queryId,
final PullQueryQueue pullQueryQueue
final PullQueryQueue pullQueryQueue,
final CompletableFuture<Void> shouldCancelRequests
) {
final List<KsqlPartitionLocation> allLocations = pullPhysicalPlan.getMaterialization().locator()
.locate(
Expand Down Expand Up @@ -151,7 +152,7 @@ public CompletableFuture<Void> handlePullQuery(
coordinatorExecutorService.submit(() -> {
try {
executeRounds(serviceContext, pullPhysicalPlan, statement, routingOptions, outputSchema,
queryId, locations, pullQueryQueue);
queryId, locations, pullQueryQueue, shouldCancelRequests);
completableFuture.complete(null);
} catch (Throwable t) {
completableFuture.completeExceptionally(t);
Expand All @@ -169,7 +170,8 @@ private void executeRounds(
final LogicalSchema outputSchema,
final QueryId queryId,
final List<KsqlPartitionLocation> locations,
final PullQueryQueue pullQueryQueue
final PullQueryQueue pullQueryQueue,
final CompletableFuture<Void> shouldCancelRequests
) throws InterruptedException {
// The remaining partition locations to retrieve without error
List<KsqlPartitionLocation> remainingLocations = ImmutableList.copyOf(locations);
Expand Down Expand Up @@ -197,7 +199,8 @@ private void executeRounds(
futures.put(node, routerExecutorService.submit(
() -> routeQuery.routeQuery(
node, entry.getValue(), statement, serviceContext, routingOptions,
pullQueryMetrics, pullPhysicalPlan, outputSchema, queryId, pullQueryQueue)
pullQueryMetrics, pullPhysicalPlan, outputSchema, queryId, pullQueryQueue,
shouldCancelRequests)
));
}

Expand Down Expand Up @@ -268,7 +271,8 @@ RoutingResult routeQuery(
PullPhysicalPlan pullPhysicalPlan,
LogicalSchema outputSchema,
QueryId queryId,
PullQueryQueue pullQueryQueue
PullQueryQueue pullQueryQueue,
CompletableFuture<Void> shouldCancelRequests
);
}

Expand All @@ -284,7 +288,8 @@ static RoutingResult executeOrRouteQuery(
final PullPhysicalPlan pullPhysicalPlan,
final LogicalSchema outputSchema,
final QueryId queryId,
final PullQueryQueue pullQueryQueue
final PullQueryQueue pullQueryQueue,
final CompletableFuture<Void> shouldCancelRequests
) {
final BiFunction<List<?>, LogicalSchema, PullQueryRow> rowFactory = (rawRow, schema) ->
new PullQueryRow(rawRow, schema, Optional.ofNullable(
Expand Down Expand Up @@ -315,7 +320,7 @@ static RoutingResult executeOrRouteQuery(
pullQueryMetrics
.ifPresent(queryExecutorMetrics -> queryExecutorMetrics.recordRemoteRequests(1));
forwardTo(node, locations, statement, serviceContext, pullQueryQueue, rowFactory,
outputSchema);
outputSchema, shouldCancelRequests);
return RoutingResult.SUCCESS;
} catch (StandbyFallbackException e) {
LOG.warn("Error forwarding query to node {}. Falling back to standby state which may "
Expand All @@ -337,7 +342,8 @@ private static void forwardTo(
final ServiceContext serviceContext,
final PullQueryQueue pullQueryQueue,
final BiFunction<List<?>, LogicalSchema, PullQueryRow> rowFactory,
final LogicalSchema outputSchema
final LogicalSchema outputSchema,
final CompletableFuture<Void> shouldCancelRequests
) {

// Specify the partitions we specifically want to read. This will prevent reading unintended
Expand All @@ -360,7 +366,8 @@ private static void forwardTo(
statement.getStatementText(),
statement.getSessionConfig().getOverrides(),
requestProperties,
streamedRowsHandler(owner, pullQueryQueue, rowFactory, outputSchema)
streamedRowsHandler(owner, pullQueryQueue, rowFactory, outputSchema),
shouldCancelRequests
);
} catch (Exception e) {
// If we threw some explicit exception, then let it bubble up. All of the row handling is
Expand All @@ -369,6 +376,12 @@ private static void forwardTo(
if (ksqlException != null) {
throw ksqlException;
}
// If the exception was caused by closing the connection, we consider this intentional and
// just return.
if (shouldCancelRequests.isDone()) {
LOG.warn("Connection canceled, so returning");
return;
}
// If we get some kind of unknown error, we assume it's network or other error from the
// KsqlClient and try standbys
throw new StandbyFallbackException(String.format(
Expand Down
Loading