Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support conversion to/from cudf in dask.dataframe.core.to_backend #12380

Merged
merged 24 commits into from
Feb 8, 2023
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
fd190ab
support DataFrame.to_backend in dask_cudf
rjzamora Dec 13, 2022
29eb466
formatting
rjzamora Dec 13, 2022
5df1191
Merge branch 'branch-23.02' into to-backend
rjzamora Jan 27, 2023
b5b8337
trigger formatting
rjzamora Jan 27, 2023
ceff3e2
roll back comment change
rjzamora Jan 27, 2023
a995644
Merge branch 'branch-23.02' into to-backend
rjzamora Jan 30, 2023
a08e7d1
update test
rjzamora Jan 30, 2023
7592fd5
remove try/except
rjzamora Jan 30, 2023
0c6e8f1
Merge remote-tracking branch 'upstream/branch-23.02' into to-backend
rjzamora Jan 30, 2023
6c72cb7
move to 23.04
rjzamora Jan 31, 2023
d685001
Merge remote-tracking branch 'upstream/branch-23.04' into to-backend
rjzamora Jan 31, 2023
7916664
improve test coverage
rjzamora Jan 31, 2023
d89825e
Merge branch 'branch-23.04' into to-backend
rjzamora Jan 31, 2023
351232b
Merge branch 'branch-23.04' into to-backend
rjzamora Jan 31, 2023
ed04e3c
Merge branch 'branch-23.04' into to-backend
rjzamora Feb 1, 2023
b0c0193
Merge remote-tracking branch 'upstream/branch-23.04' into to-backend
rjzamora Feb 1, 2023
7624864
roll back unnecessary change
rjzamora Feb 1, 2023
0242b5e
Merge branch 'to-backend' of https://github.com/rjzamora/cudf into to…
rjzamora Feb 1, 2023
46a99cb
Merge branch 'branch-23.04' into to-backend
rjzamora Feb 2, 2023
d56ff3f
Merge branch 'branch-23.04' into to-backend
rjzamora Feb 3, 2023
e3239b0
Merge branch 'branch-23.04' into to-backend
rjzamora Feb 6, 2023
1099f8b
Merge branch 'branch-23.04' into to-backend
rjzamora Feb 6, 2023
7d10ccf
Merge branch 'branch-23.04' into to-backend
rjzamora Feb 7, 2023
8f947b4
Merge branch 'branch-23.04' into to-backend
galipremsagar Feb 8, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 107 additions & 77 deletions python/dask_cudf/dask_cudf/backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@

import dask.dataframe as dd
from dask import config
from dask.dataframe.backends import (
DataFrameBackendEntrypoint,
PandasBackendEntrypoint,
)
from dask.dataframe.core import get_parallel_type, meta_nonempty
from dask.dataframe.dispatch import (
categorical_dtype_dispatch,
Expand All @@ -30,7 +34,7 @@
make_meta_obj,
)
from dask.sizeof import sizeof as sizeof_dispatch
from dask.utils import is_arraylike
from dask.utils import Dispatch, is_arraylike

import cudf
from cudf.api.types import is_string_dtype
Expand Down Expand Up @@ -446,91 +450,117 @@ def _default_backend(func, *args, **kwargs):
return func(*args, **kwargs)


try:
# TODO: Skip this check when Dask>=2023.1.1 is required
rjzamora marked this conversation as resolved.
Show resolved Hide resolved
if hasattr(PandasBackendEntrypoint, "to_backend_dispatch"):
to_pandas_dispatch = PandasBackendEntrypoint.to_backend_dispatch()

# Define "cudf" backend engine to be registered with Dask
from dask.dataframe.backends import DataFrameBackendEntrypoint

class CudfBackendEntrypoint(DataFrameBackendEntrypoint):
"""Backend-entrypoint class for Dask-DataFrame

This class is registered under the name "cudf" for the
``dask.dataframe.backends`` entrypoint in ``setup.cfg``.
Dask-DataFrame will use the methods defined in this class
in place of ``dask.dataframe.<creation-method>`` when the
"dataframe.backend" configuration is set to "cudf":

Examples
--------
>>> import dask
>>> import dask.dataframe as dd
>>> with dask.config.set({"dataframe.backend": "cudf"}):
... ddf = dd.from_dict({"a": range(10)})
>>> type(ddf)
<class 'dask_cudf.core.DataFrame'>
"""

