Skip to content
This repository has been archived by the owner on Aug 29, 2023. It is now read-only.

Get progress from xarray / dask tasks #290

Merged
merged 13 commits into from
Jul 12, 2017
2 changes: 1 addition & 1 deletion cate/core/op.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@
**Operation monitoring**

:Description: Operation registration should recognise an optional *monitor* argument of a user function:
``f(*args, monitor=Monitor.NULL, **kwargs)``. In this case the a monitor (of type :py:class:`Monitor`) will be passed
``f(*args, monitor=Monitor.NONE, **kwargs)``. In this case the a monitor (of type :py:class:`Monitor`) will be passed
by the framework to the user function in order to observe the progress and to cancel an operation.

----
Expand Down
65 changes: 65 additions & 0 deletions cate/core/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
# The MIT License (MIT)
# Copyright (c) 2016, 2017 by the ESA CCI Toolbox development team and contributors
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of
# this software and associated documentation files (the "Software"), to deal in
# the Software without restriction, including without limitation the rights to
# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
# of the Software, and to permit persons to whom the Software is furnished to do
# so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

__author__ = "Marco Zuehlke (Brockmann Consult GmbH)"

"""
Internal module that contains utility functions.
"""

from dask.callbacks import Callback
from cate.util import Monitor

_DEBUG_DASK_PROGRESS = False


class DaskMonitor(Callback):
"""
A ``dask.Callback`` that reports the task level notification that the
dask scheduler generates to the provided ``Monitor``.

This allows for tracking then progress inside dask compute/get calls and
the possibility to cancel them.
"""
def __init__(self, label: str, monitor: Monitor):
super().__init__()
self._label = label
self._monitor = monitor

def _start_state(self, dsk, state):
num_tasks = sum(len(state[k]) for k in ['ready', 'waiting'])
self._monitor.start(label=self._label, total_work=num_tasks)
if _DEBUG_DASK_PROGRESS:
print("DaskMonitor.start_state: num_tasks=", num_tasks)

def _pretask(self, key, dsk, state):
if self._monitor.is_cancelled():
raise InterruptedError

def _posttask(self, key, result, dsk, state, worker_id):
self._monitor.progress(work=1)
if _DEBUG_DASK_PROGRESS:
print("DaskMonitor.posttask: key=", key)

def _finish(self, dsk, state, failed):
self._monitor.done()
if _DEBUG_DASK_PROGRESS:
print("DaskMonitor.finish")
68 changes: 44 additions & 24 deletions cate/ops/coregistration.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import xarray as xr

from cate.core.op import op_input, op, op_return
from cate.core.utils import DaskMonitor
from cate.util import Monitor

from cate.ops import resampling

