add copy to move_and_delete #233

Merged (18 commits, Aug 8, 2023)
HISTORY.rst (+5 −0)
@@ -17,6 +17,11 @@ New features and enhancements
* File re-structuration from catalogs with ``xscen.catutils.build_path``. (:pull:`205`).
* New scripting functions `save_and_update` and `move_and_delete`. (:pull:`214`).
* Spatial dimensions can be generalized as X/Y when rechunking and will be mapped to rlon/rlat or lon/lat accordingly. (:pull:`221`).
* New argument `var_as_string` for `get_cat_attrs` to return variable names as strings. (:pull:`233`).
* New argument `copy` for `move_and_delete`. (:pull:`233`).
* New argument `restrict_year` for `compute_indicators`. (:pull:`233`).
* Add more comments in the template. (:pull:`233`, :issue:`232`).


Breaking changes
^^^^^^^^^^^^^^^^
templates/1-basic_workflow_with_config/config1.yml (+38 −6)
@@ -1,14 +1,43 @@
# example of a config for the workflow
# Example of a config file for the workflow
Collaborator:
@juliettelavoie
I added quite some detail here.
My suggestion would be to comment the first task in similar detail. After that, subsequent tasks all follow a similar scheme of iteration loops anyway.


# The contents of this file control the workflow. The highest-level entries are the tasks to accomplish.
# Below the task level are the elements required for the task. For example, the task 'extract' iterates
# over 'reconstruction' data and 'simulation' data, the two sections specifying what (data, variables, time periods)
# to extract and how (region, attributes, subsequent saving) to do that. For the 'regrid' task, among others,
# the 'input' and 'output' data are specified along with the methodology for the regridding process.

# This workflow is made to know exactly where to restart if the code is stopped.
# It looks if the result of a task exists in the project catalog before executing the task.
# Note that some information is 'outsourced' to a separate 'paths1_example.yml' file. The contents of the two are
# merged in the CONFIG dictionary when the configuration is loaded. The configuration also contains the names
# of the 'properties1.yml' and 'indicators1.yml' files to draw their information for the respective tasks.

# The workflow is made to be able to pick up its processing in any case where the code is stopped.
# To achieve this, before executing a task it checks whether the task's result exists in the project catalog.
# When a task is completed, the produced data files are added to the project catalog.
# The workflow does NOT automatically remove intermediate files. You might run out of space.

Contributor Author (@juliettelavoie, Aug 4, 2023):
@vindelico Does this text answer the questions you had about the config ?
Is there more that should be included ?

As I said in #232, adding every possible argument to functions in sections that are automatically passed seems overkill to me. You don't normally pass arguments with just their defaults...

# The input information for the workflow listed in this configuration file is used in two different ways:
#
# 1) Direct 'manual' access in the code of the workflow, with usage like a dictionary (e.g. CONFIG["regrid"]["inputs"])
# No specific format is prescribed here; it can be anything you want. It is good practice to put those entries
# under the associated module or task name, so they are easier to find in your workflow.
#
# 2) Arguments that will be automatically passed to xscen functions.
# For example the dictionary in CONFIG["regrid"]["regrid_dataset"] will be passed to xscen.regrid_dataset(...).
# For this to work, the CONFIG must follow a specific structure.
# module:
# function:
# argument_1: value_a
# argument_2: value_b
# This only works on functions that are decorated with @parse_config in the xscen code.
# In these entries, only arguments that differ from the function defaults need to be specified.
# In this example configuration file, those instances are marked by "# automatically passed to the function."
#
# You can learn more about how the config works here: https://xscen.readthedocs.io/en/latest/notebooks/6_config.html
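The automatic-passing mechanism described above can be sketched with a toy decorator. This is a hedged illustration only: the `parse_config` below, the CONFIG layout, and the merge rule are simplified stand-ins, not xscen's actual internals.

```python
# Toy sketch of config-driven argument injection (NOT xscen's real implementation).
import functools

CONFIG = {
    "regrid": {
        "regrid_dataset": {
            "regridder_kwargs": {"method": "bilinear", "extrap_method": "inverse_dist"}
        }
    }
}

def parse_config(module):
    """Decorator factory: fill missing kwargs from CONFIG[module][function_name]."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            defaults = CONFIG.get(module, {}).get(func.__name__, {})
            # Explicit call-site kwargs win over values coming from the config.
            return func(*args, **{**defaults, **kwargs})
        return wrapper
    return decorator

@parse_config("regrid")
def regrid_dataset(ds, regridder_kwargs=None):
    return regridder_kwargs

print(regrid_dataset("fake_ds"))
# {'method': 'bilinear', 'extrap_method': 'inverse_dist'}
```

Only arguments that differ from the function defaults need to appear in the config, which is exactly the behaviour the comment block above prescribes.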

# List of tasks to accomplish in the workflow
# Each task will iterate over all simulations before moving on to the next task.
# It might be useful to start by running it once over all tasks on a single simulation
# (by modifying extract:simulation:search_data_catalogs:other_search_criteria:)
# to make sure all tasks work properly before launching it for all simulations.
tasks:
- extract # Extract the simulation and reference dataset with the right domain and period.
@@ -22,9 +51,12 @@ tasks:
- delta # Compute the deltas of the climatological means.
- ensembles # Compute the ensemble statistics on indicators, climatology and deltas.

# Task Arguments
extract:
# The following ('reconstruction') is NOT automatically passed to the function because
# the format is module:other_word:function (other_word=reconstruction).
# It is dealt with manually in workflow1.py, building a loop over all entries below 'extract:'.
reconstruction:
dask:
n_workers: 2
@@ -126,7 +158,7 @@ regrid:
output:
type: reconstruction
processing_level: extracted
regrid_dataset: # this will be automatically passed to the function.
regrid_dataset: # automatically passed to the function (module name is regrid)
regridder_kwargs:
method: bilinear
extrap_method: inverse_dist
templates/1-basic_workflow_with_config/workflow1.py (+4 −0)
@@ -11,6 +11,10 @@
from xscen.config import CONFIG

# Load configuration
# paths1.yml is used to add private information to your workflow, such as file paths, without the risk of pushing it to a public GitHub repo.
# For this to work as intended, 'paths1.yml' should be included in your .gitignore.
# config1.yml (or any number of such files) can then contain the rest of the configuration.
# All configuration files are merged into a single CONFIG instance when you call xs.load_config.
xs.load_config(
"paths1.yml", "config1.yml", verbose=(__name__ == "__main__"), reset=True
)
tests/test_indicators.py (+24 −0)
@@ -127,3 +127,27 @@ def test_multiple_outputs(self, periods):
"rain_season_length",
]
)

@pytest.mark.parametrize("restrict_years", [True, False])
def test_as_jul(self, restrict_years):
indicator = xclim.core.indicator.Indicator.from_dict(
data={"base": "freezing_degree_days", "parameters": {"freq": "AS-JUL"}},
identifier="degree_days_below_0_annual_start_july",
module="tests",
)
ind_dict = xs.compute_indicators(
self.ds,
indicators=[("degree_days_below_0_annual_start_july", indicator)],
restrict_years=restrict_years,
)

assert "AS-JUL" in ind_dict["AS-JUL"].attrs["cat:xrfreq"]
out = ind_dict["AS-JUL"]
if restrict_years:
assert len(out.time) == 3 # same length as input ds
assert out.time[0].dt.strftime("%Y-%m-%d").item() == "2001-07-01"
assert out.time[-1].dt.strftime("%Y-%m-%d").item() == "2003-07-01"
else:
assert len(out.time) == 4
assert out.time[0].dt.strftime("%Y-%m-%d").item() == "2000-07-01"
assert out.time[-1].dt.strftime("%Y-%m-%d").item() == "2003-07-01"
tests/test_scripting.py (+15 −1)
@@ -79,6 +79,20 @@ def test_move_and_delete(self):
)
cat.update()

# f2 should be moved to dir2 and f1 should be deleted (not in row 0 anymore)
# f2 should be moved to dir2 and dir1 should be deleted (not in row 0 anymore)
assert cat.df.path[0] == root + "/dir2/test_r1i1p1f2.zarr"
assert len(cat.df) == 1 # only one file left

sc.move_and_delete(
moving=[
[root + "/dir2/test_r1i1p1f2.zarr", root + "/dir1/test_r1i1p1f2.zarr"]
],
pcat=cat,
copy=True,
)
cat.update()

# f2 should be copied to dir1 and f1 should still exist
assert cat.df.path[0] == root + "/dir2/test_r1i1p1f2.zarr"
assert cat.df.path[1] == root + "/dir1/test_r1i1p1f2.zarr"
assert len(cat.df) == 2  # two entries: the original and the copy
tests/test_utils.py (+38 −0)
@@ -4,6 +4,7 @@
import numpy as np
import pandas as pd
import pytest
from xclim.testing.helpers import test_timeseries as timeseries

import xscen as xs

@@ -40,3 +41,40 @@ def test_normal(self, date, end_of_period, dtype, exp):
assert pd.isna(out)
else:
assert out == exp


class TestScripting:
ds = timeseries(
np.tile(np.arange(1, 2), 50),
variable="tas",
start="2000-01-01",
freq="AS-JAN",
as_dataset=True,
)
ds.attrs = {
"cat:type": "simulation",
"cat:processing_level": "raw",
"cat:variable": ("tas",),
"dog:source": "CanESM5",
}

@pytest.mark.parametrize(
"prefix, var_as_str", [["cat:", False], ["cat:", True], ["dog:", True]]
)
def test_get_cat_attrs(self, prefix, var_as_str):
out = xs.utils.get_cat_attrs(self.ds, prefix=prefix, var_as_str=var_as_str)

if var_as_str and prefix == "cat:":
assert out == {
"type": "simulation",
"processing_level": "raw",
"variable": "tas",
}
elif not var_as_str and prefix == "cat:":
assert out == {
"type": "simulation",
"processing_level": "raw",
"variable": ("tas",),
}
elif prefix == "dog:":
assert out == {"source": "CanESM5"}
xscen/indicators.py (+11 −2)
@@ -14,6 +14,7 @@

from xscen.config import parse_config

from .catutils import parse_from_ds
from .utils import CV, standardize_periods

logger = logging.getLogger(__name__)
@@ -65,6 +66,7 @@ def compute_indicators(
],
*,
periods: list = None,
restrict_years: bool = True,
to_level: str = "indicators",
) -> Union[dict, xr.Dataset]:
"""Calculate variables and indicators based on a YAML call to xclim.
@@ -85,6 +87,11 @@
periods : list
Either [start, end] or list of [start, end] of continuous periods over which to compute the indicators. This is needed when the time axis of ds contains some jumps in time.
If None, the dataset will be considered continuous.
restrict_years : bool
If True, cut the time axis to be within the same years as the input.
This is mostly useful for frequencies that do not start in January, such as QS-DEC.
In that instance, `xclim` would start on previous_year-12-01 (DJF), with a NaN. `restrict_years` will cut that first timestep.
This should have no effect on YS and MS indicators.
to_level : str, optional
The processing level to assign to the output.
If None, the processing level of the inputs is preserved.
@@ -193,7 +200,7 @@ def _infer_freq_from_meta(ind):
if (out[c].attrs != ds[c].attrs) and (out[c].sizes == ds[c].sizes):
out[c].attrs = ds[c].attrs

if "time" in out.dims:
if restrict_years and "time" in out.dims:
Collaborator:
Could you add a test? What happens with something like AS-JUL?

Contributor Author:

I added a test with AS-JUL to show.
Basically, with xclim, you get a NaN in the first and the last position.
The first timestamp is in the year before the start of the input.
The last timestamp is in the same year as the end of the input.
With restrict_years=False, we keep the time steps for the two NaNs.
With restrict_years=True, we cut the first NaN.
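The trimming described in this exchange can be mimicked with plain pandas. The values below are assumed for illustration (mirroring the AS-JUL test added in this PR); this is not xscen/xclim code.

```python
import numpy as np
import pandas as pd

# Hypothetical output of an AS-JUL indicator computed over inputs spanning
# 2001-2003: xclim labels the first (partial) period in the *previous* year,
# and both edge periods hold NaN.
out = pd.Series(
    [np.nan, 10.0, 12.0, np.nan],
    index=pd.to_datetime(["2000-07-01", "2001-07-01", "2002-07-01", "2003-07-01"]),
)

input_years = range(2001, 2004)  # years actually present in the input dataset

# restrict_years=True: drop period labels outside the input's years. This
# removes the leading 2000-07-01 NaN but keeps the trailing 2003-07-01 one.
trimmed = out[out.index.year.isin(input_years)]
print(trimmed.index[0].strftime("%Y-%m-%d"))  # 2001-07-01
```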

# cut the time axis to be within the same years as the input
# for QS-DEC, xclim starts on DJF with time previous_year-12-01 with a nan as values. We want to cut this.
# this should have no effect on YS and MS indicators
@@ -209,7 +216,9 @@
out_dict[key] = out
# TODO: Double-check History, units, attrs, add missing variables (grid_mapping), etc.
out_dict[key].attrs = ds.attrs
out_dict[key].attrs.pop("cat:variable", None)
out_dict[key].attrs["cat:variable"] = parse_from_ds(
out_dict[key], ["variable"]
)["variable"]
out_dict[key].attrs["cat:xrfreq"] = freq
out_dict[key].attrs["cat:frequency"] = CV.xrfreq_to_frequency(freq, None)
if to_level is not None:
xscen/scripting.py (+23 −4)
@@ -9,6 +9,7 @@
import time
import warnings
from contextlib import contextmanager
from distutils.dir_util import copy_tree
from email.message import EmailMessage
from io import BytesIO
from pathlib import Path
@@ -395,7 +396,9 @@ def save_and_update(

# get path
if path is not None:
path = str(path).format(**get_cat_attrs(ds)) # fill path with attrs
path = str(path).format(
**get_cat_attrs(ds, var_as_str=True)
) # fill path with attrs
else: # if path is not given build it
build_path_kwargs.setdefault("format", file_format)
from .catutils import build_path
@@ -420,7 +423,7 @@
logger.info(f"File {path} was saved successfully and the catalog was updated.")
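The reason for passing `var_as_str=True` when formatting the path can be seen with a plain dict (hypothetical facet values, standing in for what `get_cat_attrs` returns):

```python
# With the raw tuple facet, str.format renders the tuple's repr into the path.
facets_tuple = {"processing_level": "raw", "variable": ("tas",)}
facets_str = {"processing_level": "raw", "variable": "tas"}

template = "/project/{processing_level}/{variable}.zarr"
print(template.format(**facets_tuple))  # /project/raw/('tas',).zarr  (broken path)
print(template.format(**facets_str))    # /project/raw/tas.zarr
```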


def move_and_delete(moving, pcat, deleting):
def move_and_delete(moving, pcat, deleting=None, copy=False):
"""
First, move files, then update the catalog with new locations. Finally, delete directories.

@@ -435,14 +438,28 @@ def move_and_delete(moving, pcat, deleting=None, copy=False):
deleting: list
list of directories to be deleted including all contents and recreated empty.
E.g. the working directory of a workflow.
copy: bool
If True, copy directories instead of moving them.

"""
if isinstance(moving, list) and isinstance(moving[0], list):
for files in moving:
source, dest = files[0], files[1]
if Path(source).exists():
logger.info(f"Moving {source} to {dest}.")
sh.move(source, dest)
if copy:
logger.info(f"Copying {source} to {dest}.")
copied_files = copy_tree(source, dest)
for f in copied_files:
# copy_tree lists individual files, so detect zarr stores via their consolidated metadata
if f[-16:] == ".zarr/.zmetadata":
ds = xr.open_dataset(f[:-11])
pcat.update_from_ds(ds=ds, path=f[:-11])
if f[-3:] == ".nc":
ds = xr.open_dataset(f)
pcat.update_from_ds(ds=ds, path=f)
else:
logger.info(f"Moving {source} to {dest}.")
sh.move(source, dest)
if Path(dest).suffix in [".zarr", ".nc"]:
ds = xr.open_dataset(dest)
pcat.update_from_ds(ds=ds, path=dest)
@@ -458,5 +475,7 @@ def move_and_delete(moving, pcat, deleting):
logger.info(f"Deleting content inside {dir_to_delete}.")
sh.rmtree(dir_to_delete)
os.mkdir(dir_to_delete)
elif deleting is None:
pass
else:
raise ValueError("`deleting` should be a list.")
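The suffix arithmetic in the copy branch above (`f[-16:]`, `f[:-11]`) can be made explicit. A sketch of the same decision, assuming `copy_tree`'s behaviour of returning the individual copied file paths:

```python
def catalog_path(copied_file: str):
    """Return the path to register in the catalog, or None to skip the file."""
    # A copied zarr store shows up as many chunk files; detect it once via its
    # consolidated metadata file and trim back to the store directory.
    if copied_file.endswith(".zarr/.zmetadata"):
        return copied_file[: -len("/.zmetadata")]
    if copied_file.endswith(".nc"):
        return copied_file
    return None  # chunk files inside a zarr store, etc.

print(catalog_path("/data/dir1/test.zarr/.zmetadata"))  # /data/dir1/test.zarr
```

Note that `len("/.zmetadata") == 11` and `len(".zarr/.zmetadata") == 16`, matching the magic slices in the diff. Also worth noting: `distutils` (and with it `copy_tree`) is deprecated since Python 3.10; `shutil.copytree` is the usual replacement.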
xscen/utils.py (+18 −2)
@@ -374,7 +374,9 @@ def natural_sort(_list: list):
return sorted(_list, key=alphanum_key)


def get_cat_attrs(ds: Union[xr.Dataset, dict], prefix: str = "cat:") -> dict:
def get_cat_attrs(
ds: Union[xr.Dataset, dict], prefix: str = "cat:", var_as_str=False
) -> dict:
"""Return the catalog-specific attributes from a dataset or dictionary.

Parameters
@@ -383,6 +385,8 @@ def get_cat_attrs(ds: Union[xr.Dataset, dict], prefix: str = "cat:") -> dict:
Dataset to be parsed.
prefix: str
Prefix automatically generated by intake-esm. With xscen, this should be 'cat:'
var_as_str: bool
If True, variable will be returned as a string if there is only one.

Returns
-------
@@ -394,7 +398,19 @@ def get_cat_attrs(ds: Union[xr.Dataset, dict], prefix: str = "cat:") -> dict:
attrs = ds.attrs
else:
attrs = ds
return {k[len(prefix) :]: v for k, v in attrs.items() if k.startswith(f"{prefix}")}
facets = {
k[len(prefix) :]: v for k, v in attrs.items() if k.startswith(f"{prefix}")
}

# to be usable in a path
if (
var_as_str
and "variable" in facets
and not isinstance(facets["variable"], str)
and len(facets["variable"]) == 1
):
facets["variable"] = facets["variable"][0]
return facets
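The new branch can be exercised stand-alone. Below is a simplified replica of the diff, for illustration only (the real function is `xscen.utils.get_cat_attrs` and also accepts a Dataset):

```python
def get_cat_attrs(attrs: dict, prefix: str = "cat:", var_as_str: bool = False) -> dict:
    """Simplified replica: extract prefixed facets, optionally unwrapping 'variable'."""
    facets = {k[len(prefix):]: v for k, v in attrs.items() if k.startswith(prefix)}
    # To be usable in a path: a one-element tuple/list becomes a plain string.
    if (
        var_as_str
        and "variable" in facets
        and not isinstance(facets["variable"], str)
        and len(facets["variable"]) == 1
    ):
        facets["variable"] = facets["variable"][0]
    return facets

attrs = {"cat:variable": ("tas",), "cat:processing_level": "raw", "dog:source": "CanESM5"}
print(get_cat_attrs(attrs, var_as_str=True))
# {'variable': 'tas', 'processing_level': 'raw'}
```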


@parse_config