Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split out distributed writes in zarr docs #9132

Merged
merged 4 commits into from
Jun 19, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
175 changes: 91 additions & 84 deletions doc/user-guide/io.rst
Original file line number Diff line number Diff line change
Expand Up @@ -741,6 +741,65 @@ instance and pass this, as follows:
.. _Google Cloud Storage: https://cloud.google.com/storage/
.. _gcsfs: https://github.com/fsspec/gcsfs

.. _io.zarr.distributed_writes:

Distributed writes
Copy link
Contributor

@dcherian dcherian Jun 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With this title; I think this section should mention that to_zarr with a dask-backed dataset will do a parallel and/or distributed write depending on how your dask cluster is set up.

region is a more advanced use-case, we want to describe the simpler case first.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes great idea

~~~~~~~~~~~~~~~~~~

Xarray will natively use dask to write in parallel to a zarr store, which should
satisfy most moderately sized datasets. For more flexible parallelization, we
can use ``region`` to write to limited regions of arrays in an existing Zarr
store.

To scale this up to writing large datasets, first create an initial Zarr store
without writing all of its array data. This can be done by first creating a
``Dataset`` with dummy values stored in :ref:`dask <dask>`, and then calling
``to_zarr`` with ``compute=False`` to write only metadata (including ``attrs``)
to Zarr:

.. ipython:: python
:suppress:

! rm -rf path/to/directory.zarr

.. ipython:: python

import dask.array

# The values of this dask array are entirely irrelevant; only the dtype,
# shape and chunks are used
dummies = dask.array.zeros(30, chunks=10)
ds = xr.Dataset({"foo": ("x", dummies)}, coords={"x": np.arange(30)})
path = "path/to/directory.zarr"
# Now we write the metadata without computing any array values
ds.to_zarr(path, compute=False)

Now, a Zarr store with the correct variable shapes and attributes exists that
can be filled out by subsequent calls to ``to_zarr``.
Setting ``region="auto"`` will open the existing store and determine the
correct alignment of the new data with the existing dimensions, or as an
explicit mapping from dimension names to Python ``slice`` objects indicating
where the data should be written (in index space, not label space), e.g.,

.. ipython:: python

# For convenience, we'll slice a single dataset, but in the real use-case
# we would create them separately possibly even from separate processes.
ds = xr.Dataset({"foo": ("x", np.arange(30))}, coords={"x": np.arange(30)})
# Any of the following region specifications are valid
ds.isel(x=slice(0, 10)).to_zarr(path, region="auto")
ds.isel(x=slice(10, 20)).to_zarr(path, region={"x": "auto"})
ds.isel(x=slice(20, 30)).to_zarr(path, region={"x": slice(20, 30)})

Concurrent writes with ``region`` are safe as long as they modify distinct
chunks in the underlying Zarr arrays (or use an appropriate ``lock``).

As a safety check to make it harder to inadvertently override existing values,
if you set ``region`` then *all* variables included in a Dataset must have
dimensions included in ``region``. Other variables (typically coordinates)
need to be explicitly dropped and/or written in a separate calls to ``to_zarr``
with ``mode='a'``.

Zarr Compressors and Filters
~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Expand All @@ -767,37 +826,6 @@ For example:
Not all native zarr compression and filtering options have been tested with
xarray.

.. _io.zarr.consolidated_metadata:

Consolidated Metadata
~~~~~~~~~~~~~~~~~~~~~

Xarray needs to read all of the zarr metadata when it opens a dataset.
In some storage mediums, such as with cloud object storage (e.g. `Amazon S3`_),
this can introduce significant overhead, because two separate HTTP calls to the
object store must be made for each variable in the dataset.
By default Xarray uses a feature called
*consolidated metadata*, storing all metadata for the entire dataset with a
single key (by default called ``.zmetadata``). This typically drastically speeds
up opening the store. (For more information on this feature, consult the
`zarr docs on consolidating metadata <https://zarr.readthedocs.io/en/latest/tutorial.html#consolidating-metadata>`_.)

By default, xarray writes consolidated metadata and attempts to read stores
with consolidated metadata, falling back to use non-consolidated metadata for
reads. Because this fall-back option is so much slower, xarray issues a
``RuntimeWarning`` with guidance when reading with consolidated metadata fails:

Failed to open Zarr store with consolidated metadata, falling back to try
reading non-consolidated metadata. This is typically much slower for
opening a dataset. To silence this warning, consider:

