From b8ae0e4b41c541c5b2b27417af30fa1b9afcbdce Mon Sep 17 00:00:00 2001 From: "Richard (Rick) Zamora" Date: Tue, 7 Feb 2023 23:33:56 -0600 Subject: [PATCH] Support conversion to/from cudf in dask.dataframe.core.to_backend (#12380) This PR corresponds to the `cudf` component of https://github.com/dask/dask/pull/9758 Authors: - Richard (Rick) Zamora (https://github.com/rjzamora) - GALI PREM SAGAR (https://github.com/galipremsagar) Approvers: - GALI PREM SAGAR (https://github.com/galipremsagar) URL: https://github.com/rapidsai/cudf/pull/12380 --- python/dask_cudf/dask_cudf/backends.py | 194 +++++++++++------- python/dask_cudf/dask_cudf/tests/test_core.py | 55 ++++- 2 files changed, 170 insertions(+), 79 deletions(-) diff --git a/python/dask_cudf/dask_cudf/backends.py b/python/dask_cudf/dask_cudf/backends.py index b6be5ade6ba..821ec103204 100644 --- a/python/dask_cudf/dask_cudf/backends.py +++ b/python/dask_cudf/dask_cudf/backends.py @@ -11,6 +11,10 @@ import dask.dataframe as dd from dask import config +from dask.dataframe.backends import ( + DataFrameBackendEntrypoint, + PandasBackendEntrypoint, +) from dask.dataframe.core import get_parallel_type, meta_nonempty from dask.dataframe.dispatch import ( categorical_dtype_dispatch, @@ -30,7 +34,7 @@ make_meta_obj, ) from dask.sizeof import sizeof as sizeof_dispatch -from dask.utils import is_arraylike +from dask.utils import Dispatch, is_arraylike import cudf from cudf.api.types import is_string_dtype @@ -446,91 +450,127 @@ def _default_backend(func, *args, **kwargs): return func(*args, **kwargs) -try: +def _unsupported_kwargs(old, new, kwargs): + # Utility to raise a meaningful error when + # unsupported kwargs are encountered within + # ``to_backend_dispatch`` + if kwargs: + raise ValueError( + f"Unsupported key-word arguments used in `to_backend` " + f"for {old}-to-{new} conversion: {kwargs}" + ) - # Define "cudf" backend engine to be registered with Dask - from dask.dataframe.backends import DataFrameBackendEntrypoint - - class CudfBackendEntrypoint(DataFrameBackendEntrypoint): - """Backend-entrypoint class for Dask-DataFrame - - This class is registered under the name "cudf" for the - ``dask.dataframe.backends`` entrypoint in ``setup.cfg``. - Dask-DataFrame will use the methods defined in this class - in place of ``dask.dataframe.`` when the - "dataframe.backend" configuration is set to "cudf": - - Examples - -------- - >>> import dask - >>> import dask.dataframe as dd - >>> with dask.config.set({"dataframe.backend": "cudf"}): - ... ddf = dd.from_dict({"a": range(10)}) - >>> type(ddf) - - """ - - @staticmethod - def from_dict( - data, - npartitions, - orient="columns", - dtype=None, - columns=None, - constructor=cudf.DataFrame, - ): - - return _default_backend( - dd.from_dict, - data, - npartitions=npartitions, - orient=orient, - dtype=dtype, - columns=columns, - constructor=constructor, - ) - @staticmethod - def read_parquet(*args, engine=None, **kwargs): - from dask_cudf.io.parquet import CudfEngine +# Register cudf->pandas +to_pandas_dispatch = PandasBackendEntrypoint.to_backend_dispatch() - return _default_backend( - dd.read_parquet, - *args, - engine=CudfEngine, - **kwargs, - ) - @staticmethod - def read_json(*args, **kwargs): - from dask_cudf.io.json import read_json +@to_pandas_dispatch.register((cudf.DataFrame, cudf.Series, cudf.Index)) +def to_pandas_dispatch_from_cudf(data, nullable=False, **kwargs): + _unsupported_kwargs("cudf", "pandas", kwargs) + return data.to_pandas(nullable=nullable) - return read_json(*args, **kwargs) - @staticmethod - def read_orc(*args, **kwargs): - from dask_cudf.io import read_orc +# Register pandas->cudf +to_cudf_dispatch = Dispatch("to_cudf_dispatch") - return read_orc(*args, **kwargs) - @staticmethod - def read_csv(*args, **kwargs): - from dask_cudf.io import read_csv +@to_cudf_dispatch.register((pd.DataFrame, pd.Series, pd.Index)) +def to_cudf_dispatch_from_pandas(data, nan_as_null=None, **kwargs): + _unsupported_kwargs("pandas", "cudf", kwargs) + return cudf.from_pandas(data, nan_as_null=nan_as_null) - return read_csv(*args, **kwargs) - @staticmethod - def read_hdf(*args, **kwargs): - from dask_cudf import from_dask_dataframe +# Define "cudf" backend engine to be registered with Dask +class CudfBackendEntrypoint(DataFrameBackendEntrypoint): + """Backend-entrypoint class for Dask-DataFrame - # HDF5 reader not yet implemented in cudf - warnings.warn( - "read_hdf is not yet implemented in cudf/dask_cudf. " - "Moving to cudf from pandas. Expect poor performance!" - ) - return from_dask_dataframe( - _default_backend(dd.read_hdf, *args, **kwargs) - ) + This class is registered under the name "cudf" for the + ``dask.dataframe.backends`` entrypoint in ``setup.cfg``. + Dask-DataFrame will use the methods defined in this class + in place of ``dask.dataframe.`` when the + "dataframe.backend" configuration is set to "cudf": -except ImportError: - pass + Examples + -------- + >>> import dask + >>> import dask.dataframe as dd + >>> with dask.config.set({"dataframe.backend": "cudf"}): + ... ddf = dd.from_dict({"a": range(10)}) + >>> type(ddf) + + """ + + @classmethod + def to_backend_dispatch(cls): + return to_cudf_dispatch + + @classmethod + def to_backend(cls, data: dd.core._Frame, **kwargs): + if isinstance(data._meta, (cudf.DataFrame, cudf.Series, cudf.Index)): + # Already a cudf-backed collection + _unsupported_kwargs("cudf", "cudf", kwargs) + return data + return data.map_partitions(cls.to_backend_dispatch(), **kwargs) + + @staticmethod + def from_dict( + data, + npartitions, + orient="columns", + dtype=None, + columns=None, + constructor=cudf.DataFrame, + ): + + return _default_backend( + dd.from_dict, + data, + npartitions=npartitions, + orient=orient, + dtype=dtype, + columns=columns, + constructor=constructor, + ) + + @staticmethod + def read_parquet(*args, engine=None, **kwargs): + from dask_cudf.io.parquet import CudfEngine + + return _default_backend( + dd.read_parquet, + *args, + engine=CudfEngine, + **kwargs, + ) + + @staticmethod + def read_json(*args, **kwargs): + from dask_cudf.io.json import read_json + + return read_json(*args, **kwargs) + + @staticmethod + def read_orc(*args, **kwargs): + from dask_cudf.io import read_orc + + return read_orc(*args, **kwargs) + + @staticmethod + def read_csv(*args, **kwargs): + from dask_cudf.io import read_csv + + return read_csv(*args, **kwargs) + + @staticmethod + def read_hdf(*args, **kwargs): + from dask_cudf import from_dask_dataframe + + # HDF5 reader not yet implemented in cudf + warnings.warn( + "read_hdf is not yet implemented in cudf/dask_cudf. " + "Moving to cudf from pandas. Expect poor performance!" + ) + return from_dask_dataframe( + _default_backend(dd.read_hdf, *args, **kwargs) + ) diff --git a/python/dask_cudf/dask_cudf/tests/test_core.py b/python/dask_cudf/dask_cudf/tests/test_core.py index ee8229bc7e8..7f8876c8564 100644 --- a/python/dask_cudf/dask_cudf/tests/test_core.py +++ b/python/dask_cudf/dask_cudf/tests/test_core.py @@ -6,6 +6,7 @@ import numpy as np import pandas as pd import pytest +from packaging import version import dask from dask import dataframe as dd @@ -31,6 +32,58 @@ def test_from_dict_backend_dispatch(): dd.assert_eq(expect, ddf) +def test_to_backend(): + np.random.seed(0) + data = { + "x": np.random.randint(0, 5, size=10000), + "y": np.random.normal(size=10000), + } + with dask.config.set({"dataframe.backend": "pandas"}): + ddf = dd.from_dict(data, npartitions=2) + assert isinstance(ddf._meta, pd.DataFrame) + + gdf = ddf.to_backend("cudf") + assert isinstance(gdf, dgd.DataFrame) + dd.assert_eq(cudf.DataFrame(data), ddf) + + assert isinstance(gdf.to_backend()._meta, pd.DataFrame) + + +def test_to_backend_kwargs(): + data = {"x": [0, 2, np.nan, 3, 4, 5]} + with dask.config.set({"dataframe.backend": "pandas"}): + dser = dd.from_dict(data, npartitions=2)["x"] + assert isinstance(dser._meta, pd.Series) + + # Using `nan_as_null=False` will result in a cudf-backed + # Series with a NaN element (ranther than ) + gser_nan = dser.to_backend("cudf", nan_as_null=False) + assert isinstance(gser_nan, dgd.Series) + assert np.isnan(gser_nan.compute()).sum() == 1 + + # Using `nan_as_null=True` will result in a cudf-backed + # Series with a element (ranther than NaN) + gser_null = dser.to_backend("cudf", nan_as_null=True) + assert isinstance(gser_null, dgd.Series) + assert np.isnan(gser_null.compute()).sum() == 0 + + # Check `nullable` argument for `cudf.Series.to_pandas` + dser_null = gser_null.to_backend("pandas", nullable=False) + assert dser_null.compute().dtype == "float" + dser_null = gser_null.to_backend("pandas", nullable=True) + assert isinstance(dser_null.compute().dtype, pd.Float64Dtype) + + # Check unsupported arguments + with pytest.raises(ValueError, match="pandas-to-cudf"): + dser.to_backend("cudf", bad_arg=True) + + with pytest.raises(ValueError, match="cudf-to-cudf"): + gser_null.to_backend("cudf", bad_arg=True) + + with pytest.raises(ValueError, match="cudf-to-pandas"): + gser_null.to_backend("pandas", bad_arg=True) + + def test_from_cudf(): np.random.seed(0) @@ -547,8 +600,6 @@ def test_unary_ops(func, gdf, gddf): # Fixed in https://github.com/dask/dask/pull/4657 if isinstance(p, cudf.Index): - from packaging import version - if version.parse(dask.__version__) < version.parse("1.1.6"): pytest.skip( "dask.dataframe assert_eq index check hardcoded to "