Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

python test framework: PICS 2.0 #34384

Merged
merged 13 commits into from
Jul 27, 2024
6 changes: 6 additions & 0 deletions scripts/py_matter_yamltests/matter_yamltests/hooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,12 @@ def show_prompt(self,
"""
pass

def test_skipped(self, filename: str, name: str):
"""
This method is called when the test script determines that the test is not applicable for the DUT.
"""
pass


class WebSocketRunnerHooks():
def connecting(self, url: str):
Expand Down
55 changes: 24 additions & 31 deletions src/python_testing/TC_TIMESYNC_2_1.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,53 +32,46 @@

import chip.clusters as Clusters
from chip.clusters.Types import NullValue
from matter_testing_support import MatterBaseTest, async_test_body, default_matter_test_main, utc_time_in_matter_epoch
from matter_testing_support import (MatterBaseTest, default_matter_test_main, has_attribute, has_cluster, per_endpoint_test,
utc_time_in_matter_epoch)
from mobly import asserts


class TC_TIMESYNC_2_1(MatterBaseTest):
async def read_ts_attribute_expect_success(self, endpoint, attribute):
async def read_ts_attribute_expect_success(self, attribute):
cluster = Clusters.Objects.TimeSynchronization
return await self.read_single_attribute_check_success(endpoint=endpoint, cluster=cluster, attribute=attribute)
return await self.read_single_attribute_check_success(endpoint=None, cluster=cluster, attribute=attribute)

def pics_TC_TIMESYNC_2_1(self) -> list[str]:
return ["TIMESYNC.S"]

@async_test_body
@per_endpoint_test(has_cluster(Clusters.TimeSynchronization) and has_attribute(Clusters.TimeSynchronization.Attributes.TimeSource))
async def test_TC_TIMESYNC_2_1(self):
endpoint = 0

features = await self.read_single_attribute(dev_ctrl=self.default_controller, node_id=self.dut_node_id,
endpoint=endpoint, attribute=Clusters.TimeSynchronization.Attributes.FeatureMap)
attributes = Clusters.TimeSynchronization.Attributes
features = await self.read_ts_attribute_expect_success(attribute=attributes.FeatureMap)

self.supports_time_zone = bool(features & Clusters.TimeSynchronization.Bitmaps.Feature.kTimeZone)
self.supports_ntpc = bool(features & Clusters.TimeSynchronization.Bitmaps.Feature.kNTPClient)
self.supports_ntps = bool(features & Clusters.TimeSynchronization.Bitmaps.Feature.kNTPServer)
self.supports_trusted_time_source = bool(features & Clusters.TimeSynchronization.Bitmaps.Feature.kTimeSyncClient)

time_cluster = Clusters.TimeSynchronization
timesync_attr_list = time_cluster.Attributes.AttributeList
attribute_list = await self.read_single_attribute_check_success(endpoint=endpoint, cluster=time_cluster, attribute=timesync_attr_list)
timesource_attr_id = time_cluster.Attributes.TimeSource.attribute_id
timesync_attr_list = attributes.AttributeList
attribute_list = await self.read_ts_attribute_expect_success(attribute=timesync_attr_list)
timesource_attr_id = attributes.TimeSource.attribute_id

self.print_step(1, "Commissioning, already done")
attributes = Clusters.TimeSynchronization.Attributes

self.print_step(2, "Read Granularity attribute")
granularity_dut = await self.read_ts_attribute_expect_success(endpoint=endpoint, attribute=attributes.Granularity)
granularity_dut = await self.read_ts_attribute_expect_success(attribute=attributes.Granularity)
asserts.assert_less(granularity_dut, Clusters.TimeSynchronization.Enums.GranularityEnum.kUnknownEnumValue,
"Granularity is not in valid range")

self.print_step(3, "Read TimeSource")
if timesource_attr_id in attribute_list:
time_source = await self.read_ts_attribute_expect_success(endpoint=endpoint, attribute=attributes.TimeSource)
time_source = await self.read_ts_attribute_expect_success(attribute=attributes.TimeSource)
asserts.assert_less(time_source, Clusters.TimeSynchronization.Enums.TimeSourceEnum.kUnknownEnumValue,
"TimeSource is not in valid range")

self.print_step(4, "Read TrustedTimeSource")
if self.supports_trusted_time_source:
trusted_time_source = await self.read_ts_attribute_expect_success(endpoint=endpoint,
attribute=attributes.TrustedTimeSource)
trusted_time_source = await self.read_ts_attribute_expect_success(attribute=attributes.TrustedTimeSource)
if trusted_time_source is not NullValue:
asserts.assert_less_equal(trusted_time_source.fabricIndex, 0xFE,
"FabricIndex for the TrustedTimeSource is out of range")
Expand All @@ -87,7 +80,7 @@ async def test_TC_TIMESYNC_2_1(self):

self.print_step(5, "Read DefaultNTP")
if self.supports_ntpc:
default_ntp = await self.read_ts_attribute_expect_success(endpoint=endpoint, attribute=attributes.DefaultNTP)
default_ntp = await self.read_ts_attribute_expect_success(attribute=attributes.DefaultNTP)
if default_ntp is not NullValue:
asserts.assert_less_equal(len(default_ntp), 128, "DefaultNTP length must be less than 128")
# Assume this is a valid web address if it has at least one . in the name
Expand All @@ -102,7 +95,7 @@ async def test_TC_TIMESYNC_2_1(self):

self.print_step(6, "Read TimeZone")
if self.supports_time_zone:
tz_dut = await self.read_ts_attribute_expect_success(endpoint=endpoint, attribute=attributes.TimeZone)
tz_dut = await self.read_ts_attribute_expect_success(attribute=attributes.TimeZone)
asserts.assert_greater_equal(len(tz_dut), 1, "TimeZone must have at least one entry in the list")
asserts.assert_less_equal(len(tz_dut), 2, "TimeZone may have a maximum of two entries in the list")
for entry in tz_dut:
Expand All @@ -117,7 +110,7 @@ async def test_TC_TIMESYNC_2_1(self):

self.print_step(7, "Read DSTOffset")
if self.supports_time_zone:
dst_dut = await self.read_ts_attribute_expect_success(endpoint=endpoint, attribute=attributes.DSTOffset)
dst_dut = await self.read_ts_attribute_expect_success(attribute=attributes.DSTOffset)
last_valid_until = -1
last_valid_starting = -1
for dst in dst_dut:
Expand All @@ -131,7 +124,7 @@ async def test_TC_TIMESYNC_2_1(self):
asserts.assert_equal(dst, dst_dut[-1], "DSTOffset list must have Null ValidUntil at the end")

self.print_step(8, "Read UTCTime")
utc_dut = await self.read_ts_attribute_expect_success(endpoint=endpoint, attribute=attributes.UTCTime)
utc_dut = await self.read_ts_attribute_expect_success(attribute=attributes.UTCTime)
if utc_dut is NullValue:
asserts.assert_equal(granularity_dut, Clusters.TimeSynchronization.Enums.GranularityEnum.kNoTimeGranularity)
else:
Expand All @@ -146,8 +139,8 @@ async def test_TC_TIMESYNC_2_1(self):

self.print_step(9, "Read LocalTime")
if self.supports_time_zone:
utc_dut = await self.read_ts_attribute_expect_success(endpoint=endpoint, attribute=attributes.UTCTime)
local_dut = await self.read_ts_attribute_expect_success(endpoint=endpoint, attribute=attributes.LocalTime)
utc_dut = await self.read_ts_attribute_expect_success(attribute=attributes.UTCTime)
local_dut = await self.read_ts_attribute_expect_success(attribute=attributes.LocalTime)
if utc_dut is NullValue:
asserts.assert_true(local_dut is NullValue, "LocalTime must be Null if UTC time is Null")
elif len(dst_dut) == 0:
Expand All @@ -161,30 +154,30 @@ async def test_TC_TIMESYNC_2_1(self):

self.print_step(10, "Read TimeZoneDatabase")
if self.supports_time_zone:
tz_db_dut = await self.read_ts_attribute_expect_success(endpoint=endpoint, attribute=attributes.TimeZoneDatabase)
tz_db_dut = await self.read_ts_attribute_expect_success(attribute=attributes.TimeZoneDatabase)
asserts.assert_less(tz_db_dut, Clusters.TimeSynchronization.Enums.TimeZoneDatabaseEnum.kUnknownEnumValue,
"TimeZoneDatabase is not in valid range")

self.print_step(11, "Read NTPServerAvailable")
if self.supports_ntps:
# bool typechecking happens in the test read functions, so all we need to do here is do the read
await self.read_ts_attribute_expect_success(endpoint=endpoint, attribute=attributes.NTPServerAvailable)
await self.read_ts_attribute_expect_success(attribute=attributes.NTPServerAvailable)

self.print_step(12, "Read TimeZoneListMaxSize")
if self.supports_time_zone:
size = await self.read_ts_attribute_expect_success(endpoint=endpoint, attribute=attributes.TimeZoneListMaxSize)
size = await self.read_ts_attribute_expect_success(attribute=attributes.TimeZoneListMaxSize)
asserts.assert_greater_equal(size, 1, "TimeZoneListMaxSize must be at least 1")
asserts.assert_less_equal(size, 2, "TimeZoneListMaxSize must be max 2")

self.print_step(13, "Read DSTOffsetListMaxSize")
if self.supports_time_zone:
size = await self.read_ts_attribute_expect_success(endpoint=endpoint, attribute=attributes.DSTOffsetListMaxSize)
size = await self.read_ts_attribute_expect_success(attribute=attributes.DSTOffsetListMaxSize)
asserts.assert_greater_equal(size, 1, "DSTOffsetListMaxSize must be at least 1")

self.print_step(14, "Read SupportsDNSResolve")
# bool typechecking happens in the test read functions, so all we need to do here is do the read
if self.supports_ntpc:
await self.read_ts_attribute_expect_success(endpoint=endpoint, attribute=attributes.SupportsDNSResolve)
await self.read_ts_attribute_expect_success(attribute=attributes.SupportsDNSResolve)


if __name__ == "__main__":
Expand Down
Loading
Loading