diff --git a/python/dask_cudf/dask_cudf/backends.py b/python/dask_cudf/dask_cudf/backends.py index 2584ac47878..f02c75eb3e8 100644 --- a/python/dask_cudf/dask_cudf/backends.py +++ b/python/dask_cudf/dask_cudf/backends.py @@ -1,5 +1,6 @@ # Copyright (c) 2020-2022, NVIDIA CORPORATION. +import warnings from collections.abc import Iterator import cupy as cp @@ -8,6 +9,8 @@ import pyarrow as pa from pandas.api.types import is_scalar +import dask.dataframe as dd +from dask import config from dask.dataframe.core import get_parallel_type, meta_nonempty from dask.dataframe.dispatch import ( categorical_dtype_dispatch, @@ -426,3 +429,110 @@ def sizeof_cudf_dataframe(df): @_dask_cudf_nvtx_annotate def sizeof_cudf_series_index(obj): return obj.memory_usage() + + +def _default_backend(func, *args, **kwargs): + # Utility to call a dask.dataframe function with + # the default ("pandas") backend + + # NOTE: Some `CudfBackendEntrypoint` methods need to + # invoke the "pandas"-version of the same method, but + # with custom kwargs (e.g. `engine`). In these cases, + # an explicit "pandas" config context is needed to + # avoid a recursive loop + with config.set({"dataframe.backend": "pandas"}): + return func(*args, **kwargs) + + +try: + + # Define "cudf" backend engine to be registered with Dask + from dask.dataframe.backends import DataFrameBackendEntrypoint + + class CudfBackendEntrypoint(DataFrameBackendEntrypoint): + """Backend-entrypoint class for Dask-DataFrame + + This class is registered under the name "cudf" for the + ``dask.dataframe.backends`` entrypoint in ``setup.cfg``. + Dask-DataFrame will use the methods defined in this class + in place of ``dask.dataframe.`` when the + "dataframe.backend" configuration is set to "cudf": + + Examples + -------- + >>> import dask + >>> import dask.dataframe as dd + >>> with dask.config.set({"dataframe.backend": "cudf"}): + ... ddf = dd.from_dict({"a": range(10)}) + >>> type(ddf) + + """ + + @staticmethod + def from_dict(data, npartitions, orient="columns", **kwargs): + from dask_cudf import from_cudf + + if orient != "columns": + raise ValueError(f"orient={orient} is not supported") + # TODO: Use cudf.from_dict + # (See: https://github.com/rapidsai/cudf/issues/11934) + return from_cudf( + cudf.DataFrame(data), + npartitions=npartitions, + ) + + @staticmethod + def read_parquet(*args, engine=None, **kwargs): + from dask_cudf.io.parquet import CudfEngine + + return _default_backend( + dd.read_parquet, + *args, + engine=CudfEngine, + **kwargs, + ) + + @staticmethod + def read_json(*args, engine=None, **kwargs): + return _default_backend( + dd.read_json, + *args, + engine=cudf.read_json, + **kwargs, + ) + + @staticmethod + def read_orc(*args, **kwargs): + from dask_cudf.io import read_orc + + return read_orc(*args, **kwargs) + + @staticmethod + def read_csv(*args, **kwargs): + from dask_cudf.io import read_csv + + chunksize = kwargs.pop("chunksize", None) + blocksize = kwargs.pop("blocksize", "default") + if chunksize is None and blocksize != "default": + chunksize = blocksize + return read_csv( + *args, + chunksize=chunksize, + **kwargs, + ) + + @staticmethod + def read_hdf(*args, **kwargs): + from dask_cudf import from_dask_dataframe + + # HDF5 reader not yet implemented in cudf + warnings.warn( + "read_hdf is not yet implemented in cudf/dask_cudf. " + "Moving to cudf from pandas. Expect poor performance!" + ) + return from_dask_dataframe( + _default_backend(dd.read_hdf, *args, **kwargs) + ) + +except ImportError: + pass diff --git a/python/dask_cudf/dask_cudf/core.py b/python/dask_cudf/dask_cudf/core.py index 0bf39df313a..76705e7cbf1 100644 --- a/python/dask_cudf/dask_cudf/core.py +++ b/python/dask_cudf/dask_cudf/core.py @@ -2,10 +2,10 @@ import math import warnings -from distutils.version import LooseVersion import numpy as np import pandas as pd +from packaging.version import parse as parse_version from tlz import partition_all import dask @@ -31,7 +31,11 @@ from dask_cudf.accessors import ListMethods, StructMethods from dask_cudf.sorting import _get_shuffle_type -DASK_VERSION = LooseVersion(dask.__version__) +DASK_BACKEND_SUPPORT = parse_version(dask.__version__) >= parse_version( + "2022.10.0" +) +# TODO: Remove DASK_BACKEND_SUPPORT throughout codebase +# when dask_cudf is pinned to dask>=2022.10.0 class _Frame(dd.core._Frame, OperatorMethodMixin): @@ -736,7 +740,7 @@ def from_dask_dataframe(df): return df.map_partitions(cudf.from_pandas) -for name in [ +for name in ( "add", "sub", "mul", @@ -751,16 +755,13 @@ def from_dask_dataframe(df): "rfloordiv", "rmod", "rpow", -]: +): meth = getattr(cudf.DataFrame, name) - kwargs = {"original": cudf.DataFrame} if DASK_VERSION >= "2.11.1" else {} - DataFrame._bind_operator_method(name, meth, **kwargs) + DataFrame._bind_operator_method(name, meth, original=cudf.Series) meth = getattr(cudf.Series, name) - kwargs = {"original": cudf.Series} if DASK_VERSION >= "2.11.1" else {} - Series._bind_operator_method(name, meth, **kwargs) + Series._bind_operator_method(name, meth, original=cudf.Series) -for name in ["lt", "gt", "le", "ge", "ne", "eq"]: +for name in ("lt", "gt", "le", "ge", "ne", "eq"): meth = getattr(cudf.Series, name) - kwargs = {"original": cudf.Series} if DASK_VERSION >= "2.11.1" else {} - Series._bind_comparison_method(name, meth, **kwargs) + Series._bind_comparison_method(name, meth, original=cudf.Series) diff --git a/python/dask_cudf/dask_cudf/io/tests/test_csv.py b/python/dask_cudf/dask_cudf/io/tests/test_csv.py index 564a719fb86..7f69e208b5a 100644 --- a/python/dask_cudf/dask_cudf/io/tests/test_csv.py +++ b/python/dask_cudf/dask_cudf/io/tests/test_csv.py @@ -16,6 +16,22 @@ import dask_cudf +@pytest.mark.skipif( + not dask_cudf.core.DASK_BACKEND_SUPPORT, + reason="No backend-dispatch support", +) +def test_csv_roundtrip_backend_dispatch(tmp_path): + # Test ddf.read_csv cudf-backend dispatch + df = cudf.DataFrame({"x": [1, 2, 3, 4], "id": ["a", "b", "c", "d"]}) + ddf = dask_cudf.from_cudf(df, npartitions=2) + csv_path = str(tmp_path / "data-*.csv") + ddf.to_csv(csv_path, index=False) + with dask.config.set({"dataframe.backend": "cudf"}): + ddf2 = dd.read_csv(csv_path) + assert isinstance(ddf2, dask_cudf.DataFrame) + dd.assert_eq(ddf, ddf2, check_divisions=False, check_index=False) + + def test_csv_roundtrip(tmp_path): df = cudf.DataFrame({"x": [1, 2, 3, 4], "id": ["a", "b", "c", "d"]}) ddf = dask_cudf.from_cudf(df, npartitions=2) diff --git a/python/dask_cudf/dask_cudf/io/tests/test_json.py b/python/dask_cudf/dask_cudf/io/tests/test_json.py index 3f854bb343b..d19f7736e8e 100644 --- a/python/dask_cudf/dask_cudf/io/tests/test_json.py +++ b/python/dask_cudf/dask_cudf/io/tests/test_json.py @@ -12,6 +12,23 @@ import dask_cudf +@pytest.mark.skipif( + not dask_cudf.core.DASK_BACKEND_SUPPORT, + reason="No backend-dispatch support", +) +def test_read_json_backend_dispatch(tmp_path): + # Test ddf.read_json cudf-backend dispatch + df1 = dask.datasets.timeseries( + dtypes={"x": int, "y": int}, freq="120s" + ).reset_index(drop=True) + json_path = str(tmp_path / "data-*.json") + df1.to_json(json_path) + with dask.config.set({"dataframe.backend": "cudf"}): + df2 = dd.read_json(json_path) + assert isinstance(df2, dask_cudf.DataFrame) + dd.assert_eq(df1, df2) + + def test_read_json(tmp_path): df1 = dask.datasets.timeseries( dtypes={"x": int, "y": int}, freq="120s" diff --git a/python/dask_cudf/dask_cudf/io/tests/test_orc.py b/python/dask_cudf/dask_cudf/io/tests/test_orc.py index f19396a9b37..2291dfba536 100644 --- a/python/dask_cudf/dask_cudf/io/tests/test_orc.py +++ b/python/dask_cudf/dask_cudf/io/tests/test_orc.py @@ -6,18 +6,30 @@ import pytest +import dask from dask import dataframe as dd import cudf import dask_cudf -# import pyarrow.orc as orc - cur_dir = os.path.dirname(__file__) sample_orc = os.path.join(cur_dir, "data/orc/sample.orc") +@pytest.mark.skipif( + not dask_cudf.core.DASK_BACKEND_SUPPORT, + reason="No backend-dispatch support", +) +def test_read_orc_backend_dispatch(): + # Test ddf.read_orc cudf-backend dispatch + df1 = cudf.read_orc(sample_orc) + with dask.config.set({"dataframe.backend": "cudf"}): + df2 = dd.read_orc(sample_orc) + assert isinstance(df2, dask_cudf.DataFrame) + dd.assert_eq(df1, df2, check_index=False) + + def test_read_orc_defaults(): df1 = cudf.read_orc(sample_orc) df2 = dask_cudf.read_orc(sample_orc) diff --git a/python/dask_cudf/dask_cudf/io/tests/test_parquet.py b/python/dask_cudf/dask_cudf/io/tests/test_parquet.py index ef5741b0539..7b9f926da3f 100644 --- a/python/dask_cudf/dask_cudf/io/tests/test_parquet.py +++ b/python/dask_cudf/dask_cudf/io/tests/test_parquet.py @@ -46,6 +46,20 @@ def _divisions(setting): return {"gather_statistics": setting} +@pytest.mark.skipif( + not dask_cudf.core.DASK_BACKEND_SUPPORT, + reason="No backend-dispatch support", +) +def test_roundtrip_backend_dispatch(tmpdir): + # Test ddf.read_parquet cudf-backend dispatch + tmpdir = str(tmpdir) + ddf.to_parquet(tmpdir, engine="pyarrow") + with dask.config.set({"dataframe.backend": "cudf"}): + ddf2 = dd.read_parquet(tmpdir, index=False) + assert isinstance(ddf2, dask_cudf.DataFrame) + dd.assert_eq(ddf.reset_index(drop=False), ddf2) + + @pytest.mark.parametrize("write_metadata_file", [True, False]) @pytest.mark.parametrize("divisions", [True, False]) def test_roundtrip_from_dask(tmpdir, divisions, write_metadata_file): diff --git a/python/dask_cudf/dask_cudf/tests/test_core.py b/python/dask_cudf/dask_cudf/tests/test_core.py index 40041fd5c0e..f7c46466705 100644 --- a/python/dask_cudf/dask_cudf/tests/test_core.py +++ b/python/dask_cudf/dask_cudf/tests/test_core.py @@ -17,6 +17,23 @@ import dask_cudf as dgd +@pytest.mark.skipif( + not dgd.core.DASK_BACKEND_SUPPORT, reason="No backend-dispatch support" +) +def test_from_dict_backend_dispatch(): + # Test ddf.from_dict cudf-backend dispatch + np.random.seed(0) + data = { + "x": np.random.randint(0, 5, size=10000), + "y": np.random.normal(size=10000), + } + expect = cudf.DataFrame(data) + with dask.config.set({"dataframe.backend": "cudf"}): + ddf = dd.from_dict(data, npartitions=2) + assert isinstance(ddf, dgd.DataFrame) + dd.assert_eq(expect, ddf) + + def test_from_cudf(): np.random.seed(0) diff --git a/python/dask_cudf/setup.cfg b/python/dask_cudf/setup.cfg index 8f4c2029a87..f45bdf00430 100644 --- a/python/dask_cudf/setup.cfg +++ b/python/dask_cudf/setup.cfg @@ -38,3 +38,7 @@ skip= buck-out build dist + +[options.entry_points] +dask.dataframe.backends = + cudf = dask_cudf.backends:CudfBackendEntrypoint