Make a plan for sort_values/set_index #2272

Closed
mrocklin opened this issue Mar 20, 2019 · 80 comments
Labels
dask (Dask issue), Python (Affects Python cuDF API)

Comments

@mrocklin
Collaborator

It would be nice to be able to use the set_index method to sort the dataframe by a particular column.

There are currently two implementations of this: one in dask.dataframe and one in dask-cudf, which uses a Batcher sorting network. While most dask-cudf code has been removed in favor of the dask.dataframe implementations, this sorting code has remained, mostly because I don't fully understand it and don't know whether there was a reason for this particular implementation.

Why was this implementation chosen? Was this discussed somewhere? Alternatively @sklam, do you have any information here?

cc @kkraus14 @randerzander

@mrocklin
Collaborator Author

mrocklin commented Mar 28, 2019

NVIDIA folks have asked if there is some way to integrate an MPI- or NCCL-enabled multi-GPU sort into Dask for improved efficiency. My initial reaction is that this is likely to be difficult to integrate smoothly in a way that respects other Dask management features like resilience, spilling to disk, and load balancing. Let's expand on this.

First, if we have a multi-node sorting algorithm, we can always treat it the way we treat XGBoost: Dask gives up control to some other higher-performance system, that system does its thing, and then we claim control back. If anything fails during this stage, we just retry the whole thing. We give up on any kind of memory management or load balancing during this process and just hope that the external system can handle things well without blowing up.

Second, if we just have a single-node multi-GPU system, maybe we can use that? This is also a bit tricky today, but we might be able to make structural changes to Dask to make it less so. The cost-benefit analysis of those changes might make this undesirable, though. Currently, the approach most people seem to use with Dask and GPUs is one Dask worker per GPU. Dask workers don't have any knowledge of other Dask workers on the same node, so there isn't anything built up to handle local collective action. We would be doing something similar to what is described above: more or less stop Dask from doing its normal task-scheduling thing, hand-write a bunch of control flow, hope nothing breaks, run custom code, and then have Dask take back control when we're done.

Both are totally doable, but they would require us to build something like dask-xgboost, and they raise general concerns around memory management, resilience, diagnostics, load balancing, spilling to disk, and so forth. We lose a lot of Dask's management when we switch into this mode.

So maybe Dask should start thinking more about collective actions. This is coming up often enough that it probably deserves more attention. That's a large conversation though and probably requires dedicated time from someone somewhat deeply familiar with Dask scheduler internals.

I think that, short term, we should continue with the current approach of efficient single-core/single-GPU sorts and the shuffling techniques currently used in dask.dataframe. We should tune these as far as we can by making the single-GPU sort algorithms, the bookkeeping operations, and the communication faster. If this isn't enough, then we should investigate collective actions as discussed above, but that should be part of a larger effort than just sorting.

@datametrician
Contributor

I agree with this approach. Thrust has good single-GPU sorting, and I think UCX should help tremendously. Any thoughts on bookkeeping, or was that more of a placeholder in case we need it?

@mrocklin
Collaborator Author

There are two forms of bookkeeping that are relevant here:

  1. The slicing and concatenation of cudf dataframes. This is largely a test of memory management.
  2. The administrative tracking of tasks within the scheduler. This becomes more of an issue as we scale out to more nodes.

@mrocklin
Collaborator Author

mrocklin commented Apr 1, 2019

There are currently two implementations of this: one in dask.dataframe and one in dask-cudf, which uses a Batcher sorting network. While most dask-cudf code has been removed in favor of the dask.dataframe implementations, this sorting code has remained, mostly because I don't fully understand it and don't know whether there was a reason for this particular implementation.

Why was this implementation chosen? Was this discussed somewhere? Alternatively @sklam, do you have any information here?

@sklam can you expand on the motivation behind your use of the Batcher sorting network? Why was this chosen rather than the approach taken in the mainline dask dataframe codebase?

@sklam
Contributor

sklam commented Apr 1, 2019

@mrocklin, I was trying to avoid the following aspects of the mainline sort/shuffle:

  • Storing the data on disk.
  • The output is imbalanced: all equal keys end up in the same partition, so if the key distribution is heavily skewed, GPU memory may run out.

I chose the sorting network because:

  • It's very easy to implement.
  • The sorting network has a fixed structure given the number of partitions (a small sketch that generates this structure follows this list).
  • There are more parallel regions (this one is according to my possibly faulty memory).
  • The memory usage is predictable. The network can be seen as multiple stages, where each stage has a data dependency only on the previous stage. Given 1GB of data, the algorithm will roughly need 2GB (1GB for the input and 1GB for the output at each stage).
  • It will rebalance the partitions. It may produce fewer partitions, and no partition will be larger than max(input_partition_sizes).
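
For readers unfamiliar with the structure being described, here is a minimal sketch (an illustration, not the dask-cudf code) that generates the comparator pairs of Batcher's odd-even merge sort for a power-of-two number of partitions. Every pair is known once the partition count is fixed, which is what gives the network its fixed structure and staged, predictable data dependencies:

def oddeven_merge(lo, n, r):
    # Yield comparator pairs that merge two sorted halves of a block of size n.
    step = r * 2
    if step < n:
        yield from oddeven_merge(lo, n, step)
        yield from oddeven_merge(lo + r, n, step)
        for i in range(lo + r, lo + n - r, step):
            yield (i, i + r)
    else:
        yield (lo, lo + r)

def oddeven_merge_sort(lo, n):
    # Yield all comparator pairs for sorting n partitions (n a power of two).
    if n > 1:
        m = n // 2
        yield from oddeven_merge_sort(lo, m)      # sort the first half
        yield from oddeven_merge_sort(lo + m, m)  # sort the second half
        yield from oddeven_merge(lo, n, 1)        # merge the two halves

print(list(oddeven_merge_sort(0, 4)))
# [(0, 1), (2, 3), (0, 2), (1, 3), (1, 2)]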

@mrocklin
Collaborator Author

mrocklin commented Apr 1, 2019

OK, so these concerns seem similar to the concerns we had while designing the various shuffle algorithms for dask dataframe/bag. I'm not seeing anything here that is highly specific to GPU computation. (Please correct me if I'm wrong.)

My inclination, then, is to try to unify both CPU and GPU around a single implementation, probably starting with the one in dask.dataframe; after some more extensive benchmarking, we might then consider Batcher sorting networks as an alternative for both, rather than just for dask-cudf.

Thanks for the information @sklam , I really appreciate the continuity here.

@mrocklin
Collaborator Author

mrocklin commented Apr 8, 2019

I recently took a look at Dask dataframe's task-based shuffle and improved the docstrings here in order to help others dive in: https://github.com/dask/dask/pull/4674/files

I think that we need the following (a rough sketch of how these pieces compose follows the list):

  • Something like the pandas.util.hash_pandas_object function, which hashes a pandas object row-by-row returning a Series of integer values. (@kkraus14 do we have this?)

  • A Series.searchsorted method, allowing us to figure out where each row should go. Used here:

    def set_partitions_pre(s, divisions):
        partitions = pd.Series(divisions).searchsorted(s, side='right') - 1
        partitions[(s >= divisions[-1]).values] = len(divisions) - 2
        return partitions

    This helps us to assign a partition number to every row, based on where the future-index value of that row sits relative to the divisions. Example: "Is the value in between the first and second divisions? Great, it goes in the first partition."

  • The dask.dataframe.shuffle.shuffle_group function, which splits a pandas dataframe apart into a dict of pandas dataframes based on the value of a particular column provided by the hash values above. We also need to modify this value in a particular way to achieve multi-staged shuffling. This logic is explained in the PR above.

    We can either rewrite this from scratch or, if cudf supports ufuncs, we might be able to get away with just dispatching on pandas._libs.algos.groupsort_indexer and reusing all of the fancy logic here (which would be nice if cudf supports most ufuncs and out= parameters).

  • Eventually we will also need quantile information. This is also evolving in Dask core at the moment, so I suggest that we wait on this for a bit.
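
As a rough sketch of how the pieces above compose for a single input partition (written with pandas only because the cudf equivalents are exactly what is being requested; the function name and arguments are illustrative):

import pandas as pd

def shuffle_one_partition(df, index_col, divisions):
    # Assign each row a target partition based on where its future index
    # value falls relative to the divisions (the set_partitions_pre logic).
    s = df[index_col]
    partitions = pd.Series(divisions).searchsorted(s, side='right') - 1
    partitions[(s >= divisions[-1]).values] = len(divisions) - 2

    # Split the frame into one piece per target partition (the role that
    # shuffle_group plays in dask.dataframe).
    return {int(p): piece for p, piece in df.groupby(partitions)}

df = pd.DataFrame({'x': [1, 2, 3, 4, 5, 6], 'y': [0, 4, 0, 4, 0, 4]})
print(shuffle_one_partition(df, 'y', divisions=[0, 2, 4]))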

@mrocklin
Collaborator Author

mrocklin commented Apr 8, 2019

Also, just to direct people, I think that the core function that we'll have to make work is rearrange_by_column_tasks. I think that all of set_index/merge/sort_values can be made to rely on functionality there.

@mtjrider
Contributor

@mrocklin @datametrician I have a vested interest in seeing this work succeed, and will begin by implementing the solution path you've outlined above. If we need to get into the specifics of MPI/NCCL, we can cross that bridge later. I think a general solution which puts Dask first is going to help the ecosystem the most.

@mrocklin
Collaborator Author

mrocklin commented Apr 10, 2019

A minimal test would look something like the following:

import cudf, dask.dataframe as dd

# Make a dataframe, we'd like to divide the data by y
df = cudf.DataFrame({'x': [1, 2, 3, 4, 5, 6], 'y': [0, 4, 0, 4, 0, 4]})

# Split it up into a few partitions with Dask
ddf = dd.from_pandas(df, npartitions=3)

# Try to create a new dataframe with two partitions sorted on the y column, split by y 0->2 and 2->4
out = dd.shuffle.rearrange_by_divisions(ddf, column='y', divisions=[0, 2, 4], shuffle='tasks')

# compute in a single thread so it's easy to use %pdb and %debug
out.compute(scheduler='single-threaded')

@mrocklin
Collaborator Author

I just tried this and ran into a minor problem: cudf.DataFrame.drop does not support the axis= keyword. As a suggestion, these errors are easier to identify if you remove some of Dask's error reporting with the following diff:

diff --git a/dask/dataframe/core.py b/dask/dataframe/core.py
index 6a08af9..894fba6 100644
--- a/dask/dataframe/core.py
+++ b/dask/dataframe/core.py
@@ -3736,8 +3736,8 @@ def _emulate(func, *args, **kwargs):
     Apply a function using args / kwargs. If arguments contain dd.DataFrame /
     dd.Series, using internal cache (``_meta``) for calculation
     """
-    with raise_on_meta_error(funcname(func), udf=kwargs.pop('udf', False)):
-        return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
+    kwargs.pop('udf')
+    return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))

@mrocklin
Collaborator Author

#1074

@mrocklin
Collaborator Author

I imagine that, like with the groupby aggregations work, this will end up triggering many small PRs in cudf.

@mrocklin
Collaborator Author

Potential fix here: #1396

@mrocklin
Collaborator Author

The next thing I ran into: searchsorted.

In [1]: import cudf, dask.dataframe as dd
   ...:
   ...: # Make a dataframe, we'd like to divide the data by y
   ...: df = cudf.DataFrame({'x': [1, 2, 3, 4, 5, 6], 'y': [0, 4, 0, 4, 0, 4]})
   ...:
   ...: # Split it up into a few partitions with Dask
   ...: ddf = dd.from_pandas(df, npartitions=3)
   ...:
   ...: # Try to create a new dataframe with two partitions sorted on the y column, split by y 0->2 and 2->4
   ...: dd.shuffle.rearrange_by_divisions(ddf, column='y', divisions=[0, 2, 4], shuffle='tasks')
Out[1]: <dask_cudf.DataFrame | 40 tasks | 2 npartitions>

In [2]: _.compute(scheduler='single-threaded')
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
... <removed for clarity>
TypeError: can't compute boolean for <class 'cudf.dataframe.series.Series'>

In [3]: debug
> /home/nfs/mrocklin/cudf/python/cudf/dataframe/series.py(325)__bool__()
    323         into a boolean.
    324         """
--> 325         raise TypeError("can't compute boolean for {!r}".format(type(self)))
    326
    327     def values_to_string(self, nrows=None):

ipdb> up
> /home/nfs/mrocklin/miniconda/envs/cudf/lib/python3.7/site-packages/pandas/core/series.py(2337)searchsorted()
   2335             sorter = ensure_platform_int(sorter)
   2336         result = self._values.searchsorted(Series(value)._values,
-> 2337                                            side=side, sorter=sorter)
   2338
   2339         return result[0] if is_scalar(value) else result

ipdb>
> /home/nfs/mrocklin/dask/dask/dataframe/shuffle.py(434)set_partitions_pre()
    432
    433 def set_partitions_pre(s, divisions):
--> 434     partitions = pd.Series(divisions).searchsorted(s, side='right') - 1
    435     partitions[(s >= divisions[-1]).values] = len(divisions) - 2
    436     return partitions

ipdb>

@mike-wendt transferred this issue from rapidsai/dask-cudf Jul 15, 2019
@mrocklin
Collaborator Author

Searchsorted work is happening here: #2156

@rjzamora
Member

Just a minor update here: after #2156 goes through, the next issue we run into is the very next line of set_partitions_pre:

~/workspace/cudf-dask-devel/dask/dask/dataframe/shuffle.py in set_partitions_pre(s, divisions)
    546
    547 def set_partitions_pre(s, divisions):
    548     partitions = pd.Series(divisions).searchsorted(s, side='right') - 1
--> 549     partitions[(s >= divisions[-1]).values] = len(divisions) - 2
    550     return partitions

AttributeError: 'Series' object has no attribute 'values'

Since values is not a cudf.Series property, perhaps the solution is as simple as making it one (and just returning the column values on host)?

@rjzamora
Member

Adding a simple values property to cudf.Series does seem to get us through the small test.
Changes to cudf/python/cudf/cudf/dataframe/series.py:

@property
def values(self):
    return self._values

@property
def _values(self):
    return self.to_pandas().values

Test:

In [1]: import cudf, dask.dataframe as dd 
   ...:  
   ...: # Make a dataframe, we'd like to divide the data by y 
   ...: df = cudf.DataFrame({'x': [1, 2, 3, 4, 5, 6], 'y': [0, 4, 0, 4, 0, 4]}) 
   ...:  
   ...: # Split it up into a few partitions with Dask 
   ...: ddf = dd.from_pandas(df, npartitions=3) 
   ...:  
   ...: # Try to create a new dataframe with two partitions sorted on the y column, split by y 0->2 and 2->4 
   ...: out = dd.shuffle.rearrange_by_divisions(ddf, column='y', divisions=[0, 2, 4], shuffle='tasks') 
   ...:  
   ...: # compute in a single thread so it's easy to use %pdb and %debug 
   ...: print(out.compute(scheduler='single-threaded'))                                                                                                                                  
   x  y
0  1  0
2  3  0
4  5  0
1  2  4
3  4  4
5  6  4

@kkraus14 - Is there a reason we might want to avoid adding a values property to Series (especially one that returns a non-device array)?

@ayushdg
Member

ayushdg commented Jul 25, 2019

@rjzamora Just tried a small example of set_index this morning and saw the same issue of missing the values property. #2395

@jrhemstad
Contributor

@kkraus14 - Is there a reason we might want to avoid adding a values property to Series (especially one that returns a non-device array)?

So copying an entire column from device to host? That would be very expensive.

@ayushdg
Member

ayushdg commented Jul 25, 2019

548     partitions = pd.Series(divisions).searchsorted(s, side='right') - 1

One issue I see above is a hard dependency on pandas.

549 partitions[(s >= divisions[-1]).values] = len(divisions) - 2

If partitions were a cudf Series instead of a pandas Series, values would not need to be a non-device array.

@rjzamora
Member

@jrhemstad Sorry - That was a silly question!

@ayushdg - Right. I guess I am just unsure of the best way (if possible) to handle both pandas and cudf using the same logic in Dask, but I haven't exactly tried much yet :)

@mrocklin
Collaborator Author

Some options to explore:

  1. Can .values return something like a cupy array (if it is installed)?

  2. Do we strictly need to use .values? Is this because the result of searchsorted is a numpy array rather than a Series?

    def set_partitions_pre(s, divisions):
        partitions = pd.Series(divisions).searchsorted(s, side='right') - 1
        partitions[(s >= divisions[-1]).values] = len(divisions) - 2
        return partitions

I agree that it would be good not to call .to_pandas() here and try to keep things on the device.

See also some conversation about Series.values here: #1824
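
For illustration, here is a sketch of what a device-resident version of this helper could look like. It assumes that cudf.Series.searchsorted (the #2156 work) and boolean-mask assignment on a cudf.Series behave like their pandas counterparts, which may not hold exactly today:

import cudf

def set_partitions_pre_gpu(s, divisions):
    # Hypothetical sketch, not the dask implementation: keep the division
    # boundaries on the device and avoid .to_pandas()/.values round trips.
    divs = cudf.Series(divisions)
    partitions = divs.searchsorted(s, side='right') - 1
    # Rows at or beyond the last division belong to the final partition.
    partitions[s >= divisions[-1]] = len(divisions) - 2
    return partitions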

@ayushdg
Member

ayushdg commented Jul 25, 2019

Also see #2373, which attempts to add .values support by moving to host. Not sure it's the best option; there is some discussion echoing this point on that PR as well.

@rjzamora
Member

@mrocklin I explored some options to avoid copying the entire column from device to host. I made this simple gist with some experiments.

For a dataframe with 1e8 rows, the dask+pandas version takes about 13.5s, while the dask+cudf version takes about 7.6s. If I use a to_pandas()-based values property, the operation takes ~11s.

Note that the gist also shows that the cudf version of searchsorted is a bit different from the pandas version (it returns a Series rather than a numpy array). However, this seems reasonable to me.

@kkraus14
Collaborator

7.6s sounds very high; what's taking the time in this situation? Could you share a profile?

@rjzamora
Member

7.6s sounds very high; what's taking the time in this situation? Could you share a profile?

@kkraus14 agreed. I'll take a closer look and collect a profile.

@mrocklin
Collaborator Author

We're probably ready to remove some of the older dask-cudf implementations of set_index/merge/join. I've raised this as a separate issue here: #2598

@mrocklin
Collaborator Author

I thought I'd poke this. @harrism @jrhemstad do either of you have thoughts on what a timeline for the partition_map splitting operation could look like? We're quickly reaching a state where this will be our only remaining blocker.

cc @randerzander

@harrism
Member

harrism commented Aug 23, 2019

@mrocklin yes, @jrhemstad and I have made a plan. A "fastest time to solution, not necessarily fastest solution" plan is described in #2677. Please review.

@randerzander
Contributor

Now that all of the issues mentioned above have been closed, and cupy packaging has been resolved, are we able to continue deprecating the custom dask_cudf sort code?

My hope is that doing so will yield performance and scalability improvements. Several workflows have recently had problems with the dask_cudf implementations of set_index and sort_values.

If it's useful, I'm happy to open new issues with examples of the unexpected behavior, but a quick read of the above makes me feel like you all have a good handle on the problem.

@randerzander added the Python (Affects Python cuDF API) and dask (Dask issue) labels Dec 19, 2019
@rjzamora
Member

rjzamora commented Dec 19, 2019

...are we able to continue deprecating the custom dask_cudf sort code?... My hope is doing so will yield performance and scalability improvements. Several workflows have recently had problems with the dask_cudf implementation of set_index and sort_values.

@randerzander Can you clarify the pieces that are still custom to dask_cudf? dask_cudf should now be using the main-line dask implementations of set_index and merge/join. Since sort_values is not supported in dask, we never really made a plan to remove the cudf-specific implementation.

If your comment is more about performance improvements/optimizations, then I can say we are certainly in the process of exploring ways to (1) Improve merge/join performance to better match the expected/available performance, and (2) improve the stability of operations like set_index.

Note that I am currently working on a simple (but experimental) modification to the upstream-dask shuffle and set_partition code. It seems that we can use hashing to moderately reduce some of the memory consumption and data-copy overhead.
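
To illustrate the general idea (this is not the actual modification, and the helper name is hypothetical), hash-based partition assignment sends equal keys to the same output partition without first computing sorted divisions:

import pandas as pd

def hash_partition_assignment(df, column, npartitions):
    # Hash each key row-wise and map the hash to an output partition id.
    # Equal keys always land in the same partition, so no sorted divisions
    # (and no quantile pass over the data) are required.
    hashed = pd.util.hash_pandas_object(df[column], index=False)
    return hashed % npartitions

df = pd.DataFrame({'x': range(6), 'y': [0, 4, 0, 4, 0, 4]})
print(hash_partition_assignment(df, 'y', npartitions=2))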

@harrism
Member

harrism commented Feb 3, 2020

To what extent is this issue satisfied by the current work on providing multi-column distributed sorting primitives (table quantiles, k-way merge) in libcudf?

@kkraus14
Collaborator

kkraus14 commented Feb 3, 2020

To what extent is this issue satisfied by the current work on providing multi-column distributed sorting primitives (table quantiles, k-way merge) in libcudf?

100%, this is the primary target for that work.
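
As a toy, CPU-only illustration of the role the k-way merge primitive plays (combining already-sorted partition outputs into one globally sorted stream); this is not the libcudf implementation:

import heapq

# Each worker/partition produces a locally sorted chunk; a k-way merge
# combines them into a single sorted sequence without a full re-sort.
sorted_partitions = [[1, 4, 7], [2, 5, 8], [0, 3, 6, 9]]
merged = list(heapq.merge(*sorted_partitions))
print(merged)  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]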

@harrism
Member

harrism commented Mar 15, 2020

@kkraus14 both of those new primitives were added. Does this need to stay open?

@kkraus14
Collaborator

I defer to @rjzamora if he'd like to keep this open for future discussion or if this can now be closed.

@rjzamora
Member

We should probably close this once #4308 is merged. That should provide dask_cudf with the general sorting/set_index functionality targeted by this discussion. After that, the remaining work will be to push as much dask_cudf code as possible into main-line dask (which can probably be tracked separately).

@rjzamora
Member

Now that #4308 is merged, it probably makes sense to close this. We can open new issues in the future to discuss any performance/functionality challenges that come up.

Also, the task of moving the new sort_values and repartition_by_hash functionality into upstream dask can/should be tracked elsewhere.

@rjzamora
Member

@randerzander - Feel free to reopen this if the current sort_values solution does not end up meeting your needs. You can also open a more-targeted issue.

@vyasr removed the dask-cudf label Feb 23, 2024