Skip to content

Commit

Permalink
[OVENOPSTATE] Add OvenOpState 2.6 test case from base (second try) (#…
Browse files Browse the repository at this point in the history
…34783)

* Add OvenOpState 2.6

* Added steps to me TP easier to write

* Reduce sleep, fix checks

* Changed test
  • Loading branch information
rbultman authored Aug 8, 2024
1 parent 0370489 commit b1c49ab
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 27 deletions.
62 changes: 62 additions & 0 deletions src/python_testing/TC_OVENOPSTATE_2_6.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
#
# Copyright (c) 2024 Project CHIP Authors
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# See https://github.com/project-chip/connectedhomeip/blob/master/docs/testing/python.md#defining-the-ci-test-arguments
# for details about the block below.
#
# === BEGIN CI TEST ARGUMENTS ===
# test-runner-runs: run1
# test-runner-run/run1/app: ${ALL_CLUSTERS_APP}
# test-runner-run/run1/factoryreset: True
# test-runner-run/run1/quiet: True
# test-runner-run/run1/app-args: --discriminator 1234 --KVS kvs1 --trace-to json:${TRACE_APP}.json
# test-runner-run/run1/script-args: --endpoint 1 --int-arg PIXIT.WAITTIME.REBOOT:5 --storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --PICS src/app/tests/suites/certification/ci-pics-values --trace-to json:${TRACE_TEST_JSON}.json --trace-to perfetto:${TRACE_TEST_PERFETTO}.perfetto
# === END CI TEST ARGUMENTS ===


import chip.clusters as Clusters
from matter_testing_support import MatterBaseTest, TestStep, async_test_body, default_matter_test_main
from TC_OpstateCommon import TC_OPSTATE_BASE, TestInfo


class TC_OVENOPSTATE_2_6(MatterBaseTest, TC_OPSTATE_BASE):
def __init__(self, *args):
super().__init__(*args)

test_info = TestInfo(
pics_code="OVENOPSTATE",
cluster=Clusters.OvenCavityOperationalState
)

super().setup_base(test_info=test_info)

def steps_TC_OVENOPSTATE_2_6(self) -> list[TestStep]:
return self.steps_TC_OPSTATE_BASE_2_6()

def pics_TC_OVENOPSTATE_2_6(self) -> list[str]:
return ["OVENOPSTATE.S", "OVENOPSTATE.S.A0002"]

@async_test_body
async def test_TC_OVENOPSTATE_2_6(self):
# endpoint = self.matter_test_config.endpoint

# await self.TEST_TC_OPSTATE_BASE_2_6(endpoint=endpoint)
await self.TEST_TC_OPSTATE_BASE_2_6(endpoint=1)


if __name__ == "__main__":
default_matter_test_main()
73 changes: 46 additions & 27 deletions src/python_testing/TC_OpstateCommon.py
Original file line number Diff line number Diff line change
Expand Up @@ -1102,7 +1102,8 @@ async def TEST_TC_OPSTATE_BASE_2_5(self, endpoint=1):
f"Completion event error code mismatched from expectation on endpoint {endpoint}.")

