forked from project-chip/connectedhomeip
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathTC_DeviceBasicComposition.py
946 lines (834 loc) · 55.4 KB
/
TC_DeviceBasicComposition.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
#
# Copyright (c) 2023 Project CHIP Authors
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# See https://github.com/project-chip/connectedhomeip/blob/master/docs/testing/python.md#defining-the-ci-test-arguments
# for details about the block below.
#
# === BEGIN CI TEST ARGUMENTS ===
# test-runner-runs:
# run1:
# app: ${ALL_CLUSTERS_APP}
# app-args: --discriminator 1234 --KVS kvs1 --trace-to json:${TRACE_APP}.json
# script-args: >
# --storage-path admin_storage.json
# --manual-code 10054912339
# --PICS src/app/tests/suites/certification/ci-pics-values
# --trace-to json:${TRACE_TEST_JSON}.json
# --trace-to perfetto:${TRACE_TEST_PERFETTO}.perfetto
# factory-reset: true
# quiet: true
# run2:
# app: ${CHIP_LOCK_APP}
# app-args: --discriminator 1234 --KVS kvs1
# script-args: --storage-path admin_storage.json --manual-code 10054912339
# factory-reset: true
# quiet: true
# run3:
# app: ${CHIP_LOCK_APP}
# app-args: --discriminator 1234 --KVS kvs1
# script-args: --storage-path admin_storage.json --qr-code MT:-24J0Q1212-10648G00
# factory-reset: true
# quiet: true
# run4:
# app: ${CHIP_LOCK_APP}
# app-args: --discriminator 1234 --KVS kvs1
# script-args: >
# --storage-path admin_storage.json
# --discriminator 1234
# --passcode 20202021
# factory-reset: true
# quiet: true
# run5:
# app: ${CHIP_LOCK_APP}
# app-args: --discriminator 1234 --KVS kvs1
# script-args: >
# --storage-path admin_storage.json
# --manual-code 10054912339
# --commissioning-method on-network
# factory-reset: true
# quiet: true
# run6:
# app: ${CHIP_LOCK_APP}
# app-args: --discriminator 1234 --KVS kvs1
# script-args: >
# --storage-path admin_storage.json
# --qr-code MT:-24J0Q1212-10648G00
# --commissioning-method on-network
# factory-reset: true
# quiet: true
# run7:
# app: ${CHIP_LOCK_APP}
# app-args: --discriminator 1234 --KVS kvs1
# script-args: >
# --storage-path admin_storage.json
# --discriminator 1234
# --passcode 20202021
# --commissioning-method on-network
# factory-reset: true
# quiet: true
# run8:
# app: ${CHIP_LOCK_APP}
# app-args: --discriminator 1234 --KVS kvs1
# script-args: --storage-path admin_storage.json
# factory-reset: false
# quiet: true
# run9:
# app: ${ENERGY_MANAGEMENT_APP}
# app-args: --discriminator 1234 --KVS kvs1 --trace-to json:${TRACE_APP}.json
# script-args: >
# --storage-path admin_storage.json
# --manual-code 10054912339
# --PICS src/app/tests/suites/certification/ci-pics-values
# --trace-to json:${TRACE_TEST_JSON}.json
# --trace-to perfetto:${TRACE_TEST_PERFETTO}.perfetto
# factory-reset: true
# quiet: true
# run10:
# app: ${LIT_ICD_APP}
# app-args: --discriminator 1234 --KVS kvs1 --trace-to json:${TRACE_APP}.json
# script-args: >
# --storage-path admin_storage.json
# --manual-code 10054912339
# --PICS src/app/tests/suites/certification/ci-pics-values
# --trace-to json:${TRACE_TEST_JSON}.json
# --trace-to perfetto:${TRACE_TEST_PERFETTO}.perfetto
# factory-reset: true
# quiet: true
# run11:
# app: ${CHIP_MICROWAVE_OVEN_APP}
# app-args: --discriminator 1234 --KVS kvs1 --trace-to json:${TRACE_APP}.json
# script-args: >
# --storage-path admin_storage.json
# --manual-code 10054912339
# --PICS src/app/tests/suites/certification/ci-pics-values
# --trace-to json:${TRACE_TEST_JSON}.json
# --trace-to perfetto:${TRACE_TEST_PERFETTO}.perfetto
# factory-reset: true
# quiet: true
# run12:
# app: ${CHIP_RVC_APP}
# app-args: --discriminator 1234 --KVS kvs1 --trace-to json:${TRACE_APP}.json
# script-args: >
# --storage-path admin_storage.json
# --manual-code 10054912339
# --PICS src/app/tests/suites/certification/ci-pics-values
# --trace-to json:${TRACE_TEST_JSON}.json
# --trace-to perfetto:${TRACE_TEST_PERFETTO}.perfetto
# factory-reset: true
# quiet: true
# run13:
# app: ${NETWORK_MANAGEMENT_APP}
# app-args: --discriminator 1234 --KVS kvs1 --trace-to json:${TRACE_APP}.json
# script-args: >
# --storage-path admin_storage.json
# --manual-code 10054912339
# --PICS src/app/tests/suites/certification/ci-pics-values
# --trace-to json:${TRACE_TEST_JSON}.json
# --trace-to perfetto:${TRACE_TEST_PERFETTO}.perfetto
# factory-reset: true
# quiet: true
# run14:
# app: ${LIGHTING_APP_NO_UNIQUE_ID}
# app-args: --discriminator 1234 --KVS kvs1 --trace-to json:${TRACE_APP}.json
# script-args: >
# --storage-path admin_storage.json
# --manual-code 10054912339
# --PICS src/app/tests/suites/certification/ci-pics-values
# --trace-to json:${TRACE_TEST_JSON}.json
# --trace-to perfetto:${TRACE_TEST_PERFETTO}.perfetto
# factory-reset: true
# quiet: true
# === END CI TEST ARGUMENTS ===
# Run 1: runs through all tests
# Run 2: tests PASE connection using manual code (12.1 only)
# Run 3: tests PASE connection using QR code (12.1 only)
# Run 4: tests PASE connection using discriminator and passcode (12.1 only)
# Run 5: Tests CASE connection using manual code (12.1 only)
# Run 6: Tests CASE connection using QR code (12.1 only)
# Run 7: Tests CASE connection using manual discriminator and passcode (12.1 only)
# Run 8: Tests reusing storage from run7 (i.e. factory-reset=false)
# Run 9: Tests against energy-management-app
# Run 10: Tests against lit-icd app
# Run 11: Tests against microwave-oven app
# Run 12: Tests against chip-rvc app
# Run 13: Tests against network-management-app
# Run 14: Tests against lighting-app-data-mode-no-unique-id
import logging
from dataclasses import dataclass
from typing import Any, Callable
import chip.clusters as Clusters
import chip.clusters.ClusterObjects
import chip.tlv
from chip import ChipUtility
from chip.clusters.Attribute import ValueDecodeFailure
from chip.clusters.ClusterObjects import ClusterAttributeDescriptor, ClusterObjectFieldDescriptor
from chip.interaction_model import InteractionModelError, Status
from chip.testing.basic_composition import BasicCompositionTests
from chip.testing.global_attribute_ids import (AttributeIdType, ClusterIdType, CommandIdType, GlobalAttributeIds, attribute_id_type,
cluster_id_type, command_id_type)
from chip.testing.matter_testing import (AttributePathLocation, ClusterPathLocation, CommandPathLocation, MatterBaseTest, TestStep,
async_test_body, default_matter_test_main)
from chip.testing.taglist_and_topology_test import (create_device_type_list_for_root, create_device_type_lists,
find_tag_list_problems, find_tree_roots, flat_list_ok,
get_direct_children_of_root, parts_list_cycles, separate_endpoint_types)
from chip.tlv import uint
def get_vendor_id(mei: int) -> int:
    """Extract the vendor ID (the MEI prefix) from a full MEI value.

    The prefix is taken from bits 16..31 of `mei`; the low 16 bits are discarded.
    """
    prefix_mask = 0xFFFF_0000
    return (mei & prefix_mask) >> 16
