Issues with intake-esm #6

Open
observingClouds opened this issue Dec 7, 2022 · 19 comments

@observingClouds
Owner

On Levante:

import intake
import json
import pandas as pd

with open("/pool/data/Catalogs/dkrz_cmip6_disk.json") as f:
    catconfig = json.load(f)
testcat = intake.open_esm_datastore(esmcol_obj=pd.read_csv("/home/k/k204210/intake-esm/catalogs/dkrz_cmip6_archive.csv.gz"),
                                    esmcol_data=catconfig)
subset = testcat.search(source_id="MPI-ESM1-2-LR",
                        experiment_id="ssp370",
                        variable_id="tas",
                        table_id="Amon")

import os
if "slk" not in os.environ["PATH"]:
    os.environ["PATH"] = os.environ["PATH"] + ":/sw/spack-levante/slk-3.3.67-jrygfs/bin/:/sw/spack-levante/openjdk-17.0.0_35-k5o6dr/bin"
SLK_CACHE = "/scratch/k/k204210/INTAKE"
%env SLK_CACHE={SLK_CACHE}
subset.to_dataset_dict()

This calls 33 slk retrieves, which in turn spawn 33 /sw/spack-levante/slk-3.3.67-jrygfs/lib/slk-cli-tools-3.3.67.jar retrieve processes (66 processes in total) for only 10 unique tars from the same directory on the HSM. That can't be right.

Originally posted by @wachsylon in #3 (comment)

@observingClouds
Owner Author

@wachsylon, how many results does the search find? And are these results independent, i.e. is each tar file requested by at most one result?

@observingClouds
Owner Author

This seems to be an issue with how intake-esm requests individual datasets when using to_dataset_dict().
The datasets are loaded independently (in contrast to e.g. xr.open_mfdataset):

with concurrent.futures.ThreadPoolExecutor(max_workers=dask.system.CPU_COUNT) as executor:
    future_tasks = [
        executor.submit(_load_source, key, source) for key, source in sources.items()
    ]

[from intake_esm/core.py]

Because these jobs are independent, the slk retrievals do not know anything about each other. One might think about allowing the retrieval queue of slkspec to be set externally, similar to the get_client hook of swiftspec.
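
A minimal sketch of what such an external hook could look like (the names set_retrieval_queue and _RETRIEVAL_QUEUE are hypothetical; this API does not exist in slkspec today and only illustrates the idea):

import queue
from typing import Optional

# Hypothetical module-level queue shared by all SLKFile instances.
_RETRIEVAL_QUEUE: Optional["queue.Queue[str]"] = None

def set_retrieval_queue(q: "queue.Queue[str]") -> None:
    """Let callers install one shared retrieval queue, so concurrent
    opens enqueue their tar paths into a single batched slk retrieve."""
    global _RETRIEVAL_QUEUE
    _RETRIEVAL_QUEUE = q

A caller such as intake-esm could then create one queue up front, hand it to slkspec, open all datasets, and let slkspec drain the queue with a single slk retrieve per unique tar file.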

@observingClouds observingClouds added the enhancement New feature or request label Dec 7, 2022
@observingClouds observingClouds changed the title Issues with intake-esm Issues with intake-esm: allow retrieval queue to be provided to slkspec Dec 7, 2022
@observingClouds
Owner Author

@wachsylon could you give access to the catalog or point me to a public catalog where the same problem occurs?

PermissionError: [Errno 13] Permission denied: '/home/k/k204210/intake-esm/catalogs/dkrz_cmip6_archive.csv.gz'

@wachsylon

@observingClouds

Sorry, I have put the catalog at
/work/ik1017/Catalogs/dkrz_cmip6_archive.csv.gz

I either ran into

OSError: 
            Failed to open netCDF/HDF dataset.

            *** Arguments passed to xarray.open_dataset() ***:

            - filename_or_obj: <fsspec.implementations.tar.TarContainedFile object at 0x7ffddaec9120>
            - kwargs: {'chunks': {}}

            *** fsspec options used ***:

            - root: ./ScenarioMIP/MPI-M/MPI-ESM1-2-LR/ssp370/r7i1p1f1/Amon/tas/gn/v20190710/tas_Amon_MPI-ESM1-2-LR_ssp370_r7i1p1f1_gn_209501-210012.nc
            - protocol: tar

            ********************************************

or, with subset.to_dataset_dict(cdf_kwargs=dict(engine="h5netcdf")), into:

File ~/.conda/envs/slkspecenv/lib/python3.10/site-packages/h5py/_hl/dataset.py:741, in Dataset.__getitem__(self, args, new_dtype)
    740 try:
--> 741     return self._fast_reader.read(args)
    742 except TypeError:

File h5py/_selector.pyx:370, in h5py._selector.Reader.read()

File h5py/h5fd.pyx:160, in h5py.h5fd.H5FD_fileobj_read()

File ~/.conda/envs/slkspecenv/lib/python3.10/site-packages/fsspec/implementations/tar.py:175, in TarContainedFile.seek(self, to, whence)
    174     raise ValueError("Whence must be (0, 1, 2)")
--> 175 self.of.seek(to)

File /work/ik1017/CMIP6/meta/packems3/slkspec/slkspec/core.py:182, in SLKFile.seek(self, target)
    181     self._cache_files()
--> 182 return self._file_obj.seek(target)

ValueError: seek of closed file

The above exception was the direct cause of the following exception:

It seems like fsspec.open is run twice here.

@observingClouds
Owner Author

observingClouds commented Dec 13, 2022

@wachsylon I had to adjust your example a bit. The arguments you were using do not exist in the current intake-esm version:

import intake
import json
import pandas as pd

with open("/pool/data/Catalogs/dkrz_cmip6_disk.json") as f:
    catconfig = json.load(f)
df = pd.read_csv("/work/ik1017/Catalogs/dkrz_cmip6_archive.csv.gz")
testcat = intake.open_esm_datastore(obj={"esmcat": catconfig, "df": df})
subset = testcat.search(source_id="MPI-ESM1-2-LR",
                        experiment_id="ssp370",
                        variable_id="tas",
                        table_id="Amon")
subset.to_dataset_dict(xarray_open_kwargs=dict(engine="h5netcdf"))

The issue here is how intake-esm creates the datasets. As mentioned in #6 (comment), intake-esm opens every catalog entry independently. Because several catalog entries refer to the same tar archive, one tar file might be opened by several intake-esm calls at the same time. The first three entries of the subset illustrate this (see also the counting sketch below):

In [4]: testcat['ScenarioMIP.MPI-ESM1-2-LR.ssp370.Amon.gn'].df.iloc[0]["uri"]
Out[4]: 'tar://./ScenarioMIP/MPI-M/MPI-ESM1-2-LR/ssp370/r10i1p1f1/Amon/cct/gn/v20190710/cct_Amon_MPI-ESM1-2-LR_ssp370_r10i1p1f1_gn_201501-203412.nc::slk:///arch/ik1017/cmip6/CMIP6/ScenarioMIP_3964.tar'

In [5]: testcat['ScenarioMIP.MPI-ESM1-2-LR.ssp370.Amon.gn'].df.iloc[1]["uri"]
Out[5]: 'tar://./ScenarioMIP/MPI-M/MPI-ESM1-2-LR/ssp370/r10i1p1f1/Amon/cct/gn/v20190710/cct_Amon_MPI-ESM1-2-LR_ssp370_r10i1p1f1_gn_203501-205412.nc::slk:///arch/ik1017/cmip6/CMIP6/ScenarioMIP_3964.tar'

In [6]: testcat['ScenarioMIP.MPI-ESM1-2-LR.ssp370.Amon.gn'].df.iloc[2]["uri"]
Out[6]: 'tar://./ScenarioMIP/MPI-M/MPI-ESM1-2-LR/ssp370/r10i1p1f1/Amon/cct/gn/v20190710/cct_Amon_MPI-ESM1-2-LR_ssp370_r10i1p1f1_gn_205501-207412.nc::slk:///arch/ik1017/cmip6/CMIP6/ScenarioMIP_3964.tar'
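
To quantify the duplication before triggering any retrieval, one can count the unique tar archives behind the catalog entries (a small sketch; it only assumes the uri column layout shown above):

# The uri column has the form "tar://<member>::slk://<archive>.tar",
# so the archive is everything after the last "::".
uris = subset.df["uri"]
tars = uris.str.split("::").str[-1]
print(f"{len(uris)} catalog entries map to {tars.nunique()} unique tar files")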

In addition, and this is certainly something to fix upstream: even a local tar file cannot be opened with intake-esm, independent of slkspec. To reproduce, retrieve the tar files e.g. with slkspec, then remove the slk protocol from the URI and load them directly from /scratch with:

subset.df["uri"] = subset.df["uri"].replace("slk://","file:///scratch/<path>/<to>/<SLK-CACHE>")
subset.to_dataset_dict()
Error message:
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
File ~/.conda/envs/slkspec_dev/lib/python3.10/site-packages/intake_esm/source.py:240, in ESMDataSource._open_dataset(self)
    220 datasets = [
    221     _open_dataset(
    222         record[self.path_column_name],
   (...)
    237     for _, record in self.df.iterrows()
    238 ]
--> 240 datasets = dask.compute(*datasets)
    241 if len(datasets) == 1:

File ~/.conda/envs/slkspec_dev/lib/python3.10/site-packages/dask/base.py:600, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    598 postcomputes.append(x.dask_postcompute())
--> 600 results = schedule(dsk, keys, **kwargs)
    601 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File ~/.conda/envs/slkspec_dev/lib/python3.10/site-packages/dask/threaded.py:89, in get(dsk, keys, cache, num_workers, pool, **kwargs)
     87 pool = MultiprocessingPoolExecutor(pool)
---> 89 results = get_async(
     90     pool.submit,
     91     pool._max_workers,
     92     dsk,
     93     keys,
     94     cache=cache,
     95     get_id=_thread_get_id,
     96     pack_exception=pack_exception,
     97     **kwargs,
     98 )
    100 # Cleanup pools associated to dead threads

File ~/.conda/envs/slkspec_dev/lib/python3.10/site-packages/dask/local.py:511, in get_async(submit, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, chunksize, **kwargs)
    510 else:
--> 511     raise_exception(exc, tb)
    512 res, worker_id = loads(res_info)

File ~/.conda/envs/slkspec_dev/lib/python3.10/site-packages/dask/local.py:319, in reraise(exc, tb)
    318     raise exc.with_traceback(tb)
--> 319 raise exc

File ~/.conda/envs/slkspec_dev/lib/python3.10/site-packages/dask/local.py:224, in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    223 task, data = loads(task_info)
--> 224 result = _execute_task(task, data)
    225 id = get_id()

File ~/.conda/envs/slkspec_dev/lib/python3.10/site-packages/dask/core.py:119, in _execute_task(arg, cache, dsk)
    116 # Note: Don't assign the subtask results to a variable. numpy detects
    117 # temporaries by their reference count and can execute certain
    118 # operations in-place.
--> 119 return func(*(_execute_task(a, cache) for a in args))
    120 elif not ishashable(arg):

File ~/.conda/envs/slkspec_dev/lib/python3.10/site-packages/dask/utils.py:71, in apply(func, args, kwargs)
     70 if kwargs:
---> 71     return func(*args, **kwargs)
     72 else:

File ~/.conda/envs/slkspec_dev/lib/python3.10/site-packages/intake_esm/source.py:67, in _open_dataset(urlpath, varname, xarray_open_kwargs, preprocess, requested_variables, additional_attrs, expand_dims, data_format)
     66 # Handle multi-file datasets with xr.open_mfdataset()
---> 67 if '*' in url or isinstance(url, list):
     68     # How should we handle concat_dim, and other xr.open_mfdataset kwargs?
     69     xarray_open_kwargs.update(preprocess=preprocess)

TypeError: argument of type 'TarContainedFile' is not iterable

Maybe you can come up with a minimal example and raise an issue upstream (a first attempt at such a reproducer is sketched below). After that is fixed, we can see what we are still missing here.
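
A minimal reproducer for the upstream report could look roughly like this (a sketch only; the archive and member names are placeholders, and it assumes the failing check is the '*' in url membership test from the traceback above):

import io
import tarfile
import fsspec

# Build a tiny local tar archive with one dummy member.
with tarfile.open("/tmp/archive.tar", "w") as tar:
    data = b"not really netcdf, just enough to get a file object"
    info = tarfile.TarInfo("member.nc")
    info.size = len(data)
    tar.addfile(info, io.BytesIO(data))

# fsspec's tar protocol hands back a TarContainedFile, not a path string.
f = fsspec.open("tar://member.nc::file:///tmp/archive.tar").open()

# This is the check from intake_esm/source.py line 67; it fails with
# TypeError: argument of type 'TarContainedFile' is not iterable
"*" in f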

@observingClouds
Owner Author

The queuing actually seems to work correctly now when patched with #18. Therefore, I am changing the title.

@observingClouds observingClouds changed the title Issues with intake-esm: allow retrieval queue to be provided to slkspec Issues with intake-esm Dec 13, 2022
@observingClouds observingClouds added upstream and removed enhancement New feature or request labels Dec 13, 2022
@wachsylon

The arguments you were using do not exist in the current intake-esm version:

I am sorry - the most recent version has renamed the keyword arguments, and I will eventually have to update the entire intake-esm workflow, which will be... annoying...
It is actually another reason to rather support intake-xarray than intake-esm.

one tar-file might be opened by several intake-esm calls at the same time.

But how can that be a problem if it works when I apply your idea replace("slk://","file://....")? Isn't intake-esm opening the tars several times in that case as well?

@observingClouds
Owner Author

But how can that be a problem if it works when I apply your idea replace("slk://","file://....")? Isn't intake-esm opening the tars several times in that case as well?

Sorry, maybe I wasn't clear. I meant that this is not working either and is just an option to test intake-esm with tar files. Or are you saying that this is working with your intake-esm version?

@wachsylon

It is working.

@wachsylon

@observingClouds

After the retrieval, when the tars exist locally, this works:

import intake
import json
import pandas as pd

with open("/pool/data/Catalogs/dkrz_cmip6_disk.json") as f:
    catconfig = json.load(f)
testcat = intake.open_esm_datastore(esmcol_obj=pd.read_csv("/work/ik1017/Catalogs/dkrz_cmip6_archive.csv.gz"),
                                    esmcol_data=catconfig)
subset = testcat.search(source_id="MPI-ESM1-2-LR",
                        experiment_id="ssp370",
                        variable_id="tas",
                        table_id="Amon")

import os
if "slk" not in os.environ["PATH"]:
    os.environ["PATH"] = os.environ["PATH"] + ":/sw/spack-levante/slk-3.3.67-jrygfs/bin/:/sw/spack-levante/openjdk-17.0.0_35-k5o6dr/bin"
SLK_CACHE = "/scratch/k/k204210/INTAKE"
%env SLK_CACHE={SLK_CACHE}
subset._df["uri"] = subset._df["uri"].str.replace("slk", "file").str.replace("/arch", "/scratch/k/k204210/INTAKE/arch")
subset.to_dataset_dict(cdf_kwargs=dict(engine="h5netcdf"))

@wachsylon

which is the same old, outdated code but with the extra line

subset._df["uri"] = subset._df["uri"].str.replace("slk", "file").str.replace("/arch", "/scratch/k/k204210/INTAKE/arch")

@observingClouds
Owner Author

Interesting! It fails for me. Which version of intake-esm are you using? Could you check whether this still works with 2022.9.18?

@wachsylon

wachsylon commented Dec 15, 2022

With the recent version, I end up with

---> 67 if '*' in url or isinstance(url, list):
     68     # How should we handle concat_dim, and other xr.open_mfdataset kwargs?
     69     xarray_open_kwargs.update(preprocess=preprocess)

TypeError: argument of type 'TarContainedFile' is not iterable

annoying

@observingClouds
Owner Author

Great! Well, great that you can reproduce my issue; not so great that the feature we need here used to work and has since broken. Would you mind opening an issue at intake-esm and linking it here? Which version did work for you?

@wachsylon

2021.8.17

but extrapolating intake-esm's release cadence, we can expect the next one in 2024 :(

@wachsylon

wachsylon commented Dec 15, 2022

I will open an issue there!

Nevertheless, we could also try to get rid of

File ~/.conda/envs/slkspecenv/lib/python3.10/site-packages/fsspec/implementations/tar.py:175, in TarContainedFile.seek(self, to, whence)
    174     raise ValueError("Whence must be (0, 1, 2)")
--> 175 self.of.seek(to)

File /work/ik1017/CMIP6/meta/packems3/slkspec/slkspec/core.py:182, in SLKFile.seek(self, target)
    181     self._cache_files()
--> 182 return self._file_obj.seek(target)

ValueError: seek of closed file
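
One possible mitigation on the slkspec side could be a defensive seek that reopens the cached copy when the underlying handle has already been closed. This is only a sketch: _file_obj and _cache_files are taken from the traceback above, while the reopening logic is an assumption, not the current slkspec behaviour.

# Sketch of a defensive SLKFile.seek (not the current implementation).
def seek(self, target: int) -> int:
    if self._file_obj is None:
        # First access: stage the file from tape into the SLK_CACHE.
        self._cache_files()
    if self._file_obj.closed:
        # A second fsspec.open() may have closed the shared handle;
        # reopen the cached local copy instead of raising
        # "ValueError: seek of closed file".
        self._file_obj = open(self._file_obj.name, self._file_obj.mode)
    return self._file_obj.seek(target)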

@observingClouds
Owner Author

@wachsylon I went ahead and raised an issue upstream.

@antarcticrainforest
Contributor

antarcticrainforest commented Jan 9, 2023

The queuing actually seems to work correctly now when patched with #18 . Therefore, I change the title.

Is that really true? The GIL should take care of the thread safety here. If the thread lock doesn't work properly, we'll have to find a better way of implementing it.
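
For reference, a lock-guarded batching scheme could look roughly like this (a sketch of the idea only, not the actual patch in #18; run_slk_retrieve is a placeholder):

import threading
import queue

_retrieval_queue: "queue.Queue[str]" = queue.Queue()
_retrieval_lock = threading.Lock()

def request_retrieval(tar_path: str) -> None:
    """Called by each thread that needs a tar file from the HSM."""
    _retrieval_queue.put(tar_path)
    with _retrieval_lock:
        # Only one thread drains the queue at a time, so requests that
        # arrived concurrently are collapsed into a single batch.
        batch = set()
        while not _retrieval_queue.empty():
            batch.add(_retrieval_queue.get())
        if batch:
            run_slk_retrieve(sorted(batch))

def run_slk_retrieve(paths):
    # Placeholder: issue one `slk retrieve` call for the whole batch.
    print("slk retrieve", *paths)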

@observingClouds
Owner Author

Yes, this is true. I think we should create additional tests for all these cases that don't yet work. As a first step, it would be okay if those tests required data on Levante and could only be executed there.
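
As a starting point, such a Levante-only test could be skipped automatically elsewhere (a sketch; the skip condition and the asserted behaviour are assumptions):

import os
import pytest

levante_only = pytest.mark.skipif(
    not os.path.isdir("/pool/data/Catalogs"),
    reason="requires catalog data that is only available on DKRZ Levante",
)

@levante_only
def test_to_dataset_dict_from_tar_uris():
    import intake
    import json
    import pandas as pd

    with open("/pool/data/Catalogs/dkrz_cmip6_disk.json") as f:
        catconfig = json.load(f)
    df = pd.read_csv("/work/ik1017/Catalogs/dkrz_cmip6_archive.csv.gz")
    cat = intake.open_esm_datastore(obj={"esmcat": catconfig, "df": df})
    subset = cat.search(source_id="MPI-ESM1-2-LR", experiment_id="ssp370",
                        variable_id="tas", table_id="Amon")
    dsets = subset.to_dataset_dict(xarray_open_kwargs=dict(engine="h5netcdf"))
    assert len(dsets) > 0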
