From f106cea7eaede220b7002798689481c4505ee046 Mon Sep 17 00:00:00 2001 From: Jake Ororke Date: Wed, 10 Jul 2024 13:56:26 -0700 Subject: [PATCH] Possible fix for matter-test-scripts issue #227: - Removed PICS checks and replaced with attribute and command checks from endpoint during tests. --- src/python_testing/TC_OpstateCommon.py | 250 ++++++++++++++++--------- 1 file changed, 164 insertions(+), 86 deletions(-) diff --git a/src/python_testing/TC_OpstateCommon.py b/src/python_testing/TC_OpstateCommon.py index 648eea59f2bb3c..29f1f76ceb18d0 100644 --- a/src/python_testing/TC_OpstateCommon.py +++ b/src/python_testing/TC_OpstateCommon.py @@ -213,9 +213,29 @@ def STEPS_TC_OPSTATE_BASE_1_1(self) -> list[TestStep]: async def TEST_TC_OPSTATE_BASE_1_1(self, endpoint=1, cluster_revision=1, feature_map=0): cluster = self.test_info.cluster + + # Gathering Available Attributes and associated ids attributes = cluster.Attributes - events = cluster.Events + OPSTATE_attr_list = attributes.AttributeList + attribute_list = await self.read_single_attribute_check_success(endpoint=endpoint, cluster=cluster, attribute=OPSTATE_attr_list) + countdown_time_attr_id = attributes.CountdownTime.attribute_id + oprtnlstate_attr_id = attributes.OperationalState.attribute_id + + # Gathering Available Commands and associated ids commands = cluster.Commands + OPSTATE_accptcmd_list = attributes.AcceptedCommandList + OPSTATE_gencmd_list = attributes.GeneratedCommandList + accepted_cmd_list = await self.read_single_attribute_check_success(endpoint=endpoint, cluster=cluster, attribute=OPSTATE_accptcmd_list) + generated_cmd_list = await self.read_single_attribute_check_success(endpoint=endpoint, cluster=cluster, attribute=OPSTATE_gencmd_list) + pause_cmd_id = commands.Pause.command_id + start_cmd_id = commands.Start.command_id + stop_cmd_id = commands.Stop.command_id + resume_cmd_id = commands.Resume.command_id + ocr_cmd_id = commands.OperationalCommandResponse.command_id + + # Gathers Event id for OperationCompletion event, if available then returns true + events = cluster.Events + Oprtcmplt_event = bool(events.OperationCompletion.event_id) self.init_test() @@ -249,7 +269,7 @@ async def TEST_TC_OPSTATE_BASE_1_1(self, endpoint=1, cluster_revision=1, feature attributes.ClusterRevision.attribute_id ] - if self.check_pics(f"{self.test_info.pics_code}.S.A0002"): + if countdown_time_attr_id in attribute_list: expected_value.append(attributes.CountdownTime.attribute_id) await self.read_and_expect_array_contains(endpoint=endpoint, @@ -263,7 +283,7 @@ async def TEST_TC_OPSTATE_BASE_1_1(self, endpoint=1, cluster_revision=1, feature events.OperationalError.event_id, ] - if self.check_pics(f"{self.test_info.pics_code}.S.E01"): + if Oprtcmplt_event: expected_value.append(events.OperationCompletion.event_id) await self.read_and_expect_array_contains(endpoint=endpoint, @@ -274,19 +294,19 @@ async def TEST_TC_OPSTATE_BASE_1_1(self, endpoint=1, cluster_revision=1, feature self.step(6) expected_value = [] - if (self.check_pics(f"{self.test_info.pics_code}.S.C00.Rsp") or - self.check_pics(f"{self.test_info.pics_code}.S.C03.Rsp")): + if ((pause_cmd_id in accepted_cmd_list) or + (resume_cmd_id in accepted_cmd_list)): expected_value.append(commands.Pause.command_id) - if (self.check_pics(f"{self.test_info.pics_code}.S.C01.Rsp") or - self.check_pics(f"{self.test_info.pics_code}.S.C02.Rsp")): + if ((stop_cmd_id in accepted_cmd_list) or + (start_cmd_id in accepted_cmd_list)): expected_value.append(commands.Stop.command_id) - if self.check_pics(f"{self.test_info.pics_code}.S.C02.Rsp"): + if start_cmd_id in accepted_cmd_list: expected_value.append(commands.Start.command_id) - if (self.check_pics(f"{self.test_info.pics_code}.S.C03.Rsp") or - self.check_pics(f"{self.test_info.pics_code}.S.C00.Rsp")): + if ((resume_cmd_id in accepted_cmd_list) or + (pause_cmd_id in accepted_cmd_list)): expected_value.append(commands.Resume.command_id) await self.read_and_expect_array_contains(endpoint=endpoint, @@ -297,10 +317,10 @@ async def TEST_TC_OPSTATE_BASE_1_1(self, endpoint=1, cluster_revision=1, feature self.step(7) expected_value = [] - if (self.check_pics(f"{self.test_info.pics_code}.S.C00.Rsp") or - self.check_pics(f"{self.test_info.pics_code}.S.C01.Rsp") or - self.check_pics(f"{self.test_info.pics_code}.S.C02.Rsp") or - self.check_pics(f"{self.test_info.pics_code}.S.C03.Rsp")): + if ((pause_cmd_id in accepted_cmd_list) or + (stop_cmd_id in accepted_cmd_list) or + (start_cmd_id in accepted_cmd_list) or + (resume_cmd_id in accepted_cmd_list)): expected_value.append(commands.OperationalCommandResponse.command_id) await self.read_and_expect_array_contains(endpoint=endpoint, @@ -337,9 +357,19 @@ def STEPS_TC_OPSTATE_BASE_2_1(self) -> list[TestStep]: ] return steps - async def TEST_TC_OPSTATE_BASE_2_1(self, endpoint=1): + async def TEST_TC_OPSTATE_BASE_2_1(self, endpoint=1): cluster = self.test_info.cluster + + # Gathering Available Attributes and associated ids attributes = cluster.Attributes + OPSTATE_attr_list = attributes.AttributeList + attribute_list = await self.read_single_attribute_check_success(endpoint=endpoint, cluster=cluster, attribute=OPSTATE_attr_list) + phaselist_attr_id = attributes.PhaseList.attribute_id + currentphase_attr_id = attributes.CurrentPhase.attribute_id + countdown_time_attr_id = attributes.CountdownTime.attribute_id + oprtnlstate_list_attr_id = attributes.OperationalStateList.attribute_id + oprtnlstate_attr_id = attributes.OperationalState.attribute_id + oprtnlerror_attr_id = attributes.OperationalError.attribute_id self.init_test() @@ -348,7 +378,7 @@ async def TEST_TC_OPSTATE_BASE_2_1(self, endpoint=1): # STEP 2: TH reads from the DUT the PhaseList attribute self.step(2) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.A0000")): + if phaselist_attr_id in attribute_list: phase_list = await self.read_expect_success(endpoint=endpoint, attribute=attributes.PhaseList) if phase_list is not NullValue: @@ -358,7 +388,7 @@ async def TEST_TC_OPSTATE_BASE_2_1(self, endpoint=1): # STEP 3: TH reads from the DUT the CurrentPhase attribute self.step(3) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.A0001")): + if currentphase_attr_id in attribute_list: current_phase = await self.read_expect_success(endpoint=endpoint, attribute=attributes.CurrentPhase) if (phase_list == NullValue) or (not phase_list): @@ -369,7 +399,7 @@ async def TEST_TC_OPSTATE_BASE_2_1(self, endpoint=1): # STEP 4: TH reads from the DUT the CountdownTime attribute self.step(4) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.A0002")): + if countdown_time_attr_id in attribute_list: countdown_time = await self.read_expect_success(endpoint=endpoint, attribute=attributes.CountdownTime) if countdown_time is not NullValue: @@ -378,7 +408,7 @@ async def TEST_TC_OPSTATE_BASE_2_1(self, endpoint=1): # STEP 5: TH reads from the DUT the OperationalStateList attribute self.step(5) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.A0003")): + if oprtnlstate_list_attr_id in attribute_list: operational_state_list = await self.read_expect_success(endpoint=endpoint, attribute=attributes.OperationalStateList) defined_states = [state.value for state in cluster.Enums.OperationalStateEnum @@ -399,7 +429,7 @@ async def TEST_TC_OPSTATE_BASE_2_1(self, endpoint=1): # STEP 6: TH reads from the DUT the OperationalState attribute self.step(6) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.A0004")): + if oprtnlstate_attr_id in attribute_list: operational_state = await self.read_expect_success(endpoint=endpoint, attribute=attributes.OperationalState) in_range = (0x80 <= operational_state <= 0xBF) @@ -465,7 +495,7 @@ async def TEST_TC_OPSTATE_BASE_2_1(self, endpoint=1): # STEP 7: TH reads from the DUT the OperationalError attribute self.step(7) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.A0005")): + if oprtnlerror_attr_id in attribute_list: operational_error = await self.read_expect_success(endpoint=endpoint, attribute=attributes.OperationalError) # Defined Errors @@ -568,8 +598,29 @@ def STEPS_TC_OPSTATE_BASE_2_2(self) -> list[TestStep]: async def TEST_TC_OPSTATE_BASE_2_2(self, endpoint=1): cluster = self.test_info.cluster + + # Gathering Available Attributes and associated ids attributes = cluster.Attributes + OPSTATE_attr_list = attributes.AttributeList + attribute_list = await self.read_single_attribute_check_success(endpoint=endpoint, cluster=cluster, attribute=OPSTATE_attr_list) + phaselist_attr_id = attributes.PhaseList.attribute_id + currentphase_attr_id = attributes.CurrentPhase.attribute_id + countdown_time_attr_id = attributes.CountdownTime.attribute_id + oprtnlstate_list_attr_id = attributes.OperationalStateList.attribute_id + oprtnlstate_attr_id = attributes.OperationalState.attribute_id + oprtnlerror_attr_id = attributes.OperationalError.attribute_id + + # Gathering Available Commands and associated ids commands = cluster.Commands + OPSTATE_accptcmd_list = attributes.AcceptedCommandList + OPSTATE_gencmd_list = attributes.GeneratedCommandList + accepted_cmd_list = await self.read_single_attribute_check_success(endpoint=endpoint, cluster=cluster, attribute=OPSTATE_accptcmd_list) + generated_cmd_list = await self.read_single_attribute_check_success(endpoint=endpoint, cluster=cluster, attribute=OPSTATE_gencmd_list) + pause_cmd_id = commands.Pause.command_id # 0 + start_cmd_id = commands.Start.command_id # 2 + stop_cmd_id = commands.Stop.command_id # 1 + resume_cmd_id = commands.Resume.command_id # 3 + ocr_cmd_id = commands.OperationalCommandResponse.command_id # 4 self.init_test() @@ -598,7 +649,7 @@ async def TEST_TC_OPSTATE_BASE_2_2(self, endpoint=1): # STEP 3: TH reads from the DUT the OperationalStateList attribute self.step(3) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.A0003")): + if oprtnlstate_list_attr_id in attribute_list: operational_state_list = await self.read_expect_success(endpoint=endpoint, attribute=attributes.OperationalStateList) @@ -613,22 +664,21 @@ async def TEST_TC_OPSTATE_BASE_2_2(self, endpoint=1): # STEP 4: TH sends Start command to the DUT self.step(4) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.C02.Rsp") and - self.check_pics(f"{self.test_info.pics_code}.S.C04.Tx")): + if ((start_cmd_id in accepted_cmd_list) and (ocr_cmd_id in generated_cmd_list)): await self.send_cmd_expect_response(endpoint=endpoint, cmd=commands.Start(), expected_response=cluster.Enums.ErrorStateEnum.kNoError) # STEP 5: TH reads from the DUT the OperationalState attribute self.step(5) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.A0004")): + if oprtnlstate_attr_id in attribute_list: await self.read_and_expect_value(endpoint=endpoint, attribute=attributes.OperationalState, expected_value=cluster.Enums.OperationalStateEnum.kRunning) # STEP 6: TH reads from the DUT the OperationalError attribute self.step(6) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.A0005")): + if oprtnlerror_attr_id in attribute_list: await self.read_and_expect_property_value(endpoint=endpoint, attribute=attributes.OperationalError, attr_property="errorStateID", @@ -636,7 +686,7 @@ async def TEST_TC_OPSTATE_BASE_2_2(self, endpoint=1): # STEP 7: TH reads from the DUT the CountdownTime attribute self.step(7) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.A0002")): + if countdown_time_attr_id in attribute_list: initial_countdown_time = await self.read_expect_success(endpoint=endpoint, attribute=attributes.CountdownTime) if initial_countdown_time is not NullValue: @@ -645,7 +695,7 @@ async def TEST_TC_OPSTATE_BASE_2_2(self, endpoint=1): # STEP 8: TH reads from the DUT the PhaseList attribute self.step(8) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.A0000")): + if phaselist_attr_id in attribute_list: phase_list = await self.read_expect_success(endpoint=endpoint, attribute=attributes.PhaseList) phase_list_len = 0 @@ -656,7 +706,7 @@ async def TEST_TC_OPSTATE_BASE_2_2(self, endpoint=1): # STEP 9: TH reads from the DUT the CurrentPhase attribute self.step(9) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.A0001")): + if currentphase_attr_id in attribute_list: current_phase = await self.read_expect_success(endpoint=endpoint, attribute=attributes.CurrentPhase) if (phase_list == NullValue) or (not phase_list): @@ -669,12 +719,12 @@ async def TEST_TC_OPSTATE_BASE_2_2(self, endpoint=1): # STEP 10: TH waits for {PIXIT.WAITTIME.COUNTDOWN} self.step(10) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.A0002")): + if countdown_time_attr_id in attribute_list: time.sleep(wait_time) # STEP 11: TH reads from the DUT the CountdownTime attribute self.step(11) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.A0002")): + if countdown_time_attr_id in attribute_list: countdown_time = await self.read_expect_success(endpoint=endpoint, attribute=attributes.CountdownTime) @@ -684,31 +734,28 @@ async def TEST_TC_OPSTATE_BASE_2_2(self, endpoint=1): # STEP 12: TH sends Start command to the DUT self.step(12) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.C02.Rsp") and - self.check_pics(f"{self.test_info.pics_code}.S.C04.Tx")): + if ((start_cmd_id in accepted_cmd_list) and (ocr_cmd_id in generated_cmd_list)): await self.send_cmd_expect_response(endpoint=endpoint, cmd=commands.Start(), expected_response=cluster.Enums.ErrorStateEnum.kNoError) # STEP 13: TH sends Stop command to the DUT self.step(13) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.C01.Rsp") and - self.check_pics(f"{self.test_info.pics_code}.S.C04.Tx")): + if ((stop_cmd_id in accepted_cmd_list) and (ocr_cmd_id in generated_cmd_list)): await self.send_cmd_expect_response(endpoint=endpoint, cmd=commands.Stop(), expected_response=cluster.Enums.ErrorStateEnum.kNoError) # STEP 14: TH reads from the DUT the OperationalState attribute self.step(14) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.A0004")): + if oprtnlstate_attr_id in attribute_list: await self.read_and_expect_value(endpoint=endpoint, attribute=attributes.OperationalState, expected_value=cluster.Enums.OperationalStateEnum.kStopped) # STEP 15: TH sends Stop command to the DUT self.step(15) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.C01.Rsp") and - self.check_pics(f"{self.test_info.pics_code}.S.C04.Tx")): + if ((stop_cmd_id in accepted_cmd_list) and (ocr_cmd_id in generated_cmd_list)): await self.send_cmd_expect_response(endpoint=endpoint, cmd=commands.Stop(), expected_response=cluster.Enums.ErrorStateEnum.kNoError) @@ -723,9 +770,9 @@ async def TEST_TC_OPSTATE_BASE_2_2(self, endpoint=1): # STEP 17: TH sends Start command to the DUT self.step(17) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.M.ERR_UNABLE_TO_START_OR_RESUME") and - self.check_pics(f"{self.test_info.pics_code}.S.C02.Rsp") and - self.check_pics(f"{self.test_info.pics_code}.S.C04.Tx")): + if self.pics_guard((self.check_pics(f"{self.test_info.pics_code}.S.M.ERR_UNABLE_TO_START_OR_RESUME")) and + ((start_cmd_id in accepted_cmd_list) and + (ocr_cmd_id in generated_cmd_list))): await self.send_cmd_expect_response(endpoint=endpoint, cmd=commands.Start(), expected_response=cluster.Enums.ErrorStateEnum.kUnableToStartOrResume) @@ -757,8 +804,24 @@ def STEPS_TC_OPSTATE_BASE_2_3(self) -> list[TestStep]: async def TEST_TC_OPSTATE_BASE_2_3(self, endpoint=1): cluster = self.test_info.cluster + + # Gathering Available Attributes and associated ids attributes = cluster.Attributes + OPSTATE_attr_list = attributes.AttributeList + attribute_list = await self.read_single_attribute_check_success(endpoint=endpoint, cluster=cluster, attribute=OPSTATE_attr_list) + countdown_time_attr_id = attributes.CountdownTime.attribute_id + oprtnlstate_list_attr_id = attributes.OperationalStateList.attribute_id + oprtnlstate_attr_id = attributes.OperationalState.attribute_id + + # Gathering Available Commands and associated ids commands = cluster.Commands + OPSTATE_accptcmd_list = attributes.AcceptedCommandList + OPSTATE_gencmd_list = attributes.GeneratedCommandList + accepted_cmd_list = await self.read_single_attribute_check_success(endpoint=endpoint, cluster=cluster, attribute=OPSTATE_accptcmd_list) + generated_cmd_list = await self.read_single_attribute_check_success(endpoint=endpoint, cluster=cluster, attribute=OPSTATE_gencmd_list) + pause_cmd_id = commands.Pause.command_id + resume_cmd_id = commands.Resume.command_id # 3 + ocr_cmd_id = commands.OperationalCommandResponse.command_id # 4 self.init_test() @@ -787,7 +850,7 @@ async def TEST_TC_OPSTATE_BASE_2_3(self, endpoint=1): # STEP 3: TH reads from the DUT the OperationalStateList attribute self.step(3) - if self.pics_guard(self.check_pics((f"{self.test_info.pics_code}.S.A0003"))): + if oprtnlstate_attr_id in attribute_list: operational_state_list = await self.read_expect_success(endpoint=endpoint, attribute=attributes.OperationalStateList) @@ -802,22 +865,21 @@ async def TEST_TC_OPSTATE_BASE_2_3(self, endpoint=1): # STEP 4: TH sends Pause command to the DUT self.step(4) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.C00.Rsp") and - self.check_pics(f"{self.test_info.pics_code}.S.C04.Tx")): + if ((pause_cmd_id in accepted_cmd_list) and (ocr_cmd_id in generated_cmd_list)): await self.send_cmd_expect_response(endpoint=endpoint, cmd=commands.Pause(), expected_response=cluster.Enums.ErrorStateEnum.kNoError) # STEP 5: TH reads from the DUT the OperationalState attribute self.step(5) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.A0004")): + if oprtnlstate_attr_id in attribute_list: await self.read_and_expect_value(endpoint=endpoint, attribute=attributes.OperationalState, expected_value=cluster.Enums.OperationalStateEnum.kPaused) # STEP 6: TH reads from the DUT the CountdownTime attribute self.step(6) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.A0002")): + if countdown_time_attr_id in attribute_list: initial_countdown_time = await self.read_expect_success(endpoint=endpoint, attribute=attributes.CountdownTime) if initial_countdown_time is not NullValue: @@ -830,7 +892,7 @@ async def TEST_TC_OPSTATE_BASE_2_3(self, endpoint=1): # STEP 8: TH reads from the DUT the CountdownTime attribute self.step(8) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.A0002")): + if countdown_time_attr_id in attribute_list: countdown_time = await self.read_expect_success(endpoint=endpoint, attribute=attributes.CountdownTime) @@ -840,31 +902,28 @@ async def TEST_TC_OPSTATE_BASE_2_3(self, endpoint=1): # STEP 9: TH sends Pause command to the DUT self.step(9) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.C00.Rsp") and - self.check_pics(f"{self.test_info.pics_code}.S.C04.Tx")): + if ((pause_cmd_id in accepted_cmd_list) and (ocr_cmd_id in generated_cmd_list)): await self.send_cmd_expect_response(endpoint=endpoint, cmd=commands.Pause(), expected_response=cluster.Enums.ErrorStateEnum.kNoError) # STEP 10: TH sends Resume command to the DUT self.step(10) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.C03.Rsp") and - self.check_pics(f"{self.test_info.pics_code}.S.C04.Tx")): + if ((resume_cmd_id in accepted_cmd_list) and (ocr_cmd_id in generated_cmd_list)): await self.send_cmd_expect_response(endpoint=endpoint, cmd=commands.Resume(), expected_response=cluster.Enums.ErrorStateEnum.kNoError) # STEP 11: TH reads from the DUT the OperationalState attribute self.step(11) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.A0004")): + if oprtnlstate_attr_id in attribute_list: await self.read_and_expect_value(endpoint=endpoint, attribute=attributes.OperationalState, expected_value=cluster.Enums.OperationalStateEnum.kRunning) # STEP 12: TH sends Resume command to the DUT self.step(12) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.C03.Rsp") and - self.check_pics(f"{self.test_info.pics_code}.S.C04.Tx")): + if ((resume_cmd_id in accepted_cmd_list) and (ocr_cmd_id in generated_cmd_list)): await self.send_cmd_expect_response(endpoint=endpoint, cmd=commands.Resume(), expected_response=cluster.Enums.ErrorStateEnum.kNoError) @@ -878,16 +937,14 @@ async def TEST_TC_OPSTATE_BASE_2_3(self, endpoint=1): # STEP 14: TH sends Pause command to the DUT self.step(14) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.C00.Rsp") and - self.check_pics(f"{self.test_info.pics_code}.S.C04.Tx")): + if ((pause_cmd_id in attribute_list) and (ocr_cmd_id in attribute_list)): await self.send_cmd_expect_response(endpoint=endpoint, cmd=commands.Pause(), expected_response=cluster.Enums.ErrorStateEnum.kCommandInvalidInState) # STEP 15: TH sends Resume command to the DUT self.step(15) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.C03.Rsp") and - self.check_pics(f"{self.test_info.pics_code}.S.C04.Tx")): + if ((resume_cmd_id in accepted_cmd_list) and (ocr_cmd_id in attribute_list)): await self.send_cmd_expect_response(endpoint=endpoint, cmd=commands.Resume(), expected_response=cluster.Enums.ErrorStateEnum.kCommandInvalidInState) @@ -902,16 +959,14 @@ async def TEST_TC_OPSTATE_BASE_2_3(self, endpoint=1): # STEP 17: TH sends Pause command to the DUT self.step(17) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.C00.Rsp") and - self.check_pics(f"{self.test_info.pics_code}.S.C04.Tx")): + if ((pause_cmd_id in accepted_cmd_list) and (ocr_cmd_id in generated_cmd_list)): await self.send_cmd_expect_response(endpoint=endpoint, cmd=commands.Pause(), expected_response=cluster.Enums.ErrorStateEnum.kCommandInvalidInState) # STEP 18: TH sends Resume command to the DUT self.step(18) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.C03.Rsp") and - self.check_pics(f"{self.test_info.pics_code}.S.C04.Tx")): + if ((pause_cmd_id in accepted_cmd_list) and (ocr_cmd_id in generated_cmd_list)): await self.send_cmd_expect_response(endpoint=endpoint, cmd=commands.Resume(), expected_response=cluster.Enums.ErrorStateEnum.kCommandInvalidInState) @@ -929,6 +984,13 @@ def STEPS_TC_OPSTATE_BASE_2_4(self) -> list[TestStep]: async def TEST_TC_OPSTATE_BASE_2_4(self, endpoint=1): cluster = self.test_info.cluster + + # Gathering Available Attributes and associated ids + attributes = cluster.Attributes + OPSTATE_attr_list = attributes.AttributeList + attribute_list = await self.read_single_attribute_check_success(endpoint=endpoint, cluster=cluster, attribute=OPSTATE_attr_list) + oprtnlstate_attr_id = attributes.OperationalState.attribute_id + attributes = cluster.Attributes events = cluster.Events @@ -944,7 +1006,7 @@ async def TEST_TC_OPSTATE_BASE_2_4(self, endpoint=1): # STEP 1: Commission DUT to TH (can be skipped if done in a preceding test) self.step(1) - if self.pics_guard(error_event_gen): + if error_event_gen: # STEP 2: Set up a subscription to the OperationalError event self.step(2) # Subscribe to Events and when they are sent push them to a queue for checking later @@ -974,10 +1036,12 @@ async def TEST_TC_OPSTATE_BASE_2_4(self, endpoint=1): # STEP 4: TH reads from the DUT the OperationalState attribute self.step(4) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.A0004")): + + if oprtnlstate_attr_id in attribute_list: await self.read_and_expect_value(endpoint=endpoint, attribute=attributes.OperationalState, expected_value=cluster.Enums.OperationalStateEnum.kError) + else: self.skip_step(2) self.skip_step(3) @@ -1014,10 +1078,30 @@ def STEPS_TC_OPSTATE_BASE_2_5(self) -> list[TestStep]: async def TEST_TC_OPSTATE_BASE_2_5(self, endpoint=1): cluster = self.test_info.cluster + + # Gathering Available Attributes and associated ids attributes = cluster.Attributes + OPSTATE_attr_list = attributes.AttributeList + attribute_list = await self.read_single_attribute_check_success(endpoint=endpoint, cluster=cluster, attribute=OPSTATE_attr_list) + countdown_time_attr_id = attributes.CountdownTime.attribute_id + oprtnlstate_attr_id = attributes.OperationalState.attribute_id + + # Gathering Available Commands and associated ids commands = cluster.Commands + OPSTATE_accptcmd_list = attributes.AcceptedCommandList + OPSTATE_gencmd_list = attributes.GeneratedCommandList + accepted_cmd_list = await self.read_single_attribute_check_success(endpoint=endpoint, cluster=cluster, attribute=OPSTATE_accptcmd_list) + generated_cmd_list = await self.read_single_attribute_check_success(endpoint=endpoint, cluster=cluster, attribute=OPSTATE_gencmd_list) + pause_cmd_id = commands.Pause.command_id + start_cmd_id = commands.Start.command_id + stop_cmd_id = commands.Stop.command_id + resume_cmd_id = commands.Resume.command_id + ocr_cmd_id = commands.OperationalCommandResponse.command_id + + # Gathering Available Event and associated id events = cluster.Events - + Oprtcmplt_event = bool(events.OperationCompletion.event_id) + self.init_test() asserts.assert_true('PIXIT.WAITTIME.REBOOT' in self.matter_test_config.global_test_params, @@ -1034,7 +1118,7 @@ async def TEST_TC_OPSTATE_BASE_2_5(self, endpoint=1): # STEP 2: Set up a subscription to the OperationCompletion event self.step(2) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.E01")): + if Oprtcmplt_event: # Subscribe to Events and when they are sent push them to a queue for checking later events_callback = EventSpecificChangeCallback(events.OperationCompletion) await events_callback.start(self.default_controller, @@ -1054,22 +1138,21 @@ async def TEST_TC_OPSTATE_BASE_2_5(self, endpoint=1): # STEP 4: TH sends Start command to the DUT self.step(4) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.C02.Rsp") and - self.check_pics(f"{self.test_info.pics_code}.S.C04.Tx")): + if ((start_cmd_id in accepted_cmd_list) and (ocr_cmd_id in generated_cmd_list)): await self.send_cmd_expect_response(endpoint=endpoint, cmd=commands.Start(), expected_response=cluster.Enums.ErrorStateEnum.kNoError) # STEP 5: TH reads from the DUT the CountdownTime attribute self.step(5) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.A0002")): + if countdown_time_attr_id in attribute_list: initial_countdown_time = await self.read_expect_success(endpoint=endpoint, attribute=attributes.CountdownTime) if initial_countdown_time is not NullValue: # STEP 6: TH reads from the DUT the OperationalState attribute self.step(6) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.A0004")): + if oprtnlstate_attr_id in attribute_list: await self.read_and_expect_value(endpoint=endpoint, attribute=attributes.OperationalState, expected_value=cluster.Enums.OperationalStateEnum.kRunning) @@ -1080,15 +1163,14 @@ async def TEST_TC_OPSTATE_BASE_2_5(self, endpoint=1): # STEP 8: TH sends Stop command to the DUT self.step(8) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.C01.Rsp") and - self.check_pics(f"{self.test_info.pics_code}.S.C04.Tx")): + if ((stop_cmd_id in accepted_cmd_list) and (ocr_cmd_id in generated_cmd_list)): await self.send_cmd_expect_response(endpoint=endpoint, cmd=commands.Stop(), expected_response=cluster.Enums.ErrorStateEnum.kNoError) # STEP 9: TH waits for OperationCompletion event self.step(9) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.E01")): + if Oprtcmplt_event: event_data = events_callback.wait_for_event_report() asserts.assert_equal(event_data.completionErrorCode, cluster.Enums.ErrorStateEnum.kNoError, @@ -1103,7 +1185,7 @@ async def TEST_TC_OPSTATE_BASE_2_5(self, endpoint=1): # STEP 10: TH reads from the DUT the OperationalState attribute self.step(10) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.A0004")): + if oprtnlstate_attr_id in attribute_list: await self.read_and_expect_value(endpoint=endpoint, attribute=attributes.OperationalState, expected_value=cluster.Enums.OperationalStateEnum.kStopped) @@ -1121,30 +1203,28 @@ async def TEST_TC_OPSTATE_BASE_2_5(self, endpoint=1): # STEP 13: TH sends Start command to the DUT self.step(13) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.C02.Rsp") and - self.check_pics(f"{self.test_info.pics_code}.S.C04.Tx")): + if ((start_cmd_id in accepted_cmd_list) and (ocr_cmd_id in generated_cmd_list)): await self.send_cmd_expect_response(endpoint=endpoint, cmd=commands.Start(), expected_response=cluster.Enums.ErrorStateEnum.kNoError) # STEP 14: TH reads from the DUT the OperationalState attribute self.step(14) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.A0004")): + if oprtnlstate_attr_id in attribute_list: await self.read_and_expect_value(endpoint=endpoint, attribute=attributes.OperationalState, expected_value=cluster.Enums.OperationalStateEnum.kRunning) # STEP 15: TH sends Pause command to the DUT self.step(15) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.C00.Rsp") and - self.check_pics(f"{self.test_info.pics_code}.S.C04.Tx")): + if ((pause_cmd_id in accepted_cmd_list) and (ocr_cmd_id in generated_cmd_list)): await self.send_cmd_expect_response(endpoint=endpoint, cmd=commands.Pause(), expected_response=cluster.Enums.ErrorStateEnum.kNoError) # STEP 16: TH reads from the DUT the OperationalState attribute self.step(16) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.A0004")): + if oprtnlstate_attr_id in attribute_list: await self.read_and_expect_value(endpoint=endpoint, attribute=attributes.OperationalState, expected_value=cluster.Enums.OperationalStateEnum.kPaused) @@ -1155,15 +1235,14 @@ async def TEST_TC_OPSTATE_BASE_2_5(self, endpoint=1): # STEP 18: TH sends Resume command to the DUT self.step(18) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.C03.Rsp") and - self.check_pics(f"{self.test_info.pics_code}.S.C04.Tx")): + if ((resume_cmd_id in accepted_cmd_list) and (ocr_cmd_id in generated_cmd_list)): await self.send_cmd_expect_response(endpoint=endpoint, cmd=commands.Resume(), expected_response=cluster.Enums.ErrorStateEnum.kNoError) # STEP 19: TH reads from the DUT the OperationalState attribute self.step(19) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.A0004")): + if oprtnlstate_attr_id in attribute_list: await self.read_and_expect_value(endpoint=endpoint, attribute=attributes.OperationalState, expected_value=cluster.Enums.OperationalStateEnum.kRunning) @@ -1174,15 +1253,14 @@ async def TEST_TC_OPSTATE_BASE_2_5(self, endpoint=1): # STEP 21: TH sends Stop command to the DUT self.step(21) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.C01.Rsp") and - self.check_pics(f"{self.test_info.pics_code}.S.C04.Tx")): + if ((stop_cmd_id in accepted_cmd_list) and (ocr_cmd_id in generated_cmd_list)): await self.send_cmd_expect_response(endpoint=endpoint, cmd=commands.Stop(), expected_response=cluster.Enums.ErrorStateEnum.kNoError) # STEP 22: TH waits for OperationCompletion event self.step(22) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.E01")): + if Oprtcmplt_event: event_data = events_callback.wait_for_event_report() asserts.assert_equal(event_data.completionErrorCode, cluster.Enums.ErrorStateEnum.kNoError,