Skip to content

Commit

Permalink
Simplify updateInputsForQueryResults logic in FaultTolerantQuerySched…
Browse files Browse the repository at this point in the history
…uler
  • Loading branch information
arhimondr committed Oct 7, 2022
1 parent 2147a06 commit 9e62086
Showing 1 changed file with 17 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.concurrent.SetThreadName;
import io.airlift.log.Logger;
Expand Down Expand Up @@ -62,7 +61,6 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Ticker.systemTicker;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.Lists.reverse;
import static io.airlift.concurrent.MoreFutures.addExceptionCallback;
import static io.airlift.concurrent.MoreFutures.addSuccessCallback;
Expand All @@ -71,6 +69,7 @@
import static io.trino.SystemSessionProperties.getFaultTolerantExecutionPartitionCount;
import static io.trino.SystemSessionProperties.getRetryPolicy;
import static io.trino.execution.QueryState.FINISHING;
import static io.trino.execution.scheduler.Exchanges.getAllSourceHandles;
import static io.trino.operator.RetryPolicy.TASK;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
Expand Down Expand Up @@ -226,7 +225,6 @@ private Scheduler createScheduler()

checkArgument(taskRetryAttemptsOverall >= 0, "taskRetryAttemptsOverall must be greater than or equal to 0: %s", taskRetryAttemptsOverall);
AtomicInteger remainingTaskRetryAttemptsOverall = new AtomicInteger(taskRetryAttemptsOverall);
List<Exchange> outputStages = new ArrayList<>();
for (SqlStage stage : stagesInReverseTopologicalOrder) {
PlanFragment fragment = stage.getFragment();

Expand All @@ -241,11 +239,6 @@ private Scheduler createScheduler()
outputStage);
exchanges.put(fragment.getId(), exchange);

if (outputStage) {
// output will be consumed by coordinator
outputStages.add(exchange);
}

ImmutableMap.Builder<PlanFragmentId, Exchange> sourceExchanges = ImmutableMap.builder();
for (SqlStage childStage : stageManager.getChildren(fragment.getId())) {
PlanFragmentId childFragmentId = childStage.getFragment().getId();
Expand Down Expand Up @@ -276,26 +269,23 @@ private Scheduler createScheduler()
dynamicFilterService);

schedulers.add(scheduler);
}

if (!stagesInReverseTopologicalOrder.isEmpty()) {
verify(!outputStages.isEmpty(), "coordinatorConsumedExchanges is empty");
List<ListenableFuture<List<ExchangeSourceHandle>>> futures = outputStages.stream()
.map(Exchange::getSourceHandles)
.map(Exchanges::getAllSourceHandles)
.collect(toImmutableList());
ListenableFuture<List<List<ExchangeSourceHandle>>> allFuture = Futures.allAsList(futures);
addSuccessCallback(allFuture, result -> {
List<ExchangeSourceHandle> handles = result.stream()
.flatMap(List::stream)
.collect(toImmutableList());
ImmutableList.Builder<ExchangeInput> inputs = ImmutableList.builder();
if (!handles.isEmpty()) {
inputs.add(new SpoolingExchangeInput(handles));
}
queryStateMachine.updateInputsForQueryResults(inputs.build(), true);
});
addExceptionCallback(allFuture, queryStateMachine::transitionToFailed);
if (outputStage) {
ListenableFuture<List<ExchangeSourceHandle>> sourceHandles = getAllSourceHandles(exchange.getSourceHandles());
addSuccessCallback(sourceHandles, handles -> {
try {
ImmutableList.Builder<ExchangeInput> inputs = ImmutableList.builder();
if (!handles.isEmpty()) {
inputs.add(new SpoolingExchangeInput(handles));
}
queryStateMachine.updateInputsForQueryResults(inputs.build(), true);
}
catch (Throwable t) {
queryStateMachine.transitionToFailed(t);
}
});
addExceptionCallback(sourceHandles, queryStateMachine::transitionToFailed);
}
}

return new Scheduler(
Expand Down

0 comments on commit 9e62086

Please sign in to comment.