def check_int_in_range(min_value: int, max_value: int, allow_null: bool = False) -> Callable:
    """Build a validator accepting only integers within [min_value, max_value].

    When `allow_null` is True, the returned validator also accepts None.
    """

    def int_in_range_checker(obj: Any):
        """Validate `obj` against the bounds captured by check_int_in_range.

        Returns None when `obj` is acceptable. Raises ValueError with a diagnostic
        message when `obj` is not an int/uint or lies outside [min_value, max_value].
        """
        if allow_null and obj is None:
            return

        # The plain-int test comes first so the uint lookup only happens for non-int values.
        is_integral = isinstance(obj, int) or isinstance(obj, chip.tlv.uint)
        if not is_integral:
            raise ValueError(f"Value {str(obj)} is not an integer or uint (decoded type: {type(obj)})")

        int_val = int(obj)
        if not (min_value <= int_val <= max_value):
            raise ValueError(
                f"Value {int_val} (0x{int_val:X}) not in range [{min_value}, {max_value}] ([0x{min_value:X}, 0x{max_value:X}])")

    return int_in_range_checker
def check_list_of_ints_in_range(min_value: int, max_value: int, min_size: int = 0, max_size: int = 65535, allow_null: bool = False) -> Callable:
    """Build a validator accepting only lists of integers within [min_value, max_value].

    The list length must lie within [min_size, max_size]. When `allow_null` is True,
    the returned validator also accepts None.
    """

    def list_of_ints_in_range_checker(obj: Any):
        """Validate `obj` against the constraints captured by check_list_of_ints_in_range.

        Returns None when `obj` is acceptable. Raises ValueError with a diagnostic
        message when `obj` is not a list, has a length outside [min_size, max_size],
        or contains an element that is not an int/uint within [min_value, max_value].
        """
        if allow_null and obj is None:
            return

        if not isinstance(obj, list):
            raise ValueError(f"Value {str(obj)} is not a list, but a list was expected (decoded type: {type(obj)})")

        if not (min_size <= len(obj) <= max_size):
            raise ValueError(
                f"Value {str(obj)} is a list of size {len(obj)}, but expected a list with size in range [{min_size}, {max_size}]")

        for val_idx, val in enumerate(obj):
            # The plain-int test comes first so the uint lookup only happens for non-int values.
            if not (isinstance(val, int) or isinstance(val, chip.tlv.uint)):
                raise ValueError(
                    f"At index {val_idx} in {str(obj)}, value {val} is not an int/uint, but an int/uint was expected (decoded type: {type(val)})")

            int_val = int(val)
            if int_val < min_value or int_val > max_value:
                raise ValueError(
                    f"At index {val_idx} in {str(obj)}, value {int_val} (0x{int_val:X}) not in range [{min_value}, {max_value}] ([0x{min_value:X}, 0x{max_value:X}])")

    return list_of_ints_in_range_checker
def check_non_empty_list_of_ints_in_range(min_value: int, max_value: int, max_size: int = 65535, allow_null: bool = False) -> Callable:
    """Build a validator accepting only non-empty lists of integers within [min_value, max_value].

    This is check_list_of_ints_in_range with the minimum list size pinned to 1.
    """
    non_empty_checker = check_list_of_ints_in_range(
        min_value,
        max_value,
        1,  # min_size: an empty list is rejected
        max_size,
        allow_null,
    )
    return non_empty_checker
def check_no_duplicates(obj: Any) -> None:
    """Validate that `obj` is a list in which no value appears more than once.

    Args:
        obj: The decoded value to validate.

    Raises:
        ValueError: If `obj` is not a list, or if two of its elements compare equal.
    """
    if not isinstance(obj, list):
        raise ValueError(f"Value {str(obj)} is not a list, but a list was expected (decoded type: {type(obj)})")

    try:
        has_duplicates = len(set(obj)) != len(obj)
    except TypeError:
        # At least one element is unhashable (e.g. a nested list or dict). Callers only
        # handle ValueError, so fall back to an equality scan rather than letting the
        # TypeError escape and abort the whole test.
        seen = []
        has_duplicates = False
        for item in obj:
            if item in seen:
                has_duplicates = True
                break
            seen.append(item)

    if has_duplicates:
        raise ValueError(f"Value {str(obj)} contains duplicate values")
class TC_DeviceBasicComposition(MatterBaseTest, BasicCompositionTests):
@async_test_body
async def setup_class(self):
    """Run the inherited class setup, then await the composition setup helper.

    NOTE(review): the tests below describe the wildcard attribute read as "already done",
    so setup_class_helper presumably performs it and fills self.endpoints /
    self.endpoints_tlv - confirm against BasicCompositionTests.
    """
    # The parent setup runs first; the helper is awaited afterwards.
    super().setup_class()
    await self.setup_class_helper()
