diff --git a/core/trino-main/src/main/java/io/trino/execution/SqlStage.java b/core/trino-main/src/main/java/io/trino/execution/SqlStage.java index d81e49861aa0..f03d18cb3fc2 100644 --- a/core/trino-main/src/main/java/io/trino/execution/SqlStage.java +++ b/core/trino-main/src/main/java/io/trino/execution/SqlStage.java @@ -44,6 +44,15 @@ import static io.trino.server.DynamicFilterService.getOutboundDynamicFilters; import static java.util.Objects.requireNonNull; +/** + * This class is merely a container used by coordinator to track tasks for a single stage. + *
+ * It is designed to keep track of execution statistics for tasks from the same stage as well + * as aggregating them and providing a final stage info when the stage execution is completed. + *
+ * This class doesn't imply anything about the nature of execution. It is not responsible + * for scheduling tasks in a certain order, gang scheduling or any other execution primitives. + */ @ThreadSafe public final class SqlStage { diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/PipelinedStageExecution.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/PipelinedStageExecution.java index d9812b0a2233..cec1488ff8a2 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/PipelinedStageExecution.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/PipelinedStageExecution.java @@ -81,6 +81,26 @@ import static io.trino.spi.StandardErrorCode.REMOTE_HOST_GONE; import static java.util.Objects.requireNonNull; +/** + * This class is designed to facilitate the pipelined mode of execution. + *
+ * In the pipeline mode the tasks are executed in all-or-nothing fashion with all + * the intermediate data being "piped" between stages in a streaming way. + *
+ * This class has two main responsibilities: + *
+ * 1. Linking pipelined stages together. If a new task is scheduled the implementation + * notifies upstream stages to add an additional output buffer for the task as well as + * it notifies the downstream stage to update a list of source tasks. It is also + * responsible of notifying both upstream and downstream stages when no more tasks will + * be added. + *
+ * 2. Facilitates state transitioning for a pipelined stage execution according to the + * all-or-noting model. If any of the tasks fail the implementation is responsible for + * terminating all remaining tasks as well as propagating the original error. If all + * the tasks finish successfully the implementation is responsible for notifying the + * scheduler about a successful completion of a given stage. + */ public class PipelinedStageExecution { private static final Logger log = Logger.get(PipelinedStageExecution.class); diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/SqlQueryScheduler.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/SqlQueryScheduler.java index a0742db9f03d..0f9ec09f0e29 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/SqlQueryScheduler.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/SqlQueryScheduler.java @@ -712,6 +712,18 @@ public void noMoreTasks(PlanFragmentId fragmentId) } } + /** + * Scheduler for stages that must be executed on coordinator. + *
+ * Scheduling for coordinator only stages must be represented as a separate entity to + * ensure the coordinator stages/tasks are never restarted in an event of a failure. + *
+ * Coordinator only tasks cannot be restarted due to the nature of operations + * they perform. For example commit operations for DML statements are performed as a + * coordinator only task (via {@link io.trino.operator.TableFinishOperator}). Today it is + * not required for a commit operation to be side effect free and idempotent what makes it + * impossible to safely retry. + */ private static class CoordinatorStagesScheduler { private static final int[] SINGLE_PARTITION = new int[] {0}; @@ -1006,6 +1018,20 @@ public void onTaskFailed(TaskId taskId, Throwable failure) } } + /** + * Scheduler for stages executed on workers. + *
+ * As opposed to {@link CoordinatorStagesScheduler} this component is designed + * to facilitate failure recovery. + *
+ * In an event of a failure the system may decide to terminate an active scheduler + * and create a new one to initiate a new query attempt. + *
+ * Stages scheduled by this scheduler are assumed to be safe to retry. + *
+ * The implementation is responsible for task creation and orchestration as well as
+ * split enumeration, split assignment and state transitioning for the tasks scheduled.
+ */
private interface DistributedStagesScheduler
{
void schedule();
@@ -1023,6 +1049,9 @@ private interface DistributedStagesScheduler
Optional