Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add to_collection method returning an Xcollection #405

Merged
merged 35 commits into from
Nov 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
756c436
add wildcard
jukent Sep 18, 2020
90fd1ce
linting
jukent Sep 18, 2020
6524705
Merge branch 'master' of github.com:jukent/intake-esm
jukent Nov 8, 2021
f5501a1
first pass at editing to_dataset_dict
jukent Nov 8, 2021
955c3ed
add xcollection to intake-esm requirements
jukent Nov 8, 2021
474e23e
Merge branch 'intake:main' into collection
jukent Nov 24, 2021
5fc56fb
first pass at collection
jukent Nov 24, 2021
3b1626f
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Nov 24, 2021
500357c
rm xr from imports
jukent Nov 24, 2021
52af83e
Merge branch 'collection' of github.com:jukent/intake-esm into collec…
jukent Nov 24, 2021
3c07c6d
git syntax
jukent Nov 24, 2021
7496fa7
pin mamba version
jukent Nov 24, 2021
0e4194a
Delete wildcard.ipynb
jukent Nov 24, 2021
ff58201
unpin mamba
jukent Nov 24, 2021
f62c2e3
Update requirements.txt
Nov 24, 2021
d8af735
update setup.py
jukent Nov 24, 2021
451f8eb
change hack
jukent Nov 24, 2021
3302477
Update requirements.txt
Nov 24, 2021
9d52148
add xarray back
jukent Nov 24, 2021
424be8a
Merge branch 'collection' of github.com:jukent/intake-esm into collec…
jukent Nov 24, 2021
4f9ef5c
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Nov 24, 2021
ffa1dd2
Update intake_esm/core.py
jukent Nov 24, 2021
aae8457
add to_dataet_dict original back
jukent Nov 24, 2021
23bd360
call to_dataset_dict
jukent Nov 24, 2021
befc742
update docstring
jukent Nov 24, 2021
b92b7a0
add to_dataset_dict input kwargs
jukent Nov 24, 2021
96a5631
Delete to_datset_dict_test.ipynb
jukent Nov 24, 2021
68f4275
Merge branch 'main' into collection
jukent Nov 24, 2021
dfdb763
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Nov 24, 2021
755dce2
add test
jukent Nov 24, 2021
5dfea27
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Nov 24, 2021
d65e9b1
get parameterize
jukent Nov 24, 2021
f8aa214
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Nov 24, 2021
0ee96ad
use published xcollection
jukent Nov 24, 2021
254f709
Update intake_esm/core.py
jukent Nov 24, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 84 additions & 0 deletions intake_esm/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import pandas as pd
import pydantic
import xarray as xr
import xcollection as xc
from fastprogress.fastprogress import progress_bar
from intake.catalog import Catalog

