diff --git a/src/controller/python/chip/ChipDeviceCtrl.py b/src/controller/python/chip/ChipDeviceCtrl.py index 4e57e3d6044e67..d63a3772e62dc6 100644 --- a/src/controller/python/chip/ChipDeviceCtrl.py +++ b/src/controller/python/chip/ChipDeviceCtrl.py @@ -186,7 +186,7 @@ def __init__(self, deviceProxy: ctypes.c_void_p, dmLib=None): def __del__(self): if (self._dmLib is not None and hasattr(builtins, 'chipStack') and builtins.chipStack is not None): # This destructor is called from any threading context, including on the Matter threading context. - # So, we cannot call chipStack.Call or chipStack.CallAsync which waits for the posted work to + # So, we cannot call chipStack.Call or chipStack.CallAsyncWithCompleteCallback which waits for the posted work to # actually be executed. Instead, we just post/schedule the work and move on. builtins.chipStack.PostTaskOnChipThread(lambda: self._dmLib.pychip_FreeOperationalDeviceProxy(self._deviceProxy)) @@ -447,7 +447,7 @@ def ConnectBLE(self, discriminator, setupPinCode, nodeid) -> PyChipError: self.state = DCState.COMMISSIONING self._enablePairingCompeleteCallback(True) - self._ChipStack.CallAsync( + self._ChipStack.CallAsyncWithCompleteCallback( lambda: self._dmLib.pychip_DeviceController_ConnectBLE( self.devCtrl, discriminator, setupPinCode, nodeid) ).raise_on_error() @@ -459,7 +459,7 @@ def ConnectBLE(self, discriminator, setupPinCode, nodeid) -> PyChipError: def UnpairDevice(self, nodeid: int): self.CheckIsActive() - return self._ChipStack.CallAsync( + return self._ChipStack.CallAsyncWithCompleteCallback( lambda: self._dmLib.pychip_DeviceController_UnpairDevice( self.devCtrl, nodeid, self.cbHandleDeviceUnpairCompleteFunct) ).raise_on_error() @@ -498,7 +498,7 @@ def EstablishPASESessionBLE(self, setupPinCode: int, discriminator: int, nodeid: self.state = DCState.RENDEZVOUS_ONGOING self._enablePairingCompeleteCallback(True) - return self._ChipStack.CallAsync( + return self._ChipStack.CallAsyncWithCompleteCallback( lambda: self._dmLib.pychip_DeviceController_EstablishPASESessionBLE( self.devCtrl, setupPinCode, discriminator, nodeid) ) @@ -508,7 +508,7 @@ def EstablishPASESessionIP(self, ipaddr: str, setupPinCode: int, nodeid: int, po self.state = DCState.RENDEZVOUS_ONGOING self._enablePairingCompeleteCallback(True) - return self._ChipStack.CallAsync( + return self._ChipStack.CallAsyncWithCompleteCallback( lambda: self._dmLib.pychip_DeviceController_EstablishPASESessionIP( self.devCtrl, ipaddr.encode("utf-8"), setupPinCode, nodeid, port) ) @@ -518,7 +518,7 @@ def EstablishPASESession(self, setUpCode: str, nodeid: int): self.state = DCState.RENDEZVOUS_ONGOING self._enablePairingCompeleteCallback(True) - return self._ChipStack.CallAsync( + return self._ChipStack.CallAsyncWithCompleteCallback( lambda: self._dmLib.pychip_DeviceController_EstablishPASESession( self.devCtrl, setUpCode.encode("utf-8"), nodeid) ) @@ -737,7 +737,7 @@ def OpenCommissioningWindow(self, nodeid: int, timeout: int, iteration: int, Returns CommissioningParameters ''' self.CheckIsActive() - self._ChipStack.CallAsync( + self._ChipStack.CallAsyncWithCompleteCallback( lambda: self._dmLib.pychip_DeviceController_OpenCommissioningWindow( self.devCtrl, self.pairingDelegate, nodeid, timeout, iteration, discriminator, option) ).raise_on_error() @@ -858,7 +858,7 @@ async def GetConnectedDevice(self, nodeid, allowPASE: bool = True, timeoutMs: in if allowPASE: returnDevice = c_void_p(None) - res = self._ChipStack.Call(lambda: self._dmLib.pychip_GetDeviceBeingCommissioned( + res = await self._ChipStack.CallAsync(lambda: self._dmLib.pychip_GetDeviceBeingCommissioned( self.devCtrl, nodeid, byref(returnDevice)), timeoutMs) if res.is_success: logging.info('Using PASE connection') @@ -888,11 +888,12 @@ def deviceAvailable(self, device, err): closure = DeviceAvailableClosure(eventLoop, future) ctypes.pythonapi.Py_IncRef(ctypes.py_object(closure)) - self._ChipStack.Call(lambda: self._dmLib.pychip_GetConnectedDeviceByNodeId( + res = await self._ChipStack.CallAsync(lambda: self._dmLib.pychip_GetConnectedDeviceByNodeId( self.devCtrl, nodeid, ctypes.py_object(closure), _DeviceAvailableCallback), - timeoutMs).raise_on_error() + timeoutMs) + res.raise_on_error() - # The callback might have been received synchronously (during self._ChipStack.Call()). + # The callback might have been received synchronously (during self._ChipStack.CallAsync()). # In that case the Future has already been set it will return immediately if timeoutMs is not None: timeout = float(timeoutMs) / 1000 @@ -1020,13 +1021,14 @@ async def SendCommand(self, nodeid: int, endpoint: int, payload: ClusterObjects. future = eventLoop.create_future() device = await self.GetConnectedDevice(nodeid, timeoutMs=interactionTimeoutMs) - ClusterCommand.SendCommand( + res = await ClusterCommand.SendCommand( future, eventLoop, responseType, device.deviceProxy, ClusterCommand.CommandPath( EndpointId=endpoint, ClusterId=payload.cluster_id, CommandId=payload.command_id, ), payload, timedRequestTimeoutMs=timedRequestTimeoutMs, - interactionTimeoutMs=interactionTimeoutMs, busyWaitMs=busyWaitMs, suppressResponse=suppressResponse).raise_on_error() + interactionTimeoutMs=interactionTimeoutMs, busyWaitMs=busyWaitMs, suppressResponse=suppressResponse) + res.raise_on_error() return await future async def SendBatchCommands(self, nodeid: int, commands: typing.List[ClusterCommand.InvokeRequestInfo], @@ -1062,10 +1064,11 @@ async def SendBatchCommands(self, nodeid: int, commands: typing.List[ClusterComm device = await self.GetConnectedDevice(nodeid, timeoutMs=interactionTimeoutMs) - ClusterCommand.SendBatchCommands( + res = await ClusterCommand.SendBatchCommands( future, eventLoop, device.deviceProxy, commands, timedRequestTimeoutMs=timedRequestTimeoutMs, - interactionTimeoutMs=interactionTimeoutMs, busyWaitMs=busyWaitMs, suppressResponse=suppressResponse).raise_on_error() + interactionTimeoutMs=interactionTimeoutMs, busyWaitMs=busyWaitMs, suppressResponse=suppressResponse) + res.raise_on_error() return await future def SendGroupCommand(self, groupid: int, payload: ClusterObjects.ClusterCommand, busyWaitMs: typing.Union[None, int] = None): @@ -1895,7 +1898,7 @@ def Commission(self, nodeid) -> PyChipError: self._ChipStack.commissioningCompleteEvent.clear() self.state = DCState.COMMISSIONING - self._ChipStack.CallAsync( + self._ChipStack.CallAsyncWithCompleteCallback( lambda: self._dmLib.pychip_DeviceController_Commission( self.devCtrl, nodeid) ) @@ -2011,7 +2014,7 @@ def CommissionOnNetwork(self, nodeId: int, setupPinCode: int, self._ChipStack.commissioningCompleteEvent.clear() self._enablePairingCompeleteCallback(True) - self._ChipStack.CallAsync( + self._ChipStack.CallAsyncWithCompleteCallback( lambda: self._dmLib.pychip_DeviceController_OnNetworkCommission( self.devCtrl, self.pairingDelegate, nodeId, setupPinCode, int(filterType), str(filter).encode("utf-8") + b"\x00" if filter is not None else None, discoveryTimeoutMsec) ) @@ -2035,7 +2038,7 @@ def CommissionWithCode(self, setupPayload: str, nodeid: int, discoveryType: Disc self._ChipStack.commissioningCompleteEvent.clear() self._enablePairingCompeleteCallback(True) - self._ChipStack.CallAsync( + self._ChipStack.CallAsyncWithCompleteCallback( lambda: self._dmLib.pychip_DeviceController_ConnectWithCode( self.devCtrl, setupPayload, nodeid, discoveryType.value) ) @@ -2055,7 +2058,7 @@ def CommissionIP(self, ipaddr: str, setupPinCode: int, nodeid: int) -> PyChipErr self._ChipStack.commissioningCompleteEvent.clear() self._enablePairingCompeleteCallback(True) - self._ChipStack.CallAsync( + self._ChipStack.CallAsyncWithCompleteCallback( lambda: self._dmLib.pychip_DeviceController_ConnectIP( self.devCtrl, ipaddr.encode("utf-8"), setupPinCode, nodeid) ) @@ -2069,7 +2072,7 @@ def IssueNOCChain(self, csr: Clusters.OperationalCredentials.Commands.CSRRespons The NOC chain will be provided in TLV cert format.""" self.CheckIsActive() - return self._ChipStack.CallAsync( + return self._ChipStack.CallAsyncWithCompleteCallback( lambda: self._dmLib.pychip_DeviceController_IssueNOCChain( self.devCtrl, py_object(self), csr.NOCSRElements, len(csr.NOCSRElements), nodeId) ) diff --git a/src/controller/python/chip/ChipStack.py b/src/controller/python/chip/ChipStack.py index 6df7e41de4433a..35f9e24ef4db25 100644 --- a/src/controller/python/chip/ChipStack.py +++ b/src/controller/python/chip/ChipStack.py @@ -26,6 +26,7 @@ from __future__ import absolute_import, print_function +import asyncio import builtins import logging import os @@ -164,6 +165,35 @@ def Wait(self, timeoutMs: int = None): return self._res +class AsyncioCallableHandle: + """Class which handles Matter SDK Calls asyncio friendly""" + + def __init__(self, callback): + self._callback = callback + self._loop = asyncio.get_event_loop() + self._future = self._loop.create_future() + self._result = None + self._exception = None + + @property + def future(self): + return self._future + + def _done(self): + if self._exception: + self._future.set_exception(self._exception) + else: + self._future.set_result(self._result) + + def __call__(self): + try: + self._result = self._callback() + except Exception as ex: + self._exception = ex + self._loop.call_soon_threadsafe(self._done) + pythonapi.Py_DecRef(py_object(self)) + + _CompleteFunct = CFUNCTYPE(None, c_void_p, c_void_p) _ErrorFunct = CFUNCTYPE(None, c_void_p, c_void_p, c_ulong, POINTER(DeviceStatusStruct)) @@ -178,6 +208,7 @@ def __init__(self, persistentStoragePath: str, installDefaultLogHandler=True, bluetoothAdapter=None, enableServerInteractions=True): builtins.enableDebugMode = False + # TODO: Probably no longer necessary, see https://github.com/project-chip/connectedhomeip/issues/33321. self.networkLock = Lock() self.completeEvent = Event() self.commissioningCompleteEvent = Event() @@ -318,6 +349,7 @@ def setLogFunct(self, logFunct): logFunct = 0 if not isinstance(logFunct, _LogMessageFunct): logFunct = _LogMessageFunct(logFunct) + # TODO: Lock probably no longer necessary, see https://github.com/project-chip/connectedhomeip/issues/33321. with self.networkLock: # NOTE: ChipStack must hold a reference to the CFUNCTYPE object while it is # set. Otherwise it may get garbage collected, and logging calls from the @@ -360,6 +392,7 @@ def Call(self, callFunct, timeoutMs: int = None): # throw error if op in progress self.callbackRes = None self.completeEvent.clear() + # TODO: Lock probably no longer necessary, see https://github.com/project-chip/connectedhomeip/issues/33321. with self.networkLock: res = self.PostTaskOnChipThread(callFunct).Wait(timeoutMs) self.completeEvent.set() @@ -367,14 +400,32 @@ def Call(self, callFunct, timeoutMs: int = None): return self.callbackRes return res - def CallAsync(self, callFunct): + async def CallAsync(self, callFunct, timeoutMs: int = None): + '''Run a Python function on CHIP stack, and wait for the response. + This function will post a task on CHIP mainloop and waits for the call response in a asyncio friendly manner. + ''' + callObj = AsyncioCallableHandle(callFunct) + pythonapi.Py_IncRef(py_object(callObj)) + + res = self._ChipStackLib.pychip_DeviceController_PostTaskOnChipThread( + self.cbHandleChipThreadRun, py_object(callObj)) + + if not res.is_success: + pythonapi.Py_DecRef(py_object(callObj)) + raise res.to_exception() + + return await asyncio.wait_for(callObj.future, timeoutMs / 1000 if timeoutMs else None) + + def CallAsyncWithCompleteCallback(self, callFunct): '''Run a Python function on CHIP stack, and wait for the application specific response. This function is a wrapper of PostTaskOnChipThread, which includes some handling of application specific logics. Calling this function on CHIP on CHIP mainloop thread will cause deadlock. + Make sure to register the necessary callbacks which release the function by setting the completeEvent. ''' # throw error if op in progress self.callbackRes = None self.completeEvent.clear() + # TODO: Lock probably no longer necessary, see https://github.com/project-chip/connectedhomeip/issues/33321. with self.networkLock: res = self.PostTaskOnChipThread(callFunct).Wait() diff --git a/src/controller/python/chip/clusters/Command.py b/src/controller/python/chip/clusters/Command.py index 89aae537c9b3fd..6ef25cb211d4da 100644 --- a/src/controller/python/chip/clusters/Command.py +++ b/src/controller/python/chip/clusters/Command.py @@ -291,9 +291,9 @@ def TestOnlySendCommandTimedRequestFlagWithNoTimedInvoke(future: Future, eventLo )) -def SendCommand(future: Future, eventLoop, responseType: Type, device, commandPath: CommandPath, payload: ClusterCommand, - timedRequestTimeoutMs: Union[None, int] = None, interactionTimeoutMs: Union[None, int] = None, busyWaitMs: Union[None, int] = None, - suppressResponse: Union[None, bool] = None) -> PyChipError: +async def SendCommand(future: Future, eventLoop, responseType: Type, device, commandPath: CommandPath, payload: ClusterCommand, + timedRequestTimeoutMs: Union[None, int] = None, interactionTimeoutMs: Union[None, int] = None, + busyWaitMs: Union[None, int] = None, suppressResponse: Union[None, bool] = None) -> PyChipError: ''' Send a cluster-object encapsulated command to a device and does the following: - On receipt of a successful data response, returns the cluster-object equivalent through the provided future. - None (on a successful response containing no data) @@ -316,7 +316,7 @@ def SendCommand(future: Future, eventLoop, responseType: Type, device, commandPa payloadTLV = payload.ToTLV() ctypes.pythonapi.Py_IncRef(ctypes.py_object(transaction)) - return builtins.chipStack.Call( + return await builtins.chipStack.CallAsync( lambda: handle.pychip_CommandSender_SendCommand( ctypes.py_object(transaction), device, c_uint16(0 if timedRequestTimeoutMs is None else timedRequestTimeoutMs), commandPath.EndpointId, @@ -353,9 +353,9 @@ def _BuildPyInvokeRequestData(commands: List[InvokeRequestInfo], timedRequestTim return pyBatchCommandsData -def SendBatchCommands(future: Future, eventLoop, device, commands: List[InvokeRequestInfo], - timedRequestTimeoutMs: Optional[int] = None, interactionTimeoutMs: Optional[int] = None, busyWaitMs: Optional[int] = None, - suppressResponse: Optional[bool] = None) -> PyChipError: +async def SendBatchCommands(future: Future, eventLoop, device, commands: List[InvokeRequestInfo], + timedRequestTimeoutMs: Optional[int] = None, interactionTimeoutMs: Optional[int] = None, + busyWaitMs: Optional[int] = None, suppressResponse: Optional[bool] = None) -> PyChipError: ''' Initiates an InvokeInteraction with the batch commands provided. Arguments: @@ -388,7 +388,7 @@ def SendBatchCommands(future: Future, eventLoop, device, commands: List[InvokeRe transaction = AsyncBatchCommandsTransaction(future, eventLoop, responseTypes) ctypes.pythonapi.Py_IncRef(ctypes.py_object(transaction)) - return builtins.chipStack.Call( + return await builtins.chipStack.CallAsync( lambda: handle.pychip_CommandSender_SendBatchCommands( py_object(transaction), device, c_uint16(0 if timedRequestTimeoutMs is None else timedRequestTimeoutMs),