diff --git a/src/python_testing/TC_OVENOPSTATE_2_6.py b/src/python_testing/TC_OVENOPSTATE_2_6.py new file mode 100644 index 00000000000000..09436f4e76cc0b --- /dev/null +++ b/src/python_testing/TC_OVENOPSTATE_2_6.py @@ -0,0 +1,62 @@ +# +# Copyright (c) 2024 Project CHIP Authors +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# See https://github.com/project-chip/connectedhomeip/blob/master/docs/testing/python.md#defining-the-ci-test-arguments +# for details about the block below. +# +# === BEGIN CI TEST ARGUMENTS === +# test-runner-runs: run1 +# test-runner-run/run1/app: ${ALL_CLUSTERS_APP} +# test-runner-run/run1/factoryreset: True +# test-runner-run/run1/quiet: True +# test-runner-run/run1/app-args: --discriminator 1234 --KVS kvs1 --trace-to json:${TRACE_APP}.json +# test-runner-run/run1/script-args: --endpoint 1 --int-arg PIXIT.WAITTIME.REBOOT:5 --storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --PICS src/app/tests/suites/certification/ci-pics-values --trace-to json:${TRACE_TEST_JSON}.json --trace-to perfetto:${TRACE_TEST_PERFETTO}.perfetto +# === END CI TEST ARGUMENTS === + + +import chip.clusters as Clusters +from matter_testing_support import MatterBaseTest, TestStep, async_test_body, default_matter_test_main +from TC_OpstateCommon import TC_OPSTATE_BASE, TestInfo + + +class TC_OVENOPSTATE_2_6(MatterBaseTest, TC_OPSTATE_BASE): + def __init__(self, *args): + super().__init__(*args) + + test_info = TestInfo( + pics_code="OVENOPSTATE", + cluster=Clusters.OvenCavityOperationalState + ) + + super().setup_base(test_info=test_info) + + def steps_TC_OVENOPSTATE_2_6(self) -> list[TestStep]: + return self.steps_TC_OPSTATE_BASE_2_6() + + def pics_TC_OVENOPSTATE_2_6(self) -> list[str]: + return ["OVENOPSTATE.S", "OVENOPSTATE.S.A0002"] + + @async_test_body + async def test_TC_OVENOPSTATE_2_6(self): + # endpoint = self.matter_test_config.endpoint + + # await self.TEST_TC_OPSTATE_BASE_2_6(endpoint=endpoint) + await self.TEST_TC_OPSTATE_BASE_2_6(endpoint=1) + + +if __name__ == "__main__": + default_matter_test_main() diff --git a/src/python_testing/TC_OpstateCommon.py b/src/python_testing/TC_OpstateCommon.py index ece50ac90c65b6..edc72f253eead5 100644 --- a/src/python_testing/TC_OpstateCommon.py +++ b/src/python_testing/TC_OpstateCommon.py @@ -1102,7 +1102,8 @@ async def TEST_TC_OPSTATE_BASE_2_5(self, endpoint=1): f"Completion event error code mismatched from expectation on endpoint {endpoint}.") if event_data.totalOperationalTime is not NullValue: - asserts.assert_less_equal(initial_countdown_time, event_data.totalOperationalTime, + time_diff = abs(initial_countdown_time - event_data.totalOperationalTime) + asserts.assert_less_equal(time_diff, 1, f"The total operation time shall be at least {initial_countdown_time:.1f}") asserts.assert_equal(0, event_data.pausedTime, @@ -1231,15 +1232,20 @@ def steps_TC_OPSTATE_BASE_2_6(self) -> list[TestStep]: TestStep(2, "Subscribe to CountdownTime attribute"), TestStep(3, "Manually put the DUT into a state where it will use the CountdownTime attribute, " "the initial value of the CountdownTime is greater than 30, " - "and it will begin counting down the CountdownTime attribute."), - TestStep(4, "Over a period of 30 seconds, TH counts all report transactions with an attribute " + "and it will begin counting down the CountdownTime attribute. " + "Test harness reads the CountdownTime attribute."), + TestStep(4, "Test harness reads the CountdownTime attribute."), + TestStep(5, "Over a period of 30 seconds, TH counts all report transactions with an attribute " "report for the CountdownTime attribute in numberOfReportsReceived"), - TestStep(5, "Until the current operation finishes, TH counts all report transactions with " + TestStep(6, "Test harness reads the CountdownTime attribute."), + TestStep(7, "Until the current operation finishes, TH counts all report transactions with " "an attribute report for the CountdownTime attribute in numberOfReportsReceived and saves up to 5 such reports."), - TestStep(6, "Manually put the DUT into a state where it will use the CountdownTime attribute, " - "the initial value of the CountdownTime is greater than 30, and it will begin counting down the CountdownTime attribute."), - TestStep(7, "TH reads from the DUT the OperationalState attribute"), - TestStep(8, "Manually put the device in the Paused(0x02) operational state") + TestStep(8, "Manually put the DUT into a state where it will use the CountdownTime attribute, " + "the initial value of the CountdownTime is greater than 30, and it will begin counting down the CountdownTime attribute." + "Test harness reads the CountdownTime attribute."), + TestStep(9, "TH reads from the DUT the OperationalState attribute"), + TestStep(10, "Test harness reads the CountdownTime attribute."), + TestStep(11, "Manually put the device in the Paused(0x02) operational state") ] return steps @@ -1267,26 +1273,35 @@ async def TEST_TC_OPSTATE_BASE_2_6(self, endpoint=1): await self.read_and_expect_value(endpoint=endpoint, attribute=attributes.OperationalState, expected_value=cluster.Enums.OperationalStateEnum.kRunning) - count = sub_handler.attribute_report_counts[attributes.CountdownTime] - asserts.assert_greater(count, 0, "Did not receive any reports for CountdownTime") + countdownTime = await self.read_expect_success(endpoint=endpoint, attribute=attributes.CountdownTime) + if countdownTime is not NullValue: + count = sub_handler.attribute_report_counts[attributes.CountdownTime] + asserts.assert_greater(count, 0, "Did not receive any reports for CountdownTime") else: self.skip_step(3) sub_handler.reset() self.step(4) - logging.info('Test will now collect data for 30 seconds') - time.sleep(30) + countdownTime = await self.read_expect_success(endpoint=endpoint, attribute=attributes.CountdownTime) + if countdownTime is not NullValue: + self.step(5) + logging.info('Test will now collect data for 10 seconds') + time.sleep(10) - count = sub_handler.attribute_report_counts[attributes.CountdownTime] - sub_handler.reset() - asserts.assert_less_equal(count, 5, "Received more than 5 reports for CountdownTime") - asserts.assert_greater_equal(count, 0, "Did not receive any reports for CountdownTime") + count = sub_handler.attribute_report_counts[attributes.CountdownTime] + sub_handler.reset() + asserts.assert_less_equal(count, 5, "Received more than 5 reports for CountdownTime") + asserts.assert_greater_equal(count, 0, "Did not receive any reports for CountdownTime") + else: + self.skip_step(5) + self.step(6) + countdownTime = await self.read_expect_success(endpoint=endpoint, attribute=attributes.CountdownTime) attr_value = await self.read_expect_success( endpoint=endpoint, attribute=attributes.OperationalState) - if attr_value == cluster.Enums.OperationalStateEnum.kRunning: - self.step(5) + if attr_value == cluster.Enums.OperationalStateEnum.kRunning and countdownTime is not NullValue: + self.step(7) wait_count = 0 while (attr_value != cluster.Enums.OperationalStateEnum.kStopped) and (wait_count < 20): time.sleep(1) @@ -1298,11 +1313,11 @@ async def TEST_TC_OPSTATE_BASE_2_6(self, endpoint=1): asserts.assert_less_equal(count, 5, "Received more than 5 reports for CountdownTime") asserts.assert_greater(count, 0, "Did not receive any reports for CountdownTime") else: - self.skip_step(5) + self.skip_step(7) sub_handler.reset() - self.step(6) if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.M.ST_RUNNING")): + self.step(8) self.send_manual_or_pipe_command(name="OperationalStateChange", device=self.device, operation="Start") @@ -1310,19 +1325,23 @@ async def TEST_TC_OPSTATE_BASE_2_6(self, endpoint=1): await self.read_and_expect_value(endpoint=endpoint, attribute=attributes.OperationalState, expected_value=cluster.Enums.OperationalStateEnum.kRunning) - count = sub_handler.attribute_report_counts[attributes.CountdownTime] - asserts.assert_greater(count, 0, "Did not receive any reports for CountdownTime") + countdownTime = await self.read_expect_success(endpoint=endpoint, attribute=attributes.CountdownTime) + if countdownTime is not NullValue: + count = sub_handler.attribute_report_counts[attributes.CountdownTime] + asserts.assert_greater(count, 0, "Did not receive any reports for CountdownTime") else: - self.skip_step(6) + self.skip_step(8) - self.step(7) + self.step(9) await self.read_and_expect_value(endpoint=endpoint, attribute=attributes.OperationalState, expected_value=cluster.Enums.OperationalStateEnum.kRunning) sub_handler.reset() - self.step(8) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.M.ST_PAUSED")): + self.step(10) + countdownTime = await self.read_expect_success(endpoint=endpoint, attribute=attributes.CountdownTime) + if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.M.ST_PAUSED")) and countdownTime is not NullValue: + self.step(11) self.send_manual_or_pipe_command(name="OperationalStateChange", device=self.device, operation="Pause") @@ -1330,4 +1349,4 @@ async def TEST_TC_OPSTATE_BASE_2_6(self, endpoint=1): count = sub_handler.attribute_report_counts[attributes.CountdownTime] asserts.assert_greater(count, 0, "Did not receive any reports for CountdownTime") else: - self.skip_step(8) + self.skip_step(11)