Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update TC_MCORE_FS_1_3 to align with the Test Spec #35270

Merged
merged 2 commits into from
Aug 29, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 19 additions & 41 deletions src/python_testing/TC_MCORE_FS_1_3.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# limitations under the License.
#

# This test requires a TH_SERVER application that returns UnsupportedAttribute when reading UniqueID from BasicInformation Cluster. Please specify with --string-arg th_server_app_path:<path_to_app>
# This test requires a TH_SERVER application that returns UnsupportedAttribute when reading UniqueID from BasicInformation Cluster. Please specify with --string-arg th_server_no_uid_app_path:<path_to_app>

import logging
import os
Expand All @@ -36,14 +36,10 @@ class TC_MCORE_FS_1_3(MatterBaseTest):
@async_test_body
yufengwangca marked this conversation as resolved.
Show resolved Hide resolved
async def setup_class(self):
super().setup_class()
self.device_for_th_eco_nodeid = 1111
self.device_for_th_eco_kvs = None
self.device_for_th_eco_port = 5543
self.app_process_for_th_eco = None

self.device_for_dut_eco_nodeid = 1112
self.device_for_dut_eco_kvs = None
self.device_for_dut_eco_port = 5544

self.th_server_nodeid = 1111
self.th_server_kvs = None
self.th_server_port = 5543
self.app_process_for_dut_eco = None

# Create a second controller on a new fabric to communicate to the server
Expand All @@ -57,20 +53,14 @@ def teardown_class(self):
logging.warning("Stopping app with SIGTERM")
self.app_process_for_dut_eco.send_signal(signal.SIGTERM.value)
self.app_process_for_dut_eco.wait()
if self.app_process_for_th_eco is not None:
logging.warning("Stopping app with SIGTERM")
self.app_process_for_th_eco.send_signal(signal.SIGTERM.value)
self.app_process_for_th_eco.wait()

os.remove(self.device_for_dut_eco_kvs)
if self.device_for_th_eco_kvs is not None:
os.remove(self.device_for_th_eco_kvs)
os.remove(self.th_server_kvs)
super().teardown_class()

async def create_device_and_commission_to_th_fabric(self, kvs, port, node_id_for_th, device_info):
yufengwangca marked this conversation as resolved.
Show resolved Hide resolved
app = self.user_params.get("th_server_app_path", None)
app = self.user_params.get("th_server_no_uid_app_path", None)
if not app:
asserts.fail('This test requires a TH_SERVER app. Specify app path with --string-arg th_server_app_path:<path_to_app>')
asserts.fail('This test requires a TH_SERVER app. Specify app path with --string-arg th_server_no_uid_app_path:<path_to_app>')

