This repository has been archived by the owner on Aug 29, 2023. It is now read-only.

Add more monitors in ops #430

Merged: 7 commits, Sep 27, 2017
cate/ops/anomaly.py (4 changes: 3 additions & 1 deletion)
@@ -63,8 +63,9 @@ def anomaly_external(ds: xr.Dataset,
 
     :param ds: The dataset to calculate anomalies from
     :param file: Path to reference data file
-    :param str: Apply the given transformation before calculating the anomaly.
+    :param transform: Apply the given transformation before calculating the anomaly.
         For supported operations see help on 'ds_arithmetics' operation.
+    :param monitor: a progress monitor.
     :return: The anomaly dataset
     """
     # Check if the time coordinate is of dtype datetime
@@ -138,6 +139,7 @@ def anomaly_internal(ds: xr.Dataset,
     :param ds: The dataset to calculate anomalies from
     :param time_range: Time range to use for reference data
     :param region: Spatial region to use for reference data
+    :param monitor: a progress monitor.
     :return: The anomaly dataset
     """
     ref = ds.copy()
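Every hunk in this PR follows the same monitoring pattern: the op accepts a `monitor` argument, wraps its work in `monitor.starting(label, total_work=N)`, and spends that budget one unit at a time via `monitor.child(1)`. To keep the snippets below runnable outside cate, here is a minimal stand-in monitor (a hypothetical `PrintMonitor`, not part of cate's API; only the three methods this diff uses are sketched):

```python
from contextlib import contextmanager


class PrintMonitor:
    """Hypothetical stand-in for cate's Monitor, for illustration only:
    prints progress events instead of reporting them to a UI."""

    @contextmanager
    def starting(self, label, total_work):
        # Begin a named task that is split into `total_work` units.
        print('started: {} ({} units)'.format(label, total_work))
        yield self
        print('done: {}'.format(label))

    @contextmanager
    def observing(self, label):
        # Report a single, indivisible piece of work.
        print('observing: {}'.format(label))
        yield self

    def child(self, work):
        # Hand out a sub-monitor worth `work` units of this monitor's budget.
        # The real implementation scales the child's progress; this sketch
        # simply reuses the same printer.
        return self
```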
cate/ops/arithmetics.py (5 changes: 3 additions & 2 deletions)
@@ -64,14 +64,14 @@ def ds_arithmetics(ds: DatasetLike.TYPE,
 
     :param ds: The dataset to which to apply arithmetic operations
     :param op: A comma separated list of arithmetic operations to apply
+    :param monitor: a progress monitor.
     :return: The dataset with given arithmetic operations applied
     """
     ds = DatasetLike.convert(ds)
     retset = ds
     with monitor.starting('Calculate result', total_work=len(op.split(','))):
         for item in op.split(','):
-            child_mon = monitor.child(1)
-            with child_mon.observing("Calculate"):
+            with monitor.child(1).observing("Calculate"):
                 item = item.strip()
                 if item[0] == '+':
                     retset = retset + float(item[1:])
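The rewritten loop drops the `child_mon` temporary and inlines `monitor.child(1).observing(...)`: each arithmetic step consumes exactly one unit of the parent's budget. A usage sketch with the `PrintMonitor` stand-in from above (the `op` string and the no-op body are illustrative):

```python
op = '+2, *3, log10'
monitor = PrintMonitor()
with monitor.starting('Calculate result', total_work=len(op.split(','))):
    for item in op.split(','):
        with monitor.child(1).observing('Calculate'):
            item = item.strip()  # one arithmetic operation would be applied here
```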
@@ -118,6 +118,7 @@ def diff(ds: xr.Dataset,
 
     :param ds: The minuend dataset
     :param ds2: The subtrahend dataset
+    :param monitor: a progress monitor.
     :return: The difference dataset
     """
     try:
cate/ops/correlation.py (21 changes: 16 additions & 5 deletions)
@@ -70,6 +70,7 @@ def pearson_correlation_scalar(ds_x: DatasetLike.TYPE,
     :param ds_y: The 'y' dataset
     :param var_x: Dataset variable to use for correlation analysis in the 'variable' dataset
     :param var_y: Dataset variable to use for correlation analysis in the 'dependent' dataset
+    :param monitor: a progress monitor.
     :returns: {'corr_coef': correlation coefficient, 'p_value': probability value}
     """
     ds_x = DatasetLike.convert(ds_x)
@@ -96,7 +97,7 @@ def pearson_correlation_scalar(ds_x: DatasetLike.TYPE,
         raise ValueError('The length of the time dimension should not be less'
                          ' than three to run the calculation.')
 
-    with monitor.starting("Calculate Pearson correlation", total_work=1):
+    with monitor.observing("Calculate Pearson correlation"):
         cc, pv = pearsonr(array_x.values, array_y.values)
 
     return pd.DataFrame({'corr_coef': [cc], 'p_value': [pv]})
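`monitor.observing(label)` fits here because the scalar correlation is a single indivisible step; the replaced one-unit `starting(...)` block said the same thing more verbosely. A sketch with the stand-in monitor (the sample values are made up):

```python
from scipy.stats import pearsonr

monitor = PrintMonitor()
with monitor.observing('Calculate Pearson correlation'):
    cc, pv = pearsonr([1.0, 2.0, 3.0, 4.0], [1.2, 1.9, 3.1, 4.2])
```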
@@ -142,6 +143,7 @@ def pearson_correlation(ds_x: DatasetLike.TYPE,
     :param ds_y: The 'y' dataset
     :param var_x: Dataset variable to use for correlation analysis in the 'variable' dataset
     :param var_y: Dataset variable to use for correlation analysis in the 'dependent' dataset
+    :param monitor: a progress monitor.
     :returns: a dataset containing a map of correlation coefficients and p_values
     """
     ds_x = DatasetLike.convert(ds_x)
@@ -231,7 +233,7 @@ def _pearsonr(x: xr.DataArray, y: xr.DataArray, monitor: Monitor) -> xr.Dataset:
     ----------
     http://www.statsoft.com/textbook/glosp.html#Pearson%20Correlation
     """
-    with monitor.observing("Calculate Pearson correlation"):
+    with monitor.starting("Calculate Pearson correlation", total_work=6):
         n = len(x['time'])
 
         xm, ym = x - x.mean(dim='time'), y - y.mean(dim='time')
@@ -251,8 +253,14 @@ def _pearsonr(x: xr.DataArray, y: xr.DataArray, monitor: Monitor) -> xr.Dataset:
         # deferred processing.
         # Comparing with NaN produces warnings that can be safely ignored
         default_warning_settings = np.seterr(invalid='ignore')
-        r.values[r.values < -1.0] = -1.0
-        r.values[r.values > 1.0] = 1.0
+        with monitor.child(1).observing("task 1"):
+            negativ_r = r.values < -1.0
> Review comment from @JanisGailis (Member, Author) on Sep 27, 2017:
>
> @mzuehlke
> So, we only monitor when we try to access the values? As in, processing from lines 239-247 would be deferred until then?
>
> EDIT: Oh my god I'm dumb, I just read my own comment.
with monitor.child(1).observing("task 2"):
r.values[negativ_r] = -1.0
with monitor.child(1).observing("task 3"):
positiv_r = r.values > 1.0
with monitor.child(1).observing("task 4"):
r.values[positiv_r] = 1.0
np.seterr(**default_warning_settings)
r.attrs = {'description': 'Correlation coefficients between'
' {} and {}.'.format(x.name, y.name)}
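As the review comment suggests, with chunked (dask-backed) inputs the arithmetic above this point is deferred; each `.values` access is where computation actually runs, so giving every access its own child task is what attributes that time to the monitor. A sketch of the clamping pattern on plain numpy data, reusing the stand-in monitor:

```python
import numpy as np

r_values = np.array([-1.4, -0.2, 0.7, 1.3])
monitor = PrintMonitor()
with monitor.starting('Clamp correlation coefficients', total_work=4):
    with monitor.child(1).observing('task 1'):
        negativ_r = r_values < -1.0   # mask of coefficients below the valid range
    with monitor.child(1).observing('task 2'):
        r_values[negativ_r] = -1.0
    with monitor.child(1).observing('task 3'):
        positiv_r = r_values > 1.0    # mask of coefficients above the valid range
    with monitor.child(1).observing('task 4'):
        r_values[positiv_r] = 1.0
# r_values is now [-1.0, -0.2, 0.7, 1.0]
```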
@@ -261,7 +269,10 @@ def _pearsonr(x: xr.DataArray, y: xr.DataArray, monitor: Monitor) -> xr.Dataset:
         t_squared = xr.ufuncs.square(r) * (df / ((1.0 - r.where(r != 1)) *
                                                  (1.0 + r.where(r != -1))))
         prob = df / (df + t_squared)
-        prob.values = betainc(0.5 * df, 0.5, prob.values)
+        with monitor.child(1).observing("task 5"):
+            prob_values_in = prob.values
+        with monitor.child(1).observing("task 6"):
+            prob.values = betainc(0.5 * df, 0.5, prob_values_in)
         prob.attrs = {'description': 'Rough indicator of probability of an'
                       ' uncorrelated system producing datasets that have a Pearson'
                       ' correlation at least as extreme as the one computed from'
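Tasks 5 and 6 split the p-value step the same way: pulling `prob.values` into a local first ("task 5") forces any deferred computation, so the `betainc` call ("task 6") then runs on a plain array. A standalone sketch (the degrees of freedom and input values are made up):

```python
import numpy as np
from scipy.special import betainc

df = 2.0                                 # e.g. n - 2 for a 4-point time series
prob_values = np.array([0.9, 0.5, 0.1])  # df / (df + t_squared), already in [0, 1]
monitor = PrintMonitor()
with monitor.starting('Calculate p-values', total_work=2):
    with monitor.child(1).observing('task 5'):
        prob_values_in = prob_values     # in the op, accessing .values computes here
    with monitor.child(1).observing('task 6'):
        p_values = betainc(0.5 * df, 0.5, prob_values_in)
```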
cate/ops/index.py (15 changes: 6 additions & 9 deletions)
@@ -70,8 +70,7 @@ def enso_nino34(ds: xr.Dataset,
     """
     n34 = '-170, -5, -120, 5'
     name = 'ENSO N3.4 Index'
-    return _generic_index_calculation(ds, var, n34, 5, file, name, threshold,
-                                      monitor)
+    return _generic_index_calculation(ds, var, n34, 5, file, name, threshold, monitor)
 
 
 @op(tags=['index'])
@@ -117,8 +116,7 @@ def enso(ds: xr.Dataset,
     if 'custom' == region:
         name = 'ENSO Index over ' + PolygonLike.format(converted_region)
 
-    return _generic_index_calculation(ds, var, converted_region, 5, file, name,
-                                      threshold, monitor)
+    return _generic_index_calculation(ds, var, converted_region, 5, file, name, threshold, monitor)
 
 
 @op(tags=['index'])
@@ -146,7 +144,7 @@ def oni(ds: xr.Dataset,
     """
     n34 = '-170, -5, -120, 5'
     name = 'ONI Index'
-    return _generic_index_calculation(ds, var, n34, 3, file, name, threshold)
+    return _generic_index_calculation(ds, var, n34, 3, file, name, threshold, monitor)
 
 
 def _generic_index_calculation(ds: xr.Dataset,
@@ -175,12 +173,11 @@ def _generic_index_calculation(ds: xr.Dataset,
     var = VarName.convert(var)
     region = PolygonLike.convert(region)
 
-    with monitor.starting("Calculate the index"):
-        child_mon = monitor.child(1)
+    with monitor.starting("Calculate the index", total_work=2):
         ds = select_var(ds, var)
         ds_subset = subset_spatial(ds, region)
-        anom = anomaly_external(ds_subset, file, monitor=child_mon)
-        with monitor.observing("Calculate mean"):
+        anom = anomaly_external(ds_subset, file, monitor=monitor.child(1))
+        with monitor.child(1).observing("Calculate mean"):
             ts = anom.mean(dim=['lat', 'lon'])
     df = pd.DataFrame(data=ts[var].values, columns=[name], index=ts.time)
     retval = df.rolling(window=window, center=True).mean().dropna()
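`_generic_index_calculation` now declares `total_work=2`: one unit is delegated to the sub-operation by passing `monitor.child(1)` as `anomaly_external`'s monitor, and one covers the mean. A sketch of that delegation pattern with the stand-in monitor (`sub_op` is a hypothetical placeholder):

```python
def sub_op(value, monitor):
    # A sub-operation budgets its own steps against whatever monitor it receives.
    with monitor.starting('sub-op', total_work=1):
        with monitor.child(1).observing('step'):
            return value * 2


monitor = PrintMonitor()
with monitor.starting('Calculate the index', total_work=2):
    anom = sub_op(21, monitor=monitor.child(1))       # first unit: the sub-operation
    with monitor.child(1).observing('Calculate mean'):
        result = anom + 1                             # second unit: local work
```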
cate/ops/timeseries.py (2 changes: 1 addition & 1 deletion)
@@ -126,7 +126,7 @@ def tseries_mean(ds: xr.Dataset,
     for name in names:
         dims = list(ds[name].dims)
         dims.remove('time')
-        with monitor.observing("Calculate mean"):
+        with monitor.child(1).observing("Calculate mean"):
             retset[name] = retset[name].mean(dim=dims, keep_attrs=True)
         retset[name].attrs['Cate_Description'] = 'Mean aggregated over {} at each point in time.'.format(dims)
         std_name = name + std_suffix
test/ops/test_correlation.py (9 changes: 6 additions & 3 deletions)
@@ -10,6 +10,7 @@
 from scipy.stats import pearsonr
 
 from cate.ops.correlation import pearson_correlation_scalar, pearson_correlation
+from ..util.test_monitor import RecordingMonitor
 
 
 class TestPearsonScalar(TestCase):
@@ -83,7 +84,7 @@ def test_nominal(self):
                                                         np.ones([4, 8])])),
             'lat': np.linspace(-67.5, 67.5, 4),
             'lon': np.linspace(-157.5, 157.5, 8),
-            'time': np.array([1, 2, 3])})
+            'time': np.array([1, 2, 3])}).chunk(chunks={'lat': 2, 'lon': 4})
 
         ds2 = xr.Dataset({
             'second': (['time', 'lat', 'lon'], np.array([np.ones([4, 8]),
@@ -94,9 +95,11 @@ def test_nominal(self):
                                                          np.ones([4, 8])])),
             'lat': np.linspace(-67.5, 67.5, 4),
             'lon': np.linspace(-157.5, 157.5, 8),
-            'time': np.array([1, 2, 3])})
+            'time': np.array([1, 2, 3])}).chunk(chunks={'lat': 2, 'lon': 4})
 
-        corr = pearson_correlation(ds1, ds2, 'first', 'first')
+        rm = RecordingMonitor()
+        corr = pearson_correlation(ds1, ds2, 'first', 'first', monitor=rm)
+        self.assertEqual(564, len(rm.records))
 
         self.assertTrue(corr['corr_coef'].max() == corr['corr_coef'].min())
         self.assertTrue(corr['corr_coef'].max() == -0.5)
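The test now drives `pearson_correlation` with a `RecordingMonitor` from the project's test utilities (`test/util/test_monitor.py`, per the import above) and asserts on the number of recorded progress events; chunking the input datasets ensures the dask code path with the new child tasks is exercised. A plausible minimal equivalent of such a monitor, hypothetical and for illustration only:

```python
from contextlib import contextmanager


class MiniRecordingMonitor:
    """Hypothetical recording monitor: stores progress events in a list so a
    test can assert how much progress was reported."""

    def __init__(self, records=None):
        self.records = records if records is not None else []

    @contextmanager
    def starting(self, label, total_work):
        self.records.append(('start', label, total_work))
        yield self
        self.records.append(('done', label))

    @contextmanager
    def observing(self, label):
        self.records.append(('observe', label))
        yield self

    def child(self, work):
        # Children append to the parent's list, so one assertion sees all events.
        return MiniRecordingMonitor(self.records)
```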