@staticmethod
def from_dict(
data,
npartitions,
orient="columns",
dtype=None,
columns=None,
constructor=cudf.DataFrame,
):

return _default_backend(
dd.from_dict,
data,
npartitions=npartitions,
orient=orient,
dtype=dtype,
columns=columns,
constructor=constructor,
)
# Teach the pandas backend's ``to_backend`` dispatch how to convert
# cudf objects (DataFrame/Series/Index) back to their pandas equivalents.
@to_pandas_dispatch.register((cudf.DataFrame, cudf.Series, cudf.Index))
def to_pandas_dispatch_from_cudf(data, **kwargs):
    # Delegate to cudf's own conversion; kwargs are forwarded unchanged
    # (e.g. nan_as_null — exact supported options depend on cudf's
    # ``to_pandas``; NOTE(review): confirm against the cudf API docs).
    return data.to_pandas(**kwargs)

@staticmethod
def read_parquet(*args, engine=None, **kwargs):
from dask_cudf.io.parquet import CudfEngine

return _default_backend(
dd.read_parquet,
*args,
engine=CudfEngine,
**kwargs,
)
# Dispatch object used by the "cudf" backend's ``to_backend`` machinery to
# convert per-partition data to cudf (registrations follow below).
to_cudf_dispatch = Dispatch("to_cudf_dispatch")

@staticmethod
def read_json(*args, **kwargs):
from dask_cudf.io.json import read_json

return read_json(*args, **kwargs)
# Convert pandas objects (DataFrame/Series/Index) to cudf.
@to_cudf_dispatch.register((pd.DataFrame, pd.Series, pd.Index))
def to_cudf_dispatch_from_pandas(data, **kwargs):
    # kwargs are forwarded to ``cudf.from_pandas`` (e.g. nan_as_null).
    return cudf.from_pandas(data, **kwargs)

@staticmethod
def read_orc(*args, **kwargs):
from dask_cudf.io import read_orc

return read_orc(*args, **kwargs)
# Identity conversion: the data is already cudf-backed.
@to_cudf_dispatch.register((cudf.DataFrame, cudf.Series, cudf.Index))
def to_cudf_dispatch_from_cudf(data, **kwargs):
    # NOTE(review): kwargs are accepted but ignored on this path — the
    # review thread below discusses making this explicit; confirm intent.
    return data

@staticmethod
def read_csv(*args, **kwargs):
from dask_cudf.io import read_csv

return read_csv(*args, **kwargs)
# Define "cudf" backend engine to be registered with Dask
class CudfBackendEntrypoint(DataFrameBackendEntrypoint):
"""Backend-entrypoint class for Dask-DataFrame

@staticmethod
def read_hdf(*args, **kwargs):
from dask_cudf import from_dask_dataframe
This class is registered under the name "cudf" for the
``dask.dataframe.backends`` entrypoint in ``setup.cfg``.
Dask-DataFrame will use the methods defined in this class
in place of ``dask.dataframe.<creation-method>`` when the
"dataframe.backend" configuration is set to "cudf":

# HDF5 reader not yet implemented in cudf
warnings.warn(
"read_hdf is not yet implemented in cudf/dask_cudf. "
"Moving to cudf from pandas. Expect poor performance!"
)
return from_dask_dataframe(
_default_backend(dd.read_hdf, *args, **kwargs)
)
Examples
--------
>>> import dask
>>> import dask.dataframe as dd
>>> with dask.config.set({"dataframe.backend": "cudf"}):
... ddf = dd.from_dict({"a": range(10)})
>>> type(ddf)
<class 'dask_cudf.core.DataFrame'>
"""

except ImportError:
pass
    @classmethod
    def to_backend_dispatch(cls):
        """Return the dispatch used to convert partition data to cudf."""
        return to_cudf_dispatch

@classmethod
def to_backend(cls, data: dd.core._Frame, **kwargs):
if isinstance(data._meta, (cudf.DataFrame, cudf.Series, cudf.Index)):
# Already a cudf-backed collection
return data
return data.map_partitions(cls.to_backend_dispatch(), **kwargs)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps this is a question for the dask side of the implementation, but under what circumstances can kwargs be non-empty (and then relatedly, are there circumstances in which we already have a cudf-backed collection where non-empty kwargs could semantically require something different than data.map_partitions(identity))?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question - I updated things a bit to be explicit about supported/unsupported key-word arguments. I also added a bit more test coverage.

