Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[Python] Use thread-safe futures for concurrent operations (#33891)
Browse files Browse the repository at this point in the history
* [Python] Use thread-safe futures for concurrent operations

Instead of using quasi-global variables in the ChipStack singleton
use device controller local futures to store results from callbacks.
This has several advantages, namely:
- Avoid unnecessary shared state between device controllers
- Avoid unnecessary shared state between various operations within a
  device controller (those who don't share callbacks could be called
  from different threads now)
- Explicitly set Futures to None to detect spurious/unexpected callbacks
- Better code readability
- concurrent.futures are thread-safe
- Will make asyncio transition easier

This change shouldn't change the external API.

* [Python] Fix EstablishPASESession API compatibility

* [Python] Make ConnectBLE behave as before
agners authored and pull[bot] committed Dec 3, 2024
1 parent 6cd68bd commit 1065000
Showing 2 changed files with 184 additions and 160 deletions.
317 changes: 183 additions & 134 deletions src/controller/python/chip/ChipDeviceCtrl.py
Original file line number Diff line number Diff line change
@@ -30,6 +30,7 @@

import asyncio
import builtins
import concurrent.futures
import copy
import ctypes
import enum
@@ -225,15 +226,6 @@ def wrapper(*args, **kwargs):
return wrapper


class DCState(enum.IntEnum):
NOT_INITIALIZED = 0
IDLE = 1
BLE_READY = 2
RENDEZVOUS_ONGOING = 3
RENDEZVOUS_CONNECTED = 4
COMMISSIONING = 5


class CommissionableNode(discovery.CommissionableNode):
def SetDeviceController(self, devCtrl: 'ChipDeviceController'):
self._devCtrl = devCtrl
@@ -330,7 +322,6 @@ class ChipDeviceControllerBase():
activeList = set()

def __init__(self, name: str = ''):
self.state = DCState.NOT_INITIALIZED
self.devCtrl = None
self._ChipStack = builtins.chipStack
self._dmLib = None
@@ -348,22 +339,28 @@ def __init__(self, name: str = ''):

self._Cluster = ChipClusters(builtins.chipStack)
self._Cluster.InitLib(self._dmLib)
self._commissioning_complete_future: typing.Optional[concurrent.futures.Future] = None
self._open_window_complete_future: typing.Optional[concurrent.futures.Future] = None
self._unpair_device_complete_future: typing.Optional[concurrent.futures.Future] = None
self._pase_establishment_complete_future: typing.Optional[concurrent.futures.Future] = None

def _set_dev_ctrl(self, devCtrl, pairingDelegate):
def HandleCommissioningComplete(nodeid, err):
def HandleCommissioningComplete(nodeId: int, err: PyChipError):
if err.is_success:
logging.info("Commissioning complete")
else:
logging.warning("Failed to commission: {}".format(err))

self._dmLib.pychip_DeviceController_SetIcdRegistrationParameters(False, None)
self.state = DCState.IDLE
self._ChipStack.callbackRes = err
self._ChipStack.commissioningEventRes = err

if self._dmLib.pychip_TestCommissionerUsed():
self._ChipStack.commissioningEventRes = self._dmLib.pychip_GetCompletionError()
self._ChipStack.commissioningCompleteEvent.set()
self._ChipStack.completeEvent.set()
err = self._dmLib.pychip_GetCompletionError()

if self._commissioning_complete_future is None:
logging.exception("HandleCommissioningComplete called unexpectedly")
return

self._commissioning_complete_future.set_result(err)

def HandleFabricCheck(nodeId):
self.fabricCheckNodeId = nodeId
@@ -372,41 +369,53 @@ def HandleOpenWindowComplete(nodeid: int, setupPinCode: int, setupManualCode: st
setupQRCode: str, err: PyChipError) -> None:
if err.is_success:
logging.info("Open Commissioning Window complete setting nodeid {} pincode to {}".format(nodeid, setupPinCode))
self._ChipStack.openCommissioningWindowPincode[nodeid] = CommissioningParameters(
commissioningParameters = CommissioningParameters(
setupPinCode=setupPinCode, setupManualCode=setupManualCode.decode(), setupQRCode=setupQRCode.decode())
else:
logging.warning("Failed to open commissioning window: {}".format(err))

self._ChipStack.callbackRes = err
self._ChipStack.completeEvent.set()
if self._open_window_complete_future is None:
logging.exception("HandleOpenWindowComplete called unexpectedly")
return

if err.is_success:
self._open_window_complete_future.set_result(commissioningParameters)
else:
self._open_window_complete_future.set_exception(err.to_exception())

def HandleUnpairDeviceComplete(nodeid: int, err: PyChipError):
if err.is_success:
logging.info("Succesfully unpaired device with nodeid {}".format(nodeid))
else:
logging.warning("Failed to unpair device: {}".format(err))

self._ChipStack.callbackRes = err
self._ChipStack.completeEvent.set()
if self._unpair_device_complete_future is None:
logging.exception("HandleUnpairDeviceComplete called unexpectedly")
return

if err.is_success:
self._unpair_device_complete_future.set_result(None)
else:
self._unpair_device_complete_future.set_exception(err.to_exception())

def HandlePASEEstablishmentComplete(err: PyChipError):
if not err.is_success:
logging.warning("Failed to establish secure session to device: {}".format(err))
self._ChipStack.callbackRes = err.to_exception()
else:
logging.info("Established secure session with Device")

if self.state != DCState.COMMISSIONING:
# During Commissioning, HandlePASEEstablishmentComplete will also be called,
# in this case the async operation should be marked as finished by
# HandleCommissioningComplete instead this function.
self.state = DCState.IDLE
self._ChipStack.completeEvent.set()
else:
# When commissioning, getting an error during key exhange
# needs to unblock the entire commissioning flow.
if self._commissioning_complete_future is not None:
# During Commissioning, HandlePASEEstablishmentComplete will also be called.
# Only complete the future if PASE session establishment failed.
if not err.is_success:
HandleCommissioningComplete(0, err)
self._commissioning_complete_future.set_result(err)
return

if self._pase_establishment_complete_future is None:
logging.exception("HandlePASEEstablishmentComplete called unexpectedly")
return

self._pase_establishment_complete_future.set_result(err)

self.pairingDelegate = pairingDelegate
self.devCtrl = devCtrl
@@ -431,15 +440,13 @@ def HandlePASEEstablishmentComplete(err: PyChipError):

self.cbHandleDeviceUnpairCompleteFunct = _DeviceUnpairingCompleteFunct(HandleUnpairDeviceComplete)

self.state = DCState.IDLE
self._isActive = True
# Validate FabricID/NodeID followed from NOC Chain
self._fabricId = self.GetFabricIdInternal()
self._fabricIndex = self.GetFabricIndexInternal()
self._nodeId = self.GetNodeIdInternal()

def _finish_init(self):
self.state = DCState.IDLE
self._isActive = True

ChipDeviceController.activeList.add(self)
@@ -529,26 +536,35 @@ def IsConnected(self):
def ConnectBLE(self, discriminator, setupPinCode, nodeid) -> PyChipError:
self.CheckIsActive()

self._ChipStack.commissioningCompleteEvent.clear()
self._commissioning_complete_future = concurrent.futures.Future()

self.state = DCState.COMMISSIONING
self._enablePairingCompeleteCallback(True)
self._ChipStack.CallAsyncWithCompleteCallback(
lambda: self._dmLib.pychip_DeviceController_ConnectBLE(
self.devCtrl, discriminator, setupPinCode, nodeid)
).raise_on_error()
if not self._ChipStack.commissioningCompleteEvent.isSet():
# Error 50 is a timeout
return PyChipError(CHIP_ERROR_TIMEOUT)
return self._ChipStack.commissioningEventRes
try:
self._enablePairingCompeleteCallback(True)
self._ChipStack.Call(
lambda: self._dmLib.pychip_DeviceController_ConnectBLE(
self.devCtrl, discriminator, setupPinCode, nodeid)
).raise_on_error()

def UnpairDevice(self, nodeid: int):
# TODO: Change return None. Only returning on success is not useful.
# but that is what the previous implementation did.
res = self._commissioning_complete_future.result()
res.raise_on_error()
return res
finally:
self._commissioning_complete_future = None

def UnpairDevice(self, nodeid: int) -> None:
self.CheckIsActive()

return self._ChipStack.CallAsyncWithCompleteCallback(
lambda: self._dmLib.pychip_DeviceController_UnpairDevice(
self.devCtrl, nodeid, self.cbHandleDeviceUnpairCompleteFunct)
).raise_on_error()
self._unpair_device_complete_future = concurrent.futures.Future()
try:
self._ChipStack.Call(
lambda: self._dmLib.pychip_DeviceController_UnpairDevice(
self.devCtrl, nodeid, self.cbHandleDeviceUnpairCompleteFunct)
).raise_on_error()
self._unpair_device_complete_future.result()
finally:
self._unpair_device_complete_future = None

def CloseBLEConnection(self):
self.CheckIsActive()
@@ -582,32 +598,62 @@ def CloseSession(self, nodeid):
def EstablishPASESessionBLE(self, setupPinCode: int, discriminator: int, nodeid: int):
self.CheckIsActive()

self.state = DCState.RENDEZVOUS_ONGOING
self._enablePairingCompeleteCallback(True)
return self._ChipStack.CallAsyncWithCompleteCallback(
lambda: self._dmLib.pychip_DeviceController_EstablishPASESessionBLE(
self.devCtrl, setupPinCode, discriminator, nodeid)
)
self._pase_establishment_complete_future = concurrent.futures.Future()
try:
self._enablePairingCompeleteCallback(True)
self._ChipStack.Call(
lambda: self._dmLib.pychip_DeviceController_EstablishPASESessionBLE(
self.devCtrl, setupPinCode, discriminator, nodeid)
).raise_on_error()

# TODO: This is a bit funky, but what the API returned with the previous
# implementation. We should revisit this.
err = self._pase_establishment_complete_future.result()
if not err.is_success:
return err.to_exception()
return None
finally:
self._pase_establishment_complete_future = None

def EstablishPASESessionIP(self, ipaddr: str, setupPinCode: int, nodeid: int, port: int = 0):
self.CheckIsActive()

self.state = DCState.RENDEZVOUS_ONGOING
self._enablePairingCompeleteCallback(True)
return self._ChipStack.CallAsyncWithCompleteCallback(
lambda: self._dmLib.pychip_DeviceController_EstablishPASESessionIP(
self.devCtrl, ipaddr.encode("utf-8"), setupPinCode, nodeid, port)
)
self._pase_establishment_complete_future = concurrent.futures.Future()
try:
self._enablePairingCompeleteCallback(True)
self._ChipStack.Call(
lambda: self._dmLib.pychip_DeviceController_EstablishPASESessionIP(
self.devCtrl, ipaddr.encode("utf-8"), setupPinCode, nodeid, port)
).raise_on_error()

# TODO: This is a bit funky, but what the API returned with the previous
# implementation. We should revisit this.
err = self._pase_establishment_complete_future.result()
if not err.is_success:
return err.to_exception()
return None
finally:
self._pase_establishment_complete_future = None

def EstablishPASESession(self, setUpCode: str, nodeid: int):
self.CheckIsActive()

self.state = DCState.RENDEZVOUS_ONGOING
self._enablePairingCompeleteCallback(True)
return self._ChipStack.CallAsyncWithCompleteCallback(
lambda: self._dmLib.pychip_DeviceController_EstablishPASESession(
self.devCtrl, setUpCode.encode("utf-8"), nodeid)
)
self._pase_establishment_complete_future = concurrent.futures.Future()
try:
self._enablePairingCompeleteCallback(True)
self._ChipStack.Call(
lambda: self._dmLib.pychip_DeviceController_EstablishPASESession(
self.devCtrl, setUpCode.encode("utf-8"), nodeid)
).raise_on_error()

# TODO: This is a bit funky, but what the API returned with the previous
# implementation. We should revisit this.
err = self._pase_establishment_complete_future.result()
if not err.is_success:
return err.to_exception()
return None
finally:
self._pase_establishment_complete_future = None

def GetTestCommissionerUsed(self):
return self._ChipStack.Call(
@@ -642,11 +688,6 @@ def CheckStageSuccessful(self, stage: int):
def CheckTestCommissionerPaseConnection(self, nodeid):
return self._dmLib.pychip_TestPaseConnection(nodeid)

def NOCChainCallback(self, nocChain):
self._ChipStack.callbackRes = nocChain
self._ChipStack.completeEvent.set()
return

def ResolveNode(self, nodeid):
self.CheckIsActive()

@@ -753,12 +794,16 @@ def OpenCommissioningWindow(self, nodeid: int, timeout: int, iteration: int,
Returns CommissioningParameters
'''
self.CheckIsActive()
self._ChipStack.CallAsyncWithCompleteCallback(
lambda: self._dmLib.pychip_DeviceController_OpenCommissioningWindow(
self.devCtrl, self.pairingDelegate, nodeid, timeout, iteration, discriminator, option)
).raise_on_error()
self._ChipStack.callbackRes.raise_on_error()
return self._ChipStack.openCommissioningWindowPincode[nodeid]
self._open_window_complete_future = concurrent.futures.Future()
try:
self._ChipStack.Call(
lambda: self._dmLib.pychip_DeviceController_OpenCommissioningWindow(
self.devCtrl, self.pairingDelegate, nodeid, timeout, iteration, discriminator, option)
).raise_on_error()

return self._open_window_complete_future.result()
finally:
self._open_window_complete_future = None

def GetCompressedFabricId(self):
self.CheckIsActive()
@@ -1773,6 +1818,7 @@ def __init__(self, opCredsContext: ctypes.c_void_p, fabricId: int, nodeId: int,
f"caIndex({fabricAdmin.caIndex:x})/fabricId(0x{fabricId:016X})/nodeId(0x{nodeId:016X})"
)

self._issue_node_chain_complete: typing.Optional[concurrent.futures.Future] = None
self._dmLib.pychip_DeviceController_SetIssueNOCChainCallbackPythonCallback(_IssueNOCChainCallbackPythonCallback)

pairingDelegate = c_void_p(None)
@@ -1823,17 +1869,18 @@ def Commission(self, nodeid) -> PyChipError:
bool: True if successful, False otherwise.
'''
self.CheckIsActive()
self._ChipStack.commissioningCompleteEvent.clear()
self.state = DCState.COMMISSIONING

self._ChipStack.CallAsyncWithCompleteCallback(
lambda: self._dmLib.pychip_DeviceController_Commission(
self.devCtrl, nodeid)
)
if not self._ChipStack.commissioningCompleteEvent.isSet():
# Error 50 is a timeout
return PyChipError(CHIP_ERROR_TIMEOUT)
return self._ChipStack.commissioningEventRes
self._commissioning_complete_future = concurrent.futures.Future()

try:
self._ChipStack.Call(
lambda: self._dmLib.pychip_DeviceController_Commission(
self.devCtrl, nodeid)
).raise_on_error()

return self._commissioning_complete_future.result()
finally:
self._commissioning_complete_future = None

def CommissionThread(self, discriminator, setupPinCode, nodeId, threadOperationalDataset: bytes) -> PyChipError:
''' Commissions a Thread device over BLE
@@ -1963,25 +2010,21 @@ def CommissionOnNetwork(self, nodeId: int, setupPinCode: int,
'''
self.CheckIsActive()

# IP connection will run through full commissioning, so we need to wait
# for the commissioning complete event, not just any callback.
self.state = DCState.COMMISSIONING

# Convert numerical filters to string for passing down to binding.
if isinstance(filter, int):
filter = str(filter)

self._ChipStack.commissioningCompleteEvent.clear()
self._commissioning_complete_future = concurrent.futures.Future()
try:
self._enablePairingCompeleteCallback(True)
self._ChipStack.Call(
lambda: self._dmLib.pychip_DeviceController_OnNetworkCommission(
self.devCtrl, self.pairingDelegate, nodeId, setupPinCode, int(filterType), str(filter).encode("utf-8") + b"\x00" if filter is not None else None, discoveryTimeoutMsec)
).raise_on_error()

self._enablePairingCompeleteCallback(True)
self._ChipStack.CallAsyncWithCompleteCallback(
lambda: self._dmLib.pychip_DeviceController_OnNetworkCommission(
self.devCtrl, self.pairingDelegate, nodeId, setupPinCode, int(filterType), str(filter).encode("utf-8") + b"\x00" if filter is not None else None, discoveryTimeoutMsec)
)
if not self._ChipStack.commissioningCompleteEvent.isSet():
# Error 50 is a timeout
return PyChipError(CHIP_ERROR_TIMEOUT)
return self._ChipStack.commissioningEventRes
return self._commissioning_complete_future.result()
finally:
self._commissioning_complete_future = None

def CommissionWithCode(self, setupPayload: str, nodeid: int, discoveryType: DiscoveryType = DiscoveryType.DISCOVERY_ALL) -> PyChipError:
''' Commission with the given nodeid from the setupPayload.
@@ -1991,51 +2034,57 @@ def CommissionWithCode(self, setupPayload: str, nodeid: int, discoveryType: Disc

setupPayload = setupPayload.encode() + b'\0'

# IP connection will run through full commissioning, so we need to wait
# for the commissioning complete event, not just any callback.
self.state = DCState.COMMISSIONING
self._commissioning_complete_future = concurrent.futures.Future()

self._ChipStack.commissioningCompleteEvent.clear()
try:
self._enablePairingCompeleteCallback(True)
self._ChipStack.Call(
lambda: self._dmLib.pychip_DeviceController_ConnectWithCode(
self.devCtrl, setupPayload, nodeid, discoveryType.value)
).raise_on_error()

self._enablePairingCompeleteCallback(True)
self._ChipStack.CallAsyncWithCompleteCallback(
lambda: self._dmLib.pychip_DeviceController_ConnectWithCode(
self.devCtrl, setupPayload, nodeid, discoveryType.value)
)
if not self._ChipStack.commissioningCompleteEvent.isSet():
# Error 50 is a timeout
return PyChipError(CHIP_ERROR_TIMEOUT)
return self._ChipStack.commissioningEventRes
return self._commissioning_complete_future.result()
finally:
self._commissioning_complete_future = None

def CommissionIP(self, ipaddr: str, setupPinCode: int, nodeid: int) -> PyChipError:
""" DEPRECATED, DO NOT USE! Use `CommissionOnNetwork` or `CommissionWithCode` """
self.CheckIsActive()

# IP connection will run through full commissioning, so we need to wait
# for the commissioning complete event, not just any callback.
self.state = DCState.COMMISSIONING
self._commissioning_complete_future = concurrent.futures.Future()

self._ChipStack.commissioningCompleteEvent.clear()
try:
self._enablePairingCompeleteCallback(True)
self._ChipStack.Call(
lambda: self._dmLib.pychip_DeviceController_ConnectIP(
self.devCtrl, ipaddr.encode("utf-8"), setupPinCode, nodeid)
).raise_on_error()

self._enablePairingCompeleteCallback(True)
self._ChipStack.CallAsyncWithCompleteCallback(
lambda: self._dmLib.pychip_DeviceController_ConnectIP(
self.devCtrl, ipaddr.encode("utf-8"), setupPinCode, nodeid)
)
if not self._ChipStack.commissioningCompleteEvent.isSet():
# Error 50 is a timeout
return PyChipError(CHIP_ERROR_TIMEOUT)
return self._ChipStack.commissioningEventRes
return self._commissioning_complete_future.result()
finally:
self._commissioning_complete_future = None

def NOCChainCallback(self, nocChain):
if self._issue_node_chain_complete is None:
logging.exception("NOCChainCallback while not expecting a callback")
return
self._issue_node_chain_complete.set_result(nocChain)
return

def IssueNOCChain(self, csr: Clusters.OperationalCredentials.Commands.CSRResponse, nodeId: int):
"""Issue an NOC chain using the associated OperationalCredentialsDelegate.
The NOC chain will be provided in TLV cert format."""
self.CheckIsActive()

return self._ChipStack.CallAsyncWithCompleteCallback(
lambda: self._dmLib.pychip_DeviceController_IssueNOCChain(
self.devCtrl, py_object(self), csr.NOCSRElements, len(csr.NOCSRElements), nodeId)
)
self._issue_node_chain_complete = concurrent.futures.Future()
try:
self._ChipStack.Call(
lambda: self._dmLib.pychip_DeviceController_IssueNOCChain(
self.devCtrl, py_object(self), csr.NOCSRElements, len(csr.NOCSRElements), nodeId)
).raise_on_error()
return self._issue_node_chain_complete.result()
finally:
self._issue_node_chain_complete = None


class BareChipDeviceController(ChipDeviceControllerBase):
27 changes: 1 addition & 26 deletions src/controller/python/chip/ChipStack.py
Original file line number Diff line number Diff line change
@@ -30,7 +30,7 @@
import builtins
import os
from ctypes import CFUNCTYPE, Structure, c_bool, c_char_p, c_uint16, c_uint32, c_void_p, py_object, pythonapi
from threading import Condition, Event, Lock
from threading import Condition, Lock

import chip.native
from chip.native import PyChipError
@@ -144,14 +144,9 @@ class ChipStack(object):
def __init__(self, persistentStoragePath: str, enableServerInteractions=True):
builtins.enableDebugMode = False

self.completeEvent = Event()
self.commissioningCompleteEvent = Event()
self._ChipStackLib = None
self._chipDLLPath = None
self.devMgr = None
self.callbackRes = None
self.commissioningEventRes = None
self.openCommissioningWindowPincode = {}
self._enableServerInteractions = enableServerInteractions

#
@@ -212,7 +207,6 @@ def Shutdown(self):
self._ChipStackLib = None
self._chipDLLPath = None
self.devMgr = None
self.callbackRes = None

delattr(builtins, "chipStack")

@@ -239,25 +233,6 @@ async def CallAsync(self, callFunct, timeoutMs: int = None):

return await asyncio.wait_for(callObj.future, timeoutMs / 1000 if timeoutMs else None)

def CallAsyncWithCompleteCallback(self, callFunct):
'''Run a Python function on CHIP stack, and wait for the application specific response.
This function is a wrapper of PostTaskOnChipThread, which includes some handling of application specific logics.
Calling this function on CHIP on CHIP mainloop thread will cause deadlock.
Make sure to register the necessary callbacks which release the function by setting the completeEvent.
'''
# throw error if op in progress
self.callbackRes = None
self.completeEvent.clear()
res = self.PostTaskOnChipThread(callFunct).Wait()

if not res.is_success:
self.completeEvent.set()
raise res.to_exception()
self.completeEvent.wait()
if isinstance(self.callbackRes, ChipStackException):
raise self.callbackRes
return self.callbackRes

def PostTaskOnChipThread(self, callFunct) -> AsyncCallableHandle:
'''Run a Python function on CHIP stack, and wait for the response.
This function will post a task on CHIP mainloop, and return an object with Wait() method for getting the result.

0 comments on commit 1065000

Please sign in to comment.