Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ticket6777 #593

Merged
merged 17 commits into from
Aug 21, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions common_tests/eurotherm.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ def get_device(self):
def get_emulator_device(self):
pass

@abc.abstractmethod
def _get_temperature_setter_wrapper(self):
pass

def get_prefix(self):
return "{}:A01".format(self.get_device())

Expand Down Expand Up @@ -97,7 +101,9 @@ def test_WHEN_set_ramp_rate_in_K_per_min_THEN_current_temperature_reaches_set_po
self.ca.set_pv_value("RATE:SP", ramp_rate)
self.ca.assert_that_pv_is_number("RATE", ramp_rate, 0.1)
self.ca.set_pv_value("RAMPON:SP", ramp_on)
self.ca.set_pv_value("TEMP:SP", setpoint_temperature)

with self._get_temperature_setter_wrapper():
self.ca.set_pv_value("TEMP:SP", setpoint_temperature)

start = time.time()
self.ca.assert_that_pv_is_number("TEMP:SP:RBV", setpoint_temperature, tolerance=0.1, timeout=60)
Expand Down Expand Up @@ -149,13 +155,15 @@ def test_GIVEN_temperature_setpoint_followed_by_calibration_change_WHEN_same_set
tolerance = 0.2
self.ca.set_pv_value("RAMPON:SP", 0)
reset_calibration_file(self.ca)
self.ca.set_pv_value("TEMP:SP", temperature)
with self._get_temperature_setter_wrapper():
self.ca.set_pv_value("TEMP:SP", temperature)
self.ca.assert_that_pv_is_number("TEMP:SP:RBV", temperature, tolerance=tolerance, timeout=rbv_change_timeout)
with use_calibration_file(self.ca, "C006.txt"):
self.ca.assert_that_pv_is_not_number("TEMP:SP:RBV", temperature, tolerance=tolerance, timeout=rbv_change_timeout)

# Act
self.ca.set_pv_value("TEMP:SP", temperature)
with self._get_temperature_setter_wrapper():
self.ca.set_pv_value("TEMP:SP", temperature)

# Assert
self.ca.assert_that_pv_is_number("TEMP:SP:RBV", temperature, tolerance=tolerance, timeout=rbv_change_timeout)
Expand Down Expand Up @@ -242,7 +250,8 @@ def test_WHEN_disconnected_THEN_in_alarm(self, record):
with self._lewis.backdoor_simulate_disconnected_device():
self.ca.assert_that_pv_alarm_is(record, ChannelAccess.Alarms.INVALID, timeout=30)
# Assert alarms clear on reconnection
self.ca.assert_that_pv_alarm_is(record, ChannelAccess.Alarms.NONE, timeout=30)
with self._get_temperature_setter_wrapper():
self.ca.assert_that_pv_alarm_is(record, ChannelAccess.Alarms.NONE, timeout=30)

@parameterized.expand(parameterized_list(PID_TEST_VALUES))
@skip_if_recsim("Backdoor not available in recsim")
Expand Down
13 changes: 9 additions & 4 deletions tests/eurotherm_modbus.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import unittest
import contextlib

from parameterized import parameterized

from utils.test_modes import TestModes
from utils.ioc_launcher import get_default_ioc_dir, ProcServLauncher
from common_tests.eurotherm import EurothermBaseTests, PID_TEST_VALUES, TEST_VALUES

from utils.testing import skip_if_recsim
from utils.testing import parameterized_list
from common_tests.eurotherm import EurothermBaseTests, PID_TEST_VALUES, TEST_VALUES

