Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Python] Implement TLV List type #25238

Merged
merged 5 commits into from
Jun 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/controller/python/BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ chip_python_wheel_action("chip-clusters") {
"chip/clusters/Types.py",
"chip/clusters/enum.py",
"chip/tlv/__init__.py",
"chip/tlv/tlvlist.py",
andy31415 marked this conversation as resolved.
Show resolved Hide resolved
]
},
{
Expand Down
12 changes: 11 additions & 1 deletion src/controller/python/chip/tlv/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
from collections.abc import Mapping, Sequence
from enum import Enum

from .tlvlist import TLVList

TLV_TYPE_SIGNED_INTEGER = 0x00
TLV_TYPE_UNSIGNED_INTEGER = 0x04
TLV_TYPE_BOOLEAN = 0x08
Expand Down Expand Up @@ -223,6 +225,11 @@ def put(self, tag, val):
for containedTag, containedVal in val.items():
self.put(containedTag, containedVal)
self.endContainer()
elif isinstance(val, TLVList):
self.startPath(tag)
for containedTag, containedVal in val:
self.put(containedTag, containedVal)
self.endContainer()
elif isinstance(val, Sequence):
self.startArray(tag)
for containedVal in val:
Expand Down Expand Up @@ -576,7 +583,7 @@ def _decodeVal(self, tlv, decoding):
decoding["Array"] = []
self._get(tlv, decoding["Array"], decoding["value"])
elif decoding["type"] == "Path":
decoding["value"] = []
decoding["value"] = TLVList()
decoding["Path"] = []
self._get(tlv, decoding["Path"], decoding["value"])
elif decoding["type"] == "Null":
Expand Down Expand Up @@ -682,6 +689,9 @@ def _get(self, tlv, decodings, out):
if isinstance(out, Mapping):
tag = decoding["tag"] if decoding["tag"] is not None else "Any"
out[tag] = decoding["value"]
elif isinstance(out, TLVList):
tag = decoding["tag"] if decoding["tag"] is not None else None
out.append(tag, decoding["value"])
else:
out.append(decoding["value"])
else:
Expand Down
175 changes: 175 additions & 0 deletions src/controller/python/chip/tlv/tlvlist.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2023 Project CHIP Authors
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import dataclasses
import enum
from typing import Any, Iterator, List, Tuple, Union


class TLVList:
"""Represents a list in CHIP TLV.

A TLVList can be constructed from a `list` of tuples of tag and value. `None` stands for "anonymous tag".

e.g.
```
l = TLVList([(1, 'a'), (2, 'b'), (None, 'c')])
```

Constructs a list of three items, tag 1 is 'a', tag 2 is 'b' and with an anonymous item 'c'.

Since TLVLists are ordered, it is meanful to iterate over an list:

e.g.
```
for tag, val in l:
print(f"tag={tag}, val={val}")
```

Outputs:
```
tag=1, val=a
tag=2, val=b
tag=None, val=c
```

One can also append items into an list:

e.g.
```
l.append(3, 'd')
```

The content of `l` will be `[(1, 'a'), (2, 'b'), (None, 'c'), (3, 'd')]`

One can access an item in the list via the tag.

e.g.
```
val = l[1]
# val is 'a'
```

It is also possible to get an item via the index since it is ordered:

e.g.
```
tag, val = l[TLVList.IndexMethod.Tag:2]
# tag is None, val is 'c'
```
"""

@dataclasses.dataclass
class TLVListItem:
tag: Union[None, int]
value: Any

def as_tuple(self):
return (self.tag, self.value)

def as_rich_repr_tuple(self):
if self.tag is None:
return "Anonymous", repr(self.value)
else:
return str(self.tag), repr(self.value)

def __repr__(self):
if self.tag is None:
return "Anonymous: " + repr(self.value)
else:
return str(self.tag) + ": " + repr(self.value)

def __rich_repr__(self):
yield self.as_rich_repr_tuple()

class IndexMethod(enum.Enum):
Index = 0
Tag = 1

class Iterator:
def __init__(self, iter: Iterator):
self._iterator = iter

def __iter__(self):
return self

def __next__(self):
res = next(self._iterator)
return res.tag, res.value

def __init__(self, items: List[Tuple[Union[int, None], Any]] = []):
"""Constructs a TLVList.

items: A list of tuples for the tag and value for the items in the TLVList.
"""
self._data: List[TLVList.TLVListItem] = []

for tag, val in items:
self.append(tag, val)

def _get_item_by_tag(self, tag) -> Any:
if not isinstance(tag, int):
raise ValueError("Tag should be a integer for non-anonymous fields.")
for data in self._data:
if data.tag == tag:
return data.value
raise KeyError(f"Tag {tag} not found in the list.")

def __getitem__(self, access) -> Any:
"""Gets a item in the list by the tag or the index.

Examples:
```
tlv_list[1] # returns the item in the list with tag `1`
tlv_list[TLVList.IndexMethod.Tag:2] # returns the item in the list with tag `2`
tlv_list[TLVList.IndexMethod.Index:0] # returns the tag and value of the first item in the list
```
"""
if isinstance(access, slice):
tag, index = access.start, access.stop
if tag == TLVList.IndexMethod.Tag:
return self._get_item_by_tag(index)
elif tag == TLVList.IndexMethod.Index:
return self._data[index].as_tuple()
raise ValueError("Method should be TLVList.IndexMethod.Tag or TLVList.IndexMethod.Index")
elif isinstance(access, int):
return self._get_item_by_tag(access)
raise ValueError("Invalid access method")

def append(self, tag: Union[None, int], value: Any) -> None:
"""Appends an item to the list."""
if (tag is not None) and (not isinstance(tag, int)):
raise KeyError(f"Tag should be a integer or none for anonymous tag, {type(tag)} got")
self._data.append(TLVList.TLVListItem(tag, value))

def __repr__(self):
return "TLVList" + repr(self._data)

def __rich_repr__(self):
for items in self._data:
yield items.as_rich_repr_tuple()

def __iter__(self) -> """TLVList.Iterator""":
return TLVList.Iterator(iter(self._data))

def __eq__(self, rhs: "TLVList") -> bool:
if not isinstance(rhs, TLVList):
return False
return self._data == rhs._data
62 changes: 58 additions & 4 deletions src/controller/python/test/unit_tests/test_tlv.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import unittest

from chip.tlv import TLVReader, TLVWriter
from chip.tlv import TLVList, TLVReader, TLVWriter
from chip.tlv import uint as tlvUint


Expand Down Expand Up @@ -112,6 +112,24 @@ def test_uint(self):
except Exception:
pass

def test_list(self):
encodeVal = self._getEncoded(TLVList([(None, 1), (None, 2), (1, 3)]))
self.assertEqual(encodeVal, bytearray([0b00010111, # List, anonymous tag
0x00, 0x01, # Anonymous tag, 1 octet signed int `1``
0x00, 0x02, # Anonymous tag, 1 octet signed int `2``
0b00100000, 0x01, 0x03, # Context specific tag `1`, 1 octet signed int `3`
0x18 # End of container
]))
encodeVal = self._getEncoded(TLVList([(None, 1), (None, TLVList([(None, 2), (3, 4)]))]))
self.assertEqual(encodeVal, bytearray([0b00010111, # List, anonymous tag
0x00, 0x01, # Anonymous tag, 1 octet signed int `1``
0b00010111, # List anonymous tag
0x00, 0x02, # Anonymous tag, 1 octet signed int `2``
0b00100000, 0x03, 0x04, # Context specific tag `1`, 1 octet signed int `3`
0x18, # End of inner list
0x18 # End of container
]))


class TestTLVReader(unittest.TestCase):
def _read_case(self, input, answer):
Expand Down Expand Up @@ -151,16 +169,52 @@ def test_structure(self):
test_cases = [
(b'\x15\x36\x01\x15\x35\x01\x26\x00\xBF\xA2\x55\x16\x37\x01\x24'
b'\x02\x00\x24\x03\x28\x24\x04\x00\x18\x24\x02\x01\x18\x18\x18\x18',
{1: [{1: {0: 374710975, 1: [0, 40, 0], 2: 1}}]}),
{1: [{1: {0: 374710975, 1: TLVList([(2, 0), (3, 40), (4, 0)]), 2: 1}}]}),
(b'\x156\x01\x155\x01&\x00\xBF\xA2U\x167\x01$\x02\x00$\x03($\x04\x01'
b'\x18,\x02\x18Nordic Semiconductor ASA\x18\x18\x18\x18',
{1: [{1: {0: 374710975, 1: [0, 40, 1], 2: 'Nordic Semiconductor ASA'}}]}),
{1: [{1: {0: 374710975, 1: TLVList([(2, 0), (3, 40), (4, 1)]), 2: 'Nordic Semiconductor ASA'}}]}),
(b"\0256\001\0255\001&\000\031\346x\2077\001$\002\001$\003\006$\004\000\030(\002\030\030\030\030",
{1: [{1: {0: 2272847385, 1: [1, 6, 0], 2: False}}]})
{1: [{1: {0: 2272847385, 1: TLVList([(2, 1), (3, 6), (4, 0)]), 2: False}}]})
]
for tlv_bytes, answer in test_cases:
self._read_case(tlv_bytes, answer)

