Skip to content

Commit

Permalink
tests: Use CancellableReader for PW RPC clients
Browse files Browse the repository at this point in the history
The CancellableReader provides clean teardown of read threads at exit.
Switching using readers and RpcClients to context statements makes
sure the teardown is done. This comes from a recent change in Pigweed's
RPC Python library.
  • Loading branch information
ChinchillaWithGoggles committed Oct 4, 2023
1 parent 73c327e commit 52bf44c
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 122 deletions.
10 changes: 5 additions & 5 deletions src/test_driver/efr32/py/nl_test_runner/nl_test_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,10 @@ def flash_device(device: str, flash_image: str, **kwargs):
def get_hdlc_rpc_client(device: str, baudrate: int, output: Any, **kwargs):
"""Get the HdlcRpcClient based on arguments."""
serial_device = serial.Serial(device, baudrate, timeout=1)
def read(): return serial_device.read(8192)
reader = rpc.SerialReader(serial_device, 8192)
write = serial_device.write
return HdlcRpcClient(read, PROTOS, default_channels(write),
lambda data: write_to_file(data, output))
return rpc.HdlcRpcClient(reader, PROTOS, rpc.default_channels(write),
lambda data: rpc.write_to_file(data, output))


def runner(client) -> int:
Expand Down Expand Up @@ -133,8 +133,8 @@ def main() -> int:
if args.flash_image:
flash_device(**vars(args))
time.sleep(1) # Give time for device to boot
client = get_hdlc_rpc_client(**vars(args))
return runner(client)
with get_hdlc_rpc_client(**vars(args)) as client:
return runner(client)


if __name__ == '__main__':
Expand Down
16 changes: 14 additions & 2 deletions src/test_driver/mbed/integration_tests/common/pigweed_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,11 @@ def __init__(self, device, protos):
self.device.stop()
self.last_timeout = self.device.serial.get_timeout()
self.device.serial.set_timeout(0.01)
self._pw_rpc_client = HdlcRpcClient(lambda: self.device.serial.read(4096),
protos, default_channels(self.device.serial.write))
reader = rpc.Serialreader(self.device.serial, 4096)
self._pw_rpc_client = rpc.HdlcRpcClient(
reader,
protos,
rpc.default_channels(self.device.serial.write))
self._rpcs = self._pw_rpc_client.rpcs()

def __del__(self):
Expand All @@ -39,3 +42,12 @@ def __del__(self):
@property
def rpcs(self):
return self._rpcs

def __enter__(self) -> 'PigweedClient':
return self

def __exit__(self, *exc_info) -> None:
self.stop()

def stop(self) -> None:
self._pw_rpc_client.stop()
112 changes: 57 additions & 55 deletions src/test_driver/mbed/integration_tests/lighting-app/test_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,77 +146,79 @@ def test_light_ctrl(device, network):


def test_device_info_rpc(device):
pw_client = PigweedClient(device, RPC_PROTOS)
status, payload = pw_client.rpcs.chip.rpc.Device.GetDeviceInfo()
assert status.ok() is True
assert payload.vendor_id is not None and payload.product_id is not None and payload.serial_number is not None
with PigweedClient(device, RPC_PROTOS) as pw_client:
status, payload = pw_client.rpcs.chip.rpc.Device.GetDeviceInfo()
assert status.ok() is True
assert payload.vendor_id is not None and payload.product_id is not None and payload.serial_number is not None

device_details = get_device_details(device)
assert device_details is not None and len(device_details) != 0
device_details = get_device_details(device)
assert device_details is not None and len(device_details) != 0

assert int(device_details["VendorID"]) == payload.vendor_id
assert int(device_details["ProductID"]) == payload.product_id
assert int(device_details["Discriminator"]
) == payload.pairing_info.discriminator
assert int(device_details["SetUpPINCode"]) == payload.pairing_info.code
assert int(device_details["VendorID"]) == payload.vendor_id
assert int(device_details["ProductID"]) == payload.product_id
assert int(device_details["Discriminator"]
) == payload.pairing_info.discriminator
assert int(device_details["SetUpPINCode"]) == payload.pairing_info.code


def test_device_factory_reset_rpc(device):
pw_client = PigweedClient(device, RPC_PROTOS)
status, payload = pw_client.rpcs.chip.rpc.Device.FactoryReset()
assert status.ok() is True
with PigweedClient(device, RPC_PROTOS) as pw_client:
status, payload = pw_client.rpcs.chip.rpc.Device.FactoryReset()
assert status.ok() is True


def test_device_reboot_rpc(device):
pw_client = PigweedClient(device, RPC_PROTOS)
status, payload = pw_client.rpcs.chip.rpc.Device.Reboot()
assert status == Status.UNIMPLEMENTED
with PigweedClient(device, RPC_PROTOS) as pw_client:
status, payload = pw_client.rpcs.chip.rpc.Device.Reboot()
assert status == Status.UNIMPLEMENTED