Expand All @@ -50,7 +52,8 @@
def coregister(ds_master: xr.Dataset,
ds_slave: xr.Dataset,
method_us: str = 'linear',
method_ds: str = 'mean') -> xr.Dataset:
method_ds: str = 'mean',
monitor: Monitor = Monitor.NONE) -> xr.Dataset:
"""
Perform coregistration of two datasets by resampling the slave dataset unto the
grid of the master. If upsampling has to be performed, this is achieved using
Expand Down Expand Up @@ -78,6 +81,7 @@ def coregister(ds_master: xr.Dataset,
:param ds_slave: The dataset that will be resampled
:param method_us: Interpolation method to use for upsampling.
:param method_ds: Interpolation method to use for downsampling.
:param monitor: a progress monitor.
:return: The slave dataset resampled on the grid of the master
"""
# Check if the grids of the provided datasets are equidistant and pixel
Expand Down Expand Up @@ -122,7 +126,7 @@ def coregister(ds_master: xr.Dataset,
methods_us = {'nearest': 10, 'linear': 11}
methods_ds = {'first': 50, 'last': 51, 'mean': 54, 'mode': 56, 'var': 57, 'std': 58}

return _resample_dataset(ds_master, ds_slave, methods_us[method_us], methods_ds[method_ds])
return _resample_dataset(ds_master, ds_slave, methods_us[method_us], methods_ds[method_ds], monitor)


def _is_equidistant(array: np.ndarray) -> bool:
Expand Down Expand Up @@ -179,7 +183,7 @@ def _within_bounds(array: np.ndarray, low_bound) -> bool:
return (array[0] >= low_bound and array[-1] <= abs(low_bound))


def _resample_slice(arr_slice: xr.DataArray, w: int, h: int, ds_method: int, us_method: int) -> xr.DataArray:
def _resample_slice(arr_slice: xr.DataArray, w: int, h: int, ds_method: int, us_method: int, monitor: Monitor) -> xr.DataArray:
"""
Resample a single time slice of a larger xr.DataArray

Expand All @@ -188,18 +192,23 @@ def _resample_slice(arr_slice: xr.DataArray, w: int, h: int, ds_method: int, us_
:param h: The desired new height (amount of latitudes)
:param ds_method: Downsampling method, see resampling.py
:param us_method: Upsampling method, see resampling.py
:param monitor: a progress monitor.
:return: resampled slice
"""
result = resampling.resample_2d(np.ma.masked_invalid(arr_slice.values),
w,
h,
ds_method,
us_method)
return xr.DataArray(result)
if monitor.is_cancelled():
raise InterruptedError
child_monitor = monitor.child(1)
with DaskMonitor("resample time slice", child_monitor):
result = resampling.resample_2d(np.ma.masked_invalid(arr_slice.values),
w,
h,
ds_method,
us_method)
return xr.DataArray(result)


def _resample_array(array: xr.DataArray, lon: xr.DataArray, lat: xr.DataArray, method_us: int,
method_ds: int) -> xr.DataArray:
method_ds: int, monitor: Monitor) -> xr.DataArray:
"""
Resample the given xr.DataArray to a new grid defined by lat and lon

Expand All @@ -208,24 +217,33 @@ def _resample_array(array: xr.DataArray, lon: xr.DataArray, lat: xr.DataArray, m
:param lon: 'lon' xr.DataArray attribute for the new grid
:param method_us: Interpolation method to use for upsampling, see resampling.py
:param method_ds: Interpolation method to use for downsampling, see resampling.py
:param monitor: a progress monitor.
:return: The resampled array
"""
# Determine width and height of the resampled array
width = lon.values.size
height = lat.values.size

kwargs = {'w': width, 'h': height, 'ds_method': method_ds, 'us_method': method_us}
temp_array = array.groupby('time').apply(_resample_slice, **kwargs)
chunks = list(temp_array.shape[1:])
chunks.insert(0, 1)
return xr.DataArray(temp_array.values,
name=array.name,
dims=array.dims,
coords={'time': array.time, 'lat': lat, 'lon': lon},
attrs=array.attrs).chunk(chunks=chunks)
if monitor.is_cancelled():
raise InterruptedError
child_monitor = monitor.child(1)

kwargs = {'w': width, 'h': height, 'ds_method': method_ds, 'us_method': method_us, 'monitor': child_monitor}
group_by_time = array.groupby('time')
num_time_steps = len(group_by_time)

def _resample_dataset(ds_master: xr.Dataset, ds_slave: xr.Dataset, method_us: int, method_ds: int) -> xr.Dataset:
with child_monitor.starting("coregister dataarray", total_work=num_time_steps):
temp_array = group_by_time.apply(_resample_slice, **kwargs)
chunks = list(temp_array.shape[1:])
chunks.insert(0, 1)
return xr.DataArray(temp_array.values,
name=array.name,
dims=array.dims,
coords={'time': array.time, 'lat': lat, 'lon': lon},
attrs=array.attrs).chunk(chunks=chunks)


