Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix most TC-SWTCH-2.4 remaining issues #34677

Merged
merged 1 commit into from
Jul 31, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
255 changes: 132 additions & 123 deletions src/python_testing/TC_SWTCH.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,22 +45,28 @@
logger = logging.getLogger(__name__)


class TC_SwitchTests(MatterBaseTest):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def bump_substep(step: str) -> str:
"""Given a string like "5a", bump it to "5b", or "6c" to "6d" """
if len(step) == 0:
raise ValueError("Can't bump empty steps!")

def desc_TC_SWTCH_2_4(self) -> str:
"""Returns a description of this test"""
return "[TC-SWTCH-2.4] Momentary Switch Long Press Verification"
end_char = step[-1]
if not end_char.isalpha():
return step + "a"

# TODO(#34656): Fill test steps
# def steps_TC_SWTCH_2_4(self) -> list[TestStep]:
# steps = [
# TestStep("0", "Commissioning, already done", is_commissioning=True),
# # TODO: fill when test is done
# ]
step_prefix = step[:-1]
next_end_char = chr(ord(end_char) + 1)
if ord(next_end_char) > ord('z'):
raise ValueError(f"Reached max substep for step '{step}'")
next_step = step_prefix + next_end_char

# return steps
return next_step


class TC_SwitchTests(MatterBaseTest):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._default_pressed_position = self.user_params.get("default_pressed_position", 1)

def _send_named_pipe_command(self, command_dict: dict[str, Any]):
app_pid = self.matter_test_config.app_pid
Expand Down Expand Up @@ -164,14 +170,6 @@ def _ask_for_release(self):
else:
time.sleep(self.keep_pressed_delay/1000)

def _placeholder_for_step(self, step_id: str):
# TODO(#34656): Global search an replace of `self._placeholder_for_step` with `self.step` when done.
logging.info(f"Step {step_id}")
pass

def _placeholder_for_skip(self, step_id: str):
logging.info(f"Skipped step {step_id}")

