Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Python] Make AttributePath more pythonic #33571

Merged
merged 5 commits into from
May 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 16 additions & 31 deletions src/controller/python/chip/ChipDeviceCtrl.py
Original file line number Diff line number Diff line change
Expand Up @@ -1170,33 +1170,26 @@ def _parseAttributePathTuple(self, pathTuple: typing.Union[
# Concrete path
typing.Tuple[int, typing.Type[ClusterObjects.ClusterAttributeDescriptor]]
]):
endpoint = None
cluster = None
attribute = None

if pathTuple == ('*') or pathTuple == ():
# Wildcard
pass
return ClusterAttribute.AttributePath()
elif not isinstance(pathTuple, tuple):
if isinstance(pathTuple, int):
endpoint = pathTuple
return ClusterAttribute.AttributePath(EndpointId=pathTuple)
elif issubclass(pathTuple, ClusterObjects.Cluster):
cluster = pathTuple
return ClusterAttribute.AttributePath.from_cluster(EndpointId=None, Cluster=pathTuple)
elif issubclass(pathTuple, ClusterObjects.ClusterAttributeDescriptor):
attribute = pathTuple
return ClusterAttribute.AttributePath.from_attribute(EndpointId=None, Attribute=pathTuple)
else:
raise ValueError("Unsupported Attribute Path")
else:
# endpoint + (cluster) attribute / endpoint + cluster
endpoint = pathTuple[0]
if issubclass(pathTuple[1], ClusterObjects.Cluster):
cluster = pathTuple[1]
return ClusterAttribute.AttributePath.from_cluster(EndpointId=pathTuple[0], Cluster=pathTuple[1])
elif issubclass(pathTuple[1], ClusterAttribute.ClusterAttributeDescriptor):
attribute = pathTuple[1]
return ClusterAttribute.AttributePath.from_attribute(EndpointId=pathTuple[0], Attribute=pathTuple[1])
else:
raise ValueError("Unsupported Attribute Path")
return ClusterAttribute.AttributePath(
EndpointId=endpoint, Cluster=cluster, Attribute=attribute)

