Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce push-based task scheduling for Ballista #1560

Merged
merged 3 commits into from
Jan 23, 2022
Merged

Introduce push-based task scheduling for Ballista #1560

merged 3 commits into from
Jan 23, 2022

Conversation

yahoNanJing
Copy link
Contributor

@yahoNanJing yahoNanJing commented Jan 14, 2022

Which issue does this PR close?

Closes #1221.

Rationale for this change

https://docs.google.com/document/d/1Z1GO2A3bo7M_N26w_5t-9h3AhIPC2Huoh0j8jgwFETk/edit?usp=sharing

What changes are included in this PR?

This PR also includes the fix of #1558.

Are there any user-facing changes?

To startup scheduler and executors, we add an additional option to indicate which task scheduling policy to be used. To enable push-based task scheduling, we can just run as follows:

RUST_LOG=info ./target/debug/ballista-scheduler -s PushStaged

RUST_LOG=info ./target/debug/ballista-executor -s PushStaged

@realno
Copy link
Contributor

realno commented Jan 14, 2022

@yahoNanJing it is a very well written document, great work!

I am wondering if there are any options based on the original poll model you have investigated and what your findings are. I think there are many benefits for using the poll/pull model:

  • The scheduler and executor are better decoupled. The scheduler does not need to have any knowledge of the executors, its job is to construct and optimize the plan. On the other hand the executors just need to know where to get the tasks, this can be future abstracted by using some queuing or messaging system. It is a fairly clean design and can scale pretty well.
  • There are minimal states maintained within the system, that will help stability and resilience of the system
  • The complexity of the system is low comparing to the push model

Regarding the original issue, I see a good reason to try reducing CPU usage. In terms of query time, is it that critical for DataFusion use cases? IMO we would optimize for large distrbuted jobs, perhaps we can live with a few millisecond delay here and there.

Again, thanks for the proposal I am curious about what you and other contributors think.

BTW, I am recently thinking about having Ballista production ready and work well with modern cloud native architecture, I think you are into the same topic. I am happy to have discussion about it.

@alamb alamb requested a review from andygrove January 16, 2022 13:19
@alamb
Copy link
Contributor

alamb commented Jan 16, 2022

Thank you for the contribution @yahoNanJing . This looks like some great work, but I don't feel qualified to review this at the moment.

I think this is a symptom of a larger problem that the community doesn't really have a "ballista" champion (aka someone who knows the ballista codebase and architecture well and is driving it forward). There is more discussion on this issue at #1273 and @houqp and @andygrove have some good thoughts. @rdettai has been silent for some time, but in the past he seemed interested in helping too.

I would personally love to get Ballista driven forward by someone who was actively using it. I wonder if you and/or your group and project are willing to do so? I can volunteer to provide basic support such as merging PRs and sanity checks, but I can't realistically provide useful architectural or code guidance.

cc @jimexist @Dandandan in case you have other thoughts

@alamb alamb changed the title Introduce push-based task scheduling Introduce push-based task scheduling for Ballista Jan 17, 2022
@houqp houqp linked an issue Jan 19, 2022 that may be closed by this pull request
Copy link
Member

@houqp houqp left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM over all. Left a comment on whether we should still keep the old polling logic around.

Very well written design doc and nice benchmark numbers @yahoNanJing 👍

ballista/rust/executor/src/main.rs Show resolved Hide resolved
ballista/rust/scheduler/src/lib.rs Outdated Show resolved Hide resolved
ballista/rust/scheduler/src/lib.rs Outdated Show resolved Hide resolved
ballista/rust/executor/src/executor_server.rs Show resolved Hide resolved
@realno
Copy link
Contributor

realno commented Jan 19, 2022

LGTM over all. Left a comment on whether we should still keep the old polling logic around.

Very well written design doc and nice benchmark numbers @yahoNanJing 👍

@houqp I have a slight concern that the push model adds some extra complexities to the design - please see my comment above. Perhaps we can have a discussion before merging this?

