Allow fsspec/zarr/mfdataset #4461

Status: Closed. Wants to merge 25 commits (showing changes from 11 commits).
19 changes: 19 additions & 0 deletions doc/io.rst
@@ -913,6 +913,9 @@ storage buckets using zarr. This example uses the `gcsfs`_ package to provide
a ``MutableMapping`` interface to `Google Cloud Storage`_, which we can then
pass to xarray::


.. ipython:: python

import gcsfs
fs = gcsfs.GCSFileSystem(project='<project-name>', token=None)
gcsmap = gcsfs.mapping.GCSMap('<bucket-name>', gcs=fs, check=True, create=False)
@@ -921,6 +924,22 @@ pass to xarray::
# read it back
ds_gcs = xr.open_zarr(gcsmap)

New in v0.16.2: general `fsspec`_ URLs are now parsed and the store set up for you
automatically when reading, such that the read part of the above code can
be replaced with

.. ipython:: python

ds_gcs = xr.open_dataset(
"gcs://<bucket-name>/path.zarr",
backend_kwargs={"storage_options": {"project": '<project-name>', "token": None}},
engine="zarr"
)

This also works with ``open_mfdataset``, allowing you to pass a list of paths or
a URL to be interpreted as a glob string.
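
For illustration, a hypothetical glob read might look like the following (the bucket
name, path pattern, and credentials are placeholders, not a real dataset):

.. ipython:: python

    ds_all = xr.open_mfdataset(
        "gcs://<bucket-name>/collection/*.zarr",
        backend_kwargs={"storage_options": {"project": "<project-name>", "token": None}},
        engine="zarr",
    )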

.. _fsspec: https://filesystem-spec.readthedocs.io/en/latest/
.. _Zarr: http://zarr.readthedocs.io/
.. _Amazon S3: https://aws.amazon.com/s3/
.. _Google Cloud Storage: https://cloud.google.com/storage/
5 changes: 5 additions & 0 deletions doc/whats-new.rst
@@ -31,6 +31,11 @@ New Features
now works with ``engine="zarr"`` (:issue:`3668`, :pull:`4003`, :pull:`4187`).
By `Miguel Jimenez <https://github.com/Mikejmnez>`_ and `Wei Ji Leong <https://github.com/weiji14>`_.

- :py:func:`open_dataset` and :py:func:`open_mfdataset` now accept ``fsspec`` URLs
(including globs for the latter) for ``engine="zarr"``, and so allow reading from
many remote and other file systems (:pull:`4461`).
By `Martin Durant <https://github.com/martindurant>`_.

Bug fixes
~~~~~~~~~

44 changes: 35 additions & 9 deletions xarray/backends/api.py
@@ -338,6 +338,7 @@ def open_dataset(
ends with .gz, in which case the file is gunzipped and opened with
scipy.io.netcdf (only netCDF3 supported). Byte-strings or file-like
objects are opened by scipy.io.netcdf (netCDF3) or h5py (netCDF4/HDF).
Arbitrary ``fsspec`` URLs are also supported, but only for the "zarr" backend.
group : str, optional
Path to the netCDF4 group in the given file to open (only works for
netCDF4 files).
@@ -398,7 +399,10 @@
backend_kwargs: dict, optional
A dictionary of keyword arguments to pass on to the backend. This
may be useful when backend options would improve performance or
allow user control of dataset processing.
allow user control of dataset processing. When using an ``fsspec``
path for the filename, the key ``storage_options`` can be used
here to configure the backend storage instance. Alternatively, a
pre-configured filesystem instance can be supplied with the key ``fs``.
use_cftime: bool, optional
Only relevant if encoded dates come from a standard calendar
(e.g. "gregorian", "proleptic_gregorian", "standard", or not
@@ -535,11 +539,14 @@ def maybe_decode_store(store, chunks, lock=False):
ds2._file_obj = ds._file_obj
return ds2

filename_or_obj = _normalize_path(filename_or_obj)
fs = backend_kwargs.get("fs", None)
if fs is None:
filename_or_obj = _normalize_path(filename_or_obj)

if isinstance(filename_or_obj, AbstractDataStore):
store = filename_or_obj
else:
backend_kwargs = backend_kwargs.copy()
if engine is None:
engine = _autodetect_engine(filename_or_obj)

@@ -550,11 +557,15 @@ def maybe_decode_store(store, chunks, lock=False):
extra_kwargs["lock"] = lock
elif engine == "zarr":
backend_kwargs = backend_kwargs.copy()
backend_kwargs.pop("fs", None)
overwrite_encoded_chunks = backend_kwargs.pop(
"overwrite_encoded_chunks", None
)
extra_kwargs["mode"] = "r"
extra_kwargs["group"] = group
if fs is not None:
filename_or_obj = fs.get_mapper(filename_or_obj)
Comment on lines +580 to +581

[Member] Rather than adding the fs keyword argument, why not just encourage passing in an appropriate mapping for filename_or_obj?

[Contributor, author] That works already, and will continue to work. However, the whole point of this PR is to allow working out those details in a single call to open_dataset, which turns out very convenient for encoding in an Intake catalog, for instance, or indeed for the open_mfdataset implementation in here.

[Collaborator, @alexamici, Oct 1, 2020] Note that we are working on a refactor of the backend API that, among other things, aims at removing all knowledge of what backends can or can't do from open_dataset. Adding logic inside if engine == "zarr" will probably result in merge conflicts.

I would suggest moving the call to fs.get_mapper(filename_or_obj) inside the zarr backend.

[Contributor, author] Thanks for the heads up. I already did one slightly complex conflict resolve.

It isn't totally clear, though, that the logic can be buried in the zarr engine, for two reasons:

  • when using open_mfdataset, the globbing of remote files/directories happens early, before establishing individual zarr instances
  • the file instances that fsspec makes from URLs can actually be used by some other backends; that just happens not to be the emphasis here

Happy to go whichever way is most convenient for the library.

[Contributor] We need to resolve this discussion in order to decide what to do about this PR. Any more thoughts from other devs?

In my view, some of the fsspec logic introduced in the PR should eventually move to the generic open_mfdataset function, as it is not really specific to Zarr. However, I don't see a strong downside to adding it to open_zarr right now. Eventually open_zarr will be deprecated. But the pattern used here could be copied and incorporated into the backend refactor.

backend_kwargs.pop("storage_options", None)

opener = _get_backend_cls(engine)
store = opener(filename_or_obj, **extra_kwargs, **backend_kwargs)
@@ -774,7 +785,8 @@ def open_mfdataset(
files to open. Paths can be given as strings or as pathlib Paths. If
concatenation along more than one dimension is desired, then ``paths`` must be a
nested list-of-lists (see ``combine_nested`` for details). (A string glob will
be expanded to a 1-dimensional list.)
be expanded to a 1-dimensional list.) When ``engine="zarr"``, the path(s) can
be of any type understood by ``fsspec``.
chunks : int or dict, optional
Dictionary with keys given by dimension names and values given by chunk sizes.
In general, these should divide the dimensions of each dataset. If int, chunk
@@ -895,13 +907,27 @@
"""
if isinstance(paths, str):
if is_remote_uri(paths):
raise ValueError(
"cannot do wild-card matching for paths that are remote URLs: "
"{!r}. Instead, supply paths as an explicit list of strings.".format(
paths
if engine != "zarr":
raise ValueError(
"cannot do wild-card matching for paths that are remote URLs: "
"{!r}. Instead, supply paths as an explicit list of strings.".format(
paths
)
)
)
paths = sorted(glob(paths))
else:
import fsspec # type: ignore

backend_kwargs = kwargs.get("backend_kwargs", {})
storage_options = backend_kwargs.get("storage_options", None)

fs, _, _ = fsspec.core.get_fs_token_paths(
paths, storage_options=storage_options
)
paths = fs.expand_path(paths)
backend_kwargs["fs"] = fs
kwargs["backend_kwargs"] = backend_kwargs
else:
paths = sorted(glob(paths))
else:
paths = [str(p) if isinstance(p, Path) else p for p in paths]

3 changes: 3 additions & 0 deletions xarray/backends/zarr.py
@@ -278,13 +278,16 @@ def open_group(
consolidated=False,
consolidate_on_close=False,
chunk_store=None,
storage_options=None,
):
import zarr

open_kwargs = dict(mode=mode, synchronizer=synchronizer, path=group)
if chunk_store:
open_kwargs["chunk_store"] = chunk_store

if storage_options:
open_kwargs["storage_options"] = storage_options
if consolidated:
# TODO: an option to pass the metadata_key keyword
zarr_group = zarr.open_consolidated(store, **open_kwargs)
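
A hedged sketch of what forwarding ``storage_options`` amounts to on the zarr side,
assuming a zarr version whose ``open_group`` accepts it (bucket and project are
placeholders):

    import zarr

    # zarr builds an fsspec-backed store (FSStore) from the URL, configured with
    # the given storage_options.
    zarr_group = zarr.open_group(
        "gcs://<bucket-name>/path.zarr",
        mode="r",
        storage_options={"project": "<project-name>", "token": None},
    )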
2 changes: 1 addition & 1 deletion xarray/core/utils.py
@@ -607,7 +607,7 @@ def close_on_error(f):


def is_remote_uri(path: str) -> bool:
return bool(re.search(r"^https?\://", path))
return bool(re.search(r"^[a-z][a-z0-9]*(\://|\:\:)", path))


def is_grib_path(path: str) -> bool:
1 change: 1 addition & 0 deletions xarray/tests/__init__.py
@@ -72,6 +72,7 @@ def LooseVersion(vstring):
has_nc_time_axis, requires_nc_time_axis = _importorskip("nc_time_axis")
has_rasterio, requires_rasterio = _importorskip("rasterio")
has_zarr, requires_zarr = _importorskip("zarr")
has_fsspec, requires_fsspec = _importorskip("fsspec")
has_iris, requires_iris = _importorskip("iris")
has_cfgrib, requires_cfgrib = _importorskip("cfgrib")
has_numbagg, requires_numbagg = _importorskip("numbagg")
31 changes: 31 additions & 0 deletions xarray/tests/test_backends.py
@@ -54,6 +54,7 @@
requires_cfgrib,
requires_cftime,
requires_dask,
requires_fsspec,
requires_h5netcdf,
requires_netCDF4,
requires_pseudonetcdf,
@@ -4668,3 +4669,33 @@ def test_extract_zarr_variable_encoding():
actual = backends.zarr.extract_zarr_variable_encoding(
var, raise_on_invalid=True
)


@requires_zarr
@requires_fsspec
def test_open_fsspec():
import fsspec # type: ignore
import zarr

if not hasattr(zarr.storage, "FSStore") or not hasattr(
zarr.storage.FSStore, "getitems"
):
pytest.skip("zarr too old")

ds = open_dataset(os.path.join(os.path.dirname(__file__), "data", "example_1.nc"))

m = fsspec.filesystem("memory")
mm = m.get_mapper("out1.zarr")
ds.to_zarr(mm) # old interface
ds0 = ds.copy()
ds0["time"] = ds.time + pd.to_timedelta("1 day")
mm = m.get_mapper("out2.zarr")
ds0.to_zarr(mm) # old interface

url = "memory://out2.zarr"
ds2 = open_dataset(url, engine="zarr")
assert ds0 == ds2

url = "memory://out*.zarr"
ds2 = open_mfdataset(url, engine="zarr")
assert xr.concat([ds, ds0], dim="time") == ds2