Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ballista: Implement map-side shuffle #543

Merged
merged 5 commits into from
Jun 26, 2021

Conversation

andygrove
Copy link
Member

Which issue does this PR close?

Closes #456

Rationale for this change

Another step towards implementing full shuffle support.

What changes are included in this PR?

Are there any user-facing changes?

The result meta-data from executing a query stage now has an additional column with a partition number.

@codecov-commenter
Copy link

codecov-commenter commented Jun 11, 2021

Codecov Report

Merging #543 (eb2d673) into master (63e3045) will increase coverage by 0.08%.
The diff coverage is 90.90%.

Impacted file tree graph

@@            Coverage Diff             @@
##           master     apache/arrow-datafusion#543      +/-   ##
==========================================
+ Coverage   76.08%   76.17%   +0.08%     
==========================================
  Files         156      156              
  Lines       27035    27174     +139     
==========================================
+ Hits        20570    20699     +129     
- Misses       6465     6475      +10     
Impacted Files Coverage Δ
...lista/rust/core/src/execution_plans/query_stage.rs 85.13% <90.78%> (+9.34%) ⬆️
ballista/rust/core/src/serde/scheduler/mod.rs 60.71% <100.00%> (+1.78%) ⬆️
datafusion/src/physical_plan/mod.rs 79.09% <100.00%> (+0.38%) ⬆️
datafusion/src/physical_plan/planner.rs 77.53% <0.00%> (-2.66%) ⬇️
ballista/rust/core/src/utils.rs 25.53% <0.00%> (-2.06%) ⬇️
...ista/rust/core/src/serde/physical_plan/to_proto.rs 49.38% <0.00%> (-0.93%) ⬇️
datafusion/src/physical_plan/hash_join.rs 84.89% <0.00%> (-0.63%) ⬇️
datafusion/src/physical_plan/expressions/case.rs 75.00% <0.00%> (-0.57%) ⬇️
datafusion/src/execution/context.rs 92.00% <0.00%> (-0.09%) ⬇️
ballista/rust/client/src/context.rs 0.00% <0.00%> (ø)
... and 9 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 63e3045...eb2d673. Read the comment docs.

@andygrove andygrove marked this pull request as ready for review June 11, 2021 22:27
@andygrove
Copy link
Member Author

@edrevo fyi

.into_array(input_batch.num_rows()))
})
.collect::<Result<Vec<_>>>()?;
hashes_buf.clear();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we could reuse the code better at some moment?

Copy link
Contributor

@Dandandan Dandandan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

})
.collect::<Result<Vec<_>>>()?;
hashes_buf.clear();
hashes_buf.resize(arrays[0].len(), 0);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

noob question: is there a guarantee that all recordbatches have at least one element?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There needs to be at least one column based on the expressions in hash repartitioning - which I think should be a prerequisite when doing hash repartitioning - I am not sure whether DataFusion checks on that explicitly when constructing it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Err(DataFusionError::NotImplemented(
"Shuffle partitioning not implemented yet".to_owned(),
))
Some(Partitioning::Hash(exprs, n)) => {
Copy link
Contributor

@edrevo edrevo Jun 14, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just thinking out loud without any data to back me up, but maybe it is worth special-casing when n==1, so we don't actually perform the hash of everything, since all of the data is going to end up in the same partition anyway.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense. I filed https://github.com/apache/arrow-datafusion/issues/626 for this. I'd like to get the basic end-to-end shuffle mechanism working before we start optimizing too much.


// we won't necessary produce output for every possible partition, so we
// create writers on demand
let mut writers: Vec<Option<Arc<Mutex<ShuffleWriter>>>> = vec![];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like Arc + Mutex is unnecessary if you use .iter_mut() when necessary

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried changing this but ran into ownership issues. I'll go ahead and merge and perhaps someone can help me with fixing this as a follow up PR.

@andygrove andygrove merged commit 61199b9 into apache:master Jun 26, 2021
@andygrove andygrove deleted the shuffle-write-many branch June 26, 2021 16:05
@edrevo edrevo mentioned this pull request Jun 28, 2021
@houqp houqp added api change Changes the API exposed to users of the crate enhancement New feature or request labels Jul 30, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
api change Changes the API exposed to users of the crate enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Ballista: Implement map-side of shuffle
6 participants