Skip to content

Commit

Permalink
TC-SM-1.2: Implement parts list topology test (#28871)
Browse files Browse the repository at this point in the history
* TC-SM-1.2: Implement parts list topology test

* Address review comments
  • Loading branch information
cecille authored and pull[bot] committed Oct 4, 2023
1 parent d934879 commit 1537954
Show file tree
Hide file tree
Showing 2 changed files with 247 additions and 2 deletions.
137 changes: 135 additions & 2 deletions src/python_testing/TC_DeviceBasicComposition.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,81 @@ def check_non_empty_list_of_ints_in_range(min_value: int, max_value: int, max_si
return check_list_of_ints_in_range(min_value, max_value, min_size=1, max_size=max_size, allow_null=allow_null)


def separate_endpoint_types(endpoint_dict: dict[int, Any]) -> tuple[list[int], list[int]]:
"""Returns a tuple containing the list of flat endpoints and a list of tree endpoints"""
flat = []
tree = []
for endpoint_id, endpoint in endpoint_dict.items():
if endpoint_id == 0:
continue
aggregator_id = 0x000e
device_types = [d.deviceType for d in endpoint[Clusters.Descriptor][Clusters.Descriptor.Attributes.DeviceTypeList]]
if aggregator_id in device_types:
flat.append(endpoint_id)
else:
tree.append(endpoint_id)
return (flat, tree)


def get_all_children(endpoint_id, endpoint_dict: dict[int, Any]) -> set[int]:
"""Returns all the children (include subchildren) of the given endpoint
This assumes we've already checked that there are no cycles, so we can do the dumb things and just trace the tree
"""
children = set()

def add_children(endpoint_id, children):
immediate_children = endpoint_dict[endpoint_id][Clusters.Descriptor][Clusters.Descriptor.Attributes.PartsList]
if not immediate_children:
return
children.update(set(immediate_children))
for child in immediate_children:
add_children(child, children)

add_children(endpoint_id, children)
return children


def find_tree_roots(tree_endpoints: list[int], endpoint_dict: dict[int, Any]) -> set[int]:
"""Returns a set of all the endpoints in tree_endpoints that are roots for a tree (not include singletons)"""
tree_roots = set()

def find_tree_root(current_id):
for endpoint_id, endpoint in endpoint_dict.items():
if endpoint_id not in tree_endpoints:
continue
if current_id in endpoint[Clusters.Descriptor][Clusters.Descriptor.Attributes.PartsList]:
# this is not the root, move up
return find_tree_root(endpoint_id)
return current_id

for endpoint_id in tree_endpoints:
root = find_tree_root(endpoint_id)
if root != endpoint_id:
tree_roots.add(root)
return tree_roots


def parts_list_cycles(tree_endpoints: list[int], endpoint_dict: dict[int, Any]) -> list[int]:
"""Returns a list of all the endpoints in the tree_endpoints list that contain cycles"""
def parts_list_cycle_detect(visited: set, current_id: int) -> bool:
if current_id in visited:
return True
visited.add(current_id)
for child in endpoint_dict[current_id][Clusters.Descriptor][Clusters.Descriptor.Attributes.PartsList]:
child_has_cycles = parts_list_cycle_detect(visited, child)
if child_has_cycles:
return True
return False

cycles = []
# This is quick enough that we can do all the endpoints wihtout searching for the roots
for endpoint_id in tree_endpoints:
visited = set()
if parts_list_cycle_detect(visited, endpoint_id):
cycles.append(endpoint_id)
return cycles


class TC_DeviceBasicComposition(MatterBaseTest):
@async_test_body
async def setup_class(self):
Expand Down Expand Up @@ -425,8 +500,66 @@ def test_all_endpoints_have_valid_composition(self):
asserts.skip(
"TODO: Make a test that verifies each endpoint has valid set of device types, and that the device type conformance is respected for each")

def test_topology_is_valid(self):
asserts.skip("TODO: Make a test that verifies each endpoint only lists direct descendants, except Root Node and Aggregator endpoints that list all their descendants")
def test_TC_SM_1_2(self):
self.print_step(1, "Wildcard read of device - already done")

self.print_step(2, "Verify the Descriptor cluster PartsList on endpoint 0 exactly lists all the other (non-0) endpoints on the DUT")
parts_list_0 = self.endpoints[0][Clusters.Descriptor][Clusters.Descriptor.Attributes.PartsList]
cluster_id = Clusters.Descriptor.id
attribute_id = Clusters.Descriptor.Attributes.PartsList.attribute_id
location = AttributePathLocation(endpoint_id=0, cluster_id=cluster_id, attribute_id=attribute_id)
if len(self.endpoints.keys()) != len(set(self.endpoints.keys())):
self.record_error(self.get_test_name(), location=location,
problem='duplicate endpoint ids found in the returned data', spec_location="PartsList Attribute")
self.fail_current_test()

if len(parts_list_0) != len(set(parts_list_0)):
self.record_error(self.get_test_name(), location=location,
problem='Duplicate endpoint ids found in the parts list on ep0', spec_location="PartsList Attribute")
self.fail_current_test()

expected_parts = set(self.endpoints.keys())
expected_parts.remove(0)
if set(parts_list_0) != expected_parts:
self.record_error(self.get_test_name(), location=location,
problem='EP0 Descriptor parts list does not match the set of returned endpoints', spec_location="PartsList Attribute")
self.fail_current_test()

self.print_step(
3, "For each endpoint on the DUT (including EP 0), verify the PartsList in the Descriptor cluster on that endpoint does not include itself")
for endpoint_id, endpoint in self.endpoints.items():
if endpoint_id in endpoint[Clusters.Descriptor][Clusters.Descriptor.Attributes.PartsList]:
location = AttributePathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id, attribute_id=attribute_id)
self.record_error(self.get_test_name(), location=location,
problem=f"Endpoint {endpoint_id} parts list includes itself", spec_location="PartsList Attribute")
self.fail_current_test()

self.print_step(4, "Separate endpoints into flat and tree style")
flat, tree = separate_endpoint_types(self.endpoints)

self.print_step(5, "Check for cycles in the tree endpoints")
cycles = parts_list_cycles(tree, self.endpoints)
if len(cycles) != 0:
for id in cycles:
location = AttributePathLocation(endpoint_id=id, cluster_id=cluster_id, attribute_id=attribute_id)
self.record_error(self.get_test_name(), location=location,
problem=f"Endpoint {id} parts list includes a cycle", spec_location="PartsList Attribute")
self.fail_current_test()

self.print_step(6, "Check flat lists include all sub ids")
ok = True
for endpoint_id in flat:
# ensure that every sub-id in the parts list is included in the parent
sub_children = []
for child in self.endpoints[endpoint_id][Clusters.Descriptor][Clusters.Descriptor.Attributes.PartsList]:
sub_children.update(get_all_children(child))
if not all(item in sub_children for item in self.endpoints[endpoint_id][Clusters.Descriptor][Clusters.Descriptor.Attributes.PartsList]):
location = AttributePathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id, attribute_id=attribute_id)
self.record_error(self.get_test_name(), location=location,
problem='Flat parts list does not include all the sub-parts', spec_location='Endpoint composition')
ok = False
if not ok:
self.fail_current_test()