# ======= START OF ACTUAL TESTS =======
def test_TC_SM_1_1(self):
    """Verify root node placement: device type 0x16 on endpoint 0 only, with its required clusters.

    Uses the data already held in `self.endpoints`. Every problem is recorded with
    `record_error` and followed by `fail_current_test()`.
    NOTE(review): the code after each failure relies on `fail_current_test()` aborting
    the test (e.g. endpoint 0 is indexed right after the "endpoint 0 exists" check).
    """
    ROOT_NODE_DEVICE_TYPE = 0x16

    def listed_device_types_on(endpoint) -> list:
        """Return the device type IDs from the endpoint's Descriptor DeviceTypeList."""
        return [i.deviceType for i in endpoint[Clusters.Descriptor][Clusters.Descriptor.Attributes.DeviceTypeList]]

    self.print_step(1, "Perform a wildcard read of attributes on all endpoints - already done")

    self.print_step(2, "Verify that endpoint 0 exists")
    if 0 not in self.endpoints:
        self.record_error(self.get_test_name(), location=AttributePathLocation(endpoint_id=0),
                          problem="Did not find Endpoint 0.", spec_location="Endpoint Composition")
        self.fail_current_test()

    self.print_step(3, "Verify that endpoint 0 descriptor cluster includes the root node device type")
    if Clusters.Descriptor not in self.endpoints[0]:
        self.record_error(self.get_test_name(), location=AttributePathLocation(endpoint_id=0),
                          problem="No descriptor cluster on Endpoint 0", spec_location="Root node device type")
        self.fail_current_test()

    if ROOT_NODE_DEVICE_TYPE not in listed_device_types_on(self.endpoints[0]):
        self.record_error(self.get_test_name(), location=AttributePathLocation(endpoint_id=0),
                          problem="Root node device type not listed on endpoint 0", spec_location="Root node device type")
        self.fail_current_test()

    self.print_step(4, "Verify that the root node device type does not appear in any of the non-zero endpoints")
    for endpoint_id, endpoint in self.endpoints.items():
        if endpoint_id == 0:
            continue

        # Without this guard a missing Descriptor cluster surfaces as a bare KeyError
        # instead of a recorded problem that names the offending endpoint.
        if Clusters.Descriptor not in endpoint:
            self.record_error(self.get_test_name(), location=AttributePathLocation(endpoint_id=endpoint_id),
                              problem=f'No descriptor cluster on Endpoint {endpoint_id}', spec_location="Base Cluster Requirements for Matter")
            self.fail_current_test()

        if ROOT_NODE_DEVICE_TYPE in listed_device_types_on(endpoint):
            self.record_error(self.get_test_name(), location=AttributePathLocation(endpoint_id=endpoint_id),
                              problem=f'Root node device type listed on endpoint {endpoint_id}', spec_location="Root node device type")
            self.fail_current_test()

    self.print_step(5, "Verify the existence of all the root node clusters on EP0")
    root = self.endpoints[0]
    required_clusters = [Clusters.BasicInformation, Clusters.AccessControl, Clusters.GroupKeyManagement,
                         Clusters.GeneralCommissioning, Clusters.AdministratorCommissioning, Clusters.OperationalCredentials, Clusters.GeneralDiagnostics]
    for c in required_clusters:
        if c not in root:
            self.record_error(self.get_test_name(), location=AttributePathLocation(endpoint_id=0),
                              problem=f'Root node does not contain required cluster {c}', spec_location="Root node device type")
            self.fail_current_test()
def test_TC_DT_1_1(self):
    """Verify that every endpoint in `self.endpoints` contains a Descriptor cluster.

    Each endpoint without one is recorded as an error; the test is failed once at the
    end so that all offending endpoints are reported in a single run.
    """
    self.print_step(1, "Perform a wildcard read of attributes on all endpoints - already done")
    self.print_step(2, "Verify that each endpoint includes a descriptor cluster")

    endpoints_without_descriptor = []
    for endpoint_id, endpoint in self.endpoints.items():
        has_descriptor = Clusters.Descriptor in endpoint
        logging.info(f"Checking descriptor on Endpoint {endpoint_id}: {'found' if has_descriptor else 'not_found'}")
        if has_descriptor:
            continue

        missing_location = AttributePathLocation(endpoint_id=endpoint_id, cluster_id=Clusters.Descriptor.id)
        self.record_error(self.get_test_name(), location=missing_location,
                          problem=f"Did not find a descriptor on endpoint {endpoint_id}", spec_location="Base Cluster Requirements for Matter")
        endpoints_without_descriptor.append(endpoint_id)

    if endpoints_without_descriptor:
        self.fail_current_test("At least one endpoint was missing the descriptor cluster.")
async def _read_non_standard_attribute_check_unsupported_read(self, endpoint_id, cluster_id, attribute_id) -> bool:
    """Read one attribute by raw IDs and report whether the read was rejected with UnsupportedRead.

    A throw-away attribute descriptor is built for the given cluster/attribute IDs,
    since a generated descriptor may not exist for them. Returns True only when the
    read result for that path is a ValueDecodeFailure whose reason is an
    InteractionModelError with status UnsupportedRead; returns False otherwise,
    including when the path is absent from the result.
    """
    @dataclass
    class TempAttribute(ClusterAttributeDescriptor):
        # Ad-hoc descriptor bound to the IDs passed to the enclosing method.
        @ChipUtility.classproperty
        def cluster_id(cls) -> int:
            return cluster_id

        @ChipUtility.classproperty
        def attribute_id(cls) -> int:
            return attribute_id

        @ChipUtility.classproperty
        def attribute_type(cls) -> ClusterObjectFieldDescriptor:
            return ClusterObjectFieldDescriptor(Type=uint)

        @ChipUtility.classproperty
        def standard_attribute(cls) -> bool:
            return False

        value: 'uint' = 0

    read_result = await self.default_controller.Read(nodeid=self.dut_node_id, attributes=[(endpoint_id, TempAttribute)])

    try:
        reported = read_result.tlvAttributes[endpoint_id][cluster_id][attribute_id]
    except KeyError:
        # Nothing came back for this path, so it cannot be the expected error.
        return False

    if reported is None:
        return False
    if not isinstance(reported, Clusters.Attribute.ValueDecodeFailure):
        return False
    if not isinstance(reported.Reason, InteractionModelError):
        return False
    return reported.Reason.status == Status.UnsupportedRead
