Skip to content

Commit

Permalink
[Python] Convert async API functions to python asyncio (project-chip#…
Browse files Browse the repository at this point in the history
…33989)

* [Python] Use context manager for Commissioning

Use a context manager to handle the commissioning process in the
device controller. This will ensure that the commissioning resources
are properly cleaned up after completion and removes boiler plate
code.

Also clear fabricCheckNodeId and mark it internal use by adding the
underline prefix.

Also call pychip_ScriptDevicePairingDelegate_SetExpectingPairingComplete
directly on the Python Thread, as this is an atomic operation. This is
will also be more asyncio friendly as it is guaranteed to not block.

* [Python] Use context manager for all callbacks

Use context managers for all APIs which wait for callbacks. This
allows to cleanly wrap the future and add additional handling e.g.
locks for asyncio in the future.

* [Python] Convert commissioning APIs to async functions

Make all commissioning APIs async functions. This avoids the need
to use run_in_executor() to call them from asyncio code in a non-
blocking way.

* [Python] Convert UnpairDevice/OpenCommissioningWindow to asyncio

* [Python] Convert EstablishPASESession to asyncio

* [Python] Convert IssueNOCChain to asyncio

* [Python] Add locking to prevent concurrent access with asyncio

Make sure that different asyncio tasks do not run the same function
concurrently. This is done by adding an asyncio lock to functions
which use callbacks.

* [Python] Raise an exception if the future did not complete

* [Python] Convert tests in src/controller/python/ to asyncio

* [Python] Convert tests in src/python_testing/ to asyncio

* Adjust yamltest_with_chip_repl_tester to use asyncio

* [Python] Add documentation to the new context managers

* [Python] Use asyncio.run() to run async tests
  • Loading branch information
agners authored Jun 20, 2024
1 parent ed87048 commit e407d40
Show file tree
Hide file tree
Showing 28 changed files with 331 additions and 322 deletions.
8 changes: 4 additions & 4 deletions scripts/tests/chiptest/yamltest_with_chip_repl_tester.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ async def execute_test(yaml, runner):
'--pics-file',
default=None,
help='Optional PICS file')
def main(setup_code, yaml_path, node_id, pics_file):
async def main(setup_code, yaml_path, node_id, pics_file):
# Setting up python environment for running YAML CI tests using python parser.
with tempfile.NamedTemporaryFile() as chip_stack_storage:
chip.native.Init()
Expand All @@ -122,7 +122,7 @@ def main(setup_code, yaml_path, node_id, pics_file):
# Creating and commissioning to a single controller to match what is currently done when
# running.
dev_ctrl = ca_list[0].adminList[0].NewController()
dev_ctrl.CommissionWithCode(setup_code, node_id)
await dev_ctrl.CommissionWithCode(setup_code, node_id)

def _StackShutDown():
# Tearing down chip stack. If not done in the correct order test will fail.
Expand All @@ -143,7 +143,7 @@ def _StackShutDown():
runner = ReplTestRunner(
clusters_definitions, certificate_authority_manager, dev_ctrl)

asyncio.run(execute_test(yaml, runner))
await execute_test(yaml, runner)

except Exception:
print(traceback.format_exc())
Expand All @@ -153,4 +153,4 @@ def _StackShutDown():


if __name__ == '__main__':
main()
asyncio.run(main())
308 changes: 159 additions & 149 deletions src/controller/python/chip/ChipDeviceCtrl.py

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions src/controller/python/chip/commissioning/pase.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ def __exit__(self, type, value, traceback):
self.devCtrl.CloseBLEConnection(self.is_ble)


