Enable backend dispatching for Dask-DataFrame creation (#11920)
This PR depends on dask/dask#9475 (**Now Merged**)

After dask#9475, external libraries can implement (and expose) their own `DataFrameBackendEntrypoint` definitions to specify custom creation functions for DataFrame collections. This PR introduces the `CudfBackendEntrypoint` class, which creates `dask_cudf.DataFrame` collections through the `dask.dataframe` API. With `dask_cudf` installed and this entrypoint definition in place, you get the following behavior in `dask.dataframe`:

```python
import dask.dataframe as dd
import dask

# Tell Dask that you want to create DataFrame collections
# with the "cudf" backend (for supported creation functions).
# This can also be used in a context, or set in a yaml file
dask.config.set({"dataframe.backend": "cudf"})

ddf = dd.from_dict({"a": range(10)}, npartitions=2)
type(ddf)  # dask_cudf.core.DataFrame
```

Note that the code snippet above does not require an explicit import of `cudf` or `dask_cudf`. After dask#9475, the following creation functions support backend dispatching (see the sketch after this list):

- `from_dict`
- `read_parquet`
- `read_json`
- `read_orc`
- `read_csv`
- `read_hdf`
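
For example, with the backend set to "cudf", a Parquet read dispatches to the cudf engine without any explicit `dask_cudf` import. A minimal sketch (the file path here is hypothetical):

```python
import dask
import dask.dataframe as dd

dask.config.set({"dataframe.backend": "cudf"})

# With the "cudf" backend active, dd.read_parquet dispatches
# to dask_cudf's parquet reader under the hood
ddf = dd.read_parquet("data/*.parquet")  # hypothetical path
type(ddf)  # dask_cudf.core.DataFrame
```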

See also: dask/design-docs#1

Authors:
  - Richard (Rick) Zamora (https://github.com/rjzamora)

Approvers:
  - GALI PREM SAGAR (https://github.com/galipremsagar)

URL: #11920
rjzamora authored Oct 20, 2022
1 parent 08ffecc commit 416d4d5
Showing 8 changed files with 204 additions and 13 deletions.
110 changes: 110 additions & 0 deletions python/dask_cudf/dask_cudf/backends.py
@@ -1,5 +1,6 @@
# Copyright (c) 2020-2022, NVIDIA CORPORATION.

import warnings
from collections.abc import Iterator

import cupy as cp
@@ -8,6 +9,8 @@
import pyarrow as pa
from pandas.api.types import is_scalar

import dask.dataframe as dd
from dask import config
from dask.dataframe.core import get_parallel_type, meta_nonempty
from dask.dataframe.dispatch import (
    categorical_dtype_dispatch,
@@ -426,3 +429,110 @@ def sizeof_cudf_dataframe(df):
@_dask_cudf_nvtx_annotate
def sizeof_cudf_series_index(obj):
    return obj.memory_usage()


def _default_backend(func, *args, **kwargs):
    # Utility to call a dask.dataframe function with
    # the default ("pandas") backend

    # NOTE: Some `CudfBackendEntrypoint` methods need to
    # invoke the "pandas"-version of the same method, but
    # with custom kwargs (e.g. `engine`). In these cases,
    # an explicit "pandas" config context is needed to
    # avoid a recursive loop
    with config.set({"dataframe.backend": "pandas"}):
        return func(*args, **kwargs)


try:

    # Define "cudf" backend engine to be registered with Dask
    from dask.dataframe.backends import DataFrameBackendEntrypoint

    class CudfBackendEntrypoint(DataFrameBackendEntrypoint):
        """Backend-entrypoint class for Dask-DataFrame

        This class is registered under the name "cudf" for the
        ``dask.dataframe.backends`` entrypoint in ``setup.cfg``.
        Dask-DataFrame will use the methods defined in this class
        in place of ``dask.dataframe.<creation-method>`` when the
        "dataframe.backend" configuration is set to "cudf".

        Examples
        --------
        >>> import dask
        >>> import dask.dataframe as dd
        >>> with dask.config.set({"dataframe.backend": "cudf"}):
        ...     ddf = dd.from_dict({"a": range(10)}, npartitions=2)
        >>> type(ddf)
        <class 'dask_cudf.core.DataFrame'>
        """

        @staticmethod
        def from_dict(data, npartitions, orient="columns", **kwargs):
            from dask_cudf import from_cudf

            if orient != "columns":
                raise ValueError(f"orient={orient} is not supported")
            # TODO: Use cudf.from_dict
            # (See: https://github.com/rapidsai/cudf/issues/11934)
            return from_cudf(
                cudf.DataFrame(data),
                npartitions=npartitions,
            )

        @staticmethod
        def read_parquet(*args, engine=None, **kwargs):
            from dask_cudf.io.parquet import CudfEngine

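            # Swap in dask_cudf's parquet engine, then defer to the
            # upstream `dd.read_parquet` (under an explicit "pandas"
            # backend context) for graph construction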
            return _default_backend(
                dd.read_parquet,
                *args,
                engine=CudfEngine,
                **kwargs,
            )

        @staticmethod
        def read_json(*args, engine=None, **kwargs):
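            # Dask's `read_json` accepts a callable `engine`, so the
            # upstream implementation can parse each partition with
            # `cudf.read_json` directly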
            return _default_backend(
                dd.read_json,
                *args,
                engine=cudf.read_json,
                **kwargs,
            )

        @staticmethod
        def read_orc(*args, **kwargs):
            from dask_cudf.io import read_orc

            return read_orc(*args, **kwargs)

        @staticmethod
        def read_csv(*args, **kwargs):
            from dask_cudf.io import read_csv

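            # `dask.dataframe` uses `blocksize`, while `dask_cudf.read_csv`
            # still expects `chunksize`; translate between the two here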
            chunksize = kwargs.pop("chunksize", None)
            blocksize = kwargs.pop("blocksize", "default")
            if chunksize is None and blocksize != "default":
                chunksize = blocksize
            return read_csv(
                *args,
                chunksize=chunksize,
                **kwargs,
            )

        @staticmethod
        def read_hdf(*args, **kwargs):
            from dask_cudf import from_dask_dataframe

            # HDF5 reader not yet implemented in cudf
            warnings.warn(
                "read_hdf is not yet implemented in cudf/dask_cudf. "
                "Moving to cudf from pandas. Expect poor performance!"
            )
            return from_dask_dataframe(
                _default_backend(dd.read_hdf, *args, **kwargs)
            )

except ImportError:
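    # Older versions of Dask do not expose `DataFrameBackendEntrypoint`,
    # so skip defining the cudf backend entrypoint in that case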
    pass
23 changes: 12 additions & 11 deletions python/dask_cudf/dask_cudf/core.py
@@ -2,10 +2,10 @@

 import math
 import warnings
-from distutils.version import LooseVersion

 import numpy as np
 import pandas as pd
+from packaging.version import parse as parse_version
 from tlz import partition_all

 import dask
@@ -31,7 +31,11 @@
 from dask_cudf.accessors import ListMethods, StructMethods
 from dask_cudf.sorting import _get_shuffle_type

-DASK_VERSION = LooseVersion(dask.__version__)
+DASK_BACKEND_SUPPORT = parse_version(dask.__version__) >= parse_version(
+    "2022.10.0"
+)
+# TODO: Remove DASK_BACKEND_SUPPORT throughout codebase
+# when dask_cudf is pinned to dask>=2022.10.0


 class _Frame(dd.core._Frame, OperatorMethodMixin):
@@ -736,7 +740,7 @@ def from_dask_dataframe(df):
     return df.map_partitions(cudf.from_pandas)


-for name in [
+for name in (
     "add",
     "sub",
     "mul",
@@ -751,16 +755,13 @@
     "rfloordiv",
     "rmod",
     "rpow",
-]:
+):
     meth = getattr(cudf.DataFrame, name)
-    kwargs = {"original": cudf.DataFrame} if DASK_VERSION >= "2.11.1" else {}
-    DataFrame._bind_operator_method(name, meth, **kwargs)
+    DataFrame._bind_operator_method(name, meth, original=cudf.Series)

     meth = getattr(cudf.Series, name)
-    kwargs = {"original": cudf.Series} if DASK_VERSION >= "2.11.1" else {}
-    Series._bind_operator_method(name, meth, **kwargs)
+    Series._bind_operator_method(name, meth, original=cudf.Series)

-for name in ["lt", "gt", "le", "ge", "ne", "eq"]:
+for name in ("lt", "gt", "le", "ge", "ne", "eq"):
     meth = getattr(cudf.Series, name)
-    kwargs = {"original": cudf.Series} if DASK_VERSION >= "2.11.1" else {}
-    Series._bind_comparison_method(name, meth, **kwargs)
+    Series._bind_comparison_method(name, meth, original=cudf.Series)
16 changes: 16 additions & 0 deletions python/dask_cudf/dask_cudf/io/tests/test_csv.py
@@ -16,6 +16,22 @@
import dask_cudf


@pytest.mark.skipif(
    not dask_cudf.core.DASK_BACKEND_SUPPORT,
    reason="No backend-dispatch support",
)
def test_csv_roundtrip_backend_dispatch(tmp_path):
    # Test dd.read_csv cudf-backend dispatch
    df = cudf.DataFrame({"x": [1, 2, 3, 4], "id": ["a", "b", "c", "d"]})
    ddf = dask_cudf.from_cudf(df, npartitions=2)
    csv_path = str(tmp_path / "data-*.csv")
    ddf.to_csv(csv_path, index=False)
    with dask.config.set({"dataframe.backend": "cudf"}):
        ddf2 = dd.read_csv(csv_path)
    assert isinstance(ddf2, dask_cudf.DataFrame)
    dd.assert_eq(ddf, ddf2, check_divisions=False, check_index=False)


def test_csv_roundtrip(tmp_path):
    df = cudf.DataFrame({"x": [1, 2, 3, 4], "id": ["a", "b", "c", "d"]})
    ddf = dask_cudf.from_cudf(df, npartitions=2)
17 changes: 17 additions & 0 deletions python/dask_cudf/dask_cudf/io/tests/test_json.py
@@ -12,6 +12,23 @@
import dask_cudf


@pytest.mark.skipif(
    not dask_cudf.core.DASK_BACKEND_SUPPORT,
    reason="No backend-dispatch support",
)
def test_read_json_backend_dispatch(tmp_path):
    # Test dd.read_json cudf-backend dispatch
    df1 = dask.datasets.timeseries(
        dtypes={"x": int, "y": int}, freq="120s"
    ).reset_index(drop=True)
    json_path = str(tmp_path / "data-*.json")
    df1.to_json(json_path)
    with dask.config.set({"dataframe.backend": "cudf"}):
        df2 = dd.read_json(json_path)
    assert isinstance(df2, dask_cudf.DataFrame)
    dd.assert_eq(df1, df2)


def test_read_json(tmp_path):
    df1 = dask.datasets.timeseries(
        dtypes={"x": int, "y": int}, freq="120s"
16 changes: 14 additions & 2 deletions python/dask_cudf/dask_cudf/io/tests/test_orc.py
@@ -6,18 +6,30 @@

import pytest

import dask
from dask import dataframe as dd

import cudf

import dask_cudf

# import pyarrow.orc as orc

cur_dir = os.path.dirname(__file__)
sample_orc = os.path.join(cur_dir, "data/orc/sample.orc")


@pytest.mark.skipif(
    not dask_cudf.core.DASK_BACKEND_SUPPORT,
    reason="No backend-dispatch support",
)
def test_read_orc_backend_dispatch():
    # Test dd.read_orc cudf-backend dispatch
    df1 = cudf.read_orc(sample_orc)
    with dask.config.set({"dataframe.backend": "cudf"}):
        df2 = dd.read_orc(sample_orc)
    assert isinstance(df2, dask_cudf.DataFrame)
    dd.assert_eq(df1, df2, check_index=False)


def test_read_orc_defaults():
    df1 = cudf.read_orc(sample_orc)
    df2 = dask_cudf.read_orc(sample_orc)
14 changes: 14 additions & 0 deletions python/dask_cudf/dask_cudf/io/tests/test_parquet.py
@@ -46,6 +46,20 @@ def _divisions(setting):
return {"gather_statistics": setting}


@pytest.mark.skipif(
    not dask_cudf.core.DASK_BACKEND_SUPPORT,
    reason="No backend-dispatch support",
)
def test_roundtrip_backend_dispatch(tmpdir):
    # Test dd.read_parquet cudf-backend dispatch
    tmpdir = str(tmpdir)
    ddf.to_parquet(tmpdir, engine="pyarrow")
    with dask.config.set({"dataframe.backend": "cudf"}):
        ddf2 = dd.read_parquet(tmpdir, index=False)
    assert isinstance(ddf2, dask_cudf.DataFrame)
    dd.assert_eq(ddf.reset_index(drop=False), ddf2)


@pytest.mark.parametrize("write_metadata_file", [True, False])
@pytest.mark.parametrize("divisions", [True, False])
def test_roundtrip_from_dask(tmpdir, divisions, write_metadata_file):
17 changes: 17 additions & 0 deletions python/dask_cudf/dask_cudf/tests/test_core.py
@@ -17,6 +17,23 @@
import dask_cudf as dgd


@pytest.mark.skipif(
    not dgd.core.DASK_BACKEND_SUPPORT, reason="No backend-dispatch support"
)
def test_from_dict_backend_dispatch():
    # Test dd.from_dict cudf-backend dispatch
    np.random.seed(0)
    data = {
        "x": np.random.randint(0, 5, size=10000),
        "y": np.random.normal(size=10000),
    }
    expect = cudf.DataFrame(data)
    with dask.config.set({"dataframe.backend": "cudf"}):
        ddf = dd.from_dict(data, npartitions=2)
    assert isinstance(ddf, dgd.DataFrame)
    dd.assert_eq(expect, ddf)


def test_from_cudf():
    np.random.seed(0)

4 changes: 4 additions & 0 deletions python/dask_cudf/setup.cfg
@@ -38,3 +38,7 @@ skip=
    buck-out
    build
    dist

[options.entry_points]
dask.dataframe.backends =
    cudf = dask_cudf.backends:CudfBackendEntrypoint
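
For reference, the registration above can be inspected with `importlib.metadata` (a rough sketch of the discovery mechanism, not the exact code path Dask uses; the `group` keyword assumes Python 3.10+):

```python
from importlib.metadata import entry_points

# List the backends registered under the "dask.dataframe.backends"
# entrypoint group; with dask_cudf installed, this should include
# an entry named "cudf"
for ep in entry_points(group="dask.dataframe.backends"):
    print(ep.name, "->", ep.value)
```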
