Skip to content

Commit

Permalink
Updating TC_OpstateCommon and TC_RVCOPSTATE_2_1 test modules:
Browse files Browse the repository at this point in the history
- Updated method for attributes_guard functionality.
- Added additional check to make sure that endpoint is not 0 or not provided in command line
  • Loading branch information
j-ororke committed Oct 26, 2024
1 parent 28f3676 commit 20bdc0c
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 24 deletions.
32 changes: 16 additions & 16 deletions src/python_testing/TC_OpstateCommon.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ async def TEST_TC_OPSTATE_BASE_1_1(self, endpoint=1, cluster_revision=1, feature
attributes.ClusterRevision.attribute_id
]

if self.attributes_guard(attributes.CountdownTime):
if await self.attributes_guard(endpoint=endpoint, attribute=attributes.CountdownTime):
expected_value.append(attributes.CountdownTime.attribute_id)

await self.read_and_expect_array_contains(endpoint=endpoint,
Expand Down Expand Up @@ -347,7 +347,7 @@ async def TEST_TC_OPSTATE_BASE_2_1(self, endpoint=1):

# STEP 2: TH reads from the DUT the PhaseList attribute
self.step(2)
if self.attributes_guard(attributes.PhaseList):
if await self.attributes_guard(endpoint=endpoint, attribute=attributes.PhaseList):
phase_list = await self.read_expect_success(endpoint=endpoint,
attribute=attributes.PhaseList)
if phase_list is not NullValue:
Expand All @@ -357,7 +357,7 @@ async def TEST_TC_OPSTATE_BASE_2_1(self, endpoint=1):

# STEP 3: TH reads from the DUT the CurrentPhase attribute
self.step(3)
if self.attributes_guard(attributes.CurrentPhase):
if await self.attributes_guard(endpoint=endpoint, attribute=attributes.CurrentPhase):
current_phase = await self.read_expect_success(endpoint=endpoint,
attribute=attributes.CurrentPhase)
if (phase_list == NullValue) or (not phase_list):
Expand All @@ -369,7 +369,7 @@ async def TEST_TC_OPSTATE_BASE_2_1(self, endpoint=1):

# STEP 4: TH reads from the DUT the CountdownTime attribute
self.step(4)
if self.attributes_guard(attributes.CountdownTime):
if await self.attributes_guard(endpoint=endpoint, attribute=attributes.CountdownTime):
countdown_time = await self.read_expect_success(endpoint=endpoint,
attribute=attributes.CountdownTime)
if countdown_time is not NullValue:
Expand All @@ -378,7 +378,7 @@ async def TEST_TC_OPSTATE_BASE_2_1(self, endpoint=1):

# STEP 5: TH reads from the DUT the OperationalStateList attribute
self.step(5)
if self.attributes_guard(attributes.OperationalStateList):
if await self.attributes_guard(endpoint=endpoint, attribute=attributes.OperationalStateList):
operational_state_list = await self.read_expect_success(endpoint=endpoint,
attribute=attributes.OperationalStateList)
defined_states = [state.value for state in cluster.Enums.OperationalStateEnum
Expand Down Expand Up @@ -464,7 +464,7 @@ async def TEST_TC_OPSTATE_BASE_2_1(self, endpoint=1):

# STEP 7: TH reads from the DUT the OperationalError attribute
self.step(7)
if self.attributes_guard(attributes.OperationalError):
if await self.attributes_guard(endpoint=endpoint, attribute=attributes.OperationalError):
operational_error = await self.read_expect_success(endpoint=endpoint,
attribute=attributes.OperationalError)
# Defined Errors
Expand Down Expand Up @@ -600,7 +600,7 @@ async def TEST_TC_OPSTATE_BASE_2_2(self, endpoint=1):

# STEP 3: TH reads from the DUT the OperationalStateList attribute
self.step(3)
if self.attributes_guard(attributes.OperationalStateList):
if await self.attributes_guard(endpoint=endpoint, attribute=attributes.OperationalStateList):
operational_state_list = await self.read_expect_success(endpoint=endpoint,
attribute=attributes.OperationalStateList)

Expand Down Expand Up @@ -628,15 +628,15 @@ async def TEST_TC_OPSTATE_BASE_2_2(self, endpoint=1):

# STEP 6: TH reads from the DUT the OperationalError attribute
self.step(6)
if self.attributes_guard(attributes.OperationalError):
if await self.attributes_guard(endpoint=endpoint, attribute=attributes.OperationalError):
await self.read_and_expect_property_value(endpoint=endpoint,
attribute=attributes.OperationalError,
attr_property="errorStateID",
expected_value=cluster.Enums.ErrorStateEnum.kNoError)

# STEP 7: TH reads from the DUT the CountdownTime attribute
self.step(7)
if self.attributes_guard(attributes.CountdownTime):
if await self.attributes_guard(endpoint=endpoint, attribute=attributes.CountdownTime):
initial_countdown_time = await self.read_expect_success(endpoint=endpoint,
attribute=attributes.CountdownTime)
if initial_countdown_time is not NullValue:
Expand All @@ -645,7 +645,7 @@ async def TEST_TC_OPSTATE_BASE_2_2(self, endpoint=1):

# STEP 8: TH reads from the DUT the PhaseList attribute
self.step(8)
if self.attributes_guard(attributes.PhaseList):
if await self.attributes_guard(endpoint=endpoint, attribute=attributes.PhaseList):
phase_list = await self.read_expect_success(endpoint=endpoint,
attribute=attributes.PhaseList)
phase_list_len = 0
Expand All @@ -656,7 +656,7 @@ async def TEST_TC_OPSTATE_BASE_2_2(self, endpoint=1):

# STEP 9: TH reads from the DUT the CurrentPhase attribute
self.step(9)
if self.attributes_guard(attributes.CurrentPhase):
if await self.attributes_guard(endpoint=endpoint, attribute=attributes.CurrentPhase):
current_phase = await self.read_expect_success(endpoint=endpoint,
attribute=attributes.CurrentPhase)
if (phase_list == NullValue) or (not phase_list):
Expand All @@ -669,12 +669,12 @@ async def TEST_TC_OPSTATE_BASE_2_2(self, endpoint=1):

# STEP 10: TH waits for {PIXIT.WAITTIME.COUNTDOWN}
self.step(10)
if self.attributes_guard(attributes.CountdownTime):
if await self.attributes_guard(endpoint=endpoint, attribute=attributes.CountdownTime):
time.sleep(wait_time)

# STEP 11: TH reads from the DUT the CountdownTime attribute
self.step(11)
if self.attributes_guard(attributes.CountdownTime):
if await self.attributes_guard(endpoint=endpoint, attribute=attributes.CountdownTime):
countdown_time = await self.read_expect_success(endpoint=endpoint,
attribute=attributes.CountdownTime)

Expand Down Expand Up @@ -815,7 +815,7 @@ async def TEST_TC_OPSTATE_BASE_2_3(self, endpoint=1):

# STEP 6: TH reads from the DUT the CountdownTime attribute
self.step(6)
if self.attributes_guard(attributes.CountdownTime):
if await self.attributes_guard(endpoint=endpoint, attribute=attributes.CountdownTime):
initial_countdown_time = await self.read_expect_success(endpoint=endpoint,
attribute=attributes.CountdownTime)
if initial_countdown_time is not NullValue:
Expand All @@ -829,7 +829,7 @@ async def TEST_TC_OPSTATE_BASE_2_3(self, endpoint=1):

# STEP 8: TH reads from the DUT the CountdownTime attribute
self.step(8)
if self.attributes_guard(attributes.CountdownTime):
if await self.attributes_guard(endpoint=endpoint, attribute=attributes.CountdownTime):
countdown_time = await self.read_expect_success(endpoint=endpoint,
attribute=attributes.CountdownTime)

Expand Down Expand Up @@ -1061,7 +1061,7 @@ async def TEST_TC_OPSTATE_BASE_2_5(self, endpoint=1):

# STEP 5: TH reads from the DUT the CountdownTime attribute
self.step(5)
if self.attributes_guard(attributes.CountdownTime):
if await self.attributes_guard(endpoint=endpoint, attribute=attributes.CountdownTime):
initial_countdown_time = await self.read_expect_success(endpoint=endpoint,
attribute=attributes.CountdownTime)

Expand Down
14 changes: 8 additions & 6 deletions src/python_testing/TC_RVCOPSTATE_2_1.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ def TC_RVCOPSTATE_2_1(self) -> list[str]:

@async_test_body
async def test_TC_RVCOPSTATE_2_1(self):
if self.matter_test_config.endpoint is None or self.matter_test_config.endpoint == 0:
asserts.fail("--endpoint must be set and not set to 0 for this test to run correctly.")
self.endpoint = self.matter_test_config.endpoint
asserts.assert_false(self.endpoint is None, "--endpoint <endpoint> must be included on the command line in.")
self.is_ci = self.check_pics("PICS_SDK_CI_ONLY")
Expand All @@ -103,7 +105,7 @@ async def test_TC_RVCOPSTATE_2_1(self):
if self.is_ci:
self.write_to_app_pipe({"Name": "Reset"})

if self.attributes_guard(attributes.PhaseList):
if await self.attributes_guard(endpoint=self.endpoint, attribute=attributes.PhaseList):
self.print_step(2, "Read PhaseList attribute")
phase_list = await self.read_mod_attribute_expect_success(endpoint=self.endpoint, attribute=attributes.PhaseList)

Expand All @@ -116,7 +118,7 @@ async def test_TC_RVCOPSTATE_2_1(self):

asserts.assert_less_equal(phase_list_len, 32, "PhaseList length(%d) must be less than 32!" % phase_list_len)

if self.attributes_guard(attributes.CurrentPhase):
if await self.attributes_guard(endpoint=self.endpoint, attribute=attributes.CurrentPhase):
self.print_step(3, "Read CurrentPhase attribute")
current_phase = await self.read_mod_attribute_expect_success(endpoint=self.endpoint, attribute=attributes.CurrentPhase)
logging.info("CurrentPhase: %s" % (current_phase))
Expand All @@ -127,7 +129,7 @@ async def test_TC_RVCOPSTATE_2_1(self):
asserts.assert_true(0 <= current_phase < phase_list_len,
"CurrentPhase(%s) must be between 0 and %d" % (current_phase, (phase_list_len - 1)))

if await self.attributes_guard(attributes.CountdownTime):
if await self.attributes_guard(endpoint=self.endpoint, attribute=attributes.CountdownTime):
self.print_step(4, "Read CountdownTime attribute")
countdown_time = await self.read_mod_attribute_expect_success(endpoint=self.endpoint,
attribute=attributes.CountdownTime)
Expand All @@ -137,7 +139,7 @@ async def test_TC_RVCOPSTATE_2_1(self):
asserts.assert_true(countdown_time >= 0 and countdown_time <= 259200,
"CountdownTime(%s) must be between 0 and 259200" % countdown_time)

if self.attributes_guard(attributes.OperationalStateList):
if await self.attributes_guard(endpoint=self.endpoint, attribute=attributes.OperationalStateList):
self.print_step(5, "Read OperationalStateList attribute")
operational_state_list = await self.read_mod_attribute_expect_success(endpoint=self.endpoint,
attribute=attributes.OperationalStateList)
Expand All @@ -160,7 +162,7 @@ async def test_TC_RVCOPSTATE_2_1(self):

asserts.assert_true(error_state_present, "The OperationalStateList does not have an ID entry of Error(0x03)")

if self.attributes_guard(attributes.OperationalState):
if await self.attributes_guard(endpoint=self.endpoint, attribute=attributes.OperationalState):
self.print_step(6, "Read OperationalState attribute")
operational_state = await self.read_mod_attribute_expect_success(endpoint=self.endpoint,
attribute=attributes.OperationalState)
Expand Down Expand Up @@ -227,7 +229,7 @@ async def test_TC_RVCOPSTATE_2_1(self):
self.wait_for_user_input(prompt_msg=f"{test_step}, and press Enter when done.\n")
await self.read_and_validate_opstate(step="6n", expected_state=Clusters.RvcOperationalState.Enums.OperationalStateEnum.kDocked)

if self.attributes_guard(attributes.OperationalError):
if await self.attributes_guard(endpoint=self.endpoint, attribute=attributes.OperationalError):
self.print_step(7, "Read OperationalError attribute")
operational_error = await self.read_mod_attribute_expect_success(endpoint=self.endpoint,
attribute=attributes.OperationalError)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1094,6 +1094,9 @@ def setup_class(self):
self.current_step_index = 0
self.step_start_time = datetime.now(timezone.utc)
self.step_skipped = False
self.global_wildcard = asyncio.wait_for(self.default_controller.Read(self.dut_node_id, [(Clusters.Descriptor), Attribute.AttributePath(None, None, GlobalAttributeIds.ATTRIBUTE_LIST_ID), Attribute.AttributePath(None, None, GlobalAttributeIds.FEATURE_MAP_ID), Attribute.AttributePath(None, None, GlobalAttributeIds.ACCEPTED_COMMAND_LIST_ID)]), timeout=60)
# self.stored_global_wildcard stores value of self.global_wildcard after first async call, appears needed in order to not timeout during commissioning
self.stored_global_wildcard = None

def setup_test(self):
self.current_step_index = 0
Expand Down Expand Up @@ -1415,7 +1418,7 @@ def pics_guard(self, pics_condition: bool):
self.mark_current_step_skipped()
return pics_condition

async def attributes_guard(self, attribute_condition: bool):
async def attributes_guard(self, endpoint: int, attribute: str):
"""Similar to pics_guard above, except checks a condition and if False marks the test step as skipped and
returns False using attributes against attributes_list, otherwise returns True.
For example can be used to check if a test step should be run:
Expand All @@ -1428,7 +1431,9 @@ async def attributes_guard(self, attribute_condition: bool):
if self.attribute_guard(condition2_needs_to_be_false_to_skip_step):
# skip step 2 if condition not met
"""
attr_condition = await asyncio.wait_for(should_run_test_on_endpoint(self, has_attribute(attribute_condition)), timeout=60)
if self.stored_global_wildcard == None:
self.stored_global_wildcard = await self.global_wildcard
attr_condition = _has_attribute(wildcard=self.stored_global_wildcard, endpoint=endpoint, attribute=attribute)
if not attr_condition:
self.mark_current_step_skipped()
return attr_condition
Expand Down

0 comments on commit 20bdc0c

Please sign in to comment.