Rename concurrency to target_partitions #706
👍
     table_name: impl Into<String>,
 ) -> Result<Self> {
-    let provider = Arc::new(ParquetTable::try_new(path, max_concurrency)?);
+    let provider = Arc::new(ParquetTable::try_new(path, max_partitions)?);
One concern: when increasing the number of partitions, we also increase the maximum number of threads used while reading parquet. I think these should be decoupled.
I agree it should be decoupled, but I also don't think this PR makes the coupling any worse -- perhaps we could file a follow on ticket?
Filed #924 to track
I think this is mostly good. One concern I have is that the current config also sets the maximum number of threads used when reading parquet files. For Ballista, I also think it makes sense to configure the number of threads Tokio uses. When running multiple executors per machine/node you'll likely want to limit this, to avoid many (Tokio) threads and the higher memory usage that comes with them. This could be set when creating / configuring the Tokio Runtime.
Is this still true though? I know we were creating threads at one point in time but we are using Tokio/async now, so we are not creating threads. Increasing partition count will increase the number of async tasks that we run in the thread pool but won't increase the number of threads.
We run the tasks now with
What's the ideal design going forward? If the end goal is to create one async task for each partition and let the tokio thread pool manage the parallelism (threads), then I think the proper change we want to introduce should be getting rid of
@Dandandan Ah, ok. I had not understood fully what was happening there. I think the goal is to have one async task per partition and to remove the spawn_blocking, as @houqp suggested.
All our readers are blocking; if we do not run them on
Maybe we do need two separate configs after all, as per @Dandandan's PR. Maybe with concurrency renamed to something more specific to readers.
I believe that's correct. Might not be a problem when reading from one source, but if multiple sources come in, one task that waits on results from one source prevents other tasks from being started, which might limit the amount of both CPU and IO.
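To make the distinction in this thread concrete, here is a minimal sketch (standard library only, not DataFusion's or Tokio's actual scheduler) of how the number of partitions can be decoupled from the number of threads: partitions are queued as units of work and a fixed set of worker threads drains the queue. The function name `run_partitions` and the channel-based pool are assumptions made purely for illustration.

```rust
use std::collections::HashSet;
use std::sync::{mpsc, Arc, Mutex};
use std::thread::{self, ThreadId};

/// Hypothetical helper: processes `partitions` units of work on at most
/// `workers` threads and returns (partitions processed, distinct threads used).
fn run_partitions(partitions: usize, workers: usize) -> (usize, usize) {
    // Queue of partition ids, shared by all workers.
    let (task_tx, task_rx) = mpsc::channel::<usize>();
    let task_rx = Arc::new(Mutex::new(task_rx));
    // Channel on which workers report which thread handled which partition.
    let (done_tx, done_rx) = mpsc::channel::<(usize, ThreadId)>();

    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let task_rx = Arc::clone(&task_rx);
            let done_tx = done_tx.clone();
            thread::spawn(move || loop {
                // Hold the lock only while taking the next partition off the queue.
                let next = task_rx.lock().unwrap().recv();
                match next {
                    Ok(partition) => {
                        // A real reader would scan the partition here.
                        done_tx.send((partition, thread::current().id())).unwrap();
                    }
                    Err(_) => break, // queue closed and fully drained
                }
            })
        })
        .collect();
    drop(done_tx); // only the workers hold senders now

    for partition in 0..partitions {
        task_tx.send(partition).unwrap();
    }
    drop(task_tx); // closing the queue lets workers exit once it is empty

    for handle in handles {
        handle.join().unwrap();
    }

    let mut processed = 0;
    let mut threads = HashSet::new();
    for (_partition, id) in done_rx {
        processed += 1;
        threads.insert(id);
    }
    (processed, threads.len())
}
```

Calling `run_partitions(64, 4)` processes all 64 partitions while never using more than 4 threads; raising the first argument adds work items, not threads. Blocking work started outside such a bounded pool would not be limited in the same way, which is the coupling concern raised above.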
For now, something specific to readers seems fine to me. For the future, I think we also need something for setting the max number of threads when running multiple executors / DataFusion processes on one node (using the builder). By default this uses the number of CPU cores, but if you run multiple Ballista workers on one node it would be better to lower this. https://docs.rs/tokio/1.8.1/tokio/runtime/struct.Builder.html#
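As a rough sketch of the sizing idea in the comment above, the per-process thread budget could be derived by splitting the detected cores between the executors on a node. The helper names `worker_threads_per_executor` and `detected_cores` are hypothetical, and the even split is an assumption, not something this PR implements. The resulting number is the kind of value one might pass to `worker_threads` on the Tokio runtime builder linked above.

```rust
use std::num::NonZeroUsize;
use std::thread;

/// Hypothetical helper: split the machine's cores evenly between
/// `executors_per_node` processes, never going below one thread each.
fn worker_threads_per_executor(available_cores: usize, executors_per_node: usize) -> usize {
    // Treat 0 executors as 1 so the division is always defined.
    let executors = executors_per_node.max(1);
    (available_cores / executors).max(1)
}

/// Number of cores reported by the standard library, falling back to 1
/// when the platform cannot report it.
fn detected_cores() -> usize {
    thread::available_parallelism()
        .map(NonZeroUsize::get)
        .unwrap_or(1)
}
```

For example, `worker_threads_per_executor(detected_cores(), 4)` gives each of four executors a quarter of the cores, rounded down, with a floor of one thread.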
I agree with @houqp that the ideal design is to make the parquet reader fully async (which is blocked on getting an actually async parquet reader)
What about naming the config option `target_partitions` instead of `default_partitions`, which I think better describes what it is used for?
I think `default_partitions` is already a more specific name than `concurrency`, so I would be happy with that as well.
Shall we try and push this one along?
65d0c88
to
9723992
Compare
I think this is a good change. @houqp what do you think?
datafusion/src/execution/context.rs
Outdated
@@ -665,13 +665,13 @@ pub struct ExecutionConfig {
     /// virtual tables for displaying schema information
     information_schema: bool,
     /// Should DataFusion repartition data using the join keys to execute joins in parallel
-    /// using the provided `concurrency` level
+    /// using the provided `default_partitions` level
I think some of these references to `default_partitions` should probably be updated to `target_partitions` to match the new field name.
Done
👍
LGTM!
3df67eb
to
2b5b783
Compare
I rebased this PR against master and if CI is all clean I plan to merge it in 🎉
Which issue does this PR close?
Closes #685.
Rationale for this change
We originally used the `concurrency` config setting to determine how many threads to launch in certain parts of the code base, but we now use Tokio to manage task concurrency. The `concurrency` setting is only used to determine the number of partitions to use when repartitioning the plan. We should therefore rename this config setting.
What changes are included in this PR?
Introduce new `with_default_partitions` method and leave `with_concurrency` in place for now so that we don't break the API.
Are there any user-facing changes?
Yes, if users access the `concurrency` field in `ExecutionContext` then they will now need to use `default_partitions` instead. Users should not be accessing these attributes directly though.
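The "rename but keep the old method" approach described above might look roughly like the following sketch. `SketchConfig` is a hypothetical stand-in, not DataFusion's actual `ExecutionConfig`; the method name `with_target_partitions` follows the PR title rather than the description, and the `#[deprecated]` attribute and the default value of 1 are assumptions added for illustration.

```rust
/// Hypothetical stand-in for an execution config; not DataFusion's real struct.
#[derive(Debug, Clone, PartialEq)]
pub struct SketchConfig {
    /// Number of partitions to aim for when repartitioning a plan.
    target_partitions: usize,
}

impl SketchConfig {
    /// Starts with a single partition (an assumed default for this sketch).
    pub fn new() -> Self {
        Self { target_partitions: 1 }
    }

    /// New, more descriptive builder method.
    pub fn with_target_partitions(mut self, n: usize) -> Self {
        assert!(n > 0, "target_partitions must be greater than zero");
        self.target_partitions = n;
        self
    }

    /// Old name kept as a thin alias so existing callers keep compiling.
    /// Marking it deprecated is an assumption of this sketch.
    #[deprecated(note = "use with_target_partitions instead")]
    pub fn with_concurrency(self, n: usize) -> Self {
        self.with_target_partitions(n)
    }

    /// Read accessor, so callers do not need to touch the field directly.
    pub fn target_partitions(&self) -> usize {
        self.target_partitions
    }
}
```

Because the old method simply forwards to the new one, both spellings produce the same config, and callers can migrate at their own pace.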