Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tests: Use CancellableReader for PW RPC clients #29567

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions src/test_driver/efr32/py/nl_test_runner/nl_test_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from typing import Any

import serial # type: ignore
from pw_hdlc.rpc import HdlcRpcClient, default_channels, write_to_file
from pw_hdlc import rpc

# RPC Protos
from nl_test_service import nl_test_pb2 # isort:skip
Expand Down Expand Up @@ -81,10 +81,10 @@ def flash_device(device: str, flash_image: str, **kwargs):
def get_hdlc_rpc_client(device: str, baudrate: int, output: Any, **kwargs):
"""Get the HdlcRpcClient based on arguments."""
serial_device = serial.Serial(device, baudrate, timeout=1)
def read(): return serial_device.read(8192)
reader = rpc.SerialReader(serial_device, 8192)
write = serial_device.write
return HdlcRpcClient(read, PROTOS, default_channels(write),
lambda data: write_to_file(data, output))
return rpc.HdlcRpcClient(reader, PROTOS, rpc.default_channels(write),
lambda data: rpc.write_to_file(data, output))


def runner(client) -> int:
Expand Down Expand Up @@ -133,8 +133,8 @@ def main() -> int:
if args.flash_image:
flash_device(**vars(args))
time.sleep(1) # Give time for device to boot
client = get_hdlc_rpc_client(**vars(args))
return runner(client)
with get_hdlc_rpc_client(**vars(args)) as client:
return runner(client)


if __name__ == '__main__':
Expand Down
18 changes: 15 additions & 3 deletions src/test_driver/mbed/integration_tests/common/pigweed_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from pw_hdlc.rpc import HdlcRpcClient, default_channels
from pw_hdlc import rpc


class PigweedClient:
Expand All @@ -28,8 +28,11 @@ def __init__(self, device, protos):
self.device.stop()
self.last_timeout = self.device.serial.get_timeout()
self.device.serial.set_timeout(0.01)
self._pw_rpc_client = HdlcRpcClient(lambda: self.device.serial.read(4096),
protos, default_channels(self.device.serial.write))
reader = rpc.Serialreader(self.device.serial, 4096)
self._pw_rpc_client = rpc.HdlcRpcClient(
reader,
protos,
rpc.default_channels(self.device.serial.write))
self._rpcs = self._pw_rpc_client.rpcs()

def __del__(self):
Expand All @@ -39,3 +42,12 @@ def __del__(self):
@property
def rpcs(self):
return self._rpcs

def __enter__(self) -> 'PigweedClient':
return self

def __exit__(self, *exc_info) -> None:
self.stop()

def stop(self) -> None:
self._pw_rpc_client.stop()
112 changes: 57 additions & 55 deletions src/test_driver/mbed/integration_tests/lighting-app/test_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,77 +146,79 @@ def test_light_ctrl(device, network):


def test_device_info_rpc(device):
pw_client = PigweedClient(device, RPC_PROTOS)
status, payload = pw_client.rpcs.chip.rpc.Device.GetDeviceInfo()
assert status.ok() is True
assert payload.vendor_id is not None and payload.product_id is not None and payload.serial_number is not None
with PigweedClient(device, RPC_PROTOS) as pw_client:
status, payload = pw_client.rpcs.chip.rpc.Device.GetDeviceInfo()
assert status.ok() is True
assert payload.vendor_id is not None and payload.product_id is not None and payload.serial_number is not None

device_details = get_device_details(device)
assert device_details is not None and len(device_details) != 0
device_details = get_device_details(device)
assert device_details is not None and len(device_details) != 0

assert int(device_details["VendorID"]) == payload.vendor_id
assert int(device_details["ProductID"]) == payload.product_id
assert int(device_details["Discriminator"]
) == payload.pairing_info.discriminator
assert int(device_details["SetUpPINCode"]) == payload.pairing_info.code
assert int(device_details["VendorID"]) == payload.vendor_id
assert int(device_details["ProductID"]) == payload.product_id
assert int(device_details["Discriminator"]
) == payload.pairing_info.discriminator
assert int(device_details["SetUpPINCode"]) == payload.pairing_info.code


def test_device_factory_reset_rpc(device):
pw_client = PigweedClient(device, RPC_PROTOS)
status, payload = pw_client.rpcs.chip.rpc.Device.FactoryReset()
assert status.ok() is True
with PigweedClient(device, RPC_PROTOS) as pw_client:
status, payload = pw_client.rpcs.chip.rpc.Device.FactoryReset()
assert status.ok() is True


def test_device_reboot_rpc(device):
pw_client = PigweedClient(device, RPC_PROTOS)
status, payload = pw_client.rpcs.chip.rpc.Device.Reboot()
assert status == Status.UNIMPLEMENTED
with PigweedClient(device, RPC_PROTOS) as pw_client:
status, payload = pw_client.rpcs.chip.rpc.Device.Reboot()
assert status == Status.UNIMPLEMENTED


