From e950b201bb1c051476999db6f203f82a6bfde98c Mon Sep 17 00:00:00 2001 From: josh-deb Date: Fri, 18 Oct 2024 18:04:54 -0700 Subject: [PATCH 1/6] feat(timer): adding timer utility --- beams/sequencer/helpers/Timer.py | 38 ++++++++++++++++++++++++++++++++ beams/tests/test_timer.py | 34 ++++++++++++++++++++++++++++ 2 files changed, 72 insertions(+) create mode 100644 beams/sequencer/helpers/Timer.py create mode 100644 beams/tests/test_timer.py diff --git a/beams/sequencer/helpers/Timer.py b/beams/sequencer/helpers/Timer.py new file mode 100644 index 0000000..0bee866 --- /dev/null +++ b/beams/sequencer/helpers/Timer.py @@ -0,0 +1,38 @@ +import time + + +class Timer(): + def __init__(self, + name: str, + timer_period_seconds: float, + auto_start: bool = False, + is_periodic: bool = False): + self.name = name + self.timer_period_seconds = timer_period_seconds + self.is_periodic = is_periodic + self.auto_start = auto_start + if (self.auto_start): + self.timer_start_time = time.time() + else: + self.timer_start_time = -1 + + def start_timer(self) -> None: + self.timer_start_time = time.time() + + def check_valid_timer(self) -> bool: + if (self.timer_start_time == -1): + raise RuntimeError(f"{self.name} timer checked but not started") + + def is_elapsed(self) -> bool: + elapsed = self.get_elapsed() + if (elapsed > self.timer_period_seconds): + if (self.is_periodic): + self.timer_start_time = time.time() + return True + else: + return False + + def get_elapsed(self) -> float: + self.check_valid_timer() + now = time.time() + return now - self.timer_start_time diff --git a/beams/tests/test_timer.py b/beams/tests/test_timer.py new file mode 100644 index 0000000..ac12f21 --- /dev/null +++ b/beams/tests/test_timer.py @@ -0,0 +1,34 @@ +import time +import pytest +from beams.sequencer.helpers.Timer import Timer + + +class TestTimer(): + def test_elapsed(self): + t = Timer(name="test_elapsed", + timer_period_seconds=0.1, + is_periodic=False) + t.start_timer() + assert t.is_elapsed() is False + time.sleep(0.5) + assert t.is_elapsed() is True + + def test_timer_error(self): + t = Timer(name="test_error_not_started", + timer_period_seconds=0.1, + is_periodic=False) + with pytest.raises(RuntimeError): + t.get_elapsed() + with pytest.raises(RuntimeError): + t.is_elapsed() + + def test_periodic(self): + t = Timer(name="test_error_not_started", + timer_period_seconds=0.1, + auto_start=True, + is_periodic=True) + time.sleep(0.2) + assert t.is_elapsed() is True + assert t.is_elapsed() is False + time.sleep(0.1) + assert t.is_elapsed() is True From 0e2f7e94b235c150b4f3432d3d14e5eea3147d77 Mon Sep 17 00:00:00 2001 From: josh-deb Date: Fri, 18 Oct 2024 18:19:36 -0700 Subject: [PATCH 2/6] feat(ActionWorker, ActionNode, Timeout): moved wrapped_action_work to ActioNodeWorker, added functional timeout ability, tested --- beams/behavior_tree/ActionNode.py | 60 ++--------------------- beams/behavior_tree/ActionWorker.py | 75 +++++++++++++++++++++++++++-- beams/tests/test_leaf_node.py | 29 +++++++++++ 3 files changed, 103 insertions(+), 61 deletions(-) diff --git a/beams/behavior_tree/ActionNode.py b/beams/behavior_tree/ActionNode.py index 2300dfd..0599aa8 100644 --- a/beams/behavior_tree/ActionNode.py +++ b/beams/behavior_tree/ActionNode.py @@ -1,71 +1,17 @@ import atexit import logging import os -import time -from multiprocessing import Event, Queue, Value -from typing import Callable +from multiprocessing import Event import py_trees -from beams.behavior_tree.ActionWorker import ActionWorker +from beams.behavior_tree.ActionWorker import ActionWorker, wrapped_action_work # the latter is grabbed as a pas through from beams.behavior_tree.VolatileStatus import VolatileStatus -from beams.typing_helper import (ActionNodeWorkFunction, ActionNodeWorkLoop, - Evaluatable) +from beams.typing_helper import ActionNodeWorkLoop, Evaluatable logger = logging.getLogger(__name__) -def wrapped_action_work(loop_period_sec: float = 0.1): - def action_worker_work_function_generator(func: ActionNodeWorkFunction) -> ActionNodeWorkLoop: - def work_wrapper( - do_work: Value, - name: str, - work_gate: Event, - volatile_status: VolatileStatus, - completion_condition: Evaluatable, - log_queue: Queue, - log_configurer: Callable) -> None: - """ - Wrap self.work_func, and set up logging / status communication - InterProcess Communication performed by shared memory objects: - - volatile status - - logging queue - - Runs a persistent while loop, in which the work func is called repeatedly - """ - log_configurer(log_queue) - while (do_work.value): - logger.debug(f"WAITING FOR INIT from node: {name}") - work_gate.wait() - work_gate.clear() - - # Set to running - volatile_status.set_value(py_trees.common.Status.RUNNING) - while not completion_condition(): - logger.debug(f"CALLING CAGET FROM from node ({name})") - try: - status = func(completion_condition) - except Exception as ex: - volatile_status.set_value(py_trees.common.Status.FAILURE) - logger.error(f"Work function failed, setting node ({name}) " - f"as FAILED. ({ex})") - break - - volatile_status.set_value(status) - logger.debug(f"Setting node ({name}): {volatile_status.get_value()}") - time.sleep(loop_period_sec) - - # one last check - if completion_condition(): - volatile_status.set_value(py_trees.common.Status.SUCCESS) - else: - volatile_status.set_value(py_trees.common.Status.FAILURE) - - logger.debug(f"Worker for node ({name}) completed.") - return work_wrapper - return action_worker_work_function_generator - - class ActionNode(py_trees.behaviour.Behaviour): def __init__( self, diff --git a/beams/behavior_tree/ActionWorker.py b/beams/behavior_tree/ActionWorker.py index da41f8f..e68b940 100644 --- a/beams/behavior_tree/ActionWorker.py +++ b/beams/behavior_tree/ActionWorker.py @@ -11,14 +11,81 @@ * LOGGER_QUEUE: instance of the logging queue * worker_logging_configurer: utility functuon to register log queue with handler """ -from multiprocessing import Event -from typing import Any, Callable, Optional +import logging +import time +from multiprocessing import Event, Value, Queue +from typing import Callable, Optional from epics.multiproc import CAProcess +import py_trees + from beams.behavior_tree.VolatileStatus import VolatileStatus from beams.logging import LOGGER_QUEUE, worker_logging_configurer +from beams.typing_helper import (ActionNodeWorkFunction, ActionNodeWorkLoop, + Evaluatable) from beams.sequencer.helpers.Worker import Worker +from beams.sequencer.helpers.Timer import Timer + +logger = logging.getLogger(__name__) + + +def wrapped_action_work(loop_period_sec: float = 0.1, work_function_timeout_period_sec: float = 2): + def action_worker_work_function_generator(func: ActionNodeWorkFunction) -> ActionNodeWorkLoop: + def work_wrapper( + do_work: Value, + name: str, + work_gate: Event, + volatile_status: VolatileStatus, + completion_condition: Evaluatable, + log_queue: Queue, + log_configurer: Callable) -> None: + """ + Wrap self.work_func, and set up logging / status communication + InterProcess Communication performed by shared memory objects: + - volatile status + - logging queue + + Runs a persistent while loop, in which the work func is called repeatedly + """ + log_configurer(log_queue) + work_loop_timeout_timer = Timer(name=name, + timer_period_seconds=work_function_timeout_period_sec, + auto_start=False, + is_periodic=True) + while (do_work.value): + logger.debug(f"WAITING FOR INIT from node: {name}") + work_gate.wait() + work_gate.clear() + + # Set to running + volatile_status.set_value(py_trees.common.Status.RUNNING) + # Start timer + work_loop_timeout_timer.start_timer() + while not completion_condition() and not work_loop_timeout_timer.is_elapsed(): + logger.debug(f"CALLING CAGET FROM from node ({name})") + try: + status = func(completion_condition) + except Exception as ex: + volatile_status.set_value(py_trees.common.Status.FAILURE) + logger.error(f"Work function failed, setting node ({name}) " + f"as FAILED. ({ex})") + break + + volatile_status.set_value(status) + logger.debug(f"Setting node ({name}): {volatile_status.get_value()}") + time.sleep(loop_period_sec) + + # check if we exited loop because we timed out or we succeeded at task + if completion_condition(): + logger.debug(f"Worker for node ({name}) completed.") + volatile_status.set_value(py_trees.common.Status.SUCCESS) + else: + logger.debug(f"Worker for node ({name}) failed.") + volatile_status.set_value(py_trees.common.Status.FAILURE) + + return work_wrapper + return action_worker_work_function_generator class ActionWorker(Worker): @@ -27,8 +94,8 @@ def __init__( proc_name: str, volatile_status: VolatileStatus, work_gate: Event, - work_func: Callable[[Any], None], - comp_cond: Callable[[Any], bool], + work_func: Callable[..., None], + comp_cond: Callable[..., bool], stop_func: Optional[Callable[[None], None]] = None ): super().__init__( diff --git a/beams/tests/test_leaf_node.py b/beams/tests/test_leaf_node.py index 674ef7d..a834194 100644 --- a/beams/tests/test_leaf_node.py +++ b/beams/tests/test_leaf_node.py @@ -35,6 +35,35 @@ def comp_cond(): assert percentage_complete.value == 100 +def test_action_node_timeout(): + # For test + percentage_complete = Value("i", 0) + + @wrapped_action_work(loop_period_sec=0.001, work_function_timeout_period_sec=.002) + def work_func(comp_condition: Callable) -> Status: + percentage_complete.value += 10 + if comp_condition(): + return Status.SUCCESS + logger.debug(f"pct complete -> {percentage_complete.value}") + return Status.RUNNING + + def comp_cond(): + return percentage_complete.value >= 100 + + action = ActionNode(name="action", work_func=work_func, + completion_condition=comp_cond) + action.setup() + + while action.status not in ( + Status.SUCCESS, + Status.FAILURE, + ): + time.sleep(0.01) + action.tick_once() + assert action.status == Status.FAILURE + assert percentage_complete.value != 100 + + def test_condition_node(): def condition_fn(): return True From eb0dd1389e25b84ef779bcf3e59d903aa41c03dd Mon Sep 17 00:00:00 2001 From: josh-deb Date: Fri, 18 Oct 2024 18:26:59 -0700 Subject: [PATCH 3/6] pre-commit --- beams/behavior_tree/ActionNode.py | 3 ++- beams/behavior_tree/ActionWorker.py | 9 ++++----- beams/sequencer/helpers/Timer.py | 4 ++-- beams/tests/test_timer.py | 2 ++ 4 files changed, 10 insertions(+), 8 deletions(-) diff --git a/beams/behavior_tree/ActionNode.py b/beams/behavior_tree/ActionNode.py index 0599aa8..647d3d0 100644 --- a/beams/behavior_tree/ActionNode.py +++ b/beams/behavior_tree/ActionNode.py @@ -5,7 +5,8 @@ import py_trees -from beams.behavior_tree.ActionWorker import ActionWorker, wrapped_action_work # the latter is grabbed as a pas through +from beams.behavior_tree.ActionWorker import wrapped_action_work # noqa: F401 +from beams.behavior_tree.ActionWorker import ActionWorker from beams.behavior_tree.VolatileStatus import VolatileStatus from beams.typing_helper import ActionNodeWorkLoop, Evaluatable diff --git a/beams/behavior_tree/ActionWorker.py b/beams/behavior_tree/ActionWorker.py index e68b940..c560678 100644 --- a/beams/behavior_tree/ActionWorker.py +++ b/beams/behavior_tree/ActionWorker.py @@ -13,19 +13,18 @@ """ import logging import time -from multiprocessing import Event, Value, Queue +from multiprocessing import Event, Queue, Value from typing import Callable, Optional -from epics.multiproc import CAProcess - import py_trees +from epics.multiproc import CAProcess from beams.behavior_tree.VolatileStatus import VolatileStatus from beams.logging import LOGGER_QUEUE, worker_logging_configurer +from beams.sequencer.helpers.Timer import Timer +from beams.sequencer.helpers.Worker import Worker from beams.typing_helper import (ActionNodeWorkFunction, ActionNodeWorkLoop, Evaluatable) -from beams.sequencer.helpers.Worker import Worker -from beams.sequencer.helpers.Timer import Timer logger = logging.getLogger(__name__) diff --git a/beams/sequencer/helpers/Timer.py b/beams/sequencer/helpers/Timer.py index 0bee866..8e5c77c 100644 --- a/beams/sequencer/helpers/Timer.py +++ b/beams/sequencer/helpers/Timer.py @@ -3,9 +3,9 @@ class Timer(): def __init__(self, - name: str, + name: str, timer_period_seconds: float, - auto_start: bool = False, + auto_start: bool = False, is_periodic: bool = False): self.name = name self.timer_period_seconds = timer_period_seconds diff --git a/beams/tests/test_timer.py b/beams/tests/test_timer.py index ac12f21..9542b04 100644 --- a/beams/tests/test_timer.py +++ b/beams/tests/test_timer.py @@ -1,5 +1,7 @@ import time + import pytest + from beams.sequencer.helpers.Timer import Timer From 8b166968aceb4a9240e0ed441d02e82121def3ed Mon Sep 17 00:00:00 2001 From: josh-deb Date: Mon, 4 Nov 2024 12:00:18 -0800 Subject: [PATCH 4/6] wip --- beams/sequencer/helpers/Timer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beams/sequencer/helpers/Timer.py b/beams/sequencer/helpers/Timer.py index 8e5c77c..c1dbe1d 100644 --- a/beams/sequencer/helpers/Timer.py +++ b/beams/sequencer/helpers/Timer.py @@ -12,7 +12,7 @@ def __init__(self, self.is_periodic = is_periodic self.auto_start = auto_start if (self.auto_start): - self.timer_start_time = time.time() + self.timer_start_time = time.monotonic() else: self.timer_start_time = -1 From 6c507bb9c28a90fdb7a34038d6a04edcab8f6b39 Mon Sep 17 00:00:00 2001 From: josh-deb Date: Fri, 18 Oct 2024 19:40:37 -0700 Subject: [PATCH 5/6] wip: addressing issue #56 --- beams/tests/artifacts/im2l0_test.json | 5 +++- beams/tree_config.py | 36 +++++++++++++++++++++++++-- 2 files changed, 38 insertions(+), 3 deletions(-) diff --git a/beams/tests/artifacts/im2l0_test.json b/beams/tests/artifacts/im2l0_test.json index f6e442a..6a71020 100644 --- a/beams/tests/artifacts/im2l0_test.json +++ b/beams/tests/artifacts/im2l0_test.json @@ -24,6 +24,7 @@ "pv": "IM2L0:XTES:MMS:STATE:GET_RBV", "value": "OUT", "loop_period_sec": 0.01, + "caput_paradigm": "EVERY_SECOND", "termination_check": { "ConditionItem": { "name": "check_reticule_state", @@ -55,6 +56,7 @@ "pv": "IM2L0:XTES:CLZ.RBV", "value": 25, "loop_period_sec": 0.01, + "caput_paradigm": "EVERY_SECOND", "termination_check": { "ConditionItem": { "name": "check_zoom_motor", @@ -86,6 +88,7 @@ "pv": "IM2L0:XTES:CLF.RBV", "value": 50, "loop_period_sec": 0.01, + "caput_paradigm": "ONCE", "termination_check": { "ConditionItem": { "name": "check_focus_motor", @@ -101,4 +104,4 @@ ] } } -} +} \ No newline at end of file diff --git a/beams/tree_config.py b/beams/tree_config.py index ba819a4..bcdecd4 100644 --- a/beams/tree_config.py +++ b/beams/tree_config.py @@ -20,6 +20,7 @@ from beams.behavior_tree.ActionNode import ActionNode, wrapped_action_work from beams.behavior_tree.CheckAndDo import CheckAndDo from beams.behavior_tree.ConditionNode import ConditionNode +from beams.sequencer.helpers.Timer import Timer from beams.serialization import as_tagged_union from beams.typing_helper import Evaluatable @@ -200,6 +201,12 @@ def cond_func(): return cond_func +class CaputParadigm(str, Enum): + AT_LOOP_FREQUENCY = "AT_LOOP_FREQUENCY" + ONCE = "ONCE" + EVERY_SECOND = "EVERY_SECOND" + + @dataclass class SequenceConditionItem(BaseSequenceItem, BaseConditionItem): """ @@ -237,23 +244,48 @@ class SetPVActionItem(BaseItem): pv: str = "" value: Any = 1 loop_period_sec: float = 1.0 + caput_paradigm: CaputParadigm = CaputParadigm.AT_LOOP_FREQUENCY termination_check: BaseConditionItem = field(default_factory=ConditionItem) + def __post_init__(self): + is_timer_periodic = True + timer_period = 1 + if (self.caput_paradigm == CaputParadigm.ONCE): + is_timer_periodic = False + if (self.caput_paradigm == CaputParadigm.AT_LOOP_FREQUENCY): + timer_period = self.loop_period_sec + if (self.caput_paradigm == CaputParadigm.EVERY_SECOND): + timer_period = 1 + + self.work_frequency_timer = Timer(name=f"{self.name}_caput_timer", + timer_period_seconds=timer_period, + auto_start=False, + is_periodic=is_timer_periodic) + self.do_once = False + def get_tree(self) -> ActionNode: @wrapped_action_work(self.loop_period_sec) def work_func(comp_condition: Evaluatable) -> py_trees.common.Status: try: + if not self.do_once: + self.work_frequency_timer.start_timer() + self.do_once = True + # Set to running - value = caget(self.pv) # double caget, this is uneeded as currently the comp_condition has caget baked in + value = caget(self.pv, as_string=isinstance(self.value, str)) # double caget, this is uneeded as currently the comp_condition has caget if comp_condition(): + self.do_once = False return py_trees.common.Status.SUCCESS logger.debug(f"{self.name}: Value is {value}") # specific caput logic to SetPVActionItem - caput(self.pv, self.value) + if (not self.work_frequency_timer.is_elapsed()): + caput(self.pv, self.value) + logger.debug(f"{self.name} CAPUTTING") + return py_trees.common.Status.RUNNING except Exception as ex: logger.warning(f"{self.name}: work failed: {ex}") From 9e7de8bb10131a53e15b42598d93b0e505f0529b Mon Sep 17 00:00:00 2001 From: josh-deb Date: Fri, 18 Oct 2024 19:49:23 -0700 Subject: [PATCH 6/6] pre-commit --- beams/tests/artifacts/im2l0_test.json | 2 +- beams/tree_config.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/beams/tests/artifacts/im2l0_test.json b/beams/tests/artifacts/im2l0_test.json index 6a71020..3250858 100644 --- a/beams/tests/artifacts/im2l0_test.json +++ b/beams/tests/artifacts/im2l0_test.json @@ -104,4 +104,4 @@ ] } } -} \ No newline at end of file +} diff --git a/beams/tree_config.py b/beams/tree_config.py index bcdecd4..1d5467d 100644 --- a/beams/tree_config.py +++ b/beams/tree_config.py @@ -274,7 +274,7 @@ def work_func(comp_condition: Evaluatable) -> py_trees.common.Status: self.do_once = True # Set to running - value = caget(self.pv, as_string=isinstance(self.value, str)) # double caget, this is uneeded as currently the comp_condition has caget + value = caget(self.pv, as_string=isinstance(self.value, str)) # double caget, this is uneeded as currently the comp_condition has caget if comp_condition(): self.do_once = False