From 1a880a63e3eb8ddc57e26019acf587ac33d2bee4 Mon Sep 17 00:00:00 2001 From: Katherine Mantel Date: Sat, 2 Sep 2023 02:29:47 +0000 Subject: [PATCH 1/5] condition: correct Condition class mro dependencies - dependency dict must store names, not classes - must exclude own class name from dependencies (but not "alias" class names like And->All or Or->Any --- psyneulink/core/scheduling/condition.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/psyneulink/core/scheduling/condition.py b/psyneulink/core/scheduling/condition.py index 2d0e1fbfdf3..3ecf1d34933 100644 --- a/psyneulink/core/scheduling/condition.py +++ b/psyneulink/core/scheduling/condition.py @@ -144,7 +144,7 @@ def _parse_condition_arg(arg): for cond_name in graph_scheduler.condition.__all__: sched_module_cond_obj = getattr(graph_scheduler.condition, cond_name) - cond_dependencies[cond_name] = set(sched_module_cond_obj.__mro__[1:]) + cond_dependencies[cond_name] = {c.__name__ for c in sched_module_cond_obj.__mro__ if c.__name__ != cond_name} # iterate in order such that superclass types are before subclass types for cond_name in sorted( From 5cfbc7832edaf56ec44a74bb030595204c134cc1 Mon Sep 17 00:00:00 2001 From: Katherine Mantel Date: Fri, 15 Dec 2023 03:54:17 +0000 Subject: [PATCH 2/5] scheduling: handle empty graph-scheduler object docstring --- psyneulink/core/scheduling/__init__.py | 10 +++++++--- tests/documentation/test_module_docs.py | 16 +++++++++++++--- 2 files changed, 20 insertions(+), 6 deletions(-) diff --git a/psyneulink/core/scheduling/__init__.py b/psyneulink/core/scheduling/__init__.py index 34b37a0528e..bf0da2924a8 100644 --- a/psyneulink/core/scheduling/__init__.py +++ b/psyneulink/core/scheduling/__init__.py @@ -59,12 +59,15 @@ if cls.__doc__ is None: try: - cls.__doc__ = f'{getattr(ext_module, cls_name).__doc__}' + ext_cls = getattr(ext_module, cls_name) except AttributeError: # PNL-exclusive object continue + else: + cls.__doc__ = ext_cls.__doc__ - cls.__doc__ = re.sub(pattern, repl, cls.__doc__, flags=re.MULTILINE | re.DOTALL) + if cls.__doc__ is not None: + cls.__doc__ = re.sub(pattern, repl, cls.__doc__, flags=re.MULTILINE | re.DOTALL) for cls, repls in module._doc_subs.items(): if cls is None: @@ -73,7 +76,8 @@ cls = getattr(module, cls) for pattern, repl in repls: - cls.__doc__ = re.sub(pattern, repl, cls.__doc__, flags=re.MULTILINE | re.DOTALL) + if cls.__doc__ is not None: + cls.__doc__ = re.sub(pattern, repl, cls.__doc__, flags=re.MULTILINE | re.DOTALL) del graph_scheduler del re diff --git a/tests/documentation/test_module_docs.py b/tests/documentation/test_module_docs.py index ecc18564bf0..1594455daf2 100644 --- a/tests/documentation/test_module_docs.py +++ b/tests/documentation/test_module_docs.py @@ -47,6 +47,13 @@ def test_other_docs(mod, capsys): fail, total, captured.err, captured.out), pytrace=False) +def consistent_doc_attrs(*objs): + return ( + all(o.__doc__ is None for o in objs) + or all(isinstance(o.__doc__, str) for o in objs) + ) + + @pytest.mark.parametrize( 'mod', [ @@ -77,8 +84,11 @@ def test_scheduler_substitutions(mod): except AttributeError: continue + assert consistent_doc_attrs(cls, ext_cls) # global replacements may not happen in every docstring - assert re.sub(r'\\\d', '', repl) in cls.__doc__ or not re.match(pattern, ext_cls.__doc__) + if cls.__doc__ is not None: + assert re.sub(r'\\\d', '', repl) in cls.__doc__ or not re.match(pattern, ext_cls.__doc__) - ext_module_docstring = getattr(graph_scheduler, mod.__name__.split('.')[-1]).__doc__ - assert re.sub(r'\\\d', '', repl) in mod.__doc__ or not re.match(pattern, ext_module_docstring) + ext_module = getattr(graph_scheduler, mod.__name__.split('.')[-1]).__doc__ + assert consistent_doc_attrs(mod, ext_module) + assert re.sub(r'\\\d', '', repl) in mod.__doc__ or not re.match(pattern, ext_module.__doc__) From 647bee75e225ee2890d81bbd2731e545ab732aad Mon Sep 17 00:00:00 2001 From: Katherine Mantel Date: Mon, 20 Nov 2023 23:46:10 +0000 Subject: [PATCH 3/5] Scheduler: use ConditionSet to store user-specified conds supports graph structure conditions automatically --- psyneulink/core/scheduling/scheduler.py | 12 +++++++----- tests/composition/test_composition.py | 4 ++-- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/psyneulink/core/scheduling/scheduler.py b/psyneulink/core/scheduling/scheduler.py index 3eb3c4c272f..5802df5b191 100644 --- a/psyneulink/core/scheduling/scheduler.py +++ b/psyneulink/core/scheduling/scheduler.py @@ -52,7 +52,9 @@ def __init__( default_execution_id = composition.default_execution_id # TODO: consider integrating something like this into graph-scheduler? - self._user_specified_conds = copy.copy(conditions) if conditions is not None else {} + self._user_specified_conds = graph_scheduler.ConditionSet() + if conditions is not None: + self._user_specified_conds.add_condition_set(copy.copy(conditions)) self._user_specified_termination_conds = copy.copy(termination_conds) if termination_conds is not None else {} super().__init__( @@ -96,7 +98,7 @@ def _validate_conditions(self): ) def add_condition(self, owner, condition): - self._user_specified_conds[owner] = condition + self._user_specified_conds.add_condition(owner, condition) self._add_condition(owner, condition) def _add_condition(self, owner, condition): @@ -104,7 +106,7 @@ def _add_condition(self, owner, condition): super().add_condition(owner, condition) def add_condition_set(self, conditions): - self._user_specified_conds.update(conditions) + self._user_specified_conds.add_condition_set(conditions) self._add_condition_set(conditions) def _add_condition_set(self, conditions): @@ -114,8 +116,8 @@ def _add_condition_set(self, conditions): pass conditions = { - node: _create_as_pnl_condition(cond) - for node, cond in conditions.items() + node: _create_as_pnl_condition(conditions[node]) + for node in conditions } super().add_condition_set(conditions) diff --git a/tests/composition/test_composition.py b/tests/composition/test_composition.py index 248a1e31b4d..313e4b97132 100644 --- a/tests/composition/test_composition.py +++ b/tests/composition/test_composition.py @@ -7932,7 +7932,7 @@ def test_rebuild_scheduler_after_add_node(self): assert comp.scheduler.conditions[C].args == (A, 2) assert comp.scheduler.execution_list[comp.default_execution_id] == [{A}, {A, B}, {C}] - assert set(comp.scheduler._user_specified_conds.keys()) == {B, C} + assert set(comp.scheduler._user_specified_conds.conditions.keys()) == {B, C} def test_rebuild_scheduler_after_remove_node(self): A = ProcessingMechanism(name='A') @@ -7952,7 +7952,7 @@ def test_rebuild_scheduler_after_remove_node(self): assert comp.scheduler.conditions[C].args == (A, 2) assert comp.scheduler.execution_list[comp.default_execution_id] == [{A}, {A}, {C}] - assert set(comp.scheduler._user_specified_conds.keys()) == {C} + assert set(comp.scheduler._user_specified_conds.conditions.keys()) == {C} class TestInputSpecsDocumentationExamples: From ecb1d595bb9512f6a3d15ec658b9d7a9c7fb3349 Mon Sep 17 00:00:00 2001 From: Katherine Mantel Date: Sat, 2 Sep 2023 01:27:22 +0000 Subject: [PATCH 4/5] utilities: add toposort_key function that produces a python sorting key given a dependency dictionary --- psyneulink/core/globals/utilities.py | 29 ++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/psyneulink/core/globals/utilities.py b/psyneulink/core/globals/utilities.py index d0bca9513c5..f3b01aa765c 100644 --- a/psyneulink/core/globals/utilities.py +++ b/psyneulink/core/globals/utilities.py @@ -101,12 +101,14 @@ import collections import copy import inspect +import itertools import logging import psyneulink import re import time import warnings import weakref +import toposort import types import typing from beartype import beartype @@ -2060,3 +2062,30 @@ def get_function_sig_default_value( return sig.parameters[parameter].default except KeyError: return inspect._empty + + +def toposort_key( + dependency_dict: typing.Dict[typing.Hashable, typing.Iterable[typing.Any]] +) -> typing.Callable[[typing.Any], int]: + """ + Creates a key function for python sorting that causes all items in + **dependency_dict** to be sorted after their dependencies + + Args: + dependency_dict (typing.Dict[typing.Hashable, typing.Iterable[typing.Any]]): + a dictionary where values are the dependencies of keys + + Returns: + typing.Callable[[typing.Any], int]: a key function for python + sorting + """ + topo_ordering = list(toposort.toposort(dependency_dict)) + topo_ordering = list(itertools.chain.from_iterable(topo_ordering)) + + def _generated_toposort_key(obj): + try: + return topo_ordering.index(obj) + except ValueError: + return -1 + + return _generated_toposort_key From 1869c2e51b75801cb2853d6bea48982c0ee57355 Mon Sep 17 00:00:00 2001 From: Katherine Mantel Date: Sat, 5 Aug 2023 01:15:13 +0000 Subject: [PATCH 5/5] condition: support graph-scheduler graph structure conditions - modify Scheduler wrappers to support both basic and structural Conditions - add wrappers for new graph_scheduler.Scheduler methods: - remove_condition - add_graph_edge - remove_graph_edge - update requirements to graph-scheduler<1.3.0 to include graph structure conditions release --- psyneulink/core/scheduling/condition.py | 118 +++++++++++++------ psyneulink/core/scheduling/scheduler.py | 63 +++++++--- requirements.txt | 2 +- tests/composition/test_composition.py | 19 ++- tests/mdf/test_mdf.py | 3 +- tests/scheduling/conftest.py | 22 ++++ tests/scheduling/test_scheduler.py | 146 +++++++++++++++++++++++- 7 files changed, 322 insertions(+), 51 deletions(-) diff --git a/psyneulink/core/scheduling/condition.py b/psyneulink/core/scheduling/condition.py index 3ecf1d34933..06b203d065d 100644 --- a/psyneulink/core/scheduling/condition.py +++ b/psyneulink/core/scheduling/condition.py @@ -10,7 +10,6 @@ import collections import copy -import functools import inspect import numbers import warnings @@ -23,12 +22,44 @@ from psyneulink.core.globals.mdf import MDFSerializable from psyneulink.core.globals.keywords import MODEL_SPEC_ID_TYPE, comparison_operators from psyneulink.core.globals.parameters import parse_context -from psyneulink.core.globals.utilities import parse_valid_identifier +from psyneulink.core.globals.utilities import parse_valid_identifier, toposort_key __all__ = copy.copy(graph_scheduler.condition.__all__) __all__.extend(['Threshold']) +# avoid restricting graph_scheduler versions for this code +# ConditionBase was introduced with graph structure conditions +gs_condition_base_class = graph_scheduler.condition.Condition +condition_class_parents = [graph_scheduler.condition.Condition] + + +try: + gs_condition_base_class = graph_scheduler.condition.ConditionBase +except AttributeError: + pass +else: + class ConditionBase(graph_scheduler.condition.ConditionBase, MDFSerializable): + def as_mdf_model(self): + raise graph_scheduler.ConditionError( + f'MDF support not yet implemented for {type(self)}' + ) + condition_class_parents.append(ConditionBase) + + +try: + graph_scheduler.condition.GraphStructureCondition +except AttributeError: + graph_structure_conditions_available = False + gsc_unavailable_message = ( + 'Graph structure conditions are not available' + f'in your installed graph-scheduler v{graph_scheduler.__version__}' + ) +else: + graph_structure_conditions_available = True + gsc_unavailable_message = '' + + def _create_as_pnl_condition(condition): import psyneulink as pnl @@ -38,12 +69,24 @@ def _create_as_pnl_condition(condition): return condition # already a pnl Condition - if isinstance(condition, Condition): + if isinstance(condition, pnl_condition_base_class): return condition - if not issubclass(pnl_class, graph_scheduler.Condition): + if not issubclass(pnl_class, gs_condition_base_class): return None + if ( + graph_structure_conditions_available + and isinstance(condition, graph_scheduler.condition.GraphStructureCondition) + ): + try: + return pnl_class( + *condition.nodes, + **{k: v for k, v in condition.kwargs.items() if k != 'nodes'} + ) + except AttributeError: + return pnl_class(**condition.kwargs) + new_args = [_create_as_pnl_condition(a) or a for a in condition.args] new_kwargs = {k: _create_as_pnl_condition(v) or v for k, v in condition.kwargs.items()} sig = inspect.signature(pnl_class) @@ -58,7 +101,7 @@ def _create_as_pnl_condition(condition): return res -class Condition(graph_scheduler.Condition, MDFSerializable): +class Condition(*condition_class_parents, MDFSerializable): @handle_external_context() def is_satisfied(self, *args, context=None, execution_id=None, **kwargs): if execution_id is None: @@ -81,7 +124,7 @@ def as_mdf_model(self): def _parse_condition_arg(arg): if isinstance(arg, Component): return parse_valid_identifier(arg.name) - elif isinstance(arg, graph_scheduler.Condition): + elif isinstance(arg, Condition): return arg.as_mdf_model() elif arg is None or isinstance(arg, numbers.Number): return arg @@ -112,7 +155,7 @@ def _parse_condition_arg(arg): for a in self.args: if isinstance(a, Component): a = parse_valid_identifier(a.name) - elif isinstance(a, graph_scheduler.Condition): + elif isinstance(a, Condition): a = a.as_mdf_model() args_list.append(a) extra_args[name] = args_list @@ -139,40 +182,47 @@ def _parse_condition_arg(arg): # below produces psyneulink versions of each Condition class so that # they are compatible with the extra changes made in Condition above # (the scheduler does not handle Context objects or mdf/json export) -cond_dependencies = {} +gs_class_dependencies = {} +gs_classes_to_copy_as_pnl = [] pnl_conditions_module = locals() # inserting into locals defines the classes +pnl_condition_base_class = pnl_conditions_module[gs_condition_base_class.__name__] + +for class_name in graph_scheduler.condition.__dict__: + cls_ = getattr(graph_scheduler.condition, class_name) + if inspect.isclass(cls_): + # don't substitute classes explicitly defined above + if class_name not in pnl_conditions_module: + if issubclass(cls_, gs_condition_base_class): + gs_classes_to_copy_as_pnl.append(class_name) + else: + pnl_conditions_module[class_name] = cls_ -for cond_name in graph_scheduler.condition.__all__: - sched_module_cond_obj = getattr(graph_scheduler.condition, cond_name) - cond_dependencies[cond_name] = {c.__name__ for c in sched_module_cond_obj.__mro__ if c.__name__ != cond_name} + gs_class_dependencies[class_name] = { + c.__name__ for c in cls_.__mro__ if c.__name__ != class_name + } # iterate in order such that superclass types are before subclass types for cond_name in sorted( - graph_scheduler.condition.__all__, - key=functools.cmp_to_key(lambda a, b: -1 if b in cond_dependencies[a] else 1) + gs_classes_to_copy_as_pnl, + key=toposort_key(gs_class_dependencies) ): - # don't substitute Condition because it is explicitly defined above - if cond_name == 'Condition': - continue - sched_module_cond_obj = getattr(graph_scheduler.condition, cond_name) - if ( - inspect.isclass(sched_module_cond_obj) - and issubclass(sched_module_cond_obj, graph_scheduler.Condition) - ): - new_mro = [] - for cls_ in sched_module_cond_obj.__mro__: - if cls_ is not graph_scheduler.Condition: - try: - new_mro.append(pnl_conditions_module[cls_.__name__]) - - except KeyError: - new_mro.append(cls_) - else: - new_mro.extend(Condition.__mro__[:-1]) - pnl_conditions_module[cond_name] = type(cond_name, tuple(new_mro), {}) - elif isinstance(sched_module_cond_obj, type): - pnl_conditions_module[cond_name] = sched_module_cond_obj + new_bases = [] + for cls_ in sched_module_cond_obj.__mro__: + try: + new_bases.append(pnl_conditions_module[cls_.__name__]) + except KeyError: + new_bases.append(cls_) + if cls_ is gs_condition_base_class: + break + + new_meta = type(new_bases[0]) + if new_meta is not type: + pnl_conditions_module[cond_name] = new_meta( + cond_name, tuple(new_bases), {'__module__': Condition.__module__} + ) + else: + pnl_conditions_module[cond_name] = type(cond_name, tuple(new_bases), {}) pnl_conditions_module[cond_name].__doc__ = sched_module_cond_obj.__doc__ diff --git a/psyneulink/core/scheduling/scheduler.py b/psyneulink/core/scheduling/scheduler.py index 5802df5b191..5413d257668 100644 --- a/psyneulink/core/scheduling/scheduler.py +++ b/psyneulink/core/scheduling/scheduler.py @@ -10,15 +10,17 @@ import copy import logging import typing +from typing import Hashable import graph_scheduler import pint +import psyneulink as pnl from psyneulink import _unit_registry from psyneulink.core.globals.context import Context, handle_external_context from psyneulink.core.globals.mdf import MDFSerializable from psyneulink.core.globals.utilities import parse_valid_identifier -from psyneulink.core.scheduling.condition import _create_as_pnl_condition +from psyneulink.core.scheduling.condition import _create_as_pnl_condition, graph_structure_conditions_available, gsc_unavailable_message __all__ = [ 'Scheduler', 'SchedulingMode' @@ -50,6 +52,7 @@ def __init__( graph = composition.graph_processing.prune_feedback_edges()[0] if default_execution_id is None: default_execution_id = composition.default_execution_id + self.composition = composition # TODO: consider integrating something like this into graph-scheduler? self._user_specified_conds = graph_scheduler.ConditionSet() @@ -77,8 +80,15 @@ def replace_term_conds(term_conds): def _validate_conditions(self): unspecified_nodes = [] + + # pre-graph-structure-condition compatibility + try: + conditions_basic = self.conditions.conditions_basic + except AttributeError: + conditions_basic = self.conditions.conditions + for node in self.nodes: - if node not in self.conditions: + if node not in conditions_basic: dependencies = list(self.dependency_dict[node]) if len(dependencies) == 0: cond = graph_scheduler.Always() @@ -90,12 +100,8 @@ def _validate_conditions(self): # TODO: replace this call in graph-scheduler if adding _user_specified_conds self._add_condition(node, cond) unspecified_nodes.append(node) - if len(unspecified_nodes) > 0: - logger.info( - 'These nodes have no Conditions specified, and will be scheduled with conditions: {0}'.format( - {node: self.conditions[node] for node in unspecified_nodes} - ) - ) + + super()._validate_conditions() def add_condition(self, owner, condition): self._user_specified_conds.add_condition(owner, condition) @@ -105,22 +111,37 @@ def _add_condition(self, owner, condition): condition = _create_as_pnl_condition(condition) super().add_condition(owner, condition) + if graph_structure_conditions_available: + if isinstance(condition, pnl.GraphStructureCondition): + self.composition._analyze_graph() + def add_condition_set(self, conditions): self._user_specified_conds.add_condition_set(conditions) self._add_condition_set(conditions) def _add_condition_set(self, conditions): - try: - conditions = conditions.conditions - except AttributeError: - pass - conditions = { node: _create_as_pnl_condition(conditions[node]) for node in conditions } super().add_condition_set(conditions) + def remove_condition(self, owner_or_condition): + try: + res = super().remove_condition(owner_or_condition) + except AttributeError as e: + if "has no attribute 'remove_condition'" in str(e): + raise graph_scheduler.SchedulerError( + f'remove_condition unavailable in your installed graph-scheduler v{graph_scheduler.__version__}' + ) + else: + raise + else: + if isinstance(res, pnl.GraphStructureCondition): + self.composition._analyze_graph() + + return res + @graph_scheduler.Scheduler.termination_conds.setter def termination_conds(self, termination_conds): if termination_conds is not None: @@ -160,6 +181,22 @@ def as_mdf_model(self): def get_clock(self, context): return super().get_clock(context.execution_id) + def add_graph_edge(self, sender: Hashable, receiver: Hashable) -> 'pnl.AddEdgeTo': + if not graph_structure_conditions_available: + raise graph_scheduler.SchedulerError(gsc_unavailable_message) + + cond = pnl.AddEdgeTo(receiver) + self.add_condition(sender, cond) + return cond + + def remove_graph_edge(self, sender: Hashable, receiver: Hashable) -> 'pnl.RemoveEdgeFrom': + if not graph_structure_conditions_available: + raise graph_scheduler.SchedulerError(gsc_unavailable_message) + + cond = pnl.RemoveEdgeFrom(sender) + self.add_condition(receiver, cond) + return cond + _doc_subs = { None: [ diff --git a/requirements.txt b/requirements.txt index e557eda4aff..2106c31f7b4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,7 +2,7 @@ autograd<1.7 beartype<0.16.0 dill<0.3.8 fastkde>=1.0.24, <1.0.31 -graph-scheduler>=1.1.1, <1.1.3 +graph-scheduler>=1.1.1, <1.3.0 graphviz<0.21.0 grpcio<1.60.0 leabra-psyneulink<0.3.3 diff --git a/tests/composition/test_composition.py b/tests/composition/test_composition.py index 313e4b97132..40ce2f5beea 100644 --- a/tests/composition/test_composition.py +++ b/tests/composition/test_composition.py @@ -40,7 +40,7 @@ NAME, PROJECTIONS, RESULT, OBJECTIVE_MECHANISM, OUTPUT_MECHANISM, OVERRIDE, PARAMS, SLOPE, TARGET_MECHANISM, VARIABLE, VARIANCE) -from psyneulink.core.scheduling.condition import AtTimeStep, AtTrial, Never, TimeInterval +from psyneulink.core.scheduling.condition import AtTimeStep, AtTrial, Never, TimeInterval, graph_structure_conditions_available, gsc_unavailable_message from psyneulink.core.scheduling.condition import EveryNCalls from psyneulink.core.scheduling.scheduler import Scheduler, SchedulingMode from psyneulink.core.scheduling.time import TimeScale @@ -7671,6 +7671,23 @@ def test_feedback_projection_added_by_pathway(self): B: {NodeRole.TERMINAL, NodeRole.OUTPUT, NodeRole.FEEDBACK_SENDER}, } + @pytest.mark.skipif( + not graph_structure_conditions_available, + reason=gsc_unavailable_message + ) + def test_graph_structure_condition_role_changes(self): + A = pnl.ProcessingMechanism(name='A') + B = pnl.ProcessingMechanism(name='B') + C = pnl.ProcessingMechanism(name='C') + + comp = Composition(pathways=[A, B, C]) + + comp.scheduler.add_condition(C, pnl.BeforeNode(A)) + + assert comp.nodes_to_roles[A] == {NodeRole.INPUT} + assert comp.nodes_to_roles[B] == {NodeRole.INTERNAL, NodeRole.TERMINAL} + assert comp.nodes_to_roles[C] == {NodeRole.OUTPUT, NodeRole.ORIGIN} + class TestMisc: diff --git a/tests/mdf/test_mdf.py b/tests/mdf/test_mdf.py index 4fb7332ab31..a8ae9b7ddde 100644 --- a/tests/mdf/test_mdf.py +++ b/tests/mdf/test_mdf.py @@ -303,7 +303,8 @@ def test_mdf_equivalence_individual_functions(mech_type, function, runtime_param trial_termination_cond = eval(trial_termination_cond) except TypeError: pass - comp.scheduler.termination_conds = {pnl.TimeScale.TRIAL: trial_termination_cond} + if trial_termination_cond is not None: + comp.scheduler.termination_conds = {pnl.TimeScale.TRIAL: trial_termination_cond} comp.run(inputs={A: [[1.0]]}, runtime_params=eval(runtime_params)) diff --git a/tests/scheduling/conftest.py b/tests/scheduling/conftest.py index 7215ab298bc..958d8caa074 100644 --- a/tests/scheduling/conftest.py +++ b/tests/scheduling/conftest.py @@ -34,3 +34,25 @@ def three_node_linear_composition(): comp.add_linear_processing_pathway([A, B, C]) return comp.nodes, comp + + +@pytest.helpers.register +def composition_from_string_pathways(pathways): + mechanisms = {} + pathways_as_mechs = [] + + for p in pathways: + p_as_mechs = [] + assert not isinstance(p, str), 'pathways must be a list of lists' + for m in p: + try: + mech = mechanisms[m] + except KeyError: + mech = pnl.ProcessingMechanism(name=m) + mechanisms[m] = mech + p_as_mechs.append(mech) + pathways_as_mechs.append(p_as_mechs) + + comp = pnl.Composition(pathways=pathways_as_mechs) + + return comp, mechanisms, mechanisms.values() diff --git a/tests/scheduling/test_scheduler.py b/tests/scheduling/test_scheduler.py index ad145ad9808..07feba1fcad 100644 --- a/tests/scheduling/test_scheduler.py +++ b/tests/scheduling/test_scheduler.py @@ -1,5 +1,6 @@ import fractions import logging +import graph_scheduler import numpy as np import psyneulink as pnl import pytest @@ -12,7 +13,7 @@ from psyneulink.core.components.projections.pathway.mappingprojection import MappingProjection from psyneulink.core.compositions.composition import Composition, EdgeType from psyneulink.core.globals.keywords import VALUE -from psyneulink.core.scheduling.condition import AfterNCalls, AfterNPasses, AfterNTrials, AfterPass, All, AllHaveRun, Always, Any, AtPass, BeforeNCalls, BeforePass, EveryNCalls, EveryNPasses, JustRan, TimeInterval, WhenFinished, Never +from psyneulink.core.scheduling.condition import AfterNCalls, AfterNPasses, AfterNTrials, AfterPass, All, AllHaveRun, Always, Any, AtPass, BeforeNCalls, BeforePass, EveryNCalls, EveryNPasses, JustRan, TimeInterval, WhenFinished, Never, graph_structure_conditions_available, gsc_unavailable_message from psyneulink.core.scheduling.scheduler import Scheduler from psyneulink.core.scheduling.time import TimeScale from psyneulink.library.components.mechanisms.processing.integrator.ddm import DDM @@ -20,6 +21,18 @@ logger = logging.getLogger(__name__) +# mock graph structure conditions if not present (this causes errors on +# collect due to being used in parametrizations, although the tests +# using them will be skipped) +if not graph_structure_conditions_available: + # only the conditions used in parametrization below + mock_gs_conditions = [ + 'AfterNodes', 'BeforeNode', 'BeforeNodes', + ] + for c in mock_gs_conditions: + setattr(pnl, c, lambda *args, **kwargs: None) + + class TestScheduler: @classmethod def setup_class(self): @@ -1685,3 +1698,134 @@ def test_absolute_interval_linear(self, three_node_linear_composition, condition comp.scheduler.add_condition(eval(node), conditions[node]) assert comp.scheduler._get_absolute_consideration_set_execution_unit() == interval + + +@pytest.mark.skipif( + not graph_structure_conditions_available, + reason=gsc_unavailable_message +) +class TestGraphStructureConditions: + @pytest.mark.parametrize('add_method', ['add_graph_edge', 'add_condition_AddEdgeTo']) + @pytest.mark.parametrize('remove_method', ['remove_graph_edge', 'add_condition_RemoveEdgeFrom']) + def test_add_graph_structure_conditions(self, add_method, remove_method): + def add_condition(owner, condition): + if isinstance(condition, pnl.AddEdgeTo) and add_method == 'add_graph_edge': + return scheduler.add_graph_edge(owner, condition.node) + elif isinstance(condition, pnl.RemoveEdgeFrom) and remove_method == 'remove_graph_edge': + return scheduler.remove_graph_edge(condition.node, owner) + else: + scheduler.add_condition(owner, condition) + return condition + + comp, _, mechanisms = pytest.helpers.composition_from_string_pathways( + [['A', 'B', 'C', 'D', 'E']] + ) + A, B, C, D, E = mechanisms + scheduler = comp.scheduler + initial_conds = {A: pnl.AddEdgeTo(C)} + initial_graph = scheduler.graph + scheduler.add_condition_set(initial_conds) + + assert scheduler.dependency_dict == { + **initial_graph, + **{C: {A, B}}, + } + assert len(scheduler._graphs) == 2 + assert scheduler._graphs[0] == initial_graph + + addl_conditions = [ + (B, pnl.AddEdgeTo(D)), + (B, pnl.AddEdgeTo(E)), + (C, pnl.AddEdgeTo(E)), + (E, pnl.RemoveEdgeFrom(B)), + (D, pnl.RemoveEdgeFrom(B)), + ] + + for i, (owner, cond) in enumerate(addl_conditions): + added_cond = add_condition(owner, cond) + addl_conditions[i] = (owner, added_cond) + + assert scheduler.dependency_dict == { + A: set(), + B: {A}, + C: {A, B}, + D: {C}, + E: {C, D}, + } + assert scheduler._last_handled_structural_condition_order == ( + [initial_conds[A]] + [c[1] for c in addl_conditions] + ) + + # take only the first three elements in addl_conditions + addl_conds_sub_idx = 3 + scheduler.conditions = pnl.ConditionSet({ + **{ + k: [ + addl_conditions[i][1] for i in range(addl_conds_sub_idx) + if addl_conditions[i][0] == k + ] + for k in mechanisms + }, + A: initial_conds[A], + }) + assert scheduler.dependency_dict == { + A: set(), + B: {A}, + C: {A, B}, + D: {B, C}, + E: {B, C, D}, + } + assert scheduler._last_handled_structural_condition_order == ( + [initial_conds[A]] + [c[1] for c in addl_conditions[:addl_conds_sub_idx]] + ) + + @pytest.mark.parametrize( + 'pathways, conditions, expected_output', + [ + ([['A', 'B', 'C']], {'C': pnl.BeforeNode('A')}, [{'C'}, {'A'}, {'B'}]), + ([['A', 'B', 'C']], {'B': pnl.AfterNodes('C')}, [{'A'}, {'C'}, {'B'}]), + ([['A', 'B', 'D'], ['C', 'D']], {'D': pnl.BeforeNodes('A', 'C')}, [{'D'}, {'A', 'C'}, {'B'}]), + ] + ) + def test_run_graph_structure_conditions(self, pathways, conditions, expected_output): + comp, mechanisms, _ = pytest.helpers.composition_from_string_pathways(pathways) + comp.scheduler.add_condition_set( + { + mechanisms[owner]: type(cond)(*[mechanisms[n] for n in cond.nodes]) + for owner, cond in conditions.items() + } + ) + comp.run({n: [0] for n in mechanisms.values() if len(comp.scheduler._graphs[0][n]) == 0}) + output = comp.scheduler.execution_list[comp.default_execution_id] + + assert output == [{mechanisms[n] for n in eset} for eset in expected_output] + + def test_gsc_creates_cyclic_graph(self): + comp, _, (A, B, C) = pytest.helpers.composition_from_string_pathways([['A', 'B', 'C']]) + comp.scheduler.add_condition(B, pnl.EveryNCalls(A, 1)) + comp.scheduler.add_condition(B, pnl.AfterNode(C)) + with pytest.warns(UserWarning, match=r'for \(ProcessingMechanism B\) creates a cycle:'): + comp.scheduler.add_condition(B, pnl.BeforeNode(A, prune_cycles=False)) + + # If _build_consideration_queue failure not explicitly detected + # and handled while adding BeforeNode(A) for B, the new + # modified cyclic graph is pushed but the condition is not + # added, resulting in incorrect state of scheduler._graphs. + # Assert this doesn't happen. + assert len(comp.scheduler._graphs) == 3 + assert len(comp.scheduler.conditions.structural_condition_order) == 2 + + with pytest.raises(graph_scheduler.SchedulerError, match='contains a cycle'): + comp.run({A: [0]}) + + def test_gsc_exact_time_warning(self): + A = pnl.ProcessingMechanism(name='A') + B = pnl.ProcessingMechanism(name='B') + comp = Composition(pathways=[[A], [B]]) + comp.scheduler.add_condition(A, pnl.AfterNode(B)) + + with pytest.warns( + UserWarning, + match='In exact time mode, graph structure conditions will have no effect' + ): + comp.run(scheduling_mode=pnl.SchedulingMode.EXACT_TIME)