From 827fa4535fd67500654475373b7cc56d34708f04 Mon Sep 17 00:00:00 2001 From: "tennessee.carmelveilleux@gmail.com" Date: Wed, 31 Jul 2024 11:42:34 -0400 Subject: [PATCH] Fix most TC-SWTCH-2.4 remaining issues - Move 2.4 in a better place in the file - Add test steps properly - Allow default button press position override Issue #34656 Testing done: - Test still passes on DUT with automation --- src/python_testing/TC_SWTCH.py | 255 +++++++++++++++++---------------- 1 file changed, 132 insertions(+), 123 deletions(-) diff --git a/src/python_testing/TC_SWTCH.py b/src/python_testing/TC_SWTCH.py index 6da5c7a229e0b7..08bf23d91c30d4 100644 --- a/src/python_testing/TC_SWTCH.py +++ b/src/python_testing/TC_SWTCH.py @@ -45,22 +45,28 @@ logger = logging.getLogger(__name__) -class TC_SwitchTests(MatterBaseTest): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) +def bump_substep(step: str) -> str: + """Given a string like "5a", bump it to "5b", or "6c" to "6d" """ + if len(step) == 0: + raise ValueError("Can't bump empty steps!") - def desc_TC_SWTCH_2_4(self) -> str: - """Returns a description of this test""" - return "[TC-SWTCH-2.4] Momentary Switch Long Press Verification" + end_char = step[-1] + if not end_char.isalpha(): + return step + "a" - # TODO(#34656): Fill test steps - # def steps_TC_SWTCH_2_4(self) -> list[TestStep]: - # steps = [ - # TestStep("0", "Commissioning, already done", is_commissioning=True), - # # TODO: fill when test is done - # ] + step_prefix = step[:-1] + next_end_char = chr(ord(end_char) + 1) + if ord(next_end_char) > ord('z'): + raise ValueError(f"Reached max substep for step '{step}'") + next_step = step_prefix + next_end_char - # return steps + return next_step + + +class TC_SwitchTests(MatterBaseTest): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._default_pressed_position = self.user_params.get("default_pressed_position", 1) def _send_named_pipe_command(self, command_dict: dict[str, Any]): app_pid = self.matter_test_config.app_pid @@ -164,14 +170,6 @@ def _ask_for_release(self): else: time.sleep(self.keep_pressed_delay/1000) - def _placeholder_for_step(self, step_id: str): - # TODO(#34656): Global search an replace of `self._placeholder_for_step` with `self.step` when done. - logging.info(f"Step {step_id}") - pass - - def _placeholder_for_skip(self, step_id: str): - logging.info(f"Skipped step {step_id}") - def _await_sequence_of_reports(self, report_queue: queue.Queue, endpoint_id: int, attribute: TypedAttributePath, sequence: list[Any], timeout_sec: float): start_time = time.time() elapsed = 0.0 @@ -269,93 +267,6 @@ def _expect_no_events_for_cluster(self, event_queue: queue.Queue, endpoint_id: i logging.info(f"Successfully waited for no further events on {expected_cluster} for {elapsed:.1f} seconds") - @per_endpoint_test(has_feature(Clusters.Switch, Clusters.Switch.Bitmaps.Feature.kMomentarySwitch)) - async def test_TC_SWTCH_2_4(self): - # TODO(#34656): Make this come from PIXIT - switch_pressed_position = 1 - post_prompt_settle_delay_seconds = 10.0 - - # Commission DUT - already done - - # Read feature map to set bool markers - cluster = Clusters.Objects.Switch - feature_map = await self.read_single_attribute_check_success(cluster, attribute=cluster.Attributes.FeatureMap) - - has_ms_feature = (feature_map & cluster.Bitmaps.Feature.kMomentarySwitch) != 0 - has_msr_feature = (feature_map & cluster.Bitmaps.Feature.kMomentarySwitchRelease) != 0 - has_msl_feature = (feature_map & cluster.Bitmaps.Feature.kMomentarySwitchLongPress) != 0 - has_as_feature = (feature_map & cluster.Bitmaps.Feature.kActionSwitch) != 0 - # has_msm_feature = (feature_map & cluster.Bitmaps.Feature.kMomentarySwitchMultiPress) != 0 - - if not has_ms_feature: - logging.info("Skipping rest of test: SWTCH.S.F01(MS) feature not present") - self.skip_all_remaining_steps("2") - - endpoint_id = self.matter_test_config.endpoint - - # Step 1: Set up subscription to all Switch cluster events - self._placeholder_for_step("1") - event_listener = EventChangeCallback(cluster) - attrib_listener = ClusterAttributeChangeAccumulator(cluster) - await event_listener.start(self.default_controller, self.dut_node_id, endpoint=endpoint_id) - await attrib_listener.start(self.default_controller, self.dut_node_id, endpoint=endpoint_id) - - # Step 2: Operator does not operate switch on the DUT - self._placeholder_for_step("2") - self._ask_for_switch_idle() - - # Step 3: TH reads the CurrentPosition attribute from the DUT - self._placeholder_for_step("3") - - # Verify that the value is 0 - current_position = await self.read_single_attribute_check_success(cluster, attribute=cluster.Attributes.CurrentPosition) - asserts.assert_equal(current_position, 0) - - # Step 4a: Operator operates switch (keep pressed for long time, e.g. 5 seconds) on the DUT, the release it - self._placeholder_for_step("4a") - self._ask_for_long_press(endpoint_id, switch_pressed_position, feature_map) - - # Step 4b: TH expects report of CurrentPosition 1, followed by a report of Current Position 0. - self._placeholder_for_step("4b") - logging.info( - f"Starting to wait for {post_prompt_settle_delay_seconds:.1f} seconds for CurrentPosition to go {switch_pressed_position}, then 0.") - self._await_sequence_of_reports(report_queue=attrib_listener.attribute_queue, endpoint_id=endpoint_id, attribute=cluster.Attributes.CurrentPosition, sequence=[ - switch_pressed_position, 0], timeout_sec=post_prompt_settle_delay_seconds) - - # Step 4c: TH expects at least InitialPress with NewPosition = 1 - self._placeholder_for_step("4c") - logging.info(f"Starting to wait for {post_prompt_settle_delay_seconds:.1f} seconds for InitialPress event.") - expected_events = [cluster.Events.InitialPress(newPosition=switch_pressed_position)] - self._await_sequence_of_events(event_queue=event_listener.event_queue, endpoint_id=endpoint_id, - sequence=expected_events, timeout_sec=post_prompt_settle_delay_seconds) - - # Step 4d: For MSL/AS, expect to see LongPress/LongRelease in that order - if not has_msl_feature and not has_as_feature: - logging.info("Skipping Step 4d due to missing MSL and AS features") - self._placeholder_for_skip("4d") - else: - # Steb 4d: TH expects report of LongPress, LongRelease in that order. - self._placeholder_for_step("4d") - logging.info(f"Starting to wait for {post_prompt_settle_delay_seconds:.1f} seconds for LongPress then LongRelease.") - expected_events = [] - expected_events.append(cluster.Events.LongPress(newPosition=switch_pressed_position)) - expected_events.append(cluster.Events.LongRelease(previousPosition=switch_pressed_position)) - self._await_sequence_of_events(event_queue=event_listener.event_queue, endpoint_id=endpoint_id, - sequence=expected_events, timeout_sec=post_prompt_settle_delay_seconds) - - # Step 4e: For MS & (!MSL & !AS & !MSR), expect no further events for 10 seconds. - if not has_msl_feature and not has_as_feature and not has_msr_feature: - self._placeholder_for_step("4e") - self._expect_no_events_for_cluster(event_queue=event_listener.event_queue, - endpoint_id=endpoint_id, expected_cluster=cluster, timeout_sec=10.0) - - # Step 4f: For MSR & not MSL, expect to see ShortRelease. - if not has_msl_feature and has_msr_feature: - self._placeholder_for_step("4f") - expected_events = [cluster.Events.ShortRelease(previousPosition=switch_pressed_position)] - self._await_sequence_of_events(event_queue=event_listener.event_queue, endpoint_id=endpoint_id, - sequence=expected_events, timeout_sec=post_prompt_settle_delay_seconds) - def _received_event(self, event_listener: EventChangeCallback, target_event: ClusterObjects.ClusterEvent, timeout_s: int) -> bool: """ Returns true if this event was received, false otherwise @@ -534,6 +445,104 @@ async def test_TC_SWTCH_2_3(self): button_val = await self.read_single_attribute_check_success(cluster=cluster, attribute=cluster.Attributes.CurrentPosition) asserts.assert_equal(button_val, 0, "Button value is not 0") + def desc_TC_SWTCH_2_4(self) -> str: + return "[TC-SWTCH-2.4] Momentary Switch Long Press Verification" + + def steps_TC_SWTCH_2_4(self): + return [TestStep(1, test_plan_support.commission_if_required(), "", is_commissioning=True), + TestStep(2, "Set up subscription to all events of Switch cluster on the endpoint"), + TestStep(3, "Operator does not operate switch on the DUT"), + TestStep(4, "TH reads the CurrentPosition attribute from the DUT", "Verify that the value is 0"), + TestStep(5, "Operator operates switch (keep pressed for long time, e.g. 5 seconds) on the DUT, the release it", + """ + * TH expects receiving a subscription report of CurrentPosition 1, followed by a report of Current Position 0. + * TH expects receiving at InitialPress event with NewPosition = 1. + * if MSL or AS feature is supported, TH expect receiving LongPress/LongRelease in that order. + * if MS & (!MSL & !AS & !MSR) features present, TH expects receiving no further events for 10 seconds after release. + * if (MSR & !MSL) features present, TH expects receiving ShortRelease event. + """) + ] + + @per_endpoint_test(has_feature(Clusters.Switch, Clusters.Switch.Bitmaps.Feature.kMomentarySwitch)) + async def test_TC_SWTCH_2_4(self): + switch_pressed_position = self._default_pressed_position + post_prompt_settle_delay_seconds = 10.0 + + endpoint_id = self.matter_test_config.endpoint + cluster = Clusters.Objects.Switch + + # Step 1: Commission DUT - already done + self.step(1) + + # Read feature map to set bool markers + feature_map = await self.read_single_attribute_check_success(cluster, attribute=cluster.Attributes.FeatureMap) + + has_ms_feature = (feature_map & cluster.Bitmaps.Feature.kMomentarySwitch) != 0 + has_msr_feature = (feature_map & cluster.Bitmaps.Feature.kMomentarySwitchRelease) != 0 + has_msl_feature = (feature_map & cluster.Bitmaps.Feature.kMomentarySwitchLongPress) != 0 + has_as_feature = (feature_map & cluster.Bitmaps.Feature.kActionSwitch) != 0 + + if not has_ms_feature: + logging.info("Skipping rest of test: SWTCH.S.F01(MS) feature not present") + self.skip_all_remaining_steps("2") + + # Step 2: Set up subscription to all events of Switch cluster on the endpoint + self.step(2) + event_listener = EventChangeCallback(cluster) + attrib_listener = ClusterAttributeChangeAccumulator(cluster) + await event_listener.start(self.default_controller, self.dut_node_id, endpoint=endpoint_id) + await attrib_listener.start(self.default_controller, self.dut_node_id, endpoint=endpoint_id) + + # Step 3: Operator does not operate switch on the DUT + self.step(3) + self._ask_for_switch_idle() + + # Step 4: TH reads the CurrentPosition attribute from the DUT + self.step(4) + + # Verify that the value is 0 + current_position = await self.read_single_attribute_check_success(cluster, attribute=cluster.Attributes.CurrentPosition) + asserts.assert_equal(current_position, 0) + + # Step 5: Operator operates switch (keep pressed for long time, e.g. 5 seconds) on the DUT, the release it + self.step(5) + self._ask_for_long_press(endpoint_id, switch_pressed_position, feature_map) + + # - TH expects report of CurrentPosition 1, followed by a report of Current Position 0. + logging.info( + f"Starting to wait for {post_prompt_settle_delay_seconds:.1f} seconds for CurrentPosition to go {switch_pressed_position}, then 0.") + self._await_sequence_of_reports(report_queue=attrib_listener.attribute_queue, endpoint_id=endpoint_id, attribute=cluster.Attributes.CurrentPosition, sequence=[ + switch_pressed_position, 0], timeout_sec=post_prompt_settle_delay_seconds) + + # - TH expects at least InitialPress with NewPosition = 1 + logging.info(f"Starting to wait for {post_prompt_settle_delay_seconds:.1f} seconds for InitialPress event.") + expected_events = [cluster.Events.InitialPress(newPosition=switch_pressed_position)] + self._await_sequence_of_events(event_queue=event_listener.event_queue, endpoint_id=endpoint_id, + sequence=expected_events, timeout_sec=post_prompt_settle_delay_seconds) + + # - if MSL or AS feature is supported, expect to see LongPress/LongRelease in that order. + if not has_msl_feature and not has_as_feature: + logging.info("Since MSL and AS features both unsupported, skipping check for LongPress/LongRelease") + else: + # - TH expects report of LongPress, LongRelease in that order. + logging.info(f"Starting to wait for {post_prompt_settle_delay_seconds:.1f} seconds for LongPress then LongRelease.") + expected_events = [] + expected_events.append(cluster.Events.LongPress(newPosition=switch_pressed_position)) + expected_events.append(cluster.Events.LongRelease(previousPosition=switch_pressed_position)) + self._await_sequence_of_events(event_queue=event_listener.event_queue, endpoint_id=endpoint_id, + sequence=expected_events, timeout_sec=post_prompt_settle_delay_seconds) + + # - if MS & (!MSL & !AS & !MSR) features present, expect no further events for 10 seconds after release. + if not has_msl_feature and not has_as_feature and not has_msr_feature: + self._expect_no_events_for_cluster(event_queue=event_listener.event_queue, + endpoint_id=endpoint_id, expected_cluster=cluster, timeout_sec=10.0) + + # - if (MSR & !MSL) features present, expect to see ShortRelease event. + if not has_msl_feature and has_msr_feature: + expected_events = [cluster.Events.ShortRelease(previousPosition=switch_pressed_position)] + self._await_sequence_of_events(event_queue=event_listener.event_queue, endpoint_id=endpoint_id, + sequence=expected_events, timeout_sec=post_prompt_settle_delay_seconds) + def steps_TC_SWTCH_2_5(self): return [TestStep(1, test_plan_support.commission_if_required(), "", is_commissioning=True), TestStep(2, "Set up a subscription to all Switch cluster events"), @@ -638,7 +647,7 @@ async def test_TC_SWTCH_2_5(self): multi_press_max = await self.read_single_attribute_check_success(cluster, attribute=cluster.Attributes.MultiPressMax) endpoint_id = self.matter_test_config.endpoint - pressed_position = 1 + pressed_position = self._default_pressed_position self.step(2) event_listener = EventChangeCallback(cluster) @@ -657,27 +666,27 @@ def test_multi_press_sequence(starting_step: str, count: int, short_long: bool = else: self._ask_for_multi_press(endpoint_id, number_of_presses=count, pressed_position=pressed_position, feature_map=feature_map, multi_press_max=multi_press_max) - for i in range(count): + for pos_idx in range(count): event = event_listener.wait_for_event_report(cluster.Events.InitialPress) asserts.assert_equal(event.newPosition, pressed_position, "Unexpected NewPosition on InitialEvent") - if i > 0: + if pos_idx > 0: event = event_listener.wait_for_event_report(cluster.Events.MultiPressOngoing) asserts.assert_equal(event.newPosition, pressed_position, "Unexpected NewPosition on MultiPressOngoing") - asserts.assert_equal(event.currentNumberOfPressesCounted, i+1, + asserts.assert_equal(event.currentNumberOfPressesCounted, pos_idx + 1, "Unexpected CurrentNumberOfPressesCounted on MultiPressOngoing") event = event_listener.wait_for_event_report(cluster.Events.ShortRelease) asserts.assert_equal(event.previousPosition, pressed_position, "Unexpected PreviousPosition on ShortRelease") - step = step[:-1] + chr(ord(step[-1])+1) + step = bump_substep(step) self.step(step) self._ask_for_switch_idle() event = event_listener.wait_for_event_report(cluster.Events.MultiPressComplete) asserts.assert_equal(event.previousPosition, pressed_position, "Unexpected PreviousPosition on MultiPressComplete") asserts.assert_equal(event.totalNumberOfPressesCounted, count, "Unexpected count on MultiPressComplete") - test_multi_press_sequence("4a", 1) + test_multi_press_sequence("4a", count=1) - test_multi_press_sequence("5a", 2) + test_multi_press_sequence("5a", count=2) self.step("6a") multi_press_max = await self.read_single_attribute_check_success(cluster=cluster, attribute=cluster.Attributes.MultiPressMax) @@ -685,7 +694,7 @@ def test_multi_press_sequence(starting_step: str, count: int, short_long: bool = self.skip_step("6b") self.skip_step("6c") else: - test_multi_press_sequence("6b", 3) + test_multi_press_sequence("6b", count=3) if not has_msl_feature: self.skip_all_remaining_steps(7) @@ -694,7 +703,7 @@ def test_multi_press_sequence(starting_step: str, count: int, short_long: bool = self.step(7) # subscription is already set up - test_multi_press_sequence("8a", 2, short_long=True) + test_multi_press_sequence("8a", count=2, short_long=True) self.step("9a") self._ask_for_multi_press_long_short(endpoint_id, pressed_position, feature_map) @@ -813,7 +822,7 @@ async def test_TC_SWTCH_2_6(self): multi_press_max = await self.read_single_attribute_check_success(cluster, attribute=cluster.Attributes.MultiPressMax) endpoint_id = self.matter_test_config.endpoint - pressed_position = 1 + pressed_position = self._default_pressed_position self.step(2) event_listener = EventChangeCallback(cluster) @@ -836,7 +845,7 @@ def test_multi_press_sequence(starting_step: str, count: int, short_long: bool = event = event_listener.wait_for_event_report(cluster.Events.InitialPress) asserts.assert_equal(event.newPosition, pressed_position, "Unexpected NewPosition on InitialEvent") - step = step[:-1] + chr(ord(step[-1])+1) + step = bump_substep(step) self.step(step) self._ask_for_switch_idle() event = event_listener.wait_for_event_report(cluster.Events.MultiPressComplete) @@ -844,11 +853,11 @@ def test_multi_press_sequence(starting_step: str, count: int, short_long: bool = expected_count = 0 if count > multi_press_max else count asserts.assert_equal(event.totalNumberOfPressesCounted, expected_count, "Unexpected count on MultiPressComplete") - test_multi_press_sequence("4a", 1) + test_multi_press_sequence("4a", count=1) - test_multi_press_sequence("5a", 2) + test_multi_press_sequence("5a", count=2) - test_multi_press_sequence("6a", multi_press_max + 1) + test_multi_press_sequence("6a", count=(multi_press_max + 1)) self.step("7a") if not has_msl_feature: @@ -857,7 +866,7 @@ def test_multi_press_sequence(starting_step: str, count: int, short_long: bool = # subscription is already established self.step("7b") - test_multi_press_sequence("8a", 2, short_long=True) + test_multi_press_sequence("8a", count=2, short_long=True) self.step("9a") self._ask_for_multi_press_long_short(endpoint_id, pressed_position, feature_map)