From 28c5353186721602f1dc0d5fc1bbe3e06d3218b6 Mon Sep 17 00:00:00 2001 From: Nirav Shah Date: Tue, 18 May 2021 15:19:03 -0400 Subject: [PATCH] Implementation of ble-scan method with returning discovered devices (#18) * Implemented ble_scan with peripheral list return. --- src/controller/python/chip-device-ctrl.py | 53 ++++++++++++++++++++++- 1 file changed, 52 insertions(+), 1 deletion(-) diff --git a/src/controller/python/chip-device-ctrl.py b/src/controller/python/chip-device-ctrl.py index da29208feebd93..739cc4fedca662 100755 --- a/src/controller/python/chip-device-ctrl.py +++ b/src/controller/python/chip-device-ctrl.py @@ -44,6 +44,8 @@ from chip.ChipBleUtility import FAKE_CONN_OBJ_VALUE from chip.setup_payload import SetupPayload from xmlrpc.server import SimpleXMLRPCServer +from enum import Enum +from typing import Any, Dict,Optional from enum import Enum from typing import Any, Dict,Optional @@ -736,6 +738,7 @@ def emptyline(self): device_manager = DeviceMgrCmd(rendezvousAddr=None, controllerNodeId=0, bluetoothAdapter=0) + # CHIP commands needed by the Harness Tool def echo_alive(message): print(message) @@ -751,6 +754,41 @@ def resolve(fabric_id: int, node_id: int) -> Dict[str, Any]: *address) if address else "unknown" return __get_response_dict(status = StatusCodeEnum.SUCCESS, result = {'address': address}) + +def ble_scan() -> Dict[Any, Any]: + try: + __check_supported_os() + device_manager.do_blescan("") + + return __get_response_dict(status = StatusCodeEnum.SUCCESS, result = __get_peripheral_list()) + except Exception as e: + return __get_response_dict(status = StatusCodeEnum.FAILED, error = str(e)) + +def __get_peripheral_list() -> Dict[Any, Any]: + device_list = [] + for device in device_manager.bleMgr.peripheral_list: + device_detail = {} + devIdInfo = device_manager.bleMgr.get_peripheral_devIdInfo(device) + if devIdInfo != None: + device_detail['name'] = str(device.Name) + device_detail['id'] = str(device.device_id) + device_detail['rssi'] = str(device.RSSI) + device_detail['address'] = str(device.Address) + device_detail['pairing_state'] = devIdInfo.pairingState + device_detail['discriminator'] = devIdInfo.discriminator + device_detail['vendor_id'] = devIdInfo.vendorId + device_detail['product_id'] = devIdInfo.productId + if device.ServiceData: + for advuuid in device.ServiceData: + device_detail['adv_uuid'] = str(advuuid) + device_list.append(device_detail) + return device_list + +def ble_connect(discriminator: int, pin_code: int, node_id: int) -> Dict[str, any]: + try: + __check_supported_os() + device_manager.devCtrl.ConnectBLE(discriminator, pin_code, node_id) + return __get_response_dict(status = StatusCodeEnum.SUCCESS) except exceptions.ChipStackException as ex: return __get_response_dict(status = StatusCodeEnum.FAILED, error = str(ex)) except Exception as e: @@ -760,16 +798,28 @@ def ble_scan(): device_manager.do_blescan("") #TODO: Return a list of available devices return "Scan started" - + +def ip_connect(ip_address: string, pin_code: int, node_id: int) -> Dict[str, any]: + try: + __check_supported_os() + device_manager.devCtrl.ConnectIP(ip_address.encode("utf-8"), pin_code, node_id) + return __get_response_dict(status = StatusCodeEnum.SUCCESS) + except exceptions.ChipStackException as ex: + return __get_response_dict(status = StatusCodeEnum.FAILED, error = str(ex)) + except Exception as e: + return __get_response_dict(status = StatusCodeEnum.FAILED, error = str(e)) def qr_code_parse(qr_code): return SetupPayload().ParseQrCode(qr_code).Dictionary() def start_rpc_server(): + with SimpleXMLRPCServer(("0.0.0.0", 5000)) as server: server.register_function(echo_alive) server.register_function(ble_scan) server.register_function(qr_code_parse) server.register_function(resolve) + server.register_function(ble_connect) + server.register_function(ip_connect) server.register_multicall_functions() print('Serving XML-RPC on localhost port 5000') try: @@ -788,6 +838,7 @@ def __check_supported_os()-> bool: return True raise Exception("OS Not Supported") + ######--------------------------------------------------###### def main():