def test_device_ota_rpc(device):
pw_client = PigweedClient(device, RPC_PROTOS)
status, payload = pw_client.rpcs.chip.rpc.Device.TriggerOta()
assert status == Status.UNIMPLEMENTED
with PigweedClient(device, RPC_PROTOS) as pw_client:
status, payload = pw_client.rpcs.chip.rpc.Device.TriggerOta()
assert status == Status.UNIMPLEMENTED


def test_ligth_ctrl_rpc(device):
pw_client = PigweedClient(device, RPC_PROTOS)
with PigweedClient(device, RPC_PROTOS) as pw_client:

# Check light on
status, payload = pw_client.rpcs.chip.rpc.Lighting.Set(on=True)
assert status.ok() is True
status, payload = pw_client.rpcs.chip.rpc.Lighting.Get()
assert status.ok() is True
assert payload.on is True
# Check light on
status, payload = pw_client.rpcs.chip.rpc.Lighting.Set(on=True)
assert status.ok() is True
status, payload = pw_client.rpcs.chip.rpc.Lighting.Get()
assert status.ok() is True
assert payload.on is True

# Check light off
status, payload = pw_client.rpcs.chip.rpc.Lighting.Set(on=False)
assert status.ok() is True
status, payload = pw_client.rpcs.chip.rpc.Lighting.Get()
assert status.ok() is True
assert payload.on is False
# Check light off
status, payload = pw_client.rpcs.chip.rpc.Lighting.Set(on=False)
assert status.ok() is True
status, payload = pw_client.rpcs.chip.rpc.Lighting.Get()
assert status.ok() is True
assert payload.on is False


def test_button_ctrl_rpc(device):
pw_client = PigweedClient(device, RPC_PROTOS)

# Check button 0 (lighting)
status, payload = pw_client.rpcs.chip.rpc.Lighting.Get()
assert status.ok() is True
initial_state = bool(payload.on)

compare_state = not initial_state
status, payload = pw_client.rpcs.chip.rpc.Button.Event(idx=0, pushed=True)
assert status.ok() is True
sleep(2)
status, payload = pw_client.rpcs.chip.rpc.Lighting.Get()
assert status.ok() is True
assert payload.on == compare_state

compare_state = initial_state
status, payload = pw_client.rpcs.chip.rpc.Button.Event(idx=0, pushed=True)
assert status.ok() is True
sleep(2)
status, payload = pw_client.rpcs.chip.rpc.Lighting.Get()
assert status.ok() is True
assert payload.on == compare_state
with PigweedClient(device, RPC_PROTOS) as pw_client:

# Check button 0 (lighting)
status, payload = pw_client.rpcs.chip.rpc.Lighting.Get()
assert status.ok() is True
initial_state = bool(payload.on)

compare_state = not initial_state
status, payload = pw_client.rpcs.chip.rpc.Button.Event(idx=0,
pushed=True)
assert status.ok() is True
sleep(2)
status, payload = pw_client.rpcs.chip.rpc.Lighting.Get()
assert status.ok() is True
assert payload.on == compare_state

compare_state = initial_state
status, payload = pw_client.rpcs.chip.rpc.Button.Event(idx=0,
pushed=True)
assert status.ok() is True
sleep(2)
status, payload = pw_client.rpcs.chip.rpc.Lighting.Get()
assert status.ok() is True
assert payload.on == compare_state
112 changes: 57 additions & 55 deletions src/test_driver/mbed/integration_tests/lock-app/test_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,77 +136,79 @@ def test_lock_ctrl(device, network):


def test_device_info_rpc(device):
pw_client = PigweedClient(device, RPC_PROTOS)
status, payload = pw_client.rpcs.chip.rpc.Device.GetDeviceInfo()
assert status.ok() is True
assert payload.vendor_id is not None and payload.product_id is not None and payload.serial_number is not None
with PigweedClient(device, RPC_PROTOS) as pw_client:
status, payload = pw_client.rpcs.chip.rpc.Device.GetDeviceInfo()
assert status.ok() is True
assert payload.vendor_id is not None and payload.product_id is not None and payload.serial_number is not None

device_details = get_device_details(device)
assert device_details is not None and len(device_details) != 0
device_details = get_device_details(device)
assert device_details is not None and len(device_details) != 0

assert int(device_details["VendorID"]) == payload.vendor_id
assert int(device_details["ProductID"]) == payload.product_id
assert int(device_details["Discriminator"]
) == payload.pairing_info.discriminator
assert int(device_details["SetUpPINCode"]) == payload.pairing_info.code
assert int(device_details["VendorID"]) == payload.vendor_id
assert int(device_details["ProductID"]) == payload.product_id
assert int(device_details["Discriminator"]
) == payload.pairing_info.discriminator
assert int(device_details["SetUpPINCode"]) == payload.pairing_info.code