if event_data.totalOperationalTime is not NullValue:
asserts.assert_less_equal(initial_countdown_time, event_data.totalOperationalTime,
time_diff = abs(initial_countdown_time - event_data.totalOperationalTime)
asserts.assert_less_equal(time_diff, 1,
f"The total operation time shall be at least {initial_countdown_time:.1f}")

asserts.assert_equal(0, event_data.pausedTime,
Expand Down Expand Up @@ -1231,15 +1232,20 @@ def steps_TC_OPSTATE_BASE_2_6(self) -> list[TestStep]:
TestStep(2, "Subscribe to CountdownTime attribute"),
TestStep(3, "Manually put the DUT into a state where it will use the CountdownTime attribute, "
"the initial value of the CountdownTime is greater than 30, "
"and it will begin counting down the CountdownTime attribute."),
TestStep(4, "Over a period of 30 seconds, TH counts all report transactions with an attribute "
"and it will begin counting down the CountdownTime attribute. "
"Test harness reads the CountdownTime attribute."),
TestStep(4, "Test harness reads the CountdownTime attribute."),
TestStep(5, "Over a period of 30 seconds, TH counts all report transactions with an attribute "
"report for the CountdownTime attribute in numberOfReportsReceived"),
TestStep(5, "Until the current operation finishes, TH counts all report transactions with "
TestStep(6, "Test harness reads the CountdownTime attribute."),
TestStep(7, "Until the current operation finishes, TH counts all report transactions with "
"an attribute report for the CountdownTime attribute in numberOfReportsReceived and saves up to 5 such reports."),
TestStep(6, "Manually put the DUT into a state where it will use the CountdownTime attribute, "
"the initial value of the CountdownTime is greater than 30, and it will begin counting down the CountdownTime attribute."),
TestStep(7, "TH reads from the DUT the OperationalState attribute"),
TestStep(8, "Manually put the device in the Paused(0x02) operational state")
TestStep(8, "Manually put the DUT into a state where it will use the CountdownTime attribute, "
"the initial value of the CountdownTime is greater than 30, and it will begin counting down the CountdownTime attribute."
"Test harness reads the CountdownTime attribute."),
TestStep(9, "TH reads from the DUT the OperationalState attribute"),
TestStep(10, "Test harness reads the CountdownTime attribute."),
TestStep(11, "Manually put the device in the Paused(0x02) operational state")
]
return steps

Expand Down Expand Up @@ -1267,26 +1273,35 @@ async def TEST_TC_OPSTATE_BASE_2_6(self, endpoint=1):
await self.read_and_expect_value(endpoint=endpoint,
attribute=attributes.OperationalState,
expected_value=cluster.Enums.OperationalStateEnum.kRunning)
count = sub_handler.attribute_report_counts[attributes.CountdownTime]
asserts.assert_greater(count, 0, "Did not receive any reports for CountdownTime")
countdownTime = await self.read_expect_success(endpoint=endpoint, attribute=attributes.CountdownTime)
if countdownTime is not NullValue:
count = sub_handler.attribute_report_counts[attributes.CountdownTime]
asserts.assert_greater(count, 0, "Did not receive any reports for CountdownTime")
else:
self.skip_step(3)

sub_handler.reset()
self.step(4)
logging.info('Test will now collect data for 30 seconds')
time.sleep(30)
countdownTime = await self.read_expect_success(endpoint=endpoint, attribute=attributes.CountdownTime)
if countdownTime is not NullValue:
self.step(5)
logging.info('Test will now collect data for 10 seconds')
time.sleep(10)

count = sub_handler.attribute_report_counts[attributes.CountdownTime]
sub_handler.reset()
asserts.assert_less_equal(count, 5, "Received more than 5 reports for CountdownTime")
asserts.assert_greater_equal(count, 0, "Did not receive any reports for CountdownTime")
count = sub_handler.attribute_report_counts[attributes.CountdownTime]
sub_handler.reset()
asserts.assert_less_equal(count, 5, "Received more than 5 reports for CountdownTime")
asserts.assert_greater_equal(count, 0, "Did not receive any reports for CountdownTime")
else:
self.skip_step(5)

self.step(6)
countdownTime = await self.read_expect_success(endpoint=endpoint, attribute=attributes.CountdownTime)
attr_value = await self.read_expect_success(
endpoint=endpoint,
attribute=attributes.OperationalState)
if attr_value == cluster.Enums.OperationalStateEnum.kRunning:
self.step(5)
if attr_value == cluster.Enums.OperationalStateEnum.kRunning and countdownTime is not NullValue:
self.step(7)
wait_count = 0
while (attr_value != cluster.Enums.OperationalStateEnum.kStopped) and (wait_count < 20):
time.sleep(1)
Expand All @@ -1298,36 +1313,40 @@ async def TEST_TC_OPSTATE_BASE_2_6(self, endpoint=1):
asserts.assert_less_equal(count, 5, "Received more than 5 reports for CountdownTime")
asserts.assert_greater(count, 0, "Did not receive any reports for CountdownTime")
else:
self.skip_step(5)
self.skip_step(7)

sub_handler.reset()
self.step(6)
if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.M.ST_RUNNING")):
self.step(8)
self.send_manual_or_pipe_command(name="OperationalStateChange",
device=self.device,
operation="Start")
time.sleep(1)
await self.read_and_expect_value(endpoint=endpoint,
attribute=attributes.OperationalState,
expected_value=cluster.Enums.OperationalStateEnum.kRunning)
count = sub_handler.attribute_report_counts[attributes.CountdownTime]
asserts.assert_greater(count, 0, "Did not receive any reports for CountdownTime")
countdownTime = await self.read_expect_success(endpoint=endpoint, attribute=attributes.CountdownTime)
if countdownTime is not NullValue:
count = sub_handler.attribute_report_counts[attributes.CountdownTime]
asserts.assert_greater(count, 0, "Did not receive any reports for CountdownTime")
else:
self.skip_step(6)
self.skip_step(8)

self.step(7)
self.step(9)
await self.read_and_expect_value(endpoint=endpoint,
attribute=attributes.OperationalState,
expected_value=cluster.Enums.OperationalStateEnum.kRunning)

sub_handler.reset()
self.step(8)
if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.M.ST_PAUSED")):
self.step(10)
countdownTime = await self.read_expect_success(endpoint=endpoint, attribute=attributes.CountdownTime)
if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.M.ST_PAUSED")) and countdownTime is not NullValue:
self.step(11)
self.send_manual_or_pipe_command(name="OperationalStateChange",
device=self.device,
operation="Pause")
time.sleep(1)
count = sub_handler.attribute_report_counts[attributes.CountdownTime]
asserts.assert_greater(count, 0, "Did not receive any reports for CountdownTime")
else:
self.skip_step(8)
self.skip_step(11)

0 comments on commit b1c49ab

Please sign in to comment.