# Internal Address of device (must be 2 characters)
ADDRESS = "A01"
Expand All @@ -23,6 +24,7 @@
"ioc_launcher_class": ProcServLauncher,
"macros": {
"COMMS_MODE": "modbus",
"NEEDLE_VALVE": "no",
"ADDR": ADDRESS,
"ADDR_1": ADDR_1,
"ADDR_2": "",
Expand All @@ -48,14 +50,16 @@

TEST_MODES = [TestModes.DEVSIM]


class EurothermModbusTests(EurothermBaseTests, unittest.TestCase):
def get_device(self):
return DEVICE

def get_emulator_device(self):
return EMULATOR_DEVICE

def _get_temperature_setter_wrapper(self):
return contextlib.nullcontext()

def test_WHEN_autotune_set_THEN_readback_updates(self):
for state in [0, 1]:
self.ca.set_pv_value("AUTOTUNE:SP", state)
Expand All @@ -76,3 +80,4 @@ def test_WHEN_d_set_THEN_d_updates(self, _, val):
@parameterized.expand(parameterized_list([0, 0.5, 100]))
def test_WHEN_max_output_set_THEN_max_output_updates(self, _, val):
self.ca.assert_setting_setpoint_sets_readback(value=val, readback_pv="MAX_OUTPUT", timeout=15)

152 changes: 152 additions & 0 deletions tests/eurotherm_modbus_needlevalve.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
import unittest

from parameterized import parameterized

from utils.channel_access import ChannelAccess
from utils.test_modes import TestModes
from utils.ioc_launcher import get_default_ioc_dir, ProcServLauncher
from utils.testing import skip_if_recsim
from utils.testing import ManagerMode
from common_tests.eurotherm import EurothermBaseTests

from genie_python.genie_cachannel_wrapper import WriteAccessException

# Internal Address of device (must be 2 characters)
ADDRESS = "A01"
# Numerical address of the device
ADDR_1 = "01" # Leave this value as 1 when changing the ADDRESS value above - hard coded in LEWIS emulator
DEVICE = "EUROTHRM_01"

EMULATOR_DEVICE = "eurotherm"

IOCS = [
{
"name": DEVICE,
"directory": get_default_ioc_dir("EUROTHRM"),
"ioc_launcher_class": ProcServLauncher,
"macros": {
"COMMS_MODE": "modbus",
"NEEDLE_VALVE": "yes",
"ADDR": ADDRESS,
"ADDR_1": ADDR_1,
"ADDR_2": "",
"ADDR_3": "",
"ADDR_4": "",
"ADDR_5": "",
"ADDR_6": "",
"ADDR_7": "",
"ADDR_8": "",
"ADDR_9": "",
"ADDR_10": "",
"TEMP_SCALING_1": "0.1",
"P_SCALING_1": "1",
"I_SCALING_1": "1",
"D_SCALING_1": "1",
"OUTPUT_SCALING_1": "0.1",
},
"emulator": EMULATOR_DEVICE,
"lewis_protocol": "eurotherm_modbus",
},
{
# INSTETC is required to control manager mode.
"name": "INSTETC",
"directory": get_default_ioc_dir("INSTETC"),
"custom_prefix": "CS",
"pv_for_existence": "MANAGER",
}
]


TEST_MODES = [TestModes.DEVSIM]

class EurothermModbusNeedleValveTests(EurothermBaseTests, unittest.TestCase):
def get_device(self):
return DEVICE

def get_emulator_device(self):
return EMULATOR_DEVICE

def _get_temperature_setter_wrapper(self):
return ManagerMode(ChannelAccess())

def setUp(self):
super(EurothermModbusNeedleValveTests, self).setUp()
with ManagerMode(ChannelAccess()):
self.ca.set_pv_value("FLOW_SP_MODE_SELECT:SP", "AUTO")

# READ TESTS
@skip_if_recsim("Backdoor not available in recsim")
def test_WHEN_using_needle_valve_THEN_flow_exists(self):
self._lewis.backdoor_set_on_device("flow", 5.0)
self.ca.assert_that_pv_is_number("FLOW", 5.0, tolerance=0, timeout=15)

@skip_if_recsim("Backdoor not available in recsim")
def test_WHEN_using_needle_valve_THEN_valve_dir_exists(self):
self._lewis.backdoor_set_on_device("valve_direction", 1)
self.ca.assert_that_pv_is("VALVE_DIR", "OPENING", timeout=15)

# WRITE TESTS
@parameterized.expand([
("FLOW_SP_LOWLIM", 2.0),
("FLOW_SP_MODE_SELECT", "MANUAL"),
("NEEDLE_VALVE_STOP", "STOPPED")])
def test_WHEN_using_needle_valve_WHEN_SP_set_then_RBV_updates(self, pv, val):
with ManagerMode(ChannelAccess()):
self.ca.assert_setting_setpoint_sets_readback(value=val, readback_pv=pv, timeout=15)

# MANAGER MODE TESTS
@parameterized.expand([
("FLOW_SP_LOWLIM:SP", 2.0),
("FLOW_SP_MODE_SELECT:SP", "MANUAL"),
("NEEDLE_VALVE_STOP:SP", "STOPPED")])
def test_WHEN_using_needle_valve_WHEN_manager_mode_on_THEN_writes_allowed(self, pv, val):
# try set with manager mode and check that it was set.
with ManagerMode(ChannelAccess()):
self.ca.set_pv_value(pv, val)
self.ca.assert_that_pv_is(pv, val, timeout=15)

@parameterized.expand([
("FLOW_SP_LOWLIM:SP", 2.0),
("FLOW_SP_MODE_SELECT:SP", "MANUAL"),
("NEEDLE_VALVE_STOP:SP", "STOPPED")])
def test_WHEN_using_needle_valve_WHEN_manager_mode_off_THEN_writes_disallowed(self, pv, val):
with self.assertRaises(WriteAccessException):
self.ca.set_pv_value(pv, val)

# SP MODE TESTS
@parameterized.expand([
("TEMP:SP.DISP", "MANUAL"),
("MANUAL_FLOW:SP.DISP", "AUTO")])
def test_WHEN_using_needle_valve_THEN_sp_modes_disable_correct_PV(self, pv_disp, mode):
with ManagerMode(ChannelAccess()):
self.ca.set_pv_value("FLOW_SP_MODE_SELECT:SP", mode)
self.ca.assert_that_pv_is(pv_disp, "1", timeout=15)

@parameterized.expand([
("TEMP:SP", "TEMP:SP:RBV", 2.0, "AUTO" ),
("MANUAL_FLOW:SP", "MANUAL_FLOW", 8.0, "MANUAL")])
def test_WHEN_using_needle_valve_and_correct_sp_mode_WHEN_manager_mode_on_THEN_writes_allowed(self, sp_pv, rbv_pv, val, mode):
with ManagerMode(ChannelAccess()):
self.ca.set_pv_value("FLOW_SP_MODE_SELECT:SP", mode)
self.ca.assert_setting_setpoint_sets_readback(val, readback_pv=rbv_pv, set_point_pv=sp_pv, timeout=15)

@parameterized.expand([
("TEMP:SP", 2.0, "MANUAL" ),
("MANUAL_FLOW:SP", 8.0, "AUTO")])
def test_WHEN_using_needle_valve_and_incorrect_sp_mode_WHEN_manager_mode_on_THEN_writes_disallowed(self, pv, val, mode):
with ManagerMode(ChannelAccess()):
self.ca.set_pv_value("FLOW_SP_MODE_SELECT:SP", mode)
with self.assertRaises(WriteAccessException):
self.ca.set_pv_value(pv, val)

@parameterized.expand([
("TEMP:SP", 2.0, "AUTO"),
("MANUAL_FLOW:SP", 8.0, "MANUAL"),
("TEMP:SP", 2.0, "MANUAL"),
("MANUAL_FLOW:SP", 8.0, "AUTO")])
def test_WHEN_using_needle_valve_and_any_sp_mode_WHEN_manager_mode_off_THEN_writes_disallowed(self, pv, val, mode):
with ManagerMode(ChannelAccess()):
self.ca.set_pv_value("FLOW_SP_MODE_SELECT:SP", mode)
with self.assertRaises(WriteAccessException):
self.ca.set_pv_value(pv, val)