def test_list(self):
self._read_case([0b00010111, # List, anonymous tag
0x00, 0x01, # Anonymous tag, 1 octet signed int `1``
0x00, 0x02, # Anonymous tag, 1 octet signed int `2``
0b00100000, 0x01, 0x03, # Context specific tag `1`, 1 octet signed int `3`
0x18 # End of container
], TLVList([(None, 1), (None, 2), (1, 3)]))
self._read_case([0b00010111, # List, anonymous tag
0x00, 0x01, # Anonymous tag, 1 octet signed int `1``
0b00010111, # List anonymous tag
0x00, 0x02, # Anonymous tag, 1 octet signed int `2``
0b00100000, 0x03, 0x04, # Context specific tag `1`, 1 octet signed int `3`
0x18, # End of inner list
0x18 # End of container
], TLVList([(None, 1), (None, TLVList([(None, 2), (3, 4)]))]))


class TestTLVTypes(unittest.TestCase):
def test_list(self):
var = TLVList([(None, 1), (None, 2), (1, 3)])
self.assertEqual(var[1], 3)
self.assertEqual(var[TLVList.IndexMethod.Index:0], (None, 1))
self.assertEqual(var[TLVList.IndexMethod.Tag:1], 3)

var.append(None, 4)
self.assertEqual(var, TLVList([(None, 1), (None, 2), (1, 3), (None, 4)]))

var.append(5, 6)
self.assertEqual(var, TLVList([(None, 1), (None, 2), (1, 3), (None, 4), (5, 6)]))

expectIterateContent = [(None, 1), (None, 2), (1, 3), (None, 4), (5, 6)]
iteratedContent = []
for tag, value in var:
iteratedContent.append((tag, value))
self.assertEqual(expectIterateContent, iteratedContent)


if __name__ == '__main__':
unittest.main()