def test_device_factory_reset_rpc(device):
pw_client = PigweedClient(device, RPC_PROTOS)
status, payload = pw_client.rpcs.chip.rpc.Device.FactoryReset()
assert status.ok() is True
with PigweedClient(device, RPC_PROTOS) as pw_client:
status, payload = pw_client.rpcs.chip.rpc.Device.FactoryReset()
assert status.ok() is True


def test_device_reboot_rpc(device):
pw_client = PigweedClient(device, RPC_PROTOS)
status, payload = pw_client.rpcs.chip.rpc.Device.Reboot()
assert status == Status.UNIMPLEMENTED
with PigweedClient(device, RPC_PROTOS) as pw_client:
status, payload = pw_client.rpcs.chip.rpc.Device.Reboot()
assert status == Status.UNIMPLEMENTED


def test_device_ota_rpc(device):
pw_client = PigweedClient(device, RPC_PROTOS)
status, payload = pw_client.rpcs.chip.rpc.Device.TriggerOta()
assert status == Status.UNIMPLEMENTED
with PigweedClient(device, RPC_PROTOS) as pw_client:
status, payload = pw_client.rpcs.chip.rpc.Device.TriggerOta()
assert status == Status.UNIMPLEMENTED


def test_lock_ctrl_rpc(device):
pw_client = PigweedClient(device, RPC_PROTOS)
with PigweedClient(device, RPC_PROTOS) as pw_client:

# Check locked
status, payload = pw_client.rpcs.chip.rpc.Locking.Set(locked=True)
assert status.ok() is True
status, payload = pw_client.rpcs.chip.rpc.Locking.Get()
assert status.ok() is True
assert payload.locked is True
# Check locked
status, payload = pw_client.rpcs.chip.rpc.Locking.Set(locked=True)
assert status.ok() is True
status, payload = pw_client.rpcs.chip.rpc.Locking.Get()
assert status.ok() is True
assert payload.locked is True

# Check unlocked
status, payload = pw_client.rpcs.chip.rpc.Locking.Set(locked=False)
assert status.ok() is True
status, payload = pw_client.rpcs.chip.rpc.Locking.Get()
assert status.ok() is True
assert payload.locked is False
# Check unlocked
status, payload = pw_client.rpcs.chip.rpc.Locking.Set(locked=False)
assert status.ok() is True
status, payload = pw_client.rpcs.chip.rpc.Locking.Get()
assert status.ok() is True
assert payload.locked is False


def test_button_ctrl_rpc(device):
pw_client = PigweedClient(device, RPC_PROTOS)

# Check button 0 (locking)
status, payload = pw_client.rpcs.chip.rpc.Locking.Get()
assert status.ok() is True
initial_state = bool(payload.locked)

compare_state = not initial_state
status, payload = pw_client.rpcs.chip.rpc.Button.Event(idx=0, pushed=True)
assert status.ok() is True
sleep(2)
status, payload = pw_client.rpcs.chip.rpc.Locking.Get()
assert status.ok() is True
assert payload.locked is compare_state

compare_state = initial_state
status, payload = pw_client.rpcs.chip.rpc.Button.Event(idx=0, pushed=True)
assert status.ok() is True
sleep(2)
status, payload = pw_client.rpcs.chip.rpc.Locking.Get()
assert status.ok() is True
assert payload.locked == compare_state
with PigweedClient(device, RPC_PROTOS) as pw_client:

# Check button 0 (locking)
status, payload = pw_client.rpcs.chip.rpc.Locking.Get()
assert status.ok() is True
initial_state = bool(payload.locked)

compare_state = not initial_state
status, payload = pw_client.rpcs.chip.rpc.Button.Event(idx=0,
pushed=True)
assert status.ok() is True
sleep(2)
status, payload = pw_client.rpcs.chip.rpc.Locking.Get()
assert status.ok() is True
assert payload.locked is compare_state

compare_state = initial_state
status, payload = pw_client.rpcs.chip.rpc.Button.Event(idx=0,
pushed=True)
assert status.ok() is True
sleep(2)
status, payload = pw_client.rpcs.chip.rpc.Locking.Get()
assert status.ok() is True
assert payload.locked == compare_state
10 changes: 5 additions & 5 deletions src/test_driver/mbed/integration_tests/pigweed-app/test_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ def test_smoke_test(device):


def test_echo(device):
pw_client = PigweedClient(device, RPC_PROTOS)
status, payload = pw_client.rpcs.pw.rpc.EchoService.Echo(
msg=PW_ECHO_TEST_MESSAGE)
assert status.ok() is True
assert payload.msg == PW_ECHO_TEST_MESSAGE
with PigweedClient(device, RPC_PROTOS) as pw_client:
status, payload = pw_client.rpcs.pw.rpc.EchoService.Echo(
msg=PW_ECHO_TEST_MESSAGE)
assert status.ok() is True
assert payload.msg == PW_ECHO_TEST_MESSAGE

0 comments on commit 52bf44c

Please sign in to comment.