
Implement scalable distributed joins #63

Closed · andygrove opened this issue on Apr 25, 2021 · 3 comments · Fixed by #634

@andygrove (Member)

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

The main issue limiting scalability in Ballista today is that joins are implemented as hash joins in which every partition of the probe side loads the entire left (build) side into memory.

Describe the solution you'd like

To make this scalable, we need to hash-partition the left and right inputs on the join keys so that the matching left and right partitions can be joined in parallel.

There is already work underway in DataFusion to implement this that we can leverage.
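
To illustrate the idea, here is a toy sketch in plain Rust (made-up row types and partition count, not DataFusion or Ballista code): hash-partitioning means routing each row to a partition number derived from its join key, so that equal keys from the left and right inputs always land in the same partition.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Hypothetical partition count for the sketch; a real plan might use 24.
const NUM_PARTITIONS: usize = 4;

// Map a join key to a partition number. Using the same hash function on both
// inputs guarantees that equal keys end up in the same partition.
fn partition_for(key: &str) -> usize {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    (hasher.finish() as usize) % NUM_PARTITIONS
}

// Split rows of (join_key, payload) into NUM_PARTITIONS buckets by key hash.
// The same routine is applied independently to the left and right inputs.
fn hash_partition(rows: Vec<(String, String)>) -> Vec<Vec<(String, String)>> {
    let mut parts: Vec<Vec<(String, String)>> = vec![Vec::new(); NUM_PARTITIONS];
    for (key, payload) in rows {
        let p = partition_for(&key);
        parts[p].push((key, payload));
    }
    parts
}
```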

Describe alternatives you've considered
None

Additional context
None

@andygrove added the enhancement and ballista labels on Apr 25, 2021
@boazberman (Contributor)

I'd love to work on this if someone can provide further reading material and/or point me to the relevant area of the code.

@andygrove (Member, Author)

Here is some additional information. When I run TPC-H query 5 from the benchmarks against DataFusion, I see that the physical plan uses partitioned joins.

For example, both inputs to the join are partitioned on the join keys, and the join mode is Partitioned:

HashJoinExec: mode=Partitioned, join_type=Inner, on=[("c_custkey", "o_custkey")]
  RepartitionExec: partitioning=Hash([Column { name: "c_custkey" }], 24)
    ParquetExec: batch_size=8192, limit=None, partitions=[...]
  RepartitionExec: partitioning=Hash([Column { name: "o_custkey" }], 24)
    FilterExec: o_orderdate >= CAST(1994-01-01 AS Date32) AND o_orderdate < CAST(1995-01-01 AS Date32)
      ParquetExec: batch_size=8192, limit=None, partitions=[...]

This means the join can run in parallel because the inputs are partitioned the same way: partition 1 of the join reads partition 1 of the left and right inputs, and so on.
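
To make that concrete, here is a rough sketch in plain Rust (toy row types, std threads standing in for distributed tasks; none of this is the actual DataFusion/Ballista operator). Each task builds a hash table over its own left partition only and probes it with the matching right partition, assuming both inputs have already been hash-partitioned on the join key as in the sketch above.

```rust
use std::collections::HashMap;
use std::thread;

// Join partition i of the left input with partition i of the right input.
// Inputs are assumed to already be hash-partitioned on the join key.
fn partitioned_hash_join(
    left_parts: Vec<Vec<(String, String)>>,
    right_parts: Vec<Vec<(String, String)>>,
) -> Vec<(String, String, String)> {
    let handles: Vec<_> = left_parts
        .into_iter()
        .zip(right_parts)
        .map(|(left, right)| {
            thread::spawn(move || {
                // Build side: hash table over this left partition only.
                let mut build: HashMap<String, Vec<String>> = HashMap::new();
                for (key, value) in left {
                    build.entry(key).or_default().push(value);
                }
                // Probe side: stream this right partition through the table.
                let mut out = Vec::new();
                for (key, right_value) in right {
                    if let Some(left_values) = build.get(&key) {
                        for left_value in left_values {
                            out.push((key.clone(), left_value.clone(), right_value.clone()));
                        }
                    }
                }
                out
            })
        })
        .collect();

    handles.into_iter().flat_map(|h| h.join().unwrap()).collect()
}
```

Each partition's build table holds only its own slice of the left side, and the per-partition joins are independent, which is what lets the work scale out across executors.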

When I run the same query against Ballista, I see:

HashJoinExec: mode=CollectLeft, join_type=Inner, on=[("c_custkey", "o_custkey")]
  ParquetExec: batch_size=8192, limit=None, partitions=[...]
  FilterExec: o_orderdate >= CAST(1994-01-01 AS Date32) AND o_orderdate < CAST(1995-01-01 AS Date32)
    ParquetExec: batch_size=8192, limit=None, partitions=[...]

Here we see join mode CollectLeft, which means that each partition being executed fetches the entire left side of the join into memory. This is very inefficient in both memory and compute: the full left side is collected and hashed once per probe partition, so the redundant work grows with the number of partitions.
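
For contrast, here is a toy sketch of what CollectLeft amounts to (same made-up row types as the sketches above, not the real operator): every probe partition collects and hashes the full left side before probing.

```rust
use std::collections::HashMap;

// CollectLeft strategy, sketched: the ENTIRE left input is collected and
// hashed once per probe partition, so the build work is repeated N times
// instead of being split across the N partitions.
fn collect_left_join(
    left_all: Vec<(String, String)>,          // full left side, not partitioned
    right_parts: Vec<Vec<(String, String)>>,  // probe-side partitions
) -> Vec<(String, String, String)> {
    let mut out = Vec::new();
    for right in right_parts {
        // Rebuilt from the full left side for every probe partition --
        // this repeated collect/hash step is the scalability problem.
        let mut build: HashMap<&str, Vec<&str>> = HashMap::new();
        for (key, value) in &left_all {
            build.entry(key.as_str()).or_default().push(value.as_str());
        }
        for (key, right_value) in right {
            if let Some(left_values) = build.get(key.as_str()) {
                for left_value in left_values {
                    out.push((key.clone(), left_value.to_string(), right_value.clone()));
                }
            }
        }
    }
    out
}
```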

What we need to do is apply the same "partitioned hash join" pattern to Ballista.

@andygrove (Member, Author)

I created a Google doc to discuss the design and planned work in more detail:

https://docs.google.com/document/d/1yUnGWsHKYOAxWijDJisEFYU4dIym_GSRSMpwfWjVZq8/edit?usp=sharing

@andygrove self-assigned this on Jun 27, 2021
Ted-Jiang referenced this issue in Ted-Jiang/arrow-datafusion Jul 26, 2022
alamb pushed a commit that referenced this issue Jul 27, 2022
alamb pushed a commit that referenced this issue Nov 18, 2024