Skip to content

Commit

Permalink
Implementation of ble-scan method with returning discovered devices (#18
Browse files Browse the repository at this point in the history
)

* Implemented ble_scan with peripheral list return.
  • Loading branch information
nirav1911 authored and doublemis1 committed Jun 28, 2021
1 parent f1eedfa commit 28c5353
Showing 1 changed file with 52 additions and 1 deletion.
53 changes: 52 additions & 1 deletion src/controller/python/chip-device-ctrl.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
from chip.ChipBleUtility import FAKE_CONN_OBJ_VALUE
from chip.setup_payload import SetupPayload
from xmlrpc.server import SimpleXMLRPCServer
from enum import Enum
from typing import Any, Dict,Optional

from enum import Enum
from typing import Any, Dict,Optional
Expand Down Expand Up @@ -736,6 +738,7 @@ def emptyline(self):
device_manager = DeviceMgrCmd(rendezvousAddr=None,
controllerNodeId=0, bluetoothAdapter=0)


# CHIP commands needed by the Harness Tool
def echo_alive(message):
print(message)
Expand All @@ -751,6 +754,41 @@ def resolve(fabric_id: int, node_id: int) -> Dict[str, Any]:
*address) if address else "unknown"

return __get_response_dict(status = StatusCodeEnum.SUCCESS, result = {'address': address})

def ble_scan() -> Dict[Any, Any]:
try:
__check_supported_os()
device_manager.do_blescan("")

return __get_response_dict(status = StatusCodeEnum.SUCCESS, result = __get_peripheral_list())
except Exception as e:
return __get_response_dict(status = StatusCodeEnum.FAILED, error = str(e))

def __get_peripheral_list() -> Dict[Any, Any]:
device_list = []
for device in device_manager.bleMgr.peripheral_list:
device_detail = {}
devIdInfo = device_manager.bleMgr.get_peripheral_devIdInfo(device)
if devIdInfo != None:
device_detail['name'] = str(device.Name)
device_detail['id'] = str(device.device_id)
device_detail['rssi'] = str(device.RSSI)
device_detail['address'] = str(device.Address)
device_detail['pairing_state'] = devIdInfo.pairingState
device_detail['discriminator'] = devIdInfo.discriminator
device_detail['vendor_id'] = devIdInfo.vendorId
device_detail['product_id'] = devIdInfo.productId
if device.ServiceData:
for advuuid in device.ServiceData:
device_detail['adv_uuid'] = str(advuuid)
device_list.append(device_detail)
return device_list

def ble_connect(discriminator: int, pin_code: int, node_id: int) -> Dict[str, any]:
try:
__check_supported_os()
device_manager.devCtrl.ConnectBLE(discriminator, pin_code, node_id)
return __get_response_dict(status = StatusCodeEnum.SUCCESS)
except exceptions.ChipStackException as ex:
return __get_response_dict(status = StatusCodeEnum.FAILED, error = str(ex))
except Exception as e:
Expand All @@ -760,16 +798,28 @@ def ble_scan():
device_manager.do_blescan("")
#TODO: Return a list of available devices
return "Scan started"


def ip_connect(ip_address: string, pin_code: int, node_id: int) -> Dict[str, any]:
try:
__check_supported_os()
device_manager.devCtrl.ConnectIP(ip_address.encode("utf-8"), pin_code, node_id)
return __get_response_dict(status = StatusCodeEnum.SUCCESS)
except exceptions.ChipStackException as ex:
return __get_response_dict(status = StatusCodeEnum.FAILED, error = str(ex))
except Exception as e:
return __get_response_dict(status = StatusCodeEnum.FAILED, error = str(e))
def qr_code_parse(qr_code):
return SetupPayload().ParseQrCode(qr_code).Dictionary()

def start_rpc_server():

with SimpleXMLRPCServer(("0.0.0.0", 5000)) as server:
server.register_function(echo_alive)
server.register_function(ble_scan)
server.register_function(qr_code_parse)
server.register_function(resolve)
server.register_function(ble_connect)
server.register_function(ip_connect)
server.register_multicall_functions()
print('Serving XML-RPC on localhost port 5000')
try:
Expand All @@ -788,6 +838,7 @@ def __check_supported_os()-> bool:
return True

raise Exception("OS Not Supported")

######--------------------------------------------------######

def main():
Expand Down

0 comments on commit 28c5353

Please sign in to comment.