data_vars option added to open_mfdataset #1580
```diff
@@ -431,7 +431,7 @@ def close(self):
 
 def open_mfdataset(paths, chunks=None, concat_dim=_CONCAT_DIM_DEFAULT,
                    compat='no_conflicts', preprocess=None, engine=None,
-                   lock=None, **kwargs):
+                   lock=None, data_vars='all', **kwargs):
     """Open multiple files as a single dataset.
 
     Requires dask to be installed. Attributes from the first dataset file
```
```diff
@@ -487,6 +487,18 @@ def open_mfdataset(paths, chunks=None, concat_dim=_CONCAT_DIM_DEFAULT,
         default, a per-variable lock is used when reading data from netCDF
         files with the netcdf4 and h5netcdf engines to avoid issues with
         concurrent access when using dask's multithreaded backend.
+    data_vars : {'minimal', 'different', 'all' or list of str}, optional
+        These data variables will be concatenated together:
+          * 'minimal': Only data variables in which the dimension already
+            appears are included.
+          * 'different': Data variables which are not equal (ignoring
+            attributes) across all datasets are also concatenated (as well
+            as all those for which the dimension already appears). Beware:
+            this option may load the data payload of data variables into
+            memory if they are not already loaded.
+          * 'all': All data variables will be concatenated.
+          * list of str: The listed data variables will be concatenated, in
+            addition to the 'minimal' data variables.
     **kwargs : optional
         Additional arguments passed on to :py:func:`xarray.open_dataset`.
```
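For orientation, a minimal usage sketch of the new keyword (the file names here are hypothetical):

```python
import xarray as xr

files = ['part1.nc', 'part2.nc']  # hypothetical netCDF files split along 't'

# default: concatenate every data variable along the combined dimension
ds_all = xr.open_mfdataset(files, data_vars='all')

# only variables that already contain the concat dimension are concatenated;
# a shared coordinate-like variable such as 'lon' is left with its original shape
ds_min = xr.open_mfdataset(files, data_vars='minimal')
```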
```diff
@@ -516,12 +528,20 @@ def open_mfdataset(paths, chunks=None, concat_dim=_CONCAT_DIM_DEFAULT,
     if preprocess is not None:
         datasets = [preprocess(ds) for ds in datasets]
 
-    if concat_dim is _CONCAT_DIM_DEFAULT:
-        combined = auto_combine(datasets, compat=compat)
-    else:
-        combined = auto_combine(datasets, concat_dim=concat_dim, compat=compat)
-    combined._file_obj = _MultiFileCloser(file_objs)
-    combined.attrs = datasets[0].attrs
+    # close datasets in case of a ValueError
+    try:
+        if concat_dim is _CONCAT_DIM_DEFAULT:
+            combined = auto_combine(datasets, compat=compat,
+                                    data_vars=data_vars)
+        else:
+            combined = auto_combine(datasets, concat_dim=concat_dim,
+                                    compat=compat, data_vars=data_vars)
+        combined._file_obj = _MultiFileCloser(file_objs)
+        combined.attrs = datasets[0].attrs
+    except ValueError as ve:
```
Review comment: Let's only wrap the lines where this could fail -- so this should be moved up two lines, before `combined._file_obj = _MultiFileCloser(file_objs)`.

Review comment: You can just use `raise`.
```diff
+        for ds in datasets:
+            ds.close()
+        raise ve
 
     return combined
```
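A sketch of how both suggestions could be applied together -- a narrower `try` block plus a bare `raise`; this is an illustration, not the committed code:

```python
    # close the already-opened datasets if combining them fails
    try:
        if concat_dim is _CONCAT_DIM_DEFAULT:
            combined = auto_combine(datasets, compat=compat,
                                    data_vars=data_vars)
        else:
            combined = auto_combine(datasets, concat_dim=concat_dim,
                                    compat=compat, data_vars=data_vars)
    except ValueError:
        for ds in datasets:
            ds.close()
        raise  # bare raise re-raises with the original traceback
    combined._file_obj = _MultiFileCloser(file_objs)
    combined.attrs = datasets[0].attrs
```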
```diff
@@ -1267,6 +1267,133 @@ def test_4_open_large_num_files_h5netcdf(self):
         self.validate_open_mfdataset_large_num_files(engine=['h5netcdf'])
 
 
+@requires_scipy_or_netCDF4
+class OpenMFDatasetDataVarsKWTest(TestCase):
+    coord_name = 'lon'
+    var_name = 'v1'
+
+    def gen_datasets_with_common_coord_and_time(self):
+        # create coordinate data
+        nx = 10
+        nt = 10
+        x = np.arange(nx)
+        t1 = np.arange(nt)
+        t2 = np.arange(nt, 2 * nt, 1)
+
+        v1 = np.random.randn(nt, nx)
+        v2 = np.random.randn(nt, nx)
+
+        ds1 = Dataset(data_vars={self.var_name: (['t', 'x'], v1),
+                                 self.coord_name: ('x', 2 * x)},
+                      coords={
+                          't': (['t', ], t1),
+                          'x': (['x', ], x)
+                      })
+
+        ds2 = Dataset(data_vars={self.var_name: (['t', 'x'], v2),
+                                 self.coord_name: ('x', 2 * x)},
+                      coords={
+                          't': (['t', ], t2),
+                          'x': (['x', ], x)
+                      })
+
+        return ds1, ds2
+
+    def test_open_mfdataset_does_same_as_concat(self):
+        with create_tmp_file() as tmpfile1:
+            with create_tmp_file() as tmpfile2:
+                ds1, ds2 = self.gen_datasets_with_common_coord_and_time()
+
+                # save data to the temporary files
+                ds1.to_netcdf(tmpfile1)
+                ds2.to_netcdf(tmpfile2)
+
+                files = [tmpfile1, tmpfile2]
```
Review comment: Put this shared logic in a context manager? e.g.

```python
@contextlib.contextmanager
def setup_files(self):
    with create_tmp_file() as tmpfile1:
        with create_tmp_file() as tmpfile2:
            ds1, ds2 = self.gen_datasets_with_common_coord_and_time()
            # save data to the temporary files
            ds1.to_netcdf(tmpfile1)
            ds2.to_netcdf(tmpfile2)
            yield [tmpfile1, tmpfile2]

def test_open_mfdataset_does_same_as_concat(self):
    with self.setup_files() as files:
        ...
```

Reply: Thanks @shoyer: I like the contextmanager trick a lot. I did feel like there should be a better way to set up tests; I had actually never used it before. Cheers
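(Background on the trick: `contextlib.contextmanager` turns a generator function into a context manager. Everything before the `yield` runs on entry, the yielded value is bound by `with ... as`, and the cleanup after the `yield` -- here the temp-file removal inside `create_tmp_file` -- runs on exit even if the test body raises.)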
```diff
+                for opt in ['all', 'minimal']:
+                    with open_mfdataset(files, data_vars=opt) as ds:
+                        kwargs = dict(data_vars=opt, dim='t')
+                        ds_expect = xr.concat([ds1, ds2], **kwargs)
+
+                        data = ds[self.var_name][:]
+                        data_expect = ds_expect[self.var_name][:]
+
+                        coord = ds[self.coord_name][:]
+                        coord_expect = ds_expect[self.coord_name][:]
+
+                        self.assertArrayEqual(data, data_expect)
```
Review comment: Can you make use of ...
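A single dataset-level identity check could replace the per-variable comparisons; a sketch assuming the `xarray.testing.assert_identical` helper (available in modern xarray -- the test base class may instead offer an equivalent method):

```python
from xarray.testing import assert_identical

# one call compares variable names, dims, values and attributes
assert_identical(ds_expect, ds)
```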
```diff
+
+                        self.assertArrayEqual(coord, coord_expect)
+
+    def test_common_coord_dims_should_change_when_datavars_all(self):
+        with create_tmp_file() as tmpfile1:
+            with create_tmp_file() as tmpfile2:
+                ds1, ds2 = self.gen_datasets_with_common_coord_and_time()
+
+                # save data to the temporary files
+                ds1.to_netcdf(tmpfile1)
+                ds2.to_netcdf(tmpfile2)
+
+                files = [tmpfile1, tmpfile2]
+                # open the files with the default data_vars='all'
+                with open_mfdataset(files, data_vars='all') as ds:
+
+                    coord_shape = ds[self.coord_name].shape
+                    coord_shape1 = ds1[self.coord_name].shape
+                    coord_shape2 = ds2[self.coord_name].shape
+
+                    var_shape = ds[self.var_name].shape
+                    var_shape1 = ds1[self.var_name].shape
+                    var_shape2 = ds2[self.var_name].shape
+
+                    self.assertNotEqual(coord_shape1, coord_shape)
+                    self.assertNotEqual(coord_shape2, coord_shape)
+
+                    self.assertEqual(var_shape[0],
+                                     var_shape1[0] + var_shape2[0])
+
+                    self.assertEqual(var_shape, coord_shape)
+
+    def test_common_coord_dims_should_not_change_when_datavars_minimal(self):
```
Review comment: This looks very similar to the last test -- can you maybe consolidate it? Or you could even potentially drop some of these tests. We have unit tests for ...
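One way the consolidation could look (helper and test names here are hypothetical, and it reuses the `setup_files` context manager suggested above; the shapes are deterministic, so regenerating the datasets for comparison is fine):

```python
    def check_common_coord_shapes(self, opt, coord_should_grow):
        # shared body for the data_vars='all' / 'minimal' shape checks
        ds1, ds2 = self.gen_datasets_with_common_coord_and_time()
        with self.setup_files() as files:
            with open_mfdataset(files, data_vars=opt) as ds:
                grew = (ds[self.coord_name].shape !=
                        ds1[self.coord_name].shape)
                self.assertEqual(grew, coord_should_grow)
                self.assertEqual(ds[self.var_name].shape[0],
                                 ds1[self.var_name].shape[0] +
                                 ds2[self.var_name].shape[0])

    def test_common_coord_dims_with_datavars_all(self):
        self.check_common_coord_shapes('all', coord_should_grow=True)

    def test_common_coord_dims_with_datavars_minimal(self):
        self.check_common_coord_shapes('minimal', coord_should_grow=False)
```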
```diff
+        with create_tmp_file() as tmpfile1:
+            with create_tmp_file() as tmpfile2:
+                ds1, ds2 = self.gen_datasets_with_common_coord_and_time()
+
+                # save data to the temporary files
+                ds1.to_netcdf(tmpfile1)
+                ds2.to_netcdf(tmpfile2)
+
+                files = [tmpfile1, tmpfile2]
+                # open the files with data_vars='minimal'
+                with open_mfdataset(files, data_vars='minimal') as ds:
+
+                    coord_shape = ds[self.coord_name].shape
+                    coord_shape1 = ds1[self.coord_name].shape
+                    coord_shape2 = ds2[self.coord_name].shape
+
+                    var_shape = ds[self.var_name].shape
+                    var_shape1 = ds1[self.var_name].shape
+                    var_shape2 = ds2[self.var_name].shape
+
+                    self.assertEqual(coord_shape1, coord_shape)
+                    self.assertEqual(coord_shape2, coord_shape)
+
+                    self.assertEqual(var_shape[0],
+                                     var_shape1[0] + var_shape2[0])
+
+    def test_invalid_data_vars_value_should_fail(self):
+        with self.assertRaises(ValueError):
```
Review comment: Move this to only go around the line where you expect the error.
```diff
+            with create_tmp_file() as tmpfile1:
+                with create_tmp_file() as tmpfile2:
+                    ds1, ds2 = self.gen_datasets_with_common_coord_and_time()
+
+                    # save data to the temporary files
+                    ds1.to_netcdf(tmpfile1)
+                    ds2.to_netcdf(tmpfile2)
+
+                    files = [tmpfile1, tmpfile2]
+                    with open_mfdataset(files, data_vars='minimum'):
+                        pass
+
+
 @requires_dask
 @requires_scipy
 @requires_netCDF4
```
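Applying the last two review suggestions together, the invalid-value test might look like this (a sketch; `setup_files` is the hypothetical helper proposed above):

```python
    def test_invalid_data_vars_value_should_fail(self):
        with self.setup_files() as files:
            # only the call expected to fail on the misspelled
            # option 'minimum' sits inside assertRaises
            with self.assertRaises(ValueError):
                with open_mfdataset(files, data_vars='minimum'):
                    pass
```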
Review comment: For completeness, would it also make sense to pass on the `coords` option at this time?

Reply: Thanks, @shoyer: I have added the `coords` keyword in the same manner as `data_vars`. I'll probably have to add a test for it as well. Cheers
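A sketch of what the extended signature could look like after that change (the final committed form is not shown here; the `'different'` default is an assumption mirroring `xarray.concat`):

```python
def open_mfdataset(paths, chunks=None, concat_dim=_CONCAT_DIM_DEFAULT,
                   compat='no_conflicts', preprocess=None, engine=None,
                   lock=None, data_vars='all', coords='different',
                   **kwargs):
    ...
```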