@async_test_body
async def test_TC_IDM_10_1(self):
    """Check global attributes, attribute/command lists and ID ranges on every cluster.

    Works on the raw TLV data in `self.endpoints_tlv` (endpoint -> cluster -> attribute).
    Problems are recorded with `record_error` as they are found and the test is failed
    only at the very end, so a single run reports every problem.

    Steps 2 and 3 record missing or malformed global attributes without aborting.
    The later steps therefore read the global lists through `reported_ids`, which
    yields nothing for a value that is absent or malformed rather than raising on it.
    """
    self.print_step(1, "Perform a wildcard read of attributes on all endpoints - already done")

    @dataclass
    class RequiredMandatoryAttribute:
        # A global attribute every cluster must expose, plus the checks applied to its value.
        id: int
        name: str
        validators: list[Callable]

    ATTRIBUTES_TO_CHECK = [
        RequiredMandatoryAttribute(id=GlobalAttributeIds.CLUSTER_REVISION_ID, name="ClusterRevision",
                                   validators=[check_int_in_range(1, 0xFFFF)]),
        RequiredMandatoryAttribute(id=GlobalAttributeIds.FEATURE_MAP_ID, name="FeatureMap",
                                   validators=[check_int_in_range(0, 0xFFFF_FFFF)]),
        RequiredMandatoryAttribute(id=GlobalAttributeIds.ATTRIBUTE_LIST_ID, name="AttributeList",
                                   validators=[check_non_empty_list_of_ints_in_range(0, 0xFFFF_FFFF), check_no_duplicates]),
        # TODO: Check for EventList
        # RequiredMandatoryAttribute(id=0xFFFA, name="EventList", validator=check_list_of_ints_in_range(0, 0xFFFF_FFFF)),
        RequiredMandatoryAttribute(id=GlobalAttributeIds.ACCEPTED_COMMAND_LIST_ID, name="AcceptedCommandList",
                                   validators=[check_list_of_ints_in_range(0, 0xFFFF_FFFF), check_no_duplicates]),
        RequiredMandatoryAttribute(id=GlobalAttributeIds.GENERATED_COMMAND_LIST_ID, name="GeneratedCommandList",
                                   validators=[check_list_of_ints_in_range(0, 0xFFFF_FFFF), check_no_duplicates]),
    ]

    def reported_ids(cluster_data: dict, global_attribute_id: int) -> list:
        """Return the integer entries of a global list attribute, or [] when it is unusable.

        A missing attribute, a non-list value or non-integer entries have already been
        recorded as errors by steps 2 and 3, so they are skipped here.
        """
        value = cluster_data.get(global_attribute_id)
        if not isinstance(value, list):
            return []
        return [entry for entry in value if isinstance(entry, (int, uint))]

    self.print_step(2, "Validate all global attributes are present")
    success = True
    for endpoint_id, endpoint in self.endpoints_tlv.items():
        for cluster_id, cluster in endpoint.items():
            for req_attribute in ATTRIBUTES_TO_CHECK:
                attribute_string = self.cluster_mapper.get_attribute_string(cluster_id, req_attribute.id)
                has_attribute = (req_attribute.id in cluster)
                location = AttributePathLocation(endpoint_id, cluster_id, req_attribute.id)
                logging.debug(
                    f"Checking for mandatory global {attribute_string} on {location.as_cluster_string(self.cluster_mapper)}: {'found' if has_attribute else 'not_found'}")

                # Check attribute is actually present
                if not has_attribute:
                    self.record_error(self.get_test_name(), location=location,
                                      problem=f"Did not find mandatory global {attribute_string} on {location.as_cluster_string(self.cluster_mapper)}", spec_location="Global Elements")
                    success = False

    self.print_step(3, "Validate the global attributes are in range and do not contain duplicates")
    for endpoint_id, endpoint in self.endpoints_tlv.items():
        for cluster_id, cluster in endpoint.items():
            for req_attribute in ATTRIBUTES_TO_CHECK:
                # Validate attribute value based on the provided validators.
                for validator in req_attribute.validators:
                    try:
                        validator(cluster[req_attribute.id])
                    except ValueError as e:
                        location = AttributePathLocation(endpoint_id, cluster_id, req_attribute.id)
                        self.record_error(self.get_test_name(), location=location,
                                          problem=f"Failed validation of value on {location.as_string(self.cluster_mapper)}: {str(e)}", spec_location="Global Elements")
                        success = False
                    except KeyError:
                        # A KeyError here means the attribute does not exist. This problem was already recorded in step 2,
                        # but we don't assert until the end of the test, so ignore this and don't re-record the error.
                        pass

    self.print_step(4, "Validate the attribute list exactly matches the set of reported attributes")
    # Only meaningful when every global attribute was present and valid, hence the guard.
    if success:
        for endpoint_id, endpoint in self.endpoints_tlv.items():
            for cluster_id, cluster in endpoint.items():
                attribute_list = cluster[GlobalAttributeIds.ATTRIBUTE_LIST_ID]
                for attribute_id in attribute_list:
                    location = AttributePathLocation(endpoint_id, cluster_id, attribute_id)
                    has_attribute = attribute_id in cluster

                    attribute_string = self.cluster_mapper.get_attribute_string(cluster_id, attribute_id)
                    logging.debug(
                        f"Checking presence of claimed supported {attribute_string} on {location.as_cluster_string(self.cluster_mapper)}: {'found' if has_attribute else 'not_found'}")

                    if not has_attribute:
                        # Check if this is a write-only attribute by trying to read it.
                        # If it's present and write-only it should return an UNSUPPORTED_READ error. All other errors are a failure.
                        # Because these can be MEI attributes, we need to build the ClusterAttributeDescriptor manually since it's
                        # not guaranteed to be generated. Since we expect an error back anyway, the type doesn't matter.
                        write_only_attribute = await self._read_non_standard_attribute_check_unsupported_read(
                            endpoint_id=endpoint_id, cluster_id=cluster_id, attribute_id=attribute_id)

                        if not write_only_attribute:
                            self.record_error(self.get_test_name(), location=location,
                                              problem=f"Did not find {attribute_string} on {location.as_cluster_string(self.cluster_mapper)} when it was claimed in AttributeList ({attribute_list})", spec_location="AttributeList Attribute")
                            success = False
                        # There is no value to inspect for an attribute that was not returned.
                        continue

                    attribute_value = cluster[attribute_id]
                    if isinstance(attribute_value, ValueDecodeFailure):
                        # Warn only for now
                        # TODO: Fail in the future
                        self.record_warning(self.get_test_name(), location=location,
                                            problem=f"Found a failure to read/decode {attribute_string} on {location.as_cluster_string(self.cluster_mapper)} when it was claimed as supported in AttributeList ({attribute_list}): {str(attribute_value)}", spec_location="AttributeList Attribute")

                for attribute_id in cluster:
                    if attribute_id not in attribute_list:
                        attribute_string = self.cluster_mapper.get_attribute_string(cluster_id, attribute_id)
                        location = AttributePathLocation(endpoint_id, cluster_id, attribute_id)
                        self.record_error(self.get_test_name(), location=location,
                                          problem=f'Found attribute {attribute_string} on {location.as_cluster_string(self.cluster_mapper)} not listed in attribute list', spec_location="AttributeList Attribute")
                        success = False

    self.print_step(
        5, "Validate that the global attributes do not contain any additional values in the standard or scoped range that are not defined by the cluster specification")
    # Validate there are no attributes in the global range that are not in the required list
    allowed_globals = [a.id for a in ATTRIBUTES_TO_CHECK]
    # also allow event list because it's not disallowed
    event_list_id = 0xFFFA
    allowed_globals.append(event_list_id)
    global_range_min = 0x0000_F000
    attribute_standard_range_max = 0x0000_4FFF
    mei_range_min = 0x0001_0000
    for endpoint_id, endpoint in self.endpoints_tlv.items():
        for cluster_id, cluster in endpoint.items():
            reported_globals = [a for a in reported_ids(cluster, GlobalAttributeIds.ATTRIBUTE_LIST_ID)
                                if a >= global_range_min and a < mei_range_min]
            unexpected_globals = sorted(set(reported_globals) - set(allowed_globals))
            for unexpected in unexpected_globals:
                location = AttributePathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id, attribute_id=unexpected)
                self.record_error(self.get_test_name(), location=location,
                                  problem=f"Unexpected global attribute {unexpected} in cluster {cluster_id}", spec_location="Global elements")
                success = False

    # validate that all the returned attributes in the standard clusters contain only known attribute ids
    for endpoint_id, endpoint in self.endpoints_tlv.items():
        for cluster_id, cluster in endpoint.items():
            if cluster_id not in chip.clusters.ClusterObjects.ALL_ATTRIBUTES:
                # Skip clusters that are not part of the standard generated corpus (e.g. MS clusters)
                continue
            standard_attributes = [a for a in reported_ids(cluster, GlobalAttributeIds.ATTRIBUTE_LIST_ID)
                                   if a <= attribute_standard_range_max]
            allowed_standard_attributes = chip.clusters.ClusterObjects.ALL_ATTRIBUTES[cluster_id]
            unexpected_standard_attributes = sorted(set(standard_attributes) - set(allowed_standard_attributes))
            for unexpected in unexpected_standard_attributes:
                location = AttributePathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id, attribute_id=unexpected)
                self.record_error(self.get_test_name(), location=location,
                                  problem=f"Unexpected standard attribute {unexpected} in cluster {cluster_id}", spec_location=f"Cluster {cluster_id}")
                success = False

    # validate there are no attributes in the range between standard and global
    # This is de-facto already covered in the check above, assuming the spec hasn't defined any values in this range, but we should make sure
    for endpoint_id, endpoint in self.endpoints_tlv.items():
        for cluster_id, cluster in endpoint.items():
            bad_range_values = [a for a in reported_ids(cluster, GlobalAttributeIds.ATTRIBUTE_LIST_ID)
                                if a > attribute_standard_range_max and a < global_range_min]
            for bad in bad_range_values:
                location = AttributePathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id, attribute_id=bad)
                self.record_error(self.get_test_name(), location=location,
                                  problem=f"Attribute in undefined range {bad} in cluster {cluster_id}", spec_location=f"Cluster {cluster_id}")
                success = False

    command_standard_range_max = 0x0000_00FF
    # Command lists only have a scoped range, so we only need to check for known command ids, no global range check
    for endpoint_id, endpoint in self.endpoints_tlv.items():
        for cluster_id, cluster in endpoint.items():
            if cluster_id not in chip.clusters.ClusterObjects.ALL_CLUSTERS:
                continue
            standard_accepted_commands = [
                a for a in reported_ids(cluster, GlobalAttributeIds.ACCEPTED_COMMAND_LIST_ID) if a <= command_standard_range_max]
            standard_generated_commands = [
                a for a in reported_ids(cluster, GlobalAttributeIds.GENERATED_COMMAND_LIST_ID) if a <= command_standard_range_max]
            if cluster_id in chip.clusters.ClusterObjects.ALL_ACCEPTED_COMMANDS:
                allowed_accepted_commands = list(chip.clusters.ClusterObjects.ALL_ACCEPTED_COMMANDS[cluster_id])
            else:
                allowed_accepted_commands = []
            if cluster_id in chip.clusters.ClusterObjects.ALL_GENERATED_COMMANDS:
                allowed_generated_commands = list(chip.clusters.ClusterObjects.ALL_GENERATED_COMMANDS[cluster_id])
            else:
                allowed_generated_commands = []

            # Compare the set of commands in the standard range that the DUT says it accepts vs. the commands we know about.
            unexpected_accepted_commands = sorted(set(standard_accepted_commands) - set(allowed_accepted_commands))
            unexpected_generated_commands = sorted(set(standard_generated_commands) - set(allowed_generated_commands))

            for unexpected in unexpected_accepted_commands:
                location = CommandPathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id, command_id=unexpected)
                self.record_error(self.get_test_name(), location=location,
                                  problem=f'Unexpected accepted command {unexpected} in cluster {cluster_id} allowed: {allowed_accepted_commands} listed: {standard_accepted_commands}', spec_location=f'Cluster {cluster_id}')
                success = False
            for unexpected in unexpected_generated_commands:
                location = CommandPathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id, command_id=unexpected)
                self.record_error(self.get_test_name(), location=location,
                                  problem=f'Unexpected generated command {unexpected} in cluster {cluster_id} allowed: {allowed_generated_commands} listed: {standard_generated_commands}', spec_location=f'Cluster {cluster_id}')
                success = False

    self.print_step(
        6, "Validate that none of the global attribute IDs contain values with prefixes outside of the allowed standard or MEI prefix range")
    prefix_mask = 0xFFFF_0000
    if self.is_pics_sdk_ci_only:
        # test vendor prefixes are allowed in the CI because we use them internally in examples
        bad_prefix_min = 0xFFF5_0000
    else:
        # test vendor prefixes are not allowed in products
        bad_prefix_min = 0xFFF1_0000
    for endpoint_id, endpoint in self.endpoints_tlv.items():
        for cluster_id, cluster in endpoint.items():
            attribute_ids = reported_ids(cluster, GlobalAttributeIds.ATTRIBUTE_LIST_ID)
            command_ids = reported_ids(cluster, GlobalAttributeIds.ACCEPTED_COMMAND_LIST_ID) + \
                reported_ids(cluster, GlobalAttributeIds.GENERATED_COMMAND_LIST_ID)
            # Filter on the prefix but keep the full ID, so the recorded location and
            # message identify the offending element rather than only its prefix.
            bad_attrs = [a for a in attribute_ids if (a & prefix_mask) >= bad_prefix_min]
            bad_cmds = [a for a in command_ids if (a & prefix_mask) >= bad_prefix_min]
            for bad_attrib_id in bad_attrs:
                location = AttributePathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id, attribute_id=bad_attrib_id)
                vendor_id = get_vendor_id(bad_attrib_id)
                self.record_error(self.get_test_name(), location=location,
                                  problem=f'Attribute 0x{bad_attrib_id:08x} with bad prefix 0x{vendor_id:04x} in cluster 0x{cluster_id:08x}' + (' (Test Vendor)' if attribute_id_type(bad_attrib_id) == AttributeIdType.kTest else ''), spec_location='Manufacturer Extensible Identifier (MEI)')
                success = False
            for bad_cmd_id in bad_cmds:
                location = CommandPathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id, command_id=bad_cmd_id)
                vendor_id = get_vendor_id(bad_cmd_id)
                self.record_error(self.get_test_name(), location=location,
                                  problem=f'Command 0x{bad_cmd_id:08x} with bad prefix 0x{vendor_id:04x} in cluster 0x{cluster_id:08x}' + (' (Test Vendor)' if command_id_type(bad_cmd_id) == CommandIdType.kTest else ''), spec_location='Manufacturer Extensible Identifier (MEI)')
                success = False

    self.print_step(7, "Validate that none of the MEI global attribute IDs contain values outside of the allowed suffix range")
    # Validate that any attribute in the manufacturer prefix range is in the standard suffix range.
    suffix_mask = 0x0000_FFFF
    for endpoint_id, endpoint in self.endpoints_tlv.items():
        for cluster_id, cluster in endpoint.items():
            # `>=` matches the MEI lower bound used for clusters in step 10; an ID equal to
            # mei_range_min has suffix 0, which always passes, so the outcome is unchanged.
            manufacturer_range_values = [a for a in reported_ids(cluster, GlobalAttributeIds.ATTRIBUTE_LIST_ID)
                                         if a >= mei_range_min]
            for manufacturer_value in manufacturer_range_values:
                suffix = manufacturer_value & suffix_mask
                location = AttributePathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id,
                                                 attribute_id=manufacturer_value)
                if suffix > attribute_standard_range_max and suffix < global_range_min:
                    self.record_error(self.get_test_name(), location=location,
                                      problem=f"Manufacturer attribute in undefined range {manufacturer_value} in cluster {cluster_id}",
                                      spec_location=f"Cluster {cluster_id}")
                    success = False
                elif suffix >= global_range_min:
                    self.record_error(self.get_test_name(), location=location,
                                      problem=f"Manufacturer attribute in global range {manufacturer_value} in cluster {cluster_id}",
                                      spec_location=f"Cluster {cluster_id}")
                    success = False

    for endpoint_id, endpoint in self.endpoints_tlv.items():
        for cluster_id, cluster in endpoint.items():
            accepted_manufacturer_range_values = [
                a for a in reported_ids(cluster, GlobalAttributeIds.ACCEPTED_COMMAND_LIST_ID) if a >= mei_range_min]
            generated_manufacturer_range_values = [
                a for a in reported_ids(cluster, GlobalAttributeIds.GENERATED_COMMAND_LIST_ID) if a >= mei_range_min]
            all_command_manufacturer_range_values = accepted_manufacturer_range_values + generated_manufacturer_range_values
            for manufacturer_value in all_command_manufacturer_range_values:
                suffix = manufacturer_value & suffix_mask
                location = CommandPathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id, command_id=manufacturer_value)
                if suffix > command_standard_range_max:
                    self.record_error(self.get_test_name(), location=location,
                                      problem=f'Manufacturer command in the undefined suffix range {manufacturer_value} in cluster {cluster_id}', spec_location='Manufacturer Extensible Identifier (MEI)')
                    success = False

    self.print_step(8, "Validate that all cluster ID prefixes are in the standard or MEI range")
    for endpoint_id, endpoint in self.endpoints_tlv.items():
        # As in step 6: filter on the prefix, report the full cluster ID.
        bad_clusters_ids = [a for a in endpoint.keys() if (a & prefix_mask) >= bad_prefix_min]
        for bad_cluster_id in bad_clusters_ids:
            location = ClusterPathLocation(endpoint_id=endpoint_id, cluster_id=bad_cluster_id)
            vendor_id = get_vendor_id(bad_cluster_id)
            self.record_error(self.get_test_name(), location=location,
                              problem=f'Cluster 0x{bad_cluster_id:08x} with bad prefix 0x{vendor_id:04x}' + (' (Test Vendor)' if cluster_id_type(bad_cluster_id) == ClusterIdType.kTest else ''), spec_location='Manufacturer Extensible Identifier (MEI)')
            success = False

    self.print_step(9, "Validate that all clusters in the standard range have a known cluster ID")
    for endpoint_id, endpoint in self.endpoints_tlv.items():
        standard_clusters = [a for a in endpoint.keys() if a < mei_range_min]
        unknown_clusters = sorted(set(standard_clusters) - set(chip.clusters.ClusterObjects.ALL_CLUSTERS))
        for bad in unknown_clusters:
            location = ClusterPathLocation(endpoint_id=endpoint_id, cluster_id=bad)
            self.record_error(self.get_test_name(), location=location,
                              problem=f'Unknown cluster ID in the standard range {bad}', spec_location='Manufacturer Extensible Identifier (MEI)')
            success = False

    self.print_step(10, "Validate that all clusters in the MEI range have a suffix in the manufacturer suffix range")
    for endpoint_id, endpoint in self.endpoints_tlv.items():
        mei_clusters = [a for a in endpoint.keys() if a >= mei_range_min]
        bad_clusters = [a for a in mei_clusters if ((a & 0x0000_FFFF) < 0xFC00) or ((a & 0x0000_FFFF) > 0xFFFE)]
        for bad in bad_clusters:
            location = ClusterPathLocation(endpoint_id=endpoint_id, cluster_id=bad)
            self.record_error(self.get_test_name(), location=location,
                              problem=f'MEI cluster with an out of range suffix {bad}', spec_location='Manufacturer Extensible Identifier (MEI)')
            success = False

    self.print_step(11, "Validate that standard cluster FeatureMap attributes contains only known feature flags")
    for endpoint_id, endpoint in self.endpoints_tlv.items():
        for cluster_id, cluster in endpoint.items():
            if cluster_id not in chip.clusters.ClusterObjects.ALL_CLUSTERS:
                continue
            feature_map = cluster.get(GlobalAttributeIds.FEATURE_MAP_ID)
            if not isinstance(feature_map, (int, uint)):
                # Missing or malformed FeatureMap was already recorded in steps 2/3.
                continue
            feature_mask = 0
            try:
                feature_map_enum = chip.clusters.ClusterObjects.ALL_CLUSTERS[cluster_id].Bitmaps.Feature
                for f in feature_map_enum:
                    feature_mask = feature_mask | f
            except AttributeError:
                # If there is no feature bitmap, feature mask 0 is correct
                pass
            feature_map_extras = feature_map & ~feature_mask
            if feature_map_extras != 0:
                location = ClusterPathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id)
                self.record_error(self.get_test_name(), location=location,
                                  problem=f'Standard cluster {cluster_id} with unknown feature {feature_map_extras:02x}')
                success = False

    if not success:
        self.fail_current_test(
            "At least one cluster has failed the range and support checks for its listed attributes, commands or features")
