Skip to content

Commit

Permalink
fix: Allows remote pull queries to be cancelled (#8252)
Browse files Browse the repository at this point in the history
* fix: Allows remote pull queries to be cancelled
  • Loading branch information
AlanConfluent authored Oct 15, 2021
1 parent 033fb10 commit 4efa445
Show file tree
Hide file tree
Showing 12 changed files with 460 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ PullQueryResult executeTablePullQuery(
final LogicalPlanNode logicalPlan = buildAndValidateLogicalPlan(
statement, analysis, ksqlConfig, queryPlannerOptions, false);

// This is a cancel signal that is used to stop both local operations
// This is a cancel signal that is used to stop both local operations and requests
final CompletableFuture<Void> shouldCancelRequests = new CompletableFuture<>();

plan = buildPullPhysicalPlan(
Expand All @@ -266,7 +266,7 @@ PullQueryResult executeTablePullQuery(
final PullQueryQueuePopulator populator = () -> routing.handlePullQuery(
serviceContext,
physicalPlan, statement, routingOptions, physicalPlan.getOutputSchema(),
physicalPlan.getQueryId(), pullQueryQueue);
physicalPlan.getQueryId(), pullQueryQueue, shouldCancelRequests);
final PullQueryResult result = new PullQueryResult(physicalPlan.getOutputSchema(), populator,
physicalPlan.getQueryId(), pullQueryQueue, pullQueryMetrics, physicalPlan.getSourceType(),
physicalPlan.getPlanType(), routingNodeType, physicalPlan::getRowsReadFromDataSource,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ public CompletableFuture<Void> handlePullQuery(
final RoutingOptions routingOptions,
final LogicalSchema outputSchema,
final QueryId queryId,
final PullQueryQueue pullQueryQueue
final PullQueryQueue pullQueryQueue,
final CompletableFuture<Void> shouldCancelRequests
) {
final List<KsqlPartitionLocation> allLocations = pullPhysicalPlan.getMaterialization().locator()
.locate(
Expand Down Expand Up @@ -151,7 +152,7 @@ public CompletableFuture<Void> handlePullQuery(
coordinatorExecutorService.submit(() -> {
try {
executeRounds(serviceContext, pullPhysicalPlan, statement, routingOptions, outputSchema,
queryId, locations, pullQueryQueue);
queryId, locations, pullQueryQueue, shouldCancelRequests);
completableFuture.complete(null);
} catch (Throwable t) {
completableFuture.completeExceptionally(t);
Expand All @@ -169,7 +170,8 @@ private void executeRounds(
final LogicalSchema outputSchema,
final QueryId queryId,
final List<KsqlPartitionLocation> locations,
final PullQueryQueue pullQueryQueue
final PullQueryQueue pullQueryQueue,
final CompletableFuture<Void> shouldCancelRequests
) throws InterruptedException {
// The remaining partition locations to retrieve without error
List<KsqlPartitionLocation> remainingLocations = ImmutableList.copyOf(locations);
Expand Down Expand Up @@ -197,7 +199,8 @@ private void executeRounds(
futures.put(node, routerExecutorService.submit(
() -> routeQuery.routeQuery(
node, entry.getValue(), statement, serviceContext, routingOptions,
pullQueryMetrics, pullPhysicalPlan, outputSchema, queryId, pullQueryQueue)
pullQueryMetrics, pullPhysicalPlan, outputSchema, queryId, pullQueryQueue,
shouldCancelRequests)
));
}

Expand Down Expand Up @@ -268,7 +271,8 @@ RoutingResult routeQuery(
PullPhysicalPlan pullPhysicalPlan,
LogicalSchema outputSchema,
QueryId queryId,
PullQueryQueue pullQueryQueue
PullQueryQueue pullQueryQueue,
CompletableFuture<Void> shouldCancelRequests
);
}

Expand All @@ -284,7 +288,8 @@ static RoutingResult executeOrRouteQuery(
final PullPhysicalPlan pullPhysicalPlan,
final LogicalSchema outputSchema,
final QueryId queryId,
final PullQueryQueue pullQueryQueue
final PullQueryQueue pullQueryQueue,
final CompletableFuture<Void> shouldCancelRequests
) {
final BiFunction<List<?>, LogicalSchema, PullQueryRow> rowFactory = (rawRow, schema) ->
new PullQueryRow(rawRow, schema, Optional.ofNullable(
Expand Down Expand Up @@ -315,7 +320,7 @@ static RoutingResult executeOrRouteQuery(
pullQueryMetrics
.ifPresent(queryExecutorMetrics -> queryExecutorMetrics.recordRemoteRequests(1));
forwardTo(node, locations, statement, serviceContext, pullQueryQueue, rowFactory,
outputSchema);
outputSchema, shouldCancelRequests);
return RoutingResult.SUCCESS;
} catch (StandbyFallbackException e) {
LOG.warn("Error forwarding query to node {}. Falling back to standby state which may "
Expand All @@ -337,7 +342,8 @@ private static void forwardTo(
final ServiceContext serviceContext,
final PullQueryQueue pullQueryQueue,
final BiFunction<List<?>, LogicalSchema, PullQueryRow> rowFactory,
final LogicalSchema outputSchema
final LogicalSchema outputSchema,
final CompletableFuture<Void> shouldCancelRequests
) {

// Specify the partitions we specifically want to read. This will prevent reading unintended
Expand All @@ -360,7 +366,8 @@ private static void forwardTo(
statement.getStatementText(),
statement.getSessionConfig().getOverrides(),
requestProperties,
streamedRowsHandler(owner, pullQueryQueue, rowFactory, outputSchema)
streamedRowsHandler(owner, pullQueryQueue, rowFactory, outputSchema),
shouldCancelRequests
);
} catch (Exception e) {
// If we threw some explicit exception, then let it bubble up. All of the row handling is
Expand All @@ -369,6 +376,12 @@ private static void forwardTo(
if (ksqlException != null) {
throw ksqlException;
}
// If the exception was caused by closing the connection, we consider this intentional and
// just return.
if (shouldCancelRequests.isDone()) {
LOG.warn("Connection canceled, so returning");
return;
}
// If we get some kind of unknown error, we assume it's network or other error from the
// KsqlClient and try standbys
throw new StandbyFallbackException(String.format(
Expand Down
Loading

0 comments on commit 4efa445

Please sign in to comment.