Skip to content

Commit

Permalink
Register partd encode dispatch in dask_cudf (#14287)
Browse files Browse the repository at this point in the history
This PR enables "disk"-based shuffling of `cudf`-backed Dask-DataFrame collections, but does **not** yet add the `shuffle="disk"` option to the `dask_cudf.DataFrame.shuffle/sort_values` APIs.

We now use basic (slow) `pickle` logic to convert `cudf.DataFrame` objects to/from `bytes` here, so I'd like to consider further optimizations before making the `shuffle="disk"` option "official".

Authors:
  - Richard (Rick) Zamora (https://github.com/rjzamora)
  - Ray Douglass (https://github.com/raydouglass)
  - gpuCI (https://github.com/GPUtester)
  - Mike Wendt (https://github.com/mike-wendt)
  - AJ Schmidt (https://github.com/ajschmidt8)
  - GALI PREM SAGAR (https://github.com/galipremsagar)

Approvers:
  - Lawrence Mitchell (https://github.com/wence-)
  - GALI PREM SAGAR (https://github.com/galipremsagar)

URL: #14287
  • Loading branch information
rjzamora authored Nov 6, 2023
1 parent c8c3e5c commit 70c4283
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 0 deletions.
25 changes: 25 additions & 0 deletions python/dask_cudf/dask_cudf/backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,31 @@ def sizeof_cudf_series_index(obj):
return obj.memory_usage()


# TODO: Remove try/except when cudf is pinned to dask>=2023.10.0
try:
from dask.dataframe.dispatch import partd_encode_dispatch

@partd_encode_dispatch.register(cudf.DataFrame)
def _simple_cudf_encode(_):
# Basic pickle-based encoding for a partd k-v store
import pickle
from functools import partial

import partd

def join(dfs):
if not dfs:
return cudf.DataFrame()
else:
return cudf.concat(dfs)

dumps = partial(pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL)
return partial(partd.Encode, dumps, pickle.loads, join)

except ImportError:
pass


def _default_backend(func, *args, **kwargs):
# Utility to call a dask.dataframe function with
# the default ("pandas") backend
Expand Down
11 changes: 11 additions & 0 deletions python/dask_cudf/dask_cudf/tests/test_sort.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,3 +114,14 @@ def test_sort_values_empty_string(by):
if "a" in by:
expect = df.sort_values(by)
assert dd.assert_eq(got, expect, check_index=False)


def test_disk_shuffle():
try:
from dask.dataframe.dispatch import partd_encode_dispatch # noqa: F401
except ImportError:
pytest.skip("need a version of dask that has partd_encode_dispatch")
df = cudf.DataFrame({"a": [1, 2, 3] * 20, "b": [4, 5, 6, 7] * 15})
ddf = dd.from_pandas(df, npartitions=4)
got = dd.DataFrame.shuffle(ddf, "a", shuffle="disk")
dd.assert_eq(got, df)

0 comments on commit 70c4283

Please sign in to comment.