def test_TC_PS_3_1(self):
BRIDGED_NODE_DEVICE_TYPE_ID = 0x13
Expand Down
112 changes: 112 additions & 0 deletions src/python_testing/TestMatterTestingSupport.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from matter_testing_support import (MatterBaseTest, async_test_body, compare_time, default_matter_test_main,
get_wait_seconds_from_set_time, parse_pics, type_matches, utc_time_in_matter_epoch)
from mobly import asserts, signals
from TC_DeviceBasicComposition import find_tree_roots, get_all_children, parts_list_cycles, separate_endpoint_types


def get_raw_type_list():
Expand Down Expand Up @@ -200,6 +201,117 @@ def test_get_wait_time_function(self):
secs = get_wait_seconds_from_set_time(th_utc, 15)
asserts.assert_equal(secs, 14)

def create_example_topology(self):
"""Creates a limited example of a wildcard read that contains only the descriptor cluster parts list and device types"""
def create_endpoint(parts_list: list[uint], device_types: list[uint]):
endpoint = {}
device_types_structs = []
for device_type in device_types:
device_types_structs.append(Clusters.Descriptor.Structs.DeviceTypeStruct(deviceType=device_type, revision=1))
endpoint[Clusters.Descriptor] = {Clusters.Descriptor.Attributes.PartsList: parts_list,
Clusters.Descriptor.Attributes.DeviceTypeList: device_types_structs}
return endpoint

endpoints = {}
# Root node is 0
# We have two trees in the root node and two trees in the aggregator
# 2 - 1
# - 3 - 4
# - 5 - 9
# 6 - 7
# - 8
# 10
# 11 (aggregator - all remaining are under it)
# 13 - 12
# - 14 - 15
# - 16
# 17 - 18
# - 19
# 20
# 21
endpoints[0] = create_endpoint([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21], [22])
endpoints[1] = create_endpoint([], [1]) # Just using a random device id, as long as it's not the aggregator it's fine
endpoints[2] = create_endpoint([1, 3], [1])
endpoints[3] = create_endpoint([4, 5], [1])
endpoints[4] = create_endpoint([], [1])
endpoints[5] = create_endpoint([9], [1])
endpoints[6] = create_endpoint([7, 8], [1])
endpoints[7] = create_endpoint([], [1])
endpoints[8] = create_endpoint([], [1])
endpoints[9] = create_endpoint([], [1])
endpoints[10] = create_endpoint([], [1])
endpoints[11] = create_endpoint([12, 13, 14, 15, 16, 17, 18, 19, 20, 21], [0xe]) # aggregator device type
endpoints[12] = create_endpoint([], [1])
endpoints[13] = create_endpoint([12, 14], [1])
endpoints[14] = create_endpoint([15, 16], [1])
endpoints[15] = create_endpoint([], [1])
endpoints[16] = create_endpoint([], [1])
endpoints[17] = create_endpoint([18, 19], [1])
endpoints[18] = create_endpoint([], [1])
endpoints[19] = create_endpoint([], [1])
endpoints[20] = create_endpoint([], [1])
endpoints[21] = create_endpoint([], [1])

