Skip to content

Commit

Permalink
Support conversion to/from cudf in dask.dataframe.core.to_backend (#12380)
Browse files Browse the repository at this point in the history

This PR corresponds to the `cudf` component of dask/dask#9758

Authors:
  - Richard (Rick) Zamora (https://github.com/rjzamora)
  - GALI PREM SAGAR (https://github.com/galipremsagar)

Approvers:
  - GALI PREM SAGAR (https://github.com/galipremsagar)

URL: #12380
  • Loading branch information
rjzamora authored Feb 8, 2023
1 parent fea6288 commit b8ae0e4
Show file tree
Hide file tree
Showing 2 changed files with 170 additions and 79 deletions.
194 changes: 117 additions & 77 deletions python/dask_cudf/dask_cudf/backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@

import dask.dataframe as dd
from dask import config
from dask.dataframe.backends import (
DataFrameBackendEntrypoint,
PandasBackendEntrypoint,
)
from dask.dataframe.core import get_parallel_type, meta_nonempty
from dask.dataframe.dispatch import (
categorical_dtype_dispatch,
Expand All @@ -30,7 +34,7 @@
make_meta_obj,
)
from dask.sizeof import sizeof as sizeof_dispatch
from dask.utils import is_arraylike
from dask.utils import Dispatch, is_arraylike

import cudf
from cudf.api.types import is_string_dtype
Expand Down Expand Up @@ -446,91 +450,127 @@ def _default_backend(func, *args, **kwargs):
return func(*args, **kwargs)


try:
def _unsupported_kwargs(old, new, kwargs):
# Utility to raise a meaningful error when
# unsupported kwargs are encountered within
# ``to_backend_dispatch``
if kwargs:
raise ValueError(
f"Unsupported key-word arguments used in `to_backend` "
f"for {old}-to-{new} conversion: {kwargs}"
)

# Define "cudf" backend engine to be registered with Dask
from dask.dataframe.backends import DataFrameBackendEntrypoint

class CudfBackendEntrypoint(DataFrameBackendEntrypoint):
"""Backend-entrypoint class for Dask-DataFrame
This class is registered under the name "cudf" for the
``dask.dataframe.backends`` entrypoint in ``setup.cfg``.
Dask-DataFrame will use the methods defined in this class
in place of ``dask.dataframe.<creation-method>`` when the
"dataframe.backend" configuration is set to "cudf":
Examples
--------
>>> import dask
>>> import dask.dataframe as dd
>>> with dask.config.set({"dataframe.backend": "cudf"}):
... ddf = dd.from_dict({"a": range(10)})
>>> type(ddf)
<class 'dask_cudf.core.DataFrame'>
"""

@staticmethod
def from_dict(
data,
npartitions,
orient="columns",
dtype=None,
columns=None,
constructor=cudf.DataFrame,
):

return _default_backend(
dd.from_dict,
data,
npartitions=npartitions,
orient=orient,
dtype=dtype,
columns=columns,
constructor=constructor,
)

@staticmethod
def read_parquet(*args, engine=None, **kwargs):
from dask_cudf.io.parquet import CudfEngine
# Register cudf->pandas
to_pandas_dispatch = PandasBackendEntrypoint.to_backend_dispatch()

return _default_backend(
dd.read_parquet,
*args,
engine=CudfEngine,
**kwargs,
)

@staticmethod
def read_json(*args, **kwargs):
from dask_cudf.io.json import read_json
@to_pandas_dispatch.register((cudf.DataFrame, cudf.Series, cudf.Index))
def to_pandas_dispatch_from_cudf(data, nullable=False, **kwargs):
    # Convert a cudf-backed partition to pandas. ``nullable`` is
    # forwarded to ``data.to_pandas``; any other keyword argument is
    # rejected with a meaningful error.
    _unsupported_kwargs("cudf", "pandas", kwargs)
    return data.to_pandas(nullable=nullable)

return read_json(*args, **kwargs)

@staticmethod
def read_orc(*args, **kwargs):
from dask_cudf.io import read_orc
# Register pandas->cudf
to_cudf_dispatch = Dispatch("to_cudf_dispatch")

return read_orc(*args, **kwargs)

@staticmethod
def read_csv(*args, **kwargs):
from dask_cudf.io import read_csv
@to_cudf_dispatch.register((pd.DataFrame, pd.Series, pd.Index))
def to_cudf_dispatch_from_pandas(data, nan_as_null=None, **kwargs):
    # Convert a pandas-backed partition to cudf. ``nan_as_null`` is
    # forwarded to ``cudf.from_pandas``; any other keyword argument is
    # rejected with a meaningful error.
    _unsupported_kwargs("pandas", "cudf", kwargs)
    return cudf.from_pandas(data, nan_as_null=nan_as_null)

return read_csv(*args, **kwargs)

@staticmethod
def read_hdf(*args, **kwargs):
from dask_cudf import from_dask_dataframe
# Define "cudf" backend engine to be registered with Dask
# Define "cudf" backend engine to be registered with Dask
class CudfBackendEntrypoint(DataFrameBackendEntrypoint):
    """Backend-entrypoint class for Dask-DataFrame

    This class is registered under the name "cudf" for the
    ``dask.dataframe.backends`` entrypoint in ``setup.cfg``.
    Dask-DataFrame will use the methods defined in this class
    in place of ``dask.dataframe.<creation-method>`` when the
    "dataframe.backend" configuration is set to "cudf":

    Examples
    --------
    >>> import dask
    >>> import dask.dataframe as dd
    >>> with dask.config.set({"dataframe.backend": "cudf"}):
    ...     ddf = dd.from_dict({"a": range(10)})
    >>> type(ddf)
    <class 'dask_cudf.core.DataFrame'>
    """

    @classmethod
    def to_backend_dispatch(cls):
        # Dispatch object used by ``to_backend`` to convert each
        # partition to cudf (registered above for pandas objects)
        return to_cudf_dispatch

    @classmethod
    def to_backend(cls, data: dd.core._Frame, **kwargs):
        # Move a Dask collection to the cudf backend
        if isinstance(data._meta, (cudf.DataFrame, cudf.Series, cudf.Index)):
            # Already a cudf-backed collection
            _unsupported_kwargs("cudf", "cudf", kwargs)
            return data
        return data.map_partitions(cls.to_backend_dispatch(), **kwargs)

    @staticmethod
    def from_dict(
        data,
        npartitions,
        orient="columns",
        dtype=None,
        columns=None,
        constructor=cudf.DataFrame,
    ):
        # Same as ``dd.from_dict``, but with a cudf constructor

        return _default_backend(
            dd.from_dict,
            data,
            npartitions=npartitions,
            orient=orient,
            dtype=dtype,
            columns=columns,
            constructor=constructor,
        )

    @staticmethod
    def read_parquet(*args, engine=None, **kwargs):
        # Read Parquet with the cudf engine (``engine`` argument is
        # accepted for API compatibility but always replaced)
        from dask_cudf.io.parquet import CudfEngine

        return _default_backend(
            dd.read_parquet,
            *args,
            engine=CudfEngine,
            **kwargs,
        )

    @staticmethod
    def read_json(*args, **kwargs):
        # Delegate to the dask_cudf JSON reader
        from dask_cudf.io.json import read_json

        return read_json(*args, **kwargs)

    @staticmethod
    def read_orc(*args, **kwargs):
        # Delegate to the dask_cudf ORC reader
        from dask_cudf.io import read_orc

        return read_orc(*args, **kwargs)

    @staticmethod
    def read_csv(*args, **kwargs):
        # Delegate to the dask_cudf CSV reader
        from dask_cudf.io import read_csv

        return read_csv(*args, **kwargs)

    @staticmethod
    def read_hdf(*args, **kwargs):
        # Read with pandas, then convert the collection to cudf
        from dask_cudf import from_dask_dataframe

        # HDF5 reader not yet implemented in cudf
        warnings.warn(
            "read_hdf is not yet implemented in cudf/dask_cudf. "
            "Moving to cudf from pandas. Expect poor performance!"
        )
        return from_dask_dataframe(
            _default_backend(dd.read_hdf, *args, **kwargs)
        )
55 changes: 53 additions & 2 deletions python/dask_cudf/dask_cudf/tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import numpy as np
import pandas as pd
import pytest
from packaging import version

import dask
from dask import dataframe as dd
Expand All @@ -31,6 +32,58 @@ def test_from_dict_backend_dispatch():
dd.assert_eq(expect, ddf)


def test_to_backend():
    # Check round-trip ``to_backend`` conversion between the
    # pandas and cudf backends
    np.random.seed(0)
    data = {
        "x": np.random.randint(0, 5, size=10000),
        "y": np.random.normal(size=10000),
    }
    with dask.config.set({"dataframe.backend": "pandas"}):
        ddf = dd.from_dict(data, npartitions=2)
        assert isinstance(ddf._meta, pd.DataFrame)

        gdf = ddf.to_backend("cudf")
        assert isinstance(gdf, dgd.DataFrame)
        # Compare against the *converted* collection (``gdf``), not the
        # original pandas-backed ``ddf``, so the cudf conversion result
        # is actually validated
        dd.assert_eq(cudf.DataFrame(data), gdf)

        assert isinstance(gdf.to_backend()._meta, pd.DataFrame)


def test_to_backend_kwargs():
    # Check that backend-specific key-word arguments are forwarded
    # by ``to_backend``, and that unsupported arguments raise
    data = {"x": [0, 2, np.nan, 3, 4, 5]}
    with dask.config.set({"dataframe.backend": "pandas"}):
        dser = dd.from_dict(data, npartitions=2)["x"]
        assert isinstance(dser._meta, pd.Series)

        # Using `nan_as_null=False` will result in a cudf-backed
        # Series with a NaN element (rather than <NA>)
        gser_nan = dser.to_backend("cudf", nan_as_null=False)
        assert isinstance(gser_nan, dgd.Series)
        assert np.isnan(gser_nan.compute()).sum() == 1

        # Using `nan_as_null=True` will result in a cudf-backed
        # Series with a <NA> element (rather than NaN)
        gser_null = dser.to_backend("cudf", nan_as_null=True)
        assert isinstance(gser_null, dgd.Series)
        assert np.isnan(gser_null.compute()).sum() == 0

        # Check `nullable` argument for `cudf.Series.to_pandas`
        dser_null = gser_null.to_backend("pandas", nullable=False)
        assert dser_null.compute().dtype == "float"
        dser_null = gser_null.to_backend("pandas", nullable=True)
        assert isinstance(dser_null.compute().dtype, pd.Float64Dtype)

        # Check unsupported arguments
        with pytest.raises(ValueError, match="pandas-to-cudf"):
            dser.to_backend("cudf", bad_arg=True)

        with pytest.raises(ValueError, match="cudf-to-cudf"):
            gser_null.to_backend("cudf", bad_arg=True)

        with pytest.raises(ValueError, match="cudf-to-pandas"):
            gser_null.to_backend("pandas", bad_arg=True)


def test_from_cudf():
np.random.seed(0)

Expand Down Expand Up @@ -547,8 +600,6 @@ def test_unary_ops(func, gdf, gddf):

# Fixed in https://github.com/dask/dask/pull/4657
if isinstance(p, cudf.Index):
from packaging import version

if version.parse(dask.__version__) < version.parse("1.1.6"):
pytest.skip(
"dask.dataframe assert_eq index check hardcoded to "
Expand Down

0 comments on commit b8ae0e4

Please sign in to comment.