From e1edd8a30b28c837d51ad31d06137225694b8aef Mon Sep 17 00:00:00 2001 From: rjzamora Date: Fri, 12 Jan 2024 10:46:24 -0800 Subject: [PATCH 01/13] fix to_backend --- dask_expr/_collection.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/dask_expr/_collection.py b/dask_expr/_collection.py index de03efcb..61ae815d 100644 --- a/dask_expr/_collection.py +++ b/dask_expr/_collection.py @@ -1275,9 +1275,14 @@ def to_backend(self, backend: str | None = None, **kwargs): ------- DataFrame, Series or Index """ - from dask.dataframe.io import to_backend - - return to_backend(self.to_dask_dataframe(), backend=backend, **kwargs) + from dask.dataframe.backends import dataframe_creation_dispatch + + # Get desired backend + backend = backend or dataframe_creation_dispatch.backend + # Check that "backend" has a registered entrypoint + backend_entrypoint = dataframe_creation_dispatch.dispatch(backend) + # Call `DataFrameBackendEntrypoint.to_backend` + return backend_entrypoint.to_backend(self, **kwargs) def dot(self, other, meta=no_default): if not isinstance(other, FrameBase): From 24ce9a4752200cf3700b09fe6cf230b8b51d7f65 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Fri, 12 Jan 2024 15:04:51 -0800 Subject: [PATCH 02/13] add creation dispatching to dask-expr --- dask_expr/_backends.py | 64 ++++++++++++++++++++++++++++++++++++++++ dask_expr/_collection.py | 6 +++- 2 files changed, 69 insertions(+), 1 deletion(-) create mode 100644 dask_expr/_backends.py diff --git a/dask_expr/_backends.py b/dask_expr/_backends.py new file mode 100644 index 00000000..2225a8c7 --- /dev/null +++ b/dask_expr/_backends.py @@ -0,0 +1,64 @@ +from __future__ import annotations + +import pandas as pd +from dask.backends import CreationDispatch, detect_entrypoints +from dask.dataframe.backends import DataFrameBackendEntrypoint + + +class DaskExprCreationDispatch(CreationDispatch): + """Dask-Expr version of CreationDispatch + + TODO: This code can all go away if CreationDispatch + makes it possible to override the entrypoint path. + We just want to allow external libraries to expose + a dask-expr entrypoint and dask (legacy) entrypoint + at the same time. + """ + + def detect_entrypoints(self): + return detect_entrypoints(f"dask-expr.{self._module_name}.backends") + + def dispatch(self, backend: str): + """Return the desired backend entrypoint""" + try: + impl = self._lookup[backend] + except KeyError: + # Check entrypoints for the specified backend + entrypoints = self.detect_entrypoints() + if backend in entrypoints: + return self.register_backend(backend, entrypoints[backend].load()()) + else: + return impl + raise ValueError(f"No backend dispatch registered for {backend}") + + +dataframe_creation_dispatch = DaskExprCreationDispatch( + module_name="dataframe", + default="pandas", + entrypoint_class=DataFrameBackendEntrypoint, + name="dataframe_creation_dispatch", +) + + +class PandasBackendEntrypoint(DataFrameBackendEntrypoint): + """Pandas-Backend Entrypoint Class for Dask-Expressions + + Note that all DataFrame-creation functions are defined + and registered 'in-place'. + """ + + @classmethod + def to_backend_dispatch(cls): + from dask.dataframe.dispatch import to_pandas_dispatch + + return to_pandas_dispatch + + @classmethod + def to_backend(cls, data, **kwargs): + if isinstance(data._meta, (pd.DataFrame, pd.Series, pd.Index)): + # Already a pandas-backed collection + return data + return data.map_partitions(cls.to_backend_dispatch(), **kwargs) + + +dataframe_creation_dispatch.register_backend("pandas", PandasBackendEntrypoint()) diff --git a/dask_expr/_collection.py b/dask_expr/_collection.py index 61ae815d..842df392 100644 --- a/dask_expr/_collection.py +++ b/dask_expr/_collection.py @@ -55,6 +55,7 @@ from dask_expr import _expr as expr from dask_expr._align import AlignPartitions +from dask_expr._backends import dataframe_creation_dispatch from dask_expr._categorical import CategoricalAccessor, Categorize, GetCategories from dask_expr._concat import Concat from dask_expr._datetime import DatetimeAccessor @@ -1275,7 +1276,7 @@ def to_backend(self, backend: str | None = None, **kwargs): ------- DataFrame, Series or Index """ - from dask.dataframe.backends import dataframe_creation_dispatch + from dask_expr._backends import dataframe_creation_dispatch # Get desired backend backend = backend or dataframe_creation_dispatch.backend @@ -2710,6 +2711,7 @@ def from_graph(*args, **kwargs): return new_collection(FromGraph(*args, **kwargs)) +@dataframe_creation_dispatch.register_inplace("pandas") def from_dict( data, npartitions, @@ -2786,6 +2788,7 @@ def from_dask_array(x, columns=None, index=None, meta=None): return from_dask_dataframe(df, optimize=True) +@dataframe_creation_dispatch.register_inplace("pandas") def read_csv( path, *args, @@ -2836,6 +2839,7 @@ def read_table( ) +@dataframe_creation_dispatch.register_inplace("pandas") def read_parquet( path=None, columns=None, From f2e54040767947595db7a97d0626f21d23cd5835 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Fri, 12 Jan 2024 15:32:07 -0800 Subject: [PATCH 03/13] Rely on changes in dask/dask#10794 --- dask_expr/_backends.py | 33 +++------------------------------ 1 file changed, 3 insertions(+), 30 deletions(-) diff --git a/dask_expr/_backends.py b/dask_expr/_backends.py index 2225a8c7..59a72f70 100644 --- a/dask_expr/_backends.py +++ b/dask_expr/_backends.py @@ -1,41 +1,14 @@ from __future__ import annotations import pandas as pd -from dask.backends import CreationDispatch, detect_entrypoints +from dask.backends import CreationDispatch from dask.dataframe.backends import DataFrameBackendEntrypoint - -class DaskExprCreationDispatch(CreationDispatch): - """Dask-Expr version of CreationDispatch - - TODO: This code can all go away if CreationDispatch - makes it possible to override the entrypoint path. - We just want to allow external libraries to expose - a dask-expr entrypoint and dask (legacy) entrypoint - at the same time. - """ - - def detect_entrypoints(self): - return detect_entrypoints(f"dask-expr.{self._module_name}.backends") - - def dispatch(self, backend: str): - """Return the desired backend entrypoint""" - try: - impl = self._lookup[backend] - except KeyError: - # Check entrypoints for the specified backend - entrypoints = self.detect_entrypoints() - if backend in entrypoints: - return self.register_backend(backend, entrypoints[backend].load()()) - else: - return impl - raise ValueError(f"No backend dispatch registered for {backend}") - - -dataframe_creation_dispatch = DaskExprCreationDispatch( +dataframe_creation_dispatch = CreationDispatch( module_name="dataframe", default="pandas", entrypoint_class=DataFrameBackendEntrypoint, + entrypoint_root="dask-expr", # Differs from `dask.dataframe` name="dataframe_creation_dispatch", ) From 5acdbc934ddffdbeb47392b09149a28537dc652e Mon Sep 17 00:00:00 2001 From: rjzamora Date: Fri, 12 Jan 2024 16:01:12 -0800 Subject: [PATCH 04/13] use dask_expr instead of dask-expr --- dask_expr/_backends.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dask_expr/_backends.py b/dask_expr/_backends.py index 59a72f70..24cf25ac 100644 --- a/dask_expr/_backends.py +++ b/dask_expr/_backends.py @@ -8,7 +8,7 @@ module_name="dataframe", default="pandas", entrypoint_class=DataFrameBackendEntrypoint, - entrypoint_root="dask-expr", # Differs from `dask.dataframe` + entrypoint_root="dask_expr", # Differs from `dask.dataframe` name="dataframe_creation_dispatch", ) From 9f753ab6d2024d4ee50726a26d2c62d3733b7af8 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Tue, 16 Jan 2024 12:28:41 -0800 Subject: [PATCH 05/13] add get_collection_type dispatch --- dask_expr/_backends.py | 62 ++++++++++++++++++++++++++++++++++++++-- dask_expr/_collection.py | 17 ++--------- 2 files changed, 63 insertions(+), 16 deletions(-) diff --git a/dask_expr/_backends.py b/dask_expr/_backends.py index 24cf25ac..8529b2ea 100644 --- a/dask_expr/_backends.py +++ b/dask_expr/_backends.py @@ -4,11 +4,31 @@ from dask.backends import CreationDispatch from dask.dataframe.backends import DataFrameBackendEntrypoint -dataframe_creation_dispatch = CreationDispatch( +from dask_expr._dispatch import get_collection_type + + +class DXCreationDispatch(CreationDispatch): + """Dask-expressions version of CreationDispatch""" + + # TODO Remove after https://github.com/dask/dask/pull/10794 + def dispatch(self, backend: str): + from dask.backends import detect_entrypoints + + try: + impl = self._lookup[backend] + except KeyError: + entrypoints = detect_entrypoints(f"dask-expr.{self._module_name}.backends") + if backend in entrypoints: + return self.register_backend(backend, entrypoints[backend].load()()) + else: + return impl + raise ValueError(f"No backend dispatch registered for {backend}") + + +dataframe_creation_dispatch = DXCreationDispatch( module_name="dataframe", default="pandas", entrypoint_class=DataFrameBackendEntrypoint, - entrypoint_root="dask_expr", # Differs from `dask.dataframe` name="dataframe_creation_dispatch", ) @@ -35,3 +55,41 @@ def to_backend(cls, data, **kwargs): dataframe_creation_dispatch.register_backend("pandas", PandasBackendEntrypoint()) + + +@get_collection_type.register(pd.Series) +def get_collection_type_series(_): + from dask_expr._collection import Series + + return Series + + +@get_collection_type.register(pd.DataFrame) +def get_collection_type_dataframe(_): + from dask_expr._collection import DataFrame + + return DataFrame + + +@get_collection_type.register(pd.Index) +def get_collection_type_index(_): + from dask_expr._collection import Index + + return Index + + +@get_collection_type.register(object) +def get_collection_type_object(_): + from dask_expr._collection import Scalar + + return Scalar + + +###################################### +# cuDF: Pandas Dataframes on the GPU # +###################################### + + +@get_collection_type.register_lazy("cudf") +def _register_cudf(): + import dask_cudf # noqa: F401 diff --git a/dask_expr/_collection.py b/dask_expr/_collection.py index 4a36fd43..72eafefa 100644 --- a/dask_expr/_collection.py +++ b/dask_expr/_collection.py @@ -25,7 +25,6 @@ has_parallel_type, is_arraylike, is_dataframe_like, - is_index_like, is_series_like, meta_warning, new_dd_object, @@ -56,7 +55,6 @@ from dask_expr import _expr as expr from dask_expr._align import AlignPartitions -from dask_expr._backends import dataframe_creation_dispatch from dask_expr._categorical import CategoricalAccessor, Categorize, GetCategories from dask_expr._concat import Concat from dask_expr._datetime import DatetimeAccessor @@ -1954,7 +1952,7 @@ def __getattr__(self, key): # Check if key is in columns if key # is not a normal attribute if key in self.expr._meta.columns: - return Series(self.expr[key]) + return new_collection(self.expr[key]) raise err except AttributeError: # Fall back to `BaseFrame.__getattr__` @@ -3073,17 +3071,11 @@ def __array__(self): def new_collection(expr): """Create new collection from an expr""" + from dask_expr._dispatch import get_collection_type meta = expr._meta expr._name # Ensure backend is imported - if is_dataframe_like(meta): - return DataFrame(expr) - elif is_series_like(meta): - return Series(expr) - elif is_index_like(meta): - return Index(expr) - else: - return Scalar(expr) + return get_collection_type(meta)(expr) def optimize(collection, fuse=True): @@ -3150,7 +3142,6 @@ def from_graph(*args, **kwargs): return new_collection(FromGraph(*args, **kwargs)) -@dataframe_creation_dispatch.register_inplace("pandas") def from_dict( data, npartitions, @@ -3227,7 +3218,6 @@ def from_dask_array(x, columns=None, index=None, meta=None): return from_dask_dataframe(df, optimize=True) -@dataframe_creation_dispatch.register_inplace("pandas") def read_csv( path, *args, @@ -3278,7 +3268,6 @@ def read_table( ) -@dataframe_creation_dispatch.register_inplace("pandas") def read_parquet( path=None, columns=None, From ab095a7db4cdf925cbf86aa052ada06ebe28c645 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Tue, 16 Jan 2024 12:29:04 -0800 Subject: [PATCH 06/13] add missing file --- dask_expr/_dispatch.py | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 dask_expr/_dispatch.py diff --git a/dask_expr/_dispatch.py b/dask_expr/_dispatch.py new file mode 100644 index 00000000..dee5178e --- /dev/null +++ b/dask_expr/_dispatch.py @@ -0,0 +1,5 @@ +from __future__ import annotations + +from dask.utils import Dispatch + +get_collection_type = Dispatch("get_collection_type") From cdea6d5a69d64363640cadadddb787d8712a7cb9 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Tue, 16 Jan 2024 12:58:45 -0800 Subject: [PATCH 07/13] make sure backend is imported --- dask_expr/_collection.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/dask_expr/_collection.py b/dask_expr/_collection.py index fab5d3a7..0a1fb7f6 100644 --- a/dask_expr/_collection.py +++ b/dask_expr/_collection.py @@ -3102,6 +3102,8 @@ def __array__(self): def new_collection(expr): """Create new collection from an expr""" + # Make sure "pandas" backend is imported + import dask_expr._backends # noqa: F401 from dask_expr._dispatch import get_collection_type meta = expr._meta From cde64ad647906f2e371c901469c3be61fa81b926 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Tue, 16 Jan 2024 14:33:39 -0800 Subject: [PATCH 08/13] register from_dict dispatch to enable testing --- dask_expr/_backends.py | 2 +- dask_expr/_collection.py | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/dask_expr/_backends.py b/dask_expr/_backends.py index 8529b2ea..70e0a71b 100644 --- a/dask_expr/_backends.py +++ b/dask_expr/_backends.py @@ -17,7 +17,7 @@ def dispatch(self, backend: str): try: impl = self._lookup[backend] except KeyError: - entrypoints = detect_entrypoints(f"dask-expr.{self._module_name}.backends") + entrypoints = detect_entrypoints(f"dask_expr.{self._module_name}.backends") if backend in entrypoints: return self.register_backend(backend, entrypoints[backend].load()()) else: diff --git a/dask_expr/_collection.py b/dask_expr/_collection.py index 0a1fb7f6..ebbefcfe 100644 --- a/dask_expr/_collection.py +++ b/dask_expr/_collection.py @@ -54,12 +54,15 @@ from pandas.api.types import is_timedelta64_dtype from tlz import first +import dask_expr._backends # noqa: F401 from dask_expr import _expr as expr from dask_expr._align import AlignPartitions +from dask_expr._backends import dataframe_creation_dispatch from dask_expr._categorical import CategoricalAccessor, Categorize, GetCategories from dask_expr._concat import Concat from dask_expr._datetime import DatetimeAccessor from dask_expr._describe import DescribeNonNumeric, DescribeNumeric +from dask_expr._dispatch import get_collection_type from dask_expr._expr import ( BFill, Diff, @@ -3102,10 +3105,6 @@ def __array__(self): def new_collection(expr): """Create new collection from an expr""" - # Make sure "pandas" backend is imported - import dask_expr._backends # noqa: F401 - from dask_expr._dispatch import get_collection_type - meta = expr._meta expr._name # Ensure backend is imported return get_collection_type(meta)(expr) @@ -3175,6 +3174,7 @@ def from_graph(*args, **kwargs): return new_collection(FromGraph(*args, **kwargs)) +@dataframe_creation_dispatch.register_inplace("pandas") def from_dict( data, npartitions, From 4521d7c2b534319ca9f14f2e169372bdf3245d6f Mon Sep 17 00:00:00 2001 From: rjzamora Date: Thu, 18 Jan 2024 09:54:16 -0800 Subject: [PATCH 09/13] remove DXCreationDispatch --- dask_expr/_backends.py | 22 ++-------------------- 1 file changed, 2 insertions(+), 20 deletions(-) diff --git a/dask_expr/_backends.py b/dask_expr/_backends.py index 70e0a71b..2a9b93e0 100644 --- a/dask_expr/_backends.py +++ b/dask_expr/_backends.py @@ -6,28 +6,10 @@ from dask_expr._dispatch import get_collection_type - -class DXCreationDispatch(CreationDispatch): - """Dask-expressions version of CreationDispatch""" - - # TODO Remove after https://github.com/dask/dask/pull/10794 - def dispatch(self, backend: str): - from dask.backends import detect_entrypoints - - try: - impl = self._lookup[backend] - except KeyError: - entrypoints = detect_entrypoints(f"dask_expr.{self._module_name}.backends") - if backend in entrypoints: - return self.register_backend(backend, entrypoints[backend].load()()) - else: - return impl - raise ValueError(f"No backend dispatch registered for {backend}") - - -dataframe_creation_dispatch = DXCreationDispatch( +dataframe_creation_dispatch = CreationDispatch( module_name="dataframe", default="pandas", + entrypoint_root="dask_expr", entrypoint_class=DataFrameBackendEntrypoint, name="dataframe_creation_dispatch", ) From 0694d39a4ab8a9db4739b3068ddca61733bfbd77 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Thu, 18 Jan 2024 10:07:11 -0800 Subject: [PATCH 10/13] roll back simplification for now --- dask_expr/_backends.py | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/dask_expr/_backends.py b/dask_expr/_backends.py index 2a9b93e0..a325c5ba 100644 --- a/dask_expr/_backends.py +++ b/dask_expr/_backends.py @@ -6,7 +6,26 @@ from dask_expr._dispatch import get_collection_type -dataframe_creation_dispatch = CreationDispatch( + +class DXCreationDispatch(CreationDispatch): + """Dask-expressions version of CreationDispatch""" + + # TODO Remove after https://github.com/dask/dask/pull/10794 + def dispatch(self, backend: str): + from dask.backends import detect_entrypoints + + try: + impl = self._lookup[backend] + except KeyError: + entrypoints = detect_entrypoints(f"dask_expr.{self._module_name}.backends") + if backend in entrypoints: + return self.register_backend(backend, entrypoints[backend].load()()) + else: + return impl + raise ValueError(f"No backend dispatch registered for {backend}") + + +dataframe_creation_dispatch = DXCreationDispatch( module_name="dataframe", default="pandas", entrypoint_root="dask_expr", From 4681adf0ab5ef51ea1d4b6abb21858ba012c377c Mon Sep 17 00:00:00 2001 From: rjzamora Date: Thu, 18 Jan 2024 10:09:24 -0800 Subject: [PATCH 11/13] roll back simplification for now (second attempt) --- dask_expr/_backends.py | 1 - 1 file changed, 1 deletion(-) diff --git a/dask_expr/_backends.py b/dask_expr/_backends.py index a325c5ba..70e0a71b 100644 --- a/dask_expr/_backends.py +++ b/dask_expr/_backends.py @@ -28,7 +28,6 @@ def dispatch(self, backend: str): dataframe_creation_dispatch = DXCreationDispatch( module_name="dataframe", default="pandas", - entrypoint_root="dask_expr", entrypoint_class=DataFrameBackendEntrypoint, name="dataframe_creation_dispatch", ) From aa54da8d6aa52e024eb04e18c86b374acc1a0214 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Thu, 18 Jan 2024 10:30:12 -0800 Subject: [PATCH 12/13] expose get_collection_type in a public place --- dask_expr/__init__.py | 1 + 1 file changed, 1 insertion(+) diff --git a/dask_expr/__init__.py b/dask_expr/__init__.py index 55da24c2..0eebc0d1 100644 --- a/dask_expr/__init__.py +++ b/dask_expr/__init__.py @@ -1,5 +1,6 @@ from dask_expr import _version, datasets from dask_expr._collection import * +from dask_expr._dispatch import get_collection_type from dask_expr._dummies import get_dummies from dask_expr.io._delayed import from_delayed from dask_expr.io.bag import to_bag From d4319ecc0c93a6a2c514b86993ad312f8f1dd620 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Fri, 19 Jan 2024 06:20:35 -0800 Subject: [PATCH 13/13] remove DXCreationDispatch again --- dask_expr/_backends.py | 22 ++-------------------- 1 file changed, 2 insertions(+), 20 deletions(-) diff --git a/dask_expr/_backends.py b/dask_expr/_backends.py index 70e0a71b..2a9b93e0 100644 --- a/dask_expr/_backends.py +++ b/dask_expr/_backends.py @@ -6,28 +6,10 @@ from dask_expr._dispatch import get_collection_type - -class DXCreationDispatch(CreationDispatch): - """Dask-expressions version of CreationDispatch""" - - # TODO Remove after https://github.com/dask/dask/pull/10794 - def dispatch(self, backend: str): - from dask.backends import detect_entrypoints - - try: - impl = self._lookup[backend] - except KeyError: - entrypoints = detect_entrypoints(f"dask_expr.{self._module_name}.backends") - if backend in entrypoints: - return self.register_backend(backend, entrypoints[backend].load()()) - else: - return impl - raise ValueError(f"No backend dispatch registered for {backend}") - - -dataframe_creation_dispatch = DXCreationDispatch( +dataframe_creation_dispatch = CreationDispatch( module_name="dataframe", default="pandas", + entrypoint_root="dask_expr", entrypoint_class=DataFrameBackendEntrypoint, name="dataframe_creation_dispatch", )