Skip to content

Commit

Permalink
Initial Test Script for TC_MCORE_FS_1_1 (#34634)
Browse files Browse the repository at this point in the history
* Initial Test Script for FS Top-Level

* Resolve linter issues.

* Update TC_MCORE_FS_1_1 to be testable

* Restyled by autopep8

* Address PR comments

* Restyled by isort

---------

Co-authored-by: Terence Hampson <[email protected]>
Co-authored-by: Restyled.io <[email protected]>
  • Loading branch information
3 people authored and pull[bot] committed Oct 18, 2024
1 parent cfec057 commit 09c2dc0
Show file tree
Hide file tree
Showing 2 changed files with 306 additions and 0 deletions.
144 changes: 144 additions & 0 deletions src/python_testing/TC_MCORE_FS_1_1.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
#
# Copyright (c) 2024 Project CHIP Authors
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# This test requires a TH_SERVER application. Please specify with --string-arg th_server_app_path:<path_to_app>

import logging
import os
import random
import signal
import subprocess
import time
import uuid

import chip.clusters as Clusters
from chip import ChipDeviceCtrl
from matter_testing_support import MatterBaseTest, TestStep, async_test_body, default_matter_test_main
from mobly import asserts


class TC_MCORE_FS_1_1(MatterBaseTest):

@async_test_body
async def setup_class(self):
super().setup_class()
# TODO: confirm whether we can open processes like this on the TH
app = self.matter_test_config.user_params.get("th_server_app_path", None)
if not app:
asserts.fail('This test requires a TH_SERVER app. Specify app path with --string-arg th_server_app_path:<path_to_app>')

self.kvs = f'kvs_{str(uuid.uuid4())}'
self.port = 5543
discriminator = random.randint(0, 4095)
passcode = 20202021
app_args = f'--secured-device-port {self.port} --discriminator {discriminator} --passcode {passcode} --KVS {self.kvs}'
cmd = f'{app} {app_args}'
# TODO: Determine if we want these logs cooked or pushed to somewhere else
logging.info("Starting application to acts mock a server portion of TH_FSA")
self.app_process = subprocess.Popen(cmd, bufsize=0, shell=True)
logging.info("Started application to acts mock a server portion of TH_FSA")
time.sleep(3)

logging.info("Commissioning from separate fabric")
# Create a second controller on a new fabric to communicate to the server
new_certificate_authority = self.certificate_authority_manager.NewCertificateAuthority()
new_fabric_admin = new_certificate_authority.NewFabricAdmin(vendorId=0xFFF1, fabricId=2)
paa_path = str(self.matter_test_config.paa_trust_store_path)
self.TH_server_controller = new_fabric_admin.NewController(nodeId=112233, paaTrustStorePath=paa_path)
self.server_nodeid = 1111
await self.TH_server_controller.CommissionOnNetwork(nodeId=self.server_nodeid, setupPinCode=passcode, filterType=ChipDeviceCtrl.DiscoveryFilterType.LONG_DISCRIMINATOR, filter=discriminator)
logging.info("Commissioning TH_SERVER complete")

def teardown_class(self):
logging.warning("Stopping app with SIGTERM")
self.app_process.send_signal(signal.SIGTERM.value)
self.app_process.wait()

os.remove(self.kvs)
super().teardown_class()

def steps_TC_MCORE_FS_1_1(self) -> list[TestStep]:
steps = [TestStep(1, "Enable Fabric Synchronization on DUT_FSA using the manufacturer specified mechanism.", is_commissioning=True),
TestStep(2, "Commission DUT_FSA onto TH_FSA fabric."),
TestStep(3, "Reverse Commision Commission TH_FSAs onto DUT_FSA fabric."),
TestStep("3a", "TH_FSA sends RequestCommissioningApproval"),
TestStep("3b", "TH_FSA sends CommissionNode"),
TestStep("3c", "DUT_FSA commissions TH_FSA")]
return steps

@async_test_body
async def test_TC_MCORE_FS_1_1(self):
self.is_ci = self.check_pics('PICS_SDK_CI_ONLY')
# TODO this value should either be determined or passed in from command line
dut_commissioning_control_endpoint = 0
self.step(1)
self.step(2)
self.step(3)
th_fsa_server_fabrics = await self.read_single_attribute_check_success(cluster=Clusters.OperationalCredentials, attribute=Clusters.OperationalCredentials.Attributes.Fabrics, dev_ctrl=self.TH_server_controller, node_id=self.server_nodeid, endpoint=0)
th_fsa_server_vid = await self.read_single_attribute_check_success(cluster=Clusters.BasicInformation, attribute=Clusters.BasicInformation.Attributes.VendorID, dev_ctrl=self.TH_server_controller, node_id=self.server_nodeid, endpoint=0)
th_fsa_server_pid = await self.read_single_attribute_check_success(cluster=Clusters.BasicInformation, attribute=Clusters.BasicInformation.Attributes.ProductID, dev_ctrl=self.TH_server_controller, node_id=self.server_nodeid, endpoint=0)

event_path = [(dut_commissioning_control_endpoint, Clusters.CommissionerControl.Events.CommissioningRequestResult, 1)]
events = await self.default_controller.ReadEvent(nodeid=self.dut_node_id, events=event_path)

self.step("3a")
good_request_id = 0x1234567812345678
cmd = Clusters.CommissionerControl.Commands.RequestCommissioningApproval(
requestId=good_request_id, vendorId=th_fsa_server_vid, productId=th_fsa_server_pid, label="Test Ecosystem")
await self.send_single_cmd(cmd, endpoint=dut_commissioning_control_endpoint)

if not self.is_ci:
self.wait_for_use_input("Approve Commissioning approval request on DUT using manufacturer specified mechanism")

if not events:
new_event = await self.default_controller.ReadEvent(nodeid=self.dut_node_id, events=event_path)
else:
event_nums = [e.Header.EventNumber for e in events]
new_event = await self.default_controller.ReadEvent(nodeid=self.dut_node_id, events=event_path, eventNumberFilter=max(event_nums)+1)

asserts.assert_equal(len(new_event), 1, "Unexpected event list len")
asserts.assert_equal(new_event[0].Data.statusCode, 0, "Unexpected status code")
asserts.assert_equal(new_event[0].Data.clientNodeId,
self.matter_test_config.controller_node_id, "Unexpected client node id")
asserts.assert_equal(new_event[0].Data.requestId, good_request_id, "Unexpected request ID")

self.step("3b")
cmd = Clusters.CommissionerControl.Commands.CommissionNode(requestId=good_request_id, responseTimeoutSeconds=30)
resp = await self.send_single_cmd(cmd, endpoint=dut_commissioning_control_endpoint)
asserts.assert_equal(type(resp), Clusters.CommissionerControl.Commands.ReverseOpenCommissioningWindow,
"Incorrect response type")

# min commissioning timeout is 3*60 seconds, so use that even though the command said 30.
cmd = Clusters.AdministratorCommissioning.Commands.OpenCommissioningWindow(commissioningTimeout=3*60,
PAKEPasscodeVerifier=resp.PAKEPasscodeVerifier,
discriminator=resp.discriminator,
iterations=resp.iterations, salt=resp.salt)
await self.send_single_cmd(cmd, dev_ctrl=self.TH_server_controller, node_id=self.server_nodeid, endpoint=0, timedRequestTimeoutMs=5000)

self.step("3c")
if not self.is_ci:
time.sleep(30)

th_fsa_server_fabrics_new = await self.read_single_attribute_check_success(cluster=Clusters.OperationalCredentials, attribute=Clusters.OperationalCredentials.Attributes.Fabrics, dev_ctrl=self.TH_server_controller, node_id=self.server_nodeid, endpoint=0)
# TODO: this should be mocked too.
if not self.is_ci:
asserts.assert_equal(len(th_fsa_server_fabrics) + 1, len(th_fsa_server_fabrics_new),
"Unexpected number of fabrics on TH_SERVER")


if __name__ == "__main__":
default_matter_test_main()
162 changes: 162 additions & 0 deletions src/python_testing/test_testing/test_TC_MCORE_FS_1_1.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
#!/usr/bin/env -S python3 -B
#
# Copyright (c) 2024 Project CHIP Authors
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import base64
import os
import pathlib
import sys
import typing

import chip.clusters as Clusters
import click
from chip import ChipDeviceCtrl
from chip.clusters import Attribute
from chip.interaction_model import InteractionModelError, Status
from MockTestRunner import AsyncMock, MockTestRunner

try:
from matter_testing_support import MatterTestConfig, get_default_paa_trust_store, run_tests_no_exit
except ImportError:
sys.path.append(os.path.abspath(
os.path.join(os.path.dirname(__file__), '..')))
from matter_testing_support import MatterTestConfig, get_default_paa_trust_store, run_tests_no_exit

invoke_call_count = 0
event_call_count = 0


def dynamic_invoke_return(*args, **argv):
''' Returns the response to a mocked SendCommand call.
'''
global invoke_call_count
invoke_call_count += 1

# passcode 20202024
reverse_open = Clusters.CommissionerControl.Commands.ReverseOpenCommissioningWindow(commissioningTimeout=30,
PAKEPasscodeVerifier=b"+w1qZQR05Zn0bc2LDyNaDAhsrhDS5iRHPTN10+EmNx8E2OpIPC4SjWRDQVOgqcbnXdYMlpiZ168xLBqn1fx9659gGK/7f9Yc6GxpoJH8kwAUYAYyLGsYeEBt1kL6kpXjgA==",
discriminator=2222, iterations=10000, salt=base64.b64encode(bytes('SaltyMcSalterson', 'utf-8')))

print(f'invoke call {invoke_call_count}')
if invoke_call_count == 1: # Commission node with no prior request, return failure - step 5
return None
elif invoke_call_count == 2: # Commission node over pase - return unsupported access - step 7
return reverse_open
else:
raise InteractionModelError(Status.Failure)


def dynamic_event_return(*args, **argv):
''' Returns the response to a mocked ReadEvent call.
'''
global event_call_count
event_call_count += 1

if event_call_count == 1: # reading events, start empty - no events
return []
elif event_call_count == 2: # read event with filter - expect empty
header = Attribute.EventHeader(EndpointId=0, ClusterId=Clusters.CommissionerControl.id,
EventId=Clusters.CommissionerControl.Events.CommissioningRequestResult.event_id, EventNumber=1)
data = Clusters.CommissionerControl.Events.CommissioningRequestResult(
requestId=0x1234567812345678, clientNodeId=112233, statusCode=0)
result = Attribute.EventReadResult(Header=header, Status=Status.Success, Data=data)
return [result]
else:
raise InteractionModelError(Status.Failure)


def wildcard() -> Attribute.AsyncReadTransaction.ReadResponse:
''' Returns the response to a wildcard read.
For this test, we just need descriptors and a few attributes
Tree
EP1 (Aggregator): Descriptor
- EP2 (Bridged Node): Descriptor, Bridged Device Basic Information, Ecosystem Information
'''
cc = Clusters.CommissionerControl
ei = Clusters.EcosystemInformation
desc = Clusters.Descriptor
bdbi = Clusters.BridgedDeviceBasicInformation

# EP1 is aggregator device type with a commissioner control cluster
# children - EP2 type bridged node endpoint, ecosystem information, bridged device basic information. Should also have and admin commissioning, but I don't need it for this test.
desc_ep1 = {desc.Attributes.PartsList: [2], desc.Attributes.ServerList: [
cc.id], desc.Attributes.DeviceTypeList: [desc.Structs.DeviceTypeStruct(deviceType=0x000E, revision=2)]}
desc_ep2 = {desc.Attributes.ServerList: [bdbi.id, ei.id], desc.Attributes.DeviceTypeList: [
desc.Structs.DeviceTypeStruct(deviceType=0x0013, revision=3)]}

# I'm not filling anything in here, because I don't care. I just care that the cluster exists.
ei_attrs = {ei.Attributes.AttributeList: [ei.Attributes.DeviceDirectory.attribute_id,
ei.Attributes.LocationDirectory.attribute_id], ei.Attributes.DeviceDirectory: [], ei.Attributes.LocationDirectory: []}

# This cluster just needs to exist, so I'm just going to throw on the mandatory items for now.
bdbi_attrs = {bdbi.Attributes.AttributeList: [bdbi.Attributes.Reachable.attribute_id,
bdbi.Attributes.UniqueID.attribute_id], bdbi.Attributes.Reachable: True, bdbi.Attributes.UniqueID: 'something'}

cc_attrs = {cc.Attributes.AttributeList: [cc.Attributes.SupportedDeviceCategories], cc.Attributes.AcceptedCommandList: [cc.Commands.RequestCommissioningApproval, cc.Commands.CommissionNode],
cc.Attributes.GeneratedCommandList: [cc.Commands.RequestCommissioningApproval], cc.Attributes.SupportedDeviceCategories: 1}

resp = Attribute.AsyncReadTransaction.ReadResponse({}, [], {})
resp.attributes = {1: {desc: desc_ep1, cc: cc_attrs}, 2: {desc: desc_ep2, ei: ei_attrs, bdbi: bdbi_attrs}}
return resp


class MyMock(MockTestRunner):
def run_test_with_mock(self, dynamic_invoke_return: typing.Callable, dynamic_event_return: typing.Callable, read_cache: Attribute.AsyncReadTransaction.ReadResponse, hooks=None):
''' Run the test using the Mocked versions of Read, SendCommand, OpenCommissioningWindow, FindOrEstablishPASESession and ReadEvent
dynamic_invoke_return: Callable function that returns the result of a SendCommand call
Function should return one of
- command response for commands with responses
- None for commands with success results
- raise InteractionModelError for error results
dynamic_event_return: Callable function that returns the result of a ReadEvent call
Function should return one of
- list of EventReadResult for successful reads
- raise InteractionModelError for error results
read_cache : Response to a Read call. For this test, this will be the wildcard read of all teh attributes
hooks : Test harness hook object if desired.
'''
self.default_controller.Read = AsyncMock(return_value=read_cache)
self.default_controller.SendCommand = AsyncMock(return_value=None, side_effect=dynamic_invoke_return)
# It doesn't actually matter what we return here because I'm going to catch the next pase session connection anyway
params = ChipDeviceCtrl.CommissioningParameters(setupPinCode=0, setupManualCode='', setupQRCode='')
self.default_controller.OpenCommissioningWindow = AsyncMock(return_value=params)
self.default_controller.FindOrEstablishPASESession = AsyncMock(return_value=None)
self.default_controller.ReadEvent = AsyncMock(return_value=[], side_effect=dynamic_event_return)

return run_tests_no_exit(self.test_class, self.config, hooks, self.default_controller, self.stack)


@click.command()
@click.argument('th_server_app', type=click.Path(exists=True))
def main(th_server_app: str):
root = os.path.abspath(os.path.join(pathlib.Path(__file__).resolve().parent, '..', '..', '..'))
print(f'root = {root}')
paa_path = get_default_paa_trust_store(root)
print(f'paa = {paa_path}')

pics = {"PICS_SDK_CI_ONLY": True}
test_runner = MyMock('TC_MCORE_FS_1_1', 'TC_MCORE_FS_1_1', 'test_TC_MCORE_FS_1_1', 1, paa_trust_store_path=paa_path, pics=pics)
config = MatterTestConfig()
config.user_params = {'th_server_app_path': th_server_app}
test_runner.set_test_config(config)

test_runner.run_test_with_mock(dynamic_invoke_return, dynamic_event_return, wildcard())
test_runner.Shutdown()


if __name__ == "__main__":
sys.exit(main())

0 comments on commit 09c2dc0

Please sign in to comment.