From 221419729db83bc236ff2490a42a19a2472329e8 Mon Sep 17 00:00:00 2001 From: Artur Tynecki <77382963+ATmobica@users.noreply.github.com> Date: Tue, 30 Nov 2021 04:28:01 +0100 Subject: [PATCH] [Mbed] Add integration tests for example applications (#12039) * Move mbed unit test to test_driver/mbed/unit_test directory Add integration test dir and files Cahnge unit test path in launch.json, task.json and mbed_unit_tests.sh * Add pytest and mbed-ls to pigweed env * Fix realpath in unit_test Cmake file * Improve serial data parsing Update lock and lightings apps smoke tests * Add network fixture Add utlis file with common ble and wifi functions Add wifi provisioning test * Remove device_controller fixture is singleton al all Update wifi provisioning tests * Move unit test README file to right directory Add Pigweed-app integration test and pigweed client * Add shell app tests Improve unit tests check * Add RPC tests to lock and lighting apps * Add wlan access point class - run Wi-Fi AP Add access point as fixture Use AP in shell wifi connect test * Fix wlan AP * Improve BoardAllocator - checking connected device Improve unit-test checking * Add mount new mbed device script * Add close ble connection * Add wait for wifi connection log and close ble to wifi provisioning tests * Remove serial device warnings * Improve lock-app test and refactor to get more common functions * Improve lighting-app tests Cleanup lock-app and shell tests * Lock, ligthing and shell apps tests improvements Change log sfor ZCL command send * Changes restyle Remove redundant changes Remove acces point class - not ready to merge Code cleanup * Add integration tests README file Improve relative paths in unit test README file * Improve spelling * Add new .wordlist exceptions * Restore 40s timeout for session establishment in device controller * Add resolve step in wifi_provisioning tests * Improve lock and light control tests add wifi provisioning and resolve node steps * Changes restyle --- .github/.wordlist.txt | 5 + .vscode/launch.json | 8 +- .vscode/tasks.json | 2 +- scripts/constraints.txt | 86 ++++- scripts/requirements.mbed.txt | 3 +- scripts/tests/mbed/mbed_unit_tests.sh | 2 +- scripts/tests/mbed/mount_new_mbed_device.sh | 36 +++ .../mbed/integration_tests/README.md | 234 ++++++++++++++ .../mbed/integration_tests/common/__init__.py | 0 .../mbed/integration_tests/common/device.py | 118 +++++++ .../mbed/integration_tests/common/fixtures.py | 137 ++++++++ .../common/pigweed_client.py | 45 +++ .../common/serial_connection.py | 128 ++++++++ .../integration_tests/common/serial_device.py | 119 +++++++ .../mbed/integration_tests/common/utils.py | 295 ++++++++++++++++++ .../mbed/integration_tests/conftest.py | 34 ++ .../lighting-app/__init__.py | 0 .../lighting-app/test_app.py | 223 +++++++++++++ .../integration_tests/lock-app/__init__.py | 0 .../integration_tests/lock-app/test_app.py | 214 +++++++++++++ .../integration_tests/pigweed-app/__init__.py | 0 .../integration_tests/pigweed-app/test_app.py | 39 +++ .../mbed/integration_tests/pytest.ini | 7 + .../mbed/integration_tests/shell/__init__.py | 0 .../mbed/integration_tests/shell/test_app.py | 239 ++++++++++++++ .../mbed/integration_tests/test_set.in | 5 + .../integration_tests/unit-tests/test_app.py | 33 ++ .../mbed/{ => unit_tests}/.gitignore | 0 .../mbed/{ => unit_tests}/CMakeLists.txt | 2 +- .../mbed/{ => unit_tests}/README.md | 6 +- .../mbed/{ => unit_tests}/config.in | 0 .../main/include/CHIPProjectConfig.h | 0 .../mbed/{ => unit_tests}/main/main.cpp | 0 .../mbed/{ => unit_tests}/mbed_app.json | 0 34 files changed, 1994 insertions(+), 26 deletions(-) create mode 100644 scripts/tests/mbed/mount_new_mbed_device.sh create mode 100644 src/test_driver/mbed/integration_tests/README.md create mode 100644 src/test_driver/mbed/integration_tests/common/__init__.py create mode 100644 src/test_driver/mbed/integration_tests/common/device.py create mode 100644 src/test_driver/mbed/integration_tests/common/fixtures.py create mode 100644 src/test_driver/mbed/integration_tests/common/pigweed_client.py create mode 100644 src/test_driver/mbed/integration_tests/common/serial_connection.py create mode 100644 src/test_driver/mbed/integration_tests/common/serial_device.py create mode 100644 src/test_driver/mbed/integration_tests/common/utils.py create mode 100644 src/test_driver/mbed/integration_tests/conftest.py create mode 100644 src/test_driver/mbed/integration_tests/lighting-app/__init__.py create mode 100644 src/test_driver/mbed/integration_tests/lighting-app/test_app.py create mode 100644 src/test_driver/mbed/integration_tests/lock-app/__init__.py create mode 100644 src/test_driver/mbed/integration_tests/lock-app/test_app.py create mode 100644 src/test_driver/mbed/integration_tests/pigweed-app/__init__.py create mode 100644 src/test_driver/mbed/integration_tests/pigweed-app/test_app.py create mode 100644 src/test_driver/mbed/integration_tests/pytest.ini create mode 100644 src/test_driver/mbed/integration_tests/shell/__init__.py create mode 100644 src/test_driver/mbed/integration_tests/shell/test_app.py create mode 100644 src/test_driver/mbed/integration_tests/test_set.in create mode 100644 src/test_driver/mbed/integration_tests/unit-tests/test_app.py rename src/test_driver/mbed/{ => unit_tests}/.gitignore (100%) rename src/test_driver/mbed/{ => unit_tests}/CMakeLists.txt (98%) rename src/test_driver/mbed/{ => unit_tests}/README.md (97%) rename src/test_driver/mbed/{ => unit_tests}/config.in (100%) rename src/test_driver/mbed/{ => unit_tests}/main/include/CHIPProjectConfig.h (100%) rename src/test_driver/mbed/{ => unit_tests}/main/main.cpp (100%) rename src/test_driver/mbed/{ => unit_tests}/mbed_app.json (100%) diff --git a/.github/.wordlist.txt b/.github/.wordlist.txt index 9e5d03248c6df2..11ff1b84cd8831 100644 --- a/.github/.wordlist.txt +++ b/.github/.wordlist.txt @@ -51,6 +51,7 @@ armeabi ARMmbed armv asdk +AssertionError ASYNC att attId @@ -387,6 +388,7 @@ FOTA FreeRTOS FreeRTOSConfig fsl +fstab fsync fullclean gcloud @@ -768,6 +770,7 @@ PyFunction pylint PyObject PyRun +pytest QEMU Qorvo QPG @@ -781,6 +784,7 @@ qvCHIP RADVD raspberryPi RasPi +rAv RCP ReadConfigValue readelf @@ -866,6 +870,7 @@ SLAAC SLTB SLWSTK SmartThings +smoketest SMP socat socio diff --git a/.vscode/launch.json b/.vscode/launch.json index 147053871024d8..f89d0eaf3c462b 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -192,7 +192,7 @@ "name": "Debug Mbed unit tests", "type": "cortex-debug", "request": "launch", - "cwd": "${workspaceRoot}/src/test_driver/mbed", + "cwd": "${workspaceRoot}/src/test_driver/mbed/unit_tests", "executable": "./build-${input:mbedTarget}/${input:mbedDebugProfile}/chip-mbed-unit-tests.elf", "armToolchainPath": "${env:PW_ENVIRONMENT_ROOT}/cipd/pigweed/bin/", // Pigweed environment bootstraping required "servertype": "openocd", @@ -225,7 +225,7 @@ "name": "Debug Mbed unit tests [remote]", "type": "cortex-debug", "request": "launch", - "cwd": "${workspaceRoot}/src/test_driver/mbed", + "cwd": "${workspaceRoot}/src/test_driver/mbed/unit_tests", "executable": "./build-${input:mbedTarget}/${input:mbedDebugProfile}/chip-mbed-unit-tests.elf", "armToolchainPath": "${env:PW_ENVIRONMENT_ROOT}/cipd/pigweed/bin/", // Pigweed environment bootstraping required "servertype": "external", @@ -252,7 +252,7 @@ "name": "Flash Mbed unit tests", "type": "cortex-debug", "request": "launch", - "cwd": "${workspaceRoot}/src/test_driver/mbed", + "cwd": "${workspaceRoot}/src/test_driver/mbed//unit_tests", "executable": "./build-${input:mbedTarget}/${input:mbedFlashProfile}/chip-mbed-unit-tests.elf", "armToolchainPath": "${env:PW_ENVIRONMENT_ROOT}/cipd/pigweed/bin/", // Pigweed environment bootstraping required "servertype": "openocd", @@ -277,7 +277,7 @@ "name": "Flash Mbed unit tests [remote]", "type": "cortex-debug", "request": "launch", - "cwd": "${workspaceRoot}/src/test_driver/mbed", + "cwd": "${workspaceRoot}/src/test_driver/mbed/unit_tests", "executable": "./build-${input:mbedTarget}/${input:mbedFlashProfile}/chip-mbed-unit-tests.elf", "armToolchainPath": "${env:PW_ENVIRONMENT_ROOT}/cipd/pigweed/bin/", // Pigweed environment bootstraping required "servertype": "external", diff --git a/.vscode/tasks.json b/.vscode/tasks.json index ce4fecb97314ee..fda87757b21a9c 100644 --- a/.vscode/tasks.json +++ b/.vscode/tasks.json @@ -167,7 +167,7 @@ "base": "$gcc", "fileLocation": [ "relative", - "${workspaceFolder}/src/test_driver/mbed/build" + "${workspaceFolder}/src/test_driver/mbed/unit_tests/build" ] } } diff --git a/scripts/constraints.txt b/scripts/constraints.txt index ed8c52b36fa478..7536963ec9345a 100644 --- a/scripts/constraints.txt +++ b/scripts/constraints.txt @@ -9,13 +9,16 @@ anytree==2.8.0 appdirs==1.4.4 # via # -r requirements.txt + # mbed-os-tools # virtualenv appnope==0.1.2 # via -r requirements.txt +attrs==21.2.0 + # via pytest backcall==0.2.0 # via ipython -bidict==0.21.2 - # via python-socketio +beautifulsoup4==4.10.0 + # via mbed-os-tools bitstring==3.1.7 # via -r requirements.esp32.txt brotli==1.0.9 @@ -29,13 +32,18 @@ chardet==4.0.0 click==7.1.2 # via # -r requirements.esp32.txt + # -r requirements.txt # flask # mbed-tools # pip-tools colorama==0.4.4 - # via west + # via + # mbed-os-tools + # west coloredlogs==15.0.1 # via -r requirements.txt +construct==2.10.54 + # via -r requirements.esp32.txt cryptography==3.4.7 # via -r requirements.esp32.txt cxxfilt==0.2.2 @@ -50,6 +58,10 @@ docopt==0.6.2 # via pykwalify ecdsa==0.17.0 # via -r requirements.esp32.txt +fastcore==1.3.26 + # via ghapi +fasteners==0.16.3 + # via mbed-os-tools filelock==3.0.12 # via virtualenv flask-compress==1.10.0 @@ -64,11 +76,14 @@ flask==0.12.5 future==0.18.2 # via # -r requirements.esp32.txt + # mbed-os-tools # mobly -gdbgui==0.13.2.0 ; platform_machine != "aarch64" +gdbgui==0.13.2.0 # via -r requirements.esp32.txt gevent==1.5.0 # via gdbgui +ghapi==0.1.19 + # via -r requirements.txt gitdb==4.0.7 # via gitpython gitpython==3.1.14 @@ -79,8 +94,12 @@ humanfriendly==9.2 # via coloredlogs idna==2.10 # via requests +iniconfig==1.1.1 + # via pytest intelhex==2.3.0 - # via -r requirements.txt + # via + # -r requirements.txt + # mbed-os-tools ipython-genutils==0.2.0 # via traitlets ipython==7.24.1 @@ -93,8 +112,14 @@ jinja2==3.0.1 # via # flask # mbed-tools +junit-xml==1.9 + # via mbed-os-tools +kconfiglib==13.7.1 + # via -r requirements.esp32.txt lockfile==0.12.2 - # via -r requirements.txt + # via + # -r requirements.txt + # mbed-os-tools mako==1.1.4 # via pdoc3 markdown==3.3.4 @@ -105,14 +130,22 @@ markupsafe==2.0.1 # mako matplotlib-inline==0.1.2 # via ipython -mbed-tools==7.22.0 +mbed-ls==1.8.11 ; platform_machine != "aarch64" and sys_platform == "linux" + # via -r requirements.mbed.txt +mbed-os-tools==1.8.11 + # via mbed-ls +mbed-tools==7.22.0 ; platform_machine != "aarch64" and sys_platform == "linux" # via -r requirements.mbed.txt mobly==1.10.1 # via -r requirements.txt numpy==1.20.3 # via pandas packaging==20.9 - # via west + # via + # fastcore + # ghapi + # pytest + # west pandas==1.2.4 ; platform_machine != "aarch64" and platform_machine != "arm64" # via -r requirements.txt parso==0.8.2 @@ -129,12 +162,16 @@ pickleshare==0.7.5 # via ipython pip-tools==6.1.0 # via -r requirements.txt +pluggy==1.0.0 + # via pytest portpicker==1.4.0 # via # -r requirements.txt # mobly -prettytable==0.7.2 - # via -r requirements.mbed.txt +prettytable==2.2.1 + # via + # mbed-ls + # mbed-os-tools prompt-toolkit==3.0.20 # via ipython protobuf==3.17.3 @@ -146,6 +183,8 @@ psutil==5.8.0 # mobly ptyprocess==0.7.0 # via pexpect +py==1.10.0 + # via pytest pycparser==2.20 # via cffi pyelftools==0.27 @@ -167,18 +206,23 @@ pyparsing==2.3.1 pyserial==3.5 # via # -r requirements.esp32.txt + # mbed-os-tools # mbed-tools # mobly +pytest==6.2.5 ; platform_machine != "aarch64" and sys_platform == "linux" + # via -r requirements.mbed.txt python-dateutil==2.8.1 # via # pandas # pykwalify python-dotenv==0.17.1 # via mbed-tools -python-engineio==4.2.0 +python-engineio==3.14.2 # via python-socketio -python-socketio<5 - # via flask-socketio +python-socketio==4.6.1 + # via + # -r requirements.esp32.txt + # flask-socketio pytz==2021.1 # via pandas pyudev==0.22.0 @@ -192,6 +236,7 @@ reedsolo==1.5.4 requests==2.25.1 # via # -r requirements.txt + # mbed-os-tools # mbed-tools ruamel.yaml.clib==0.2.2 # via ruamel.yaml @@ -201,18 +246,27 @@ six==1.16.0 # via # anytree # ecdsa + # fasteners + # junit-xml + # mbed-os-tools # protobuf # python-dateutil + # python-engineio + # python-socketio # pyudev # virtualenv smmap==4.0.0 # via gitdb +soupsieve==2.2.1 + # via beautifulsoup4 tabulate==0.8.9 # via mbed-tools timeout-decorator==0.5.0 # via mobly toml==0.10.2 - # via pep517 + # via + # pep517 + # pytest tqdm==4.61.1 # via mbed-tools traitlets==5.0.5 @@ -228,7 +282,9 @@ virtualenv==20.4.7 watchdog==2.1.2 # via -r requirements.txt wcwidth==0.2.5 - # via prompt-toolkit + # via + # prettytable + # prompt-toolkit werkzeug==0.16.1 # via flask west==0.11.0 diff --git a/scripts/requirements.mbed.txt b/scripts/requirements.mbed.txt index 0197a607c7d4b1..47907c083849eb 100644 --- a/scripts/requirements.mbed.txt +++ b/scripts/requirements.mbed.txt @@ -1,2 +1,3 @@ mbed-tools>=7.0.0 ; platform_machine != 'aarch64' and sys_platform == 'linux' -prettytable==0.7.2 ; platform_machine != 'aarch64' and sys_platform == 'linux' +pytest==6.2.5 ; platform_machine != 'aarch64' and sys_platform == 'linux' +mbed-ls==1.8.11 ; platform_machine != 'aarch64' and sys_platform == 'linux' diff --git a/scripts/tests/mbed/mbed_unit_tests.sh b/scripts/tests/mbed/mbed_unit_tests.sh index 8829f3c656c4f5..8e79635f6ed2ae 100755 --- a/scripts/tests/mbed/mbed_unit_tests.sh +++ b/scripts/tests/mbed/mbed_unit_tests.sh @@ -18,7 +18,7 @@ cd "$(dirname "$0")"/../../.. CHIP_ROOT=$PWD -cd "$CHIP_ROOT"/src/test_driver/mbed/ +cd "$CHIP_ROOT"/src/test_driver/mbed/unit_tests/ SUPPORTED_TOOLCHAIN=(GCC_ARM ARM) SUPPORTED_TARGET_BOARD=(CY8CPROTO_062_4343W) diff --git a/scripts/tests/mbed/mount_new_mbed_device.sh b/scripts/tests/mbed/mount_new_mbed_device.sh new file mode 100644 index 00000000000000..885102d1802a14 --- /dev/null +++ b/scripts/tests/mbed/mount_new_mbed_device.sh @@ -0,0 +1,36 @@ +#!/usr/bin/env bash +DIR=/dev/disk/by-id +DEVICE_CONFIG_FILE=/etc/fstab + +echo "Add new device start..." +echo "Connect your device via USB" + +res=$(inotifywait "$DIR") +if [[ ${res} != *"CREATE"* ]]; then + echo "Create new device in /dev/disk/by-id directory error" + exit 1 +fi + +new_device_id=$(echo "$res" | awk '{printf $NF}') + +if sudo cat "$DEVICE_CONFIG_FILE" | grep -q "$new_device_id"; then + echo "$new_device_id device is already set" + exit 0 +fi + +device_index=1 + +while :; do + dir_name="MBED_$device_index" + if [ ! -d /media/"$dir_name" ]; then + break + fi + + ((device_index++)) +done + +sudo mkdir -p /media/MBED_"$device_index" +line="/dev/disk/by-id/$new_device_id /media/MBED_$device_index vfat rw,user,sync,nofail,umask=007,gid=20 0 0" +echo "$line" | sudo tee -a "$DEVICE_CONFIG_FILE" + +echo "Finished" diff --git a/src/test_driver/mbed/integration_tests/README.md b/src/test_driver/mbed/integration_tests/README.md new file mode 100644 index 00000000000000..9166fbf5f27435 --- /dev/null +++ b/src/test_driver/mbed/integration_tests/README.md @@ -0,0 +1,234 @@ +

+ ARM Mbed-OS logo +

+ +

Matter Arm Mbed OS Integration Tests

+ +This page describes the Matter project's functional testing approach for Mbed +applications. It allows you to better understand the use of tools and frameworks +for target validation. + +
+ +- [Overview](#overview) +- [Setup](#setup) + - [Environment setup](#environment-setup) + - [Tools installation](#tools-installation) + - [Setup WiFi access point](#setup-wifi-access-point) + - [Mount the device](#mount-the-device) +- [Prepare device under test](#prepare-device-under-test) +- [Tests configuration](#tests-configuration) +- [Test run](#test-run) +- [Tests results](#tests-results) + +
+ +# Overview + +According to the Matter project design idea: + +_"The ability to run tests on actual and emulated hardware is paramount in +embedded projects. CHIP is no exception. We want on-device testing to be a first +class goal of CHIP architecture. On-device testing requirements apply both to +Continuous Integration testing for main CHIP software stack development and to +eventual CHIP product certification."_ + +However, functional testing requires a host machine to run and control the +testing program. Connection and data exchange between the host machine and the +device under test is necessary. Various communication interfaces and protocols +are used for this purpose (i.e serial port, network communication via Ethernet, +or WiFi). + +The common functional test scenario involves the following steps: + +- Get binaries image of testing application (build directly or download CI + artifacts) +- Flash binary image to the device under test +- Configuration functional tests +- Run testing +- Collecting and processing the results + +Matter Arm Mbed OS Integration Tests happens at system level on real hardware. +The host machine running the tests should be connected to the board under test +via serial port. The tests send a succession of commands to the connected boards +or trigger external tools action that changes the device state and asserts the +results reported by the hardware. + +For example, here is a test validating that WiFi connection works: + +**Preparation:** The device is connected to the host via serial port and +executes the shell example application. + +**The test:** + +1. Checking if the device is connected +1. Sending the command connecting with network credentials +1. Check device response +1. Check the device IP address + +The Mbed integration tests are coded in Python and use the pyTest framework. +This also allows for easy integration with external CHIP tools such as +device-controller. + +Python CHIP Controller and RPC console are required for the correct run +integration tests. + +# Setup + +## Environment setup + +The first step check out the Matter repository and sync submodules using the +following command: + + $ git submodule update --init + +Building the application requires the use of **ARM Mbed-OS** sources and the +**arm-none-gnu-eabi** toolchain. The OpenOCD package is used for flashing +purpose.
Some additional packages may be needed, depending on selected +build target and its requirements. + +> **The VSCode devcontainer has these components pre-installed. Using the VSCode +> devcontainer is the recommended way to interact with Arm Mbed-OS port of the +> Matter Project.** +> +> **Please read this [README.md](../../../../docs/VSCODE_DEVELOPMENT.md) for +> more information about using VSCode in container.** + +To initialize the development environment, download all registered sub-modules +and activate the environment: + +``` +$ source ./scripts/bootstrap.sh +$ source ./scripts/activate.sh +``` + +If packages are already installed then you just need to activate the development +environment: + +``` +$ source ./scripts/activate.sh +``` + +## Tools installation + +Python CHIP Controller and RPC console are required for the correct run +integration tests. Build and install them inside the development environment. + +For building and installing Python CHIP Controller please visit +[Python CHIP Controller](../../../../docs/guides/python_chip_controller_building.md) + +For building and installing RPC console please visit +[CHIP RPC CONSOLE](../../../../examples/common/pigweed/rpc_console/README.md) + +## Setup WiFi access point + +The way of running the WiFi access point depends on the platform on which the +tests are run. Choose the best one. Just remember the network credentials ssid +and password. + +## Mount the device + +There is a special script for the easy Mbed board mounting. It adds a static +entry in fstab with a new connected device. + +Then you can just need to call `sudo mount -a` to mount the device. + +To add the new Mbed device you should: + +1. Run script `bash src/scripts/tests/mbed/mount_new_mbed_device.sh` +2. Plug the device +3. Run `sudo mount -a` + +That's all. The new device is ready for testing. + +# Prepare device under test + +Preparation of the device for testing should consist of two steps: + +- prepare binary image - building an right application from sources +- flash binary image to device + +For more information how to build and flash example application please visit +their documentation. + +# Tests configuration + +Mbed integration tests can be configured by PyTest command line arguments. Every +test call may contain a specific test configuration. The list of supported +parameters: + +**[Common]** + +- _platforms_ - list of platforms that can be used to run the tests. Platforms + are separated by a comma +- _binaries_ - platform and associated binary in the form platform:binary. + Multiple values are separated by a comma +- _serial_inter_byte_delay_ - time in second between two bytes sent on the + serial line (accepts floats), default=None +- _serial_baudrate_ - baudrate of the serial port used, default=115200 + +**[Test specific]** + +- _network_ - WiFi network credentials to which we want to connect device. + Format network_ssid:network_password + +# Test run + +To run Mbed integration tests execute the pytest command with the arguments +mentioned above. For example: + +`pytest --platforms=CY8CPROTO_062_4343W --network=$AP_SSID:$AP_PASSWORD ... src/test_driver/mbed/integration_tests/{APP_NAME}/test_app.py` + +The Mbed integration testes cases are divided into separate directory depends on +the testing Matter application: + +- shell - testing shell commands and check base device functionalities +- lock-app - testing WiFi provisioning and execute ZCL command to control + lock, use RPC client to run base device functionalities, control lock and + trigger some button actions +- lighting-app - testing WiFi provisioning and execute ZCL command to control + light, use RPC client to run base device functionalities, control light and + trigger some button actions +- pigweed-app - use RPC client to send echo message and receive the response +- unit-tests - check unit-tests result + +For more details on how to run tests using PyTest see: +[PyTest doc](https://docs.pytest.org/en/6.2.x/usage.html) + +Pytest markers have been added to run a specific set of tests: + +- smoketest - check base communication and correct launch of the application + +# Tests results + +Adding `-rAv` arguments to Pytest cause that short tests summary is on the +output. + +For example: + +``` +pytest -rAv simple_test.py +``` + +Output: + +``` +=================================================================================================================== short test summary info ==================================================================================================================== +PASSED CHIP/src/test_driver/mbed/integration_tests/shell/test_app.py::test_smoke_test +PASSED CHIP/src/test_driver/mbed/integration_tests/shell/test_app.py::test_help_check +PASSED CHIP/src/test_driver/mbed/integration_tests/shell/test_app.py::test_log_check +PASSED CHIP/src/test_driver/mbed/integration_tests/shell/test_app.py::test_rand_check +PASSED CHIP/src/test_driver/mbed/integration_tests/shell/test_app.py::test_base64_encode_decode +FAILED CHIP/src/test_driver/mbed/integration_tests/shell/test_app.py::test_wifi_mode - AssertionError: assert 'true' == 'false' +===================================================================================================== 1 failed, 4 passed, 7 warnings in 20.71s (0:03:29) ===================================================================================================== +``` + +There is also an option to save test results to HTML file. Adding +`--html=` arguments to Pytest cause that all test results are +saved to HTML file and can be open in your browser. + +For example: + +``` +pytest --html=result.html simple_test.py +``` diff --git a/src/test_driver/mbed/integration_tests/common/__init__.py b/src/test_driver/mbed/integration_tests/common/__init__.py new file mode 100644 index 00000000000000..e69de29bb2d1d6 diff --git a/src/test_driver/mbed/integration_tests/common/device.py b/src/test_driver/mbed/integration_tests/common/device.py new file mode 100644 index 00000000000000..61879f2ca77a42 --- /dev/null +++ b/src/test_driver/mbed/integration_tests/common/device.py @@ -0,0 +1,118 @@ +# Copyright (c) 2009-2021 Arm Limited +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import queue +from time import sleep, time +from typing import Optional + +log = logging.getLogger(__name__) + + +class Device: + + def __init__(self, name: Optional[str] = None): + """ + Base Device runner class containing device handling functions and logging + :param name: Logging name for the client + """ + self.iq = queue.Queue() + self.oq = queue.Queue() + if name is None: + self.name = str(hex(id(self))) + else: + self.name = name + + def send(self, command, expected_output=None, wait_before_read=None, wait_for_response=10, assert_output=True): + """ + Send command for client + :param command: Command + :param expected_output: Reply to wait from the client + :param wait_before_read: Timeout after write + :param wait_for_response: Timeout waiting the response + :param assert_output: Assert the fail situations to end the test run + :return: If there's expected output then the response line is returned + """ + log.debug('{}: Sending command to client: "{}"'.format( + self.name, command)) + self.flush(0) + self._write(command) + if expected_output is not None: + if wait_before_read is not None: + sleep(wait_before_read) + return self.wait_for_output(expected_output, wait_for_response, assert_output) + + def flush(self, timeout: float = 0) -> [str]: + """ + Flush the lines in the input queue + :param timeout: The timeout before flushing starts + :type timeout: float + :return: The lines removed from the input queue + :rtype: list of str + """ + sleep(timeout) + lines = [] + while True: + try: + lines.append(self._read_line(0)) + except queue.Empty: + return lines + + def wait_for_output(self, search: str, timeout: float = 10, assert_timeout: bool = True) -> [str]: + """ + Wait for expected output response + :param search: Expected response string + :type search: str + :param timeout: Response waiting time + :type timeout: float + :param assert_timeout: Assert on timeout situations + :type assert_timeout: bool + :return: Line received before a match + :rtype: list of str + """ + lines = [] + start = time() + now = 0 + timeout_error_msg = '{}: Didn\'t find {} in {} s'.format( + self.name, search, timeout) + + while time() - start <= timeout: + try: + line = self._read_line(1) + if line: + lines.append(line) + if search in line: + end = time() + return lines + + except queue.Empty: + last = now + now = time() + if now - start >= timeout: + if assert_timeout: + log.error(timeout_error_msg) + assert False, timeout_error_msg + else: + log.warning(timeout_error_msg) + return [] + if now - last > 1: + log.info('{}: Waiting for "{}" string... Timeout in {:.0f} s'.format(self.name, search, + abs(now - start - timeout))) + + def _write(self, data): + self.oq.put(data) + + def _read_line(self, timeout): + return self.iq.get(timeout=timeout) diff --git a/src/test_driver/mbed/integration_tests/common/fixtures.py b/src/test_driver/mbed/integration_tests/common/fixtures.py new file mode 100644 index 00000000000000..32f943c98a5a6f --- /dev/null +++ b/src/test_driver/mbed/integration_tests/common/fixtures.py @@ -0,0 +1,137 @@ +# Copyright (c) 2009-2021 Arm Limited +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import List, Optional, Any, Mapping + +import mbed_lstools +import pytest +from time import sleep + +from .device import Device + +from .serial_connection import SerialConnection +from .serial_device import SerialDevice + +import logging +log = logging.getLogger(__name__) + + +@pytest.fixture(scope="session") +def platforms(request): + if request.config.getoption('platforms'): + return request.config.getoption('platforms').split(',') + else: + return [ + 'CY8CPROTO_062_4343W' + ] + + +@pytest.fixture(scope="session") +def serial_inter_byte_delay(request): + if request.config.getoption('serial_inter_byte_delay'): + return float(request.config.getoption('serial_inter_byte_delay')) + return None + + +@pytest.fixture(scope="session") +def serial_baudrate(request): + if request.config.getoption('serial_baudrate'): + return int(request.config.getoption('serial_baudrate')) + return 115200 + + +@pytest.fixture(scope="session") +def network(request): + if request.config.getoption('network'): + credentials = request.config.getoption('network') + params = credentials.split(':') + return (params[0], params[1]) + return None + + +class BoardAllocation: + def __init__(self, description: Mapping[str, Any]): + self.description = description + self.device = None + + +class BoardAllocator: + def __init__(self, platforms_supported: List[str], serial_inter_byte_delay: float, baudrate: int): + mbed_ls = mbed_lstools.create(list_unmounted=True) + boards = mbed_ls.list_mbeds( + filter_function=lambda m: m['platform_name'] in platforms_supported) + self.board_description = boards + self.allocation = [] + self.serial_inter_byte_delay = serial_inter_byte_delay + self.baudrate = baudrate + for desc in boards: + self.allocation.append(BoardAllocation(desc)) + + def allocate(self, name: str = None): + for alloc in self.allocation: + if alloc.device is None: + platform = alloc.description['platform_name'] + log.info('Start {} board allocation'.format(platform)) + + # Create the serial connection + connection = SerialConnection( + port=alloc.description["serial_port"], + baudrate=self.baudrate, + inter_byte_delay=self.serial_inter_byte_delay + ) + connection.open() + + # Create the serial device + alloc.device = SerialDevice(connection, name) + alloc.device.start() + alloc.device.reset(duration=1) + alloc.device.flush(1) + + sleep(2) + + log.info('Allocate {} board as serial device'.format(platform)) + + return alloc.device + return None + + def release(self, device: Device) -> None: + for alloc in self.allocation: + if alloc.device == device and alloc.device is not None: + + # Stop activities + alloc.device.stop() + alloc.device.serial.close() + + # Cleanup + alloc.device = None + + log.info('Release {} board'.format( + alloc.description['platform_name'])) + + +@pytest.fixture(scope="session") +def board_allocator( + platforms: List[str], + serial_inter_byte_delay: float, + serial_baudrate: int, +): + yield BoardAllocator(platforms, serial_inter_byte_delay, serial_baudrate) + + +@pytest.fixture(scope="function") +def device(board_allocator): + device = board_allocator.allocate(name='DUT') + yield device + board_allocator.release(device) diff --git a/src/test_driver/mbed/integration_tests/common/pigweed_client.py b/src/test_driver/mbed/integration_tests/common/pigweed_client.py new file mode 100644 index 00000000000000..eb5c4b992e6209 --- /dev/null +++ b/src/test_driver/mbed/integration_tests/common/pigweed_client.py @@ -0,0 +1,45 @@ +# Copyright (c) 2009-2021 Arm Limited +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +from pathlib import Path +import importlib + +from pw_hdlc.rpc import HdlcRpcClient, default_channels + + +class PigweedClient: + def __init__(self, device, protos): + """ + Pigweed Client class containing RPC client initialization and service functions + Create HdlcRpcCLient object and redirect serial communication to it + :param device: test device instance + :param protos: array of RPC protocols + """ + self.device = device + self.device.stop() + self.last_timeout = self.device.serial.get_timeout() + self.device.serial.set_timeout(0.01) + self._pw_rpc_client = HdlcRpcClient(lambda: self.device.serial.read(4096), + protos, default_channels(self.device.serial.write)) + self._rpcs = self._pw_rpc_client.rpcs() + + def __del__(self): + self.device.serial.set_timeout(self.last_timeout) + self.device.start() + + @property + def rpcs(self): + return self._rpcs diff --git a/src/test_driver/mbed/integration_tests/common/serial_connection.py b/src/test_driver/mbed/integration_tests/common/serial_connection.py new file mode 100644 index 00000000000000..e3a621baaad37e --- /dev/null +++ b/src/test_driver/mbed/integration_tests/common/serial_connection.py @@ -0,0 +1,128 @@ +# Copyright (c) 2009-2021 Arm Limited +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from time import sleep + +from serial import Serial, SerialException + +log = logging.getLogger(__name__) + + +class SerialConnection: + """ + Serial Connection class containing serial connection handling functions + :param port: serial port name + :param baudrate: serial baudrate + :param timeout: serial read timeout in seconds (float) + :param inter_byte_delay: time in second between two bytes sent on the serial line (float) + """ + + def __init__(self, port=None, baudrate=9600, timeout=1, inter_byte_delay=None): + self.ser = Serial(port, baudrate, timeout=timeout) + self.inter_byte_delay = inter_byte_delay + + def open(self): + """ + Open serial port connection + """ + if not self.ser.is_open: + self.ser.open() + + def read(self, size=1): + """ + Read bytes from serial port + :return: Bytes from serial stream + """ + if not self.ser.is_open: + return None + try: + output = self.ser.read(size) + return output + except SerialException as se: + log.error('Serial connection read error: {}'.format(se)) + return None + + def readline(self): + """ + Read line from serial port + :return: One line from serial stream + """ + if not self.ser.is_open: + return None + try: + output = self.ser.readline() + return output + except SerialException as se: + log.error('Serial connection read line error: {}'.format(se)) + return None + + def write(self, data): + """ + Write data to serial port + :param data: Data to send + """ + if not self.ser.is_open: + return + try: + if self.inter_byte_delay: + for byte in data: + self.ser.write(bytes([byte])) + sleep(self.inter_byte_delay) + else: + self.ser.write(data) + except SerialException as se: + log.error('Serial connection write error: {}'.format(se)) + + def send_break(self, duration=0.25): + """ + Send break condition to serial port + :param duration: Break duration + """ + if not self.ser.is_open: + return None + try: + self.ser.send_break(duration) + except SerialException as se: + log.error('Serial connection send break error: {}'.format(se)) + + def get_timeout(self): + """ + Get read timeout of serial port + :return: Read timeout value in seconds + """ + try: + timeout = self.ser.timeout + except SerialException as se: + log.error('Serial connection get read timeout error: {}'.format(se)) + + return timeout + + def set_timeout(self, timeout=1): + """ + Set read timeout of serial port + :param timeout: timeout value in seconds + """ + try: + self.ser.timeout = timeout + except SerialException as se: + log.error('Serial connection set read timeout error: {}'.format(se)) + + def close(self): + """ + Close serial port connection + """ + log.info('Close serial port') + self.ser.close() diff --git a/src/test_driver/mbed/integration_tests/common/serial_device.py b/src/test_driver/mbed/integration_tests/common/serial_device.py new file mode 100644 index 00000000000000..4e8205e39f7783 --- /dev/null +++ b/src/test_driver/mbed/integration_tests/common/serial_device.py @@ -0,0 +1,119 @@ +# Copyright (c) 2009-2021 Arm Limited +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import re +import threading + +from .device import Device + +log = logging.getLogger(__name__) + + +def strip_escape(string_to_escape) -> str: + """ + Strip escape characters from string. + :param string_to_escape: string to work on + :return: stripped string + """ + raw_ansi_pattern = r'\033\[((?:\d|;)*)([a-zA-Z])' + ansi_pattern = raw_ansi_pattern.encode() + ansi_eng = re.compile(ansi_pattern) + matches = [] + for match in ansi_eng.finditer(string_to_escape): + matches.append(match) + matches.reverse() + for match in matches: + start = match.start() + end = match.end() + string_to_escape = string_to_escape[0:start] + string_to_escape[end:] + return string_to_escape + + +class SerialDevice(Device): + + def __init__(self, serial_connection, name=None): + """ + Serial Device runner class containing device handling functions and logging, inherits the Device runner class + :param serial_connection: Serial connection object + :param name: Logging name for the client + """ + self.serial = serial_connection + self.run = False + super(SerialDevice, self).__init__(name) + + input_thread_name = '<-- {}'.format(self.name) + output_thread_name = '--> {}'.format(self.name) + + self.it = threading.Thread( + target=self._input_thread, name=input_thread_name) + self.ot = threading.Thread( + target=self._output_thread, name=output_thread_name) + + def reset(self, duration=0.25): + """ + Sends break to serial connection + :param duration: Break duration + """ + self.serial.send_break(duration) + + def start(self): + """ + Start the processing of the serial + """ + log.info('Starting "{}" runner...'.format(self.name)) + self.run = True + self.it.start() + self.ot.start() + log.info('"{}" runner started'.format(self.name)) + + def stop(self): + """ + Stop the processing of the serial + """ + log.info('Stopping "{}" runner...'.format(self.name)) + self.run = False + self.oq.put(None) + self.it.join() + self.ot.join() + log.info('"{}" runner stoped'.format(self.name)) + + def _input_thread(self): + while self.run: + line = self.serial.readline() + if line: + plain_line = strip_escape(line) + # Testapp uses \r to print characters to the same line, strip those and return only the last part + # If there is only one \r, don't remove anything. + if b'\r' in line and line.count(b'\r') > 1: + plain_line = plain_line.split(b'\r')[-2] + # Debug traces use tabulator characters, change those to spaces for readability + plain_line = plain_line.replace(b'\t', b' ') + plain_line = plain_line.decode('utf-8', 'ignore') + plain_line.rstrip() + log.info('<--|{}| {}'.format(self.name, plain_line.strip())) + self.iq.put(plain_line) + else: + pass + + def _output_thread(self): + while self.run: + line = self.oq.get() + if line: + log.info('-->|{}| {}'.format(self.name, line.strip())) + data = line + '\n' + self.serial.write(data.encode('utf-8')) + else: + log.debug('Nothing sent') diff --git a/src/test_driver/mbed/integration_tests/common/utils.py b/src/test_driver/mbed/integration_tests/common/utils.py new file mode 100644 index 00000000000000..1f22eddbd15df1 --- /dev/null +++ b/src/test_driver/mbed/integration_tests/common/utils.py @@ -0,0 +1,295 @@ +# Copyright (c) 2009-2021 Arm Limited +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import sys +import os +import platform +import random +import shlex +import re + +from chip.setup_payload import SetupPayload +from chip import exceptions + +if platform.system() == 'Darwin': + from chip.ChipCoreBluetoothMgr import CoreBluetoothManager as BleManager +elif sys.platform.startswith('linux'): + from chip.ChipBluezMgr import BluezManager as BleManager + +import logging +log = logging.getLogger(__name__) + + +class ParsingError(exceptions.ChipStackException): + def __init__(self, msg=None): + self.msg = "Parsing Error: " + msg + + def __str__(self): + return self.msg + + +def get_device_details(device): + """ + Get device details from logs + :param device: serial device instance + :return: device details dictionary or None + """ + ret = device.wait_for_output("SetupQRCode") + if ret == None or len(ret) < 2: + return None + + qr_code = re.sub( + r"[\[\]]", "", ret[-1].partition("SetupQRCode:")[2]).strip() + try: + device_details = dict(SetupPayload().ParseQrCode( + "VP:vendorpayload%{}".format(qr_code)).attributes) + except exceptions.ChipStackError as ex: + log.error(ex.msg) + return None + + return device_details + + +def ParseEncodedString(value): + if value.find(":") < 0: + raise ParsingError( + "Value should be encoded in encoding:encodedvalue format") + enc, encValue = value.split(":", 1) + if enc == "str": + return encValue.encode("utf-8") + b'\x00' + elif enc == "hex": + return bytes.fromhex(encValue) + raise ParsingError("Only str and hex encoding is supported") + + +def ParseValueWithType(value, type): + if type == 'int': + return int(value) + elif type == 'str': + return value + elif type == 'bytes': + return ParseEncodedString(value) + elif type == 'bool': + return (value.upper() not in ['F', 'FALSE', '0']) + else: + raise ParsingError('Cannot recognize type: {}'.format(type)) + + +def FormatZCLArguments(args, command): + commandArgs = {} + for kvPair in args: + if kvPair.find("=") < 0: + raise ParsingError("Argument should in key=value format") + key, value = kvPair.split("=", 1) + valueType = command.get(key, None) + commandArgs[key] = ParseValueWithType(value, valueType) + return commandArgs + + +def send_zcl_command(devCtrl, line): + """ + Send ZCL message to device: + [key=value]... + :param devCtrl: device controller instance + :param line: command line + :return: error code and command responde + """ + res = None + err = 0 + try: + args = shlex.split(line) + all_commands = devCtrl.ZCLCommandList() + if len(args) < 5: + raise exceptions.InvalidArgumentCount(5, len(args)) + + if args[0] not in all_commands: + raise exceptions.UnknownCluster(args[0]) + command = all_commands.get(args[0]).get(args[1], None) + # When command takes no arguments, (not command) is True + if command == None: + raise exceptions.UnknownCommand(args[0], args[1]) + err, res = devCtrl.ZCLSend(args[0], args[1], int( + args[2]), int(args[3]), int(args[4]), FormatZCLArguments(args[5:], command), blocking=True) + if err != 0: + log.error("Failed to send ZCL command [{}] {}.".format(err, res)) + elif res != None: + log.info("Success, received command response:") + log.info(res) + else: + log.info("Success, no command response.") + except exceptions.ChipStackException as ex: + log.error("An exception occurred during processing ZCL command:") + log.error(str(ex)) + err = -1 + except Exception as ex: + log.error("An exception occurred during processing input:") + log.error(str(ex)) + err = -1 + + return (err, res) + + +def scan_chip_ble_devices(devCtrl): + """ + BLE scan CHIP device + BLE scanning for 10 seconds and collect the results + :param devCtrl: device controller instance + :return: List of visible BLE devices + """ + devices = [] + bleMgr = BleManager(devCtrl) + bleMgr.scan("-t 10") + + for device in bleMgr.peripheral_list: + devIdInfo = bleMgr.get_peripheral_devIdInfo(device) + if devIdInfo: + devInfo = devIdInfo.__dict__ + devInfo["name"] = device.Name + devices.append(devInfo) + + return devices + + +def check_chip_ble_devices_advertising(devCtrl, name, deviceDetails=None): + """ + Check if CHIP device advertise + BLE scanning for 10 seconds and compare with device details + :param devCtrl: device controller instance + :param name: device advertising name + :param name: device details + :return: True if device advertise else False + """ + ble_chip_device = scan_chip_ble_devices(devCtrl) + if ble_chip_device == None or len(ble_chip_device) == 0: + log.info("No BLE CHIP device found") + return False + + chip_device_found = False + + for ble_device in ble_chip_device: + if deviceDetails != None: + if (ble_device["name"] == name and + int(ble_device["discriminator"]) == int(deviceDetails["Discriminator"]) and + int(ble_device["vendorId"]) == int(deviceDetails["VendorID"]) and + int(ble_device["productId"]) == int(deviceDetails["ProductID"])): + chip_device_found = True + break + else: + if (ble_device["name"] == name): + chip_device_found = True + break + + return chip_device_found + + +def connect_device_over_ble(devCtrl, discriminator, pinCode, nodeId=None): + """ + Connect to Matter accessory device over BLE + :param devCtrl: device controller instance + :param discriminator: CHIP device discriminator + :param pinCode: CHIP device pin code + :param nodeId: default value of node ID + :return: node ID is provisioning successful, otherwise None + """ + if nodeId == None: + nodeId = random.randint(1, 1000000) + + try: + devCtrl.ConnectBLE(int(discriminator), int(pinCode), int(nodeId)) + except exceptions.ChipStackException as ex: + log.error("Connect device over BLE failed: {}".format(str(ex))) + return None + + return nodeId + + +def close_connection(devCtrl, nodeId): + """ + Close the BLE connection + :param devCtrl: device controller instance + :return: true if successful, otherwise false + """ + try: + devCtrl.CloseSession(nodeId) + except exceptions.ChipStackException as ex: + log.error("Close session failed: {}".format(str(ex))) + return False + + return True + + +def close_ble(devCtrl): + """ + Close the BLE connection + :param devCtrl: device controller instance + :return: true if successful, otherwise false + """ + try: + devCtrl.CloseBLEConnection() + except exceptions.ChipStackException as ex: + log.error("Close BLE connection failed: {}".format(str(ex))) + return False + + return True + + +def commissioning_wifi(devCtrl, ssid, password, nodeId): + """ + Commissioning a Wi-Fi device + :param devCtrl: device controller instance + :param ssid: network ssid + :param password: network password + :param nodeId: value of node ID + :return: error code + """ + + # Inject the credentials to the device + err, res = send_zcl_command( + devCtrl, "NetworkCommissioning AddWiFiNetwork {} 0 0 ssid=str:{} credentials=str:{} breadcrumb=0 timeoutMs=1000".format(nodeId, ssid, password)) + if err != 0 and res["Status"] != 0: + log.error("Set Wi-Fi credentials failed [{}]".format(err)) + return err + + # Enable the Wi-Fi interface + err, res = send_zcl_command( + devCtrl, "NetworkCommissioning EnableNetwork {} 0 0 networkID=str:{} breadcrumb=0 timeoutMs=1000".format(nodeId, ssid)) + if err != 0 and res["Status"] != 0: + log.error("Enable Wi-Fi failed [{}]".format(err)) + return err + + return err + + +def resolve_device(devCtrl, nodeId): + """ + Discover IP address and port of the device. + :param devCtrl: device controller instance + :param nodeId: value of node ID + :return: device IP address and port if successful, otherwise None + """ + ret = None + try: + err = devCtrl.ResolveNode(int(nodeId)) + if err == 0: + ret = devCtrl.GetAddressAndPort(int(nodeId)) + if ret == None: + log.error("Get address and port failed") + else: + log.error("Resolve node failed [{}]".format(err)) + except exceptions.ChipStackException as ex: + log.error("Resolve node failed {}".format(str(ex))) + + return ret diff --git a/src/test_driver/mbed/integration_tests/conftest.py b/src/test_driver/mbed/integration_tests/conftest.py new file mode 100644 index 00000000000000..56f549b83d119c --- /dev/null +++ b/src/test_driver/mbed/integration_tests/conftest.py @@ -0,0 +1,34 @@ +# Copyright (c) 2009-2021 Arm Limited +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest + +pytest_plugins = ['common.fixtures'] + + +def pytest_addoption(parser): + """ + Function for pytest to enable own custom commandline arguments + :param parser: argparser + :return: + """ + parser.addoption('--platforms', action='store', + help='List of platforms that can be used to run the tests. Platforms are separated by a comma') + parser.addoption('--serial_inter_byte_delay', action='store', + help='Time in second between two bytes sent on the serial line (accepts floats)') + parser.addoption('--serial_baudrate', action='store', + help='Baudrate of the serial port used', default='115200') + parser.addoption('--network', action='store', + help='WiFi network credentials to which we want to connect device. Format network_ssid:network_password') diff --git a/src/test_driver/mbed/integration_tests/lighting-app/__init__.py b/src/test_driver/mbed/integration_tests/lighting-app/__init__.py new file mode 100644 index 00000000000000..e69de29bb2d1d6 diff --git a/src/test_driver/mbed/integration_tests/lighting-app/test_app.py b/src/test_driver/mbed/integration_tests/lighting-app/test_app.py new file mode 100644 index 00000000000000..27099d3ade10e2 --- /dev/null +++ b/src/test_driver/mbed/integration_tests/lighting-app/test_app.py @@ -0,0 +1,223 @@ +# Copyright (c) 2009-2021 Arm Limited +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest +from time import sleep + +from chip import ChipDeviceCtrl + +from common.utils import * + +from common.pigweed_client import PigweedClient +from device_service import device_service_pb2 +from button_service import button_service_pb2 +from lighting_service import lighting_service_pb2 +from pw_status import Status + +import logging +log = logging.getLogger(__name__) + +BLE_DEVICE_NAME = "MBED-lighting" +DEVICE_NODE_ID = 1234 +TEST_BRIGHTNESS_LEVEL = 200 +RPC_PROTOS = [device_service_pb2, button_service_pb2, lighting_service_pb2] + + +@pytest.mark.smoketest +def test_smoke_test(device): + device.reset(duration=1) + ret = device.wait_for_output("Mbed lighting-app example application start") + assert ret != None and len(ret) > 0 + ret = device.wait_for_output("Mbed lighting-app example application run") + assert ret != None and len(ret) > 0 + + +def test_wifi_provisioning(device, network): + network_ssid = network[0] + network_pass = network[1] + + devCtrl = ChipDeviceCtrl.ChipDeviceController() + + device_details = get_device_details(device) + assert device_details != None and len(device_details) != 0 + + assert check_chip_ble_devices_advertising( + devCtrl, BLE_DEVICE_NAME, device_details) + + ret = connect_device_over_ble(devCtrl, int(device_details["Discriminator"]), int( + device_details["SetUpPINCode"]), DEVICE_NODE_ID) + assert ret != None and ret == DEVICE_NODE_ID + + ret = device.wait_for_output("Device completed Rendezvous process") + assert ret != None and len(ret) > 0 + + ret = commissioning_wifi(devCtrl, network_ssid, + network_pass, DEVICE_NODE_ID) + assert ret == 0 + + ret = device.wait_for_output("StationConnected") + assert ret != None and len(ret) > 0 + + ret = device.wait_for_output("address set") + assert ret != None and len(ret) > 0 + + device_ip_address = ret[-1].partition("address set:")[2].strip() + + ret = resolve_device(devCtrl, DEVICE_NODE_ID) + assert ret != None and len(ret) == 2 + + ip_address = ret[0] + port = ret[1] + + assert device_ip_address == ip_address + + assert close_connection(devCtrl, DEVICE_NODE_ID) + assert close_ble(devCtrl) + + +def test_light_ctrl(device, network): + network_ssid = network[0] + network_pass = network[1] + + devCtrl = ChipDeviceCtrl.ChipDeviceController() + + device_details = get_device_details(device) + assert device_details != None and len(device_details) != 0 + + assert check_chip_ble_devices_advertising( + devCtrl, BLE_DEVICE_NAME, device_details) + + ret = connect_device_over_ble(devCtrl, int(device_details["Discriminator"]), int( + device_details["SetUpPINCode"]), DEVICE_NODE_ID) + assert ret != None and ret == DEVICE_NODE_ID + + ret = device.wait_for_output("Device completed Rendezvous process") + assert ret != None and len(ret) > 0 + + ret = commissioning_wifi(devCtrl, network_ssid, + network_pass, DEVICE_NODE_ID) + assert ret == 0 + + ret = resolve_device(devCtrl, DEVICE_NODE_ID) + assert ret != None and len(ret) == 2 + + err, res = send_zcl_command( + devCtrl, "OnOff On {} 1 0".format(DEVICE_NODE_ID)) + assert err == 0 + + ret = device.wait_for_output("Turn On Action has been completed", 20) + assert ret != None and len(ret) > 0 + + err, res = send_zcl_command( + devCtrl, "OnOff Off {} 1 0".format(DEVICE_NODE_ID)) + assert err == 0 + + ret = device.wait_for_output("Turn Off Action has been completed", 20) + assert ret != None and len(ret) > 0 + + err, res = send_zcl_command( + devCtrl, "OnOff Toggle {} 1 0".format(DEVICE_NODE_ID)) + assert err == 0 + + ret = device.wait_for_output("Turn On Action has been completed", 20) + assert ret != None and len(ret) > 0 + + err, res = send_zcl_command(devCtrl, "LevelControl MoveToLevel {} 1 0 level={} transitionTime=1 optionMask=0 optionOverride=0".format( + DEVICE_NODE_ID, TEST_BRIGHTNESS_LEVEL)) + assert err == 0 + + ret = device.wait_for_output( + "Setting brightness level to {}".format(TEST_BRIGHTNESS_LEVEL), 20) + assert ret != None and len(ret) > 0 + + assert close_connection(devCtrl, DEVICE_NODE_ID) + assert close_ble(devCtrl) + + +def test_device_info_rpc(device): + pw_client = PigweedClient(device, RPC_PROTOS) + status, payload = pw_client.rpcs.chip.rpc.Device.GetDeviceInfo() + assert status.ok() == True + assert payload.vendor_id != None and payload.product_id != None and payload.serial_number != None + + device_details = get_device_details(device) + assert device_details != None and len(device_details) != 0 + + assert int(device_details["VendorID"]) == payload.vendor_id + assert int(device_details["ProductID"]) == payload.product_id + assert int(device_details["Discriminator"] + ) == payload.pairing_info.discriminator + assert int(device_details["SetUpPINCode"]) == payload.pairing_info.code + + +def test_device_factory_reset_rpc(device): + pw_client = PigweedClient(device, RPC_PROTOS) + status, payload = pw_client.rpcs.chip.rpc.Device.FactoryReset() + assert status.ok() == True + + +def test_device_reboot_rpc(device): + pw_client = PigweedClient(device, RPC_PROTOS) + status, payload = pw_client.rpcs.chip.rpc.Device.Reboot() + assert status == Status.UNIMPLEMENTED + + +def test_device_ota_rpc(device): + pw_client = PigweedClient(device, RPC_PROTOS) + status, payload = pw_client.rpcs.chip.rpc.Device.TriggerOta() + assert status == Status.UNIMPLEMENTED + + +def test_ligth_ctrl_rpc(device): + pw_client = PigweedClient(device, RPC_PROTOS) + + # Check light on + status, payload = pw_client.rpcs.chip.rpc.Lighting.Set(on=True) + assert status.ok() == True + status, payload = pw_client.rpcs.chip.rpc.Lighting.Get() + assert status.ok() == True + assert payload.on == True + + # Check light off + status, payload = pw_client.rpcs.chip.rpc.Lighting.Set(on=False) + assert status.ok() == True + status, payload = pw_client.rpcs.chip.rpc.Lighting.Get() + assert status.ok() == True + assert payload.on == False + + +def test_button_ctrl_rpc(device): + pw_client = PigweedClient(device, RPC_PROTOS) + + # Check button 0 (lighting) + status, payload = pw_client.rpcs.chip.rpc.Lighting.Get() + assert status.ok() == True + initial_state = bool(payload.on) + + compare_state = not initial_state + status, payload = pw_client.rpcs.chip.rpc.Button.Event(idx=0, pushed=True) + assert status.ok() == True + sleep(2) + status, payload = pw_client.rpcs.chip.rpc.Lighting.Get() + assert status.ok() == True + assert payload.on == compare_state + + compare_state = initial_state + status, payload = pw_client.rpcs.chip.rpc.Button.Event(idx=0, pushed=True) + assert status.ok() == True + sleep(2) + status, payload = pw_client.rpcs.chip.rpc.Lighting.Get() + assert status.ok() == True + assert payload.on == compare_state diff --git a/src/test_driver/mbed/integration_tests/lock-app/__init__.py b/src/test_driver/mbed/integration_tests/lock-app/__init__.py new file mode 100644 index 00000000000000..e69de29bb2d1d6 diff --git a/src/test_driver/mbed/integration_tests/lock-app/test_app.py b/src/test_driver/mbed/integration_tests/lock-app/test_app.py new file mode 100644 index 00000000000000..2fca2b5356f735 --- /dev/null +++ b/src/test_driver/mbed/integration_tests/lock-app/test_app.py @@ -0,0 +1,214 @@ +# Copyright (c) 2009-2021 Arm Limited +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest +from time import sleep + +from chip import ChipDeviceCtrl + +from common.utils import * + +from common.pigweed_client import PigweedClient +from device_service import device_service_pb2 +from button_service import button_service_pb2 +from locking_service import locking_service_pb2 +from pw_status import Status + +import logging +log = logging.getLogger(__name__) + +BLE_DEVICE_NAME = "MBED-lock" +DEVICE_NODE_ID = 1234 +RPC_PROTOS = [device_service_pb2, button_service_pb2, locking_service_pb2] + + +@pytest.mark.smoketest +def test_smoke_test(device): + device.reset(duration=1) + ret = device.wait_for_output("Mbed lock-app example application start") + assert ret != None and len(ret) > 0 + ret = device.wait_for_output("Mbed lock-app example application run") + assert ret != None and len(ret) > 0 + + +def test_wifi_provisioning(device, network): + network_ssid = network[0] + network_pass = network[1] + + devCtrl = ChipDeviceCtrl.ChipDeviceController() + + device_details = get_device_details(device) + assert device_details != None and len(device_details) != 0 + + assert check_chip_ble_devices_advertising( + devCtrl, BLE_DEVICE_NAME, device_details) + + ret = connect_device_over_ble(devCtrl, int(device_details["Discriminator"]), int( + device_details["SetUpPINCode"]), DEVICE_NODE_ID) + assert ret != None and ret == DEVICE_NODE_ID + + ret = device.wait_for_output("Device completed Rendezvous process") + assert ret != None and len(ret) > 0 + + ret = commissioning_wifi(devCtrl, network_ssid, + network_pass, DEVICE_NODE_ID) + assert ret == 0 + + ret = device.wait_for_output("StationConnected") + assert ret != None and len(ret) > 0 + + ret = device.wait_for_output("address set") + assert ret != None and len(ret) > 0 + + device_ip_address = ret[-1].partition("address set:")[2].strip() + + ret = resolve_device(devCtrl, DEVICE_NODE_ID) + assert ret != None and len(ret) == 2 + + ip_address = ret[0] + port = ret[1] + + assert device_ip_address == ip_address + + assert close_connection(devCtrl, DEVICE_NODE_ID) + assert close_ble(devCtrl) + + +def test_lock_ctrl(device, network): + network_ssid = network[0] + network_pass = network[1] + + devCtrl = ChipDeviceCtrl.ChipDeviceController() + + device_details = get_device_details(device) + assert device_details != None and len(device_details) != 0 + + assert check_chip_ble_devices_advertising( + devCtrl, BLE_DEVICE_NAME, device_details) + + ret = connect_device_over_ble(devCtrl, int(device_details["Discriminator"]), int( + device_details["SetUpPINCode"]), DEVICE_NODE_ID) + assert ret != None and ret == DEVICE_NODE_ID + + ret = device.wait_for_output("Device completed Rendezvous process") + assert ret != None and len(ret) > 0 + + ret = commissioning_wifi(devCtrl, network_ssid, + network_pass, DEVICE_NODE_ID) + assert ret == 0 + + ret = resolve_device(devCtrl, DEVICE_NODE_ID) + assert ret != None and len(ret) == 2 + + err, res = send_zcl_command( + devCtrl, "OnOff Off {} 1 0".format(DEVICE_NODE_ID)) + assert err == 0 + + ret = device.wait_for_output("Unlock Action has been completed", 20) + assert ret != None and len(ret) > 0 + + err, res = send_zcl_command( + devCtrl, "OnOff On {} 1 0".format(DEVICE_NODE_ID)) + assert err == 0 + + ret = device.wait_for_output("Lock Action has been completed", 20) + assert ret != None and len(ret) > 0 + + err, res = send_zcl_command( + devCtrl, "OnOff Toggle {} 1 0".format(DEVICE_NODE_ID)) + assert err == 0 + + ret = device.wait_for_output("Unlock Action has been completed", 20) + assert ret != None and len(ret) > 0 + + assert close_connection(devCtrl, DEVICE_NODE_ID) + assert close_ble(devCtrl) + + +def test_device_info_rpc(device): + pw_client = PigweedClient(device, RPC_PROTOS) + status, payload = pw_client.rpcs.chip.rpc.Device.GetDeviceInfo() + assert status.ok() == True + assert payload.vendor_id != None and payload.product_id != None and payload.serial_number != None + + device_details = get_device_details(device) + assert device_details != None and len(device_details) != 0 + + assert int(device_details["VendorID"]) == payload.vendor_id + assert int(device_details["ProductID"]) == payload.product_id + assert int(device_details["Discriminator"] + ) == payload.pairing_info.discriminator + assert int(device_details["SetUpPINCode"]) == payload.pairing_info.code + + +def test_device_factory_reset_rpc(device): + pw_client = PigweedClient(device, RPC_PROTOS) + status, payload = pw_client.rpcs.chip.rpc.Device.FactoryReset() + assert status.ok() == True + + +def test_device_reboot_rpc(device): + pw_client = PigweedClient(device, RPC_PROTOS) + status, payload = pw_client.rpcs.chip.rpc.Device.Reboot() + assert status == Status.UNIMPLEMENTED + + +def test_device_ota_rpc(device): + pw_client = PigweedClient(device, RPC_PROTOS) + status, payload = pw_client.rpcs.chip.rpc.Device.TriggerOta() + assert status == Status.UNIMPLEMENTED + + +def test_lock_ctrl_rpc(device): + pw_client = PigweedClient(device, RPC_PROTOS) + + # Check locked + status, payload = pw_client.rpcs.chip.rpc.Locking.Set(locked=True) + assert status.ok() == True + status, payload = pw_client.rpcs.chip.rpc.Locking.Get() + assert status.ok() == True + assert payload.locked == True + + # Check unlocked + status, payload = pw_client.rpcs.chip.rpc.Locking.Set(locked=False) + assert status.ok() == True + status, payload = pw_client.rpcs.chip.rpc.Locking.Get() + assert status.ok() == True + assert payload.locked == False + + +def test_button_ctrl_rpc(device): + pw_client = PigweedClient(device, RPC_PROTOS) + + # Check button 0 (locking) + status, payload = pw_client.rpcs.chip.rpc.Locking.Get() + assert status.ok() == True + initial_state = bool(payload.locked) + + compare_state = not initial_state + status, payload = pw_client.rpcs.chip.rpc.Button.Event(idx=0, pushed=True) + assert status.ok() == True + sleep(2) + status, payload = pw_client.rpcs.chip.rpc.Locking.Get() + assert status.ok() == True + assert payload.locked == compare_state + + compare_state = initial_state + status, payload = pw_client.rpcs.chip.rpc.Button.Event(idx=0, pushed=True) + assert status.ok() == True + sleep(2) + status, payload = pw_client.rpcs.chip.rpc.Locking.Get() + assert status.ok() == True + assert payload.locked == compare_state diff --git a/src/test_driver/mbed/integration_tests/pigweed-app/__init__.py b/src/test_driver/mbed/integration_tests/pigweed-app/__init__.py new file mode 100644 index 00000000000000..e69de29bb2d1d6 diff --git a/src/test_driver/mbed/integration_tests/pigweed-app/test_app.py b/src/test_driver/mbed/integration_tests/pigweed-app/test_app.py new file mode 100644 index 00000000000000..ad6ee4f9a142fe --- /dev/null +++ b/src/test_driver/mbed/integration_tests/pigweed-app/test_app.py @@ -0,0 +1,39 @@ +# Copyright (c) 2009-2021 Arm Limited +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest +from echo_service import echo_pb2 + +from common.pigweed_client import PigweedClient + +RPC_PROTOS = [echo_pb2] +PW_ECHO_TEST_MESSAGE = "Test_message" + + +@pytest.mark.smoketest +def test_smoke_test(device): + device.reset(duration=1) + ret = device.wait_for_output("Mbed pigweed-app example application start") + assert ret != None and len(ret) > 0 + ret = device.wait_for_output("Mbed pigweed-app example application run") + assert ret != None and len(ret) > 0 + + +def test_echo(device): + pw_client = PigweedClient(device, RPC_PROTOS) + status, payload = pw_client.rpcs.pw.rpc.EchoService.Echo( + msg=PW_ECHO_TEST_MESSAGE) + assert status.ok() == True + assert payload.msg == PW_ECHO_TEST_MESSAGE diff --git a/src/test_driver/mbed/integration_tests/pytest.ini b/src/test_driver/mbed/integration_tests/pytest.ini new file mode 100644 index 00000000000000..6854576f00d5b4 --- /dev/null +++ b/src/test_driver/mbed/integration_tests/pytest.ini @@ -0,0 +1,7 @@ +[pytest] +log_cli = true +log_level = INFO +log_format = %(asctime)s.%(msecs)03d %(levelname)s %(message)s +log_cli_format = %(asctime)s.%(msecs)03d %(levelname)s %(message)s +markers = + smoketest: Test with large coverage and short run time diff --git a/src/test_driver/mbed/integration_tests/shell/__init__.py b/src/test_driver/mbed/integration_tests/shell/__init__.py new file mode 100644 index 00000000000000..e69de29bb2d1d6 diff --git a/src/test_driver/mbed/integration_tests/shell/test_app.py b/src/test_driver/mbed/integration_tests/shell/test_app.py new file mode 100644 index 00000000000000..6dcc37a89775a3 --- /dev/null +++ b/src/test_driver/mbed/integration_tests/shell/test_app.py @@ -0,0 +1,239 @@ +# Copyright (c) 2009-2021 Arm Limited +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest +import re +from packaging import version +from time import sleep + +from chip.setup_payload import SetupPayload +from chip import exceptions +from chip import ChipDeviceCtrl +from common.utils import * +import logging +log = logging.getLogger(__name__) + +BLE_DEVICE_NAME = "MBED-shell" + +SHELL_COMMAND_NAME = ["echo", "log", "rand", "ping", "send", "base64", "version", + "ble", "wifi", "config", "device", "onboardingcodes", "dns", "help", "exit"] +WIFI_MODE_NAME = ["disable", "ap", "sta"] + + +def get_shell_command(response): + return [line.split()[0].strip() for line in response] + + +def parse_config_response(response): + config = {} + for param in response: + param_name = param.split(":")[0].lower() + if "discriminator" in param_name: + value = int(param.split(":")[1].strip(), 16) + elif "pincode" in param_name: + value = int(param.split(":")[1].strip()) + else: + value = int(param.split(":")[1].split()[0].strip()) + + if "productrevision" in param_name: + param_name = "productrev" + + config[param_name] = value + return config + + +def parse_boarding_codes_response(response): + codes = {} + for param in response: + codes[param.split(":")[0].lower()] = param.split()[1].strip() + return codes + + +@pytest.mark.smoketest +def test_smoke_test(device): + device.reset(duration=1) + ret = device.wait_for_output("Mbed shell example application start") + assert ret != None and len(ret) > 0 + ret = device.wait_for_output("Mbed shell example application run") + assert ret != None and len(ret) > 0 + + +def test_help_check(device): + ret = device.send(command="help", expected_output="Done") + assert ret != None and len(ret) > 1 + shell_commands = get_shell_command(ret[1:-1]) + assert set(SHELL_COMMAND_NAME) == set(shell_commands) + + +def test_echo_check(device): + ret = device.send(command="echo Hello", expected_output="Done") + assert ret != None and len(ret) > 1 + assert "Hello" in ret[-2] + + +def test_log_check(device): + ret = device.send(command="log Hello", expected_output="Done") + assert ret != None and len(ret) > 1 + assert "[INFO][CHIP]: [TOO]Hello" in ret[-2] + + +def test_rand_check(device): + ret = device.send(command="rand", expected_output="Done") + assert ret != None and len(ret) > 1 + assert ret[-2].rstrip().isdigit() + + +def test_base64_encode_decode(device): + hex_string = "1234" + ret = device.send(command="base64 encode {}".format( + hex_string), expected_output="Done") + assert ret != None and len(ret) > 1 + base64code = ret[-2] + ret = device.send(command="base64 decode {}".format( + base64code), expected_output="Done") + assert ret != None and len(ret) > 1 + assert ret[-2].rstrip() == hex_string + + +def test_version_check(device): + ret = device.send(command="version", expected_output="Done") + assert ret != None and len(ret) > 1 + assert "CHIP" in ret[-2].split()[0] + app_version = ret[-2].split()[1] + assert isinstance(version.parse(app_version), version.Version) + + +def test_ble_adv_check(device): + devCtrl = ChipDeviceCtrl.ChipDeviceController() + + ret = device.send(command="ble adv start", expected_output="Done") + assert ret != None and len(ret) > 0 + ret = device.send(command="ble adv state", expected_output="Done") + assert ret != None and len(ret) > 1 + assert "enabled" in ret[-2].split()[-1] + + sleep(1) + + assert check_chip_ble_devices_advertising(devCtrl, BLE_DEVICE_NAME) + + ret = device.send(command="ble adv stop", expected_output="Done") + assert ret != None and len(ret) > 0 + ret = device.send(command="ble adv state", expected_output="Done") + assert ret != None and len(ret) > 1 + assert "disabled" in ret[-2].split()[-1] + + sleep(1) + + assert not check_chip_ble_devices_advertising(devCtrl, BLE_DEVICE_NAME) + + +def test_device_config_check(device): + ret = device.send(command="config", expected_output="Done") + assert ret != None and len(ret) > 2 + + config = parse_config_response(ret[1:-1]) + + for param_name, value in config.items(): + ret = device.send(command="config {}".format( + param_name), expected_output="Done") + assert ret != None and len(ret) > 1 + if "discriminator" in param_name: + assert int(ret[-2].split()[0], 16) == value + else: + assert int(ret[-2].split()[0]) == value + + new_value = int(config['discriminator']) + 1 + ret = device.send(command="config discriminator {}".format( + new_value), expected_output="Done") + assert ret != None and len(ret) > 1 + assert "Setup discriminator set to: {}".format(new_value) in ret[-2] + + ret = device.send(command="config discriminator", expected_output="Done") + assert ret != None and len(ret) > 1 + assert int(ret[-2].split()[0], 16) == new_value + + +def test_on_boarding_codes(device): + ret = device.send(command="onboardingcodes", expected_output="Done") + assert ret != None and len(ret) > 2 + + boarding_codes = parse_boarding_codes_response(ret[1:-1]) + + for param, value in boarding_codes.items(): + ret = device.send(command="onboardingcodes {}".format( + param), expected_output="Done") + assert ret != None and len(ret) > 1 + assert value == ret[-2].strip() + + try: + device_details = dict(SetupPayload().ParseQrCode( + "VP:vendorpayload%{}".format(boarding_codes['qrcode'])).attributes) + except exceptions.ChipStackError as ex: + log.error(ex.msg) + assert False + assert device_details != None and len(device_details) != 0 + + try: + device_details = dict(SetupPayload().ParseManualPairingCode( + boarding_codes['manualpairingcode']).attributes) + except exceptions.ChipStackError as ex: + log.error(ex.msg) + assert False + assert device_details != None and len(device_details) != 0 + + +def test_wifi_mode(device): + ret = device.send(command="wifi mode", expected_output="Done") + assert ret != None and len(ret) > 1 + current_mode = ret[-2].strip() + assert current_mode in WIFI_MODE_NAME + + for mode in [n for n in WIFI_MODE_NAME if n != current_mode]: + print(mode) + ret = device.send(command="wifi mode {}".format( + mode), expected_output="Done") + assert ret != None and len(ret) > 0 + + ret = device.send(command="wifi mode", expected_output="Done") + assert ret != None and len(ret) > 1 + assert ret[-2].strip() == mode + + +def test_wifi_connect(device, network): + network_ssid = network[0] + network_pass = network[1] + + ret = device.send(command="wifi connect {} {}".format( + network_ssid, network_pass), expected_output="Done") + assert ret != None and len(ret) > 0 + + ret = device.wait_for_output("StationConnected", 30) + assert ret != None and len(ret) > 0 + + +def test_device_factory_reset(device): + ret = device.send(command="device factoryreset") + + sleep(1) + + ret = device.wait_for_output("Mbed shell example application start") + assert ret != None and len(ret) > 0 + ret = device.wait_for_output("Mbed shell example application run") + assert ret != None and len(ret) > 0 + + +def test_exit_check(device): + ret = device.send(command="exit", expected_output="Goodbye") + assert ret != None and len(ret) > 0 diff --git a/src/test_driver/mbed/integration_tests/test_set.in b/src/test_driver/mbed/integration_tests/test_set.in new file mode 100644 index 00000000000000..ebc8aeae33fdb7 --- /dev/null +++ b/src/test_driver/mbed/integration_tests/test_set.in @@ -0,0 +1,5 @@ +lock-app +lighting-app +pigweed-app +shell +unit-tests \ No newline at end of file diff --git a/src/test_driver/mbed/integration_tests/unit-tests/test_app.py b/src/test_driver/mbed/integration_tests/unit-tests/test_app.py new file mode 100644 index 00000000000000..04f81ae7320f4e --- /dev/null +++ b/src/test_driver/mbed/integration_tests/unit-tests/test_app.py @@ -0,0 +1,33 @@ +# Copyright (c) 2009-2021 Arm Limited +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest +import re + + +def test_unit_tests(device): + device.reset(duration=1) + # smoke test + ret = device.wait_for_output("Mbed unit-tests application start") + assert ret != None and len(ret) > 0 + ret = device.wait_for_output("Mbed unit-tests application run", 60) + assert ret != None and len(ret) > 0 + + ret = device.wait_for_output("CHIP test status:", 500) + # extract number of failures: + test_status = ret[-1] + result = re.findall(r'\d+', test_status) + assert len(result) == 1 + assert int(result[0]) == 0 diff --git a/src/test_driver/mbed/.gitignore b/src/test_driver/mbed/unit_tests/.gitignore similarity index 100% rename from src/test_driver/mbed/.gitignore rename to src/test_driver/mbed/unit_tests/.gitignore diff --git a/src/test_driver/mbed/CMakeLists.txt b/src/test_driver/mbed/unit_tests/CMakeLists.txt similarity index 98% rename from src/test_driver/mbed/CMakeLists.txt rename to src/test_driver/mbed/unit_tests/CMakeLists.txt index f4b921528f6317..e1fd925374ebb8 100644 --- a/src/test_driver/mbed/CMakeLists.txt +++ b/src/test_driver/mbed/unit_tests/CMakeLists.txt @@ -3,7 +3,7 @@ cmake_minimum_required(VERSION 3.19.0) -get_filename_component(CHIP_ROOT ${CMAKE_CURRENT_SOURCE_DIR}/../../.. REALPATH) +get_filename_component(CHIP_ROOT ${CMAKE_CURRENT_SOURCE_DIR}/../../../.. REALPATH) configure_file( ${CMAKE_CURRENT_SOURCE_DIR}/config.in diff --git a/src/test_driver/mbed/README.md b/src/test_driver/mbed/unit_tests/README.md similarity index 97% rename from src/test_driver/mbed/README.md rename to src/test_driver/mbed/unit_tests/README.md index d3487dd9482d9b..925db3e6321704 100644 --- a/src/test_driver/mbed/README.md +++ b/src/test_driver/mbed/unit_tests/README.md @@ -51,8 +51,8 @@ build target and its requirements. > devcontainer is the recommended way to interact with Arm Mbed-OS port of the > Matter Project.** > -> **Please read this [README.md](../../..//docs/VSCODE_DEVELOPMENT.md) for more -> information about using VSCode in container.** +> **Please read this [README.md](../../../../docs/VSCODE_DEVELOPMENT.md) for +> more information about using VSCode in container.** To initialize the development environment, download all registered sub-modules and activate the environment: @@ -183,7 +183,7 @@ devices: - More details and guidelines about porting new hardware into the Matter project with Mbed OS can be found in - [MbedNewTarget](../../../docs/guides/mbedos_add_new_target.md) + [MbedNewTarget](../../../../docs/guides/mbedos_add_new_target.md) - Some useful information about HW platform specific settings can be found in `test_driver/mbed/mbed_app.json`. Information about this file syntax and its meaning in mbed-os project can be diff --git a/src/test_driver/mbed/config.in b/src/test_driver/mbed/unit_tests/config.in similarity index 100% rename from src/test_driver/mbed/config.in rename to src/test_driver/mbed/unit_tests/config.in diff --git a/src/test_driver/mbed/main/include/CHIPProjectConfig.h b/src/test_driver/mbed/unit_tests/main/include/CHIPProjectConfig.h similarity index 100% rename from src/test_driver/mbed/main/include/CHIPProjectConfig.h rename to src/test_driver/mbed/unit_tests/main/include/CHIPProjectConfig.h diff --git a/src/test_driver/mbed/main/main.cpp b/src/test_driver/mbed/unit_tests/main/main.cpp similarity index 100% rename from src/test_driver/mbed/main/main.cpp rename to src/test_driver/mbed/unit_tests/main/main.cpp diff --git a/src/test_driver/mbed/mbed_app.json b/src/test_driver/mbed/unit_tests/mbed_app.json similarity index 100% rename from src/test_driver/mbed/mbed_app.json rename to src/test_driver/mbed/unit_tests/mbed_app.json