Skip to content

Commit

Permalink
[Python] Process attribute cache updates in Python thread (#35557)
Browse files Browse the repository at this point in the history
* [Python] Process attribute cache updates in Python thread

Instead of processing attribute updates in the SDK thread, process
them on request in the Python thread. This avoids acks being sent back
to the device too late after the last DataReport when many attribute
updates are sent at once.

Currently the same data model and processing are still used. There is
certainly room for optimization to make this more efficient.

* Get updated attribute values

Make sure to get the attribute values again after each command to get
the updated attribute cache.

* Reference ReadEvent/ReadAttribute APIs on dev controller object
  • Loading branch information
agners authored and pull[bot] committed Nov 28, 2024
1 parent 1565e95 commit 905b2fd
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 101 deletions.
4 changes: 2 additions & 2 deletions docs/guides/repl/Matter_Basic_Interactions.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -3504,7 +3504,7 @@
"source": [
"#### Read Events:\n",
"\n",
"A `ReadEvents` API exists that behaves similarly to the `ReadAttributes` API. It permits the same degrees of wildcard expression as its counterpart and follows the same format for expressing all wildcard permutations."
"A `ReadEvent` API exists that behaves similarly to the `ReadAttribute` API. It permits the same degrees of wildcard expression as its counterpart and follows the same format for expressing all wildcard permutations."
]
},
{
Expand Down Expand Up @@ -3609,7 +3609,7 @@
"source": [
"### Subscription Interaction\n",
"\n",
"To subscribe to a Node, the same `ReadAttributes` API is used to trigger a subscription, with a valid `reportInterval` tuple passed in being used as a way to indicate the request to create a subscription."
"To subscribe to a Node, the same `ReadAttribute` API is used to trigger a subscription, with a valid `reportInterval` tuple passed in being used as a way to indicate the request to create a subscription."
]
},
{
Expand Down
132 changes: 75 additions & 57 deletions src/controller/python/chip/ChipDeviceCtrl.py
Original file line number Diff line number Diff line change
Expand Up @@ -1433,20 +1433,23 @@ def _parseEventPathTuple(self, pathTuple: typing.Union[
else:
raise ValueError("Unsupported Attribute Path")

async def Read(self, nodeid: int, attributes: typing.Optional[typing.List[typing.Union[
None, # Empty tuple, all wildcard
typing.Tuple[int], # Endpoint
# Wildcard endpoint, Cluster id present
typing.Tuple[typing.Type[ClusterObjects.Cluster]],
# Wildcard endpoint, Cluster + Attribute present
typing.Tuple[typing.Type[ClusterObjects.ClusterAttributeDescriptor]],
# Wildcard attribute id
typing.Tuple[int, typing.Type[ClusterObjects.Cluster]],
# Concrete path
typing.Tuple[int, typing.Type[ClusterObjects.ClusterAttributeDescriptor]],
# Directly specified attribute path
ClusterAttribute.AttributePath
]]] = None,
async def Read(
self,
nodeid: int,
attributes: typing.Optional[typing.List[typing.Union[
None, # Empty tuple, all wildcard
typing.Tuple[int], # Endpoint
# Wildcard endpoint, Cluster id present
typing.Tuple[typing.Type[ClusterObjects.Cluster]],
# Wildcard endpoint, Cluster + Attribute present
typing.Tuple[typing.Type[ClusterObjects.ClusterAttributeDescriptor]],
# Wildcard attribute id
typing.Tuple[int, typing.Type[ClusterObjects.Cluster]],
# Concrete path
typing.Tuple[int, typing.Type[ClusterObjects.ClusterAttributeDescriptor]],
# Directly specified attribute path
ClusterAttribute.AttributePath
]]] = None,
dataVersionFilters: typing.Optional[typing.List[typing.Tuple[int, typing.Type[ClusterObjects.Cluster], int]]] = None, events: typing.Optional[typing.List[
typing.Union[
None, # Empty tuple, all wildcard
Expand All @@ -1461,10 +1464,11 @@ async def Read(self, nodeid: int, attributes: typing.Optional[typing.List[typing
# Concrete path
typing.Tuple[int, typing.Type[ClusterObjects.ClusterEvent], int]
]]] = None,
eventNumberFilter: typing.Optional[int] = None,
returnClusterObject: bool = False, reportInterval: typing.Optional[typing.Tuple[int, int]] = None,
fabricFiltered: bool = True, keepSubscriptions: bool = False, autoResubscribe: bool = True,
payloadCapability: int = TransportPayloadCapability.MRP_PAYLOAD):
eventNumberFilter: typing.Optional[int] = None,
returnClusterObject: bool = False, reportInterval: typing.Optional[typing.Tuple[int, int]] = None,
fabricFiltered: bool = True, keepSubscriptions: bool = False, autoResubscribe: bool = True,
payloadCapability: int = TransportPayloadCapability.MRP_PAYLOAD
):
'''
Read a list of attributes and/or events from a target node
Expand Down Expand Up @@ -1534,33 +1538,43 @@ async def Read(self, nodeid: int, attributes: typing.Optional[typing.List[typing
eventPaths = [self._parseEventPathTuple(
v) for v in events] if events else None

ClusterAttribute.Read(future=future, eventLoop=eventLoop, device=device.deviceProxy, devCtrl=self,
transaction = ClusterAttribute.AsyncReadTransaction(future, eventLoop, self, returnClusterObject)
ClusterAttribute.Read(transaction, device=device.deviceProxy,
attributes=attributePaths, dataVersionFilters=clusterDataVersionFilters, events=eventPaths,
eventNumberFilter=eventNumberFilter, returnClusterObject=returnClusterObject,
eventNumberFilter=eventNumberFilter,
subscriptionParameters=ClusterAttribute.SubscriptionParameters(
reportInterval[0], reportInterval[1]) if reportInterval else None,
fabricFiltered=fabricFiltered,
keepSubscriptions=keepSubscriptions, autoResubscribe=autoResubscribe).raise_on_error()
return await future
await future

async def ReadAttribute(self, nodeid: int, attributes: typing.Optional[typing.List[typing.Union[
None, # Empty tuple, all wildcard
typing.Tuple[int], # Endpoint
# Wildcard endpoint, Cluster id present
typing.Tuple[typing.Type[ClusterObjects.Cluster]],
# Wildcard endpoint, Cluster + Attribute present
typing.Tuple[typing.Type[ClusterObjects.ClusterAttributeDescriptor]],
# Wildcard attribute id
typing.Tuple[int, typing.Type[ClusterObjects.Cluster]],
# Concrete path
typing.Tuple[int, typing.Type[ClusterObjects.ClusterAttributeDescriptor]],
# Directly specified attribute path
ClusterAttribute.AttributePath
]]], dataVersionFilters: typing.Optional[typing.List[typing.Tuple[int, typing.Type[ClusterObjects.Cluster], int]]] = None,
returnClusterObject: bool = False,
reportInterval: typing.Optional[typing.Tuple[int, int]] = None,
fabricFiltered: bool = True, keepSubscriptions: bool = False, autoResubscribe: bool = True,
payloadCapability: int = TransportPayloadCapability.MRP_PAYLOAD):
if result := transaction.GetSubscriptionHandler():
return result
else:
return transaction.GetReadResponse()

async def ReadAttribute(
self,
nodeid: int,
attributes: typing.Optional[typing.List[typing.Union[
None, # Empty tuple, all wildcard
typing.Tuple[int], # Endpoint
# Wildcard endpoint, Cluster id present
typing.Tuple[typing.Type[ClusterObjects.Cluster]],
# Wildcard endpoint, Cluster + Attribute present
typing.Tuple[typing.Type[ClusterObjects.ClusterAttributeDescriptor]],
# Wildcard attribute id
typing.Tuple[int, typing.Type[ClusterObjects.Cluster]],
# Concrete path
typing.Tuple[int, typing.Type[ClusterObjects.ClusterAttributeDescriptor]],
# Directly specified attribute path
ClusterAttribute.AttributePath
]]], dataVersionFilters: typing.Optional[typing.List[typing.Tuple[int, typing.Type[ClusterObjects.Cluster], int]]] = None,
returnClusterObject: bool = False,
reportInterval: typing.Optional[typing.Tuple[int, int]] = None,
fabricFiltered: bool = True, keepSubscriptions: bool = False, autoResubscribe: bool = True,
payloadCapability: int = TransportPayloadCapability.MRP_PAYLOAD
):
'''
Read a list of attributes from a target node, this is a wrapper of DeviceController.Read()
Expand Down Expand Up @@ -1629,24 +1643,28 @@ async def ReadAttribute(self, nodeid: int, attributes: typing.Optional[typing.Li
else:
return res.attributes

async def ReadEvent(self, nodeid: int, events: typing.List[typing.Union[
None, # Empty tuple, all wildcard
typing.Tuple[str, int], # all wildcard with urgency set
typing.Tuple[int, int], # Endpoint,
# Wildcard endpoint, Cluster id present
typing.Tuple[typing.Type[ClusterObjects.Cluster], int],
# Wildcard endpoint, Cluster + Event present
typing.Tuple[typing.Type[ClusterObjects.ClusterEvent], int],
# Wildcard event id
typing.Tuple[int, typing.Type[ClusterObjects.Cluster], int],
# Concrete path
typing.Tuple[int, typing.Type[ClusterObjects.ClusterEvent], int]
]], eventNumberFilter: typing.Optional[int] = None,
fabricFiltered: bool = True,
reportInterval: typing.Optional[typing.Tuple[int, int]] = None,
keepSubscriptions: bool = False,
autoResubscribe: bool = True,
payloadCapability: int = TransportPayloadCapability.MRP_PAYLOAD):
async def ReadEvent(
self,
nodeid: int,
events: typing.List[typing.Union[
None, # Empty tuple, all wildcard
typing.Tuple[str, int], # all wildcard with urgency set
typing.Tuple[int, int], # Endpoint,
# Wildcard endpoint, Cluster id present
typing.Tuple[typing.Type[ClusterObjects.Cluster], int],
# Wildcard endpoint, Cluster + Event present
typing.Tuple[typing.Type[ClusterObjects.ClusterEvent], int],
# Wildcard event id
typing.Tuple[int, typing.Type[ClusterObjects.Cluster], int],
# Concrete path
typing.Tuple[int, typing.Type[ClusterObjects.ClusterEvent], int]
]], eventNumberFilter: typing.Optional[int] = None,
fabricFiltered: bool = True,
reportInterval: typing.Optional[typing.Tuple[int, int]] = None,
keepSubscriptions: bool = False,
autoResubscribe: bool = True,
payloadCapability: int = TransportPayloadCapability.MRP_PAYLOAD
):
'''
Read a list of events from a target node, this is a wrapper of DeviceController.Read()
Expand Down
72 changes: 34 additions & 38 deletions src/controller/python/chip/clusters/Attribute.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,14 +314,17 @@ class AttributeCache:
returnClusterObject: bool = False
attributeTLVCache: Dict[int, Dict[int, Dict[int, bytes]]] = field(
default_factory=lambda: {})
attributeCache: Dict[int, List[Cluster]] = field(
default_factory=lambda: {})
versionList: Dict[int, Dict[int, Dict[int, int]]] = field(
default_factory=lambda: {})

_attributeCacheUpdateNeeded: set[AttributePath] = field(
default_factory=lambda: set())
_attributeCache: Dict[int, List[Cluster]] = field(
default_factory=lambda: {})

def UpdateTLV(self, path: AttributePath, dataVersion: int, data: Union[bytes, ValueDecodeFailure]):
''' Store data in TLV since that makes it easiest to eventually convert to either the
cluster or attribute view representations (see below in UpdateCachedData).
cluster or attribute view representations (see below in GetUpdatedAttributeCache()).
'''
if (path.EndpointId not in self.attributeTLVCache):
self.attributeTLVCache[path.EndpointId] = {}
Expand All @@ -344,7 +347,10 @@ def UpdateTLV(self, path: AttributePath, dataVersion: int, data: Union[bytes, V

clusterCache[path.AttributeId] = data

def UpdateCachedData(self, changedPathSet: set[AttributePath]):
# For this path the attribute cache still requires an update.
self._attributeCacheUpdateNeeded.add(path)

def GetUpdatedAttributeCache(self) -> Dict[int, List[Cluster]]:
''' This converts the raw TLV data into a cluster object format.
Two formats are available:
Expand Down Expand Up @@ -381,12 +387,12 @@ def handle_attribute_view(endpointId, clusterId, attributeId, attributeType):
except Exception as ex:
return ValueDecodeFailure(value, ex)

for attributePath in changedPathSet:
for attributePath in self._attributeCacheUpdateNeeded:
endpointId, clusterId, attributeId = attributePath.EndpointId, attributePath.ClusterId, attributePath.AttributeId

if endpointId not in self.attributeCache:
self.attributeCache[endpointId] = {}
endpointCache = self.attributeCache[endpointId]
if endpointId not in self._attributeCache:
self._attributeCache[endpointId] = {}
endpointCache = self._attributeCache[endpointId]

if clusterId not in _ClusterIndex:
#
Expand Down Expand Up @@ -414,6 +420,8 @@ def handle_attribute_view(endpointId, clusterId, attributeId, attributeType):

attributeType = _AttributeIndex[(clusterId, attributeId)][0]
clusterCache[attributeType] = handle_attribute_view(endpointId, clusterId, attributeId, attributeType)
self._attributeCacheUpdateNeeded.clear()
return self._attributeCache


class SubscriptionTransaction:
Expand All @@ -434,12 +442,12 @@ def __init__(self, transaction: AsyncReadTransaction, subscriptionId, devCtrl):
def GetAttributes(self):
''' Returns the attribute value cache tracking the latest state on the publisher.
'''
return self._readTransaction._cache.attributeCache
return self._readTransaction._cache.GetUpdatedAttributeCache()

def GetAttribute(self, path: TypedAttributePath) -> Any:
''' Returns a specific attribute given a TypedAttributePath.
'''
data = self._readTransaction._cache.attributeCache
data = self._readTransaction._cache.GetUpdatedAttributeCache()

if (self._readTransaction._cache.returnClusterObject):
return eval(f'data[path.Path.EndpointId][path.ClusterType].{path.AttributeName}')
Expand Down Expand Up @@ -650,6 +658,18 @@ def SetClientObjPointers(self, pReadClient, pReadCallback):
def GetAllEventValues(self):
return self._events

def GetReadResponse(self) -> AsyncReadTransaction.ReadResponse:
    """Assemble a ReadResponse snapshot of this transaction's current state.

    Bundles the decoded attribute cache (materialized on demand), the
    collected events, and the raw TLV attribute data into one object.
    """
    cache = self._cache
    decoded_attributes = cache.GetUpdatedAttributeCache()
    return self.ReadResponse(
        attributes=decoded_attributes,
        events=self._events,
        tlvAttributes=cache.attributeTLVCache,
    )

def GetSubscriptionHandler(self) -> SubscriptionTransaction | None:
    """Return the subscription handler for this read, or None when the
    read did not establish a subscription.
    """
    handler = self._subscription_handler
    return handler

def handleAttributeData(self, path: AttributePath, dataVersion: int, status: int, data: bytes):
try:
imStatus = chip.interaction_model.Status(status)
Expand Down Expand Up @@ -716,7 +736,7 @@ def _handleSubscriptionEstablished(self, subscriptionId):
if not self._future.done():
self._subscription_handler = SubscriptionTransaction(
self, subscriptionId, self._devCtrl)
self._future.set_result(self._subscription_handler)
self._future.set_result(self)
else:
self._subscription_handler._subscriptionId = subscriptionId
if self._subscription_handler._onResubscriptionSucceededCb is not None:
Expand Down Expand Up @@ -745,8 +765,6 @@ def _handleReportBegin(self):
pass

def _handleReportEnd(self):
self._cache.UpdateCachedData(self._changedPathSet)

if (self._subscription_handler is not None):
for change in self._changedPathSet:
try:
Expand All @@ -772,8 +790,7 @@ def _handleDone(self):
if self._resultError is not None:
self._future.set_exception(self._resultError.to_exception())
else:
self._future.set_result(AsyncReadTransaction.ReadResponse(
attributes=self._cache.attributeCache, events=self._events, tlvAttributes=self._cache.attributeTLVCache))
self._future.set_result(self)

#
# Decrement the ref on ourselves to match the increment that happened at allocation.
Expand Down Expand Up @@ -1001,18 +1018,16 @@ def WriteGroupAttributes(groupId: int, devCtrl: c_void_p, attributes: List[Attri
)


def Read(future: Future, eventLoop, device, devCtrl,
def Read(transaction: AsyncReadTransaction, device,
attributes: Optional[List[AttributePath]] = None, dataVersionFilters: Optional[List[DataVersionFilter]] = None,
events: Optional[List[EventPath]] = None, eventNumberFilter: Optional[int] = None, returnClusterObject: bool = True,
events: Optional[List[EventPath]] = None, eventNumberFilter: Optional[int] = None,
subscriptionParameters: Optional[SubscriptionParameters] = None,
fabricFiltered: bool = True, keepSubscriptions: bool = False, autoResubscribe: bool = True) -> PyChipError:
if (not attributes) and dataVersionFilters:
raise ValueError(
"Must provide valid attribute list when data version filters is not null")

handle = chip.native.GetLibraryHandle()
transaction = AsyncReadTransaction(
future, eventLoop, devCtrl, returnClusterObject)

attributePathsForCffi = None
if attributes is not None:
Expand Down Expand Up @@ -1119,25 +1134,6 @@ def Read(future: Future, eventLoop, device, devCtrl,
return res


def ReadAttributes(future: Future, eventLoop, device, devCtrl,
                   attributes: List[AttributePath], dataVersionFilters: Optional[List[DataVersionFilter]] = None,
                   returnClusterObject: bool = True,
                   subscriptionParameters: Optional[SubscriptionParameters] = None, fabricFiltered: bool = True) -> int:
    '''Convenience wrapper over Read() for attribute-only reads: forwards all
    arguments unchanged and passes no event paths.
    '''
    return Read(future=future, eventLoop=eventLoop, device=device, devCtrl=devCtrl,
                attributes=attributes, dataVersionFilters=dataVersionFilters,
                events=None,
                returnClusterObject=returnClusterObject,
                subscriptionParameters=subscriptionParameters,
                fabricFiltered=fabricFiltered)


def ReadEvents(future: Future, eventLoop, device, devCtrl,
               events: List[EventPath], eventNumberFilter=None, returnClusterObject: bool = True,
               subscriptionParameters: Optional[SubscriptionParameters] = None, fabricFiltered: bool = True) -> int:
    '''Convenience wrapper over Read() for event-only reads: forwards all
    arguments unchanged and passes no attribute paths or data version filters.
    '''
    return Read(future=future, eventLoop=eventLoop, device=device, devCtrl=devCtrl,
                attributes=None, dataVersionFilters=None,
                events=events, eventNumberFilter=eventNumberFilter,
                returnClusterObject=returnClusterObject,
                subscriptionParameters=subscriptionParameters,
                fabricFiltered=fabricFiltered)


def Init():
handle = chip.native.GetLibraryHandle()

Expand Down
Loading

0 comments on commit 905b2fd

Please sign in to comment.