From 9e6208619d7a4c1f0fc3f736c5bc70c1632756a1 Mon Sep 17 00:00:00 2001 From: Andrii Rosa Date: Tue, 4 Oct 2022 14:15:23 -0400 Subject: [PATCH] Simplify updateInputsForQueryResults logic in FaultTolerantQueryScheduler --- .../FaultTolerantQueryScheduler.java | 44 +++++++------------ 1 file changed, 17 insertions(+), 27 deletions(-) diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/FaultTolerantQueryScheduler.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/FaultTolerantQueryScheduler.java index 6081a2fa38f0..ff4d52864e2a 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/FaultTolerantQueryScheduler.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/FaultTolerantQueryScheduler.java @@ -15,7 +15,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import io.airlift.concurrent.SetThreadName; import io.airlift.log.Logger; @@ -62,7 +61,6 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Ticker.systemTicker; import static com.google.common.base.Verify.verify; -import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.Lists.reverse; import static io.airlift.concurrent.MoreFutures.addExceptionCallback; import static io.airlift.concurrent.MoreFutures.addSuccessCallback; @@ -71,6 +69,7 @@ import static io.trino.SystemSessionProperties.getFaultTolerantExecutionPartitionCount; import static io.trino.SystemSessionProperties.getRetryPolicy; import static io.trino.execution.QueryState.FINISHING; +import static io.trino.execution.scheduler.Exchanges.getAllSourceHandles; import static io.trino.operator.RetryPolicy.TASK; import static java.util.Objects.requireNonNull; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -226,7 +225,6 @@ private Scheduler createScheduler() checkArgument(taskRetryAttemptsOverall >= 0, "taskRetryAttemptsOverall must be greater than or equal to 0: %s", taskRetryAttemptsOverall); AtomicInteger remainingTaskRetryAttemptsOverall = new AtomicInteger(taskRetryAttemptsOverall); - List outputStages = new ArrayList<>(); for (SqlStage stage : stagesInReverseTopologicalOrder) { PlanFragment fragment = stage.getFragment(); @@ -241,11 +239,6 @@ private Scheduler createScheduler() outputStage); exchanges.put(fragment.getId(), exchange); - if (outputStage) { - // output will be consumed by coordinator - outputStages.add(exchange); - } - ImmutableMap.Builder sourceExchanges = ImmutableMap.builder(); for (SqlStage childStage : stageManager.getChildren(fragment.getId())) { PlanFragmentId childFragmentId = childStage.getFragment().getId(); @@ -276,26 +269,23 @@ private Scheduler createScheduler() dynamicFilterService); schedulers.add(scheduler); - } - if (!stagesInReverseTopologicalOrder.isEmpty()) { - verify(!outputStages.isEmpty(), "coordinatorConsumedExchanges is empty"); - List>> futures = outputStages.stream() - .map(Exchange::getSourceHandles) - .map(Exchanges::getAllSourceHandles) - .collect(toImmutableList()); - ListenableFuture>> allFuture = Futures.allAsList(futures); - addSuccessCallback(allFuture, result -> { - List handles = result.stream() - .flatMap(List::stream) - .collect(toImmutableList()); - ImmutableList.Builder inputs = ImmutableList.builder(); - if (!handles.isEmpty()) { - inputs.add(new SpoolingExchangeInput(handles)); - } - queryStateMachine.updateInputsForQueryResults(inputs.build(), true); - }); - addExceptionCallback(allFuture, queryStateMachine::transitionToFailed); + if (outputStage) { + ListenableFuture> sourceHandles = getAllSourceHandles(exchange.getSourceHandles()); + addSuccessCallback(sourceHandles, handles -> { + try { + ImmutableList.Builder inputs = ImmutableList.builder(); + if (!handles.isEmpty()) { + inputs.add(new SpoolingExchangeInput(handles)); + } + queryStateMachine.updateInputsForQueryResults(inputs.build(), true); + } + catch (Throwable t) { + queryStateMachine.transitionToFailed(t); + } + }); + addExceptionCallback(sourceHandles, queryStateMachine::transitionToFailed); + } } return new Scheduler(