
Support for multi-scheduler deployments #59

Merged · 39 commits · Jul 16, 2022

Conversation

@thinkharderdev (Contributor) commented Jun 5, 2022

Which issue does this PR close?

Closes #39

Posting this draft PR for review and feedback. There are some more TODO items still in progress (mostly around cleanup and unit test coverage), but this change passes the integration tests as-is, so it should be ready for a "test-drive" now.

TODO (before merging)

  • Additional unit test coverage
  • Fix ExecuteQuery so we don't do all the planning as part of the gRPC handler (I changed this to simplify things while testing but need to revert back to the old implementation where we do the planning in an async task in the event loop)
  • General cleanup and documentation.

Rationale for this change

See #39 for a complete description but this change addresses the following issues:

  1. Allow for deployments with multiple schedulers for high-availability and scalability.
  2. Maintain fine-grained task state in persistent storage so even single-scheduler deployments are more robust to scheduler restarts.

What changes are included in this PR?

This is quite a large refactor. See the key points below:

Event Loop

The core event loop remains mostly unchanged. The biggest change is that stage scheduling has been mostly internalized by the ExecutionGraph, so we only publish JobSubmitted and JobCompleted query stage events. Likewise, the structure of SchedulerServerEvents has changed (see details below).

ExecutionGraph

In the current implementation, the state of a job is spread across a number of different data structures, so it would be difficult to move to external state in its current form, especially in light of the requirements around distributed locks.

In this change, the TaskScheduler and StageManager have been replaced with an ExecutionGraph data structure which is serializable (so it can be persisted in the state backend easily) and internalizes both the underlying execution plan and the dependencies between query stages. It captures the order of execution and availability of tasks and presents an interface that allows the scheduler to see a job as a stream of discrete tasks, which I believe makes the logic in the scheduler loop more straightforward.
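As a rough illustration of this idea (all names and fields below are assumptions for the sketch, not the actual Ballista types), the graph can be reduced to a serializable map of stages that knows which stage can yield tasks next:

```rust
use std::collections::HashMap;

// Illustrative sketch only: the real ExecutionGraph also owns the physical
// plan, shuffle output locations, and serialization logic.
struct ExecutionStage {
    // Upstream stages this stage reads shuffle data from.
    input_stage_ids: Vec<usize>,
    // Upstream stages whose output has already arrived.
    completed_inputs: Vec<usize>,
    // Tasks in this stage not yet handed out to an executor.
    pending_tasks: usize,
}

struct ExecutionGraph {
    stages: HashMap<usize, ExecutionStage>,
}

impl ExecutionGraph {
    /// A stage becomes runnable once every declared input has arrived and
    /// it still has tasks to hand out; the scheduler can then treat the
    /// job as a stream of discrete tasks drawn from runnable stages.
    fn next_runnable_stage(&self) -> Option<usize> {
        self.stages
            .iter()
            .find(|(_, s)| {
                s.pending_tasks > 0
                    && s.completed_inputs.len() == s.input_stage_ids.len()
            })
            .map(|(id, _)| *id)
    }
}
```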

ExecutorReservation

This data structure represents a reserved task slot on a given executor which may optionally be tied to a specific job ID. The idea is that, to avoid lock contention on the backend data store, we "hang on" to task slots through the scheduler loop instead of immediately returning them to the pool. So when a scheduler gets a task status update, it has a new task slot reservation that it can try to fill, and it will only return that task slot to the pool if it cannot find a task that is ready.
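A minimal sketch of the reservation concept (the field and constructor names are assumed, not the exact Ballista struct):

```rust
// A reserved task slot on an executor, optionally pinned to a job so the
// slot is offered back to that job first before returning to the pool.
#[derive(Debug, Clone, PartialEq)]
struct ExecutorReservation {
    executor_id: String,
    job_id: Option<String>,
}

impl ExecutorReservation {
    // A free reservation: can be filled by any job with pending tasks.
    fn new_free(executor_id: String) -> Self {
        Self { executor_id, job_id: None }
    }

    // A pinned reservation: prefer tasks from the given job.
    fn new_assigned(executor_id: String, job_id: String) -> Self {
        Self { executor_id, job_id: Some(job_id) }
    }
}
```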

TaskManager

The TaskManager encapsulates task/job management within the scheduler. The two most important things the TaskManager does are:

  • fill_reservations, which takes a list of ExecutorReservations and tries to assign a task to each one (with preference given to the reservation's job_id if present). See the doc comments for details about the implementation.
  • update_task_statuses, which applies task status updates received from the executors and returns a list of QueryStageSchedulerEvents along with a list of ExecutorReservations to be filled by the scheduler. See the doc comments for details about the implementation.
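A hypothetical, heavily simplified version of the assignment logic in fill_reservations (the real method is async and works against the ExecutionGraph state; the tuple types here are stand-ins):

```rust
use std::collections::HashMap;

// Try to assign a pending task to each reservation, preferring the
// reservation's pinned job_id when it still has work. `pending` maps a
// job_id to its count of pending tasks. Returns (assignments, unassigned).
fn fill_reservations(
    reservations: &[(String, Option<String>)], // (executor_id, pinned job_id)
    pending: &mut HashMap<String, usize>,
) -> (Vec<(String, String)>, Vec<String>) {
    let mut assignments = Vec::new(); // (executor_id, job_id)
    let mut unassigned = Vec::new();  // executor_ids to cancel back to pool
    for (executor_id, preferred) in reservations {
        // Prefer the pinned job, else fall back to any job with work left.
        let job = preferred
            .as_ref()
            .filter(|j| pending.get(*j).copied().unwrap_or(0) > 0)
            .cloned()
            .or_else(|| {
                pending.iter().find(|(_, n)| **n > 0).map(|(j, _)| j.clone())
            });
        match job {
            Some(j) => {
                *pending.get_mut(&j).unwrap() -= 1;
                assignments.push((executor_id.clone(), j));
            }
            // No ready task: the reservation will be cancelled and the
            // slot returned to the pool.
            None => unassigned.push(executor_id.clone()),
        }
    }
    (assignments, unassigned)
}
```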

StateBackendClient

I've changed this trait slightly to help with this use case:

  • Added the concept of a Keyspace, which is just a namespace for keys (we already use "namespace" for something else, so I didn't want to overload the term). Keyspace is an enum, so this mostly encodes a desired structure for the storage layer while removing boilerplate elsewhere in the codebase.
  • Made the lock method scoped to a Keyspace and key so we can lock individual resources. Using a single global mutex on the state is probably not scalable.
  • Added a scan method which will return all key/values in a particular keyspace (with an optional limit).
  • Added a scan_keys method which does the same as scan but only returns keys.
  • Added a put_txn method which allows atomically updating multiple key/values. Since we do batch updates in many places, this simplifies error handling. It is implemented in both Sled (using batch operations) and etcd (using transactions).
  • Added a mv method (because move is a reserved word in Rust :)) that will atomically move a key from one keyspace to another.
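Putting the bullets together, the revised trait might look roughly like this. This is a synchronous, simplified sketch: the real trait is async and returns Results, and the Keyspace variants here are illustrative, not the actual enum.

```rust
// A namespace for keys; being an enum, it fixes the storage layout at the
// type level and removes stringly-typed prefixes elsewhere.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum Keyspace {
    Jobs,
    Slots,
    Sessions, // variants are illustrative
}

trait StateBackendClient {
    fn get(&self, keyspace: Keyspace, key: &str) -> Option<Vec<u8>>;
    fn put(&self, keyspace: Keyspace, key: String, value: Vec<u8>);
    /// Lock an individual (keyspace, key) resource rather than taking a
    /// single global mutex over all state.
    fn lock(&self, keyspace: Keyspace, key: &str);
    /// Return all key/values in a keyspace, up to an optional limit.
    fn scan(&self, keyspace: Keyspace, limit: Option<usize>) -> Vec<(String, Vec<u8>)>;
    /// Like scan, but returns only the keys.
    fn scan_keys(&self, keyspace: Keyspace) -> Vec<String>;
    /// Atomically apply a batch of puts (Sled batches / etcd transactions).
    fn put_txn(&self, ops: Vec<(Keyspace, String, Vec<u8>)>);
    /// Atomically move a key between keyspaces (`move` is reserved in Rust).
    fn mv(&self, from: Keyspace, to: Keyspace, key: &str);
}
```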

The Scheduler Flow

Putting this all together, the conceptual flow in the scheduler works like so:

Push Scheduling

  • Receive a set of task updates
  • Call TaskManager::update_task_statuses to apply updates and get back a set of ExecutorReservations
  • Publish a SchedulerServerEvent::Offer event with these reservations
  • The event loop will receive this event and call TaskManager::fill_reservations to attempt to assign tasks to each reservation, giving priority to jobs which were being updated.
  • For assigned reservations, launch the corresponding tasks.
  • For unassigned reservations, cancel them (i.e. return the task slots to the pool)

When a new job is submitted, we will try to reserve task slots up to the number of tasks in the job's initial stage and launch them.
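The first step of the push flow can be condensed into a sketch (the types and function names are stand-ins for the real TaskManager/ExecutorManager calls, and the task status is reduced to a job ID):

```rust
struct Reservation {
    executor_id: String,
    // Pinned to the job that just produced a status update, so the freed
    // slot is offered to that job first.
    job_id: Option<String>,
}

// On a batch of task status updates, keep each freed slot as a reservation
// instead of returning it to the pool immediately: this is the "hang on"
// behavior described above. The reservations are then published in a
// SchedulerServerEvent::Offer for the event loop to fill.
fn reservations_from_updates(updates: Vec<(String, String)>) -> Vec<Reservation> {
    updates
        .into_iter()
        .map(|(executor_id, job_id)| Reservation {
            executor_id,
            job_id: Some(job_id),
        })
        .collect()
}
```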

Pull Scheduling

  • Receive a PollWorkRequest
  • Apply any task updates in the request using TaskManager::update_task_statuses
  • If the poller can accept a task, create a new reservation and call TaskManager::fill_reservations to try and fill it.
  • Return a PollWorkResponse with the task that was assigned (if any).
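A sketch of the poll-based path, with the request/response types reduced to bare essentials (names are assumed; task updates and the fill_reservations call are elided):

```rust
struct PollWorkRequest {
    executor_id: String,
    // Whether the polling executor has a free slot for a new task.
    can_accept_task: bool,
}

struct PollWorkResponse {
    task: Option<String>, // simplified: the real response carries a task definition
}

// If the poller can accept work, create a reservation on the spot and try
// to fill it from the pending tasks; otherwise just acknowledge the poll.
fn poll_work(req: PollWorkRequest, mut pending_tasks: Vec<String>) -> PollWorkResponse {
    if req.can_accept_task {
        PollWorkResponse { task: pending_tasks.pop() }
    } else {
        PollWorkResponse { task: None }
    }
}
```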

Are there any user-facing changes?

This change is mostly to the internal mechanics of the scheduler. The only user-facing change is that the StateBackendClient contract has changed.

@thinkharderdev thinkharderdev marked this pull request as draft June 5, 2022 17:16
@thinkharderdev (Contributor Author)

…-scheduler

# Conflicts:
#	ballista/rust/executor/src/execution_loop.rs
#	ballista/rust/scheduler/src/scheduler_server/event_loop.rs
#	ballista/rust/scheduler/src/scheduler_server/external_scaler.rs
#	ballista/rust/scheduler/src/scheduler_server/grpc.rs
#	ballista/rust/scheduler/src/scheduler_server/mod.rs
#	ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs
#	ballista/rust/scheduler/src/state/mod.rs
#	ballista/rust/scheduler/src/state/persistent_state.rs
#	ballista/rust/scheduler/src/state/task_scheduler.rs
string job_id = 1;
string session_id = 2;
JobStatus status = 3;
repeated ExecutionGraphStage stages = 4;
Member:

It isn't clear to me how these stages form a dag. Are the dependencies between stages stored elsewhere?

Contributor Author:

No, sorry, I should have explained that. I'll add better docs to this struct, but for now each stage has an output_link: Option<usize> which specifies where it sends its output. If output_link is None then the stage is final and it sends its output to the ExecutionGraph's output_locations. Likewise, each stage has an inputs: HashMap<usize, StageOutput> which "collects" input locations from its input stages.
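In other words (the field names follow the comment above; everything else is invented for illustration, with partition locations reduced to strings):

```rust
use std::collections::HashMap;

struct Stage {
    // Downstream stage that consumes this stage's output; None => final stage.
    output_link: Option<usize>,
    // Upstream stage id -> collected input partition locations (simplified).
    inputs: HashMap<usize, Vec<String>>,
}

/// The DAG is encoded by output_link alone: the final stage is the one
/// with no output_link.
fn final_stage(stages: &HashMap<usize, Stage>) -> Option<usize> {
    stages
        .iter()
        .find(|(_, s)| s.output_link.is_none())
        .map(|(id, _)| *id)
}
```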

Member:

Got it. Yes, some comments in the structs here would be great.

@andygrove (Member):

I ran the integration tests (./dev/integration-tests.sh) and they ran without issue, so that gives me confidence that no regressions are introduced here.

@Ted-Jiang (Member):

@thinkharderdev wow, a lot of change 👍. I have some questions:
Do executors and schedulers have a one-to-one relationship?
If it's one-to-many, how does each scheduler keep track of the available slots on each executor? If each SQL request needs a state update from all executors, is there a better way?
I think this is one problem we should consider 😊

@@ -59,15 +58,14 @@ impl DistributedPlanner {
/// Returns a vector of ExecutionPlans, where the root node is a [ShuffleWriterExec].
/// Plans that depend on the input of other plans will have leaf nodes of type [UnresolvedShuffleExec].
/// A [ShuffleWriterExec] is created whenever the partitioning changes.
- pub async fn plan_query_stages<'a>(
+ pub fn plan_query_stages<'a>(
Member:

Is there some reason to remove the async 🤔?
I think there is some IO work in plan_query_stages, like saving status in the db.

Contributor Author:

I think it was doing that at one time, but the implementation is no longer doing IO, so I changed it back to sync.

@thinkharderdev (Contributor Author):

@thinkharderdev wow, a lot of change 👍. I have some questions: Do executors and schedulers have a one-to-one relationship? If it's one-to-many, how does each scheduler keep track of the available slots on each executor? If each SQL request needs a state update from all executors, is there a better way? I think this is one problem we should consider 😊

No, it is one-to-many (or really many-to-many in principle). The executor's available slots are stored in the state backend (either etcd or Sled). But when an executor publishes a task status update to the scheduler, the particular scheduler which receives that request can re-assign the executor slots without reading the backend state, because the slot hasn't been returned to the pool yet.

The session issue is interesting. For right now, we just save the session properties in the backend state and whenever we need to get a SessionContext for a session ID, we read the properties and create a new SessionContext. This can probably be optimized to use a Watch and an in-memory session registry but I kept it simple for right now.

@thinkharderdev (Contributor Author):

Thanks @thinkharderdev for the great work. The code is well implemented and well documented. And the task scheduling algorithm is also very sophisticated.

I only have a few small concerns:

  1. For the stage-based task scheduling, suppose a scenario of 3 stages, stage3 ---> stage2 ---> stage1, with 1K tasks for stage1, 1 task for stage2, and 1K tasks for stage3. When stage2 finishes, it's better to schedule the tasks for stage3 all at once. However, with the algorithm based on the ExecutorReservation, it seems this kind of all-at-once scheduling only happens when the job is submitted. It may be better to keep the previous StageFinished event.
  2. For every state change for a job, like a task status update, this ExecutionGraph-based implementation needs to fetch and decode the ExecutionGraph from the config backend. Will it be too heavy, especially when there are thousands or millions of tasks for a job? Actually, the previous design of keeping the tasks in memory aimed to reduce exactly this kind of cost. Therefore, I prefer not to persist the task status info into the config backend.
  3. For the job scheduling policy, this implementation makes it possible for one job to be scheduled by multiple schedulers. However, I think that's not necessary. It's better to employ an active-standby policy, and to make the recovery level stage level rather than task level if the job's active scheduler terminates. Then we can avoid the ExecutionGraph decoding cost for every task update.

Thanks @yahoNanJing for the review.

  1. This is a good point. I wanted to avoid locking the executor state as much as possible but I see that the case you mentioned is a degenerate case.
  2. I am concerned about this as well. For simplicity I put everything into a single data structure but conceptually there is no reason we have to do it that way. We can have the ExecutionGraph store only the DAG structure and store the plan information in a separate data structure. Since the plan is immutable that could be cached in memory more effectively. And I think you're right in that the entire task status doesn't need to be stored at all. We only need to know whether it is pending or not.
  3. On this point I disagree. High availability is one goal of this work but another is horizontal scalability. The work involved in physical planning is sometimes non-trivial. For instance, scanning a large Hive table can involve reading parquet metadata from many files and it will be useful to be able to have multiple active schedulers to scale this work.

@yahoNanJing (Contributor):

Hi @thinkharderdev,

On this point I disagree. High availability is one goal of this work but another is horizontal scalability. The work involved in physical planning is sometimes non-trivial. For instance, scanning a large Hive table can involve reading parquet metadata from many files and it will be useful to be able to have multiple active schedulers to scale this work.

Maybe I did not make my point clear. My point is that for any one job there will only ever be one active scheduler, with the others on standby. However, different jobs can each have their own active scheduler. The physical plan generation for a query is dealt with in one scheduler, right?

@thinkharderdev (Contributor Author):

Hi @thinkharderdev,

On this point I disagree. High availability is one goal of this work but another is horizontal scalability. The work involved in physical planning is sometimes non-trivial. For instance, scanning a large Hive table can involve reading parquet metadata from many files and it will be useful to be able to have multiple active schedulers to scale this work.

Maybe I did not make my point clear. My point is that for any one job there will only ever be one active scheduler, with the others on standby. However, different jobs can each have their own active scheduler. The physical plan generation for a query is dealt with in one scheduler, right?

Sorry, yes I think that makes sense. If each job was "owned" by a single scheduler then we could avoid a lot of overhead and synchronization on the DAG state.

@thinkharderdev (Contributor Author):

@yahoNanJing Made a small change in the event loop. It will now eagerly attempt to schedule additional pending tasks for a job on update. I think this should address your point from 1 above.

@yahoNanJing (Contributor):

Hi @thinkharderdev, just left a few comments.

@thinkharderdev (Contributor Author):

Hi @thinkharderdev, just left a few comments.

Hi @yahoNanJing I don't see any comments. Did you submit them?

…-scheduler

# Conflicts:
#	ballista/rust/executor/src/execution_loop.rs
@yahoNanJing (Contributor):

Hi @thinkharderdev, they're alongside the code.

@thinkharderdev (Contributor Author):

Hi @thinkharderdev, they're alongside the code.

I don't see any comments :)

@thinkharderdev (Contributor Author):

@andygrove @yahoNanJing What do we think of this PR?

@yahoNanJing (Contributor):

Thanks @thinkharderdev. Sorry for the late response. I think we can merge this PR first to avoid dealing with the conflicts repeatedly. Then we can continue refining step by step.

@andygrove (Member):

Thank you @thinkharderdev and @yahoNanJing !

@andygrove (Member):

@thinkharderdev Looks like there is a merge conflict that needs resolving

/// Map from stage ID -> List of child stage IDs
stage_dependencies: HashMap<usize, Vec<usize>>,
/// Map from Stage ID -> output link
output_links: HashMap<usize, usize>,
Contributor:

One stage may be the input of multiple sub-stages. Therefore, the data structure of output_links should be HashMap<usize, Vec<usize>>.

decode_into::<protobuf::ExecutorData, ExecutorData>(&value)?;
let take = std::cmp::min(data.available_task_slots, desired);

for _ in 0..take {
Contributor:

This is a different policy from the previous round-robin one. Not sure whether it would be better for the tasks to be scheduled evenly across the executors?

@@ -105,9 +105,14 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
.executor_manager
.cancel_reservations(free_list)
.await?;
Ok(None)
} else if pending_tasks > 0 {
Contributor:

Why do we need the else here? ❓

.state
.task_manager
.fill_reservations(&reservations)
.await
{
Ok((assignments, mut unassigned_reservations)) => {
Ok((assignments, mut unassigned_reservations, pending_tasks)) => {
for (executor_id, task) in assignments.into_iter() {
Contributor:

Would it be better to group the tasks for the same executor so that they can be launched in a single call?
