From 1065000dca05274d8fecfbdfd4844e79aff8d9bb Mon Sep 17 00:00:00 2001 From: Stefan Agner Date: Thu, 13 Jun 2024 16:44:23 +0200 Subject: [PATCH] [Python] Use thread-safe futures for concurrent operations (#33891) * [Python] Use thread-safe futures for concurrent operations Instead of using quasi-global variables in the ChipStack singleton use device controller local futures to store results from callbacks. This has several advantages, namely: - Avoid unnecessary shared state between device controllers - Avoid unnecessary shared state between various operations within a device controller (those who don't share callbacks could be called from different threads now) - Explicitly set Futures to None to detect spurious/unexpected callbacks - Better code readability - concurrent.futures are thread-safe - Will make asyncio transition easier This change shouldn't change the external API. * [Python] Fix EstablishPASESession API compatibility * [Python] Make ConnectBLE behave as before --- src/controller/python/chip/ChipDeviceCtrl.py | 317 +++++++++++-------- src/controller/python/chip/ChipStack.py | 27 +- 2 files changed, 184 insertions(+), 160 deletions(-) diff --git a/src/controller/python/chip/ChipDeviceCtrl.py b/src/controller/python/chip/ChipDeviceCtrl.py index e9ff7280d9a31d..42ee9e6f535197 100644 --- a/src/controller/python/chip/ChipDeviceCtrl.py +++ b/src/controller/python/chip/ChipDeviceCtrl.py @@ -30,6 +30,7 @@ import asyncio import builtins +import concurrent.futures import copy import ctypes import enum @@ -225,15 +226,6 @@ def wrapper(*args, **kwargs): return wrapper -class DCState(enum.IntEnum): - NOT_INITIALIZED = 0 - IDLE = 1 - BLE_READY = 2 - RENDEZVOUS_ONGOING = 3 - RENDEZVOUS_CONNECTED = 4 - COMMISSIONING = 5 - - class CommissionableNode(discovery.CommissionableNode): def SetDeviceController(self, devCtrl: 'ChipDeviceController'): self._devCtrl = devCtrl @@ -330,7 +322,6 @@ class ChipDeviceControllerBase(): activeList = set() def __init__(self, name: str = ''): - self.state = DCState.NOT_INITIALIZED self.devCtrl = None self._ChipStack = builtins.chipStack self._dmLib = None @@ -348,22 +339,28 @@ def __init__(self, name: str = ''): self._Cluster = ChipClusters(builtins.chipStack) self._Cluster.InitLib(self._dmLib) + self._commissioning_complete_future: typing.Optional[concurrent.futures.Future] = None + self._open_window_complete_future: typing.Optional[concurrent.futures.Future] = None + self._unpair_device_complete_future: typing.Optional[concurrent.futures.Future] = None + self._pase_establishment_complete_future: typing.Optional[concurrent.futures.Future] = None def _set_dev_ctrl(self, devCtrl, pairingDelegate): - def HandleCommissioningComplete(nodeid, err): + def HandleCommissioningComplete(nodeId: int, err: PyChipError): if err.is_success: logging.info("Commissioning complete") else: logging.warning("Failed to commission: {}".format(err)) self._dmLib.pychip_DeviceController_SetIcdRegistrationParameters(False, None) - self.state = DCState.IDLE - self._ChipStack.callbackRes = err - self._ChipStack.commissioningEventRes = err + if self._dmLib.pychip_TestCommissionerUsed(): - self._ChipStack.commissioningEventRes = self._dmLib.pychip_GetCompletionError() - self._ChipStack.commissioningCompleteEvent.set() - self._ChipStack.completeEvent.set() + err = self._dmLib.pychip_GetCompletionError() + + if self._commissioning_complete_future is None: + logging.exception("HandleCommissioningComplete called unexpectedly") + return + + self._commissioning_complete_future.set_result(err) def HandleFabricCheck(nodeId): self.fabricCheckNodeId = nodeId @@ -372,13 +369,19 @@ def HandleOpenWindowComplete(nodeid: int, setupPinCode: int, setupManualCode: st setupQRCode: str, err: PyChipError) -> None: if err.is_success: logging.info("Open Commissioning Window complete setting nodeid {} pincode to {}".format(nodeid, setupPinCode)) - self._ChipStack.openCommissioningWindowPincode[nodeid] = CommissioningParameters( + commissioningParameters = CommissioningParameters( setupPinCode=setupPinCode, setupManualCode=setupManualCode.decode(), setupQRCode=setupQRCode.decode()) else: logging.warning("Failed to open commissioning window: {}".format(err)) - self._ChipStack.callbackRes = err - self._ChipStack.completeEvent.set() + if self._open_window_complete_future is None: + logging.exception("HandleOpenWindowComplete called unexpectedly") + return + + if err.is_success: + self._open_window_complete_future.set_result(commissioningParameters) + else: + self._open_window_complete_future.set_exception(err.to_exception()) def HandleUnpairDeviceComplete(nodeid: int, err: PyChipError): if err.is_success: @@ -386,27 +389,33 @@ def HandleUnpairDeviceComplete(nodeid: int, err: PyChipError): else: logging.warning("Failed to unpair device: {}".format(err)) - self._ChipStack.callbackRes = err - self._ChipStack.completeEvent.set() + if self._unpair_device_complete_future is None: + logging.exception("HandleUnpairDeviceComplete called unexpectedly") + return + + if err.is_success: + self._unpair_device_complete_future.set_result(None) + else: + self._unpair_device_complete_future.set_exception(err.to_exception()) def HandlePASEEstablishmentComplete(err: PyChipError): if not err.is_success: logging.warning("Failed to establish secure session to device: {}".format(err)) - self._ChipStack.callbackRes = err.to_exception() else: logging.info("Established secure session with Device") - if self.state != DCState.COMMISSIONING: - # During Commissioning, HandlePASEEstablishmentComplete will also be called, - # in this case the async operation should be marked as finished by - # HandleCommissioningComplete instead this function. - self.state = DCState.IDLE - self._ChipStack.completeEvent.set() - else: - # When commissioning, getting an error during key exhange - # needs to unblock the entire commissioning flow. + if self._commissioning_complete_future is not None: + # During Commissioning, HandlePASEEstablishmentComplete will also be called. + # Only complete the future if PASE session establishment failed. if not err.is_success: - HandleCommissioningComplete(0, err) + self._commissioning_complete_future.set_result(err) + return + + if self._pase_establishment_complete_future is None: + logging.exception("HandlePASEEstablishmentComplete called unexpectedly") + return + + self._pase_establishment_complete_future.set_result(err) self.pairingDelegate = pairingDelegate self.devCtrl = devCtrl @@ -431,7 +440,6 @@ def HandlePASEEstablishmentComplete(err: PyChipError): self.cbHandleDeviceUnpairCompleteFunct = _DeviceUnpairingCompleteFunct(HandleUnpairDeviceComplete) - self.state = DCState.IDLE self._isActive = True # Validate FabricID/NodeID followed from NOC Chain self._fabricId = self.GetFabricIdInternal() @@ -439,7 +447,6 @@ def HandlePASEEstablishmentComplete(err: PyChipError): self._nodeId = self.GetNodeIdInternal() def _finish_init(self): - self.state = DCState.IDLE self._isActive = True ChipDeviceController.activeList.add(self) @@ -529,26 +536,35 @@ def IsConnected(self): def ConnectBLE(self, discriminator, setupPinCode, nodeid) -> PyChipError: self.CheckIsActive() - self._ChipStack.commissioningCompleteEvent.clear() + self._commissioning_complete_future = concurrent.futures.Future() - self.state = DCState.COMMISSIONING - self._enablePairingCompeleteCallback(True) - self._ChipStack.CallAsyncWithCompleteCallback( - lambda: self._dmLib.pychip_DeviceController_ConnectBLE( - self.devCtrl, discriminator, setupPinCode, nodeid) - ).raise_on_error() - if not self._ChipStack.commissioningCompleteEvent.isSet(): - # Error 50 is a timeout - return PyChipError(CHIP_ERROR_TIMEOUT) - return self._ChipStack.commissioningEventRes + try: + self._enablePairingCompeleteCallback(True) + self._ChipStack.Call( + lambda: self._dmLib.pychip_DeviceController_ConnectBLE( + self.devCtrl, discriminator, setupPinCode, nodeid) + ).raise_on_error() - def UnpairDevice(self, nodeid: int): + # TODO: Change return None. Only returning on success is not useful. + # but that is what the previous implementation did. + res = self._commissioning_complete_future.result() + res.raise_on_error() + return res + finally: + self._commissioning_complete_future = None + + def UnpairDevice(self, nodeid: int) -> None: self.CheckIsActive() - return self._ChipStack.CallAsyncWithCompleteCallback( - lambda: self._dmLib.pychip_DeviceController_UnpairDevice( - self.devCtrl, nodeid, self.cbHandleDeviceUnpairCompleteFunct) - ).raise_on_error() + self._unpair_device_complete_future = concurrent.futures.Future() + try: + self._ChipStack.Call( + lambda: self._dmLib.pychip_DeviceController_UnpairDevice( + self.devCtrl, nodeid, self.cbHandleDeviceUnpairCompleteFunct) + ).raise_on_error() + self._unpair_device_complete_future.result() + finally: + self._unpair_device_complete_future = None def CloseBLEConnection(self): self.CheckIsActive() @@ -582,32 +598,62 @@ def CloseSession(self, nodeid): def EstablishPASESessionBLE(self, setupPinCode: int, discriminator: int, nodeid: int): self.CheckIsActive() - self.state = DCState.RENDEZVOUS_ONGOING - self._enablePairingCompeleteCallback(True) - return self._ChipStack.CallAsyncWithCompleteCallback( - lambda: self._dmLib.pychip_DeviceController_EstablishPASESessionBLE( - self.devCtrl, setupPinCode, discriminator, nodeid) - ) + self._pase_establishment_complete_future = concurrent.futures.Future() + try: + self._enablePairingCompeleteCallback(True) + self._ChipStack.Call( + lambda: self._dmLib.pychip_DeviceController_EstablishPASESessionBLE( + self.devCtrl, setupPinCode, discriminator, nodeid) + ).raise_on_error() + + # TODO: This is a bit funky, but what the API returned with the previous + # implementation. We should revisit this. + err = self._pase_establishment_complete_future.result() + if not err.is_success: + return err.to_exception() + return None + finally: + self._pase_establishment_complete_future = None def EstablishPASESessionIP(self, ipaddr: str, setupPinCode: int, nodeid: int, port: int = 0): self.CheckIsActive() - self.state = DCState.RENDEZVOUS_ONGOING - self._enablePairingCompeleteCallback(True) - return self._ChipStack.CallAsyncWithCompleteCallback( - lambda: self._dmLib.pychip_DeviceController_EstablishPASESessionIP( - self.devCtrl, ipaddr.encode("utf-8"), setupPinCode, nodeid, port) - ) + self._pase_establishment_complete_future = concurrent.futures.Future() + try: + self._enablePairingCompeleteCallback(True) + self._ChipStack.Call( + lambda: self._dmLib.pychip_DeviceController_EstablishPASESessionIP( + self.devCtrl, ipaddr.encode("utf-8"), setupPinCode, nodeid, port) + ).raise_on_error() + + # TODO: This is a bit funky, but what the API returned with the previous + # implementation. We should revisit this. + err = self._pase_establishment_complete_future.result() + if not err.is_success: + return err.to_exception() + return None + finally: + self._pase_establishment_complete_future = None def EstablishPASESession(self, setUpCode: str, nodeid: int): self.CheckIsActive() - self.state = DCState.RENDEZVOUS_ONGOING - self._enablePairingCompeleteCallback(True) - return self._ChipStack.CallAsyncWithCompleteCallback( - lambda: self._dmLib.pychip_DeviceController_EstablishPASESession( - self.devCtrl, setUpCode.encode("utf-8"), nodeid) - ) + self._pase_establishment_complete_future = concurrent.futures.Future() + try: + self._enablePairingCompeleteCallback(True) + self._ChipStack.Call( + lambda: self._dmLib.pychip_DeviceController_EstablishPASESession( + self.devCtrl, setUpCode.encode("utf-8"), nodeid) + ).raise_on_error() + + # TODO: This is a bit funky, but what the API returned with the previous + # implementation. We should revisit this. + err = self._pase_establishment_complete_future.result() + if not err.is_success: + return err.to_exception() + return None + finally: + self._pase_establishment_complete_future = None def GetTestCommissionerUsed(self): return self._ChipStack.Call( @@ -642,11 +688,6 @@ def CheckStageSuccessful(self, stage: int): def CheckTestCommissionerPaseConnection(self, nodeid): return self._dmLib.pychip_TestPaseConnection(nodeid) - def NOCChainCallback(self, nocChain): - self._ChipStack.callbackRes = nocChain - self._ChipStack.completeEvent.set() - return - def ResolveNode(self, nodeid): self.CheckIsActive() @@ -753,12 +794,16 @@ def OpenCommissioningWindow(self, nodeid: int, timeout: int, iteration: int, Returns CommissioningParameters ''' self.CheckIsActive() - self._ChipStack.CallAsyncWithCompleteCallback( - lambda: self._dmLib.pychip_DeviceController_OpenCommissioningWindow( - self.devCtrl, self.pairingDelegate, nodeid, timeout, iteration, discriminator, option) - ).raise_on_error() - self._ChipStack.callbackRes.raise_on_error() - return self._ChipStack.openCommissioningWindowPincode[nodeid] + self._open_window_complete_future = concurrent.futures.Future() + try: + self._ChipStack.Call( + lambda: self._dmLib.pychip_DeviceController_OpenCommissioningWindow( + self.devCtrl, self.pairingDelegate, nodeid, timeout, iteration, discriminator, option) + ).raise_on_error() + + return self._open_window_complete_future.result() + finally: + self._open_window_complete_future = None def GetCompressedFabricId(self): self.CheckIsActive() @@ -1773,6 +1818,7 @@ def __init__(self, opCredsContext: ctypes.c_void_p, fabricId: int, nodeId: int, f"caIndex({fabricAdmin.caIndex:x})/fabricId(0x{fabricId:016X})/nodeId(0x{nodeId:016X})" ) + self._issue_node_chain_complete: typing.Optional[concurrent.futures.Future] = None self._dmLib.pychip_DeviceController_SetIssueNOCChainCallbackPythonCallback(_IssueNOCChainCallbackPythonCallback) pairingDelegate = c_void_p(None) @@ -1823,17 +1869,18 @@ def Commission(self, nodeid) -> PyChipError: bool: True if successful, False otherwise. ''' self.CheckIsActive() - self._ChipStack.commissioningCompleteEvent.clear() - self.state = DCState.COMMISSIONING - self._ChipStack.CallAsyncWithCompleteCallback( - lambda: self._dmLib.pychip_DeviceController_Commission( - self.devCtrl, nodeid) - ) - if not self._ChipStack.commissioningCompleteEvent.isSet(): - # Error 50 is a timeout - return PyChipError(CHIP_ERROR_TIMEOUT) - return self._ChipStack.commissioningEventRes + self._commissioning_complete_future = concurrent.futures.Future() + + try: + self._ChipStack.Call( + lambda: self._dmLib.pychip_DeviceController_Commission( + self.devCtrl, nodeid) + ).raise_on_error() + + return self._commissioning_complete_future.result() + finally: + self._commissioning_complete_future = None def CommissionThread(self, discriminator, setupPinCode, nodeId, threadOperationalDataset: bytes) -> PyChipError: ''' Commissions a Thread device over BLE @@ -1963,25 +2010,21 @@ def CommissionOnNetwork(self, nodeId: int, setupPinCode: int, ''' self.CheckIsActive() - # IP connection will run through full commissioning, so we need to wait - # for the commissioning complete event, not just any callback. - self.state = DCState.COMMISSIONING - # Convert numerical filters to string for passing down to binding. if isinstance(filter, int): filter = str(filter) - self._ChipStack.commissioningCompleteEvent.clear() + self._commissioning_complete_future = concurrent.futures.Future() + try: + self._enablePairingCompeleteCallback(True) + self._ChipStack.Call( + lambda: self._dmLib.pychip_DeviceController_OnNetworkCommission( + self.devCtrl, self.pairingDelegate, nodeId, setupPinCode, int(filterType), str(filter).encode("utf-8") + b"\x00" if filter is not None else None, discoveryTimeoutMsec) + ).raise_on_error() - self._enablePairingCompeleteCallback(True) - self._ChipStack.CallAsyncWithCompleteCallback( - lambda: self._dmLib.pychip_DeviceController_OnNetworkCommission( - self.devCtrl, self.pairingDelegate, nodeId, setupPinCode, int(filterType), str(filter).encode("utf-8") + b"\x00" if filter is not None else None, discoveryTimeoutMsec) - ) - if not self._ChipStack.commissioningCompleteEvent.isSet(): - # Error 50 is a timeout - return PyChipError(CHIP_ERROR_TIMEOUT) - return self._ChipStack.commissioningEventRes + return self._commissioning_complete_future.result() + finally: + self._commissioning_complete_future = None def CommissionWithCode(self, setupPayload: str, nodeid: int, discoveryType: DiscoveryType = DiscoveryType.DISCOVERY_ALL) -> PyChipError: ''' Commission with the given nodeid from the setupPayload. @@ -1991,51 +2034,57 @@ def CommissionWithCode(self, setupPayload: str, nodeid: int, discoveryType: Disc setupPayload = setupPayload.encode() + b'\0' - # IP connection will run through full commissioning, so we need to wait - # for the commissioning complete event, not just any callback. - self.state = DCState.COMMISSIONING + self._commissioning_complete_future = concurrent.futures.Future() - self._ChipStack.commissioningCompleteEvent.clear() + try: + self._enablePairingCompeleteCallback(True) + self._ChipStack.Call( + lambda: self._dmLib.pychip_DeviceController_ConnectWithCode( + self.devCtrl, setupPayload, nodeid, discoveryType.value) + ).raise_on_error() - self._enablePairingCompeleteCallback(True) - self._ChipStack.CallAsyncWithCompleteCallback( - lambda: self._dmLib.pychip_DeviceController_ConnectWithCode( - self.devCtrl, setupPayload, nodeid, discoveryType.value) - ) - if not self._ChipStack.commissioningCompleteEvent.isSet(): - # Error 50 is a timeout - return PyChipError(CHIP_ERROR_TIMEOUT) - return self._ChipStack.commissioningEventRes + return self._commissioning_complete_future.result() + finally: + self._commissioning_complete_future = None def CommissionIP(self, ipaddr: str, setupPinCode: int, nodeid: int) -> PyChipError: """ DEPRECATED, DO NOT USE! Use `CommissionOnNetwork` or `CommissionWithCode` """ self.CheckIsActive() - # IP connection will run through full commissioning, so we need to wait - # for the commissioning complete event, not just any callback. - self.state = DCState.COMMISSIONING + self._commissioning_complete_future = concurrent.futures.Future() - self._ChipStack.commissioningCompleteEvent.clear() + try: + self._enablePairingCompeleteCallback(True) + self._ChipStack.Call( + lambda: self._dmLib.pychip_DeviceController_ConnectIP( + self.devCtrl, ipaddr.encode("utf-8"), setupPinCode, nodeid) + ).raise_on_error() - self._enablePairingCompeleteCallback(True) - self._ChipStack.CallAsyncWithCompleteCallback( - lambda: self._dmLib.pychip_DeviceController_ConnectIP( - self.devCtrl, ipaddr.encode("utf-8"), setupPinCode, nodeid) - ) - if not self._ChipStack.commissioningCompleteEvent.isSet(): - # Error 50 is a timeout - return PyChipError(CHIP_ERROR_TIMEOUT) - return self._ChipStack.commissioningEventRes + return self._commissioning_complete_future.result() + finally: + self._commissioning_complete_future = None + + def NOCChainCallback(self, nocChain): + if self._issue_node_chain_complete is None: + logging.exception("NOCChainCallback while not expecting a callback") + return + self._issue_node_chain_complete.set_result(nocChain) + return def IssueNOCChain(self, csr: Clusters.OperationalCredentials.Commands.CSRResponse, nodeId: int): """Issue an NOC chain using the associated OperationalCredentialsDelegate. The NOC chain will be provided in TLV cert format.""" self.CheckIsActive() - return self._ChipStack.CallAsyncWithCompleteCallback( - lambda: self._dmLib.pychip_DeviceController_IssueNOCChain( - self.devCtrl, py_object(self), csr.NOCSRElements, len(csr.NOCSRElements), nodeId) - ) + self._issue_node_chain_complete = concurrent.futures.Future() + try: + self._ChipStack.Call( + lambda: self._dmLib.pychip_DeviceController_IssueNOCChain( + self.devCtrl, py_object(self), csr.NOCSRElements, len(csr.NOCSRElements), nodeId) + ).raise_on_error() + return self._issue_node_chain_complete.result() + finally: + self._issue_node_chain_complete = None class BareChipDeviceController(ChipDeviceControllerBase): diff --git a/src/controller/python/chip/ChipStack.py b/src/controller/python/chip/ChipStack.py index 5fd0601ba204e7..4f19776664cfa7 100644 --- a/src/controller/python/chip/ChipStack.py +++ b/src/controller/python/chip/ChipStack.py @@ -30,7 +30,7 @@ import builtins import os from ctypes import CFUNCTYPE, Structure, c_bool, c_char_p, c_uint16, c_uint32, c_void_p, py_object, pythonapi -from threading import Condition, Event, Lock +from threading import Condition, Lock import chip.native from chip.native import PyChipError @@ -144,14 +144,9 @@ class ChipStack(object): def __init__(self, persistentStoragePath: str, enableServerInteractions=True): builtins.enableDebugMode = False - self.completeEvent = Event() - self.commissioningCompleteEvent = Event() self._ChipStackLib = None self._chipDLLPath = None self.devMgr = None - self.callbackRes = None - self.commissioningEventRes = None - self.openCommissioningWindowPincode = {} self._enableServerInteractions = enableServerInteractions # @@ -212,7 +207,6 @@ def Shutdown(self): self._ChipStackLib = None self._chipDLLPath = None self.devMgr = None - self.callbackRes = None delattr(builtins, "chipStack") @@ -239,25 +233,6 @@ async def CallAsync(self, callFunct, timeoutMs: int = None): return await asyncio.wait_for(callObj.future, timeoutMs / 1000 if timeoutMs else None) - def CallAsyncWithCompleteCallback(self, callFunct): - '''Run a Python function on CHIP stack, and wait for the application specific response. - This function is a wrapper of PostTaskOnChipThread, which includes some handling of application specific logics. - Calling this function on CHIP on CHIP mainloop thread will cause deadlock. - Make sure to register the necessary callbacks which release the function by setting the completeEvent. - ''' - # throw error if op in progress - self.callbackRes = None - self.completeEvent.clear() - res = self.PostTaskOnChipThread(callFunct).Wait() - - if not res.is_success: - self.completeEvent.set() - raise res.to_exception() - self.completeEvent.wait() - if isinstance(self.callbackRes, ChipStackException): - raise self.callbackRes - return self.callbackRes - def PostTaskOnChipThread(self, callFunct) -> AsyncCallableHandle: '''Run a Python function on CHIP stack, and wait for the response. This function will post a task on CHIP mainloop, and return an object with Wait() method for getting the result.