Skip to content

Commit

Permalink
Add high level comments for some scheduling components
Browse files Browse the repository at this point in the history
  • Loading branch information
arhimondr authored and sopel39 committed Dec 16, 2021
1 parent e8bcfe2 commit a60184c
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,15 @@
import static io.trino.server.DynamicFilterService.getOutboundDynamicFilters;
import static java.util.Objects.requireNonNull;

/**
* This class is merely a container used by coordinator to track tasks for a single stage.
* <p>
* It is designed to keep track of execution statistics for tasks from the same stage, to
* aggregate them, and to provide the final stage info when the stage execution is completed.
* <p>
* This class doesn't imply anything about the nature of execution. It is not responsible
* for scheduling tasks in a certain order, gang scheduling or any other execution primitives.
*/
@ThreadSafe
public final class SqlStage
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,26 @@
import static io.trino.spi.StandardErrorCode.REMOTE_HOST_GONE;
import static java.util.Objects.requireNonNull;

/**
* This class is designed to facilitate the pipelined mode of execution.
* <p>
* In the pipelined mode the tasks are executed in an all-or-nothing fashion, with all
* the intermediate data being "piped" between stages in a streaming way.
* <p>
* This class has two main responsibilities:
* <p>
* 1. Linking pipelined stages together. If a new task is scheduled the implementation
* notifies upstream stages to add an additional output buffer for the task and
* notifies the downstream stage to update its list of source tasks. It is also
* responsible for notifying both upstream and downstream stages when no more tasks will
* be added.
* <p>
* 2. Facilitating state transitions for a pipelined stage execution according to the
* all-or-nothing model. If any of the tasks fail the implementation is responsible for
* terminating all remaining tasks as well as propagating the original error. If all
* the tasks finish successfully the implementation is responsible for notifying the
* scheduler about the successful completion of a given stage.
*/
public class PipelinedStageExecution
{
private static final Logger log = Logger.get(PipelinedStageExecution.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -712,6 +712,18 @@ public void noMoreTasks(PlanFragmentId fragmentId)
}
}

/**
* Scheduler for stages that must be executed on coordinator.
* <p>
* Scheduling for coordinator only stages must be represented as a separate entity to
* ensure the coordinator stages/tasks are never restarted in the event of a failure.
* <p>
* Coordinator only tasks cannot be restarted due to the nature of the operations
* they perform. For example, commit operations for DML statements are performed as a
* coordinator only task (via {@link io.trino.operator.TableFinishOperator}). Today it is
* not required for a commit operation to be side-effect free and idempotent, which makes it
* impossible to safely retry.
*/
private static class CoordinatorStagesScheduler
{
private static final int[] SINGLE_PARTITION = new int[] {0};
Expand Down Expand Up @@ -1006,6 +1018,20 @@ public void onTaskFailed(TaskId taskId, Throwable failure)
}
}

/**
* Scheduler for stages executed on workers.
* <p>
* As opposed to {@link CoordinatorStagesScheduler} this component is designed
* to facilitate failure recovery.
* <p>
* In the event of a failure the system may decide to terminate an active scheduler
* and create a new one to initiate a new query attempt.
* <p>
* Stages scheduled by this scheduler are assumed to be safe to retry.
* <p>
* The implementation is responsible for task creation and orchestration as well as
* split enumeration, split assignment and state transitioning for the tasks scheduled.
*/
private interface DistributedStagesScheduler
{
void schedule();
Expand All @@ -1023,6 +1049,9 @@ private interface DistributedStagesScheduler
Optional<StageFailureInfo> getFailureCause();
}

/**
* A {@link DistributedStagesScheduler} implementation facilitating pipelined execution mode.
*/
private static class PipelinedDistributedStagesScheduler
implements DistributedStagesScheduler
{
Expand Down

0 comments on commit a60184c

Please sign in to comment.