Allow fsspec URLs in open_(mf)dataset (#4823)
Co-authored-by: keewis <[email protected]>
Co-authored-by: Ray Bell <[email protected]>
3 people authored Feb 16, 2021
1 parent 735a359 commit 8bf415a
Showing 10 changed files with 138 additions and 10 deletions.
1 change: 1 addition & 0 deletions ci/requirements/environment.yml
@@ -3,6 +3,7 @@ channels:
- conda-forge
- nodefaults
dependencies:
- aiobotocore
- boto3
- bottleneck
- cartopy
2 changes: 2 additions & 0 deletions ci/requirements/py38-all-but-dask.yml
@@ -4,6 +4,8 @@ channels:
- nodefaults
dependencies:
- python=3.8
- black
- aiobotocore
- boto3
- bottleneck
- cartopy
35 changes: 31 additions & 4 deletions doc/io.rst
@@ -890,17 +890,44 @@ Cloud Storage Buckets

It is possible to read and write xarray datasets directly from / to cloud
storage buckets using zarr. This example uses the `gcsfs`_ package to provide
an interface to `Google Cloud Storage`_.

From v0.16.2: general `fsspec`_ URLs are parsed and the store set up for you
automatically when reading, so that you can open a dataset in a single
call. Pass any arguments for the storage backend under the key
``storage_options``, part of ``backend_kwargs``.

.. code:: python

    ds_gcs = xr.open_dataset(
        "gcs://<bucket-name>/path.zarr",
        backend_kwargs={
            "storage_options": {"project": "<project-name>", "token": None}
        },
        engine="zarr",
    )

This also works with ``open_mfdataset``, allowing you to pass a list of paths or
a URL to be interpreted as a glob string.
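
For example, a glob of zarr stores could be opened in one call like this (a
minimal sketch; the bucket, project, and store names are placeholders):

.. code:: python

    ds = xr.open_mfdataset(
        "gcs://<bucket-name>/*.zarr",
        backend_kwargs={
            "storage_options": {"project": "<project-name>", "token": None}
        },
        engine="zarr",
    )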

For older versions, and for writing, you must explicitly set up a ``MutableMapping``
instance and pass this, as follows:

.. code:: python

    import gcsfs

    fs = gcsfs.GCSFileSystem(project="<project-name>", token=None)
    gcsmap = gcsfs.mapping.GCSMap("<bucket-name>", gcs=fs, check=True, create=False)
    # write to the bucket
    ds.to_zarr(store=gcsmap)
    # read it back
    ds_gcs = xr.open_zarr(gcsmap)

(or use the utility function ``fsspec.get_mapper()``).
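
The same mapping can be constructed with ``fsspec.get_mapper()``; a sketch,
with the same placeholder names (extra keyword arguments are passed through
to the underlying filesystem):

.. code:: python

    import fsspec

    gcsmap = fsspec.get_mapper(
        "gcs://<bucket-name>/path.zarr", project="<project-name>", token=None
    )
    ds_gcs = xr.open_zarr(gcsmap)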

.. _fsspec: https://filesystem-spec.readthedocs.io/en/latest/
.. _Zarr: http://zarr.readthedocs.io/
.. _Amazon S3: https://aws.amazon.com/s3/
.. _Google Cloud Storage: https://cloud.google.com/storage/
5 changes: 5 additions & 0 deletions doc/whats-new.rst
@@ -76,6 +76,11 @@ New Features
in the form of kwargs as well as a dict, like most similar methods.
By `Maximilian Roos <https://github.com/max-sixty>`_.

- :py:func:`open_dataset` and :py:func:`open_mfdataset` now accept ``fsspec`` URLs
  (including globs for the latter) for ``engine="zarr"``, and so allow reading from
  many remote and other file systems (:pull:`4461`).
  By `Martin Durant <https://github.com/martindurant>`_.

Bug fixes
~~~~~~~~~
- :py:meth:`DataArray.resample` and :py:meth:`Dataset.resample` do not trigger computations anymore if :py:meth:`Dataset.weighted` or :py:meth:`DataArray.weighted` are applied (:issue:`4625`, :pull:`4668`). By `Julius Busecke <https://github.com/jbusecke>`_.
2 changes: 2 additions & 0 deletions setup.cfg
@@ -185,6 +185,8 @@ ignore_missing_imports = True
ignore_missing_imports = True
[mypy-distributed.*]
ignore_missing_imports = True
[mypy-fsspec.*]
ignore_missing_imports = True
[mypy-h5netcdf.*]
ignore_missing_imports = True
[mypy-h5py.*]
27 changes: 24 additions & 3 deletions xarray/backends/api.py
@@ -643,7 +643,9 @@ def open_dataarray(
backend_kwargs: dict, optional
A dictionary of keyword arguments to pass on to the backend. This
may be useful when backend options would improve performance or
allow user control of dataset processing. If using fsspec URLs,
include the key "storage_options" to pass arguments to the
storage layer.
use_cftime: bool, optional
Only relevant if encoded dates come from a standard calendar
(e.g. "gregorian", "proleptic_gregorian", "standard", or not
@@ -869,14 +871,33 @@ def open_mfdataset(
.. [2] http://xarray.pydata.org/en/stable/dask.html#chunking-and-performance
"""
if isinstance(paths, str):
if is_remote_uri(paths):
if is_remote_uri(paths) and engine == "zarr":
try:
from fsspec.core import get_fs_token_paths
except ImportError as e:
raise ImportError(
"The use of remote URLs for opening zarr requires the package fsspec"
) from e

fs, _, _ = get_fs_token_paths(
paths,
mode="rb",
storage_options=kwargs.get("backend_kwargs", {}).get(
"storage_options", {}
),
expand=False,
)
paths = fs.glob(fs._strip_protocol(paths)) # finds directories
paths = [fs.get_mapper(path) for path in paths]
elif is_remote_uri(paths):
raise ValueError(
"cannot do wild-card matching for paths that are remote URLs: "
"{!r}. Instead, supply paths as an explicit list of strings.".format(
paths
)
)
else:
paths = sorted(glob(_normalize_path(paths)))
else:
paths = [str(p) if isinstance(p, Path) else p for p in paths]

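
A rough sketch of the path resolution this adds, using fsspec's in-memory
filesystem (the store names are hypothetical; ``get_fs_token_paths``,
``glob``, and ``get_mapper`` are the fsspec APIs used above)::

    from fsspec.core import get_fs_token_paths

    url = "memory://out*.zarr"
    # resolve the filesystem for a globbed URL, as open_mfdataset now does
    fs, _, _ = get_fs_token_paths(url, mode="rb", storage_options={}, expand=False)
    paths = fs.glob(fs._strip_protocol(url))  # e.g. ["/out1.zarr", "/out2.zarr"]
    stores = [fs.get_mapper(p) for p in paths]  # mappings the zarr engine can open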
16 changes: 15 additions & 1 deletion xarray/backends/zarr.py
@@ -1,5 +1,6 @@
import os
import pathlib
from distutils.version import LooseVersion

import numpy as np

@@ -295,6 +296,7 @@ def open_group(
consolidated=False,
consolidate_on_close=False,
chunk_store=None,
storage_options=None,
append_dim=None,
write_region=None,
):
@@ -303,7 +305,15 @@
if isinstance(store, pathlib.Path):
store = os.fspath(store)

open_kwargs = dict(
mode=mode,
synchronizer=synchronizer,
path=group,
)
if LooseVersion(zarr.__version__) >= "2.5.0":
open_kwargs["storage_options"] = storage_options
elif storage_options:
raise ValueError("Storage options only compatible with zarr>=2.5.0")
if chunk_store:
open_kwargs["chunk_store"] = chunk_store

@@ -537,6 +547,7 @@ def open_zarr(
consolidated=False,
overwrite_encoded_chunks=False,
chunk_store=None,
storage_options=None,
decode_timedelta=None,
use_cftime=None,
**kwargs,
@@ -649,6 +660,7 @@
"consolidated": consolidated,
"overwrite_encoded_chunks": overwrite_encoded_chunks,
"chunk_store": chunk_store,
"storage_options": storage_options,
}

ds = open_dataset(
@@ -687,6 +699,7 @@ def open_dataset(
consolidated=False,
consolidate_on_close=False,
chunk_store=None,
storage_options=None,
):
store = ZarrStore.open_group(
filename_or_obj,
@@ -696,6 +709,7 @@
consolidated=consolidated,
consolidate_on_close=consolidate_on_close,
chunk_store=chunk_store,
storage_options=storage_options,
)

store_entrypoint = StoreBackendEntrypoint()
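
A sketch of how the new ``storage_options`` parameter might be used (the
bucket is hypothetical, and per the version check above this path requires
zarr >= 2.5.0)::

    import xarray as xr

    ds = xr.open_zarr(
        "s3://<bucket-name>/data.zarr",
        storage_options={"anon": True},  # passed through to fsspec/s3fs
    )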
7 changes: 6 additions & 1 deletion xarray/core/utils.py
@@ -645,7 +645,12 @@ def close_on_error(f):


def is_remote_uri(path: str) -> bool:
    """Check whether ``path`` is a URL of the form protocol:// or protocol::

    This also matches http[s]://, which were the only remote URLs
    supported in <=v0.16.2.
    """
    return bool(re.search(r"^[a-z][a-z0-9]*(\://|\:\:)", path))


def read_magic_number(filename_or_obj, count=8):
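
For illustration, the strings the new pattern accepts (the results follow
directly from the regex above)::

    from xarray.core.utils import is_remote_uri

    is_remote_uri("https://example.com/data.nc")    # True (protocol://)
    is_remote_uri("gcs://bucket/path.zarr")         # True
    is_remote_uri("simplecache::s3://bucket/path")  # True (protocol::)
    is_remote_uri("local/path/file.nc")             # False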
1 change: 1 addition & 0 deletions xarray/tests/__init__.py
@@ -74,6 +74,7 @@ def LooseVersion(vstring):
has_nc_time_axis, requires_nc_time_axis = _importorskip("nc_time_axis")
has_rasterio, requires_rasterio = _importorskip("rasterio")
has_zarr, requires_zarr = _importorskip("zarr")
has_fsspec, requires_fsspec = _importorskip("fsspec")
has_iris, requires_iris = _importorskip("iris")
has_cfgrib, requires_cfgrib = _importorskip("cfgrib")
has_numbagg, requires_numbagg = _importorskip("numbagg")
52 changes: 51 additions & 1 deletion xarray/tests/test_backends.py
@@ -54,6 +54,7 @@
requires_cfgrib,
requires_cftime,
requires_dask,
requires_fsspec,
requires_h5netcdf,
requires_netCDF4,
requires_pseudonetcdf,
@@ -3040,10 +3041,17 @@ def test_open_mfdataset(self):

with raises_regex(IOError, "no files to open"):
open_mfdataset("foo-bar-baz-*.nc")

with raises_regex(ValueError, "wild-card"):
open_mfdataset("http://some/remote/uri")

@requires_fsspec
def test_open_mfdataset_no_files(self):
pytest.importorskip("aiobotocore")

# glob is attempted as of #4823, but finds no files
with raises_regex(OSError, "no files"):
open_mfdataset("http://some/remote/uri", engine="zarr")

def test_open_mfdataset_2d(self):
original = Dataset({"foo": (["x", "y"], np.random.randn(10, 8))})
with create_tmp_file() as tmp1:
@@ -4799,6 +4807,48 @@ def test_extract_zarr_variable_encoding():
)


@requires_zarr
@requires_fsspec
def test_open_fsspec():
import fsspec
import zarr

if not hasattr(zarr.storage, "FSStore") or not hasattr(
zarr.storage.FSStore, "getitems"
):
pytest.skip("zarr too old")

ds = open_dataset(os.path.join(os.path.dirname(__file__), "data", "example_1.nc"))

m = fsspec.filesystem("memory")
mm = m.get_mapper("out1.zarr")
ds.to_zarr(mm) # old interface
ds0 = ds.copy()
ds0["time"] = ds.time + pd.to_timedelta("1 day")
mm = m.get_mapper("out2.zarr")
ds0.to_zarr(mm) # old interface

    # single dataset
    url = "memory://out2.zarr"
    ds2 = open_dataset(url, engine="zarr")
    # note: a bare `assert ds0 == ds2` is vacuously true, because
    # bool(Dataset) falls back to len(); compare explicitly instead
    xr.testing.assert_equal(ds0, ds2)

    # single dataset with caching
    url = "simplecache::memory://out2.zarr"
    ds2 = open_dataset(url, engine="zarr")
    xr.testing.assert_equal(ds0, ds2)

    # multi dataset
    url = "memory://out*.zarr"
    ds2 = open_mfdataset(url, engine="zarr")
    xr.testing.assert_equal(xr.concat([ds, ds0], dim="time"), ds2)

    # multi dataset with caching
    url = "simplecache::memory://out*.zarr"
    ds2 = open_mfdataset(url, engine="zarr")
    xr.testing.assert_equal(xr.concat([ds, ds0], dim="time"), ds2)


@requires_h5netcdf
def test_load_single_value_h5netcdf(tmp_path):
"""Test that numeric single-element vector attributes are handled fine.
