diff --git a/.travis.yml b/.travis.yml index 8525af6..d2b7594 100644 --- a/.travis.yml +++ b/.travis.yml @@ -14,4 +14,4 @@ install: branches: only: - master -script: python tests/test_zocp.py +script: python -m unittest -v tests/test_zocp.py diff --git a/examples/BpyZOCP.py b/examples/BpyZOCP.py index a5732e9..9757243 100644 --- a/examples/BpyZOCP.py +++ b/examples/BpyZOCP.py @@ -149,46 +149,50 @@ def refresh_objects(self): def _register_lamp(self, obj): self.set_object(obj.name, "BPY_Lamp") - self.register_vec3f("location", obj.location[:]) - #self.register_mat3f("worldOrientation", obj.worldOrientation[:]) - self.register_vec3f("orientation", obj.rotation_euler[:]) - self.register_vec3f("scale", obj.scale[:]) - self.register_vec3f("color", obj.data.color[:]) - self.register_float("energy", obj.data.energy) - self.register_float("distance", obj.data.distance) + self.register_vec3f("location", obj.location[:], 're') + #self.register_mat3f("worldOrientation", obj.worldOrientation[:], 're') + self.register_vec3f("orientation", obj.rotation_euler[:], 're') + self.register_vec3f("scale", obj.scale[:], 're') + self.register_vec3f("color", obj.data.color[:], 're') + self.register_float("energy", obj.data.energy, 're') + self.register_float("distance", obj.data.distance, 're') #self.register_int ("state", obj.state) #self.register_float("mass", obj.mass) def _register_camera(self, obj): self.set_object(obj.name, "BPY_Camera") - self.register_vec3f("location", obj.location[:]) - #self.register_mat3f("worldOrientation", obj.worldOrientation[:]) - self.register_vec3f("orientation", obj.rotation_euler[:]) - self.register_float("angle", obj.data.angle, 'r') - self.register_float("shift_x", obj.data.shift_x, 'r') - self.register_float("shift_y", obj.data.shift_y, 'r') + self.register_vec3f("location", obj.location[:], 're') + #self.register_mat3f("worldOrientation", obj.worldOrientation[:], 're') + self.register_vec3f("orientation", obj.rotation_euler[:], 're') + self.register_float("angle", obj.data.angle, 're') + self.register_float("shift_x", obj.data.shift_x, 're') + self.register_float("shift_y", obj.data.shift_y, 're') def _register_mesh(self, obj): self.set_object(obj.name, "BPY_Mesh") - self.register_vec3f("location", obj.location[:]) - #self.register_mat3f("worldOrientation", obj.worldOrientation[:]) - self.register_vec3f("orientation", obj.rotation_euler[:]) - self.register_vec3f("scale", obj.scale[:]) - self.register_vec4f("color", obj.color[:]) - #self.register_int ("state", obj.state) - #self.register_float("mass", obj.mass) + self.register_vec3f("location", obj.location[:], 're') + #self.register_mat3f("worldOrientation", obj.worldOrientation[:], 're') + self.register_vec3f("orientation", obj.rotation_euler[:], 're') + self.register_vec3f("scale", obj.scale[:], 're') + self.register_vec4f("color", obj.color[:], 're') + #self.register_int ("state", obj.state, 're') + #self.register_float("mass", obj.mass, 're') def send_object_changes(self, obj): self.set_object(obj.name, "BPY_Mesh") if self._cur_obj.get("location", {}).get("value") != obj.location[:]: - self.register_vec3f("location", obj.location[:]) + #self.register_vec3f("location", obj.location[:]) + self.emit_signal("location", obj.location[:]) if self._cur_obj.get("orientation", {}).get("value") != obj.rotation_euler[:]: - self.register_vec3f("orientation", obj.rotation_euler[:]) + #self.register_vec3f("orientation", obj.rotation_euler[:]) + self.emit_signal("orientation", obj.rotation_euler[:]) if self._cur_obj.get("scale", {}).get("value") != obj.scale[:]: - self.register_vec3f("scale", obj.scale[:]) + #self.register_vec3f("scale", obj.scale[:]) + self.emit_signal("scale", obj.scale[:]) if obj.type == "LAMP": if self._cur_obj.get("color", {}).get("value") != obj.data.color[:]: - self.register_vec3f("color", obj.data.color[:]) + #self.register_vec3f("color", obj.data.color[:]) + self.emit_signal("color", obj.data.color[:]) if self._cur_obj.get("energy", {}).get("value") != obj.data.energy[:]: self.register_float("energy", obj.data.energy[:]) if self._cur_obj.get("distance", {}).get("value") != obj.data.distance[:]: @@ -199,6 +203,8 @@ def send_object_changes(self, obj): elif obj.type == "CAMERA": self._register_camera(obj) + def emit_signal(self, name, data): + super().emit_signal(".".join(self._cur_obj_keys + (name, )), data) ######################################### # Event methods. These can be overwritten diff --git a/examples/signal_subscriber.py b/examples/signal_subscriber.py index 50be369..2b02558 100755 --- a/examples/signal_subscriber.py +++ b/examples/signal_subscriber.py @@ -23,7 +23,7 @@ def on_peer_enter(self, peer, name, *args, **kwargs): split_name = name.split("@",1) if(split_name[0] == 'subscribee'): #self.signal_subscribe(self.get_uuid(), 'My String', peer, 'My String') - self.signal_subscribe(self.get_uuid(), 'Linked counter', peer, 'Counter') + self.signal_subscribe(self.get_uuid(), 1, peer, 2) if __name__ == '__main__': diff --git a/examples/simple_node.py b/examples/simple_node.py index 676a8fc..be60642 100644 --- a/examples/simple_node.py +++ b/examples/simple_node.py @@ -1,11 +1,13 @@ #!/usr/bin/python -from zocp import ZOCP import logging +from zocp import ZOCP if __name__ == '__main__': zl = logging.getLogger("zocp") zl.setLevel(logging.DEBUG) + sh = logging.StreamHandler() + zl.addHandler(sh) z = ZOCP() z.set_name("ZOCP-Test") diff --git a/src/__init__.py b/src/__init__.py index f1b9110..84657f4 100644 --- a/src/__init__.py +++ b/src/__init__.py @@ -1,3 +1,3 @@ -__all__ = ['zocp'] +__all__ = ['zocp', 'parameter'] from .zocp import ZOCP diff --git a/src/parameter.py b/src/parameter.py new file mode 100644 index 0000000..46bf397 --- /dev/null +++ b/src/parameter.py @@ -0,0 +1,251 @@ +import logging +import json +import uuid +from collections import MutableSequence + +logger = logging.getLogger(__name__) + +def make_dict(keys, value): + """ + returns a nested dict + keys argument must be a list + """ + a = {} + d = a + for key in keys[:-1]: + d = d[key] + d[keys[-1]] = value + return a + +class ZOCPParameterList(MutableSequence): + """ + A container for manipulating lists of parameters + + Perhaps we should use weakrefs: + https://docs.python.org/2/library/weakref.html#weak-reference-objects + + It's also easier to just use a dict but we need this logic in C as + well + """ + def __init__(self): + """Initialize the class""" + self._list = list() + self._free_idx = list() # list of free indexes + + def __len__(self): + """List length""" + return len(self._list) + + def __getitem__(self, ii): + """Get a list item""" + return self._list[ii] + + def __delitem__(self, ii): + """Delete an item by marking""" + if ii >= len(self._list): + raise IndexError("Index {0} to remove is beyond list boundary".format(ii)) + # ii can be negative so convert to real index + idx = ii % len(self._list) + print("deleting idx {0}".format(idx)) + if idx == len(self._list)-1: + self._list.pop() + return + self._list[idx] = None + self._free_idx.append(idx) + + def __setitem__(self, ii, val): + self._list[ii] = val + + def __str__(self): + return "ZOCPParameterList:"+ str(self._list) + + def __repr__(self): + return self._list + + def insert(self, param): + if param.sig_id == None: + # find a free spot in the list + try: + param.sig_id = self._free_idx.pop(0) + except IndexError: + param.sig_id = len(self._list) + self._list.append(param) + else: + print("reusing id", param.sig_id, ) + self._list[param.sig_id] = param + elif param.sig_id == len(self._list): + self._list.append(param) + else: + self._list[param.sig_id] = param + + def append(self, param): + raise NotImplemented("Append is not implemented, use insert") + + def clear(self): + # http://bugs.python.org/issue11388 + try: + while True: + param = self.pop() + param.sig_id = None + except IndexError: + pass + + +class ZOCPParameter(object): + """ + Wrapper class for parameters used through ZOCP + """ + + def __init__(self, znode, value, name, access, type_hint, signature, min=None, max=None, step=None, sig_id=None, *args, **kwargs): + self._value = value + # init meta data + self._znode = znode # reference to the ZOCPNode instance + self.name = name # name of the parameter + self.min = min # minimum value of the parameter (optional) + self.max = max # maximum value of the parameter (optional) + self.step = step + self.access = access # description of access methods (Read,Write,signal Emitter,Signal receiver) + self.type_hint = type_hint # a hint of the type of data + self.signature = signature # signature describing the parameter in memory + self._subscribers = [] # list of peer receivers for emitted signals in case we're an emitter + self._sig_id = sig_id # the id of the parameter (needed for referencing to other nodes) + + # get the params_list and monitor_list before we get extra meta data! + self._params_list = kwargs.pop('params_list', None) + self._monitor_subscribers = kwargs.pop("monitor_list", None) + + self.extended_meta = kwargs # optional extra meta data + + # in case we're an emitter overwrite the set method + if 'e' in self.access: + self.set = self._set_emit + + if self._params_list == None: + self._params_list = self._znode._parameter_list + if self._monitor_subscribers == None: + self._monitor_subscribers = self._znode.monitor_subscribers + # get ourselves an sig_id by inserting in the params_list + self._params_list.insert(self) + + def _set_emit(self, value): + """ + Set and emit value as a signal + """ + self._value = value + msg = json.dumps({'SIG': [self.sig_id, self._value]}) + for peer, recv_id in self._subscribers: + self._znode.whisper(uuid.UUID(peer), msg.encode('utf-8')) + for peer in self._monitor_subscribers: + self._znode.whisper(peer, msg.encode('utf-8')) + + def set_sig_id(self, sig_id): + if self._sig_id != None and sig_id != None: + logger.warning("ZOCPParameter signal id is overwritten from"\ + +" {0} to {1}".format(self._sig_id, sig_id)) + self._sig_id = sig_id + + def get_sig_id(self): + return self._sig_id + + def set_object(self, obj): + """ + Set object path + + we need this in order to find ourselves in the capability tree + """ + self._object = obj + + def get(self): + return self._value + + def set(self, value): + self._value = value + + def subscribe_receiver(self, recv_peer, receiver_id): + # update subscribers list + # TODO: I'm not sure we need to register the receiver_id??? + subscriber = (recv_peer.hex, receiver_id) + if subscriber not in self._subscribers: + self._subscribers.append(subscriber) + if self._object: + data = make_dict(self._object, {"subscribers": self._subscribers}) + else: + data = { self.name: {"subscribers": self._subscribers}} + self._znode._on_modified(data=data) + + def unsubscribe_receiver(self, recv_peer, receiver_id): + # update subscribers list + # TODO: I'm not sure we need to register the receiver_id??? + subscriber = (recv_peer.hex, receiver_id) + if subscriber in self._subscribers: + self._subscribers.remove(subscriber) + if self._object: + data = make_dict(self._object, {"subscribers": self._subscribers}) + else: + data = { self.name: {"subscribers": self._subscribers}} + self._znode._on_modified(data=data) + + def _to_bytes(self): + """ + converts value to an array of bytes + + ref: https://docs.python.org/2/library/stdtypes.html#memoryview + """ + return struct.pack(self.signature, self.value) + + def to_dict(self): + """ + Converts this parameter to a representing dictionary + """ + d = self.extended_meta + d['name'] = self.name + d['value'] = self._value + if self.min: + d['min'] = self.min + if self.max: + d['max'] = self.max + if self.step: + d['step'] = self.step + d['access'] = self.access + d['typeHint'] = self.type_hint + d['sig'] = self.signature + d['sig_id'] = self.sig_id + if 'e' in self.access: + d['subscribers'] = self._subscribers + return d + + def __str__(self): + return str(self.to_dict()) + + def __repr__(self): + return "ZOCPParameter({0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8})"\ + .format("znode", self._value, self.name, self.access, self.type_hint, self.signature, self.min, self.max, self.step, self.sig_id) + #return self.to_dict().__repr__() + + def __dict__(self): + return to_dict() + + def remove(self): + # try to remove itself from the params_list + # could already be done by clear() + try: + self._params_list.remove(self) + except ValueError: + pass + + value = property(get, set) + sig_id = property(get_sig_id, set_sig_id) + + +if __name__ == '__main__': + plist = ZOCPParameterList() + mlist = [] + param1 = ZOCPParameter(None, 1, 'param1', 'rwes', None, 'i', params_list=plist, monitor_list=mlist) + param2 = ZOCPParameter(None, 0.1, 'param2', 'rw', None, 'f', params_list=plist, monitor_list=mlist) + param3 = ZOCPParameter(None, 0.3, 'param3', 'rw', None, 'f', params_list=plist, monitor_list=mlist) + print("removing 3") + param3.remove() + print("adding 4&5") + param4 = ZOCPParameter(None, 0.4, 'param4', 'rw', None, 'f', params_list=plist, monitor_list=mlist) + param5 = ZOCPParameter(None, 0.5, 'param5', 'rw', None, 'f', params_list=plist, monitor_list=mlist) + print(plist) diff --git a/src/zocp.py b/src/zocp.py index fbb4313..84dde97 100755 --- a/src/zocp.py +++ b/src/zocp.py @@ -16,6 +16,7 @@ # License along with this library. from pyre import Pyre +from parameter import ZOCPParameter, ZOCPParameterList import json import zmq import uuid @@ -81,12 +82,13 @@ class ZOCP(Pyre): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.subscriptions = {} - self.subscribers = {} + self.monitor_subscribers = [] self.set_header("X-ZOCP", "1") self.peers_capabilities = {} # peer id : capability data self.capability = kwargs.get('capability', {}) self._cur_obj = self.capability self._cur_obj_keys = () + self._parameter_list = ZOCPParameterList() self._running = False # We always join the ZOCP group self.join("ZOCP") @@ -100,6 +102,7 @@ def set_capability(self, cap): """ Set node's capability, overwites previous """ + logger.warning("DEPRECATED: set_capability is deprecated, capability is contructed automatically") self.capability = cap self._on_modified(data=cap) @@ -172,15 +175,12 @@ def set_object(self, name=None, type="Unknown"): self._cur_obj = self.capability['objects'][name] self._cur_obj_keys = ('objects', name) - def _register_param(self, name, value, type_hint, access='r', min=None, max=None, step=None): - self._cur_obj[name] = {'value': value, 'typeHint': type_hint, 'access':access, 'subscribers': [] } - if min: - self._cur_obj[name]['min'] = min - if max: - self._cur_obj[name]['max'] = max - if step: - self._cur_obj[name]['step'] = step - self._on_modified(data={name: self._cur_obj[name]}) + def _register_param(self, name, value, type_hint, signature, access='r', min=None, max=None, step=None): + param = ZOCPParameter(self, value, name, access, type_hint, signature, min, max, step) + param.set_object(self._cur_obj_keys) + self._cur_obj[name] = param.to_dict() + self._on_modified(data={name: param.to_dict()}) + return param def register_int(self, name, int, access='r', min=None, max=None, step=None): """ @@ -195,7 +195,7 @@ def register_int(self, name, int, access='r', min=None, max=None, step=None): * max: maximal value * step: step value used by increments and decrements """ - self._register_param(name, int, 'int', access, min, max, step) + return self._register_param(name, int, 'int', 'I', access, min, max, step) def register_float(self, name, flt, access='r', min=None, max=None, step=None): """ @@ -210,7 +210,7 @@ def register_float(self, name, flt, access='r', min=None, max=None, step=None): * max: maximal value * step: step value used by increments and decrements """ - self._register_param(name, flt, 'flt', access, min, max, step) + return self._register_param(name, flt, 'flt', 'f', access, min, max, step) def register_percent(self, name, pct, access='r', min=None, max=None, step=None): """ @@ -225,7 +225,7 @@ def register_percent(self, name, pct, access='r', min=None, max=None, step=None) * max: maximal value * step: step value used by increments and decrements """ - self._register_param(name, pct, 'percent', access, min, max, step) + return self._register_param(name, pct, 'percent', 'B', access, min, max, step) def register_bool(self, name, bl, access='r'): """ @@ -237,7 +237,7 @@ def register_bool(self, name, bl, access='r'): * access: 'r' and/or 'w' as to if it's readable and writeable state 'e' if the value can be emitted and/or 's' if it can be received """ - self._register_param(name, bl, 'bool', access) + return self._register_param(name, bl, 'bool', '?', access) def register_string(self, name, s, access='r'): """ @@ -249,7 +249,7 @@ def register_string(self, name, s, access='r'): * access: 'r' and/or 'w' as to if it's readable and writeable state 'e' if the value can be emitted and/or 's' if it can be received """ - self._register_param(name, s, 'string', access) + return self._register_param(name, s, 'string', 's', access) def register_vec2f(self, name, vec2f, access='r', min=None, max=None, step=None): """ @@ -264,7 +264,7 @@ def register_vec2f(self, name, vec2f, access='r', min=None, max=None, step=None) * max: maximal value * step: step value used by increments and decrements """ - self._register_param(name, vec2f, 'vec2f', access, min, max, step) + return self._register_param(name, vec2f, 'vec2f', '2f', access, min, max, step) def register_vec3f(self, name, vec3f, access='r', min=None, max=None, step=None): """ @@ -279,7 +279,7 @@ def register_vec3f(self, name, vec3f, access='r', min=None, max=None, step=None) * max: maximal value * step: step value used by increments and decrements """ - self._register_param(name, vec3f, 'vec3f', access, min, max, step) + return self._register_param(name, vec3f, 'vec3f', '3f', access, min, max, step) def register_vec4f(self, name, vec4f, access='r', min=None, max=None, step=None): """ @@ -294,7 +294,7 @@ def register_vec4f(self, name, vec4f, access='r', min=None, max=None, step=None) * max: maximal value * step: step value used by increments and decrements """ - self._register_param(name, vec4f, 'vec4f', access, min, max, step) + return self._register_param(name, vec4f, 'vec4f', '4f', access, min, max, step) ######################################### # Node methods to peers @@ -329,17 +329,17 @@ def peer_call(self, peer, method, *args): msg = json.dumps({'CALL': [method, args]}) self.whisper(peer, msg.encode('utf-8')) - def signal_subscribe(self, recv_peer, receiver, emit_peer, emitter): + def signal_subscribe(self, recv_peer, receiver_id, emit_peer, emitter_id): """ Subscribe a receiver to an emitter Arguments are: * recv_peer: id of the receiving peer. - * receiver: capability id of the receiver on the receiving peer. + * receiver_id: capability id of the receiver on the receiving peer. If None, no capability on the receiving peer is updated, but a on_peer_signal event is still fired. * emit_peer: id of the peer to subscribe to - * emitter: capability name of the emitter on the peer to + * emitter_id: capability id of the emitter on the peer to subscribe to. If None, all capabilities will emit to the receiver @@ -353,30 +353,37 @@ def signal_subscribe(self, recv_peer, receiver, emit_peer, emitter): peer_subscriptions = {} if emit_peer in self.subscriptions: peer_subscriptions = self.subscriptions[emit_peer] - if not emitter in peer_subscriptions: - peer_subscriptions[emitter] = [receiver] - elif not receiver in peer_subscriptions[emitter]: - peer_subscriptions[emitter].append(receiver) + if not emitter_id in peer_subscriptions: + peer_subscriptions[emitter_id] = [receiver_id] + elif not receiver_id in peer_subscriptions[emitter_id]: + peer_subscriptions[emitter_id].append(receiver_id) self.subscriptions[emit_peer] = peer_subscriptions # check if the peer capability is known - if receiver is not None: - if receiver not in self.peers_capabilities: - self.peer_get(recv_peer, {receiver: {}}) + if receiver_id is not None: + if receiver_id not in self.peers_capabilities: + self.peer_get(recv_peer, {receiver_id: {}}) + + if emit_peer == self.get_uuid(): + # we are the emitter so register the receiver + if emitter_id == None: + self.monitor_subscribers.add(recv_peer) + else: + self._parameter_list[emitter_id].subscribe_receiver(recv_peer, receiver_id) - msg = json.dumps({'SUB': [emit_peer.hex, emitter, recv_peer.hex, receiver]}) + msg = json.dumps({'SUB': [emit_peer.hex, emitter_id, recv_peer.hex, receiver_id]}) self.whisper(emit_peer, msg.encode('utf-8')) - def signal_unsubscribe(self, recv_peer, receiver, emit_peer, emitter): + def signal_unsubscribe(self, recv_peer, receiver_id, emit_peer, emitter_id): """ Unsubscribe a receiver from an emitter Arguments are: * recv_peer: id of the receiving peer - * receiver: capability id of the receiver on the receiving peer, or + * receiver_id: capability id of the receiver on the receiving peer, or None if no receiver was specified when subscribing * emit_peer: id of the peer to unsubscribe from - * emitter: capability name of the emitter on the peer to + * emitter_id3: capability name of the emitter on the peer to unsubscribe from, or None if no emitter was specified during subscription @@ -388,18 +395,26 @@ def signal_unsubscribe(self, recv_peer, receiver, emit_peer, emitter): if recv_peer == self.get_uuid(): # we are the receiver so unregister the emitter if (emit_peer in self.subscriptions and - emitter in self.subscriptions[emit_peer] and - receiver in self.subscriptions[emit_peer][emitter]): - self.subscriptions[emit_peer][emitter].remove(receiver) - if not any(self.subscriptions[emit_peer][emitter]): - self.subscriptions[emit_peer].pop(emitter) + emitter_id in self.subscriptions[emit_peer] and + receiver_id in self.subscriptions[emit_peer][emitter_id]): + self.subscriptions[emit_peer][emitter_id].remove(receiver_id) + if not any(self.subscriptions[emit_peer][emitter_id]): + self.subscriptions[emit_peer].pop(emitter_id) if not any(self.subscriptions[emit_peer]): self.subscriptions.pop(emit_peer) + if emit_peer == self.get_uuid(): + # we are the emitter so unregister the receiver + #self._parameter_list[receiver_id].signal_unsubscribe_receiver(recv_peer, receiver_id) + if emitter_id == None: + self._monitor_subscribers.remove(recv_peer) + else: + self._parameter_list[emitter_id].unsubscribe_receiver(recv_peer, receiver_id) + - msg = json.dumps({'UNSUB': [emit_peer.hex, emitter, recv_peer.hex, receiver]}) + msg = json.dumps({'UNSUB': [emit_peer.hex, emitter_id, recv_peer.hex, receiver_id]}) self.whisper(emit_peer, msg.encode('utf-8')) - def emit_signal(self, emitter, data): + def emit_signal(self, emitter_id, data): """ Update the value of the emitter and signal all subscribed receivers @@ -407,6 +422,8 @@ def emit_signal(self, emitter, data): * emitter: name of the emitting capability * data: value """ + logger.warning("DEPRECATED: just set the value ( set(value) or param.value = ) on the ZOCPParameter") + return self.capability[emitter]['value'] = data msg = json.dumps({'SIG': [emitter, data]}) @@ -552,8 +569,8 @@ def get_message(self): return if type == "EXIT": - if peer in self.subscribers: - self.subscribers.pop(peer) + if peer in self.monitor_subscribers: + self.monitor_subscribers.remove(peer) if peer in self.subscriptions: self.subscriptions.pop(peer) self.on_peer_exit(peer, name, msg) @@ -639,7 +656,7 @@ def _handle_CALL(self, data, peer, name, grp): return def _handle_SUB(self, data, peer, name, grp): - [emit_peer, emitter, recv_peer, receiver] = data + [emit_peer, emitter_id, recv_peer, receiver_id] = data node_id = self.get_uuid() recv_peer = uuid.UUID(recv_peer) @@ -654,31 +671,21 @@ def _handle_SUB(self, data, peer, name, grp): if recv_peer != peer: # check if this should be forwarded (third party subscription request) logger.debug("ZOCP SUB : forwarding subscription request: %s" % data) - self.signal_subscribe(emit_peer, emitter, recv_peer, receiver) + self.signal_subscribe(emit_peer, emitter_id, recv_peer, receiver_id) return - if emitter is not None: - # update subscribers in capability tree - subscriber = (recv_peer.hex, receiver) - subscribers = self.capability[emitter]["subscribers"] - if subscriber not in subscribers: - subscribers.append(subscriber) - self._on_modified(data={emitter: {"subscribers": subscribers}}) - - peer_subscribers = {} - if recv_peer in self.subscribers: - peer_subscribers = self.subscribers[recv_peer] - if not emitter in peer_subscribers: - peer_subscribers[emitter] = [receiver] - elif not receiver in peer_subscribers[emitter]: - peer_subscribers[emitter].append(receiver) - self.subscribers[recv_peer] = peer_subscribers - - self.on_peer_subscribed(recv_peer, name, data) + # the sender is the receiver! This means we are the emitter + if emitter_id == None: + if recv_peer not in self.monitor_subscribers: + self.monitor_subscribers.append(recv_peer) + else: + self._parameter_list[emitter_id].subscribe_receiver(recv_peer, receiver_id) + + self.on_peer_subscribed(peer, name, data) return def _handle_UNSUB(self, data, peer, name, grp): - [emit_peer, emitter, recv_peer, receiver] = data + [emit_peer, emitter_id, recv_peer, receiver_id] = data node_id = self.get_uuid() recv_peer = uuid.UUID(recv_peer) @@ -693,28 +700,18 @@ def _handle_UNSUB(self, data, peer, name, grp): if recv_peer != peer: # check if this should be forwarded (third party unsubscription request) logger.debug("ZOCP UNSUB : forwarding unsubscription request: %s" % data) - self.signal_unsubscribe(emit_peer, emitter, recv_peer, receiver) + self.signal_unsubscribe(emit_peer, emitter_id, recv_peer, receiver_id) return - if emitter is not None: - # update subscribers in capability tree - subscriber = (recv_peer.hex, receiver) - subscribers = self.capability[emitter]["subscribers"] - if subscriber in subscribers: - subscribers.remove(subscriber) - self._on_modified(data={emitter: {"subscribers": subscribers}}) - - if (recv_peer in self.subscribers and - emitter in self.subscribers[recv_peer] and - receiver in self.subscribers[recv_peer][emitter]): - self.subscribers[recv_peer][emitter].remove(receiver) - if not any(self.subscribers[recv_peer][emitter]): - self.subscribers[recv_peer].pop(emitter) - if not any(self.subscribers[recv_peer]): - self.subscribers.pop(recv_peer) - - self.on_peer_unsubscribed(peer, name, data) - return + if emitter_id == None: + try: + self.monitor_subscribers.remove(recv_peer) + except ValueError: + pass + else: + self._parameter_list[emitter_id].unsubscribe_receiver(recv_peer, receiver_id) + + self.on_peer_unsubscribed(peer, name, data) def _handle_REP(self, data, peer, name, grp): return @@ -724,20 +721,20 @@ def _handle_MOD(self, data, peer, name, grp): self.on_peer_modified(peer, name, data) def _handle_SIG(self, data, peer, name, grp): - [emitter, value] = data - if emitter in self.peers_capabilities[peer]: - self.peers_capabilities[peer][emitter].update({'value': value}) + [emitter_id, value] = data + if emitter_id in self.peers_capabilities[peer]: + self.peers_capabilities[peer][emitter_id].update({'value': value}) if peer in self.subscriptions: subscription = self.subscriptions[peer] - if emitter in subscription: + if emitter_id in subscription: # propagate the signal if it changes the value of this node - receivers = subscription[emitter] - for receiver in receivers: - if receiver is not None and self.capability[receiver]['value'] != value: - self.emit_signal(receiver, value) + receivers = subscription[emitter_id] + for receiver_id in receivers: + if receiver_id is not None and self._parameter_list[receiver_id].value != value: + self._parameter_list[receiver_id].value = value - if None in subscription or emitter in subscription: + if None in subscription or emitter_id in subscription: self.on_peer_signaled(peer, name, data) def _on_modified(self, data, peer=None, name=None): @@ -750,29 +747,12 @@ def _on_modified(self, data, peer=None, name=None): data = new_data self.on_modified(peer, name, data) - if len(data) == 1: - # if the only modification is a value change, - # emit a SIG instead of a MOD - name = list(data.keys())[0] - if len(data[name]) == 1 and 'value' in data[name]: - msg = json.dumps({'SIG': [name, data[name]['value']]}) - for subscriber in self.subscribers: - # no need to send the signal to the node that - # modified the value - if subscriber != peer and ( - None in self.subscribers[subscriber] or - name in self.subscribers[subscriber]): - self.whisper(subscriber, msg.encode('utf-8')) - data = {} - - if any(data): + if len(data): msg = json.dumps({ 'MOD' :data}).encode('utf-8') - for subscriber in self.subscribers: + for subscriber in self.monitor_subscribers: # inform node that are subscribed to one or more # updated capabilities that they have changed - if subscriber != peer and ( - None in self.subscribers[subscriber] or - len(set(self.subscribers[subscriber]) & set(data)) > 0): + if subscriber != peer: self.whisper(subscriber, msg) def run_once(self, timeout=None): @@ -790,7 +770,7 @@ def run_once(self, timeout=None): for fd, ev in items.items(): if self.inbox == fd and ev == zmq.POLLIN: self.get_message() - # just q quick query + # just a quick query items = dict(self.poller.poll(0)) def run(self, timeout=None): diff --git a/tests/test_parameter.py b/tests/test_parameter.py new file mode 100644 index 0000000..35c4b5c --- /dev/null +++ b/tests/test_parameter.py @@ -0,0 +1,85 @@ +import unittest +from parameter import ZOCPParameter, ZOCPParameterList + +class ZOCPTest(unittest.TestCase): + + def test_insert(self): + plist = ZOCPParameterList() + mlist = [] + param1 = ZOCPParameter(None, 1, 'param1', 'rwes', None, 'i', params_list=plist, monitor_list=mlist) + self.assertEqual(len(plist), 1) + self.assertIs(param1, plist[param1.sig_id]) + param2 = ZOCPParameter(None, 0.1, 'param2', 'rw', None, 'f', params_list=plist, monitor_list=mlist) + self.assertEqual(len(plist), 2) + self.assertEqual(plist.index(param1), param1.sig_id) + self.assertIs(param1, plist[param1.sig_id]) + self.assertIs(param2, plist[param2.sig_id]) + param3 = ZOCPParameter(None, 0.3, 'param3', 'rw', None, 'f', params_list=plist, monitor_list=mlist) + self.assertEqual(len(plist), 3) + self.assertIs(param1, plist[param1.sig_id]) + self.assertIs(param2, plist[param2.sig_id]) + self.assertIs(param3, plist[param3.sig_id]) + self.assertEqual(plist.index(param1), param1.sig_id) + self.assertEqual(plist.index(param2), param2.sig_id) + self.assertEqual(plist.index(param3), param3.sig_id) + + def test_remove(self): + plist = ZOCPParameterList() + mlist = [] + param1 = ZOCPParameter(None, 1, 'param1', 'rwes', None, 'i', params_list=plist, monitor_list=mlist) + param2 = ZOCPParameter(None, 0.1, 'param2', 'rw', None, 'f', params_list=plist, monitor_list=mlist) + param3 = ZOCPParameter(None, 0.3, 'param3', 'rw', None, 'f', params_list=plist, monitor_list=mlist) + self.assertEqual(len(plist), 3) + # remove param2 and test if its id is reused + param2.remove() + # list length remains the same + self.assertEqual(len(plist), 3) + # param2's id is in the free_idx list + self.assertIn(1, plist._free_idx) + param4 = ZOCPParameter(None, 0.3, 'param4', 'rw', None, 'f', params_list=plist, monitor_list=mlist) + self.assertEqual(param2.sig_id, 1) + del param2 + # test if param4's id equals 1 (previously id of param2) + self.assertEqual(plist.index(param4), 1) + self.assertEqual(param4.sig_id, 1) + param3.remove() + self.assertEqual(len(plist), 2) + self.assertEqual(0, len(plist._free_idx)) + + def test_params_list(self): + plist = ZOCPParameterList() + mlist = [] + param1 = ZOCPParameter(None, 1, 'param1', 'rwes', None, 'i', params_list=plist, monitor_list=mlist) + param2 = ZOCPParameter(None, 0.1, 'param2', 'rw', None, 'f', params_list=plist, monitor_list=mlist) + param3 = ZOCPParameter(None, 0.3, 'param3', 'rw', None, 'f', params_list=plist, monitor_list=mlist) + self.assertIsInstance(plist._list, list) + self.assertIsInstance(plist, ZOCPParameterList) + self.assertIs(param1, plist[param1.sig_id]) + self.assertIs(param2, plist[param2.sig_id]) + self.assertIs(param3, plist[param3.sig_id]) + + def test_dict_out(self): + plist = ZOCPParameterList() + mlist = [] + param1 = ZOCPParameter(None, 1, 'param1', 'rwes', None, 'i', params_list=plist, monitor_list=mlist) + d = {'sig_id': 0, 'name': 'param1', 'access': 'rwes', 'typeHint': None, 'sig': 'i', 'subscribers': [], 'value': 1} + self.assertDictEqual(param1.to_dict(), d) + param2 = ZOCPParameter(None, 0.1, 'param2', 'rw', 'float', 'f', -1.0, 1.0, 0.01, params_list=plist, monitor_list=mlist) + d = {'sig_id': 1, 'name': 'param2', 'access': 'rw', 'typeHint': 'float', 'sig': 'f', 'value': 0.1, 'min': -1.0, 'max': 1.0, 'step': 0.01} + self.assertDictEqual(param2.to_dict(), d) + + @unittest.skip("custom JSON serializing is kind of pain in the ***") + def test_serialize(self): + plist = ZOCPParameterList() + mlist = [] + param1 = ZOCPParameter(None, 1, 'param1', 'rwes', None, 'i', params_list=plist, monitor_list=mlist) + import json + js = json.dumps(param1) + self.assertIsInstance(js, str) + param2 = ZOCPParameter(None, 0.1, 'param2', 'rw', None, 'f', params_list=plist, monitor_list=mlist) + param3 = ZOCPParameter(None, 0.3, 'param3', 'rw', None, 'f', params_list=plist, monitor_list=mlist) + js = json.dumps(plist) + +if __name__ == '__main__': + #ZOCPTest().test_remove() + unittest.main() diff --git a/tests/test_zocp.py b/tests/test_zocp.py index bbc5913..c509ca6 100644 --- a/tests/test_zocp.py +++ b/tests/test_zocp.py @@ -2,15 +2,18 @@ import zocp import zmq import time +import logging class ZOCPTest(unittest.TestCase): def setUp(self, *args, **kwargs): - ctx = zmq.Context() - self.node1 = zocp.ZOCP(ctx=ctx) + #zl = logging.getLogger("zocp") + #zl.setLevel(logging.DEBUG) + self.ctx = zmq.Context() + self.node1 = zocp.ZOCP(ctx=self.ctx) self.node1.set_header("X-TEST", "1") self.node1.set_name("node1") - self.node2 = zocp.ZOCP(ctx=ctx) + self.node2 = zocp.ZOCP(ctx=self.ctx) self.node2.set_header("X-TEST", "1") self.node2.set_name("node2") self.node1.start() @@ -22,6 +25,11 @@ def setUp(self, *args, **kwargs): def tearDown(self): self.node1.stop() self.node2.stop() + try: + self.monitor_node.stop() + except: + pass + self.ctx.destroy() # end tearDown def test_get_name(self): @@ -75,42 +83,125 @@ def test_get_peer_groups(self): self.assertIn("TEST", self.node2.get_peer_groups()) # end test_get_peer_groups + def test_param(self): + # make node2 monitor node1 + self.node2.signal_subscribe(self.node2.get_uuid(), None, self.node1.get_uuid(), None) + self.node1.run_once(100) + self.node2.run_once(100) + # add parameter to node1 and retrieve from node2 + bool_param = self.node1.register_bool("test_bool", True, 'rw') + # sends out a MOD + self.node1.run_once(100) + self.node2.run_once(100) + self.node1.run_once(100) + self.node2.run_once(100) + self.assertIn("test_bool", self.node2.peers_capabilities.get(self.node1.get_uuid(), {}).keys()) + self.assertDictEqual(bool_param.to_dict(), self.node2.peers_capabilities.get(self.node1.get_uuid()).get("test_bool")) + + def test_object_param(self): + # make node2 monitor node1 + self.node2.signal_subscribe(self.node2.get_uuid(), None, self.node1.get_uuid(), None) + self.node1.run_once(100) + self.node2.run_once(100) + # add parameter to node1 and retrieve from node2 + self.node1.set_object("test_object") + float_param = self.node1.register_float("test_float", True, 'rw') + # sends out a MOD + self.node1.run_once(100) + self.node2.run_once(100) + self.node1.run_once(100) + self.node2.run_once(100) + self.assertIn("test_object", self.node2.peers_capabilities.get(self.node1.get_uuid(), {}).get("objects").keys()) + self.assertIn("test_float", self.node2.peers_capabilities.get(self.node1.get_uuid(), {}).get("objects").get("test_object").keys()) + self.assertDictEqual(float_param.to_dict(), self.node2.peers_capabilities[self.node1.get_uuid()]['objects']["test_object"]["test_float"]) + def test_signal_subscribe(self): self.node1.register_float("TestEmitFloat", 1.0, 'rwe') self.node2.register_float("TestRecvFloat", 1.0, 'rws') # give time for dispersion self.node1.run_once() self.node2.run_once() - self.node2.signal_subscribe(self.node2.get_uuid(), "TestRecvFloat", self.node1.get_uuid(), "TestEmitFloat") + self.node2.signal_subscribe(self.node2.get_uuid(), 0, self.node1.get_uuid(), 0) # give time for dispersion time.sleep(0.5) self.node1.run_once() # subscriptions structure: {Emitter nodeID: {'EmitterID': ['Local ReceiverID']}} - self.assertIn("TestRecvFloat", self.node2.subscriptions[self.node1.get_uuid()]["TestEmitFloat"]) - self.assertIn("TestRecvFloat", self.node1.subscribers[self.node2.get_uuid()]["TestEmitFloat"]) + self.assertIn(0, self.node2.subscriptions[self.node1.get_uuid()][0]) + self.assertIn((self.node2.get_uuid().hex, 0), self.node1._parameter_list[0]._subscribers) # unsubscribe - self.node2.signal_unsubscribe(self.node2.get_uuid(), "TestRecvFloat", self.node1.get_uuid(), "TestEmitFloat") + self.node2.signal_unsubscribe(self.node2.get_uuid(), 0, self.node1.get_uuid(), 0) time.sleep(0.5) self.node1.run_once() - self.assertNotIn("TestRecvFloat", self.node2.subscriptions.get(self.node1.get_uuid(), {}).get("TestEmitFloat", {})) - self.assertNotIn("TestRecvFloat", self.node1.subscribers.get(self.node2.get_uuid(), {}).get("TestEmitFloat", {})) + self.assertNotIn(self.node1.get_uuid(), self.node2.subscriptions) + self.assertNotIn((self.node2.get_uuid().hex, 0), self.node1._parameter_list[0]._subscribers) - def test_emit_signal(self): + def test_signal_monitor(self): self.node1.register_float("TestEmitFloat", 1.0, 'rwe') self.node2.register_float("TestRecvFloat", 1.0, 'rws') + self.monitor_node = zocp.ZOCP(ctx=self.ctx) + self.monitor_node.set_name("monitor") + self.monitor_node.start() + # give time for dispersion + self.node1.run_once(100) + self.node2.run_once(100) + self.monitor_node.run_once(100) + # subscribe monitor to both nodes + self.monitor_node.signal_subscribe(self.monitor_node.get_uuid(), None, self.node1.get_uuid(), None) + self.monitor_node.signal_subscribe(self.monitor_node.get_uuid(), None, self.node2.get_uuid(), None) + # give time for dispersion + time.sleep(0.5) + self.node1.run_once(100) + self.node2.run_once(100) + self.monitor_node.run_once(100) + # monitor subscribers structure: [PeerId] + self.assertIn(self.monitor_node.get_uuid(), self.node1.monitor_subscribers) + self.assertIn(self.monitor_node.get_uuid(), self.node2.monitor_subscribers) + # subscribe signal from node2 to node1 + self.node2.signal_subscribe(self.node2.get_uuid(), 0, self.node1.get_uuid(), 0) + time.sleep(0.5) + self.node1.run_once(100) + self.node2.run_once(100) + self.monitor_node.run_once(100) + #time.sleep(0.5) + self.node1.run_once(100) + self.node2.run_once(100) + self.monitor_node.run_once(100) + # monitor should now know about subscription of node2 to node1 + for key, val in self.monitor_node.peers_capabilities[self.node1.get_uuid()].items(): + if val.get('sig_id') == 0: + self.assertIn([self.node2.get_uuid().hex, 0], val.get('subscribers', ['subscribersnotfound'])) + # unsubscribe + self.node2.signal_unsubscribe(self.node2.get_uuid(), 0, self.node1.get_uuid(), 0) + time.sleep(0.5) + self.node1.run_once(100) + self.node2.run_once(100) + self.monitor_node.run_once(100) + self.node1.run_once(100) + self.node2.run_once(100) + self.monitor_node.run_once(100) + # monitor should now not now about subscription of node2 to node1 + for key, val in self.monitor_node.peers_capabilities[self.node1.get_uuid()].items(): + if val.get('sig_id') == 0: + self.assertNotIn([self.node2.get_uuid().hex, 0], val.get('subscribers', ['subscribersnotfound'])) + self.assertNotIn(self.node1.get_uuid(), self.node2.subscriptions) + self.assertNotIn((self.node2.get_uuid().hex, 0), self.node1._parameter_list[0]._subscribers) + + def test_emit_signal(self): + param1 = self.node1.register_float("TestEmitFloat", 1.0, 'rwe') + param2 = self.node2.register_float("TestRecvFloat", 1.0, 'rws') # give time for dispersion time.sleep(0.5) self.node1.run_once() - self.node2.signal_subscribe(self.node2.get_uuid(), "TestRecvFloat", self.node1.get_uuid(), "TestEmitFloat") + self.node2.signal_subscribe(self.node2.get_uuid(), 0, self.node1.get_uuid(), 0) # give time for dispersion time.sleep(0.1) self.node1.run_once() - self.node1.emit_signal("TestEmitFloat", 2.0) + param1.set(2.0) #self.node1.emit_signal(0, 2.0) time.sleep(0.1) self.node2.run_once() - self.assertEqual(2.0, self.node2.capability["TestRecvFloat"]["value"]) + self.assertEqual(2.0, param2.get()) # unsubscribe - self.node2.signal_unsubscribe(self.node2.get_uuid(), "TestRecvFloat", self.node1.get_uuid(), "TestEmitFloat") + self.node2.signal_unsubscribe(self.node2.get_uuid(), 0, self.node1.get_uuid(), 0) time.sleep(0.1) self.node1.run_once() # end ZOCPTest