Skip to content
This repository has been archived by the owner on Aug 29, 2023. It is now read-only.

Commit

Permalink
closes #703
Browse files Browse the repository at this point in the history
  • Loading branch information
forman committed Jul 13, 2018
1 parent 12b7954 commit bf05f15
Show file tree
Hide file tree
Showing 4 changed files with 276 additions and 127 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
## Version 2.0.0.dev16 (in development)

* Added new operations `compute_dataset()` and `compute_data_frame()` [#703](https://github.com/CCI-Tools/cate/issues/703).
* Fixed division by zero error in RGB tile generation if data min and max were equal
* Allow displaying and working with CCI Sea Level MSLAMPH data.
Addresses [#531](https://github.com/CCI-Tools/cate/issues/531).
Expand Down
186 changes: 141 additions & 45 deletions cate/ops/arithmetics.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,23 @@
Functions
=========
"""
import math
from typing import Dict, Any, Mapping, Sequence, Tuple

import geopandas
import geopandas as gpd
import numpy
import numpy as np
import pandas
import pandas as pd
import scipy
import scipy as sp
import xarray
import xarray as xr
from xarray import ufuncs as xu

from cate.core.op import op, op_input, op_return
from cate.core.types import DatasetLike, ValidationError
from cate.core.types import DatasetLike, ValidationError, DataFrameLike
from cate.util.monitor import Monitor
from cate.util.safe import safe_exec

Expand Down Expand Up @@ -160,13 +169,74 @@ def diff(ds: xr.Dataset,
return diff


# noinspection PyIncorrectDocstring
@op(tags=['arithmetic'], version='1.0')
@op_input('df', data_type=DataFrameLike, nullable=True, default_value=None)
@op_input('script', script_lang="python")
@op_input('copy')
@op_input('_ctx', context=True)
def compute_data_frame(df: DataFrameLike.TYPE,
script: str,
copy: bool = False,
_ctx: dict = None,
monitor: Monitor = Monitor.NONE) -> pd.DataFrame:
"""
Compute a new data frame from the given Python *script*.
The argument *script* must be valid Python code or a single expression comprising at least one
value assignment of the form {name} = {expr}. Multiple assignments can be done on multiple lines
or on a single line separated by semicolons.
{expr} may reference variables in the given context dataset *ds* or other resources and their variables
from the current workflow.
In the latter case, use the dot operator to select a variable from a dataset resource.
Any new variable in *script* whose name does not begin with and underscore ('_') and
that has an appropriate data type will be added to the new data frame.
The following packages are available in the *script*:
* ``geopandas``, ``gpd``: The ``geopandas`` top-level package (http://geopandas.org/)
* ``math``: The standard Python ``math`` library (https://docs.python.org/3/library/math.html)
* ``numpy``, ``np``: The ``numpy`` top-level package (https://docs.scipy.org/doc/numpy/reference/)
* ``pandas``, ``pd``: The ``pandas`` top-level package (http://pandas.pydata.org/pandas-docs/stable/api.html)
* ``scipy``, ``sp``: The ``scipy`` top-level package (https://docs.scipy.org/doc/scipy/reference/)
* ``xarray``, ``xr``: The ``xarray`` top-level package (http://xarray.pydata.org/en/stable/api.html)
:param df: Optional context data frame. If provided, all series of this data frame are
directly accessible in the *script*.
If omitted, all series (variables) of other data frame (dataset) resources need to be prefixed
by their resource name.
:param script: Valid Python expression comprising at least one assignment of the form {name} = {expr}.
:param copy: Whether to copy all series from *df*.
:param monitor: An optional progress monitor.
:return: A new data frame object.
"""
series = _exec_script(script, (pd.Series, np.ndarray, float, int), _ctx, df, monitor)

if df is not None and copy:
new_df = df.copy()
for name, data in series.items():
new_df[name] = data
else:
max_size = 0
for data in series.values():
try:
size = len(data)
except TypeError:
size = 1
max_size = max(max_size, size)
index = pd.Int64Index(data=np.arange(max_size), name='id')
new_df = pd.DataFrame(data=series, index=index)

return new_df


# noinspection PyIncorrectDocstring
@op(tags=['arithmetic'], version='1.0')
@op_input('ds', data_type=DatasetLike, nullable=True, default_value=None)
@op_input('script', script_lang="python")
@op_input('copy')
@op_input('_ctx', context=True)
# @op_return(add_history=True)
def compute_dataset(ds: DatasetLike.TYPE,
script: str,
copy: bool = False,
Expand All @@ -175,65 +245,91 @@ def compute_dataset(ds: DatasetLike.TYPE,
"""
Compute a new dataset from the given Python *script*.
The argument *script* must be valid Python code or a single expression comprising at least one
value assignment of the form <name> = <expr>. Multiple assignments can be done on multiple lines
value assignment of the form {name} = {expr}. Multiple assignments can be done on multiple lines
or on a single line separated by semicolons.
<expr> may reference variables in the given context dataset *ds* or other resources and their variables
{expr} may reference variables in the given context dataset *ds* or other resources and their variables
from the current workflow.
In the latter case, use the dot operator to select a variable from a dataset resource.
Every new variable in *script* of type data array will be added to the new dataset.
Any new variable in *script* whose name does not begin with and underscore ('_') and
that has an appropriate data type will be added to the new dataset.
The following packages are available in the code:
The following packages are available in the *script*:
* ``np``: The ``numpy`` top-level package (https://docs.scipy.org/doc/numpy/reference/)
* ``pd``: The ``pandas`` top-level package (http://pandas.pydata.org/pandas-docs/stable/api.html)
* ``xr``: The ``xarray`` top-level package (http://xarray.pydata.org/en/stable/api.html)
* ``xu``: The ``xarray.ufuncs`` package (http://xarray.pydata.org/en/stable/api.html#universal-functions)
* ``geopandas``, ``gpd``: The ``geopandas`` top-level package (http://geopandas.org/)
* ``math``: The standard Python ``math`` library (https://docs.python.org/3/library/math.html)
* ``numpy``, ``np``: The ``numpy`` top-level package (https://docs.scipy.org/doc/numpy/reference/)
* ``pandas``, ``pd``: The ``pandas`` top-level package (http://pandas.pydata.org/pandas-docs/stable/api.html)
* ``scipy``, ``sp``: The ``scipy`` top-level package (https://docs.scipy.org/doc/scipy/reference/)
* ``xarray``, ``xr``: The ``xarray`` top-level package (http://xarray.pydata.org/en/stable/api.html)
Note, in contrast to the ``np`` package, all the math functions defined in ``xu`` will preserve variable attributes.
:param ds: Optional context dataset. All variables of this dataset are directly accessible in the *script*.
If omitted, all variables need to be prefixed by their dataset resource names.
:param script: Valid Python expression comprising at least one assignment of the form <name> = <expr>.
:param ds: Optional context dataset. If provided, all variables of this dataset are
directly accessible in the *script*.
If omitted, all variables (series) of other dataset (data frame) resources need to be prefixed
by their resource name.
:param script: Valid Python expression comprising at least one assignment of the form {name} = {expr}.
:param copy: Whether to copy all variables from *ds*.
:param monitor: An optional progress monitor.
:return: A new dataset.
:return: A new dataset object.
"""
data_vars = _exec_script(script, (xr.DataArray, np.ndarray, float, int), _ctx, ds, monitor)

if _ctx is not None and 'value_cache' in _ctx:
orig_namespace = dict(_ctx['value_cache'])
if ds is not None and copy:
new_ds = ds.copy()
for name, data in data_vars.items():
new_ds[name] = data
else:
orig_namespace = dict()
new_ds = xr.Dataset(data_vars=data_vars)

return new_ds

if ds is not None:
orig_namespace.update(ds.data_vars)

orig_namespace['np'] = np
orig_namespace['pd'] = pd
orig_namespace['xr'] = xr
orig_namespace['xu'] = xu
def _exec_script(script: str,
element_types: Tuple[type, ...],
operation_context: Mapping[str, Any] = None,
context_object: Mapping[str, Any] = None,
monitor: Monitor = Monitor.NONE) -> Dict[str, Any]:
"""
Helper for compute_dataset() and compute_data_frame().
"""
if not script:
raise ValidationError(f'Python script must not be empty')

# Include common libraries
orig_namespace = dict(
gpd=gpd,
geopandas=geopandas,
math=math,
np=np,
numpy=numpy,
pd=pd,
pandas=pandas,
sp=sp,
scipy=scipy,
xr=xr,
xarray=xarray,
)

if operation_context is not None and 'value_cache' in operation_context:
orig_namespace.update(operation_context['value_cache'])

if context_object is not None:
orig_namespace.update(context_object)

local_namespace = dict(orig_namespace)

with monitor.observing("Executing script"):
safe_exec(script, local_namespace=local_namespace)

data_vars = {}
for name, array in local_namespace.items():
if isinstance(array, xr.DataArray) or isinstance(array, xr.Variable):
is_new_data_var = name not in orig_namespace
if not is_new_data_var:
is_new_data_var = array is not orig_namespace[name]
if is_new_data_var:
array.name = name
data_vars[name] = array

if ds is not None and copy:
new_ds = ds.copy()
for name, array in data_vars.items():
new_ds[name] = array
else:
new_ds = xr.Dataset(data_vars=data_vars)

return new_ds
try:
safe_exec(script, local_namespace=local_namespace)
except BaseException as e:
raise ValidationError(f'Error in Python script: {e}') from e

elements = dict()
for name, element in local_namespace.items():
if not name.startswith('_'):
if isinstance(element, element_types):
if name not in orig_namespace or element is not orig_namespace[name]:
elements[name] = element

return elements
2 changes: 1 addition & 1 deletion test/cli/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ def test_op_list(self):
self.assert_main(['op', 'list', '--internal'], expected_stdout=['2 operations found'])
self.assert_main(['op', 'list', '--tag', 'input'], expected_stdout=['9 operations found'])
self.assert_main(['op', 'list', '--tag', 'output'], expected_stdout=['6 operations found'])
self.assert_main(['op', 'list', '--deprecated'], expected_stdout=['One operation found'])
self.assert_main(['op', 'list', '--deprecated'], expected_stdout=['2 operations found'])


@unittest.skip(reason='Hardcoded values from remote service, contains outdated assumptions')
Expand Down
Loading

0 comments on commit bf05f15

Please sign in to comment.