Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

If a NetCDF file is chunked on disk, open it with compatible dask chunks #1440

Closed
Zac-HD opened this issue Jun 3, 2017 · 26 comments · Fixed by #7948
Closed

If a NetCDF file is chunked on disk, open it with compatible dask chunks #1440

Zac-HD opened this issue Jun 3, 2017 · 26 comments · Fixed by #7948

Comments

@Zac-HD
Copy link
Contributor

Zac-HD commented Jun 3, 2017

NetCDF4 data can be saved as chunks on disk, which has several benefits including efficient reads when using a compatible chunk shape. This is particularly important for files with chunk-based compression (ie all nc4 files with compression) or on HPC and parallel file systems (eg), where IO is typically dominated by the number of reads and chunks-from-disk are often cached. Caches are also common in network data backends such as Thredds OPeNDAP, in which case using disk-compatible chunks will reduce cache pressure as well as latency.

Xarray can use chunks, of course, but as of v0.9 the chunk size has to be specified manually - and the easiest way to discover it is to open the file and look at the _Chunksizes attribute for each variable. I propose that xr.open_dataset (and array, and mfdataset) change their default behaviour.

If Dask is available and chunks=None (the default), chunks should be taken from the file on disk. This may lead to a chunked or unchunked dataset. To force an un-chunked load, users can specify chunks={}, or simple .load() the dataset after opening it.

@shoyer
Copy link
Member

shoyer commented Jun 4, 2017

My main concern is that netCDF4 chunk sizes (e.g., ~10-100KB in that blog post) are often much smaller than well sized dask chunks (10-100MB, per the Dask FAQ).

I do think it would be appropriate to issue a warning if you are making dask chunks that don't line up nicely with chunks on disk to avoid performance issues (in general each chunk on disk should usually end up on only one chunk in dask), but there are lots of options for aggregating to larger chunks and it's hard to choose the best way to do that without knowing how the data will be used.

@jhamman
Copy link
Member

jhamman commented Jun 6, 2017

I'd certainly support a warning when dask chunks do not align with the on-disk chunks.

Beyond that, I think we could work on a utility for automatically determining chunks sizes for xarray using some heuristics. Before we go there though, I think we really should develop some performance benchmarks. We're starting to get a lot of questions/issues about performance and it seems like we need some benchmarking to happen before we can really start fixing the underlying issues.

@shoyer
Copy link
Member

shoyer commented Jun 6, 2017

I think its unavoidable that users understand how their data will be processed (e.g., whether operations will be mapped over time or space). But maybe some sort of heuristics (if not a fully automated solution) are possible.

For example, maybe chunks={'time'} (note the set rather than a dict) could indicate "divide me into automatically chosen chunks over the time dimension". It's still explicit about how chunking is being done, but comes closer to expressing the intent rather than the details.

@Zac-HD
Copy link
Contributor Author

Zac-HD commented Jun 7, 2017

I'd certainly support a warning when dask chunks do not align with the on-disk chunks.

This sounds like a very good idea to me 👍

I think its unavoidable that users understand how their data will be processed (e.g., whether operations will be mapped over time or space). But maybe some sort of heuristics (if not a fully automated solution) are possible.

I think that depends on the size of the data - a very common workflow in our group is to open some national-scale collection, select a small (MB to low GB) section, and proceed with that. At this scale we only use chunks because many of the input files are larger than memory, and shape is basically irrelevant -
chunks avoid loading anything until after selecting the subset (I think this is related to #1396).

It's certainly good to know the main processing dimensions though, and user-guided chunk selection heuristics could take us a long way - I actually think a dimension hint and good heuristics are likely to perform better than most users (who are not experts and have not profiled their performance).

The set notation is also very elegant, but I wonder about the interpretation. With chunks=, I specify how to break up the data - and any omitted dimensions are not chunked. For the hint, I'd expect to express which dimension(s) to keep - ie {'lat', lon'} should indicate that my analysis is mostly spatial, rather than mostly not. Maybe we could use a string (eg time for timeseries or lat lon for spatial) instead of a set to specify large chunk dimensions?

@JanisGailis
Copy link

We had a similar issue some time ago. We use xr.open_mfdataset to open long time series of data, where each time slice is a single file. In this case each file becomes a single dask chunk, which is appropriate for most data we have to work with (ESA CCI datasets).

We encountered a problem, however, with a few datasets that had very significant compression levels, such that a single file would fit in memory, but not a few of them, on a consumer-ish laptop. So, the machine would quickly run out of memory when working with the opened dataset.

