-
Notifications
You must be signed in to change notification settings - Fork 9
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #898 from BCDA-APS/897-temperature-sims
Hoist simulated controller positioners from bluesky_training
- Loading branch information
Showing
8 changed files
with
273 additions
and
21 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
File renamed without changes.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,170 @@ | ||
""" | ||
Simulate process controllers as positioners using EPICS records. | ||
.. autosummary:: | ||
~SimulatedSwaitControllerPositioner | ||
~SimulatedTransformControllerPositioner | ||
""" | ||
|
||
from . import PVPositionerSoftDoneWithStop | ||
from ..synApps import SwaitRecord | ||
from ..synApps import TransformRecord | ||
from ophyd import FormattedComponent as FC | ||
import time | ||
|
||
|
||
class SimulatedSwaitControllerPositioner(PVPositionerSoftDoneWithStop): | ||
""" | ||
Simulated process controller as positioner with EPICS swait record. | ||
The swait record completes the feedback loop, computing | ||
the next simulated controller reading. | ||
Example with ``swait`` record:: | ||
controller = SimulatedSwaitControllerPositioner( | ||
"", | ||
name="controller", | ||
loop_pv="gp:userCalc1", | ||
) | ||
controller.wait_for_connection() | ||
controller.setup(25) | ||
.. autosummary:: | ||
~setup | ||
""" | ||
|
||
loop = FC(SwaitRecord, "{loop_pv}", kind="config") | ||
|
||
def __init__(self, *args, loop_pv="", **kwargs): | ||
if len(loop_pv.strip()) == 0: | ||
raise ValueError("Must supply a value for 'loop_pv'.") | ||
|
||
self.loop_pv = loop_pv | ||
|
||
kwargs["readback_pv"] = f"{loop_pv}.VAL" | ||
kwargs["setpoint_pv"] = f"{loop_pv}.B" | ||
|
||
super().__init__(*args, **kwargs) | ||
|
||
def setup( | ||
self, | ||
setpoint, | ||
label="controller", | ||
noise=1, | ||
period="1 second", | ||
max_change=1, | ||
tolerance=1, | ||
): | ||
""" | ||
Configure the swait record as a process controller. | ||
""" | ||
self.tolerance.put(tolerance) | ||
|
||
swait = self.loop | ||
swait.reset() # remove any prior configuration | ||
time.sleep(2.0 / 60) # short pause for IOC processing | ||
|
||
swait.description.put(label) | ||
swait.channels.A.input_value.put(setpoint) # readback | ||
swait.channels.A.input_pv.put(swait.calculated_value.pvname) | ||
swait.channels.B.input_value.put(setpoint) # setpoint | ||
swait.channels.C.input_value.put(noise) | ||
swait.channels.D.input_value.put(max_change) | ||
swait.scanning_rate.put(period) | ||
swait.precision.put(3) | ||
swait.calculated_value.put(setpoint) # preset initial value | ||
swait.calculation.put("A+max(-D,min(D,(B-A)))+C*(RNDM-0.5)") | ||
|
||
|
||
class SimulatedTransformControllerPositioner(PVPositionerSoftDoneWithStop): | ||
""" | ||
Simulated process controller as positioner with EPICS transform record. | ||
The transform record completes the feedback loop, computing the next | ||
simulated controller reading and reporting if the readback is "in position". | ||
Example with ``transform`` record:: | ||
controller = SimulatedTransformControllerPositioner( | ||
"", name="controller", loop_pv="gp:userTran1", | ||
) controller.wait_for_connection() temperature.setup(25) | ||
.. autosummary:: | ||
~setup | ||
""" | ||
|
||
loop = FC(TransformRecord, "{loop_pv}", kind="config") | ||
|
||
def __init__(self, *args, loop_pv="", **kwargs): | ||
if len(loop_pv.strip()) == 0: | ||
raise ValueError("Must supply a value for 'loop_pv'.") | ||
|
||
self.loop_pv = loop_pv | ||
|
||
kwargs["readback_pv"] = f"{loop_pv}.H" | ||
kwargs["setpoint_pv"] = f"{loop_pv}.B" | ||
|
||
super().__init__(*args, **kwargs) | ||
|
||
self.following_error = self.loop.channels.E.current_value | ||
|
||
def setup( | ||
self, | ||
setpoint, | ||
label="controller", | ||
noise=2, | ||
period="1 second", | ||
max_change=2, | ||
tolerance=1, | ||
): | ||
""" | ||
Configure the transform record as a temperature controller. | ||
""" | ||
self.wait_for_connection() | ||
self.tolerance.put(tolerance) | ||
|
||
transform = self.loop | ||
transform.reset() # remove any prior configuration | ||
time.sleep(2.0 / 60) # short pause for IOC processing | ||
|
||
transform.description.put(label) | ||
|
||
transform.channels.A.comment.put("last readback") | ||
transform.channels.A.current_value.put(setpoint) # readback | ||
transform.channels.A.input_pv.put(transform.channels.H.current_value.pvname) | ||
|
||
transform.channels.B.comment.put("setpoint") | ||
transform.channels.B.current_value.put(setpoint) # setpoint | ||
|
||
transform.channels.C.comment.put("noise level") | ||
transform.channels.C.current_value.put(noise) | ||
|
||
transform.channels.D.comment.put("max_change") | ||
transform.channels.D.current_value.put(max_change) | ||
|
||
transform.channels.E.comment.put("following error") | ||
transform.channels.E.expression.put("B-A") | ||
|
||
transform.channels.F.comment.put("step") | ||
transform.channels.F.expression.put("max(-D,min(D,E))") | ||
|
||
transform.channels.G.comment.put("noise") | ||
transform.channels.G.expression.put("C*(RNDM-0.5)") | ||
|
||
transform.channels.H.comment.put("readback") | ||
transform.channels.H.current_value.put(setpoint) # preset initial value | ||
transform.channels.H.expression.put("A+F+G") | ||
|
||
transform.channels.I.comment.put("tolerance") | ||
transform.channels.I.current_value.put(tolerance) | ||
|
||
transform.channels.J.comment.put("in position") | ||
transform.channels.J.expression.put("abs(H-B)<=I") | ||
|
||
transform.precision.put(3) | ||
|
||
transform.scanning_rate.put(period) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
""" | ||
Test the simulated controllers. | ||
""" | ||
|
||
import math | ||
from contextlib import nullcontext as does_not_raise | ||
|
||
import pytest | ||
|
||
from ...tests import IOC_GP | ||
from ...tests import timed_pause | ||
from .. import simulated_controllers as stc | ||
|
||
PV_SWAIT = f"{IOC_GP}userCalc7" | ||
PV_TRANS = f"{IOC_GP}userTran7" | ||
|
||
|
||
@pytest.mark.parametrize("sp", [-55, 120, 998]) | ||
@pytest.mark.parametrize( | ||
"pv, controller_class, context, exp_info", | ||
[ | ||
[PV_SWAIT, stc.SimulatedSwaitControllerPositioner, does_not_raise(), "None"], | ||
[PV_TRANS, stc.SimulatedTransformControllerPositioner, does_not_raise(), "None"], | ||
["", stc.SimulatedSwaitControllerPositioner, pytest.raises(ValueError), "Must supply a value for"], | ||
["", stc.SimulatedTransformControllerPositioner, pytest.raises(ValueError), "Must supply a value for"], | ||
["wrong_pv", stc.SimulatedSwaitControllerPositioner, pytest.raises(TimeoutError), "Failed to connect"], | ||
["wrong_pv", stc.SimulatedTransformControllerPositioner, pytest.raises(TimeoutError), "Failed to connect"], | ||
], | ||
) | ||
@pytest.mark.parametrize("tol", [0.99, 2, 5]) | ||
def test_simulators(sp, pv, controller_class, context, exp_info, tol): | ||
""" | ||
Test the simulator. | ||
""" | ||
with context as info: | ||
sim = controller_class("", loop_pv=pv, name="sim") | ||
sim.wait_for_connection() | ||
assert exp_info in str(info), f"{str(info)=!r}" | ||
if exp_info != "None": | ||
return | ||
|
||
timed_pause() | ||
assert sim.connected | ||
|
||
sim.setup(sp, tolerance=tol) | ||
timed_pause() | ||
|
||
assert math.isclose(sim.setpoint.get(), sp, abs_tol=0.01) | ||
assert math.isclose(sim.tolerance.get(), tol, abs_tol=0.0001) | ||
assert math.isclose(sim.position, sp, abs_tol=tol) | ||
assert sim.inposition |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
File renamed without changes.