if not os.path.exists(app):
asserts.fail(f'The path {app} does not exist')
Expand All @@ -95,33 +85,33 @@ async def create_device_and_commission_to_th_fabric(self, kvs, port, node_id_for
logging.info("Commissioning device for DUT ecosystem onto TH for managing")

def steps_TC_MCORE_FS_1_3(self) -> list[TestStep]:
steps = [TestStep(1, "DUT_FSA commissions TH_SED_DUT to DUT_FSAs fabric and generates a UniqueID", is_commissioning=True),
TestStep(2, "TH_FSA commissions TH_SED_TH onto TH_FSAs fabric and generates a UniqueID."),
TestStep(3, "Follow manufacturer provided instructions to enable DUT_FSA to synchronize TH_SED_TH onto DUT_FSAs fabric."),
TestStep(4, "DUT_FSA synchronizes TH_SED_TH onto DUT_FSAs fabric and copies the UniqueID presented by TH_FSAs Bridged Device Basic Information Cluster.")]
steps = [TestStep(1, "TH commissions TH_SERVER to TH’s fabric.", is_commissioning=True),
TestStep(2, "DUT_FSA commissions TH_SERVER to DUT_FSA’s fabric and generates a UniqueID.")]
return steps

@async_test_body
async def test_TC_MCORE_FS_1_3(self):
self.is_ci = self.check_pics('PICS_SDK_CI_ONLY')
self.print_step(0, "Commissioning DUT to TH, already done")

self.step(1)
# These steps are not explicitly in step 1, but they help identify the dynamically added endpoint in step 1.
root_node_endpoint = 0
root_part_list = await self.read_single_attribute_check_success(cluster=Clusters.Descriptor, attribute=Clusters.Descriptor.Attributes.PartsList, endpoint=root_node_endpoint)
set_of_endpoints_before_adding_device = set(root_part_list)
logging.info(f"Set of endpoints before adding the device: {set_of_endpoints_before_adding_device}")

kvs = f'kvs_{str(uuid.uuid4())}'
device_info = "for DUT ecosystem"
await self.create_device_and_commission_to_th_fabric(kvs, self.device_for_dut_eco_port, self.device_for_dut_eco_nodeid, device_info)
device_info = "for TH ecosystem"
await self.create_device_and_commission_to_th_fabric(kvs, self.th_server_port, self.th_server_nodeid, device_info)

self.device_for_dut_eco_kvs = kvs
read_result = await self.TH_server_controller.ReadAttribute(self.device_for_dut_eco_nodeid, [(root_node_endpoint, Clusters.BasicInformation.Attributes.UniqueID)])
self.th_server_kvs = kvs
read_result = await self.TH_server_controller.ReadAttribute(self.th_server_nodeid, [(root_node_endpoint, Clusters.BasicInformation.Attributes.UniqueID)])
result = read_result[root_node_endpoint][Clusters.BasicInformation][Clusters.BasicInformation.Attributes.UniqueID]
asserts.assert_true(type_matches(result, Clusters.Attribute.ValueDecodeFailure), "We were expecting a value decode failure")
asserts.assert_equal(result.Reason.status, Status.UnsupportedAttribute, "Incorrect error returned from reading UniqueID")

params = await self.openCommissioningWindow(dev_ctrl=self.TH_server_controller, node_id=self.device_for_dut_eco_nodeid)
self.step(2)
params = await self.openCommissioningWindow(dev_ctrl=self.TH_server_controller, node_id=self.th_server_nodeid)

self.wait_for_user_input(
prompt_msg=f"Using the DUT vendor's provided interface, commission the device using the following parameters:\n"
Expand All @@ -134,6 +124,7 @@ async def test_TC_MCORE_FS_1_3(self):

root_part_list = await self.read_single_attribute_check_success(cluster=Clusters.Descriptor, attribute=Clusters.Descriptor.Attributes.PartsList, endpoint=root_node_endpoint)
set_of_endpoints_after_adding_device = set(root_part_list)
logging.info(f"Set of endpoints after adding the device: {set_of_endpoints_after_adding_device}")

asserts.assert_true(set_of_endpoints_after_adding_device.issuperset(
set_of_endpoints_before_adding_device), "Expected only new endpoints to be added")
Expand All @@ -145,19 +136,6 @@ async def test_TC_MCORE_FS_1_3(self):
asserts.assert_true(type_matches(th_sed_dut_unique_id, str), "UniqueID should be a string")
asserts.assert_true(th_sed_dut_unique_id, "UniqueID should not be an empty string")

self.step(2)
kvs = f'kvs_{str(uuid.uuid4())}'
device_info = "for TH_FSA ecosystem"
await self.create_device_and_commission_to_th_fabric(kvs, self.device_for_th_eco_port, self.device_for_th_eco_nodeid, device_info)
self.device_for_th_eco_kvs = kvs
# TODO(https://github.com/CHIP-Specifications/chip-test-plans/issues/4375) During setup we need to create the TH_FSA device
# where we would commission device created in create_device_and_commission_to_th_fabric to be commissioned into TH_FSA.

# TODO(https://github.com/CHIP-Specifications/chip-test-plans/issues/4375) Because we cannot create a TH_FSA and there is
# no way to mock it the following 2 test steps are skipped for now.
self.skip_step(3)
self.skip_step(4)


if __name__ == "__main__":
default_matter_test_main()
Loading