From 4d4cb5ed2e5dc8f379a7f57c818e14c7d6c0de17 Mon Sep 17 00:00:00 2001 From: Stefan Agner Date: Fri, 24 May 2024 10:39:41 +0200 Subject: [PATCH] [Python] Make AttributePath more pythonic (#33571) * [Python] Make AttributePath more pythonic Use dataclass default initializer to initialize AttributePath. Use static method to initialize from Cluster or Attribute. Also hash the integer fields directly, this is more efficient than formatting a string first. * Drop AttributePathWithListIndex Drop AttributePathWithListIndex as it is unused. * Make DataVersionFilter/EventPath pythonic as well Use frozen data classes and static initializers similar to AttributePath. * Fix _parseEventPathTuple * Fix _parseDataVersionFilterTuple --- src/controller/python/chip/ChipDeviceCtrl.py | 47 +++------ .../python/chip/clusters/Attribute.py | 97 ++++++------------- .../test/test_scripts/cluster_objects.py | 2 +- 3 files changed, 48 insertions(+), 98 deletions(-) diff --git a/src/controller/python/chip/ChipDeviceCtrl.py b/src/controller/python/chip/ChipDeviceCtrl.py index d63a3772e62dc6..05bc3984b179f1 100644 --- a/src/controller/python/chip/ChipDeviceCtrl.py +++ b/src/controller/python/chip/ChipDeviceCtrl.py @@ -1170,33 +1170,26 @@ def _parseAttributePathTuple(self, pathTuple: typing.Union[ # Concrete path typing.Tuple[int, typing.Type[ClusterObjects.ClusterAttributeDescriptor]] ]): - endpoint = None - cluster = None - attribute = None - if pathTuple == ('*') or pathTuple == (): # Wildcard - pass + return ClusterAttribute.AttributePath() elif not isinstance(pathTuple, tuple): if isinstance(pathTuple, int): - endpoint = pathTuple + return ClusterAttribute.AttributePath(EndpointId=pathTuple) elif issubclass(pathTuple, ClusterObjects.Cluster): - cluster = pathTuple + return ClusterAttribute.AttributePath.from_cluster(EndpointId=None, Cluster=pathTuple) elif issubclass(pathTuple, ClusterObjects.ClusterAttributeDescriptor): - attribute = pathTuple + return ClusterAttribute.AttributePath.from_attribute(EndpointId=None, Attribute=pathTuple) else: raise ValueError("Unsupported Attribute Path") else: # endpoint + (cluster) attribute / endpoint + cluster - endpoint = pathTuple[0] if issubclass(pathTuple[1], ClusterObjects.Cluster): - cluster = pathTuple[1] + return ClusterAttribute.AttributePath.from_cluster(EndpointId=pathTuple[0], Cluster=pathTuple[1]) elif issubclass(pathTuple[1], ClusterAttribute.ClusterAttributeDescriptor): - attribute = pathTuple[1] + return ClusterAttribute.AttributePath.from_attribute(EndpointId=pathTuple[0], Attribute=pathTuple[1]) else: raise ValueError("Unsupported Attribute Path") - return ClusterAttribute.AttributePath( - EndpointId=endpoint, Cluster=cluster, Attribute=attribute) def _parseDataVersionFilterTuple(self, pathTuple: typing.List[typing.Tuple[int, typing.Type[ClusterObjects.Cluster], int]]): endpoint = None @@ -1209,7 +1202,7 @@ def _parseDataVersionFilterTuple(self, pathTuple: typing.List[typing.Tuple[int, else: raise ValueError("Unsupported Cluster Path") dataVersion = pathTuple[2] - return ClusterAttribute.DataVersionFilter( + return ClusterAttribute.DataVersionFilter.from_cluster( EndpointId=endpoint, Cluster=cluster, DataVersion=dataVersion) def _parseEventPathTuple(self, pathTuple: typing.Union[ @@ -1226,39 +1219,31 @@ def _parseEventPathTuple(self, pathTuple: typing.Union[ typing.Tuple[int, typing.Type[ClusterObjects.ClusterEvent], int] ]): - endpoint = None - cluster = None - event = None - urgent = False if pathTuple in [('*'), ()]: # Wildcard - pass + return ClusterAttribute.EventPath() elif not isinstance(pathTuple, tuple): logging.debug(type(pathTuple)) if isinstance(pathTuple, int): - endpoint = pathTuple + return ClusterAttribute.EventPath(EndpointId=pathTuple) elif issubclass(pathTuple, ClusterObjects.Cluster): - cluster = pathTuple + return ClusterAttribute.EventPath.from_cluster(EndpointId=None, Cluster=pathTuple) elif issubclass(pathTuple, ClusterObjects.ClusterEvent): - event = pathTuple + return ClusterAttribute.EventPath.from_event(EndpointId=None, Event=pathTuple) else: raise ValueError("Unsupported Event Path") else: if pathTuple[0] == '*': - urgent = pathTuple[-1] - pass + return ClusterAttribute.EventPath(Urgent=pathTuple[-1]) else: + urgent = bool(pathTuple[-1]) if len(pathTuple) > 2 else False # endpoint + (cluster) event / endpoint + cluster - endpoint = pathTuple[0] if issubclass(pathTuple[1], ClusterObjects.Cluster): - cluster = pathTuple[1] + return ClusterAttribute.EventPath.from_cluster(EndpointId=pathTuple[0], Cluster=pathTuple[1], Urgent=urgent) elif issubclass(pathTuple[1], ClusterAttribute.ClusterEvent): - event = pathTuple[1] + return ClusterAttribute.EventPath.from_event(EndpointId=pathTuple[0], Event=pathTuple[1], Urgent=urgent) else: raise ValueError("Unsupported Attribute Path") - urgent = bool(pathTuple[-1]) if len(pathTuple) > 2 else False - return ClusterAttribute.EventPath( - EndpointId=endpoint, Cluster=cluster, Event=event, Urgent=urgent) async def Read(self, nodeid: int, attributes: typing.List[typing.Union[ None, # Empty tuple, all wildcard @@ -1530,7 +1515,7 @@ def ZCLReadAttribute(self, cluster, attribute, nodeid, endpoint, groupid, blocki result = asyncio.run(self.ReadAttribute( nodeid, [(endpoint, attributeType)])) - path = ClusterAttribute.AttributePath( + path = ClusterAttribute.AttributePath.from_attribute( EndpointId=endpoint, Attribute=attributeType) return im.AttributeReadResult(path=im.AttributePath(nodeId=nodeid, endpointId=path.EndpointId, clusterId=path.ClusterId, attributeId=path.AttributeId), status=0, value=result[endpoint][clusterType][attributeType], dataVersion=result[endpoint][clusterType][ClusterAttribute.DataVersion]) diff --git a/src/controller/python/chip/clusters/Attribute.py b/src/controller/python/chip/clusters/Attribute.py index 9e46eed469d39f..a1a341ed46614a 100644 --- a/src/controller/python/chip/clusters/Attribute.py +++ b/src/controller/python/chip/clusters/Attribute.py @@ -54,62 +54,43 @@ class EventPriority(Enum): CRITICAL = 2 -@dataclass +@dataclass(frozen=True) class AttributePath: EndpointId: int = None ClusterId: int = None AttributeId: int = None - def __init__(self, EndpointId: int = None, Cluster=None, Attribute=None, ClusterId=None, AttributeId=None): - self.EndpointId = EndpointId - if Cluster is not None: - # Wildcard read for a specific cluster - if (Attribute is not None) or (ClusterId is not None) or (AttributeId is not None): - raise Warning( - "Attribute, ClusterId and AttributeId is ignored when Cluster is specified") - self.ClusterId = Cluster.id - return - if Attribute is not None: - if (ClusterId is not None) or (AttributeId is not None): - raise Warning( - "ClusterId and AttributeId is ignored when Attribute is specified") - self.ClusterId = Attribute.cluster_id - self.AttributeId = Attribute.attribute_id - return - self.ClusterId = ClusterId - self.AttributeId = AttributeId + @staticmethod + def from_cluster(EndpointId: int, Cluster: Cluster) -> AttributePath: + if Cluster is None: + raise ValueError("Cluster cannot be None") + return AttributePath(EndpointId=EndpointId, ClusterId=Cluster.id) + + @staticmethod + def from_attribute(EndpointId: int, Attribute: ClusterAttributeDescriptor) -> AttributePath: + if Attribute is None: + raise ValueError("Attribute cannot be None") + return AttributePath(EndpointId=EndpointId, ClusterId=Attribute.cluster_id, AttributeId=Attribute.attribute_id) def __str__(self) -> str: return f"{self.EndpointId}/{self.ClusterId}/{self.AttributeId}" - def __hash__(self): - return str(self).__hash__() - -@dataclass +@dataclass(frozen=True) class DataVersionFilter: EndpointId: int = None ClusterId: int = None DataVersion: int = None - def __init__(self, EndpointId: int = None, Cluster=None, ClusterId=None, DataVersion=None): - self.EndpointId = EndpointId - if Cluster is not None: - # Wildcard read for a specific cluster - if (ClusterId is not None): - raise Warning( - "Attribute, ClusterId and AttributeId is ignored when Cluster is specified") - self.ClusterId = Cluster.id - else: - self.ClusterId = ClusterId - self.DataVersion = DataVersion + @staticmethod + def from_cluster(EndpointId: int, Cluster: Cluster, DataVersion: int = None) -> AttributePath: + if Cluster is None: + raise ValueError("Cluster cannot be None") + return DataVersionFilter(EndpointId=EndpointId, ClusterId=Cluster.id, DataVersion=DataVersion) def __str__(self) -> str: return f"{self.EndpointId}/{self.ClusterId}/{self.DataVersion}" - def __hash__(self): - return str(self).__hash__() - @dataclass class TypedAttributePath: @@ -165,44 +146,28 @@ def __init__(self, ClusterType: Cluster = None, AttributeType: ClusterAttributeD self.AttributeId = self.AttributeType.attribute_id -@dataclass +@dataclass(frozen=True) class EventPath: EndpointId: int = None ClusterId: int = None EventId: int = None Urgent: int = None - def __init__(self, EndpointId: int = None, Cluster=None, Event=None, ClusterId=None, EventId=None, Urgent=None): - self.EndpointId = EndpointId - self.Urgent = Urgent - if Cluster is not None: - # Wildcard read for a specific cluster - if (Event is not None) or (ClusterId is not None) or (EventId is not None): - raise Warning( - "Event, ClusterId and AttributeId is ignored when Cluster is specified") - self.ClusterId = Cluster.id - return - if Event is not None: - if (ClusterId is not None) or (EventId is not None): - raise Warning( - "ClusterId and EventId is ignored when Event is specified") - self.ClusterId = Event.cluster_id - self.EventId = Event.event_id - return - self.ClusterId = ClusterId - self.EventId = EventId + @staticmethod + def from_cluster(EndpointId: int, Cluster: Cluster, EventId: int = None, Urgent: int = None) -> "EventPath": + if Cluster is None: + raise ValueError("Cluster cannot be None") + return EventPath(EndpointId=EndpointId, ClusterId=Cluster.id, EventId=EventId, Urgent=Urgent) + + @staticmethod + def from_event(EndpointId: int, Event: ClusterEvent, Urgent: int = None) -> "EventPath": + if Event is None: + raise ValueError("Event cannot be None") + return EventPath(EndpointId=EndpointId, ClusterId=Event.cluster_id, EventId=Event.event_id, Urgent=Urgent) def __str__(self) -> str: return f"{self.EndpointId}/{self.ClusterId}/{self.EventId}/{self.Urgent}" - def __hash__(self): - return str(self).__hash__() - - -@dataclass -class AttributePathWithListIndex(AttributePath): - ListIndex: int = None - @dataclass class EventHeader: @@ -688,7 +653,7 @@ def SetClientObjPointers(self, pReadClient, pReadCallback): def GetAllEventValues(self): return self._events - def handleAttributeData(self, path: AttributePathWithListIndex, dataVersion: int, status: int, data: bytes): + def handleAttributeData(self, path: AttributePath, dataVersion: int, status: int, data: bytes): try: imStatus = chip.interaction_model.Status(status) diff --git a/src/controller/python/test/test_scripts/cluster_objects.py b/src/controller/python/test/test_scripts/cluster_objects.py index 53516c1dc75e9b..37f6819cbe66a4 100644 --- a/src/controller/python/test/test_scripts/cluster_objects.py +++ b/src/controller/python/test/test_scripts/cluster_objects.py @@ -164,7 +164,7 @@ async def TestWriteRequest(cls, devCtrl): ] ) expectedRes = [ - AttributeStatus(Path=AttributePath( + AttributeStatus(Path=AttributePath.from_attribute( EndpointId=1, Attribute=Clusters.UnitTesting.Attributes.ListLongOctetString), Status=chip.interaction_model.Status.Success), ]