return endpoints

def test_cycle_detection_and_splitting(self):
# Example topology has no cycles
endpoints = self.create_example_topology()
flat, tree = separate_endpoint_types(endpoints)
asserts.assert_equal(len(flat), len(set(flat)), "Duplicate endpoints found in flat list")
asserts.assert_equal(len(tree), len(set(tree)), "Duplicate endpoints found in tree list")
asserts.assert_equal(set(flat), {11}, "Aggregator node not found in list")
asserts.assert_equal(set(tree), {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21})

cycles = parts_list_cycles(tree, endpoints)
asserts.assert_equal(len(cycles), 0, "Found cycles in the example tree")

# Add in several cycles and make sure we detect them all
# ep 10 refers back to itself (0 level cycle) on 10
endpoints[10][Clusters.Descriptor][Clusters.Descriptor.Attributes.PartsList].append(10)
cycles = parts_list_cycles(tree, endpoints)
asserts.assert_equal(cycles, [10])
endpoints[10][Clusters.Descriptor][Clusters.Descriptor.Attributes.PartsList].remove(10)
print(endpoints[10])

# ep 4 refers back to 3 (1 level cycle) on 3 (will include 2, 3 and 4 in the cycles list)
endpoints[4][Clusters.Descriptor][Clusters.Descriptor.Attributes.PartsList].append(3)
cycles = parts_list_cycles(tree, endpoints)
asserts.assert_equal(cycles, [2, 3, 4])
endpoints[4][Clusters.Descriptor][Clusters.Descriptor.Attributes.PartsList].remove(3)

# ep 16 refers back to 13 (2 level cycle) on 13 (will include 13, 14 and 16 in cycles)
endpoints[16][Clusters.Descriptor][Clusters.Descriptor.Attributes.PartsList].append(13)
cycles = parts_list_cycles(tree, endpoints)
asserts.assert_equal(cycles, [13, 14, 16])
endpoints[16][Clusters.Descriptor][Clusters.Descriptor.Attributes.PartsList].remove(13)

# ep 9 refers back to 2 (3 level cycle) on 2 (includes 2, 3, 5, and 9)
endpoints[9][Clusters.Descriptor][Clusters.Descriptor.Attributes.PartsList].append(2)
cycles = parts_list_cycles(tree, endpoints)
asserts.assert_equal(cycles, [2, 3, 5, 9])
endpoints[9][Clusters.Descriptor][Clusters.Descriptor.Attributes.PartsList].remove(2)

# make sure we get them all
endpoints[10][Clusters.Descriptor][Clusters.Descriptor.Attributes.PartsList].append(10)
endpoints[4][Clusters.Descriptor][Clusters.Descriptor.Attributes.PartsList].append(3)
endpoints[16][Clusters.Descriptor][Clusters.Descriptor.Attributes.PartsList].append(13)
endpoints[9][Clusters.Descriptor][Clusters.Descriptor.Attributes.PartsList].append(2)
cycles = parts_list_cycles(tree, endpoints)
asserts.assert_equal(cycles, [2, 3, 4, 5, 9, 10, 13, 14, 16])

def test_get_all_children(self):
endpoints = self.create_example_topology()
asserts.assert_equal(get_all_children(2, endpoints), {1, 3, 4, 5, 9}, "Child list for ep2 is incorrect")
asserts.assert_equal(get_all_children(6, endpoints), {7, 8}, "Child list for ep6 is incorrect")
asserts.assert_equal(get_all_children(13, endpoints), {12, 14, 15, 16}, "Child list for ep13 is incorrect")
asserts.assert_equal(get_all_children(17, endpoints), {18, 19}, "Child list for ep17 is incorrect")

def test_get_tree_roots(self):
endpoints = self.create_example_topology()
_, tree = separate_endpoint_types(endpoints)
asserts.assert_equal(find_tree_roots(tree, endpoints), {2, 6, 13, 17}, "Incorrect tree root list")


if __name__ == "__main__":
default_matter_test_main()

0 comments on commit 1537954

Please sign in to comment.