From 52bf44c3826d30fcaa485822da6e0af9b4108bf5 Mon Sep 17 00:00:00 2001 From: Carlos Chinchilla Date: Wed, 4 Oct 2023 12:35:36 -0700 Subject: [PATCH] tests: Use CancellableReader for PW RPC clients The CancellableReader provides clean teardown of read threads at exit. Switching using readers and RpcClients to context statements makes sure the teardown is done. This comes from a recent change in Pigweed's RPC Python library. --- .../efr32/py/nl_test_runner/nl_test_runner.py | 10 +- .../common/pigweed_client.py | 16 ++- .../lighting-app/test_app.py | 112 +++++++++--------- .../integration_tests/lock-app/test_app.py | 112 +++++++++--------- .../integration_tests/pigweed-app/test_app.py | 10 +- 5 files changed, 138 insertions(+), 122 deletions(-) diff --git a/src/test_driver/efr32/py/nl_test_runner/nl_test_runner.py b/src/test_driver/efr32/py/nl_test_runner/nl_test_runner.py index b4f096afed100d..16a0ceb401e85a 100644 --- a/src/test_driver/efr32/py/nl_test_runner/nl_test_runner.py +++ b/src/test_driver/efr32/py/nl_test_runner/nl_test_runner.py @@ -81,10 +81,10 @@ def flash_device(device: str, flash_image: str, **kwargs): def get_hdlc_rpc_client(device: str, baudrate: int, output: Any, **kwargs): """Get the HdlcRpcClient based on arguments.""" serial_device = serial.Serial(device, baudrate, timeout=1) - def read(): return serial_device.read(8192) + reader = rpc.SerialReader(serial_device, 8192) write = serial_device.write - return HdlcRpcClient(read, PROTOS, default_channels(write), - lambda data: write_to_file(data, output)) + return rpc.HdlcRpcClient(reader, PROTOS, rpc.default_channels(write), + lambda data: rpc.write_to_file(data, output)) def runner(client) -> int: @@ -133,8 +133,8 @@ def main() -> int: if args.flash_image: flash_device(**vars(args)) time.sleep(1) # Give time for device to boot - client = get_hdlc_rpc_client(**vars(args)) - return runner(client) + with get_hdlc_rpc_client(**vars(args)) as client: + return runner(client) if __name__ == '__main__': diff --git a/src/test_driver/mbed/integration_tests/common/pigweed_client.py b/src/test_driver/mbed/integration_tests/common/pigweed_client.py index 9d7d6a341a1613..d2caf73e520f3d 100644 --- a/src/test_driver/mbed/integration_tests/common/pigweed_client.py +++ b/src/test_driver/mbed/integration_tests/common/pigweed_client.py @@ -28,8 +28,11 @@ def __init__(self, device, protos): self.device.stop() self.last_timeout = self.device.serial.get_timeout() self.device.serial.set_timeout(0.01) - self._pw_rpc_client = HdlcRpcClient(lambda: self.device.serial.read(4096), - protos, default_channels(self.device.serial.write)) + reader = rpc.Serialreader(self.device.serial, 4096) + self._pw_rpc_client = rpc.HdlcRpcClient( + reader, + protos, + rpc.default_channels(self.device.serial.write)) self._rpcs = self._pw_rpc_client.rpcs() def __del__(self): @@ -39,3 +42,12 @@ def __del__(self): @property def rpcs(self): return self._rpcs + + def __enter__(self) -> 'PigweedClient': + return self + + def __exit__(self, *exc_info) -> None: + self.stop() + + def stop(self) -> None: + self._pw_rpc_client.stop() diff --git a/src/test_driver/mbed/integration_tests/lighting-app/test_app.py b/src/test_driver/mbed/integration_tests/lighting-app/test_app.py index 67041a6720b813..7c302fad32ae57 100644 --- a/src/test_driver/mbed/integration_tests/lighting-app/test_app.py +++ b/src/test_driver/mbed/integration_tests/lighting-app/test_app.py @@ -146,77 +146,79 @@ def test_light_ctrl(device, network): def test_device_info_rpc(device): - pw_client = PigweedClient(device, RPC_PROTOS) - status, payload = pw_client.rpcs.chip.rpc.Device.GetDeviceInfo() - assert status.ok() is True - assert payload.vendor_id is not None and payload.product_id is not None and payload.serial_number is not None + with PigweedClient(device, RPC_PROTOS) as pw_client: + status, payload = pw_client.rpcs.chip.rpc.Device.GetDeviceInfo() + assert status.ok() is True + assert payload.vendor_id is not None and payload.product_id is not None and payload.serial_number is not None - device_details = get_device_details(device) - assert device_details is not None and len(device_details) != 0 + device_details = get_device_details(device) + assert device_details is not None and len(device_details) != 0 - assert int(device_details["VendorID"]) == payload.vendor_id - assert int(device_details["ProductID"]) == payload.product_id - assert int(device_details["Discriminator"] - ) == payload.pairing_info.discriminator - assert int(device_details["SetUpPINCode"]) == payload.pairing_info.code + assert int(device_details["VendorID"]) == payload.vendor_id + assert int(device_details["ProductID"]) == payload.product_id + assert int(device_details["Discriminator"] + ) == payload.pairing_info.discriminator + assert int(device_details["SetUpPINCode"]) == payload.pairing_info.code def test_device_factory_reset_rpc(device): - pw_client = PigweedClient(device, RPC_PROTOS) - status, payload = pw_client.rpcs.chip.rpc.Device.FactoryReset() - assert status.ok() is True + with PigweedClient(device, RPC_PROTOS) as pw_client: + status, payload = pw_client.rpcs.chip.rpc.Device.FactoryReset() + assert status.ok() is True def test_device_reboot_rpc(device): - pw_client = PigweedClient(device, RPC_PROTOS) - status, payload = pw_client.rpcs.chip.rpc.Device.Reboot() - assert status == Status.UNIMPLEMENTED + with PigweedClient(device, RPC_PROTOS) as pw_client: + status, payload = pw_client.rpcs.chip.rpc.Device.Reboot() + assert status == Status.UNIMPLEMENTED def test_device_ota_rpc(device): - pw_client = PigweedClient(device, RPC_PROTOS) - status, payload = pw_client.rpcs.chip.rpc.Device.TriggerOta() - assert status == Status.UNIMPLEMENTED + with PigweedClient(device, RPC_PROTOS) as pw_client: + status, payload = pw_client.rpcs.chip.rpc.Device.TriggerOta() + assert status == Status.UNIMPLEMENTED def test_ligth_ctrl_rpc(device): - pw_client = PigweedClient(device, RPC_PROTOS) + with PigweedClient(device, RPC_PROTOS) as pw_client: - # Check light on - status, payload = pw_client.rpcs.chip.rpc.Lighting.Set(on=True) - assert status.ok() is True - status, payload = pw_client.rpcs.chip.rpc.Lighting.Get() - assert status.ok() is True - assert payload.on is True + # Check light on + status, payload = pw_client.rpcs.chip.rpc.Lighting.Set(on=True) + assert status.ok() is True + status, payload = pw_client.rpcs.chip.rpc.Lighting.Get() + assert status.ok() is True + assert payload.on is True - # Check light off - status, payload = pw_client.rpcs.chip.rpc.Lighting.Set(on=False) - assert status.ok() is True - status, payload = pw_client.rpcs.chip.rpc.Lighting.Get() - assert status.ok() is True - assert payload.on is False + # Check light off + status, payload = pw_client.rpcs.chip.rpc.Lighting.Set(on=False) + assert status.ok() is True + status, payload = pw_client.rpcs.chip.rpc.Lighting.Get() + assert status.ok() is True + assert payload.on is False def test_button_ctrl_rpc(device): - pw_client = PigweedClient(device, RPC_PROTOS) - - # Check button 0 (lighting) - status, payload = pw_client.rpcs.chip.rpc.Lighting.Get() - assert status.ok() is True - initial_state = bool(payload.on) - - compare_state = not initial_state - status, payload = pw_client.rpcs.chip.rpc.Button.Event(idx=0, pushed=True) - assert status.ok() is True - sleep(2) - status, payload = pw_client.rpcs.chip.rpc.Lighting.Get() - assert status.ok() is True - assert payload.on == compare_state - - compare_state = initial_state - status, payload = pw_client.rpcs.chip.rpc.Button.Event(idx=0, pushed=True) - assert status.ok() is True - sleep(2) - status, payload = pw_client.rpcs.chip.rpc.Lighting.Get() - assert status.ok() is True - assert payload.on == compare_state + with PigweedClient(device, RPC_PROTOS) as pw_client: + + # Check button 0 (lighting) + status, payload = pw_client.rpcs.chip.rpc.Lighting.Get() + assert status.ok() is True + initial_state = bool(payload.on) + + compare_state = not initial_state + status, payload = pw_client.rpcs.chip.rpc.Button.Event(idx=0, + pushed=True) + assert status.ok() is True + sleep(2) + status, payload = pw_client.rpcs.chip.rpc.Lighting.Get() + assert status.ok() is True + assert payload.on == compare_state + + compare_state = initial_state + status, payload = pw_client.rpcs.chip.rpc.Button.Event(idx=0, + pushed=True) + assert status.ok() is True + sleep(2) + status, payload = pw_client.rpcs.chip.rpc.Lighting.Get() + assert status.ok() is True + assert payload.on == compare_state diff --git a/src/test_driver/mbed/integration_tests/lock-app/test_app.py b/src/test_driver/mbed/integration_tests/lock-app/test_app.py index 0592846571d5ee..ffe86d7a369615 100644 --- a/src/test_driver/mbed/integration_tests/lock-app/test_app.py +++ b/src/test_driver/mbed/integration_tests/lock-app/test_app.py @@ -136,77 +136,79 @@ def test_lock_ctrl(device, network): def test_device_info_rpc(device): - pw_client = PigweedClient(device, RPC_PROTOS) - status, payload = pw_client.rpcs.chip.rpc.Device.GetDeviceInfo() - assert status.ok() is True - assert payload.vendor_id is not None and payload.product_id is not None and payload.serial_number is not None + with PigweedClient(device, RPC_PROTOS) as pw_client: + status, payload = pw_client.rpcs.chip.rpc.Device.GetDeviceInfo() + assert status.ok() is True + assert payload.vendor_id is not None and payload.product_id is not None and payload.serial_number is not None - device_details = get_device_details(device) - assert device_details is not None and len(device_details) != 0 + device_details = get_device_details(device) + assert device_details is not None and len(device_details) != 0 - assert int(device_details["VendorID"]) == payload.vendor_id - assert int(device_details["ProductID"]) == payload.product_id - assert int(device_details["Discriminator"] - ) == payload.pairing_info.discriminator - assert int(device_details["SetUpPINCode"]) == payload.pairing_info.code + assert int(device_details["VendorID"]) == payload.vendor_id + assert int(device_details["ProductID"]) == payload.product_id + assert int(device_details["Discriminator"] + ) == payload.pairing_info.discriminator + assert int(device_details["SetUpPINCode"]) == payload.pairing_info.code def test_device_factory_reset_rpc(device): - pw_client = PigweedClient(device, RPC_PROTOS) - status, payload = pw_client.rpcs.chip.rpc.Device.FactoryReset() - assert status.ok() is True + with PigweedClient(device, RPC_PROTOS) as pw_client: + status, payload = pw_client.rpcs.chip.rpc.Device.FactoryReset() + assert status.ok() is True def test_device_reboot_rpc(device): - pw_client = PigweedClient(device, RPC_PROTOS) - status, payload = pw_client.rpcs.chip.rpc.Device.Reboot() - assert status == Status.UNIMPLEMENTED + with PigweedClient(device, RPC_PROTOS) as pw_client: + status, payload = pw_client.rpcs.chip.rpc.Device.Reboot() + assert status == Status.UNIMPLEMENTED def test_device_ota_rpc(device): - pw_client = PigweedClient(device, RPC_PROTOS) - status, payload = pw_client.rpcs.chip.rpc.Device.TriggerOta() - assert status == Status.UNIMPLEMENTED + with PigweedClient(device, RPC_PROTOS) as pw_client: + status, payload = pw_client.rpcs.chip.rpc.Device.TriggerOta() + assert status == Status.UNIMPLEMENTED def test_lock_ctrl_rpc(device): - pw_client = PigweedClient(device, RPC_PROTOS) + with PigweedClient(device, RPC_PROTOS) as pw_client: - # Check locked - status, payload = pw_client.rpcs.chip.rpc.Locking.Set(locked=True) - assert status.ok() is True - status, payload = pw_client.rpcs.chip.rpc.Locking.Get() - assert status.ok() is True - assert payload.locked is True + # Check locked + status, payload = pw_client.rpcs.chip.rpc.Locking.Set(locked=True) + assert status.ok() is True + status, payload = pw_client.rpcs.chip.rpc.Locking.Get() + assert status.ok() is True + assert payload.locked is True - # Check unlocked - status, payload = pw_client.rpcs.chip.rpc.Locking.Set(locked=False) - assert status.ok() is True - status, payload = pw_client.rpcs.chip.rpc.Locking.Get() - assert status.ok() is True - assert payload.locked is False + # Check unlocked + status, payload = pw_client.rpcs.chip.rpc.Locking.Set(locked=False) + assert status.ok() is True + status, payload = pw_client.rpcs.chip.rpc.Locking.Get() + assert status.ok() is True + assert payload.locked is False def test_button_ctrl_rpc(device): - pw_client = PigweedClient(device, RPC_PROTOS) - - # Check button 0 (locking) - status, payload = pw_client.rpcs.chip.rpc.Locking.Get() - assert status.ok() is True - initial_state = bool(payload.locked) - - compare_state = not initial_state - status, payload = pw_client.rpcs.chip.rpc.Button.Event(idx=0, pushed=True) - assert status.ok() is True - sleep(2) - status, payload = pw_client.rpcs.chip.rpc.Locking.Get() - assert status.ok() is True - assert payload.locked is compare_state - - compare_state = initial_state - status, payload = pw_client.rpcs.chip.rpc.Button.Event(idx=0, pushed=True) - assert status.ok() is True - sleep(2) - status, payload = pw_client.rpcs.chip.rpc.Locking.Get() - assert status.ok() is True - assert payload.locked == compare_state + with PigweedClient(device, RPC_PROTOS) as pw_client: + + # Check button 0 (locking) + status, payload = pw_client.rpcs.chip.rpc.Locking.Get() + assert status.ok() is True + initial_state = bool(payload.locked) + + compare_state = not initial_state + status, payload = pw_client.rpcs.chip.rpc.Button.Event(idx=0, + pushed=True) + assert status.ok() is True + sleep(2) + status, payload = pw_client.rpcs.chip.rpc.Locking.Get() + assert status.ok() is True + assert payload.locked is compare_state + + compare_state = initial_state + status, payload = pw_client.rpcs.chip.rpc.Button.Event(idx=0, + pushed=True) + assert status.ok() is True + sleep(2) + status, payload = pw_client.rpcs.chip.rpc.Locking.Get() + assert status.ok() is True + assert payload.locked == compare_state diff --git a/src/test_driver/mbed/integration_tests/pigweed-app/test_app.py b/src/test_driver/mbed/integration_tests/pigweed-app/test_app.py index 4d14d479ed1db1..e9273848e54b40 100644 --- a/src/test_driver/mbed/integration_tests/pigweed-app/test_app.py +++ b/src/test_driver/mbed/integration_tests/pigweed-app/test_app.py @@ -31,8 +31,8 @@ def test_smoke_test(device): def test_echo(device): - pw_client = PigweedClient(device, RPC_PROTOS) - status, payload = pw_client.rpcs.pw.rpc.EchoService.Echo( - msg=PW_ECHO_TEST_MESSAGE) - assert status.ok() is True - assert payload.msg == PW_ECHO_TEST_MESSAGE + with PigweedClient(device, RPC_PROTOS) as pw_client: + status, payload = pw_client.rpcs.pw.rpc.EchoService.Echo( + msg=PW_ECHO_TEST_MESSAGE) + assert status.ok() is True + assert payload.msg == PW_ECHO_TEST_MESSAGE