def test_TC_IDM_11_1(self):
    """Verify that string attributes returned by the wildcard read encode as strict UTF-8.

    Walks every endpoint / cluster / attribute in ``self.endpoints_tlv``. Attributes that are not
    present in ``Clusters.ClusterObjects.ALL_ATTRIBUTES``, or whose schema type is not ``str``,
    are skipped. Each value that fails strict UTF-8 encoding is recorded as an error, and the
    test is failed once the whole walk is complete.
    """
    all_strings_valid = True
    for endpoint_id, clusters in self.endpoints_tlv.items():
        for cluster_id, attributes in clusters.items():
            for attribute_id, value in attributes.items():
                schema = Clusters.ClusterObjects.ALL_ATTRIBUTES
                is_known = cluster_id in schema and attribute_id in schema[cluster_id]
                # Only attributes the schema knows about and declares as plain strings are checked.
                if not is_known or schema[cluster_id][attribute_id].attribute_type.Type is not str:
                    continue
                try:
                    value.encode('utf-8', errors='strict')
                except UnicodeError:
                    where = AttributePathLocation(endpoint_id, cluster_id, attribute_id)
                    attribute_name = self.cluster_mapper.get_attribute_string(cluster_id, attribute_id)
                    self.record_error(
                        self.get_test_name(),
                        location=where,
                        problem=f'Attribute {attribute_name} on {where.as_cluster_string(self.cluster_mapper)} is invalid UTF-8',
                        spec_location="Data types - Character String")
                    all_strings_valid = False
    if not all_strings_valid:
        self.fail_current_test("At least one attribute string was not valid UTF-8")