def _parseDataVersionFilterTuple(self, pathTuple: typing.List[typing.Tuple[int, typing.Type[ClusterObjects.Cluster], int]]):
endpoint = None
Expand All @@ -1209,7 +1202,7 @@ def _parseDataVersionFilterTuple(self, pathTuple: typing.List[typing.Tuple[int,
else:
raise ValueError("Unsupported Cluster Path")
dataVersion = pathTuple[2]
return ClusterAttribute.DataVersionFilter(
return ClusterAttribute.DataVersionFilter.from_cluster(
EndpointId=endpoint, Cluster=cluster, DataVersion=dataVersion)

def _parseEventPathTuple(self, pathTuple: typing.Union[
Expand All @@ -1226,39 +1219,31 @@ def _parseEventPathTuple(self, pathTuple: typing.Union[
typing.Tuple[int,
typing.Type[ClusterObjects.ClusterEvent], int]
]):
endpoint = None
cluster = None
event = None
urgent = False
if pathTuple in [('*'), ()]:
# Wildcard
pass
return ClusterAttribute.EventPath()
elif not isinstance(pathTuple, tuple):
logging.debug(type(pathTuple))
if isinstance(pathTuple, int):
endpoint = pathTuple
return ClusterAttribute.EventPath(EndpointId=pathTuple)
elif issubclass(pathTuple, ClusterObjects.Cluster):
cluster = pathTuple
return ClusterAttribute.EventPath.from_cluster(EndpointId=None, Cluster=pathTuple)
elif issubclass(pathTuple, ClusterObjects.ClusterEvent):
event = pathTuple
return ClusterAttribute.EventPath.from_event(EndpointId=None, Event=pathTuple)
else:
raise ValueError("Unsupported Event Path")
else:
if pathTuple[0] == '*':
urgent = pathTuple[-1]
pass
return ClusterAttribute.EventPath(Urgent=pathTuple[-1])
else:
urgent = bool(pathTuple[-1]) if len(pathTuple) > 2 else False
# endpoint + (cluster) event / endpoint + cluster
endpoint = pathTuple[0]
if issubclass(pathTuple[1], ClusterObjects.Cluster):
cluster = pathTuple[1]
return ClusterAttribute.EventPath.from_cluster(EndpointId=pathTuple[0], Cluster=pathTuple[1], Urgent=urgent)
elif issubclass(pathTuple[1], ClusterAttribute.ClusterEvent):
event = pathTuple[1]
return ClusterAttribute.EventPath.from_event(EndpointId=pathTuple[0], Event=pathTuple[1], Urgent=urgent)
else:
raise ValueError("Unsupported Attribute Path")
urgent = bool(pathTuple[-1]) if len(pathTuple) > 2 else False
return ClusterAttribute.EventPath(
EndpointId=endpoint, Cluster=cluster, Event=event, Urgent=urgent)

async def Read(self, nodeid: int, attributes: typing.List[typing.Union[
None, # Empty tuple, all wildcard
Expand Down Expand Up @@ -1530,7 +1515,7 @@ def ZCLReadAttribute(self, cluster, attribute, nodeid, endpoint, groupid, blocki

result = asyncio.run(self.ReadAttribute(
nodeid, [(endpoint, attributeType)]))
path = ClusterAttribute.AttributePath(
path = ClusterAttribute.AttributePath.from_attribute(
EndpointId=endpoint, Attribute=attributeType)
return im.AttributeReadResult(path=im.AttributePath(nodeId=nodeid, endpointId=path.EndpointId, clusterId=path.ClusterId, attributeId=path.AttributeId),
status=0, value=result[endpoint][clusterType][attributeType], dataVersion=result[endpoint][clusterType][ClusterAttribute.DataVersion])
Expand Down
97 changes: 31 additions & 66 deletions src/controller/python/chip/clusters/Attribute.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,62 +54,43 @@ class EventPriority(Enum):
CRITICAL = 2


@dataclass
@dataclass(frozen=True)
class AttributePath:
EndpointId: int = None
ClusterId: int = None
AttributeId: int = None

def __init__(self, EndpointId: int = None, Cluster=None, Attribute=None, ClusterId=None, AttributeId=None):
self.EndpointId = EndpointId
if Cluster is not None:
# Wildcard read for a specific cluster
if (Attribute is not None) or (ClusterId is not None) or (AttributeId is not None):
raise Warning(
"Attribute, ClusterId and AttributeId is ignored when Cluster is specified")
self.ClusterId = Cluster.id
return
if Attribute is not None:
if (ClusterId is not None) or (AttributeId is not None):
raise Warning(
"ClusterId and AttributeId is ignored when Attribute is specified")
self.ClusterId = Attribute.cluster_id
self.AttributeId = Attribute.attribute_id
return
self.ClusterId = ClusterId
self.AttributeId = AttributeId
@staticmethod
def from_cluster(EndpointId: int, Cluster: Cluster) -> AttributePath:
if Cluster is None:
raise ValueError("Cluster cannot be None")
return AttributePath(EndpointId=EndpointId, ClusterId=Cluster.id)

@staticmethod
def from_attribute(EndpointId: int, Attribute: ClusterAttributeDescriptor) -> AttributePath:
if Attribute is None:
raise ValueError("Attribute cannot be None")
return AttributePath(EndpointId=EndpointId, ClusterId=Attribute.cluster_id, AttributeId=Attribute.attribute_id)

def __str__(self) -> str:
return f"{self.EndpointId}/{self.ClusterId}/{self.AttributeId}"

def __hash__(self):
return str(self).__hash__()


@dataclass
@dataclass(frozen=True)
class DataVersionFilter:
EndpointId: int = None
ClusterId: int = None
DataVersion: int = None

def __init__(self, EndpointId: int = None, Cluster=None, ClusterId=None, DataVersion=None):
self.EndpointId = EndpointId
if Cluster is not None:
# Wildcard read for a specific cluster
if (ClusterId is not None):
raise Warning(
"Attribute, ClusterId and AttributeId is ignored when Cluster is specified")
self.ClusterId = Cluster.id
else:
self.ClusterId = ClusterId
self.DataVersion = DataVersion
@staticmethod
def from_cluster(EndpointId: int, Cluster: Cluster, DataVersion: int = None) -> AttributePath:
if Cluster is None:
raise ValueError("Cluster cannot be None")
return DataVersionFilter(EndpointId=EndpointId, ClusterId=Cluster.id, DataVersion=DataVersion)

def __str__(self) -> str:
return f"{self.EndpointId}/{self.ClusterId}/{self.DataVersion}"

def __hash__(self):
return str(self).__hash__()


@dataclass
class TypedAttributePath:
Expand Down Expand Up @@ -165,44 +146,28 @@ def __init__(self, ClusterType: Cluster = None, AttributeType: ClusterAttributeD
self.AttributeId = self.AttributeType.attribute_id


@dataclass
@dataclass(frozen=True)
class EventPath:
EndpointId: int = None
ClusterId: int = None
EventId: int = None
Urgent: int = None

def __init__(self, EndpointId: int = None, Cluster=None, Event=None, ClusterId=None, EventId=None, Urgent=None):
self.EndpointId = EndpointId
self.Urgent = Urgent
if Cluster is not None:
# Wildcard read for a specific cluster
if (Event is not None) or (ClusterId is not None) or (EventId is not None):
raise Warning(
"Event, ClusterId and AttributeId is ignored when Cluster is specified")
self.ClusterId = Cluster.id
return
if Event is not None:
if (ClusterId is not None) or (EventId is not None):
raise Warning(
"ClusterId and EventId is ignored when Event is specified")
self.ClusterId = Event.cluster_id
self.EventId = Event.event_id
return
self.ClusterId = ClusterId
self.EventId = EventId
@staticmethod
def from_cluster(EndpointId: int, Cluster: Cluster, EventId: int = None, Urgent: int = None) -> "EventPath":
if Cluster is None:
raise ValueError("Cluster cannot be None")
return EventPath(EndpointId=EndpointId, ClusterId=Cluster.id, EventId=EventId, Urgent=Urgent)

@staticmethod
def from_event(EndpointId: int, Event: ClusterEvent, Urgent: int = None) -> "EventPath":
if Event is None:
raise ValueError("Event cannot be None")
return EventPath(EndpointId=EndpointId, ClusterId=Event.cluster_id, EventId=Event.event_id, Urgent=Urgent)

def __str__(self) -> str:
return f"{self.EndpointId}/{self.ClusterId}/{self.EventId}/{self.Urgent}"

def __hash__(self):
return str(self).__hash__()


@dataclass
class AttributePathWithListIndex(AttributePath):
ListIndex: int = None


@dataclass
class EventHeader:
Expand Down Expand Up @@ -688,7 +653,7 @@ def SetClientObjPointers(self, pReadClient, pReadCallback):
def GetAllEventValues(self):
return self._events

def handleAttributeData(self, path: AttributePathWithListIndex, dataVersion: int, status: int, data: bytes):
def handleAttributeData(self, path: AttributePath, dataVersion: int, status: int, data: bytes):
try:
imStatus = chip.interaction_model.Status(status)

Expand Down
2 changes: 1 addition & 1 deletion src/controller/python/test/test_scripts/cluster_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ async def TestWriteRequest(cls, devCtrl):
]
)
expectedRes = [
AttributeStatus(Path=AttributePath(
AttributeStatus(Path=AttributePath.from_attribute(
EndpointId=1,
Attribute=Clusters.UnitTesting.Attributes.ListLongOctetString), Status=chip.interaction_model.Status.Success),
]
Expand Down
Loading