Expand Down Expand Up @@ -247,6 +248,7 @@ def __dir__(self) -> typing.List[str]:
rv = [
'df',
'to_dataset_dict',
'to_collection',
'to_dask',
'keys',
'serialize',
Expand Down Expand Up @@ -573,6 +575,88 @@ def to_dataset_dict(
self.datasets = self._create_derived_variables(datasets, skip_on_error)
return self.datasets

@pydantic.validate_arguments
def to_collection(
    self,
    xarray_open_kwargs: typing.Dict[str, typing.Any] = None,
    xarray_combine_by_coords_kwargs: typing.Dict[str, typing.Any] = None,
    preprocess: typing.Callable = None,
    storage_options: typing.Dict[pydantic.StrictStr, typing.Any] = None,
    progressbar: pydantic.StrictBool = None,
    aggregate: pydantic.StrictBool = None,
    skip_on_error: pydantic.StrictBool = False,
    **kwargs,
) -> xc.Collection:
    """
    Load catalog entries into a Collection of xarray datasets.

    Parameters
    ----------
    xarray_open_kwargs : dict
        Keyword arguments to pass to :py:func:`~xarray.open_dataset` function
    xarray_combine_by_coords_kwargs : dict
        Keyword arguments to pass to :py:func:`~xarray.combine_by_coords` function.
    preprocess : callable, optional
        If provided, call this function on each dataset prior to aggregation.
    storage_options : dict, optional
        Parameters passed to the backend file-system such as Google Cloud Storage,
        Amazon Web Service S3.
    progressbar : bool
        If True, will print a progress bar to standard error (stderr)
        when loading assets into :py:class:`~xarray.Dataset`.
    aggregate : bool, optional
        If False, no aggregation will be done.
    skip_on_error : bool, optional
        If True, skip datasets that cannot be loaded and/or variables we are unable to derive.

    Returns
    -------
    dsets : Collection
        A Collection of xarray :py:class:`~xarray.Dataset`.

    Examples
    --------
    >>> import intake
    >>> col = intake.open_esm_datastore("glade-cmip6.json")
    >>> cat = col.search(
    ...     source_id=["BCC-CSM2-MR", "CNRM-CM6-1", "CNRM-ESM2-1"],
    ...     experiment_id=["historical", "ssp585"],
    ...     variable_id="pr",
    ...     table_id="Amon",
    ...     grid_label="gn",
    ... )
    >>> dsets = cat.to_collection()
    >>> dsets.keys()
    dict_keys(['CMIP.BCC.BCC-CSM2-MR.historical.Amon.gn', 'ScenarioMIP.BCC.BCC-CSM2-MR.ssp585.Amon.gn'])
    >>> dsets["CMIP.BCC.BCC-CSM2-MR.historical.Amon.gn"]
    <xarray.Dataset>
    Dimensions: (bnds: 2, lat: 160, lon: 320, member_id: 3, time: 1980)
    Coordinates:
    * lon (lon) float64 0.0 1.125 2.25 3.375 ... 355.5 356.6 357.8 358.9
    * lat (lat) float64 -89.14 -88.03 -86.91 -85.79 ... 86.91 88.03 89.14
    * time (time) object 1850-01-16 12:00:00 ... 2014-12-16 12:00:00
    * member_id (member_id) <U8 'r1i1p1f1' 'r2i1p1f1' 'r3i1p1f1'
    Dimensions without coordinates: bnds
    Data variables:
    lat_bnds (lat, bnds) float64 dask.array<chunksize=(160, 2), meta=np.ndarray>
    lon_bnds (lon, bnds) float64 dask.array<chunksize=(320, 2), meta=np.ndarray>
    time_bnds (time, bnds) object dask.array<chunksize=(1980, 2), meta=np.ndarray>
    pr (member_id, time, lat, lon) float32 dask.array<chunksize=(1, 600, 160, 320), meta=np.ndarray>
    """

    # Delegate all loading/aggregation to to_dataset_dict, then wrap the
    # resulting mapping of keys -> datasets in an xcollection Collection.
    dataset_mapping = self.to_dataset_dict(
        xarray_open_kwargs=xarray_open_kwargs,
        xarray_combine_by_coords_kwargs=xarray_combine_by_coords_kwargs,
        preprocess=preprocess,
        storage_options=storage_options,
        progressbar=progressbar,
        aggregate=aggregate,
        skip_on_error=skip_on_error,
        **kwargs,
    )
    # Cache the collection on the instance (mirrors to_dataset_dict behaviour)
    # and hand it back to the caller.
    self.datasets = xc.Collection(dataset_mapping)
    return self.datasets

def to_dask(self, **kwargs) -> xr.Dataset:
"""
Convert result to dataset.
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ requests>=2.24.0
xarray>=0.19,!=0.20.0,!=0.20.1
zarr>=2.5
pydantic>=1.8.2
xcollection
34 changes: 34 additions & 0 deletions tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import pydantic
import pytest
import xarray as xr
import xcollection as xc

import intake_esm

Expand Down Expand Up @@ -241,6 +242,39 @@ def test_to_dataset_dict(path, query, xarray_open_kwargs):
assert ds.time.encoding


@pytest.mark.parametrize(
    'path, query, xarray_open_kwargs',
    [
        (
            zarr_col_pangeo_cmip6,
            dict(
                variable_id=['pr'],
                experiment_id='ssp370',
                activity_id='AerChemMIP',
                source_id='BCC-ESM1',
                table_id='Amon',
                grid_label='gn',
            ),
            {'consolidated': True, 'backend_kwargs': {'storage_options': {'token': 'anon'}}},
        ),
        (
            cdf_col_sample_cmip6,
            dict(source_id=['CNRM-ESM2-1', 'CNRM-CM6-1', 'BCC-ESM1'], variable_id=['tasmax']),
            {'chunks': {'time': 1}},
        ),
    ],
)
def test_to_collection(path, query, xarray_open_kwargs):
    # to_collection should produce an xcollection.Collection whose entries
    # are aggregated, dask-backed datasets — same guarantees as to_dataset_dict.
    catalog = intake.open_esm_datastore(path)
    subset = catalog.search(**query)
    collection = subset.to_collection(xarray_open_kwargs=xarray_open_kwargs)
    assert isinstance(collection, xc.Collection)
    _, ds = collection.popitem()
    assert 'member_id' in ds.dims
    assert len(ds.__dask_keys__()) > 0
    assert ds.time.encoding


@pytest.mark.parametrize(
'path, query, xarray_open_kwargs',
[
Expand Down