Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor test_move_to_zero() #678

Merged
merged 12 commits into from
Jul 29, 2022
80 changes: 53 additions & 27 deletions apstools/devices/tests/test_positioner_soft_done.py
Original file line number Diff line number Diff line change
@@ -12,6 +12,7 @@
import time

PV_PREFIX = f"{IOC}gp:"
delay_active = False


@pytest.fixture(scope="function")
@@ -41,27 +42,33 @@ def rbv():


@run_in_thread
def delayed_complete(p, readback, delay=1):
def delayed_complete(positioner, readback, delay=1):
"Time-delayed completion of positioner move."
global delay_active

delay_active = True
time.sleep(delay)
readback.put(p.setpoint.get())
readback.put(positioner.setpoint.get())
delay_active = False


@run_in_thread
def delayed_stop(p, delay=1):
"Time-delayed stop of positioner `p`."
def delayed_stop(positioner, delay=1):
"Time-delayed stop of positioner."
time.sleep(delay)
p.stop()
positioner.stop()


class Tst_PVPos(PVPositionerSoftDone):
"To pass the readback_pv and setpoint_pv kwargs."

def __init__(self, prefix, *args, **kwargs):
super().__init__(prefix, *args, readback_pv="r", setpoint_pv="p", **kwargs)


class Tst_PVPosWStop(PVPositionerSoftDoneWithStop):
"To pass the readback_pv and setpoint_pv kwargs."

def __init__(self, prefix, *args, **kwargs):
super().__init__(prefix, *args, readback_pv="r", setpoint_pv="p", **kwargs)

@@ -72,11 +79,10 @@ def __init__(self, prefix, *args, **kwargs):
[Tst_PVPos, "", False, "read_attrs", 2],
[Tst_PVPos, "", False, "configuration_attrs", 3],
[Tst_PVPos, "", False, "component_names", 6],

[Tst_PVPosWStop, "", False, "read_attrs", 2],
[Tst_PVPosWStop, "", False, "configuration_attrs", 3],
[Tst_PVPosWStop, "", False, "component_names", 6],
]
],
)
def test_attribute_quantities(device, pv, connect, attr, expected):
"""Verify the quantities of the different attributes."""
@@ -92,7 +98,7 @@ def test_attribute_quantities(device, pv, connect, attr, expected):
[PVPositionerSoftDoneWithStop, None, None],
[PVPositionerSoftDoneWithStop, "", ""],
[PVPositionerSoftDoneWithStop, "test", "test"],
]
],
)
def test_same_sp_and_rb(klass, rb, sp):
with pytest.raises(ValueError) as exc:
@@ -102,10 +108,7 @@ def test_same_sp_and_rb(klass, rb, sp):

@pytest.mark.parametrize(
"device, has_inposition",
[
[PVPositionerSoftDone, False],
[PVPositionerSoftDoneWithStop, True]
]
[[PVPositionerSoftDone, False], [PVPositionerSoftDoneWithStop, True]],
)
def test_structure(device, has_inposition):
"actual PVs not necessary for this test"
@@ -220,24 +223,47 @@ def test_move_and_stopped_early(rbv, pos):
assert pos.inposition


def test_move_to_zero(rbv, pos):
@pytest.mark.parametrize("target", [0, round(2 + 5 * random.random(), 2), 0, 0, 0])
def test_move_to_zero(target, rbv, pos):
short_delay_for_EPICS_IOC_database_processing()

# first, move to some random, non-zero, initial position
target = round(2 + 5 * random.random(), 2)
delayed_complete(pos, rbv, delay=0.5)
status = pos.move(target) # readback set by delayed_complete()
# start from known position
known_start_position = -1
pos.setpoint.put(known_start_position)
short_delay_for_EPICS_IOC_database_processing()
assert status.done
assert status.success
assert status.elapsed >= 0.5
rbv.put(known_start_position) # note: pos.readback is read-only
short_delay_for_EPICS_IOC_database_processing(1)

# confirm all are ready
assert pos.inposition
assert pos.setpoint.get(use_monitor=False) == known_start_position
assert round(pos.readback.get(use_monitor=False), 2) == known_start_position

# pick a random time to change RBV after the move starts
rbv_delay = round(0.2 + 0.7 * random.random(), 1)

# start code that will update the RBV later
delayed_complete(pos, rbv, delay=rbv_delay)

# start the move and wait for RBV to be updated
status = pos.move(target)

# move to 0.0
longer_delay = 2
delayed_complete(pos, rbv, delay=longer_delay)
status = pos.move(0)
short_delay_for_EPICS_IOC_database_processing()
assert pos.setpoint.get() == 0
assert round(pos.position, 2) == 0

# assert not delay_active
assert pos.setpoint.get(use_monitor=False) == target

# note: pos.position has been failing (issue #668)
# Replace ``pos.position`` and force a CA get from the IOC.
assert round(pos.readback.get(use_monitor=False), 2) == target
assert pos.inposition
assert status.elapsed >= longer_delay

assert status.done
assert status.success

# verify the readback was updated AFTER the setpoint with correct delay
dt = (
pos.readback.read()["pos"]["timestamp"]
- pos.setpoint.read()["pos_setpoint"]["timestamp"]
)
assert round(dt, 1) == rbv_delay, f"dt={dt}, rbv_delay={rbv_delay}"
4 changes: 1 addition & 3 deletions apstools/tests/__init__.py
Original file line number Diff line number Diff line change
@@ -7,9 +7,7 @@


def short_delay_for_EPICS_IOC_database_processing(delay=None):
if delay is None:
delay = SHORT_DELAY_FOR_EPICS_IOC_DATABASE_PROCESSING
time.sleep(delay)
time.sleep(delay or SHORT_DELAY_FOR_EPICS_IOC_DATABASE_PROCESSING)


def common_attribute_quantities_test(device, pv, connect, attr, expected):