def _resample_dataset(ds_master: xr.Dataset, ds_slave: xr.Dataset, method_us: int, method_ds: int, monitor: Monitor) -> xr.Dataset:
"""
Resample slave onto the grid of the master.
This does spatial resampling the whole dataset, e.g., all
Expand All @@ -238,6 +256,7 @@ def _resample_dataset(ds_master: xr.Dataset, ds_slave: xr.Dataset, method_us: in
:param ds_slave: xr.Dataset that will be resampled on the masters' grid
:param method_us: Interpolation method for upsampling, see resampling.py
:param method_ds: Interpolation method for downsampling, see resampling.py
:param monitor: a progress monitor.
:return: xr.Dataset The resampled slave dataset
"""
# Find lat/lon bounds of the intersection of master and slave grids. The
Expand All @@ -257,10 +276,11 @@ def _resample_dataset(ds_master: xr.Dataset, ds_slave: xr.Dataset, method_us: in
lon = ds_master['lon'].sel(lon=lon_slice)
lat = ds_master['lat'].sel(lat=lat_slice)

retset = ds_slave.sel(lat=lat_slice, lon=lon_slice)

kwargs = {'lon': lon, 'lat': lat, 'method_us': method_us, 'method_ds': method_ds}
retset = ds_slave.apply(_resample_array, keep_attrs=True, **kwargs)
if monitor.is_cancelled():
raise InterruptedError
with monitor.starting("coregister dataset", len(ds_slave.data_vars)):
kwargs = {'lon': lon, 'lat': lat, 'method_us': method_us, 'method_ds': method_ds, 'monitor': monitor}
retset = ds_slave.apply(_resample_array, keep_attrs=True, **kwargs)

# Set/Update global geospatial attributes
retset.attrs['geospatial_lat_min'] = retset.lat.values[0]
Expand Down
9 changes: 6 additions & 3 deletions cate/ops/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@
from cate.core.objectio import OBJECT_IO_REGISTRY, ObjectIO
from cate.core.op import OP_REGISTRY, op_input, op
from cate.core.types import VarNamesLike, TimeRangeLike, PolygonLike, DictLike, FileLike
from cate.core.utils import DaskMonitor
from cate.ops.normalize import normalize as normalize_op
from cate.util import Monitor

_ALL_FILE_FILTER = dict(name='All Files', extensions=['*'])

Expand All @@ -54,7 +56,6 @@ def open_dataset(ds_name: str,
:param time_range: Optional time range of the requested dataset
:param region: Optional spatial region of the requested dataset
:param var_names: Optional names of variables of the requested dataset
:param monitor: a progress monitor, used only if *sync* is ``True``.
:param normalize: Whether to normalize the dataset's geo- and time-coding upon opening. See operation ``normalize``.
:return: An new dataset instance.
"""
Expand All @@ -72,15 +73,17 @@ def open_dataset(ds_name: str,
@op_input('ds')
@op_input('file', file_open_mode='w', file_filters=[dict(name='NetCDF', extensions=['nc']), _ALL_FILE_FILTER])
@op_input('format', value_set=['NETCDF4', 'NETCDF4_CLASSIC', 'NETCDF3_64BIT', 'NETCDF3_CLASSIC'])
def save_dataset(ds: xr.Dataset, file: str, format: str = None):
def save_dataset(ds: xr.Dataset, file: str, format: str = None, monitor: Monitor = Monitor.NONE):
"""
Save a dataset to NetCDF file.

:param ds: The dataset
:param file: File path
:param format: NetCDF format flavour, one of 'NETCDF4', 'NETCDF4_CLASSIC', 'NETCDF3_64BIT', 'NETCDF3_CLASSIC'.
:param monitor: a progress monitor.
"""
ds.to_netcdf(file, format=format)
with DaskMonitor("save_dataset", monitor):
ds.to_netcdf(file, format=format)


# noinspection PyShadowingBuiltins
Expand Down
4 changes: 2 additions & 2 deletions cate/util/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class Monitor(metaclass=ABCMeta):
Derived classes must implement also the following two abstract methods, if they want cancellation support:
:py:meth:`cancel` and :py:meth:`is_cancelled`.

Pass ``Monitor.NULL`` to functions that expect a monitor instead of passing ``None``.
Pass ``Monitor.NONE`` to functions that expect a monitor instead of passing ``None``.

Given here is an example of how progress monitors should be used by functions:::

Expand All @@ -73,7 +73,7 @@ def long_running_task(a, b, c, monitor):

"""

#: A valid monitor that effectively does nothing. Use ``Monitor.NULL`` it instead of passing ``None`` to
#: A valid monitor that effectively does nothing. Use ``Monitor.NONE`` it instead of passing ``None`` to
#: functions and methods that expect an argument of type ``Monitor``.
NONE = None

Expand Down
2 changes: 1 addition & 1 deletion doc/source/architecture.rst
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ API clients for a given context. The ``monitor`` module defines two useful imple
* ``ConsoleMonitor``: a monitor that is used by the command-line interface
* ``ChildMonitor``: a sub-monitor that can be passed to sub-tasks called from the current task

In addition, the ``Monitor.NULL`` object, is a monitor singleton that basically does nothing. It is used instead
In addition, the ``Monitor.NONE`` object, is a monitor singleton that basically does nothing. It is used instead
of passing ``None`` into methods that don't require monitoring but expect a non-``None`` argument value.


Expand Down
Empty file added test/ops/__init__.py
Empty file.
40 changes: 37 additions & 3 deletions test/ops/test_coregistration.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

from cate.ops import coregister
from cate.ops.coregistration import _find_intersection
from ..util.test_monitor import RecordingMonitor


class TestCoregistration(TestCase):
Expand All @@ -32,18 +33,20 @@ def test_nominal(self):
'second': (['time', 'lat', 'lon'], np.array([np.eye(4, 8), np.eye(4, 8)])),
'lat': np.linspace(-67.5, 67.5, 4),
'lon': np.linspace(-157.5, 157.5, 8),
'time': np.array([1, 2])})
'time': np.array([1, 2])}).chunk(chunks={'lat': 2, 'lon': 4})