Context: We (probably) do want to make it possible to do things like ddf.to_backend("cudf", nan_as_null=False). However, I'm still unsure of the best way to document the supported to_backend kwargs for the various conversions.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm, can we link to the registered backend dispatch function which could explain what it accepts as keyword arguments?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I’ll give this a think, but not sure how/where to link to the dispatch-function args, since the available arguments to the user-facing API (to_backend) will depend on the specified backend. Any ideas?


@staticmethod
def from_dict(
data,
npartitions,
orient="columns",
dtype=None,
columns=None,
constructor=cudf.DataFrame,
):

return _default_backend(
dd.from_dict,
data,
npartitions=npartitions,
orient=orient,
dtype=dtype,
columns=columns,
constructor=constructor,
)

    @staticmethod
    def read_parquet(*args, engine=None, **kwargs):
        """Read Parquet data into a cudf-backed collection.

        NOTE: any user-supplied ``engine`` is discarded — the cudf
        backend always reads with ``CudfEngine``.
        """
        from dask_cudf.io.parquet import CudfEngine

        return _default_backend(
            dd.read_parquet,
            *args,
            engine=CudfEngine,
            **kwargs,
        )

    @staticmethod
    def read_json(*args, **kwargs):
        """Read JSON data using dask_cudf's native reader."""
        from dask_cudf.io.json import read_json

        return read_json(*args, **kwargs)

    @staticmethod
    def read_orc(*args, **kwargs):
        """Read ORC data using dask_cudf's native reader."""
        from dask_cudf.io import read_orc

        return read_orc(*args, **kwargs)

    @staticmethod
    def read_csv(*args, **kwargs):
        """Read CSV data using dask_cudf's native reader."""
        from dask_cudf.io import read_csv

        return read_csv(*args, **kwargs)

    @staticmethod
    def read_hdf(*args, **kwargs):
        """Read HDF5 data via pandas, then move the result to cudf.

        cudf has no native HDF5 reader, so the data is read with the
        default (pandas) backend and converted afterwards — this path is
        expected to be slow.
        """
        from dask_cudf import from_dask_dataframe

        # HDF5 reader not yet implemented in cudf
        warnings.warn(
            "read_hdf is not yet implemented in cudf/dask_cudf. "
            "Moving to cudf from pandas. Expect poor performance!"
        )
        return from_dask_dataframe(
            _default_backend(dd.read_hdf, *args, **kwargs)
        )
25 changes: 23 additions & 2 deletions python/dask_cudf/dask_cudf/tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import numpy as np
import pandas as pd
import pytest
from packaging import version

import dask
from dask import dataframe as dd
Expand All @@ -31,6 +32,28 @@ def test_from_dict_backend_dispatch():
dd.assert_eq(expect, ddf)


def test_to_backend():
    # Test DataFrame.to_backend in both directions (pandas -> cudf -> pandas).
    # Requires Dask>=2023.1.1.
    # NOTE: review-UI text embedded in the scraped diff was removed here —
    # it made the function body syntactically invalid.
    if version.parse(dask.__version__) < version.parse("2023.1.1"):
        pytest.skip("to_backend not supported in this version of Dask.")

    np.random.seed(0)
    data = {
        "x": np.random.randint(0, 5, size=10000),
        "y": np.random.normal(size=10000),
    }
    with dask.config.set({"dataframe.backend": "pandas"}):
        ddf = dd.from_dict(data, npartitions=2)
        assert isinstance(ddf._meta, pd.DataFrame)

        gdf = ddf.to_backend("cudf")
        assert isinstance(gdf, dgd.DataFrame)
        # assert_eq moves both sides to a comparable representation.
        # NOTE(review): comparing against ``gdf`` (the converted
        # collection) would pin the conversion result more directly than
        # comparing against ``ddf`` — confirm intended target.
        dd.assert_eq(cudf.DataFrame(data), ddf)

        # Round-trip: default to_backend() restores the pandas backend.
        assert isinstance(gdf.to_backend()._meta, pd.DataFrame)


def test_from_cudf():
np.random.seed(0)

Expand Down Expand Up @@ -547,8 +570,6 @@ def test_unary_ops(func, gdf, gddf):

# Fixed in https://github.com/dask/dask/pull/4657
if isinstance(p, cudf.Index):
from packaging import version

if version.parse(dask.__version__) < version.parse("1.1.6"):
pytest.skip(
"dask.dataframe assert_eq index check hardcoded to "
Expand Down