As we have to be able to open 'automatically' all ESA CCI datasets, manually denoting the chunk sizes was not an option, so we explored a few ways how to do this. Aligning the chunk sizes with NetCDF chunking was not a great idea because of the reason shoyer mentions above. The chunk sizes for some datasets would be too small and the bottleneck moves from memory consumption to the amount of read/write operations.

We eventually figured (with help from shoyer :)) that the chunks should be small enough to fit in memory on an average user's laptop. yet as big as possible to maximize the amount of NetCDF chunks falling nicely in the dask chunk. Also, shape of the dask chunk can be of importance to maximize the amount of NetCDF chunks falling nicely in. We figured it's a good guess to divide both lat and lon dimensions by the same divisor, as that's also how NetCDF is often chunked.

So, we open the first file, determine it's 'uncompressed' size and then figure out if we should chunk it as 1, 2x2, 3x3, etc. It's far from a perfect solution, but it works in our case. Here's how we have implemented this:
https://github.com/CCI-Tools/cate-core/blob/master/cate/core/ds.py#L506

@Zac-HD
Copy link
Contributor Author

Zac-HD commented Jun 8, 2017

I love a real-world example 😄 This sounds pretty similar to how I'm thinking of doing it, with a few caveats - mostly that cate assumes that the data is 3D, has lat and lon, single time step, spatial dimensions wholly divisible some small N. Obviously this is fine for CCI data, but not generally true of things Xarray might open.

Taking a step back for a moment, chunks are great for avoiding out-of-memory errors, faster processing of reorderable operations, and efficient indexing. The overhead is not great when data is small or chunks are small, it's bad when a single on-disk chunk is on multiple dask chunks, and very bad when a dask chunk includes several files. (of course all of these are generalisations with pathological cases, but IMO good enough to build some heuristics on)

With that in mind, here's how I'd decide whether to use the heuristic:

  • If chunks is a dict, never use heuristic (always use explicit user chunks)
  • If chunks is a hint, eg set or list as discussed above, or a later proposal, always use heuristic mode - guided by the hint, of course. Files which may otherwise default to non-heuristic or non-chunking mode (eg in mfdataset) could use eg. the empty set to activate the heuristics without hints.
  • If chunks is None, and the uncompressed data is above a size threshold (eg 500MB, 1GB), use chunks given by the heuristic

Having decided to use a heuristic, we know the array shape and dimensions, the chunk shape if any, and the hint if any:

  • Start by selecting a maximum nbytes for the chunk, eg 250 MB
  • If the total array nbytes is <= max_nbytes, use a single chunk for the whole thing; return
  • If the array is stored as (a, b, ...) chunks on disk, our dask chunks must be (m.a, n.b, ...), ie each dimension has some independent integer multiple.
    • Loop over dimensions not to chunk (per above, either those not in the set, or those in a string), adding one to the respective multiplier. Alternatively if the file is now five or less dask chunks across, simply divide the on-disk chunks into 4, 3, 2, or 1 dask chunks (avoiding a dimension of 1.1 chunks, etc)
    • For each step, if this would make the chunk larger than max_nbytes, return.
    • Repeat the loop for remaining dimensions.
  • If the array is not chunked on disk, increase a divisor for each dimension to chunk until the chunk size is <= max_nbytes and return.

It's probably a good idea to constrain this further, so that the ratio of chunk edge length along dimensions should not exceed the greater of 100:1 or four times the ratio of chunks on disk (I don't have universal profiling to back this up, but it's always worked well for me). This will mitigate the potentially-very-large effects of dimension order, especially in unchunked files or large chunks.

For datasets (as opposed to arrays), I'd calculate chunks once for the largest dtype and just reuse that shape.

@JanisGailis
Copy link

I quite like the approach you're suggesting! What I dislike the most currently with our approach is that it is a real possibility that a single netCDF chunk falls into multiple dask chunks, we don't control for that in any way! I'd happily swap our approach out to the more general one you suggest.

This does of course beg for input regarding the API constraints, as in, would it be a good idea to add more kwargs for chunk size threshold and edge ratio to open functions?

@Zac-HD
Copy link
Contributor Author

Zac-HD commented Jun 8, 2017

🎉

My view is actually that anyone who can beat the default heuristic should just specify their chunks - you'd already need a good sense for the data and your computation (and the heuristic!). IMO, the few cases where tuning is desirable - but manual chunks are impractical - don't justify adding yet another kwarg to the fairly busy interfaces.

@matt-long
Copy link