# def test_all_event_strings_valid(self):
# asserts.skip("TODO: Validate every string in the read events is valid UTF-8 and has no nulls")
# def test_all_schema_scalars(self):
# asserts.skip("TODO: Validate all int/uint are in range of the schema (or null if nullable) for known attributes")
# def test_all_commands_reported_are_executable(self):
# asserts.skip("TODO: Validate all commands reported in AcceptedCommandList are actually executable")
# def test_dump_all_pics_for_all_endpoints(self):
# asserts.skip("TODO: Make a test that generates the basic PICS list for each endpoint based on actually reported contents")
# def test_all_schema_mandatory_elements_present(self):
# asserts.skip(
# "TODO: Make a test that ensures every known cluster has the mandatory elements present (commands, attributes) based on features")
# def test_all_endpoints_have_valid_composition(self):
# asserts.skip(
# "TODO: Make a test that verifies each endpoint has valid set of device types, and that the device type conformance is respected for each")
def test_TC_SM_1_2(self):
    """Check the Descriptor PartsList composition rules over the wildcard-read endpoints.

    Verifies that endpoint 0's PartsList lists exactly the other endpoints with no duplicates,
    that no endpoint's PartsList contains the endpoint itself, that the tree-style endpoints
    contain no cycles, and that every flat-style endpoint passes ``flat_list_ok``. Each
    violation is recorded with ``record_error`` before the test is failed.
    """
    self.print_step(1, "Wildcard read of device - already done")

    self.print_step(2, "Verify the Descriptor cluster PartsList on endpoint 0 exactly lists all the other (non-0) endpoints on the DUT")
    root_parts = self.endpoints[0][Clusters.Descriptor][Clusters.Descriptor.Attributes.PartsList]
    cluster_id = Clusters.Descriptor.id
    attribute_id = Clusters.Descriptor.Attributes.PartsList.attribute_id

    def parts_list_location(ep):
        # Path of the Descriptor PartsList attribute on the given endpoint.
        return AttributePathLocation(endpoint_id=ep, cluster_id=cluster_id, attribute_id=attribute_id)

    def report(where, problem, spec_location="PartsList Attribute"):
        self.record_error(self.get_test_name(), location=where, problem=problem, spec_location=spec_location)

    root_location = parts_list_location(0)
    if len(self.endpoints) != len(set(self.endpoints)):
        report(root_location, 'duplicate endpoint ids found in the returned data')
        self.fail_current_test()

    unique_root_parts = set(root_parts)
    if len(root_parts) != len(unique_root_parts):
        report(root_location, 'Duplicate endpoint ids found in the parts list on ep0')
        self.fail_current_test()

    # Every endpoint returned by the read, except endpoint 0 itself, must appear in EP0's PartsList.
    other_endpoints = set(self.endpoints)
    other_endpoints.remove(0)
    if unique_root_parts != other_endpoints:
        report(root_location, 'EP0 Descriptor parts list does not match the set of returned endpoints')
        self.fail_current_test()

    self.print_step(
        3, "For each endpoint on the DUT (including EP 0), verify the PartsList in the Descriptor cluster on that endpoint does not include itself")
    for endpoint_id, endpoint in self.endpoints.items():
        own_parts = endpoint[Clusters.Descriptor][Clusters.Descriptor.Attributes.PartsList]
        if endpoint_id not in own_parts:
            continue
        report(parts_list_location(endpoint_id), f"Endpoint {endpoint_id} parts list includes itself")
        self.fail_current_test()

    self.print_step(4, "Separate endpoints into flat and tree style")
    flat, tree = separate_endpoint_types(self.endpoints)

    self.print_step(5, "Check for cycles in the tree endpoints")
    cyclic_endpoints = parts_list_cycles(tree, self.endpoints)
    if len(cyclic_endpoints) > 0:
        # Record every endpoint involved in a cycle before failing the test.
        for cyclic_ep in cyclic_endpoints:
            report(parts_list_location(cyclic_ep), f"Endpoint {cyclic_ep} parts list includes a cycle")
        self.fail_current_test()

    self.print_step(6, "Check flat lists include all sub ids")
    flat_lists_valid = True
    for endpoint_id in flat:
        # ensure that every sub-id in the parts list is included in the parent
        if flat_list_ok(endpoint_id, self.endpoints):
            continue
        report(parts_list_location(endpoint_id), 'Flat parts list does not exactly match sub-parts',
               spec_location='Endpoint composition')
        flat_lists_valid = False
    if not flat_lists_valid:
        self.fail_current_test()
