Skip to content

Commit

Permalink
Update TC_MCORE_FS_1_3 to align with the Test Spec (#35270)
Browse files Browse the repository at this point in the history
* Update TC_MCORE_FS_1_3 to align with the test spec

* Instead of th_server_app_path use th_server_no_uid_app_path
  • Loading branch information
yufengwangca authored and pull[bot] committed Dec 4, 2024
1 parent ad23581 commit 1062808
Showing 1 changed file with 19 additions and 41 deletions.
60 changes: 19 additions & 41 deletions src/python_testing/TC_MCORE_FS_1_3.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# limitations under the License.
#

# This test requires a TH_SERVER application that returns UnsupportedAttribute when reading UniqueID from BasicInformation Cluster. Please specify with --string-arg th_server_app_path:<path_to_app>
# This test requires a TH_SERVER application that returns UnsupportedAttribute when reading UniqueID from BasicInformation Cluster. Please specify with --string-arg th_server_no_uid_app_path:<path_to_app>

import logging
import os
Expand All @@ -36,14 +36,10 @@ class TC_MCORE_FS_1_3(MatterBaseTest):
@async_test_body
async def setup_class(self):
super().setup_class()
self.device_for_th_eco_nodeid = 1111
self.device_for_th_eco_kvs = None
self.device_for_th_eco_port = 5543
self.app_process_for_th_eco = None

self.device_for_dut_eco_nodeid = 1112
self.device_for_dut_eco_kvs = None
self.device_for_dut_eco_port = 5544

self.th_server_nodeid = 1111
self.th_server_kvs = None
self.th_server_port = 5543
self.app_process_for_dut_eco = None

# Create a second controller on a new fabric to communicate to the server
Expand All @@ -57,20 +53,14 @@ def teardown_class(self):
logging.warning("Stopping app with SIGTERM")
self.app_process_for_dut_eco.send_signal(signal.SIGTERM.value)
self.app_process_for_dut_eco.wait()
if self.app_process_for_th_eco is not None:
logging.warning("Stopping app with SIGTERM")
self.app_process_for_th_eco.send_signal(signal.SIGTERM.value)
self.app_process_for_th_eco.wait()

os.remove(self.device_for_dut_eco_kvs)
if self.device_for_th_eco_kvs is not None:
os.remove(self.device_for_th_eco_kvs)
os.remove(self.th_server_kvs)
super().teardown_class()

async def create_device_and_commission_to_th_fabric(self, kvs, port, node_id_for_th, device_info):
app = self.user_params.get("th_server_app_path", None)
app = self.user_params.get("th_server_no_uid_app_path", None)
if not app:
asserts.fail('This test requires a TH_SERVER app. Specify app path with --string-arg th_server_app_path:<path_to_app>')
asserts.fail('This test requires a TH_SERVER app. Specify app path with --string-arg th_server_no_uid_app_path:<path_to_app>')

if not os.path.exists(app):
asserts.fail(f'The path {app} does not exist')
Expand All @@ -95,33 +85,33 @@ async def create_device_and_commission_to_th_fabric(self, kvs, port, node_id_for
logging.info("Commissioning device for DUT ecosystem onto TH for managing")

def steps_TC_MCORE_FS_1_3(self) -> list[TestStep]:
steps = [TestStep(1, "DUT_FSA commissions TH_SED_DUT to DUT_FSAs fabric and generates a UniqueID", is_commissioning=True),
TestStep(2, "TH_FSA commissions TH_SED_TH onto TH_FSAs fabric and generates a UniqueID."),
TestStep(3, "Follow manufacturer provided instructions to enable DUT_FSA to synchronize TH_SED_TH onto DUT_FSAs fabric."),
TestStep(4, "DUT_FSA synchronizes TH_SED_TH onto DUT_FSAs fabric and copies the UniqueID presented by TH_FSAs Bridged Device Basic Information Cluster.")]
steps = [TestStep(1, "TH commissions TH_SERVER to TH’s fabric.", is_commissioning=True),
TestStep(2, "DUT_FSA commissions TH_SERVER to DUT_FSA’s fabric and generates a UniqueID.")]
return steps

@async_test_body
async def test_TC_MCORE_FS_1_3(self):
self.is_ci = self.check_pics('PICS_SDK_CI_ONLY')
self.print_step(0, "Commissioning DUT to TH, already done")

self.step(1)
# These steps are not explicitly in step 1, but they help identify the dynamically added endpoint in step 1.
root_node_endpoint = 0
root_part_list = await self.read_single_attribute_check_success(cluster=Clusters.Descriptor, attribute=Clusters.Descriptor.Attributes.PartsList, endpoint=root_node_endpoint)
set_of_endpoints_before_adding_device = set(root_part_list)
logging.info(f"Set of endpoints before adding the device: {set_of_endpoints_before_adding_device}")

kvs = f'kvs_{str(uuid.uuid4())}'
device_info = "for DUT ecosystem"
await self.create_device_and_commission_to_th_fabric(kvs, self.device_for_dut_eco_port, self.device_for_dut_eco_nodeid, device_info)
device_info = "for TH ecosystem"
await self.create_device_and_commission_to_th_fabric(kvs, self.th_server_port, self.th_server_nodeid, device_info)

self.device_for_dut_eco_kvs = kvs
read_result = await self.TH_server_controller.ReadAttribute(self.device_for_dut_eco_nodeid, [(root_node_endpoint, Clusters.BasicInformation.Attributes.UniqueID)])
self.th_server_kvs = kvs
read_result = await self.TH_server_controller.ReadAttribute(self.th_server_nodeid, [(root_node_endpoint, Clusters.BasicInformation.Attributes.UniqueID)])
result = read_result[root_node_endpoint][Clusters.BasicInformation][Clusters.BasicInformation.Attributes.UniqueID]
asserts.assert_true(type_matches(result, Clusters.Attribute.ValueDecodeFailure), "We were expecting a value decode failure")
asserts.assert_equal(result.Reason.status, Status.UnsupportedAttribute, "Incorrect error returned from reading UniqueID")

params = await self.openCommissioningWindow(dev_ctrl=self.TH_server_controller, node_id=self.device_for_dut_eco_nodeid)
self.step(2)
params = await self.openCommissioningWindow(dev_ctrl=self.TH_server_controller, node_id=self.th_server_nodeid)

self.wait_for_user_input(
prompt_msg=f"Using the DUT vendor's provided interface, commission the device using the following parameters:\n"
Expand All @@ -134,6 +124,7 @@ async def test_TC_MCORE_FS_1_3(self):

root_part_list = await self.read_single_attribute_check_success(cluster=Clusters.Descriptor, attribute=Clusters.Descriptor.Attributes.PartsList, endpoint=root_node_endpoint)
set_of_endpoints_after_adding_device = set(root_part_list)
logging.info(f"Set of endpoints after adding the device: {set_of_endpoints_after_adding_device}")

asserts.assert_true(set_of_endpoints_after_adding_device.issuperset(
set_of_endpoints_before_adding_device), "Expected only new endpoints to be added")
Expand All @@ -145,19 +136,6 @@ async def test_TC_MCORE_FS_1_3(self):
asserts.assert_true(type_matches(th_sed_dut_unique_id, str), "UniqueID should be a string")
asserts.assert_true(th_sed_dut_unique_id, "UniqueID should not be an empty string")

self.step(2)
kvs = f'kvs_{str(uuid.uuid4())}'
device_info = "for TH_FSA ecosystem"
await self.create_device_and_commission_to_th_fabric(kvs, self.device_for_th_eco_port, self.device_for_th_eco_nodeid, device_info)
self.device_for_th_eco_kvs = kvs
# TODO(https://github.com/CHIP-Specifications/chip-test-plans/issues/4375) During setup we need to create the TH_FSA device
# where we would commission device created in create_device_and_commission_to_th_fabric to be commissioned into TH_FSA.

# TODO(https://github.com/CHIP-Specifications/chip-test-plans/issues/4375) Because we cannot create a TH_FSA and there is
# no way to mock it the following 2 test steps are skipped for now.
self.skip_step(3)
self.skip_step(4)


if __name__ == "__main__":
default_matter_test_main()

0 comments on commit 1062808

Please sign in to comment.