@yahoNanJing
Copy link
Contributor Author

yahoNanJing commented Jan 19, 2022

Hi @realno, thanks for your comments.

As I have mentioned in the design document, there are several disadvantages for the poll/pull model for task assignment.

  • Because there's no master for the cluster topology, it's not so easy to do global optimization for the task assignment, like local shuffle, etc.
  • It's not so efficient to fetch next task from the scheduler. Like current implement, it has to scan all of the tasks to check whether it's able to be scheduled. In the design, we also have attached our benchmark results. As the scheduler runs with scheduling more and more tasks, the performance of the poll/pull model will be downgraded. However, for the push model, it can achieve job level task checking and the task scheduling performance will not be downgraded.
  • CPU waste and may need 100ms latency which is not good for interactive queries which may need to be finished within 1s.

The poll/pull model is simple. However, as I know, it's rarely used in existing production environment.

Besides, this PR does not remove the code of the poll/pull model. Users can choose which model to use for their own cases.

@yahoNanJing
Copy link
Contributor Author

Hi @alamb, sorry for the late response. Our group currently is actively working on the distribute engine of Datafusion. And our final goal is to put the Ballista run in the production environment as stable and powerful as the Spark. And our team is glad to drive the Ballista forward.

@houqp
Copy link
Member

houqp commented Jan 19, 2022

@realno to your question on poll v.s. push, I think you are spot on with regards to poll being simpler on design. @edrevo and @j4ckcyw had some good prior discussions on this topic as well at https://github.com/ballista-compute/ballista/issues/463.

My current thinking on this is the scheduler state complexity introduced through the push model might be needed in the long run for optimal task scheduling when we can take more resource related factors into account instead of just available task slots. Having a global view of the system generally yields better task placements.

On the scaling front, even though push based model incurs more state management overhead on the scheduler side, it has its own scaling strength as well. For example, active message exchanges between scheduler and executors scale much better with larger executor pool size because heart beat messages are much easier to process compared to task poll requests. It is also easier for the scheduler to reduce its own load by scheduling proactively schedule less tasks v.s. rate limiting executor poll requests in a polling model.

So I think there is no clear cut here. Perhaps another angle would be a hybrid model where we still use the base model as the foundation , but try to move as much state management logic into the executors to reduce the overhead on the scheduler side.

@houqp
Copy link
Member

houqp commented Jan 19, 2022

Sorry @realno , I thought I pressed the comment button, turns out I didn't, thus the delay in responding to your earlier comment :(

@yahoNanJing
Copy link
Contributor Author

Thanks @houqp. Just rebase the code to avoid the conflicts and add a fix commit

@andygrove
Copy link
Member

I just saw this PR and will review the design doc in the next day or two. I wonder if @edrevo has any thoughts on this as well, since he designed much of the current implementation.

@realno
Copy link
Contributor

realno commented Jan 19, 2022

@yahoNanJing and @houqp thanks for the great discussion, and @houqp thanks for sharing the old issue, really good read, and happy to see there were some discussions happened before.

I definitely agree that there is no clear cut and each mode has its pros and cons, my preference is to use a simpler design that in practice works well enough. The reason is quite related to our use case - we are planning to deploy at least thousands of clusters allocating resources dynamically. So some key criteria are stability, error handling and recovery, and low management overhead per cluster. So this is the reasoning for simplifying communication model, simplifying state as they all have big impact on the above goal.

Because there's no master for the cluster topology, it's not so easy to do global optimization for the task assignment, like local shuffle, etc.

I am not sure there will be a difference in optimization. I think we can achieve the same thing, the difference is the communication model.

It's not so efficient to fetch next task from the scheduler. Like current implement, it has to scan all of the tasks to check whether it's able to be scheduled. In the design, we also have attached our benchmark results. As the scheduler runs with scheduling more and more tasks, the performance of the poll/pull model will be downgraded. However, for the push model, it can achieve job level task checking and the task scheduling performance will not be downgraded.
CPU waste and may need 100ms latency which is not good for interactive queries which may need to be finished within 1s.

