This repository has been archived by the owner on Aug 29, 2023. It is now read-only.

Get progress from xarray / dask tasks #290

Merged
merged 13 commits on Jul 12, 2017
2 changes: 1 addition & 1 deletion cate/core/op.py
@@ -88,7 +88,7 @@
**Operation monitoring**

:Description: Operation registration should recognise an optional *monitor* argument of a user function:
``f(*args, monitor=Monitor.NULL, **kwargs)``. In this case, a monitor (of type :py:class:`Monitor`) will be passed
``f(*args, monitor=Monitor.NONE, **kwargs)``. In this case, a monitor (of type :py:class:`Monitor`) will be passed
by the framework to the user function in order to observe the progress and to cancel an operation.

----
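For context, a minimal sketch of an operation that honours this contract; the operation itself is hypothetical, and only the `op` decorator and the `Monitor` API shown in this PR are assumed:

```python
from cate.core.op import op
from cate.util import Monitor

@op()
def scale_values(values: list, factor: float = 2.0,
                 monitor: Monitor = Monitor.NONE) -> list:
    """Hypothetical operation; the framework injects a real monitor at call time."""
    with monitor.starting('scale_values', total_work=len(values)):
        result = []
        for v in values:
            result.append(v * factor)
            monitor.progress(work=1)  # one unit of work per value
        return result
```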
10 changes: 4 additions & 6 deletions cate/ds/esa_cci_ftp.py
@@ -59,7 +59,7 @@
from cate.core.cdm import Schema
from cate.core.ds import DataStore, DataSource, open_xarray_dataset, DATA_STORE_REGISTRY, get_data_stores_path
from cate.core.types import PolygonLike, TimeRangeLike, VarNamesLike
from cate.util import to_datetime, Monitor
from cate.util import to_datetime, Monitor, Cancellation

Time = Union[str, datetime]
TimeRange = Tuple[Time, Time]
@@ -255,7 +255,7 @@ def _sync_files(self, ftp, ftp_base_dir, expected_remote_files, num_of_expected_
file_set_size = 0
for expected_dir_path, expected_filename_dict in expected_remote_files.items():
if monitor.is_cancelled():
return
raise Cancellation()
ftp_dir = ftp_base_dir + '/' + expected_dir_path
try:
ftp.cwd(ftp_dir)
@@ -274,7 +274,7 @@

for existing_filename, facts in remote_dir_content:
if monitor.is_cancelled():
return
raise Cancellation()
if facts.get('type', None) == 'file' and existing_filename in expected_filename_dict:
# update expected_filename_dict with facts of existing_filename
expected_filename_dict[existing_filename] = facts
@@ -292,7 +292,7 @@ def _sync_files(self, ftp, ftp_base_dir, expected_remote_files, num_of_expected_
checked_files_number += 1
child_monitor = monitor.child(work=1.)
if monitor.is_cancelled():
return
raise Cancellation()
if last_cwd is not existing_file_info['path']:
ftp.cwd(ftp_base_dir + '/' + existing_file_info['path'])
last_cwd = existing_file_info['path']
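This hunk replaces the old silent `return` on cancellation with an explicit `Cancellation` exception, so a cancelled sync is no longer indistinguishable from a completed one. A minimal sketch of the idiom, with an illustrative function name and body:

```python
from cate.util import Cancellation, Monitor

def sync_items(items, monitor: Monitor = Monitor.NONE):
    # Illustrative loop using the cancellation idiom introduced in this PR.
    with monitor.starting('synchronising', total_work=len(items)):
        for item in items:
            if monitor.is_cancelled():
                raise Cancellation()  # propagate, do not return silently
            # ... per-item work would go here ...
            monitor.progress(work=1)
```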
@@ -438,8 +438,6 @@ def _start(self) -> DownloadStatus:
return DownloadStatus.SUCCESS if error_msg is None else DownloadStatus.FAILURE

def on_new_block(self, bytes_block):
if self._monitor.is_cancelled():
raise KeyboardInterrupt()
self._fp.write(bytes_block)
block_size = len(bytes_block)
self._bytes_written += block_size
7 changes: 0 additions & 7 deletions cate/ds/esa_cci_odp.py
@@ -140,8 +140,6 @@ def _fetch_solr_json(base_url, query_args, offset=0, limit=3500, timeout=10, mon
with monitor.starting("Loading", 10):
while True:
monitor.progress(work=1)
if monitor.is_cancelled():
raise InterruptedError
paging_query_args = dict(query_args or {})
paging_query_args.update(offset=offset, limit=limit, format='application/solr+json')
url = base_url + '?' + urllib.parse.urlencode(paging_query_args)
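`monitor.starting(...)` is used as a context manager here. Assuming it pairs `start()` on entry with `done()` on exit (which would also explain why an explicit `monitor.done()` call disappears later in this diff), a paging loop only needs to report its increments:

```python
# Sketch, under the assumption that Monitor.starting() is a context manager
# that marks the task started on entry and done on exit.
with monitor.starting('Loading', 10):
    for _ in range(10):          # e.g. one Solr result page per iteration
        monitor.progress(work=1)
# no explicit monitor.done() required after the with-block
```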
@@ -854,16 +852,12 @@ def _make_local(self,
file_number = 1

for filename, coverage_from, coverage_to, file_size, url in outdated_file_list:
if monitor.is_cancelled():
raise InterruptedError
dataset_file = os.path.join(local_path, filename)
sub_monitor = monitor.child(work=1.0)

# noinspection PyUnusedLocal
def reporthook(block_number, read_size, total_file_size):
dl_stat.handle_chunk(read_size)
if monitor.is_cancelled():
raise InterruptedError
sub_monitor.progress(work=read_size, msg=str(dl_stat))

sub_monitor_msg = "file %d of %d" % (file_number, len(outdated_file_list))
@@ -872,7 +866,6 @@ def reporthook(block_number, read_size, total_file_size):
file_number += 1
local_ds.add_dataset(os.path.join(local_id, filename), (coverage_from, coverage_to))
local_ds.save(True)
monitor.done()

def make_local(self,
local_name: str,
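In the loop above, each file download receives a child monitor via `monitor.child(work=1.0)`, so per-file progress rolls up into the overall task and the explicit `is_cancelled()`/`InterruptedError` checks become unnecessary. A sketch of that parent/child relationship, with illustrative names and chunk counts:

```python
from cate.util import Monitor

def download_all(urls, monitor: Monitor = Monitor.NONE):
    # Illustrative: one child monitor per URL; chunk-level progress rolls up
    # into the parent's overall progress.
    with monitor.starting('Downloading', total_work=len(urls)):
        for i, url in enumerate(urls):
            sub_monitor = monitor.child(work=1.0)  # one parent unit
            num_chunks = 100                       # assumed chunk count
            with sub_monitor.starting('file %d of %d' % (i + 1, len(urls)),
                                      total_work=num_chunks):
                for _ in range(num_chunks):
                    sub_monitor.progress(work=1)   # per-chunk report
```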
61 changes: 37 additions & 24 deletions cate/ops/coregistration.py
@@ -38,6 +38,7 @@
import xarray as xr

from cate.core.op import op_input, op, op_return
from cate.util import Monitor

from cate.ops import resampling

@@ -50,7 +51,8 @@
def coregister(ds_master: xr.Dataset,
ds_slave: xr.Dataset,
method_us: str = 'linear',
method_ds: str = 'mean') -> xr.Dataset:
method_ds: str = 'mean',
monitor: Monitor = Monitor.NONE) -> xr.Dataset:
"""
Perform coregistration of two datasets by resampling the slave dataset onto the
grid of the master. If upsampling has to be performed, this is achieved using
@@ -78,6 +80,7 @@ def coregister(ds_master: xr.Dataset,
:param ds_slave: The dataset that will be resampled
:param method_us: Interpolation method to use for upsampling.
:param method_ds: Interpolation method to use for downsampling.
:param monitor: a progress monitor.
:return: The slave dataset resampled on the grid of the master
"""
# Check if the grids of the provided datasets are equidistant and pixel
@@ -122,7 +125,7 @@ def coregister(ds_master: xr.Dataset,
methods_us = {'nearest': 10, 'linear': 11}
methods_ds = {'first': 50, 'last': 51, 'mean': 54, 'mode': 56, 'var': 57, 'std': 58}

return _resample_dataset(ds_master, ds_slave, methods_us[method_us], methods_ds[method_ds])
return _resample_dataset(ds_master, ds_slave, methods_us[method_us], methods_ds[method_ds], monitor)
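Since `coregister` now accepts a monitor, callers can observe (and cancel) the resampling. A usage sketch; the input files are hypothetical, and `ConsoleMonitor` is assumed to be constructible without arguments:

```python
import xarray as xr
from cate.ops.coregistration import coregister
from cate.util import ConsoleMonitor

ds_master = xr.open_dataset('master.nc')  # hypothetical input files
ds_slave = xr.open_dataset('slave.nc')

# Progress is reported while each variable and time slice is resampled.
ds_out = coregister(ds_master, ds_slave, method_us='linear',
                    method_ds='mean', monitor=ConsoleMonitor())
```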


def _is_equidistant(array: np.ndarray) -> bool:
@@ -179,7 +182,7 @@ def _within_bounds(array: np.ndarray, low_bound) -> bool:
return (array[0] >= low_bound and array[-1] <= abs(low_bound))


def _resample_slice(arr_slice: xr.DataArray, w: int, h: int, ds_method: int, us_method: int) -> xr.DataArray:
def _resample_slice(arr_slice: xr.DataArray, w: int, h: int, ds_method: int, us_method: int, parent_monitor: Monitor) -> xr.DataArray:
"""
Resample a single time slice of a larger xr.DataArray

@@ -188,18 +191,21 @@ def _resample_slice(arr_slice: xr.DataArray, w: int, h: int, ds_method: int, us_
:param h: The desired new height (amount of latitudes)
:param ds_method: Downsampling method, see resampling.py
:param us_method: Upsampling method, see resampling.py
:param parent_monitor: the parent progress monitor.
:return: resampled slice
"""
result = resampling.resample_2d(np.ma.masked_invalid(arr_slice.values),
w,
h,
ds_method,
us_method)
return xr.DataArray(result)
monitor = parent_monitor.child(1)
with monitor.observing("resample time slice"):
result = resampling.resample_2d(np.ma.masked_invalid(arr_slice.values),
w,
h,
ds_method,
us_method)
return xr.DataArray(result)


def _resample_array(array: xr.DataArray, lon: xr.DataArray, lat: xr.DataArray, method_us: int,
method_ds: int) -> xr.DataArray:
method_ds: int, parent_monitor: Monitor) -> xr.DataArray:
"""
Resample the given xr.DataArray to a new grid defined by lat and lon

@@ -208,24 +214,31 @@ def _resample_array(array: xr.DataArray, lon: xr.DataArray, lat: xr.DataArray, m
:param lon: 'lon' xr.DataArray attribute for the new grid
:param method_us: Interpolation method to use for upsampling, see resampling.py
:param method_ds: Interpolation method to use for downsampling, see resampling.py
:param parent_monitor: the parent progress monitor.
:return: The resampled array
"""
# Determine width and height of the resampled array
width = lon.values.size
height = lat.values.size

kwargs = {'w': width, 'h': height, 'ds_method': method_ds, 'us_method': method_us}
temp_array = array.groupby('time').apply(_resample_slice, **kwargs)
chunks = list(temp_array.shape[1:])
chunks.insert(0, 1)
return xr.DataArray(temp_array.values,
name=array.name,
dims=array.dims,
coords={'time': array.time, 'lat': lat, 'lon': lon},
attrs=array.attrs).chunk(chunks=chunks)
monitor = parent_monitor.child(1)

kwargs = {'w': width, 'h': height, 'ds_method': method_ds, 'us_method': method_us, 'parent_monitor': monitor}
group_by_time = array.groupby('time')
num_time_steps = len(group_by_time)

def _resample_dataset(ds_master: xr.Dataset, ds_slave: xr.Dataset, method_us: int, method_ds: int) -> xr.Dataset:
with monitor.starting("coregister dataarray", total_work=num_time_steps):
temp_array = group_by_time.apply(_resample_slice, **kwargs)
chunks = list(temp_array.shape[1:])
chunks.insert(0, 1)
return xr.DataArray(temp_array.values,
name=array.name,
dims=array.dims,
coords={'time': array.time, 'lat': lat, 'lon': lon},
attrs=array.attrs).chunk(chunks=chunks)
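Taken together, the changes in this file form a three-level progress hierarchy: the dataset-level monitor budgets one unit per data variable, each variable's child monitor budgets one unit per time step, and each slice reports through `observing(...)`. A schematic sketch with illustrative counts:

```python
# Schematic of the monitor hierarchy set up by this diff:
#   'coregister dataset'   -> total_work = number of data variables
#   'coregister dataarray' -> total_work = number of time steps
#   'resample time slice'  -> one unit each, via observing()
num_vars, num_steps = 3, 12  # illustrative counts

with monitor.starting('coregister dataset', num_vars):
    for _ in range(num_vars):
        var_monitor = monitor.child(1)
        with var_monitor.starting('coregister dataarray', total_work=num_steps):
            for _ in range(num_steps):
                with var_monitor.child(1).observing('resample time slice'):
                    pass  # resampling.resample_2d(...) would run here
```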


def _resample_dataset(ds_master: xr.Dataset, ds_slave: xr.Dataset, method_us: int, method_ds: int, monitor: Monitor) -> xr.Dataset:
"""
Resample slave onto the grid of the master.
This does spatial resampling the whole dataset, e.g., all
@@ -238,6 +251,7 @@ def _resample_dataset(ds_master: xr.Dataset, ds_slave: xr.Dataset, method_us: in
:param ds_slave: xr.Dataset that will be resampled on the masters' grid
:param method_us: Interpolation method for upsampling, see resampling.py
:param method_ds: Interpolation method for downsampling, see resampling.py
:param monitor: a progress monitor.
:return: xr.Dataset The resampled slave dataset
"""
# Find lat/lon bounds of the intersection of master and slave grids. The
@@ -257,10 +271,9 @@ def _resample_dataset(ds_master: xr.Dataset, ds_slave: xr.Dataset, method_us: in
lon = ds_master['lon'].sel(lon=lon_slice)
lat = ds_master['lat'].sel(lat=lat_slice)

retset = ds_slave.sel(lat=lat_slice, lon=lon_slice)

kwargs = {'lon': lon, 'lat': lat, 'method_us': method_us, 'method_ds': method_ds}
retset = ds_slave.apply(_resample_array, keep_attrs=True, **kwargs)
with monitor.starting("coregister dataset", len(ds_slave.data_vars)):
kwargs = {'lon': lon, 'lat': lat, 'method_us': method_us, 'method_ds': method_ds, 'parent_monitor': monitor}
retset = ds_slave.apply(_resample_array, keep_attrs=True, **kwargs)

# Set/Update global geospatial attributes
retset.attrs['geospatial_lat_min'] = retset.lat.values[0]
8 changes: 5 additions & 3 deletions cate/ops/io.py
@@ -32,6 +32,7 @@
from cate.core.op import OP_REGISTRY, op_input, op
from cate.core.types import VarNamesLike, TimeRangeLike, PolygonLike, DictLike, FileLike
from cate.ops.normalize import normalize as normalize_op
from cate.util import Monitor

_ALL_FILE_FILTER = dict(name='All Files', extensions=['*'])

@@ -54,7 +55,6 @@ def open_dataset(ds_name: str,
:param time_range: Optional time range of the requested dataset
:param region: Optional spatial region of the requested dataset
:param var_names: Optional names of variables of the requested dataset
:param monitor: a progress monitor, used only if *sync* is ``True``.
:param normalize: Whether to normalize the dataset's geo- and time-coding upon opening. See operation ``normalize``.
:return: A new dataset instance.
"""
@@ -72,15 +72,17 @@
@op_input('ds')
@op_input('file', file_open_mode='w', file_filters=[dict(name='NetCDF', extensions=['nc']), _ALL_FILE_FILTER])
@op_input('format', value_set=['NETCDF4', 'NETCDF4_CLASSIC', 'NETCDF3_64BIT', 'NETCDF3_CLASSIC'])
def save_dataset(ds: xr.Dataset, file: str, format: str = None):
def save_dataset(ds: xr.Dataset, file: str, format: str = None, monitor: Monitor = Monitor.NONE):
"""
Save a dataset to NetCDF file.

:param ds: The dataset
:param file: File path
:param format: NetCDF format flavour, one of 'NETCDF4', 'NETCDF4_CLASSIC', 'NETCDF3_64BIT', 'NETCDF3_CLASSIC'.
:param monitor: a progress monitor.
"""
ds.to_netcdf(file, format=format)
with monitor.observing("save_dataset"):
ds.to_netcdf(file, format=format)
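A usage sketch for the instrumented `save_dataset`; the dataset and output path are illustrative:

```python
import xarray as xr
from cate.ops.io import save_dataset
from cate.util import ConsoleMonitor

ds = xr.Dataset({'t': ('x', [1.0, 2.0, 3.0])})  # illustrative dataset
save_dataset(ds, '/tmp/example.nc', format='NETCDF4',
             monitor=ConsoleMonitor())
```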


# noinspection PyShadowingBuiltins
55 changes: 31 additions & 24 deletions cate/ops/outliers.py
@@ -34,6 +34,7 @@

from cate.core.op import op, op_input, op_return
from cate.core.types import VarNamesLike
from cate.util import Monitor
from cate import __version__


@@ -43,10 +44,11 @@
@op_return(add_history=True)
def detect_outliers(ds: xr.Dataset,
var: VarNamesLike.TYPE,
threshold_low: float=0.05,
threshold_high: float=0.95,
quantiles: bool=True,
mask: bool=False) -> xr.Dataset:
threshold_low: float = 0.05,
threshold_high: float = 0.95,
quantiles: bool = True,
mask: bool = False,
monitor: Monitor = Monitor.NONE) -> xr.Dataset:
"""
Detect outliers in the given Dataset.

@@ -61,11 +63,12 @@ def detect_outliers(ds: xr.Dataset,
select multiple variables matching a pattern.
:param threshold_low: Values less or equal to this will be removed/masked
:param threshold_high: Values greater or equal to this will be removed/masked
:bool quantiles: If True, threshold values are treated as quantiles,
:param quantiles: If True, threshold values are treated as quantiles,
otherwise as absolute values.
:bool mask: If True, an ancillary variable containing flag values for
:param mask: If True, an ancillary variable containing flag values for
outliers will be added to the dataset. Otherwise, outliers will be replaced
with nan directly in the data variables.
:param monitor: A progress monitor.
:return: The dataset with outliers masked or replaced with nan
"""
# Create a list of variable names on which to perform outlier detection
@@ -80,29 +83,33 @@ def detect_outliers(ds: xr.Dataset,
# For each array in the dataset for which we should detect outliers, detect
# outliers
ret_ds = ds.copy()
for var_name in variables:
if quantiles:
# Get threshold values
threshold_low = ret_ds[var_name].quantile(threshold_low)
threshold_high = ret_ds[var_name].quantile(threshold_high)

# If not mask, put nans in the data arrays for min/max outliers
if not mask:
arr = ret_ds[var_name]
attrs = arr.attrs
ret_ds[var_name] = arr.where((arr > threshold_low) &
(arr < threshold_high))
ret_ds[var_name].attrs = attrs
else:
# Create and add a data variable containing the mask for this data
# variable
_mask_outliers(ret_ds, var_name, threshold_low, threshold_high)
with monitor.starting("detect_outliers", total_work=len(variables) * 3):
for var_name in variables:
if quantiles:
# Get threshold values
with monitor.child(1).observing("quantile low"):
threshold_low = ret_ds[var_name].quantile(threshold_low)
with monitor.child(1).observing("quantile high"):
threshold_high = ret_ds[var_name].quantile(threshold_high)
else:
monitor.progress(2)
# If not mask, put nans in the data arrays for min/max outliers
if not mask:
arr = ret_ds[var_name]
attrs = arr.attrs
ret_ds[var_name] = arr.where((arr > threshold_low) & (arr < threshold_high))
ret_ds[var_name].attrs = attrs
else:
# Create and add a data variable containing the mask for this data
# variable
_mask_outliers(ret_ds, var_name, threshold_low, threshold_high)
monitor.progress(1)

return ret_ds
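The budget of `len(variables) * 3` gives each variable two units for the quantile computations (reported as a lump `progress(2)` when absolute thresholds are used) and one unit for the masking/replacement step. A usage sketch with an illustrative dataset and variable name:

```python
import numpy as np
import xarray as xr
from cate.ops.outliers import detect_outliers
from cate.util import ConsoleMonitor

# Illustrative data: 100 values with two planted extremes.
values = np.r_[np.random.rand(98), 50.0, -50.0]
ds = xr.Dataset({'temperature': (['time'], values)})

ds_clean = detect_outliers(ds, 'temperature',
                           threshold_low=0.05, threshold_high=0.95,
                           quantiles=True, mask=False,
                           monitor=ConsoleMonitor())
```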


def _mask_outliers(ds: xr.Dataset, var_name: str, threshold_low: float,
threshold_high: float) -> xr.Dataset:
threshold_high: float):
"""
Create a mask data array for the given variable of the dataset and given
absolute threshold values. Add the mask data array as an ancillary data
2 changes: 0 additions & 2 deletions cate/ops/utility.py
@@ -152,8 +152,6 @@ def no_op(num_steps: int = 10,
for i in range(num_steps):
time.sleep(step_duration)
monitor.progress(1.0, 'Step %s of %s doing nothing' % (i + 1, num_steps))
if monitor.is_cancelled():
raise InterruptedError
if fail_after:
raise ValueError('Intentionally failed after doing nothing.')
monitor.done()
2 changes: 1 addition & 1 deletion cate/util/__init__.py
@@ -42,7 +42,7 @@

from .extend import extend
from .misc import *
from .monitor import Monitor, ChildMonitor, ConsoleMonitor
from .monitor import Monitor, ChildMonitor, ConsoleMonitor, Cancellation
from .namespace import Namespace
from .opmetainf import OpMetaInfo
from .undefined import UNDEFINED
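With `Cancellation` now exported from `cate.util`, callers can tell a user-initiated cancel apart from a genuine failure; the operation call below is hypothetical:

```python
from cate.util import Cancellation, ConsoleMonitor

monitor = ConsoleMonitor()
try:
    run_long_operation(monitor=monitor)  # hypothetical operation call
except Cancellation:
    print('Operation was cancelled; partial results discarded.')
```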