Skip to content

Commit

Permalink
Dask-CuDF: use default Dask Dataframe optimizer (#8581)
Browse files Browse the repository at this point in the history
In order to use the new HighLevelGraph optimization work in Dask/Distributed, this PR makes `dask_cudf.Dataframes` use the default Dask optimizer.
Previously, we have been explicitly materialized the HighLevelGraphs when calling `submit()` and `compute()` on `dask_cudf.Dataframes`.

Overall, this should improve performance but by default low-level task optimizations are disabled, which _might_ have a negative impact. High-level optimizations are done in any case and we are working on moving all low-level optimization to high-level but currently low-level optimization such as array slicing is only supported by the low-level. 

I don't think we will be missing any low-level optimizations related to Dataframes so I think we should follow Dask on this one and disable low-level optimizations by default.
It is possible to enable low-level optimizations explicitly by setting the Dask config like:
```python
dask.config.set({"optimization.fuse.active": True})
```

cc. @jakirkham, @quasiben, @beckernick, @VibhuJawa

Authors:
  - Mads R. B. Kristensen (https://github.com/madsbk)

Approvers:
  - https://github.com/jakirkham

URL: #8581
  • Loading branch information
madsbk authored Jun 23, 2021
1 parent 671b76d commit 99808ab
Showing 1 changed file with 1 addition and 17 deletions.
18 changes: 1 addition & 17 deletions python/dask_cudf/dask_cudf/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,9 @@
from tlz import partition_all

import dask
import dask.dataframe.optimize
from dask import dataframe as dd
from dask.base import normalize_token, tokenize
from dask.context import _globals
from dask.core import flatten
from dask.dataframe.core import (
Scalar,
finalize,
Expand All @@ -22,7 +21,6 @@
)
from dask.dataframe.utils import raise_on_meta_error
from dask.highlevelgraph import HighLevelGraph
from dask.optimization import cull, fuse
from dask.utils import M, OperatorMethodMixin, apply, derived_from, funcname

import cudf
Expand All @@ -34,19 +32,6 @@
DASK_VERSION = LooseVersion(dask.__version__)


def optimize(dsk, keys, **kwargs):
flatkeys = list(flatten(keys)) if isinstance(keys, list) else [keys]
dsk, dependencies = cull(dsk, flatkeys)
dsk, dependencies = fuse(
dsk,
keys,
dependencies=dependencies,
ave_width=_globals.get("fuse_ave_width", 1),
)
dsk, _ = cull(dsk, keys)
return dsk


class _Frame(dd.core._Frame, OperatorMethodMixin):
""" Superclass for DataFrame and Series
Expand All @@ -65,7 +50,6 @@ class _Frame(dd.core._Frame, OperatorMethodMixin):
"""

__dask_scheduler__ = staticmethod(dask.get)
__dask_optimize__ = staticmethod(optimize)

def __dask_postcompute__(self):
return finalize, ()
Expand Down

0 comments on commit 99808ab

Please sign in to comment.