Skip to content

Commit

Permalink
fix: Ensure that we always close /query writer (#8164)
Browse files: browse the repository at this point in the history
* fix: Ensure that we always close /query writer
  • Loading branch information
AlanConfluent authored Sep 24, 2021
1 parent 9747b34 commit f7c2002
Show file tree
Hide file tree
Showing 20 changed files with 261 additions and 78 deletions.
17 changes: 15 additions & 2 deletions ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -268,9 +268,15 @@ public class KsqlConfig extends AbstractConfig {

public static final String KSQL_QUERY_PULL_THREAD_POOL_SIZE_CONFIG
= "ksql.query.pull.thread.pool.size";
public static final Integer KSQL_QUERY_PULL_THREAD_POOL_SIZE_DEFAULT = 100;
public static final Integer KSQL_QUERY_PULL_THREAD_POOL_SIZE_DEFAULT = 50;
public static final String KSQL_QUERY_PULL_THREAD_POOL_SIZE_DOC =
"Size of thread pool used for sending/executing pull queries";
"Size of thread pool used for coordinating pull queries";

public static final String KSQL_QUERY_PULL_ROUTER_THREAD_POOL_SIZE_CONFIG
= "ksql.query.pull.router.thread.pool.size";
public static final Integer KSQL_QUERY_PULL_ROUTER_THREAD_POOL_SIZE_DEFAULT = 50;
public static final String KSQL_QUERY_PULL_ROUTER_THREAD_POOL_SIZE_DOC =
"Size of thread pool used for routing pull queries";

public static final String KSQL_QUERY_PULL_TABLE_SCAN_ENABLED
= "ksql.query.pull.table.scan.enabled";
Expand Down Expand Up @@ -949,6 +955,13 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
Importance.LOW,
KSQL_QUERY_PULL_THREAD_POOL_SIZE_DOC
)
.define(
KSQL_QUERY_PULL_ROUTER_THREAD_POOL_SIZE_CONFIG,
Type.INT,
KSQL_QUERY_PULL_ROUTER_THREAD_POOL_SIZE_DEFAULT,
Importance.LOW,
KSQL_QUERY_PULL_ROUTER_THREAD_POOL_SIZE_DOC
)
.define(
KSQL_QUERY_PULL_TABLE_SCAN_ENABLED,
Type.BOOLEAN,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
Expand Down Expand Up @@ -243,10 +244,14 @@ PullQueryResult executeTablePullQuery(
final LogicalPlanNode logicalPlan = buildAndValidateLogicalPlan(
statement, analysis, ksqlConfig, queryPlannerOptions, false);

// This is a cancel signal that is used to stop both local operations and requests
final CompletableFuture<Void> shouldCancelRequests = new CompletableFuture<>();

plan = buildPullPhysicalPlan(
logicalPlan,
analysis,
queryPlannerOptions
queryPlannerOptions,
shouldCancelRequests
);
final PullPhysicalPlan physicalPlan = plan;

Expand All @@ -257,10 +262,11 @@ PullQueryResult executeTablePullQuery(
final PullQueryQueuePopulator populator = () -> routing.handlePullQuery(
serviceContext,
physicalPlan, statement, routingOptions, physicalPlan.getOutputSchema(),
physicalPlan.getQueryId(), pullQueryQueue);
physicalPlan.getQueryId(), pullQueryQueue, shouldCancelRequests);
final PullQueryResult result = new PullQueryResult(physicalPlan.getOutputSchema(), populator,
physicalPlan.getQueryId(), pullQueryQueue, pullQueryMetrics, physicalPlan.getSourceType(),
physicalPlan.getPlanType(), routingNodeType, physicalPlan::getRowsReadFromDataSource);
physicalPlan.getPlanType(), routingNodeType, physicalPlan::getRowsReadFromDataSource,
shouldCancelRequests);
if (startImmediately) {
result.start();
}
Expand Down Expand Up @@ -663,14 +669,16 @@ private PushPhysicalPlan buildScalablePushPhysicalPlan(
private PullPhysicalPlan buildPullPhysicalPlan(
final LogicalPlanNode logicalPlan,
final ImmutableAnalysis analysis,
final QueryPlannerOptions queryPlannerOptions
final QueryPlannerOptions queryPlannerOptions,
final CompletableFuture<Void> shouldCancelRequests
) {

final PullPhysicalPlanBuilder builder = new PullPhysicalPlanBuilder(
engineContext.getProcessingLogContext(),
PullQueryExecutionUtil.findMaterializingQuery(engineContext, analysis),
analysis,
queryPlannerOptions
queryPlannerOptions,
shouldCancelRequests
);
return builder.buildPullPhysicalPlan(logicalPlan);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ public final class HARouting implements AutoCloseable {

private static final Logger LOG = LoggerFactory.getLogger(HARouting.class);

private final ExecutorService executorService;
private final ExecutorService coordinatorExecutorService;
private final ExecutorService routerExecutorService;
private final RoutingFilterFactory routingFilterFactory;
private final Optional<PullQueryExecutorMetrics> pullQueryMetrics;
private final RouteQuery routeQuery;
Expand All @@ -87,16 +88,20 @@ public HARouting(
) {
this.routingFilterFactory =
Objects.requireNonNull(routingFilterFactory, "routingFilterFactory");
this.executorService = Executors.newFixedThreadPool(
this.coordinatorExecutorService = Executors.newFixedThreadPool(
ksqlConfig.getInt(KsqlConfig.KSQL_QUERY_PULL_THREAD_POOL_SIZE_CONFIG),
new ThreadFactoryBuilder().setNameFormat("pull-query-executor-%d").build());
new ThreadFactoryBuilder().setNameFormat("pull-query-coordinator-%d").build());
this.routerExecutorService = Executors.newFixedThreadPool(
ksqlConfig.getInt(KsqlConfig.KSQL_QUERY_PULL_ROUTER_THREAD_POOL_SIZE_CONFIG),
new ThreadFactoryBuilder().setNameFormat("pull-query-router-%d").build());
this.pullQueryMetrics = Objects.requireNonNull(pullQueryMetrics, "pullQueryMetrics");
this.routeQuery = Objects.requireNonNull(routeQuery);
}

@Override
public void close() {
executorService.shutdown();
coordinatorExecutorService.shutdown();
routerExecutorService.shutdown();
}

public CompletableFuture<Void> handlePullQuery(
Expand All @@ -106,7 +111,8 @@ public CompletableFuture<Void> handlePullQuery(
final RoutingOptions routingOptions,
final LogicalSchema outputSchema,
final QueryId queryId,
final PullQueryQueue pullQueryQueue
final PullQueryQueue pullQueryQueue,
final CompletableFuture<Void> shouldCancelRequests
) {
final List<KsqlPartitionLocation> allLocations = pullPhysicalPlan.getMaterialization().locator()
.locate(
Expand Down Expand Up @@ -143,10 +149,10 @@ public CompletableFuture<Void> handlePullQuery(
.collect(Collectors.toList());

final CompletableFuture<Void> completableFuture = new CompletableFuture<>();
executorService.submit(() -> {
coordinatorExecutorService.submit(() -> {
try {
executeRounds(serviceContext, pullPhysicalPlan, statement, routingOptions, outputSchema,
queryId, locations, pullQueryQueue);
queryId, locations, pullQueryQueue, shouldCancelRequests);
completableFuture.complete(null);
} catch (Throwable t) {
completableFuture.completeExceptionally(t);
Expand All @@ -164,7 +170,8 @@ private void executeRounds(
final LogicalSchema outputSchema,
final QueryId queryId,
final List<KsqlPartitionLocation> locations,
final PullQueryQueue pullQueryQueue
final PullQueryQueue pullQueryQueue,
final CompletableFuture<Void> shouldCancelRequests
) throws InterruptedException {
// The remaining partition locations to retrieve without error
List<KsqlPartitionLocation> remainingLocations = ImmutableList.copyOf(locations);
Expand All @@ -189,10 +196,11 @@ private void executeRounds(
final Map<KsqlNode, Future<RoutingResult>> futures = new LinkedHashMap<>();
for (Map.Entry<KsqlNode, List<KsqlPartitionLocation>> entry : groupedByHost.entrySet()) {
final KsqlNode node = entry.getKey();
futures.put(node, executorService.submit(
futures.put(node, routerExecutorService.submit(
() -> routeQuery.routeQuery(
node, entry.getValue(), statement, serviceContext, routingOptions,
pullQueryMetrics, pullPhysicalPlan, outputSchema, queryId, pullQueryQueue)
pullQueryMetrics, pullPhysicalPlan, outputSchema, queryId, pullQueryQueue,
shouldCancelRequests)
));
}

Expand Down Expand Up @@ -250,6 +258,7 @@ private static Map<KsqlNode, List<KsqlPartitionLocation>> groupByHost(
return groupedByHost;
}

@SuppressWarnings("ParameterNumber")
@VisibleForTesting
interface RouteQuery {
RoutingResult routeQuery(
Expand All @@ -262,10 +271,12 @@ RoutingResult routeQuery(
PullPhysicalPlan pullPhysicalPlan,
LogicalSchema outputSchema,
QueryId queryId,
PullQueryQueue pullQueryQueue
PullQueryQueue pullQueryQueue,
CompletableFuture<Void> shouldCancelRequests
);
}

@SuppressWarnings("ParameterNumber")
@VisibleForTesting
static RoutingResult executeOrRouteQuery(
final KsqlNode node,
Expand All @@ -277,7 +288,8 @@ static RoutingResult executeOrRouteQuery(
final PullPhysicalPlan pullPhysicalPlan,
final LogicalSchema outputSchema,
final QueryId queryId,
final PullQueryQueue pullQueryQueue
final PullQueryQueue pullQueryQueue,
final CompletableFuture<Void> shouldCancelRequests
) {
final BiFunction<List<?>, LogicalSchema, PullQueryRow> rowFactory = (rawRow, schema) ->
new PullQueryRow(rawRow, schema, Optional.ofNullable(
Expand Down Expand Up @@ -308,7 +320,7 @@ static RoutingResult executeOrRouteQuery(
pullQueryMetrics
.ifPresent(queryExecutorMetrics -> queryExecutorMetrics.recordRemoteRequests(1));
forwardTo(node, locations, statement, serviceContext, pullQueryQueue, rowFactory,
outputSchema);
outputSchema, shouldCancelRequests);
return RoutingResult.SUCCESS;
} catch (StandbyFallbackException e) {
LOG.warn("Error forwarding query to node {}. Falling back to standby state which may "
Expand All @@ -330,7 +342,8 @@ private static void forwardTo(
final ServiceContext serviceContext,
final PullQueryQueue pullQueryQueue,
final BiFunction<List<?>, LogicalSchema, PullQueryRow> rowFactory,
final LogicalSchema outputSchema
final LogicalSchema outputSchema,
final CompletableFuture<Void> shouldCancelRequests
) {

// Specify the partitions we specifically want to read. This will prevent reading unintended
Expand All @@ -353,7 +366,8 @@ private static void forwardTo(
statement.getStatementText(),
statement.getSessionConfig().getOverrides(),
requestProperties,
streamedRowsHandler(owner, pullQueryQueue, rowFactory, outputSchema)
streamedRowsHandler(owner, pullQueryQueue, rowFactory, outputSchema),
shouldCancelRequests
);
} catch (Exception e) {
// If we threw some explicit exception, then let it bubble up. All of the row handling is
Expand All @@ -362,6 +376,12 @@ private static void forwardTo(
if (ksqlException != null) {
throw ksqlException;
}
// If the exception was caused by closing the connection, we consider this intentional and
// just return.
if (shouldCancelRequests.isDone()) {
LOG.warn("Connection canceled, so returning");
return;
}
// If we get some kind of unknown error, we assume it's network or other error from the
// KsqlClient and try standbys
throw new StandbyFallbackException(String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;

/**
* Traverses the logical plan top-down and creates a physical plan for pull queries.
Expand All @@ -67,6 +68,7 @@ public class PullPhysicalPlanBuilder {
private final ProcessingLogContext processingLogContext;
private final Stacker contextStacker;
private final PersistentQueryMetadata persistentQueryMetadata;
private final CompletableFuture<Void> shouldCancelOperations;
private final QueryId queryId;
private final Materialization mat;
private final QueryPlannerOptions queryPlannerOptions;
Expand All @@ -80,12 +82,15 @@ public PullPhysicalPlanBuilder(
final ProcessingLogContext processingLogContext,
final PersistentQueryMetadata persistentQueryMetadata,
final ImmutableAnalysis analysis,
final QueryPlannerOptions queryPlannerOptions
final QueryPlannerOptions queryPlannerOptions,
final CompletableFuture<Void> shouldCancelOperations
) {
this.processingLogContext = Objects.requireNonNull(
processingLogContext, "processingLogContext");
this.persistentQueryMetadata = Objects.requireNonNull(
persistentQueryMetadata, "persistentQueryMetadata");
this.shouldCancelOperations = Objects.requireNonNull(shouldCancelOperations,
"shouldCancelOperations");
this.contextStacker = new Stacker();
queryId = uniqueQueryId();
mat = this.persistentQueryMetadata
Expand Down Expand Up @@ -227,9 +232,9 @@ private AbstractPhysicalOperator translateDataSourceNode(
? PullSourceType.WINDOWED : PullSourceType.NON_WINDOWED;
if (pullPhysicalPlanType == PullPhysicalPlanType.TABLE_SCAN) {
if (!logicalNode.isWindowed()) {
return new TableScanOperator(mat, logicalNode);
return new TableScanOperator(mat, logicalNode, shouldCancelOperations);
} else {
return new WindowedTableScanOperator(mat, logicalNode);
return new WindowedTableScanOperator(mat, logicalNode, shouldCancelOperations);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class PullQueryResult {
private final PullPhysicalPlanType planType;
private final RoutingNodeType routingNodeType;
private final Supplier<Long> rowsProcessedSupplier;
private final CompletableFuture<Void> shouldCancelRequests;

// This future is used to keep track of all of the callbacks since we allow for adding them both
// before and after the pull query has been started. When the pull query has completed, it will
Expand All @@ -61,7 +62,8 @@ public PullQueryResult(
final PullSourceType sourceType,
final PullPhysicalPlanType planType,
final RoutingNodeType routingNodeType,
final Supplier<Long> rowsProcessedSupplier
final Supplier<Long> rowsProcessedSupplier,
final CompletableFuture<Void> shouldCancelRequests
) {
this.schema = schema;
this.populator = populator;
Expand All @@ -72,6 +74,7 @@ public PullQueryResult(
this.planType = planType;
this.routingNodeType = routingNodeType;
this.rowsProcessedSupplier = rowsProcessedSupplier;
this.shouldCancelRequests = shouldCancelRequests;
}

public LogicalSchema getSchema() {
Expand Down Expand Up @@ -115,6 +118,7 @@ public void stop() {
LOG.error("Error closing pull query queue", t);
}
future.complete(null);
shouldCancelRequests.complete(null);
}

public void onException(final Consumer<Throwable> consumer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -37,6 +38,7 @@ public class TableScanOperator extends AbstractPhysicalOperator

private final Materialization mat;
private final DataSourceNode logicalNode;
private final CompletableFuture<Void> shouldCancelOperations;

private ImmutableList<KsqlPartitionLocation> partitionLocations;
private Iterator<Row> resultIterator;
Expand All @@ -46,10 +48,13 @@ public class TableScanOperator extends AbstractPhysicalOperator

public TableScanOperator(
final Materialization mat,
final DataSourceNode logicalNode
final DataSourceNode logicalNode,
final CompletableFuture<Void> shouldCancelOperations
) {
this.mat = Objects.requireNonNull(mat, "mat");
this.logicalNode = Objects.requireNonNull(logicalNode, "logicalNode");
this.shouldCancelOperations = Objects.requireNonNull(shouldCancelOperations,
"shouldCancelOperations");
}

@Override
Expand All @@ -67,6 +72,10 @@ public void open() {

@Override
public Object next() {
if (shouldCancelOperations.isDone()) {
return null;
}

while (!resultIterator.hasNext()) {
// Exhausted resultIterator
if (partitionLocationIterator.hasNext()) {
Expand Down
Loading

0 comments on commit f7c2002

Please sign in to comment.