-
-
Notifications
You must be signed in to change notification settings - Fork 27
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add basic backend dispatching to dask-expr #728
Conversation
…to creation-dispatch
@phofl - I know you are busy with many things, but I'd be interested to know if you have any thoughts/concerns about this PR. My hope is that there is nothing controversial here: We are reusing the existing dispatching mechanisms and conventions from |
dask_expr/_backends.py
Outdated
try: | ||
impl = self._lookup[backend] | ||
except KeyError: | ||
entrypoints = detect_entrypoints(f"dask_expr.{self._module_name}.backends") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The only reason we cannot use CreationDispatch
directly is the fact that dask_cudf
is already using the "dask.dataframe.backends"
path to expose the "cudf"
backend entrypoint for dask.dataframe
.
We can just use CreationDispatch
directly after dask/dask#10794 makes it possible to modify the entrypoint path upon initialization.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay, removed this now that dask/dask#10794 is in.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Scratch that - I should probably wait for the next dask/dask release to remove DXCreationDispatch
dask_expr/_backends.py
Outdated
dataframe_creation_dispatch = DXCreationDispatch( | ||
module_name="dataframe", | ||
default="pandas", | ||
entrypoint_class=DataFrameBackendEntrypoint, | ||
name="dataframe_creation_dispatch", | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note that we want to introduce a new CreationDispatch
object for dask-expr, because we want to distinguish "legacy" dask-dataframe creation functions from newer expression-based creation functions (to be discovered at the "dask_expr.dataframe.backend"
entrypoint path instead of "dask.dataframe.backend"
).
In both legacy and expr-based code, the same "dask.dataframe.backend"
config value is used to specify the default backend.
def new_collection(expr): | ||
"""Create new collection from an expr""" | ||
|
||
meta = expr._meta | ||
expr._name # Ensure backend is imported | ||
if is_dataframe_like(meta): | ||
return DataFrame(expr) | ||
elif is_series_like(meta): | ||
return Series(expr) | ||
elif is_index_like(meta): | ||
return Index(expr) | ||
else: | ||
return Scalar(expr) | ||
return get_collection_type(meta)(expr) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note that we have been using new_collection
from the beginning, because we knew we needed to make this change to support other backends (i.e. cudf) long term.
The behavior of new_collection
is now consistent with the behavior of new_dd_object
in dask.dataframe
.
|
||
from dask.utils import Dispatch | ||
|
||
get_collection_type = Dispatch("get_collection_type") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we will need many dispatch functions here. So, we could also define this in dask.datafame.dispatch
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is now publicly exposed as dask_expr.get_collection_type
. This can always be moved to dask.datafame.dispatch
in the future.
I've disabled released ci because we enabled a couple of other things that aren't in released dask yet. You can remove everything that depends on released here and wouldn't be necessary with main now. I don't intend to release before the next dask release anyway, so testing against the released version isn't very important |
Awesome. Thanks! |
thx |
Mostly addresses #15027 dask/dask-expr#728 exposed the necessary mechanisms for us to define a custom dask-expr backend for `cudf`. The new dispatching mechanisms are effectively the same as those in `dask.dataframe`. The only difference is that we are now registering/implementing "expression-based" collections. This PR does the following: - Defines a basic `DataFrameBackendEntrypoint` class for collection creation, and registers new collections using `get_collection_type`. - Refactors the `dask_cudf` import structure to properly support the `"dataframe.query-planning"` configuration. - Modifies CI to test dask-expr support for some of the `dask_cudf` tests. This coverage can be expanded in follow-up work. ~**Experimental Change**: This PR patches `dask_expr._expr.Expr.__new__` to enable type-based dispatching. This effectively allows us to surgically replace problematic `Expr` subclasses that do not work for cudf-backed data. For example, this PR replaces the upstream `TakeLast` expression to avoid using `squeeze` (since this method is not supported by cudf). This particular fix can be moved upstream relatively easily. However, having this kind of "patching" mechanism may be valuable for more complicated pandas/cudf discrepancies.~ ## Usage example ```python from dask import config config.set({"dataframe.query-planning": True}) import dask_cudf df = dask_cudf.DataFrame.from_dict( {"x": range(100), "y": [1, 2, 3, 4] * 25, "z": ["1", "2"] * 50}, npartitions=10, ) df["y2"] = df["x"] + df["y"] agg = df.groupby("y").agg({"y2": "mean"})["y2"] agg.simplify().pprint() ``` Dask cuDF should now be using dask-expr for "query planning": ``` Projection: columns='y2' GroupbyAggregation: arg={'y2': 'mean'} observed=True split_out=1'y' Assign: y2= Projection: columns=['y'] FromPandas: frame='<dataframe>' npartitions=10 columns=['x', 'y'] Add: Projection: columns='x' FromPandas: frame='<dataframe>' npartitions=10 columns=['x', 'y'] Projection: columns='y' FromPandas: frame='<dataframe>' npartitions=10 columns=['x', 'y'] ``` ## TODO - [x] Add basic tests - [x] Confirm that general design makes sense **Follow Up Work**: - Expand dask-expr test coverage - Fix local and upstream bugs - Add documentation once "critical mass" is reached Authors: - Richard (Rick) Zamora (https://github.com/rjzamora) - Lawrence Mitchell (https://github.com/wence-) - Vyas Ramasubramani (https://github.com/vyasr) - Bradley Dice (https://github.com/bdice) Approvers: - Lawrence Mitchell (https://github.com/wence-) - Ray Douglass (https://github.com/raydouglass) URL: #14805
It seems like we will temporarily need to allow external libraries (i.e. cudf) to register a
DaskBackendEntrypoint
for bothdask.dataframe
anddask_expr
. I think the easiest approach is to let cudf use a"dask-expr.dataframe.backend"
entrypoint path for dask-expr (and continue using"dask.dataframe.backend"
in the legacydask.dataframe
API).This PR also introduces a simple
get_collection_type
dispatch function (the dask-expr version ofget_parallel_type
). This completely breaks cudf support for now, but dask-expr + cudf is already broken, and this is probably the only way to fix it anyway.NOTE: This PR does not introduce any new dispatching mechanisms (like #321). It just adds enough code to support the existing dispatching mechanisms used in
dask.dataframe
.