def test_TC_PS_3_1(self):
    """Validate the Power Source cluster EndpointList attribute across the wildcard-read endpoints.

    Steps:
      1. Wildcard read of the device (already done).
      2. Every endpoint listed in an EndpointList must exist on the device. Endpoints whose
         Power Source ClusterRevision is below 2 get a note and are skipped; a missing
         ClusterRevision or EndpointList is recorded as an error.
      3. On endpoints carrying the Bridged Node device type, EndpointList must equal the
         endpoint's Descriptor PartsList plus the endpoint itself.
      4. On the direct children of Bridged Node endpoints, EndpointList must contain only the
         child endpoint itself.

    Problems are recorded with ``record_error``; the test is failed at the end if any check failed.
    """
    BRIDGED_NODE_DEVICE_TYPE_ID = 0x13
    success = True
    self.print_step(1, "Wildcard read of device - already done")

    self.print_step(2, "Verify that all endpoints listed in the EndpointList are valid")
    attribute_id = Clusters.PowerSource.Attributes.EndpointList.attribute_id
    cluster_id = Clusters.PowerSource.id
    attribute_string = self.cluster_mapper.get_attribute_string(cluster_id, attribute_id)
    for endpoint_id, endpoint in self.endpoints.items():
        if Clusters.PowerSource not in endpoint:
            continue
        location = AttributePathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id, attribute_id=attribute_id)
        cluster_revision = Clusters.PowerSource.Attributes.ClusterRevision
        if cluster_revision not in endpoint[Clusters.PowerSource]:
            location = AttributePathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id,
                                             attribute_id=cluster_revision.attribute_id)
            self.record_error(self.get_test_name(
            ), location=location, problem=f'Did not find Cluster revision on {location.as_cluster_string(self.cluster_mapper)}', spec_location='Global attributes')
            # The revision gate below cannot be evaluated without the attribute (indexing it
            # would raise KeyError), so count this as a failure and move to the next endpoint.
            success = False
            continue
        if endpoint[Clusters.PowerSource][cluster_revision] < 2:
            location = AttributePathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id,
                                             attribute_id=cluster_revision.attribute_id)
            self.record_note(self.get_test_name(), location=location,
                             problem='Power source ClusterRevision is < 2, skipping remainder of test for this endpoint')
            continue
        if Clusters.PowerSource.Attributes.EndpointList not in endpoint[Clusters.PowerSource]:
            self.record_error(self.get_test_name(), location=location,
                              problem=f'Did not find {attribute_string} on {location.as_cluster_string(self.cluster_mapper)}', spec_location="EndpointList Attribute")
            success = False
            continue

        endpoint_list = endpoint[Clusters.PowerSource][Clusters.PowerSource.Attributes.EndpointList]
        non_existent = set(endpoint_list) - set(self.endpoints.keys())
        if non_existent:
            location = AttributePathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id, attribute_id=attribute_id)
            self.record_error(self.get_test_name(), location=location,
                              problem=f'{attribute_string} lists a non-existent endpoint', spec_location="EndpointList Attribute")
            success = False

    self.print_step(3, "Verify that all Bridged Node endpoint lists are correct")

    def get_part_validity_problem(endpoint):
        # Return a description of what is missing from the endpoint's Descriptor cluster, or None.
        if Clusters.Descriptor not in endpoint:
            return "Missing cluster descriptor"
        if Clusters.Descriptor.Attributes.PartsList not in endpoint[Clusters.Descriptor]:
            return "Missing PartList in descriptor cluster"
        if Clusters.Descriptor.Attributes.DeviceTypeList not in endpoint[Clusters.Descriptor]:
            return "Missing DeviceTypeList in descriptor cluster"
        return None

    device_types = {}
    parts_list = {}
    for endpoint_id, endpoint in self.endpoints.items():
        if Clusters.PowerSource not in endpoint or Clusters.PowerSource.Attributes.EndpointList not in endpoint[Clusters.PowerSource]:
            continue

        problem = get_part_validity_problem(endpoint)
        if problem:
            # Use attribute_id, matching how PartsList is addressed elsewhere in this file.
            location = AttributePathLocation(endpoint_id=endpoint_id, cluster_id=Clusters.Descriptor.id,
                                             attribute_id=Clusters.Descriptor.Attributes.PartsList.attribute_id)
            self.record_error(self.get_test_name(), location=location,
                              problem=problem, spec_location="PartsList Attribute")
            success = False
            continue

        device_types[endpoint_id] = [i.deviceType for i in endpoint[Clusters.Descriptor]
                                     [Clusters.Descriptor.Attributes.DeviceTypeList]]
        parts_list[endpoint_id] = endpoint[Clusters.Descriptor][Clusters.Descriptor.Attributes.PartsList]

    bridged_nodes = [ep for (ep, dev_types) in device_types.items() if BRIDGED_NODE_DEVICE_TYPE_ID in dev_types]

    for endpoint_id in bridged_nodes:
        if Clusters.PowerSource not in self.endpoints[endpoint_id]:
            continue
        # using a list because we do want to preserve duplicates and error on those.
        desired_endpoint_list = sorted([*parts_list[endpoint_id], endpoint_id])
        # sorted() makes a copy so the cached wildcard-read data is not reordered in place.
        ep_list = sorted(self.endpoints[endpoint_id][Clusters.PowerSource][Clusters.PowerSource.Attributes.EndpointList])
        if ep_list != desired_endpoint_list:
            location = AttributePathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id, attribute_id=attribute_id)
            self.record_error(self.get_test_name(), location=location,
                              problem=f'Power source EndpointList on bridged node endpoint {endpoint_id} is not as expected. Desired: {desired_endpoint_list} Actual: {ep_list}', spec_location="EndpointList Attribute")
            success = False

    self.print_step(4, "Verify that all Bridged Node children endpoint lists are correct")
    children = []
    # note, this doesn't handle the full tree structure, single layer only
    for endpoint_id in bridged_nodes:
        children.extend(parts_list[endpoint_id])

    for endpoint_id in children:
        if Clusters.PowerSource not in self.endpoints[endpoint_id]:
            continue
        if Clusters.PowerSource.Attributes.EndpointList not in self.endpoints[endpoint_id][Clusters.PowerSource]:
            # A child without an EndpointList was already reported, or deliberately skipped
            # (ClusterRevision < 2), in step 2; there is nothing to compare here.
            continue
        desired_endpoint_list = [endpoint_id]
        ep_list = sorted(self.endpoints[endpoint_id][Clusters.PowerSource][Clusters.PowerSource.Attributes.EndpointList])
        if ep_list != desired_endpoint_list:
            location = AttributePathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id, attribute_id=attribute_id)
            self.record_error(self.get_test_name(), location=location,
                              problem=f'Power source EndpointList on bridged child endpoint {endpoint_id} is not as expected. Desired: {desired_endpoint_list} Actual: {ep_list}', spec_location="EndpointList Attribute")
            success = False

    if not success:
        self.fail_current_test("power source EndpointList attribute is incorrect")
