Skip to content

Commit

Permalink
[Python] Cluster Object based Subscription & Remove Generated C++ cod…
Browse files Browse the repository at this point in the history
…e for python (#12285)

* [Python] Add subscription support to ClusterObject API

* Remove unused code

* Run Codegen
  • Loading branch information
erjiaqing authored and pull[bot] committed Jun 9, 2022
1 parent f356dc3 commit 3844729
Show file tree
Hide file tree
Showing 12 changed files with 226 additions and 18,719 deletions.
1 change: 0 additions & 1 deletion src/controller/python/BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ shared_library("ChipDeviceCtrl") {
"ChipDeviceController-ScriptDevicePairingDelegate.h",
"ChipDeviceController-StorageDelegate.cpp",
"ChipDeviceController-StorageDelegate.h",
"chip/clusters/CHIPClusters.cpp",
"chip/clusters/attribute.cpp",
"chip/clusters/command.cpp",
"chip/discovery/NodeResolution.cpp",
Expand Down
4 changes: 3 additions & 1 deletion src/controller/python/chip-device-ctrl.py
Original file line number Diff line number Diff line change
Expand Up @@ -824,8 +824,10 @@ def do_zclsubscribe(self, line):
elif len(args) == 6:
if args[0] not in all_attrs:
raise exceptions.UnknownCluster(args[0])
self.devCtrl.ZCLSubscribeAttribute(args[0], args[1], int(
res = self.devCtrl.ZCLSubscribeAttribute(args[0], args[1], int(
args[2]), int(args[3]), int(args[4]), int(args[5]))
print(res.GetAllValues())
print(f"Subscription Established: {res}")
elif len(args) == 2 and args[0] == '-shutdown':
subscriptionId = int(args[1], base=0)
self.devCtrl.ZCLShutdownSubscription(subscriptionId)
Expand Down
41 changes: 22 additions & 19 deletions src/controller/python/chip/ChipDeviceCtrl.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from __future__ import print_function
import asyncio
from ctypes import *

from .ChipStack import *
from .interaction_model import InteractionModelError, delegate as im
from .exceptions import *
Expand Down Expand Up @@ -110,7 +111,6 @@ def HandleKeyExchangeComplete(err):
err)
else:
print("Secure Session to Device Established")
self._ChipStack.callbackRes = True
self.state = DCState.IDLE
self._ChipStack.completeEvent.set()

Expand Down Expand Up @@ -423,7 +423,7 @@ async def ReadAttribute(self, nodeid: int, attributes: typing.List[typing.Union[
typing.Tuple[int, typing.Type[ClusterObjects.Cluster]],
# Concrete path
typing.Tuple[int, typing.Type[ClusterObjects.ClusterAttributeDescriptor]]
]]):
]], reportInterval: typing.Tuple[int, int] = None):
'''
Read a list of attributes from a target node
Expand All @@ -438,10 +438,13 @@ async def ReadAttribute(self, nodeid: int, attributes: typing.List[typing.Union[
The cluster and attributes specified above are to be selected from the generated cluster objects.
e.g
e.g.
ReadAttribute(1, [ 1 ] ) -- case 4 above.
ReadAttribute(1, [ Clusters.Basic ] ) -- case 5 above.
ReadAttribute(1, [ (1, Clusters.Basic.Attributes.Location ] ) -- case 1 above.
reportInterval: A tuple of two int-s for (MinIntervalFloor, MaxIntervalCeiling). Used by establishing subscriptions.
When not provided, a read request will be sent.
'''