ds_coarse = xr.Dataset({
'first': (['time', 'lat', 'lon'], np.array([np.eye(3, 6), np.eye(3, 6)])),
'second': (['time', 'lat', 'lon'], np.array([np.eye(3, 6), np.eye(3, 6)])),
'lat': np.linspace(-60, 60, 3),
'lon': np.linspace(-150, 150, 6),
'time': np.array([1, 2])})
'time': np.array([1, 2])}).chunk(chunks={'lat': 3, 'lon': 3})

# Test that the coarse dataset has been resampled onto the grid
# of the finer dataset.
ds_coarse_resampled = coregister(ds_fine, ds_coarse)
rm = RecordingMonitor()
ds_coarse_resampled = coregister(ds_fine, ds_coarse, monitor=rm)

expected = xr.Dataset({
'first': (['time', 'lat', 'lon'], np.array([[[1., 0.28571429, 0., 0., 0., 0., 0., 0.],
[0.33333333, 0.57142857, 0.38095238, 0., 0., 0., 0., 0.],
Expand All @@ -70,6 +73,37 @@ def test_nominal(self):
'time': np.array([1, 2])})
assert_almost_equal(ds_coarse_resampled['first'].values, expected['first'].values)

self.assertEqual([('start', 'coregister dataset', 2),
('progress', 0.0, 'coregister dataarray', 0),
('progress', 0.0, 'coregister dataarray: resample time slice', 0),
('progress', 0.125, None, 6),
('progress', 0.125, None, 13),
('progress', 0.125, None, 19),
('progress', 0.125, None, 25),
('progress', 0.0, 'coregister dataarray: resample time slice', 25),
('progress', 0.0, 'coregister dataarray: resample time slice', 25),
('progress', 0.125, None, 31),
('progress', 0.125, None, 38),
('progress', 0.125, None, 44),
('progress', 0.125, None, 50),
('progress', 0.0, 'coregister dataarray: resample time slice', 50),
('progress', 0.0, 'coregister dataarray', 50),
('progress', 0.0, 'coregister dataarray', 50),
('progress', 0.0, 'coregister dataarray: resample time slice', 50),
('progress', 0.125, None, 56),
('progress', 0.125, None, 63),
('progress', 0.125, None, 69),
('progress', 0.125, None, 75),
('progress', 0.0, 'coregister dataarray: resample time slice', 75),
('progress', 0.0, 'coregister dataarray: resample time slice', 75),
('progress', 0.125, None, 81),
('progress', 0.125, None, 88),
('progress', 0.125, None, 94),
('progress', 0.125, None, 100),
('progress', 0.0, 'coregister dataarray: resample time slice', 100),
('progress', 0.0, 'coregister dataarray', 100),
('done',)], rm.records)

# Test that the fine dataset has been resampled (aggregated)
# onto the grid of the coarse dataset.
ds_fine_resampled = coregister(ds_coarse, ds_fine)
Expand Down