Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow fsspec URLs in open_(mf)dataset #4823

Merged
merged 24 commits into from
Feb 16, 2021
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 31 additions & 4 deletions doc/io.rst
Original file line number Diff line number Diff line change
Expand Up @@ -890,17 +890,44 @@ Cloud Storage Buckets

It is possible to read and write xarray datasets directly from / to cloud
storage buckets using zarr. This example uses the `gcsfs`_ package to provide
a ``MutableMapping`` interface to `Google Cloud Storage`_, which we can then
pass to xarray::
an interface to `Google Cloud Storage`_.

From v0.16.2: general `fsspec`_ URLs are parsed and the store set up for you
automatically when reading, such that you can open a dataset in a single
call. You should include any arguments to the storage backend as the
key ``storage_options``, part of ``backend_kwargs``.

.. code:: python

ds_gcs = xr.open_dataset(
"gcs://<bucket-name>/path.zarr",
backend_kwargs={
"storage_options": {"project": "<project-name>", "token": None}
},
engine="zarr",
)


This also works with ``open_mfdataset``, allowing you to pass a list of paths or
a URL to be interpreted as a glob string.

For older versions, and for writing, you must explicitly set up a ``MutableMapping``
instance and pass this, as follows:

.. code:: python

import gcsfs
fs = gcsfs.GCSFileSystem(project='<project-name>', token=None)
gcsmap = gcsfs.mapping.GCSMap('<bucket-name>', gcs=fs, check=True, create=False)

fs = gcsfs.GCSFileSystem(project="<project-name>", token=None)
gcsmap = gcsfs.mapping.GCSMap("<bucket-name>", gcs=fs, check=True, create=False)
# write to the bucket
ds.to_zarr(store=gcsmap)
# read it back
ds_gcs = xr.open_zarr(gcsmap)

(or use the utility function ``fsspec.get_mapper()``).

.. _fsspec: https://filesystem-spec.readthedocs.io/en/latest/
.. _Zarr: http://zarr.readthedocs.io/
.. _Amazon S3: https://aws.amazon.com/s3/
.. _Google Cloud Storage: https://cloud.google.com/storage/
Expand Down
5 changes: 5 additions & 0 deletions doc/whats-new.rst
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ New Features
in the form of kwargs as well as a dict, like most similar methods.
By `Maximilian Roos <https://github.com/max-sixty>`_.

- :py:func:`open_dataset` and :py:func:`open_mfdataset` now accept ``fsspec`` URLs
(including globs for the latter) for ``engine="zarr"``, and so allow reading from
many remote and other file systems (:pull:`4461`)
By `Martin Durant <https://github.com/martindurant>`_

