[FEA] Out of core sorts #19

Closed
revans2 opened this issue May 28, 2020 · 3 comments · Fixed by #1719
Labels
feature request New feature or request P0 Must have for release SQL part of the SQL/Dataframe plugin

Comments

@revans2
Collaborator

revans2 commented May 28, 2020

Is your feature request related to a problem? Please describe.
Currently, when sorting, the plugin has to hold all of the data for a single partition in memory. It would be great if we supported spilling some of that data to host memory or disk, so that if someone had a horribly skewed sort the plugin could still work, even if performance was not ideal.

@revans2 revans2 added feature request New feature or request ? - Needs Triage Need team to review and classify SQL part of the SQL/Dataframe plugin labels May 28, 2020
@revans2
Collaborator Author

revans2 commented Aug 10, 2020

Sorry for the wall of text. I didn't have the energy to come up with some pictures.

I have been playing around with a few algorithms on paper to try and understand what is the ideal way to do this. For now I think we want to do an external merge sort but with a few twists because of limitations we have with spilling the data, with the APIs we have available to us through CUDF, and the fact that we don't know how much data we are going to sort ahead of time.

So the main idea is that we sort each batch as it comes in and split it into N chunks (I'll describe this in more detail later). Each chunk will be stored in the spill-able cache, but before we put a chunk into the cache we get its min key. (I'll describe this based on an ascending sort order for all columns, but it should be simple to generalize this to any order for any of the columns.)
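A minimal sketch of the per-batch step described above, written in plain Python rather than against CUDF. The `Chunk` type and `sort_and_split` function are hypothetical names used only for illustration, keys are plain integers, and a Python list stands in for a buffer in the spill-able cache.

```python
from typing import List, NamedTuple


class Chunk(NamedTuple):
    min_key: int     # smallest key in the chunk, recorded before caching
    rows: List[int]  # sorted rows; stands in for a spill-able buffer


def sort_and_split(batch: List[int], n_chunks: int) -> List[Chunk]:
    """Sort one incoming batch and cut it into at most n_chunks sorted chunks."""
    ordered = sorted(batch)
    if not ordered:
        return []
    # Ceiling division, so we never produce more than n_chunks pieces.
    size = -(-len(ordered) // n_chunks)
    return [Chunk(ordered[i], ordered[i:i + size])
            for i in range(0, len(ordered), size)]
```

Because each chunk is already sorted, its min key is just its first row, so recording it costs nothing extra in this simplified model.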

Once we have done this for all of the incoming batches we can order these chunks by the minimum value in them. Take the N chunks with the lowest minimum values in them and do a merge sort on these values. We then look at the minimum value in the next chunk that has not been included in this. Everything less than or equal to this key is now in sorted order and could be output for later processing. Everything greater than this key needs to be split into smaller chunks again and the process repeated until all of the data is sorted.
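One merge pass as described above could look roughly like the following. This is a sketch under simplifying assumptions: chunks are sorted Python lists of integer keys, `merge_round` and `external_sort` are hypothetical names, and the leftover rows are put back as a single chunk instead of being re-split into smaller ones.

```python
import heapq
from bisect import bisect_right
from typing import List, Tuple


def merge_round(chunks: List[List[int]], n: int) -> Tuple[List[int], List[List[int]]]:
    """Run one merge pass over sorted chunks.

    Returns (rows that are safe to output, chunks still pending).
    """
    # Order chunks by minimum key (the first element of a sorted chunk).
    pending = sorted((c for c in chunks if c), key=lambda c: c[0])
    picked, rest = pending[:n], pending[n:]
    merged = list(heapq.merge(*picked))
    if not rest:
        return merged, []  # nothing was left out, so everything is final
    bound = rest[0][0]     # min key of the next chunk that was not merged
    cut = bisect_right(merged, bound)  # rows <= bound are in final order
    ready, leftover = merged[:cut], merged[cut:]
    if leftover:
        # Assumption: kept as one chunk here; the real design would re-split it.
        rest.append(leftover)
    return ready, rest


def external_sort(chunks: List[List[int]], n: int) -> List[int]:
    """Repeat merge rounds until every row has been emitted."""
    out: List[int] = []
    while chunks:
        ready, chunks = merge_round(chunks, n)
        out.extend(ready)
    return out
```

Each round emits at least one row, because the smallest picked key can never exceed the min key of a chunk that was left out, so the loop always terminates.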

Conceptually I think we want to deal with bytes instead of chunks of memory when splitting the data. As such, we should divide the target output batch size spark.rapids.sql.batchSizeBytes by N (which would be a new config); that would give us the target size in bytes for each chunk. We can then try to divide the data we have to split up into chunks that are approximately this size (we can use row count and actual memory usage to estimate this). Generally we want to set this N value to be somewhat high (8, maybe 16, but we need to run some performance tests to really know). The downside to setting this high is that we have an increased amount of metadata to store and more buffers that we might have to spill, so the overhead can be a lot. I also don't totally understand the merge sort algorithm, so there may be some limitations there too.
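To make the sizing arithmetic concrete, here is a small sketch. The function name `rows_per_chunk` is hypothetical, and the numbers in the usage note are example values only, not the plugin's defaults.

```python
def rows_per_chunk(batch_size_bytes: int, n: int,
                   data_bytes: int, row_count: int) -> int:
    """Estimate how many rows fit in one chunk of about batch_size_bytes / n."""
    if row_count == 0 or data_bytes == 0:
        return 0
    target_chunk_bytes = batch_size_bytes // n
    # Average row width, estimated from actual memory usage and row count.
    bytes_per_row = data_bytes / row_count
    # Always allow at least one row per chunk so splitting makes progress.
    return max(1, int(target_chunk_bytes / bytes_per_row))
```

For example, with a 512 MiB batch target, N = 16, and 1 GiB of data holding 8,388,608 rows (128 bytes per row on average), the target chunk is 32 MiB, which works out to 262,144 rows per chunk.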

Additionally when loading the data we could keep track of the data size for each chunk and load chunks until we would overflow spark.rapids.sql.batchSizeBytes instead of exactly N entries. This could help if we get small chunks in some cases after splitting off the amount of data that can be output.
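A sketch of that size-based loading rule, assuming the chunk sizes are already ordered by ascending min key. `take_chunks_within_budget` is a hypothetical helper, and the budget stands in for spark.rapids.sql.batchSizeBytes.

```python
from typing import List


def take_chunks_within_budget(chunk_sizes: List[int], budget_bytes: int) -> int:
    """Return how many leading chunks to load without overflowing the budget.

    The first chunk is always taken, even if it alone exceeds the budget,
    so that the sort can still make progress.
    """
    total = 0
    count = 0
    for size in chunk_sizes:
        if count > 0 and total + size > budget_bytes:
            break
        total += size
        count += 1
    return count
```

With chunk sizes of 40, 30, 20 and 50 bytes and a budget of 100 bytes, this loads the first three chunks (90 bytes) and leaves the fourth for a later round.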

This works great on paper so long as N is large enough. If we have a lot of input data (enough to warrant multiple merge-sort passes through the data), then I see us outputting relatively small amounts of data in many cases. In the worst case all of the input batches are the same data and we can only output one row per input batch. To avoid this I think we want a target output size for sort. So if we ran into a situation where we could not output enough data, we would store the data to output in the spill-able cache and then go on until we had enough data to output. We need to be careful here too, because we might end up with a batch that would exceed spark.rapids.sql.batchSizeBytes. That might not be a big deal if it is a very soft limit, but if we want to make it a harder limit we could split the chunk that is ready for output further, so that when we concat all of the pending output chunks together we are closer to spark.rapids.sql.batchSizeBytes, and then store the rest of the output chunk for later.
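The pending-output idea above can be sketched as a small accumulator. Everything here is hypothetical: `OutputAccumulator` is an illustrative name, row counts stand in for bytes, and a Python list stands in for output chunks parked in the spill-able cache. The `hard_limit` flag switches between the soft and hard variants discussed above.

```python
from typing import List


class OutputAccumulator:
    """Hold sorted rows until there is enough to emit a full batch."""

    def __init__(self, target: int, hard_limit: bool = False):
        self.target = target          # target batch size; must be > 0
        self.hard_limit = hard_limit  # True: never emit more than target rows
        self.pending: List[int] = []  # stands in for cached output chunks

    def add(self, ready: List[int]) -> List[List[int]]:
        """Add rows that are in final order; return any batches ready to emit."""
        self.pending.extend(ready)
        batches: List[List[int]] = []
        if self.hard_limit:
            # Split off exact-size batches and keep the remainder for later.
            while len(self.pending) >= self.target:
                batches.append(self.pending[:self.target])
                self.pending = self.pending[self.target:]
        elif len(self.pending) >= self.target:
            # Soft limit: emit everything, even if it overshoots the target.
            batches.append(self.pending)
            self.pending = []
        return batches

    def flush(self) -> List[int]:
        """Return whatever is left once the input is exhausted."""
        rest, self.pending = self.pending, []
        return rest
```

In the soft variant a batch can overshoot the target by up to one merge round's worth of output; the hard variant trades that for an extra split and one more buffer to keep around.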

There are also some games that we can play with not putting data into the cache if we know that we are going to use it in the next round of processing. I have seen this be a big benefit on paper, but I have no idea how it would work in real life. We need to be careful though: we need to make sure that all of the data we are not going to output is in the cache when we hand control off downstream. We don't currently do that for several algorithms, but I think we need to work on doing it more.

@revans2
Collaborator Author

revans2 commented Aug 10, 2020

Just as a side note, there was some discussion about implementing a histogram-like operation for the GPU to be used with distributed sort. I have not seen that API, but it would in theory let us get a dynamically bucketed histogram of the data.

We could then do an algorithm where we do one pass through the data to create this histogram. We pull it back and figure out all of the cut points. We would then read all of the data back in, sort it, and cut it, putting it back in the cache. After that we pull in just the parts we care about for each final merge sort and output. This would be closer to what Spark does for distributed sort (where we also hope to eventually use this histogram code).
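Since the histogram API mentioned above had not been seen at the time, the following is only a guess at the shape of the cut-point step. It assumes the histogram arrives as a plain key-to-count mapping; `cut_points` and `partition` are hypothetical names, and nothing here reflects a real CUDF call.

```python
from bisect import bisect_left
from typing import Dict, List


def cut_points(histogram: Dict[int, int], n_ranges: int) -> List[int]:
    """Pick up to n_ranges - 1 inclusive upper bounds giving roughly equal ranges."""
    total = sum(histogram.values())
    target = total / n_ranges
    cuts: List[int] = []
    running = 0
    for key in sorted(histogram):
        running += histogram[key]
        # Close the current range once it has reached its share of the rows.
        if len(cuts) < n_ranges - 1 and running >= target * (len(cuts) + 1):
            cuts.append(key)
    return cuts


def partition(rows: List[int], cuts: List[int]) -> List[List[int]]:
    """Route each row to the range whose inclusive upper bound covers it."""
    buckets: List[List[int]] = [[] for _ in range(len(cuts) + 1)]
    for row in rows:
        buckets[bisect_left(cuts, row)].append(row)
    return buckets
```

Each bucket can then be sorted and output independently, in bucket order. Note that a heavily repeated key cannot be divided by a cut point, so all of its rows land in a single bucket.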

We might want to do some analysis about this once we have the histogram code in place. The big issue I see with using the histogram code is that it could make us susceptible to data skew.

@sameerz sameerz added P1 Nice to have for release and removed ? - Needs Triage Need team to review and classify labels Oct 13, 2020
wjxiz1992 pushed a commit to wjxiz1992/spark-rapids that referenced this issue Oct 29, 2020
* Commit sample app for spark xgboost

* Updated according to the comments
@sameerz
Collaborator

sameerz commented Jan 5, 2021

Removing from 0.4 as it has not been prioritized for this release. However we should consider coming back to this soon.

@sameerz sameerz added P0 Must have for release and removed P1 Nice to have for release labels Jan 5, 2021
@sameerz sameerz added this to the Feb 16 - Feb 26 milestone Feb 16, 2021
@sameerz sameerz linked a pull request Feb 16, 2021 that will close this issue
@revans2 revans2 closed this as completed Feb 22, 2021
tgravescs pushed a commit to tgravescs/spark-rapids that referenced this issue Nov 30, 2023
Signed-off-by: spark-rapids automation <[email protected]>
sperlingxx pushed a commit to sperlingxx/spark-rapids that referenced this issue Jan 16, 2024
one log for GPU reuse fixup

Signed-off-by: Firestarman <[email protected]>
2 participants