Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wip: addressing issue #56, allowing for us to determine how often we caput #64

Draft
wants to merge 6 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 3 additions & 56 deletions beams/behavior_tree/ActionNode.py
Original file line number Diff line number Diff line change
@@ -1,71 +1,18 @@
import atexit
import logging
import os
import time
from multiprocessing import Event, Queue, Value
from typing import Callable
from multiprocessing import Event

import py_trees

from beams.behavior_tree.ActionWorker import wrapped_action_work # noqa: F401
from beams.behavior_tree.ActionWorker import ActionWorker
from beams.behavior_tree.VolatileStatus import VolatileStatus
from beams.typing_helper import (ActionNodeWorkFunction, ActionNodeWorkLoop,
Evaluatable)
from beams.typing_helper import ActionNodeWorkLoop, Evaluatable

logger = logging.getLogger(__name__)


def wrapped_action_work(loop_period_sec: float = 0.1):
def action_worker_work_function_generator(func: ActionNodeWorkFunction) -> ActionNodeWorkLoop:
def work_wrapper(
do_work: Value,
name: str,
work_gate: Event,
volatile_status: VolatileStatus,
completion_condition: Evaluatable,
log_queue: Queue,
log_configurer: Callable) -> None:
"""
Wrap self.work_func, and set up logging / status communication
InterProcess Communication performed by shared memory objects:
- volatile status
- logging queue

Runs a persistent while loop, in which the work func is called repeatedly
"""
log_configurer(log_queue)
while (do_work.value):
logger.debug(f"WAITING FOR INIT from node: {name}")
work_gate.wait()
work_gate.clear()

# Set to running
volatile_status.set_value(py_trees.common.Status.RUNNING)
while not completion_condition():
logger.debug(f"CALLING CAGET FROM from node ({name})")
try:
status = func(completion_condition)
except Exception as ex:
volatile_status.set_value(py_trees.common.Status.FAILURE)
logger.error(f"Work function failed, setting node ({name}) "
f"as FAILED. ({ex})")
break

volatile_status.set_value(status)
logger.debug(f"Setting node ({name}): {volatile_status.get_value()}")
time.sleep(loop_period_sec)

# one last check
if completion_condition():
volatile_status.set_value(py_trees.common.Status.SUCCESS)
else:
volatile_status.set_value(py_trees.common.Status.FAILURE)

logger.debug(f"Worker for node ({name}) completed.")
return work_wrapper
return action_worker_work_function_generator