Bug fixes
~~~~~~~~~
- :py:meth:`DataArray.resample` and :py:meth:`Dataset.resample` do not trigger computations anymore if :py:meth:`Dataset.weighted` or :py:meth:`DataArray.weighted` are applied (:issue:`4625`, :pull:`4668`). By `Julius Busecke <https://github.com/jbusecke>`_.
Expand Down
27 changes: 24 additions & 3 deletions xarray/backends/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -643,7 +643,9 @@ def open_dataarray(
backend_kwargs: dict, optional
A dictionary of keyword arguments to pass on to the backend. This
may be useful when backend options would improve performance or
allow user control of dataset processing.
allow user control of dataset processing. If using fsspec URLs,
include the key "storage_options" to pass arguments to the
storage layer.
use_cftime: bool, optional
Only relevant if encoded dates come from a standard calendar
(e.g. "gregorian", "proleptic_gregorian", "standard", or not
Expand Down Expand Up @@ -869,14 +871,33 @@ def open_mfdataset(
.. [2] http://xarray.pydata.org/en/stable/dask.html#chunking-and-performance
"""
if isinstance(paths, str):
if is_remote_uri(paths):
if is_remote_uri(paths) and engine == "zarr":
try:
from fsspec.core import get_fs_token_paths
except ImportError as e:
raise ImportError(
"The use of remote URLs for opening zarr requires the package fsspec"
) from e

fs, _, _ = get_fs_token_paths(
paths,
mode="rb",
storage_options=kwargs.get("backend_kwargs", {}).get(
"storage_options", {}
),
expand=False,
)
paths = fs.glob(fs._strip_protocol(paths)) # finds directories
paths = [fs.get_mapper(path) for path in paths]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bit tricky. This assumes the backend is to want a mapper object (as the zarr backend does). But, what if the glob returns a list of netcdf files? Wouldn't we want a list of file objects?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, this is my comment for "should we actually special case zarr". It could make files - for now it would just error. We don't have tests for this, though, but now might be the time to start.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now tracking with the comments above, I think we have two options:

  1. inject some backend specific logic in the api here to make a decision about what sort of object to return (if engine=='zarr', return mapper; else return file_obj)
  2. only support globbing remote zarr stores

(1) seems to be the more reasonable thing to do here but is slightly less principled as we've been working to cleanly separate the api from the backends.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose a third alternative might be to pass the paths through, and create mappers in the zarr backend (will re-instantiate the FS, but that's fine) and add the opening of remote files into each of the other backends that can handle it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have essentially done 1), but excluded HTTP for the non-zarr path, because it has a special place for some backends (dap...). In any case, I don't suppose anyone is using globbing with http, since it's generally unreliable.

elif is_remote_uri(paths):
raise ValueError(
"cannot do wild-card matching for paths that are remote URLs: "
"{!r}. Instead, supply paths as an explicit list of strings.".format(
paths
)
)
paths = sorted(glob(_normalize_path(paths)))
else:
paths = sorted(glob(_normalize_path(paths)))
else:
paths = [str(p) if isinstance(p, Path) else p for p in paths]

Expand Down
16 changes: 15 additions & 1 deletion xarray/backends/zarr.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
import pathlib
from distutils.version import LooseVersion

import numpy as np

Expand Down Expand Up @@ -295,6 +296,7 @@ def open_group(
consolidated=False,
consolidate_on_close=False,
chunk_store=None,
storage_options=None,
append_dim=None,
write_region=None,
):
Expand All @@ -303,7 +305,15 @@ def open_group(
if isinstance(store, pathlib.Path):
store = os.fspath(store)

open_kwargs = dict(mode=mode, synchronizer=synchronizer, path=group)
open_kwargs = dict(
mode=mode,
synchronizer=synchronizer,
path=group,
)
if LooseVersion(zarr.__version__) >= "2.5.0":
open_kwargs["storage_options"] = storage_options
elif storage_options:
raise ValueError("Storage options only compatible with zarr>=2.5.0")
if chunk_store:
open_kwargs["chunk_store"] = chunk_store

Expand Down Expand Up @@ -537,6 +547,7 @@ def open_zarr(
consolidated=False,
overwrite_encoded_chunks=False,
chunk_store=None,
storage_options=None,
decode_timedelta=None,
use_cftime=None,
**kwargs,
Expand Down Expand Up @@ -649,6 +660,7 @@ def open_zarr(
"consolidated": consolidated,
"overwrite_encoded_chunks": overwrite_encoded_chunks,
"chunk_store": chunk_store,
"storage_options": storage_options,
}

ds = open_dataset(
Expand Down Expand Up @@ -687,6 +699,7 @@ def open_dataset(
consolidated=False,
consolidate_on_close=False,
chunk_store=None,
storage_options=None,
):
store = ZarrStore.open_group(
filename_or_obj,
Expand All @@ -696,6 +709,7 @@ def open_dataset(
consolidated=consolidated,
consolidate_on_close=consolidate_on_close,
chunk_store=chunk_store,
storage_options=storage_options,
)

store_entrypoint = StoreBackendEntrypoint()
Expand Down
7 changes: 6 additions & 1 deletion xarray/core/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -645,7 +645,12 @@ def close_on_error(f):


def is_remote_uri(path: str) -> bool:
return bool(re.search(r"^https?\://", path))
"""Finds URLs of the form protocol:// or protocol::

This also matches for http[s]://, which were the only remote URLs
supported in <=v0.16.2.
"""
return bool(re.search(r"^[a-z][a-z0-9]*(\://|\:\:)", path))


def read_magic_number(filename_or_obj, count=8):
Expand Down
1 change: 1 addition & 0 deletions xarray/tests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ def LooseVersion(vstring):
has_nc_time_axis, requires_nc_time_axis = _importorskip("nc_time_axis")
has_rasterio, requires_rasterio = _importorskip("rasterio")
has_zarr, requires_zarr = _importorskip("zarr")
has_fsspec, requires_fsspec = _importorskip("fsspec")
has_iris, requires_iris = _importorskip("iris")
has_cfgrib, requires_cfgrib = _importorskip("cfgrib")
has_numbagg, requires_numbagg = _importorskip("numbagg")
Expand Down
52 changes: 51 additions & 1 deletion xarray/tests/test_backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
requires_cfgrib,
requires_cftime,
requires_dask,
requires_fsspec,
requires_h5netcdf,
requires_netCDF4,
requires_pseudonetcdf,
Expand Down Expand Up @@ -3040,10 +3041,17 @@ def test_open_mfdataset(self):

with raises_regex(IOError, "no files to open"):
open_mfdataset("foo-bar-baz-*.nc")

with raises_regex(ValueError, "wild-card"):
open_mfdataset("http://some/remote/uri")

@requires_fsspec
def test_open_mfdataset_no_files(self):
pytest.importorskip("aiobotocore")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this means this test is always skipped. Should this be added to some of the environments?


# glob is attempted as of #4823, but finds no files
with raises_regex(OSError, "no files"):
open_mfdataset("http://some/remote/uri", engine="zarr")

def test_open_mfdataset_2d(self):
original = Dataset({"foo": (["x", "y"], np.random.randn(10, 8))})
with create_tmp_file() as tmp1:
Expand Down Expand Up @@ -4799,6 +4807,48 @@ def test_extract_zarr_variable_encoding():
)


@requires_zarr
@requires_fsspec
def test_open_fsspec():
import fsspec # type: ignore
keewis marked this conversation as resolved.
Show resolved Hide resolved
import zarr

if not hasattr(zarr.storage, "FSStore") or not hasattr(
zarr.storage.FSStore, "getitems"
):
pytest.skip("zarr too old")

ds = open_dataset(os.path.join(os.path.dirname(__file__), "data", "example_1.nc"))

m = fsspec.filesystem("memory")
mm = m.get_mapper("out1.zarr")
ds.to_zarr(mm) # old interface
ds0 = ds.copy()
ds0["time"] = ds.time + pd.to_timedelta("1 day")
mm = m.get_mapper("out2.zarr")
ds0.to_zarr(mm) # old interface

# single dataset
url = "memory://out2.zarr"
ds2 = open_dataset(url, engine="zarr")
assert ds0 == ds2

# single dataset with caching
url = "simplecache::memory://out2.zarr"
ds2 = open_dataset(url, engine="zarr")
assert ds0 == ds2

# multi dataset
url = "memory://out*.zarr"
ds2 = open_mfdataset(url, engine="zarr")
assert xr.concat([ds, ds0], dim="time") == ds2

# multi dataset with caching
url = "simplecache::memory://out*.zarr"
ds2 = open_mfdataset(url, engine="zarr")
assert xr.concat([ds, ds0], dim="time") == ds2


@requires_h5netcdf
def test_load_single_value_h5netcdf(tmp_path):
"""Test that numeric single-element vector attributes are handled fine.
Expand Down