
to_dataset_dict does not work with tar file entries #554

Closed
observingClouds opened this issue Dec 15, 2022 · 1 comment · Fixed by #558

Comments

@observingClouds
Contributor

observingClouds commented Dec 15, 2022

Description

I'm trying to access entries in an intake-esm catalog that point to a tar archive. The problem likely also affects other paths that are interpreted by fsspec.

What I Did

import tarfile
import tempfile

import intake
import pandas as pd
import xarray as xr

temp_dir = tempfile.mkdtemp()

# Write a sample dataset to netCDF and pack it into a tar archive
ds = xr.tutorial.load_dataset("eraint_uvz")
ds.to_netcdf(temp_dir + "/output.nc")
with tarfile.TarFile.open(temp_dir + "/tarfile.tar", "w") as f:
    f.add(temp_dir + "/output.nc", arcname="output.nc")

# Minimal catalog with a single asset addressed via fsspec's tar:// protocol
cat = {
    1: {
        "experiment": "object",
        "case": "test",
        "uri": f"tar://output.nc::{temp_dir}/tarfile.tar",
        "format": "netcdf",
    }
}
catconfig_test = {
    "default_columns": ["experiment", "case", "uri", "format"],
    "aggregation_control": {
        "aggregations": [],
        "groupby_attrs": ["experiment", "case"],
        "variable_column_name": "case",
    },
    "assets": {"column_name": "uri", "format_column_name": "format"},
    "attributes": [],
    "description": "Test cat",
    "last_updated": None,
    "esmcat_version": "0.1.0",
}
df_test = pd.DataFrame.from_dict(cat, orient="index")
testcat = intake.open_esm_datastore(obj={"esmcat": catconfig_test, "df": df_test})
testcat.to_dataset_dict(xarray_open_kwargs=dict(engine="h5netcdf"))

This results in the following error message, in particular:

TypeError: argument of type 'TarContainedFile' is not iterable
Entire traceback
--> The keys in the returned dictionary of datasets are constructed as follows:
	'experiment.case'
------------------------------------------------------------------------------------------------------| 0.00% [0/1 00:00<?]
TypeError                                 Traceback (most recent call last)
File ~/.conda/envs/slkspec_dev/lib/python3.10/site-packages/intake_esm/source.py:240, in ESMDataSource._open_dataset(self)
    220 datasets = [
    221     _open_dataset(
    222         record[self.path_column_name],
   (...)
    237     for _, record in self.df.iterrows()
    238 ]
--> 240 datasets = dask.compute(*datasets)
    241 if len(datasets) == 1:

File ~/.conda/envs/slkspec_dev/lib/python3.10/site-packages/dask/base.py:600, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    598     postcomputes.append(x.__dask_postcompute__())
--> 600 results = schedule(dsk, keys, **kwargs)
    601 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File ~/.conda/envs/slkspec_dev/lib/python3.10/site-packages/dask/threaded.py:89, in get(dsk, keys, cache, num_workers, pool, **kwargs)
     87         pool = MultiprocessingPoolExecutor(pool)
---> 89 results = get_async(
     90     pool.submit,
     91     pool._max_workers,
     92     dsk,
     93     keys,
     94     cache=cache,
     95     get_id=_thread_get_id,
     96     pack_exception=pack_exception,
     97     **kwargs,
     98 )
    100 # Cleanup pools associated to dead threads

File ~/.conda/envs/slkspec_dev/lib/python3.10/site-packages/dask/local.py:511, in get_async(submit, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, chunksize, **kwargs)
    510     else:
--> 511         raise_exception(exc, tb)
    512 res, worker_id = loads(res_info)

File ~/.conda/envs/slkspec_dev/lib/python3.10/site-packages/dask/local.py:319, in reraise(exc, tb)
    318     raise exc.with_traceback(tb)
--> 319 raise exc

File ~/.conda/envs/slkspec_dev/lib/python3.10/site-packages/dask/local.py:224, in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    223 task, data = loads(task_info)
--> 224 result = _execute_task(task, data)
    225 id = get_id()

File ~/.conda/envs/slkspec_dev/lib/python3.10/site-packages/dask/core.py:119, in _execute_task(arg, cache, dsk)
    116     # Note: Don't assign the subtask results to a variable. numpy detects
    117     # temporaries by their reference count and can execute certain
    118     # operations in-place.
--> 119     return func(*(_execute_task(a, cache) for a in args))
    120 elif not ishashable(arg):

File ~/.conda/envs/slkspec_dev/lib/python3.10/site-packages/dask/utils.py:71, in apply(func, args, kwargs)
     70 if kwargs:
---> 71     return func(*args, **kwargs)
     72 else:

File ~/.conda/envs/slkspec_dev/lib/python3.10/site-packages/intake_esm/source.py:67, in _open_dataset(urlpath, varname, xarray_open_kwargs, preprocess, requested_variables, additional_attrs, expand_dims, data_format)
     66 # Handle multi-file datasets with `xr.open_mfdataset()`
---> 67 if '*' in url or isinstance(url, list):
     68     # How should we handle concat_dim, and other xr.open_mfdataset kwargs?
     69     xarray_open_kwargs.update(preprocess=preprocess)

TypeError: argument of type 'TarContainedFile' is not iterable

The above exception was the direct cause of the following exception:

ESMDataSourceError                        Traceback (most recent call last)
Input In [1], in <cell line: 27>()
     25 df_test=pd.DataFrame.from_dict(cat,orient='index')
     26 testcat=intake.open_esm_datastore(obj={"esmcat":catconfig_test,"df":df_test})
---> 27 testcat.to_dataset_dict(xarray_open_kwargs=dict(engine="h5netcdf"))

File ~/.conda/envs/slkspec_dev/lib/python3.10/site-packages/pydantic/decorator.py:40, in pydantic.decorator.validate_arguments.validate.wrapper_function()

File ~/.conda/envs/slkspec_dev/lib/python3.10/site-packages/pydantic/decorator.py:134, in pydantic.decorator.ValidatedFunction.call()

File ~/.conda/envs/slkspec_dev/lib/python3.10/site-packages/pydantic/decorator.py:206, in pydantic.decorator.ValidatedFunction.execute()

File ~/.conda/envs/slkspec_dev/lib/python3.10/site-packages/intake_esm/core.py:651, in esm_datastore.to_dataset_dict(self, xarray_open_kwargs, xarray_combine_by_coords_kwargs, preprocess, storage_options, progressbar, aggregate, skip_on_error, **kwargs)
    649         except Exception as exc:
    650             if not skip_on_error:
--> 651                 raise exc
    652 self.datasets = self._create_derived_variables(datasets, skip_on_error)
    653 return self.datasets

File ~/.conda/envs/slkspec_dev/lib/python3.10/site-packages/intake_esm/core.py:647, in esm_datastore.to_dataset_dict(self, xarray_open_kwargs, xarray_combine_by_coords_kwargs, preprocess, storage_options, progressbar, aggregate, skip_on_error, **kwargs)
    645 for task in gen:
    646     try:
--> 647         key, ds = task.result()
    648         datasets[key] = ds
    649     except Exception as exc:

File ~/.conda/envs/slkspec_dev/lib/python3.10/concurrent/futures/_base.py:451, in Future.result(self, timeout)
    449     raise CancelledError()
    450 elif self._state == FINISHED:
--> 451     return self.__get_result()
    453 self._condition.wait(timeout)
    455 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:

File ~/.conda/envs/slkspec_dev/lib/python3.10/concurrent/futures/_base.py:403, in Future.__get_result(self)
    401 if self._exception:
    402     try:
--> 403         raise self._exception
    404     finally:
    405         # Break a reference cycle with the exception in self._exception
    406         self = None

File ~/.conda/envs/slkspec_dev/lib/python3.10/concurrent/futures/thread.py:58, in _WorkItem.run(self)
     55     return
     57 try:
---> 58     result = self.fn(*self.args, **self.kwargs)
     59 except BaseException as exc:
     60     self.future.set_exception(exc)

File ~/.conda/envs/slkspec_dev/lib/python3.10/site-packages/intake_esm/core.py:789, in _load_source(key, source)
    788 def _load_source(key, source):
--> 789     return key, source.to_dask()

File ~/.conda/envs/slkspec_dev/lib/python3.10/site-packages/intake_esm/source.py:273, in ESMDataSource.to_dask(self)
    271 def to_dask(self):
    272     """Return xarray object (which will have chunks)"""
--> 273     self._load_metadata()
    274     return self._ds

File ~/.conda/envs/slkspec_dev/lib/python3.10/site-packages/intake/source/base.py:285, in DataSourceBase._load_metadata(self)
    283 """load metadata only if needed"""
    284 if self._schema is None:
--> 285     self._schema = self._get_schema()
    286     self.dtype = self._schema.dtype
    287     self.shape = self._schema.shape

File ~/.conda/envs/slkspec_dev/lib/python3.10/site-packages/intake_esm/source.py:205, in ESMDataSource._get_schema(self)
    202 def _get_schema(self) -> Schema:
    204     if self._ds is None:
--> 205         self._open_dataset()
    206         metadata = {'dims': {}, 'data_vars': {}, 'coords': ()}
    207         self._schema = Schema(
    208             datashape=None,
    209             dtype=None,
   (...)
    212             extra_metadata=metadata,
    213         )

File ~/.conda/envs/slkspec_dev/lib/python3.10/site-packages/intake_esm/source.py:265, in ESMDataSource._open_dataset(self)
    262     self._ds.attrs[OPTIONS['dataset_key']] = self.key
    264 except Exception as exc:
--> 265     raise ESMDataSourceError(
    266         f"""Failed to load dataset with key='{self.key}'
    267          You can use `cat['{self.key}'].df` to inspect the assets/files for this key.
    268          """
    269     ) from exc

ESMDataSourceError: Failed to load dataset with key='object.test'
                 You can use `cat['object.test'].df` to inspect the assets/files for this key.

Potential solution

The issue is

if '*' in url or isinstance(url, list):

The url is an fsspec.implementations.tar.TarContainedFile in this case, which cannot be iterated. This issue might also affect other cases in which the url is not just a local path. The issue occurs with version 2022.9.18; with version 2021.8.17 the MWE returns the expected result:

--> The keys in the returned dictionary of datasets are constructed as follows:
	'experiment.case'
Out[14]: █████████████████████████████████████████████████████████████████████████████████████████████| 100.00% [1/1 00:00<00:00]
{'object.test': <xarray.Dataset>
 Dimensions:    (longitude: 480, latitude: 241, level: 3, month: 2)
 Coordinates:
   * longitude  (longitude) float32 -180.0 -179.2 -178.5 ... 177.8 178.5 179.2
   * latitude   (latitude) float32 90.0 89.25 88.5 87.75 ... -88.5 -89.25 -90.0
   * level      (level) int32 200 500 850
   * month      (month) int32 1 7
 Data variables:
     z          (month, level, latitude, longitude) float32 dask.array<chunksize=(2, 3, 241, 480), meta=np.ndarray>
     u          (month, level, latitude, longitude) float32 dask.array<chunksize=(2, 3, 241, 480), meta=np.ndarray>
     v          (month, level, latitude, longitude) float32 dask.array<chunksize=(2, 3, 241, 480), meta=np.ndarray>
 Attributes:
     Conventions:                     CF-1.0
     Info:                            Monthly ERA-Interim data. Downloaded and...
     intake_esm_vars:                 ['test']
     intake_esm_attrs:experiment:     object
     intake_esm_attrs:case:           test
     intake_esm_attrs:uri:            tar://output.nc::/tmp/tmpqmtn9j9q/tarfil...
     intake_esm_attrs:_data_format_:  netcdf
     intake_esm_dataset_key:          object.test}

My suggested fix would be to change:

if '*' in url or isinstance(url, list):

to:

if (isinstance(url, str) and '*' in url) or isinstance(url, list):
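The failure mode and the fix can be reproduced without intake-esm at all. A minimal sketch, where FakeTarContainedFile is a hypothetical stand-in for fsspec's TarContainedFile (a file-like object that supports neither `in` nor iteration):

```python
class FakeTarContainedFile:
    """Hypothetical stand-in for fsspec.implementations.tar.TarContainedFile:
    a file-like object with no __contains__/__iter__/__getitem__ support."""
    pass

url = FakeTarContainedFile()

# The original check raises, because `in` requires a container or iterable:
try:
    '*' in url or isinstance(url, list)
except TypeError as exc:
    print(exc)  # argument of type 'FakeTarContainedFile' is not iterable

# The proposed check short-circuits on isinstance(url, str), so the
# membership test is never evaluated for non-string objects:
needs_mfdataset = (isinstance(url, str) and '*' in url) or isinstance(url, list)
print(needs_mfdataset)  # False -> falls through to xr.open_dataset()
```

With the guard in place, file-like urls fall through to the single-file `xr.open_dataset()` path, while glob strings and lists still trigger `xr.open_mfdataset()`.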

Version information: output of intake_esm.show_versions()


import intake_esm

intake_esm.show_versions()
INSTALLED VERSIONS
------------------

cftime: 1.6.2
dask: 2022.10.2
fastprogress: 1.0.3
fsspec: 2022.11.0
gcsfs: None
intake: 0.6.6
intake_esm: 2022.9.18
netCDF4: 1.6.1
pandas: 1.5.1
requests: 2.25.1
s3fs: None
xarray: 2022.11.0
zarr: 2.13.3
@observingClouds observingClouds changed the title to_dataset_dict does not work with tar files to_dataset_dict does not work with tar file entries Dec 16, 2022
@andersy005
Member

thank you for looking into this, @observingClouds! when you get a chance, a PR w/ the proposed fix would be appreciated :)

mgrover1 added a commit that referenced this issue Jan 13, 2023
* check url is string

* add test for containerized netcdf file

* make dask single-threaded

Co-authored-by: Max Grover <[email protected]>

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* remove auto pytest

Co-authored-by: Max Grover <[email protected]>