def establish_session(devCtrl: ChipDeviceCtrl.ChipDeviceControllerBase, parameter: commissioning.PaseParameters) -> ContextManager:
async def establish_session(devCtrl: ChipDeviceCtrl.ChipDeviceControllerBase, parameter: commissioning.PaseParameters) -> ContextManager:
if isinstance(parameter, commissioning.PaseOverBLEParameters):
devCtrl.EstablishPASESessionBLE(parameter.setup_pin, parameter.discriminator, parameter.temporary_nodeid)
await devCtrl.EstablishPASESessionBLE(parameter.setup_pin, parameter.discriminator, parameter.temporary_nodeid)
elif isinstance(parameter, commissioning.PaseOverIPParameters):
device = devCtrl.DiscoverCommissionableNodes(filterType=discovery.FilterType.LONG_DISCRIMINATOR,
filter=parameter.long_discriminator, stopOnFirst=True)
Expand All @@ -63,7 +63,7 @@ def establish_session(devCtrl: ChipDeviceCtrl.ChipDeviceControllerBase, paramete
break
if selected_address is None:
raise ValueError("The node for commissioning does not contains routable ip addresses information")
devCtrl.EstablishPASESessionIP(selected_address, parameter.setup_pin, parameter.temporary_nodeid)
await devCtrl.EstablishPASESessionIP(selected_address, parameter.setup_pin, parameter.temporary_nodeid)
else:
raise TypeError("Expect PaseOverBLEParameters or PaseOverIPParameters for establishing PASE session")
return ContextManager(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ async def AddNOCForNewFabricFromExisting(commissionerDevCtrl, newFabricDevCtrl,

csrForAddNOC = await commissionerDevCtrl.SendCommand(existingNodeId, 0, opCreds.Commands.CSRRequest(CSRNonce=os.urandom(32)))

chainForAddNOC = newFabricDevCtrl.IssueNOCChain(csrForAddNOC, newNodeId)
chainForAddNOC = await newFabricDevCtrl.IssueNOCChain(csrForAddNOC, newNodeId)
if (chainForAddNOC.rcacBytes is None or
chainForAddNOC.icacBytes is None or
chainForAddNOC.nocBytes is None or chainForAddNOC.ipkBytes is None):
Expand Down Expand Up @@ -225,7 +225,7 @@ async def UpdateNOC(devCtrl, existingNodeId, newNodeId):
return False
csrForUpdateNOC = await devCtrl.SendCommand(
existingNodeId, 0, opCreds.Commands.CSRRequest(CSRNonce=os.urandom(32), isForUpdateNOC=True))
chainForUpdateNOC = devCtrl.IssueNOCChain(csrForUpdateNOC, newNodeId)
chainForUpdateNOC = await devCtrl.IssueNOCChain(csrForUpdateNOC, newNodeId)
if (chainForUpdateNOC.rcacBytes is None or
chainForUpdateNOC.icacBytes is None or
chainForUpdateNOC.nocBytes is None or chainForUpdateNOC.ipkBytes is None):
Expand Down
2 changes: 1 addition & 1 deletion src/controller/python/chip/yaml/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -665,7 +665,7 @@ async def run_action(self, dev_ctrl: ChipDeviceController) -> _ActionResult:
return _ActionResult(status=_ActionStatus.SUCCESS, response=_GetCommissionerNodeIdResult(dev_ctrl.nodeId))

try:
dev_ctrl.CommissionWithCode(self._setup_payload, self._node_id)
await dev_ctrl.CommissionWithCode(self._setup_payload, self._node_id)
return _ActionResult(status=_ActionStatus.SUCCESS, response=None)
except ChipStackError:
return _ActionResult(status=_ActionStatus.ERROR, response=None)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ async def start(self):
# device with the provided node id.
if self._node_id_to_commission is not None:
# Magic value is the defaults expected for YAML tests.
dev_ctrl.CommissionWithCode('MT:-24J0AFN00KA0648G00', self._node_id_to_commission)
await dev_ctrl.CommissionWithCode('MT:-24J0AFN00KA0648G00', self._node_id_to_commission)

self._chip_stack = chip_stack
self._certificate_authority_manager = certificate_authority_manager
Expand Down
48 changes: 24 additions & 24 deletions src/controller/python/test/test_scripts/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ def CreateNewFabricController(self):
async def TestRevokeCommissioningWindow(self, ip: str, setuppin: int, nodeid: int):
await self.devCtrl.SendCommand(
nodeid, 0, Clusters.AdministratorCommissioning.Commands.OpenBasicCommissioningWindow(180), timedRequestTimeoutMs=10000)
if not self.TestPaseOnly(ip=ip, setuppin=setuppin, nodeid=nodeid, devCtrl=self.devCtrl2):
if not await self.TestPaseOnly(ip=ip, setuppin=setuppin, nodeid=nodeid, devCtrl=self.devCtrl2):
return False

await self.devCtrl2.SendCommand(
Expand All @@ -248,17 +248,17 @@ async def TestRevokeCommissioningWindow(self, ip: str, setuppin: int, nodeid: in
nodeid, 0, Clusters.AdministratorCommissioning.Commands.RevokeCommissioning(), timedRequestTimeoutMs=10000)
return True

def TestEnhancedCommissioningWindow(self, ip: str, nodeid: int):
params = self.devCtrl.OpenCommissioningWindow(nodeid=nodeid, timeout=600, iteration=10000, discriminator=3840, option=1)
return self.TestPaseOnly(ip=ip, nodeid=nodeid, setuppin=params.setupPinCode, devCtrl=self.devCtrl2)
async def TestEnhancedCommissioningWindow(self, ip: str, nodeid: int):
params = await self.devCtrl.OpenCommissioningWindow(nodeid=nodeid, timeout=600, iteration=10000, discriminator=3840, option=1)
return await self.TestPaseOnly(ip=ip, nodeid=nodeid, setuppin=params.setupPinCode, devCtrl=self.devCtrl2)

def TestPaseOnly(self, ip: str, setuppin: int, nodeid: int, devCtrl=None):
async def TestPaseOnly(self, ip: str, setuppin: int, nodeid: int, devCtrl=None):
if devCtrl is None:
devCtrl = self.devCtrl
self.logger.info(
"Attempting to establish PASE session with device id: {} addr: {}".format(str(nodeid), ip))
try:
devCtrl.EstablishPASESessionIP(ip, setuppin, nodeid)
await devCtrl.EstablishPASESessionIP(ip, setuppin, nodeid)
except ChipStackException:
self.logger.info(
"Failed to establish PASE session with device id: {} addr: {}".format(str(nodeid), ip))
Expand All @@ -267,11 +267,11 @@ def TestPaseOnly(self, ip: str, setuppin: int, nodeid: int, devCtrl=None):
"Successfully established PASE session with device id: {} addr: {}".format(str(nodeid), ip))
return True

def TestCommissionOnly(self, nodeid: int):
async def TestCommissionOnly(self, nodeid: int):
self.logger.info(
"Commissioning device with id {}".format(nodeid))
try:
self.devCtrl.Commission(nodeid)
await self.devCtrl.Commission(nodeid)
except ChipStackException:
self.logger.info(
"Failed to commission device with id {}".format(str(nodeid)))
Expand All @@ -280,17 +280,17 @@ def TestCommissionOnly(self, nodeid: int):
"Successfully commissioned device with id {}".format(str(nodeid)))
return True

def TestKeyExchangeBLE(self, discriminator: int, setuppin: int, nodeid: int):
async def TestKeyExchangeBLE(self, discriminator: int, setuppin: int, nodeid: int):
self.logger.info(
"Conducting key exchange with device {}".format(discriminator))
if not self.devCtrl.ConnectBLE(discriminator, setuppin, nodeid):
if not await self.devCtrl.ConnectBLE(discriminator, setuppin, nodeid):
self.logger.info(
"Failed to finish key exchange with device {}".format(discriminator))
return False
self.logger.info("Device finished key exchange.")
return True

def TestCommissionFailure(self, nodeid: int, failAfter: int):
async def TestCommissionFailure(self, nodeid: int, failAfter: int):
self.devCtrl.ResetTestCommissioner()
a = self.devCtrl.SetTestCommissionerSimulateFailureOnStage(failAfter)
if not a:
Expand All @@ -299,43 +299,43 @@ def TestCommissionFailure(self, nodeid: int, failAfter: int):

self.logger.info(
"Commissioning device, expecting failure after stage {}".format(failAfter))
self.devCtrl.Commission(nodeid)
await self.devCtrl.Commission(nodeid)
return self.devCtrl.CheckTestCommissionerCallbacks() and self.devCtrl.CheckTestCommissionerPaseConnection(nodeid)

def TestCommissionFailureOnReport(self, nodeid: int, failAfter: int):
async def TestCommissionFailureOnReport(self, nodeid: int, failAfter: int):
self.devCtrl.ResetTestCommissioner()
a = self.devCtrl.SetTestCommissionerSimulateFailureOnReport(failAfter)
if not a:
# We're not going to hit this stage during commissioning so no sense trying, just say it was fine.
return True
self.logger.info(
"Commissioning device, expecting failure on report for stage {}".format(failAfter))
self.devCtrl.Commission(nodeid)
await self.devCtrl.Commission(nodeid)
return self.devCtrl.CheckTestCommissionerCallbacks() and self.devCtrl.CheckTestCommissionerPaseConnection(nodeid)

def TestCommissioning(self, ip: str, setuppin: int, nodeid: int):
async def TestCommissioning(self, ip: str, setuppin: int, nodeid: int):
self.logger.info("Commissioning device {}".format(ip))
try:
self.devCtrl.CommissionIP(ip, setuppin, nodeid)
await self.devCtrl.CommissionIP(ip, setuppin, nodeid)
except ChipStackException:
self.logger.exception(
"Failed to finish commissioning device {}".format(ip))
return False
self.logger.info("Commissioning finished.")
return True

def TestCommissioningWithSetupPayload(self, setupPayload: str, nodeid: int, discoveryType: int = 2):
async def TestCommissioningWithSetupPayload(self, setupPayload: str, nodeid: int, discoveryType: int = 2):
self.logger.info("Commissioning device with setup payload {}".format(setupPayload))
try:
self.devCtrl.CommissionWithCode(setupPayload, nodeid, chip.discovery.DiscoveryType(discoveryType))
await self.devCtrl.CommissionWithCode(setupPayload, nodeid, chip.discovery.DiscoveryType(discoveryType))
except ChipStackException:
self.logger.exception(
"Failed to finish commissioning device {}".format(setupPayload))
return False
self.logger.info("Commissioning finished.")
return True

def TestOnNetworkCommissioning(self, discriminator: int, setuppin: int, nodeid: int, ip_override: str = None):
async def TestOnNetworkCommissioning(self, discriminator: int, setuppin: int, nodeid: int, ip_override: str = None):
self.logger.info("Testing discovery")
device = self.TestDiscovery(discriminator=discriminator)
if not device:
Expand All @@ -345,7 +345,7 @@ def TestOnNetworkCommissioning(self, discriminator: int, setuppin: int, nodeid:
if ip_override:
address = ip_override
self.logger.info("Testing commissioning")
if not self.TestCommissioning(address, setuppin, nodeid):
if not await self.TestCommissioning(address, setuppin, nodeid):
self.logger.info("Failed to finish commissioning")
return False
return True
Expand Down Expand Up @@ -792,7 +792,7 @@ async def TestMultiFabric(self, ip: str, setuppin: int, nodeid: int):
self.controllerNodeId, self.paaTrustStorePath)

try:
self.devCtrl2.CommissionIP(ip, setuppin, nodeid)
await self.devCtrl2.CommissionIP(ip, setuppin, nodeid)
except ChipStackException:
self.logger.exception(
"Failed to finish key exchange with device {}".format(ip))
Expand Down Expand Up @@ -1313,15 +1313,15 @@ def TestNonControllerAPIs(self):
return False
return True

def TestFabricScopedCommandDuringPase(self, nodeid: int):
async def TestFabricScopedCommandDuringPase(self, nodeid: int):
'''Validates that fabric-scoped commands fail during PASE with UNSUPPORTED_ACCESS
The nodeid is the PASE pseudo-node-ID used during PASE establishment
'''
status = None
try:
asyncio.run(self.devCtrl.SendCommand(
nodeid, 0, Clusters.OperationalCredentials.Commands.UpdateFabricLabel("roboto")))
await self.devCtrl.SendCommand(
nodeid, 0, Clusters.OperationalCredentials.Commands.UpdateFabricLabel("roboto"))
except IM.InteractionModelError as ex:
status = ex.status

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
GROUP_ID = 0


def main():
async def main():
optParser = OptionParser()
optParser.add_option(
"-t",
Expand Down Expand Up @@ -98,32 +98,32 @@ def main():
# TODO: Start at stage 2 once handling for arming failsafe on pase is done.
if options.report:
for testFailureStage in range(3, 21):
FailIfNot(test.TestPaseOnly(ip=options.deviceAddress1,
setuppin=20202021,
nodeid=1),
FailIfNot(await test.TestPaseOnly(ip=options.deviceAddress1,
setuppin=20202021,
nodeid=1),
"Failed to establish PASE connection with device")
FailIfNot(test.TestCommissionFailureOnReport(1, testFailureStage),
FailIfNot(await test.TestCommissionFailureOnReport(1, testFailureStage),
"Commissioning failure tests failed for simulated report failure on stage {}".format(testFailureStage))

else:
for testFailureStage in range(3, 21):
FailIfNot(test.TestPaseOnly(ip=options.deviceAddress1,
setuppin=20202021,
nodeid=1),
FailIfNot(await test.TestPaseOnly(ip=options.deviceAddress1,
setuppin=20202021,
nodeid=1),
"Failed to establish PASE connection with device")
FailIfNot(test.TestCommissionFailure(1, testFailureStage),
FailIfNot(await test.TestCommissionFailure(1, testFailureStage),
"Commissioning failure tests failed for simulated stage failure on stage {}".format(testFailureStage))

# Ensure we can still commission for real
FailIfNot(test.TestPaseOnly(ip=options.deviceAddress1,
setuppin=20202021,
nodeid=1),
FailIfNot(await test.TestPaseOnly(ip=options.deviceAddress1,
setuppin=20202021,
nodeid=1),
"Failed to establish PASE connection with device")
FailIfNot(test.TestCommissionFailure(1, 0), "Failed to commission device")
FailIfNot(await test.TestCommissionFailure(1, 0), "Failed to commission device")

logger.info("Testing on off cluster")
FailIfNot(asyncio.run(test.TestOnOffCluster(nodeid=1,
endpoint=LIGHTING_ENDPOINT_ID)), "Failed to test on off cluster")
FailIfNot(await test.TestOnOffCluster(nodeid=1,
endpoint=LIGHTING_ENDPOINT_ID), "Failed to test on off cluster")

timeoutTicker.stop()

Expand All @@ -136,7 +136,7 @@ def main():

if __name__ == "__main__":
try:
main()
asyncio.run(main())
except Exception as ex:
logger.exception(ex)
TestFail("Exception occurred when running tests.")
20 changes: 10 additions & 10 deletions src/controller/python/test/test_scripts/commissioning_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
GROUP_ID = 0


def main():
async def main():
optParser = OptionParser()
optParser.add_option(
"-t",
Expand Down Expand Up @@ -133,22 +133,22 @@ def main():

if options.deviceAddress:
logger.info("Testing commissioning (IP)")
FailIfNot(test.TestCommissioning(ip=options.deviceAddress,
setuppin=20202021,
nodeid=options.nodeid),
FailIfNot(await test.TestCommissioning(ip=options.deviceAddress,
setuppin=20202021,
nodeid=options.nodeid),
"Failed to finish commissioning")
elif options.setupPayload:
logger.info("Testing commissioning (w/ Setup Payload)")
FailIfNot(test.TestCommissioningWithSetupPayload(setupPayload=options.setupPayload,
nodeid=options.nodeid,
discoveryType=options.discoveryType),
FailIfNot(await test.TestCommissioningWithSetupPayload(setupPayload=options.setupPayload,
nodeid=options.nodeid,
discoveryType=options.discoveryType),
"Failed to finish commissioning")
else:
TestFail("Must provide device address or setup payload to commissioning the device")

logger.info("Testing on off cluster")
FailIfNot(asyncio.run(test.TestOnOffCluster(nodeid=options.nodeid,
endpoint=LIGHTING_ENDPOINT_ID)), "Failed to test on off cluster")
FailIfNot(await test.TestOnOffCluster(nodeid=options.nodeid,
endpoint=LIGHTING_ENDPOINT_ID), "Failed to test on off cluster")

FailIfNot(test.TestUsedTestCommissioner(),
"Test commissioner check failed")
Expand All @@ -164,7 +164,7 @@ def main():

if __name__ == "__main__":
try:
main()
asyncio.run(main())
except Exception as ex:
logger.exception(ex)
TestFail("Exception occurred when running tests.")
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,9 @@ async def main():
"Failed to finish network commissioning")

logger.info("Commissioning DUT from first commissioner")
FailIfNot(test.TestPaseOnly(ip=options.deviceAddress, setuppin=20202021, nodeid=1),
FailIfNot(await test.TestPaseOnly(ip=options.deviceAddress, setuppin=20202021, nodeid=1),
"Unable to establish PASE connection to device")
FailIfNot(test.TestCommissionOnly(nodeid=1), "Unable to commission device")
FailIfNot(await test.TestCommissionOnly(nodeid=1), "Unable to commission device")

logger.info("Creating controller on a new fabric")
FailIfNot(test.CreateNewFabricController(), "Unable to create new controller")
Expand All @@ -103,7 +103,7 @@ async def main():
"RevokeCommissioning test failed")

logger.info("Test Enhanced Commissioning Window")
FailIfNot(test.TestEnhancedCommissioningWindow(ip=options.deviceAddress, nodeid=1), "EnhancedCommissioningWindow open failed")
FailIfNot(await test.TestEnhancedCommissioningWindow(ip=options.deviceAddress, nodeid=1), "EnhancedCommissioningWindow open failed")

timeoutTicker.stop()

Expand Down
Loading

0 comments on commit e407d40

Please sign in to comment.