
Avoid in-memory broadcasting when converting to_dask_dataframe #7472

Merged: 8 commits, Jan 26, 2023
31 changes: 30 additions & 1 deletion in xarray/core/dataset.py

@@ -6389,6 +6389,28 @@ def to_dask_dataframe(

import dask.array as da
import dask.dataframe as dd
from dask.base import tokenize
from dask.core import flatten
from dask.highlevelgraph import HighLevelGraph

def ravel_chunks(arr: da.Array) -> da.Array:
    """
    Ravel each chunk of ``arr`` independently and concatenate the results.

    Unlike ``arr.ravel()`` / ``arr.reshape(-1)``, this does not rechunk
    to preserve C order, so it never creates large intermediate chunks.

    https://github.com/dask/dask/issues/4855

    """

    name = "ravel_chunks-" + tokenize(arr)
    # One 1-D output chunk per input block; its length is the number of
    # elements in that block (the product of the block's chunk sizes).
    chunks = (tuple(map(math.prod, itertools.product(*arr.chunks))),)
    # Each task ravels the corresponding input block, in block order.
    dsk = {
        (name, i): (methodcaller("ravel"), k)
        for i, k in enumerate(flatten(arr.__dask_keys__()))
    }
    graph = HighLevelGraph.from_collections(name, dsk, dependencies=[arr])

    return da.Array(graph, name, chunks, arr.dtype)

ordered_dims = self._normalize_dim_order(dim_order=dim_order)

@@ -6410,8 +6432,15 @@ def to_dask_dataframe(
if isinstance(var, IndexVariable):
    var = var.to_base_variable()

# Make sure var is a dask array, otherwise the array can become too
# large when it is broadcast to several dimensions:
if var.chunks is None:
    var = var.chunk()

dask_array = var.set_dims(ordered_dims).chunk(self.chunks).data
series = dd.from_array(dask_array.reshape(-1), columns=[name])  # (removed)
dask_array_raveled = ravel_chunks(dask_array)  # (added, replaces the line above)
Contributor:

Suggested change (delete this line):
    dask_array_raveled = ravel_chunks(dask_array)
Unfortunately we can't do this, at least not by default.

We could ask dask to add this behaviour as an opt-in kwarg for dask.dataframe.from_array.

Contributor Author:

How come?

If we go back to using .reshape(-1) or .ravel(), we will continue getting this warning:

    PerformanceWarning: Reshaping is producing a large chunk. To accept the large
    chunk and silence this warning, set the option
        with dask.config.set(**{'array.slicing.split_large_chunks': False}):
            array.reshape(shape)
    To avoid creating the large chunks, set the option
        with dask.config.set(**{'array.slicing.split_large_chunks': True}):
            array.reshape(shape)
    Explicitly passing ``limit`` to ``reshape`` will also silence this warning:
        array.reshape(shape, limit='128 MiB')
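
As the warning says, a user can keep the reshape behaviour and silence the warning with dask's config context manager. A minimal standalone sketch (the dataset contents and chunking here are illustrative, not from the PR):

    import dask
    import numpy as np
    import xarray as xr

    ds = xr.Dataset(
        {"a": (("x", "y"), np.arange(16.0).reshape(4, 4))}
    ).chunk({"x": 2})

    # Accept the large chunk that reshape(-1) produces and silence
    # the PerformanceWarning:
    with dask.config.set(**{"array.slicing.split_large_chunks": False}):
        df = ds.to_dask_dataframe()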

Contributor:

reshape/ravel have an implied (C) order. With this change, the ordering of rows in the output dataframe depends on the chunking of the input array, which would be confusing as default behaviour.

I think the warning is fine. Users can override with the dask context manager as suggested in the warning.
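
A minimal standalone sketch of that ordering difference (array contents and chunking chosen for illustration):

    import numpy as np
    import dask.array as da

    # A 2x4 array split into two 2x2 blocks.
    x = da.from_array(np.arange(8).reshape(2, 4), chunks=(2, 2))

    print(x.reshape(-1).compute())
    # [0 1 2 3 4 5 6 7]  (C order, independent of the chunking)

    # Raveling block-by-block, as ravel_chunks does:
    blockwise = np.concatenate(
        [x.blocks[i, j].compute().ravel()
         for i in range(x.numblocks[0])
         for j in range(x.numblocks[1])]
    )
    print(blockwise)
    # [0 1 4 5 2 3 6 7]  (row order now depends on the chunks)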

Contributor Author:

Ok, I'll undo it; it wasn't necessary for the real fix anyway.

For comparison, here's the task graph from df.visualize():

With reshape: [graph image]

With reshape under {'array.slicing.split_large_chunks': True}: [graph image]

With ravel_chunks: [graph image]
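
For anyone reproducing the comparison: the graphs come from dask's built-in graph rendering, which requires graphviz to be installed. A sketch:

    # df is the dask dataframe returned by ds.to_dask_dataframe()
    df.visualize(filename="graph.png")  # writes the task graph to disk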

Contributor:

Yup, it's a big improvement. If you're dying to add it someplace, polyfit would be a good candidate (and a very impactful PR).

series = dd.from_array(dask_array_raveled, columns=[name])

series_list.append(series)

df = dd.concat(series_list, axis=1)
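
To see the effect of the overall fix, a minimal standalone sketch (sizes, names, and chunking are illustrative only):

    import numpy as np
    import dask.array as da
    import xarray as xr

    # "data" is lazy, but the coordinate "x" is a small numpy array that
    # must be broadcast against "y" to build the dataframe columns.
    ds = xr.Dataset(
        {"data": (("x", "y"), da.zeros((50_000, 50_000), chunks=5_000))},
        coords={"x": np.arange(50_000)},
    )

    # With the chunk-before-broadcast fix above, "x" becomes a dask array
    # before set_dims, so its ~20 GB broadcast to ("x", "y") stays lazy
    # instead of being materialized in memory up front.
    df = ds.to_dask_dataframe()
    print(df.npartitions)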