diff --git a/src/controller/python/chip/ChipDeviceCtrl.py b/src/controller/python/chip/ChipDeviceCtrl.py index d63a3772e62dc6..05bc3984b179f1 100644 --- a/src/controller/python/chip/ChipDeviceCtrl.py +++ b/src/controller/python/chip/ChipDeviceCtrl.py @@ -1170,33 +1170,26 @@ def _parseAttributePathTuple(self, pathTuple: typing.Union[ # Concrete path typing.Tuple[int, typing.Type[ClusterObjects.ClusterAttributeDescriptor]] ]): - endpoint = None - cluster = None - attribute = None - if pathTuple == ('*') or pathTuple == (): # Wildcard - pass + return ClusterAttribute.AttributePath() elif not isinstance(pathTuple, tuple): if isinstance(pathTuple, int): - endpoint = pathTuple + return ClusterAttribute.AttributePath(EndpointId=pathTuple) elif issubclass(pathTuple, ClusterObjects.Cluster): - cluster = pathTuple + return ClusterAttribute.AttributePath.from_cluster(EndpointId=None, Cluster=pathTuple) elif issubclass(pathTuple, ClusterObjects.ClusterAttributeDescriptor): - attribute = pathTuple + return ClusterAttribute.AttributePath.from_attribute(EndpointId=None, Attribute=pathTuple) else: raise ValueError("Unsupported Attribute Path") else: # endpoint + (cluster) attribute / endpoint + cluster - endpoint = pathTuple[0] if issubclass(pathTuple[1], ClusterObjects.Cluster): - cluster = pathTuple[1] + return ClusterAttribute.AttributePath.from_cluster(EndpointId=pathTuple[0], Cluster=pathTuple[1]) elif issubclass(pathTuple[1], ClusterAttribute.ClusterAttributeDescriptor): - attribute = pathTuple[1] + return ClusterAttribute.AttributePath.from_attribute(EndpointId=pathTuple[0], Attribute=pathTuple[1]) else: raise ValueError("Unsupported Attribute Path") - return ClusterAttribute.AttributePath( - EndpointId=endpoint, Cluster=cluster, Attribute=attribute) def _parseDataVersionFilterTuple(self, pathTuple: typing.List[typing.Tuple[int, typing.Type[ClusterObjects.Cluster], int]]): endpoint = None @@ -1209,7 +1202,7 @@ def _parseDataVersionFilterTuple(self, pathTuple: typing.List[typing.Tuple[int, else: raise ValueError("Unsupported Cluster Path") dataVersion = pathTuple[2] - return ClusterAttribute.DataVersionFilter( + return ClusterAttribute.DataVersionFilter.from_cluster( EndpointId=endpoint, Cluster=cluster, DataVersion=dataVersion) def _parseEventPathTuple(self, pathTuple: typing.Union[ @@ -1226,39 +1219,31 @@ def _parseEventPathTuple(self, pathTuple: typing.Union[ typing.Tuple[int, typing.Type[ClusterObjects.ClusterEvent], int] ]): - endpoint = None - cluster = None - event = None - urgent = False if pathTuple in [('*'), ()]: # Wildcard - pass + return ClusterAttribute.EventPath() elif not isinstance(pathTuple, tuple): logging.debug(type(pathTuple)) if isinstance(pathTuple, int): - endpoint = pathTuple + return ClusterAttribute.EventPath(EndpointId=pathTuple) elif issubclass(pathTuple, ClusterObjects.Cluster): - cluster = pathTuple + return ClusterAttribute.EventPath.from_cluster(EndpointId=None, Cluster=pathTuple) elif issubclass(pathTuple, ClusterObjects.ClusterEvent): - event = pathTuple + return ClusterAttribute.EventPath.from_event(EndpointId=None, Event=pathTuple) else: raise ValueError("Unsupported Event Path") else: if pathTuple[0] == '*': - urgent = pathTuple[-1] - pass + return ClusterAttribute.EventPath(Urgent=pathTuple[-1]) else: + urgent = bool(pathTuple[-1]) if len(pathTuple) > 2 else False # endpoint + (cluster) event / endpoint + cluster - endpoint = pathTuple[0] if issubclass(pathTuple[1], ClusterObjects.Cluster): - cluster = pathTuple[1] + return ClusterAttribute.EventPath.from_cluster(EndpointId=pathTuple[0], Cluster=pathTuple[1], Urgent=urgent) elif issubclass(pathTuple[1], ClusterAttribute.ClusterEvent): - event = pathTuple[1] + return ClusterAttribute.EventPath.from_event(EndpointId=pathTuple[0], Event=pathTuple[1], Urgent=urgent) else: raise ValueError("Unsupported Attribute Path") - urgent = bool(pathTuple[-1]) if len(pathTuple) > 2 else False - return ClusterAttribute.EventPath( - EndpointId=endpoint, Cluster=cluster, Event=event, Urgent=urgent) async def Read(self, nodeid: int, attributes: typing.List[typing.Union[ None, # Empty tuple, all wildcard @@ -1530,7 +1515,7 @@ def ZCLReadAttribute(self, cluster, attribute, nodeid, endpoint, groupid, blocki result = asyncio.run(self.ReadAttribute( nodeid, [(endpoint, attributeType)])) - path = ClusterAttribute.AttributePath( + path = ClusterAttribute.AttributePath.from_attribute( EndpointId=endpoint, Attribute=attributeType) return im.AttributeReadResult(path=im.AttributePath(nodeId=nodeid, endpointId=path.EndpointId, clusterId=path.ClusterId, attributeId=path.AttributeId), status=0, value=result[endpoint][clusterType][attributeType], dataVersion=result[endpoint][clusterType][ClusterAttribute.DataVersion]) diff --git a/src/controller/python/chip/clusters/Attribute.py b/src/controller/python/chip/clusters/Attribute.py index 9e46eed469d39f..a1a341ed46614a 100644 --- a/src/controller/python/chip/clusters/Attribute.py +++ b/src/controller/python/chip/clusters/Attribute.py @@ -54,62 +54,43 @@ class EventPriority(Enum): CRITICAL = 2 -@dataclass +@dataclass(frozen=True) class AttributePath: EndpointId: int = None ClusterId: int = None AttributeId: int = None - def __init__(self, EndpointId: int = None, Cluster=None, Attribute=None, ClusterId=None, AttributeId=None): - self.EndpointId = EndpointId - if Cluster is not None: - # Wildcard read for a specific cluster - if (Attribute is not None) or (ClusterId is not None) or (AttributeId is not None): - raise Warning( - "Attribute, ClusterId and AttributeId is ignored when Cluster is specified") - self.ClusterId = Cluster.id - return - if Attribute is not None: - if (ClusterId is not None) or (AttributeId is not None): - raise Warning( - "ClusterId and AttributeId is ignored when Attribute is specified") - self.ClusterId = Attribute.cluster_id - self.AttributeId = Attribute.attribute_id - return - self.ClusterId = ClusterId - self.AttributeId = AttributeId + @staticmethod + def from_cluster(EndpointId: int, Cluster: Cluster) -> AttributePath: + if Cluster is None: + raise ValueError("Cluster cannot be None") + return AttributePath(EndpointId=EndpointId, ClusterId=Cluster.id) + + @staticmethod + def from_attribute(EndpointId: int, Attribute: ClusterAttributeDescriptor) -> AttributePath: + if Attribute is None: + raise ValueError("Attribute cannot be None") + return AttributePath(EndpointId=EndpointId, ClusterId=Attribute.cluster_id, AttributeId=Attribute.attribute_id) def __str__(self) -> str: return f"{self.EndpointId}/{self.ClusterId}/{self.AttributeId}" - def __hash__(self): - return str(self).__hash__() - -@dataclass +@dataclass(frozen=True) class DataVersionFilter: EndpointId: int = None ClusterId: int = None DataVersion: int = None - def __init__(self, EndpointId: int = None, Cluster=None, ClusterId=None, DataVersion=None): - self.EndpointId = EndpointId - if Cluster is not None: - # Wildcard read for a specific cluster - if (ClusterId is not None): - raise Warning( - "Attribute, ClusterId and AttributeId is ignored when Cluster is specified") - self.ClusterId = Cluster.id - else: - self.ClusterId = ClusterId - self.DataVersion = DataVersion + @staticmethod + def from_cluster(EndpointId: int, Cluster: Cluster, DataVersion: int = None) -> AttributePath: + if Cluster is None: + raise ValueError("Cluster cannot be None") + return DataVersionFilter(EndpointId=EndpointId, ClusterId=Cluster.id, DataVersion=DataVersion) def __str__(self) -> str: return f"{self.EndpointId}/{self.ClusterId}/{self.DataVersion}" - def __hash__(self): - return str(self).__hash__() - @dataclass class TypedAttributePath: @@ -165,44 +146,28 @@ def __init__(self, ClusterType: Cluster = None, AttributeType: ClusterAttributeD self.AttributeId = self.AttributeType.attribute_id -@dataclass +@dataclass(frozen=True) class EventPath: EndpointId: int = None ClusterId: int = None EventId: int = None Urgent: int = None - def __init__(self, EndpointId: int = None, Cluster=None, Event=None, ClusterId=None, EventId=None, Urgent=None): - self.EndpointId = EndpointId - self.Urgent = Urgent - if Cluster is not None: - # Wildcard read for a specific cluster - if (Event is not None) or (ClusterId is not None) or (EventId is not None): - raise Warning( - "Event, ClusterId and AttributeId is ignored when Cluster is specified") - self.ClusterId = Cluster.id - return - if Event is not None: - if (ClusterId is not None) or (EventId is not None): - raise Warning( - "ClusterId and EventId is ignored when Event is specified") - self.ClusterId = Event.cluster_id - self.EventId = Event.event_id - return - self.ClusterId = ClusterId - self.EventId = EventId + @staticmethod + def from_cluster(EndpointId: int, Cluster: Cluster, EventId: int = None, Urgent: int = None) -> "EventPath": + if Cluster is None: + raise ValueError("Cluster cannot be None") + return EventPath(EndpointId=EndpointId, ClusterId=Cluster.id, EventId=EventId, Urgent=Urgent) + + @staticmethod + def from_event(EndpointId: int, Event: ClusterEvent, Urgent: int = None) -> "EventPath": + if Event is None: + raise ValueError("Event cannot be None") + return EventPath(EndpointId=EndpointId, ClusterId=Event.cluster_id, EventId=Event.event_id, Urgent=Urgent) def __str__(self) -> str: return f"{self.EndpointId}/{self.ClusterId}/{self.EventId}/{self.Urgent}" - def __hash__(self): - return str(self).__hash__() - - -@dataclass -class AttributePathWithListIndex(AttributePath): - ListIndex: int = None - @dataclass class EventHeader: @@ -688,7 +653,7 @@ def SetClientObjPointers(self, pReadClient, pReadCallback): def GetAllEventValues(self): return self._events - def handleAttributeData(self, path: AttributePathWithListIndex, dataVersion: int, status: int, data: bytes): + def handleAttributeData(self, path: AttributePath, dataVersion: int, status: int, data: bytes): try: imStatus = chip.interaction_model.Status(status) diff --git a/src/controller/python/test/test_scripts/cluster_objects.py b/src/controller/python/test/test_scripts/cluster_objects.py index 53516c1dc75e9b..37f6819cbe66a4 100644 --- a/src/controller/python/test/test_scripts/cluster_objects.py +++ b/src/controller/python/test/test_scripts/cluster_objects.py @@ -164,7 +164,7 @@ async def TestWriteRequest(cls, devCtrl): ] ) expectedRes = [ - AttributeStatus(Path=AttributePath( + AttributeStatus(Path=AttributePath.from_attribute( EndpointId=1, Attribute=Clusters.UnitTesting.Attributes.ListLongOctetString), Status=chip.interaction_model.Status.Success), ]