From cf3e79f88d11e24ee13eeb2c549ba55937eabb78 Mon Sep 17 00:00:00 2001 From: Song GUO Date: Tue, 20 Jun 2023 12:14:22 +0800 Subject: [PATCH] [Python] Implement TLV List type (#25238) * [python] Implement TLV List type * Update * Revert "Update" This reverts commit 448c46f0e4b150b27f8ad4f982d670d231b68bf8. * Fix * Update --- src/controller/python/BUILD.gn | 1 + src/controller/python/chip/tlv/__init__.py | 12 +- src/controller/python/chip/tlv/tlvlist.py | 175 ++++++++++++++++++ .../python/test/unit_tests/test_tlv.py | 62 ++++++- 4 files changed, 245 insertions(+), 5 deletions(-) create mode 100644 src/controller/python/chip/tlv/tlvlist.py diff --git a/src/controller/python/BUILD.gn b/src/controller/python/BUILD.gn index 07f288830a3f7d..c76c64c65d7df4 100644 --- a/src/controller/python/BUILD.gn +++ b/src/controller/python/BUILD.gn @@ -360,6 +360,7 @@ chip_python_wheel_action("chip-clusters") { "chip/clusters/Types.py", "chip/clusters/enum.py", "chip/tlv/__init__.py", + "chip/tlv/tlvlist.py", ] }, { diff --git a/src/controller/python/chip/tlv/__init__.py b/src/controller/python/chip/tlv/__init__.py index 6b22fbb8536299..a29704f5343ebd 100644 --- a/src/controller/python/chip/tlv/__init__.py +++ b/src/controller/python/chip/tlv/__init__.py @@ -32,6 +32,8 @@ from collections.abc import Mapping, Sequence from enum import Enum +from .tlvlist import TLVList + TLV_TYPE_SIGNED_INTEGER = 0x00 TLV_TYPE_UNSIGNED_INTEGER = 0x04 TLV_TYPE_BOOLEAN = 0x08 @@ -223,6 +225,11 @@ def put(self, tag, val): for containedTag, containedVal in val.items(): self.put(containedTag, containedVal) self.endContainer() + elif isinstance(val, TLVList): + self.startPath(tag) + for containedTag, containedVal in val: + self.put(containedTag, containedVal) + self.endContainer() elif isinstance(val, Sequence): self.startArray(tag) for containedVal in val: @@ -576,7 +583,7 @@ def _decodeVal(self, tlv, decoding): decoding["Array"] = [] self._get(tlv, decoding["Array"], decoding["value"]) elif decoding["type"] == "Path": - decoding["value"] = [] + decoding["value"] = TLVList() decoding["Path"] = [] self._get(tlv, decoding["Path"], decoding["value"]) elif decoding["type"] == "Null": @@ -682,6 +689,9 @@ def _get(self, tlv, decodings, out): if isinstance(out, Mapping): tag = decoding["tag"] if decoding["tag"] is not None else "Any" out[tag] = decoding["value"] + elif isinstance(out, TLVList): + tag = decoding["tag"] if decoding["tag"] is not None else None + out.append(tag, decoding["value"]) else: out.append(decoding["value"]) else: diff --git a/src/controller/python/chip/tlv/tlvlist.py b/src/controller/python/chip/tlv/tlvlist.py new file mode 100644 index 00000000000000..a9afaa3e2bb26e --- /dev/null +++ b/src/controller/python/chip/tlv/tlvlist.py @@ -0,0 +1,175 @@ +#!/usr/bin/env python3 +# coding=utf-8 + +# +# Copyright (c) 2023 Project CHIP Authors +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import dataclasses +import enum +from typing import Any, Iterator, List, Tuple, Union + + +class TLVList: + """Represents a list in CHIP TLV. + + A TLVList can be constructed from a `list` of tuples of tag and value. `None` stands for "anonymous tag". + + e.g. + ``` + l = TLVList([(1, 'a'), (2, 'b'), (None, 'c')]) + ``` + + Constructs a list of three items, tag 1 is 'a', tag 2 is 'b' and with an anonymous item 'c'. + + Since TLVLists are ordered, it is meanful to iterate over an list: + + e.g. + ``` + for tag, val in l: + print(f"tag={tag}, val={val}") + ``` + + Outputs: + ``` + tag=1, val=a + tag=2, val=b + tag=None, val=c + ``` + + One can also append items into an list: + + e.g. + ``` + l.append(3, 'd') + ``` + + The content of `l` will be `[(1, 'a'), (2, 'b'), (None, 'c'), (3, 'd')]` + + One can access an item in the list via the tag. + + e.g. + ``` + val = l[1] + # val is 'a' + ``` + + It is also possible to get an item via the index since it is ordered: + + e.g. + ``` + tag, val = l[TLVList.IndexMethod.Tag:2] + # tag is None, val is 'c' + ``` + """ + + @dataclasses.dataclass + class TLVListItem: + tag: Union[None, int] + value: Any + + def as_tuple(self): + return (self.tag, self.value) + + def as_rich_repr_tuple(self): + if self.tag is None: + return "Anonymous", repr(self.value) + else: + return str(self.tag), repr(self.value) + + def __repr__(self): + if self.tag is None: + return "Anonymous: " + repr(self.value) + else: + return str(self.tag) + ": " + repr(self.value) + + def __rich_repr__(self): + yield self.as_rich_repr_tuple() + + class IndexMethod(enum.Enum): + Index = 0 + Tag = 1 + + class Iterator: + def __init__(self, iter: Iterator): + self._iterator = iter + + def __iter__(self): + return self + + def __next__(self): + res = next(self._iterator) + return res.tag, res.value + + def __init__(self, items: List[Tuple[Union[int, None], Any]] = []): + """Constructs a TLVList. + + items: A list of tuples for the tag and value for the items in the TLVList. + """ + self._data: List[TLVList.TLVListItem] = [] + + for tag, val in items: + self.append(tag, val) + + def _get_item_by_tag(self, tag) -> Any: + if not isinstance(tag, int): + raise ValueError("Tag should be a integer for non-anonymous fields.") + for data in self._data: + if data.tag == tag: + return data.value + raise KeyError(f"Tag {tag} not found in the list.") + + def __getitem__(self, access) -> Any: + """Gets a item in the list by the tag or the index. + + Examples: + ``` + tlv_list[1] # returns the item in the list with tag `1` + tlv_list[TLVList.IndexMethod.Tag:2] # returns the item in the list with tag `2` + tlv_list[TLVList.IndexMethod.Index:0] # returns the tag and value of the first item in the list + ``` + """ + if isinstance(access, slice): + tag, index = access.start, access.stop + if tag == TLVList.IndexMethod.Tag: + return self._get_item_by_tag(index) + elif tag == TLVList.IndexMethod.Index: + return self._data[index].as_tuple() + raise ValueError("Method should be TLVList.IndexMethod.Tag or TLVList.IndexMethod.Index") + elif isinstance(access, int): + return self._get_item_by_tag(access) + raise ValueError("Invalid access method") + + def append(self, tag: Union[None, int], value: Any) -> None: + """Appends an item to the list.""" + if (tag is not None) and (not isinstance(tag, int)): + raise KeyError(f"Tag should be a integer or none for anonymous tag, {type(tag)} got") + self._data.append(TLVList.TLVListItem(tag, value)) + + def __repr__(self): + return "TLVList" + repr(self._data) + + def __rich_repr__(self): + for items in self._data: + yield items.as_rich_repr_tuple() + + def __iter__(self) -> """TLVList.Iterator""": + return TLVList.Iterator(iter(self._data)) + + def __eq__(self, rhs: "TLVList") -> bool: + if not isinstance(rhs, TLVList): + return False + return self._data == rhs._data diff --git a/src/controller/python/test/unit_tests/test_tlv.py b/src/controller/python/test/unit_tests/test_tlv.py index 74d7aefe0f07cc..7a80b24c497320 100644 --- a/src/controller/python/test/unit_tests/test_tlv.py +++ b/src/controller/python/test/unit_tests/test_tlv.py @@ -17,7 +17,7 @@ import unittest -from chip.tlv import TLVReader, TLVWriter +from chip.tlv import TLVList, TLVReader, TLVWriter from chip.tlv import uint as tlvUint @@ -112,6 +112,24 @@ def test_uint(self): except Exception: pass + def test_list(self): + encodeVal = self._getEncoded(TLVList([(None, 1), (None, 2), (1, 3)])) + self.assertEqual(encodeVal, bytearray([0b00010111, # List, anonymous tag + 0x00, 0x01, # Anonymous tag, 1 octet signed int `1`` + 0x00, 0x02, # Anonymous tag, 1 octet signed int `2`` + 0b00100000, 0x01, 0x03, # Context specific tag `1`, 1 octet signed int `3` + 0x18 # End of container + ])) + encodeVal = self._getEncoded(TLVList([(None, 1), (None, TLVList([(None, 2), (3, 4)]))])) + self.assertEqual(encodeVal, bytearray([0b00010111, # List, anonymous tag + 0x00, 0x01, # Anonymous tag, 1 octet signed int `1`` + 0b00010111, # List anonymous tag + 0x00, 0x02, # Anonymous tag, 1 octet signed int `2`` + 0b00100000, 0x03, 0x04, # Context specific tag `1`, 1 octet signed int `3` + 0x18, # End of inner list + 0x18 # End of container + ])) + class TestTLVReader(unittest.TestCase): def _read_case(self, input, answer): @@ -151,16 +169,52 @@ def test_structure(self): test_cases = [ (b'\x15\x36\x01\x15\x35\x01\x26\x00\xBF\xA2\x55\x16\x37\x01\x24' b'\x02\x00\x24\x03\x28\x24\x04\x00\x18\x24\x02\x01\x18\x18\x18\x18', - {1: [{1: {0: 374710975, 1: [0, 40, 0], 2: 1}}]}), + {1: [{1: {0: 374710975, 1: TLVList([(2, 0), (3, 40), (4, 0)]), 2: 1}}]}), (b'\x156\x01\x155\x01&\x00\xBF\xA2U\x167\x01$\x02\x00$\x03($\x04\x01' b'\x18,\x02\x18Nordic Semiconductor ASA\x18\x18\x18\x18', - {1: [{1: {0: 374710975, 1: [0, 40, 1], 2: 'Nordic Semiconductor ASA'}}]}), + {1: [{1: {0: 374710975, 1: TLVList([(2, 0), (3, 40), (4, 1)]), 2: 'Nordic Semiconductor ASA'}}]}), (b"\0256\001\0255\001&\000\031\346x\2077\001$\002\001$\003\006$\004\000\030(\002\030\030\030\030", - {1: [{1: {0: 2272847385, 1: [1, 6, 0], 2: False}}]}) + {1: [{1: {0: 2272847385, 1: TLVList([(2, 1), (3, 6), (4, 0)]), 2: False}}]}) ] for tlv_bytes, answer in test_cases: self._read_case(tlv_bytes, answer) + def test_list(self): + self._read_case([0b00010111, # List, anonymous tag + 0x00, 0x01, # Anonymous tag, 1 octet signed int `1`` + 0x00, 0x02, # Anonymous tag, 1 octet signed int `2`` + 0b00100000, 0x01, 0x03, # Context specific tag `1`, 1 octet signed int `3` + 0x18 # End of container + ], TLVList([(None, 1), (None, 2), (1, 3)])) + self._read_case([0b00010111, # List, anonymous tag + 0x00, 0x01, # Anonymous tag, 1 octet signed int `1`` + 0b00010111, # List anonymous tag + 0x00, 0x02, # Anonymous tag, 1 octet signed int `2`` + 0b00100000, 0x03, 0x04, # Context specific tag `1`, 1 octet signed int `3` + 0x18, # End of inner list + 0x18 # End of container + ], TLVList([(None, 1), (None, TLVList([(None, 2), (3, 4)]))])) + + +class TestTLVTypes(unittest.TestCase): + def test_list(self): + var = TLVList([(None, 1), (None, 2), (1, 3)]) + self.assertEqual(var[1], 3) + self.assertEqual(var[TLVList.IndexMethod.Index:0], (None, 1)) + self.assertEqual(var[TLVList.IndexMethod.Tag:1], 3) + + var.append(None, 4) + self.assertEqual(var, TLVList([(None, 1), (None, 2), (1, 3), (None, 4)])) + + var.append(5, 6) + self.assertEqual(var, TLVList([(None, 1), (None, 2), (1, 3), (None, 4), (5, 6)])) + + expectIterateContent = [(None, 1), (None, 2), (1, 3), (None, 4), (5, 6)] + iteratedContent = [] + for tag, value in var: + iteratedContent.append((tag, value)) + self.assertEqual(expectIterateContent, iteratedContent) + if __name__ == '__main__': unittest.main()