I have encountered a related issue here. When I read a file with netCDF4 compression into a Dataset, a subsequent call to write the dataset using to_netcdf fails.

For instance, using data from the POP model, I can convert output to netCDF4 using NCO

$ ncks --netcdf4 --deflate 1 $file nc4-test.nc

Then in Python:

ds = xr.open_dataset('nc4-test.nc',decode_times=False,decode_coords=False)
ds.to_netcdf('test-out.nc')

The write fails with: "RuntimeError: NetCDF: Bad chunk sizes."

If I include format = NETCDF3_64BIT, the write completes.

This seems like a bug.

@jhamman
Copy link
Member

jhamman commented Jun 15, 2017

@Zac-HD - I'm about to put up a PR with some initial benchmarking functionality (#1457). Are you open to putting together PR for the features you've described above? Hopefully, these two can work together.

As for the API changes related to this issue, I'd propose the following:

Use the chunks keyword to support 3 additional options

def open_dataset(filename_or_obj, ..., chunks=None, ...):
    """Load and decode a dataset from a file or file-like object.
    Parameters
    ----------
     ....
    chunks : int or dict or set or 'auto' or 'disk', optional
        If chunks is provided, it used to load the new dataset into dask
        arrays. ``chunks={}`` loads the dataset with dask using a single
        chunk for all arrays.
    ...
    """ 
  • int: chunk each dimension by chunks
  • dict: Dictionary with keys given by dimension names and values given by chunk sizes. In general, these should divide the dimensions of each dataset
  • set (or list or tuple) of str: chunk the dimension(s) provided by some heuristic, try to keep the chunk shape/size compatible with the storage of the data on disk and for use with dask
  • 'auto' (str): chunk the array(s) using some auto-magical heuristic that is compatible with the storage of the data on disk and is semi-optimized (in size) for use with dask
  • 'disk' (str): use the chunksize of the netCDF variable directly.

@Zac-HD
Copy link
Contributor Author

Zac-HD commented Jun 16, 2017

@matt-long, I think that's a separate issue. Please open a new pull request, including a link to data that will let us reproduce the problem.

@jhamman - [updated] I was always keen to work on this if I could make time, but have since changed jobs. However I'd still be happy to help anyone who wants to work on it with design and review.

I definitely want to preserve the exact present semantics of dict arguments (so users have exact control, with a warning if it's incompatible with disk chunks). I may interpret int arguments as a (deprecated) hint though, as that's what it's mostly used for, and will add a fairly limited hints API to start with - more advanced users can just specify exact chunks.

@matt-long
Copy link

@Zac-HD: Thanks, I submitted a separate issue: #1458

@Zac-HD
Copy link
Contributor Author

Zac-HD commented Jun 19, 2017

I've just had a meeting at NCI which has helped clarify what I'm trying to do and how to tell if it's working. This comment is mostly for my own notes, and public for anyone interested. I'll refer to dask chunks as 'blocks' (as in 'blocked algorithms'), and netcdf chunks in a file as 'chunks', to avoid confusion)

The approximate algorithm I'm thinking about is outlined in this comment above. Considerations, in rough order of performance impact, are:

  • No block should include data from multiple files (near-absolute, due to locking - though concurrent read is supported on lower levels?)
  • Contiguous access is critical in un-chunked files - reading the fastest-changing dimension in small parts murders IO performance. It may be important in chunked files too, at the level of chunks, but that's mostly down to Dask and user access patterns.
  • Each chunk, if the file is chunked, must fall entirely into a single block.
  • Blocks should usually be around 100MB, to balance scheduler overhead with memory usage of intermediate results.
  • Blocks should be of even(ish) size - if a dimension is of size 100 with chunks of 30, better to have blocks of 60 at the expense of relative shape than have blocks of 90 with one almost empty.
  • Chunks are cached by underlying libraries, so benchmarks need to clear the caches each run for valid results. Note that this affects IO but typically not decompression of chunks.
  • Contra contiguous access (above), it might be good to prevent very skewed block shapes (ie 1*X*Y or T*1*1). Possibly limiting the lowest edge length of a block (10px?), or limiting the edge ratio of a block (20:1?)
  • If a dimension hint is given (eg 'my analysis will be spatial'), making blocks larger along other dimensions will make whole-file processing more efficient, and subsetting less efficient. (Unless Dask can optimise away loading of part of a block?)

Bottom line, I could come up with something pretty quickly but would perfer to take a little longer to write and explore some benchmarks.

@jhamman
Copy link
Member