def test_TC_DESC_2_2(self):
    """Check Descriptor TagList usage for tree-composition roots and for the root node's children.

    Runs ``find_tag_list_problems`` twice: once over the tree roots with their device-type
    lists, and once over the direct children of the root node (keyed under endpoint 0). Every
    problem returned is recorded as an error against the TagList attribute of the affected
    endpoint, and the test is failed if either pass returned any problems.
    """
    def report_tag_problems(found):
        # One recorded error per endpoint, spelling out each problem field.
        for ep, issue in found.items():
            where = AttributePathLocation(endpoint_id=ep, cluster_id=Clusters.Descriptor.id,
                                          attribute_id=Clusters.Descriptor.Attributes.TagList.attribute_id)
            text = f'problem on ep {ep}: missing feature = {issue.missing_feature}, missing attribute = {issue.missing_attribute}, duplicates = {issue.duplicates}, same_tags = {issue.same_tag}'
            self.record_error(self.get_test_name(), location=where, problem=text, spec_location="Descriptor TagList")

    self.print_step(0, "Wildcard read of device - already done")

    self.print_step(
        1, "Identify all endpoints that are roots of a tree-composition. Omit any endpoints that include the Content App device type.")
    _, tree_endpoints = separate_endpoint_types(self.endpoints)
    tree_roots = find_tree_roots(tree_endpoints, self.endpoints)

    self.print_step(
        1.1, "For each tree root, go through each of the children and add their endpoint IDs to a list of device types based on the DeviceTypes list")
    tree_device_types = create_device_type_lists(tree_roots, self.endpoints)

    self.print_step(
        1.2, "For device types with more than one endpoint listed, ensure each of the listed endpoints has a tag attribute and the tag attributes are not the same")
    tree_problems = find_tag_list_problems(tree_roots, tree_device_types, self.endpoints)
    report_tag_problems(tree_problems)

    self.print_step(2, "Identify all the direct children of the root node endpoint")
    direct_children = get_direct_children_of_root(self.endpoints)

    self.print_step(
        2.1, "Go through each of the direct children of the root node and add their endpoint IDs to a list of device types based on the DeviceTypes list")
    root_device_types = create_device_type_list_for_root(direct_children, self.endpoints)

    self.print_step(
        2.2, "For device types with more than one endpoint listed, ensure each of the listed endpoints has a tag attribute and the tag attributes are not the same")
    root_problems = find_tag_list_problems([0], {0: root_device_types}, self.endpoints)
    report_tag_problems(root_problems)

    if not (tree_problems or root_problems):
        return
    self.fail_current_test("Problems with tags lists")
def steps_TC_IDM_12_1(self):
    """Return the ordered list of TestStep entries for TC_IDM_12_1."""
    wildcard_read = TestStep(0, "TH performs a wildcard read of all attributes and endpoints on the device")
    tlv_dump = TestStep(1, "TH creates a MatterTlvJson dump of the wildcard attributes for submission to certification.")
    return [wildcard_read, tlv_dump]
def test_TC_IDM_12_1(self):
    """Dump the wildcard-read device composition and echo it into the log.

    The dump path comes from the ``dump_device_composition_path`` user parameter; when that
    is absent, a file name built from the VendorID, ProductID and SoftwareVersion read from
    Basic Information on endpoint 0 is used. Both strings returned by ``dump_wildcard`` are
    logged line by line between tagged BEGIN/END markers.
    """
    # wildcard read - already done.
    self.step(0)

    # Create the dump
    self.step(1)
    basic_info = Clusters.BasicInformation
    product_id = self.endpoints[0][basic_info][basic_info.Attributes.ProductID]
    vendor_id = self.endpoints[0][basic_info][basic_info.Attributes.VendorID]
    sw_version = self.endpoints[0][basic_info][basic_info.Attributes.SoftwareVersion]
    default_name = f'device_dump_0x{vendor_id:04X}_0x{product_id:04X}_{sw_version}.json'
    dump_path = self.user_params.get("dump_device_composition_path", default_name)
    json_str, txt_str = self.dump_wildcard(dump_path)

    # Structured dump so we can pull these back out of the logs
    def emit_tagged(prefix: str, payload):
        rows = payload.splitlines()
        logging.info(f'{prefix}BEGIN ({len(rows)} lines)====')
        for row in rows:
            logging.info(f'{prefix}{row}')
        logging.info(f'{prefix}END ====')

    for prefix, payload in (('==== json: ', json_str), ('==== txt: ', txt_str)):
        emit_tagged(prefix, payload)
# Script entry point: when this file is executed directly (not imported), hand control to
# default_matter_test_main().
if __name__ == "__main__":
    default_matter_test_main()