def test_device_ota_rpc(device):
pw_client = PigweedClient(device, RPC_PROTOS)
status, payload = pw_client.rpcs.chip.rpc.Device.TriggerOta()
assert status == Status.UNIMPLEMENTED
with PigweedClient(device, RPC_PROTOS) as pw_client:
status, payload = pw_client.rpcs.chip.rpc.Device.TriggerOta()
assert status == Status.UNIMPLEMENTED


def test_ligth_ctrl_rpc(device):
pw_client = PigweedClient(device, RPC_PROTOS)
with PigweedClient(device, RPC_PROTOS) as pw_client:

# Check light on
status, payload = pw_client.rpcs.chip.rpc.Lighting.Set(on=True)
assert status.ok() is True
status, payload = pw_client.rpcs.chip.rpc.Lighting.Get()
assert status.ok() is True
assert payload.on is True
# Check light on
status, payload = pw_client.rpcs.chip.rpc.Lighting.Set(on=True)
assert status.ok() is True
status, payload = pw_client.rpcs.chip.rpc.Lighting.Get()
assert status.ok() is True
assert payload.on is True

# Check light off
status, payload = pw_client.rpcs.chip.rpc.Lighting.Set(on=False)
assert status.ok() is True
status, payload = pw_client.rpcs.chip.rpc.Lighting.Get()
assert status.ok() is True
assert payload.on is False
# Check light off
status, payload = pw_client.rpcs.chip.rpc.Lighting.Set(on=False)
assert status.ok() is True
status, payload = pw_client.rpcs.chip.rpc.Lighting.Get()
assert status.ok() is True
assert payload.on is False


def test_button_ctrl_rpc(device):
pw_client = PigweedClient(device, RPC_PROTOS)

# Check button 0 (lighting)
status, payload = pw_client.rpcs.chip.rpc.Lighting.Get()
assert status.ok() is True
initial_state = bool(payload.on)

compare_state = not initial_state
status, payload = pw_client.rpcs.chip.rpc.Button.Event(idx=0, pushed=True)
assert status.ok() is True
sleep(2)
status, payload = pw_client.rpcs.chip.rpc.Lighting.Get()
assert status.ok() is True
assert payload.on == compare_state

compare_state = initial_state
status, payload = pw_client.rpcs.chip.rpc.Button.Event(idx=0, pushed=True)
assert status.ok() is True
sleep(2)
status, payload = pw_client.rpcs.chip.rpc.Lighting.Get()
assert status.ok() is True
assert payload.on == compare_state
with PigweedClient(device, RPC_PROTOS) as pw_client:

# Check button 0 (lighting)
status, payload = pw_client.rpcs.chip.rpc.Lighting.Get()
assert status.ok() is True
initial_state = bool(payload.on)

compare_state = not initial_state
status, payload = pw_client.rpcs.chip.rpc.Button.Event(idx=0,
pushed=True)
assert status.ok() is True
sleep(2)
status, payload = pw_client.rpcs.chip.rpc.Lighting.Get()
assert status.ok() is True
assert payload.on == compare_state

compare_state = initial_state
status, payload = pw_client.rpcs.chip.rpc.Button.Event(idx=0,
pushed=True)
assert status.ok() is True
sleep(2)
status, payload = pw_client.rpcs.chip.rpc.Lighting.Get()
assert status.ok() is True
assert payload.on == compare_state
112 changes: 57 additions & 55 deletions src/test_driver/mbed/integration_tests/lock-app/test_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,77 +136,79 @@ def test_lock_ctrl(device, network):


def test_device_info_rpc(device):
pw_client = PigweedClient(device, RPC_PROTOS)
status, payload = pw_client.rpcs.chip.rpc.Device.GetDeviceInfo()
assert status.ok() is True
assert payload.vendor_id is not None and payload.product_id is not None and payload.serial_number is not None
with PigweedClient(device, RPC_PROTOS) as pw_client:
status, payload = pw_client.rpcs.chip.rpc.Device.GetDeviceInfo()
assert status.ok() is True
assert payload.vendor_id is not None and payload.product_id is not None and payload.serial_number is not None

device_details = get_device_details(device)
assert device_details is not None and len(device_details) != 0
device_details = get_device_details(device)
assert device_details is not None and len(device_details) != 0

assert int(device_details["VendorID"]) == payload.vendor_id
assert int(device_details["ProductID"]) == payload.product_id
assert int(device_details["Discriminator"]
) == payload.pairing_info.discriminator
assert int(device_details["SetUpPINCode"]) == payload.pairing_info.code
assert int(device_details["VendorID"]) == payload.vendor_id
assert int(device_details["ProductID"]) == payload.product_id
assert int(device_details["Discriminator"]
) == payload.pairing_info.discriminator
assert int(device_details["SetUpPINCode"]) == payload.pairing_info.code


