Skip to content

Commit

Permalink
[Python] Call SDK asyncio friendly (project-chip#32764)
Browse files Browse the repository at this point in the history
* [Python] Rename CallAsync to CallAsyncWithCallback

CallAsync continuously calls a callback function during the wait
for the call. Rename the function to reflect that fact.

This frees up CallAsync for an asyncio friendly implementation.

* [Python] Implement asyncio variant of CallAsync

Call Matter SDK in a asyncio friendly way. During posting of the task
onto the CHIP mainloop, it makes sure that the asyncio loop is not
blocked.

* [Python] Use CallAsync where appropriate

* Rename AsyncSimpleCallableHandle to AsyncioCallableHandle

* Rename CallAsyncWithCallback to CallAsyncWithCompleteCallback

Also add a comment that the function needs to be released by registering
a callback and setting the complete event.

* Add comments about lock
  • Loading branch information
agners authored May 16, 2024
1 parent 3e6657c commit 4d0a1f0
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 29 deletions.
43 changes: 23 additions & 20 deletions src/controller/python/chip/ChipDeviceCtrl.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ def __init__(self, deviceProxy: ctypes.c_void_p, dmLib=None):
def __del__(self):
if (self._dmLib is not None and hasattr(builtins, 'chipStack') and builtins.chipStack is not None):
# This destructor is called from any threading context, including on the Matter threading context.
# So, we cannot call chipStack.Call or chipStack.CallAsync which waits for the posted work to
# So, we cannot call chipStack.Call or chipStack.CallAsyncWithCompleteCallback which waits for the posted work to
# actually be executed. Instead, we just post/schedule the work and move on.
builtins.chipStack.PostTaskOnChipThread(lambda: self._dmLib.pychip_FreeOperationalDeviceProxy(self._deviceProxy))

Expand Down Expand Up @@ -447,7 +447,7 @@ def ConnectBLE(self, discriminator, setupPinCode, nodeid) -> PyChipError:

self.state = DCState.COMMISSIONING
self._enablePairingCompeleteCallback(True)
self._ChipStack.CallAsync(
self._ChipStack.CallAsyncWithCompleteCallback(
lambda: self._dmLib.pychip_DeviceController_ConnectBLE(
self.devCtrl, discriminator, setupPinCode, nodeid)
).raise_on_error()
Expand All @@ -459,7 +459,7 @@ def ConnectBLE(self, discriminator, setupPinCode, nodeid) -> PyChipError:
def UnpairDevice(self, nodeid: int):
self.CheckIsActive()

return self._ChipStack.CallAsync(
return self._ChipStack.CallAsyncWithCompleteCallback(
lambda: self._dmLib.pychip_DeviceController_UnpairDevice(
self.devCtrl, nodeid, self.cbHandleDeviceUnpairCompleteFunct)
).raise_on_error()
Expand Down Expand Up @@ -498,7 +498,7 @@ def EstablishPASESessionBLE(self, setupPinCode: int, discriminator: int, nodeid:

self.state = DCState.RENDEZVOUS_ONGOING
self._enablePairingCompeleteCallback(True)
return self._ChipStack.CallAsync(
return self._ChipStack.CallAsyncWithCompleteCallback(
lambda: self._dmLib.pychip_DeviceController_EstablishPASESessionBLE(
self.devCtrl, setupPinCode, discriminator, nodeid)
)
Expand All @@ -508,7 +508,7 @@ def EstablishPASESessionIP(self, ipaddr: str, setupPinCode: int, nodeid: int, po

self.state = DCState.RENDEZVOUS_ONGOING
self._enablePairingCompeleteCallback(True)
return self._ChipStack.CallAsync(
return self._ChipStack.CallAsyncWithCompleteCallback(
lambda: self._dmLib.pychip_DeviceController_EstablishPASESessionIP(
self.devCtrl, ipaddr.encode("utf-8"), setupPinCode, nodeid, port)
)
Expand All @@ -518,7 +518,7 @@ def EstablishPASESession(self, setUpCode: str, nodeid: int):

self.state = DCState.RENDEZVOUS_ONGOING
self._enablePairingCompeleteCallback(True)
return self._ChipStack.CallAsync(
return self._ChipStack.CallAsyncWithCompleteCallback(
lambda: self._dmLib.pychip_DeviceController_EstablishPASESession(
self.devCtrl, setUpCode.encode("utf-8"), nodeid)
)
Expand Down Expand Up @@ -737,7 +737,7 @@ def OpenCommissioningWindow(self, nodeid: int, timeout: int, iteration: int,
Returns CommissioningParameters
'''
self.CheckIsActive()
self._ChipStack.CallAsync(
self._ChipStack.CallAsyncWithCompleteCallback(
lambda: self._dmLib.pychip_DeviceController_OpenCommissioningWindow(
self.devCtrl, self.pairingDelegate, nodeid, timeout, iteration, discriminator, option)
).raise_on_error()
Expand Down Expand Up @@ -858,7 +858,7 @@ async def GetConnectedDevice(self, nodeid, allowPASE: bool = True, timeoutMs: in

if allowPASE:
returnDevice = c_void_p(None)
res = self._ChipStack.Call(lambda: self._dmLib.pychip_GetDeviceBeingCommissioned(
res = await self._ChipStack.CallAsync(lambda: self._dmLib.pychip_GetDeviceBeingCommissioned(
self.devCtrl, nodeid, byref(returnDevice)), timeoutMs)
if res.is_success:
logging.info('Using PASE connection')
Expand Down Expand Up @@ -888,11 +888,12 @@ def deviceAvailable(self, device, err):

closure = DeviceAvailableClosure(eventLoop, future)
ctypes.pythonapi.Py_IncRef(ctypes.py_object(closure))
self._ChipStack.Call(lambda: self._dmLib.pychip_GetConnectedDeviceByNodeId(
res = await self._ChipStack.CallAsync(lambda: self._dmLib.pychip_GetConnectedDeviceByNodeId(
self.devCtrl, nodeid, ctypes.py_object(closure), _DeviceAvailableCallback),
timeoutMs).raise_on_error()
timeoutMs)
res.raise_on_error()

# The callback might have been received synchronously (during self._ChipStack.Call()).
# The callback might have been received synchronously (during self._ChipStack.CallAsync()).
# In that case the Future has already been set it will return immediately
if timeoutMs is not None:
timeout = float(timeoutMs) / 1000
Expand Down Expand Up @@ -1020,13 +1021,14 @@ async def SendCommand(self, nodeid: int, endpoint: int, payload: ClusterObjects.
future = eventLoop.create_future()

device = await self.GetConnectedDevice(nodeid, timeoutMs=interactionTimeoutMs)
ClusterCommand.SendCommand(
res = await ClusterCommand.SendCommand(
future, eventLoop, responseType, device.deviceProxy, ClusterCommand.CommandPath(
EndpointId=endpoint,
ClusterId=payload.cluster_id,
CommandId=payload.command_id,
), payload, timedRequestTimeoutMs=timedRequestTimeoutMs,
interactionTimeoutMs=interactionTimeoutMs, busyWaitMs=busyWaitMs, suppressResponse=suppressResponse).raise_on_error()
interactionTimeoutMs=interactionTimeoutMs, busyWaitMs=busyWaitMs, suppressResponse=suppressResponse)
res.raise_on_error()
return await future

async def SendBatchCommands(self, nodeid: int, commands: typing.List[ClusterCommand.InvokeRequestInfo],
Expand Down Expand Up @@ -1062,10 +1064,11 @@ async def SendBatchCommands(self, nodeid: int, commands: typing.List[ClusterComm

device = await self.GetConnectedDevice(nodeid, timeoutMs=interactionTimeoutMs)

ClusterCommand.SendBatchCommands(
res = await ClusterCommand.SendBatchCommands(
future, eventLoop, device.deviceProxy, commands,
timedRequestTimeoutMs=timedRequestTimeoutMs,
interactionTimeoutMs=interactionTimeoutMs, busyWaitMs=busyWaitMs, suppressResponse=suppressResponse).raise_on_error()
interactionTimeoutMs=interactionTimeoutMs, busyWaitMs=busyWaitMs, suppressResponse=suppressResponse)
res.raise_on_error()
return await future

def SendGroupCommand(self, groupid: int, payload: ClusterObjects.ClusterCommand, busyWaitMs: typing.Union[None, int] = None):
Expand Down Expand Up @@ -1895,7 +1898,7 @@ def Commission(self, nodeid) -> PyChipError:
self._ChipStack.commissioningCompleteEvent.clear()
self.state = DCState.COMMISSIONING

self._ChipStack.CallAsync(
self._ChipStack.CallAsyncWithCompleteCallback(
lambda: self._dmLib.pychip_DeviceController_Commission(
self.devCtrl, nodeid)
)
Expand Down Expand Up @@ -2011,7 +2014,7 @@ def CommissionOnNetwork(self, nodeId: int, setupPinCode: int,
self._ChipStack.commissioningCompleteEvent.clear()

self._enablePairingCompeleteCallback(True)
self._ChipStack.CallAsync(
self._ChipStack.CallAsyncWithCompleteCallback(
lambda: self._dmLib.pychip_DeviceController_OnNetworkCommission(
self.devCtrl, self.pairingDelegate, nodeId, setupPinCode, int(filterType), str(filter).encode("utf-8") + b"\x00" if filter is not None else None, discoveryTimeoutMsec)
)
Expand All @@ -2035,7 +2038,7 @@ def CommissionWithCode(self, setupPayload: str, nodeid: int, discoveryType: Disc
self._ChipStack.commissioningCompleteEvent.clear()

self._enablePairingCompeleteCallback(True)
self._ChipStack.CallAsync(
self._ChipStack.CallAsyncWithCompleteCallback(
lambda: self._dmLib.pychip_DeviceController_ConnectWithCode(
self.devCtrl, setupPayload, nodeid, discoveryType.value)
)
Expand All @@ -2055,7 +2058,7 @@ def CommissionIP(self, ipaddr: str, setupPinCode: int, nodeid: int) -> PyChipErr
self._ChipStack.commissioningCompleteEvent.clear()

self._enablePairingCompeleteCallback(True)
self._ChipStack.CallAsync(
self._ChipStack.CallAsyncWithCompleteCallback(
lambda: self._dmLib.pychip_DeviceController_ConnectIP(
self.devCtrl, ipaddr.encode("utf-8"), setupPinCode, nodeid)
)
Expand All @@ -2069,7 +2072,7 @@ def IssueNOCChain(self, csr: Clusters.OperationalCredentials.Commands.CSRRespons
The NOC chain will be provided in TLV cert format."""
self.CheckIsActive()

return self._ChipStack.CallAsync(
return self._ChipStack.CallAsyncWithCompleteCallback(
lambda: self._dmLib.pychip_DeviceController_IssueNOCChain(
self.devCtrl, py_object(self), csr.NOCSRElements, len(csr.NOCSRElements), nodeId)
)
Expand Down
53 changes: 52 additions & 1 deletion src/controller/python/chip/ChipStack.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

from __future__ import absolute_import, print_function

import asyncio
import builtins
import logging
import os
Expand Down Expand Up @@ -164,6 +165,35 @@ def Wait(self, timeoutMs: int = None):
return self._res


class AsyncioCallableHandle:
"""Class which handles Matter SDK Calls asyncio friendly"""

def __init__(self, callback):
self._callback = callback
self._loop = asyncio.get_event_loop()
self._future = self._loop.create_future()
self._result = None
self._exception = None

@property
def future(self):
return self._future

def _done(self):
if self._exception:
self._future.set_exception(self._exception)
else:
self._future.set_result(self._result)

def __call__(self):
try:
self._result = self._callback()
except Exception as ex:
self._exception = ex
self._loop.call_soon_threadsafe(self._done)
pythonapi.Py_DecRef(py_object(self))


_CompleteFunct = CFUNCTYPE(None, c_void_p, c_void_p)
_ErrorFunct = CFUNCTYPE(None, c_void_p, c_void_p,
c_ulong, POINTER(DeviceStatusStruct))
Expand All @@ -178,6 +208,7 @@ def __init__(self, persistentStoragePath: str, installDefaultLogHandler=True,
bluetoothAdapter=None, enableServerInteractions=True):
builtins.enableDebugMode = False

# TODO: Probably no longer necessary, see https://github.com/project-chip/connectedhomeip/issues/33321.
self.networkLock = Lock()
self.completeEvent = Event()
self.commissioningCompleteEvent = Event()
Expand Down Expand Up @@ -318,6 +349,7 @@ def setLogFunct(self, logFunct):
logFunct = 0
if not isinstance(logFunct, _LogMessageFunct):
logFunct = _LogMessageFunct(logFunct)
# TODO: Lock probably no longer necessary, see https://github.com/project-chip/connectedhomeip/issues/33321.
with self.networkLock:
# NOTE: ChipStack must hold a reference to the CFUNCTYPE object while it is
# set. Otherwise it may get garbage collected, and logging calls from the
Expand Down Expand Up @@ -360,21 +392,40 @@ def Call(self, callFunct, timeoutMs: int = None):
# throw error if op in progress
self.callbackRes = None
self.completeEvent.clear()
# TODO: Lock probably no longer necessary, see https://github.com/project-chip/connectedhomeip/issues/33321.
with self.networkLock:
res = self.PostTaskOnChipThread(callFunct).Wait(timeoutMs)
self.completeEvent.set()
if res == 0 and self.callbackRes is not None:
return self.callbackRes
return res

def CallAsync(self, callFunct):
async def CallAsync(self, callFunct, timeoutMs: int = None):
'''Run a Python function on CHIP stack, and wait for the response.
This function will post a task on CHIP mainloop and waits for the call response in a asyncio friendly manner.
'''
callObj = AsyncioCallableHandle(callFunct)
pythonapi.Py_IncRef(py_object(callObj))

res = self._ChipStackLib.pychip_DeviceController_PostTaskOnChipThread(
self.cbHandleChipThreadRun, py_object(callObj))

if not res.is_success:
pythonapi.Py_DecRef(py_object(callObj))
raise res.to_exception()

return await asyncio.wait_for(callObj.future, timeoutMs / 1000 if timeoutMs else None)

def CallAsyncWithCompleteCallback(self, callFunct):
'''Run a Python function on CHIP stack, and wait for the application specific response.
This function is a wrapper of PostTaskOnChipThread, which includes some handling of application specific logics.
Calling this function on CHIP on CHIP mainloop thread will cause deadlock.
Make sure to register the necessary callbacks which release the function by setting the completeEvent.
'''
# throw error if op in progress
self.callbackRes = None
self.completeEvent.clear()
# TODO: Lock probably no longer necessary, see https://github.com/project-chip/connectedhomeip/issues/33321.
with self.networkLock:
res = self.PostTaskOnChipThread(callFunct).Wait()

Expand Down
16 changes: 8 additions & 8 deletions src/controller/python/chip/clusters/Command.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,9 +291,9 @@ def TestOnlySendCommandTimedRequestFlagWithNoTimedInvoke(future: Future, eventLo
))


def SendCommand(future: Future, eventLoop, responseType: Type, device, commandPath: CommandPath, payload: ClusterCommand,
timedRequestTimeoutMs: Union[None, int] = None, interactionTimeoutMs: Union[None, int] = None, busyWaitMs: Union[None, int] = None,
suppressResponse: Union[None, bool] = None) -> PyChipError:
async def SendCommand(future: Future, eventLoop, responseType: Type, device, commandPath: CommandPath, payload: ClusterCommand,
timedRequestTimeoutMs: Union[None, int] = None, interactionTimeoutMs: Union[None, int] = None,
busyWaitMs: Union[None, int] = None, suppressResponse: Union[None, bool] = None) -> PyChipError:
''' Send a cluster-object encapsulated command to a device and does the following:
- On receipt of a successful data response, returns the cluster-object equivalent through the provided future.
- None (on a successful response containing no data)
Expand All @@ -316,7 +316,7 @@ def SendCommand(future: Future, eventLoop, responseType: Type, device, commandPa

payloadTLV = payload.ToTLV()
ctypes.pythonapi.Py_IncRef(ctypes.py_object(transaction))
return builtins.chipStack.Call(
return await builtins.chipStack.CallAsync(
lambda: handle.pychip_CommandSender_SendCommand(
ctypes.py_object(transaction), device,
c_uint16(0 if timedRequestTimeoutMs is None else timedRequestTimeoutMs), commandPath.EndpointId,
Expand Down Expand Up @@ -353,9 +353,9 @@ def _BuildPyInvokeRequestData(commands: List[InvokeRequestInfo], timedRequestTim
return pyBatchCommandsData


def SendBatchCommands(future: Future, eventLoop, device, commands: List[InvokeRequestInfo],
timedRequestTimeoutMs: Optional[int] = None, interactionTimeoutMs: Optional[int] = None, busyWaitMs: Optional[int] = None,
suppressResponse: Optional[bool] = None) -> PyChipError:
async def SendBatchCommands(future: Future, eventLoop, device, commands: List[InvokeRequestInfo],
timedRequestTimeoutMs: Optional[int] = None, interactionTimeoutMs: Optional[int] = None,
busyWaitMs: Optional[int] = None, suppressResponse: Optional[bool] = None) -> PyChipError:
''' Initiates an InvokeInteraction with the batch commands provided.
Arguments:
Expand Down Expand Up @@ -388,7 +388,7 @@ def SendBatchCommands(future: Future, eventLoop, device, commands: List[InvokeRe
transaction = AsyncBatchCommandsTransaction(future, eventLoop, responseTypes)
ctypes.pythonapi.Py_IncRef(ctypes.py_object(transaction))

return builtins.chipStack.Call(
return await builtins.chipStack.CallAsync(
lambda: handle.pychip_CommandSender_SendBatchCommands(
py_object(transaction), device,
c_uint16(0 if timedRequestTimeoutMs is None else timedRequestTimeoutMs),
Expand Down

0 comments on commit 4d0a1f0

Please sign in to comment.