1. Consolidating metadata in this existing store with
:py:func:`zarr.consolidate_metadata`.
2. Explicitly setting ``consolidated=False``, to avoid trying to read
consolidate metadata.
3. Explicitly setting ``consolidated=True``, to raise an error in this case
instead of falling back to try reading non-consolidated metadata.

.. _io.zarr.appending:

Modifying existing Zarr stores
Expand Down Expand Up @@ -856,59 +884,6 @@ order, e.g., for time-stepping a simulation:
)
ds2.to_zarr("path/to/directory.zarr", append_dim="t")

Finally, you can use ``region`` to write to limited regions of existing arrays
in an existing Zarr store. This is a good option for writing data in parallel
from independent processes.

To scale this up to writing large datasets, the first step is creating an
initial Zarr store without writing all of its array data. This can be done by
first creating a ``Dataset`` with dummy values stored in :ref:`dask <dask>`,
and then calling ``to_zarr`` with ``compute=False`` to write only metadata
(including ``attrs``) to Zarr:

.. ipython:: python
:suppress:

! rm -rf path/to/directory.zarr

.. ipython:: python

import dask.array

# The values of this dask array are entirely irrelevant; only the dtype,
# shape and chunks are used
dummies = dask.array.zeros(30, chunks=10)
ds = xr.Dataset({"foo": ("x", dummies)}, coords={"x": np.arange(30)})
path = "path/to/directory.zarr"
# Now we write the metadata without computing any array values
ds.to_zarr(path, compute=False)

Now, a Zarr store with the correct variable shapes and attributes exists that
can be filled out by subsequent calls to ``to_zarr``.
Setting ``region="auto"`` will open the existing store and determine the
correct alignment of the new data with the existing coordinates, or as an
explicit mapping from dimension names to Python ``slice`` objects indicating
where the data should be written (in index space, not label space), e.g.,

.. ipython:: python

# For convenience, we'll slice a single dataset, but in the real use-case
# we would create them separately possibly even from separate processes.
ds = xr.Dataset({"foo": ("x", np.arange(30))}, coords={"x": np.arange(30)})
# Any of the following region specifications are valid
ds.isel(x=slice(0, 10)).to_zarr(path, region="auto")
ds.isel(x=slice(10, 20)).to_zarr(path, region={"x": "auto"})
ds.isel(x=slice(20, 30)).to_zarr(path, region={"x": slice(20, 30)})

Concurrent writes with ``region`` are safe as long as they modify distinct
chunks in the underlying Zarr arrays (or use an appropriate ``lock``).

As a safety check to make it harder to inadvertently override existing values,
if you set ``region`` then *all* variables included in a Dataset must have
dimensions included in ``region``. Other variables (typically coordinates)
need to be explicitly dropped and/or written in a separate calls to ``to_zarr``
with ``mode='a'``.

.. _io.zarr.writing_chunks:

Specifying chunks in a zarr store
Expand Down Expand Up @@ -978,6 +953,38 @@ length of each dimension by using the shorthand chunk size ``-1``:
The number of chunks on Tair matches our dask chunks, while there is now only a single
chunk in the directory stores of each coordinate.

.. _io.zarr.consolidated_metadata:

Consolidated Metadata
~~~~~~~~~~~~~~~~~~~~~

Xarray needs to read all of the zarr metadata when it opens a dataset.
In some storage mediums, such as with cloud object storage (e.g. `Amazon S3`_),
this can introduce significant overhead, because two separate HTTP calls to the
object store must be made for each variable in the dataset.
By default Xarray uses a feature called
*consolidated metadata*, storing all metadata for the entire dataset with a
single key (by default called ``.zmetadata``). This typically drastically speeds
up opening the store. (For more information on this feature, consult the
`zarr docs on consolidating metadata <https://zarr.readthedocs.io/en/latest/tutorial.html#consolidating-metadata>`_.)

By default, xarray writes consolidated metadata and attempts to read stores
with consolidated metadata, falling back to use non-consolidated metadata for
reads. Because this fall-back option is so much slower, xarray issues a
``RuntimeWarning`` with guidance when reading with consolidated metadata fails:

Failed to open Zarr store with consolidated metadata, falling back to try
reading non-consolidated metadata. This is typically much slower for
opening a dataset. To silence this warning, consider:

1. Consolidating metadata in this existing store with
:py:func:`zarr.consolidate_metadata`.
2. Explicitly setting ``consolidated=False``, to avoid trying to read
consolidate metadata.
3. Explicitly setting ``consolidated=True``, to raise an error in this case
instead of falling back to try reading non-consolidated metadata.


.. _io.iris:

Iris
Expand Down
Loading