
Notebook crashes after calling .to_dask_dataframe #6811

Closed · 4 tasks done
lewfish opened this issue Jul 19, 2022 · 8 comments · Fixed by #7472
Comments

@lewfish

lewfish commented Jul 19, 2022

What happened?

We are trying to convert a 17 GB Zarr dataset to Parquet with xarray by calling ds.to_dask_dataframe() and then ddf.to_parquet(). When we call to_dask_dataframe the notebook crashes with "Kernel Restarting: The kernel for debug/minimal.ipynb appears to have died. It will restart automatically." The same thing happens with a synthetic dataset of the same size, which we create in the example below.
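For context, a sketch of the pipeline we are attempting (the paths here are placeholders, not our real data):

import xarray as xr

ds = xr.open_zarr("data.zarr")      # the ~17 GB Zarr store (placeholder path)
ddf = ds.to_dask_dataframe()        # the notebook kernel dies here
ddf.to_parquet("data.parquet")      # never reached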

What did you expect to happen?

We expected a Dask DataFrame object to be created lazily, without crashing the notebook. We expected the operation to be lazy based on the source code and on the following line in the docs: "For datasets containing dask arrays where the data should be lazily loaded, see the Dataset.to_dask_dataframe() method."

Minimal Complete Verifiable Example

import dask.array as da
import xarray as xr
import numpy as np

chunks = 5000
dim1_sz = 100_000
dim2_sz = 100_000

# Does not crash when using the following smaller constants:
# dim1_sz = 10_000
# dim2_sz = 10_000

ds = xr.Dataset({
    'x': xr.DataArray(
        data   = da.random.random((dim1_sz, dim2_sz), chunks=chunks),
        dims   = ['dim1', 'dim2'],
        coords = {'dim1': np.arange(0, dim1_sz), 'dim2': np.arange(0, dim2_sz)})})

df = ds.to_dask_dataframe()
df

MVCE confirmation

  • Minimal example — the example is as focused as reasonably possible to demonstrate the underlying issue in xarray.
  • Complete example — the example is self-contained, including all data and the text of any traceback.
  • Verifiable example — the example copy & pastes into an IPython prompt or Binder notebook, returning the result.
  • New issue — a search of GitHub Issues suggests this is not a duplicate.

Relevant log output

No response

Anything else we need to know?

This operation crashes when the size of the array is above some (presumably machine-specific) threshold, and works below it. You may need to play with the array size to replicate this behavior.
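As a rough way to gauge where that threshold sits (a purely illustrative check, assuming the ds from the example above), the lazy dataset already reports how large the materialised data would be:

print(ds.nbytes / 1e9)          # ~80 GB for the 100_000 x 100_000 float64 variable
print(ds.x.data.nbytes / 1e9)   # same figure, reported by the underlying dask array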

Environment

INSTALLED VERSIONS

commit: None
python: 3.9.12 | packaged by conda-forge | (main, Mar 24 2022, 23:25:59)
[GCC 10.3.0]
python-bits: 64
OS: Linux
OS-release: 5.4.196-108.356.amzn2.x86_64
machine: x86_64
processor: x86_64
byteorder: little
LC_ALL: C.UTF-8
LANG: C.UTF-8
LOCALE: ('en_US', 'UTF-8')
libhdf5: None
libnetcdf: None

xarray: 2022.3.0
pandas: 1.4.2
numpy: 1.22.3
scipy: None
netCDF4: None
pydap: None
h5netcdf: None
h5py: None
Nio: None
zarr: 2.12.0
cftime: None
nc_time_axis: None
PseudoNetCDF: None
rasterio: None
cfgrib: None
iris: None
bottleneck: None
dask: 2022.05.0
distributed: 2022.5.0
matplotlib: 3.5.2
cartopy: None
seaborn: None
numbagg: None
fsspec: 2021.11.0
cupy: None
pint: None
sparse: None
setuptools: 62.3.1
pip: 22.1
conda: None
pytest: None
IPython: 8.3.0
sphinx: None

@lewfish added the bug and needs triage labels on Jul 19, 2022
@vlulla

vlulla commented Jul 28, 2022

I ran this script in an IPython session on an r5a.4xlarge instance, and it completed successfully. While the script was running I had another window with htop open and was surprised to see that at one point memory usage was quite high (about 75 GB). After the script completed and the IPython REPL prompt was usable again, memory usage was back down to 4 GB (per htop), and I was able to access the dataframe df and run some computations on it (df.head and df.groupby).

It appears that I'm mistaken in my understanding of xarray. I thought xarray creates the Dataset/DataArray lazily, but the briefly high memory usage suggests that is not the case. This raises two more questions: why does memory usage drop back to 4 GB (per htop) when the script finishes, and how can I access all these values in my eventual calculations? For instance, running df.loc[df.dim1 < 1000, ['dim1','x']].groupby('x').mean().compute() peaks at about 25 GB of memory (as reported by htop)!

I have been unable to find answers to these questions in the documentation. Can you please point me to docs (user or developer) that can help me clear up my misunderstandings?

Thanks in advance!

@dcherian (Contributor)

You can check whether it computes using the following

from xarray.tests import raise_if_dask_computes

with raise_if_dask_computes():
    ds.to_dask_dataframe()

Does that raise an error?

@dcherian added the topic-dask label and removed the bug label on Jul 28, 2022
@vlulla

vlulla commented Jul 28, 2022

No, I don't think this raised an error. This is what I see in my IPython session (from running ipython -i tst.py):

In [1]: from xarray.tests import raise_if_dask_computes
   ...:
   ...: with raise_if_dask_computes():
   ...:     ds.to_dask_dataframe()
   ...:
/home/ubuntu/mambaforge/lib/python3.10/site-packages/IPython/core/interactiveshell.py:3338: PerformanceWarning: Reshaping is producing a large chunk. To accept the large
chunk and silence this warning, set the option
    >>> with dask.config.set(**{'array.slicing.split_large_chunks': False}):
    ...     array.reshape(shape)

To avoid creating the large chunks, set the option
    >>> with dask.config.set(**{'array.slicing.split_large_chunks': True}):
    ...     array.reshape(shape)
Explictly passing ``limit`` to ``reshape`` will also silence this warning
    >>> array.reshape(shape, limit='128 MiB')
  if await self.run_code(code, result, async_=asy):

(the same PerformanceWarning is emitted twice more)

In [2]:

@dcherian (Contributor)

Well then it isn't computing.

Depending on the shape of your DataArray, there's potentially a reshape involved, which can be expensive in parallel: https://docs.dask.org/en/stable/array-chunks.html#reshaping. It does manifest as large memory usage.
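As a minimal sketch of what that looks like, assuming the chunking from the original example, the reshape to a 1-D column can be reproduced directly with dask and may emit the same PerformanceWarning seen above:

import dask.array as da

x = da.random.random((100_000, 100_000), chunks=5000)
y = x.reshape(-1)          # to_dask_dataframe needs each variable as a 1-D column
print(len(y.chunks[0]))    # number of chunks after the reshape/rechunk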

@vlulla

vlulla commented Jul 28, 2022

Hmm... I read it! If it (what is the "it" here, dask or xarray?) isn't computing, then what are the values for x that I get from df.head()/df.tail()?

I tried

ds = xr.Dataset({
    'x': xr.DataArray(
        data   = da.random.random((dim1_sz, dim2_sz), chunks='auto'),
        dims   = ['dim1', 'dim2'],
        coords = {'dim1': np.arange(0, dim1_sz), 'dim2': np.arange(0, dim2_sz)})})

which chose a chunk size of (4000, 4000), and ds.to_dask_dataframe() still used about 75 GB of RAM during part of the computation. So it appears that converting a large Dataset/DataArray into a dask dataframe requires a scheduler (assuming the "it" above is dask) with a sizeable amount of RAM. Is that correct?
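A minimal sketch, assuming the imports and dim sizes from the original example, of how to inspect what chunks='auto' actually picked and how big each chunk is before converting:

x = da.random.random((dim1_sz, dim2_sz), chunks='auto')
print(x.chunksize)                                                # e.g. (4000, 4000)
print(x.chunksize[0] * x.chunksize[1] * 8 / 1e6, "MB per chunk")  # ~128 MB per float64 chunk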

@lewfish (Author)

lewfish commented Aug 3, 2022

Using chunks of shape (4000, 4000) isn't very different from what we were using originally (10_000, 10_000), so I'm not surprised the results are the same. After reading https://docs.dask.org/en/stable/array-chunks.html#reshaping I thought we could avoid the problem described there by making each chunk span the entire width of the array with chunks=(100, -1). That didn't help either.

@dcherian (Contributor)

> I thought we could avoid the problem they discuss by making the chunks take up the entire width of the array using chunks=(100, -1)

This is surprising. The reshape is the expensive step but with this chunking it should be operating blockwise.
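A minimal sketch of what I mean by blockwise, assuming the same array shape as the example: with chunks that span the full trailing dimension, each 2-D chunk maps to exactly one 1-D chunk in the reshape, so no data has to move between chunks.

import dask.array as da

x = da.random.random((100_000, 100_000), chunks=(100, -1))
y = x.reshape(-1)
print(len(x.chunks[0]), len(y.chunks[0]))   # 1000 input chunks -> 1000 output chunks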

@BasileGoussard

BasileGoussard commented Dec 9, 2022

We mainly want to do the same thing as @lewfish.
We have a TIFF file that we want to save as Parquet (in order to push it to BigQuery).
However, I hit the same issue when running the following code:

path = 'big_file.tif' # around 8 GB

# Open the dataset
dst = xr.open_dataset(path, engine = 'rasterio', chunks={"band": -1, "x": 'auto', "y": 'auto'})

# Transform to a dask dataframe
dd = dst.to_dask_dataframe()

(I tried lots of different chunk settings ...)
And it crashes (I reach 90 GB of RAM in htop).
Looking forward to some advice.
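For reference, the final step we intend to run once the conversion works (the output path is a placeholder):

# dask writes one Parquet file per partition into this directory, which BigQuery can then load.
dd.to_parquet("big_file_parquet/")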

@dcherian removed the needs triage label on Jan 26, 2023