Skip to content

Commit

Permalink
- preparing for 4.4.1 release
Browse files Browse the repository at this point in the history
- fixed modbus issue with new version of pymodbus
  • Loading branch information
rptmat57 committed Jan 31, 2023
1 parent d9d0fc8 commit 2952f55
Show file tree
Hide file tree
Showing 6 changed files with 169 additions and 59 deletions.
2 changes: 1 addition & 1 deletion NEMO/apps/sensors/sensors.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ def do_read_values(self, sensor: Sensor_model) -> List:
valid_connection = client.connect()
if not valid_connection:
raise Exception(f"Connection to server {sensor.card.server}:{sensor.card.port} could not be established")
kwargs = {"unit": sensor.unit_id} if sensor.unit_id is not None else {}
kwargs = {"slave": sensor.unit_id} if sensor.unit_id is not None else {}
read_response = client.read_holding_registers(sensor.read_address, sensor.number_of_values, **kwargs)
if read_response.isError():
raise Exception(str(read_response))
Expand Down
3 changes: 2 additions & 1 deletion NEMO/interlocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,7 @@ def _send_command(self, interlock: Interlock_model, command_type: Interlock_mode
elif command_type == Interlock_model.State.UNLOCKED:
state = self.setRelayState(interlock, self.MODBUS_ON)
except Exception as error:
interlocks_logger.exception(error)
raise Exception("General exception: " + str(error))
return state

Expand All @@ -385,7 +386,7 @@ def setRelayState(cls, interlock: Interlock_model, state: {0, 1}) -> Interlock_m
coil = interlock.channel
client = ModbusTcpClient(interlock.card.server, port=interlock.card.port)
client.connect()
kwargs = {"unit": interlock.unit_id} if interlock.unit_id is not None else {}
kwargs = {"slave": interlock.unit_id} if interlock.unit_id is not None else {}
write_reply = client.write_coil(coil, state, **kwargs)
if write_reply.isError():
raise Exception(str(write_reply))
Expand Down
76 changes: 76 additions & 0 deletions NEMO/tests/test_interlocks/test_modbus_interlock.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
from unittest import mock

from django.conf import settings
from django.test import TestCase
from pymodbus.bit_read_message import ReadCoilsRequest, ReadCoilsResponse
from pymodbus.bit_write_message import WriteSingleCoilRequest, WriteSingleCoilResponse
from pymodbus.client import ModbusTcpClient
from pymodbus.pdu import ModbusRequest, ModbusResponse

# This method will be used by the mock to replace socket.send
# In the stanford interlock case, it will return a list of byte
from NEMO.models import Interlock, InterlockCard, InterlockCardCategory, Tool, User

server1 = "server1.nist.gov"
server2 = "https://server2.nist.gov"
port1 = 80
port2 = 8080


def mocked_modbus_client(*args, **kwargs):
class MockModbusClient(ModbusTcpClient):
def connect(self):
return True

def execute(self, request: ModbusRequest = None) -> ModbusResponse:
if isinstance(request, WriteSingleCoilRequest):
# store value so we can return it later. if server1, good otherwise set to opposite
if self.params.host == server1:
self.tmp_value = request.value
else:
return request.doException(None)
return WriteSingleCoilResponse()
elif isinstance(request, ReadCoilsRequest):
return ReadCoilsResponse(values=[self.tmp_value])
return ModbusResponse()

return MockModbusClient(*args, **kwargs)


class ModbusInterlockTestCase(TestCase):
tool: Tool = None
wrong_response_interlock: Interlock = None
bad_interlock: Interlock = None

def setUp(self):
global tool, wrong_response_interlock, bad_interlock
# enable interlock functionality
settings.__setattr__("INTERLOCKS_ENABLED", True)
interlock_card_category = InterlockCardCategory.objects.get(key="modbus_tcp")
interlock_card = InterlockCard.objects.create(
server=server1, port=port1, number=1, category=interlock_card_category
)
interlock_card2 = InterlockCard.objects.create(
server=server2, port=port2, number=1, category=interlock_card_category
)
interlock = Interlock.objects.create(card=interlock_card, channel=1, unit_id=1)
wrong_response_interlock = Interlock.objects.create(card=interlock_card2, channel=2)
bad_interlock = Interlock.objects.create(card=interlock_card2, channel=3)
owner = User.objects.create(username="mctest", first_name="Testy", last_name="McTester")
tool = Tool.objects.create(name="test_tool", primary_owner=owner, interlock=interlock)

@mock.patch("NEMO.interlocks.ModbusTcpClient", side_effect=mocked_modbus_client)
def test_all_good(self, mock_args):
self.assertTrue(tool.interlock.unlock())
self.assertEqual(tool.interlock.state, Interlock.State.UNLOCKED)
self.assertTrue(tool.interlock.lock())
self.assertEqual(tool.interlock.state, Interlock.State.LOCKED)

@mock.patch("NEMO.interlocks.ModbusTcpClient", side_effect=mocked_modbus_client)
def test_server2_command_fail(self, mock_args):
self.assertFalse(wrong_response_interlock.unlock())
self.assertEqual(wrong_response_interlock.state, Interlock.State.UNKNOWN)
self.assertTrue("Exception Response" in wrong_response_interlock.most_recent_reply)
self.assertFalse(wrong_response_interlock.lock())
self.assertEqual(wrong_response_interlock.state, Interlock.State.UNKNOWN)
self.assertTrue("Exception Response" in wrong_response_interlock.most_recent_reply)
58 changes: 36 additions & 22 deletions NEMO/tests/test_interlocks/test_stanford_interlock.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,37 +7,37 @@

# This method will be used by the mock to replace socket.send
# In the stanford interlock case, it will return a list of byte
from NEMO.models import Tool, Interlock, InterlockCard, InterlockCardCategory, User
from NEMO.models import Interlock, InterlockCard, InterlockCardCategory, Tool, User

server1 = 'server1.nist.gov'
server2 = 'https://server2.nist.gov'
server1 = "server1.nist.gov"
server2 = "https://server2.nist.gov"
port1 = 80
port2 = 8080


def mocked_socket_send(*args, **kwargs):
class MockSocket(socket):

def __init__(self):
super().__init__()
self.response = None
self.address = None

def send(self, data: bytes, flags: int = ...) -> int:
schema = struct.Struct('!20siiiiiiiiibbbbb18s')
schema = struct.Struct("!20siiiiiiiiibbbbb18s")
message_to_be_sent = schema.unpack(data)
card_number = message_to_be_sent[2]
even_port = message_to_be_sent[3]
odd_port = message_to_be_sent[4]
channel = message_to_be_sent[5]
command_type = message_to_be_sent[7]
command_result = 1 # success
command_result = 1 # success

# server2 port 2 channel 2 will send a command failed back
if self.address[0] == server2 and self.address[1] == port2 and channel == 2:
command_result = 0 # fail
command_result = 0 # fail

# created the response to be sent later
response_schema = struct.Struct('!iiiiiiiiibbbbb')
response_schema = struct.Struct("!iiiiiiiiibbbbb")
self.response = response_schema.pack(
1, # Instruction count
card_number,
Expand Down Expand Up @@ -65,6 +65,7 @@ def recv(self, bufsize: int, flags: int = ...) -> bytes:

return MockSocket()


class StanfordInterlockTestCase(TestCase):
tool: Tool = None
wrong_response_interlock: Interlock = None
Expand All @@ -73,33 +74,46 @@ class StanfordInterlockTestCase(TestCase):
def setUp(self):
global tool, wrong_response_interlock, bad_interlock
# enable interlock functionality
settings.__setattr__('INTERLOCKS_ENABLED', True)
settings.__setattr__("INTERLOCKS_ENABLED", True)
even_port = 124
odd_port = 125
interlock_card_category = InterlockCardCategory.objects.get(key='stanford')
interlock_card = InterlockCard.objects.create(server=server1, port=port1, number=1, even_port=even_port, odd_port=odd_port, category=interlock_card_category)
interlock_card2 = InterlockCard.objects.create(server=server2, port=port2, number=1, even_port=even_port, odd_port=odd_port, category=interlock_card_category)
interlock_card_category = InterlockCardCategory.objects.get(key="stanford")
interlock_card = InterlockCard.objects.create(
server=server1,
port=port1,
number=1,
even_port=even_port,
odd_port=odd_port,
category=interlock_card_category,
)
interlock_card2 = InterlockCard.objects.create(
server=server2,
port=port2,
number=1,
even_port=even_port,
odd_port=odd_port,
category=interlock_card_category,
)
interlock = Interlock.objects.create(card=interlock_card, channel=1)
wrong_response_interlock = Interlock.objects.create(card=interlock_card2, channel=2)
bad_interlock = Interlock.objects.create(card=interlock_card2, channel=3)
owner = User.objects.create(username='mctest', first_name='Testy', last_name='McTester')
tool = Tool.objects.create(name='test_tool', primary_owner=owner, interlock=interlock)
owner = User.objects.create(username="mctest", first_name="Testy", last_name="McTester")
tool = Tool.objects.create(name="test_tool", primary_owner=owner, interlock=interlock)

@mock.patch('NEMO.interlocks.socket.socket', side_effect=mocked_socket_send)
def test_all_good(self, mock_args):
@mock.patch("NEMO.interlocks.socket.socket", side_effect=mocked_socket_send)
def test_all_good(self, mock_args):
self.assertTrue(tool.interlock.unlock())
self.assertEquals(tool.interlock.state, Interlock.State.UNLOCKED)
self.assertTrue(tool.interlock.lock())
self.assertEquals(tool.interlock.state, Interlock.State.LOCKED)

@mock.patch('NEMO.interlocks.socket.socket', side_effect=mocked_socket_send)
@mock.patch("NEMO.interlocks.socket.socket", side_effect=mocked_socket_send)
def test_server2_command_fail(self, mock_args):
self.assertFalse(wrong_response_interlock.unlock())
self.assertEquals(wrong_response_interlock.state, Interlock.State.UNKNOWN)
self.assertTrue('failed' in wrong_response_interlock.most_recent_reply)
self.assertTrue('command return value = 0' in wrong_response_interlock.most_recent_reply)
self.assertTrue("failed" in wrong_response_interlock.most_recent_reply)
self.assertTrue("command return value = 0" in wrong_response_interlock.most_recent_reply)
self.assertFalse(wrong_response_interlock.lock())
self.assertEquals(wrong_response_interlock.state, Interlock.State.UNKNOWN)
self.assertTrue('failed' in wrong_response_interlock.most_recent_reply)
self.assertTrue('command return value = 0' in wrong_response_interlock.most_recent_reply)

self.assertTrue("failed" in wrong_response_interlock.most_recent_reply)
self.assertTrue("command return value = 0" in wrong_response_interlock.most_recent_reply)
87 changes: 53 additions & 34 deletions NEMO/tests/test_interlocks/test_web_relay_interlock.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,31 +6,30 @@
from NEMO.interlocks import WebRelayHttpInterlock
from NEMO.models import Interlock, InterlockCard, InterlockCardCategory, Tool, User

server1 = 'server1.nist.gov'
server2 = 'https://server2.nist.gov'
server1 = "server1.nist.gov"
server2 = "https://server2.nist.gov"
port1 = 80
port2 = 8080
ERROR_500 = '500 Server Error'
UNLOCK_ERROR = 'unlock: bad interlock'
LOCK_ERROR = 'lock: bad interlock'
ERROR_500 = "500 Server Error"
UNLOCK_ERROR = "unlock: bad interlock"
LOCK_ERROR = "lock: bad interlock"

bad_interlock: Interlock = None
wrong_response_interlock: Interlock = None
wrong_state_interlock: Interlock = None
disabled_interlock: Interlock = None


def web_relay_response(relayNumber, state) -> str:
return f"<datavalues>" \
f" <relay{relayNumber}state>{state}</relay{relayNumber}state>" \
f"</datavalues>"
return f"<datavalues>" f" <relay{relayNumber}state>{state}</relay{relayNumber}state>" f"</datavalues>"


# This method will be used by the mock to replace requests.get
# In the web relay case, it will return a xml file with the relay statuses
def mocked_requests_get(*args, **kwargs):
from requests import Response
class MockResponse(Response):

class MockResponse(Response):
def __init__(self, content, status_code, reason=None):
super().__init__()
self.content = content.encode()
Expand All @@ -42,23 +41,41 @@ def content(self):
return self.content

# interlock 3 on server 2 is bad
if args[0] in [f'{bad_interlock.card.server}:{bad_interlock.card.port}/{xml_name}?relay3=1' for xml_name in WebRelayHttpInterlock.state_xml_names]:
return MockResponse('', 500, UNLOCK_ERROR)
elif args[0] in [f'{bad_interlock.card.server}:{bad_interlock.card.port}/{xml_name}?relay3=0' for xml_name in WebRelayHttpInterlock.state_xml_names]:
return MockResponse('', 500, LOCK_ERROR)
if args[0] in [
f"{bad_interlock.card.server}:{bad_interlock.card.port}/{xml_name}?relay3=1"
for xml_name in WebRelayHttpInterlock.state_xml_names
]:
return MockResponse("", 500, UNLOCK_ERROR)
elif args[0] in [
f"{bad_interlock.card.server}:{bad_interlock.card.port}/{xml_name}?relay3=0"
for xml_name in WebRelayHttpInterlock.state_xml_names
]:
return MockResponse("", 500, LOCK_ERROR)
# interlock 2 on server 2 sends wrong response
elif args[0] in [f'{wrong_response_interlock.card.server}:{wrong_response_interlock.card.port}/{xml_name}?relay2=0' for xml_name in WebRelayHttpInterlock.state_xml_names] or args[0] in [f'{wrong_response_interlock.card.server}:{wrong_response_interlock.card.port}/{xml_name}?relay2=1' for xml_name in WebRelayHttpInterlock.state_xml_names]:
return MockResponse('bad response', 200)
elif args[0] in [
f"{wrong_response_interlock.card.server}:{wrong_response_interlock.card.port}/{xml_name}?relay2=0"
for xml_name in WebRelayHttpInterlock.state_xml_names
] or args[0] in [
f"{wrong_response_interlock.card.server}:{wrong_response_interlock.card.port}/{xml_name}?relay2=1"
for xml_name in WebRelayHttpInterlock.state_xml_names
]:
return MockResponse("bad response", 200)
# interlock 5 on server 2 sends wrong state response
elif args[0] in [f'{wrong_state_interlock.card.server}:{wrong_state_interlock.card.port}/stateFull.xml?relay5=0' for xml_name in WebRelayHttpInterlock.state_xml_names] or args[0] in [f'{wrong_state_interlock.card.server}:{wrong_state_interlock.card.port}/stateFull.xml?relay5=1' for xml_name in WebRelayHttpInterlock.state_xml_names]:
elif args[0] in [
f"{wrong_state_interlock.card.server}:{wrong_state_interlock.card.port}/stateFull.xml?relay5=0"
for xml_name in WebRelayHttpInterlock.state_xml_names
] or args[0] in [
f"{wrong_state_interlock.card.server}:{wrong_state_interlock.card.port}/stateFull.xml?relay5=1"
for xml_name in WebRelayHttpInterlock.state_xml_names
]:
return MockResponse(web_relay_response(5, 10), 200)
elif 'stateFull.xml' in args[0]:
elif "stateFull.xml" in args[0]:
# grab the relay number and the state from the request, and return it to pretend it was successful
url_state = args[0][-8:]
relay_number, relay_state = [int(i) for i in url_state if i.isdigit()]
return MockResponse(web_relay_response(relay_number, relay_state), 200)

return MockResponse('', 404)
return MockResponse("", 404)


class WebRelayInterlockTestCase(TestCase):
Expand All @@ -67,36 +84,38 @@ class WebRelayInterlockTestCase(TestCase):
def setUp(self):
global tool, wrong_response_interlock, wrong_state_interlock, bad_interlock, disabled_interlock
# enable interlock functionality
settings.__setattr__('INTERLOCKS_ENABLED', True)
interlock_card_category = InterlockCardCategory.objects.get(key='web_relay_http')
settings.__setattr__("INTERLOCKS_ENABLED", True)
interlock_card_category = InterlockCardCategory.objects.get(key="web_relay_http")
interlock_card = InterlockCard.objects.create(server=server1, port=port1, category=interlock_card_category)
interlock_card2 = InterlockCard.objects.create(server=server2, port=port2, category=interlock_card_category)
interlock_card3 = InterlockCard.objects.create(server=server2, port=port2, category=interlock_card_category, enabled=False)
interlock_card3 = InterlockCard.objects.create(
server=server2, port=port2, category=interlock_card_category, enabled=False
)
disabled_interlock = Interlock.objects.create(card=interlock_card3, channel=3)
interlock = Interlock.objects.create(card=interlock_card, channel=1)
wrong_response_interlock = Interlock.objects.create(card=interlock_card2, channel=2)
wrong_state_interlock = Interlock.objects.create(card=interlock_card2, channel=5)
bad_interlock = Interlock.objects.create(card=interlock_card2, channel=3)
owner = User.objects.create(username='mctest', first_name='Testy', last_name='McTester')
tool = Tool.objects.create(name='test_tool', primary_owner=owner, interlock=interlock)
owner = User.objects.create(username="mctest", first_name="Testy", last_name="McTester")
tool = Tool.objects.create(name="test_tool", primary_owner=owner, interlock=interlock)

@mock.patch('NEMO.interlocks.requests.get', side_effect=mocked_requests_get)
@mock.patch("NEMO.interlocks.requests.get", side_effect=mocked_requests_get)
def test_disabled_card(self, mock_args):
self.assertTrue(disabled_interlock.unlock())
self.assertEqual(disabled_interlock.state, Interlock.State.UNLOCKED)
self.assertTrue('Interlock interface mocked out' in disabled_interlock.most_recent_reply)
self.assertTrue("Interlock interface mocked out" in disabled_interlock.most_recent_reply)
self.assertTrue(disabled_interlock.lock())
self.assertEqual(disabled_interlock.state, Interlock.State.LOCKED)
self.assertTrue('Interlock interface mocked out' in disabled_interlock.most_recent_reply)
self.assertTrue("Interlock interface mocked out" in disabled_interlock.most_recent_reply)

@mock.patch('NEMO.interlocks.requests.get', side_effect=mocked_requests_get)
@mock.patch("NEMO.interlocks.requests.get", side_effect=mocked_requests_get)
def test_all_good(self, mock_args):
self.assertTrue(tool.interlock.unlock())
self.assertEqual(tool.interlock.state, Interlock.State.UNLOCKED)
self.assertTrue(tool.interlock.lock())
self.assertEqual(tool.interlock.state, Interlock.State.LOCKED)

@mock.patch('NEMO.interlocks.requests.get', side_effect=mocked_requests_get)
@mock.patch("NEMO.interlocks.requests.get", side_effect=mocked_requests_get)
def test_error_response_from_interlock(self, mock_args):
self.assertFalse(bad_interlock.unlock())
self.assertTrue(ERROR_500 in bad_interlock.most_recent_reply)
Expand All @@ -108,16 +127,16 @@ def test_error_response_from_interlock(self, mock_args):
self.assertTrue(LOCK_ERROR in bad_interlock.most_recent_reply)
self.assertEqual(bad_interlock.state, Interlock.State.UNKNOWN)

@mock.patch('NEMO.interlocks.requests.get', side_effect=mocked_requests_get)
@mock.patch("NEMO.interlocks.requests.get", side_effect=mocked_requests_get)
def test_wrong_response_from_interlock(self, mock_args):
self.assertFalse(wrong_response_interlock.unlock())
self.assertTrue('General exception' in wrong_response_interlock.most_recent_reply)
self.assertTrue('syntax error' in wrong_response_interlock.most_recent_reply)
self.assertTrue("General exception" in wrong_response_interlock.most_recent_reply)
self.assertTrue("syntax error" in wrong_response_interlock.most_recent_reply)
self.assertEqual(wrong_response_interlock.state, Interlock.State.UNKNOWN)

@mock.patch('NEMO.interlocks.requests.get', side_effect=mocked_requests_get)
@mock.patch("NEMO.interlocks.requests.get", side_effect=mocked_requests_get)
def test_wrong_state_from_interlock(self, mock_args):
self.assertFalse(wrong_state_interlock.unlock())
self.assertTrue('General exception' in wrong_state_interlock.most_recent_reply)
self.assertTrue('Unexpected state received' in wrong_state_interlock.most_recent_reply)
self.assertTrue("General exception" in wrong_state_interlock.most_recent_reply)
self.assertTrue("Unexpected state received" in wrong_state_interlock.most_recent_reply)
self.assertEqual(wrong_state_interlock.state, Interlock.State.UNKNOWN)
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

setup(
name='NEMO',
version='4.4.0',
version='4.4.1',
python_requires='>=3.8',
packages=find_namespace_packages(exclude=['NEMO.tests', 'NEMO.tests.*']),
include_package_data=True,
Expand Down

0 comments on commit 2952f55

Please sign in to comment.