I think this two points are because of the current implementation, there are other ways to improve/fix. For example, have the scheduler to split the tasks based on partition and readiness instead of scanning the whole list. And change to a model that doesn't involve a sleep loop.

My current thinking on this is the scheduler state complexity introduced through the push model might be needed in the long run for optimal task scheduling when we can take more resource related factors into account instead of just available task slots. Having a global view of the system generally yields better task placements.

This is a great point. My current thinking is to leverage modern orchestration technologies such as k8s to help. That is, leave it to k8s to figure out resource allocation on the physical cluster, we can set a few universal slot sizes for executors so we don't need to deal with hardware level RA - this is also for simplicity. And I know there will be use cases that still prefer the traditional way to manage a single physical cluster, we do need to evaluate the impact for that model. Though the main reason I am thinking about going away from Spark (and looking at DF as replacement) is its deployment model and operational cost comes with it.

On the scaling front, even though push based model incurs more state management overhead on the scheduler side, it has its own scaling strength as well. For example, active message exchanges between scheduler and executors scale much better with larger executor pool size because heart beat messages are much easier to process compared to task poll requests. It is also easier for the scheduler to reduce its own load by scheduling proactively schedule less tasks v.s. rate limiting executor poll requests in a polling model.

Agreed. I think there are different ways to limit polling rate. especially if something like k8s comes to play. If the executors are more or less universally sized and dynamically managed we can potentially use auto-scale techniques to control that. That is, instead of all executors polling scheduler constantly, only new ones and ones have done their job need to poll.

So I think there is no clear cut here. Perhaps another angle would be a hybrid model where we still use the base model as the foundation , but try to move as much state management logic into the executors to reduce the overhead on the scheduler side.

Completely agree. I want to add that since we are a community here it is great to share different view points and make a reasonable decision that we devote to improve and maintain going forward. I am hoping either way we choose to move forward with we will make it a competing (hopefully better) platform to Spark.

@realno
Copy link
Contributor

realno commented Jan 19, 2022

@yahoNanJing You mentioned your use case is mainly interactive queries. Out of curiosity, what does your current Spark deployment looks like? What are the biggest drivers (or pain points) for you to switch?

Feel free to move this conversation offline if you prefer.

@yahoNanJing
Copy link
Contributor Author

You mentioned your use case is mainly interactive queries. Out of curiosity, what does your current Spark deployment looks like? What are the biggest drivers (or pain points) for you to switch?

Hi @realno, actually our team has customized the Spark to serve interactive queries. The Spark cluster is long running with thrift server for receiving query requests. The main pain point is it's not fast enough to serve queries within seconds. That's the main reason that we hope to utilize a native distributed compute engine with zero copy shuffling for our case.

@realno
Copy link
Contributor

realno commented Jan 20, 2022

You mentioned your use case is mainly interactive queries. Out of curiosity, what does your current Spark deployment looks like? What are the biggest drivers (or pain points) for you to switch?

Hi @realno, actually our team has customized the Spark to serve interactive queries. The Spark cluster is long running with thrift server for receiving query requests. The main pain point is it's not fast enough to serve queries within seconds. That's the main reason that we hope to utilize a native distributed compute engine with zero copy shuffling for our case.

That makes a lot of sense @yahoNanJing. We have exact the same setup for one use case. How many executors do you guys run?

@houqp
Copy link
Member

houqp commented Jan 20, 2022

This is a great point. My current thinking is to leverage modern orchestration technologies such as k8s to help. That is, leave it to k8s to figure out resource allocation on the physical cluster, we can set a few universal slot sizes for executors so we don't need to deal with hardware level RA - this is also for simplicity. And I know there will be use cases that still prefer the traditional way to manage a single physical cluster, we do need to evaluate the impact for that model. Though the main reason I am thinking about going away from Spark (and looking at DF as replacement) is its deployment model and operational cost comes with it.

