Introduce StageManager for managing tasks stage by stage #1983

Merged
merged 6 commits into from Mar 17, 2022
Conversation

@yahoNanJing (Contributor) commented Mar 10, 2022

Which issue does this PR close?

Closes #1936, #1704.

Rationale for this change

Previously, the task-fetching time complexity was O(n^2). By introducing stage-based scheduling, it is reduced to O(n). This will also make task error recovery and stage priority-based scheduling easier in the future.

What's more, this PR also improves the performance of pull-based task scheduling significantly, since there is no longer any need to check the status of every task in the whole system.

What changes are included in this PR?

  • Introduced Stage and StageManager (a sketch of the idea follows below).
  • Removed SchedulerStateWatcher, since we have introduced an event-based processing mechanism for state machine changes.

For details, see #1936.
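To illustrate the stage-based idea, here is a minimal sketch (every type, field, and method below is hypothetical, not the actual code in this PR): stages are indexed by (job_id, stage_id), and only stages whose inputs have all completed are kept in a runnable set, so fetching tasks never scans the whole system.

    use std::collections::{HashMap, HashSet};

    /// Hypothetical key identifying a stage within a job: (job_id, stage_id).
    type StageKey = (String, u32);

    struct Task; // placeholder for the real task descriptor

    /// A stage holds its unassigned tasks plus a count of input stages
    /// that have not completed yet.
    struct Stage {
        pending_inputs: usize,
        pending_tasks: Vec<Task>,
    }

    /// Sketch of stage-by-stage task management: fetching tasks only
    /// touches currently runnable stages instead of checking every
    /// task in the system on each scheduling round.
    struct StageManager {
        stages: HashMap<StageKey, Stage>,
        runnable: HashSet<StageKey>,
    }

    impl StageManager {
        /// Fetch up to `max` tasks, looking only at runnable stages.
        fn fetch_tasks(&mut self, max: usize) -> Vec<Task> {
            let mut out = Vec::new();
            for key in &self.runnable {
                if let Some(stage) = self.stages.get_mut(key) {
                    while out.len() < max {
                        match stage.pending_tasks.pop() {
                            Some(task) => out.push(task),
                            None => break,
                        }
                    }
                }
                if out.len() >= max {
                    break;
                }
            }
            out
        }

        /// When a stage completes, unblock its dependents and mark any
        /// stage with no remaining pending inputs as runnable.
        fn on_stage_completed(&mut self, dependents: &[StageKey]) {
            for dep in dependents {
                if let Some(stage) = self.stages.get_mut(dep) {
                    stage.pending_inputs -= 1;
                    if stage.pending_inputs == 0 {
                        self.runnable.insert(dep.clone());
                    }
                }
            }
        }
    }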

Are there any user-facing changes?

It's blocked by #1934.

@yahoNanJing (Contributor, Author)

Hi @alamb and @thinkharderdev, could you help review this PR? The detailed design can be seen in #1936.

    -   ) -> Result<Option<SchedulerServerEvent>> {
    -       let mut available_executors = self.state.get_available_executors_data();
    +   async fn offer_resources(&self, n: u32) -> Result<Option<SchedulerServerEvent>> {
    +       let mut available_executors = if self.is_test {
Member

IMO, setting flags specific to tests in application logic is usually an anti-pattern. It makes the code harder to read and adds unnecessary runtime overhead. If I understand correctly, the main problem is that we don't have executor heartbeats in tests, causing get_alive_executors to return an empty set? If so, I would recommend adding a helper method to set artificial executor heartbeats directly from tests, so we can keep the core logic agnostic to tests.
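For example, something along these lines (a hypothetical sketch; the real state type and field names differ):

    use std::collections::HashMap;
    use std::sync::RwLock;
    use std::time::{SystemTime, UNIX_EPOCH};

    /// Minimal stand-in for the scheduler's executor bookkeeping.
    struct SchedulerState {
        executor_heartbeats: RwLock<HashMap<String, u64>>,
    }

    #[cfg(test)]
    impl SchedulerState {
        /// Test-only helper: record an artificial heartbeat so that
        /// liveness checks treat the executor as alive, without running
        /// a real heartbeat loop or gRPC service.
        fn set_executor_heartbeat_for_test(&self, executor_id: &str) {
            let now = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .expect("system clock before UNIX epoch")
                .as_secs();
            self.executor_heartbeats
                .write()
                .unwrap()
                .insert(executor_id.to_string(), now);
        }
    }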

@yahoNanJing (Contributor, Author) commented Mar 11, 2022

It's not just the heartbeat issue. For push-based task scheduling, the scheduler will launch tasks on the corresponding executors, which would have to start up the gRPC service. That's a bit too heavy for a unit test.

@thinkharderdev (Contributor)

Could we use a cfg to achieve the same result? Something like:

    #[cfg(not(test))]
    pub(crate) fn get_available_executors_data(&self) -> Vec<ExecutorData> {
        let mut res = {
            let alive_executors = self.get_alive_executors_within_one_minute();
            let executors_data = self.executors_data.read();
            executors_data
                .iter()
                .filter_map(|(exec, data)| {
                    (data.available_task_slots > 0 && alive_executors.contains(exec))
                        .then(|| data.clone())
                })
                .collect::<Vec<ExecutorData>>()
        };
        res.sort_by(|a, b| Ord::cmp(&b.available_task_slots, &a.available_task_slots));
        res
    }

    #[cfg(test)]
    pub(crate) fn get_available_executors_data(&self) -> Vec<ExecutorData> {
        let mut res: Vec<ExecutorData> =
            self.executors_data.read().values().cloned().collect();
        res.sort_by(|a, b| Ord::cmp(&b.available_task_slots, &a.available_task_slots));
        res
    }

and then

        #[cfg(not(test))]
        if num_tasks > 0 {
            self.launch_tasks(&executors_delta_data, tasks_assigment)
                .await?;
        }

This still makes the code slightly harder to read but it removes any runtime overhead and also localizes the additional flags to two places.

@yahoNanJing (Contributor, Author) commented Mar 11, 2022

With this PR, load-testing performance is improved significantly.

Cluster:

One scheduler + one executor with 4 task slots.

Benchmark of Load Testing:

  1. First, for each case, set up an empty cluster.
  2. Then run 3 rounds of the following command with 200 requests and concurrency 50 on 1 GB of TPC-H test data:
    ./tpch loadtest ballista-load --requests 200 --concurrency 50 --host localhost --port 50050 --sql-path queries/ --format parquet --data-path data/tpch-1g-oneFile --query-list 1
| TaskSchedulingPolicy | Pull | Push |
| --- | --- | --- |
| Before PR | 419353.8 ms / 1008875.6 ms / 1543815.8 ms | 394576.6 ms / 473540.7 ms / 400769.2 ms |
| With PR | 169643.3 ms / 178026.6 ms / 189341.6 ms | 235245.9 ms / 208277.6 ms / 220425.3 ms |

(Each cell lists the load-test times for rounds 1–3.)

It seems there's a bug in the master branch when load testing push-based task scheduling. Will check it later. Update: it's fixed by #2006.

Single Query Performance

As a baseline for comparison, we need to know the single-query performance.

  1. First, for each case, set up an empty cluster.
  2. Then run 3 rounds of the following command with 1 request and concurrency 1 on 1 GB of TPC-H test data:
    ./tpch loadtest ballista-load --requests 1 --concurrency 1 --host localhost --port 50050 --sql-path queries/ --format parquet --data-path data/tpch-1g-oneFile --query-list 1
| TaskSchedulingPolicy | Pull | Push |
| --- | --- | --- |
| Before PR | 1765.2 ms / 1535.3 ms / 1534.2 ms | 617.6 ms / 615.6 ms / 616.6 ms |
| With PR | 1631.0 ms / 1327.6 ms / 1327.8 ms | 616.3 ms / 616.3 ms / 614.7 ms |

(Each cell lists the load-test times for rounds 1–3.)

@thinkharderdev (Contributor)

> Hi @alamb and @thinkharderdev, could you help review this PR? The detailed design can be seen in #1936.

Have been travelling, but I will spend time reviewing this week.

@yahoNanJing (Contributor, Author) commented Mar 14, 2022

Hi @houqp, @thinkharderdev, @alamb, @yjshen, I've just fixed the bug in the master branch's push-based task scheduling with #2006. Could you help review that PR?

Now we can have the whole picture of the benchmark results.

  • For push-based task scheduling, this PR improves performance by around 2x.
  • For pull-based task scheduling, this PR improves performance by around 2x at the start, and performance no longer degrades linearly across successive runs.

Comment on lines 133 to 138

    self.state
        .executor_manager
        .update_executor_data(executor_delta_data);
    // TODO check whether launching task is successful or not
    client.launch_task(LaunchTaskParams { task: tasks }).await?;
@thinkharderdev (Contributor)

Should this be in the reverse order? It seems like a potential issue: if launching the task fails, the executor's task slots will never be freed.

Doing it in reverse (launch tasks -> update state) could also be a problem, so maybe a write lock needs to be acquired before launching tasks?

@yahoNanJing (Contributor, Author)

Hi @thinkharderdev, I didn't think much about the error handling here previously. I think your suggestion of doing it in reverse makes sense.

Before updating the executor data, the task statuses have already been updated to Running. Whether the task launch succeeds or not, these tasks will be left dangling in Running status. It's better for us to leverage speculative task execution in the future, so that we don't need to worry much about the case of partially successful task launches.

Why do you think we need a write lock?

@thinkharderdev (Contributor)

It was just a thought; I'm not sure the performance cost would be justified. But the idea is that updating the executor state and launching the tasks should in principle be an atomic transaction. If you update first and then the launch fails, your resources will never be released; but if you launch first and then update state, you have a concurrency issue where another thread may try to launch tasks after the tasks are launched and before the state is updated, so you end up overcommitted.

The second issue seems less of a problem, because it seems pretty straightforward to just have the executor reject the task if it has no available task slots; so it may be enough to just reverse the order here (launch tasks -> update state) and let the existing error handling mechanisms deal with it. Since launching requires a network call, holding a write lock on the entire state for that operation could be a major performance bottleneck, so if we did need a lock it would probably have to be at the individual executor level.
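To sketch the executor-level lock idea (all types and names here are hypothetical, and the launch call is a stand-in for the real gRPC request):

    use std::collections::HashMap;
    use std::sync::Arc;
    use tokio::sync::Mutex;

    struct ExecutorSlots {
        available_task_slots: u32,
    }

    struct ExecutorManager {
        /// One async mutex per executor, so reserving slots and
        /// launching stays atomic per executor without holding a
        /// global write lock across a network call.
        slots: HashMap<String, Arc<Mutex<ExecutorSlots>>>,
    }

    impl ExecutorManager {
        /// Reserve `n` slots on one executor, launch, and roll the
        /// reservation back if the launch fails so slots are not leaked.
        async fn launch_on(&self, executor_id: &str, n: u32) -> Result<(), String> {
            let slots = self
                .slots
                .get(executor_id)
                .ok_or_else(|| format!("unknown executor {executor_id}"))?;
            let mut guard = slots.lock().await;
            if guard.available_task_slots < n {
                return Err("not enough task slots".into());
            }
            guard.available_task_slots -= n;
            if let Err(e) = fake_launch_tasks(executor_id, n).await {
                guard.available_task_slots += n;
                return Err(e);
            }
            Ok(())
        }
    }

    /// Stand-in for the real LaunchTask gRPC call.
    async fn fake_launch_tasks(_executor_id: &str, _n: u32) -> Result<(), String> {
        Ok(())
    }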

@yahoNanJing (Contributor, Author)

Thanks @thinkharderdev. Actually, in the current design, the launch_tasks method will only be invoked on the single consumer side of the channel, which means there will be no concurrency issue.
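A minimal sketch of that single-consumer pattern (the event type is illustrative, not the actual SchedulerServerEvent):

    use tokio::sync::mpsc;

    /// Illustrative event type.
    enum SchedulerEvent {
        ReviveOffers,
    }

    /// The single consumer of the event channel: since only this loop
    /// ever invokes the task-launching path, slot accounting is
    /// serialized and launches cannot race with each other.
    async fn run_event_loop(mut rx: mpsc::Receiver<SchedulerEvent>) {
        while let Some(event) = rx.recv().await {
            match event {
                SchedulerEvent::ReviveOffers => {
                    // launch_tasks(...) would be called here.
                }
            }
        }
    }

Many producers can hold clones of the corresponding mpsc::Sender, but only one receiver exists, so the launch path runs strictly sequentially.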

@yahoNanJing (Contributor, Author)

However, as you mentioned, the executor's available task slots on the scheduler side are just guidance for task scheduling. The real source of truth is on the executor side.


@thinkharderdev (Contributor) left a comment

I need to spend some more time to understand this fully, but I had a couple of comments based on a quick review.

@yahoNanJing (Contributor, Author) commented Mar 16, 2022

> Could we use a cfg to achieve the same result? Something like: ...

Thanks @thinkharderdev for your suggestion. I did use cfg at the beginning, but I wasn't sure whether that approach was acceptable. If we all prefer it, I'll change it back to the cfg way.

By the way, if we use cfg, how do we avoid the warning messages from cargo test, like "warning: unused variable: tasks_assigment" and "warning: associated function is never used: launch_tasks"?

@mingmwang (Contributor)

I think the ExecutionContext should not be a member of QueryStageScheduler, since we will support session contexts soon.

pub(crate) struct QueryStageScheduler<
    T: 'static + AsLogicalPlan,
    U: 'static + AsExecutionPlan,
> {
    ctx: Arc<RwLock<SessionContext>>,
    state: Arc<SchedulerState<T, U>>,
    event_sender: Option<EventSender<SchedulerServerEvent>>,
}

@thinkharderdev (Contributor) commented Mar 16, 2022

> Thanks @thinkharderdev for your suggestion. I did use cfg at the beginning, but I wasn't sure whether that approach was acceptable. If we all prefer it, I'll change it back to the cfg way.
>
> By the way, if we use cfg, how do we avoid the warning messages from cargo test, like "warning: unused variable: tasks_assigment" and "warning: associated function is never used: launch_tasks"?

Yeah, I'm relatively new to this project (and Rust in general), so I'm not sure whether this is an acceptable approach either, but it seems better to me for three reasons:

  1. It removes any runtime overhead/branching.
  2. It doesn't require introducing a flag into a higher-level API.
  3. It reduces the risk of a runtime bug caused by a misconfiguration/coding error, since the runtime binary will never be compiled with the test code.

For the warnings, I think we can use #[allow(dead_code)], but I'm not sure if there is a way to make that conditional on the test attribute.
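For what it's worth, cfg_attr can make an attribute conditional on a predicate, which seems to fit here. A minimal standalone sketch (the function names just mirror the quoted warnings; the bodies are placeholders):

    /// Only called from code that is compiled out under cfg(test),
    /// so silence the dead_code lint in test builds only.
    #[cfg_attr(test, allow(dead_code))]
    fn launch_tasks(n: usize) {
        println!("launching {n} tasks");
    }

    /// Same trick for a parameter that is only read outside tests.
    #[cfg_attr(test, allow(unused_variables))]
    fn offer(tasks_assigment: Vec<u32>) {
        // The only call site is compiled out under cfg(test), which is
        // what triggers both warnings during `cargo test`.
        #[cfg(not(test))]
        launch_tasks(tasks_assigment.len());
    }

    fn main() {
        offer(vec![1, 2, 3]);
    }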

@yahoNanJing (Contributor, Author) commented Mar 16, 2022

> I think the ExecutionContext should not be a member of QueryStageScheduler, since we will support session contexts soon.

Thanks @mingmwang. It makes sense not to include the context in QueryStageScheduler. Will refactor it.
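A hedged sketch of one possible shape after the refactor (hypothetical; it simply drops the ctx field from the struct quoted above, with the session context to be resolved per job elsewhere):

    pub(crate) struct QueryStageScheduler<
        T: 'static + AsLogicalPlan,
        U: 'static + AsExecutionPlan,
    > {
        state: Arc<SchedulerState<T, U>>,
        event_sender: Option<EventSender<SchedulerServerEvent>>,
    }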

@yahoNanJing (Contributor, Author)

Thanks @thinkharderdev. Just fixed it as you suggested.

@yahoNanJing (Contributor, Author)

Hi @thinkharderdev, @houqp, @yjshen and @alamb, are there any more concerns about this PR? If not, how about merging it first and refining step by step in the future?

@houqp (Member) left a comment

LGTM

@houqp (Member) commented Mar 17, 2022

Very nice benchmark result, @yahoNanJing!

I suggest we wait for @thinkharderdev's review since he was actively reviewing the PR.

@houqp added the performance (Make DataFusion faster) label on Mar 17, 2022
@thinkharderdev (Contributor) left a comment

Awesome work, @yahoNanJing!

Looks good to me. I'm cool with merging this and then iterating on any issues that may come up.

@yjshen (Member) commented Mar 17, 2022

Thanks @yahoNanJing @houqp @thinkharderdev @mingmwang!

@yjshen merged commit b88f103 into apache:master on Mar 17, 2022