Skip to content

Commit

Permalink
[Python] Make AttributePath more pythonic (#33571)
Browse files Browse the repository at this point in the history
* [Python] Make AttributePath more pythonic

Use dataclass default initializer to initialize AttributePath. Use
static method to initialize from Cluster or Attribute.

Also hash the integer fields directly, this is more efficient than
formatting a string first.

* Drop AttributePathWithListIndex

Drop AttributePathWithListIndex as it is unused.

* Make DataVersionFilter/EventPath pythonic as well

Use frozen data classes and static initializers similar to
AttributePath.

* Fix _parseEventPathTuple

* Fix _parseDataVersionFilterTuple
  • Loading branch information
agners authored May 24, 2024
1 parent 638aa28 commit 4d4cb5e
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 98 deletions.
47 changes: 16 additions & 31 deletions src/controller/python/chip/ChipDeviceCtrl.py
Original file line number Diff line number Diff line change
Expand Up @@ -1170,33 +1170,26 @@ def _parseAttributePathTuple(self, pathTuple: typing.Union[
# Concrete path
typing.Tuple[int, typing.Type[ClusterObjects.ClusterAttributeDescriptor]]
]):
endpoint = None
cluster = None
attribute = None

if pathTuple == ('*') or pathTuple == ():
# Wildcard
pass
return ClusterAttribute.AttributePath()
elif not isinstance(pathTuple, tuple):
if isinstance(pathTuple, int):
endpoint = pathTuple
return ClusterAttribute.AttributePath(EndpointId=pathTuple)
elif issubclass(pathTuple, ClusterObjects.Cluster):
cluster = pathTuple
return ClusterAttribute.AttributePath.from_cluster(EndpointId=None, Cluster=pathTuple)
elif issubclass(pathTuple, ClusterObjects.ClusterAttributeDescriptor):
attribute = pathTuple
return ClusterAttribute.AttributePath.from_attribute(EndpointId=None, Attribute=pathTuple)
else:
raise ValueError("Unsupported Attribute Path")
else:
# endpoint + (cluster) attribute / endpoint + cluster
endpoint = pathTuple[0]
if issubclass(pathTuple[1], ClusterObjects.Cluster):
cluster = pathTuple[1]
return ClusterAttribute.AttributePath.from_cluster(EndpointId=pathTuple[0], Cluster=pathTuple[1])
elif issubclass(pathTuple[1], ClusterAttribute.ClusterAttributeDescriptor):
attribute = pathTuple[1]
return ClusterAttribute.AttributePath.from_attribute(EndpointId=pathTuple[0], Attribute=pathTuple[1])
else:
raise ValueError("Unsupported Attribute Path")
return ClusterAttribute.AttributePath(
EndpointId=endpoint, Cluster=cluster, Attribute=attribute)

def _parseDataVersionFilterTuple(self, pathTuple: typing.List[typing.Tuple[int, typing.Type[ClusterObjects.Cluster], int]]):
endpoint = None
Expand All @@ -1209,7 +1202,7 @@ def _parseDataVersionFilterTuple(self, pathTuple: typing.List[typing.Tuple[int,
else:
raise ValueError("Unsupported Cluster Path")
dataVersion = pathTuple[2]
return ClusterAttribute.DataVersionFilter(
return ClusterAttribute.DataVersionFilter.from_cluster(
EndpointId=endpoint, Cluster=cluster, DataVersion=dataVersion)

def _parseEventPathTuple(self, pathTuple: typing.Union[
Expand All @@ -1226,39 +1219,31 @@ def _parseEventPathTuple(self, pathTuple: typing.Union[
typing.Tuple[int,
typing.Type[ClusterObjects.ClusterEvent], int]
]):
endpoint = None
cluster = None
event = None
urgent = False
if pathTuple in [('*'), ()]:
# Wildcard
pass
return ClusterAttribute.EventPath()
elif not isinstance(pathTuple, tuple):
logging.debug(type(pathTuple))
if isinstance(pathTuple, int):
endpoint = pathTuple
return ClusterAttribute.EventPath(EndpointId=pathTuple)
elif issubclass(pathTuple, ClusterObjects.Cluster):
cluster = pathTuple
return ClusterAttribute.EventPath.from_cluster(EndpointId=None, Cluster=pathTuple)
elif issubclass(pathTuple, ClusterObjects.ClusterEvent):
event = pathTuple
return ClusterAttribute.EventPath.from_event(EndpointId=None, Event=pathTuple)
else:
raise ValueError("Unsupported Event Path")
else:
if pathTuple[0] == '*':
urgent = pathTuple[-1]
pass
return ClusterAttribute.EventPath(Urgent=pathTuple[-1])
else:
urgent = bool(pathTuple[-1]) if len(pathTuple) > 2 else False
# endpoint + (cluster) event / endpoint + cluster
endpoint = pathTuple[0]
if issubclass(pathTuple[1], ClusterObjects.Cluster):
cluster = pathTuple[1]
return ClusterAttribute.EventPath.from_cluster(EndpointId=pathTuple[0], Cluster=pathTuple[1], Urgent=urgent)
elif issubclass(pathTuple[1], ClusterAttribute.ClusterEvent):
event = pathTuple[1]
return ClusterAttribute.EventPath.from_event(EndpointId=pathTuple[0], Event=pathTuple[1], Urgent=urgent)
else:
raise ValueError("Unsupported Attribute Path")
urgent = bool(pathTuple[-1]) if len(pathTuple) > 2 else False
return ClusterAttribute.EventPath(
EndpointId=endpoint, Cluster=cluster, Event=event, Urgent=urgent)

async def Read(self, nodeid: int, attributes: typing.List[typing.Union[
None, # Empty tuple, all wildcard
Expand Down Expand Up @@ -1530,7 +1515,7 @@ def ZCLReadAttribute(self, cluster, attribute, nodeid, endpoint, groupid, blocki

result = asyncio.run(self.ReadAttribute(
nodeid, [(endpoint, attributeType)]))
path = ClusterAttribute.AttributePath(
path = ClusterAttribute.AttributePath.from_attribute(
EndpointId=endpoint, Attribute=attributeType)
return im.AttributeReadResult(path=im.AttributePath(nodeId=nodeid, endpointId=path.EndpointId, clusterId=path.ClusterId, attributeId=path.AttributeId),
status=0, value=result[endpoint][clusterType][attributeType], dataVersion=result[endpoint][clusterType][ClusterAttribute.DataVersion])
Expand Down
97 changes: 31 additions & 66 deletions src/controller/python/chip/clusters/Attribute.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,62 +54,43 @@ class EventPriority(Enum):
CRITICAL = 2


@dataclass
@dataclass(frozen=True)
class AttributePath:
EndpointId: int = None
ClusterId: int = None
AttributeId: int = None

def __init__(self, EndpointId: int = None, Cluster=None, Attribute=None, ClusterId=None, AttributeId=None):
self.EndpointId = EndpointId
if Cluster is not None:
# Wildcard read for a specific cluster
if (Attribute is not None) or (ClusterId is not None) or (AttributeId is not None):
raise Warning(
"Attribute, ClusterId and AttributeId is ignored when Cluster is specified")
self.ClusterId = Cluster.id
return
if Attribute is not None:
if (ClusterId is not None) or (AttributeId is not None):
raise Warning(
"ClusterId and AttributeId is ignored when Attribute is specified")
self.ClusterId = Attribute.cluster_id
self.AttributeId = Attribute.attribute_id
return
self.ClusterId = ClusterId
self.AttributeId = AttributeId
@staticmethod
def from_cluster(EndpointId: int, Cluster: Cluster) -> AttributePath:
if Cluster is None:
raise ValueError("Cluster cannot be None")
return AttributePath(EndpointId=EndpointId, ClusterId=Cluster.id)

@staticmethod
def from_attribute(EndpointId: int, Attribute: ClusterAttributeDescriptor) -> AttributePath:
if Attribute is None:
raise ValueError("Attribute cannot be None")
return AttributePath(EndpointId=EndpointId, ClusterId=Attribute.cluster_id, AttributeId=Attribute.attribute_id)

def __str__(self) -> str:
return f"{self.EndpointId}/{self.ClusterId}/{self.AttributeId}"

def __hash__(self):
return str(self).__hash__()


@dataclass
@dataclass(frozen=True)
class DataVersionFilter:
EndpointId: int = None
ClusterId: int = None
DataVersion: int = None

def __init__(self, EndpointId: int = None, Cluster=None, ClusterId=None, DataVersion=None):
self.EndpointId = EndpointId
if Cluster is not None:
# Wildcard read for a specific cluster
if (ClusterId is not None):
raise Warning(
"Attribute, ClusterId and AttributeId is ignored when Cluster is specified")
self.ClusterId = Cluster.id
else:
self.ClusterId = ClusterId
self.DataVersion = DataVersion
@staticmethod
def from_cluster(EndpointId: int, Cluster: Cluster, DataVersion: int = None) -> AttributePath:
if Cluster is None:
raise ValueError("Cluster cannot be None")
return DataVersionFilter(EndpointId=EndpointId, ClusterId=Cluster.id, DataVersion=DataVersion)

def __str__(self) -> str:
return f"{self.EndpointId}/{self.ClusterId}/{self.DataVersion}"

def __hash__(self):
return str(self).__hash__()


@dataclass
class TypedAttributePath:
Expand Down Expand Up @@ -165,44 +146,28 @@ def __init__(self, ClusterType: Cluster = None, AttributeType: ClusterAttributeD
self.AttributeId = self.AttributeType.attribute_id


@dataclass
@dataclass(frozen=True)
class EventPath:
EndpointId: int = None
ClusterId: int = None
EventId: int = None
Urgent: int = None

def __init__(self, EndpointId: int = None, Cluster=None, Event=None, ClusterId=None, EventId=None, Urgent=None):
self.EndpointId = EndpointId
self.Urgent = Urgent
if Cluster is not None:
# Wildcard read for a specific cluster
if (Event is not None) or (ClusterId is not None) or (EventId is not None):
raise Warning(
"Event, ClusterId and AttributeId is ignored when Cluster is specified")
self.ClusterId = Cluster.id
return
if Event is not None:
if (ClusterId is not None) or (EventId is not None):
raise Warning(
"ClusterId and EventId is ignored when Event is specified")
self.ClusterId = Event.cluster_id
self.EventId = Event.event_id
return
self.ClusterId = ClusterId
self.EventId = EventId
@staticmethod
def from_cluster(EndpointId: int, Cluster: Cluster, EventId: int = None, Urgent: int = None) -> "EventPath":
if Cluster is None:
raise ValueError("Cluster cannot be None")
return EventPath(EndpointId=EndpointId, ClusterId=Cluster.id, EventId=EventId, Urgent=Urgent)

@staticmethod
def from_event(EndpointId: int, Event: ClusterEvent, Urgent: int = None) -> "EventPath":
if Event is None:
raise ValueError("Event cannot be None")
return EventPath(EndpointId=EndpointId, ClusterId=Event.cluster_id, EventId=Event.event_id, Urgent=Urgent)

def __str__(self) -> str:
return f"{self.EndpointId}/{self.ClusterId}/{self.EventId}/{self.Urgent}"

def __hash__(self):
return str(self).__hash__()


@dataclass
class AttributePathWithListIndex(AttributePath):
ListIndex: int = None


@dataclass
class EventHeader:
Expand Down Expand Up @@ -688,7 +653,7 @@ def SetClientObjPointers(self, pReadClient, pReadCallback):
def GetAllEventValues(self):
return self._events

def handleAttributeData(self, path: AttributePathWithListIndex, dataVersion: int, status: int, data: bytes):
def handleAttributeData(self, path: AttributePath, dataVersion: int, status: int, data: bytes):
try:
imStatus = chip.interaction_model.Status(status)

Expand Down
2 changes: 1 addition & 1 deletion src/controller/python/test/test_scripts/cluster_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ async def TestWriteRequest(cls, devCtrl):
]
)
expectedRes = [
AttributeStatus(Path=AttributePath(
AttributeStatus(Path=AttributePath.from_attribute(
EndpointId=1,
Attribute=Clusters.UnitTesting.Attributes.ListLongOctetString), Status=chip.interaction_model.Status.Success),
]
Expand Down

0 comments on commit 4d4cb5e

Please sign in to comment.