I think being able to leverage k8s (or other orchestration platform) would be a good feature for us to explore and support. Although I do have a fairly strong feeling on this topic. Specifically I think it would be good for ballista to be self contained and not have a hard dependency on complex technologies like k8s, nomad, etc. For the k8s scheduling feature, I think we should make it one of the options users can configure to use. I am 100% with you on putting simple deployment and operation model front and center as one of the big selling points for ballista.

Just throwing out random ideas here. For k8s backed clusters, what if instead of running fixed set of executors ahead of time, we let the scheduler create and schedule new pod for each new task it needs to assign. Then each executor pod will run to completion and report back task status before exit. This way, we avoid all the unnecessary polling required for a long running executor and can let k8s fully manage the resource allocation at the task level. There will be no state management required from the schedule's point of view as well because it basically deals with an "infinite" number of executors ready to accept tasks. This setup can be easily extended to other container orchestration systems as well like AWS ECS. In fact, we can even run each task as an AWS lambda function if we want.

Completely agree. I want to add that since we are a community here it is great to share different view points and make a reasonable decision that we devote to improve and maintain going forward. I am hoping either way we choose to move forward with we will make it a competing (hopefully better) platform to Spark.

Perhaps like what @yahoNanJing said earlier, we can keep both modes for now and make the final decision on this when we are able to gather more data from running different real world workloads on larger clusters? Technical decisions are usually much easier to make when they are backed by real data :)

@alamb
Copy link
Contributor

alamb commented Jan 20, 2022

Is this PR ready to merge?

@realno
Copy link
Contributor

realno commented Jan 20, 2022

For k8s backed clusters, what if instead of running fixed set of executors ahead of time, we let the scheduler create and schedule new pod for each new task it needs to assign.

This is what I am thinking too.

we can keep both modes for now and make the final decision on this when we are able to gather more data from running different real world workloads on larger clusters?

This sounds good to me.

@houqp
Copy link
Member

houqp commented Jan 20, 2022

@alamb I believe we are waiting for feedback from Andy, see #1560 (comment).

This is what I am thinking too.

Awesome! I think this design leans more towards a push model at its core and will likely deserve to have its own execution mode other than the existing poll and push modes. I am personally very looking forward to see a working PoC :)

@realno
Copy link
Contributor

realno commented Jan 20, 2022

ikely deserve to have its own execution mode other than the existing poll and push modes. I am personally very looking forward to see a working PoC :)

@alamb I believe we are waiting for feedback from Andy, see #1560 (comment).

This is what I am thinking too.

Awesome! I think this design leans more towards a push model at its core and will likely deserve to have its own execution mode other than the existing poll and push modes. I am personally very looking forward to see a working PoC :)

I am slowly working on the initial proposal atm, will share when ready. After that I will try to break the work down into issues/tasks. It would be nice to build the PoC with developers from the community, I still need some time to assemble a team in my org.

@yahoNanJing
Copy link
Contributor Author

yahoNanJing commented Jan 21, 2022

Hi @realno,

We have exact the same setup for one use case. How many executors do you guys run?

Actually, we have two products for different cases. One is for generic ad-hoc case. The other is for more time-sensitive cases. For the generic one, there're thousands of executors. While for the other one, it depends on the use cases. We may plan to have around one hundred executors.

Copy link
Member

@houqp houqp left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Comment on lines +45 to +46
clap = "2"
parse_arg = "0.1.3"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would be nice if we can avoid depending on CLI related dependencies like clap and parse_arg in the core, but I think it's a minor issue we can clean up as follow ups later.

Copy link
Member

@andygrove andygrove left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I reviewed the design doc and it looks good to me. Thanks for pushing Ballista forward!

@houqp houqp merged commit 71757bb into apache:master Jan 23, 2022
@houqp
Copy link
Member

houqp commented Jan 23, 2022

Thank you @yahoNanJing for the cool optimization. Great to have your team and @realno driving Ballista forward :D

@houqp houqp added the performance Make DataFusion faster label Jan 23, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
performance Make DataFusion faster
Projects
None yet
6 participants