Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Bubble up errors from HARouting unless using StandbyFallbackException #7238

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.KsqlRequestConfig;
import io.confluent.ksql.util.KsqlServerException;
import io.confluent.ksql.util.KsqlStatementException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
Expand Down Expand Up @@ -121,7 +119,7 @@ public CompletableFuture<Void> handlePullQuery(
LOG.debug("Unable to execute pull query: {}. All nodes are dead or exceed max allowed lag.",
statement.getStatementText());
throw new MaterializationException(String.format(
"Unable to execute pull query %s. All nodes are dead or exceed max allowed lag.",
"Unable to execute pull query \"%s\". All nodes are dead or exceed max allowed lag.",
statement.getStatementText()));
}

Expand Down Expand Up @@ -169,32 +167,37 @@ private void executeRounds(

// Make requests to each host, specifying the partitions we're interested in from
// this host.
final Map<KsqlNode, Future<Void>> futures = new LinkedHashMap<>();
final Map<KsqlNode, Future<RoutingResult>> futures = new LinkedHashMap<>();
for (Map.Entry<KsqlNode, List<KsqlPartitionLocation>> entry : groupedByHost.entrySet()) {
final KsqlNode node = entry.getKey();
futures.put(node, executorService.submit(
() -> {
routeQuery.routeQuery(
node, entry.getValue(), statement, serviceContext, routingOptions,
pullQueryMetrics, pullPhysicalPlan, outputSchema, queryId, pullQueryQueue);
return null;
}
() -> routeQuery.routeQuery(
node, entry.getValue(), statement, serviceContext, routingOptions,
pullQueryMetrics, pullPhysicalPlan, outputSchema, queryId, pullQueryQueue)
));
}

// Go through all of the results of the requests, either aggregating rows or adding
// the locations to the nextRoundRemaining list.
final ImmutableList.Builder<KsqlPartitionLocation> nextRoundRemaining
= ImmutableList.builder();
for (Map.Entry<KsqlNode, Future<Void>> entry : futures.entrySet()) {
final Future<Void> future = entry.getValue();
for (Map.Entry<KsqlNode, Future<RoutingResult>> entry : futures.entrySet()) {
final Future<RoutingResult> future = entry.getValue();
final KsqlNode node = entry.getKey();
RoutingResult routingResult = null;
try {
future.get();
routingResult = future.get();
} catch (ExecutionException e) {
LOG.warn("Error routing query {} to host {} at timestamp {} with exception {}",
statement.getStatementText(), node, System.currentTimeMillis(), e.getCause());
throw new MaterializationException(String.format(
"Unable to execute pull query \"%s\". %s",
statement.getStatementText(), e.getCause().getMessage()));
}
if (routingResult == RoutingResult.STANDBY_FALLBACK) {
nextRoundRemaining.addAll(groupedByHost.get(node));
} else {
Preconditions.checkState(routingResult == RoutingResult.SUCCESS);
}
}
remainingLocations = nextRoundRemaining.build();
Expand Down Expand Up @@ -225,7 +228,7 @@ private static Map<KsqlNode, List<KsqlPartitionLocation>> groupByHost(
// If one of the partitions required is out of nodes, then we cannot continue.
if (round >= location.getNodes().size()) {
throw new MaterializationException(String.format(
"Unable to execute pull query: %s. Exhausted standby hosts to try.",
"Unable to execute pull query: \"%s\". Exhausted standby hosts to try.",
statement.getStatementText()));
}
final KsqlNode nextHost = location.getNodes().get(round);
Expand All @@ -236,7 +239,7 @@ private static Map<KsqlNode, List<KsqlPartitionLocation>> groupByHost(

@VisibleForTesting
interface RouteQuery {
void routeQuery(
RoutingResult routeQuery(
KsqlNode node,
List<KsqlPartitionLocation> locations,
ConfiguredStatement<Query> statement,
Expand All @@ -251,7 +254,7 @@ void routeQuery(
}

@VisibleForTesting
static void executeOrRouteQuery(
static RoutingResult executeOrRouteQuery(
final KsqlNode node,
final List<KsqlPartitionLocation> locations,
final ConfiguredStatement<Query> statement,
Expand All @@ -273,29 +276,41 @@ static void executeOrRouteQuery(
pullQueryMetrics
.ifPresent(queryExecutorMetrics -> queryExecutorMetrics.recordLocalRequests(1));
pullPhysicalPlan.execute(locations, pullQueryQueue, rowFactory);
} catch (Exception e) {
LOG.error("Error executing query {} locally at node {} with exception",
return RoutingResult.SUCCESS;
} catch (StandbyFallbackException e) {
LOG.warn("Error executing query {} locally at node {}. Falling back to standby state which "
+ "may return stale results",
statement.getStatementText(), node, e.getCause());
return RoutingResult.STANDBY_FALLBACK;
} catch (Exception e) {
LOG.error("Error executing query {} locally at node {}",
statement.getStatementText(), node.location(), e.getCause());
throw new KsqlException(
String.format("Error executing query %s locally at node %s",
statement.getStatementText(), node),
String.format("Error executing query locally at node %s: %s", node.location(),
e.getMessage()),
e
);
}
} else {
try {
LOG.debug("Query {} routed to host {} at timestamp {}.",
statement.getStatementText(), node.location(), System.currentTimeMillis());
statement.getStatementText(), node.location(), System.currentTimeMillis());
pullQueryMetrics
.ifPresent(queryExecutorMetrics -> queryExecutorMetrics.recordRemoteRequests(1));
forwardTo(node, locations, statement, serviceContext, pullQueryQueue, rowFactory,
outputSchema);
return RoutingResult.SUCCESS;
} catch (StandbyFallbackException e) {
LOG.warn("Error forwarding query {} to node {}. Falling back to standby state which may "
+ "return stale results",
statement.getStatementText(), node.location(), e.getCause());
return RoutingResult.STANDBY_FALLBACK;
} catch (Exception e) {
LOG.error("Error forwarding query {} to node {} with exception {}",
LOG.error("Error forwarding query {} to node {}",
statement.getStatementText(), node, e.getCause());
throw new KsqlException(
String.format("Error forwarding query %s to node %s",
statement.getStatementText(), node),
String.format("Error forwarding query to node %s: %s", node.location(),
e.getMessage()),
e
);
}
Expand All @@ -322,83 +337,105 @@ private static void forwardTo(
KsqlRequestConfig.KSQL_REQUEST_QUERY_PULL_SKIP_FORWARDING, true,
KsqlRequestConfig.KSQL_REQUEST_INTERNAL_REQUEST, true,
KsqlRequestConfig.KSQL_REQUEST_QUERY_PULL_PARTITIONS, partitions);
final RestResponse<Integer> response = serviceContext
.getKsqlClient()
.makeQueryRequest(
owner.location(),
statement.getStatementText(),
statement.getSessionConfig().getOverrides(),
requestProperties,
streamedRowsHandler(owner, statement, requestProperties, pullQueryQueue, rowFactory,
outputSchema)
);
final RestResponse<Integer> response;

try {
response = serviceContext
.getKsqlClient()
.makeQueryRequest(
owner.location(),
statement.getStatementText(),
statement.getSessionConfig().getOverrides(),
requestProperties,
streamedRowsHandler(owner, pullQueryQueue, rowFactory, outputSchema)
);
} catch (Exception e) {
// If we threw some explicit exception, then let it bubble up. All of the row handling is
// wrapped in a KsqlException, so any intentional exception or bug will be surfaced.
final KsqlException ksqlException = causedByKsqlException(e);
if (ksqlException != null) {
throw ksqlException;
}
// If we get some kind of unknown error, we assume it's network or other error from the
// KsqlClient and try standbys
throw new StandbyFallbackException(String.format(
"Forwarding pull query request [%s, %s] failed with error %s ",
statement.getSessionConfig().getOverrides(), requestProperties,
e.getMessage()), e);
}

if (response.isErroneous()) {
throw new KsqlServerException(String.format(
"Forwarding pull query request [%s, %s, %s] to node %s failed with error %s ",
statement.getStatement(), statement.getSessionConfig().getOverrides(), requestProperties,
owner, response.getErrorMessage()));
throw new KsqlException(String.format(
"Forwarding pull query request [%s, %s] failed with error %s ",
statement.getSessionConfig().getOverrides(), requestProperties,
response.getErrorMessage()));
}

final int numRows = response.getResponse();
if (numRows == 0) {
throw new KsqlServerException(String.format(
"Forwarding pull query request [%s, %s, %s] to node %s failed due to invalid "
throw new KsqlException(String.format(
"Forwarding pull query request [%s, %s] failed due to invalid "
+ "empty response from forwarding call, expected a header row.",
statement.getStatement(), statement.getSessionConfig().getOverrides(), requestProperties,
owner));
statement.getSessionConfig().getOverrides(), requestProperties));
}
}

/**
 * Walks the cause chain of the given exception looking for a {@link KsqlException}.
 *
 * @param e the exception whose cause chain is inspected (checked first itself)
 * @return the first {@code KsqlException} encountered in the chain, or {@code null} if
 *         no {@code KsqlException} is present anywhere in the chain
 */
private static KsqlException causedByKsqlException(final Exception e) {
  for (Throwable cause = e; cause != null; cause = cause.getCause()) {
    if (cause instanceof KsqlException) {
      return (KsqlException) cause;
    }
  }
  return null;
}

private static Consumer<List<StreamedRow>> streamedRowsHandler(
final KsqlNode owner,
final ConfiguredStatement<Query> statement,
final Map<String, Object> requestProperties,
final PullQueryQueue pullQueryQueue,
final BiFunction<List<?>, LogicalSchema, PullQueryRow> rowFactory,
final LogicalSchema outputSchema
) {
final AtomicInteger processedRows = new AtomicInteger(0);
final AtomicReference<Header> header = new AtomicReference<>();
return streamedRows -> {
if (streamedRows == null || streamedRows.isEmpty()) {
return;
}
final List<PullQueryRow> rows = new ArrayList<>();

// If this is the first row overall, skip the header
final int previousProcessedRows = processedRows.getAndAdd(streamedRows.size());
for (int i = 0; i < streamedRows.size(); i++) {
final StreamedRow row = streamedRows.get(i);
if (i == 0 && previousProcessedRows == 0) {
final Optional<Header> optionalHeader = row.getHeader();
optionalHeader.ifPresent(h -> validateSchema(outputSchema, h.getSchema(), owner));
optionalHeader.ifPresent(header::set);
continue;
try {
if (streamedRows == null || streamedRows.isEmpty()) {
return;
}
final List<PullQueryRow> rows = new ArrayList<>();

if (row.getErrorMessage().isPresent()) {
throw new KsqlStatementException(
row.getErrorMessage().get().getMessage(),
statement.getStatementText()
);
}
// If this is the first row overall, skip the header
final int previousProcessedRows = processedRows.getAndAdd(streamedRows.size());
for (int i = 0; i < streamedRows.size(); i++) {
final StreamedRow row = streamedRows.get(i);
if (i == 0 && previousProcessedRows == 0) {
final Optional<Header> optionalHeader = row.getHeader();
optionalHeader.ifPresent(h -> validateSchema(outputSchema, h.getSchema(), owner));
optionalHeader.ifPresent(header::set);
continue;
}

if (!row.getRow().isPresent()) {
throw new KsqlServerException(String.format(
"Forwarding pull query request [%s, %s, %s] to node %s failed due to "
+ "missing row data.",
statement.getStatement(), statement.getSessionConfig().getOverrides(),
requestProperties, owner));
}
if (row.getErrorMessage().isPresent()) {
// If we receive an error that's not a network error, we let that bubble up.
throw new KsqlException(row.getErrorMessage().get().getMessage());
}

final List<?> r = row.getRow().get().getColumns();
Preconditions.checkNotNull(header.get());
rows.add(rowFactory.apply(r, header.get().getSchema()));
}
if (!row.getRow().isPresent()) {
throw new KsqlException("Missing row data on row " + i + " of chunk");
}

final List<?> r = row.getRow().get().getColumns();
Preconditions.checkNotNull(header.get());
rows.add(rowFactory.apply(r, header.get().getSchema()));
}

if (!pullQueryQueue.acceptRows(rows)) {
LOG.info("Failed to queue all rows");
if (!pullQueryQueue.acceptRows(rows)) {
LOG.error("Failed to queue all rows");
}
} catch (Exception e) {
throw new KsqlException("Error handling streamed rows: " + e.getMessage(), e);
}
};
}
Expand All @@ -414,4 +451,9 @@ private static void validateSchema(
forwardedSchema, forwardedNode, expectedSchema));
}
}

/**
 * Outcome of routing a pull query to a single host. Consumed by the round logic in
 * {@code executeRounds}: {@code STANDBY_FALLBACK} causes the host's partitions to be
 * retried against the next standby in the following round, while {@code SUCCESS} means
 * the partitions are done.
 */
private enum RoutingResult {
// Query executed (locally or via forwarding) without error.
SUCCESS,
// A StandbyFallbackException was caught; retry these partitions on a standby host.
STANDBY_FALLBACK
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.physical.pull;

import io.confluent.ksql.util.KsqlException;

/**
* This exception is thrown to indicate that pull queries should fallback on the next standby in
* line.
*/
/**
 * Signals that a pull query attempt on a given host failed in a way that allows the same
 * partitions to be retried against a standby host, instead of failing the query outright.
 */
public class StandbyFallbackException extends KsqlException {

  /**
   * @param message description of the failure that triggered the fallback
   */
  public StandbyFallbackException(final String message) {
    super(message);
  }

  /**
   * @param message description of the failure that triggered the fallback
   * @param cause the underlying error from the failed attempt
   */
  public StandbyFallbackException(final String message, final Throwable cause) {
    super(message, cause);
  }

  /**
   * @param cause the underlying error from the failed attempt
   */
  public StandbyFallbackException(final Throwable cause) {
    super(cause);
  }
}
Loading