Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ballista: Implement scalable distributed joins #634

Merged
merged 2 commits into from
Jul 4, 2021

Conversation

andygrove
Copy link
Member

@andygrove andygrove commented Jun 27, 2021

Which issue does this PR close?

Closes #63.

This PR removes previous hacks around partitioning and now faithfully translates the DataFusion query plan, including RepartitionExec. I have tested with TPC-H query 12 and see consistent results between DataFusion and Ballista with the 100GB data set, where each table has 8 partitions. I have tested with multiple executors as well as single executors.

There is more work to do but I think this is at a good point to merge since it fixes some correctness issues.

Rationale for this change

Ballista cannot scale well without this because work is duplicated across all partitions to load the entire left side of the join into memory currently.

What changes are included in this PR?

  • Enables RepartitionExec in Ballista query plans and translate them to shuffles
  • Removes previous hacks intended to detect changes in partitioning

Are there any user-facing changes?

Query plans will change.

@github-actions github-actions bot added ballista datafusion Changes in the datafusion crate labels Jun 27, 2021
@andygrove andygrove changed the title Ballista: Implement scalable distributed joins [DRAFT] Ballista: Implement scalable distributed joins Jul 3, 2021
@andygrove andygrove marked this pull request as ready for review July 3, 2021 13:48
@andygrove
Copy link
Member Author

@edrevo fyi

.with_repartition_joins(false)
.with_repartition_aggregations(false)
.with_physical_optimizer_rules(rules);
let config = ExecutionConfig::new().with_concurrency(2); // TODO: this is hack to enable partitioned joins
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the idea here for later? I guess the repartitioning needs to be applied with concurrency=1 too to avoid inefficient plans?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

@Dandandan Dandandan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Amazing 😎😎😎

Copy link
Member

@jorgecarleitao jorgecarleitao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ready to merge; very neat solution! 💯

@Dandandan Dandandan merged commit 9314dbb into apache:master Jul 4, 2021
@houqp houqp added the enhancement New feature or request label Jul 29, 2021
@andygrove andygrove deleted the ballista-scalable-join branch February 6, 2022 17:42
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
datafusion Changes in the datafusion crate enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Implement scalable distributed joins
4 participants