class ActionNode(py_trees.behaviour.Behaviour):
def __init__(
self,
Expand Down
74 changes: 70 additions & 4 deletions beams/behavior_tree/ActionWorker.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,80 @@
* LOGGER_QUEUE: instance of the logging queue
* worker_logging_configurer: utility functuon to register log queue with handler
"""
from multiprocessing import Event
from typing import Any, Callable, Optional
import logging
import time
from multiprocessing import Event, Queue, Value
from typing import Callable, Optional

import py_trees
from epics.multiproc import CAProcess

from beams.behavior_tree.VolatileStatus import VolatileStatus
from beams.logging import LOGGER_QUEUE, worker_logging_configurer
from beams.sequencer.helpers.Timer import Timer
from beams.sequencer.helpers.Worker import Worker
from beams.typing_helper import (ActionNodeWorkFunction, ActionNodeWorkLoop,
Evaluatable)

logger = logging.getLogger(__name__)


def wrapped_action_work(loop_period_sec: float = 0.1, work_function_timeout_period_sec: float = 2):
def action_worker_work_function_generator(func: ActionNodeWorkFunction) -> ActionNodeWorkLoop:
def work_wrapper(
do_work: Value,
name: str,
work_gate: Event,
volatile_status: VolatileStatus,
completion_condition: Evaluatable,
log_queue: Queue,
log_configurer: Callable) -> None:
"""
Wrap self.work_func, and set up logging / status communication
InterProcess Communication performed by shared memory objects:
- volatile status
- logging queue

Runs a persistent while loop, in which the work func is called repeatedly
"""
log_configurer(log_queue)
work_loop_timeout_timer = Timer(name=name,
timer_period_seconds=work_function_timeout_period_sec,
auto_start=False,
is_periodic=True)
while (do_work.value):
logger.debug(f"WAITING FOR INIT from node: {name}")
work_gate.wait()
work_gate.clear()

# Set to running
volatile_status.set_value(py_trees.common.Status.RUNNING)
# Start timer
work_loop_timeout_timer.start_timer()
while not completion_condition() and not work_loop_timeout_timer.is_elapsed():
logger.debug(f"CALLING CAGET FROM from node ({name})")
try:
status = func(completion_condition)
except Exception as ex:
volatile_status.set_value(py_trees.common.Status.FAILURE)
logger.error(f"Work function failed, setting node ({name}) "
f"as FAILED. ({ex})")
break

volatile_status.set_value(status)
logger.debug(f"Setting node ({name}): {volatile_status.get_value()}")
time.sleep(loop_period_sec)

# check if we exited loop because we timed out or we succeeded at task
if completion_condition():
logger.debug(f"Worker for node ({name}) completed.")
volatile_status.set_value(py_trees.common.Status.SUCCESS)
else:
logger.debug(f"Worker for node ({name}) failed.")
volatile_status.set_value(py_trees.common.Status.FAILURE)

return work_wrapper
return action_worker_work_function_generator


class ActionWorker(Worker):
Expand All @@ -27,8 +93,8 @@ def __init__(
proc_name: str,
volatile_status: VolatileStatus,
work_gate: Event,
work_func: Callable[[Any], None],
comp_cond: Callable[[Any], bool],
work_func: Callable[..., None],
comp_cond: Callable[..., bool],
stop_func: Optional[Callable[[None], None]] = None
):
super().__init__(
Expand Down
38 changes: 38 additions & 0 deletions beams/sequencer/helpers/Timer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import time


class Timer():
def __init__(self,
name: str,
timer_period_seconds: float,
auto_start: bool = False,
is_periodic: bool = False):
self.name = name
self.timer_period_seconds = timer_period_seconds
self.is_periodic = is_periodic
self.auto_start = auto_start
if (self.auto_start):
self.timer_start_time = time.monotonic()
else:
self.timer_start_time = -1

def start_timer(self) -> None:
self.timer_start_time = time.time()

def check_valid_timer(self) -> bool:
if (self.timer_start_time == -1):
raise RuntimeError(f"{self.name} timer checked but not started")

def is_elapsed(self) -> bool:
elapsed = self.get_elapsed()
if (elapsed > self.timer_period_seconds):
if (self.is_periodic):
self.timer_start_time = time.time()
return True
else:
return False

def get_elapsed(self) -> float:
self.check_valid_timer()
now = time.time()
return now - self.timer_start_time
3 changes: 3 additions & 0 deletions beams/tests/artifacts/im2l0_test.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
"pv": "IM2L0:XTES:MMS:STATE:GET_RBV",
"value": "OUT",
"loop_period_sec": 0.01,
"caput_paradigm": "EVERY_SECOND",
"termination_check": {
"ConditionItem": {
"name": "check_reticule_state",
Expand Down Expand Up @@ -55,6 +56,7 @@
"pv": "IM2L0:XTES:CLZ.RBV",
"value": 25,
"loop_period_sec": 0.01,
"caput_paradigm": "EVERY_SECOND",
"termination_check": {
"ConditionItem": {
"name": "check_zoom_motor",
Expand Down Expand Up @@ -86,6 +88,7 @@
"pv": "IM2L0:XTES:CLF.RBV",
"value": 50,
"loop_period_sec": 0.01,
"caput_paradigm": "ONCE",
"termination_check": {
"ConditionItem": {
"name": "check_focus_motor",
Expand Down
29 changes: 29 additions & 0 deletions beams/tests/test_leaf_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,35 @@ def comp_cond():
assert percentage_complete.value == 100


def test_action_node_timeout():
# For test
percentage_complete = Value("i", 0)

@wrapped_action_work(loop_period_sec=0.001, work_function_timeout_period_sec=.002)
def work_func(comp_condition: Callable) -> Status:
percentage_complete.value += 10
if comp_condition():
return Status.SUCCESS
logger.debug(f"pct complete -> {percentage_complete.value}")
return Status.RUNNING

def comp_cond():
return percentage_complete.value >= 100

action = ActionNode(name="action", work_func=work_func,
completion_condition=comp_cond)
action.setup()

while action.status not in (
Status.SUCCESS,
Status.FAILURE,
):
time.sleep(0.01)
action.tick_once()
assert action.status == Status.FAILURE
assert percentage_complete.value != 100


def test_condition_node():
def condition_fn():
return True
Expand Down
36 changes: 36 additions & 0 deletions beams/tests/test_timer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import time

import pytest

from beams.sequencer.helpers.Timer import Timer


class TestTimer():
def test_elapsed(self):
t = Timer(name="test_elapsed",
timer_period_seconds=0.1,
is_periodic=False)
t.start_timer()
assert t.is_elapsed() is False
time.sleep(0.5)
assert t.is_elapsed() is True

def test_timer_error(self):
t = Timer(name="test_error_not_started",
timer_period_seconds=0.1,
is_periodic=False)
with pytest.raises(RuntimeError):
t.get_elapsed()
with pytest.raises(RuntimeError):
t.is_elapsed()

def test_periodic(self):
t = Timer(name="test_error_not_started",
timer_period_seconds=0.1,
auto_start=True,
is_periodic=True)
time.sleep(0.2)
assert t.is_elapsed() is True
assert t.is_elapsed() is False
time.sleep(0.1)
assert t.is_elapsed() is True
36 changes: 34 additions & 2 deletions beams/tree_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from beams.behavior_tree.ActionNode import ActionNode, wrapped_action_work
from beams.behavior_tree.CheckAndDo import CheckAndDo
from beams.behavior_tree.ConditionNode import ConditionNode
from beams.sequencer.helpers.Timer import Timer
from beams.serialization import as_tagged_union
from beams.typing_helper import Evaluatable

Expand Down Expand Up @@ -200,6 +201,12 @@ def cond_func():
return cond_func


class CaputParadigm(str, Enum):
AT_LOOP_FREQUENCY = "AT_LOOP_FREQUENCY"
ONCE = "ONCE"
EVERY_SECOND = "EVERY_SECOND"


@dataclass
class SequenceConditionItem(BaseSequenceItem, BaseConditionItem):
"""
Expand Down Expand Up @@ -237,23 +244,48 @@ class SetPVActionItem(BaseItem):
pv: str = ""
value: Any = 1
loop_period_sec: float = 1.0
caput_paradigm: CaputParadigm = CaputParadigm.AT_LOOP_FREQUENCY

termination_check: BaseConditionItem = field(default_factory=ConditionItem)

def __post_init__(self):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this timer stuff probably makes the most sense living in the ActionNode, not the tree-item class. The purpose of the *Item here is to assist with serialization, and the *Node does the actual work.

We already did a bunch of work moving references away from the TreeItem into the ActionNode, it feels like a step backward to do so again.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a very good point

is_timer_periodic = True
timer_period = 1
if (self.caput_paradigm == CaputParadigm.ONCE):
is_timer_periodic = False
if (self.caput_paradigm == CaputParadigm.AT_LOOP_FREQUENCY):
timer_period = self.loop_period_sec
if (self.caput_paradigm == CaputParadigm.EVERY_SECOND):
timer_period = 1

self.work_frequency_timer = Timer(name=f"{self.name}_caput_timer",
timer_period_seconds=timer_period,
auto_start=False,
is_periodic=is_timer_periodic)
self.do_once = False

def get_tree(self) -> ActionNode:

@wrapped_action_work(self.loop_period_sec)
def work_func(comp_condition: Evaluatable) -> py_trees.common.Status:
try:
if not self.do_once:
self.work_frequency_timer.start_timer()
self.do_once = True

# Set to running
value = caget(self.pv) # double caget, this is uneeded as currently the comp_condition has caget baked in
value = caget(self.pv, as_string=isinstance(self.value, str)) # double caget, this is uneeded as currently the comp_condition has caget

if comp_condition():
self.do_once = False
return py_trees.common.Status.SUCCESS
logger.debug(f"{self.name}: Value is {value}")

# specific caput logic to SetPVActionItem
caput(self.pv, self.value)
if (not self.work_frequency_timer.is_elapsed()):
caput(self.pv, self.value)
logger.debug(f"{self.name} CAPUTTING")

return py_trees.common.Status.RUNNING
except Exception as ex:
logger.warning(f"{self.name}: work failed: {ex}")
Expand Down