From d2e3d3eeb370375b3197b4c75ca775ec706f24ea Mon Sep 17 00:00:00 2001
From: Norman Fomferra
Date: Wed, 27 Sep 2017 08:57:09 +0200
Subject: [PATCH 1/4] Let no_op() return something that isn't falsy

---
 cate/ops/utility.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/cate/ops/utility.py b/cate/ops/utility.py
index fe799c305..b9a9f942e 100644
--- a/cate/ops/utility.py
+++ b/cate/ops/utility.py
@@ -134,7 +134,7 @@ def no_op(num_steps: int = 10,
           step_duration: float = 0.5,
           fail_before: bool = False,
           fail_after: bool = False,
-          monitor: Monitor = Monitor.NONE):
+          monitor: Monitor = Monitor.NONE) -> bool:
     """
     An operation that basically does nothing but spending configurable time.
     It may be useful for testing purposes.
@@ -144,6 +144,7 @@ def no_op(num_steps: int = 10,
     :param fail_before: If the operation should fail before spending time doing nothing.
     :param fail_after: If the operation should fail after spending time doing nothing.
     :param monitor: A progress monitor.
+    :return: Always True
     """
     import time
     monitor.start('Computing nothing', num_steps)
@@ -155,6 +156,7 @@ def no_op(num_steps: int = 10,
     if fail_after:
         raise ValueError('Intentionally failed after doing nothing.')
     monitor.done()
+    return True
 
 
 @op(tags=['utility', 'internal'])

From d8a623c6e4ca9654252f14b08241fc1ce7a15f13 Mon Sep 17 00:00:00 2001
From: Norman Fomferra
Date: Wed, 27 Sep 2017 08:58:27 +0200
Subject: [PATCH 2/4] Trying to reproduce #306 with a unit test

---
 cate/core/workspace.py      | 10 ++++++++--
 test/core/test_workspace.py | 26 +++++++++++++++++++++++++-
 2 files changed, 33 insertions(+), 3 deletions(-)

diff --git a/cate/core/workspace.py b/cate/core/workspace.py
index 82d061881..dea3021d6 100644
--- a/cate/core/workspace.py
+++ b/cate/core/workspace.py
@@ -505,8 +505,9 @@ def set_resource(self,
         :param res_name: An optional resource name. If given and not empty, it must be unique
                          within this workspace. If not provided, a workspace-unique resource
                          name will be generated.
-        :param op_name:
-        :param op_kwargs:
+        :param op_name: The name of a registered operation.
+        :param op_kwargs: The operation's keyword arguments. Each argument must be a dict having either a "source" or
+                         "value" key.
         :param overwrite:
         :param validate_args:
         :return: The resource name, either the one passed in or a generated one.
@@ -521,6 +522,11 @@ def set_resource(self,
         if not res_name:
             res_name = self._new_resource_name(op)
 
+        # TODO (forman): #391
+        print('res_name=%s' % res_name)
+        import time
+        time.sleep(0.25)
+
         new_step = OpStep(op, node_id=res_name)
 
         workflow = self.workflow
diff --git a/test/core/test_workspace.py b/test/core/test_workspace.py
index 0110872f8..4cdb449dd 100644
--- a/test/core/test_workspace.py
+++ b/test/core/test_workspace.py
@@ -281,6 +281,29 @@ def test_set_and_rename_and_execute_step(self):
         self.assertEqual(ws.resource_cache.get('Y'), 5)
         self.assertEqual(ws.resource_cache.get('Z'), 5)
 
+    # TODO (forman): #391
+    def test_set_resource_is_reentrant(self):
+        from concurrent.futures import ThreadPoolExecutor
+
+        ws = Workspace('/path', Workflow(OpMetaInfo('workspace_workflow', header=dict(description='Test!'))))
+
+        def set_resource_and_execute():
+            res_name = ws.set_resource('cate.ops.utility.no_op',
+                                       op_kwargs=dict(num_steps=dict(value=10),
+                                                      step_duration=dict(value=0.05)))
+            ws.execute_workflow(res_name=res_name)
+            return res_name
+
+        num_res = 5
+        futures = []
+        with ThreadPoolExecutor(max_workers=2 * num_res) as executor:
+            for i in range(num_res):
+                futures.append(executor.submit(set_resource_and_execute))
+
+        actual_res_names = {f.result() for f in futures}
+        expected_res_names = {'res_%s' % (i + 1) for i in range(num_res)}
+        self.assertEqual(actual_res_names, expected_res_names)
+
     def test_example(self):
         expected_json_text = """{
             "schema_version": 1,
@@ -328,7 +351,8 @@ def test_example(self):
 
         with self.assertRaises(ValueError) as e:
             ws.set_resource('cate.ops.timeseries.tseries_point',
-                            mk_op_kwargs(ds="@p", point="iih!", var="precipitation"), res_name='ts2', validate_args=True)
+                            mk_op_kwargs(ds="@p", point="iih!", var="precipitation"), res_name='ts2',
+                            validate_args=True)
         self.assertEqual(str(e.exception),
                          "input 'point' for operation 'cate.ops.timeseries.tseries_point': "
                          "cannot convert value to PointLike")

From e43afe2809b8e0fc4c0542a39b9470fcaf0c7d5b Mon Sep 17 00:00:00 2001
From: Norman Fomferra
Date: Wed, 27 Sep 2017 14:22:52 +0200
Subject: [PATCH 3/4] lowered type of new_indexed_name() names argument to Iterable

---
 cate/util/misc.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/cate/util/misc.py b/cate/util/misc.py
index 5efed5622..a3e032b5d 100644
--- a/cate/util/misc.py
+++ b/cate/util/misc.py
@@ -29,7 +29,7 @@
 from contextlib import contextmanager
 from datetime import datetime, date, timedelta
 from io import StringIO
-from typing import Union, Tuple, Sequence, Optional
+from typing import Union, Tuple, Sequence, Optional, Iterable
 
 import numpy as np
 
@@ -389,7 +389,7 @@ def filter_fileset(names: Sequence[str],
     return filtered_names
 
 
-def new_indexed_name(names: Sequence[str], pattern: str) -> str:
+def new_indexed_name(names: Iterable[str], pattern: str) -> str:
     """
     Return a new name that is unique in *names* and that conforms to *pattern*.
     The argument *pattern* must contain a single ``"{index}"`` substring.
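
Patch 3 prepares for patch 4: new_indexed_name() will be handed a set of step IDs
rather than a list, so the declared parameter type is widened from Sequence to
Iterable. A minimal sketch of the behavior the signature promises (illustrative
only, assuming the function just scans *names* once for the highest index in use;
this is not the actual cate.util.misc implementation):

    import re
    from typing import Iterable

    def new_indexed_name_sketch(names: Iterable[str], pattern: str) -> str:
        # Turn a pattern like 'res_{index}' into a regex capturing the index digits.
        regex = re.compile('^%s$' % re.escape(pattern).replace(r'\{index\}', r'(\d+)'))
        max_index = 0
        for name in names:  # a single pass, so any Iterable works
            match = regex.match(name)
            if match:
                max_index = max(max_index, int(match.group(1)))
        return pattern.format(index=max_index + 1)

    # A set such as the one passed by patch 4 works now:
    assert new_indexed_name_sketch({'res_1', 'res_3', 'ts'}, 'res_{index}') == 'res_4'

The 'res_{index}' pattern above is only an example; the actual default comes from
conf.get_default_res_pattern().
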
From b4524bb5f0513b5593eb583bb39732d2383b721c Mon Sep 17 00:00:00 2001
From: Norman Fomferra
Date: Wed, 27 Sep 2017 14:35:35 +0200
Subject: [PATCH 4/4] fixes #26 fixes #391

---
 cate/core/workspace.py | 365 +++++++++++++++++++++--------------------
 1 file changed, 187 insertions(+), 178 deletions(-)

diff --git a/cate/core/workspace.py b/cate/core/workspace.py
index dea3021d6..56858b8be 100644
--- a/cate/core/workspace.py
+++ b/cate/core/workspace.py
@@ -27,6 +27,7 @@
 import shutil
 import sys
 from collections import OrderedDict
+from threading import RLock
 from typing import List, Any, Dict, Optional
 
 import fiona
@@ -95,6 +96,7 @@ def __init__(self, base_dir: str, workflow: Workflow, is_modified: bool = False)
         self._is_closed = False
         self._resource_cache = ValueCache()
         self._user_data = dict()
+        self._lock = RLock()
 
     def __del__(self):
         self.close()
@@ -181,42 +183,44 @@ def open(cls, base_dir: str, monitor: Monitor = Monitor.NONE) -> 'Workspace':
     def close(self):
        if self._is_closed:
             return
-        self._resource_cache.close()
-        # Remove all resource files that are no longer required
-        if os.path.isdir(self.workspace_dir):
-            persistent_ids = {step.id for step in self.workflow.steps if step.persistent}
-            for filename in os.listdir(self.workspace_dir):
-                res_file = os.path.join(self.workspace_dir, filename)
-                if os.path.isfile(res_file) and filename.endswith('.nc'):
-                    res_name = filename[0: -3]
-                    if res_name not in persistent_ids:
-                        try:
-                            os.remove(res_file)
-                        except (OSError, IOError) as e:
-                            print('error:', e)
+        with self._lock:
+            self._resource_cache.close()
+            # Remove all resource files that are no longer required
+            if os.path.isdir(self.workspace_dir):
+                persistent_ids = {step.id for step in self.workflow.steps if step.persistent}
+                for filename in os.listdir(self.workspace_dir):
+                    res_file = os.path.join(self.workspace_dir, filename)
+                    if os.path.isfile(res_file) and filename.endswith('.nc'):
+                        res_name = filename[0: -3]
+                        if res_name not in persistent_ids:
+                            try:
+                                os.remove(res_file)
+                            except (OSError, IOError) as e:
+                                print('error:', e)
 
     def save(self, monitor: Monitor = Monitor.NONE):
         self._assert_open()
-        base_dir = self.base_dir
-        try:
-            if not os.path.isdir(base_dir):
-                os.mkdir(base_dir)
-            workspace_dir = self.workspace_dir
-            if not os.path.isdir(workspace_dir):
-                os.mkdir(workspace_dir)
-            self.workflow.store(self.workflow_file)
-
-            # Write resources for all persistent steps
-            persistent_steps = [step for step in self.workflow.steps if step.persistent]
-            if persistent_steps:
-                with monitor.starting('Writing resources', len(persistent_steps)):
-                    for step in persistent_steps:
-                        self._write_resource_to_file(step.id)
-                        monitor.progress(1)
-
-            self._is_modified = False
-        except (IOError, OSError) as e:
-            raise WorkspaceError(e)
+        with self._lock:
+            base_dir = self.base_dir
+            try:
+                if not os.path.isdir(base_dir):
+                    os.mkdir(base_dir)
+                workspace_dir = self.workspace_dir
+                if not os.path.isdir(workspace_dir):
+                    os.mkdir(workspace_dir)
+                self.workflow.store(self.workflow_file)
+
+                # Write resources for all persistent steps
+                persistent_steps = [step for step in self.workflow.steps if step.persistent]
+                if persistent_steps:
+                    with monitor.starting('Writing resources', len(persistent_steps)):
+                        for step in persistent_steps:
+                            self._write_resource_to_file(step.id)
+                            monitor.progress(1)
+
+                self._is_modified = False
+            except (IOError, OSError) as e:
+                raise WorkspaceError(e)
 
     def _write_resource_to_file(self, res_name):
         res_value = self._resource_cache.get(res_name)
@@ -241,13 +245,14 @@ def _read_resource_from_file(self, res_name):  # <<< Issue #270
 
     def set_resource_persistence(self, res_name: str, persistent: bool):
-        self._assert_open()
-        res_step = self.workflow.find_node(res_name)
-        if res_step is None:
-            raise WorkspaceError('Resource "%s" not found' % res_name)
-        if res_step.persistent == persistent:
-            return
-        res_step.persistent = persistent
+        with self._lock:
+            self._assert_open()
+            res_step = self.workflow.find_node(res_name)
+            if res_step is None:
+                raise WorkspaceError('Resource "%s" not found' % res_name)
+            if res_step.persistent == persistent:
+                return
+            res_step.persistent = persistent
 
     @classmethod
     def from_json_dict(cls, json_dict):
@@ -258,14 +263,15 @@ def from_json_dict(cls, json_dict):
         return Workspace(base_dir, workflow, is_modified=is_modified)
 
     def to_json_dict(self):
-        self._assert_open()
-        return OrderedDict([('base_dir', self.base_dir),
-                            ('is_scratch', self.is_scratch),
-                            ('is_modified', self.is_modified),
-                            ('is_saved', os.path.exists(self.workspace_dir)),
-                            ('workflow', self.workflow.to_json_dict()),
-                            ('resources', self._resources_to_json_list())
-                            ])
+        with self._lock:
+            self._assert_open()
+            return OrderedDict([('base_dir', self.base_dir),
+                                ('is_scratch', self.is_scratch),
+                                ('is_modified', self.is_modified),
+                                ('is_saved', os.path.exists(self.workspace_dir)),
+                                ('workflow', self.workflow.to_json_dict()),
+                                ('resources', self._resources_to_json_list())
+                                ])
 
     def _resources_to_json_list(self):
         resource_descriptors = []
@@ -453,45 +459,48 @@ def _is_y_flipped(variable):
         return lat_coords.to_index().is_monotonic_increasing
 
     def delete(self):
-        self.close()
-        try:
-            shutil.rmtree(self.workspace_dir)
-        except (IOError, OSError) as e:
-            raise WorkspaceError(e)
+        with self._lock:
+            self.close()
+            try:
+                shutil.rmtree(self.workspace_dir)
+            except (IOError, OSError) as e:
+                raise WorkspaceError(e)
 
     def delete_resource(self, res_name: str):
-        res_step = self.workflow.find_node(res_name)
-        if res_step is None:
-            raise WorkspaceError('Resource "%s" not found' % res_name)
+        with self._lock:
+            res_step = self.workflow.find_node(res_name)
+            if res_step is None:
+                raise WorkspaceError('Resource "%s" not found' % res_name)
 
-        dependent_steps = []
-        for step in self.workflow.steps:
-            if step is not res_step and step.requires(res_step):
-                dependent_steps.append(step.id)
+            dependent_steps = []
+            for step in self.workflow.steps:
+                if step is not res_step and step.requires(res_step):
+                    dependent_steps.append(step.id)
 
-        if dependent_steps:
-            raise WorkspaceError('Cannot delete resource "%s" because the following resource(s) '
-                                 'depend on it: %s' % (res_name, ', '.join(dependent_steps)))
+            if dependent_steps:
+                raise WorkspaceError('Cannot delete resource "%s" because the following resource(s) '
+                                     'depend on it: %s' % (res_name, ', '.join(dependent_steps)))
 
-        self.workflow.remove_step(res_step)
-        if res_name in self._resource_cache:
-            del self._resource_cache[res_name]
+            self.workflow.remove_step(res_step)
+            if res_name in self._resource_cache:
+                del self._resource_cache[res_name]
 
     def rename_resource(self, res_name: str, new_res_name: str) -> None:
-        res_step = self.workflow.find_node(res_name)
-        if res_step is None:
-            raise WorkspaceError('Resource "%s" not found' % res_name)
-        res_step_new = self.workflow.find_node(new_res_name)
-        if res_step_new is res_step:
-            return
-        if res_step_new is not None:
-            raise WorkspaceError('Resource "%s" cannot be renamed to "%s", '
-                                 'because "%s" is already in use.' % (res_name, new_res_name, new_res_name))
+        with self._lock:
+            res_step = self.workflow.find_node(res_name)
+            if res_step is None:
+                raise WorkspaceError('Resource "%s" not found' % res_name)
+            res_step_new = self.workflow.find_node(new_res_name)
+            if res_step_new is res_step:
+                return
+            if res_step_new is not None:
+                raise WorkspaceError('Resource "%s" cannot be renamed to "%s", '
+                                     'because "%s" is already in use.' % (res_name, new_res_name, new_res_name))
 
-        res_step.set_id(new_res_name)
+            res_step.set_id(new_res_name)
 
-        if res_name in self._resource_cache:
-            self._resource_cache.rename_key(res_name, new_res_name)
+            if res_name in self._resource_cache:
+                self._resource_cache.rename_key(res_name, new_res_name)
 
     def set_resource(self,
                      op_name: str,
@@ -519,85 +528,84 @@ def set_resource(self,
         if not op:
             raise WorkspaceError('Unknown operation "%s"' % op_name)
 
-        if not res_name:
-            res_name = self._new_resource_name(op)
-
-        # TODO (forman): #391
-        print('res_name=%s' % res_name)
-        import time
-        time.sleep(0.25)
-
-        new_step = OpStep(op, node_id=res_name)
-
-        workflow = self.workflow
-
-        # This namespace will allow us to wire the new resource with existing workflow steps
-        # We only add step outputs, so we cannot reference another step's input neither.
-        # This is not a problem because a workspace's workflow doesn't have any inputs
-        # to be referenced anyway.
-        namespace = dict()
-        for step in workflow.steps:
-            output_namespace = step.outputs
-            namespace[step.id] = output_namespace
-
-        does_exist = res_name in namespace
-        if not overwrite and does_exist:
-            raise WorkspaceError('A resource named "%s" already exists' % res_name)
-
-        if does_exist:
-            # Prevent resource from self-referencing
-            namespace.pop(res_name, None)
-
-        # Wire new op_step with outputs from existing steps
-        for input_name, input_value in op_kwargs.items():
-            if input_name not in new_step.inputs:
-                raise WorkspaceError('"%s" is not an input of operation "%s"' % (input_name, op_name))
-            input_port = new_step.inputs[input_name]
-
-            if 'source' in input_value:
-                source = input_value['source']
-                if source is not None:
-                    source = safe_eval(source, namespace)
-                if isinstance(source, NodePort):
-                    # source is an output NodePort of another step
-                    input_port.source = source
-                elif isinstance(source, Namespace):
-                    # source is output_namespace of another step
-                    if OpMetaInfo.RETURN_OUTPUT_NAME not in source:
-                        raise WorkspaceError('Illegal argument for input "%s" of operation "%s', (input_name, op_name))
-                    input_port.source = source[OpMetaInfo.RETURN_OUTPUT_NAME]
-            elif 'value' in input_value:
-                # Constant value
-                input_port.value = input_value['value']
-            else:
-                raise WorkspaceError('Illegal argument for input "%s" of operation "%s', (input_name, op_name))
+        with self._lock:
+            if not res_name:
+                default_res_pattern = conf.get_default_res_pattern()
+                res_pattern = op.op_meta_info.header.get('res_pattern', default_res_pattern)
+                res_name = self._new_resource_name(res_pattern)
 
-        if validate_args:
-            inputs = new_step.inputs
-            input_values = {kw: inputs[kw].source or inputs[kw].value for kw, v in op_kwargs.items()}
-            # Validate all values except those of type NodePort (= the sources)
-            op.op_meta_info.validate_input_values(input_values, [NodePort])
+            new_step = OpStep(op, node_id=res_name)
 
-        old_step = workflow.find_node(res_name)
+            workflow = self.workflow
 
-        # Collect keys of invalidated cache entries, initialize with res_name
-        ids_of_invalidated_steps = {res_name}
-        if old_step is not None:
-            # Collect all IDs of steps that depend on old_step, if any
+            # This namespace will allow us to wire the new resource with existing workflow steps
+            # We only add step outputs, so we cannot reference another step's inputs either.
+            # This is not a problem because a workspace's workflow doesn't have any inputs
+            # to be referenced anyway.
+            namespace = dict()
             for step in workflow.steps:
-                requires = step.requires(old_step)
-                if requires:
-                    ids_of_invalidated_steps.add(step.id)
-
-        workflow = self._workflow
-        # noinspection PyUnusedLocal
-        workflow.add_step(new_step, can_exist=True)
-        self._is_modified = True
-
-        # Remove any cached resource values, whose steps became invalidated
-        for key in ids_of_invalidated_steps:
-            if key in self._resource_cache:
-                self._resource_cache[key] = UNDEFINED
+                output_namespace = step.outputs
+                namespace[step.id] = output_namespace
+
+            does_exist = res_name in namespace
+            if not overwrite and does_exist:
+                raise WorkspaceError('A resource named "%s" already exists' % res_name)
+
+            if does_exist:
+                # Prevent resource from self-referencing
+                namespace.pop(res_name, None)
+
+            # Wire new op_step with outputs from existing steps
+            for input_name, input_value in op_kwargs.items():
+                if input_name not in new_step.inputs:
+                    raise WorkspaceError('"%s" is not an input of operation "%s"' % (input_name, op_name))
+                input_port = new_step.inputs[input_name]
+
+                if 'source' in input_value:
+                    source = input_value['source']
+                    if source is not None:
+                        source = safe_eval(source, namespace)
+                    if isinstance(source, NodePort):
+                        # source is an output NodePort of another step
+                        input_port.source = source
+                    elif isinstance(source, Namespace):
+                        # source is output_namespace of another step
+                        if OpMetaInfo.RETURN_OUTPUT_NAME not in source:
+                            raise WorkspaceError('Illegal argument for input "%s" of operation "%s"' %
+                                                 (input_name, op_name))
+                        input_port.source = source[OpMetaInfo.RETURN_OUTPUT_NAME]
+                elif 'value' in input_value:
+                    # Constant value
+                    input_port.value = input_value['value']
+                else:
+                    raise WorkspaceError('Illegal argument for input "%s" of operation "%s"' % (input_name, op_name))
+
+            if validate_args:
+                inputs = new_step.inputs
+                input_values = {kw: inputs[kw].source or inputs[kw].value for kw in op_kwargs.keys()}
+                # Validate all values except those of type NodePort (= the sources)
+                op.op_meta_info.validate_input_values(input_values, [NodePort])
+
+            old_step = workflow.find_node(res_name)
+
+            # Collect keys of invalidated cache entries, initialize with res_name
+            ids_of_invalidated_steps = {res_name}
+            if old_step is not None:
+                # Collect all IDs of steps that depend on old_step, if any
+                for step in workflow.steps:
+                    requires = step.requires(old_step)
+                    if requires:
+                        ids_of_invalidated_steps.add(step.id)
+
+            workflow = self._workflow
+            # noinspection PyUnusedLocal
+            workflow.add_step(new_step, can_exist=True)
+            self._is_modified = True
+
+            # Remove any cached resource values whose steps became invalidated
+            for key in ids_of_invalidated_steps:
+                if key in self._resource_cache:
+                    self._resource_cache[key] = UNDEFINED
 
         return res_name
 
@@ -609,31 +617,34 @@ def run_op(self, op_name: str, op_kwargs: OpKwArgs, monitor=Monitor.NONE):
         if not op:
             raise WorkspaceError('Unknown operation "%s"' % op_name)
 
-        unpacked_op_kwargs = {}
-        for input_name, input_value in op_kwargs.items():
-            if 'source' in input_value:
-                unpacked_op_kwargs[input_name] = safe_eval(input_value['source'], self.resource_cache)
-            elif 'value' in input_value:
-                unpacked_op_kwargs[input_name] = input_value['value']
+        with self._lock:
+            unpacked_op_kwargs = {}
+            for input_name, input_value in op_kwargs.items():
+                if 'source' in input_value:
+                    unpacked_op_kwargs[input_name] = safe_eval(input_value['source'], self.resource_cache)
+                elif 'value' in input_value:
+                    unpacked_op_kwargs[input_name] = input_value['value']
 
-        with monitor.starting("Running operation '%s'" % op_name, 2):
-            self.workflow.invoke(context=self._new_context(), monitor=monitor.child(work=1))
-            op(monitor=monitor.child(work=1), **unpacked_op_kwargs)
+            with monitor.starting("Running operation '%s'" % op_name, 2):
+                self.workflow.invoke(context=self._new_context(), monitor=monitor.child(work=1))
+                op(monitor=monitor.child(work=1), **unpacked_op_kwargs)
 
     def execute_workflow(self, res_name: str = None, monitor: Monitor = Monitor.NONE):
         self._assert_open()
-        if not res_name:
-            steps = self.workflow.sorted_steps
-        else:
-            res_step = self.workflow.find_node(res_name)
-            if res_step is None:
-                raise WorkspaceError('Resource "%s" not found' % res_name)
-            steps = self.workflow.find_steps_to_compute(res_step.id)
-        if len(steps):
-            self.workflow.invoke_steps(steps, context=self._new_context(), monitor=monitor)
-            return steps[-1].get_output_value()
-        else:
-            return None
+
+        with self._lock:
+            if not res_name:
+                steps = self.workflow.sorted_steps
+            else:
+                res_step = self.workflow.find_node(res_name)
+                if res_step is None:
+                    raise WorkspaceError('Resource "%s" not found' % res_name)
+                steps = self.workflow.find_steps_to_compute(res_step.id)
+            if len(steps):
+                self.workflow.invoke_steps(steps, context=self._new_context(), monitor=monitor)
+                return steps[-1].get_output_value()
+            else:
+                return None
 
     def _new_context(self):
         return dict(value_cache=self._resource_cache, workspace=self)
@@ -642,10 +653,8 @@ def _assert_open(self):
         if self._is_closed:
             raise WorkspaceError('Workspace is already closed: ' + self._base_dir)
 
-    def _new_resource_name(self, op):
-        default_res_pattern = conf.get_default_res_pattern()
-        res_pattern = op.op_meta_info.header.get('res_pattern', default_res_pattern)
-        return new_indexed_name([step.id for step in self.workflow.steps], res_pattern)
+    def _new_resource_name(self, res_pattern):
+        return new_indexed_name({step.id for step in self.workflow.steps}, res_pattern)
 
 
 # noinspection PyArgumentList
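
Patch 4 serializes every Workspace mutator behind a single threading.RLock. The
lock must be reentrant because locked methods call one another, e.g. delete()
holds the lock and then calls close(), which acquires it again. A standalone
sketch of the pattern (the NameRegistry class is hypothetical, not cate code):

    from threading import RLock, Thread

    class NameRegistry:
        """Toy model of Workspace's locked resource-name generation."""

        def __init__(self):
            self._lock = RLock()
            self._names = set()

        def new_name(self) -> str:
            with self._lock:
                # Generating and registering the name is one atomic section,
                # so two threads can no longer pick the same index (#391).
                name = 'res_%d' % (len(self._names) + 1)
                self._register(name)  # re-acquires the same lock; needs RLock
                return name

        def _register(self, name: str):
            with self._lock:
                self._names.add(name)

    registry = NameRegistry()
    threads = [Thread(target=registry.new_name) for _ in range(5)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    assert registry._names == {'res_%d' % i for i in range(1, 6)}

Without a lock held across the read-then-register, two threads could observe the
same set size and produce the same name - the duplicate-name race that
test_set_resource_is_reentrant in patch 2 provokes with its thread pool.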