From f74cc4301c26a65a460b101aecda6d271153addb Mon Sep 17 00:00:00 2001 From: Philipp Kerling Date: Fri, 14 Feb 2020 12:28:31 +0100 Subject: [PATCH 1/2] read/readwrite: Add option not to check data size Some structures (such as SAdsSymbolEntry) are dynamically sized, so the length cannot be checked with the read data size. Allow the user to disable length checking on request. --- pyads/ads.py | 62 +++++++++++++++++++++++++++++++++++++++-------- pyads/pyads_ex.py | 49 +++++++++++++++++++++++++++++++------ 2 files changed, 93 insertions(+), 18 deletions(-) diff --git a/pyads/ads.py b/pyads/ads.py index 90a2fc20..6586983e 100644 --- a/pyads/ads.py +++ b/pyads/ads.py @@ -248,8 +248,9 @@ def read_write( value, plc_write_datatype, return_ctypes=False, + check_length=True, ): - # type: (AmsAddr, int, int, Type, Any, Type, bool) -> Any + # type: (AmsAddr, int, int, Type, Any, Type, bool, bool) -> Any """Read and write data synchronous from/to an ADS-device. :param AmsAddr adr: local or remote AmsAddr @@ -263,6 +264,8 @@ def read_write( PLCTYPE constants :param bool return_ctypes: return ctypes instead of python types if True (default: False) + :param bool check_length: check whether the amount of bytes read matches the size + of the read data type (default: True) :rtype: PLCTYPE :return: value: **value** @@ -277,13 +280,16 @@ def read_write( value, plc_write_datatype, return_ctypes, + check_length, ) return None -def read(adr, index_group, index_offset, plc_datatype, return_ctypes=False): - # type: (AmsAddr, int, int, Type, bool) -> Any +def read( + adr, index_group, index_offset, plc_datatype, return_ctypes=False, check_length=True +): + # type: (AmsAddr, int, int, Type, bool, bool) -> Any """Read data synchronous from an ADS-device. :param AmsAddr adr: local or remote AmsAddr @@ -294,18 +300,26 @@ def read(adr, index_group, index_offset, plc_datatype, return_ctypes=False): PLCTYPE constants :param bool return_ctypes: return ctypes instead of python types if True (default: False) + :param bool check_length: check whether the amount of bytes read matches the size + of the read data type (default: True) :return: value: **value** """ if port is not None: return adsSyncReadReqEx2( - port, adr, index_group, index_offset, plc_datatype, return_ctypes + port, + adr, + index_group, + index_offset, + plc_datatype, + return_ctypes, + check_length, ) return None -def read_by_name(adr, data_name, plc_datatype, return_ctypes=False): +def read_by_name(adr, data_name, plc_datatype, return_ctypes=False, check_length=True): # type: (AmsAddr, str, Type, bool) -> Any """Read data synchronous from an ADS-device from data name. @@ -315,11 +329,15 @@ def read_by_name(adr, data_name, plc_datatype, return_ctypes=False): PLCTYPE constants :param bool return_ctypes: return ctypes instead of python types if True (default: False) + :param bool check_length: check whether the amount of bytes read matches the size + of the read data type (default: True) :return: value: **value** """ if port is not None: - return adsSyncReadByNameEx(port, adr, data_name, plc_datatype, return_ctypes) + return adsSyncReadByNameEx( + port, adr, data_name, plc_datatype, return_ctypes, check_length=check_length + ) return None @@ -724,8 +742,9 @@ def read_write( value, plc_write_datatype, return_ctypes=False, + check_length=True, ): - # type: (int, int, Type, Any, Type, bool) -> Any + # type: (int, int, Type, Any, Type, bool, bool) -> Any """Read and write data synchronous from/to an ADS-device. :param int index_group: PLC storage area, according to the INDEXGROUP @@ -739,6 +758,8 @@ def read_write( :rtype: PLCTYPE :param bool return_ctypes: return ctypes instead of python types if True (default: False) + :param bool check_length: check whether the amount of bytes read matches the size + of the read data type (default: True) :return: value: **value** """ @@ -752,12 +773,20 @@ def read_write( value, plc_write_datatype, return_ctypes, + check_length, ) return None - def read(self, index_group, index_offset, plc_datatype, return_ctypes=False): - # type: (int, int, Type, bool) -> Any + def read( + self, + index_group, + index_offset, + plc_datatype, + return_ctypes=False, + check_length=True, + ): + # type: (int, int, Type, bool, bool) -> Any """Read data synchronous from an ADS-device. :param int index_group: PLC storage area, according to the INDEXGROUP @@ -768,6 +797,8 @@ def read(self, index_group, index_offset, plc_datatype, return_ctypes=False): :return: value: **value** :param bool return_ctypes: return ctypes instead of python types if True (default: False) + :param bool check_length: check whether the amount of bytes read matches the size + of the read data type (default: True) """ if self._port is not None: @@ -778,6 +809,7 @@ def read(self, index_group, index_offset, plc_datatype, return_ctypes=False): index_offset, plc_datatype, return_ctypes, + check_length, ) return None @@ -806,7 +838,14 @@ def release_handle(self, handle): if self._port is not None: adsReleaseHandle(self._port, self._adr, handle) - def read_by_name(self, data_name, plc_datatype, return_ctypes=False, handle=None): + def read_by_name( + self, + data_name, + plc_datatype, + return_ctypes=False, + handle=None, + check_length=True, + ): # type: (str, Type, bool, int) -> Any """Read data synchronous from an ADS-device from data name. @@ -818,6 +857,8 @@ def read_by_name(self, data_name, plc_datatype, return_ctypes=False, handle=None (default: False) :param int handle: PLC-variable handle, pass in handle if previously obtained to speed up reading (default: None) + :param bool check_length: check whether the amount of bytes read matches the size + of the read data type (default: True) """ if self._port: @@ -828,6 +869,7 @@ def read_by_name(self, data_name, plc_datatype, return_ctypes=False, handle=None plc_datatype, return_ctypes=return_ctypes, handle=handle, + check_length=check_length, ) return None diff --git a/pyads/pyads_ex.py b/pyads/pyads_ex.py index 7818c6a8..601ad5dd 100644 --- a/pyads/pyads_ex.py +++ b/pyads/pyads_ex.py @@ -502,8 +502,9 @@ def adsSyncReadWriteReqEx2( value, write_data_type, return_ctypes=False, + check_length=True, ): - # type: (int, AmsAddr, int, int, Type, Any, Type, bool) -> Any + # type: (int, AmsAddr, int, int, Type, Any, Type, bool, bool) -> Any """Read and write data synchronous from/to an ADS-device. :param int port: local AMS port as returned by adsPortOpenEx() @@ -518,6 +519,8 @@ def adsSyncReadWriteReqEx2( PLCTYPE constants :param bool return_ctypes: return ctypes instead of python types if True (default: False) + :param bool check_length: check whether the amount of bytes read matches the size + of the read data type (default: True) :rtype: read_data_type :return: value: value read from PLC @@ -571,7 +574,11 @@ def adsSyncReadWriteReqEx2( # If we're reading a value of predetermined size (anything but a string), # validate that the correct number of bytes were read - if read_data_type != PLCTYPE_STRING and bytes_read.value != read_length.value: + if ( + check_length + and read_data_type != PLCTYPE_STRING + and bytes_read.value != read_length.value + ): raise RuntimeError( "Insufficient data (expected {0} bytes, {1} were read).".format( read_length.value, bytes_read.value @@ -594,9 +601,15 @@ def adsSyncReadWriteReqEx2( def adsSyncReadReqEx2( - port, address, index_group, index_offset, data_type, return_ctypes=False + port, + address, + index_group, + index_offset, + data_type, + return_ctypes=False, + check_length=True, ): - # type: (int, AmsAddr, int, int, Type, bool) -> Any + # type: (int, AmsAddr, int, int, Type, bool, bool) -> Any """Read data synchronous from an ADS-device. :param int port: local AMS port as returned by adsPortOpenEx() @@ -608,6 +621,8 @@ def adsSyncReadReqEx2( PLCTYPE constants :param bool return_ctypes: return ctypes instead of python types if True (default: False) + :param bool check_length: check whether the amount of bytes read matches the size + of the read data type (default: True) :rtype: data_type :return: value: **value** @@ -644,7 +659,11 @@ def adsSyncReadReqEx2( # If we're reading a value of predetermined size (anything but a string), # validate that the correct number of bytes were read - if data_type != PLCTYPE_STRING and bytes_read.value != data_length.value: + if ( + check_length + and data_type != PLCTYPE_STRING + and bytes_read.value != data_length.value + ): raise RuntimeError( "Insufficient data (expected {0} bytes, {1} were read).".format( data_length.value, bytes_read.value @@ -701,9 +720,15 @@ def adsReleaseHandle(port, address, handle): def adsSyncReadByNameEx( - port, address, data_name, data_type, return_ctypes=False, handle=None + port, + address, + data_name, + data_type, + return_ctypes=False, + handle=None, + check_length=True, ): - # type: (int, AmsAddr, str, Type, bool, int) -> Any + # type: (int, AmsAddr, str, Type, bool, int, bool) -> Any """Read data synchronous from an ADS-device from data name. :param int port: local AMS port as returned by adsPortOpenEx() @@ -714,6 +739,8 @@ def adsSyncReadByNameEx( :param bool return_ctypes: return ctypes instead of python types if True (default: False) :param int handle: PLC-variable handle (default: None) + :param bool check_length: check whether the amount of bytes read matches the size + of the read data type (default: True) :rtype: data_type :return: value: **value** @@ -726,7 +753,13 @@ def adsSyncReadByNameEx( # Read the value of a PLC-variable, via handle value = adsSyncReadReqEx2( - port, address, ADSIGRP_SYM_VALBYHND, handle, data_type, return_ctypes + port, + address, + ADSIGRP_SYM_VALBYHND, + handle, + data_type, + return_ctypes, + check_length, ) if no_handle is True: From 92de642a6ca92b730fbe676ef933f1893818a0fd Mon Sep 17 00:00:00 2001 From: Philipp Kerling Date: Thu, 5 Mar 2020 14:12:12 +0100 Subject: [PATCH 2/2] Add tests for check_length on read --- pyads/testserver.py | 4 +++ tests/test_integration.py | 71 ++++++++++++++++++++++++++++++++++++++- 2 files changed, 74 insertions(+), 1 deletion(-) diff --git a/pyads/testserver.py b/pyads/testserver.py index 7aebad08..6684a2ec 100644 --- a/pyads/testserver.py +++ b/pyads/testserver.py @@ -485,6 +485,10 @@ class AdvancedHandler(AbstractHandler): """ def __init__(self): + # type: () -> None + self.reset() + + def reset(self): # type: () -> None self._data = defaultdict( lambda: bytes(16) diff --git a/tests/test_integration.py b/tests/test_integration.py index dc68db4a..0b72ed5f 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -14,7 +14,7 @@ from pyads import ads, constants from pyads.utils import platform_is_linux from pyads.structs import AmsAddr, NotificationAttrib -from pyads.testserver import AdsTestServer +from pyads.testserver import AdsTestServer, AdvancedHandler # These are pretty arbitrary @@ -468,5 +468,74 @@ def callback(handle, name, timestamp, value): plc.write_by_name("a", 1, pyads.PLCTYPE_INT) +class AdsApiTestCaseAdvanced(TestCase): + @classmethod + def setUpClass(cls): + # Start dummy ADS Endpoint + cls.test_server = AdsTestServer(AdvancedHandler(), logging=False) + cls.test_server.start() + + # Endpoint AMS Address + cls.endpoint = AmsAddr(TEST_SERVER_AMS_NET_ID, TEST_SERVER_AMS_PORT) + + # Open AMS Port + ads.open_port() + + # wait a bit otherwise error might occur + time.sleep(1) + + # NOTE: On a Windows machine, this route needs to be configured + # within the router service for the tests to work. + if platform_is_linux(): + ads.add_route(cls.endpoint, TEST_SERVER_IP_ADDRESS) + + @classmethod + def tearDownClass(cls): + cls.test_server.stop() + + # wait a bit for server to shutdown + time.sleep(1) + + ads.close_port() + + if platform_is_linux(): + ads.delete_route(cls.endpoint) + + def setUp(self): + # Clear request history before each test + self.test_server.request_history = [] + self.test_server.handler.reset() + + def test_read_check_length(self): + # Write data shorter than what should be read + ads.write( + self.endpoint, + value=1, + index_group=constants.INDEXGROUP_DATA, + index_offset=1, + plc_datatype=constants.PLCTYPE_USINT, + ) + + with self.assertRaises(RuntimeError): + # Since the length is checked, this must give an error + ads.read( + self.endpoint, + index_group=constants.INDEXGROUP_DATA, + index_offset=1, + plc_datatype=constants.PLCTYPE_UINT, + check_length=True, + ) + + # If the length is not checked, no error should be raised + value = ads.read( + self.endpoint, + index_group=constants.INDEXGROUP_DATA, + index_offset=1, + plc_datatype=constants.PLCTYPE_UINT, + check_length=False, + ) + self.assertEqual(value, 1) + + if __name__ == "__main__": unittest.main()