Skip to content

Commit

Permalink
Add Error Callback for subscriptions (#19693)
Browse files Browse the repository at this point in the history
This is useful when subscriptions are not renewed automatically to get
the actual error from the stack.

Without this change, if auto-renew is disabled, the following exception
is thrown:
ERROR Exception in callback AsyncReadTransaction._handleError(50)
handle: <Handle AsyncReadTransaction._handleError(50)>
Traceback (most recent call last):
  File "/usr/lib/python3.9/asyncio/events.py", line 80, in _run
    self._context.run(self._callback, *self._args)
  File "/home/sag/projects/project-chip/connectedhomeip/out/python_env/lib/python3.9/site-packages/chip/clusters/Attribute.py", line 661, in _handleError
    self._future.set_exception(
asyncio.exceptions.InvalidStateError: invalid state
  • Loading branch information
agners authored and pull[bot] committed Aug 5, 2022
1 parent e7ca6c1 commit 1080611
Showing 1 changed file with 20 additions and 2 deletions.
22 changes: 20 additions & 2 deletions src/controller/python/chip/clusters/Attribute.py
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,7 @@ class SubscriptionTransaction:
def __init__(self, transaction: 'AsyncReadTransaction', subscriptionId, devCtrl):
self._onAttributeChangeCb = DefaultAttributeChangeCallback
self._onEventChangeCb = DefaultEventChangeCallback
self._onErrorCb = DefaultErrorCallback
self._readTransaction = transaction
self._subscriptionId = subscriptionId
self._devCtrl = devCtrl
Expand Down Expand Up @@ -502,6 +503,13 @@ def SetEventUpdateCallback(self, callback: Callable[[EventReadResult, Subscripti
if callback is not None:
self._onEventChangeCb = callback

def SetErrorCallback(self, callback: Callable[[int, SubscriptionTransaction], None]):
'''
Sets the callback function in case a subscription error occured, accepts a Callable accepts an error code and the cached data.
'''
if callback is not None:
self._onErrorCb = callback

@property
def OnAttributeChangeCb(self) -> Callable[[TypedAttributePath, SubscriptionTransaction], None]:
return self._onAttributeChangeCb
Expand All @@ -510,6 +518,10 @@ def OnAttributeChangeCb(self) -> Callable[[TypedAttributePath, SubscriptionTrans
def OnEventChangeCb(self) -> Callable[[EventReadResult, SubscriptionTransaction], None]:
return self._onEventChangeCb

@property
def OnErrorCb(self) -> Callable[[int, SubscriptionTransaction], None]:
return self._onErrorCb

def Shutdown(self):
if (self._isDone):
print("Subscription was already terminated previously!")
Expand Down Expand Up @@ -545,6 +557,10 @@ def DefaultEventChangeCallback(data: EventReadResult, transaction: SubscriptionT
pprint(data, expand_all=True)


def DefaultErrorCallback(chipError: int, transaction: SubscriptionTransaction):
print("Error during Subscription: Chip Stack Error %d".format(chipError))


def _BuildEventIndex():
''' Build internal event index for locating the corresponding cluster object by path in the future.
We do this because this operation will take a long time when there are lots of events, it takes about 300ms for a single query.
Expand Down Expand Up @@ -659,8 +675,10 @@ def handleEventData(self, header: EventHeader, path: EventPath, data: bytes, sta
self._handleEventData(header, path, data, status)

def _handleError(self, chipError: int):
self._future.set_exception(
chip.exceptions.ChipStackError(chipError))
if not self._future.done():
self._future.set_exception(
chip.exceptions.ChipStackError(chipError))
self._subscription_handler.OnErrorCb(chipError, self._subscription_handler)

def handleError(self, chipError: int):
self._event_loop.call_soon_threadsafe(
Expand Down

0 comments on commit 1080611

Please sign in to comment.