WIP: Refactor #3

Merged 15 commits on Dec 7, 2022
Conversation

@antarcticrainforest (Contributor) commented Nov 29, 2022

This adds some refactoring to the code. The main goal of these changes is the ability to download files in bulk from the tape archive. On top of that, I added type annotations. I am not entirely sure about the purpose of the slkspec._version module, so before going to the trouble of annotating it I wanted to make sure it is needed at all.

I will also add basic unit tests and mypy type checking, hence the draft.

@antarcticrainforest antarcticrainforest marked this pull request as draft November 29, 2022 07:02
@antarcticrainforest antarcticrainforest marked this pull request as ready for review November 29, 2022 07:04
@antarcticrainforest (Contributor Author)

I have just added unit tests and type checks. @observingClouds I think you'd have to enable CI pipelines in order to run the tests.

@observingClouds (Owner)

This looks already very nice @antarcticrainforest. I just tried your PR and get an error when executing:

import os
os.environ["SLK_CACHE"] = "/scratch/m/m300408/retrieval/"
from intake import open_catalog
cat=open_catalog("https://raw.githubusercontent.com/observingClouds/tape_archive_index/main/catalog.yml")
ds=cat["EUREC4A_ICON-LES_highCCN_DOM02_surface_native"].to_dask()
AttributeError: 'NoneType' object has no attribute 'seek'
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
Input In [6], in ()
      1 from intake import open_catalog
      2 cat=open_catalog("https://raw.githubusercontent.com/observingClouds/tape_archive_index/main/catalog.yml")
----> 3 ds=cat["EUREC4A_ICON-LES_highCCN_DOM02_surface_native"].to_dask()

File ~/.conda/envs/slkspec_dev/lib/python3.10/site-packages/intake_xarray/base.py:69, in DataSourceMixin.to_dask(self)
67 def to_dask(self):
68 """Return xarray object where variables are dask arrays"""
---> 69 return self.read_chunked()

File ~/.conda/envs/slkspec_dev/lib/python3.10/site-packages/intake_xarray/base.py:44, in DataSourceMixin.read_chunked(self)
42 def read_chunked(self):
43 """Return xarray object (which will have chunks)"""
---> 44 self._load_metadata()
45 return self._ds

File ~/.conda/envs/slkspec_dev/lib/python3.10/site-packages/intake/source/base.py:285, in DataSourceBase._load_metadata(self)
283 """load metadata only if needed"""
284 if self._schema is None:
--> 285 self._schema = self._get_schema()
286 self.dtype = self._schema.dtype
287 self.shape = self._schema.shape

File ~/.conda/envs/slkspec_dev/lib/python3.10/site-packages/intake_xarray/base.py:18, in DataSourceMixin._get_schema(self)
15 self.urlpath = self._get_cache(self.urlpath)[0]
17 if self._ds is None:
---> 18 self._open_dataset()
20 metadata = {
21 'dims': dict(self._ds.dims),
22 'data_vars': {k: list(self._ds[k].coords)
23 for k in self._ds.data_vars.keys()},
24 'coords': tuple(self._ds.coords.keys()),
25 }
26 if getattr(self, 'on_server', False):

File ~/.conda/envs/slkspec_dev/lib/python3.10/site-packages/intake_xarray/xzarr.py:46, in ZarrSource._open_dataset(self)
44 self._ds = xr.open_mfdataset(self.urlpath, **kw)
45 else:
---> 46 self._ds = xr.open_dataset(self.urlpath, **kw)

File ~/.conda/envs/slkspec_dev/lib/python3.10/site-packages/xarray/backends/api.py:539, in open_dataset(filename_or_obj, engine, chunks, cache, decode_cf, mask_and_scale, decode_times, decode_timedelta, use_cftime, concat_characters, decode_coords, drop_variables, inline_array, backend_kwargs, **kwargs)
527 decoders = _resolve_decoders_kwargs(
528 decode_cf,
529 open_backend_dataset_parameters=backend.open_dataset_parameters,
(...)
535 decode_coords=decode_coords,
536 )
538 overwrite_encoded_chunks = kwargs.pop("overwrite_encoded_chunks", None)
--> 539 backend_ds = backend.open_dataset(
540 filename_or_obj,
541 drop_variables=drop_variables,
542 **decoders,
543 **kwargs,
544 )
545 ds = _dataset_from_backend_dataset(
546 backend_ds,
547 filename_or_obj,
(...)
555 **kwargs,
556 )
557 return ds

File ~/.conda/envs/slkspec_dev/lib/python3.10/site-packages/xarray/backends/zarr.py:848, in ZarrBackendEntrypoint.open_dataset(self, filename_or_obj, mask_and_scale, decode_times, concat_characters, decode_coords, drop_variables, use_cftime, decode_timedelta, group, mode, synchronizer, consolidated, chunk_store, storage_options, stacklevel)
828 def open_dataset(
829 self,
830 filename_or_obj,
(...)
844 stacklevel=3,
845 ):
847 filename_or_obj = _normalize_path(filename_or_obj)
--> 848 store = ZarrStore.open_group(
849 filename_or_obj,
850 group=group,
851 mode=mode,
852 synchronizer=synchronizer,
853 consolidated=consolidated,
854 consolidate_on_close=False,
855 chunk_store=chunk_store,
856 storage_options=storage_options,
857 stacklevel=stacklevel + 1,
858 )
860 store_entrypoint = StoreBackendEntrypoint()
861 with close_on_error(store):

File ~/.conda/envs/slkspec_dev/lib/python3.10/site-packages/xarray/backends/zarr.py:400, in ZarrStore.open_group(cls, store, mode, synchronizer, group, consolidated, consolidate_on_close, chunk_store, storage_options, append_dim, write_region, safe_chunks, stacklevel)
397 raise FileNotFoundError(f"No such file or directory: '{store}'")
398 elif consolidated:
399 # TODO: an option to pass the metadata_key keyword
--> 400 zarr_group = zarr.open_consolidated(store, **open_kwargs)
401 else:
402 zarr_group = zarr.open_group(store, **open_kwargs)

File ~/.conda/envs/slkspec_dev/lib/python3.10/site-packages/zarr/convenience.py:1300, in open_consolidated(store, metadata_key, mode, **kwargs)
1297 metadata_key = 'meta/root/consolidated/' + metadata_key
1299 # setup metadata store
-> 1300 meta_store = ConsolidatedStoreClass(store, metadata_key=metadata_key)
1302 # pass through
1303 chunk_store = kwargs.pop('chunk_store', None) or store

File ~/.conda/envs/slkspec_dev/lib/python3.10/site-packages/zarr/storage.py:2861, in ConsolidatedMetadataStore.__init__(self, store, metadata_key)
2858 self.store = Store._ensure_store(store)
2860 # retrieve consolidated metadata
-> 2861 meta = json_loads(self.store[metadata_key])
2863 # check format of consolidated metadata
2864 consolidated_format = meta.get('zarr_consolidated_format', None)

File ~/.conda/envs/slkspec_dev/lib/python3.10/site-packages/zarr/storage.py:1376, in FSStore.__getitem__(self, key)
1374 key = self._normalize_key(key)
1375 try:
-> 1376 return self.map[key]
1377 except self.exceptions as e:
1378 raise KeyError(key) from e

File ~/.conda/envs/slkspec_dev/lib/python3.10/site-packages/fsspec/mapping.py:143, in FSMap.__getitem__(self, key, default)
141 k = self._key_to_str(key)
142 try:
--> 143 result = self.fs.cat(k)
144 except self.missing_exceptions:
145 if default is not None:

File ~/.conda/envs/slkspec_dev/lib/python3.10/site-packages/preffs/core.py:252, in PRefFileSystem.cat(self, path, recursive, on_error, **kwargs)
250 for p in paths:
251 try:
--> 252 out[p] = self.cat_file(p, **kwargs)
253 except Exception as e:
254 if on_error == "raise":

File ~/.conda/envs/slkspec_dev/lib/python3.10/site-packages/preffs/core.py:137, in PRefFileSystem.cat_file(self, path, start, end, **kwargs)
135 yield piece
136 try:
--> 137 return b''.join(gen())
138 except KeyError:
139 raise FileNotFoundError(path)

File ~/.conda/envs/slkspec_dev/lib/python3.10/site-packages/preffs/core.py:133, in PRefFileSystem.cat_file.<locals>.gen()
131 if isinstance(piece, tuple):
132 protocol, piece_path, piece_start, piece_end = piece
--> 133 yield self.get_fs(protocol).cat_file(piece_path, start=piece_start, end=piece_end)
134 else:
135 yield piece

File ~/.conda/envs/slkspec_dev/lib/python3.10/site-packages/fsspec/spec.py:725, in AbstractFileSystem.cat_file(self, path, start, end, **kwargs)
723 if start is not None:
724 if start >= 0:
--> 725 f.seek(start)
726 else:
727 f.seek(max(0, f.size + start))

File ~/.conda/envs/slkspec_dev/lib/python3.10/site-packages/slkspec/core.py:181, in SLKFile.seek(self, target)
179 def seek(self, target: int) -> int: # type: ignore
180 if self._file_obj is None:
--> 181 return self._file_obj.seek(target) # type: ignore
182 self._cache_files()
183 return self._file_obj.seek(target)

AttributeError: 'NoneType' object has no attribute 'seek'

search_str = pyslk.slk_search(pyslk.slk_gen_file_query(inp_files))
search_id_re = re.search("Search ID: [0-9]*", search_str)
if search_id_re is None:
raise ValueError("No files found in archive.")
Collaborator

This should rather be

raise ValueError("Search threw an unexpected error.")

slkspec/core.py Outdated
if search_id_re is None:
raise ValueError("No files found in archive.")
search_id = int(search_id_re.group(0)[11:])
logger.critical("Retrieving files for search id: %i", search_id)
Collaborator

Prior to this line, I would add a check on how many results were found.

(a) slk search returns a search_id even if no files were found. We have a feature request to StrongLink ("slk search: do not return a search_id if no results are found"), but it has a low priority and I don't know when/if it will come.

(b) Currently, pyslk.slk_list throws an error if it receives a search_id with no search results. This is OK so far (from my point of view) because the pyslk.slk_* wrappers should be dumb wrappers. However, there is a parsers.slk_list_formatted which returns a pandas.DataFrame; it should handle the error properly and return an empty DataFrame. I'll try to fix this this evening. The question to @antarcticrainforest would be: do you want to catch the error of pyslk.slk_list, or do you want to check the output of parsers.slk_list_formatted for emptiness?

The error message in the first case would be ERROR: No resources found for given search id: 246936 (if search_id was 246936); the full error is pyslk.exceptions.PySlkException: pyslk.slk_list: ERROR: No resources found for given search id: 246936.
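
For illustration, a rough sketch of the two options (the import paths and exact signatures of pyslk.slk_list and parsers.slk_list_formatted are assumed here, and option (b) assumes the empty-DataFrame fix mentioned above is in place):

import pyslk
from pyslk import parsers
from pyslk.exceptions import PySlkException

def search_has_results(search_id: int) -> bool:
    # Option (a): catch the error thrown by the dumb wrapper.
    try:
        pyslk.slk_list(search_id)  # signature assumed for illustration
        return True
    except PySlkException:
        return False

def search_has_results_formatted(search_id: int) -> bool:
    # Option (b): rely on the formatted parser returning an empty DataFrame.
    return not parsers.slk_list_formatted(search_id).empty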

raise ValueError("No files found in archive.")
search_id = int(search_id_re.group(0)[11:])
logger.critical("Retrieving files for search id: %i", search_id)
pyslk.slk_retrieve(search_id, str(output_dir))
Collaborator

On the compute, shared and interactive partitions, slk retrieve is allowed to retrieve 500 files at once. Thus, if more than 500 files are requested here, the request should be split into several retrievals. @antarcticrainforest Or would you split up the file list into chunks of fewer than 501 files anyway before calling this function? I'll try to implement this feature ("group_files_by_tape") in the slk_helpers as soon as possible, but currently I am mainly bound to slk testing and user support. So, let's see ;-)

Collaborator

On the login nodes we allow slk retrieve to fetch only one file per call. There is a StrongLink config file in /etc/stronglink.conf which is JSON and contains an attribute "retrieve_file_limit": 1 (on login nodes) or "retrieve_file_limit": 500 (on other nodes). This file could be read somewhere to find out how many files are allowed to be retrieved; maybe this number will change in the future if needed.
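
For illustration, a small sketch of how this limit could be honoured (the config path and the "retrieve_file_limit" key come from the comment above; the helper functions themselves are hypothetical):

import json
from pathlib import Path
from typing import Iterator, List

def get_retrieve_file_limit(config: str = "/etc/stronglink.conf") -> int:
    # Fall back to the compute/shared/interactive limit of 500 if the
    # config file cannot be read or does not contain the attribute.
    try:
        return int(json.loads(Path(config).read_text())["retrieve_file_limit"])
    except (OSError, KeyError, ValueError):
        return 500

def chunk_files(files: List[str], limit: int) -> Iterator[List[str]]:
    # Split the requested files into batches no larger than the limit,
    # so each batch can go into its own slk retrieve call.
    for i in range(0, len(files), limit):
        yield files[i : i + limit]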

Owner

moved to issue #8

retrieval_requests: Dict[Path, List[str]] = defaultdict(list)
logger.critical("Retrieving %i items from tape", len(retrieve_files))
for inp_file, out_dir in retrieve_files:
retrieval_requests[Path(out_dir)].append(inp_file)
Collaborator

You might run parsers.slk_exists_bool(inp_file) here for each file to check whether it really exists. It only works for explicit filenames (not regexes or similar). This would prevent us from getting an empty search id further below.

Owner

I prefer to have an empty search rather than checking each file individually and potentially slowing down the response.

@antarcticrainforest (Contributor Author)

This looks already very nice @antarcticrainforest. I just tried your PR and get an error when executing:

import os
os.environ["SLK_CACHE"] = "/scratch/m/m300408/retrieval/"
from intake import open_catalog
cat=open_catalog("https://raw.githubusercontent.com/observingClouds/tape_archive_index/main/catalog.yml")
ds=cat["EUREC4A_ICON-LES_highCCN_DOM02_surface_native"].to_dask()

AttributeError: 'NoneType' object has no attribute 'seek'

Thanks for the snippet, I've tried to reproduce this but failed. First I had to install an intake-xarray plugin and then I ran into the problem that the dependencies of preffs do not seem to be complete:

A suitable version of pyarrow or fastparquet is required for parquet support.
Trying to import the above resulted in these errors:
 - Missing optional dependency 'pyarrow'. pyarrow is required for parquet support. Use pip or conda to install pyarrow.
 - Missing optional dependency 'fastparquet'. fastparquet is required for parquet support. Use pip or conda to install fastparquet.

I would suggest we deal with getting preffs to work later, unless you see what the issue is.

Would you be able to provide an example that uses pure slk:// files?

@observingClouds (Owner)

@antarcticrainforest

First I had to install an intake-xarray plugin and then I ran into the problem that the dependencies of preffs do not seem to be complete

The intake-xarray plugin is not a dependency of preffs. My snippet just had a link to an intake catalog which contains a dataset that should be opened with the xarray driver.

Concerning the missing parquet dependency you are right. I created an issue upstream. You just need to install one of them though and it should work ;)

@antarcticrainforest (Contributor Author)

mypy fails, I'll deal with this later.

Comment on lines +140 to +143
for output_dir, inp_files in retrieval_requests.items():
output_dir.mkdir(parents=True, exist_ok=True, mode=self.file_permissions)
logger.debug("Creating slk query for %i files", len(inp_files))
search_str = pyslk.slk_search(pyslk.slk_gen_file_query(inp_files))
@observingClouds (Owner) Nov 29, 2022

This is not yet taking advantage of combining retrievals, right?
I imagine something like:

pyslk.slk_search(pyslk.slk_gen_file_query(retrieval_requests.values()))

(not looping over individual entries)
so that all files that are currently in the queue are gathered in a single search.
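
Purely as an illustrative sketch (note that retrieval_requests.values() holds per-directory lists of files, so they would need flattening first):

# Flatten the per-directory lists before building one combined query.
all_files = [f for files in retrieval_requests.values() for f in files]
search_str = pyslk.slk_search(pyslk.slk_gen_file_query(all_files))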

Coming back to my catalog example:

from intake import open_catalog
cat=open_catalog("https://raw.githubusercontent.com/observingClouds/tape_archive_index/main/catalog.yml")
ds=cat["EUREC4A_ICON-LES_highCCN_DOM02_surface_native"].to_dask()
ds[["u_10m","v_10m"]].isel(time=slice(0,1)).mean().compute() # this requests two tar files from tape

The log file currently shows:

/scratch/m/m300408/retrieval/arch/mh0010/m300408/experiments/EUREC4A/highCCN/output/DOM02/surface/EUREC4A_ICON-LES_highCCN_DOM02_surface_native.u_10m.006.tar
/scratch/m/m300408/retrieval/arch/mh0010/m300408/experiments/EUREC4A/highCCN/output/DOM02/surface/EUREC4A_ICON-LES_highCCN_DOM02_surface_native.u_10m.006.tar
/scratch/m/m300408/retrieval/arch/mh0010/m300408/experiments/EUREC4A/highCCN/output/DOM02/surface/EUREC4A_ICON-LES_highCCN_DOM02_surface_native.u_10m.006.tar
/scratch/m/m300408/retrieval/arch/mh0010/m300408/experiments/EUREC4A/highCCN/output/DOM02/surface/EUREC4A_ICON-LES_highCCN_DOM02_surface_native.u_10m.006.tar
/scratch/m/m300408/retrieval/arch/mh0010/m300408/experiments/EUREC4A/highCCN/output/DOM02/surface/EUREC4A_ICON-LES_highCCN_DOM02_surface_native.u_10m.006.tar
/scratch/m/m300408/retrieval/arch/mh0010/m300408/experiments/EUREC4A/highCCN/output/DOM02/surface/EUREC4A_ICON-LES_highCCN_DOM02_surface_native.u_10m.006.tar
/scratch/m/m300408/retrieval/arch/mh0010/m300408/experiments/EUREC4A/highCCN/output/DOM02/surface/EUREC4A_ICON-LES_highCCN_DOM02_surface_native.u_10m.006.tar
/scratch/m/m300408/retrieval/arch/mh0010/m300408/experiments/EUREC4A/highCCN/output/DOM02/surface/EUREC4A_ICON-LES_highCCN_DOM02_surface_native.u_10m.006.tar
/scratch/m/m300408/retrieval/arch/mh0010/m300408/experiments/EUREC4A/highCCN/output/DOM02/surface/EUREC4A_ICON-LES_highCCN_DOM02_surface_native.u_10m.006.tar
/scratch/m/m300408/retrieval/arch/mh0010/m300408/experiments/EUREC4A/highCCN/output/DOM02/surface/EUREC4A_ICON-LES_highCCN_DOM02_surface_native.u_10m.006.tar
/scratch/m/m300408/retrieval/arch/mh0010/m300408/experiments/EUREC4A/highCCN/output/DOM02/surface/EUREC4A_ICON-LES_highCCN_DOM02_surface_native.u_10m.006.tar
/scratch/m/m300408/retrieval/arch/mh0010/m300408/experiments/EUREC4A/highCCN/output/DOM02/surface/EUREC4A_ICON-LES_highCCN_DOM02_surface_native.u_10m.006.tar
/scratch/m/m300408/retrieval/arch/mh0010/m300408/experiments/EUREC4A/highCCN/output/DOM02/surface/EUREC4A_ICON-LES_highCCN_DOM02_surface_native.v_10m.007.tar
/scratch/m/m300408/retrieval/arch/mh0010/m300408/experiments/EUREC4A/highCCN/output/DOM02/surface/EUREC4A_ICON-LES_highCCN_DOM02_surface_native.v_10m.007.tar
/scratch/m/m300408/retrieval/arch/mh0010/m300408/experiments/EUREC4A/highCCN/output/DOM02/surface/EUREC4A_ICON-LES_highCCN_DOM02_surface_native.v_10m.007.tar
/scratch/m/m300408/retrieval/arch/mh0010/m300408/experiments/EUREC4A/highCCN/output/DOM02/surface/EUREC4A_ICON-LES_highCCN_DOM02_surface_native.v_10m.007.tar
/scratch/m/m300408/retrieval/arch/mh0010/m300408/experiments/EUREC4A/highCCN/output/DOM02/surface/EUREC4A_ICON-LES_highCCN_DOM02_surface_native.v_10m.007.tar
/scratch/m/m300408/retrieval/arch/mh0010/m300408/experiments/EUREC4A/highCCN/output/DOM02/surface/EUREC4A_ICON-LES_highCCN_DOM02_surface_native.v_10m.007.tar
/scratch/m/m300408/retrieval/arch/mh0010/m300408/experiments/EUREC4A/highCCN/output/DOM02/surface/EUREC4A_ICON-LES_highCCN_DOM02_surface_native.v_10m.007.tar
/scratch/m/m300408/retrieval/arch/mh0010/m300408/experiments/EUREC4A/highCCN/output/DOM02/surface/EUREC4A_ICON-LES_highCCN_DOM02_surface_native.v_10m.007.tar
/scratch/m/m300408/retrieval/arch/mh0010/m300408/experiments/EUREC4A/highCCN/output/DOM02/surface/EUREC4A_ICON-LES_highCCN_DOM02_surface_native.v_10m.007.tar
/scratch/m/m300408/retrieval/arch/mh0010/m300408/experiments/EUREC4A/highCCN/output/DOM02/surface/EUREC4A_ICON-LES_highCCN_DOM02_surface_native.v_10m.007.tar
/scratch/m/m300408/retrieval/arch/mh0010/m300408/experiments/EUREC4A/highCCN/output/DOM02/surface/EUREC4A_ICON-LES_highCCN_DOM02_surface_native.v_10m.007.tar
/scratch/m/m300408/retrieval/arch/mh0010/m300408/experiments/EUREC4A/highCCN/output/DOM02/surface/EUREC4A_ICON-LES_highCCN_DOM02_surface_native.v_10m.007.tar
slk search '{"$and":[{"path":{"$gte":"/arch/mh0010/m300408/experiments/EUREC4A/highCCN/output/DOM02/surface","$max_depth":1}},{"resources.name":{"$regex":"EUREC4A_ICON-LES_highCCN_DOM02_surface_native.u_10m.006.tar"}}]}'

So all files are requested, but only the first one is "searched" and retrieved, and eventually the next file will be requested separately.

I can merge this though and we can work on the remaining issues/features in finer increments/PRs. This PR now works similarly to the initial version, but it has of course also already advanced by a lot 👍

@antarcticrainforest (Contributor Author) Nov 29, 2022

From a code perspective, the queue with the thread lock should be able to do this, but @neumannd suggested this retrieval strategy because currently we can only retrieve files residing in the same directory at once.

Suppose that you have:

/arch/foo/file1.txt
/arch/foo/file2.txt
/arch/foo/file3.txt
/arch/bar/file1.txt
/arch/bar/file2.txt
/arch/bar/file3.txt

Then everything in /arch/foo would be retrieved at once, while /arch/bar would be processed in a separate request. That's why there are two loops: the first gathers the unique directories (/arch/foo, /arch/bar, etc.), and the second requests the files in those directories.

What we could maybe do is process the retrievals in a multi-process/thread pool. @neumannd, what do you think about running pyslk.slk_search and pyslk.slk_retrieve in parallel (essentially the second loop)?
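
A minimal sketch of that idea, reusing the per-directory grouping and the search-id parsing already in this PR; the parallel wrapper itself is only illustrative:

import re
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import Dict, List, Tuple

import pyslk

def retrieve_in_parallel(retrieve_files: List[Tuple[str, str]], max_workers: int = 4) -> None:
    # First loop: gather the unique output directories.
    requests: Dict[Path, List[str]] = defaultdict(list)
    for inp_file, out_dir in retrieve_files:
        requests[Path(out_dir)].append(inp_file)

    def _retrieve(output_dir: Path, inp_files: List[str]) -> None:
        # Second loop body: one search and one retrieve per directory.
        search_str = pyslk.slk_search(pyslk.slk_gen_file_query(inp_files))
        search_id_re = re.search("Search ID: [0-9]*", search_str)
        if search_id_re is None:
            raise ValueError("Search threw an unexpected error.")
        search_id = int(search_id_re.group(0)[11:])
        pyslk.slk_retrieve(search_id, str(output_dir))

    # Run the per-directory retrievals concurrently instead of sequentially.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(_retrieve, d, files) for d, files in requests.items()]
    for future in futures:
        future.result()  # re-raise any retrieval errors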

Owner

Note that in my example above the files are all residing in the same directory.

Contributor Author

OK, I'll take a look at this to see what's wrong.

Collaborator

@antarcticrainforest With slk 3.3.67, slk retrieve SEARCH_ID /target will recreate the whole path of each found file in /target; I forgot to mention that. Thus, if search_id 123456 found the files from the example above and we were doing slk retrieve 123456 /target, then we would get the folders /target/arch/foo and /target/arch/bar (and the files would be copied into them accordingly).

@observingClouds (Owner) Dec 4, 2022

@antarcticrainforest, aside from the example above, I think slkspec needs to be derived from the AsyncFileSystem class to support joined retrievals in everyday situations. Here is another example that does not have URL chaining, but still fails:

import xarray as xr
xr.open_mfdataset("slk:///arch/mh0010/m300408/showcase/dataset.zarr", engine="zarr")

@observingClouds (Owner) Dec 5, 2022

I cannot completely test my example because I get the error PySlkException: pyslk.slk_search: search command is disabled

Traceback
In [2]: xr.open_mfdataset("slk:///arch/mh0010/m300408/showcase/dataset.zarr" ,engine="zarr", consolidated=False)
/home/m/m300408/.conda/envs/slkspec_dev/lib/python3.10/site-packages/fsspec/spec.py:76: UserWarning: The slk_cache variable nor the SLK_CACHE environmentvariable wasn't set. Falling back to default /scratch/m/m300408
  obj = super().__call__(*args, **kwargs)
/scratch/m/m300408/arch/mh0010/m300408/showcase/dataset.zarr/.zgroup
slk search '{"$and":[{"path":{"$gte":"/arch/mh0010/m300408/showcase/dataset.zarr","$max_depth":1}},{"resources.name":{"$regex":".zgroup"}}]}'
---------------------------------------------------------------------------
PySlkException                            Traceback (most recent call last)
Input In [2], in ()
----> 1 xr.open_mfdataset("slk:///arch/mh0010/m300408/showcase/dataset.zarr" ,engine="zarr", consolidated=False)

File ~/.conda/envs/slkspec_dev/lib/python3.10/site-packages/xarray/backends/api.py:996, in open_mfdataset(paths, chunks, concat_dim, compat, preprocess, engine, data_vars, coords, combine, parallel, join, attrs_file, combine_attrs, **kwargs)
993 open_ = open_dataset
994 getattr_ = getattr
--> 996 datasets = [open_(p, **open_kwargs) for p in paths]
997 closers = [getattr_(ds, "_close") for ds in datasets]
998 if preprocess is not None:

File ~/.conda/envs/slkspec_dev/lib/python3.10/site-packages/xarray/backends/api.py:996, in <listcomp>(.0)
993 open_ = open_dataset
994 getattr_ = getattr
--> 996 datasets = [open_(p, **open_kwargs) for p in paths]
997 closers = [getattr_(ds, "_close") for ds in datasets]
998 if preprocess is not None:

File ~/.conda/envs/slkspec_dev/lib/python3.10/site-packages/xarray/backends/api.py:539, in open_dataset(filename_or_obj, engine, chunks, cache, decode_cf, mask_and_scale, decode_times, decode_timedelta, use_cftime, concat_characters, decode_coords, drop_variables, inline_array, backend_kwargs, **kwargs)
527 decoders = _resolve_decoders_kwargs(
528 decode_cf,
529 open_backend_dataset_parameters=backend.open_dataset_parameters,
(...)
535 decode_coords=decode_coords,
536 )
538 overwrite_encoded_chunks = kwargs.pop("overwrite_encoded_chunks", None)
--> 539 backend_ds = backend.open_dataset(
540 filename_or_obj,
541 drop_variables=drop_variables,
542 **decoders,
543 **kwargs,
544 )
545 ds = _dataset_from_backend_dataset(
546 backend_ds,
547 filename_or_obj,
(...)
555 **kwargs,
556 )
557 return ds

File ~/.conda/envs/slkspec_dev/lib/python3.10/site-packages/xarray/backends/zarr.py:848, in ZarrBackendEntrypoint.open_dataset(self, filename_or_obj, mask_and_scale, decode_times, concat_characters, decode_coords, drop_variables, use_cftime, decode_timedelta, group, mode, synchronizer, consolidated, chunk_store, storage_options, stacklevel)
828 def open_dataset(
829 self,
830 filename_or_obj,
(...)
844 stacklevel=3,
845 ):
847 filename_or_obj = _normalize_path(filename_or_obj)
--> 848 store = ZarrStore.open_group(
849 filename_or_obj,
850 group=group,
851 mode=mode,
852 synchronizer=synchronizer,
853 consolidated=consolidated,
854 consolidate_on_close=False,
855 chunk_store=chunk_store,
856 storage_options=storage_options,
857 stacklevel=stacklevel + 1,
858 )
860 store_entrypoint = StoreBackendEntrypoint()
861 with close_on_error(store):

File ~/.conda/envs/slkspec_dev/lib/python3.10/site-packages/xarray/backends/zarr.py:402, in ZarrStore.open_group(cls, store, mode, synchronizer, group, consolidated, consolidate_on_close, chunk_store, storage_options, append_dim, write_region, safe_chunks, stacklevel)
400 zarr_group = zarr.open_consolidated(store, **open_kwargs)
401 else:
--> 402 zarr_group = zarr.open_group(store, **open_kwargs)
403 return cls(
404 zarr_group,
405 mode,
(...)
409 safe_chunks,
410 )

File ~/.conda/envs/slkspec_dev/lib/python3.10/site-packages/zarr/hierarchy.py:1371, in open_group(store, mode, cache_attrs, synchronizer, path, chunk_store, storage_options, zarr_version, meta_array)
1368 # ensure store is initialized
1370 if mode in ['r', 'r+']:
-> 1371 if not contains_group(store, path=path):
1372 if contains_array(store, path=path):
1373 raise ContainsArrayError(path)

File ~/.conda/envs/slkspec_dev/lib/python3.10/site-packages/zarr/storage.py:117, in contains_group(store, path, explicit_only)
115 store_version = getattr(store, '_store_version', 2)
116 if store_version == 2 or explicit_only:
--> 117 return key in store
118 else:
119 if key in store:

File ~/.conda/envs/slkspec_dev/lib/python3.10/_collections_abc.py:825, in Mapping.__contains__(self, key)
823 def __contains__(self, key):
824 try:
--> 825 self[key]
826 except KeyError:
827 return False

File ~/.conda/envs/slkspec_dev/lib/python3.10/site-packages/zarr/storage.py:724, in KVStore.__getitem__(self, key)
723 def __getitem__(self, key):
--> 724 return self._mutable_mapping[key]

File ~/.conda/envs/slkspec_dev/lib/python3.10/site-packages/fsspec/mapping.py:143, in FSMap.__getitem__(self, key, default)
141 k = self._key_to_str(key)
142 try:
--> 143 result = self.fs.cat(k)
144 except self.missing_exceptions:
145 if default is not None:

File ~/.conda/envs/slkspec_dev/lib/python3.10/site-packages/fsspec/spec.py:823, in AbstractFileSystem.cat(self, path, recursive, on_error, **kwargs)
821 return out
822 else:
--> 823 return self.cat_file(paths[0], **kwargs)

File ~/.conda/envs/slkspec_dev/lib/python3.10/site-packages/fsspec/spec.py:732, in AbstractFileSystem.cat_file(self, path, start, end, **kwargs)
730 end = f.size + end
731 return f.read(end - f.tell())
--> 732 return f.read()

File ~/GitProjects/slkspec_dev/slkspec/core.py:207, in SLKFile.read(self, size)
200 """The the content of a file-stream.
201
202 size: int, default: -1
203 read at most size characters from the stream, -1 means everything
204 is read.
205 """
206 if self._file_obj is None:
--> 207 self._cache_files()
208 return self._file_obj.read(size)

File ~/GitProjects/slkspec_dev/slkspec/core.py:162, in SLKFile._cache_files(self)
160 items.append(self._file_queue.get())
161 self._file_queue.task_done()
--> 162 self._retrieve_items(items)
163 _ = self._file_queue.get()
164 self._file_queue.task_done()

File ~/GitProjects/slkspec_dev/slkspec/core.py:143, in SLKFile._retrieve_items(self, retrieve_files)
141 output_dir.mkdir(parents=True, exist_ok=True, mode=self.file_permissions)
142 logger.debug("Creating slk query for %i files", len(inp_files))
--> 143 search_str = pyslk.slk_search(pyslk.slk_gen_file_query(inp_files))
144 search_id_re = re.search("Search ID: [0-9]*", search_str)
145 if search_id_re is None:

File ~/GitProjects/pyslk/pyslk/pyslk.py:193, in slk_search(search_string, group, user, name, partial)
185 raise ValueError(
186 'slk.{mod}.{fun}: argument "search_string" needs to be "str" but got "{type}".'.format(
187 mod=modName, fun=funName, type=type(search_string).name
188 )
189 )
191 print(slk_call)
--> 193 return run_slk(slk_call, inspect.stack()[0][3])

File ~/GitProjects/pyslk/pyslk/utils.py:75, in run_slk(slk_call, fun, env)
71 raise PySlkException(
72 f"pyslk.{fun}: {str(e).split(':')[-1][2:-1]}: " "command not found"
73 )
74 if output.returncode != 0:
---> 75 raise PySlkException(
76 f"pyslk.{fun}: "
77 + output.stdout.decode("utf-8")
78 + " "
79 + output.stderr.decode("utf-8")
80 )
82 # get return value and ...
83 tmp_return = output.stdout.decode("utf-8")

PySlkException: pyslk.slk_search: search command is disabled

Collaborator

@observingClouds It's fixed now, sorry for that. Sometimes wrong config files are deployed on the levante nodes.

Owner

Thanks @neumannd!
@antarcticrainforest, this seems to be a good test now:
The metadata and the coordinates are loaded via separate search commands as they are in different directories:

xr.open_mfdataset("slk:///arch/mh0010/m300408/showcase/dataset.zarr", engine="zarr")
# /scratch/m/m300408/arch/mh0010/m300408/showcase/dataset.zarr/.zmetadata
# /scratch/m/m300408/arch/mh0010/m300408/showcase/dataset.zarr/time/0
# slk search '{"$and":[{"path":{"$gte":"/arch/mh0010/m300408/showcase/dataset.zarr/time","$max_depth":1}},{"resources.name":{"$regex":"0"}}]}'
# /scratch/m/m300408/arch/mh0010/m300408/showcase/dataset.zarr/time/0
# /scratch/m/m300408/arch/mh0010/m300408/showcase/dataset.zarr/lat/0
# slk search '{"$and":[{"path":{"$gte":"/arch/mh0010/m300408/showcase/dataset.zarr/lat","$max_depth":1}},{"resources.name":{"$regex":"0"}}]}'
# /scratch/m/m300408/arch/mh0010/m300408/showcase/dataset.zarr/lon/0
# slk search '{"$and":[{"path":{"$gte":"/arch/mh0010/m300408/showcase/dataset.zarr/lon","$max_depth":1}},{"resources.name":{"$regex":"0"}}]}'
# /scratch/m/m300408/arch/mh0010/m300408/showcase/dataset.zarr/time/0

and the air temperature is loaded with two commands, although it resides in a single directory:

ds.air.mean().compute()
# /scratch/m/m300408/arch/mh0010/m300408/showcase/dataset.zarr/air/0.0.0
# /scratch/m/m300408/arch/mh0010/m300408/showcase/dataset.zarr/air/0.0.1
# /scratch/m/m300408/arch/mh0010/m300408/showcase/dataset.zarr/air/0.1.0
# /scratch/m/m300408/arch/mh0010/m300408/showcase/dataset.zarr/air/0.1.1
# /scratch/m/m300408/arch/mh0010/m300408/showcase/dataset.zarr/air/1.0.0
# /scratch/m/m300408/arch/mh0010/m300408/showcase/dataset.zarr/air/1.0.1
# /scratch/m/m300408/arch/mh0010/m300408/showcase/dataset.zarr/air/1.1.0
# /scratch/m/m300408/arch/mh0010/m300408/showcase/dataset.zarr/air/1.1.1
# slk search '{"$and":[{"path":{"$gte":"/arch/mh0010/m300408/showcase/dataset.zarr/air","$max_depth":1}},{"resources.name":{"$regex":"0.0.0"}}]}'
# slk search '{"$and":[{"path":{"$gte":"/arch/mh0010/m300408/showcase/dataset.zarr/air","$max_depth":1}},{"resources.name":{"$regex":"0.0.1|0.1.0|0.1.1|1.0.0|1.0.1|1.1.0|1.1.1"}}]}'

Owner

moved to #10


## Current limitations (among others)
- tape retrievals are currently not combined, leading to single/inefficient requests
@observingClouds (Owner) Nov 29, 2022

still true, so far (see one of my other comments)

Contributor Author

Actually, the _retrieve_items method only gets called once, at which point it has information on all items that need to be downloaded. So the retrievals are combined - somewhat. There are limitations though.

Collaborator

We could do a pyslk.slk_recall(SEARCH_ID_ALL_FILES) first, and then the individual retrievals.
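
A rough sketch of that ordering (pyslk.slk_recall is taken from the comment above; the surrounding structure is illustrative only):

import pyslk

def recall_then_retrieve(all_files_search_id: int, per_dir_requests) -> None:
    # One recall first, so everything is staged from tape in a single request ...
    pyslk.slk_recall(all_files_search_id)
    # ... then the individual per-directory retrievals copy the staged files.
    for search_id, output_dir in per_dir_requests:
        pyslk.slk_retrieve(search_id, str(output_dir))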

Comment on lines +275 to +276
"The slk_cache variable nor the SLK_CACHE environment"
"variable wasn't set. Falling back to default "
@observingClouds (Owner) Dec 5, 2022

Suggested change
"The slk_cache variable nor the SLK_CACHE environment"
"variable wasn't set. Falling back to default "
"Neither the slk_cache argument nor the SLK_CACHE environment "
"variable is set. Falling back to default "

@wachsylon commented Dec 7, 2022

Note: this issue has been moved to #5. Please discuss there.
Just adding things from the chat:

Would be nice if mappers could work:

import fsspec
import os
if "slk" not in os.environ["PATH"]:
    os.environ["PATH"]=os.environ["PATH"]+":/sw/spack-levante/slk-3.3.67-jrygfs/bin/:/sw/spack-levante/openjdk-17.0.0_35-k5o6dr/bin"
SLK_CACHE="/scratch/k/k204210/INTAKE"
%env SLK_CACHE={SLK_CACHE}

a=fsspec.get_mapper("slk:///arch/ik1017/cmip6/CMIP6/")
b=fsspec.get_mapper(SLK_CACHE)
target_name="AerChemMIP_002.tar"
b[target_name]=a[target_name]

TypeError                                 Traceback (most recent call last)
Cell In [8], line 2
      1 target_name="AerChemMIP_002.tar"
----> 2 b[target_name]=a[target_name]

File ~/.conda/envs/slkspecenv/lib/python3.10/site-packages/fsspec/mapping.py:163, in FSMap.__setitem__(self, key, value)
    161 key = self._key_to_str(key)
    162 self.fs.mkdirs(self.fs._parent(key), exist_ok=True)
--> 163 self.fs.pipe_file(key, maybe_convert(value))

File ~/.conda/envs/slkspecenv/lib/python3.10/site-packages/fsspec/spec.py:737, in AbstractFileSystem.pipe_file(self, path, value, **kwargs)
    735 """Set the bytes of given file"""
    736 with self.open(path, "wb", **kwargs) as f:
--> 737     f.write(value)

File ~/.conda/envs/slkspecenv/lib/python3.10/site-packages/fsspec/implementations/local.py:340, in LocalFileOpener.write(self, *args, **kwargs)
    339 def write(self, *args, **kwargs):
--> 340     return self.f.write(*args, **kwargs)

TypeError: a bytes-like object is required, not '_io.BufferedReader'
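
An untested workaround sketch, assuming (as the traceback suggests) that a[target_name] currently returns an open file object rather than bytes:

value = a[target_name]
# Read the bytes explicitly before piping them into the target mapper.
b[target_name] = value.read() if hasattr(value, "read") else value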

@wachsylon

Why did you choose to use 3 '/' after slk:? Isn't it normal to have 2?

@wachsylon commented Dec 7, 2022

Note: this issue has been moved to #6. Please discuss there.
On levante:

import intake
import json
import pandas as pd
with open("/pool/data/Catalogs/dkrz_cmip6_disk.json") as f:
    catconfig=json.load(f)
testcat=intake.open_esm_datastore(esmcol_obj=pd.read_csv("/home/k/k204210/intake-esm/catalogs/dkrz_cmip6_archive.csv.gz"),
                                  esmcol_data=catconfig)
subset=testcat.search(source_id="MPI-ESM1-2-LR",
               experiment_id="ssp370",
               variable_id="tas",
              table_id="Amon")
import os
if "slk" not in os.environ["PATH"]:
    os.environ["PATH"]=os.environ["PATH"]+":/sw/spack-levante/slk-3.3.67-jrygfs/bin/:/sw/spack-levante/openjdk-17.0.0_35-k5o6dr/bin"
SLK_CACHE="/scratch/k/k204210/INTAKE"
%env SLK_CACHE={SLK_CACHE}
subset.to_dataset_dict()

calls 33 slk retrieves, which in turn call 33 /sw/spack-levante/slk-3.3.67-jrygfs/lib/slk-cli-tools-3.3.67.jar retrieve processes (66 processes in total), for 10 unique tars from the same directory on HSM. That can't be right.

@observingClouds (Owner) commented Dec 7, 2022

Why did you choose to use 3 '/' after slk:? Isn't it normal to have 2?

@wachsylon, see e.g. here

@observingClouds observingClouds merged commit 9f1228c into observingClouds:main Dec 7, 2022
@observingClouds (Owner) commented Dec 7, 2022

I merged this PR now because it has the features from the initial implementation, and the discussion is becoming more and more complex with various feature requests. I couldn't edit this branch, so I will fix the tests in a separate PR.
I'll do my best to create individual issues on topics/ideas/improvements raised in this PR that have not been addressed yet.

@antarcticrainforest (Contributor Author)

I merged this PR now because it has the features from the initial implementation, and the discussion is becoming more and more complex with various feature requests. I couldn't edit this branch, so I will fix the tests in a separate PR. I'll do my best to create individual issues on topics/ideas/improvements raised in this PR that have not been addressed yet.

Thanks Hauke,

I was about to suggest the same thing because I won't have much time to look at this this month (as you have probably noticed).

I'll be happy to assist from January onwards.