jhamman commented Jun 23, 2017

@Zac-HD - thanks for you detailed report.

ping me again when you get started on some benchmarking and feel free to chime in further to #1457.

No block should include data from multiple files (near-absolute, due to locking - though concurrent read is supported on lower levels?)

Hopefully we can find some optimizations that help with this. I routinely want to do this, though I understand why its not always a good idea.

@jhamman
Copy link
Member

jhamman commented Jul 27, 2017

@Zac-HD - We merged #1457 yesterday which should give us a platform to test any improvements we make related to this issue.

@jhamman
Copy link
Member

jhamman commented Jan 19, 2018

cc @kmpaul who wanted to review this conversation.

@stale
Copy link

stale bot commented Jan 11, 2020

In order to maintain a list of currently relevant issues, we mark issues as stale after a period of inactivity

If this issue remains relevant, please comment here or remove the stale label; otherwise it will be marked as closed automatically

@stale stale bot added the stale label Jan 11, 2020
@jhamman jhamman removed the stale label Jan 11, 2020
@rabernat
Copy link
Contributor

rabernat commented May 21, 2020

We discussed this issue today in our pangeo coffee break. We think the following plan would be good:

  • Write a function called auto_chunk(variable) which examines a variable for the presence of a chunks attribute in encoding or within the data itself. Returns a new variable with chunked data.
  • Refactor open_zarr to call this function
  • Add it also to open_dataset to enable auto-chunking of netCDF and geotiff data

Should we have an option like chunk_size='native', or chunk_size='100MB', with chunks chosen to align with source chunks.

@dcherian
Copy link
Contributor

should we have an option like chunk_size='native', or chunk_size='100MB'

Can we overload the chunks argument in open_xxx to do this? We are already adding support for chunks="auto" ...

@rabernat
Copy link
Contributor

Can we overload the chunks argument in open_xxx to do this? We are already adding support for chunks="auto" ...

This gets tricky, because we may want slightly different behavior depending on whether the underlying array store is chunked.

@kmpaul
Copy link
Contributor

kmpaul commented May 21, 2020

@rabernat When you say "underlying array store", are you talking about the storage layer? That is, the zarr store or the netcdf file?

It seems to me that the there are lots of "layers" of "chunking", especially when you are talking about chunking an entire dataset, which really confuses the whole issue. On an HPC system, there's filesystem blocksize, NetCDF/HDF5 "internal" chunks, chunking by spreading the data over multiple files, and in-memory chunks (i.e., Dask chunks). I'm not an expert on object store, but my understanding of object store is that (if you are storing NetCDF/HDF5 on object store) there is still "interal" NetCDF/HDF5 "chunking", then chunking over objects/files, and then in-memory chunking.

@rabernat
Copy link
Contributor

It seems to me that the there are lots of "layers" of "chunking", especially when you are talking about chunking an entire dataset,

To simplify a little bit, here we are only talking about reading a single store, i.e. one netcdf file or one zarr group. Also out of scope is the underlying storage medium (e.g. block size).

@stale
Copy link

stale bot commented Apr 28, 2022

In order to maintain a list of currently relevant issues, we mark issues as stale after a period of inactivity

If this issue remains relevant, please comment here or remove the stale label; otherwise it will be marked as closed automatically

@stale stale bot added the stale label Apr 28, 2022
@dcherian dcherian removed the stale label Apr 28, 2022
@lskopintseva
Copy link

I have a netCDF file where variables are saved on the disk in chunks and I would like to read my netcdf file using xr.open_dataset in original chunks. I there a way to do it in xarray? Since xarray is built in netCDF4 library, I would expect this feature to be present in xarray as well..

@jhamman
Copy link
Member

jhamman commented Mar 15, 2023

@lskopintseva - This feature has not been implemented in Xarray (yet). In the meantime, you might find something like this helpful:

ds = xr.open_dataset("dataset.nc")
for v in ds.data_vars:
    # get variable chunksizes 
    chunksizes = ds[v].encoding.get('chunksizes', None)
    if chunksizes is not None:
        chunks = dict(zip(ds[v].dims, chunksizes))
        ds[v] = ds[v].chunk(chunks)   # chunk the array using the underlying chunksizes

FWIW, I think this would be a nice feature to add to the netcdf4 and h5netcdf backends in Xarray. Contributions welcome!

@jhamman
Copy link
Member

jhamman commented Sep 12, 2023

Epic! Thanks @mraspaud for finally knocking off this 6-year old issue. 🎉

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging a pull request may close this issue.

9 participants