diff --git a/CHANGES.md b/CHANGES.md
index a371140cf..93e3600ef 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -1,5 +1,6 @@
 ## Version 2.0.0.dev16 (in development)
 
+* Added new operations `compute_dataset()` and `compute_data_frame()` [#703](https://github.com/CCI-Tools/cate/issues/703).
 * Fixed division by zero error in RGB tile generation if data min and max were equal
 * Allow displaying and working with CCI Sea Level MSLAMPH data.
   Addresses [#531](https://github.com/CCI-Tools/cate/issues/531).
diff --git a/cate/ops/arithmetics.py b/cate/ops/arithmetics.py
index 50083189b..603abfcc7 100644
--- a/cate/ops/arithmetics.py
+++ b/cate/ops/arithmetics.py
@@ -28,14 +28,23 @@ Functions
 =========
 """
 
+import math
+from typing import Dict, Any, Mapping, Sequence, Tuple
+
+import geopandas
+import geopandas as gpd
+import numpy
 import numpy as np
+import pandas
 import pandas as pd
+import scipy
+import scipy as sp
+import xarray
 import xarray as xr
 from xarray import ufuncs as xu
 
 from cate.core.op import op, op_input, op_return
-from cate.core.types import DatasetLike, ValidationError
+from cate.core.types import DatasetLike, ValidationError, DataFrameLike
 from cate.util.monitor import Monitor
 from cate.util.safe import safe_exec
 
@@ -160,13 +169,74 @@ def diff(ds: xr.Dataset,
     return diff
 
 
+# noinspection PyIncorrectDocstring
+@op(tags=['arithmetic'], version='1.0')
+@op_input('df', data_type=DataFrameLike, nullable=True, default_value=None)
+@op_input('script', script_lang="python")
+@op_input('copy')
+@op_input('_ctx', context=True)
+def compute_data_frame(df: DataFrameLike.TYPE,
+                       script: str,
+                       copy: bool = False,
+                       _ctx: dict = None,
+                       monitor: Monitor = Monitor.NONE) -> pd.DataFrame:
+    """
+    Compute a new data frame from the given Python *script*.
+    The argument *script* must be valid Python code or a single expression comprising at least one
+    value assignment of the form {name} = {expr}. Multiple assignments can be done on multiple lines
+    or on a single line separated by semicolons.
+
+    {expr} may reference series of the given context data frame *df* or other resources and their
+    variables from the current workflow.
+    In the latter case, use the dot operator to select a variable from a resource.
+
+    Any new variable in *script* whose name does not begin with an underscore ('_') and
+    that has an appropriate data type will be added to the new data frame.
+
+    The following packages are available in the *script*:
+
+    * ``geopandas``, ``gpd``: The ``geopandas`` top-level package (http://geopandas.org/)
+    * ``math``: The standard Python ``math`` library (https://docs.python.org/3/library/math.html)
+    * ``numpy``, ``np``: The ``numpy`` top-level package (https://docs.scipy.org/doc/numpy/reference/)
+    * ``pandas``, ``pd``: The ``pandas`` top-level package (http://pandas.pydata.org/pandas-docs/stable/api.html)
+    * ``scipy``, ``sp``: The ``scipy`` top-level package (https://docs.scipy.org/doc/scipy/reference/)
+    * ``xarray``, ``xr``: The ``xarray`` top-level package (http://xarray.pydata.org/en/stable/api.html)
+
+    :param df: Optional context data frame. If provided, all series of this data frame are
+        directly accessible in the *script*.
+        If omitted, all series (variables) of other data frame (dataset) resources need to be prefixed
+        by their resource name.
+    :param script: Valid Python code comprising at least one assignment of the form {name} = {expr}.
+    :param copy: Whether to copy all series from *df*.
+    :param monitor: An optional progress monitor.
+    :return: A new data frame object.
+    """
+    series = _exec_script(script, (pd.Series, np.ndarray, float, int), _ctx, df, monitor)
+
+    if df is not None and copy:
+        new_df = df.copy()
+        for name, data in series.items():
+            new_df[name] = data
+    else:
+        max_size = 0
+        for data in series.values():
+            try:
+                size = len(data)
+            except TypeError:
+                size = 1
+            max_size = max(max_size, size)
+        index = pd.Int64Index(data=np.arange(max_size), name='id')
+        new_df = pd.DataFrame(data=series, index=index)
+
+    return new_df
+
+
 # noinspection PyIncorrectDocstring
 @op(tags=['arithmetic'], version='1.0')
 @op_input('ds', data_type=DatasetLike, nullable=True, default_value=None)
 @op_input('script', script_lang="python")
 @op_input('copy')
 @op_input('_ctx', context=True)
-# @op_return(add_history=True)
 def compute_dataset(ds: DatasetLike.TYPE,
                     script: str,
                     copy: bool = False,
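To make the new operation concrete for reviewers, here is a minimal usage sketch of `compute_data_frame()` as added above. It mirrors the data used in `ComputeDataFrameTest` further down; the concrete values are illustrative only:

```python
import numpy as np
import pandas as pd

from cate.ops import arithmetics

df = pd.DataFrame(dict(s1=10. * np.linspace(0, 1, 11),
                       s2=-2. * np.linspace(0, 1, 11)))

# Script names starting with '_' stay local; 'a1' becomes a new column.
# With the default copy=False, the input columns s1/s2 are not carried over.
result = arithmetics.compute_data_frame(df=df,
                                        script="_tmp = 3 * s2\n"
                                               "a1 = 2 * s1 + _tmp\n")

assert list(result.columns) == ['a1']
assert len(result) == 11
```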
@@ -175,65 +245,91 @@ def compute_dataset(ds: DatasetLike.TYPE,
     """
     Compute a new dataset from the given Python *script*.
     The argument *script* must be valid Python code or a single expression comprising at least one
-    value assignment of the form <name> = <expr>. Multiple assignments can be done on multiple lines
+    value assignment of the form {name} = {expr}. Multiple assignments can be done on multiple lines
     or on a single line separated by semicolons.
 
-    <expr> may reference variables in the given context dataset *ds* or other resources and their variables
+    {expr} may reference variables in the given context dataset *ds* or other resources and their variables
     from the current workflow.
     In the latter case, use the dot operator to select a variable from a dataset resource.
 
-    Every new variable in *script* of type data array will be added to the new dataset.
+    Any new variable in *script* whose name does not begin with an underscore ('_') and
+    that has an appropriate data type will be added to the new dataset.
 
-    The following packages are available in the code:
+    The following packages are available in the *script*:
 
-    * ``np``: The ``numpy`` top-level package (https://docs.scipy.org/doc/numpy/reference/)
-    * ``pd``: The ``pandas`` top-level package (http://pandas.pydata.org/pandas-docs/stable/api.html)
-    * ``xr``: The ``xarray`` top-level package (http://xarray.pydata.org/en/stable/api.html)
-    * ``xu``: The ``xarray.ufuncs`` package (http://xarray.pydata.org/en/stable/api.html#universal-functions)
+    * ``geopandas``, ``gpd``: The ``geopandas`` top-level package (http://geopandas.org/)
+    * ``math``: The standard Python ``math`` library (https://docs.python.org/3/library/math.html)
+    * ``numpy``, ``np``: The ``numpy`` top-level package (https://docs.scipy.org/doc/numpy/reference/)
+    * ``pandas``, ``pd``: The ``pandas`` top-level package (http://pandas.pydata.org/pandas-docs/stable/api.html)
+    * ``scipy``, ``sp``: The ``scipy`` top-level package (https://docs.scipy.org/doc/scipy/reference/)
+    * ``xarray``, ``xr``: The ``xarray`` top-level package (http://xarray.pydata.org/en/stable/api.html)
 
-    Note, in contrast to the ``np`` package, all the math functions defined in ``xu`` will preserve variable attributes.
-
-    :param ds: Optional context dataset. All variables of this dataset are directly accessible in the *script*.
-        If omitted, all variables need to be prefixed by their dataset resource names.
-    :param script: Valid Python expression comprising at least one assignment of the form <name> = <expr>.
+    :param ds: Optional context dataset. If provided, all variables of this dataset are
+        directly accessible in the *script*.
+        If omitted, all variables (series) of other dataset (data frame) resources need to be prefixed
+        by their resource name.
+    :param script: Valid Python code comprising at least one assignment of the form {name} = {expr}.
     :param copy: Whether to copy all variables from *ds*.
     :param monitor: An optional progress monitor.
-    :return: A new dataset.
+    :return: A new dataset object.
     """
+    data_vars = _exec_script(script, (xr.DataArray, np.ndarray, float, int), _ctx, ds, monitor)
 
-    if _ctx is not None and 'value_cache' in _ctx:
-        orig_namespace = dict(_ctx['value_cache'])
+    if ds is not None and copy:
+        new_ds = ds.copy()
+        for name, data in data_vars.items():
+            new_ds[name] = data
     else:
-        orig_namespace = dict()
+        new_ds = xr.Dataset(data_vars=data_vars)
+
+    return new_ds
 
-    if ds is not None:
-        orig_namespace.update(ds.data_vars)
 
-    orig_namespace['np'] = np
-    orig_namespace['pd'] = pd
-    orig_namespace['xr'] = xr
-    orig_namespace['xu'] = xu
+def _exec_script(script: str,
+                 element_types: Tuple[type, ...],
+                 operation_context: Mapping[str, Any] = None,
+                 context_object: Mapping[str, Any] = None,
+                 monitor: Monitor = Monitor.NONE) -> Dict[str, Any]:
+    """
+    Helper for compute_dataset() and compute_data_frame().
+    """
+    if not script:
+        raise ValidationError('Python script must not be empty')
+
+    # Include common libraries
+    orig_namespace = dict(
+        gpd=gpd,
+        geopandas=geopandas,
+        math=math,
+        np=np,
+        numpy=numpy,
+        pd=pd,
+        pandas=pandas,
+        sp=sp,
+        scipy=scipy,
+        xr=xr,
+        xarray=xarray,
+    )
+
+    if operation_context is not None and 'value_cache' in operation_context:
+        orig_namespace.update(operation_context['value_cache'])
+
+    if context_object is not None:
+        orig_namespace.update(context_object)
 
     local_namespace = dict(orig_namespace)
 
     with monitor.observing("Executing script"):
-        safe_exec(script, local_namespace=local_namespace)
-
-    data_vars = {}
-    for name, array in local_namespace.items():
-        if isinstance(array, xr.DataArray) or isinstance(array, xr.Variable):
-            is_new_data_var = name not in orig_namespace
-            if not is_new_data_var:
-                is_new_data_var = array is not orig_namespace[name]
-            if is_new_data_var:
-                array.name = name
-                data_vars[name] = array
-
-    if ds is not None and copy:
-        new_ds = ds.copy()
-        for name, array in data_vars.items():
-            new_ds[name] = array
-    else:
-        new_ds = xr.Dataset(data_vars=data_vars)
-
-    return new_ds
+        try:
+            safe_exec(script, local_namespace=local_namespace)
+        except BaseException as e:
+            raise ValidationError(f'Error in Python script: {e}') from e
+
+    elements = dict()
+    for name, element in local_namespace.items():
+        if not name.startswith('_'):
+            if isinstance(element, element_types):
+                if name not in orig_namespace or element is not orig_namespace[name]:
+                    elements[name] = element
+
+    return elements
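Likewise, a hedged sketch of how `compute_dataset()` behaves after this change, including the workflow-resource case. Note that `_ctx` is normally injected by the workflow framework and is only filled in by hand here (compare `test_plain_compute_with_context` below):

```python
import numpy as np
import xarray as xr

from cate.ops import arithmetics

ds = xr.Dataset({'da1': (['lat', 'lon'], np.ones([45, 90])),
                 'da2': (['lat', 'lon'], np.ones([45, 90])),
                 'lat': np.linspace(-88, 88, 45),
                 'lon': np.linspace(-178, 178, 90)})

# With the default copy=False, only newly assigned variables appear in the
# result; '_x' is dropped because its name starts with an underscore.
out = arithmetics.compute_dataset(ds=ds,
                                  script="_x = 0.5 * da2\n"
                                         "x1 = 2 * da1 - 3 * _x\n")
assert 'x1' in out and 'da1' not in out

# Without a context dataset, variables of other workflow resources are
# referenced as {resource}.{variable} via the value cache.
_ctx = dict(value_cache=dict(res_1=ds))
out2 = arithmetics.compute_dataset(ds=None, script="y = 6 * res_1.da1", _ctx=_ctx)
assert 'y' in out2
```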
diff --git a/test/cli/test_main.py b/test/cli/test_main.py
index 5597d3fc4..3b8b572d0 100644
--- a/test/cli/test_main.py
+++ b/test/cli/test_main.py
@@ -291,7 +291,7 @@ def test_op_list(self):
         self.assert_main(['op', 'list', '--internal'], expected_stdout=['2 operations found'])
         self.assert_main(['op', 'list', '--tag', 'input'], expected_stdout=['9 operations found'])
         self.assert_main(['op', 'list', '--tag', 'output'], expected_stdout=['6 operations found'])
-        self.assert_main(['op', 'list', '--deprecated'], expected_stdout=['One operation found'])
+        self.assert_main(['op', 'list', '--deprecated'], expected_stdout=['2 operations found'])
 
 
 @unittest.skip(reason='Hardcoded values from remote service, contains outdated assumptions')
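The collection rule implemented by `_exec_script()` in the arithmetics.py diff above can be illustrated standalone. This sketch substitutes plain `exec` for cate's `safe_exec` and uses toy values; it is not part of the change itself:

```python
# A name is collected only if it (a) does not start with '_', (b) has one of
# the accepted element types, and (c) is new or rebound relative to the
# original namespace: the same three checks as in _exec_script().
orig_namespace = {'a': 1.0}
local_namespace = dict(orig_namespace)
exec("x = 2.0\n_y = 3.0\n", None, local_namespace)

element_types = (float, int)
elements = {name: element
            for name, element in local_namespace.items()
            if not name.startswith('_')
            and isinstance(element, element_types)
            and (name not in orig_namespace or element is not orig_namespace[name])}

assert set(elements) == {'x'}  # 'a' is unchanged, '_y' is underscore-private
```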
diff --git a/test/ops/test_arithmetics.py b/test/ops/test_arithmetics.py
index a805c7da3..760b302e9 100644
--- a/test/ops/test_arithmetics.py
+++ b/test/ops/test_arithmetics.py
@@ -2,14 +2,15 @@
 Tests for arithmetic operations
 """
 
+from datetime import datetime
 from unittest import TestCase
 
 import numpy as np
+import pandas as pd
 import xarray as xr
-from datetime import datetime
 
-from cate.ops import arithmetics
 from cate.core.op import OP_REGISTRY
+from cate.ops import arithmetics
 from cate.util.misc import object_to_qualified_name
 
 
@@ -205,68 +206,55 @@ def test_registered(self):
         assert_dataset_equal(expected * -1, actual)
 
 
-# noinspection PyMethodMayBeStatic
-class ComputeTest(TestCase):
+class ComputeDatasetTest(TestCase):
 
     def test_plain_compute(self):
-        first = np.ones([45, 90, 3])
-        second = np.ones([45, 90, 3])
+        da1 = np.ones([45, 90, 3])
+        da2 = np.ones([45, 90, 3])
         lon = np.linspace(-178, 178, 90)
         lat = np.linspace(-88, 88, 45)
 
-        dataset = xr.Dataset({
-            'first': (['lat', 'lon', 'time'], first),
-            'second': (['lat', 'lon', 'time'], second),
-            'lat': lat,
-            'lon': lon
-        })
-        actual = arithmetics.compute_dataset(ds=dataset,
-                                             script="third = 6 * first - 3 * second")
-        expected = xr.Dataset({
-            'third': (['lat', 'lon', 'time'], 6 * first - 3 * second),
-            'lat': lat,
-            'lon': lon
-        })
-        assert_dataset_equal(expected, actual)
-
-        actual = arithmetics.compute_dataset(ds=dataset,
-                                             script="third = 6 * first - 3 * second",
-                                             copy=True)
-        expected = xr.Dataset({
-            'first': (['lat', 'lon', 'time'], first),
-            'second': (['lat', 'lon', 'time'], second),
-            'third': (['lat', 'lon', 'time'], 6 * first - 3 * second),
+        ds1 = xr.Dataset({
+            'da1': (['lat', 'lon', 'time'], da1),
+            'da2': (['lat', 'lon', 'time'], da2),
             'lat': lat,
             'lon': lon
         })
-        assert_dataset_equal(expected, actual)
 
-    def test_registered_compute(self):
-        reg_op = OP_REGISTRY.get_op(object_to_qualified_name(arithmetics.compute_dataset))
-        first = np.ones([45, 90, 3])
-        second = np.ones([45, 90, 3])
-        dataset = xr.Dataset({
-            'first': (['lat', 'lon', 'time'], first),
-            'second': (['lat', 'lon', 'time'], second),
-            'lat': np.linspace(-88, 88, 45),
-            'lon': np.linspace(-178, 178, 90)})
-        actual = reg_op(ds=dataset,
-                        script="third = 6 * first - 3 * second")
-        expected = xr.Dataset({
-            'third': (['lat', 'lon', 'time'], 6 * first - 3 * second),
-            'lat': np.linspace(-88, 88, 45),
-            'lon': np.linspace(-178, 178, 90)})
-        assert_dataset_equal(expected, actual)
-
-        actual = reg_op(ds=dataset,
-                        script="third = 6 * first - 3 * second",
-                        copy=True)
-        expected = xr.Dataset({
-            'first': (['lat', 'lon', 'time'], first),
-            'second': (['lat', 'lon', 'time'], second),
-            'third': (['lat', 'lon', 'time'], 6 * first - 3 * second),
-            'lat': np.linspace(-88, 88, 45),
-            'lon': np.linspace(-178, 178, 90)})
-        assert_dataset_equal(expected, actual)
+        ds2 = arithmetics.compute_dataset(ds=ds1,
+                                          script="_x = 0.5 * da2\n"
+                                                 "x1 = 2 * da1 - 3 * _x\n"
+                                                 "x2 = 3 * da1 + 4 * _x\n")
+        self.assertIsInstance(ds2, xr.Dataset)
+        self.assertIn('lon', ds2)
+        self.assertIn('lat', ds2)
+        self.assertIn('x1', ds2)
+        self.assertIn('x2', ds2)
+        self.assertNotIn('da1', ds2)
+        self.assertNotIn('da2', ds2)
+        _x = 0.5 * da2
+        expected_x1 = 2 * da1 - 3 * _x
+        expected_x2 = 3 * da1 + 4 * _x
+        np.testing.assert_array_almost_equal(expected_x1, ds2['x1'].values)
+        np.testing.assert_array_almost_equal(expected_x2, ds2['x2'].values)
+
+        ds2 = arithmetics.compute_dataset(ds=ds1,
+                                          script="_x = 0.6 * da2\n"
+                                                 "x1 = 4 * da1 - 4 * _x\n"
+                                                 "x2 = 5 * da1 + 3 * _x\n",
+                                          copy=True)
+        self.assertIsInstance(ds2, xr.Dataset)
+        self.assertIn('lon', ds2)
+        self.assertIn('lat', ds2)
+        self.assertIn('x1', ds2)
+        self.assertIn('x2', ds2)
+        self.assertIn('da1', ds2)
+        self.assertIn('da2', ds2)
+        _x = 0.6 * da2
+        expected_x1 = 4 * da1 - 4 * _x
+        expected_x2 = 5 * da1 + 3 * _x
+        np.testing.assert_array_almost_equal(expected_x1, ds2['x1'].values)
+        np.testing.assert_array_almost_equal(expected_x2, ds2['x2'].values)
 
     def test_plain_compute_with_context(self):
         first = np.ones([45, 90, 3])
@@ -290,37 +278,101 @@ def test_plain_compute_with_context(self):
         actual = arithmetics.compute_dataset(ds=None,
                                              script="third = 6 * res_1.first - 3 * res_2.second",
                                              _ctx=_ctx)
+        self.assertIsInstance(actual, xr.Dataset)
         expected = xr.Dataset({
             'third': (['lat', 'lon', 'time'], 6 * first - 3 * second),
             'lat': lat,
             'lon': lon})
         assert_dataset_equal(expected, actual)
 
-    def test_registered_compute_with_context(self):
-        reg_op = OP_REGISTRY.get_op(object_to_qualified_name(arithmetics.compute_dataset))
-        first = np.ones([45, 90, 3])
-        second = np.ones([45, 90, 3])
-        lon = np.linspace(-178, 178, 90)
-        lat = np.linspace(-88, 88, 45)
-
-        res_1 = xr.Dataset({
-            'first': (['lat', 'lon', 'time'], first),
-            'lat': lat,
-            'lon': lon
-        })
-        res_2 = xr.Dataset({
-            'second': (['lat', 'lon', 'time'], second),
-            'lat': lat,
-            'lon': lon
-        })
+class ComputeDataFrameTest(TestCase):
+
+    def test_compute_simple(self):
+        s1 = 10. * np.linspace(0, 1, 11)
+        s2 = -2 * np.linspace(0, 1, 11)
+        s3 = +2 * np.linspace(0, 1, 11)
+
+        df1 = pd.DataFrame(dict(s1=s1, s2=s2, s3=s3))
+
+        df2 = arithmetics.compute_data_frame(df=df1,
+                                             script="_a = 3 * s2 - 4 * s3\n"
+                                                    "a1 = 1 + 2 * s1 + _a\n"
+                                                    "a2 = 2 + 3 * s1 + _a\n")
+
+        self.assertIsInstance(df2, pd.DataFrame)
+        self.assertEqual(11, len(df2))
+        self.assertIn('a1', df2)
+        self.assertIn('a2', df2)
+        self.assertNotIn('_a', df2)
+        self.assertNotIn('s1', df2)
+        self.assertNotIn('s2', df2)
+        self.assertNotIn('s3', df2)
+        expected_a = 3 * s2 - 4 * s3
+        expected_a1 = 1 + 2 * s1 + expected_a
+        expected_a2 = 2 + 3 * s1 + expected_a
+        np.testing.assert_array_almost_equal(expected_a1, df2['a1'].values)
+        np.testing.assert_array_almost_equal(expected_a2, df2['a2'].values)
+
+        df2 = arithmetics.compute_data_frame(df=df1,
+                                             script="_a = 3 * s2 - 4 * s3\n"
+                                                    "a1 = 1 + 2 * s1 + _a\n"
+                                                    "a2 = 2 + 3 * s1 + _a\n",
+                                             copy=True)
 
-        # Note, if executed from a workflow, _ctx will be set by the framework
-        _ctx = dict(value_cache=dict(res_1=res_1, res_2=res_2))
-        actual = reg_op(ds=None,
-                        script="third = 6 * res_1.first - 3 * res_2.second",
-                        _ctx=_ctx)
-        expected = xr.Dataset({
-            'third': (['lat', 'lon', 'time'], 6 * first - 3 * second),
-            'lat': lat,
-            'lon': lon})
-        assert_dataset_equal(expected, actual)
+        self.assertIsInstance(df2, pd.DataFrame)
+        self.assertEqual(11, len(df2))
+        self.assertIn('a1', df2)
+        self.assertIn('a2', df2)
+        self.assertNotIn('_a', df2)
+        self.assertIn('s1', df2)
+        self.assertIn('s2', df2)
+        self.assertIn('s3', df2)
+        expected_a = 3 * s2 - 4 * s3
+        expected_a1 = 1 + 2 * s1 + expected_a
+        expected_a2 = 2 + 3 * s1 + expected_a
+        np.testing.assert_array_almost_equal(expected_a1, df2['a1'].values)
+        np.testing.assert_array_almost_equal(expected_a2, df2['a2'].values)
+
+    def test_compute_aggregations(self):
+        s1 = 10. * np.linspace(0, 1, 11)
+        s2 = -2 * np.linspace(0, 1, 11)
+        s3 = +2 * np.linspace(0, 1, 11)
+
+        df1 = pd.DataFrame(dict(s1=s1, s2=s2, s3=s3))
+
+        df2 = arithmetics.compute_data_frame(df=df1,
+                                             script="s1_mean = s1.mean()\n"
+                                                    "s2_sum = s2.sum()\n"
+                                                    "s3_median = s3.median()\n")
+
+        self.assertIsInstance(df2, pd.DataFrame)
+        self.assertEqual(1, len(df2))
+        self.assertIn('s1_mean', df2)
+        self.assertIn('s2_sum', df2)
+        self.assertIn('s3_median', df2)
+        self.assertNotIn('s1', df2)
+        self.assertNotIn('s2', df2)
+        self.assertNotIn('s3', df2)
+        np.testing.assert_almost_equal(np.mean(s1), df2['s1_mean'].values)
+        np.testing.assert_almost_equal(np.sum(s2), df2['s2_sum'].values)
+        np.testing.assert_almost_equal(np.median(s3), df2['s3_median'].values)
+
+        df2 = arithmetics.compute_data_frame(df=df1,
+                                             script="s1_mean = s1.mean()\n"
+                                                    "s2_sum = s2.sum()\n"
+                                                    "s3_median = s3.median()\n",
+                                             copy=True)
+
+        self.assertIsInstance(df2, pd.DataFrame)
+        self.assertEqual(11, len(df2))
+        self.assertIn('s1_mean', df2)
+        self.assertIn('s2_sum', df2)
+        self.assertIn('s3_median', df2)
+        self.assertIn('s1', df2)
+        self.assertIn('s2', df2)
+        self.assertIn('s3', df2)
+
+        np.testing.assert_almost_equal(np.mean(s1), df2['s1_mean'].values)
+        np.testing.assert_almost_equal(np.sum(s2), df2['s2_sum'].values)
+        np.testing.assert_almost_equal(np.median(s3), df2['s3_median'].values)
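A short note on why `test_compute_aggregations` expects one row without `copy` and eleven rows with `copy=True`: scalar script results are broadcast by pandas when assigned to an existing frame, while the `copy=False` branch of `compute_data_frame()` builds a fresh index sized to the longest result. A minimal sketch of the two paths (using `pd.Int64Index` as the operation does, which assumes a pandas version that still provides it):

```python
import numpy as np
import pandas as pd

s1 = 10. * np.linspace(0, 1, 11)
df = pd.DataFrame(dict(s1=s1))

# copy=True path: assigning a scalar to an existing frame broadcasts it to a
# constant column with the frame's original length (11 rows).
df_copy = df.copy()
df_copy['s1_mean'] = s1.mean()
assert len(df_copy) == 11

# copy=False path: a new frame is built on an index sized to the longest
# result; with scalar-only results that is a single row.
index = pd.Int64Index(data=np.arange(1), name='id')
df_new = pd.DataFrame(data=dict(s1_mean=s1.mean()), index=index)
assert len(df_new) == 1
```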