
Settings in ExecuteQueryParams are ignored by Ballista's scheduler.execute_query(), causing a wrong partition count #1848

Closed
mingmwang opened this issue Feb 17, 2022 · 7 comments
Labels
bug Something isn't working

Comments

@mingmwang
Contributor

Describe the bug

The issue is caused by the changes in #1677, which always use the ExecutionContext from the SchedulerServer.

Before the change, running TPC-H benchmark Q1 on Ballista:

[2022-02-16T08:47:59Z INFO ballista_scheduler] Adding stage 1 with 1 pending tasks
[2022-02-16T08:47:59Z INFO ballista_scheduler] Adding stage 2 with 2 pending tasks
[2022-02-16T08:47:59Z INFO ballista_scheduler] Adding stage 3 with 1 pending tasks

After the change:

[2022-02-16T08:44:57Z INFO ballista_scheduler] Adding stage 1 with 1 pending tasks
[2022-02-16T08:44:57Z INFO ballista_scheduler] Adding stage 2 with 8 pending tasks
[2022-02-16T08:44:57Z INFO ballista_scheduler] Adding stage 3 with 1 pending tasks.


To Reproduce
Run TPC-H benchmark Q1 on Ballista (as in the logs above) and compare the per-stage task counts reported by the scheduler before and after #1677.

Expected behavior

SchedulerServer should honor the configuration settings from ExecuteQueryParams.


@mingmwang
Contributor Author

@thinkharderdev Please take a look.

@mingmwang
Contributor Author

I think we need to introduce session-level state to hold any session-specific configurations instead of a globally shared ExecutionContext/ExecutionContextState. We might have a shared Ballista scheduler, and different users might submit SQL queries with different SQL configurations or shuffle settings.
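
For illustration, a minimal sketch of this session-level-state idea, using hypothetical names rather than the actual Ballista types: the scheduler keeps one set of settings per session instead of relying on a single shared ExecutionContext.

use std::collections::HashMap;
use std::sync::{Arc, RwLock};

// Hypothetical per-session settings carried with a query (illustrative subset).
#[derive(Clone, Debug)]
struct SessionConfig {
    target_partitions: usize,
    batch_size: usize,
}

// Scheduler-side registry: one config per session id instead of one global context.
#[derive(Default)]
struct SessionRegistry {
    sessions: RwLock<HashMap<String, Arc<SessionConfig>>>,
}

impl SessionRegistry {
    // Register or update the settings submitted with a query.
    fn upsert(&self, session_id: &str, config: SessionConfig) {
        self.sessions
            .write()
            .unwrap()
            .insert(session_id.to_string(), Arc::new(config));
    }

    // Look up the session config, falling back to the scheduler defaults.
    fn config_for(&self, session_id: &str, defaults: &Arc<SessionConfig>) -> Arc<SessionConfig> {
        self.sessions
            .read()
            .unwrap()
            .get(session_id)
            .cloned()
            .unwrap_or_else(|| Arc::clone(defaults))
    }
}

fn main() {
    let defaults = Arc::new(SessionConfig { target_partitions: 8, batch_size: 8192 });
    let registry = SessionRegistry::default();
    // Settings from ExecuteQueryParams override the shared defaults for that session only.
    registry.upsert("session-a", SessionConfig { target_partitions: 2, batch_size: 8192 });
    assert_eq!(registry.config_for("session-a", &defaults).target_partitions, 2);
    assert_eq!(registry.config_for("session-b", &defaults).target_partitions, 8);
}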

@thinkharderdev
Contributor

Will do. I think there are a couple of different ways we can approach this:

  1. Have the client specify a namespace in the request and use an ExecutionContext per namespace on the scheduler. We could then dynamically create new contexts whenever a new namespace comes in.
  2. Have the scheduler dynamically set the target partitions based on executor statistics (e.g. the number of available task slots). This would, I think, require a way to set the target partitions explicitly when creating a SQL plan. So maybe add a new method to ExecutionContext like

pub async fn sql(&mut self, sql: &str, target_partitions: usize) -> Result<Arc<dyn DataFrame>>

Or both. Option 1 may be necessary anyway to support multi-tenancy, but within a single namespace we may still want to allow specifying shuffle settings on a per-query basis.
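
To make option 2 concrete, here is a hedged sketch of how an explicit partition count could be approximated with the DataFusion API current at the time (ExecutionConfig/ExecutionContext); the exact paths and signatures are assumptions and may differ between versions.

use std::sync::Arc;

use datafusion::error::Result;
use datafusion::prelude::*;

// Plan a SQL query with an explicit target partition count by building a
// context configured for this request only (sketch; API names assumed).
async fn plan_with_target_partitions(
    sql: &str,
    target_partitions: usize,
) -> Result<Arc<dyn DataFrame>> {
    let config = ExecutionConfig::new().with_target_partitions(target_partitions);
    let mut ctx = ExecutionContext::with_config(config);
    ctx.sql(sql).await
}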

@thinkharderdev
Contributor

Also, good catch! Apologies for overlooking this.

@mingmwang
Contributor Author

> Will do. I think there are a couple of different ways we can approach this:
>
>   1. Have the client specify a namespace in the request and use an ExecutionContext per namespace on the scheduler. We could then dynamically create new contexts whenever a new namespace comes in.
>   2. Have the scheduler dynamically set the target partitions based on executor statistics (e.g. the number of available task slots). This would, I think, require a way to set the target partitions explicitly when creating a SQL plan. So maybe add a new method to ExecutionContext like
>
> pub async fn sql(&mut self, sql: &str, target_partitions: usize) -> Result<Arc<dyn DataFrame>>
>
> Or both. Option 1 may be necessary anyway to support multi-tenancy, but within a single namespace we may still want to allow specifying shuffle settings on a per-query basis.

I would prefer to let users choose the target partition count at the current phase. The target partition count should not change too dynamically; otherwise the distributed physical plan produced at runtime will not be stable and could introduce additional shuffle exchanges. In the future we might add some kind of adaptive method to adjust the target partition count based on input/output data volume.
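
For context, the client already has a way to choose this explicitly: roughly as in the TPC-H benchmark, it builds a BallistaConfig with the shuffle partition setting, and those settings are what ExecuteQueryParams carries to the scheduler. A hedged sketch, assuming the Ballista client API of that time (exact signatures may differ between versions):

use ballista::prelude::*;

// Client-side sketch: the user picks the shuffle partition count explicitly;
// this setting is what ExecuteQueryParams should carry to the scheduler.
fn shuffle_config(partitions: usize) -> BallistaConfig {
    BallistaConfig::builder()
        .set(BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, &partitions.to_string())
        .build()
        .expect("valid Ballista configuration")
}

The resulting config is what the client ships with the query; the scheduler honoring it (e.g. planning 2 tasks for stage 2 of Q1 when partitions = 2) is exactly the expected behavior described above.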

@mingmwang
Contributor Author

Besides the target partition count, I think there are a couple of other configuration options that could be specified by users and changed dynamically, for example batch_size, parquet_pruning, repartition_windows, etc.

I searched the open issues and found a couple of configuration-related issues that are still open:

#138
#682

I think it is time to resolve those and come up with a more extensible configuration design.
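
As a starting point for such a design, here is a minimal sketch (hypothetical names, not an existing API) in which every option is declared once with a default and per-query settings are validated against the declared entries:

use std::collections::HashMap;

// One declared configuration option with its default value.
struct ConfigEntry {
    key: &'static str,
    default: &'static str,
    description: &'static str,
}

// The full set of options the scheduler understands (illustrative subset).
fn known_entries() -> Vec<ConfigEntry> {
    vec![
        ConfigEntry { key: "target_partitions", default: "8", description: "shuffle partition count" },
        ConfigEntry { key: "batch_size", default: "8192", description: "record batch size" },
        ConfigEntry { key: "parquet_pruning", default: "true", description: "enable Parquet row-group pruning" },
    ]
}

// Merge user-supplied settings over the declared defaults, rejecting unknown keys.
fn resolve(user: &HashMap<String, String>) -> Result<HashMap<String, String>, String> {
    let entries = known_entries();
    for key in user.keys() {
        if !entries.iter().any(|e| e.key == key) {
            return Err(format!("unknown configuration key: {}", key));
        }
    }
    let mut resolved: HashMap<String, String> = entries
        .iter()
        .map(|e| (e.key.to_string(), e.default.to_string()))
        .collect();
    for (k, v) in user {
        resolved.insert(k.clone(), v.clone());
    }
    Ok(resolved)
}

fn main() {
    let mut user = HashMap::new();
    user.insert("target_partitions".to_string(), "2".to_string());
    let resolved = resolve(&user).expect("only known keys");
    assert_eq!(resolved["target_partitions"], "2");
    assert_eq!(resolved["batch_size"], "8192");
}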

@mingmwang
Contributor Author

The issue is fixed.
