From 108061111f2fada9ffd43ba2293cd706e636e2d3 Mon Sep 17 00:00:00 2001 From: Stefan Agner Date: Tue, 21 Jun 2022 19:49:57 -0700 Subject: [PATCH] Add Error Callback for subscriptions (#19693) This is useful when subscriptions are not renewed automatically to get the actual error from the stack. Without this change, if auto-renew is disabled, the following exception is thrown: ERROR Exception in callback AsyncReadTransaction._handleError(50) handle: Traceback (most recent call last): File "/usr/lib/python3.9/asyncio/events.py", line 80, in _run self._context.run(self._callback, *self._args) File "/home/sag/projects/project-chip/connectedhomeip/out/python_env/lib/python3.9/site-packages/chip/clusters/Attribute.py", line 661, in _handleError self._future.set_exception( asyncio.exceptions.InvalidStateError: invalid state --- .../python/chip/clusters/Attribute.py | 22 +++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/src/controller/python/chip/clusters/Attribute.py b/src/controller/python/chip/clusters/Attribute.py index a70f49dda601e7..07ae05a6b0ebf4 100644 --- a/src/controller/python/chip/clusters/Attribute.py +++ b/src/controller/python/chip/clusters/Attribute.py @@ -468,6 +468,7 @@ class SubscriptionTransaction: def __init__(self, transaction: 'AsyncReadTransaction', subscriptionId, devCtrl): self._onAttributeChangeCb = DefaultAttributeChangeCallback self._onEventChangeCb = DefaultEventChangeCallback + self._onErrorCb = DefaultErrorCallback self._readTransaction = transaction self._subscriptionId = subscriptionId self._devCtrl = devCtrl @@ -502,6 +503,13 @@ def SetEventUpdateCallback(self, callback: Callable[[EventReadResult, Subscripti if callback is not None: self._onEventChangeCb = callback + def SetErrorCallback(self, callback: Callable[[int, SubscriptionTransaction], None]): + ''' + Sets the callback function in case a subscription error occured, accepts a Callable accepts an error code and the cached data. + ''' + if callback is not None: + self._onErrorCb = callback + @property def OnAttributeChangeCb(self) -> Callable[[TypedAttributePath, SubscriptionTransaction], None]: return self._onAttributeChangeCb @@ -510,6 +518,10 @@ def OnAttributeChangeCb(self) -> Callable[[TypedAttributePath, SubscriptionTrans def OnEventChangeCb(self) -> Callable[[EventReadResult, SubscriptionTransaction], None]: return self._onEventChangeCb + @property + def OnErrorCb(self) -> Callable[[int, SubscriptionTransaction], None]: + return self._onErrorCb + def Shutdown(self): if (self._isDone): print("Subscription was already terminated previously!") @@ -545,6 +557,10 @@ def DefaultEventChangeCallback(data: EventReadResult, transaction: SubscriptionT pprint(data, expand_all=True) +def DefaultErrorCallback(chipError: int, transaction: SubscriptionTransaction): + print("Error during Subscription: Chip Stack Error %d".format(chipError)) + + def _BuildEventIndex(): ''' Build internal event index for locating the corresponding cluster object by path in the future. We do this because this operation will take a long time when there are lots of events, it takes about 300ms for a single query. @@ -659,8 +675,10 @@ def handleEventData(self, header: EventHeader, path: EventPath, data: bytes, sta self._handleEventData(header, path, data, status) def _handleError(self, chipError: int): - self._future.set_exception( - chip.exceptions.ChipStackError(chipError)) + if not self._future.done(): + self._future.set_exception( + chip.exceptions.ChipStackError(chipError)) + self._subscription_handler.OnErrorCb(chipError, self._subscription_handler) def handleError(self, chipError: int): self._event_loop.call_soon_threadsafe(