eventLoop = asyncio.get_running_loop()
Expand Down Expand Up @@ -478,7 +481,7 @@ async def ReadAttribute(self, nodeid: int, attributes: typing.List[typing.Union[
attrs.append(ClusterAttribute.AttributePath(
EndpointId=endpoint, Cluster=cluster, Attribute=attribute))
res = self._ChipStack.Call(
lambda: ClusterAttribute.ReadAttributes(future, eventLoop, device, attrs))
lambda: ClusterAttribute.ReadAttributes(future, eventLoop, device, self, attrs, ClusterAttribute.SubscriptionParameters(reportInterval[0], reportInterval[1]) if reportInterval else None))
if res != 0:
raise self._ChipStack.ErrorToException(res)
return await future
Expand All @@ -498,13 +501,16 @@ def ZCLSend(self, cluster, command, nodeid, endpoint, groupid, args, blocking=Fa
return (int(ex.state), None)

def ZCLReadAttribute(self, cluster, attribute, nodeid, endpoint, groupid, blocking=True):
device = self.GetConnectedDeviceSync(nodeid)
req = None
try:
req = eval(f"GeneratedObjects.{cluster}.Attributes.{attribute}")
except:
raise UnknownAttribute(cluster, attribute)

# We are not using IM for Attributes.
self._Cluster.ReadAttribute(
device, cluster, attribute, endpoint, groupid, False)
if blocking:
return im.GetAttributeReadResponse(im.DEFAULT_ATTRIBUTEREAD_APPID)
result = asyncio.run(self.ReadAttribute(nodeid, [(endpoint, req)]))
path = ClusterAttribute.AttributePath(
EndpointId=endpoint, Attribute=req)
return im.AttributeReadResult(path=im.AttributePath(nodeId=nodeid, endpointId=path.EndpointId, clusterId=path.ClusterId, attributeId=path.AttributeId), status=0, value=result[path].Data.value)

def ZCLWriteAttribute(self, cluster: str, attribute: str, nodeid, endpoint, groupid, value, blocking=True):
req = None
Expand All @@ -517,15 +523,12 @@ def ZCLWriteAttribute(self, cluster: str, attribute: str, nodeid, endpoint, grou
return asyncio.run(self.WriteAttribute(nodeid, [(endpoint, req)]))

def ZCLSubscribeAttribute(self, cluster, attribute, nodeid, endpoint, minInterval, maxInterval, blocking=True):
device = self.GetConnectedDeviceSync(nodeid)

commandSenderHandle = self._dmLib.pychip_GetCommandSenderHandle(device)
im.ClearCommandStatus(commandSenderHandle)
self._Cluster.SubscribeAttribute(
device, cluster, attribute, endpoint, minInterval, maxInterval, commandSenderHandle != 0)
if blocking:
# We only send 1 command by this function, so index is always 0
return im.WaitCommandIndexStatus(commandSenderHandle, 1)
req = None
try:
req = eval(f"GeneratedObjects.{cluster}.Attributes.{attribute}")
except:
raise UnknownAttribute(cluster, attribute)
return asyncio.run(self.ReadAttribute(nodeid, [(endpoint, req)], reportInterval=(minInterval, maxInterval)))

def ZCLShutdownSubscription(self, subscriptionId: int):
res = self._ChipStack.Call(
Expand Down
134 changes: 117 additions & 17 deletions src/controller/python/chip/clusters/Attribute.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,18 @@
from asyncio.futures import Future
import ctypes
from dataclasses import dataclass
from typing import Type, Union, List, Any
from ctypes import CFUNCTYPE, c_char_p, c_size_t, c_void_p, c_uint32, c_uint16, py_object
from typing import Tuple, Type, Union, List, Any, Callable
from ctypes import CFUNCTYPE, c_char_p, c_size_t, c_void_p, c_uint32, c_uint16, py_object, c_uint64

from .ClusterObjects import ClusterAttributeDescriptor
from .ClusterObjects import Cluster, ClusterAttributeDescriptor
import chip.exceptions
import chip.interaction_model
import chip.tlv

import inspect
import sys
import logging
import threading


@dataclass
Expand Down Expand Up @@ -59,6 +60,14 @@ def __init__(self, EndpointId: int = None, Cluster=None, Attribute=None, Cluster
def __str__(self) -> str:
return f"{self.EndpointId}/{self.ClusterId}/{self.AttributeId}"

def __hash__(self):
return str(self).__hash__()


@dataclass
class AttributePathWithListIndex(AttributePath):
ListIndex: int = None


@dataclass
class AttributeStatus:
Expand Down Expand Up @@ -114,13 +123,73 @@ def _BuildAttributeIndex():
'chip.clusters.Objects.' + clusterName + '.Attributes.' + attributeName)


def _on_update_noop(path: AttributePath, value: Any):
'''
Default OnUpdate callback, simplily does nothing.
'''
pass


@dataclass
class SubscriptionParameters:
MinReportIntervalFloorSeconds: int
MaxReportIntervalCeilingSeconds: int


class SubscriptionTransaction:
def __init__(self, transaction: 'AsyncReadTransaction', subscriptionId, devCtrl):
self._on_update = _on_update_noop
self._read_transaction = transaction
self._subscriptionId = subscriptionId
self._devCtrl = devCtrl

def GetValue(self, path: Tuple[int, Type[ClusterAttributeDescriptor]]):
'''
Gets the attribute from cache, returns the value and the timestamp when it was updated last time.
'''
return self._read_transaction.GetValue(AttributePath(path[0], Attribute=path[1]))

def GetAllValues(self):
return self._read_transaction.GetAllValues()

def SetAttributeUpdateCallback(self, callback: Callable[[AttributePath, Any], None]):
'''
Sets the callback function for the attribute value change event, accepts a Callable accpets an attribute path and its updated value.
'''
if callback is None:
self._on_update = _on_update_noop
else:
self._on_update = callback

@property
def OnUpdate(self) -> Callable[[AttributePath, Any], None]:
return self._on_update

def Shutdown(self):
self._devCtrl.ZCLShutdownSubscription(self._subscriptionId)

def __repr__(self):
return f'<Subscription (Id={self._subscriptionId})>'


class AsyncReadTransaction:
def __init__(self, future: Future, eventLoop):
def __init__(self, future: Future, eventLoop, devCtrl):
self._event_loop = eventLoop
self._future = future
self._res = []
self._subscription_handler = None
self._res = {}
self._devCtrl = devCtrl
# For subscriptions, the data comes from CHIP Thread, whild the value will be accessed from Python's thread, so a lock is required here.
self._resLock = threading.Lock()

def GetValue(self, path: AttributePath):
with self._resLock:
return self._res.get(path)

def _handleAttributeData(self, path: AttributePath, status: int, data: bytes):
def GetAllValues(self):
return self._res

def _handleAttributeData(self, path: AttributePathWithListIndex, status: int, data: bytes):
try:
imStatus = status
try:
Expand All @@ -142,14 +211,21 @@ def _handleAttributeData(self, path: AttributePath, status: int, data: bytes):
f"Failed Cluster Object: {str(attributeType)}")
raise

self._res.append(AttributeReadResult(
Path=path, Status=imStatus, Data=attributeType(attributeValue)))
with self._resLock:
self._res[path] = AttributeReadResult(
Path=path, Status=imStatus, Data=attributeType(attributeValue))
if self._subscription_handler is not None:
self._subscription_handler.OnUpdate(
path, attributeType(attributeValue))
except Exception as ex:
logging.exception(ex)

def handleAttributeData(self, path: AttributePath, status: int, data: bytes):
self._event_loop.call_soon_threadsafe(
self._handleAttributeData, path, status, data)
if self._subscription_handler is not None:
self._handleAttributeData(path, status, data)
else:
self._event_loop.call_soon_threadsafe(
self._handleAttributeData, path, status, data)

def _handleError(self, chipError: int):
self._future.set_exception(
Expand All @@ -160,12 +236,22 @@ def handleError(self, chipError: int):
self._handleError, chipError
)

def _handleDone(self, asd):
def _handleSubscriptionEstablished(self, subscriptionId):
if not self._future.done():
self._subscription_handler = SubscriptionTransaction(
self, subscriptionId, self._devCtrl)
self._future.set_result(self._subscription_handler)

def handleSubscriptionEstablished(self, subscriptionId):
self._event_loop.call_soon_threadsafe(
self._handleSubscriptionEstablished, subscriptionId)

def _handleDone(self):
if not self._future.done():
self._future.set_result(self._res)

def handleDone(self):
self._event_loop.call_soon_threadsafe(self._handleDone, "asdasa")
self._event_loop.call_soon_threadsafe(self._handleDone)


class AsyncWriteTransaction:
Expand Down Expand Up @@ -204,6 +290,7 @@ def handleDone(self):

_OnReadAttributeDataCallbackFunct = CFUNCTYPE(
None, py_object, c_uint16, c_uint32, c_uint32, c_uint32, c_void_p, c_size_t)
_OnSubscriptionEstablishedCallbackFunct = CFUNCTYPE(None, py_object, c_uint64)
_OnReadErrorCallbackFunct = CFUNCTYPE(
None, py_object, c_uint32)
_OnReadDoneCallbackFunct = CFUNCTYPE(
Expand All @@ -217,6 +304,11 @@ def _OnReadAttributeDataCallback(closure, endpoint: int, cluster: int, attribute
EndpointId=endpoint, ClusterId=cluster, AttributeId=attribute), status, dataBytes[:])


@_OnSubscriptionEstablishedCallbackFunct
def _OnSubscriptionEstablishedCallback(closure, subscriptionId):
closure.handleSubscriptionEstablished(subscriptionId)


@_OnReadErrorCallbackFunct
def _OnReadErrorCallback(closure, chiperror: int):
closure.handleError(chiperror)
Expand Down Expand Up @@ -278,9 +370,9 @@ def WriteAttributes(future: Future, eventLoop, device, attributes: List[Attribut
return res


def ReadAttributes(future: Future, eventLoop, device, attributes: List[AttributePath]) -> int:
def ReadAttributes(future: Future, eventLoop, device, devCtrl, attributes: List[AttributePath], subscriptionParameters: SubscriptionParameters = None) -> int:
handle = chip.native.GetLibraryHandle()
transaction = AsyncReadTransaction(future, eventLoop)
transaction = AsyncReadTransaction(future, eventLoop, devCtrl)

readargs = []
for attr in attributes:
Expand All @@ -296,8 +388,16 @@ def ReadAttributes(future: Future, eventLoop, device, attributes: List[Attribute
readargs.append(ctypes.c_char_p(path))

ctypes.pythonapi.Py_IncRef(ctypes.py_object(transaction))
minInterval = 0
maxInterval = 0
if subscriptionParameters is not None:
minInterval = subscriptionParameters.MinReportIntervalFloorSeconds
maxInterval = subscriptionParameters.MaxReportIntervalCeilingSeconds
res = handle.pychip_ReadClient_ReadAttributes(
ctypes.py_object(transaction), device, ctypes.c_size_t(len(attributes)), *readargs)
ctypes.py_object(transaction), device,
ctypes.c_bool(subscriptionParameters is not None),
ctypes.c_uint32(minInterval), ctypes.c_uint32(maxInterval),
ctypes.c_size_t(len(attributes)), *readargs)
if res != 0:
ctypes.pythonapi.Py_DecRef(ctypes.py_object(transaction))
return res
Expand All @@ -316,11 +416,11 @@ def Init():
_OnWriteResponseCallbackFunct, _OnWriteErrorCallbackFunct, _OnWriteDoneCallbackFunct])
handle.pychip_ReadClient_ReadAttributes.restype = c_uint32
setter.Set('pychip_ReadClient_InitCallbacks', None, [
_OnReadAttributeDataCallbackFunct, _OnReadErrorCallbackFunct, _OnReadDoneCallbackFunct])
_OnReadAttributeDataCallbackFunct, _OnSubscriptionEstablishedCallbackFunct, _OnReadErrorCallbackFunct, _OnReadDoneCallbackFunct])

handle.pychip_WriteClient_InitCallbacks(
_OnWriteResponseCallback, _OnWriteErrorCallback, _OnWriteDoneCallback)
handle.pychip_ReadClient_InitCallbacks(
_OnReadAttributeDataCallback, _OnReadErrorCallback, _OnReadDoneCallback)
_OnReadAttributeDataCallback, _OnSubscriptionEstablishedCallback, _OnReadErrorCallback, _OnReadDoneCallback)

_BuildAttributeIndex()
Loading

0 comments on commit 3844729

Please sign in to comment.