def test_device_factory_reset_rpc(device):
pw_client = PigweedClient(device, RPC_PROTOS)
status, payload = pw_client.rpcs.chip.rpc.Device.FactoryReset()
assert status.ok() is True
with PigweedClient(device, RPC_PROTOS) as pw_client:
status, payload = pw_client.rpcs.chip.rpc.Device.FactoryReset()
assert status.ok() is True


def test_device_reboot_rpc(device):
pw_client = PigweedClient(device, RPC_PROTOS)
status, payload = pw_client.rpcs.chip.rpc.Device.Reboot()
assert status == Status.UNIMPLEMENTED
with PigweedClient(device, RPC_PROTOS) as pw_client:
status, payload = pw_client.rpcs.chip.rpc.Device.Reboot()
assert status == Status.UNIMPLEMENTED


def test_device_ota_rpc(device):
pw_client = PigweedClient(device, RPC_PROTOS)
status, payload = pw_client.rpcs.chip.rpc.Device.TriggerOta()
assert status == Status.UNIMPLEMENTED
with PigweedClient(device, RPC_PROTOS) as pw_client:
status, payload = pw_client.rpcs.chip.rpc.Device.TriggerOta()
assert status == Status.UNIMPLEMENTED


def test_lock_ctrl_rpc(device):
pw_client = PigweedClient(device, RPC_PROTOS)
with PigweedClient(device, RPC_PROTOS) as pw_client:

# Check locked
status, payload = pw_client.rpcs.chip.rpc.Locking.Set(locked=True)
assert status.ok() is True
status, payload = pw_client.rpcs.chip.rpc.Locking.Get()
assert status.ok() is True
assert payload.locked is True
# Check locked
status, payload = pw_client.rpcs.chip.rpc.Locking.Set(locked=True)
assert status.ok() is True
status, payload = pw_client.rpcs.chip.rpc.Locking.Get()
assert status.ok() is True
assert payload.locked is True

# Check unlocked
status, payload = pw_client.rpcs.chip.rpc.Locking.Set(locked=False)
assert status.ok() is True
status, payload = pw_client.rpcs.chip.rpc.Locking.Get()
assert status.ok() is True
assert payload.locked is False
# Check unlocked
status, payload = pw_client.rpcs.chip.rpc.Locking.Set(locked=False)
assert status.ok() is True
status, payload = pw_client.rpcs.chip.rpc.Locking.Get()
assert status.ok() is True
assert payload.locked is False


def test_button_ctrl_rpc(device):
pw_client = PigweedClient(device, RPC_PROTOS)

# Check button 0 (locking)
status, payload = pw_client.rpcs.chip.rpc.Locking.Get()
assert status.ok() is True
initial_state = bool(payload.locked)

compare_state = not initial_state
status, payload = pw_client.rpcs.chip.rpc.Button.Event(idx=0, pushed=True)
assert status.ok() is True
sleep(2)
status, payload = pw_client.rpcs.chip.rpc.Locking.Get()
assert status.ok() is True
assert payload.locked is compare_state

compare_state = initial_state
status, payload = pw_client.rpcs.chip.rpc.Button.Event(idx=0, pushed=True)
assert status.ok() is True
sleep(2)
status, payload = pw_client.rpcs.chip.rpc.Locking.Get()
assert status.ok() is True
assert payload.locked == compare_state
with PigweedClient(device, RPC_PROTOS) as pw_client:

# Check button 0 (locking)
status, payload = pw_client.rpcs.chip.rpc.Locking.Get()
assert status.ok() is True
initial_state = bool(payload.locked)

compare_state = not initial_state
status, payload = pw_client.rpcs.chip.rpc.Button.Event(idx=0,
pushed=True)
assert status.ok() is True
sleep(2)
status, payload = pw_client.rpcs.chip.rpc.Locking.Get()
assert status.ok() is True
assert payload.locked is compare_state

compare_state = initial_state
status, payload = pw_client.rpcs.chip.rpc.Button.Event(idx=0,
pushed=True)
assert status.ok() is True
sleep(2)
status, payload = pw_client.rpcs.chip.rpc.Locking.Get()
assert status.ok() is True
assert payload.locked == compare_state
10 changes: 5 additions & 5 deletions src/test_driver/mbed/integration_tests/pigweed-app/test_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ def test_smoke_test(device):


def test_echo(device):
pw_client = PigweedClient(device, RPC_PROTOS)
status, payload = pw_client.rpcs.pw.rpc.EchoService.Echo(
msg=PW_ECHO_TEST_MESSAGE)
assert status.ok() is True
assert payload.msg == PW_ECHO_TEST_MESSAGE
with PigweedClient(device, RPC_PROTOS) as pw_client:
status, payload = pw_client.rpcs.pw.rpc.EchoService.Echo(
msg=PW_ECHO_TEST_MESSAGE)
assert status.ok() is True
assert payload.msg == PW_ECHO_TEST_MESSAGE
Loading