Skip to content

Commit

Permalink
[ENH]: Allow for matching on the full path
Browse files Browse the repository at this point in the history
This adds a parameter to `Storage.walk`, allowing users to specify
whether the `matches` applies just to the filename or to the full
path, including the filename.

The same keyword is added to the task and task input, to let this be
set in the dataset.yaml.

Recent changes to the data published in the ecmwf container require a
new feature in pctasks' create-chunks: the ability to match on a full
path rather than just a filename.

Data are being published under both of the following prefixes:

    - <date-prefix>/ifs/...
    - <date-prefix>/aifs/...

We list both sets of data, but our stactools package can't handle the
aifs data properly yet. So we want to filter it out.

Previously, `matches` could only target the filename being listed.
We need that to filter to specific products. We want to extend that
match expression to include the prefix, so that we can filter out the
`aifs` data.
  • Loading branch information
Tom Augspurger committed Jun 3, 2024
1 parent 4445a56 commit afcf0f8
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 3 deletions.
4 changes: 4 additions & 0 deletions pctasks/core/pctasks/core/storage/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ def walk(
matches: Optional[str] = None,
walk_limit: Optional[int] = None,
file_limit: Optional[int] = None,
match_full_path: bool = False,
) -> Generator[Tuple[str, List[str], List[str]], None, None]:
"""
Recursively walk storage.
Expand All @@ -87,6 +88,9 @@ def walk(
matches: Optional regex that path must match
walk_limit: Limit the number of times to yield
file_limit: Limit the number of files returned
match_full_path: bool, default False
Whether to match on just the file name segment of the path (the default) or
the entire path, including the base path.
Returns:
Generator of (path, files, folders) tuples. Similar to os.walk. Lists
Expand Down
13 changes: 11 additions & 2 deletions pctasks/core/pctasks/core/storage/blob.py
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,7 @@ def walk(
walk_limit: Optional[int] = None,
file_limit: Optional[int] = None,
max_concurrency: int = 32,
match_full_path: bool = False,
) -> Generator[Tuple[str, List[str], List[str]], None, None]:
# Ensure UTC set
since_date = map_opt(lambda d: d.replace(tzinfo=timezone.utc), since_date)
Expand Down Expand Up @@ -556,9 +557,17 @@ def _get_prefix_content(

for future in concurrent.futures.as_completed(futures):
full_prefix = futures[future]
folders, files = future.result()
folders, unfiltered_files = future.result()

files = [file for file in files if path_filter(file)]
files = []

for file in unfiltered_files:
if match_full_path:
match_on = "/".join([full_prefix.rstrip("/"), file])
else:
match_on = file
if path_filter(match_on):
files.append(file)

if file_limit and file_count + len(files) > file_limit:
files = files[: file_limit - file_count]
Expand Down
6 changes: 5 additions & 1 deletion pctasks/core/pctasks/core/storage/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ def walk(
matches: Optional[str] = None,
walk_limit: Optional[int] = None,
file_limit: Optional[int] = None,
match_full_path: bool = False,
) -> Generator[Tuple[str, List[str], List[str]], None, None]:
def _get_depth(path: str) -> int:
relpath = os.path.relpath(path, self.base_dir)
Expand All @@ -102,7 +103,10 @@ def _filter_file(root: str, p: str) -> bool:
if since_date:
if since_date > Datetime.fromtimestamp(os.path.getmtime(full_path)):
return False
return path_filter(p)
if match_full_path:
return path_filter(full_path)
else:
return path_filter(p)

for root, folders, files in os.walk(self.base_dir):

Expand Down
16 changes: 16 additions & 0 deletions pctasks/core/tests/storage/test_blob.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,3 +124,19 @@ def test_blob_download_timeout():
def test_maybe_rewrite_blob_storage_url(url, expected):
result = maybe_rewrite_blob_storage_url(url)
assert result == expected


def test_walk_match_full_path():
    """Walk blob storage with ``match_full_path=True`` and check the result.

    The ``matches`` pattern includes the ``a/`` prefix. The assertions expect
    the root and both folders to be yielded, with files reported under ``a``
    and none under ``b``.
    """
    assets_dir = HERE / ".." / "data-files" / "simple-assets"
    with temp_azurite_blob_storage(assets_dir) as storage:
        walked = storage.walk(matches="a/asset-.*.json", match_full_path=True)
        # Map each yielded root to its (folders, files) pair.
        result: Dict[str, Tuple[List[str], List[str]]] = {
            root: (folders, files) for root, folders, files in walked
        }

        assert set(result.keys()) == {".", "a", "b"}
        assert set(result["."][0]) == {"a", "b"}
        assert set(result["a"][1]) == {"asset-a-1.json", "asset-a-2.json"}
        assert set(result["b"][1]) == set()
assert set(result["b"][1]) == set()
12 changes: 12 additions & 0 deletions pctasks/core/tests/storage/test_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,15 @@ def test_fsspec_components():
storage = LocalStorage(HERE / ".." / "data-files" / "simple-assets")
assert storage.fsspec_storage_options == {}
assert storage.fsspec_path("foo/bar.csv") == "file://foo/bar.csv"


def test_walk_match_full_path():
    """Walk local storage with ``match_full_path=True`` and check the result.

    The ``matches`` pattern includes the ``a/`` prefix. The assertions expect
    the matching files to be listed under ``a`` and nothing under ``b``.
    """
    storage = LocalStorage(HERE / ".." / "data-files" / "simple-assets")
    subdirs = {
        root: files
        for root, _, files in storage.walk(
            min_depth=1, max_depth=1, matches="a/asset-.*.json", match_full_path=True
        )
    }
    # Directory listing order depends on the filesystem, so compare a sorted
    # copy rather than relying on the order the walk happened to yield.
    assert sorted(subdirs["a"]) == ["asset-a-1.json", "asset-a-2.json"]
    assert subdirs["b"] == []
1 change: 1 addition & 0 deletions pctasks/dataset/pctasks/dataset/chunks/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ def create_chunks(
file_limit=input.options.limit,
max_depth=input.options.max_depth,
min_depth=input.options.min_depth,
match_full_path=input.options.match_full_path,
):
if input.options.list_folders:
gen = folders
Expand Down
3 changes: 3 additions & 0 deletions pctasks/dataset/pctasks/dataset/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ class ChunkOptions(PCBaseModel):
matches: Optional[str] = None
"""Only include asset URIs that match this regex."""

match_full_path: bool = False
"""Whether to match on just the file name (the default) or the full path."""

limit: Optional[int] = None
"""Limit the number of URIs to process. """

Expand Down

0 comments on commit afcf0f8

Please sign in to comment.