def _await_sequence_of_reports(self, report_queue: queue.Queue, endpoint_id: int, attribute: TypedAttributePath, sequence: list[Any], timeout_sec: float):
start_time = time.time()
elapsed = 0.0
Expand Down Expand Up @@ -269,93 +267,6 @@ def _expect_no_events_for_cluster(self, event_queue: queue.Queue, endpoint_id: i

logging.info(f"Successfully waited for no further events on {expected_cluster} for {elapsed:.1f} seconds")

@per_endpoint_test(has_feature(Clusters.Switch, Clusters.Switch.Bitmaps.Feature.kMomentarySwitch))
async def test_TC_SWTCH_2_4(self):
# TODO(#34656): Make this come from PIXIT
switch_pressed_position = 1
post_prompt_settle_delay_seconds = 10.0

# Commission DUT - already done

# Read feature map to set bool markers
cluster = Clusters.Objects.Switch
feature_map = await self.read_single_attribute_check_success(cluster, attribute=cluster.Attributes.FeatureMap)

has_ms_feature = (feature_map & cluster.Bitmaps.Feature.kMomentarySwitch) != 0
has_msr_feature = (feature_map & cluster.Bitmaps.Feature.kMomentarySwitchRelease) != 0
has_msl_feature = (feature_map & cluster.Bitmaps.Feature.kMomentarySwitchLongPress) != 0
has_as_feature = (feature_map & cluster.Bitmaps.Feature.kActionSwitch) != 0
# has_msm_feature = (feature_map & cluster.Bitmaps.Feature.kMomentarySwitchMultiPress) != 0

if not has_ms_feature:
logging.info("Skipping rest of test: SWTCH.S.F01(MS) feature not present")
self.skip_all_remaining_steps("2")

endpoint_id = self.matter_test_config.endpoint

# Step 1: Set up subscription to all Switch cluster events
self._placeholder_for_step("1")
event_listener = EventChangeCallback(cluster)
attrib_listener = ClusterAttributeChangeAccumulator(cluster)
await event_listener.start(self.default_controller, self.dut_node_id, endpoint=endpoint_id)
await attrib_listener.start(self.default_controller, self.dut_node_id, endpoint=endpoint_id)

# Step 2: Operator does not operate switch on the DUT
self._placeholder_for_step("2")
self._ask_for_switch_idle()

# Step 3: TH reads the CurrentPosition attribute from the DUT
self._placeholder_for_step("3")

# Verify that the value is 0
current_position = await self.read_single_attribute_check_success(cluster, attribute=cluster.Attributes.CurrentPosition)
asserts.assert_equal(current_position, 0)

# Step 4a: Operator operates switch (keep pressed for long time, e.g. 5 seconds) on the DUT, the release it
self._placeholder_for_step("4a")
self._ask_for_long_press(endpoint_id, switch_pressed_position, feature_map)

# Step 4b: TH expects report of CurrentPosition 1, followed by a report of Current Position 0.
self._placeholder_for_step("4b")
logging.info(
f"Starting to wait for {post_prompt_settle_delay_seconds:.1f} seconds for CurrentPosition to go {switch_pressed_position}, then 0.")
self._await_sequence_of_reports(report_queue=attrib_listener.attribute_queue, endpoint_id=endpoint_id, attribute=cluster.Attributes.CurrentPosition, sequence=[
switch_pressed_position, 0], timeout_sec=post_prompt_settle_delay_seconds)

# Step 4c: TH expects at least InitialPress with NewPosition = 1
self._placeholder_for_step("4c")
logging.info(f"Starting to wait for {post_prompt_settle_delay_seconds:.1f} seconds for InitialPress event.")
expected_events = [cluster.Events.InitialPress(newPosition=switch_pressed_position)]
self._await_sequence_of_events(event_queue=event_listener.event_queue, endpoint_id=endpoint_id,
sequence=expected_events, timeout_sec=post_prompt_settle_delay_seconds)

# Step 4d: For MSL/AS, expect to see LongPress/LongRelease in that order
if not has_msl_feature and not has_as_feature:
logging.info("Skipping Step 4d due to missing MSL and AS features")
self._placeholder_for_skip("4d")
else:
# Steb 4d: TH expects report of LongPress, LongRelease in that order.
self._placeholder_for_step("4d")
logging.info(f"Starting to wait for {post_prompt_settle_delay_seconds:.1f} seconds for LongPress then LongRelease.")
expected_events = []
expected_events.append(cluster.Events.LongPress(newPosition=switch_pressed_position))
expected_events.append(cluster.Events.LongRelease(previousPosition=switch_pressed_position))
self._await_sequence_of_events(event_queue=event_listener.event_queue, endpoint_id=endpoint_id,
sequence=expected_events, timeout_sec=post_prompt_settle_delay_seconds)

# Step 4e: For MS & (!MSL & !AS & !MSR), expect no further events for 10 seconds.
if not has_msl_feature and not has_as_feature and not has_msr_feature:
self._placeholder_for_step("4e")
self._expect_no_events_for_cluster(event_queue=event_listener.event_queue,
endpoint_id=endpoint_id, expected_cluster=cluster, timeout_sec=10.0)

# Step 4f: For MSR & not MSL, expect to see ShortRelease.
if not has_msl_feature and has_msr_feature:
self._placeholder_for_step("4f")
expected_events = [cluster.Events.ShortRelease(previousPosition=switch_pressed_position)]
self._await_sequence_of_events(event_queue=event_listener.event_queue, endpoint_id=endpoint_id,
sequence=expected_events, timeout_sec=post_prompt_settle_delay_seconds)

def _received_event(self, event_listener: EventChangeCallback, target_event: ClusterObjects.ClusterEvent, timeout_s: int) -> bool:
"""
Returns true if this event was received, false otherwise
Expand Down Expand Up @@ -534,6 +445,104 @@ async def test_TC_SWTCH_2_3(self):
button_val = await self.read_single_attribute_check_success(cluster=cluster, attribute=cluster.Attributes.CurrentPosition)
asserts.assert_equal(button_val, 0, "Button value is not 0")

def desc_TC_SWTCH_2_4(self) -> str:
return "[TC-SWTCH-2.4] Momentary Switch Long Press Verification"

def steps_TC_SWTCH_2_4(self):
return [TestStep(1, test_plan_support.commission_if_required(), "", is_commissioning=True),
TestStep(2, "Set up subscription to all events of Switch cluster on the endpoint"),
TestStep(3, "Operator does not operate switch on the DUT"),
TestStep(4, "TH reads the CurrentPosition attribute from the DUT", "Verify that the value is 0"),
TestStep(5, "Operator operates switch (keep pressed for long time, e.g. 5 seconds) on the DUT, the release it",
"""
* TH expects receiving a subscription report of CurrentPosition 1, followed by a report of Current Position 0.
* TH expects receiving at InitialPress event with NewPosition = 1.
* if MSL or AS feature is supported, TH expect receiving LongPress/LongRelease in that order.
* if MS & (!MSL & !AS & !MSR) features present, TH expects receiving no further events for 10 seconds after release.
* if (MSR & !MSL) features present, TH expects receiving ShortRelease event.
""")
]

@per_endpoint_test(has_feature(Clusters.Switch, Clusters.Switch.Bitmaps.Feature.kMomentarySwitch))
async def test_TC_SWTCH_2_4(self):
switch_pressed_position = self._default_pressed_position
post_prompt_settle_delay_seconds = 10.0

endpoint_id = self.matter_test_config.endpoint
cluster = Clusters.Objects.Switch

# Step 1: Commission DUT - already done
self.step(1)

# Read feature map to set bool markers
feature_map = await self.read_single_attribute_check_success(cluster, attribute=cluster.Attributes.FeatureMap)

has_ms_feature = (feature_map & cluster.Bitmaps.Feature.kMomentarySwitch) != 0
has_msr_feature = (feature_map & cluster.Bitmaps.Feature.kMomentarySwitchRelease) != 0
has_msl_feature = (feature_map & cluster.Bitmaps.Feature.kMomentarySwitchLongPress) != 0
has_as_feature = (feature_map & cluster.Bitmaps.Feature.kActionSwitch) != 0

if not has_ms_feature:
logging.info("Skipping rest of test: SWTCH.S.F01(MS) feature not present")
self.skip_all_remaining_steps("2")

# Step 2: Set up subscription to all events of Switch cluster on the endpoint
self.step(2)
event_listener = EventChangeCallback(cluster)
attrib_listener = ClusterAttributeChangeAccumulator(cluster)
await event_listener.start(self.default_controller, self.dut_node_id, endpoint=endpoint_id)
await attrib_listener.start(self.default_controller, self.dut_node_id, endpoint=endpoint_id)

# Step 3: Operator does not operate switch on the DUT
self.step(3)
self._ask_for_switch_idle()

# Step 4: TH reads the CurrentPosition attribute from the DUT
self.step(4)

# Verify that the value is 0
current_position = await self.read_single_attribute_check_success(cluster, attribute=cluster.Attributes.CurrentPosition)
asserts.assert_equal(current_position, 0)

# Step 5: Operator operates switch (keep pressed for long time, e.g. 5 seconds) on the DUT, the release it
self.step(5)
self._ask_for_long_press(endpoint_id, switch_pressed_position, feature_map)

# - TH expects report of CurrentPosition 1, followed by a report of Current Position 0.
logging.info(
f"Starting to wait for {post_prompt_settle_delay_seconds:.1f} seconds for CurrentPosition to go {switch_pressed_position}, then 0.")
self._await_sequence_of_reports(report_queue=attrib_listener.attribute_queue, endpoint_id=endpoint_id, attribute=cluster.Attributes.CurrentPosition, sequence=[
switch_pressed_position, 0], timeout_sec=post_prompt_settle_delay_seconds)

# - TH expects at least InitialPress with NewPosition = 1
logging.info(f"Starting to wait for {post_prompt_settle_delay_seconds:.1f} seconds for InitialPress event.")
expected_events = [cluster.Events.InitialPress(newPosition=switch_pressed_position)]
self._await_sequence_of_events(event_queue=event_listener.event_queue, endpoint_id=endpoint_id,
sequence=expected_events, timeout_sec=post_prompt_settle_delay_seconds)

# - if MSL or AS feature is supported, expect to see LongPress/LongRelease in that order.
if not has_msl_feature and not has_as_feature:
logging.info("Since MSL and AS features both unsupported, skipping check for LongPress/LongRelease")
else:
# - TH expects report of LongPress, LongRelease in that order.
logging.info(f"Starting to wait for {post_prompt_settle_delay_seconds:.1f} seconds for LongPress then LongRelease.")
expected_events = []
expected_events.append(cluster.Events.LongPress(newPosition=switch_pressed_position))
expected_events.append(cluster.Events.LongRelease(previousPosition=switch_pressed_position))
self._await_sequence_of_events(event_queue=event_listener.event_queue, endpoint_id=endpoint_id,
sequence=expected_events, timeout_sec=post_prompt_settle_delay_seconds)

# - if MS & (!MSL & !AS & !MSR) features present, expect no further events for 10 seconds after release.
if not has_msl_feature and not has_as_feature and not has_msr_feature:
self._expect_no_events_for_cluster(event_queue=event_listener.event_queue,
endpoint_id=endpoint_id, expected_cluster=cluster, timeout_sec=10.0)

# - if (MSR & !MSL) features present, expect to see ShortRelease event.
if not has_msl_feature and has_msr_feature:
expected_events = [cluster.Events.ShortRelease(previousPosition=switch_pressed_position)]
self._await_sequence_of_events(event_queue=event_listener.event_queue, endpoint_id=endpoint_id,
sequence=expected_events, timeout_sec=post_prompt_settle_delay_seconds)

def steps_TC_SWTCH_2_5(self):
return [TestStep(1, test_plan_support.commission_if_required(), "", is_commissioning=True),
TestStep(2, "Set up a subscription to all Switch cluster events"),
Expand Down Expand Up @@ -638,7 +647,7 @@ async def test_TC_SWTCH_2_5(self):
multi_press_max = await self.read_single_attribute_check_success(cluster, attribute=cluster.Attributes.MultiPressMax)

endpoint_id = self.matter_test_config.endpoint
pressed_position = 1
pressed_position = self._default_pressed_position

self.step(2)
event_listener = EventChangeCallback(cluster)
Expand All @@ -657,35 +666,35 @@ def test_multi_press_sequence(starting_step: str, count: int, short_long: bool =
else:
self._ask_for_multi_press(endpoint_id, number_of_presses=count, pressed_position=pressed_position,
feature_map=feature_map, multi_press_max=multi_press_max)
for i in range(count):
for pos_idx in range(count):
event = event_listener.wait_for_event_report(cluster.Events.InitialPress)
asserts.assert_equal(event.newPosition, pressed_position, "Unexpected NewPosition on InitialEvent")
if i > 0:
if pos_idx > 0:
event = event_listener.wait_for_event_report(cluster.Events.MultiPressOngoing)
asserts.assert_equal(event.newPosition, pressed_position, "Unexpected NewPosition on MultiPressOngoing")
asserts.assert_equal(event.currentNumberOfPressesCounted, i+1,
asserts.assert_equal(event.currentNumberOfPressesCounted, pos_idx + 1,
"Unexpected CurrentNumberOfPressesCounted on MultiPressOngoing")
event = event_listener.wait_for_event_report(cluster.Events.ShortRelease)
asserts.assert_equal(event.previousPosition, pressed_position, "Unexpected PreviousPosition on ShortRelease")

step = step[:-1] + chr(ord(step[-1])+1)
step = bump_substep(step)
self.step(step)
self._ask_for_switch_idle()
event = event_listener.wait_for_event_report(cluster.Events.MultiPressComplete)
asserts.assert_equal(event.previousPosition, pressed_position, "Unexpected PreviousPosition on MultiPressComplete")
asserts.assert_equal(event.totalNumberOfPressesCounted, count, "Unexpected count on MultiPressComplete")

test_multi_press_sequence("4a", 1)
test_multi_press_sequence("4a", count=1)

test_multi_press_sequence("5a", 2)
test_multi_press_sequence("5a", count=2)

self.step("6a")
multi_press_max = await self.read_single_attribute_check_success(cluster=cluster, attribute=cluster.Attributes.MultiPressMax)
if multi_press_max == 2:
self.skip_step("6b")
self.skip_step("6c")
else:
test_multi_press_sequence("6b", 3)
test_multi_press_sequence("6b", count=3)

if not has_msl_feature:
self.skip_all_remaining_steps(7)
Expand All @@ -694,7 +703,7 @@ def test_multi_press_sequence(starting_step: str, count: int, short_long: bool =
self.step(7)
# subscription is already set up

test_multi_press_sequence("8a", 2, short_long=True)
test_multi_press_sequence("8a", count=2, short_long=True)

self.step("9a")
self._ask_for_multi_press_long_short(endpoint_id, pressed_position, feature_map)
Expand Down Expand Up @@ -813,7 +822,7 @@ async def test_TC_SWTCH_2_6(self):
multi_press_max = await self.read_single_attribute_check_success(cluster, attribute=cluster.Attributes.MultiPressMax)

endpoint_id = self.matter_test_config.endpoint
pressed_position = 1
pressed_position = self._default_pressed_position

self.step(2)
event_listener = EventChangeCallback(cluster)
Expand All @@ -836,19 +845,19 @@ def test_multi_press_sequence(starting_step: str, count: int, short_long: bool =
event = event_listener.wait_for_event_report(cluster.Events.InitialPress)
asserts.assert_equal(event.newPosition, pressed_position, "Unexpected NewPosition on InitialEvent")

step = step[:-1] + chr(ord(step[-1])+1)
step = bump_substep(step)
self.step(step)
self._ask_for_switch_idle()
event = event_listener.wait_for_event_report(cluster.Events.MultiPressComplete)
asserts.assert_equal(event.previousPosition, pressed_position, "Unexpected PreviousPosition on MultiPressComplete")
expected_count = 0 if count > multi_press_max else count
asserts.assert_equal(event.totalNumberOfPressesCounted, expected_count, "Unexpected count on MultiPressComplete")

test_multi_press_sequence("4a", 1)
test_multi_press_sequence("4a", count=1)

test_multi_press_sequence("5a", 2)
test_multi_press_sequence("5a", count=2)

test_multi_press_sequence("6a", multi_press_max + 1)
test_multi_press_sequence("6a", count=(multi_press_max + 1))

self.step("7a")
if not has_msl_feature:
Expand All @@ -857,7 +866,7 @@ def test_multi_press_sequence(starting_step: str, count: int, short_long: bool =
# subscription is already established
self.step("7b")

test_multi_press_sequence("8a", 2, short_long=True)
test_multi_press_sequence("8a", count=2, short_long=True)

self.step("9a")
self._ask_for_multi_press_long_short(endpoint_id, pressed_position, feature_map)
Expand Down
Loading