diff --git a/cate/ops/anomaly.py b/cate/ops/anomaly.py
index 466c369bf..1aae9b73f 100644
--- a/cate/ops/anomaly.py
+++ b/cate/ops/anomaly.py
@@ -63,8 +63,9 @@ def anomaly_external(ds: xr.Dataset,
 
     :param ds: The dataset to calculate anomalies from
     :param file: Path to reference data file
-    :param str: Apply the given transformation before calculating the anomaly.
+    :param transform: Apply the given transformation before calculating the anomaly.
     For supported operations see help on 'ds_arithmetics' operation.
+    :param monitor: a progress monitor.
     :return: The anomaly dataset
     """
     # Check if the time coordinate is of dtype datetime
@@ -126,7 +127,8 @@ def _group_anomaly(group: xr.Dataset,
 @op_return(add_history=True)
 def anomaly_internal(ds: xr.Dataset,
                      time_range: TimeRangeLike.TYPE = None,
-                     region: PolygonLike.TYPE = None) -> xr.Dataset:
+                     region: PolygonLike.TYPE = None,
+                     monitor: Monitor = Monitor.NONE) -> xr.Dataset:
     """
     Calculate anomaly using as reference data the mean of an optional region
     and time slice from the given dataset. If no time slice/spatial region is
@@ -137,6 +139,7 @@ def anomaly_internal(ds: xr.Dataset,
     :param ds: The dataset to calculate anomalies from
     :param time_range: Time range to use for reference data
     :param region: Spatial region to use for reference data
+    :param monitor: a progress monitor.
     :return: The anomaly dataset
     """
     ref = ds.copy()
@@ -146,5 +149,7 @@ def anomaly_internal(ds: xr.Dataset,
     if region:
         region = PolygonLike.convert(region)
         ref = subset_spatial(ref, region)
-    ref = ref.mean(keep_attrs=True, skipna=True)
-    return ds - ref
+    with monitor.observing("Calculating anomaly"):
+        ref = ref.mean(keep_attrs=True, skipna=True)
+        diff = ds - ref
+    return diff
diff --git a/cate/ops/arithmetics.py b/cate/ops/arithmetics.py
index 4fbc37059..3e4f44d62 100644
--- a/cate/ops/arithmetics.py
+++ b/cate/ops/arithmetics.py
@@ -34,13 +34,15 @@
 
 from cate.core.op import op, op_input, op_return
 from cate.core.types import DatasetLike
+from cate.util.monitor import Monitor
 
 
 @op(tags=['arithmetic'], version='1.0')
 @op_input('ds', data_type=DatasetLike)
 @op_return(add_history=True)
 def ds_arithmetics(ds: DatasetLike.TYPE,
-                   op: str) -> xr.Dataset:
+                   op: str,
+                   monitor: Monitor = Monitor.NONE) -> xr.Dataset:
     """
     Do arithmetic operations on the given dataset by providing a list of
     arithmetic operations and the corresponding constant. The operations will
@@ -62,40 +64,45 @@ def ds_arithmetics(ds: DatasetLike.TYPE,
 
     :param ds: The dataset to which to apply arithmetic operations
     :param op: A comma separated list of arithmetic operations to apply
+    :param monitor: a progress monitor.
     :return: The dataset with given arithmetic operations applied
     """
     ds = DatasetLike.convert(ds)
     retset = ds
-    for item in op.split(','):
-        item = item.strip()
-        if item[0] == '+':
-            retset = retset + float(item[1:])
-        elif item[0] == '-':
-            retset = retset - float(item[1:])
-        elif item[0] == '*':
-            retset = retset * float(item[1:])
-        elif item[0] == '/':
-            retset = retset / float(item[1:])
-        elif item[:] == 'log':
-            retset = xu.log(retset)
-        elif item[:] == 'log10':
-            retset = xu.log10(retset)
-        elif item[:] == 'log2':
-            retset = xu.log2(retset)
-        elif item[:] == 'log1p':
-            retset = xu.log1p(retset)
-        elif item[:] == 'exp':
-            retset = xu.exp(retset)
-        else:
-            raise ValueError('Arithmetic operation {} not'
-                             ' implemented.'.format(item[0]))
+    with monitor.starting('Calculate result', total_work=len(op.split(','))):
+        for item in op.split(','):
+            with monitor.child(1).observing("Calculate"):
+                item = item.strip()
+                if item[0] == '+':
+                    retset = retset + float(item[1:])
+                elif item[0] == '-':
+                    retset = retset - float(item[1:])
+                elif item[0] == '*':
+                    retset = retset * float(item[1:])
+                elif item[0] == '/':
+                    retset = retset / float(item[1:])
+                elif item[:] == 'log':
+                    retset = xu.log(retset)
+                elif item[:] == 'log10':
+                    retset = xu.log10(retset)
+                elif item[:] == 'log2':
+                    retset = xu.log2(retset)
+                elif item[:] == 'log1p':
+                    retset = xu.log1p(retset)
+                elif item[:] == 'exp':
+                    retset = xu.exp(retset)
+                else:
+                    raise ValueError('Arithmetic operation {} not'
+                                     ' implemented.'.format(item[0]))
     return retset
 
 
 @op(tags=['arithmetic'], version='1.0')
 @op_return(add_history=True)
-def diff(ds: xr.Dataset, ds2: xr.Dataset) -> xr.Dataset:
+def diff(ds: xr.Dataset,
+         ds2: xr.Dataset,
+         monitor: Monitor = Monitor.NONE) -> xr.Dataset:
     """
     Calculate the difference of two datasets (ds - ds2). This is done by
     matching variable names in the two datasets against each other and taking
@@ -111,6 +118,7 @@ def diff(ds: xr.Dataset, ds2: xr.Dataset) -> xr.Dataset:
 
     :param ds: The minuend dataset
     :param ds2: The subtrahend dataset
+    :param monitor: a progress monitor.
     :return: The difference dataset
     """
     try:
@@ -143,4 +151,7 @@ def diff(ds: xr.Dataset, ds2: xr.Dataset) -> xr.Dataset:
         else:
             raise TypeError(str(e))
 
-    return ds - ds2
+    with monitor.observing("Subtract datasets"):
+        diff = ds - ds2
+
+    return diff
diff --git a/cate/ops/correlation.py b/cate/ops/correlation.py
index 94cb65b9e..5230c9a29 100644
--- a/cate/ops/correlation.py
+++ b/cate/ops/correlation.py
@@ -38,6 +38,7 @@
 
 from cate.core.op import op, op_input, op_return
 from cate.core.types import VarName, DatasetLike
+from cate.util import Monitor
 from cate.ops.normalize import adjust_spatial_attrs
 
 
@@ -50,7 +51,8 @@
 def pearson_correlation_scalar(ds_x: DatasetLike.TYPE,
                                ds_y: DatasetLike.TYPE,
                                var_x: VarName.TYPE,
-                               var_y: VarName.TYPE) -> pd.DataFrame:
+                               var_y: VarName.TYPE,
+                               monitor: Monitor = Monitor.NONE) -> pd.DataFrame:
     """
     Do product moment `Pearson's correlation `_ analysis.
@@ -68,6 +70,7 @@ def pearson_correlation_scalar(ds_x: DatasetLike.TYPE,
     :param ds_y: The 'y' dataset
     :param var_x: Dataset variable to use for correlation analysis in the 'variable' dataset
     :param var_y: Dataset variable to use for correlation analysis in the 'dependent' dataset
+    :param monitor: a progress monitor.
:returns: {'corr_coef': correlation coefficient, 'p_value': probability value} """ ds_x = DatasetLike.convert(ds_x) @@ -94,7 +97,9 @@ def pearson_correlation_scalar(ds_x: DatasetLike.TYPE, raise ValueError('The length of the time dimension should not be less' ' than three to run the calculation.') - cc, pv = pearsonr(array_x.values, array_y.values) + with monitor.observing("Calculate Pearson correlation"): + cc, pv = pearsonr(array_x.values, array_y.values) + return pd.DataFrame({'corr_coef': [cc], 'p_value': [pv]}) @@ -107,7 +112,8 @@ def pearson_correlation_scalar(ds_x: DatasetLike.TYPE, def pearson_correlation(ds_x: DatasetLike.TYPE, ds_y: DatasetLike.TYPE, var_x: VarName.TYPE, - var_y: VarName.TYPE) -> xr.Dataset: + var_y: VarName.TYPE, + monitor: Monitor = Monitor.NONE) -> xr.Dataset: """ Do product moment `Pearson's correlation `_ analysis. @@ -137,6 +143,7 @@ def pearson_correlation(ds_x: DatasetLike.TYPE, :param ds_y: The 'y' dataset :param var_x: Dataset variable to use for correlation analysis in the 'variable' dataset :param var_y: Dataset variable to use for correlation analysis in the 'dependent' dataset + :param monitor: a progress monitor. :returns: a dataset containing a map of correlation coefficients and p_values """ ds_x = DatasetLike.convert(ds_x) @@ -189,13 +196,13 @@ def pearson_correlation(ds_x: DatasetLike.TYPE, ' than three to run the calculation.') # Do pixel by pixel correlation - retset = _pearsonr(array_x, array_y) + retset = _pearsonr(array_x, array_y, monitor) retset.attrs['Cate_Description'] = 'Correlation between {} {}'.format(var_y, var_x) return adjust_spatial_attrs(retset) -def _pearsonr(x: xr.DataArray, y: xr.DataArray) -> xr.Dataset: +def _pearsonr(x: xr.DataArray, y: xr.DataArray, monitor: Monitor) -> xr.Dataset: """ Calculates Pearson correlation coefficients and p-values for testing non-correlation of lon/lat/time xarray datasets for each lon/lat point. @@ -218,6 +225,7 @@ def _pearsonr(x: xr.DataArray, y: xr.DataArray) -> xr.Dataset: :param x: lon/lat/time xr.DataArray :param y: xr.DataArray of the same spatiotemporal extents and resolution as x. + :param monitor: Monitor to use for monitoring the calculation :return: A dataset containing the correlation coefficients and p_values on the lon/lat grid of x and y. @@ -225,42 +233,52 @@ def _pearsonr(x: xr.DataArray, y: xr.DataArray) -> xr.Dataset: ---------- http://www.statsoft.com/textbook/glosp.html#Pearson%20Correlation """ - n = len(x['time']) - - xm, ym = x - x.mean(dim='time'), y - y.mean(dim='time') - xm_ym = xm * ym - r_num = xm_ym.sum(dim='time') - xm_squared = xr.ufuncs.square(xm) - ym_squared = xr.ufuncs.square(ym) - r_den = xr.ufuncs.sqrt(xm_squared.sum(dim='time') * - ym_squared.sum(dim='time')) - r_den = r_den.where(r_den != 0) - r = r_num / r_den - - # Presumably, if abs(r) > 1, then it is only some small artifact of floating - # point arithmetic. - # At this point r should be a lon/lat dataArray, so it should be safe to - # load it in memory explicitly. This may take time as it will kick-start - # deferred processing. 
-    # Comparing with NaN produces warnings that can be safely ignored
-    default_warning_settings = np.seterr(invalid='ignore')
-    r.values[r.values < -1.0] = -1.0
-    r.values[r.values > 1.0] = 1.0
-    np.seterr(**default_warning_settings)
-    r.attrs = {'description': 'Correlation coefficients between'
-               ' {} and {}.'.format(x.name, y.name)}
-
-    df = n - 2
-    t_squared = xr.ufuncs.square(r) * (df / ((1.0 - r.where(r != 1)) *
-                                             (1.0 + r.where(r != -1))))
-    prob = df / (df + t_squared)
-    prob.values = betainc(0.5 * df, 0.5, prob.values)
-    prob.attrs = {'description': 'Rough indicator of probability of an'
-                  ' uncorrelated system producing datasets that have a Pearson'
-                  ' correlation at least as extreme as the one computed from'
-                  ' these datsets. Not entirely reliable, but reasonable for'
-                  ' datasets larger than 500 or so.'}
-
-    retset = xr.Dataset({'corr_coef': r,
-                         'p_value': prob})
+    with monitor.starting("Calculate Pearson correlation", total_work=6):
+        n = len(x['time'])
+
+        xm, ym = x - x.mean(dim='time'), y - y.mean(dim='time')
+        xm_ym = xm * ym
+        r_num = xm_ym.sum(dim='time')
+        xm_squared = xr.ufuncs.square(xm)
+        ym_squared = xr.ufuncs.square(ym)
+        r_den = xr.ufuncs.sqrt(xm_squared.sum(dim='time') *
+                               ym_squared.sum(dim='time'))
+        r_den = r_den.where(r_den != 0)
+        r = r_num / r_den
+
+        # Presumably, if abs(r) > 1, then it is only some small artifact of floating
+        # point arithmetic.
+        # At this point r should be a lon/lat dataArray, so it should be safe to
+        # load it in memory explicitly. This may take time as it will kick-start
+        # deferred processing.
+        # Comparing with NaN produces warnings that can be safely ignored
+        default_warning_settings = np.seterr(invalid='ignore')
+        with monitor.child(1).observing("task 1"):
+            negativ_r = r.values < -1.0
+        with monitor.child(1).observing("task 2"):
+            r.values[negativ_r] = -1.0
+        with monitor.child(1).observing("task 3"):
+            positiv_r = r.values > 1.0
+        with monitor.child(1).observing("task 4"):
+            r.values[positiv_r] = 1.0
+        np.seterr(**default_warning_settings)
+        r.attrs = {'description': 'Correlation coefficients between'
+                   ' {} and {}.'.format(x.name, y.name)}
+
+        df = n - 2
+        t_squared = xr.ufuncs.square(r) * (df / ((1.0 - r.where(r != 1)) *
+                                                 (1.0 + r.where(r != -1))))
+        prob = df / (df + t_squared)
+        with monitor.child(1).observing("task 5"):
+            prob_values_in = prob.values
+        with monitor.child(1).observing("task 6"):
+            prob.values = betainc(0.5 * df, 0.5, prob_values_in)
+        prob.attrs = {'description': 'Rough indicator of probability of an'
+                      ' uncorrelated system producing datasets that have a Pearson'
+                      ' correlation at least as extreme as the one computed from'
+                      ' these datsets. Not entirely reliable, but reasonable for'
+                      ' datasets larger than 500 or so.'}
+
+        retset = xr.Dataset({'corr_coef': r,
+                             'p_value': prob})
 
     return retset
diff --git a/cate/ops/index.py b/cate/ops/index.py
index fd0853536..4d05c80ae 100644
--- a/cate/ops/index.py
+++ b/cate/ops/index.py
@@ -37,6 +37,7 @@
 from cate.ops.subset import subset_spatial
 from cate.ops.anomaly import anomaly_external
 from cate.core.types import PolygonLike, VarName
+from cate.util import Monitor
 
 _ALL_FILE_FILTER = dict(name='All Files', extensions=['*'])
 
@@ -48,7 +49,8 @@
 def enso_nino34(ds: xr.Dataset,
                 var: VarName.TYPE,
                 file: str,
-                threshold: float=None) -> pd.DataFrame:
+                threshold: float=None,
+                monitor: Monitor = Monitor.NONE) -> pd.DataFrame:
     """
     Calculate nino34 index, which is defined as a five month running mean of
     anomalies of monthly means of SST data in Nino3.4 region:: lon_min=-170
@@ -63,11 +65,12 @@ def enso_nino34(ds: xr.Dataset,
     threshold. Where anomaly larger than the positive value of the threshold
     indicates El Nino and anomaly smaller than the negative of the given
     threshold indicates La Nina.
+    :param monitor: a progress monitor.
     :return: A dataset that contains the index timeseries.
     """
     n34 = '-170, -5, -120, 5'
     name = 'ENSO N3.4 Index'
-    return _generic_index_calculation(ds, var, n34, 5, file, name, threshold)
+    return _generic_index_calculation(ds, var, n34, 5, file, name, threshold, monitor)
 
 
 @op(tags=['index'])
@@ -80,7 +83,8 @@ def enso(ds: xr.Dataset,
          file: str,
          region: str='n34',
          custom_region: PolygonLike.TYPE=None,
-         threshold: float=None) -> pd.DataFrame:
+         threshold: float=None,
+         monitor: Monitor = Monitor.NONE) -> pd.DataFrame:
     """
     Calculate ENSO index, which is defined as a five month running mean of
     anomalies of monthly means of SST data in the given region.
@@ -96,6 +100,7 @@ def enso(ds: xr.Dataset,
     threshold. Where anomaly larger than then positive value of the threshold
     indicates El Nino and anomaly smaller than the negative of the given
     threshold indicates La Nina.
+    :param monitor: a progress monitor.
     :return: A dataset that contains the index timeseries.
     """
     regions = {'N1+2': '-90, -10, -80, 0',
@@ -111,13 +116,17 @@ def enso(ds: xr.Dataset,
     if 'custom' == region:
         name = 'ENSO Index over ' + PolygonLike.format(converted_region)
 
-    return _generic_index_calculation(ds, var, converted_region, 5, file, name, threshold)
+    return _generic_index_calculation(ds, var, converted_region, 5, file, name, threshold, monitor)
 
 
 @op(tags=['index'])
 @op_input('var', value_set_source='ds', data_type=VarName)
 @op_input('file', file_open_mode='w', file_filters=[dict(name='NetCDF', extensions=['nc']), _ALL_FILE_FILTER])
-def oni(ds: xr.Dataset, var: VarName.TYPE, file: str, threshold: float=None) -> pd.DataFrame:
+def oni(ds: xr.Dataset,
+        var: VarName.TYPE,
+        file: str,
+        threshold: float=None,
+        monitor: Monitor = Monitor.NONE) -> pd.DataFrame:
     """
     Calculate ONI index, which is defined as a three month running mean of
     anomalies of monthly means of SST data in the Nino3.4 region.
@@ -130,10 +139,12 @@ def oni(ds: xr.Dataset, var: VarName.TYPE, file: str, threshold: float=None) ->
     threshold. Where anomaly larger than then positive value of the threshold
     indicates El Nino and anomaly smaller than the negative of the given
     threshold indicates La Nina.
+    :param monitor: a progress monitor.
+    :return: A dataset that containts the index timeseries
     """
     n34 = '-170, -5, -120, 5'
     name = 'ONI Index'
-    return _generic_index_calculation(ds, var, n34, 3, file, name, threshold)
+    return _generic_index_calculation(ds, var, n34, 3, file, name, threshold, monitor)
 
 
 def _generic_index_calculation(ds: xr.Dataset,
@@ -142,7 +153,8 @@
                                window: int,
                                file: str,
                                name: str,
-                               threshold: float = None) -> pd.DataFrame:
+                               threshold: float = None,
+                               monitor: Monitor = Monitor.NONE) -> pd.DataFrame:
     """
     A generic index calculation. Where an index is defined as an anomaly
     against the given reference of a moving average of the given window size of
@@ -155,16 +167,20 @@ def _generic_index_calculation(ds: xr.Dataset,
     :param file: Path to the reference file
     :param threshold: Absolute threshold that indicates an ENSO event
     :param name: Name of the index
+    :param monitor: a progress monitor.
+
+    :return: A dataset that contains the index timeseries
     """
     var = VarName.convert(var)
     region = PolygonLike.convert(region)
 
-    ds = select_var(ds, var)
-    ds_subset = subset_spatial(ds, region)
-    anom = anomaly_external(ds_subset, file)
-    ts = anom.mean(dim=['lat', 'lon'])
-    df = pd.DataFrame(data=ts[var].values, columns=[name], index=ts.time)
-    retval = df.rolling(window=window, center=True).mean().dropna()
+    with monitor.starting("Calculate the index", total_work=2):
+        ds = select_var(ds, var)
+        ds_subset = subset_spatial(ds, region)
+        anom = anomaly_external(ds_subset, file, monitor=monitor.child(1))
+        with monitor.child(1).observing("Calculate mean"):
+            ts = anom.mean(dim=['lat', 'lon'])
+            df = pd.DataFrame(data=ts[var].values, columns=[name], index=ts.time)
+            retval = df.rolling(window=window, center=True).mean().dropna()
 
     if threshold is None:
         return retval
diff --git a/cate/ops/timeseries.py b/cate/ops/timeseries.py
index 0a6a91d67..1be950d35 100644
--- a/cate/ops/timeseries.py
+++ b/cate/ops/timeseries.py
@@ -34,6 +34,7 @@
 from cate.core.op import op_input, op, op_return
 from cate.ops.select import select_var
 from cate.core.types import VarNamesLike, PointLike
+from cate.util import Monitor
 
 
 @op(tags=['timeseries', 'temporal', 'filter', 'point'], version='1.0')
@@ -96,7 +97,8 @@ def tseries_point(ds: xr.Dataset,
 def tseries_mean(ds: xr.Dataset,
                  var: VarNamesLike.TYPE,
                  std_suffix: str = '_std',
-                 calculate_std: bool = True) -> xr.Dataset:
+                 calculate_std: bool = True,
+                 monitor: Monitor = Monitor.NONE) -> xr.Dataset:
     """
     Extract spatial mean timeseries of the provided variables, return the
     dataset that in addition to all the information in the given dataset
@@ -111,6 +113,7 @@ def tseries_mean(ds: xr.Dataset,
     :param var: Variables for which to perform timeseries extraction
     :param calculate_std: Whether to calculate std in addition to mean
     :param std_suffix: Std suffix to use for resulting datasets, if std is calculated.
+    :param monitor: a progress monitor.
     :return: Dataset with timeseries variables
     """
     if not var:
@@ -119,13 +122,15 @@ def tseries_mean(ds: xr.Dataset,
     retset = select_var(ds, var)
     names = retset.data_vars.keys()
 
-    for name in names:
-        dims = list(ds[name].dims)
-        dims.remove('time')
-        retset[name] = retset[name].mean(dim=dims, keep_attrs=True)
-        retset[name].attrs['Cate_Description'] = 'Mean aggregated over {} at each point in time.'.format(dims)
-        std_name = name + std_suffix
-        retset[std_name] = ds[name].std(dim=dims)
-        retset[std_name].attrs['Cate_Description'] = 'Accompanying std values for variable \'{}\''.format(name)
+    with monitor.starting("Calculate mean", total_work=len(names)):
+        for name in names:
+            dims = list(ds[name].dims)
+            dims.remove('time')
+            with monitor.child(1).observing("Calculate mean"):
+                retset[name] = retset[name].mean(dim=dims, keep_attrs=True)
+            retset[name].attrs['Cate_Description'] = 'Mean aggregated over {} at each point in time.'.format(dims)
+            std_name = name + std_suffix
+            retset[std_name] = ds[name].std(dim=dims)
+            retset[std_name].attrs['Cate_Description'] = 'Accompanying std values for variable \'{}\''.format(name)
 
     return retset
diff --git a/test/ops/test_correlation.py b/test/ops/test_correlation.py
index 0a1696856..76f4ef3b8 100644
--- a/test/ops/test_correlation.py
+++ b/test/ops/test_correlation.py
@@ -10,6 +10,7 @@
 from scipy.stats import pearsonr
 
 from cate.ops.correlation import pearson_correlation_scalar, pearson_correlation
+from ..util.test_monitor import RecordingMonitor
 
 
 class TestPearsonScalar(TestCase):
@@ -83,7 +84,7 @@ def test_nominal(self):
                                                        np.ones([4, 8])])),
             'lat': np.linspace(-67.5, 67.5, 4),
             'lon': np.linspace(-157.5, 157.5, 8),
-            'time': np.array([1, 2, 3])})
+            'time': np.array([1, 2, 3])}).chunk(chunks={'lat': 2, 'lon': 4})
         ds2 = xr.Dataset({
             'second': (['time', 'lat', 'lon'], np.array([np.ones([4, 8]),
                                                          np.ones([4, 8]),
@@ -94,9 +95,11 @@
                                                          np.ones([4, 8])])),
             'lat': np.linspace(-67.5, 67.5, 4),
             'lon': np.linspace(-157.5, 157.5, 8),
-            'time': np.array([1, 2, 3])})
+            'time': np.array([1, 2, 3])}).chunk(chunks={'lat': 2, 'lon': 4})
 
-        corr = pearson_correlation(ds1, ds2, 'first', 'first')
+        rm = RecordingMonitor()
+        corr = pearson_correlation(ds1, ds2, 'first', 'first', monitor=rm)
+        self.assertTrue(len(rm.records) > 0)
         self.assertTrue(corr['corr_coef'].max() == corr['corr_coef'].min())
         self.assertTrue(corr['corr_coef'].max() == -0.5)
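
The following is an illustrative sketch, not part of the patch, of how the monitor convention introduced above can be exercised from calling code: declare total_work with monitor.starting(), hand each sub-step a monitor.child(1), and wrap leaf work in observing(). The process() helper and the toy dataset are hypothetical; the Monitor calls (Monitor.NONE, starting(), child(), observing()) and the ds_arithmetics signature are taken from the diff itself.

import numpy as np
import xarray as xr

from cate.ops.arithmetics import ds_arithmetics
from cate.util.monitor import Monitor


def process(ds: xr.Dataset, monitor: Monitor = Monitor.NONE) -> xr.Dataset:
    # Hypothetical caller following the same pattern as the ops in this patch:
    # declare the total work up front, then give each sub-step a child monitor.
    with monitor.starting('Process dataset', total_work=2):
        # ds_arithmetics reports its own progress through the child monitor.
        scaled = ds_arithmetics(ds, '*2, log10', monitor=monitor.child(1))
        with monitor.child(1).observing('Subtract time mean'):
            result = scaled - scaled.mean(dim='time')
    return result


# Toy dataset, made up for the example; Monitor.NONE is the no-op default
# used by the signatures added in this patch.
ds = xr.Dataset({'sst': (['time'], np.array([1.0, 2.0, 3.0]))},
                coords={'time': np.array([1, 2, 3])})
out = process(ds)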