Fine grained manipulation on hashing partitioned data #4631

Closed
5 of 6 tasks
fuzhe1989 opened this issue Apr 12, 2022 · 1 comment · Fixed by #5149
Labels
type/enhancement The issue or PR belongs to an enhancement.

Comments

@fuzhe1989
Contributor

fuzhe1989 commented Apr 12, 2022

Enhancement

We know that some operators (agg, join, etc.) could benefit from prehashing. If each inputstream belongs to one hash bucket:

  1. For agg, we do not need the merging phase; each thread can directly generate the final blocks (see the sketch after this list).
  2. For the build side of join, we do not need any lock, and resizing the hash table affects only one thread.
  3. For the probe side of join, probing can start as soon as its corresponding build job ends, rather than waiting for the whole build phase.
  4. For the exchange sender, we do not need to partition the data again; we can just send it directly.
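
To make points 1 and 2 concrete, here is a minimal standalone sketch (plain C++, not TiFlash code; `Row`, `Stream`, and the thread-per-stream layout are illustrative assumptions): when each input stream already holds exactly one hash bucket, every thread aggregates its own stream into a private hash table, with no lock and no merge phase.

```cpp
#include <cstddef>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>

struct Row { std::string key; long value; };
using Stream = std::vector<Row>;                          // one pre-hashed bucket per stream
using AggResult = std::unordered_map<std::string, long>;  // sum per key

int main()
{
    // Pre-hashed inputs: a key that appears in one stream never appears in another.
    std::vector<Stream> streams = {{{"a", 1}, {"a", 2}}, {{"b", 3}, {"c", 4}}};

    std::vector<AggResult> results(streams.size());
    std::vector<std::thread> workers;
    for (std::size_t i = 0; i < streams.size(); ++i)
        workers.emplace_back([&streams, &results, i] {
            // Private hash table per thread: no lock is needed and resizing
            // only affects this thread; the result is already final, so no
            // cross-thread merge phase is required.
            for (const Row & row : streams[i])
                results[i][row.key] += row.value;
        });
    for (auto & w : workers)
        w.join();
    return 0;
}
```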

Note that all the potential enhancements require:

  1. All stages use the same (or a compatible) hash function.
  2. The prehash key is a superset of the sink hash key.

So how to do prehashing? Here are some ideas:

  1. For a hash-partitioned table, just let each inputstream correspond to one partition (an N:1 or 1:1 relation). The key is how to carry the hash info along with the inputstream (see the sketch after this list).
  2. For large data volumes, agg merges data into 256 buckets, but it then discards the hashing info by merging them into one inputstream. The operators above agg could benefit if agg exposed that hashing info.
  3. We could insert virtual hash-partition operators near the TableScan and Exchange operators (the leaves of an MPP task) to generate inputstreams with hashing info.
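
A minimal sketch of what "carrying the hash info along with the inputstream" could look like (`PartitionInfo`, `PartitionedStream`, and `canReusePrehash` are hypothetical names, not existing TiFlash classes): each stream is tagged with the bucket it holds and the hash scheme that produced it, so downstream operators can check whether the prehashing is reusable.

```cpp
#include <cstdint>
#include <vector>

// Hypothetical annotation carried with an inputstream: which bucket the
// stream holds, how many buckets exist, and an id of the hash scheme used.
struct PartitionInfo
{
    uint32_t bucket_index = 0;
    uint32_t bucket_count = 1;    // 1 means "not partitioned"
    uint32_t hash_scheme_id = 0;  // downstream operators check compatibility by this id
};

struct Block { /* columns omitted */ };

// Hypothetical stream wrapper: the blocks plus the partition info they satisfy.
struct PartitionedStream
{
    PartitionInfo partition;
    std::vector<Block> blocks;
};

// A downstream operator may reuse the prehashing only if the stream is really
// partitioned and was hashed with a scheme the operator understands.
bool canReusePrehash(const PartitionedStream & s, uint32_t required_scheme)
{
    return s.partition.bucket_count > 1 && s.partition.hash_scheme_id == required_scheme;
}

int main()
{
    PartitionedStream s{{/*bucket_index*/ 3, /*bucket_count*/ 16, /*hash_scheme_id*/ 1}, {}};
    return canReusePrehash(s, /*required_scheme*/ 1) ? 0 : 1;
}
```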

In particular, for Exchange, ExchangeSender is a better place to partition data than ExchangeReceiver: partitioning is already part of ExchangeSender's job.

Note: blindly splitting data at ExchangeSender could make packets smaller and weaken the effect of vectorization. A previous demo showed that blind splitting decreases performance (with the current implementation), while well-designed batching improves it significantly (a batching sketch follows).
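
A minimal sketch of the batching idea (hypothetical types, not the real ExchangeSender interface; the `send` callback stands in for the actual packet-sending call): rows are routed into per-partition buffers and a buffer is flushed only once it holds a full batch, so partitioning does not shrink packets and hurt vectorization.

```cpp
#include <cstddef>
#include <functional>
#include <string>
#include <vector>

struct Row { std::string key; long value; };

// Hypothetical batching sender: rows go into per-partition buffers, and a buffer
// is flushed as one packet only after it has accumulated `batch_size` rows, so
// partitioning does not produce tiny, vectorization-unfriendly packets.
class BatchingSender
{
public:
    BatchingSender(std::size_t partitions, std::size_t batch_size_, std::function<void(std::size_t, std::vector<Row>)> send_)
        : buffers(partitions), batch_size(batch_size_), send(std::move(send_)) {}

    void write(const Row & row)
    {
        std::size_t p = std::hash<std::string>{}(row.key) % buffers.size();
        buffers[p].push_back(row);
        if (buffers[p].size() >= batch_size)
            flush(p);
    }

    void finish()  // send whatever is left as final, possibly smaller packets
    {
        for (std::size_t p = 0; p < buffers.size(); ++p)
            if (!buffers[p].empty())
                flush(p);
    }

private:
    void flush(std::size_t p)
    {
        send(p, std::move(buffers[p]));
        buffers[p].clear();  // moved-from vector becomes a valid empty buffer again
    }

    std::vector<std::vector<Row>> buffers;
    std::size_t batch_size;
    std::function<void(std::size_t, std::vector<Row>)> send;
};

int main()
{
    std::size_t packets = 0;
    BatchingSender sender(/*partitions*/ 4, /*batch_size*/ 2,
                          [&packets](std::size_t, std::vector<Row>) { ++packets; });
    for (long i = 0; i < 10; ++i)
        sender.write({std::to_string(i % 4), i});
    sender.finish();
    return packets > 0 ? 0 : 1;
}
```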

Subtasks for window function:

Subtasks for HashAgg/Join:

  • HashAgg + fine-grained shuffle
fuzhe1989 added the type/enhancement label on Apr 12, 2022
@fuzhe1989
Contributor Author

Example: table scan and two-phase aggregation.

tablescan -> partial-agg -> exchange-sender -> exchange-receiver -> final-agg

We know that even a partial-agg (from TiDB's point of view) is executed in two phases inside TiFlash. Here we could perform two optimizations:

  1. Let partial-agg output K inputstreams, where K is the partition number of exchange-sender. Since partial-agg already aggregates data into 256 buckets by hashing, regrouping those buckets into K inputstreams has little cost (see the sketch after this list). Then exchange-sender won't have to partition the data again; all it needs to do is pack blocks into packets and send them out.
  2. Pre-partition the data of each connection into P partitions at exchange-sender, then let exchange-receiver create P inputstreams, one per partition. This optimization allows final-agg to eliminate its merge phase.
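
A minimal sketch of optimization 1 (illustrative only; `regroupBuckets` and the `bucket_id % k` mapping are assumptions and must agree with the exchange hash scheme to be correct): the 256 buckets that partial-agg already produced are regrouped into K output streams, after which exchange-sender only forwards them.

```cpp
#include <cstddef>
#include <utility>
#include <vector>

struct Block { /* aggregated columns omitted */ };

// Partial-agg already produced its data as (here) 256 hash buckets. Regrouping
// those buckets into K output streams is cheap: blocks are only moved between
// containers and no row is re-hashed. The bucket -> stream mapping below is an
// assumption; it is only valid if it agrees with the hash scheme the
// exchange-sender would otherwise have applied.
std::vector<std::vector<Block>> regroupBuckets(std::vector<std::vector<Block>> buckets, std::size_t k)
{
    std::vector<std::vector<Block>> streams(k);
    for (std::size_t bucket_id = 0; bucket_id < buckets.size(); ++bucket_id)
    {
        std::size_t target = bucket_id % k;  // assumed bucket-to-partition mapping
        for (Block & block : buckets[bucket_id])
            streams[target].push_back(std::move(block));
    }
    return streams;
}

int main()
{
    std::vector<std::vector<Block>> buckets(256, std::vector<Block>(1));
    auto streams = regroupBuckets(std::move(buckets), /*K = sender partitions*/ 4);
    return streams.size() == 4 ? 0 : 1;
}
```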
